XrdCmsProtocol.cc

Go to the documentation of this file.
00001 /******************************************************************************/
00002 /*                                                                            */
00003 /*                     X r d C m s P r o t o c o l . c c                      */
00004 /*                                                                            */
00005 /* (c) 2007 by the Board of Trustees of the Leland Stanford, Jr., University  */
00006 /*                            All Rights Reserved                             */
00007 /*   Produced by Andrew Hanushevsky for Stanford University under contract    */
00008 /*              DE-AC02-76-SFO0515 with the Department of Energy              */
00009 /******************************************************************************/
00010 
00011 //       $Id: XrdCmsProtocol.cc 35287 2010-09-14 21:19:35Z ganis $
00012 
00013 // Original Version: 1.7 2007/07/31 02:25:15 abh
00014 
00015 const char *XrdCmsProtocolCVSID = "$Id: XrdCmsProtocol.cc 35287 2010-09-14 21:19:35Z ganis $";
00016  
00017   
00018 #include <unistd.h>
00019 #include <ctype.h>
00020 #include <errno.h>
00021 #include <signal.h>
00022 #include <stdlib.h>
00023 #include <string.h>
00024 #include <strings.h>
00025 #include <stdio.h>
00026 #include <netinet/in.h>
00027 #include <sys/param.h>
00028 
00029 #include "XProtocol/YProtocol.hh"
00030 
00031 #include "Xrd/XrdInet.hh"
00032 #include "Xrd/XrdLink.hh"
00033 
00034 #include "XrdCms/XrdCmsCache.hh"
00035 #include "XrdCms/XrdCmsCluster.hh"
00036 #include "XrdCms/XrdCmsConfig.hh"
00037 #include "XrdCms/XrdCmsJob.hh"
00038 #include "XrdCms/XrdCmsLogin.hh"
00039 #include "XrdCms/XrdCmsManager.hh"
00040 #include "XrdCms/XrdCmsManTree.hh"
00041 #include "XrdCms/XrdCmsMeter.hh"
00042 #include "XrdCms/XrdCmsProtocol.hh"
00043 #include "XrdCms/XrdCmsRouting.hh"
00044 #include "XrdCms/XrdCmsRTable.hh"
00045 #include "XrdCms/XrdCmsState.hh"
00046 #include "XrdCms/XrdCmsTrace.hh"
00047 
00048 #include "XrdNet/XrdNetDNS.hh"
00049 
00050 #include "XrdOuc/XrdOucCRC.hh"
00051 #include "XrdOuc/XrdOucPup.hh"
00052 #include "XrdOuc/XrdOucTokenizer.hh"
00053 
00054 #include "XrdSys/XrdSysError.hh"
00055 #include "XrdSys/XrdSysHeaders.hh"
00056 #include "XrdSys/XrdSysTimer.hh"
00057 
00058 using namespace XrdCms;
00059   
00060 /******************************************************************************/
00061 /*                        G l o b a l   O b j e c t s                         */
00062 /******************************************************************************/
00063 
00064        XrdSysMutex      XrdCmsProtocol::ProtMutex;
00065        XrdCmsProtocol  *XrdCmsProtocol::ProtStack = 0;
00066 
00067        int              XrdCmsProtocol::readWait  = 1000;
00068 
00069        XrdCmsParser     XrdCmsProtocol::ProtArgs;
00070 
00071 /******************************************************************************/
00072 /*                       P r o t o c o l   L o a d e r                        */
00073 /*                        X r d g e t P r o t o c o l                         */
00074 /******************************************************************************/
00075   
00076 // This protocol can live in a shared library. It can also be statically linked
00077 // to provide a default protocol (which, for cms protocol we do). The interface
00078 // below is used by Xrd to obtain a copy of the protocol object that can be
00079 // used to decide whether or not a link is talking our particular protocol.
00080 // Phase 1 initialization occurred on the call to XrdgetProtocolPort(). At this
00081 // point a network interface is defined and we can complete initialization.
00082 //
00083 
00084 extern "C"
00085 {
00086 XrdProtocol *XrdgetProtocol(const char *pname, char *parms,
00087                             XrdProtocol_Config *pi)
00088 {
00089 // If we failed in Phase 1 initialization, immediately fail Phase 2.
00090 //
00091    if (Config.doWait < 0) return (XrdProtocol *)0;
00092 
00093 // Initialize the network interface and get the actual port number assigned
00094 //
00095    Config.PortTCP = pi->NetTCP->Port();
00096    Config.NetTCP  = pi->NetTCP;
00097 
00098 // If we have a connection allow list, add it to the network object. Note that
00099 // we clear the address because the object is lost in the add process.
00100 //
00101    if (Config.Police) {pi->NetTCP->Secure(Config.Police); Config.Police = 0;}
00102 
00103 // Complete initialization and upon success return a protocol object
00104 //
00105    if (Config.Configure2()) return (XrdProtocol *)0;
00106 
00107 // Return a new instance of this object
00108 //
00109    return (XrdProtocol *)new XrdCmsProtocol();
00110 }
00111 }
00112 
00113 /******************************************************************************/
00114 /*           P r o t o c o l   P o r t   D e t e r m i n a t i o n            */
00115 /*                    X r d g e t P r o t o c o l P o r t                     */
00116 /******************************************************************************/
00117   
00118 // Because the dcm port numbers are determined dynamically based on the role the
00119 // dcm plays, we need to process the configuration file and return the right
00120 // port number if it differs from the one provided by the protocol driver. Only
00121 // one port instance of the cmsd protocol is allowed.
00122 //
00123 
00124 extern "C"
00125 {
00126 int XrdgetProtocolPort(const char *pname, char *parms,
00127                        XrdProtocol_Config *pi)
00128 {
00129    static int thePort = -1;
00130    char *cfn = pi->ConfigFN, buff[128];
00131 
00132 // Check if we have been here before
00133 //
00134    if (thePort >= 0)
00135       {if (pi->Port && pi->Port != thePort)
00136           {sprintf(buff, "%d disallowed; only using port %d",pi->Port,thePort);
00137            Say.Emsg("Config", "Alternate port", buff);
00138           }
00139        return thePort;
00140       }
00141 
00142 // Initialize the error message handler and some default values
00143 //
00144    Say.logger(pi->eDest->logger(0));
00145    Config.myName    = strdup(pi->myName);
00146    Config.PortTCP   = (pi->Port < 0 ? 0 : pi->Port);
00147    Config.myInsName = strdup(pi->myInst);
00148    Config.myProg    = strdup(pi->myProg);
00149    Sched            = pi->Sched;
00150    if (pi->DebugON) Trace.What = TRACE_ALL;
00151    memcpy(&Config.myAddr, pi->myAddr, sizeof(struct sockaddr));
00152 
00153 // The only parameter we accept is the name of an alternate config file
00154 //
00155    if (parms) 
00156       {while(*parms == ' ') parms++;
00157        if (*parms) 
00158           {char *pp = parms;
00159            while(*parms != ' ' && *parms) parms++;
00160            cfn = pp;
00161           }
00162       }
00163 
00164 // Put up the banner
00165 //
00166    Say.Say("Copr.  2007 Stanford University/SLAC cmsd.");
00167 
00168 // Indicate failure if static init fails
00169 //
00170    if (cfn) cfn = strdup(cfn);
00171    if (Config.Configure1(pi->argc, pi->argv, cfn))
00172       {Config.doWait = -1; return 0;}
00173 
00174 // Return the port number to be used
00175 //
00176    thePort = Config.PortTCP;
00177    return thePort;
00178 }
00179 }
00180 
00181 /******************************************************************************/
00182 /*                               E x e c u t e                                */
00183 /******************************************************************************/
00184   
00185 int XrdCmsProtocol::Execute(XrdCmsRRData &Arg)
00186 {
00187    EPNAME("Execute");
00188    static kXR_unt32 theDelay = htonl(Config.SUPDelay);
00189    XrdCmsRouter::NodeMethod_t Method;
00190    const char *etxt;
00191 
00192 // Check if we can continue
00193 //
00194    if (CmsState.Suspended && Arg.Routing & XrdCmsRouting::Delayable)
00195       {Reply_Delay(Arg, theDelay); return 0;}
00196 
00197 // Validate request code and execute the request. If successful, forward the
00198 // request to subscribers of this node if the request is forwardable.
00199 //
00200    if (!(Method = Router.getMethod(Arg.Request.rrCode)))
00201       Say.Emsg("Protocol", "invalid request code from", myNode->Ident);
00202       else if ((etxt = (myNode->*Method)(Arg)))
00203               if (*etxt == '!')
00204                  {DEBUGR(etxt+1 <<" delayed " <<Arg.waitVal <<" seconds");
00205                   return -EINPROGRESS;
00206                  } else if (*etxt == '.') return -ECONNABORTED;
00207                            else Reply_Error(Arg, kYR_EINVAL, etxt);
00208               else if (Arg.Routing & XrdCmsRouting::Forward && Cluster.NodeCnt
00209                    &&  !(Arg.Request.modifier & kYR_dnf)) Reissue(Arg);
00210    return 0;
00211 }
00212 
00213 /******************************************************************************/
00214 /*                                 M a t c h                                  */
00215 /******************************************************************************/
00216 
00217 XrdProtocol *XrdCmsProtocol::Match(XrdLink *lp)
00218 {
00219 CmsRRHdr          Hdr;
00220 int               dlen;
00221 
00222 // Peek at the first few bytes of data (shouldb be all zeroes)
00223 //
00224    if ((dlen = lp->Peek((char *)&Hdr,sizeof(Hdr),readWait)) != sizeof(Hdr))
00225       {if (dlen <= 0) lp->setEtext("login not received");
00226        return (XrdProtocol *)0;
00227       }
00228 
00229 // Verify that this is our protocol and whether a version1 client is here
00230 //
00231    if (Hdr.streamid || Hdr.rrCode != kYR_login)
00232       {if (!strncmp((char *)&Hdr, "login ", 6))
00233           lp->setEtext("protocol version 1 unsupported");
00234        return (XrdProtocol *)0;
00235       }
00236 
00237 // Return the protocol object
00238 //
00239    return (XrdProtocol *)XrdCmsProtocol::Alloc();
00240 }
00241 
00242 /******************************************************************************/
00243 /*                                P a n d e r                                 */
00244 /******************************************************************************/
00245   
00246 // Pander() handles all outgoing connections to a manager/supervisor
00247 
00248 void XrdCmsProtocol::Pander(const char *manager, int mport)
00249 {
00250    EPNAME("Pander");
00251 
00252    CmsLoginData Data, loginData;
00253    unsigned int Mode, Role = 0;
00254    int Lvl=0, Netopts=0, waits=6, tries=6, fails=0, xport=mport;
00255    int rc, fsUtil, KickedOut, myNID = ManTree.Register();
00256    int chk4Suspend = XrdCmsState::All_Suspend, TimeOut = Config.AskPing*1000;
00257    char manbuff[256];
00258    const char *Reason = 0, *manp = manager;
00259    const int manblen = sizeof(manbuff);
00260 
00261 // Do some debugging
00262 //
00263    DEBUG(myRole <<" services to " <<manager <<':' <<mport);
00264 
00265 // Prefill the login data
00266 //
00267    loginData.SID   = (kXR_char *)Config.mySID;
00268    loginData.Paths = (kXR_char *)Config.myPaths;
00269    loginData.sPort = Config.PortTCP;
00270    loginData.fsNum = Meter.numFS();
00271    loginData.tSpace= Meter.TotalSpace(loginData.mSpace);
00272 
00273    loginData.Version = kYR_Version; // These to keep compiler happy
00274    loginData.HoldTime= static_cast<int>(getpid());
00275    loginData.Mode    = 0;
00276    loginData.Size    = 0;
00277 
00278 // Establish request routing based on who we are
00279 //
00280    Routing = (Config.asManager() || Config.asPeer() ? &supVOps : &srvVOps);
00281 
00282 // Compute the Manager's status (this never changes for managers/supervisors)
00283 //
00284    if (Config.asPeer())                Role  = CmsLoginData::kYR_peer;
00285       else if (Config.asManager())     Role  = CmsLoginData::kYR_manager;
00286               else                     Role  = CmsLoginData::kYR_server;
00287    if (Config.asProxy())               Role |= CmsLoginData::kYR_proxy;
00288 
00289 // If we are a simple server, permanently add the nostage option if we are
00290 // not able to stage any files.
00291 //
00292    if (Role == CmsLoginData::kYR_server)
00293       {if (!Config.DiskSS) Role |=  CmsLoginData::kYR_nostage;}
00294       else chk4Suspend = XrdCmsState::FES_Suspend;
00295 
00296 // Keep connecting to our manager. If suspended, wait for a resumption first
00297 //
00298    do {if (Config.doWait && chk4Suspend)
00299           while(CmsState.Suspended & chk4Suspend)
00300                {if (!waits--)
00301                    {Say.Emsg("Pander", "Suspend state still active."); waits=6;}
00302                 XrdSysTimer::Snooze(12);
00303                }
00304 
00305        if (!ManTree.Trying(myNID, Lvl) && Lvl)
00306           {DEBUG("restarting at root node " <<manager <<':' <<mport);
00307            manp = manager; xport = mport; Lvl = 0;
00308           }
00309 
00310        DEBUG("trying to connect to lvl " <<Lvl <<' ' <<manp <<':' <<xport);
00311 
00312        if (!(Link = Config.NetTCP->Connect(manp, xport, Netopts)))
00313           {if (tries--) Netopts = XRDNET_NOEMSG;
00314               else {tries = 6; Netopts = 0;}
00315            if ((Lvl = myMans.Next(xport,manbuff,manblen)))
00316                    {XrdSysTimer::Snooze(3); manp = manbuff;}
00317               else {if (manp != manager) fails++;
00318                     XrdSysTimer::Snooze(6); manp = manager; xport = mport;
00319                    }
00320            continue;
00321           }
00322        Netopts = 0; tries = waits = 6;
00323 
00324        // Obtain a new node object for this connection
00325        //
00326        if (!(myNode = Manager.Add(Link, Lvl+1)))
00327           {Say.Emsg("Pander", "Unable to obtain node object.");
00328            Link->Close(); XrdSysTimer::Snooze(15); continue;
00329           }
00330 
00331       // Compute current login mode
00332       //
00333       Mode = Role
00334            | (CmsState.Suspended ? CmsLoginData::kYR_suspend : 0)
00335            | (CmsState.NoStaging ? CmsLoginData::kYR_nostage : 0);
00336        if (fails >= 6 && manp == manager) 
00337           {fails = 0; Mode |=    CmsLoginData::kYR_trying;}
00338 
00339        // Login this node with the correct state
00340        //
00341        loginData.fSpace= Meter.FreeSpace(fsUtil);
00342        loginData.fsUtil= static_cast<kXR_unt16>(fsUtil);
00343        KickedOut = 0; loginData.dPort = CmsState.Port();
00344        Data = loginData; Data.Mode = Mode;
00345        if (!(rc = XrdCmsLogin::Login(Link, Data, TimeOut)))
00346           {if(!ManTree.Connect(myNID, myNode)) KickedOut = 1;
00347              else {Say.Emsg("Protocol", "Logged into", Link->Name());
00348                    Reason = Dispatch(isUp, TimeOut, 2);
00349                    rc = 0;
00350                    loginData.fSpace= Meter.FreeSpace(fsUtil);
00351                    loginData.fsUtil= static_cast<kXR_unt16>(fsUtil);
00352                   }
00353           }
00354 
00355        // Remove manager from the config
00356        //
00357        Manager.Remove(myNode, (rc == kYR_redirect ? "redirected"
00358                                   : (Reason ? Reason : "lost connection")));
00359        ManTree.Disc(myNID);
00360        Link->Close();
00361        delete myNode; myNode = 0;
00362 
00363        // Check if we should process the redirection
00364        //
00365        if (rc == kYR_redirect)
00366           {struct sockaddr netaddr;
00367            XrdOucTokenizer hList((char *)Data.Paths);
00368            unsigned int ipaddr;
00369            char *hP;
00370            Link->Name(&netaddr);
00371            ipaddr = XrdNetDNS::IPAddr(&netaddr);
00372            myMans.Del(ipaddr);
00373            while((hP = hList.GetToken()))
00374                  myMans.Add(ipaddr, hP, Config.PortTCP, Lvl+1);
00375            free(Data.Paths);
00376           }
00377 
00378        // Cycle on to the next manager if we have one or snooze and try over
00379        //
00380        if (!KickedOut && (Lvl = myMans.Next(xport,manbuff,manblen)))
00381           {manp = manbuff; continue;}
00382        XrdSysTimer::Snooze(9); Lvl = 0;
00383        if (manp != manager) fails++;
00384        manp = manager; xport = mport;
00385       } while(1);
00386 }
00387   
00388 /******************************************************************************/
00389 /*                               P r o c e s s                                */
00390 /******************************************************************************/
00391   
00392 // Process is called only when we get a new connection. We only return when
00393 // the connection drops.
00394 //
00395 int XrdCmsProtocol::Process(XrdLink *lp)
00396 {
00397    const char *Reason;
00398    Bearing     myWay;
00399    int         tOut;
00400 
00401 // Now admit the login
00402 //
00403    Link = lp;
00404    if ((Routing=Admit()))
00405       {loggedIn = 1;
00406        if (RSlot) {myWay = isLateral; tOut = -1;}
00407           else    {myWay = isDown;    tOut = Config.AskPing*1000;}
00408        myNode->UnLock();
00409        if ((Reason = Dispatch(myWay, tOut, 2))) lp->setEtext(Reason);
00410        myNode->Lock();
00411       }
00412 
00413 // Serialize all activity on the link before we proceed
00414 //
00415    lp->Serialize();
00416 
00417 // Immediately terminate redirectors (they have an Rslot).
00418 //
00419    if (RSlot)
00420       {RTable.Del(myNode); RSlot  = 0;
00421        myNode->UnLock(); delete myNode; myNode = 0;
00422        return -1;
00423       }
00424 
00425 // We have a node that may or may not be in the cluster at this point, or may
00426 // need to remain in the cluster as a shadow member. In any case, the node
00427 // object lock will be released either by Remove() or the destructor.
00428 //
00429    if (myNode)
00430       {myNode->isConn = 0;
00431        if (myNode->isBound) Cluster.Remove(0, myNode, !loggedIn);
00432           else if (myNode->isGone) delete myNode;
00433               else myNode->UnLock();
00434       }
00435 
00436 // All done indicate the connection is dead
00437 //
00438    return -1;
00439 }
00440 
00441 /******************************************************************************/
00442 /*                               R e c y c l e                                */
00443 /******************************************************************************/
00444 
00445 void XrdCmsProtocol::Recycle(XrdLink *lp, int consec, const char *reason)
00446 {
00447    if (loggedIn) 
00448       if (reason) Say.Emsg("Protocol", lp->ID, "logged out;",   reason);
00449          else     Say.Emsg("Protocol", lp->ID, "logged out.");
00450       else     
00451       if (reason) Say.Emsg("Protocol", lp->ID, "login failed;", reason);
00452 
00453    ProtMutex.Lock();
00454    ProtLink  = ProtStack;
00455    ProtStack = this;
00456    ProtMutex.UnLock();
00457 }
00458   
00459 /******************************************************************************/
00460 /*                       P r i v a t e   M e t h o d s                        */
00461 /******************************************************************************/
00462 /******************************************************************************/
00463 /*                                 A d m i t                                  */
00464 /******************************************************************************/
00465   
00466 XrdCmsRouting *XrdCmsProtocol::Admit()
00467 {
00468    EPNAME("Admit");
00469    char         myBuff[1024];
00470    XrdCmsLogin  Source(myBuff, sizeof(myBuff));
00471    CmsLoginData Data;
00472    const char  *Reason;
00473    SMask_t      newmask, servset(0);
00474    int addedp = 0, Status = 0, isMan = 0, isPeer = 0, isProxy = 0, isServ = 0;
00475    int wasSuspended = 0;
00476 
00477 // Establish outgoing mode
00478 //
00479    Data.Mode = 0;
00480    if (Trace.What & TRACE_Debug) Data.Mode |= CmsLoginData::kYR_debug;
00481    if (CmsState.Suspended)      {Data.Mode |= CmsLoginData::kYR_suspend;
00482                                  wasSuspended = 1;
00483                                 }
00484    Data.HoldTime = Config.LUPHold;
00485 
00486 // Do the login and get the data
00487 //
00488    if (!Source.Admit(Link, Data)) return 0;
00489 
00490 // Handle Redirectors here (minimal stuff to do)
00491 //
00492    if (Data.Mode & CmsLoginData::kYR_director) 
00493       {Link->setID("redirector", Data.HoldTime);
00494        return Admit_Redirector(wasSuspended);
00495       }
00496 
00497 // Disallow subscriptions we are are configured as a solo manager
00498 //
00499    if (Config.asSolo())
00500       return Login_Failed("configuration disallows subscribers");
00501 
00502 // Determine the role of this incomming login.
00503 //
00504         if ((isMan = Data.Mode & CmsLoginData::kYR_manager))
00505            {Status = CMS_isMan;
00506             if ((isPeer =  Data.Mode & CmsLoginData::kYR_peer))
00507                {Status |= CMS_isPeer;  myRole = "manager";}
00508                else                    myRole = "supervisor";
00509            }
00510    else if ((isServ =  Data.Mode & CmsLoginData::kYR_server))
00511             if ((isProxy=  Data.Mode & CmsLoginData::kYR_proxy))
00512                {Status = CMS_isProxy; myRole = "proxy_srvr";}
00513                else                   myRole = "server";
00514    else if ((isPeer =  Data.Mode & CmsLoginData::kYR_peer))
00515            {Status |= CMS_isPeer;
00516             myRole = (CmsLoginData::kYR_proxy ? "peer" : "peer_proxy");
00517            }
00518    else    return Login_Failed("invalid login role");
00519 
00520 // Set the link identification
00521 //
00522    Link->setID(myRole, Data.HoldTime);
00523 
00524 // Make sure that our role is compatible with the incomming role
00525 //
00526    Reason = 0;
00527    if (Config.asServer())       // We are a supervisor
00528       {if (Config.asProxy() && (!isProxy || isPeer))
00529           Reason = "configuration only allows proxies";
00530           else if (!isServ)
00531           Reason = "configuration disallows peers and proxies";
00532       } else {                  // We are a manager
00533        if (Config.asProxy() &&   isServ)
00534           Reason = "configuration only allows peers or proxies";
00535           else if (isProxy)
00536           Reason = "configuration disallows proxies";
00537       }
00538    if (Reason) return Login_Failed(Reason);
00539 
00540 // The server may specify nostage and suspend
00541 //
00542    if (Data.Mode & CmsLoginData::kYR_nostage) Status |= CMS_noStage;
00543    if (Data.Mode & CmsLoginData::kYR_suspend) Status |= CMS_Suspend;
00544 
00545 // The server may specify that it has been trying for a long time
00546 //
00547    if (Data.Mode & CmsLoginData::kYR_trying)
00548       Say.Emsg("Protocol",Link->Name(),"has not yet found a cluster slot!");
00549 
00550 // Add the node. The resulting node object will be locked and the caller will
00551 // unlock it prior to dispatching.
00552 //
00553    if (!(myNode = Cluster.Add(Link, Data.dPort, Status, Data.sPort,
00554                               (const char *)Data.SID)))
00555       return (XrdCmsRouting *)0;
00556 
00557 // Record the status of the server's filesystem
00558 //
00559    DEBUG(Link->Name() <<" TSpace=" <<Data.tSpace <<"GB NumFS=" <<Data.fsNum
00560                       <<" FSpace=" <<Data.fSpace <<"MB MinFR=" <<Data.mSpace
00561                       <<"MB Util=" <<Data.fsUtil);
00562    myNode->DiskTotal = Data.tSpace;
00563    myNode->DiskMinF  = Data.mSpace;
00564    myNode->DiskFree  = Data.fSpace;
00565    myNode->DiskNums  = Data.fsNum;
00566    myNode->DiskUtil  = Data.fsUtil;
00567    Meter.setVirtUpdt();
00568 
00569 // Check for any configuration changes and then process all of the paths.
00570 //
00571    if (Data.Paths && *Data.Paths)
00572       {XrdOucTokenizer thePaths((char *)Data.Paths);
00573        char *tp, *pp;
00574        ConfigCheck(Data.Paths);
00575        while((tp = thePaths.GetLine()))
00576             {DEBUG(Link->Name() <<" adding path: " <<tp);
00577              if (!(tp = thePaths.GetToken())
00578              ||  !(pp = thePaths.GetToken())) break;
00579              if (!(newmask = AddPath(myNode, tp, pp)))
00580                 return Login_Failed("invalid exported path");
00581              servset |= newmask;
00582              addedp= 1;
00583             }
00584       }
00585 
00586 // Check if we have any special paths. If none, then add the default path.
00587 //
00588    if (!addedp) 
00589       {XrdCmsPInfo pinfo;
00590        ConfigCheck(0);
00591        pinfo.rovec = myNode->Mask();
00592        if (myNode->isPeer) pinfo.ssvec = myNode->Mask();
00593        servset = Cache.Paths.Insert("/", &pinfo);
00594        Say.Emsg("Protocol", myNode->Ident, "defaulted r /");
00595       }
00596 
00597 // Set the reference counts for intersecting nodes to be the same.
00598 // Additionally, indicate cache refresh will be needed because we have a new
00599 // node that may have files the we already reported on.
00600 //
00601    Cluster.ResetRef(servset);
00602    if (Config.asManager()) {Manager.Reset(); myNode->SyncSpace();}
00603    myNode->isDisable = 0;
00604 
00605 // Document the login
00606 //
00607    Say.Emsg("Protocol", myNode->Ident,
00608             (myNode->isSuspend ? "logged in suspended." : "logged in."));
00609 
00610 // All done
00611 //
00612    return &rspVOps;
00613 }
00614   
00615 /******************************************************************************/
00616 /*                      A d m i t _ R e d i r e c t o r                       */
00617 /******************************************************************************/
00618   
00619 XrdCmsRouting *XrdCmsProtocol::Admit_Redirector(int wasSuspended)
00620 {
00621    EPNAME("Admit_Redirector");
00622    static CmsStatusRequest newState 
00623                    = {{0, kYR_status, CmsStatusRequest::kYR_Resume, 0}};
00624 
00625 // Indicate what role I have
00626 //
00627    myRole = "redirector";
00628 
00629 // Director logins have no additional parameters. We return with the node object
00630 // locked to be consistent with the way server/suprvisors nodes are returned.
00631 //
00632    myNode = new XrdCmsNode(Link); myNode->Lock();
00633    if (!(RSlot = RTable.Add(myNode)))
00634       {Say.Emsg("Protocol",myNode->Ident,"login failed; too many redirectors.");
00635        return 0;
00636       } else myNode->setSlot(RSlot);
00637 
00638 // If we told the redirector we were suspended then we must check if that is no
00639 // longer true and generate a reume event as the redirector may have missed it
00640 //
00641    if (wasSuspended && !CmsState.Suspended)
00642       myNode->Send((char *)&newState, sizeof(newState));
00643 
00644 // Login succeeded
00645 //
00646    Say.Emsg("Protocol", myNode->Ident, "logged in.");
00647    DEBUG(myNode->Ident <<" assigned slot " <<RSlot);
00648    return &rdrVOps;
00649 }
00650 
00651 /******************************************************************************/
00652 /*                               A d d P a t h                                */
00653 /******************************************************************************/
00654   
00655 SMask_t XrdCmsProtocol::AddPath(XrdCmsNode *nP,
00656                                 const char *pType, const char *Path)
00657 {
00658     XrdCmsPInfo pinfo;
00659 
00660 // Process: addpath {r | w | rw}[s] path
00661 //
00662    while(*pType)
00663         {     if ('r' == *pType) pinfo.rovec =               nP->Mask();
00664          else if ('w' == *pType) pinfo.rovec = pinfo.rwvec = nP->Mask();
00665          else if ('s' == *pType) pinfo.rovec = pinfo.ssvec = nP->Mask();
00666          else return 0;
00667          pType++;
00668         }
00669 
00670 // Set node options
00671 //
00672    nP->isRW = (pinfo.rwvec ? XrdCmsNode::allowsRW : 0) 
00673             | (pinfo.ssvec ? XrdCmsNode::allowsSS : 0);
00674 
00675 // Add the path to the known path list
00676 //
00677    return Cache.Paths.Insert(Path, &pinfo);
00678 }
00679 
00680 /******************************************************************************/
00681 /*                                 A l l o c                                  */
00682 /******************************************************************************/
00683 
00684 XrdCmsProtocol *XrdCmsProtocol::Alloc(const char *theRole, 
00685                                       const char *theMan,
00686                                             int   thePort)
00687 {
00688    XrdCmsProtocol *xp;
00689 
00690 // Grab a protocol object and, if none, return a new one
00691 //
00692    ProtMutex.Lock();
00693    if ((xp = ProtStack)) ProtStack = xp->ProtLink;
00694       else xp = new XrdCmsProtocol();
00695    ProtMutex.UnLock();
00696 
00697 // Initialize the object if we actually got one
00698 //
00699    if (!xp) Say.Emsg("Protocol","No more protocol objects.");
00700       else {xp->myRole    = theRole;
00701             xp->myMan     = theMan;
00702             xp->myManPort = thePort;
00703             xp->loggedIn  = 0;
00704            }
00705 
00706 // All done
00707 //
00708    return xp;
00709 }
00710 
00711 /******************************************************************************/
00712 /*                           C o n f i g C h e c k                            */
00713 /******************************************************************************/
00714   
00715 void XrdCmsProtocol::ConfigCheck(unsigned char *theConfig)
00716 {
00717   unsigned int ConfigID;
00718   int tmp;
00719 
00720 // Compute the new configuration ID
00721 //
00722    if (!theConfig) ConfigID = 1;
00723       else ConfigID = XrdOucCRC::CRC32(theConfig, strlen((char *)theConfig));
00724 
00725 // If the configuration chaged or a new node, then we must bounce this node
00726 //
00727    if (ConfigID != myNode->ConfigID)
00728       {if (myNode->ConfigID) Say.Emsg("Protocol",Link->Name(),"reconfigured.");
00729        Cache.Paths.Remove(myNode->Mask());
00730        Cache.Bounce(myNode->Mask(), myNode->ID(tmp));
00731        myNode->ConfigID = ConfigID;
00732       }
00733 }
00734 
00735 /******************************************************************************/
00736 /*                              D i s p a t c h                               */
00737 /******************************************************************************/
00738 
00739 // Dispatch is provided with three key pieces of information:
00740 // 1) The connection bearing (isUp, isDown, isLateral) that determines how
00741 //    timeouts are to be handled.
00742 // 2) The maximum amount to wait for data to arrive.
00743 // 3) The number of successive timeouts we can have before we give up.
00744   
00745 const char *XrdCmsProtocol::Dispatch(Bearing cDir, int maxWait, int maxTries)
00746 {
00747    EPNAME("Dispatch");
00748    static const int ReqSize = sizeof(CmsRRHdr);
00749    static CmsPingRequest Ping = {{0, kYR_ping,  0, 0}};
00750    XrdCmsRRData *Data = XrdCmsRRData::Objectify();
00751    XrdCmsJob  *jp;
00752    const char *toRC = (cDir == isUp ? "manager not active"
00753                                     : "server not responding");
00754    const char *myArgs, *myArgt;
00755    char        buff[8];
00756    int         rc, toLeft = maxTries;
00757 
00758 // Dispatch runs with the current thread bound to the link.
00759 //
00760    Link->Bind(XrdSysThread::ID());
00761 
00762 // Read in the request header
00763 //
00764 do{if ((rc = Link->RecvAll((char *)&Data->Request, ReqSize, maxWait)) < 0)
00765       {if (rc != -ETIMEDOUT) return "request read failed";
00766        if (!toLeft--) return toRC;
00767        if (cDir == isDown && Link->Send((char *)&Ping, sizeof(Ping)) < 0)
00768           return "server unreachable";
00769        continue;
00770       }
00771 
00772 // Decode the length and get the rest of the data
00773 //
00774    toLeft = maxTries;
00775    Data->Dlen = static_cast<int>(ntohs(Data->Request.datalen));
00776    if ((QTRACE(Debug))
00777    && Data->Request.rrCode != kYR_ping && Data->Request.rrCode != kYR_pong)
00778       DEBUG(myNode->Ident <<" for " <<Router.getName(Data->Request.rrCode)
00779                           <<" dlen=" <<Data->Dlen);
00780    if (!(Data->Dlen)) {myArgs = myArgt = 0;}
00781       else {if (Data->Dlen > maxReqSize)
00782                {Say.Emsg("Protocol","Request args too long from",Link->Name());
00783                 return "protocol error";
00784                }
00785             if ((!Data->Buff || Data->Blen < Data->Dlen)
00786             &&  !Data->getBuff(Data->Dlen))
00787                {Say.Emsg("Protocol", "No buffers to serve", Link->Name());
00788                 return "insufficient buffers";
00789                }
00790             if ((rc = Link->RecvAll(Data->Buff, Data->Dlen, maxWait)) < 0)
00791                return (rc == -ETIMEDOUT ? "read timed out" : "read failed");
00792             myArgs = Data->Buff; myArgt = Data->Buff + Data->Dlen;
00793            }
00794 
00795 // Check if request is actually valid
00796 //
00797    if (!(Data->Routing = Routing->getRoute(int(Data->Request.rrCode))))
00798       {sprintf(buff, "%d", Data->Request.rrCode);
00799        Say.Emsg("Protocol",Link->Name(),"sent an invalid request -", buff);
00800        continue;
00801       }
00802 
00803 // Parse the arguments (we do this in the main thread to avoid overruns)
00804 //
00805    if (!(Data->Routing & XrdCmsRouting::noArgs))
00806       {if (Data->Request.modifier & kYR_raw)
00807           {Data->Path = Data->Buff; Data->PathLen = Data->Dlen;}
00808           else if (!myArgs
00809                || !ProtArgs.Parse(int(Data->Request.rrCode),myArgs,myArgt,Data))
00810                   {Reply_Error(*Data, kYR_EINVAL, "badly formed request");
00811                    continue;
00812                   }
00813       }
00814 
00815 // Insert correct identification
00816 //
00817    if (!(Data->Ident) || !(*Data->Ident)) Data->Ident = myNode->Ident;
00818 
00819 // Schedule this request if async. Otherwise, do this inline. Note that
00820 // synchrnous requests are allowed to return status changes (e.g., redirect)
00821 //
00822    if (Data->Routing & XrdCmsRouting::isSync)
00823       {if ((rc = Execute(*Data)) && rc == -ECONNABORTED) return "redirected";}
00824       else if ((jp = XrdCmsJob::Alloc(this, Data)))
00825               {Sched->Schedule((XrdJob *)jp);
00826                Data = XrdCmsRRData::Objectify();
00827               }
00828               else Say.Emsg("Protocol", "No jobs to serve", Link->Name());
00829   } while(1);
00830 
00831 // We should never get here
00832 //
00833    return "logic error";
00834 }
00835 
00836 /******************************************************************************/
00837 /*                                  D o I t                                   */
00838 /******************************************************************************/
00839   
00840 // Determine how we should proceed here
00841 //
00842 void XrdCmsProtocol::DoIt()
00843 {
00844 
00845 // If we have a role, then we should simply pander it
00846 //
00847    if (myRole) Pander(myMan, myManPort);
00848 }
00849 
00850 /******************************************************************************/
00851 /*                          L o g i n _ F a i l e d                           */
00852 /******************************************************************************/
00853   
00854 XrdCmsRouting *XrdCmsProtocol::Login_Failed(const char *reason)
00855 {
00856    Link->setEtext(reason);
00857    return (XrdCmsRouting *)0;
00858 }
00859 
00860 /******************************************************************************/
00861 /*                               R e i s s u e                                */
00862 /******************************************************************************/
00863 
00864 void XrdCmsProtocol::Reissue(XrdCmsRRData &Data)
00865 {
00866    EPNAME("Resisue");
00867    XrdCmsPInfo pinfo;
00868    SMask_t amask;
00869    struct iovec ioB[2] = {{(char *)&Data.Request, sizeof(Data.Request)},
00870                           {         Data.Buff,    Data.Dlen}
00871                          };
00872 
00873 // Check if we can really reissue the command
00874 //
00875    if (!((Data.Request.modifier += kYR_hopincr) & kYR_hopcount))
00876       {Say.Emsg("Job", Router.getName(Data.Request.rrCode),
00877                        "msg TTL exceeded for", Data.Path);
00878        return;
00879       }
00880 
00881 // We do not support 2way re-issued messages
00882 //
00883    Data.Request.streamid = 0;
00884   
00885 // Find all the nodes that might be able to do somthing on this path
00886 //
00887    if (!Cache.Paths.Find(Data.Path, pinfo)
00888    || (amask = pinfo.rwvec | pinfo.rovec) == 0)
00889       {Say.Emsg("Job", Router.getName(Data.Request.rrCode),
00890                        "aborted; no servers handling", Data.Path);
00891        return;
00892       }
00893 
00894 // Do some debugging
00895 //
00896    DEBUG("FWD " <<Router.getName(Data.Request.rrCode) <<' ' <<Data.Path);
00897 
00898 // Now send off the message to all the nodes
00899 //
00900    Cluster.Broadcast(amask, ioB, 2, sizeof(Data.Request)+Data.Dlen);
00901 }
00902   
00903 /******************************************************************************/
00904 /*                           R e p l y _ D e l a y                            */
00905 /******************************************************************************/
00906   
00907 void XrdCmsProtocol::Reply_Delay(XrdCmsRRData &Data, kXR_unt32 theDelay)
00908 {
00909      EPNAME("Reply_Delay");
00910      const char *act;
00911 
00912      if (Data.Request.streamid && (Data.Routing & XrdCmsRouting::Repliable))
00913         {CmsResponse Resp = {{Data.Request.streamid, kYR_wait, 0,
00914                                htons(sizeof(kXR_unt32))}, theDelay};
00915          act = " sent";
00916          Link->Send((char *)&Resp, sizeof(Resp));
00917         } else act = " skip";
00918 
00919      DEBUG(myNode->Ident <<act <<" delay " <<ntohl(theDelay));
00920 }
00921 
00922 /******************************************************************************/
00923 /*                           R e p l y _ E r r o r                            */
00924 /******************************************************************************/
00925   
00926 void XrdCmsProtocol::Reply_Error(XrdCmsRRData &Data, int ecode, const char *etext)
00927 {
00928      EPNAME("Reply_Error");
00929      const char *act;
00930      int n = strlen(etext)+1;
00931 
00932      if (Data.Request.streamid && (Data.Routing & XrdCmsRouting::Repliable))
00933         {CmsResponse Resp = {{Data.Request.streamid, kYR_error, 0,
00934                               htons(sizeof(kXR_unt32)+n)},
00935                              htonl(static_cast<unsigned int>(ecode))};
00936          struct iovec ioV[2] = {{(char *)&Resp, sizeof(Resp)},
00937                                 {(char *)etext, n}};
00938          act = " sent";
00939          Link->Send(ioV, 2);
00940         } else act = " skip";
00941 
00942      DEBUG(myNode->Ident <<act <<" err " <<ecode  <<' ' <<etext);
00943 }

Generated on Tue Jul 5 14:46:31 2011 for ROOT_528-00b_version by  doxygen 1.5.1