TPacketizerAdaptive.h

Go to the documentation of this file.
00001 // @(#)root/proofplayer:$Id: TPacketizerAdaptive.h 35196 2010-09-08 11:44:34Z ganis $
00002 // Author: Jan Iwaszkiewicz   11/12/06
00003 
00004 /*************************************************************************
00005  * Copyright (C) 1995-2006, Rene Brun and Fons Rademakers.               *
00006  * All rights reserved.                                                  *
00007  *                                                                       *
00008  * For the licensing terms see $ROOTSYS/LICENSE.                         *
00009  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
00010  *************************************************************************/
00011 
00012 #ifndef ROOT_TPacketizerAdaptive
00013 #define ROOT_TPacketizerAdaptive
00014 
00015 //////////////////////////////////////////////////////////////////////////
00016 //                                                                      //
00017 // TPacketizerAdaptive                                                  //
00018 //                                                                      //
00019 // This packetizer is based on TPacketizer but uses different           //
00020 // load-balancing algorithms and data structures.                       //
00021 // Two main improvements in the load-balancing strategy:                //
00022 // - The first was to change the order in which the files are assigned  //
00023 //   to the computing nodes in such a way that network transfers are    //
00024 //   evenly distributed over the query time. Transfer of remote files   //
00025 //   often became a bottleneck at the end of a query.                   //
00026 // - The other improvement is the use of time-based packet size. We     //
00027 //   measure the processing rate of all the nodes and calculate the     //
00028 //   packet size so that it takes a certain amount of time. In this way //
00029 //   the packetizer prevents the situation where the query can't finish //
00030 //   because of one slow node.                                          //
00031 //                                                                      //
00032 // The data structures TFileStat, TFileNode and TSlaveStat are          //
00033 // extended and modified, and the TFileNode::Compare method is changed. //
00034 //                                                                      //
00035 //////////////////////////////////////////////////////////////////////////
00036 
00037 #ifndef ROOT_TVirtualPacketizer
00038 #include "TVirtualPacketizer.h"
00039 #endif
00040 
00041 
00042 class TMessage;
00043 class TTree;
00044 class TMap;
00045 class TNtupleD;
00046 class TProofStats;
00047 class TRandom;
00048 class TSortedList;
00049 
00050 class TPacketizerAdaptive : public TVirtualPacketizer {
00051 
00052 public:              // nested classes are public because of a Sun CC bug
00053    class TFileNode;
00054    class TFileStat;
00055    class TSlaveStat;
00056 
00057 private:
00058    TList         *fFileNodes;    // nodes with files
00059    TList         *fUnAllocated;  // nodes with unallocated files
00060    TList         *fActive;       // nodes with unfinished files
00061    Int_t          fMaxPerfIdx;   // maximum of the workers' performance indices
00062    TList         *fPartitions;   // list of partitions on nodes
00063 
00064    TSortedList   *fFilesToProcess; // global list of files (TFileStat) to be processed
00065 
00066    Bool_t         fCachePacketSync; // controls synchronization of cache and packet sizes
00067    Double_t       fMaxEntriesRatio; // max allowed ratio of file entries to average, for cache-to-packet sync
00068 
00069    Float_t        fFractionOfRemoteFiles; // fraction of TDSetElements that are on non-worker nodes
00070    Long64_t       fNEventsOnRemLoc;       // number of events in currently
00071                                           // unallocated files on non-worker locations
00072    Float_t        fBaseLocalPreference;   // indicates how much more likely the nodes will be
00073                                           // to open their local files (1 means indifferent)
00074    Bool_t         fForceLocal;            // if kTRUE, eliminate remote processing
00075 
00076    Long_t         fMaxSlaveCnt;        // maximum number of workers per file node (Long_t to avoid
00077                                        // warnings from backward compatibility support)
00078    Int_t          fPacketAsAFraction;  // used to calculate the packet size:
00079                                        // fPacketSize = fTotalEntries / (fPacketAsAFraction * nslaves)
00080                                        // fPacketAsAFraction can be interpreted as follows:
00081                                        // assuming all workers have equal processing rate, packet size
00082                                        // is (#events processed by 1 worker) / fPacketAsAFraction.
00083                                        // It can be set with PROOF_PacketAsAFraction in the input list.
00084    Int_t          fStrategy;           // packetizing strategy: 0 = classic, 1 (default) = adaptive
00085 
00086    TPacketizerAdaptive();
00087    TPacketizerAdaptive(const TPacketizerAdaptive&);    // not implemented: accidental copy is an error
00088    void           InitStats();                         // initialise the stats
00089    void operator=(const TPacketizerAdaptive&);         // not implemented: accidental assignment is an error
00090 
00091    TFileNode     *NextNode();
00092    void           RemoveUnAllocNode(TFileNode *);
00093 
00094    TFileNode     *NextActiveNode();
00095    void           RemoveActiveNode(TFileNode *);
00096 
00097    TFileStat     *GetNextUnAlloc(TFileNode *node = 0, const char *nodeHostName = 0);
00098    TFileStat     *GetNextActive();
00099    void           RemoveActive(TFileStat *file);
00100 
00101    void           Reset();
00102    void           ValidateFiles(TDSet *dset, TList *slaves, Long64_t maxent = -1, Bool_t byfile = kFALSE);
00103    Int_t          ReassignPacket(TDSetElement *e, TList **listOfMissingFiles);
00104    void           SplitPerHost(TList *elements, TList **listOfMissingFiles);
00105 
00106 public:
00107    TPacketizerAdaptive(TDSet *dset, TList *slaves, Long64_t first, Long64_t num,
00108                        TList *input, TProofProgressStatus *st);
00109    virtual ~TPacketizerAdaptive();
00110 
00111    Int_t         AddProcessed(TSlave *sl, TProofProgressStatus *st,
00112                                Double_t latency, TList **listOfMissingFiles = 0);
00113    Int_t         GetEstEntriesProcessed(Float_t, Long64_t &ent, Long64_t &bytes, Long64_t &calls);
00114    Float_t       GetCurrentRate(Bool_t &all);
00115    Int_t         CalculatePacketSize(TObject *slstat, Long64_t cachesz, Int_t learnent);
00116    TDSetElement *GetNextPacket(TSlave *sl, TMessage *r);
00117    void          MarkBad(TSlave *s, TProofProgressStatus *status, TList **missingFiles);
00118 
00119    Int_t         GetActiveWorkers();
00120 
00121    ClassDef(TPacketizerAdaptive,0)  //Generate work packets for parallel processing
00122 };
00123 
00124 #endif

Generated on Tue Jul 5 14:27:41 2011 for ROOT_528-00b_version by  doxygen 1.5.1