00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015 const char *XrdCmsClusterCVSID = "$Id: XrdCmsCluster.cc 38011 2011-02-08 18:35:57Z ganis $";
00016
00017 #include <errno.h>
00018 #include <fcntl.h>
00019 #include <stdio.h>
00020 #include <stdlib.h>
00021 #include <unistd.h>
00022 #include <netinet/in.h>
00023 #include <sys/types.h>
00024
00025 #include "XProtocol/YProtocol.hh"
00026
00027 #include "Xrd/XrdJob.hh"
00028 #include "Xrd/XrdLink.hh"
00029 #include "Xrd/XrdScheduler.hh"
00030
00031 #include "XrdCms/XrdCmsCache.hh"
00032 #include "XrdCms/XrdCmsConfig.hh"
00033 #include "XrdCms/XrdCmsCluster.hh"
00034 #include "XrdCms/XrdCmsNode.hh"
00035 #include "XrdCms/XrdCmsState.hh"
00036 #include "XrdCms/XrdCmsSelect.hh"
00037 #include "XrdCms/XrdCmsTrace.hh"
00038 #include "XrdCms/XrdCmsTypes.hh"
00039
00040 #include "XrdNet/XrdNetDNS.hh"
00041
00042 #include "XrdOuc/XrdOucPup.hh"
00043
00044 #include "XrdSys/XrdSysPlatform.hh"
00045 #include "XrdSys/XrdSysPthread.hh"
00046 #include "XrdSys/XrdSysTimer.hh"
00047
00048 using namespace XrdCms;
00049
00050
00051
00052
00053
00054 XrdCmsCluster XrdCms::Cluster;
00055
00056
00057
00058
00059
00060 class XrdCmsDrop : XrdJob
00061 {
00062 public:
00063
00064 void DoIt() {Cluster.STMutex.Lock();
00065 int rc = Cluster.Drop(nodeEnt, nodeInst, this);
00066 Cluster.STMutex.UnLock();
00067 if (!rc) delete this;
00068 }
00069
00070 XrdCmsDrop(int nid, int inst) : XrdJob("drop node")
00071 {nodeEnt = nid;
00072 nodeInst = inst;
00073 Sched->Schedule((XrdJob *)this, time(0)+Config.DRPDelay);
00074 }
00075 ~XrdCmsDrop() {}
00076
00077 int nodeEnt;
00078 int nodeInst;
00079 };
00080
00081
00082
00083
00084
00085 XrdCmsCluster::XrdCmsCluster()
00086 {
00087 memset((void *)NodeTab, 0, sizeof(NodeTab));
00088 memset((void *)AltMans, (int)' ', sizeof(AltMans));
00089 cidFirst= 0;
00090 AltMend = AltMans;
00091 AltMent = -1;
00092 NodeCnt = 0;
00093 STHi = -1;
00094 SelAcnt = 0;
00095 SelRcnt = 0;
00096 doReset = 0;
00097 resetMask = 0;
00098 peerHost = 0;
00099 peerMask = ~peerHost;
00100 }
00101
00102
00103
00104
00105
00106 XrdCmsNode *XrdCmsCluster::Add(XrdLink *lp, int port, int Status,
00107 int sport, const char *theNID)
00108 {
00109 EPNAME("Add")
00110 sockaddr InetAddr;
00111 const char *act = "";
00112 unsigned int ipaddr;
00113 XrdCmsNode *nP = 0;
00114 int Slot, Free = -1, Bump1 = -1, Bump2 = -1, Bump3 = -1, aSet = 0;
00115 int tmp, Special = (Status & (CMS_isMan|CMS_isPeer));
00116 XrdSysMutexHelper STMHelper(STMutex);
00117
00118
00119
00120 lp->Name(&InetAddr);
00121 ipaddr = XrdNetDNS::IPAddr(&InetAddr);
00122
00123
00124
00125
00126
00127
00128
00129
00130 for (Slot = 0; Slot < STMax; Slot++)
00131 if (NodeTab[Slot])
00132 {if (NodeTab[Slot]->isNode(ipaddr, theNID)) break;
00133 if (NodeTab[Slot]->isConn)
00134 {if (!NodeTab[Slot]->isPerm && Special)
00135 Bump2 = Slot;
00136 } else {
00137 if ( NodeTab[Slot]->isPerm)
00138 {if (Bump3 < 0 && Special) Bump3 = Slot;}
00139 else Bump1 = Slot;
00140 }
00141 } else if (Free < 0) Free = Slot;
00142
00143
00144
00145 if (Slot < STMax)
00146 {if (NodeTab[Slot] && NodeTab[Slot]->isBound)
00147 {Say.Emsg("Cluster", lp->ID, "already logged in.");
00148 return 0;
00149 } else {
00150 nP = NodeTab[Slot];
00151 nP->Link = lp;
00152 nP->isOffline = 0;
00153 nP->isConn = 1;
00154 nP->Instance++;
00155 nP->setName(lp, port);
00156 act = "Re-added ";
00157 }
00158 }
00159
00160
00161
00162 if (!nP)
00163 {if (Free >= 0) Slot = Free;
00164 else {if (Bump1 >= 0) Slot = Bump1;
00165 else Slot = (Bump2 >= 0 ? Bump2 : Bump3);
00166 if (Slot < 0)
00167 {if (Status & CMS_isPeer) Say.Emsg("Cluster", "Add peer", lp->ID,
00168 "failed; too many subscribers.");
00169 else {sendAList(lp);
00170 DEBUG(lp->ID <<" redirected; too many subscribers.");
00171 }
00172 return 0;
00173 }
00174
00175 if (Status & CMS_isMan) {setAltMan(Slot,ipaddr,sport); aSet=1;}
00176 if (NodeTab[Slot] && !(Status & CMS_isPeer))
00177 sendAList(NodeTab[Slot]->Link);
00178
00179 DEBUG(lp->ID << " bumps " << NodeTab[Slot]->Ident <<" #" <<Slot);
00180 NodeTab[Slot]->Lock();
00181 Remove("redirected", NodeTab[Slot], -1);
00182 act = "Shoved ";
00183 }
00184 NodeTab[Slot] = nP = new XrdCmsNode(lp, port, theNID, 0, Slot);
00185 }
00186
00187
00188
00189 nP->isPerm = (Status & (CMS_isMan | CMS_isPeer)) ? CMS_Perm : 0;
00190
00191
00192
00193 if (!aSet && (Status & CMS_isMan)) setAltMan(Slot, ipaddr, sport);
00194 if (Slot > STHi) STHi = Slot;
00195 nP->isBound = 1;
00196 nP->isConn = 1;
00197 nP->isNoStage = (Status & CMS_noStage);
00198 nP->isSuspend = (Status & CMS_Suspend);
00199 nP->isMan = (Status & CMS_isMan);
00200 nP->isPeer = (Status & CMS_isPeer);
00201 nP->isDisable = 1;
00202 NodeCnt++;
00203 if (Config.SUPLevel
00204 && (tmp = NodeCnt*Config.SUPLevel/100) > Config.SUPCount)
00205 {Config.SUPCount=tmp; CmsState.Set(tmp);}
00206
00207
00208
00209 if (nP->isPeer) peerHost |= nP->NodeMask;
00210 else peerHost &= ~nP->NodeMask;
00211 peerMask = ~peerHost;
00212
00213
00214
00215 nP->myCNUM = Assign(nP->myCID);
00216
00217
00218
00219 DEBUG(act <<nP->Ident <<" to cluster " <<nP->myCNUM <<" slot "
00220 <<Slot <<'.' <<nP->Instance <<" (n=" <<NodeCnt <<" m="
00221 <<Config.SUPCount <<") ID=" <<nP->myNID);
00222
00223
00224
00225 if (Config.asManager())
00226 CmsState.Update(XrdCmsState::Counts,nP->isSuspend?0:1,nP->isNoStage?0:1);
00227
00228
00229
00230 return nP;
00231 }
00232
00233
00234
00235
00236
00237 SMask_t XrdCmsCluster::Broadcast(SMask_t smask, const struct iovec *iod,
00238 int iovcnt, int iotot)
00239 {
00240 EPNAME("Broadcast")
00241 int i;
00242 XrdCmsNode *nP;
00243 SMask_t bmask, unQueried(0);
00244
00245
00246
00247 STMutex.Lock();
00248 bmask = smask & peerMask;
00249
00250
00251
00252 for (i = 0; i <= STHi; i++)
00253 {if ((nP = NodeTab[i]) && nP->isNode(bmask))
00254 {nP->Lock();
00255 STMutex.UnLock();
00256 if (nP->Send(iod, iovcnt, iotot) < 0)
00257 {unQueried |= nP->Mask();
00258 DEBUG(nP->Ident <<" is unreachable");
00259 }
00260 nP->UnLock();
00261 STMutex.Lock();
00262 }
00263 }
00264 STMutex.UnLock();
00265 return unQueried;
00266 }
00267
00268
00269
00270 SMask_t XrdCmsCluster::Broadcast(SMask_t smask, XrdCms::CmsRRHdr &Hdr,
00271 char *Data, int Dlen)
00272 {
00273 struct iovec ioV[3], *iovP = &ioV[1];
00274 unsigned short Temp;
00275 int Blen;
00276
00277
00278
00279
00280 Blen = XrdOucPup::Pack(&iovP, Data, Temp, (Dlen ? strlen(Data)+1 : Dlen));
00281 Hdr.datalen = htons(static_cast<unsigned short>(Blen));
00282
00283
00284
00285 ioV[0].iov_base = (char *)&Hdr; ioV[0].iov_len = sizeof(Hdr);
00286 return Broadcast(smask, ioV, 3, Blen+sizeof(Hdr));
00287 }
00288
00289
00290
00291 SMask_t XrdCmsCluster::Broadcast(SMask_t smask, XrdCms::CmsRRHdr &Hdr,
00292 void *Data, int Dlen)
00293 {
00294 struct iovec ioV[2] = {{(char *)&Hdr, sizeof(Hdr)}, {(char *)Data, Dlen}};
00295
00296
00297
00298 Hdr.datalen = htons(static_cast<unsigned short>(Dlen));
00299 return Broadcast(smask, ioV, 2, Dlen+sizeof(Hdr));
00300 }
00301
00302
00303
00304
00305
00306 SMask_t XrdCmsCluster::getMask(unsigned int IPv4adr)
00307 {
00308 int i;
00309 XrdCmsNode *nP;
00310 SMask_t smask(0);
00311
00312
00313
00314 STMutex.Lock();
00315
00316
00317
00318 for (i = 0; i <= STHi; i++)
00319 if ((nP = NodeTab[i]) && nP->isNode(IPv4adr))
00320 {smask = nP->NodeMask; break;}
00321
00322
00323
00324 STMutex.UnLock();
00325 return smask;
00326 }
00327
00328
00329
00330 SMask_t XrdCmsCluster::getMask(const char *Cid)
00331 {
00332 XrdCmsNode *nP;
00333 SMask_t smask(0);
00334 XrdOucTList *cP;
00335 int i = 1, Cnum = -1;
00336
00337
00338
00339 cidMutex.Lock();
00340
00341
00342
00343 cP = cidFirst;
00344 while(cP && (i = strcmp(Cid, cP->text)) < 0) cP = cP->next;
00345
00346
00347
00348 if (cP) Cnum = cP->val;
00349 cidMutex.UnLock();
00350 if (i) return smask;
00351
00352
00353
00354 STMutex.Lock();
00355
00356
00357
00358 for (i = 0; i <= STHi; i++)
00359 if ((nP = NodeTab[i]) && nP->myCNUM == Cnum) smask |= nP->NodeMask;
00360
00361
00362
00363 STMutex.UnLock();
00364 return smask;
00365 }
00366
00367
00368
00369
00370
00371 XrdCmsSelected *XrdCmsCluster::List(SMask_t mask, CmsLSOpts opts)
00372 {
00373 const char *reason;
00374 int i, iend, nump, delay, lsall = opts & LS_All;
00375 XrdCmsNode *nP;
00376 XrdCmsSelected *sipp = 0, *sip;
00377
00378
00379
00380 STMutex.Lock();
00381 iend = (opts & LS_Best ? 0 : STHi);
00382 for (i = 0; i <= iend; i++)
00383 {if (opts & LS_Best)
00384 nP = (Config.sched_RR
00385 ? SelbyRef( mask, nump, delay, &reason, 0)
00386 : SelbyLoad(mask, nump, delay, &reason, 0));
00387 else if ((nP=NodeTab[i]) && !lsall && !(nP->NodeMask & mask)) nP=0;
00388 if (nP)
00389 {sip = new XrdCmsSelected((opts & LS_IPO) ? 0 : nP->Name(), sipp);
00390 if (opts & LS_IPV6)
00391 {sip->IPV6Len = nP->IPV6Len;
00392 strcpy(sip->IPV6, nP->IPV6);
00393 }
00394 sip->Mask = nP->NodeMask;
00395 sip->Id = nP->NodeID;
00396 sip->IPAddr = nP->IPAddr;
00397 sip->Port = nP->Port;
00398 sip->Load = nP->myLoad;
00399 sip->Util = nP->DiskUtil;
00400 sip->RefTotA = nP->RefTotA + nP->RefA;
00401 sip->RefTotR = nP->RefTotR + nP->RefR;
00402 if (nP->isOffline) sip->Status = XrdCmsSelected::Offline;
00403 else sip->Status = 0;
00404 if (nP->isDisable) sip->Status |= XrdCmsSelected::Disable;
00405 if (nP->isNoStage) sip->Status |= XrdCmsSelected::NoStage;
00406 if (nP->isSuspend) sip->Status |= XrdCmsSelected::Suspend;
00407 if (nP->isRW ) sip->Status |= XrdCmsSelected::isRW;
00408 if (nP->isMan ) sip->Status |= XrdCmsSelected::isMangr;
00409 if (nP->isPeer ) sip->Status |= XrdCmsSelected::isPeer;
00410 if (nP->isProxy ) sip->Status |= XrdCmsSelected::isProxy;
00411 nP->UnLock();
00412 sipp = sip;
00413 }
00414 }
00415 STMutex.UnLock();
00416
00417
00418
00419 return sipp;
00420 }
00421
00422
00423
00424
00425
00426 int XrdCmsCluster::Locate(XrdCmsSelect &Sel)
00427 {
00428 EPNAME("Locate");
00429 XrdCmsPInfo pinfo;
00430 SMask_t qfVec(0);
00431 char *Path;
00432 int retc = 0;
00433
00434
00435
00436 if (*Sel.Path.Val != '*') Path = Sel.Path.Val;
00437 else {if (*(Sel.Path.Val+1) == '\0')
00438 {Sel.Vec.hf = ~0LL; Sel.Vec.pf = Sel.Vec.wf = 0;
00439 return 0;
00440 }
00441 Path = Sel.Path.Val+1;
00442 }
00443
00444
00445
00446 if (!Cache.Paths.Find(Path, pinfo) || !pinfo.rovec)
00447 {Sel.Vec.hf = Sel.Vec.pf = Sel.Vec.wf = 0;
00448 return -1;
00449 } else Sel.Vec.wf = pinfo.rwvec;
00450
00451
00452
00453 if (*Sel.Path.Val == '*')
00454 {Sel.Vec.hf = pinfo.rovec; Sel.Vec.pf = 0;
00455 Sel.Vec.wf = pinfo.rwvec;
00456 return 0;
00457 }
00458
00459
00460
00461 if (Sel.InfoP)
00462 {Sel.InfoP->rwVec = pinfo.rwvec;
00463 Sel.InfoP->isLU = 1;
00464 }
00465
00466
00467
00468
00469
00470
00471 if (Sel.Opts & XrdCmsSelect::Refresh
00472 || !(retc = Cache.GetFile(Sel, pinfo.rovec)))
00473 {Cache.AddFile(Sel, 0);
00474 qfVec = pinfo.rovec; Sel.Vec.hf = 0;
00475 } else qfVec = Sel.Vec.bf;
00476
00477
00478
00479 if ((!qfVec && retc >= 0) || (Sel.Vec.hf && Sel.InfoP)) retc = 0;
00480 else if (!(retc = Cache.WT4File(Sel, Sel.Vec.hf))) retc = -2;
00481
00482
00483
00484 if (qfVec)
00485 {CmsStateRequest QReq = {{Sel.Path.Hash, kYR_state, kYR_raw, 0}};
00486 if (Sel.Opts & XrdCmsSelect::Refresh)
00487 QReq.Hdr.modifier |= CmsStateRequest::kYR_refresh;
00488 TRACE(Files, "seeking " <<Sel.Path.Val);
00489 qfVec = Cluster.Broadcast(qfVec, QReq.Hdr,
00490 (void *)Sel.Path.Val, Sel.Path.Len+1);
00491 if (qfVec) Cache.UnkFile(Sel, qfVec);
00492 }
00493 return retc;
00494 }
00495
00496
00497
00498
00499
00500 void *XrdCmsCluster::MonPerf()
00501 {
00502 CmsUsageRequest Usage = {{0, kYR_usage, 0, 0}};
00503 struct iovec ioV[] = {{(char *)&Usage, sizeof(Usage)}};
00504 int ioVnum = sizeof(ioV)/sizeof(struct iovec);
00505 int ioVtot = sizeof(Usage);
00506 SMask_t allNodes(~0);
00507 int uInterval = Config.AskPing*Config.AskPerf;
00508
00509
00510
00511 while(uInterval)
00512 {XrdSysTimer::Snooze(uInterval);
00513 Broadcast(allNodes, ioV, ioVnum, ioVtot);
00514 }
00515 return (void *)0;
00516 }
00517
00518
00519
00520
00521
00522 void *XrdCmsCluster::MonRefs()
00523 {
00524 XrdCmsNode *nP;
00525 int i, snooze_interval = 10*60, loopmax, loopcnt = 0;
00526 int resetA, resetR, resetAR;
00527
00528
00529
00530 if ((loopmax = Config.RefReset / snooze_interval) <= 1)
00531 {if (!Config.RefReset) loopmax = 0;
00532 else {loopmax = 1; snooze_interval = Config.RefReset;}
00533 }
00534
00535
00536
00537
00538
00539 do {XrdSysTimer::Snooze(snooze_interval);
00540 loopcnt++;
00541 STMutex.Lock();
00542 resetA = (SelAcnt >= Config.RefTurn);
00543 resetR = (SelRcnt >= Config.RefTurn);
00544 resetAR = (loopmax && loopcnt >= loopmax && (resetA || resetR));
00545 if (doReset || resetAR)
00546 {for (i = 0; i <= STHi; i++)
00547 if ((nP = NodeTab[i])
00548 && (resetAR || (doReset && nP->isNode(resetMask))) )
00549 {nP->Lock();
00550 if (resetA || doReset) {nP->RefTotA += nP->RefA;nP->RefA=0;}
00551 if (resetR || doReset) {nP->RefTotR += nP->RefR;nP->RefR=0;}
00552 nP->UnLock();
00553 }
00554 if (resetAR)
00555 {if (resetA) SelAcnt = 0;
00556 if (resetR) SelRcnt = 0;
00557 loopcnt = 0;
00558 }
00559 if (doReset) {doReset = 0; resetMask = 0;}
00560 }
00561 STMutex.UnLock();
00562 } while(1);
00563 return (void *)0;
00564 }
00565
00566
00567
00568
00569
00570
00571
00572
00573
00574
00575
00576 void XrdCmsCluster::Remove(const char *reason, XrdCmsNode *theNode, int immed)
00577 {
00578 EPNAME("Remove_Node")
00579 struct theLocks
00580 {XrdSysMutex *myMutex;
00581 XrdCmsNode *myNode;
00582 char *myIdent;
00583 int myImmed;
00584 int myNID;
00585 int myInst;
00586
00587 theLocks(XrdSysMutex *mtx, XrdCmsNode *node, int immed)
00588 : myMutex(mtx), myNode(node), myImmed(immed)
00589 {myIdent = strdup(node->Ident);
00590 myNID = node->ID(myInst);
00591 if (myImmed >= 0)
00592 {myNode->UnLock();
00593 myMutex->Lock();
00594 myNode->Lock();
00595 }
00596 }
00597 ~theLocks()
00598 {if (myImmed >= 0) myMutex->UnLock();
00599 if (myNode) myNode->UnLock();
00600 free(myIdent);
00601 }
00602 } LockHandler(&STMutex, theNode, immed);
00603
00604 int Inst, NodeID = theNode->ID(Inst);
00605
00606
00607
00608
00609
00610
00611 if (LockHandler.myNID != NodeID || LockHandler.myInst != Inst)
00612 {Say.Emsg("Manager", LockHandler.myIdent, "removal aborted.");
00613 DEBUG(LockHandler.myIdent <<" node " <<NodeID <<'.' <<Inst <<" != "
00614 << LockHandler.myNID <<'.' <<LockHandler.myInst <<" at entry.");
00615 }
00616
00617
00618
00619 theNode->isOffline = 1;
00620
00621
00622
00623
00624
00625
00626 if (theNode->isConn)
00627 {theNode->Disc(reason, 0);
00628 theNode->isGone = 1;
00629 return;
00630 }
00631
00632
00633
00634
00635
00636 if (theNode->isBound)
00637 {theNode->isBound = 0; NodeCnt--;
00638 if (Config.asManager())
00639 CmsState.Update(XrdCmsState::Counts, theNode->isSuspend ? 0 : -1,
00640 theNode->isNoStage ? 0 : -1);
00641 }
00642
00643
00644
00645
00646 if (immed || !Config.DRPDelay)
00647 {Drop(NodeID, Inst);
00648 LockHandler.myNode = 0;
00649 return;
00650 }
00651
00652
00653
00654
00655 theNode->DropTime = time(0)+Config.DRPDelay;
00656 if (theNode->DropJob) theNode->DropJob->nodeInst = Inst;
00657 else theNode->DropJob = new XrdCmsDrop(NodeID, Inst);
00658
00659
00660
00661 if (reason)
00662 Say.Emsg("Manager", theNode->Ident, "scheduled for removal;", reason);
00663 else DEBUG(theNode->Ident <<" node " <<NodeID <<'.' <<Inst);
00664 }
00665
00666
00667
00668
00669
00670 void XrdCmsCluster::ResetRef(SMask_t smask)
00671 {
00672
00673
00674
00675 STMutex.Lock();
00676
00677
00678
00679 doReset = 1;
00680 resetMask |= smask;
00681
00682
00683
00684 STMutex.UnLock();
00685 }
00686
00687
00688
00689
00690
00691 int XrdCmsCluster::Select(XrdCmsSelect &Sel)
00692 {
00693 EPNAME("Select");
00694 XrdCmsPInfo pinfo;
00695 const char *Amode;
00696 int dowt = 0, retc, isRW, fRD, noSel = (Sel.Opts & XrdCmsSelect::Defer);
00697 SMask_t amask, smask, pmask;
00698
00699
00700
00701 if (Sel.Opts & XrdCmsSelect::Write)
00702 {isRW = 1; Amode = "write";
00703 if (Config.RWDelay)
00704 if (Sel.Opts & XrdCmsSelect::Create && Config.RWDelay < 2) fRD = 1;
00705 else fRD = 0;
00706 else fRD = 1;
00707 }
00708 else {isRW = 0; Amode = "read"; fRD = 1;}
00709
00710
00711
00712 if (!Cache.Paths.Find(Sel.Path.Val, pinfo)
00713 || (amask = ((isRW ? pinfo.rwvec : pinfo.rovec) & ~Sel.nmask)) == 0)
00714 {Sel.Resp.DLen = snprintf(Sel.Resp.Data, sizeof(Sel.Resp.Data)-1,
00715 "No servers have %s access to the file", Amode)+1;
00716 return -1;
00717 }
00718
00719
00720
00721
00722
00723
00724
00725
00726
00727 if (!(Sel.Opts & XrdCmsSelect::Refresh)
00728 && (retc = Cache.GetFile(Sel, pinfo.rovec)))
00729 {if (isRW)
00730 { if (Sel.Opts & XrdCmsSelect::Replica)
00731 {pmask = amask & ~(Sel.Vec.hf | Sel.Vec.bf); smask = 0;
00732 if (!pmask && !Sel.Vec.bf) return SelFail(Sel,eNoRep);
00733 }
00734 else if (Sel.Vec.bf) pmask = smask = 0;
00735 else if (Sel.Vec.hf)
00736 {if (Sel.Opts & XrdCmsSelect::NewFile) return SelFail(Sel,eExists);
00737 if (!(Sel.Opts & XrdCmsSelect::isMeta)
00738 && Multiple(Sel.Vec.hf)) return SelFail(Sel,eDups);
00739 if (!(pmask = Sel.Vec.hf & amask)) return SelFail(Sel,eROfs);
00740 smask = 0;
00741 }
00742 else if (Sel.Opts & XrdCmsSelect::Trunc) {pmask = amask; smask = 0;}
00743 else pmask = ((smask = pinfo.ssvec & amask) ? 0 : amask);
00744 } else {
00745 pmask = Sel.Vec.hf & amask;
00746 if (Sel.Opts & XrdCmsSelect::Online) {pmask &= ~Sel.Vec.pf; smask=0;}
00747 else smask = pinfo.ssvec & amask;
00748 }
00749 if (Sel.Vec.hf & Sel.nmask) Cache.UnkFile(Sel, Sel.nmask);
00750 } else {
00751 Cache.AddFile(Sel, 0);
00752 Sel.Vec.bf = pinfo.rovec;
00753 Sel.Vec.hf = Sel.Vec.pf = pmask = smask = 0;
00754 retc = 0;
00755 }
00756
00757
00758
00759 dowt = (!pmask && !smask);
00760
00761
00762
00763
00764 if (Sel.Vec.bf)
00765 {CmsStateRequest QReq = {{Sel.Path.Hash, kYR_state, kYR_raw, 0}};
00766 if (Sel.Opts & XrdCmsSelect::Refresh)
00767 QReq.Hdr.modifier |= CmsStateRequest::kYR_refresh;
00768 if (dowt) retc= (fRD ? Cache.WT4File(Sel,Sel.Vec.hf) : Config.LUPDelay);
00769 TRACE(Files, "seeking " <<Sel.Path.Val);
00770 amask = Cluster.Broadcast(Sel.Vec.bf, QReq.Hdr,
00771 (void *)Sel.Path.Val,Sel.Path.Len+1);
00772 if (amask) Cache.UnkFile(Sel, amask);
00773 if (dowt) return retc;
00774 } else if (dowt && retc < 0 && !noSel)
00775 return (fRD ? Cache.WT4File(Sel,Sel.Vec.hf) : Config.LUPDelay);
00776
00777
00778
00779 if ((Sel.Opts & XrdCmsSelect::Freshen) && (amask = pmask & ~Sel.Vec.bf))
00780 {CmsStateRequest Qupt={{0,kYR_state,kYR_raw|CmsStateRequest::kYR_noresp,0}};
00781 Cluster.Broadcast(amask, Qupt.Hdr,(void *)Sel.Path.Val,Sel.Path.Len+1);
00782 }
00783
00784
00785
00786 if (noSel) return 0;
00787
00788
00789
00790 if (dowt || (retc = SelNode(Sel, pmask, smask)) < 0)
00791 {Sel.Resp.DLen = snprintf(Sel.Resp.Data, sizeof(Sel.Resp.Data)-1,
00792 "No servers are available to %s%s the file.",
00793 Sel.Opts & XrdCmsSelect::Online ? "immediately " : "",
00794 (smask ? "stage" : Amode))+1;
00795 return -1;
00796 }
00797
00798
00799
00800 return retc;
00801 }
00802
00803
00804
00805 int XrdCmsCluster::Select(int isrw, SMask_t pmask,
00806 int &port, char *hbuff, int &hlen)
00807 {
00808 static const SMask_t smLow(255);
00809 XrdCmsNode *nP = 0;
00810 SMask_t tmask;
00811 int Snum = 0;
00812
00813
00814
00815 if (!pmask) return 0;
00816 do {if (!(tmask = pmask & smLow)) Snum += 8;
00817 else {while((tmask = tmask>>1)) Snum++; break;}
00818 } while((pmask = pmask >> 8));
00819
00820
00821
00822 STMutex.Lock();
00823 if ((nP = NodeTab[Snum]))
00824 {if (nP->isOffline || nP->isSuspend || nP->isDisable) nP = 0;
00825 else if (!Config.sched_RR
00826 && (nP->myLoad > Config.MaxLoad)) nP = 0;
00827 if (nP)
00828 {if (isrw)
00829 if (nP->isNoStage || nP->DiskFree < nP->DiskMinF) nP = 0;
00830 else {SelAcnt++; nP->Lock();}
00831 else {SelRcnt++; nP->Lock();}
00832 }
00833 }
00834 STMutex.UnLock();
00835
00836
00837
00838 if (nP)
00839 {strcpy(hbuff, nP->Name(hlen, port));
00840 nP->RefR++;
00841 nP->UnLock();
00842 return 1;
00843 }
00844 return 0;
00845 }
00846
00847
00848
00849
00850
00851 int XrdCmsCluster::SelFail(XrdCmsSelect &Sel, int rc)
00852 {
00853 const char *etext = (rc == eExists
00854 ? "Unable to create new file; file already exists."
00855 : (rc == eROfs)
00856 ? "Unable to write file; r/o file already exists."
00857 : (rc == eDups)
00858 ? "Unable to write file; multiple files exist."
00859 : "Unable to replicate file; no new sites available.");
00860
00861 Sel.Resp.DLen = strlcpy(Sel.Resp.Data, etext, sizeof(Sel.Resp.Data))+1;
00862 return -1;
00863 }
00864
00865
00866
00867
00868
00869 void XrdCmsCluster::Space(SpaceData &sData, SMask_t smask)
00870 {
00871 int i;
00872 XrdCmsNode *nP;
00873 SMask_t bmask;
00874
00875
00876
00877 STMutex.Lock();
00878 bmask = smask & peerMask;
00879
00880
00881
00882 for (i = 0; i <= STHi; i++)
00883 if ((nP = NodeTab[i]) && nP->isNode(bmask)
00884 && !nP->isOffline && nP->isRW)
00885 {sData.Total += nP->DiskTotal;
00886 sData.sNum++;
00887 if (sData.sFree < nP->DiskFree)
00888 {sData.sFree = nP->DiskFree; sData.sUtil = nP->DiskUtil;}
00889 if (nP->isRW & XrdCmsNode::allowsRW)
00890 {sData.wNum++;
00891 if (sData.wFree < nP->DiskFree)
00892 {sData.wFree = nP->DiskFree; sData.wUtil = nP->DiskUtil;
00893 sData.wMinF = nP->DiskMinF;
00894 }
00895 }
00896 }
00897 STMutex.UnLock();
00898 }
00899
00900
00901
00902
00903
00904 int XrdCmsCluster::Stats(char *bfr, int bln)
00905 {
00906 static const char statfmt1[] = "<stats id=\"cms\"><name>%s</name>";
00907 static const char statfmt2[] = "<subscriber><name>%s</name>"
00908 "<status>%s</status><load>%d</load><diskfree>%d</diskfree>"
00909 "<refa>%d</refa><refr>%d</refr></subscriber>";
00910 static const char statfmt3[] = "</stats>\n";
00911 XrdCmsSelected *sp;
00912 int mlen, tlen = sizeof(statfmt3);
00913 char stat[6], *stp;
00914
00915 class spmngr {
00916 public: XrdCmsSelected *sp;
00917
00918 spmngr() {sp = 0;}
00919 ~spmngr() {XrdCmsSelected *xsp;
00920 while((xsp = sp)) {sp = sp->next; delete xsp;}
00921 }
00922 } mngrsp;
00923
00924
00925
00926 if (!bfr) return sizeof(statfmt1) + 256 +
00927 (sizeof(statfmt2) + 20*4 + 256) * STMax +
00928 sizeof(statfmt3) + 1;
00929
00930
00931
00932 mngrsp.sp = sp = List(FULLMASK, LS_All);
00933
00934
00935
00936 mlen = snprintf(bfr, bln, statfmt1, Config.myName);
00937 if ((bln -= mlen) <= 0) return 0;
00938 tlen += mlen;
00939
00940 while(sp && bln)
00941 {stp = stat;
00942 if (sp->Status)
00943 {if (sp->Status & XrdCmsSelected::Offline) *stp++ = 'o';
00944 if (sp->Status & XrdCmsSelected::Suspend) *stp++ = 's';
00945 if (sp->Status & XrdCmsSelected::NoStage) *stp++ = 'n';
00946 if (sp->Status & XrdCmsSelected::Disable) *stp++ = 'd';
00947 } else *stp++ = 'a';
00948 bfr += mlen;
00949 mlen = snprintf(bfr, bln, statfmt2, sp->Name, stat,
00950 sp->Load, sp->Free, sp->RefTotA, sp->RefTotR);
00951 bln -= mlen;
00952 tlen += mlen;
00953 sp = sp->next;
00954 }
00955
00956
00957
00958 if (sp || bln < (int)sizeof(statfmt1)) return 0;
00959 bfr += mlen;
00960 strcpy(bfr, statfmt3);
00961 return tlen;
00962 }
00963
00964
00965
00966
00967
00968
00969
00970
00971 int XrdCmsCluster::Assign(const char *Cid)
00972 {
00973 static int cNum = 0;
00974 XrdOucTList *cP, *cPP, *cNew;
00975 int n = -1;
00976
00977
00978
00979 cidMutex.Lock();
00980
00981
00982
00983 cP = cidFirst; cPP = 0;
00984 while(cP && (n = strcmp(Cid, cP->text)) < 0) {cPP = cP; cP = cP->next;}
00985
00986
00987
00988 if (!n && cP) {n = cP->val; cidMutex.UnLock(); return n;}
00989
00990
00991
00992 n = ++cNum;
00993 cNew = new XrdOucTList(Cid, cNum, cP);
00994 if (cPP) cPP->next = cNew;
00995 else cidFirst = cNew;
00996
00997
00998
00999 cidMutex.UnLock();
01000 return n;
01001 }
01002
01003
01004
01005
01006
01007 XrdCmsNode *XrdCmsCluster::calcDelay(int nump, int numd, int numf, int numo,
01008 int nums, int &delay, const char **reason)
01009 {
01010 if (!nump) {delay = 0;
01011 *reason = "no eligible servers for";
01012 }
01013 else if (numf) {delay = Config.DiskWT;
01014 *reason = "no eligible servers have space for";
01015 }
01016 else if (numo) {delay = Config.MaxDelay;
01017 *reason = "eligible servers overloaded for";
01018 }
01019 else if (nums) {delay = Config.SUSDelay;
01020 *reason = "eligible servers suspended for";
01021 }
01022 else if (numd) {delay = Config.SUPDelay;
01023 *reason = "eligible servers offline for";
01024 }
01025 else {delay = Config.SUPDelay;
01026 *reason = "server selection error for";
01027 }
01028 return (XrdCmsNode *)0;
01029 }
01030
01031
01032
01033
01034
01035
01036
01037
01038
01039
01040 int XrdCmsCluster::Drop(int sent, int sinst, XrdCmsDrop *djp)
01041 {
01042 EPNAME("Drop_Node")
01043 XrdCmsNode *nP;
01044 char hname[512];
01045
01046
01047
01048 if (!(nP = NodeTab[sent]) || nP->Inst() != sinst)
01049 {if (nP && djp == nP->DropJob) {nP->DropJob = 0; nP->DropTime = 0;}
01050 DEBUG(sent <<'.' <<sinst <<" cancelled.");
01051 return 0;
01052 }
01053
01054
01055
01056 if (djp && time(0) < nP->DropTime)
01057 {Sched->Schedule((XrdJob *)djp, nP->DropTime);
01058 return 1;
01059 }
01060
01061
01062
01063 strlcpy(hname, nP->Ident, sizeof(hname));
01064
01065
01066
01067 NodeTab[sent] = 0;
01068 nP->isOffline = 1;
01069 nP->DropTime = 0;
01070 nP->DropJob = 0;
01071 nP->isBound = 0;
01072
01073
01074
01075 if (nP->isPeer) {peerHost &= nP->NodeMask; peerMask = ~peerHost;}
01076
01077
01078
01079 if (nP->isMan)
01080 {memset((void *)&AltMans[sent*AltSize], (int)' ', AltSize);
01081 if (sent == AltMent)
01082 {AltMent--;
01083 while(AltMent >= 0 && NodeTab[AltMent]
01084 && !NodeTab[AltMent]->isMan) AltMent--;
01085 if (AltMent < 0) AltMend = AltMans;
01086 else AltMend = AltMans + ((AltMent+1)*AltSize);
01087 }
01088 }
01089
01090
01091
01092 if (sent == STHi) while(STHi >= 0 && !NodeTab[STHi]) STHi--;
01093
01094
01095
01096 if (nP->NodeMask) Cache.Drop(nP->NodeMask, sent, STHi);
01097
01098
01099
01100 Say.Emsg("Drop_Node", hname, "dropped.");
01101
01102
01103
01104 delete nP;
01105 return 0;
01106 }
01107
01108
01109
01110
01111
01112 int XrdCmsCluster::Multiple(SMask_t mVec)
01113 {
01114 static const unsigned long long Left32 = 0xffffffff00000000LL;
01115 static const unsigned long long Right32 = 0x00000000ffffffffLL;
01116 static const unsigned long long Left16 = 0x00000000ffff0000LL;
01117 static const unsigned long long Right16 = 0x000000000000ffffLL;
01118 static const unsigned long long Left08 = 0x000000000000ff00LL;
01119 static const unsigned long long Right08 = 0x00000000000000ffLL;
01120 static const unsigned long long Left04 = 0x00000000000000f0LL;
01121 static const unsigned long long Right04 = 0x000000000000000fLL;
01122
01123 static const int isMult[16] = {0,0,0,1,0,1,1,1,0,1,1,1,1,1,1,1};
01124
01125 if (mVec & Left32) {if (mVec & Right32) return 1;
01126 else mVec = mVec >> 32LL;
01127 }
01128 if (mVec & Left16) {if (mVec & Right16) return 1;
01129 else mVec = mVec >> 16LL;
01130 }
01131 if (mVec & Left08) {if (mVec & Right08) return 1;
01132 else mVec = mVec >> 8LL;
01133 }
01134 if (mVec & Left04) {if (mVec & Right04) return 1;
01135 else mVec = mVec >> 4LL;
01136 }
01137 return isMult[mVec];
01138 }
01139
01140
01141
01142
01143
01144 void XrdCmsCluster::Record(char *path, const char *reason)
01145 {
01146 EPNAME("Record")
01147 static int msgcnt = 256;
01148 static XrdSysMutex mcMutex;
01149 int mcnt;
01150
01151 DEBUG(reason <<path);
01152 mcMutex.Lock();
01153 msgcnt++; mcnt = msgcnt;
01154 mcMutex.UnLock();
01155
01156 if (mcnt > 255)
01157 {Say.Emsg("client defered;", reason, path);
01158 mcnt = 1;
01159 }
01160 }
01161
01162
01163
01164
01165
01166 int XrdCmsCluster::SelNode(XrdCmsSelect &Sel, SMask_t pmask, SMask_t amask)
01167 {
01168 EPNAME("SelNode")
01169 const char *act=0, *reason, *reason2 = "";
01170 int pspace, needspace, delay = 0, delay2 = 0, nump, isalt = 0, pass = 2;
01171 SMask_t mask;
01172 XrdCmsNode *nP = 0;
01173
01174
01175
01176
01177 if (Sel.Opts & XrdCmsSelect::isMeta) needspace = 0;
01178 else needspace = (Sel.Opts & XrdCmsSelect::Write?XrdCmsNode::allowsRW:0);
01179 pspace = needspace;
01180
01181
01182
01183
01184 STMutex.Lock();
01185 mask = pmask & peerMask;
01186 while(pass--)
01187 {if (mask)
01188 {nP = (Config.sched_RR
01189 ? SelbyRef( mask, nump, delay, &reason, needspace)
01190 : SelbyLoad(mask, nump, delay, &reason, needspace));
01191 if (nP || (nump && delay) || NodeCnt < Config.SUPCount) break;
01192 }
01193 mask = amask & peerMask; isalt = XrdCmsNode::allowsSS;
01194 if (!(Sel.Opts & XrdCmsSelect::isMeta)) needspace |= isalt;
01195 }
01196 STMutex.UnLock();
01197
01198
01199
01200 if (nP)
01201 {strcpy(Sel.Resp.Data, nP->Name(Sel.Resp.DLen, Sel.Resp.Port));
01202 Sel.Resp.DLen++; Sel.smask = nP->NodeMask;
01203 if (isalt || (Sel.Opts & XrdCmsSelect::Create) || Sel.iovN)
01204 {if (isalt || (Sel.Opts & XrdCmsSelect::Create))
01205 {Sel.Opts |= (XrdCmsSelect::Pending | XrdCmsSelect::Advisory);
01206 if (Sel.Opts & XrdCmsSelect::noBind) act = " handling ";
01207 else Cache.AddFile(Sel, nP->NodeMask);
01208 }
01209 if (Sel.iovN && Sel.iovP)
01210 {nP->Send(Sel.iovP, Sel.iovN); act = " staging ";}
01211 else if (!act) act = " assigned ";
01212 } else act = " serving ";
01213 nP->UnLock();
01214 TRACE(Stage, Sel.Resp.Data <<act <<Sel.Path.Val);
01215 return 0;
01216 } else if (!delay && NodeCnt < Config.SUPCount)
01217 {reason = "insufficient number of nodes";
01218 delay = Config.SUPDelay;
01219 }
01220
01221
01222
01223 if (delay && delay < Config.PSDelay)
01224 {Record(Sel.Path.Val, reason);
01225 return delay;
01226 }
01227
01228
01229
01230 if (Sel.Opts & XrdCmsSelect::Peers)
01231 {STMutex.Lock();
01232 if ((mask = (pmask | amask) & peerHost))
01233 nP = SelbyCost(mask, nump, delay2, &reason2, pspace);
01234 STMutex.UnLock();
01235 if (nP)
01236 {strcpy(Sel.Resp.Data, nP->Name(Sel.Resp.DLen, Sel.Resp.Port));
01237 Sel.Resp.DLen++; Sel.smask = nP->NodeMask;
01238 if (Sel.iovN && Sel.iovP) nP->Send(Sel.iovP, Sel.iovN);
01239 nP->UnLock();
01240 TRACE(Stage, "Peer " <<Sel.Resp.Data <<" handling " <<Sel.Path.Val);
01241 return 0;
01242 }
01243 if (!delay) {delay = delay2; reason = reason2;}
01244 }
01245
01246
01247
01248 if (delay)
01249 {TRACE(Defer, "client defered; " <<reason <<" for " <<Sel.Path.Val);
01250 return delay;
01251 }
01252 return -1;
01253 }
01254
01255
01256
01257
01258
01259
01260
01261
01262 XrdCmsNode *XrdCmsCluster::SelbyCost(SMask_t mask, int &nump, int &delay,
01263 const char **reason, int needspace)
01264 {
01265 int i, numd, numf, nums;
01266 XrdCmsNode *np, *sp = 0;
01267
01268
01269
01270 nump = nums = numf = numd = 0;
01271 for (i = 0; i <= STHi; i++)
01272 if ((np = NodeTab[i]) && (np->NodeMask & mask))
01273 {nump++;
01274 if (np->isOffline) {numd++; continue;}
01275 if (np->isSuspend || np->isDisable) {nums++; continue;}
01276 if (needspace && np->isNoStage) {numf++; continue;}
01277 if (!sp) sp = np;
01278 else if (abs(sp->myCost - np->myCost)
01279 <= Config.P_fuzz)
01280 {if (needspace)
01281 {if (sp->RefA > (np->RefA+Config.DiskLinger))
01282 sp=np;
01283 }
01284 else if (sp->RefR > np->RefR) sp=np;
01285 }
01286 else if (sp->myCost > np->myCost) sp=np;
01287 }
01288
01289
01290
01291 if (!sp) return calcDelay(nump, numd, numf, 0, nums, delay, reason);
01292 sp->Lock();
01293 if (needspace) {SelAcnt++; sp->RefA++;}
01294 else {SelRcnt++; sp->RefR++;}
01295 delay = 0;
01296 return sp;
01297 }
01298
01299
01300
01301
01302
01303 XrdCmsNode *XrdCmsCluster::SelbyLoad(SMask_t mask, int &nump, int &delay,
01304 const char **reason, int needspace)
01305 {
01306 int i, numd, numf, numo, nums;
01307 int reqSS = needspace & XrdCmsNode::allowsSS;
01308 XrdCmsNode *np, *sp = 0;
01309
01310
01311
01312 nump = nums = numo = numf = numd = 0;
01313 for (i = 0; i <= STHi; i++)
01314 if ((np = NodeTab[i]) && (np->NodeMask & mask))
01315 {nump++;
01316 if (np->isOffline) {numd++; continue;}
01317 if (np->isSuspend || np->isDisable) {nums++; continue;}
01318 if (np->myLoad > Config.MaxLoad) {numo++; continue;}
01319 if (needspace && (np->DiskFree < np->DiskMinF
01320 || (reqSS && np->isNoStage)))
01321 {numf++; continue;}
01322 if (!sp) sp = np;
01323 else if (needspace)
01324 {if (abs(sp->myMass - np->myMass) <= Config.P_fuzz)
01325 {if (sp->RefA > (np->RefA+Config.DiskLinger)) sp=np;}
01326 else if (sp->myMass > np->myMass) sp=np;
01327 } else {
01328 if (abs(sp->myLoad - np->myLoad) <= Config.P_fuzz)
01329 {if (sp->RefR > np->RefR) sp=np;}
01330 else if (sp->myLoad > np->myLoad) sp=np;
01331 }
01332 }
01333
01334
01335
01336 if (!sp) return calcDelay(nump, numd, numf, numo, nums, delay, reason);
01337 sp->Lock();
01338 if (needspace) {SelAcnt++; sp->RefA++;}
01339 else {SelRcnt++; sp->RefR++;}
01340 delay = 0;
01341 return sp;
01342 }
01343
01344
01345
01346
01347
01348 XrdCmsNode *XrdCmsCluster::SelbyRef(SMask_t mask, int &nump, int &delay,
01349 const char **reason, int needspace)
01350 {
01351 int i, numd, numf, nums;
01352 int reqSS = needspace & XrdCmsNode::allowsSS;
01353 XrdCmsNode *np, *sp = 0;
01354
01355
01356
01357 nump = nums = numf = numd = 0;
01358 for (i = 0; i <= STHi; i++)
01359 if ((np = NodeTab[i]) && (np->NodeMask & mask))
01360 {nump++;
01361 if (np->isOffline) {numd++; continue;}
01362 if (np->isSuspend || np->isDisable) {nums++; continue;}
01363 if (needspace && (np->DiskFree < np->DiskMinF
01364 || (reqSS && np->isNoStage)))
01365 {numf++; continue;}
01366 if (!sp) sp = np;
01367 else if (needspace)
01368 {if (sp->RefA > (np->RefA+Config.DiskLinger)) sp=np;}
01369 else if (sp->RefR > np->RefR) sp=np;
01370 }
01371
01372
01373
01374 if (!sp) return calcDelay(nump, numd, numf, 0, nums, delay, reason);
01375 sp->Lock();
01376 if (needspace) {SelAcnt++; sp->RefA++;}
01377 else {SelRcnt++; sp->RefR++;}
01378 delay = 0;
01379 return sp;
01380 }
01381
01382
01383
01384
01385
01386
01387
01388 void XrdCmsCluster::sendAList(XrdLink *lp)
01389 {
01390 static CmsTryRequest Req = {{0, kYR_try, 0, 0}, 0};
01391 static int HdrSize = sizeof(Req.Hdr) + sizeof(Req.sLen);
01392 static char *AltNext = AltMans;
01393 static struct iovec iov[4] = {{(caddr_t)&Req, HdrSize},
01394 {0, 0},
01395 {AltMans, 0},
01396 {(caddr_t)"\0", 1}};
01397 int dlen;
01398
01399
01400
01401 AltNext = AltNext + AltSize;
01402 if (AltNext >= AltMend)
01403 {AltNext = AltMans;
01404 iov[1].iov_len = 0;
01405 iov[2].iov_len = dlen = AltMend - AltMans;
01406 } else {
01407 iov[1].iov_base = (caddr_t)AltNext;
01408 iov[1].iov_len = AltMend - AltNext;
01409 iov[2].iov_len = AltNext - AltMans;
01410 dlen = iov[1].iov_len + iov[2].iov_len;
01411 }
01412
01413
01414
01415 dlen++;
01416 Req.Hdr.datalen = htons(static_cast<unsigned short>(dlen+sizeof(Req.sLen)));
01417 Req.sLen = htons(static_cast<unsigned short>(dlen));
01418
01419
01420
01421 lp->Send(iov, 4, dlen+HdrSize);
01422 }
01423
01424
01425
01426
01427
01428
01429
01430 void XrdCmsCluster::setAltMan(int snum, unsigned int ipaddr, int port)
01431 {
01432 char *ap = &AltMans[snum*AltSize];
01433 int i;
01434
01435
01436
01437 if (!port || (port > 0x0000ffff)) port = Config.PortTCP;
01438 memset(ap, int(' '), AltSize);
01439
01440
01441
01442 i = XrdNetDNS::IP2String(ipaddr, port, ap, AltSize);
01443 ap[i] = ' ';
01444
01445
01446
01447 if (ap >= AltMend) {AltMend = ap + AltSize; AltMent = snum;}
01448 }