Xrdcp.cc

Go to the documentation of this file.
00001 //////////////////////////////////////////////////////////////////////////
00002 //                                                                      //
00003 // xrdcp                                                                //
00004 //                                                                      //
00005 // Author: Fabrizio Furano (INFN Padova, 2004)                          //
00006 //                                                                      //
00007 // A cp-like command line tool for xrootd environments                  //
00008 //                                                                      //
00009 //////////////////////////////////////////////////////////////////////////
00010 
00011 //       $Id: Xrdcp.cc 38011 2011-02-08 18:35:57Z ganis $
00012 
00013 const char *XrdcpCVSID = "$Id: Xrdcp.cc 38011 2011-02-08 18:35:57Z ganis $";
00014 
00015 #include "XrdClient/XrdClientUrlInfo.hh"
00016 #include "XrdSys/XrdSysPthread.hh"
00017 #include "XrdClient/XrdClient.hh"
00018 #include "XrdClient/XrdCpMthrQueue.hh"
00019 #include "XrdClient/XrdClientDebug.hh"
00020 #include "XrdClient/XrdCpWorkLst.hh"
00021 #include "XrdClient/XrdClientEnv.hh"
00022 #include "XrdSys/XrdSysPlatform.hh"
00023 
00024 #include <XrdCrypto/XrdCryptoFactory.hh>
00025 #include <XrdCrypto/XrdCryptoMsgDigest.hh>
00026 
00027 #include "XrdClient/XrdClientAbsMonIntf.hh"
00028 #include "XrdClient/XrdcpXtremeRead.hh"
00029 
00030 #include <sys/types.h>
00031 #include <sys/stat.h>
00032 #include <fcntl.h>
00033 #ifndef WIN32
00034 #include <sys/time.h>
00035 #include <unistd.h>
00036 #include <dlfcn.h>
00037 #endif
00038 #include <stdarg.h>
00039 #include <stdio.h>
00040 
00041 #ifdef HAVE_LIBZ
00042 #include <zlib.h>
00043 #endif
00044 
00045 extern "C" {
00046 /////////////////////////////////////////////////////////////////////
00047 // function + macro to allow formatted print via cout,cerr
00048 /////////////////////////////////////////////////////////////////////
// printf-style formatted print to cout.
// The text is formatted into a fixed 4096-byte buffer; output that does not
// fit is truncated instead of overflowing the stack.
void cout_print(const char *format, ...)
{
   char cout_buff[4096];
   cout_buff[0] = '\0';   // stays a valid empty string if formatting fails

   va_list args;
   va_start(args, format);
   // Bounded write: never stores more than sizeof(cout_buff) bytes
   vsnprintf(cout_buff, sizeof(cout_buff), format, args);
   va_end(args);

   cout << cout_buff;
}
00058 
// printf-style formatted print to cerr.
// The text is formatted into a fixed 4096-byte buffer; output that does not
// fit is truncated instead of overflowing the stack.
void cerr_print(const char *format, ...)
{
   char cerr_buff[4096];
   cerr_buff[0] = '\0';   // stays a valid empty string if formatting fails

   va_list args;
   va_start(args, format);
   // Bounded write: never stores more than sizeof(cerr_buff) bytes
   vsnprintf(cerr_buff, sizeof(cerr_buff), format, args);
   va_end(args);

   cerr << cerr_buff;
}
00068 
00069 #define COUT(s) do {                            \
00070       cout_print s;                             \
00071    } while (0)
00072 
00073 #define CERR(s) do {                            \
00074       cerr_print s;                             \
00075    } while (0)
00076 
00077 }
00078 //////////////////////////////////////////////////////////////////////
00079 
00080 
00081 struct XrdCpInfo {
00082    XrdClient                    *XrdCli;
00083    int                          localfile;
00084    long long                    len, bread, bwritten;
00085    XrdCpMthrQueue               queue;
00086    XrdClientAbsMonIntf          *mon;
00087 } cpnfo;
00088 
// Size of one transfer block handed from a reader thread to the writer
#define XRDCP_BLOCKSIZE          (8*1024*1024)
// Read-ahead size, expressed as a multiple of the block size
#define XRDCP_XRDRASIZE          (30*XRDCP_BLOCKSIZE)
// Version banner. The blank before XrdVSTRING is required: without it a
// C++11 compiler parses the macro name as a user-defined-literal suffix.
#define XRDCP_VERSION            "(C) 2004-2010 by the Xrootd group. $Revision$ - Xrootd version: " XrdVSTRING
00092 
00093 ///////////////////////////////////////////////////////////////////////
00094 // Coming from parameters on the cmd line
00095 
00096 bool summary=false;            // print summary
00097 bool progbar=true;             // print progbar
00098 bool md5=false;                // print md5
00099 bool adlerchk=false;           // print adler32 chksum
00100 
00101 XrdOucString monlibname = "libXrdCpMonitorClient.so"; // Default name for the ext monitoring lib
00102 
00103 char *srcopaque=0,
00104    *dstopaque=0;   // opaque info to be added to urls
00105 // Default open flags for opening a file (xrd)
00106 kXR_unt16 xrd_wr_flags=kXR_async | kXR_mkpath | kXR_open_updt | kXR_new;
00107 
00108 // Flags for open() to force overwriting or not. Default is not.
00109 #define LOC_WR_FLAGS_FORCE ( O_CREAT | O_WRONLY | O_TRUNC | O_BINARY );
00110 #define LOC_WR_FLAGS       ( O_CREAT | O_WRONLY | O_EXCL | O_BINARY );
00111 int loc_wr_flags = LOC_WR_FLAGS;
00112 
00113 bool recurse = false;
00114 
00115 char BWMHost[1024]; // The given bandwidth limiter on the local site. If not empty then a bwm has to be used
00116 
00117 bool doXtremeCp = false;
00118 XrdOucString XtremeCpRdr;
00119 
00120 ///////////////////////
00121 
00122 // To compute throughput etc
00123 struct timeval abs_start_time;
00124 struct timeval abs_stop_time;
00125 struct timezone tz;
00126 
00127 #ifdef HAVE_XRDCRYPTO
00128 // To calculate md5 sums during transfers
00129 XrdCryptoMsgDigest *MD_5=0;    // md5 computation
00130 XrdCryptoFactory *gCryptoFactory = 0;
00131 #endif
00132 
00133 // To calculate the adler32 cksum
00134 unsigned int adler = 0;
00135 
00136 #ifdef HAVE_XRDCRYPTO
00137 void print_summary(const char* src, const char* dst, unsigned long long bytesread, XrdCryptoMsgDigest* _MD_5, unsigned int adler ) {
00138 #else
00139 void print_summary(const char* src, const char* dst, unsigned long long bytesread, unsigned int adler ) {
00140 #endif
00141    gettimeofday (&abs_stop_time, &tz);
00142    float abs_time=((float)((abs_stop_time.tv_sec - abs_start_time.tv_sec) *1000 +
00143                            (abs_stop_time.tv_usec - abs_start_time.tv_usec) / 1000));
00144 
00145 
00146    XrdOucString xsrc(src);
00147    XrdOucString xdst(dst);
00148    xsrc.erase(xsrc.rfind('?'));
00149    xdst.erase(xdst.rfind('?'));
00150 
00151    COUT(("[xrdcp] #################################################################\n"));
00152    COUT(("[xrdcp] # Source Name              : %s\n",xsrc.c_str()));
00153    COUT(("[xrdcp] # Destination Name         : %s\n",xdst.c_str()));
00154    COUT(("[xrdcp] # Data Copied [bytes]      : %lld\n",bytesread));
00155    COUT(("[xrdcp] # Realtime [s]             : %f\n",abs_time/1000.0));
00156    if (abs_time > 0) {
00157       COUT(("[xrdcp] # Eff.Copy. Rate[MB/s]     : %f\n",bytesread/abs_time/1000.0));
00158    }
00159 #ifdef HAVE_XRDCRYPTO
00160 #ifndef WIN32
00161    if (md5) {
00162      COUT(("[xrdcp] # md5                      : %s\n",_MD_5->AsHexString()));
00163    }
00164 #endif
00165 #endif
00166    if (adlerchk) {
00167       COUT(("[xrdcp] # adler32                  : %x\n", adler));
00168    }
00169    COUT(("[xrdcp] #################################################################\n"));
00170 }
00171 
00172 void print_progbar(unsigned long long bytesread, unsigned long long size) {
00173    CERR(("[xrootd] Total %.02f MB\t|",(float)size/1024/1024));
00174    for (int l=0; l< 20;l++) {
00175       if (l< ( (int)(20.0*bytesread/size)))
00176          CERR(("="));
00177       if (l==( (int)(20.0*bytesread/size)))
00178          CERR((">"));
00179       if (l> ( (int)(20.0*bytesread/size)))
00180          CERR(("."));
00181    }
00182   
00183    float abs_time=((float)((abs_stop_time.tv_sec - abs_start_time.tv_sec) *1000 +
00184                            (abs_stop_time.tv_usec - abs_start_time.tv_usec) / 1000));
00185    CERR(("| %.02f %% [%.01f MB/s]\r",100.0*bytesread/size,bytesread/abs_time/1000.0));
00186 }
00187 
00188 #ifdef HAVE_XRDCRYPTO
00189 void print_chksum(const char* src, unsigned long long bytesread, XrdCryptoMsgDigest* _MD_5, unsigned adler) {
00190   if (_MD_5 || adlerchk) {
00191 #else
00192 void print_chksum(const char* src, unsigned long long bytesread, unsigned adler) {
00193   if (adlerchk) {
00194 #endif
00195     XrdOucString xsrc(src);
00196     xsrc.erase(xsrc.rfind('?'));
00197     //    printf("md5: %s\n",_MD_5->AsHexString());
00198 #ifdef HAVE_XRDCRYPTO
00199 #ifndef WIN32
00200     if (_MD_5)
00201        cout << "md5: " << _MD_5->AsHexString() << " " << xsrc << " " << bytesread << endl;
00202 #endif
00203 #endif
00204     if (adlerchk)
00205        cout << "adler32: " << hex << adler << " " << xsrc << bytesread << endl;
00206 
00207   }
00208 }
00209 
00210 
00211 
00212 
00213 // The body of a thread which reads from the global
00214 //  XrdClient and keeps the queue filled
00215 //____________________________________________________________________________
00216 void *ReaderThread_xrd(void *)
00217 {
00218 
00219    Info(XrdClientDebug::kHIDEBUG,
00220         "ReaderThread_xrd",
00221         "Reader Thread starting.");
00222    
00223    pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, 0);
00224    pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, 0);
00225 
00226 
00227    void *buf;
00228    long long offs = 0;
00229    int nr = 1;
00230    long long bread = 0, len = 0;
00231    long blksize;
00232 
00233    len = cpnfo.len;
00234 
00235    while ((nr > 0) && (offs < len)) {
00236       buf = malloc(XRDCP_BLOCKSIZE);
00237       if (!buf) {
00238          cerr << "Out of memory." << endl;
00239          abort();
00240       }
00241 
00242       
00243       blksize = xrdmin(XRDCP_BLOCKSIZE, len-offs);
00244 
00245       if ( (nr = cpnfo.XrdCli->Read(buf, offs, blksize)) ) {
00246          cpnfo.queue.PutBuffer(buf, offs, nr);
00247          cpnfo.XrdCli->RemoveDataFromCache(offs, offs+nr-1, false);
00248          bread += nr;
00249          offs += nr;
00250       }
00251 
00252       pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 0);
00253       pthread_testcancel();
00254       pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, 0);
00255    }
00256 
00257    cpnfo.bread = bread;
00258 
00259    // This ends the transmission... bye bye
00260    cpnfo.queue.PutBuffer(0, 0, 0);
00261 
00262    return 0;
00263 }
00264 
00265 
00266 
00267 
00268 // The body of a thread which reads from the global
00269 //  XrdClient and keeps the queue filled
00270 // This is the thread for extreme reads, in this case we may have multiple of these
00271 // threads, reading the same file from different server endpoints
00272 //____________________________________________________________________________
00273 struct xtreme_threadnfo {
00274    XrdXtRdFile *xtrdhandler;
00275 
00276    // The client used by this thread
00277    XrdClient *cli;
00278 
00279    // A unique integer identifying the client instance
00280    int clientidx;
00281 
00282    // The block from which to start prefetching/reading
00283    int startfromblk;
00284 
00285    // Max convenient number of outstanding blks
00286    int maxoutstanding;
00287 }; 
00288 void *ReaderThread_xrd_xtreme(void *parm)
00289 {
00290 
00291    Info(XrdClientDebug::kHIDEBUG,
00292         "ReaderThread_xrd_xtreme",
00293         "Reader Thread starting.");
00294    
00295    pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, 0);
00296    pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, 0);
00297 
00298    void *buf;
00299 
00300    int nr = 1;
00301    int noutstanding = 0;
00302 
00303 
00304    // Which block to read
00305    XrdXtRdBlkInfo *blknfo = 0;
00306    xtreme_threadnfo *thrnfo = (xtreme_threadnfo *)parm;
00307 
00308    // Block to prefetch
00309    int lastprefetched = thrnfo->startfromblk;
00310    int lastread = lastprefetched;
00311 
00312    thrnfo->cli->Open(0, 0, true);
00313 
00314    thrnfo->cli->SetCacheParameters(XRDCP_BLOCKSIZE*4*thrnfo->maxoutstanding*2, 0, XrdClientReadCache::kRmBlk_FIFO);
00315    if (thrnfo->cli->IsOpen_wait())
00316    while (nr > 0) {
00317 
00318       // Keep always some blocks outstanding from the point of view of this reader
00319       while (noutstanding < thrnfo->maxoutstanding) {
00320          int lp;
00321          lp = thrnfo->xtrdhandler->GetBlkToPrefetch(lastprefetched, thrnfo->clientidx, blknfo);
00322          if (lp >= 0) {
00323             //cout << "cli: " << thrnfo->clientidx << " prefetch: " << lp << " offs: " << blknfo->offs << " len: " << blknfo->len << endl;
00324             if ( thrnfo->cli->Read_Async(blknfo->offs, blknfo->len) == kOK ) {  
00325                lastprefetched = lp;
00326                noutstanding++;
00327             }
00328             else break;
00329          }
00330          else break;
00331       }
00332 
00333       int lr = thrnfo->xtrdhandler->GetBlkToRead(lastread, thrnfo->clientidx, blknfo);
00334       if (lr >= 0) {
00335 
00336          buf = malloc(blknfo->len);
00337          if (!buf) {
00338             cerr << "Out of memory." << endl;
00339             abort();
00340          }
00341 
00342          //cout << "cli: " << thrnfo->clientidx << "     read: " << lr << " offs: " << blknfo->offs << " len: " << blknfo->len << endl;
00343 
00344          // It is very important that the search for a blk to read starts from the first block upwards
00345          nr = thrnfo->cli->Read(buf, blknfo->offs, blknfo->len);
00346          if ( nr >= 0 ) {
00347             lastread = lr;
00348             noutstanding--;
00349 
00350             // If this block was stolen by somebody else then this client has to be penalized
00351             // If this client stole the blk to some other client, then this client has to be rewarded
00352             int reward = thrnfo->xtrdhandler->MarkBlkAsRead(lr);
00353             if (reward >= 0) 
00354                // Enqueue the block only if it was not already read
00355                cpnfo.queue.PutBuffer(buf, blknfo->offs, nr);
00356 
00357             if (reward > 0) {
00358                thrnfo->maxoutstanding++;
00359                thrnfo->maxoutstanding = xrdmin(20, thrnfo->maxoutstanding);
00360                thrnfo->cli->SetCacheParameters(XRDCP_BLOCKSIZE*4*thrnfo->maxoutstanding*2, 0, XrdClientReadCache::kRmBlk_FIFO);
00361             }
00362             if (reward < 0) {
00363                thrnfo->maxoutstanding--;
00364                free(buf);
00365             }
00366 
00367             if (thrnfo->maxoutstanding <= 0) {
00368                sleep(1);
00369                thrnfo->maxoutstanding = 1;
00370             }
00371 
00372          }
00373 
00374          // It is very important that the search for a blk to read starts from the first block upwards
00375          thrnfo->cli->RemoveDataFromCache(blknfo->offs, blknfo->offs+blknfo->len-1, false);
00376       }
00377       else {
00378 
00379          if (thrnfo->xtrdhandler->AllDone()) break;
00380          pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 0);
00381          sleep(1);
00382       }
00383 
00384 
00385       pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 0);
00386       pthread_testcancel();
00387       pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, 0);
00388    }
00389 
00390    // We get here if there are no more blocks to read or to steal from other readers
00391    // This ends the transmission... bye bye
00392    cpnfo.queue.PutBuffer(0, 0, 0);
00393 
00394    return 0;
00395 }
00396 
00397 
00398 // The body of a thread which reads from the global filehandle
00399 //  and keeps the queue filled
00400 //____________________________________________________________________________
00401 void *ReaderThread_loc(void *) {
00402 
00403    Info(XrdClientDebug::kHIDEBUG,
00404         "ReaderThread_loc",
00405         "Reader Thread starting.");
00406 
00407    pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, 0);
00408    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 0);
00409 
00410    void *buf;
00411    long long offs = 0;
00412    int nr = 1;
00413    long long bread = 0;
00414 
00415    while (nr > 0) {
00416       buf = malloc(XRDCP_BLOCKSIZE);
00417       if (!buf) {
00418          cerr << "Out of memory." << endl;
00419          abort();
00420       }
00421 
00422       if ( (nr = read(cpnfo.localfile, buf, XRDCP_BLOCKSIZE)) ) {
00423          cpnfo.queue.PutBuffer(buf, offs, nr);
00424          bread += nr;
00425          offs += nr;
00426       }
00427    }
00428 
00429    cpnfo.bread = bread;
00430 
00431    // This ends the transmission... bye bye
00432    cpnfo.queue.PutBuffer(0, 0, 0);
00433 
00434    return 0;
00435 }
00436 
00437 
00438 int CreateDestPath_loc(XrdOucString path, bool isdir) {
00439    // We need the path name without the file
00440    if (!isdir) {
00441       int pos = path.rfind('/');
00442 
00443       if (pos != STR_NPOS)
00444          path.erase(pos);
00445       else path = "";
00446 
00447 
00448    }
00449 
00450    if (path != "")
00451       return ( MAKEDIR(
00452                      path.c_str(),
00453                      S_IRUSR | S_IWUSR | S_IXUSR |
00454                      S_IRGRP | S_IWGRP | S_IXGRP |
00455                      S_IROTH | S_IXOTH)
00456                );
00457    else
00458       return 0;
00459 
00460 }
00461    
00462 void BuildFullDestFilename(XrdOucString &src, XrdOucString &dest, bool destisdir) {
00463    if (destisdir) {
00464       // We need the filename from the source
00465       XrdOucString fn(src);
00466       fn.erase(fn.find('?'));
00467       int lsl = fn.rfind('/');
00468       if (lsl != STR_NPOS)
00469          fn.erase(0, lsl+1);
00470       dest += fn;
00471    }
00472 }
00473 
00474 int CreateDestPath_xrd(XrdOucString url, bool isdir) {
00475    // We need the path name without the file
00476    bool statok = FALSE, done = FALSE, direxists = TRUE;
00477    long id, flags, modtime;
00478    long long size;
00479    char *path, *slash;
00480 
00481    if (url == "-") return 0;
00482 
00483    //   if (!isdir)
00484    url.erase(url.rfind('/') + 1);
00485 
00486    XrdClientAdmin *adm = new XrdClientAdmin(url.c_str());
00487    if (adm->Connect()) {
00488      XrdClientUrlInfo u(url);
00489 
00490      statok = adm->Stat((char *)u.File.c_str(), id, size, flags, modtime);
00491 
00492      // We might have been redirected to a destination server. Better to remember it and use
00493      //  only this one as output.
00494      if (adm->GetCurrentUrl().IsValid()) {
00495         u.Host = adm->GetCurrentUrl().Host;
00496         u.Port = adm->GetCurrentUrl().Port;
00497         url = u.GetUrl();
00498      }
00499 
00500      path = (char *)u.File.c_str();
00501      slash = path;
00502 
00503      // FIXME: drop the top level directory as it cannot be stat by the xrootd server
00504      slash += strspn(slash, "/");
00505      slash += strcspn(slash, "/");
00506      
00507      // If the path already exists, it's good
00508      done = (statok && (flags & kXR_isDir));
00509 
00510      // The idea of slash pointer is taken from the BSD mkdir implementation
00511      while (!done) {
00512        slash += strspn(slash, "/");
00513        slash += strcspn(slash, "/");
00514        
00515        char nextChar = *(slash+1);
00516        done = (*slash == '\0' || nextChar == '\0');
00517        *(slash+1) = '\0';
00518 
00519        if (direxists) {
00520          statok = adm->Stat(path, id, size, flags, modtime);
00521          if (!statok || (!(flags & kXR_xset) && !(flags & kXR_other))) {
00522            direxists = FALSE;
00523          }
00524        }
00525          
00526        if (!direxists) {
00527          Info(XrdClientDebug::kHIDEBUG,
00528               "CreateDestPath__xrd",
00529               "Creating directory " << path);
00530          
00531          adm->Mkdir(path, 7, 5, 5);
00532          
00533        }
00534        *(slash+1) = nextChar;
00535      }
00536    }
00537 
00538    delete adm;
00539    return 0;
00540 }
00541 
00542 int doCp_xrd2xrd(XrdClient **xrddest, const char *src, const char *dst) {
00543    // ----------- xrd to xrd affair
00544    pthread_t myTID;
00545    XrdClientVector<pthread_t> myTIDVec;
00546 
00547    void *thret;
00548    XrdClientStatInfo stat;
00549    int retvalue = 0;
00550 
00551    gettimeofday(&abs_start_time,&tz);
00552 
00553    // Open the input file (xrdc)
00554    // If Xrdcli is non-null, the correct src file has already been opened
00555    if (!cpnfo.XrdCli) {
00556       cpnfo.XrdCli = new XrdClient(src);
00557       if ( ( !cpnfo.XrdCli->Open(0, kXR_async) ||
00558              (cpnfo.XrdCli->LastServerResp()->status != kXR_ok) ) ) {
00559          cerr << "Error opening remote source file " << src << endl;
00560          PrintLastServerError(cpnfo.XrdCli);
00561 
00562          delete cpnfo.XrdCli;
00563          cpnfo.XrdCli = 0;
00564          return 1;
00565       }
00566    }
00567 
00568    
00569    cpnfo.XrdCli->Stat(&stat);
00570    cpnfo.len = stat.size;
00571    
00572    // if xrddest if nonzero, then the file is already opened for writing
00573    if (!*xrddest) {
00574       *xrddest = new XrdClient(dst);
00575       
00576       if (!PedanticOpen4Write(*xrddest, kXR_ur | kXR_uw | kXR_gw | kXR_gr | kXR_or,
00577                               xrd_wr_flags)) {
00578          cerr << "Error opening remote destination file " << dst << endl;
00579          PrintLastServerError(*xrddest);
00580          
00581          delete cpnfo.XrdCli;
00582          delete *xrddest;
00583          *xrddest = 0;
00584          cpnfo.XrdCli = 0;
00585          return -1;
00586       }
00587       
00588    }
00589    
00590    // If the Extreme Copy flag is set, we try to find more sources for this file
00591    // Each source gets assigned to a different reader thread
00592    XrdClientVector<XrdClient *> xtremeclients;
00593    XrdXtRdFile *xrdxtrdfile = 0;
00594    
00595    if (doXtremeCp) 
00596       XrdXtRdFile::GetListOfSources(cpnfo.XrdCli, XtremeCpRdr, xtremeclients);
00597    
00598    // Start reader on xrdc
00599    if (doXtremeCp && (xtremeclients.GetSize() > 1)) {
00600       
00601       // Beware... with the extreme copy the normal read ahead mechanism
00602       // makes no sense at all.
00603       //EnvPutInt(NAME_REMUSEDCACHEBLKS, 1);
00604       xrdxtrdfile = new XrdXtRdFile(XRDCP_BLOCKSIZE*4, cpnfo.len);
00605       
00606       for (int iii = 0; iii < xtremeclients.GetSize(); iii++) {
00607          xtreme_threadnfo *nfo = new(xtreme_threadnfo);
00608          nfo->xtrdhandler = xrdxtrdfile;
00609          nfo->cli = xtremeclients[iii];
00610          nfo->clientidx = xrdxtrdfile->GimmeANewClientIdx();
00611          nfo->startfromblk = iii*xrdxtrdfile->GetNBlks() / xtremeclients.GetSize();
00612          nfo->maxoutstanding = xrdmin( 5, xrdxtrdfile->GetNBlks() / xtremeclients.GetSize() );
00613 
00614          XrdSysThread::Run(&myTID, ReaderThread_xrd_xtreme, (void *)nfo);
00615          myTIDVec.Push_back(myTID);
00616       }
00617       
00618    }
00619    else {
00620       XrdSysThread::Run(&myTID, ReaderThread_xrd, (void *)&cpnfo);
00621       myTIDVec.Push_back(myTID);
00622    }
00623    
00624    
00625    
00626    
00627    
00628    int len = 1;
00629    void *buf;
00630    long long offs = 0;
00631    long long bytesread=0;
00632    long long size = cpnfo.len;
00633    bool draining = false;
00634    
00635    // Loop to write until ended or timeout err
00636    while (1) {
00637       
00638       if (xrdxtrdfile && xrdxtrdfile->AllDone()) draining = true;
00639       if (draining && !cpnfo.queue.GetLength()) break;
00640 
00641       if ( cpnfo.queue.GetBuffer(&buf, offs, len) ) {
00642 
00643          if (len && buf) {
00644 
00645             bytesread+=len;
00646             if (progbar) {
00647                gettimeofday(&abs_stop_time,&tz);
00648                print_progbar(bytesread,size);
00649             }
00650 
00651 #ifdef HAVE_XRDCRYPTO
00652             if (md5) {
00653                MD_5->Update((const char*)buf,len);
00654             }
00655 #endif
00656 
00657 #ifdef HAVE_LIBZ
00658             if (adlerchk) {
00659                adler = adler32(adler, (const Bytef*)buf, len);
00660             }
00661 #endif
00662 
00663             if (!(*xrddest)->Write(buf, offs, len)) {
00664                cerr << "Error writing to output server." << endl;
00665                PrintLastServerError(*xrddest);
00666                retvalue = 11;
00667                break;
00668             }
00669 
00670             if (cpnfo.mon)
00671                cpnfo.mon->PutProgressInfo(bytesread, cpnfo.len, (float)bytesread / cpnfo.len * 100.0);
00672 
00673             free(buf);
00674 
00675          }
00676          else
00677             if (!xrdxtrdfile && ( ((buf == 0) && (len == 0)) || (bytesread >= size))) {
00678                if (buf) free(buf);
00679                break;
00680             }
00681 
00682       }
00683       else {
00684          cerr << endl << endl << 
00685             "Critical read timeout. Unable to read data from the source." << endl;
00686          retvalue = -1;
00687          break;
00688       }
00689 
00690       buf = 0;
00691    }
00692 
00693    if (cpnfo.mon)
00694       cpnfo.mon->PutProgressInfo(bytesread, cpnfo.len, (float)bytesread / cpnfo.len * 100.0, 1);
00695 
00696    if(progbar) {
00697       cout << endl;
00698    }
00699 
00700    if (cpnfo.len != bytesread) {
00701       cerr << endl << endl << 
00702          "File length mismatch. Read:" << bytesread << " Length:" << cpnfo.len << endl;
00703       retvalue = 13;
00704    }
00705 
00706 #ifdef HAVE_XRDCRYPTO
00707    if (md5) MD_5->Final();
00708    if (adlerchk || md5) {
00709       print_chksum(src, bytesread, MD_5, adler);
00710    }
00711       
00712    if (summary) {        
00713       print_summary(src, dst, bytesread, MD_5, adler);
00714    }
00715 #else
00716    if (adlerchk) {
00717       print_chksum(src, bytesread, adler);
00718    }
00719       
00720    if (summary) {        
00721       print_summary(src, dst, bytesread, adler);
00722    }
00723 #endif
00724       
00725    if (retvalue >= 0) {
00726 
00727       for (int i = 0; i < myTIDVec.GetSize(); i++) {
00728          pthread_cancel(myTIDVec[i]);
00729          pthread_join(myTIDVec[i], &thret);      
00730 
00731          delete cpnfo.XrdCli;
00732          cpnfo.XrdCli = 0;
00733       }
00734    }
00735 
00736    delete *xrddest;
00737    *xrddest = 0;
00738 
00739    return retvalue;
00740 }
00741 
00742 
00743 XrdClient *BWMToken_Init(const char *bwmhost, const char *srcurl, const char *dsturl) {
00744    // Initialize a special client in order to get a bandwidth manager token
00745    // bwmhost is the hostname of the bwm to contact
00746    //  it can come from the one specified in the command line option -bwm
00747    //  it is mandatory
00748    //
00749    // src and dst are the src and dest urls, ev. 0
00750    //
00751    // The token is considered gone by the bwm server when the fake file is closed
00752    // or when the connection drops
00753    //
00754    if (!bwmhost[0]) return 0;
00755 
00756    XrdClientUrlInfo usrc(srcurl);
00757    XrdClientUrlInfo udst(dsturl);
00758    XrdOucString s = "root://";
00759    s += bwmhost;
00760    s += "//_bwm_/";
00761    
00762    s += usrc.File;
00763 
00764    char hname[1024];
00765    memset(hname, 0, sizeof(hname));
00766 
00767    if (gethostname(hname, sizeof(hname)))
00768        strcpy(hname, "Unknown");
00769 
00770    s += "?bwm.src=";
00771    if (usrc.Host != "")
00772       s += usrc.Host; // or the hostname() if it's local
00773    else
00774       s += hname;
00775 
00776    s += "?bwm.dst=";
00777    if (udst.Host != "")
00778       s += udst.Host; // or the hostname() if it's local
00779    else
00780       s += hname;
00781 
00782    XrdClient *cli = new XrdClient(s.c_str());
00783    if (cli) cli->Open(0, kXR_open_updt);
00784    return cli;
00785 }
00786 
00787 bool BWMToken_WaitFor(XrdClient *cli) {
00788 
00789    // Here the actual wait phase is performed through a call to kxr_query(Qvisa)
00790    // Note that this func is synchronous. To allow for parallel enqueueing in multiple
00791    // different BWMs we will have to use threads calling this func
00792 
00793    kXR_char buf[4096];
00794    // This handles the enqueueing for the current file handle opened
00795    if (cli) {
00796       if (!cli->IsOpen()) return false;
00797       return cli->Query(kXR_Qvisa, 0, buf, sizeof(buf));
00798    }
00799    else return true;
00800 }
00801 
00802 
00803 int doCp_xrd2loc(const char *src, const char *dst) {
00804    // ----------- xrd to loc affair
00805    pthread_t myTID;
00806    XrdClientVector<pthread_t> myTIDVec;
00807 
00808    void *thret;
00809    XrdClientStatInfo stat;
00810    int f;
00811    int retvalue = 0;
00812 
00813    if (BWMHost[0]) {
00814    // Get the queue bwm token from the local site
00815    XrdClient *tok1 = BWMToken_Init(BWMHost, src, dst);
00816    if (!tok1 || !BWMToken_WaitFor(tok1)) return 100;
00817 
00818    // Get the queue bwm token from the remote site
00819    XrdClientUrlInfo u(src);
00820    XrdClient *tok2 = BWMToken_Init(u.Host.c_str(), src, dst);
00821    if (!tok2 || !BWMToken_WaitFor(tok2)) return 100;
00822    }
00823 
00824    gettimeofday(&abs_start_time,&tz);
00825 
00826    // Open the input file (xrdc)
00827    // If Xrdcli is non-null, the correct src file has already been opened
00828    if (!cpnfo.XrdCli) {
00829       cpnfo.XrdCli = new XrdClient(src);
00830       if ( ( !cpnfo.XrdCli->Open(0, kXR_async) ||
00831              (cpnfo.XrdCli->LastServerResp()->status != kXR_ok) ) ) {
00832 
00833          cerr << "Error opening remote source file " << src << endl;
00834          PrintLastServerError(cpnfo.XrdCli);
00835 
00836          delete cpnfo.XrdCli;
00837          cpnfo.XrdCli = 0;
00838          return 1;
00839       }
00840    }
00841 
00842    // Open the output file (loc)
00843    cpnfo.XrdCli->Stat(&stat);
00844    cpnfo.len = stat.size;
00845 
00846    if (strcmp(dst, "-")) {
00847       // Copy to local fs
00848       //unlink(dst);
00849       f = open(dst, loc_wr_flags, 
00850           S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH);
00851       if (f < 0) {
00852          cerr << "Error '" << strerror(errno) <<
00853             "' creating " << dst << endl;
00854 
00855          cpnfo.XrdCli->Close();
00856          delete cpnfo.XrdCli;
00857          cpnfo.XrdCli = 0;
00858          return -1;
00859       }
00860       
00861    }
00862    else
00863       // Copy to stdout
00864       f = STDOUT_FILENO;
00865 
00866 
00867    // If the Extreme Copy flag is set, we try to find more sources for this file
00868    // Each source gets assigned to a different reader thread
00869    XrdClientVector<XrdClient *> xtremeclients;
00870    XrdXtRdFile *xrdxtrdfile = 0;
00871 
00872    if (doXtremeCp) 
00873       XrdXtRdFile::GetListOfSources(cpnfo.XrdCli, XtremeCpRdr, xtremeclients);
00874 
00875    // Start reader on xrdc
00876    if (doXtremeCp && (xtremeclients.GetSize() > 1)) {
00877 
00878       // Beware... with the extreme copy the normal read ahead mechanism
00879       // makes no sense at all.
00880 
00881       xrdxtrdfile = new XrdXtRdFile(XRDCP_BLOCKSIZE*4, cpnfo.len);
00882 
00883       for (int iii = 0; iii < xtremeclients.GetSize(); iii++) {
00884          xtreme_threadnfo *nfo = new(xtreme_threadnfo);
00885          nfo->xtrdhandler = xrdxtrdfile;
00886          nfo->cli = xtremeclients[iii];
00887          nfo->clientidx = xrdxtrdfile->GimmeANewClientIdx();
00888          nfo->startfromblk = iii*xrdxtrdfile->GetNBlks() / xtremeclients.GetSize();
00889          nfo->maxoutstanding = xrdmax(xrdmin( 3, xrdxtrdfile->GetNBlks() / xtremeclients.GetSize() ), 1);
00890 
00891          XrdSysThread::Run(&myTID, ReaderThread_xrd_xtreme, (void *)nfo);
00892       }
00893 
00894    }
00895    else {
00896       doXtremeCp = false;
00897       XrdSysThread::Run(&myTID, ReaderThread_xrd, (void *)&cpnfo);
00898    }
00899 
00900    int len = 1;
00901    void *buf;
00902    long long bytesread=0, offs = 0;
00903    long long size = cpnfo.len;
00904    bool draining = false;
00905 
00906    // Loop to write until ended or timeout err
00907    while (1) {
00908 
00909       if (xrdxtrdfile && xrdxtrdfile->AllDone()) draining = true;
00910       if (draining && !cpnfo.queue.GetLength()) break;
00911 
00912       if ( cpnfo.queue.GetBuffer(&buf, offs, len) ) {
00913 
00914          if (len && buf) {
00915 
00916             bytesread+=len;
00917             if (progbar) {
00918                gettimeofday(&abs_stop_time,&tz);
00919                print_progbar(bytesread,size);
00920             }
00921 
00922 #ifdef HAVE_XRDCRYPTO
00923             if (md5) {
00924               MD_5->Update((const char*)buf,len);
00925             }
00926 #endif
00927 
00928 #ifdef HAVE_LIBZ
00929                if (adlerchk) {
00930                   adler = adler32(adler, (const Bytef*)buf, len);
00931                }
00932 #endif
00933 
00934             if (doXtremeCp && (f != STDOUT_FILENO) && lseek(f, offs, SEEK_SET) < 0) {
00935                cerr << "Error '" << strerror(errno) <<
00936                   "' seeking to " << dst << endl;
00937                retvalue = 10;
00938                break;
00939             }
00940             if (write(f, buf, len) <= 0) {
00941                cerr << "Error '" << strerror(errno) <<
00942                   "' writing to " << dst << endl;
00943                retvalue = 10;
00944                break;
00945             }
00946 
00947             if (cpnfo.mon)
00948               cpnfo.mon->PutProgressInfo(bytesread, cpnfo.len, (float)bytesread / cpnfo.len * 100.0);
00949 
00950             free(buf);
00951 
00952          }
00953          else
00954             if (!xrdxtrdfile && ( ((buf == 0) && (len == 0)) || (bytesread >= size)) ) {
00955                if (buf) free(buf);
00956                break;
00957             }
00958 
00959 
00960       }
00961       else {
00962          cerr << endl << endl << "Critical read timeout. Unable to read data from the source." << endl;
00963          retvalue = -1;
00964          break;
00965       }
00966          
00967       buf = 0;
00968 
00969    }
00970 
00971    if (cpnfo.mon)
00972      cpnfo.mon->PutProgressInfo(bytesread, cpnfo.len, (float)bytesread / cpnfo.len * 100.0, 1);
00973 
00974    if(progbar) {
00975       cout << endl;
00976    }
00977 
00978    if (cpnfo.len != bytesread) retvalue = 13;
00979 
00980 #ifdef HAVE_XRDCRYPTO
00981    if (md5) MD_5->Final();
00982    if (md5 || adlerchk) {
00983       print_chksum(src, bytesread, MD_5, adler);
00984    }
00985       
00986    if (summary) {        
00987       print_summary(src,dst,bytesread,MD_5, adler);
00988    }      
00989 #else
00990    if (adlerchk) {
00991       print_chksum(src, bytesread, adler);
00992    }
00993       
00994    if (summary) {        
00995       print_summary(src,dst,bytesread,adler);
00996    }      
00997 #endif
00998 
00999    int closeres = close(f);
01000    if (!retvalue) retvalue = closeres;
01001 
01002    if (retvalue >= 0) {
01003       for (int i = 0; i < myTIDVec.GetSize(); i++) {
01004          pthread_cancel(myTIDVec[i]);
01005          pthread_join(myTIDVec[i], &thret);      
01006 
01007          delete cpnfo.XrdCli;
01008          cpnfo.XrdCli = 0;
01009       }
01010 
01011       delete cpnfo.XrdCli;
01012       cpnfo.XrdCli = 0;
01013    }
01014 
01015    return retvalue;
01016 }
01017 
01018 
01019 
01020 int doCp_loc2xrd(XrdClient **xrddest, const char *src, const char * dst) {
01021 // ----------- loc to xrd affair
01022    pthread_t myTID;
01023    void * thret;
01024    int retvalue = 0;
01025    struct stat stat;
01026 
01027    gettimeofday(&abs_start_time,&tz);
01028 
01029    // Open the input file (loc)
01030    cpnfo.localfile = open(src, O_RDONLY | O_BINARY);   
01031    if (cpnfo.localfile < 0) {
01032       cerr << "Error '" << strerror(errno) << "' opening " << src << endl;
01033       cpnfo.localfile = 0;
01034       return -1;
01035    }
01036 
01037    if (fstat(cpnfo.localfile, &stat)) {
01038      cerr << "Error '" << strerror(errno) << "' stat " << src << endl;
01039      cpnfo.localfile = 0;
01040      return -1;
01041    }
01042 
01043    // if xrddest if nonzero, then the file is already opened for writing
01044    if (!*xrddest) {
01045 
01046       *xrddest = new XrdClient(dst);
01047       if (!PedanticOpen4Write(*xrddest, kXR_ur | kXR_uw | kXR_gw | kXR_gr | kXR_or,
01048                            xrd_wr_flags) ) {
01049          cerr << "Error opening remote destination file " << dst << endl;
01050          PrintLastServerError(*xrddest);
01051          
01052          close(cpnfo.localfile);
01053          delete *xrddest;
01054          *xrddest = 0;
01055          cpnfo.localfile = 0;
01056          return -1;
01057       }
01058    }
01059       
01060    // Start reader on loc
01061    XrdSysThread::Run(&myTID, ReaderThread_loc, (void *)&cpnfo);
01062 
01063    int len = 1;
01064    void *buf;
01065    long long offs = 0;
01066    unsigned long long bytesread=0;
01067    unsigned long long size = stat.st_size;
01068    int blkcnt = 0;
01069 
01070    // Loop to write until ended or timeout err
01071    while (len > 0) {
01072 
01073       if ( cpnfo.queue.GetBuffer(&buf, offs, len) ) {
01074          if (len && buf) {
01075 
01076             bytesread+=len;
01077             if (progbar) {
01078               gettimeofday(&abs_stop_time,&tz);
01079               print_progbar(bytesread,size);
01080             }
01081 
01082 #ifdef HAVE_XRDCRYPTO
01083             if (md5) {
01084               MD_5->Update((const char*)buf,len);
01085             }
01086 #endif
01087 
01088 #ifdef HAVE_LIBZ
01089             if (adlerchk) {
01090                adler = adler32(adler, (const Bytef*)buf, len);
01091             }
01092 #endif
01093             if ( !(*xrddest)->Write(buf, offs, len) ) {
01094                cerr << "Error writing to output server." << endl;
01095                PrintLastServerError(*xrddest);
01096                retvalue = 12;
01097                break;
01098             }
01099 
01100             if (cpnfo.mon)
01101               cpnfo.mon->PutProgressInfo(bytesread, cpnfo.len, (float)bytesread / cpnfo.len * 100.0);
01102 
01103             free(buf);
01104          }
01105          else {
01106             // If we get len == 0 then we have to stop
01107             if (buf) free(buf);
01108             break;
01109          }
01110       }
01111       else {
01112          cerr << endl << endl << "Critical read timeout. Unable to read data from the source." << endl;
01113          retvalue = -1;
01114          break;
01115       }
01116 
01117       buf = 0;
01118       blkcnt++;
01119    }
01120 
01121 
01122    if (cpnfo.mon)
01123      cpnfo.mon->PutProgressInfo(bytesread, cpnfo.len, (float)bytesread / cpnfo.len * 100.0, 1);
01124 
01125    if(progbar) {
01126      cout << endl;
01127    }
01128 
01129    if (size != bytesread) retvalue = 13;
01130 
01131 #ifdef HAVE_XRDCRYPTO
01132    if (md5) MD_5->Final();
01133    if (md5 || adlerchk) {
01134       print_chksum(src, bytesread, MD_5, adler);
01135    }
01136    
01137    if (summary) {        
01138       print_summary(src, dst, bytesread, MD_5, adler);
01139    }     
01140 #else
01141    if (adlerchk) {
01142       print_chksum(src, bytesread, adler);
01143    }
01144    
01145    if (summary) {        
01146       print_summary(src, dst, bytesread, adler);
01147    }     
01148 #endif
01149    
01150    pthread_cancel(myTID);
01151    pthread_join(myTID, &thret);
01152 
01153    delete *xrddest;
01154    *xrddest = 0;
01155 
01156    close(cpnfo.localfile);
01157    cpnfo.localfile = 0;
01158 
01159    return retvalue;
01160 }
01161 
01162 
// Print the command line help of xrdcp on the standard error stream.
//
// The text lists the accepted forms of <source> and <dest> followed by one
// entry per option. The -adler entry is emitted only when the program is
// built with HAVE_LIBZ.
void PrintUsage() {
   cerr << "usage: xrdcp <source> <dest> "
     "[-d lvl] [-DSparmname stringvalue] ... [-DIparmname intvalue] [-s] [-ns] [-v]"
     " [-OS<opaque info>] [-OD<opaque info>] [-force] [-md5] [-adler] [-np] [-f] [-R] [-S]" << endl << endl;

   cerr << "<source> can be:" << endl <<
     "   a local file" << endl <<
     "   a local directory name suffixed by /" << endl <<
     "   an xrootd URL in the form root://user@host/<absolute Logical File Name in xrootd domain>" << endl <<
     "      (can be a directory. In this case the -R option can be fully honored only on a standalone server)" << endl;
   cerr << "<dest> can be:" << endl <<
     "   a local file" << endl <<
     "   a local directory name suffixed by /" << endl <<
     "   an xrootd URL in the form root://user@host/<absolute Logical File Name in xrootd domain>" << endl <<
     "      (can be a directory LFN)" << endl << endl;

   cerr << " -d lvl :         debug level: 1 (low), 2 (medium), 3 (high)" << endl;
   // This statement must be terminated here: chaining it into the next one
   // would insert the cerr object itself into the stream.
   cerr << " -D proxyaddr:proxyport" << endl <<
           "        :         use proxyaddr:proxyport as a SOCKS4 proxy."
     " Only numerical addresses are supported." << endl;
   cerr << " -DSparmname stringvalue" << endl <<
           "        :         set the internal parm <parmname> with the string value <stringvalue>" << endl <<
           "                   See XrdClientConst.hh for a list of parameters." << endl;
   cerr << " -DIparmname intvalue" << endl <<
           "        :         set the internal parm <parmname> with the integer value <intvalue>" << endl <<
           "                   See XrdClientConst.hh for a list of parameters." << endl <<
           "                   Examples: -DIReadCacheSize 3000000 -DIReadAheadSize 1000000" << endl;
   cerr << " -s     :         silent mode, no summary output, no progress bar" << endl;
   cerr << " -np    :         no progress bar" << endl;
   cerr << " -v     :         display summary output" << endl <<endl;
   cerr << " -OS    :         adds some opaque information to any SOURCE xrootd url" << endl;
   cerr << " -OD    :         adds some opaque information to any DEST xrootd url" << endl;
   cerr << " -f     :         re-create a file if already present" << endl;
   cerr << " -F     :         set the 'force' flag for xrootd dest file opens"
     " (ignore if file is already opened)" << endl;
   cerr << " -force :         set 1-min (re)connect attempts to retry for up to 1 week,"
     " to block until xrdcp is executed" << endl << endl;
   cerr << " -md5   :         calculate the md5 checksum during transfers\n" << endl;
#ifdef HAVE_LIBZ
   cerr << " -adler :         calculate the adler32 checksum during transfers\n" << endl;
#endif
   cerr << " -R     :         recurse subdirectories (where it can be applied)" << endl;
   cerr << " -S num :         use <num> additional parallel streams to do the xfer." << endl <<
           "                  The max value is 15. The default is 0 (i.e. use only the main stream)" << endl;
   cerr << " -MLlibname" << endl <<
           "        :         use <libname> as external monitoring reporting library." << endl <<
           "                  The default name if XrdCpMonitorClient.so . Make sure it is reachable." << endl;
   cerr << " -X rdr :         Activate the Xtreme copy algorithm. Use 'rdr' as hostname where to query for " << endl <<
           "                  additional sources." << endl;
   cerr << " -x     :         Activate the Xtreme copy algorithm. Use the source hostname to query for " << endl <<
           "                  additional sources." << endl;
   cerr << " -P     :         request POSC (persist-on-successful-close) processing to create a new file." << endl;
   cerr << " where:" << endl;
   cerr << "   parmname     is the name of an internal parameter" << endl;
   cerr << "   stringvalue  is a string to be assigned to an internal parameter" << endl;
   cerr << "   intvalue     is an int to be assigned to an internal parameter" << endl;
}
01220 
01221 
01222 // Main program
01223 int main(int argc, char**argv) {
01224    char *srcpath = 0, *destpath = 0;
01225    memset (BWMHost, 0, sizeof(BWMHost));
01226 
01227    if (argc < 3) {
01228       PrintUsage();
01229       exit(1);
01230    }
01231 
01232 #ifdef WIN32
01233    WORD wVersionRequested;
01234    WSADATA wsaData;
01235    int err;
01236    wVersionRequested = MAKEWORD( 2, 2 );
01237    err = WSAStartup( wVersionRequested, &wsaData );
01238 #endif
01239 
01240    DebugSetLevel(-1);
01241 
01242    // We want this tool to be able to copy from/to everywhere
01243    // Note that the side effect of these calls here is to initialize the
01244    // XrdClient environment.
01245    // This is crucial if we want to later override its default values
01246    EnvPutString( NAME_REDIRDOMAINALLOW_RE, "*" );
01247    EnvPutString( NAME_CONNECTDOMAINALLOW_RE, "*" );
01248    EnvPutString( NAME_REDIRDOMAINDENY_RE, "" );
01249    EnvPutString( NAME_CONNECTDOMAINDENY_RE, "" );
01250 
01251    EnvPutInt( NAME_READAHEADSIZE, XRDCP_XRDRASIZE);
01252    EnvPutInt( NAME_READCACHESIZE, 2*XRDCP_XRDRASIZE );
01253    EnvPutInt( NAME_READCACHEBLKREMPOLICY, XrdClientReadCache::kRmBlk_LeastOffs );
01254    EnvPutInt( NAME_PURGEWRITTENBLOCKS, 1 );
01255 
01256 
01257    EnvPutInt( NAME_DEBUG, -1);
01258 
01259    for (int i=1; i < argc; i++) {
01260 
01261       if ( (strstr(argv[i], "-s") == argv[i])) {
01262         summary=false;
01263         progbar=false;
01264         continue;
01265       }
01266 
01267       if ( (strstr(argv[i], "-np") == argv[i])) {
01268         progbar=false;
01269         continue;
01270       }
01271 
01272       if ( (strstr(argv[i], "-v") == argv[i])) {
01273         summary=true;
01274         continue;
01275       }
01276 
01277       if ( (strstr(argv[i], "-R") == argv[i])) {
01278         recurse=true;
01279         continue;
01280       }
01281 
01282       if ( (strstr(argv[i], "-OS") == argv[i])) {
01283          srcopaque=argv[i]+3;
01284          continue;
01285       }
01286       
01287       if ( (strstr(argv[i], "-OD") == argv[i])) {
01288          dstopaque=argv[i]+3;
01289          continue;
01290       }
01291 
01292       if ( (strstr(argv[i], "-F") == argv[i])) {
01293          xrd_wr_flags |= kXR_force;
01294         continue;
01295       }
01296 
01297       if ( (strstr(argv[i], "-P") == argv[i])) {
01298          xrd_wr_flags |= kXR_posc;
01299         continue;
01300       }
01301 
01302       if ( (strstr(argv[i], "-f") == argv[i])) {
01303         // Remove the kXR_new option
01304         kXR_unt16 tmp = kXR_new;
01305         tmp = ~tmp;
01306 
01307         xrd_wr_flags &= tmp;
01308 
01309         // Also delete the existing file
01310         xrd_wr_flags |= kXR_delete;
01311 
01312 
01313         // Also set up the flags for the local fs
01314         loc_wr_flags = LOC_WR_FLAGS_FORCE;
01315 
01316         continue;
01317       }
01318 
01319       if ( (strstr(argv[i], "-force") == argv[i])) {
01320          EnvPutInt( NAME_CONNECTTIMEOUT , 60);
01321          EnvPutInt( NAME_FIRSTCONNECTMAXCNT, 7*24*60);
01322          continue;
01323       }
01324 
01325 
01326       if ( (strstr(argv[i], "-DS") == argv[i]) &&
01327            (argc >= i+2) ) {
01328         cerr << "Overriding " << argv[i]+3 << " with value " << argv[i+1] << ". ";
01329          EnvPutString( argv[i]+3, argv[i+1] );
01330          cerr << " Final value: " << EnvGetString(argv[i]+3) << endl;
01331          i++;
01332          continue;
01333       }
01334 
01335       if ( (strstr(argv[i], "-DI") == argv[i]) &&
01336            (argc >= i+2) ) {
01337         cerr << "Overriding '" << argv[i]+3 << "' with value " << argv[i+1] << ". ";
01338          EnvPutInt( argv[i]+3, atoi(argv[i+1]) );
01339          cerr << " Final value: " << EnvGetLong(argv[i]+3) << endl;
01340          i++;
01341          continue;
01342       }
01343 
01344 
01345       if ( (strstr(argv[i], "-D") == argv[i]) &&
01346            (argc >= i+2) ) { 
01347 
01348         char host[1024];
01349         char *pos;
01350         int port;
01351 
01352         pos = strstr(argv[i+1], ":");
01353 
01354         if (pos && strlen(pos) > 1) {
01355 
01356           cerr << "Using '" << argv[i+1] << "' as a SOCKS4 proxy.";
01357           strncpy(host, argv[i+1], pos-argv[i+1]);
01358           host[pos-argv[i+1]] = 0;
01359 
01360           sscanf(pos+1, "%d", &port);
01361 
01362           cerr << " Host:" << host << " port: " << port << endl;
01363           EnvPutString( NAME_SOCKS4HOST, host);
01364           EnvPutInt( NAME_SOCKS4PORT, port);
01365         }
01366         else {
01367           cerr << "Malformed -D option." << endl;
01368           exit(-1);
01369         }
01370 
01371         i++;
01372         continue;
01373       }
01374 
01375 
01376       if ( (strstr(argv[i], "-d") == argv[i]) &&
01377            (argc >= i+2) ) {
01378          int dbglvl = atoi(argv[i+1]);
01379          if (dbglvl > 0) {
01380             EnvPutInt( NAME_DEBUG, dbglvl);
01381             cerr << "Set debug level " <<  EnvGetLong(NAME_DEBUG)<< endl;
01382          }
01383          i++;
01384          continue;
01385       }
01386 
01387       if ( (strstr(argv[i], "-S") == argv[i]) &&
01388            (argc >= i+2) ) {
01389          int parstreams = atoi(argv[i+1]);
01390          parstreams = xrdmin(parstreams, 15);
01391          parstreams = xrdmax(0, parstreams);
01392 
01393          EnvPutInt( NAME_MULTISTREAMCNT, parstreams);
01394 
01395          cerr << "Set " << NAME_MULTISTREAMCNT << " to " <<
01396            EnvGetLong(NAME_MULTISTREAMCNT) << endl;
01397 
01398          i++;
01399          continue;
01400       }
01401 
01402       if ( (strstr(argv[i], "-X") == argv[i]) &&
01403            (argc >= i+2) ) {
01404          doXtremeCp = true;
01405          XtremeCpRdr = argv[i+1];
01406 
01407          cerr << "Extreme Copy enabled. XtremeCpRdr: " << XtremeCpRdr << endl;
01408 
01409 
01410          i++;
01411          continue;
01412       }
01413 
01414       if ( (strstr(argv[i], "-x") == argv[i]) ) {
01415          doXtremeCp = true;
01416          XtremeCpRdr = "";
01417 
01418          cerr << "Extreme Copy enabled. " << endl;
01419 
01420          continue;
01421       }
01422 
01423 #ifdef HAVE_XRDCRYPTO
01424 #ifndef WIN32
01425       if ( (strstr(argv[i], "-md5") == argv[i])) {
01426         md5=true;
01427 
01428         if (!(gCryptoFactory = XrdCryptoFactory::GetCryptoFactory("ssl"))) {
01429           cerr << "Cannot instantiate crypto factory ssl" << endl;
01430           exit(-1);
01431         }
01432 
01433         MD_5 = gCryptoFactory->MsgDigest("md5");
01434         if (! MD_5) {
01435           cerr << "MD object could not be instantiated " << endl;
01436           exit(-1);
01437         }
01438         continue;
01439       }
01440 #endif
01441 #endif
01442 
01443 #ifdef HAVE_LIBZ
01444      if ( (strstr(argv[i], "-adler") == argv[i])) {
01445         adlerchk=true;
01446         continue;
01447      }
01448 #endif
01449 
01450       // Any other par is ignored
01451       if ( (strstr(argv[i], "-") == argv[i]) && (strlen(argv[i]) > 1) ) {
01452          cerr << "Unknown parameter " << argv[i] << endl;
01453          continue;
01454       }
01455 
01456       if (!srcpath) srcpath = argv[i];
01457       else
01458          if (!destpath) destpath = argv[i];
01459       
01460 
01461    }
01462 
01463    if (!srcpath || !destpath) {
01464       PrintUsage();
01465       exit(1);
01466    }
01467 
01468    if (XtremeCpRdr == "") XtremeCpRdr = srcpath;
01469 
01470    DebugSetLevel(EnvGetLong(NAME_DEBUG));
01471 
01472    Info(XrdClientDebug::kUSERDEBUG, "main", XRDCP_VERSION);
01473 
01474    XrdCpWorkLst *wklst = new XrdCpWorkLst();
01475    XrdOucString src, dest;
01476    XrdClient *xrddest;
01477 
01478    cpnfo.XrdCli = 0;
01479   
01480    if (wklst->SetSrc(&cpnfo.XrdCli, srcpath, srcopaque, recurse)) {
01481      cerr << "Error accessing path/file for " << srcpath << endl;
01482      exit(1);
01483    }
01484 
01485    xrddest = 0;
01486 
01487    // From here, we will have:
01488    // the knowledge if the dest is a dir name or file name
01489    // an open instance of xrdclient if it's a file
01490    if (wklst->SetDest(&xrddest, destpath, dstopaque, xrd_wr_flags)) {
01491       cerr << "Error accessing path/file for " << destpath << endl;
01492       PrintLastServerError(xrddest);
01493       exit(1);
01494    }
01495 
01496    int retval = 0;
01497    while (!retval && wklst->GetCpJob(src, dest)) {
01498       Info(XrdClientDebug::kUSERDEBUG, "main", src << " --> " << dest);
01499       
01500 #ifdef HAVE_XRDCRYPTO
01501       if (md5) {
01502         MD_5->Reset("md5");
01503       }
01504 #endif
01505       adler = 0;
01506 
01507 
01508       // Initialize monitoring client, if a plugin is present
01509       cpnfo.mon = 0;
01510 #ifndef WIN32
01511       void *monhandle = dlopen (monlibname.c_str(), RTLD_LAZY);
01512 
01513       if (monhandle) {
01514         XrdClientMonIntfHook monlibhook = (XrdClientMonIntfHook)dlsym(monhandle, "XrdClientgetMonIntf");
01515 
01516         const char *err = 0;
01517         if ((err = dlerror())) {
01518           cerr << "Error accessing monitoring client library " << monhandle << ". Inappropriate content." << endl <<
01519             "error: " << err << endl;
01520           dlclose(monhandle);
01521           monhandle = 0;
01522         }
01523         else    
01524           cpnfo.mon = (XrdClientAbsMonIntf *)monlibhook(src.c_str(), dest.c_str());
01525       }
01526 #endif
01527       
01528       if (cpnfo.mon) {
01529 
01530         char *name=0, *ver=0, *rem=0;
01531         if (!cpnfo.mon->GetMonLibInfo(&name, &ver, &rem)) {
01532           Info(XrdClientDebug::kUSERDEBUG,
01533                "main", "Monitoring client plugin found. Name:'" << name <<
01534                "' Ver:'" << ver << "' Remarks:'" << rem << "'");
01535         }
01536         else {
01537           delete cpnfo.mon;
01538           cpnfo.mon = 0;
01539         }
01540 
01541       }
01542 
01543 #ifndef WIN32
01544       if (!cpnfo.mon && monhandle) {
01545         dlclose(monhandle);
01546         monhandle = 0;
01547       }
01548 #endif
01549 
01550       // Ok, the plugin is now loaded...
01551       if (cpnfo.mon) {
01552         // We associate the monitoring debug to the XrdClient debug level
01553         cpnfo.mon->Init(src.c_str(), dest.c_str(), (DebugLevel() > 0) );
01554         cpnfo.mon->PutProgressInfo(0, cpnfo.len, 0, 1);
01555       }
01556 
01557       if ( (src.beginswith("root://")) || (src.beginswith("xroot://")) ) {
01558          // source is xrootd
01559 
01560          if (srcopaque) {
01561             src += "?";
01562             src += srcopaque;
01563          }
01564 
01565          if ( (dest.beginswith("root://")) || (dest.beginswith("xroot://")) ) {
01566             XrdOucString d;
01567             bool isd;
01568             wklst->GetDest(d, isd);
01569 
01570             BuildFullDestFilename(src, d, isd);
01571 
01572             if (dstopaque) {
01573                d += "?";
01574                d += dstopaque;
01575             }
01576 
01577             retval = doCp_xrd2xrd(&xrddest, src.c_str(), d.c_str());
01578 
01579          }
01580          else {
01581             XrdOucString d;
01582             bool isd;
01583             int res;
01584             wklst->GetDest(d, isd);
01585             res = CreateDestPath_loc(d, isd);
01586             if (!res || (errno == EEXIST) || !errno) {
01587                BuildFullDestFilename(src, d, isd);
01588                retval = doCp_xrd2loc(src.c_str(), d.c_str());
01589             }
01590             else
01591                cerr << "Error " << strerror(errno) <<
01592                      " accessing path for " << d << endl;
01593          }
01594       }
01595       else {
01596          // source is localfs
01597 
01598          if ( (dest.beginswith("root://")) || (dest.beginswith("xroot://")) ) {
01599             XrdOucString d;
01600             bool isd;
01601             wklst->GetDest(d, isd);
01602 
01603             BuildFullDestFilename(src, d, isd);
01604 
01605             if (dstopaque) {
01606                d += "?";
01607                d += dstopaque;
01608             }
01609 
01610             retval = doCp_loc2xrd(&xrddest, src.c_str(), d.c_str());
01611 
01612          }
01613          else {
01614             cerr << "Better to use cp in this case. (dest: "<<dest<<")" << endl;
01615             exit(2);
01616          }
01617 
01618       }
01619 
01620 
01621       if (cpnfo.mon) {
01622         cpnfo.mon->DeInit();
01623         delete cpnfo.mon;
01624         cpnfo.mon = 0;
01625 #ifndef WIN32
01626         if (monhandle) dlclose(monhandle);
01627         monhandle = 0;
01628 #endif
01629       }
01630 
01631 
01632    }
01633 
01634 #ifdef HAVE_XRDCRYPTO
01635    if (md5 && MD_5) 
01636      delete MD_5;
01637 #endif
01638 
01639    return retval;
01640 }

Generated on Tue Jul 5 14:46:22 2011 for ROOT_528-00b_version by  doxygen 1.5.1