00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #include "TProofLite.h"
00024
00025 #ifdef WIN32
00026 # include <io.h>
00027 # include "snprintf.h"
00028 #endif
00029 #include "RConfigure.h"
00030 #include "TDSet.h"
00031 #include "TEnv.h"
00032 #include "TError.h"
00033 #include "TFile.h"
00034 #include "TFileCollection.h"
00035 #include "TFileInfo.h"
00036 #include "THashList.h"
00037 #include "TMessage.h"
00038 #include "TMonitor.h"
00039 #include "TObjString.h"
00040 #include "TPluginManager.h"
00041 #include "TDataSetManager.h"
00042 #include "TProofQueryResult.h"
00043 #include "TProofServ.h"
00044 #include "TQueryResultManager.h"
00045 #include "TROOT.h"
00046 #include "TServerSocket.h"
00047 #include "TSlave.h"
00048 #include "TSortedList.h"
00049 #include "TTree.h"
00050 #include "TVirtualProofPlayer.h"
00051 #include "TSelector.h"
00052
// ROOT ClassImp macro invocation for TProofLite
ClassImp(TProofLite)

// Cached value of 'ProofLite.MaxWorkers' as read from system.rootrc by
// GetNumberOfWorkers(); -2 is the "not read yet" marker tested there
Int_t TProofLite::fgWrksMax = -2;
00056
00057
00058 TProofLite::TProofLite(const char *url, const char *conffile, const char *confdir,
00059 Int_t loglevel, const char *alias, TProofMgr *mgr)
00060 {
00061
00062
00063
00064
00065
00066
00067
00068
00069
00070
00071
00072 fUrl.SetUrl(url);
00073
00074
00075 fManager = mgr;
00076
00077
00078 fServType = TProofMgr::kProofLite;
00079
00080
00081 fQueryMode = kSync;
00082
00083
00084 fMasterServ = kTRUE;
00085 SetBit(TProof::kIsClient);
00086 SetBit(TProof::kIsMaster);
00087
00088
00089 if (!gSystem->Getenv("ROOTPROOFCLIENT")) gSystem->Setenv("ROOTPROOFCLIENT","");
00090
00091
00092 fUrl.SetProtocol("proof");
00093 fUrl.SetHost("__lite__");
00094 fUrl.SetPort(1093);
00095
00096
00097 if (strlen(fUrl.GetUser()) <= 0) {
00098
00099 UserGroup_t *pw = gSystem->GetUserInfo();
00100 if (pw) {
00101 fUrl.SetUser(pw->fUser);
00102 delete pw;
00103 }
00104 }
00105 fMaster = gSystem->HostName();
00106
00107
00108 ParseConfigField(conffile);
00109
00110
00111
00112
00113 if ((fNWorkers = GetNumberOfWorkers(url)) > 0) {
00114
00115 Printf(" +++ Starting PROOF-Lite with %d workers +++", fNWorkers);
00116
00117 Init(url, conffile, confdir, loglevel, alias);
00118 }
00119
00120
00121 if (!gROOT->GetListOfProofs()->FindObject(this))
00122 gROOT->GetListOfProofs()->Add(this);
00123
00124
00125 gProof = this;
00126 }
00127
00128
00129 Int_t TProofLite::Init(const char *, const char *conffile,
00130 const char *confdir, Int_t loglevel, const char *)
00131 {
00132
00133
00134
00135
00136
00137
00138
00139 R__ASSERT(gSystem);
00140
00141 fValid = kFALSE;
00142
00143 if (TestBit(TProof::kIsMaster)) {
00144
00145 if (!conffile || strlen(conffile) == 0)
00146 fConfFile = kPROOF_ConfFile;
00147 if (!confdir || strlen(confdir) == 0)
00148 fConfDir = kPROOF_ConfDir;
00149 } else {
00150 fConfDir = confdir;
00151 fConfFile = conffile;
00152 }
00153
00154
00155 if (CreateSandbox() != 0) {
00156 Error("Init", "could not create/assert sandbox for this session");
00157 return 0;
00158 }
00159
00160
00161 TString sockpathdir = gEnv->GetValue("ProofLite.SockPathDir", gSystem->TempDirectory());
00162 if (sockpathdir.IsNull()) sockpathdir = gSystem->TempDirectory();
00163 if (sockpathdir(sockpathdir.Length()-1) == '/') sockpathdir.Remove(sockpathdir.Length()-1);
00164 fSockPath.Form("%s/plite-%d", sockpathdir.Data(), gSystem->GetPid());
00165 if (fSockPath.Length() > 104) {
00166
00167 Error("Init", "Unix socket path '%s' is too long (%d bytes):",
00168 fSockPath.Data(), fSockPath.Length());
00169 Error("Init", "use 'ProofLite.SockPathDir' to create it under a directory different"
00170 " from '%s'", sockpathdir.Data());
00171 return 0;
00172 }
00173
00174 fLogLevel = loglevel;
00175 fProtocol = kPROOF_Protocol;
00176 fSendGroupView = kTRUE;
00177 fImage = "<local>";
00178 fIntHandler = 0;
00179 fStatus = 0;
00180 fRecvMessages = new TList;
00181 fRecvMessages->SetOwner(kTRUE);
00182 fSlaveInfo = 0;
00183 fChains = new TList;
00184 fAvailablePackages = 0;
00185 fEnabledPackages = 0;
00186 fEndMaster = TestBit(TProof::kIsMaster) ? kTRUE : kFALSE;
00187 fInputData = 0;
00188 ResetBit(TProof::kNewInputData);
00189
00190
00191 fCollectTimeout = gEnv->GetValue("Proof.CollectTimeout", -1);
00192
00193 fProgressDialog = 0;
00194 fProgressDialogStarted = kFALSE;
00195
00196
00197 fRedirLog = kFALSE;
00198 if (TestBit(TProof::kIsClient)) {
00199 fLogFileName = Form("%s/session-%s.log", fWorkDir.Data(), GetName());
00200 if ((fLogFileW = fopen(fLogFileName.Data(), "w")) == 0)
00201 Error("Init", "could not create temporary logfile %s", fLogFileName.Data());
00202 if ((fLogFileR = fopen(fLogFileName.Data(), "r")) == 0)
00203 Error("Init", "could not open logfile %s for reading", fLogFileName.Data());
00204 }
00205 fLogToWindowOnly = kFALSE;
00206
00207 fCacheLock = new TProofLockPath(TString::Format("%s/%s%s", gSystem->TempDirectory(),
00208 kPROOF_CacheLockFile,
00209 TString(fCacheDir).ReplaceAll("/","%").Data()));
00210
00211
00212 fQueryLock = new TProofLockPath(TString::Format("%s/%s%s-%s", gSystem->TempDirectory(),
00213 kPROOF_QueryLockFile, GetName(),
00214 TString(fQueryDir).ReplaceAll("/","%").Data()));
00215 fQueryLock->Lock();
00216
00217 fQMgr = new TQueryResultManager(fQueryDir, GetName(), fWorkDir,
00218 fQueryLock, fLogFileW);
00219
00220
00221 if (fQMgr && fQMgr->ApplyMaxQueries(10) != 0)
00222 Warning("Init", "problems applying fMaxQueries");
00223
00224 if (InitDataSetManager() != 0)
00225 Warning("Init", "problems initializing the dataset manager");
00226
00227
00228 fNotIdle = 0;
00229
00230
00231 fSync = kTRUE;
00232
00233
00234 fQueries = 0;
00235 fOtherQueries = 0;
00236 fDrawQueries = 0;
00237 fMaxDrawQueries = 1;
00238 fSeqNum = 0;
00239
00240
00241 fSessionID = -1;
00242
00243
00244 fWaitingSlaves = 0;
00245
00246
00247 fPlayer = 0;
00248 MakePlayer("lite");
00249
00250 fFeedback = new TList;
00251 fFeedback->SetOwner();
00252 fFeedback->SetName("FeedbackList");
00253 AddInput(fFeedback);
00254
00255
00256 fSlaves = new TSortedList(kSortDescending);
00257 fActiveSlaves = new TList;
00258 fInactiveSlaves = new TList;
00259 fUniqueSlaves = new TList;
00260 fAllUniqueSlaves = new TList;
00261 fNonUniqueMasters = new TList;
00262 fBadSlaves = new TList;
00263 fAllMonitor = new TMonitor;
00264 fActiveMonitor = new TMonitor;
00265 fUniqueMonitor = new TMonitor;
00266 fAllUniqueMonitor = new TMonitor;
00267 fCurrentMonitor = 0;
00268 fServSock = 0;
00269
00270
00271
00272 fForkStartup = kFALSE;
00273 if (gEnv->GetValue("ProofLite.ForkStartup", 0) != 0) {
00274 #ifndef WIN32
00275 fForkStartup = kTRUE;
00276 #else
00277 Warning("Init", "fork-based workers startup is not available on Windows - ignoring");
00278 #endif
00279 }
00280
00281 fPackageLock = 0;
00282 fEnabledPackagesOnClient = 0;
00283 fLoadedMacros = 0;
00284 fGlobalPackageDirList = 0;
00285 if (TestBit(TProof::kIsClient)) {
00286
00287
00288 TString globpack = gEnv->GetValue("Proof.GlobalPackageDirs","");
00289 if (globpack.Length() > 0) {
00290 Int_t ng = 0;
00291 Int_t from = 0;
00292 TString ldir;
00293 while (globpack.Tokenize(ldir, from, ":")) {
00294 if (gSystem->AccessPathName(ldir, kReadPermission)) {
00295 Warning("Init", "directory for global packages %s does not"
00296 " exist or is not readable", ldir.Data());
00297 } else {
00298
00299 TString key = Form("G%d", ng++);
00300 if (!fGlobalPackageDirList) {
00301 fGlobalPackageDirList = new THashList();
00302 fGlobalPackageDirList->SetOwner();
00303 }
00304 fGlobalPackageDirList->Add(new TNamed(key,ldir));
00305 }
00306 }
00307 }
00308
00309 TString lockpath(fPackageDir);
00310 lockpath.ReplaceAll("/", "%");
00311 lockpath.Insert(0, TString::Format("%s/%s", gSystem->TempDirectory(), kPROOF_PackageLockFile));
00312 fPackageLock = new TProofLockPath(lockpath.Data());
00313
00314 fEnabledPackagesOnClient = new TList;
00315 fEnabledPackagesOnClient->SetOwner();
00316 }
00317
00318
00319 if (SetupWorkers(0) != 0) {
00320 Error("Init", "problems setting up workers");
00321 return 0;
00322 }
00323
00324
00325 fValid = kTRUE;
00326
00327
00328 fAllMonitor->DeActivateAll();
00329
00330
00331 GoParallel(9999, kFALSE);
00332
00333
00334 SendInitialState();
00335
00336 SetActive(kFALSE);
00337
00338 if (IsValid()) {
00339
00340 ActivateAsyncInput();
00341
00342 SetRunStatus(TProof::kRunning);
00343 }
00344
00345 R__LOCKGUARD2(gROOTMutex);
00346 gROOT->GetListOfSockets()->Add(this);
00347
00348 return fActiveSlaves->GetSize();
00349 }
00350
00351 TProofLite::~TProofLite()
00352 {
00353
00354
00355
00356 RemoveWorkers(0);
00357
00358 if (!(fQMgr && fQMgr->Queries() && fQMgr->Queries()->GetSize())) {
00359
00360 gSystem->MakeDirectory(fQueryDir+"/.delete");
00361 gSystem->Exec(Form("%s %s", kRM, fQueryDir.Data()));
00362 }
00363
00364
00365 if (fQueryLock) {
00366 gSystem->Unlink(fQueryLock->GetName());
00367 fQueryLock->Unlock();
00368 }
00369
00370
00371 SafeDelete(fServSock);
00372 gSystem->Unlink(fSockPath);
00373 }
00374
00375
00376 Int_t TProofLite::GetNumberOfWorkers(const char *url)
00377 {
00378
00379
00380
00381
00382 Bool_t notify = kFALSE;
00383 if (fgWrksMax == -2) {
00384
00385 TString sysname = "system.rootrc";
00386 #ifdef ROOTETCDIR
00387 char *s = gSystem->ConcatFileName(ROOTETCDIR, sysname);
00388 #else
00389 TString etc = gRootDir;
00390 #ifdef WIN32
00391 etc += "\\etc";
00392 #else
00393 etc += "/etc";
00394 #endif
00395 char *s = gSystem->ConcatFileName(etc, sysname);
00396 #endif
00397 TEnv sysenv(0);
00398 sysenv.ReadFile(s, kEnvGlobal);
00399 fgWrksMax = sysenv.GetValue("ProofLite.MaxWorkers", -1);
00400
00401 notify = kTRUE;
00402 if (s) delete[] s;
00403 }
00404 if (fgWrksMax == 0) {
00405 ::Error("TProofLite::GetNumberOfWorkers",
00406 "PROOF-Lite disabled by the system administrator: sorry!");
00407 return 0;
00408 }
00409
00410 Int_t nWorkers = -1;
00411 if (url && strlen(url)) {
00412 TString o(url);
00413 Int_t in = o.Index("workers=");
00414 if (in != kNPOS) {
00415 o.Remove(0, in + strlen("workers="));
00416 while (!o.IsDigit())
00417 o.Remove(o.Length()-1);
00418 nWorkers = (!o.IsNull()) ? o.Atoi() : nWorkers;
00419 }
00420 }
00421 if (nWorkers <= 0) {
00422 nWorkers = gEnv->GetValue("ProofLite.Workers", -1);
00423 if (nWorkers <= 0) {
00424 SysInfo_t si;
00425 if (gSystem->GetSysInfo(&si) == 0 && si.fCpus > 2) {
00426 nWorkers = si.fCpus;
00427 } else {
00428
00429 nWorkers = 2;
00430 }
00431 if (notify) notify = kFALSE;
00432 }
00433 }
00434
00435 if (fgWrksMax > 0 && fgWrksMax < nWorkers) {
00436 if (notify)
00437 ::Warning("TProofLite::GetNumberOfWorkers", "number of PROOF-Lite workers limited by"
00438 " the system administrator to %d", fgWrksMax);
00439 nWorkers = fgWrksMax;
00440 }
00441
00442
00443 return nWorkers;
00444 }
00445
00446
00447 Int_t TProofLite::SetupWorkers(Int_t opt, TList *startedWorkers)
00448 {
00449
00450
00451
00452 if (!fServSock) {
00453 if ((fServSock = new TServerSocket(fSockPath))) {
00454 R__LOCKGUARD2(gROOTMutex);
00455
00456 gROOT->GetListOfSockets()->Remove(fServSock);
00457 }
00458 }
00459 if (!fServSock || !fServSock->IsValid()) {
00460 Error("SetupWorkers",
00461 "unable to create server socket for internal communications");
00462 SetBit(kInvalidObject);
00463 return -1;
00464 }
00465
00466
00467 TMonitor *mon = new TMonitor;
00468 mon->Add(fServSock);
00469
00470 TList started;
00471 TSlave *wrk = 0;
00472 Int_t nWrksDone = 0, nWrksTot = -1;
00473 TString fullord;
00474
00475 if (opt == 0) {
00476 nWrksTot = fForkStartup ? 1 : fNWorkers;
00477
00478
00479 Int_t ord = 0;
00480 for (; ord < nWrksTot; ord++) {
00481
00482
00483 fullord = Form("0.%d", ord);
00484
00485
00486 SetProofServEnv(fullord);
00487
00488
00489 if ((wrk = CreateSlave("lite", fullord, 100, fImage, fWorkDir)))
00490 started.Add(wrk);
00491
00492
00493 NotifyStartUp("Opening connections to workers", ++nWrksDone, nWrksTot);
00494
00495 }
00496 } else {
00497 if (!fForkStartup) {
00498 Warning("SetupWorkers", "standard startup: workers already started");
00499 return -1;
00500 }
00501 nWrksTot = fNWorkers - 1;
00502
00503
00504 TString clones;
00505 Int_t ord = 0;
00506 for (; ord < nWrksTot; ord++) {
00507
00508
00509 fullord.Form("0.%d", ord + 1);
00510 if (!clones.IsNull()) clones += " ";
00511 clones += fullord;
00512
00513
00514 if ((wrk = CreateSlave("lite", fullord, -1, fImage, fWorkDir)))
00515 started.Add(wrk);
00516
00517
00518 NotifyStartUp("Opening connections to workers", ++nWrksDone, nWrksTot);
00519
00520 }
00521
00522
00523 TMessage m(kPROOF_FORK);
00524 m << clones;
00525 Broadcast(m, kActive);
00526 }
00527
00528
00529 nWrksDone = 0;
00530 nWrksTot = started.GetSize();
00531 Int_t nSelects = 0;
00532 Int_t to = gEnv->GetValue("ProofLite.StartupTimeOut", 5) * 1000;
00533 while (started.GetSize() > 0 && nSelects < nWrksTot) {
00534
00535
00536 TSocket *xs = mon->Select(to);
00537
00538
00539 nSelects++;
00540 if (xs == (TSocket *) -1) continue;
00541
00542
00543 TSocket *s = fServSock->Accept();
00544 if (s && s->IsValid()) {
00545
00546 TMessage *msg = 0;
00547 s->Recv(msg);
00548 if (msg) {
00549 TString ord;
00550 *msg >> ord;
00551
00552 if ((wrk = (TSlave *) started.FindObject(ord))) {
00553
00554 started.Remove(wrk);
00555
00556
00557 wrk->SetSocket(s);
00558
00559
00560
00561
00562 { R__LOCKGUARD2(gROOTMutex);
00563 gROOT->GetListOfSockets()->Remove(s);
00564 }
00565 if (wrk->IsValid()) {
00566
00567 wrk->SetInputHandler(new TProofInputHandler(this, wrk->GetSocket()));
00568
00569
00570 wrk->fParallel = 1;
00571
00572 wrk->SetupServ(TSlave::kSlave, 0);
00573 }
00574
00575
00576 if (wrk->IsValid()) {
00577 fSlaves->Add(wrk);
00578 if (opt == 1) fActiveSlaves->Add(wrk);
00579 fAllMonitor->Add(wrk->GetSocket());
00580
00581 if (startedWorkers) startedWorkers->Add(wrk);
00582
00583 NotifyStartUp("Setting up worker servers", ++nWrksDone, nWrksTot);
00584 } else {
00585
00586 fBadSlaves->Add(wrk);
00587 }
00588 }
00589 }
00590 }
00591 }
00592
00593
00594 mon->DeActivateAll();
00595 delete mon;
00596
00597
00598 if (!gROOT->IsBatch() && !fProgressDialog) {
00599 if ((fProgressDialog =
00600 gROOT->GetPluginManager()->FindHandler("TProofProgressDialog")))
00601 if (fProgressDialog->LoadPlugin() == -1)
00602 fProgressDialog = 0;
00603 }
00604
00605 if (opt == 1) {
00606
00607 Collect(kActive);
00608
00609 SendGroupView();
00610
00611 SetParallel(9999, 0);
00612 }
00613
00614 return 0;
00615 }
00616
00617
00618 void TProofLite::NotifyStartUp(const char *action, Int_t done, Int_t tot)
00619 {
00620
00621
00622 Int_t frac = (Int_t) (done*100.)/tot;
00623 char msg[512] = {0};
00624 if (frac >= 100) {
00625 snprintf(msg, 512, "%s: OK (%d workers) \n",
00626 action, tot);
00627 } else {
00628 snprintf(msg, 512, "%s: %d out of %d (%d %%)\r",
00629 action, done, tot, frac);
00630 }
00631 fprintf(stderr,"%s", msg);
00632 }
00633
00634
00635 Int_t TProofLite::SetProofServEnv(const char *ord)
00636 {
00637
00638
00639
00640 if (!ord || strlen(ord) <= 0) {
00641 Error("SetProofServEnv", "ordinal string undefined");
00642 return -1;
00643 }
00644
00645
00646 TString rcfile(Form("%s/worker-%s.rootrc", fWorkDir.Data(), ord));
00647 FILE *frc = fopen(rcfile.Data(), "w");
00648 if (!frc) {
00649 Error("SetProofServEnv", "cannot open rc file %s", rcfile.Data());
00650 return -1;
00651 }
00652
00653
00654 fprintf(frc,"# The session working dir\n");
00655 fprintf(frc,"ProofServ.SessionDir: %s/worker-%s\n", fWorkDir.Data(), ord);
00656
00657
00658 fprintf(frc,"# Session tag\n");
00659 fprintf(frc,"ProofServ.SessionTag: %s\n", GetName());
00660
00661
00662 fprintf(frc,"# Proof Log/Debug level\n");
00663 fprintf(frc,"Proof.DebugLevel: %d\n", gDebug);
00664
00665
00666 fprintf(frc,"# Ordinal number\n");
00667 fprintf(frc,"ProofServ.Ordinal: %s\n", ord);
00668
00669
00670 fprintf(frc,"# ROOT Version tag\n");
00671 fprintf(frc,"ProofServ.RootVersionTag: %s\n", gROOT->GetVersion());
00672
00673
00674 TString sandbox = gEnv->GetValue("ProofServ.Sandbox", fSandbox);
00675 gSystem->ExpandPathName(sandbox);
00676 fprintf(frc,"# Users sandbox\n");
00677 fprintf(frc, "ProofServ.Sandbox: %s\n", sandbox.Data());
00678
00679
00680 fprintf(frc,"# Users cache\n");
00681 fprintf(frc, "ProofServ.CacheDir: %s\n", fCacheDir.Data());
00682
00683
00684 fprintf(frc,"# Users packages\n");
00685 fprintf(frc, "ProofServ.PackageDir: %s\n", fPackageDir.Data());
00686
00687
00688 fprintf(frc,"# Server image\n");
00689 fprintf(frc, "ProofServ.Image: %s\n", fImage.Data());
00690
00691
00692 fprintf(frc,"# Open socket\n");
00693 fprintf(frc, "ProofServ.OpenSock: %s\n", fSockPath.Data());
00694
00695
00696 fprintf(frc,"# Client Protocol\n");
00697 fprintf(frc, "ProofServ.ClientVersion: %d\n", kPROOF_Protocol);
00698
00699
00700 fclose(frc);
00701
00702
00703 TString envfile(Form("%s/worker-%s.env", fWorkDir.Data(), ord));
00704 FILE *fenv = fopen(envfile.Data(), "w");
00705 if (!fenv) {
00706 Error("SetProofServEnv", "cannot open env file %s", envfile.Data());
00707 return -1;
00708 }
00709
00710 #ifdef R__HAVE_CONFIG
00711 fprintf(fenv, "export ROOTSYS=%s\n", ROOTPREFIX);
00712 #else
00713 fprintf(fenv, "export ROOTSYS=%s\n", gSystem->Getenv("ROOTSYS"));
00714 #endif
00715
00716 #ifdef R__HAVE_CONFIG
00717 fprintf(fenv, "export ROOTCONFDIR=%s\n", ROOTETCDIR);
00718 #else
00719 fprintf(fenv, "export ROOTCONFDIR=%s\n", gSystem->Getenv("ROOTSYS"));
00720 #endif
00721
00722 fprintf(fenv, "export TMPDIR=%s\n", gSystem->TempDirectory());
00723
00724 TString logfile(Form("%s/worker-%s.log", fWorkDir.Data(), ord));
00725 fprintf(fenv, "export ROOTPROOFLOGFILE=%s\n", logfile.Data());
00726
00727 fprintf(fenv, "export ROOTRCFILE=%s\n", rcfile.Data());
00728
00729 fprintf(fenv, "export ROOTVERSIONTAG=%s\n", gROOT->GetVersion());
00730
00731 fprintf(fenv, "export ROOTPROOFLITE=%d\n", fNWorkers);
00732
00733 if (fgProofEnvList) {
00734 TString namelist;
00735 TIter nxenv(fgProofEnvList);
00736 TNamed *env = 0;
00737 while ((env = (TNamed *)nxenv())) {
00738 TString senv(env->GetTitle());
00739 ResolveKeywords(senv, logfile.Data());
00740 fprintf(fenv, "export %s=%s\n", env->GetName(), senv.Data());
00741 if (namelist.Length() > 0)
00742 namelist += ',';
00743 namelist += env->GetName();
00744 }
00745 fprintf(fenv, "export PROOF_ALLVARS=%s\n", namelist.Data());
00746 }
00747
00748
00749 fclose(fenv);
00750
00751
00752 return 0;
00753 }
00754
00755
00756 void TProofLite::ResolveKeywords(TString &s, const char *logfile)
00757 {
00758
00759
00760
00761 if (!logfile) return;
00762
00763
00764 if (s.Contains("<logfilewrk>") && logfile) {
00765 TString lfr(logfile);
00766 if (lfr.EndsWith(".log")) lfr.Remove(lfr.Last('.'));
00767 s.ReplaceAll("<logfilewrk>", lfr.Data());
00768 }
00769
00770
00771 if (gSystem->Getenv("USER") && s.Contains("<user>")) {
00772 s.ReplaceAll("<user>", gSystem->Getenv("USER"));
00773 }
00774
00775
00776 if (gSystem->Getenv("ROOTSYS") && s.Contains("<rootsys>")) {
00777 s.ReplaceAll("<rootsys>", gSystem->Getenv("ROOTSYS"));
00778 }
00779 }
00780
00781
00782 Int_t TProofLite::CreateSandbox()
00783 {
00784
00785
00786
00787 fSandbox = gEnv->GetValue("ProofLite.Sandbox", "");
00788 if (fSandbox.IsNull())
00789 fSandbox = gEnv->GetValue("Proof.Sandbox", TString::Format("~/%s", kPROOF_WorkDir));
00790 gSystem->ExpandPathName(fSandbox);
00791 if (AssertPath(fSandbox, kTRUE) != 0) return -1;
00792
00793
00794 fPackageDir = gEnv->GetValue("Proof.PackageDir", "");
00795 if (fPackageDir.IsNull())
00796 fPackageDir.Form("%s/%s", fSandbox.Data(), kPROOF_PackDir);
00797 if (AssertPath(fPackageDir, kTRUE) != 0) return -1;
00798
00799
00800 fCacheDir = gEnv->GetValue("Proof.CacheDir", "");
00801 if (fCacheDir.IsNull())
00802 fCacheDir.Form("%s/%s", fSandbox.Data(), kPROOF_CacheDir);
00803 if (AssertPath(fCacheDir, kTRUE) != 0) return -1;
00804
00805
00806 fDataSetDir = gEnv->GetValue("Proof.DataSetDir", "");
00807 if (fDataSetDir.IsNull())
00808 fDataSetDir.Form("%s/%s", fSandbox.Data(), kPROOF_DataSetDir);
00809 if (AssertPath(fDataSetDir, kTRUE) != 0) return -1;
00810
00811
00812 TString stag;
00813 stag.Form("%s-%d-%d", gSystem->HostName(), (int)time(0), gSystem->GetPid());
00814 SetName(stag.Data());
00815
00816
00817 TString sessdir(gSystem->WorkingDirectory());
00818 sessdir.ReplaceAll(gSystem->HomeDirectory(),"");
00819 sessdir.ReplaceAll("/","-");
00820 sessdir.Replace(0,1,"/",1);
00821 sessdir.Insert(0, fSandbox.Data());
00822
00823
00824 fWorkDir.Form("%s/session-%s", sessdir.Data(), stag.Data());
00825 if (AssertPath(fWorkDir, kTRUE) != 0) return -1;
00826
00827
00828 TString lastsess;
00829 lastsess.Form("%s/last-lite-session", sessdir.Data());
00830 gSystem->Unlink(lastsess);
00831 gSystem->Symlink(fWorkDir, lastsess);
00832
00833
00834 fQueryDir = gEnv->GetValue("Proof.QueryDir", "");
00835 if (fQueryDir.IsNull())
00836 fQueryDir.Form("%s/%s", sessdir.Data(), kPROOF_QueryDir);
00837 if (AssertPath(fQueryDir, kTRUE) != 0) return -1;
00838
00839
00840 CleanupSandbox();
00841
00842
00843 return 0;
00844 }
00845
00846
00847 void TProofLite::Print(Option_t *option) const
00848 {
00849
00850
00851 if (IsParallel())
00852 Printf("*** PROOF-Lite cluster (parallel mode, %d workers):", GetParallel());
00853 else
00854 Printf("*** PROOF-Lite cluster (sequential mode)");
00855
00856 Printf("Host name: %s", gSystem->HostName());
00857 Printf("User: %s", GetUser());
00858 TString ver(gROOT->GetVersion());
00859 if (gROOT->GetSvnRevision() > 0)
00860 ver += Form("|r%d", gROOT->GetSvnRevision());
00861 if (gSystem->Getenv("ROOTVERSIONTAG"))
00862 ver += Form("|%s", gSystem->Getenv("ROOTVERSIONTAG"));
00863 Printf("ROOT version|rev|tag: %s", ver.Data());
00864 Printf("Architecture-Compiler: %s-%s", gSystem->GetBuildArch(),
00865 gSystem->GetBuildCompilerVersion());
00866 Printf("Protocol version: %d", GetClientProtocol());
00867 Printf("Working directory: %s", gSystem->WorkingDirectory());
00868 Printf("Communication path: %s", fSockPath.Data());
00869 Printf("Log level: %d", GetLogLevel());
00870 Printf("Number of workers: %d", GetNumberOfSlaves());
00871 Printf("Number of active workers: %d", GetNumberOfActiveSlaves());
00872 Printf("Number of unique workers: %d", GetNumberOfUniqueSlaves());
00873 Printf("Number of inactive workers: %d", GetNumberOfInactiveSlaves());
00874 Printf("Number of bad workers: %d", GetNumberOfBadSlaves());
00875 Printf("Total MB's processed: %.2f", float(GetBytesRead())/(1024*1024));
00876 Printf("Total real time used (s): %.3f", GetRealTime());
00877 Printf("Total CPU time used (s): %.3f", GetCpuTime());
00878 if (TString(option).Contains("a", TString::kIgnoreCase) && GetNumberOfSlaves()) {
00879 Printf("List of workers:");
00880 TIter nextslave(fSlaves);
00881 while (TSlave* sl = dynamic_cast<TSlave*>(nextslave())) {
00882 if (sl->IsValid())
00883 sl->Print(option);
00884 }
00885 }
00886 }
00887
00888
00889 TProofQueryResult *TProofLite::MakeQueryResult(Long64_t nent, const char *opt,
00890 Long64_t fst, TDSet *dset,
00891 const char *selec)
00892 {
00893
00894
00895
00896 Int_t seqnum = -1;
00897 if (fQMgr) {
00898 fQMgr->IncrementSeqNum();
00899 seqnum = fQMgr->SeqNum();
00900 }
00901
00902
00903 TProofQueryResult *pqr = new TProofQueryResult(seqnum, opt,
00904 fPlayer->GetInputList(), nent,
00905 fst, dset, selec,
00906 (dset ? dset->GetEntryList() : 0));
00907
00908 pqr->SetTitle(GetName());
00909
00910 return pqr;
00911 }
00912
00913
00914 void TProofLite::SetQueryRunning(TProofQueryResult *pq)
00915 {
00916
00917
00918
00919 fflush(fLogFileW);
00920 Int_t startlog = lseek(fileno(fLogFileW), (off_t) 0, SEEK_END);
00921
00922
00923 Printf(" ");
00924 Info("SetQueryRunning", "starting query: %d", pq->GetSeqNum());
00925
00926
00927 TString parlist = "";
00928 TIter nxp(fEnabledPackagesOnClient);
00929 TObjString *os= 0;
00930 while ((os = (TObjString *)nxp())) {
00931 if (parlist.Length() <= 0)
00932 parlist = os->GetName();
00933 else
00934 parlist += Form(";%s",os->GetName());
00935 }
00936
00937
00938 pq->SetRunning(startlog, parlist, GetParallel());
00939
00940
00941 pq->SetProcessInfo(pq->GetEntries(), GetCpuTime(), GetBytesRead());
00942 }
00943
00944
00945 Long64_t TProofLite::DrawSelect(TDSet *dset, const char *varexp,
00946 const char *selection, Option_t *option,
00947 Long64_t nentries, Long64_t first)
00948 {
00949
00950
00951
00952
00953
00954 if (!IsValid()) return -1;
00955
00956
00957 if (!IsIdle()) {
00958 Info("DrawSelect","not idle, asynchronous Draw not supported");
00959 return -1;
00960 }
00961 TString opt(option);
00962 Int_t idx = opt.Index("ASYN", 0, TString::kIgnoreCase);
00963 if (idx != kNPOS)
00964 opt.Replace(idx,4,"");
00965
00966
00967 fVarExp = varexp;
00968 fSelection = selection;
00969
00970 return Process(dset, "draw:", opt, nentries, first);
00971 }
00972
00973
00974 Long64_t TProofLite::Process(TDSet *dset, const char *selector, Option_t *option,
00975 Long64_t nentries, Long64_t first)
00976 {
00977
00978
00979
00980
00981
00982
00983
00984
00985
00986
00987 fSync = (GetQueryMode(option) == kSync);
00988 if (!fSync) {
00989 Info("Process","asynchronous mode not yet supported in PROOF-Lite");
00990 return -1;
00991 }
00992
00993 if (!IsIdle()) {
00994
00995 Info("Process", "not idle: cannot accept queries");
00996 return -1;
00997 }
00998
00999
01000 if (IsIdle() && fRunningDSets && fRunningDSets->GetSize() > 0) {
01001 fRunningDSets->SetOwner(kTRUE);
01002 fRunningDSets->Delete();
01003 }
01004
01005 if (!IsValid() || !fQMgr || !fPlayer) {
01006 Error("Process", "invalid sesion or query-result manager undefined!");
01007 return -1;
01008 }
01009
01010
01011
01012 if (!fPlayer->GetInputList()->FindObject("PROOF_MaxSlavesPerNode"))
01013 SetParameter("PROOF_MaxSlavesPerNode", (Long_t)fNWorkers);
01014
01015 Bool_t hasNoData = (!dset || (dset && dset->TestBit(TDSet::kEmpty))) ? kTRUE : kFALSE;
01016
01017
01018
01019
01020 if ((!hasNoData) && dset->GetListOfElements()->GetSize() == 0) {
01021 TString emsg;
01022 if (TProof::AssertDataSet(dset, fPlayer->GetInputList(), fDataSetManager, emsg) != 0) {
01023 Error("Process", "from AssertDataSet: %s", emsg.Data());
01024 return -1;
01025 }
01026 if (dset->GetListOfElements()->GetSize() == 0) {
01027 Error("Process", "no files to process!");
01028 return -1;
01029 }
01030 }
01031
01032 TString selec(selector), varexp, selection, objname;
01033
01034 if (selec.BeginsWith("draw:")) {
01035 varexp = fVarExp;
01036 selection = fSelection;
01037
01038 if (fPlayer->GetDrawArgs(varexp, selection, option, selec, objname) != 0) {
01039 Error("Process", "draw query: error parsing arguments '%s', '%s', '%s'",
01040 varexp.Data(), selection.Data(), option);
01041 return -1;
01042 }
01043 }
01044
01045
01046 TProofQueryResult *pq = MakeQueryResult(nentries, option, first, 0, selec);
01047
01048
01049 if (!(pq->IsDraw())) {
01050 if (fQMgr->Queries()) fQMgr->Queries()->Add(pq);
01051
01052 fQMgr->SaveQuery(pq);
01053 }
01054
01055
01056 fSeqNum = pq->GetSeqNum();
01057
01058
01059 SetQueryRunning(pq);
01060
01061
01062 if (!(pq->IsDraw()))
01063 fQMgr->SaveQuery(pq);
01064 else
01065 fQMgr->IncrementDrawQueries();
01066
01067
01068 if (!gROOT->IsBatch()) {
01069 Int_t dsz = (dset && dset->GetListOfElements()) ? dset->GetListOfElements()->GetSize() : -1;
01070 if (fProgressDialog &&
01071 !TestBit(kUsingSessionGui) && TestBit(kUseProgressDialog)) {
01072 if (!fProgressDialogStarted) {
01073 fProgressDialog->ExecPlugin(5, this, selec.Data(), dsz,
01074 first, nentries);
01075 fProgressDialogStarted = kTRUE;
01076 } else {
01077 ResetProgressDialog(selec.Data(), dsz, first, nentries);
01078 }
01079 }
01080 ResetBit(kUsingSessionGui);
01081 }
01082
01083
01084 if (!(pq->IsDraw()))
01085 fPlayer->AddQueryResult(pq);
01086
01087
01088 fPlayer->SetCurrentQuery(pq);
01089
01090
01091
01092 TNamed *qtag = (TNamed *) fPlayer->GetInputList()->FindObject("PROOF_QueryTag");
01093 if (qtag) {
01094 qtag->SetTitle(Form("%s:%s",pq->GetTitle(),pq->GetName()));
01095 } else {
01096 TObject *o = fPlayer->GetInputList()->FindObject("PROOF_QueryTag");
01097 if (o) fPlayer->GetInputList()->Remove(o);
01098 fPlayer->AddInput(new TNamed("PROOF_QueryTag",
01099 Form("%s:%s",pq->GetTitle(),pq->GetName())));
01100 }
01101
01102
01103 SetRunStatus(TProof::kRunning);
01104
01105
01106
01107 TSignalHandler *sh = 0;
01108 if (fSync) {
01109 if (gApplication)
01110 sh = gSystem->RemoveSignalHandler(gApplication->GetSignalHandler());
01111 }
01112
01113
01114 TList *startedWorkers = 0;
01115 if (fForkStartup) {
01116 startedWorkers = new TList;
01117 startedWorkers->SetOwner(kFALSE);
01118 SetupWorkers(1, startedWorkers);
01119 }
01120
01121 Long64_t rv = 0;
01122 if (!(pq->IsDraw())) {
01123 rv = fPlayer->Process(dset, selec, option, nentries, first);
01124 } else {
01125 rv = fPlayer->DrawSelect(dset, varexp, selection, option, nentries, first);
01126 }
01127
01128 if (fSync) {
01129
01130
01131 if (fForkStartup && startedWorkers) {
01132 RemoveWorkers(startedWorkers);
01133 SafeDelete(startedWorkers);
01134 }
01135
01136
01137 if (sh)
01138 gSystem->AddSignalHandler(sh);
01139
01140
01141 if (fPlayer->GetExitStatus() != TVirtualProofPlayer::kFinished) {
01142 Bool_t abort = (fPlayer->GetExitStatus() == TVirtualProofPlayer::kAborted)
01143 ? kTRUE : kFALSE;
01144 if (abort) fPlayer->StopProcess(kTRUE);
01145 Emit("StopProcess(Bool_t)", abort);
01146 }
01147
01148
01149 pq->SetOutputList(fPlayer->GetOutputList(), kFALSE);
01150
01151 QueryResultReady(Form("%s:%s", pq->GetTitle(), pq->GetName()));
01152
01153 UpdateDialog();
01154
01155
01156
01157 if (rv == 0 && dset && pq->GetInputList()) {
01158 pq->GetInputList()->Add(dset);
01159 if (dset->GetEntryList())
01160 pq->GetInputList()->Add(dset->GetEntryList());
01161 }
01162
01163
01164 if (fDataSetManager && fPlayer->GetOutputList()) {
01165 TNamed *psr = (TNamed *) fPlayer->GetOutputList()->FindObject("PROOFSERV_RegisterDataSet");
01166 if (psr) {
01167 if (RegisterDataSets(fPlayer->GetInputList(), fPlayer->GetOutputList()) != 0)
01168 Warning("ProcessNext", "problems registering produced datasets");
01169 fPlayer->GetOutputList()->Remove(psr);
01170 delete psr;
01171 }
01172 }
01173
01174
01175 AskStatistics();
01176 if (!(pq->IsDraw())) {
01177 if (fQMgr->FinalizeQuery(pq, this, fPlayer)) {
01178
01179 if (!strcmp(gEnv->GetValue("ProofLite.AutoSaveQueries", "off"), "on"))
01180 fQMgr->SaveQuery(pq, -1);
01181 }
01182 }
01183
01184
01185 if (fPlayer && fPlayer->GetExitStatus() == TVirtualProofPlayer::kAborted) {
01186 if (fPlayer->GetListOfResults()) fPlayer->GetListOfResults()->Remove(pq);
01187 if (fQMgr) fQMgr->RemoveQuery(pq);
01188 } else {
01189
01190 QueryResultReady(Form("%s:%s", pq->GetTitle(), pq->GetName()));
01191
01192 if (!(pq->IsDraw())) {
01193 if (fQMgr && fQMgr->Queries())
01194
01195 fQMgr->Queries()->Remove(pq);
01196 }
01197
01198 TString msg;
01199 msg.Form("Lite-0: all output objects have been merged ");
01200 fprintf(stderr, "%s\n", msg.Data());
01201 }
01202
01203 }
01204
01205
01206 return rv;
01207 }
01208
01209
01210 Int_t TProofLite::CreateSymLinks(TList *files)
01211 {
01212
01213
01214
01215 Int_t rc = 0;
01216 if (files) {
01217 TIter nxf(files);
01218 TObjString *os = 0;
01219 while ((os = (TObjString *) nxf())) {
01220
01221 TString tgt(os->GetName());
01222 gSystem->ExpandPathName(tgt);
01223
01224 TIter nxw(fActiveSlaves);
01225 TSlave *wrk = 0;
01226 while ((wrk = (TSlave *) nxw())) {
01227
01228 TString lnk = Form("%s/%s", wrk->GetWorkDir(), gSystem->BaseName(os->GetName()));
01229 gSystem->Unlink(lnk);
01230 if (gSystem->Symlink(tgt, lnk) != 0) {
01231 rc++;
01232 Warning("CreateSymLinks", "problems creating sym link: %s", lnk.Data());
01233 }
01234 }
01235 }
01236 } else {
01237 Warning("CreateSymLinks", "files list is undefined");
01238 }
01239
01240 return rc;
01241 }
01242
01243
01244 Int_t TProofLite::InitDataSetManager()
01245 {
01246
01247
01248
01249 fDataSetManager = 0;
01250
01251
01252 TString user("???"), group("default");
01253 UserGroup_t *pw = gSystem->GetUserInfo();
01254 if (pw) {
01255 user = pw->fUser;
01256 delete pw;
01257 }
01258
01259
01260 TPluginHandler *h = 0;
01261 TString dsm = gEnv->GetValue("Proof.DataSetManager", "");
01262 if (!dsm.IsNull()) {
01263
01264 if (gROOT->GetPluginManager()) {
01265
01266 h = gROOT->GetPluginManager()->FindHandler("TDataSetManager", dsm);
01267 if (h && h->LoadPlugin() != -1) {
01268
01269 fDataSetManager =
01270 reinterpret_cast<TDataSetManager*>(h->ExecPlugin(3, group.Data(),
01271 user.Data(), dsm.Data()));
01272 }
01273 }
01274 }
01275 if (fDataSetManager && fDataSetManager->TestBit(TObject::kInvalidObject)) {
01276 Warning("InitDataSetManager", "dataset manager plug-in initialization failed");
01277 SafeDelete(fDataSetManager);
01278 }
01279
01280
01281 if (!fDataSetManager) {
01282 TString opts("Av:");
01283 TString dsetdir = gEnv->GetValue("ProofServ.DataSetDir", "");
01284 if (dsetdir.IsNull()) {
01285
01286 dsetdir = fDataSetDir;
01287 opts += "Sb:";
01288 }
01289
01290 if (!h) {
01291 h = gROOT->GetPluginManager()->FindHandler("TDataSetManager", "file");
01292 if (h && h->LoadPlugin() == -1) h = 0;
01293 }
01294 if (h) {
01295
01296 fDataSetManager = reinterpret_cast<TDataSetManager*>(h->ExecPlugin(3,
01297 group.Data(), user.Data(),
01298 Form("dir:%s opt:%s", dsetdir.Data(), opts.Data())));
01299 }
01300 if (fDataSetManager && fDataSetManager->TestBit(TObject::kInvalidObject)) {
01301 Warning("InitDataSetManager", "default dataset manager plug-in initialization failed");
01302 SafeDelete(fDataSetManager);
01303 }
01304 }
01305
01306 if (gDebug > 0 && fDataSetManager) {
01307 Info("InitDataSetManager", "datasetmgr Cq: %d, Ar: %d, Av: %d, Ti: %d, Sb: %d",
01308 fDataSetManager->TestBit(TDataSetManager::kCheckQuota),
01309 fDataSetManager->TestBit(TDataSetManager::kAllowRegister),
01310 fDataSetManager->TestBit(TDataSetManager::kAllowVerify),
01311 fDataSetManager->TestBit(TDataSetManager::kTrustInfo),
01312 fDataSetManager->TestBit(TDataSetManager::kIsSandbox));
01313 }
01314
01315
01316 return (fDataSetManager ? 0 : -1);
01317 }
01318
01319
01320 void TProofLite::ShowCache(Bool_t)
01321 {
01322
01323
01324
01325 if (!IsValid()) return;
01326
01327 Printf("*** Local file cache %s ***", fCacheDir.Data());
01328 gSystem->Exec(Form("%s %s", kLS, fCacheDir.Data()));
01329 }
01330
01331
01332 void TProofLite::ClearCache(const char *file)
01333 {
01334
01335
01336 if (!IsValid()) return;
01337
01338 fCacheLock->Lock();
01339 if (!file || strlen(file) <= 0) {
01340 gSystem->Exec(Form("%s %s/*", kRM, fCacheDir.Data()));
01341 } else {
01342 gSystem->Exec(Form("%s %s/%s", kRM, fCacheDir.Data(), file));
01343 }
01344 fCacheLock->Unlock();
01345 }
01346
01347
01348 Int_t TProofLite::Load(const char *macro, Bool_t notOnClient, Bool_t uniqueOnly,
01349 TList *wrks)
01350 {
01351
01352
01353
01354
01355
01356
01357 if (!IsValid()) return -1;
01358
01359 if (!macro || !strlen(macro)) {
01360 Error("Load", "need to specify a macro name");
01361 return -1;
01362 }
01363
01364 TString macs(macro), mac;
01365 Int_t from = 0;
01366 while (macs.Tokenize(mac, from, ",")) {
01367 if (CopyMacroToCache(mac) < 0) return -1;
01368 }
01369
01370 return TProof::Load(macro, notOnClient, uniqueOnly, wrks);
01371 }
01372
01373
01374 Int_t TProofLite::CopyMacroToCache(const char *macro, Int_t headerRequired,
01375 TSelector **selector, Int_t opt)
01376 {
01377
01378
01379
01380
01381
01382
01383
01384
01385
01386
01387
01388
01389
01390
01391 TString cacheDir = fCacheDir;
01392 gSystem->ExpandPathName(cacheDir);
01393 TProofLockPath *cacheLock = fCacheLock;
01394
01395
01396 TString name = macro;
01397 TString acmode, args, io;
01398 name = gSystem->SplitAclicMode(name, acmode, args, io);
01399
01400 PDB(kGlobal,1)
01401 Info("CopyMacroToCache", "enter: names: %s, %s", macro, name.Data());
01402
01403
01404 if (gSystem->AccessPathName(name, kReadPermission)) {
01405 Error("CopyMacroToCache", "file %s not found or not readable", name.Data());
01406 return -1;
01407 }
01408
01409
01410 TString mp(TROOT::GetMacroPath());
01411 TString np(gSystem->DirName(name));
01412 if (!np.IsNull()) {
01413 np += ":";
01414 if (!mp.BeginsWith(np) && !mp.Contains(":"+np)) {
01415 Int_t ip = (mp.BeginsWith(".:")) ? 2 : 0;
01416 mp.Insert(ip, np);
01417 TROOT::SetMacroPath(mp);
01418 if (gDebug > 0)
01419 Info("CopyMacroToCache", "macro path set to '%s'", TROOT::GetMacroPath());
01420 }
01421 }
01422
01423
01424 Int_t dot = name.Last('.');
01425 const char *hext[] = { ".h", ".hh", "" };
01426 TString hname, checkedext;
01427 Int_t i = 0;
01428 while (strlen(hext[i]) > 0) {
01429 hname = name(0, dot);
01430 hname += hext[i];
01431 if (!gSystem->AccessPathName(hname, kReadPermission))
01432 break;
01433 if (!checkedext.IsNull()) checkedext += ",";
01434 checkedext += hext[i];
01435 hname = "";
01436 i++;
01437 }
01438 if (hname.IsNull() && headerRequired == 1) {
01439 Error("CopyMacroToCache", "header file for %s not found or not readable "
01440 "(checked extensions: %s)", name.Data(), checkedext.Data());
01441 return -1;
01442 }
01443 if (headerRequired < 0)
01444 hname = "";
01445
01446 cacheLock->Lock();
01447
01448
01449 Bool_t useCacheBinaries = kFALSE;
01450 TString cachedname = Form("%s/%s", cacheDir.Data(), gSystem->BaseName(name));
01451 TString cachedhname;
01452 if (!hname.IsNull())
01453 cachedhname = Form("%s/%s", cacheDir.Data(), gSystem->BaseName(hname));
01454 if (!gSystem->AccessPathName(cachedname, kReadPermission)) {
01455 TMD5 *md5 = TMD5::FileChecksum(name);
01456 TMD5 *md5cache = TMD5::FileChecksum(cachedname);
01457 if (md5 && md5cache && (*md5 == *md5cache))
01458 useCacheBinaries = kTRUE;
01459 if (!hname.IsNull()) {
01460 if (!gSystem->AccessPathName(cachedhname, kReadPermission)) {
01461 TMD5 *md5h = TMD5::FileChecksum(hname);
01462 TMD5 *md5hcache = TMD5::FileChecksum(cachedhname);
01463 if (md5h && md5hcache && (*md5h != *md5hcache))
01464 useCacheBinaries = kFALSE;
01465 SafeDelete(md5h);
01466 SafeDelete(md5hcache);
01467 }
01468 }
01469 SafeDelete(md5);
01470 SafeDelete(md5cache);
01471 }
01472
01473
01474 TString vername(Form(".%s", name.Data()));
01475 dot = vername.Last('.');
01476 if (dot != kNPOS)
01477 vername.Remove(dot);
01478 vername += ".binversion";
01479 Bool_t savever = kFALSE;
01480
01481
01482 if (useCacheBinaries) {
01483 TString v;
01484 Int_t rev = -1;
01485 FILE *f = fopen(Form("%s/%s", cacheDir.Data(), vername.Data()), "r");
01486 if (f) {
01487 TString r;
01488 v.Gets(f);
01489 r.Gets(f);
01490 rev = (!r.IsNull() && r.IsDigit()) ? r.Atoi() : -1;
01491 fclose(f);
01492 }
01493 if (!f || v != gROOT->GetVersion() ||
01494 (gROOT->GetSvnRevision() > 0 && rev != gROOT->GetSvnRevision()))
01495 useCacheBinaries = kFALSE;
01496 }
01497
01498
01499 TString binname = gSystem->BaseName(name);
01500 dot = binname.Last('.');
01501 if (dot != kNPOS)
01502 binname.Replace(dot,1,"_");
01503 binname += ".";
01504
01505 FileStat_t stlocal, stcache;
01506 void *dirp = 0;
01507 if (useCacheBinaries) {
01508
01509
01510 dirp = gSystem->OpenDirectory(cacheDir);
01511 if (dirp) {
01512 const char *e = 0;
01513 while ((e = gSystem->GetDirEntry(dirp))) {
01514 if (!strncmp(e, binname.Data(), binname.Length())) {
01515 TString fncache = Form("%s/%s", cacheDir.Data(), e);
01516 Bool_t docp = kTRUE;
01517 if (!gSystem->GetPathInfo(fncache, stcache)) {
01518 Int_t rc = gSystem->GetPathInfo(e, stlocal);
01519 if (rc == 0 && (stlocal.fMtime >= stcache.fMtime))
01520 docp = kFALSE;
01521
01522 if (docp) {
01523 gSystem->Exec(Form("%s %s", kRM, e));
01524 PDB(kGlobal,2)
01525 Info("CopyMacroToCache",
01526 "retrieving %s from cache", fncache.Data());
01527 gSystem->Exec(Form("%s %s %s", kCP, fncache.Data(), e));
01528 }
01529 }
01530 }
01531 }
01532 gSystem->FreeDirectory(dirp);
01533 }
01534 }
01535 cacheLock->Unlock();
01536
01537 if (selector) {
01538
01539 if (!(*selector = TSelector::GetSelector(macro))) {
01540 Error("CopyMacroToCache", "could not create a selector from %s", macro);
01541 return -1;
01542 }
01543 }
01544
01545 cacheLock->Lock();
01546
01547 TList *cachedFiles = new TList;
01548
01549 dirp = gSystem->OpenDirectory(".");
01550 if (dirp) {
01551 const char *e = 0;
01552 while ((e = gSystem->GetDirEntry(dirp))) {
01553 if (!strncmp(e, binname.Data(), binname.Length())) {
01554 Bool_t docp = kTRUE;
01555 if (!gSystem->GetPathInfo(e, stlocal)) {
01556 TString fncache = Form("%s/%s", cacheDir.Data(), e);
01557 Int_t rc = gSystem->GetPathInfo(fncache, stcache);
01558 if (rc == 0 && (stlocal.fMtime <= stcache.fMtime))
01559 docp = kFALSE;
01560
01561 if (docp) {
01562 gSystem->Exec(Form("%s %s", kRM, fncache.Data()));
01563 PDB(kGlobal,2)
01564 Info("CopyMacroToCache","caching %s ...", e);
01565 gSystem->Exec(Form("%s %s %s", kCP, e, fncache.Data()));
01566 savever = kTRUE;
01567 }
01568 if (opt & kCpBin)
01569 cachedFiles->Add(new TObjString(fncache.Data()));
01570 }
01571 }
01572 }
01573 gSystem->FreeDirectory(dirp);
01574 }
01575
01576
01577 if (savever) {
01578 FILE *f = fopen(Form("%s/%s", cacheDir.Data(), vername.Data()), "w");
01579 if (f) {
01580 fputs(gROOT->GetVersion(), f);
01581 fputs(Form("\n%d",gROOT->GetSvnRevision()), f);
01582 fclose(f);
01583 }
01584 }
01585
01586
01587 if (!useCacheBinaries) {
01588 gSystem->Exec(Form("%s %s", kRM, cachedname.Data()));
01589 PDB(kGlobal,2)
01590 Info("CopyMacroToCache","caching %s ...", name.Data());
01591 gSystem->Exec(Form("%s %s %s", kCP, name.Data(), cachedname.Data()));
01592 if (!hname.IsNull()) {
01593 gSystem->Exec(Form("%s %s", kRM, cachedhname.Data()));
01594 PDB(kGlobal,2)
01595 Info("CopyMacroToCache","caching %s ...", hname.Data());
01596 gSystem->Exec(Form("%s %s %s", kCP, hname.Data(), cachedhname.Data()));
01597 }
01598 }
01599 if (opt & kCp) {
01600 cachedFiles->Add(new TObjString(cachedname.Data()));
01601 if (!hname.IsNull())
01602 cachedFiles->Add(new TObjString(cachedhname.Data()));
01603 }
01604
01605 cacheLock->Unlock();
01606
01607
01608 if (opt & (kCp | kCpBin))
01609 CreateSymLinks(cachedFiles);
01610
01611 cachedFiles->SetOwner();
01612 delete cachedFiles;
01613
01614 return 0;
01615 }
01616
01617
01618 Int_t TProofLite::CleanupSandbox()
01619 {
01620
01621
01622 Int_t maxold = gEnv->GetValue("Proof.MaxOldSessions", 10);
01623
01624 if (maxold < 0) return 0;
01625
01626 TSortedList *olddirs = new TSortedList(kFALSE);
01627
01628 TString sandbox = gSystem->DirName(fWorkDir.Data());
01629
01630 void *dirp = gSystem->OpenDirectory(sandbox);
01631 if (dirp) {
01632 const char *e = 0;
01633 while ((e = gSystem->GetDirEntry(dirp))) {
01634 if (!strncmp(e, "session-", 8) && !strstr(e, GetName())) {
01635 TString d(e);
01636 Int_t i = d.Last('-');
01637 if (i != kNPOS) d.Remove(i);
01638 i = d.Last('-');
01639 if (i != kNPOS) d.Remove(0,i+1);
01640 TString path = Form("%s/%s", sandbox.Data(), e);
01641 olddirs->Add(new TNamed(d, path));
01642 }
01643 }
01644 gSystem->FreeDirectory(dirp);
01645 }
01646
01647
01648 Bool_t notify = kTRUE;
01649 while (olddirs->GetSize() > maxold) {
01650 if (notify && gDebug > 0)
01651 Printf("Cleaning sandbox at: %s", sandbox.Data());
01652 notify = kFALSE;
01653 TNamed *n = (TNamed *) olddirs->Last();
01654 if (n) {
01655 gSystem->Exec(Form("%s %s", kRM, n->GetTitle()));
01656 olddirs->Remove(n);
01657 delete n;
01658 }
01659 }
01660
01661
01662 olddirs->SetOwner();
01663 delete olddirs;
01664
01665
01666 return 0;
01667 }
01668
01669
01670 TList *TProofLite::GetListOfQueries(Option_t *opt)
01671 {
01672
01673
01674 Bool_t all = ((strchr(opt,'A') || strchr(opt,'a'))) ? kTRUE : kFALSE;
01675
01676 TList *ql = new TList;
01677 Int_t ntot = 0, npre = 0, ndraw= 0;
01678 if (fQMgr) {
01679 if (all) {
01680
01681 TString qdir = fQueryDir;
01682 Int_t idx = qdir.Index("session-");
01683 if (idx != kNPOS)
01684 qdir.Remove(idx);
01685 fQMgr->ScanPreviousQueries(qdir);
01686
01687 if (fQMgr->PreviousQueries()) {
01688 TIter nxq(fQMgr->PreviousQueries());
01689 TProofQueryResult *pqr = 0;
01690 while ((pqr = (TProofQueryResult *)nxq())) {
01691 ntot++;
01692 pqr->fSeqNum = ntot;
01693 ql->Add(pqr);
01694 }
01695 }
01696 }
01697
01698 npre = ntot;
01699 if (fQMgr->Queries()) {
01700
01701 TIter nxq(fQMgr->Queries());
01702 TProofQueryResult *pqr = 0;
01703 TQueryResult *pqm = 0;
01704 while ((pqr = (TProofQueryResult *)nxq())) {
01705 ntot++;
01706 pqm = pqr->CloneInfo();
01707 pqm->fSeqNum = ntot;
01708 ql->Add(pqm);
01709 }
01710 }
01711
01712 ndraw = fQMgr->DrawQueries();
01713 }
01714
01715 fOtherQueries = npre;
01716 fDrawQueries = ndraw;
01717 if (fQueries) {
01718 fQueries->Delete();
01719 delete fQueries;
01720 fQueries = 0;
01721 }
01722 fQueries = ql;
01723
01724
01725 return fQueries;
01726 }
01727
01728
01729 Bool_t TProofLite::RegisterDataSet(const char *uri,
01730 TFileCollection *dataSet, const char* optStr)
01731 {
01732
01733
01734
01735
01736
01737
01738
01739
01740 if (!fDataSetManager) {
01741 Info("RegisterDataSet", "dataset manager not available");
01742 return kFALSE;
01743 }
01744
01745 if (!uri || strlen(uri) <= 0) {
01746 Info("RegisterDataSet", "specifying a dataset name is mandatory");
01747 return kFALSE;
01748 }
01749
01750 Bool_t result = kTRUE;
01751 if (fDataSetManager->TestBit(TDataSetManager::kAllowRegister)) {
01752
01753 if (!dataSet || dataSet->GetList()->GetSize() == 0) {
01754 Error("RegisterDataSet", "can not save an empty list.");
01755 result = kFALSE;
01756 }
01757
01758 result = (fDataSetManager->RegisterDataSet(uri, dataSet, optStr) == 0)
01759 ? kTRUE : kFALSE;
01760 } else {
01761 Info("RegisterDataSets", "dataset registration not allowed");
01762 result = kFALSE;
01763 }
01764
01765 if (!result)
01766 Error("RegisterDataSet", "dataset was not saved");
01767
01768
01769 return result;
01770 }
01771
01772
01773 Int_t TProofLite::SetDataSetTreeName(const char *dataset, const char *treename)
01774 {
01775
01776
01777
01778
01779 if (!fDataSetManager) {
01780 Info("ExistsDataSet", "dataset manager not available");
01781 return kFALSE;
01782 }
01783
01784 if (!dataset || strlen(dataset) <= 0) {
01785 Info("SetDataSetTreeName", "specifying a dataset name is mandatory");
01786 return -1;
01787 }
01788
01789 if (!treename || strlen(treename) <= 0) {
01790 Info("SetDataSetTreeName", "specifying a tree name is mandatory");
01791 return -1;
01792 }
01793
01794 TUri uri(dataset);
01795 TString fragment(treename);
01796 if (!fragment.BeginsWith("/")) fragment.Insert(0, "/");
01797 uri.SetFragment(fragment);
01798
01799 return fDataSetManager->ScanDataSet(uri.GetUri().Data(),
01800 (UInt_t)TDataSetManager::kSetDefaultTree);
01801 }
01802
01803
01804 Bool_t TProofLite::ExistsDataSet(const char *uri)
01805 {
01806
01807
01808 if (!fDataSetManager) {
01809 Info("ExistsDataSet", "dataset manager not available");
01810 return kFALSE;
01811 }
01812
01813 if (!uri || strlen(uri) <= 0) {
01814 Error("ExistsDataSet", "dataset name missing");
01815 return kFALSE;
01816 }
01817
01818
01819 return fDataSetManager->ExistsDataSet(uri);
01820 }
01821
01822
01823 TMap *TProofLite::GetDataSets(const char *uri, const char *srvex)
01824 {
01825
01826
01827 if (!fDataSetManager) {
01828 Info("GetDataSets", "dataset manager not available");
01829 return (TMap *)0;
01830 }
01831
01832
01833 if (srvex && strlen(srvex) > 0) {
01834 return fDataSetManager->GetSubDataSets(uri, srvex);
01835 } else {
01836 UInt_t opt = (UInt_t)TDataSetManager::kExport;
01837 return fDataSetManager->GetDataSets(uri, opt);
01838 }
01839 }
01840
01841
01842 void TProofLite::ShowDataSets(const char *uri, const char *opt)
01843 {
01844
01845
01846
01847 if (!fDataSetManager) {
01848 Info("GetDataSet", "dataset manager not available");
01849 return;
01850 }
01851
01852 fDataSetManager->ShowDataSets(uri, opt);
01853 }
01854
01855
01856 TFileCollection *TProofLite::GetDataSet(const char *uri, const char *)
01857 {
01858
01859
01860
01861 if (!fDataSetManager) {
01862 Info("GetDataSet", "dataset manager not available");
01863 return (TFileCollection *)0;
01864 }
01865
01866 if (!uri || strlen(uri) <= 0) {
01867 Info("GetDataSet", "specifying a dataset name is mandatory");
01868 return 0;
01869 }
01870
01871
01872 return fDataSetManager->GetDataSet(uri);
01873 }
01874
01875
01876 Int_t TProofLite::RemoveDataSet(const char *uri, const char *)
01877 {
01878
01879
01880
01881 if (!fDataSetManager) {
01882 Info("RemoveDataSet", "dataset manager not available");
01883 return -1;
01884 }
01885
01886 if (fDataSetManager->TestBit(TDataSetManager::kAllowRegister)) {
01887 if (!fDataSetManager->RemoveDataSet(uri)) {
01888
01889 return -1;
01890 }
01891 } else {
01892 Info("RemoveDataSet", "dataset creation / removal not allowed");
01893 return -1;
01894 }
01895
01896
01897 return 0;
01898 }
01899
01900
01901 Int_t TProofLite::VerifyDataSet(const char *uri, const char *)
01902 {
01903
01904
01905
01906 if (!fDataSetManager) {
01907 Info("VerifyDataSet", "dataset manager not available");
01908 return -1;
01909 }
01910
01911 Int_t rc = -1;
01912 if (fDataSetManager->TestBit(TDataSetManager::kAllowVerify)) {
01913 rc = fDataSetManager->ScanDataSet(uri);
01914 } else {
01915 Info("VerifyDataSet", "dataset verification not allowed");
01916 return -1;
01917 }
01918
01919
01920 return rc;
01921 }
01922
01923
01924 void TProofLite::ClearDataSetCache(const char *dataset)
01925 {
01926
01927
01928 if (fDataSetManager) fDataSetManager->ClearCache(dataset);
01929
01930 return;
01931 }
01932
01933
01934 void TProofLite::ShowDataSetCache(const char *dataset)
01935 {
01936
01937
01938
01939 if (fDataSetManager) fDataSetManager->ShowCache(dataset);
01940
01941 return;
01942 }
01943
01944
01945 void TProofLite::SendInputDataFile()
01946 {
01947
01948
01949
01950
01951
01952
01953
01954
01955
01956
01957 TString dataFile;
01958 PrepareInputDataFile(dataFile);
01959
01960
01961 if (dataFile.Length() > 0) {
01962
01963 if (!dataFile.BeginsWith(fCacheDir)) {
01964
01965 TString dst;
01966 dst.Form("%s/%s", fCacheDir.Data(), gSystem->BaseName(dataFile));
01967
01968 if (!gSystem->AccessPathName(dst))
01969 gSystem->Unlink(dst);
01970
01971 gSystem->CopyFile(dataFile, dst);
01972 }
01973
01974
01975 AddInput(new TNamed("PROOF_InputDataFile", Form("%s", gSystem->BaseName(dataFile))));
01976 }
01977 }
01978
01979
01980 Int_t TProofLite::Remove(const char *ref, Bool_t all)
01981 {
01982
01983
01984 PDB(kGlobal, 1)
01985 Info("Remove", "Enter: %s, %d", ref, all);
01986
01987 if (all) {
01988
01989 if (fPlayer)
01990 fPlayer->RemoveQueryResult(ref);
01991 }
01992
01993 TString queryref(ref);
01994
01995 if (queryref == "cleanupdir") {
01996
01997
01998 Int_t nd = (fQMgr) ? fQMgr->CleanupQueriesDir() : -1;
01999
02000
02001 Info("Remove", "%d directories removed", nd);
02002
02003 return 0;
02004 }
02005
02006
02007 if (fQMgr) {
02008 TProofLockPath *lck = 0;
02009 if (fQMgr->LockSession(queryref, &lck) == 0) {
02010
02011
02012 fQMgr->RemoveQuery(queryref, 0);
02013
02014
02015 if (lck) {
02016 gSystem->Unlink(lck->GetName());
02017 SafeDelete(lck);
02018 }
02019
02020
02021 return 0;
02022 }
02023 } else {
02024 Warning("Remove", "query result manager undefined!");
02025 }
02026
02027
02028 Info("Remove",
02029 "query %s could not be removed (unable to lock session)", queryref.Data());
02030
02031
02032 return -1;
02033 }
02034
02035
02036 TTree *TProofLite::GetTreeHeader(TDSet *dset)
02037 {
02038
02039
02040
02041 TTree *t = 0;
02042 if (!dset) {
02043 Error("GetTreeHeader", "undefined TDSet");
02044 return t;
02045 }
02046
02047 dset->Reset();
02048 TDSetElement *e = dset->Next();
02049 Long64_t entries = 0;
02050 TFile *f = 0;
02051 if (!e) {
02052 PDB(kGlobal, 1) Info("GetTreeHeader", "empty TDSet");
02053 } else {
02054 f = TFile::Open(e->GetFileName());
02055 t = 0;
02056 if (f) {
02057 t = (TTree*) f->Get(e->GetObjName());
02058 if (t) {
02059 t->SetMaxVirtualSize(0);
02060 t->DropBaskets();
02061 entries = t->GetEntries();
02062
02063
02064 while ((e = dset->Next()) != 0) {
02065 TFile *f1 = TFile::Open(e->GetFileName());
02066 if (f1) {
02067 TTree *t1 = (TTree*) f1->Get(e->GetObjName());
02068 if (t1) {
02069 entries += t1->GetEntries();
02070 delete t1;
02071 }
02072 delete f1;
02073 }
02074 }
02075 t->SetMaxEntryLoop(entries);
02076 }
02077 }
02078 }
02079
02080 return t;
02081 }
02082
02083
02084 void TProofLite::FindUniqueSlaves()
02085 {
02086
02087
02088
02089
02090
02091
02092
02093
02094 fUniqueSlaves->Clear();
02095 fUniqueMonitor->RemoveAll();
02096 fAllUniqueSlaves->Clear();
02097 fAllUniqueMonitor->RemoveAll();
02098 fNonUniqueMasters->Clear();
02099
02100 if (fActiveSlaves->GetSize() <= 0) return;
02101
02102 TSlave *wrk = dynamic_cast<TSlave*>(fActiveSlaves->First());
02103 if (!wrk) {
02104 Error("FindUniqueSlaves", "first object in fActiveSlaves not a TSlave: embarrasing!");
02105 return;
02106 }
02107 fUniqueSlaves->Add(wrk);
02108 fAllUniqueSlaves->Add(wrk);
02109 fUniqueMonitor->Add(wrk->GetSocket());
02110 fAllUniqueMonitor->Add(wrk->GetSocket());
02111
02112
02113 fUniqueMonitor->DeActivateAll();
02114 fAllUniqueMonitor->DeActivateAll();
02115 }
02116
02117
02118 Int_t TProofLite::RegisterDataSets(TList *in, TList *out)
02119 {
02120
02121
02122 PDB(kDataset, 1) Info("RegisterDataSets", "enter");
02123
02124 if (!in || !out) return 0;
02125
02126 TString msg;
02127 TIter nxo(out);
02128 TObject *o = 0;
02129 while ((o = nxo())) {
02130
02131 TFileCollection *ds = dynamic_cast<TFileCollection*> (o);
02132 if (ds) {
02133
02134 TNamed *fcn = 0;
02135 TString tag = TString::Format("DATASET_%s", ds->GetName());
02136 if (!(fcn = (TNamed *) out->FindObject(tag))) continue;
02137
02138 TString regopt(fcn->GetTitle());
02139
02140 if (fDataSetManager) {
02141 if (fDataSetManager->TestBit(TDataSetManager::kAllowRegister)) {
02142
02143 if (ds->GetList()->GetSize() > 0) {
02144
02145 msg.Form("Registering and verifying dataset '%s' ... ", ds->GetName());
02146 Info("RegisterDataSets", "%s", msg.Data());
02147
02148 Bool_t allowVerify = fDataSetManager->TestBit(TDataSetManager::kAllowVerify) ? kTRUE : kFALSE;
02149 if (regopt.Contains("V") && !allowVerify)
02150 fDataSetManager->SetBit(TDataSetManager::kAllowVerify);
02151 Int_t rc = fDataSetManager->RegisterDataSet(ds->GetName(), ds, regopt);
02152
02153 if (regopt.Contains("V") && !allowVerify)
02154 fDataSetManager->ResetBit(TDataSetManager::kAllowVerify);
02155 if (rc != 0) {
02156 Warning("RegisterDataSets",
02157 "failure registering dataset '%s'", ds->GetName());
02158 msg.Form("Registering and verifying dataset '%s' ... failed! See log for more details", ds->GetName());
02159 } else {
02160 Info("RegisterDataSets", "dataset '%s' successfully registered", ds->GetName());
02161 msg.Form("Registering and verifying dataset '%s' ... OK", ds->GetName());
02162 }
02163 Info("RegisterDataSets", "%s", msg.Data());
02164
02165 PDB(kDataset, 2) {
02166 Info("RegisterDataSets","printing collection");
02167 ds->Print("F");
02168 }
02169 } else {
02170 Warning("RegisterDataSets", "collection '%s' is empty", o->GetName());
02171 }
02172 } else {
02173 Info("RegisterDataSets", "dataset registration not allowed");
02174 return -1;
02175 }
02176 } else {
02177 Error("RegisterDataSets", "dataset manager is undefined!");
02178 return -1;
02179 }
02180
02181 out->Remove(fcn);
02182 SafeDelete(fcn);
02183 }
02184 }
02185
02186 PDB(kDataset, 1) Info("RegisterDataSets", "exit");
02187
02188 return 0;
02189 }