00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013 const char *XrdFrmXfrQueueCVSID = "$Id: XrdFrmXfrQueue.cc 34000 2010-06-21 06:49:56Z ganis $";
00014
00015 #include <string.h>
00016 #include <strings.h>
00017 #include <stdio.h>
00018 #include <fcntl.h>
00019 #include <unistd.h>
00020 #include <utime.h>
00021 #include <sys/param.h>
00022 #include <sys/types.h>
00023 #include <sys/stat.h>
00024
00025 #include "XrdFrm/XrdFrmConfig.hh"
00026 #include "XrdFrm/XrdFrmReqFile.hh"
00027 #include "XrdFrm/XrdFrmTrace.hh"
00028 #include "XrdFrm/XrdFrmXfrJob.hh"
00029 #include "XrdFrm/XrdFrmXfrQueue.hh"
00030 #include "XrdNet/XrdNetMsg.hh"
00031 #include "XrdOuc/XrdOucTList.hh"
00032 #include "XrdSys/XrdSysError.hh"
00033 #include "XrdSys/XrdSysTimer.hh"
00034 #include "XrdSys/XrdSysPlatform.hh"
00035
00036 using namespace XrdFrm;
00037
00038
00039
00040
00041
00042 XrdSysMutex XrdFrmXfrQueue::hMutex;
00043 XrdOucHash<XrdFrmXfrJob> XrdFrmXfrQueue::hTab;
00044
00045 XrdSysMutex XrdFrmXfrQueue::qMutex;
00046 XrdSysSemaphore XrdFrmXfrQueue::qReady(0);
00047
00048 XrdFrmXfrQueue::theQueue XrdFrmXfrQueue::xfrQ[XrdFrmRequest::numQ];
00049
00050
00051
00052
00053
00054 int XrdFrmXfrQueue::Add(XrdFrmRequest *rP, XrdFrmReqFile *reqFQ, int qNum)
00055 {
00056 XrdFrmXfrJob *xP;
00057 struct stat buf;
00058 const char *xfrType = xfrName(*rP, qNum);
00059 char *Lfn, lclpath[MAXPATHLEN];
00060 int Outgoing = (qNum & XrdFrmRequest::outQ);
00061
00062
00063
00064 if (qNum < 0 || qNum >= XrdFrmRequest::numQ-1)
00065 {sprintf(lclpath, "%d", qNum);
00066 Say.Emsg("Queue", lclpath, " is an invalid queue; skipping", rP->LFN);
00067 if (reqFQ) reqFQ->Del(rP);
00068 return 0;
00069 }
00070
00071
00072
00073
00074
00075 Lfn = (Outgoing ? rP->LFN : (rP->LFN)+rP->LFO);
00076 hMutex.Lock();
00077 if ((xP = hTab.Find(Lfn)))
00078 {if (rP->Options & (XrdFrmRequest::msgSucc | XrdFrmRequest::msgFail)
00079 && strcmp(xP->reqData.Notify, rP->Notify))
00080 {XrdOucTList *tP = new XrdOucTList(rP->Notify, 0, xP->NoteList);
00081 xP->NoteList = tP;
00082 }
00083 hMutex.UnLock();
00084 if (Config.Verbose || Trace.What & TRACE_Debug)
00085 {sprintf(lclpath, " in progress; %s skipped for ", xfrType);
00086 Say.Say(0, xP->Type, xP->reqData.LFN, lclpath, rP->User);
00087 }
00088 if (reqFQ) reqFQ->Del(rP);
00089 return 0;
00090 }
00091 hMutex.UnLock();
00092
00093
00094
00095 if (!Config.LocalPath((rP->LFN)+rP->LFO, lclpath, sizeof(lclpath)-16))
00096 {if (reqFQ) reqFQ->Del(rP);
00097 return Notify(rP, qNum, 1, "Unable to generate pfn");
00098 }
00099
00100
00101
00102
00103 if (stat(lclpath, &buf))
00104 {if (Outgoing)
00105 {if (Config.Verbose || Trace.What & TRACE_Debug)
00106 Say.Say(0, xfrType,"skipped; ",lclpath," does not exist.");
00107 if (reqFQ) reqFQ->Del(rP);
00108 return Notify(rP, qNum, 2, "file not found");
00109 }
00110 } else {
00111 if (!Outgoing)
00112 {if (Config.Verbose || Trace.What & TRACE_Debug)
00113 Say.Say(0, xfrType, "skipped; ", lclpath, " exists.");
00114 if (reqFQ) reqFQ->Del(rP);
00115 return Notify(rP, qNum, 0);
00116 }
00117 }
00118
00119
00120
00121 do {qMutex.Lock();
00122 if ((xP = xfrQ[qNum].Free)) break;
00123 qMutex.UnLock();
00124 xfrQ[qNum].Avail.Wait();
00125 } while(!xP);
00126 xfrQ[qNum].Free = xP->Next;
00127 qMutex.UnLock();
00128
00129
00130
00131 xP->Next = 0;
00132 xP->NoteList = 0;
00133 xP->reqFQ = reqFQ;
00134 xP->reqData = *rP;
00135 xP->reqFile = (Outgoing ? xP->reqData.LFN : (xP->reqData.LFN)+rP->LFO);
00136 strcpy(xP->PFN, lclpath);
00137 xP->pfnEnd = strlen(lclpath);
00138 xP->RetCode = 0;
00139 xP->qNum = qNum;
00140 xP->Type = xfrType;
00141
00142
00143
00144 hMutex.Lock();
00145 hTab.Add(xP->reqFile, xP, 0, Hash_keep);
00146 hMutex.UnLock();
00147
00148
00149
00150 qMutex.Lock();
00151 if (xfrQ[qNum].Last) {xfrQ[qNum].Last->Next = xP; xfrQ[qNum].Last = xP;}
00152 else xfrQ[qNum].Last = xfrQ[qNum].First = xP;
00153 qMutex.UnLock();
00154 qReady.Post();
00155
00156
00157
00158 return 1;
00159 }
00160
00161
00162
00163
00164
00165 void XrdFrmXfrQueue::Done(XrdFrmXfrJob *xP, const char *Msg)
00166 {
00167 XrdOucTList *tP;
00168
00169
00170
00171 do {Notify(&(xP->reqData), xP->qNum, xP->RetCode, Msg);
00172 if ((tP = xP->NoteList))
00173 {strcpy(xP->reqData.Notify, tP->text);
00174 xP->NoteList = tP->next;
00175 delete tP;
00176 }
00177 } while(tP);
00178
00179
00180
00181 if (xP->reqFQ) xP->reqFQ->Del(&(xP->reqData));
00182
00183
00184
00185 hMutex.Lock(); hTab.Del(xP->reqFile); hMutex.UnLock();
00186
00187
00188
00189 qMutex.Lock();
00190 xP->Next = xfrQ[xP->qNum].Free;
00191 xfrQ[xP->qNum].Free = xP;
00192 xfrQ[xP->qNum].Avail.Post();
00193 qMutex.UnLock();
00194 }
00195
00196
00197
00198
00199
00200 XrdFrmXfrJob *XrdFrmXfrQueue::Get()
00201 {
00202 XrdFrmXfrJob *xfrP;
00203
00204
00205
00206 do {qReady.Wait();} while(!(xfrP = Pull()));
00207 return xfrP;
00208 }
00209
00210
00211
00212
00213
00214 void *InitStop(void *parg)
00215 { XrdFrmXfrQueue::StopMon(parg);
00216 return (void *)0;
00217 }
00218
00219 int XrdFrmXfrQueue::Init()
00220 {
00221 static const char *StopFN[] = {"STAGE", "MIGR", "COPYIN", "COPYOUT"};
00222 static const char *StopQN[] = {"stage", "migr", "copyin", "copyout"};
00223 XrdFrmXfrJob *xP;
00224 pthread_t tid;
00225 char StopFile[1024], *fnSfx;
00226 int n, qNum, retc;
00227
00228
00229
00230 strcpy(StopFile, Config.AdminPath);
00231 strcat(StopFile, "STOP");
00232 fnSfx = StopFile + strlen(StopFile);
00233
00234
00235
00236 for (qNum= 0; qNum < XrdFrmRequest::numQ-1; qNum++)
00237 {
00238
00239
00240
00241 strcpy(fnSfx, StopFN[qNum]);
00242 xfrQ[qNum].File = strdup(StopFile);
00243 xfrQ[qNum].Name = StopQN[qNum];
00244 xfrQ[qNum].qNum = qNum;
00245
00246
00247
00248 if ((retc = XrdSysThread::Run(&tid, InitStop, (void *)&xfrQ[qNum],
00249 XRDSYSTHREAD_BIND, "Stopfile monitor")))
00250 {Say.Emsg("main", retc, "create stopfile thread"); return 0;}
00251
00252
00253
00254
00255
00256 n = Config.xfrMax*2;
00257 while(n--)
00258 {xP = new XrdFrmXfrJob;
00259 xP->Next = xfrQ[qNum].Free;
00260 xfrQ[qNum].Free = xP;
00261 xfrQ[qNum].Avail.Post();
00262 }
00263 }
00264
00265
00266
00267 return 1;
00268 }
00269
00270
00271
00272
00273
00274 XrdFrmXfrJob *XrdFrmXfrQueue::Pull()
00275 {
00276 static int ioX = 0, prevQ[2] = {0,0};
00277 XrdFrmXfrJob *xfrP;
00278 int pikQ, theQ, Q1, Q2, nSel = 1;
00279
00280
00281
00282 qMutex.Lock();
00283 do{ioX = (ioX + 1) & 1;
00284 if (ioX) {Q1 = XrdFrmRequest::migQ; Q2 = XrdFrmRequest::putQ; pikQ = 1;}
00285 else {Q1 = XrdFrmRequest::stgQ; Q2 = XrdFrmRequest::getQ; pikQ = 0;}
00286
00287
00288
00289 if (xfrQ[Q1].Stop || Stopped(Q1)) Q1 = XrdFrmRequest::nilQ;
00290 if (xfrQ[Q2].Stop || Stopped(Q2)) Q2 = XrdFrmRequest::nilQ;
00291
00292
00293
00294 if (xfrQ[Q1].First && xfrQ[Q2].First)
00295 { if (xfrQ[Q1].First->reqData.addTOD < xfrQ[Q2].First->reqData.addTOD)
00296 theQ = Q1;
00297 else if (xfrQ[Q1].First->reqData.addTOD > xfrQ[Q2].First->reqData.addTOD)
00298 theQ = Q2;
00299 else theQ = (prevQ[pikQ] == Q1 ? Q2 : Q1);
00300 }else theQ = (xfrQ[Q1].First ? Q1 : Q2);
00301
00302
00303
00304 if ((xfrP = xfrQ[theQ].First)
00305 && !(xfrQ[theQ].First = xfrP->Next)) xfrQ[theQ].Last = 0;
00306 } while(!xfrP && nSel--);
00307
00308
00309
00310 prevQ[pikQ] = theQ;
00311 qMutex.UnLock();
00312 return xfrP;
00313 }
00314
00315
00316
00317
00318
00319 int XrdFrmXfrQueue::Notify(XrdFrmRequest *rP, int qNum, int rc, const char *msg)
00320 {
00321 static const char *isFile = "file:///";
00322 static const int lnFile = 8;
00323 static const char *isUDP = "udp://";
00324 static const int lnUDP = 6;
00325 static const char *qOpr[] = {"stage", "migr", "get", "put"};
00326 char msgbuff[4096], *nP, *mP = rP->Notify;
00327 int n;
00328
00329
00330
00331 if ((!rc && !(rP->Options & XrdFrmRequest::msgSucc))
00332 || ( rc && !(rP->Options & XrdFrmRequest::msgFail))) return 0;
00333
00334
00335
00336
00337 do{if ((nP = index(rP->Notify, '\r'))) *nP++ = '\0';
00338
00339
00340
00341 if (!strncmp(mP, isFile, lnFile))
00342 {if (rc) n = sprintf(msgbuff, "%s %s %s %s\n", qOpr[qNum],
00343 (rc > 1 ? "ENOENT":"BAD"), rP->LFN, (msg ? msg:"?"));
00344 else n = sprintf(msgbuff, "stage OK %s\n", rP->LFN);
00345 Send2File(mP+lnFile, msgbuff, n);
00346 }
00347
00348
00349
00350 else if (!strncmp(mP, isUDP, lnUDP))
00351 {char *txtP, *dstP = mP+lnUDP;
00352 if ((txtP = index(dstP, '/'))) *txtP++ = '\0';
00353 else txtP = (char *)"";
00354 n = sprintf(msgbuff, "%s %s %s %s", (rc ? "unprep" : "ready"),
00355 rP->ID, txtP, rP->LFN);
00356 Send2UDP(dstP, msgbuff, n);
00357 }
00358
00359
00360
00361 else if (*mP != '-')
00362 Say.Emsg("Notify", "Unsupported notification path '", mP, "'.");
00363 } while((mP = nP));
00364
00365
00366
00367 return 0;
00368 }
00369
00370
00371
00372
00373
00374 void XrdFrmXfrQueue::Send2File(char *Dest, char *Msg, int Mln)
00375 {
00376 EPNAME("Notify");
00377 int FD;
00378
00379
00380
00381 DEBUG("sending '" <<Msg <<"' via " <<Dest);
00382
00383
00384
00385 if ((FD = open(Dest, O_WRONLY)) < 0)
00386 {Say.Emsg("Notify", errno, "send notification via", Dest); return;}
00387 fcntl(FD, F_SETFD, FD_CLOEXEC);
00388
00389
00390
00391 if (write(FD, Msg, Mln) < 0)
00392 Say.Emsg("Notify", errno, "send notification via", Dest);
00393 close(FD);
00394 }
00395
00396
00397
00398
00399
00400 void XrdFrmXfrQueue::Send2UDP(char *Dest, char *Msg, int Mln)
00401 {
00402 EPNAME("Notify");
00403 static XrdNetMsg Relay(&Say, 0);
00404
00405
00406
00407 DEBUG("sending '" <<Msg <<"' via " <<Dest);
00408
00409
00410
00411 Relay.Send(Msg, Mln, Dest);
00412 }
00413
00414
00415
00416
00417
00418 void XrdFrmXfrQueue::StopMon(void *parg)
00419 {
00420 struct theQueue *monQ = (struct theQueue *)parg;
00421 XrdFrmXfrJob *xP;
00422 struct stat buf;
00423 char theMsg[80];
00424 int Cnt;
00425
00426
00427
00428 sprintf(theMsg, "exists; %s transfers suspended.", monQ->Name);
00429
00430
00431
00432 while(1)
00433 {monQ->Alert.Wait();
00434 Cnt = 0;
00435 while(!stat(monQ->File, &buf))
00436 {if (!Cnt--) {Say.Emsg("StopMon", monQ->File, theMsg); Cnt = 12;}
00437 XrdSysTimer::Snooze(5);
00438 }
00439 qMutex.Lock();
00440 monQ->Stop = 0;
00441 xP = monQ->First;
00442 while(xP) {qReady.Post(); xP = xP->Next;}
00443 qMutex.UnLock();
00444 }
00445 }
00446
00447
00448
00449
00450
00451 int XrdFrmXfrQueue::Stopped(int qNum)
00452 {
00453 struct stat buf;
00454
00455
00456
00457
00458 if (stat(xfrQ[qNum].File, &buf)) return 0;
00459 if (!xfrQ[qNum].Stop) {xfrQ[qNum].Stop = 1; xfrQ[qNum].Alert.Post();}
00460 return 1;
00461 }
00462
00463
00464
00465
00466
00467 const char *XrdFrmXfrQueue::xfrName(XrdFrmRequest &reqData, int qNum)
00468 {
00469
00470
00471
00472
00473
00474
00475
00476
00477
00478 switch(qNum)
00479 {case XrdFrmRequest::getQ:
00480 return "CopyIn ";
00481 break;
00482 case XrdFrmRequest::migQ:
00483 return (reqData.Options & XrdFrmRequest::Purge ?
00484 "Migr+rm ":"Migrate ");
00485 break;
00486 case XrdFrmRequest::putQ:
00487 return (reqData.Options&XrdFrmRequest::Purge ?
00488 "Copy+rm " : "CopyOut ");
00489 break;
00490 case XrdFrmRequest::stgQ:
00491 return "Staging ";
00492 break;
00493 default: break;
00494 }
00495
00496 return "Unknown ";
00497 }