XrdCmsNode.cc

Go to the documentation of this file.
00001 /***********************************************************************************************/
00002 /*                                                                            */
00003 /*                         X r d C m s N o d e . c c                          */
00004 /*                                                                            */
00005 /* (c) 2007 by the Board of Trustees of the Leland Stanford, Jr., University  */
00006 /*                            All Rights Reserved                             */
00007 /*   Produced by Andrew Hanushevsky for Stanford University under contract    */
00008 /*              DE-AC02-76-SFO0515 with the Department of Energy              */
00009 /******************************************************************************/
00010 
00011 //         $Id: XrdCmsNode.cc 38011 2011-02-08 18:35:57Z ganis $
00012 
00013 // Original Version: 1.52 2007/08/28 01:32:15 abh
00014 
00015 const char *XrdCmsNodeCVSID = "$Id: XrdCmsNode.cc 38011 2011-02-08 18:35:57Z ganis $";
00016   
00017 #include <errno.h>
00018 #include <limits.h>
00019 #include <stdio.h>
00020 #include <time.h>
00021 #include <netinet/in.h>
00022 #include <sys/types.h>
00023 #include <sys/stat.h>
00024 
00025 #include "Xrd/XrdJob.hh"
00026 #include "Xrd/XrdLink.hh"
00027 
00028 #include "XProtocol/YProtocol.hh"
00029 
00030 #include "XrdCms/XrdCmsCache.hh"
00031 #include "XrdCms/XrdCmsCluster.hh"
00032 #include "XrdCms/XrdCmsConfig.hh"
00033 #include "XrdCms/XrdCmsManager.hh"
00034 #include "XrdCms/XrdCmsManList.hh"
00035 #include "XrdCms/XrdCmsManTree.hh"
00036 #include "XrdCms/XrdCmsMeter.hh"
00037 #include "XrdCms/XrdCmsPList.hh"
00038 #include "XrdCms/XrdCmsPrepare.hh"
00039 #include "XrdCms/XrdCmsRRData.hh"
00040 #include "XrdCms/XrdCmsNode.hh"
00041 #include "XrdCms/XrdCmsSelect.hh"
00042 #include "XrdCms/XrdCmsState.hh"
00043 #include "XrdCms/XrdCmsTrace.hh"
00044 #include "XrdCms/XrdCmsXmi.hh"
00045 
00046 #include "XrdNet/XrdNetDNS.hh"
00047 
00048 #include "XrdOss/XrdOss.hh"
00049 
00050 #include "XrdOuc/XrdOucName2Name.hh"
00051 #include "XrdOuc/XrdOucProg.hh"
00052 #include "XrdOuc/XrdOucPup.hh"
00053 #include "XrdOuc/XrdOucTokenizer.hh"
00054 #include "XrdOuc/XrdOucUtils.hh"
00055 
00056 #include "XrdSys/XrdSysPlatform.hh"
00057 
00058 using namespace XrdCms;
00059 
00060 /******************************************************************************/
00061 /*                        S t a t i c   O b j e c t s                         */
00062 /******************************************************************************/
00063 
00064 XrdSysMutex XrdCmsNode::mlMutex;
00065 
00066 int         XrdCmsNode::LastFree = 0;
00067   
00068 /******************************************************************************/
00069 /*                           C o n s t r u c t o r                            */
00070 /******************************************************************************/
00071   
00072 XrdCmsNode::XrdCmsNode(XrdLink *lnkp, int port,
00073                        const char *nid,  int lvl, int id)
00074 {
00075     static XrdSysMutex   iMutex;
00076     static const SMask_t smask_1(1);
00077     static int           iNum = 1;
00078 
00079     Link     =  lnkp;
00080     IPAddr   =  0;
00081     NodeMask =  (id < 0 ? 0 : smask_1 << id);
00082     NodeID   = id;
00083     isDisable=  0;
00084     isNoStage=  0;
00085     isOffline=  (lnkp == 0);
00086     isSuspend=  0;
00087     isBound  =  0;
00088     isConn   =  0;
00089     isGone   =  0;
00090     isPerm   =  0;
00091     isMan    =  0;
00092     isKnown  =  0;
00093     isPeer   =  0;
00094     isProxy  =  0;
00095     myCost   =  0;
00096     myLoad   =  0;
00097     myMass   =  0;
00098     myCNUM   = -3;
00099     DiskTotal=  0;
00100     DiskFree =  0;
00101     DiskMinF =  0;
00102     DiskNums =  0;
00103     DiskUtil =  0;
00104     Next     =  0;
00105     RefA     =  0;
00106     RefTotA  =  0;
00107     RefR     =  0;
00108     RefTotR  =  0;
00109     logload  =  Config.LogPerf;
00110     DropTime =  0;
00111     DropJob  =  0;
00112     myName   =  0;
00113     myNlen   =  0;
00114     Ident    =  0;
00115     Port     =  0;
00116     myNID    = strdup(nid ? nid : "?");
00117     if ((myCID = index(myNID, ' '))) myCID++;
00118        else myCID = myNID;
00119     myLevel  = lvl;
00120     ConfigID =  0;
00121 
00122 // setName() will set Ident, IPAddr, IPV6, myName, myNlen, & Port!
00123 //
00124    setName(lnkp, (nid ? port : 0));
00125 
00126    iMutex.Lock();
00127    Instance =  iNum++;
00128    iMutex.UnLock();
00129 }
00130 
00131 /******************************************************************************/
00132 /*                            D e s t r u c t o r                             */
00133 /******************************************************************************/
00134   
00135 XrdCmsNode::~XrdCmsNode()
00136 {
00137    isOffline = 1;
00138    if (isLocked) UnLock();
00139 
00140 // Delete other appendages
00141 //
00142    if (Ident) free(Ident);
00143    if (myNID) free(myNID);
00144    if (myName)free(myName);
00145 }
00146 
00147 /******************************************************************************/
00148 /*                               s e t N a m e                                */
00149 /******************************************************************************/
00150   
00151 void XrdCmsNode::setName(XrdLink *lnkp, int port)
00152 {
00153    struct sockaddr netaddr;
00154    char *bp, buff[512];
00155    const char *hname = lnkp->Host();
00156    unsigned int hAddr;
00157 
00158 // Get our address (the long way)
00159 //
00160    lnkp->Name(&netaddr);
00161    hAddr= XrdNetDNS::IPAddr(&netaddr);
00162 
00163 // Check if this is a duplicate
00164 //
00165    if (myName)
00166       {if (!strcmp(myName, hname) && port == Port && hAddr == IPAddr) return;
00167           else free(myName);
00168       }
00169 
00170 // Construct our identification
00171 //
00172    IPAddr = hAddr;
00173    myName = strdup(hname);
00174    myNlen = strlen(hname)+1;
00175    Port = port;
00176 
00177    if (!port) strcpy(buff, lnkp->ID);
00178       else    sprintf(buff, "%s:%d", lnkp->ID, port);
00179    if (Ident) free(Ident);
00180    Ident = strdup(buff);
00181 
00182    strcpy(IPV6, "[::");
00183    bp = IPV6+3;
00184    bp += XrdNetDNS::IP2String(IPAddr, 0, bp, 24); // We're cheating
00185    *bp++ = ']';
00186    if (Port) {*bp++ = ':'; bp += sprintf(bp, "%d", Port);}
00187    IPV6Len = bp - IPV6;
00188 }
00189 
00190 /******************************************************************************/
00191 /*                                  D i s c                                   */
00192 /******************************************************************************/
00193 
00194 void XrdCmsNode::Disc(const char *reason, int needLock)
00195 {
00196 
00197 // Lock the object of not yet locked
00198 //
00199    if (needLock) myMutex.Lock();
00200    isOffline = 1;
00201 
00202 // If we are still connected, initiate a teardown
00203 //
00204    if (isConn)
00205       {Link->setEtext(reason);
00206        Link->Close(1);
00207        isConn = 0;
00208       }
00209 
00210 // Unlock ourselves if we locked ourselves
00211 //
00212    if (needLock) myMutex.UnLock();
00213 }
00214   
00215 /******************************************************************************/
00216 /*                              d o _ A v a i l                               */
00217 /******************************************************************************/
00218   
00219 // Node responses to space usage requests from a manager are localized to the 
00220 // cell and need not be propopagated in any direction.
00221 //
00222 const char *XrdCmsNode::do_Avail(XrdCmsRRData &Arg)
00223 {
00224    EPNAME("do_Avail")
00225 
00226 // Process: avail <fsdsk> <util>
00227 //
00228    DiskFree = Arg.dskFree;
00229    DiskUtil = static_cast<int>(Arg.dskUtil);
00230 
00231 // Do some debugging
00232 //
00233    DEBUGR(DiskFree <<"MB free; " <<DiskUtil <<"% util");
00234    return 0;
00235 }
00236 
00237 /******************************************************************************/
00238 /*                              d o _ C h m o d                               */
00239 /******************************************************************************/
00240   
00241 // Chmod requests are forwarded to all subscribers subject to an Xmi callout.
00242 //
00243 const char *XrdCmsNode::do_Chmod(XrdCmsRRData &Arg)
00244 {
00245    EPNAME("do_Chmod")
00246    mode_t mode = 0;
00247    int rc;
00248 
00249 // Do some debugging
00250 //
00251    DEBUGR("mode " <<Arg.Mode <<' ' <<Arg.Path);
00252 
00253 // If we have an Xmi then call it
00254 //
00255    if (Xmi_Chmod)
00256       {XrdCmsReq Req(this, Arg.Request.streamid);
00257        if (!getMode(Arg.Mode, mode)) return "invalid mode";
00258           else if (Xmi_Chmod->Chmod(&Req, mode, Arg.Path, Arg.Opaque)) return 0;
00259       }
00260 
00261 // We are don here if we have no data; otherwise convert the mode if we
00262 // haven't done so already.
00263 //
00264    if (!Config.DiskOK) return 0;
00265    if (!mode && !getMode(Arg.Mode, mode)) return "invalid mode";
00266 
00267 // Attempt to change the mode either via call-out or the oss plug-in
00268 //
00269    if (Config.ProgCH) rc = fsExec(Config.ProgCH, Arg.Mode, Arg.Path);
00270       else rc = Config.ossFS->Chmod(Arg.Path, mode);
00271 
00272 // Return appropriate result
00273 //
00274    return (rc ? fsFail(Arg.Ident, "chmod", Arg.Path, rc) : 0);
00275 }
00276 
00277 /******************************************************************************/
00278 /*                               d o _ D i s c                                */
00279 /******************************************************************************/
00280 
00281 // When a manager receives a disc response from a node it sends a disc request
00282 // and then closes the connection.
00283 // When a node    receives a disc request it simply closes the connection.
00284 
00285 const char *XrdCmsNode::do_Disc(XrdCmsRRData &Arg)
00286 {
00287 
00288 // Indicate we have received a disconnect
00289 //
00290    Say.Emsg("Node", Link->Name(), "requested a disconnect");
00291 
00292 // If we must send a disc request, do so now
00293 //
00294    if (Config.asManager()) Link->Send((char *)&Arg.Request,sizeof(Arg.Request));
00295 
00296 // Close the link and return an error
00297 //
00298    isOffline = 1;
00299    Link->Close(1);
00300    return ".";   // Signal disconnect
00301 }
00302 
00303 /******************************************************************************/
00304 /*                               d o _ G o n e                                */
00305 /******************************************************************************/
00306 
00307 // When a manager receives a gone request it is propogated if we are subscribed
00308 // and we have not sent a gone request in the immediate past.
00309 //
00310 const char *XrdCmsNode::do_Gone(XrdCmsRRData &Arg)
00311 {
00312    EPNAME("do_Gone")
00313    int newgone;
00314 
00315 // Do some debugging
00316 //
00317    TRACER(Files,Arg.Path);
00318 
00319 // Update path information and delete this from the prep queue if we are a
00320 // staging node. We can also be called via the admin end-point interface
00321 // In this case, we have no cache and simply forward up the request.
00322 //
00323    if (Config.asManager())
00324       {XrdCmsSelect Sel(XrdCmsSelect::Advisory, Arg.Path, Arg.PathLen-1);
00325        newgone = Cache.DelFile(Sel, NodeMask);
00326       } else {
00327        newgone = 1;
00328        if (Config.DiskSS) PrepQ.Gone(Arg.Path);
00329       }
00330 
00331 // If we have no managers and we still have the file or never had it, return
00332 //
00333    if (!Manager.Present() || !newgone) return 0;
00334 
00335 // Back-propogate the gone to all of our managers
00336 //
00337    Manager.Inform(Arg.Request, Arg.Buff, Arg.Dlen);
00338 
00339 // All done
00340 //
00341    return 0;
00342 }
00343 
00344 /******************************************************************************/
00345 /*                               d o _ H a v e                                */
00346 /******************************************************************************/
00347   
00348 // When a manager receives a have request it is propogated if we are subscribed
00349 // and we have not sent a have request in the immediate past.
00350 //
00351 const char *XrdCmsNode::do_Have(XrdCmsRRData &Arg)
00352 {
00353    EPNAME("do_Have")
00354    XrdCmsPInfo  pinfo;
00355    int isnew, Opts;
00356 
00357 // Do some debugging
00358 //
00359    TRACER(Files, (Arg.Request.modifier&CmsHaveRequest::Pending ? "P ":"") 
00360                  <<Arg.Path);
00361 
00362 // Find if we can handle the file in r/w mode and if staging is present
00363 //
00364    Opts = (Cache.Paths.Find(Arg.Path, pinfo) && (pinfo.rwvec & NodeMask)
00365         ? XrdCmsSelect::Write : 0);
00366    if (Arg.Request.modifier & CmsHaveRequest::Pending)
00367       Opts |= XrdCmsSelect::Pending;
00368 
00369 // Update path information
00370 //
00371    if (!Config.asManager()) isnew = 1;
00372       else {XrdCmsSelect Sel(XrdCmsSelect::Advisory|Opts,Arg.Path,Arg.PathLen-1);
00373             Sel.Path.Hash = Arg.Request.streamid;
00374             isnew = Cache.AddFile(Sel, NodeMask);
00375            }
00376 
00377 // Return if we have no managers or we already informed the managers
00378 //
00379    if (!Manager.Present() || !isnew) return 0;
00380 
00381 // Back-propogate the have to all of our managers
00382 //
00383    Manager.Inform(Arg.Request, Arg.Buff, Arg.Dlen);
00384 
00385 // All done
00386 //
00387    return 0;
00388 }
00389   
00390 /******************************************************************************/
00391 /*                               d o _ L o a d                                */
00392 /******************************************************************************/
00393   
00394 // Responses to usage requests are local to the cell and never propagated.
00395 //
00396 const char *XrdCmsNode::do_Load(XrdCmsRRData &Arg)
00397 {
00398    EPNAME("do_Load")
00399    int temp, pcpu, pnet, pxeq, pmem, ppag, pdsk;
00400 
00401 // Process: load <cpu> <io> <load> <mem> <pag> <util> <rsvd> <dskFree>
00402 //               0     1    2      3     4     5      6
00403    pcpu = static_cast<int>(Arg.Opaque[CmsLoadRequest::cpuLoad]);
00404    pnet = static_cast<int>(Arg.Opaque[CmsLoadRequest::netLoad]);
00405    pxeq = static_cast<int>(Arg.Opaque[CmsLoadRequest::xeqLoad]);
00406    pmem = static_cast<int>(Arg.Opaque[CmsLoadRequest::memLoad]);
00407    ppag = static_cast<int>(Arg.Opaque[CmsLoadRequest::pagLoad]);
00408    pdsk = static_cast<int>(Arg.Opaque[CmsLoadRequest::dskLoad]);
00409 
00410 // Compute actual load value
00411 //
00412    myLoad = Meter.calcLoad(pcpu, pnet, pxeq, pmem, ppag);
00413    myMass = Meter.calcLoad(myLoad, pdsk);
00414    DiskFree = Arg.dskFree;
00415    DiskUtil = pdsk;
00416 
00417 // Do some debugging
00418 //
00419    DEBUGR("cpu=" <<pcpu <<" net=" <<pnet <<" xeq=" <<pxeq
00420        <<" mem=" <<pmem <<" pag=" <<ppag <<" dsk=" <<pdsk
00421        <<"% " <<DiskFree <<"MB load=" <<myLoad <<" mass=" <<myMass);
00422 
00423 // If we are also a manager then use this load figure to come up with
00424 // an overall load to report when asked. If we get free space, then we
00425 // must report that now so that we can be selected for allocation.
00426 //
00427    if (Config.asManager())
00428       {Meter.Record(pcpu, pnet, pxeq, pmem, ppag, pdsk);
00429        if (isRW && DiskFree != LastFree)
00430           {mlMutex.Lock();
00431            temp = LastFree; LastFree = DiskFree; Meter.setVirtUpdt();
00432            if (!temp && DiskFree >= Config.DiskMin) do_Space(Arg);
00433            mlMutex.UnLock();
00434           }
00435       }
00436 
00437 // Report new load if need be
00438 //
00439    if (Config.LogPerf && !logload)
00440       {char buff[1024];
00441        snprintf(buff, sizeof(buff)-1,
00442                "load=%d; cpu=%d net=%d inq=%d mem=%d pag=%d dsk=%d utl=%d",
00443                myLoad, pcpu, pnet, pxeq, pmem, ppag, Arg.dskFree, pdsk);
00444        Say.Emsg("Node", Name(), buff);
00445        logload = Config.LogPerf;
00446       } else logload--;
00447 
00448 // Return as if we had gotten a pong
00449 //
00450    return do_Pong(Arg);
00451 }
00452 
00453   
00454 /******************************************************************************/
00455 /*                             d o _ L o c a t e                              */
00456 /******************************************************************************/
00457 
00458 const char *XrdCmsNode::do_Locate(XrdCmsRRData &Arg)
00459 {
00460    EPNAME("do_Locate";)
00461    XrdCmsRRQInfo reqInfo(Instance, RSlot, Arg.Request.streamid);
00462    XrdCmsSelect    Sel(0, Arg.Path, Arg.PathLen-1);
00463    XrdCmsSelected *sP = 0;
00464    struct {kXR_unt32 Val; 
00465            char outbuff[CmsLocateRequest::RILen*STMax];} Resp;
00466    struct iovec ioV[2] = {{(char *)&Arg.Request, sizeof(Arg.Request)},
00467                           {(char *)&Resp,        0}};
00468    const char *Why;
00469    char theopts[8], *toP = theopts;
00470    int rc, bytes;
00471 
00472 // Do a callout to the external manager if we have one
00473 //
00474    if (Xmi_Select)
00475       {XrdCmsReq Req(this, Arg.Request.streamid);
00476        if (Xmi_Select->Select(&Req, XMI_LOCATE, Arg.Path, Arg.Opaque)) return 0;
00477       }
00478 
00479 // Grab the refresh option (the only one we support)
00480 //
00481    if (Arg.Opts & CmsLocateRequest::kYR_refresh) 
00482       {Sel.Opts  = XrdCmsSelect::Refresh; *toP++='s';}
00483    if (Arg.Opts & CmsLocateRequest::kYR_asap)
00484       {Sel.Opts |= XrdCmsSelect::Asap;    *toP++='i'; Sel.InfoP = &reqInfo;}
00485       else                                            Sel.InfoP = 0;
00486 
00487 // Do some debugging
00488 //
00489    *toP = '\0';
00490    DEBUGR(theopts <<' ' <<Arg.Path);
00491 
00492 // Perform location
00493 //
00494    if ((rc = Cluster.Locate(Sel)))
00495       {if (rc > 0)
00496           {Arg.Request.rrCode = kYR_wait;
00497            bytes = sizeof(Resp.Val); Why = "delay ";
00498           } else {
00499            if (rc == -2) return 0;
00500            Arg.Request.rrCode = kYR_error;
00501            rc = kYR_ENOENT; Why = "miss ";
00502            bytes = strlcpy(Resp.outbuff, "No servers have access to the file",
00503                    sizeof(Resp.outbuff)) + sizeof(Resp.Val) + 1;
00504           }
00505       } else {Why = "?"; bytes = 0;}
00506 
00507 // List the servers
00508 //
00509    if (!rc)
00510       {if (!Sel.Vec.hf || !(sP=Cluster.List(Sel.Vec.hf,XrdCmsCluster::LS_IPV6)))
00511           {Arg.Request.rrCode = kYR_error;
00512            rc = kYR_ENOENT; Why = "none ";
00513            bytes = strlcpy(Resp.outbuff, "No servers have the file",
00514                           sizeof(Resp.outbuff)) + sizeof(Resp.Val) + 1;
00515           } else rc = 0;
00516       }
00517 
00518 // Either prepare to send an error or format the result
00519 //
00520    if (rc)
00521       {Resp.Val           = htonl(rc);
00522        DEBUGR(Why <<Arg.Path);
00523       } else {
00524        bytes=do_LocFmt(Resp.outbuff,sP,Sel.Vec.pf,Sel.Vec.wf)+sizeof(Resp.Val)+1;
00525        Resp.Val            = 0;
00526        Arg.Request.rrCode  = kYR_data;
00527       }
00528 
00529 // Send off the response
00530 //
00531    Arg.Request.datalen = htons(bytes);
00532    ioV[1].iov_len      = bytes;
00533    Link->Send(ioV, 2, bytes+sizeof(Arg.Request));
00534    return 0;
00535 }
00536 
00537 /******************************************************************************/
00538 /* Static                      d o _ L o c F m t                              */
00539 /******************************************************************************/
00540   
00541 int XrdCmsNode::do_LocFmt(char *buff, XrdCmsSelected *sP,
00542                           SMask_t pfVec, SMask_t wfVec)
00543 {
00544    static const int Skip = (XrdCmsSelected::Disable | XrdCmsSelected::Offline);
00545    XrdCmsSelected *pP;
00546    char *oP = buff;
00547 
00548 // format out the request as follows:                   
00549 // 01234567810123456789212345678
00550 // xy[::123.123.123.123]:123456
00551 //
00552    while(sP)
00553         {if (!(sP->Status & Skip))
00554             {*oP     = (sP->Status & XrdCmsSelected::isMangr ? 'M' : 'S');
00555              if (sP->Mask & pfVec) *oP = tolower(*oP);
00556              *(oP+1) = (sP->Mask   & wfVec                   ? 'w' : 'r');
00557              strcpy(oP+2, sP->IPV6); oP += sP->IPV6Len + 2;
00558              if (sP->next) *oP++ = ' ';
00559             }
00560          pP = sP; sP = sP->next; delete pP;
00561         }
00562 
00563 // Send of the result
00564 //
00565    *oP = '\0';
00566    return (oP - buff);
00567 }
00568 
00569 /******************************************************************************/
00570 /*                              d o _ M k d i r                               */
00571 /******************************************************************************/
00572   
00573 // Mkdir requests are forwarded to all subscribers subject to an Xmi callout.
00574 //
00575 const char *XrdCmsNode::do_Mkdir(XrdCmsRRData &Arg)
00576 {
00577    EPNAME("do_Mkdir")
00578    mode_t mode = 0;
00579    int rc;
00580 
00581 // Do some debugging
00582 //
00583    DEBUGR("mode " <<Arg.Mode <<' ' <<Arg.Path);
00584 
00585 // If we have an Xmi then call it
00586 //
00587    if (Xmi_Mkdir)
00588       {XrdCmsReq Req(this, Arg.Request.streamid);
00589        if (!getMode(Arg.Mode, mode)) return "invalid mode";
00590           else if (Xmi_Mkdir->Mkdir(&Req, mode, Arg.Path, Arg.Opaque)) return 0;
00591       }
00592 
00593 // We are don here if we have no data; otherwise convert the mode if we
00594 // haven't done so already.
00595 //
00596    if (!Config.DiskOK) return 0;
00597    if (!mode && !getMode(Arg.Mode, mode)) return "invalid mode";
00598 
00599 // Attempt to create the directory either via call-out of oss plug-in
00600 //
00601    if (Config.ProgMD) rc = fsExec(Config.ProgMD, Arg.Mode, Arg.Path);
00602       else rc = Config.ossFS->Mkdir(Arg.Path, mode);
00603 
00604 // Return appropriate result
00605 //
00606    return (rc ? fsFail(Arg.Ident, "mkdir", Arg.Path, rc) : 0);
00607 }
00608 
00609 /******************************************************************************/
00610 /*                             d o _ M k p a t h                              */
00611 /******************************************************************************/
00612   
00613 // Mkpath requests are forwarded to all subscribers subjectto an Xmi callout.
00614 //
00615 const char *XrdCmsNode::do_Mkpath(XrdCmsRRData &Arg)
00616 {
00617    EPNAME("do_Mkpath")
00618    mode_t mode = 0;
00619    int rc;
00620 
00621 // Do some debugging
00622 //
00623    DEBUGR("mode " <<Arg.Mode <<' ' <<Arg.Path);
00624 
00625 // If we have an Xmi then call it
00626 //
00627    if (Xmi_Mkpath)
00628       {XrdCmsReq Req(this, Arg.Request.streamid);
00629        if (!getMode(Arg.Mode, mode)) return "invalid mode";
00630           else if (Xmi_Mkpath->Mkpath(&Req,mode,Arg.Path,Arg.Opaque)) return 0;
00631       }
00632 
00633 // We are don here if we have no data; otherwise convert the mode if we
00634 // haven't done so already.
00635 //
00636    if (!Config.DiskOK) return 0;
00637    if (!mode && !getMode(Arg.Mode, mode)) return "invalid mode";
00638 
00639 // Attempt to create the directory path via call-out or oss plugin
00640 //
00641    if (Config.ProgMP) rc = fsExec(Config.ProgMP, Arg.Mode, Arg.Path);
00642       else rc = Config.ossFS->Mkdir(Arg.Path, mode, 1);
00643 
00644 // Return appropriate result
00645 //
00646    return (rc ? fsFail(Arg.Ident, "mkpath", Arg.Path, rc) : 0);
00647 }
00648 
00649 /******************************************************************************/
00650 /*                                 d o _ M v                                  */
00651 /******************************************************************************/
00652   
00653 // Mv requests are forwarded to all subscribers subject to an Xmi callout.
00654 //
00655 const char *XrdCmsNode::do_Mv(XrdCmsRRData &Arg)
00656 {
00657    EPNAME("do_Mv")
00658    static const SMask_t allNodes(~0);
00659    int rc;
00660 
00661 // Do some debugging
00662 //
00663    DEBUGR(Arg.Path <<" to " <<Arg.Path2);
00664 
00665 // If we have an Xmi then call it
00666 //
00667    if (Xmi_Rename)
00668       {XrdCmsReq Req(this, Arg.Request.streamid, Arg.Request.modifier & kYR_dnf);
00669        if (Xmi_Rename->Rename(&Req,Arg.Path,Arg.Opaque,Arg.Path2,Arg.Opaque2))
00670           return 0;
00671       }
00672 
00673 // If we are not a server, if must remove references to the old and new names
00674 // from our cache. This is independent of how the raname is handled. We need
00675 // not back percolate the mv since it was hanled top down in the first place.
00676 // Note that we will scuttle the mv if the target file exists somewhere.
00677 //
00678    if (!Config.DiskOK)
00679       {XrdCmsSelect Sel1(XrdCmsSelect::Defer, Arg.Path, strlen(Arg.Path ));
00680        XrdCmsSelect Sel2(XrdCmsSelect::Defer, Arg.Path2,strlen(Arg.Path2));
00681 
00682        // Setup select data (note that mv does not allow fast redirect)
00683        //
00684        Sel2.iovP = 0; Sel2.iovN = 0;
00685        Sel2.InfoP = 0;  // No fast redirects
00686        Sel2.nmask = SMask_t(0);
00687 
00688        // Perform selection
00689        //
00690        if ((rc = Cluster.Select(Sel2)))
00691           {if (rc > 0) {Arg.waitVal = rc; return "!mv";}
00692               else if (Sel2.Vec.hf)
00693                      {Say.Emsg("do_Mv",Arg.Path2,"exists; mv failed for",Arg.Path);
00694                       return "target file exists";
00695                      }
00696           }
00697        Cache.DelFile(Sel2, allNodes);
00698        Cache.DelFile(Sel1, allNodes);
00699        return 0;
00700       }
00701   
00702 // Rename the file via call-out or oss plug-in (we used to do this via a requeue
00703 // to the local xrootd but it's no longer necessary).
00704 //
00705    if (Config.ProgMV) rc = fsExec(Config.ProgMV, Arg.Path, Arg.Path2);
00706       else rc = Config.ossFS->Rename(Arg.Path, Arg.Path2);
00707 
00708 // Return appropriate result
00709 //
00710    return (rc ? fsFail(Arg.Ident, "mv", Arg.Path, rc) : 0);
00711 }
00712 
00713 /******************************************************************************/
00714 /*                               d o _ P i n g                                */
00715 /******************************************************************************/
00716   
00717 // Ping requests from a manager are local to the cell and never propagated.
00718 //
00719 const char *XrdCmsNode::do_Ping(XrdCmsRRData &Arg)
00720 {
00721   static CmsPongRequest pongIt = {{0, kYR_pong, 0, 0}};
00722 
00723 // Process: ping
00724 // Respond: pong
00725 //
00726    Link->Send((char *)&pongIt, sizeof(pongIt));
00727    return 0;
00728 }
00729   
00730 /******************************************************************************/
00731 /*                               d o _ P o n g                                */
00732 /******************************************************************************/
00733   
00734 // Responses to a ping are local to the cell and never propagated.
00735 //
00736 const char *XrdCmsNode::do_Pong(XrdCmsRRData &Arg)
00737 {
00738 // Process: pong
00739 // Reponds: n/a
00740 
00741    return 0;
00742 }
00743   
00744 /******************************************************************************/
00745 /*                            d o _ P r e p A d d                             */
00746 /******************************************************************************/
00747   
00748 const char *XrdCmsNode::do_PrepAdd(XrdCmsRRData &Arg)
00749 {
00750    EPNAME("do_PrepAdd")
00751 
00752 // Do some debugging
00753 //
00754    DEBUGR("parms: " <<Arg.Reqid <<' ' <<Arg.Notify <<' ' <<Arg.Prty <<' '
00755                     <<Arg.Mode  <<' ' <<Arg.Path);
00756 
00757 // Do an Xmi callout if need be
00758 //
00759    if (Xmi_Prep
00760    &&  Xmi_Prep->Prep(Arg.Reqid,
00761                       Arg.Opts & CmsPrepAddRequest::kYR_write ? XMI_RW : 0,
00762                       Arg.Path, Arg.Opaque))
00763        return 0;
00764 
00765 // Queue this request for async processing
00766 //
00767    (new XrdCmsPrepArgs(Arg))->Queue();
00768    return 0;
00769 }
00770   
00771 /******************************************************************************/
00772 /*                            d o _ P r e p D e l                             */
00773 /******************************************************************************/
00774   
00775 const char *XrdCmsNode::do_PrepDel(XrdCmsRRData &Arg)
00776 {
00777    EPNAME("do_PrepDel")
00778 
00779 // Do some debugging
00780 //
00781    DEBUGR("reqid " <<Arg.Reqid);
00782 
00783 // Do a callout to the external manager if we have one
00784 //
00785    if (Xmi_Prep && Xmi_Prep->Prep(Arg.Reqid, XMI_CANCEL, "", 0)) return 0;
00786 
00787 // Cancel the request if applicable.
00788 //
00789    if (Config.DiskOK)
00790       {if (!Config.DiskSS) {DEBUGR("ignoring cancel prepare " <<Arg.Reqid);}
00791           else {DEBUGR("canceling prepare " <<Arg.Reqid);
00792                 PrepQ.Del(Arg.Reqid);
00793                }
00794       }
00795   return 0;
00796 }
00797   
00798 /******************************************************************************/
00799 /*                                 d o _ R m                                  */
00800 /******************************************************************************/
00801   
00802 // Rm requests are forwarded to all subscribers subject to an Xmi callout.
00803 //
00804 const char *XrdCmsNode::do_Rm(XrdCmsRRData &Arg)
00805 {
00806    EPNAME("do_Rm")
00807    static const SMask_t allNodes(~0);
00808    int rc;
00809 
00810 // Do some debugging
00811 //
00812    DEBUGR(Arg.Path);
00813 
00814 // If we have an Xmi then call it
00815 //
00816    if (Xmi_Remove)
00817       {XrdCmsReq Req(this, Arg.Request.streamid, Arg.Request.modifier & kYR_dnf);
00818        if (Xmi_Remove->Remove(&Req, Arg.Path, Arg.Opaque)) return 0;
00819       }
00820 
00821 // If we have no data then we should remove this file from our cache
00822 //
00823    if (!Config.DiskOK)
00824       {XrdCmsSelect Sel(0, Arg.Path, strlen(Arg.Path));
00825        Cache.DelFile(Sel, allNodes);
00826        return 0;
00827       }
00828   
00829 // Remove the file either via call-out or the oss plugin. We used to requeue
00830 // the request to the local xrootd but this is no longer needed.
00831 //
00832    if (Config.ProgRM) rc = fsExec(Config.ProgRM, Arg.Path);
00833       else rc = Config.ossFS->Unlink(Arg.Path);
00834 
00835 // Return appropriate result
00836 //
00837    return (rc ? fsFail(Arg.Ident, "rm", Arg.Path, rc) : 0);
00838 }
00839   
00840 /******************************************************************************/
00841 /*                              d o _ R m d i r                               */
00842 /******************************************************************************/
00843   
00844 // Rmdir requests are forwarded to all subscribers subject to an Xmi callout.
00845 //
00846 const char *XrdCmsNode::do_Rmdir(XrdCmsRRData &Arg)
00847 {
00848    EPNAME("do_Rmdir")
00849    static const SMask_t allNodes(~0);
00850    int rc;
00851 
00852 // Do some debugging
00853 //
00854    DEBUGR(Arg.Path);
00855 
00856 // If we have an Xmi then call it
00857 //
00858    if (Xmi_Remdir)
00859       {XrdCmsReq Req(this, Arg.Request.streamid, Arg.Request.modifier & kYR_dnf);
00860        if (Xmi_Remdir->Remdir(&Req, Arg.Path, Arg.Opaque)) return 0;
00861       }
00862 
00863 // If we have no data then we should remove this directory from our cache
00864 //
00865    if (!Config.DiskOK)
00866       {XrdCmsSelect Sel(0, Arg.Path, strlen(Arg.Path));
00867        Cache.DelFile(Sel, allNodes);
00868        return 0;
00869       }
00870   
00871 // Remove the directory either via call-out or the oss plug-in (we used to
00872 // do this by requeing the request to the local xrootd; no longer needed).
00873 //
00874    if (Config.ProgRD) rc = fsExec(Config.ProgRD, Arg.Path);
00875       else rc = Config.ossFS->Remdir(Arg.Path);
00876 
00877 // Return appropriate result
00878 //
00879    return (rc ? fsFail(Arg.Ident, "rmdir", Arg.Path, rc) : 0);
00880 }
00881   
00882 /******************************************************************************/
00883 /*                             d o _ S e l e c t                              */
00884 /******************************************************************************/
00885   
00886 // A select request comes from a redirector and is handled locally within the
00887 // cell. This may cause "state" requests to be broadcast to subscribers.
00888 //
00889 const char *XrdCmsNode::do_Select(XrdCmsRRData &Arg)
00890 {
00891    EPNAME("do_Select")
00892    XrdCmsRRQInfo reqInfo(Instance, RSlot, Arg.Request.streamid);
00893    XrdCmsSelect Sel(XrdCmsSelect::Peers, Arg.Path, Arg.PathLen-1);
00894    struct iovec ioV[2];
00895    char theopts[16], *Avoid, *toP = theopts;
00896    int rc, bytes;
00897 
00898 // Do a callout to the external manager if we have one
00899 //
00900    if (Arg.Opts & CmsSelectRequest::kYR_stat && Xmi_Stat)
00901       {XrdCmsReq Req(this, Arg.Request.streamid);
00902        if (Xmi_Stat->Stat(&Req, Arg.Path, Arg.Opaque)) return 0;
00903       } else 
00904    if (Xmi_Select)
00905       {XrdCmsReq Req(this, Arg.Request.streamid);
00906        int opts = (Arg.Opts & CmsSelectRequest::kYR_write ? XMI_RW : 0);
00907        if (Arg.Opts & CmsSelectRequest::kYR_create) opts |= XMI_NEW;
00908        if (Arg.Opts & CmsSelectRequest::kYR_trunc)  opts |= XMI_TRUNC;
00909        if (Xmi_Select->Select(&Req, opts, Arg.Path, Arg.Opaque)) return 0;
00910       }
00911 
00912 // Init select data (note that refresh supresses fast redirects)
00913 //
00914    Sel.iovP  = 0; Sel.iovN  = 0; Sel.InfoP = &reqInfo;
00915 
00916 // Complete the arguments to select
00917 //
00918          if (Arg.Opts & CmsSelectRequest::kYR_refresh) 
00919            {Sel.Opts |= XrdCmsSelect::Refresh;                     *toP++='s';}
00920          if (Arg.Opts & CmsSelectRequest::kYR_online)  
00921            {Sel.Opts |= XrdCmsSelect::Online;                      *toP++='o';}
00922          if (Arg.Opts & CmsSelectRequest::kYR_stat)
00923            {Sel.Opts |= XrdCmsSelect::noBind;                      *toP++='x';}
00924    else {if (Arg.Opts & CmsSelectRequest::kYR_trunc)   
00925            {Sel.Opts |= XrdCmsSelect::Write | XrdCmsSelect::Trunc; *toP++='t';}
00926          if (Arg.Opts & CmsSelectRequest::kYR_write)   
00927            {Sel.Opts |= XrdCmsSelect::Write;                       *toP++='w';}
00928          if (Arg.Opts & CmsSelectRequest::kYR_metaop)
00929            {Sel.Opts |= XrdCmsSelect::Write|XrdCmsSelect::isMeta;  *toP++='m';}
00930          if (Arg.Opts & CmsSelectRequest::kYR_create)  
00931            {Sel.Opts |= XrdCmsSelect::Write|XrdCmsSelect::NewFile; *toP++='c';
00932             if (Arg.Opts & CmsSelectRequest::kYR_replica)
00933                {Sel.Opts |= XrdCmsSelect::Replica;                 *toP++='+';}
00934            }
00935         }
00936    *toP = '\0';
00937 
00938 // Check if an avoid node present. If so, this is ineligible for fast redirect.
00939 //
00940    Sel.nmask = SMask_t(0);
00941    if ((Avoid = Arg.Avoid))
00942       {unsigned int IPaddr;
00943        char *Comma;
00944        DEBUGR(theopts <<' ' <<Arg.Path <<" avoiding " <<Avoid);
00945        Sel.InfoP = 0;
00946        do {if ((Comma = index(Avoid,','))) *Comma = '\0';
00947            if (*Avoid == '+') Sel.nmask |= Cluster.getMask(Avoid+1);
00948               else if (XrdNetDNS::Host2IP(Avoid, &IPaddr))
00949                               Sel.nmask |= Cluster.getMask(IPaddr);
00950            Avoid = Comma+1;
00951           } while(Comma && *Avoid);
00952       } else DEBUGR(theopts <<' ' <<Arg.Path);
00953 
00954 // Perform selection
00955 //
00956    if ((rc = Cluster.Select(Sel)))
00957       {if (rc > 0)
00958           {Arg.Request.rrCode = kYR_wait;
00959            Sel.Resp.Port      = rc;
00960            Sel.Resp.DLen      = 0;
00961            DEBUGR("delay " <<rc <<' ' <<Arg.Path);
00962           } else {
00963            Arg.Request.rrCode = kYR_error;
00964            Sel.Resp.Port      = kYR_ENOENT;
00965            DEBUGR("failed; " <<Sel.Resp.Data << ' ' <<Arg.Path);
00966           }
00967       } else if (!Sel.Resp.DLen) return 0;
00968                 else {Arg.Request.rrCode = kYR_redirect;
00969                       DEBUGR("Redirect -> " <<Sel.Resp.Data <<':'
00970                             <<Sel.Resp.Port <<" for " <<Arg.Path);
00971              }
00972 
00973 // Format the response
00974 //
00975    bytes               = Sel.Resp.DLen+sizeof(Sel.Resp.Port);
00976    Arg.Request.datalen = htons(bytes);
00977    Sel.Resp.Port       = htonl(Sel.Resp.Port);
00978 
00979 // Fill out the I/O vector
00980 //
00981    ioV[0].iov_base = (char *)&Arg.Request;
00982    ioV[0].iov_len  = sizeof(Arg.Request);
00983    ioV[1].iov_base = (char *)&Sel.Resp;
00984    ioV[1].iov_len  = bytes;
00985 
00986 // Send back the response
00987 //
00988    Link->Send(ioV, 2, bytes+sizeof(Arg.Request));
00989    return 0;
00990 }
00991   
00992 /******************************************************************************/
00993 /*                            d o _ S e l P r e p                             */
00994 /******************************************************************************/
00995   
00996 int XrdCmsNode::do_SelPrep(XrdCmsPrepArgs &Arg) // Static!!!
00997 {
00998    EPNAME("do_SelPrep")
00999    XrdCmsSelect Sel(XrdCmsSelect::Peers, Arg.path, Arg.pathlen-1);
01000    int rc;
01001 
01002 // Do a callout to the external manager if we have one
01003 //
01004    if (Xmi_Prep)
01005       {int opts = (Arg.options & CmsPrepAddRequest::kYR_write ? XMI_RW : 0);
01006        if (Xmi_Prep->Prep(Arg.reqid, opts, Arg.path, Arg.opaque)) return 0;
01007       }
01008 
01009 // Complete the arguments to select
01010 //
01011    if ( Arg.options & CmsPrepAddRequest::kYR_fresh)
01012       Sel.Opts |= XrdCmsSelect::Freshen;
01013    if ( Arg.options & CmsPrepAddRequest::kYR_write) 
01014       Sel.Opts |= XrdCmsSelect::Write;
01015    if (Arg.options & CmsPrepAddRequest::kYR_stage) 
01016            {Sel.iovP = Arg.ioV; Sel.iovN = Arg.iovNum;}
01017       else {Sel.iovP = 0;       Sel.iovN = 0;
01018             Sel.Opts |= XrdCmsSelect::Defer;
01019            }
01020 
01021 // Setup select data (note that prepare does not allow fast redirect)
01022 //
01023    Sel.InfoP = 0;  // No fast redirects
01024    Sel.nmask = SMask_t(0);
01025 
01026 // Check if co-location wanted relevant only when staging wanted
01027 //
01028    if (Arg.clPath && Sel.iovP)
01029       {XrdCmsSelect Scl(XrdCmsSelect::Peers, Arg.clPath, strlen(Arg.clPath));
01030        Scl.iovP = 0; Scl.iovN  = 0; Scl.InfoP = 0; Scl.nmask = SMask_t(0);
01031        DEBUGR("colocating " <<Arg.path <<" w.r.t. " <<Arg.clPath);
01032        rc = Cluster.Select(Scl);
01033        if (rc > 0) {Sched->Schedule((XrdJob *)&Arg, rc+time(0));
01034                     DEBUGR("coloc to " <<Arg.clPath <<" delayed " <<rc <<" seconds");
01035                     return 1;
01036                    }
01037        if (rc < 0) Say.Emsg("SelPrep", Arg.path, "failed;", Sel.Resp.Data);
01038           else Sel.nmask = ~Scl.smask;
01039       }
01040 
01041 // Perform selection
01042 //
01043    if ((rc = Cluster.Select(Sel)))
01044       {if (rc > 0)
01045           {if (!(Arg.options & CmsPrepAddRequest::kYR_stage)) return 0;
01046            Sched->Schedule((XrdJob *)&Arg, rc+time(0));
01047            DEBUGR("prep delayed " <<rc <<" seconds");
01048            return 1;
01049           }
01050        Say.Emsg("SelPrep", Arg.path, "failed;", Sel.Resp.Data);
01051        PrepQ.Inform("unavail", &Arg);
01052       }
01053 
01054 // All done
01055 //
01056    return 0;
01057 }
01058 
01059 /******************************************************************************/
01060 /*                              d o _ S p a c e                               */
01061 /******************************************************************************/
01062   
01063 // Manager space requests are local to the cell and never propagated.
01064 //
01065 const char *XrdCmsNode::do_Space(XrdCmsRRData &Arg)
01066 {
01067    EPNAME("do_Space")
01068    struct iovec xmsg[2];
01069    CmsAvailRequest mySpace = {{0, kYR_avail, 0, 0}};
01070    char         buff[sizeof(int)*2+2], *bp = buff;
01071    int blen, maxfr, tutil;
01072 
01073 // Process: <id> space
01074 // Respond: <id> avail <numkb> <dskutil>
01075 //
01076    maxfr = Meter.FreeSpace(tutil);
01077 
01078 // Do some debugging
01079 //
01080    DEBUGR(maxfr <<"MB free; " <<tutil <<"% util");
01081 
01082 // Construct a message to be sent to the manager.
01083 //
01084    blen  = XrdOucPup::Pack(&bp, maxfr);
01085    blen += XrdOucPup::Pack(&bp, tutil);
01086    mySpace.Hdr.datalen = htons(static_cast<unsigned short>(blen));
01087 
01088 // Send the response
01089 //
01090    if (Arg.Request.rrCode != kYR_space) Manager.Inform(mySpace.Hdr, buff, blen);
01091       else {xmsg[0].iov_base = (char *)&mySpace;
01092             xmsg[0].iov_len  = sizeof(mySpace);
01093             xmsg[1].iov_base = buff;
01094             xmsg[1].iov_len  = blen;
01095             mySpace.Hdr.datalen = htons(static_cast<unsigned short>(blen));
01096             Link->Send(xmsg, 2);
01097            }
01098    return 0;
01099 }
01100   
01101 /******************************************************************************/
01102 /*                              d o _ S t a t e                               */
01103 /******************************************************************************/
01104   
01105 // State requests from a manager are rebroadcast to all relevant subscribers.
01106 //
01107 const char *XrdCmsNode::do_State(XrdCmsRRData &Arg)
01108 {
01109    EPNAME("do_State")
01110    struct iovec xmsg[2];
01111    int noResp = Arg.Request.modifier & CmsStateRequest::kYR_noresp;
01112 
01113 // Do some debugging
01114 //
01115    TRACER(Files,Arg.Path);
01116 
01117 // Process: state <path>
01118 // Respond: have <path>
01119 //
01120    isKnown = 1;
01121 
01122 // If we are a manager then check for the file in the local cache. Otherwise,
01123 // if we have actual data, do a stat() on the file.
01124 //
01125    if (isMan) Arg.Request.modifier = do_StateFWD(Arg);
01126       else if (Config.DiskOK)
01127               Arg.Request.modifier = isOnline(Arg.Path, 0);
01128               else return 0;
01129 
01130 // Respond appropriately
01131 //
01132    if (Arg.Request.modifier && !noResp)
01133       {xmsg[0].iov_base      = (char *)&Arg.Request;
01134        xmsg[0].iov_len       = sizeof(Arg.Request);
01135        xmsg[1].iov_base      = Arg.Buff;
01136        xmsg[1].iov_len       = Arg.Dlen;
01137        Arg.Request.rrCode    = kYR_have;
01138        Arg.Request.modifier |= kYR_raw;
01139        Link->Send(xmsg, 2);
01140       }
01141    return 0;
01142 }
01143   
01144 /******************************************************************************/
01145 /*                           d o _ S t a t e F W D                            */
01146 /******************************************************************************/
01147   
01148 int XrdCmsNode::do_StateFWD(XrdCmsRRData &Arg)
01149 {
01150    EPNAME("do_StateFWD");
01151    XrdCmsSelect Sel(0, Arg.Path, Arg.PathLen-1);
01152    XrdCmsPInfo  pinfo;
01153    int retc;
01154 
01155 // Find out who could serve this file
01156 //
01157    if (!Cache.Paths.Find(Arg.Path, pinfo) || pinfo.rovec == 0)
01158       {DEBUGR("Path find failed for state " <<Arg.Path);
01159        return 0;
01160       }
01161 
01162 // Get the primary locations for this file
01163 //
01164    Sel.Vec.hf = Sel.Vec.pf = Sel.Vec.bf = 0;
01165    if (Arg.Request.modifier & CmsStateRequest::kYR_refresh) retc = 0;
01166       else retc = Cache.GetFile(Sel, pinfo.rovec);
01167 
01168 // If need be, ask the relevant nodes if they have the file.
01169 //
01170    if (!retc || Sel.Vec.bf != 0)
01171       {if (!retc) Cache.AddFile(Sel, 0);
01172        Cluster.Broadcast((retc ? Sel.Vec.bf : pinfo.rovec), Arg.Request,
01173                          (void *)Arg.Buff, Arg.Dlen);
01174       }
01175 
01176 // Return true if anyone has the file at this point
01177 //
01178    if (Sel.Vec.hf != 0) return CmsHaveRequest::Online;
01179    if (Sel.Vec.pf != 0) return CmsHaveRequest::Pending;
01180                         return 0;
01181 }
01182   
01183 /******************************************************************************/
01184 /*                             d o _ S t a t F S                              */
01185 /******************************************************************************/
01186   
01187 const char *XrdCmsNode::do_StatFS(XrdCmsRRData &Arg)
01188 {
01189    static kXR_unt32 Zero = 0;
01190    char         buff[256];
01191    struct iovec ioV[3] = {{(char *)&Arg.Request, sizeof(Arg.Request)},
01192                           {(char *)&Zero,        sizeof(Zero)},
01193                           {(char *)&buff,        0}};
01194    XrdCmsPInfo   pinfo;
01195    int           bytes;
01196    SpaceData     theSpace;
01197 
01198 // Find out who serves this path and get space relative to it
01199 //
01200    if (Cache.Paths.Find(Arg.Path, pinfo) && pinfo.rovec)
01201       {Cluster.Space(theSpace, pinfo.rovec);
01202        bytes = sprintf(buff, "%d %d %d %d %d %d",
01203                        theSpace.wNum, theSpace.wFree>>10, theSpace.wUtil,
01204                        theSpace.sNum, theSpace.sFree>>10, theSpace.sUtil) + 1;
01205       } else bytes = strlcpy(buff, "-1 -1 -1 -1 -1 -1", sizeof(buff)) + 1;
01206 
01207 // Send the response
01208 //
01209    ioV[2].iov_len      = bytes;
01210    bytes              += sizeof(Zero);
01211    Arg.Request.rrCode  = kYR_data;
01212    Arg.Request.datalen = htons(bytes);
01213    Link->Send(ioV, 3, bytes+sizeof(Arg.Request));
01214    return 0;
01215 }
01216 
01217 /******************************************************************************/
01218 /*                              d o _ S t a t s                               */
01219 /******************************************************************************/
01220   
01221 // We punt on stats requests as we have no way to export them anyway.
01222 //
01223 const char *XrdCmsNode::do_Stats(XrdCmsRRData &Arg)
01224 {
01225    static const unsigned short szLen = sizeof(kXR_unt32);
01226    static XrdSysMutex StatsData;
01227    static int         statsz = 0;
01228    static int         statln = 0;
01229    static char       *statbuff = 0;
01230    static time_t      statlast = 0;
01231    static kXR_unt32   theSize;
01232 
01233    struct iovec  ioV[3] = {{(char *)&Arg.Request, sizeof(Arg.Request)},
01234                            {(char *)&theSize,     sizeof(theSize)},
01235                            {0,                    0}
01236                           };
01237    time_t tNow;
01238 
01239 // Allocate buffer if we do not have one
01240 //
01241    StatsData.Lock();
01242    if (!statsz || !statbuff)
01243       {statsz   = Cluster.Stats(0,0);
01244        statbuff = (char *)malloc(statsz);
01245        theSize = htonl(statsz);
01246       }
01247 
01248 // Check if only the size is wanted
01249 //
01250    if (Arg.Request.modifier & CmsStatsRequest::kYR_size)
01251       {ioV[1].iov_len = sizeof(theSize);
01252        Arg.Request.datalen = htons(szLen);
01253        Arg.Request.rrCode  = kYR_data;
01254        Link->Send(ioV, 2);
01255        StatsData.UnLock();
01256        return 0;
01257       }
01258 
01259 // Get full statistics if enough time has passed
01260 //
01261    tNow = time(0);
01262    if (statlast+9 >= tNow)
01263       {statln = Cluster.Stats(statbuff, statsz); statlast = tNow;}
01264 
01265 // Format result and send response
01266 //
01267    ioV[2].iov_base = statbuff;
01268    ioV[2].iov_len  = statln;
01269    Arg.Request.datalen = htons(static_cast<unsigned short>(szLen+statln));
01270    Arg.Request.rrCode  = kYR_data;
01271    Link->Send(ioV, 3);
01272 
01273 // All done
01274 //
01275    StatsData.UnLock();
01276    return 0;
01277 }
01278 
01279 /******************************************************************************/
01280 /*                             d o _ S t a t u s                              */
01281 /******************************************************************************/
01282   
01283 // the reset request is propagated to all of our managers. A special reset case
01284 // is sent when a subscribed supervisor adds a new node. This causes all cache
01285 // lines for the supervisor to be marked suspect. Status change requests are
01286 // propagated to upper-level managers only if the summary state has changed.
01287 //
01288 const char *XrdCmsNode::do_Status(XrdCmsRRData &Arg)
01289 {
01290    EPNAME("do_Status")
01291    const char *srvMsg, *stgMsg;
01292    int   Stage = Arg.Request.modifier & CmsStatusRequest::kYR_Stage;
01293    int noStage = Arg.Request.modifier & CmsStatusRequest::kYR_noStage;
01294    int Resume  = Arg.Request.modifier & CmsStatusRequest::kYR_Resume;
01295    int Suspend = Arg.Request.modifier & CmsStatusRequest::kYR_Suspend;
01296    int Reset   = Arg.Request.modifier & CmsStatusRequest::kYR_Reset;
01297    int add2Activ, add2Stage;
01298 
01299 // Do some debugging
01300 //
01301    DEBUGR(  (Reset  ? "reset " : "")
01302           <<(Resume ? "resume " : (Suspend ? "suspend " : ""))
01303           <<(Stage  ? "stage "  : (noStage ? "nostage " : "")));
01304 
01305 // Process reset requests. These are exclsuive to any other request
01306 //
01307    if (Reset)
01308       {Manager.Reset();                // Propagate the reset to our managers
01309        Cache.Bounce(NodeMask, NodeID); // Now invalidate our cache lines
01310       }
01311 
01312 // Process stage/nostage
01313 //
01314     if ((Stage && isNoStage) || (noStage && !isNoStage))
01315        if (noStage) {add2Stage = -1; isNoStage = 1; stgMsg="staging suspended";}
01316           else      {add2Stage =  1; isNoStage = 0; stgMsg="staging resumed";}
01317        else         {add2Stage =  0;                stgMsg = 0;}
01318 
01319 // Process suspend/resume
01320 //
01321     if ((Resume && isSuspend) || (Suspend && !isSuspend))
01322        if (Suspend) {add2Activ = -1; isSuspend = 1;
01323                      srvMsg="service suspended"; 
01324                      stgMsg = 0;
01325                     }
01326           else      {add2Activ =  1; isSuspend = 0;
01327                      srvMsg="service resumed";
01328                      stgMsg = (isNoStage ? "(no staging)" : "(staging)");
01329                      Port = ntohl(Arg.Request.streamid);
01330                      DEBUGR("set data port to " <<Port);
01331                     }
01332        else         {add2Activ =  0; srvMsg = 0;}
01333 
01334 // Get the most important message out
01335 //
01336    if (isOffline)         {srvMsg = "service offline";  stgMsg = 0;}
01337       else if (isDisable) {srvMsg = "service disabled"; stgMsg = 0;}
01338 
01339 // Now see if we need to change anything
01340 //
01341    if (add2Activ || add2Stage)
01342        {CmsState.Update(XrdCmsState::Counts, add2Activ, add2Stage);
01343         Say.Emsg("Node", Name(), srvMsg, stgMsg);
01344        }
01345 
01346    return 0;
01347 }
01348 
01349 /******************************************************************************/
01350 /*                              d o _ T r u n c                               */
01351 /******************************************************************************/
01352   
01353 // Trunc requests are forwarded to all subscribers subject to an Xmi callout.
01354 // Currently, we have no definition in Xmi for trunc!
01355 //
01356 const char *XrdCmsNode::do_Trunc(XrdCmsRRData &Arg)
01357 {
01358    EPNAME("do_Trunc")
01359    long long Size = -1;
01360    int rc;
01361 
01362 // Do some debugging
01363 //
01364    DEBUGR("size " <<Arg.Mode <<' ' <<Arg.Path);
01365 
01366 // If we have an Xmi then call it
01367 //
01368 // if (Xmi_Trunc)
01369 //    {XrdCmsReq Req(this, Arg.Request.streamid);
01370 //     if (!getSize(Arg.Mode, Size)) return "invalid size";
01371 //        else if (Xmi_Trunc->Trunc(&Req, Size, Arg.Path, Arg.Opaque)) return 0;
01372 //    }
01373 
01374 // We are don here if we have no data; otherwise convert the mode if we
01375 // haven't done so already.
01376 //
01377    if (!Config.DiskOK) return 0;
01378    if (Size < 0 && !getSize(Arg.Mode, Size)) return "invalid size";
01379 
01380 // Attempt to change the size either via call-out or the oss plug-in
01381 //
01382    if (Config.ProgTR) rc = fsExec(Config.ProgTR, Arg.Mode, Arg.Path);
01383       else rc = Config.ossFS->Truncate(Arg.Path, Size);
01384 
01385 // Return appropriate result
01386 //
01387    return (rc ? fsFail(Arg.Ident, "trunc", Arg.Path, rc) : 0);
01388 }
01389 
01390 /******************************************************************************/
01391 /*                                d o _ T r y                                 */
01392 /******************************************************************************/
01393 
01394 // Try requests from a manager indicate that we are being displaced and should
01395 // hunt for another manager. The request provides hints as to where to try.
01396 //
01397 const char *XrdCmsNode::do_Try(XrdCmsRRData &Arg)
01398 {
01399    EPNAME("do_Try")
01400    XrdOucTokenizer theList(Arg.Path);
01401    char *tp;
01402 
01403 // Do somde debugging
01404 //
01405    DEBUGR(Arg.Path);
01406 
01407 // Delete any additions from this manager
01408 //
01409    myMans.Del(IPAddr);
01410 
01411 // Add all the alternates to our alternate list
01412 //
01413    tp = theList.GetLine();
01414    while((tp = theList.GetToken()))
01415          myMans.Add(IPAddr, tp, Config.PortTCP, myLevel);
01416 
01417 // Close the link and return an error
01418 //
01419 // Disc("redirected.");
01420    return ".redirected";
01421 }
01422   
01423 /******************************************************************************/
01424 /*                             d o _ U p d a t e                              */
01425 /******************************************************************************/
01426   
01427 const char *XrdCmsNode::do_Update(XrdCmsRRData &Arg)
01428 {
01429 
01430 // Process: <id> update
01431 // Respond: <id> status
01432 //
01433    CmsState.sendState(Link);
01434    return 0;
01435 }
01436   
01437 /******************************************************************************/
01438 /*                              d o _ U s a g e                               */
01439 /******************************************************************************/
01440   
01441 // Usage requests from a manager are local to the cell and never propagated.
01442 //
01443 const char *XrdCmsNode::do_Usage(XrdCmsRRData &Arg)
01444 {
01445 
01446 // Process: <id> usage
01447 // Respond: <id> load <cpu> <io> <load> <mem> <pag> <dskfree> <dskutil>
01448 //
01449    Report_Usage(Link);
01450    return 0;
01451 }
01452 
01453 /******************************************************************************/
01454 /*                          R e p o r t _ U s a g e                           */
01455 /******************************************************************************/
01456   
01457 void XrdCmsNode::Report_Usage(XrdLink *lp)   // Static!
01458 {
01459    EPNAME("Report_Usage")
01460    CmsLoadRequest myLoad = {{0, kYR_load, 0, 0}};
01461    struct iovec xmsg[2];
01462    char loadbuff[CmsLoadRequest::numLoad];
01463    char respbuff[sizeof(loadbuff)+2+sizeof(int)+2], *bp = respbuff;
01464    int  blen, maxfr, pcpu, pnet, pxeq, pmem, ppag, pdsk;
01465 
01466 // Respond: <id> load <cpu> <io> <load> <mem> <pag> <dskfree> <dskutil>
01467 //
01468    maxfr = Meter.Report(pcpu, pnet, pxeq, pmem, ppag, pdsk);
01469 
01470    loadbuff[CmsLoadRequest::cpuLoad] = static_cast<char>(pcpu);
01471    loadbuff[CmsLoadRequest::netLoad] = static_cast<char>(pnet);
01472    loadbuff[CmsLoadRequest::xeqLoad] = static_cast<char>(pxeq);
01473    loadbuff[CmsLoadRequest::memLoad] = static_cast<char>(pmem);
01474    loadbuff[CmsLoadRequest::pagLoad] = static_cast<char>(ppag);
01475    loadbuff[CmsLoadRequest::dskLoad] = static_cast<char>(pdsk);
01476 
01477    blen  = XrdOucPup::Pack(&bp, loadbuff, sizeof(loadbuff));
01478    blen += XrdOucPup::Pack(&bp, maxfr);
01479    myLoad.Hdr.datalen = htons(static_cast<unsigned short>(blen));
01480 
01481    xmsg[0].iov_base = (char *)&myLoad;
01482    xmsg[0].iov_len  = sizeof(myLoad);
01483    xmsg[1].iov_base = respbuff;
01484    xmsg[1].iov_len  = blen;
01485    if (lp) lp->Send(xmsg, 2);
01486       else Manager.Inform("usage", xmsg, 2);
01487 
01488 // Do some debugging
01489 //
01490    DEBUG("cpu=" <<pcpu <<" net=" <<pnet <<" xeq=" <<pxeq
01491       <<" mem=" <<pmem <<" pag=" <<ppag <<" dsk=" <<pdsk <<' ' <<maxfr);
01492 }
01493   
01494 /******************************************************************************/
01495 /*                             S y n c S p a c e                              */
01496 /******************************************************************************/
01497 
01498 void XrdCmsNode::SyncSpace()
01499 {
01500    XrdCmsRRData Arg;
01501    int          old_free = 0;
01502 
01503 // For newly logged in nodes, we need to sync the free space stats
01504 //
01505    mlMutex.Lock();
01506    if (isRW && DiskFree > LastFree)
01507       {old_free = LastFree; LastFree = DiskFree;}
01508    mlMutex.UnLock();
01509 
01510 // Tell our manager if we now have more space, if need be.
01511 //
01512    if (!old_free)
01513       {Arg.Request.rrCode = kYR_login;
01514        Arg.Ident   = Ident;
01515        Arg.dskFree = DiskFree;
01516        Arg.dskUtil = DiskUtil;
01517        do_Space(Arg);
01518       }
01519 }
01520   
01521 /******************************************************************************/
01522 /*                       P r i v a t e   M e t h o d s                        */
01523 /******************************************************************************/
01524 /******************************************************************************/
01525 /*                                f s E x e c                                 */
01526 /******************************************************************************/
01527 
01528 int XrdCmsNode::fsExec(XrdOucProg *Prog, char *Arg1, char *Arg2)
01529 {
01530    static const int PfnSZ = XrdCmsMAX_PATH_LEN+1;
01531    char Pfn1[PfnSZ], Pfn2[PfnSZ];
01532 
01533 // The first argument may or may not be a path. The second, if present is always
01534 // a path. If we have a name mapper then we need to substitute remapped paths.
01535 //
01536    if (Config.lcl_N2N)
01537       {if (*Arg1 == '/')
01538           {if (Config.lcl_N2N->lfn2pfn(Arg1,Pfn1,PfnSZ-1)) return fsL2PFail1;
01539            Arg1 = Pfn1;
01540           }
01541        if ( Arg2)
01542           {if (Config.lcl_N2N->lfn2pfn(Arg2,Pfn2,PfnSZ-1)) return fsL2PFail2;
01543            Arg2 = Pfn2;
01544           }
01545       }
01546 
01547 // Return results of the call-out
01548 //
01549    return Prog->Run(Arg1, Arg2);
01550 }
01551   
01552 /******************************************************************************/
01553 /*                                f s F a i l                                 */
01554 /******************************************************************************/
01555   
01556 const char *XrdCmsNode::fsFail(const char *Who,  const char *What,
01557                                const char *Path, int rc)
01558 {
01559    EPNAME("fsFail")
01560 
01561 // Immediately return on the two most unlikely failures; o/w issue message
01562 //
01563    rc = abs(rc);
01564    if (rc == fsL2PFail1) return "lfn2pfn path1 failed";
01565    if (rc == fsL2PFail2) return "lfn2pfn path2 failed";
01566    if (rc != ENOENT) Say.Emsg("Node", rc, What, Path);
01567       else {struct {const char *Ident;} Arg = {Who};
01568             DEBUGR("rc=" <<rc <<' ' <<What <<' ' <<Path);
01569            }
01570    return rc ? strerror(rc) : 0;
01571 }
01572 
01573 /******************************************************************************/
01574 /*                               g e t M o d e                                */
01575 /******************************************************************************/
01576 
01577 int XrdCmsNode::getMode(const char *theMode, mode_t &Mode)
01578 {
01579    char *eP;
01580 
01581 // Convert the mode argument
01582 //
01583    if (!(Mode = strtol(theMode, &eP, 8)) || *eP || (Mode >> 9)) return 0;
01584    return 1;
01585 }
01586 
01587 /******************************************************************************/
01588 /*                               g e t S i z e                                */
01589 /******************************************************************************/
01590 
01591 int XrdCmsNode::getSize(const char *theSize, long long &Size)
01592 {
01593    char *eP;
01594 
01595 // Convert the size argument
01596 //
01597    if (!(Size = strtoll(theSize, &eP, 10)) || *eP) return 0;
01598    return 1;
01599 }
01600   
01601 /******************************************************************************/
01602 /*                              i s O n l i n e                               */
01603 /******************************************************************************/
01604   
01605 int XrdCmsNode::isOnline(char *path, int upt) // Static!!!
01606 {
01607    static const int Sopts = XRDOSS_resonly | XRDOSS_updtatm;
01608    struct stat buf;
01609 
01610 // Issue stat() via oss plugin. If it fails. the file may still exist in the
01611 // prepare queue (i.e., logically present).
01612 //
01613    if (Config.ossFS->Stat(path, &buf, Sopts))
01614       {if (Config.DiskSS && PrepQ.Exists(path)) return CmsHaveRequest::Pending;
01615           else return 0;
01616       }
01617 
01618 // Determine what to return
01619 // the update if an oss plugin exists as we don't have a way of doing this then.
01620 //
01621    if ((buf.st_mode & S_IFMT) == S_IFREG)
01622        return (buf.st_mode & S_ISUID ? CmsHaveRequest::Pending
01623                                      : CmsHaveRequest::Online);
01624 
01625    return (buf.st_mode & S_IFMT) == S_IFDIR ? CmsHaveRequest::Online : 0;
01626 }

Generated on Tue Jul 5 14:46:31 2011 for ROOT_528-00b_version by  doxygen 1.5.1