TPacketizerUnit.cxx

Go to the documentation of this file.
00001 // @(#)root/proofplayer:$Id: TPacketizerUnit.cxx 35120 2010-09-02 11:11:23Z ganis $
00002 // Author: Long Tran-Thanh    22/07/07
00003 
00004 /*************************************************************************
00005  * Copyright (C) 1995-2002, Rene Brun and Fons Rademakers.               *
00006  * All rights reserved.                                                  *
00007  *                                                                       *
00008  * For the licensing terms see $ROOTSYS/LICENSE.                         *
00009  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
00010  *************************************************************************/
00011 
00012 //////////////////////////////////////////////////////////////////////////
00013 //                                                                      //
00014 // TPacketizerUnit                                                      //
00015 //                                                                      //
00016 // This packetizer generates packets of generic units, representing the //
00017 // number of times an operation cycle has to be repeated by the worker  //
00018 // node, e.g. the number of Monte Carlo events to be generated.         //
00019 // Packet sizes are generated taking into account the performance of    //
00020 // worker nodes, based on the time needed to process previous packets.  //
00021 //                                                                      //
00022 //////////////////////////////////////////////////////////////////////////
00023 
00024 
00025 #include "TPacketizerUnit.h"
00026 
00027 #include "Riostream.h"
00028 #include "TDSet.h"
00029 #include "TError.h"
00030 #include "TEventList.h"
00031 #include "TMap.h"
00032 #include "TMessage.h"
00033 #include "TMonitor.h"
00034 #include "TNtupleD.h"
00035 #include "TObject.h"
00036 #include "TParameter.h"
00037 #include "TPerfStats.h"
00038 #include "TProofDebug.h"
00039 #include "TProof.h"
00040 #include "TProofPlayer.h"
00041 #include "TProofServ.h"
00042 #include "TSlave.h"
00043 #include "TSocket.h"
00044 #include "TStopwatch.h"
00045 #include "TTimer.h"
00046 #include "TUrl.h"
00047 #include "TClass.h"
00048 #include "TMath.h"
00049 #include "TObjString.h"
00050 
00051 
00052 using namespace TMath;
00053 //
00054 // The following utility class manages the state of the
00055 // work to be performed and the slaves involved in the process.
00056 //
00057 // The list of TSlaveStat objects keeps track of the work (being) done
00058 // by each slave.
00059 //
00060 
00061 //------------------------------------------------------------------------------
00062 
00063 class TPacketizerUnit::TSlaveStat : public TVirtualPacketizer::TVirtualSlaveStat {
00064 
00065 friend class TPacketizerUnit;
00066 
00067 private:
00068    Long64_t  fLastProcessed; // number of processed entries of the last packet
00069    Double_t  fSpeed;         // estimated current average speed of the processing slave
00070    Double_t  fTimeInstant;   // stores the time instant when the current packet started
00071    TNtupleD *fCircNtp;       // Keeps circular info for speed calculations
00072    Long_t    fCircLvl;       // Circularity level
00073 
00074 public:
00075    TSlaveStat(TSlave *sl, TList *input);
00076    ~TSlaveStat();
00077 
00078    void        GetCurrentTime();
00079 
00080    void        UpdatePerformance(Double_t time);
00081    TProofProgressStatus *AddProcessed(TProofProgressStatus *st);
00082 };
00083 
00084 //______________________________________________________________________________
00085 TPacketizerUnit::TSlaveStat::TSlaveStat(TSlave *slave, TList *input)
00086                             : fLastProcessed(0),
00087                               fSpeed(0), fTimeInstant(0), fCircLvl(5)
00088 {
00089    // Main constructor
00090 
00091    // Initialize the circularity ntple for speed calculations
00092    fCircNtp = new TNtupleD("Speed Circ Ntp", "Circular process info","tm:ev");
00093    TProof::GetParameter(input, "PROOF_TPacketizerUnitCircularity", fCircLvl);
00094    fCircLvl = (fCircLvl > 0) ? fCircLvl : 5;
00095    fCircNtp->SetCircular(fCircLvl);
00096    fSlave = slave;
00097    fStatus = new TProofProgressStatus();
00098 }
00099 
00100 //______________________________________________________________________________
00101 TPacketizerUnit::TSlaveStat::~TSlaveStat()
00102 {
00103    // Destructor
00104 
00105    SafeDelete(fCircNtp);
00106 }
00107 
00108 //______________________________________________________________________________
00109 void TPacketizerUnit::TSlaveStat::UpdatePerformance(Double_t time)
00110 {
00111    // Update the circular ntple
00112 
00113    Double_t ttot = time;
00114    Double_t *ar = fCircNtp->GetArgs();
00115    Int_t ne = fCircNtp->GetEntries();
00116    if (ne <= 0) {
00117       // First call: just fill one ref entry and return
00118       fCircNtp->Fill(0., 0);
00119       fSpeed = 0.;
00120       return;
00121    }
00122    // Fill the entry
00123    fCircNtp->GetEntry(ne-1);
00124    ttot = ar[0] + time;
00125    fCircNtp->Fill(ttot, GetEntriesProcessed());
00126 
00127    // Calculate the speed
00128    fCircNtp->GetEntry(0);
00129    Double_t dtime = (ttot > ar[0]) ? ttot - ar[0] : ne+1 ;
00130    Long64_t nevts = GetEntriesProcessed() - (Long64_t)ar[1];
00131    fSpeed = nevts / dtime;
00132    PDB(kPacketizer,2)
00133       Info("UpdatePerformance", "time:%f, dtime:%f, nevts:%lld, speed: %f",
00134                                 time, dtime, nevts, fSpeed);
00135 
00136 }
00137 
00138 //______________________________________________________________________________
00139 TProofProgressStatus *TPacketizerUnit::TSlaveStat::AddProcessed(TProofProgressStatus *st)
00140 {
00141    // Update the status info to the 'st'.
00142    // return the difference (*st - *fStatus)
00143 
00144    if (st) {
00145       // The entriesis not correct in 'st'
00146       Long64_t lastEntries = st->GetEntries() - fStatus->GetEntries();
00147       // The last proc time should not be added
00148       fStatus->SetLastProcTime(0.);
00149       // Get the diff
00150       TProofProgressStatus *diff = new TProofProgressStatus(*st - *fStatus);
00151       *fStatus += *diff;
00152       // Set the correct value
00153       fStatus->SetLastEntries(lastEntries);
00154       return diff;
00155    } else {
00156       Error("AddProcessed", "status arg undefined");
00157       return 0;
00158    }
00159 }
00160 
00161 //------------------------------------------------------------------------------
00162 
00163 ClassImp(TPacketizerUnit)
00164 
00165 //______________________________________________________________________________
00166 TPacketizerUnit::TPacketizerUnit(TList *slaves, Long64_t num, TList *input,
00167                                  TProofProgressStatus *st)
00168                 : TVirtualPacketizer(input, st)
00169 {
00170    // Constructor
00171 
00172    PDB(kPacketizer,1) Info("TPacketizerUnit", "enter (num %lld)", num);
00173 
00174    // Init pointer members
00175    fSlaveStats = 0;
00176    fPackets = 0;
00177 
00178    Int_t fixednum = -1;
00179    if (TProof::GetParameter(input, "PROOF_PacketizerFixedNum", fixednum) != 0 || fixednum <= 0)
00180       fixednum = 0;
00181    if (fixednum == 1)
00182       Info("TPacketizerUnit", "forcing the same cycles on each worker");
00183 
00184    if (TProof::GetParameter(input, "PROOF_PacketizerCalibNum", fCalibNum) != 0 || fCalibNum <= 0)
00185       fCalibNum = 5;
00186    PDB(kPacketizer,1)
00187       Info("TPacketizerUnit", "size of the calibration packets: %lld", fCalibNum);
00188 
00189    fMaxPacketTime = 1.;
00190    Double_t timeLimit = -1;
00191    if (TProof::GetParameter(input, "PROOF_PacketizerTimeLimit", timeLimit) == 0) {
00192       fMaxPacketTime = timeLimit;
00193       Warning("TPacketizerUnit", "PROOF_PacketizerTimeLimit is deprecated: use PROOF_MaxPacketTime instead");
00194    }
00195    PDB(kPacketizer,1)
00196       Info("TPacketizerUnit", "time limit is %lf", fMaxPacketTime);
00197    fProcessing = 0;
00198    fAssigned = 0;
00199 
00200    fStopwatch = new TStopwatch();
00201 
00202    fPackets = new TList;
00203    fPackets->SetOwner();
00204 
00205    fSlaveStats = new TMap;
00206    fSlaveStats->SetOwner(kFALSE);
00207 
00208    TSlave *slave;
00209    TIter si(slaves);
00210    while ((slave = (TSlave*) si.Next()))
00211       fSlaveStats->Add(slave, new TSlaveStat(slave, input));
00212 
00213    fTotalEntries = num;
00214    fNumPerWorker = -1;
00215    if (fixednum == 1 && fSlaveStats->GetSize() > 0) {
00216       // Approximate number: the exact number is determined in GetNextPacket
00217       fNumPerWorker = fTotalEntries / fSlaveStats->GetSize();
00218       if (fNumPerWorker == 0) fNumPerWorker = 1;
00219       if (fCalibNum >= fNumPerWorker) fCalibNum = 1;
00220    }
00221 
00222    // Save the config parameters in the dedicated list so that they will be saved
00223    // in the outputlist and therefore in the relevant TQueryResult
00224    fConfigParams->Add(new TParameter<Long64_t>("PROOF_PacketizerFixedNum", fNumPerWorker));
00225    fConfigParams->Add(new TParameter<Long64_t>("PROOF_PacketizerCalibNum", fCalibNum));
00226 
00227    fStopwatch->Start();
00228    PDB(kPacketizer,1) Info("TPacketizerUnit", "return");
00229 }
00230 
00231 //______________________________________________________________________________
00232 TPacketizerUnit::~TPacketizerUnit()
00233 {
00234    // Destructor.
00235 
00236    if (fSlaveStats)
00237       fSlaveStats->DeleteValues();
00238    SafeDelete(fSlaveStats);
00239    SafeDelete(fPackets);
00240    SafeDelete(fStopwatch);
00241 }
00242 
00243 //______________________________________________________________________________
00244 Double_t TPacketizerUnit::GetCurrentTime()
00245 {
00246    // Get current time
00247 
00248    Double_t retValue = fStopwatch->RealTime();
00249    fStopwatch->Continue();
00250    return retValue;
00251 }
00252 
00253 //______________________________________________________________________________
00254 Float_t TPacketizerUnit::GetCurrentRate(Bool_t &all)
00255 {
00256    // Get Estimation of the current rate; just summing the current rates of
00257    // the active workers
00258 
00259    all = kTRUE;
00260    // Loop over the workers
00261    Float_t currate = 0.;
00262    if (fSlaveStats && fSlaveStats->GetSize() > 0) {
00263       TIter nxw(fSlaveStats);
00264       TObject *key;
00265       while ((key = nxw()) != 0) {
00266          TSlaveStat *slstat = (TSlaveStat *) fSlaveStats->GetValue(key);
00267          if (slstat && slstat->GetProgressStatus() && slstat->GetEntriesProcessed() > 0) {
00268             // Sum-up the current rates
00269             currate += slstat->GetProgressStatus()->GetCurrentRate();
00270          } else {
00271             all = kFALSE;
00272          }
00273       }
00274    }
00275    // Done
00276    return currate;
00277 }
00278 
00279 //______________________________________________________________________________
00280 TDSetElement *TPacketizerUnit::GetNextPacket(TSlave *sl, TMessage *r)
00281 {
00282    // Get next packet
00283 
00284    if (!fValid)
00285       return 0;
00286 
00287    // Find slave
00288    TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue(sl);
00289    R__ASSERT(slstat != 0);
00290 
00291    PDB(kPacketizer,2)
00292       Info("GetNextPacket","worker-%s: fAssigned %lld\t", sl->GetOrdinal(), fAssigned);
00293 
00294    // Update stats & free old element
00295    Double_t latency = 0., proctime = 0., proccpu = 0.;
00296    Long64_t bytesRead = -1;
00297    Long64_t totalEntries = -1; // used only to read an old message type
00298    Long64_t totev = 0;
00299    Long64_t numev = -1;
00300 
00301    TProofProgressStatus *status = 0;
00302    if (sl->GetProtocol() > 18) {
00303       (*r) >> latency;
00304       (*r) >> status;
00305 
00306       // Calculate the progress made in the last packet
00307       TProofProgressStatus *progress = 0;
00308       if (status) {
00309          // upadte the worker status
00310          numev = status->GetEntries() - slstat->GetEntriesProcessed();
00311          progress = slstat->AddProcessed(status);
00312          if (progress) {
00313             // (*fProgressStatus) += *progress;
00314             proctime = progress->GetProcTime();
00315             proccpu  = progress->GetCPUTime();
00316             totev  = status->GetEntries(); // for backward compatibility
00317             bytesRead  = progress->GetBytesRead();
00318             delete progress;
00319          }
00320          delete status;
00321       } else
00322           Error("GetNextPacket", "no status came in the kPROOF_GETPACKET message");
00323    } else {
00324 
00325       (*r) >> latency >> proctime >> proccpu;
00326 
00327       // only read new info if available
00328       if (r->BufferSize() > r->Length()) (*r) >> bytesRead;
00329       if (r->BufferSize() > r->Length()) (*r) >> totalEntries;
00330       if (r->BufferSize() > r->Length()) (*r) >> totev;
00331 
00332       numev = totev - slstat->GetEntriesProcessed();
00333       slstat->GetProgressStatus()->IncEntries(numev);
00334    }
00335 
00336    fProgressStatus->IncEntries(numev);
00337 
00338    fProcessing = 0;
00339 
00340    PDB(kPacketizer,2)
00341       Info("GetNextPacket","worker-%s (%s): %lld %7.3lf %7.3lf %7.3lf %lld",
00342                            sl->GetOrdinal(), sl->GetName(),
00343                            numev, latency, proctime, proccpu, bytesRead);
00344 
00345    if (gPerfStats != 0) {
00346       gPerfStats->PacketEvent(sl->GetOrdinal(), sl->GetName(), "", numev,
00347                               latency, proctime, proccpu, bytesRead);
00348    }
00349 
00350    if (fNumPerWorker > 0 && slstat->GetEntriesProcessed() >= fNumPerWorker) {
00351       PDB(kPacketizer,2)
00352          Info("GetNextPacket","worker-%s (%s) is done (%lld cycles)",
00353                            sl->GetOrdinal(), sl->GetName(), slstat->GetEntriesProcessed());
00354       return 0;
00355    }
00356 
00357    if (fAssigned == fTotalEntries) {
00358       // Send last timer message
00359       HandleTimer(0);
00360       return 0;
00361    }
00362 
00363    if (fStop) {
00364       // Send last timer message
00365       HandleTimer(0);
00366       return 0;
00367    }
00368 
00369 
00370    Long64_t num;
00371 
00372    // Get the current time
00373    Double_t cTime = GetCurrentTime();
00374 
00375    if (slstat->fCircNtp->GetEntries() <= 0) {
00376       // The calibration phase
00377       PDB(kPacketizer,2)
00378          Info("GetNextPacket", "calibration: total entries %lld, workers %d",
00379                                fTotalEntries, fSlaveStats->GetSize());
00380       Long64_t avg = fTotalEntries / fSlaveStats->GetSize();
00381       num = (avg > fCalibNum) ? fCalibNum : avg;
00382 
00383       // Create a reference entry
00384       slstat->UpdatePerformance(0.);
00385 
00386    } else {
00387       if (fNumPerWorker < 0) {
00388          // Schedule tasks for workers based on the currently estimated processing speeds
00389 
00390          // Update performances
00391          // slstat->fStatus was updated before;
00392          slstat->UpdatePerformance(proctime);
00393 
00394          TMapIter *iter = (TMapIter *)fSlaveStats->MakeIterator();
00395          TSlave * tmpSlave;
00396          TSlaveStat * tmpStat;
00397 
00398          Double_t sumSpeed = 0;
00399          Double_t sumBusy = 0;
00400 
00401          // The basic idea is to estimate the optimal finishing time for the process, assuming
00402          // that the currently measured processing speeds of the slaves remain the same in the future.
00403          // The optTime can be calculated as follows.
00404          // Let s_i be the speed of worker i. We assume that worker i will be busy in the
00405          // next b_i time instant. Therefore we have the following equation:
00406          //                 SUM((optTime-b_i)*s_i) = (remaining_entries),
00407          // Hence optTime can be calculated easily:
00408          //                 optTime = ((remaining_entries) + SUM(b_i*s_i))/SUM(s_i)
00409 
00410          while ((tmpSlave = (TSlave *)iter->Next())) {
00411             tmpStat = (TSlaveStat *)fSlaveStats->GetValue(tmpSlave);
00412             // If the slave doesn't response for a long time, its service rate will be considered as 0
00413             if ((cTime - tmpStat->fTimeInstant) > 4*fMaxPacketTime)
00414                tmpStat->fSpeed = 0;
00415             PDB(kPacketizer,2)
00416                Info("GetNextPacket",
00417                   "worker-%s: speed %lf", tmpSlave->GetOrdinal(), tmpStat->fSpeed);
00418             if (tmpStat->fSpeed > 0) {
00419                // Calculating the SUM(s_i)
00420                sumSpeed += tmpStat->fSpeed;
00421                // There is nothing to do if slave_i is not calibrated or slave i is the current slave
00422                if ((tmpStat->fTimeInstant) && (cTime - tmpStat->fTimeInstant > 0)) {
00423                   // Calculating the SUM(b_i*s_i)
00424                   //      s_i = tmpStat->fSpeed
00425                   //      b_i = tmpStat->fTimeInstant + tmpStat->fLastProcessed/s_i - cTime)
00426                   //  b_i*s_i = (tmpStat->fTimeInstant - cTime)*s_i + tmpStat->fLastProcessed
00427                   Double_t busyspeed =
00428                      ((tmpStat->fTimeInstant - cTime)*tmpStat->fSpeed + tmpStat->fLastProcessed);
00429                   if (busyspeed > 0)
00430                      sumBusy += busyspeed;
00431                }
00432             }
00433          }
00434          SafeDelete(iter);
00435          PDB(kPacketizer,2)
00436             Info("GetNextPacket", "worker-%s: sum speed: %lf, sum busy: %f",
00437                                  sl->GetOrdinal(), sumSpeed, sumBusy);
00438          // firstly the slave will try to get all of the remaining entries
00439          if (slstat->fSpeed > 0 &&
00440             (fTotalEntries - fAssigned)/(slstat->fSpeed) < fMaxPacketTime) {
00441             num = (fTotalEntries - fAssigned);
00442          } else {
00443             if (slstat->fSpeed > 0) {
00444                // calculating the optTime
00445                Double_t optTime = (fTotalEntries - fAssigned + sumBusy)/sumSpeed;
00446                // if optTime is greater than the official time limit, then the slave gets a number
00447                // of entries that still fit into the time limit, otherwise it uses the optTime as
00448                // a time limit
00449                num = (optTime > fMaxPacketTime) ? Nint(fMaxPacketTime*(slstat->fSpeed))
00450                                           : Nint(optTime*(slstat->fSpeed));
00451             PDB(kPacketizer,2)
00452                Info("GetNextPacket", "opTime %lf num %lld speed %lf", optTime, num, slstat->fSpeed);
00453             } else {
00454                Long64_t avg = fTotalEntries/(fSlaveStats->GetSize());
00455                num = (avg > 5) ? 5 : avg;
00456             }
00457          }
00458       } else {
00459          // Fixed number of cycles per worker
00460          num = fNumPerWorker - slstat->fLastProcessed;
00461          if (num > 1 && slstat->fSpeed > 0 && num / slstat->fSpeed > fMaxPacketTime) {
00462             num = (Long64_t) (slstat->fSpeed * fMaxPacketTime);
00463          }
00464       }
00465    }
00466    // Minimum packet size
00467    num = (num > 1) ? num : 1;
00468    fProcessing = (num < (fTotalEntries - fAssigned)) ? num
00469                                                      : (fTotalEntries - fAssigned);
00470 
00471    // Set the informations of the current slave
00472    slstat->fLastProcessed = fProcessing;
00473    // Set the start time of the current packet
00474    slstat->fTimeInstant = cTime;
00475 
00476    PDB(kPacketizer,2)
00477       Info("GetNextPacket", "worker-%s: num %lld, processing %lld, remaining %lld",sl->GetOrdinal(),
00478                             num, fProcessing, (fTotalEntries - fAssigned - fProcessing));
00479    TDSetElement *elem = new TDSetElement("", "", "", fAssigned, fProcessing);
00480    elem->SetBit(TDSetElement::kEmpty);
00481 
00482    // Update the total counter
00483    fAssigned += slstat->fLastProcessed;
00484 
00485    return elem;
00486 }

Generated on Tue Jul 5 14:52:18 2011 for ROOT_528-00b_version by  doxygen 1.5.1