00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include "TPacketizerFile.h"
00023
00024 #include "Riostream.h"
00025 #include "TDSet.h"
00026 #include "TError.h"
00027 #include "TEventList.h"
00028 #include "TMap.h"
00029 #include "TMessage.h"
00030 #include "TMonitor.h"
00031 #include "TNtupleD.h"
00032 #include "TObject.h"
00033 #include "TParameter.h"
00034 #include "TPerfStats.h"
00035 #include "TProofDebug.h"
00036 #include "TProof.h"
00037 #include "TProofPlayer.h"
00038 #include "TProofServ.h"
00039 #include "TSlave.h"
00040 #include "TSocket.h"
00041 #include "TStopwatch.h"
00042 #include "TTimer.h"
00043 #include "TUrl.h"
00044 #include "TClass.h"
00045 #include "TMath.h"
00046 #include "TObjString.h"
00047 #include "TFileInfo.h"
00048 #include "TFileCollection.h"
00049 #include "THashList.h"
00050
00051
00052
00053 class TPacketizerFile::TSlaveStat : public TVirtualPacketizer::TVirtualSlaveStat {
00054
00055 friend class TPacketizerFile;
00056
00057 private:
00058 Long64_t fLastProcessed;
00059 Double_t fSpeed;
00060 Double_t fTimeInstant;
00061 TNtupleD *fCircNtp;
00062 Long_t fCircLvl;
00063
00064 public:
00065 TSlaveStat(TSlave *sl, TList *input);
00066 ~TSlaveStat();
00067
00068 void GetCurrentTime();
00069
00070 void UpdatePerformance(Double_t time);
00071 TProofProgressStatus *AddProcessed(TProofProgressStatus *st);
00072 };
00073
00074
00075 class TPacketizerFile::TIterObj : public TObject {
00076
00077 private:
00078 TString fName;
00079 TIter *fIter;
00080
00081 public:
00082 TIterObj(const char *n, TIter *iter) : fName(n), fIter(iter) { }
00083 virtual ~TIterObj() { if (fIter) delete fIter; }
00084
00085 const char *GetName() const {return fName;}
00086 TIter *GetIter() const {return fIter;}
00087 void Print(Option_t* option = "") const;
00088 };
00089
00090 ClassImp(TPacketizerFile)
00091
00092
00093 TPacketizerFile::TPacketizerFile(TList *workers, Long64_t, TList *input,
00094 TProofProgressStatus *st)
00095 : TVirtualPacketizer(input, st)
00096 {
00097
00098
00099 PDB(kPacketizer,1) Info("TPacketizerFile", "enter");
00100 ResetBit(TObject::kInvalidObject);
00101 fValid = kFALSE;
00102 fAssigned = 0;
00103 fProcNotAssigned = kTRUE;
00104
00105 if (!input || (input && input->GetSize() <= 0)) {
00106 Error("TPacketizerFile", "input file is undefined or empty!");
00107 SetBit(TObject::kInvalidObject);
00108 return;
00109 }
00110
00111
00112 Int_t procnotass = 1;
00113 if (TProof::GetParameter(input, "PROOF_ProcessNotAssigned", procnotass) == 0) {
00114 if (procnotass == 0) {
00115 Info("TPacketizerFile", "files not assigned to workers will not be processed");
00116 fProcNotAssigned = kFALSE;
00117 }
00118 }
00119
00120
00121 if (!(fFiles = dynamic_cast<TMap *>(input->FindObject("PROOF_FilesToProcess")))) {
00122 Error("TPacketizerFile", "map of files to be processed/created not found");
00123 SetBit(TObject::kInvalidObject);
00124 return;
00125 }
00126
00127
00128 fSlaveStats = new TMap;
00129 fSlaveStats->SetOwner(kFALSE);
00130
00131 TList nodes;
00132 nodes.SetOwner(kTRUE);
00133 TSlave *wrk;
00134 TIter si(workers);
00135 while ((wrk = (TSlave *) si.Next())) {
00136 fSlaveStats->Add(wrk, new TSlaveStat(wrk, input));
00137 TString wrkname = TUrl(wrk->GetName()).GetHostFQDN();
00138 Info("TPacketizerFile", "worker: %s", wrkname.Data());
00139 if (!nodes.FindObject(wrkname)) nodes.Add(new TObjString(wrkname));
00140 }
00141
00142
00143 fIters = new TList;
00144 fIters->SetOwner(kTRUE);
00145
00146
00147 fTotalEntries = 0;
00148 fNotAssigned = new TList;
00149 fNotAssigned->SetName("*");
00150 TIter nxl(fFiles);
00151 TObject *key, *o = 0;
00152 while ((key = nxl()) != 0) {
00153 THashList *wrklist = dynamic_cast<THashList *>(fFiles->GetValue(key));
00154 if (!wrklist) {
00155 TFileCollection *fc = dynamic_cast<TFileCollection *>(fFiles->GetValue(key));
00156 if (fc) wrklist = fc->GetList();
00157 }
00158 if (wrklist) {
00159 TString hname = TUrl(key->GetName()).GetHostFQDN();
00160 if ((o = nodes.FindObject(hname))) {
00161 fTotalEntries += wrklist->GetSize();
00162 fIters->Add(new TIterObj(hname, new TIter(wrklist)));
00163
00164 PDB(kPacketizer,2)
00165 Info("TPacketizerFile", "%d files of '%s' (fqdn: '%s') assigned to '%s'",
00166 wrklist->GetSize(), key->GetName(), hname.Data(), o->GetName());
00167 } else {
00168
00169
00170 TIter nxf(wrklist);
00171 while ((o = nxf()))
00172 fNotAssigned->Add(o);
00173
00174 PDB(kPacketizer,2)
00175 Info("TPacketizerFile", "%d files of '%s' (fqdn: '%s') not assigned",
00176 wrklist->GetSize(), key->GetName(), hname.Data());
00177 }
00178 }
00179 }
00180 if (fNotAssigned && fNotAssigned->GetSize() > 0) {
00181 fTotalEntries += fNotAssigned->GetSize();
00182 fIters->Add(new TIterObj("*", new TIter(fNotAssigned)));
00183 Info("TPacketizerFile", "non-assigned files: %d", fNotAssigned->GetSize());
00184 fNotAssigned->Print();
00185 }
00186 if (fTotalEntries <= 0) {
00187 Error("TPacketizerFile", "no file path in the map!");
00188 SetBit(TObject::kInvalidObject);
00189 SafeDelete(fIters);
00190 return;
00191 } else {
00192 Info("TPacketizerFile", "processing %lld files", fTotalEntries);
00193 fIters->Print();
00194 }
00195
00196 fStopwatch = new TStopwatch();
00197 fStopwatch->Start();
00198 fValid = kTRUE;
00199 PDB(kPacketizer,1) Info("TPacketizerFile", "return");
00200
00201
00202 return;
00203 }
00204
00205
00206 TPacketizerFile::~TPacketizerFile()
00207 {
00208
00209
00210 if (fNotAssigned) fNotAssigned->SetOwner(kFALSE);
00211 SafeDelete(fNotAssigned);
00212 if (fIters) fIters->SetOwner(kTRUE);
00213 SafeDelete(fIters);
00214 SafeDelete(fStopwatch);
00215 }
00216
00217
00218 Double_t TPacketizerFile::GetCurrentTime()
00219 {
00220
00221
00222 Double_t retValue = fStopwatch->RealTime();
00223 fStopwatch->Continue();
00224 return retValue;
00225 }
00226
00227
00228 Float_t TPacketizerFile::GetCurrentRate(Bool_t &all)
00229 {
00230
00231
00232
00233 all = kTRUE;
00234
00235 Float_t currate = 0.;
00236 if (fSlaveStats && fSlaveStats->GetSize() > 0) {
00237 TIter nxw(fSlaveStats);
00238 TObject *key;
00239 while ((key = nxw()) != 0) {
00240 TSlaveStat *wrkstat = (TSlaveStat *) fSlaveStats->GetValue(key);
00241 if (wrkstat && wrkstat->GetProgressStatus() && wrkstat->GetEntriesProcessed() > 0) {
00242
00243 currate += wrkstat->GetProgressStatus()->GetCurrentRate();
00244 } else {
00245 all = kFALSE;
00246 }
00247 }
00248 }
00249
00250 return currate;
00251 }
00252
00253
00254 TDSetElement *TPacketizerFile::GetNextPacket(TSlave *wrk, TMessage *r)
00255 {
00256
00257
00258 TDSetElement *elem = 0;
00259 if (!fValid) return elem;
00260
00261
00262 TSlaveStat *wrkstat = (TSlaveStat *) fSlaveStats->GetValue(wrk);
00263 if (!wrkstat) {
00264 Error("GetNextPacket", "could not find stat object for worker '%s'!", wrk->GetName());
00265 return elem;
00266 }
00267
00268 PDB(kPacketizer,2)
00269 Info("GetNextPacket","worker-%s: fAssigned %lld / %lld", wrk->GetOrdinal(), fAssigned, fTotalEntries);
00270
00271
00272 Double_t latency = 0., proctime = 0., proccpu = 0.;
00273 Long64_t bytesRead = -1;
00274 Long64_t totalEntries = -1;
00275 Long64_t totev = 0;
00276 Long64_t numev = -1;
00277
00278 TProofProgressStatus *status = 0;
00279 if (wrk->GetProtocol() > 18) {
00280 (*r) >> latency;
00281 (*r) >> status;
00282
00283
00284 TProofProgressStatus *progress = 0;
00285 if (status) {
00286
00287 numev = status->GetEntries() - wrkstat->GetEntriesProcessed();
00288 progress = wrkstat->AddProcessed(status);
00289 if (progress) {
00290
00291 proctime = progress->GetProcTime();
00292 proccpu = progress->GetCPUTime();
00293 totev = status->GetEntries();
00294 bytesRead = progress->GetBytesRead();
00295 delete progress;
00296 }
00297 delete status;
00298 } else
00299 Error("GetNextPacket", "no status came in the kPROOF_GETPACKET message");
00300 } else {
00301
00302 (*r) >> latency >> proctime >> proccpu;
00303
00304
00305 if (r->BufferSize() > r->Length()) (*r) >> bytesRead;
00306 if (r->BufferSize() > r->Length()) (*r) >> totalEntries;
00307 if (r->BufferSize() > r->Length()) (*r) >> totev;
00308
00309 numev = totev - wrkstat->GetEntriesProcessed();
00310 wrkstat->GetProgressStatus()->IncEntries(numev);
00311 }
00312
00313 fProgressStatus->IncEntries(numev);
00314
00315 PDB(kPacketizer,2)
00316 Info("GetNextPacket","worker-%s (%s): %lld %7.3lf %7.3lf %7.3lf %lld",
00317 wrk->GetOrdinal(), wrk->GetName(),
00318 numev, latency, proctime, proccpu, bytesRead);
00319
00320 if (gPerfStats != 0) {
00321 gPerfStats->PacketEvent(wrk->GetOrdinal(), wrk->GetName(), "", numev,
00322 latency, proctime, proccpu, bytesRead);
00323 }
00324
00325 if (fAssigned == fTotalEntries) {
00326
00327 HandleTimer(0);
00328 return 0;
00329 }
00330
00331 if (fStop) {
00332
00333 HandleTimer(0);
00334 return 0;
00335 }
00336
00337 PDB(kPacketizer,2)
00338 Info("GetNextPacket", "worker-%s (%s): getting next files ... ", wrk->GetOrdinal(),
00339 wrk->GetName());
00340
00341
00342 TObject *nextfile = 0;
00343
00344
00345 TIterObj *io = dynamic_cast<TIterObj *>(fIters->FindObject(wrk->GetName()));
00346 if (io) {
00347
00348 if (io->GetIter())
00349 nextfile = io->GetIter()->Next();
00350 }
00351
00352
00353
00354 if (!nextfile && fProcNotAssigned) {
00355 if ((io = dynamic_cast<TIterObj *>(fIters->FindObject("*")))) {
00356
00357 if (io->GetIter())
00358 nextfile = io->GetIter()->Next();
00359 }
00360 }
00361
00362
00363 if (!nextfile) return elem;
00364
00365
00366 TString filename;
00367 TObjString *os = 0;
00368 if ((os = dynamic_cast<TObjString *>(nextfile))) {
00369 filename = os->GetName();
00370 } else {
00371 TFileInfo *fi = 0;
00372 if ((fi = dynamic_cast<TFileInfo *>(nextfile)))
00373 filename = fi->GetCurrentUrl()->GetUrl();
00374 }
00375
00376 if (filename.IsNull()) {
00377 Warning("GetNextPacket", "found unsupported object of type '%s' in list: it must"
00378 " be 'TObjString' or 'TFileInfo'", nextfile->GetName());
00379 return elem;
00380 }
00381
00382 PDB(kPacketizer,2)
00383 Info("GetNextPacket", "worker-%s: assigning: '%s' (remaining %lld files)",
00384 wrk->GetOrdinal(), filename.Data(), (fTotalEntries - fAssigned));
00385 elem = new TDSetElement(filename, "", "", 0, 1);
00386 elem->SetBit(TDSetElement::kEmpty);
00387
00388
00389 fAssigned += 1;
00390
00391 return elem;
00392 }
00393
00394
00395
00396
00397 TPacketizerFile::TSlaveStat::TSlaveStat(TSlave *slave, TList *input)
00398 : fLastProcessed(0),
00399 fSpeed(0), fTimeInstant(0), fCircLvl(5)
00400 {
00401
00402
00403
00404 fCircNtp = new TNtupleD("Speed Circ Ntp", "Circular process info","tm:ev");
00405 TProof::GetParameter(input, "PROOF_TPacketizerFileCircularity", fCircLvl);
00406 fCircLvl = (fCircLvl > 0) ? fCircLvl : 5;
00407 fCircNtp->SetCircular(fCircLvl);
00408 fSlave = slave;
00409 fStatus = new TProofProgressStatus();
00410 }
00411
00412
00413 TPacketizerFile::TSlaveStat::~TSlaveStat()
00414 {
00415
00416
00417 SafeDelete(fCircNtp);
00418 }
00419
00420
00421 void TPacketizerFile::TSlaveStat::UpdatePerformance(Double_t time)
00422 {
00423
00424
00425 Double_t ttot = time;
00426 Double_t *ar = fCircNtp->GetArgs();
00427 Int_t ne = fCircNtp->GetEntries();
00428 if (ne <= 0) {
00429
00430 fCircNtp->Fill(0., 0);
00431 fSpeed = 0.;
00432 return;
00433 }
00434
00435 fCircNtp->GetEntry(ne-1);
00436 ttot = ar[0] + time;
00437 fCircNtp->Fill(ttot, GetEntriesProcessed());
00438
00439
00440 fCircNtp->GetEntry(0);
00441 Double_t dtime = (ttot > ar[0]) ? ttot - ar[0] : ne+1 ;
00442 Long64_t nevts = GetEntriesProcessed() - (Long64_t)ar[1];
00443 fSpeed = nevts / dtime;
00444 PDB(kPacketizer,2)
00445 Info("UpdatePerformance", "time:%f, dtime:%f, nevts:%lld, speed: %f",
00446 time, dtime, nevts, fSpeed);
00447
00448 }
00449
00450
00451 TProofProgressStatus *TPacketizerFile::TSlaveStat::AddProcessed(TProofProgressStatus *st)
00452 {
00453
00454
00455
00456 if (st) {
00457
00458 Long64_t lastEntries = st->GetEntries() - fStatus->GetEntries();
00459
00460 fStatus->SetLastProcTime(0.);
00461
00462 TProofProgressStatus *diff = new TProofProgressStatus(*st - *fStatus);
00463 *fStatus += *diff;
00464
00465 fStatus->SetLastEntries(lastEntries);
00466 return diff;
00467 } else {
00468 Error("AddProcessed", "status arg undefined");
00469 return 0;
00470 }
00471 }
00472
00473
00474 void TPacketizerFile::TIterObj::Print(Option_t *) const
00475 {
00476
00477
00478 Printf("Iterator '%s' controls %d units", GetName(),
00479 ((GetIter() && GetIter()->GetCollection()) ? GetIter()->GetCollection()->GetSize()
00480 : -1));
00481 }