00001 // @(#)root/proofplayer:$Id: TPacketizerAdaptive.h 35196 2010-09-08 11:44:34Z ganis $ 00002 // Author: Jan Iwaszkiewicz 11/12/06 00003 00004 /************************************************************************* 00005 * Copyright (C) 1995-2006, Rene Brun and Fons Rademakers. * 00006 * All rights reserved. * 00007 * * 00008 * For the licensing terms see $ROOTSYS/LICENSE. * 00009 * For the list of contributors see $ROOTSYS/README/CREDITS. * 00010 *************************************************************************/ 00011 00012 #ifndef ROOT_TPacketizerAdaptive 00013 #define ROOT_TPacketizerAdaptive 00014 00015 ////////////////////////////////////////////////////////////////////////// 00016 // // 00017 // TPacketizerAdaptive // 00018 // // 00019 // This packetizer is based on TPacketizer but uses different // 00020 // load-balancing algorithms and data structures. // 00021 // Two main improvements in the load-balancing strategy: // 00022 // - First one was to change the order in which the files are assigned // 00023 // to the computing nodes in such a way that network transfers are // 00024 // evenly distributed in the query time. Transfer of the remote files // 00025 // was often becoming a bottleneck at the end of a query. // 00026 // - The other improvement is the use of time-based packet size. We // 00027 // measure the processing rate of all the nodes and calculate the // 00028 // packet size, so that it takes certain amount of time. In this way // 00029 // packetizer prevents the situation where the query can't finish // 00030 // because of one slow node. // 00031 // // 00032 // The data structures: TFileStat, TFileNode and TSlaveStat are // 00033 // enriched + changed and TFileNode::Compare method is changed. // 00034 // // 00035 ////////////////////////////////////////////////////////////////////////// 00036 00037 #ifndef ROOT_TVirtualPacketizer 00038 #include "TVirtualPacketizer.h" 00039 #endif 00040 00041 00042 class TMessage; 00043 class TTree; 00044 class TMap; 00045 class TNtupleD; 00046 class TProofStats; 00047 class TRandom; 00048 class TSortedList; 00049 00050 class TPacketizerAdaptive : public TVirtualPacketizer { 00051 00052 public: // public because of Sun CC bug 00053 class TFileNode; 00054 class TFileStat; 00055 class TSlaveStat; 00056 00057 private: 00058 TList *fFileNodes; // nodes with files 00059 TList *fUnAllocated; // nodes with unallocated files 00060 TList *fActive; // nodes with unfinished files 00061 Int_t fMaxPerfIdx; // maximum of our slaves' performance index 00062 TList *fPartitions; // list of partitions on nodes 00063 00064 TSortedList *fFilesToProcess; // Global list of files (TFileStat) to be processed 00065 00066 Bool_t fCachePacketSync; // control synchronization of cache and packet sizes 00067 Double_t fMaxEntriesRatio; // max file entries to avg allowed ratio for cache-to-packet sync 00068 00069 Float_t fFractionOfRemoteFiles; // fraction of TDSetElements that are on non-workers 00070 Long64_t fNEventsOnRemLoc; // number of events in currently 00071 // unalloc files on non-worker loc. 00072 Float_t fBaseLocalPreference; // indicates how much more likely the nodes will be 00073 // to open their local files (1 means indifferent) 00074 Bool_t fForceLocal; // if 1 - eliminate the remote processing 00075 00076 Long_t fMaxSlaveCnt; // maximum number of workers per filenode (Long_t to avoid 00077 // warnings from backward compatibility support) 00078 Int_t fPacketAsAFraction; // used to calculate the packet size 00079 // fPacketSize = fTotalEntries / (fPacketAsAFraction * nslaves) 00080 // fPacketAsAFraction can be interpreted as follows: 00081 // assuming all slaves have equal processing rate, packet size 00082 // is (#events processed by 1 slave) / fPacketSizeAsAFraction. 00083 // It can be set with PROOF_PacketAsAFraction in input list. 00084 Int_t fStrategy; // 0 means the classic and 1 (default) - the adaptive strategy 00085 00086 TPacketizerAdaptive(); 00087 TPacketizerAdaptive(const TPacketizerAdaptive&); // no implementation, will generate 00088 void InitStats(); // initialise the stats 00089 void operator=(const TPacketizerAdaptive&); // error on accidental usage 00090 00091 TFileNode *NextNode(); 00092 void RemoveUnAllocNode(TFileNode *); 00093 00094 TFileNode *NextActiveNode(); 00095 void RemoveActiveNode(TFileNode *); 00096 00097 TFileStat *GetNextUnAlloc(TFileNode *node = 0, const char *nodeHostName = 0); 00098 TFileStat *GetNextActive(); 00099 void RemoveActive(TFileStat *file); 00100 00101 void Reset(); 00102 void ValidateFiles(TDSet *dset, TList *slaves, Long64_t maxent = -1, Bool_t byfile = kFALSE); 00103 Int_t ReassignPacket(TDSetElement *e, TList **listOfMissingFiles); 00104 void SplitPerHost(TList *elements, TList **listOfMissingFiles); 00105 00106 public: 00107 TPacketizerAdaptive(TDSet *dset, TList *slaves, Long64_t first, Long64_t num, 00108 TList *input, TProofProgressStatus *st); 00109 virtual ~TPacketizerAdaptive(); 00110 00111 Int_t AddProcessed(TSlave *sl, TProofProgressStatus *st, 00112 Double_t latency, TList **listOfMissingFiles = 0); 00113 Int_t GetEstEntriesProcessed(Float_t, Long64_t &ent, Long64_t &bytes, Long64_t &calls); 00114 Float_t GetCurrentRate(Bool_t &all); 00115 Int_t CalculatePacketSize(TObject *slstat, Long64_t cachesz, Int_t learnent); 00116 TDSetElement *GetNextPacket(TSlave *sl, TMessage *r); 00117 void MarkBad(TSlave *s, TProofProgressStatus *status, TList **missingFiles); 00118 00119 Int_t GetActiveWorkers(); 00120 00121 ClassDef(TPacketizerAdaptive,0) //Generate work packets for parallel processing 00122 }; 00123 00124 #endif