00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015 const char *XrdCmsProtocolCVSID = "$Id: XrdCmsProtocol.cc 35287 2010-09-14 21:19:35Z ganis $";
00016
00017
00018 #include <unistd.h>
00019 #include <ctype.h>
00020 #include <errno.h>
00021 #include <signal.h>
00022 #include <stdlib.h>
00023 #include <string.h>
00024 #include <strings.h>
00025 #include <stdio.h>
00026 #include <netinet/in.h>
00027 #include <sys/param.h>
00028
00029 #include "XProtocol/YProtocol.hh"
00030
00031 #include "Xrd/XrdInet.hh"
00032 #include "Xrd/XrdLink.hh"
00033
00034 #include "XrdCms/XrdCmsCache.hh"
00035 #include "XrdCms/XrdCmsCluster.hh"
00036 #include "XrdCms/XrdCmsConfig.hh"
00037 #include "XrdCms/XrdCmsJob.hh"
00038 #include "XrdCms/XrdCmsLogin.hh"
00039 #include "XrdCms/XrdCmsManager.hh"
00040 #include "XrdCms/XrdCmsManTree.hh"
00041 #include "XrdCms/XrdCmsMeter.hh"
00042 #include "XrdCms/XrdCmsProtocol.hh"
00043 #include "XrdCms/XrdCmsRouting.hh"
00044 #include "XrdCms/XrdCmsRTable.hh"
00045 #include "XrdCms/XrdCmsState.hh"
00046 #include "XrdCms/XrdCmsTrace.hh"
00047
00048 #include "XrdNet/XrdNetDNS.hh"
00049
00050 #include "XrdOuc/XrdOucCRC.hh"
00051 #include "XrdOuc/XrdOucPup.hh"
00052 #include "XrdOuc/XrdOucTokenizer.hh"
00053
00054 #include "XrdSys/XrdSysError.hh"
00055 #include "XrdSys/XrdSysHeaders.hh"
00056 #include "XrdSys/XrdSysTimer.hh"
00057
00058 using namespace XrdCms;
00059
00060
00061
00062
00063
00064 XrdSysMutex XrdCmsProtocol::ProtMutex;
00065 XrdCmsProtocol *XrdCmsProtocol::ProtStack = 0;
00066
00067 int XrdCmsProtocol::readWait = 1000;
00068
00069 XrdCmsParser XrdCmsProtocol::ProtArgs;
00070
00071
00072
00073
00074
00075
00076
00077
00078
00079
00080
00081
00082
00083
00084 extern "C"
00085 {
00086 XrdProtocol *XrdgetProtocol(const char *pname, char *parms,
00087 XrdProtocol_Config *pi)
00088 {
00089
00090
00091 if (Config.doWait < 0) return (XrdProtocol *)0;
00092
00093
00094
00095 Config.PortTCP = pi->NetTCP->Port();
00096 Config.NetTCP = pi->NetTCP;
00097
00098
00099
00100
00101 if (Config.Police) {pi->NetTCP->Secure(Config.Police); Config.Police = 0;}
00102
00103
00104
00105 if (Config.Configure2()) return (XrdProtocol *)0;
00106
00107
00108
00109 return (XrdProtocol *)new XrdCmsProtocol();
00110 }
00111 }
00112
00113
00114
00115
00116
00117
00118
00119
00120
00121
00122
00123
00124 extern "C"
00125 {
00126 int XrdgetProtocolPort(const char *pname, char *parms,
00127 XrdProtocol_Config *pi)
00128 {
00129 static int thePort = -1;
00130 char *cfn = pi->ConfigFN, buff[128];
00131
00132
00133
00134 if (thePort >= 0)
00135 {if (pi->Port && pi->Port != thePort)
00136 {sprintf(buff, "%d disallowed; only using port %d",pi->Port,thePort);
00137 Say.Emsg("Config", "Alternate port", buff);
00138 }
00139 return thePort;
00140 }
00141
00142
00143
00144 Say.logger(pi->eDest->logger(0));
00145 Config.myName = strdup(pi->myName);
00146 Config.PortTCP = (pi->Port < 0 ? 0 : pi->Port);
00147 Config.myInsName = strdup(pi->myInst);
00148 Config.myProg = strdup(pi->myProg);
00149 Sched = pi->Sched;
00150 if (pi->DebugON) Trace.What = TRACE_ALL;
00151 memcpy(&Config.myAddr, pi->myAddr, sizeof(struct sockaddr));
00152
00153
00154
00155 if (parms)
00156 {while(*parms == ' ') parms++;
00157 if (*parms)
00158 {char *pp = parms;
00159 while(*parms != ' ' && *parms) parms++;
00160 cfn = pp;
00161 }
00162 }
00163
00164
00165
00166 Say.Say("Copr. 2007 Stanford University/SLAC cmsd.");
00167
00168
00169
00170 if (cfn) cfn = strdup(cfn);
00171 if (Config.Configure1(pi->argc, pi->argv, cfn))
00172 {Config.doWait = -1; return 0;}
00173
00174
00175
00176 thePort = Config.PortTCP;
00177 return thePort;
00178 }
00179 }
00180
00181
00182
00183
00184
00185 int XrdCmsProtocol::Execute(XrdCmsRRData &Arg)
00186 {
00187 EPNAME("Execute");
00188 static kXR_unt32 theDelay = htonl(Config.SUPDelay);
00189 XrdCmsRouter::NodeMethod_t Method;
00190 const char *etxt;
00191
00192
00193
00194 if (CmsState.Suspended && Arg.Routing & XrdCmsRouting::Delayable)
00195 {Reply_Delay(Arg, theDelay); return 0;}
00196
00197
00198
00199
00200 if (!(Method = Router.getMethod(Arg.Request.rrCode)))
00201 Say.Emsg("Protocol", "invalid request code from", myNode->Ident);
00202 else if ((etxt = (myNode->*Method)(Arg)))
00203 if (*etxt == '!')
00204 {DEBUGR(etxt+1 <<" delayed " <<Arg.waitVal <<" seconds");
00205 return -EINPROGRESS;
00206 } else if (*etxt == '.') return -ECONNABORTED;
00207 else Reply_Error(Arg, kYR_EINVAL, etxt);
00208 else if (Arg.Routing & XrdCmsRouting::Forward && Cluster.NodeCnt
00209 && !(Arg.Request.modifier & kYR_dnf)) Reissue(Arg);
00210 return 0;
00211 }
00212
00213
00214
00215
00216
00217 XrdProtocol *XrdCmsProtocol::Match(XrdLink *lp)
00218 {
00219 CmsRRHdr Hdr;
00220 int dlen;
00221
00222
00223
00224 if ((dlen = lp->Peek((char *)&Hdr,sizeof(Hdr),readWait)) != sizeof(Hdr))
00225 {if (dlen <= 0) lp->setEtext("login not received");
00226 return (XrdProtocol *)0;
00227 }
00228
00229
00230
00231 if (Hdr.streamid || Hdr.rrCode != kYR_login)
00232 {if (!strncmp((char *)&Hdr, "login ", 6))
00233 lp->setEtext("protocol version 1 unsupported");
00234 return (XrdProtocol *)0;
00235 }
00236
00237
00238
00239 return (XrdProtocol *)XrdCmsProtocol::Alloc();
00240 }
00241
00242
00243
00244
00245
00246
00247
00248 void XrdCmsProtocol::Pander(const char *manager, int mport)
00249 {
00250 EPNAME("Pander");
00251
00252 CmsLoginData Data, loginData;
00253 unsigned int Mode, Role = 0;
00254 int Lvl=0, Netopts=0, waits=6, tries=6, fails=0, xport=mport;
00255 int rc, fsUtil, KickedOut, myNID = ManTree.Register();
00256 int chk4Suspend = XrdCmsState::All_Suspend, TimeOut = Config.AskPing*1000;
00257 char manbuff[256];
00258 const char *Reason = 0, *manp = manager;
00259 const int manblen = sizeof(manbuff);
00260
00261
00262
00263 DEBUG(myRole <<" services to " <<manager <<':' <<mport);
00264
00265
00266
00267 loginData.SID = (kXR_char *)Config.mySID;
00268 loginData.Paths = (kXR_char *)Config.myPaths;
00269 loginData.sPort = Config.PortTCP;
00270 loginData.fsNum = Meter.numFS();
00271 loginData.tSpace= Meter.TotalSpace(loginData.mSpace);
00272
00273 loginData.Version = kYR_Version;
00274 loginData.HoldTime= static_cast<int>(getpid());
00275 loginData.Mode = 0;
00276 loginData.Size = 0;
00277
00278
00279
00280 Routing = (Config.asManager() || Config.asPeer() ? &supVOps : &srvVOps);
00281
00282
00283
00284 if (Config.asPeer()) Role = CmsLoginData::kYR_peer;
00285 else if (Config.asManager()) Role = CmsLoginData::kYR_manager;
00286 else Role = CmsLoginData::kYR_server;
00287 if (Config.asProxy()) Role |= CmsLoginData::kYR_proxy;
00288
00289
00290
00291
00292 if (Role == CmsLoginData::kYR_server)
00293 {if (!Config.DiskSS) Role |= CmsLoginData::kYR_nostage;}
00294 else chk4Suspend = XrdCmsState::FES_Suspend;
00295
00296
00297
00298 do {if (Config.doWait && chk4Suspend)
00299 while(CmsState.Suspended & chk4Suspend)
00300 {if (!waits--)
00301 {Say.Emsg("Pander", "Suspend state still active."); waits=6;}
00302 XrdSysTimer::Snooze(12);
00303 }
00304
00305 if (!ManTree.Trying(myNID, Lvl) && Lvl)
00306 {DEBUG("restarting at root node " <<manager <<':' <<mport);
00307 manp = manager; xport = mport; Lvl = 0;
00308 }
00309
00310 DEBUG("trying to connect to lvl " <<Lvl <<' ' <<manp <<':' <<xport);
00311
00312 if (!(Link = Config.NetTCP->Connect(manp, xport, Netopts)))
00313 {if (tries--) Netopts = XRDNET_NOEMSG;
00314 else {tries = 6; Netopts = 0;}
00315 if ((Lvl = myMans.Next(xport,manbuff,manblen)))
00316 {XrdSysTimer::Snooze(3); manp = manbuff;}
00317 else {if (manp != manager) fails++;
00318 XrdSysTimer::Snooze(6); manp = manager; xport = mport;
00319 }
00320 continue;
00321 }
00322 Netopts = 0; tries = waits = 6;
00323
00324
00325
00326 if (!(myNode = Manager.Add(Link, Lvl+1)))
00327 {Say.Emsg("Pander", "Unable to obtain node object.");
00328 Link->Close(); XrdSysTimer::Snooze(15); continue;
00329 }
00330
00331
00332
00333 Mode = Role
00334 | (CmsState.Suspended ? CmsLoginData::kYR_suspend : 0)
00335 | (CmsState.NoStaging ? CmsLoginData::kYR_nostage : 0);
00336 if (fails >= 6 && manp == manager)
00337 {fails = 0; Mode |= CmsLoginData::kYR_trying;}
00338
00339
00340
00341 loginData.fSpace= Meter.FreeSpace(fsUtil);
00342 loginData.fsUtil= static_cast<kXR_unt16>(fsUtil);
00343 KickedOut = 0; loginData.dPort = CmsState.Port();
00344 Data = loginData; Data.Mode = Mode;
00345 if (!(rc = XrdCmsLogin::Login(Link, Data, TimeOut)))
00346 {if(!ManTree.Connect(myNID, myNode)) KickedOut = 1;
00347 else {Say.Emsg("Protocol", "Logged into", Link->Name());
00348 Reason = Dispatch(isUp, TimeOut, 2);
00349 rc = 0;
00350 loginData.fSpace= Meter.FreeSpace(fsUtil);
00351 loginData.fsUtil= static_cast<kXR_unt16>(fsUtil);
00352 }
00353 }
00354
00355
00356
00357 Manager.Remove(myNode, (rc == kYR_redirect ? "redirected"
00358 : (Reason ? Reason : "lost connection")));
00359 ManTree.Disc(myNID);
00360 Link->Close();
00361 delete myNode; myNode = 0;
00362
00363
00364
00365 if (rc == kYR_redirect)
00366 {struct sockaddr netaddr;
00367 XrdOucTokenizer hList((char *)Data.Paths);
00368 unsigned int ipaddr;
00369 char *hP;
00370 Link->Name(&netaddr);
00371 ipaddr = XrdNetDNS::IPAddr(&netaddr);
00372 myMans.Del(ipaddr);
00373 while((hP = hList.GetToken()))
00374 myMans.Add(ipaddr, hP, Config.PortTCP, Lvl+1);
00375 free(Data.Paths);
00376 }
00377
00378
00379
00380 if (!KickedOut && (Lvl = myMans.Next(xport,manbuff,manblen)))
00381 {manp = manbuff; continue;}
00382 XrdSysTimer::Snooze(9); Lvl = 0;
00383 if (manp != manager) fails++;
00384 manp = manager; xport = mport;
00385 } while(1);
00386 }
00387
00388
00389
00390
00391
00392
00393
00394
00395 int XrdCmsProtocol::Process(XrdLink *lp)
00396 {
00397 const char *Reason;
00398 Bearing myWay;
00399 int tOut;
00400
00401
00402
00403 Link = lp;
00404 if ((Routing=Admit()))
00405 {loggedIn = 1;
00406 if (RSlot) {myWay = isLateral; tOut = -1;}
00407 else {myWay = isDown; tOut = Config.AskPing*1000;}
00408 myNode->UnLock();
00409 if ((Reason = Dispatch(myWay, tOut, 2))) lp->setEtext(Reason);
00410 myNode->Lock();
00411 }
00412
00413
00414
00415 lp->Serialize();
00416
00417
00418
00419 if (RSlot)
00420 {RTable.Del(myNode); RSlot = 0;
00421 myNode->UnLock(); delete myNode; myNode = 0;
00422 return -1;
00423 }
00424
00425
00426
00427
00428
00429 if (myNode)
00430 {myNode->isConn = 0;
00431 if (myNode->isBound) Cluster.Remove(0, myNode, !loggedIn);
00432 else if (myNode->isGone) delete myNode;
00433 else myNode->UnLock();
00434 }
00435
00436
00437
00438 return -1;
00439 }
00440
00441
00442
00443
00444
00445 void XrdCmsProtocol::Recycle(XrdLink *lp, int consec, const char *reason)
00446 {
00447 if (loggedIn)
00448 if (reason) Say.Emsg("Protocol", lp->ID, "logged out;", reason);
00449 else Say.Emsg("Protocol", lp->ID, "logged out.");
00450 else
00451 if (reason) Say.Emsg("Protocol", lp->ID, "login failed;", reason);
00452
00453 ProtMutex.Lock();
00454 ProtLink = ProtStack;
00455 ProtStack = this;
00456 ProtMutex.UnLock();
00457 }
00458
00459
00460
00461
00462
00463
00464
00465
00466 XrdCmsRouting *XrdCmsProtocol::Admit()
00467 {
00468 EPNAME("Admit");
00469 char myBuff[1024];
00470 XrdCmsLogin Source(myBuff, sizeof(myBuff));
00471 CmsLoginData Data;
00472 const char *Reason;
00473 SMask_t newmask, servset(0);
00474 int addedp = 0, Status = 0, isMan = 0, isPeer = 0, isProxy = 0, isServ = 0;
00475 int wasSuspended = 0;
00476
00477
00478
00479 Data.Mode = 0;
00480 if (Trace.What & TRACE_Debug) Data.Mode |= CmsLoginData::kYR_debug;
00481 if (CmsState.Suspended) {Data.Mode |= CmsLoginData::kYR_suspend;
00482 wasSuspended = 1;
00483 }
00484 Data.HoldTime = Config.LUPHold;
00485
00486
00487
00488 if (!Source.Admit(Link, Data)) return 0;
00489
00490
00491
00492 if (Data.Mode & CmsLoginData::kYR_director)
00493 {Link->setID("redirector", Data.HoldTime);
00494 return Admit_Redirector(wasSuspended);
00495 }
00496
00497
00498
00499 if (Config.asSolo())
00500 return Login_Failed("configuration disallows subscribers");
00501
00502
00503
00504 if ((isMan = Data.Mode & CmsLoginData::kYR_manager))
00505 {Status = CMS_isMan;
00506 if ((isPeer = Data.Mode & CmsLoginData::kYR_peer))
00507 {Status |= CMS_isPeer; myRole = "manager";}
00508 else myRole = "supervisor";
00509 }
00510 else if ((isServ = Data.Mode & CmsLoginData::kYR_server))
00511 if ((isProxy= Data.Mode & CmsLoginData::kYR_proxy))
00512 {Status = CMS_isProxy; myRole = "proxy_srvr";}
00513 else myRole = "server";
00514 else if ((isPeer = Data.Mode & CmsLoginData::kYR_peer))
00515 {Status |= CMS_isPeer;
00516 myRole = (CmsLoginData::kYR_proxy ? "peer" : "peer_proxy");
00517 }
00518 else return Login_Failed("invalid login role");
00519
00520
00521
00522 Link->setID(myRole, Data.HoldTime);
00523
00524
00525
00526 Reason = 0;
00527 if (Config.asServer())
00528 {if (Config.asProxy() && (!isProxy || isPeer))
00529 Reason = "configuration only allows proxies";
00530 else if (!isServ)
00531 Reason = "configuration disallows peers and proxies";
00532 } else {
00533 if (Config.asProxy() && isServ)
00534 Reason = "configuration only allows peers or proxies";
00535 else if (isProxy)
00536 Reason = "configuration disallows proxies";
00537 }
00538 if (Reason) return Login_Failed(Reason);
00539
00540
00541
00542 if (Data.Mode & CmsLoginData::kYR_nostage) Status |= CMS_noStage;
00543 if (Data.Mode & CmsLoginData::kYR_suspend) Status |= CMS_Suspend;
00544
00545
00546
00547 if (Data.Mode & CmsLoginData::kYR_trying)
00548 Say.Emsg("Protocol",Link->Name(),"has not yet found a cluster slot!");
00549
00550
00551
00552
00553 if (!(myNode = Cluster.Add(Link, Data.dPort, Status, Data.sPort,
00554 (const char *)Data.SID)))
00555 return (XrdCmsRouting *)0;
00556
00557
00558
00559 DEBUG(Link->Name() <<" TSpace=" <<Data.tSpace <<"GB NumFS=" <<Data.fsNum
00560 <<" FSpace=" <<Data.fSpace <<"MB MinFR=" <<Data.mSpace
00561 <<"MB Util=" <<Data.fsUtil);
00562 myNode->DiskTotal = Data.tSpace;
00563 myNode->DiskMinF = Data.mSpace;
00564 myNode->DiskFree = Data.fSpace;
00565 myNode->DiskNums = Data.fsNum;
00566 myNode->DiskUtil = Data.fsUtil;
00567 Meter.setVirtUpdt();
00568
00569
00570
00571 if (Data.Paths && *Data.Paths)
00572 {XrdOucTokenizer thePaths((char *)Data.Paths);
00573 char *tp, *pp;
00574 ConfigCheck(Data.Paths);
00575 while((tp = thePaths.GetLine()))
00576 {DEBUG(Link->Name() <<" adding path: " <<tp);
00577 if (!(tp = thePaths.GetToken())
00578 || !(pp = thePaths.GetToken())) break;
00579 if (!(newmask = AddPath(myNode, tp, pp)))
00580 return Login_Failed("invalid exported path");
00581 servset |= newmask;
00582 addedp= 1;
00583 }
00584 }
00585
00586
00587
00588 if (!addedp)
00589 {XrdCmsPInfo pinfo;
00590 ConfigCheck(0);
00591 pinfo.rovec = myNode->Mask();
00592 if (myNode->isPeer) pinfo.ssvec = myNode->Mask();
00593 servset = Cache.Paths.Insert("/", &pinfo);
00594 Say.Emsg("Protocol", myNode->Ident, "defaulted r /");
00595 }
00596
00597
00598
00599
00600
00601 Cluster.ResetRef(servset);
00602 if (Config.asManager()) {Manager.Reset(); myNode->SyncSpace();}
00603 myNode->isDisable = 0;
00604
00605
00606
00607 Say.Emsg("Protocol", myNode->Ident,
00608 (myNode->isSuspend ? "logged in suspended." : "logged in."));
00609
00610
00611
00612 return &rspVOps;
00613 }
00614
00615
00616
00617
00618
00619 XrdCmsRouting *XrdCmsProtocol::Admit_Redirector(int wasSuspended)
00620 {
00621 EPNAME("Admit_Redirector");
00622 static CmsStatusRequest newState
00623 = {{0, kYR_status, CmsStatusRequest::kYR_Resume, 0}};
00624
00625
00626
00627 myRole = "redirector";
00628
00629
00630
00631
00632 myNode = new XrdCmsNode(Link); myNode->Lock();
00633 if (!(RSlot = RTable.Add(myNode)))
00634 {Say.Emsg("Protocol",myNode->Ident,"login failed; too many redirectors.");
00635 return 0;
00636 } else myNode->setSlot(RSlot);
00637
00638
00639
00640
00641 if (wasSuspended && !CmsState.Suspended)
00642 myNode->Send((char *)&newState, sizeof(newState));
00643
00644
00645
00646 Say.Emsg("Protocol", myNode->Ident, "logged in.");
00647 DEBUG(myNode->Ident <<" assigned slot " <<RSlot);
00648 return &rdrVOps;
00649 }
00650
00651
00652
00653
00654
00655 SMask_t XrdCmsProtocol::AddPath(XrdCmsNode *nP,
00656 const char *pType, const char *Path)
00657 {
00658 XrdCmsPInfo pinfo;
00659
00660
00661
00662 while(*pType)
00663 { if ('r' == *pType) pinfo.rovec = nP->Mask();
00664 else if ('w' == *pType) pinfo.rovec = pinfo.rwvec = nP->Mask();
00665 else if ('s' == *pType) pinfo.rovec = pinfo.ssvec = nP->Mask();
00666 else return 0;
00667 pType++;
00668 }
00669
00670
00671
00672 nP->isRW = (pinfo.rwvec ? XrdCmsNode::allowsRW : 0)
00673 | (pinfo.ssvec ? XrdCmsNode::allowsSS : 0);
00674
00675
00676
00677 return Cache.Paths.Insert(Path, &pinfo);
00678 }
00679
00680
00681
00682
00683
00684 XrdCmsProtocol *XrdCmsProtocol::Alloc(const char *theRole,
00685 const char *theMan,
00686 int thePort)
00687 {
00688 XrdCmsProtocol *xp;
00689
00690
00691
00692 ProtMutex.Lock();
00693 if ((xp = ProtStack)) ProtStack = xp->ProtLink;
00694 else xp = new XrdCmsProtocol();
00695 ProtMutex.UnLock();
00696
00697
00698
00699 if (!xp) Say.Emsg("Protocol","No more protocol objects.");
00700 else {xp->myRole = theRole;
00701 xp->myMan = theMan;
00702 xp->myManPort = thePort;
00703 xp->loggedIn = 0;
00704 }
00705
00706
00707
00708 return xp;
00709 }
00710
00711
00712
00713
00714
00715 void XrdCmsProtocol::ConfigCheck(unsigned char *theConfig)
00716 {
00717 unsigned int ConfigID;
00718 int tmp;
00719
00720
00721
00722 if (!theConfig) ConfigID = 1;
00723 else ConfigID = XrdOucCRC::CRC32(theConfig, strlen((char *)theConfig));
00724
00725
00726
00727 if (ConfigID != myNode->ConfigID)
00728 {if (myNode->ConfigID) Say.Emsg("Protocol",Link->Name(),"reconfigured.");
00729 Cache.Paths.Remove(myNode->Mask());
00730 Cache.Bounce(myNode->Mask(), myNode->ID(tmp));
00731 myNode->ConfigID = ConfigID;
00732 }
00733 }
00734
00735
00736
00737
00738
00739
00740
00741
00742
00743
00744
00745 const char *XrdCmsProtocol::Dispatch(Bearing cDir, int maxWait, int maxTries)
00746 {
00747 EPNAME("Dispatch");
00748 static const int ReqSize = sizeof(CmsRRHdr);
00749 static CmsPingRequest Ping = {{0, kYR_ping, 0, 0}};
00750 XrdCmsRRData *Data = XrdCmsRRData::Objectify();
00751 XrdCmsJob *jp;
00752 const char *toRC = (cDir == isUp ? "manager not active"
00753 : "server not responding");
00754 const char *myArgs, *myArgt;
00755 char buff[8];
00756 int rc, toLeft = maxTries;
00757
00758
00759
00760 Link->Bind(XrdSysThread::ID());
00761
00762
00763
00764 do{if ((rc = Link->RecvAll((char *)&Data->Request, ReqSize, maxWait)) < 0)
00765 {if (rc != -ETIMEDOUT) return "request read failed";
00766 if (!toLeft--) return toRC;
00767 if (cDir == isDown && Link->Send((char *)&Ping, sizeof(Ping)) < 0)
00768 return "server unreachable";
00769 continue;
00770 }
00771
00772
00773
00774 toLeft = maxTries;
00775 Data->Dlen = static_cast<int>(ntohs(Data->Request.datalen));
00776 if ((QTRACE(Debug))
00777 && Data->Request.rrCode != kYR_ping && Data->Request.rrCode != kYR_pong)
00778 DEBUG(myNode->Ident <<" for " <<Router.getName(Data->Request.rrCode)
00779 <<" dlen=" <<Data->Dlen);
00780 if (!(Data->Dlen)) {myArgs = myArgt = 0;}
00781 else {if (Data->Dlen > maxReqSize)
00782 {Say.Emsg("Protocol","Request args too long from",Link->Name());
00783 return "protocol error";
00784 }
00785 if ((!Data->Buff || Data->Blen < Data->Dlen)
00786 && !Data->getBuff(Data->Dlen))
00787 {Say.Emsg("Protocol", "No buffers to serve", Link->Name());
00788 return "insufficient buffers";
00789 }
00790 if ((rc = Link->RecvAll(Data->Buff, Data->Dlen, maxWait)) < 0)
00791 return (rc == -ETIMEDOUT ? "read timed out" : "read failed");
00792 myArgs = Data->Buff; myArgt = Data->Buff + Data->Dlen;
00793 }
00794
00795
00796
00797 if (!(Data->Routing = Routing->getRoute(int(Data->Request.rrCode))))
00798 {sprintf(buff, "%d", Data->Request.rrCode);
00799 Say.Emsg("Protocol",Link->Name(),"sent an invalid request -", buff);
00800 continue;
00801 }
00802
00803
00804
00805 if (!(Data->Routing & XrdCmsRouting::noArgs))
00806 {if (Data->Request.modifier & kYR_raw)
00807 {Data->Path = Data->Buff; Data->PathLen = Data->Dlen;}
00808 else if (!myArgs
00809 || !ProtArgs.Parse(int(Data->Request.rrCode),myArgs,myArgt,Data))
00810 {Reply_Error(*Data, kYR_EINVAL, "badly formed request");
00811 continue;
00812 }
00813 }
00814
00815
00816
00817 if (!(Data->Ident) || !(*Data->Ident)) Data->Ident = myNode->Ident;
00818
00819
00820
00821
00822 if (Data->Routing & XrdCmsRouting::isSync)
00823 {if ((rc = Execute(*Data)) && rc == -ECONNABORTED) return "redirected";}
00824 else if ((jp = XrdCmsJob::Alloc(this, Data)))
00825 {Sched->Schedule((XrdJob *)jp);
00826 Data = XrdCmsRRData::Objectify();
00827 }
00828 else Say.Emsg("Protocol", "No jobs to serve", Link->Name());
00829 } while(1);
00830
00831
00832
00833 return "logic error";
00834 }
00835
00836
00837
00838
00839
00840
00841
00842 void XrdCmsProtocol::DoIt()
00843 {
00844
00845
00846
00847 if (myRole) Pander(myMan, myManPort);
00848 }
00849
00850
00851
00852
00853
00854 XrdCmsRouting *XrdCmsProtocol::Login_Failed(const char *reason)
00855 {
00856 Link->setEtext(reason);
00857 return (XrdCmsRouting *)0;
00858 }
00859
00860
00861
00862
00863
00864 void XrdCmsProtocol::Reissue(XrdCmsRRData &Data)
00865 {
00866 EPNAME("Resisue");
00867 XrdCmsPInfo pinfo;
00868 SMask_t amask;
00869 struct iovec ioB[2] = {{(char *)&Data.Request, sizeof(Data.Request)},
00870 { Data.Buff, Data.Dlen}
00871 };
00872
00873
00874
00875 if (!((Data.Request.modifier += kYR_hopincr) & kYR_hopcount))
00876 {Say.Emsg("Job", Router.getName(Data.Request.rrCode),
00877 "msg TTL exceeded for", Data.Path);
00878 return;
00879 }
00880
00881
00882
00883 Data.Request.streamid = 0;
00884
00885
00886
00887 if (!Cache.Paths.Find(Data.Path, pinfo)
00888 || (amask = pinfo.rwvec | pinfo.rovec) == 0)
00889 {Say.Emsg("Job", Router.getName(Data.Request.rrCode),
00890 "aborted; no servers handling", Data.Path);
00891 return;
00892 }
00893
00894
00895
00896 DEBUG("FWD " <<Router.getName(Data.Request.rrCode) <<' ' <<Data.Path);
00897
00898
00899
00900 Cluster.Broadcast(amask, ioB, 2, sizeof(Data.Request)+Data.Dlen);
00901 }
00902
00903
00904
00905
00906
00907 void XrdCmsProtocol::Reply_Delay(XrdCmsRRData &Data, kXR_unt32 theDelay)
00908 {
00909 EPNAME("Reply_Delay");
00910 const char *act;
00911
00912 if (Data.Request.streamid && (Data.Routing & XrdCmsRouting::Repliable))
00913 {CmsResponse Resp = {{Data.Request.streamid, kYR_wait, 0,
00914 htons(sizeof(kXR_unt32))}, theDelay};
00915 act = " sent";
00916 Link->Send((char *)&Resp, sizeof(Resp));
00917 } else act = " skip";
00918
00919 DEBUG(myNode->Ident <<act <<" delay " <<ntohl(theDelay));
00920 }
00921
00922
00923
00924
00925
00926 void XrdCmsProtocol::Reply_Error(XrdCmsRRData &Data, int ecode, const char *etext)
00927 {
00928 EPNAME("Reply_Error");
00929 const char *act;
00930 int n = strlen(etext)+1;
00931
00932 if (Data.Request.streamid && (Data.Routing & XrdCmsRouting::Repliable))
00933 {CmsResponse Resp = {{Data.Request.streamid, kYR_error, 0,
00934 htons(sizeof(kXR_unt32)+n)},
00935 htonl(static_cast<unsigned int>(ecode))};
00936 struct iovec ioV[2] = {{(char *)&Resp, sizeof(Resp)},
00937 {(char *)etext, n}};
00938 act = " sent";
00939 Link->Send(ioV, 2);
00940 } else act = " skip";
00941
00942 DEBUG(myNode->Ident <<act <<" err " <<ecode <<' ' <<etext);
00943 }