00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015 const char *XrdCmsPrepareCVSID = "$Id: XrdCmsPrepare.cc 34990 2010-08-25 10:31:23Z ganis $";
00016
00017 #include <fcntl.h>
00018 #include <stdlib.h>
00019 #include <unistd.h>
00020 #include <sys/types.h>
00021 #include <sys/stat.h>
00022
00023 #include "XrdCms/XrdCmsConfig.hh"
00024 #include "XrdCms/XrdCmsPrepare.hh"
00025 #include "XrdCms/XrdCmsTrace.hh"
00026 #include "XrdFrm/XrdFrmProxy.hh"
00027 #include "XrdNet/XrdNetMsg.hh"
00028 #include "XrdOss/XrdOss.hh"
00029 #include "XrdOuc/XrdOucEnv.hh"
00030 #include "XrdOuc/XrdOucMsubs.hh"
00031 #include "XrdOuc/XrdOucTList.hh"
00032 #include "XrdSys/XrdSysError.hh"
00033
00034 using namespace XrdCms;
00035
00036
00037
00038
00039
00040 XrdCmsPrepare XrdCms::PrepQ;
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050 int XrdCmsScrubScan(const char *key, char *cip, void *xargp)
00051 {
00052 struct stat buf;
00053
00054
00055
00056 return (Config.ossFS->Stat(key, &buf, XRDOSS_resonly) ? 0 : -1);
00057 }
00058
00059
00060
00061
00062
00063 XrdCmsPrepare::XrdCmsPrepare() : XrdJob("Prep cache scrubber"),
00064 prepSched(&Say)
00065 {prepif = 0;
00066 preppid = 0;
00067 resetcnt = scrub2rst = 3;
00068 scrubtime= 20*60;
00069 NumFiles = 0;
00070 lastemsg = time(0);
00071 Relay = new XrdNetMsg(&Say);
00072 PrepFrm = 0;
00073 prepOK = 0;
00074 }
00075
00076
00077
00078
00079
00080 int XrdCmsPrepare::Add(XrdCmsPrepArgs &pargs)
00081 {
00082 char *pdata[XrdOucMsubs::maxElem+2], prtybuff[8], *pP=prtybuff;
00083 int rc, pdlen[XrdOucMsubs::maxElem + 2];
00084
00085
00086
00087 if (PrepFrm)
00088 {rc = PrepFrm->Add('+',pargs.path, pargs.opaque,pargs.Ident,pargs.reqid,
00089 pargs.notify,pargs.mode,atoi(pargs.prty));
00090 if (rc) Say.Emsg("Add", rc, "prepare", pargs.path);
00091 else {PTMutex.Lock();
00092 if (!PTable.Add(pargs.path, 0, 0, Hash_data_is_key)) NumFiles++;
00093 PTMutex.UnLock();
00094 }
00095 return rc == 0;
00096 }
00097
00098
00099
00100 PTMutex.Lock();
00101 if (!prepif || !prepSched.isAlive())
00102 {Say.Emsg("Add","No prepare manager; prepare",pargs.reqid,"ignored.");
00103 PTMutex.UnLock();
00104 return 0;
00105 }
00106
00107
00108
00109 if (!prepMsg)
00110 {*pP++ = pargs.prty[0]; *pP = '\0';
00111 pdata[0] = (char *)"+ "; pdlen[0] = 2;
00112 pdata[1] = pargs.reqid; pdlen[1] = strlen(pargs.reqid);
00113 pdata[2] = (char *)" "; pdlen[2] = 1;
00114 pdata[3] = pargs.notify; pdlen[3] = strlen(pargs.notify);
00115 pdata[4] = (char *)" "; pdlen[4] = 1;
00116 pdata[5] = prtybuff; pdlen[5] = strlen(prtybuff);
00117 pdata[6] = (char *)" "; pdlen[6] = 1;
00118 pdata[7] = pargs.mode; pdlen[7] = strlen(pargs.mode);
00119 pdata[8] = (char *)" "; pdlen[8] = 1;
00120 pdata[9] = pargs.path; pdlen[9] = strlen(pargs.path);
00121 pdata[10] = (char *)"\n"; pdlen[10] = 1;
00122 pdata[11]= 0; pdlen[11]= 0;
00123 if (!(rc = prepSched.Put((const char **)pdata, (const int *)pdlen)))
00124 if (!PTable.Add(pargs.path, 0, 0, Hash_data_is_key)) NumFiles++;
00125 } else {
00126 int Oflag = (index(pargs.mode, (int)'w') ? O_RDWR : 0);
00127 mode_t Prty = atoi(pargs.prty);
00128 XrdOucEnv Env(pargs.opaque);
00129 XrdOucMsubsInfo Info(pargs.Ident, &Env, N2N, pargs.path,
00130 pargs.notify, Prty, Oflag, pargs.mode, pargs.reqid);
00131 int k = prepMsg->Subs(Info, pdata, pdlen);
00132 pdata[k] = (char *)"\n"; pdlen[k++] = 1;
00133 pdata[k] = 0; pdlen[k] = 0;
00134 if (!(rc = prepSched.Put((const char **)pdata, (const int *)pdlen)))
00135 if (!PTable.Add(pargs.path, 0, 0, Hash_data_is_key)) NumFiles++;
00136 }
00137
00138
00139
00140 PTMutex.UnLock();
00141 return rc == 0;
00142 }
00143
00144
00145
00146
00147
00148 int XrdCmsPrepare::Del(char *reqid)
00149 {
00150 char *pdata[4];
00151 int rc, pdlen[4];
00152
00153
00154
00155 if (PrepFrm)
00156 {if ((rc = PrepFrm->Del('-', reqid)))
00157 Say.Emsg("Del", rc, "unprepare", reqid);
00158 return rc == 0;
00159 }
00160
00161
00162
00163 PTMutex.Lock();
00164 if (!prepif || !prepSched.isAlive())
00165 {Say.Emsg("Del","No prepare manager; unprepare",reqid,"ignored.");
00166 PTMutex.UnLock();
00167 return 0;
00168 }
00169
00170
00171
00172 pdata[0] = (char *)"- ";
00173 pdlen[0] = 2;
00174 pdata[1] = reqid;
00175 pdlen[1] = strlen(reqid);
00176 pdata[2] = (char *)"\n";
00177 pdlen[2] = 1;
00178 pdata[3] = (char *)0;
00179 pdlen[3] = 0;
00180 rc = prepSched.Put((const char **)pdata, (const int *)pdlen);
00181 PTMutex.UnLock();
00182 return rc == 0;
00183 }
00184
00185
00186
00187
00188
00189 void XrdCmsPrepare::DoIt()
00190 {
00191
00192
00193 Scrub();
00194 Sched->Schedule((XrdJob *)this,scrubtime+time(0));
00195 }
00196
00197
00198
00199
00200
00201 int XrdCmsPrepare::Exists(char *path)
00202 {
00203 int Found;
00204
00205
00206
00207 PTMutex.Lock();
00208
00209
00210
00211 Found = (NumFiles ? PTable.Find(path) != 0 : 0);
00212
00213
00214
00215 PTMutex.UnLock();
00216 return Found;
00217 }
00218
00219
00220
00221
00222
00223 void XrdCmsPrepare::Gone(char *path)
00224 {
00225
00226
00227
00228 PTMutex.Lock();
00229
00230
00231
00232 if (NumFiles > 0 && PTable.Del(path) == 0) NumFiles--;
00233
00234
00235
00236 PTMutex.UnLock();
00237 }
00238
00239
00240
00241
00242
00243 void XrdCmsPrepare::Inform(const char *cmd, XrdCmsPrepArgs *pargs)
00244 {
00245 EPNAME("Inform")
00246 struct iovec Msg[8];
00247 char *mdest, *minfo;
00248
00249
00250
00251 if (!index(pargs->mode, (int)'n')
00252 || strncmp("udp://", pargs->notify, 6)
00253 || !Relay)
00254 {DEBUG(pargs->Ident <<' ' <<cmd <<' ' <<pargs->reqid <<" not sent to "
00255 <<pargs->notify);
00256 return;
00257 }
00258
00259
00260
00261 mdest = pargs->notify+6;
00262 if ((minfo = index(mdest, (int)'/')))
00263 {*minfo = '\0'; minfo++;}
00264 if (!minfo || !*minfo) minfo = (char *)"*";
00265 DEBUG("Sending " <<mdest <<": " <<cmd <<' '<<pargs->reqid <<' ' <<minfo);
00266
00267
00268
00269 Msg[0].iov_base = (char *)cmd; Msg[0].iov_len = strlen(cmd);
00270 Msg[1].iov_base = (char *)" "; Msg[1].iov_len = 1;
00271 Msg[2].iov_base = pargs->reqid; Msg[2].iov_len = strlen(pargs->reqid);
00272 Msg[3].iov_base = (char *)" "; Msg[3].iov_len = 1;
00273 Msg[4].iov_base = minfo; Msg[4].iov_len = strlen(minfo);
00274 Msg[5].iov_base = (char *)" "; Msg[5].iov_len = 1;
00275 Msg[6].iov_base = pargs->path; Msg[6].iov_len = (pargs->pathlen)-1;
00276 Msg[7].iov_base = (char *)"\n"; Msg[7].iov_len = 1;
00277
00278
00279
00280 Relay->Send(Msg, 8, mdest);
00281 }
00282
00283
00284
00285
00286
00287 void XrdCmsPrepare::Prepare(XrdCmsPrepArgs *pargs)
00288 {
00289 EPNAME("Prepare");
00290 int rc;
00291
00292
00293
00294 if (!(rc = isOnline(pargs->path)))
00295 {DEBUG("Preparing " <<pargs->reqid <<' ' <<pargs->notify <<' '
00296 <<pargs->prty <<' ' <<pargs->mode <<' ' <<pargs->path);
00297 if (!Config.DiskSS) Say.Emsg("Prepare","staging disallowed; ignoring prep",
00298 pargs->Ident, pargs->reqid);
00299 else Add(*pargs);
00300 return;
00301 }
00302
00303
00304
00305 if (rc > 0) Inform("avail", pargs);
00306 }
00307
00308
00309
00310
00311
00312 void XrdCmsPrepare::Reset(const char *iName, const char *aPath, int aMode)
00313 {
00314 EPNAME("Reset");
00315 char baseAP[1024], *Slash;
00316
00317
00318
00319
00320 if (!prepif) return;
00321
00322
00323
00324
00325
00326 if (!*prepif)
00327 {PrepFrm = new XrdFrmProxy(Say.logger(), iName);
00328 DEBUG("Initializing internal FRM prepare interface.");
00329 strcpy(baseAP, aPath); baseAP[strlen(baseAP)-1] = '\0';
00330 if ((Slash = rindex(baseAP, '/'))) *Slash = '\0';
00331 if (!(prepOK = PrepFrm->Init(XrdFrmProxy::opStg, baseAP, aMode)))
00332 {Say.Emsg("Reset", "Built-in prepare init failed; prepare disabled.");
00333 return;
00334 }
00335 }
00336
00337
00338
00339 Reset();
00340 if (scrubtime) Sched->Schedule((XrdJob *)this,scrubtime+time(0));
00341
00342 }
00343
00344
00345
00346
00347
00348 int XrdCmsPrepare::setParms(int rcnt, int stime, int deco)
00349 {if (rcnt > 0) resetcnt = scrub2rst = rcnt;
00350 if (stime > 0) scrubtime = stime;
00351 doEcho = deco;
00352 return 0;
00353 }
00354
00355 int XrdCmsPrepare::setParms(const char *ifpgm, char *ifmsg)
00356 {if (ifpgm)
00357 {const char *Slash = rindex(ifpgm, '/');
00358 if (prepif) free(prepif);
00359 if (Slash && !strcmp(Slash+1, "frm_xfragent")) ifpgm = "";
00360 prepif = strdup(ifpgm);
00361 }
00362 if (ifmsg)
00363 {if (prepMsg) delete prepMsg;
00364 prepMsg = new XrdOucMsubs(&Say);
00365 if (!(prepMsg->Parse("prepmsg", ifmsg)))
00366 {delete prepMsg; prepMsg = 0; return 1;}
00367 }
00368 return 0;
00369 }
00370
00371
00372
00373
00374
00375
00376
00377
00378 int XrdCmsPrepare::isOnline(char *path)
00379 {
00380 static const int Sopts = XRDOSS_resonly | XRDOSS_updtatm;
00381 struct stat buf;
00382
00383
00384
00385
00386 if (Config.ossFS->Stat(path, &buf, Sopts))
00387 {if (Config.DiskSS && Exists(path)) return -1;
00388 else return 0;
00389 }
00390 return 1;
00391 }
00392
00393
00394
00395
00396
00397 void XrdCmsPrepare::Reset()
00398 {
00399 char *lp, *pdata[] = {(char *)"?\n", 0};
00400 int pdlen[] = {2, 0};
00401
00402
00403
00404 if (PrepFrm)
00405 {XrdFrmProxy::Queues State(XrdFrmProxy::opStg);
00406 char Buff[1024];
00407 if (prepOK)
00408 {PTable.Purge(); NumFiles = 0;
00409 while(PrepFrm->List(State, Buff, sizeof(Buff)))
00410 {PTable.Add(Buff, 0, 0, Hash_data_is_key); NumFiles++;
00411 if (doEcho) Say.Emsg("Reset","Prepare pending for",Buff);
00412 }
00413 }
00414 return;
00415 }
00416
00417
00418
00419 if (!prepif)
00420 {Say.Emsg("Reset", "Prepare program not specified; prepare disabled.");
00421 return;
00422 }
00423
00424
00425
00426 if (!prepSched.isAlive() && !startIF()) return;
00427 if (prepSched.Put((const char **)pdata, (const int *)pdlen))
00428 {Say.Emsg("Prepare", prepSched.LastError(), "write to", prepif);
00429 prepSched.Drain(); prepOK = 0;
00430 }
00431 else {PTable.Purge(); NumFiles = 0;
00432 while((lp = prepSched.GetLine()) && *lp)
00433 {PTable.Add(lp, 0, 0, Hash_data_is_key); NumFiles++;
00434 if (doEcho) Say.Emsg("Reset","Prepare pending for",lp);
00435 }
00436 }
00437 }
00438
00439
00440
00441
00442
00443 void XrdCmsPrepare::Scrub()
00444 {
00445 PTMutex.Lock();
00446 if (scrub2rst <= 0)
00447 {Reset();
00448 scrub2rst = resetcnt;
00449 }
00450 else {PTable.Apply(XrdCmsScrubScan, (void *)0);
00451 scrub2rst--;
00452 }
00453 if (!PrepFrm && !prepSched.isAlive()) startIF();
00454 PTMutex.UnLock();
00455 }
00456
00457
00458
00459
00460
00461 int XrdCmsPrepare::startIF()
00462 {
00463 EPNAME("startIF")
00464
00465
00466
00467 if (PrepFrm) return prepOK;
00468
00469
00470
00471 if (!prepif)
00472 {Say.Emsg("startIF","Prepare program not specified; prepare disabled.");
00473 return (prepOK = 0);
00474 }
00475
00476
00477
00478 DEBUG("Prepare: Starting " <<prepif);
00479 if (prepSched.Exec(prepif, 1))
00480 {time_t eNow = time(0);
00481 prepOK = 0;
00482 if ((eNow - lastemsg) >= 60)
00483 {lastemsg = eNow;
00484 Say.Emsg("Prepare", prepSched.LastError(), "start", prepif);
00485 }
00486 } else prepOK = 1;
00487
00488
00489
00490 return prepOK;
00491 }