XrdFrmTransfer.cc

Go to the documentation of this file.
00001 /******************************************************************************/
00002 /*                                                                            */
00003 /*                     X r d F r m T r a n s f e r . c c                      */
00004 /*                                                                            */
00005 /* (c) 2010 by the Board of Trustees of the Leland Stanford, Jr., University  */
00006 /*                            All Rights Reserved                             */
00007 /*   Produced by Andrew Hanushevsky for Stanford University under contract    */
00008 /*              DE-AC02-76-SFO0515 with the Department of Energy              */
00009 /******************************************************************************/
00010 
00011 //          $Id: XrdFrmTransfer.cc 35287 2010-09-14 21:19:35Z ganis $
00012 
00013 const char *XrdFrmTransferCVSID = "$Id: XrdFrmTransfer.cc 35287 2010-09-14 21:19:35Z ganis $";
00014 
00015 #include <string.h>
00016 #include <strings.h>
00017 #include <stdio.h>
00018 #include <fcntl.h>
00019 #include <unistd.h>
00020 #include <utime.h>
00021 #include <sys/param.h>
00022 #include <sys/types.h>
00023 #include <sys/socket.h>
00024 #include <sys/stat.h>
00025 
00026 #include "XrdFrm/XrdFrmCID.hh"
00027 #include "XrdFrm/XrdFrmConfig.hh"
00028 #include "XrdFrm/XrdFrmMonitor.hh"
00029 #include "XrdFrm/XrdFrmReqFile.hh"
00030 #include "XrdFrm/XrdFrmRequest.hh"
00031 #include "XrdFrm/XrdFrmTrace.hh"
00032 #include "XrdFrm/XrdFrmTransfer.hh"
00033 #include "XrdFrm/XrdFrmXfrJob.hh"
00034 #include "XrdFrm/XrdFrmXfrQueue.hh"
00035 #include "XrdNet/XrdNetCmsNotify.hh"
00036 #include "XrdOss/XrdOss.hh"
00037 #include "XrdOss/XrdOssLock.hh"
00038 #include "XrdOuc/XrdOucEnv.hh"
00039 #include "XrdOuc/XrdOucMsubs.hh"
00040 #include "XrdOuc/XrdOucProg.hh"
00041 #include "XrdOuc/XrdOucSxeq.hh"
00042 #include "XrdSys/XrdSysError.hh"
00043 #include "XrdSys/XrdSysPlatform.hh"
00044 
00045 using namespace XrdFrm;
00046 
00047 /******************************************************************************/
00048 /*                         L o c a l   C l a s s e s                          */
00049 /******************************************************************************/
00050 
00051 struct XrdFrmTranArg
00052 {
00053 XrdOucEnv   *theEnv;
00054 XrdOucProg  *theCmd;
00055 XrdOucMsubs *theVec;
00056 char        *theSrc;
00057 char        *theDst;
00058 char        *theINS;
00059 char         theMDP[8];
00060 
00061             XrdFrmTranArg(XrdOucEnv *Env)
00062                          : theEnv(Env), theSrc(0), theDst(0), theINS(0)
00063                            {theMDP[0] = '0'; theMDP[1] = 0;}
00064            ~XrdFrmTranArg() {}
00065 };
00066 
00067 struct XrdFrmTranChk
00068 {      XrdOucSxeq            *lkfP;
00069        struct stat           *Stat;
00070 
00071        XrdFrmTranChk(struct stat *sP) : lkfP(0), Stat(sP) {}
00072       ~XrdFrmTranChk() {if (lkfP) delete lkfP;}
00073 };
00074   
00075 /******************************************************************************/
00076 /*                               S t a t i c s                                */
00077 /******************************************************************************/
00078   
00079 XrdSysMutex               XrdFrmTransfer::pMutex;
00080 XrdOucHash<char>          XrdFrmTransfer::pTab;
00081 
00082 /******************************************************************************/
00083 /*                           C o n s t r u c t o r                            */
00084 /******************************************************************************/
00085   
00086 XrdFrmTransfer::XrdFrmTransfer()
00087 {
00088    int i;
00089 
00090 // Construct program objects
00091 //
00092    for (i = 0; i < 4; i++)
00093        xfrCmd[i] = (Config.xfrCmd[i].theVec ? new XrdOucProg(&Say) : 0);
00094 }
00095 
00096 /******************************************************************************/
00097 /* Public:                       c h e c k F F                                */
00098 /******************************************************************************/
00099   
00100 const char *XrdFrmTransfer::checkFF(const char *Path)
00101 {
00102    EPNAME("checkFF");
00103    struct stat buf;
00104 
00105 // Check for a fail file
00106 //
00107    if (!stat(Path, &buf))
00108       {if (buf.st_ctime+Config.FailHold >= time(0))
00109           return "request previously failed";
00110        if (Config.Test) {DEBUG("would have removed '" <<Path <<"'");}
00111           else {Config.ossFS->Unlink(Path, XRDOSS_isPFN);
00112                 DEBUG("removed '" <<Path <<"'");
00113                }
00114       }
00115 
00116 // Return all is well
00117 //
00118    return 0;
00119 }
00120 
00121 /******************************************************************************/
00122 /*                                 F e t c h                                  */
00123 /******************************************************************************/
00124   
00125 const char *XrdFrmTransfer::Fetch()
00126 {
00127    EPNAME("Fetch");
00128    static const mode_t fMode = S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH;
00129    static const int crOpts = (O_CREAT|O_TRUNC)<<8|XRDOSS_mkpath;
00130 
00131    XrdOucEnv myEnv(xfrP->reqData.Opaque?xfrP->reqData.LFN+xfrP->reqData.Opaque:0);
00132    XrdFrmTranArg cmdArg(&myEnv);
00133    struct stat pfnStat;
00134    time_t xfrET;
00135    const char *eTxt;
00136    char lfnpath[MAXPATHLEN+8], *Lfn, Rfn[MAXPATHLEN+256], *theSrc;
00137    int iXfr, lfnEnd, rc, isURL = 0;
00138    long long fSize = 0;
00139 
00140 // The remote source is either the url-lfn or a translated lfn
00141 //
00142    if ((isURL = xfrP->reqData.LFO)) theSrc = xfrP->reqData.LFN;
00143       else {if (!Config.RemotePath(xfrP->reqData.LFN, Rfn, sizeof(Rfn)))
00144                 return "lfn2rfn failed";
00145             theSrc = Rfn;
00146             isURL = (*Rfn != '/');
00147            }
00148 
00149 // Check if we can actually handle this transfer
00150 //
00151    if (isURL)
00152       {if (xfrCmd[2]) iXfr = 2;
00153           else return "url copies not configured";
00154       } else {
00155        if (xfrCmd[0]) iXfr = 0;
00156           else return "non-url copies not configured";
00157       }
00158 
00159 // Check for a fail file
00160 //
00161    if ((eTxt = ffCheck())) return eTxt;
00162 
00163 // Check if the file exists
00164 //
00165    if (!stat(xfrP->PFN, &pfnStat))
00166       {DEBUG(xfrP->PFN <<" exists; not fetched.");
00167        return 0;
00168       }
00169 
00170 // Construct the file name to which to we originally transfer the data. This is
00171 // the lfn if we do not pre-allocate files and "lfn.anew" otherwise.
00172 //
00173    Lfn = (xfrP->reqData.LFN)+xfrP->reqData.LFO;
00174    lfnEnd = strlen(Lfn);
00175    strlcpy(lfnpath, Lfn, sizeof(lfnpath)-8);
00176    if (Config.xfrCmd[iXfr].Opts & Config.cmdAlloc)
00177       {strcpy(&lfnpath[lfnEnd], ".anew");
00178        strcpy(&xfrP->PFN[xfrP->pfnEnd], ".anew");
00179       }
00180 
00181 // Setup the command
00182 //
00183    cmdArg.theCmd = xfrCmd[iXfr];
00184    cmdArg.theVec = Config.xfrCmd[iXfr].theVec;
00185    cmdArg.theSrc = theSrc;
00186    cmdArg.theDst = xfrP->PFN;
00187    cmdArg.theINS = xfrP->reqData.iName;
00188    if (!SetupCmd(&cmdArg)) return "incoming transfer setup failed";
00189 
00190 // If the copycmd needs a placeholder in the filesystem for this transfer, we
00191 // must create one. We first remove any existing "anew" file because we will
00192 // over-write it. The create process will create a lock file if need be. 
00193 // However, we can ignore it as we are the only ones actually using it.
00194 //
00195    if (Config.xfrCmd[iXfr].Opts & Config.cmdAlloc)
00196       {Config.ossFS->Unlink(lfnpath);
00197        rc = Config.ossFS->Create(xfrP->reqData.User,lfnpath,fMode,myEnv,crOpts);
00198        if (rc)
00199           {Say.Emsg("Fetch", rc, "create placeholder for", lfnpath);
00200            return "create failed";
00201           }
00202       }
00203 
00204 // Now run the command to get the file and make sure the file is there
00205 // If it is, make sure that if a lock file exists its date/time is greater than
00206 // the file we just fetched; then rename it to be the correct name.
00207 //
00208    xfrET = time(0);
00209    if (!(rc = cmdArg.theCmd->Run()))
00210       {if ((rc = stat(xfrP->PFN, &pfnStat)))
00211           Say.Emsg("Fetch", lfnpath, "fetched but not found!");
00212           else {fSize  = pfnStat.st_size;
00213                 if (Config.xfrCmd[iXfr].Opts & Config.cmdAlloc)
00214                    Fetch(lfnpath, rc, pfnStat.st_mtime+3);
00215                }
00216       }
00217 
00218 // Clean up if we failed otherwise tell the cmsd that we have a new file
00219 //
00220    xfrP->PFN[xfrP->pfnEnd] = '\0';
00221    if (rc)
00222       {Config.ossFS->Unlink(lfnpath);
00223        ffMake(rc == -2);
00224        if (rc == -2) {xfrP->RetCode = 2; return "file not found";}
00225        return "fetch failed";
00226       }
00227    if (Config.cmsPath) Config.cmsPath->Have(Lfn);
00228 
00229 // We completed successfully, see if we need to do statistics
00230 //
00231    if ((Config.xfrCmd[iXfr].Opts & Config.cmdStats) || Config.monStage
00232    ||  (Trace.What & TRACE_Debug))
00233       {time_t eNow = time(0);
00234        int inqT, xfrT;
00235        inqT = static_cast<int>(xfrET - time_t(xfrP->reqData.addTOD));
00236        if ((xfrT = static_cast<int>(eNow - xfrET)) <= 0) xfrT = 1;
00237        if ((Config.xfrCmd[iXfr].Opts & Config.cmdStats) 
00238        ||  (Trace.What & TRACE_Debug))
00239           {char sbuff[80];
00240            sprintf(sbuff, "Got: %lld qt: %d xt: %d up: ",fSize,inqT,xfrT);
00241            lfnpath[lfnEnd] = '\0';
00242            Say.Say(0, sbuff, xfrP->reqData.User, " ", lfnpath);
00243           }
00244        if (Config.monStage)
00245           {snprintf(lfnpath+lfnEnd, sizeof(lfnpath)-lfnEnd-1,
00246                     "\n&tod=%lld&sz=%lld&qt=%d&tm=%d",
00247                     static_cast<long long>(eNow), fSize, inqT, xfrT);
00248            XrdFrmMonitor::Map(XROOTD_MON_MAPSTAG,xfrP->reqData.User,lfnpath);
00249           }
00250      }
00251 
00252 // All done
00253 //
00254    return 0;
00255 }
00256 
00257 /******************************************************************************/
00258   
00259 const char *XrdFrmTransfer::Fetch(char *lfnpath, int &rc, time_t lktime)
00260 {
00261    struct stat lkfStat;
00262 
00263 // Check for a lock file and if we have one, reset it's time
00264 //
00265    strcpy(&xfrP->PFN[xfrP->pfnEnd+5], ".lock");
00266    if (!stat(xfrP->PFN, &lkfStat))
00267       {struct utimbuf tbuff;
00268        tbuff.actime = tbuff.modtime = lktime;
00269        if ((rc = utime(xfrP->PFN, &tbuff)))
00270           Say.Emsg("Fetch", rc, "set utime on", xfrP->PFN);
00271       }
00272 
00273 // Now rename the lfn to be what it needs to be in the end
00274 //
00275    if (!rc && (rc=Config.ossFS->Rename(lfnpath,xfrP->reqData.LFN)))
00276       Say.Emsg("Fetch", rc, "rename", lfnpath);
00277 
00278 // Done
00279 //
00280    return (rc ? "Failed" : 0);
00281 }
00282 
00283 /******************************************************************************/
00284 /* Private:                      f f C h e c k                                */
00285 /******************************************************************************/
00286   
00287 const char *XrdFrmTransfer::ffCheck()
00288 {
00289    const char *eTxt;
00290 
00291    strcpy(&xfrP->PFN[xfrP->pfnEnd], ".fail");
00292    eTxt = checkFF(xfrP->PFN);
00293    xfrP->PFN[xfrP->pfnEnd] = '\0';
00294    if (eTxt) xfrP->RetCode = 1;
00295    return eTxt;
00296 }
00297   
00298 /******************************************************************************/
00299 /* Private:                       f f M a k e                                 */
00300 /******************************************************************************/
00301   
00302 void XrdFrmTransfer::ffMake(int nofile)
00303 {
00304    static const mode_t fMode = S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH;
00305    int myFD;
00306 
00307 // Create a fail file and if failure is due to "file not found" set the mtime
00308 // to 2 so that the oss layer picks up the same error in the future.
00309 //
00310    strcpy(&xfrP->PFN[xfrP->pfnEnd], ".fail");
00311    myFD = open(xfrP->PFN, O_CREAT, fMode);
00312    if (myFD >= 0)
00313       {close(myFD);
00314        if (nofile)
00315           {struct utimbuf tbuff;
00316            tbuff.actime = time(0); tbuff.modtime = 2;
00317            utime(xfrP->PFN, &tbuff);
00318           }
00319       }
00320    xfrP->PFN[xfrP->pfnEnd] = '\0';
00321 }
00322   
00323 /******************************************************************************/
00324 /*                                  I n i t                                   */
00325 /******************************************************************************/
00326   
00327 void *InitXfer(void *parg)
00328 {   XrdFrmTransfer *xP = new XrdFrmTransfer;
00329     xP->Start();
00330     return (void *)0;
00331 }
00332   
00333 int XrdFrmTransfer::Init()
00334 {
00335    pthread_t tid;
00336    int retc, n;
00337 
00338 // Initialize the cluster identification object first
00339 //
00340    CID.Init(Config.QPath);
00341 
00342 // Initialize the transfer queue first
00343 //
00344    if (!XrdFrmXfrQueue::Init()) return 0;
00345 
00346 // Start the required number of transfer threads
00347 //
00348    n = Config.xfrMax;
00349    while(n--)
00350         {if ((retc = XrdSysThread::Run(&tid, InitXfer, (void *)0,
00351                                        XRDSYSTHREAD_BIND, "transfer")))
00352             {Say.Emsg("main", retc, "create xfr thread"); return 0;}
00353         }
00354 
00355 // All done
00356 //
00357    return 1;
00358 }
00359 
00360 /******************************************************************************/
00361 /* Private:                     S e t u p C m d                               */
00362 /******************************************************************************/
00363   
00364 int XrdFrmTransfer::SetupCmd(XrdFrmTranArg *argP)
00365 {
00366    char *pdata[XrdOucMsubs::maxElem + 2], *cP;
00367    int   pdlen[XrdOucMsubs::maxElem + 2], i, k, n;
00368 
00369    XrdOucMsubsInfo 
00370               Info(xfrP->reqData.User, argP->theEnv, Config.the_N2N,
00371                    xfrP->reqData.LFN+xfrP->reqData.LFO,
00372                    argP->theSrc, xfrP->reqData.Prty,
00373                    xfrP->reqData.Options & XrdFrmRequest::makeRW?O_RDWR:O_RDONLY,
00374                    argP->theMDP, xfrP->reqData.ID, xfrP->PFN, argP->theDst);
00375 
00376 // We must establish the cluster and instance name if we have one
00377 //
00378    if (argP->theINS && argP->theEnv)
00379       {CID.Get(argP->theINS, CMS_CID, argP->theEnv);
00380        argP->theEnv->Put(XRD_INS, argP->theINS);
00381       }
00382 
00383 // Substitute in the parameters
00384 //
00385    k = argP->theVec->Subs(Info, pdata, pdlen);
00386 
00387 // Catenate all of the arguments
00388 //
00389    *cmdBuff = '\0'; n = sizeof(cmdBuff) - 4; cP = cmdBuff;
00390    for (i = 0; i < k; i++)
00391        {n -= pdlen[i];
00392         if (n < 0)
00393            {Say.Emsg("Setup",E2BIG,"build command line for", xfrP->reqData.LFN);
00394             return 0;
00395            }
00396         strcpy(cP, pdata[i]); cP += pdlen[i];
00397        }
00398 
00399 // Now setup the command
00400 //
00401    return (argP->theCmd->Setup(cmdBuff, &Say) == 0);
00402 }
00403 
00404 /******************************************************************************/
00405 /* Public:                         S t a r t                                  */
00406 /******************************************************************************/
00407   
00408 void XrdFrmTransfer::Start()
00409 {
00410    EPNAME("Transfer");  // Wrong but looks better
00411    const char *Msg;
00412 
00413 // Prime I/O queue selection
00414 
00415 // Endless loop looking for transfer jobs
00416 //
00417    while(1)
00418         {xfrP = XrdFrmXfrQueue::Get();
00419 
00420          DEBUG(xfrP->Type <<" starting " <<xfrP->reqData.LFN
00421                <<" for " <<xfrP->reqData.User);
00422 
00423          Msg = (xfrP->qNum & XrdFrmRequest::outQ ? Throw() : Fetch());
00424          if (Msg && !(xfrP->RetCode)) xfrP->RetCode = 1;
00425          xfrP->PFN[xfrP->pfnEnd] = 0;
00426 
00427          if (xfrP->RetCode || Config.Verbose)
00428             {char buff1[80], buff2[80];
00429              sprintf(buff1, "%s for %s ", xfrP->RetCode ? "failed" : "complete",
00430                                           xfrP->reqData.User);
00431              if (xfrP->RetCode == 0) *buff2 = 0;
00432                 else sprintf(buff2, "; %s", (Msg ? Msg : "reason unknown"));
00433              Say.Say(0, xfrP->Type, buff1, xfrP->reqData.LFN,buff2);
00434             } else {
00435              DEBUG(xfrP->Type
00436                   <<(xfrP->RetCode ? " failed   " : " complete ")
00437                   << xfrP->reqData.LFN <<" rc=" <<xfrP->RetCode
00438                   <<' ' <<(Msg ? Msg : ""));
00439             }
00440 
00441          XrdFrmXfrQueue::Done(xfrP, Msg);
00442         }
00443 }
00444 
00445 /******************************************************************************/
00446 /* Private:                      T r a c k D C                                */
00447 /******************************************************************************/
00448   
00449 int XrdFrmTransfer::TrackDC(char *Lfn, char *Mdp, char *Rfn)
00450 {
00451    char *FName, *Slash, *Slush = 0, *begRfn = Rfn;
00452    int n = -1;
00453 
00454 // If this is a url, then don't back space into the url part
00455 //
00456    if (*Rfn != '/'
00457    &&  (Slash = index(Rfn, '/'))     && *(Slash+1) == '/'
00458    &&  (Slash = index(Slash+2, '/')) && *(Slash+1) == '/') begRfn = Slash+1;
00459 
00460 // Discard the filename component
00461 //
00462    if (!(FName = rindex(begRfn, '/')) || FName == begRfn) return 0;
00463    *FName = 0; Slash = Slush = FName;
00464 
00465 // Try to find the created directory path
00466 //
00467    pMutex.Lock();
00468    while(Slash != begRfn && !pTab.Find(Rfn))
00469         {do {Slash--;} while(Slash != begRfn && *Slash != '/');
00470          if (Slush) *Slush = '/';
00471          *Slash = 0; Slush = Slash;
00472          n++;
00473         }
00474    pMutex.UnLock();
00475 
00476 // Compute offset of uncreated part
00477 //
00478    *Slash = '/';
00479    if (Slash == begRfn) n = 0;
00480       else n = (n >= 0 ? Slash - begRfn : FName - begRfn);
00481    sprintf(Mdp, "%d", n);
00482 
00483 // All done
00484 //
00485    return n;
00486 }
00487   
00488 /******************************************************************************/
00489   
00490 int XrdFrmTransfer::TrackDC(char *Rfn)
00491 {
00492    char *Slash;
00493 
00494 // Trim off the trailing end
00495 //
00496    if (!(Slash = rindex(Rfn, '/')) || Slash == Rfn) return 0;
00497    *Slash = 0;
00498 
00499 // The path has been added, do insert it into the table of created paths
00500 //
00501    pMutex.Lock();
00502    pTab.Add(Rfn, 0, 0, Hash_data_is_key);
00503    pMutex.UnLock();
00504    *Slash = '/';
00505    return 0;
00506 }
00507   
00508 /******************************************************************************/
00509 /*                                 T h r o w                                  */
00510 /******************************************************************************/
00511   
00512 const char *XrdFrmTransfer::Throw()
00513 {
00514    XrdOucEnv myEnv(xfrP->reqData.Opaque?xfrP->reqData.LFN+xfrP->reqData.Opaque:0);
00515    XrdFrmTranArg cmdArg(&myEnv);
00516    struct stat begStat, endStat;
00517    XrdFrmTranChk Chk(&begStat);
00518    time_t xfrET;
00519    const char *eTxt;
00520    char Rfn[MAXPATHLEN+256], *lfnpath = xfrP->reqData.LFN, *theDest;
00521    int isMigr = xfrP->reqData.Options & XrdFrmRequest::Migrate;
00522    int iXfr, isURL, rc, mDP = -1;
00523 
00524 // The remote source is either the url-lfn or a translated lfn
00525 //
00526    if ((isURL = xfrP->reqData.LFO)) theDest = xfrP->reqData.LFN;
00527       else {if (!Config.RemotePath(xfrP->reqData.LFN, Rfn, sizeof(Rfn)))
00528                 return "lfn2rfn failed";
00529             theDest = Rfn;
00530             isURL = (*Rfn != '/');
00531            }
00532 
00533 // Check if we can actually handle this transfer
00534 //
00535    if (isURL)
00536       {if (xfrCmd[3]) iXfr = 3;
00537           else return "url copies not configured";
00538       } else {
00539        if (xfrCmd[1]) iXfr = 1;
00540           else return "non-url copies not configured";
00541       }
00542 
00543 // Check if the file exists
00544 //
00545    if (stat(xfrP->PFN, &begStat)) return (xfrP->reqFQ ? "file not found" : 0);
00546 
00547 // Check for a fail file
00548 //
00549    if ((eTxt = ffCheck())) return eTxt;
00550 
00551 // If this is an mss migration request, then recheck if the file can and
00552 // need to be migrated based on the lock file. This also obtains a directory
00553 // lock and lock file lock, as needed. If the file need not be migrated but
00554 // should be purge, we will get a null string error.
00555 //
00556    if (isMigr && (eTxt = ThrowOK(&Chk)))
00557       {if (*eTxt) return eTxt;
00558        if (!(xfrP->reqData.Options & XrdFrmRequest::Purge)) return "logic error";
00559        Throwaway();
00560        return 0;
00561       }
00562 
00563 // Setup the command, including directory tracking, as needed
00564 //
00565    cmdArg.theCmd = xfrCmd[iXfr];
00566    cmdArg.theVec = Config.xfrCmd[iXfr].theVec;
00567    cmdArg.theDst = theDest;
00568    cmdArg.theSrc = xfrP->PFN;
00569    cmdArg.theINS = xfrP->reqData.iName;
00570    if (Config.xfrCmd[iXfr].Opts & Config.cmdMDP)
00571       mDP = TrackDC(lfnpath+xfrP->reqData.LFO, cmdArg.theMDP, Rfn);
00572    if (!SetupCmd(&cmdArg)) return "outgoing transfer setup failed";
00573 
00574 // Now run the command to put the file. If the command fails and this is a
00575 // migration request, cretae a fail file if one does not exist.
00576 //
00577    xfrET = time(0);
00578    if ((rc = cmdArg.theCmd->Run()))
00579       {if (isMigr) ffMake(rc == 2);
00580        return "copy failed";
00581       }
00582 
00583 // Track directory creations if we need to track them
00584 //
00585    if (mDP >= 0) TrackDC(Rfn);
00586 
00587 // Obtain state of the file after the copy
00588 //
00589    if (stat(xfrP->PFN, &endStat))
00590       {Say.Emsg("Throw", lfnpath, "transfered but not found!");
00591        return "unable to verify copy";
00592       }
00593 
00594 // Make sure the file was not modified during the copy. This is an error for
00595 // queued requests but internally generated requests will simply be retried.
00596 //
00597    if (begStat.st_mtime != endStat.st_mtime
00598     || begStat.st_size  != endStat.st_size)
00599       {Say.Emsg("Throw", lfnpath, "modified during transfer!");
00600        return "file modified during copy";
00601       }
00602 
00603 // Purge the file if so wanted. Otherwise, if this is a migration request,
00604 // make sure that if a lock file exists its date/time is greater than the file
00605 // we just copied to prevent the file from being copied again (we have a lock).
00606 //
00607    if (xfrP->reqData.Options & XrdFrmRequest::Purge) Throwaway();
00608       else if (isMigr)
00609               {strcpy(&xfrP->PFN[xfrP->pfnEnd], ".lock");
00610                if (!stat(xfrP->PFN, &begStat))
00611                   {struct utimbuf tbuff;
00612                    tbuff.actime = tbuff.modtime = endStat.st_mtime+1;
00613                    if (utime(xfrP->PFN, &tbuff))
00614                       Say.Emsg("Throw", errno, "set utime for", xfrP->PFN);
00615                    }
00616                xfrP->PFN[xfrP->pfnEnd] = '\0';
00617               }
00618 
00619 // Do statistics if so wanted
00620 //
00621    if ((Config.xfrCmd[iXfr].Opts & Config.cmdStats) 
00622    ||  (Trace.What & TRACE_Debug))
00623       {int inqT, xfrT;
00624        long long Fsize = endStat.st_size;
00625        char sbuff[80];
00626        inqT = static_cast<int>(xfrET - time_t(xfrP->reqData.addTOD));
00627        if ((xfrT = static_cast<int>(time(0) - xfrET)) <= 0) xfrT = 1;
00628        sprintf(sbuff, "Put: %lld qt: %d xt: %d up: ",Fsize,inqT,xfrT);
00629        Say.Say(0, sbuff, xfrP->reqData.User, " ", xfrP->reqData.LFN);
00630      }
00631 
00632 // All done
00633 //
00634    return 0;
00635 }
00636 
00637 /******************************************************************************/
00638 /* Private:                    T h r o w a w a y                              */
00639 /******************************************************************************/
00640 
00641 void XrdFrmTransfer::Throwaway()
00642 {
00643    EPNAME("Throwaway");
00644 
00645 // Purge the file. We do this via the pfn but also indicate we want all
00646 // migration support suffixes removed it they exist.
00647 //
00648    if (Config.Test) {DEBUG("Would have removed '" <<xfrP->PFN <<"'");}
00649       else {Config.ossFS->Unlink(xfrP->PFN, XRDOSS_isPFN|XRDOSS_isMIG);
00650             DEBUG("removed '" <<xfrP->PFN <<"'");
00651             if (Config.cmsPath) Config.cmsPath->Gone(xfrP->PFN);
00652            }
00653 }
00654   
00655 /******************************************************************************/
00656 /* Private:                      T h r o w O K                                */
00657 /******************************************************************************/
00658   
00659 const char *XrdFrmTransfer::ThrowOK(XrdFrmTranChk *cP)
00660 {
00661    XrdOssLock ufs_file;
00662    struct stat lokStat;
00663    int fnFD, statRC;
00664 
00665 // Get an exclusive lock on the directory
00666 //
00667    if (ufs_file.Serialize(xfrP->PFN, XrdOssDIR|XrdOssEXC) < 0)
00668       return "unable to lock directory";
00669 
00670 // Check if the file is in use by checking if we got an exclusive lock
00671 //
00672    if ((fnFD = open(xfrP->PFN, O_RDWR)) < 0) return "unable to open file";
00673    fcntl(fnFD, F_SETFD, FD_CLOEXEC);
00674    if (ufs_file.Serialize(fnFD, XrdOssEXC|XrdOssNOWAIT))
00675       {close(fnFD); return "file in use";}
00676    close(fnFD);
00677 
00678 // Get the info on the lock file
00679 //
00680    strcpy(&xfrP->PFN[xfrP->pfnEnd], ".lock");
00681    statRC = stat(xfrP->PFN, &lokStat);
00682    xfrP->PFN[xfrP->pfnEnd] = '\0';
00683 
00684 // Verify he information
00685 //
00686    if (statRC) return "missing lock file";
00687    if (lokStat.st_mtime >= cP->Stat->st_mtime)
00688       {if (xfrP->reqData.Options & XrdFrmRequest::Purge) return "";
00689        return "already migrated";
00690       }
00691 
00692 
00693 // Obtain a lock on the file
00694 //
00695    strcpy(&xfrP->PFN[xfrP->pfnEnd], ".lock");
00696    cP->lkfP = new XrdOucSxeq(XrdOucSxeq::Lock, xfrP->PFN);
00697    xfrP->PFN[xfrP->pfnEnd] = '\0';
00698    return 0;
00699 }

Generated on Tue Jul 5 14:46:36 2011 for ROOT_528-00b_version by  doxygen 1.5.1