00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013 const char *XrdcpCVSID = "$Id: Xrdcp.cc 38011 2011-02-08 18:35:57Z ganis $";
00014
00015 #include "XrdClient/XrdClientUrlInfo.hh"
00016 #include "XrdSys/XrdSysPthread.hh"
00017 #include "XrdClient/XrdClient.hh"
00018 #include "XrdClient/XrdCpMthrQueue.hh"
00019 #include "XrdClient/XrdClientDebug.hh"
00020 #include "XrdClient/XrdCpWorkLst.hh"
00021 #include "XrdClient/XrdClientEnv.hh"
00022 #include "XrdSys/XrdSysPlatform.hh"
00023
00024 #include <XrdCrypto/XrdCryptoFactory.hh>
00025 #include <XrdCrypto/XrdCryptoMsgDigest.hh>
00026
00027 #include "XrdClient/XrdClientAbsMonIntf.hh"
00028 #include "XrdClient/XrdcpXtremeRead.hh"
00029
00030 #include <sys/types.h>
00031 #include <sys/stat.h>
00032 #include <fcntl.h>
00033 #ifndef WIN32
00034 #include <sys/time.h>
00035 #include <unistd.h>
00036 #include <dlfcn.h>
00037 #endif
00038 #include <stdarg.h>
00039 #include <stdio.h>
00040
00041 #ifdef HAVE_LIBZ
00042 #include <zlib.h>
00043 #endif
00044
00045 extern "C" {
00046
00047
00048
00049 void cout_print(const char *format, ...)
00050 {
00051 char cout_buff[4096];
00052 va_list args;
00053 va_start(args, format);
00054 vsprintf(cout_buff, format, args);
00055 va_end(args);
00056 cout << cout_buff;
00057 }
00058
00059 void cerr_print(const char *format, ...)
00060 {
00061 char cerr_buff[4096];
00062 va_list args;
00063 va_start(args, format);
00064 vsprintf(cerr_buff, format, args);
00065 va_end(args);
00066 cerr << cerr_buff;
00067 }
00068
00069 #define COUT(s) do { \
00070 cout_print s; \
00071 } while (0)
00072
00073 #define CERR(s) do { \
00074 cerr_print s; \
00075 } while (0)
00076
00077 }
00078
00079
00080
00081 struct XrdCpInfo {
00082 XrdClient *XrdCli;
00083 int localfile;
00084 long long len, bread, bwritten;
00085 XrdCpMthrQueue queue;
00086 XrdClientAbsMonIntf *mon;
00087 } cpnfo;
00088
00089 #define XRDCP_BLOCKSIZE (8*1024*1024)
00090 #define XRDCP_XRDRASIZE (30*XRDCP_BLOCKSIZE)
00091 #define XRDCP_VERSION "(C) 2004-2010 by the Xrootd group. $Revision$ - Xrootd version: "XrdVSTRING
00092
00093
00094
00095
00096 bool summary=false;
00097 bool progbar=true;
00098 bool md5=false;
00099 bool adlerchk=false;
00100
00101 XrdOucString monlibname = "libXrdCpMonitorClient.so";
00102
00103 char *srcopaque=0,
00104 *dstopaque=0;
00105
00106 kXR_unt16 xrd_wr_flags=kXR_async | kXR_mkpath | kXR_open_updt | kXR_new;
00107
00108
00109 #define LOC_WR_FLAGS_FORCE ( O_CREAT | O_WRONLY | O_TRUNC | O_BINARY );
00110 #define LOC_WR_FLAGS ( O_CREAT | O_WRONLY | O_EXCL | O_BINARY );
00111 int loc_wr_flags = LOC_WR_FLAGS;
00112
00113 bool recurse = false;
00114
00115 char BWMHost[1024];
00116
00117 bool doXtremeCp = false;
00118 XrdOucString XtremeCpRdr;
00119
00120
00121
00122
00123 struct timeval abs_start_time;
00124 struct timeval abs_stop_time;
00125 struct timezone tz;
00126
00127 #ifdef HAVE_XRDCRYPTO
00128
00129 XrdCryptoMsgDigest *MD_5=0;
00130 XrdCryptoFactory *gCryptoFactory = 0;
00131 #endif
00132
00133
00134 unsigned int adler = 0;
00135
00136 #ifdef HAVE_XRDCRYPTO
00137 void print_summary(const char* src, const char* dst, unsigned long long bytesread, XrdCryptoMsgDigest* _MD_5, unsigned int adler ) {
00138 #else
00139 void print_summary(const char* src, const char* dst, unsigned long long bytesread, unsigned int adler ) {
00140 #endif
00141 gettimeofday (&abs_stop_time, &tz);
00142 float abs_time=((float)((abs_stop_time.tv_sec - abs_start_time.tv_sec) *1000 +
00143 (abs_stop_time.tv_usec - abs_start_time.tv_usec) / 1000));
00144
00145
00146 XrdOucString xsrc(src);
00147 XrdOucString xdst(dst);
00148 xsrc.erase(xsrc.rfind('?'));
00149 xdst.erase(xdst.rfind('?'));
00150
00151 COUT(("[xrdcp] #################################################################\n"));
00152 COUT(("[xrdcp] # Source Name : %s\n",xsrc.c_str()));
00153 COUT(("[xrdcp] # Destination Name : %s\n",xdst.c_str()));
00154 COUT(("[xrdcp] # Data Copied [bytes] : %lld\n",bytesread));
00155 COUT(("[xrdcp] # Realtime [s] : %f\n",abs_time/1000.0));
00156 if (abs_time > 0) {
00157 COUT(("[xrdcp] # Eff.Copy. Rate[MB/s] : %f\n",bytesread/abs_time/1000.0));
00158 }
00159 #ifdef HAVE_XRDCRYPTO
00160 #ifndef WIN32
00161 if (md5) {
00162 COUT(("[xrdcp] # md5 : %s\n",_MD_5->AsHexString()));
00163 }
00164 #endif
00165 #endif
00166 if (adlerchk) {
00167 COUT(("[xrdcp] # adler32 : %x\n", adler));
00168 }
00169 COUT(("[xrdcp] #################################################################\n"));
00170 }
00171
00172 void print_progbar(unsigned long long bytesread, unsigned long long size) {
00173 CERR(("[xrootd] Total %.02f MB\t|",(float)size/1024/1024));
00174 for (int l=0; l< 20;l++) {
00175 if (l< ( (int)(20.0*bytesread/size)))
00176 CERR(("="));
00177 if (l==( (int)(20.0*bytesread/size)))
00178 CERR((">"));
00179 if (l> ( (int)(20.0*bytesread/size)))
00180 CERR(("."));
00181 }
00182
00183 float abs_time=((float)((abs_stop_time.tv_sec - abs_start_time.tv_sec) *1000 +
00184 (abs_stop_time.tv_usec - abs_start_time.tv_usec) / 1000));
00185 CERR(("| %.02f %% [%.01f MB/s]\r",100.0*bytesread/size,bytesread/abs_time/1000.0));
00186 }
00187
00188 #ifdef HAVE_XRDCRYPTO
00189 void print_chksum(const char* src, unsigned long long bytesread, XrdCryptoMsgDigest* _MD_5, unsigned adler) {
00190 if (_MD_5 || adlerchk) {
00191 #else
00192 void print_chksum(const char* src, unsigned long long bytesread, unsigned adler) {
00193 if (adlerchk) {
00194 #endif
00195 XrdOucString xsrc(src);
00196 xsrc.erase(xsrc.rfind('?'));
00197
00198 #ifdef HAVE_XRDCRYPTO
00199 #ifndef WIN32
00200 if (_MD_5)
00201 cout << "md5: " << _MD_5->AsHexString() << " " << xsrc << " " << bytesread << endl;
00202 #endif
00203 #endif
00204 if (adlerchk)
00205 cout << "adler32: " << hex << adler << " " << xsrc << bytesread << endl;
00206
00207 }
00208 }
00209
00210
00211
00212
00213
00214
00215
00216 void *ReaderThread_xrd(void *)
00217 {
00218
00219 Info(XrdClientDebug::kHIDEBUG,
00220 "ReaderThread_xrd",
00221 "Reader Thread starting.");
00222
00223 pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, 0);
00224 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, 0);
00225
00226
00227 void *buf;
00228 long long offs = 0;
00229 int nr = 1;
00230 long long bread = 0, len = 0;
00231 long blksize;
00232
00233 len = cpnfo.len;
00234
00235 while ((nr > 0) && (offs < len)) {
00236 buf = malloc(XRDCP_BLOCKSIZE);
00237 if (!buf) {
00238 cerr << "Out of memory." << endl;
00239 abort();
00240 }
00241
00242
00243 blksize = xrdmin(XRDCP_BLOCKSIZE, len-offs);
00244
00245 if ( (nr = cpnfo.XrdCli->Read(buf, offs, blksize)) ) {
00246 cpnfo.queue.PutBuffer(buf, offs, nr);
00247 cpnfo.XrdCli->RemoveDataFromCache(offs, offs+nr-1, false);
00248 bread += nr;
00249 offs += nr;
00250 }
00251
00252 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 0);
00253 pthread_testcancel();
00254 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, 0);
00255 }
00256
00257 cpnfo.bread = bread;
00258
00259
00260 cpnfo.queue.PutBuffer(0, 0, 0);
00261
00262 return 0;
00263 }
00264
00265
00266
00267
00268
00269
00270
00271
00272
00273 struct xtreme_threadnfo {
00274 XrdXtRdFile *xtrdhandler;
00275
00276
00277 XrdClient *cli;
00278
00279
00280 int clientidx;
00281
00282
00283 int startfromblk;
00284
00285
00286 int maxoutstanding;
00287 };
00288 void *ReaderThread_xrd_xtreme(void *parm)
00289 {
00290
00291 Info(XrdClientDebug::kHIDEBUG,
00292 "ReaderThread_xrd_xtreme",
00293 "Reader Thread starting.");
00294
00295 pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, 0);
00296 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, 0);
00297
00298 void *buf;
00299
00300 int nr = 1;
00301 int noutstanding = 0;
00302
00303
00304
00305 XrdXtRdBlkInfo *blknfo = 0;
00306 xtreme_threadnfo *thrnfo = (xtreme_threadnfo *)parm;
00307
00308
00309 int lastprefetched = thrnfo->startfromblk;
00310 int lastread = lastprefetched;
00311
00312 thrnfo->cli->Open(0, 0, true);
00313
00314 thrnfo->cli->SetCacheParameters(XRDCP_BLOCKSIZE*4*thrnfo->maxoutstanding*2, 0, XrdClientReadCache::kRmBlk_FIFO);
00315 if (thrnfo->cli->IsOpen_wait())
00316 while (nr > 0) {
00317
00318
00319 while (noutstanding < thrnfo->maxoutstanding) {
00320 int lp;
00321 lp = thrnfo->xtrdhandler->GetBlkToPrefetch(lastprefetched, thrnfo->clientidx, blknfo);
00322 if (lp >= 0) {
00323
00324 if ( thrnfo->cli->Read_Async(blknfo->offs, blknfo->len) == kOK ) {
00325 lastprefetched = lp;
00326 noutstanding++;
00327 }
00328 else break;
00329 }
00330 else break;
00331 }
00332
00333 int lr = thrnfo->xtrdhandler->GetBlkToRead(lastread, thrnfo->clientidx, blknfo);
00334 if (lr >= 0) {
00335
00336 buf = malloc(blknfo->len);
00337 if (!buf) {
00338 cerr << "Out of memory." << endl;
00339 abort();
00340 }
00341
00342
00343
00344
00345 nr = thrnfo->cli->Read(buf, blknfo->offs, blknfo->len);
00346 if ( nr >= 0 ) {
00347 lastread = lr;
00348 noutstanding--;
00349
00350
00351
00352 int reward = thrnfo->xtrdhandler->MarkBlkAsRead(lr);
00353 if (reward >= 0)
00354
00355 cpnfo.queue.PutBuffer(buf, blknfo->offs, nr);
00356
00357 if (reward > 0) {
00358 thrnfo->maxoutstanding++;
00359 thrnfo->maxoutstanding = xrdmin(20, thrnfo->maxoutstanding);
00360 thrnfo->cli->SetCacheParameters(XRDCP_BLOCKSIZE*4*thrnfo->maxoutstanding*2, 0, XrdClientReadCache::kRmBlk_FIFO);
00361 }
00362 if (reward < 0) {
00363 thrnfo->maxoutstanding--;
00364 free(buf);
00365 }
00366
00367 if (thrnfo->maxoutstanding <= 0) {
00368 sleep(1);
00369 thrnfo->maxoutstanding = 1;
00370 }
00371
00372 }
00373
00374
00375 thrnfo->cli->RemoveDataFromCache(blknfo->offs, blknfo->offs+blknfo->len-1, false);
00376 }
00377 else {
00378
00379 if (thrnfo->xtrdhandler->AllDone()) break;
00380 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 0);
00381 sleep(1);
00382 }
00383
00384
00385 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 0);
00386 pthread_testcancel();
00387 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, 0);
00388 }
00389
00390
00391
00392 cpnfo.queue.PutBuffer(0, 0, 0);
00393
00394 return 0;
00395 }
00396
00397
00398
00399
00400
00401 void *ReaderThread_loc(void *) {
00402
00403 Info(XrdClientDebug::kHIDEBUG,
00404 "ReaderThread_loc",
00405 "Reader Thread starting.");
00406
00407 pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, 0);
00408 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 0);
00409
00410 void *buf;
00411 long long offs = 0;
00412 int nr = 1;
00413 long long bread = 0;
00414
00415 while (nr > 0) {
00416 buf = malloc(XRDCP_BLOCKSIZE);
00417 if (!buf) {
00418 cerr << "Out of memory." << endl;
00419 abort();
00420 }
00421
00422 if ( (nr = read(cpnfo.localfile, buf, XRDCP_BLOCKSIZE)) ) {
00423 cpnfo.queue.PutBuffer(buf, offs, nr);
00424 bread += nr;
00425 offs += nr;
00426 }
00427 }
00428
00429 cpnfo.bread = bread;
00430
00431
00432 cpnfo.queue.PutBuffer(0, 0, 0);
00433
00434 return 0;
00435 }
00436
00437
00438 int CreateDestPath_loc(XrdOucString path, bool isdir) {
00439
00440 if (!isdir) {
00441 int pos = path.rfind('/');
00442
00443 if (pos != STR_NPOS)
00444 path.erase(pos);
00445 else path = "";
00446
00447
00448 }
00449
00450 if (path != "")
00451 return ( MAKEDIR(
00452 path.c_str(),
00453 S_IRUSR | S_IWUSR | S_IXUSR |
00454 S_IRGRP | S_IWGRP | S_IXGRP |
00455 S_IROTH | S_IXOTH)
00456 );
00457 else
00458 return 0;
00459
00460 }
00461
00462 void BuildFullDestFilename(XrdOucString &src, XrdOucString &dest, bool destisdir) {
00463 if (destisdir) {
00464
00465 XrdOucString fn(src);
00466 fn.erase(fn.find('?'));
00467 int lsl = fn.rfind('/');
00468 if (lsl != STR_NPOS)
00469 fn.erase(0, lsl+1);
00470 dest += fn;
00471 }
00472 }
00473
00474 int CreateDestPath_xrd(XrdOucString url, bool isdir) {
00475
00476 bool statok = FALSE, done = FALSE, direxists = TRUE;
00477 long id, flags, modtime;
00478 long long size;
00479 char *path, *slash;
00480
00481 if (url == "-") return 0;
00482
00483
00484 url.erase(url.rfind('/') + 1);
00485
00486 XrdClientAdmin *adm = new XrdClientAdmin(url.c_str());
00487 if (adm->Connect()) {
00488 XrdClientUrlInfo u(url);
00489
00490 statok = adm->Stat((char *)u.File.c_str(), id, size, flags, modtime);
00491
00492
00493
00494 if (adm->GetCurrentUrl().IsValid()) {
00495 u.Host = adm->GetCurrentUrl().Host;
00496 u.Port = adm->GetCurrentUrl().Port;
00497 url = u.GetUrl();
00498 }
00499
00500 path = (char *)u.File.c_str();
00501 slash = path;
00502
00503
00504 slash += strspn(slash, "/");
00505 slash += strcspn(slash, "/");
00506
00507
00508 done = (statok && (flags & kXR_isDir));
00509
00510
00511 while (!done) {
00512 slash += strspn(slash, "/");
00513 slash += strcspn(slash, "/");
00514
00515 char nextChar = *(slash+1);
00516 done = (*slash == '\0' || nextChar == '\0');
00517 *(slash+1) = '\0';
00518
00519 if (direxists) {
00520 statok = adm->Stat(path, id, size, flags, modtime);
00521 if (!statok || (!(flags & kXR_xset) && !(flags & kXR_other))) {
00522 direxists = FALSE;
00523 }
00524 }
00525
00526 if (!direxists) {
00527 Info(XrdClientDebug::kHIDEBUG,
00528 "CreateDestPath__xrd",
00529 "Creating directory " << path);
00530
00531 adm->Mkdir(path, 7, 5, 5);
00532
00533 }
00534 *(slash+1) = nextChar;
00535 }
00536 }
00537
00538 delete adm;
00539 return 0;
00540 }
00541
00542 int doCp_xrd2xrd(XrdClient **xrddest, const char *src, const char *dst) {
00543
00544 pthread_t myTID;
00545 XrdClientVector<pthread_t> myTIDVec;
00546
00547 void *thret;
00548 XrdClientStatInfo stat;
00549 int retvalue = 0;
00550
00551 gettimeofday(&abs_start_time,&tz);
00552
00553
00554
00555 if (!cpnfo.XrdCli) {
00556 cpnfo.XrdCli = new XrdClient(src);
00557 if ( ( !cpnfo.XrdCli->Open(0, kXR_async) ||
00558 (cpnfo.XrdCli->LastServerResp()->status != kXR_ok) ) ) {
00559 cerr << "Error opening remote source file " << src << endl;
00560 PrintLastServerError(cpnfo.XrdCli);
00561
00562 delete cpnfo.XrdCli;
00563 cpnfo.XrdCli = 0;
00564 return 1;
00565 }
00566 }
00567
00568
00569 cpnfo.XrdCli->Stat(&stat);
00570 cpnfo.len = stat.size;
00571
00572
00573 if (!*xrddest) {
00574 *xrddest = new XrdClient(dst);
00575
00576 if (!PedanticOpen4Write(*xrddest, kXR_ur | kXR_uw | kXR_gw | kXR_gr | kXR_or,
00577 xrd_wr_flags)) {
00578 cerr << "Error opening remote destination file " << dst << endl;
00579 PrintLastServerError(*xrddest);
00580
00581 delete cpnfo.XrdCli;
00582 delete *xrddest;
00583 *xrddest = 0;
00584 cpnfo.XrdCli = 0;
00585 return -1;
00586 }
00587
00588 }
00589
00590
00591
00592 XrdClientVector<XrdClient *> xtremeclients;
00593 XrdXtRdFile *xrdxtrdfile = 0;
00594
00595 if (doXtremeCp)
00596 XrdXtRdFile::GetListOfSources(cpnfo.XrdCli, XtremeCpRdr, xtremeclients);
00597
00598
00599 if (doXtremeCp && (xtremeclients.GetSize() > 1)) {
00600
00601
00602
00603
00604 xrdxtrdfile = new XrdXtRdFile(XRDCP_BLOCKSIZE*4, cpnfo.len);
00605
00606 for (int iii = 0; iii < xtremeclients.GetSize(); iii++) {
00607 xtreme_threadnfo *nfo = new(xtreme_threadnfo);
00608 nfo->xtrdhandler = xrdxtrdfile;
00609 nfo->cli = xtremeclients[iii];
00610 nfo->clientidx = xrdxtrdfile->GimmeANewClientIdx();
00611 nfo->startfromblk = iii*xrdxtrdfile->GetNBlks() / xtremeclients.GetSize();
00612 nfo->maxoutstanding = xrdmin( 5, xrdxtrdfile->GetNBlks() / xtremeclients.GetSize() );
00613
00614 XrdSysThread::Run(&myTID, ReaderThread_xrd_xtreme, (void *)nfo);
00615 myTIDVec.Push_back(myTID);
00616 }
00617
00618 }
00619 else {
00620 XrdSysThread::Run(&myTID, ReaderThread_xrd, (void *)&cpnfo);
00621 myTIDVec.Push_back(myTID);
00622 }
00623
00624
00625
00626
00627
00628 int len = 1;
00629 void *buf;
00630 long long offs = 0;
00631 long long bytesread=0;
00632 long long size = cpnfo.len;
00633 bool draining = false;
00634
00635
00636 while (1) {
00637
00638 if (xrdxtrdfile && xrdxtrdfile->AllDone()) draining = true;
00639 if (draining && !cpnfo.queue.GetLength()) break;
00640
00641 if ( cpnfo.queue.GetBuffer(&buf, offs, len) ) {
00642
00643 if (len && buf) {
00644
00645 bytesread+=len;
00646 if (progbar) {
00647 gettimeofday(&abs_stop_time,&tz);
00648 print_progbar(bytesread,size);
00649 }
00650
00651 #ifdef HAVE_XRDCRYPTO
00652 if (md5) {
00653 MD_5->Update((const char*)buf,len);
00654 }
00655 #endif
00656
00657 #ifdef HAVE_LIBZ
00658 if (adlerchk) {
00659 adler = adler32(adler, (const Bytef*)buf, len);
00660 }
00661 #endif
00662
00663 if (!(*xrddest)->Write(buf, offs, len)) {
00664 cerr << "Error writing to output server." << endl;
00665 PrintLastServerError(*xrddest);
00666 retvalue = 11;
00667 break;
00668 }
00669
00670 if (cpnfo.mon)
00671 cpnfo.mon->PutProgressInfo(bytesread, cpnfo.len, (float)bytesread / cpnfo.len * 100.0);
00672
00673 free(buf);
00674
00675 }
00676 else
00677 if (!xrdxtrdfile && ( ((buf == 0) && (len == 0)) || (bytesread >= size))) {
00678 if (buf) free(buf);
00679 break;
00680 }
00681
00682 }
00683 else {
00684 cerr << endl << endl <<
00685 "Critical read timeout. Unable to read data from the source." << endl;
00686 retvalue = -1;
00687 break;
00688 }
00689
00690 buf = 0;
00691 }
00692
00693 if (cpnfo.mon)
00694 cpnfo.mon->PutProgressInfo(bytesread, cpnfo.len, (float)bytesread / cpnfo.len * 100.0, 1);
00695
00696 if(progbar) {
00697 cout << endl;
00698 }
00699
00700 if (cpnfo.len != bytesread) {
00701 cerr << endl << endl <<
00702 "File length mismatch. Read:" << bytesread << " Length:" << cpnfo.len << endl;
00703 retvalue = 13;
00704 }
00705
00706 #ifdef HAVE_XRDCRYPTO
00707 if (md5) MD_5->Final();
00708 if (adlerchk || md5) {
00709 print_chksum(src, bytesread, MD_5, adler);
00710 }
00711
00712 if (summary) {
00713 print_summary(src, dst, bytesread, MD_5, adler);
00714 }
00715 #else
00716 if (adlerchk) {
00717 print_chksum(src, bytesread, adler);
00718 }
00719
00720 if (summary) {
00721 print_summary(src, dst, bytesread, adler);
00722 }
00723 #endif
00724
00725 if (retvalue >= 0) {
00726
00727 for (int i = 0; i < myTIDVec.GetSize(); i++) {
00728 pthread_cancel(myTIDVec[i]);
00729 pthread_join(myTIDVec[i], &thret);
00730
00731 delete cpnfo.XrdCli;
00732 cpnfo.XrdCli = 0;
00733 }
00734 }
00735
00736 delete *xrddest;
00737 *xrddest = 0;
00738
00739 return retvalue;
00740 }
00741
00742
00743 XrdClient *BWMToken_Init(const char *bwmhost, const char *srcurl, const char *dsturl) {
00744
00745
00746
00747
00748
00749
00750
00751
00752
00753
00754 if (!bwmhost[0]) return 0;
00755
00756 XrdClientUrlInfo usrc(srcurl);
00757 XrdClientUrlInfo udst(dsturl);
00758 XrdOucString s = "root://";
00759 s += bwmhost;
00760 s += "//_bwm_/";
00761
00762 s += usrc.File;
00763
00764 char hname[1024];
00765 memset(hname, 0, sizeof(hname));
00766
00767 if (gethostname(hname, sizeof(hname)))
00768 strcpy(hname, "Unknown");
00769
00770 s += "?bwm.src=";
00771 if (usrc.Host != "")
00772 s += usrc.Host;
00773 else
00774 s += hname;
00775
00776 s += "?bwm.dst=";
00777 if (udst.Host != "")
00778 s += udst.Host;
00779 else
00780 s += hname;
00781
00782 XrdClient *cli = new XrdClient(s.c_str());
00783 if (cli) cli->Open(0, kXR_open_updt);
00784 return cli;
00785 }
00786
00787 bool BWMToken_WaitFor(XrdClient *cli) {
00788
00789
00790
00791
00792
00793 kXR_char buf[4096];
00794
00795 if (cli) {
00796 if (!cli->IsOpen()) return false;
00797 return cli->Query(kXR_Qvisa, 0, buf, sizeof(buf));
00798 }
00799 else return true;
00800 }
00801
00802
00803 int doCp_xrd2loc(const char *src, const char *dst) {
00804
00805 pthread_t myTID;
00806 XrdClientVector<pthread_t> myTIDVec;
00807
00808 void *thret;
00809 XrdClientStatInfo stat;
00810 int f;
00811 int retvalue = 0;
00812
00813 if (BWMHost[0]) {
00814
00815 XrdClient *tok1 = BWMToken_Init(BWMHost, src, dst);
00816 if (!tok1 || !BWMToken_WaitFor(tok1)) return 100;
00817
00818
00819 XrdClientUrlInfo u(src);
00820 XrdClient *tok2 = BWMToken_Init(u.Host.c_str(), src, dst);
00821 if (!tok2 || !BWMToken_WaitFor(tok2)) return 100;
00822 }
00823
00824 gettimeofday(&abs_start_time,&tz);
00825
00826
00827
00828 if (!cpnfo.XrdCli) {
00829 cpnfo.XrdCli = new XrdClient(src);
00830 if ( ( !cpnfo.XrdCli->Open(0, kXR_async) ||
00831 (cpnfo.XrdCli->LastServerResp()->status != kXR_ok) ) ) {
00832
00833 cerr << "Error opening remote source file " << src << endl;
00834 PrintLastServerError(cpnfo.XrdCli);
00835
00836 delete cpnfo.XrdCli;
00837 cpnfo.XrdCli = 0;
00838 return 1;
00839 }
00840 }
00841
00842
00843 cpnfo.XrdCli->Stat(&stat);
00844 cpnfo.len = stat.size;
00845
00846 if (strcmp(dst, "-")) {
00847
00848
00849 f = open(dst, loc_wr_flags,
00850 S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH);
00851 if (f < 0) {
00852 cerr << "Error '" << strerror(errno) <<
00853 "' creating " << dst << endl;
00854
00855 cpnfo.XrdCli->Close();
00856 delete cpnfo.XrdCli;
00857 cpnfo.XrdCli = 0;
00858 return -1;
00859 }
00860
00861 }
00862 else
00863
00864 f = STDOUT_FILENO;
00865
00866
00867
00868
00869 XrdClientVector<XrdClient *> xtremeclients;
00870 XrdXtRdFile *xrdxtrdfile = 0;
00871
00872 if (doXtremeCp)
00873 XrdXtRdFile::GetListOfSources(cpnfo.XrdCli, XtremeCpRdr, xtremeclients);
00874
00875
00876 if (doXtremeCp && (xtremeclients.GetSize() > 1)) {
00877
00878
00879
00880
00881 xrdxtrdfile = new XrdXtRdFile(XRDCP_BLOCKSIZE*4, cpnfo.len);
00882
00883 for (int iii = 0; iii < xtremeclients.GetSize(); iii++) {
00884 xtreme_threadnfo *nfo = new(xtreme_threadnfo);
00885 nfo->xtrdhandler = xrdxtrdfile;
00886 nfo->cli = xtremeclients[iii];
00887 nfo->clientidx = xrdxtrdfile->GimmeANewClientIdx();
00888 nfo->startfromblk = iii*xrdxtrdfile->GetNBlks() / xtremeclients.GetSize();
00889 nfo->maxoutstanding = xrdmax(xrdmin( 3, xrdxtrdfile->GetNBlks() / xtremeclients.GetSize() ), 1);
00890
00891 XrdSysThread::Run(&myTID, ReaderThread_xrd_xtreme, (void *)nfo);
00892 }
00893
00894 }
00895 else {
00896 doXtremeCp = false;
00897 XrdSysThread::Run(&myTID, ReaderThread_xrd, (void *)&cpnfo);
00898 }
00899
00900 int len = 1;
00901 void *buf;
00902 long long bytesread=0, offs = 0;
00903 long long size = cpnfo.len;
00904 bool draining = false;
00905
00906
00907 while (1) {
00908
00909 if (xrdxtrdfile && xrdxtrdfile->AllDone()) draining = true;
00910 if (draining && !cpnfo.queue.GetLength()) break;
00911
00912 if ( cpnfo.queue.GetBuffer(&buf, offs, len) ) {
00913
00914 if (len && buf) {
00915
00916 bytesread+=len;
00917 if (progbar) {
00918 gettimeofday(&abs_stop_time,&tz);
00919 print_progbar(bytesread,size);
00920 }
00921
00922 #ifdef HAVE_XRDCRYPTO
00923 if (md5) {
00924 MD_5->Update((const char*)buf,len);
00925 }
00926 #endif
00927
00928 #ifdef HAVE_LIBZ
00929 if (adlerchk) {
00930 adler = adler32(adler, (const Bytef*)buf, len);
00931 }
00932 #endif
00933
00934 if (doXtremeCp && (f != STDOUT_FILENO) && lseek(f, offs, SEEK_SET) < 0) {
00935 cerr << "Error '" << strerror(errno) <<
00936 "' seeking to " << dst << endl;
00937 retvalue = 10;
00938 break;
00939 }
00940 if (write(f, buf, len) <= 0) {
00941 cerr << "Error '" << strerror(errno) <<
00942 "' writing to " << dst << endl;
00943 retvalue = 10;
00944 break;
00945 }
00946
00947 if (cpnfo.mon)
00948 cpnfo.mon->PutProgressInfo(bytesread, cpnfo.len, (float)bytesread / cpnfo.len * 100.0);
00949
00950 free(buf);
00951
00952 }
00953 else
00954 if (!xrdxtrdfile && ( ((buf == 0) && (len == 0)) || (bytesread >= size)) ) {
00955 if (buf) free(buf);
00956 break;
00957 }
00958
00959
00960 }
00961 else {
00962 cerr << endl << endl << "Critical read timeout. Unable to read data from the source." << endl;
00963 retvalue = -1;
00964 break;
00965 }
00966
00967 buf = 0;
00968
00969 }
00970
00971 if (cpnfo.mon)
00972 cpnfo.mon->PutProgressInfo(bytesread, cpnfo.len, (float)bytesread / cpnfo.len * 100.0, 1);
00973
00974 if(progbar) {
00975 cout << endl;
00976 }
00977
00978 if (cpnfo.len != bytesread) retvalue = 13;
00979
00980 #ifdef HAVE_XRDCRYPTO
00981 if (md5) MD_5->Final();
00982 if (md5 || adlerchk) {
00983 print_chksum(src, bytesread, MD_5, adler);
00984 }
00985
00986 if (summary) {
00987 print_summary(src,dst,bytesread,MD_5, adler);
00988 }
00989 #else
00990 if (adlerchk) {
00991 print_chksum(src, bytesread, adler);
00992 }
00993
00994 if (summary) {
00995 print_summary(src,dst,bytesread,adler);
00996 }
00997 #endif
00998
00999 int closeres = close(f);
01000 if (!retvalue) retvalue = closeres;
01001
01002 if (retvalue >= 0) {
01003 for (int i = 0; i < myTIDVec.GetSize(); i++) {
01004 pthread_cancel(myTIDVec[i]);
01005 pthread_join(myTIDVec[i], &thret);
01006
01007 delete cpnfo.XrdCli;
01008 cpnfo.XrdCli = 0;
01009 }
01010
01011 delete cpnfo.XrdCli;
01012 cpnfo.XrdCli = 0;
01013 }
01014
01015 return retvalue;
01016 }
01017
01018
01019
01020 int doCp_loc2xrd(XrdClient **xrddest, const char *src, const char * dst) {
01021
01022 pthread_t myTID;
01023 void * thret;
01024 int retvalue = 0;
01025 struct stat stat;
01026
01027 gettimeofday(&abs_start_time,&tz);
01028
01029
01030 cpnfo.localfile = open(src, O_RDONLY | O_BINARY);
01031 if (cpnfo.localfile < 0) {
01032 cerr << "Error '" << strerror(errno) << "' opening " << src << endl;
01033 cpnfo.localfile = 0;
01034 return -1;
01035 }
01036
01037 if (fstat(cpnfo.localfile, &stat)) {
01038 cerr << "Error '" << strerror(errno) << "' stat " << src << endl;
01039 cpnfo.localfile = 0;
01040 return -1;
01041 }
01042
01043
01044 if (!*xrddest) {
01045
01046 *xrddest = new XrdClient(dst);
01047 if (!PedanticOpen4Write(*xrddest, kXR_ur | kXR_uw | kXR_gw | kXR_gr | kXR_or,
01048 xrd_wr_flags) ) {
01049 cerr << "Error opening remote destination file " << dst << endl;
01050 PrintLastServerError(*xrddest);
01051
01052 close(cpnfo.localfile);
01053 delete *xrddest;
01054 *xrddest = 0;
01055 cpnfo.localfile = 0;
01056 return -1;
01057 }
01058 }
01059
01060
01061 XrdSysThread::Run(&myTID, ReaderThread_loc, (void *)&cpnfo);
01062
01063 int len = 1;
01064 void *buf;
01065 long long offs = 0;
01066 unsigned long long bytesread=0;
01067 unsigned long long size = stat.st_size;
01068 int blkcnt = 0;
01069
01070
01071 while (len > 0) {
01072
01073 if ( cpnfo.queue.GetBuffer(&buf, offs, len) ) {
01074 if (len && buf) {
01075
01076 bytesread+=len;
01077 if (progbar) {
01078 gettimeofday(&abs_stop_time,&tz);
01079 print_progbar(bytesread,size);
01080 }
01081
01082 #ifdef HAVE_XRDCRYPTO
01083 if (md5) {
01084 MD_5->Update((const char*)buf,len);
01085 }
01086 #endif
01087
01088 #ifdef HAVE_LIBZ
01089 if (adlerchk) {
01090 adler = adler32(adler, (const Bytef*)buf, len);
01091 }
01092 #endif
01093 if ( !(*xrddest)->Write(buf, offs, len) ) {
01094 cerr << "Error writing to output server." << endl;
01095 PrintLastServerError(*xrddest);
01096 retvalue = 12;
01097 break;
01098 }
01099
01100 if (cpnfo.mon)
01101 cpnfo.mon->PutProgressInfo(bytesread, cpnfo.len, (float)bytesread / cpnfo.len * 100.0);
01102
01103 free(buf);
01104 }
01105 else {
01106
01107 if (buf) free(buf);
01108 break;
01109 }
01110 }
01111 else {
01112 cerr << endl << endl << "Critical read timeout. Unable to read data from the source." << endl;
01113 retvalue = -1;
01114 break;
01115 }
01116
01117 buf = 0;
01118 blkcnt++;
01119 }
01120
01121
01122 if (cpnfo.mon)
01123 cpnfo.mon->PutProgressInfo(bytesread, cpnfo.len, (float)bytesread / cpnfo.len * 100.0, 1);
01124
01125 if(progbar) {
01126 cout << endl;
01127 }
01128
01129 if (size != bytesread) retvalue = 13;
01130
01131 #ifdef HAVE_XRDCRYPTO
01132 if (md5) MD_5->Final();
01133 if (md5 || adlerchk) {
01134 print_chksum(src, bytesread, MD_5, adler);
01135 }
01136
01137 if (summary) {
01138 print_summary(src, dst, bytesread, MD_5, adler);
01139 }
01140 #else
01141 if (adlerchk) {
01142 print_chksum(src, bytesread, adler);
01143 }
01144
01145 if (summary) {
01146 print_summary(src, dst, bytesread, adler);
01147 }
01148 #endif
01149
01150 pthread_cancel(myTID);
01151 pthread_join(myTID, &thret);
01152
01153 delete *xrddest;
01154 *xrddest = 0;
01155
01156 close(cpnfo.localfile);
01157 cpnfo.localfile = 0;
01158
01159 return retvalue;
01160 }
01161
01162
01163 void PrintUsage() {
01164 cerr << "usage: xrdcp <source> <dest> "
01165 "[-d lvl] [-DSparmname stringvalue] ... [-DIparmname intvalue] [-s] [-ns] [-v]"
01166 " [-OS<opaque info>] [-OD<opaque info>] [-force] [-md5] [-adler] [-np] [-f] [-R] [-S]" << endl << endl;
01167
01168 cerr << "<source> can be:" << endl <<
01169 " a local file" << endl <<
01170 " a local directory name suffixed by /" << endl <<
01171 " an xrootd URL in the form root://user@host/<absolute Logical File Name in xrootd domain>" << endl <<
01172 " (can be a directory. In this case the -R option can be fully honored only on a standalone server)" << endl;
01173 cerr << "<dest> can be:" << endl <<
01174 " a local file" << endl <<
01175 " a local directory name suffixed by /" << endl <<
01176 " an xrootd URL in the form root://user@host/<absolute Logical File Name in xrootd domain>" << endl <<
01177 " (can be a directory LFN)" << endl << endl;
01178
01179 cerr << " -d lvl : debug level: 1 (low), 2 (medium), 3 (high)" << endl;
01180 cerr << " -D proxyaddr:proxyport" << endl <<
01181 " : use proxyaddr:proxyport as a SOCKS4 proxy."
01182 " Only numerical addresses are supported." << endl <<
01183 cerr << " -DSparmname stringvalue" << endl <<
01184 " : set the internal parm <parmname> with the string value <stringvalue>" << endl <<
01185 " See XrdClientConst.hh for a list of parameters." << endl;
01186 cerr << " -DIparmname intvalue" << endl <<
01187 " : set the internal parm <parmname> with the integer value <intvalue>" << endl <<
01188 " See XrdClientConst.hh for a list of parameters." << endl <<
01189 " Examples: -DIReadCacheSize 3000000 -DIReadAheadSize 1000000" << endl;
01190 cerr << " -s : silent mode, no summary output, no progress bar" << endl;
01191 cerr << " -np : no progress bar" << endl;
01192 cerr << " -v : display summary output" << endl <<endl;
01193 cerr << " -OS : adds some opaque information to any SOURCE xrootd url" << endl;
01194 cerr << " -OD : adds some opaque information to any DEST xrootd url" << endl;
01195 cerr << " -f : re-create a file if already present" << endl;
01196 cerr << " -F : set the 'force' flag for xrootd dest file opens"
01197 " (ignore if file is already opened)" << endl;
01198 cerr << " -force : set 1-min (re)connect attempts to retry for up to 1 week,"
01199 " to block until xrdcp is executed" << endl << endl;
01200 cerr << " -md5 : calculate the md5 checksum during transfers\n" << endl;
01201 #ifdef HAVE_LIBZ
01202 cerr << " -adler : calculate the adler32 checksum during transfers\n" << endl;
01203 #endif
01204 cerr << " -R : recurse subdirectories (where it can be applied)" << endl;
01205 cerr << " -S num : use <num> additional parallel streams to do the xfer." << endl <<
01206 " The max value is 15. The default is 0 (i.e. use only the main stream)" << endl;
01207 cerr << " -MLlibname" << endl <<
01208 " : use <libname> as external monitoring reporting library." << endl <<
01209 " The default name if XrdCpMonitorClient.so . Make sure it is reachable." << endl;
01210 cerr << " -X rdr : Activate the Xtreme copy algorithm. Use 'rdr' as hostname where to query for " << endl <<
01211 " additional sources." << endl;
01212 cerr << " -x : Activate the Xtreme copy algorithm. Use the source hostname to query for " << endl <<
01213 " additional sources." << endl;
01214 cerr << " -P : request POSC (persist-on-successful-close) processing to create a new file." << endl;
01215 cerr << " where:" << endl;
01216 cerr << " parmname is the name of an internal parameter" << endl;
01217 cerr << " stringvalue is a string to be assigned to an internal parameter" << endl;
01218 cerr << " intvalue is an int to be assigned to an internal parameter" << endl;
01219 }
01220
01221
01222
01223 int main(int argc, char**argv) {
01224 char *srcpath = 0, *destpath = 0;
01225 memset (BWMHost, 0, sizeof(BWMHost));
01226
01227 if (argc < 3) {
01228 PrintUsage();
01229 exit(1);
01230 }
01231
01232 #ifdef WIN32
01233 WORD wVersionRequested;
01234 WSADATA wsaData;
01235 int err;
01236 wVersionRequested = MAKEWORD( 2, 2 );
01237 err = WSAStartup( wVersionRequested, &wsaData );
01238 #endif
01239
01240 DebugSetLevel(-1);
01241
01242
01243
01244
01245
01246 EnvPutString( NAME_REDIRDOMAINALLOW_RE, "*" );
01247 EnvPutString( NAME_CONNECTDOMAINALLOW_RE, "*" );
01248 EnvPutString( NAME_REDIRDOMAINDENY_RE, "" );
01249 EnvPutString( NAME_CONNECTDOMAINDENY_RE, "" );
01250
01251 EnvPutInt( NAME_READAHEADSIZE, XRDCP_XRDRASIZE);
01252 EnvPutInt( NAME_READCACHESIZE, 2*XRDCP_XRDRASIZE );
01253 EnvPutInt( NAME_READCACHEBLKREMPOLICY, XrdClientReadCache::kRmBlk_LeastOffs );
01254 EnvPutInt( NAME_PURGEWRITTENBLOCKS, 1 );
01255
01256
01257 EnvPutInt( NAME_DEBUG, -1);
01258
01259 for (int i=1; i < argc; i++) {
01260
01261 if ( (strstr(argv[i], "-s") == argv[i])) {
01262 summary=false;
01263 progbar=false;
01264 continue;
01265 }
01266
01267 if ( (strstr(argv[i], "-np") == argv[i])) {
01268 progbar=false;
01269 continue;
01270 }
01271
01272 if ( (strstr(argv[i], "-v") == argv[i])) {
01273 summary=true;
01274 continue;
01275 }
01276
01277 if ( (strstr(argv[i], "-R") == argv[i])) {
01278 recurse=true;
01279 continue;
01280 }
01281
01282 if ( (strstr(argv[i], "-OS") == argv[i])) {
01283 srcopaque=argv[i]+3;
01284 continue;
01285 }
01286
01287 if ( (strstr(argv[i], "-OD") == argv[i])) {
01288 dstopaque=argv[i]+3;
01289 continue;
01290 }
01291
01292 if ( (strstr(argv[i], "-F") == argv[i])) {
01293 xrd_wr_flags |= kXR_force;
01294 continue;
01295 }
01296
01297 if ( (strstr(argv[i], "-P") == argv[i])) {
01298 xrd_wr_flags |= kXR_posc;
01299 continue;
01300 }
01301
01302 if ( (strstr(argv[i], "-f") == argv[i])) {
01303
01304 kXR_unt16 tmp = kXR_new;
01305 tmp = ~tmp;
01306
01307 xrd_wr_flags &= tmp;
01308
01309
01310 xrd_wr_flags |= kXR_delete;
01311
01312
01313
01314 loc_wr_flags = LOC_WR_FLAGS_FORCE;
01315
01316 continue;
01317 }
01318
01319 if ( (strstr(argv[i], "-force") == argv[i])) {
01320 EnvPutInt( NAME_CONNECTTIMEOUT , 60);
01321 EnvPutInt( NAME_FIRSTCONNECTMAXCNT, 7*24*60);
01322 continue;
01323 }
01324
01325
01326 if ( (strstr(argv[i], "-DS") == argv[i]) &&
01327 (argc >= i+2) ) {
01328 cerr << "Overriding " << argv[i]+3 << " with value " << argv[i+1] << ". ";
01329 EnvPutString( argv[i]+3, argv[i+1] );
01330 cerr << " Final value: " << EnvGetString(argv[i]+3) << endl;
01331 i++;
01332 continue;
01333 }
01334
01335 if ( (strstr(argv[i], "-DI") == argv[i]) &&
01336 (argc >= i+2) ) {
01337 cerr << "Overriding '" << argv[i]+3 << "' with value " << argv[i+1] << ". ";
01338 EnvPutInt( argv[i]+3, atoi(argv[i+1]) );
01339 cerr << " Final value: " << EnvGetLong(argv[i]+3) << endl;
01340 i++;
01341 continue;
01342 }
01343
01344
01345 if ( (strstr(argv[i], "-D") == argv[i]) &&
01346 (argc >= i+2) ) {
01347
01348 char host[1024];
01349 char *pos;
01350 int port;
01351
01352 pos = strstr(argv[i+1], ":");
01353
01354 if (pos && strlen(pos) > 1) {
01355
01356 cerr << "Using '" << argv[i+1] << "' as a SOCKS4 proxy.";
01357 strncpy(host, argv[i+1], pos-argv[i+1]);
01358 host[pos-argv[i+1]] = 0;
01359
01360 sscanf(pos+1, "%d", &port);
01361
01362 cerr << " Host:" << host << " port: " << port << endl;
01363 EnvPutString( NAME_SOCKS4HOST, host);
01364 EnvPutInt( NAME_SOCKS4PORT, port);
01365 }
01366 else {
01367 cerr << "Malformed -D option." << endl;
01368 exit(-1);
01369 }
01370
01371 i++;
01372 continue;
01373 }
01374
01375
01376 if ( (strstr(argv[i], "-d") == argv[i]) &&
01377 (argc >= i+2) ) {
01378 int dbglvl = atoi(argv[i+1]);
01379 if (dbglvl > 0) {
01380 EnvPutInt( NAME_DEBUG, dbglvl);
01381 cerr << "Set debug level " << EnvGetLong(NAME_DEBUG)<< endl;
01382 }
01383 i++;
01384 continue;
01385 }
01386
01387 if ( (strstr(argv[i], "-S") == argv[i]) &&
01388 (argc >= i+2) ) {
01389 int parstreams = atoi(argv[i+1]);
01390 parstreams = xrdmin(parstreams, 15);
01391 parstreams = xrdmax(0, parstreams);
01392
01393 EnvPutInt( NAME_MULTISTREAMCNT, parstreams);
01394
01395 cerr << "Set " << NAME_MULTISTREAMCNT << " to " <<
01396 EnvGetLong(NAME_MULTISTREAMCNT) << endl;
01397
01398 i++;
01399 continue;
01400 }
01401
01402 if ( (strstr(argv[i], "-X") == argv[i]) &&
01403 (argc >= i+2) ) {
01404 doXtremeCp = true;
01405 XtremeCpRdr = argv[i+1];
01406
01407 cerr << "Extreme Copy enabled. XtremeCpRdr: " << XtremeCpRdr << endl;
01408
01409
01410 i++;
01411 continue;
01412 }
01413
01414 if ( (strstr(argv[i], "-x") == argv[i]) ) {
01415 doXtremeCp = true;
01416 XtremeCpRdr = "";
01417
01418 cerr << "Extreme Copy enabled. " << endl;
01419
01420 continue;
01421 }
01422
01423 #ifdef HAVE_XRDCRYPTO
01424 #ifndef WIN32
01425 if ( (strstr(argv[i], "-md5") == argv[i])) {
01426 md5=true;
01427
01428 if (!(gCryptoFactory = XrdCryptoFactory::GetCryptoFactory("ssl"))) {
01429 cerr << "Cannot instantiate crypto factory ssl" << endl;
01430 exit(-1);
01431 }
01432
01433 MD_5 = gCryptoFactory->MsgDigest("md5");
01434 if (! MD_5) {
01435 cerr << "MD object could not be instantiated " << endl;
01436 exit(-1);
01437 }
01438 continue;
01439 }
01440 #endif
01441 #endif
01442
01443 #ifdef HAVE_LIBZ
01444 if ( (strstr(argv[i], "-adler") == argv[i])) {
01445 adlerchk=true;
01446 continue;
01447 }
01448 #endif
01449
01450
01451 if ( (strstr(argv[i], "-") == argv[i]) && (strlen(argv[i]) > 1) ) {
01452 cerr << "Unknown parameter " << argv[i] << endl;
01453 continue;
01454 }
01455
01456 if (!srcpath) srcpath = argv[i];
01457 else
01458 if (!destpath) destpath = argv[i];
01459
01460
01461 }
01462
01463 if (!srcpath || !destpath) {
01464 PrintUsage();
01465 exit(1);
01466 }
01467
01468 if (XtremeCpRdr == "") XtremeCpRdr = srcpath;
01469
01470 DebugSetLevel(EnvGetLong(NAME_DEBUG));
01471
01472 Info(XrdClientDebug::kUSERDEBUG, "main", XRDCP_VERSION);
01473
01474 XrdCpWorkLst *wklst = new XrdCpWorkLst();
01475 XrdOucString src, dest;
01476 XrdClient *xrddest;
01477
01478 cpnfo.XrdCli = 0;
01479
01480 if (wklst->SetSrc(&cpnfo.XrdCli, srcpath, srcopaque, recurse)) {
01481 cerr << "Error accessing path/file for " << srcpath << endl;
01482 exit(1);
01483 }
01484
01485 xrddest = 0;
01486
01487
01488
01489
01490 if (wklst->SetDest(&xrddest, destpath, dstopaque, xrd_wr_flags)) {
01491 cerr << "Error accessing path/file for " << destpath << endl;
01492 PrintLastServerError(xrddest);
01493 exit(1);
01494 }
01495
01496 int retval = 0;
01497 while (!retval && wklst->GetCpJob(src, dest)) {
01498 Info(XrdClientDebug::kUSERDEBUG, "main", src << " --> " << dest);
01499
01500 #ifdef HAVE_XRDCRYPTO
01501 if (md5) {
01502 MD_5->Reset("md5");
01503 }
01504 #endif
01505 adler = 0;
01506
01507
01508
01509 cpnfo.mon = 0;
01510 #ifndef WIN32
01511 void *monhandle = dlopen (monlibname.c_str(), RTLD_LAZY);
01512
01513 if (monhandle) {
01514 XrdClientMonIntfHook monlibhook = (XrdClientMonIntfHook)dlsym(monhandle, "XrdClientgetMonIntf");
01515
01516 const char *err = 0;
01517 if ((err = dlerror())) {
01518 cerr << "Error accessing monitoring client library " << monhandle << ". Inappropriate content." << endl <<
01519 "error: " << err << endl;
01520 dlclose(monhandle);
01521 monhandle = 0;
01522 }
01523 else
01524 cpnfo.mon = (XrdClientAbsMonIntf *)monlibhook(src.c_str(), dest.c_str());
01525 }
01526 #endif
01527
01528 if (cpnfo.mon) {
01529
01530 char *name=0, *ver=0, *rem=0;
01531 if (!cpnfo.mon->GetMonLibInfo(&name, &ver, &rem)) {
01532 Info(XrdClientDebug::kUSERDEBUG,
01533 "main", "Monitoring client plugin found. Name:'" << name <<
01534 "' Ver:'" << ver << "' Remarks:'" << rem << "'");
01535 }
01536 else {
01537 delete cpnfo.mon;
01538 cpnfo.mon = 0;
01539 }
01540
01541 }
01542
01543 #ifndef WIN32
01544 if (!cpnfo.mon && monhandle) {
01545 dlclose(monhandle);
01546 monhandle = 0;
01547 }
01548 #endif
01549
01550
01551 if (cpnfo.mon) {
01552
01553 cpnfo.mon->Init(src.c_str(), dest.c_str(), (DebugLevel() > 0) );
01554 cpnfo.mon->PutProgressInfo(0, cpnfo.len, 0, 1);
01555 }
01556
01557 if ( (src.beginswith("root://")) || (src.beginswith("xroot://")) ) {
01558
01559
01560 if (srcopaque) {
01561 src += "?";
01562 src += srcopaque;
01563 }
01564
01565 if ( (dest.beginswith("root://")) || (dest.beginswith("xroot://")) ) {
01566 XrdOucString d;
01567 bool isd;
01568 wklst->GetDest(d, isd);
01569
01570 BuildFullDestFilename(src, d, isd);
01571
01572 if (dstopaque) {
01573 d += "?";
01574 d += dstopaque;
01575 }
01576
01577 retval = doCp_xrd2xrd(&xrddest, src.c_str(), d.c_str());
01578
01579 }
01580 else {
01581 XrdOucString d;
01582 bool isd;
01583 int res;
01584 wklst->GetDest(d, isd);
01585 res = CreateDestPath_loc(d, isd);
01586 if (!res || (errno == EEXIST) || !errno) {
01587 BuildFullDestFilename(src, d, isd);
01588 retval = doCp_xrd2loc(src.c_str(), d.c_str());
01589 }
01590 else
01591 cerr << "Error " << strerror(errno) <<
01592 " accessing path for " << d << endl;
01593 }
01594 }
01595 else {
01596
01597
01598 if ( (dest.beginswith("root://")) || (dest.beginswith("xroot://")) ) {
01599 XrdOucString d;
01600 bool isd;
01601 wklst->GetDest(d, isd);
01602
01603 BuildFullDestFilename(src, d, isd);
01604
01605 if (dstopaque) {
01606 d += "?";
01607 d += dstopaque;
01608 }
01609
01610 retval = doCp_loc2xrd(&xrddest, src.c_str(), d.c_str());
01611
01612 }
01613 else {
01614 cerr << "Better to use cp in this case. (dest: "<<dest<<")" << endl;
01615 exit(2);
01616 }
01617
01618 }
01619
01620
01621 if (cpnfo.mon) {
01622 cpnfo.mon->DeInit();
01623 delete cpnfo.mon;
01624 cpnfo.mon = 0;
01625 #ifndef WIN32
01626 if (monhandle) dlclose(monhandle);
01627 monhandle = 0;
01628 #endif
01629 }
01630
01631
01632 }
01633
01634 #ifdef HAVE_XRDCRYPTO
01635 if (md5 && MD_5)
01636 delete MD_5;
01637 #endif
01638
01639 return retval;
01640 }