00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012 #ifndef ROOT_TProofPlayer
00013 #define ROOT_TProofPlayer
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033 #ifndef ROOT_TVirtualProofPlayer
00034 #include "TVirtualProofPlayer.h"
00035 #endif
00036 #ifndef ROOT_TArrayL64
00037 #include "TArrayL64.h"
00038 #endif
00039 #ifndef ROOT_TArrayF
00040 #include "TArrayF.h"
00041 #endif
00042 #ifndef ROOT_TArrayI
00043 #include "TArrayI.h"
00044 #endif
00045 #ifndef ROOT_TList
00046 #include "TList.h"
00047 #endif
00048 #ifndef ROOT_TSystem
00049 #include "TSystem.h"
00050 #endif
00051 #ifndef ROOT_TQueryResult
00052 #include "TQueryResult.h"
00053 #endif
00054 #ifndef ROOT_TProofProgressStatus
00055 #include "TProofProgressStatus.h"
00056 #endif
00057 #ifndef ROOT_TError
00058 #include "TError.h"
00059 #endif
00060
00061 class TSelector;
00062 class TSocket;
00063 class TVirtualPacketizer;
00064 class TSlave;
00065 class TEventIter;
00066 class TProofStats;
00067 class TMutex;
00068 class TStatus;
00069 class TTimer;
00070 class THashList;
00071
00072
00073
00074
00075 class TProofPlayer : public TVirtualProofPlayer {
00076
00077 private:
00078 TList *fAutoBins;
00079
00080 protected:
00081 TList *fInput;
00082 TList *fOutput;
00083 TSelector *fSelector;
00084 TClass *fSelectorClass;
00085 TTimer *fFeedbackTimer;
00086 Long_t fFeedbackPeriod;
00087 TEventIter *fEvIter;
00088 TStatus *fSelStatus;
00089 EExitStatus fExitStatus;
00090 Long64_t fTotalEvents;
00091 TProofProgressStatus *fProgressStatus;
00092
00093 TList *fQueryResults;
00094 TQueryResult *fQuery;
00095 TQueryResult *fPreviousQuery;
00096 Int_t fDrawQueries;
00097 Int_t fMaxDrawQueries;
00098
00099 TTimer *fStopTimer;
00100 TMutex *fStopTimerMtx;
00101
00102 TTimer *fDispatchTimer;
00103
00104 static THashList *fgDrawInputPars;
00105
00106 void *GetSender() { return this; }
00107
00108 virtual Int_t DrawCanvas(TObject *obj);
00109
00110 virtual void SetupFeedback();
00111
00112 virtual void MergeOutput();
00113
00114 public:
00115 virtual void StopFeedback();
00116
00117 protected:
00118 class TCleanup {
00119 private:
00120 TProofPlayer *fPlayer;
00121 public:
00122 TCleanup(TProofPlayer *p) : fPlayer(p) { }
00123 ~TCleanup() { fPlayer->StopFeedback(); }
00124 };
00125
00126 Bool_t CheckMemUsage(Long64_t &mfreq, Bool_t &w80r, Bool_t &w80v, TString &wmsg);
00127
00128 void MapOutputListToDataMembers() const;
00129
00130 public:
00131 enum EStatusBits { kDispatchOneEvent = BIT(15), kIsProcessing = BIT(16) };
00132
00133 TProofPlayer(TProof *proof = 0);
00134 virtual ~TProofPlayer();
00135
00136 Long64_t Process(TDSet *set,
00137 const char *selector, Option_t *option = "",
00138 Long64_t nentries = -1, Long64_t firstentry = 0);
00139 TVirtualPacketizer *GetPacketizer() const { return 0; }
00140 Long64_t Finalize(Bool_t force = kFALSE, Bool_t sync = kFALSE);
00141 Long64_t Finalize(TQueryResult *qr);
00142 Long64_t DrawSelect(TDSet *set, const char *varexp,
00143 const char *selection, Option_t *option = "",
00144 Long64_t nentries = -1, Long64_t firstentry = 0);
00145 Int_t GetDrawArgs(const char *var, const char *sel, Option_t *opt,
00146 TString &selector, TString &objname);
00147 void HandleGetTreeHeader(TMessage *mess);
00148 void HandleRecvHisto(TMessage *mess);
00149 void FeedBackCanvas(const char *name, Bool_t create);
00150
00151 void StopProcess(Bool_t abort, Int_t timeout = -1);
00152 void AddInput(TObject *inp);
00153 void ClearInput();
00154 TObject *GetOutput(const char *name) const;
00155 TList *GetOutputList() const;
00156 TList *GetInputList() const { return fInput; }
00157 TList *GetListOfResults() const { return fQueryResults; }
00158 void AddQueryResult(TQueryResult *q);
00159 TQueryResult *GetCurrentQuery() const { return fQuery; }
00160 TQueryResult *GetQueryResult(const char *ref);
00161 void RemoveQueryResult(const char *ref);
00162 void SetCurrentQuery(TQueryResult *q);
00163 void SetMaxDrawQueries(Int_t max) { fMaxDrawQueries = max; }
00164 void RestorePreviousQuery() { fQuery = fPreviousQuery; }
00165 Int_t AddOutputObject(TObject *obj);
00166 void AddOutput(TList *out);
00167 void StoreOutput(TList *out);
00168 void StoreFeedback(TObject *slave, TList *out);
00169 void Progress(Long64_t total, Long64_t processed);
00170 void Progress(TSlave *, Long64_t total, Long64_t processed)
00171 { Progress(total, processed); }
00172 void Progress(Long64_t total, Long64_t processed, Long64_t bytesread,
00173 Float_t initTime, Float_t procTime,
00174 Float_t evtrti, Float_t mbrti);
00175 void Progress(TSlave *, Long64_t total, Long64_t processed, Long64_t bytesread,
00176 Float_t initTime, Float_t procTime,
00177 Float_t evtrti, Float_t mbrti)
00178 { Progress(total, processed, bytesread, initTime, procTime,
00179 evtrti, mbrti); }
00180 void Progress(TProofProgressInfo *pi);
00181 void Progress(TSlave *, TProofProgressInfo *pi) { Progress(pi); }
00182 void Feedback(TList *objs);
00183
00184 TDrawFeedback *CreateDrawFeedback(TProof *p);
00185 void SetDrawFeedbackOption(TDrawFeedback *f, Option_t *opt);
00186 void DeleteDrawFeedback(TDrawFeedback *f);
00187
00188 TDSetElement *GetNextPacket(TSlave *slave, TMessage *r);
00189
00190 Int_t ReinitSelector(TQueryResult *qr);
00191
00192 void UpdateAutoBin(const char *name,
00193 Double_t& xmin, Double_t& xmax,
00194 Double_t& ymin, Double_t& ymax,
00195 Double_t& zmin, Double_t& zmax);
00196
00197 Bool_t IsClient() const { return kFALSE; }
00198
00199 EExitStatus GetExitStatus() const { return fExitStatus; }
00200 Long64_t GetEventsProcessed() const { return fProgressStatus->GetEntries(); }
00201 void AddEventsProcessed(Long64_t ev) { fProgressStatus->IncEntries(ev); }
00202
00203 void SetDispatchTimer(Bool_t on = kTRUE);
00204 void SetStopTimer(Bool_t on = kTRUE,
00205 Bool_t abort = kFALSE, Int_t timeout = 0);
00206
00207 virtual void SetInitTime() { }
00208 Long64_t GetCacheSize();
00209 Int_t GetLearnEntries();
00210
00211 void SetProcessing(Bool_t on = kTRUE);
00212 TProofProgressStatus *GetProgressStatus() const { return fProgressStatus; }
00213
00214 ClassDef(TProofPlayer,0)
00215 };
00216
00217
00218
00219
00220 class TProofPlayerLocal : public TProofPlayer {
00221
00222 private:
00223 Bool_t fIsClient;
00224
00225 protected:
00226 void SetupFeedback() { }
00227 void StopFeedback() { }
00228
00229 public:
00230 TProofPlayerLocal(Bool_t client = kTRUE) : fIsClient(client) { }
00231 virtual ~TProofPlayerLocal() { }
00232
00233 Bool_t IsClient() const { return fIsClient; }
00234
00235 ClassDef(TProofPlayerLocal,0)
00236 };
00237
00238
00239
00240
00241
00242
00243
00244
00245
00246
00247
00248
00249
00250
00251
00252
00253
00254
00255
00256 class TProofPlayerRemote : public TProofPlayer {
00257
00258 protected:
00259 TProof *fProof;
00260 TList *fOutputLists;
00261 TList *fFeedback;
00262 TList *fFeedbackLists;
00263 TVirtualPacketizer *fPacketizer;
00264 Bool_t fMergeFiles;
00265 TDSet *fDSet;
00266 ErrorHandlerFunc_t fErrorHandler;
00267
00268 virtual Bool_t HandleTimer(TTimer *timer);
00269 Int_t InitPacketizer(TDSet *dset, Long64_t nentries,
00270 Long64_t first, const char *defpackunit,
00271 const char *defpackdata);
00272 TList *MergeFeedback();
00273 Bool_t MergeOutputFiles();
00274 void NotifyMemory(TObject *obj);
00275 void SetLastMergingMsg(TObject *obj);
00276 virtual Bool_t SendSelector(const char *selector_file);
00277 TProof *GetProof() const { return fProof; }
00278 void SetupFeedback();
00279 void StopFeedback();
00280 void SetSelectorDataMembersFromOutputList();
00281
00282 public:
00283 TProofPlayerRemote(TProof *proof = 0) : fProof(proof), fOutputLists(0), fFeedback(0),
00284 fFeedbackLists(0), fPacketizer(0),
00285 fMergeFiles(kFALSE), fDSet(0), fErrorHandler(0)
00286 { fProgressStatus = new TProofProgressStatus(); }
00287 virtual ~TProofPlayerRemote();
00288 virtual Long64_t Process(TDSet *set, const char *selector,
00289 Option_t *option = "", Long64_t nentries = -1,
00290 Long64_t firstentry = 0);
00291 virtual Long64_t Finalize(Bool_t force = kFALSE, Bool_t sync = kFALSE);
00292 virtual Long64_t Finalize(TQueryResult *qr);
00293 Long64_t DrawSelect(TDSet *set, const char *varexp,
00294 const char *selection, Option_t *option = "",
00295 Long64_t nentries = -1, Long64_t firstentry = 0);
00296
00297 void RedirectOutput(Bool_t on = kTRUE);
00298 void StopProcess(Bool_t abort, Int_t timeout = -1);
00299 void StoreOutput(TList *out);
00300 virtual void StoreFeedback(TObject *slave, TList *out);
00301 Int_t Incorporate(TObject *obj, TList *out, Bool_t &merged);
00302 TObject *HandleHistogram(TObject *obj);
00303 Int_t AddOutputObject(TObject *obj);
00304 void AddOutput(TList *out);
00305 virtual void MergeOutput();
00306 void Progress(Long64_t total, Long64_t processed);
00307 void Progress(TSlave*, Long64_t total, Long64_t processed)
00308 { Progress(total, processed); }
00309 void Progress(Long64_t total, Long64_t processed, Long64_t bytesread,
00310 Float_t initTime, Float_t procTime,
00311 Float_t evtrti, Float_t mbrti);
00312 void Progress(TSlave *, Long64_t total, Long64_t processed, Long64_t bytesread,
00313 Float_t initTime, Float_t procTime,
00314 Float_t evtrti, Float_t mbrti)
00315 { Progress(total, processed, bytesread, initTime, procTime,
00316 evtrti, mbrti); }
00317 void Progress(TProofProgressInfo *pi);
00318 void Progress(TSlave *, TProofProgressInfo *pi) { Progress(pi); }
00319 void Feedback(TList *objs);
00320 TDSetElement *GetNextPacket(TSlave *slave, TMessage *r);
00321 TVirtualPacketizer *GetPacketizer() const { return fPacketizer; }
00322
00323 Bool_t IsClient() const;
00324
00325 void SetInitTime();
00326
00327 ClassDef(TProofPlayerRemote,0)
00328 };
00329
00330
00331
00332
00333 class TProofPlayerSlave : public TProofPlayer {
00334
00335 private:
00336 TSocket *fSocket;
00337 TList *fFeedback;
00338
00339 Bool_t HandleTimer(TTimer *timer);
00340
00341 protected:
00342 void SetupFeedback();
00343 void StopFeedback();
00344
00345 public:
00346 TProofPlayerSlave(TSocket *socket = 0) : fSocket(socket), fFeedback(0) { }
00347
00348 void HandleGetTreeHeader(TMessage *mess);
00349
00350 ClassDef(TProofPlayerSlave,0)
00351 };
00352
00353
00354
00355
00356 class TProofPlayerSuperMaster : public TProofPlayerRemote {
00357
00358 private:
00359 TArrayL64 fSlaveProgress;
00360 TArrayL64 fSlaveTotals;
00361 TArrayL64 fSlaveBytesRead;
00362 TArrayF fSlaveInitTime;
00363 TArrayF fSlaveProcTime;
00364 TArrayF fSlaveEvtRti;
00365 TArrayF fSlaveMBRti;
00366 TArrayI fSlaveActW;
00367 TArrayI fSlaveTotS;
00368 TArrayF fSlaveEffS;
00369 TList fSlaves;
00370 Bool_t fReturnFeedback;
00371
00372 protected:
00373 Bool_t HandleTimer(TTimer *timer);
00374 void SetupFeedback();
00375
00376 public:
00377 TProofPlayerSuperMaster(TProof *proof = 0) :
00378 TProofPlayerRemote(proof), fReturnFeedback(kFALSE) { }
00379 virtual ~TProofPlayerSuperMaster() { }
00380
00381 Long64_t Process(TDSet *set, const char *selector,
00382 Option_t *option = "", Long64_t nentries = -1,
00383 Long64_t firstentry = 0);
00384 void Progress(Long64_t total, Long64_t processed)
00385 { TProofPlayerRemote::Progress(total, processed); }
00386 void Progress(Long64_t total, Long64_t processed, Long64_t bytesread,
00387 Float_t initTime, Float_t procTime,
00388 Float_t evtrti, Float_t mbrti)
00389 { TProofPlayerRemote::Progress(total, processed, bytesread,
00390 initTime, procTime, evtrti, mbrti); }
00391 void Progress(TProofProgressInfo *pi) { TProofPlayerRemote::Progress(pi); }
00392 void Progress(TSlave *sl, Long64_t total, Long64_t processed);
00393 void Progress(TSlave *sl, Long64_t total, Long64_t processed, Long64_t bytesread,
00394 Float_t initTime, Float_t procTime,
00395 Float_t evtrti, Float_t mbrti);
00396 void Progress(TSlave *sl, TProofProgressInfo *pi);
00397
00398 ClassDef(TProofPlayerSuperMaster,0)
00399 };
00400
00401 #endif