XrdCmsFinder.cc

Go to the documentation of this file.
00001 /******************************************************************************/
00002 /*                                                                            */
00003 /*                       X r d C m s F i n d e r . c c                        */
00004 /*                                                                            */
00005 /* (c) 2007 by the Board of Trustees of the Leland Stanford, Jr., University  */
00006 /*                            All Rights Reserved                             */
00007 /*   Produced by Andrew Hanushevsky for Stanford University under contract    */
00008 /*              DE-AC02-76-SFO0515 with the Department of Energy              */
00009 /******************************************************************************/
00010 
00011 //          $Id: XrdCmsFinder.cc 38011 2011-02-08 18:35:57Z ganis $
00012 
// Revision-control identification string ($Id$ keyword expansion) for this
// source file.
//
const char *XrdCmsFinderCVSID =
           "$Id: XrdCmsFinder.cc 38011 2011-02-08 18:35:57Z ganis $";
00014 
00015 #include <stdlib.h>
00016 #include <stdio.h>
00017 #include <errno.h>
00018 #include <fcntl.h>
00019 #include <limits.h>
00020 #include <signal.h>
00021 #include <strings.h>
00022 #include <time.h>
00023 #include <unistd.h>
00024 #include <sys/shm.h>
00025 #include <sys/socket.h>
00026 #include <sys/stat.h>
00027 #include <sys/times.h>
00028 #include <sys/types.h>
00029 #include <sys/uio.h>
00030 #include <sys/un.h>
00031 #include <sys/wait.h>
00032 #include <netinet/in.h>
00033 #include <inttypes.h>
00034   
00035 #include "XProtocol/YProtocol.hh"
00036 
00037 #include "XrdCms/XrdCmsClientConfig.hh"
00038 #include "XrdCms/XrdCmsClientMan.hh"
00039 #include "XrdCms/XrdCmsClientMsg.hh"
00040 
00041 #include "XrdCms/XrdCmsFinder.hh"
00042 #include "XrdCms/XrdCmsParser.hh"
00043 #include "XrdCms/XrdCmsResp.hh"
00044 #include "XrdCms/XrdCmsRRData.hh"
00045 #include "XrdCms/XrdCmsTrace.hh"
00046 
00047 #include "XrdOss/XrdOss.hh"
00048 
00049 #include "XrdOuc/XrdOucEnv.hh"
00050 #include "XrdOuc/XrdOucErrInfo.hh"
00051 #include "XrdOuc/XrdOucReqID.hh"
00052 #include "XrdOuc/XrdOucStream.hh"
00053 #include "XrdOuc/XrdOucUtils.hh"
00054 #include "XrdNet/XrdNetDNS.hh"
00055 #include "XrdNet/XrdNetOpts.hh"
00056 #include "XrdNet/XrdNetSocket.hh"
00057 #include "XrdSfs/XrdSfsInterface.hh"
00058 
00059 #include "XrdSys/XrdSysError.hh"
00060 #include "XrdSys/XrdSysTimer.hh"
00061 #include "XrdSys/XrdSysPlatform.hh"
00062 
00063 using namespace XrdCms;
00064 
00065 /******************************************************************************/
00066 /*                               G l o b a l s                                */
00067 /******************************************************************************/
00068 
00069 namespace XrdCms
00070 {
00071 XrdSysError  Say(0, "cms_");
00072   
00073 XrdOucTrace  Trace(&Say);
00074 };
00075 
00076 /******************************************************************************/
00077 /*                         R e m o t e   F i n d e r                          */
00078 /******************************************************************************/
00079 /******************************************************************************/
00080 /*                           C o n s t r u c t o r                            */
00081 /******************************************************************************/
00082   
00083 XrdCmsFinderRMT::XrdCmsFinderRMT(XrdSysLogger *lp, int whoami, int Port)
00084                : XrdCmsClient((whoami & IsProxy
00085                                       ? XrdCmsClient::amProxy
00086                                       : XrdCmsClient::amRemote))
00087 {
00088      myManagers  = 0;
00089      myManCount  = 0;
00090      myPort      = Port;
00091      SMode       = 0;
00092      sendID      = 0;
00093      isMeta      = whoami & IsMeta;
00094      isTarget    = whoami & IsTarget;
00095      Say.logger(lp);
00096 }
00097  
00098 /******************************************************************************/
00099 /*                            D e s t r u c t o r                             */
00100 /******************************************************************************/
00101 
00102 XrdCmsFinderRMT::~XrdCmsFinderRMT()
00103 {
00104     XrdCmsClientMan *mp, *nmp = myManagers;
00105 
00106     while((mp = nmp)) {nmp = mp->nextManager(); delete mp;}
00107 }
00108 
00109 /******************************************************************************/
00110 /*                             C o n f i g u r e                              */
00111 /******************************************************************************/
00112   
00113 int XrdCmsFinderRMT::Configure(char *cfn)
00114 {
00115    XrdCmsClientConfig             config;
00116    XrdCmsClientConfig::configHow  How;
00117    XrdCmsClientConfig::configWhat What;
00118 
00119 // Establish what we will be configuring
00120 //
00121    if (myPersona == XrdCmsClient::amProxy) 
00122       How = XrdCmsClientConfig::configProxy;
00123       else if (isMeta) How = XrdCmsClientConfig::configMeta;
00124               else     How = XrdCmsClientConfig::configNorm;
00125    What = (isTarget ? XrdCmsClientConfig::configSuper
00126                     : XrdCmsClientConfig::configMan);
00127 
00128 // Set the error dest and simply call the configration object
00129 //
00130    if (config.Configure(cfn, What, How)) return 0;
00131    XrdCmsClientMan::setConfig(cfn);
00132 
00133 // Set configured values and start the managers
00134 //
00135    CMSPath    = config.CMSPath;
00136    RepDelay   = config.RepDelay;
00137    RepNone    = config.RepNone;
00138    RepWait    = config.RepWait;
00139    ConWait    = config.ConWait;
00140    PrepWait   = config.PrepWait;
00141    if (myPersona == XrdCmsClient::amProxy)
00142            {SMode = config.SModeP; StartManagers(config.PanList);}
00143       else {SMode = config.SMode;  StartManagers(config.ManList);}
00144 
00145 // If we are a plain manager but have a meta manager then we must start
00146 // a responder (that we will hide) to pass through the port number.
00147 //
00148    if (!isMeta && !isTarget && config.haveMeta)
00149       {XrdCmsFinderTRG *Rsp = new XrdCmsFinderTRG(Say.logger(),IsRedir,myPort);
00150        return Rsp->RunAdmin(CMSPath);
00151       }
00152 
00153 // All done
00154 //
00155    return 1;
00156 }
00157 
00158 /******************************************************************************/
00159 /*                               F o r w a r d                                */
00160 /******************************************************************************/
00161 
00162 int XrdCmsFinderRMT::Forward(XrdOucErrInfo &Resp, const char *cmd, 
00163                              const char *arg1,    const char *arg2,
00164                              const char *arg3,    const char *arg4)
00165 {
00166    static const int xNum   = 12;
00167 
00168    XrdCmsClientMan *Manp;
00169    XrdCmsRRData     Data;
00170    int              iovcnt, is2way, doAll = 0;
00171    char             Work[xNum*12];
00172    struct iovec     xmsg[xNum];
00173 
00174 // Encode the request as a redirector command
00175 //
00176    if ((is2way = (*cmd == '+'))) cmd++;
00177 
00178         if (!strcmp("chmod", cmd)) Data.Request.rrCode = kYR_chmod;
00179    else if (!strcmp("mkdir", cmd)) Data.Request.rrCode = kYR_mkdir;
00180    else if (!strcmp("mkpath",cmd)) Data.Request.rrCode = kYR_mkpath;
00181    else if (!strcmp("mv",    cmd)){Data.Request.rrCode = kYR_mv;    doAll=1;}
00182    else if (!strcmp("rm",    cmd)){Data.Request.rrCode = kYR_rm;    doAll=1;}
00183    else if (!strcmp("rmdir", cmd)){Data.Request.rrCode = kYR_rmdir; doAll=1;}
00184    else if (!strcmp("trunc", cmd)) Data.Request.rrCode = kYR_trunc;
00185    else {Say.Emsg("Finder", "Unable to forward '", cmd, "'.");
00186          Resp.setErrInfo(EINVAL, "Internal error processing file.");
00187          return -EINVAL;
00188         }
00189 
00190 // Fill out the RR data structure
00191 //
00192    Data.Ident   = (char *)(XrdCmsClientMan::doDebug ? Resp.getErrUser() : "");
00193    Data.Path    = (char *)arg1;
00194    Data.Mode    = (char *)arg2;
00195    Data.Path2   = (char *)arg2;
00196    Data.Opaque  = (char *)arg3;
00197    Data.Opaque2 = (char *)arg4;
00198 
00199 // Pack the arguments
00200 //
00201    if (!(iovcnt = Parser.Pack(int(Data.Request.rrCode), &xmsg[1], &xmsg[xNum],
00202                               (char *)&Data, Work)))
00203       {Resp.setErrInfo(EINVAL, "Internal error processing file.");
00204        return -EINVAL;
00205       }
00206 
00207 // Insert the header into the stream
00208 //
00209    Data.Request.streamid = 0;
00210    Data.Request.modifier = 0;
00211    xmsg[0].iov_base      = (char *)&Data.Request;
00212    xmsg[0].iov_len       = sizeof(Data.Request);
00213 
00214 // This may be a 2way message. If so, use the longer path.
00215 //
00216    if (is2way) return send2Man(Resp, (arg1 ? arg1 : "/"), xmsg, iovcnt+1);
00217 
00218 // Select the right manager for this request
00219 //
00220    if (!(Manp = SelectManager(Resp, (arg1 ? arg1 : "/")))) return ConWait;
00221 
00222 // Send message and simply wait for the reply
00223 //
00224    if (Manp->Send(xmsg, iovcnt+1))
00225       {if (doAll && !is2way)
00226           {Data.Request.modifier |= kYR_dnf;
00227            Inform(Manp, xmsg, iovcnt+1);
00228           }
00229        return 0;
00230       }
00231 
00232 // Indicate client should retry later
00233 //
00234    Resp.setErrInfo(RepDelay, "");
00235    return RepDelay;
00236 }
00237   
00238 /******************************************************************************/
00239 /*                                I n f o r m                                 */
00240 /******************************************************************************/
00241   
00242 void XrdCmsFinderRMT::Inform(XrdCmsClientMan *xman,
00243                              struct iovec     xmsg[], int xnum)
00244 {
00245    XrdCmsClientMan *Womp, *Manp;
00246 
00247 // Make sure we are configured
00248 //
00249    if (!myManagers)
00250       {Say.Emsg("Finder", "SelectManager() called prior to Configure().");
00251        return;
00252       }
00253 
00254 // Start at the beginning (we will avoid the previously selected one)
00255 //
00256    Womp = Manp = myManagers;
00257    do {if (Manp != xman && Manp->isActive()) Manp->Send(xmsg, xnum);
00258       } while((Manp = Manp->nextManager()) != Womp);
00259 }
00260   
00261 /******************************************************************************/
00262 /*                                L o c a t e                                 */
00263 /******************************************************************************/
00264   
00265 int XrdCmsFinderRMT::Locate(XrdOucErrInfo &Resp, const char *path, int flags,
00266                             XrdOucEnv *Env)
00267 {
00268    static const int xNum   = 12;
00269 
00270    XrdCmsRRData   Data;
00271    int            n, iovcnt;
00272    char           Work[xNum*12];
00273    struct iovec   xmsg[xNum];
00274 
00275 // Fill out the RR data structure
00276 //
00277    Data.Ident   = (char *)(XrdCmsClientMan::doDebug ? Resp.getErrUser() : "");
00278    Data.Path    = (char *)path;
00279    Data.Opaque  = (Env ? Env->Env(n)       : 0);
00280    Data.Avoid   = (Env ? Env->Get("tried") : 0);
00281 
00282 // Set options and command
00283 //
00284    if (flags & SFS_O_LOCATE)
00285       {Data.Request.rrCode = kYR_locate;
00286        Data.Opts = (flags & SFS_O_NOWAIT ? CmsLocateRequest::kYR_asap    : 0)
00287                  | (flags & SFS_O_RESET  ? CmsLocateRequest::kYR_refresh : 0);
00288       } else
00289   {     Data.Request.rrCode = kYR_select;
00290         if (flags & SFS_O_TRUNC) Data.Opts = CmsSelectRequest::kYR_trunc;
00291    else if (flags & SFS_O_CREAT)
00292            {   Data.Opts = CmsSelectRequest::kYR_create;
00293             if (flags & SFS_O_REPLICA)
00294                Data.Opts|= CmsSelectRequest::kYR_replica;
00295            }
00296    else if (flags & SFS_O_STAT)  Data.Opts = CmsSelectRequest::kYR_stat;
00297    else                          Data.Opts = 0;
00298 
00299    Data.Opts |= (flags & (SFS_O_WRONLY | SFS_O_RDWR)
00300               ? CmsSelectRequest::kYR_write : CmsSelectRequest::kYR_read);
00301 
00302    if (flags & SFS_O_META)      Data.Opts  |= CmsSelectRequest::kYR_metaop;
00303 
00304    if (flags & SFS_O_NOWAIT)    Data.Opts  |= CmsSelectRequest::kYR_online;
00305 
00306    if (flags & SFS_O_RESET)     Data.Opts  |= CmsSelectRequest::kYR_refresh;
00307   }
00308 
00309 // Pack the arguments
00310 //
00311    if (!(iovcnt = Parser.Pack(int(Data.Request.rrCode), &xmsg[1], &xmsg[xNum],
00312                               (char *)&Data, Work)))
00313       {Resp.setErrInfo(EINVAL, "Internal error processing file.");
00314        return -EINVAL;
00315       }
00316 
00317 // Insert the header into the stream
00318 //
00319    Data.Request.streamid = 0;
00320    Data.Request.modifier = 0;
00321    xmsg[0].iov_base      = (char *)&Data.Request;
00322    xmsg[0].iov_len       = sizeof(Data.Request);
00323 
00324 // Send the 2way message
00325 //
00326    return send2Man(Resp, path, xmsg, iovcnt+1);
00327 }
00328   
00329 /******************************************************************************/
00330 /*                               P r e p a r e                                */
00331 /******************************************************************************/
00332   
00333 int XrdCmsFinderRMT::Prepare(XrdOucErrInfo &Resp, XrdSfsPrep &pargs)
00334 {
00335    EPNAME("Prepare")
00336    static const int   xNum   = 16;
00337    static XrdSysMutex prepMutex;
00338 
00339    XrdCmsRRData       Data;
00340    XrdOucTList       *tp, *op;
00341    XrdCmsClientMan   *Manp = 0;
00342 
00343    int                iovcnt = 0, NoteLen, n;
00344    char               Prty[1032], *NoteNum = 0, *colocp = 0;
00345    char               Work[xNum*12];
00346    struct iovec       xmsg[xNum];
00347 
00348 // Prefill the RR data structure and iovec
00349 //
00350    Data.Ident = (char *)(XrdCmsClientMan::doDebug ? Resp.getErrUser() : "");
00351    Data.Reqid = pargs.reqid;
00352    Data.Request.streamid = 0;
00353    Data.Request.modifier = 0;
00354    xmsg[0].iov_base = (char *)&Data.Request;
00355    xmsg[0].iov_len  = sizeof(Data.Request);
00356 
00357 // Check for a cancel request
00358 //
00359    if (!(tp = pargs.paths))
00360       {Data.Request.rrCode = kYR_prepdel;
00361        if (!(iovcnt = Parser.Pack(kYR_prepdel, &xmsg[1], &xmsg[xNum],
00362                                  (char *)&Data, Work)))
00363           {Resp.setErrInfo(EINVAL, "Internal error processing file.");
00364            return -EINVAL;
00365           }
00366        if (!(Manp = SelectManager(Resp, 0))) return ConWait;
00367        if (Manp->Send((const struct iovec *)&xmsg, iovcnt+1)) return 0;
00368        DEBUG("Finder: Failed to send prepare cancel to " 
00369              <<Manp->Name() <<" reqid=" <<pargs.reqid);
00370        Resp.setErrInfo(RepDelay, "");
00371        return RepDelay;
00372       }
00373 
00374 // Set prepadd options
00375 //
00376    Data.Request.modifier =
00377                (pargs.opts & Prep_STAGE ? CmsPrepAddRequest::kYR_stage : 0)
00378              | (pargs.opts & Prep_WMODE ? CmsPrepAddRequest::kYR_write : 0)
00379              | (pargs.opts & Prep_FRESH ? CmsPrepAddRequest::kYR_fresh : 0);
00380 
00381 // Set coloc information if staging wanted and there are atleast two paths
00382 //
00383 
00384 // Set the prepadd mode
00385 //
00386    if (!pargs.notify || !(pargs.opts & Prep_SENDACK))
00387       {Data.Mode   = (char *)(pargs.opts & Prep_WMODE ? "wq" : "rq");
00388        Data.Notify = (char *)"*";
00389        NoteNum     = 0;
00390       } else {
00391        NoteLen      = strlen(pargs.notify);
00392        Data.Notify  = (char *)malloc(NoteLen+16);
00393        strcpy(Data.Notify, pargs.notify);
00394        NoteNum = Data.Notify+NoteLen; *NoteNum++ = '-';
00395        if (pargs.opts & Prep_SENDERR) 
00396                Data.Mode = (char *)(pargs.opts & Prep_WMODE ? "wn"  : "rn");
00397           else Data.Mode = (char *)(pargs.opts & Prep_WMODE ? "wnq" : "rnq");
00398       }
00399 
00400 // Set the priority (if co-locate, add in the co-locate path)
00401 //
00402    n = sprintf(Prty, "%d", (pargs.opts & Prep_PMASK));
00403    if (pargs.opts & Prep_STAGE && pargs.opts & Prep_COLOC
00404    &&  pargs.paths && pargs.paths->next) 
00405       {colocp = Prty + n;
00406        strlcpy(colocp+1, pargs.paths->text, sizeof(Prty)-n-1);
00407       }
00408    Data.Prty = Prty;
00409 
00410 // Distribute out paths to the various managers
00411 //
00412    Data.Request.rrCode = kYR_prepadd;
00413    op = pargs.oinfo;
00414    while(tp)
00415         {if (NoteNum) sprintf(NoteNum, "%d", tp->val);
00416          Data.Path = tp->text;
00417          if (op) {Data.Opaque = op->text; op = op->next;}
00418             else  Data.Opaque = 0;
00419          if (!(iovcnt = Parser.Pack(kYR_prepadd, &xmsg[1], &xmsg[xNum],
00420                                    (char *)&Data, Work))) break;
00421          if (!(Manp = SelectManager(Resp, tp->text))) break;
00422          DEBUG("Finder: Sending " <<Manp->Name() <<' ' <<Data.Reqid
00423                       <<' ' <<Data.Path);
00424          if (!Manp->Send((const struct iovec *)&xmsg, iovcnt+1)) break;
00425          if ((tp = tp->next))
00426             {prepMutex.Lock(); XrdSysTimer::Wait(PrepWait); prepMutex.UnLock();}
00427          if (colocp) {Data.Request.modifier |= CmsPrepAddRequest::kYR_coloc;
00428                       *colocp = ' '; colocp = 0;
00429                      }
00430         }
00431 
00432 // Check if all went well
00433 //
00434    if (NoteNum) free(Data.Notify);
00435    if (!tp) return 0;
00436 
00437 // Decode the error condition
00438 //
00439    if (!Manp) return ConWait;
00440 
00441    if (!iovcnt)
00442       {Say.Emsg("Finder", "Unable to send prepadd; too much data.");
00443        Resp.setErrInfo(EINVAL, "Internal error processing file.");
00444        return -EINVAL;
00445       }
00446 
00447    Resp.setErrInfo(RepDelay, "");
00448    DEBUG("Finder: Failed to send prepare to " <<(Manp ? Manp->Name() : "?")
00449                   <<" reqid=" <<pargs.reqid);
00450    return RepDelay;
00451 }
00452 
00453 /******************************************************************************/
00454 /*                         S e l e c t M a n a g e r                          */
00455 /******************************************************************************/
00456   
00457 XrdCmsClientMan *XrdCmsFinderRMT::SelectManager(XrdOucErrInfo &Resp, 
00458                                                 const char    *path)
00459 {
00460    XrdCmsClientMan *Womp, *Manp;
00461 
00462 // Make sure we are configured
00463 //
00464    if (!myManagers)
00465       {Say.Emsg("Finder", "SelectManager() called prior to Configure().");
00466        Resp.setErrInfo(ConWait, "");
00467        return (XrdCmsClientMan *)0;
00468       }
00469 
00470 // Get where to start
00471 //
00472    if (SMode != XrdCmsClientConfig::RoundRob || !path) Womp = Manp = myManagers;
00473       else Womp = Manp = myManTable[XrdOucReqID::Index(myManCount, path)];
00474 
00475 // Find the next active server
00476 //
00477    do {if (Manp->isActive()) return (Manp->Suspended() ? 0 : Manp);
00478       } while((Manp = Manp->nextManager()) != Womp);
00479 
00480 // All managers are dead
00481 //
00482    SelectManFail(Resp);
00483    return (XrdCmsClientMan *)0;
00484 }
00485   
00486 /******************************************************************************/
00487 /*                         S e l e c t M a n F a i l                          */
00488 /******************************************************************************/
00489   
00490 void XrdCmsFinderRMT::SelectManFail(XrdOucErrInfo &Resp)
00491 {
00492    EPNAME("SelectManFail")
00493    static time_t nextMsg = 0;
00494    time_t now;
00495 
00496 // All servers are dead, indicate so every minute
00497 //
00498    now = time(0);
00499    myData.Lock();
00500    if (nextMsg < now)
00501       {nextMsg = now + 60;
00502        myData.UnLock();
00503        Say.Emsg("Finder", "All managers are dysfunctional.");
00504       } else myData.UnLock();
00505    Resp.setErrInfo(ConWait, "");
00506    TRACE(Redirect, "user=" <<Resp.getErrUser() <<" No managers available; wait " <<ConWait);
00507 }
00508   
00509 /******************************************************************************/
00510 /*                              s e n d 2 M a n                               */
00511 /******************************************************************************/
00512   
00513 int XrdCmsFinderRMT::send2Man(XrdOucErrInfo &Resp, const char *path,
00514                               struct iovec  *xmsg, int         xnum)
00515 {
00516    EPNAME("send2Man")
00517    int              retc;
00518    XrdCmsClientMsg *mp;
00519    XrdCmsClientMan *Manp;
00520 
00521 // Select the right manager for this request
00522 //
00523    if (!(Manp = SelectManager(Resp, path)) || Manp->Suspended()) 
00524       return ConWait;
00525 
00526 // Allocate a message object. There is only a fixed number of these and if
00527 // all of them are in use, th client has to wait to prevent over-runs.
00528 //
00529    if (!(mp = XrdCmsClientMsg::Alloc(&Resp)))
00530       {Resp.setErrInfo(RepDelay, "");
00531        TRACE(Redirect, Resp.getErrUser() <<" no more msg objects; path=" <<path);
00532        return RepDelay;
00533       }
00534 
00535 // Insert the message number into the header
00536 //
00537    ((CmsRRHdr *)(xmsg[0].iov_base))->streamid = mp->ID();
00538    if (QTRACE(Redirect)) Resp.setErrInfo(0,path);
00539       else Resp.setErrInfo(0, "");
00540 
00541 // Send message and simply wait for the reply (msg object is locked via Alloc)
00542 //
00543    if (!Manp->Send(xmsg, xnum) || (mp->Wait4Reply(Manp->waitTime())))
00544       {mp->Recycle();
00545        retc = Manp->whatsUp(Resp.getErrUser(), path);
00546        Resp.setErrInfo(retc, "");
00547        return retc;
00548       }
00549 
00550 // A reply was received; process as appropriate
00551 //
00552    retc = mp->getResult();
00553    if (retc == -EINPROGRESS) retc = Manp->delayResp(Resp);
00554       else if (retc == -EAGAIN) retc = Resp.getErrInfo();
00555 
00556 // All done
00557 //
00558    mp->Recycle();
00559    return retc;
00560 }
00561 
00562 /******************************************************************************/
00563 /*                         S t a r t M a n a g e r s                          */
00564 /******************************************************************************/
00565   
00566 void *XrdCmsStartManager(void *carg)
00567       {XrdCmsClientMan *mp = (XrdCmsClientMan *)carg;
00568        return mp->Start();
00569       }
00570 
00571 void *XrdCmsStartResp(void *carg)
00572       {XrdCmsResp::Reply();
00573        return (void *)0;
00574       }
00575 
00576 int XrdCmsFinderRMT::StartManagers(XrdOucTList *myManList)
00577 {
00578    XrdOucTList *tp;
00579    XrdCmsClientMan *mp, *firstone = 0;
00580    int i = 0;
00581    pthread_t tid;
00582    char buff[128];
00583 
00584 // Clear manager table
00585 //
00586    memset((void *)myManTable, 0, sizeof(myManTable));
00587 
00588 // For each manager, start a thread to handle it
00589 //
00590    tp = myManList;
00591    while(tp && i < MaxMan)
00592         {mp = new XrdCmsClientMan(tp->text,tp->val,ConWait,RepNone,RepWait,RepDelay);
00593          myManTable[i] = mp;
00594          if (myManagers) mp->setNext(myManagers);
00595             else firstone = mp;
00596          myManagers = mp;
00597          if (XrdSysThread::Run(&tid,XrdCmsStartManager,(void *)mp,0,tp->text))
00598             Say.Emsg("Finder", errno, "start manager");
00599          tp = tp->next; i++;
00600         }
00601 
00602 // Check if we exceeded maximum manager count
00603 //
00604    if (tp) 
00605       while(tp)
00606            {Say.Emsg("Config warning: too many managers;",tp->text,"ignored.");
00607             tp = tp->next;
00608            }
00609 
00610 // Make this a circular chain
00611 //
00612    if (firstone) firstone->setNext(myManagers);
00613 
00614 // Indicate how many managers have been started
00615 //
00616    sprintf(buff, "%d manager(s) started.", i);
00617    Say.Say("Config ", buff);
00618    myManCount = i;
00619 
00620 // Now Start that many callback threads
00621 //
00622    while(i--)
00623         if (XrdSysThread::Run(&tid,XrdCmsStartResp,(void *)0,0,"async callback"))
00624             Say.Emsg("Finder", errno, "start callback manager");
00625 
00626 // All done
00627 //
00628    return 0;
00629 }
00630  
00631 /******************************************************************************/
00632 /*                                 S p a c e                                  */
00633 /******************************************************************************/
00634   
00635 int XrdCmsFinderRMT::Space(XrdOucErrInfo &Resp, const char *path)
00636 {
00637    static const int xNum   = 4;
00638 
00639    XrdCmsRRData   Data;
00640    int            iovcnt;
00641    char           Work[xNum*12];
00642    struct iovec   xmsg[xNum];
00643 
00644 // Fill out the RR data structure
00645 //
00646    Data.Ident   = (char *)(XrdCmsClientMan::doDebug ? Resp.getErrUser() : "");
00647    Data.Path    = (char *)path;
00648 
00649 // Pack the arguments
00650 //
00651    if (!(iovcnt = Parser.Pack(kYR_statfs, &xmsg[1], &xmsg[xNum],
00652                                   (char *)&Data, Work)))
00653       {Resp.setErrInfo(EINVAL, "Internal error processing file.");
00654        return -EINVAL;
00655       }
00656 
00657 // Insert the header into the stream
00658 //
00659    Data.Request.rrCode   = kYR_statfs;
00660    Data.Request.streamid = 0;
00661    Data.Request.modifier = 0;
00662    xmsg[0].iov_base      = (char *)&Data.Request;
00663    xmsg[0].iov_len       = sizeof(Data.Request);
00664 
00665 // Send the 2way message
00666 //
00667    return send2Man(Resp, path, xmsg, iovcnt+1);
00668 }
00669   
00670 /******************************************************************************/
00671 /*                         T a r g e t   F i n d e r                          */
00672 /******************************************************************************/
00673 /******************************************************************************/
00674 /*                           C o n s t r u c t o r                            */
00675 /******************************************************************************/
00676   
00677 XrdCmsFinderTRG::XrdCmsFinderTRG(XrdSysLogger *lp, int whoami, int port,
00678                                  XrdOss *theSS)
00679                : XrdCmsClient(XrdCmsClient::amTarget)
00680 {
00681    char buff [256];
00682    isRedir = whoami & IsRedir;
00683    isProxy = whoami & IsProxy;
00684    SS      = theSS;
00685    CMSPath = 0;
00686    CMSp    = new XrdOucStream(&Say);
00687    Active  = 0;
00688    myPort  = port;
00689    sprintf(buff, "login %c %d port %d\n",(isProxy ? 'P' : 'p'),
00690                  static_cast<int>(getpid()), port);
00691    Login = strdup(buff);
00692    Say.logger(lp);
00693 }
00694  
00695 /******************************************************************************/
00696 /*                            D e s t r u c t o r                             */
00697 /******************************************************************************/
00698 
00699 XrdCmsFinderTRG::~XrdCmsFinderTRG()
00700 {
00701   if (CMSp)  delete CMSp;
00702   if (Login) free(Login);
00703 }
00704   
00705 /******************************************************************************/
00706 /*                                 A d d e d                                  */
00707 /******************************************************************************/
00708   
00709 void XrdCmsFinderTRG::Added(const char *path, int Pend)
00710 {
00711    char *data[4];
00712    int   dlen[4];
00713 
00714 // Set up to notify the cluster that a file has been added
00715 //
00716    data[0] = (char *)"newfn ";   dlen[0] = 6;
00717    data[1] = (char *)path;       dlen[1] = strlen(path);
00718    if (Pend)
00719   {data[2] = (char *)" p\n";     dlen[2] = 3;}
00720       else
00721   {data[2] = (char *)"\n";       dlen[2] = 1;}
00722    data[3] = 0;                  dlen[3] = 0;
00723 
00724 // Now send the notification
00725 //
00726    myData.Lock();
00727    if (Active && CMSp->Put((const char **)data, (const int *)dlen))
00728       {CMSp->Close(); Active = 0;}
00729    myData.UnLock();
00730 }
00731 
00732 /******************************************************************************/
00733 /*                             C o n f i g u r e                              */
00734 /******************************************************************************/
00735 
00736 void *XrdCmsStartRsp(void *carg)
00737       {XrdCmsFinderTRG *mp = (XrdCmsFinderTRG *)carg;
00738        return mp->Start();
00739       }
00740   
00741 int XrdCmsFinderTRG::Configure(char *cfn)
00742 {
00743    XrdCmsClientConfig             config;
00744    XrdCmsClientConfig::configWhat What;
00745 
00746 // Establish what we will be configuring
00747 //
00748    What = (isRedir ? XrdCmsClientConfig::configSuper 
00749                    : XrdCmsClientConfig::configServer);
00750 
00751 // Set the error dest and simply call the configration object and if
00752 // successful, run the Admin thread
00753 //
00754    if (config.Configure(cfn, What, XrdCmsClientConfig::configNorm)) return 0;
00755    return RunAdmin(config.CMSPath);
00756 }
00757   
00758 /******************************************************************************/
00759 /*                               R e m o v e d                                */
00760 /******************************************************************************/
00761   
00762 void XrdCmsFinderTRG::Removed(const char *path)
00763 {
00764    char *data[4];
00765    int   dlen[4];
00766 
00767 // Set up to notify the cluster that a file has been removed
00768 //
00769    data[0] = (char *)"rmdid ";   dlen[0] = 6;
00770    data[1] = (char *)path;       dlen[1] = strlen(path);
00771    data[2] = (char *)"\n";       dlen[2] = 1;
00772    data[3] = 0;                  dlen[3] = 0;
00773 
00774 // Now send the notification
00775 //
00776    myData.Lock();
00777    if (Active && CMSp->Put((const char **)data, (const int *)dlen))
00778       {CMSp->Close(); Active = 0;}
00779    myData.UnLock();
00780 }
00781 
00782 /******************************************************************************/
00783 /*                              R u n A d m i n                               */
00784 /******************************************************************************/
00785   
00786 int XrdCmsFinderTRG::RunAdmin(char *Path)
00787 {
00788    pthread_t tid;
00789 
00790 // Make sure we have a path to the cmsd
00791 //
00792    if (!(CMSPath = Path))
00793       {Say.Emsg("Config", "Unable to determine cms admin path"); return 0;}
00794 
00795 // Start a thread to connect with the local cmsd
00796 //
00797    if (XrdSysThread::Run(&tid, XrdCmsStartRsp, (void *)this, 0, "cms i/f"))
00798       {Say.Emsg("Config", errno, "start cmsd interface"); return 0;}
00799 
00800    return 1;
00801 }
00802 
00803 /******************************************************************************/
00804 /*                                 S t a r t                                  */
00805 /******************************************************************************/
00806   
00807 void *XrdCmsFinderTRG::Start()
00808 {
00809    XrdCmsRRData Data;
00810    int retc;
00811 
00812 // First step is to connect to the local cmsd. We also establish a binary
00813 // read stream (old olbd's never used it) to get requests that can only be
00814 // executed by the xrootd (e.g., rm and mv).
00815 //
00816    while(1)
00817         {do {Hookup();
00818 
00819              // Login to cmsd
00820              //
00821              myData.Lock();
00822              retc = CMSp->Put(Login);
00823              myData.UnLock();
00824 
00825              // Get the FD for this connection
00826              //
00827              Data.Routing = CMSp->FDNum();
00828 
00829              // Put up a read to process local requests. Sould the cmsd die,
00830              // we will notice and try to reconnect.
00831              //
00832              while(recv(Data.Routing, &Data.Request, sizeof(Data.Request),
00833                         MSG_WAITALL) > 0 && Process(Data)) {}
00834              break;
00835             } while(1);
00836 
00837          // The cmsd went away
00838          //
00839          myData.Lock();
00840          CMSp->Close();
00841          Active = 0;
00842          myData.UnLock();
00843          Say.Emsg("Finder", "Lost contact with cmsd via", CMSPath);
00844          XrdSysTimer::Wait(10*1000);
00845         }
00846 
00847 // We should never get here
00848 //
00849    return (void *)0;
00850 }
00851 
00852 /******************************************************************************/
00853 /*                       P r i v a t e   M e t h o d s                        */
00854 /******************************************************************************/
00855 /******************************************************************************/
00856 /*                                H o o k u p                                 */
00857 /******************************************************************************/
00858   
00859 void XrdCmsFinderTRG::Hookup()
00860 {
00861    struct stat buf;
00862    XrdNetSocket Sock(&Say);
00863    int opts = 0, tries = 6;
00864 
00865 // Wait for the cmsd path to be created
00866 //
00867    while(stat(CMSPath, &buf))
00868         {if (!tries--)
00869             {Say.Emsg("Finder", "Waiting for cms path", CMSPath); tries=6;}
00870          XrdSysTimer::Wait(10*1000);
00871         }
00872 
00873 // We can now try to connect
00874 //
00875    tries = 0;
00876    while(Sock.Open(CMSPath, -1, opts) < 0)
00877         {if (!tries--)
00878             {opts = XRDNET_NOEMSG;
00879              tries = 6;
00880             } else if (!tries) opts = 0;
00881          XrdSysTimer::Wait(10*1000);
00882         };
00883 
00884 // Transfer the socket FD to a stream
00885 //
00886    myData.Lock();
00887    Active = 1;
00888    CMSp->Attach(Sock.Detach());
00889    myData.UnLock();
00890 
00891 // Tell the world
00892 //
00893    Say.Emsg("Finder", "Connected to cmsd via", CMSPath);
00894 }
00895 
00896 /******************************************************************************/
00897 /*                               P r o c e s s                                */
00898 /******************************************************************************/
00899   
00900 int XrdCmsFinderTRG::Process(XrdCmsRRData &Data)
00901 {
00902    EPNAME("Process")
00903    static const int maxReqSize = 16384;
00904    static       int Wmsg = 255;
00905    const char *myArgs, *myArgt, *Act;
00906    char buff[16];
00907    int rc;
00908 
00909 // Decode the length and get the rest of the data
00910 //
00911    Data.Dlen = static_cast<int>(ntohs(Data.Request.datalen));
00912    if (!(Data.Dlen)) {myArgs = myArgt = 0;}
00913       else {if (Data.Dlen > maxReqSize)
00914                {Say.Emsg("Finder","Request args too long from local cmsd");
00915                 return 0;
00916                }
00917             if ((!Data.Buff || Data.Blen < Data.Dlen)
00918             &&  !Data.getBuff(Data.Dlen))
00919                {Say.Emsg("Finder", "No buffers to serve local cmsd");
00920                 return 0;
00921                }
00922             if (recv(Data.Routing,Data.Buff,Data.Dlen,MSG_WAITALL) != Data.Dlen)
00923                 return 0;
00924             myArgs = Data.Buff; myArgt = Data.Buff + Data.Dlen;
00925            }
00926 
00927 // Process the request as needed. We ignore opaque information for now.
00928 // If the request is not valid is could be that we lost sync on the connection.
00929 // The only way to recover is to tear it down and start over.
00930 //
00931    switch(Data.Request.rrCode)
00932          {case kYR_mv:    Act = "mv";                             break;
00933           case kYR_rm:    Act = "rm";    Data.Path2 = (char *)""; break;
00934           case kYR_rmdir: Act = "rmdir"; Data.Path2 = (char *)""; break;
00935           default: sprintf(buff, "%d", Data.Request.rrCode);
00936                    Say.Emsg("Finder","Local cmsd sent an invalid request -",buff);
00937                    return 0;
00938          }
00939 
00940 // Parse the arguments
00941 //
00942    if (!myArgs || !Parser.Parse(int(Data.Request.rrCode),myArgs,myArgt,&Data))
00943       {Say.Emsg("Finder", "Local cmsd sent a badly formed",Act,"request");
00944        return 1;
00945       }
00946    DEBUG("cmsd requested " <<Act <<" " <<Data.Path <<' ' <<Data.Path2);
00947 
00948 // If we have no storage system then issue a warning but otherwise
00949 // ignore this operation (this may happen in proxy mode).
00950 //
00951    if (SS == 0)
00952       {Wmsg++;
00953        if (!(Wmsg & 255)) Say.Emsg("Finder", "Local cmsd request",Act,
00954                                    "ignored; no storage system provided.");
00955        return 1;
00956       }
00957 
00958 // Perform the request
00959 //
00960    switch(Data.Request.rrCode)
00961          {case kYR_mv:    rc = SS->Rename(Data.Path, Data.Path2);   break;
00962           case kYR_rm:    rc = SS->Unlink(Data.Path);               break;
00963           case kYR_rmdir: rc = SS->Remdir(Data.Path);               break;
00964           default:        rc = 0;                                   break;
00965          }
00966    if (rc) Say.Emsg("Finder", rc, Act, Data.Path);
00967 
00968 // All Done
00969 //
00970    return 1;
00971 }

Generated on Tue Jul 5 14:46:24 2011 for ROOT_528-00b_version by  doxygen 1.5.1