TVirtualPacketizer.h

Go to the documentation of this file.
00001 // @(#)root/proofplayer:$Id: TVirtualPacketizer.h 37396 2010-12-08 13:12:00Z rdm $
00002 // Author: Maarten Ballintijn    9/7/2002
00003 
00004 /*************************************************************************
00005  * Copyright (C) 1995-2002, Rene Brun and Fons Rademakers.               *
00006  * All rights reserved.                                                  *
00007  *                                                                       *
00008  * For the licensing terms see $ROOTSYS/LICENSE.                         *
00009  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
00010  *************************************************************************/
00011 
00012 #ifndef ROOT_TVirtualPacketizer
00013 #define ROOT_TVirtualPacketizer
00014 
00015 //////////////////////////////////////////////////////////////////////////
00016 //                                                                      //
00017 // TVirtualPacketizer                                                   //
00018 //                                                                      //
00019 // Packetizer is a load balancing object created for each query.        //
00020 // It generates packets to be processed on PROOF worker servers.        //
00021 // A packet is an event range (begin entry and number of entries) or    //
00022 // object range (first object and number of objects) in a TTree         //
00023 // (entries) or a directory (objects) in a file.                        //
00024 // Packets are generated taking into account the performance of the     //
00025 // remote machine, the time it took to process a previous packet on     //
00026 // the remote machine, the locality of the database files, etc.         //
00027 //                                                                      //
00028 // TVirtualPacketizer includes common parts of PROOF packetizers.       //
00029 // Look in subclasses for details.                                      //
00030 // The default packetizer is TPacketizerAdaptive.                       //
00031 // To use an alternative one, for instance TPacketizer, call:           //
00032 // proof->SetParameter("PROOF_Packetizer", "TPacketizer");              //
00033 //                                                                      //
00034 //////////////////////////////////////////////////////////////////////////
00035 
00036 #ifndef ROOT_TObject
00037 #include "TObject.h"
00038 #endif
00039 #ifndef ROOT_TSlave
00040 #include "TSlave.h"
00041 #endif
00042 #ifndef ROOT_TProofProgressStatus
00043 #include "TProofProgressStatus.h"
00044 #endif
00045 #ifndef ROOT_TTime
00046 #include "TTime.h"
00047 #endif
00048 
00049 
00050 class TDSet;              // NOTE(review): no use visible in this header -- confirm it is still needed
00051 class TDSetElement;       // returned by CreateNewPacket() and GetNextPacket()
00052 class TList;              // fConfigParams, fFailedPackets, constructor input list
00053 class TMap;               // fSlaveStats
00054 class TMessage;           // argument of GetNextPacket()
00055 class TNtuple;            // fProgressPerf
00056 class TNtupleD;           // fCircProg
00057 class TProofProgressInfo; // NOTE(review): no use visible in this header -- confirm it is still needed
00058 class TSlave;             // TSlave.h is also included above; this redeclaration is harmless
00059 
00060 
00061 class TVirtualPacketizer : public TObject {
00062    // Common base of the PROOF packetizers; constructible only from subclasses (protected constructors).
00063 public:              // public because of Sun CC bug
00064    class TVirtualSlaveStat;
00065 
00066 protected:
00067    enum EUseEstOpt {        // Option for usage of estimated values
00068       kEstOff     = 0,
00069       kEstCurrent = 1,
00070       kEstAverage = 2
00071    };
00072 
00073    // General configuration parameters
00074    Double_t  fMinPacketTime; // minimum packet time
00075    Double_t  fMaxPacketTime; // maximum packet time
00076    TList    *fConfigParams;  // List of configuration parameters
00077 
00078    TMap     *fSlaveStats;   // slave status, keyed by corresponding TSlave
00079 
00080    TProofProgressStatus *fProgressStatus; // pointer to status in the player (accessors treat it as possibly null)
00081    TTimer   *fProgress;     // progress updates timer
00082 
00083    Long64_t  fTotalEntries; // total number of entries to be distributed;
00084                             // not used in the progressive packetizer
00085    TList    *fFailedPackets;// a list of packets that failed while processing
00086 
00087    // Members for progress info
00088    TTime     fStartTime;    // time offset
00089    Float_t   fInitTime;     // time before processing
00090    Float_t   fProcTime;     // time since start of processing
00091    Float_t   fTimeUpdt;     // time between updates
00092    TNtupleD *fCircProg;     // Keeps circular info for "instantaneous"
00093                             // rate calculations
00094    Long_t    fCircN;        // Circularity
00095 
00096    TNtuple  *fProgressPerf; // {Active workers, evt rate, MBs read} as a function of processing time
00097    Float_t   fProcTimeLast; // Time of the last measurement
00098    Int_t     fActWrksLast;  // Active workers at fProcTimeLast
00099    Float_t   fEvtRateLast;  // Evt rate at fProcTimeLast
00100    Float_t   fMBsReadLast;  // MBs read at fProcTimeLast
00101    Float_t   fEffSessLast;  // Number of effective sessions at fProcTimeLast
00102    Bool_t    fAWLastFill;   // Whether to fill the last measurement
00103    Float_t   fReportPeriod; // Time between reports if nothing changes (estimated proc time / 100)
00104 
00105    EUseEstOpt fUseEstOpt;   // Control usage of estimated values for the progress info
00106 
00107    Bool_t   fValid;           // Constructed properly?
00108    Bool_t   fStop;            // Termination of Process() requested?
00109 
00110    TString  fDataSet;         // Name of the dataset being processed (for dataset-driven runs)
00111 
00112    TVirtualPacketizer(TList *input, TProofProgressStatus *st = 0);
00113    TVirtualPacketizer(const TVirtualPacketizer &);  // no implementation, will generate
00114    void operator=(const TVirtualPacketizer &);      // error on accidental usage
00115 
00116    TDSetElement  *CreateNewPacket(TDSetElement* base, Long64_t first, Long64_t num);
00117    Long64_t       GetEntries(Bool_t tree, TDSetElement *e); // Num of entries or objects
00118    virtual Bool_t HandleTimer(TTimer *timer);
00119 
00120 public:
00121    enum EStatusBits { kIsInitializing = BIT(16), kIsDone = BIT(17), kIsTree = BIT(18) };
00122    virtual ~TVirtualPacketizer();
00123 
00124    Bool_t                  IsValid() const { return fValid; }
00125    Long64_t                GetEntriesProcessed() const { return (fProgressStatus? fProgressStatus->GetEntries() : 0); }
00126    virtual Int_t           GetEstEntriesProcessed(Float_t, Long64_t &ent, Long64_t &bytes, Long64_t &calls)
00127                            { ent = GetEntriesProcessed(); bytes = GetBytesRead(); calls = GetReadCalls(); return 0; } // base: reports the actual counters, Float_t argument unused
00128    virtual Float_t         GetCurrentRate(Bool_t &all) { all = kTRUE; return (fProgressStatus? fProgressStatus->GetCurrentRate() : 0.); }
00129    Long64_t                GetTotalEntries() const { return fTotalEntries; }
00130    virtual TDSetElement   *GetNextPacket(TSlave *sl, TMessage *r);
00131    virtual void            SetInitTime();
00132    virtual void            StopProcess(Bool_t abort);
00133    TList                  *GetFailedPackets() { return fFailedPackets; }
00134    void                    SetFailedPackets(TList *list) { fFailedPackets = list; }
00135    // The three getters below forward to fProgressStatus and yield 0 when it is null
00136    Long64_t      GetBytesRead() const { return (fProgressStatus? fProgressStatus->GetBytesRead() : 0); }
00137    Long64_t      GetReadCalls() const { return (fProgressStatus? fProgressStatus->GetReadCalls() : 0); }
00138    Double_t      GetCumProcTime() const { return (fProgressStatus? fProgressStatus->GetProcTime() : 0.); }
00139    Float_t       GetInitTime() const { return fInitTime; }
00140    Float_t       GetProcTime() const { return fProcTime; }
00141    TNtuple      *GetProgressPerf(Bool_t steal = kFALSE) { if (steal) { TNtuple *n = fProgressPerf; fProgressPerf = 0; return n;
00142                                                                 } else { return fProgressPerf;} } // steal: hand the pointer over and reset the member to 0
00143    TList        *GetConfigParams(Bool_t steal = kFALSE) { if (steal) { TList *l = fConfigParams; fConfigParams = 0; return l;
00144                                                                 } else { return fConfigParams;} } // steal: hand the pointer over and reset the member to 0
00145    virtual void  MarkBad(TSlave * /*s*/, TProofProgressStatus * /*status*/, TList ** /*missingFiles*/) { return; } // base: no-op
00146    virtual Int_t AddProcessed(TSlave * /*sl*/, TProofProgressStatus * /*st*/,
00147                     Double_t /*lat*/, TList ** /*missingFiles*/) { return 0; } // base: ignores its arguments, returns 0
00148    TProofProgressStatus *GetStatus() { return fProgressStatus; }
00149    void          SetProgressStatus(TProofProgressStatus *st) { fProgressStatus = st; }
00150    void          SetTotalEntries(Long64_t ent) { fTotalEntries = ent; }
00151 
00152    TMap         *GetSlaveStats() const { return fSlaveStats; }
00153 
00154    virtual Int_t GetActiveWorkers() { return -1; } // base implementation reports -1
00155 
00156    ClassDef(TVirtualPacketizer,0)  //Generate work packets for parallel processing
00157 };
00158 
00159 //------------------------------------------------------------------------------
00160 
00161 class TVirtualPacketizer::TVirtualSlaveStat : public TObject {
00162    // Per-worker status record; subclasses must implement AddProcessed().
00163 friend class TPacketizerAdaptive;
00164 friend class TPacketizer;
00165 
00166 protected:
00167    TString        fWrkFQDN;       // Worker FQDN
00168    TSlave        *fSlave;         // corresponding TSlave record
00169    TProofProgressStatus *fStatus; // status as of the last finished packet
00170    // Accessors below return a fallback value instead of dereferencing a null fSlave / fStatus
00171 public:
00172    const char *GetName() const { return fWrkFQDN.Data(); }
00173    const char *GetOrdinal() const { return fSlave ? fSlave->GetOrdinal() : ""; } // "" when no TSlave is attached
00174    Long64_t    GetEntriesProcessed() const { return fStatus?fStatus->GetEntries():-1; }
00175    Double_t    GetProcTime() const { return fStatus?fStatus->GetProcTime():-1; }
00176    Float_t     GetAvgRate() { return fStatus ? fStatus->GetRate() : -1; } // -1 when no status, like the two getters above
00177    TProofProgressStatus *GetProgressStatus() { return fStatus; }
00178    virtual TProofProgressStatus *AddProcessed(TProofProgressStatus *st) = 0;
00179 };
00180 
00181 #endif

Generated on Tue Jul 5 14:28:28 2011 for ROOT_528-00b_version by  doxygen 1.5.1