TVirtualPacketizer.cxx

Go to the documentation of this file.
00001 // @(#)root/proofplayer:$Id: TVirtualPacketizer.cxx 37396 2010-12-08 13:12:00Z rdm $
00002 // Author: Maarten Ballintijn    9/7/2002
00003 
00004 /*************************************************************************
00005  * Copyright (C) 1995-2002, Rene Brun and Fons Rademakers.               *
00006  * All rights reserved.                                                  *
00007  *                                                                       *
00008  * For the licensing terms see $ROOTSYS/LICENSE.                         *
00009  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
00010  *************************************************************************/
00011 
00012 //////////////////////////////////////////////////////////////////////////
00013 //                                                                      //
00014 // TVirtualPacketizer                                                   //
00015 //                                                                      //
00016 // The packetizer is a load balancing object created for each query.    //
00017 // It generates packets to be processed on PROOF worker servers.        //
00018 // A packet is an event range (begin entry and number of entries) or    //
00019 // object range (first object and number of objects) in a TTree         //
00020 // (entries) or a directory (objects) in a file.                        //
00021 // Packets are generated taking into account the performance of the     //
00022 // remote machine, the time it took to process a previous packet on     //
00023 // the remote machine, the locality of the database files, etc.         //
00024 //                                                                      //
00025 // TVirtualPacketizer includes common parts of PROOF packetizers.       //
00026 // Look in subclasses for details.                                      //
00027 // The default packetizer is TPacketizerAdaptive.                       //
00028 // To use an alternative one, for instance - the TPacketizer, call:     //
00029 // proof->SetParameter("PROOF_Packetizer", "TPacketizer");              //
00030 //                                                                      //
00031 //////////////////////////////////////////////////////////////////////////
00032 
00033 
00034 #include "TVirtualPacketizer.h"
00035 #include "TEnv.h"
00036 #include "TFile.h"
00037 #include "TTree.h"
00038 #include "TKey.h"
00039 #include "TDSet.h"
00040 #include "TError.h"
00041 #include "TEventList.h"
00042 #include "TEntryList.h"
00043 #include "TMap.h"
00044 #include "TMessage.h"
00045 #include "TObjString.h"
00046 #include "TParameter.h"
00047 
00048 #include "TProof.h"
00049 #include "TProofDebug.h"
00050 #include "TProofPlayer.h"
00051 #include "TProofServ.h"
00052 #include "TSlave.h"
00053 #include "TSocket.h"
00054 #include "TTimer.h"
00055 #include "TUrl.h"
00056 #include "TMath.h"
00057 #include "TMonitor.h"
00058 #include "TNtuple.h"
00059 #include "TNtupleD.h"
00060 #include "TPerfStats.h"
00061 
00062 ClassImp(TVirtualPacketizer)
00063 
00064 //______________________________________________________________________________
00065 TVirtualPacketizer::TVirtualPacketizer(TList *input, TProofProgressStatus *st)
00066 {
00067    // Constructor.
00068 
00069    // General configuration parameters
00070    fMinPacketTime = 3;
00071    Double_t minPacketTime = 0;
00072    if (TProof::GetParameter(input, "PROOF_MinPacketTime", minPacketTime) == 0) {
00073       Info("TVirtualPacketizer", "setting minimum time for a packet to %f",
00074            minPacketTime);
00075       fMinPacketTime = (Int_t) minPacketTime;
00076    }
00077    fMaxPacketTime = 20;
00078    Double_t maxPacketTime = 0;
00079    if (TProof::GetParameter(input, "PROOF_MaxPacketTime", maxPacketTime) == 0) {
00080       Info("TVirtualPacketizer", "setting maximum packet time for a packet to %f",
00081            maxPacketTime);
00082       fMaxPacketTime = (Int_t) maxPacketTime;
00083    }
00084    ResetBit(TVirtualPacketizer::kIsTree);
00085 
00086    // Create the list to save them in the query result (each derived packetizer is
00087    // responsible to update this coherently)
00088    fConfigParams = new TList;
00089    fConfigParams->SetName("PROOF_PacketizerConfigParams");
00090    fConfigParams->Add(new TParameter<Double_t>("PROOF_MinPacketTime", fMinPacketTime));
00091    fConfigParams->Add(new TParameter<Double_t>("PROOF_MaxPacketTime", fMaxPacketTime));
00092 
00093    fProgressStatus = st;
00094    if (!fProgressStatus) {
00095       Error("TVirtualPacketizer", "No progress status");
00096       return;
00097    }
00098    fTotalEntries = 0;
00099    fValid = kTRUE;
00100    fStop = kFALSE;
00101    fFailedPackets = 0;
00102    fDataSet = "";
00103    fSlaveStats = 0;
00104 
00105    // Performance monitoring
00106    fStartTime = gSystem->Now();
00107    SetBit(TVirtualPacketizer::kIsInitializing);
00108    ResetBit(TVirtualPacketizer::kIsDone);
00109    fInitTime = 0;
00110    fProcTime = 0;
00111    fTimeUpdt = -1.;
00112 
00113    // Init circularity ntple for performance calculations
00114    fCircProg = new TNtupleD("CircNtuple","Circular progress info","tm:ev:mb:rc:al");
00115    fCircN = 5;
00116    TProof::GetParameter(input, "PROOF_ProgressCircularity", fCircN);
00117    fCircProg->SetCircular(fCircN);
00118 
00119    // Check if we need to start the progress timer (multi-packetizers do not want
00120    // timers from the packetizers they control ...)
00121    TString startProgress("yes");
00122    TProof::GetParameter(input, "PROOF_StartProgressTimer", startProgress);
00123 
00124    // Init progress timer, if requested
00125    // The timer is destroyed (and therefore stopped) by the relevant TPacketizer implementation
00126    // in GetNextPacket when end of work is detected.
00127    fProgress = 0;
00128    if (startProgress == "yes") {
00129       Long_t period = 500;
00130       TProof::GetParameter(input, "PROOF_ProgressPeriod", period);
00131       fProgress = new TTimer;
00132       fProgress->SetObject(this);
00133       fProgress->Start(period, kFALSE);
00134    }
00135 
00136    // Init ntple to store active workers vs processing time
00137    TString saveProgressPerf("no");
00138    TProof::GetParameter(input, "PROOF_SaveProgressPerf", saveProgressPerf);
00139    fProgressPerf = 0;
00140    if (fProgress && saveProgressPerf == "yes")
00141       fProgressPerf = new TNtuple("PROOF_ProgressPerfNtuple",
00142                                   "{Active workers, evt rate, MB read} vs processing time", "tm:aw:er:mb:ns");
00143    fProcTimeLast = -1.;
00144    fActWrksLast = -1;
00145    fEvtRateLast = -1.;
00146    fMBsReadLast = -1.;
00147    fEffSessLast = -1.;
00148    fAWLastFill = kFALSE;
00149    fReportPeriod = -1.;
00150 
00151    // Whether to send estimated values for the progress info
00152    TString estopt;
00153    TProof::GetParameter(input, "PROOF_RateEstimation", estopt);
00154    if (estopt.IsNull()) {
00155       // Parse option from the env
00156       estopt = gEnv->GetValue("Proof.RateEstimation", "");
00157    }
00158    fUseEstOpt = kEstOff;
00159    if (estopt == "current")
00160       fUseEstOpt = kEstCurrent;
00161    else if (estopt == "average")
00162       fUseEstOpt = kEstAverage;
00163 }
00164 
00165 //______________________________________________________________________________
00166 TVirtualPacketizer::~TVirtualPacketizer()
00167 {
00168    // Destructor.
00169 
00170    SafeDelete(fCircProg);
00171    SafeDelete(fProgress);
00172    SafeDelete(fFailedPackets);
00173    SafeDelete(fConfigParams);
00174    SafeDelete(fProgressPerf);
00175    fProgressStatus = 0; // belongs to the player
00176 }
00177 
00178 //______________________________________________________________________________
00179 Long64_t TVirtualPacketizer::GetEntries(Bool_t tree, TDSetElement *e)
00180 {
00181    // Get entries.
00182 
00183    Long64_t entries;
00184    TFile *file = TFile::Open(e->GetFileName());
00185 
00186    if ( file->IsZombie() ) {
00187       Error("GetEntries","Cannot open file: %s (%s)",
00188             e->GetFileName(), strerror(file->GetErrno()) );
00189       return -1;
00190    }
00191 
00192    TDirectory *dirsave = gDirectory;
00193    if ( ! file->cd(e->GetDirectory()) ) {
00194       Error("GetEntries","Cannot cd to: %s", e->GetDirectory() );
00195       delete file;
00196       return -1;
00197    }
00198    TDirectory *dir = gDirectory;
00199    dirsave->cd();
00200 
00201    if ( tree ) {
00202       TKey *key = dir->GetKey(e->GetObjName());
00203       if ( key == 0 ) {
00204          Error("GetEntries","Cannot find tree \"%s\" in %s",
00205                e->GetObjName(), e->GetFileName() );
00206          delete file;
00207          return -1;
00208       }
00209       TTree *t = (TTree *) key->ReadObj();
00210       if ( t == 0 ) {
00211          // Error always reported?
00212          delete file;
00213          return -1;
00214       }
00215       entries = (Long64_t) t->GetEntries();
00216       delete t;
00217 
00218    } else {
00219       TList *keys = dir->GetListOfKeys();
00220       entries = keys->GetSize();
00221    }
00222 
00223    delete file;
00224 
00225    return entries;
00226 }
00227 
00228 //______________________________________________________________________________
00229 TDSetElement *TVirtualPacketizer::GetNextPacket(TSlave *, TMessage *)
00230 {
00231    // Get next packet.
00232 
00233    AbstractMethod("GetNextPacket");
00234    return 0;
00235 }
00236 
00237 //______________________________________________________________________________
00238 void TVirtualPacketizer::StopProcess(Bool_t /*abort*/)
00239 {
00240    // Stop process.
00241 
00242    fStop = kTRUE;
00243 }
00244 
00245 //______________________________________________________________________________
00246 TDSetElement* TVirtualPacketizer::CreateNewPacket(TDSetElement* base,
00247                                                   Long64_t first, Long64_t num)
00248 {
00249    // Creates a new TDSetElement from from base packet starting from
00250    // the first entry with num entries.
00251    // The function returns a new created objects which have to be deleted.
00252 
00253    TDSetElement* elem = new TDSetElement(base->GetFileName(), base->GetObjName(),
00254                                          base->GetDirectory(), first, num,
00255                                          0, fDataSet.Data());
00256 
00257    // create TDSetElements for all the friends of elem.
00258    TList *friends = base->GetListOfFriends();
00259    if (friends) {
00260       TIter nxf(friends);
00261       TDSetElement *fe = 0;
00262       while ((fe = (TDSetElement *) nxf())) {
00263          TDSetElement *xfe = new TDSetElement(fe->GetFileName(), fe->GetObjName(),
00264                                               fe->GetDirectory(), first, num);
00265          // The alias, if any, is in the element name options ('friend_alias=<alias>|')
00266          elem->AddFriend(xfe, 0);
00267       }
00268    }
00269 
00270    return elem;
00271 }
00272 
00273 //______________________________________________________________________________
00274 Bool_t TVirtualPacketizer::HandleTimer(TTimer *)
00275 {
00276    // Send progress message to client.
00277 
00278    PDB(kPacketizer,2)
00279       Info("HandleTimer", "fProgress: %p, isDone: %d",
00280                           fProgress, TestBit(TVirtualPacketizer::kIsDone));
00281 
00282    if (fProgress == 0 || TestBit(TVirtualPacketizer::kIsDone))
00283       return kFALSE; // timer stopped already or reports completed
00284 
00285    // Prepare progress info
00286    TTime tnow = gSystem->Now();
00287    Float_t now = Long64_t(tnow - fStartTime) / (Float_t)1000.;
00288    Long64_t estent = GetEntriesProcessed();
00289    Long64_t estmb = GetBytesRead();
00290    Long64_t estrc = GetReadCalls();
00291 
00292    // Times and counters
00293    Float_t evtrti = -1., mbrti = -1.;
00294    if (TestBit(TVirtualPacketizer::kIsInitializing)) {
00295       // Initialization
00296       fInitTime = now;
00297    } else {
00298       // Fill the reference as first
00299       if (fCircProg->GetEntries() <= 0) {
00300          fCircProg->Fill((Double_t)0., 0., 0., 0., 0.);
00301       }
00302       // Time between updates
00303       fTimeUpdt = now - fProcTime;
00304       // Update proc time
00305       fProcTime = now - fInitTime;
00306       // Get the last entry
00307       Double_t *ar = fCircProg->GetArgs();
00308       fCircProg->GetEntry(fCircProg->GetEntries()-1);
00309       // The current rate
00310       Bool_t all = kTRUE;
00311       evtrti = GetCurrentRate(all);
00312       Double_t xall = (all) ? 1. : 0.;
00313       GetEstEntriesProcessed(0, estent, estmb, estrc);
00314       // Fill entry
00315       Double_t evts = (Double_t) estent;
00316       Double_t mbs = (estmb > 0) ? estmb / TMath::Power(2.,20.) : 0.; //--> MB
00317       Double_t rcs = (Double_t) estrc;
00318       fCircProg->Fill((Double_t)fProcTime, evts, mbs, rcs, xall);
00319       fCircProg->GetEntry(fCircProg->GetEntries()-2);
00320       if (all) {
00321          Double_t dt = (Double_t)fProcTime - ar[0];
00322          Long64_t de = (evts > ar[1]) ? (Long64_t) (evts - ar[1]) : 0;
00323          Long64_t db = (mbs > ar[2]) ? (Long64_t) ((mbs - ar[2])*TMath::Power(2.,20.)) : 0;
00324          if (gPerfStats)
00325             gPerfStats->RateEvent((Double_t)fProcTime, dt, de, db);
00326          // Get the last to spot the cache readings
00327          Double_t rc = (Double_t)estrc - ar[3];
00328          mbrti = (rc > 0 && mbs > ar[2]) ? (Float_t) (mbs - ar[2]) / rc : 0. ;
00329       }
00330       // Final report only once (to correctly determine the proc time)
00331       if (fTotalEntries > 0 && GetEntriesProcessed() >= fTotalEntries)
00332          SetBit(TVirtualPacketizer::kIsDone);
00333       PDB(kPacketizer,2)
00334          Info("HandleTimer", "ent:%lld, bytes:%lld, proct:%f, evtrti:%f, mbrti:%f (%f,%f)",
00335                              estent, estmb, fProcTime, evtrti, mbrti, mbs, ar[2]);
00336    }
00337 
00338    if (gProofServ) {
00339       // Message to be sent over
00340       TMessage m(kPROOF_PROGRESS);
00341       if (gProofServ->GetProtocol() > 25) {
00342          Int_t actw = GetActiveWorkers();
00343          Int_t acts = gProofServ->GetActSessions();
00344          Float_t effs = gProofServ->GetEffSessions();
00345          if (fProgressPerf && estent > 0) {
00346             // Estimated query time
00347             if (fProcTime > 0.) {
00348                fReportPeriod = (Float_t) fTotalEntries / (Double_t) estent * fProcTime / 100.;
00349                if (fReportPeriod > 0. && fReportPeriod < 5.) fReportPeriod = 5.;
00350             }
00351            
00352             if (fProgressPerf->GetEntries() <= 0) {
00353                // Fill the first entry
00354                fProgressPerf->Fill(fProcTime, (Float_t)actw, -1., -1., -1.);
00355             } else {
00356                // Fill only if changed since last entry filled
00357                Float_t *far = fProgressPerf->GetArgs();
00358                fProgressPerf->GetEntry(fProgressPerf->GetEntries()-1);
00359                Bool_t doReport = (fReportPeriod > 0. &&
00360                                  (fProcTime - far[0]) >= fReportPeriod) ? kTRUE : kFALSE;
00361                Float_t mbsread = estmb / 1024. / 1024.;
00362                if (TMath::Abs((Float_t)actw - far[1]) > 0.1) {
00363                   if (fAWLastFill)
00364                      fProgressPerf->Fill(fProcTimeLast, (Float_t)fActWrksLast,
00365                                          fEvtRateLast, fMBsReadLast, fEffSessLast);
00366                   fProgressPerf->Fill(fProcTime, (Float_t)actw, evtrti, mbsread, effs);
00367                   fAWLastFill = kFALSE;
00368                } else if (doReport) {
00369                   fProgressPerf->Fill(fProcTime, (Float_t)actw, evtrti, mbsread, effs);
00370                   fAWLastFill = kFALSE;
00371                } else {
00372                   fAWLastFill = kTRUE;
00373                }
00374                fProcTimeLast = fProcTime;
00375                fActWrksLast = actw;
00376                fEvtRateLast = evtrti;
00377                fMBsReadLast = mbsread;
00378                fEffSessLast = effs;
00379             }
00380          }
00381          // Fill the message now
00382          TProofProgressInfo pi(fTotalEntries, estent, estmb, fInitTime,
00383                                fProcTime, evtrti, mbrti, actw, acts, effs);
00384          m << &pi;
00385       } else if (gProofServ->GetProtocol() > 11) {
00386          // Fill the message now
00387          m << fTotalEntries << estent << estmb << fInitTime << fProcTime
00388            << evtrti << mbrti;
00389       } else {
00390          // Old format
00391          m << fTotalEntries << GetEntriesProcessed();
00392       }
00393       // send message to client;
00394       gProofServ->GetSocket()->Send(m);
00395 
00396    } else {
00397       if (gProof && gProof->GetPlayer()) {
00398          // Log locally
00399          gProof->GetPlayer()->Progress(fTotalEntries, estent, estmb,
00400                                        fInitTime, fProcTime, evtrti, mbrti);
00401       }
00402    }
00403 
00404    // Final report only once (to correctly determine the proc time)
00405    if (fTotalEntries > 0 && GetEntriesProcessed() >= fTotalEntries)
00406       SetBit(TVirtualPacketizer::kIsDone);
00407 
00408    return kFALSE; // ignored?
00409 }
00410 
00411 //______________________________________________________________________________
00412 void TVirtualPacketizer::SetInitTime()
00413 {
00414    // Set the initialization time
00415 
00416    if (TestBit(TVirtualPacketizer::kIsInitializing)) {
00417       fInitTime = Long64_t(gSystem->Now() - fStartTime) / (Float_t)1000.;
00418       ResetBit(TVirtualPacketizer::kIsInitializing);
00419       PDB(kPacketizer,2)
00420          Info("SetInitTime","fInitTime set to %f s", fInitTime);
00421    }
00422 }

Generated on Tue Jul 5 14:52:20 2011 for ROOT_528-00b_version by  doxygen 1.5.1