TPacketizerFile.cxx

Go to the documentation of this file.
00001 // @(#)root/proofplayer:$Id: TPacketizerFile.cxx 38009 2011-02-08 17:57:54Z ganis $
00002 // Author: G. Ganis 2009
00003 
00004 /*************************************************************************
00005  * Copyright (C) 1995-2002, Rene Brun and Fons Rademakers.               *
00006  * All rights reserved.                                                  *
00007  *                                                                       *
00008  * For the licensing terms see $ROOTSYS/LICENSE.                         *
00009  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
00010  *************************************************************************/
00011 
00012 //////////////////////////////////////////////////////////////////////////
00013 //                                                                      //
00014 // TPacketizerFile                                                      //
00015 //                                                                      //
00016 // This packetizer generates packets, each containing a single file     //
00017 // path to be processed. It is used for tasks that generate files, as   //
00018 // in PROOF bench.                                                      //
00019 //                                                                      //
00020 //////////////////////////////////////////////////////////////////////////
00021 
00022 #include "TPacketizerFile.h"
00023 
00024 #include "Riostream.h"
00025 #include "TDSet.h"
00026 #include "TError.h"
00027 #include "TEventList.h"
00028 #include "TMap.h"
00029 #include "TMessage.h"
00030 #include "TMonitor.h"
00031 #include "TNtupleD.h"
00032 #include "TObject.h"
00033 #include "TParameter.h"
00034 #include "TPerfStats.h"
00035 #include "TProofDebug.h"
00036 #include "TProof.h"
00037 #include "TProofPlayer.h"
00038 #include "TProofServ.h"
00039 #include "TSlave.h"
00040 #include "TSocket.h"
00041 #include "TStopwatch.h"
00042 #include "TTimer.h"
00043 #include "TUrl.h"
00044 #include "TClass.h"
00045 #include "TMath.h"
00046 #include "TObjString.h"
00047 #include "TFileInfo.h"
00048 #include "TFileCollection.h"
00049 #include "THashList.h"
00050 
00051 //------------------------------------------------------------------------------
00052 
00053 class TPacketizerFile::TSlaveStat : public TVirtualPacketizer::TVirtualSlaveStat {
00054 
00055 friend class TPacketizerFile;
00056 
00057 private:
00058    Long64_t  fLastProcessed; // number of processed entries of the last packet
00059    Double_t  fSpeed;         // estimated current average speed of the processing slave
00060    Double_t  fTimeInstant;   // stores the time instant when the current packet started
00061    TNtupleD *fCircNtp;       // Keeps circular info for speed calculations
00062    Long_t    fCircLvl;       // Circularity level
00063 
00064 public:
00065    TSlaveStat(TSlave *sl, TList *input);
00066    ~TSlaveStat();
00067 
00068    void        GetCurrentTime();
00069 
00070    void        UpdatePerformance(Double_t time);
00071    TProofProgressStatus *AddProcessed(TProofProgressStatus *st);
00072 };
00073 
00074 // Iterator wrapper
00075 class TPacketizerFile::TIterObj : public TObject {
00076 
00077 private:
00078    TString   fName;          // Name of reference
00079    TIter    *fIter;          // Iterator
00080 
00081 public:
00082    TIterObj(const char *n, TIter *iter) : fName(n), fIter(iter) { }
00083    virtual ~TIterObj() { if (fIter) delete fIter; }
00084 
00085    const char *GetName() const {return fName;}
00086    TIter      *GetIter() const {return fIter;}
00087    void        Print(Option_t* option = "") const;
00088 };
00089 
00090 ClassImp(TPacketizerFile)
00091 
00092 //______________________________________________________________________________
00093 TPacketizerFile::TPacketizerFile(TList *workers, Long64_t, TList *input,
00094                                  TProofProgressStatus *st)
00095                 : TVirtualPacketizer(input, st)
00096 {
00097    // Constructor
00098 
00099    PDB(kPacketizer,1) Info("TPacketizerFile", "enter");
00100    ResetBit(TObject::kInvalidObject);
00101    fValid = kFALSE;
00102    fAssigned = 0;
00103    fProcNotAssigned = kTRUE;
00104 
00105    if (!input || (input && input->GetSize() <= 0)) {
00106       Error("TPacketizerFile", "input file is undefined or empty!");
00107       SetBit(TObject::kInvalidObject);
00108       return;
00109    }
00110 
00111    // Check if the files not explicitely assigned have to be processed
00112    Int_t procnotass = 1;
00113    if (TProof::GetParameter(input, "PROOF_ProcessNotAssigned", procnotass) == 0) {
00114       if (procnotass == 0) {
00115          Info("TPacketizerFile", "files not assigned to workers will not be processed");
00116          fProcNotAssigned = kFALSE;
00117       }
00118    }
00119 
00120    // These are the file to be created/processed per node; the information
00121    if (!(fFiles = dynamic_cast<TMap *>(input->FindObject("PROOF_FilesToProcess")))) {
00122       Error("TPacketizerFile", "map of files to be processed/created not found");
00123       SetBit(TObject::kInvalidObject);
00124       return;
00125    }
00126 
00127    // The worker stats
00128    fSlaveStats = new TMap;
00129    fSlaveStats->SetOwner(kFALSE);
00130 
00131    TList nodes;
00132    nodes.SetOwner(kTRUE);
00133    TSlave *wrk;
00134    TIter si(workers);
00135    while ((wrk = (TSlave *) si.Next())) {
00136       fSlaveStats->Add(wrk, new TSlaveStat(wrk, input));
00137       TString wrkname = TUrl(wrk->GetName()).GetHostFQDN();
00138       Info("TPacketizerFile", "worker: %s", wrkname.Data());
00139       if (!nodes.FindObject(wrkname)) nodes.Add(new TObjString(wrkname));
00140    }
00141 
00142    // The list of iterators
00143    fIters = new TList;
00144    fIters->SetOwner(kTRUE);
00145 
00146    // There must be something in
00147    fTotalEntries = 0;
00148    fNotAssigned = new TList;
00149    fNotAssigned->SetName("*");
00150    TIter nxl(fFiles);
00151    TObject *key, *o = 0;
00152    while ((key = nxl()) != 0) {
00153       THashList *wrklist = dynamic_cast<THashList *>(fFiles->GetValue(key));
00154       if (!wrklist) {
00155          TFileCollection *fc = dynamic_cast<TFileCollection *>(fFiles->GetValue(key));
00156          if (fc) wrklist = fc->GetList();
00157       }
00158       if (wrklist) {
00159          TString hname = TUrl(key->GetName()).GetHostFQDN();
00160          if ((o = nodes.FindObject(hname))) {
00161             fTotalEntries += wrklist->GetSize();
00162             fIters->Add(new TIterObj(hname, new TIter(wrklist)));
00163             // Notify
00164             PDB(kPacketizer,2)
00165                Info("TPacketizerFile", "%d files of '%s' (fqdn: '%s') assigned to '%s'",
00166                                        wrklist->GetSize(), key->GetName(), hname.Data(), o->GetName());
00167          } else {
00168             // We add all to the not assigned list so that they will be distributed
00169             // according to the load
00170             TIter nxf(wrklist);
00171             while ((o = nxf()))
00172                fNotAssigned->Add(o);
00173             // Notify
00174             PDB(kPacketizer,2)
00175                Info("TPacketizerFile", "%d files of '%s' (fqdn: '%s') not assigned",
00176                                        wrklist->GetSize(), key->GetName(), hname.Data());
00177          }
00178       }
00179    }
00180    if (fNotAssigned && fNotAssigned->GetSize() > 0) {
00181       fTotalEntries += fNotAssigned->GetSize();
00182       fIters->Add(new TIterObj("*", new TIter(fNotAssigned)));
00183       Info("TPacketizerFile", "non-assigned files: %d", fNotAssigned->GetSize());
00184       fNotAssigned->Print();
00185    }
00186    if (fTotalEntries <= 0) {
00187       Error("TPacketizerFile", "no file path in the map!");
00188       SetBit(TObject::kInvalidObject);
00189       SafeDelete(fIters);
00190       return;
00191    } else {
00192       Info("TPacketizerFile", "processing %lld files", fTotalEntries);
00193       fIters->Print();
00194    }
00195 
00196    fStopwatch = new TStopwatch();
00197    fStopwatch->Start();
00198    fValid = kTRUE;
00199    PDB(kPacketizer,1) Info("TPacketizerFile", "return");
00200 
00201    // Done
00202    return;
00203 }
00204 
00205 //______________________________________________________________________________
00206 TPacketizerFile::~TPacketizerFile()
00207 {
00208    // Destructor.
00209 
00210    if (fNotAssigned) fNotAssigned->SetOwner(kFALSE);
00211    SafeDelete(fNotAssigned);
00212    if (fIters) fIters->SetOwner(kTRUE);
00213    SafeDelete(fIters);
00214    SafeDelete(fStopwatch);
00215 }
00216 
00217 //______________________________________________________________________________
00218 Double_t TPacketizerFile::GetCurrentTime()
00219 {
00220    // Get current time
00221 
00222    Double_t retValue = fStopwatch->RealTime();
00223    fStopwatch->Continue();
00224    return retValue;
00225 }
00226 
00227 //______________________________________________________________________________
00228 Float_t TPacketizerFile::GetCurrentRate(Bool_t &all)
00229 {
00230    // Get Estimation of the current rate; just summing the current rates of
00231    // the active workers
00232 
00233    all = kTRUE;
00234    // Loop over the workers
00235    Float_t currate = 0.;
00236    if (fSlaveStats && fSlaveStats->GetSize() > 0) {
00237       TIter nxw(fSlaveStats);
00238       TObject *key;
00239       while ((key = nxw()) != 0) {
00240          TSlaveStat *wrkstat = (TSlaveStat *) fSlaveStats->GetValue(key);
00241          if (wrkstat && wrkstat->GetProgressStatus() && wrkstat->GetEntriesProcessed() > 0) {
00242             // Sum-up the current rates
00243             currate += wrkstat->GetProgressStatus()->GetCurrentRate();
00244          } else {
00245             all = kFALSE;
00246          }
00247       }
00248    }
00249    // Done
00250    return currate;
00251 }
00252 
00253 //______________________________________________________________________________
00254 TDSetElement *TPacketizerFile::GetNextPacket(TSlave *wrk, TMessage *r)
00255 {
00256    // Get next packet
00257 
00258    TDSetElement *elem = 0;
00259    if (!fValid)  return elem;
00260 
00261    // Find slave
00262    TSlaveStat *wrkstat = (TSlaveStat *) fSlaveStats->GetValue(wrk);
00263    if (!wrkstat) {
00264       Error("GetNextPacket", "could not find stat object for worker '%s'!", wrk->GetName());
00265       return elem;
00266    }
00267 
00268    PDB(kPacketizer,2)
00269       Info("GetNextPacket","worker-%s: fAssigned %lld / %lld", wrk->GetOrdinal(), fAssigned, fTotalEntries);
00270 
00271    // Update stats & free old element
00272    Double_t latency = 0., proctime = 0., proccpu = 0.;
00273    Long64_t bytesRead = -1;
00274    Long64_t totalEntries = -1; // used only to read an old message type
00275    Long64_t totev = 0;
00276    Long64_t numev = -1;
00277 
00278    TProofProgressStatus *status = 0;
00279    if (wrk->GetProtocol() > 18) {
00280       (*r) >> latency;
00281       (*r) >> status;
00282 
00283       // Calculate the progress made in the last packet
00284       TProofProgressStatus *progress = 0;
00285       if (status) {
00286          // upadte the worker status
00287          numev = status->GetEntries() - wrkstat->GetEntriesProcessed();
00288          progress = wrkstat->AddProcessed(status);
00289          if (progress) {
00290             // (*fProgressStatus) += *progress;
00291             proctime = progress->GetProcTime();
00292             proccpu  = progress->GetCPUTime();
00293             totev  = status->GetEntries(); // for backward compatibility
00294             bytesRead  = progress->GetBytesRead();
00295             delete progress;
00296          }
00297          delete status;
00298       } else
00299           Error("GetNextPacket", "no status came in the kPROOF_GETPACKET message");
00300    } else {
00301 
00302       (*r) >> latency >> proctime >> proccpu;
00303 
00304       // only read new info if available
00305       if (r->BufferSize() > r->Length()) (*r) >> bytesRead;
00306       if (r->BufferSize() > r->Length()) (*r) >> totalEntries;
00307       if (r->BufferSize() > r->Length()) (*r) >> totev;
00308 
00309       numev = totev - wrkstat->GetEntriesProcessed();
00310       wrkstat->GetProgressStatus()->IncEntries(numev);
00311    }
00312 
00313    fProgressStatus->IncEntries(numev);
00314 
00315    PDB(kPacketizer,2)
00316       Info("GetNextPacket","worker-%s (%s): %lld %7.3lf %7.3lf %7.3lf %lld",
00317                            wrk->GetOrdinal(), wrk->GetName(),
00318                            numev, latency, proctime, proccpu, bytesRead);
00319 
00320    if (gPerfStats != 0) {
00321       gPerfStats->PacketEvent(wrk->GetOrdinal(), wrk->GetName(), "", numev,
00322                               latency, proctime, proccpu, bytesRead);
00323    }
00324 
00325    if (fAssigned == fTotalEntries) {
00326       // Send last timer message
00327       HandleTimer(0);
00328       return 0;
00329    }
00330 
00331    if (fStop) {
00332       // Send last timer message
00333       HandleTimer(0);
00334       return 0;
00335    }
00336 
00337    PDB(kPacketizer,2)
00338       Info("GetNextPacket", "worker-%s (%s): getting next files ... ", wrk->GetOrdinal(),
00339                             wrk->GetName());
00340 
00341    // Get next file now
00342    TObject *nextfile = 0;
00343 
00344    // Find iterator associated to the worker
00345    TIterObj *io = dynamic_cast<TIterObj *>(fIters->FindObject(wrk->GetName()));
00346    if (io) {
00347       // Get next file to process in the list of the worker
00348       if (io->GetIter())
00349          nextfile = io->GetIter()->Next();
00350    }
00351 
00352    // If not found or all files already processed, check if a generic iterator
00353    // has still some files to process
00354    if (!nextfile && fProcNotAssigned) {
00355       if ((io = dynamic_cast<TIterObj *>(fIters->FindObject("*")))) {
00356          // Get next file to process in the list of the worker
00357          if (io->GetIter())
00358             nextfile = io->GetIter()->Next();
00359       }
00360    }
00361 
00362    // Return if nothing to process
00363    if (!nextfile) return elem;
00364 
00365    // The file name: we support TObjString or TFileInfo
00366    TString filename;
00367    TObjString *os = 0;
00368    if ((os = dynamic_cast<TObjString *>(nextfile))) {
00369       filename = os->GetName();
00370    } else {
00371       TFileInfo *fi = 0;
00372       if ((fi = dynamic_cast<TFileInfo *>(nextfile)))
00373          filename = fi->GetCurrentUrl()->GetUrl();
00374    }
00375    // Nothing to process
00376    if (filename.IsNull()) {
00377       Warning("GetNextPacket", "found unsupported object of type '%s' in list: it must"
00378                                " be 'TObjString' or 'TFileInfo'", nextfile->GetName());
00379       return elem;
00380    }
00381    // Prepare the packet
00382    PDB(kPacketizer,2)
00383       Info("GetNextPacket", "worker-%s: assigning: '%s' (remaining %lld files)",
00384                             wrk->GetOrdinal(), filename.Data(), (fTotalEntries - fAssigned));
00385    elem = new TDSetElement(filename, "", "", 0, 1);
00386    elem->SetBit(TDSetElement::kEmpty);
00387 
00388    // Update the total counter
00389    fAssigned += 1;
00390 
00391    return elem;
00392 }
00393 
00394 //------------------------------------------------------------------------------
00395 
00396 //______________________________________________________________________________
00397 TPacketizerFile::TSlaveStat::TSlaveStat(TSlave *slave, TList *input)
00398                             : fLastProcessed(0),
00399                               fSpeed(0), fTimeInstant(0), fCircLvl(5)
00400 {
00401    // Main constructor
00402 
00403    // Initialize the circularity ntple for speed calculations
00404    fCircNtp = new TNtupleD("Speed Circ Ntp", "Circular process info","tm:ev");
00405    TProof::GetParameter(input, "PROOF_TPacketizerFileCircularity", fCircLvl);
00406    fCircLvl = (fCircLvl > 0) ? fCircLvl : 5;
00407    fCircNtp->SetCircular(fCircLvl);
00408    fSlave = slave;
00409    fStatus = new TProofProgressStatus();
00410 }
00411 
00412 //______________________________________________________________________________
00413 TPacketizerFile::TSlaveStat::~TSlaveStat()
00414 {
00415    // Destructor
00416 
00417    SafeDelete(fCircNtp);
00418 }
00419 
00420 //______________________________________________________________________________
00421 void TPacketizerFile::TSlaveStat::UpdatePerformance(Double_t time)
00422 {
00423    // Update the circular ntple
00424 
00425    Double_t ttot = time;
00426    Double_t *ar = fCircNtp->GetArgs();
00427    Int_t ne = fCircNtp->GetEntries();
00428    if (ne <= 0) {
00429       // First call: just fill one ref entry and return
00430       fCircNtp->Fill(0., 0);
00431       fSpeed = 0.;
00432       return;
00433    }
00434    // Fill the entry
00435    fCircNtp->GetEntry(ne-1);
00436    ttot = ar[0] + time;
00437    fCircNtp->Fill(ttot, GetEntriesProcessed());
00438 
00439    // Calculate the speed
00440    fCircNtp->GetEntry(0);
00441    Double_t dtime = (ttot > ar[0]) ? ttot - ar[0] : ne+1 ;
00442    Long64_t nevts = GetEntriesProcessed() - (Long64_t)ar[1];
00443    fSpeed = nevts / dtime;
00444    PDB(kPacketizer,2)
00445       Info("UpdatePerformance", "time:%f, dtime:%f, nevts:%lld, speed: %f",
00446                                 time, dtime, nevts, fSpeed);
00447 
00448 }
00449 
00450 //______________________________________________________________________________
00451 TProofProgressStatus *TPacketizerFile::TSlaveStat::AddProcessed(TProofProgressStatus *st)
00452 {
00453    // Update the status info to the 'st'.
00454    // return the difference (*st - *fStatus)
00455 
00456    if (st) {
00457       // The entriesis not correct in 'st'
00458       Long64_t lastEntries = st->GetEntries() - fStatus->GetEntries();
00459       // The last proc time should not be added
00460       fStatus->SetLastProcTime(0.);
00461       // Get the diff
00462       TProofProgressStatus *diff = new TProofProgressStatus(*st - *fStatus);
00463       *fStatus += *diff;
00464       // Set the correct value
00465       fStatus->SetLastEntries(lastEntries);
00466       return diff;
00467    } else {
00468       Error("AddProcessed", "status arg undefined");
00469       return 0;
00470    }
00471 }
00472 
00473 //______________________________________________________________________________
00474 void TPacketizerFile::TIterObj::Print(Option_t *) const
00475 {
00476    // Printf info
00477 
00478    Printf("Iterator '%s' controls %d units", GetName(),
00479           ((GetIter() && GetIter()->GetCollection()) ? GetIter()->GetCollection()->GetSize()
00480                                                      : -1));
00481 }

Generated on Tue Jul 5 14:52:18 2011 for ROOT_528-00b_version by  doxygen 1.5.1