00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #include "TPacketizerAdaptive.h"
00036
00037 #include "Riostream.h"
00038 #include "TDSet.h"
00039 #include "TError.h"
00040 #include "TEnv.h"
00041 #include "TEntryList.h"
00042 #include "TEventList.h"
00043 #include "TMap.h"
00044 #include "TMessage.h"
00045 #include "TMonitor.h"
00046 #include "TNtupleD.h"
00047 #include "TObject.h"
00048 #include "TParameter.h"
00049 #include "TPerfStats.h"
00050 #include "TProofDebug.h"
00051 #include "TProof.h"
00052 #include "TProofServ.h"
00053 #include "TSlave.h"
00054 #include "TSocket.h"
00055 #include "TSortedList.h"
00056 #include "TUrl.h"
00057 #include "TClass.h"
00058 #include "TRandom.h"
00059 #include "TMath.h"
00060 #include "TObjString.h"
00061 #include "TList.h"
00062
00063
00064
00065
00066
00067
00068
00069
00070
00071
00072
00073
00074
00075
00076
00077 class TPacketizerAdaptive::TFileStat : public TObject {
00078
00079 private:
00080 Bool_t fIsDone;
00081 TFileNode *fNode;
00082 TDSetElement *fElement;
00083 Long64_t fNextEntry;
00084
00085 public:
00086 TFileStat(TFileNode *node, TDSetElement *elem, TList *file);
00087
00088 Bool_t IsDone() const {return fIsDone;}
00089 Bool_t IsSortable() const { return kTRUE; }
00090 void SetDone() {fIsDone = kTRUE;}
00091 TFileNode *GetNode() const {return fNode;}
00092 TDSetElement *GetElement() const {return fElement;}
00093 Long64_t GetNextEntry() const {return fNextEntry;}
00094 void MoveNextEntry(Long64_t step) {fNextEntry += step;}
00095
00096
00097 Int_t Compare(const TObject* obj) const
00098 {
00099
00100
00101 const TFileStat *fst = dynamic_cast<const TFileStat*>(obj);
00102 if (fst && GetElement() && fst->GetElement()) {
00103 Long64_t ent = GetElement()->GetNum();
00104 Long64_t entfst = fst->GetElement()->GetNum();
00105 if (ent > 0 && entfst > 0) {
00106 if (ent > entfst) {
00107 return 1;
00108 } else if (ent < entfst) {
00109 return -1;
00110 } else {
00111 return 0;
00112 }
00113 }
00114 }
00115
00116 return 0;
00117 }
00118 void Print(Option_t * = 0) const
00119 {
00120 Printf("TFileStat: %s %lld", fElement ? fElement->GetName() : "---",
00121 fElement ? fElement->GetNum() : -1);
00122 }
00123 };
00124
00125 TPacketizerAdaptive::TFileStat::TFileStat(TFileNode *node, TDSetElement *elem, TList *files)
00126 : fIsDone(kFALSE), fNode(node), fElement(elem), fNextEntry(elem->GetFirst())
00127 {
00128
00129 if (files) files->Add(this);
00130 }
00131
00132
00133
00134
00135 class TPacketizerAdaptive::TFileNode : public TObject {
00136
00137 private:
00138 TString fNodeName;
00139 TList *fFiles;
00140 TObject *fUnAllocFileNext;
00141 TList *fActFiles;
00142 TObject *fActFileNext;
00143 Int_t fMySlaveCnt;
00144
00145 Int_t fExtSlaveCnt;
00146
00147 Int_t fRunSlaveCnt;
00148
00149 Long64_t fProcessed;
00150 Long64_t fEvents;
00151
00152 Int_t fStrategy;
00153
00154 TSortedList *fFilesToProcess;
00155
00156 public:
00157 TFileNode(const char *name, Int_t strategy, TSortedList *files);
00158 ~TFileNode() { delete fFiles; delete fActFiles; }
00159
00160 void IncMySlaveCnt() { fMySlaveCnt++; }
00161 Int_t GetMySlaveCnt() const { return fMySlaveCnt; }
00162 void IncExtSlaveCnt(const char *slave) { if (fNodeName != slave) fExtSlaveCnt++; }
00163 void DecExtSlaveCnt(const char *slave) { if (fNodeName != slave) fExtSlaveCnt--; R__ASSERT(fExtSlaveCnt >= 0); }
00164 Int_t GetSlaveCnt() const { return fMySlaveCnt + fExtSlaveCnt; }
00165 void IncRunSlaveCnt() { fRunSlaveCnt++; }
00166 void DecRunSlaveCnt() { fRunSlaveCnt--; R__ASSERT(fRunSlaveCnt >= 0); }
00167 Int_t GetRunSlaveCnt() const { return fRunSlaveCnt; }
00168 Int_t GetExtSlaveCnt() const { return fExtSlaveCnt; }
00169 Int_t GetNumberOfActiveFiles() const { return fActFiles->GetSize(); }
00170 Bool_t IsSortable() const { return kTRUE; }
00171 Int_t GetNumberOfFiles() { return fFiles->GetSize(); }
00172 void IncProcessed(Long64_t nEvents)
00173 { fProcessed += nEvents; }
00174 Long64_t GetProcessed() const { return fProcessed; }
00175 void DecreaseProcessed(Long64_t nEvents) { fProcessed -= nEvents; }
00176
00177
00178 Long64_t GetEventsLeftPerSlave() const
00179 { return ((fEvents - fProcessed)/(fRunSlaveCnt + 1)); }
00180 void IncEvents(Long64_t nEvents) { fEvents += nEvents; }
00181 const char *GetName() const { return fNodeName.Data(); }
00182 Long64_t GetNEvents() const { return fEvents; }
00183
00184 void Print(Option_t * = 0) const
00185 {
00186 TFileStat *fs = 0;
00187 TDSetElement *e = 0;
00188 Int_t nn = 0;
00189 Printf("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
00190 Printf("+++ TFileNode: %s +++", fNodeName.Data());
00191 Printf("+++ Evts: %lld (total: %lld) ", fProcessed, fEvents);
00192 Printf("+++ Worker count: int:%d, ext: %d, tot:%d ", fMySlaveCnt, fExtSlaveCnt, fRunSlaveCnt);
00193 Printf("+++ Files: %d ", fFiles ? fFiles->GetSize() : 0);
00194 if (fFiles && fFiles->GetSize() > 0) {
00195 TIter nxf(fFiles);
00196 while ((fs = (TFileStat *) nxf())) {
00197 if ((e = fs->GetElement())) {
00198 Printf("+++ #%d: %s %lld - %lld (%lld) - next: %lld ", ++nn, e->GetName(),
00199 e->GetFirst(), e->GetFirst() + e->GetNum() - 1, e->GetNum(), fs->GetNextEntry());
00200 } else {
00201 Printf("+++ #%d: no element! ", ++nn);
00202 }
00203 }
00204 }
00205 Printf("+++ Active files: %d ", fActFiles ? fActFiles->GetSize() : 0);
00206 if (fActFiles && fActFiles->GetSize() > 0) {
00207 TIter nxaf(fActFiles);
00208 while ((fs = (TFileStat *) nxaf())) {
00209 if ((e = fs->GetElement())) {
00210 Printf("+++ #%d: %s %lld - %lld (%lld) - next: %lld", ++nn, e->GetName(),
00211 e->GetFirst(), e->GetFirst() + e->GetNum() - 1, e->GetNum(), fs->GetNextEntry());
00212 } else {
00213 Printf("+++ #%d: no element! ", ++nn);
00214 }
00215 }
00216 }
00217 Printf("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
00218 }
00219
00220 void Add(TDSetElement *elem, Bool_t tolist)
00221 {
00222 TList *files = tolist ? (TList *)fFilesToProcess : (TList *)0;
00223 TFileStat *f = new TFileStat(this, elem, files);
00224 fFiles->Add(f);
00225 if (fUnAllocFileNext == 0) fUnAllocFileNext = fFiles->First();
00226 }
00227
00228 TFileStat *GetNextUnAlloc()
00229 {
00230 TObject *next = fUnAllocFileNext;
00231
00232 if (next != 0) {
00233
00234 fActFiles->Add(next);
00235 if (fActFileNext == 0) fActFileNext = fActFiles->First();
00236
00237
00238 fUnAllocFileNext = fFiles->After(fUnAllocFileNext);
00239 }
00240 return (TFileStat *) next;
00241 }
00242
00243 TFileStat *GetNextActive()
00244 {
00245 TObject *next = fActFileNext;
00246
00247 if (fActFileNext != 0) {
00248 fActFileNext = fActFiles->After(fActFileNext);
00249 if (fActFileNext == 0) fActFileNext = fActFiles->First();
00250 }
00251
00252 return (TFileStat *) next;
00253 }
00254
00255 void RemoveActive(TFileStat *file)
00256 {
00257 if (fActFileNext == file) fActFileNext = fActFiles->After(file);
00258 fActFiles->Remove(file);
00259 if (fFilesToProcess) fFilesToProcess->Remove(file);
00260 if (fActFileNext == 0) fActFileNext = fActFiles->First();
00261 }
00262
00263 Int_t Compare(const TObject *other) const
00264 {
00265
00266
00267
00268
00269
00270
00271 const TFileNode *obj = dynamic_cast<const TFileNode*>(other);
00272 if (!obj) {
00273 Error("Compare", "input is not a TPacketizer::TFileNode object");
00274 return 0;
00275 }
00276
00277
00278
00279 if (fStrategy == 1) {
00280
00281 Int_t myVal = GetRunSlaveCnt();
00282 Int_t otherVal = obj->GetRunSlaveCnt();
00283 if (myVal < otherVal) {
00284 return -1;
00285 } else if (myVal > otherVal) {
00286 return 1;
00287 } else {
00288
00289 if ((fEvents - fProcessed) >
00290 (obj->GetNEvents() - obj->GetProcessed())) {
00291 return -1;
00292 } else {
00293 return 1;
00294 }
00295 }
00296 } else {
00297 Int_t myVal = GetSlaveCnt();
00298 Int_t otherVal = obj->GetSlaveCnt();
00299 if (myVal < otherVal) {
00300 return -1;
00301 } else if (myVal > otherVal) {
00302 return 1;
00303 } else {
00304 return 0;
00305 }
00306 }
00307 }
00308
00309 void Reset()
00310 {
00311 fUnAllocFileNext = fFiles->First();
00312 fActFiles->Clear();
00313 fActFileNext = 0;
00314 fExtSlaveCnt = 0;
00315 fMySlaveCnt = 0;
00316 fRunSlaveCnt = 0;
00317 }
00318 };
00319
00320
00321 TPacketizerAdaptive::TFileNode::TFileNode(const char *name, Int_t strategy, TSortedList *files)
00322 : fNodeName(name), fFiles(new TList), fUnAllocFileNext(0),
00323 fActFiles(new TList), fActFileNext(0), fMySlaveCnt(0),
00324 fExtSlaveCnt(0), fRunSlaveCnt(0), fProcessed(0), fEvents(0),
00325 fStrategy(strategy), fFilesToProcess(files)
00326 {
00327
00328
00329 fFiles->SetOwner();
00330 fActFiles->SetOwner(kFALSE);
00331 }
00332
00333
00334
00335 class TPacketizerAdaptive::TSlaveStat : public TVirtualPacketizer::TVirtualSlaveStat {
00336
00337 friend class TPacketizerAdaptive;
00338
00339 private:
00340 TFileNode *fFileNode;
00341 TFileStat *fCurFile;
00342 TDSetElement *fCurElem;
00343 Long64_t fCurProcessed;
00344 Float_t fCurProcTime;
00345 TList *fDSubSet;
00346
00347 public:
00348 TSlaveStat(TSlave *slave);
00349 ~TSlaveStat();
00350 TFileNode *GetFileNode() const { return fFileNode; }
00351 Long64_t GetEntriesProcessed() const { return fStatus?fStatus->GetEntries():-1; }
00352 Double_t GetProcTime() const { return fStatus?fStatus->GetProcTime():-1; }
00353 TFileStat *GetCurFile() { return fCurFile; }
00354 void SetFileNode(TFileNode *node) { fFileNode = node; }
00355 void UpdateRates(TProofProgressStatus *st);
00356 Float_t GetAvgRate() { return fStatus->GetRate(); }
00357 Float_t GetCurRate() {
00358 return (fCurProcTime?fCurProcessed/fCurProcTime:0); }
00359 Int_t GetLocalEventsLeft() {
00360 return fFileNode?(fFileNode->GetEventsLeftPerSlave()):0; }
00361 TList *GetProcessedSubSet() { return fDSubSet; }
00362 TProofProgressStatus *GetProgressStatus() { return fStatus; }
00363 TProofProgressStatus *AddProcessed(TProofProgressStatus *st = 0);
00364 };
00365
00366
00367 TPacketizerAdaptive::TSlaveStat::TSlaveStat(TSlave *slave)
00368 : fFileNode(0), fCurFile(0), fCurElem(0),
00369 fCurProcessed(0), fCurProcTime(0)
00370 {
00371
00372
00373 fDSubSet = new TList();
00374 fDSubSet->SetOwner();
00375 fSlave = slave;
00376 fStatus = new TProofProgressStatus();
00377
00378
00379 fWrkFQDN = slave->GetName();
00380 if (strcmp(slave->ClassName(), "TSlaveLite")) {
00381 fWrkFQDN = TUrl(fWrkFQDN).GetHostFQDN();
00382
00383 if (fWrkFQDN.Contains("localhost") || fWrkFQDN == "127.0.0.1")
00384 fWrkFQDN = TUrl(gSystem->HostName()).GetHostFQDN();
00385 }
00386 PDB(kPacketizer, 2)
00387 Info("TSlaveStat", "wrk FQDN: %s", fWrkFQDN.Data());
00388 }
00389
00390
00391 TPacketizerAdaptive::TSlaveStat::~TSlaveStat()
00392 {
00393
00394
00395 SafeDelete(fDSubSet);
00396 SafeDelete(fStatus);
00397 }
00398
00399
00400 void TPacketizerAdaptive::TSlaveStat::UpdateRates(TProofProgressStatus *st)
00401 {
00402
00403
00404 if (!st) {
00405 Error("UpdateRates", "no status object!");
00406 return;
00407 }
00408 if (fCurFile->IsDone()) {
00409 fCurProcTime = 0;
00410 fCurProcessed = 0;
00411 } else {
00412 fCurProcTime += st->GetProcTime() - GetProcTime();
00413 fCurProcessed += st->GetEntries() - GetEntriesProcessed();
00414 }
00415 fCurFile->GetNode()->IncProcessed(st->GetEntries() - GetEntriesProcessed());
00416 st->SetLastEntries(st->GetEntries() - fStatus->GetEntries());
00417 SafeDelete(fStatus);
00418 fStatus = st;
00419 }
00420
00421
00422 TProofProgressStatus *TPacketizerAdaptive::TSlaveStat::AddProcessed(TProofProgressStatus *st)
00423 {
00424
00425
00426
00427
00428 if (st && fDSubSet && fCurElem) {
00429 if (fCurElem->GetNum() != st->GetEntries() - GetEntriesProcessed())
00430 fCurElem->SetNum(st->GetEntries() - GetEntriesProcessed());
00431 fDSubSet->Add(fCurElem);
00432 TProofProgressStatus *diff = new TProofProgressStatus(*st - *fStatus);
00433 return diff;
00434 } else {
00435 Error("AddProcessed", "processed subset of current elem undefined");
00436 return 0;
00437 }
00438 }
00439
00440
00441
00442 ClassImp(TPacketizerAdaptive)
00443
00444
00445 TPacketizerAdaptive::TPacketizerAdaptive(TDSet *dset, TList *slaves,
00446 Long64_t first, Long64_t num,
00447 TList *input, TProofProgressStatus *st)
00448 : TVirtualPacketizer(input, st)
00449 {
00450
00451
00452 PDB(kPacketizer,1) Info("TPacketizerAdaptive",
00453 "enter (first %lld, num %lld)", first, num);
00454
00455
00456 fSlaveStats = 0;
00457 fUnAllocated = 0;
00458 fActive = 0;
00459 fFileNodes = 0;
00460 fMaxPerfIdx = 1;
00461 fCachePacketSync = kTRUE;
00462 fMaxEntriesRatio = 2.;
00463
00464 fMaxSlaveCnt = -1;
00465 fPacketAsAFraction = 4;
00466 fStrategy = 1;
00467 fFilesToProcess = new TSortedList;
00468 fFilesToProcess->SetOwner(kFALSE);
00469
00470 if (!fProgressStatus) {
00471 Error("TPacketizerAdaptive", "No progress status");
00472 return;
00473 }
00474
00475
00476 Int_t cpsync = -1;
00477 if (TProof::GetParameter(input, "PROOF_PacketizerCachePacketSync", cpsync) != 0) {
00478
00479 cpsync = gEnv->GetValue("Packetizer.CachePacketSync", 1);
00480 }
00481 if (cpsync >= 0) fCachePacketSync = (cpsync > 0) ? kTRUE : kFALSE;
00482
00483
00484
00485 if (TProof::GetParameter(input, "PROOF_PacketizerMaxEntriesRatio", fMaxEntriesRatio) != 0) {
00486
00487 fMaxEntriesRatio = gEnv->GetValue("Packetizer.MaxEntriesRatio", 2.);
00488 }
00489
00490
00491
00492 Int_t strategy = -1;
00493 if (TProof::GetParameter(input, "PROOF_PacketizerStrategy", strategy) != 0) {
00494
00495 strategy = gEnv->GetValue("Packetizer.Strategy", 1);
00496 }
00497 if (strategy == 0) {
00498 fStrategy = 0;
00499 Info("TPacketizerAdaptive", "using the basic strategy of TPacketizer");
00500 } else if (strategy != 1) {
00501 Warning("TPacketizerAdaptive", "unsupported strategy index (%d): ignore", strategy);
00502 }
00503
00504 Long_t maxSlaveCnt = 0;
00505 if (TProof::GetParameter(input, "PROOF_MaxSlavesPerNode", maxSlaveCnt) == 0) {
00506 if (maxSlaveCnt < 1) {
00507 Info("TPacketizerAdaptive",
00508 "The value of PROOF_MaxSlavesPerNode must be grater than 0");
00509 maxSlaveCnt = 0;
00510 }
00511 } else {
00512
00513 Int_t mxslcnt = -1;
00514 if (TProof::GetParameter(input, "PROOF_MaxSlavesPerNode", mxslcnt) == 0) {
00515 if (mxslcnt < 1) {
00516 Info("TPacketizerAdaptive",
00517 "The value of PROOF_MaxSlavesPerNode must be grater than 0");
00518 mxslcnt = 0;
00519 }
00520 maxSlaveCnt = (Long_t) mxslcnt;
00521 }
00522 }
00523
00524 if (!maxSlaveCnt)
00525 maxSlaveCnt = gEnv->GetValue("Packetizer.MaxWorkersPerNode", 0);
00526 if (maxSlaveCnt > 0) {
00527 fMaxSlaveCnt = maxSlaveCnt;
00528 Info("TPacketizerAdaptive", "Setting max number of workers per node to %ld",
00529 fMaxSlaveCnt);
00530 }
00531
00532
00533
00534
00535
00536
00537 fForceLocal = kFALSE;
00538 Int_t forceLocal = 0;
00539 if (TProof::GetParameter(input, "PROOF_ForceLocal", forceLocal) == 0) {
00540 if (forceLocal == 1)
00541 fForceLocal = kTRUE;
00542 else
00543 Info("TPacketizerAdaptive",
00544 "The only accepted value of PROOF_ForceLocal parameter is 1 !");
00545 }
00546
00547
00548
00549
00550
00551
00552
00553 Int_t packetAsAFraction = 0;
00554 if (TProof::GetParameter(input, "PROOF_PacketAsAFraction", packetAsAFraction) == 0) {
00555 if (packetAsAFraction > 0) {
00556 fPacketAsAFraction = packetAsAFraction;
00557 Info("TPacketizerAdaptive",
00558 "using alternate fraction of query time as a packet size: %d",
00559 packetAsAFraction);
00560 } else
00561 Info("TPacketizerAdaptive", "packetAsAFraction parameter must be higher than 0");
00562 }
00563
00564
00565
00566 fConfigParams->Add(new TParameter<Int_t>("PROOF_PacketizerCachePacketSync", (Int_t)fCachePacketSync));
00567 fConfigParams->Add(new TParameter<Double_t>("PROOF_PacketizerMaxEntriesRatio", fMaxEntriesRatio));
00568 fConfigParams->Add(new TParameter<Int_t>("PROOF_PacketizerStrategy", fStrategy));
00569 fConfigParams->Add(new TParameter<Int_t>("PROOF_MaxWorkersPerNode", (Int_t)fMaxSlaveCnt));
00570 fConfigParams->Add(new TParameter<Int_t>("PROOF_ForceLocal", (Int_t)fForceLocal));
00571 fConfigParams->Add(new TParameter<Int_t>("PROOF_PacketAsAFraction", fPacketAsAFraction));
00572
00573 Double_t baseLocalPreference = 1.2;
00574 TProof::GetParameter(input, "PROOF_BaseLocalPreference", baseLocalPreference);
00575 fBaseLocalPreference = (Float_t)baseLocalPreference;
00576
00577 fFileNodes = new TList;
00578 fFileNodes->SetOwner();
00579 fUnAllocated = new TList;
00580 fUnAllocated->SetOwner(kFALSE);
00581 fActive = new TList;
00582 fActive->SetOwner(kFALSE);
00583
00584 fValid = kTRUE;
00585
00586
00587
00588
00589
00590 TObjArray *partitions = 0;
00591 TString partitionsStr;
00592 if (TProof::GetParameter(input, "PROOF_PacketizerPartitions", partitionsStr) != 0)
00593 partitionsStr = gEnv->GetValue("Packetizer.Partitions", "");
00594 if (!partitionsStr.IsNull()) {
00595 Info("TPacketizerAdaptive", "Partitions: %s", partitionsStr.Data());
00596 partitions = partitionsStr.Tokenize(",");
00597 }
00598
00599
00600 dset->Reset();
00601 TDSetElement *e;
00602 while ((e = (TDSetElement*)dset->Next())) {
00603
00604 if (e->GetValid()) continue;
00605
00606
00607 if (fDataSet.IsNull() && e->GetDataSet() && strlen(e->GetDataSet()))
00608 fDataSet = e->GetDataSet();
00609
00610 TUrl url = e->GetFileName();
00611
00612
00613 TString host;
00614 if ( !url.IsValid() ||
00615 (strncmp(url.GetProtocol(),"root", 4) &&
00616 strncmp(url.GetProtocol(),"rfio", 4)) ) {
00617 host = "no-host";
00618 } else {
00619 host = url.GetHostFQDN();
00620 }
00621
00622 if (host.Contains("localhost") || host == "127.0.0.1") {
00623 url.SetHost(gSystem->HostName());
00624 host = url.GetHostFQDN();
00625 }
00626
00627
00628 TString disk;
00629 if (partitions) {
00630 TIter iString(partitions);
00631 TObjString* os = 0;
00632 while ((os = (TObjString *)iString())) {
00633
00634 if (strncmp(url.GetFile(), os->GetName(), os->GetString().Length()) == 0) {
00635 disk = os->GetName();
00636 break;
00637 }
00638 }
00639 }
00640
00641 TString nodeStr;
00642 if (disk.IsNull())
00643 nodeStr.Form("%s://%s", url.GetProtocol(), host.Data());
00644 else
00645 nodeStr.Form("%s://%s/%s", url.GetProtocol(), host.Data(), disk.Data());
00646 TFileNode *node = (TFileNode *) fFileNodes->FindObject(nodeStr);
00647
00648 if (node == 0) {
00649 node = new TFileNode(nodeStr, fStrategy, fFilesToProcess);
00650 fFileNodes->Add(node);
00651 PDB(kPacketizer,2)
00652 Info("TPacketizerAdaptive", "creating new node '%s' or the element", nodeStr.Data());
00653 } else {
00654 PDB(kPacketizer,2)
00655 Info("TPacketizerAdaptive", "adding element to existing node '%s'", nodeStr.Data());
00656 }
00657
00658 node->Add(e, kFALSE);
00659 }
00660
00661 fSlaveStats = new TMap;
00662 fSlaveStats->SetOwner(kFALSE);
00663
00664 TSlave *slave;
00665 TIter si(slaves);
00666 while ((slave = (TSlave*) si.Next())) {
00667 fSlaveStats->Add( slave, new TSlaveStat(slave) );
00668 fMaxPerfIdx = slave->GetPerfIdx() > fMaxPerfIdx ?
00669 slave->GetPerfIdx() : fMaxPerfIdx;
00670 }
00671
00672
00673 Reset();
00674
00675 Int_t validateMode = 0;
00676 Int_t gprc = TProof::GetParameter(input, "PROOF_ValidateByFile", validateMode);
00677 Bool_t byfile = (gprc == 0 && validateMode > 0 && num > -1) ? kTRUE : kFALSE;
00678 ValidateFiles(dset, slaves, num, byfile);
00679
00680
00681 if (!fValid) return;
00682
00683
00684
00685
00686 Int_t files = 0;
00687 fTotalEntries = 0;
00688 fUnAllocated->Clear();
00689 fActive->Clear();
00690 fFileNodes->Clear();
00691 PDB(kPacketizer,2)
00692 Info("TPacketizerAdaptive",
00693 "processing range: first %lld, num %lld", first, num);
00694
00695 dset->Reset();
00696 Long64_t cur = 0;
00697 while (( e = (TDSetElement*)dset->Next())) {
00698
00699
00700
00701 if (!e->GetValid()) continue;
00702
00703 TUrl url = e->GetFileName();
00704 Long64_t eFirst = e->GetFirst();
00705 Long64_t eNum = e->GetNum();
00706 PDB(kPacketizer,2)
00707 Info("TPacketizerAdaptive",
00708 "processing element: first %lld, num %lld (cur %lld)", eFirst, eNum, cur);
00709
00710 if (!e->GetEntryList()) {
00711
00712 if (cur + eNum < first) {
00713 cur += eNum;
00714 PDB(kPacketizer,2)
00715 Info("TPacketizerAdaptive",
00716 "processing element: skip element cur %lld", cur);
00717 continue;
00718 }
00719
00720
00721 if (num != -1 && (first+num <= cur)) {
00722 cur += eNum;
00723 PDB(kPacketizer,2)
00724 Info("TPacketizerAdaptive",
00725 "processing element: drop element cur %lld", cur);
00726 continue;
00727 }
00728
00729
00730
00731 if (num != -1 && (first+num <= cur+eNum)) {
00732 e->SetNum( first + num - cur );
00733 PDB(kPacketizer,2)
00734 Info("TPacketizerAdaptive",
00735 "processing element: adjust end %lld", first + num - cur);
00736 cur += eNum;
00737 eNum = e->GetNum();
00738 }
00739
00740
00741
00742 if (cur < first) {
00743 e->SetFirst( eFirst + (first - cur) );
00744 e->SetNum( e->GetNum() - (first - cur) );
00745 PDB(kPacketizer,2)
00746 Info("TPacketizerAdaptive",
00747 "processing element: adjust start %lld and end %lld",
00748 eFirst + (first - cur), first + num - cur);
00749 cur += eNum;
00750 eNum = e->GetNum();
00751 }
00752 } else {
00753 TEntryList *enl = dynamic_cast<TEntryList *>(e->GetEntryList());
00754 if (enl) {
00755 eNum = enl->GetN();
00756 } else {
00757 TEventList *evl = dynamic_cast<TEventList *>(e->GetEntryList());
00758 eNum = evl ? evl->GetN() : eNum;
00759 }
00760 if (!eNum)
00761 continue;
00762 }
00763 PDB(kPacketizer,2)
00764 Info("TPacketizerAdaptive",
00765 "processing element: next cur %lld", cur);
00766
00767
00768 TString host;
00769 if ( !url.IsValid() ||
00770 (strncmp(url.GetProtocol(),"root", 4) &&
00771 strncmp(url.GetProtocol(),"rfio", 4)) ) {
00772 host = "no-host";
00773 } else {
00774 host = url.GetHostFQDN();
00775 }
00776
00777 if (host.Contains("localhost") || host == "127.0.0.1") {
00778 url.SetHost(gSystem->HostName());
00779 host = url.GetHostFQDN();
00780 }
00781
00782
00783 TString disk;
00784 if (partitions) {
00785 TIter iString(partitions);
00786 TObjString* os = 0;
00787 while ((os = (TObjString *)iString())) {
00788
00789 if (strncmp(url.GetFile(), os->GetName(), os->GetString().Length()) == 0) {
00790 disk = os->GetName();
00791 break;
00792 }
00793 }
00794 }
00795
00796 TString nodeStr;
00797 if (disk.IsNull())
00798 nodeStr.Form("%s://%s", url.GetProtocol(), host.Data());
00799 else
00800 nodeStr.Form("%s://%s/%s", url.GetProtocol(), host.Data(), disk.Data());
00801 TFileNode *node = (TFileNode*) fFileNodes->FindObject(nodeStr);
00802
00803
00804 if (node == 0) {
00805 node = new TFileNode(nodeStr, fStrategy, fFilesToProcess);
00806 fFileNodes->Add( node );
00807 PDB(kPacketizer, 2)
00808 Info("TPacketizerAdaptive", "creating new node '%s' for element", nodeStr.Data());
00809 } else {
00810 PDB(kPacketizer, 2)
00811 Info("TPacketizerAdaptive", "adding element to exiting node '%s'", nodeStr.Data());
00812 }
00813
00814 ++files;
00815 fTotalEntries += eNum;
00816 node->Add(e, kTRUE);
00817 node->IncEvents(eNum);
00818 PDB(kPacketizer,2) e->Print("a");
00819 }
00820 PDB(kPacketizer,1)
00821 Info("TPacketizerAdaptive", "processing %lld entries in %d files on %d hosts",
00822 fTotalEntries, files, fFileNodes->GetSize());
00823
00824
00825 if (gPerfStats)
00826 gPerfStats->SetNumEvents(fTotalEntries);
00827
00828 Reset();
00829
00830 InitStats();
00831
00832 if (!fValid)
00833 SafeDelete(fProgress);
00834
00835 PDB(kPacketizer,1) Info("TPacketizerAdaptive", "return");
00836 }
00837
00838
00839 TPacketizerAdaptive::~TPacketizerAdaptive()
00840 {
00841
00842
00843 if (fSlaveStats) {
00844 fSlaveStats->DeleteValues();
00845 }
00846
00847 SafeDelete(fSlaveStats);
00848 SafeDelete(fUnAllocated);
00849 SafeDelete(fActive);
00850 SafeDelete(fFileNodes);
00851 SafeDelete(fFilesToProcess);
00852 }
00853
00854
00855 void TPacketizerAdaptive::InitStats()
00856 {
00857
00858
00859
00860
00861
00862 Int_t noRemoteFiles = 0;
00863 fNEventsOnRemLoc = 0;
00864 Int_t totalNumberOfFiles = 0;
00865 TIter next(fFileNodes);
00866 while (TFileNode *fn = (TFileNode*)next()) {
00867 totalNumberOfFiles += fn->GetNumberOfFiles();
00868 if (fn->GetMySlaveCnt() == 0) {
00869 noRemoteFiles += fn->GetNumberOfFiles();
00870 fNEventsOnRemLoc += (fn->GetNEvents() - fn->GetProcessed());
00871 }
00872 }
00873
00874 if (totalNumberOfFiles == 0) {
00875 Info("InitStats", "no valid or non-empty file found: setting invalid");
00876
00877 fValid = kFALSE;
00878 return;
00879 }
00880
00881 fFractionOfRemoteFiles = (1.0 * noRemoteFiles) / totalNumberOfFiles;
00882 Info("InitStats",
00883 "fraction of remote files %f", fFractionOfRemoteFiles);
00884
00885 if (!fValid)
00886 SafeDelete(fProgress);
00887
00888 PDB(kPacketizer,1) Info("InitStats", "return");
00889 }
00890
00891
00892 TPacketizerAdaptive::TFileStat *TPacketizerAdaptive::GetNextUnAlloc(TFileNode *node, const char *nodeHostName)
00893 {
00894
00895
00896
00897
00898 TFileStat *file = 0;
00899
00900 if (node != 0) {
00901 PDB(kPacketizer, 2)
00902 Info("GetNextUnAlloc", "looking for file on node %s", node->GetName());
00903 file = node->GetNextUnAlloc();
00904 if (file == 0) RemoveUnAllocNode(node);
00905 } else {
00906 if (nodeHostName && strlen(nodeHostName) > 0) {
00907
00908 TFileNode *fn;
00909
00910 fUnAllocated->Sort();
00911 PDB(kPacketizer,2) fUnAllocated->Print();
00912
00913
00914 for (int i = 0; i < fUnAllocated->GetSize(); i++) {
00915
00916 if ((fn = (TFileNode *) fUnAllocated->At(i))) {
00917 TUrl uu(fn->GetName());
00918 PDB(kPacketizer, 2)
00919 Info("GetNextUnAlloc", "comparing %s with %s...", nodeHostName, uu.GetHost());
00920
00921
00922 if (!strcmp(nodeHostName, uu.GetHost())) {
00923 node = fn;
00924
00925
00926 if ((file = node->GetNextUnAlloc()) == 0) {
00927 RemoveUnAllocNode(node);
00928 node = 0;
00929 } else {
00930 PDB(kPacketizer, 2)
00931 Info("GetNextUnAlloc", "found! (host: %s)", uu.GetHost());
00932 break;
00933 }
00934 }
00935 } else {
00936 Warning("GetNextUnAlloc", "unallocate entry %d is empty!", i);
00937 }
00938 }
00939
00940 if (node != 0 && fMaxSlaveCnt > 0 && node->GetExtSlaveCnt() >= fMaxSlaveCnt) {
00941
00942 PDB(kPacketizer,1)
00943 Info("GetNextUnAlloc", "reached Workers-per-Node Limit (%ld)", fMaxSlaveCnt);
00944 node = 0;
00945 }
00946 }
00947
00948 if (node == 0) {
00949 while (file == 0 && ((node = NextNode()) != 0)) {
00950 PDB(kPacketizer, 2)
00951 Info("GetNextUnAlloc", "looking for file on node %s", node->GetName());
00952 if ((file = node->GetNextUnAlloc()) == 0) RemoveUnAllocNode(node);
00953 }
00954 }
00955 }
00956
00957 if (file != 0) {
00958
00959 if (fActive->FindObject(node) == 0) {
00960 fActive->Add(node);
00961 }
00962 }
00963
00964 return file;
00965 }
00966
00967
00968 TPacketizerAdaptive::TFileNode *TPacketizerAdaptive::NextNode()
00969 {
00970
00971
00972
00973 fUnAllocated->Sort();
00974 PDB(kPacketizer,2) {
00975 fUnAllocated->Print();
00976 }
00977
00978 TFileNode *fn = (TFileNode*) fUnAllocated->First();
00979 if (fn != 0 && fMaxSlaveCnt > 0 && fn->GetExtSlaveCnt() >= fMaxSlaveCnt) {
00980
00981 PDB(kPacketizer,1)
00982 Info("NextNode", "reached Workers-per-Node Limit (%ld)", fMaxSlaveCnt);
00983 fn = 0;
00984 }
00985
00986 return fn;
00987 }
00988
00989
00990 void TPacketizerAdaptive::RemoveUnAllocNode(TFileNode * node)
00991 {
00992
00993
00994 fUnAllocated->Remove(node);
00995 }
00996
00997
00998 TPacketizerAdaptive::TFileStat *TPacketizerAdaptive::GetNextActive()
00999 {
01000
01001
01002 TFileNode *node;
01003 TFileStat *file = 0;
01004
01005 while (file == 0 && ((node = NextActiveNode()) != 0)) {
01006 file = node->GetNextActive();
01007 if (file == 0) RemoveActiveNode(node);
01008 }
01009
01010 return file;
01011 }
01012
01013
01014
01015 TPacketizerAdaptive::TFileNode *TPacketizerAdaptive::NextActiveNode()
01016 {
01017
01018
01019 fActive->Sort();
01020 PDB(kPacketizer,2) {
01021 Info("NextActiveNode", "enter");
01022 fActive->Print();
01023 }
01024
01025 TFileNode *fn = (TFileNode*) fActive->First();
01026
01027 if (fn != 0 && fMaxSlaveCnt > 0 && fn->GetExtSlaveCnt() >= fMaxSlaveCnt) {
01028 PDB(kPacketizer,1)
01029 Info("NextActiveNode","reached Workers-per-Node limit (%ld)", fMaxSlaveCnt);
01030 fn = 0;
01031 }
01032
01033 return fn;
01034 }
01035
01036
01037 void TPacketizerAdaptive::RemoveActive(TFileStat *file)
01038 {
01039
01040
01041 TFileNode *node = file->GetNode();
01042
01043 node->RemoveActive(file);
01044 if (node->GetNumberOfActiveFiles() == 0) RemoveActiveNode(node);
01045 }
01046
01047
01048 void TPacketizerAdaptive::RemoveActiveNode(TFileNode *node)
01049 {
01050
01051
01052 fActive->Remove(node);
01053 }
01054
01055
01056 void TPacketizerAdaptive::Reset()
01057 {
01058
01059
01060 fUnAllocated->Clear();
01061 fUnAllocated->AddAll(fFileNodes);
01062
01063 fActive->Clear();
01064
01065 TIter files(fFileNodes);
01066 TFileNode *fn;
01067 while ((fn = (TFileNode*) files.Next()) != 0) {
01068 fn->Reset();
01069 }
01070
01071 TIter slaves(fSlaveStats);
01072 TObject *key;
01073 while ((key = slaves.Next()) != 0) {
01074 TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue(key);
01075
01076
01077 TFileNode *fnmin = 0;
01078 Int_t fncnt = fSlaveStats->GetSize();
01079 files.Reset();
01080 while ((fn = (TFileNode*) files.Next()) != 0) {
01081 if (!strcmp(slstat->GetName(), TUrl(fn->GetName()).GetHost())) {
01082 if (fn->GetMySlaveCnt() < fncnt) {
01083 fnmin = fn;
01084 fncnt = fn->GetMySlaveCnt();
01085 }
01086 }
01087 }
01088 if (fnmin != 0 ) {
01089 slstat->SetFileNode(fnmin);
01090 fnmin->IncMySlaveCnt();
01091 PDB(kPacketizer, 2)
01092 Info("Reset","assigning node '%s' to '%s' (cnt: %d)",
01093 fnmin->GetName(), slstat->GetName(), fnmin->GetMySlaveCnt());
01094 }
01095 slstat->fCurFile = 0;
01096 }
01097 }
01098
01099
01100 void TPacketizerAdaptive::ValidateFiles(TDSet *dset, TList *slaves,
01101 Long64_t maxent, Bool_t byfile)
01102 {
01103
01104
01105
01106 TMap slaves_by_sock;
01107 TMonitor mon;
01108 TList workers;
01109
01110
01111
01112
01113 workers.AddAll(slaves);
01114 TIter si(slaves);
01115 TSlave *slm;
01116 while ((slm = (TSlave*)si.Next()) != 0) {
01117 PDB(kPacketizer,3)
01118 Info("ValidateFiles","socket added to monitor: %p (%s)",
01119 slm->GetSocket(), slm->GetName());
01120 mon.Add(slm->GetSocket());
01121 slaves_by_sock.Add(slm->GetSocket(), slm);
01122 }
01123
01124 mon.DeActivateAll();
01125
01126 ((TProof*)gProof)->DeActivateAsyncInput();
01127
01128
01129 ((TProof*)gProof)->fCurrentMonitor = &mon;
01130
01131
01132 if (!strcmp(dset->GetType(), "TTree")) SetBit(TVirtualPacketizer::kIsTree);
01133
01134
01135 TString msg("Validating files");
01136 UInt_t n = 0;
01137 UInt_t tot = dset->GetListOfElements()->GetSize();
01138 Bool_t st = kTRUE;
01139
01140 Long64_t totent = 0, nopenf = 0;
01141 while (kTRUE) {
01142
01143
01144 while (TSlave *s = (TSlave *)workers.First()) {
01145
01146 workers.Remove(s);
01147
01148
01149
01150 TSlaveStat *slstat = (TSlaveStat*)fSlaveStats->GetValue(s);
01151 TFileNode *node = 0;
01152 TFileStat *file = 0;
01153
01154
01155 if ((node = slstat->GetFileNode()) != 0) {
01156 file = GetNextUnAlloc(node);
01157 if (file == 0)
01158 slstat->SetFileNode(0);
01159 }
01160
01161
01162 if (file == 0)
01163 file = GetNextUnAlloc();
01164
01165 if (file != 0) {
01166
01167 RemoveActive(file);
01168
01169 slstat->fCurFile = file;
01170 TDSetElement *elem = file->GetElement();
01171 Long64_t entries = elem->GetEntries(kTRUE, kFALSE);
01172 if (entries < 0 || strlen(elem->GetTitle()) <= 0) {
01173
01174 file->GetNode()->IncExtSlaveCnt(slstat->GetName());
01175 TMessage m(kPROOF_GETENTRIES);
01176 m << dset->IsTree()
01177 << TString(elem->GetFileName())
01178 << TString(elem->GetDirectory())
01179 << TString(elem->GetObjName());
01180
01181 s->GetSocket()->Send( m );
01182 mon.Activate(s->GetSocket());
01183 PDB(kPacketizer,2)
01184 Info("ValidateFiles",
01185 "sent to worker-%s (%s) via %p GETENTRIES on %s %s %s %s",
01186 s->GetOrdinal(), s->GetName(), s->GetSocket(),
01187 dset->IsTree() ? "tree" : "objects", elem->GetFileName(),
01188 elem->GetDirectory(), elem->GetObjName());
01189 } else {
01190
01191 elem->SetTDSetOffset(entries);
01192 if (entries > 0) {
01193
01194 elem->SetValid();
01195 if (!elem->GetEntryList()) {
01196 if (elem->GetFirst() > entries) {
01197 Error("ValidateFiles",
01198 "first (%lld) higher then number of entries (%lld) in %s",
01199 elem->GetFirst(), entries, elem->GetFileName());
01200
01201 slstat->fCurFile->SetDone();
01202 elem->Invalidate();
01203 dset->SetBit(TDSet::kSomeInvalid);
01204 }
01205 if (elem->GetNum() == -1) {
01206 elem->SetNum(entries - elem->GetFirst());
01207 } else if (elem->GetFirst() + elem->GetNum() > entries) {
01208 Warning("ValidateFiles", "num (%lld) + first (%lld) larger then number of"
01209 " keys/entries (%lld) in %s", elem->GetNum(), elem->GetFirst(),
01210 entries, elem->GetFileName());
01211 elem->SetNum(entries - elem->GetFirst());
01212 }
01213 PDB(kPacketizer,2)
01214 Info("ValidateFiles",
01215 "found elem '%s' with %lld entries", elem->GetFileName(), entries);
01216 }
01217 }
01218
01219 totent += entries;
01220 nopenf++;
01221
01222 n++;
01223 gProof->SendDataSetStatus(msg, n, tot, st);
01224
01225
01226 workers.Add(s);
01227 }
01228 }
01229 }
01230
01231
01232 if (mon.GetActive() == 0) {
01233 if (byfile && maxent > 0) {
01234
01235 Long64_t nrestf = (maxent - totent) * nopenf / totent ;
01236 if (nrestf <= 0 && maxent > totent) nrestf = 1;
01237 if (nrestf > 0) {
01238 PDB(kPacketizer,3)
01239 Info("ValidateFiles", "{%lld, %lld, %lld}: needs to validate %lld more files",
01240 maxent, totent, nopenf, nrestf);
01241 si.Reset();
01242 while ((slm = (TSlave *) si.Next()) && nrestf--) {
01243 workers.Add(slm);
01244 }
01245 continue;
01246 } else {
01247 PDB(kPacketizer,3)
01248 Info("ValidateFiles", "no need to validate more files");
01249 break;
01250 }
01251 } else {
01252 break;
01253 }
01254 }
01255
01256 PDB(kPacketizer,3) {
01257 Info("ValidateFiles", "waiting for %d slaves:", mon.GetActive());
01258 TList *act = mon.GetListOfActives();
01259 TIter next(act);
01260 while (TSocket *s = (TSocket*) next()) {
01261 TSlave *sl = (TSlave *) slaves_by_sock.GetValue(s);
01262 if (sl)
01263 Info("ValidateFiles", " worker-%s (%s)",
01264 sl->GetOrdinal(), sl->GetName());
01265 }
01266 delete act;
01267 }
01268
01269 TSocket *sock = mon.Select();
01270
01271 if (!sock) {
01272 Error("ValidateFiles", "selection has been interrupted - STOP");
01273 mon.DeActivateAll();
01274 fValid = kFALSE;
01275 break;
01276 }
01277 mon.DeActivate(sock);
01278
01279 PDB(kPacketizer,3) Info("ValidateFiles", "select returned: %p", sock);
01280
01281 TSlave *slave = (TSlave *) slaves_by_sock.GetValue( sock );
01282 if (!sock->IsValid()) {
01283
01284 Error("ValidateFiles", "worker-%s (%s) got invalid - STOP",
01285 slave->GetOrdinal(), slave->GetName());
01286 ((TProof*)gProof)->MarkBad(slave, "socket got invalid during validation");
01287 fValid = kFALSE;
01288 break;
01289 }
01290
01291 TMessage *reply;
01292
01293 if (sock->Recv(reply) <= 0) {
01294
01295 Error("ValidateFiles", "Recv failed! for worker-%s (%s)",
01296 slave->GetOrdinal(), slave->GetName());
01297
01298 ((TProof*)gProof)->MarkBad(slave, "receive failed during validation");
01299 fValid = kFALSE;
01300 continue;
01301 }
01302
01303 if (reply->What() != kPROOF_GETENTRIES) {
01304
01305 Int_t what = reply->What();
01306 ((TProof*)gProof)->HandleInputMessage(slave, reply);
01307 if (what == kPROOF_FATAL) {
01308 Error("ValidateFiles", "kPROOF_FATAL from worker-%s (%s)",
01309 slave->GetOrdinal(), slave->GetName());
01310 fValid = kFALSE;
01311 } else {
01312
01313 mon.Activate(sock);
01314 }
01315
01316 continue;
01317 }
01318
01319 TSlaveStat *slavestat = (TSlaveStat*) fSlaveStats->GetValue( slave );
01320 TDSetElement *e = slavestat->fCurFile->GetElement();
01321 slavestat->fCurFile->GetNode()->DecExtSlaveCnt(slavestat->GetName());
01322 Long64_t entries;
01323
01324 (*reply) >> entries;
01325
01326
01327 if ((reply->BufferSize() > reply->Length())) {
01328 TString objname;
01329 (*reply) >> objname;
01330 e->SetTitle(objname);
01331 }
01332
01333 e->SetTDSetOffset(entries);
01334 if (entries > 0) {
01335
01336
01337 e->SetValid();
01338
01339 if (!e->GetEntryList()) {
01340 if (e->GetFirst() > entries) {
01341 Error("ValidateFiles",
01342 "first (%lld) higher then number of entries (%lld) in %s",
01343 e->GetFirst(), entries, e->GetFileName());
01344
01345
01346 slavestat->fCurFile->SetDone();
01347 e->Invalidate();
01348 dset->SetBit(TDSet::kSomeInvalid);
01349 }
01350
01351 if (e->GetNum() == -1) {
01352 e->SetNum(entries - e->GetFirst());
01353 } else if (e->GetFirst() + e->GetNum() > entries) {
01354 Error("ValidateFiles",
01355 "num (%lld) + first (%lld) larger then number of keys/entries (%lld) in %s",
01356 e->GetNum(), e->GetFirst(), entries, e->GetFileName());
01357 e->SetNum(entries - e->GetFirst());
01358 }
01359 }
01360
01361
01362 totent += entries;
01363 nopenf++;
01364
01365
01366 n++;
01367 gProof->SendDataSetStatus(msg, n, tot, st);
01368
01369 } else {
01370
01371 Error("ValidateFiles", "cannot get entries for file: %s - skipping", e->GetFileName() );
01372
01373
01374
01375
01376 if (gProofServ) {
01377 TMessage m(kPROOF_MESSAGE);
01378 m << TString(Form("Cannot get entries for file: %s - skipping",
01379 e->GetFileName()));
01380 gProofServ->GetSocket()->Send(m);
01381 }
01382
01383
01384 e->Invalidate();
01385 dset->SetBit(TDSet::kSomeInvalid);
01386 }
01387 PDB(kPacketizer,3) Info("ValidateFiles", " %lld events validated", totent);
01388
01389
01390 if (maxent < 0 || ((totent < maxent) && !byfile))
01391 workers.Add(slave);
01392 }
01393
01394
01395
01396 ((TProof*)gProof)->ActivateAsyncInput();
01397
01398
01399 ((TProof*)gProof)->fCurrentMonitor = 0;
01400
01401
01402 if (!fValid)
01403 return;
01404
01405
01406 Long64_t offset = 0;
01407 Long64_t newOffset = 0;
01408 TIter next(dset->GetListOfElements());
01409 TDSetElement *el;
01410 while ( (el = dynamic_cast<TDSetElement*> (next())) ) {
01411 if (el->GetValid()) {
01412 newOffset = offset + el->GetTDSetOffset();
01413 el->SetTDSetOffset(offset);
01414 offset = newOffset;
01415 }
01416 }
01417 }
01418
01419
01420 Int_t TPacketizerAdaptive::CalculatePacketSize(TObject *slStatPtr, Long64_t cachesz, Int_t learnent)
01421 {
01422
01423
01424 Long64_t num;
01425 if (fStrategy == 0) {
01426
01427
01428 Int_t nslaves = fSlaveStats->GetSize();
01429 if (nslaves > 0) {
01430 num = fTotalEntries / (fPacketAsAFraction * nslaves);
01431 } else {
01432 num = 1;
01433 }
01434 } else {
01435
01436
01437
01438 TSlaveStat* slstat = (TSlaveStat*)slStatPtr;
01439 Float_t rate = slstat->GetCurRate();
01440 if (!rate)
01441 rate = slstat->GetAvgRate();
01442 if (rate) {
01443
01444
01445 Float_t avgProcRate = (GetEntriesProcessed()/(GetCumProcTime() / fSlaveStats->GetSize()));
01446 Float_t packetTime = ((fTotalEntries - GetEntriesProcessed())/avgProcRate)/fPacketAsAFraction;
01447
01448
01449 Float_t bevt = GetBytesRead() / GetEntriesProcessed();
01450
01451
01452
01453
01454
01455 Bool_t cpsync = fCachePacketSync;
01456 if (fMaxEntriesRatio > 0. && cpsync) {
01457 if (fFilesToProcess && fFilesToProcess->GetSize() <= fSlaveStats->GetSize()) {
01458 Long64_t remEntries = fTotalEntries - GetEntriesProcessed();
01459 Long64_t maxEntries = -1;
01460 if (fFilesToProcess->Last()) {
01461 TDSetElement *elem = (TDSetElement *) ((TPacketizerAdaptive::TFileStat *) fFilesToProcess->Last())->GetElement();
01462 if (elem) maxEntries = elem->GetNum();
01463 }
01464 if (maxEntries > remEntries / fSlaveStats->GetSize() * fMaxEntriesRatio) {
01465 PDB(kPacketizer,3) {
01466 Info("CalculatePacketSize", "%s: switching off synchronization of packet and cache sizes:", slstat->GetOrdinal());
01467 Info("CalculatePacketSize", "%s: few files (%d) remaining of very different sizes (max/avg = %.2f > %.2f)",
01468 slstat->GetOrdinal(), fFilesToProcess->GetSize(),
01469 (Double_t)maxEntries / remEntries * fSlaveStats->GetSize(), fMaxEntriesRatio);
01470 }
01471 cpsync = kFALSE;
01472 }
01473 }
01474 }
01475 if (cachesz > 0 && cpsync) {
01476 if ((Long64_t)(rate * packetTime * bevt) < cachesz)
01477 packetTime = cachesz / bevt / rate;
01478 }
01479
01480
01481 if (fMaxPacketTime > 0. && packetTime > fMaxPacketTime) packetTime = fMaxPacketTime;
01482 if (fMinPacketTime > 0. && packetTime < fMinPacketTime) packetTime = fMinPacketTime;
01483
01484
01485 num = (Long64_t)(rate * packetTime);
01486
01487
01488 PDB(kPacketizer,2)
01489 Info("CalculatePacketSize","%s: avgr: %f, rate: %f, left: %lld, pacT: %f, sz: %f (csz: %f), num: %lld",
01490 slstat->GetOrdinal(), avgProcRate, rate, fTotalEntries - GetEntriesProcessed(),
01491 packetTime, num*bevt/1048576., cachesz/1048576., num);
01492
01493 } else {
01494
01495
01496 num = (learnent > 0) ? 5 * learnent : 1000;
01497
01498
01499 PDB(kPacketizer,2)
01500 Info("CalculatePacketSize","%s: num: %lld", slstat->GetOrdinal(), num);
01501 }
01502 }
01503 if (num < 1) num = 1;
01504 return num;
01505 }
01506
01507
01508 Int_t TPacketizerAdaptive::AddProcessed(TSlave *sl,
01509 TProofProgressStatus *status,
01510 Double_t latency,
01511 TList **listOfMissingFiles)
01512 {
01513
01514
01515
01516
01517
01518 TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue( sl );
01519 if (!slstat) {
01520 Error("AddProcessed", "TSlaveStat instance for worker %s not found!",
01521 (sl ? sl->GetName() : "**undef**"));
01522 return -1;
01523 }
01524
01525
01526
01527 if ( slstat->fCurElem != 0 ) {
01528 Long64_t expectedNumEv = slstat->fCurElem->GetNum();
01529
01530 Long64_t numev;
01531 if (status && status->GetEntries() > 0)
01532 numev = status->GetEntries() - slstat->GetEntriesProcessed();
01533 else
01534 numev = 0;
01535
01536
01537 TProofProgressStatus *progress = 0;
01538 if (numev > 0) {
01539
01540 progress = slstat->AddProcessed(status);
01541 if (progress) {
01542 (*fProgressStatus) += *progress;
01543
01544 slstat->UpdateRates(status);
01545 }
01546 } else {
01547 progress = new TProofProgressStatus();
01548 }
01549 if (progress) {
01550 PDB(kPacketizer,2)
01551 Info("GetNextPacket","worker-%s (%s): %lld %7.3lf %7.3lf %7.3lf %lld",
01552 sl->GetOrdinal(), sl->GetName(), progress->GetEntries(), latency,
01553 progress->GetProcTime(), progress->GetCPUTime(), progress->GetBytesRead());
01554
01555 if (gPerfStats)
01556 gPerfStats->PacketEvent(sl->GetOrdinal(), sl->GetName(),
01557 slstat->fCurElem->GetFileName(),
01558 progress->GetEntries(),
01559 latency,
01560 progress->GetProcTime(),
01561 progress->GetCPUTime(),
01562 progress->GetBytesRead());
01563 delete progress;
01564 }
01565 if (numev != expectedNumEv) {
01566
01567
01568
01569
01570 TDSetElement *newPacket = new TDSetElement(*(slstat->fCurElem));
01571 if (newPacket && numev < expectedNumEv) {
01572 Long64_t first = newPacket->GetFirst();
01573 newPacket->SetFirst(first + numev);
01574 if (listOfMissingFiles && *listOfMissingFiles)
01575 ReassignPacket(newPacket, listOfMissingFiles);
01576 else
01577 Error("AddProcessed", "No list for missing files!");
01578 } else
01579 Error("AddProcessed", "Processed too much? (%lld, %lld)", numev, expectedNumEv);
01580
01581
01582
01583
01584
01585
01586
01587
01588
01589 }
01590
01591 slstat->fCurElem = 0;
01592 return (expectedNumEv - numev);
01593 } else {
01594
01595
01596 return -1;
01597 }
01598 }
01599
01600
01601 TDSetElement *TPacketizerAdaptive::GetNextPacket(TSlave *sl, TMessage *r)
01602 {
01603 // Get the next packet for worker 'sl'.
01604 // If the worker had a packet (fCurElem != 0), first read from 'r' its report
01605 // on it (latency, progress status and, depending on the protocol, cache size,
01606 // learning entries and total entries) and account it via AddProcessed();
01607 // a negative total-entries value invalidates the worker's current file instead.
01608 // Then pick a file (local node first, per the fStrategy heuristics, then any
01609 // unallocated or active file unless fForceLocal) and carve a packet of about
01610 // CalculatePacketSize() entries from it. Returns 0 if invalid, stopped, or no work is left.
01611 if ( !fValid ) {
01612 return 0;
01613 }
01614
01615
01616 // Per-worker bookkeeping
01617 TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue( sl );
01618 if (!slstat) {
01619 Error("GetNextPacket", "TSlaveStat instance for worker %s not found!",
01620 (sl ? sl->GetName() : "**undef**"));
01621 return 0;
01622 }
01623
01624 // File the worker was reading from (may be 0)
01625 TFileStat *file = slstat->fCurFile;
01626
01627
01628 // Defaults, used when the worker does not report these values
01629 Bool_t firstPacket = kFALSE;
01630 Long64_t cachesz = -1;
01631 Int_t learnent = -1;
01632 Long64_t totalEntries = 0;
01633 if ( slstat->fCurElem != 0 ) {
01634 // Read the worker's report on its previous packet
01635 Double_t latency, proctime, proccpu;
01636 TProofProgressStatus *status = 0;
01637
01638 if (sl->GetProtocol() > 18) {
01639 // Protocol > 18: latency and a full progress-status object
01640 (*r) >> latency;
01641 (*r) >> status;
01642 // Protocol > 25: also cache size and learning entries, optionally total entries
01643 if (sl->GetProtocol() > 25) {
01644 (*r) >> cachesz >> learnent;
01645 if (r->BufferSize() > r->Length()) (*r) >> totalEntries;
01646 }
01647
01648 } else {
01649 // Older protocols: individual numbers; a status object is built from them below
01650 Long64_t bytesRead = -1;
01651
01652 (*r) >> latency >> proctime >> proccpu;
01653
01654 if (r->BufferSize() > r->Length()) (*r) >> bytesRead;
01655 if (r->BufferSize() > r->Length()) (*r) >> totalEntries;
01656 Long64_t totev = 0;
01657 if (r->BufferSize() > r->Length()) (*r) >> totev;
01658
01659 status = new TProofProgressStatus(totev, bytesRead, -1, proctime, proccpu);
01660 }
01661 // Non-negative total: account the work done; negative: the file could not be opened
01662 if (totalEntries >= 0) {
01663 if (AddProcessed(sl, status, latency))
01664 Error("GetNextPacket", "the worker processed a different # of entries");
01665 if (fProgressStatus->GetEntries() >= fTotalEntries) {
01666 if (fProgressStatus->GetEntries() > fTotalEntries)
01667 Error("GetNextPacket", "Processed too many entries! (%lld, %lld)", fProgressStatus->GetEntries(), fTotalEntries);
01668 // All entries accounted for: call HandleTimer(0) once more and drop fProgress
01669 HandleTimer(0);
01670 SafeDelete(fProgress);
01671 }
01672 } else if (file && file->GetElement()) {
01673 Info("GetNextPacket", "file '%s' could not be open: invalidating related element",
01674 file->GetElement()->GetName());
01675 file->GetElement()->Invalidate();
01676 file->SetDone();
01677 // Remember the element among the failed packets
01678 if (!fFailedPackets) fFailedPackets = new TList();
01679 fFailedPackets->Add(file->GetElement());
01680 }
01681 } else {
01682 firstPacket = kTRUE;
01683 }
01684
01685 if ( fStop ) {
01686 HandleTimer(0);
01687 return 0;
01688 }
01689 // nodeName: node of the current file (if any); nodeHostName: this worker's name, passed to GetNextUnAlloc below
01690 TString nodeName;
01691 if (file != 0) nodeName = file->GetNode()->GetName();
01692 TString nodeHostName(slstat->GetName());
01693
01694 PDB(kPacketizer,3)
01695 Info("GetNextPacket", "%s: looking for a packet from node '%s'", sl->GetOrdinal(), nodeName.Data());
01696
01697 // Current file fully assigned: release the worker's slot on its node
01698 if ( file != 0 && file->IsDone() ) {
01699 file->GetNode()->DecExtSlaveCnt(slstat->GetName());
01700 file->GetNode()->DecRunSlaveCnt();
01701 if (gPerfStats)
01702 gPerfStats->FileEvent(sl->GetOrdinal(), sl->GetName(), file->GetNode()->GetName(),
01703 file->GetElement()->GetFileName(), kFALSE);
01704 file = 0;
01705 }
01706
01707 slstat->fCurFile = file;
01708 // Average number of entries still to be processed per worker
01709 Long64_t avgEventsLeftPerSlave =
01710 (fTotalEntries - fProgressStatus->GetEntries()) / fSlaveStats->GetSize();
01711 if (fTotalEntries == fProgressStatus->GetEntries())
01712 return 0;
01713 // No current file: choose a new one
01714 if ( file == 0) {
01715
01716 Bool_t openLocal;
01717 // Preference for local files, lowered by the entries left on nodes without local workers (fNEventsOnRemLoc)
01718 Float_t localPreference = fBaseLocalPreference - (fNEventsOnRemLoc /
01719 (0.4 *(fTotalEntries - fProgressStatus->GetEntries())));
01720 if ( slstat->GetFileNode() != 0 ) {
01721 // The worker has a local file node: decide between local and non-local files
01722 fUnAllocated->Sort();
01723 TFileNode* firstNonLocalNode = (TFileNode*)fUnAllocated->First();
01724 Bool_t nonLocalNodePossible;
01725 if (fForceLocal)
01726 nonLocalNodePossible = 0;
01727 else
01728 nonLocalNodePossible = firstNonLocalNode ?
01729 (fMaxSlaveCnt < 0 || (fMaxSlaveCnt > 0 && firstNonLocalNode->GetExtSlaveCnt() < fMaxSlaveCnt))
01730 : 0;
01731 openLocal = !nonLocalNodePossible;
01732 Float_t slaveRate = slstat->GetAvgRate();
01733 if ( nonLocalNodePossible && fStrategy == 1) {
01734 // Adaptive strategy: heuristics deciding whether to stay on the local node
01735 if ( slstat->GetFileNode()->GetRunSlaveCnt() >
01736 slstat->GetFileNode()->GetMySlaveCnt() - 1 )
01737 // i.e. RunSlaveCnt >= MySlaveCnt on the local node
01738
01739 openLocal = kTRUE;
01740 else if ( slaveRate == 0 ) {
01741 // No rate yet for this worker: compare entry counts, weighted by localPreference
01742
01743 if ( slstat->GetLocalEventsLeft() * localPreference
01744 > (avgEventsLeftPerSlave))
01745 openLocal = kTRUE;
01746 else if ( (firstNonLocalNode->GetEventsLeftPerSlave())
01747 < slstat->GetLocalEventsLeft() * localPreference )
01748 openLocal = kTRUE;
01749 else if ( firstNonLocalNode->GetExtSlaveCnt() > 1 )
01750 openLocal = kTRUE;
01751 else if ( firstNonLocalNode->GetRunSlaveCnt() == 0 )
01752 openLocal = kTRUE;
01753 } else {
01754 // Rate known: compare the time to finish local entries with the average time left
01755 Float_t slaveTime = slstat->GetLocalEventsLeft()/slaveRate;
01756
01757 Float_t avgTime = avgEventsLeftPerSlave
01758 /(fProgressStatus->GetEntries()/GetCumProcTime());
01759 if (slaveTime * localPreference > avgTime)
01760 openLocal = kTRUE;
01761 else if ((firstNonLocalNode->GetEventsLeftPerSlave())
01762 < slstat->GetLocalEventsLeft() * localPreference)
01763 openLocal = kTRUE;
01764 }
01765 }
01766 if (openLocal || fStrategy == 0) {
01767 // Next unallocated file of the local node, else one still active there
01768 file = slstat->GetFileNode()->GetNextUnAlloc();
01769 if (!file)
01770 file = slstat->GetFileNode()->GetNextActive();
01771 if ( file == 0 ) {
01772 // Nothing left on the local node
01773 slstat->SetFileNode(0);
01774 }
01775 }
01776 }
01777
01778 // Otherwise any unallocated file, unless only local files are allowed
01779 if(file == 0 && !fForceLocal) {
01780 file = GetNextUnAlloc(0, nodeHostName);
01781 }
01782
01783 // ... or any file that is still active
01784 if(file == 0 && !fForceLocal) {
01785 file = GetNextActive();
01786 }
01787
01788 if ( file == 0 ) return 0;
01789
01790 PDB(kPacketizer,3) if (fFilesToProcess) fFilesToProcess->Print();
01791
01792 slstat->fCurFile = file;
01793 // File on a node without local workers, not yet started: its entries no longer count in fNEventsOnRemLoc
01794 if (file->GetNode()->GetMySlaveCnt() == 0 &&
01795 file->GetElement()->GetFirst() == file->GetNextEntry()) {
01796 fNEventsOnRemLoc -= file->GetElement()->GetNum();
01797 if (fNEventsOnRemLoc < 0) {
01798 Error("GetNextPacket",
01799 "inconsistent value for fNEventsOnRemLoc (%lld): stop delivering packets!",
01800 fNEventsOnRemLoc);
01801 return 0;
01802 }
01803 }
01804 file->GetNode()->IncExtSlaveCnt(slstat->GetName());
01805 file->GetNode()->IncRunSlaveCnt();
01806 if (gPerfStats)
01807 gPerfStats->FileEvent(sl->GetOrdinal(), sl->GetName(),
01808 file->GetNode()->GetName(),
01809 file->GetElement()->GetFileName(), kTRUE);
01810 }
01811 // Size of the new packet
01812 Long64_t num = CalculatePacketSize(slstat, cachesz, learnent);
01813
01814 // The packet starts at the file's next entry; the element covers
01815 // entries [GetFirst(), GetFirst() + GetNum())
01816 TDSetElement *base = file->GetElement();
01817 Long64_t first = file->GetNextEntry();
01818 Long64_t last = base->GetFirst() + base->GetNum();
01819
01820 // If what would remain after this packet is at most half a packet,
01821 // assign the whole remainder now and mark the file as done
01822
01823 if ( first + num * 1.5 >= last ) {
01824 num = last - first;
01825 file->SetDone();
01826
01827 // Fully assigned: remove it from the active files
01828 RemoveActive(file);
01829
01830 }
01831
01832 // Advance the file cursor past this packet
01833 file->MoveNextEntry(num);
01834
01835 slstat->fCurElem = CreateNewPacket(base, first, num);
01836 if (base->GetEntryList())
01837 slstat->fCurElem->SetEntryList(base->GetEntryList(), first, num);
01838
01839 // Flag the first packet given to this worker with kNewRun
01840 if (firstPacket)
01841 slstat->fCurElem->SetBit(TDSetElement::kNewRun);
01842 else
01843 slstat->fCurElem->ResetBit(TDSetElement::kNewRun);
01844
01845 PDB(kPacketizer,2)
01846 Info("GetNextPacket","%s: %s %lld %lld (%lld)", sl->GetOrdinal(), base->GetFileName(), first, first + num - 1, num);
01847
01848 return slstat->fCurElem;
01849 }
01850
01851
01852 Int_t TPacketizerAdaptive::GetActiveWorkers()
01853 {
01854
01855
01856 Int_t actw = 0;
01857 TIter nxw(fSlaveStats);
01858 TObject *key;
01859 while ((key = nxw())) {
01860 TSlaveStat *wrkstat = (TSlaveStat *) fSlaveStats->GetValue(key);
01861 if (wrkstat && wrkstat->fCurFile) actw++;
01862 }
01863
01864 return actw;
01865 }
01866
01867
01868 Float_t TPacketizerAdaptive::GetCurrentRate(Bool_t &all)
01869 {
01870
01871
01872
01873 all = kTRUE;
01874
01875 Float_t currate = 0.;
01876 if (fSlaveStats && fSlaveStats->GetSize() > 0) {
01877 TIter nxw(fSlaveStats);
01878 TObject *key;
01879 while ((key = nxw()) != 0) {
01880 TSlaveStat *slstat = (TSlaveStat *) fSlaveStats->GetValue(key);
01881 if (slstat && slstat->GetProgressStatus() && slstat->GetEntriesProcessed() > 0) {
01882
01883 currate += slstat->GetProgressStatus()->GetCurrentRate();
01884 } else {
01885 all = kFALSE;
01886 }
01887 }
01888 }
01889
01890 return currate;
01891 }
01892
01893
01894 Int_t TPacketizerAdaptive::GetEstEntriesProcessed(Float_t t, Long64_t &ent,
01895 Long64_t &bytes, Long64_t &calls)
01896 {
01897
01898
01899
01900
01901
01902
01903
01904 ent = GetEntriesProcessed();
01905 bytes = GetBytesRead();
01906 calls = GetReadCalls();
01907
01908
01909 if (fUseEstOpt == kEstOff)
01910
01911 return 0;
01912 Bool_t current = (fUseEstOpt == kEstCurrent) ? kTRUE : kFALSE;
01913
01914 TTime tnow = gSystem->Now();
01915 Double_t now = (t > 0) ? (Double_t)t : Long64_t(tnow) / (Double_t)1000.;
01916 Double_t dt = -1;
01917
01918
01919 Bool_t all = kTRUE;
01920 Float_t trate = 0.;
01921 if (fSlaveStats && fSlaveStats->GetSize() > 0) {
01922 ent = 0;
01923 TIter nxw(fSlaveStats);
01924 TObject *key;
01925 while ((key = nxw()) != 0) {
01926 TSlaveStat *slstat = (TSlaveStat *) fSlaveStats->GetValue(key);
01927 if (slstat) {
01928
01929 Long64_t e = slstat->GetEntriesProcessed();
01930 if (e <= 0) all = kFALSE;
01931
01932 dt = now - slstat->GetProgressStatus()->GetLastUpdate();
01933
01934 Float_t rate = (current && slstat->GetCurRate() > 0) ? slstat->GetCurRate()
01935 : slstat->GetAvgRate();
01936 trate += rate;
01937
01938 e += (Long64_t) (dt * rate);
01939
01940 ent += e;
01941
01942 PDB(kPacketizer,3)
01943 Info("GetEstEntriesProcessed","%s: e:%lld rate:%f dt:%f e:%lld",
01944 slstat->fSlave->GetOrdinal(),
01945 slstat->GetEntriesProcessed(), rate, dt, e);
01946 }
01947 }
01948 }
01949
01950 dt = now - fProgressStatus->GetLastUpdate();
01951 PDB(kPacketizer,2)
01952 Info("GetEstEntriesProcessed",
01953 "dt: %f, estimated entries: %lld (%lld), bytes read: %lld rate: %f (all: %d)",
01954 dt, ent, GetEntriesProcessed(), bytes, trate, all);
01955
01956
01957 ent = (ent > 0) ? ent : fProgressStatus->GetEntries();
01958 ent = (ent <= fTotalEntries) ? ent : fTotalEntries;
01959 bytes = (bytes > 0) ? bytes : fProgressStatus->GetBytesRead();
01960
01961
01962 return ((all) ? 0 : 1);
01963 }
01964
01965
01966 void TPacketizerAdaptive::MarkBad(TSlave *s, TProofProgressStatus *status,
01967 TList **listOfMissingFiles)
01968 {
01969
01970
01971
01972
01973
01974
01975
01976
01977
01978 TSlaveStat *slaveStat = (TSlaveStat *)(fSlaveStats->GetValue(s));
01979 if (!slaveStat) {
01980 Error("MarkBad", "Worker does not exist");
01981 return;
01982 }
01983
01984 if (slaveStat->fCurFile && slaveStat->fCurFile->GetNode()) {
01985 slaveStat->fCurFile->GetNode()->DecExtSlaveCnt(slaveStat->GetName());
01986 slaveStat->fCurFile->GetNode()->DecRunSlaveCnt();
01987 }
01988
01989
01990
01991 if (!status) {
01992
01993 TList *subSet = slaveStat->GetProcessedSubSet();
01994 if (subSet) {
01995
01996 if (slaveStat->fCurElem) {
01997 subSet->Add(slaveStat->fCurElem);
01998 }
01999
02000 Int_t nmg = 0, ntries = 100;
02001 TDSetElement *e = 0, *enxt = 0;
02002 do {
02003 nmg = 0;
02004 e = (TDSetElement *) subSet->First();
02005 while ((enxt = (TDSetElement *) subSet->After(e))) {
02006 if (e->MergeElement(enxt) >= 0) {
02007 nmg++;
02008 subSet->Remove(enxt);
02009 delete enxt;
02010 } else {
02011 e = enxt;
02012 }
02013 }
02014 } while (nmg > 0 && --ntries > 0);
02015
02016 SplitPerHost(subSet, listOfMissingFiles);
02017
02018 subSet->SetOwner(0);
02019 } else {
02020 Warning("MarkBad", "subset processed by bad worker not found!");
02021 }
02022 (*fProgressStatus) -= *(slaveStat->GetProgressStatus());
02023 }
02024
02025 fSlaveStats->Remove(s);
02026 delete slaveStat;
02027
02028 InitStats();
02029 }
02030
02031
02032 Int_t TPacketizerAdaptive::ReassignPacket(TDSetElement *e,
02033 TList **listOfMissingFiles)
02034 {
02035
02036
02037
02038 if (!e) {
02039 Error("ReassignPacket", "Empty packet!");
02040 return -1;
02041 }
02042
02043 TUrl url = e->GetFileName();
02044
02045
02046 TString host;
02047 if ( !url.IsValid() ||
02048 (strncmp(url.GetProtocol(),"root", 4) &&
02049 strncmp(url.GetProtocol(),"rfio", 4))) {
02050 host = "no-host";
02051 } else {
02052 host = url.GetHost();
02053 }
02054
02055
02056
02057 TFileNode *node = (TFileNode*) fFileNodes->FindObject( host );
02058 if (node) {
02059
02060 node->DecreaseProcessed(e->GetNum());
02061 node->Add(e, kFALSE);
02062 if (!fUnAllocated->FindObject(node))
02063 fUnAllocated->Add(node);
02064 return 0;
02065 } else {
02066
02067 TFileInfo *fi = e->GetFileInfo();
02068 if (listOfMissingFiles)
02069 (*listOfMissingFiles)->Add((TObject *)fi);
02070 return -1;
02071 }
02072 }
02073
02074
02075 void TPacketizerAdaptive::SplitPerHost(TList *elements,
02076 TList **listOfMissingFiles)
02077 {
02078
02079
02080
02081
02082 if (!elements) {
02083 Error("SplitPerHost", "Empty list of packets!");
02084 return;
02085 }
02086 if (elements->GetSize() <= 0) {
02087 Error("SplitPerHost", "The input list contains no elements");
02088 return;
02089 }
02090 TIter subSetIter(elements);
02091 TDSetElement *e;
02092 while ((e = (TDSetElement*) subSetIter.Next())) {
02093 if (ReassignPacket(e, listOfMissingFiles) == -1) {
02094
02095 if (elements->Remove(e))
02096 Error("SplitPerHost", "Error removing a missing file");
02097 delete e;
02098 }
02099
02100 }
02101 }