TPacketizerAdaptive.cxx

Go to the documentation of this file.
00001 // @(#)root/proofplayer:$Id: TPacketizerAdaptive.cxx 37981 2011-02-04 12:39:10Z ganis $
00002 // Author: Jan Iwaszkiewicz   11/12/06
00003 
00004 /*************************************************************************
00005  * Copyright (C) 1995-2002, Rene Brun and Fons Rademakers.               *
00006  * All rights reserved.                                                  *
00007  *                                                                       *
00008  * For the licensing terms see $ROOTSYS/LICENSE.                         *
00009  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
00010 *************************************************************************/
00011 
00012 //////////////////////////////////////////////////////////////////////////
00013 //                                                                      //
00014 // TPacketizerAdaptive                                                  //
00015 //                                                                      //
00016 // This packetizer is based on TPacketizer but uses different           //
00017 // load-balancing algorithms and data structures.                       //
00018 // Two main improvements in the load-balancing strategy:                //
00019 // - First one was to change the order in which the files are assigned  //
00020 //   to the computing nodes in such a way that network transfers are    //
00021 //   evenly distributed in the query time. Transfer of the remote files //
00022 //   was often becoming a bottleneck at the end of a query.             //
00023 // - The other improvement is the use of time-based packet size. We     //
00024 //   measure the processing rate of all the nodes and calculate the     //
00025 //   packet size, so that it takes certain amount of time. In this way  //
00026 //   packetizer prevents the situation where the query can't finish     //
00027 //   because of one slow node.                                          //
00028 //                                                                      //
00029 // The data structures: TFileStat, TFileNode and TSlaveStat are         //
00030 // enriched + changed and TFileNode::Compare method is changed.         //
00031 //                                                                      //
00032 //////////////////////////////////////////////////////////////////////////
00033 
00034 
00035 #include "TPacketizerAdaptive.h"
00036 
00037 #include "Riostream.h"
00038 #include "TDSet.h"
00039 #include "TError.h"
00040 #include "TEnv.h"
00041 #include "TEntryList.h"
00042 #include "TEventList.h"
00043 #include "TMap.h"
00044 #include "TMessage.h"
00045 #include "TMonitor.h"
00046 #include "TNtupleD.h"
00047 #include "TObject.h"
00048 #include "TParameter.h"
00049 #include "TPerfStats.h"
00050 #include "TProofDebug.h"
00051 #include "TProof.h"
00052 #include "TProofServ.h"
00053 #include "TSlave.h"
00054 #include "TSocket.h"
00055 #include "TSortedList.h"
00056 #include "TUrl.h"
00057 #include "TClass.h"
00058 #include "TRandom.h"
00059 #include "TMath.h"
00060 #include "TObjString.h"
00061 #include "TList.h"
00062 
00063 //
00064 // The following three utility classes manage the state of the
00065 // work to be performed and the slaves involved in the process.
00066 // A list of TFileNode(s) describes the hosts with files, each
00067 // has a list of TFileStat(s) keeping the state for each TDSet
00068 // element (file).
00069 //
// The list of TSlaveStat(s) keeps track of the work (being) done
// by each slave.
00072 //
00073 
00074 
00075 //------------------------------------------------------------------------------
00076 
00077 class TPacketizerAdaptive::TFileStat : public TObject {
00078 
00079 private:
00080    Bool_t         fIsDone;       // is this element processed
00081    TFileNode     *fNode;         // my FileNode
00082    TDSetElement  *fElement;      // location of the file and its range
00083    Long64_t       fNextEntry;    // cursor in the range, -1 when done // needs changing
00084 
00085 public:
00086    TFileStat(TFileNode *node, TDSetElement *elem, TList *file);
00087 
00088    Bool_t         IsDone() const {return fIsDone;}
00089    Bool_t         IsSortable() const { return kTRUE; }
00090    void           SetDone() {fIsDone = kTRUE;}
00091    TFileNode     *GetNode() const {return fNode;}
00092    TDSetElement  *GetElement() const {return fElement;}
00093    Long64_t       GetNextEntry() const {return fNextEntry;}
00094    void           MoveNextEntry(Long64_t step) {fNextEntry += step;}
00095 
00096    // This method is used to keep a sorted list of remaining files to be processed
00097    Int_t          Compare(const TObject* obj) const
00098    {
00099       // Return -1 if elem.entries < obj.elem.entries, 0 if elem.entries equal
00100       // and 1 if elem.entries < obj.elem.entries.
00101       const TFileStat *fst = dynamic_cast<const TFileStat*>(obj);
00102       if (fst && GetElement() && fst->GetElement()) {
00103          Long64_t ent = GetElement()->GetNum();
00104          Long64_t entfst = fst->GetElement()->GetNum();
00105          if (ent > 0 && entfst > 0) {
00106             if (ent > entfst) {
00107                return 1;
00108             } else if (ent < entfst) {
00109                return -1;
00110             } else {
00111                return 0;
00112             }
00113          }
00114       }
00115       // No info: assume equal (no change in order)
00116       return 0;
00117    }
00118    void Print(Option_t * = 0) const
00119    {  // Notify file name and entries
00120       Printf("TFileStat: %s %lld", fElement ? fElement->GetName() : "---",
00121                                    fElement ? fElement->GetNum() : -1);
00122    }
00123 };
00124 
00125 TPacketizerAdaptive::TFileStat::TFileStat(TFileNode *node, TDSetElement *elem, TList *files)
00126    : fIsDone(kFALSE), fNode(node), fElement(elem), fNextEntry(elem->GetFirst())
00127 {
00128    // Constructor: add to the global list
00129    if (files) files->Add(this);
00130 }
00131 
00132 //------------------------------------------------------------------------------
00133 
00134 // a class describing a file node as a part of a session
00135 class TPacketizerAdaptive::TFileNode : public TObject {
00136 
00137 private:
00138    TString        fNodeName;        // FQDN of the node
00139    TList         *fFiles;           // TDSetElements (files) stored on this node
00140    TObject       *fUnAllocFileNext; // cursor in fFiles
00141    TList         *fActFiles;        // files with work remaining
00142    TObject       *fActFileNext;     // cursor in fActFiles
00143    Int_t          fMySlaveCnt;      // number of slaves running on this node
00144                                     // (which can process remote files)
00145    Int_t          fExtSlaveCnt;     // number of external slaves processing
00146                                     // files on this node
00147    Int_t          fRunSlaveCnt;     // total number of slaves processing files
00148                                     // on this node
00149    Long64_t       fProcessed;       // number of events processed on this node
00150    Long64_t       fEvents;          // number of entries in files on this node
00151 
00152    Int_t          fStrategy;        // 0 means the classic and 1 (default) - the adaptive strategy
00153 
00154    TSortedList   *fFilesToProcess;  // Global list of files (TFileStat) to be processed (owned by TPacketizer)
00155 
00156 public:
00157    TFileNode(const char *name, Int_t strategy, TSortedList *files);
00158    ~TFileNode() { delete fFiles; delete fActFiles; }
00159 
00160    void        IncMySlaveCnt() { fMySlaveCnt++; }
00161    Int_t       GetMySlaveCnt() const { return fMySlaveCnt; }
00162    void        IncExtSlaveCnt(const char *slave) { if (fNodeName != slave) fExtSlaveCnt++; }
00163    void        DecExtSlaveCnt(const char *slave) { if (fNodeName != slave) fExtSlaveCnt--; R__ASSERT(fExtSlaveCnt >= 0); }
00164    Int_t       GetSlaveCnt() const { return fMySlaveCnt + fExtSlaveCnt; }
00165    void        IncRunSlaveCnt() { fRunSlaveCnt++; }
00166    void        DecRunSlaveCnt() { fRunSlaveCnt--; R__ASSERT(fRunSlaveCnt >= 0); }
00167    Int_t       GetRunSlaveCnt() const { return fRunSlaveCnt; }
00168    Int_t       GetExtSlaveCnt() const { return fExtSlaveCnt; }
00169    Int_t       GetNumberOfActiveFiles() const { return fActFiles->GetSize(); }
00170    Bool_t      IsSortable() const { return kTRUE; }
00171    Int_t       GetNumberOfFiles() { return fFiles->GetSize(); }
00172    void        IncProcessed(Long64_t nEvents)
00173                   { fProcessed += nEvents; }
00174    Long64_t    GetProcessed() const { return fProcessed; }
00175    void        DecreaseProcessed(Long64_t nEvents) { fProcessed -= nEvents; }
00176    // this method is used by Compare() it adds 1, so it returns a number that
00177    // would be true if one more slave is added.
00178    Long64_t    GetEventsLeftPerSlave() const
00179       { return ((fEvents - fProcessed)/(fRunSlaveCnt + 1)); }
00180    void        IncEvents(Long64_t nEvents) { fEvents += nEvents; }
00181    const char *GetName() const { return fNodeName.Data(); }
00182    Long64_t    GetNEvents() const { return fEvents; }
00183 
00184    void Print(Option_t * = 0) const
00185    {
00186       TFileStat *fs = 0;
00187       TDSetElement *e = 0;
00188       Int_t nn = 0;
00189       Printf("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
00190       Printf("+++ TFileNode: %s +++", fNodeName.Data());
00191       Printf("+++ Evts: %lld (total: %lld) ", fProcessed, fEvents);
00192       Printf("+++ Worker count: int:%d, ext: %d, tot:%d ", fMySlaveCnt, fExtSlaveCnt, fRunSlaveCnt);
00193       Printf("+++ Files: %d ", fFiles ? fFiles->GetSize() : 0);
00194       if (fFiles && fFiles->GetSize() > 0) {
00195          TIter nxf(fFiles);
00196          while ((fs = (TFileStat *) nxf())) {
00197             if ((e = fs->GetElement())) {
00198                Printf("+++  #%d: %s  %lld - %lld (%lld) - next: %lld ", ++nn, e->GetName(),
00199                      e->GetFirst(), e->GetFirst() + e->GetNum() - 1, e->GetNum(), fs->GetNextEntry());
00200             } else {
00201                Printf("+++  #%d: no element! ", ++nn);
00202             }
00203          }
00204       }
00205       Printf("+++ Active files: %d ", fActFiles ? fActFiles->GetSize() : 0);
00206       if (fActFiles && fActFiles->GetSize() > 0) {
00207          TIter nxaf(fActFiles);
00208          while ((fs = (TFileStat *) nxaf())) {
00209             if ((e = fs->GetElement())) {
00210                Printf("+++  #%d: %s  %lld - %lld (%lld) - next: %lld", ++nn, e->GetName(),
00211                       e->GetFirst(), e->GetFirst() + e->GetNum() - 1, e->GetNum(), fs->GetNextEntry());
00212             } else {
00213                Printf("+++  #%d: no element! ", ++nn);
00214             }
00215          }
00216       }
00217       Printf("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
00218    }
00219 
00220    void Add(TDSetElement *elem, Bool_t tolist)
00221    {
00222       TList *files = tolist ? (TList *)fFilesToProcess : (TList *)0;
00223       TFileStat *f = new TFileStat(this, elem, files);
00224       fFiles->Add(f);
00225       if (fUnAllocFileNext == 0) fUnAllocFileNext = fFiles->First();
00226    }
00227 
00228    TFileStat *GetNextUnAlloc()
00229    {
00230       TObject *next = fUnAllocFileNext;
00231 
00232       if (next != 0) {
00233          // make file active
00234          fActFiles->Add(next);
00235          if (fActFileNext == 0) fActFileNext = fActFiles->First();
00236 
00237          // move cursor
00238          fUnAllocFileNext = fFiles->After(fUnAllocFileNext);
00239       }
00240       return (TFileStat *) next;
00241    }
00242 
00243    TFileStat *GetNextActive()
00244    {
00245       TObject *next = fActFileNext;
00246 
00247       if (fActFileNext != 0) {
00248          fActFileNext = fActFiles->After(fActFileNext);
00249          if (fActFileNext == 0) fActFileNext = fActFiles->First();
00250       }
00251 
00252       return (TFileStat *) next;
00253    }
00254 
00255    void RemoveActive(TFileStat *file)
00256    {
00257       if (fActFileNext == file) fActFileNext = fActFiles->After(file);
00258       fActFiles->Remove(file);
00259       if (fFilesToProcess) fFilesToProcess->Remove(file);
00260       if (fActFileNext == 0) fActFileNext = fActFiles->First();
00261    }
00262 
00263    Int_t Compare(const TObject *other) const
00264    {
00265       // Must return -1 if this is smaller than obj, 0 if objects are equal
00266       // and 1 if this is larger than obj.
00267       // smaller means more needing a new worker.
00268       // Two cases are considered depending on
00269       // relation between harddrive speed and network bandwidth.
00270 
00271       const TFileNode *obj = dynamic_cast<const TFileNode*>(other);
00272       if (!obj) {
00273          Error("Compare", "input is not a TPacketizer::TFileNode object");
00274          return 0;
00275       }
00276       
00277       // how many more events it has than obj
00278 
00279       if (fStrategy == 1) {
00280          // The default adaptive strategy.
00281          Int_t myVal = GetRunSlaveCnt();
00282          Int_t otherVal = obj->GetRunSlaveCnt();
00283          if (myVal < otherVal) {
00284             return -1;
00285          } else if (myVal > otherVal) {
00286             return 1;
00287          } else {
00288             // if this has more events to process than obj
00289             if ((fEvents - fProcessed) >
00290                 (obj->GetNEvents() - obj->GetProcessed())) {
00291                return -1;
00292             } else {
00293                return 1;
00294             }
00295          }
00296       } else {
00297          Int_t myVal = GetSlaveCnt();
00298          Int_t otherVal = obj->GetSlaveCnt();
00299          if (myVal < otherVal) {
00300             return -1;
00301          } else if (myVal > otherVal) {
00302             return 1;
00303          } else {
00304             return 0;
00305          }
00306       }
00307    }
00308 
00309    void Reset()
00310    {
00311       fUnAllocFileNext = fFiles->First();
00312       fActFiles->Clear();
00313       fActFileNext = 0;
00314       fExtSlaveCnt = 0;
00315       fMySlaveCnt = 0;
00316       fRunSlaveCnt = 0;
00317    }
00318 };
00319 
00320 
00321 TPacketizerAdaptive::TFileNode::TFileNode(const char *name, Int_t strategy, TSortedList *files)
00322    : fNodeName(name), fFiles(new TList), fUnAllocFileNext(0),
00323      fActFiles(new TList), fActFileNext(0), fMySlaveCnt(0),
00324      fExtSlaveCnt(0), fRunSlaveCnt(0), fProcessed(0), fEvents(0),
00325      fStrategy(strategy), fFilesToProcess(files)
00326 {
00327    // Constructor
00328 
00329    fFiles->SetOwner();
00330    fActFiles->SetOwner(kFALSE);
00331 }
00332 
00333 //------------------------------------------------------------------------------
00334 
00335 class TPacketizerAdaptive::TSlaveStat : public TVirtualPacketizer::TVirtualSlaveStat {
00336 
00337 friend class TPacketizerAdaptive;
00338 
00339 private:
00340    TFileNode     *fFileNode;     // corresponding node or 0
00341    TFileStat     *fCurFile;      // file currently being processed
00342    TDSetElement  *fCurElem;      // TDSetElement currently being processed
00343    Long64_t       fCurProcessed; // events processed in the current file
00344    Float_t        fCurProcTime;  // proc time spent on the current file
00345    TList         *fDSubSet;      // packets processed by this worker
00346 
00347 public:
00348    TSlaveStat(TSlave *slave);
00349    ~TSlaveStat();
00350    TFileNode  *GetFileNode() const { return fFileNode; }
00351    Long64_t    GetEntriesProcessed() const { return fStatus?fStatus->GetEntries():-1; }
00352    Double_t    GetProcTime() const { return fStatus?fStatus->GetProcTime():-1; }
00353    TFileStat  *GetCurFile() { return fCurFile; }
00354    void        SetFileNode(TFileNode *node) { fFileNode = node; }
00355    void        UpdateRates(TProofProgressStatus *st);
00356    Float_t     GetAvgRate() { return fStatus->GetRate(); }
00357    Float_t     GetCurRate() {
00358       return (fCurProcTime?fCurProcessed/fCurProcTime:0); }
00359    Int_t       GetLocalEventsLeft() {
00360       return fFileNode?(fFileNode->GetEventsLeftPerSlave()):0; }
00361    TList      *GetProcessedSubSet() { return fDSubSet; }
00362    TProofProgressStatus *GetProgressStatus() { return fStatus; }
00363    TProofProgressStatus *AddProcessed(TProofProgressStatus *st = 0);
00364 };
00365 
00366 //______________________________________________________________________________
00367 TPacketizerAdaptive::TSlaveStat::TSlaveStat(TSlave *slave)
00368    : fFileNode(0), fCurFile(0), fCurElem(0),
00369      fCurProcessed(0), fCurProcTime(0)
00370 {
00371    // Constructor
00372 
00373    fDSubSet = new TList();
00374    fDSubSet->SetOwner();
00375    fSlave = slave;
00376    fStatus = new TProofProgressStatus();
00377    // The slave name is a special one in PROOF-Lite: avoid blocking on the DNS
00378    // for non existing names
00379    fWrkFQDN = slave->GetName();
00380    if (strcmp(slave->ClassName(), "TSlaveLite")) {
00381       fWrkFQDN = TUrl(fWrkFQDN).GetHostFQDN();
00382       // Get full name for local hosts
00383       if (fWrkFQDN.Contains("localhost") || fWrkFQDN == "127.0.0.1")
00384          fWrkFQDN = TUrl(gSystem->HostName()).GetHostFQDN();
00385    }
00386    PDB(kPacketizer, 2)
00387       Info("TSlaveStat", "wrk FQDN: %s", fWrkFQDN.Data());
00388 }
00389 
00390 //______________________________________________________________________________
00391 TPacketizerAdaptive::TSlaveStat::~TSlaveStat()
00392 {
00393    // Cleanup
00394 
00395    SafeDelete(fDSubSet);
00396    SafeDelete(fStatus);
00397 }
00398 
00399 //______________________________________________________________________________
00400 void TPacketizerAdaptive::TSlaveStat::UpdateRates(TProofProgressStatus *st)
00401 {
00402    // Update packetizer rates
00403 
00404    if (!st) {
00405       Error("UpdateRates", "no status object!");
00406       return;
00407    }
00408    if (fCurFile->IsDone()) {
00409       fCurProcTime = 0;
00410       fCurProcessed = 0;
00411    } else {
00412       fCurProcTime += st->GetProcTime() - GetProcTime();
00413       fCurProcessed += st->GetEntries() - GetEntriesProcessed();
00414    }
00415    fCurFile->GetNode()->IncProcessed(st->GetEntries() - GetEntriesProcessed());
00416    st->SetLastEntries(st->GetEntries() - fStatus->GetEntries());
00417    SafeDelete(fStatus);
00418    fStatus = st;
00419 }
00420 
00421 //______________________________________________________________________________
00422 TProofProgressStatus *TPacketizerAdaptive::TSlaveStat::AddProcessed(TProofProgressStatus *st)
00423 {
00424    // Add the current element to the fDSubSet (subset processed by this worker)
00425    // and if the status arg is given, then change the size of the packet.
00426    // return the difference (*st - *fStatus)
00427 
00428    if (st && fDSubSet && fCurElem) {
00429       if (fCurElem->GetNum() != st->GetEntries() - GetEntriesProcessed())
00430          fCurElem->SetNum(st->GetEntries() - GetEntriesProcessed());
00431       fDSubSet->Add(fCurElem);
00432       TProofProgressStatus *diff = new TProofProgressStatus(*st - *fStatus);
00433       return diff;
00434    } else {
00435       Error("AddProcessed", "processed subset of current elem undefined");
00436       return 0;
00437    }
00438 }
00439 
00440 //------------------------------------------------------------------------------
00441 
00442 ClassImp(TPacketizerAdaptive)
00443 
00444 //______________________________________________________________________________
00445 TPacketizerAdaptive::TPacketizerAdaptive(TDSet *dset, TList *slaves,
00446                                          Long64_t first, Long64_t num,
00447                                          TList *input, TProofProgressStatus *st)
00448                     : TVirtualPacketizer(input, st)
00449 {
00450    // Constructor
00451 
00452    PDB(kPacketizer,1) Info("TPacketizerAdaptive",
00453                            "enter (first %lld, num %lld)", first, num);
00454 
00455    // Init pointer members
00456    fSlaveStats = 0;
00457    fUnAllocated = 0;
00458    fActive = 0;
00459    fFileNodes = 0;
00460    fMaxPerfIdx = 1;
00461    fCachePacketSync = kTRUE;
00462    fMaxEntriesRatio = 2.;
00463 
00464    fMaxSlaveCnt = -1;
00465    fPacketAsAFraction = 4;
00466    fStrategy = 1;
00467    fFilesToProcess = new TSortedList;
00468    fFilesToProcess->SetOwner(kFALSE);
00469 
00470    if (!fProgressStatus) {
00471       Error("TPacketizerAdaptive", "No progress status");
00472       return;
00473    }
00474 
00475    // Attempt to synchronize the packet size with the tree cache size
00476    Int_t cpsync = -1;
00477    if (TProof::GetParameter(input, "PROOF_PacketizerCachePacketSync", cpsync) != 0) {
00478       // Check if there is a global cache-packet sync setting
00479       cpsync = gEnv->GetValue("Packetizer.CachePacketSync", 1);
00480    }
00481    if (cpsync >= 0) fCachePacketSync = (cpsync > 0) ? kTRUE : kFALSE;
00482 
00483    // Max file entries to avg allowed ratio for cache-to-packet synchronization
00484    // (applies only if fCachePacketSync is true; -1. disables the bound)
00485    if (TProof::GetParameter(input, "PROOF_PacketizerMaxEntriesRatio", fMaxEntriesRatio) != 0) {
00486       // Check if there is a global ratio setting
00487       fMaxEntriesRatio = gEnv->GetValue("Packetizer.MaxEntriesRatio", 2.);
00488    }
00489 
00490    // The possibility to change packetizer strategy to the basic TPacketizer's
00491    // one (in which workers always process their local data first).
00492    Int_t strategy = -1;
00493    if (TProof::GetParameter(input, "PROOF_PacketizerStrategy", strategy) != 0) {
00494       // Check if there is a global strategy setting
00495       strategy = gEnv->GetValue("Packetizer.Strategy", 1);
00496    }
00497    if (strategy == 0) {
00498       fStrategy = 0;
00499       Info("TPacketizerAdaptive", "using the basic strategy of TPacketizer");
00500    } else if (strategy != 1) {
00501       Warning("TPacketizerAdaptive", "unsupported strategy index (%d): ignore", strategy);
00502    }
00503 
00504    Long_t maxSlaveCnt = 0;
00505    if (TProof::GetParameter(input, "PROOF_MaxSlavesPerNode", maxSlaveCnt) == 0) {
00506       if (maxSlaveCnt < 1) {
00507          Info("TPacketizerAdaptive",
00508               "The value of PROOF_MaxSlavesPerNode must be grater than 0");
00509          maxSlaveCnt = 0;
00510       }
00511    } else {
00512       // Try also with Int_t (recently supported in TProof::SetParameter)
00513       Int_t mxslcnt = -1;
00514       if (TProof::GetParameter(input, "PROOF_MaxSlavesPerNode", mxslcnt) == 0) {
00515          if (mxslcnt < 1) {
00516             Info("TPacketizerAdaptive",
00517                  "The value of PROOF_MaxSlavesPerNode must be grater than 0");
00518             mxslcnt = 0;
00519          }
00520          maxSlaveCnt = (Long_t) mxslcnt;
00521       }
00522    }
00523 
00524    if (!maxSlaveCnt)
00525       maxSlaveCnt = gEnv->GetValue("Packetizer.MaxWorkersPerNode", 0);
00526    if (maxSlaveCnt > 0) {
00527       fMaxSlaveCnt = maxSlaveCnt;
00528       Info("TPacketizerAdaptive", "Setting max number of workers per node to %ld",
00529            fMaxSlaveCnt);
00530    }
00531 
00532    // if forceLocal parameter is set to 1 then eliminate the cross-worker
00533    // processing;
00534    // This minimizes the network usage on the PROOF cluser at the expense of
00535    // longer jobs processing times.
00536    // To process successfully the session must have workers with all the data!
00537    fForceLocal = kFALSE;
00538    Int_t forceLocal = 0;
00539    if (TProof::GetParameter(input, "PROOF_ForceLocal", forceLocal) == 0) {
00540       if (forceLocal == 1)
00541          fForceLocal = kTRUE;
00542       else
00543          Info("TPacketizerAdaptive",
00544             "The only accepted value of PROOF_ForceLocal parameter is 1 !");
00545    }
00546 
00547    // Below we provide a possibility to change the way packet size is
00548    // calculated or define the packet time directly.
00549    // fPacketAsAFraction can be interpreted as follows:
00550    // packet time is (expected job proc. time) / fPacketSizeAsAFraction.
00551    // It substitutes 20 in the old formula to calculate the fPacketSize:
00552    // fPacketSize = fTotalEntries / (20 * nslaves)
00553    Int_t packetAsAFraction = 0;
00554    if (TProof::GetParameter(input, "PROOF_PacketAsAFraction", packetAsAFraction) == 0) {
00555       if (packetAsAFraction > 0) {
00556          fPacketAsAFraction = packetAsAFraction;
00557          Info("TPacketizerAdaptive",
00558               "using alternate fraction of query time as a packet size: %d",
00559               packetAsAFraction);
00560       } else
00561          Info("TPacketizerAdaptive", "packetAsAFraction parameter must be higher than 0");
00562    }
00563 
00564    // Save the config parameters in the dedicated list so that they will be saved
00565    // in the outputlist and therefore in the relevant TQueryResult
00566    fConfigParams->Add(new TParameter<Int_t>("PROOF_PacketizerCachePacketSync", (Int_t)fCachePacketSync));
00567    fConfigParams->Add(new TParameter<Double_t>("PROOF_PacketizerMaxEntriesRatio", fMaxEntriesRatio));
00568    fConfigParams->Add(new TParameter<Int_t>("PROOF_PacketizerStrategy", fStrategy));
00569    fConfigParams->Add(new TParameter<Int_t>("PROOF_MaxWorkersPerNode", (Int_t)fMaxSlaveCnt));
00570    fConfigParams->Add(new TParameter<Int_t>("PROOF_ForceLocal", (Int_t)fForceLocal));
00571    fConfigParams->Add(new TParameter<Int_t>("PROOF_PacketAsAFraction", fPacketAsAFraction));
00572 
00573    Double_t baseLocalPreference = 1.2;
00574    TProof::GetParameter(input, "PROOF_BaseLocalPreference", baseLocalPreference);
00575    fBaseLocalPreference = (Float_t)baseLocalPreference;
00576 
00577    fFileNodes = new TList;
00578    fFileNodes->SetOwner();
00579    fUnAllocated = new TList;
00580    fUnAllocated->SetOwner(kFALSE);
00581    fActive = new TList;
00582    fActive->SetOwner(kFALSE);
00583 
00584    fValid = kTRUE;
00585 
00586    // Resolve end-point urls to optmize distribution
00587    // dset->Lookup(); // moved to TProofPlayerRemote::Process
00588 
00589    // Read list of mounted disks
00590    TObjArray *partitions = 0;
00591    TString partitionsStr;
00592    if (TProof::GetParameter(input, "PROOF_PacketizerPartitions", partitionsStr) != 0)
00593       partitionsStr = gEnv->GetValue("Packetizer.Partitions", "");
00594    if (!partitionsStr.IsNull()) {
00595       Info("TPacketizerAdaptive", "Partitions: %s", partitionsStr.Data());
00596       partitions = partitionsStr.Tokenize(",");
00597    }
00598 
00599    // Split into per host and disk entries
00600    dset->Reset();
00601    TDSetElement *e;
00602    while ((e = (TDSetElement*)dset->Next())) {
00603 
00604       if (e->GetValid()) continue;
00605 
00606       // The dataset name, if any
00607       if (fDataSet.IsNull() && e->GetDataSet() && strlen(e->GetDataSet()))
00608          fDataSet = e->GetDataSet();
00609 
00610       TUrl url = e->GetFileName();
00611 
00612       // Map non URL filenames to dummy host
00613       TString host;
00614       if ( !url.IsValid() ||
00615           (strncmp(url.GetProtocol(),"root", 4) &&
00616            strncmp(url.GetProtocol(),"rfio", 4)) ) {
00617          host = "no-host";
00618       } else {
00619          host = url.GetHostFQDN();
00620       }
00621       // Get full name for local hosts
00622       if (host.Contains("localhost") || host == "127.0.0.1") {
00623          url.SetHost(gSystem->HostName());
00624          host = url.GetHostFQDN();
00625       }
00626 
00627       // Find on which disk is the file, if any
00628       TString disk;
00629       if (partitions) {
00630          TIter iString(partitions);
00631          TObjString* os = 0;
00632          while ((os = (TObjString *)iString())) {
00633             // Compare begining of the url with disk mountpoint
00634             if (strncmp(url.GetFile(), os->GetName(), os->GetString().Length()) == 0) {
00635                disk = os->GetName();
00636                break;
00637             }
00638          }
00639       }
00640       // Node's url
00641       TString nodeStr;
00642       if (disk.IsNull())
00643          nodeStr.Form("%s://%s", url.GetProtocol(), host.Data());
00644       else
00645          nodeStr.Form("%s://%s/%s", url.GetProtocol(), host.Data(), disk.Data());
00646       TFileNode *node = (TFileNode *) fFileNodes->FindObject(nodeStr);
00647 
00648       if (node == 0) {
00649          node = new TFileNode(nodeStr, fStrategy, fFilesToProcess);
00650          fFileNodes->Add(node);
00651          PDB(kPacketizer,2)
00652             Info("TPacketizerAdaptive", "creating new node '%s' or the element", nodeStr.Data());
00653       } else {
00654          PDB(kPacketizer,2)
00655             Info("TPacketizerAdaptive", "adding element to existing node '%s'", nodeStr.Data());
00656       }
00657 
00658       node->Add(e, kFALSE);
00659    }
00660 
00661    fSlaveStats = new TMap;
00662    fSlaveStats->SetOwner(kFALSE);
00663 
00664    TSlave *slave;
00665    TIter si(slaves);
00666    while ((slave = (TSlave*) si.Next())) {
00667       fSlaveStats->Add( slave, new TSlaveStat(slave) );
00668       fMaxPerfIdx = slave->GetPerfIdx() > fMaxPerfIdx ?
00669          slave->GetPerfIdx() : fMaxPerfIdx;
00670    }
00671 
00672    // Setup file & filenode structure
00673    Reset();
00674    // Optimize the number of files to be open when running on subsample
00675    Int_t validateMode = 0;
00676    Int_t gprc = TProof::GetParameter(input, "PROOF_ValidateByFile", validateMode);
00677    Bool_t byfile = (gprc == 0 && validateMode > 0 && num > -1) ? kTRUE : kFALSE;
00678    ValidateFiles(dset, slaves, num, byfile);
00679 
00680 
00681    if (!fValid) return;
00682 
00683    // apply global range (first,num) to dset and rebuild structure
00684    // ommitting TDSet elements that are not needed
00685 
00686    Int_t files = 0;
00687    fTotalEntries = 0;
00688    fUnAllocated->Clear();  // avoid dangling pointers
00689    fActive->Clear();
00690    fFileNodes->Clear();    // then delete all objects
00691    PDB(kPacketizer,2)
00692       Info("TPacketizerAdaptive",
00693            "processing range: first %lld, num %lld", first, num);
00694 
00695    dset->Reset();
00696    Long64_t cur = 0;
00697    while (( e = (TDSetElement*)dset->Next())) {
00698 
00699       // Skip invalid or missing file; It will be moved
00700       // from the dset to the 'MissingFiles' list in the player.
00701       if (!e->GetValid()) continue;
00702 
00703       TUrl url = e->GetFileName();
00704       Long64_t eFirst = e->GetFirst();
00705       Long64_t eNum = e->GetNum();
00706       PDB(kPacketizer,2)
00707          Info("TPacketizerAdaptive",
00708               "processing element: first %lld, num %lld (cur %lld)", eFirst, eNum, cur);
00709 
00710       if (!e->GetEntryList()) {
00711          // this element is before the start of the global range, skip it
00712          if (cur + eNum < first) {
00713             cur += eNum;
00714             PDB(kPacketizer,2)
00715                Info("TPacketizerAdaptive",
00716                     "processing element: skip element cur %lld", cur);
00717             continue;
00718          }
00719 
00720          // this element is after the end of the global range, skip it
00721          if (num != -1 && (first+num <= cur)) {
00722             cur += eNum;
00723             PDB(kPacketizer,2)
00724                Info("TPacketizerAdaptive",
00725                     "processing element: drop element cur %lld", cur);
00726             continue; // break ??
00727          }
00728 
00729          // If this element contains the end of the global range
00730          // adjust its number of entries
00731          if (num != -1 && (first+num <= cur+eNum)) {
00732             e->SetNum( first + num - cur );
00733             PDB(kPacketizer,2)
00734                Info("TPacketizerAdaptive",
00735                     "processing element: adjust end %lld", first + num - cur);
00736             cur += eNum;
00737             eNum = e->GetNum();
00738          }
00739 
00740          // If this element contains the start of the global range
00741          // adjust its start and number of entries
00742          if (cur < first) {
00743             e->SetFirst( eFirst + (first - cur) );
00744             e->SetNum( e->GetNum() - (first - cur) );
00745             PDB(kPacketizer,2)
00746                Info("TPacketizerAdaptive",
00747                     "processing element: adjust start %lld and end %lld",
00748                     eFirst + (first - cur), first + num - cur);
00749             cur += eNum;
00750             eNum = e->GetNum();
00751          }
00752       } else {
00753          TEntryList *enl = dynamic_cast<TEntryList *>(e->GetEntryList());
00754          if (enl) {
00755             eNum = enl->GetN();
00756          } else {
00757             TEventList *evl = dynamic_cast<TEventList *>(e->GetEntryList());
00758             eNum = evl ? evl->GetN() : eNum;
00759          }
00760          if (!eNum)
00761             continue;
00762       }
00763       PDB(kPacketizer,2)
00764          Info("TPacketizerAdaptive",
00765               "processing element: next cur %lld", cur);
00766 
00767       // Map non URL filenames to dummy host
00768       TString host;
00769       if ( !url.IsValid() ||
00770           (strncmp(url.GetProtocol(),"root", 4) &&
00771            strncmp(url.GetProtocol(),"rfio", 4)) ) {
00772          host = "no-host";
00773       } else {
00774          host = url.GetHostFQDN();
00775       }
00776       // Get full name for local hosts
00777       if (host.Contains("localhost") || host == "127.0.0.1") {
00778          url.SetHost(gSystem->HostName());
00779          host = url.GetHostFQDN();
00780       }
00781 
00782       // Find, on which disk is the file
00783       TString disk;
00784       if (partitions) {
00785          TIter iString(partitions);
00786          TObjString* os = 0;
00787          while ((os = (TObjString *)iString())) {
00788             // Compare begining of the url with disk mountpoint
00789             if (strncmp(url.GetFile(), os->GetName(), os->GetString().Length()) == 0) {
00790                disk = os->GetName();
00791                break;
00792             }
00793          }
00794       }
00795       // Node's url
00796       TString nodeStr;
00797       if (disk.IsNull())
00798          nodeStr.Form("%s://%s", url.GetProtocol(), host.Data());
00799       else
00800          nodeStr.Form("%s://%s/%s", url.GetProtocol(), host.Data(), disk.Data());
00801       TFileNode *node = (TFileNode*) fFileNodes->FindObject(nodeStr);
00802 
00803 
00804       if (node == 0) {
00805          node = new TFileNode(nodeStr, fStrategy, fFilesToProcess);
00806          fFileNodes->Add( node );
00807          PDB(kPacketizer, 2)
00808             Info("TPacketizerAdaptive", "creating new node '%s' for element", nodeStr.Data());
00809       } else {
00810          PDB(kPacketizer, 2)
00811             Info("TPacketizerAdaptive", "adding element to exiting node '%s'", nodeStr.Data());
00812       }
00813 
00814       ++files;
00815       fTotalEntries += eNum;
00816       node->Add(e, kTRUE);
00817       node->IncEvents(eNum);
00818       PDB(kPacketizer,2) e->Print("a");
00819    }
00820    PDB(kPacketizer,1)
00821       Info("TPacketizerAdaptive", "processing %lld entries in %d files on %d hosts",
00822                                   fTotalEntries, files, fFileNodes->GetSize());
00823 
00824    // Set the total number for monitoring
00825    if (gPerfStats)
00826       gPerfStats->SetNumEvents(fTotalEntries);
00827 
00828    Reset();
00829 
00830    InitStats();
00831 
00832    if (!fValid)
00833       SafeDelete(fProgress);
00834 
00835    PDB(kPacketizer,1) Info("TPacketizerAdaptive", "return");
00836 }
00837 
00838 //______________________________________________________________________________
00839 TPacketizerAdaptive::~TPacketizerAdaptive()
00840 {
00841    // Destructor.
00842 
00843    if (fSlaveStats) {
00844       fSlaveStats->DeleteValues();
00845    }
00846 
00847    SafeDelete(fSlaveStats);
00848    SafeDelete(fUnAllocated);
00849    SafeDelete(fActive);
00850    SafeDelete(fFileNodes);
00851    SafeDelete(fFilesToProcess);
00852 }
00853 
00854 //______________________________________________________________________________
00855 void TPacketizerAdaptive::InitStats()
00856 {
00857    // (re)initialise the statistics
00858    // called at the begining or after a worker dies.
00859 
00860    // calculating how many files from TDSet are not cached on
00861    // any slave
00862    Int_t noRemoteFiles = 0;
00863    fNEventsOnRemLoc = 0;
00864    Int_t totalNumberOfFiles = 0;
00865    TIter next(fFileNodes);
00866    while (TFileNode *fn = (TFileNode*)next()) {
00867       totalNumberOfFiles += fn->GetNumberOfFiles();
00868       if (fn->GetMySlaveCnt() == 0) {
00869          noRemoteFiles += fn->GetNumberOfFiles();
00870          fNEventsOnRemLoc += (fn->GetNEvents() - fn->GetProcessed());
00871       }
00872    }
00873 
00874    if (totalNumberOfFiles == 0) {
00875       Info("InitStats", "no valid or non-empty file found: setting invalid");
00876       // No valid files: set invalid and return
00877       fValid = kFALSE;
00878       return;
00879    }
00880 
00881    fFractionOfRemoteFiles = (1.0 * noRemoteFiles) / totalNumberOfFiles;
00882    Info("InitStats",
00883         "fraction of remote files %f", fFractionOfRemoteFiles);
00884 
00885    if (!fValid)
00886       SafeDelete(fProgress);
00887 
00888    PDB(kPacketizer,1) Info("InitStats", "return");
00889 }
00890 
00891 //______________________________________________________________________________
00892 TPacketizerAdaptive::TFileStat *TPacketizerAdaptive::GetNextUnAlloc(TFileNode *node, const char *nodeHostName)
00893 {
00894    // Get next unallocated file from 'node' or other nodes:
00895    // First try 'node'. If there is no more files, keep trying to
00896    // find an unallocated file on other nodes.
00897 
00898    TFileStat *file = 0;
00899 
00900    if (node != 0) {
00901       PDB(kPacketizer, 2)
00902          Info("GetNextUnAlloc", "looking for file on node %s", node->GetName());
00903       file = node->GetNextUnAlloc();
00904       if (file == 0) RemoveUnAllocNode(node);
00905    } else {
00906       if (nodeHostName && strlen(nodeHostName) > 0) {
00907 
00908          TFileNode *fn;
00909          // Make sure that they are in the corrected order
00910          fUnAllocated->Sort();
00911          PDB(kPacketizer,2) fUnAllocated->Print();
00912 
00913          // Loop over unallocated fileNode list
00914          for (int i = 0; i < fUnAllocated->GetSize(); i++) {
00915 
00916             if ((fn = (TFileNode *) fUnAllocated->At(i))) {
00917                TUrl uu(fn->GetName());
00918                PDB(kPacketizer, 2)
00919                   Info("GetNextUnAlloc", "comparing %s with %s...", nodeHostName, uu.GetHost());
00920 
00921                // Check, whether node's hostname is matching with current fileNode (fn)
00922                if (!strcmp(nodeHostName, uu.GetHost())) {
00923                   node = fn;
00924 
00925                   // Fetch next unallocated file from this node
00926                   if ((file = node->GetNextUnAlloc()) == 0) {
00927                      RemoveUnAllocNode(node);
00928                      node = 0;
00929                   } else {
00930                      PDB(kPacketizer, 2)
00931                         Info("GetNextUnAlloc", "found! (host: %s)", uu.GetHost());
00932                      break;
00933                   }
00934                }
00935             } else {
00936                Warning("GetNextUnAlloc", "unallocate entry %d is empty!", i);
00937             }
00938          }
00939 
00940          if (node != 0 && fMaxSlaveCnt > 0 && node->GetExtSlaveCnt() >= fMaxSlaveCnt) {
00941             // Unlike in TPacketizer we look at the number of ext slaves only.
00942             PDB(kPacketizer,1)
00943                Info("GetNextUnAlloc", "reached Workers-per-Node Limit (%ld)", fMaxSlaveCnt);
00944             node = 0;
00945          }
00946       }
00947 
00948       if (node == 0) {
00949          while (file == 0 && ((node = NextNode()) != 0)) {
00950             PDB(kPacketizer, 2)
00951                Info("GetNextUnAlloc", "looking for file on node %s", node->GetName());
00952             if ((file = node->GetNextUnAlloc()) == 0) RemoveUnAllocNode(node);
00953          }
00954       }
00955    }
00956 
00957    if (file != 0) {
00958       // if needed make node active
00959       if (fActive->FindObject(node) == 0) {
00960          fActive->Add(node);
00961       }
00962    }
00963 
00964    return file;
00965 }
00966 
00967 //______________________________________________________________________________
00968 TPacketizerAdaptive::TFileNode *TPacketizerAdaptive::NextNode()
00969 {
00970    // Get next node which has unallocated files.
00971    // the order is determined by TFileNode::Compare
00972 
00973    fUnAllocated->Sort();
00974    PDB(kPacketizer,2) {
00975       fUnAllocated->Print();
00976    }
00977 
00978    TFileNode *fn = (TFileNode*) fUnAllocated->First();
00979    if (fn != 0 && fMaxSlaveCnt > 0 && fn->GetExtSlaveCnt() >= fMaxSlaveCnt) {
00980       // unlike in TPacketizer we look at the number of ext slaves only.
00981       PDB(kPacketizer,1)
00982          Info("NextNode", "reached Workers-per-Node Limit (%ld)", fMaxSlaveCnt);
00983       fn = 0;
00984    }
00985 
00986    return fn;
00987 }
00988 
00989 //______________________________________________________________________________
00990 void TPacketizerAdaptive::RemoveUnAllocNode(TFileNode * node)
00991 {
00992    // Remove unallocated node.
00993 
00994    fUnAllocated->Remove(node);
00995 }
00996 
00997 //______________________________________________________________________________
00998 TPacketizerAdaptive::TFileStat *TPacketizerAdaptive::GetNextActive()
00999 {
01000    // Get next active file.
01001 
01002    TFileNode *node;
01003    TFileStat *file = 0;
01004 
01005    while (file == 0 && ((node = NextActiveNode()) != 0)) {
01006          file = node->GetNextActive();
01007          if (file == 0) RemoveActiveNode(node);
01008    }
01009 
01010    return file;
01011 }
01012 
01013 
01014 //______________________________________________________________________________
01015 TPacketizerAdaptive::TFileNode *TPacketizerAdaptive::NextActiveNode()
01016 {
01017    // Get next active node.
01018 
01019    fActive->Sort();
01020    PDB(kPacketizer,2) {
01021       Info("NextActiveNode", "enter");
01022       fActive->Print();
01023    }
01024 
01025    TFileNode *fn = (TFileNode*) fActive->First();
01026    // look at only ext slaves
01027    if (fn != 0 && fMaxSlaveCnt > 0 && fn->GetExtSlaveCnt() >= fMaxSlaveCnt) {
01028       PDB(kPacketizer,1)
01029          Info("NextActiveNode","reached Workers-per-Node limit (%ld)", fMaxSlaveCnt);
01030       fn = 0;
01031    }
01032 
01033    return fn;
01034 }
01035 
01036 //______________________________________________________________________________
01037 void TPacketizerAdaptive::RemoveActive(TFileStat *file)
01038 {
01039    // Remove file from the list of actives.
01040 
01041    TFileNode *node = file->GetNode();
01042 
01043    node->RemoveActive(file);
01044    if (node->GetNumberOfActiveFiles() == 0) RemoveActiveNode(node);
01045 }
01046 
01047 //______________________________________________________________________________
01048 void TPacketizerAdaptive::RemoveActiveNode(TFileNode *node)
01049 {
01050    // Remove node from the list of actives.
01051 
01052    fActive->Remove(node);
01053 }
01054 
01055 //______________________________________________________________________________
01056 void TPacketizerAdaptive::Reset()
01057 {
01058    // Reset the internal data structure for packet distribution.
01059 
01060    fUnAllocated->Clear();
01061    fUnAllocated->AddAll(fFileNodes);
01062 
01063    fActive->Clear();
01064 
01065    TIter files(fFileNodes);
01066    TFileNode *fn;
01067    while ((fn = (TFileNode*) files.Next()) != 0) {
01068       fn->Reset();
01069    }
01070 
01071    TIter slaves(fSlaveStats);
01072    TObject *key;
01073    while ((key = slaves.Next()) != 0) {
01074       TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue(key);
01075       // Find out which file nodes are on the worker machine and assign the
01076       // one with less workers assigned
01077       TFileNode *fnmin = 0;
01078       Int_t fncnt = fSlaveStats->GetSize();
01079       files.Reset();
01080       while ((fn = (TFileNode*) files.Next()) != 0) {
01081          if (!strcmp(slstat->GetName(), TUrl(fn->GetName()).GetHost())) {
01082             if (fn->GetMySlaveCnt() < fncnt) {
01083                fnmin = fn;
01084                fncnt = fn->GetMySlaveCnt();
01085             }
01086          }
01087       }
01088       if (fnmin != 0 ) {
01089          slstat->SetFileNode(fnmin);
01090          fnmin->IncMySlaveCnt();
01091          PDB(kPacketizer, 2)
01092             Info("Reset","assigning node '%s' to '%s' (cnt: %d)",
01093                          fnmin->GetName(), slstat->GetName(), fnmin->GetMySlaveCnt());
01094       }
01095       slstat->fCurFile = 0;
01096    }
01097 }
01098 
01099 //______________________________________________________________________________
01100 void TPacketizerAdaptive::ValidateFiles(TDSet *dset, TList *slaves,
01101                                         Long64_t maxent, Bool_t byfile)
01102 {
01103    // Check existence of file/dir/tree an get number of entries.
01104    // Assumes the files have been setup.
01105 
01106    TMap     slaves_by_sock;
01107    TMonitor mon;
01108    TList    workers;
01109 
01110 
01111    // Setup the communication infrastructure
01112 
01113    workers.AddAll(slaves);
01114    TIter    si(slaves);
01115    TSlave   *slm;
01116    while ((slm = (TSlave*)si.Next()) != 0) {
01117       PDB(kPacketizer,3)
01118       Info("ValidateFiles","socket added to monitor: %p (%s)",
01119           slm->GetSocket(), slm->GetName());
01120       mon.Add(slm->GetSocket());
01121       slaves_by_sock.Add(slm->GetSocket(), slm);
01122    }
01123 
01124    mon.DeActivateAll();
01125 
01126    ((TProof*)gProof)->DeActivateAsyncInput();
01127 
01128    // Some monitoring systems (TXSocketHandler) need to know this
01129    ((TProof*)gProof)->fCurrentMonitor = &mon;
01130 
01131    // Identify the type
01132    if (!strcmp(dset->GetType(), "TTree")) SetBit(TVirtualPacketizer::kIsTree);
01133 
01134    // Preparing for client notification
01135    TString msg("Validating files");
01136    UInt_t n = 0;
01137    UInt_t tot = dset->GetListOfElements()->GetSize();
01138    Bool_t st = kTRUE;
01139 
01140    Long64_t totent = 0, nopenf = 0;
01141    while (kTRUE) {
01142 
01143       // send work
01144       while (TSlave *s = (TSlave *)workers.First()) {
01145 
01146          workers.Remove(s);
01147 
01148          // find a file
01149 
01150          TSlaveStat *slstat = (TSlaveStat*)fSlaveStats->GetValue(s);
01151          TFileNode *node = 0;
01152          TFileStat *file = 0;
01153 
01154          // try its own node first
01155          if ((node = slstat->GetFileNode()) != 0) {
01156             file = GetNextUnAlloc(node);
01157             if (file == 0)
01158                slstat->SetFileNode(0);
01159          }
01160 
01161          // look for a file on any other node if necessary
01162          if (file == 0)
01163             file = GetNextUnAlloc();
01164 
01165          if (file != 0) {
01166             // files are done right away
01167             RemoveActive(file);
01168 
01169             slstat->fCurFile = file;
01170             TDSetElement *elem = file->GetElement();
01171             Long64_t entries = elem->GetEntries(kTRUE, kFALSE);
01172             if (entries < 0 || strlen(elem->GetTitle()) <= 0) {
01173                // This is decremented when we get the reply
01174                file->GetNode()->IncExtSlaveCnt(slstat->GetName());
01175                TMessage m(kPROOF_GETENTRIES);
01176                m << dset->IsTree()
01177                << TString(elem->GetFileName())
01178                << TString(elem->GetDirectory())
01179                << TString(elem->GetObjName());
01180 
01181                s->GetSocket()->Send( m );
01182                mon.Activate(s->GetSocket());
01183                PDB(kPacketizer,2)
01184                   Info("ValidateFiles",
01185                        "sent to worker-%s (%s) via %p GETENTRIES on %s %s %s %s",
01186                        s->GetOrdinal(), s->GetName(), s->GetSocket(),
01187                        dset->IsTree() ? "tree" : "objects", elem->GetFileName(),
01188                        elem->GetDirectory(), elem->GetObjName());
01189             } else {
01190                // Fill the info
01191                elem->SetTDSetOffset(entries);
01192                if (entries > 0) {
01193                   // Most likely valid
01194                   elem->SetValid();
01195                   if (!elem->GetEntryList()) {
01196                      if (elem->GetFirst() > entries) {
01197                         Error("ValidateFiles",
01198                               "first (%lld) higher then number of entries (%lld) in %s",
01199                                elem->GetFirst(), entries, elem->GetFileName());
01200                         // disable element
01201                         slstat->fCurFile->SetDone();
01202                         elem->Invalidate();
01203                         dset->SetBit(TDSet::kSomeInvalid);
01204                      }
01205                      if (elem->GetNum() == -1) {
01206                         elem->SetNum(entries - elem->GetFirst());
01207                      } else if (elem->GetFirst() + elem->GetNum() > entries) {
01208                         Warning("ValidateFiles", "num (%lld) + first (%lld) larger then number of"
01209                                  " keys/entries (%lld) in %s", elem->GetNum(), elem->GetFirst(),
01210                                  entries, elem->GetFileName());
01211                         elem->SetNum(entries - elem->GetFirst());
01212                      }
01213                      PDB(kPacketizer,2)
01214                         Info("ValidateFiles",
01215                              "found elem '%s' with %lld entries", elem->GetFileName(), entries);
01216                   }
01217                }
01218                // Count
01219                totent += entries;
01220                nopenf++;
01221                // Notify the client
01222                n++;
01223                gProof->SendDataSetStatus(msg, n, tot, st);
01224 
01225                // This worker is ready for the next validation
01226                workers.Add(s);
01227             }
01228          }
01229       }
01230 
01231       // Check if there is anything to wait for
01232       if (mon.GetActive() == 0) {
01233          if (byfile && maxent > 0) {
01234             // How many files do we still need ?
01235             Long64_t nrestf = (maxent - totent) * nopenf / totent ;
01236             if (nrestf <= 0 && maxent > totent) nrestf = 1;
01237             if (nrestf > 0) {
01238                PDB(kPacketizer,3)
01239                   Info("ValidateFiles", "{%lld, %lld, %lld}: needs to validate %lld more files",
01240                                          maxent, totent, nopenf, nrestf);
01241                si.Reset();
01242                while ((slm = (TSlave *) si.Next()) && nrestf--) {
01243                   workers.Add(slm);
01244                }
01245                continue;
01246             } else {
01247                PDB(kPacketizer,3)
01248                   Info("ValidateFiles", "no need to validate more files");
01249                break;
01250             }
01251          } else {
01252             break;
01253          }
01254       }
01255 
01256       PDB(kPacketizer,3) {
01257          Info("ValidateFiles", "waiting for %d slaves:", mon.GetActive());
01258          TList *act = mon.GetListOfActives();
01259          TIter next(act);
01260          while (TSocket *s = (TSocket*) next()) {
01261             TSlave *sl = (TSlave *) slaves_by_sock.GetValue(s);
01262             if (sl)
01263                Info("ValidateFiles", "   worker-%s (%s)",
01264                     sl->GetOrdinal(), sl->GetName());
01265          }
01266          delete act;
01267       }
01268 
01269       TSocket *sock = mon.Select();
01270       // If we have been interrupted break
01271       if (!sock) {
01272          Error("ValidateFiles", "selection has been interrupted - STOP");
01273          mon.DeActivateAll();
01274          fValid = kFALSE;
01275          break;
01276       }
01277       mon.DeActivate(sock);
01278 
01279       PDB(kPacketizer,3) Info("ValidateFiles", "select returned: %p", sock);
01280 
01281       TSlave *slave = (TSlave *) slaves_by_sock.GetValue( sock );
01282       if (!sock->IsValid()) {
01283          // A socket got invalid during validation
01284          Error("ValidateFiles", "worker-%s (%s) got invalid - STOP",
01285                slave->GetOrdinal(), slave->GetName());
01286          ((TProof*)gProof)->MarkBad(slave, "socket got invalid during validation");
01287          fValid = kFALSE;
01288          break;
01289       }
01290 
01291       TMessage *reply;
01292 
01293       if (sock->Recv(reply) <= 0) {
01294          // Notify
01295          Error("ValidateFiles", "Recv failed! for worker-%s (%s)",
01296                                 slave->GetOrdinal(), slave->GetName());
01297          // Help! lost a slave? ('slave' is deleted inside here ...)
01298          ((TProof*)gProof)->MarkBad(slave, "receive failed during validation");
01299          fValid = kFALSE;
01300          continue;
01301       }
01302 
01303       if (reply->What() != kPROOF_GETENTRIES) {
01304          // Not what we want: handover processing to the central machinery
01305          Int_t what = reply->What();
01306          ((TProof*)gProof)->HandleInputMessage(slave, reply);
01307          if (what == kPROOF_FATAL) {
01308              Error("ValidateFiles", "kPROOF_FATAL from worker-%s (%s)",
01309                                     slave->GetOrdinal(), slave->GetName());
01310              fValid = kFALSE;
01311          } else {
01312             // Reactivate the socket
01313             mon.Activate(sock);
01314          }
01315          // Get next message
01316          continue;
01317       }
01318 
01319       TSlaveStat *slavestat = (TSlaveStat*) fSlaveStats->GetValue( slave );
01320       TDSetElement *e = slavestat->fCurFile->GetElement();
01321       slavestat->fCurFile->GetNode()->DecExtSlaveCnt(slavestat->GetName());
01322       Long64_t entries;
01323 
01324       (*reply) >> entries;
01325 
01326       // Extract object name, if there
01327       if ((reply->BufferSize() > reply->Length())) {
01328          TString objname;
01329          (*reply) >> objname;
01330          e->SetTitle(objname);
01331       }
01332 
01333       e->SetTDSetOffset(entries);
01334       if (entries > 0) {
01335 
01336          // This dataset element is most likely valid
01337          e->SetValid();
01338 
01339          if (!e->GetEntryList()) {
01340             if (e->GetFirst() > entries) {
01341                Error("ValidateFiles",
01342                      "first (%lld) higher then number of entries (%lld) in %s",
01343                       e->GetFirst(), entries, e->GetFileName());
01344 
01345                // Invalidate the element
01346                slavestat->fCurFile->SetDone();
01347                e->Invalidate();
01348                dset->SetBit(TDSet::kSomeInvalid);
01349             }
01350 
01351             if (e->GetNum() == -1) {
01352                e->SetNum(entries - e->GetFirst());
01353             } else if (e->GetFirst() + e->GetNum() > entries) {
01354                Error("ValidateFiles",
01355                      "num (%lld) + first (%lld) larger then number of keys/entries (%lld) in %s",
01356                       e->GetNum(), e->GetFirst(), entries, e->GetFileName());
01357                e->SetNum(entries - e->GetFirst());
01358             }
01359          }
01360 
01361          // Count
01362          totent += entries;
01363          nopenf++;
01364 
01365          // Notify the client
01366          n++;
01367          gProof->SendDataSetStatus(msg, n, tot, st);
01368 
01369       } else {
01370 
01371          Error("ValidateFiles", "cannot get entries for file: %s - skipping", e->GetFileName() );
01372          //
01373          // Need to fix this with a user option to allow incomplete file sets (rdm)
01374          //
01375          //fValid = kFALSE; // all element must be readable!
01376          if (gProofServ) {
01377             TMessage m(kPROOF_MESSAGE);
01378             m << TString(Form("Cannot get entries for file: %s - skipping",
01379                               e->GetFileName()));
01380             gProofServ->GetSocket()->Send(m);
01381          }
01382 
01383          // invalidate element
01384          e->Invalidate();
01385          dset->SetBit(TDSet::kSomeInvalid);
01386       }
01387       PDB(kPacketizer,3) Info("ValidateFiles", " %lld events validated", totent);
01388 
01389       // Ready for the next job, unless we have enough files
01390       if (maxent < 0 || ((totent < maxent) && !byfile))
01391          workers.Add(slave);
01392    }
01393 
01394    // report std. output from slaves??
01395 
01396    ((TProof*)gProof)->ActivateAsyncInput();
01397 
01398    // This needs to be reset
01399    ((TProof*)gProof)->fCurrentMonitor = 0;
01400 
01401    // No reason to continue if invalid
01402    if (!fValid)
01403       return;
01404 
01405    // compute the offset for each file element
01406    Long64_t offset = 0;
01407    Long64_t newOffset = 0;
01408    TIter next(dset->GetListOfElements());
01409    TDSetElement *el;
01410    while ( (el = dynamic_cast<TDSetElement*> (next())) ) {
01411       if (el->GetValid()) {
01412          newOffset = offset + el->GetTDSetOffset();
01413          el->SetTDSetOffset(offset);
01414          offset = newOffset;
01415       }
01416    }
01417 }
01418 
01419 //______________________________________________________________________________
01420 Int_t TPacketizerAdaptive::CalculatePacketSize(TObject *slStatPtr, Long64_t cachesz, Int_t learnent)
01421 {
01422    // The result depends on the fStrategy
01423 
01424    Long64_t num;
01425    if (fStrategy == 0) {
01426       // TPacketizer's heuristic for starting packet size
01427       // Constant packet size;
01428       Int_t nslaves = fSlaveStats->GetSize();
01429       if (nslaves > 0) {
01430          num = fTotalEntries / (fPacketAsAFraction * nslaves);
01431       } else {
01432          num = 1;
01433       }
01434    } else {
01435       // The dynamic heuristic for setting the packet size (default)
01436       // Calculates the packet size based on performance of this slave
01437       // and estimated time left until the end of the query.
01438       TSlaveStat* slstat = (TSlaveStat*)slStatPtr;
01439       Float_t rate = slstat->GetCurRate();
01440       if (!rate)
01441          rate = slstat->GetAvgRate();
01442       if (rate) {
01443 
01444          // Global average rate
01445          Float_t avgProcRate = (GetEntriesProcessed()/(GetCumProcTime() / fSlaveStats->GetSize()));
01446          Float_t packetTime = ((fTotalEntries - GetEntriesProcessed())/avgProcRate)/fPacketAsAFraction;
01447 
01448          // Bytes-to-Event conversion
01449          Float_t bevt = GetBytesRead() / GetEntriesProcessed();
01450 
01451          // Make sure it is not smaller then the cache, if the info is available and the size
01452          // synchronization is required. But apply the cache-packet size synchronization only if there
01453          // are enough left files to process and the files are all of similar sizes. Otherwise we risk
01454          // to not exploit optimally all potentially active workers.
01455          Bool_t cpsync = fCachePacketSync;
01456          if (fMaxEntriesRatio > 0. && cpsync) {
01457             if (fFilesToProcess && fFilesToProcess->GetSize() <= fSlaveStats->GetSize()) {
01458                Long64_t remEntries = fTotalEntries - GetEntriesProcessed();
01459                Long64_t maxEntries = -1;
01460                if (fFilesToProcess->Last()) {
01461                   TDSetElement *elem = (TDSetElement *) ((TPacketizerAdaptive::TFileStat *) fFilesToProcess->Last())->GetElement();
01462                   if (elem) maxEntries = elem->GetNum();
01463                }
01464                if (maxEntries > remEntries / fSlaveStats->GetSize() * fMaxEntriesRatio) {
01465                   PDB(kPacketizer,3) {
01466                      Info("CalculatePacketSize", "%s: switching off synchronization of packet and cache sizes:", slstat->GetOrdinal());
01467                      Info("CalculatePacketSize", "%s: few files (%d) remaining of very different sizes (max/avg = %.2f > %.2f)",
01468                                                  slstat->GetOrdinal(), fFilesToProcess->GetSize(),
01469                                                 (Double_t)maxEntries / remEntries * fSlaveStats->GetSize(), fMaxEntriesRatio);
01470                   }
01471                   cpsync = kFALSE;
01472                }
01473             }
01474          }
01475          if (cachesz > 0 && cpsync) {
01476             if ((Long64_t)(rate * packetTime * bevt) < cachesz)
01477                packetTime = cachesz / bevt / rate;
01478          }
01479 
01480          // Apply min-max again, if required
01481          if (fMaxPacketTime > 0. && packetTime > fMaxPacketTime) packetTime = fMaxPacketTime;
01482          if (fMinPacketTime > 0. && packetTime < fMinPacketTime) packetTime = fMinPacketTime;
01483 
01484          // Translate the packet length in number of entries
01485          num = (Long64_t)(rate * packetTime);
01486 
01487          // Notify
01488          PDB(kPacketizer,2)
01489             Info("CalculatePacketSize","%s: avgr: %f, rate: %f, left: %lld, pacT: %f, sz: %f (csz: %f), num: %lld",
01490                  slstat->GetOrdinal(), avgProcRate, rate, fTotalEntries - GetEntriesProcessed(),
01491                  packetTime, num*bevt/1048576., cachesz/1048576., num);
01492 
01493       } else {
01494          // First packet for this worker in this query
01495          // Twice the learning phase
01496          num = (learnent > 0) ? 5 * learnent : 1000;
01497 
01498          // Notify
01499          PDB(kPacketizer,2)
01500             Info("CalculatePacketSize","%s: num: %lld", slstat->GetOrdinal(),  num);
01501       }
01502    }
01503    if (num < 1) num = 1;
01504    return num;
01505 }
01506 
01507 //______________________________________________________________________________
01508 Int_t TPacketizerAdaptive::AddProcessed(TSlave *sl,
01509                                         TProofProgressStatus *status,
01510                                         Double_t latency,
01511                                         TList **listOfMissingFiles)
01512 {
01513    // To be used by GetNextPacket but also in reaction to kPROOF_STOPPROCESS
01514    // message (when the worker was asked to stop processing during a packet).
01515    // returns the #entries intended in the last packet - #processed entries
01516 
01517    // find slave
01518    TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue( sl );
01519    if (!slstat) {
01520       Error("AddProcessed", "TSlaveStat instance for worker %s not found!",
01521                             (sl ? sl->GetName() : "**undef**"));
01522       return -1;
01523    }
01524 
01525    // update stats & free old element
01526 
01527    if ( slstat->fCurElem != 0 ) {
01528       Long64_t expectedNumEv = slstat->fCurElem->GetNum();
01529       // Calculate the number of events processed in the last packet
01530       Long64_t numev;
01531       if (status && status->GetEntries() > 0)
01532          numev = status->GetEntries() - slstat->GetEntriesProcessed();
01533       else
01534          numev = 0;
01535 
01536       // Calculate the progress made in the last packet
01537       TProofProgressStatus *progress = 0;
01538       if (numev > 0) {
01539          // This also moves the pointer in the corrsponding TFileInfo
01540          progress = slstat->AddProcessed(status);
01541          if (progress) {
01542             (*fProgressStatus) += *progress;
01543             // update processing rate
01544             slstat->UpdateRates(status);
01545          }
01546       } else {
01547           progress = new TProofProgressStatus();
01548       }
01549       if (progress) {
01550          PDB(kPacketizer,2)
01551             Info("GetNextPacket","worker-%s (%s): %lld %7.3lf %7.3lf %7.3lf %lld",
01552                sl->GetOrdinal(), sl->GetName(), progress->GetEntries(), latency,
01553                progress->GetProcTime(), progress->GetCPUTime(), progress->GetBytesRead());
01554 
01555          if (gPerfStats)
01556             gPerfStats->PacketEvent(sl->GetOrdinal(), sl->GetName(),
01557                                     slstat->fCurElem->GetFileName(),
01558                                     progress->GetEntries(),
01559                                     latency,
01560                                     progress->GetProcTime(),
01561                                     progress->GetCPUTime(),
01562                                     progress->GetBytesRead());
01563          delete progress;
01564       }
01565       if (numev != expectedNumEv) {
01566          // The last packet was not fully processed
01567          // and will be split in two:
01568          // - The completed part was marked as done.
01569          // - Create a new packet with the part to be resubmitted.
01570          TDSetElement *newPacket = new TDSetElement(*(slstat->fCurElem));
01571          if (newPacket && numev < expectedNumEv) {
01572             Long64_t first = newPacket->GetFirst();
01573             newPacket->SetFirst(first + numev);
01574             if (listOfMissingFiles && *listOfMissingFiles)
01575                ReassignPacket(newPacket, listOfMissingFiles);
01576             else
01577                Error("AddProcessed", "No list for missing files!");
01578          } else
01579             Error("AddProcessed", "Processed too much? (%lld, %lld)", numev, expectedNumEv);
01580 
01581          // TODO: a signal handler which will send info from the worker
01582          // after a packet fails.
01583          /* Add it to the failed packets list.
01584          if (!fFailedPackets) {
01585             fFailedPackets = new TList();
01586          }
01587          fFailedPackets->Add(slstat->fCurElem);
01588          */
01589       }
01590 
01591       slstat->fCurElem = 0;
01592       return (expectedNumEv - numev);
01593    } else {
01594       // the kPROOF_STOPPRPOCESS message is send after the worker receives zero
01595       // as the reply to kPROOF_GETNEXTPACKET
01596       return -1;
01597    }
01598 }
01599 
01600 //______________________________________________________________________________
01601 TDSetElement *TPacketizerAdaptive::GetNextPacket(TSlave *sl, TMessage *r)
01602 {
01603    // Get next packet;
01604    // A meaningfull difference to TPacketizer is the fact that this
01605    // packetizer, for each worker, tries to predict whether the worker
01606    // will finish processing it's local files before the end of the query.
01607    // If yes, it allocates, to those workers, files from non-slave filenodes
01608    // or from slaves that are overloaded. The check is done every time a new
01609    // file needs to be assigned.
01610 
01611    if ( !fValid ) {
01612       return 0;
01613    }
01614 
01615    // find slave
01616 
01617    TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue( sl );
01618    if (!slstat) {
01619       Error("GetNextPacket", "TSlaveStat instance for worker %s not found!",
01620                             (sl ? sl->GetName() : "**undef**"));
01621       return 0;
01622    }
01623 
01624    // Attach to current file
01625    TFileStat *file = slstat->fCurFile;
01626 
01627    // Update stats & free old element
01628 
01629    Bool_t firstPacket = kFALSE;
01630    Long64_t cachesz = -1;
01631    Int_t learnent = -1;
01632    Long64_t totalEntries = 0;
01633    if ( slstat->fCurElem != 0 ) {
01634 
01635       Double_t latency, proctime, proccpu;
01636       TProofProgressStatus *status = 0;
01637 
01638       if (sl->GetProtocol() > 18) {
01639 
01640          (*r) >> latency;
01641          (*r) >> status;
01642 
01643          if (sl->GetProtocol() > 25) {
01644             (*r) >> cachesz >> learnent;
01645             if (r->BufferSize() > r->Length()) (*r) >> totalEntries;
01646          }
01647 
01648       } else {
01649 
01650          Long64_t bytesRead = -1;
01651 
01652          (*r) >> latency >> proctime >> proccpu;
01653          // only read new info if available
01654          if (r->BufferSize() > r->Length()) (*r) >> bytesRead;
01655          if (r->BufferSize() > r->Length()) (*r) >> totalEntries;
01656          Long64_t totev = 0;
01657          if (r->BufferSize() > r->Length()) (*r) >> totev;
01658 
01659          status = new TProofProgressStatus(totev, bytesRead, -1, proctime, proccpu);
01660       }
01661 
01662       if (totalEntries >= 0) {
01663          if (AddProcessed(sl, status, latency))
01664             Error("GetNextPacket", "the worker processed a different # of entries");
01665          if (fProgressStatus->GetEntries() >= fTotalEntries) {
01666             if (fProgressStatus->GetEntries() > fTotalEntries)
01667                Error("GetNextPacket", "Processed too many entries! (%lld, %lld)", fProgressStatus->GetEntries(), fTotalEntries);
01668             // Send last timer message and stop the timer
01669             HandleTimer(0);
01670             SafeDelete(fProgress);
01671          }
01672       } else if (file && file->GetElement()) {
01673          Info("GetNextPacket", "file '%s' could not be open: invalidating related element",
01674                                file->GetElement()->GetName()); 
01675          file->GetElement()->Invalidate();
01676          file->SetDone();
01677          // Add it to the failed packets list.
01678          if (!fFailedPackets) fFailedPackets = new TList();
01679          fFailedPackets->Add(file->GetElement());
01680       }
01681    } else {
01682       firstPacket = kTRUE;
01683    }
01684 
01685    if ( fStop ) {
01686       HandleTimer(0);
01687       return 0;
01688    }
01689 
01690    TString nodeName;
01691    if (file != 0) nodeName = file->GetNode()->GetName();
01692    TString nodeHostName(slstat->GetName());
01693 
01694    PDB(kPacketizer,3)
01695       Info("GetNextPacket", "%s: looking for a packet from node '%s'", sl->GetOrdinal(), nodeName.Data());
01696 
01697    // if current file is just finished
01698    if ( file != 0 && file->IsDone() ) {
01699       file->GetNode()->DecExtSlaveCnt(slstat->GetName());
01700       file->GetNode()->DecRunSlaveCnt();
01701       if (gPerfStats)
01702          gPerfStats->FileEvent(sl->GetOrdinal(), sl->GetName(), file->GetNode()->GetName(),
01703                                file->GetElement()->GetFileName(), kFALSE);
01704       file = 0;
01705    }
01706    // Reset the current file field
01707    slstat->fCurFile = file;
01708 
01709    Long64_t avgEventsLeftPerSlave =
01710       (fTotalEntries - fProgressStatus->GetEntries()) / fSlaveStats->GetSize();
01711    if (fTotalEntries == fProgressStatus->GetEntries())
01712       return 0;
01713    // get a file if needed
01714    if ( file == 0) {
01715       // needs a new file
01716       Bool_t openLocal;
01717       // aiming for localPreference == 1 when #local == #remote events left
01718       Float_t localPreference = fBaseLocalPreference - (fNEventsOnRemLoc /
01719                                 (0.4 *(fTotalEntries - fProgressStatus->GetEntries())));
01720       if ( slstat->GetFileNode() != 0 ) {
01721          // local file node exists and has more events to process.
01722          fUnAllocated->Sort();
01723          TFileNode* firstNonLocalNode = (TFileNode*)fUnAllocated->First();
01724          Bool_t nonLocalNodePossible;
01725          if (fForceLocal)
01726             nonLocalNodePossible = 0;
01727          else
01728             nonLocalNodePossible = firstNonLocalNode ?
01729                     (fMaxSlaveCnt < 0 || (fMaxSlaveCnt > 0 && firstNonLocalNode->GetExtSlaveCnt() < fMaxSlaveCnt))
01730                                                      : 0;
01731          openLocal = !nonLocalNodePossible;
01732          Float_t slaveRate = slstat->GetAvgRate();
01733          if ( nonLocalNodePossible && fStrategy == 1) {
01734             // openLocal is set to kFALSE
01735             if ( slstat->GetFileNode()->GetRunSlaveCnt() >
01736                  slstat->GetFileNode()->GetMySlaveCnt() - 1 )
01737                 // external slaves help slstat -> don't open nonlocal files
01738                 // -1 because, at this point slstat is not running
01739                   openLocal = kTRUE;
01740             else if ( slaveRate == 0 ) { // first file for this slave
01741                // GetLocalEventsLeft() counts the potential slave
01742                // as running on its fileNode.
01743                if ( slstat->GetLocalEventsLeft() * localPreference
01744                    > (avgEventsLeftPerSlave))
01745                   openLocal = kTRUE;
01746                else if ( (firstNonLocalNode->GetEventsLeftPerSlave())
01747                      < slstat->GetLocalEventsLeft() * localPreference )
01748                   openLocal = kTRUE;
01749                else if ( firstNonLocalNode->GetExtSlaveCnt() > 1 )
01750                   openLocal = kTRUE;
01751                else if ( firstNonLocalNode->GetRunSlaveCnt() == 0 )
01752                   openLocal = kTRUE;
01753             } else {
01754                // at this point slstat has a non zero avg rate > 0
01755                Float_t slaveTime = slstat->GetLocalEventsLeft()/slaveRate;
01756                // and thus fCumProcTime, fProcessed > 0
01757                Float_t avgTime = avgEventsLeftPerSlave
01758                                  /(fProgressStatus->GetEntries()/GetCumProcTime());
01759                if (slaveTime * localPreference > avgTime)
01760                   openLocal = kTRUE;
01761                else if ((firstNonLocalNode->GetEventsLeftPerSlave())
01762                         < slstat->GetLocalEventsLeft() * localPreference)
01763                   openLocal = kTRUE;
01764             }
01765          }
01766          if (openLocal || fStrategy == 0) {
01767             // Try its own node
01768             file = slstat->GetFileNode()->GetNextUnAlloc();
01769             if (!file)
01770                file = slstat->GetFileNode()->GetNextActive();
01771             if ( file == 0 ) {
01772                //no more files on this slave.
01773                slstat->SetFileNode(0);
01774             }
01775          }
01776       }
01777 
01778       // Try to find an unused filenode first
01779       if(file == 0 && !fForceLocal) {
01780          file = GetNextUnAlloc(0, nodeHostName);
01781       }
01782 
01783       // Then look at the active filenodes
01784       if(file == 0 && !fForceLocal) {
01785          file = GetNextActive();
01786       }
01787 
01788       if ( file == 0 ) return 0;
01789 
01790       PDB(kPacketizer,3) if (fFilesToProcess) fFilesToProcess->Print();
01791 
01792       slstat->fCurFile = file;
01793       // if remote and unallocated file
01794       if (file->GetNode()->GetMySlaveCnt() == 0 &&
01795          file->GetElement()->GetFirst() == file->GetNextEntry()) {
01796          fNEventsOnRemLoc -= file->GetElement()->GetNum();
01797          if (fNEventsOnRemLoc < 0) {
01798             Error("GetNextPacket",
01799                   "inconsistent value for fNEventsOnRemLoc (%lld): stop delivering packets!",
01800                    fNEventsOnRemLoc);
01801             return 0;
01802          }
01803       }
01804       file->GetNode()->IncExtSlaveCnt(slstat->GetName());
01805       file->GetNode()->IncRunSlaveCnt();
01806       if (gPerfStats)
01807          gPerfStats->FileEvent(sl->GetOrdinal(), sl->GetName(),
01808                                file->GetNode()->GetName(),
01809                                file->GetElement()->GetFileName(), kTRUE);
01810    }
01811 
01812    Long64_t num = CalculatePacketSize(slstat, cachesz, learnent);
01813 
01814    // get a packet
01815 
01816    TDSetElement *base = file->GetElement();
01817    Long64_t first = file->GetNextEntry();
01818    Long64_t last = base->GetFirst() + base->GetNum();
01819 
01820    // if the remaining part is smaller than the (packetsize * 1.5)
01821    // then increase the packetsize
01822 
01823    if ( first + num * 1.5 >= last ) {
01824       num = last - first;
01825       file->SetDone(); // done
01826 
01827       // delete file from active list (unalloc list is single pass, no delete needed)
01828       RemoveActive(file);
01829 
01830    }
01831 
01832    // Update NextEntry in the file object
01833    file->MoveNextEntry(num);
01834 
01835    slstat->fCurElem = CreateNewPacket(base, first, num);
01836    if (base->GetEntryList())
01837       slstat->fCurElem->SetEntryList(base->GetEntryList(), first, num);
01838 
01839    // Flag the first packet of a new run (dataset)
01840    if (firstPacket)
01841       slstat->fCurElem->SetBit(TDSetElement::kNewRun);
01842    else
01843       slstat->fCurElem->ResetBit(TDSetElement::kNewRun);
01844 
01845    PDB(kPacketizer,2)
01846       Info("GetNextPacket","%s: %s %lld %lld (%lld)", sl->GetOrdinal(), base->GetFileName(), first, first + num - 1, num);
01847 
01848    return slstat->fCurElem;
01849 }
01850 
01851 //______________________________________________________________________________
01852 Int_t TPacketizerAdaptive::GetActiveWorkers()
01853 {
01854    // Return the number of workers still processing
01855 
01856    Int_t actw = 0;
01857    TIter nxw(fSlaveStats);
01858    TObject *key;
01859    while ((key = nxw())) {
01860       TSlaveStat *wrkstat = (TSlaveStat *) fSlaveStats->GetValue(key);
01861       if (wrkstat && wrkstat->fCurFile) actw++;
01862    }
01863    // Done
01864    return actw;
01865 }
01866 
01867 //______________________________________________________________________________
01868 Float_t TPacketizerAdaptive::GetCurrentRate(Bool_t &all)
01869 {
01870    // Get Estimation of the current rate; just summing the current rates of
01871    // the active workers
01872 
01873    all = kTRUE;
01874    // Loop over the workers
01875    Float_t currate = 0.;
01876    if (fSlaveStats && fSlaveStats->GetSize() > 0) {
01877       TIter nxw(fSlaveStats);
01878       TObject *key;
01879       while ((key = nxw()) != 0) {
01880          TSlaveStat *slstat = (TSlaveStat *) fSlaveStats->GetValue(key);
01881          if (slstat && slstat->GetProgressStatus() && slstat->GetEntriesProcessed() > 0) {
01882             // Sum-up the current rates
01883             currate += slstat->GetProgressStatus()->GetCurrentRate();
01884          } else {
01885             all = kFALSE;
01886          }
01887       }
01888    }
01889    // Done
01890    return currate;
01891 }
01892 
01893 //______________________________________________________________________________
01894 Int_t TPacketizerAdaptive::GetEstEntriesProcessed(Float_t t, Long64_t &ent,
01895                                                   Long64_t &bytes, Long64_t &calls)
01896 {
01897    // Get estimation for the number of processed entries and bytes read at time t,
01898    // based on the numbers already processed and the latests worker measured speeds.
01899    // If t <= 0 the current time is used.
01900    // Only the estimation for the entries is currently implemented.
01901    // This is needed to smooth the instantaneous rate plot.
01902 
01903    // Default value
01904    ent = GetEntriesProcessed();
01905    bytes = GetBytesRead();
01906    calls = GetReadCalls();
01907 
01908    // Parse option
01909    if (fUseEstOpt == kEstOff)
01910       // Do not use estimation
01911       return 0;
01912    Bool_t current = (fUseEstOpt == kEstCurrent) ? kTRUE : kFALSE;
01913 
01914    TTime tnow = gSystem->Now();
01915    Double_t now = (t > 0) ? (Double_t)t : Long64_t(tnow) / (Double_t)1000.;
01916    Double_t dt = -1;
01917 
01918    // Loop over the workers
01919    Bool_t all = kTRUE;
01920    Float_t trate = 0.;
01921    if (fSlaveStats && fSlaveStats->GetSize() > 0) {
01922       ent = 0;
01923       TIter nxw(fSlaveStats);
01924       TObject *key;
01925       while ((key = nxw()) != 0) {
01926          TSlaveStat *slstat = (TSlaveStat *) fSlaveStats->GetValue(key);
01927          if (slstat) {
01928             // Those surely processed
01929             Long64_t e = slstat->GetEntriesProcessed();
01930             if (e <= 0) all = kFALSE;
01931             // Time elapsed since last update
01932             dt = now - slstat->GetProgressStatus()->GetLastUpdate();
01933             // Add estimated entries processed since last update
01934             Float_t rate = (current && slstat->GetCurRate() > 0) ? slstat->GetCurRate()
01935                                                                  : slstat->GetAvgRate();
01936             trate += rate;
01937             // Add estimated entries processed since last update
01938             e += (Long64_t) (dt * rate);
01939             // Add to the total
01940             ent += e;
01941             // Notify
01942             PDB(kPacketizer,3)
01943                Info("GetEstEntriesProcessed","%s: e:%lld rate:%f dt:%f e:%lld",
01944                                           slstat->fSlave->GetOrdinal(),
01945                                           slstat->GetEntriesProcessed(), rate, dt, e);
01946          }
01947       }
01948    }
01949    // Notify
01950    dt = now - fProgressStatus->GetLastUpdate();
01951    PDB(kPacketizer,2)
01952       Info("GetEstEntriesProcessed",
01953            "dt: %f, estimated entries: %lld (%lld), bytes read: %lld rate: %f (all: %d)",
01954                                dt, ent, GetEntriesProcessed(), bytes, trate, all);
01955 
01956    // Check values
01957    ent = (ent > 0) ? ent : fProgressStatus->GetEntries();
01958    ent = (ent <= fTotalEntries) ? ent : fTotalEntries;
01959    bytes = (bytes > 0) ? bytes : fProgressStatus->GetBytesRead();
01960 
01961    // Done
01962    return ((all) ? 0 : 1);
01963 }
01964 
01965 //______________________________________________________________________________
01966 void TPacketizerAdaptive::MarkBad(TSlave *s, TProofProgressStatus *status,
01967                                   TList **listOfMissingFiles)
01968 {
01969    // This method can be called at any time during processing
01970    // as an effect of handling kPROOF_STOPPROCESS
01971    // If the output list from this worker is going to be sent back to the master,
01972    // the 'status' includes the number of entries processed by the slave.
01973    // From this we calculate the remaining part of the packet.
01974    // 0 indicates that the results from that worker were lost completely.
01975    // Assume that the filenodes for which we have a TFileNode object
01976    // are still up and running.
01977 
01978    TSlaveStat *slaveStat = (TSlaveStat *)(fSlaveStats->GetValue(s));
01979    if (!slaveStat) {
01980       Error("MarkBad", "Worker does not exist");
01981       return;
01982    }
01983    // Update worker counters
01984    if (slaveStat->fCurFile && slaveStat->fCurFile->GetNode()) {
01985       slaveStat->fCurFile->GetNode()->DecExtSlaveCnt(slaveStat->GetName());
01986       slaveStat->fCurFile->GetNode()->DecRunSlaveCnt();
01987    }
01988 
01989    // If status is defined, the remaining part of the last packet is
01990    // reassigned in AddProcessed called from handling kPROOF_STOPPROCESS
01991    if (!status) {
01992       // Get the subset processed by the bad worker.
01993       TList *subSet = slaveStat->GetProcessedSubSet();
01994       if (subSet) {
01995          // Take care of the current packet
01996          if (slaveStat->fCurElem) {
01997             subSet->Add(slaveStat->fCurElem);
01998          }
01999          // Merge overlapping or subsequent elements
02000          Int_t nmg = 0, ntries = 100;
02001          TDSetElement *e = 0, *enxt = 0;
02002          do {
02003             nmg = 0;
02004             e = (TDSetElement *) subSet->First();
02005             while ((enxt = (TDSetElement *) subSet->After(e))) {
02006                if (e->MergeElement(enxt) >= 0) {
02007                   nmg++;
02008                   subSet->Remove(enxt);
02009                   delete enxt;
02010                } else {
02011                   e = enxt;
02012                }
02013             }
02014          } while (nmg > 0 && --ntries > 0);
02015          // reassign the packets assigned to the bad slave and save the size;
02016          SplitPerHost(subSet, listOfMissingFiles);
02017          // the elements were reassigned so should not be deleted
02018          subSet->SetOwner(0);
02019       } else {
02020          Warning("MarkBad", "subset processed by bad worker not found!");
02021       }
02022       (*fProgressStatus) -= *(slaveStat->GetProgressStatus());
02023    }
02024    // remove slavestat from the map
02025    fSlaveStats->Remove(s);
02026    delete slaveStat;
02027    // recalculate fNEventsOnRemLoc and others
02028    InitStats();
02029 }
02030 
02031 //______________________________________________________________________________
02032 Int_t TPacketizerAdaptive::ReassignPacket(TDSetElement *e,
02033                                           TList **listOfMissingFiles)
02034 {
02035    // The file in the listOfMissingFiles can appear several times;
02036    // in order to fix that, a TDSetElement::Merge method is needed.
02037 
02038    if (!e) {
02039       Error("ReassignPacket", "Empty packet!");
02040       return -1;
02041    }
02042    // check the old filenode
02043    TUrl url = e->GetFileName();
02044    // Check the host from which 'e' was previously read.
02045    // Map non URL filenames to dummy host
02046    TString host;
02047    if ( !url.IsValid() ||
02048        (strncmp(url.GetProtocol(),"root", 4) &&
02049         strncmp(url.GetProtocol(),"rfio", 4))) {
02050       host = "no-host";
02051    } else {
02052       host = url.GetHost();
02053    }
02054 
02055    // if accessible add it back to the old node
02056    // and do DecProcessed
02057    TFileNode *node = (TFileNode*) fFileNodes->FindObject( host );
02058    if (node) {
02059       // the packet 'e' was processing data from this node.
02060       node->DecreaseProcessed(e->GetNum());
02061       node->Add(e, kFALSE); // The file should be already in fFilesToProcess ...
02062       if (!fUnAllocated->FindObject(node))
02063          fUnAllocated->Add(node);
02064       return 0;
02065    } else {
02066       // add to the list of missing files
02067       TFileInfo *fi = e->GetFileInfo();
02068       if (listOfMissingFiles)
02069          (*listOfMissingFiles)->Add((TObject *)fi);
02070       return -1;
02071    }
02072 }
02073 
02074 //______________________________________________________________________________
02075 void TPacketizerAdaptive::SplitPerHost(TList *elements,
02076                                        TList **listOfMissingFiles)
02077 {
02078    // Split into per host entries
02079    // The files in the listOfMissingFiles can appear several times;
02080    // in order to fix that, a TDSetElement::Merge method is needed.
02081 
02082    if (!elements) {
02083       Error("SplitPerHost", "Empty list of packets!");
02084       return;
02085    }
02086    if (elements->GetSize() <= 0) {
02087       Error("SplitPerHost", "The input list contains no elements");
02088       return;
02089    }
02090    TIter subSetIter(elements);
02091    TDSetElement *e;
02092    while ((e = (TDSetElement*) subSetIter.Next())) {
02093       if (ReassignPacket(e, listOfMissingFiles) == -1) {
02094          // remove from the list in order to delete it.
02095          if (elements->Remove(e))
02096             Error("SplitPerHost", "Error removing a missing file");
02097          delete e;
02098       }
02099 
02100    }
02101 }

Generated on Tue Jul 5 14:52:18 2011 for ROOT_528-00b_version by  doxygen 1.5.1