TPacketizerMulti.cxx

Go to the documentation of this file.
00001 // @(#)root/proofplayer:$Id: TPacketizerMulti.cxx 32204 2010-02-03 19:17:40Z ganis $
00002 // Author: G. Ganis Jan 2010
00003 
00004 /*************************************************************************
00005  * Copyright (C) 1995-2002, Rene Brun and Fons Rademakers.               *
00006  * All rights reserved.                                                  *
00007  *                                                                       *
00008  * For the licensing terms see $ROOTSYS/LICENSE.                         *
00009  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
00010 *************************************************************************/
00011 
00012 //////////////////////////////////////////////////////////////////////////
00013 //                                                                      //
00014 // TPacketizerMulti                                                     //
00015 //                                                                      //
00016 // This class allows to do multiple runs in the same query; each run    //
00017 // can be, for example, a different dataset or the same dataset with an //
00018 // entry list.                                                          //
00019 // The multiple packetizer contains a list of packetizers which are     //
00020 // processed in turn.                                                   //
00021 // The bit TSelector::kNewRun is set in the TSelector object when a new //
00022 // packetizer is used.                                                  //
00023 //                                                                      //
00024 //////////////////////////////////////////////////////////////////////////
00025 
00026 
00027 #include "TPacketizerMulti.h"
00028 
00029 #include "TClass.h"
00030 #include "TDSet.h"
00031 #include "TError.h"
00032 #include "TFileInfo.h"
00033 #include "TList.h"
00034 #include "TMap.h"
00035 #include "TMethodCall.h"
00036 #include "TProof.h"
00037 #include "TProofDebug.h"
00038 
00039 ClassImp(TPacketizerMulti)
00040 
00041 //______________________________________________________________________________
00042 TPacketizerMulti::TPacketizerMulti(TDSet *dset, TList *wrks,
00043                                    Long64_t first, Long64_t num,
00044                                    TList *input, TProofProgressStatus *st)
00045                  : TVirtualPacketizer(input, st)
00046 {
00047    // Constructor
00048 
00049    PDB(kPacketizer,1) Info("TPacketizerMulti",
00050                            "enter (first %lld, num %lld)", first, num);
00051    fValid = kFALSE;
00052    fPacketizersIter = 0;
00053    fCurrent = 0;
00054    fAssignedPack = 0;
00055 
00056    // Check inputs
00057    if (!dset || !wrks || !input || !st) {
00058       Error("TPacketizerMulti", "invalid inputs: dset:%p wrks:%p input:%p st:%p",
00059                                 dset, wrks, input, st);
00060       return;
00061    }
00062    // Create the list
00063    fPacketizers = new TList;
00064 
00065    // We do not want progress timers from the single packetizers
00066    TNamed *progTimerFlag = new TNamed("PROOF_StartProgressTimer", "no");
00067    input->Add(progTimerFlag);
00068 
00069    fTotalEntries = 0;
00070    TVirtualPacketizer *packetizer = 0;
00071    // Simple or multi?
00072    if (!(dset->TestBit(TDSet::kMultiDSet))) {
00073       if ((packetizer = CreatePacketizer(dset, wrks, first, num, input, st))) {
00074          fPacketizers->Add(packetizer);
00075          fTotalEntries = packetizer->GetTotalEntries();
00076       } else {
00077          Error("TPacketizerMulti", "problems initializing packetizer for single dataset");
00078          input->Remove(progTimerFlag);
00079          delete progTimerFlag;
00080          return;
00081       }
00082    } else {
00083       // Iterate on the datasets
00084       TIter nxds(dset->GetListOfElements());
00085       TDSet *ds = 0;
00086       while ((ds = (TDSet *)nxds())) {
00087          if ((packetizer = CreatePacketizer(ds, wrks, first, num, input, st))) {
00088             fPacketizers->Add(packetizer);
00089             fTotalEntries += packetizer->GetTotalEntries();
00090          } else {
00091             Error("TPacketizerMulti", "problems initializing packetizer for dataset '%s'", ds->GetName());
00092          }
00093       }
00094    }
00095    // Cleanup temporary additions to the input list
00096    input->Remove(progTimerFlag);
00097    delete progTimerFlag;
00098 
00099    // If no valid packetizer could be initialized we fail
00100    if (fPacketizers->GetSize() <= 0) {
00101       Error("TPacketizerMulti", "no valid packetizer could be initialized - aborting");
00102       SafeDelete(fPacketizers);
00103       return;
00104    } else {
00105       Info("TPacketizerMulti", "%d packetizer(s) have been successfully initialized (%lld events in total)",
00106                                fPacketizers->GetSize(), fTotalEntries);
00107       // To avoid problems with counters we must set the total entries in each packetizer
00108       TIter nxp(fPacketizers);
00109       while ((packetizer = (TVirtualPacketizer *) nxp()))
00110          packetizer->SetTotalEntries(fTotalEntries);
00111    }
00112 
00113    // Create the interator
00114    fPacketizersIter = new TIter(fPacketizers);
00115 
00116    // Set the current the first
00117    if (!(fCurrent = (TVirtualPacketizer *) fPacketizersIter->Next())) {
00118       // Weird
00119       Error("TPacketizerMulti", "could not point to the first valid packetizer");
00120       fPacketizers->SetOwner(kTRUE);
00121       SafeDelete(fPacketizers);
00122       SafeDelete(fPacketizersIter);
00123       return;
00124    }
00125 
00126    // Create map
00127    fAssignedPack = new TMap;
00128 
00129    // Ok, everything went fine
00130    fValid = kTRUE;
00131 
00132    PDB(kPacketizer,1) Info("TPacketizerMulti", "done");
00133 }
00134 
00135 //______________________________________________________________________________
00136 TPacketizerMulti::~TPacketizerMulti()
00137 {
00138    // Destructor.
00139 
00140    if (fPacketizers) {
00141       fPacketizers->SetOwner(kTRUE);
00142       SafeDelete(fPacketizers);
00143    }
00144    SafeDelete(fPacketizers);
00145    fCurrent = 0;
00146    if (fAssignedPack) {
00147       fAssignedPack->SetOwner(kFALSE);
00148       SafeDelete(fAssignedPack);
00149    }
00150 }
00151 
00152 //______________________________________________________________________________
00153 TDSetElement *TPacketizerMulti::GetNextPacket(TSlave *wrk, TMessage *r)
00154 {
00155    // Get next packet from the current packetizer.
00156    // If the current packetizer is done, move to next.
00157    // Retun null when all packetizers are done.
00158 
00159    TDSetElement *elem = 0;
00160 
00161    // Must be valid
00162    if (!fValid) return elem;
00163 
00164    // Point to the packetizer last used for thsi worker
00165    TVirtualPacketizer *lastPacketizer = dynamic_cast<TVirtualPacketizer *>(fAssignedPack->GetValue(wrk));
00166    if (lastPacketizer && lastPacketizer != fCurrent) {
00167       PDB(kPacketizer,2)
00168          Info("GetNextPacket", "%s: asking old packetizer %p ... ", wrk->GetOrdinal(), lastPacketizer);
00169       if ((elem = lastPacketizer->GetNextPacket(wrk, r))) return elem;
00170       if (fCurrent) {
00171          // Transfer the status info
00172          TVirtualSlaveStat *oldstat = dynamic_cast<TVirtualSlaveStat *>(lastPacketizer->GetSlaveStats()->GetValue(wrk));
00173          TVirtualSlaveStat *curstat = dynamic_cast<TVirtualSlaveStat *>(fCurrent->GetSlaveStats()->GetValue(wrk));
00174          if (oldstat && curstat)
00175             *(curstat->GetProgressStatus()) += *(oldstat->GetProgressStatus());
00176       }
00177    }
00178 
00179    // Need something to be processed
00180    if (!fCurrent) {
00181       HandleTimer(0);   // Send last timer message
00182       return elem;
00183    }
00184 
00185    // Get the next packet from the current packetizer
00186    PDB(kPacketizer,2)
00187       Info("GetNextPacket", "%s: asking current packetizer %p ... ", wrk->GetOrdinal(), fCurrent);
00188    if (!(elem = fCurrent->GetNextPacket(wrk, r))) {
00189       // We need to transfer the status info if we change packetizer now
00190       TMap *oldStats = (lastPacketizer && lastPacketizer == fCurrent) ? lastPacketizer->GetSlaveStats() : 0;
00191       // If the packetizer is done, move to next
00192       fCurrent = (TVirtualPacketizer *) fPacketizersIter->Next();
00193       if (fCurrent) {
00194          // Transfer the status info
00195          if (oldStats) {
00196             TVirtualSlaveStat *oldstat = dynamic_cast<TVirtualSlaveStat *>(oldStats->GetValue(wrk));
00197             TVirtualSlaveStat *curstat = dynamic_cast<TVirtualSlaveStat *>(fCurrent->GetSlaveStats()->GetValue(wrk));
00198             if (oldstat && curstat)
00199                *(curstat->GetProgressStatus()) += *(oldstat->GetProgressStatus());
00200          }
00201          PDB(kPacketizer,2)
00202             Info("GetNextPacket", "%s: asking new packetizer %p ... ", wrk->GetOrdinal(), fCurrent);
00203          elem = fCurrent->GetNextPacket(wrk, r);
00204       }
00205    }
00206    if (fCurrent) {
00207       // Save the packetizer
00208       TPair *pair = dynamic_cast<TPair *>(fAssignedPack->FindObject(wrk));
00209       if (pair) {
00210          pair->SetValue(fCurrent);
00211       } else {
00212          fAssignedPack->Add(wrk, fCurrent);
00213       }
00214       PDB(kPacketizer,2)
00215          Info("GetNextPacket", "assigned packetizer %p to %s (check: %p)",
00216                                fCurrent,  wrk->GetOrdinal(), fAssignedPack->GetValue(wrk));
00217    }
00218 
00219    // Check the total number of entries
00220    if (fProgressStatus->GetEntries() >= fTotalEntries) {
00221       if (fProgressStatus->GetEntries() > fTotalEntries)
00222          Error("GetNextPacket", "Processed too many entries!");
00223       HandleTimer(0);   // Send last timer message
00224       SafeDelete(fProgress);
00225    }
00226 
00227    // Done
00228    return elem;
00229 }
00230 
00231 //______________________________________________________________________________
00232 TVirtualPacketizer *TPacketizerMulti::CreatePacketizer(TDSet *dset, TList *wrks,
00233                                                        Long64_t first, Long64_t num,
00234                                                        TList *input, TProofProgressStatus *st)
00235 {
00236    // Create a packetizer for dataset 'dset'
00237    // Return null on failure.
00238 
00239    TVirtualPacketizer *packetizer = 0;
00240 
00241    // Check inputs
00242    if (!dset || !wrks || !input || !st) {
00243       Error("CreatePacketizer", "invalid inputs: dset:%p wrks:%p input:%p st:%p",
00244                                  dset, wrks, input, st);
00245       return packetizer;
00246    }
00247 
00248    // This is for data-driven runs
00249    if (dset->TestBit(TDSet::kEmpty)) {
00250       Error("CreatePacketizer", "dataset is empty: protocol error?");
00251       return packetizer;
00252    }
00253 
00254    TString packetizername;
00255    TList *listOfMissingFiles = 0;
00256 
00257    TMethodCall callEnv;
00258    TClass *cl;
00259 
00260    // Lookup - resolve the end-point urls to optmize the distribution.
00261    // The lookup was previously called in the packetizer's constructor.
00262    // A list for the missing files may already have been added to the
00263    // output list; otherwise, if needed it will be created inside
00264    if (!(listOfMissingFiles = (TList *) input->FindObject("MissingFiles"))) {
00265       // Create it
00266       listOfMissingFiles = new TList;
00267       // and add it to the input list; it will be later moved to the output list
00268       input->Add(listOfMissingFiles);
00269    }
00270    dset->Lookup(kTRUE, &listOfMissingFiles);
00271 
00272    if (!(dset->GetListOfElements()) ||
00273        !(dset->GetListOfElements()->GetSize())) {
00274       Error("CreatePacketizer", "no files from the data set were found - skipping");
00275       return packetizer;
00276    }
00277 
00278    if (TProof::GetParameter(input, "PROOF_Packetizer", packetizername) != 0) {
00279       // Using standard packetizer TAdaptivePacketizer
00280       packetizername = "TPacketizerAdaptive";
00281    } else {
00282       Info("CreatePacketizer", "using alternate packetizer: %s", packetizername.Data());
00283    }
00284 
00285    // Get linked to the related class
00286    cl = TClass::GetClass(packetizername);
00287    if (cl == 0) {
00288       Error("CreatePacketizer", "class '%s' not found", packetizername.Data());
00289       return packetizer;
00290    }
00291 
00292    // Init the constructor
00293    callEnv.InitWithPrototype(cl, cl->GetName(),"TDSet*,TList*,Long64_t,Long64_t,TList*,TProofProgressStatus*");
00294    if (!callEnv.IsValid()) {
00295       Error("CreatePacketizer", "cannot find correct constructor for '%s'", cl->GetName());
00296       return packetizer;
00297    }
00298    callEnv.ResetParam();
00299    callEnv.SetParam((Long_t) dset);
00300    callEnv.SetParam((Long_t) wrks);
00301    callEnv.SetParam((Long64_t) first);
00302    callEnv.SetParam((Long64_t) num);
00303    callEnv.SetParam((Long_t) input);
00304    callEnv.SetParam((Long_t) st);
00305 
00306    // We are going to test validity during the packetizer initialization
00307    dset->SetBit(TDSet::kValidityChecked);
00308    dset->ResetBit(TDSet::kSomeInvalid);
00309 
00310    // Get an instance of the packetizer
00311    Long_t ret = 0;
00312    callEnv.Execute(ret);
00313    if ((packetizer = (TVirtualPacketizer *)ret) == 0) {
00314       Error("CreatePacketizer", "cannot construct '%s'", cl->GetName());
00315       return packetizer;
00316    }
00317 
00318    if (!packetizer->IsValid()) {
00319       Error("CreatePacketizer",
00320             "instantiated packetizer object '%s' is invalid", cl->GetName());
00321       SafeDelete(packetizer);
00322    }
00323 
00324    // Add invalid elements to the list of missing elements
00325    TDSetElement *elem = 0;
00326    if (dset->TestBit(TDSet::kSomeInvalid)) {
00327       TIter nxe(dset->GetListOfElements());
00328       while ((elem = (TDSetElement *)nxe())) {
00329          if (!elem->GetValid()) {
00330             listOfMissingFiles->Add(elem->GetFileInfo(dset->GetType()));
00331             dset->Remove(elem, kFALSE);
00332          }
00333       }
00334       // The invalid elements have been removed
00335       dset->ResetBit(TDSet::kSomeInvalid);
00336    }
00337 
00338    // Done
00339    return packetizer;
00340 }

Generated on Tue Jul 5 14:52:18 2011 for ROOT_528-00b_version by  doxygen 1.5.1