XrdFrmXfrQueue.cc

Go to the documentation of this file.
00001 /******************************************************************************/
00002 /*                                                                            */
00003 /*                     X r d F r m X f r Q u e u e . c c                      */
00004 /*                                                                            */
00005 /* (c) 2010 by the Board of Trustees of the Leland Stanford, Jr., University  */
00006 /*                            All Rights Reserved                             */
00007 /*   Produced by Andrew Hanushevsky for Stanford University under contract    */
00008 /*              DE-AC02-76-SFO0515 with the Department of Energy              */
00009 /******************************************************************************/
00010 
00011 //          $Id: XrdFrmXfrQueue.cc 34000 2010-06-21 06:49:56Z ganis $
00012 
00013 const char *XrdFrmXfrQueueCVSID = "$Id: XrdFrmXfrQueue.cc 34000 2010-06-21 06:49:56Z ganis $"; // Revision identification string for this source file
00014 
00015 #include <string.h>
00016 #include <strings.h>
00017 #include <stdio.h>
00018 #include <fcntl.h>
00019 #include <unistd.h>
00020 #include <utime.h>
00021 #include <sys/param.h>
00022 #include <sys/types.h>
00023 #include <sys/stat.h>
00024 
00025 #include "XrdFrm/XrdFrmConfig.hh"
00026 #include "XrdFrm/XrdFrmReqFile.hh"
00027 #include "XrdFrm/XrdFrmTrace.hh"
00028 #include "XrdFrm/XrdFrmXfrJob.hh"
00029 #include "XrdFrm/XrdFrmXfrQueue.hh"
00030 #include "XrdNet/XrdNetMsg.hh"
00031 #include "XrdOuc/XrdOucTList.hh"
00032 #include "XrdSys/XrdSysError.hh"
00033 #include "XrdSys/XrdSysTimer.hh"
00034 #include "XrdSys/XrdSysPlatform.hh"
00035 
00036 using namespace XrdFrm;
00037   
00038 /******************************************************************************/
00039 /*                               S t a t i c s                                */
00040 /******************************************************************************/
00041 
00042 XrdSysMutex               XrdFrmXfrQueue::hMutex;   // Serializes access to hTab (held around Find/Add/Del)
00043 XrdOucHash<XrdFrmXfrJob>  XrdFrmXfrQueue::hTab;     // Active and pending jobs, keyed by the job's reqFile
00044   
00045 XrdSysMutex               XrdFrmXfrQueue::qMutex;   // Serializes access to the per-queue job lists in xfrQ
00046 XrdSysSemaphore           XrdFrmXfrQueue::qReady(0); // Posted when a job is queued or a queue resumes; Get() waits on it
00047 
00048 XrdFrmXfrQueue::theQueue  XrdFrmXfrQueue::xfrQ[XrdFrmRequest::numQ]; // Per-queue state, indexed by queue number
00049 
00050 /******************************************************************************/
00051 /* Public:                           A d d                                    */
00052 /******************************************************************************/
00053   
00054 int XrdFrmXfrQueue::Add(XrdFrmRequest *rP, XrdFrmReqFile *reqFQ, int qNum)
00055 {
00056    XrdFrmXfrJob *xP;
00057    struct stat buf;
00058    const char *xfrType = xfrName(*rP, qNum);
00059    char *Lfn, lclpath[MAXPATHLEN];
00060    int Outgoing = (qNum & XrdFrmRequest::outQ);
00061 
00062 // Validate queue number
00063 //
00064    if (qNum < 0 || qNum >= XrdFrmRequest::numQ-1)
00065       {sprintf(lclpath, "%d", qNum);
00066        Say.Emsg("Queue", lclpath, " is an invalid queue; skipping", rP->LFN);
00067        if (reqFQ) reqFQ->Del(rP);
00068        return 0;
00069       }
00070 
00071 // First check if this request is active or pending. If it's an inbound request
00072 // then only the lfn matters regardless of source. For outgoing requests then
00073 // the lfn plus the target only matters.
00074 //
00075    Lfn = (Outgoing ? rP->LFN : (rP->LFN)+rP->LFO);
00076    hMutex.Lock();
00077    if ((xP = hTab.Find(Lfn)))
00078       {if (rP->Options & (XrdFrmRequest::msgSucc | XrdFrmRequest::msgFail)
00079        &&  strcmp(xP->reqData.Notify, rP->Notify))
00080           {XrdOucTList *tP = new XrdOucTList(rP->Notify, 0, xP->NoteList);
00081            xP->NoteList = tP;
00082           }
00083        hMutex.UnLock();
00084        if (Config.Verbose || Trace.What & TRACE_Debug)
00085           {sprintf(lclpath, " in progress; %s skipped for ", xfrType);
00086            Say.Say(0, xP->Type, xP->reqData.LFN, lclpath, rP->User);
00087           }
00088        if (reqFQ) reqFQ->Del(rP);
00089        return 0;
00090       }
00091    hMutex.UnLock();
00092 
00093 // Obtain the local name
00094 //
00095    if (!Config.LocalPath((rP->LFN)+rP->LFO, lclpath, sizeof(lclpath)-16))
00096       {if (reqFQ) reqFQ->Del(rP);
00097        return Notify(rP, qNum, 1, "Unable to generate pfn");
00098       }
00099 
00100 // Check if the file exists or not. For incomming requests, the file must not
00101 // exist. For outgoing requests the file must exist.
00102 //
00103    if (stat(lclpath, &buf))
00104       {if (Outgoing)
00105           {if (Config.Verbose || Trace.What & TRACE_Debug)
00106               Say.Say(0, xfrType,"skipped; ",lclpath," does not exist.");
00107            if (reqFQ) reqFQ->Del(rP);
00108            return Notify(rP, qNum, 2, "file not found");
00109           }
00110       } else {
00111        if (!Outgoing)
00112           {if (Config.Verbose || Trace.What & TRACE_Debug)
00113               Say.Say(0, xfrType, "skipped; ", lclpath, " exists.");
00114            if (reqFQ) reqFQ->Del(rP);
00115            return Notify(rP, qNum, 0);
00116           }
00117       }
00118 
00119 // Obtain a queue slot, we may block until one is available
00120 //
00121    do {qMutex.Lock();
00122        if ((xP = xfrQ[qNum].Free)) break;
00123        qMutex.UnLock();
00124        xfrQ[qNum].Avail.Wait();
00125       } while(!xP);
00126    xfrQ[qNum].Free = xP->Next;
00127    qMutex.UnLock();
00128 
00129 // Initialize the slot
00130 //
00131    xP->Next     = 0;
00132    xP->NoteList = 0;
00133    xP->reqFQ    = reqFQ;
00134    xP->reqData  = *rP;
00135    xP->reqFile  = (Outgoing ? xP->reqData.LFN : (xP->reqData.LFN)+rP->LFO);
00136    strcpy(xP->PFN, lclpath);
00137    xP->pfnEnd   = strlen(lclpath);
00138    xP->RetCode  = 0;
00139    xP->qNum     = qNum;
00140    xP->Type     = xfrType;
00141 
00142 // Add this to the table of requests
00143 //
00144    hMutex.Lock();
00145    hTab.Add(xP->reqFile, xP, 0, Hash_keep);
00146    hMutex.UnLock();
00147 
00148 // Place request in the appropriate transfer queue
00149 //
00150    qMutex.Lock();
00151    if (xfrQ[qNum].Last) {xfrQ[qNum].Last->Next = xP; xfrQ[qNum].Last = xP;}
00152       else               xfrQ[qNum].Last       = xfrQ[qNum].First    = xP;
00153    qMutex.UnLock();
00154    qReady.Post();
00155 
00156 // All done
00157 //
00158    return 1;
00159 }
00160 
00161 /******************************************************************************/
00162 /* Public:                          D o n e                                   */
00163 /******************************************************************************/
00164 
00165 void XrdFrmXfrQueue::Done(XrdFrmXfrJob *xP, const char *Msg)
00166 {
00167    XrdOucTList *tP;
00168 
00169 // Send notifications to everyone that wants it that this job is done
00170 //
00171    do {Notify(&(xP->reqData), xP->qNum, xP->RetCode, Msg);
00172        if ((tP = xP->NoteList))
00173           {strcpy(xP->reqData.Notify, tP->text);
00174            xP->NoteList = tP->next;
00175            delete tP;
00176           }
00177       } while(tP);
00178 
00179 // Remove this job from the queue file
00180 //
00181    if (xP->reqFQ) xP->reqFQ->Del(&(xP->reqData));
00182 
00183 // Remove this job from the active table
00184 //
00185    hMutex.Lock(); hTab.Del(xP->reqFile); hMutex.UnLock();
00186   
00187 // Place job element on the free queue
00188 //
00189    qMutex.Lock();
00190    xP->Next = xfrQ[xP->qNum].Free;
00191    xfrQ[xP->qNum].Free = xP;
00192    xfrQ[xP->qNum].Avail.Post();
00193    qMutex.UnLock();
00194 }
00195 
00196 /******************************************************************************/
00197 /* Public:                           G e t                                    */
00198 /******************************************************************************/
00199   
00200 XrdFrmXfrJob *XrdFrmXfrQueue::Get()
00201 {
00202    XrdFrmXfrJob *xfrP;
00203 
00204 // Wait for an available job and return it
00205 //
00206    do {qReady.Wait();} while(!(xfrP = Pull()));
00207    return xfrP;
00208 }
00209   
00210 /******************************************************************************/
00211 /*                                  I n i t                                   */
00212 /******************************************************************************/
00213   
00214 void *InitStop(void *parg)
00215 {   XrdFrmXfrQueue::StopMon(parg);
00216     return (void *)0;
00217 }
00218   
00219 int XrdFrmXfrQueue::Init()
00220 {
00221    static const char *StopFN[] = {"STAGE", "MIGR", "COPYIN", "COPYOUT"};
00222    static const char *StopQN[] = {"stage", "migr", "copyin", "copyout"};
00223    XrdFrmXfrJob *xP;
00224    pthread_t tid;
00225    char StopFile[1024], *fnSfx;
00226    int n, qNum, retc;
00227 
00228 // Prepare to initialize the queues
00229 //
00230    strcpy(StopFile, Config.AdminPath);
00231    strcat(StopFile, "STOP");
00232    fnSfx = StopFile + strlen(StopFile);
00233 
00234 // Initialize each queue
00235 //
00236    for (qNum= 0; qNum < XrdFrmRequest::numQ-1; qNum++)
00237       {
00238 
00239    // Initialize the stop file name and set the queue name and number
00240    //
00241         strcpy(fnSfx, StopFN[qNum]);
00242         xfrQ[qNum].File = strdup(StopFile);
00243         xfrQ[qNum].Name = StopQN[qNum];
00244         xfrQ[qNum].qNum = qNum;
00245 
00246    // Start the stop file monitor thread for this queue
00247    //
00248         if ((retc = XrdSysThread::Run(&tid, InitStop, (void *)&xfrQ[qNum],
00249                                       XRDSYSTHREAD_BIND, "Stopfile monitor")))
00250            {Say.Emsg("main", retc, "create stopfile thread"); return 0;}
00251 
00252    // Create twice as many free queue elements as we have xfr agents for the
00253    // queue. This prevents stalls when a particular queue is stopped but keeps
00254    // us from exceeding internal resources when we get flooded with requests.
00255    //
00256         n = Config.xfrMax*2;
00257         while(n--)
00258              {xP = new XrdFrmXfrJob;
00259               xP->Next = xfrQ[qNum].Free;
00260               xfrQ[qNum].Free = xP;
00261               xfrQ[qNum].Avail.Post();
00262              }
00263        }
00264 
00265 // All done
00266 //
00267    return 1;
00268 }
00269 
00270 /******************************************************************************/
00271 /* Private:                         P u l l                                   */
00272 /******************************************************************************/
00273   
00274 XrdFrmXfrJob *XrdFrmXfrQueue::Pull()
00275 {
00276    static int ioX = 0, prevQ[2] = {0,0};
00277    XrdFrmXfrJob *xfrP;
00278    int pikQ, theQ, Q1, Q2, nSel = 1;
00279 
00280 // Setup to pick a request equally multiplexing between all possible queues
00281 //
00282    qMutex.Lock();
00283 do{ioX = (ioX + 1) & 1;
00284    if (ioX) {Q1 = XrdFrmRequest::migQ; Q2 = XrdFrmRequest::putQ; pikQ = 1;}
00285       else  {Q1 = XrdFrmRequest::stgQ; Q2 = XrdFrmRequest::getQ; pikQ = 0;}
00286 
00287 // Check if we should avoid either queue because it is stopped
00288 //
00289    if (xfrQ[Q1].Stop || Stopped(Q1)) Q1 = XrdFrmRequest::nilQ;
00290    if (xfrQ[Q2].Stop || Stopped(Q2)) Q2 = XrdFrmRequest::nilQ;
00291 
00292 // Pick the oldest possible request
00293 //
00294    if (xfrQ[Q1].First && xfrQ[Q2].First)
00295       {     if (xfrQ[Q1].First->reqData.addTOD < xfrQ[Q2].First->reqData.addTOD)
00296                theQ = Q1;
00297        else if (xfrQ[Q1].First->reqData.addTOD > xfrQ[Q2].First->reqData.addTOD)
00298                theQ = Q2;
00299        else theQ = (prevQ[pikQ] == Q1 ? Q2 : Q1);
00300       }else theQ = (xfrQ[Q1].First    ? Q1 : Q2);
00301 
00302 // Dequeue the request (we may have an empty selectoin here)
00303 //
00304    if ((xfrP = xfrQ[theQ].First)
00305    &&  !(xfrQ[theQ].First = xfrP->Next)) xfrQ[theQ].Last = 0;
00306   } while(!xfrP && nSel--);
00307 
00308 // Return the job, if any
00309 //
00310    prevQ[pikQ] = theQ;
00311    qMutex.UnLock();
00312    return xfrP;
00313 }
00314 
00315 /******************************************************************************/
00316 /* Private:                       N o t i f y                                 */
00317 /******************************************************************************/
00318   
00319 int XrdFrmXfrQueue::Notify(XrdFrmRequest *rP, int qNum, int rc, const char *msg)
00320 {
00321    static const char *isFile = "file:///";
00322    static const int   lnFile = 8;
00323    static const char *isUDP  = "udp://";
00324    static const int   lnUDP  = 6;
00325    static const char *qOpr[] = {"stage", "migr", "get", "put"};
00326    char msgbuff[4096], *nP, *mP = rP->Notify;
00327    int n;
00328 
00329 // Check if message really needs to be sent
00330 //
00331    if ((!rc && !(rP->Options & XrdFrmRequest::msgSucc))
00332    ||  ( rc && !(rP->Options & XrdFrmRequest::msgFail))) return 0;
00333 
00334 // Multiple destinations can be specified, each destination separated by a
00335 // carriable rturn. We don't screen out duplicates.
00336 //
00337 do{if ((nP = index(rP->Notify, '\r'))) *nP++ = '\0';
00338 
00339 // Check for file destination
00340 //
00341         if (!strncmp(mP, isFile, lnFile))
00342            {if (rc) n = sprintf(msgbuff, "%s %s %s %s\n", qOpr[qNum],
00343                         (rc > 1 ? "ENOENT":"BAD"), rP->LFN, (msg ? msg:"?"));
00344                else n = sprintf(msgbuff, "stage OK %s\n", rP->LFN);
00345             Send2File(mP+lnFile, msgbuff, n);
00346            }
00347 
00348 // Check for udp destination
00349 //
00350    else if (!strncmp(mP, isUDP,  lnUDP))
00351            {char *txtP, *dstP = mP+lnUDP;
00352             if ((txtP = index(dstP, '/'))) *txtP++ = '\0';
00353                else txtP = (char *)"";
00354             n = sprintf(msgbuff, "%s %s %s %s", (rc ? "unprep" : "ready"),
00355                                  rP->ID, txtP, rP->LFN);
00356             Send2UDP(dstP, msgbuff, n);
00357            }
00358 
00359 // Issue warning as we don't yet support mail or tcp notifications
00360 //
00361    else if (*mP != '-')
00362            Say.Emsg("Notify", "Unsupported notification path '", mP, "'.");
00363   } while((mP = nP));
00364 
00365 // All done
00366 //
00367    return 0;
00368 }
00369 
00370 /******************************************************************************/
00371 /* Private:                    S e n d 2 F i l e                              */
00372 /******************************************************************************/
00373   
00374 void XrdFrmXfrQueue::Send2File(char *Dest, char *Msg, int Mln)
00375 {
00376    EPNAME("Notify");
00377    int FD;
00378 
00379 // Do some debugging
00380 //
00381    DEBUG("sending '" <<Msg <<"' via " <<Dest);
00382 
00383 // Open the file
00384 //
00385    if ((FD = open(Dest, O_WRONLY)) < 0)
00386       {Say.Emsg("Notify", errno, "send notification via", Dest); return;}
00387    fcntl(FD, F_SETFD, FD_CLOEXEC);
00388 
00389 // Write the message
00390 //
00391    if (write(FD, Msg, Mln) < 0)
00392       Say.Emsg("Notify", errno, "send notification via", Dest);
00393    close(FD);
00394 }
00395 
00396 /******************************************************************************/
00397 /* Private:                     S e n d 2 U D P                               */
00398 /******************************************************************************/
00399 
00400 void XrdFrmXfrQueue::Send2UDP(char *Dest, char *Msg, int Mln)
00401 {
00402    EPNAME("Notify");
00403    static XrdNetMsg Relay(&Say, 0);
00404 
00405 // Do some debugging
00406 //
00407    DEBUG("sending '" <<Msg <<"' via " <<Dest);
00408   
00409 // Send off the message
00410 //
00411    Relay.Send(Msg, Mln, Dest);
00412 }
00413 
00414 /******************************************************************************/
00415 /* Public:                       S t o p M o n                                */
00416 /******************************************************************************/
00417   
00418 void XrdFrmXfrQueue::StopMon(void *parg)
00419 {
00420    struct theQueue *monQ = (struct theQueue *)parg;
00421    XrdFrmXfrJob *xP;
00422    struct stat buf;
00423    char theMsg[80];
00424    int Cnt;
00425 
00426 // Establish which message to produce
00427 //
00428    sprintf(theMsg, "exists; %s transfers suspended.", monQ->Name);
00429 
00430 // Wait until someone needs to tell us to check for a stop file
00431 //
00432    while(1)
00433         {monQ->Alert.Wait();
00434          Cnt = 0;
00435          while(!stat(monQ->File, &buf))
00436               {if (!Cnt--) {Say.Emsg("StopMon", monQ->File, theMsg); Cnt = 12;}
00437                XrdSysTimer::Snooze(5);
00438               }
00439          qMutex.Lock();
00440          monQ->Stop = 0;
00441          xP = monQ->First;
00442          while(xP) {qReady.Post(); xP = xP->Next;}
00443          qMutex.UnLock();
00444         }
00445 }
00446 
00447 /******************************************************************************/
00448 /* Private:                      S t o p p e d                                */
00449 /******************************************************************************/
00450   
00451 int XrdFrmXfrQueue::Stopped(int qNum) // Called with qMutex locked!
00452 {
00453    struct stat buf;
00454 
00455 // Check for stop file existence. If it exists and the queue has not been
00456 // stopped; stop it and alert the stop file monitor.
00457 //
00458    if (stat(xfrQ[qNum].File, &buf)) return 0;
00459    if (!xfrQ[qNum].Stop) {xfrQ[qNum].Stop = 1; xfrQ[qNum].Alert.Post();}
00460    return 1;
00461 }
00462 
00463 /******************************************************************************/
00464 /* Private:                      x f r N a m e                                */
00465 /******************************************************************************/
00466   
00467 const char *XrdFrmXfrQueue::xfrName(XrdFrmRequest &reqData, int qNum)
00468 {
00469 
00470 // Return a human name for this transfer:
00471 // Migrate
00472 // Migr+rm
00473 // Staging
00474 // CopyIn
00475 // CopyOut
00476 // Copy+rm
00477 //
00478    switch(qNum)
00479          {case XrdFrmRequest::getQ:
00480                return "CopyIn ";
00481                break;
00482           case XrdFrmRequest::migQ: 
00483                return (reqData.Options & XrdFrmRequest::Purge ?
00484                        "Migr+rm ":"Migrate ");
00485                break;
00486           case XrdFrmRequest::putQ:
00487                return (reqData.Options&XrdFrmRequest::Purge ?
00488                        "Copy+rm " : "CopyOut ");
00489                break;
00490           case XrdFrmRequest::stgQ:
00491                return "Staging ";
00492                break;
00493           default:   break;
00494          }
00495 
00496    return "Unknown ";
00497 }

Generated on Tue Jul 5 14:46:36 2011 for ROOT_528-00b_version by  doxygen 1.5.1