00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #include "TProofPlayerLite.h"
00022
00023 #include "MessageTypes.h"
00024 #include "TDSet.h"
00025 #include "TDSetProxy.h"
00026 #include "TEntryList.h"
00027 #include "TEventList.h"
00028 #include "TList.h"
00029 #include "TMap.h"
00030 #include "TMessage.h"
00031 #include "TObjString.h"
00032 #include "TPerfStats.h"
00033 #include "TProofLite.h"
00034 #include "TProofDebug.h"
00035 #include "TProofServ.h"
00036 #include "TROOT.h"
00037 #include "TSelector.h"
00038 #include "TVirtualPacketizer.h"
00039
00040
00041 Int_t TProofPlayerLite::MakeSelector(const char *selfile)
00042 {
00043
00044
00045
00046
00047
00048 fSelectorClass = 0;
00049 SafeDelete(fSelector);
00050 if (!selfile || strlen(selfile) <= 0) {
00051 Error("MakeSelector", "input file path or name undefined");
00052 return -1;
00053 }
00054
00055
00056 if (!strchr(gSystem->BaseName(selfile), '.')) {
00057 if (gDebug > 1)
00058 Info("MakeSelector", "selector name '%s' does not contain a '.':"
00059 " no file to check, it will be loaded from a library", selfile);
00060 if (!(fSelector = TSelector::GetSelector(selfile))) {
00061 Error("MakeSelector", "could not create a %s selector", selfile);
00062 return -1;
00063 }
00064
00065 return 0;
00066 }
00067
00068 if (((TProofLite*)fProof)->CopyMacroToCache(selfile, 1, &fSelector, TProof::kCp | TProof::kCpBin) < 0)
00069 return -1;
00070
00071
00072 return 0;
00073 }
00074
00075
00076 Long64_t TProofPlayerLite::Process(TDSet *dset, const char *selector_file,
00077 Option_t *option, Long64_t nentries,
00078 Long64_t first)
00079 {
00080
00081
00082
00083
00084
00085 PDB(kGlobal,1) Info("Process","Enter");
00086 fDSet = dset;
00087 fExitStatus = kFinished;
00088
00089 if (!fProgressStatus) {
00090 Error("Process", "No progress status");
00091 return -1;
00092 }
00093 fProgressStatus->Reset();
00094
00095
00096 if (!fOutput)
00097 fOutput = new TList;
00098 else
00099 fOutput->Clear();
00100
00101 TPerfStats::Setup(fInput);
00102 TPerfStats::Start(fInput, fOutput);
00103
00104 TMessage mesg(kPROOF_PROCESS);
00105 TString fn(gSystem->BaseName(selector_file));
00106
00107
00108 Bool_t sync = (fProof->GetQueryMode(option) == TProof::kSync);
00109
00110
00111 if (fOutputLists) {
00112 fOutputLists->Delete();
00113 delete fOutputLists;
00114 fOutputLists = 0;
00115 }
00116
00117 if (!sync) {
00118 gSystem->RedirectOutput(fProof->fLogFileName);
00119 Printf(" ");
00120 Info("Process","starting new query");
00121 }
00122
00123 if (MakeSelector(selector_file) != 0) {
00124 if (!sync)
00125 gSystem->RedirectOutput(0);
00126 return -1;
00127 }
00128
00129 fSelectorClass = fSelector->IsA();
00130 fSelector->SetInputList(fInput);
00131 fSelector->SetOption(option);
00132
00133 PDB(kLoop,1) Info("Process","Call Begin(0)");
00134 fSelector->Begin(0);
00135
00136
00137 gProof->SendInputDataFile();
00138
00139 PDB(kPacketizer,1) Info("Process","Create Proxy TDSet");
00140 TDSet *set = new TDSetProxy(dset->GetType(), dset->GetObjName(),
00141 dset->GetDirectory());
00142 if (dset->TestBit(TDSet::kEmpty))
00143 set->SetBit(TDSet::kEmpty);
00144 fProof->SetParameter("PROOF_MaxSlavesPerNode", (Long_t) ((TProofLite *)fProof)->fNWorkers);
00145 if (InitPacketizer(dset, nentries, first, "TPacketizerUnit", "TPacketizerAdaptive") != 0) {
00146 Error("Process", "cannot init the packetizer");
00147 fExitStatus = kAborted;
00148 return -1;
00149 }
00150
00151 first = 0;
00152
00153 if (!fProof->GetParameter("PROOF_MemLogFreq")){
00154 Long64_t memlogfreq = fPacketizer->GetTotalEntries()/(fProof->GetParallel()*100);
00155 memlogfreq = (memlogfreq > 0) ? memlogfreq : 1;
00156 fProof->SetParameter("PROOF_MemLogFreq", memlogfreq);
00157 }
00158
00159
00160
00161 fProof->SetParameter("PROOF_QueryTag", fProof->GetName());
00162
00163 fProof->SetParameter("PROOF_QuerySeqNum", fProof->fSeqNum);
00164
00165 if (!sync)
00166 gSystem->RedirectOutput(0);
00167
00168 TCleanup clean(this);
00169 SetupFeedback();
00170
00171 TString opt = option;
00172
00173
00174 Long64_t num = (fProof->IsParallel()) ? -1 : nentries;
00175 Long64_t fst = (fProof->IsParallel()) ? -1 : first;
00176
00177
00178 TEntryList *enl = (!fProof->IsMaster()) ? dynamic_cast<TEntryList *>(set->GetEntryList())
00179 : (TEntryList *)0;
00180 TEventList *evl = (!fProof->IsMaster() && !enl) ? dynamic_cast<TEventList *>(set->GetEntryList())
00181 : (TEventList *)0;
00182
00183 fProof->ResetMergePrg();
00184
00185
00186 PDB(kGlobal,1) Info("Process","Calling Broadcast");
00187 mesg << set << fn << fInput << opt << num << fst << evl << sync << enl;
00188 Int_t nb = fProof->Broadcast(mesg);
00189 fProof->fNotIdle += nb;
00190
00191
00192 fProof->fRedirLog = kTRUE;
00193
00194 if (!sync) {
00195
00196
00197
00198 PDB(kGlobal,1) Info("Process","Asynchronous processing:"
00199 " activating CollectInputFrom");
00200 fProof->Activate();
00201
00202
00203 return fProof->fSeqNum;
00204
00205 } else {
00206
00207
00208 PDB(kGlobal,1) Info("Process","Synchronous processing: calling Collect");
00209 fProof->Collect();
00210
00211
00212
00213 fProof->fRedirLog = kFALSE;
00214
00215 if (!TSelector::IsStandardDraw(fn))
00216 HandleTimer(0);
00217
00218 if (fPacketizer && fQuery)
00219 fQuery->SetProcessInfo(0, 0., fPacketizer->GetBytesRead(),
00220 fPacketizer->GetInitTime(),
00221 fPacketizer->GetProcTime());
00222 StopFeedback();
00223
00224 if (GetExitStatus() != TProofPlayer::kAborted)
00225 return Finalize(kFALSE, sync);
00226 else
00227 return -1;
00228 }
00229 }
00230
00231
00232 Long64_t TProofPlayerLite::Finalize(Bool_t force, Bool_t sync)
00233 {
00234
00235
00236
00237 if (fOutputLists == 0) {
00238 if (force && fQuery)
00239 return fProof->Finalize(Form("%s:%s", fQuery->GetTitle(),
00240 fQuery->GetName()), force);
00241 }
00242
00243 Long64_t rv = 0;
00244
00245 TPerfStats::Stop();
00246
00247 if (!fQuery) {
00248 Info("Finalize", "query is undefined!");
00249 return -1;
00250 }
00251
00252
00253
00254 MergeOutput();
00255
00256
00257 MergeOutputFiles();
00258
00259 if (fExitStatus != kAborted) {
00260
00261 if (!sync) {
00262
00263
00264
00265
00266 if (ReinitSelector(fQuery) == -1) {
00267 Info("Finalize", "problems reinitializing selector \"%s\"",
00268 fQuery->GetSelecImp()->GetName());
00269 return -1;
00270 }
00271 }
00272
00273
00274 fSelector->SetInputList(fInput);
00275
00276 TIter next(fOutput);
00277 TList *output = fSelector->GetOutputList();
00278 while(TObject* obj = next()) {
00279 if (fProof->IsParallel() || DrawCanvas(obj) == 1)
00280
00281
00282 output->Add(obj);
00283 }
00284
00285 SetSelectorDataMembersFromOutputList();
00286
00287 PDB(kLoop,1) Info("Finalize","Call Terminate()");
00288 fOutput->Clear("nodelete");
00289 fSelector->Terminate();
00290
00291 rv = fSelector->GetStatus();
00292
00293
00294 TIter it(output);
00295 while(TObject* o = it()) {
00296 fOutput->Add(o);
00297 }
00298
00299
00300 if (fQuery) {
00301 fQuery->SetOutputList(fOutput);
00302
00303 fQuery->SetFinalized();
00304 } else {
00305 Warning("Finalize","current TQueryResult object is undefined!");
00306 }
00307
00308
00309
00310
00311 output->SetOwner(kFALSE);
00312 SafeDelete(fSelector);
00313
00314
00315
00316 fOutput->SetOwner(kFALSE);
00317 SafeDelete(fOutput);
00318 } else {
00319
00320
00321 fOutput->SetOwner();
00322 SafeDelete(fSelector);
00323 }
00324
00325 PDB(kGlobal,1) Info("Finalize","exit");
00326 return rv;
00327 }
00328
00329
00330 Bool_t TProofPlayerLite::HandleTimer(TTimer *)
00331 {
00332
00333
00334 PDB(kFeedback,2)
00335 Info("HandleTimer","Entry: %p", fFeedbackTimer);
00336
00337 if (fFeedbackTimer == 0) return kFALSE;
00338
00339
00340
00341
00342 TList *fb = new TList;
00343 fb->SetOwner();
00344
00345 TIter next(fFeedback);
00346 while( TObjString *name = (TObjString*) next() ) {
00347 TObject *o = fOutput->FindObject(name->GetName());
00348 if (o != 0) fb->Add(o->Clone());
00349 }
00350
00351 if (fb->GetSize() > 0)
00352 StoreFeedback(this, fb);
00353 else
00354 delete fb;
00355
00356 if (fFeedbackLists == 0) {
00357 fFeedbackTimer->Start(fFeedbackPeriod, kTRUE);
00358 return kFALSE;
00359 }
00360
00361 fb = MergeFeedback();
00362
00363 Feedback(fb);
00364 fb->SetOwner();
00365 delete fb;
00366
00367 fFeedbackTimer->Start(fFeedbackPeriod, kTRUE);
00368
00369 return kFALSE;
00370 }
00371
00372
00373 void TProofPlayerLite::SetupFeedback()
00374 {
00375
00376
00377 fFeedback = (TList*) fInput->FindObject("FeedbackList");
00378
00379 if (fFeedback) {
00380 PDB(kFeedback,1)
00381 Info("SetupFeedback","\"FeedbackList\" found: %d objects", fFeedback->GetSize());
00382 } else {
00383 PDB(kFeedback,1)
00384 Info("SetupFeedback","\"FeedbackList\" NOT found");
00385 }
00386
00387 if (fFeedback == 0 || fFeedback->GetSize() == 0) return;
00388
00389
00390 SafeDelete(fFeedbackTimer);
00391 fFeedbackPeriod = 2000;
00392 TProof::GetParameter(fInput, "PROOF_FeedbackPeriod", fFeedbackPeriod);
00393 fFeedbackTimer = new TTimer;
00394 fFeedbackTimer->SetObject(this);
00395 fFeedbackTimer->Start(fFeedbackPeriod, kTRUE);
00396 }
00397
00398
00399 void TProofPlayerLite::StoreFeedback(TObject *slave, TList *out)
00400 {
00401
00402
00403 PDB(kFeedback,1)
00404 Info("StoreFeedback","Enter (%p,%p,%d)", fFeedbackLists, out, (out ? out->GetSize() : -1));
00405
00406 if ( out == 0 ) {
00407 PDB(kFeedback,1)
00408 Info("StoreFeedback","Leave (empty)");
00409 return;
00410 }
00411
00412 if (fFeedbackLists == 0) {
00413 PDB(kFeedback,2) Info("StoreFeedback","Create fFeedbackLists");
00414 fFeedbackLists = new TList;
00415 fFeedbackLists->SetOwner();
00416 }
00417
00418 TIter next(out);
00419 out->SetOwner(kFALSE);
00420
00421 TObject *obj;
00422 while( (obj = next()) ) {
00423 PDB(kFeedback,2)
00424 Info("StoreFeedback","Find '%s'", obj->GetName() );
00425
00426 TMap *map = (TMap*) fFeedbackLists->FindObject(obj->GetName());
00427 if ( map == 0 ) {
00428 PDB(kFeedback,2)
00429 Info("StoreFeedback", "map for '%s' not found (creating)", obj->GetName());
00430
00431 map = new TMap;
00432 map->SetName(obj->GetName());
00433 fFeedbackLists->Add(map);
00434 } else {
00435 PDB(kFeedback,2)
00436 Info("StoreFeedback","removing previous value");
00437 if (map->GetValue(slave))
00438 delete map->GetValue(slave);
00439 map->Remove(slave);
00440 }
00441 map->Add(slave, obj);
00442 }
00443
00444 delete out;
00445 PDB(kFeedback,1)
00446 Info("StoreFeedback","Leave");
00447 }