TProofPlayer.h

Go to the documentation of this file.
00001 // @(#)root/proofplayer:$Id: TProofPlayer.h 36592 2010-11-11 10:43:17Z ganis $
00002 // Author: Maarten Ballintijn   07/01/02
00003 
00004 /*************************************************************************
00005  * Copyright (C) 1995-2001, Rene Brun and Fons Rademakers.               *
00006  * All rights reserved.                                                  *
00007  *                                                                       *
00008  * For the licensing terms see $ROOTSYS/LICENSE.                         *
00009  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
00010  *************************************************************************/
00011 
00012 #ifndef ROOT_TProofPlayer
00013 #define ROOT_TProofPlayer
00014 
00015 
00016 //////////////////////////////////////////////////////////////////////////
00017 //                                                                      //
00018 // TProofPlayer                                                         //
00019 //                                                                      //
00020 // This internal class and its subclasses steer the processing in PROOF.//
00021 // Instances of the TProofPlayer class are created on the worker nodes  //
00022 // per session and do the processing.                                   //
00023 // Instances of its subclass TProofPlayerRemote are created for each    //
00024 // query on the master(s) and on the client. On the master(s),          //
00025 // TProofPlayerRemote coordinates processing, checks the dataset,       //
00026 // creates the packetizer and merges the results of the workers.        //
00027 // The instance on the client collects information on the input         //
00028 // (dataset and selector), it invokes the Begin() method and finalizes  //
00029 // the query by calling Terminate().                                    //
00030 //                                                                      //
00031 //////////////////////////////////////////////////////////////////////////
00032 
00033 #ifndef ROOT_TVirtualProofPlayer
00034 #include "TVirtualProofPlayer.h"
00035 #endif
00036 #ifndef ROOT_TArrayL64
00037 #include "TArrayL64.h"
00038 #endif
00039 #ifndef ROOT_TArrayF
00040 #include "TArrayF.h"
00041 #endif
00042 #ifndef ROOT_TArrayI
00043 #include "TArrayI.h"
00044 #endif
00045 #ifndef ROOT_TList
00046 #include "TList.h"
00047 #endif
00048 #ifndef ROOT_TSystem
00049 #include "TSystem.h"
00050 #endif
00051 #ifndef ROOT_TQueryResult
00052 #include "TQueryResult.h"
00053 #endif
00054 #ifndef ROOT_TProofProgressStatus
00055 #include "TProofProgressStatus.h"
00056 #endif
00057 #ifndef ROOT_TError
00058 #include "TError.h"
00059 #endif
00060 
00061 class TSelector;
00062 class TSocket;
00063 class TVirtualPacketizer;
00064 class TSlave;
00065 class TEventIter;
00066 class TProofStats;
00067 class TMutex;
00068 class TStatus;
00069 class TTimer;
00070 class THashList;
00071 
00072 
00073 //------------------------------------------------------------------------
00074 
00075 class TProofPlayer : public TVirtualProofPlayer {
         // Base PROOF player. Per the file header comment, instances of this class
         // are created on the worker nodes per session and do the processing; the
         // subclasses declared later in this header specialize it.
         //
         // This header only declares the interface: apart from the small inline
         // accessors below, the member functions are defined elsewhere.
00076 
00077 private:
00078    TList        *fAutoBins;  // Map of min/max values by name for slaves
00079 
00080 protected:
00081    TList        *fInput;           //-> list with input objects
00082    TList        *fOutput;          //   list with output objects
00083    TSelector    *fSelector;        //!  the latest selector
00084    TClass       *fSelectorClass;   //!  class of the latest selector
00085    TTimer       *fFeedbackTimer;   //!  timer for sending intermediate results
00086    Long_t        fFeedbackPeriod;  //!  period (ms) for sending intermediate results
00087    TEventIter   *fEvIter;          //!  iterator on events or objects
00088    TStatus      *fSelStatus;       //!  status of query in progress
00089    EExitStatus   fExitStatus;      //   exit status
00090    Long64_t      fTotalEvents;     //   number of events requested
00091    TProofProgressStatus *fProgressStatus; // the progress status object;
00092 
00093    TList        *fQueryResults;    //List of TQueryResult
00094    TQueryResult *fQuery;           //Instance of TQueryResult currently processed
00095    TQueryResult *fPreviousQuery;   //Previous instance of TQueryResult processed
00096    Int_t         fDrawQueries;     //Number of Draw queries in the list
00097    Int_t         fMaxDrawQueries;  //Max number of Draw queries kept
00098 
00099    TTimer       *fStopTimer;       //Timer associated with a stop request
00100    TMutex       *fStopTimerMtx;    //To protect the stop timer
00101 
00102    TTimer       *fDispatchTimer;    //Dispatch pending events while processing
00103 
00104    static THashList *fgDrawInputPars;  // List of input parameters to be kept on drawing actions
00105 
00106    void         *GetSender() { return this; }  //used to set gTQSender
00107 
00108    virtual Int_t DrawCanvas(TObject *obj); // Canvas drawing via libProofDraw
00109 
00110    virtual void SetupFeedback();  // specialized setup
00111    
00112    virtual void  MergeOutput();
00113 
         // StopFeedback() is deliberately public although its counterpart
         // SetupFeedback() is protected: the nested TCleanup helper below calls
         // it, and some compilers rejected that call when it was protected.
00114 public:   // fix for broken compilers so TCleanup can call StopFeedback()
00115    virtual void StopFeedback();   // specialized teardown
00116 
00117 protected:
         // Scope guard: the destructor calls StopFeedback() on the player handed
         // to the constructor. The pointer is dereferenced without a null check,
         // so a TCleanup must always be built from a valid player.
00118    class TCleanup {
00119    private:
00120       TProofPlayer *fPlayer;
00121    public:
00122       TCleanup(TProofPlayer *p) : fPlayer(p) { }
00123       ~TCleanup() { fPlayer->StopFeedback(); }
00124    };
00125 
00126    Bool_t CheckMemUsage(Long64_t &mfreq, Bool_t &w80r, Bool_t &w80v, TString &wmsg);
00127 
00128    void MapOutputListToDataMembers() const;
00129 
00130 public:
         // Extra status bits for this class (bits 15 and 16).
00131    enum EStatusBits { kDispatchOneEvent = BIT(15), kIsProcessing = BIT(16) };
00132 
00133    TProofPlayer(TProof *proof = 0);
00134    virtual ~TProofPlayer();
00135 
00136    Long64_t  Process(TDSet *set,
00137                      const char *selector, Option_t *option = "",
00138                      Long64_t nentries = -1, Long64_t firstentry = 0);
         // The base player has no packetizer: always returns 0.
00139    TVirtualPacketizer *GetPacketizer() const { return 0; }
00140    Long64_t  Finalize(Bool_t force = kFALSE, Bool_t sync = kFALSE);
00141    Long64_t  Finalize(TQueryResult *qr);
00142    Long64_t  DrawSelect(TDSet *set, const char *varexp,
00143                         const char *selection, Option_t *option = "",
00144                         Long64_t nentries = -1, Long64_t firstentry = 0);
00145    Int_t     GetDrawArgs(const char *var, const char *sel, Option_t *opt,
00146                          TString &selector, TString &objname);
00147    void      HandleGetTreeHeader(TMessage *mess);
00148    void      HandleRecvHisto(TMessage *mess);
00149    void      FeedBackCanvas(const char *name, Bool_t create);
00150 
00151    void      StopProcess(Bool_t abort, Int_t timeout = -1);
00152    void      AddInput(TObject *inp);
00153    void      ClearInput();
00154    TObject  *GetOutput(const char *name) const;
00155    TList    *GetOutputList() const;
         // Plain accessors: return the stored pointers without copying.
00156    TList    *GetInputList() const { return fInput; }
00157    TList    *GetListOfResults() const { return fQueryResults; }
00158    void      AddQueryResult(TQueryResult *q);
00159    TQueryResult *GetCurrentQuery() const { return fQuery; }
00160    TQueryResult *GetQueryResult(const char *ref);
00161    void      RemoveQueryResult(const char *ref);
00162    void      SetCurrentQuery(TQueryResult *q);
00163    void      SetMaxDrawQueries(Int_t max) { fMaxDrawQueries = max; }
         // Makes fPreviousQuery the current query again (fPreviousQuery itself is
         // left unchanged).
00164    void      RestorePreviousQuery() { fQuery = fPreviousQuery; }
00165    Int_t     AddOutputObject(TObject *obj);
00166    void      AddOutput(TList *out);   // Incorporate a list
00167    void      StoreOutput(TList *out);   // Adopts the list
00168    void      StoreFeedback(TObject *slave, TList *out); // Adopts the list
         // Progress notifications. Each overload taking a TSlave* ignores the
         // slave argument at this level and forwards to the matching overload
         // without it.
00169    void      Progress(Long64_t total, Long64_t processed); // *SIGNAL*
00170    void      Progress(TSlave *, Long64_t total, Long64_t processed)
00171                 { Progress(total, processed); }
00172    void      Progress(Long64_t total, Long64_t processed, Long64_t bytesread,
00173                       Float_t initTime, Float_t procTime,
00174                       Float_t evtrti, Float_t mbrti); // *SIGNAL*
00175    void      Progress(TSlave *, Long64_t total, Long64_t processed, Long64_t bytesread,
00176                       Float_t initTime, Float_t procTime,
00177                       Float_t evtrti, Float_t mbrti)
00178                 { Progress(total, processed, bytesread, initTime, procTime,
00179                            evtrti, mbrti); } // *SIGNAL*
00180    void      Progress(TProofProgressInfo *pi); // *SIGNAL*
00181    void      Progress(TSlave *, TProofProgressInfo *pi) { Progress(pi); } // *SIGNAL*
00182    void      Feedback(TList *objs); // *SIGNAL*
00183 
00184    TDrawFeedback *CreateDrawFeedback(TProof *p);
00185    void           SetDrawFeedbackOption(TDrawFeedback *f, Option_t *opt);
00186    void           DeleteDrawFeedback(TDrawFeedback *f);
00187 
00188    TDSetElement *GetNextPacket(TSlave *slave, TMessage *r);
00189 
00190    Int_t     ReinitSelector(TQueryResult *qr);
00191 
00192    void      UpdateAutoBin(const char *name,
00193                            Double_t& xmin, Double_t& xmax,
00194                            Double_t& ymin, Double_t& ymax,
00195                            Double_t& zmin, Double_t& zmax);
00196 
         // The base player never reports itself as the client.
00197    Bool_t    IsClient() const { return kFALSE; }
00198 
00199    EExitStatus GetExitStatus() const { return fExitStatus; }
         // Both helpers dereference fProgressStatus without a null check, so they
         // rely on it having been set before they are called.
00200    Long64_t    GetEventsProcessed() const { return fProgressStatus->GetEntries(); }
00201    void        AddEventsProcessed(Long64_t ev) { fProgressStatus->IncEntries(ev); }
00202 
00203    void      SetDispatchTimer(Bool_t on = kTRUE);
00204    void      SetStopTimer(Bool_t on = kTRUE,
00205                           Bool_t abort = kFALSE, Int_t timeout = 0);
00206 
         // Hook with an empty default implementation.
00207    virtual void      SetInitTime() { }
00208    Long64_t  GetCacheSize();
00209    Int_t     GetLearnEntries();
00210 
00211    void              SetProcessing(Bool_t on = kTRUE);
00212    TProofProgressStatus  *GetProgressStatus() const { return fProgressStatus; }
00213 
00214    ClassDef(TProofPlayer,0)  // Basic PROOF player
00215 };
00216 
00217 
00218 //------------------------------------------------------------------------
00219 
00220 class TProofPlayerLocal : public TProofPlayer {
         // Player variant whose IsClient() answer is fixed at construction time
         // and whose feedback setup/teardown hooks do nothing.
00221 
00222 private:
         // Value returned by IsClient(); set once from the constructor argument.
00223    Bool_t   fIsClient;
00224 
00225 protected:
         // Feedback handling is disabled here: both hooks are empty.
00226    void SetupFeedback() { }
00227    void StopFeedback() { }
00228 
00229 public:
         // 'client' defaults to kTRUE. The base class is default-constructed.
00230    TProofPlayerLocal(Bool_t client = kTRUE) : fIsClient(client) { }
00231    virtual ~TProofPlayerLocal() { }
00232 
00233    Bool_t         IsClient() const { return fIsClient; }
00234 
00235    ClassDef(TProofPlayerLocal,0)  // PROOF player running on client
00236 };
00237 
00238 
00239 //------------------------------------------------------------------------
00240 
00241 //////////////////////////////////////////////////////////////////////////
00242 //                                                                      //
00243 // TProofPlayerRemote                                                   //
00244 //                                                                      //
00245 // Instances of TProofPlayerRemote are created for each query on the    //
00246 // master(s) and on the client. On the master(s), TProofPlayerRemote    //
00247 // coordinates processing, checks the dataset, creates the packetizer   //
00248 // and takes care of merging the results of the workers.                //
00249 // The instance on the client collects information on the input         //
00250 // (dataset and selector), it invokes the Begin() method and finalizes  //
00251 // the query by calling Terminate().                                    //
00252 //                                                                      //
00253 //////////////////////////////////////////////////////////////////////////
00254 
00255 
00256 class TProofPlayerRemote : public TProofPlayer {
         // Player used on the master(s) and on the client (see the banner comment
         // preceding this class for the division of work between the two).
         // Most members are only declared here and are defined elsewhere.
00257 
00258 protected:
00259    TProof             *fProof;         // link to associated PROOF session
00260    TList              *fOutputLists;   // results returned by slaves
00261    TList              *fFeedback;      // reference for use on master
00262    TList              *fFeedbackLists; // intermediate results
00263    TVirtualPacketizer *fPacketizer;    // transform TDSet into packets for slaves
00264    Bool_t              fMergeFiles;    // is True when merging output files centrally is needed
00265    TDSet              *fDSet;          //!tdset for current processing
00266    ErrorHandlerFunc_t  fErrorHandler;  // Store previous handler when redirecting output
00267 
00268    virtual Bool_t  HandleTimer(TTimer *timer);
00269    Int_t           InitPacketizer(TDSet *dset, Long64_t nentries,
00270                                   Long64_t first, const char *defpackunit,
00271                                   const char *defpackdata);
00272    TList          *MergeFeedback();
00273    Bool_t          MergeOutputFiles();
00274    void            NotifyMemory(TObject *obj);
00275    void            SetLastMergingMsg(TObject *obj);
00276    virtual Bool_t  SendSelector(const char *selector_file); //send selector to slaves
00277    TProof         *GetProof() const { return fProof; }
00278    void            SetupFeedback();  // specialized setup
00279    void            StopFeedback();   // specialized teardown
00280    void            SetSelectorDataMembersFromOutputList();
00281 
00282 public:
         // Stores 'proof' in fProof, zero-initialises the remaining members
         // declared in this class and assigns a newly allocated
         // TProofProgressStatus to the inherited fProgressStatus.
         // NOTE(review): 'proof' is not forwarded to the TProofPlayer
         // constructor, which therefore runs with its default argument. If that
         // constructor also allocates fProgressStatus, the assignment in the
         // body would drop the earlier object - confirm against the
         // implementation file.
00283    TProofPlayerRemote(TProof *proof = 0) : fProof(proof), fOutputLists(0), fFeedback(0),
00284                                            fFeedbackLists(0), fPacketizer(0),
00285                                            fMergeFiles(kFALSE), fDSet(0), fErrorHandler(0)
00286                                            { fProgressStatus = new TProofProgressStatus(); }
00287    virtual ~TProofPlayerRemote();   // Owns the fOutput list
00288    virtual Long64_t Process(TDSet *set, const char *selector,
00289                             Option_t *option = "", Long64_t nentries = -1,
00290                             Long64_t firstentry = 0);
00291    virtual Long64_t Finalize(Bool_t force = kFALSE, Bool_t sync = kFALSE);
00292    virtual Long64_t Finalize(TQueryResult *qr);
00293    Long64_t       DrawSelect(TDSet *set, const char *varexp,
00294                              const char *selection, Option_t *option = "",
00295                              Long64_t nentries = -1, Long64_t firstentry = 0);
00296 
00297    void           RedirectOutput(Bool_t on = kTRUE);
00298    void           StopProcess(Bool_t abort, Int_t timeout = -1);
00299    void           StoreOutput(TList *out);   // Adopts the list
00300    virtual void   StoreFeedback(TObject *slave, TList *out); // Adopts the list
00301    Int_t          Incorporate(TObject *obj, TList *out, Bool_t &merged);
00302    TObject       *HandleHistogram(TObject *obj);
00303    Int_t          AddOutputObject(TObject *obj);
00304    void           AddOutput(TList *out);   // Incorporate a list
00305    virtual void   MergeOutput();
         // Progress notifications. As in the base class, the overloads taking a
         // TSlave* ignore the slave argument and forward to the matching overload
         // without it.
00306    void           Progress(Long64_t total, Long64_t processed); // *SIGNAL*
00307    void           Progress(TSlave*, Long64_t total, Long64_t processed)
00308                      { Progress(total, processed); }
00309    void           Progress(Long64_t total, Long64_t processed, Long64_t bytesread,
00310                            Float_t initTime, Float_t procTime,
00311                            Float_t evtrti, Float_t mbrti); // *SIGNAL*
00312    void           Progress(TSlave *, Long64_t total, Long64_t processed, Long64_t bytesread,
00313                            Float_t initTime, Float_t procTime,
00314                            Float_t evtrti, Float_t mbrti)
00315                       { Progress(total, processed, bytesread, initTime, procTime,
00316                            evtrti, mbrti); } // *SIGNAL*
00317    void           Progress(TProofProgressInfo *pi); // *SIGNAL*
00318    void           Progress(TSlave *, TProofProgressInfo *pi) { Progress(pi); } // *SIGNAL*
00319    void           Feedback(TList *objs); // *SIGNAL*
00320    TDSetElement  *GetNextPacket(TSlave *slave, TMessage *r);
         // Unlike the base class (which returns 0), this returns the stored
         // packetizer pointer; the constructor initialises it to 0.
00321    TVirtualPacketizer *GetPacketizer() const { return fPacketizer; }
00322 
00323    Bool_t         IsClient() const;
00324 
00325    void           SetInitTime();
00326 
00327    ClassDef(TProofPlayerRemote,0)  // PROOF player running on master server
00328 };
00329 
00330 
00331 //------------------------------------------------------------------------
00332 
00333 class TProofPlayerSlave : public TProofPlayer {
         // Player variant for the slave (worker) server, per the ClassDef title
         // below. It adds a socket pointer and a feedback list to the base player
         // and overrides the feedback hooks, the timer handler and
         // HandleGetTreeHeader(); all of these are defined elsewhere.
00334 
00335 private:
         // Socket pointer supplied to the constructor.
         // NOTE(review): presumably the connection back to the master - confirm
         // in the implementation file.
00336    TSocket *fSocket;
00337    TList   *fFeedback;  // List of objects to send updates of
00338 
00339    Bool_t HandleTimer(TTimer *timer);
00340 
00341 protected:
00342    void SetupFeedback();
00343    void StopFeedback();
00344 
00345 public:
         // Stores 'socket' and starts with no feedback list. The base class is
         // default-constructed. No destructor is declared in this class.
00346    TProofPlayerSlave(TSocket *socket = 0) : fSocket(socket), fFeedback(0) { }
00347 
00348    void  HandleGetTreeHeader(TMessage *mess);
00349 
00350    ClassDef(TProofPlayerSlave,0)  // PROOF player running on slave server
00351 };
00352 
00353 
00354 //------------------------------------------------------------------------
00355 
00356 class TProofPlayerSuperMaster : public TProofPlayerRemote {
         // Remote player variant for the super master (per the ClassDef title
         // below). It adds bookkeeping arrays and a list of slaves, and gives the
         // TSlave-aware Progress() overloads their own out-of-line definitions.
00357 
00358 private:
         // Bookkeeping arrays.
         // NOTE(review): presumably one entry per element of fSlaves, mirroring
         // the arguments of the Progress() overloads - confirm in the
         // implementation file.
00359    TArrayL64 fSlaveProgress;
00360    TArrayL64 fSlaveTotals;
00361    TArrayL64 fSlaveBytesRead;
00362    TArrayF   fSlaveInitTime;
00363    TArrayF   fSlaveProcTime;
00364    TArrayF   fSlaveEvtRti;
00365    TArrayF   fSlaveMBRti;
00366    TArrayI   fSlaveActW;
00367    TArrayI   fSlaveTotS;
00368    TArrayF   fSlaveEffS;
00369    TList     fSlaves;
00370    Bool_t    fReturnFeedback;
00371 
00372 protected:
00373    Bool_t HandleTimer(TTimer *timer);
00374    void   SetupFeedback();
00375 
00376 public:
         // Forwards 'proof' to TProofPlayerRemote and starts with
         // fReturnFeedback = kFALSE; the array and list members are
         // default-constructed.
00377    TProofPlayerSuperMaster(TProof *proof = 0) :
00378       TProofPlayerRemote(proof), fReturnFeedback(kFALSE) { }
00379    virtual ~TProofPlayerSuperMaster() { }
00380 
00381    Long64_t Process(TDSet *set, const char *selector,
00382                     Option_t *option = "", Long64_t nentries = -1,
00383                     Long64_t firstentry = 0);
         // The overloads without a TSlave* are re-declared here only to forward
         // to TProofPlayerRemote; declaring any Progress() in this class would
         // otherwise hide the inherited ones.
00384    void  Progress(Long64_t total, Long64_t processed)
00385                     { TProofPlayerRemote::Progress(total, processed); }
00386    void  Progress(Long64_t total, Long64_t processed, Long64_t bytesread,
00387                   Float_t initTime, Float_t procTime,
00388                   Float_t evtrti, Float_t mbrti)
00389                     { TProofPlayerRemote::Progress(total, processed, bytesread,
00390                                                    initTime, procTime, evtrti, mbrti); }
00391    void  Progress(TProofProgressInfo *pi) { TProofPlayerRemote::Progress(pi); }
         // The TSlave-aware overloads are not inline forwarders here (unlike in
         // the parent classes); they are defined elsewhere.
00392    void  Progress(TSlave *sl, Long64_t total, Long64_t processed);
00393    void  Progress(TSlave *sl, Long64_t total, Long64_t processed, Long64_t bytesread,
00394                   Float_t initTime, Float_t procTime,
00395                   Float_t evtrti, Float_t mbrti);
00396    void  Progress(TSlave *sl, TProofProgressInfo *pi);
00397 
00398    ClassDef(TProofPlayerSuperMaster,0)  // PROOF player running on super master
00399 };
00400 
00401 #endif

Generated on Tue Jul 5 14:28:03 2011 for ROOT_528-00b_version by  doxygen 1.5.1