TFileCacheRead.cxx

Go to the documentation of this file.
00001 // @(#)root/io:$Id: TFileCacheRead.cxx 34934 2010-08-23 09:11:42Z brun $
00002 // Author: Rene Brun   18/05/2006
00003 
00004 /*************************************************************************
00005  * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers.               *
00006  * All rights reserved.                                                  *
00007  *                                                                       *
00008  * For the licensing terms see $ROOTSYS/LICENSE.                         *
00009  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
00010  *************************************************************************/
00011 
00012 //////////////////////////////////////////////////////////////////////////
00013 //                                                                      //
00014 // TFileCacheRead : a cache when reading files over the network         //
00015 //                                                                      //
00016 // A caching system to speed up network I/O, i.e. when there is         //
00017 // no operating system caching support (like the buffer cache for       //
00018 // local disk I/O). The cache makes sure that every I/O is done with    //
00019 // a (large) fixed length buffer thereby avoiding many small I/O's.     //
00020 // Currently the read cache system is used by the classes TNetFile,     //
00021 // TXNetFile and TWebFile (via TFile::ReadBuffers()).                   //
00022 //                                                                      //
00023 // When processing TTree, TChain, a specialized class TTreeCache that   //
00024 // derives from this class is automatically created.                    //
00025 //                                                                      //
00026 //////////////////////////////////////////////////////////////////////////
00027 
00028 #include "TEnv.h"
00029 #include "TFile.h"
00030 #include "TFileCacheRead.h"
00031 #include "TFileCacheWrite.h"
00032 #include "TMath.h"
00033 
00034 ClassImp(TFileCacheRead)
00035 
00036 //______________________________________________________________________________
00037 TFileCacheRead::TFileCacheRead() : TObject()
00038 {
00039    // Default Constructor.
00040 
00041    fBufferSizeMin = 0;
00042    fBufferSize  = 0;
00043    fBufferLen   = 0;
00044    fNseek       = 0;
00045    fNtot        = 0;
00046    fNb          = 0;
00047    fSeekSize    = 0;
00048    fSeek        = 0;
00049    fSeekIndex   = 0;
00050    fSeekSort    = 0;
00051    fPos         = 0;
00052    fSeekLen     = 0;
00053    fSeekSortLen = 0;
00054    fSeekPos     = 0;
00055    fLen         = 0;
00056    fFile        = 0;
00057    fBuffer      = 0;
00058    fIsSorted    = kFALSE;
00059    fIsTransferred = kFALSE;
00060 
00061    fAsyncReading = kFALSE;
00062 }
00063 
00064 //_____________________________________________________________________________
00065 TFileCacheRead::TFileCacheRead(TFile *file, Int_t buffersize)
00066            : TObject()
00067 {
00068    // Creates a TFileCacheRead data structure.
00069 
00070    if (buffersize <=10000) fBufferSize = 100000;
00071    else fBufferSize = buffersize;
00072 
00073    fBufferSizeMin = fBufferSize;
00074    fBufferLen   = 0;
00075    fNseek       = 0;
00076    fNtot        = 0;
00077    fNb          = 0;
00078    fSeekSize    = 10000;
00079    fSeek        = new Long64_t[fSeekSize];
00080    fSeekIndex   = new Int_t[fSeekSize];
00081    fSeekSort    = new Long64_t[fSeekSize];
00082    fPos         = new Long64_t[fSeekSize];
00083    fSeekLen     = new Int_t[fSeekSize];
00084    fSeekSortLen = new Int_t[fSeekSize];
00085    fSeekPos     = new Int_t[fSeekSize];
00086    fLen         = new Int_t[fSeekSize];
00087    fFile        = file;
00088 
00089    fBuffer = 0;
00090 
00091    fAsyncReading = gEnv->GetValue("TFile.AsyncReading", 1);
00092    if (fAsyncReading) {
00093       // Check if asynchronous reading is supported by this TFile specialization
00094       fAsyncReading = kFALSE;
00095       if (file && !(file->ReadBufferAsync(0, 0)))
00096          fAsyncReading = kTRUE;
00097    }
00098    if (!fAsyncReading) {
00099       // we use sync primitives, hence we need the local buffer
00100       fBuffer = new char[fBufferSize];
00101    }
00102 
00103    fIsSorted    = kFALSE;
00104    fIsTransferred = kFALSE;
00105    if (file) file->SetCacheRead(this);
00106 }
00107 
00108 //_____________________________________________________________________________
00109 TFileCacheRead::~TFileCacheRead()
00110 {
00111    // Destructor.
00112 
00113    delete [] fSeek;
00114    delete [] fSeekIndex;
00115    delete [] fSeekSort;
00116    delete [] fPos;
00117    delete [] fSeekLen;
00118    delete [] fSeekSortLen;
00119    delete [] fSeekPos;
00120    delete [] fLen;
00121    delete [] fBuffer;
00122 }
00123 
00124 //_____________________________________________________________________________
00125 void TFileCacheRead::Prefetch(Long64_t pos, Int_t len)
00126 {
00127    // Add block of length len at position pos in the list of blocks to
00128    // be prefetched. If pos <= 0 the current blocks (if any) are reset.
00129 
00130    fIsSorted = kFALSE;
00131    fIsTransferred = kFALSE;
00132    if (pos <= 0) {
00133       fNseek = 0;
00134       fNtot  = 0;
00135       return;
00136    }
00137    if (fNseek >= fSeekSize) {
00138       //reallocate buffers
00139       fSeekSize *= 2;
00140       Long64_t *aSeek        = new Long64_t[fSeekSize];
00141       Int_t    *aSeekIndex   = new Int_t[fSeekSize];
00142       Long64_t *aSeekSort    = new Long64_t[fSeekSize];
00143       Long64_t *aPos         = new Long64_t[fSeekSize];
00144       Int_t    *aSeekLen     = new Int_t[fSeekSize];
00145       Int_t    *aSeekSortLen = new Int_t[fSeekSize];
00146       Int_t    *aSeekPos     = new Int_t[fSeekSize];
00147       Int_t    *aLen         = new Int_t[fSeekSize];
00148       for (Int_t i=0;i<fNseek;i++) {
00149          aSeek[i]        = fSeek[i];
00150          aSeekIndex[i]   = fSeekIndex[i];
00151          aSeekSort[i]    = fSeekSort[i];
00152          aPos[i]         = fPos[i];
00153          aSeekLen[i]     = fSeekLen[i];
00154          aSeekSortLen[i] = fSeekSortLen[i];
00155          aSeekPos[i]     = fSeekPos[i];
00156          aLen[i]         = fLen[i];
00157       }
00158       delete [] fSeek;
00159       delete [] fSeekIndex;
00160       delete [] fSeekSort;
00161       delete [] fPos;
00162       delete [] fSeekLen;
00163       delete [] fSeekSortLen;
00164       delete [] fSeekPos;
00165       delete [] fLen;
00166       fSeek        = aSeek;
00167       fSeekIndex   = aSeekIndex;
00168       fSeekSort    = aSeekSort;
00169       fPos         = aPos;
00170       fSeekLen     = aSeekLen;
00171       fSeekSortLen = aSeekSortLen;
00172       fSeekPos     = aSeekPos;
00173       fLen         = aLen;
00174    }
00175 
00176    fSeek[fNseek] = pos;
00177    fSeekLen[fNseek] = len;
00178    fNseek++;
00179    fNtot += len;
00180 }
00181 
00182 //_____________________________________________________________________________
00183 void TFileCacheRead::Print(Option_t *option) const
00184 {
00185    // Print cache statistics, like
00186    //   ******TreeCache statistics for file: cms2.root ******
00187    //   Reading............................: 72761843 bytes in 7 transactions
00188    //   Readahead..........................: 256000 bytes with overhead = 0 bytes
00189    //   Average transaction................: 10394.549000 Kbytes
00190    //   Number of blocks in current cache..: 210, total size: 6280352
00191    //
00192    // if option = "a" the list of blocks in the cache is printed
00193    // NB: this function is automatically called by TTreeCache::Print
00194       
00195    TString opt = option;
00196    opt.ToLower();
00197    printf("Reading............................: %lld bytes in %d transactions\n",fFile->GetBytesRead(),  fFile->GetReadCalls());
00198    printf("Readahead..........................: %d bytes with overhead = %lld bytes\n",TFile::GetReadaheadSize(),fFile->GetBytesReadExtra());
00199    printf("Average transaction................: %f Kbytes\n",0.001*Double_t(fFile->GetBytesRead())/Double_t(fFile->GetReadCalls()));
00200    printf("Number of blocks in current cache..: %d, total size: %d\n",fNseek,fNtot);
00201    if (!opt.Contains("a")) return;
00202    for (Int_t i=0;i<fNseek;i++) {
00203       if (fIsSorted && !opt.Contains("s")) {
00204          printf("block: %5d, from: %lld to %lld, len = %d bytes\n",i,fSeekSort[i],fSeekSort[i]+fSeekSortLen[i],fSeekSortLen[i]);
00205       } else {
00206          printf("block: %5d, from: %lld to %lld, len = %d bytes\n",i,fSeek[i],fSeek[i]+fSeekLen[i],fSeekLen[i]);
00207       }
00208    }
00209    printf ("Number of long buffers = %d\n",fNb);
00210    for (Int_t j=0;j<fNb;j++) {
00211       printf("fPos[%d] = %lld, fLen = %d\n",j,fPos[j],fLen[j]);
00212    }
00213 }
00214 
00215 //_____________________________________________________________________________
00216 Int_t TFileCacheRead::ReadBuffer(char *buf, Long64_t pos, Int_t len)
00217 {
00218    // Read buffer at position pos.
00219    // If pos is in the list of prefetched blocks read from fBuffer,
00220    // otherwise need to make a normal read from file. Returns -1 in case of
00221    // read error, 0 in case not in cache, 1 in case read from cache.
00222 
00223    Int_t loc = -1;
00224    return ReadBufferExt(buf, pos, len, loc);
00225 }
00226 
00227 //_____________________________________________________________________________
00228 Int_t TFileCacheRead::ReadBufferExt(char *buf, Long64_t pos, Int_t len, Int_t &loc)
00229 {
00230    // Base function for ReadBuffer. Also gives out the position
00231    // of the block in the internal buffer. This helps TTreeCacheUnzip to avoid
00232    // doing twice the binary search
00233 
00234    if (fNseek > 0 && !fIsSorted) {
00235       Sort();
00236       loc = -1;
00237 
00238       // If ReadBufferAsync is not supported by this implementation...
00239       if (!fAsyncReading) {
00240          // Then we use the vectored read to read everything now
00241          if (fFile->ReadBuffers(fBuffer,fPos,fLen,fNb)) {
00242             return -1;
00243          }
00244          fIsTransferred = kTRUE;
00245       } else {
00246          // In any case, we'll start to request the chunks.
00247          // This implementation simply reads all the chunks in advance
00248          // in the async way.
00249 
00250          // Use the async readv instead of single reads
00251          fFile->ReadBuffers(0, 0, 0, 0); //Clear the XrdClient cache
00252          if (fFile->ReadBuffers(0,fPos,fLen,fNb)) {
00253             return -1;
00254          }
00255          fIsTransferred = kTRUE;
00256       }
00257    }
00258 
00259    // in case we are writing and reading to/from this file, we much check
00260    // if this buffer is in the write cache (not yet written to the file)
00261    if (TFileCacheWrite *cachew = fFile->GetCacheWrite()) {
00262       if (cachew->ReadBuffer(buf,pos,len) == 0) {
00263          fFile->SetOffset(pos+len);
00264          return 1;
00265       }
00266    }
00267 
00268    // If asynchronous reading is supported by this implementation...
00269    if (fAsyncReading) {
00270 
00271          // Now we dont have to look for it in the local buffer
00272          // if it's async, we expect that the communication library
00273          // will handle it more efficiently than we can do here
00274 
00275       Int_t retval;
00276       if (loc < 0)
00277          loc = (Int_t)TMath::BinarySearch(fNseek,fSeekSort,pos);
00278       
00279       // We use the internal list just to notify if the list is to be reconstructed
00280       if (loc >= 0 && loc < fNseek && pos == fSeekSort[loc]) {
00281          // Block found, the caller will get it
00282          
00283          if (buf) {
00284             // disable cache to avoid infinite recursion
00285             fFile->SetCacheRead(0);
00286             if (fFile->ReadBuffer(buf, pos, len)) {
00287                return -1;
00288             }
00289             fFile->SetOffset(pos+len);
00290             fFile->SetCacheRead(this);
00291          }
00292          
00293          retval = 1;
00294       } else {
00295          // Block not found in the list, we report it as a miss
00296          retval = 0;
00297       }
00298 
00299       if (gDebug > 0)
00300          Info("ReadBuffer","pos=%lld, len=%d, retval=%d, loc=%d, fseekSort[loc]=%lld, fSeekLen[loc]=%d", pos, len, retval, loc, fSeekSort[loc], fSeekLen[loc]);
00301       
00302       return retval;
00303    } else {
00304 
00305       if (loc < 0)
00306          loc = (Int_t)TMath::BinarySearch(fNseek,fSeekSort,pos);
00307 
00308       if (loc >= 0 && loc <fNseek && pos == fSeekSort[loc]) {
00309          if (buf) {
00310             memcpy(buf,&fBuffer[fSeekPos[loc]],len);
00311             fFile->SetOffset(pos+len);
00312          }
00313          return 1;
00314       }
00315    }
00316 
00317    return 0;
00318 }
00319 
00320 //_____________________________________________________________________________
00321 void TFileCacheRead::SetFile(TFile *file)
00322 {
00323    // Set the file using this cache and reset the current blocks (if any).
00324 
00325    fFile = file;
00326 
00327    if (fAsyncReading) {
00328       // If asynchronous reading is not supported by this TFile specialization
00329       // we use sync primitives, hence we need the local buffer
00330       if (file && file->ReadBufferAsync(0, 0)) {
00331          fAsyncReading = kFALSE;
00332          fBuffer       = new char[fBufferSize];
00333       }
00334    }
00335 
00336    Prefetch(0,0);
00337 }
00338 
00339 //_____________________________________________________________________________
00340 void TFileCacheRead::Sort()
00341 {
00342    // Sort buffers to be prefetched in increasing order of positions.
00343    // Merge consecutive blocks if necessary.
00344 
00345    if (!fNseek) return;
00346    TMath::Sort(fNseek,fSeek,fSeekIndex,kFALSE);
00347    Int_t i;
00348    Int_t nb = 0;
00349    for (i=0;i<fNseek;i++) {
00350       Int_t ind = fSeekIndex[i];
00351       fSeekSort[i] = fSeek[ind];
00352       fSeekSortLen[i] = fSeekLen[ind];
00353    }
00354    if (fNtot > fBufferSizeMin) {
00355       fBufferSize = fNtot + 100;
00356       delete [] fBuffer;
00357       fBuffer = 0;
00358       // If ReadBufferAsync is not supported by this implementation
00359       // it means that we are using sync primitives, hence we need the local buffer
00360       if (!fAsyncReading)
00361          fBuffer = new char[fBufferSize];
00362    }
00363    fPos[0]  = fSeekSort[0];
00364    fLen[0]  = fSeekSortLen[0];
00365    fSeekPos[0] = 0;
00366    for (i=1;i<fNseek;i++) {
00367       fSeekPos[i] = fSeekPos[i-1] + fSeekSortLen[i-1];
00368       //in the test below 16 MBytes is pure empirirical and may depend on the file system.
00369       //increasing this number must be done with care, as it may increase
00370       //the job real time (mismatch with OS buffers)
00371       if ((fSeekSort[i] != fSeekSort[i-1]+fSeekSortLen[i-1]) ||
00372           (fLen[nb] > 16000000)) {
00373          nb++;
00374          fPos[nb] = fSeekSort[i];
00375          fLen[nb] = fSeekSortLen[i];
00376       } else {
00377          fLen[nb] += fSeekSortLen[i];
00378       }
00379    }
00380    fNb = nb+1;
00381    fIsSorted = kTRUE;
00382 }

Generated on Tue Jul 5 14:30:17 2011 for ROOT_528-00b_version by  doxygen 1.5.1