TPacketizer.cxx

Go to the documentation of this file.
00001 // @(#)root/proofplayer:$Id$
00002 // Author: Maarten Ballintijn    18/03/02
00003 
00004 /*************************************************************************
00005  * Copyright (C) 1995-2002, Rene Brun and Fons Rademakers.               *
00006  * All rights reserved.                                                  *
00007  *                                                                       *
00008  * For the licensing terms see $ROOTSYS/LICENSE.                         *
00009  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
00010  *************************************************************************/
00011 
00012 //////////////////////////////////////////////////////////////////////////
00013 //                                                                      //
00014 // TPacketizer                                                          //
00015 //                                                                      //
00016 // This class generates packets to be processed on PROOF worker servers.//
00017 // A packet is an event range (begin entry and number of entries) or    //
00018 // object range (first object and number of objects) in a TTree         //
00019 // (entries) or a directory (objects) in a file.                        //
00020 // Packets are generated taking into account the performance of the     //
00021 // remote machine, the time it took to process a previous packet on     //
00022 // the remote machine, the locality of the database files, etc.         //
00023 //                                                                      //
00024 //////////////////////////////////////////////////////////////////////////
00025 
00026 
00027 #include "TPacketizer.h"
00028 
00029 #include "Riostream.h"
00030 #include "TDSet.h"
00031 #include "TEnv.h"
00032 #include "TError.h"
00033 #include "TEventList.h"
00034 #include "TEntryList.h"
00035 #include "TMap.h"
00036 #include "TMessage.h"
00037 #include "TMonitor.h"
00038 #include "TNtupleD.h"
00039 #include "TObject.h"
00040 #include "TParameter.h"
00041 #include "TPerfStats.h"
00042 #include "TProofDebug.h"
00043 #include "TProof.h"
00044 #include "TProofPlayer.h"
00045 #include "TProofServ.h"
00046 #include "TSlave.h"
00047 #include "TSocket.h"
00048 #include "TTimer.h"
00049 #include "TUrl.h"
00050 #include "TClass.h"
00051 #include "TMath.h"
00052 #include "TObjString.h"
00053 
00054 //
00055 // The following three utility classes manage the state of the
00056 // work to be performed and the slaves involved in the process.
00057 // A list of TFileNode(s) describes the hosts with files, each
00058 // has a list of TFileStat(s) keeping the state for each TDSet
00059 // element (file).
00060 //
// The list of TSlaveStat(s) keeps track of the work (being) done
// by each slave.
00063 //
00064 
00065 
00066 //------------------------------------------------------------------------------
00067 
00068 class TPacketizer::TFileStat : public TObject {
00069 
00070 private:
00071    Bool_t         fIsDone;       // is this element processed
00072    TFileNode     *fNode;         // my FileNode
00073    TDSetElement  *fElement;      // location of the file and its range
00074    Long64_t       fNextEntry;    // cursor in the range, -1 when done
00075 
00076 public:
00077    TFileStat(TFileNode *node, TDSetElement *elem);
00078 
00079    Bool_t         IsDone() const {return fIsDone;}
00080    void           SetDone() {fIsDone = kTRUE;}
00081    TFileNode     *GetNode() const {return fNode;}
00082    TDSetElement  *GetElement() const {return fElement;}
00083    Long64_t       GetNextEntry() const {return fNextEntry;}
00084    void           MoveNextEntry(Long64_t step) {fNextEntry += step;}
00085 };
00086 
00087 
00088 TPacketizer::TFileStat::TFileStat(TFileNode *node, TDSetElement *elem)
00089    : fIsDone(kFALSE), fNode(node), fElement(elem), fNextEntry(elem->GetFirst())
00090 {
00091 }
00092 
00093 
00094 //------------------------------------------------------------------------------
00095 
00096 class TPacketizer::TFileNode : public TObject {
00097 
00098 private:
00099    TString        fNodeName;        // FQDN of the node
00100    TList         *fFiles;           // TDSetElements (files) stored on this node
00101    TObject       *fUnAllocFileNext; // cursor in fFiles
00102    TList         *fActFiles;        // files with work remaining
00103    TObject       *fActFileNext;     // cursor in fActFiles
00104    Int_t          fMySlaveCnt;      // number of slaves running on this node
00105    Int_t          fSlaveCnt;        // number of external slaves processing files on this node
00106 
00107 public:
00108    TFileNode(const char *name);
00109    ~TFileNode() { delete fFiles; delete fActFiles; }
00110 
00111    void        IncMySlaveCnt() { fMySlaveCnt++; }
00112    void        IncSlaveCnt(const char *slave) { if (fNodeName != slave) fSlaveCnt++; }
00113    void        DecSlaveCnt(const char *slave) { if (fNodeName != slave) fSlaveCnt--; R__ASSERT(fSlaveCnt >= 0); }
00114    Int_t       GetSlaveCnt() const {return fMySlaveCnt + fSlaveCnt;}
00115    Int_t       GetNumberOfActiveFiles() const { return fActFiles->GetSize(); }
00116    Bool_t      IsSortable() const { return kTRUE; }
00117 
00118    const char *GetName() const { return fNodeName.Data(); }
00119 
00120    void Add(TDSetElement *elem)
00121    {
00122       TFileStat *f = new TFileStat(this,elem);
00123       fFiles->Add(f);
00124       if (fUnAllocFileNext == 0) fUnAllocFileNext = fFiles->First();
00125    }
00126 
00127    TFileStat *GetNextUnAlloc()
00128    {
00129       TObject *next = fUnAllocFileNext;
00130 
00131       if (next != 0) {
00132          // make file active
00133          fActFiles->Add(next);
00134          if (fActFileNext == 0) fActFileNext = fActFiles->First();
00135 
00136          // move cursor
00137          fUnAllocFileNext = fFiles->After(fUnAllocFileNext);
00138       }
00139 
00140       return (TFileStat *) next;
00141    }
00142 
00143    TFileStat *GetNextActive()
00144    {
00145       TObject *next = fActFileNext;
00146 
00147       if (fActFileNext != 0) {
00148          fActFileNext = fActFiles->After(fActFileNext);
00149          if (fActFileNext == 0) fActFileNext = fActFiles->First();
00150       }
00151 
00152       return (TFileStat *) next;
00153    }
00154 
00155    void RemoveActive(TFileStat *file)
00156    {
00157       if (fActFileNext == file) fActFileNext = fActFiles->After(file);
00158       fActFiles->Remove(file);
00159       if (fActFileNext == 0) fActFileNext = fActFiles->First();
00160    }
00161 
00162    Int_t Compare(const TObject *other) const
00163    {
00164       // Must return -1 if this is smaller than obj, 0 if objects are equal
00165       // and 1 if this is larger than obj.
00166       const TFileNode *obj = dynamic_cast<const TFileNode*>(other);
00167       if (!obj) {
00168          Error("Compare", "input is not a TPacketizer::TFileNode object");
00169          return 0;
00170       }
00171 
00172       Int_t myVal = GetSlaveCnt();
00173       Int_t otherVal = obj->GetSlaveCnt();
00174       if (myVal < otherVal) {
00175          return -1;
00176       } else if (myVal > otherVal) {
00177          return 1;
00178       } else {
00179          return 0;
00180       }
00181    }
00182 
00183    void Print(Option_t *) const
00184    {
00185       cout << "OBJ: " << IsA()->GetName() << "\t" << fNodeName
00186            << "\tMySlaveCount " << fMySlaveCnt
00187            << "\tSlaveCount " << fSlaveCnt << endl;
00188    }
00189 
00190    void Reset()
00191    {
00192       fUnAllocFileNext = fFiles->First();
00193       fActFiles->Clear();
00194       fActFileNext = 0;
00195       fSlaveCnt = 0;
00196       fMySlaveCnt = 0;
00197    }
00198 };
00199 
00200 
00201 TPacketizer::TFileNode::TFileNode(const char *name)
00202    : fNodeName(name), fFiles(new TList), fUnAllocFileNext(0),fActFiles(new TList),
00203      fActFileNext(0), fMySlaveCnt(0), fSlaveCnt(0)
00204 {
00205    // Constructor
00206 
00207    fFiles->SetOwner();
00208    fActFiles->SetOwner(kFALSE);
00209 }
00210 
00211 
00212 //------------------------------------------------------------------------------
00213 
00214 class TPacketizer::TSlaveStat : public TVirtualPacketizer::TVirtualSlaveStat {
00215 
00216 friend class TPacketizer;
00217 
00218 private:
00219    TFileNode     *fFileNode;     // corresponding node or 0
00220    TFileStat     *fCurFile;      // file currently being processed
00221    TDSetElement  *fCurElem;      // TDSetElement currently being processed
00222    TProofProgressStatus *AddProcessed(TProofProgressStatus *st);
00223 public:
00224    TSlaveStat(TSlave *slave);
00225    ~TSlaveStat();
00226 
00227    TFileNode  *GetFileNode() const { return fFileNode; }
00228 
00229    void        SetFileNode(TFileNode *node) { fFileNode = node; }
00230 };
00231 
00232 
00233 TPacketizer::TSlaveStat::TSlaveStat(TSlave *slave)
00234    : fFileNode(0), fCurFile(0), fCurElem(0)
00235 {
00236    fSlave = slave;
00237    fStatus = new TProofProgressStatus();
00238 }
00239 
00240 //______________________________________________________________________________
00241 TPacketizer::TSlaveStat::~TSlaveStat()
00242 {
00243    // Cleanup
00244 
00245    SafeDelete(fStatus);
00246 }
00247 
00248 TProofProgressStatus *TPacketizer::TSlaveStat::AddProcessed(TProofProgressStatus *st)
00249 {
00250    // Update the status info to the 'st'.
00251    // return the difference (*st - *fStatus)
00252 
00253    if (st) {
00254       // The entriesis not correct in 'st'
00255       Long64_t lastEntries = st->GetEntries() - fStatus->GetEntries();
00256       // The last proc time should not be added
00257       fStatus->SetLastProcTime(0.);
00258       // Get the diff
00259       TProofProgressStatus *diff = new TProofProgressStatus(*st - *fStatus);
00260       *fStatus += *diff;
00261       // Set the correct value
00262       fStatus->SetLastEntries(lastEntries);
00263       return diff;
00264    } else {
00265       Error("AddProcessed", "status arg undefined");
00266       return 0;
00267    }
00268 }
00269 
00270 //------------------------------------------------------------------------------
00271 
// ROOT class-implementation macro for TPacketizer (pairs with the ClassDef
// presumably declared in TPacketizer.h).
ClassImp(TPacketizer)
00273 
00274 //______________________________________________________________________________
00275 TPacketizer::TPacketizer(TDSet *dset, TList *slaves, Long64_t first,
00276                          Long64_t num, TList *input, TProofProgressStatus *st)
00277             : TVirtualPacketizer(input, st)
00278 {
00279    // Constructor
00280 
00281    PDB(kPacketizer,1) Info("TPacketizer", "Enter (first %lld, num %lld)", first, num);
00282 
00283    // Init pointer members
00284    fSlaveStats = 0;
00285    fPackets = 0;
00286    fSlaveStats = 0;
00287    fUnAllocated = 0;
00288    fActive = 0;
00289    fFileNodes = 0;
00290    fMaxPerfIdx = 1;
00291    fMaxSlaveCnt = 0;
00292 
00293    if (!fProgressStatus) {
00294       Error("TPacketizerAdaptive", "No progress status");
00295       return;
00296    }
00297 
00298    Long_t maxSlaveCnt = 0;
00299    if (TProof::GetParameter(input, "PROOF_MaxSlavesPerNode", maxSlaveCnt) == 0) {
00300       if (maxSlaveCnt < 1) {
00301          Warning("TPacketizer", "PROOF_MaxSlavesPerNode must be grater than 0");
00302          maxSlaveCnt = 0;
00303       }
00304    } else {
00305       // Try also with Int_t (recently supported in TProof::SetParameter)
00306       Int_t mxslcnt = -1;
00307       if (TProof::GetParameter(input, "PROOF_MaxSlavesPerNode", mxslcnt) == 0) {
00308          if (mxslcnt < 1) {
00309             Warning("TPacketizer", "PROOF_MaxSlavesPerNode must be grater than 0");
00310             mxslcnt = 0;
00311          }
00312          maxSlaveCnt = (Long_t) mxslcnt;
00313       }
00314    }
00315    if (!maxSlaveCnt)
00316       maxSlaveCnt = gEnv->GetValue("Packetizer.MaxWorkersPerNode", 4);
00317    if (maxSlaveCnt > 0) {
00318       fMaxSlaveCnt = maxSlaveCnt;
00319       PDB(kPacketizer,1)
00320          Info("TPacketizer", "setting max number of workers per node to %ld", fMaxSlaveCnt);
00321    }
00322 
00323    fPackets = new TList;
00324    fPackets->SetOwner();
00325 
00326    fFileNodes = new TList;
00327    fFileNodes->SetOwner();
00328    fUnAllocated = new TList;
00329    fUnAllocated->SetOwner(kFALSE);
00330    fActive = new TList;
00331    fActive->SetOwner(kFALSE);
00332 
00333 
00334    fValid = kTRUE;
00335 
00336    // Resolve end-point urls to optmize distribution
00337    // dset->Lookup(); // moved to TProofPlayerRemote::Process
00338 
00339    // Split into per host entries
00340    dset->Reset();
00341    TDSetElement *e;
00342    while ((e = (TDSetElement*)dset->Next())) {
00343       if (e->GetValid()) continue;
00344 
00345       TUrl url = e->GetFileName();
00346 
00347       // Map non URL filenames to dummy host
00348       TString host;
00349       if ( !url.IsValid() ||
00350           (strncmp(url.GetProtocol(),"root", 4) &&
00351            strncmp(url.GetProtocol(),"rfio", 4)) ) {
00352          host = "no-host";
00353       } else {
00354          host = url.GetHost();
00355       }
00356 
00357       TFileNode *node = (TFileNode*) fFileNodes->FindObject( host );
00358 
00359       if (node == 0) {
00360          node = new TFileNode(host);
00361          fFileNodes->Add(node);
00362       }
00363 
00364       node->Add( e );
00365    }
00366 
00367    fSlaveStats = new TMap;
00368    fSlaveStats->SetOwner(kFALSE);
00369 
00370    TSlave *slave;
00371    TIter si(slaves);
00372    while ((slave = (TSlave*) si.Next())) {
00373       fSlaveStats->Add( slave, new TSlaveStat(slave) );
00374       fMaxPerfIdx = slave->GetPerfIdx() > fMaxPerfIdx ?
00375          slave->GetPerfIdx() : fMaxPerfIdx;
00376    }
00377 
00378    // Setup file & filenode structure
00379    Reset();
00380    // Optimize the number of files to be open when running on subsample
00381    Int_t validateMode = 0;
00382    Int_t gprc = TProof::GetParameter(input, "PROOF_ValidateByFile", validateMode);
00383    Bool_t byfile = (gprc == 0 && validateMode > 0 && num > -1) ? kTRUE : kFALSE;
00384    ValidateFiles(dset, slaves, num, byfile);
00385 
00386    if (!fValid) return;
00387 
00388    // apply global range (first,num) to dset and rebuild structure
00389    // ommitting TDSet elements that are not needed
00390 
00391    Int_t files = 0;
00392    fTotalEntries = 0;
00393    fUnAllocated->Clear();  // avoid dangling pointers
00394    fActive->Clear();
00395    fFileNodes->Clear();    // then delete all objects
00396    PDB(kPacketizer,2) Info("TPacketizer", "processing range: first %lld, num %lld", first, num);
00397 
00398    dset->Reset();
00399    Long64_t cur = 0;
00400    while (( e = (TDSetElement*)dset->Next())) {
00401 
00402       // Skip invalid or missing file; It will be moved
00403       // from the dset to the 'MissingFiles' list in the player.
00404       if (!e->GetValid()) continue;
00405 
00406       // The dataset name, if any
00407       if (fDataSet.IsNull() && e->GetDataSet() && strlen(e->GetDataSet()))
00408          fDataSet = e->GetDataSet();
00409 
00410       TUrl url = e->GetFileName();
00411       Long64_t eFirst = e->GetFirst();
00412       Long64_t eNum = e->GetNum();
00413       PDB(kPacketizer,2)
00414          Info("TPacketizer", "processing element: first %lld, num %lld (cur %lld)", eFirst, eNum, cur);
00415 
00416       if (!e->GetEntryList()){
00417          // this element is before the start of the global range, skip it
00418          if (cur + eNum < first) {
00419             cur += eNum;
00420             PDB(kPacketizer,2)
00421                Info("TPacketizer", "processing element: skip element cur %lld", cur);
00422             continue;
00423          }
00424 
00425          // this element is after the end of the global range, skip it
00426          if (num != -1 && (first+num <= cur)) {
00427             cur += eNum;
00428             PDB(kPacketizer,2)
00429                Info("TPacketizer", "processing element: drop element cur %lld", cur);
00430             continue; // break ??
00431          }
00432 
00433          // If this element contains the end of the global range
00434          // adjust its number of entries
00435          if (num != -1 && (first+num <= cur+eNum)) {
00436             e->SetNum(first + num - cur);
00437             PDB(kPacketizer,2)
00438                Info("TPacketizer", "processing element: adjust end %lld", first + num - cur);
00439             cur += eNum;
00440             eNum = e->GetNum();
00441          }
00442 
00443          // If this element contains the start of the global range
00444          // adjust its start and number of entries
00445          if (cur < first) {
00446             e->SetFirst(eFirst + (first - cur));
00447             e->SetNum(e->GetNum() - (first - cur));
00448             PDB(kPacketizer,2)
00449                Info("TPacketizer", "processing element: adjust start %lld and end %lld",
00450                                   eFirst + (first - cur), first + num - cur);
00451             cur += eNum;
00452             eNum = e->GetNum();
00453          }
00454 
00455       } else {
00456          TEntryList *enl = dynamic_cast<TEntryList *>(e->GetEntryList());
00457          if (enl) {
00458             eNum = enl->GetN();
00459          } else {
00460             TEventList *evl = dynamic_cast<TEventList *>(e->GetEntryList());
00461             eNum = evl ? evl->GetN() : eNum;
00462          }
00463          if (!eNum)
00464             continue;
00465       }
00466       PDB(kPacketizer,2)
00467          Info("TPacketizer", "processing element: next cur %lld", cur);
00468 
00469       // Map non URL filenames to dummy host
00470       TString host;
00471       if ( !url.IsValid() ||
00472           (strncmp(url.GetProtocol(),"root", 4) &&
00473            strncmp(url.GetProtocol(),"rfio", 4)) ) {
00474          host = "no-host";
00475       } else {
00476          host = url.GetHostFQDN();
00477       }
00478 
00479       TFileNode *node = (TFileNode*) fFileNodes->FindObject( host );
00480 
00481       if ( node == 0 ) {
00482          node = new TFileNode( host );
00483          fFileNodes->Add( node );
00484       }
00485 
00486       ++files;
00487       fTotalEntries += eNum;
00488       node->Add(e);
00489       PDB(kPacketizer,2) e->Print("a");
00490    }
00491 
00492    PDB(kPacketizer,1)
00493       Info("TPacketizer", "processing %lld entries in %d files on %d hosts",
00494                          fTotalEntries, files, fFileNodes->GetSize());
00495 
00496    // Set the total number for monitoring
00497    if (gPerfStats)
00498       gPerfStats->SetNumEvents(fTotalEntries);
00499 
00500    Reset();
00501 
00502    if (fFileNodes->GetSize() == 0) {
00503       Info("TPacketizer", "no valid or non-empty file found: setting invalid");
00504       // No valid files: set invalid and return
00505       fValid = kFALSE;
00506       return;
00507    }
00508 
00509    // Below we provide a possibility to change the way packet size is
00510    // calculated or define the packet size directly.
00511    // fPacketAsAFraction can be interpreted as follows:
00512    // assuming all slaves have equal processing rate,
00513    // packet size is (#events processed by 1 slave) / fPacketSizeAsAFraction.
00514    // It substitutes 20 in the old formula to calculate the fPacketSize:
00515    // fPacketSize = fTotalEntries / (20 * nslaves)
00516    Long_t packetAsAFraction = 20;
00517    if (TProof::GetParameter(input, "PROOF_PacketAsAFraction", packetAsAFraction) == 0)
00518       Info("Process", "using alternate fraction of query time as a packet Size: %ld",
00519            packetAsAFraction);
00520    fPacketAsAFraction = (Int_t)packetAsAFraction;
00521 
00522    fPacketSize = 1;
00523    if (TProof::GetParameter(input, "PROOF_PacketSize", fPacketSize) == 0) {
00524       Info("Process","using alternate packet size: %lld", fPacketSize);
00525    } else {
00526       // Heuristic for starting packet size
00527       Int_t nslaves = fSlaveStats->GetSize();
00528       if (nslaves > 0) {
00529          fPacketSize = fTotalEntries / (fPacketAsAFraction * nslaves);
00530          if (fPacketSize < 1) fPacketSize = 1;
00531       } else {
00532          fPacketSize = 1;
00533       }
00534    }
00535 
00536    PDB(kPacketizer,1) Info("TPacketizer", "Base Packetsize = %lld", fPacketSize);
00537 
00538    if (!fValid)
00539       SafeDelete(fProgress);
00540 
00541    PDB(kPacketizer,1) Info("TPacketizer", "Return");
00542 }
00543 
00544 //______________________________________________________________________________
00545 TPacketizer::~TPacketizer()
00546 {
00547    // Destructor.
00548 
00549    if (fSlaveStats) {
00550       fSlaveStats->DeleteValues();
00551    }
00552 
00553    SafeDelete(fPackets);
00554    SafeDelete(fSlaveStats);
00555    SafeDelete(fUnAllocated);
00556    SafeDelete(fActive);
00557    SafeDelete(fFileNodes);
00558 }
00559 
00560 //______________________________________________________________________________
00561 TPacketizer::TFileStat *TPacketizer::GetNextUnAlloc(TFileNode *node)
00562 {
00563    // Get next unallocated file.
00564 
00565    TFileStat *file = 0;
00566 
00567    if (node != 0) {
00568       file = node->GetNextUnAlloc();
00569       if (file == 0) RemoveUnAllocNode(node);
00570    } else {
00571       while (file == 0 && ((node = NextUnAllocNode()) != 0)) {
00572          file = node->GetNextUnAlloc();
00573          if (file == 0) RemoveUnAllocNode(node);
00574       }
00575    }
00576 
00577    if (file != 0) {
00578       // if needed make node active
00579       if (fActive->FindObject(node) == 0) {
00580          fActive->Add(node);
00581       }
00582    }
00583 
00584    return file;
00585 }
00586 
00587 //______________________________________________________________________________
00588 TPacketizer::TFileNode *TPacketizer::NextUnAllocNode()
00589 {
00590    // Get next unallocated node.
00591 
00592    fUnAllocated->Sort();
00593    PDB(kPacketizer,2) {
00594       cout << "TPacketizer::NextUnAllocNode()" << endl;
00595       fUnAllocated->Print();
00596    }
00597 
00598    TFileNode *fn = (TFileNode*) fUnAllocated->First();
00599    if (fn != 0 && fMaxSlaveCnt > 0 && fn->GetSlaveCnt() >= fMaxSlaveCnt) {
00600       PDB(kPacketizer,1) Info("NextUnAllocNode", "reached workers per node limit (%ld)",
00601                               fMaxSlaveCnt);
00602       fn = 0;
00603    }
00604 
00605    return fn;
00606 }
00607 
00608 //______________________________________________________________________________
00609 void TPacketizer::RemoveUnAllocNode(TFileNode * node)
00610 {
00611    // Remove unallocated node.
00612 
00613    fUnAllocated->Remove(node);
00614 }
00615 
00616 //______________________________________________________________________________
00617 TPacketizer::TFileStat *TPacketizer::GetNextActive()
00618 {
00619    // Get next active file.
00620 
00621    TFileNode *node;
00622    TFileStat *file = 0;
00623 
00624    while (file == 0 && ((node = NextActiveNode()) != 0)) {
00625          file = node->GetNextActive();
00626          if (file == 0) RemoveActiveNode(node);
00627    }
00628 
00629    return file;
00630 }
00631 
00632 //______________________________________________________________________________
00633 TPacketizer::TFileNode *TPacketizer::NextActiveNode()
00634 {
00635    // Get next active node.
00636 
00637    fActive->Sort();
00638    PDB(kPacketizer,2) {
00639       cout << "TPacketizer::NextActiveNode()" << endl;
00640       fActive->Print();
00641    }
00642 
00643    TFileNode *fn = (TFileNode*) fActive->First();
00644    if (fn != 0 && fMaxSlaveCnt > 0 && fn->GetSlaveCnt() >= fMaxSlaveCnt) {
00645       PDB(kPacketizer,1) Info("NextActiveNode", "reached workers per node limit (%ld)", fMaxSlaveCnt);
00646       fn = 0;
00647    }
00648 
00649    return fn;
00650 }
00651 
00652 //______________________________________________________________________________
00653 void TPacketizer::RemoveActive(TFileStat *file)
00654 {
00655    // Remove file from the list of actives.
00656 
00657    TFileNode *node = file->GetNode();
00658 
00659    node->RemoveActive(file);
00660    if (node->GetNumberOfActiveFiles() == 0) RemoveActiveNode(node);
00661 }
00662 
00663 //______________________________________________________________________________
00664 void TPacketizer::RemoveActiveNode(TFileNode *node)
00665 {
00666    // Remove node from the list of actives.
00667 
00668    fActive->Remove(node);
00669 }
00670 
00671 //______________________________________________________________________________
00672 void TPacketizer::Reset()
00673 {
00674    // Reset the internal datastructure for packet distribution.
00675 
00676    fUnAllocated->Clear();
00677    fUnAllocated->AddAll(fFileNodes);
00678 
00679    fActive->Clear();
00680 
00681    TIter files(fFileNodes);
00682    TFileNode *fn;
00683    while ((fn = (TFileNode*) files.Next()) != 0) {
00684       fn->Reset();
00685    }
00686 
00687    TIter slaves(fSlaveStats);
00688    TObject *key;
00689    while ((key = slaves.Next()) != 0) {
00690       TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue(key);
00691       fn = (TFileNode*) fFileNodes->FindObject(slstat->GetName());
00692       if (fn != 0 ) {
00693          slstat->SetFileNode(fn);
00694          fn->IncMySlaveCnt();
00695       }
00696       slstat->fCurFile = 0;
00697    }
00698 }
00699 
00700 //______________________________________________________________________________
00701 void TPacketizer::ValidateFiles(TDSet *dset, TList *slaves, Long64_t maxent, Bool_t byfile)
00702 {
00703    // Check existence of file/dir/tree an get number of entries.
00704    // Assumes the files have been setup.
00705 
00706    TMap     slaves_by_sock;
00707    TMonitor mon;
00708    TList    workers;
00709 
00710 
00711    // Setup the communication infrastructure
00712 
00713    workers.AddAll(slaves);
00714    TIter    si(slaves);
00715    TSlave  *slm = 0;
00716    while ((slm = (TSlave*)si.Next()) != 0) {
00717       PDB(kPacketizer,3)
00718          Info("ValidateFiles","socket added to monitor: %p (%s)",
00719               slm->GetSocket(), slm->GetName());
00720       mon.Add(slm->GetSocket());
00721       slaves_by_sock.Add(slm->GetSocket(), slm);
00722       PDB(kPacketizer,1)
00723          Info("ValidateFiles",
00724               "mon: %p, wrk: %p, sck: %p", &mon, slm, slm->GetSocket());
00725    }
00726 
00727    mon.DeActivateAll();
00728 
00729    ((TProof*)gProof)->DeActivateAsyncInput();
00730 
00731    // Some monitoring systems (TXSocketHandler) need to know this
00732    ((TProof*)gProof)->fCurrentMonitor = &mon;
00733 
00734    // Preparing for client notification
00735    TString msg("Validating files");
00736    UInt_t n = 0;
00737    UInt_t tot = dset->GetListOfElements()->GetSize();
00738    Bool_t st = kTRUE;
00739 
00740    Long64_t totent = 0, nopenf = 0;
00741    while (kTRUE) {
00742 
00743       // send work
00744       while( TSlave *s = (TSlave*)workers.First() ) {
00745 
00746          workers.Remove(s);
00747 
00748          // find a file
00749 
00750          TSlaveStat *slstat = (TSlaveStat*)fSlaveStats->GetValue(s);
00751          TFileNode *node = 0;
00752          TFileStat *file = 0;
00753 
00754          // try its own node first
00755          if ( (node = slstat->GetFileNode()) != 0 ) {
00756             file = GetNextUnAlloc(node);
00757             if ( file == 0 ) {
00758                slstat->SetFileNode(0);
00759             }
00760          }
00761 
00762          // look for a file on any other node if necessary
00763          if (file == 0) {
00764             file = GetNextUnAlloc();
00765          }
00766 
00767          if ( file != 0 ) {
00768             // files are done right away
00769             RemoveActive(file);
00770 
00771             slstat->fCurFile = file;
00772             TDSetElement *elem = file->GetElement();
00773             Long64_t entries = elem->GetEntries(kTRUE, kFALSE);
00774             if (entries < 0 || strlen(elem->GetTitle()) <= 0) {
00775                // This is decremented when we get the reply
00776                file->GetNode()->IncSlaveCnt(slstat->GetName());
00777                TMessage m(kPROOF_GETENTRIES);
00778                m << dset->IsTree()
00779                << TString(elem->GetFileName())
00780                << TString(elem->GetDirectory())
00781                << TString(elem->GetObjName());
00782 
00783                s->GetSocket()->Send( m );
00784                mon.Activate(s->GetSocket());
00785                PDB(kPacketizer,2)
00786                   Info("ValidateFiles",
00787                        "sent to worker-%s (%s) via %p GETENTRIES on %s %s %s %s",
00788                        s->GetOrdinal(), s->GetName(), s->GetSocket(),
00789                        dset->IsTree() ? "tree" : "objects", elem->GetFileName(),
00790                        elem->GetDirectory(), elem->GetObjName());
00791             } else {
00792                // Fill the info
00793                elem->SetTDSetOffset(entries);
00794                if (entries > 0) {
00795                   // Most likely valid
00796                   elem->SetValid();
00797                   if (!elem->GetEntryList()) {
00798                      if (elem->GetFirst() > entries) {
00799                         Error("ValidateFiles",
00800                               "first (%lld) higher then number of entries (%lld) in %s",
00801                               elem->GetFirst(), entries, elem->GetFileName());
00802                         // disable element
00803                         slstat->fCurFile->SetDone();
00804                         elem->Invalidate();
00805                         dset->SetBit(TDSet::kSomeInvalid);
00806                      }
00807                      if (elem->GetNum() == -1) {
00808                         elem->SetNum(entries - elem->GetFirst());
00809                      } else if (elem->GetFirst() + elem->GetNum() > entries) {
00810                         Warning("ValidateFiles", "num (%lld) + first (%lld) larger then number of"
00811                                  " keys/entries (%lld) in %s", elem->GetNum(), elem->GetFirst(),
00812                                  entries, elem->GetFileName());
00813                         elem->SetNum(entries - elem->GetFirst());
00814                      }
00815                      PDB(kPacketizer,2)
00816                         Info("ValidateFiles",
00817                              "found elem '%s' with %lld entries", elem->GetFileName(), entries);
00818                   }
00819                }
00820                // Notify the client
00821                n++;
00822                gProof->SendDataSetStatus(msg, n, tot, st);
00823 
00824                // This worker is ready for the next validation
00825                workers.Add(s);
00826             }
00827          }
00828       }
00829 
00830       // Check if there is anything to wait for
00831       if (mon.GetActive() == 0) {
00832          if (byfile && maxent > 0) {
00833             // How many files do we still need ?
00834             Long64_t nrestf = (maxent - totent) * nopenf / totent ;
00835             if (nrestf <= 0 && maxent > totent) nrestf = 1;
00836             if (nrestf > 0) {
00837                PDB(kPacketizer,3)
00838                   Info("ValidateFiles", "{%lld, %lld, %lld): needs to validate %lld more files",
00839                                          maxent, totent, nopenf, nrestf);
00840                si.Reset();
00841                while ((slm = (TSlave *) si.Next()) && nrestf--) {
00842                   workers.Add(slm);
00843                }
00844                continue;
00845             } else {
00846                PDB(kPacketizer,3)
00847                   Info("ValidateFiles", "no need to validate more files");
00848                break;
00849             }
00850          } else {
00851             break;
00852          }
00853       }
00854 
00855       PDB(kPacketizer,3) {
00856          Info("ValidateFiles", "waiting for %d workers:", mon.GetActive());
00857          TList *act = mon.GetListOfActives();
00858          TIter next(act);
00859          TSocket *s = 0;
00860          while ((s = (TSocket*) next())) {
00861             Info("ValidateFiles", "found sck: %p", s);
00862             TSlave *sl = (TSlave *) slaves_by_sock.GetValue(s);
00863             if (sl)
00864                Info("ValidateFiles", "   worker-%s (%s)", sl->GetOrdinal(), sl->GetName());
00865          }
00866          delete act;
00867       }
00868 
00869       TSocket *sock = mon.Select();
00870       // If we have been interrupted break
00871       if (!sock) {
00872          Error("ValidateFiles", "selection has been interrupted - STOP");
00873          mon.DeActivateAll();
00874          fValid = kFALSE;
00875          break;
00876       }
00877       mon.DeActivate(sock);
00878 
00879       PDB(kPacketizer,3) Info("ValidateFiles", "select returned: %p", sock);
00880 
00881       TSlave *slave = (TSlave *) slaves_by_sock.GetValue( sock );
00882       if (!sock->IsValid()) {
00883          // A socket got invalid during validation
00884          Error("ValidateFiles", "worker-%s (%s) got invalid - STOP",
00885                slave->GetOrdinal(), slave->GetName());
00886          ((TProof*)gProof)->MarkBad(slave);
00887          fValid = kFALSE;
00888          break;
00889       }
00890 
00891       TMessage *reply;
00892 
00893       if ( sock->Recv(reply) <= 0 ) {
00894          // Help! lost a slave?
00895          ((TProof*)gProof)->MarkBad(slave);
00896          fValid = kFALSE;
00897          Error("ValidateFiles", "Recv failed! for worker-%s (%s)",
00898                slave->GetOrdinal(), slave->GetName());
00899          continue;
00900       }
00901 
00902       if (reply->What() != kPROOF_GETENTRIES) {
00903          // Not what we want: handover processing to the central machinery
00904          Int_t what = reply->What();
00905          ((TProof*)gProof)->HandleInputMessage(slave, reply);
00906          if (what == kPROOF_FATAL) {
00907              Error("ValidateFiles", "kPROOF_FATAL from worker-%s (%s)",
00908                                     slave->GetOrdinal(), slave->GetName());
00909              fValid = kFALSE;
00910          } else {
00911             // Reactivate the socket
00912             mon.Activate(sock);
00913          }
00914          // Get next message
00915          continue;
00916       }
00917 
00918       TSlaveStat *slavestat = (TSlaveStat*) fSlaveStats->GetValue( slave );
00919       TDSetElement *e = slavestat->fCurFile->GetElement();
00920       slavestat->fCurFile->GetNode()->DecSlaveCnt(slavestat->GetName());
00921       Long64_t entries;
00922 
00923       (*reply) >> entries;
00924 
00925       // Extract object name, if there
00926       if ((reply->BufferSize() > reply->Length())) {
00927          TString objname;
00928          (*reply) >> objname;
00929          e->SetTitle(objname);
00930       }
00931 
00932       e->SetTDSetOffset(entries);
00933       if ( entries > 0 ) {
00934 
00935          // This dataset element is most likely valid
00936          e->SetValid();
00937 
00938          //if (!e->GetEventList()) {
00939          if (!e->GetEntryList()){
00940             if ( e->GetFirst() > entries ) {
00941                Error("ValidateFiles", "first (%lld) higher then number of entries (%lld) in %s",
00942                                       e->GetFirst(), entries, e->GetFileName());
00943 
00944                // Invalidate the element
00945                slavestat->fCurFile->SetDone();
00946                e->Invalidate();
00947                dset->SetBit(TDSet::kSomeInvalid);
00948             }
00949 
00950             if ( e->GetNum() == -1 ) {
00951                e->SetNum( entries - e->GetFirst() );
00952             } else if ( e->GetFirst() + e->GetNum() > entries ) {
00953                Error("ValidateFiles",
00954                      "num (%lld) + first (%lld) larger then number of keys/entries (%lld) in %s",
00955                      e->GetNum(), e->GetFirst(), entries, e->GetFileName());
00956                e->SetNum(entries - e->GetFirst());
00957             }
00958          }
00959 
00960          // Count
00961          totent += entries;
00962          nopenf++;
00963 
00964          // Notify the client
00965          n++;
00966          gProof->SendDataSetStatus(msg, n, tot, st);
00967 
00968       } else {
00969 
00970          Error("ValidateFiles", "cannot get entries for %s (", e->GetFileName() );
00971          //
00972          // Need to fix this with a user option to allow incomplete file sets (rdm)
00973          //
00974          //fValid = kFALSE; // all element must be readable!
00975          if (gProofServ) {
00976             TMessage m(kPROOF_MESSAGE);
00977             m << TString(Form("Cannot get entries for file: %s - skipping", e->GetFileName()));
00978             gProofServ->GetSocket()->Send(m);
00979          }
00980 
00981          // Invalidate the element
00982          e->Invalidate();
00983          dset->SetBit(TDSet::kSomeInvalid);
00984       }
00985       PDB(kPacketizer,3) Info("ValidateFiles", " %lld events validated", totent);
00986 
00987       // Ready for the next job, unless we have enough files
00988       if (maxent < 0 || ((totent < maxent) && !byfile))
00989          workers.Add(slave);
00990    }
00991 
00992    // report std. output from slaves??
00993 
00994    ((TProof*)gProof)->ActivateAsyncInput();
00995 
00996    // This needs to be reset
00997    ((TProof*)gProof)->fCurrentMonitor = 0;
00998 
00999    // No reason to continue if invalid
01000    if (!fValid)
01001       return;
01002 
01003 
01004    // compute the offset for each file element
01005    Long64_t offset = 0;
01006    Long64_t newOffset = 0;
01007    TIter next(dset->GetListOfElements());
01008    TDSetElement *el;
01009    while ( (el = dynamic_cast<TDSetElement*> (next())) ) {
01010       newOffset = offset + el->GetTDSetOffset();
01011       el->SetTDSetOffset(offset);
01012       offset = newOffset;
01013    }
01014 }
01015 
01016 //______________________________________________________________________________
01017 Long64_t TPacketizer::GetEntriesProcessed(TSlave *slave) const
01018 {
01019    // Get entries processed by the specified slave.
01020 
01021    if ( fSlaveStats == 0 ) return 0;
01022 
01023    TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue( slave );
01024 
01025    if ( slstat == 0 ) return 0;
01026 
01027    return slstat->GetEntriesProcessed();
01028 }
01029 
01030 //______________________________________________________________________________
01031 Float_t TPacketizer::GetCurrentRate(Bool_t &all)
01032 {
01033    // Get Estimation of the current rate; just summing the current rates of
01034    // the active workers
01035 
01036    all = kTRUE;
01037    // Loop over the workers
01038    Float_t currate = 0.;
01039    if (fSlaveStats && fSlaveStats->GetSize() > 0) {
01040       TIter nxw(fSlaveStats);
01041       TObject *key;
01042       while ((key = nxw()) != 0) {
01043          TSlaveStat *slstat = (TSlaveStat *) fSlaveStats->GetValue(key);
01044          if (slstat && slstat->GetProgressStatus() && slstat->GetEntriesProcessed() > 0) {
01045             // Sum-up the current rates
01046             currate += slstat->GetProgressStatus()->GetCurrentRate();
01047          } else {
01048             all = kFALSE;
01049          }
01050       }
01051    }
01052    // Done
01053    return currate;
01054 }
01055 
01056 //______________________________________________________________________________
01057 TDSetElement *TPacketizer::GetNextPacket(TSlave *sl, TMessage *r)
01058 {
01059    // Get next packet
01060 
01061    if ( !fValid ) {
01062       return 0;
01063    }
01064 
01065    // find slave
01066 
01067    TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue( sl );
01068 
01069    R__ASSERT( slstat != 0 );
01070 
01071    // update stats & free old element
01072 
01073    Bool_t firstPacket = kFALSE;
01074    if ( slstat->fCurElem != 0 ) {
01075       Double_t latency = 0., proctime = 0., proccpu = 0.;
01076       Long64_t bytesRead = -1;
01077       Long64_t totalEntries = -1;
01078       Long64_t totev = 0;
01079       Long64_t numev = slstat->fCurElem->GetNum();
01080 
01081       fPackets->Add(slstat->fCurElem);
01082 
01083       if (sl->GetProtocol() > 18) {
01084          TProofProgressStatus *status = 0;
01085          (*r) >> latency;
01086          (*r) >> status;
01087 
01088          // Calculate the progress made in the last packet
01089          TProofProgressStatus *progress = 0;
01090          if (status) {
01091             // upadte the worker status
01092             numev = status->GetEntries() - slstat->GetEntriesProcessed();
01093             progress = slstat->AddProcessed(status);
01094             if (progress) {
01095                // (*fProgressStatus) += *progress;
01096                proctime = progress->GetProcTime();
01097                proccpu  = progress->GetCPUTime();
01098                totev  = status->GetEntries(); // for backward compatibility
01099                bytesRead  = progress->GetBytesRead();
01100                delete progress;
01101             }
01102             delete status;
01103          } else
01104              Error("GetNextPacket", "no status came in the kPROOF_GETPACKET message");
01105       } else {
01106 
01107          (*r) >> latency >> proctime >> proccpu;
01108 
01109          // only read new info if available
01110          if (r->BufferSize() > r->Length()) (*r) >> bytesRead;
01111          if (r->BufferSize() > r->Length()) (*r) >> totalEntries;
01112          if (r->BufferSize() > r->Length()) (*r) >> totev;
01113 
01114          numev = totev - slstat->GetEntriesProcessed();
01115          slstat->GetProgressStatus()->IncEntries(numev);
01116          slstat->GetProgressStatus()->IncBytesRead(bytesRead);
01117       }
01118 
01119       if (fProgressStatus) {
01120          if (numev > 0)  fProgressStatus->IncEntries(numev);
01121          if (bytesRead > 0)  fProgressStatus->IncBytesRead(bytesRead);
01122       }
01123       PDB(kPacketizer,2)
01124          Info("GetNextPacket","worker-%s (%s): %lld %7.3lf %7.3lf %7.3lf %lld",
01125                               sl->GetOrdinal(), sl->GetName(),
01126                               numev, latency, proctime, proccpu, bytesRead);
01127 
01128       if (gPerfStats)
01129          gPerfStats->PacketEvent(sl->GetOrdinal(), sl->GetName(), slstat->fCurElem->GetFileName(),
01130                                  numev, latency, proctime, proccpu, bytesRead);
01131 
01132       slstat->fCurElem = 0;
01133       if (fProgressStatus && fProgressStatus->GetEntries() == fTotalEntries) {
01134          HandleTimer(0);   // Send last timer message
01135          delete fProgress; fProgress = 0;
01136       }
01137    } else {
01138       firstPacket = kTRUE;
01139    }
01140 
01141    if ( fStop ) {
01142       HandleTimer(0);
01143       return 0;
01144    }
01145 
01146    // get a file if needed
01147 
01148    TFileStat *file = slstat->fCurFile;
01149 
01150    if ( file != 0 && file->IsDone() ) {
01151       file->GetNode()->DecSlaveCnt(slstat->GetName());
01152       if (gPerfStats)
01153          gPerfStats->FileEvent(sl->GetOrdinal(), sl->GetName(), file->GetNode()->GetName(),
01154                                file->GetElement()->GetFileName(), kFALSE);
01155       file = 0;
01156    }
01157    // Reset the current file field
01158    slstat->fCurFile = file;
01159 
01160    if (!file) {
01161 
01162       // Try its own node first
01163       if (slstat->GetFileNode() != 0) {
01164          file = GetNextUnAlloc(slstat->GetFileNode());
01165          if (!file) {
01166             slstat->SetFileNode(0);
01167          }
01168       }
01169 
01170       // try to find an unused filenode first
01171       if (!file) {
01172          file = GetNextUnAlloc();
01173       }
01174 
01175       // then look at the active filenodes
01176       if (!file) {
01177          file = GetNextActive();
01178       }
01179 
01180       if (!file) return 0;
01181 
01182       slstat->fCurFile = file;
01183       file->GetNode()->IncSlaveCnt(slstat->GetName());
01184       if (gPerfStats)
01185          gPerfStats->FileEvent(sl->GetOrdinal(), sl->GetName(),
01186                                file->GetNode()->GetName(),
01187                                file->GetElement()->GetFileName(), kTRUE);
01188    }
01189 
01190    // get a packet
01191 
01192    TDSetElement *base = file->GetElement();
01193    Long64_t num = Long64_t(fPacketSize*(Float_t)slstat->fSlave->GetPerfIdx()/fMaxPerfIdx);
01194    if (num < 1) num = 1;
01195 
01196    Long64_t first = file->GetNextEntry();
01197    Long64_t last = base->GetFirst() + base->GetNum();
01198 
01199    if ( first + num >= last ) {
01200       num = last - first;
01201       file->SetDone(); // done
01202 
01203       // delete file from active list (unalloc list is single pass, no delete needed)
01204       RemoveActive(file);
01205 
01206    } else {
01207       file->MoveNextEntry(num);
01208    }
01209 
01210 
01211    slstat->fCurElem = CreateNewPacket(base, first, num);
01212    if (base->GetEntryList())
01213       slstat->fCurElem->SetEntryList(base->GetEntryList(), first, num);
01214 
01215    // Flag the first packet of a new run (dataset)
01216    if (firstPacket)
01217       slstat->fCurElem->SetBit(TDSetElement::kNewRun);
01218    else
01219       slstat->fCurElem->ResetBit(TDSetElement::kNewRun);
01220 
01221    PDB(kPacketizer,2)
01222       Info("GetNextPacket","%s: %s %lld %lld", sl->GetOrdinal(), base->GetFileName(), first, num);
01223 
01224    return slstat->fCurElem;
01225 }
01226 
01227 //______________________________________________________________________________
01228 Int_t TPacketizer::GetActiveWorkers()
01229 {
01230    // Return the number of workers still processing
01231 
01232    Int_t actw = 0;
01233    TIter nxw(fSlaveStats);
01234    TObject *key;
01235    while ((key = nxw())) {
01236       TSlaveStat *wrkstat = (TSlaveStat *) fSlaveStats->GetValue(key);
01237       if (wrkstat && wrkstat->fCurFile) actw++;
01238    }
01239    // Done
01240    return actw;
01241 }

Generated on Tue Jul 5 14:52:18 2011 for ROOT_528-00b_version by  doxygen 1.5.1