XrdCmsCluster.cc

Go to the documentation of this file.
00001 /******************************************************************************/
00002 /*                                                                            */
00003 /*                      X r d C m s C l u s t e r . c c                       */
00004 /*                                                                            */
00005 /* (c) 2007 by the Board of Trustees of the Leland Stanford, Jr., University  */
00006 /*                            All Rights Reserved                             */
00007 /*   Produced by Andrew Hanushevsky for Stanford University under contract    */
00008 /*              DE-AC02-76-SFO0515 with the Department of Energy              */
00009 /******************************************************************************/
00010 
00011 //         $Id: XrdCmsCluster.cc 38011 2011-02-08 18:35:57Z ganis $
00012 
00013 // Original Version: 1.38 2007/07/26 15:18:24 ganis
00014 
00015 const char *XrdCmsClusterCVSID = "$Id: XrdCmsCluster.cc 38011 2011-02-08 18:35:57Z ganis $";
00016 
00017 #include <errno.h>
00018 #include <fcntl.h>
00019 #include <stdio.h>
00020 #include <stdlib.h>
00021 #include <unistd.h>
00022 #include <netinet/in.h>
00023 #include <sys/types.h>
00024 
00025 #include "XProtocol/YProtocol.hh"
00026   
00027 #include "Xrd/XrdJob.hh"
00028 #include "Xrd/XrdLink.hh"
00029 #include "Xrd/XrdScheduler.hh"
00030 
00031 #include "XrdCms/XrdCmsCache.hh"
00032 #include "XrdCms/XrdCmsConfig.hh"
00033 #include "XrdCms/XrdCmsCluster.hh"
00034 #include "XrdCms/XrdCmsNode.hh"
00035 #include "XrdCms/XrdCmsState.hh"
00036 #include "XrdCms/XrdCmsSelect.hh"
00037 #include "XrdCms/XrdCmsTrace.hh"
00038 #include "XrdCms/XrdCmsTypes.hh"
00039 
00040 #include "XrdNet/XrdNetDNS.hh"
00041 
00042 #include "XrdOuc/XrdOucPup.hh"
00043 
00044 #include "XrdSys/XrdSysPlatform.hh"
00045 #include "XrdSys/XrdSysPthread.hh"
00046 #include "XrdSys/XrdSysTimer.hh"
00047 
00048 using namespace XrdCms;
00049 
00050 /******************************************************************************/
00051 /*                        G l o b a l   O b j e c t s                         */
00052 /******************************************************************************/
00053 
00054        XrdCmsCluster   XrdCms::Cluster;
00055 
00056 /******************************************************************************/
00057 /*                      L o c a l   S t r u c t u r e s                       */
00058 /******************************************************************************/
00059   
00060 class XrdCmsDrop : XrdJob
00061 {
00062 public:
00063 
00064      void DoIt() {Cluster.STMutex.Lock();
00065                   int rc = Cluster.Drop(nodeEnt, nodeInst, this);
00066                   Cluster.STMutex.UnLock();
00067                   if (!rc) delete this;
00068                  }
00069 
00070           XrdCmsDrop(int nid, int inst) : XrdJob("drop node")
00071                     {nodeEnt  = nid;
00072                      nodeInst = inst;
00073                      Sched->Schedule((XrdJob *)this, time(0)+Config.DRPDelay);
00074                     }
00075          ~XrdCmsDrop() {}
00076 
00077 int  nodeEnt;
00078 int  nodeInst;
00079 };
00080   
00081 /******************************************************************************/
00082 /*                           C o n s t r u c t o r                            */
00083 /******************************************************************************/
00084 
00085 XrdCmsCluster::XrdCmsCluster()
00086 {
00087      memset((void *)NodeTab, 0, sizeof(NodeTab));
00088      memset((void *)AltMans, (int)' ', sizeof(AltMans));
00089      cidFirst=  0;
00090      AltMend = AltMans;
00091      AltMent = -1;
00092      NodeCnt =  0;
00093      STHi    = -1;
00094      SelAcnt = 0;
00095      SelRcnt = 0;
00096      doReset = 0;
00097      resetMask = 0;
00098      peerHost  = 0;
00099      peerMask  = ~peerHost;
00100 }
00101   
00102 /******************************************************************************/
00103 /*                                   A d d                                    */
00104 /******************************************************************************/
00105   
00106 XrdCmsNode *XrdCmsCluster::Add(XrdLink *lp, int port, int Status,
00107                                int sport, const char *theNID)
00108 {
00109    EPNAME("Add")
00110    sockaddr InetAddr;
00111    const char *act = "";
00112    unsigned int ipaddr;
00113    XrdCmsNode *nP = 0;
00114    int Slot, Free = -1, Bump1 = -1, Bump2 = -1, Bump3 = -1, aSet = 0;
00115    int tmp, Special = (Status & (CMS_isMan|CMS_isPeer));
00116    XrdSysMutexHelper STMHelper(STMutex);
00117 
00118 // Establish our IP address
00119 //
00120    lp->Name(&InetAddr);
00121    ipaddr = XrdNetDNS::IPAddr(&InetAddr);
00122 
00123 // Find available slot for this node. Here are the priorities:
00124 // Slot  = Reconnecting node
00125 // Free  = Available slot           ( 1st in table)
00126 // Bump1 = Disconnected server      (last in table)
00127 // Bump2 = Connected    server      (last in table) if new one is managr/peer
00128 // Bump3 = Disconnected managr/peer ( 1st in table) if new one is managr/peer
00129 //
00130    for (Slot = 0; Slot < STMax; Slot++)
00131        if (NodeTab[Slot])
00132           {if (NodeTab[Slot]->isNode(ipaddr, theNID)) break;
00133 /*Conn*/   if (NodeTab[Slot]->isConn)
00134               {if (!NodeTab[Slot]->isPerm && Special)
00135                                              Bump2 = Slot; // Last conn Server
00136 /*Disc*/      } else {
00137                if ( NodeTab[Slot]->isPerm)
00138                   {if (Bump3 < 0 && Special) Bump3 = Slot;}//  1st disc Man/Pr
00139                   else                       Bump1 = Slot; // Last disc Server
00140               }
00141           } else if (Free < 0)               Free  = Slot; //  1st free slot
00142 
00143 // Check if node is already logged in or is a relogin
00144 //
00145    if (Slot < STMax)
00146       {if (NodeTab[Slot] && NodeTab[Slot]->isBound)
00147           {Say.Emsg("Cluster", lp->ID, "already logged in.");
00148            return 0;
00149           } else { // Rehook node to previous unconnected entry
00150            nP = NodeTab[Slot];
00151            nP->Link      = lp;
00152            nP->isOffline = 0;
00153            nP->isConn    = 1;
00154            nP->Instance++;
00155            nP->setName(lp, port);  // Just in case it changed
00156            act = "Re-added ";
00157           }
00158       }
00159 
00160 // Reuse an old ID if we must or redirect the incomming node
00161 //
00162    if (!nP) 
00163       {if (Free >= 0) Slot = Free;
00164           else {if (Bump1 >= 0) Slot = Bump1;
00165                    else Slot = (Bump2 >= 0 ? Bump2 : Bump3);
00166                 if (Slot < 0)
00167                    {if (Status & CMS_isPeer) Say.Emsg("Cluster", "Add peer", lp->ID,
00168                                                 "failed; too many subscribers.");
00169                        else {sendAList(lp);
00170                              DEBUG(lp->ID <<" redirected; too many subscribers.");
00171                             }
00172                     return 0;
00173                    }
00174 
00175                 if (Status & CMS_isMan) {setAltMan(Slot,ipaddr,sport); aSet=1;}
00176                 if (NodeTab[Slot] && !(Status & CMS_isPeer))
00177                    sendAList(NodeTab[Slot]->Link);
00178 
00179                 DEBUG(lp->ID << " bumps " << NodeTab[Slot]->Ident <<" #" <<Slot);
00180                 NodeTab[Slot]->Lock();
00181                 Remove("redirected", NodeTab[Slot], -1);
00182                 act = "Shoved ";
00183                }
00184        NodeTab[Slot] = nP = new XrdCmsNode(lp, port, theNID, 0, Slot);
00185       }
00186 
00187 // Indicate whether this snode can be redirected
00188 //
00189    nP->isPerm = (Status & (CMS_isMan | CMS_isPeer)) ? CMS_Perm : 0;
00190 
00191 // Assign new server
00192 //
00193    if (!aSet && (Status & CMS_isMan)) setAltMan(Slot, ipaddr, sport);
00194    if (Slot > STHi) STHi = Slot;
00195    nP->isBound   = 1;
00196    nP->isConn    = 1;
00197    nP->isNoStage = (Status & CMS_noStage);
00198    nP->isSuspend = (Status & CMS_Suspend);
00199    nP->isMan     = (Status & CMS_isMan);
00200    nP->isPeer    = (Status & CMS_isPeer);
00201    nP->isDisable = 1;
00202    NodeCnt++;
00203    if (Config.SUPLevel
00204    && (tmp = NodeCnt*Config.SUPLevel/100) > Config.SUPCount)
00205       {Config.SUPCount=tmp; CmsState.Set(tmp);}
00206 
00207 // Compute new peer mask, as needed
00208 //
00209    if (nP->isPeer) peerHost |=  nP->NodeMask;
00210       else         peerHost &= ~nP->NodeMask;
00211    peerMask = ~peerHost;
00212 
00213 // Assign a unique cluster number
00214 //
00215    nP->myCNUM = Assign(nP->myCID);
00216 
00217 // Document login
00218 //
00219    DEBUG(act <<nP->Ident <<" to cluster " <<nP->myCNUM <<" slot "
00220          <<Slot <<'.' <<nP->Instance <<" (n=" <<NodeCnt <<" m="
00221          <<Config.SUPCount <<") ID=" <<nP->myNID);
00222 
00223 // Compute new state of all nodes if we are a reporting manager.
00224 //
00225    if (Config.asManager()) 
00226       CmsState.Update(XrdCmsState::Counts,nP->isSuspend?0:1,nP->isNoStage?0:1);
00227 
00228 // All done
00229 //
00230    return nP;
00231 }
00232 
00233 /******************************************************************************/
00234 /*                             B r o a d c a s t                              */
00235 /******************************************************************************/
00236 
00237 SMask_t XrdCmsCluster::Broadcast(SMask_t smask, const struct iovec *iod,
00238                                  int iovcnt, int iotot)
00239 {
00240    EPNAME("Broadcast")
00241    int i;
00242    XrdCmsNode *nP;
00243    SMask_t bmask, unQueried(0);
00244 
00245 // Obtain a lock on the table and screen out peer nodes
00246 //
00247    STMutex.Lock();
00248    bmask = smask & peerMask;
00249 
00250 // Run through the table looking for nodes to send messages to
00251 //
00252    for (i = 0; i <= STHi; i++)
00253        {if ((nP = NodeTab[i]) && nP->isNode(bmask))
00254            {nP->Lock();
00255             STMutex.UnLock();
00256             if (nP->Send(iod, iovcnt, iotot) < 0) 
00257                {unQueried |= nP->Mask();
00258                 DEBUG(nP->Ident <<" is unreachable");
00259                }
00260             nP->UnLock();
00261             STMutex.Lock();
00262            }
00263        }
00264    STMutex.UnLock();
00265    return unQueried;
00266 }
00267 
00268 /******************************************************************************/
00269 
00270 SMask_t XrdCmsCluster::Broadcast(SMask_t smask, XrdCms::CmsRRHdr &Hdr,
00271                                  char *Data,    int Dlen)
00272 {
00273    struct iovec ioV[3], *iovP = &ioV[1];
00274    unsigned short Temp;
00275    int Blen;
00276 
00277 // Construct packed data for the character argument. If data is a string then
00278 // Dlen must include the null byte if it is specified at all.
00279 //
00280    Blen  = XrdOucPup::Pack(&iovP, Data, Temp, (Dlen ? strlen(Data)+1 : Dlen));
00281    Hdr.datalen = htons(static_cast<unsigned short>(Blen));
00282 
00283 // Complete the iovec and send off the data
00284 //
00285    ioV[0].iov_base = (char *)&Hdr; ioV[0].iov_len = sizeof(Hdr);
00286    return Broadcast(smask, ioV, 3, Blen+sizeof(Hdr));
00287 }
00288 
00289 /******************************************************************************/
00290 
00291 SMask_t XrdCmsCluster::Broadcast(SMask_t smask, XrdCms::CmsRRHdr &Hdr,
00292                                  void *Data,    int Dlen)
00293 {
00294    struct iovec ioV[2] = {{(char *)&Hdr, sizeof(Hdr)}, {(char *)Data, Dlen}};
00295 
00296 // Send of the data as eveything was constructed properly
00297 //
00298    Hdr.datalen = htons(static_cast<unsigned short>(Dlen));
00299    return Broadcast(smask, ioV, 2, Dlen+sizeof(Hdr));
00300 }
00301 
00302 /******************************************************************************/
00303 /*                               g e t M a s k                                */
00304 /******************************************************************************/
00305 
00306 SMask_t XrdCmsCluster::getMask(unsigned int IPv4adr)
00307 {
00308    int i;
00309    XrdCmsNode *nP;
00310    SMask_t smask(0);
00311 
00312 // Obtain a lock on the table
00313 //
00314    STMutex.Lock();
00315 
00316 // Run through the table looking for a node with matching IP address
00317 //
00318    for (i = 0; i <= STHi; i++)
00319        if ((nP = NodeTab[i]) && nP->isNode(IPv4adr))
00320           {smask = nP->NodeMask; break;}
00321 
00322 // All done
00323 //
00324    STMutex.UnLock();
00325    return smask;
00326 }
00327 
00328 /******************************************************************************/
00329 
00330 SMask_t XrdCmsCluster::getMask(const char *Cid)
00331 {
00332    XrdCmsNode *nP;
00333    SMask_t smask(0);
00334    XrdOucTList *cP;
00335    int i = 1, Cnum = -1;
00336 
00337 // Lock the cluster ID list
00338 //
00339    cidMutex.Lock();
00340 
00341 // Now find the cluster
00342 //
00343    cP = cidFirst;
00344    while(cP && (i = strcmp(Cid, cP->text)) < 0) cP = cP->next;
00345 
00346 // If we didn't find the cluster, return a mask of zeroes
00347 //
00348    if (cP) Cnum = cP->val;
00349    cidMutex.UnLock();
00350    if (i) return smask;
00351 
00352 // Obtain a lock on the table
00353 //
00354    STMutex.Lock();
00355 
00356 // Run through the table looking for a node with matching cluster number
00357 //
00358    for (i = 0; i <= STHi; i++)
00359        if ((nP = NodeTab[i]) && nP->myCNUM == Cnum) smask |= nP->NodeMask;
00360 
00361 // All done
00362 //
00363    STMutex.UnLock();
00364    return smask;
00365 }
00366 
00367 /******************************************************************************/
00368 /*                                  L i s t                                   */
00369 /******************************************************************************/
00370   
00371 XrdCmsSelected *XrdCmsCluster::List(SMask_t mask, CmsLSOpts opts)
00372 {
00373     const char *reason;
00374     int i, iend, nump, delay, lsall = opts & LS_All;
00375     XrdCmsNode     *nP;
00376     XrdCmsSelected *sipp = 0, *sip;
00377 
00378 // If only one wanted, the select appropriately
00379 //
00380    STMutex.Lock();
00381    iend = (opts & LS_Best ? 0 : STHi);
00382    for (i = 0; i <= iend; i++)
00383        {if (opts & LS_Best)
00384             nP = (Config.sched_RR
00385                  ? SelbyRef( mask, nump, delay, &reason, 0)
00386                  : SelbyLoad(mask, nump, delay, &reason, 0));
00387            else if ((nP=NodeTab[i]) && !lsall && !(nP->NodeMask & mask)) nP=0;
00388         if (nP)
00389            {sip = new XrdCmsSelected((opts & LS_IPO) ? 0 : nP->Name(), sipp);
00390             if (opts & LS_IPV6)
00391                {sip->IPV6Len = nP->IPV6Len;
00392                 strcpy(sip->IPV6, nP->IPV6);
00393                }
00394             sip->Mask    = nP->NodeMask;
00395             sip->Id      = nP->NodeID;
00396             sip->IPAddr  = nP->IPAddr;
00397             sip->Port    = nP->Port;
00398             sip->Load    = nP->myLoad;
00399             sip->Util    = nP->DiskUtil;
00400             sip->RefTotA = nP->RefTotA + nP->RefA;
00401             sip->RefTotR = nP->RefTotR + nP->RefR;
00402             if (nP->isOffline) sip->Status  = XrdCmsSelected::Offline;
00403                else sip->Status  = 0;
00404             if (nP->isDisable) sip->Status |= XrdCmsSelected::Disable;
00405             if (nP->isNoStage) sip->Status |= XrdCmsSelected::NoStage;
00406             if (nP->isSuspend) sip->Status |= XrdCmsSelected::Suspend;
00407             if (nP->isRW     ) sip->Status |= XrdCmsSelected::isRW;
00408             if (nP->isMan    ) sip->Status |= XrdCmsSelected::isMangr;
00409             if (nP->isPeer   ) sip->Status |= XrdCmsSelected::isPeer;
00410             if (nP->isProxy  ) sip->Status |= XrdCmsSelected::isProxy;
00411             nP->UnLock();
00412             sipp = sip;
00413            }
00414        }
00415    STMutex.UnLock();
00416 
00417 // Return result
00418 //
00419    return sipp;
00420 }
00421   
00422 /******************************************************************************/
00423 /*                                L o c a t e                                 */
00424 /******************************************************************************/
00425 
00426 int XrdCmsCluster::Locate(XrdCmsSelect &Sel)
00427 {
00428    EPNAME("Locate");
00429    XrdCmsPInfo   pinfo;
00430    SMask_t       qfVec(0);
00431    char         *Path;
00432    int           retc = 0;
00433 
00434 // Check if this is a locate for all current servers
00435 //
00436    if (*Sel.Path.Val != '*') Path = Sel.Path.Val;
00437       else {if (*(Sel.Path.Val+1) == '\0')
00438                {Sel.Vec.hf = ~0LL; Sel.Vec.pf = Sel.Vec.wf = 0;
00439                 return 0;
00440                }
00441             Path = Sel.Path.Val+1;
00442            }
00443 
00444 // Find out who serves this path
00445 //
00446    if (!Cache.Paths.Find(Path, pinfo) || !pinfo.rovec)
00447       {Sel.Vec.hf = Sel.Vec.pf = Sel.Vec.wf = 0;
00448        return -1;
00449       } else Sel.Vec.wf = pinfo.rwvec;
00450 
00451 // Check if this was a non-lookup request
00452 //
00453    if (*Sel.Path.Val == '*')
00454       {Sel.Vec.hf = pinfo.rovec; Sel.Vec.pf = 0;
00455        Sel.Vec.wf = pinfo.rwvec;
00456        return 0;
00457       }
00458 
00459 // Complete the request info object if we have one
00460 //
00461    if (Sel.InfoP)
00462       {Sel.InfoP->rwVec = pinfo.rwvec;
00463        Sel.InfoP->isLU  = 1;
00464       }
00465 
00466 // First check if we have seen this file before. If so, get nodes that have it.
00467 // A Refresh request kills this because it's as if we hadn't seen it before.
00468 // If the file was found but either a query is in progress or we have a server
00469 // bounce; the client must wait.
00470 //
00471    if (Sel.Opts & XrdCmsSelect::Refresh 
00472    || !(retc = Cache.GetFile(Sel, pinfo.rovec)))
00473       {Cache.AddFile(Sel, 0);
00474        qfVec = pinfo.rovec; Sel.Vec.hf = 0;
00475       } else qfVec = Sel.Vec.bf;
00476 
00477 // Compute the delay, if any
00478 //
00479    if ((!qfVec && retc >= 0) || (Sel.Vec.hf && Sel.InfoP)) retc =  0;
00480       else if (!(retc = Cache.WT4File(Sel, Sel.Vec.hf)))   retc = -2;
00481 
00482 // Check if we have to ask any nodes if they have the file
00483 //
00484    if (qfVec)
00485       {CmsStateRequest QReq = {{Sel.Path.Hash, kYR_state, kYR_raw, 0}};
00486        if (Sel.Opts & XrdCmsSelect::Refresh)
00487           QReq.Hdr.modifier |= CmsStateRequest::kYR_refresh;
00488        TRACE(Files, "seeking " <<Sel.Path.Val);
00489        qfVec = Cluster.Broadcast(qfVec, QReq.Hdr, 
00490                                  (void *)Sel.Path.Val, Sel.Path.Len+1);
00491        if (qfVec) Cache.UnkFile(Sel, qfVec);
00492       }
00493    return retc;
00494 }
00495   
00496 /******************************************************************************/
00497 /*                               M o n P e r f                                */
00498 /******************************************************************************/
00499   
00500 void *XrdCmsCluster::MonPerf()
00501 {
00502    CmsUsageRequest Usage = {{0, kYR_usage, 0, 0}};
00503    struct iovec ioV[] = {{(char *)&Usage, sizeof(Usage)}};
00504    int ioVnum = sizeof(ioV)/sizeof(struct iovec);
00505    int ioVtot = sizeof(Usage);
00506    SMask_t allNodes(~0);
00507    int uInterval = Config.AskPing*Config.AskPerf;
00508 
00509 // Sleep for the indicated amount of time, then ask for load on each server
00510 //
00511    while(uInterval)
00512         {XrdSysTimer::Snooze(uInterval);
00513          Broadcast(allNodes, ioV, ioVnum, ioVtot);
00514         }
00515    return (void *)0;
00516 }
00517   
00518 /******************************************************************************/
00519 /*                               M o n R e f s                                */
00520 /******************************************************************************/
00521   
00522 void *XrdCmsCluster::MonRefs()
00523 {
00524    XrdCmsNode *nP;
00525    int  i, snooze_interval = 10*60, loopmax, loopcnt = 0;
00526    int resetA, resetR, resetAR;
00527 
00528 // Compute snooze interval
00529 //
00530    if ((loopmax = Config.RefReset / snooze_interval) <= 1)
00531       {if (!Config.RefReset) loopmax = 0;
00532           else {loopmax = 1; snooze_interval = Config.RefReset;}
00533       }
00534 
00535 // Sleep for the snooze interval. If a reset was requested then do a selective
00536 // reset unless we reached our snooze maximum and enough selections have gone
00537 // by; in which case, do a global reset.
00538 //
00539    do {XrdSysTimer::Snooze(snooze_interval);
00540        loopcnt++;
00541        STMutex.Lock();
00542        resetA  = (SelAcnt >= Config.RefTurn);
00543        resetR  = (SelRcnt >= Config.RefTurn);
00544        resetAR = (loopmax && loopcnt >= loopmax && (resetA || resetR));
00545        if (doReset || resetAR)
00546            {for (i = 0; i <= STHi; i++)
00547                 if ((nP = NodeTab[i])
00548                 &&  (resetAR || (doReset && nP->isNode(resetMask))) )
00549                     {nP->Lock();
00550                      if (resetA || doReset) {nP->RefTotA += nP->RefA;nP->RefA=0;}
00551                      if (resetR || doReset) {nP->RefTotR += nP->RefR;nP->RefR=0;}
00552                      nP->UnLock();
00553                     }
00554             if (resetAR)
00555                {if (resetA) SelAcnt = 0;
00556                 if (resetR) SelRcnt = 0;
00557                 loopcnt = 0;
00558                }
00559             if (doReset) {doReset = 0; resetMask = 0;}
00560            }
00561        STMutex.UnLock();
00562       } while(1);
00563    return (void *)0;
00564 }
00565 
00566 /******************************************************************************/
00567 /*                                R e m o v e                                 */
00568 /******************************************************************************/
00569 
00570 // Warning! The node object must be locked upon entry. The lock is released
00571 //          prior to returning to the caller. This entry obtains the node
00572 //          table lock. When immed != 0 then the node is immediately dropped.
00573 //          When immed if < 0 then the caller already holds the STMutex and it 
00574 //          is not released upon exit.
00575 
00576 void XrdCmsCluster::Remove(const char *reason, XrdCmsNode *theNode, int immed)
00577 {
00578    EPNAME("Remove_Node")
00579    struct theLocks
00580           {XrdSysMutex *myMutex;
00581            XrdCmsNode  *myNode;
00582            char        *myIdent;
00583            int          myImmed;
00584            int          myNID;
00585            int          myInst;
00586 
00587                        theLocks(XrdSysMutex *mtx, XrdCmsNode *node, int immed)
00588                                : myMutex(mtx), myNode(node), myImmed(immed)
00589                                {myIdent = strdup(node->Ident);
00590                                 myNID = node->ID(myInst);
00591                                 if (myImmed >= 0)
00592                                    {myNode->UnLock();
00593                                     myMutex->Lock();
00594                                     myNode->Lock();
00595                                    }
00596                                }
00597                       ~theLocks()
00598                                {if (myImmed >= 0) myMutex->UnLock();
00599                                 if (myNode) myNode->UnLock();
00600                                 free(myIdent);
00601                                }
00602           } LockHandler(&STMutex, theNode, immed);
00603 
00604    int Inst, NodeID = theNode->ID(Inst);
00605 
00606 // The LockHandler makes sure that the proper locks are obtained in a deadlock
00607 // free order. However, this may require that the node lock be released and
00608 // then re-aquired. We check if we are still dealing with same node at entry.
00609 // If not, issue message and high-tail it out.
00610 //
00611    if (LockHandler.myNID != NodeID || LockHandler.myInst != Inst)
00612       {Say.Emsg("Manager", LockHandler.myIdent, "removal aborted.");
00613        DEBUG(LockHandler.myIdent <<" node " <<NodeID <<'.' <<Inst <<" != "
00614              << LockHandler.myNID <<'.' <<LockHandler.myInst <<" at entry.");
00615       }
00616 
00617 // Mark node as being offline
00618 //
00619    theNode->isOffline = 1;
00620 
00621 // If the node is connected the simply close the connection. This will cause
00622 // the connection handler to re-initiate the node removal. The LockHandler
00623 // destructor will release the node table and node object locks as needed.
00624 // This condition exists only if one node is being displaced by another node.
00625 //
00626    if (theNode->isConn)
00627       {theNode->Disc(reason, 0);
00628        theNode->isGone = 1;
00629        return;
00630       }
00631 
00632 
00633 // If the node is part of the cluster, do not count it anymore and
00634 // indicate new state of this nodes if we are a reporting manager
00635 //
00636    if (theNode->isBound)
00637       {theNode->isBound = 0; NodeCnt--;
00638        if (Config.asManager())
00639           CmsState.Update(XrdCmsState::Counts, theNode->isSuspend ? 0 : -1,
00640                                                theNode->isNoStage ? 0 : -1);
00641       }
00642 
00643 // If this is an immediate drop request, do so now. Drop() will delete
00644 // the node object and remove the node lock. So, tell LockHandler that.
00645 //
00646    if (immed || !Config.DRPDelay) 
00647       {Drop(NodeID, Inst);
00648        LockHandler.myNode = 0;
00649        return;
00650       }
00651 
00652 // If a drop job is already scheduled, update the instance field. Otherwise,
00653 // Schedule a node drop at a future time.
00654 //
00655    theNode->DropTime = time(0)+Config.DRPDelay;
00656    if (theNode->DropJob) theNode->DropJob->nodeInst = Inst;
00657       else theNode->DropJob = new XrdCmsDrop(NodeID, Inst);
00658 
00659 // Document removal
00660 //
00661    if (reason) 
00662       Say.Emsg("Manager", theNode->Ident, "scheduled for removal;", reason);
00663       else DEBUG(theNode->Ident <<" node " <<NodeID <<'.' <<Inst);
00664 }
00665 
00666 /******************************************************************************/
00667 /*                              R e s e t R e f                               */
00668 /******************************************************************************/
00669   
00670 void XrdCmsCluster::ResetRef(SMask_t smask)
00671 {
00672 
00673 // Obtain a lock on the table
00674 //
00675    STMutex.Lock();
00676 
00677 // Inform the reset thread that we need a reset
00678 //
00679    doReset = 1;
00680    resetMask |= smask;
00681 
00682 // Unlock table and exit
00683 //
00684    STMutex.UnLock();
00685 }
00686 
00687 /******************************************************************************/
00688 /*                                S e l e c t                                 */
00689 /******************************************************************************/
00690   
00691 int XrdCmsCluster::Select(XrdCmsSelect &Sel)
00692 {
00693    EPNAME("Select");
00694    XrdCmsPInfo  pinfo;
00695    const char  *Amode;
00696    int dowt = 0, retc, isRW, fRD, noSel = (Sel.Opts & XrdCmsSelect::Defer);
00697    SMask_t amask, smask, pmask;
00698 
00699 // Establish some local options
00700 //
00701    if (Sel.Opts & XrdCmsSelect::Write) 
00702       {isRW = 1; Amode = "write";
00703        if (Config.RWDelay)
00704           if (Sel.Opts & XrdCmsSelect::Create && Config.RWDelay < 2) fRD = 1;
00705              else fRD = 0;
00706           else fRD = 1;
00707       }
00708       else {isRW = 0; Amode = "read"; fRD = 1;}
00709 
00710 // Find out who serves this path
00711 //
00712    if (!Cache.Paths.Find(Sel.Path.Val, pinfo)
00713    || (amask = ((isRW ? pinfo.rwvec : pinfo.rovec) & ~Sel.nmask)) == 0)
00714       {Sel.Resp.DLen = snprintf(Sel.Resp.Data, sizeof(Sel.Resp.Data)-1,
00715                        "No servers have %s access to the file", Amode)+1;
00716        return -1;
00717       }
00718 
00719 // If either a refresh is wanted or we didn't find the file, re-prime the cache
00720 // which will force the client to wait. Otherwise, compute the primary and
00721 // secondary selections. If there are none, the client may have to wait if we
00722 // have servers that we can query regarding the file. Note that for files being
00723 // opened in write mode, only one writable copy may exist unless this is a
00724 // meta-operation (e.g., remove) in which case the file itself remain unmodified
00725 // or a replica request, in which case we select a new target server.
00726 //
00727    if (!(Sel.Opts & XrdCmsSelect::Refresh)
00728    &&   (retc = Cache.GetFile(Sel, pinfo.rovec)))
00729       {if (isRW)
00730           {     if (Sel.Opts & XrdCmsSelect::Replica)
00731                    {pmask = amask & ~(Sel.Vec.hf | Sel.Vec.bf); smask = 0;
00732                     if (!pmask && !Sel.Vec.bf) return SelFail(Sel,eNoRep);
00733                    }
00734            else if (Sel.Vec.bf) pmask = smask = 0;
00735            else if (Sel.Vec.hf)
00736                    {if (Sel.Opts & XrdCmsSelect::NewFile) return SelFail(Sel,eExists);
00737                     if (!(Sel.Opts & XrdCmsSelect::isMeta)
00738                     &&  Multiple(Sel.Vec.hf))             return SelFail(Sel,eDups);
00739                     if (!(pmask = Sel.Vec.hf & amask))    return SelFail(Sel,eROfs);
00740                     smask = 0;
00741                    }
00742            else if (Sel.Opts & XrdCmsSelect::Trunc) {pmask = amask; smask = 0;}
00743            else    pmask = ((smask = pinfo.ssvec & amask) ? 0 : amask);
00744           } else {
00745            pmask = Sel.Vec.hf  & amask; 
00746            if (Sel.Opts & XrdCmsSelect::Online) {pmask &= ~Sel.Vec.pf; smask=0;}
00747               else smask = pinfo.ssvec & amask;
00748           }
00749        if (Sel.Vec.hf & Sel.nmask) Cache.UnkFile(Sel, Sel.nmask);
00750       } else {
00751        Cache.AddFile(Sel, 0); 
00752        Sel.Vec.bf = pinfo.rovec; 
00753        Sel.Vec.hf = Sel.Vec.pf = pmask = smask = 0;
00754        retc = 0;
00755       }
00756 
00757 // A wait is required if we don't have any primary or seconday servers
00758 //
00759    dowt = (!pmask && !smask);
00760 
00761 // If we can query additional servers, do so now. The client will be placed
00762 // in the callback queue only if we have no possible selections
00763 //
00764    if (Sel.Vec.bf)
00765       {CmsStateRequest QReq = {{Sel.Path.Hash, kYR_state, kYR_raw, 0}};
00766        if (Sel.Opts & XrdCmsSelect::Refresh)
00767           QReq.Hdr.modifier |= CmsStateRequest::kYR_refresh;
00768        if (dowt) retc= (fRD ? Cache.WT4File(Sel,Sel.Vec.hf) : Config.LUPDelay);
00769        TRACE(Files, "seeking " <<Sel.Path.Val);
00770        amask = Cluster.Broadcast(Sel.Vec.bf, QReq.Hdr,
00771                                  (void *)Sel.Path.Val,Sel.Path.Len+1);
00772        if (amask) Cache.UnkFile(Sel, amask);
00773        if (dowt) return retc;
00774       } else if (dowt && retc < 0 && !noSel)
00775                 return (fRD ? Cache.WT4File(Sel,Sel.Vec.hf) : Config.LUPDelay);
00776 
00777 // Broadcast a freshen up request if wanted
00778 //
00779    if ((Sel.Opts & XrdCmsSelect::Freshen) && (amask = pmask & ~Sel.Vec.bf))
00780       {CmsStateRequest Qupt={{0,kYR_state,kYR_raw|CmsStateRequest::kYR_noresp,0}};
00781        Cluster.Broadcast(amask, Qupt.Hdr,(void *)Sel.Path.Val,Sel.Path.Len+1);
00782       }
00783 
00784 // If we need to defer selection, simply return as this is a mindless prepare
00785 //
00786    if (noSel) return 0;
00787 
00788 // Select a node
00789 //
00790    if (dowt || (retc = SelNode(Sel, pmask, smask)) < 0)
00791       {Sel.Resp.DLen = snprintf(Sel.Resp.Data, sizeof(Sel.Resp.Data)-1,
00792                        "No servers are available to %s%s the file.",
00793                        Sel.Opts & XrdCmsSelect::Online ? "immediately " : "",
00794                        (smask ? "stage" : Amode))+1;
00795        return -1;
00796       }
00797 
00798 // All done
00799 //
00800    return retc;
00801 }
00802 
00803 /******************************************************************************/
00804   
00805 int XrdCmsCluster::Select(int isrw, SMask_t pmask,
00806                           int &port, char *hbuff, int &hlen)
00807 {
00808    static const SMask_t smLow(255);
00809    XrdCmsNode *nP = 0;
00810    SMask_t tmask;
00811    int Snum = 0;
00812 
00813 // Compute the a single node number that is contained in the mask
00814 //
00815    if (!pmask) return 0;
00816    do {if (!(tmask = pmask & smLow)) Snum += 8;
00817          else {while((tmask = tmask>>1)) Snum++; break;}
00818       } while((pmask = pmask >> 8));
00819 
00820 // See if the node passes muster
00821 //
00822    STMutex.Lock();
00823    if ((nP = NodeTab[Snum]))
00824       {if (nP->isOffline || nP->isSuspend || nP->isDisable)      nP = 0;
00825           else if (!Config.sched_RR
00826                && (nP->myLoad > Config.MaxLoad))                 nP = 0;
00827        if (nP)
00828           {if (isrw)
00829               if (nP->isNoStage || nP->DiskFree < nP->DiskMinF)  nP = 0;
00830                  else {SelAcnt++; nP->Lock();}
00831               else     {SelRcnt++; nP->Lock();}
00832           }
00833       }
00834    STMutex.UnLock();
00835 
00836 // At this point either we have a node or we do not
00837 //
00838    if (nP)
00839       {strcpy(hbuff, nP->Name(hlen, port));
00840        nP->RefR++;
00841        nP->UnLock();
00842        return 1;
00843       }
00844    return 0;
00845 }
00846 
00847 /******************************************************************************/
00848 /*                               S e l F a i l                                */
00849 /******************************************************************************/
00850   
00851 int XrdCmsCluster::SelFail(XrdCmsSelect &Sel, int rc)
00852 {
00853     const char *etext = (rc == eExists
00854                       ? "Unable to create new file; file already exists."
00855                       : (rc == eROfs)
00856                       ? "Unable to write file; r/o file already exists."
00857                       : (rc == eDups)
00858                       ? "Unable to write file; multiple files exist."
00859                       : "Unable to replicate file; no new sites available.");
00860 
00861     Sel.Resp.DLen = strlcpy(Sel.Resp.Data, etext, sizeof(Sel.Resp.Data))+1;
00862     return -1;
00863 }
00864   
00865 /******************************************************************************/
00866 /*                                 S p a c e                                  */
00867 /******************************************************************************/
00868   
00869 void XrdCmsCluster::Space(SpaceData &sData, SMask_t smask)
00870 {
00871    int i;
00872    XrdCmsNode *nP;
00873    SMask_t bmask;
00874 
00875 // Obtain a lock on the table and screen out peer nodes
00876 //
00877    STMutex.Lock();
00878    bmask = smask & peerMask;
00879 
00880 // Run through the table getting space information
00881 //
00882    for (i = 0; i <= STHi; i++)
00883        if ((nP = NodeTab[i]) && nP->isNode(bmask)
00884        &&  !nP->isOffline    && nP->isRW)
00885           {sData.Total += nP->DiskTotal;
00886            sData.sNum++;
00887            if (sData.sFree < nP->DiskFree)
00888               {sData.sFree = nP->DiskFree; sData.sUtil = nP->DiskUtil;}
00889            if (nP->isRW & XrdCmsNode::allowsRW)
00890               {sData.wNum++;
00891                if (sData.wFree < nP->DiskFree)
00892                   {sData.wFree = nP->DiskFree; sData.wUtil = nP->DiskUtil;
00893                    sData.wMinF = nP->DiskMinF;
00894                   }
00895               }
00896           }
00897    STMutex.UnLock();
00898 }
00899 
00900 /******************************************************************************/
00901 /*                                 S t a t s                                  */
00902 /******************************************************************************/
00903   
00904 int XrdCmsCluster::Stats(char *bfr, int bln)
00905 {
00906    static const char statfmt1[] = "<stats id=\"cms\"><name>%s</name>";
00907    static const char statfmt2[] = "<subscriber><name>%s</name>"
00908           "<status>%s</status><load>%d</load><diskfree>%d</diskfree>"
00909           "<refa>%d</refa><refr>%d</refr></subscriber>";
00910    static const char statfmt3[] = "</stats>\n";
00911    XrdCmsSelected *sp;
00912    int mlen, tlen = sizeof(statfmt3);
00913    char stat[6], *stp;
00914 
00915    class spmngr {
00916          public: XrdCmsSelected *sp;
00917 
00918                  spmngr() {sp = 0;}
00919                 ~spmngr() {XrdCmsSelected *xsp;
00920                            while((xsp = sp)) {sp = sp->next; delete xsp;}
00921                           }
00922                 } mngrsp;
00923 
00924 // Check if actual length wanted
00925 //
00926    if (!bfr) return  sizeof(statfmt1) + 256  +
00927                     (sizeof(statfmt2) + 20*4 + 256) * STMax +
00928                      sizeof(statfmt3) + 1;
00929 
00930 // Get the statistics
00931 //
00932    mngrsp.sp = sp = List(FULLMASK, LS_All);
00933 
00934 // Format the statistics
00935 //
00936    mlen = snprintf(bfr, bln, statfmt1, Config.myName);
00937    if ((bln -= mlen) <= 0) return 0;
00938    tlen += mlen;
00939 
00940    while(sp && bln)
00941         {stp = stat;
00942          if (sp->Status)
00943             {if (sp->Status & XrdCmsSelected::Offline) *stp++ = 'o';
00944              if (sp->Status & XrdCmsSelected::Suspend) *stp++ = 's';
00945              if (sp->Status & XrdCmsSelected::NoStage) *stp++ = 'n';
00946              if (sp->Status & XrdCmsSelected::Disable) *stp++ = 'd';
00947             } else *stp++ = 'a';
00948          bfr += mlen;
00949          mlen = snprintf(bfr, bln, statfmt2, sp->Name, stat,
00950                 sp->Load, sp->Free, sp->RefTotA, sp->RefTotR);
00951          bln  -= mlen;
00952          tlen += mlen;
00953          sp = sp->next;
00954         }
00955 
00956 // See if we overflowed. otherwise finish up
00957 //
00958    if (sp || bln < (int)sizeof(statfmt1)) return 0;
00959    bfr += mlen;
00960    strcpy(bfr, statfmt3);
00961    return tlen;
00962 }
00963   
00964 /******************************************************************************/
00965 /*                       P r i v a t e   M e t h o d s                        */
00966 /******************************************************************************/
00967 /******************************************************************************/
00968 /*                                A s s i g n                                 */
00969 /******************************************************************************/
00970   
00971 int XrdCmsCluster::Assign(const char *Cid)
00972 {
00973    static int cNum = 0;
00974    XrdOucTList *cP, *cPP, *cNew;
00975    int n = -1;
00976 
00977 // Lock the cluster ID list
00978 //
00979    cidMutex.Lock();
00980 
00981 // Now find the cluster
00982 //
00983    cP = cidFirst; cPP = 0;
00984    while(cP && (n = strcmp(Cid, cP->text)) < 0) {cPP = cP; cP = cP->next;}
00985 
00986 // If an exiting cluster simply return the cluster number
00987 //
00988    if (!n && cP) {n = cP->val; cidMutex.UnLock(); return n;}
00989 
00990 // Add this cluster
00991 //
00992    n = ++cNum;
00993    cNew = new XrdOucTList(Cid, cNum, cP);
00994    if (cPP) cPP->next = cNew;
00995       else  cidFirst  = cNew;
00996 
00997 // Return the cluster number
00998 //
00999    cidMutex.UnLock();
01000    return n;
01001 }
01002 
01003 /******************************************************************************/
01004 /*                             c a l c D e l a y                              */
01005 /******************************************************************************/
01006   
01007 XrdCmsNode *XrdCmsCluster::calcDelay(int nump, int numd, int numf, int numo,
01008                                      int nums, int &delay, const char **reason)
01009 {
01010         if (!nump) {delay = 0;
01011                     *reason = "no eligible servers for";
01012                    }
01013    else if (numf)  {delay = Config.DiskWT;
01014                     *reason = "no eligible servers have space for";
01015                    }
01016    else if (numo)  {delay = Config.MaxDelay;
01017                     *reason = "eligible servers overloaded for";
01018                    }
01019    else if (nums)  {delay = Config.SUSDelay;
01020                     *reason = "eligible servers suspended for";
01021                    }
01022    else if (numd)  {delay = Config.SUPDelay;
01023                     *reason = "eligible servers offline for";
01024                    }
01025    else            {delay = Config.SUPDelay;
01026                     *reason = "server selection error for";
01027                    }
01028    return (XrdCmsNode *)0;
01029 }
01030 
01031 /******************************************************************************/
01032 /*                                  D r o p                                   */
01033 /******************************************************************************/
01034   
01035 // Warning: STMutex must be locked upon entry; the caller must release it.
01036 //          This method may only be called via Remove() either directly or via
01037 //          a deferred job scheduled by that method. This method actually
01038 //          deletes the node object.
01039 
01040 int XrdCmsCluster::Drop(int sent, int sinst, XrdCmsDrop *djp)
01041 {
01042    EPNAME("Drop_Node")
01043    XrdCmsNode *nP;
01044    char hname[512];
01045 
01046 // Make sure this node is the right one
01047 //
01048    if (!(nP = NodeTab[sent]) || nP->Inst() != sinst)
01049       {if (nP && djp == nP->DropJob) {nP->DropJob = 0; nP->DropTime = 0;}
01050        DEBUG(sent <<'.' <<sinst <<" cancelled.");
01051        return 0;
01052       }
01053 
01054 // Check if the drop has been rescheduled
01055 //
01056    if (djp && time(0) < nP->DropTime)
01057       {Sched->Schedule((XrdJob *)djp, nP->DropTime);
01058        return 1;
01059       }
01060 
01061 // Save the node name (don't want to hold a lock across a message)
01062 //
01063    strlcpy(hname, nP->Ident, sizeof(hname));
01064 
01065 // Remove node from the node table
01066 //
01067    NodeTab[sent] = 0;
01068    nP->isOffline = 1;
01069    nP->DropTime  = 0;
01070    nP->DropJob   = 0;
01071    nP->isBound   = 0;
01072 
01073 // Remove node from the peer list (if it is one)
01074 //
01075    if (nP->isPeer) {peerHost &= nP->NodeMask; peerMask = ~peerHost;}
01076 
01077 // Remove node entry from the alternate list and readjust the end pointer.
01078 //
01079    if (nP->isMan)
01080       {memset((void *)&AltMans[sent*AltSize], (int)' ', AltSize);
01081        if (sent == AltMent)
01082           {AltMent--;
01083            while(AltMent >= 0 &&  NodeTab[AltMent]
01084                               && !NodeTab[AltMent]->isMan) AltMent--;
01085            if (AltMent < 0) AltMend = AltMans;
01086               else AltMend = AltMans + ((AltMent+1)*AltSize);
01087           }
01088       }
01089 
01090 // Readjust STHi
01091 //
01092    if (sent == STHi) while(STHi >= 0 && !NodeTab[STHi]) STHi--;
01093 
01094 // Invalidate any cached entries for this node
01095 //
01096    if (nP->NodeMask) Cache.Drop(nP->NodeMask, sent, STHi);
01097 
01098 // Document the drop
01099 //
01100    Say.Emsg("Drop_Node", hname, "dropped.");
01101 
01102 // Delete the node object
01103 //
01104    delete nP;
01105    return 0;
01106 }
01107 
01108 /******************************************************************************/
01109 /*                              M u l t i p l e                               */
01110 /******************************************************************************/
01111 
01112 int XrdCmsCluster::Multiple(SMask_t mVec)
01113 {
01114    static const unsigned long long Left32  = 0xffffffff00000000LL;
01115    static const unsigned long long Right32 = 0x00000000ffffffffLL;
01116    static const unsigned long long Left16  = 0x00000000ffff0000LL;
01117    static const unsigned long long Right16 = 0x000000000000ffffLL;
01118    static const unsigned long long Left08  = 0x000000000000ff00LL;
01119    static const unsigned long long Right08 = 0x00000000000000ffLL;
01120    static const unsigned long long Left04  = 0x00000000000000f0LL;
01121    static const unsigned long long Right04 = 0x000000000000000fLL;
01122 //                                0 1 2 3 4 5 6 7 8 9 A B C D E F
01123    static const int isMult[16] = {0,0,0,1,0,1,1,1,0,1,1,1,1,1,1,1};
01124 
01125    if (mVec & Left32) {if (mVec & Right32) return 1;
01126                           else mVec = mVec >> 32LL;
01127                       }
01128    if (mVec & Left16) {if (mVec & Right16) return 1;
01129                           else mVec = mVec >> 16LL;
01130                       }
01131    if (mVec & Left08) {if (mVec & Right08) return 1;
01132                           else mVec = mVec >>  8LL;
01133                       }
01134    if (mVec & Left04) {if (mVec & Right04) return 1;
01135                           else mVec = mVec >>  4LL;
01136                       }
01137    return isMult[mVec];
01138 }
01139   
01140 /******************************************************************************/
01141 /*                                R e c o r d                                 */
01142 /******************************************************************************/
01143   
01144 void XrdCmsCluster::Record(char *path, const char *reason)
01145 {
01146    EPNAME("Record")
01147    static int msgcnt = 256;
01148    static XrdSysMutex mcMutex;
01149    int mcnt;
01150 
01151    DEBUG(reason <<path);
01152    mcMutex.Lock();
01153    msgcnt++; mcnt = msgcnt;
01154    mcMutex.UnLock();
01155 
01156    if (mcnt > 255)
01157       {Say.Emsg("client defered;", reason, path);
01158        mcnt = 1;
01159       }
01160 }
01161  
01162 /******************************************************************************/
01163 /*                               S e l N o d e                                */
01164 /******************************************************************************/
01165   
01166 int XrdCmsCluster::SelNode(XrdCmsSelect &Sel, SMask_t pmask, SMask_t amask)
01167 {
01168     EPNAME("SelNode")
01169     const char *act=0, *reason, *reason2 = "";
01170     int pspace, needspace, delay = 0, delay2 = 0, nump, isalt = 0, pass = 2;
01171     SMask_t mask;
01172     XrdCmsNode *nP = 0;
01173 
01174 // There is a difference bwteen needing space and needing r/w access. The former
01175 // is needed when we will be writing data the latter for inode modifications.
01176 //
01177    if (Sel.Opts & XrdCmsSelect::isMeta) needspace = 0;
01178       else needspace = (Sel.Opts & XrdCmsSelect::Write?XrdCmsNode::allowsRW:0);
01179    pspace = needspace;
01180 
01181 // Scan for a primary and alternate node (alternates do staging). At this
01182 // point we omit all peer nodes as they are our last resort.
01183 //
01184    STMutex.Lock();
01185    mask = pmask & peerMask;
01186    while(pass--)
01187         {if (mask)
01188             {nP = (Config.sched_RR
01189                    ? SelbyRef( mask, nump, delay, &reason, needspace)
01190                    : SelbyLoad(mask, nump, delay, &reason, needspace));
01191              if (nP || (nump && delay) || NodeCnt < Config.SUPCount) break;
01192             }
01193          mask = amask & peerMask; isalt = XrdCmsNode::allowsSS;
01194          if (!(Sel.Opts & XrdCmsSelect::isMeta)) needspace |= isalt;
01195         }
01196    STMutex.UnLock();
01197 
01198 // Update info
01199 //
01200    if (nP)
01201       {strcpy(Sel.Resp.Data, nP->Name(Sel.Resp.DLen, Sel.Resp.Port));
01202        Sel.Resp.DLen++; Sel.smask = nP->NodeMask;
01203        if (isalt || (Sel.Opts & XrdCmsSelect::Create) || Sel.iovN)
01204           {if (isalt || (Sel.Opts & XrdCmsSelect::Create))
01205               {Sel.Opts |= (XrdCmsSelect::Pending | XrdCmsSelect::Advisory);
01206                if (Sel.Opts & XrdCmsSelect::noBind) act = " handling ";
01207                   else Cache.AddFile(Sel, nP->NodeMask);
01208               }
01209            if (Sel.iovN && Sel.iovP) 
01210               {nP->Send(Sel.iovP, Sel.iovN); act = " staging ";}
01211               else if (!act)                 act = " assigned ";
01212           } else                             act = " serving ";
01213        nP->UnLock();
01214        TRACE(Stage, Sel.Resp.Data <<act <<Sel.Path.Val);
01215        return 0;
01216       } else if (!delay && NodeCnt < Config.SUPCount)
01217                 {reason = "insufficient number of nodes";
01218                  delay = Config.SUPDelay;
01219                 }
01220 
01221 // Return delay if selection failure is recoverable
01222 //
01223    if (delay && delay < Config.PSDelay)
01224       {Record(Sel.Path.Val, reason);
01225        return delay;
01226       }
01227 
01228 // At this point, we attempt a peer node selection (choice of last resort)
01229 //
01230    if (Sel.Opts & XrdCmsSelect::Peers)
01231       {STMutex.Lock();
01232        if ((mask = (pmask | amask) & peerHost))
01233           nP = SelbyCost(mask, nump, delay2, &reason2, pspace);
01234        STMutex.UnLock();
01235        if (nP)
01236           {strcpy(Sel.Resp.Data, nP->Name(Sel.Resp.DLen, Sel.Resp.Port));
01237            Sel.Resp.DLen++; Sel.smask = nP->NodeMask;
01238            if (Sel.iovN && Sel.iovP) nP->Send(Sel.iovP, Sel.iovN);
01239            nP->UnLock();
01240            TRACE(Stage, "Peer " <<Sel.Resp.Data <<" handling " <<Sel.Path.Val);
01241            return 0;
01242           }
01243        if (!delay) {delay = delay2; reason = reason2;}
01244       }
01245 
01246 // At this point we either don't have enough nodes or simply can't handle this
01247 //
01248    if (delay)
01249       {TRACE(Defer, "client defered; " <<reason <<" for " <<Sel.Path.Val);
01250        return delay;
01251       }
01252    return -1;
01253 }
01254 
01255 /******************************************************************************/
01256 /*                             S e l b y C o s t                              */
01257 /******************************************************************************/
01258 
01259 // Cost selection is used only for peer node selection as peers do not
01260 // report a load and handle their own scheduling.
01261 
01262 XrdCmsNode *XrdCmsCluster::SelbyCost(SMask_t mask, int &nump, int &delay,
01263                                      const char **reason, int needspace)
01264 {
01265     int i, numd, numf, nums;
01266     XrdCmsNode *np, *sp = 0;
01267 
01268 // Scan for a node (sp points to the selected one)
01269 //
01270    nump = nums = numf = numd = 0; // possible, suspended, full, and dead
01271    for (i = 0; i <= STHi; i++)
01272        if ((np = NodeTab[i]) && (np->NodeMask & mask))
01273           {nump++;
01274            if (np->isOffline)                   {numd++; continue;}
01275            if (np->isSuspend || np->isDisable)  {nums++; continue;}
01276            if (needspace &&     np->isNoStage)  {numf++; continue;}
01277            if (!sp) sp = np;
01278               else if (abs(sp->myCost - np->myCost)
01279                           <= Config.P_fuzz)
01280                       {if (needspace)
01281                           {if (sp->RefA > (np->RefA+Config.DiskLinger))
01282                                sp=np;
01283                            } 
01284                            else if (sp->RefR > np->RefR) sp=np;
01285                        }
01286                        else if (sp->myCost > np->myCost) sp=np;
01287           }
01288 
01289 // Check for overloaded node and return result
01290 //
01291    if (!sp) return calcDelay(nump, numd, numf, 0, nums, delay, reason);
01292    sp->Lock();
01293    if (needspace) {SelAcnt++; sp->RefA++;}  // Protected by STMutex
01294       else        {SelRcnt++; sp->RefR++;}
01295    delay = 0;
01296    return sp;
01297 }
01298   
01299 /******************************************************************************/
01300 /*                             S e l b y L o a d                              */
01301 /******************************************************************************/
01302   
01303 XrdCmsNode *XrdCmsCluster::SelbyLoad(SMask_t mask, int &nump, int &delay,
01304                                      const char **reason, int needspace)
01305 {
01306     int i, numd, numf, numo, nums;
01307     int reqSS = needspace & XrdCmsNode::allowsSS;
01308     XrdCmsNode *np, *sp = 0;
01309 
01310 // Scan for a node (preset possible, suspended, overloaded, full, and dead)
01311 //
01312    nump = nums = numo = numf = numd = 0; 
01313    for (i = 0; i <= STHi; i++)
01314        if ((np = NodeTab[i]) && (np->NodeMask & mask))
01315           {nump++;
01316            if (np->isOffline)                     {numd++; continue;}
01317            if (np->isSuspend || np->isDisable)    {nums++; continue;}
01318            if (np->myLoad > Config.MaxLoad)       {numo++; continue;}
01319            if (needspace && (np->DiskFree < np->DiskMinF
01320                              || (reqSS && np->isNoStage)))
01321               {numf++; continue;}
01322            if (!sp) sp = np;
01323               else if (needspace)
01324                       {if (abs(sp->myMass - np->myMass) <= Config.P_fuzz)
01325                           {if (sp->RefA > (np->RefA+Config.DiskLinger)) sp=np;}
01326                           else if (sp->myMass > np->myMass)             sp=np;
01327                       } else {
01328                        if (abs(sp->myLoad - np->myLoad) <= Config.P_fuzz)
01329                           {if (sp->RefR > np->RefR)                     sp=np;}
01330                           else if (sp->myLoad > np->myLoad)             sp=np;
01331                       }
01332           }
01333 
01334 // Check for overloaded node and return result
01335 //
01336    if (!sp) return calcDelay(nump, numd, numf, numo, nums, delay, reason);
01337    sp->Lock();
01338    if (needspace) {SelAcnt++; sp->RefA++;}  // Protected by STMutex
01339       else        {SelRcnt++; sp->RefR++;}
01340    delay = 0;
01341    return sp;
01342 }
01343 
01344 /******************************************************************************/
01345 /*                              S e l b y R e f                               */
01346 /******************************************************************************/
01347 
01348 XrdCmsNode *XrdCmsCluster::SelbyRef(SMask_t mask, int &nump, int &delay,
01349                                     const char **reason, int needspace)
01350 {
01351     int i, numd, numf, nums;
01352     int reqSS = needspace & XrdCmsNode::allowsSS;
01353     XrdCmsNode *np, *sp = 0;
01354 
01355 // Scan for a node (sp points to the selected one)
01356 //
01357    nump = nums = numf = numd = 0; // possible, suspended, full, and dead
01358    for (i = 0; i <= STHi; i++)
01359        if ((np = NodeTab[i]) && (np->NodeMask & mask))
01360           {nump++;
01361            if (np->isOffline)                   {numd++; continue;}
01362            if (np->isSuspend || np->isDisable)  {nums++; continue;}
01363            if (needspace && (np->DiskFree < np->DiskMinF
01364                              || (reqSS && np->isNoStage)))
01365               {numf++; continue;}
01366            if (!sp) sp = np;
01367               else if (needspace)
01368                       {if (sp->RefA > (np->RefA+Config.DiskLinger)) sp=np;}
01369                       else if (sp->RefR > np->RefR) sp=np;
01370           }
01371 
01372 // Check for overloaded node and return result
01373 //
01374    if (!sp) return calcDelay(nump, numd, numf, 0, nums, delay, reason);
01375    sp->Lock();
01376    if (needspace) {SelAcnt++; sp->RefA++;}  // Protected by STMutex
01377       else        {SelRcnt++; sp->RefR++;}
01378    delay = 0;
01379    return sp;
01380 }
01381  
01382 /******************************************************************************/
01383 /*                             s e n d A L i s t                              */
01384 /******************************************************************************/
01385   
01386 // Single entry at a time, protected by STMutex!
01387 
01388 void XrdCmsCluster::sendAList(XrdLink *lp)
01389 {
01390    static CmsTryRequest Req = {{0, kYR_try, 0, 0}, 0};
01391    static int HdrSize = sizeof(Req.Hdr) + sizeof(Req.sLen);
01392    static char *AltNext = AltMans;
01393    static struct iovec iov[4] = {{(caddr_t)&Req, HdrSize},
01394                                  {0, 0},
01395                                  {AltMans, 0},
01396                                  {(caddr_t)"\0", 1}};
01397    int dlen;
01398 
01399 // Calculate what to send
01400 //
01401    AltNext = AltNext + AltSize;
01402    if (AltNext >= AltMend)
01403       {AltNext = AltMans;
01404        iov[1].iov_len = 0;
01405        iov[2].iov_len = dlen = AltMend - AltMans;
01406       } else {
01407         iov[1].iov_base = (caddr_t)AltNext;
01408         iov[1].iov_len  = AltMend - AltNext;
01409         iov[2].iov_len  = AltNext - AltMans;
01410         dlen = iov[1].iov_len + iov[2].iov_len;
01411       }
01412 
01413 // Complete the request (account for trailing null character)
01414 //
01415    dlen++;
01416    Req.Hdr.datalen = htons(static_cast<unsigned short>(dlen+sizeof(Req.sLen)));
01417    Req.sLen = htons(static_cast<unsigned short>(dlen));
01418 
01419 // Send the list of alternates (rotated once)
01420 //
01421    lp->Send(iov, 4, dlen+HdrSize);
01422 }
01423 
01424 /******************************************************************************/
01425 /*                             s e t A l t M a n                              */
01426 /******************************************************************************/
01427   
01428 // Single entry at a time, protected by STMutex!
01429   
01430 void XrdCmsCluster::setAltMan(int snum, unsigned int ipaddr, int port)
01431 {
01432    char *ap = &AltMans[snum*AltSize];
01433    int i;
01434 
01435 // Preset the buffer and pre-screen the port number
01436 //
01437    if (!port || (port > 0x0000ffff)) port = Config.PortTCP;
01438    memset(ap, int(' '), AltSize);
01439 
01440 // Insert the ip address of this node into the list of nodes
01441 //
01442    i = XrdNetDNS::IP2String(ipaddr, port, ap, AltSize);
01443    ap[i] = ' ';
01444 
01445 // Compute new fence
01446 //
01447    if (ap >= AltMend) {AltMend = ap + AltSize; AltMent = snum;}
01448 }

Generated on Tue Jul 5 14:46:24 2011 for ROOT_528-00b_version by  doxygen 1.5.1