XrdCmsCache.cc

Go to the documentation of this file.
00001 /******************************************************************************/
00002 /*                                                                            */
00003 /*                        X r d C m s C a c h e . c c                         */
00004 /*                                                                            */
00005 /* (c) 2007 by the Board of Trustees of the Leland Stanford, Jr., University  */
00006 /*                            All Rights Reserved                             */
00007 /*   Produced by Andrew Hanushevsky for Stanford University under contract    */
00008 /*              DE-AC02-76-SFO0515 with the Department of Energy              */
00009 /******************************************************************************/
00010 
00011 //         $Id: XrdCmsCache.cc 32231 2010-02-05 18:24:46Z ganis $
00012 
00013 // Original Version: 1.13 2007/07/12 21:57:38 abh
00014 
00015 const char *XrdCmsCacheCVSID = "$Id: XrdCmsCache.cc 32231 2010-02-05 18:24:46Z ganis $";
00016   
00017 #include <stdio.h>
00018 #include <sys/types.h>
00019 
00020 #include "XrdCms/XrdCmsCache.hh"
00021 #include "XrdCms/XrdCmsRRQ.hh"
00022 #include "XrdCms/XrdCmsTrace.hh"
00023 
00024 #include "XrdSys/XrdSysTimer.hh"
00025 
00026 #include "Xrd/XrdJob.hh"
00027 #include "Xrd/XrdScheduler.hh"
00028 
00029 namespace XrdCms
00030 {
00031 extern XrdScheduler *Sched;
00032 }
00033 
00034 using namespace XrdCms;
00035 
00036 /******************************************************************************/
00037 /*                               G l o b a l s                                */
00038 /******************************************************************************/
00039   
00040 XrdCmsCache XrdCms::Cache;
00041   
00042 /******************************************************************************/
00043 /*                         L o c a l   C l a s s e s                          */
00044 /******************************************************************************/
00045   
00046 class XrdCmsCacheJob : XrdJob
00047 {
00048 public:
00049 
00050 void   DoIt() {Cache.Recycle(myList); delete this;}
00051 
00052        XrdCmsCacheJob(XrdCmsKeyItem *List)
00053                      : XrdJob("cache scrubber"), myList(List) {}
00054       ~XrdCmsCacheJob() {}
00055 
00056 private:
00057 
00058 XrdCmsKeyItem *myList;
00059 };
00060 
00061 /******************************************************************************/
00062 /*            E x t e r n a l   T h r e a d   I n t e r f a c e s             */
00063 /******************************************************************************/
00064   
00065 void *XrdCmsStartTickTock(void *carg)
00066      {XrdCmsCache *myCache = (XrdCmsCache *)carg;
00067       return myCache->TickTock();
00068      }
00069 
00070 /******************************************************************************/
00071 /*     P u b l i c   C a c h e   M a n i p u l a t i o n   M e t h o d s      */
00072 /******************************************************************************/
00073 /******************************************************************************/
00074 /* Public                        A d d F i l e                                */
00075 /******************************************************************************/
00076 
00077 // This method inserts or updates information about a path in the cache.
00078 
00079 // Key was found:  Location information is updated depending on mask
00080 // mask == 0       Indicates that the information is being refreshed.
00081 //                 Location information is nullified. The update deadline is set
00082 //                 DLTime seconds in the future. The entry window is set to the
00083 //                 current window to be held for a full fxhold period.
00084 // mask != 0       Indicates that some location information is now known.
00085 //                 Location information is updated according to the mask.
00086 //                 For a r/w location, the deadline is satisfied and all
00087 //                 callbacks are dispatched. For an r/o location the deadline
00088 //                 is satisfied if no r/w callback is pending. Any r/o
00089 //                 callback is dispatched. The Info object is ignored.
00090 
00091 // Key not found:  A selective addition occurs, depending on Sel.Opts
00092 // Opts !Advisory: The entry is added to the cache with location information
00093 //                 set as passed (usually 0). The update deadline is set to
00094 //                 DLTime seconds in the future. The entry window is set
00095 //                 to the current window.
00096 // Opts  Advisory: The call is ignored since we do not keep information about
00097 //                 paths that were never asked for.
00098 
00099 // Returns True    If this is the first time location information was added
00100 //                 to the entry.
00101 // Returns False   Otherwise.
00102   
00103 int XrdCmsCache::AddFile(XrdCmsSelect &Sel, SMask_t mask)
00104 {
00105    XrdCmsKeyItem *iP;
00106    SMask_t xmask;
00107    int isrw = (Sel.Opts & XrdCmsSelect::Write), isnew = 0;
00108 
00109 // Serialize processing
00110 //
00111    myMutex.Lock();
00112 
00113 // Check for fast path processing
00114 //
00115    if (  !(iP = Sel.Path.TODRef) || !(iP->Key.Equiv(Sel.Path)))
00116       if ((iP = Sel.Path.TODRef = CTable.Find(Sel.Path)))
00117          Sel.Path.Ref = iP->Key.Ref;
00118 
00119 // Add/Modify the entry
00120 //
00121    if (iP)
00122       {if (!mask)
00123           {iP->Loc.deadline = DLTime + time(0);
00124            iP->Loc.hfvec = 0; iP->Loc.pfvec = 0; iP->Loc.qfvec = 0;
00125            iP->Loc.TOD_B = BClock;
00126            iP->Key.TOD = Tock;
00127           } else {
00128            xmask = iP->Loc.pfvec;
00129            if (Sel.Opts & XrdCmsSelect::Pending) iP->Loc.pfvec |= mask;
00130               else iP->Loc.pfvec &= ~mask;
00131            isnew = (iP->Loc.hfvec == 0) || (iP->Loc.pfvec != xmask);
00132            iP->Loc.hfvec |=  mask;
00133            iP->Loc.qfvec &= ~mask;
00134            if (isrw) {iP->Loc.deadline = 0;
00135                       if (iP->Loc.roPend || iP->Loc.rwPend)
00136                          Dispatch(iP, iP->Loc.roPend, iP->Loc.rwPend);
00137                      }
00138               else   {if (!iP->Loc.rwPend) iP->Loc.deadline = 0;
00139                       if (iP->Loc.roPend) Dispatch(iP, iP->Loc.roPend, 0);
00140                      }
00141           }
00142       } else if (!(Sel.Opts & XrdCmsSelect::Advisory))
00143                 {Sel.Path.TOD = Tock;
00144                  if ((iP = CTable.Add(Sel.Path)))
00145                     {iP->Loc.pfvec    = (Sel.Opts&XrdCmsSelect::Pending?mask:0);
00146                      iP->Loc.hfvec    = mask;
00147                      iP->Loc.TOD_B    = BClock;
00148                      iP->Loc.qfvec    = 0;
00149                      iP->Loc.deadline = DLTime + time(0);
00150                      Sel.Path.Ref     = iP->Key.Ref;
00151                      Sel.Path.TODRef  = iP; isnew = 1;
00152                     }
00153                 }
00154 
00155 // All done
00156 //
00157    myMutex.UnLock();
00158    return isnew;
00159 }
00160   
00161 /******************************************************************************/
00162 /* Public                        D e l F i l e                                */
00163 /******************************************************************************/
00164 
00165 // This method removes location information from existing valid entries. If an
00166 // existing valid entry is found, based on Sel.Opts the following occurs:
00167 
00168 // Opts  Advisory only locate information is removed.
00169 // Opts !Advisory if the entry has no location information it is removed from
00170 //                the cache, if possible.
00171 
00172 // TRUE is returned if the entry was valid but location information was cleared.
00173 // Otherwise, FALSE is returned.
00174   
00175 int XrdCmsCache::DelFile(XrdCmsSelect &Sel, SMask_t mask)
00176 {
00177    XrdCmsKeyItem *iP;
00178    int gone4good;
00179 
00180 // Lock the hash table
00181 //
00182    myMutex.Lock();
00183 
00184 // Look up the entry and remove server
00185 //
00186    if ((iP = CTable.Find(Sel.Path)))
00187       {iP->Loc.hfvec &= ~mask;
00188        iP->Loc.pfvec &= ~mask;
00189        if ((gone4good = (iP->Loc.hfvec == 0))
00190        && (!(Sel.Opts & XrdCmsSelect::Advisory))
00191        && (XrdCmsKeyItem::Unload(iP) && !CTable.Recycle(iP)))
00192           Say.Emsg("DelFile", "Delete failed for", iP->Key.Val);
00193       } else gone4good = 0;
00194 
00195 // All done
00196 //
00197    myMutex.UnLock();
00198    return gone4good;
00199 }
00200   
00201 /******************************************************************************/
00202 /* Public                        G e t F i l e                                */
00203 /******************************************************************************/
00204 
00205 // This method looks up entries in the cache. An "entry not found" condition
00206 // holds if the entry was found but is marked as deleted.
00207 
00208 // Entry was found: Location information is passed back. If the update deadline
00209 //                  has passed, it is nullified and 1 is returned. Otherwise,
00210 //                  -1 is returned indicating a query is in progress.
00211 
00212 // Entry not found: FALSE is returned.
00213   
00214 int  XrdCmsCache::GetFile(XrdCmsSelect &Sel, SMask_t mask)
00215 {
00216    XrdCmsKeyItem *iP;
00217    SMask_t bVec;
00218    int retc;
00219 
00220 // Lock the hash table
00221 //
00222    myMutex.Lock();
00223 
00224 // Look up the entry and return location information
00225 //
00226    if ((iP = CTable.Find(Sel.Path)))
00227       {if ((bVec = (iP->Loc.TOD_B < BClock 
00228                  ? getBVec(iP->Key.TOD, iP->Loc.TOD_B) & mask : 0)))
00229           {iP->Loc.hfvec &= ~bVec; 
00230            iP->Loc.pfvec &= ~bVec;
00231            iP->Loc.qfvec &= ~mask;
00232            iP->Loc.deadline = DLTime + time(0); 
00233            retc = -1;
00234           } else if (iP->Loc.deadline)
00235                     if (iP->Loc.deadline > time(0)) retc = -1;
00236                        else {iP->Loc.deadline = 0;  retc =  1;}
00237                     else retc = 1;
00238        Sel.Vec.hf      = okVec & iP->Loc.hfvec;
00239        Sel.Vec.pf      = okVec & iP->Loc.pfvec;
00240        Sel.Vec.bf      = okVec & (bVec | iP->Loc.qfvec); iP->Loc.qfvec = 0;
00241        Sel.Path.Ref    = iP->Key.Ref;
00242       } else retc = 0;
00243 
00244 // All done
00245 //
00246    myMutex.UnLock();
00247    Sel.Path.TODRef = iP;
00248    return retc;
00249 }
00250 
00251 /******************************************************************************/
00252 /* Public                        U n k F i l e                                */
00253 /******************************************************************************/
00254   
00255 int XrdCmsCache::UnkFile(XrdCmsSelect &Sel, SMask_t mask)
00256 {
00257    EPNAME("UnkFile");
00258    XrdCmsKeyItem *iP;
00259 
00260 // Make sure we have the proper information. If so, lock the hash table
00261 //
00262    myMutex.Lock();
00263 
00264 // Look up the entry and if valid update the unqueried vector. Note that
00265 // this method may only be called after GetFile() or AddFile() for a new entry
00266 //
00267    if ((iP = Sel.Path.TODRef))
00268       {if (iP->Key.Equiv(Sel.Path)) iP->Loc.qfvec = mask;
00269           else iP = 0;
00270       }
00271 
00272 // Return result
00273 //
00274    myMutex.UnLock();
00275    DEBUG("rc=" <<(iP ? 1 : 0) <<" path=" <<Sel.Path.Val);
00276    return (iP ? 1 : 0);
00277 }
00278   
00279 /******************************************************************************/
00280 /* Public                        W T 4 F i l e                                */
00281 /******************************************************************************/
00282   
00283 int XrdCmsCache::WT4File(XrdCmsSelect &Sel, SMask_t mask)
00284 {
00285    EPNAME("WT4File");
00286    XrdCmsKeyItem *iP;
00287    time_t  Now;
00288    int     retc;
00289 
00290 // Make sure we have the proper information. If so, lock the hash table
00291 //
00292    if (!Sel.InfoP) return DLTime;
00293    myMutex.Lock();
00294 
00295 // Look up the entry and if valid add it to the callback queue. Note that
00296 // this method may only be called after GetFile() or AddFile() for a new entry
00297 //
00298    if (!(iP = Sel.Path.TODRef) || !(iP->Key.Equiv(Sel.Path))) retc = DLTime;
00299       else if (iP->Loc.hfvec != mask)                         retc = 1;
00300               else {Now = time(0);                            retc = 0;
00301                     if (iP->Loc.deadline && iP->Loc.deadline <= Now)
00302                         iP->Loc.deadline = DLTime + Now;
00303                     Add2Q(Sel.InfoP, iP, Sel.Opts & XrdCmsSelect::Write);
00304                    }
00305 
00306 // Return result
00307 //
00308    myMutex.UnLock();
00309    DEBUG("rc=" <<retc <<" path=" <<Sel.Path.Val);
00310    return retc;
00311 }
00312   
00313 /******************************************************************************/
00314 /*         P u b l i c   A d m i n i s t r a t i v e   C l a s s e s          */
00315 /******************************************************************************/
00316 /******************************************************************************/
00317 /* public                         B o u n c e                                 */
00318 /******************************************************************************/
00319 
00320 void XrdCmsCache::Bounce(SMask_t smask, int SNum)
00321 {
00322 
00323 // Simply indicate that this server bounced
00324 //
00325    myMutex.Lock();
00326    Bounced[SNum] = ++BClock;
00327    okVec |= smask;
00328    if (SNum > vecHi) vecHi = SNum;
00329    myMutex.UnLock();
00330 }
00331 
00332 /******************************************************************************/
00333 /* Public                           D r o p                                   */
00334 /******************************************************************************/
00335   
00336 void XrdCmsCache::Drop(SMask_t smask, int SNum, int xHi)
00337 {
00338    SMask_t nmask(~smask);
00339 
00340 // Remove the node from the path list
00341 //
00342    Paths.Remove(smask);
00343 
00344 // Remove the node from the list of valid nodes
00345 //
00346    myMutex.Lock();
00347    Bounced[SNum] = 0;
00348    okVec &= nmask;
00349    vecHi = xHi;
00350    myMutex.UnLock();
00351 }
00352 
00353 /******************************************************************************/
00354 /* public                           I n i t                                   */
00355 /******************************************************************************/
00356   
00357 int XrdCmsCache::Init(int fxHold, int fxDelay)
00358 {
00359    XrdCmsKeyItem *iP;
00360    pthread_t tid;
00361 
00362 // Initialize the delay time and the bounce clock tick window
00363 //
00364    DLTime = fxDelay;
00365    if (!(Tick = fxHold/XrdCmsKeyItem::TickRate)) Tick = 1;
00366 
00367 // Start the clock thread
00368 //
00369    if (XrdSysThread::Run(&tid, XrdCmsStartTickTock, (void *)this,
00370                             0, "Cache Clock"))
00371       {Say.Emsg("Init", errno, "start cache clock");
00372        return 0;
00373       }
00374 
00375 // Get the first reserve of cache items
00376 //
00377    iP = XrdCmsKeyItem::Alloc(0);
00378    XrdCmsKeyItem::Unload((unsigned int)0);
00379    iP->Recycle();
00380 
00381 // All done
00382 //
00383    return 1;
00384 }
00385 
00386 /******************************************************************************/
00387 /* public                       T i c k T o c k                               */
00388 /******************************************************************************/
00389 
00390 void *XrdCmsCache::TickTock()
00391 {
00392    XrdCmsKeyItem *iP;
00393 
00394 // Simply adjust the clock and trim old entries
00395 //
00396    do {XrdSysTimer::Snooze(Tick);
00397        myMutex.Lock();
00398        Tock = (Tock+1) & XrdCmsKeyItem::TickMask;
00399        Bhistory[Tock].Start = Bhistory[Tock].End = 0;
00400        iP = XrdCmsKeyItem::Unload(Tock);
00401        myMutex.UnLock();
00402        if (iP) Sched->Schedule((XrdJob *)new XrdCmsCacheJob(iP));
00403       } while(1);
00404 
00405 // Keep compiler happy
00406 //
00407    return (void *)0;
00408 }
00409 
00410 /******************************************************************************/
00411 /*                       P r i v a t e   M e t h o d s                        */
00412 /******************************************************************************/
00413 /******************************************************************************/
00414 /*                                 A d d 2 Q                                  */
00415 /******************************************************************************/
00416   
00417 void XrdCmsCache::Add2Q(XrdCmsRRQInfo *Info, XrdCmsKeyItem *iP, int isrw)
00418 {
00419    short Slot = (isrw ? iP->Loc.rwPend : iP->Loc.roPend);
00420 
00421 // Add the request to the appropriate pending queue
00422 //
00423    Info->Key = iP;
00424    Info->isRW= isrw;
00425    if (!(Slot = RRQ.Add(Slot, Info))) Info->Key = 0;
00426       else if (isrw) iP->Loc.rwPend = Slot;
00427                else  iP->Loc.roPend = Slot;
00428 }
00429 
00430 /******************************************************************************/
00431 /*                              D i s p a t c h                               */
00432 /******************************************************************************/
00433   
00434 void XrdCmsCache::Dispatch(XrdCmsKeyItem *iP, short roQ, short rwQ)
00435 {
00436 
00437 // Dispach the waiting elements
00438 //
00439    if (roQ) {RRQ.Ready(roQ, iP, iP->Loc.hfvec, iP->Loc.pfvec);
00440              iP->Loc.roPend = 0;
00441             }
00442    if (rwQ) {RRQ.Ready(rwQ, iP, iP->Loc.hfvec, iP->Loc.pfvec);
00443              iP->Loc.rwPend = 0;
00444             }
00445 }
00446 
00447 /******************************************************************************/
00448 /*                               g e t B V e c                                */
00449 /******************************************************************************/
00450   
00451 SMask_t XrdCmsCache::getBVec(unsigned int TODa, unsigned int &TODb)
00452 {
00453    EPNAME("getBVec");
00454    SMask_t BVec(0);
00455    long long i;
00456 
00457 // See if we can use a previously calculated bVec
00458 //
00459    if (Bhistory[TODa].End == BClock && Bhistory[TODa].Start <= TODb)
00460       {Bhits++; TODb = BClock; return Bhistory[TODa].Vec;}
00461 
00462 // Calculate the new vector
00463 //
00464    for (i = 0; i <= vecHi; i++)
00465        if (TODb < Bounced[i]) BVec |= 1ULL << i;
00466 
00467    Bhistory[TODa].Vec   = BVec;
00468    Bhistory[TODa].Start = TODb;
00469    Bhistory[TODa].End   = BClock;
00470    TODb                 = BClock;
00471    Bmiss++;
00472    if (!(Bmiss & 0xff)) DEBUG("hits=" <<Bhits <<" miss=" <<Bmiss);
00473    return BVec;
00474 }
00475 
00476 /******************************************************************************/
00477 /*                               R e c y c l e                                */
00478 /******************************************************************************/
00479   
00480 void XrdCmsCache::Recycle(XrdCmsKeyItem *theList)
00481 {
00482    XrdCmsKeyItem *iP;
00483    char msgBuff[100];
00484    int numNull, numHave, numFree, numRecycled = 0;
00485 
00486 // Recycle the list of cache items, as needed
00487 //
00488    while((iP = theList))
00489         {theList = iP->Key.TODRef;
00490          if (iP->Loc.roPend) RRQ.Del(iP->Loc.roPend, iP);
00491          if (iP->Loc.rwPend) RRQ.Del(iP->Loc.rwPend, iP);
00492          myMutex.Lock(); CTable.Recycle(iP); myMutex.UnLock();
00493          numRecycled++;
00494         }
00495 
00496 // See if we have enough items in reserve
00497 //
00498    myMutex.Lock();
00499    XrdCmsKeyItem::Stats(numHave, numFree, numNull);
00500    if (numFree < XrdCmsKeyItem::minFree)
00501       {myMutex.UnLock();
00502        if (!(numNull /= 4)) numNull = 1;
00503        numHave += XrdCmsKeyItem::minAlloc * numNull;
00504        while(numNull--)
00505             {myMutex.Lock();
00506              numFree = XrdCmsKeyItem::Replenish();
00507              myMutex.UnLock();
00508             }
00509       } else myMutex.UnLock();
00510 
00511 // Log the stats
00512 //
00513    sprintf(msgBuff, "%d cache items; %d allocated %d free",
00514            numRecycled, numHave, numFree);
00515    Say.Emsg("Recycle", msgBuff);
00516 }

Generated on Tue Jul 5 14:46:22 2011 for ROOT_528-00b_version by  doxygen 1.5.1