XrdCmsPrepare.cc

Go to the documentation of this file.
00001 /******************************************************************************/
00002 /*                                                                            */
00003 /*                      X r d C m s P r e p a r e . c c                       */
00004 /*                                                                            */
00005 /* (c) 2007 by the Board of Trustees of the Leland Stanford, Jr., University  */
00006 /*                            All Rights Reserved                             */
00007 /*   Produced by Andrew Hanushevsky for Stanford University under contract    */
00008 /*              DE-AC02-76-SFO0515 with the Department of Energy              */
00009 /******************************************************************************/
00010 
00011 //         $Id: XrdCmsPrepare.cc 34990 2010-08-25 10:31:23Z ganis $
00012 
00013 // Original Version: 1.11 2007/08/08 19:18:47 abh
00014 
00015 const char *XrdCmsPrepareCVSID = "$Id: XrdCmsPrepare.cc 34990 2010-08-25 10:31:23Z ganis $";
00016   
00017 #include <fcntl.h>
00018 #include <stdlib.h>
00019 #include <unistd.h>
00020 #include <sys/types.h>
00021 #include <sys/stat.h>
00022 
00023 #include "XrdCms/XrdCmsConfig.hh"
00024 #include "XrdCms/XrdCmsPrepare.hh"
00025 #include "XrdCms/XrdCmsTrace.hh"
00026 #include "XrdFrm/XrdFrmProxy.hh"
00027 #include "XrdNet/XrdNetMsg.hh"
00028 #include "XrdOss/XrdOss.hh"
00029 #include "XrdOuc/XrdOucEnv.hh"
00030 #include "XrdOuc/XrdOucMsubs.hh"
00031 #include "XrdOuc/XrdOucTList.hh"
00032 #include "XrdSys/XrdSysError.hh"
00033 
00034 using namespace XrdCms;
00035 
00036 /******************************************************************************/
00037 /*                        S t a t i c   O b j e c t s                         */
00038 /******************************************************************************/
00039   
00040 XrdCmsPrepare   XrdCms::PrepQ;
00041 
00042 /******************************************************************************/
00043 /*          G l o b a l s   &   E x t e r n a l   F u n c t i o n s           */
00044 /******************************************************************************/
00045 
00046 // This function is applied to all prepare queue entries. It checks if the file
00047 // in online and if so, returns a -1 to delete the entry from the queue. O/W
00048 // it returns a zero which keeps the entry in the queue. The key is the LFN.
00049 //
00050 int XrdCmsScrubScan(const char *key, char *cip, void *xargp)
00051 {
00052    struct stat buf;
00053 
00054 // Use oss interface to determine whether the file exists or not
00055 //
00056    return (Config.ossFS->Stat(key, &buf, XRDOSS_resonly) ? 0 : -1);
00057 }
00058 
00059 /******************************************************************************/
00060 /*                           C o n s t r u c t o r                            */
00061 /******************************************************************************/
00062   
00063 XrdCmsPrepare::XrdCmsPrepare() : XrdJob("Prep cache scrubber"),
00064                                  prepSched(&Say)
00065 {prepif   = 0;
00066  preppid  = 0;
00067  resetcnt = scrub2rst = 3;
00068  scrubtime= 20*60;
00069  NumFiles = 0;
00070  lastemsg = time(0);
00071  Relay    = new XrdNetMsg(&Say);
00072  PrepFrm  = 0;
00073  prepOK   = 0;
00074 }
00075 
00076 /******************************************************************************/
00077 /*                                   A d d                                    */
00078 /******************************************************************************/
00079   
00080 int XrdCmsPrepare::Add(XrdCmsPrepArgs &pargs)
00081 {
00082    char *pdata[XrdOucMsubs::maxElem+2], prtybuff[8], *pP=prtybuff;
00083    int rc, pdlen[XrdOucMsubs::maxElem + 2];
00084 
00085 // Check if we are using the built-in mechanism
00086 //
00087    if (PrepFrm)
00088       {rc = PrepFrm->Add('+',pargs.path,  pargs.opaque,pargs.Ident,pargs.reqid,
00089                              pargs.notify,pargs.mode,atoi(pargs.prty));
00090        if (rc) Say.Emsg("Add", rc, "prepare", pargs.path);
00091           else {PTMutex.Lock();
00092                 if (!PTable.Add(pargs.path, 0, 0, Hash_data_is_key)) NumFiles++;
00093                 PTMutex.UnLock();
00094                }
00095        return rc == 0;
00096       }
00097 
00098 // Restart the scheduler if need be
00099 //
00100    PTMutex.Lock();
00101    if (!prepif || !prepSched.isAlive())
00102       {Say.Emsg("Add","No prepare manager; prepare",pargs.reqid,"ignored.");
00103        PTMutex.UnLock();
00104        return 0;
00105       }
00106 
00107 // Write out the header line
00108 //
00109    if (!prepMsg)
00110       {*pP++ = pargs.prty[0]; *pP = '\0';
00111        pdata[0] = (char *)"+ ";               pdlen[0] = 2;
00112        pdata[1] = pargs.reqid;                pdlen[1] = strlen(pargs.reqid);
00113        pdata[2] = (char *)" ";                pdlen[2] = 1;
00114        pdata[3] = pargs.notify;               pdlen[3] = strlen(pargs.notify);
00115        pdata[4] = (char *)" ";                pdlen[4] = 1;
00116        pdata[5] = prtybuff;                   pdlen[5] = strlen(prtybuff);
00117        pdata[6] = (char *)" ";                pdlen[6] = 1;
00118        pdata[7] = pargs.mode;                 pdlen[7] = strlen(pargs.mode);
00119        pdata[8] = (char *)" ";                pdlen[8] = 1;
00120        pdata[9] = pargs.path;                 pdlen[9] = strlen(pargs.path);
00121        pdata[10] = (char *)"\n";              pdlen[10] = 1;
00122        pdata[11]= 0;                          pdlen[11]= 0;
00123       if (!(rc = prepSched.Put((const char **)pdata, (const int *)pdlen)))
00124          if (!PTable.Add(pargs.path, 0, 0, Hash_data_is_key)) NumFiles++;
00125       } else {
00126        int Oflag = (index(pargs.mode, (int)'w') ? O_RDWR : 0);
00127        mode_t Prty = atoi(pargs.prty);
00128        XrdOucEnv Env(pargs.opaque);
00129        XrdOucMsubsInfo Info(pargs.Ident, &Env,  N2N,   pargs.path,
00130                             pargs.notify, Prty, Oflag, pargs.mode, pargs.reqid);
00131        int k = prepMsg->Subs(Info, pdata, pdlen);
00132        pdata[k]   = (char *)"\n"; pdlen[k++] = 1;
00133        pdata[k]   = 0;            pdlen[k]   = 0;
00134        if (!(rc = prepSched.Put((const char **)pdata, (const int *)pdlen)))
00135           if (!PTable.Add(pargs.path, 0, 0, Hash_data_is_key)) NumFiles++;
00136       }
00137 
00138 // All done
00139 //
00140    PTMutex.UnLock();
00141    return rc == 0;
00142 }
00143 
00144 /******************************************************************************/
00145 /*                                   D e l                                    */
00146 /******************************************************************************/
00147   
00148 int XrdCmsPrepare::Del(char *reqid)
00149 {
00150    char *pdata[4];
00151    int rc, pdlen[4];
00152 
00153 // Use our built-in mechanism if so wanted
00154 //
00155    if (PrepFrm)
00156       {if ((rc = PrepFrm->Del('-', reqid)))
00157           Say.Emsg("Del", rc, "unprepare", reqid);
00158        return rc == 0;
00159       }
00160 
00161 // Restart the scheduler if need be
00162 //
00163    PTMutex.Lock();
00164    if (!prepif || !prepSched.isAlive())
00165       {Say.Emsg("Del","No prepare manager; unprepare",reqid,"ignored.");
00166        PTMutex.UnLock();
00167        return 0;
00168       }
00169 
00170 // Write out the delete request
00171 //
00172    pdata[0] = (char *)"- ";
00173    pdlen[0] = 2;
00174    pdata[1] = reqid;
00175    pdlen[1] = strlen(reqid);
00176    pdata[2] = (char *)"\n";
00177    pdlen[2] = 1;
00178    pdata[3] = (char *)0;
00179    pdlen[3] = 0;
00180    rc = prepSched.Put((const char **)pdata, (const int *)pdlen);
00181    PTMutex.UnLock();
00182    return rc == 0;
00183 }
00184  
00185 /******************************************************************************/
00186 /*                                  D o I t                                   */
00187 /******************************************************************************/
00188   
00189 void XrdCmsPrepare::DoIt()
00190 {
00191 // Simply scrub the cache
00192 //
00193    Scrub();
00194    Sched->Schedule((XrdJob *)this,scrubtime+time(0));
00195 }
00196 
00197 /******************************************************************************/
00198 /*                                E x i s t s                                 */
00199 /******************************************************************************/
00200   
00201 int  XrdCmsPrepare::Exists(char *path)
00202 {
00203    int Found;
00204 
00205 // Lock the hash table
00206 //
00207    PTMutex.Lock();
00208 
00209 // Look up the entry
00210 //
00211    Found = (NumFiles ? PTable.Find(path) != 0 : 0);
00212 
00213 // All done
00214 //
00215    PTMutex.UnLock();
00216    return Found;
00217 }
00218  
00219 /******************************************************************************/
00220 /*                                  G o n e                                   */
00221 /******************************************************************************/
00222   
00223 void XrdCmsPrepare::Gone(char *path)
00224 {
00225 
00226 // Lock the hash table
00227 //
00228    PTMutex.Lock();
00229 
00230 // Delete the entry
00231 //
00232    if (NumFiles > 0 && PTable.Del(path) == 0) NumFiles--;
00233 
00234 // All done
00235 //
00236    PTMutex.UnLock();
00237 }
00238 
00239 /******************************************************************************/
00240 /*                                I n f o r m                                 */
00241 /******************************************************************************/
00242   
00243 void XrdCmsPrepare::Inform(const char *cmd, XrdCmsPrepArgs *pargs)
00244 {
00245    EPNAME("Inform")
00246    struct iovec Msg[8];
00247    char *mdest, *minfo;
00248 
00249 // See if requestor wants a response
00250 //
00251    if (!index(pargs->mode, (int)'n')
00252    ||  strncmp("udp://", pargs->notify, 6)
00253    ||  !Relay)
00254       {DEBUG(pargs->Ident <<' ' <<cmd <<' ' <<pargs->reqid <<" not sent to "
00255                           <<pargs->notify);
00256        return;
00257       }
00258 
00259 // Extract out destination and argument
00260 //
00261    mdest = pargs->notify+6;
00262    if ((minfo = index(mdest, (int)'/')))
00263       {*minfo = '\0'; minfo++;}
00264    if (!minfo || !*minfo) minfo = (char *)"*";
00265    DEBUG("Sending " <<mdest <<": " <<cmd <<' '<<pargs->reqid <<' ' <<minfo);
00266 
00267 // Create message to be sent
00268 //
00269    Msg[0].iov_base = (char *)cmd;  Msg[0].iov_len  = strlen(cmd);
00270    Msg[1].iov_base = (char *)" ";  Msg[1].iov_len  = 1;
00271    Msg[2].iov_base = pargs->reqid; Msg[2].iov_len  = strlen(pargs->reqid);
00272    Msg[3].iov_base = (char *)" ";  Msg[3].iov_len  = 1;
00273    Msg[4].iov_base = minfo;        Msg[4].iov_len  = strlen(minfo);
00274    Msg[5].iov_base = (char *)" ";  Msg[5].iov_len  = 1;
00275    Msg[6].iov_base = pargs->path;  Msg[6].iov_len  = (pargs->pathlen)-1;
00276    Msg[7].iov_base = (char *)"\n"; Msg[7].iov_len  = 1;
00277 
00278 // Send the message and return
00279 //
00280    Relay->Send(Msg, 8, mdest);
00281 }
00282 
00283 /******************************************************************************/
00284 /*                               P r e p a r e                                */
00285 /******************************************************************************/
00286 
00287 void XrdCmsPrepare::Prepare(XrdCmsPrepArgs *pargs)
00288 {
00289    EPNAME("Prepare");
00290    int rc;
00291 
00292 // Check if this file is not online, prepare it
00293 //
00294    if (!(rc = isOnline(pargs->path)))
00295       {DEBUG("Preparing " <<pargs->reqid <<' ' <<pargs->notify <<' ' 
00296                           <<pargs->prty <<' ' <<pargs->mode <<' ' <<pargs->path);
00297        if (!Config.DiskSS) Say.Emsg("Prepare","staging disallowed; ignoring prep",
00298                                     pargs->Ident, pargs->reqid);
00299           else Add(*pargs);
00300        return;
00301       }
00302 
00303 // If the file is really online, inform the requestor
00304 //
00305    if (rc > 0) Inform("avail", pargs);
00306 }
00307 
00308 /******************************************************************************/
00309 /*                                 R e s e t                                  */
00310 /******************************************************************************/
00311 
00312 void XrdCmsPrepare::Reset(const char *iName, const char *aPath, int aMode)
00313 {
00314    EPNAME("Reset");
00315    char baseAP[1024], *Slash;
00316 
00317 // This is a call from the configurator. No need to do anything if we have
00318 // no interface to initialize.
00319 //
00320    if (!prepif) return;
00321 
00322 // If this is a built-in mechanism, then allocate the prepare interface
00323 // and initialize it. This is a one-time thing and it better work right away.
00324 // In any case, do a standard reset.
00325 //
00326    if (!*prepif)
00327       {PrepFrm = new XrdFrmProxy(Say.logger(), iName);
00328        DEBUG("Initializing internal FRM prepare interface.");
00329        strcpy(baseAP, aPath); baseAP[strlen(baseAP)-1] = '\0';
00330        if ((Slash = rindex(baseAP, '/'))) *Slash = '\0';
00331        if (!(prepOK = PrepFrm->Init(XrdFrmProxy::opStg, baseAP, aMode)))
00332           {Say.Emsg("Reset", "Built-in prepare init failed; prepare disabled.");
00333            return;
00334           }
00335       }
00336 
00337 // Reset the interface and schedule a scrub
00338 //
00339    Reset();
00340    if (scrubtime) Sched->Schedule((XrdJob *)this,scrubtime+time(0));
00341 
00342 }
00343 
00344 /******************************************************************************/
00345 /*                              s e t P a r m s                               */
00346 /******************************************************************************/
00347   
00348 int XrdCmsPrepare::setParms(int rcnt, int stime, int deco)
00349 {if (rcnt  > 0) resetcnt  = scrub2rst = rcnt;
00350  if (stime > 0) scrubtime = stime;
00351  doEcho = deco;
00352  return 0;
00353 }
00354 
00355 int XrdCmsPrepare::setParms(const char *ifpgm, char *ifmsg)
00356 {if (ifpgm)
00357     {const char *Slash = rindex(ifpgm, '/');
00358      if (prepif) free(prepif);
00359      if (Slash && !strcmp(Slash+1, "frm_xfragent")) ifpgm = "";
00360      prepif = strdup(ifpgm);
00361     }
00362  if (ifmsg)
00363     {if (prepMsg) delete prepMsg;
00364      prepMsg = new XrdOucMsubs(&Say);
00365      if (!(prepMsg->Parse("prepmsg", ifmsg)))
00366         {delete prepMsg; prepMsg = 0; return 1;}
00367     }
00368  return 0;
00369 }
00370  
00371 /******************************************************************************/
00372 /*                       P r i v a t e   M e t h o d s                        */
00373 /******************************************************************************/
00374 /******************************************************************************/
00375 /*                              i s O n l i n e                               */
00376 /******************************************************************************/
00377   
00378 int XrdCmsPrepare::isOnline(char *path)
00379 {
00380    static const int Sopts = XRDOSS_resonly | XRDOSS_updtatm;
00381    struct stat buf;
00382 
00383 // Issue the stat() via oss plugin. If it indicates the file is not there is
00384 // still might be logically here because it's in a staging queue.
00385 //
00386    if (Config.ossFS->Stat(path, &buf, Sopts))
00387       {if (Config.DiskSS && Exists(path)) return -1;
00388           else return 0;
00389       }
00390    return 1;
00391 }
00392 
00393 /******************************************************************************/
00394 /*                                 R e s e t                                  */
00395 /******************************************************************************/
00396   
00397 void XrdCmsPrepare::Reset()  // Must be called with PTMutex locked!
00398 {
00399    char *lp,  *pdata[] = {(char *)"?\n", 0};
00400    int         pdlen[] = {2, 0};
00401 
00402 // Hanlde via built-in mechanism
00403 //
00404    if (PrepFrm)
00405       {XrdFrmProxy::Queues State(XrdFrmProxy::opStg);
00406        char Buff[1024];
00407        if (prepOK)
00408           {PTable.Purge(); NumFiles = 0;
00409            while(PrepFrm->List(State, Buff, sizeof(Buff)))
00410                 {PTable.Add(Buff, 0, 0, Hash_data_is_key); NumFiles++;
00411                  if (doEcho) Say.Emsg("Reset","Prepare pending for",Buff);
00412                 }
00413           }
00414        return;
00415       }
00416 
00417 // Check if we really have an interface to reset
00418 //
00419    if (!prepif)
00420       {Say.Emsg("Reset", "Prepare program not specified; prepare disabled.");
00421        return;
00422       }
00423 
00424 // Do it the slow external way
00425 //
00426    if (!prepSched.isAlive() && !startIF()) return;
00427    if (prepSched.Put((const char **)pdata, (const int *)pdlen))
00428       {Say.Emsg("Prepare", prepSched.LastError(), "write to", prepif);
00429        prepSched.Drain(); prepOK = 0;
00430       }
00431       else {PTable.Purge(); NumFiles = 0;
00432             while((lp = prepSched.GetLine()) && *lp)
00433                  {PTable.Add(lp, 0, 0, Hash_data_is_key); NumFiles++;
00434                   if (doEcho) Say.Emsg("Reset","Prepare pending for",lp);
00435                  }
00436            }
00437 }
00438   
00439 /******************************************************************************/
00440 /*                                 S c r u b                                  */
00441 /******************************************************************************/
00442   
00443 void XrdCmsPrepare::Scrub()
00444 {
00445      PTMutex.Lock();
00446      if (scrub2rst <= 0)
00447         {Reset();
00448          scrub2rst = resetcnt;
00449         }
00450         else {PTable.Apply(XrdCmsScrubScan, (void *)0);
00451               scrub2rst--;
00452              }
00453      if (!PrepFrm && !prepSched.isAlive()) startIF();
00454      PTMutex.UnLock();
00455 }
00456 
00457 /******************************************************************************/
00458 /*                               s t a r t I F                                */
00459 /******************************************************************************/
00460   
00461 int XrdCmsPrepare::startIF()  // Must be called with PTMutex locked!
00462 {   
00463    EPNAME("startIF")
00464 
00465 // If we are using a local interface then there is nothing to start.
00466 //
00467    if (PrepFrm) return prepOK;
00468 
00469 // Complain if there is no external prepare program
00470 //
00471    if (!prepif)
00472       {Say.Emsg("startIF","Prepare program not specified; prepare disabled.");
00473        return (prepOK = 0);
00474       }
00475 
00476 // Setup the external program
00477 //
00478    DEBUG("Prepare: Starting " <<prepif);
00479    if (prepSched.Exec(prepif, 1))
00480       {time_t eNow = time(0);
00481        prepOK = 0;
00482        if ((eNow - lastemsg) >= 60)
00483           {lastemsg = eNow;
00484            Say.Emsg("Prepare", prepSched.LastError(), "start", prepif);
00485           }
00486       } else prepOK = 1;
00487 
00488 // All done
00489 //
00490    return prepOK;
00491 }

Generated on Tue Jul 5 14:46:31 2011 for ROOT_528-00b_version by  doxygen 1.5.1