00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027 #include "TPacketizer.h"
00028
00029 #include "Riostream.h"
00030 #include "TDSet.h"
00031 #include "TEnv.h"
00032 #include "TError.h"
00033 #include "TEventList.h"
00034 #include "TEntryList.h"
00035 #include "TMap.h"
00036 #include "TMessage.h"
00037 #include "TMonitor.h"
00038 #include "TNtupleD.h"
00039 #include "TObject.h"
00040 #include "TParameter.h"
00041 #include "TPerfStats.h"
00042 #include "TProofDebug.h"
00043 #include "TProof.h"
00044 #include "TProofPlayer.h"
00045 #include "TProofServ.h"
00046 #include "TSlave.h"
00047 #include "TSocket.h"
00048 #include "TTimer.h"
00049 #include "TUrl.h"
00050 #include "TClass.h"
00051 #include "TMath.h"
00052 #include "TObjString.h"
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068 class TPacketizer::TFileStat : public TObject {
00069
00070 private:
00071 Bool_t fIsDone;
00072 TFileNode *fNode;
00073 TDSetElement *fElement;
00074 Long64_t fNextEntry;
00075
00076 public:
00077 TFileStat(TFileNode *node, TDSetElement *elem);
00078
00079 Bool_t IsDone() const {return fIsDone;}
00080 void SetDone() {fIsDone = kTRUE;}
00081 TFileNode *GetNode() const {return fNode;}
00082 TDSetElement *GetElement() const {return fElement;}
00083 Long64_t GetNextEntry() const {return fNextEntry;}
00084 void MoveNextEntry(Long64_t step) {fNextEntry += step;}
00085 };
00086
00087
00088 TPacketizer::TFileStat::TFileStat(TFileNode *node, TDSetElement *elem)
00089 : fIsDone(kFALSE), fNode(node), fElement(elem), fNextEntry(elem->GetFirst())
00090 {
00091 }
00092
00093
00094
00095
00096 class TPacketizer::TFileNode : public TObject {
00097
00098 private:
00099 TString fNodeName;
00100 TList *fFiles;
00101 TObject *fUnAllocFileNext;
00102 TList *fActFiles;
00103 TObject *fActFileNext;
00104 Int_t fMySlaveCnt;
00105 Int_t fSlaveCnt;
00106
00107 public:
00108 TFileNode(const char *name);
00109 ~TFileNode() { delete fFiles; delete fActFiles; }
00110
00111 void IncMySlaveCnt() { fMySlaveCnt++; }
00112 void IncSlaveCnt(const char *slave) { if (fNodeName != slave) fSlaveCnt++; }
00113 void DecSlaveCnt(const char *slave) { if (fNodeName != slave) fSlaveCnt--; R__ASSERT(fSlaveCnt >= 0); }
00114 Int_t GetSlaveCnt() const {return fMySlaveCnt + fSlaveCnt;}
00115 Int_t GetNumberOfActiveFiles() const { return fActFiles->GetSize(); }
00116 Bool_t IsSortable() const { return kTRUE; }
00117
00118 const char *GetName() const { return fNodeName.Data(); }
00119
00120 void Add(TDSetElement *elem)
00121 {
00122 TFileStat *f = new TFileStat(this,elem);
00123 fFiles->Add(f);
00124 if (fUnAllocFileNext == 0) fUnAllocFileNext = fFiles->First();
00125 }
00126
00127 TFileStat *GetNextUnAlloc()
00128 {
00129 TObject *next = fUnAllocFileNext;
00130
00131 if (next != 0) {
00132
00133 fActFiles->Add(next);
00134 if (fActFileNext == 0) fActFileNext = fActFiles->First();
00135
00136
00137 fUnAllocFileNext = fFiles->After(fUnAllocFileNext);
00138 }
00139
00140 return (TFileStat *) next;
00141 }
00142
00143 TFileStat *GetNextActive()
00144 {
00145 TObject *next = fActFileNext;
00146
00147 if (fActFileNext != 0) {
00148 fActFileNext = fActFiles->After(fActFileNext);
00149 if (fActFileNext == 0) fActFileNext = fActFiles->First();
00150 }
00151
00152 return (TFileStat *) next;
00153 }
00154
00155 void RemoveActive(TFileStat *file)
00156 {
00157 if (fActFileNext == file) fActFileNext = fActFiles->After(file);
00158 fActFiles->Remove(file);
00159 if (fActFileNext == 0) fActFileNext = fActFiles->First();
00160 }
00161
00162 Int_t Compare(const TObject *other) const
00163 {
00164
00165
00166 const TFileNode *obj = dynamic_cast<const TFileNode*>(other);
00167 if (!obj) {
00168 Error("Compare", "input is not a TPacketizer::TFileNode object");
00169 return 0;
00170 }
00171
00172 Int_t myVal = GetSlaveCnt();
00173 Int_t otherVal = obj->GetSlaveCnt();
00174 if (myVal < otherVal) {
00175 return -1;
00176 } else if (myVal > otherVal) {
00177 return 1;
00178 } else {
00179 return 0;
00180 }
00181 }
00182
00183 void Print(Option_t *) const
00184 {
00185 cout << "OBJ: " << IsA()->GetName() << "\t" << fNodeName
00186 << "\tMySlaveCount " << fMySlaveCnt
00187 << "\tSlaveCount " << fSlaveCnt << endl;
00188 }
00189
00190 void Reset()
00191 {
00192 fUnAllocFileNext = fFiles->First();
00193 fActFiles->Clear();
00194 fActFileNext = 0;
00195 fSlaveCnt = 0;
00196 fMySlaveCnt = 0;
00197 }
00198 };
00199
00200
00201 TPacketizer::TFileNode::TFileNode(const char *name)
00202 : fNodeName(name), fFiles(new TList), fUnAllocFileNext(0),fActFiles(new TList),
00203 fActFileNext(0), fMySlaveCnt(0), fSlaveCnt(0)
00204 {
00205
00206
00207 fFiles->SetOwner();
00208 fActFiles->SetOwner(kFALSE);
00209 }
00210
00211
00212
00213
00214 class TPacketizer::TSlaveStat : public TVirtualPacketizer::TVirtualSlaveStat {
00215
00216 friend class TPacketizer;
00217
00218 private:
00219 TFileNode *fFileNode;
00220 TFileStat *fCurFile;
00221 TDSetElement *fCurElem;
00222 TProofProgressStatus *AddProcessed(TProofProgressStatus *st);
00223 public:
00224 TSlaveStat(TSlave *slave);
00225 ~TSlaveStat();
00226
00227 TFileNode *GetFileNode() const { return fFileNode; }
00228
00229 void SetFileNode(TFileNode *node) { fFileNode = node; }
00230 };
00231
00232
00233 TPacketizer::TSlaveStat::TSlaveStat(TSlave *slave)
00234 : fFileNode(0), fCurFile(0), fCurElem(0)
00235 {
00236 fSlave = slave;
00237 fStatus = new TProofProgressStatus();
00238 }
00239
00240
00241 TPacketizer::TSlaveStat::~TSlaveStat()
00242 {
00243
00244
00245 SafeDelete(fStatus);
00246 }
00247
00248 TProofProgressStatus *TPacketizer::TSlaveStat::AddProcessed(TProofProgressStatus *st)
00249 {
00250
00251
00252
00253 if (st) {
00254
00255 Long64_t lastEntries = st->GetEntries() - fStatus->GetEntries();
00256
00257 fStatus->SetLastProcTime(0.);
00258
00259 TProofProgressStatus *diff = new TProofProgressStatus(*st - *fStatus);
00260 *fStatus += *diff;
00261
00262 fStatus->SetLastEntries(lastEntries);
00263 return diff;
00264 } else {
00265 Error("AddProcessed", "status arg undefined");
00266 return 0;
00267 }
00268 }
00269
00270
00271
00272 ClassImp(TPacketizer)
00273
00274
00275 TPacketizer::TPacketizer(TDSet *dset, TList *slaves, Long64_t first,
00276 Long64_t num, TList *input, TProofProgressStatus *st)
00277 : TVirtualPacketizer(input, st)
00278 {
00279
00280
00281 PDB(kPacketizer,1) Info("TPacketizer", "Enter (first %lld, num %lld)", first, num);
00282
00283
00284 fSlaveStats = 0;
00285 fPackets = 0;
00286 fSlaveStats = 0;
00287 fUnAllocated = 0;
00288 fActive = 0;
00289 fFileNodes = 0;
00290 fMaxPerfIdx = 1;
00291 fMaxSlaveCnt = 0;
00292
00293 if (!fProgressStatus) {
00294 Error("TPacketizerAdaptive", "No progress status");
00295 return;
00296 }
00297
00298 Long_t maxSlaveCnt = 0;
00299 if (TProof::GetParameter(input, "PROOF_MaxSlavesPerNode", maxSlaveCnt) == 0) {
00300 if (maxSlaveCnt < 1) {
00301 Warning("TPacketizer", "PROOF_MaxSlavesPerNode must be grater than 0");
00302 maxSlaveCnt = 0;
00303 }
00304 } else {
00305
00306 Int_t mxslcnt = -1;
00307 if (TProof::GetParameter(input, "PROOF_MaxSlavesPerNode", mxslcnt) == 0) {
00308 if (mxslcnt < 1) {
00309 Warning("TPacketizer", "PROOF_MaxSlavesPerNode must be grater than 0");
00310 mxslcnt = 0;
00311 }
00312 maxSlaveCnt = (Long_t) mxslcnt;
00313 }
00314 }
00315 if (!maxSlaveCnt)
00316 maxSlaveCnt = gEnv->GetValue("Packetizer.MaxWorkersPerNode", 4);
00317 if (maxSlaveCnt > 0) {
00318 fMaxSlaveCnt = maxSlaveCnt;
00319 PDB(kPacketizer,1)
00320 Info("TPacketizer", "setting max number of workers per node to %ld", fMaxSlaveCnt);
00321 }
00322
00323 fPackets = new TList;
00324 fPackets->SetOwner();
00325
00326 fFileNodes = new TList;
00327 fFileNodes->SetOwner();
00328 fUnAllocated = new TList;
00329 fUnAllocated->SetOwner(kFALSE);
00330 fActive = new TList;
00331 fActive->SetOwner(kFALSE);
00332
00333
00334 fValid = kTRUE;
00335
00336
00337
00338
00339
00340 dset->Reset();
00341 TDSetElement *e;
00342 while ((e = (TDSetElement*)dset->Next())) {
00343 if (e->GetValid()) continue;
00344
00345 TUrl url = e->GetFileName();
00346
00347
00348 TString host;
00349 if ( !url.IsValid() ||
00350 (strncmp(url.GetProtocol(),"root", 4) &&
00351 strncmp(url.GetProtocol(),"rfio", 4)) ) {
00352 host = "no-host";
00353 } else {
00354 host = url.GetHost();
00355 }
00356
00357 TFileNode *node = (TFileNode*) fFileNodes->FindObject( host );
00358
00359 if (node == 0) {
00360 node = new TFileNode(host);
00361 fFileNodes->Add(node);
00362 }
00363
00364 node->Add( e );
00365 }
00366
00367 fSlaveStats = new TMap;
00368 fSlaveStats->SetOwner(kFALSE);
00369
00370 TSlave *slave;
00371 TIter si(slaves);
00372 while ((slave = (TSlave*) si.Next())) {
00373 fSlaveStats->Add( slave, new TSlaveStat(slave) );
00374 fMaxPerfIdx = slave->GetPerfIdx() > fMaxPerfIdx ?
00375 slave->GetPerfIdx() : fMaxPerfIdx;
00376 }
00377
00378
00379 Reset();
00380
00381 Int_t validateMode = 0;
00382 Int_t gprc = TProof::GetParameter(input, "PROOF_ValidateByFile", validateMode);
00383 Bool_t byfile = (gprc == 0 && validateMode > 0 && num > -1) ? kTRUE : kFALSE;
00384 ValidateFiles(dset, slaves, num, byfile);
00385
00386 if (!fValid) return;
00387
00388
00389
00390
00391 Int_t files = 0;
00392 fTotalEntries = 0;
00393 fUnAllocated->Clear();
00394 fActive->Clear();
00395 fFileNodes->Clear();
00396 PDB(kPacketizer,2) Info("TPacketizer", "processing range: first %lld, num %lld", first, num);
00397
00398 dset->Reset();
00399 Long64_t cur = 0;
00400 while (( e = (TDSetElement*)dset->Next())) {
00401
00402
00403
00404 if (!e->GetValid()) continue;
00405
00406
00407 if (fDataSet.IsNull() && e->GetDataSet() && strlen(e->GetDataSet()))
00408 fDataSet = e->GetDataSet();
00409
00410 TUrl url = e->GetFileName();
00411 Long64_t eFirst = e->GetFirst();
00412 Long64_t eNum = e->GetNum();
00413 PDB(kPacketizer,2)
00414 Info("TPacketizer", "processing element: first %lld, num %lld (cur %lld)", eFirst, eNum, cur);
00415
00416 if (!e->GetEntryList()){
00417
00418 if (cur + eNum < first) {
00419 cur += eNum;
00420 PDB(kPacketizer,2)
00421 Info("TPacketizer", "processing element: skip element cur %lld", cur);
00422 continue;
00423 }
00424
00425
00426 if (num != -1 && (first+num <= cur)) {
00427 cur += eNum;
00428 PDB(kPacketizer,2)
00429 Info("TPacketizer", "processing element: drop element cur %lld", cur);
00430 continue;
00431 }
00432
00433
00434
00435 if (num != -1 && (first+num <= cur+eNum)) {
00436 e->SetNum(first + num - cur);
00437 PDB(kPacketizer,2)
00438 Info("TPacketizer", "processing element: adjust end %lld", first + num - cur);
00439 cur += eNum;
00440 eNum = e->GetNum();
00441 }
00442
00443
00444
00445 if (cur < first) {
00446 e->SetFirst(eFirst + (first - cur));
00447 e->SetNum(e->GetNum() - (first - cur));
00448 PDB(kPacketizer,2)
00449 Info("TPacketizer", "processing element: adjust start %lld and end %lld",
00450 eFirst + (first - cur), first + num - cur);
00451 cur += eNum;
00452 eNum = e->GetNum();
00453 }
00454
00455 } else {
00456 TEntryList *enl = dynamic_cast<TEntryList *>(e->GetEntryList());
00457 if (enl) {
00458 eNum = enl->GetN();
00459 } else {
00460 TEventList *evl = dynamic_cast<TEventList *>(e->GetEntryList());
00461 eNum = evl ? evl->GetN() : eNum;
00462 }
00463 if (!eNum)
00464 continue;
00465 }
00466 PDB(kPacketizer,2)
00467 Info("TPacketizer", "processing element: next cur %lld", cur);
00468
00469
00470 TString host;
00471 if ( !url.IsValid() ||
00472 (strncmp(url.GetProtocol(),"root", 4) &&
00473 strncmp(url.GetProtocol(),"rfio", 4)) ) {
00474 host = "no-host";
00475 } else {
00476 host = url.GetHostFQDN();
00477 }
00478
00479 TFileNode *node = (TFileNode*) fFileNodes->FindObject( host );
00480
00481 if ( node == 0 ) {
00482 node = new TFileNode( host );
00483 fFileNodes->Add( node );
00484 }
00485
00486 ++files;
00487 fTotalEntries += eNum;
00488 node->Add(e);
00489 PDB(kPacketizer,2) e->Print("a");
00490 }
00491
00492 PDB(kPacketizer,1)
00493 Info("TPacketizer", "processing %lld entries in %d files on %d hosts",
00494 fTotalEntries, files, fFileNodes->GetSize());
00495
00496
00497 if (gPerfStats)
00498 gPerfStats->SetNumEvents(fTotalEntries);
00499
00500 Reset();
00501
00502 if (fFileNodes->GetSize() == 0) {
00503 Info("TPacketizer", "no valid or non-empty file found: setting invalid");
00504
00505 fValid = kFALSE;
00506 return;
00507 }
00508
00509
00510
00511
00512
00513
00514
00515
00516 Long_t packetAsAFraction = 20;
00517 if (TProof::GetParameter(input, "PROOF_PacketAsAFraction", packetAsAFraction) == 0)
00518 Info("Process", "using alternate fraction of query time as a packet Size: %ld",
00519 packetAsAFraction);
00520 fPacketAsAFraction = (Int_t)packetAsAFraction;
00521
00522 fPacketSize = 1;
00523 if (TProof::GetParameter(input, "PROOF_PacketSize", fPacketSize) == 0) {
00524 Info("Process","using alternate packet size: %lld", fPacketSize);
00525 } else {
00526
00527 Int_t nslaves = fSlaveStats->GetSize();
00528 if (nslaves > 0) {
00529 fPacketSize = fTotalEntries / (fPacketAsAFraction * nslaves);
00530 if (fPacketSize < 1) fPacketSize = 1;
00531 } else {
00532 fPacketSize = 1;
00533 }
00534 }
00535
00536 PDB(kPacketizer,1) Info("TPacketizer", "Base Packetsize = %lld", fPacketSize);
00537
00538 if (!fValid)
00539 SafeDelete(fProgress);
00540
00541 PDB(kPacketizer,1) Info("TPacketizer", "Return");
00542 }
00543
00544
00545 TPacketizer::~TPacketizer()
00546 {
00547
00548
00549 if (fSlaveStats) {
00550 fSlaveStats->DeleteValues();
00551 }
00552
00553 SafeDelete(fPackets);
00554 SafeDelete(fSlaveStats);
00555 SafeDelete(fUnAllocated);
00556 SafeDelete(fActive);
00557 SafeDelete(fFileNodes);
00558 }
00559
00560
00561 TPacketizer::TFileStat *TPacketizer::GetNextUnAlloc(TFileNode *node)
00562 {
00563
00564
00565 TFileStat *file = 0;
00566
00567 if (node != 0) {
00568 file = node->GetNextUnAlloc();
00569 if (file == 0) RemoveUnAllocNode(node);
00570 } else {
00571 while (file == 0 && ((node = NextUnAllocNode()) != 0)) {
00572 file = node->GetNextUnAlloc();
00573 if (file == 0) RemoveUnAllocNode(node);
00574 }
00575 }
00576
00577 if (file != 0) {
00578
00579 if (fActive->FindObject(node) == 0) {
00580 fActive->Add(node);
00581 }
00582 }
00583
00584 return file;
00585 }
00586
00587
00588 TPacketizer::TFileNode *TPacketizer::NextUnAllocNode()
00589 {
00590
00591
00592 fUnAllocated->Sort();
00593 PDB(kPacketizer,2) {
00594 cout << "TPacketizer::NextUnAllocNode()" << endl;
00595 fUnAllocated->Print();
00596 }
00597
00598 TFileNode *fn = (TFileNode*) fUnAllocated->First();
00599 if (fn != 0 && fMaxSlaveCnt > 0 && fn->GetSlaveCnt() >= fMaxSlaveCnt) {
00600 PDB(kPacketizer,1) Info("NextUnAllocNode", "reached workers per node limit (%ld)",
00601 fMaxSlaveCnt);
00602 fn = 0;
00603 }
00604
00605 return fn;
00606 }
00607
00608
00609 void TPacketizer::RemoveUnAllocNode(TFileNode * node)
00610 {
00611
00612
00613 fUnAllocated->Remove(node);
00614 }
00615
00616
00617 TPacketizer::TFileStat *TPacketizer::GetNextActive()
00618 {
00619
00620
00621 TFileNode *node;
00622 TFileStat *file = 0;
00623
00624 while (file == 0 && ((node = NextActiveNode()) != 0)) {
00625 file = node->GetNextActive();
00626 if (file == 0) RemoveActiveNode(node);
00627 }
00628
00629 return file;
00630 }
00631
00632
00633 TPacketizer::TFileNode *TPacketizer::NextActiveNode()
00634 {
00635
00636
00637 fActive->Sort();
00638 PDB(kPacketizer,2) {
00639 cout << "TPacketizer::NextActiveNode()" << endl;
00640 fActive->Print();
00641 }
00642
00643 TFileNode *fn = (TFileNode*) fActive->First();
00644 if (fn != 0 && fMaxSlaveCnt > 0 && fn->GetSlaveCnt() >= fMaxSlaveCnt) {
00645 PDB(kPacketizer,1) Info("NextActiveNode", "reached workers per node limit (%ld)", fMaxSlaveCnt);
00646 fn = 0;
00647 }
00648
00649 return fn;
00650 }
00651
00652
00653 void TPacketizer::RemoveActive(TFileStat *file)
00654 {
00655
00656
00657 TFileNode *node = file->GetNode();
00658
00659 node->RemoveActive(file);
00660 if (node->GetNumberOfActiveFiles() == 0) RemoveActiveNode(node);
00661 }
00662
00663
00664 void TPacketizer::RemoveActiveNode(TFileNode *node)
00665 {
00666
00667
00668 fActive->Remove(node);
00669 }
00670
00671
00672 void TPacketizer::Reset()
00673 {
00674
00675
00676 fUnAllocated->Clear();
00677 fUnAllocated->AddAll(fFileNodes);
00678
00679 fActive->Clear();
00680
00681 TIter files(fFileNodes);
00682 TFileNode *fn;
00683 while ((fn = (TFileNode*) files.Next()) != 0) {
00684 fn->Reset();
00685 }
00686
00687 TIter slaves(fSlaveStats);
00688 TObject *key;
00689 while ((key = slaves.Next()) != 0) {
00690 TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue(key);
00691 fn = (TFileNode*) fFileNodes->FindObject(slstat->GetName());
00692 if (fn != 0 ) {
00693 slstat->SetFileNode(fn);
00694 fn->IncMySlaveCnt();
00695 }
00696 slstat->fCurFile = 0;
00697 }
00698 }
00699
00700
00701 void TPacketizer::ValidateFiles(TDSet *dset, TList *slaves, Long64_t maxent, Bool_t byfile)
00702 {
00703
00704
00705
00706 TMap slaves_by_sock;
00707 TMonitor mon;
00708 TList workers;
00709
00710
00711
00712
00713 workers.AddAll(slaves);
00714 TIter si(slaves);
00715 TSlave *slm = 0;
00716 while ((slm = (TSlave*)si.Next()) != 0) {
00717 PDB(kPacketizer,3)
00718 Info("ValidateFiles","socket added to monitor: %p (%s)",
00719 slm->GetSocket(), slm->GetName());
00720 mon.Add(slm->GetSocket());
00721 slaves_by_sock.Add(slm->GetSocket(), slm);
00722 PDB(kPacketizer,1)
00723 Info("ValidateFiles",
00724 "mon: %p, wrk: %p, sck: %p", &mon, slm, slm->GetSocket());
00725 }
00726
00727 mon.DeActivateAll();
00728
00729 ((TProof*)gProof)->DeActivateAsyncInput();
00730
00731
00732 ((TProof*)gProof)->fCurrentMonitor = &mon;
00733
00734
00735 TString msg("Validating files");
00736 UInt_t n = 0;
00737 UInt_t tot = dset->GetListOfElements()->GetSize();
00738 Bool_t st = kTRUE;
00739
00740 Long64_t totent = 0, nopenf = 0;
00741 while (kTRUE) {
00742
00743
00744 while( TSlave *s = (TSlave*)workers.First() ) {
00745
00746 workers.Remove(s);
00747
00748
00749
00750 TSlaveStat *slstat = (TSlaveStat*)fSlaveStats->GetValue(s);
00751 TFileNode *node = 0;
00752 TFileStat *file = 0;
00753
00754
00755 if ( (node = slstat->GetFileNode()) != 0 ) {
00756 file = GetNextUnAlloc(node);
00757 if ( file == 0 ) {
00758 slstat->SetFileNode(0);
00759 }
00760 }
00761
00762
00763 if (file == 0) {
00764 file = GetNextUnAlloc();
00765 }
00766
00767 if ( file != 0 ) {
00768
00769 RemoveActive(file);
00770
00771 slstat->fCurFile = file;
00772 TDSetElement *elem = file->GetElement();
00773 Long64_t entries = elem->GetEntries(kTRUE, kFALSE);
00774 if (entries < 0 || strlen(elem->GetTitle()) <= 0) {
00775
00776 file->GetNode()->IncSlaveCnt(slstat->GetName());
00777 TMessage m(kPROOF_GETENTRIES);
00778 m << dset->IsTree()
00779 << TString(elem->GetFileName())
00780 << TString(elem->GetDirectory())
00781 << TString(elem->GetObjName());
00782
00783 s->GetSocket()->Send( m );
00784 mon.Activate(s->GetSocket());
00785 PDB(kPacketizer,2)
00786 Info("ValidateFiles",
00787 "sent to worker-%s (%s) via %p GETENTRIES on %s %s %s %s",
00788 s->GetOrdinal(), s->GetName(), s->GetSocket(),
00789 dset->IsTree() ? "tree" : "objects", elem->GetFileName(),
00790 elem->GetDirectory(), elem->GetObjName());
00791 } else {
00792
00793 elem->SetTDSetOffset(entries);
00794 if (entries > 0) {
00795
00796 elem->SetValid();
00797 if (!elem->GetEntryList()) {
00798 if (elem->GetFirst() > entries) {
00799 Error("ValidateFiles",
00800 "first (%lld) higher then number of entries (%lld) in %s",
00801 elem->GetFirst(), entries, elem->GetFileName());
00802
00803 slstat->fCurFile->SetDone();
00804 elem->Invalidate();
00805 dset->SetBit(TDSet::kSomeInvalid);
00806 }
00807 if (elem->GetNum() == -1) {
00808 elem->SetNum(entries - elem->GetFirst());
00809 } else if (elem->GetFirst() + elem->GetNum() > entries) {
00810 Warning("ValidateFiles", "num (%lld) + first (%lld) larger then number of"
00811 " keys/entries (%lld) in %s", elem->GetNum(), elem->GetFirst(),
00812 entries, elem->GetFileName());
00813 elem->SetNum(entries - elem->GetFirst());
00814 }
00815 PDB(kPacketizer,2)
00816 Info("ValidateFiles",
00817 "found elem '%s' with %lld entries", elem->GetFileName(), entries);
00818 }
00819 }
00820
00821 n++;
00822 gProof->SendDataSetStatus(msg, n, tot, st);
00823
00824
00825 workers.Add(s);
00826 }
00827 }
00828 }
00829
00830
00831 if (mon.GetActive() == 0) {
00832 if (byfile && maxent > 0) {
00833
00834 Long64_t nrestf = (maxent - totent) * nopenf / totent ;
00835 if (nrestf <= 0 && maxent > totent) nrestf = 1;
00836 if (nrestf > 0) {
00837 PDB(kPacketizer,3)
00838 Info("ValidateFiles", "{%lld, %lld, %lld): needs to validate %lld more files",
00839 maxent, totent, nopenf, nrestf);
00840 si.Reset();
00841 while ((slm = (TSlave *) si.Next()) && nrestf--) {
00842 workers.Add(slm);
00843 }
00844 continue;
00845 } else {
00846 PDB(kPacketizer,3)
00847 Info("ValidateFiles", "no need to validate more files");
00848 break;
00849 }
00850 } else {
00851 break;
00852 }
00853 }
00854
00855 PDB(kPacketizer,3) {
00856 Info("ValidateFiles", "waiting for %d workers:", mon.GetActive());
00857 TList *act = mon.GetListOfActives();
00858 TIter next(act);
00859 TSocket *s = 0;
00860 while ((s = (TSocket*) next())) {
00861 Info("ValidateFiles", "found sck: %p", s);
00862 TSlave *sl = (TSlave *) slaves_by_sock.GetValue(s);
00863 if (sl)
00864 Info("ValidateFiles", " worker-%s (%s)", sl->GetOrdinal(), sl->GetName());
00865 }
00866 delete act;
00867 }
00868
00869 TSocket *sock = mon.Select();
00870
00871 if (!sock) {
00872 Error("ValidateFiles", "selection has been interrupted - STOP");
00873 mon.DeActivateAll();
00874 fValid = kFALSE;
00875 break;
00876 }
00877 mon.DeActivate(sock);
00878
00879 PDB(kPacketizer,3) Info("ValidateFiles", "select returned: %p", sock);
00880
00881 TSlave *slave = (TSlave *) slaves_by_sock.GetValue( sock );
00882 if (!sock->IsValid()) {
00883
00884 Error("ValidateFiles", "worker-%s (%s) got invalid - STOP",
00885 slave->GetOrdinal(), slave->GetName());
00886 ((TProof*)gProof)->MarkBad(slave);
00887 fValid = kFALSE;
00888 break;
00889 }
00890
00891 TMessage *reply;
00892
00893 if ( sock->Recv(reply) <= 0 ) {
00894
00895 ((TProof*)gProof)->MarkBad(slave);
00896 fValid = kFALSE;
00897 Error("ValidateFiles", "Recv failed! for worker-%s (%s)",
00898 slave->GetOrdinal(), slave->GetName());
00899 continue;
00900 }
00901
00902 if (reply->What() != kPROOF_GETENTRIES) {
00903
00904 Int_t what = reply->What();
00905 ((TProof*)gProof)->HandleInputMessage(slave, reply);
00906 if (what == kPROOF_FATAL) {
00907 Error("ValidateFiles", "kPROOF_FATAL from worker-%s (%s)",
00908 slave->GetOrdinal(), slave->GetName());
00909 fValid = kFALSE;
00910 } else {
00911
00912 mon.Activate(sock);
00913 }
00914
00915 continue;
00916 }
00917
00918 TSlaveStat *slavestat = (TSlaveStat*) fSlaveStats->GetValue( slave );
00919 TDSetElement *e = slavestat->fCurFile->GetElement();
00920 slavestat->fCurFile->GetNode()->DecSlaveCnt(slavestat->GetName());
00921 Long64_t entries;
00922
00923 (*reply) >> entries;
00924
00925
00926 if ((reply->BufferSize() > reply->Length())) {
00927 TString objname;
00928 (*reply) >> objname;
00929 e->SetTitle(objname);
00930 }
00931
00932 e->SetTDSetOffset(entries);
00933 if ( entries > 0 ) {
00934
00935
00936 e->SetValid();
00937
00938
00939 if (!e->GetEntryList()){
00940 if ( e->GetFirst() > entries ) {
00941 Error("ValidateFiles", "first (%lld) higher then number of entries (%lld) in %s",
00942 e->GetFirst(), entries, e->GetFileName());
00943
00944
00945 slavestat->fCurFile->SetDone();
00946 e->Invalidate();
00947 dset->SetBit(TDSet::kSomeInvalid);
00948 }
00949
00950 if ( e->GetNum() == -1 ) {
00951 e->SetNum( entries - e->GetFirst() );
00952 } else if ( e->GetFirst() + e->GetNum() > entries ) {
00953 Error("ValidateFiles",
00954 "num (%lld) + first (%lld) larger then number of keys/entries (%lld) in %s",
00955 e->GetNum(), e->GetFirst(), entries, e->GetFileName());
00956 e->SetNum(entries - e->GetFirst());
00957 }
00958 }
00959
00960
00961 totent += entries;
00962 nopenf++;
00963
00964
00965 n++;
00966 gProof->SendDataSetStatus(msg, n, tot, st);
00967
00968 } else {
00969
00970 Error("ValidateFiles", "cannot get entries for %s (", e->GetFileName() );
00971
00972
00973
00974
00975 if (gProofServ) {
00976 TMessage m(kPROOF_MESSAGE);
00977 m << TString(Form("Cannot get entries for file: %s - skipping", e->GetFileName()));
00978 gProofServ->GetSocket()->Send(m);
00979 }
00980
00981
00982 e->Invalidate();
00983 dset->SetBit(TDSet::kSomeInvalid);
00984 }
00985 PDB(kPacketizer,3) Info("ValidateFiles", " %lld events validated", totent);
00986
00987
00988 if (maxent < 0 || ((totent < maxent) && !byfile))
00989 workers.Add(slave);
00990 }
00991
00992
00993
00994 ((TProof*)gProof)->ActivateAsyncInput();
00995
00996
00997 ((TProof*)gProof)->fCurrentMonitor = 0;
00998
00999
01000 if (!fValid)
01001 return;
01002
01003
01004
01005 Long64_t offset = 0;
01006 Long64_t newOffset = 0;
01007 TIter next(dset->GetListOfElements());
01008 TDSetElement *el;
01009 while ( (el = dynamic_cast<TDSetElement*> (next())) ) {
01010 newOffset = offset + el->GetTDSetOffset();
01011 el->SetTDSetOffset(offset);
01012 offset = newOffset;
01013 }
01014 }
01015
01016
01017 Long64_t TPacketizer::GetEntriesProcessed(TSlave *slave) const
01018 {
01019
01020
01021 if ( fSlaveStats == 0 ) return 0;
01022
01023 TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue( slave );
01024
01025 if ( slstat == 0 ) return 0;
01026
01027 return slstat->GetEntriesProcessed();
01028 }
01029
01030
01031 Float_t TPacketizer::GetCurrentRate(Bool_t &all)
01032 {
01033
01034
01035
01036 all = kTRUE;
01037
01038 Float_t currate = 0.;
01039 if (fSlaveStats && fSlaveStats->GetSize() > 0) {
01040 TIter nxw(fSlaveStats);
01041 TObject *key;
01042 while ((key = nxw()) != 0) {
01043 TSlaveStat *slstat = (TSlaveStat *) fSlaveStats->GetValue(key);
01044 if (slstat && slstat->GetProgressStatus() && slstat->GetEntriesProcessed() > 0) {
01045
01046 currate += slstat->GetProgressStatus()->GetCurrentRate();
01047 } else {
01048 all = kFALSE;
01049 }
01050 }
01051 }
01052
01053 return currate;
01054 }
01055
01056
01057 TDSetElement *TPacketizer::GetNextPacket(TSlave *sl, TMessage *r)
01058 {
01059
01060
01061 if ( !fValid ) {
01062 return 0;
01063 }
01064
01065
01066
01067 TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue( sl );
01068
01069 R__ASSERT( slstat != 0 );
01070
01071
01072
01073 Bool_t firstPacket = kFALSE;
01074 if ( slstat->fCurElem != 0 ) {
01075 Double_t latency = 0., proctime = 0., proccpu = 0.;
01076 Long64_t bytesRead = -1;
01077 Long64_t totalEntries = -1;
01078 Long64_t totev = 0;
01079 Long64_t numev = slstat->fCurElem->GetNum();
01080
01081 fPackets->Add(slstat->fCurElem);
01082
01083 if (sl->GetProtocol() > 18) {
01084 TProofProgressStatus *status = 0;
01085 (*r) >> latency;
01086 (*r) >> status;
01087
01088
01089 TProofProgressStatus *progress = 0;
01090 if (status) {
01091
01092 numev = status->GetEntries() - slstat->GetEntriesProcessed();
01093 progress = slstat->AddProcessed(status);
01094 if (progress) {
01095
01096 proctime = progress->GetProcTime();
01097 proccpu = progress->GetCPUTime();
01098 totev = status->GetEntries();
01099 bytesRead = progress->GetBytesRead();
01100 delete progress;
01101 }
01102 delete status;
01103 } else
01104 Error("GetNextPacket", "no status came in the kPROOF_GETPACKET message");
01105 } else {
01106
01107 (*r) >> latency >> proctime >> proccpu;
01108
01109
01110 if (r->BufferSize() > r->Length()) (*r) >> bytesRead;
01111 if (r->BufferSize() > r->Length()) (*r) >> totalEntries;
01112 if (r->BufferSize() > r->Length()) (*r) >> totev;
01113
01114 numev = totev - slstat->GetEntriesProcessed();
01115 slstat->GetProgressStatus()->IncEntries(numev);
01116 slstat->GetProgressStatus()->IncBytesRead(bytesRead);
01117 }
01118
01119 if (fProgressStatus) {
01120 if (numev > 0) fProgressStatus->IncEntries(numev);
01121 if (bytesRead > 0) fProgressStatus->IncBytesRead(bytesRead);
01122 }
01123 PDB(kPacketizer,2)
01124 Info("GetNextPacket","worker-%s (%s): %lld %7.3lf %7.3lf %7.3lf %lld",
01125 sl->GetOrdinal(), sl->GetName(),
01126 numev, latency, proctime, proccpu, bytesRead);
01127
01128 if (gPerfStats)
01129 gPerfStats->PacketEvent(sl->GetOrdinal(), sl->GetName(), slstat->fCurElem->GetFileName(),
01130 numev, latency, proctime, proccpu, bytesRead);
01131
01132 slstat->fCurElem = 0;
01133 if (fProgressStatus && fProgressStatus->GetEntries() == fTotalEntries) {
01134 HandleTimer(0);
01135 delete fProgress; fProgress = 0;
01136 }
01137 } else {
01138 firstPacket = kTRUE;
01139 }
01140
01141 if ( fStop ) {
01142 HandleTimer(0);
01143 return 0;
01144 }
01145
01146
01147
01148 TFileStat *file = slstat->fCurFile;
01149
01150 if ( file != 0 && file->IsDone() ) {
01151 file->GetNode()->DecSlaveCnt(slstat->GetName());
01152 if (gPerfStats)
01153 gPerfStats->FileEvent(sl->GetOrdinal(), sl->GetName(), file->GetNode()->GetName(),
01154 file->GetElement()->GetFileName(), kFALSE);
01155 file = 0;
01156 }
01157
01158 slstat->fCurFile = file;
01159
01160 if (!file) {
01161
01162
01163 if (slstat->GetFileNode() != 0) {
01164 file = GetNextUnAlloc(slstat->GetFileNode());
01165 if (!file) {
01166 slstat->SetFileNode(0);
01167 }
01168 }
01169
01170
01171 if (!file) {
01172 file = GetNextUnAlloc();
01173 }
01174
01175
01176 if (!file) {
01177 file = GetNextActive();
01178 }
01179
01180 if (!file) return 0;
01181
01182 slstat->fCurFile = file;
01183 file->GetNode()->IncSlaveCnt(slstat->GetName());
01184 if (gPerfStats)
01185 gPerfStats->FileEvent(sl->GetOrdinal(), sl->GetName(),
01186 file->GetNode()->GetName(),
01187 file->GetElement()->GetFileName(), kTRUE);
01188 }
01189
01190
01191
01192 TDSetElement *base = file->GetElement();
01193 Long64_t num = Long64_t(fPacketSize*(Float_t)slstat->fSlave->GetPerfIdx()/fMaxPerfIdx);
01194 if (num < 1) num = 1;
01195
01196 Long64_t first = file->GetNextEntry();
01197 Long64_t last = base->GetFirst() + base->GetNum();
01198
01199 if ( first + num >= last ) {
01200 num = last - first;
01201 file->SetDone();
01202
01203
01204 RemoveActive(file);
01205
01206 } else {
01207 file->MoveNextEntry(num);
01208 }
01209
01210
01211 slstat->fCurElem = CreateNewPacket(base, first, num);
01212 if (base->GetEntryList())
01213 slstat->fCurElem->SetEntryList(base->GetEntryList(), first, num);
01214
01215
01216 if (firstPacket)
01217 slstat->fCurElem->SetBit(TDSetElement::kNewRun);
01218 else
01219 slstat->fCurElem->ResetBit(TDSetElement::kNewRun);
01220
01221 PDB(kPacketizer,2)
01222 Info("GetNextPacket","%s: %s %lld %lld", sl->GetOrdinal(), base->GetFileName(), first, num);
01223
01224 return slstat->fCurElem;
01225 }
01226
01227
01228 Int_t TPacketizer::GetActiveWorkers()
01229 {
01230
01231
01232 Int_t actw = 0;
01233 TIter nxw(fSlaveStats);
01234 TObject *key;
01235 while ((key = nxw())) {
01236 TSlaveStat *wrkstat = (TSlaveStat *) fSlaveStats->GetValue(key);
01237 if (wrkstat && wrkstat->fCurFile) actw++;
01238 }
01239
01240 return actw;
01241 }