00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013 const char *XrdFrmReqFileCVSID = "$Id: XrdFrmReqFile.cc 35287 2010-09-14 21:19:35Z ganis $";
00014
00015 #include <string.h>
00016 #include <strings.h>
00017 #include <stdio.h>
00018 #include <fcntl.h>
00019 #include <unistd.h>
00020 #include <errno.h>
00021 #include <sys/param.h>
00022 #include <sys/types.h>
00023 #include <sys/stat.h>
00024
00025 #include "XrdFrm/XrdFrmCID.hh"
00026 #include "XrdFrm/XrdFrmReqFile.hh"
00027 #include "XrdFrm/XrdFrmTrace.hh"
00028 #include "XrdSys/XrdSysError.hh"
00029 #include "XrdSys/XrdSysPlatform.hh"
00030
00031 using namespace XrdFrm;
00032
00033
00034
00035
00036
00037 XrdSysMutex XrdFrmReqFile::rqMonitor::rqMutex;
00038
00039
00040
00041
00042
00043 XrdFrmReqFile::XrdFrmReqFile(const char *fn, int aVal)
00044 {
00045 char buff[1200];
00046
00047 memset((void *)&HdrData, 0, sizeof(HdrData));
00048 reqFN = strdup(fn);
00049 strcpy(buff, fn); strcat(buff, ".lock");
00050 lokFN = strdup(buff);
00051 lokFD = reqFD = -1;
00052 isAgent = aVal;
00053 }
00054
00055
00056
00057
00058
00059 void XrdFrmReqFile::Add(XrdFrmRequest *rP)
00060 {
00061 rqMonitor rqMon(isAgent);
00062 XrdFrmRequest tmpReq;
00063 int fP;
00064
00065
00066
00067 if (!FileLock()) {FailAdd(rP->LFN, 0); return;}
00068
00069
00070
00071 if ((fP = HdrData.Free))
00072 {if (!reqRead((void *)&tmpReq, fP)) {FailAdd(rP->LFN, 1); return;}
00073 HdrData.Free = tmpReq.Next;
00074 } else {
00075 struct stat buf;
00076 if (fstat(reqFD, &buf))
00077 {Say.Emsg("Add",errno,"stat",reqFN); FailAdd(rP->LFN, 1); return;}
00078 fP = buf.st_size;
00079 }
00080
00081
00082
00083 if (rP->Options & XrdFrmRequest::Register)
00084 {if (HdrData.First) rP->Next = HdrData.First;
00085 else {HdrData.First = HdrData.Last = fP; rP->Next = 0;}
00086 } else {
00087 if (HdrData.First && HdrData.Last)
00088 {if (!reqRead((void *)&tmpReq, HdrData.Last))
00089 {FailAdd(rP->LFN, 1); return;}
00090 tmpReq.Next = fP;
00091 if (!reqWrite((void *)&tmpReq, HdrData.Last, 0))
00092 {FailAdd(rP->LFN, 1); return;}
00093 } else HdrData.First = fP;
00094 HdrData.Last = fP; rP->Next = 0;
00095 }
00096
00097
00098
00099 rP->This = fP;
00100 if (!reqWrite(rP, fP)) FailAdd(rP->LFN, 0);
00101 FileLock(lkNone);
00102 }
00103
00104
00105
00106
00107
00108 void XrdFrmReqFile::Can(XrdFrmRequest *rP)
00109 {
00110 rqMonitor rqMon(isAgent);
00111 XrdFrmRequest tmpReq;
00112 int Offs, numCan = 0, numBad = 0;
00113 struct stat buf;
00114 char txt[128];
00115
00116
00117
00118 if (!FileLock() || fstat(reqFD, &buf)) {FailCan(rP->ID, 0); return;}
00119
00120
00121
00122 for (Offs = ReqSize; Offs < buf.st_size; Offs += ReqSize)
00123 {if (!reqRead((void *)&tmpReq, Offs)) return FailCan(rP->ID);
00124 if (!strcmp(tmpReq.ID, rP->ID))
00125 {tmpReq.LFN[0] = '\0';
00126 if (!reqWrite((void *)&tmpReq, Offs, 0)) numBad++;
00127 else numCan++;
00128 }
00129 }
00130
00131
00132
00133 if (numCan) fsync(reqFD);
00134
00135
00136
00137 if (numCan || numBad)
00138 {sprintf(txt, "has %d entries; %d removed (%d failures).",
00139 numCan+numBad, numCan, numBad);
00140 Say.Emsg("Can", rP->ID, txt);
00141 }
00142 FileLock(lkNone);
00143 }
00144
00145
00146
00147
00148
00149 void XrdFrmReqFile::Del(XrdFrmRequest *rP)
00150 {
00151 rqMonitor rqMon(isAgent);
00152 XrdFrmRequest tmpReq;
00153
00154
00155
00156 if (!FileLock()) {FailDel(rP->LFN, 0); return;}
00157
00158
00159
00160 memset(&tmpReq, 0, sizeof(tmpReq));
00161 tmpReq.Next = HdrData.Free;
00162 HdrData.Free = rP->This;
00163 if (!reqWrite((void *)&tmpReq, rP->This)) FailDel(rP->LFN, 0);
00164 FileLock(lkNone);
00165 }
00166
00167
00168
00169
00170
00171 int XrdFrmReqFile::Get(XrdFrmRequest *rP)
00172 {
00173 int fP, rc;
00174
00175
00176
00177 if (!FileLock()) return 0;
00178
00179
00180
00181 while((fP = HdrData.First))
00182 {if (!reqRead((void *)rP, fP)) {FileLock(lkNone); return 0;}
00183 HdrData.First= rP->Next;
00184 if (*(rP->LFN)) break;
00185 rP->Next = HdrData.Free;
00186 HdrData.Free = fP;
00187 if (!reqWrite(rP, fP)) {fP = 0; break;}
00188 }
00189 reqWrite(0,0,1);
00190 if (fP) rc = (HdrData.First ? 1 : -1);
00191 else rc = 0;
00192 FileLock(lkNone);
00193 return rc;
00194 }
00195
00196
00197
00198
00199
00200 int XrdFrmReqFile::Init()
00201 {
00202 EPNAME("Init");
00203 static const int Mode = S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH;
00204 XrdFrmRequest tmpReq;
00205 struct stat buf;
00206 recEnt *RegList = 0, *First = 0, *rP, *pP, *tP;
00207 int Offs, rc, numreq = 0;
00208
00209
00210
00211 if ((lokFD = open(lokFN, O_RDWR|O_CREAT, Mode)) < 0)
00212 {Say.Emsg("Init",errno,"open",lokFN); return 0;}
00213 fcntl(lokFD, F_SETFD, FD_CLOEXEC);
00214
00215
00216
00217 if (!FileLock(lkInit)) return 0;
00218
00219
00220
00221 if ((reqFD = open(reqFN, O_RDWR|O_CREAT, Mode)) < 0)
00222 {FileLock(lkNone);
00223 Say.Emsg("Init",errno,"open",reqFN);
00224 return 0;
00225 }
00226 fcntl(reqFD, F_SETFD, FD_CLOEXEC);
00227
00228
00229
00230 if (fstat(reqFD, &buf)) return FailIni("stat");
00231 if (buf.st_size < ReqSize)
00232 {memset(&tmpReq, 0, sizeof(tmpReq));
00233 HdrData.Free = ReqSize;
00234 if (!reqWrite((void *)&tmpReq, ReqSize)) return FailIni("init file");
00235 FileLock(lkNone);
00236 return 1;
00237 }
00238
00239
00240
00241 if (isAgent)
00242 {FileLock(lkNone);
00243 return 1;
00244 }
00245
00246
00247
00248 for (Offs = ReqSize; Offs < buf.st_size; Offs += ReqSize)
00249 {if (!reqRead((void *)&tmpReq, Offs)) return FailIni("read file");
00250 if (*tmpReq.LFN == '\0' || !tmpReq.addTOD
00251 || tmpReq.Opaque >= int(sizeof(tmpReq.LFN))) continue;
00252 pP = 0; rP = First; tP = new recEnt(tmpReq); numreq++;
00253 if (tmpReq.Options & XrdFrmRequest::Register)
00254 {tP->Next = RegList; RegList = tP;
00255 } else {
00256 while(rP && rP->reqData.addTOD < tmpReq.addTOD) {pP=rP;rP=rP->Next;}
00257 if (pP) pP->Next = tP;
00258 else First = tP;
00259 tP->Next = rP;
00260 }
00261 }
00262
00263
00264
00265 while((rP = RegList))
00266 {RegList = rP->Next;
00267 rP->Next = First;
00268 First = rP;
00269 }
00270
00271
00272
00273 DEBUG(numreq <<" request(s) recovered from " <<reqFN);
00274 rc = ReWrite(First);
00275
00276
00277
00278 while((tP = First))
00279 {First = tP->Next;
00280 CID.Ref(tP->reqData.iName);
00281 delete tP;
00282 }
00283
00284
00285
00286 FileLock(lkNone);
00287 return rc;
00288 }
00289
00290
00291
00292
00293
00294 char *XrdFrmReqFile::List(char *Buff, int bsz, int &Offs,
00295 XrdFrmRequest::Item *ITList, int ITNum)
00296 {
00297 rqMonitor rqMon(isAgent);
00298 XrdFrmRequest tmpReq;
00299 int rc;
00300
00301
00302
00303 if (Offs < ReqSize) Offs = ReqSize;
00304
00305
00306
00307 if (!FileLock(lkShare)) return 0;
00308
00309
00310
00311 do{do {rc = pread(reqFD, (void *)&tmpReq, ReqSize, Offs);}
00312 while(rc < 0 && errno == EINTR);
00313 if (rc == ReqSize)
00314 {Offs += ReqSize;
00315 if (*tmpReq.LFN == '\0' || !tmpReq.addTOD
00316 || tmpReq.Opaque >= int(sizeof(tmpReq.LFN))
00317 || tmpReq.Options & XrdFrmRequest::Register) continue;
00318 FileLock(lkNone);
00319 if (!ITNum || !ITList) strlcpy(Buff, tmpReq.LFN, bsz);
00320 else ListL(tmpReq, Buff, bsz, ITList, ITNum);
00321 return Buff;
00322 }
00323 } while(rc == ReqSize);
00324
00325
00326
00327 if (rc < 0) Say.Emsg("List",errno,"read",reqFN);
00328
00329
00330
00331 FileLock(lkNone);
00332 return 0;
00333 }
00334
00335
00336
00337
00338
00339 void XrdFrmReqFile::ListL(XrdFrmRequest &tmpReq, char *Buff, int bsz,
00340 XrdFrmRequest::Item *ITList, int ITNum)
00341 {
00342 char What, tbuf[32];
00343 long long tval;
00344 int i, k, n, bln = bsz-2, Lfo;
00345
00346 for (i = 0; i < ITNum && bln > 0; i++)
00347 {Lfo = tmpReq.LFO;
00348 switch(ITList[i])
00349 {case XrdFrmRequest::getOBJ:
00350 Lfo = 0;
00351
00352 case XrdFrmRequest::getLFN:
00353 n = strlen(tmpReq.LFN+Lfo);
00354 strlcpy(Buff, tmpReq.LFN+Lfo, bln);
00355 break;
00356
00357 case XrdFrmRequest::getOBJCGI:
00358 Lfo = 0;
00359
00360 case XrdFrmRequest::getLFNCGI:
00361 n = strlen(tmpReq.LFN); tmpReq.LFN[n] = '?';
00362 if (!tmpReq.Opaque) tmpReq.LFN[n+1] = '\0';
00363 strlcpy(Buff, tmpReq.LFN+Lfo, bln);
00364 k = strlen(tmpReq.LFN+Lfo);
00365 tmpReq.LFN[n] = '\0'; n = k;
00366 break;
00367
00368 case XrdFrmRequest::getMODE:
00369 n = 0;
00370 What = (tmpReq.Options & XrdFrmRequest::makeRW
00371 ? 'w' : 'r');
00372 if (bln) {Buff[n] = What; n++;}
00373 if (tmpReq.Options & XrdFrmRequest::msgFail)
00374 if (bln-n > 0) {Buff[n] = 'f'; n++;}
00375 if (tmpReq.Options & XrdFrmRequest::msgSucc)
00376 if (bln-n > 0) {Buff[n] = 'n'; n++;}
00377 break;
00378
00379 case XrdFrmRequest::getNOTE:
00380 n = strlen(tmpReq.Notify);
00381 strlcpy(Buff, tmpReq.Notify, bln);
00382 break;
00383
00384 case XrdFrmRequest::getOP:
00385 *Buff = tmpReq.OPc;
00386 n = 1;
00387 break;
00388
00389 case XrdFrmRequest::getPRTY:
00390 if (tmpReq.Prty == 2) What = '2';
00391 else if (tmpReq.Prty == 1) What = '1';
00392 else What = '0';
00393 n = 1;
00394 if (bln) *Buff = What;
00395 break;
00396
00397 case XrdFrmRequest::getQWT:
00398 case XrdFrmRequest::getTOD:
00399 tval = tmpReq.addTOD;
00400 if (ITList[i] == XrdFrmRequest::getQWT) tval = time(0)-tval;
00401 if ((n = sprintf(tbuf, "%lld", tval)) >= 0)
00402 strlcpy(Buff, tbuf, bln);
00403 break;
00404
00405 case XrdFrmRequest::getRID:
00406 n = strlen(tmpReq.ID);
00407 strlcpy(Buff, tmpReq.ID, bln);
00408 break;
00409
00410 case XrdFrmRequest::getUSER:
00411 n = strlen(tmpReq.User);
00412 strlcpy(Buff, tmpReq.User, bln);
00413 break;
00414
00415 default: n = 0; break;
00416 }
00417 if (bln > 0) {bln -= n; Buff += n;}
00418 if (bln > 0) {*Buff++ = ' '; bln--;}
00419 }
00420 *Buff = '\0';
00421 }
00422
00423
00424
00425
00426
00427 void XrdFrmReqFile::FailAdd(char *lfn, int unlk)
00428 {
00429 Say.Emsg("Add", lfn, "not added to prestage queue.");
00430 if (unlk) FileLock(lkNone);
00431 }
00432
00433
00434
00435
00436
00437 void XrdFrmReqFile::FailCan(char *rid, int unlk)
00438 {
00439 Say.Emsg("Can", rid, "request not removed from prestage queue.");
00440 if (unlk) FileLock(lkNone);
00441 }
00442
00443
00444
00445
00446
00447 void XrdFrmReqFile::FailDel(char *lfn, int unlk)
00448 {
00449 Say.Emsg("Del", lfn, "not removed from prestage queue.");
00450 if (unlk) FileLock(lkNone);
00451 }
00452
00453
00454
00455
00456
00457 int XrdFrmReqFile::FailIni(const char *txt)
00458 {
00459 Say.Emsg("Init", errno, txt, reqFN);
00460 FileLock(lkNone);
00461 return 0;
00462 }
00463
00464
00465
00466
00467
00468 int XrdFrmReqFile::FileLock(LockType lktype)
00469 {
00470 FLOCK_t lock_args;
00471 const char *What;
00472 int rc;
00473
00474
00475
00476 bzero(&lock_args, sizeof(lock_args));
00477 if (lktype == lkNone)
00478 {lock_args.l_type = F_UNLCK; What = "unlock";
00479 if (isAgent && reqFD >= 0) {close(reqFD); reqFD = -1;}
00480 }
00481 else {lock_args.l_type = (lktype == lkShare ? F_RDLCK : F_WRLCK);
00482 What = "lock";
00483 }
00484
00485
00486
00487 do {rc = fcntl(lokFD,F_SETLKW,&lock_args);}
00488 while(rc < 0 && errno == EINTR);
00489 if (rc < 0) {Say.Emsg("FileLock", errno, What , lokFN); return 0;}
00490
00491
00492
00493 if (lktype == lkExcl || lktype == lkShare)
00494 {if (reqFD < 0 && (reqFD = open(reqFN, O_RDWR)) < 0)
00495 {Say.Emsg("FileLock",errno,"open",reqFN);
00496 FileLock(lkNone);
00497 return 0;
00498 }
00499 fcntl(reqFD, F_SETFD, FD_CLOEXEC);
00500 do {rc = pread(reqFD, (void *)&HdrData, sizeof(HdrData), 0);}
00501 while(rc < 0 && errno == EINTR);
00502 if (rc < 0) {Say.Emsg("reqRead",errno,"refresh hdr from", reqFN);
00503 FileLock(lkNone); return 0;
00504 }
00505 }
00506
00507
00508
00509 return 1;
00510 }
00511
00512
00513
00514
00515
00516 int XrdFrmReqFile::reqRead(void *Buff, int Offs)
00517 {
00518 int rc;
00519
00520 do {rc = pread(reqFD, Buff, ReqSize, Offs);} while(rc < 0 && errno == EINTR);
00521 if (rc < 0) {Say.Emsg("reqRead",errno,"read",reqFN); return 0;}
00522 return 1;
00523 }
00524
00525
00526
00527
00528
00529 int XrdFrmReqFile::reqWrite(void *Buff, int Offs, int updthdr)
00530 {
00531 int rc = 0;
00532
00533 if (Buff && Offs) do {rc = pwrite(reqFD, Buff, ReqSize, Offs);}
00534 while(rc < 0 && errno == EINTR);
00535 if (rc >= 0 && updthdr){do {rc = pwrite(reqFD,&HdrData, sizeof(HdrData), 0);}
00536 while(rc < 0 && errno == EINTR);
00537 if (rc >= 0) rc = fsync(reqFD);
00538 }
00539 if (rc < 0) {Say.Emsg("reqWrite",errno,"write", reqFN); return 0;}
00540 return 1;
00541 }
00542
00543
00544
00545
00546
00547 int XrdFrmReqFile::ReWrite(XrdFrmReqFile::recEnt *rP)
00548 {
00549 static const int Mode = S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH;
00550 char newFN[MAXPATHLEN], *oldFN;
00551 int newFD, oldFD, Offs = ReqSize, aOK = 1;
00552
00553
00554
00555 strcpy(newFN, reqFN); strcat(newFN, ".new");
00556 if ((newFD = open(newFN, O_RDWR|O_CREAT|O_TRUNC, Mode)) < 0)
00557 {Say.Emsg("ReWrite",errno,"open",newFN); FileLock(lkNone); return 0;}
00558 fcntl(newFD, F_SETFD, FD_CLOEXEC);
00559
00560
00561
00562 oldFD = reqFD; reqFD = newFD;
00563 oldFN = reqFN; reqFN = newFN;
00564
00565
00566
00567 if (rP)
00568 {HdrData.First = Offs;
00569 while(rP)
00570 {rP->reqData.This = Offs;
00571 rP->reqData.Next = (rP->Next ? Offs+ReqSize : 0);
00572 if (!reqWrite((void *)&rP->reqData, Offs, 0)) {aOK = 0; break;}
00573 Offs += ReqSize;
00574 rP = rP->Next;
00575 }
00576 HdrData.Last = Offs - ReqSize;
00577 } else {
00578 HdrData.First = HdrData.Last = 0;
00579 if (ftruncate(newFD, ReqSize) < 0)
00580 {Say.Emsg("ReWrite",errno,"trunc",newFN); aOK = 0;}
00581 }
00582
00583
00584
00585 HdrData.Free = 0;
00586 if (aOK && !(aOK = reqWrite(0, 0)))
00587 Say.Emsg("ReWrite",errno,"write header",newFN);
00588
00589
00590
00591 if (aOK && rename(newFN, oldFN) < 0)
00592 {Say.Emsg("ReWrite",errno,"rename",newFN); aOK = 0;}
00593
00594
00595
00596 if (aOK) close(oldFD);
00597 else {close(newFD); reqFD = oldFD;}
00598 reqFN = oldFN;
00599 return aOK;
00600 }