00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
// Revision-control identification string for this source file (an expanded
// "$Id$" keyword naming the file, revision, date and last committer).
const char *XrdOfsPoscqCVSID = "$Id: XrdOfsPoscq.cc 30949 2009-11-02 16:37:58Z ganis $";
00014
00015 #include <string.h>
00016 #include <strings.h>
00017 #include <stddef.h>
00018 #include <stdio.h>
00019 #include <fcntl.h>
00020 #include <unistd.h>
00021 #include <errno.h>
00022 #include <sys/param.h>
00023 #include <sys/types.h>
00024 #include <sys/stat.h>
00025
00026 #include "XrdOfs/XrdOfsPoscq.hh"
00027 #include "XrdOss/XrdOss.hh"
00028 #include "XrdSys/XrdSysError.hh"
00029 #include "XrdSys/XrdSysPlatform.hh"
00030
00031
00032
00033
00034
00035 XrdOfsPoscq::XrdOfsPoscq(XrdSysError *erp, XrdOss *oss, const char *fn)
00036 {
00037 eDest = erp;
00038 ossFS = oss;
00039 pocFN = strdup(fn);
00040 pocFD = -1;
00041 pocSZ = 0;
00042 pocIQ = 0;
00043 SlotList = SlotLust = 0;
00044 }
00045
00046
00047
00048
00049
00050 int XrdOfsPoscq::Add(const char *Tident, const char *Lfn)
00051 {
00052 XrdOfsPoscq::Request tmpReq;
00053 FileSlot *freeSlot;
00054 int fP;
00055
00056
00057
00058 tmpReq.addT = 0;
00059 strlcpy(tmpReq.LFN, Lfn, sizeof(tmpReq.LFN));
00060 strlcpy(tmpReq.User, Tident, sizeof(tmpReq.User));
00061 memset(tmpReq.Reserved, 0, sizeof(tmpReq.Reserved));
00062
00063
00064
00065 myMutex.Lock();
00066 if ((freeSlot = SlotList))
00067 {fP = freeSlot->Offset;
00068 SlotList = freeSlot->Next;
00069 freeSlot->Next = SlotLust;
00070 SlotLust = freeSlot;
00071 } else {fP = pocSZ; pocSZ += ReqSize;}
00072 pocIQ++;
00073 myMutex.UnLock();
00074
00075
00076
00077 if (!reqWrite((void *)&tmpReq, sizeof(tmpReq), fP))
00078 {eDest->Emsg("Add", Lfn, "not added to the persist queue.");
00079 myMutex.Lock(); pocIQ--; myMutex.UnLock();
00080 return -EIO;
00081 }
00082
00083
00084
00085 return fP;
00086 }
00087
00088
00089
00090
00091
00092 int XrdOfsPoscq::Commit(const char *Lfn, int Offset)
00093 {
00094 long long addT = static_cast<long long>(time(0));
00095
00096
00097
00098 if (!VerOffset(Lfn, Offset)) return -EINVAL;
00099
00100
00101
00102 if (reqWrite((void *)&addT, sizeof(addT), Offset)) return 0;
00103 eDest->Emsg("Commit", Lfn, "not commited to the persist queue.");
00104 return -EIO;
00105 }
00106
00107
00108
00109
00110
00111 int XrdOfsPoscq::Del(const char *Lfn, int Offset, int Unlink)
00112 {
00113 static int Zero = 0;
00114 FileSlot *freeSlot;
00115 int retc;
00116
00117
00118
00119 if (!VerOffset(Lfn, Offset)) return -EINVAL;
00120
00121
00122
00123 if (Unlink && (retc = ossFS->Unlink(Lfn)) && retc != -ENOENT)
00124 {eDest->Emsg("Del", retc, "remove", Lfn);
00125 return (retc < 0 ? retc : -retc);
00126 }
00127
00128
00129
00130 if (!reqWrite((void *)&Zero, sizeof(Zero), Offset+offsetof(Request,LFN)))
00131 {eDest->Emsg("Del", Lfn, "not removed from the persist queue.");
00132 return -EIO;
00133 }
00134
00135
00136
00137 myMutex.Lock();
00138 if ((freeSlot = SlotLust)) SlotLust = freeSlot->Next;
00139 else freeSlot = new FileSlot;
00140 freeSlot->Offset = Offset;
00141 freeSlot->Next = SlotList;
00142 SlotList = freeSlot;
00143 if (pocIQ > 0) pocIQ--;
00144 myMutex.UnLock();
00145
00146
00147
00148 return 0;
00149 }
00150
00151
00152
00153
00154
00155 XrdOfsPoscq::recEnt *XrdOfsPoscq::Init(int &Ok)
00156 {
00157 static const int Mode = S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH;
00158 Request tmpReq;
00159 struct stat buf, Stat;
00160 recEnt *First = 0;
00161 char Buff[80];
00162 int rc, Offs, numreq = 0;
00163
00164
00165
00166 Ok = 0;
00167
00168
00169
00170 if ((pocFD = open(pocFN, O_RDWR|O_CREAT, Mode)) < 0)
00171 {eDest->Emsg("Init",errno,"open",pocFN);
00172 return 0;
00173 }
00174
00175
00176
00177 if (fstat(pocFD, &buf)) {FailIni("stat"); return 0;}
00178
00179
00180
00181 if (buf.st_size < ReqSize)
00182 {pocSZ = ReqOffs;
00183 if (ftruncate(pocFD, ReqOffs)) FailIni("trunc");
00184 else Ok = 1;
00185 return 0;
00186 }
00187
00188
00189
00190 for (Offs = ReqOffs; Offs < buf.st_size; Offs += ReqSize)
00191 {do {rc = pread(pocFD, (void *)&tmpReq, ReqSize, Offs);}
00192 while(rc < 0 && errno == EINTR);
00193 if (rc < 0) {eDest->Emsg("Init",errno,"read",pocFN); return First;}
00194 if (*tmpReq.LFN == '\0'
00195 || ossFS->Stat(tmpReq.LFN, &Stat)
00196 || !(S_ISREG(Stat.st_mode) || !(Stat.st_mode & S_ISUID))) continue;
00197 First = new recEnt(tmpReq, Stat.st_mode & S_IAMB, First); numreq++;
00198 }
00199
00200
00201
00202 sprintf(Buff, " %d pending create%s", numreq, (numreq != 1 ? "s" : ""));
00203 eDest->Say("Init", Buff, " recovered from ", pocFN);
00204 if (ReWrite(First)) Ok = 1;
00205 return First;
00206 }
00207
00208
00209
00210
00211
00212 XrdOfsPoscq::recEnt *XrdOfsPoscq::List(XrdSysError *Say, const char *theFN)
00213 {
00214 XrdOfsPoscq::Request tmpReq;
00215 struct stat buf;
00216 recEnt *First = 0;
00217 int rc, theFD, Offs;
00218
00219
00220
00221 if ((theFD = open(theFN, O_RDONLY)) < 0)
00222 {Say->Emsg("Init",errno,"open",theFN);
00223 return 0;
00224 }
00225
00226
00227
00228 if (fstat(theFD, &buf))
00229 {Say->Emsg("Init",errno,"stat",theFN);
00230 close(theFD);
00231 return 0;
00232 }
00233 if (buf.st_size < ReqSize) buf.st_size = 0;
00234
00235
00236
00237 for (Offs = ReqOffs; Offs < buf.st_size; Offs += ReqSize)
00238 {do {rc = pread(theFD, (void *)&tmpReq, ReqSize, Offs);}
00239 while(rc < 0 && errno == EINTR);
00240 if (rc < 0) {Say->Emsg("List",errno,"read",theFN); return First;}
00241 if (*tmpReq.LFN != '\0') First = new recEnt(tmpReq, 0, First);
00242 }
00243
00244
00245
00246 close(theFD);
00247 return First;
00248 }
00249
00250
00251
00252
00253
00254 void XrdOfsPoscq::FailIni(const char *txt)
00255 {
00256 eDest->Emsg("Init", errno, txt, pocFN);
00257 }
00258
00259
00260
00261
00262
00263 int XrdOfsPoscq::reqWrite(void *Buff, int Bsz, int Offs)
00264 {
00265 int rc = 0;
00266
00267 do {rc = pwrite(pocFD, Buff, Bsz, Offs);} while(rc < 0 && errno == EINTR);
00268
00269 if (rc >= 0 && Bsz > 8) rc = fsync(pocFD);
00270
00271 if (rc < 0) {eDest->Emsg("reqWrite",errno,"write", pocFN); return 0;}
00272 return 1;
00273 }
00274
00275
00276
00277
00278
00279 int XrdOfsPoscq::ReWrite(XrdOfsPoscq::recEnt *rP)
00280 {
00281 static const int Mode = S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH;
00282 char newFN[MAXPATHLEN], *oldFN;
00283 int newFD, oldFD, Offs = ReqOffs, aOK = 1;
00284
00285
00286
00287 strcpy(newFN, pocFN); strcat(newFN, ".new");
00288 if ((newFD = open(newFN, O_RDWR|O_CREAT|O_TRUNC, Mode)) < 0)
00289 {eDest->Emsg("ReWrite",errno,"open",newFN); return 0;}
00290
00291
00292
00293 oldFD = pocFD; pocFD = newFD;
00294 oldFN = pocFN; pocFN = newFN;
00295
00296
00297
00298 while(rP)
00299 {rP->Offset = Offs;
00300 if (!reqWrite((void *)&rP->reqData, ReqSize, Offs)) {aOK = 0; break;}
00301 Offs += ReqSize;
00302 rP = rP->Next;
00303 }
00304
00305
00306
00307 if (aOK && rename(newFN, oldFN) < 0)
00308 {eDest->Emsg("ReWrite",errno,"rename",newFN); aOK = 0;}
00309
00310
00311
00312 if (aOK) close(oldFD);
00313 else {close(newFD); pocFD = oldFD;}
00314 pocFN = oldFN;
00315 pocSZ = Offs;
00316 return aOK;
00317 }
00318
00319
00320
00321
00322
00323 int XrdOfsPoscq::VerOffset(const char *Lfn, int Offset)
00324 {
00325
00326
00327
00328 if (Offset < ReqOffs || (Offset-ReqOffs)%ReqSize)
00329 {char buff[128];
00330 sprintf(buff, "Invalid slot %d for", Offset);
00331 eDest->Emsg("VerOffset", buff, Lfn);
00332 return 0;
00333 }
00334 return 1;
00335 }