00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048 #include "TTreeCacheUnzip.h"
00049 #include "TChain.h"
00050 #include "TBranch.h"
00051 #include "TFile.h"
00052 #include "TEventList.h"
00053 #include "TVirtualMutex.h"
00054 #include "TThread.h"
00055 #include "TCondition.h"
00056 #include "TMath.h"
00057 #include "Bytes.h"
00058
00059 #include "TEnv.h"
00060
00061 #define THREADCNT 2
00062 extern "C" void R__unzip(Int_t *nin, UChar_t *bufin, Int_t *lout, char *bufout, Int_t *nout);
00063 extern "C" int R__unzip_header(Int_t *nin, UChar_t *bufin, Int_t *lout);
00064
00065 TTreeCacheUnzip::EParUnzipMode TTreeCacheUnzip::fgParallel = TTreeCacheUnzip::kDisable;
00066
00067
00068
00069
00070 Double_t TTreeCacheUnzip::fgRelBuffSize = .5;
00071
00072 ClassImp(TTreeCacheUnzip)
00073
00074
00075 TTreeCacheUnzip::TTreeCacheUnzip() : TTreeCache(),
00076
00077 fActiveThread(kFALSE),
00078 fAsyncReading(kFALSE),
00079 fCycle(0),
00080 fLastReadPos(0),
00081 fBlocksToGo(0),
00082 fUnzipLen(0),
00083 fUnzipChunks(0),
00084 fUnzipStatus(0),
00085 fTotalUnzipBytes(0),
00086 fNseekMax(0),
00087 fUnzipBufferSize(0),
00088 fNUnzip(0),
00089 fNFound(0),
00090 fNStalls(0),
00091 fNMissed(0)
00092
00093 {
00094
00095
00096 Init();
00097 }
00098
00099
00100 TTreeCacheUnzip::TTreeCacheUnzip(TTree *tree, Int_t buffersize) : TTreeCache(tree,buffersize),
00101 fActiveThread(kFALSE),
00102 fAsyncReading(kFALSE),
00103 fCycle(0),
00104 fLastReadPos(0),
00105 fBlocksToGo(0),
00106 fUnzipLen(0),
00107 fUnzipChunks(0),
00108 fUnzipStatus(0),
00109 fTotalUnzipBytes(0),
00110 fNseekMax(0),
00111 fUnzipBufferSize(0),
00112 fNUnzip(0),
00113 fNFound(0),
00114 fNStalls(0),
00115 fNMissed(0)
00116 {
00117
00118
00119 Init();
00120 }
00121
00122
00123 void TTreeCacheUnzip::Init()
00124 {
00125
00126
00127 fMutexList = new TMutex(kTRUE);
00128 fIOMutex = new TMutex(kTRUE);
00129
00130 fUnzipStartCondition = new TCondition(fMutexList);
00131 fUnzipDoneCondition = new TCondition(fMutexList);
00132
00133 fTotalUnzipBytes = 0;
00134
00135 fCompBuffer = new char[16384];
00136 fCompBufferSize = 16384;
00137
00138 if (fgParallel == kDisable) {
00139 fParallel = kFALSE;
00140 }
00141 else if(fgParallel == kEnable || fgParallel == kForce) {
00142 SysInfo_t info;
00143 gSystem->GetSysInfo(&info);
00144
00145 fUnzipBufferSize = Long64_t(fgRelBuffSize * GetBufferSize());
00146
00147 if(gDebug > 0)
00148 Info("TTreeCacheUnzip", "Enabling Parallel Unzipping");
00149
00150 fParallel = kTRUE;
00151
00152 for (Int_t i = 0; i < 10; i++) fUnzipThread[i] = 0;
00153
00154 StartThreadUnzip(THREADCNT);
00155
00156 }
00157 else {
00158 Warning("TTreeCacheUnzip", "Parallel Option unknown");
00159 }
00160
00161
00162 if (gEnv->GetValue("TFile.AsyncReading", 1)) {
00163 if (fFile && !(fFile->ReadBufferAsync(0, 0)))
00164 fAsyncReading = kTRUE;
00165 }
00166
00167 }
00168
00169
00170 TTreeCacheUnzip::~TTreeCacheUnzip()
00171 {
00172
00173
00174
00175 ResetCache();
00176
00177 if (IsActiveThread())
00178 StopThreadUnzip();
00179
00180
00181 delete [] fUnzipLen;
00182
00183 delete fUnzipStartCondition;
00184 delete fUnzipDoneCondition;
00185
00186
00187 delete fMutexList;
00188 delete fIOMutex;
00189
00190 delete [] fUnzipStatus;
00191 delete [] fUnzipChunks;
00192 }
00193
00194
00195 void TTreeCacheUnzip::AddBranch(TBranch *b, Bool_t subbranches )
00196 {
00197
00198
00199 R__LOCKGUARD(fMutexList);
00200
00201 TTreeCache::AddBranch(b, subbranches);
00202 }
00203
00204
00205 void TTreeCacheUnzip::AddBranch(const char *branch, Bool_t subbranches )
00206 {
00207
00208
00209 R__LOCKGUARD(fMutexList);
00210
00211 TTreeCache::AddBranch(branch, subbranches);
00212 }
00213
00214
00215 Bool_t TTreeCacheUnzip::FillBuffer()
00216 {
00217
00218 {
00219
00220 R__LOCKGUARD(fMutexList);
00221 fIsTransferred = kFALSE;
00222
00223 if (fNbranches <= 0) return kFALSE;
00224 TTree *tree = ((TBranch*)fBranches->UncheckedAt(0))->GetTree();
00225 Long64_t entry = tree->GetReadEntry();
00226
00227
00228
00229
00230
00231 if (fEntryCurrent <= entry && entry < fEntryNext) return kFALSE;
00232
00233
00234 if (entry == -1) entry=0;
00235
00236
00237
00238 Long64_t autoFlush = tree->GetAutoFlush();
00239 if (autoFlush > 0) {
00240
00241 Int_t averageEntrySize = tree->GetZipBytes()/tree->GetEntries();
00242 Int_t nauto = fBufferSizeMin/(averageEntrySize*autoFlush);
00243 if (nauto < 1) nauto = 1;
00244 fEntryNext = entry - entry%autoFlush + nauto*autoFlush;
00245 } else {
00246
00247 if (fZipBytes==0) {
00248 fEntryNext = entry + tree->GetEntries();;
00249 } else {
00250 fEntryNext = entry + tree->GetEntries()*fBufferSizeMin/fZipBytes;
00251 }
00252 }
00253 if (fEntryMax <= 0) fEntryMax = tree->GetEntries();
00254 if (fEntryNext > fEntryMax) fEntryNext = fEntryMax+1;
00255
00256
00257 fEntryCurrent = entry;
00258
00259
00260
00261
00262 TEventList *elist = fOwner->GetEventList();
00263 Long64_t chainOffset = 0;
00264 if (elist) {
00265 if (fOwner->IsA() ==TChain::Class()) {
00266 TChain *chain = (TChain*)fOwner;
00267 Int_t t = chain->GetTreeNumber();
00268 chainOffset = chain->GetTreeOffset()[t];
00269 }
00270 }
00271
00272
00273 TFileCacheRead::Prefetch(0,0);
00274
00275
00276 for (Int_t i=0;i<fNbranches;i++) {
00277 TBranch *b = (TBranch*)fBranches->UncheckedAt(i);
00278 if (b->GetDirectory()==0) continue;
00279 if (b->GetDirectory()->GetFile() != fFile) continue;
00280 Int_t nb = b->GetMaxBaskets();
00281 Int_t *lbaskets = b->GetBasketBytes();
00282 Long64_t *entries = b->GetBasketEntry();
00283 if (!lbaskets || !entries) continue;
00284
00285
00286 Int_t blistsize = b->GetListOfBaskets()->GetSize();
00287 for (Int_t j=0;j<nb;j++) {
00288
00289 if (j<blistsize && b->GetListOfBaskets()->UncheckedAt(j)) continue;
00290
00291 Long64_t pos = b->GetBasketSeek(j);
00292 Int_t len = lbaskets[j];
00293 if (pos <= 0 || len <= 0) continue;
00294
00295 if (entries[j] >= fEntryNext) continue;
00296 if (entries[j] < entry && (j<nb-1 && entries[j+1] <= entry)) continue;
00297 if (elist) {
00298 Long64_t emax = fEntryMax;
00299 if (j<nb-1) emax = entries[j+1]-1;
00300 if (!elist->ContainsRange(entries[j]+chainOffset,emax+chainOffset)) continue;
00301 }
00302 fNReadPref++;
00303
00304 TFileCacheRead::Prefetch(pos,len);
00305 }
00306 if (gDebug > 0) printf("Entry: %lld, registering baskets branch %s, fEntryNext=%lld, fNseek=%d, fNtot=%d\n",entry,((TBranch*)fBranches->UncheckedAt(i))->GetName(),fEntryNext,fNseek,fNtot);
00307 }
00308
00309
00310
00311 ResetCache();
00312
00313 fIsLearning = kFALSE;
00314
00315 }
00316
00317 return kTRUE;
00318 }
00319
00320
00321 void TTreeCacheUnzip::SetEntryRange(Long64_t emin, Long64_t emax)
00322 {
00323
00324
00325
00326 R__LOCKGUARD(fMutexList);
00327
00328 TTreeCache::SetEntryRange(emin, emax);
00329 }
00330
00331
00332 void TTreeCacheUnzip::StopLearningPhase()
00333 {
00334
00335
00336 R__LOCKGUARD(fMutexList);
00337
00338
00339 TTreeCache::StopLearningPhase();
00340
00341 }
00342
00343
00344 void TTreeCacheUnzip::UpdateBranches(TTree *tree, Bool_t owner)
00345 {
00346
00347 R__LOCKGUARD(fMutexList);
00348
00349 TTreeCache::UpdateBranches(tree, owner);
00350 }
00351
00352
00353
00354
00355
00356
00357
00358
00359 TTreeCacheUnzip::EParUnzipMode TTreeCacheUnzip::GetParallelUnzip()
00360 {
00361
00362
00363
00364 return fgParallel;
00365 }
00366
00367
00368 Bool_t TTreeCacheUnzip::IsParallelUnzip()
00369 {
00370
00371
00372
00373 if (fgParallel == kEnable || fgParallel == kForce)
00374 return kTRUE;
00375
00376 return kFALSE;
00377 }
00378
00379
00380 Bool_t TTreeCacheUnzip::IsActiveThread()
00381 {
00382
00383
00384
00385 R__LOCKGUARD(fMutexList);
00386
00387 return fActiveThread;
00388 }
00389
00390
00391 Bool_t TTreeCacheUnzip::IsQueueEmpty()
00392 {
00393
00394
00395 R__LOCKGUARD(fMutexList);
00396
00397 if ( fIsLearning )
00398 return kTRUE;
00399
00400 return kFALSE;
00401 }
00402
00403 void TTreeCacheUnzip::WaitUnzipStartSignal()
00404 {
00405
00406
00407 fUnzipStartCondition->TimedWaitRelative(2000);
00408
00409 }
00410
00411 void TTreeCacheUnzip::SendUnzipStartSignal(Bool_t broadcast)
00412 {
00413
00414
00415
00416 if (gDebug > 0) Info("SendSignal", " fUnzipCondition->Signal()");
00417
00418 if (broadcast)
00419 fUnzipStartCondition->Broadcast();
00420 else
00421 fUnzipStartCondition->Signal();
00422 }
00423
00424
00425 Int_t TTreeCacheUnzip::SetParallelUnzip(TTreeCacheUnzip::EParUnzipMode option)
00426 {
00427
00428
00429
00430
00431
00432
00433
00434
00435
00436 if(fgParallel == kEnable || fgParallel == kForce || fgParallel == kDisable) {
00437 fgParallel = option;
00438 return 1;
00439 }
00440 return 0;
00441 }
00442
00443
00444 struct TTreeCacheUnzipData {
00445 TTreeCacheUnzip *inst;
00446 Int_t cnt;
00447 };
00448
00449
00450 Int_t TTreeCacheUnzip::StartThreadUnzip(Int_t nthreads)
00451 {
00452
00453
00454
00455
00456 Int_t nt = nthreads;
00457 if (nt > 10) nt = 10;
00458
00459 if (gDebug > 0)
00460 Info("StartThreadUnzip", "Going to start %d threads.", nt);
00461
00462 for (Int_t i = 0; i < nt; i++) {
00463 if (!fUnzipThread[i]) {
00464 TString nm("UnzipLoop");
00465 nm += i;
00466
00467 if (gDebug > 0)
00468 Info("StartThreadUnzip", "Going to start thread '%s'", nm.Data());
00469
00470 TTreeCacheUnzipData *d = new TTreeCacheUnzipData;
00471 d->inst = this;
00472 d->cnt = i;
00473
00474 fUnzipThread[i] = new TThread(nm.Data(), UnzipLoop, (void*)d);
00475 if (!fUnzipThread[i])
00476 Error("TTreeCacheUnzip::StartThreadUnzip", " Unable to create new thread.");
00477
00478 fUnzipThread[i]->Run();
00479
00480
00481 fActiveThread=kTRUE;
00482
00483 }
00484 }
00485
00486 return (fActiveThread == kTRUE);
00487 }
00488
00489
00490 Int_t TTreeCacheUnzip::StopThreadUnzip()
00491 {
00492
00493
00494
00495
00496
00497 fActiveThread = kFALSE;
00498
00499 for (Int_t i = 0; i < 1; i++) {
00500 if(fUnzipThread[i]){
00501
00502 SendUnzipStartSignal(kTRUE);
00503
00504 if (fUnzipThread[i]->Exists()) {
00505 fUnzipThread[i]->Join();
00506 delete fUnzipThread[i];
00507 }
00508 }
00509
00510 }
00511
00512 return 1;
00513 }
00514
00515
00516
00517
00518 void* TTreeCacheUnzip::UnzipLoop(void *arg)
00519 {
00520
00521
00522
00523
00524
00525 TTreeCacheUnzipData *d = (TTreeCacheUnzipData *)arg;
00526 TTreeCacheUnzip *unzipMng = d->inst;
00527
00528 TThread::SetCancelOn();
00529 TThread::SetCancelDeferred();
00530
00531 Int_t thrnum = d->cnt;
00532 Int_t startindex = thrnum;
00533 Int_t locbuffsz = 16384;
00534 char *locbuff = new char[16384];
00535 Int_t res = 0;
00536 Int_t myCycle = 0;
00537
00538 while( unzipMng->IsActiveThread() ) {
00539 res = 1;
00540
00541 {
00542 R__LOCKGUARD(unzipMng->fMutexList);
00543 if (myCycle != unzipMng->fCycle) startindex = thrnum;
00544 myCycle = unzipMng->fCycle;
00545 if (unzipMng->fNseek) startindex = startindex % unzipMng->fNseek;
00546 else startindex = -1;
00547 }
00548
00549
00550 if (startindex >= 0)
00551 res = unzipMng->UnzipCache(startindex, locbuffsz, locbuff);
00552
00553 {
00554 R__LOCKGUARD(unzipMng->fMutexList);
00555
00556 if(!unzipMng->IsActiveThread()) break;
00557
00558 if ((res == 1) || (!unzipMng->fIsTransferred)) {
00559 unzipMng->WaitUnzipStartSignal();
00560 startindex = unzipMng->fLastReadPos+3+thrnum;
00561 }
00562 }
00563
00564
00565 }
00566
00567 delete d;
00568 delete [] locbuff;
00569 return (void *)0;
00570 }
00571
00572
00573
00574
00575
00576
00577
00578
00579 Int_t TTreeCacheUnzip::GetRecordHeader(char *buf, Int_t maxbytes, Int_t &nbytes, Int_t &objlen, Int_t &keylen)
00580 {
00581
00582
00583
00584
00585
00586
00587
00588
00589
00590
00591
00592
00593
00594
00595 Version_t versionkey;
00596 Short_t klen;
00597 UInt_t datime;
00598 Int_t nb = 0,olen;
00599 Int_t nread = maxbytes;
00600 frombuf(buf,&nb);
00601 nbytes = nb;
00602 if (nb < 0) return nread;
00603
00604 const Int_t headerSize = 16;
00605 if (nread < headerSize) return nread;
00606 frombuf(buf, &versionkey);
00607 frombuf(buf, &olen);
00608 frombuf(buf, &datime);
00609 frombuf(buf, &klen);
00610 if (!olen) olen = nbytes-klen;
00611 objlen = olen;
00612 keylen = klen;
00613 return nread;
00614 }
00615
00616
00617 void TTreeCacheUnzip::ResetCache()
00618 {
00619
00620
00621
00622
00623
00624
00625
00626
00627 {
00628 R__LOCKGUARD(fMutexList);
00629
00630 if (gDebug > 0)
00631 Info("ResetCache", "Thread: %ld -- Resetting the cache. fNseek:%d fNSeekMax:%d fTotalUnzipBytes:%lld", TThread::SelfId(), fNseek, fNseekMax, fTotalUnzipBytes);
00632
00633
00634 fCycle++;
00635 for (Int_t i = 0; i < fNseekMax; i++) {
00636 if (fUnzipLen) fUnzipLen[i] = 0;
00637 if (fUnzipChunks) {
00638 if (fUnzipChunks[i]) delete [] fUnzipChunks[i];
00639 fUnzipChunks[i] = 0;
00640 }
00641 if (fUnzipStatus) fUnzipStatus[i] = 0;
00642
00643 }
00644
00645 while (fActiveBlks.size()) fActiveBlks.pop();
00646
00647 if(fNseekMax < fNseek){
00648 if (gDebug > 0)
00649 Info("ResetCache", "Changing fNseekMax from:%d to:%d", fNseekMax, fNseek);
00650
00651 Byte_t *aUnzipStatus = new Byte_t[fNseek];
00652 memset(aUnzipStatus, 0, fNseek*sizeof(Byte_t));
00653
00654 Int_t *aUnzipLen = new Int_t[fNseek];
00655 memset(aUnzipLen, 0, fNseek*sizeof(Int_t));
00656
00657 char **aUnzipChunks = new char *[fNseek];
00658 memset(aUnzipChunks, 0, fNseek*sizeof(char *));
00659
00660 if (fUnzipStatus) delete [] fUnzipStatus;
00661 if (fUnzipLen) delete [] fUnzipLen;
00662 if (fUnzipChunks) delete [] fUnzipChunks;
00663
00664 fUnzipStatus = aUnzipStatus;
00665 fUnzipLen = aUnzipLen;
00666 fUnzipChunks = aUnzipChunks;
00667
00668 fNseekMax = fNseek;
00669 }
00670
00671
00672
00673 fLastReadPos = 0;
00674 fTotalUnzipBytes = 0;
00675 fBlocksToGo = fNseek;
00676 }
00677
00678
00679 SendUnzipStartSignal(kTRUE);
00680
00681
00682 }
00683
00684
00685 Int_t TTreeCacheUnzip::GetUnzipBuffer(char **buf, Long64_t pos, Int_t len, Bool_t *free)
00686 {
00687
00688
00689
00690
00691
00692
00693
00694
00695 Int_t res = 0;
00696 Int_t loc = -1;
00697
00698 {
00699 R__LOCKGUARD(fMutexList);
00700
00701
00702
00703
00704
00705
00706
00707
00708
00709
00710
00711
00712 Int_t myCycle = fCycle;
00713
00714
00715 if (fParallel && !fIsLearning) {
00716
00717
00718 if(fNseekMax < fNseek){
00719 if (gDebug > 0)
00720 Info("GetUnzipBuffer", "Changing fNseekMax from:%d to:%d", fNseekMax, fNseek);
00721
00722 Byte_t *aUnzipStatus = new Byte_t[fNseek];
00723 memset(aUnzipStatus, 0, fNseek*sizeof(Byte_t));
00724
00725 Int_t *aUnzipLen = new Int_t[fNseek];
00726 memset(aUnzipLen, 0, fNseek*sizeof(Int_t));
00727
00728 char **aUnzipChunks = new char *[fNseek];
00729 memset(aUnzipChunks, 0, fNseek*sizeof(char *));
00730
00731 for (Int_t i = 0; i < fNseekMax; i++) {
00732 aUnzipStatus[i] = fUnzipStatus[i];
00733 aUnzipLen[i] = fUnzipLen[i];
00734 aUnzipChunks[i] = fUnzipChunks[i];
00735 }
00736
00737 if (fUnzipStatus) delete [] fUnzipStatus;
00738 if (fUnzipLen) delete [] fUnzipLen;
00739 if (fUnzipChunks) delete [] fUnzipChunks;
00740
00741 fUnzipStatus = aUnzipStatus;
00742 fUnzipLen = aUnzipLen;
00743 fUnzipChunks = aUnzipChunks;
00744
00745 fNseekMax = fNseek;
00746 }
00747
00748
00749
00750
00751
00752
00753
00754 loc = (Int_t)TMath::BinarySearch(fNseek,fSeekSort,pos);
00755 if ( (fCycle == myCycle) && (loc >= 0) && (loc < fNseek) && (pos == fSeekSort[loc]) ) {
00756
00757
00758
00759 Int_t seekidx = fSeekIndex[loc];
00760
00761 fLastReadPos = seekidx;
00762
00763 do {
00764
00765
00766
00767
00768
00769
00770
00771 if ((fUnzipStatus[seekidx] == 2) && (fUnzipChunks[seekidx]) && (fUnzipLen[seekidx] > 0)) {
00772
00773
00774
00775
00776 if(!(*buf)) {
00777 *buf = fUnzipChunks[seekidx];
00778 fUnzipChunks[seekidx] = 0;
00779 fTotalUnzipBytes -= fUnzipLen[seekidx];
00780 SendUnzipStartSignal(kFALSE);
00781 *free = kTRUE;
00782 }
00783 else {
00784 memcpy(*buf, fUnzipChunks[seekidx], fUnzipLen[seekidx]);
00785 delete fUnzipChunks[seekidx];
00786 fTotalUnzipBytes -= fUnzipLen[seekidx];
00787 fUnzipChunks[seekidx] = 0;
00788 SendUnzipStartSignal(kFALSE);
00789 *free = kFALSE;
00790 }
00791
00792 fNFound++;
00793
00794 return fUnzipLen[seekidx];
00795 }
00796
00797
00798
00799 if ( fUnzipStatus[seekidx] == 1 ) {
00800
00801 fUnzipDoneCondition->TimedWaitRelative(200);
00802
00803
00804 if ( myCycle != fCycle ) {
00805 if (gDebug > 0)
00806 Info("GetUnzipBuffer", "Sudden paging Break!!! IsActiveThread(): %d, fNseek: %d, fIsLearning:%d",
00807 IsActiveThread(), fNseek, fIsLearning);
00808
00809 fLastReadPos = 0;
00810
00811 seekidx = -1;
00812 break;
00813 }
00814
00815 }
00816
00817 } while ( fUnzipStatus[seekidx] == 1 );
00818
00819
00820
00821
00822
00823 if ( (seekidx >= 0) && (fUnzipStatus[seekidx] == 2) && (fUnzipChunks[seekidx]) && (fUnzipLen[seekidx] > 0) ) {
00824
00825
00826
00827
00828 if(!(*buf)) {
00829 *buf = fUnzipChunks[seekidx];
00830 fUnzipChunks[seekidx] = 0;
00831 fTotalUnzipBytes -= fUnzipLen[seekidx];
00832 SendUnzipStartSignal(kFALSE);
00833 *free = kTRUE;
00834 }
00835 else {
00836 memcpy(*buf, fUnzipChunks[seekidx], fUnzipLen[seekidx]);
00837 delete fUnzipChunks[seekidx];
00838 fTotalUnzipBytes -= fUnzipLen[seekidx];
00839 fUnzipChunks[seekidx] = 0;
00840 SendUnzipStartSignal(kFALSE);
00841 *free = kFALSE;
00842 }
00843
00844
00845 fNStalls++;
00846
00847 return fUnzipLen[seekidx];
00848 }
00849 else {
00850
00851
00852 fUnzipStatus[seekidx] = 2;
00853 fUnzipChunks[seekidx] = 0;
00854
00855 if ((fTotalUnzipBytes < fUnzipBufferSize) && fBlocksToGo)
00856 SendUnzipStartSignal(kFALSE);
00857
00858
00859
00860 }
00861
00862
00863
00864 } else {
00865 loc = -1;
00866
00867 fIsTransferred = kFALSE;
00868 }
00869
00870 } else {
00871
00872
00873
00874 }
00875
00876 }
00877
00878 if (len > fCompBufferSize) {
00879 delete [] fCompBuffer;
00880 fCompBuffer = new char[len];
00881 fCompBufferSize = len;
00882 } else {
00883 if (fCompBufferSize > len*4) {
00884 delete [] fCompBuffer;
00885 fCompBuffer = new char[len*2];
00886 fCompBufferSize = len*2;
00887 }
00888 }
00889
00890
00891 {
00892 R__LOCKGUARD(fIOMutex);
00893
00894
00895
00896 res = 0;
00897 if (!ReadBufferExt(fCompBuffer, pos, len, loc)) {
00898
00899 fFile->Seek(pos);
00900 res = fFile->ReadBuffer(fCompBuffer, len);
00901 }
00902
00903
00904 if (res) res = -1;
00905
00906 }
00907
00908 if (!res) {
00909 res = UnzipBuffer(buf, fCompBuffer);
00910 *free = kTRUE;
00911 }
00912
00913
00914 if (!fIsLearning) {
00915 fNMissed++;
00916 }
00917
00918 return res;
00919
00920 }
00921
00922
00923
00924 void TTreeCacheUnzip::SetUnzipRelBufferSize(Float_t relbufferSize)
00925 {
00926
00927
00928
00929 fgRelBuffSize = relbufferSize;
00930 }
00931
00932
00933
00934 void TTreeCacheUnzip::SetUnzipBufferSize(Long64_t bufferSize)
00935 {
00936
00937
00938 R__LOCKGUARD(fMutexList);
00939
00940 fUnzipBufferSize = bufferSize;
00941 }
00942
00943
00944 Int_t TTreeCacheUnzip::UnzipBuffer(char **dest, char *src)
00945 {
00946
00947
00948
00949
00950
00951
00952
00953 Int_t uzlen = 0;
00954 Bool_t alloc = kFALSE;
00955
00956
00957 const Int_t hlen=128;
00958 Int_t nbytes=0, objlen=0, keylen=0;
00959 GetRecordHeader(src, hlen, nbytes, objlen, keylen);
00960
00961 if (!(*dest)) {
00962
00963 UChar_t *bufcur = (UChar_t *) (src + keylen);
00964 Int_t nin, nbuf;
00965 if(R__unzip_header(&nin, bufcur, &nbuf)!=0) {
00966 Error("UnzipBuffer", "Inconsistency found in header (nin=%d, nbuf=%d)", nin, nbuf);
00967 uzlen = -1;
00968 return uzlen;
00969 }
00970 Int_t l = keylen+objlen;
00971 *dest = new char[l];
00972 alloc = kTRUE;
00973 }
00974
00975
00976
00977
00978
00979
00980 Bool_t oldCase = objlen==nbytes-keylen
00981 && ((TBranch*)fBranches->UncheckedAt(0))->GetCompressionLevel()!=0
00982 && fFile->GetVersion()<=30401;
00983
00984 if (objlen > nbytes-keylen || oldCase) {
00985
00986
00987 memcpy(*dest, src, keylen);
00988 uzlen += keylen;
00989
00990 char *objbuf = *dest + keylen;
00991 UChar_t *bufcur = (UChar_t *) (src + keylen);
00992 Int_t nin, nout, nbuf;
00993 Int_t noutot = 0;
00994
00995 while (1) {
00996 Int_t hc = R__unzip_header(&nin, bufcur, &nbuf);
00997 if (hc!=0) break;
00998 if (gDebug > 2)
00999 Info("UnzipBuffer", " nin:%d, nbuf:%d, bufcur[3] :%d, bufcur[4] :%d, bufcur[5] :%d ",
01000 nin, nbuf, bufcur[3], bufcur[4], bufcur[5]);
01001 if (oldCase && (nin > objlen || nbuf > objlen)) {
01002 if (gDebug > 2)
01003 Info("UnzipBuffer", "oldcase objlen :%d ", objlen);
01004
01005
01006 memcpy( *dest + keylen, src + keylen, objlen);
01007 uzlen += objlen;
01008 return uzlen;
01009 }
01010
01011 R__unzip(&nin, bufcur, &nbuf, objbuf, &nout);
01012
01013
01014
01015 if (gDebug > 2)
01016 Info("UnzipBuffer", "R__unzip nin:%d, bufcur:%p, nbuf:%d, objbuf:%p, nout:%d",
01017 nin, bufcur, nbuf, objbuf, nout);
01018
01019 if (!nout) break;
01020 noutot += nout;
01021 if (noutot >= objlen) break;
01022 bufcur += nin;
01023 objbuf += nout;
01024 }
01025
01026 if (noutot != objlen) {
01027 Error("UnzipBuffer", "nbytes = %d, keylen = %d, objlen = %d, noutot = %d, nout=%d, nin=%d, nbuf=%d",
01028 nbytes,keylen,objlen, noutot,nout,nin,nbuf);
01029 uzlen = -1;
01030 if(alloc) delete [] *dest;
01031 *dest = 0;
01032 return uzlen;
01033 }
01034 uzlen += objlen;
01035 } else {
01036 memcpy(*dest, src, keylen);
01037 uzlen += keylen;
01038 memcpy(*dest + keylen, src + keylen, objlen);
01039 uzlen += objlen;
01040 }
01041 return uzlen;
01042 }
01043
01044
01045 Int_t TTreeCacheUnzip::UnzipCache(Int_t &startindex, Int_t &locbuffsz, char *&locbuff)
01046 {
01047
01048
01049
01050
01051
01052
01053
01054
01055
01056
01057
01058
01059
01060
01061
01062
01063
01064
01065
01066 Int_t myCycle;
01067 const Int_t hlen=128;
01068 Int_t objlen=0, keylen=0;
01069 Int_t nbytes=0;
01070 Int_t readbuf = 0;
01071
01072 Int_t idxtounzip = -1;
01073 Long64_t rdoffs = 0;
01074 Int_t rdlen = 0;
01075 {
01076 R__LOCKGUARD(fMutexList);
01077
01078 if (!IsActiveThread() || !fNseek || fIsLearning || !fIsTransferred) {
01079 if (gDebug > 0)
01080 Info("UnzipCache", "Sudden Break!!! IsActiveThread(): %d, fNseek: %d, fIsLearning:%d",
01081 IsActiveThread(), fNseek, fIsLearning);
01082 return 1;
01083 }
01084
01085
01086 myCycle = fCycle;
01087
01088
01089 idxtounzip = -1;
01090 rdoffs = 0;
01091 rdlen = 0;
01092 if (fTotalUnzipBytes < fUnzipBufferSize) {
01093
01094
01095 if (fBlocksToGo > 0) {
01096 for (Int_t ii=0; ii < fNseek; ii++) {
01097 Int_t reqi = (startindex+ii) % fNseek;
01098 if (!fUnzipStatus[reqi] && (fSeekLen[reqi] > 256) ) {
01099
01100 fUnzipStatus[reqi] = 1;
01101 idxtounzip = reqi;
01102
01103 rdoffs = fSeek[idxtounzip];
01104 rdlen = fSeekLen[idxtounzip];
01105 break;
01106 }
01107 }
01108 if (idxtounzip < 0) fBlocksToGo = 0;
01109 }
01110
01111
01112 }
01113
01114 }
01115
01116
01117
01118 if (idxtounzip < 0) {
01119 if (gDebug > 0)
01120 Info("UnzipCache", "Nothing to do... startindex:%d fTotalUnzipBytes:%lld fUnzipBufferSize:%lld fNseek:%d",
01121 startindex, fTotalUnzipBytes, fUnzipBufferSize, fNseek );
01122 return 1;
01123 }
01124
01125
01126
01127 startindex = idxtounzip+THREADCNT;
01128
01129
01130 if (!IsActiveThread() || !fNseek || fIsLearning ) {
01131 if (gDebug > 0)
01132 Info("UnzipCache", "Sudden Break!!! IsActiveThread(): %d, fNseek: %d, fIsLearning:%d",
01133 IsActiveThread(), fNseek, fIsLearning);
01134 return 1;
01135 }
01136
01137 Int_t loc = -1;
01138
01139
01140 if(locbuffsz < rdlen) {
01141 if (locbuff) delete [] locbuff;
01142 locbuffsz = rdlen;
01143 locbuff = new char[locbuffsz];
01144
01145 } else
01146 if(locbuffsz > rdlen*3) {
01147 if (locbuff) delete [] locbuff;
01148 locbuffsz = rdlen*2;
01149 locbuff = new char[locbuffsz];
01150
01151 }
01152
01153
01154 if (gDebug > 0)
01155 Info("UnzipCache", "Going to unzip block %d", idxtounzip);
01156
01157 readbuf = ReadBufferExt(locbuff, rdoffs, rdlen, loc);
01158
01159 {
01160 R__LOCKGUARD(fMutexList);
01161
01162 if ( (myCycle != fCycle) || !fIsTransferred ) {
01163 if (gDebug > 0)
01164 Info("UnzipCache", "Sudden paging Break!!! IsActiveThread(): %d, fNseek: %d, fIsLearning:%d",
01165 IsActiveThread(), fNseek, fIsLearning);
01166
01167 fUnzipStatus[idxtounzip] = 2;
01168 fUnzipChunks[idxtounzip] = 0;
01169 fUnzipLen[idxtounzip] = 0;
01170 fUnzipDoneCondition->Signal();
01171
01172 startindex = 0;
01173 return 1;
01174 }
01175
01176
01177 if (readbuf <= 0) {
01178 fUnzipStatus[idxtounzip] = 2;
01179 fUnzipChunks[idxtounzip] = 0;
01180 fUnzipLen[idxtounzip] = 0;
01181 if (gDebug > 0)
01182 Info("UnzipCache", "Block %d not done. rdoffs=%lld rdlen=%d readbuf=%d", idxtounzip, rdoffs, rdlen, readbuf);
01183 return -1;
01184 }
01185
01186
01187 GetRecordHeader(locbuff, hlen, nbytes, objlen, keylen);
01188
01189 Int_t len = (objlen > nbytes-keylen)? keylen+objlen : nbytes;
01190
01191
01192
01193
01194 if (len > 4*fUnzipBufferSize) {
01195
01196
01197 Info("UnzipCache", "Block %d is too big, skipping.", idxtounzip);
01198
01199 fUnzipStatus[idxtounzip] = 2;
01200 fUnzipChunks[idxtounzip] = 0;
01201 fUnzipLen[idxtounzip] = 0;
01202
01203 fUnzipDoneCondition->Signal();
01204 return 0;
01205 }
01206
01207 }
01208
01209
01210 char *ptr = 0;
01211 Int_t loclen = 0;
01212
01213 loclen = UnzipBuffer(&ptr, locbuff);
01214
01215 if ((loclen > 0) && (loclen == objlen+keylen)) {
01216 R__LOCKGUARD(fMutexList);
01217
01218 if ( (myCycle != fCycle) || !fIsTransferred) {
01219 if (gDebug > 0)
01220 Info("UnzipCache", "Sudden paging Break!!! IsActiveThread(): %d, fNseek: %d, fIsLearning:%d",
01221 IsActiveThread(), fNseek, fIsLearning);
01222 delete [] ptr;
01223
01224 fUnzipStatus[idxtounzip] = 2;
01225 fUnzipChunks[idxtounzip] = 0;
01226 fUnzipLen[idxtounzip] = 0;
01227
01228 startindex = 0;
01229 fUnzipDoneCondition->Signal();
01230 return 1;
01231 }
01232
01233 fUnzipStatus[idxtounzip] = 2;
01234 fUnzipChunks[idxtounzip] = ptr;
01235 fUnzipLen[idxtounzip] = loclen;
01236 fTotalUnzipBytes += loclen;
01237
01238 fActiveBlks.push(idxtounzip);
01239
01240 if (gDebug > 0)
01241 Info("UnzipCache", "reqi:%d, rdoffs:%lld, rdlen: %d, loclen:%d",
01242 idxtounzip, rdoffs, rdlen, loclen);
01243
01244 fNUnzip++;
01245 }
01246 else {
01247 R__LOCKGUARD(fMutexList);
01248 Info("argh", "loclen:%d objlen:%d loc:%d readbuf:%d", loclen, objlen, loc, readbuf);
01249 fUnzipStatus[idxtounzip] = 2;
01250 fUnzipChunks[idxtounzip] = 0;
01251 fUnzipLen[idxtounzip] = 0;
01252 }
01253
01254
01255 fUnzipDoneCondition->Signal();
01256
01257 return 0;
01258 }
01259
01260 void TTreeCacheUnzip::Print(Option_t* option) const {
01261
01262 printf("******TreeCacheUnzip statistics for file: %s ******\n",fFile->GetName());
01263 printf("Max allowed mem for pending buffers: %lld\n", fUnzipBufferSize);
01264 printf("Number of blocks unzipped by threads: %d\n", fNUnzip);
01265 printf("Number of hits: %d\n", fNFound);
01266 printf("Number of stalls: %d\n", fNStalls);
01267 printf("Number of misses: %d\n", fNMissed);
01268
01269 TTreeCache::Print(option);
01270 }
01271
01272
01273
01274 Int_t TTreeCacheUnzip::ReadBufferExt(char *buf, Long64_t pos, Int_t len, Int_t &loc) {
01275
01276 R__LOCKGUARD(fIOMutex);
01277 return TTreeCache::ReadBufferExt(buf, pos, len, loc);
01278
01279 }