XrdFrmReqBoss.cc

Go to the documentation of this file.
00001 /******************************************************************************/
00002 /*                                                                            */
00003 /*                      X r d F r m R e q B o s s . c c                       */
00004 /*                                                                            */
00005 /* (c) 2010 by the Board of Trustees of the Leland Stanford, Jr., University  */
00006 /*                            All Rights Reserved                             */
00007 /*   Produced by Andrew Hanushevsky for Stanford University under contract    */
00008 /*              DE-AC02-76-SFO0515 with the Department of Energy              */
00009 /******************************************************************************/
00010 
00011 //          $Id: XrdFrmReqBoss.cc 35287 2010-09-14 21:19:35Z ganis $
00012 
// Global C string holding the revision-control "$Id$" identification text
// (file name, revision, date and committer) for this source file.
00013 const char *XrdFrmReqBossCVSID = "$Id: XrdFrmReqBoss.cc 35287 2010-09-14 21:19:35Z ganis $";
00014 
00015 #include <stdio.h>
00016 #include <stdlib.h>
00017 #include <string.h>
00018 #include <strings.h>
00019 #include <unistd.h>
00020 #include <fcntl.h>
00021 #include <sys/types.h>
00022 #include <sys/stat.h>
00023 
00024 #include "XrdFrm/XrdFrmCID.hh"
00025 #include "XrdFrm/XrdFrmReqBoss.hh"
00026 #include "XrdFrm/XrdFrmTrace.hh"
00027 #include "XrdFrm/XrdFrmUtils.hh"
00028 #include "XrdFrm/XrdFrmXfrQueue.hh"
00029 #include "XrdNet/XrdNetMsg.hh"
00030 #include "XrdOuc/XrdOucUtils.hh"
00031 #include "XrdSys/XrdSysHeaders.hh"
00032 
00033 using namespace XrdFrm;
00034 
00035 /******************************************************************************/
00036 /*                     T h r e a d   I n t e r f a c e s                      */
00037 /******************************************************************************/
00038   
00039 void *mainServerXeq(void *parg)
00040 {
00041     XrdFrmReqBoss *theBoss = (XrdFrmReqBoss *)parg;
00042     theBoss->Process();
00043     return (void *)0;
00044 }
00045 
00046 /******************************************************************************/
00047 /* Public:                           A d d                                    */
00048 /******************************************************************************/
00049   
00050 void XrdFrmReqBoss::Add(XrdFrmRequest &Request)
00051 {
00052 
00053 // Complete the request including verifying the priority
00054 //
00055    if (Request.Prty > XrdFrmRequest::maxPrty)
00056       Request.Prty = XrdFrmRequest::maxPrty;
00057       else if (Request.Prty < 0)Request.Prty = 0;
00058    Request.addTOD = time(0);
00059 
00060 // Now add it to the queue
00061 //
00062    rQueue[static_cast<int>(Request.Prty)]->Add(&Request);
00063 
00064 // Now wake ourselves up
00065 //
00066    Wakeup(1);
00067 }
00068 
00069 /******************************************************************************/
00070 /* Public:                           D e l                                    */
00071 /******************************************************************************/
00072   
00073 void XrdFrmReqBoss::Del(XrdFrmRequest &Request)
00074 {
00075    int i;
00076   
00077 // Remove all pending requests for this id
00078 //
00079    for (i = 0; i <= XrdFrmRequest::maxPrty; i++) rQueue[i]->Can(&Request);
00080 }
00081 
00082 /******************************************************************************/
00083 /* Public:                       P r o c e s s                                */
00084 /******************************************************************************/
00085   
00086 void XrdFrmReqBoss::Process()
00087 {
00088    EPNAME("Process");
00089    XrdFrmRequest myReq;
00090    int i, rc, numXfr, numPull;;
00091 
00092 // Perform staging in an endless loop
00093 //
00094 do{Wakeup(0);
00095    do{numXfr = 0;
00096       for (i = XrdFrmRequest::maxPrty; i >= 0; i--)
00097           {numPull = i+1;
00098            while(numPull && (rc = rQueue[i]->Get(&myReq)))
00099                 {if (myReq.Options & XrdFrmRequest::Register) Register(myReq,i);
00100                     else {numPull -= XrdFrmXfrQueue::Add(&myReq,rQueue[i],theQ);
00101                           numXfr++;
00102                           DEBUG(Persona <<" from Q " << i <<' ' <<numPull <<" left");
00103                           if (rc < 0) break;
00104                          }
00105                 }
00106           }
00107      } while(numXfr);
00108   } while(1);
00109 }
00110 
00111 /******************************************************************************/
00112 /* Private:                     R e g i s t e r                               */
00113 /******************************************************************************/
00114 
00115 void XrdFrmReqBoss::Register(XrdFrmRequest &Req, int qNum)
00116 {
00117    EPNAME("Register");
00118    char *eP;
00119    int Pid;
00120 
00121 // Ignore this request if there is no cluster id or the process if is invalid
00122 //
00123    if (!(*Req.LFN)) return;
00124    Pid = strtol(Req.ID, &eP, 10);
00125    if (*eP || Pid == 0) return;
00126 
00127 // Register this cluster
00128 //
00129    if (CID.Add(Req.iName, Req.LFN, static_cast<time_t>(Req.addTOD), Pid))
00130       {DEBUG("Instance=" <<Req.iName <<" cluster=" <<Req.LFN <<" pid=" <<Pid);}
00131       else rQueue[qNum]->Del(&Req);
00132 }
00133 
00134 /******************************************************************************/
00135 /*                                 S t a r t                                  */
00136 /******************************************************************************/
00137   
00138 int XrdFrmReqBoss::Start(char *aPath, int aMode)
00139 {
00140    pthread_t tid;
00141    char buff[2048], *qPath;
00142    int retc, i;
00143 
00144 // Generate the queue directory path
00145 //
00146    if (!(qPath = XrdFrmUtils::makeQDir(aPath, aMode))) return 0;
00147 
00148 // Initialize the request queues if all went well
00149 //
00150    for (i = 0; i <= XrdFrmRequest::maxPrty; i++)
00151        {sprintf(buff, "%s%sQ.%d", qPath, Persona, i);
00152         rQueue[i] = new XrdFrmReqFile(buff, 0);
00153         if (!rQueue[i]->Init()) return 0;
00154        }
00155 
00156 // Start the request processing thread
00157 //
00158    if ((retc = XrdSysThread::Run(&tid, mainServerXeq, (void *)this,
00159                                  XRDSYSTHREAD_BIND, Persona)))
00160       {sprintf(buff, "create %s request thread", Persona);
00161        Say.Emsg("Start", retc, buff);
00162        return 0;
00163       }
00164 
00165 // All done
00166 //
00167    return 1;
00168 }
00169 
00170 /******************************************************************************/
00171 /* Public:                        W a k e u p                                 */
00172 /******************************************************************************/
00173   
00174 void XrdFrmReqBoss::Wakeup(int PushIt)
00175 {
00176    static XrdSysMutex     rqMutex;
00177 
00178 // If this is a PushIt then see if we need to push the binary semaphore
00179 //
00180    if (PushIt) {rqMutex.Lock();
00181                 if (!isPosted) {rqReady.Post(); isPosted = 1;}
00182                 rqMutex.UnLock();
00183                }
00184       else     {rqReady.Wait();
00185                 rqMutex.Lock(); isPosted = 0; rqMutex.UnLock();
00186                }
00187 }

Generated on Tue Jul 5 14:46:36 2011 for ROOT_528-00b_version by  doxygen 1.5.1