XrdFfsWcache.cc

Go to the documentation of this file.
00001 /******************************************************************************/
00002 /* XrdFfsWcache.cc simple write cache that captures consecutive small writes  */
00003 /*                                                                            */
00004 /* (c) 2010 by the Board of Trustees of the Leland Stanford, Jr., University  */
00005 /*                            All Rights Reserved                             */
00006 /* Author: Wei Yang (SLAC National Accelerator Laboratory, 2009)              */
00007 /*         Contract DE-AC02-76-SFO0515 with the Department of Energy          */
00008 /******************************************************************************/
00009 
00010 /* 
00011    When direct_io is not used, kernel will break large write to 4Kbyte  
00012    writes. This significantly reduces the writting performance. This 
00013    simple cache mechanism is to improve the performace on small writes. 
00014 
00015    Note that fuse 2.8.0 pre2 or above and kernel 2.6.27 or above provide
00016    a big_writes option to allow > 4KByte writing. It will make this 
00017    smiple write caching obsolete. 
00018 */
00019 #define XrdFfsWcacheBufsize 131072
00020 
00021 #if defined(__linux__)
00022 /* For pread()/pwrite() */
00023 #ifndef _XOPEN_SOURCE
00024 #define _XOPEN_SOURCE 500
00025 #endif
00026 #endif
00027 
00028 #include <string.h>
00029 #include <stdlib.h>
00030 #include <sys/types.h>
00031 #include <sys/resource.h>
00032 #include <unistd.h>
00033 #include <errno.h>
00034 
00035 #include <pthread.h>
00036 
00037 #include "XrdFfs/XrdFfsWcache.hh"
00038 #ifndef NOXRD
00039     #include "XrdFfs/XrdFfsPosix.hh"
00040 #endif
00041 
00042 #ifdef __cplusplus
00043   extern "C" {
00044 #endif
00045 
00046 struct XrdFfsWcacheFilebuf {
00047     off_t offset;
00048     size_t len;
00049     char *buf;
00050     pthread_mutex_t *mlock;
00051 };
00052 
00053 struct XrdFfsWcacheFilebuf *XrdFfsWcacheFbufs;
00054 
00055 /* #include "xrdposix.h" */
00056 
00057 int XrdFfsWcacheNFILES;
00058 void XrdFfsWcache_init()
00059 {
00060     int fd;
00061     struct rlimit rlp;
00062 
00063     getrlimit(RLIMIT_NOFILE, &rlp);
00064     XrdFfsWcacheNFILES = rlp.rlim_cur;
00065     XrdFfsWcacheNFILES = (XrdFfsWcacheNFILES == (int)RLIM_INFINITY? 4096 : XrdFfsWcacheNFILES);
00066     
00067 /*    printf("%d %d\n", XrdFfsWcacheNFILES, sizeof(struct XrdFfsWcacheFilebuf)); */
00068     XrdFfsWcacheFbufs = (struct XrdFfsWcacheFilebuf*)malloc(sizeof(struct XrdFfsWcacheFilebuf) * XrdFfsWcacheNFILES);
00069     for (fd = 0; fd < XrdFfsWcacheNFILES; fd++)
00070     {
00071         XrdFfsWcacheFbufs[fd].offset = 0;
00072         XrdFfsWcacheFbufs[fd].len = 0;
00073         XrdFfsWcacheFbufs[fd].buf = NULL;
00074         XrdFfsWcacheFbufs[fd].mlock = NULL;
00075     }
00076 }
00077 
00078 int XrdFfsWcache_create(int fd) 
00079 {
00080     XrdFfsWcache_destroy(fd);
00081 
00082     XrdFfsWcacheFbufs[fd].offset = 0;
00083     XrdFfsWcacheFbufs[fd].len = 0;
00084     XrdFfsWcacheFbufs[fd].buf = (char*)malloc(XrdFfsWcacheBufsize);
00085     if (XrdFfsWcacheFbufs[fd].buf == NULL)
00086         return 0;
00087     XrdFfsWcacheFbufs[fd].mlock = (pthread_mutex_t*)malloc(sizeof(pthread_mutex_t));
00088     if (XrdFfsWcacheFbufs[fd].mlock == NULL)
00089         return 0;
00090     else
00091         pthread_mutex_init(XrdFfsWcacheFbufs[fd].mlock, NULL);
00092     return 1;
00093 }
00094 
00095 void XrdFfsWcache_destroy(int fd)
00096 {
00097 /*  XrdFfsWcache_flush(fd); */
00098     
00099     XrdFfsWcacheFbufs[fd].offset = 0;
00100     XrdFfsWcacheFbufs[fd].len = 0;
00101     if (XrdFfsWcacheFbufs[fd].buf != NULL) 
00102         free(XrdFfsWcacheFbufs[fd].buf);
00103     XrdFfsWcacheFbufs[fd].buf = NULL;
00104     if (XrdFfsWcacheFbufs[fd].mlock != NULL)
00105     {
00106         pthread_mutex_destroy(XrdFfsWcacheFbufs[fd].mlock);
00107         free(XrdFfsWcacheFbufs[fd].mlock);
00108     }
00109     XrdFfsWcacheFbufs[fd].mlock = NULL;
00110 }
00111 
00112 ssize_t XrdFfsWcache_flush(int fd)
00113 {
00114     ssize_t rc;
00115 
00116     if (XrdFfsWcacheFbufs[fd].len == 0 || XrdFfsWcacheFbufs[fd].buf == NULL )
00117         return 0;
00118 
00119 #ifndef NOXRD
00120     rc = XrdFfsPosix_pwrite(fd, XrdFfsWcacheFbufs[fd].buf, XrdFfsWcacheFbufs[fd].len, XrdFfsWcacheFbufs[fd].offset);
00121 #else
00122     rc =     pwrite(fd, XrdFfsWcacheFbufs[fd].buf, XrdFfsWcacheFbufs[fd].len, XrdFfsWcacheFbufs[fd].offset);
00123 #endif
00124     if (rc > 0)
00125     {
00126         XrdFfsWcacheFbufs[fd].offset = 0;
00127         XrdFfsWcacheFbufs[fd].len = 0;
00128     }
00129     return rc;
00130 }
00131 
00132 ssize_t XrdFfsWcache_pwrite(int fd, char *buf, size_t len, off_t offset)
00133 {
00134     ssize_t rc;
00135     char *bufptr;
00136 
00137 /* do not use caching under these cases */
00138     if (len > XrdFfsWcacheBufsize/2 || fd >= XrdFfsWcacheNFILES)
00139     {
00140 #ifndef NOXRD
00141         rc = XrdFfsPosix_pwrite(fd, buf, len, offset);
00142 #else 
00143         rc = pwrite(fd, buf, len, offset);
00144 #endif
00145         return rc;
00146     }
00147 
00148     pthread_mutex_lock(XrdFfsWcacheFbufs[fd].mlock);
00149     rc = XrdFfsWcacheFbufs[fd].len;
00150 /* 
00151    in the following two cases, a XrdFfsWcache_flush is required:
00152    1. current offset isnn't pointing to the tail of data in buffer
00153    2. adding new data will exceed the current buffer 
00154 */ 
00155     if (offset != (off_t)(XrdFfsWcacheFbufs[fd].offset + XrdFfsWcacheFbufs[fd].len) ||
00156         (off_t)(offset + len) > (XrdFfsWcacheFbufs[fd].offset + XrdFfsWcacheBufsize))
00157         rc = XrdFfsWcache_flush(fd);
00158 
00159     errno = 0;
00160     if (rc < 0) 
00161     {
00162         errno = ENOSPC;
00163         pthread_mutex_unlock(XrdFfsWcacheFbufs[fd].mlock);
00164         return -1;
00165     }
00166 
00167     bufptr = &XrdFfsWcacheFbufs[fd].buf[XrdFfsWcacheFbufs[fd].len];
00168     memcpy(bufptr, buf, len);
00169     if (XrdFfsWcacheFbufs[fd].len == 0)
00170         XrdFfsWcacheFbufs[fd].offset = offset;
00171     XrdFfsWcacheFbufs[fd].len += len;
00172 
00173     pthread_mutex_unlock(XrdFfsWcacheFbufs[fd].mlock);
00174     return (ssize_t)len;
00175 }
00176 
00177 #ifdef __cplusplus
00178   }
00179 #endif

Generated on Tue Jul 5 14:46:35 2011 for ROOT_528-00b_version by  doxygen 1.5.1