00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #include "TPacketizerUnit.h"
00026
00027 #include "Riostream.h"
00028 #include "TDSet.h"
00029 #include "TError.h"
00030 #include "TEventList.h"
00031 #include "TMap.h"
00032 #include "TMessage.h"
00033 #include "TMonitor.h"
00034 #include "TNtupleD.h"
00035 #include "TObject.h"
00036 #include "TParameter.h"
00037 #include "TPerfStats.h"
00038 #include "TProofDebug.h"
00039 #include "TProof.h"
00040 #include "TProofPlayer.h"
00041 #include "TProofServ.h"
00042 #include "TSlave.h"
00043 #include "TSocket.h"
00044 #include "TStopwatch.h"
00045 #include "TTimer.h"
00046 #include "TUrl.h"
00047 #include "TClass.h"
00048 #include "TMath.h"
00049 #include "TObjString.h"
00050
00051
00052 using namespace TMath;
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063 class TPacketizerUnit::TSlaveStat : public TVirtualPacketizer::TVirtualSlaveStat {
00064
00065 friend class TPacketizerUnit;
00066
00067 private:
00068 Long64_t fLastProcessed;
00069 Double_t fSpeed;
00070 Double_t fTimeInstant;
00071 TNtupleD *fCircNtp;
00072 Long_t fCircLvl;
00073
00074 public:
00075 TSlaveStat(TSlave *sl, TList *input);
00076 ~TSlaveStat();
00077
00078 void GetCurrentTime();
00079
00080 void UpdatePerformance(Double_t time);
00081 TProofProgressStatus *AddProcessed(TProofProgressStatus *st);
00082 };
00083
00084
00085 TPacketizerUnit::TSlaveStat::TSlaveStat(TSlave *slave, TList *input)
00086 : fLastProcessed(0),
00087 fSpeed(0), fTimeInstant(0), fCircLvl(5)
00088 {
00089
00090
00091
00092 fCircNtp = new TNtupleD("Speed Circ Ntp", "Circular process info","tm:ev");
00093 TProof::GetParameter(input, "PROOF_TPacketizerUnitCircularity", fCircLvl);
00094 fCircLvl = (fCircLvl > 0) ? fCircLvl : 5;
00095 fCircNtp->SetCircular(fCircLvl);
00096 fSlave = slave;
00097 fStatus = new TProofProgressStatus();
00098 }
00099
00100
00101 TPacketizerUnit::TSlaveStat::~TSlaveStat()
00102 {
00103
00104
00105 SafeDelete(fCircNtp);
00106 }
00107
00108
00109 void TPacketizerUnit::TSlaveStat::UpdatePerformance(Double_t time)
00110 {
00111
00112
00113 Double_t ttot = time;
00114 Double_t *ar = fCircNtp->GetArgs();
00115 Int_t ne = fCircNtp->GetEntries();
00116 if (ne <= 0) {
00117
00118 fCircNtp->Fill(0., 0);
00119 fSpeed = 0.;
00120 return;
00121 }
00122
00123 fCircNtp->GetEntry(ne-1);
00124 ttot = ar[0] + time;
00125 fCircNtp->Fill(ttot, GetEntriesProcessed());
00126
00127
00128 fCircNtp->GetEntry(0);
00129 Double_t dtime = (ttot > ar[0]) ? ttot - ar[0] : ne+1 ;
00130 Long64_t nevts = GetEntriesProcessed() - (Long64_t)ar[1];
00131 fSpeed = nevts / dtime;
00132 PDB(kPacketizer,2)
00133 Info("UpdatePerformance", "time:%f, dtime:%f, nevts:%lld, speed: %f",
00134 time, dtime, nevts, fSpeed);
00135
00136 }
00137
00138
00139 TProofProgressStatus *TPacketizerUnit::TSlaveStat::AddProcessed(TProofProgressStatus *st)
00140 {
00141
00142
00143
00144 if (st) {
00145
00146 Long64_t lastEntries = st->GetEntries() - fStatus->GetEntries();
00147
00148 fStatus->SetLastProcTime(0.);
00149
00150 TProofProgressStatus *diff = new TProofProgressStatus(*st - *fStatus);
00151 *fStatus += *diff;
00152
00153 fStatus->SetLastEntries(lastEntries);
00154 return diff;
00155 } else {
00156 Error("AddProcessed", "status arg undefined");
00157 return 0;
00158 }
00159 }
00160
00161
00162
// ROOT ClassImp macro for TPacketizerUnit.
ClassImp(TPacketizerUnit)
00164
00165
00166 TPacketizerUnit::TPacketizerUnit(TList *slaves, Long64_t num, TList *input,
00167 TProofProgressStatus *st)
00168 : TVirtualPacketizer(input, st)
00169 {
00170
00171
00172 PDB(kPacketizer,1) Info("TPacketizerUnit", "enter (num %lld)", num);
00173
00174
00175 fSlaveStats = 0;
00176 fPackets = 0;
00177
00178 Int_t fixednum = -1;
00179 if (TProof::GetParameter(input, "PROOF_PacketizerFixedNum", fixednum) != 0 || fixednum <= 0)
00180 fixednum = 0;
00181 if (fixednum == 1)
00182 Info("TPacketizerUnit", "forcing the same cycles on each worker");
00183
00184 if (TProof::GetParameter(input, "PROOF_PacketizerCalibNum", fCalibNum) != 0 || fCalibNum <= 0)
00185 fCalibNum = 5;
00186 PDB(kPacketizer,1)
00187 Info("TPacketizerUnit", "size of the calibration packets: %lld", fCalibNum);
00188
00189 fMaxPacketTime = 1.;
00190 Double_t timeLimit = -1;
00191 if (TProof::GetParameter(input, "PROOF_PacketizerTimeLimit", timeLimit) == 0) {
00192 fMaxPacketTime = timeLimit;
00193 Warning("TPacketizerUnit", "PROOF_PacketizerTimeLimit is deprecated: use PROOF_MaxPacketTime instead");
00194 }
00195 PDB(kPacketizer,1)
00196 Info("TPacketizerUnit", "time limit is %lf", fMaxPacketTime);
00197 fProcessing = 0;
00198 fAssigned = 0;
00199
00200 fStopwatch = new TStopwatch();
00201
00202 fPackets = new TList;
00203 fPackets->SetOwner();
00204
00205 fSlaveStats = new TMap;
00206 fSlaveStats->SetOwner(kFALSE);
00207
00208 TSlave *slave;
00209 TIter si(slaves);
00210 while ((slave = (TSlave*) si.Next()))
00211 fSlaveStats->Add(slave, new TSlaveStat(slave, input));
00212
00213 fTotalEntries = num;
00214 fNumPerWorker = -1;
00215 if (fixednum == 1 && fSlaveStats->GetSize() > 0) {
00216
00217 fNumPerWorker = fTotalEntries / fSlaveStats->GetSize();
00218 if (fNumPerWorker == 0) fNumPerWorker = 1;
00219 if (fCalibNum >= fNumPerWorker) fCalibNum = 1;
00220 }
00221
00222
00223
00224 fConfigParams->Add(new TParameter<Long64_t>("PROOF_PacketizerFixedNum", fNumPerWorker));
00225 fConfigParams->Add(new TParameter<Long64_t>("PROOF_PacketizerCalibNum", fCalibNum));
00226
00227 fStopwatch->Start();
00228 PDB(kPacketizer,1) Info("TPacketizerUnit", "return");
00229 }
00230
00231
00232 TPacketizerUnit::~TPacketizerUnit()
00233 {
00234
00235
00236 if (fSlaveStats)
00237 fSlaveStats->DeleteValues();
00238 SafeDelete(fSlaveStats);
00239 SafeDelete(fPackets);
00240 SafeDelete(fStopwatch);
00241 }
00242
00243
00244 Double_t TPacketizerUnit::GetCurrentTime()
00245 {
00246
00247
00248 Double_t retValue = fStopwatch->RealTime();
00249 fStopwatch->Continue();
00250 return retValue;
00251 }
00252
00253
00254 Float_t TPacketizerUnit::GetCurrentRate(Bool_t &all)
00255 {
00256
00257
00258
00259 all = kTRUE;
00260
00261 Float_t currate = 0.;
00262 if (fSlaveStats && fSlaveStats->GetSize() > 0) {
00263 TIter nxw(fSlaveStats);
00264 TObject *key;
00265 while ((key = nxw()) != 0) {
00266 TSlaveStat *slstat = (TSlaveStat *) fSlaveStats->GetValue(key);
00267 if (slstat && slstat->GetProgressStatus() && slstat->GetEntriesProcessed() > 0) {
00268
00269 currate += slstat->GetProgressStatus()->GetCurrentRate();
00270 } else {
00271 all = kFALSE;
00272 }
00273 }
00274 }
00275
00276 return currate;
00277 }
00278
00279
00280 TDSetElement *TPacketizerUnit::GetNextPacket(TSlave *sl, TMessage *r)
00281 {
00282
00283
00284 if (!fValid)
00285 return 0;
00286
00287
00288 TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue(sl);
00289 R__ASSERT(slstat != 0);
00290
00291 PDB(kPacketizer,2)
00292 Info("GetNextPacket","worker-%s: fAssigned %lld\t", sl->GetOrdinal(), fAssigned);
00293
00294
00295 Double_t latency = 0., proctime = 0., proccpu = 0.;
00296 Long64_t bytesRead = -1;
00297 Long64_t totalEntries = -1;
00298 Long64_t totev = 0;
00299 Long64_t numev = -1;
00300
00301 TProofProgressStatus *status = 0;
00302 if (sl->GetProtocol() > 18) {
00303 (*r) >> latency;
00304 (*r) >> status;
00305
00306
00307 TProofProgressStatus *progress = 0;
00308 if (status) {
00309
00310 numev = status->GetEntries() - slstat->GetEntriesProcessed();
00311 progress = slstat->AddProcessed(status);
00312 if (progress) {
00313
00314 proctime = progress->GetProcTime();
00315 proccpu = progress->GetCPUTime();
00316 totev = status->GetEntries();
00317 bytesRead = progress->GetBytesRead();
00318 delete progress;
00319 }
00320 delete status;
00321 } else
00322 Error("GetNextPacket", "no status came in the kPROOF_GETPACKET message");
00323 } else {
00324
00325 (*r) >> latency >> proctime >> proccpu;
00326
00327
00328 if (r->BufferSize() > r->Length()) (*r) >> bytesRead;
00329 if (r->BufferSize() > r->Length()) (*r) >> totalEntries;
00330 if (r->BufferSize() > r->Length()) (*r) >> totev;
00331
00332 numev = totev - slstat->GetEntriesProcessed();
00333 slstat->GetProgressStatus()->IncEntries(numev);
00334 }
00335
00336 fProgressStatus->IncEntries(numev);
00337
00338 fProcessing = 0;
00339
00340 PDB(kPacketizer,2)
00341 Info("GetNextPacket","worker-%s (%s): %lld %7.3lf %7.3lf %7.3lf %lld",
00342 sl->GetOrdinal(), sl->GetName(),
00343 numev, latency, proctime, proccpu, bytesRead);
00344
00345 if (gPerfStats != 0) {
00346 gPerfStats->PacketEvent(sl->GetOrdinal(), sl->GetName(), "", numev,
00347 latency, proctime, proccpu, bytesRead);
00348 }
00349
00350 if (fNumPerWorker > 0 && slstat->GetEntriesProcessed() >= fNumPerWorker) {
00351 PDB(kPacketizer,2)
00352 Info("GetNextPacket","worker-%s (%s) is done (%lld cycles)",
00353 sl->GetOrdinal(), sl->GetName(), slstat->GetEntriesProcessed());
00354 return 0;
00355 }
00356
00357 if (fAssigned == fTotalEntries) {
00358
00359 HandleTimer(0);
00360 return 0;
00361 }
00362
00363 if (fStop) {
00364
00365 HandleTimer(0);
00366 return 0;
00367 }
00368
00369
00370 Long64_t num;
00371
00372
00373 Double_t cTime = GetCurrentTime();
00374
00375 if (slstat->fCircNtp->GetEntries() <= 0) {
00376
00377 PDB(kPacketizer,2)
00378 Info("GetNextPacket", "calibration: total entries %lld, workers %d",
00379 fTotalEntries, fSlaveStats->GetSize());
00380 Long64_t avg = fTotalEntries / fSlaveStats->GetSize();
00381 num = (avg > fCalibNum) ? fCalibNum : avg;
00382
00383
00384 slstat->UpdatePerformance(0.);
00385
00386 } else {
00387 if (fNumPerWorker < 0) {
00388
00389
00390
00391
00392 slstat->UpdatePerformance(proctime);
00393
00394 TMapIter *iter = (TMapIter *)fSlaveStats->MakeIterator();
00395 TSlave * tmpSlave;
00396 TSlaveStat * tmpStat;
00397
00398 Double_t sumSpeed = 0;
00399 Double_t sumBusy = 0;
00400
00401
00402
00403
00404
00405
00406
00407
00408
00409
00410 while ((tmpSlave = (TSlave *)iter->Next())) {
00411 tmpStat = (TSlaveStat *)fSlaveStats->GetValue(tmpSlave);
00412
00413 if ((cTime - tmpStat->fTimeInstant) > 4*fMaxPacketTime)
00414 tmpStat->fSpeed = 0;
00415 PDB(kPacketizer,2)
00416 Info("GetNextPacket",
00417 "worker-%s: speed %lf", tmpSlave->GetOrdinal(), tmpStat->fSpeed);
00418 if (tmpStat->fSpeed > 0) {
00419
00420 sumSpeed += tmpStat->fSpeed;
00421
00422 if ((tmpStat->fTimeInstant) && (cTime - tmpStat->fTimeInstant > 0)) {
00423
00424
00425
00426
00427 Double_t busyspeed =
00428 ((tmpStat->fTimeInstant - cTime)*tmpStat->fSpeed + tmpStat->fLastProcessed);
00429 if (busyspeed > 0)
00430 sumBusy += busyspeed;
00431 }
00432 }
00433 }
00434 SafeDelete(iter);
00435 PDB(kPacketizer,2)
00436 Info("GetNextPacket", "worker-%s: sum speed: %lf, sum busy: %f",
00437 sl->GetOrdinal(), sumSpeed, sumBusy);
00438
00439 if (slstat->fSpeed > 0 &&
00440 (fTotalEntries - fAssigned)/(slstat->fSpeed) < fMaxPacketTime) {
00441 num = (fTotalEntries - fAssigned);
00442 } else {
00443 if (slstat->fSpeed > 0) {
00444
00445 Double_t optTime = (fTotalEntries - fAssigned + sumBusy)/sumSpeed;
00446
00447
00448
00449 num = (optTime > fMaxPacketTime) ? Nint(fMaxPacketTime*(slstat->fSpeed))
00450 : Nint(optTime*(slstat->fSpeed));
00451 PDB(kPacketizer,2)
00452 Info("GetNextPacket", "opTime %lf num %lld speed %lf", optTime, num, slstat->fSpeed);
00453 } else {
00454 Long64_t avg = fTotalEntries/(fSlaveStats->GetSize());
00455 num = (avg > 5) ? 5 : avg;
00456 }
00457 }
00458 } else {
00459
00460 num = fNumPerWorker - slstat->fLastProcessed;
00461 if (num > 1 && slstat->fSpeed > 0 && num / slstat->fSpeed > fMaxPacketTime) {
00462 num = (Long64_t) (slstat->fSpeed * fMaxPacketTime);
00463 }
00464 }
00465 }
00466
00467 num = (num > 1) ? num : 1;
00468 fProcessing = (num < (fTotalEntries - fAssigned)) ? num
00469 : (fTotalEntries - fAssigned);
00470
00471
00472 slstat->fLastProcessed = fProcessing;
00473
00474 slstat->fTimeInstant = cTime;
00475
00476 PDB(kPacketizer,2)
00477 Info("GetNextPacket", "worker-%s: num %lld, processing %lld, remaining %lld",sl->GetOrdinal(),
00478 num, fProcessing, (fTotalEntries - fAssigned - fProcessing));
00479 TDSetElement *elem = new TDSetElement("", "", "", fAssigned, fProcessing);
00480 elem->SetBit(TDSetElement::kEmpty);
00481
00482
00483 fAssigned += slstat->fLastProcessed;
00484
00485 return elem;
00486 }