00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029 #include "TProofDraw.h"
00030 #include "TProofPlayer.h"
00031 #include "THashList.h"
00032 #include "TEnv.h"
00033 #include "TEventIter.h"
00034 #include "TVirtualPacketizer.h"
00035 #include "TSelector.h"
00036 #include "TSocket.h"
00037 #include "TProofServ.h"
00038 #include "TProof.h"
00039 #include "TProofOutputFile.h"
00040 #include "TProofSuperMaster.h"
00041 #include "TSlave.h"
00042 #include "TClass.h"
00043 #include "TROOT.h"
00044 #include "TError.h"
00045 #include "TException.h"
00046 #include "MessageTypes.h"
00047 #include "TMessage.h"
00048 #include "TDSetProxy.h"
00049 #include "TString.h"
00050 #include "TSystem.h"
00051 #include "TFile.h"
00052 #include "TFileCollection.h"
00053 #include "TFileInfo.h"
00054 #include "TFileMerger.h"
00055 #include "TProofDebug.h"
00056 #include "TTimer.h"
00057 #include "TMap.h"
00058 #include "TPerfStats.h"
00059 #include "TStatus.h"
00060 #include "TEventList.h"
00061 #include "TProofLimitsFinder.h"
00062 #include "TSortedList.h"
00063 #include "TTree.h"
00064 #include "TEntryList.h"
00065 #include "TDSet.h"
00066 #include "TDrawFeedback.h"
00067 #include "TNamed.h"
00068 #include "TObjString.h"
00069 #include "TQueryResult.h"
00070 #include "TMD5.h"
00071 #include "TMethodCall.h"
00072 #include "TObjArray.h"
00073 #include "TMutex.h"
00074 #include "TH1.h"
00075 #include "TVirtualMonitoring.h"
00076 #include "TParameter.h"
00077 #include "TOutputListSelectorDataMap.h"
00078
00079
00080 #define kPEX_STOPPED 1001
00081 #define kPEX_ABORTED 1002
00082
00083
00084
00085 static Bool_t gAbort = kFALSE;
00086
00087 class TAutoBinVal : public TNamed {
00088 private:
00089 Double_t fXmin, fXmax, fYmin, fYmax, fZmin, fZmax;
00090
00091 public:
00092 TAutoBinVal(const char *name, Double_t xmin, Double_t xmax, Double_t ymin,
00093 Double_t ymax, Double_t zmin, Double_t zmax) : TNamed(name,"")
00094 {
00095 fXmin = xmin; fXmax = xmax;
00096 fYmin = ymin; fYmax = ymax;
00097 fZmin = zmin; fZmax = zmax;
00098 }
00099 void GetAll(Double_t& xmin, Double_t& xmax, Double_t& ymin,
00100 Double_t& ymax, Double_t& zmin, Double_t& zmax)
00101 {
00102 xmin = fXmin; xmax = fXmax;
00103 ymin = fYmin; ymax = fYmax;
00104 zmin = fZmin; zmax = fZmax;
00105 }
00106
00107 };
00108
00109
00110
00111
00112 class TDispatchTimer : public TTimer {
00113 private:
00114 TProofPlayer *fPlayer;
00115
00116 public:
00117 TDispatchTimer(TProofPlayer *p) : TTimer(1000, kFALSE), fPlayer(p) { }
00118
00119 Bool_t Notify();
00120 };
00121
00122 Bool_t TDispatchTimer::Notify()
00123 {
00124
00125
00126
00127
00128 if (gDebug > 0)
00129 Info ("Notify","called!");
00130
00131 fPlayer->SetBit(TProofPlayer::kDispatchOneEvent);
00132
00133
00134 Reset();
00135 return kTRUE;
00136 }
00137
00138
00139
00140
00141 class TStopTimer : public TTimer {
00142 private:
00143 Bool_t fAbort;
00144 TProofPlayer *fPlayer;
00145
00146 public:
00147 TStopTimer(TProofPlayer *p, Bool_t abort, Int_t to);
00148
00149 Bool_t Notify();
00150 };
00151
00152
00153 TStopTimer::TStopTimer(TProofPlayer *p, Bool_t abort, Int_t to)
00154 : TTimer(((to <= 0 || to > 864000) ? 10 : to * 1000), kFALSE)
00155 {
00156
00157
00158
00159
00160
00161 if (gDebug > 0)
00162 Info ("TStopTimer","enter: %d, timeout: %d", abort, to);
00163
00164 fPlayer = p;
00165 fAbort = abort;
00166
00167 if (gDebug > 1)
00168 Info ("TStopTimer","timeout set to %s ms", fTime.AsString());
00169 }
00170
00171
00172 Bool_t TStopTimer::Notify()
00173 {
00174
00175
00176
00177
00178
00179 if (gDebug > 0)
00180 Info ("Notify","called!");
00181
00182 if (fAbort)
00183 Throw(kPEX_ABORTED);
00184 else
00185 Throw(kPEX_STOPPED);
00186
00187 return kTRUE;
00188 }
00189
00190
00191
00192 ClassImp(TProofPlayer)
00193
00194 THashList *TProofPlayer::fgDrawInputPars = 0;
00195
00196
00197 TProofPlayer::TProofPlayer(TProof *)
00198 : fAutoBins(0), fOutput(0), fSelector(0), fSelectorClass(0),
00199 fFeedbackTimer(0), fFeedbackPeriod(2000),
00200 fEvIter(0), fSelStatus(0),
00201 fTotalEvents(0), fQueryResults(0), fQuery(0), fDrawQueries(0),
00202 fMaxDrawQueries(1), fStopTimer(0), fStopTimerMtx(0), fDispatchTimer(0)
00203 {
00204
00205
00206 fInput = new TList;
00207 fExitStatus = kFinished;
00208 fProgressStatus = new TProofProgressStatus();
00209 SetProcessing(kFALSE);
00210
00211 static Bool_t initLimitsFinder = kFALSE;
00212 if (!initLimitsFinder && gProofServ && !gProofServ->IsMaster()) {
00213 THLimitsFinder::SetLimitsFinder(new TProofLimitsFinder);
00214 initLimitsFinder = kTRUE;
00215 }
00216 }
00217
00218
00219 TProofPlayer::~TProofPlayer()
00220 {
00221
00222
00223 fInput->Clear("nodelete");
00224 SafeDelete(fInput);
00225
00226 SafeDelete(fSelector);
00227 SafeDelete(fFeedbackTimer);
00228 SafeDelete(fEvIter);
00229 SafeDelete(fQueryResults);
00230 SafeDelete(fDispatchTimer);
00231 SafeDelete(fStopTimer);
00232 }
00233
00234
00235 void TProofPlayer::SetProcessing(Bool_t on)
00236 {
00237
00238
00239 if (on)
00240 SetBit(TProofPlayer::kIsProcessing);
00241 else
00242 ResetBit(TProofPlayer::kIsProcessing);
00243 }
00244
00245
00246 void TProofPlayer::StopProcess(Bool_t abort, Int_t timeout)
00247 {
00248
00249
00250
00251
00252 if (gDebug > 0)
00253 Info ("StopProcess","abort: %d, timeout: %d", abort, timeout);
00254
00255 if (fEvIter != 0)
00256 fEvIter->StopProcess(abort);
00257 Long_t to = 1;
00258 if (abort == kTRUE) {
00259 fExitStatus = kAborted;
00260 } else {
00261 fExitStatus = kStopped;
00262 to = timeout;
00263 }
00264
00265 if (to > 0)
00266 SetStopTimer(kTRUE, abort, to);
00267 }
00268
00269
00270 void TProofPlayer::SetDispatchTimer(Bool_t on)
00271 {
00272
00273
00274 SafeDelete(fDispatchTimer);
00275 ResetBit(TProofPlayer::kDispatchOneEvent);
00276 if (on) {
00277 fDispatchTimer = new TDispatchTimer(this);
00278 fDispatchTimer->Start();
00279 }
00280 }
00281
00282
00283 void TProofPlayer::SetStopTimer(Bool_t on, Bool_t abort, Int_t timeout)
00284 {
00285
00286
00287
00288 fStopTimerMtx = (fStopTimerMtx) ? fStopTimerMtx : new TMutex(kTRUE);
00289 R__LOCKGUARD(fStopTimerMtx);
00290
00291
00292 SafeDelete(fStopTimer);
00293 if (on) {
00294
00295 fStopTimer = new TStopTimer(this, abort, timeout);
00296
00297 fStopTimer->Start();
00298 if (gDebug > 0)
00299 Info ("SetStopTimer", "%s timer STARTED (timeout: %d)",
00300 (abort ? "ABORT" : "STOP"), timeout);
00301 } else {
00302 if (gDebug > 0)
00303 Info ("SetStopTimer", "timer STOPPED");
00304 }
00305 }
00306
00307
00308 void TProofPlayer::AddQueryResult(TQueryResult *q)
00309 {
00310
00311
00312
00313 if (!q) {
00314 Warning("AddQueryResult","query undefined - do nothing");
00315 return;
00316 }
00317
00318
00319 if (!(q->IsDraw())) {
00320 if (!fQueryResults) {
00321 fQueryResults = new TList;
00322 fQueryResults->Add(q);
00323 } else {
00324 TIter nxr(fQueryResults);
00325 TQueryResult *qr = 0;
00326 TQueryResult *qp = 0;
00327 while ((qr = (TQueryResult *) nxr())) {
00328
00329 if (*qr == *q) {
00330 fQueryResults->Remove(qr);
00331 delete qr;
00332 break;
00333 }
00334
00335 if (qr->GetStartTime().Convert() <= q->GetStartTime().Convert())
00336 qp = qr;
00337 }
00338
00339 if (!qp) {
00340 fQueryResults->AddFirst(q);
00341 } else {
00342 fQueryResults->AddAfter(qp, q);
00343 }
00344 }
00345 } else if (IsClient()) {
00346
00347 if (fDrawQueries == fMaxDrawQueries && fMaxDrawQueries > 0) {
00348 TIter nxr(fQueryResults);
00349 TQueryResult *qr = 0;
00350 while ((qr = (TQueryResult *) nxr())) {
00351
00352 if (qr->IsDraw()) {
00353 fDrawQueries--;
00354 fQueryResults->Remove(qr);
00355 delete qr;
00356 break;
00357 }
00358 }
00359 }
00360
00361 if (fDrawQueries >= 0 && fDrawQueries < fMaxDrawQueries) {
00362 fDrawQueries++;
00363 if (!fQueryResults)
00364 fQueryResults = new TList;
00365 fQueryResults->Add(q);
00366 }
00367 }
00368 }
00369
00370
00371 void TProofPlayer::RemoveQueryResult(const char *ref)
00372 {
00373
00374
00375
00376 if (fQueryResults) {
00377 TIter nxq(fQueryResults);
00378 TQueryResult *qr = 0;
00379 while ((qr = (TQueryResult *) nxq())) {
00380 if (qr->Matches(ref)) {
00381 fQueryResults->Remove(qr);
00382 delete qr;
00383 }
00384 }
00385 }
00386 }
00387
00388
00389 TQueryResult *TProofPlayer::GetQueryResult(const char *ref)
00390 {
00391
00392
00393
00394 if (fQueryResults) {
00395 if (ref && strlen(ref) > 0) {
00396 TIter nxq(fQueryResults);
00397 TQueryResult *qr = 0;
00398 while ((qr = (TQueryResult *) nxq())) {
00399 if (qr->Matches(ref))
00400 return qr;
00401 }
00402 } else {
00403
00404 return (TQueryResult *) fQueryResults->Last();
00405 }
00406 }
00407
00408
00409 return (TQueryResult *)0;
00410 }
00411
00412
00413 void TProofPlayer::SetCurrentQuery(TQueryResult *q)
00414 {
00415
00416
00417 fPreviousQuery = fQuery;
00418 fQuery = q;
00419 }
00420
00421
00422 void TProofPlayer::AddInput(TObject *inp)
00423 {
00424
00425
00426 fInput->Add(inp);
00427 }
00428
00429
00430 void TProofPlayer::ClearInput()
00431 {
00432
00433
00434 fInput->Clear();
00435 }
00436
00437
00438 TObject *TProofPlayer::GetOutput(const char *name) const
00439 {
00440
00441
00442 if (fOutput)
00443 return fOutput->FindObject(name);
00444 return 0;
00445 }
00446
00447
00448 TList *TProofPlayer::GetOutputList() const
00449 {
00450
00451
00452 TList *ol = fOutput;
00453 if (!ol && fQuery)
00454 ol = fQuery->GetOutputList();
00455 return ol;
00456 }
00457
00458
00459 Int_t TProofPlayer::ReinitSelector(TQueryResult *qr)
00460 {
00461
00462
00463
00464
00465 Int_t rc = 0;
00466
00467
00468 if (!qr) {
00469 Info("ReinitSelector", "query undefined - do nothing");
00470 return -1;
00471 }
00472
00473
00474 TString selec = qr->GetSelecImp()->GetName();
00475 if (selec.Length() <= 0) {
00476 Info("ReinitSelector", "selector name undefined - do nothing");
00477 return -1;
00478 }
00479
00480
00481 Bool_t stdselec = TSelector::IsStandardDraw(selec);
00482
00483
00484
00485 Bool_t compselec = (selec.Contains(".") || stdselec) ? kFALSE : kTRUE;
00486
00487
00488 TString ipathold;
00489 if (!stdselec && !compselec) {
00490
00491 Bool_t expandselec = kTRUE;
00492 TString dir, ipath;
00493 char *selc = gSystem->Which(TROOT::GetMacroPath(), selec, kReadPermission);
00494 if (selc) {
00495
00496 TMD5 *md5icur = 0, *md5iold = 0, *md5hcur = 0, *md5hold = 0;
00497
00498 md5icur = TMD5::FileChecksum(selc);
00499 md5iold = qr->GetSelecImp()->Checksum();
00500
00501 TString selh(selc);
00502 Int_t dot = selh.Last('.');
00503 if (dot != kNPOS) selh.Remove(dot);
00504 selh += ".h";
00505 if (!gSystem->AccessPathName(selh, kReadPermission))
00506 md5hcur = TMD5::FileChecksum(selh);
00507 md5hold = qr->GetSelecHdr()->Checksum();
00508
00509
00510 if (md5hcur && md5hold && md5icur && md5iold)
00511 if (*md5hcur == *md5hold && *md5icur == *md5iold)
00512 expandselec = kFALSE;
00513
00514 SafeDelete(md5icur);
00515 SafeDelete(md5hcur);
00516 SafeDelete(md5iold);
00517 SafeDelete(md5hold);
00518 if (selc) delete [] selc;
00519 }
00520
00521 Bool_t ok = kTRUE;
00522
00523 if (expandselec) {
00524
00525 ok = kFALSE;
00526
00527 TUUID u;
00528 dir = Form("%s/%s",gSystem->TempDirectory(),u.AsString());
00529 if (!(gSystem->MakeDirectory(dir))) {
00530
00531
00532 selec = Form("%s/%s",dir.Data(),selec.Data());
00533 qr->GetSelecImp()->SaveSource(selec);
00534
00535
00536 TString seleh = Form("%s/%s",dir.Data(),qr->GetSelecHdr()->GetName());
00537 qr->GetSelecHdr()->SaveSource(seleh);
00538
00539
00540 ipathold = gSystem->GetIncludePath();
00541 ipath = Form("-I%s %s", dir.Data(), gSystem->GetIncludePath());
00542 gSystem->SetIncludePath(ipath.Data());
00543
00544 ok = kTRUE;
00545 }
00546 }
00547 TString opt(qr->GetOptions());
00548 Ssiz_t id = opt.Last('#');
00549 if (id != kNPOS && id < opt.Length() - 1)
00550 selec += opt(id + 1, opt.Length());
00551
00552 if (!ok) {
00553 Info("ReinitSelector", "problems locating or exporting selector files");
00554 return -1;
00555 }
00556 }
00557
00558
00559 SafeDelete(fSelector);
00560 fSelectorClass = 0;
00561
00562
00563 Int_t iglevelsave = gErrorIgnoreLevel;
00564 if (compselec)
00565
00566 gErrorIgnoreLevel = kBreak;
00567
00568 if ((fSelector = TSelector::GetSelector(selec))) {
00569 if (compselec)
00570 gErrorIgnoreLevel = iglevelsave;
00571 fSelectorClass = fSelector->IsA();
00572 fSelector->SetOption(qr->GetOptions());
00573
00574 } else {
00575 if (compselec) {
00576 gErrorIgnoreLevel = iglevelsave;
00577
00578 if (strlen(qr->GetLibList()) > 0) {
00579 TString sl(qr->GetLibList());
00580 TObjArray *oa = sl.Tokenize(" ");
00581 if (oa) {
00582 Bool_t retry = kFALSE;
00583 TIter nxl(oa);
00584 TObjString *os = 0;
00585 while ((os = (TObjString *) nxl())) {
00586 TString lib = gSystem->BaseName(os->GetName());
00587 if (lib != "lib") {
00588 lib.ReplaceAll("-l", "lib");
00589 if (gSystem->Load(lib) == 0)
00590 retry = kTRUE;
00591 }
00592 }
00593
00594 if (retry)
00595 fSelector = TSelector::GetSelector(selec);
00596 }
00597 }
00598 }
00599 if (!fSelector) {
00600 if (compselec)
00601 Info("ReinitSelector", "compiled selector re-init failed:"
00602 " automatic reload unsuccessful:"
00603 " please load manually the correct library");
00604 rc = -1;
00605 }
00606 }
00607 if (fSelector) {
00608
00609 fSelector->SetInputList(qr->GetInputList());
00610 if (stdselec) {
00611 ((TProofDraw *)fSelector)->DefVar();
00612 } else {
00613
00614 fSelector->Begin(0);
00615 }
00616 }
00617
00618
00619 if (ipathold.Length() > 0)
00620 gSystem->SetIncludePath(ipathold.Data());
00621
00622 return rc;
00623 }
00624
00625
00626 Int_t TProofPlayer::AddOutputObject(TObject *)
00627 {
00628
00629
00630 MayNotUse("AddOutputObject");
00631 return -1;
00632 }
00633
00634
00635 void TProofPlayer::AddOutput(TList *)
00636 {
00637
00638
00639 MayNotUse("AddOutput");
00640 }
00641
00642
00643 void TProofPlayer::StoreOutput(TList *)
00644 {
00645
00646
00647 MayNotUse("StoreOutput");
00648 }
00649
00650
00651 void TProofPlayer::StoreFeedback(TObject *, TList *)
00652 {
00653
00654
00655 MayNotUse("StoreFeedback");
00656 }
00657
00658
00659 void TProofPlayer::Progress(Long64_t , Long64_t )
00660 {
00661
00662
00663 MayNotUse("Progress");
00664 }
00665
00666
00667 void TProofPlayer::Progress(Long64_t , Long64_t ,
00668 Long64_t ,
00669 Float_t , Float_t ,
00670 Float_t , Float_t )
00671 {
00672
00673
00674 MayNotUse("Progress");
00675 }
00676
00677
00678 void TProofPlayer::Progress(TProofProgressInfo * )
00679 {
00680
00681
00682 MayNotUse("Progress");
00683 }
00684
00685
00686 void TProofPlayer::Feedback(TList *)
00687 {
00688
00689
00690 MayNotUse("Feedback");
00691 }
00692
00693
00694 TDrawFeedback *TProofPlayer::CreateDrawFeedback(TProof *p)
00695 {
00696
00697
00698
00699 return new TDrawFeedback(p);
00700 }
00701
00702
00703 void TProofPlayer::SetDrawFeedbackOption(TDrawFeedback *f, Option_t *opt)
00704 {
00705
00706
00707 if (f)
00708 f->SetOption(opt);
00709 }
00710
00711
00712 void TProofPlayer::DeleteDrawFeedback(TDrawFeedback *f)
00713 {
00714
00715
00716 delete f;
00717 }
00718
00719
00720 Long64_t TProofPlayer::Process(TDSet *dset, const char *selector_file,
00721 Option_t *option, Long64_t nentries,
00722 Long64_t first)
00723 {
00724
00725
00726
00727
00728 PDB(kGlobal,1) Info("Process","Enter");
00729
00730 fExitStatus = kFinished;
00731 fOutput = 0;
00732
00733 TCleanup clean(this);
00734
00735 SafeDelete(fSelector);
00736 fSelectorClass = 0;
00737 Int_t version = -1;
00738 TRY {
00739
00740 if (gProofServ) {
00741 gProofServ->GetCacheLock()->Lock();
00742 gProofServ->CopyFromCache(selector_file, 1);
00743 }
00744
00745 if (!(fSelector = TSelector::GetSelector(selector_file))) {
00746 Error("Process", "cannot load: %s", selector_file );
00747 gProofServ->GetCacheLock()->Unlock();
00748 return -1;
00749 }
00750
00751
00752 if (gProofServ) {
00753 gProofServ->CopyToCache(selector_file, 1);
00754 gProofServ->GetCacheLock()->Unlock();
00755 }
00756
00757 fSelectorClass = fSelector->IsA();
00758 version = fSelector->Version();
00759
00760 fOutput = fSelector->GetOutputList();
00761
00762 if (gProofServ)
00763 TPerfStats::Start(fInput, fOutput);
00764
00765 fSelStatus = new TStatus;
00766 fOutput->Add(fSelStatus);
00767
00768 fSelector->SetOption(option);
00769 fSelector->SetInputList(fInput);
00770
00771
00772
00773 fTotalEvents = nentries;
00774 if (fTotalEvents < 0 && gProofServ &&
00775 gProofServ->IsMaster() && !gProofServ->IsParallel()) {
00776 dset->Validate();
00777 dset->Reset();
00778 TDSetElement *e = 0;
00779 while ((e = dset->Next())) {
00780 fTotalEvents += e->GetNum();
00781 }
00782 }
00783
00784 dset->Reset();
00785
00786
00787 Int_t useTreeCache = 1;
00788 if (TProof::GetParameter(fInput, "PROOF_UseTreeCache", useTreeCache) == 0) {
00789 if (useTreeCache > -1 && useTreeCache < 2)
00790 gEnv->SetValue("ProofPlayer.UseTreeCache", useTreeCache);
00791 }
00792 Long64_t cacheSize = -1;
00793 if (TProof::GetParameter(fInput, "PROOF_CacheSize", cacheSize) == 0) {
00794 TString sz = TString::Format("%lld", cacheSize);
00795 gEnv->SetValue("ProofPlayer.CacheSize", sz.Data());
00796 }
00797
00798 Int_t useParallelUnzip = 0;
00799 if (TProof::GetParameter(fInput, "PROOF_UseParallelUnzip", useParallelUnzip) == 0) {
00800 if (useParallelUnzip > -1 && useParallelUnzip < 2)
00801 gEnv->SetValue("ProofPlayer.UseParallelUnzip", useParallelUnzip);
00802 }
00803 fEvIter = TEventIter::Create(dset, fSelector, first, nentries);
00804
00805 if (version == 0) {
00806 PDB(kLoop,1) Info("Process","Call Begin(0)");
00807 fSelector->Begin(0);
00808 } else {
00809 if (IsClient()) {
00810
00811 PDB(kLoop,1) Info("Process","Call Begin(0)");
00812 fSelector->Begin(0);
00813 }
00814 if (fSelStatus->IsOk()) {
00815 PDB(kLoop,1) Info("Process","Call SlaveBegin(0)");
00816 fSelector->SlaveBegin(0);
00817
00818 }
00819 }
00820
00821 } CATCH(excode) {
00822 SetProcessing(kFALSE);
00823 Error("Process","exception %d caught", excode);
00824 gProofServ->GetCacheLock()->Unlock();
00825 return -1;
00826 } ENDTRY;
00827
00828
00829 SetupFeedback();
00830
00831 if (gMonitoringWriter)
00832 gMonitoringWriter->SendProcessingStatus("STARTED",kTRUE);
00833
00834 PDB(kLoop,1)
00835 Info("Process","Looping over Process()");
00836
00837
00838 Long64_t readbytesatstart = TFile::GetFileBytesRead();
00839 Long64_t readcallsatstart = TFile::GetFileReadCalls();
00840
00841 if (gMonitoringWriter)
00842 gMonitoringWriter->SendProcessingProgress(0,0,kTRUE);
00843
00844
00845 SetDispatchTimer(kTRUE);
00846
00847
00848 gAbort = kFALSE;
00849 Long64_t entry;
00850 fProgressStatus->Reset();
00851 if (gProofServ) gProofServ->ResetBit(TProofServ::kHighMemory);
00852
00853 TRY {
00854
00855
00856 TParameter<Long64_t> *par = (TParameter<Long64_t>*)fInput->FindObject("PROOF_MemLogFreq");
00857 Long64_t memlogfreq = (par) ? par->GetVal() : 100;
00858 Bool_t warnHWMres = kTRUE, warnHWMvir = kTRUE;
00859 TString lastMsg;
00860
00861 TPair *currentElem = 0;
00862
00863 while ((entry = fEvIter->GetNextEvent()) >= 0 && fSelStatus->IsOk()) {
00864
00865
00866
00867 SetProcessing(kTRUE);
00868
00869
00870
00871 lastMsg = "(unfortunately no detailed info is available about current packet)";
00872 if (dset->Current()) {
00873 if (!currentElem) {
00874 currentElem = new TPair(new TObjString("PROOF_CurrentElement"), dset->Current());
00875 fInput->Add(currentElem);
00876 } else {
00877 if (currentElem->Value() != dset->Current()) {
00878 currentElem->SetValue(dset->Current());
00879 } else if (dset->Current()->TestBit(TDSetElement::kNewRun)) {
00880 dset->Current()->ResetBit(TDSetElement::kNewRun);
00881 }
00882 }
00883 if (dset->TestBit(TDSet::kEmpty)) {
00884 lastMsg.Form("while processing cycle:%lld - check logs for possible stacktrace", entry);
00885 } else {
00886 TDSetElement *elem = dynamic_cast<TDSetElement *>(currentElem->Value());
00887 TString fn = (elem) ? elem->GetFileName() : "<undef>";
00888 lastMsg.Form("while processing dset:'%s', file:'%s', event:%lld"
00889 " - check logs for possible stacktrace", dset->GetName(), fn.Data(), entry);
00890 }
00891 }
00892
00893 TProofServ::SetLastMsg(lastMsg);
00894
00895 if (version == 0) {
00896 PDB(kLoop,3)
00897 Info("Process","Call ProcessCut(%lld)", entry);
00898 if (fSelector->ProcessCut(entry)) {
00899 PDB(kLoop,3)
00900 Info("Process","Call ProcessFill(%lld)", entry);
00901 fSelector->ProcessFill(entry);
00902 }
00903 } else {
00904 PDB(kLoop,3)
00905 Info("Process","Call Process(%lld)", entry);
00906 fSelector->Process(entry);
00907 if (fSelector->GetAbort() == TSelector::kAbortProcess) {
00908 SetProcessing(kFALSE);
00909 break;
00910 }
00911 }
00912
00913 if (fSelStatus->IsOk()) {
00914 fProgressStatus->IncEntries();
00915 fProgressStatus->SetBytesRead(TFile::GetFileBytesRead()-readbytesatstart);
00916 fProgressStatus->SetReadCalls(TFile::GetFileReadCalls()-readcallsatstart);
00917 if (gMonitoringWriter)
00918 gMonitoringWriter->SendProcessingProgress(fProgressStatus->GetEntries(),
00919 TFile::GetFileBytesRead()-readbytesatstart, kFALSE);
00920 }
00921
00922 TString wmsg;
00923 if (!CheckMemUsage(memlogfreq, warnHWMres, warnHWMvir, wmsg)) {
00924 Error("Process", "%s", wmsg.Data());
00925 if (gProofServ) {
00926 wmsg.Insert(0, TString::Format("ERROR:%s, entry:%lld, ", gProofServ->GetOrdinal(), entry));
00927 gProofServ->SendAsynMessage(wmsg.Data());
00928 }
00929 fExitStatus = kStopped;
00930 SetProcessing(kFALSE);
00931 if (gProofServ) gProofServ->SetBit(TProofServ::kHighMemory);
00932 break;
00933 } else {
00934 if (!wmsg.IsNull()) {
00935 Warning("Process", "%s", wmsg.Data());
00936 if (gProofServ) {
00937 wmsg.Insert(0, TString::Format("WARNING:%s, entry:%lld, ", gProofServ->GetOrdinal(), entry));
00938 gProofServ->SendAsynMessage(wmsg.Data());
00939 }
00940 }
00941 }
00942
00943 if (TestBit(TProofPlayer::kDispatchOneEvent)) {
00944 gSystem->DispatchOneEvent(kTRUE);
00945 ResetBit(TProofPlayer::kDispatchOneEvent);
00946 }
00947 SetProcessing(kFALSE);
00948 if (!fSelStatus->IsOk() || gROOT->IsInterrupted()) break;
00949 }
00950
00951 } CATCH(excode) {
00952 if (excode == kPEX_STOPPED) {
00953 Info("Process","received stop-process signal");
00954 fExitStatus = kStopped;
00955 } else if (excode == kPEX_ABORTED) {
00956 gAbort = kTRUE;
00957 Info("Process","received abort-process signal");
00958 fExitStatus = kAborted;
00959 } else {
00960 Error("Process","exception %d caught", excode);
00961
00962 gAbort = kTRUE;
00963 fExitStatus = kAborted;
00964 }
00965 SetProcessing(kFALSE);
00966 } ENDTRY;
00967
00968
00969 TPair *currentElem = 0;
00970 if ((currentElem = (TPair *) fInput->FindObject("PROOF_CurrentElement"))) {
00971 fInput->Remove(currentElem);
00972 delete currentElem->Key();
00973 delete currentElem;
00974 }
00975
00976 PDB(kGlobal,2)
00977 Info("Process","%lld events processed", fProgressStatus->GetEntries());
00978
00979 if (gMonitoringWriter) {
00980 gMonitoringWriter->SendProcessingProgress(fProgressStatus->GetEntries(), TFile::GetFileBytesRead()-readbytesatstart, kFALSE);
00981 gMonitoringWriter->SendProcessingStatus("DONE");
00982 }
00983
00984
00985 SetDispatchTimer(kFALSE);
00986 if (fStopTimer != 0)
00987 SetStopTimer(kFALSE, gAbort);
00988 if (fFeedbackTimer != 0)
00989 HandleTimer(0);
00990
00991 StopFeedback();
00992
00993 SafeDelete(fEvIter);
00994
00995
00996
00997 if (fExitStatus != kAborted) {
00998
00999 TIter nxo(GetOutputList());
01000 TObject *o = 0;
01001 while ((o = nxo())) {
01002
01003 if (o->IsA() == TProofOutputFile::Class()) {
01004 ((TProofOutputFile *)o)->SetWorkerOrdinal(gProofServ->GetOrdinal());
01005 if (!strcmp(((TProofOutputFile *)o)->GetDir(),""))
01006 ((TProofOutputFile *)o)->SetDir(gProofServ->GetSessionDir());
01007 }
01008 }
01009
01010 MapOutputListToDataMembers();
01011
01012 if (fSelStatus->IsOk()) {
01013 if (version == 0) {
01014 PDB(kLoop,1) Info("Process","Call Terminate()");
01015 fSelector->Terminate();
01016 } else {
01017 PDB(kLoop,1) Info("Process","Call SlaveTerminate()");
01018 fSelector->SlaveTerminate();
01019 if (IsClient() && fSelStatus->IsOk()) {
01020 PDB(kLoop,1) Info("Process","Call Terminate()");
01021 fSelector->Terminate();
01022 }
01023 }
01024 }
01025 if (gProofServ && !gProofServ->IsParallel()) {
01026 TIter nxc(gROOT->GetListOfCanvases());
01027 while (TObject *c = nxc())
01028 fOutput->Add(c);
01029 }
01030 }
01031
01032 if (gProofServ)
01033 TPerfStats::Stop();
01034
01035 return 0;
01036 }
01037
01038
01039 Bool_t TProofPlayer::CheckMemUsage(Long64_t &mfreq, Bool_t &w80r,
01040 Bool_t &w80v, TString &wmsg)
01041 {
01042
01043
01044
01045
01046 if (mfreq > 0 && GetEventsProcessed()%mfreq == 0) {
01047
01048 ProcInfo_t pi;
01049 if (!gSystem->GetProcInfo(&pi)){
01050 Info("CheckMemUsage|Svc", "Memory %ld virtual %ld resident event %lld",
01051 pi.fMemVirtual, pi.fMemResident, GetEventsProcessed());
01052 wmsg = "";
01053
01054 if (TProofServ::GetVirtMemMax() > 0) {
01055 if (pi.fMemVirtual > TProofServ::GetMemStop() * TProofServ::GetVirtMemMax()) {
01056 wmsg.Form("using more than %d%% of allowed virtual memory (%ld kB)"
01057 " - STOP processing", (Int_t) (TProofServ::GetMemStop() * 100), pi.fMemVirtual);
01058 return kFALSE;
01059 } else if (pi.fMemVirtual > TProofServ::GetMemHWM() * TProofServ::GetVirtMemMax() && w80v) {
01060
01061 mfreq = 1;
01062 wmsg.Form("using more than %d%% of allowed virtual memory (%ld kB)",
01063 (Int_t) (TProofServ::GetMemHWM() * 100), pi.fMemVirtual);
01064 w80v = kFALSE;
01065 }
01066 }
01067
01068 if (TProofServ::GetResMemMax() > 0) {
01069 if (pi.fMemResident > TProofServ::GetMemStop() * TProofServ::GetResMemMax()) {
01070 wmsg.Form("using more than %d%% of allowed resident memory (%ld kB)"
01071 " - STOP processing", (Int_t) (TProofServ::GetMemStop() * 100), pi.fMemResident);
01072 return kFALSE;
01073 } else if (pi.fMemResident > TProofServ::GetMemHWM() * TProofServ::GetResMemMax() && w80r) {
01074
01075 mfreq = 1;
01076 if (wmsg.Length() > 0) {
01077 wmsg.Form("using more than %d%% of allowed both virtual and resident memory ({%ld,%ld} kB)",
01078 (Int_t) (TProofServ::GetMemHWM() * 100), pi.fMemVirtual, pi.fMemResident);
01079 } else {
01080 wmsg.Form("using more than %d%% of allowed resident memory (%ld kB)",
01081 (Int_t) (TProofServ::GetMemHWM() * 100), pi.fMemResident);
01082 }
01083 w80r = kFALSE;
01084 }
01085 }
01086 }
01087 }
01088
01089 return kTRUE;
01090 }
01091
01092
01093 Long64_t TProofPlayer::Finalize(Bool_t, Bool_t)
01094 {
01095
01096
01097 MayNotUse("Finalize");
01098 return -1;
01099 }
01100
01101
01102 Long64_t TProofPlayer::Finalize(TQueryResult *)
01103 {
01104
01105
01106 MayNotUse("Finalize");
01107 return -1;
01108 }
01109
01110 void TProofPlayer::MergeOutput()
01111 {
01112
01113
01114 MayNotUse("MergeOutput");
01115 return;
01116 }
01117
01118
01119 void TProofPlayer::MapOutputListToDataMembers() const
01120 {
01121 TOutputListSelectorDataMap* olsdm = new TOutputListSelectorDataMap(fSelector);
01122 fOutput->Add(olsdm);
01123 }
01124
01125
01126 void TProofPlayer::UpdateAutoBin(const char *name,
01127 Double_t& xmin, Double_t& xmax,
01128 Double_t& ymin, Double_t& ymax,
01129 Double_t& zmin, Double_t& zmax)
01130 {
01131
01132
01133 if ( fAutoBins == 0 ) {
01134 fAutoBins = new THashList;
01135 }
01136
01137 TAutoBinVal *val = (TAutoBinVal*) fAutoBins->FindObject(name);
01138
01139 if ( val == 0 ) {
01140
01141 if (gProofServ && !gProofServ->IsTopMaster()) {
01142 TString key = name;
01143 TProofLimitsFinder::AutoBinFunc(key,xmin,xmax,ymin,ymax,zmin,zmax);
01144 }
01145
01146 val = new TAutoBinVal(name,xmin,xmax,ymin,ymax,zmin,zmax);
01147 fAutoBins->Add(val);
01148 } else {
01149 val->GetAll(xmin,xmax,ymin,ymax,zmin,zmax);
01150 }
01151 }
01152
01153
01154 TDSetElement *TProofPlayer::GetNextPacket(TSlave *, TMessage *)
01155 {
01156
01157
01158 MayNotUse("GetNextPacket");
01159 return 0;
01160 }
01161
01162
01163 void TProofPlayer::SetupFeedback()
01164 {
01165
01166
01167 MayNotUse("SetupFeedback");
01168 }
01169
01170
01171 void TProofPlayer::StopFeedback()
01172 {
01173
01174
01175 MayNotUse("StopFeedback");
01176 }
01177
01178
01179 Long64_t TProofPlayer::DrawSelect(TDSet * , const char * ,
01180 const char * , Option_t * ,
01181 Long64_t , Long64_t )
01182 {
01183
01184
01185 MayNotUse("DrawSelect");
01186 return -1;
01187 }
01188
01189
01190 void TProofPlayer::HandleGetTreeHeader(TMessage *)
01191 {
01192
01193
01194 MayNotUse("HandleGetTreeHeader|");
01195 }
01196
01197
01198 void TProofPlayer::HandleRecvHisto(TMessage *mess)
01199 {
01200
01201
01202 TObject *obj = mess->ReadObject(mess->GetClass());
01203 if (obj->InheritsFrom(TH1::Class())) {
01204 TH1 *h = (TH1*)obj;
01205 h->SetDirectory(0);
01206 TH1 *horg = (TH1*)gDirectory->GetList()->FindObject(h->GetName());
01207 if (horg)
01208 horg->Add(h);
01209 else
01210 h->SetDirectory(gDirectory);
01211 }
01212 }
01213
01214
01215 Int_t TProofPlayer::DrawCanvas(TObject *obj)
01216 {
01217
01218
01219
01220
01221 static Int_t (*gDrawCanvasHook)(TObject *) = 0;
01222
01223
01224 if (!gDrawCanvasHook) {
01225
01226 TString drawlib = "libProofDraw";
01227 char *p = 0;
01228 if ((p = gSystem->DynamicPathName(drawlib, kTRUE))) {
01229 delete[] p;
01230 if (gSystem->Load(drawlib) != -1) {
01231
01232 Func_t f = 0;
01233 if ((f = gSystem->DynFindSymbol(drawlib,"DrawCanvas")))
01234 gDrawCanvasHook = (Int_t (*)(TObject *))(f);
01235 else
01236 Warning("DrawCanvas", "can't find DrawCanvas");
01237 } else
01238 Warning("DrawCanvas", "can't load %s", drawlib.Data());
01239 } else
01240 Warning("DrawCanvas", "can't locate %s", drawlib.Data());
01241 }
01242 if (gDrawCanvasHook && obj)
01243 return (*gDrawCanvasHook)(obj);
01244
01245 return 1;
01246 }
01247
01248
01249 Int_t TProofPlayer::GetDrawArgs(const char *var, const char *sel, Option_t *opt,
01250 TString &selector, TString &objname)
01251 {
01252
01253
01254
01255
01256 static Int_t (*gGetDrawArgsHook)(const char *, const char *, Option_t *,
01257 TString &, TString &) = 0;
01258
01259
01260 if (!gGetDrawArgsHook) {
01261
01262 TString drawlib = "libProofDraw";
01263 char *p = 0;
01264 if ((p = gSystem->DynamicPathName(drawlib, kTRUE))) {
01265 delete[] p;
01266 if (gSystem->Load(drawlib) != -1) {
01267
01268 Func_t f = 0;
01269 if ((f = gSystem->DynFindSymbol(drawlib,"GetDrawArgs")))
01270 gGetDrawArgsHook = (Int_t (*)(const char *, const char *, Option_t *,
01271 TString &, TString &))(f);
01272 else
01273 Warning("GetDrawArgs", "can't find GetDrawArgs");
01274 } else
01275 Warning("GetDrawArgs", "can't load %s", drawlib.Data());
01276 } else
01277 Warning("GetDrawArgs", "can't locate %s", drawlib.Data());
01278 }
01279 if (gGetDrawArgsHook)
01280 return (*gGetDrawArgsHook)(var, sel, opt, selector, objname);
01281
01282 return 1;
01283 }
01284
01285
01286 void TProofPlayer::FeedBackCanvas(const char *name, Bool_t create)
01287 {
01288
01289
01290 static void (*gFeedBackCanvasHook)(const char *, Bool_t) = 0;
01291
01292
01293 if (!gFeedBackCanvasHook) {
01294
01295 TString drawlib = "libProofDraw";
01296 char *p = 0;
01297 if ((p = gSystem->DynamicPathName(drawlib, kTRUE))) {
01298 delete[] p;
01299 if (gSystem->Load(drawlib) != -1) {
01300
01301 Func_t f = 0;
01302 if ((f = gSystem->DynFindSymbol(drawlib,"FeedBackCanvas")))
01303 gFeedBackCanvasHook = (void (*)(const char *, Bool_t))(f);
01304 else
01305 Warning("FeedBackCanvas", "can't find FeedBackCanvas");
01306 } else
01307 Warning("FeedBackCanvas", "can't load %s", drawlib.Data());
01308 } else
01309 Warning("FeedBackCanvas", "can't locate %s", drawlib.Data());
01310 }
01311 if (gFeedBackCanvasHook) (*gFeedBackCanvasHook)(name, create);
01312
01313 return;
01314 }
01315
01316
01317 Long64_t TProofPlayer::GetCacheSize()
01318 {
01319
01320
01321 if (fEvIter) return fEvIter->GetCacheSize();
01322 return -1;
01323 }
01324
01325
01326 Int_t TProofPlayer::GetLearnEntries()
01327 {
01328
01329
01330 if (fEvIter) return fEvIter->GetLearnEntries();
01331 return -1;
01332 }
01333
01334 // ClassImp is a macro defined outside this chunk.
01335 // NOTE(review): presumably ROOT's class-implementation registration macro - confirm.
01336 ClassImp(TProofPlayerLocal)
01337
01338
01339 // Same macro applied to the remote player, whose methods follow below.
01340 // NOTE(review): no trailing semicolon, exactly as for the invocation above.
01341 ClassImp(TProofPlayerRemote)
01342
01343
01344
01345 TProofPlayerRemote::~TProofPlayerRemote()
01346 {
01347 // Destructor: deletes the output list and the list of per-object output lists,
01348 // then the feedback lists and the packetizer. Keep this order unchanged.
01349 SafeDelete(fOutput);
01350 SafeDelete(fOutputLists);
01351
01352 // NOTE(review): SafeDelete is a macro defined outside this chunk; presumably delete-and-null.
01353 SafeDelete(fFeedbackLists);
01354 SafeDelete(fPacketizer);
01355 }
01356
01357
01358 Int_t TProofPlayerRemote::InitPacketizer(TDSet *dset, Long64_t nentries,
01359 Long64_t first, const char *defpackunit,
01360 const char *defpackdata)
01361 {
01362
01363
01364
01365 SafeDelete(fPacketizer);
01366 PDB(kGlobal,1) Info("Process","Enter");
01367 fDSet = dset;
01368 fExitStatus = kFinished;
01369
01370 Bool_t noData = dset->TestBit(TDSet::kEmpty) ? kTRUE : kFALSE;
01371
01372 TString packetizer;
01373 TList *listOfMissingFiles = 0;
01374
01375 TMethodCall callEnv;
01376 TClass *cl;
01377 noData = dset->TestBit(TDSet::kEmpty) ? kTRUE : kFALSE;
01378
01379 if (noData) {
01380
01381 if (TProof::GetParameter(fInput, "PROOF_Packetizer", packetizer) != 0)
01382 packetizer = defpackunit;
01383 else
01384 Info("InitPacketizer", "using alternate packetizer: %s", packetizer.Data());
01385
01386
01387 cl = TClass::GetClass(packetizer);
01388 if (cl == 0) {
01389 Error("InitPacketizer", "class '%s' not found", packetizer.Data());
01390 fExitStatus = kAborted;
01391 return -1;
01392 }
01393
01394
01395 callEnv.InitWithPrototype(cl, cl->GetName(),"TList*,Long64_t,TList*,TProofProgressStatus*");
01396 if (!callEnv.IsValid()) {
01397 Error("InitPacketizer",
01398 "cannot find correct constructor for '%s'", cl->GetName());
01399 fExitStatus = kAborted;
01400 return -1;
01401 }
01402 callEnv.ResetParam();
01403 callEnv.SetParam((Long_t) fProof->GetListOfActiveSlaves());
01404 callEnv.SetParam((Long64_t) nentries);
01405 callEnv.SetParam((Long_t) fInput);
01406 callEnv.SetParam((Long_t) fProgressStatus);
01407
01408 } else if (dset->TestBit(TDSet::kMultiDSet)) {
01409
01410
01411 if (fProof->GetRunStatus() != TProof::kRunning) {
01412
01413 Error("InitPacketizer", "received stop/abort request");
01414 fExitStatus = kAborted;
01415 return -1;
01416 }
01417
01418
01419 packetizer = "TPacketizerMulti";
01420
01421
01422 cl = TClass::GetClass(packetizer);
01423 if (cl == 0) {
01424 Error("InitPacketizer", "class '%s' not found", packetizer.Data());
01425 fExitStatus = kAborted;
01426 return -1;
01427 }
01428
01429
01430 callEnv.InitWithPrototype(cl, cl->GetName(),"TDSet*,TList*,Long64_t,Long64_t,TList*,TProofProgressStatus*");
01431 if (!callEnv.IsValid()) {
01432 Error("InitPacketizer", "cannot find correct constructor for '%s'", cl->GetName());
01433 fExitStatus = kAborted;
01434 return -1;
01435 }
01436 callEnv.ResetParam();
01437 callEnv.SetParam((Long_t) dset);
01438 callEnv.SetParam((Long_t) fProof->GetListOfActiveSlaves());
01439 callEnv.SetParam((Long64_t) first);
01440 callEnv.SetParam((Long64_t) nentries);
01441 callEnv.SetParam((Long_t) fInput);
01442 callEnv.SetParam((Long_t) fProgressStatus);
01443
01444
01445 dset->SetBit(TDSet::kValidityChecked);
01446 dset->ResetBit(TDSet::kSomeInvalid);
01447
01448 } else {
01449
01450
01451
01452
01453
01454 if ((listOfMissingFiles = (TList *)fInput->FindObject("MissingFiles"))) {
01455
01456 fInput->Remove(listOfMissingFiles);
01457 } else {
01458 listOfMissingFiles = new TList;
01459 }
01460
01461 TString lkopt;
01462 if (TProof::GetParameter(fInput, "PROOF_LookupOpt", lkopt) != 0 || lkopt != "none")
01463 dset->Lookup(kTRUE, &listOfMissingFiles);
01464
01465 if (fProof->GetRunStatus() != TProof::kRunning) {
01466
01467 Error("InitPacketizer", "received stop/abort request");
01468 fExitStatus = kAborted;
01469 return -1;
01470 }
01471
01472 if (!(dset->GetListOfElements()) ||
01473 !(dset->GetListOfElements()->GetSize())) {
01474 if (gProofServ)
01475 gProofServ->SendAsynMessage("InitPacketizer: No files from the data set were found - Aborting");
01476 Error("InitPacketizer", "No files from the data set were found - Aborting");
01477 fExitStatus = kAborted;
01478 if (listOfMissingFiles) {
01479 listOfMissingFiles->SetOwner();
01480 fOutput->Remove(listOfMissingFiles);
01481 SafeDelete(listOfMissingFiles);
01482 }
01483 return -1;
01484 }
01485
01486 if (TProof::GetParameter(fInput, "PROOF_Packetizer", packetizer) != 0)
01487
01488 packetizer = defpackdata;
01489 else
01490 Info("InitPacketizer", "using alternate packetizer: %s", packetizer.Data());
01491
01492
01493 cl = TClass::GetClass(packetizer);
01494 if (cl == 0) {
01495 Error("InitPacketizer", "class '%s' not found", packetizer.Data());
01496 fExitStatus = kAborted;
01497 return -1;
01498 }
01499
01500
01501 callEnv.InitWithPrototype(cl, cl->GetName(),"TDSet*,TList*,Long64_t,Long64_t,TList*,TProofProgressStatus*");
01502 if (!callEnv.IsValid()) {
01503 Error("InitPacketizer", "cannot find correct constructor for '%s'", cl->GetName());
01504 fExitStatus = kAborted;
01505 return -1;
01506 }
01507 callEnv.ResetParam();
01508 callEnv.SetParam((Long_t) dset);
01509 callEnv.SetParam((Long_t) fProof->GetListOfActiveSlaves());
01510 callEnv.SetParam((Long64_t) first);
01511 callEnv.SetParam((Long64_t) nentries);
01512 callEnv.SetParam((Long_t) fInput);
01513 callEnv.SetParam((Long_t) fProgressStatus);
01514
01515
01516 dset->SetBit(TDSet::kValidityChecked);
01517 dset->ResetBit(TDSet::kSomeInvalid);
01518 }
01519
01520
01521 Long_t ret = 0;
01522 callEnv.Execute(ret);
01523 if ((fPacketizer = (TVirtualPacketizer *)ret) == 0) {
01524 Error("InitPacketizer", "cannot construct '%s'", cl->GetName());
01525 fExitStatus = kAborted;
01526 return -1;
01527 }
01528
01529 if (!fPacketizer->IsValid()) {
01530 Error("InitPacketizer",
01531 "instantiated packetizer object '%s' is invalid", cl->GetName());
01532 fExitStatus = kAborted;
01533 SafeDelete(fPacketizer);
01534 return -1;
01535 }
01536
01537
01538 if (!noData && dset->TestBit(TDSet::kMultiDSet)) {
01539 if ((listOfMissingFiles = (TList *) fInput->FindObject("MissingFiles"))) {
01540
01541 fInput->Remove(listOfMissingFiles);
01542 }
01543 }
01544
01545 if (!noData) {
01546
01547 TDSetElement *elem = 0;
01548 if (dset->TestBit(TDSet::kSomeInvalid)) {
01549 TIter nxe(dset->GetListOfElements());
01550 while ((elem = (TDSetElement *)nxe())) {
01551 if (!elem->GetValid()) {
01552 if (!listOfMissingFiles)
01553 listOfMissingFiles = new TList;
01554 listOfMissingFiles->Add(elem->GetFileInfo(dset->GetType()));
01555 dset->Remove(elem, kFALSE);
01556 }
01557 }
01558
01559 dset->ResetBit(TDSet::kSomeInvalid);
01560 }
01561
01562
01563 if (listOfMissingFiles && listOfMissingFiles->GetSize() > 0) {
01564 TIter missingFiles(listOfMissingFiles);
01565 TString msg;
01566 if (gDebug > 0) {
01567 TFileInfo *fi = 0;
01568 while ((fi = (TFileInfo *) missingFiles.Next())) {
01569 if (fi->GetCurrentUrl()) {
01570 msg = Form("File not found: %s - skipping!",
01571 fi->GetCurrentUrl()->GetUrl());
01572 } else {
01573 msg = Form("File not found: %s - skipping!", fi->GetName());
01574 }
01575 if (gProofServ) gProofServ->SendAsynMessage(msg.Data());
01576 }
01577 }
01578
01579 if (!GetOutput("MissingFiles")) {
01580 listOfMissingFiles->SetName("MissingFiles");
01581 AddOutputObject(listOfMissingFiles);
01582 }
01583 TStatus *tmpStatus = (TStatus *)GetOutput("PROOF_Status");
01584 if (!tmpStatus) {
01585 tmpStatus = new TStatus();
01586 AddOutputObject(tmpStatus);
01587 }
01588
01589 Int_t ngood = dset->GetListOfElements()->GetSize();
01590 Int_t nbad = listOfMissingFiles->GetSize();
01591 Double_t xb = Double_t(nbad) / Double_t(ngood + nbad);
01592 msg = Form(" About %.2f %c of the requested files (%d out of %d) were missing or unusable; details in"
01593 " the 'missingFiles' list", xb * 100., '%', nbad, nbad + ngood);
01594 tmpStatus->Add(msg.Data());
01595 msg = Form(" +++\n"
01596 " +++ About %.2f %c of the requested files (%d out of %d) are missing or unusable; details in"
01597 " the 'MissingFiles' list\n"
01598 " +++", xb * 100., '%', nbad, nbad + ngood);
01599 if (gProofServ) gProofServ->SendAsynMessage(msg.Data());
01600 } else {
01601
01602 SafeDelete(listOfMissingFiles);
01603 }
01604 }
01605
01606
01607 return 0;
01608 }
01609
01610
01611 Long64_t TProofPlayerRemote::Process(TDSet *dset, const char *selector_file,
01612 Option_t *option, Long64_t nentries,
01613 Long64_t first)
01614 {
01615
01616
01617
01618
01619
01620 PDB(kGlobal,1) Info("Process","Enter");
01621 fDSet = dset;
01622 fExitStatus = kFinished;
01623
01624 if (!fProgressStatus) {
01625 Error("Process", "No progress status");
01626 return -1;
01627 }
01628 fProgressStatus->Reset();
01629
01630
01631 if (!fOutput)
01632 fOutput = new TList;
01633 else
01634 fOutput->Clear();
01635
01636 SafeDelete(fFeedbackLists);
01637
01638 if (fProof->IsMaster()){
01639 TPerfStats::Start(fInput, fOutput);
01640 } else {
01641 TPerfStats::Setup(fInput);
01642 }
01643
01644 if(!SendSelector(selector_file)) return -1;
01645
01646 TMessage mesg(kPROOF_PROCESS);
01647 TString fn(gSystem->BaseName(selector_file));
01648
01649
01650 Bool_t sync = (fProof->GetQueryMode(option) == TProof::kSync);
01651
01652 TDSet *set = dset;
01653 if (fProof->IsMaster()) {
01654
01655 PDB(kPacketizer,1) Info("Process","Create Proxy TDSet");
01656 set = new TDSetProxy( dset->GetType(), dset->GetObjName(),
01657 dset->GetDirectory() );
01658 if (dset->TestBit(TDSet::kEmpty))
01659 set->SetBit(TDSet::kEmpty);
01660
01661 const char *datapack = (fProof->IsLite()) ? "TPacketizer" : "TPacketizerAdaptive";
01662 if (InitPacketizer(dset, nentries, first, "TPacketizerUnit", datapack) != 0) {
01663 Error("Process", "cannot init the packetizer");
01664 fExitStatus = kAborted;
01665 return -1;
01666 }
01667
01668
01669 first = 0;
01670
01671 if (!fProof->GetParameter("PROOF_MemLogFreq")){
01672 Long64_t memlogfreq = fPacketizer->GetTotalEntries()/(fProof->GetParallel()*100);
01673 memlogfreq = (memlogfreq > 0) ? memlogfreq : 1;
01674 fProof->SetParameter("PROOF_MemLogFreq", memlogfreq);
01675 }
01676
01677
01678 TString emsg;
01679 if (TProof::SendInputData(fQuery, fProof, emsg) != 0)
01680 Warning("Process", "could not forward input data: %s", emsg.Data());
01681
01682 } else {
01683
01684
01685 if (gEnv->Lookup("Proof.UseMergers") && !fInput->FindObject("PROOF_UseMergers")) {
01686 Int_t smg = gEnv->GetValue("Proof.UseMergers",-1);
01687 if (smg >= 0) fInput->Add(new TParameter<Int_t>("PROOF_UseMergers", smg));
01688 }
01689
01690
01691
01692 if (fOutputLists) {
01693 fOutputLists->Delete();
01694 delete fOutputLists;
01695 fOutputLists = 0;
01696 }
01697
01698 if (!sync) {
01699 gSystem->RedirectOutput(fProof->fLogFileName);
01700 Printf(" ");
01701 Info("Process","starting new query");
01702 }
01703
01704 SafeDelete(fSelector);
01705 fSelectorClass = 0;
01706 if (!(fSelector = TSelector::GetSelector(selector_file))) {
01707 if (!sync)
01708 gSystem->RedirectOutput(0);
01709 return -1;
01710 }
01711 fSelectorClass = fSelector->IsA();
01712 fSelector->SetInputList(fInput);
01713 fSelector->SetOption(option);
01714
01715 PDB(kLoop,1) Info("Process","Call Begin(0)");
01716 fSelector->Begin(0);
01717
01718
01719 fProof->SendInputDataFile();
01720
01721 if (!sync)
01722 gSystem->RedirectOutput(0);
01723 }
01724
01725 TCleanup clean(this);
01726 SetupFeedback();
01727
01728 TString opt = option;
01729
01730
01731 if (fProof->fProtocol < 13)
01732 dset->SetWriteV3(kTRUE);
01733
01734
01735 Long64_t num = (gProofServ && gProofServ->IsMaster() && gProofServ->IsParallel()) ? -1 : nentries;
01736 Long64_t fst = (gProofServ && gProofServ->IsMaster() && gProofServ->IsParallel()) ? -1 : first;
01737
01738
01739 TEntryList *enl = (!fProof->IsMaster()) ? dynamic_cast<TEntryList *>(set->GetEntryList())
01740 : (TEntryList *)0;
01741 TEventList *evl = (!fProof->IsMaster() && !enl) ? dynamic_cast<TEventList *>(set->GetEntryList())
01742 : (TEventList *)0;
01743 if (fProof->fProtocol > 14) {
01744 mesg << set << fn << fInput << opt << num << fst << evl << sync << enl;
01745 } else {
01746 mesg << set << fn << fInput << opt << num << fst << evl << sync;
01747 if (enl)
01748
01749 Warning("Process","entry lists not supported by the server");
01750 }
01751
01752
01753 fProof->ResetMergePrg();
01754
01755 PDB(kGlobal,1) Info("Process","Calling Broadcast");
01756 fProof->Broadcast(mesg);
01757
01758
01759 if (fProof->fProtocol < 13)
01760 dset->SetWriteV3(kFALSE);
01761
01762
01763 if (IsClient())
01764 fProof->fRedirLog = kTRUE;
01765
01766 if (!IsClient()){
01767
01768 Info("Process|Svc", "Start merging Memory information");
01769 }
01770
01771 if (!sync) {
01772 if (IsClient()) {
01773
01774
01775 PDB(kGlobal,1) Info("Process","Asynchronous processing:"
01776 " activating CollectInputFrom");
01777 fProof->Activate();
01778
01779
01780 fProof->Collect();
01781
01782 return fProof->fSeqNum;
01783
01784 } else {
01785 PDB(kGlobal,1) Info("Process","Calling Collect");
01786 fProof->Collect();
01787
01788 HandleTimer(0);
01789 StopFeedback();
01790
01791 return Finalize(kFALSE,sync);
01792 }
01793 } else {
01794
01795 PDB(kGlobal,1) Info("Process","Synchronous processing: calling Collect");
01796 fProof->Collect();
01797 if (!(fProof->IsSync())) {
01798
01799 Info("Process", "switching to the asynchronous mode ...");
01800 return fProof->fSeqNum;
01801 }
01802
01803
01804
01805 if (IsClient())
01806 fProof->fRedirLog = kFALSE;
01807
01808 if (!IsClient()) {
01809 HandleTimer(0);
01810
01811 if (fPacketizer && fQuery)
01812 fQuery->SetProcessInfo(0, 0., fPacketizer->GetBytesRead(),
01813 fPacketizer->GetInitTime(),
01814 fPacketizer->GetProcTime());
01815 }
01816 StopFeedback();
01817
01818 if (!IsClient() || GetExitStatus() != TProofPlayer::kAborted)
01819 return Finalize(kFALSE,sync);
01820 else
01821 return -1;
01822 }
01823 }
01824
01825
01826 Bool_t TProofPlayerRemote::MergeOutputFiles()
01827 {
01828
01829
01830 TList *rmList = 0;
01831 if (fMergeFiles) {
01832 TIter nxo(fOutput);
01833 TObject *o = 0;
01834 TProofOutputFile *pf = 0;
01835 while ((o = nxo())) {
01836 if ((pf = dynamic_cast<TProofOutputFile*>(o))) {
01837
01838 if (pf->IsMerge()) {
01839
01840
01841 TFileMerger *filemerger = pf->GetFileMerger();
01842 if (!filemerger) {
01843 Error("MergeOutputFiles", "file merger is null in TProofOutputFile! Protocol error?");
01844 pf->Print();
01845 continue;
01846 }
01847
01848 if (!filemerger->OutputFile(pf->GetOutputFileName())) {
01849 Error("MergeOutputFiles", "cannot open the output file");
01850 continue;
01851 }
01852
01853 if (!pf->IsMerged()) {
01854 TString fileLoc = TString::Format("%s/%s", pf->GetDir(), pf->GetFileName());
01855 filemerger->AddFile(fileLoc);
01856 }
01857
01858 if (!filemerger->Merge()) {
01859 Error("MergeOutputFiles", "cannot merge the output files");
01860 continue;
01861 }
01862
01863 TList *fileList = filemerger->GetMergeList();
01864 if (fileList) {
01865 TIter next(fileList);
01866 TObjString *url = 0;
01867 while((url = (TObjString*)next())) {
01868 gSystem->Unlink(url->GetString());
01869 }
01870 }
01871
01872 filemerger->Reset();
01873
01874 } else {
01875
01876
01877 TFileCollection *fc = pf->GetFileCollection();
01878 if (!fc) {
01879 Error("MergeOutputFiles", "file collection is null in TProofOutputFile! Protocol error?");
01880 pf->Print();
01881 continue;
01882 }
01883
01884
01885 fOutput->Add(fc);
01886
01887 pf->ResetFileCollection();
01888
01889 if (pf->IsRegister()) {
01890 TString opt;
01891 if ((pf->GetTypeOpt() & TProofOutputFile::kOverwrite)) opt += "O";
01892 if ((pf->GetTypeOpt() & TProofOutputFile::kVerify)) opt += "V";
01893 if (!fOutput->FindObject("PROOFSERV_RegisterDataSet"))
01894 fOutput->Add(new TNamed("PROOFSERV_RegisterDataSet", ""));
01895 TString tag = TString::Format("DATASET_%s", pf->GetTitle());
01896 fOutput->Add(new TNamed(tag, opt));
01897 }
01898
01899 fOutput->Remove(pf);
01900 if (!rmList) rmList = new TList;
01901 rmList->Add(pf);
01902 }
01903 }
01904 }
01905 }
01906
01907
01908 if (rmList && rmList->GetSize() > 0) {
01909 TIter nxo(rmList);
01910 TObject *o = 0;
01911 while((o = nxo())) {
01912 fOutput->Remove(o);
01913 }
01914 rmList->SetOwner(kTRUE);
01915 delete rmList;
01916 }
01917
01918
01919 return kTRUE;
01920 }
01921
01922
01923
01924 void TProofPlayerRemote::SetSelectorDataMembersFromOutputList()
01925 {
01926
01927
01928
01929 TOutputListSelectorDataMap* olsdm
01930 = TOutputListSelectorDataMap::FindInList(fOutput);
01931 if (!olsdm) {
01932 PDB(kOutput,1) Warning("SetSelectorDataMembersFromOutputList","Failed to find map object in output list!");
01933 return;
01934 }
01935
01936 olsdm->SetDataMembers(fSelector);
01937 }
01938
01939
01940 Long64_t TProofPlayerRemote::Finalize(Bool_t force, Bool_t sync)
01941 {
01942
01943
01944
01945
01946 if (IsClient()) {
01947 if (fOutputLists == 0) {
01948 if (force)
01949 if (fQuery)
01950 return fProof->Finalize(Form("%s:%s", fQuery->GetTitle(),
01951 fQuery->GetName()), force);
01952 } else {
01953
01954 PDB(kGlobal,1) Info("Finalize","Calling Merge Output to finalize the output list");
01955 MergeOutput();
01956 }
01957 }
01958
01959 Long64_t rv = 0;
01960 if (fProof->IsMaster()) {
01961 TPerfStats::Stop();
01962
01963 PDB(kOutput,1) Info("Finalize","Calling Merge Output");
01964
01965
01966 MergeOutput();
01967
01968
01969 MergeOutputFiles();
01970
01971 fOutput->SetOwner();
01972
01973
01974 if (fPacketizer) {
01975 TObject *pperf = (TObject *) fPacketizer->GetProgressPerf(kTRUE);
01976 if (pperf) fOutput->Add(pperf);
01977 TList *parms = fPacketizer->GetConfigParams(kTRUE);
01978 if (parms) {
01979 TIter nxo(parms);
01980 TObject *o = 0;
01981 while ((o = nxo())) fOutput->Add(o);
01982 }
01983
01984
01985
01986 TDSetElement *elem = 0;
01987 if (fPacketizer->GetFailedPackets()) {
01988 TString type = (fPacketizer->TestBit(TVirtualPacketizer::kIsTree)) ? "TTree" : "";
01989 TList *listOfMissingFiles = (TList *) fOutput->FindObject("MissingFiles");
01990 if (!listOfMissingFiles) {
01991 listOfMissingFiles = new TList;
01992 listOfMissingFiles->SetName("MissingFiles");
01993 }
01994 TIter nxe(fPacketizer->GetFailedPackets());
01995 while ((elem = (TDSetElement *)nxe()))
01996 listOfMissingFiles->Add(elem->GetFileInfo(type));
01997 if (!fOutput->FindObject(listOfMissingFiles)) fOutput->Add(listOfMissingFiles);
01998 }
01999 }
02000
02001 SafeDelete(fSelector);
02002 } else {
02003 if (fExitStatus != kAborted) {
02004
02005 if (!sync) {
02006
02007
02008
02009
02010 if (ReinitSelector(fQuery) == -1) {
02011 Info("Finalize", "problems reinitializing selector \"%s\"",
02012 fQuery->GetSelecImp()->GetName());
02013 return -1;
02014 }
02015 }
02016
02017 if (fPacketizer)
02018 if (TList *failedPackets = fPacketizer->GetFailedPackets()) {
02019 fPacketizer->SetFailedPackets(0);
02020 failedPackets->SetName("FailedPackets");
02021 AddOutputObject(failedPackets);
02022
02023 TStatus *status = (TStatus *)GetOutput("PROOF_Status");
02024 if (!status) {
02025 status = new TStatus();
02026 AddOutputObject(status);
02027 }
02028 status->Add("Some packets were not processed! Check the the"
02029 " 'FailedPackets' list in the output list");
02030 }
02031
02032
02033 fSelector->SetInputList(fInput);
02034
02035 TIter next(fOutput);
02036 TList *output = fSelector->GetOutputList();
02037 while(TObject* obj = next()) {
02038 if (fProof->IsParallel() || DrawCanvas(obj) == 1)
02039
02040
02041 output->Add(obj);
02042 }
02043
02044 SetSelectorDataMembersFromOutputList();
02045
02046 PDB(kLoop,1) Info("Finalize","Call Terminate()");
02047 fOutput->Clear("nodelete");
02048 fSelector->Terminate();
02049
02050 rv = fSelector->GetStatus();
02051
02052
02053 TIter it(output);
02054 while(TObject* o = it()) {
02055 fOutput->Add(o);
02056 }
02057
02058
02059 if (fQuery) {
02060 fQuery->SetOutputList(fOutput);
02061
02062 fQuery->SetFinalized();
02063 } else {
02064 Warning("Finalize","current TQueryResult object is undefined!");
02065 }
02066
02067
02068
02069
02070 output->SetOwner(kFALSE);
02071 SafeDelete(fSelector);
02072
02073
02074
02075 fOutput->SetOwner(kFALSE);
02076 SafeDelete(fOutput);
02077 }
02078 }
02079 PDB(kGlobal,1) Info("Process","exit");
02080
02081 if (!IsClient()) {
02082 Info("Finalize", "finalization on %s finished", gProofServ->GetPrefix());
02083 }
02084 fProof->FinalizationDone();
02085
02086 return rv;
02087 }
02088
02089
02090 Long64_t TProofPlayerRemote::Finalize(TQueryResult *qr)
02091 {
02092
02093
02094 PDB(kGlobal,1) Info("Finalize(TQueryResult *)","Enter");
02095
02096 if (!IsClient()) {
02097 Info("Finalize(TQueryResult *)",
02098 "method to be executed only on the clients");
02099 return -1;
02100 }
02101
02102 if (!qr) {
02103 Info("Finalize(TQueryResult *)", "query undefined");
02104 return -1;
02105 }
02106
02107 if (qr->IsFinalized()) {
02108 Info("Finalize(TQueryResult *)", "query already finalized");
02109 return -1;
02110 }
02111
02112
02113 if (!fOutput)
02114 fOutput = new TList;
02115 else
02116 fOutput->Clear();
02117
02118
02119 if (fOutputLists) {
02120 fOutputLists->Delete();
02121 delete fOutputLists;
02122 fOutputLists = 0;
02123 }
02124
02125
02126 gSystem->RedirectOutput(fProof->fLogFileName);
02127
02128
02129 TList *tmp = (TList *) qr->GetOutputList();
02130 if (!tmp) {
02131 gSystem->RedirectOutput(0);
02132 Info("Finalize(TQueryResult *)", "ouputlist is empty");
02133 return -1;
02134 }
02135 TList *out = fOutput;
02136 if (fProof->fProtocol < 11)
02137 out = new TList;
02138 TIter nxo(tmp);
02139 TObject *o = 0;
02140 while ((o = nxo()))
02141 out->Add(o->Clone());
02142
02143
02144 if (fProof->fProtocol < 11) {
02145 out->SetOwner();
02146 StoreOutput(out);
02147 }
02148 gSystem->RedirectOutput(0);
02149
02150 SetSelectorDataMembersFromOutputList();
02151
02152
02153 SetCurrentQuery(qr);
02154 Long64_t rc = Finalize();
02155 RestorePreviousQuery();
02156
02157 return rc;
02158 }
02159
02160
02161 Bool_t TProofPlayerRemote::SendSelector(const char* selector_file)
02162 {
02163
02164
02165
02166 if (!selector_file) {
02167 Info("SendSelector", "Invalid input: selector (file) name undefined");
02168 return kFALSE;
02169 }
02170
02171 if (!strchr(gSystem->BaseName(selector_file), '.')) {
02172 if (gDebug > 1)
02173 Info("SendSelector", "selector name '%s' does not contain a '.':"
02174 " nothing to send, it will be loaded from a library", selector_file);
02175 return kTRUE;
02176 }
02177
02178
02179 TString selec = selector_file;
02180 TString aclicMode;
02181 TString arguments;
02182 TString io;
02183 selec = gSystem->SplitAclicMode(selec, aclicMode, arguments, io);
02184
02185
02186 gSystem->ExpandPathName(selec);
02187
02188
02189 TString mp(TROOT::GetMacroPath());
02190 TString np(gSystem->DirName(selec));
02191 if (!np.IsNull()) {
02192 np += ":";
02193 if (!mp.BeginsWith(np) && !mp.Contains(":"+np)) {
02194 Int_t ip = (mp.BeginsWith(".:")) ? 2 : 0;
02195 mp.Insert(ip, np);
02196 TROOT::SetMacroPath(mp);
02197 if (gDebug > 0)
02198 Info("SendSelector", "macro path set to '%s'", TROOT::GetMacroPath());
02199 }
02200 }
02201
02202
02203 TString header = selec;
02204 header.Remove(header.Last('.'));
02205 header += ".h";
02206 if (gSystem->AccessPathName(header, kReadPermission)) {
02207 TString h = header;
02208 header.Remove(header.Last('.'));
02209 header += ".hh";
02210 if (gSystem->AccessPathName(header, kReadPermission)) {
02211 Info("SendSelector",
02212 "header file not found: tried: %s %s", h.Data(), header.Data());
02213 return kFALSE;
02214 }
02215 }
02216
02217
02218 if (fProof->SendFile(selec, (TProof::kBinary | TProof::kForward | TProof::kCp | TProof::kCpBin)) == -1) {
02219 Info("SendSelector", "problems sending implementation file %s", selec.Data());
02220 return kFALSE;
02221 }
02222 if (fProof->SendFile(header, (TProof::kBinary | TProof::kForward | TProof::kCp)) == -1) {
02223 Info("SendSelector", "problems sending header file %s", header.Data());
02224 return kFALSE;
02225 }
02226
02227 return kTRUE;
02228 }
02229
02230
02231 void TProofPlayerRemote::MergeOutput()
02232 {
02233
02234
02235 PDB(kOutput,1) Info("MergeOutput","Enter");
02236
02237 if (fOutputLists == 0) {
02238 PDB(kOutput,1) Info("MergeOutput","Leave (no output)");
02239 return;
02240 }
02241
02242 TIter next(fOutputLists);
02243
02244 TList *list;
02245 while ( (list = (TList *) next()) ) {
02246
02247 TObject *obj = fOutput->FindObject(list->GetName());
02248
02249 if (obj == 0) {
02250 obj = list->First();
02251 list->Remove(obj);
02252 fOutput->Add(obj);
02253 }
02254
02255 if ( list->IsEmpty() ) continue;
02256
02257 TMethodCall callEnv;
02258 if (obj->IsA())
02259 callEnv.InitWithPrototype(obj->IsA(), "Merge", "TCollection*");
02260 if (callEnv.IsValid()) {
02261 callEnv.SetParam((Long_t) list);
02262 callEnv.Execute(obj);
02263 } else {
02264
02265 while ( (obj = list->First()) ) {
02266 fOutput->Add(obj);
02267 list->Remove(obj);
02268 }
02269 }
02270 }
02271
02272 SafeDelete(fOutputLists);
02273
02274 PDB(kOutput,1) Info("MergeOutput","Leave (%d object(s))", fOutput->GetSize());
02275 }
02276
02277
02278 void TProofPlayerRemote::Progress(Long64_t total, Long64_t processed)
02279 {
02280
02281
02282 if (IsClient()) {
02283 fProof->Progress(total, processed);
02284 } else {
02285
02286 TMessage m(kPROOF_PROGRESS);
02287 m << total << processed;
02288 gProofServ->GetSocket()->Send(m);
02289 }
02290 }
02291
02292
02293 void TProofPlayerRemote::Progress(Long64_t total, Long64_t processed,
02294 Long64_t bytesread,
02295 Float_t initTime, Float_t procTime,
02296 Float_t evtrti, Float_t mbrti)
02297 {
02298
02299
02300 PDB(kGlobal,1)
02301 Info("Progress","%lld %lld %lld %f %f %f %f", total, processed, bytesread,
02302 initTime, procTime, evtrti, mbrti);
02303
02304 if (IsClient()) {
02305 fProof->Progress(total, processed, bytesread, initTime, procTime, evtrti, mbrti);
02306 } else {
02307
02308 TMessage m(kPROOF_PROGRESS);
02309 m << total << processed << bytesread << initTime << procTime << evtrti << mbrti;
02310 gProofServ->GetSocket()->Send(m);
02311 }
02312 }
02313
02314
02315 void TProofPlayerRemote::Progress(TProofProgressInfo *pi)
02316 {
02317
02318
02319 if (pi) {
02320 PDB(kGlobal,1)
02321 Info("Progress","%lld %lld %lld %f %f %f %f %d %f", pi->fTotal, pi->fProcessed, pi->fBytesRead,
02322 pi->fInitTime, pi->fProcTime, pi->fEvtRateI, pi->fMBRateI,
02323 pi->fActWorkers, pi->fEffSessions);
02324
02325 if (IsClient()) {
02326 fProof->Progress(pi->fTotal, pi->fProcessed, pi->fBytesRead,
02327 pi->fInitTime, pi->fProcTime,
02328 pi->fEvtRateI, pi->fMBRateI,
02329 pi->fActWorkers, pi->fTotSessions, pi->fEffSessions);
02330 } else {
02331
02332 TMessage m(kPROOF_PROGRESS);
02333 m << pi;
02334 gProofServ->GetSocket()->Send(m);
02335 }
02336 } else {
02337 Warning("Progress","TProofProgressInfo object undefined!");
02338 }
02339 }
02340
02341
02342
02343 void TProofPlayerRemote::Feedback(TList *objs)
02344 {
02345
02346
02347 fProof->Feedback(objs);
02348 }
02349
02350
02351 void TProofPlayerRemote::StopProcess(Bool_t abort, Int_t)
02352 {
02353
02354
02355 if (fPacketizer != 0)
02356 fPacketizer->StopProcess(abort);
02357 if (abort == kTRUE)
02358 fExitStatus = kAborted;
02359 else
02360 fExitStatus = kStopped;
02361 }
02362
02363
02364 Int_t TProofPlayerRemote::AddOutputObject(TObject *obj)
02365 {
02366
02367
02368
02369
02370
02371
02372
02373
02374 PDB(kOutput,1)
02375 Info("AddOutputObject","Enter: %p (%s)", obj, obj ? obj->ClassName() : "undef");
02376
02377
02378 if (!obj) {
02379 PDB(kOutput,1) Info("AddOutputObject","Invalid input (obj == 0x0)");
02380 return -1;
02381 }
02382
02383
02384 if (!fOutput)
02385 fOutput = new TList;
02386
02387
02388 Bool_t merged = kTRUE;
02389
02390
02391 TList *elists = dynamic_cast<TList *> (obj);
02392 if (elists && !strcmp(elists->GetName(), "PROOF_EventListsList")) {
02393
02394
02395
02396 TEventList *evlist = new TEventList("PROOF_EventList");
02397
02398
02399 TIter nxevl(elists);
02400 TEventList *evl = 0;
02401 while ((evl = dynamic_cast<TEventList *> (nxevl()))) {
02402
02403
02404
02405 TIter nxelem(fDSet->GetListOfElements());
02406 TDSetElement *elem = 0;
02407 while ((elem = dynamic_cast<TDSetElement *> (nxelem()))) {
02408 if (!strcmp(elem->GetFileName(), evl->GetName()))
02409 break;
02410 }
02411 if (!elem) {
02412 Error("AddOutputObject", "Found an event list for %s, but no object with"
02413 " the same name in the TDSet", evl->GetName());
02414 continue;
02415 }
02416 Long64_t offset = elem->GetTDSetOffset();
02417
02418
02419 Long64_t *arr = evl->GetList();
02420 Int_t num = evl->GetN();
02421 if (arr && offset > 0)
02422 for (Int_t i = 0; i < num; i++)
02423 arr[i] += offset;
02424
02425
02426 evlist->Add(evl);
02427 }
02428
02429
02430 SetLastMergingMsg(evlist);
02431 Incorporate(evlist, fOutput, merged);
02432 NotifyMemory(evlist);
02433
02434
02435 if (merged)
02436 SafeDelete(evlist);
02437
02438
02439
02440 return 1;
02441 }
02442
02443
02444 TProofOutputFile *pf = dynamic_cast<TProofOutputFile*>(obj);
02445 if (pf) {
02446 fMergeFiles = kTRUE;
02447 if (!IsClient()) {
02448 if (pf->IsMerge()) {
02449
02450 if (strlen(pf->GetOutputFileName()) <= 0) {
02451 TString of(Form("root://%s", gSystem->HostName()));
02452 if (gSystem->Getenv("XRDPORT")) {
02453 TString sp(gSystem->Getenv("XRDPORT"));
02454 if (sp.IsDigit())
02455 of += Form(":%s", sp.Data());
02456 }
02457 TString sessionPath(gProofServ->GetSessionDir());
02458
02459 TString pfx = gEnv->GetValue("Path.Localroot","");
02460 if (!pfx.IsNull())
02461 sessionPath.Remove(0, pfx.Length());
02462 of += Form("/%s/%s", sessionPath.Data(), pf->GetFileName());
02463 pf->SetOutputFileName(of);
02464 }
02465
02466 if (gDebug > 0)
02467 pf->Print();
02468 }
02469 } else {
02470
02471 Printf("Output file: %s", pf->GetOutputFileName());
02472 }
02473 }
02474
02475
02476 SetLastMergingMsg(obj);
02477 Incorporate(obj, fOutput, merged);
02478 NotifyMemory(obj);
02479
02480
02481 return (merged ? 1 : 0);
02482 }
02483
02484
02485 void TProofPlayerRemote::RedirectOutput(Bool_t on)
02486 {
02487
02488
02489 if (on && fProof && fProof->fLogFileW) {
02490 TProofServ::SetErrorHandlerFile(fProof->fLogFileW);
02491 fErrorHandler = SetErrorHandler(TProofServ::ErrorHandler);
02492 } else if (!on) {
02493 if (fErrorHandler) {
02494 TProofServ::SetErrorHandlerFile(0);
02495 SetErrorHandler(fErrorHandler);
02496 }
02497 }
02498 }
02499
02500
02501 void TProofPlayerRemote::AddOutput(TList *out)
02502 {
02503
02504
02505
02506
02507
02508 PDB(kOutput,1) Info("AddOutput","Enter");
02509
02510
02511 if (!out) {
02512 PDB(kOutput,1) Info("AddOutput","Invalid input (out == 0x0)");
02513 return;
02514 }
02515
02516
02517 if (!fOutput)
02518 fOutput = new TList;
02519
02520
02521 Bool_t merged = kTRUE;
02522 TList *elists = dynamic_cast<TList *> (out->FindObject("PROOF_EventListsList"));
02523 if (elists) {
02524
02525
02526
02527 TEventList *evlist = new TEventList("PROOF_EventList");
02528
02529
02530 TIter nxevl(elists);
02531 TEventList *evl = 0;
02532 while ((evl = dynamic_cast<TEventList *> (nxevl()))) {
02533
02534
02535
02536 TIter nxelem(fDSet->GetListOfElements());
02537 TDSetElement *elem = 0;
02538 while ((elem = dynamic_cast<TDSetElement *> (nxelem()))) {
02539 if (!strcmp(elem->GetFileName(), evl->GetName()))
02540 break;
02541 }
02542 if (!elem) {
02543 Error("AddOutput", "Found an event list for %s, but no object with"
02544 " the same name in the TDSet", evl->GetName());
02545 continue;
02546 }
02547 Long64_t offset = elem->GetTDSetOffset();
02548
02549
02550 Long64_t *arr = evl->GetList();
02551 Int_t num = evl->GetN();
02552 if (arr && offset > 0)
02553 for (Int_t i = 0; i < num; i++)
02554 arr[i] += offset;
02555
02556
02557 evlist->Add(evl);
02558 }
02559
02560
02561
02562 out->Remove(elists);
02563 delete elists;
02564
02565
02566 SetLastMergingMsg(evlist);
02567 Incorporate(evlist, fOutput, merged);
02568 NotifyMemory(evlist);
02569 }
02570
02571
02572 TIter nxo(out);
02573 TObject *obj = 0;
02574 while ((obj = nxo())) {
02575 SetLastMergingMsg(obj);
02576 Incorporate(obj, fOutput, merged);
02577
02578
02579 if (!merged)
02580 out->Remove(obj);
02581 NotifyMemory(obj);
02582 }
02583
02584
02585 return;
02586 }
02587
02588
02589 void TProofPlayerRemote::NotifyMemory(TObject *obj)
02590 {
02591
02592
02593
02594 if (fProof && (!IsClient() || fProof->IsLite())){
02595 ProcInfo_t pi;
02596 if (!gSystem->GetProcInfo(&pi)){
02597
02598
02599 RedirectOutput(fProof->IsLite());
02600 Info("NotifyMemory|Svc", "Memory %ld virtual %ld resident after merging object %s",
02601 pi.fMemVirtual, pi.fMemResident, obj->GetName());
02602 RedirectOutput(0);
02603 }
02604 }
02605 }
02606
02607
02608 void TProofPlayerRemote::SetLastMergingMsg(TObject *obj)
02609 {
02610
02611
02612 TString lastMsg = TString::Format("while merging object '%s'", obj->GetName());
02613 TProofServ::SetLastMsg(lastMsg);
02614 }
02615
02616
02617 Int_t TProofPlayerRemote::Incorporate(TObject *newobj, TList *outlist, Bool_t &merged)
02618 {
02619
02620
02621
02622
02623
02624
02625
02626
02627
02628 merged = kTRUE;
02629
02630 PDB(kOutput,1)
02631 Info("Incorporate", "enter: obj: %p (%s), list: %p",
02632 newobj, newobj ? newobj->ClassName() : "undef", outlist);
02633
02634
02635 if (!newobj || !outlist) {
02636 Error("Incorporate","Invalid inputs: obj: %p, list: %p", newobj, outlist);
02637 return -1;
02638 }
02639
02640
02641 Bool_t specialH =
02642 (!fProof || !fProof->TestBit(TProof::kIsClient) || fProof->IsLite()) ? kTRUE : kFALSE;
02643 if (specialH && newobj->InheritsFrom(TH1::Class())) {
02644 if (!HandleHistogram(newobj)) {
02645 PDB(kOutput,1) Info("Incorporate", "histogram object '%s' added to the"
02646 " appropriate list for delayed merging", newobj->GetName());
02647 merged = kFALSE;
02648 return 0;
02649 }
02650 }
02651
02652
02653 TObject *obj = outlist->FindObject(newobj->GetName());
02654
02655
02656 if (!obj) {
02657 outlist->Add(newobj);
02658 merged = kFALSE;
02659
02660 return 0;
02661 }
02662
02663
02664 TMethodCall callEnv;
02665 if (obj->IsA())
02666 callEnv.InitWithPrototype(obj->IsA(), "Merge", "TCollection*");
02667 if (callEnv.IsValid()) {
02668
02669 static TList *xlist = new TList;
02670 xlist->Add(newobj);
02671
02672 callEnv.SetParam((Long_t) xlist);
02673 callEnv.Execute(obj);
02674
02675 xlist->Clear();
02676 } else {
02677
02678 outlist->Add(newobj);
02679 merged = kFALSE;
02680 }
02681
02682
02683 return 0;
02684 }
02685
02686
02687 TObject *TProofPlayerRemote::HandleHistogram(TObject *obj)
02688 {
02689
02690
02691 TH1 *h = dynamic_cast<TH1 *>(obj);
02692 if (!h) {
02693
02694 return obj;
02695 }
02696
02697
02698 Bool_t tobebinned = (h->GetBuffer()) ? kTRUE : kFALSE;
02699
02700
02701 Int_t nent = h->GetBufferLength();
02702 PDB(kOutput,2) Info("HandleHistogram", "h:%s ent:%d, buffer size: %d",
02703 h->GetName(), nent, h->GetBufferSize());
02704
02705
02706 TList *list = 0;
02707 if (!fOutputLists) {
02708 PDB(kOutput,2) Info("HandleHistogram", "create fOutputLists");
02709 fOutputLists = new TList;
02710 fOutputLists->SetOwner();
02711 }
02712 list = (TList *) fOutputLists->FindObject(h->GetName());
02713
02714 TH1 *href = 0;
02715 if (tobebinned) {
02716
02717
02718
02719
02720 if (!list) {
02721
02722 list = new TList;
02723 list->SetName(h->GetName());
02724 list->SetOwner();
02725 fOutputLists->Add(list);
02726
02727 if (fOutput && (href = (TH1 *) fOutput->FindObject(h->GetName()))) {
02728 fOutput->Remove(href);
02729 list->Add(href);
02730 }
02731 }
02732 TIter nxh(list);
02733 while ((href = (TH1 *) nxh())) {
02734 if (href->GetBuffer() && href->GetBufferLength() < nent) break;
02735 }
02736 if (href) {
02737 list->AddBefore(href, h);
02738 } else {
02739 list->Add(h);
02740 }
02741
02742 return (TObject *)0;
02743
02744 } else {
02745
02746 if (list) {
02747 TIter nxh(list);
02748 while ((href = (TH1 *) nxh())) {
02749 if (href->GetBuffer() || href->GetEntries() < nent) break;
02750 }
02751 if (href) {
02752 list->AddBefore(href, h);
02753 } else {
02754 list->Add(h);
02755 }
02756
02757 return (TObject *)0;
02758
02759 } else {
02760
02761 Int_t hsz = h->GetNbinsX() * h->GetNbinsY() * h->GetNbinsZ();
02762 if (gProofServ && hsz > gProofServ->GetMsgSizeHWM()) {
02763
02764 return obj;
02765 } else {
02766
02767
02768 list = new TList;
02769 list->SetName(h->GetName());
02770 list->SetOwner();
02771 fOutputLists->Add(list);
02772 list->Add(h);
02773
02774 return (TObject *)0;
02775 }
02776 }
02777 }
02778 PDB(kOutput,1) Info("HandleHistogram", "leaving");
02779 }
02780
02781
02782 void TProofPlayerRemote::StoreOutput(TList *out)
02783 {
02784
02785
02786 PDB(kOutput,1) Info("StoreOutput","Enter");
02787
02788 if ( out == 0 ) {
02789 PDB(kOutput,1) Info("StoreOutput","Leave (empty)");
02790 return;
02791 }
02792
02793 TIter next(out);
02794 out->SetOwner(kFALSE);
02795
02796 if (fOutputLists == 0) {
02797 PDB(kOutput,2) Info("StoreOutput","Create fOutputLists");
02798 fOutputLists = new TList;
02799 fOutputLists->SetOwner();
02800 }
02801
02802 TList* lists = dynamic_cast<TList*> (out->FindObject("PROOF_EventListsList"));
02803 if (lists) {
02804 out->Remove(lists);
02805 TEventList *mainList = new TEventList("PROOF_EventList");
02806 out->Add(mainList);
02807 TIter it(lists);
02808 TEventList *aList;
02809 while ( (aList = dynamic_cast<TEventList*> (it())) ) {
02810
02811 TIter nxe(fDSet->GetListOfElements());
02812 TDSetElement *elem;
02813 while ( (elem = dynamic_cast<TDSetElement*> (nxe())) ) {
02814 if (strcmp(elem->GetFileName(), aList->GetName()) == 0)
02815 break;
02816 }
02817 if (!elem) {
02818 Error("StoreOutput", "found the EventList for %s, but no object with that name "
02819 "in the TDSet", aList->GetName());
02820 continue;
02821 }
02822 Long64_t offset = elem->GetTDSetOffset();
02823
02824
02825 Long64_t *arr = aList->GetList();
02826 Int_t num = aList->GetN();
02827 if (arr && offset)
02828 for (int i = 0; i < num; i++)
02829 arr[i] += offset;
02830
02831 mainList->Add(aList);
02832 }
02833 delete lists;
02834 }
02835
02836 TObject *obj;
02837 while( (obj = next()) ) {
02838 PDB(kOutput,2) Info("StoreOutput","find list for '%s'", obj->GetName() );
02839
02840 TList *list = (TList *) fOutputLists->FindObject( obj->GetName() );
02841 if ( list == 0 ) {
02842 PDB(kOutput,2) Info("StoreOutput", "list for '%s' not found (creating)", obj->GetName());
02843 list = new TList;
02844 list->SetName( obj->GetName() );
02845 list->SetOwner();
02846 fOutputLists->Add( list );
02847 }
02848 list->Add( obj );
02849 }
02850
02851 delete out;
02852 PDB(kOutput,1) Info("StoreOutput", "leave");
02853 }
02854
02855
02856 TList *TProofPlayerRemote::MergeFeedback()
02857 {
02858
02859
02860 PDB(kFeedback,1)
02861 Info("MergeFeedback","Enter");
02862
02863 if ( fFeedbackLists == 0 ) {
02864 PDB(kFeedback,1)
02865 Info("MergeFeedback","Leave (no output)");
02866 return 0;
02867 }
02868
02869 TList *fb = new TList;
02870 fb->SetOwner();
02871
02872 TIter next(fFeedbackLists);
02873
02874 TMap *map;
02875 while ( (map = (TMap*) next()) ) {
02876
02877 PDB(kFeedback,2)
02878 Info("MergeFeedback", "map %s size: %d", map->GetName(), map->GetSize());
02879
02880
02881
02882 TList *list = new TList;
02883 TIter keys(map);
02884
02885 #ifndef R__TH1MERGEFIXED
02886 Int_t nbmx = -1;
02887 TObject *oref = 0;
02888 #endif
02889 while ( TObject *key = keys() ) {
02890 TObject *o = map->GetValue(key);
02891 TH1 *h = dynamic_cast<TH1 *>(o);
02892 #ifndef R__TH1MERGEFIXED
02893
02894
02895
02896
02897 if (h && !strncmp(o->GetName(),"PROOF_",6)) {
02898 if (h->GetNbinsX() > nbmx) {
02899 nbmx= h->GetNbinsX();
02900 oref = o;
02901 }
02902 }
02903 #endif
02904 if (h) {
02905 TIter nxh(list);
02906 TH1 *href= 0;
02907 while ((href = (TH1 *)nxh())) {
02908 if (h->GetBuffer()) {
02909 if (href->GetBuffer() && href->GetBufferLength() < h->GetBufferLength()) break;
02910 } else {
02911 if (href->GetBuffer() || href->GetEntries() < h->GetEntries()) break;
02912 }
02913 }
02914 if (href) {
02915 list->AddBefore(href, h);
02916 } else {
02917 list->Add(h);
02918 }
02919 } else {
02920 list->Add(o);
02921 }
02922 }
02923
02924
02925 #ifdef R__TH1MERGEFIXED
02926 TObject *obj = list->First();
02927 #else
02928 TObject *obj = (oref) ? oref : list->First();
02929 #endif
02930 list->Remove(obj);
02931 obj = obj->Clone();
02932 fb->Add(obj);
02933
02934 if ( list->IsEmpty() ) {
02935 delete list;
02936 continue;
02937 }
02938
02939
02940 TMethodCall callEnv;
02941 if (obj->IsA())
02942 callEnv.InitWithPrototype(obj->IsA(), "Merge", "TCollection*");
02943 if (callEnv.IsValid()) {
02944 callEnv.SetParam((Long_t) list);
02945 callEnv.Execute(obj);
02946 } else {
02947
02948 while ( (obj = list->First()) ) {
02949 fb->Add(obj->Clone());
02950 list->Remove(obj);
02951 }
02952 }
02953
02954 delete list;
02955 }
02956
02957 PDB(kFeedback,1)
02958 Info("MergeFeedback","Leave (%d object(s))", fb->GetSize());
02959
02960 return fb;
02961 }
02962
02963
02964 void TProofPlayerRemote::StoreFeedback(TObject *slave, TList *out)
02965 {
02966
02967
02968 PDB(kFeedback,1)
02969 Info("StoreFeedback","Enter");
02970
02971 if ( out == 0 ) {
02972 PDB(kFeedback,1)
02973 Info("StoreFeedback","Leave (empty)");
02974 return;
02975 }
02976
02977 if ( IsClient() ) {
02978
02979 Feedback(out);
02980 delete out;
02981 return;
02982 }
02983
02984 if (fFeedbackLists == 0) {
02985 PDB(kFeedback,2) Info("StoreFeedback","Create fFeedbackLists");
02986 fFeedbackLists = new TList;
02987 fFeedbackLists->SetOwner();
02988 }
02989
02990 TIter next(out);
02991 out->SetOwner(kFALSE);
02992
02993 const char *ord = ((TSlave*) slave)->GetOrdinal();
02994
02995 TObject *obj;
02996 while( (obj = next()) ) {
02997 PDB(kFeedback,2)
02998 Info("StoreFeedback","%s: Find '%s'", ord, obj->GetName() );
02999 TMap *map = (TMap*) fFeedbackLists->FindObject(obj->GetName());
03000 if ( map == 0 ) {
03001 PDB(kFeedback,2)
03002 Info("StoreFeedback", "%s: map for '%s' not found (creating)", ord, obj->GetName());
03003
03004 map = new TMap;
03005 map->SetName(obj->GetName());
03006 fFeedbackLists->Add(map);
03007 } else {
03008 PDB(kFeedback,2)
03009 Info("StoreFeedback","%s: removing previous value", ord);
03010 if (map->GetValue(slave))
03011 delete map->GetValue(slave);
03012 map->Remove(slave);
03013 }
03014 map->Add(slave, obj);
03015 PDB(kFeedback,2)
03016 Info("StoreFeedback","%s: %s, size: %d", ord, obj->GetName(), map->GetSize());
03017 }
03018
03019 delete out;
03020 PDB(kFeedback,1)
03021 Info("StoreFeedback","Leave");
03022 }
03023
03024
03025 void TProofPlayerRemote::SetupFeedback()
03026 {
03027
03028
03029 if (IsClient()) return;
03030
03031 fFeedback = (TList*) fInput->FindObject("FeedbackList");
03032
03033 PDB(kFeedback,1) Info("SetupFeedback","\"FeedbackList\" %sfound",
03034 fFeedback == 0 ? "NOT ":"");
03035
03036 if (fFeedback == 0 || fFeedback->GetSize() == 0) return;
03037
03038
03039 SafeDelete(fFeedbackTimer);
03040 fFeedbackPeriod = 2000;
03041 TProof::GetParameter(fInput, "PROOF_FeedbackPeriod", fFeedbackPeriod);
03042 fFeedbackTimer = new TTimer;
03043 fFeedbackTimer->SetObject(this);
03044 fFeedbackTimer->Start(fFeedbackPeriod, kTRUE);
03045 }
03046
03047
03048 void TProofPlayerRemote::StopFeedback()
03049 {
03050
03051
03052 if (fFeedbackTimer == 0) return;
03053
03054 PDB(kFeedback,1) Info("StopFeedback","Stop Timer");
03055
03056 SafeDelete(fFeedbackTimer);
03057 }
03058
03059
03060 Bool_t TProofPlayerRemote::HandleTimer(TTimer *)
03061 {
03062
03063
03064 PDB(kFeedback,2) Info("HandleTimer","Entry");
03065
03066 if (fFeedbackTimer == 0) return kFALSE;
03067
03068
03069
03070 TList *fb = new TList;
03071 fb->SetOwner();
03072
03073 TIter next(fFeedback);
03074 while( TObjString *name = (TObjString*) next() ) {
03075 TObject *o = fOutput->FindObject(name->GetName());
03076 if (o != 0) {
03077 fb->Add(o->Clone());
03078
03079 TMap *m = 0;
03080 if (fFeedbackLists &&
03081 (m = (TMap *) fFeedbackLists->FindObject(name->GetName()))) {
03082 fFeedbackLists->Remove(m);
03083 m->DeleteValues();
03084 delete m;
03085 }
03086 }
03087 }
03088
03089 if (fb->GetSize() > 0) {
03090 StoreFeedback(this, fb);
03091 } else {
03092 delete fb;
03093 }
03094
03095 if (fFeedbackLists == 0) {
03096 fFeedbackTimer->Start(fFeedbackPeriod, kTRUE);
03097 return kFALSE;
03098 }
03099
03100 fb = MergeFeedback();
03101
03102 PDB(kFeedback,2) Info("HandleTimer","Sending %d objects", fb->GetSize());
03103
03104 TMessage m(kPROOF_FEEDBACK);
03105 m << fb;
03106
03107
03108 gProofServ->GetSocket()->Send(m);
03109
03110 delete fb;
03111
03112 fFeedbackTimer->Start(fFeedbackPeriod, kTRUE);
03113
03114 return kFALSE;
03115 }
03116
03117
03118 TDSetElement *TProofPlayerRemote::GetNextPacket(TSlave *slave, TMessage *r)
03119 {
03120
03121
03122
03123 SetInitTime();
03124
03125 TDSetElement *e = fPacketizer->GetNextPacket( slave, r );
03126
03127 if (e == 0) {
03128 PDB(kPacketizer,2) Info("GetNextPacket","%s: done!", slave->GetOrdinal());
03129 } else if (e == (TDSetElement*) -1) {
03130 PDB(kPacketizer,2) Info("GetNextPacket","%s: waiting ...", slave->GetOrdinal());
03131 } else {
03132 PDB(kPacketizer,2)
03133 Info("GetNextPacket","%s (%s): '%s' '%s' '%s' %lld %lld",
03134 slave->GetOrdinal(), slave->GetName(), e->GetFileName(),
03135 e->GetDirectory(), e->GetObjName(), e->GetFirst(), e->GetNum());
03136 }
03137
03138 return e;
03139 }
03140
03141
03142 Bool_t TProofPlayerRemote::IsClient() const
03143 {
03144
03145
03146 return fProof ? fProof->TestBit(TProof::kIsClient) : kFALSE;
03147 }
03148
03149
03150 Long64_t TProofPlayerRemote::DrawSelect(TDSet *set, const char *varexp,
03151 const char *selection, Option_t *option,
03152 Long64_t nentries, Long64_t firstentry)
03153 {
03154
03155
03156
03157 if (!fgDrawInputPars) {
03158 fgDrawInputPars = new THashList;
03159 fgDrawInputPars->Add(new TObjString("FeedbackList"));
03160 fgDrawInputPars->Add(new TObjString("PROOF_LineColor"));
03161 fgDrawInputPars->Add(new TObjString("PROOF_LineStyle"));
03162 fgDrawInputPars->Add(new TObjString("PROOF_LineWidth"));
03163 fgDrawInputPars->Add(new TObjString("PROOF_MarkerColor"));
03164 fgDrawInputPars->Add(new TObjString("PROOF_MarkerStyle"));
03165 fgDrawInputPars->Add(new TObjString("PROOF_MarkerSize"));
03166 fgDrawInputPars->Add(new TObjString("PROOF_FillColor"));
03167 fgDrawInputPars->Add(new TObjString("PROOF_FillStyle"));
03168 }
03169
03170 TString selector, objname;
03171 if (GetDrawArgs(varexp, selection, option, selector, objname) != 0) {
03172 Error("DrawSelect", "parsing arguments");
03173 return -1;
03174 }
03175
03176 TNamed *varexpobj = new TNamed("varexp", varexp);
03177 TNamed *selectionobj = new TNamed("selection", selection);
03178
03179
03180 TObject *o = 0;
03181 TList *savedInput = new TList;
03182 TIter nxi(fInput);
03183 while ((o = nxi())) {
03184 savedInput->Add(o);
03185 TString n(o->GetName());
03186 if (fgDrawInputPars && !fgDrawInputPars->FindObject(o->GetName())) fInput->Remove(o);
03187 }
03188
03189 fInput->Add(varexpobj);
03190 fInput->Add(selectionobj);
03191
03192
03193 if (objname == "") objname = "htemp";
03194
03195 fProof->AddFeedback(objname);
03196 Long64_t r = Process(set, selector, option, nentries, firstentry);
03197 fProof->RemoveFeedback(objname);
03198
03199 fInput->Remove(varexpobj);
03200 fInput->Remove(selectionobj);
03201 if (TNamed *opt = dynamic_cast<TNamed*> (fInput->FindObject("PROOF_OPTIONS"))) {
03202 fInput->Remove(opt);
03203 delete opt;
03204 }
03205
03206 delete varexpobj;
03207 delete selectionobj;
03208
03209
03210 fInput->Clear();
03211 TIter nxsi(savedInput);
03212 while ((o = nxsi()))
03213 fInput->Add(o);
03214 savedInput->SetOwner(kFALSE);
03215 delete savedInput;
03216
03217 return r;
03218 }
03219
03220
03221 void TProofPlayerRemote::SetInitTime()
03222 {
03223
03224
03225 if (fPacketizer)
03226 fPacketizer->SetInitTime();
03227 }
03228
03229
03230
03231
// ClassImp macro invocation for TProofPlayerSlave; the macro is defined
// outside this file (presumably ROOT's class-implementation registration --
// confirm against the macro definition).
03232 ClassImp(TProofPlayerSlave)
03233
03234
03235 void TProofPlayerSlave::SetupFeedback()
03236 {
03237
03238
03239 TList *fb = (TList*) fInput->FindObject("FeedbackList");
03240 if (fb) {
03241 PDB(kFeedback,1)
03242 Info("SetupFeedback","\"FeedbackList\" found: %d objects", fb->GetSize());
03243 } else {
03244 PDB(kFeedback,1)
03245 Info("SetupFeedback","\"FeedbackList\" NOT found");
03246 }
03247
03248 if (fb == 0 || fb->GetSize() == 0) return;
03249
03250
03251
03252 SafeDelete(fFeedbackTimer);
03253 fFeedbackPeriod = 2000;
03254 TProof::GetParameter(fInput, "PROOF_FeedbackPeriod", fFeedbackPeriod);
03255 fFeedbackTimer = new TTimer;
03256 fFeedbackTimer->SetObject(this);
03257 fFeedbackTimer->Start(fFeedbackPeriod, kTRUE);
03258
03259 fFeedback = fb;
03260 }
03261
03262
03263 void TProofPlayerSlave::StopFeedback()
03264 {
03265
03266
03267 if (fFeedbackTimer == 0) return;
03268
03269 PDB(kFeedback,1) Info("StopFeedback","Stop Timer");
03270
03271 SafeDelete(fFeedbackTimer);
03272 }
03273
03274
03275 Bool_t TProofPlayerSlave::HandleTimer(TTimer *)
03276 {
03277
03278
03279 PDB(kFeedback,2) Info("HandleTimer","Entry");
03280
03281
03282
03283 if (gProofServ) {
03284 Bool_t sendm = kFALSE;
03285 TMessage m(kPROOF_PROGRESS);
03286 if (gProofServ->IsMaster() && !gProofServ->IsParallel()) {
03287 sendm = kTRUE;
03288 if (gProofServ->GetProtocol() > 25) {
03289 m << GetProgressStatus();
03290 } else if (gProofServ->GetProtocol() > 11) {
03291 TProofProgressStatus *ps = GetProgressStatus();
03292 m << fTotalEvents << ps->GetEntries() << ps->GetBytesRead()
03293 << (Float_t) -1. << (Float_t) ps->GetProcTime()
03294 << (Float_t) ps->GetRate() << (Float_t) -1.;
03295 } else {
03296 m << fTotalEvents << GetEventsProcessed();
03297 }
03298 }
03299 if (sendm) gProofServ->GetSocket()->Send(m);
03300 }
03301
03302 if (fFeedback == 0) return kFALSE;
03303
03304 TList *fb = new TList;
03305 fb->SetOwner(kFALSE);
03306
03307 if (fOutput == 0) {
03308 fOutput = fSelector->GetOutputList();
03309 }
03310
03311 if (fOutput) {
03312 TIter next(fFeedback);
03313 while( TObjString *name = (TObjString*) next() ) {
03314
03315 TObject *o = fOutput->FindObject(name->GetName());
03316 if (o != 0) fb->Add(o);
03317 }
03318 }
03319
03320 PDB(kFeedback,2) Info("HandleTimer","Sending %d objects", fb->GetSize());
03321
03322 TMessage m(kPROOF_FEEDBACK);
03323 m << fb;
03324
03325
03326 gProofServ->GetSocket()->Send(m);
03327
03328 delete fb;
03329
03330 fFeedbackTimer->Start(fFeedbackPeriod, kTRUE);
03331
03332 return kFALSE;
03333 }
03334
03335
03336 void TProofPlayerSlave::HandleGetTreeHeader(TMessage *mess)
03337 {
03338
03339
03340 TMessage answ(kPROOF_GETTREEHEADER);
03341
03342 TDSet *dset;
03343 (*mess) >> dset;
03344 dset->Reset();
03345 TDSetElement *e = dset->Next();
03346 Long64_t entries = 0;
03347 TFile *f = 0;
03348 TTree *t = 0;
03349 if (!e) {
03350 PDB(kGlobal, 1) Info("HandleGetTreeHeader", "empty TDSet");
03351 } else {
03352 f = TFile::Open(e->GetFileName());
03353 t = 0;
03354 if (f) {
03355 t = (TTree*) f->Get(e->GetObjName());
03356 if (t) {
03357 t->SetMaxVirtualSize(0);
03358 t->DropBaskets();
03359 entries = t->GetEntries();
03360
03361
03362 while ((e = dset->Next()) != 0) {
03363 TFile *f1 = TFile::Open(e->GetFileName());
03364 if (f1) {
03365 TTree *t1 = (TTree*) f1->Get(e->GetObjName());
03366 if (t1) {
03367 entries += t1->GetEntries();
03368 delete t1;
03369 }
03370 delete f1;
03371 }
03372 }
03373 t->SetMaxEntryLoop(entries);
03374 }
03375 }
03376 }
03377 if (t)
03378 answ << TString("Success") << t;
03379 else
03380 answ << TString("Failed") << t;
03381
03382 fSocket->Send(answ);
03383
03384 SafeDelete(t);
03385 SafeDelete(f);
03386 }
03387
03388
03389
03390
// ClassImp macro invocation for TProofPlayerSuperMaster; the macro is
// defined outside this file (presumably ROOT's class-implementation
// registration -- confirm against the macro definition).
03391 ClassImp(TProofPlayerSuperMaster)
03392
03393
03394 Long64_t TProofPlayerSuperMaster::Process(TDSet *dset, const char *selector_file,
03395 Option_t *option, Long64_t nentries,
03396 Long64_t first)
03397 {
03398
03399
03400
03401
03402 fProgressStatus->Reset();
03403 PDB(kGlobal,1) Info("Process","Enter");
03404
03405 TProofSuperMaster *proof = dynamic_cast<TProofSuperMaster*>(GetProof());
03406 if (!proof) return -1;
03407
03408 delete fOutput;
03409 fOutput = new TList;
03410
03411 TPerfStats::Start(fInput, fOutput);
03412
03413 if (!SendSelector(selector_file)) {
03414 Error("Process", "sending selector %s", selector_file);
03415 return -1;
03416 }
03417
03418 TCleanup clean(this);
03419 SetupFeedback();
03420
03421 if (proof->IsMaster()) {
03422
03423
03424 if (!dset->ElementsValid()) {
03425 proof->ValidateDSet(dset);
03426 if (!dset->ElementsValid()) {
03427 Error("Process", "could not validate TDSet");
03428 return -1;
03429 }
03430 }
03431
03432 TList msds;
03433 msds.SetOwner();
03434
03435 TList keyholder;
03436 keyholder.SetOwner();
03437 TList valueholder;
03438 valueholder.SetOwner();
03439
03440
03441 TIter nextslave(proof->GetListOfActiveSlaves());
03442 while (TSlave *sl = dynamic_cast<TSlave*>(nextslave())) {
03443 TList *submasters = 0;
03444 TPair *msd = dynamic_cast<TPair*>(msds.FindObject(sl->GetMsd()));
03445 if (!msd) {
03446 submasters = new TList;
03447 submasters->SetName(sl->GetMsd());
03448 keyholder.Add(submasters);
03449 TList *setelements = new TSortedList(kSortDescending);
03450 setelements->SetName(TString(sl->GetMsd())+"_Elements");
03451 valueholder.Add(setelements);
03452 msds.Add(new TPair(submasters, setelements));
03453 } else {
03454 submasters = dynamic_cast<TList*>(msd->Key());
03455 }
03456 if (submasters) submasters->Add(sl);
03457 }
03458
03459
03460 Long64_t cur = 0;
03461 TIter nextelement(dset->GetListOfElements());
03462 while (TDSetElement *elem = dynamic_cast<TDSetElement*>(nextelement())) {
03463
03464 if (elem->GetNum()<1) continue;
03465
03466 if (nentries !=-1 && cur>=first+nentries) {
03467
03468 break;
03469 }
03470
03471 if (cur+elem->GetNum()-1<first) {
03472
03473 cur+=elem->GetNum();
03474 continue;
03475 }
03476
03477 if (cur<first) {
03478
03479 elem->SetNum(elem->GetNum()-(first-cur));
03480 elem->SetFirst(elem->GetFirst()+first-cur);
03481 cur=first;
03482 }
03483
03484 if (nentries==-1 || cur+elem->GetNum()<=first+nentries) {
03485 cur+=elem->GetNum();
03486 } else {
03487
03488 elem->SetNum(first+nentries-cur);
03489 cur=first+nentries;
03490 }
03491
03492 TPair *msd = dynamic_cast<TPair*>(msds.FindObject(elem->GetMsd()));
03493 if (!msd) {
03494 Error("Process", "data requires mass storage domain '%s'"
03495 " which is not accessible in this proof session",
03496 elem->GetMsd());
03497 return -1;
03498 } else {
03499 TList *elements = dynamic_cast<TList*>(msd->Value());
03500 if (elements) elements->Add(elem);
03501 }
03502 }
03503
03504 TList usedmasters;
03505 TIter nextmsd(msds.MakeIterator());
03506 while (TPair *msd = dynamic_cast<TPair*>(nextmsd())) {
03507 TList *submasters = dynamic_cast<TList*>(msd->Key());
03508 TList *setelements = dynamic_cast<TList*>(msd->Value());
03509
03510
03511 Int_t nmasters = submasters ? submasters->GetSize() : -1;
03512 Int_t nelements = setelements ? setelements->GetSize() : -1;
03513 for (Int_t i=0; i<nmasters; i++) {
03514
03515 Long64_t nent = 0;
03516 TDSet set(dset->GetType(), dset->GetObjName(),
03517 dset->GetDirectory());
03518 for (Int_t j = (i*nelements)/nmasters;
03519 j < ((i+1)*nelements)/nmasters;
03520 j++) {
03521 TDSetElement *elem = setelements ?
03522 dynamic_cast<TDSetElement*>(setelements->At(j)) : (TDSetElement *)0;
03523 if (elem) {
03524 set.Add(elem->GetFileName(), elem->GetObjName(),
03525 elem->GetDirectory(), elem->GetFirst(),
03526 elem->GetNum(), elem->GetMsd());
03527 nent += elem->GetNum();
03528 } else {
03529 Warning("Process", "not a TDSetElement object");
03530 }
03531 }
03532
03533 if (set.GetListOfElements()->GetSize()>0) {
03534 TMessage mesg(kPROOF_PROCESS);
03535 TString fn(gSystem->BaseName(selector_file));
03536 TString opt = option;
03537 mesg << &set << fn << fInput << opt << Long64_t(-1) << Long64_t(0);
03538
03539 TSlave *sl = dynamic_cast<TSlave*>(submasters->At(i));
03540 if (sl) {
03541 PDB(kGlobal,1) Info("Process",
03542 "Sending TDSet with %d elements to submaster %s",
03543 set.GetListOfElements()->GetSize(),
03544 sl->GetOrdinal());
03545 sl->GetSocket()->Send(mesg);
03546 usedmasters.Add(sl);
03547
03548
03549 fSlaves.AddLast(sl);
03550 fSlaveProgress.Set(fSlaveProgress.GetSize()+1);
03551 fSlaveProgress[fSlaveProgress.GetSize()-1] = 0;
03552 fSlaveTotals.Set(fSlaveTotals.GetSize()+1);
03553 fSlaveTotals[fSlaveTotals.GetSize()-1] = nent;
03554 fSlaveBytesRead.Set(fSlaveBytesRead.GetSize()+1);
03555 fSlaveBytesRead[fSlaveBytesRead.GetSize()-1] = 0;
03556 fSlaveInitTime.Set(fSlaveInitTime.GetSize()+1);
03557 fSlaveInitTime[fSlaveInitTime.GetSize()-1] = -1.;
03558 fSlaveProcTime.Set(fSlaveProcTime.GetSize()+1);
03559 fSlaveProcTime[fSlaveProcTime.GetSize()-1] = -1.;
03560 fSlaveEvtRti.Set(fSlaveEvtRti.GetSize()+1);
03561 fSlaveEvtRti[fSlaveEvtRti.GetSize()-1] = -1.;
03562 fSlaveMBRti.Set(fSlaveMBRti.GetSize()+1);
03563 fSlaveMBRti[fSlaveMBRti.GetSize()-1] = -1.;
03564 fSlaveActW.Set(fSlaveActW.GetSize()+1);
03565 fSlaveActW[fSlaveActW.GetSize()-1] = 0;
03566 fSlaveTotS.Set(fSlaveTotS.GetSize()+1);
03567 fSlaveTotS[fSlaveTotS.GetSize()-1] = 0;
03568 fSlaveEffS.Set(fSlaveEffS.GetSize()+1);
03569 fSlaveEffS[fSlaveEffS.GetSize()-1] = 0.;
03570 } else {
03571 Warning("Process", "not a TSlave object");
03572 }
03573 }
03574 }
03575 }
03576
03577 if ( !IsClient() ) HandleTimer(0);
03578 PDB(kGlobal,1) Info("Process","Calling Collect");
03579 proof->Collect(&usedmasters);
03580 HandleTimer(0);
03581
03582 }
03583
03584 StopFeedback();
03585
03586 PDB(kGlobal,1) Info("Process","Calling Merge Output");
03587 MergeOutput();
03588
03589 TPerfStats::Stop();
03590
03591 return 0;
03592 }
03593
03594
03595 void TProofPlayerSuperMaster::Progress(TSlave *sl, Long64_t total, Long64_t processed)
03596 {
03597
03598
03599 Int_t idx = fSlaves.IndexOf(sl);
03600 fSlaveProgress[idx] = processed;
03601 if (fSlaveTotals[idx] != total)
03602 Warning("Progress", "total events has changed for slave %s", sl->GetName());
03603 fSlaveTotals[idx] = total;
03604
03605 Long64_t tot = 0;
03606 Int_t i;
03607 for (i = 0; i < fSlaveTotals.GetSize(); i++) tot += fSlaveTotals[i];
03608 Long64_t proc = 0;
03609 for (i = 0; i < fSlaveProgress.GetSize(); i++) proc += fSlaveProgress[i];
03610
03611 Progress(tot, proc);
03612 }
03613
03614
03615 void TProofPlayerSuperMaster::Progress(TSlave *sl, Long64_t total,
03616 Long64_t processed, Long64_t bytesread,
03617 Float_t initTime, Float_t procTime,
03618 Float_t evtrti, Float_t mbrti)
03619 {
03620
03621
03622 PDB(kGlobal,2)
03623 Info("Progress","%s: %lld %lld %f %f %f %f", sl->GetName(),
03624 processed, bytesread, initTime, procTime, evtrti, mbrti);
03625
03626 Int_t idx = fSlaves.IndexOf(sl);
03627 if (fSlaveTotals[idx] != total)
03628 Warning("Progress", "total events has changed for slave %s", sl->GetName());
03629 fSlaveTotals[idx] = total;
03630 fSlaveProgress[idx] = processed;
03631 fSlaveBytesRead[idx] = bytesread;
03632 fSlaveInitTime[idx] = (initTime > -1.) ? initTime : fSlaveInitTime[idx];
03633 fSlaveProcTime[idx] = (procTime > -1.) ? procTime : fSlaveProcTime[idx];
03634 fSlaveEvtRti[idx] = (evtrti > -1.) ? evtrti : fSlaveEvtRti[idx];
03635 fSlaveMBRti[idx] = (mbrti > -1.) ? mbrti : fSlaveMBRti[idx];
03636
03637 Int_t i;
03638 Long64_t tot = 0;
03639 Long64_t proc = 0;
03640 Long64_t bytes = 0;
03641 Float_t init = -1.;
03642 Float_t ptime = -1.;
03643 Float_t erti = 0.;
03644 Float_t srti = 0.;
03645 Int_t nerti = 0;
03646 Int_t nsrti = 0;
03647 for (i = 0; i < fSlaveTotals.GetSize(); i++) {
03648 tot += fSlaveTotals[i];
03649 if (i < fSlaveProgress.GetSize())
03650 proc += fSlaveProgress[i];
03651 if (i < fSlaveBytesRead.GetSize())
03652 bytes += fSlaveBytesRead[i];
03653 if (i < fSlaveInitTime.GetSize())
03654 if (fSlaveInitTime[i] > -1. && (init < 0. || fSlaveInitTime[i] < init))
03655 init = fSlaveInitTime[i];
03656 if (i < fSlaveProcTime.GetSize())
03657 if (fSlaveProcTime[i] > -1. && (ptime < 0. || fSlaveProcTime[i] > ptime))
03658 ptime = fSlaveProcTime[i];
03659 if (i < fSlaveEvtRti.GetSize())
03660 if (fSlaveEvtRti[i] > -1.) {
03661 erti += fSlaveEvtRti[i];
03662 nerti++;
03663 }
03664 if (i < fSlaveMBRti.GetSize())
03665 if (fSlaveMBRti[i] > -1.) {
03666 srti += fSlaveMBRti[i];
03667 nsrti++;
03668 }
03669 }
03670 srti = (nsrti > 0) ? srti / nerti : 0.;
03671
03672 Progress(tot, proc, bytes, init, ptime, erti, srti);
03673 }
03674
03675
03676 void TProofPlayerSuperMaster::Progress(TSlave *wrk, TProofProgressInfo *pi)
03677 {
03678
03679
03680 if (pi) {
03681 PDB(kGlobal,2)
03682 Info("Progress","%s: %lld %lld %lld %f %f %f %f %d %f", wrk->GetOrdinal(),
03683 pi->fTotal, pi->fProcessed, pi->fBytesRead,
03684 pi->fInitTime, pi->fProcTime, pi->fEvtRateI, pi->fMBRateI,
03685 pi->fActWorkers, pi->fEffSessions);
03686
03687 Int_t idx = fSlaves.IndexOf(wrk);
03688 if (fSlaveTotals[idx] != pi->fTotal)
03689 Warning("Progress", "total events has changed for worker %s", wrk->GetName());
03690 fSlaveTotals[idx] = pi->fTotal;
03691 fSlaveProgress[idx] = pi->fProcessed;
03692 fSlaveBytesRead[idx] = pi->fBytesRead;
03693 fSlaveInitTime[idx] = (pi->fInitTime > -1.) ? pi->fInitTime : fSlaveInitTime[idx];
03694 fSlaveProcTime[idx] = (pi->fProcTime > -1.) ? pi->fProcTime : fSlaveProcTime[idx];
03695 fSlaveEvtRti[idx] = (pi->fEvtRateI > -1.) ? pi->fEvtRateI : fSlaveEvtRti[idx];
03696 fSlaveMBRti[idx] = (pi->fMBRateI > -1.) ? pi->fMBRateI : fSlaveMBRti[idx];
03697 fSlaveActW[idx] = (pi->fActWorkers > -1) ? pi->fActWorkers : fSlaveActW[idx];
03698 fSlaveTotS[idx] = (pi->fTotSessions > -1) ? pi->fTotSessions : fSlaveTotS[idx];
03699 fSlaveEffS[idx] = (pi->fEffSessions > -1.) ? pi->fEffSessions : fSlaveEffS[idx];
03700
03701 Int_t i;
03702 Int_t nerti = 0;
03703 Int_t nsrti = 0;
03704 TProofProgressInfo pisum(0, 0, 0, -1., -1., 0., 0., 0, 0, 0.);
03705 for (i = 0; i < fSlaveTotals.GetSize(); i++) {
03706 pisum.fTotal += fSlaveTotals[i];
03707 if (i < fSlaveProgress.GetSize())
03708 pisum.fProcessed += fSlaveProgress[i];
03709 if (i < fSlaveBytesRead.GetSize())
03710 pisum.fBytesRead += fSlaveBytesRead[i];
03711 if (i < fSlaveInitTime.GetSize())
03712 if (fSlaveInitTime[i] > -1. && (pisum.fInitTime < 0. || fSlaveInitTime[i] < pisum.fInitTime))
03713 pisum.fInitTime = fSlaveInitTime[i];
03714 if (i < fSlaveProcTime.GetSize())
03715 if (fSlaveProcTime[i] > -1. && (pisum.fProcTime < 0. || fSlaveProcTime[i] > pisum.fProcTime))
03716 pisum.fProcTime = fSlaveProcTime[i];
03717 if (i < fSlaveEvtRti.GetSize())
03718 if (fSlaveEvtRti[i] > -1.) {
03719 pisum.fEvtRateI += fSlaveEvtRti[i];
03720 nerti++;
03721 }
03722 if (i < fSlaveMBRti.GetSize())
03723 if (fSlaveMBRti[i] > -1.) {
03724 pisum.fMBRateI += fSlaveMBRti[i];
03725 nsrti++;
03726 }
03727 if (i < fSlaveActW.GetSize())
03728 pisum.fActWorkers += fSlaveActW[i];
03729 if (i < fSlaveTotS.GetSize())
03730 if (fSlaveTotS[i] > -1 && (pisum.fTotSessions < 0. || fSlaveTotS[i] > pisum.fTotSessions))
03731 pisum.fTotSessions = fSlaveTotS[i];
03732 if (i < fSlaveEffS.GetSize())
03733 if (fSlaveEffS[i] > -1. && (pisum.fEffSessions < 0. || fSlaveEffS[i] > pisum.fEffSessions))
03734 pisum.fEffSessions = fSlaveEffS[i];
03735 }
03736 pisum.fMBRateI = (nsrti > 0) ? pisum.fMBRateI / nerti : 0.;
03737
03738 Progress(&pisum);
03739 }
03740 }
03741
03742
03743 Bool_t TProofPlayerSuperMaster::HandleTimer(TTimer *)
03744 {
03745
03746
03747 if (fFeedbackTimer == 0) return kFALSE;
03748
03749 Int_t i;
03750 Long64_t tot = 0;
03751 Long64_t proc = 0;
03752 Long64_t bytes = 0;
03753 Float_t init = -1.;
03754 Float_t ptime = -1.;
03755 Float_t erti = 0.;
03756 Float_t srti = 0.;
03757 Int_t nerti = 0;
03758 Int_t nsrti = 0;
03759 for (i = 0; i < fSlaveTotals.GetSize(); i++) {
03760 tot += fSlaveTotals[i];
03761 if (i < fSlaveProgress.GetSize())
03762 proc += fSlaveProgress[i];
03763 if (i < fSlaveBytesRead.GetSize())
03764 bytes += fSlaveBytesRead[i];
03765 if (i < fSlaveInitTime.GetSize())
03766 if (fSlaveInitTime[i] > -1. && (init < 0. || fSlaveInitTime[i] < init))
03767 init = fSlaveInitTime[i];
03768 if (i < fSlaveProcTime.GetSize())
03769 if (fSlaveProcTime[i] > -1. && (ptime < 0. || fSlaveProcTime[i] > ptime))
03770 ptime = fSlaveProcTime[i];
03771 if (i < fSlaveEvtRti.GetSize())
03772 if (fSlaveEvtRti[i] > -1.) {
03773 erti += fSlaveEvtRti[i];
03774 nerti++;
03775 }
03776 if (i < fSlaveMBRti.GetSize())
03777 if (fSlaveMBRti[i] > -1.) {
03778 srti += fSlaveMBRti[i];
03779 nsrti++;
03780 }
03781 }
03782 erti = (nerti > 0) ? erti / nerti : 0.;
03783 srti = (nsrti > 0) ? srti / nerti : 0.;
03784
03785 TMessage m(kPROOF_PROGRESS);
03786 if (gProofServ->GetProtocol() > 25) {
03787
03788 TProofProgressInfo pi(tot, proc, bytes, init, ptime,
03789 erti, srti, -1,
03790 gProofServ->GetTotSessions(), gProofServ->GetEffSessions());
03791 m << π
03792 } else {
03793
03794 m << tot << proc << bytes << init << ptime << erti << srti;
03795 }
03796
03797
03798 gProofServ->GetSocket()->Send(m);
03799
03800 if (fReturnFeedback)
03801 return TProofPlayerRemote::HandleTimer(0);
03802 else
03803 return kFALSE;
03804 }
03805
03806
03807 void TProofPlayerSuperMaster::SetupFeedback()
03808 {
03809
03810
03811 if (IsClient()) return;
03812
03813 TProofPlayerRemote::SetupFeedback();
03814
03815 if (fFeedbackTimer) {
03816 fReturnFeedback = kTRUE;
03817 return;
03818 } else {
03819 fReturnFeedback = kFALSE;
03820 }
03821
03822
03823 SafeDelete(fFeedbackTimer);
03824 fFeedbackPeriod = 2000;
03825 TProof::GetParameter(fInput, "PROOF_FeedbackPeriod", fFeedbackPeriod);
03826 fFeedbackTimer = new TTimer;
03827 fFeedbackTimer->SetObject(this);
03828 fFeedbackTimer->Start(fFeedbackPeriod, kTRUE);
03829 }