00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013 const char *XrdFrmTransferCVSID = "$Id: XrdFrmTransfer.cc 35287 2010-09-14 21:19:35Z ganis $";
00014
00015 #include <string.h>
00016 #include <strings.h>
00017 #include <stdio.h>
00018 #include <fcntl.h>
00019 #include <unistd.h>
00020 #include <utime.h>
00021 #include <sys/param.h>
00022 #include <sys/types.h>
00023 #include <sys/socket.h>
00024 #include <sys/stat.h>
00025
00026 #include "XrdFrm/XrdFrmCID.hh"
00027 #include "XrdFrm/XrdFrmConfig.hh"
00028 #include "XrdFrm/XrdFrmMonitor.hh"
00029 #include "XrdFrm/XrdFrmReqFile.hh"
00030 #include "XrdFrm/XrdFrmRequest.hh"
00031 #include "XrdFrm/XrdFrmTrace.hh"
00032 #include "XrdFrm/XrdFrmTransfer.hh"
00033 #include "XrdFrm/XrdFrmXfrJob.hh"
00034 #include "XrdFrm/XrdFrmXfrQueue.hh"
00035 #include "XrdNet/XrdNetCmsNotify.hh"
00036 #include "XrdOss/XrdOss.hh"
00037 #include "XrdOss/XrdOssLock.hh"
00038 #include "XrdOuc/XrdOucEnv.hh"
00039 #include "XrdOuc/XrdOucMsubs.hh"
00040 #include "XrdOuc/XrdOucProg.hh"
00041 #include "XrdOuc/XrdOucSxeq.hh"
00042 #include "XrdSys/XrdSysError.hh"
00043 #include "XrdSys/XrdSysPlatform.hh"
00044
00045 using namespace XrdFrm;
00046
00047
00048
00049
00050
00051 struct XrdFrmTranArg
00052 {
00053 XrdOucEnv *theEnv;
00054 XrdOucProg *theCmd;
00055 XrdOucMsubs *theVec;
00056 char *theSrc;
00057 char *theDst;
00058 char *theINS;
00059 char theMDP[8];
00060
00061 XrdFrmTranArg(XrdOucEnv *Env)
00062 : theEnv(Env), theSrc(0), theDst(0), theINS(0)
00063 {theMDP[0] = '0'; theMDP[1] = 0;}
00064 ~XrdFrmTranArg() {}
00065 };
00066
00067 struct XrdFrmTranChk
00068 { XrdOucSxeq *lkfP;
00069 struct stat *Stat;
00070
00071 XrdFrmTranChk(struct stat *sP) : lkfP(0), Stat(sP) {}
00072 ~XrdFrmTranChk() {if (lkfP) delete lkfP;}
00073 };
00074
00075
00076
00077
00078
// Held around each pTab access made by the TrackDC() methods.
XrdSysMutex XrdFrmTransfer::pMutex;
// Table that TrackDC(char *) fills with the directory part of a remote file
// name and that TrackDC(char *, char *, char *) searches.
XrdOucHash<char> XrdFrmTransfer::pTab;
00081
00082
00083
00084
00085
00086 XrdFrmTransfer::XrdFrmTransfer()
00087 {
00088 int i;
00089
00090
00091
00092 for (i = 0; i < 4; i++)
00093 xfrCmd[i] = (Config.xfrCmd[i].theVec ? new XrdOucProg(&Say) : 0);
00094 }
00095
00096
00097
00098
00099
00100 const char *XrdFrmTransfer::checkFF(const char *Path)
00101 {
00102 EPNAME("checkFF");
00103 struct stat buf;
00104
00105
00106
00107 if (!stat(Path, &buf))
00108 {if (buf.st_ctime+Config.FailHold >= time(0))
00109 return "request previously failed";
00110 if (Config.Test) {DEBUG("would have removed '" <<Path <<"'");}
00111 else {Config.ossFS->Unlink(Path, XRDOSS_isPFN);
00112 DEBUG("removed '" <<Path <<"'");
00113 }
00114 }
00115
00116
00117
00118 return 0;
00119 }
00120
00121
00122
00123
00124
00125 const char *XrdFrmTransfer::Fetch()
00126 {
00127 EPNAME("Fetch");
00128 static const mode_t fMode = S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH;
00129 static const int crOpts = (O_CREAT|O_TRUNC)<<8|XRDOSS_mkpath;
00130
00131 XrdOucEnv myEnv(xfrP->reqData.Opaque?xfrP->reqData.LFN+xfrP->reqData.Opaque:0);
00132 XrdFrmTranArg cmdArg(&myEnv);
00133 struct stat pfnStat;
00134 time_t xfrET;
00135 const char *eTxt;
00136 char lfnpath[MAXPATHLEN+8], *Lfn, Rfn[MAXPATHLEN+256], *theSrc;
00137 int iXfr, lfnEnd, rc, isURL = 0;
00138 long long fSize = 0;
00139
00140
00141
00142 if ((isURL = xfrP->reqData.LFO)) theSrc = xfrP->reqData.LFN;
00143 else {if (!Config.RemotePath(xfrP->reqData.LFN, Rfn, sizeof(Rfn)))
00144 return "lfn2rfn failed";
00145 theSrc = Rfn;
00146 isURL = (*Rfn != '/');
00147 }
00148
00149
00150
00151 if (isURL)
00152 {if (xfrCmd[2]) iXfr = 2;
00153 else return "url copies not configured";
00154 } else {
00155 if (xfrCmd[0]) iXfr = 0;
00156 else return "non-url copies not configured";
00157 }
00158
00159
00160
00161 if ((eTxt = ffCheck())) return eTxt;
00162
00163
00164
00165 if (!stat(xfrP->PFN, &pfnStat))
00166 {DEBUG(xfrP->PFN <<" exists; not fetched.");
00167 return 0;
00168 }
00169
00170
00171
00172
00173 Lfn = (xfrP->reqData.LFN)+xfrP->reqData.LFO;
00174 lfnEnd = strlen(Lfn);
00175 strlcpy(lfnpath, Lfn, sizeof(lfnpath)-8);
00176 if (Config.xfrCmd[iXfr].Opts & Config.cmdAlloc)
00177 {strcpy(&lfnpath[lfnEnd], ".anew");
00178 strcpy(&xfrP->PFN[xfrP->pfnEnd], ".anew");
00179 }
00180
00181
00182
00183 cmdArg.theCmd = xfrCmd[iXfr];
00184 cmdArg.theVec = Config.xfrCmd[iXfr].theVec;
00185 cmdArg.theSrc = theSrc;
00186 cmdArg.theDst = xfrP->PFN;
00187 cmdArg.theINS = xfrP->reqData.iName;
00188 if (!SetupCmd(&cmdArg)) return "incoming transfer setup failed";
00189
00190
00191
00192
00193
00194
00195 if (Config.xfrCmd[iXfr].Opts & Config.cmdAlloc)
00196 {Config.ossFS->Unlink(lfnpath);
00197 rc = Config.ossFS->Create(xfrP->reqData.User,lfnpath,fMode,myEnv,crOpts);
00198 if (rc)
00199 {Say.Emsg("Fetch", rc, "create placeholder for", lfnpath);
00200 return "create failed";
00201 }
00202 }
00203
00204
00205
00206
00207
00208 xfrET = time(0);
00209 if (!(rc = cmdArg.theCmd->Run()))
00210 {if ((rc = stat(xfrP->PFN, &pfnStat)))
00211 Say.Emsg("Fetch", lfnpath, "fetched but not found!");
00212 else {fSize = pfnStat.st_size;
00213 if (Config.xfrCmd[iXfr].Opts & Config.cmdAlloc)
00214 Fetch(lfnpath, rc, pfnStat.st_mtime+3);
00215 }
00216 }
00217
00218
00219
00220 xfrP->PFN[xfrP->pfnEnd] = '\0';
00221 if (rc)
00222 {Config.ossFS->Unlink(lfnpath);
00223 ffMake(rc == -2);
00224 if (rc == -2) {xfrP->RetCode = 2; return "file not found";}
00225 return "fetch failed";
00226 }
00227 if (Config.cmsPath) Config.cmsPath->Have(Lfn);
00228
00229
00230
00231 if ((Config.xfrCmd[iXfr].Opts & Config.cmdStats) || Config.monStage
00232 || (Trace.What & TRACE_Debug))
00233 {time_t eNow = time(0);
00234 int inqT, xfrT;
00235 inqT = static_cast<int>(xfrET - time_t(xfrP->reqData.addTOD));
00236 if ((xfrT = static_cast<int>(eNow - xfrET)) <= 0) xfrT = 1;
00237 if ((Config.xfrCmd[iXfr].Opts & Config.cmdStats)
00238 || (Trace.What & TRACE_Debug))
00239 {char sbuff[80];
00240 sprintf(sbuff, "Got: %lld qt: %d xt: %d up: ",fSize,inqT,xfrT);
00241 lfnpath[lfnEnd] = '\0';
00242 Say.Say(0, sbuff, xfrP->reqData.User, " ", lfnpath);
00243 }
00244 if (Config.monStage)
00245 {snprintf(lfnpath+lfnEnd, sizeof(lfnpath)-lfnEnd-1,
00246 "\n&tod=%lld&sz=%lld&qt=%d&tm=%d",
00247 static_cast<long long>(eNow), fSize, inqT, xfrT);
00248 XrdFrmMonitor::Map(XROOTD_MON_MAPSTAG,xfrP->reqData.User,lfnpath);
00249 }
00250 }
00251
00252
00253
00254 return 0;
00255 }
00256
00257
00258
00259 const char *XrdFrmTransfer::Fetch(char *lfnpath, int &rc, time_t lktime)
00260 {
00261 struct stat lkfStat;
00262
00263
00264
00265 strcpy(&xfrP->PFN[xfrP->pfnEnd+5], ".lock");
00266 if (!stat(xfrP->PFN, &lkfStat))
00267 {struct utimbuf tbuff;
00268 tbuff.actime = tbuff.modtime = lktime;
00269 if ((rc = utime(xfrP->PFN, &tbuff)))
00270 Say.Emsg("Fetch", rc, "set utime on", xfrP->PFN);
00271 }
00272
00273
00274
00275 if (!rc && (rc=Config.ossFS->Rename(lfnpath,xfrP->reqData.LFN)))
00276 Say.Emsg("Fetch", rc, "rename", lfnpath);
00277
00278
00279
00280 return (rc ? "Failed" : 0);
00281 }
00282
00283
00284
00285
00286
00287 const char *XrdFrmTransfer::ffCheck()
00288 {
00289 const char *eTxt;
00290
00291 strcpy(&xfrP->PFN[xfrP->pfnEnd], ".fail");
00292 eTxt = checkFF(xfrP->PFN);
00293 xfrP->PFN[xfrP->pfnEnd] = '\0';
00294 if (eTxt) xfrP->RetCode = 1;
00295 return eTxt;
00296 }
00297
00298
00299
00300
00301
00302 void XrdFrmTransfer::ffMake(int nofile)
00303 {
00304 static const mode_t fMode = S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH;
00305 int myFD;
00306
00307
00308
00309
00310 strcpy(&xfrP->PFN[xfrP->pfnEnd], ".fail");
00311 myFD = open(xfrP->PFN, O_CREAT, fMode);
00312 if (myFD >= 0)
00313 {close(myFD);
00314 if (nofile)
00315 {struct utimbuf tbuff;
00316 tbuff.actime = time(0); tbuff.modtime = 2;
00317 utime(xfrP->PFN, &tbuff);
00318 }
00319 }
00320 xfrP->PFN[xfrP->pfnEnd] = '\0';
00321 }
00322
00323
00324
00325
00326
00327 void *InitXfer(void *parg)
00328 { XrdFrmTransfer *xP = new XrdFrmTransfer;
00329 xP->Start();
00330 return (void *)0;
00331 }
00332
00333 int XrdFrmTransfer::Init()
00334 {
00335 pthread_t tid;
00336 int retc, n;
00337
00338
00339
00340 CID.Init(Config.QPath);
00341
00342
00343
00344 if (!XrdFrmXfrQueue::Init()) return 0;
00345
00346
00347
00348 n = Config.xfrMax;
00349 while(n--)
00350 {if ((retc = XrdSysThread::Run(&tid, InitXfer, (void *)0,
00351 XRDSYSTHREAD_BIND, "transfer")))
00352 {Say.Emsg("main", retc, "create xfr thread"); return 0;}
00353 }
00354
00355
00356
00357 return 1;
00358 }
00359
00360
00361
00362
00363
00364 int XrdFrmTransfer::SetupCmd(XrdFrmTranArg *argP)
00365 {
00366 char *pdata[XrdOucMsubs::maxElem + 2], *cP;
00367 int pdlen[XrdOucMsubs::maxElem + 2], i, k, n;
00368
00369 XrdOucMsubsInfo
00370 Info(xfrP->reqData.User, argP->theEnv, Config.the_N2N,
00371 xfrP->reqData.LFN+xfrP->reqData.LFO,
00372 argP->theSrc, xfrP->reqData.Prty,
00373 xfrP->reqData.Options & XrdFrmRequest::makeRW?O_RDWR:O_RDONLY,
00374 argP->theMDP, xfrP->reqData.ID, xfrP->PFN, argP->theDst);
00375
00376
00377
00378 if (argP->theINS && argP->theEnv)
00379 {CID.Get(argP->theINS, CMS_CID, argP->theEnv);
00380 argP->theEnv->Put(XRD_INS, argP->theINS);
00381 }
00382
00383
00384
00385 k = argP->theVec->Subs(Info, pdata, pdlen);
00386
00387
00388
00389 *cmdBuff = '\0'; n = sizeof(cmdBuff) - 4; cP = cmdBuff;
00390 for (i = 0; i < k; i++)
00391 {n -= pdlen[i];
00392 if (n < 0)
00393 {Say.Emsg("Setup",E2BIG,"build command line for", xfrP->reqData.LFN);
00394 return 0;
00395 }
00396 strcpy(cP, pdata[i]); cP += pdlen[i];
00397 }
00398
00399
00400
00401 return (argP->theCmd->Setup(cmdBuff, &Say) == 0);
00402 }
00403
00404
00405
00406
00407
00408 void XrdFrmTransfer::Start()
00409 {
00410 EPNAME("Transfer");
00411 const char *Msg;
00412
00413
00414
00415
00416
00417 while(1)
00418 {xfrP = XrdFrmXfrQueue::Get();
00419
00420 DEBUG(xfrP->Type <<" starting " <<xfrP->reqData.LFN
00421 <<" for " <<xfrP->reqData.User);
00422
00423 Msg = (xfrP->qNum & XrdFrmRequest::outQ ? Throw() : Fetch());
00424 if (Msg && !(xfrP->RetCode)) xfrP->RetCode = 1;
00425 xfrP->PFN[xfrP->pfnEnd] = 0;
00426
00427 if (xfrP->RetCode || Config.Verbose)
00428 {char buff1[80], buff2[80];
00429 sprintf(buff1, "%s for %s ", xfrP->RetCode ? "failed" : "complete",
00430 xfrP->reqData.User);
00431 if (xfrP->RetCode == 0) *buff2 = 0;
00432 else sprintf(buff2, "; %s", (Msg ? Msg : "reason unknown"));
00433 Say.Say(0, xfrP->Type, buff1, xfrP->reqData.LFN,buff2);
00434 } else {
00435 DEBUG(xfrP->Type
00436 <<(xfrP->RetCode ? " failed " : " complete ")
00437 << xfrP->reqData.LFN <<" rc=" <<xfrP->RetCode
00438 <<' ' <<(Msg ? Msg : ""));
00439 }
00440
00441 XrdFrmXfrQueue::Done(xfrP, Msg);
00442 }
00443 }
00444
00445
00446
00447
00448
00449 int XrdFrmTransfer::TrackDC(char *Lfn, char *Mdp, char *Rfn)
00450 {
00451 char *FName, *Slash, *Slush = 0, *begRfn = Rfn;
00452 int n = -1;
00453
00454
00455
00456 if (*Rfn != '/'
00457 && (Slash = index(Rfn, '/')) && *(Slash+1) == '/'
00458 && (Slash = index(Slash+2, '/')) && *(Slash+1) == '/') begRfn = Slash+1;
00459
00460
00461
00462 if (!(FName = rindex(begRfn, '/')) || FName == begRfn) return 0;
00463 *FName = 0; Slash = Slush = FName;
00464
00465
00466
00467 pMutex.Lock();
00468 while(Slash != begRfn && !pTab.Find(Rfn))
00469 {do {Slash--;} while(Slash != begRfn && *Slash != '/');
00470 if (Slush) *Slush = '/';
00471 *Slash = 0; Slush = Slash;
00472 n++;
00473 }
00474 pMutex.UnLock();
00475
00476
00477
00478 *Slash = '/';
00479 if (Slash == begRfn) n = 0;
00480 else n = (n >= 0 ? Slash - begRfn : FName - begRfn);
00481 sprintf(Mdp, "%d", n);
00482
00483
00484
00485 return n;
00486 }
00487
00488
00489
00490 int XrdFrmTransfer::TrackDC(char *Rfn)
00491 {
00492 char *Slash;
00493
00494
00495
00496 if (!(Slash = rindex(Rfn, '/')) || Slash == Rfn) return 0;
00497 *Slash = 0;
00498
00499
00500
00501 pMutex.Lock();
00502 pTab.Add(Rfn, 0, 0, Hash_data_is_key);
00503 pMutex.UnLock();
00504 *Slash = '/';
00505 return 0;
00506 }
00507
00508
00509
00510
00511
00512 const char *XrdFrmTransfer::Throw()
00513 {
00514 XrdOucEnv myEnv(xfrP->reqData.Opaque?xfrP->reqData.LFN+xfrP->reqData.Opaque:0);
00515 XrdFrmTranArg cmdArg(&myEnv);
00516 struct stat begStat, endStat;
00517 XrdFrmTranChk Chk(&begStat);
00518 time_t xfrET;
00519 const char *eTxt;
00520 char Rfn[MAXPATHLEN+256], *lfnpath = xfrP->reqData.LFN, *theDest;
00521 int isMigr = xfrP->reqData.Options & XrdFrmRequest::Migrate;
00522 int iXfr, isURL, rc, mDP = -1;
00523
00524
00525
00526 if ((isURL = xfrP->reqData.LFO)) theDest = xfrP->reqData.LFN;
00527 else {if (!Config.RemotePath(xfrP->reqData.LFN, Rfn, sizeof(Rfn)))
00528 return "lfn2rfn failed";
00529 theDest = Rfn;
00530 isURL = (*Rfn != '/');
00531 }
00532
00533
00534
00535 if (isURL)
00536 {if (xfrCmd[3]) iXfr = 3;
00537 else return "url copies not configured";
00538 } else {
00539 if (xfrCmd[1]) iXfr = 1;
00540 else return "non-url copies not configured";
00541 }
00542
00543
00544
00545 if (stat(xfrP->PFN, &begStat)) return (xfrP->reqFQ ? "file not found" : 0);
00546
00547
00548
00549 if ((eTxt = ffCheck())) return eTxt;
00550
00551
00552
00553
00554
00555
00556 if (isMigr && (eTxt = ThrowOK(&Chk)))
00557 {if (*eTxt) return eTxt;
00558 if (!(xfrP->reqData.Options & XrdFrmRequest::Purge)) return "logic error";
00559 Throwaway();
00560 return 0;
00561 }
00562
00563
00564
00565 cmdArg.theCmd = xfrCmd[iXfr];
00566 cmdArg.theVec = Config.xfrCmd[iXfr].theVec;
00567 cmdArg.theDst = theDest;
00568 cmdArg.theSrc = xfrP->PFN;
00569 cmdArg.theINS = xfrP->reqData.iName;
00570 if (Config.xfrCmd[iXfr].Opts & Config.cmdMDP)
00571 mDP = TrackDC(lfnpath+xfrP->reqData.LFO, cmdArg.theMDP, Rfn);
00572 if (!SetupCmd(&cmdArg)) return "outgoing transfer setup failed";
00573
00574
00575
00576
00577 xfrET = time(0);
00578 if ((rc = cmdArg.theCmd->Run()))
00579 {if (isMigr) ffMake(rc == 2);
00580 return "copy failed";
00581 }
00582
00583
00584
00585 if (mDP >= 0) TrackDC(Rfn);
00586
00587
00588
00589 if (stat(xfrP->PFN, &endStat))
00590 {Say.Emsg("Throw", lfnpath, "transfered but not found!");
00591 return "unable to verify copy";
00592 }
00593
00594
00595
00596
00597 if (begStat.st_mtime != endStat.st_mtime
00598 || begStat.st_size != endStat.st_size)
00599 {Say.Emsg("Throw", lfnpath, "modified during transfer!");
00600 return "file modified during copy";
00601 }
00602
00603
00604
00605
00606
00607 if (xfrP->reqData.Options & XrdFrmRequest::Purge) Throwaway();
00608 else if (isMigr)
00609 {strcpy(&xfrP->PFN[xfrP->pfnEnd], ".lock");
00610 if (!stat(xfrP->PFN, &begStat))
00611 {struct utimbuf tbuff;
00612 tbuff.actime = tbuff.modtime = endStat.st_mtime+1;
00613 if (utime(xfrP->PFN, &tbuff))
00614 Say.Emsg("Throw", errno, "set utime for", xfrP->PFN);
00615 }
00616 xfrP->PFN[xfrP->pfnEnd] = '\0';
00617 }
00618
00619
00620
00621 if ((Config.xfrCmd[iXfr].Opts & Config.cmdStats)
00622 || (Trace.What & TRACE_Debug))
00623 {int inqT, xfrT;
00624 long long Fsize = endStat.st_size;
00625 char sbuff[80];
00626 inqT = static_cast<int>(xfrET - time_t(xfrP->reqData.addTOD));
00627 if ((xfrT = static_cast<int>(time(0) - xfrET)) <= 0) xfrT = 1;
00628 sprintf(sbuff, "Put: %lld qt: %d xt: %d up: ",Fsize,inqT,xfrT);
00629 Say.Say(0, sbuff, xfrP->reqData.User, " ", xfrP->reqData.LFN);
00630 }
00631
00632
00633
00634 return 0;
00635 }
00636
00637
00638
00639
00640
00641 void XrdFrmTransfer::Throwaway()
00642 {
00643 EPNAME("Throwaway");
00644
00645
00646
00647
00648 if (Config.Test) {DEBUG("Would have removed '" <<xfrP->PFN <<"'");}
00649 else {Config.ossFS->Unlink(xfrP->PFN, XRDOSS_isPFN|XRDOSS_isMIG);
00650 DEBUG("removed '" <<xfrP->PFN <<"'");
00651 if (Config.cmsPath) Config.cmsPath->Gone(xfrP->PFN);
00652 }
00653 }
00654
00655
00656
00657
00658
00659 const char *XrdFrmTransfer::ThrowOK(XrdFrmTranChk *cP)
00660 {
00661 XrdOssLock ufs_file;
00662 struct stat lokStat;
00663 int fnFD, statRC;
00664
00665
00666
00667 if (ufs_file.Serialize(xfrP->PFN, XrdOssDIR|XrdOssEXC) < 0)
00668 return "unable to lock directory";
00669
00670
00671
00672 if ((fnFD = open(xfrP->PFN, O_RDWR)) < 0) return "unable to open file";
00673 fcntl(fnFD, F_SETFD, FD_CLOEXEC);
00674 if (ufs_file.Serialize(fnFD, XrdOssEXC|XrdOssNOWAIT))
00675 {close(fnFD); return "file in use";}
00676 close(fnFD);
00677
00678
00679
00680 strcpy(&xfrP->PFN[xfrP->pfnEnd], ".lock");
00681 statRC = stat(xfrP->PFN, &lokStat);
00682 xfrP->PFN[xfrP->pfnEnd] = '\0';
00683
00684
00685
00686 if (statRC) return "missing lock file";
00687 if (lokStat.st_mtime >= cP->Stat->st_mtime)
00688 {if (xfrP->reqData.Options & XrdFrmRequest::Purge) return "";
00689 return "already migrated";
00690 }
00691
00692
00693
00694
00695 strcpy(&xfrP->PFN[xfrP->pfnEnd], ".lock");
00696 cP->lkfP = new XrdOucSxeq(XrdOucSxeq::Lock, xfrP->PFN);
00697 xfrP->PFN[xfrP->pfnEnd] = '\0';
00698 return 0;
00699 }