00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
// Version-control identification string for this source file (an expanded
// "$Id$" keyword holding file name, revision, date and committer).
00013 const char *XrdClientReadCacheCVSID = "$Id: XrdClientReadCache.cc 31508 2009-12-02 19:11:01Z brun $";
00014
00015 #include "XrdClient/XrdClientReadCache.hh"
00016 #include "XrdSys/XrdSysPthread.hh"
00017 #include "XrdClient/XrdClientDebug.hh"
00018 #include "XrdClient/XrdClientEnv.hh"
00019
00020
00021
00022 XrdClientReadCacheItem::XrdClientReadCacheItem(const void *buffer, long long begin_offs,
00023 long long end_offs, long long ticksnow, bool placeholder)
00024 {
00025
00026 fIsPlaceholder = placeholder;
00027
00028 fData = (void *)0;
00029 if (!fIsPlaceholder)
00030 fData = (void *)buffer;
00031
00032 Touch(ticksnow);
00033 fBeginOffset = begin_offs;
00034 fEndOffset = end_offs;
00035 Pinned = false;
00036 }
00037
00038
00039 XrdClientReadCacheItem::~XrdClientReadCacheItem()
00040 {
00041
00042
00043 if (fData)
00044 free(fData);
00045 }
00046
00047
00048
00049
00050
00051
00052 long long XrdClientReadCache::GetTimestampTick()
00053 {
00054
00055
00056
00057 XrdSysMutexHelper mtx(fMutex);
00058 return ++fTimestampTickCounter;
00059 }
00060
00061
00062 XrdClientReadCache::XrdClientReadCache() : fItems(4096)
00063 {
00064
00065
00066 fTimestampTickCounter = 0;
00067 fTotalByteCount = 0;
00068
00069 fMissRate = 0.0;
00070 fMissCount = 0;
00071 fReadsCounter = 0;
00072
00073 fBytesSubmitted = 0;
00074 fBytesHit = 0;
00075 fBytesUsefulness = 0.0;
00076
00077 fMaxCacheSize = EnvGetLong(NAME_READCACHESIZE);
00078 fBlkRemPolicy = EnvGetLong(NAME_READCACHEBLKREMPOLICY);
00079 }
00080
00081
00082 XrdClientReadCache::~XrdClientReadCache()
00083 {
00084
00085 RemoveItems(false);
00086
00087 }
00088
00089
00090
00091
00092 bool XrdClientReadCache::SubmitRawData(const void *buffer, long long begin_offs,
00093 long long end_offs, bool pinned)
00094 {
00095 if (!buffer) return true;
00096 XrdClientReadCacheItem *itm;
00097
00098 Info(XrdClientDebug::kHIDEBUG, "Cache",
00099 "Submitting " << begin_offs << "->" << end_offs << " to cache" << (pinned ? " as pinned data." : ".") );
00100
00101
00102 XrdSysMutexHelper mtx(fMutex);
00103
00104
00105
00106
00107
00108 RemoveItems(begin_offs, end_offs);
00109 bool spaceok = MakeFreeSpace(end_offs - begin_offs + 1);
00110
00111 if (pinned || spaceok) {
00112
00113
00114
00115
00116
00117
00118
00119 int pos = FindInsertionApprox(begin_offs);
00120 if (fItems.GetSize())
00121 for (; pos >= 0; pos--)
00122 if ((pos < fItems.GetSize()) &&
00123 fItems[pos] && (fItems[pos]->EndOffset() < begin_offs)) break;
00124 if (pos < 0) pos = 0;
00125
00126 for (; pos < fItems.GetSize(); pos++) {
00127
00128 if (!fItems[pos]->IsPlaceholder() && fItems[pos]->ContainsInterval(begin_offs, end_offs)) {
00129 pos = -1;
00130 break;
00131 }
00132 if (fItems[pos]->BeginOffset() >= begin_offs)
00133 break;
00134 }
00135
00136 if (pos >= 0) {
00137 itm = new XrdClientReadCacheItem(buffer, begin_offs, end_offs,
00138 GetTimestampTick());
00139 itm->Pinned = pinned;
00140
00141 fItems.Insert(itm, pos);
00142
00143 if (!pinned) {
00144 fTotalByteCount += itm->Size();
00145 fBytesSubmitted += itm->Size();
00146 }
00147
00148 return true;
00149 }
00150
00151 return false;
00152 }
00153
00154
00155 return false;
00156 }
00157
00158
00159
00160 void XrdClientReadCache::SubmitXMessage(XrdClientMessage *xmsg, long long begin_offs,
00161 long long end_offs)
00162 {
00163
00164
00165 const void *buffer = xmsg->DonateData();
00166
00167 if (!SubmitRawData(buffer, begin_offs, end_offs))
00168 free(const_cast<void *>(buffer));
00169 }
00170
00171
00172
00173
00174 int XrdClientReadCache::FindInsertionApprox(long long begin_offs) {
00175
00176
00177
00178
00179
00180 if (!fItems.GetSize()) return 0;
00181
00182 int pos, i;
00183 pos = FindInsertionApprox_rec(0, fItems.GetSize()-1, begin_offs);
00184
00185 for (i = pos-1; i >= 0; i--) {
00186 if (fItems[i] && (fItems[i]->BeginOffset() >= begin_offs)) pos = i;
00187 else break;
00188 }
00189
00190 return pos;
00191 }
00192
00193
00194
00195 int XrdClientReadCache::FindInsertionApprox_rec(int startidx, int endidx,
00196 long long begin_offs) {
00197
00198
00199
00200
00201 if (endidx - startidx <= 1) {
00202
00203
00204 if (fItems[startidx]->BeginOffset() >= begin_offs) {
00205
00206 return startidx;
00207 }
00208 if (fItems[endidx]->BeginOffset() < begin_offs) {
00209
00210 return endidx+1;
00211 }
00212
00213 return endidx;
00214
00215 }
00216
00217 int pos2 = (endidx + startidx) / 2;
00218
00219 if (fItems[startidx]->BeginOffset() >= begin_offs) {
00220
00221 return startidx;
00222 }
00223 if (fItems[endidx]->BeginOffset() < begin_offs) {
00224
00225 return endidx+1;
00226 }
00227
00228 if (fItems[pos2]->BeginOffset() >= begin_offs) {
00229
00230 return FindInsertionApprox_rec(startidx, pos2, begin_offs);
00231 }
00232
00233 if (fItems[pos2]->BeginOffset() < begin_offs) {
00234
00235 return FindInsertionApprox_rec(pos2, endidx, begin_offs);
00236 }
00237
00238 return endidx;
00239 }
00240
00241
00242 void XrdClientReadCache::PutPlaceholder(long long begin_offs,
00243 long long end_offs)
00244 {
00245
00246
00247 XrdClientReadCacheItem *itm = 0;
00248
00249 {
00250
00251 XrdSysMutexHelper mtx(fMutex);
00252
00253
00254
00255 int pos = FindInsertionApprox(begin_offs);
00256 int p = pos - 1;
00257
00258 if (fItems.GetSize())
00259 for (; p >= 0; p--)
00260 if ((p < fItems.GetSize()) &&
00261 fItems[p] && (fItems[p]->EndOffset() < begin_offs)) break;
00262 if (p < 0) p = 0;
00263
00264 for (; p < fItems.GetSize(); p++) {
00265 if (fItems[p]->ContainsInterval(begin_offs, end_offs)) {
00266 return;
00267 }
00268
00269 if (fItems[p]->BeginOffset() > end_offs)
00270 break;
00271
00272
00273
00274 if ( (fItems[p]->BeginOffset() >= begin_offs) &&
00275 (fItems[p]->BeginOffset() <= end_offs) ) {
00276
00277 itm = 0;
00278 if (begin_offs < fItems[p]->BeginOffset()-1)
00279 itm = new XrdClientReadCacheItem(0, begin_offs, fItems[p]->BeginOffset()-1,
00280 GetTimestampTick(), true);
00281 begin_offs = fItems[p]->EndOffset()+1;
00282 if (itm) {
00283 fItems.Insert(itm, p);
00284
00285
00286 p++;
00287 }
00288
00289 }
00290
00291 if ( (fItems[p]->BeginOffset() <= begin_offs) &&
00292 (fItems[p]->EndOffset() >= begin_offs) ) {
00293
00294 begin_offs = fItems[p]->EndOffset()+1;
00295
00296 }
00297
00298
00299 pos = p+1;
00300
00301 if (begin_offs >= end_offs) return;
00302
00303
00304 }
00305
00306 itm = new XrdClientReadCacheItem(0, begin_offs, end_offs,
00307 GetTimestampTick(), true);
00308 fItems.Insert(itm, pos);
00309
00310 }
00311
00312
00313 }
00314
00315
00316 long XrdClientReadCache::GetDataIfPresent(const void *buffer,
00317 long long begin_offs,
00318 long long end_offs,
00319 bool PerfCalc,
00320 XrdClientIntvList &missingblks,
00321 long &outstandingblks)
00322 {
00323
00324
00325
00326
00327
00328 int it;
00329 long bytesgot = 0;
00330
00331 long long lastseenbyte = begin_offs-1;
00332
00333 outstandingblks = 0;
00334 missingblks.Clear();
00335
00336 XrdSysMutexHelper mtx(fMutex);
00337
00338
00339
00340 if (PerfCalc)
00341 fReadsCounter++;
00342
00343
00344
00345
00346
00347
00348
00349
00350
00351
00352
00353
00354
00355 it = FindInsertionApprox(begin_offs);
00356
00357 if (fItems.GetSize())
00358 for (; it >= 0; it--)
00359 if ((it < fItems.GetSize()) &&
00360 fItems[it] && (fItems[it]->EndOffset() < begin_offs)) break;
00361 if (it < 0) it = 0;
00362
00363 for (; it < fItems.GetSize(); it++) {
00364 long l = 0;
00365
00366 if (!fItems[it]) continue;
00367
00368 if (fItems[it]->BeginOffset() > lastseenbyte+1) break;
00369
00370 if (!fItems[it]->IsPlaceholder())
00371
00372 l = fItems[it]->GetPartialInterval(((char *)buffer)+bytesgot,
00373 begin_offs+bytesgot, end_offs);
00374 else {
00375
00376
00377 if (fItems[it]->GetPartialInterval(0, begin_offs+bytesgot, end_offs) > 0) {
00378
00379 if (fBlkRemPolicy != kRmBlk_FIFO)
00380 fItems[it]->Touch(GetTimestampTick());
00381
00382 outstandingblks++;
00383
00384 }
00385
00386 }
00387
00388 lastseenbyte = xrdmax(lastseenbyte, fItems[it]->EndOffset());
00389
00390 if (l > 0) {
00391 bytesgot += l;
00392
00393 if (fBlkRemPolicy != kRmBlk_FIFO)
00394 fItems[it]->Touch(GetTimestampTick());
00395
00396 if (PerfCalc) {
00397 fBytesHit += l;
00398 UpdatePerfCounters();
00399 }
00400
00401 if (bytesgot >= end_offs - begin_offs + 1) {
00402 return bytesgot;
00403 }
00404
00405 }
00406
00407 }
00408
00409
00410
00411
00412
00413
00414
00415 XrdClientCacheInterval intv;
00416
00417
00418 for (; it < fItems.GetSize(); it++) {
00419 long l;
00420
00421 if (fItems[it]->BeginOffset() > end_offs) break;
00422
00423 if (fItems[it]->BeginOffset() > lastseenbyte+1) {
00424
00425
00426
00427
00428 intv.beginoffs = lastseenbyte+1;
00429 intv.endoffs = fItems[it]->BeginOffset()-1;
00430 missingblks.Push_back( intv );
00431
00432 lastseenbyte = fItems[it]->EndOffset();
00433 if (lastseenbyte >= end_offs) break;
00434 continue;
00435 }
00436
00437
00438 l = fItems[it]->GetPartialInterval(0, lastseenbyte+1, end_offs);
00439
00440 if (l > 0) {
00441
00442
00443
00444 if (fItems[it]->IsPlaceholder()) {
00445
00446 outstandingblks++;
00447
00448 }
00449
00450
00451 lastseenbyte += l;
00452 }
00453
00454
00455 }
00456
00457 if (lastseenbyte+1 <= end_offs) {
00458 intv.beginoffs = lastseenbyte+1;
00459 intv.endoffs = end_offs;
00460 missingblks.Push_back( intv );
00461 }
00462
00463
00464 if (PerfCalc) {
00465 fMissCount++;
00466 UpdatePerfCounters();
00467 }
00468
00469 return bytesgot;
00470 }
00471
00472
00473
00474 void XrdClientReadCache::PrintCache() {
00475
00476 XrdSysMutexHelper mtx(fMutex);
00477 int it;
00478
00479 Info(XrdClientDebug::kUSERDEBUG, "Cache",
00480 "Cache Status --------------------------");
00481
00482 for (it = 0; it < fItems.GetSize(); it++) {
00483
00484 if (fItems[it]) {
00485
00486 if (fItems[it]->IsPlaceholder()) {
00487
00488 Info(XrdClientDebug::kUSERDEBUG,
00489 "Cache blk", it << "Placeholder " <<
00490 fItems[it]->BeginOffset() << "->" << fItems[it]->EndOffset() );
00491
00492 }
00493 else
00494 Info(XrdClientDebug::kUSERDEBUG,
00495 "Cache blk", it << "Data block " <<
00496 fItems[it]->BeginOffset() << "->" << fItems[it]->EndOffset() <<
00497 (fItems[it]->Pinned ? " (pinned) " : "" ) );
00498
00499 }
00500 }
00501
00502 Info(XrdClientDebug::kUSERDEBUG, "Cache",
00503 "-------------------------------------- fTotalByteCount = " << fTotalByteCount );
00504
00505 }
00506
00507 void *XrdClientReadCache::FindBlk(long long begin_offs, long long end_offs) {
00508
00509 int it;
00510 XrdSysMutexHelper mtx(fMutex);
00511
00512 it = FindInsertionApprox(begin_offs);
00513
00514 if (fItems.GetSize())
00515 for (; it >= 0; it--)
00516 if ((it < fItems.GetSize()) &&
00517 fItems[it] && (fItems[it]->EndOffset() < begin_offs)) break;
00518 if (it < 0) it = 0;
00519
00520 while (it < fItems.GetSize()) {
00521 if (fItems[it]) {
00522
00523 if (fItems[it]->BeginOffset() > end_offs) break;
00524
00525 if ((fItems[it]->BeginOffset() == begin_offs) &&
00526 (fItems[it]->EndOffset() == end_offs)) {
00527 return fItems[it]->GetData();
00528 }
00529 else it++;
00530
00531 }
00532 else it++;
00533
00534 }
00535
00536 return 0;
00537
00538 }
00539
00540
00541
00542 void XrdClientReadCache::UnPinCacheBlk(long long begin_offs, long long end_offs) {
00543
00544 int it;
00545 XrdSysMutexHelper mtx(fMutex);
00546
00547 it = FindInsertionApprox(begin_offs);
00548
00549 if (fItems.GetSize())
00550 for (; it >= 0; it--)
00551 if ((it < fItems.GetSize()) &&
00552 fItems[it] && (fItems[it]->EndOffset() < begin_offs)) break;
00553 if (it < 0) it = 0;
00554
00555
00556 while (it < fItems.GetSize()) {
00557 if (fItems[it]) {
00558
00559 if (fItems[it]->BeginOffset() > end_offs) break;
00560
00561 if (fItems[it]->Pinned && fItems[it]->ContainedInInterval(begin_offs, end_offs)) {
00562 fItems[it]->Pinned = false;
00563 fTotalByteCount += fItems[it]->Size();
00564 break;
00565 }
00566 else it++;
00567
00568 }
00569 else it++;
00570
00571 }
00572
00573 }
00574
00575
00576
00577 void XrdClientReadCache::RemoveItems(long long begin_offs, long long end_offs, bool remove_overlapped)
00578 {
00579
00580
00581
00582 int it;
00583 XrdSysMutexHelper mtx(fMutex);
00584
00585 it = FindInsertionApprox(begin_offs);
00586
00587
00588 if (it < fItems.GetSize())
00589 for (; it >= 0; it--)
00590 if ((it < fItems.GetSize()) &&
00591 fItems[it] && (fItems[it]->EndOffset() < begin_offs)) break;
00592 if (it < 0) it = 0;
00593
00594
00595 while (it < fItems.GetSize()) {
00596 if (fItems[it]) {
00597
00598 if (!remove_overlapped) {
00599 if (fItems[it]->BeginOffset() > end_offs) break;
00600
00601 if (!fItems[it]->Pinned && fItems[it]->ContainedInInterval(begin_offs, end_offs)) {
00602
00603 if (!fItems[it]->IsPlaceholder())
00604 fTotalByteCount -= fItems[it]->Size();
00605
00606 delete fItems[it];
00607 fItems.Erase(it);
00608 }
00609 else it++;
00610 }
00611 else {
00612
00613 if (fItems[it]->BeginOffset() > end_offs) break;
00614 if (!fItems[it]->Pinned && !fItems[it]->IsPlaceholder() &&
00615 fItems[it]->IntersectInterval(begin_offs, end_offs)) {
00616
00617 fTotalByteCount -= fItems[it]->Size();
00618
00619 delete fItems[it];
00620 fItems.Erase(it);
00621 }
00622 else it++;
00623
00624 }
00625
00626 }
00627 else it++;
00628
00629 }
00630
00631 bool changed;
00632 it = FindInsertionApprox(begin_offs);
00633 if (fItems.GetSize())
00634 for (; it >= 0; it--)
00635 if ((it < fItems.GetSize()) &&
00636 fItems[it] && (fItems[it]->EndOffset() < begin_offs)) break;
00637 if (it < 0) it = 0;
00638
00639
00640 do {
00641 changed = false;
00642 for (; it < fItems.GetSize(); it++) {
00643
00644
00645 if (fItems[it]) {
00646
00647 if (fItems[it]->BeginOffset() > end_offs) break;
00648
00649 if ( fItems[it]->IsPlaceholder() ) {
00650 long long plc1_beg = 0;
00651 long long plc1_end = 0;
00652
00653 long long plc2_beg = 0;
00654 long long plc2_end = 0;
00655
00656
00657 plc1_beg = fItems[it]->BeginOffset();
00658 plc1_end = begin_offs-1;
00659
00660 plc2_beg = end_offs+1;
00661 plc2_end = fItems[it]->EndOffset();
00662
00663 if ( ( (begin_offs >= fItems[it]->BeginOffset()) &&
00664 (begin_offs <= fItems[it]->EndOffset()) ) ||
00665 ( (end_offs >= fItems[it]->BeginOffset()) &&
00666 (end_offs <= fItems[it]->EndOffset()) ) ) {
00667
00668 delete fItems[it];
00669 fItems.Erase(it);
00670 changed = true;
00671 it--;
00672
00673 if (plc1_end - plc1_beg > 32) {
00674 PutPlaceholder(plc1_beg, plc1_end);
00675 }
00676
00677 if (plc2_end - plc2_beg > 32) {
00678 PutPlaceholder(plc2_beg, plc2_end);
00679 }
00680
00681 break;
00682
00683 }
00684
00685
00686
00687
00688 }
00689
00690 }
00691
00692 }
00693
00694 it = xrdmax(0, it-2);
00695 } while (changed);
00696
00697
00698
00699 }
00700
00701
00702 void XrdClientReadCache::RemoveItems(bool leavepinned)
00703 {
00704
00705
00706
00707
00708 XrdSysMutexHelper mtx(fMutex);
00709 int it = fItems.GetSize()-1;
00710
00711 for (; it >= 0; it--) {
00712 if (!fItems[it]->Pinned) {
00713 fTotalByteCount -= fItems[it]->Size();
00714 delete fItems[it];
00715 fItems.Erase(it, true);
00716 continue;
00717 }
00718
00719 if (fItems[it]->Pinned && !leavepinned) {
00720 delete fItems[it];
00721 fItems.Erase(it, true);
00722 continue;
00723 }
00724 }
00725
00726 if (!leavepinned) fTotalByteCount = 0;
00727
00728 }
00729
00730
00731
00732 void XrdClientReadCache::RemovePlaceholders() {
00733
00734
00735
00736
00737 int it = 0;
00738
00739 XrdSysMutexHelper mtx(fMutex);
00740
00741 if (!fItems.GetSize()) return;
00742
00743 while (1) {
00744
00745 if (fItems[it] && fItems[it]->IsPlaceholder()) {
00746 delete fItems[it];
00747 fItems.Erase(it);
00748 }
00749 else
00750 it++;
00751
00752 if (it == fItems.GetSize()) break;
00753 }
00754
00755 }
00756
00757
00758
00759 bool XrdClientReadCache::RemoveFirstItem()
00760 {
00761
00762
00763
00764 int it, lruit;
00765 XrdClientReadCacheItem *item;
00766
00767 XrdSysMutexHelper mtx(fMutex);
00768
00769 lruit = -1;
00770
00771
00772 lruit = -1;
00773 for (it = 0; it < fItems.GetSize(); it++) {
00774
00775 if (!fItems[it]->IsPlaceholder() && !fItems[it]->Pinned) {
00776 lruit = it;
00777 break;
00778 }
00779 }
00780
00781
00782 if (lruit >= 0)
00783 item = fItems[lruit];
00784 else return false;
00785
00786 fTotalByteCount -= item->Size();
00787 delete item;
00788 fItems.Erase(lruit);
00789
00790
00791 return true;
00792 }
00793
00794
00795
00796 bool XrdClientReadCache::RemoveLRUItem()
00797 {
00798
00799
00800
00801 int it, lruit;
00802 long long minticks = -1;
00803 XrdClientReadCacheItem *item = 0;
00804
00805 XrdSysMutexHelper mtx(fMutex);
00806
00807 lruit = -1;
00808
00809 if (fItems.GetSize() < 1000000)
00810 for (it = 0; it < fItems.GetSize(); it++) {
00811
00812 if (fItems[it] && !fItems[it]->IsPlaceholder() && !fItems[it]->Pinned) {
00813 if ((minticks < 0) || (fItems[it]->GetTimestampTicks() < minticks)) {
00814 minticks = fItems[it]->GetTimestampTicks();
00815 lruit = it;
00816 }
00817 }
00818 }
00819 else {
00820
00821
00822 lruit = -1;
00823 for (it = 0; it < fItems.GetSize(); it++) {
00824
00825 if (!fItems[it]->IsPlaceholder() && !fItems[it]->Pinned) {
00826 lruit = it;
00827 minticks = 0;
00828 break;
00829 }
00830 }
00831 }
00832
00833 if (lruit >= 0)
00834 item = fItems[lruit];
00835 else return false;
00836
00837 if (item) {
00838 fTotalByteCount -= item->Size();
00839 delete item;
00840 fItems.Erase(lruit);
00841 }
00842
00843 return true;
00844 }
00845
00846
00847 bool XrdClientReadCache::RemoveItem() {
00848
00849 switch (fBlkRemPolicy) {
00850
00851 case kRmBlk_LRU:
00852 case kRmBlk_FIFO:
00853 return RemoveLRUItem();
00854
00855 case kRmBlk_LeastOffs:
00856 return RemoveFirstItem();
00857
00858 }
00859
00860 return RemoveLRUItem();
00861 }
00862
00863
00864 bool XrdClientReadCache::MakeFreeSpace(long long bytes)
00865 {
00866
00867
00868 if (!WillFit(bytes))
00869 return false;
00870
00871 XrdSysMutexHelper mtx(fMutex);
00872
00873 while (fMaxCacheSize - fTotalByteCount < bytes)
00874 if (!RemoveItem())
00875 return false;
00876
00877 return true;
00878 }
00879
00880
00881 void XrdClientReadCache::GetInfo(int &size, long long &bytessubmitted,
00882 long long &byteshit,
00883 long long &misscount,
00884 float &missrate,
00885 long long &readreqcnt,
00886 float &bytesusefulness ) {
00887 size = fMaxCacheSize;
00888 bytessubmitted = fBytesSubmitted;
00889 byteshit = fBytesHit;
00890 misscount = fMissCount;
00891 missrate = fMissRate;
00892 readreqcnt = fReadsCounter;
00893 bytesusefulness = fBytesUsefulness;
00894 }