00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034 #include "TVirtualPacketizer.h"
00035 #include "TEnv.h"
00036 #include "TFile.h"
00037 #include "TTree.h"
00038 #include "TKey.h"
00039 #include "TDSet.h"
00040 #include "TError.h"
00041 #include "TEventList.h"
00042 #include "TEntryList.h"
00043 #include "TMap.h"
00044 #include "TMessage.h"
00045 #include "TObjString.h"
00046 #include "TParameter.h"
00047
00048 #include "TProof.h"
00049 #include "TProofDebug.h"
00050 #include "TProofPlayer.h"
00051 #include "TProofServ.h"
00052 #include "TSlave.h"
00053 #include "TSocket.h"
00054 #include "TTimer.h"
00055 #include "TUrl.h"
00056 #include "TMath.h"
00057 #include "TMonitor.h"
00058 #include "TNtuple.h"
00059 #include "TNtupleD.h"
00060 #include "TPerfStats.h"
00061
00062 ClassImp(TVirtualPacketizer)
00063
00064
00065 TVirtualPacketizer::TVirtualPacketizer(TList *input, TProofProgressStatus *st)
00066 {
00067
00068
00069
00070 fMinPacketTime = 3;
00071 Double_t minPacketTime = 0;
00072 if (TProof::GetParameter(input, "PROOF_MinPacketTime", minPacketTime) == 0) {
00073 Info("TVirtualPacketizer", "setting minimum time for a packet to %f",
00074 minPacketTime);
00075 fMinPacketTime = (Int_t) minPacketTime;
00076 }
00077 fMaxPacketTime = 20;
00078 Double_t maxPacketTime = 0;
00079 if (TProof::GetParameter(input, "PROOF_MaxPacketTime", maxPacketTime) == 0) {
00080 Info("TVirtualPacketizer", "setting maximum packet time for a packet to %f",
00081 maxPacketTime);
00082 fMaxPacketTime = (Int_t) maxPacketTime;
00083 }
00084 ResetBit(TVirtualPacketizer::kIsTree);
00085
00086
00087
00088 fConfigParams = new TList;
00089 fConfigParams->SetName("PROOF_PacketizerConfigParams");
00090 fConfigParams->Add(new TParameter<Double_t>("PROOF_MinPacketTime", fMinPacketTime));
00091 fConfigParams->Add(new TParameter<Double_t>("PROOF_MaxPacketTime", fMaxPacketTime));
00092
00093 fProgressStatus = st;
00094 if (!fProgressStatus) {
00095 Error("TVirtualPacketizer", "No progress status");
00096 return;
00097 }
00098 fTotalEntries = 0;
00099 fValid = kTRUE;
00100 fStop = kFALSE;
00101 fFailedPackets = 0;
00102 fDataSet = "";
00103 fSlaveStats = 0;
00104
00105
00106 fStartTime = gSystem->Now();
00107 SetBit(TVirtualPacketizer::kIsInitializing);
00108 ResetBit(TVirtualPacketizer::kIsDone);
00109 fInitTime = 0;
00110 fProcTime = 0;
00111 fTimeUpdt = -1.;
00112
00113
00114 fCircProg = new TNtupleD("CircNtuple","Circular progress info","tm:ev:mb:rc:al");
00115 fCircN = 5;
00116 TProof::GetParameter(input, "PROOF_ProgressCircularity", fCircN);
00117 fCircProg->SetCircular(fCircN);
00118
00119
00120
00121 TString startProgress("yes");
00122 TProof::GetParameter(input, "PROOF_StartProgressTimer", startProgress);
00123
00124
00125
00126
00127 fProgress = 0;
00128 if (startProgress == "yes") {
00129 Long_t period = 500;
00130 TProof::GetParameter(input, "PROOF_ProgressPeriod", period);
00131 fProgress = new TTimer;
00132 fProgress->SetObject(this);
00133 fProgress->Start(period, kFALSE);
00134 }
00135
00136
00137 TString saveProgressPerf("no");
00138 TProof::GetParameter(input, "PROOF_SaveProgressPerf", saveProgressPerf);
00139 fProgressPerf = 0;
00140 if (fProgress && saveProgressPerf == "yes")
00141 fProgressPerf = new TNtuple("PROOF_ProgressPerfNtuple",
00142 "{Active workers, evt rate, MB read} vs processing time", "tm:aw:er:mb:ns");
00143 fProcTimeLast = -1.;
00144 fActWrksLast = -1;
00145 fEvtRateLast = -1.;
00146 fMBsReadLast = -1.;
00147 fEffSessLast = -1.;
00148 fAWLastFill = kFALSE;
00149 fReportPeriod = -1.;
00150
00151
00152 TString estopt;
00153 TProof::GetParameter(input, "PROOF_RateEstimation", estopt);
00154 if (estopt.IsNull()) {
00155
00156 estopt = gEnv->GetValue("Proof.RateEstimation", "");
00157 }
00158 fUseEstOpt = kEstOff;
00159 if (estopt == "current")
00160 fUseEstOpt = kEstCurrent;
00161 else if (estopt == "average")
00162 fUseEstOpt = kEstAverage;
00163 }
00164
00165
00166 TVirtualPacketizer::~TVirtualPacketizer()
00167 {
00168
00169
00170 SafeDelete(fCircProg);
00171 SafeDelete(fProgress);
00172 SafeDelete(fFailedPackets);
00173 SafeDelete(fConfigParams);
00174 SafeDelete(fProgressPerf);
00175 fProgressStatus = 0;
00176 }
00177
00178
00179 Long64_t TVirtualPacketizer::GetEntries(Bool_t tree, TDSetElement *e)
00180 {
00181
00182
00183 Long64_t entries;
00184 TFile *file = TFile::Open(e->GetFileName());
00185
00186 if ( file->IsZombie() ) {
00187 Error("GetEntries","Cannot open file: %s (%s)",
00188 e->GetFileName(), strerror(file->GetErrno()) );
00189 return -1;
00190 }
00191
00192 TDirectory *dirsave = gDirectory;
00193 if ( ! file->cd(e->GetDirectory()) ) {
00194 Error("GetEntries","Cannot cd to: %s", e->GetDirectory() );
00195 delete file;
00196 return -1;
00197 }
00198 TDirectory *dir = gDirectory;
00199 dirsave->cd();
00200
00201 if ( tree ) {
00202 TKey *key = dir->GetKey(e->GetObjName());
00203 if ( key == 0 ) {
00204 Error("GetEntries","Cannot find tree \"%s\" in %s",
00205 e->GetObjName(), e->GetFileName() );
00206 delete file;
00207 return -1;
00208 }
00209 TTree *t = (TTree *) key->ReadObj();
00210 if ( t == 0 ) {
00211
00212 delete file;
00213 return -1;
00214 }
00215 entries = (Long64_t) t->GetEntries();
00216 delete t;
00217
00218 } else {
00219 TList *keys = dir->GetListOfKeys();
00220 entries = keys->GetSize();
00221 }
00222
00223 delete file;
00224
00225 return entries;
00226 }
00227
00228
00229 TDSetElement *TVirtualPacketizer::GetNextPacket(TSlave *, TMessage *)
00230 {
00231
00232
00233 AbstractMethod("GetNextPacket");
00234 return 0;
00235 }
00236
00237
00238 void TVirtualPacketizer::StopProcess(Bool_t )
00239 {
00240
00241
00242 fStop = kTRUE;
00243 }
00244
00245
00246 TDSetElement* TVirtualPacketizer::CreateNewPacket(TDSetElement* base,
00247 Long64_t first, Long64_t num)
00248 {
00249
00250
00251
00252
00253 TDSetElement* elem = new TDSetElement(base->GetFileName(), base->GetObjName(),
00254 base->GetDirectory(), first, num,
00255 0, fDataSet.Data());
00256
00257
00258 TList *friends = base->GetListOfFriends();
00259 if (friends) {
00260 TIter nxf(friends);
00261 TDSetElement *fe = 0;
00262 while ((fe = (TDSetElement *) nxf())) {
00263 TDSetElement *xfe = new TDSetElement(fe->GetFileName(), fe->GetObjName(),
00264 fe->GetDirectory(), first, num);
00265
00266 elem->AddFriend(xfe, 0);
00267 }
00268 }
00269
00270 return elem;
00271 }
00272
00273
00274 Bool_t TVirtualPacketizer::HandleTimer(TTimer *)
00275 {
00276
00277
00278 PDB(kPacketizer,2)
00279 Info("HandleTimer", "fProgress: %p, isDone: %d",
00280 fProgress, TestBit(TVirtualPacketizer::kIsDone));
00281
00282 if (fProgress == 0 || TestBit(TVirtualPacketizer::kIsDone))
00283 return kFALSE;
00284
00285
00286 TTime tnow = gSystem->Now();
00287 Float_t now = Long64_t(tnow - fStartTime) / (Float_t)1000.;
00288 Long64_t estent = GetEntriesProcessed();
00289 Long64_t estmb = GetBytesRead();
00290 Long64_t estrc = GetReadCalls();
00291
00292
00293 Float_t evtrti = -1., mbrti = -1.;
00294 if (TestBit(TVirtualPacketizer::kIsInitializing)) {
00295
00296 fInitTime = now;
00297 } else {
00298
00299 if (fCircProg->GetEntries() <= 0) {
00300 fCircProg->Fill((Double_t)0., 0., 0., 0., 0.);
00301 }
00302
00303 fTimeUpdt = now - fProcTime;
00304
00305 fProcTime = now - fInitTime;
00306
00307 Double_t *ar = fCircProg->GetArgs();
00308 fCircProg->GetEntry(fCircProg->GetEntries()-1);
00309
00310 Bool_t all = kTRUE;
00311 evtrti = GetCurrentRate(all);
00312 Double_t xall = (all) ? 1. : 0.;
00313 GetEstEntriesProcessed(0, estent, estmb, estrc);
00314
00315 Double_t evts = (Double_t) estent;
00316 Double_t mbs = (estmb > 0) ? estmb / TMath::Power(2.,20.) : 0.;
00317 Double_t rcs = (Double_t) estrc;
00318 fCircProg->Fill((Double_t)fProcTime, evts, mbs, rcs, xall);
00319 fCircProg->GetEntry(fCircProg->GetEntries()-2);
00320 if (all) {
00321 Double_t dt = (Double_t)fProcTime - ar[0];
00322 Long64_t de = (evts > ar[1]) ? (Long64_t) (evts - ar[1]) : 0;
00323 Long64_t db = (mbs > ar[2]) ? (Long64_t) ((mbs - ar[2])*TMath::Power(2.,20.)) : 0;
00324 if (gPerfStats)
00325 gPerfStats->RateEvent((Double_t)fProcTime, dt, de, db);
00326
00327 Double_t rc = (Double_t)estrc - ar[3];
00328 mbrti = (rc > 0 && mbs > ar[2]) ? (Float_t) (mbs - ar[2]) / rc : 0. ;
00329 }
00330
00331 if (fTotalEntries > 0 && GetEntriesProcessed() >= fTotalEntries)
00332 SetBit(TVirtualPacketizer::kIsDone);
00333 PDB(kPacketizer,2)
00334 Info("HandleTimer", "ent:%lld, bytes:%lld, proct:%f, evtrti:%f, mbrti:%f (%f,%f)",
00335 estent, estmb, fProcTime, evtrti, mbrti, mbs, ar[2]);
00336 }
00337
00338 if (gProofServ) {
00339
00340 TMessage m(kPROOF_PROGRESS);
00341 if (gProofServ->GetProtocol() > 25) {
00342 Int_t actw = GetActiveWorkers();
00343 Int_t acts = gProofServ->GetActSessions();
00344 Float_t effs = gProofServ->GetEffSessions();
00345 if (fProgressPerf && estent > 0) {
00346
00347 if (fProcTime > 0.) {
00348 fReportPeriod = (Float_t) fTotalEntries / (Double_t) estent * fProcTime / 100.;
00349 if (fReportPeriod > 0. && fReportPeriod < 5.) fReportPeriod = 5.;
00350 }
00351
00352 if (fProgressPerf->GetEntries() <= 0) {
00353
00354 fProgressPerf->Fill(fProcTime, (Float_t)actw, -1., -1., -1.);
00355 } else {
00356
00357 Float_t *far = fProgressPerf->GetArgs();
00358 fProgressPerf->GetEntry(fProgressPerf->GetEntries()-1);
00359 Bool_t doReport = (fReportPeriod > 0. &&
00360 (fProcTime - far[0]) >= fReportPeriod) ? kTRUE : kFALSE;
00361 Float_t mbsread = estmb / 1024. / 1024.;
00362 if (TMath::Abs((Float_t)actw - far[1]) > 0.1) {
00363 if (fAWLastFill)
00364 fProgressPerf->Fill(fProcTimeLast, (Float_t)fActWrksLast,
00365 fEvtRateLast, fMBsReadLast, fEffSessLast);
00366 fProgressPerf->Fill(fProcTime, (Float_t)actw, evtrti, mbsread, effs);
00367 fAWLastFill = kFALSE;
00368 } else if (doReport) {
00369 fProgressPerf->Fill(fProcTime, (Float_t)actw, evtrti, mbsread, effs);
00370 fAWLastFill = kFALSE;
00371 } else {
00372 fAWLastFill = kTRUE;
00373 }
00374 fProcTimeLast = fProcTime;
00375 fActWrksLast = actw;
00376 fEvtRateLast = evtrti;
00377 fMBsReadLast = mbsread;
00378 fEffSessLast = effs;
00379 }
00380 }
00381
00382 TProofProgressInfo pi(fTotalEntries, estent, estmb, fInitTime,
00383 fProcTime, evtrti, mbrti, actw, acts, effs);
00384 m << π
00385 } else if (gProofServ->GetProtocol() > 11) {
00386
00387 m << fTotalEntries << estent << estmb << fInitTime << fProcTime
00388 << evtrti << mbrti;
00389 } else {
00390
00391 m << fTotalEntries << GetEntriesProcessed();
00392 }
00393
00394 gProofServ->GetSocket()->Send(m);
00395
00396 } else {
00397 if (gProof && gProof->GetPlayer()) {
00398
00399 gProof->GetPlayer()->Progress(fTotalEntries, estent, estmb,
00400 fInitTime, fProcTime, evtrti, mbrti);
00401 }
00402 }
00403
00404
00405 if (fTotalEntries > 0 && GetEntriesProcessed() >= fTotalEntries)
00406 SetBit(TVirtualPacketizer::kIsDone);
00407
00408 return kFALSE;
00409 }
00410
00411
00412 void TVirtualPacketizer::SetInitTime()
00413 {
00414
00415
00416 if (TestBit(TVirtualPacketizer::kIsInitializing)) {
00417 fInitTime = Long64_t(gSystem->Now() - fStartTime) / (Float_t)1000.;
00418 ResetBit(TVirtualPacketizer::kIsInitializing);
00419 PDB(kPacketizer,2)
00420 Info("SetInitTime","fInitTime set to %f s", fInitTime);
00421 }
00422 }