00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012 #ifndef ROOT_TVirtualPacketizer
00013 #define ROOT_TVirtualPacketizer
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 #ifndef ROOT_TObject
00037 #include "TObject.h"
00038 #endif
00039 #ifndef ROOT_TSlave
00040 #include "TSlave.h"
00041 #endif
00042 #ifndef ROOT_TProofProgressStatus
00043 #include "TProofProgressStatus.h"
00044 #endif
00045 #ifndef ROOT_TTime
00046 #include "TTime.h"
00047 #endif
00048
00049
// Forward declarations of project types named in this header.
class TDSet;
class TDSetElement;
class TList;
class TMap;
class TMessage;
class TNtuple;
class TNtupleD;
class TProofProgressInfo;
class TSlave;
class TTimer;   // used by TVirtualPacketizer::fProgress and HandleTimer();
                // declared here so the header does not depend on another
                // header happening to declare it
00059
00060
00061 class TVirtualPacketizer : public TObject {
00062
00063 public:
00064 class TVirtualSlaveStat;
00065
00066 protected:
00067 enum EUseEstOpt {
00068 kEstOff = 0,
00069 kEstCurrent = 1,
00070 kEstAverage = 2
00071 };
00072
00073
00074 Double_t fMinPacketTime;
00075 Double_t fMaxPacketTime;
00076 TList *fConfigParams;
00077
00078 TMap *fSlaveStats;
00079
00080 TProofProgressStatus *fProgressStatus;
00081 TTimer *fProgress;
00082
00083 Long64_t fTotalEntries;
00084
00085 TList *fFailedPackets;
00086
00087
00088 TTime fStartTime;
00089 Float_t fInitTime;
00090 Float_t fProcTime;
00091 Float_t fTimeUpdt;
00092 TNtupleD *fCircProg;
00093
00094 Long_t fCircN;
00095
00096 TNtuple *fProgressPerf;
00097 Float_t fProcTimeLast;
00098 Int_t fActWrksLast;
00099 Float_t fEvtRateLast;
00100 Float_t fMBsReadLast;
00101 Float_t fEffSessLast;
00102 Bool_t fAWLastFill;
00103 Float_t fReportPeriod;
00104
00105 EUseEstOpt fUseEstOpt;
00106
00107 Bool_t fValid;
00108 Bool_t fStop;
00109
00110 TString fDataSet;
00111
00112 TVirtualPacketizer(TList *input, TProofProgressStatus *st = 0);
00113 TVirtualPacketizer(const TVirtualPacketizer &);
00114 void operator=(const TVirtualPacketizer &);
00115
00116 TDSetElement *CreateNewPacket(TDSetElement* base, Long64_t first, Long64_t num);
00117 Long64_t GetEntries(Bool_t tree, TDSetElement *e);
00118 virtual Bool_t HandleTimer(TTimer *timer);
00119
00120 public:
00121 enum EStatusBits { kIsInitializing = BIT(16), kIsDone = BIT(17), kIsTree = BIT(18) };
00122 virtual ~TVirtualPacketizer();
00123
00124 Bool_t IsValid() const { return fValid; }
00125 Long64_t GetEntriesProcessed() const { return (fProgressStatus? fProgressStatus->GetEntries() : 0); }
00126 virtual Int_t GetEstEntriesProcessed(Float_t, Long64_t &ent, Long64_t &bytes, Long64_t &calls)
00127 { ent = GetEntriesProcessed(); bytes = GetBytesRead(); calls = GetReadCalls(); return 0; }
00128 virtual Float_t GetCurrentRate(Bool_t &all) { all = kTRUE; return (fProgressStatus? fProgressStatus->GetCurrentRate() : 0.); }
00129 Long64_t GetTotalEntries() const { return fTotalEntries; }
00130 virtual TDSetElement *GetNextPacket(TSlave *sl, TMessage *r);
00131 virtual void SetInitTime();
00132 virtual void StopProcess(Bool_t abort);
00133 TList *GetFailedPackets() { return fFailedPackets; }
00134 void SetFailedPackets(TList *list) { fFailedPackets = list; }
00135
00136 Long64_t GetBytesRead() const { return (fProgressStatus? fProgressStatus->GetBytesRead() : 0); }
00137 Long64_t GetReadCalls() const { return (fProgressStatus? fProgressStatus->GetReadCalls() : 0); }
00138 Double_t GetCumProcTime() const { return fProgressStatus->GetProcTime(); }
00139 Float_t GetInitTime() const { return fInitTime; }
00140 Float_t GetProcTime() const { return fProcTime; }
00141 TNtuple *GetProgressPerf(Bool_t steal = kFALSE) { if (steal) { TNtuple *n = fProgressPerf; fProgressPerf = 0; return n;
00142 } else { return fProgressPerf;} }
00143 TList *GetConfigParams(Bool_t steal = kFALSE) { if (steal) { TList *l = fConfigParams; fConfigParams = 0; return l;
00144 } else { return fConfigParams;} }
00145 virtual void MarkBad(TSlave * , TProofProgressStatus * , TList ** ) { return; }
00146 virtual Int_t AddProcessed(TSlave * , TProofProgressStatus * ,
00147 Double_t , TList ** ) { return 0; }
00148 TProofProgressStatus *GetStatus() { return fProgressStatus; }
00149 void SetProgressStatus(TProofProgressStatus *st) { fProgressStatus = st; }
00150 void SetTotalEntries(Long64_t ent) { fTotalEntries = ent; }
00151
00152 TMap *GetSlaveStats() const { return fSlaveStats; }
00153
00154 virtual Int_t GetActiveWorkers() { return -1; }
00155
00156 ClassDef(TVirtualPacketizer,0)
00157 };
00158
00159
00160
00161 class TVirtualPacketizer::TVirtualSlaveStat : public TObject {
00162
00163 friend class TPacketizerAdaptive;
00164 friend class TPacketizer;
00165
00166 protected:
00167 TString fWrkFQDN;
00168 TSlave *fSlave;
00169 TProofProgressStatus *fStatus;
00170
00171 public:
00172 const char *GetName() const { return fWrkFQDN.Data(); }
00173 const char *GetOrdinal() const { return fSlave->GetOrdinal(); }
00174 Long64_t GetEntriesProcessed() const { return fStatus?fStatus->GetEntries():-1; }
00175 Double_t GetProcTime() const { return fStatus?fStatus->GetProcTime():-1; }
00176 Float_t GetAvgRate() { return fStatus->GetRate(); }
00177 TProofProgressStatus *GetProgressStatus() { return fStatus; }
00178 virtual TProofProgressStatus *AddProcessed(TProofProgressStatus *st) = 0;
00179 };
00180
00181 #endif