XrdcpXtremeRead.cc

Go to the documentation of this file.
00001 //////////////////////////////////////////////////////////////////////////
00002 //                                                                      //
00003 // XrdXtremeRead                                                        //
00004 //                                                                      //
00005 // Author: Fabrizio Furano (CERN, 2009)                                 //
00006 //                                                                      //
00007 // Utility classes handling coordinated parallel reads from multiple    //
00008 // XrdClient instances                                                  //
00009 //                                                                      //
00010 //////////////////////////////////////////////////////////////////////////
00011 
00012 //         $Id: XrdcpXtremeRead.cc 38011 2011-02-08 18:35:57Z ganis $
00013 
00014 const char *XrdXtremeReadCVSID = "$Id: XrdcpXtremeRead.cc 38011 2011-02-08 18:35:57Z ganis $";
00015 
00016 #include "XrdClient/XrdcpXtremeRead.hh"
00017 #include "XrdClient/XrdClientAdmin.hh"
00018 
00019 XrdXtRdFile::XrdXtRdFile(int blksize, long long filesize) {
00020    blocks = 0;
00021    clientidxcnt = 0;
00022    freeblks = 0;
00023    doneblks = 0;
00024 
00025 
00026    freeblks = nblks = (filesize + blksize - 1) / blksize;
00027 
00028    blocks = new XrdXtRdBlkInfo[nblks];
00029 
00030    // Init the list of blocks
00031    long long ofs = 0;
00032    for (int i = 0; i < nblks; i++) {
00033       blocks[i].offs = ofs;
00034       blocks[i].len = xrdmax(0, xrdmin(filesize, ofs+blksize) - ofs);
00035       ofs += blocks[i].len;
00036    }
00037 
00038 }
00039 
00040 XrdXtRdFile::~XrdXtRdFile() {
00041    delete []blocks;
00042 }
00043 
00044 int XrdXtRdFile::GimmeANewClientIdx() {
00045    XrdSysMutexHelper m(mtx);
00046    return ++clientidxcnt;
00047 }
00048 
00049 int XrdXtRdFile::GetBlkToPrefetch(int fromidx, int clientidx, XrdXtRdBlkInfo *&blkreadonly) {
00050    // Considering fromidx as a starting point in the blocks array,
00051    // finds a block which is worth prefetching
00052    // If there are free blocks it's trivial
00053    // Otherwise it will be stolen from other readers which are clearly late
00054 
00055    XrdSysMutexHelper m(mtx);
00056 
00057 
00058    // Find a non assigned blk
00059    for (int i = 0; i < nblks; i++) {
00060       int pos = (fromidx + i) % nblks;
00061 
00062       // Find a non assigned blk
00063       if (blocks[pos].requests.GetSize() == 0) {
00064          blocks[pos].requests.Push_back(clientidx);
00065          blocks[pos].lastrequested = time(0);
00066          blkreadonly = &blocks[pos];
00067          return pos;
00068       }   
00069    }
00070 
00071    // Steal an outstanding missing block, even if in progress
00072    // The outcome of this is that, at the end, all thethe fastest free clients will
00073    // ask for the missing blks
00074    // The only thing to avoid is that a client asks twice the same blk for itself
00075 
00076    for (int i = nblks; i > 0; i--) {
00077       int pos = (fromidx + i) % nblks;
00078 
00079       // Find a non finished blk to steal
00080       if (!blocks[pos].done && !blocks[pos].AlreadyRequested(clientidx) &&
00081           (blocks[pos].requests.GetSize() < 3) ) {
00082 
00083          blocks[pos].requests.Push_back(clientidx);
00084          blkreadonly = &blocks[pos];
00085          blocks[pos].lastrequested = time(0);
00086          return pos;
00087       }
00088    }
00089 
00090    // No blocks to request or steal... probably everything's finished
00091    return -1;
00092 
00093 }
00094 
00095 int XrdXtRdFile::GetBlkToRead(int fromidx, int clientidx, XrdXtRdBlkInfo *&blkreadonly) {
00096    // Get the next already prefetched block, now we want to get its content
00097 
00098    XrdSysMutexHelper m(mtx);
00099 
00100    for (int i = 0; i < nblks; i++) {
00101       int pos = (fromidx + i) % nblks;
00102       if (!blocks[pos].done &&
00103           blocks[pos].AlreadyRequested(clientidx)) {
00104 
00105          blocks[pos].lastrequested = time(0);
00106          blkreadonly = &blocks[pos];
00107          return pos;
00108       }
00109    }
00110 
00111    return -1;
00112 }
00113 
00114 int XrdXtRdFile::MarkBlkAsRead(int blkidx) {
00115    XrdSysMutexHelper m(mtx);
00116 
00117    int reward = 0;
00118 
00119    // If the block was stolen by somebody else then the reward is negative
00120    if (blocks[blkidx].done) reward = -1;
00121    if (!blocks[blkidx].done) {
00122       doneblks++;
00123       if (blocks[blkidx].requests.GetSize() > 1) reward = 1;
00124    }
00125 
00126 
00127    blocks[blkidx].done = true;
00128    return reward;
00129 }
00130 
00131 
00132 int XrdXtRdFile::GetListOfSources(XrdClient *ref, XrdOucString xtrememgr, XrdClientVector<XrdClient *> &clients) {
00133    // Exploit Locate in order to find as many sources as possible.
00134    // Make sure that ref appears once and only once
00135    // Instantiate and open the relative client instances
00136 
00137    XrdClientVector<XrdClientLocate_Info> hosts;
00138    if (xtrememgr == "") return 0;
00139 
00140    // In the simple case the xtrememgr is just the host of the original url.
00141    if (!xtrememgr.beginswith("root://") && !xtrememgr.beginswith("xroot://")) {
00142       
00143       // Create an acceptable xrootd url
00144       XrdOucString loc2;
00145       loc2 = "root://";
00146       loc2 += xtrememgr;
00147       loc2 += "/xyz";
00148       xtrememgr = loc2;
00149    }
00150 
00151    XrdClientAdmin adm(xtrememgr.c_str());
00152    if (!adm.Connect()) return 0;
00153 
00154    int locateok = adm.Locate((kXR_char *)ref->GetCurrentUrl().File.c_str(), hosts, kXR_nowait);
00155    if (!locateok || !hosts.GetSize()) return 0;
00156 
00157    // Here we have at least a result... hopefully
00158    bool found = false;
00159    for (int i = 0; i < hosts.GetSize(); i++)
00160       if (ref->GetCurrentUrl().HostWPort == (const char *)(hosts[i].Location)) {
00161          found = true;
00162          break;
00163       }
00164 
00165    // Now initialize the clients and start the parallel opens
00166    for (int i = 0; i < hosts.GetSize(); i++) {
00167       XrdOucString loc;
00168 
00169       loc = "root://";
00170       loc += (const char *)hosts[i].Location;
00171       loc += "/";
00172       loc += ref->GetCurrentUrl().File;
00173       cout << "Source #" << i+1 << " " << loc << endl;
00174 
00175       XrdClient *cli = new XrdClient(loc.c_str());
00176       if (cli) {
00177             clients.Push_back(cli);
00178 
00179       }
00180 
00181    }
00182 
00183    // Eventually add the ref client to the vector
00184    if (!found && ref) clients.Push_back(ref);
00185 
00186    return clients.GetSize();
00187 }

Generated on Tue Jul 5 14:46:22 2011 for ROOT_528-00b_version by  doxygen 1.5.1