TTreeCacheUnzip.cxx

Go to the documentation of this file.
00001 // @(#)root/tree:$Id: TTreeCacheUnzip.cxx 37986 2011-02-04 21:42:15Z pcanal $
00002 // Author: Leandro Franco   10/04/2008
00003 
00004 /*************************************************************************
00005  * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers.               *
00006  * All rights reserved.                                                  *
00007  *                                                                       *
00008  * For the licensing terms see $ROOTSYS/LICENSE.                         *
00009  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
00010  *************************************************************************/
00011 //////////////////////////////////////////////////////////////////////////
00012 //                                                                      //
00013 // TTreeCacheUnzip                                                      //
00014 //                                                                      //
00015 // Specialization of TTreeCache for parallel Unzipping                  //
00016 //                                                                      //
00017 // Fabrizio Furano (CERN) Aug 2009                                      //
00018 // Core TTree-related code borrowed from the previous version           //
00019 //  by Leandro Franco and Rene Brun                                     //
00020 //                                                                      //
00021 //////////////////////////////////////////////////////////////////////////
00022 
00023 //////////////////////////////////////////////////////////////////////////
00024 // Parallel Unzipping                                                   //
00025 //                                                                      //
00026 // TTreeCache has been specialised in order to let additional threads   //
00027 //  free to unzip in advance its content. In this implementation we     //
00028 //  support up to 10 threads, but right now it makes more sense to      //
00029 //  limit their number to 1-2                                           //
00030 //                                                                      //
00031 // The application reading data is carefully synchronized, in order to: //
00032 //  - if the block it wants is not unzipped, it self-unzips it without  //
00033 //     waiting                                                          //
00034 //  - if the block is being unzipped in parallel, it waits only         //
00035 //    for that unzip to finish                                          //
00036 //  - if the block has already been unzipped, it takes it               //
00037 //                                                                      //
00038 // This is supposed to cancel a part of the unzipping latency, at the   //
00039 //  expense of CPU time.                                                //
00040 //                                                                      //
00041 // The default parameters are the same of the prev version, i.e. 20%    //
00042 //  of the TTreeCache cache size. To change it use                      //
00043 // TTreeCache::SetUnzipBufferSize(Long64_t bufferSize)                  //
00044 // where bufferSize must be passed in bytes.                            //
00045 //                                                                      //
00046 //////////////////////////////////////////////////////////////////////////
00047 
00048 #include "TTreeCacheUnzip.h"
00049 #include "TChain.h"
00050 #include "TBranch.h"
00051 #include "TFile.h"
00052 #include "TEventList.h"
00053 #include "TVirtualMutex.h"
00054 #include "TThread.h"
00055 #include "TCondition.h"
00056 #include "TMath.h"
00057 #include "Bytes.h"
00058 
00059 #include "TEnv.h"
00060 
00061 #define THREADCNT 2
00062 extern "C" void R__unzip(Int_t *nin, UChar_t *bufin, Int_t *lout, char *bufout, Int_t *nout);
00063 extern "C" int R__unzip_header(Int_t *nin, UChar_t *bufin, Int_t *lout);
00064 
00065 TTreeCacheUnzip::EParUnzipMode TTreeCacheUnzip::fgParallel = TTreeCacheUnzip::kDisable;
00066 
00067 // The unzip cache does not consume memory by itself, it just allocates in advance
00068 // mem blocks which are then picked as they are by the baskets.
00069 // Hence there is no good reason to limit it too much
00070 Double_t TTreeCacheUnzip::fgRelBuffSize = .5;
00071 
00072 ClassImp(TTreeCacheUnzip)
00073 
00074 //______________________________________________________________________________
00075 TTreeCacheUnzip::TTreeCacheUnzip() : TTreeCache(),
00076 
00077    fActiveThread(kFALSE),
00078    fAsyncReading(kFALSE),
00079    fCycle(0),
00080    fLastReadPos(0),
00081    fBlocksToGo(0),
00082    fUnzipLen(0),
00083    fUnzipChunks(0),
00084    fUnzipStatus(0),
00085    fTotalUnzipBytes(0),
00086    fNseekMax(0),
00087    fUnzipBufferSize(0),
00088    fNUnzip(0),
00089    fNFound(0),
00090    fNStalls(0),
00091    fNMissed(0)
00092 
00093 {
00094    // Default Constructor.
00095 
00096    Init();
00097 }
00098 
00099 //______________________________________________________________________________
00100 TTreeCacheUnzip::TTreeCacheUnzip(TTree *tree, Int_t buffersize) : TTreeCache(tree,buffersize),
00101    fActiveThread(kFALSE),
00102    fAsyncReading(kFALSE),
00103    fCycle(0),
00104    fLastReadPos(0),
00105    fBlocksToGo(0),
00106    fUnzipLen(0),
00107    fUnzipChunks(0),
00108    fUnzipStatus(0),
00109    fTotalUnzipBytes(0),
00110    fNseekMax(0),
00111    fUnzipBufferSize(0),
00112    fNUnzip(0),
00113    fNFound(0),
00114    fNStalls(0),                                                                 
00115    fNMissed(0)
00116 {
00117    // Constructor.
00118 
00119    Init();
00120 }
00121 
00122 //______________________________________________________________________________
00123 void TTreeCacheUnzip::Init()
00124 {
00125    // Initialization procedure common to all the constructors
00126 
00127    fMutexList        = new TMutex(kTRUE);
00128    fIOMutex          = new TMutex(kTRUE);
00129 
00130    fUnzipStartCondition   = new TCondition(fMutexList);
00131    fUnzipDoneCondition   = new TCondition(fMutexList);
00132 
00133    fTotalUnzipBytes = 0;
00134    
00135    fCompBuffer = new char[16384];
00136    fCompBufferSize = 16384;
00137  
00138    if (fgParallel == kDisable) {
00139       fParallel = kFALSE;
00140    }
00141    else if(fgParallel == kEnable || fgParallel == kForce) {
00142       SysInfo_t info;
00143       gSystem->GetSysInfo(&info);
00144 
00145       fUnzipBufferSize = Long64_t(fgRelBuffSize * GetBufferSize());
00146 
00147       if(gDebug > 0)
00148          Info("TTreeCacheUnzip", "Enabling Parallel Unzipping");
00149 
00150       fParallel = kTRUE;
00151 
00152       for (Int_t i = 0; i < 10; i++) fUnzipThread[i] = 0;
00153 
00154       StartThreadUnzip(THREADCNT);
00155 
00156    }
00157    else {
00158       Warning("TTreeCacheUnzip", "Parallel Option unknown");
00159    }
00160 
00161    // Check if asynchronous reading is supported by this TFile specialization
00162    if (gEnv->GetValue("TFile.AsyncReading", 1)) {
00163       if (fFile && !(fFile->ReadBufferAsync(0, 0)))
00164          fAsyncReading = kTRUE;
00165    }
00166 
00167 }
00168 
00169 //______________________________________________________________________________
00170 TTreeCacheUnzip::~TTreeCacheUnzip()
00171 {
00172    // destructor. (in general called by the TFile destructor
00173    // destructor. (in general called by the TFile destructor)
00174 
00175    ResetCache();
00176 
00177    if (IsActiveThread())
00178       StopThreadUnzip();
00179 
00180 
00181    delete [] fUnzipLen;
00182 
00183    delete fUnzipStartCondition;
00184    delete fUnzipDoneCondition;
00185 
00186 
00187    delete fMutexList;
00188    delete fIOMutex;
00189 
00190    delete [] fUnzipStatus;
00191    delete [] fUnzipChunks;
00192 }
00193 
00194 //_____________________________________________________________________________
00195 void TTreeCacheUnzip::AddBranch(TBranch *b, Bool_t subbranches /*= kFALSE*/)
00196 {
00197    //add a branch to the list of branches to be stored in the cache
00198    //this function is called by TBranch::GetBasket
00199    R__LOCKGUARD(fMutexList);
00200 
00201    TTreeCache::AddBranch(b, subbranches);
00202 }
00203 
00204 //_____________________________________________________________________________
00205 void TTreeCacheUnzip::AddBranch(const char *branch, Bool_t subbranches /*= kFALSE*/)
00206 {
00207    //add a branch to the list of branches to be stored in the cache
00208    //this function is called by TBranch::GetBasket
00209    R__LOCKGUARD(fMutexList);
00210 
00211    TTreeCache::AddBranch(branch, subbranches);
00212 }
00213 
00214 //_____________________________________________________________________________
00215 Bool_t TTreeCacheUnzip::FillBuffer()
00216 {
00217 
00218    {
00219    // Fill the cache buffer with the branches in the cache.
00220    R__LOCKGUARD(fMutexList);
00221    fIsTransferred = kFALSE;
00222 
00223    if (fNbranches <= 0) return kFALSE;
00224    TTree *tree = ((TBranch*)fBranches->UncheckedAt(0))->GetTree();
00225    Long64_t entry = tree->GetReadEntry();
00226 
00227    // If the entry is in the range we previously prefetched, there is 
00228    // no point in retrying.   Note that this will also return false
00229    // during the training phase (fEntryNext is then set intentional to 
00230    // the end of the training phase).
00231    if (fEntryCurrent <= entry  && entry < fEntryNext) return kFALSE;
00232 
00233    // Triggered by the user, not the learning phase
00234    if (entry == -1)  entry=0;
00235 
00236    // Estimate number of entries that can fit in the cache compare it
00237    // to the original value of fBufferSize not to the real one
00238    Long64_t autoFlush = tree->GetAutoFlush();
00239    if (autoFlush > 0) {
00240       //case when the tree autoflush has been set
00241       Int_t averageEntrySize = tree->GetZipBytes()/tree->GetEntries();
00242       Int_t nauto = fBufferSizeMin/(averageEntrySize*autoFlush);
00243       if (nauto < 1) nauto = 1;
00244       fEntryNext = entry - entry%autoFlush + nauto*autoFlush;
00245    } else { 
00246       //case of old files before November 9 2009
00247       if (fZipBytes==0) {
00248          fEntryNext = entry + tree->GetEntries();;    
00249       } else {
00250          fEntryNext = entry + tree->GetEntries()*fBufferSizeMin/fZipBytes;
00251       }
00252    }
00253    if (fEntryMax <= 0) fEntryMax = tree->GetEntries();
00254    if (fEntryNext > fEntryMax) fEntryNext = fEntryMax+1;
00255 
00256 
00257    fEntryCurrent = entry;
00258    
00259    // Check if owner has a TEventList set. If yes we optimize for this
00260    // Special case reading only the baskets containing entries in the
00261    // list.
00262    TEventList *elist = fOwner->GetEventList();
00263    Long64_t chainOffset = 0;
00264    if (elist) {
00265       if (fOwner->IsA() ==TChain::Class()) {
00266          TChain *chain = (TChain*)fOwner;
00267          Int_t t = chain->GetTreeNumber();
00268          chainOffset = chain->GetTreeOffset()[t];
00269       }
00270    }
00271 
00272    //clear cache buffer
00273    TFileCacheRead::Prefetch(0,0);
00274 
00275    //store baskets
00276    for (Int_t i=0;i<fNbranches;i++) {
00277       TBranch *b = (TBranch*)fBranches->UncheckedAt(i);
00278       if (b->GetDirectory()==0) continue;
00279       if (b->GetDirectory()->GetFile() != fFile) continue;
00280       Int_t nb = b->GetMaxBaskets();
00281       Int_t *lbaskets   = b->GetBasketBytes();
00282       Long64_t *entries = b->GetBasketEntry();
00283       if (!lbaskets || !entries) continue;
00284       //we have found the branch. We now register all its baskets
00285       //from the requested offset to the basket below fEntrymax
00286       Int_t blistsize = b->GetListOfBaskets()->GetSize();
00287       for (Int_t j=0;j<nb;j++) {
00288          // This basket has already been read, skip it
00289          if (j<blistsize && b->GetListOfBaskets()->UncheckedAt(j)) continue;
00290 
00291          Long64_t pos = b->GetBasketSeek(j);
00292          Int_t len = lbaskets[j];
00293          if (pos <= 0 || len <= 0) continue;
00294          //important: do not try to read fEntryNext, otherwise you jump to the next autoflush
00295          if (entries[j] >= fEntryNext) continue;
00296          if (entries[j] < entry && (j<nb-1 && entries[j+1] <= entry)) continue;
00297          if (elist) {
00298             Long64_t emax = fEntryMax;
00299             if (j<nb-1) emax = entries[j+1]-1;
00300             if (!elist->ContainsRange(entries[j]+chainOffset,emax+chainOffset)) continue;
00301          }
00302          fNReadPref++;
00303 
00304          TFileCacheRead::Prefetch(pos,len);
00305       }
00306       if (gDebug > 0) printf("Entry: %lld, registering baskets branch %s, fEntryNext=%lld, fNseek=%d, fNtot=%d\n",entry,((TBranch*)fBranches->UncheckedAt(i))->GetName(),fEntryNext,fNseek,fNtot);
00307    }
00308 
00309 
00310    // Now fix the size of the status arrays
00311    ResetCache();
00312 
00313    fIsLearning = kFALSE;
00314 
00315    }
00316 
00317    return kTRUE;
00318 }
00319 
00320 //_____________________________________________________________________________
00321 void TTreeCacheUnzip::SetEntryRange(Long64_t emin, Long64_t emax)
00322 {
00323    // Set the minimum and maximum entry number to be processed
00324    // this information helps to optimize the number of baskets to read
00325    // when prefetching the branch buffers.
00326    R__LOCKGUARD(fMutexList);
00327 
00328    TTreeCache::SetEntryRange(emin, emax);
00329 }
00330 
00331 //_____________________________________________________________________________
00332 void TTreeCacheUnzip::StopLearningPhase()
00333 {
00334    // It's the same as TTreeCache::StopLearningPhase but we guarantee that
00335    // we start the unzipping just after getting the buffers
00336    R__LOCKGUARD(fMutexList);
00337 
00338 
00339    TTreeCache::StopLearningPhase();
00340 
00341 }
00342 
00343 //_____________________________________________________________________________
00344 void TTreeCacheUnzip::UpdateBranches(TTree *tree, Bool_t owner)
00345 {
00346    //update pointer to current Tree and recompute pointers to the branches in the cache
00347    R__LOCKGUARD(fMutexList);
00348 
00349    TTreeCache::UpdateBranches(tree, owner);
00350 }
00351 
00352 ///////////////////////////////////////////////////////////////////////////////
00353 //                                                                           //
00354 // From now on we have the method concerning the threading part of the cache //
00355 //                                                                           //
00356 ///////////////////////////////////////////////////////////////////////////////
00357 
00358 //_____________________________________________________________________________
00359 TTreeCacheUnzip::EParUnzipMode TTreeCacheUnzip::GetParallelUnzip()
00360 {
00361    // Static function that returns the parallel option
00362    // (to indicate an additional thread)
00363 
00364    return fgParallel;
00365 }
00366 
00367 //_____________________________________________________________________________
00368 Bool_t TTreeCacheUnzip::IsParallelUnzip()
00369 {
00370    // Static function that tells wether the multithreading unzipping
00371    // is activated
00372 
00373    if (fgParallel == kEnable || fgParallel == kForce)
00374       return kTRUE;
00375 
00376    return kFALSE;
00377 }
00378 
00379 //_____________________________________________________________________________
00380 Bool_t TTreeCacheUnzip::IsActiveThread()
00381 {
00382    // This indicates if the thread is active in this moment...
00383    // this variable is very important because if we change it from true to
00384    // false the thread will stop... ( see StopThreadTreeCacheUnzip() )
00385    R__LOCKGUARD(fMutexList);
00386 
00387    return fActiveThread;
00388 }
00389 
00390 //_____________________________________________________________________________
00391 Bool_t TTreeCacheUnzip::IsQueueEmpty()
00392 {
00393    // It says if the queue is empty... useful to see if we have to process
00394    // it.
00395    R__LOCKGUARD(fMutexList);
00396 
00397    if ( fIsLearning )
00398       return kTRUE;
00399 
00400    return kFALSE;
00401 }
00402 
00403 void TTreeCacheUnzip::WaitUnzipStartSignal()
00404 {
00405    // Here the threads sleep waiting for some blocks to unzip
00406 
00407    fUnzipStartCondition->TimedWaitRelative(2000);
00408 
00409 }
00410 //_____________________________________________________________________________
00411 void TTreeCacheUnzip::SendUnzipStartSignal(Bool_t broadcast)
00412 {
00413    // This will send the signal corresponfing to the queue... normally used
00414    // when we want to start processing the list of buffers.
00415 
00416    if (gDebug > 0) Info("SendSignal", " fUnzipCondition->Signal()");
00417 
00418    if (broadcast)
00419       fUnzipStartCondition->Broadcast();
00420    else 
00421       fUnzipStartCondition->Signal();
00422 }
00423 
00424 //_____________________________________________________________________________
00425 Int_t TTreeCacheUnzip::SetParallelUnzip(TTreeCacheUnzip::EParUnzipMode option)
00426 {
00427    // Static function that(de)activates multithreading unzipping
00428    // The possible options are:
00429    // kEnable _Enable_ it, which causes an automatic detection and launches the
00430    // additional thread if the number of cores in the machine is greater than one
00431    // kDisable _Disable_ will not activate the additional thread.
00432    // kForce _Force_ will start the additional thread even if there is only one core.
00433    // the default will be taken as kEnable.
00434    // returns 0 if there was an error, 1 otherwise.
00435 
00436    if(fgParallel == kEnable || fgParallel == kForce || fgParallel == kDisable) {
00437       fgParallel = option;
00438       return 1;
00439    }
00440    return 0;
00441 }
00442 
00443 
00444 struct TTreeCacheUnzipData {
00445    TTreeCacheUnzip *inst;
00446    Int_t cnt;
00447 };
00448 
00449 //_____________________________________________________________________________
00450 Int_t TTreeCacheUnzip::StartThreadUnzip(Int_t nthreads)
00451 {
00452    // The Thread is only a part of the TTreeCache but it is the part that
00453    // waits for info in the queue and process it... unfortunatly, a Thread is
00454    // not an object an we have to deal with it in the old C-Style way
00455    // Returns 0 if the thread was initialized or 1 if it was already running
00456    Int_t nt = nthreads;
00457    if (nt > 10) nt = 10;
00458 
00459    if (gDebug > 0)
00460       Info("StartThreadUnzip", "Going to start %d threads.", nt);
00461 
00462    for (Int_t i = 0; i < nt; i++) {
00463       if (!fUnzipThread[i]) {
00464          TString nm("UnzipLoop");
00465          nm += i;
00466 
00467          if (gDebug > 0)
00468             Info("StartThreadUnzip", "Going to start thread '%s'", nm.Data());
00469 
00470          TTreeCacheUnzipData *d = new TTreeCacheUnzipData;
00471          d->inst = this;
00472          d->cnt = i;
00473 
00474          fUnzipThread[i] = new TThread(nm.Data(), UnzipLoop, (void*)d);
00475          if (!fUnzipThread[i])
00476             Error("TTreeCacheUnzip::StartThreadUnzip", " Unable to create new thread.");
00477 
00478          fUnzipThread[i]->Run();
00479 
00480          // There is at least one active thread
00481          fActiveThread=kTRUE;
00482         
00483       }
00484    }
00485 
00486    return (fActiveThread == kTRUE);
00487 }
00488 
00489 //_____________________________________________________________________________
00490 Int_t TTreeCacheUnzip::StopThreadUnzip()
00491 {
00492    // To stop the thread we only need to change the value of the variable
00493    // fActiveThread to false and the loop will stop (of course, we will have)
00494    // to do the cleaning after that.
00495    // Note: The syncronization part is important here or we will try to delete
00496    //       teh object while it's still processing the queue
00497    fActiveThread = kFALSE;
00498 
00499    for (Int_t i = 0; i < 1; i++) {
00500       if(fUnzipThread[i]){
00501 
00502          SendUnzipStartSignal(kTRUE);
00503 
00504          if (fUnzipThread[i]->Exists()) {
00505             fUnzipThread[i]->Join();
00506             delete fUnzipThread[i];
00507          }
00508       }
00509 
00510    }
00511 
00512    return 1;
00513 }
00514 
00515 
00516 
00517 //_____________________________________________________________________________
00518 void* TTreeCacheUnzip::UnzipLoop(void *arg)
00519 {
00520    // This is a static function.
00521    // This is the call that will be executed in the Thread generated by
00522    // StartThreadTreeCacheUnzip... what we want to do is to inflate the next
00523    // series of buffers leaving them in the second cache.
00524    // Returns 0 when it finishes
00525    TTreeCacheUnzipData *d = (TTreeCacheUnzipData *)arg;
00526    TTreeCacheUnzip *unzipMng = d->inst;
00527    
00528    TThread::SetCancelOn();
00529    TThread::SetCancelDeferred();
00530 
00531    Int_t thrnum = d->cnt;
00532    Int_t startindex = thrnum;
00533    Int_t locbuffsz = 16384;
00534    char *locbuff = new char[16384];
00535    Int_t res = 0;
00536    Int_t myCycle = 0;
00537 
00538    while( unzipMng->IsActiveThread() ) {
00539       res = 1;
00540 
00541       {
00542          R__LOCKGUARD(unzipMng->fMutexList);
00543          if (myCycle != unzipMng->fCycle) startindex = thrnum;
00544          myCycle = unzipMng->fCycle;
00545          if (unzipMng->fNseek) startindex = startindex % unzipMng->fNseek;
00546          else startindex = -1;
00547       }
00548    
00549 
00550       if (startindex >= 0)
00551          res = unzipMng->UnzipCache(startindex, locbuffsz, locbuff);
00552 
00553       {
00554          R__LOCKGUARD(unzipMng->fMutexList);
00555 
00556          if(!unzipMng->IsActiveThread()) break;
00557 
00558          if ((res == 1) || (!unzipMng->fIsTransferred)) {
00559             unzipMng->WaitUnzipStartSignal();
00560             startindex = unzipMng->fLastReadPos+3+thrnum;
00561          }
00562       }
00563 
00564 
00565    }
00566 
00567    delete d;
00568    delete [] locbuff;
00569    return (void *)0;
00570 }
00571 
00572 ///////////////////////////////////////////////////////////////////////////////
00573 //                                                                           //
00574 // From now on we have the method concerning the unzipping part of the cache //
00575 //                                                                           //
00576 ///////////////////////////////////////////////////////////////////////////////
00577 
00578 //_____________________________________________________________________________
00579 Int_t TTreeCacheUnzip::GetRecordHeader(char *buf, Int_t maxbytes, Int_t &nbytes, Int_t &objlen, Int_t &keylen)
00580 {
00581    // Read the logical record header from the buffer buf.
00582    // That must be the pointer tho the header part not the object by itself and
00583    // must contain data of at least maxbytes
00584    // Returns nread;
00585    // In output arguments:
00586    //    nbytes : number of bytes in record
00587    //             if negative, this is a deleted record
00588    //             if 0, cannot read record, wrong value of argument first
00589    //    objlen : uncompressed object size
00590    //    keylen : length of logical record header
00591    // Note that the arguments objlen and keylen are returned only
00592    // if maxbytes >=16
00593    // Note: This was adapted from TFile... so some things dont apply
00594 
00595    Version_t versionkey;
00596    Short_t klen;
00597    UInt_t datime;
00598    Int_t nb = 0,olen;
00599    Int_t nread = maxbytes;
00600    frombuf(buf,&nb);
00601    nbytes = nb;
00602    if (nb < 0) return nread;
00603    //   const Int_t headerSize = Int_t(sizeof(nb) +sizeof(versionkey) +sizeof(olen) +sizeof(datime) +sizeof(klen));
00604    const Int_t headerSize = 16;
00605    if (nread < headerSize) return nread;
00606    frombuf(buf, &versionkey);
00607    frombuf(buf, &olen);
00608    frombuf(buf, &datime);
00609    frombuf(buf, &klen);
00610    if (!olen) olen = nbytes-klen;
00611    objlen = olen;
00612    keylen = klen;
00613    return nread;
00614 }
00615 
00616 //_____________________________________________________________________________
00617 void TTreeCacheUnzip::ResetCache()
00618 {
00619    // This will delete the list of buffers that are in the unzipping cache
00620    // and will reset certain values in the cache.
00621    // This name is ambiguos because the method doesn't reset the whole cache,
00622    // only the part related to the unzipping
00623    // Note: This method is completely different from TTreeCache::ResetCache(),
00624    // in that method we were cleaning the prefetching buffer while here we
00625    // delete the information about the unzipped buffers
00626 
00627    {
00628    R__LOCKGUARD(fMutexList);
00629 
00630    if (gDebug > 0)
00631       Info("ResetCache", "Thread: %ld -- Resetting the cache. fNseek:%d fNSeekMax:%d fTotalUnzipBytes:%lld", TThread::SelfId(), fNseek, fNseekMax, fTotalUnzipBytes);
00632    
00633    // Reset all the lists and wipe all the chunks
00634    fCycle++;
00635    for (Int_t i = 0; i < fNseekMax; i++) {
00636       if (fUnzipLen) fUnzipLen[i] = 0;
00637       if (fUnzipChunks) {
00638          if (fUnzipChunks[i]) delete [] fUnzipChunks[i];
00639          fUnzipChunks[i] = 0;
00640       }
00641       if (fUnzipStatus) fUnzipStatus[i] = 0;
00642 
00643    }
00644 
00645    while (fActiveBlks.size()) fActiveBlks.pop();
00646 
00647    if(fNseekMax < fNseek){
00648       if (gDebug > 0)
00649          Info("ResetCache", "Changing fNseekMax from:%d to:%d", fNseekMax, fNseek);
00650 
00651       Byte_t *aUnzipStatus = new Byte_t[fNseek];
00652       memset(aUnzipStatus, 0, fNseek*sizeof(Byte_t));
00653 
00654       Int_t *aUnzipLen = new Int_t[fNseek];
00655       memset(aUnzipLen, 0, fNseek*sizeof(Int_t));
00656 
00657       char **aUnzipChunks = new char *[fNseek];
00658       memset(aUnzipChunks, 0, fNseek*sizeof(char *));
00659 
00660       if (fUnzipStatus) delete [] fUnzipStatus;
00661       if (fUnzipLen) delete [] fUnzipLen;
00662       if (fUnzipChunks) delete [] fUnzipChunks;
00663 
00664       fUnzipStatus  = aUnzipStatus;
00665       fUnzipLen  = aUnzipLen;
00666       fUnzipChunks = aUnzipChunks;
00667 
00668       fNseekMax  = fNseek;
00669    }
00670 
00671 
00672 
00673    fLastReadPos = 0;
00674    fTotalUnzipBytes = 0;
00675    fBlocksToGo = fNseek;
00676    }
00677 
00678 
00679    SendUnzipStartSignal(kTRUE);
00680 
00681 
00682 }
00683 
00684 //_____________________________________________________________________________
00685 Int_t TTreeCacheUnzip::GetUnzipBuffer(char **buf, Long64_t pos, Int_t len, Bool_t *free)
00686 {
00687    // We try to read a buffer that has already been unzipped
00688    // Returns -1 in case of read failure, 0 in case it's not in the
00689    // cache and n>0 in case read from cache (number of bytes copied).
00690    // pos and len are the original values as were passed to ReadBuffer
00691    // but instead we will return the inflated buffer.
00692    // Note!! : If *buf == 0 we will allocate the buffer and it will be the
00693    // responsability of the caller to free it... it is useful for example
00694    // to pass it to the creator of TBuffer
00695    Int_t res = 0;
00696    Int_t loc = -1;
00697 
00698    {
00699       R__LOCKGUARD(fMutexList);
00700 
00701 
00702       // We go straight to TTreeCache/TfileCacheRead, in order to get the info we need
00703       //  pointer to the original zipped chunk
00704       //  its index in the original unsorted offsets lists
00705       //  
00706       // Actually there are situations in which copying the buffer is not
00707       // useful. But the choice is among doing once more a small memcpy or a binary search in a large array. I prefer the former.
00708       // Also, here we prefer not to trigger the (re)population of the chunks in the TFileCacheRead. That is
00709       // better to be done in the main thread.
00710 
00711       // And now loc is the position of the chunk in the array of the sorted chunks
00712       Int_t myCycle = fCycle;
00713 
00714 
00715       if (fParallel && !fIsLearning) {
00716 
00717 
00718          if(fNseekMax < fNseek){
00719             if (gDebug > 0)
00720                Info("GetUnzipBuffer", "Changing fNseekMax from:%d to:%d", fNseekMax, fNseek);
00721 
00722             Byte_t *aUnzipStatus = new Byte_t[fNseek];
00723             memset(aUnzipStatus, 0, fNseek*sizeof(Byte_t));
00724 
00725             Int_t *aUnzipLen = new Int_t[fNseek];
00726             memset(aUnzipLen, 0, fNseek*sizeof(Int_t));
00727 
00728             char **aUnzipChunks = new char *[fNseek];
00729             memset(aUnzipChunks, 0, fNseek*sizeof(char *));
00730 
00731             for (Int_t i = 0; i < fNseekMax; i++) {
00732                aUnzipStatus[i] = fUnzipStatus[i];
00733                aUnzipLen[i] = fUnzipLen[i];
00734                aUnzipChunks[i] = fUnzipChunks[i];
00735             }
00736 
00737             if (fUnzipStatus) delete [] fUnzipStatus;
00738             if (fUnzipLen) delete [] fUnzipLen;
00739             if (fUnzipChunks) delete [] fUnzipChunks;
00740 
00741             fUnzipStatus  = aUnzipStatus;
00742             fUnzipLen  = aUnzipLen;
00743             fUnzipChunks = aUnzipChunks;
00744 
00745             fNseekMax  = fNseek;
00746          }
00747 
00748 
00749 
00750 
00751 
00752 
00753          
00754          loc = (Int_t)TMath::BinarySearch(fNseek,fSeekSort,pos);
00755          if ( (fCycle == myCycle) && (loc >= 0) && (loc < fNseek) && (pos == fSeekSort[loc]) ) {
00756 
00757             // The buffer is, at minimum, in the file cache. We must know its index in the requests list
00758             // In order to get its info
00759             Int_t seekidx = fSeekIndex[loc];
00760 
00761             fLastReadPos = seekidx;
00762 
00763             do {
00764 
00765 
00766 
00767 
00768                // If the block is ready we get it immediately.
00769                // And also we don't have to alloc the blks. This is supposed to be
00770                // the main thread of the app.
00771                if ((fUnzipStatus[seekidx] == 2) && (fUnzipChunks[seekidx]) && (fUnzipLen[seekidx] > 0)) {
00772 
00773                   //if (gDebug > 0)
00774                   //   Info("GetUnzipBuffer", "++++++++++++++++++++ CacheHIT Block wanted: %d  len:%d req_len:%d fNseek:%d", seekidx, fUnzipLen[seekidx], len,  fNseek);
00775 
00776                   if(!(*buf)) {
00777                      *buf = fUnzipChunks[seekidx];
00778                      fUnzipChunks[seekidx] = 0;
00779                      fTotalUnzipBytes -= fUnzipLen[seekidx];
00780                      SendUnzipStartSignal(kFALSE);
00781                      *free = kTRUE;
00782                   }
00783                   else {
00784                      memcpy(*buf, fUnzipChunks[seekidx], fUnzipLen[seekidx]);
00785                      delete fUnzipChunks[seekidx];
00786                      fTotalUnzipBytes -= fUnzipLen[seekidx];
00787                      fUnzipChunks[seekidx] = 0;
00788                      SendUnzipStartSignal(kFALSE);
00789                      *free = kFALSE;
00790                   }
00791                
00792                   fNFound++;
00793                
00794                   return fUnzipLen[seekidx];
00795                }
00796 
00797                // If the status of the unzipped chunk is pending
00798                // we wait on the condvar, hoping that the next signal is the good one
00799                if ( fUnzipStatus[seekidx] == 1 ) {
00800                   //fMutexList->UnLock();
00801                   fUnzipDoneCondition->TimedWaitRelative(200);
00802                   //fMutexList->Lock();
00803 
00804                   if ( myCycle != fCycle ) {
00805                      if (gDebug > 0)
00806                         Info("GetUnzipBuffer", "Sudden paging Break!!! IsActiveThread(): %d, fNseek: %d, fIsLearning:%d",
00807                              IsActiveThread(), fNseek, fIsLearning);
00808 
00809                      fLastReadPos = 0;
00810 
00811                      seekidx = -1;
00812                      break;
00813                   }
00814 
00815                }
00816 
00817             } while ( fUnzipStatus[seekidx] == 1 );
00818 
00819             //if (gDebug > 0)
00820             //   Info("GetUnzipBuffer", "------- Block wanted: %d  status: %d len: %d chunk: %llx ", seekidx, fUnzipStatus[seekidx], fUnzipLen[seekidx], fUnzipChunks[seekidx]);
00821 
00822             // Here the block is not pending. It could be done or aborted or not yet being processed.
00823             if ( (seekidx >= 0) && (fUnzipStatus[seekidx] == 2) && (fUnzipChunks[seekidx]) && (fUnzipLen[seekidx] > 0) ) {
00824 
00825                //if (gDebug > 0)
00826                //   Info("GetUnzipBuffer", "++++++++++++++++++++ CacheLateHIT Block wanted: %d  len:%d fNseek:%d", seekidx, fUnzipLen[seekidx], fNseek);
00827 
00828                if(!(*buf)) {
00829                   *buf = fUnzipChunks[seekidx];
00830                   fUnzipChunks[seekidx] = 0;
00831                   fTotalUnzipBytes -= fUnzipLen[seekidx];
00832                   SendUnzipStartSignal(kFALSE);
00833                   *free = kTRUE;
00834                }
00835                else {
00836                   memcpy(*buf, fUnzipChunks[seekidx], fUnzipLen[seekidx]);
00837                   delete fUnzipChunks[seekidx];
00838                   fTotalUnzipBytes -= fUnzipLen[seekidx];
00839                   fUnzipChunks[seekidx] = 0;
00840                   SendUnzipStartSignal(kFALSE);
00841                   *free = kFALSE;
00842                }
00843 
00844 
00845                fNStalls++;
00846 
00847                return fUnzipLen[seekidx];
00848             }
00849             else {
00850                // This is a complete miss. We want to avoid the threads
00851                // to try unzipping this block in the future.
00852                fUnzipStatus[seekidx] = 2;
00853                fUnzipChunks[seekidx] = 0;
00854 
00855                if ((fTotalUnzipBytes < fUnzipBufferSize) && fBlocksToGo)
00856                   SendUnzipStartSignal(kFALSE);
00857 
00858                //if (gDebug > 0)
00859                //   Info("GetUnzipBuffer", "++++++++++++++++++++ CacheMISS Block wanted: %d  len:%d fNseek:%d", seekidx, len, fNseek);
00860             }
00861 
00862 
00863 
00864          } else {
00865             loc = -1;
00866             //fLastReadPos = 0;
00867             fIsTransferred = kFALSE;
00868          }
00869 
00870       } else {
00871          // We need to reset it for new transferences...
00872          //ResetCache();
00873          //TFileCacheRead::Prefetch(0,0);
00874       }
00875 
00876    } // scope of the lock!
00877 
00878    if (len > fCompBufferSize) {
00879       delete [] fCompBuffer;
00880       fCompBuffer = new char[len];
00881       fCompBufferSize = len;
00882    } else {
00883       if (fCompBufferSize > len*4) {
00884       delete [] fCompBuffer;
00885       fCompBuffer = new char[len*2];
00886       fCompBufferSize = len*2;
00887       }
00888    }
00889 
00890 
00891    {
00892       R__LOCKGUARD(fIOMutex);
00893       // Here we know that the async unzip of the wanted chunk
00894       // was not done for some reason. We continue.
00895 
00896       res = 0;
00897       if (!ReadBufferExt(fCompBuffer, pos, len, loc)) {
00898          //Info("GetUnzipBuffer", "++++++++++++++++++++ CacheMISS %d %d", loc, fNseek);
00899          fFile->Seek(pos);
00900          res = fFile->ReadBuffer(fCompBuffer, len);
00901       }
00902    
00903       
00904       if (res) res = -1;
00905       
00906    } // scope of the lock!
00907 
00908    if (!res) {
00909       res = UnzipBuffer(buf, fCompBuffer);
00910       *free = kTRUE;
00911    }
00912 
00913 
00914    if (!fIsLearning) {
00915       fNMissed++;
00916    }
00917 
00918    return res;
00919    
00920 }
00921 
00922 
00923 //_____________________________________________________________________________
00924 void TTreeCacheUnzip::SetUnzipRelBufferSize(Float_t relbufferSize)
00925 {
00926    // static function: Sets the unzip relatibe buffer size
00927    // FABRIZIO: PLEASE DOCUMENT and also in TTree::Set...
00928 
00929    fgRelBuffSize = relbufferSize;
00930 }
00931 
00932 
00933 //_____________________________________________________________________________
00934 void TTreeCacheUnzip::SetUnzipBufferSize(Long64_t bufferSize)
00935 {
00936    // Sets the size for the unzipping cache... by default it should be
00937    // two times the size of the prefetching cache
00938    R__LOCKGUARD(fMutexList);
00939    
00940    fUnzipBufferSize = bufferSize;
00941 }
00942 
00943 //_____________________________________________________________________________
00944 Int_t TTreeCacheUnzip::UnzipBuffer(char **dest, char *src)
00945 {
00946    // UNzips a ROOT specific buffer... by reading the header at the beginning.
00947    // returns the size of the inflated buffer or -1 if error
00948    // Note!! : If *dest == 0 we will allocate the buffer and it will be the
00949    // responsability of the caller to free it... it is useful for example
00950    // to pass it to the creator of TBuffer
00951    // src is the original buffer with the record (header+compressed data)
00952    // *dest is the inflated buffer (including the header)
00953    Int_t  uzlen = 0;
00954    Bool_t alloc = kFALSE;
00955 
00956    // Here we read the header of the buffer
00957    const Int_t hlen=128;
00958    Int_t nbytes=0, objlen=0, keylen=0;
00959    GetRecordHeader(src, hlen, nbytes, objlen, keylen);
00960 
00961    if (!(*dest)) {
00962       /* early consistency check */
00963       UChar_t *bufcur = (UChar_t *) (src + keylen);
00964       Int_t nin, nbuf;
00965       if(R__unzip_header(&nin, bufcur, &nbuf)!=0) {
00966          Error("UnzipBuffer", "Inconsistency found in header (nin=%d, nbuf=%d)", nin, nbuf);
00967          uzlen = -1;
00968          return uzlen;
00969       }
00970       Int_t l = keylen+objlen;
00971       *dest = new char[l];
00972       alloc = kTRUE;
00973    }
00974    // Must unzip the buffer
00975    // fSeekPos[ind]; adress of zipped buffer
00976    // fSeekLen[ind]; len of the zipped buffer
00977    // &fBuffer[fSeekPos[ind]]; memory address
00978 
00979    // This is similar to TBasket::ReadBasketBuffers
00980    Bool_t oldCase = objlen==nbytes-keylen
00981       && ((TBranch*)fBranches->UncheckedAt(0))->GetCompressionLevel()!=0
00982       && fFile->GetVersion()<=30401;
00983 
00984    if (objlen > nbytes-keylen || oldCase) {
00985 
00986       // Copy the key
00987       memcpy(*dest, src, keylen);
00988       uzlen += keylen;
00989 
00990       char *objbuf = *dest + keylen;
00991       UChar_t *bufcur = (UChar_t *) (src + keylen);
00992       Int_t nin, nout, nbuf;
00993       Int_t noutot = 0;
00994 
00995       while (1) {
00996          Int_t hc = R__unzip_header(&nin, bufcur, &nbuf);
00997          if (hc!=0) break;
00998          if (gDebug > 2)
00999             Info("UnzipBuffer", " nin:%d, nbuf:%d, bufcur[3] :%d, bufcur[4] :%d, bufcur[5] :%d ",
01000                  nin, nbuf, bufcur[3], bufcur[4], bufcur[5]);
01001          if (oldCase && (nin > objlen || nbuf > objlen)) {
01002             if (gDebug > 2)
01003                Info("UnzipBuffer", "oldcase objlen :%d ", objlen);
01004 
01005             //buffer was very likely not compressed in an old version
01006             memcpy( *dest + keylen, src + keylen, objlen);
01007             uzlen += objlen;
01008             return uzlen;
01009          }
01010 
01011          R__unzip(&nin, bufcur, &nbuf, objbuf, &nout);
01012 
01013 
01014 
01015          if (gDebug > 2)
01016             Info("UnzipBuffer", "R__unzip nin:%d, bufcur:%p, nbuf:%d, objbuf:%p, nout:%d",
01017                  nin, bufcur, nbuf, objbuf, nout);
01018 
01019          if (!nout) break;
01020          noutot += nout;
01021          if (noutot >= objlen) break;
01022          bufcur += nin;
01023          objbuf += nout;
01024       }
01025 
01026       if (noutot != objlen) {
01027          Error("UnzipBuffer", "nbytes = %d, keylen = %d, objlen = %d, noutot = %d, nout=%d, nin=%d, nbuf=%d",
01028                nbytes,keylen,objlen, noutot,nout,nin,nbuf);
01029          uzlen = -1;
01030          if(alloc) delete [] *dest;
01031          *dest = 0;
01032          return uzlen;
01033       }
01034       uzlen += objlen;
01035    } else {
01036       memcpy(*dest, src, keylen);
01037       uzlen += keylen;
01038       memcpy(*dest + keylen, src + keylen, objlen);
01039       uzlen += objlen;
01040    }
01041    return uzlen;
01042 }
01043 
01044 //_____________________________________________________________________________
01045 Int_t TTreeCacheUnzip::UnzipCache(Int_t &startindex, Int_t &locbuffsz, char *&locbuff)
01046 {
01047    // This inflates all the buffers in the cache.. passing the data to a new
01048    // buffer that will only wait there to be read...
01049    // We can not inflate all the buffers in the cache so we will try to do
01050    // it until the cache gets full... there is a member called fUnzipBufferSize which will
01051    // tell us the max size we can allocate for this cache.
01052    //
01053    // note that we will  unzip in the order they were put into the cache not
01054    // the order of the transference so it has to be read in that order or the
01055    // pre-unzipping will be useless.
01056    //
01057    // startindex is used as start index to check for blks to be unzipped
01058    //
01059    // returns 0 in normal conditions or -1 if error, 1 if it would like to sleep
01060    //
01061    // This func is supposed to compete among an indefinite number of threads to get a chunk to inflate
01062    // in order to accommodate multiple unzippers
01063    // Since everything is so async, we cannot use a fixed buffer, we are forced to keep
01064    // the individual chunks as separate blocks, whose summed size does not exceed the maximum
01065    // allowed. The pointers are kept globally in the array fUnzipChunks
01066    Int_t myCycle;
01067    const Int_t hlen=128;
01068    Int_t objlen=0, keylen=0;
01069    Int_t nbytes=0;
01070    Int_t readbuf = 0;
01071 
01072    Int_t idxtounzip = -1;
01073    Long64_t rdoffs = 0;
01074    Int_t rdlen = 0;
01075    {
01076       R__LOCKGUARD(fMutexList);
01077 
01078       if (!IsActiveThread() || !fNseek || fIsLearning || !fIsTransferred) {
01079          if (gDebug > 0)
01080             Info("UnzipCache", "Sudden Break!!! IsActiveThread(): %d, fNseek: %d, fIsLearning:%d",
01081                  IsActiveThread(), fNseek, fIsLearning);
01082          return 1;
01083       }
01084 
01085       // To synchronize with the 'paging'
01086       myCycle = fCycle;
01087 
01088       // Try to look for a blk to unzip
01089       idxtounzip = -1;
01090       rdoffs = 0;
01091       rdlen = 0;
01092       if (fTotalUnzipBytes < fUnzipBufferSize) {
01093  
01094 
01095          if (fBlocksToGo > 0) {
01096             for (Int_t ii=0; ii < fNseek; ii++) {
01097                Int_t reqi = (startindex+ii) % fNseek;
01098                if (!fUnzipStatus[reqi] && (fSeekLen[reqi] > 256)   ) {
01099                   // We found a chunk which is not unzipped nor pending
01100                   fUnzipStatus[reqi] = 1; // Set it as pending
01101                   idxtounzip = reqi;
01102 
01103                   rdoffs = fSeek[idxtounzip];
01104                   rdlen = fSeekLen[idxtounzip];
01105                   break;
01106                }
01107             }
01108             if (idxtounzip < 0) fBlocksToGo = 0;
01109          }
01110 
01111 
01112       }
01113 
01114    } // lock scope
01115 
01116 
01117 
01118    if (idxtounzip < 0) {
01119       if (gDebug > 0)
01120          Info("UnzipCache", "Nothing to do... startindex:%d fTotalUnzipBytes:%lld fUnzipBufferSize:%lld fNseek:%d",
01121               startindex, fTotalUnzipBytes, fUnzipBufferSize, fNseek );
01122       return 1;
01123    }
01124 
01125 
01126    // And here we have a new blk to unzip
01127    startindex = idxtounzip+THREADCNT;
01128 
01129 
01130    if (!IsActiveThread() || !fNseek || fIsLearning ) {
01131       if (gDebug > 0)
01132          Info("UnzipCache", "Sudden Break!!! IsActiveThread(): %d, fNseek: %d, fIsLearning:%d",
01133               IsActiveThread(), fNseek, fIsLearning);
01134       return 1;
01135    }
01136 
01137    Int_t loc = -1;
01138 
01139    // Prepare a static tmp buf of adequate size
01140    if(locbuffsz < rdlen) {
01141       if (locbuff) delete [] locbuff;
01142       locbuffsz = rdlen;
01143       locbuff = new char[locbuffsz];
01144       //memset(locbuff, 0, locbuffsz);
01145    } else
01146       if(locbuffsz > rdlen*3) {
01147          if (locbuff) delete [] locbuff;
01148          locbuffsz = rdlen*2;
01149          locbuff = new char[locbuffsz];
01150          //memset(locbuff, 0, locbuffsz);
01151       }
01152 
01153 
01154       if (gDebug > 0) 
01155       Info("UnzipCache", "Going to unzip block %d", idxtounzip);
01156 
01157       readbuf = ReadBufferExt(locbuff, rdoffs, rdlen, loc);
01158 
01159    {
01160       R__LOCKGUARD(fMutexList);
01161 
01162       if ( (myCycle != fCycle) || !fIsTransferred )  {
01163          if (gDebug > 0)
01164             Info("UnzipCache", "Sudden paging Break!!! IsActiveThread(): %d, fNseek: %d, fIsLearning:%d",
01165                  IsActiveThread(), fNseek, fIsLearning);
01166 
01167          fUnzipStatus[idxtounzip] = 2; // Set it as not done
01168          fUnzipChunks[idxtounzip] = 0;
01169          fUnzipLen[idxtounzip] = 0;
01170               fUnzipDoneCondition->Signal();
01171 
01172          startindex = 0;
01173          return 1;
01174       }
01175 
01176 
01177       if (readbuf <= 0) {
01178          fUnzipStatus[idxtounzip] = 2; // Set it as not done
01179          fUnzipChunks[idxtounzip] = 0;
01180          fUnzipLen[idxtounzip] = 0;
01181          if (gDebug > 0)
01182             Info("UnzipCache", "Block %d not done. rdoffs=%lld rdlen=%d readbuf=%d", idxtounzip, rdoffs, rdlen, readbuf);
01183          return -1;
01184       }
01185 
01186 
01187       GetRecordHeader(locbuff, hlen, nbytes, objlen, keylen);
01188 
01189       Int_t len = (objlen > nbytes-keylen)? keylen+objlen : nbytes;
01190 
01191       // If the single unzipped chunk is really too big, reset it to not processable
01192       // I.e. mark it as done but set the pointer to 0
01193       // This block will be unzipped synchronously in the main thread
01194       if (len > 4*fUnzipBufferSize) {
01195 
01196          //if (gDebug > 0)
01197             Info("UnzipCache", "Block %d is too big, skipping.", idxtounzip);
01198 
01199          fUnzipStatus[idxtounzip] = 2; // Set it as done
01200          fUnzipChunks[idxtounzip] = 0;
01201          fUnzipLen[idxtounzip] = 0;
01202 
01203          fUnzipDoneCondition->Signal();
01204          return 0;
01205       }
01206 
01207    } // Scope of the lock
01208 
01209    // Unzip it into a new blk
01210    char *ptr = 0;
01211    Int_t loclen = 0;
01212 
01213    loclen = UnzipBuffer(&ptr, locbuff);
01214 
01215    if ((loclen > 0) && (loclen == objlen+keylen)) {
01216       R__LOCKGUARD(fMutexList);
01217 
01218       if ( (myCycle != fCycle)  || !fIsTransferred) {
01219          if (gDebug > 0)
01220             Info("UnzipCache", "Sudden paging Break!!! IsActiveThread(): %d, fNseek: %d, fIsLearning:%d",
01221                  IsActiveThread(), fNseek, fIsLearning);
01222          delete [] ptr;
01223 
01224          fUnzipStatus[idxtounzip] = 2; // Set it as not done
01225          fUnzipChunks[idxtounzip] = 0;
01226          fUnzipLen[idxtounzip] = 0;
01227 
01228          startindex = 0;
01229               fUnzipDoneCondition->Signal();
01230          return 1;
01231       }
01232 
01233       fUnzipStatus[idxtounzip] = 2; // Set it as done
01234       fUnzipChunks[idxtounzip] = ptr;
01235       fUnzipLen[idxtounzip] = loclen;
01236       fTotalUnzipBytes += loclen;
01237 
01238       fActiveBlks.push(idxtounzip);
01239 
01240       if (gDebug > 0)
01241          Info("UnzipCache", "reqi:%d, rdoffs:%lld, rdlen: %d, loclen:%d",
01242               idxtounzip, rdoffs, rdlen, loclen);
01243       
01244       fNUnzip++;
01245    }
01246    else {
01247       R__LOCKGUARD(fMutexList);
01248       Info("argh", "loclen:%d objlen:%d loc:%d readbuf:%d", loclen, objlen, loc, readbuf);
01249       fUnzipStatus[idxtounzip] = 2; // Set it as done
01250       fUnzipChunks[idxtounzip] = 0;
01251       fUnzipLen[idxtounzip] = 0;
01252    }
01253 
01254 
01255    fUnzipDoneCondition->Signal();
01256 
01257    return 0;
01258 }
01259 
01260 void  TTreeCacheUnzip::Print(Option_t* option) const {
01261 
01262    printf("******TreeCacheUnzip statistics for file: %s ******\n",fFile->GetName());
01263    printf("Max allowed mem for pending buffers: %lld\n", fUnzipBufferSize);
01264    printf("Number of blocks unzipped by threads: %d\n", fNUnzip);
01265    printf("Number of hits: %d\n", fNFound);
01266    printf("Number of stalls: %d\n", fNStalls);
01267    printf("Number of misses: %d\n", fNMissed);
01268    
01269    TTreeCache::Print(option);
01270 }
01271 
01272 
01273 //_____________________________________________________________________________
01274 Int_t TTreeCacheUnzip::ReadBufferExt(char *buf, Long64_t pos, Int_t len, Int_t &loc) {
01275 
01276    R__LOCKGUARD(fIOMutex);
01277    return TTreeCache::ReadBufferExt(buf, pos, len, loc);
01278 
01279 }

Generated on Tue Jul 5 15:34:10 2011 for ROOT_528-00b_version by  doxygen 1.5.1