00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #include <errno.h>
00021 #ifdef WIN32
00022 # include <io.h>
00023 #endif
00024
00025 #include "TQueryResultManager.h"
00026
00027 #include "TFile.h"
00028 #include "THashList.h"
00029 #include "TKey.h"
00030 #include "TProofQueryResult.h"
00031 #include "TObjString.h"
00032 #include "TParameter.h"
00033 #include "TProof.h"
00034 #include "TProofServ.h"
00035 #include "TRegexp.h"
00036 #include "TSortedList.h"
00037 #include "TSystem.h"
00038 #include "TVirtualProofPlayer.h"
00039
00040
00041 TQueryResultManager::TQueryResultManager(const char *qdir, const char *stag,
00042 const char *sdir,
00043 TProofLockPath *lck, FILE *logfile)
00044 {
00045
00046
00047 fQueryDir = qdir;
00048 fSessionTag = stag;
00049 fSessionDir = sdir;
00050 fSeqNum = 0;
00051 fDrawQueries = 0;
00052 fKeptQueries = 0;
00053 fQueries = new TList;
00054 fPreviousQueries = 0;
00055 fLock = lck;
00056 fLogFile = (logfile) ? logfile : stdout;
00057 }
00058
00059
00060 TQueryResultManager::~TQueryResultManager()
00061 {
00062
00063
00064
00065 SafeDelete(fQueries);
00066 SafeDelete(fPreviousQueries);
00067 }
00068
00069
00070 void TQueryResultManager::AddLogFile(TProofQueryResult *pq)
00071 {
00072
00073
00074
00075 if (!pq)
00076 return;
00077
00078
00079 fflush(fLogFile);
00080
00081
00082 off_t lnow = 0;
00083 if ((lnow = lseek(fileno(fLogFile), (off_t) 0, SEEK_CUR)) < 0) {
00084 Error("AddLogFile", "problems lseeking current position on log file (errno: %d)", errno);
00085 return;
00086 }
00087
00088
00089 Int_t start = pq->fStartLog;
00090 if (start > -1)
00091 lseek(fileno(fLogFile), (off_t) start, SEEK_SET);
00092
00093
00094 const Int_t kMAXBUF = 4096;
00095 char line[kMAXBUF];
00096 while (fgets(line, sizeof(line), fLogFile)) {
00097 if (line[strlen(line)-1] == '\n')
00098 line[strlen(line)-1] = 0;
00099 pq->AddLogLine((const char *)line);
00100 }
00101
00102
00103 if (lnow >= 0) lseek(fileno(fLogFile), lnow, SEEK_SET);
00104 }
00105
00106 Int_t TQueryResultManager::CleanupQueriesDir()
00107 {
00108
00109
00110 Int_t nd = 0;
00111
00112
00113 if (fPreviousQueries) {
00114 fPreviousQueries->Delete();
00115 SafeDelete(fPreviousQueries);
00116 }
00117
00118
00119 TString queriesdir = fQueryDir;
00120 queriesdir = queriesdir.Remove(queriesdir.Index(kPROOF_QueryDir) +
00121 strlen(kPROOF_QueryDir));
00122 void *dirs = gSystem->OpenDirectory(queriesdir);
00123 char *sess = 0;
00124 while ((sess = (char *) gSystem->GetDirEntry(dirs))) {
00125
00126
00127 if (strlen(sess) < 7 || strncmp(sess,"session",7))
00128 continue;
00129
00130
00131 if (strstr(sess, fSessionTag))
00132 continue;
00133
00134
00135 TString qdir;
00136 qdir.Form("%s/%s", queriesdir.Data(), sess);
00137 PDB(kGlobal, 1)
00138 Info("RemoveQuery", "removing directory: %s", qdir.Data());
00139 gSystem->Exec(Form("%s %s", kRM, qdir.Data()));
00140 nd++;
00141 }
00142
00143
00144 return nd;
00145 }
00146
00147
00148 void TQueryResultManager::ScanPreviousQueries(const char *dir)
00149 {
00150
00151
00152
00153
00154
00155 if (fPreviousQueries) {
00156 fPreviousQueries->Delete();
00157 SafeDelete(fPreviousQueries);
00158 }
00159
00160
00161 void *dirs = gSystem->OpenDirectory(dir);
00162 char *sess = 0;
00163 while ((sess = (char *) gSystem->GetDirEntry(dirs))) {
00164
00165
00166 if (strlen(sess) < 7 || strncmp(sess,"session",7))
00167 continue;
00168
00169
00170 if (strstr(sess, fSessionTag))
00171 continue;
00172
00173
00174 void *dirq = gSystem->OpenDirectory(Form("%s/%s", dir, sess));
00175 char *qry = 0;
00176 while ((qry = (char *) gSystem->GetDirEntry(dirq))) {
00177
00178
00179 if (qry[0] == '.')
00180 continue;
00181
00182
00183 TString fn = Form("%s/%s/%s/query-result.root", dir, sess, qry);
00184 TFile *f = TFile::Open(fn);
00185 if (f) {
00186 f->ReadKeys();
00187 TIter nxk(f->GetListOfKeys());
00188 TKey *k = 0;
00189 TProofQueryResult *pqr = 0;
00190 while ((k = (TKey *)nxk())) {
00191 if (!strcmp(k->GetClassName(), "TProofQueryResult")) {
00192 pqr = (TProofQueryResult *) f->Get(k->GetName());
00193 if (pqr) {
00194 TQueryResult *qr = pqr->CloneInfo();
00195 if (!fPreviousQueries)
00196 fPreviousQueries = new TList;
00197 if (qr->GetStatus() > TQueryResult::kRunning) {
00198 fPreviousQueries->Add(qr);
00199 } else {
00200
00201
00202 TProofLockPath *lck = 0;
00203 if (LockSession(qr->GetTitle(), &lck) == 0) {
00204 RemoveQuery(qr);
00205
00206 SafeDelete(lck);
00207 }
00208 }
00209 }
00210 }
00211 }
00212 f->Close();
00213 delete f;
00214 }
00215 }
00216 gSystem->FreeDirectory(dirq);
00217 }
00218 gSystem->FreeDirectory(dirs);
00219 }
00220
00221
00222 Int_t TQueryResultManager::ApplyMaxQueries(Int_t mxq)
00223 {
00224
00225
00226
00227
00228
00229 if (mxq < 0)
00230 return 0;
00231
00232
00233 TSortedList *sl = new TSortedList;
00234 sl->SetOwner();
00235
00236 THashList *hl = new THashList;
00237 hl->SetOwner();
00238
00239
00240 TList *dl = new TList;
00241 dl->SetOwner();
00242
00243
00244 TString dir = fQueryDir;
00245 Int_t idx = dir.Index("session-");
00246 if (idx != kNPOS)
00247 dir.Remove(idx);
00248 void *dirs = gSystem->OpenDirectory(dir);
00249 char *sess = 0;
00250 while ((sess = (char *) gSystem->GetDirEntry(dirs))) {
00251
00252
00253 if (strlen(sess) < 7 || strncmp(sess,"session",7))
00254 continue;
00255
00256
00257 if (strstr(sess, fSessionTag))
00258 continue;
00259
00260
00261 Int_t nq = 0;
00262 void *dirq = gSystem->OpenDirectory(Form("%s/%s", dir.Data(), sess));
00263 char *qry = 0;
00264 while ((qry = (char *) gSystem->GetDirEntry(dirq))) {
00265
00266
00267 if (qry[0] == '.')
00268 continue;
00269
00270
00271 TString fn = Form("%s/%s/%s/query-result.root", dir.Data(), sess, qry);
00272
00273 FileStat_t st;
00274 if (gSystem->GetPathInfo(fn, st)) {
00275 PDB(kGlobal, 1)
00276 Info("ApplyMaxQueries","file '%s' cannot be stated: remove it", fn.Data());
00277 gSystem->Unlink(gSystem->DirName(fn));
00278 continue;
00279 }
00280
00281
00282 sl->Add(new TObjString(TString::Format("%ld", st.fMtime)));
00283 hl->Add(new TNamed((const char*)TString::Format("%ld",st.fMtime), fn.Data()));
00284 nq++;
00285 }
00286 gSystem->FreeDirectory(dirq);
00287
00288 if (nq > 0)
00289 dl->Add(new TParameter<Int_t>(TString::Format("%s/%s", dir.Data(), sess), nq));
00290 else
00291
00292 gSystem->Exec(TString::Format("%s -fr %s/%s", kRM, dir.Data(), sess));
00293 }
00294 gSystem->FreeDirectory(dirs);
00295
00296
00297 TIter nxq(sl, kIterBackward);
00298 Int_t nqkept = 0;
00299 TObjString *os = 0;
00300 while ((os = (TObjString *)nxq())) {
00301 if (nqkept < mxq) {
00302
00303 nqkept++;
00304 } else {
00305
00306 TNamed *nm = dynamic_cast<TNamed *>(hl->FindObject(os->GetName()));
00307 if (nm) {
00308 gSystem->Unlink(nm->GetTitle());
00309
00310 TString tdir(gSystem->DirName(nm->GetTitle()));
00311 tdir = gSystem->DirName(tdir.Data());
00312 TParameter<Int_t> *nq = dynamic_cast<TParameter<Int_t>*>(dl->FindObject(tdir));
00313 if (nq) {
00314 Int_t val = nq->GetVal();
00315 nq->SetVal(--val);
00316 if (nq->GetVal() <= 0)
00317
00318 gSystem->Exec(Form("%s -fr %s", kRM, tdir.Data()));
00319 }
00320 }
00321 }
00322 }
00323
00324
00325 delete sl;
00326 delete hl;
00327 delete dl;
00328
00329
00330 return 0;
00331 }
00332
00333
00334 Int_t TQueryResultManager::LockSession(const char *sessiontag, TProofLockPath **lck)
00335 {
00336
00337
00338
00339
00340
00341 if (strstr(sessiontag, fSessionTag))
00342 return 0;
00343
00344 if (!lck) {
00345 Error("LockSession","locker space undefined");
00346 return -1;
00347 }
00348 *lck = 0;
00349
00350
00351 TString stag = sessiontag;
00352 TRegexp re("session-.*-.*-.*-.*");
00353 Int_t i1 = stag.Index(re);
00354 if (i1 == kNPOS) {
00355 Error("LockSession","bad format: %s", sessiontag);
00356 return -1;
00357 }
00358 stag.ReplaceAll("session-","");
00359
00360
00361 Int_t i2 = stag.Index(":q");
00362 if (i2 != kNPOS)
00363 stag.Remove(i2);
00364
00365
00366 TString parlog = fSessionDir;
00367 parlog = parlog.Remove(parlog.Index("master-")+strlen("master-"));
00368 parlog += stag;
00369 if (!gSystem->AccessPathName(parlog)) {
00370 PDB(kGlobal, 1)
00371 Info("LockSession", "parent still running: do nothing");
00372 return -1;
00373 }
00374
00375
00376 if (fLock) {
00377 TString qlock = fLock->GetName();
00378 qlock.ReplaceAll(fSessionTag, stag);
00379
00380 if (!gSystem->AccessPathName(qlock)) {
00381 *lck = new TProofLockPath(qlock);
00382 if (((*lck)->Lock()) < 0) {
00383 Error("LockSession","problems locking query lock file");
00384 SafeDelete(*lck);
00385 return -1;
00386 }
00387 }
00388 }
00389
00390
00391 return 0;
00392 }
00393
00394
00395 Int_t TQueryResultManager::CleanupSession(const char *sessiontag)
00396 {
00397
00398
00399 if (!sessiontag) {
00400 Error("CleanupSession","session tag undefined");
00401 return -1;
00402 }
00403
00404
00405 TString qdir = fQueryDir;
00406 qdir.ReplaceAll(Form("session-%s", fSessionTag.Data()), sessiontag);
00407 Int_t idx = qdir.Index(":q");
00408 if (idx != kNPOS)
00409 qdir.Remove(idx);
00410 if (gSystem->AccessPathName(qdir)) {
00411 Info("CleanupSession","query dir %s does not exist", qdir.Data());
00412 return -1;
00413 }
00414
00415 TProofLockPath *lck = 0;
00416 if (LockSession(sessiontag, &lck) == 0) {
00417
00418
00419 gSystem->Exec(Form("%s %s", kRM, qdir.Data()));
00420
00421
00422 if (lck) {
00423 gSystem->Unlink(lck->GetName());
00424 SafeDelete(lck);
00425 }
00426
00427
00428 return 0;
00429 }
00430
00431
00432 Info("CleanupSession", "could not lock session %s", sessiontag);
00433 return -1;
00434 }
00435
00436
00437 void TQueryResultManager::SaveQuery(TProofQueryResult *qr, const char *fout)
00438 {
00439
00440
00441
00442 if (!qr || qr->IsDraw())
00443 return;
00444
00445
00446 TString querydir = Form("%s/%d",fQueryDir.Data(), qr->GetSeqNum());
00447
00448
00449 if (gSystem->AccessPathName(querydir))
00450 gSystem->MakeDirectory(querydir);
00451 TString ofn = fout ? fout : Form("%s/query-result.root", querydir.Data());
00452
00453
00454 TFile *f = TFile::Open(ofn, "RECREATE");
00455 if (f) {
00456 f->cd();
00457 if (!(qr->IsArchived()))
00458 qr->SetResultFile(ofn);
00459 qr->Write();
00460 f->Close();
00461 delete f;
00462 }
00463 }
00464
00465
00466 void TQueryResultManager::RemoveQuery(const char *queryref, TList *otherlist)
00467 {
00468
00469
00470
00471 PDB(kGlobal, 1)
00472 Info("RemoveQuery", "Enter");
00473
00474
00475 Int_t qry = -1;
00476 TString qdir;
00477 TProofQueryResult *pqr = LocateQuery(queryref, qry, qdir);
00478
00479 if (pqr) {
00480 if (qry > -1) {
00481 fQueries->Remove(pqr);
00482 if (otherlist) otherlist->Add(pqr);
00483 } else
00484 fPreviousQueries->Remove(pqr);
00485 delete pqr;
00486 pqr = 0;
00487 }
00488
00489
00490 PDB(kGlobal, 1)
00491 Info("RemoveQuery", "removing directory: %s", qdir.Data());
00492 gSystem->Exec(Form("%s %s", kRM, qdir.Data()));
00493
00494
00495 return;
00496 }
00497
00498
00499 void TQueryResultManager::RemoveQuery(TQueryResult *qr, Bool_t soft)
00500 {
00501
00502
00503
00504 PDB(kGlobal, 1)
00505 Info("RemoveQuery", "Enter");
00506
00507 if (!qr)
00508 return;
00509
00510
00511 TString qdir = fQueryDir;
00512 qdir = qdir.Remove(qdir.Index(kPROOF_QueryDir)+strlen(kPROOF_QueryDir));
00513 qdir = Form("%s/%s/%d", qdir.Data(), qr->GetTitle(), qr->GetSeqNum());
00514 PDB(kGlobal, 1)
00515 Info("RemoveQuery", "removing directory: %s", qdir.Data());
00516 gSystem->Exec(Form("%s %s", kRM, qdir.Data()));
00517
00518
00519 if (soft) {
00520 TQueryResult *qrn = qr->CloneInfo();
00521 Int_t idx = fQueries->IndexOf(qr);
00522 if (idx > -1)
00523 fQueries->AddAt(qrn, idx);
00524 else
00525 SafeDelete(qrn);
00526 }
00527 fQueries->Remove(qr);
00528 SafeDelete(qr);
00529
00530
00531 return;
00532 }
00533
00534
00535 TProofQueryResult *TQueryResultManager::LocateQuery(TString queryref, Int_t &qry, TString &qdir)
00536 {
00537
00538
00539
00540
00541 TProofQueryResult *pqr = 0;
00542
00543
00544
00545 qry = -1;
00546 if (queryref.IsDigit()) {
00547 qry = queryref.Atoi();
00548 } else if (queryref.Contains(fSessionTag)) {
00549 Int_t i1 = queryref.Index(":q");
00550 if (i1 != kNPOS) {
00551 queryref.Remove(0,i1+2);
00552 qry = queryref.Atoi();
00553 }
00554 }
00555
00556
00557 qdir = "";
00558 if (qry > -1) {
00559
00560 PDB(kGlobal, 1)
00561 Info("LocateQuery", "local query: %d", qry);
00562
00563
00564 if (fQueries) {
00565 TIter nxq(fQueries);
00566 while ((pqr = (TProofQueryResult *) nxq())) {
00567 if (pqr->GetSeqNum() == qry) {
00568
00569 qdir = Form("%s/%d", fQueryDir.Data(), qry);
00570 break;
00571 }
00572 }
00573 }
00574
00575 } else {
00576 PDB(kGlobal, 1)
00577 Info("LocateQuery", "previously processed query: %s", queryref.Data());
00578
00579
00580 if (fPreviousQueries) {
00581 TIter nxq(fPreviousQueries);
00582 while ((pqr = (TProofQueryResult *) nxq())) {
00583 if (queryref.Contains(pqr->GetTitle()) &&
00584 queryref.Contains(pqr->GetName()))
00585 break;
00586 }
00587 }
00588
00589 queryref.ReplaceAll(":q","/");
00590 qdir = fQueryDir;
00591 qdir = qdir.Remove(qdir.Index(kPROOF_QueryDir)+strlen(kPROOF_QueryDir));
00592 qdir = Form("%s/%s", qdir.Data(), queryref.Data());
00593 }
00594
00595
00596 return pqr;
00597 }
00598
00599
00600 Bool_t TQueryResultManager::FinalizeQuery(TProofQueryResult *pq,
00601 TProof *proof, TVirtualProofPlayer *player)
00602 {
00603
00604
00605 if (!pq || !proof || !player) {
00606 Warning("FinalizeQuery", "bad inputs: query = %p, proof = %p, player: %p ",
00607 pq ? pq : 0, proof ? proof : 0, player ? player : 0);
00608 return kFALSE;
00609 }
00610
00611 Int_t qn = pq->GetSeqNum();
00612 Long64_t np = player->GetEventsProcessed();
00613 TVirtualProofPlayer::EExitStatus est = player->GetExitStatus();
00614 TList *out = player->GetOutputList();
00615
00616 Float_t cpu = proof->GetCpuTime();
00617 Long64_t bytes = proof->GetBytesRead();
00618
00619 TQueryResult::EQueryStatus st = TQueryResult::kAborted;
00620
00621 PDB(kGlobal, 2) Info("FinalizeQuery","query #%d", qn);
00622
00623 PDB(kGlobal, 1)
00624 Info("FinalizeQuery","%.1f %lld", cpu, bytes);
00625
00626
00627 Bool_t save = kTRUE;
00628 switch (est) {
00629 case TVirtualProofPlayer::kAborted:
00630 PDB(kGlobal, 1)
00631 Info("FinalizeQuery", "query %d has been ABORTED <====", qn);
00632 out = 0;
00633 save = kFALSE;
00634 break;
00635 case TVirtualProofPlayer::kStopped:
00636 PDB(kGlobal, 1)
00637 Info("FinalizeQuery",
00638 "query %d has been STOPPED: %lld events processed", qn, np);
00639 st = TQueryResult::kStopped;
00640 break;
00641 case TVirtualProofPlayer::kFinished:
00642 PDB(kGlobal, 1)
00643 Info("FinalizeQuery",
00644 "query %d has been completed: %lld events processed", qn, np);
00645 st = TQueryResult::kCompleted;
00646 break;
00647 default:
00648 Warning("FinalizeQuery",
00649 "query %d: unknown exit status (%d)", qn, player->GetExitStatus());
00650 }
00651
00652
00653
00654 PDB(kGlobal, 1)
00655 Info("FinalizeQuery", "cpu: %.4f, saved: %.4f, master: %.4f",
00656 cpu, pq->GetUsedCPU() ,GetCpuTime());
00657 pq->SetProcessInfo(np, cpu - pq->GetUsedCPU() + GetCpuTime());
00658 pq->RecordEnd(st, out);
00659
00660
00661 AddLogFile(pq);
00662
00663
00664 return save;
00665 }
00666
00667
00668 void TQueryResultManager::SaveQuery(TProofQueryResult *pq, Int_t mxq)
00669 {
00670
00671
00672
00673 if (mxq > -1) {
00674 if (fQueries && fKeptQueries >= mxq) {
00675
00676 TQueryResult *fcom = 0;
00677 TQueryResult *farc = 0;
00678 TIter nxq(fQueries);
00679 TQueryResult *qr = 0;
00680 while (fKeptQueries >= mxq) {
00681 while ((qr = (TQueryResult *) nxq())) {
00682 if (qr->IsArchived()) {
00683 if (qr->GetOutputList() && !farc)
00684 farc = qr;
00685 } else if (qr->GetStatus() > TQueryResult::kRunning && !fcom) {
00686 fcom = qr;
00687 }
00688 if (farc && fcom)
00689 break;
00690 }
00691 if (!farc && !fcom) {
00692 break;
00693 } else if (farc) {
00694 RemoveQuery(farc, kTRUE);
00695 fKeptQueries--;
00696 farc = 0;
00697 } else if (fcom) {
00698 RemoveQuery(fcom);
00699 fKeptQueries--;
00700 fcom = 0;
00701 }
00702 }
00703 }
00704 if (fKeptQueries < mxq) {
00705 SaveQuery(pq);
00706 fKeptQueries++;
00707 } else {
00708 TString emsg;
00709 emsg.Form("Too many saved queries (%d): cannot save %s:%s",
00710 fKeptQueries, pq->GetTitle(), pq->GetName());
00711 if (gProofServ) {
00712 gProofServ->SendAsynMessage(emsg.Data());
00713 } else {
00714 Warning("SaveQuery", "%s", emsg.Data());
00715 }
00716 }
00717 } else {
00718 SaveQuery(pq);
00719 fKeptQueries++;
00720 }
00721 }