XrdBwmHandle.cc

Go to the documentation of this file.
00001 /******************************************************************************/
00002 /*                                                                            */
00003 /*                       X r d B w m H a n d l e . c c                        */
00004 /*                                                                            */
00005 /* (c) 2008 by the Board of Trustees of the Leland Stanford, Jr., University  */
00006 /*                            All Rights Reserved                             */
00007 /*   Produced by Andrew Hanushevsky for Stanford University under contract    */
00008 /*              DE-AC03-76-SFO0515 with the Department of Energy              */
00009 /******************************************************************************/
00010 
00011 //         $Id: XrdBwmHandle.cc 32231 2010-02-05 18:24:46Z ganis $
00012 
// Version-control identification string for this translation unit.
const char *XrdBwmHandleCVSID =
           "$Id: XrdBwmHandle.cc 32231 2010-02-05 18:24:46Z ganis $";
00014 
00015 #include <stdio.h>
00016 #include <string.h>
00017 
00018 #include "XrdBwm/XrdBwmHandle.hh"
00019 #include "XrdBwm/XrdBwmLogger.hh"
00020 #include "XrdBwm/XrdBwmTrace.hh"
00021 #include "XrdSfs/XrdSfsInterface.hh"
00022 #include "XrdSys/XrdSysError.hh"
00023 #include "XrdSys/XrdSysPlatform.hh"
00024 
00025 #include "XProtocol/XProtocol.hh"
00026 
00027 /******************************************************************************/
00028 /*                        S t a t i c   O b j e c t s                         */
00029 /******************************************************************************/
00030   
00031 XrdBwmLogger   *XrdBwmHandle::Logger = 0;
00032 XrdBwmPolicy   *XrdBwmHandle::Policy = 0;
00033 XrdBwmHandle   *XrdBwmHandle::Free = 0;
00034 unsigned int    XrdBwmHandle::numQueued = 0;
00035 
00036 extern XrdSysError BwmEroute;
00037 
00038 /******************************************************************************/
00039 /*                         L o c a l   C l a s s e s                          */
00040 /******************************************************************************/
00041   
00042 class XrdBwmHandleCB : public XrdOucEICB, public XrdOucErrInfo
00043 {
00044 public:
00045 
00046 static
00047 XrdBwmHandleCB *Alloc()
00048                   {XrdBwmHandleCB *mP;
00049                    xMutex.Lock();
00050                    if (!(mP = Free)) mP = new XrdBwmHandleCB;
00051                       else Free = mP->Next;
00052                    xMutex.UnLock();
00053                    return mP;
00054                   }
00055 
00056 void  Done(int &Results, XrdOucErrInfo *eInfo)
00057                   {xMutex.Lock();
00058                    Next = Free;
00059                    Free = this;
00060                    xMutex.UnLock();
00061                   }
00062 
00063 int   Same(unsigned long long arg1, unsigned long long arg2) {return 0;}
00064 
00065       XrdBwmHandleCB() : Next(0) {}
00066      ~XrdBwmHandleCB() {}
00067 
00068 private:
00069        XrdBwmHandleCB *Next;
00070 static XrdSysMutex     xMutex;
00071 static XrdBwmHandleCB *Free;
00072 };
00073 
00074 XrdSysMutex     XrdBwmHandleCB::xMutex;
00075 XrdBwmHandleCB *XrdBwmHandleCB::Free = 0;
00076   
00077 /******************************************************************************/
00078 /*                     E x t e r n a l   L i n k a g e s                      */
00079 /******************************************************************************/
00080   
00081 void *XrdBwmHanXeq(void *pp)
00082 {
00083      return XrdBwmHandle::Dispatch();
00084 }
00085 
00086 /******************************************************************************/
00087 /*                    c l a s s   X r d B w m H a n d l e                     */
00088 /******************************************************************************/
00089 /******************************************************************************/
00090 /*                              A c t i v a t e                               */
00091 /******************************************************************************/
00092 
00093 #define tident Parms.Tident
00094   
00095 int XrdBwmHandle::Activate(XrdOucErrInfo &einfo)
00096 {
00097    EPNAME("Activate");
00098    XrdSysMutexHelper myHelper(hMutex);
00099    char *rBuff;
00100    int  rSize, rc;
00101 
00102 // Check the status of this request.
00103 //
00104    if (Status != Idle)
00105       {if (Status == Scheduled)
00106           einfo.setErrInfo(kXR_inProgress, "Request already scheduled.");
00107           else einfo.setErrInfo(kXR_InvalidRequest, "Visa already issued.");
00108        return SFS_ERROR;
00109       }
00110 
00111 // Try to schedule this request.
00112 //
00113    qTime = time(0);
00114    rBuff = einfo.getMsgBuff(rSize);
00115    if (!(rc = Policy->Schedule(rBuff, rSize, Parms))) return SFS_ERROR;
00116 
00117 // If resource immediately available, let client run
00118 //
00119    if (rc > 0)
00120       {rHandle = rc;
00121        Status  = Dispatched;
00122        rTime   = time(0);
00123        ZTRACE(sched,"Run " <<Parms.Lfn <<' ' <<Parms.LclNode
00124                     <<(Parms.Direction==XrdBwmPolicy::Incomming?" <- ":" -> ")
00125                     <<Parms.RmtNode);
00126        einfo.setErrCode(strlen(rBuff));
00127        return (*rBuff ? SFS_DATA : SFS_OK);
00128       }
00129 
00130 // Request was queued. We need to hold on to this so we can issue an async
00131 // response later when the resource becomes available.
00132 //
00133    rHandle = -rc;
00134    ErrCB = einfo.getErrCB(ErrCBarg);
00135    einfo.setErrCB((XrdOucEICB *)&myEICB);
00136    Status = Scheduled;
00137    refHandle(rHandle, this);
00138    ZTRACE(sched, "inQ " <<Parms.Lfn <<' ' <<Parms.LclNode
00139                 <<(Parms.Direction==XrdBwmPolicy::Incomming?" <- ":" -> ")
00140                 <<Parms.RmtNode);
00141 
00142 // Indicate that client needs to wait
00143 //
00144    return SFS_STARTED;
00145 }
00146 #undef tident
00147 
00148 /******************************************************************************/
00149 /* static public                A l l o c   # 1                               */
00150 /******************************************************************************/
00151   
00152 XrdBwmHandle *XrdBwmHandle::Alloc(const char *theUsr,  const char *thePath,
00153                                   const char *LclNode, const char *RmtNode,
00154                                   int Incomming)
00155 {
00156    XrdBwmHandle *hP = Alloc();
00157 
00158 // Initialize the hanlde
00159 //
00160    if (hP)
00161       {hP->Parms.Tident    = theUsr;           // Always available
00162        hP->Parms.Lfn       = strdup(thePath);
00163        hP->Parms.LclNode   = strdup(LclNode);
00164        hP->Parms.RmtNode   = strdup(RmtNode);
00165        hP->Parms.Direction = (Incomming ? XrdBwmPolicy::Incomming
00166                                         : XrdBwmPolicy::Outgoing);
00167        hP->Status          = Idle;
00168        hP->qTime           = 0;
00169        hP->rTime           = 0;
00170        hP->xSize           = 0;
00171        hP->xTime           = 0;
00172       }
00173 
00174 // All done
00175 //
00176    return hP;
00177 }
00178 
00179 /******************************************************************************/
00180 /* private                      A l l o c   # 2                               */
00181 /******************************************************************************/
00182   
00183 XrdBwmHandle *XrdBwmHandle::Alloc(XrdBwmHandle *old_hP)
00184 {
00185    static const int minAlloc = 4096/sizeof(XrdBwmHandle);
00186    static XrdSysMutex aMutex;
00187    XrdBwmHandle *hP;
00188 
00189 // No handle currently in the table. Get a new one off the free list or
00190 // return one to the free list.
00191 //
00192    aMutex.Lock();
00193    if (old_hP) {old_hP->Next = Free; Free = old_hP; hP = 0;}
00194      else {if (!Free && (hP = new XrdBwmHandle[minAlloc]))
00195               {int i = minAlloc; while(i--) {hP->Next = Free; Free = hP; hP++;}}
00196            if ((hP = Free)) Free = hP->Next;
00197           }
00198    aMutex.UnLock();
00199 
00200    return hP;
00201 }
00202   
00203 /******************************************************************************/
00204 /*                              D i s p a t c h                               */
00205 /******************************************************************************/
00206 
00207 #define tident hP->Parms.Tident
00208   
00209 void *XrdBwmHandle::Dispatch()
00210 {
00211    EPNAME("Dispatch");
00212    XrdBwmHandleCB *erP = XrdBwmHandleCB::Alloc();
00213    XrdBwmHandle   *hP;
00214    char *RespBuff;
00215    int   RespSize, readyH, Result, Err;
00216 
00217 // Dispatch ready requests in an endless loop
00218 //
00219    do {
00220 
00221 // Setup buffer
00222 //
00223    RespBuff = erP->getMsgBuff(RespSize); 
00224    *RespBuff = '\0';
00225    erP->setErrCode(0);
00226 
00227 // Get next ready request and test if it ended with an error
00228 //
00229    if ((Err = (readyH = Policy->Dispatch(RespBuff, RespSize)) < 0))
00230       readyH = -readyH;
00231 
00232 // Find the matching handle
00233 //
00234    if (!(hP = refHandle(readyH)))
00235       {sprintf(RespBuff, "%d", readyH);
00236        BwmEroute.Emsg("Dispatch", "Lost handle from", RespBuff);
00237        if (!Err) Policy->Done(readyH);
00238        continue;
00239       }
00240 
00241 // Lock the handle and make sure it can be dispatched
00242 //
00243    hP->hMutex.Lock();
00244    if (hP->Status != Scheduled)
00245       {BwmEroute.Emsg("Dispatch", "ref to unscheduled handle",
00246                       hP->Parms.Tident, hP->Parms.Lfn);
00247        if (!Err) Policy->Done(readyH);
00248       } else {
00249        hP->myEICB.Wait(); hP->rTime = time(0);
00250        erP->setErrCB((XrdOucEICB *)erP, hP->ErrCBarg);
00251        if (Err) {hP->Status = Idle; Result = SFS_ERROR;}
00252           else  {hP->Status = Dispatched;
00253                  erP->setErrCode(strlen(RespBuff));
00254                  Result = (*RespBuff ? SFS_DATA : SFS_OK);
00255                 }
00256        ZTRACE(sched,(Err?"Err ":"Run ") <<hP->Parms.Lfn <<' ' <<hP->Parms.LclNode
00257              <<(hP->Parms.Direction == XrdBwmPolicy::Incomming ? " <- ":" -> ")
00258              <<hP->Parms.RmtNode);
00259        hP->ErrCB->Done(Result, (XrdOucErrInfo *)erP);
00260        erP = XrdBwmHandleCB::Alloc();
00261       }
00262     hP->hMutex.UnLock();
00263    } while(1);
00264 
00265 // Keep the compiler happy
00266 //
00267    return (void *)0;
00268 }
00269 
00270 #undef tident
00271 
00272 /******************************************************************************/
00273 /* private                     r e f H a n d l e                              */
00274 /******************************************************************************/
00275   
00276 XrdBwmHandle *XrdBwmHandle::refHandle(int refID, XrdBwmHandle *hP)
00277 {
00278    static XrdSysMutex tMutex;
00279    static struct {XrdBwmHandle *First;
00280                   XrdBwmHandle *Last;
00281                  }              hTab[256] = {{0,0}};
00282    XrdBwmHandle *pP = 0;
00283    int i = refID % 256;
00284 
00285 // If we have a handle passed, add the handle to the table
00286 //
00287    tMutex.Lock();
00288    if (hP)
00289       {hP->Next = 0;
00290        if (hTab[i].Last) {hTab[i].Last->Next = hP; hTab[i].Last = hP;}
00291           else {hTab[i].First = hTab[i].Last = hP; hP->Next = 0;}
00292        numQueued++;
00293       } else {
00294        hP = hTab[i].First;
00295        while(hP && hP->rHandle != refID) {pP = hP; hP = hP->Next;}
00296        if (hP)
00297           {if (pP) pP->Next = hP->Next;
00298               else hTab[i].First = hP->Next;
00299            if (hTab[i].Last == hP) hTab[i].Last = pP;
00300            numQueued--;
00301           }
00302       }
00303     tMutex.UnLock();
00304 
00305 // All done.
00306 //
00307    return hP;
00308 }
00309 
00310 /******************************************************************************/
00311 /* public                         R e t i r e                                 */
00312 /******************************************************************************/
00313 
00314 // The handle must be locked upon entry! It is unlocked upon exit.
00315 
00316 void XrdBwmHandle::Retire()
00317 {
00318    XrdSysMutexHelper myHelper(hMutex);
00319 
00320 // Get the global lock as the links field can only be manipulated with it.
00321 // If not idle, cancel the resource. If scheduled, remove it from the table.
00322 //
00323    if (Status != Idle) 
00324       {Policy->Done(rHandle);
00325        if (Status == Scheduled && !refHandle(rHandle, this))
00326           BwmEroute.Emsg("Retire", "Lost handle to", Parms.Tident, Parms.Lfn);
00327        Status = Idle; rHandle = 0;
00328       }
00329 
00330 // If we have a logger, then log this event
00331 //
00332    if (Logger && qTime)
00333       {XrdBwmLogger::Info myInfo;
00334        myInfo.Tident  = Parms.Tident;
00335        myInfo.Lfn     = Parms.Lfn;
00336        myInfo.lclNode = Parms.LclNode;
00337        myInfo.rmtNode = Parms.RmtNode;
00338        myInfo.ATime   = qTime;
00339        myInfo.BTime   = rTime;
00340        myInfo.CTime   = time(0);
00341        myInfo.Size    = xSize;
00342        myInfo.ESec    = xTime;
00343        myInfo.Flow    = (Parms.Direction == XrdBwmPolicy::Incomming ? 'I':'O');
00344        Policy->Status(myInfo.numqIn, myInfo.numqOut, myInfo.numqXeq);
00345        Logger->Event(myInfo);
00346       }
00347 
00348 // Free storage appendages and recycle handle
00349 //
00350    if (Parms.Lfn)     {free(Parms.Lfn);     Parms.Lfn = 0;}
00351    if (Parms.LclNode) {free(Parms.LclNode); Parms.LclNode = 0;}
00352    if (Parms.RmtNode) {free(Parms.RmtNode); Parms.RmtNode = 0;}
00353    Alloc(this);
00354 }
00355 
00356 /******************************************************************************/
00357 /*                             s e t P o l i c y                              */
00358 /******************************************************************************/
00359   
00360 int XrdBwmHandle::setPolicy(XrdBwmPolicy *pP, XrdBwmLogger *lP)
00361 {
00362    pthread_t tid;
00363    int rc, startThread = (Policy == 0);
00364 
00365 // Set the policy and then start a thread to do dispatching if we have none
00366 //
00367    Policy = pP;
00368    if (startThread)
00369       if ((rc = XrdSysThread::Run(&tid, XrdBwmHanXeq, (void *)0,
00370                                   0, "Handle Dispatcher")))
00371          {BwmEroute.Emsg("setPolicy", rc, "create handle dispatch thread");
00372           return 1;
00373          }
00374 
00375 // All done
00376 //
00377    Logger = lP;
00378    return 0;
00379 }

Generated on Tue Jul 5 14:46:16 2011 for ROOT_528-00b_version by  doxygen 1.5.1