00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019 #define XrdFfsWcacheBufsize 131072
00020
00021 #if defined(__linux__)
00022
00023 #ifndef _XOPEN_SOURCE
00024 #define _XOPEN_SOURCE 500
00025 #endif
00026 #endif
00027
00028 #include <string.h>
00029 #include <stdlib.h>
00030 #include <sys/types.h>
00031 #include <sys/resource.h>
00032 #include <unistd.h>
00033 #include <errno.h>
00034
00035 #include <pthread.h>
00036
00037 #include "XrdFfs/XrdFfsWcache.hh"
00038 #ifndef NOXRD
00039 #include "XrdFfs/XrdFfsPosix.hh"
00040 #endif
00041
00042 #ifdef __cplusplus
00043 extern "C" {
00044 #endif
00045
00046 struct XrdFfsWcacheFilebuf {
00047 off_t offset;
00048 size_t len;
00049 char *buf;
00050 pthread_mutex_t *mlock;
00051 };
00052
00053 struct XrdFfsWcacheFilebuf *XrdFfsWcacheFbufs;
00054
00055
00056
00057 int XrdFfsWcacheNFILES;
00058 void XrdFfsWcache_init()
00059 {
00060 int fd;
00061 struct rlimit rlp;
00062
00063 getrlimit(RLIMIT_NOFILE, &rlp);
00064 XrdFfsWcacheNFILES = rlp.rlim_cur;
00065 XrdFfsWcacheNFILES = (XrdFfsWcacheNFILES == (int)RLIM_INFINITY? 4096 : XrdFfsWcacheNFILES);
00066
00067
00068 XrdFfsWcacheFbufs = (struct XrdFfsWcacheFilebuf*)malloc(sizeof(struct XrdFfsWcacheFilebuf) * XrdFfsWcacheNFILES);
00069 for (fd = 0; fd < XrdFfsWcacheNFILES; fd++)
00070 {
00071 XrdFfsWcacheFbufs[fd].offset = 0;
00072 XrdFfsWcacheFbufs[fd].len = 0;
00073 XrdFfsWcacheFbufs[fd].buf = NULL;
00074 XrdFfsWcacheFbufs[fd].mlock = NULL;
00075 }
00076 }
00077
00078 int XrdFfsWcache_create(int fd)
00079 {
00080 XrdFfsWcache_destroy(fd);
00081
00082 XrdFfsWcacheFbufs[fd].offset = 0;
00083 XrdFfsWcacheFbufs[fd].len = 0;
00084 XrdFfsWcacheFbufs[fd].buf = (char*)malloc(XrdFfsWcacheBufsize);
00085 if (XrdFfsWcacheFbufs[fd].buf == NULL)
00086 return 0;
00087 XrdFfsWcacheFbufs[fd].mlock = (pthread_mutex_t*)malloc(sizeof(pthread_mutex_t));
00088 if (XrdFfsWcacheFbufs[fd].mlock == NULL)
00089 return 0;
00090 else
00091 pthread_mutex_init(XrdFfsWcacheFbufs[fd].mlock, NULL);
00092 return 1;
00093 }
00094
00095 void XrdFfsWcache_destroy(int fd)
00096 {
00097
00098
00099 XrdFfsWcacheFbufs[fd].offset = 0;
00100 XrdFfsWcacheFbufs[fd].len = 0;
00101 if (XrdFfsWcacheFbufs[fd].buf != NULL)
00102 free(XrdFfsWcacheFbufs[fd].buf);
00103 XrdFfsWcacheFbufs[fd].buf = NULL;
00104 if (XrdFfsWcacheFbufs[fd].mlock != NULL)
00105 {
00106 pthread_mutex_destroy(XrdFfsWcacheFbufs[fd].mlock);
00107 free(XrdFfsWcacheFbufs[fd].mlock);
00108 }
00109 XrdFfsWcacheFbufs[fd].mlock = NULL;
00110 }
00111
00112 ssize_t XrdFfsWcache_flush(int fd)
00113 {
00114 ssize_t rc;
00115
00116 if (XrdFfsWcacheFbufs[fd].len == 0 || XrdFfsWcacheFbufs[fd].buf == NULL )
00117 return 0;
00118
00119 #ifndef NOXRD
00120 rc = XrdFfsPosix_pwrite(fd, XrdFfsWcacheFbufs[fd].buf, XrdFfsWcacheFbufs[fd].len, XrdFfsWcacheFbufs[fd].offset);
00121 #else
00122 rc = pwrite(fd, XrdFfsWcacheFbufs[fd].buf, XrdFfsWcacheFbufs[fd].len, XrdFfsWcacheFbufs[fd].offset);
00123 #endif
00124 if (rc > 0)
00125 {
00126 XrdFfsWcacheFbufs[fd].offset = 0;
00127 XrdFfsWcacheFbufs[fd].len = 0;
00128 }
00129 return rc;
00130 }
00131
00132 ssize_t XrdFfsWcache_pwrite(int fd, char *buf, size_t len, off_t offset)
00133 {
00134 ssize_t rc;
00135 char *bufptr;
00136
00137
00138 if (len > XrdFfsWcacheBufsize/2 || fd >= XrdFfsWcacheNFILES)
00139 {
00140 #ifndef NOXRD
00141 rc = XrdFfsPosix_pwrite(fd, buf, len, offset);
00142 #else
00143 rc = pwrite(fd, buf, len, offset);
00144 #endif
00145 return rc;
00146 }
00147
00148 pthread_mutex_lock(XrdFfsWcacheFbufs[fd].mlock);
00149 rc = XrdFfsWcacheFbufs[fd].len;
00150
00151
00152
00153
00154
00155 if (offset != (off_t)(XrdFfsWcacheFbufs[fd].offset + XrdFfsWcacheFbufs[fd].len) ||
00156 (off_t)(offset + len) > (XrdFfsWcacheFbufs[fd].offset + XrdFfsWcacheBufsize))
00157 rc = XrdFfsWcache_flush(fd);
00158
00159 errno = 0;
00160 if (rc < 0)
00161 {
00162 errno = ENOSPC;
00163 pthread_mutex_unlock(XrdFfsWcacheFbufs[fd].mlock);
00164 return -1;
00165 }
00166
00167 bufptr = &XrdFfsWcacheFbufs[fd].buf[XrdFfsWcacheFbufs[fd].len];
00168 memcpy(bufptr, buf, len);
00169 if (XrdFfsWcacheFbufs[fd].len == 0)
00170 XrdFfsWcacheFbufs[fd].offset = offset;
00171 XrdFfsWcacheFbufs[fd].len += len;
00172
00173 pthread_mutex_unlock(XrdFfsWcacheFbufs[fd].mlock);
00174 return (ssize_t)len;
00175 }
00176
00177 #ifdef __cplusplus
00178 }
00179 #endif