TProofPlayerLite.cxx

Go to the documentation of this file.
00001 // @(#)root/proofplayer:$Id: TProofPlayerLite.cxx 34254 2010-06-30 16:29:36Z ganis $
00002 // Author: G. Ganis Mar 2008
00003 
00004 /*************************************************************************
00005  * Copyright (C) 1995-2001, Rene Brun and Fons Rademakers.               *
00006  * All rights reserved.                                                  *
00007  *                                                                       *
00008  * For the licensing terms see $ROOTSYS/LICENSE.                         *
00009  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
00010  *************************************************************************/
00011 
00012 //////////////////////////////////////////////////////////////////////////
00013 //                                                                      //
00014 // TProofPlayerLite                                                     //
00015 //                                                                      //
00016 // This version of TProofPlayerRemote merges the functionality needed   //
00017 // by clients and masters. It is used in optmized local sessions.       //
00018 //                                                                      //
00019 //////////////////////////////////////////////////////////////////////////
00020 
00021 #include "TProofPlayerLite.h"
00022 
00023 #include "MessageTypes.h"
00024 #include "TDSet.h"
00025 #include "TDSetProxy.h"
00026 #include "TEntryList.h"
00027 #include "TEventList.h"
00028 #include "TList.h"
00029 #include "TMap.h"
00030 #include "TMessage.h"
00031 #include "TObjString.h"
00032 #include "TPerfStats.h"
00033 #include "TProofLite.h"
00034 #include "TProofDebug.h"
00035 #include "TProofServ.h"
00036 #include "TROOT.h"
00037 #include "TSelector.h"
00038 #include "TVirtualPacketizer.h"
00039 
00040 //______________________________________________________________________________
00041 Int_t TProofPlayerLite::MakeSelector(const char *selfile)
00042 {
00043    // Create the selector object and save the relevant files and binary information
00044    // in the cache so that the worker can pick it up.
00045    // Returns 0 and fill fSelector in case of success. Returns -1 and sets
00046    // fSelector to 0 in case of failure.
00047 
00048    fSelectorClass = 0;
00049    SafeDelete(fSelector);
00050    if (!selfile || strlen(selfile) <= 0) {
00051       Error("MakeSelector", "input file path or name undefined");
00052       return -1;
00053    }
00054 
00055    // If we are just given a name, init the selector and return
00056    if (!strchr(gSystem->BaseName(selfile), '.')) {
00057       if (gDebug > 1)
00058          Info("MakeSelector", "selector name '%s' does not contain a '.':"
00059               " no file to check, it will be loaded from a library", selfile);
00060       if (!(fSelector = TSelector::GetSelector(selfile))) {
00061          Error("MakeSelector", "could not create a %s selector", selfile);
00062          return -1;
00063       }
00064       // Done
00065       return 0;
00066    }
00067 
00068    if (((TProofLite*)fProof)->CopyMacroToCache(selfile, 1, &fSelector, TProof::kCp | TProof::kCpBin) < 0)
00069       return -1;
00070 
00071    // Done
00072    return 0;
00073 }
00074 
00075 //______________________________________________________________________________
00076 Long64_t TProofPlayerLite::Process(TDSet *dset, const char *selector_file,
00077                                    Option_t *option, Long64_t nentries,
00078                                    Long64_t first)
00079 {
00080    // Process specified TDSet on PROOF.
00081    // This method is called on client and on the PROOF master.
00082    // The return value is -1 in case of error and TSelector::GetStatus() in
00083    // in case of success.
00084 
00085    PDB(kGlobal,1) Info("Process","Enter");
00086    fDSet = dset;
00087    fExitStatus = kFinished;
00088 
00089    if (!fProgressStatus) {
00090       Error("Process", "No progress status");
00091       return -1;
00092    }
00093    fProgressStatus->Reset();
00094 
00095    //   delete fOutput;
00096    if (!fOutput)
00097       fOutput = new TList;
00098    else
00099       fOutput->Clear();
00100 
00101    TPerfStats::Setup(fInput);
00102    TPerfStats::Start(fInput, fOutput);
00103 
00104    TMessage mesg(kPROOF_PROCESS);
00105    TString fn(gSystem->BaseName(selector_file));
00106 
00107    // Parse option
00108    Bool_t sync = (fProof->GetQueryMode(option) == TProof::kSync);
00109 
00110    // Make sure that the temporary output list is empty
00111    if (fOutputLists) {
00112       fOutputLists->Delete();
00113       delete fOutputLists;
00114       fOutputLists = 0;
00115    }
00116 
00117    if (!sync) {
00118       gSystem->RedirectOutput(fProof->fLogFileName);
00119       Printf(" ");
00120       Info("Process","starting new query");
00121    }
00122 
00123    if (MakeSelector(selector_file) != 0) {
00124       if (!sync)
00125          gSystem->RedirectOutput(0);
00126       return -1;
00127    }
00128 
00129    fSelectorClass = fSelector->IsA();
00130    fSelector->SetInputList(fInput);
00131    fSelector->SetOption(option);
00132 
00133    PDB(kLoop,1) Info("Process","Call Begin(0)");
00134    fSelector->Begin(0);
00135 
00136    // Send large input data objects, if any
00137    gProof->SendInputDataFile();
00138 
00139    PDB(kPacketizer,1) Info("Process","Create Proxy TDSet");
00140    TDSet *set = new TDSetProxy(dset->GetType(), dset->GetObjName(),
00141                                dset->GetDirectory());
00142    if (dset->TestBit(TDSet::kEmpty))
00143       set->SetBit(TDSet::kEmpty);
00144    fProof->SetParameter("PROOF_MaxSlavesPerNode", (Long_t) ((TProofLite *)fProof)->fNWorkers);
00145    if (InitPacketizer(dset, nentries, first, "TPacketizerUnit", "TPacketizerAdaptive") != 0) {
00146       Error("Process", "cannot init the packetizer");
00147       fExitStatus = kAborted;
00148       return -1;
00149    }
00150    // reset start, this is now managed by the packetizer
00151    first = 0;
00152    // Try to have 100 messages about memory, unless a different number is given by the user
00153    if (!fProof->GetParameter("PROOF_MemLogFreq")){
00154       Long64_t memlogfreq = fPacketizer->GetTotalEntries()/(fProof->GetParallel()*100);
00155       memlogfreq = (memlogfreq > 0) ? memlogfreq : 1;
00156       fProof->SetParameter("PROOF_MemLogFreq", memlogfreq);
00157    }
00158 
00159    // Add the unique query tag as TNamed object to the input list
00160    // so that it is available in TSelectors for monitoring
00161    fProof->SetParameter("PROOF_QueryTag", fProof->GetName());
00162    //  ... and the sequential number
00163    fProof->SetParameter("PROOF_QuerySeqNum", fProof->fSeqNum);
00164 
00165    if (!sync)
00166       gSystem->RedirectOutput(0);
00167 
00168    TCleanup clean(this);
00169    SetupFeedback();
00170 
00171    TString opt = option;
00172 
00173    // Workers will get the entry ranges from the packetizer
00174    Long64_t num = (fProof->IsParallel()) ? -1 : nentries;
00175    Long64_t fst = (fProof->IsParallel()) ? -1 : first;
00176 
00177    // Entry- or Event- list ?
00178    TEntryList *enl = (!fProof->IsMaster()) ? dynamic_cast<TEntryList *>(set->GetEntryList())
00179                                            : (TEntryList *)0;
00180    TEventList *evl = (!fProof->IsMaster() && !enl) ? dynamic_cast<TEventList *>(set->GetEntryList())
00181                                            : (TEventList *)0;
00182    // Reset the merging progress information
00183    fProof->ResetMergePrg();
00184 
00185    // Broadcast main message
00186    PDB(kGlobal,1) Info("Process","Calling Broadcast");
00187    mesg << set << fn << fInput << opt << num << fst << evl << sync << enl;
00188    Int_t nb = fProof->Broadcast(mesg);
00189    fProof->fNotIdle += nb;
00190 
00191    // Redirect logs from master to special log frame
00192    fProof->fRedirLog = kTRUE;
00193 
00194    if (!sync) {
00195 
00196       // Asynchronous query: just make sure that asynchronous input
00197       // is enabled and return the prompt
00198       PDB(kGlobal,1) Info("Process","Asynchronous processing:"
00199                                     " activating CollectInputFrom");
00200       fProof->Activate();
00201 
00202       // Return the query sequential number
00203       return fProof->fSeqNum;
00204 
00205    } else {
00206 
00207       // Wait for processing
00208       PDB(kGlobal,1) Info("Process","Synchronous processing: calling Collect");
00209       fProof->Collect();
00210 
00211       // Restore prompt logging (Collect leaves things as they were
00212       // at the time it was called)
00213       fProof->fRedirLog = kFALSE;
00214 
00215       if (!TSelector::IsStandardDraw(fn))
00216          HandleTimer(0); // force an update of final result
00217       // Store process info
00218       if (fPacketizer && fQuery)
00219          fQuery->SetProcessInfo(0, 0., fPacketizer->GetBytesRead(),
00220                                        fPacketizer->GetInitTime(),
00221                                        fPacketizer->GetProcTime());
00222       StopFeedback();
00223 
00224       if (GetExitStatus() != TProofPlayer::kAborted)
00225          return Finalize(kFALSE, sync);
00226       else
00227          return -1;
00228    }
00229 }
00230 
00231 //______________________________________________________________________________
00232 Long64_t TProofPlayerLite::Finalize(Bool_t force, Bool_t sync)
00233 {
00234    // Finalize a query.
00235    // Returns -1 in case error, 0 otherwise.
00236 
00237    if (fOutputLists == 0) {
00238       if (force && fQuery)
00239          return fProof->Finalize(Form("%s:%s", fQuery->GetTitle(),
00240                                                fQuery->GetName()), force);
00241    }
00242 
00243    Long64_t rv = 0;
00244 
00245    TPerfStats::Stop();
00246 
00247    if (!fQuery) {
00248       Info("Finalize", "query is undefined!");
00249       return -1;
00250    }
00251 
00252    // Some objects (e.g. histos in autobin) may not have been merged yet
00253    // do it now
00254    MergeOutput();
00255 
00256    // Merge the output files created on workers, if any
00257    MergeOutputFiles();
00258 
00259    if (fExitStatus != kAborted) {
00260 
00261       if (!sync) {
00262          // Reinit selector (with multi-sessioning we must do this until
00263          // TSelector::GetSelector() is optimized to i) avoid reloading of an
00264          // unchanged selector and ii) invalidate existing instances of
00265          // reloaded selector)
00266          if (ReinitSelector(fQuery) == -1) {
00267             Info("Finalize", "problems reinitializing selector \"%s\"",
00268                   fQuery->GetSelecImp()->GetName());
00269             return -1;
00270          }
00271       }
00272 
00273       // Some input parameters may be needed in Terminate
00274       fSelector->SetInputList(fInput);
00275 
00276       TIter next(fOutput);
00277       TList *output = fSelector->GetOutputList();
00278       while(TObject* obj = next()) {
00279          if (fProof->IsParallel() || DrawCanvas(obj) == 1)
00280             // Either parallel or not a canvas or not able to display it:
00281             // just add to the list
00282             output->Add(obj);
00283       }
00284 
00285       SetSelectorDataMembersFromOutputList();
00286 
00287       PDB(kLoop,1) Info("Finalize","Call Terminate()");
00288       fOutput->Clear("nodelete");
00289       fSelector->Terminate();
00290 
00291       rv = fSelector->GetStatus();
00292 
00293       // copy the output list back and clean the selector's list
00294       TIter it(output);
00295       while(TObject* o = it()) {
00296          fOutput->Add(o);
00297       }
00298 
00299       // Save the output list in the current query, if any
00300       if (fQuery) {
00301          fQuery->SetOutputList(fOutput);
00302          // Set in finalized state (cannot be done twice)
00303          fQuery->SetFinalized();
00304       } else {
00305          Warning("Finalize","current TQueryResult object is undefined!");
00306       }
00307 
00308       // We have transferred copy of the output objects in TQueryResult,
00309       // so now we can cleanup the selector, making sure that we do not
00310       // touch the output objects
00311       output->SetOwner(kFALSE);
00312       SafeDelete(fSelector);
00313 
00314       // Delete fOutput (not needed anymore, cannot be finalized twice),
00315       // making sure that the objects saved in TQueryResult are not deleted
00316       fOutput->SetOwner(kFALSE);
00317       SafeDelete(fOutput);
00318    } else {
00319 
00320       // Cleanup
00321       fOutput->SetOwner();
00322       SafeDelete(fSelector);
00323    }
00324 
00325    PDB(kGlobal,1) Info("Finalize","exit");
00326    return rv;
00327 }
00328 
00329 //______________________________________________________________________________
00330 Bool_t TProofPlayerLite::HandleTimer(TTimer *)
00331 {
00332    // Send feedback objects to client.
00333 
00334    PDB(kFeedback,2)
00335       Info("HandleTimer","Entry: %p", fFeedbackTimer);
00336 
00337    if (fFeedbackTimer == 0) return kFALSE; // timer already switched off
00338 
00339 
00340    // process local feedback objects
00341 
00342    TList *fb = new TList;
00343    fb->SetOwner();
00344 
00345    TIter next(fFeedback);
00346    while( TObjString *name = (TObjString*) next() ) {
00347       TObject *o = fOutput->FindObject(name->GetName());
00348       if (o != 0) fb->Add(o->Clone());
00349    }
00350 
00351    if (fb->GetSize() > 0)
00352       StoreFeedback(this, fb); // adopts fb
00353    else
00354       delete fb;
00355 
00356    if (fFeedbackLists == 0) {
00357       fFeedbackTimer->Start(fFeedbackPeriod, kTRUE);   // maybe next time
00358       return kFALSE;
00359    }
00360 
00361    fb = MergeFeedback();
00362 
00363    Feedback(fb);
00364    fb->SetOwner();
00365    delete fb;
00366 
00367    fFeedbackTimer->Start(fFeedbackPeriod, kTRUE);
00368 
00369    return kFALSE; // ignored?
00370 }
00371 
00372 //______________________________________________________________________________
00373 void TProofPlayerLite::SetupFeedback()
00374 {
00375    // Setup reporting of feedback objects.
00376 
00377    fFeedback = (TList*) fInput->FindObject("FeedbackList");
00378 
00379    if (fFeedback) {
00380       PDB(kFeedback,1)
00381          Info("SetupFeedback","\"FeedbackList\" found: %d objects", fFeedback->GetSize());
00382    } else {
00383       PDB(kFeedback,1)
00384          Info("SetupFeedback","\"FeedbackList\" NOT found");
00385    }
00386 
00387    if (fFeedback == 0 || fFeedback->GetSize() == 0) return;
00388 
00389    // OK, feedback was requested, setup the timer
00390    SafeDelete(fFeedbackTimer);
00391    fFeedbackPeriod = 2000;
00392    TProof::GetParameter(fInput, "PROOF_FeedbackPeriod", fFeedbackPeriod);
00393    fFeedbackTimer = new TTimer;
00394    fFeedbackTimer->SetObject(this);
00395    fFeedbackTimer->Start(fFeedbackPeriod, kTRUE);
00396 }
00397 
00398 //______________________________________________________________________________
00399 void TProofPlayerLite::StoreFeedback(TObject *slave, TList *out)
00400 {
00401    // Store feedback results from the specified slave.
00402 
00403    PDB(kFeedback,1)
00404       Info("StoreFeedback","Enter (%p,%p,%d)", fFeedbackLists, out, (out ? out->GetSize() : -1));
00405 
00406    if ( out == 0 ) {
00407       PDB(kFeedback,1)
00408          Info("StoreFeedback","Leave (empty)");
00409       return;
00410    }
00411 
00412    if (fFeedbackLists == 0) {
00413       PDB(kFeedback,2) Info("StoreFeedback","Create fFeedbackLists");
00414       fFeedbackLists = new TList;
00415       fFeedbackLists->SetOwner();
00416    }
00417 
00418    TIter next(out);
00419    out->SetOwner(kFALSE);  // take ownership of the contents
00420 
00421    TObject *obj;
00422    while( (obj = next()) ) {
00423       PDB(kFeedback,2)
00424          Info("StoreFeedback","Find '%s'", obj->GetName() );
00425 
00426       TMap *map = (TMap*) fFeedbackLists->FindObject(obj->GetName());
00427       if ( map == 0 ) {
00428          PDB(kFeedback,2)
00429             Info("StoreFeedback", "map for '%s' not found (creating)", obj->GetName());
00430          // map must not be owner (ownership is with regards to the keys (only))
00431          map = new TMap;
00432          map->SetName(obj->GetName());
00433          fFeedbackLists->Add(map);
00434       } else {
00435          PDB(kFeedback,2)
00436             Info("StoreFeedback","removing previous value");
00437          if (map->GetValue(slave))
00438             delete map->GetValue(slave);
00439          map->Remove(slave);
00440       }
00441       map->Add(slave, obj);
00442    }
00443 
00444    delete out;
00445    PDB(kFeedback,1)
00446       Info("StoreFeedback","Leave");
00447 }

Generated on Tue Jul 5 14:52:20 2011 for ROOT_528-00b_version by  doxygen 1.5.1