00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include "TPerfStats.h"
00023
00024 #include "Riostream.h"
00025 #include "TCollection.h"
00026 #include "TEnv.h"
00027 #include "TError.h"
00028 #include "TFile.h"
00029 #include "TH1.h"
00030 #include "TH2.h"
00031 #include "TProofDebug.h"
00032 #include "TProof.h"
00033 #include "TProofServ.h"
00034 #include "TSlave.h"
00035 #include "TTree.h"
00036 #include "TSQLServer.h"
00037 #include "TSQLResult.h"
00038 #include "TParameter.h"
00039 #include "TPluginManager.h"
00040 #include "TROOT.h"
00041 #include "TTimeStamp.h"
00042 #include "TVirtualMonitoring.h"
00043
00044
00045 ClassImp(TPerfEvent)
00046 ClassImp(TPerfStats)
00047
00048
00049
00050
00051
00052 TPerfEvent::TPerfEvent(TTimeStamp *offset)
00053 : fEvtNode("-3"), fType(TVirtualPerfStats::kUnDefined), fSlave(),
00054 fEventsProcessed(0), fBytesRead(0), fLen(0), fLatency(0.0), fProcTime(0.0), fCpuTime(0.0),
00055 fIsStart(kFALSE), fIsOk(kFALSE)
00056 {
00057
00058
00059 if (gProofServ != 0) {
00060 fEvtNode = gProofServ->GetOrdinal();
00061 } else {
00062 if (gProof && gProof->IsLite())
00063 fEvtNode = "0";
00064 else
00065 fEvtNode = "-2";
00066 }
00067
00068 if (offset != 0) {
00069 fTimeStamp = TTimeStamp(fTimeStamp.GetSec() - offset->GetSec(),
00070 fTimeStamp.GetNanoSec() - offset->GetNanoSec());
00071 }
00072 }
00073
00074
00075 Int_t TPerfEvent::Compare(const TObject *obj) const
00076 {
00077
00078
00079
00080 const TPerfEvent *pe = dynamic_cast<const TPerfEvent*>(obj);
00081
00082 if (!pe) {
00083 Error("Compare", "input is not a TPerfEvent object");
00084 return 0;
00085 }
00086
00087 if (fTimeStamp < pe->fTimeStamp) {
00088 return -1;
00089 } else if (fTimeStamp == pe->fTimeStamp) {
00090 return 0;
00091 } else {
00092 return 1;
00093 }
00094 }
00095
00096
00097 void TPerfEvent::Print(Option_t *) const
00098 {
00099
00100
00101 TString where;
00102 if (fEvtNode == -2) {
00103 where = "TPerfEvent: StandAlone ";
00104 } else if ( fEvtNode == -1 ) {
00105 where = "TPerfEvent: Master ";
00106 } else {
00107 where.Form("TPerfEvent: Worker %s ", fEvtNode.Data());
00108 }
00109 Printf("%s %s %f", where.Data(),
00110 TVirtualPerfStats::EventType(fType), double(fTimeStamp));
00111 }
00112
00113
00114 TPerfStats::TPerfStats(TList *input, TList *output)
00115 : fTrace(0), fPerfEvent(0), fPacketsHist(0), fEventsHist(0), fLatencyHist(0),
00116 fProcTimeHist(0), fCpuTimeHist(0), fBytesRead(0),
00117 fTotCpuTime(0.), fTotBytesRead(0), fTotEvents(0), fNumEvents(0),
00118 fSlaves(0), fDoHist(kFALSE),
00119 fDoTrace(kFALSE), fDoTraceRate(kFALSE), fDoSlaveTrace(kFALSE), fDoQuota(kFALSE),
00120 fMonitoringWriter(0)
00121 {
00122
00123
00124 TProof *proof = (gProofServ) ? gProofServ->GetProof() : gProof;
00125
00126
00127 Bool_t isMaster = ((proof && proof->TestBit(TProof::kIsMaster)) ||
00128 (gProofServ && gProofServ->IsMaster())) ? kTRUE : kFALSE;
00129
00130 TList *l = proof ? proof->GetListOfSlaveInfos() : 0 ;
00131 TIter nextslaveinfo(l);
00132 while (TSlaveInfo *si = dynamic_cast<TSlaveInfo*>(nextslaveinfo()))
00133 if (si->fStatus == TSlaveInfo::kActive) fSlaves++;
00134
00135 PDB(kGlobal,1) Info("TPerfStats", "Statistics for %d slave(s)", fSlaves);
00136
00137 fDoHist = (input->FindObject("PROOF_StatsHist") != 0);
00138 fDoTrace = (input->FindObject("PROOF_StatsTrace") != 0);
00139 fDoTraceRate = (input->FindObject("PROOF_RateTrace") != 0);
00140 fDoSlaveTrace = (input->FindObject("PROOF_SlaveStatsTrace") != 0);
00141
00142
00143 Int_t perpacket = -1;
00144 if (TProof::GetParameter(input, "PROOF_MonitorPerPacket", perpacket) != 0) {
00145
00146 perpacket = gEnv->GetValue("Proof.MonitorPerPacket", 0);
00147 }
00148 fMonitorPerPacket = (perpacket == 1) ? kTRUE : kFALSE;
00149 if (fMonitorPerPacket)
00150 Info("TPerfStats", "sending full information after each packet");
00151
00152 if ((isMaster && (fDoTrace || fDoTraceRate)) || (!isMaster && fDoSlaveTrace)) {
00153
00154 gDirectory->RecursiveRemove(gDirectory->FindObject("PROOF_PerfStats"));
00155 fTrace = new TTree("PROOF_PerfStats", "PROOF Statistics");
00156 fTrace->SetDirectory(0);
00157 fTrace->Bronch("PerfEvents", "TPerfEvent", &fPerfEvent, 64000, 0);
00158 output->Add(fTrace);
00159 }
00160
00161 if (fDoHist && isMaster) {
00162
00163 Double_t time_per_bin = 1e-3;
00164 Double_t min_time = 0;
00165 Int_t ntime_bins = 1000;
00166
00167 gDirectory->RecursiveRemove(gDirectory->FindObject("PROOF_PacketsHist"));
00168 fPacketsHist = new TH1D("PROOF_PacketsHist", "Packets processed per Worker",
00169 fSlaves, 0, fSlaves);
00170 fPacketsHist->SetDirectory(0);
00171 fPacketsHist->SetMinimum(0);
00172 output->Add(fPacketsHist);
00173
00174 gDirectory->RecursiveRemove(gDirectory->FindObject("PROOF_EventsHist"));
00175 fEventsHist = new TH1D("PROOF_EventsHist", "Events processed per Worker",
00176 fSlaves, 0, fSlaves);
00177 fEventsHist->SetFillColor(kGreen);
00178 fEventsHist->SetDirectory(0);
00179 fEventsHist->SetMinimum(0);
00180 output->Add(fEventsHist);
00181
00182 gDirectory->RecursiveRemove(gDirectory->FindObject("PROOF_NodeHist"));
00183 fNodeHist = new TH1D("PROOF_NodeHist", "Slaves per Fileserving Node",
00184 fSlaves, 0, fSlaves);
00185 fNodeHist->SetDirectory(0);
00186 fNodeHist->SetMinimum(0);
00187 fNodeHist->SetBit(TH1::kCanRebin);
00188 output->Add(fNodeHist);
00189
00190 gDirectory->RecursiveRemove(gDirectory->FindObject("PROOF_LatencyHist"));
00191 fLatencyHist = new TH2D("PROOF_LatencyHist", "GetPacket Latency per Worker",
00192 fSlaves, 0, fSlaves,
00193 ntime_bins, min_time, time_per_bin);
00194 fLatencyHist->SetDirectory(0);
00195 fLatencyHist->SetMarkerStyle(4);
00196 fLatencyHist->SetBit(TH1::kCanRebin);
00197 output->Add(fLatencyHist);
00198
00199 gDirectory->RecursiveRemove(gDirectory->FindObject("PROOF_ProcTimeHist"));
00200 fProcTimeHist = new TH2D("PROOF_ProcTimeHist", "Packet Processing Time per Worker",
00201 fSlaves, 0, fSlaves,
00202 ntime_bins, min_time, time_per_bin);
00203 fProcTimeHist->SetDirectory(0);
00204 fProcTimeHist->SetMarkerStyle(4);
00205 fProcTimeHist->SetBit(TH1::kCanRebin);
00206 output->Add(fProcTimeHist);
00207
00208 gDirectory->RecursiveRemove(gDirectory->FindObject("PROOF_CpuTimeHist"));
00209 fCpuTimeHist = new TH2D("PROOF_CpuTimeHist", "Packet CPU Time per Worker",
00210 fSlaves, 0, fSlaves,
00211 ntime_bins, min_time, time_per_bin);
00212 fCpuTimeHist->SetDirectory(0);
00213 fCpuTimeHist->SetMarkerStyle(4);
00214 fCpuTimeHist->SetBit(TH1::kCanRebin);
00215 output->Add(fCpuTimeHist);
00216
00217 nextslaveinfo.Reset();
00218 Int_t slavebin=1;
00219 while (TSlaveInfo *si = dynamic_cast<TSlaveInfo*>(nextslaveinfo())) {
00220 if (si->fStatus == TSlaveInfo::kActive) {
00221 fPacketsHist->GetXaxis()->SetBinLabel(slavebin, si->GetOrdinal());
00222 fEventsHist->GetXaxis()->SetBinLabel(slavebin, si->GetOrdinal());
00223 fNodeHist->GetXaxis()->SetBinLabel(slavebin, si->GetOrdinal());
00224 fLatencyHist->GetXaxis()->SetBinLabel(slavebin, si->GetOrdinal());
00225 fProcTimeHist->GetXaxis()->SetBinLabel(slavebin, si->GetOrdinal());
00226 fCpuTimeHist->GetXaxis()->SetBinLabel(slavebin, si->GetOrdinal());
00227 slavebin++;
00228 }
00229 }
00230 }
00231
00232 if (isMaster) {
00233
00234 TString sqlserv = gEnv->GetValue("ProofServ.QueryLogDB", "");
00235 if (sqlserv != "") {
00236 PDB(kGlobal,1) Info("TPerfStats", "store monitoring data in SQL DB: %s", sqlserv.Data());
00237 fDoQuota = kTRUE;
00238 }
00239
00240
00241 TString mon = gEnv->GetValue("ProofServ.Monitoring", "");
00242 if (mon != "") {
00243
00244 TString a[10];
00245 Int_t from = 0;
00246 TString tok;
00247 Int_t na = 0;
00248 while (mon.Tokenize(tok, from, " "))
00249 a[na++] = tok;
00250 na--;
00251
00252 TPluginHandler *h = 0;
00253 if ((h = gROOT->GetPluginManager()->FindHandler("TVirtualMonitoringWriter", a[0]))) {
00254 if (h->LoadPlugin() != -1) {
00255 fMonitoringWriter =
00256 (TVirtualMonitoringWriter *) h->ExecPlugin(na, a[1].Data(), a[2].Data(), a[3].Data(),
00257 a[4].Data(), a[5].Data(), a[6].Data(),
00258 a[7].Data(), a[8].Data(), a[9].Data());
00259 if (fMonitoringWriter && fMonitoringWriter->IsZombie()) {
00260 delete fMonitoringWriter;
00261 fMonitoringWriter = 0;
00262 }
00263 }
00264 }
00265 }
00266
00267 if (fMonitoringWriter) {
00268 PDB(kGlobal,1) Info("TPerfStats", "created monitoring object: %s", mon.Data());
00269 fDoQuota = kTRUE;
00270 }
00271 }
00272 }
00273
00274
00275 void TPerfStats::SimpleEvent(EEventType type)
00276 {
00277
00278
00279 if (type == kStop && fPacketsHist != 0) {
00280 fNodeHist->LabelsDeflate("X");
00281 fNodeHist->LabelsOption("auv","X");
00282 }
00283
00284 if (type == kStop && fDoQuota)
00285 WriteQueryLog();
00286
00287 if (fTrace == 0) return;
00288
00289 TPerfEvent pe(&fTzero);
00290 pe.fType = type;
00291
00292 fPerfEvent = &pe;
00293 fTrace->SetBranchAddress("PerfEvents",&fPerfEvent);
00294 fTrace->Fill();
00295 fPerfEvent = 0;
00296 }
00297
00298
00299 void TPerfStats::PacketEvent(const char *slave, const char* slavename, const char* filename,
00300 Long64_t eventsprocessed, Double_t latency, Double_t proctime,
00301 Double_t cputime, Long64_t bytesRead)
00302 {
00303
00304
00305 if (fDoTrace && fTrace != 0) {
00306 TPerfEvent pe(&fTzero);
00307
00308 pe.fType = kPacket;
00309 pe.fSlaveName = slavename;
00310 pe.fFileName = filename;
00311 pe.fSlave = slave;
00312 pe.fEventsProcessed = eventsprocessed;
00313 pe.fBytesRead = bytesRead;
00314 pe.fLatency = latency;
00315 pe.fProcTime = proctime;
00316 pe.fCpuTime = cputime;
00317
00318 fPerfEvent = &pe;
00319 fTrace->SetBranchAddress("PerfEvents",&fPerfEvent);
00320 fTrace->Fill();
00321 fPerfEvent = 0;
00322 }
00323
00324 PDB(kGlobal,1)
00325 Info("PacketEvent","%s: fDoHist: %d, fPacketsHist: %p, eventsprocessed: %lld",
00326 slave, fDoHist, fPacketsHist, eventsprocessed);
00327
00328 if (fDoHist && fPacketsHist != 0) {
00329 fPacketsHist->Fill(slave, 1);
00330 fEventsHist->Fill(slave, eventsprocessed);
00331 fLatencyHist->Fill(slave, latency, 1);
00332 fProcTimeHist->Fill(slave, proctime, 1);
00333 fCpuTimeHist->Fill(slave, cputime, 1);
00334 }
00335
00336 if (fDoQuota) {
00337 fTotCpuTime += cputime;
00338 fTotBytesRead += bytesRead;
00339 fTotEvents += eventsprocessed;
00340 }
00341
00342
00343 if (fMonitoringWriter && fMonitorPerPacket) {
00344 if (!gProofServ || !gProofServ->GetSessionTag() || !gProofServ->GetProof() ||
00345 !gProofServ->GetProof()->GetQueryResult()) {
00346 Error("PacketEvent", "some required object are undefined (%p %p %p %p)",
00347 gProofServ, (gProofServ ? gProofServ->GetSessionTag() : 0),
00348 (gProofServ ? gProofServ->GetProof() : 0),
00349 ((gProofServ && gProofServ->GetProof()) ?
00350 gProofServ->GetProof()->GetQueryResult() : 0));
00351 return;
00352 }
00353
00354 TTimeStamp stop;
00355 TString identifier;
00356 identifier.Form("%s-q%d", gProofServ->GetSessionTag(),
00357 gProofServ->GetProof()->GetQueryResult()->GetSeqNum());
00358
00359 TList values;
00360 values.SetOwner();
00361 values.Add(new TParameter<int>("id", 0));
00362 values.Add(new TNamed("user", gProofServ->GetUser()));
00363 values.Add(new TNamed("proofgroup", gProofServ->GetGroup()));
00364 values.Add(new TNamed("begin", fTzero.AsString("s")));
00365 values.Add(new TNamed("end", stop.AsString("s")));
00366 values.Add(new TParameter<int>("walltime", stop.GetSec()-fTzero.GetSec()));
00367 values.Add(new TParameter<Long64_t>("bytesread", fTotBytesRead));
00368 values.Add(new TParameter<Long64_t>("events", fTotEvents));
00369 values.Add(new TParameter<Long64_t>("totevents", fNumEvents));
00370 values.Add(new TParameter<int>("workers", fSlaves));
00371 values.Add(new TNamed("querytag", identifier.Data()));
00372 if (!fMonitoringWriter->SendParameters(&values, identifier))
00373 Error("PacketEvent", "sending of monitoring info failed");
00374 }
00375 }
00376
00377
00378 void TPerfStats::FileEvent(const char *slave, const char *slavename, const char *nodename,
00379 const char *filename, Bool_t isStart)
00380 {
00381
00382
00383 if (fDoTrace && fTrace != 0) {
00384 TPerfEvent pe(&fTzero);
00385
00386 pe.fType = kFile;
00387 pe.fSlaveName = slavename;
00388 pe.fNodeName = nodename;
00389 pe.fFileName = filename;
00390 pe.fSlave = slave;
00391 pe.fIsStart = isStart;
00392
00393 fPerfEvent = &pe;
00394 fTrace->SetBranchAddress("PerfEvents",&fPerfEvent);
00395 fTrace->Fill();
00396 fPerfEvent = 0;
00397 }
00398
00399 if (fDoHist && fPacketsHist != 0) {
00400 fNodeHist->Fill(nodename, isStart ? 1 : -1);
00401 }
00402 }
00403
00404
00405 void TPerfStats::FileOpenEvent(TFile *file, const char *filename, Double_t start)
00406 {
00407
00408
00409 if (fDoTrace && fTrace != 0) {
00410 TPerfEvent pe(&fTzero);
00411
00412 pe.fType = kFileOpen;
00413 pe.fFileName = filename;
00414 pe.fFileClass = file != 0 ? file->ClassName() : "none";
00415 pe.fProcTime = double(TTimeStamp())-start;
00416 pe.fIsOk = (file != 0);
00417
00418 fPerfEvent = &pe;
00419 fTrace->SetBranchAddress("PerfEvents",&fPerfEvent);
00420 fTrace->Fill();
00421 fPerfEvent = 0;
00422 }
00423 }
00424
00425
00426 void TPerfStats::FileReadEvent(TFile *file, Int_t len, Double_t start)
00427 {
00428
00429
00430 if (fDoTrace && fTrace != 0) {
00431 TPerfEvent pe(&fTzero);
00432
00433 pe.fType = kFileRead;
00434 pe.fFileName = file->GetName();
00435 pe.fFileClass = file->ClassName();
00436 pe.fLen = len;
00437 pe.fProcTime = double(TTimeStamp())-start;
00438
00439 fPerfEvent = &pe;
00440 fTrace->SetBranchAddress("PerfEvents",&fPerfEvent);
00441 fTrace->Fill();
00442 fPerfEvent = 0;
00443 }
00444 }
00445
00446
00447 void TPerfStats::RateEvent(Double_t proctime, Double_t deltatime,
00448 Long64_t eventsprocessed, Long64_t bytesRead)
00449 {
00450
00451
00452 if ((fDoTrace || fDoTraceRate) && fTrace != 0) {
00453 TPerfEvent pe(&fTzero);
00454
00455 pe.fType = kRate;
00456 pe.fEventsProcessed = eventsprocessed;
00457 pe.fBytesRead = bytesRead;
00458 pe.fProcTime = proctime;
00459 pe.fLatency = deltatime;
00460
00461 fPerfEvent = &pe;
00462 fTrace->SetBranchAddress("PerfEvents",&fPerfEvent);
00463 fTrace->Fill();
00464 fPerfEvent = 0;
00465 }
00466 }
00467
00468
00469 void TPerfStats::SetBytesRead(Long64_t num)
00470 {
00471
00472
00473 fBytesRead = num;
00474 }
00475
00476
00477 Long64_t TPerfStats::GetBytesRead() const
00478 {
00479
00480
00481 return fBytesRead;
00482 }
00483
00484
00485 void TPerfStats::WriteQueryLog()
00486 {
00487
00488
00489
00490
00491
00492
00493
00494
00495
00496
00497
00498
00499
00500
00501
00502
00503
00504
00505 TTimeStamp stop;
00506
00507 TString sqlserv = gEnv->GetValue("ProofServ.QueryLogDB","");
00508 TString sqluser = gEnv->GetValue("ProofServ.QueryLogUser","");
00509 TString sqlpass = gEnv->GetValue("ProofServ.QueryLogPasswd","");
00510
00511
00512 if (sqlserv != "" && sqluser != "" && sqlpass != "" && gProofServ) {
00513 TString sql;
00514 sql.Form("INSERT INTO proofquerylog VALUES (0, '%s', '%s', "
00515 "'%s', '%s', %ld, %.2f, %lld, %lld, %d)",
00516 gProofServ->GetUser(), gProofServ->GetGroup(),
00517 fTzero.AsString("s"), stop.AsString("s"),
00518 stop.GetSec()-fTzero.GetSec(), fTotCpuTime,
00519 fTotBytesRead, fTotEvents, fSlaves);
00520
00521
00522 TSQLServer *db = TSQLServer::Connect(sqlserv, sqluser, sqlpass);
00523
00524 if (!db || db->IsZombie()) {
00525 Error("WriteQueryLog", "failed to connect to SQL server %s as %s %s",
00526 sqlserv.Data(), sqluser.Data(), sqlpass.Data());
00527 printf("%s\n", sql.Data());
00528 } else {
00529 TSQLResult *res = db->Query(sql);
00530
00531 if (!res) {
00532 Error("WriteQueryLog", "insert into proofquerylog failed");
00533 printf("%s\n", sql.Data());
00534 }
00535 delete res;
00536 }
00537 delete db;
00538 }
00539
00540
00541 if (fMonitoringWriter) {
00542 if (!gProofServ || !gProofServ->GetSessionTag() || !gProofServ->GetProof() ||
00543 !gProofServ->GetProof()->GetQueryResult()) {
00544 Error("WriteQueryLog", "some required object are undefined (%p %p %p %p)",
00545 gProofServ, (gProofServ ? gProofServ->GetSessionTag() : 0),
00546 (gProofServ ? gProofServ->GetProof() : 0),
00547 ((gProofServ && gProofServ->GetProof()) ?
00548 gProofServ->GetProof()->GetQueryResult() : 0));
00549 return;
00550 }
00551
00552 TString identifier;
00553 identifier.Form("%s-q%d", gProofServ->GetSessionTag(),
00554 gProofServ->GetProof()->GetQueryResult()->GetSeqNum());
00555
00556 TList values;
00557 values.SetOwner();
00558 values.Add(new TParameter<int>("id", 0));
00559 values.Add(new TNamed("user", gProofServ->GetUser()));
00560 values.Add(new TNamed("proofgroup", gProofServ->GetGroup()));
00561 values.Add(new TNamed("begin", fTzero.AsString("s")));
00562 values.Add(new TNamed("end", stop.AsString("s")));
00563 values.Add(new TParameter<int>("walltime", stop.GetSec()-fTzero.GetSec()));
00564 values.Add(new TParameter<float>("cputime", fTotCpuTime));
00565 values.Add(new TParameter<Long64_t>("bytesread", fTotBytesRead));
00566 values.Add(new TParameter<Long64_t>("events", fTotEvents));
00567 values.Add(new TParameter<Long64_t>("totevents", fTotEvents));
00568 values.Add(new TParameter<int>("workers", fSlaves));
00569 values.Add(new TNamed("querytag", identifier.Data()));
00570 if (!fMonitoringWriter->SendParameters(&values, identifier))
00571 Error("WriteQueryLog", "sending of monitoring info failed");
00572 }
00573 }
00574
00575
00576 void TPerfStats::Setup(TList *input)
00577 {
00578
00579
00580 const Int_t ntags=3;
00581 const Char_t *tags[ntags] = {"StatsHist",
00582 "StatsTrace",
00583 "SlaveStatsTrace"};
00584
00585 for (Int_t i=0; i<ntags; i++) {
00586 TString envvar = "Proof.";
00587 envvar += tags[i];
00588 TString inputname = "PROOF_";
00589 inputname += tags[i];
00590 TObject* obj = input->FindObject(inputname.Data());
00591 if (gEnv->GetValue(envvar.Data(), 0)) {
00592 if (!obj)
00593 input->Add(new TNamed(inputname.Data(),""));
00594 } else {
00595 if (obj) {
00596 input->Remove(obj);
00597 delete obj;
00598 }
00599 }
00600 }
00601 }
00602
00603
00604 void TPerfStats::Start(TList *input, TList *output)
00605 {
00606
00607
00608 if (gPerfStats)
00609 delete gPerfStats;
00610
00611 gPerfStats = new TPerfStats(input, output);
00612 if (gPerfStats && !gPerfStats->TestBit(TObject::kInvalidObject)) {
00613 gPerfStats->SimpleEvent(TVirtualPerfStats::kStart);
00614 } else {
00615 SafeDelete(gPerfStats);
00616 }
00617 }
00618
00619
00620 void TPerfStats::Stop()
00621 {
00622
00623
00624 if (!gPerfStats) return;
00625
00626 gPerfStats->SimpleEvent(TVirtualPerfStats::kStop);
00627
00628 delete gPerfStats;
00629 gPerfStats = 0;
00630 }