00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #include "RConfigure.h"
00024 #include "RConfig.h"
00025 #include "Riostream.h"
00026
00027 #ifdef WIN32
00028 #include <process.h>
00029 #include <io.h>
00030 #include "snprintf.h"
00031 typedef long off_t;
00032 #endif
00033 #include <errno.h>
00034 #include <time.h>
00035 #include <fcntl.h>
00036 #include <sys/types.h>
00037 #include <sys/stat.h>
00038 #ifndef WIN32
00039 #include <sys/wait.h>
00040 #endif
00041 #include <cstdlib>
00042
00043
00044 #include <exception>
00045 #include <new>
00046
00047 #if (defined(__FreeBSD__) && (__FreeBSD__ < 4)) || \
00048 (defined(__APPLE__) && (!defined(MAC_OS_X_VERSION_10_3) || \
00049 (MAC_OS_X_VERSION_MAX_ALLOWED < MAC_OS_X_VERSION_10_3)))
00050 #include <sys/file.h>
00051 #define lockf(fd, op, sz) flock((fd), (op))
00052 #ifndef F_LOCK
00053 #define F_LOCK (LOCK_EX | LOCK_NB)
00054 #endif
00055 #ifndef F_ULOCK
00056 #define F_ULOCK LOCK_UN
00057 #endif
00058 #endif
00059
00060 #include "TProofServ.h"
00061 #include "TDSetProxy.h"
00062 #include "TEnv.h"
00063 #include "TError.h"
00064 #include "TEventList.h"
00065 #include "TEntryList.h"
00066 #include "TException.h"
00067 #include "TFile.h"
00068 #include "THashList.h"
00069 #include "TInterpreter.h"
00070 #include "TKey.h"
00071 #include "TMessage.h"
00072 #include "TVirtualPerfStats.h"
00073 #include "TProofDebug.h"
00074 #include "TProof.h"
00075 #include "TVirtualProofPlayer.h"
00076 #include "TProofQueryResult.h"
00077 #include "TQueryResultManager.h"
00078 #include "TRegexp.h"
00079 #include "TROOT.h"
00080 #include "TSocket.h"
00081 #include "TStopwatch.h"
00082 #include "TSystem.h"
00083 #include "TTimeStamp.h"
00084 #include "TUrl.h"
00085 #include "TPluginManager.h"
00086 #include "TObjString.h"
00087 #include "compiledata.h"
00088 #include "TProofResourcesStatic.h"
00089 #include "TProofNodeInfo.h"
00090 #include "TFileInfo.h"
00091 #include "TMutex.h"
00092 #include "TClass.h"
00093 #include "TSQLServer.h"
00094 #include "TSQLResult.h"
00095 #include "TSQLRow.h"
00096 #include "TPRegexp.h"
00097 #include "TParameter.h"
00098 #include "TMap.h"
00099 #include "TSortedList.h"
00100 #include "TParameter.h"
00101 #include "TFileCollection.h"
00102 #include "TLockFile.h"
00103 #include "TDataSetManagerFile.h"
00104 #include "TProofProgressStatus.h"
00105 #include "TServerSocket.h"
00106 #include "TMonitor.h"
00107 #include "TFunction.h"
00108 #include "TMethodArg.h"
00109 #include "TMethodCall.h"
00110
00111
// Global pointer to the running PROOF server instance; set to 'this' in the
// TProofServ constructor.
TProofServ *gProofServ = 0;

// Debugger hook: the constructor and CreateServer() busy-wait on this flag when
// the Proof.GdbHook setting requests it. 'volatile' keeps the compiler from
// folding the wait loop; presumably it is cleared from an attached debugger.
static volatile Int_t gProofServDebug = 1;

// Syslog logging level (values <= 0 mean syslog is not opened), and the
// service/entity names used for it.
Int_t TProofServ::fgLogToSysLog = 0;
TString TProofServ::fgSysLogService("proof");
TString TProofServ::fgSysLogEntity("undef:default");

// Output file of the error handler (the constructor passes stderr to
// SetErrorHandlerFile()).
FILE *TProofServ::fgErrorHandlerFile = 0;

// Nesting depth of HandleSocketInput() calls.
Int_t TProofServ::fgRecursive = 0;

// Last recorded message; appended to exception reports in HandleSocketInput().
TString TProofServ::fgLastMsg("<undef>");

// Virtual/resident memory limits (-1 = not set) and the fractions of the limit
// used as warning high-water mark (MemHWM) and processing stop threshold (MemStop).
Long_t TProofServ::fgVirtMemMax = -1;
Long_t TProofServ::fgResMemMax = -1;
Float_t TProofServ::fgMemHWM = 0.80;
Float_t TProofServ::fgMemStop = 0.95;
00136
00137
00138
00139 class TProofServTerminationHandler : public TSignalHandler {
00140 TProofServ *fServ;
00141 public:
00142 TProofServTerminationHandler(TProofServ *s)
00143 : TSignalHandler(kSigTermination, kFALSE) { fServ = s; }
00144 Bool_t Notify();
00145 };
00146
00147
00148 Bool_t TProofServTerminationHandler::Notify()
00149 {
00150
00151
00152 Printf("Received SIGTERM: terminating");
00153 fServ->HandleTermination();
00154 return kTRUE;
00155 }
00156
00157
00158
00159 class TProofServInterruptHandler : public TSignalHandler {
00160 TProofServ *fServ;
00161 public:
00162 TProofServInterruptHandler(TProofServ *s)
00163 : TSignalHandler(kSigUrgent, kFALSE) { fServ = s; }
00164 Bool_t Notify();
00165 };
00166
00167
00168 Bool_t TProofServInterruptHandler::Notify()
00169 {
00170
00171
00172 fServ->HandleUrgentData();
00173 if (TROOT::Initialized()) {
00174 Throw(GetSignal());
00175 }
00176 return kTRUE;
00177 }
00178
00179
00180
00181 class TProofServSigPipeHandler : public TSignalHandler {
00182 TProofServ *fServ;
00183 public:
00184 TProofServSigPipeHandler(TProofServ *s) : TSignalHandler(kSigPipe, kFALSE)
00185 { fServ = s; }
00186 Bool_t Notify();
00187 };
00188
00189
00190 Bool_t TProofServSigPipeHandler::Notify()
00191 {
00192
00193
00194 fServ->HandleSigPipe();
00195 return kTRUE;
00196 }
00197
00198
00199
00200 class TProofServInputHandler : public TFileHandler {
00201 TProofServ *fServ;
00202 public:
00203 TProofServInputHandler(TProofServ *s, Int_t fd) : TFileHandler(fd, 1)
00204 { fServ = s; }
00205 Bool_t Notify();
00206 Bool_t ReadNotify() { return Notify(); }
00207 };
00208
00209
00210 Bool_t TProofServInputHandler::Notify()
00211 {
00212
00213
00214 fServ->HandleSocketInput();
00215 return kTRUE;
00216 }
00217
// Default prefix prepended to forwarded log lines when a handler has no own prefix.
TString TProofServLogHandler::fgPfx = "";
// Return code of the last command run through a TProofServLogHandler pipe
// (-1 if the pipe could not be opened or the command did not exit normally).
Int_t TProofServLogHandler::fgCmdRtn = 0;
00220
00221
00222 TProofServLogHandler::TProofServLogHandler(const char *cmd,
00223 TSocket *s, const char *pfx)
00224 : TFileHandler(-1, 1), fSocket(s), fPfx(pfx)
00225 {
00226
00227
00228 ResetBit(kFileIsPipe);
00229 fgCmdRtn = 0;
00230 fFile = 0;
00231 if (s && cmd) {
00232 fFile = gSystem->OpenPipe(cmd, "r");
00233 if (fFile) {
00234 SetFd(fileno(fFile));
00235
00236 Notify();
00237
00238 SetBit(kFileIsPipe);
00239 } else {
00240 fSocket = 0;
00241 Error("TProofServLogHandler", "executing command in pipe");
00242 fgCmdRtn = -1;
00243 }
00244 } else {
00245 Error("TProofServLogHandler",
00246 "undefined command (%p) or socket (%p)", (int *)cmd, s);
00247 }
00248 }
00249
00250 TProofServLogHandler::TProofServLogHandler(FILE *f, TSocket *s, const char *pfx)
00251 : TFileHandler(-1, 1), fSocket(s), fPfx(pfx)
00252 {
00253
00254
00255 ResetBit(kFileIsPipe);
00256 fgCmdRtn = 0;
00257 fFile = 0;
00258 if (s && f) {
00259 fFile = f;
00260 SetFd(fileno(fFile));
00261
00262 Notify();
00263 } else {
00264 Error("TProofServLogHandler", "undefined file (%p) or socket (%p)", f, s);
00265 }
00266 }
00267
00268 TProofServLogHandler::~TProofServLogHandler()
00269 {
00270
00271
00272 if (TestBit(kFileIsPipe) && fFile) {
00273 Int_t rc = gSystem->ClosePipe(fFile);
00274 #ifdef WIN32
00275 fgCmdRtn = rc;
00276 #else
00277 fgCmdRtn = WIFEXITED(rc) ? WEXITSTATUS(rc) : -1;
00278 #endif
00279 }
00280 fFile = 0;
00281 fSocket = 0;
00282 ResetBit(kFileIsPipe);
00283 }
00284
00285 Bool_t TProofServLogHandler::Notify()
00286 {
00287
00288
00289 if (IsValid()) {
00290 TMessage m(kPROOF_MESSAGE);
00291
00292 char line[4096];
00293 char *plf = 0;
00294 while (fgets(line, sizeof(line), fFile)) {
00295 if ((plf = strchr(line, '\n')))
00296 *plf = 0;
00297
00298 TString log;
00299 if (fPfx.Length() > 0) {
00300
00301 log.Form("%s: %s", fPfx.Data(), line);
00302 } else if (fgPfx.Length() > 0) {
00303
00304 log.Form("%s: %s", fgPfx.Data(), line);
00305 } else {
00306
00307 log = line;
00308 }
00309
00310 m.Reset(kPROOF_MESSAGE);
00311 m << log;
00312 fSocket->Send(m);
00313 }
00314 }
00315 return kTRUE;
00316 }
00317
00318 void TProofServLogHandler::SetDefaultPrefix(const char *pfx)
00319 {
00320
00321
00322 fgPfx = pfx;
00323 }
00324
00325 Int_t TProofServLogHandler::GetCmdRtn()
00326 {
00327
00328
00329
00330 return fgCmdRtn;
00331 }
00332
00333
00334 TProofServLogHandlerGuard::TProofServLogHandlerGuard(const char *cmd, TSocket *s,
00335 const char *pfx, Bool_t on)
00336 {
00337
00338
00339 fExecHandler = 0;
00340 if (cmd && on) {
00341 fExecHandler = new TProofServLogHandler(cmd, s, pfx);
00342 if (fExecHandler->IsValid()) {
00343 gSystem->AddFileHandler(fExecHandler);
00344 } else {
00345 Error("TProofServLogHandlerGuard","invalid handler");
00346 }
00347 } else {
00348 if (on)
00349 Error("TProofServLogHandlerGuard","undefined command");
00350 }
00351 }
00352
00353
00354 TProofServLogHandlerGuard::TProofServLogHandlerGuard(FILE *f, TSocket *s,
00355 const char *pfx, Bool_t on)
00356 {
00357
00358
00359 fExecHandler = 0;
00360 if (f && on) {
00361 fExecHandler = new TProofServLogHandler(f, s, pfx);
00362 if (fExecHandler->IsValid()) {
00363 gSystem->AddFileHandler(fExecHandler);
00364 } else {
00365 Error("TProofServLogHandlerGuard","invalid handler");
00366 }
00367 } else {
00368 if (on)
00369 Error("TProofServLogHandlerGuard","undefined file");
00370 }
00371 }
00372
00373
00374 TProofServLogHandlerGuard::~TProofServLogHandlerGuard()
00375 {
00376
00377
00378 if (fExecHandler && fExecHandler->IsValid()) {
00379 gSystem->RemoveFileHandler(fExecHandler);
00380 SafeDelete(fExecHandler);
00381 }
00382 }
00383
00384
00385
00386 Bool_t TShutdownTimer::Notify()
00387 {
00388
00389
00390
00391 if (gDebug > 0)
00392 Info ("Notify","checking activity on the input socket");
00393
00394
00395 TSocket *xs = 0;
00396 if (fProofServ && (xs = fProofServ->GetSocket())) {
00397 TTimeStamp now;
00398 TTimeStamp ts = xs->GetLastUsage();
00399 Long_t dt = (Long_t)(now.GetSec() - ts.GetSec()) * 1000 +
00400 (Long_t)(now.GetNanoSec() - ts.GetNanoSec()) / 1000000 ;
00401 Int_t to = gEnv->GetValue("ProofServ.ShutdonwTimeout", 20);
00402 if (dt > to * 60000) {
00403 Printf("TShutdownTimer::Notify: input socket: %p: did not show any activity"
00404 " during the last %d mins: aborting", xs, to);
00405
00406
00407 gSystem->Abort();
00408 } else {
00409 if (gDebug > 0)
00410 Info("Notify", "input socket: %p: show activity"
00411 " %ld secs ago", xs, dt / 60000);
00412 }
00413 }
00414 Start(-1, kFALSE);
00415 return kTRUE;
00416 }
00417
00418
00419
00420 TReaperTimer::~TReaperTimer()
00421 {
00422
00423
00424 if (fChildren) {
00425 fChildren->SetOwner(kTRUE);
00426 delete fChildren;
00427 fChildren = 0;
00428 }
00429 }
00430
00431
00432 void TReaperTimer::AddPid(Int_t pid)
00433 {
00434
00435
00436 if (pid > 0) {
00437 if (!fChildren)
00438 fChildren = new TList;
00439 TString spid;
00440 spid.Form("%d", pid);
00441 fChildren->Add(new TParameter<Int_t>(spid.Data(), pid));
00442 TurnOn();
00443 }
00444 }
00445
00446
00447 Bool_t TReaperTimer::Notify()
00448 {
00449
00450
00451
00452 if (fChildren) {
00453 TIter nxp(fChildren);
00454 TParameter<Int_t> *p = 0;
00455 while ((p = (TParameter<Int_t> *)nxp())) {
00456 int status;
00457 #ifndef WIN32
00458 pid_t pid;
00459 do {
00460 pid = waitpid(p->GetVal(), &status, WNOHANG);
00461 } while (pid < 0 && errno == EINTR);
00462 #else
00463 intptr_t pid;
00464 pid = _cwait(&status, (intptr_t)p->GetVal(), 0);
00465 #endif
00466 if (pid > 0 && pid == p->GetVal()) {
00467
00468 fChildren->Remove(p);
00469 delete p;
00470 }
00471 }
00472 }
00473
00474
00475 if (!fChildren || fChildren->GetSize() <= 0) {
00476 Stop();
00477 } else {
00478
00479 Reset();
00480 }
00481 return kTRUE;
00482 }
00483
00484
00485
00486 Bool_t TIdleTOTimer::Notify()
00487 {
00488
00489
00490 Info ("Notify", "session idle for more then %lld secs: terminating", Long64_t(fTime)/1000);
00491
00492 if (fProofServ) {
00493
00494 Int_t uss_rc = -1;
00495 if ((uss_rc = fProofServ->UpdateSessionStatus(4)) != 0)
00496 Warning("Notify", "problems updating session status (errno: %d)", -uss_rc);
00497
00498 TString msg;
00499 if (fProofServ->GetProtocol() < 29) {
00500 msg.Form("\n//\n// PROOF session at %s (%s) terminated because idle for more than %lld secs\n"
00501 "// Please IGNORE any error message possibly displayed below\n//",
00502 gSystem->HostName(), fProofServ->GetSessionTag(), Long64_t(fTime)/1000);
00503 } else {
00504 msg.Form("\n//\n// PROOF session at %s (%s) terminated because idle for more than %lld secs\n//",
00505 gSystem->HostName(), fProofServ->GetSessionTag(), Long64_t(fTime)/1000);
00506 }
00507 fProofServ->SendAsynMessage(msg.Data());
00508 fProofServ->Terminate(0);
00509 Reset();
00510 Stop();
00511 } else {
00512 Warning("Notify", "fProofServ undefined!");
00513 Start(-1, kTRUE);
00514 }
00515 return kTRUE;
00516 }
00517
00518 ClassImp(TProofServ)
00519
00520
00521
00522 extern "C" {
00523 TApplication *GetTProofServ(Int_t *argc, char **argv, FILE *flog)
00524 { return new TProofServ(argc, argv, flog); }
00525 }
00526
00527
00528 TProofServ::TProofServ(Int_t *argc, char **argv, FILE *flog)
00529 : TApplication("proofserv", argc, argv, 0, -1)
00530 {
00531
00532
00533
00534
00535
00536
00537 TString rcfile = gSystem->Getenv("ROOTRCFILE") ? gSystem->Getenv("ROOTRCFILE")
00538 : "session.rootrc";
00539 if (!gSystem->AccessPathName(rcfile, kReadPermission))
00540 gEnv->ReadFile(rcfile, kEnvChange);
00541
00542
00543 fgVirtMemMax = gEnv->GetValue("Proof.VirtMemMax",-1);
00544 if (fgVirtMemMax < 0 && gSystem->Getenv("PROOF_VIRTMEMMAX")) {
00545 Long_t mmx = strtol(gSystem->Getenv("PROOF_VIRTMEMMAX"), 0, 10);
00546 if (mmx < kMaxLong && mmx > 0)
00547 fgVirtMemMax = mmx * 1024;
00548 }
00549
00550 if (fgVirtMemMax < 0 && gSystem->Getenv("ROOTPROOFASHARD")) {
00551 Long_t mmx = strtol(gSystem->Getenv("ROOTPROOFASHARD"), 0, 10);
00552 if (mmx < kMaxLong && mmx > 0)
00553 fgVirtMemMax = mmx * 1024;
00554 }
00555
00556 fgResMemMax = gEnv->GetValue("Proof.ResMemMax",-1);
00557 if (fgResMemMax < 0 && gSystem->Getenv("PROOF_RESMEMMAX")) {
00558 Long_t mmx = strtol(gSystem->Getenv("PROOF_RESMEMMAX"), 0, 10);
00559 if (mmx < kMaxLong && mmx > 0)
00560 fgResMemMax = mmx * 1024;
00561 }
00562
00563 fgMemStop = gEnv->GetValue("Proof.MemStop", 0.95);
00564 fgMemHWM = gEnv->GetValue("Proof.MemHWM", 0.80);
00565 if (fgVirtMemMax > 0 || fgResMemMax > 0) {
00566 if ((fgMemStop < 0.) || (fgMemStop > 1.)) {
00567 Warning("TProofServ", "requested memory fraction threshold to stop processing"
00568 " (MemStop) out of range [0,1] - ignoring");
00569 fgMemStop = 0.95;
00570 }
00571 if ((fgMemHWM < 0.) || (fgMemHWM > fgMemStop)) {
00572 Warning("TProofServ", "requested memory fraction threshold for warning and finer monitoring"
00573 " (MemHWM) out of range [0,MemStop] - ignoring");
00574 fgMemHWM = 0.80;
00575 }
00576 }
00577
00578
00579 Bool_t test = (*argc >= 4 && !strcmp(argv[3], "test")) ? kTRUE : kFALSE;
00580 if ((gEnv->GetValue("Proof.GdbHook",0) == 3 && !test) ||
00581 (gEnv->GetValue("Proof.GdbHook",0) == 4 && test)) {
00582 while (gProofServDebug)
00583 ;
00584 }
00585
00586
00587 if (*argc >= 4)
00588 if (!strcmp(argv[3], "test"))
00589 fService = "prooftest";
00590
00591
00592 if (*argc < 2) {
00593 Error("TProofServ", "Must have at least 1 arguments (see proofd).");
00594 exit(1);
00595 }
00596
00597
00598 gProofServ = this;
00599
00600
00601 fSendLogToMaster = kFALSE;
00602
00603
00604 gErrorAbortLevel = kSysError + 1;
00605 SetErrorHandlerFile(stderr);
00606 SetErrorHandler(ErrorHandler);
00607
00608 fNcmd = 0;
00609 fGroupPriority = 100;
00610 fInterrupt = kFALSE;
00611 fProtocol = 0;
00612 fOrdinal = gEnv->GetValue("ProofServ.Ordinal", "-1");
00613 fGroupId = -1;
00614 fGroupSize = 0;
00615 fRealTime = 0.0;
00616 fCpuTime = 0.0;
00617 fProof = 0;
00618 fPlayer = 0;
00619 fSocket = 0;
00620 fEnabledPackages = new TList;
00621 fEnabledPackages->SetOwner();
00622
00623 fTotSessions = -1;
00624 fActSessions = -1;
00625 fEffSessions = -1.;
00626
00627 fGlobalPackageDirList = 0;
00628
00629 fLogFile = flog;
00630 fLogFileDes = -1;
00631
00632 fArchivePath = "";
00633
00634 fPackageLock = 0;
00635 fCacheLock = 0;
00636 fQueryLock = 0;
00637
00638 fQMgr = 0;
00639 fQMtx = new TMutex(kTRUE);
00640 fWaitingQueries = new TList;
00641 fIdle = kTRUE;
00642 fQuerySeqNum = -1;
00643
00644 fQueuedMsg = new TList;
00645
00646 fRealTimeLog = kFALSE;
00647
00648 fShutdownTimer = 0;
00649 fReaperTimer = 0;
00650 fIdleTOTimer = 0;
00651
00652 fInflateFactor = 1000;
00653
00654 fDataSetManager = 0;
00655
00656 fInputHandler = 0;
00657
00658
00659 fMaxQueries = -1;
00660 fMaxBoxSize = -1;
00661 fHWMBoxSize = -1;
00662
00663
00664 fMergingSocket = 0;
00665 fMergingMonitor = 0;
00666 fMergedWorkers = 0;
00667
00668
00669 ResetBit(TProofServ::kHighMemory);
00670
00671
00672 fMsgSizeHWM = gEnv->GetValue("ProofServ.MsgSizeHWM", 1000000);
00673
00674
00675 fCompressMsg = gEnv->GetValue("ProofServ.CompressMessage", 0);
00676
00677 gProofDebugLevel = gEnv->GetValue("Proof.DebugLevel",0);
00678 fLogLevel = gProofDebugLevel;
00679
00680 gProofDebugMask = (TProofDebug::EProofDebugMask) gEnv->GetValue("Proof.DebugMask",~0);
00681 if (gProofDebugLevel > 0)
00682 Info("TProofServ", "DebugLevel %d Mask 0x%x", gProofDebugLevel, gProofDebugMask);
00683
00684
00685 GetOptions(argc, argv);
00686
00687
00688 fPrefix = (IsMaster() ? "Mst-" : "Wrk-");
00689 if (test) fPrefix = "Test";
00690 if (fOrdinal != "-1")
00691 fPrefix += fOrdinal;
00692 TProofServLogHandler::SetDefaultPrefix(fPrefix);
00693
00694
00695 TString slog = gEnv->GetValue("ProofServ.LogToSysLog", "");
00696 if (!(slog.IsNull())) {
00697 if (slog.IsDigit()) {
00698 fgLogToSysLog = slog.Atoi();
00699 } else {
00700 char c = (slog[0] == 'M' || slog[0] == 'm') ? 'm' : 'a';
00701 c = (slog[0] == 'W' || slog[0] == 'w') ? 'w' : c;
00702 Bool_t dosyslog = ((c == 'm' && IsMaster()) ||
00703 (c == 'w' && !IsMaster()) || c == 'a') ? kTRUE : kFALSE;
00704 if (dosyslog) {
00705 slog.Remove(0,1);
00706 if (slog.IsDigit()) fgLogToSysLog = slog.Atoi();
00707 if (fgLogToSysLog <= 0)
00708 Warning("TProofServ", "request for syslog logging ineffective!");
00709 }
00710 }
00711 }
00712
00713 if (fgLogToSysLog > 0) {
00714 fgSysLogService = (IsMaster()) ? "proofm" : "proofw";
00715 if (fOrdinal != "-1") fgSysLogService += TString::Format("-%s", fOrdinal.Data());
00716 gSystem->Openlog(fgSysLogService, kLogPid | kLogCons, kLogLocal5);
00717 }
00718
00719
00720
00721
00722 Bool_t enableSchemaEvolution = gEnv->GetValue("Proof.SchemaEvolution",1);
00723 if (enableSchemaEvolution) {
00724 TMessage::EnableSchemaEvolutionForAll();
00725 } else {
00726 Info("TProofServ", "automatic schema evolution in TMessage explicitely disabled");
00727 }
00728 }
00729
00730
////////////////////////////////////////////////////////////////////////////////
/// Finalize the server setup: attach to the control socket, run Setup(),
/// prepare the log file, the interpreter and the signal/file handlers, and on
/// masters create the TProof object through the plugin manager. Also starts
/// the shutdown timer and, on masters, the optional idle-timeout timer.
/// Returns 0 on success, -1 on failure (after Terminate(0) where reached).

Int_t TProofServ::CreateServer()
{
   // Socket descriptor: ROOTOPENSOCK takes precedence over ProofServ.OpenSock
   TString opensock = gSystem->Getenv("ROOTOPENSOCK");
   if (opensock.Length() <= 0)
      opensock = gEnv->GetValue("ProofServ.OpenSock", "-1");
   Int_t sock = opensock.Atoi();
   if (sock <= 0) {
      Fatal("CreateServer", "Invalid socket descriptor number (%d)", sock);
      return -1;
   }
   fSocket = new TSocket(sock);

   // Compression level for outgoing messages
   fSocket->SetCompressionLevel(fCompressMsg);

   // Debugger hooks: 1 = masters, 2 = workers
   if (IsMaster()) {
      if (gEnv->GetValue("Proof.GdbHook",0) == 1) {
         while (gProofServDebug)
            ;
      }
   } else {
      if (gEnv->GetValue("Proof.GdbHook",0) == 2) {
         while (gProofServDebug)
            ;
      }
   }

   if (gProofDebugLevel > 0)
      Info("CreateServer", "Service %s ConfDir %s IsMaster %d\n",
           GetService(), GetConfDir(), (Int_t)fMasterServ);

   if (Setup() != 0) {
      // Setup failed: report to the master and quit
      LogToMaster();
      SendLogFile();
      Terminate(0);
      return -1;
   }

   // Default log prefix, now including the ordinal (presumably set by Setup())
   TString pfx = (IsMaster() ? "Mst-" : "Wrk-");
   pfx += GetOrdinal();
   TProofServLogHandler::SetDefaultPrefix(pfx);

   if (!fLogFile) {
      RedirectOutput();
      // A log file with a valid descriptor is required
      if (!fLogFile || (fLogFileDes = fileno(fLogFile)) < 0) {
         LogToMaster();
         SendLogFile(-98);
         Terminate(0);
         return -1;
      }
   } else {
      // Log file given at construction: only its descriptor is needed
      if ((fLogFileDes = fileno(fLogFile)) < 0) {
         LogToMaster();
         SendLogFile(-98);
         Terminate(0);
         return -1;
      }
   }

   // Masters print the message of the day; CatMotd() == -1 means sessions are
   // currently refused (noproof file present)
   if (IsMaster()) {
      if (CatMotd() == -1) {
         LogToMaster();
         SendLogFile(-99);
         Terminate(0);
         return -1;
      }
   }

   // Headers made available to the interpreter for every session
   ProcessLine("#include <iostream>", kTRUE);
   ProcessLine("#include <string>",kTRUE);

   // Optional macro to load (Proof.Load), only if found on the macro path
   const char *logon;
   logon = gEnv->GetValue("Proof.Load", (char *)0);
   if (logon) {
      char *mac = gSystem->Which(TROOT::GetMacroPath(), logon, kReadPermission);
      if (mac)
         ProcessLine(TString::Format(".L %s", logon), kTRUE);
      delete [] mac;
   }

   // Optional logon macro to execute (Proof.Logon), unless disabled by NoLogOpt()
   logon = gEnv->GetValue("Proof.Logon", (char *)0);
   if (logon && !NoLogOpt()) {
      char *mac = gSystem->Which(TROOT::GetMacroPath(), logon, kReadPermission);
      if (mac)
         ProcessFile(logon);
      delete [] mac;
   }

   // Save the interpreter state after the startup macros
   gInterpreter->SaveContext();
   gInterpreter->SaveGlobalsContext();

   // Signal handlers and the input handler on the control socket
   gSystem->AddSignalHandler(new TProofServTerminationHandler(this));
   gSystem->AddSignalHandler(new TProofServInterruptHandler(this));
   fInputHandler = new TProofServInputHandler(this, sock);
   gSystem->AddFileHandler(fInputHandler);

   if (IsMaster()) {
      // Master URL, including the local port of the control socket if known
      TString master = "proof://__master__";
      TInetAddress a = gSystem->GetSockName(sock);
      if (a.IsValid()) {
         master += ":";
         master += a.GetPort();
      }

      // The TProof object is created through the plugin manager
      TPluginManager *pm = gROOT->GetPluginManager();
      if (!pm) {
         Error("CreateServer", "no plugin manager found");
         SendLogFile(-99);
         Terminate(0);
         return -1;
      }

      // Handler selected by the configuration file
      TPluginHandler *h = pm->FindHandler("TProof", fConfFile);
      if (!h) {
         Error("CreateServer", "no plugin found for TProof with a"
                               " config file of '%s'", fConfFile.Data());
         SendLogFile(-99);
         Terminate(0);
         return -1;
      }

      if (h->LoadPlugin() == -1) {
         Error("CreateServer", "plugin for TProof could not be loaded");
         SendLogFile(-99);
         Terminate(0);
         return -1;
      }

      // Instantiate TProof (5 arguments)
      fProof = reinterpret_cast<TProof*>(h->ExecPlugin(5, master.Data(),
                                                       fConfFile.Data(),
                                                       GetConfDir(),
                                                       fLogLevel, 0));
      if (!fProof || !fProof->IsValid()) {
         Error("CreateServer", "plugin for TProof could not be executed");
         SafeDelete(fProof);
         SendLogFile(-99);
         Terminate(0);
         return -1;
      }

      fEndMaster = fProof->IsEndMaster();

      SendLogFile();
   }

   // Shutdown timer checking the input-socket activity (period presumably in ms)
   if (!fShutdownTimer) {
      fShutdownTimer = new TShutdownTimer(this, 300000);
      fShutdownTimer->Start(-1, kFALSE);
   }

   // Clients with protocol <= 17 do not support automatic schema evolution
   if (fProtocol <= 17) {
      TString msg;
      msg.Form("Warning: client version is too old: automatic schema evolution is ineffective.\n"
               " This may generate compatibility problems between streamed objects.\n"
               " The advise is to move to ROOT >= 5.21/02 .");
      SendAsynMessage(msg.Data());
   }

   // Masters: optional idle timeout (ProofServ.IdleTimeout, seconds)
   if (IsMaster() && !fIdleTOTimer) {
      Int_t idle_to = gEnv->GetValue("ProofServ.IdleTimeout", -1);
      if (idle_to > 0) {
         fIdleTOTimer = new TIdleTOTimer(this, idle_to * 1000);
         fIdleTOTimer->Start(-1, kTRUE);
         if (gProofDebugLevel > 0)
            Info("CreateServer", " idle timer started (%d secs)", idle_to);
      } else if (gProofDebugLevel > 0) {
         Info("CreateServer", " idle timer not started (no idle timeout requested)");
      }
   }

   return 0;
}
00940
00941
00942 TProofServ::~TProofServ()
00943 {
00944
00945
00946
00947 SafeDelete(fWaitingQueries);
00948 SafeDelete(fQMtx);
00949 SafeDelete(fEnabledPackages);
00950 SafeDelete(fSocket);
00951 SafeDelete(fPackageLock);
00952 SafeDelete(fCacheLock);
00953 SafeDelete(fQueryLock);
00954 SafeDelete(fGlobalPackageDirList);
00955 close(fLogFileDes);
00956 }
00957
00958
00959 Int_t TProofServ::CatMotd()
00960 {
00961
00962
00963
00964
00965
00966
00967 TString lastname;
00968 FILE *motd;
00969 Bool_t show = kFALSE;
00970
00971
00972 TString motdname(GetConfDir());
00973
00974
00975 if (gSystem->Getenv("PROOFNOPROOF")) {
00976 motdname = gSystem->Getenv("PROOFNOPROOF");
00977 } else {
00978 motdname += "/etc/proof/noproof";
00979 }
00980 if ((motd = fopen(motdname, "r"))) {
00981 Int_t c;
00982 printf("\n");
00983 while ((c = getc(motd)) != EOF)
00984 putchar(c);
00985 fclose(motd);
00986 printf("\n");
00987
00988 return -1;
00989 }
00990
00991
00992 lastname = TString(GetWorkDir()) + "/.prooflast";
00993 char *last = gSystem->ExpandPathName(lastname.Data());
00994 Long64_t size;
00995 Long_t id, flags, modtime, lasttime;
00996 if (gSystem->GetPathInfo(last, &id, &size, &flags, &lasttime) == 1)
00997 lasttime = 0;
00998
00999
01000 if (time(0) - lasttime > (time_t)86400)
01001 show = kTRUE;
01002
01003
01004
01005 if (gSystem->Getenv("PROOFMOTD")) {
01006 motdname = gSystem->Getenv("PROOFMOTD");
01007 } else {
01008 motdname = GetConfDir();
01009 motdname += "/etc/proof/motd";
01010 }
01011 if (gSystem->GetPathInfo(motdname, &id, &size, &flags, &modtime) == 0) {
01012 if (modtime > lasttime || show) {
01013 if ((motd = fopen(motdname, "r"))) {
01014 Int_t c;
01015 printf("\n");
01016 while ((c = getc(motd)) != EOF)
01017 putchar(c);
01018 fclose(motd);
01019 printf("\n");
01020 }
01021 }
01022 }
01023
01024 if (lasttime)
01025 gSystem->Unlink(last);
01026 Int_t fd = creat(last, 0600);
01027 if (fd >= 0) close(fd);
01028 delete [] last;
01029
01030 return 0;
01031 }
01032
01033
01034 TObject *TProofServ::Get(const char *namecycle)
01035 {
01036
01037
01038
01039
01040 fSocket->Send(namecycle, kPROOF_GETOBJECT);
01041
01042 TObject *idcur = 0;
01043
01044 Bool_t notdone = kTRUE;
01045 while (notdone) {
01046 TMessage *mess = 0;
01047 if (fSocket->Recv(mess) < 0)
01048 return 0;
01049 Int_t what = mess->What();
01050 if (what == kMESS_OBJECT) {
01051 idcur = mess->ReadObject(mess->GetClass());
01052 notdone = kFALSE;
01053 } else {
01054 Int_t xrc = HandleSocketInput(mess, kFALSE);
01055 if (xrc == -1) {
01056 Error("Get", "command %d cannot be executed while processing", what);
01057 } else if (xrc == -2) {
01058 Error("Get", "unknown command %d ! Protocol error?", what);
01059 }
01060 }
01061 delete mess;
01062 }
01063
01064 return idcur;
01065 }
01066
01067
01068 void TProofServ::RestartComputeTime()
01069 {
01070
01071
01072 fCompute.Stop();
01073 if (fPlayer) {
01074 TProofProgressStatus *status = fPlayer->GetProgressStatus();
01075 if (status) status->SetLearnTime(fCompute.RealTime());
01076 Info("RestartComputeTime", "compute time restarted after %f secs (%d entries)",
01077 fCompute.RealTime(), fPlayer->GetLearnEntries());
01078 }
01079 fCompute.Start(kFALSE);
01080 }
01081
01082
01083 TDSetElement *TProofServ::GetNextPacket(Long64_t totalEntries)
01084 {
01085
01086
01087 Long64_t bytesRead = 0;
01088
01089 if (gPerfStats) bytesRead = gPerfStats->GetBytesRead();
01090
01091 if (fCompute.Counter() > 0)
01092 fCompute.Stop();
01093
01094 TMessage req(kPROOF_GETPACKET);
01095 Double_t cputime = fCompute.CpuTime();
01096 Double_t realtime = fCompute.RealTime();
01097
01098
01099 PDB(kLoop, 2)
01100 Info("GetNextPacket", "inflate factor: %d"
01101 " (realtime: %f, cputime: %f, entries: %lld)",
01102 fInflateFactor, realtime, cputime, totalEntries);
01103 if (fInflateFactor > 1000) {
01104 UInt_t sleeptime = (UInt_t) (cputime * (fInflateFactor - 1000)) ;
01105 Int_t i = 0;
01106 for (i = kSigBus ; i <= kSigUser2 ; i++)
01107 gSystem->IgnoreSignal((ESignals)i, kTRUE);
01108 gSystem->Sleep(sleeptime);
01109 for (i = kSigBus ; i <= kSigUser2 ; i++)
01110 gSystem->IgnoreSignal((ESignals)i, kFALSE);
01111 realtime += sleeptime / 1000.;
01112 PDB(kLoop, 2)
01113 Info("GetNextPacket","slept %d millisec", sleeptime);
01114 }
01115
01116 if (fProtocol > 18) {
01117 req << fLatency.RealTime();
01118 TProofProgressStatus *status = 0;
01119 if (fPlayer)
01120 status = fPlayer->GetProgressStatus();
01121 else {
01122 Error("GetNextPacket", "no progress status object");
01123 return 0;
01124 }
01125
01126
01127 if (status->GetEntries() > 0) {
01128 PDB(kLoop, 2) status->Print(GetOrdinal());
01129 status->IncProcTime(realtime);
01130 status->IncCPUTime(cputime);
01131 }
01132 req << status;
01133
01134 Long64_t cacheSize = (fPlayer) ? fPlayer->GetCacheSize() : -1;
01135 Int_t learnent = (fPlayer) ? fPlayer->GetLearnEntries() : -1;
01136 req << cacheSize << learnent;
01137
01138
01139
01140
01141 req << totalEntries;
01142
01143 PDB(kLoop, 1) {
01144 PDB(kLoop, 2) status->Print();
01145 Info("GetNextPacket","cacheSize: %lld, learnent: %d", cacheSize, learnent);
01146 }
01147 status = 0;
01148 } else {
01149 req << fLatency.RealTime() << realtime << cputime
01150 << bytesRead << totalEntries;
01151 if (fPlayer)
01152 req << fPlayer->GetEventsProcessed();
01153 }
01154
01155 fLatency.Start();
01156 Int_t rc = fSocket->Send(req);
01157 if (rc <= 0) {
01158 Error("GetNextPacket","Send() failed, returned %d", rc);
01159 return 0;
01160 }
01161
01162 TDSetElement *e = 0;
01163 Bool_t notdone = kTRUE;
01164 while (notdone) {
01165
01166 TMessage *mess;
01167 if ((rc = fSocket->Recv(mess)) <= 0) {
01168 fLatency.Stop();
01169 Error("GetNextPacket","Recv() failed, returned %d", rc);
01170 return 0;
01171 }
01172
01173 Int_t xrc = 0;
01174 TString file, dir, obj;
01175
01176 Int_t what = mess->What();
01177
01178 switch (what) {
01179 case kPROOF_GETPACKET:
01180
01181 fLatency.Stop();
01182 (*mess) >> e;
01183 if (e != 0) {
01184 fCompute.Start();
01185 PDB(kLoop, 2) Info("GetNextPacket", "'%s' '%s' '%s' %lld %lld",
01186 e->GetFileName(), e->GetDirectory(),
01187 e->GetObjName(), e->GetFirst(),e->GetNum());
01188 } else {
01189 PDB(kLoop, 2) Info("GetNextPacket", "Done");
01190 }
01191 notdone = kFALSE;
01192 break;
01193
01194 case kPROOF_STOPPROCESS:
01195
01196
01197
01198 fLatency.Stop();
01199 PDB(kLoop, 2) Info("GetNextPacket:kPROOF_STOPPROCESS","received");
01200 break;
01201
01202 default:
01203 xrc = HandleSocketInput(mess, kFALSE);
01204 if (xrc == -1) {
01205 Error("GetNextPacket", "command %d cannot be executed while processing", what);
01206 } else if (xrc == -2) {
01207 Error("GetNextPacket", "unknown command %d ! Protocol error?", what);
01208 }
01209 break;
01210 }
01211
01212 delete mess;
01213
01214 }
01215
01216
01217 return e;
01218 }
01219
01220
01221 void TProofServ::GetOptions(Int_t *argc, char **argv)
01222 {
01223
01224
01225
01226 if (*argc <= 1) {
01227 Fatal("GetOptions", "Must be started from proofd with arguments");
01228 exit(1);
01229 }
01230
01231 if (!strcmp(argv[1], "proofserv")) {
01232 fMasterServ = kTRUE;
01233 fEndMaster = kTRUE;
01234 } else if (!strcmp(argv[1], "proofslave")) {
01235 fMasterServ = kFALSE;
01236 fEndMaster = kFALSE;
01237 } else {
01238 Fatal("GetOptions", "Must be started as 'proofserv' or 'proofslave'");
01239 exit(1);
01240 }
01241
01242 fService = argv[1];
01243
01244
01245 if (!(gSystem->Getenv("ROOTCONFDIR"))) {
01246 Fatal("GetOptions", "ROOTCONFDIR shell variable not set");
01247 exit(1);
01248 }
01249 fConfDir = gSystem->Getenv("ROOTCONFDIR");
01250 }
01251
01252
01253 void TProofServ::HandleSocketInput()
01254 {
01255
01256
01257
01258 TIdleTOTimerGuard itg(fIdleTOTimer);
01259
01260 Bool_t all = (fgRecursive > 0) ? kFALSE : kTRUE;
01261 fgRecursive++;
01262
01263 TMessage *mess;
01264 Int_t rc = 0;
01265 TString exmsg;
01266
01267 try {
01268
01269
01270 if (fSocket->Recv(mess) <= 0 || !mess) {
01271
01272
01273 Error("HandleSocketInput", "retrieving message from input socket");
01274 Terminate(0);
01275 return;
01276 }
01277 Int_t what = mess->What();
01278 PDB(kCollect, 1)
01279 Info("HandleSocketInput", "got type %d from '%s'", what, fSocket->GetTitle());
01280
01281 fNcmd++;
01282
01283 if (fProof) fProof->SetActive();
01284
01285 Bool_t doit = kTRUE;
01286
01287 while (doit) {
01288
01289
01290 rc = HandleSocketInput(mess, all);
01291 if (rc < 0) {
01292 TString emsg;
01293 if (rc == -1) {
01294 emsg.Form("HandleSocketInput: command %d cannot be executed while processing", what);
01295 } else if (rc == -3) {
01296 emsg.Form("HandleSocketInput: message %d undefined! Protocol error?", what);
01297 } else {
01298 emsg.Form("HandleSocketInput: unknown command %d! Protocol error?", what);
01299 }
01300 SendAsynMessage(emsg.Data());
01301 } else if (rc == 2) {
01302
01303 fQueuedMsg->Add(mess);
01304 PDB(kGlobal, 1)
01305 Info("HandleSocketInput", "message of type %d enqueued; sz: %d",
01306 what, fQueuedMsg->GetSize());
01307 mess = 0;
01308 }
01309
01310
01311 doit = 0;
01312 if (fgRecursive == 1 && fQueuedMsg->GetSize() > 0) {
01313
01314 PDB(kCollect, 1)
01315 Info("HandleSocketInput", "processing enqueued message of type %d; left: %d",
01316 what, fQueuedMsg->GetSize());
01317 all = 1;
01318 SafeDelete(mess);
01319 mess = (TMessage *) fQueuedMsg->First();
01320 if (mess) fQueuedMsg->Remove(mess);
01321 doit = 1;
01322 }
01323 }
01324
01325 } catch (std::bad_alloc &) {
01326
01327 exmsg.Form("caught exception 'bad_alloc' (memory leak?) %s", fgLastMsg.Data());
01328 } catch (std::exception &exc) {
01329
01330 exmsg.Form("caught standard exception '%s' %s", exc.what(), fgLastMsg.Data());
01331 } catch (int i) {
01332
01333 exmsg.Form("caught exception throwing %d %s", i, fgLastMsg.Data());
01334 } catch (const char *str) {
01335
01336 exmsg.Form("caught exception throwing '%s' %s", str, fgLastMsg.Data());
01337 } catch (...) {
01338
01339 exmsg.Form("caught exception <unknown> %s", fgLastMsg.Data());
01340 }
01341
01342
01343 if (!exmsg.IsNull()) {
01344
01345 Error("HandleSocketInput", "%s", exmsg.Data());
01346
01347 SendAsynMessage(TString::Format("%s: %s", GetOrdinal(), exmsg.Data()));
01348
01349 Terminate(0);
01350 }
01351
01352
01353
01354 if (TestBit(TProofServ::kHighMemory)) {
01355
01356 exmsg.Form("high-memory footprint detected during Process(...) - terminating");
01357 Error("HandleSocketInput", "%s", exmsg.Data());
01358
01359 SendAsynMessage(TString::Format("%s: %s", GetOrdinal(), exmsg.Data()));
01360
01361 Terminate(0);
01362 }
01363
01364 fgRecursive--;
01365
01366 if (fProof) {
01367
01368
01369 Bool_t masterOnly = gEnv->GetValue("Proof.MasterOnly", kFALSE);
01370 Int_t ngwrks = fProof->GetListOfActiveSlaves()->GetSize() + fProof->GetListOfInactiveSlaves()->GetSize();
01371 if (rc == 0 && ngwrks == 0 && !masterOnly) {
01372 SendAsynMessage(" *** No workers left: cannot continue! Terminating ... *** ");
01373 Terminate(0);
01374 }
01375 fProof->SetActive(kFALSE);
01376
01377 fProof->SetRunStatus(TProof::kRunning);
01378 }
01379
01380
01381 SafeDelete(mess);
01382 }
01383
01384
01385 Int_t TProofServ::HandleSocketInput(TMessage *mess, Bool_t all)
01386 {
01387
01388
01389
01390
01391
01392
01393
01394
01395 static TStopwatch timer;
01396 char str[2048];
01397 Bool_t aborted = kFALSE;
01398
01399 if (!mess) return -3;
01400
01401 Int_t what = mess->What();
01402 PDB(kCollect, 1)
01403 Info("HandleSocketInput", "processing message type %d from '%s'",
01404 what, fSocket->GetTitle());
01405
01406 timer.Start();
01407
01408 Int_t rc = 0;
01409 TString slb;
01410 TString *pslb = (fgLogToSysLog > 0) ? &slb : (TString *)0;
01411
01412 switch (what) {
01413
01414 case kMESS_CINT:
01415 if (all) {
01416 mess->ReadString(str, sizeof(str));
01417
01418 TString fn;
01419 if (TProof::GetFileInCmd(str, fn))
01420 CopyFromCache(fn, 1);
01421 if (IsParallel()) {
01422 fProof->SendCommand(str);
01423 } else {
01424 PDB(kGlobal, 1)
01425 Info("HandleSocketInput:kMESS_CINT", "processing: %s...", str);
01426 ProcessLine(str);
01427 }
01428 LogToMaster();
01429 } else {
01430 rc = -1;
01431 }
01432 SendLogFile();
01433 if (pslb) slb = str;
01434 break;
01435
01436 case kMESS_STRING:
01437 if (all) {
01438 mess->ReadString(str, sizeof(str));
01439 } else {
01440 rc = -1;
01441 }
01442 break;
01443
01444 case kMESS_OBJECT:
01445 if (all) {
01446 mess->ReadObject(mess->GetClass());
01447 } else {
01448 rc = -1;
01449 }
01450 break;
01451
01452 case kPROOF_GROUPVIEW:
01453 if (all) {
01454 mess->ReadString(str, sizeof(str));
01455
01456 sscanf(str, "%d %d", &fGroupId, &fGroupSize);
01457 } else {
01458 rc = -1;
01459 }
01460 break;
01461
01462 case kPROOF_LOGLEVEL:
01463 { UInt_t mask;
01464 mess->ReadString(str, sizeof(str));
01465 sscanf(str, "%d %u", &fLogLevel, &mask);
01466 gProofDebugLevel = fLogLevel;
01467 gProofDebugMask = (TProofDebug::EProofDebugMask) mask;
01468 if (IsMaster())
01469 fProof->SetLogLevel(fLogLevel, mask);
01470 }
01471 break;
01472
01473 case kPROOF_PING:
01474 { if (IsMaster())
01475 fProof->Ping();
01476
01477 }
01478 break;
01479
01480 case kPROOF_PRINT:
01481 mess->ReadString(str, sizeof(str));
01482 Print(str);
01483 LogToMaster();
01484 SendLogFile();
01485 break;
01486
01487 case kPROOF_RESET:
01488 if (all) {
01489 mess->ReadString(str, sizeof(str));
01490 Reset(str);
01491 } else {
01492 rc = -1;
01493 }
01494 break;
01495
01496 case kPROOF_STATUS:
01497 Warning("HandleSocketInput:kPROOF_STATUS",
01498 "kPROOF_STATUS message is obsolete");
01499 fSocket->Send(fProof->GetParallel(), kPROOF_STATUS);
01500 break;
01501
01502 case kPROOF_GETSTATS:
01503 SendStatistics();
01504 break;
01505
01506 case kPROOF_GETPARALLEL:
01507 SendParallel();
01508 break;
01509
01510 case kPROOF_STOP:
01511 if (all) {
01512 if (IsMaster()) {
01513 TString ord;
01514 *mess >> ord;
01515 PDB(kGlobal, 1)
01516 Info("HandleSocketInput:kPROOF_STOP", "request for worker %s", ord.Data());
01517 if (fProof) fProof->TerminateWorker(ord);
01518 } else {
01519 PDB(kGlobal, 1)
01520 Info("HandleSocketInput:kPROOF_STOP", "got request to terminate");
01521 Terminate(0);
01522 }
01523 } else {
01524 rc = -1;
01525 }
01526 break;
01527
01528 case kPROOF_STOPPROCESS:
01529 if (all) {
01530
01531
01532
01533 PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_STOPPROCESS","enter");
01534 } else {
01535 Long_t timeout = -1;
01536 (*mess) >> aborted;
01537 if (fProtocol > 9)
01538 (*mess) >> timeout;
01539 PDB(kGlobal, 1)
01540 Info("HandleSocketInput:kPROOF_STOPPROCESS",
01541 "recursive mode: enter %d, %ld", aborted, timeout);
01542 if (fProof)
01543
01544 fProof->StopProcess(aborted, timeout);
01545 else
01546
01547 if (fPlayer)
01548 fPlayer->StopProcess(aborted, timeout);
01549 }
01550 break;
01551
01552 case kPROOF_PROCESS:
01553 {
01554 TProofServLogHandlerGuard hg(fLogFile, fSocket, "", fRealTimeLog);
01555 PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_PROCESS","enter");
01556 HandleProcess(mess, pslb);
01557
01558
01559
01560 }
01561 break;
01562
01563 case kPROOF_QUERYLIST:
01564 {
01565 HandleQueryList(mess);
01566
01567 SendLogFile();
01568 }
01569 break;
01570
01571 case kPROOF_REMOVE:
01572 {
01573 HandleRemove(mess, pslb);
01574
01575 SendLogFile();
01576 }
01577 break;
01578
01579 case kPROOF_RETRIEVE:
01580 {
01581 HandleRetrieve(mess, pslb);
01582
01583 SendLogFile();
01584 }
01585 break;
01586
01587 case kPROOF_ARCHIVE:
01588 {
01589 HandleArchive(mess, pslb);
01590
01591 SendLogFile();
01592 }
01593 break;
01594
01595 case kPROOF_MAXQUERIES:
01596 { PDB(kGlobal, 1)
01597 Info("HandleSocketInput:kPROOF_MAXQUERIES", "Enter");
01598 TMessage m(kPROOF_MAXQUERIES);
01599 m << fMaxQueries;
01600 fSocket->Send(m);
01601
01602 SendLogFile();
01603 }
01604 break;
01605
01606 case kPROOF_CLEANUPSESSION:
01607 if (all) {
01608 PDB(kGlobal, 1)
01609 Info("HandleSocketInput:kPROOF_CLEANUPSESSION", "Enter");
01610 TString stag;
01611 (*mess) >> stag;
01612 if (fQMgr && fQMgr->CleanupSession(stag) == 0) {
01613 Printf("Session %s cleaned up", stag.Data());
01614 } else {
01615 Printf("Could not cleanup session %s", stag.Data());
01616 }
01617 } else {
01618 rc = -1;
01619 }
01620
01621 SendLogFile();
01622 break;
01623
01624 case kPROOF_GETENTRIES:
01625 { PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETENTRIES", "Enter");
01626 Bool_t isTree;
01627 TString filename;
01628 TString dir;
01629 TString objname("undef");
01630 Long64_t entries = -1;
01631
01632 if (all) {
01633 (*mess) >> isTree >> filename >> dir >> objname;
01634 PDB(kGlobal, 2) Info("HandleSocketInput:kPROOF_GETENTRIES",
01635 "Report size of object %s (%s) in dir %s in file %s",
01636 objname.Data(), isTree ? "T" : "O",
01637 dir.Data(), filename.Data());
01638 entries = TDSet::GetEntries(isTree, filename, dir, objname);
01639 PDB(kGlobal, 2) Info("HandleSocketInput:kPROOF_GETENTRIES",
01640 "Found %lld %s", entries, isTree ? "entries" : "objects");
01641 } else {
01642 rc = -1;
01643 }
01644 TMessage answ(kPROOF_GETENTRIES);
01645 answ << entries << objname;
01646 SendLogFile();
01647 fSocket->Send(answ);
01648 PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETENTRIES", "Done");
01649 }
01650 break;
01651
01652 case kPROOF_CHECKFILE:
01653 if (!all && fProtocol <= 19) {
01654
01655 rc = 2;
01656 } else {
01657
01658 HandleCheckFile(mess, pslb);
01659 }
01660 break;
01661
01662 case kPROOF_SENDFILE:
01663 if (!all && fProtocol <= 19) {
01664
01665 rc = 2;
01666 } else {
01667 mess->ReadString(str, sizeof(str));
01668 Long_t size;
01669 Int_t bin, fw = 1;
01670 char name[1024];
01671 if (fProtocol > 5) {
01672 sscanf(str, "%1023s %d %ld %d", name, &bin, &size, &fw);
01673 } else {
01674 sscanf(str, "%1023s %d %ld", name, &bin, &size);
01675 }
01676 TString fnam(name);
01677 Bool_t copytocache = kTRUE;
01678 if (fnam.BeginsWith("cache:")) {
01679 fnam.ReplaceAll("cache:", TString::Format("%s/", fCacheDir.Data()));
01680 copytocache = kFALSE;
01681 }
01682 if (size > 0) {
01683 ReceiveFile(fnam, bin ? kTRUE : kFALSE, size);
01684 } else {
01685
01686 if (!fnam.BeginsWith(fCacheDir.Data())) {
01687 fnam.Insert(0, TString::Format("%s/", fCacheDir.Data()));
01688 }
01689 }
01690
01691 if (copytocache && size > 0 &&
01692 strncmp(fPackageDir, name, fPackageDir.Length()))
01693 CopyToCache(name, 0);
01694 if (IsMaster() && fw == 1) {
01695 Int_t opt = TProof::kForward | TProof::kCp;
01696 if (bin)
01697 opt |= TProof::kBinary;
01698 PDB(kGlobal, 1)
01699 Info("HandleSocketInput","forwarding file: %s", fnam.Data());
01700 if (fProof->SendFile(fnam, opt, (copytocache ? "cache" : "")) < 0) {
01701 Error("HandleSocketInput", "forwarding file: %s", fnam.Data());
01702 }
01703 }
01704 if (fProtocol > 19) fSocket->Send(kPROOF_SENDFILE);
01705 }
01706 break;
01707
01708 case kPROOF_LOGFILE:
01709 {
01710 Int_t start, end;
01711 (*mess) >> start >> end;
01712 PDB(kGlobal, 1)
01713 Info("HandleSocketInput:kPROOF_LOGFILE",
01714 "Logfile request - byte range: %d - %d", start, end);
01715
01716 LogToMaster();
01717 SendLogFile(0, start, end);
01718 }
01719 break;
01720
01721 case kPROOF_PARALLEL:
01722 if (all) {
01723 if (IsMaster()) {
01724 Int_t nodes;
01725 Bool_t random = kFALSE;
01726 (*mess) >> nodes;
01727 if ((mess->BufferSize() > mess->Length()))
01728 (*mess) >> random;
01729 if (fProof) fProof->SetParallel(nodes, random);
01730 rc = 1;
01731 }
01732 } else {
01733 rc = -1;
01734 }
01735
01736 SendLogFile();
01737 break;
01738
01739 case kPROOF_CACHE:
01740 if (!all && fProtocol <= 19) {
01741
01742 rc = 2;
01743 } else {
01744 TProofServLogHandlerGuard hg(fLogFile, fSocket, "", fRealTimeLog);
01745 PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_CACHE","enter");
01746 Int_t status = HandleCache(mess, pslb);
01747
01748 SendLogFile(status);
01749 }
01750 break;
01751
01752 case kPROOF_WORKERLISTS:
01753 if (all) {
01754 if (IsMaster())
01755 HandleWorkerLists(mess);
01756 else
01757 Warning("HandleSocketInput:kPROOF_WORKERLISTS",
01758 "Action meaning-less on worker nodes: protocol error?");
01759 } else {
01760 rc = -1;
01761 }
01762
01763 SendLogFile();
01764 break;
01765
01766 case kPROOF_GETSLAVEINFO:
01767 if (all) {
01768 PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETSLAVEINFO", "Enter");
01769 if (IsMaster()) {
01770 TList *info = fProof->GetListOfSlaveInfos();
01771 TMessage answ(kPROOF_GETSLAVEINFO);
01772 answ << info;
01773 fSocket->Send(answ);
01774 } else {
01775 TMessage answ(kPROOF_GETSLAVEINFO);
01776 TList *info = new TList;
01777 TSlaveInfo *wi = new TSlaveInfo(GetOrdinal(), TUrl(gSystem->HostName()).GetHostFQDN(), 0, "", GetDataDir());
01778 SysInfo_t si;
01779 gSystem->GetSysInfo(&si);
01780 wi->SetSysInfo(si);
01781 info->Add(wi);
01782 answ << (TList *)info;
01783 fSocket->Send(answ);
01784 info->SetOwner(kTRUE);
01785 delete info;
01786 }
01787
01788 PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETSLAVEINFO", "Done");
01789 } else {
01790 TMessage answ(kPROOF_GETSLAVEINFO);
01791 answ << (TList *)0;
01792 fSocket->Send(answ);
01793 rc = -1;
01794 }
01795 break;
01796
01797 case kPROOF_GETTREEHEADER:
01798 if (all) {
01799 PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETTREEHEADER", "Enter");
01800
01801 TVirtualProofPlayer *p = TVirtualProofPlayer::Create("slave", 0, fSocket);
01802 p->HandleGetTreeHeader(mess);
01803 delete p;
01804
01805 PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETTREEHEADER", "Done");
01806 } else {
01807 TMessage answ(kPROOF_GETTREEHEADER);
01808 answ << TString("Failed") << (TObject *)0;
01809 fSocket->Send(answ);
01810 rc = -1;
01811 }
01812 break;
01813
01814 case kPROOF_GETOUTPUTLIST:
01815 { PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETOUTPUTLIST", "Enter");
01816 TList* outputList = 0;
01817 if (IsMaster()) {
01818 outputList = fProof->GetOutputList();
01819 if (!outputList)
01820 outputList = new TList();
01821 } else {
01822 outputList = new TList();
01823 if (fProof->GetPlayer()) {
01824 TList *olist = fProof->GetPlayer()->GetOutputList();
01825 TIter next(olist);
01826 TObject *o;
01827 while ( (o = next()) ) {
01828 outputList->Add(new TNamed(o->GetName(), ""));
01829 }
01830 }
01831 }
01832 outputList->SetOwner();
01833 TMessage answ(kPROOF_GETOUTPUTLIST);
01834 answ << outputList;
01835 fSocket->Send(answ);
01836 delete outputList;
01837 PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETOUTPUTLIST", "Done");
01838 }
01839 break;
01840
01841 case kPROOF_VALIDATE_DSET:
01842 if (all) {
01843 PDB(kGlobal, 1)
01844 Info("HandleSocketInput:kPROOF_VALIDATE_DSET", "Enter");
01845
01846 TDSet* dset = 0;
01847 (*mess) >> dset;
01848
01849 if (IsMaster()) fProof->ValidateDSet(dset);
01850 else dset->Validate();
01851
01852 TMessage answ(kPROOF_VALIDATE_DSET);
01853 answ << dset;
01854 fSocket->Send(answ);
01855 delete dset;
01856 PDB(kGlobal, 1)
01857 Info("HandleSocketInput:kPROOF_VALIDATE_DSET", "Done");
01858 } else {
01859 rc = -1;
01860 }
01861
01862 SendLogFile();
01863 break;
01864
01865 case kPROOF_DATA_READY:
01866 if (all) {
01867 PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_DATA_READY", "Enter");
01868 TMessage answ(kPROOF_DATA_READY);
01869 if (IsMaster()) {
01870 Long64_t totalbytes = 0, bytesready = 0;
01871 Bool_t dataready = fProof->IsDataReady(totalbytes, bytesready);
01872 answ << dataready << totalbytes << bytesready;
01873 } else {
01874 Error("HandleSocketInput:kPROOF_DATA_READY",
01875 "This message should not be sent to slaves");
01876 answ << kFALSE << Long64_t(0) << Long64_t(0);
01877 }
01878 fSocket->Send(answ);
01879 PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_DATA_READY", "Done");
01880 } else {
01881 TMessage answ(kPROOF_DATA_READY);
01882 answ << kFALSE << Long64_t(0) << Long64_t(0);
01883 fSocket->Send(answ);
01884 rc = -1;
01885 }
01886
01887 SendLogFile();
01888 break;
01889
01890 case kPROOF_DATASETS:
01891 { Int_t xrc = -1;
01892 if (fProtocol > 16) {
01893 xrc = HandleDataSets(mess, pslb);
01894 } else {
01895 Error("HandleSocketInput", "old client: no or incompatible dataset support");
01896 }
01897 SendLogFile(xrc);
01898 }
01899 break;
01900
01901 case kPROOF_SUBMERGER:
01902 { HandleSubmerger(mess);
01903 }
01904 break;
01905
01906 case kPROOF_LIB_INC_PATH:
01907 if (all) {
01908 HandleLibIncPath(mess);
01909 } else {
01910 rc = -1;
01911 }
01912
01913 SendLogFile();
01914 break;
01915
01916 case kPROOF_REALTIMELOG:
01917 { Bool_t on;
01918 (*mess) >> on;
01919 PDB(kGlobal, 1)
01920 Info("HandleSocketInput:kPROOF_REALTIMELOG",
01921 "setting real-time logging %s", (on ? "ON" : "OFF"));
01922 fRealTimeLog = on;
01923
01924 if (IsMaster())
01925 fProof->SetRealTimeLog(on);
01926 }
01927 break;
01928
01929 case kPROOF_FORK:
01930 if (all) {
01931 HandleFork(mess);
01932 LogToMaster();
01933 } else {
01934 rc = -1;
01935 }
01936 SendLogFile();
01937 break;
01938
01939 case kPROOF_STARTPROCESS:
01940 if (all) {
01941
01942
01943 if (WaitingQueries() == 0) {
01944 Error("HandleSocketInput", "no queries enqueued");
01945 break;
01946 }
01947
01948
01949
01950 TList *workerList = (fProof->UseDynamicStartup()) ? new TList : (TList *)0;
01951 Int_t pc = 0;
01952 EQueryAction retVal = GetWorkers(workerList, pc, kTRUE);
01953
01954 if (retVal == TProofServ::kQueryOK) {
01955 Int_t ret = 0;
01956 if (workerList && (ret = fProof->AddWorkers(workerList)) < 0) {
01957 Error("HandleSocketInput", "adding a list of worker nodes returned: %d", ret);
01958 } else {
01959 ProcessNext(pslb);
01960
01961 SetIdle(kTRUE);
01962
01963 TMessage m(kPROOF_SETIDLE);
01964 Bool_t waiting = (WaitingQueries() > 0) ? kTRUE : kFALSE;
01965 m << waiting;
01966 fSocket->Send(m);
01967 }
01968 } else {
01969 if (retVal == TProofServ::kQueryStop) {
01970 Error("HandleSocketInput", "error getting list of worker nodes");
01971 } else if (retVal != TProofServ::kQueryEnqueued) {
01972 Warning("HandleSocketInput", "query was re-queued!");
01973 } else {
01974 Error("HandleSocketInput", "unexpected answer: %d", retVal);
01975 break;
01976 }
01977 }
01978
01979 }
01980 break;
01981
01982 case kPROOF_GOASYNC:
01983 {
01984
01985
01986 if (!IsIdle() && fPlayer) {
01987
01988 TProofQueryResult *pq = (TProofQueryResult *) fPlayer->GetCurrentQuery();
01989 TMessage m(kPROOF_QUERYSUBMITTED);
01990 m << pq->GetSeqNum() << kFALSE;
01991 fSocket->Send(m);
01992 } else {
01993
01994 SendAsynMessage("Processing request to go asynchronous:"
01995 " idle or undefined player - ignoring");
01996 }
01997 }
01998 break;
01999
02000 default:
02001 Error("HandleSocketInput", "unknown command %d", what);
02002 rc = -2;
02003 break;
02004 }
02005
02006 fRealTime += (Float_t)timer.RealTime();
02007 fCpuTime += (Float_t)timer.CpuTime();
02008
02009 if (!(slb.IsNull()) || fgLogToSysLog > 1) {
02010 TString s;
02011 s.Form("%s %d %.3f %.3f %s", fgSysLogEntity.Data(),
02012 what, timer.RealTime(), timer.CpuTime(), slb.Data());
02013 gSystem->Syslog(kLogNotice, s.Data());
02014 }
02015
02016
02017 return rc;
02018 }
02019
02020
02021 Bool_t TProofServ::AcceptResults(Int_t connections, TVirtualProofPlayer *mergerPlayer)
02022 {
02023
02024
02025 TMessage *mess = new TMessage();
02026 Int_t mergedWorkers = 0;
02027
02028 PDB(kSubmerger, 1) Info("AcceptResults", "enter");
02029
02030
02031 Bool_t result = kTRUE;
02032
02033 fMergingMonitor = new TMonitor();
02034 fMergingMonitor->Add(fMergingSocket);
02035
02036 Int_t numworkers = 0;
02037 while (fMergingMonitor->GetActive() > 0 && mergedWorkers < connections) {
02038
02039 TSocket *s = fMergingMonitor->Select();
02040 if (!s) {
02041 Info("AcceptResults", "interrupt!");
02042 result = kFALSE;
02043 break;
02044 }
02045
02046 if (s == fMergingSocket) {
02047
02048 TSocket *sw = fMergingSocket->Accept();
02049 fMergingMonitor->Add(sw);
02050
02051 PDB(kSubmerger, 2)
02052 Info("AcceptResults", "connection from a worker accepted on merger %s ",
02053 fOrdinal.Data());
02054
02055 if (++numworkers >= connections)
02056 fMergingMonitor->Remove(fMergingSocket);
02057 } else {
02058 s->Recv(mess);
02059 PDB(kSubmerger, 2)
02060 Info("AcceptResults", "message received: %d ", (mess ? mess->What() : 0));
02061 if (!mess) {
02062 Error("AcceptResults", "message received: %p ", mess);
02063 continue;
02064 }
02065 Int_t type = 0;
02066
02067
02068 while ((mess->BufferSize() > mess->Length())) {
02069 (*mess) >> type;
02070
02071 PDB(kSubmerger, 2) Info("AcceptResults", " type %d ", type);
02072 if (type == 2) {
02073 mergedWorkers++;
02074 PDB(kSubmerger, 2)
02075 Info("AcceptResults",
02076 "a new worker has been mergerd. Total merged workers: %d",
02077 mergedWorkers);
02078 }
02079 TObject *o = mess->ReadObject(TObject::Class());
02080 if (mergerPlayer->AddOutputObject(o) == 1) {
02081
02082 PDB(kSubmerger, 2) Info("AcceptResults", "removing %p (has been merged)", o);
02083 SafeDelete(o);
02084 } else
02085 PDB(kSubmerger, 2) Info("AcceptResults", "%p not merged yet", o);
02086 }
02087 }
02088 }
02089 fMergingMonitor->DeActivateAll();
02090
02091 TList* sockets = fMergingMonitor->GetListOfDeActives();
02092 Int_t size = sockets->GetSize();
02093 for (Int_t i =0; i< size; ++i){
02094 ((TSocket*)(sockets->At(i)))->Close();
02095 PDB(kSubmerger, 2) Info("AcceptResults", "closing socket");
02096 delete ((TSocket*)(sockets->At(i)));
02097 }
02098
02099 fMergingMonitor->RemoveAll();
02100 SafeDelete(fMergingMonitor);
02101
02102 PDB(kSubmerger, 2) Info("AcceptResults", "exit: %d", result);
02103 return result;
02104 }
02105
02106
02107 void TProofServ::HandleUrgentData()
02108 {
02109
02110
02111 char oob_byte;
02112 Int_t n, nch, wasted = 0;
02113
02114 const Int_t kBufSize = 1024;
02115 char waste[kBufSize];
02116
02117
02118 TProofServLogHandlerGuard hg(fLogFile, fSocket, "", fRealTimeLog);
02119
02120 PDB(kGlobal, 5)
02121 Info("HandleUrgentData", "handling oob...");
02122
02123
02124 while ((n = fSocket->RecvRaw(&oob_byte, 1, kOob)) < 0) {
02125 if (n == -2) {
02126
02127
02128
02129
02130
02131
02132
02133
02134
02135
02136 fSocket->GetOption(kBytesToRead, nch);
02137 if (nch == 0) {
02138 gSystem->Sleep(1000);
02139 continue;
02140 }
02141
02142 if (nch > kBufSize) nch = kBufSize;
02143 n = fSocket->RecvRaw(waste, nch);
02144 if (n <= 0) {
02145 Error("HandleUrgentData", "error receiving waste");
02146 break;
02147 }
02148 wasted = 1;
02149 } else {
02150 Error("HandleUrgentData", "error receiving OOB");
02151 return;
02152 }
02153 }
02154
02155 PDB(kGlobal, 5)
02156 Info("HandleUrgentData", "got OOB byte: %d\n", oob_byte);
02157
02158 if (fProof) fProof->SetActive();
02159
02160 switch (oob_byte) {
02161
02162 case TProof::kHardInterrupt:
02163 Info("HandleUrgentData", "*** Hard Interrupt");
02164
02165
02166 if (IsMaster())
02167 fProof->Interrupt(TProof::kHardInterrupt);
02168
02169
02170 while (1) {
02171 Int_t atmark;
02172
02173 fSocket->GetOption(kAtMark, atmark);
02174
02175 if (atmark) {
02176
02177
02178 n = fSocket->SendRaw(&oob_byte, 1, kOob);
02179 if (n <= 0)
02180 Error("HandleUrgentData", "error sending OOB");
02181 break;
02182 }
02183
02184
02185 fSocket->GetOption(kBytesToRead, nch);
02186 if (nch == 0) {
02187 gSystem->Sleep(1000);
02188 continue;
02189 }
02190
02191 if (nch > kBufSize) nch = kBufSize;
02192 n = fSocket->RecvRaw(waste, nch);
02193 if (n <= 0) {
02194 Error("HandleUrgentData", "error receiving waste (2)");
02195 break;
02196 }
02197 }
02198
02199 SendLogFile();
02200
02201 break;
02202
02203 case TProof::kSoftInterrupt:
02204 Info("HandleUrgentData", "Soft Interrupt");
02205
02206
02207 if (IsMaster())
02208 fProof->Interrupt(TProof::kSoftInterrupt);
02209
02210 if (wasted) {
02211 Error("HandleUrgentData", "soft interrupt flushed stream");
02212 break;
02213 }
02214
02215 Interrupt();
02216
02217 SendLogFile();
02218
02219 break;
02220
02221 case TProof::kShutdownInterrupt:
02222 Info("HandleUrgentData", "Shutdown Interrupt");
02223
02224
02225 if (IsMaster())
02226 fProof->Interrupt(TProof::kShutdownInterrupt);
02227
02228 Terminate(0);
02229
02230 break;
02231
02232 default:
02233 Error("HandleUrgentData", "unexpected OOB byte");
02234 break;
02235 }
02236
02237 if (fProof) fProof->SetActive(kFALSE);
02238 }
02239
02240
02241 void TProofServ::HandleSigPipe()
02242 {
02243
02244
02245
02246
02247 TProofServLogHandlerGuard hg(fLogFile, fSocket, "", fRealTimeLog);
02248
02249 if (IsMaster()) {
02250
02251
02252 if (fSocket->Send(kPROOF_PING | kMESS_ACK) < 0) {
02253 Info("HandleSigPipe", "keepAlive probe failed");
02254
02255
02256 fProof->SetActive();
02257 fProof->Interrupt(TProof::kShutdownInterrupt);
02258 fProof->SetActive(kFALSE);
02259 Terminate(0);
02260 }
02261 } else {
02262 Info("HandleSigPipe", "keepAlive probe failed");
02263 Terminate(0);
02264 }
02265 }
02266
02267
02268 Bool_t TProofServ::IsParallel() const
02269 {
02270
02271
02272 if (IsMaster() && fProof)
02273 return fProof->IsParallel();
02274
02275
02276 return kFALSE;
02277 }
02278
02279
02280 void TProofServ::Print(Option_t *option) const
02281 {
02282
02283
02284 if (IsMaster() && fProof)
02285 fProof->Print(option);
02286 else
02287 Printf("This is worker %s", gSystem->HostName());
02288 }
02289
02290
02291 void TProofServ::RedirectOutput(const char *dir, const char *mode)
02292 {
02293
02294
02295
02296 char logfile[512];
02297
02298 TString sdir = (dir && strlen(dir) > 0) ? dir : fSessionDir.Data();
02299 if (IsMaster()) {
02300 snprintf(logfile, 512, "%s/master-%s.log", sdir.Data(), fOrdinal.Data());
02301 } else {
02302 snprintf(logfile, 512, "%s/worker-%s.log", sdir.Data(), fOrdinal.Data());
02303 }
02304
02305 if ((freopen(logfile, mode, stdout)) == 0)
02306 SysError("RedirectOutput", "could not freopen stdout (%s)", logfile);
02307
02308 if ((dup2(fileno(stdout), fileno(stderr))) < 0)
02309 SysError("RedirectOutput", "could not redirect stderr");
02310
02311 if ((fLogFile = fopen(logfile, "r")) == 0)
02312 SysError("RedirectOutput", "could not open logfile '%s'", logfile);
02313
02314
02315 if (fProtocol < 4 && fWorkDir != TString::Format("~/%s", kPROOF_WorkDir)) {
02316 Warning("RedirectOutput", "no way to tell master (or client) where"
02317 " to upload packages");
02318 }
02319 }
02320
02321
02322 void TProofServ::Reset(const char *dir)
02323 {
02324
02325
02326
02327
02328 TString dd(dir);
02329 if (!dd.BeginsWith("proofserv")) {
02330 Int_t ic = dd.Index(":");
02331 if (ic != kNPOS)
02332 dd.Replace(0, ic, "proofserv");
02333 }
02334 gDirectory->cd(dd.Data());
02335
02336
02337 gROOT->Reset();
02338
02339
02340
02341 if (gDirectory != gROOT) {
02342 gDirectory->Delete();
02343 }
02344
02345 if (IsMaster()) fProof->SendCurrentState();
02346 }
02347
02348
02349 Int_t TProofServ::ReceiveFile(const char *file, Bool_t bin, Long64_t size)
02350 {
02351
02352
02353
02354
02355
02356 if (size <= 0) return 0;
02357
02358
02359 Int_t fd = open(file, O_CREAT | O_TRUNC | O_WRONLY, 0600);
02360 if (fd < 0) {
02361 SysError("ReceiveFile", "error opening file %s", file);
02362 return -1;
02363 }
02364
02365 const Int_t kMAXBUF = 16384;
02366 char buf[kMAXBUF], cpy[kMAXBUF];
02367
02368 Int_t left, r;
02369 Long64_t filesize = 0;
02370
02371 while (filesize < size) {
02372 left = Int_t(size - filesize);
02373 if (left > kMAXBUF)
02374 left = kMAXBUF;
02375 r = fSocket->RecvRaw(&buf, left);
02376 if (r > 0) {
02377 char *p = buf;
02378
02379 filesize += r;
02380 while (r) {
02381 Int_t w;
02382
02383 if (!bin) {
02384 Int_t k = 0, i = 0, j = 0;
02385 char *q;
02386 while (i < r) {
02387 if (p[i] == '\r') {
02388 i++;
02389 k++;
02390 }
02391 cpy[j++] = buf[i++];
02392 }
02393 q = cpy;
02394 r -= k;
02395 w = write(fd, q, r);
02396 } else {
02397 w = write(fd, p, r);
02398 }
02399
02400 if (w < 0) {
02401 SysError("ReceiveFile", "error writing to file %s", file);
02402 close(fd);
02403 return -1;
02404 }
02405 r -= w;
02406 p += w;
02407 }
02408 } else if (r < 0) {
02409 Error("ReceiveFile", "error during receiving file %s", file);
02410 close(fd);
02411 return -1;
02412 }
02413 }
02414
02415 close(fd);
02416
02417 chmod(file, 0644);
02418
02419 return 0;
02420 }
02421
02422
02423 void TProofServ::Run(Bool_t retrn)
02424 {
02425
02426
02427
02428 if (CreateServer() == 0) {
02429
02430
02431 TApplication::Run(retrn);
02432 }
02433 }
02434
02435
void TProofServ::SendLogFile(Int_t status, Int_t start, Int_t end)
{
   // Send the log file content to the client, then a kPROOF_LOGDONE message.
   //
   // status  value sent back in the kPROOF_LOGDONE message
   // start   if > -1, send the byte range [start, end] of the log file;
   //         otherwise send everything between the current read position of
   //         fLogFileDes and the end of the file
   // end     last byte of the range; values <= start or beyond the end of the
   //         file are replaced by the file size
   //
   // Protocol: a kPROOF_LOGFILE message carrying the byte count, followed by
   // the raw bytes, followed by kPROOF_LOGDONE with (status, number of
   // parallel workers; 1 on a worker, 0 on a master without fProof).

   // Make sure everything written so far is in the file
   fflush(stdout);

   // On workers, either flush the log or pass it to the master through
   // LogToMaster(kFALSE), depending on fSendLogToMaster
   if (!IsMaster()) {
      if (!fSendLogToMaster) {
         FlushLogFile();
      } else {
         // Decide case by case
         LogToMaster(kFALSE);
      }
   }

   off_t ltot=0, lnow=0;
   Int_t left = -1;
   Bool_t adhoc = kFALSE;

   if (fLogFileDes > -1) {
      // Total size, measured on stdout (which RedirectOutput() points to the
      // log file), and current read position on the log file descriptor
      ltot = lseek(fileno(stdout),   (off_t) 0, SEEK_END);
      lnow = lseek(fLogFileDes, (off_t) 0, SEEK_CUR);

      if (ltot >= 0 && lnow >= 0) {
         if (start > -1) {
            // Explicit range: move to 'start' and clamp 'end'; the read
            // position is restored at the end (adhoc)
            lseek(fLogFileDes, (off_t) start, SEEK_SET);
            if (end <= start || end > ltot)
               end = ltot;
            left = (Int_t)(end - start);
            if (end < ltot)
               left++;
            adhoc = kTRUE;
         } else {
            // Everything not yet sent
            left = (Int_t)(ltot - lnow);
         }
      }
   }

   if (left > 0) {
      // Announce the number of bytes that follow
      fSocket->Send(left, kPROOF_LOGFILE);

      const Int_t kMAXBUF = 32768;
      char buf[kMAXBUF];
      Int_t wanted = (left > kMAXBUF) ? kMAXBUF : left;
      Int_t len;
      do {
         // Read a chunk, retrying reads interrupted by a signal
         while ((len = read(fLogFileDes, buf, wanted)) < 0 &&
                TSystem::GetErrno() == EINTR)
            TSystem::ResetErrno();

         if (len < 0) {
            SysError("SendLogFile", "error reading log file");
            break;
         }

         // NOTE(review): when end == ltot this replaces the last byte of every
         // full-size chunk (not only the final one) with '\n'; confirm intent
         if (end == ltot && len == wanted)
            buf[len-1] = '\n';

         if (fSocket->SendRaw(buf, len) < 0) {
            SysError("SendLogFile", "error sending log file");
            break;
         }

         // Update counters
         left -= len;
         wanted = (left > kMAXBUF) ? kMAXBUF : left;

      } while (len > 0 && left > 0);
   }

   // Restore the read position if an explicit range was sent
   if (adhoc && lnow >=0 )
      lseek(fLogFileDes, lnow, SEEK_SET);

   TMessage mess(kPROOF_LOGDONE);
   if (IsMaster())
      mess << status << (fProof ? fProof->GetParallel() : 0);
   else
      mess << status << (Int_t) 1;

   fSocket->Send(mess);

   PDB(kGlobal, 1) Info("SendLogFile", "kPROOF_LOGDONE sent");
}
02525
02526
02527 void TProofServ::SendStatistics()
02528 {
02529
02530
02531 Long64_t bytesread = TFile::GetFileBytesRead();
02532 Float_t cputime = fCpuTime, realtime = fRealTime;
02533 if (IsMaster()) {
02534 bytesread = fProof->GetBytesRead();
02535 cputime = fProof->GetCpuTime();
02536 realtime = fProof->GetRealTime();
02537 }
02538
02539 TMessage mess(kPROOF_GETSTATS);
02540 TString workdir = gSystem->WorkingDirectory();
02541 mess << bytesread << realtime << cputime << workdir;
02542 if (fProtocol >= 4) mess << TString(gProofServ->GetWorkDir());
02543 mess << TString(gProofServ->GetImage());
02544 fSocket->Send(mess);
02545 }
02546
02547
02548 void TProofServ::SendParallel(Bool_t async)
02549 {
02550
02551
02552 Int_t nparallel = 0;
02553 if (IsMaster()) {
02554 fProof->AskParallel();
02555 nparallel = fProof->GetParallel();
02556 } else {
02557 nparallel = 1;
02558 }
02559
02560 TMessage mess(kPROOF_GETPARALLEL);
02561 mess << nparallel << async;
02562 fSocket->Send(mess);
02563 }
02564
02565
02566 Int_t TProofServ::UnloadPackage(const char *package)
02567 {
02568
02569
02570
02571
02572
02573
02574 TObjString *pack = (TObjString *) fEnabledPackages->FindObject(package);
02575 if (pack) {
02576
02577
02578 TString aclicincpath = gSystem->GetIncludePath();
02579 TString cintincpath = gInterpreter->GetIncludePath();
02580
02581 aclicincpath.Remove(aclicincpath.Length() - cintincpath.Length() - 1);
02582
02583 aclicincpath.ReplaceAll(TString(" -I") + package, "");
02584 gSystem->SetIncludePath(aclicincpath);
02585
02586
02587
02588
02589 delete fEnabledPackages->Remove(pack);
02590 PDB(kPackage, 1)
02591 Info("UnloadPackage",
02592 "package %s successfully unloaded", package);
02593 }
02594
02595
02596 if (!gSystem->AccessPathName(package))
02597 if (gSystem->Unlink(package) != 0)
02598 Warning("UnloadPackage", "unable to remove symlink to %s", package);
02599
02600
02601 return 0;
02602 }
02603
02604
02605 Int_t TProofServ::UnloadPackages()
02606 {
02607
02608
02609
02610 TIter nextpackage(fEnabledPackages);
02611 while (TObjString* objstr = dynamic_cast<TObjString*>(nextpackage()))
02612 if (UnloadPackage(objstr->String()) != 0)
02613 return -1;
02614
02615 PDB(kPackage, 1)
02616 Info("UnloadPackages",
02617 "packages successfully unloaded");
02618
02619 return 0;
02620 }
02621
02622
02623 Int_t TProofServ::Setup()
02624 {
02625
02626
02627
02628 char str[512];
02629
02630 if (IsMaster()) {
02631 snprintf(str, 512, "**** Welcome to the PROOF server @ %s ****", gSystem->HostName());
02632 } else {
02633 snprintf(str, 512, "**** PROOF slave server @ %s started ****", gSystem->HostName());
02634 }
02635
02636 if (fSocket->Send(str) != 1+static_cast<Int_t>(strlen(str))) {
02637 Error("Setup", "failed to send proof server startup message");
02638 return -1;
02639 }
02640
02641
02642
02643 Int_t what;
02644 if (fSocket->Recv(fProtocol, what) != 2*sizeof(Int_t)) {
02645 Error("Setup", "failed to receive remote proof protocol");
02646 return -1;
02647 }
02648 if (fSocket->Send(kPROOF_Protocol, kROOTD_PROTOCOL) != 2*sizeof(Int_t)) {
02649 Error("Setup", "failed to send local proof protocol");
02650 return -1;
02651 }
02652
02653
02654 if (fProtocol < 5) {
02655 TString wconf;
02656 if (OldAuthSetup(wconf) != 0) {
02657 Error("Setup", "OldAuthSetup: failed to setup authentication");
02658 return -1;
02659 }
02660 if (IsMaster()) {
02661 fConfFile = wconf;
02662 fWorkDir.Form("~/%s", kPROOF_WorkDir);
02663 } else {
02664 if (fProtocol < 4) {
02665 fWorkDir.Form("~/%s", kPROOF_WorkDir);
02666 } else {
02667 fWorkDir = wconf;
02668 if (fWorkDir.IsNull()) fWorkDir.Form("~/%s", kPROOF_WorkDir);
02669 }
02670 }
02671 } else {
02672
02673
02674 TMessage *mess;
02675 if ((fSocket->Recv(mess) <= 0) || !mess) {
02676 Error("Setup", "failed to receive ordinal and config info");
02677 return -1;
02678 }
02679 if (IsMaster()) {
02680 (*mess) >> fUser >> fOrdinal >> fConfFile;
02681 fWorkDir = gEnv->GetValue("ProofServ.Sandbox", TString::Format("~/%s", kPROOF_WorkDir));
02682 } else {
02683 (*mess) >> fUser >> fOrdinal >> fWorkDir;
02684 if (fWorkDir.IsNull())
02685 fWorkDir = gEnv->GetValue("ProofServ.Sandbox", TString::Format("~/%s", kPROOF_WorkDir));
02686 }
02687
02688 if (fOrdinal != "-1")
02689 fPrefix += fOrdinal;
02690 TProofServLogHandler::SetDefaultPrefix(fPrefix);
02691 delete mess;
02692 }
02693
02694 if (IsMaster()) {
02695
02696
02697 TString conffile = fConfFile;
02698 conffile.Remove(0, 1 + conffile.Index(":"));
02699
02700
02701 TProofResourcesStatic resources(fConfDir, conffile);
02702 if (resources.IsValid()) {
02703 if (resources.GetMaster()) {
02704 TString tmpWorkDir = resources.GetMaster()->GetWorkDir();
02705 if (tmpWorkDir != "")
02706 fWorkDir = tmpWorkDir;
02707 }
02708 } else {
02709 Info("Setup", "invalid config file %s (missing or unreadable",
02710 resources.GetFileName().Data());
02711 }
02712 }
02713
02714
02715
02716 gSystem->Setenv("HOME", gSystem->HomeDirectory());
02717
02718
02719 if (fWorkDir.BeginsWith("/") &&
02720 !fWorkDir.BeginsWith(gSystem->HomeDirectory())) {
02721 if (!fWorkDir.EndsWith("/"))
02722 fWorkDir += "/";
02723 UserGroup_t *u = gSystem->GetUserInfo();
02724 if (u) {
02725 fWorkDir += u->fUser;
02726 delete u;
02727 }
02728 }
02729
02730
02731 char *workdir = gSystem->ExpandPathName(fWorkDir.Data());
02732 fWorkDir = workdir;
02733 delete [] workdir;
02734 if (gProofDebugLevel > 0)
02735 Info("Setup", "working directory set to %s", fWorkDir.Data());
02736
02737
02738 TString host = gSystem->HostName();
02739 if (host.Index(".") != kNPOS)
02740 host.Remove(host.Index("."));
02741
02742
02743 fSessionTag.Form("%s-%s-%ld-%d", fOrdinal.Data(), host.Data(),
02744 (Long_t)TTimeStamp().GetSec(),gSystem->GetPid());
02745 fTopSessionTag = fSessionTag;
02746
02747
02748 fSessionDir = fWorkDir;
02749 if (IsMaster())
02750 fSessionDir += "/master-";
02751 else
02752 fSessionDir += "/slave-";
02753 fSessionDir += fSessionTag;
02754
02755
02756 if (SetupCommon() != 0) {
02757 Error("Setup", "common setup failed");
02758 return -1;
02759 }
02760
02761
02762 fSocket->SetOption(kProcessGroup, gSystem->GetPid());
02763
02764
02765 fSocket->SetOption(kNoDelay, 1);
02766
02767
02768 fSocket->SetOption(kKeepAlive, 1);
02769
02770
02771 return 0;
02772 }
02773
02774
02775 Int_t TProofServ::SetupCommon()
02776 {
02777
02778
02779
02780
02781 gSystem->Umask(022);
02782
02783 #ifdef R__UNIX
02784 TString bindir;
02785 # ifdef ROOTBINDIR
02786 bindir = ROOTBINDIR;
02787 # else
02788 bindir = gSystem->Getenv("ROOTSYS");
02789 if (!bindir.IsNull()) bindir += "/bin";
02790 # endif
02791 # ifdef COMPILER
02792 TString compiler = COMPILER;
02793 if (compiler.Index("is ") != kNPOS)
02794 compiler.Remove(0, compiler.Index("is ") + 3);
02795 compiler = gSystem->DirName(compiler);
02796 if (!bindir.IsNull()) bindir += ":";
02797 bindir += compiler;
02798 #endif
02799 if (!bindir.IsNull()) bindir += ":";
02800 bindir += "/bin:/usr/bin:/usr/local/bin";
02801
02802 TString path(gSystem->Getenv("PATH"));
02803 if (!path.IsNull()) path.Insert(0, ":");
02804 path.Insert(0, bindir);
02805 gSystem->Setenv("PATH", path);
02806 #endif
02807
02808 if (gSystem->AccessPathName(fWorkDir)) {
02809 gSystem->mkdir(fWorkDir, kTRUE);
02810 if (!gSystem->ChangeDirectory(fWorkDir)) {
02811 Error("SetupCommon", "can not change to PROOF directory %s",
02812 fWorkDir.Data());
02813 return -1;
02814 }
02815 } else {
02816 if (!gSystem->ChangeDirectory(fWorkDir)) {
02817 gSystem->Unlink(fWorkDir);
02818 gSystem->mkdir(fWorkDir, kTRUE);
02819 if (!gSystem->ChangeDirectory(fWorkDir)) {
02820 Error("SetupCommon", "can not change to PROOF directory %s",
02821 fWorkDir.Data());
02822 return -1;
02823 }
02824 }
02825 }
02826
02827
02828 fGroup = gEnv->GetValue("ProofServ.ProofGroup", "default");
02829
02830
02831 fCacheDir = gEnv->GetValue("ProofServ.CacheDir",
02832 TString::Format("%s/%s", fWorkDir.Data(), kPROOF_CacheDir));
02833 ResolveKeywords(fCacheDir);
02834 if (gSystem->AccessPathName(fCacheDir))
02835 gSystem->mkdir(fCacheDir, kTRUE);
02836 if (gProofDebugLevel > 0)
02837 Info("SetupCommon", "cache directory set to %s", fCacheDir.Data());
02838 fCacheLock =
02839 new TProofLockPath(TString::Format("%s/%s%s",
02840 gSystem->TempDirectory(), kPROOF_CacheLockFile,
02841 TString(fCacheDir).ReplaceAll("/","%").Data()));
02842
02843
02844 fPackageDir = gEnv->GetValue("ProofServ.PackageDir",
02845 TString::Format("%s/%s", fWorkDir.Data(), kPROOF_PackDir));
02846 ResolveKeywords(fPackageDir);
02847 if (gSystem->AccessPathName(fPackageDir))
02848 gSystem->mkdir(fPackageDir, kTRUE);
02849 if (gProofDebugLevel > 0)
02850 Info("SetupCommon", "package directory set to %s", fPackageDir.Data());
02851 fPackageLock =
02852 new TProofLockPath(TString::Format("%s/%s%s",
02853 gSystem->TempDirectory(), kPROOF_PackageLockFile,
02854 TString(fPackageDir).ReplaceAll("/","%").Data()));
02855
02856
02857 fDataDir = gEnv->GetValue("ProofServ.DataDir","");
02858 if (fDataDir.IsNull()) {
02859
02860 fDataDir.Form("%s/%s/<ord>/<stag>", fWorkDir.Data(), kPROOF_DataDir);
02861 }
02862 ResolveKeywords(fDataDir);
02863 if (gSystem->AccessPathName(fDataDir))
02864 gSystem->mkdir(fDataDir, kTRUE);
02865 if (gProofDebugLevel > 0)
02866 Info("SetupCommon", "data directory set to %s", fDataDir.Data());
02867
02868
02869 TString globpack = gEnv->GetValue("Proof.GlobalPackageDirs","");
02870 if (globpack.Length() > 0) {
02871 Int_t ng = 0;
02872 Int_t from = 0;
02873 TString ldir;
02874 while (globpack.Tokenize(ldir, from, ":")) {
02875 if (gSystem->AccessPathName(ldir, kReadPermission)) {
02876 Warning("SetupCommon", "directory for global packages %s does not"
02877 " exist or is not readable", ldir.Data());
02878 } else {
02879
02880 TString key;
02881 key.Form("G%d", ng++);
02882 if (!fGlobalPackageDirList) {
02883 fGlobalPackageDirList = new THashList();
02884 fGlobalPackageDirList->SetOwner();
02885 }
02886 fGlobalPackageDirList->Add(new TNamed(key,ldir));
02887 Info("SetupCommon", "directory for global packages %s added to the list",
02888 ldir.Data());
02889 FlushLogFile();
02890 }
02891 }
02892 }
02893
02894
02895 if (fSessionDir != gSystem->WorkingDirectory()) {
02896 ResolveKeywords(fSessionDir);
02897 if (gSystem->AccessPathName(fSessionDir))
02898 gSystem->mkdir(fSessionDir, kTRUE);
02899 if (!gSystem->ChangeDirectory(fSessionDir)) {
02900 Error("SetupCommon", "can not change to working directory %s",
02901 fSessionDir.Data());
02902 return -1;
02903 }
02904 }
02905 gSystem->Setenv("PROOF_SANDBOX", fSessionDir);
02906 if (gProofDebugLevel > 0)
02907 Info("SetupCommon", "session dir is %s", fSessionDir.Data());
02908
02909
02910
02911 if (IsMaster()) {
02912
02913
02914 fQueryDir = fWorkDir;
02915 fQueryDir += TString("/") + kPROOF_QueryDir;
02916 ResolveKeywords(fQueryDir);
02917 if (gSystem->AccessPathName(fQueryDir))
02918 gSystem->mkdir(fQueryDir, kTRUE);
02919 fQueryDir += TString("/session-") + fTopSessionTag;
02920 if (gSystem->AccessPathName(fQueryDir))
02921 gSystem->mkdir(fQueryDir, kTRUE);
02922 if (gProofDebugLevel > 0)
02923 Info("SetupCommon", "queries dir is %s", fQueryDir.Data());
02924
02925
02926 fQueryLock = new TProofLockPath(TString::Format("%s/%s%s-%s",
02927 gSystem->TempDirectory(),
02928 kPROOF_QueryLockFile, fTopSessionTag.Data(),
02929 TString(fQueryDir).ReplaceAll("/","%").Data()));
02930 fQueryLock->Lock();
02931
02932 fQMgr = new TQueryResultManager(fQueryDir, fSessionTag, fSessionDir,
02933 fQueryLock, 0);
02934 }
02935
02936
02937 fImage = gEnv->GetValue("ProofServ.Image", "");
02938
02939
02940 if (IsMaster()) {
02941
02942 TMessage m(kPROOF_SESSIONTAG);
02943 m << fTopSessionTag;
02944 if (GetProtocol() > 24) m << fGroup;
02945 fSocket->Send(m);
02946
02947 fGroupPriority = GetPriority();
02948
02949 TPluginHandler *h = 0;
02950 TString dsms = gEnv->GetValue("Proof.DataSetManager", "");
02951 if (!dsms.IsNull()) {
02952 TString dsm;
02953 Int_t from = 0;
02954 dsms.Tokenize(dsm, from, ",");
02955
02956 if (gROOT->GetPluginManager()) {
02957
02958 h = gROOT->GetPluginManager()->FindHandler("TDataSetManager", dsm);
02959 if (h && h->LoadPlugin() != -1) {
02960
02961 fDataSetManager =
02962 reinterpret_cast<TDataSetManager*>(h->ExecPlugin(3, fGroup.Data(),
02963 fUser.Data(), dsm.Data()));
02964 }
02965 }
02966
02967 if (fDataSetManager && fDataSetManager->TestBit(TObject::kInvalidObject)) {
02968 Warning("SetupCommon", "dataset manager plug-in initialization failed");
02969 SendAsynMessage("TXProofServ::SetupCommon: dataset manager plug-in initialization failed");
02970 SafeDelete(fDataSetManager);
02971 }
02972 } else {
02973
02974 TString opts("Av:");
02975 TString dsetdir = gEnv->GetValue("ProofServ.DataSetDir", "");
02976 if (dsetdir.IsNull()) {
02977
02978 dsetdir.Form("%s/%s", fWorkDir.Data(), kPROOF_DataSetDir);
02979 if (gSystem->AccessPathName(fDataSetDir))
02980 gSystem->MakeDirectory(fDataSetDir);
02981 opts += "Sb:";
02982 }
02983
02984 if (!h) {
02985 h = gROOT->GetPluginManager()->FindHandler("TDataSetManager", "file");
02986 if (h && h->LoadPlugin() == -1) h = 0;
02987 }
02988 if (h) {
02989
02990 TString oo = TString::Format("dir:%s opt:%s", dsetdir.Data(), opts.Data());
02991 fDataSetManager = reinterpret_cast<TDataSetManager*>(h->ExecPlugin(3,
02992 fGroup.Data(), fUser.Data(), oo.Data()));
02993 }
02994 if (fDataSetManager && fDataSetManager->TestBit(TObject::kInvalidObject)) {
02995 Warning("SetupCommon", "default dataset manager plug-in initialization failed");
02996 SafeDelete(fDataSetManager);
02997 }
02998 }
02999 }
03000
03001
03002 TString quotas = gEnv->GetValue(TString::Format("ProofServ.UserQuotas.%s", fUser.Data()),"");
03003 if (quotas.IsNull())
03004 quotas = gEnv->GetValue(TString::Format("ProofServ.UserQuotasByGroup.%s", fGroup.Data()),"");
03005 if (quotas.IsNull())
03006 quotas = gEnv->GetValue("ProofServ.UserQuotas", "");
03007 if (!quotas.IsNull()) {
03008
03009 TString tok;
03010 Ssiz_t from = 0;
03011 while (quotas.Tokenize(tok, from, " ")) {
03012
03013 if (tok.BeginsWith("maxquerykept=")) {
03014 tok.ReplaceAll("maxquerykept=","");
03015 if (tok.IsDigit())
03016 fMaxQueries = tok.Atoi();
03017 else
03018 Info("SetupCommon",
03019 "parsing 'maxquerykept' :ignoring token %s : not a digit", tok.Data());
03020 }
03021
03022 const char *ksz[2] = {"hwmsz=", "maxsz="};
03023 for (Int_t j = 0; j < 2; j++) {
03024 if (tok.BeginsWith(ksz[j])) {
03025 tok.ReplaceAll(ksz[j],"");
03026 Long64_t fact = -1;
03027 if (!tok.IsDigit()) {
03028
03029 tok.ToLower();
03030 const char *s[3] = {"k", "m", "g"};
03031 Int_t i = 0, k = 1024;
03032 while (fact < 0) {
03033 if (tok.EndsWith(s[i]))
03034 fact = k;
03035 else
03036 k *= 1024;
03037 }
03038 tok.Remove(tok.Length()-1);
03039 }
03040 if (tok.IsDigit()) {
03041 if (j == 0)
03042 fHWMBoxSize = (fact > 0) ? tok.Atoi() * fact : tok.Atoi();
03043 else
03044 fMaxBoxSize = (fact > 0) ? tok.Atoi() * fact : tok.Atoi();
03045 } else {
03046 TString ssz(ksz[j], strlen(ksz[j])-1);
03047 Info("SetupCommon", "parsing '%s' : ignoring token %s", ssz.Data(), tok.Data());
03048 }
03049 }
03050 }
03051 }
03052 }
03053
03054
03055 if (IsMaster() && fQMgr)
03056 if (fQMgr->ApplyMaxQueries(fMaxQueries) != 0)
03057 Warning("SetupCommon", "problems applying fMaxQueries");
03058
03059
03060 if (fProtocol > 12) {
03061 TString vac = gROOT->GetVersion();
03062 if (gROOT->GetSvnRevision() > 0)
03063 vac += TString::Format(":r%d", gROOT->GetSvnRevision());
03064 TString rtag = gEnv->GetValue("ProofServ.RootVersionTag", "");
03065 if (rtag.Length() > 0)
03066 vac += TString::Format(":%s", rtag.Data());
03067 vac += TString::Format("|%s-%s",gSystem->GetBuildArch(), gSystem->GetBuildCompilerVersion());
03068 TMessage m(kPROOF_VERSARCHCOMP);
03069 m << vac;
03070 fSocket->Send(m);
03071 }
03072
03073
03074 TString all_vars(gSystem->Getenv("PROOF_ALLVARS"));
03075 TString name;
03076 Int_t from = 0;
03077 while (all_vars.Tokenize(name, from, ",")) {
03078 if (!name.IsNull()) {
03079 TString value = gSystem->Getenv(name);
03080 TProof::AddEnvVar(name, value);
03081 }
03082 }
03083
03084 if (fgLogToSysLog > 0) {
03085
03086 if (!(fUser.IsNull()) && !(fGroup.IsNull())) {
03087 fgSysLogEntity.Form("%s:%s", fUser.Data(), fGroup.Data());
03088 } else if (!(fUser.IsNull()) && fGroup.IsNull()) {
03089 fgSysLogEntity.Form("%s:default", fUser.Data());
03090 } else if (fUser.IsNull() && !(fGroup.IsNull())) {
03091 fgSysLogEntity.Form("undef:%s", fGroup.Data());
03092 }
03093
03094 TString s;
03095 s.Form("%s 0 %.3f %.3f", fgSysLogEntity.Data(), fRealTime, fCpuTime);
03096 gSystem->Syslog(kLogNotice, s.Data());
03097 }
03098
03099 if (gProofDebugLevel > 0)
03100 Info("SetupCommon", "successfully completed");
03101
03102
03103 return 0;
03104 }
03105
03106
03107 void TProofServ::Terminate(Int_t status)
03108 {
03109
03110
03111 if (fgLogToSysLog > 0) {
03112 TString s;
03113 s.Form("%s -1 %.3f %.3f %d", fgSysLogEntity.Data(), fRealTime, fCpuTime, status);
03114 gSystem->Syslog(kLogNotice, s.Data());
03115 }
03116
03117
03118 ProcInfo_t pi;
03119 if (!gSystem->GetProcInfo(&pi)){
03120 Info("Terminate", "process memory footprint: %ld/%ld kB virtual, %ld/%ld kB resident ",
03121 pi.fMemVirtual, fgVirtMemMax, pi.fMemResident, fgResMemMax);
03122 }
03123
03124
03125 if (status == 0) {
03126
03127 gSystem->ChangeDirectory("/");
03128
03129 gSystem->MakeDirectory(fSessionDir+"/.delete");
03130 gSystem->Exec(TString::Format("%s %s", kRM, fSessionDir.Data()));
03131 }
03132
03133
03134 if (IsMaster()) {
03135 if (!(fQMgr && fQMgr->Queries() && fQMgr->Queries()->GetSize())) {
03136
03137 gSystem->ChangeDirectory("/");
03138
03139 gSystem->MakeDirectory(fQueryDir+"/.delete");
03140 gSystem->Exec(TString::Format("%s %s", kRM, fQueryDir.Data()));
03141
03142 if (fQueryLock)
03143 gSystem->Unlink(fQueryLock->GetName());
03144 }
03145
03146
03147 if (fQueryLock)
03148 fQueryLock->Unlock();
03149 }
03150
03151
03152 if (!fDataDir.IsNull() && !gSystem->AccessPathName(fDataDir, kWritePermission)) {
03153 if (UnlinkDataDir(fDataDir))
03154 Info("Terminate", "data directory '%s' has been removed", fDataDir.Data());
03155 }
03156
03157
03158
03159 TIter next(gSystem->GetListOfFileHandlers());
03160 TObject *fh = 0;
03161 while ((fh = next())) {
03162 TProofServInputHandler *ih = dynamic_cast<TProofServInputHandler *>(fh);
03163 if (ih)
03164 gSystem->RemoveFileHandler(ih);
03165 }
03166
03167
03168 gSystem->ExitLoop();
03169
03170
03171 }
03172
03173
03174 Bool_t TProofServ::UnlinkDataDir(const char *path)
03175 {
03176
03177
03178
03179 if (!path || strlen(path) <= 0) return kFALSE;
03180
03181 Bool_t dorm = kTRUE;
03182 void *dirp = gSystem->OpenDirectory(path);
03183 if (dirp) {
03184 TString fpath;
03185 const char *ent = 0;
03186 while (dorm && (ent = gSystem->GetDirEntry(dirp))) {
03187 if (!strcmp(ent, ".") || !strcmp(ent, "..")) continue;
03188 fpath.Form("%s/%s", path, ent);
03189 FileStat_t st;
03190 if (gSystem->GetPathInfo(fpath, st) == 0 && R_ISDIR(st.fMode)) {
03191 dorm = UnlinkDataDir(fpath);
03192 } else {
03193 dorm = kFALSE;
03194 }
03195 }
03196 } else {
03197
03198 dorm = kFALSE;
03199 }
03200
03201
03202 if (dorm && gSystem->Unlink(path) != 0)
03203 Warning("UnlinkDataDir", "data directory '%s' is empty but could not be removed", path);
03204
03205 return dorm;
03206 }
03207
03208
03209 Bool_t TProofServ::IsActive()
03210 {
03211
03212
03213 return gProofServ ? kTRUE : kFALSE;
03214 }
03215
03216
03217 TProofServ *TProofServ::This()
03218 {
03219
03220
03221
03222
03223 return gProofServ;
03224 }
03225
03226
03227 Int_t TProofServ::OldAuthSetup(TString &conf)
03228 {
03229
03230
03231
03232 OldProofServAuthSetup_t oldAuthSetupHook = 0;
03233
03234 if (!oldAuthSetupHook) {
03235
03236 TString authlib = "libRootAuth";
03237 char *p = 0;
03238
03239 if ((p = gSystem->DynamicPathName(authlib, kTRUE))) {
03240 delete[] p;
03241 if (gSystem->Load(authlib) == -1) {
03242 Error("OldAuthSetup", "can't load %s",authlib.Data());
03243 return kFALSE;
03244 }
03245 } else {
03246 Error("OldAuthSetup", "can't locate %s",authlib.Data());
03247 return -1;
03248 }
03249
03250
03251 Func_t f = gSystem->DynFindSymbol(authlib,"OldProofServAuthSetup");
03252 if (f)
03253 oldAuthSetupHook = (OldProofServAuthSetup_t)(f);
03254 else {
03255 Error("OldAuthSetup", "can't find OldProofServAuthSetup");
03256 return -1;
03257 }
03258 }
03259
03260
03261 return (*oldAuthSetupHook)(fSocket, IsMaster(), fProtocol,
03262 fUser, fOrdinal, conf);
03263 }
03264
03265
03266 TProofQueryResult *TProofServ::MakeQueryResult(Long64_t nent,
03267 const char *opt,
03268 TList *inlist, Long64_t fst,
03269 TDSet *dset, const char *selec,
03270 TObject *elist)
03271 {
03272
03273
03274
03275 Int_t seqnum = -1;
03276 if (fQMgr) {
03277 fQMgr->IncrementSeqNum();
03278 seqnum = fQMgr->SeqNum();
03279 }
03280
03281
03282 Bool_t olds = (dset && dset->TestBit(TDSet::kWriteV3)) ? kTRUE : kFALSE;
03283 if (olds)
03284 dset->SetWriteV3(kFALSE);
03285
03286
03287 TProofQueryResult *pqr = new TProofQueryResult(seqnum, opt, inlist, nent,
03288 fst, dset, selec, elist);
03289
03290 pqr->SetTitle(gSystem->BaseName(fQueryDir));
03291
03292
03293 if (olds)
03294 dset->SetWriteV3(kTRUE);
03295
03296 return pqr;
03297 }
03298
03299
03300 void TProofServ::SetQueryRunning(TProofQueryResult *pq)
03301 {
03302
03303
03304
03305 fflush(stdout);
03306 Int_t startlog = lseek(fileno(stdout), (off_t) 0, SEEK_END);
03307
03308
03309 Printf(" ");
03310 Info("SetQueryRunning", "starting query: %d", pq->GetSeqNum());
03311
03312
03313 TString parlist = "";
03314 TIter nxp(fEnabledPackages);
03315 TObjString *os= 0;
03316 while ((os = (TObjString *)nxp())) {
03317 if (parlist.Length() <= 0)
03318 parlist = os->GetName();
03319 else
03320 parlist += TString::Format(";%s",os->GetName());
03321 }
03322
03323 if (fProof) {
03324
03325 pq->SetRunning(startlog, parlist, fProof->GetParallel());
03326
03327
03328 pq->SetProcessInfo(pq->GetEntries(),
03329 fProof->GetCpuTime(), fProof->GetBytesRead());
03330 } else {
03331
03332 pq->SetRunning(startlog, parlist, -1);
03333
03334
03335 pq->SetProcessInfo(pq->GetEntries(), float(0.), 0);
03336 }
03337 }
03338
03339
03340 void TProofServ::HandleArchive(TMessage *mess, TString *slb)
03341 {
03342
03343
03344 PDB(kGlobal, 1)
03345 Info("HandleArchive", "Enter");
03346
03347 TString queryref;
03348 TString path;
03349 (*mess) >> queryref >> path;
03350
03351 if (slb) slb->Form("%s %s", queryref.Data(), path.Data());
03352
03353
03354 if (queryref == "Default") {
03355 fArchivePath = path;
03356 Info("HandleArchive",
03357 "default path set to %s", fArchivePath.Data());
03358 return;
03359 }
03360
03361 Int_t qry = -1;
03362 TString qdir;
03363 TProofQueryResult *pqr = fQMgr ? fQMgr->LocateQuery(queryref, qry, qdir) : 0;
03364 TProofQueryResult *pqm = pqr;
03365
03366 if (path.Length() <= 0) {
03367 if (fArchivePath.Length() <= 0) {
03368 Info("HandleArchive",
03369 "archive paths are not defined - do nothing");
03370 return;
03371 }
03372 if (qry > 0) {
03373 path.Form("%s/session-%s-%d.root",
03374 fArchivePath.Data(), fTopSessionTag.Data(), qry);
03375 } else {
03376 path = queryref;
03377 path.ReplaceAll(":q","-");
03378 path.Insert(0, TString::Format("%s/",fArchivePath.Data()));
03379 path += ".root";
03380 }
03381 }
03382
03383
03384 if (!pqr || qry < 0) {
03385 TString fout = qdir;
03386 fout += "/query-result.root";
03387
03388 TFile *f = TFile::Open(fout,"READ");
03389 pqr = 0;
03390 if (f) {
03391 f->ReadKeys();
03392 TIter nxk(f->GetListOfKeys());
03393 TKey *k = 0;
03394 while ((k = (TKey *)nxk())) {
03395 if (!strcmp(k->GetClassName(), "TProofQueryResult")) {
03396 pqr = (TProofQueryResult *) f->Get(k->GetName());
03397 if (pqr)
03398 break;
03399 }
03400 }
03401 f->Close();
03402 delete f;
03403 } else {
03404 Info("HandleArchive",
03405 "file cannot be open (%s)",fout.Data());
03406 return;
03407 }
03408 }
03409
03410 if (pqr) {
03411
03412 PDB(kGlobal, 1) Info("HandleArchive",
03413 "archive path for query #%d: %s",
03414 qry, path.Data());
03415 TFile *farc = 0;
03416 if (gSystem->AccessPathName(path))
03417 farc = TFile::Open(path,"NEW");
03418 else
03419 farc = TFile::Open(path,"UPDATE");
03420 if (!farc || !(farc->IsOpen())) {
03421 Info("HandleArchive",
03422 "archive file cannot be open (%s)",path.Data());
03423 return;
03424 }
03425 farc->cd();
03426
03427
03428 pqr->SetArchived(path);
03429 if (pqm)
03430 pqm->SetArchived(path);
03431
03432
03433 pqr->Write();
03434
03435
03436 if (qry > -1 && fQMgr)
03437 fQMgr->SaveQuery(pqr);
03438
03439
03440 Info("HandleArchive",
03441 "results of query %s archived to file %s",
03442 queryref.Data(), path.Data());
03443 }
03444
03445
03446 return;
03447 }
03448
03449
03450 void TProofServ::HandleProcess(TMessage *mess, TString *slb)
03451 {
03452
03453
03454 PDB(kGlobal, 1)
03455 Info("HandleProcess", "Enter");
03456
03457
03458 if (!IsTopMaster() && !IsIdle())
03459 return;
03460
03461 TDSet *dset;
03462 TString filename, opt;
03463 TList *input;
03464 Long64_t nentries, first;
03465 TEventList *evl = 0;
03466 TEntryList *enl = 0;
03467 Bool_t sync;
03468
03469 (*mess) >> dset >> filename >> input >> opt >> nentries >> first >> evl >> sync;
03470
03471 if ((mess->BufferSize() > mess->Length()) && fProtocol > 14)
03472 (*mess) >> enl;
03473 Bool_t hasNoData = (!dset || dset->TestBit(TDSet::kEmpty)) ? kTRUE : kFALSE;
03474
03475
03476 TObject *elist = (enl) ? (TObject *)enl : (TObject *)evl;
03477 if (enl && evl)
03478
03479 SafeDelete(evl);
03480 if ((!hasNoData) && elist)
03481 dset->SetEntryList(elist);
03482
03483 if (IsTopMaster()) {
03484
03485
03486 if ((!hasNoData) && dset->GetListOfElements()->GetSize() == 0) {
03487 TString emsg;
03488 if (TProof::AssertDataSet(dset, input, fDataSetManager, emsg) != 0) {
03489 SendAsynMessage(TString::Format("AssertDataSet on %s: %s",
03490 fPrefix.Data(), emsg.Data()));
03491 Error("HandleProcess", "AssertDataSet: %s", emsg.Data());
03492
03493 if (sync) SendLogFile();
03494 return;
03495 }
03496 }
03497
03498 TProofQueryResult *pq = 0;
03499
03500
03501
03502 pq = MakeQueryResult(nentries, opt, 0, first, 0, filename, 0);
03503
03504
03505 if (dset) input->Add(dset);
03506 if (elist) input->Add(elist);
03507 pq->SetInputList(input, kTRUE);
03508
03509
03510 input->Clear("nodelete");
03511 SafeDelete(input);
03512
03513
03514 TString emsg;
03515 if (TProof::SaveInputData(pq, fCacheDir.Data(), emsg) != 0)
03516 Warning("HandleProcess", "could not save input data: %s", emsg.Data());
03517
03518
03519 if (!(pq->IsDraw())) {
03520 if (fQMgr) {
03521 if (fQMgr->Queries()) fQMgr->Queries()->Add(pq);
03522
03523 fQMgr->SaveQuery(pq);
03524 }
03525 }
03526
03527
03528 QueueQuery(pq);
03529
03530
03531
03532
03533
03534 Bool_t enqueued = kFALSE;
03535 Int_t pc = 0;
03536
03537 if (fProof->UseDynamicStartup()) {
03538
03539 TList* workerList = new TList();
03540 EQueryAction retVal = GetWorkers(workerList, pc);
03541 if (retVal == TProofServ::kQueryStop) {
03542 Error("HandleProcess", "error getting list of worker nodes");
03543
03544 if (sync) SendLogFile();
03545 return;
03546 } else if (retVal == TProofServ::kQueryEnqueued) {
03547
03548 enqueued = kTRUE;
03549 Info("HandleProcess", "query %d enqueued", pq->GetSeqNum());
03550 } else if (Int_t ret = fProof->AddWorkers(workerList) < 0) {
03551 Error("HandleProcess", "Adding a list of worker nodes returned: %d",
03552 ret);
03553
03554 if (sync) SendLogFile();
03555 return;
03556 }
03557 } else {
03558 EQueryAction retVal = GetWorkers(0, pc);
03559 if (retVal == TProofServ::kQueryStop) {
03560 Error("HandleProcess", "error getting list of worker nodes");
03561
03562 if (sync) SendLogFile();
03563 return;
03564 } else if (retVal == TProofServ::kQueryEnqueued) {
03565
03566 enqueued = kTRUE;
03567 Info("HandleProcess", "query %d enqueued", pq->GetSeqNum());
03568 } else if (retVal != TProofServ::kQueryOK) {
03569 Error("HandleProcess", "unknown return value: %d", retVal);
03570
03571 if (sync) SendLogFile();
03572 return;
03573 }
03574 }
03575
03576
03577
03578
03579 TMessage m(kPROOF_QUERYSUBMITTED);
03580 if (!sync || enqueued) {
03581 m << pq->GetSeqNum() << kFALSE;
03582 fSocket->Send(m);
03583 }
03584
03585
03586 if (!IsIdle()) {
03587
03588 Info("HandleProcess",
03589 "query \"%s:%s\" submitted", pq->GetTitle(), pq->GetName());
03590 return;
03591 }
03592
03593
03594
03595
03596
03597 Bool_t doprocess = kFALSE;
03598 while (WaitingQueries() > 0 && !enqueued) {
03599 doprocess = kTRUE;
03600
03601 ProcessNext(slb);
03602
03603 if (fProof->UseDynamicStartup())
03604 enqueued = kTRUE;
03605
03606 }
03607
03608
03609 SetIdle(kTRUE);
03610
03611
03612 fProof->ResetMergers();
03613
03614
03615
03616
03617 if (!sync) SendLogFile();
03618
03619
03620 if (doprocess) {
03621 m.Reset(kPROOF_SETIDLE);
03622 Bool_t waiting = (WaitingQueries() > 0) ? kTRUE : kFALSE;
03623 m << waiting;
03624 fSocket->Send(m);
03625 }
03626
03627
03628
03629
03630 if (sync) SendLogFile();
03631
03632
03633 SetIdle(kTRUE);
03634
03635 } else {
03636
03637
03638 SetIdle(kFALSE);
03639
03640
03641 Bool_t deleteplayer = kTRUE;
03642 MakePlayer();
03643
03644
03645 if (dset && (dset->IsA() == TDSetProxy::Class()))
03646 ((TDSetProxy*)dset)->SetProofServ(this);
03647
03648
03649 TString emsg;
03650 if (TProof::GetInputData(input, fCacheDir.Data(), emsg) != 0)
03651 Warning("HandleProcess", "could not get input data: %s", emsg.Data());
03652
03653
03654 if (TProof::GetParameter(input, "PROOF_QuerySeqNum", fQuerySeqNum) != 0)
03655 Warning("HandleProcess", "could not get query sequential number!");
03656
03657
03658 TObject *nord = 0;
03659 while ((nord = input->FindObject("PROOF_Ordinal")))
03660 input->Remove(nord);
03661 input->Add(new TNamed("PROOF_Ordinal", GetOrdinal()));
03662
03663
03664 TIter next(input);
03665 TObject *o = 0;
03666 while ((o = next())) {
03667 PDB(kGlobal, 2) Info("HandleProcess", "adding: %s", o->GetName());
03668 fPlayer->AddInput(o);
03669 }
03670
03671
03672 fSocket->Send(kPROOF_STARTPROCESS);
03673
03674
03675 PDB(kGlobal, 1) Info("HandleProcess", "calling %s::Process()", fPlayer->IsA()->GetName());
03676 fPlayer->Process(dset, filename, opt, nentries, first);
03677
03678
03679 TMessage m(kPROOF_STOPPROCESS);
03680 Bool_t abort = (fPlayer->GetExitStatus() != TVirtualProofPlayer::kAborted) ? kFALSE : kTRUE;
03681 if (fProtocol > 18) {
03682 TProofProgressStatus* status =
03683 new TProofProgressStatus(fPlayer->GetEventsProcessed(),
03684 gPerfStats?gPerfStats->GetBytesRead():0);
03685 if (status)
03686 m << status << abort;
03687 if (slb)
03688 slb->Form("%d %lld %lld", fPlayer->GetExitStatus(),
03689 status->GetEntries(), status->GetBytesRead());
03690 SafeDelete(status);
03691 } else {
03692 m << fPlayer->GetEventsProcessed() << abort;
03693 if (slb)
03694 slb->Form("%d %lld -1", fPlayer->GetExitStatus(), fPlayer->GetEventsProcessed());
03695 }
03696
03697 fSocket->Send(m);
03698 PDB(kGlobal, 2)
03699 Info("TProofServ::Handleprocess",
03700 "worker %s has finished processing with %d objects in output list",
03701 GetOrdinal(), fPlayer->GetOutputList()->GetEntries());
03702
03703
03704 SafeDelete(dset);
03705 SafeDelete(enl);
03706 SafeDelete(evl);
03707
03708
03709 Bool_t isInMergingMode = kFALSE;
03710 if (!(TestBit(TProofServ::kHighMemory))) {
03711 Int_t nm = 0;
03712 if (TProof::GetParameter(input, "PROOF_UseMergers", nm) == 0) {
03713 isInMergingMode = (nm >= 0) ? kTRUE : kFALSE;
03714 }
03715 }
03716 PDB(kGlobal, 2) Info("HandleProcess", "merging mode check: %d", isInMergingMode);
03717
03718 if (!IsMaster() && isInMergingMode &&
03719 fPlayer->GetExitStatus() != TVirtualProofPlayer::kAborted && fPlayer->GetOutputList()) {
03720
03721
03722
03723
03724
03725
03726
03727 TMessage msg_osize(kPROOF_SUBMERGER);
03728 msg_osize << Int_t(TProof::kOutputSize);
03729 msg_osize << fPlayer->GetOutputList()->GetEntries();
03730
03731 fMergingSocket = new TServerSocket(0);
03732 Int_t merge_port = 0;
03733 if (fMergingSocket) {
03734 PDB(kGlobal, 2)
03735 Info("HandleProcess", "possible port for merging connections: %d",
03736 fMergingSocket->GetLocalPort());
03737 merge_port = fMergingSocket->GetLocalPort();
03738 }
03739 msg_osize << merge_port;
03740 fSocket->Send(msg_osize);
03741
03742
03743 SetIdle(kTRUE);
03744
03745
03746 deleteplayer = kFALSE;
03747
03748 PDB(kSubmerger, 2) Info("HandleProcess", "worker %s has finished", fOrdinal.Data());
03749
03750 } else {
03751
03752
03753 if (fPlayer->GetExitStatus() != TVirtualProofPlayer::kAborted && fPlayer->GetOutputList()) {
03754 PDB(kGlobal, 2) Info("HandleProcess", "sending result directly to master");
03755 if (SendResults(fSocket, fPlayer->GetOutputList()) != 0)
03756 Warning("HandleProcess","problems sending output list");
03757 } else {
03758 if (fPlayer->GetExitStatus() != TVirtualProofPlayer::kAborted)
03759 Warning("HandleProcess","the output list is empty!");
03760 if (SendResults(fSocket) != 0)
03761 Warning("HandleProcess", "problems sending output list");
03762 }
03763
03764
03765 if (IsMaster()) fProof->ResetMergers();
03766
03767
03768 fSocket->Send(kPROOF_SETIDLE);
03769
03770
03771 SetIdle(kTRUE);
03772
03773
03774 SendLogFile();
03775 }
03776
03777 fPlayer->GetInputList()->SetOwner(0);
03778 input->SetOwner();
03779 SafeDelete(input);
03780
03781
03782 if (deleteplayer) DeletePlayer();
03783 }
03784
03785 PDB(kGlobal, 1) Info("HandleProcess", "done");
03786
03787
03788 return;
03789 }
03790
03791
03792 Int_t TProofServ::SendResults(TSocket *sock, TList *outlist, TQueryResult *pq)
03793 {
03794
03795
03796 PDB(kOutput, 2) Info("SendResults", "enter");
03797
03798 TString msg;
03799 if (fProtocol > 23 && outlist) {
03800
03801
03802
03803 TMessage mbuf(kPROOF_OUTPUTOBJECT);
03804
03805 Int_t olsz = outlist->GetSize();
03806 if (IsTopMaster() && pq) {
03807 msg.Form("%s: merging output objects ... done ",
03808 fPrefix.Data());
03809 SendAsynMessage(msg.Data());
03810
03811 msg.Form("%s: objects merged; sending output: %d objs", fPrefix.Data(), olsz);
03812 SendAsynMessage(msg.Data(), kFALSE);
03813
03814 mbuf << (Int_t) 0;
03815 mbuf.WriteObject(pq);
03816 if (sock->Send(mbuf) < 0) return -1;
03817 }
03818
03819 Int_t ns = 0, np = 0;
03820 TIter nxo(outlist);
03821 TObject *o = 0;
03822 Int_t totsz = 0, objsz = 0;
03823 mbuf.Reset();
03824 while ((o = nxo())) {
03825 if (mbuf.Length() > fMsgSizeHWM) {
03826 PDB(kOutput, 1)
03827 Info("SendResults",
03828 "message has %d bytes: limit of %lld bytes reached - sending ...",
03829 mbuf.Length(), fMsgSizeHWM);
03830
03831
03832 if (fCompressMsg > 0) {
03833 mbuf.SetCompressionLevel(fCompressMsg);
03834 mbuf.Compress();
03835 objsz = mbuf.CompLength();
03836 } else {
03837 objsz = mbuf.Length();
03838 }
03839 totsz += objsz;
03840 if (IsTopMaster()) {
03841 msg.Form("%s: objects merged; sending obj %d/%d (%d bytes) ",
03842 fPrefix.Data(), ns, olsz, objsz);
03843 SendAsynMessage(msg.Data(), kFALSE);
03844 }
03845 if (sock->Send(mbuf) < 0) return -1;
03846
03847 mbuf.Reset();
03848 np = 0;
03849 }
03850 ns++;
03851 np++;
03852 mbuf << (Int_t) ((ns >= olsz) ? 2 : 1);
03853 mbuf << o;
03854 }
03855 if (np > 0) {
03856
03857
03858 if (fCompressMsg > 0) {
03859 mbuf.SetCompressionLevel(fCompressMsg);
03860 mbuf.Compress();
03861 objsz = mbuf.CompLength();
03862 } else {
03863 objsz = mbuf.Length();
03864 }
03865 totsz += objsz;
03866 if (IsTopMaster()) {
03867 msg.Form("%s: objects merged; sending obj %d/%d (%d bytes) ",
03868 fPrefix.Data(), ns, olsz, objsz);
03869 SendAsynMessage(msg.Data(), kFALSE);
03870 }
03871 if (sock->Send(mbuf) < 0) return -1;
03872 }
03873 if (IsTopMaster()) {
03874
03875 msg.Form("%s: grand total: sent %d objects, size: %d bytes ",
03876 fPrefix.Data(), olsz, totsz);
03877 SendAsynMessage(msg.Data());
03878 }
03879 } else if (fProtocol > 10 && outlist) {
03880
03881
03882
03883 TMessage mbuf(kPROOF_OUTPUTOBJECT);
03884
03885 Int_t olsz = outlist->GetSize();
03886 if (IsTopMaster() && pq) {
03887 msg.Form("%s: merging output objects ... done ",
03888 fPrefix.Data());
03889 SendAsynMessage(msg.Data());
03890
03891 msg.Form("%s: objects merged; sending output: %d objs", fPrefix.Data(), olsz);
03892 SendAsynMessage(msg.Data(), kFALSE);
03893
03894 mbuf << (Int_t) 0;
03895 mbuf.WriteObject(pq);
03896 if (sock->Send(mbuf) < 0) return -1;
03897 }
03898
03899 Int_t ns = 0;
03900 Int_t totsz = 0, objsz = 0;
03901 TIter nxo(fPlayer->GetOutputList());
03902 TObject *o = 0;
03903 while ((o = nxo())) {
03904 ns++;
03905 mbuf.Reset();
03906 Int_t type = (Int_t) ((ns >= olsz) ? 2 : 1);
03907 mbuf << type;
03908 mbuf.WriteObject(o);
03909
03910
03911 if (fCompressMsg > 0) {
03912 mbuf.SetCompressionLevel(fCompressMsg);
03913 mbuf.Compress();
03914 objsz = mbuf.CompLength();
03915 } else {
03916 objsz = mbuf.Length();
03917 }
03918 totsz += objsz;
03919 if (IsTopMaster()) {
03920 msg.Form("%s: objects merged; sending obj %d/%d (%d bytes) ",
03921 fPrefix.Data(), ns, olsz, objsz);
03922 SendAsynMessage(msg.Data(), kFALSE);
03923 }
03924 if (sock->Send(mbuf) < 0) return -1;
03925 }
03926
03927 if (IsTopMaster()) {
03928
03929 msg.Form("%s: grand total: sent %d objects, size: %d bytes ",
03930 fPrefix.Data(), olsz, totsz);
03931 SendAsynMessage(msg.Data());
03932 }
03933
03934 } else if (IsTopMaster() && fProtocol > 6 && outlist) {
03935
03936
03937 TMessage mbuf(kPROOF_OUTPUTLIST);
03938 mbuf.WriteObject(pq);
03939
03940 Int_t blen = mbuf.CompLength();
03941 Int_t olsz = outlist->GetSize();
03942
03943 msg.Form("%s: sending output: %d objs, %d bytes", fPrefix.Data(), olsz, blen);
03944 SendAsynMessage(msg.Data(), kFALSE);
03945 if (sock->Send(mbuf) < 0) return -1;
03946
03947 } else {
03948 if (outlist) {
03949 PDB(kGlobal, 2) Info("SendResults", "sending output list");
03950 } else {
03951 PDB(kGlobal, 2) Info("SendResults", "notifying failure or abort");
03952 }
03953 if (sock->SendObject(outlist, kPROOF_OUTPUTLIST) < 0) return -1;
03954 }
03955
03956 PDB(kOutput,2) Info("SendResults", "done");
03957
03958
03959 return 0;
03960 }
03961
03962
03963 void TProofServ::ProcessNext(TString *slb)
03964 {
03965
03966
03967
03968 TDSet *dset = 0;
03969 TString filename, opt;
03970 TList *input = 0;
03971 Long64_t nentries = -1, first = 0;
03972
03973 TObject *elist = 0;
03974 TProofQueryResult *pq = 0;
03975
03976
03977
03978
03979 pq = NextQuery();
03980 if (pq) {
03981
03982
03983 SetIdle(kFALSE);
03984 opt = pq->GetOptions();
03985 input = pq->GetInputList();
03986 nentries = pq->GetEntries();
03987 first = pq->GetFirst();
03988 filename = pq->GetSelecImp()->GetName();
03989 Ssiz_t id = opt.Last('#');
03990 if (id != kNPOS && id < opt.Length() - 1) {
03991 filename += opt(id + 1, opt.Length());
03992
03993 opt.Remove(id);
03994 }
03995
03996 TObject *o = 0;
03997 if ((o = pq->GetInputObject("TDSet"))) {
03998 dset = (TDSet *) o;
03999 } else {
04000
04001 Error("ProcessNext", "no TDset object: cannot continue");
04002 return;
04003 }
04004 elist = 0;
04005 if ((o = pq->GetInputObject("TEntryList")))
04006 elist = o;
04007 else if ((o = pq->GetInputObject("TEventList")))
04008 elist = o;
04009
04010
04011 if (pq->GetSelecImp()) {
04012 gSystem->Exec(TString::Format("%s %s", kRM, pq->GetSelecImp()->GetName()));
04013 pq->GetSelecImp()->SaveSource(pq->GetSelecImp()->GetName());
04014 }
04015 if (pq->GetSelecHdr() &&
04016 !strstr(pq->GetSelecHdr()->GetName(), "TProofDrawHist")) {
04017 gSystem->Exec(TString::Format("%s %s", kRM, pq->GetSelecHdr()->GetName()));
04018 pq->GetSelecHdr()->SaveSource(pq->GetSelecHdr()->GetName());
04019 }
04020 } else {
04021
04022 Error("ProcessNext", "empty waiting queries list!");
04023 return;
04024 }
04025
04026
04027 SetQueryRunning(pq);
04028
04029
04030 if (fQMgr) {
04031 if (!(pq->IsDraw()))
04032 fQMgr->SaveQuery(pq);
04033 else
04034 fQMgr->IncrementDrawQueries();
04035 fQMgr->ResetTime();
04036 }
04037
04038
04039 TMessage m(kPROOF_STARTPROCESS);
04040 m << TString(pq->GetSelecImp()->GetName())
04041 << dset->GetNumOfFiles()
04042 << pq->GetFirst() << pq->GetEntries();
04043 fSocket->Send(m);
04044
04045
04046 MakePlayer();
04047
04048
04049 fPlayer->AddQueryResult(pq);
04050
04051
04052 fPlayer->SetCurrentQuery(pq);
04053
04054
04055 if (dset->IsA() == TDSetProxy::Class())
04056 ((TDSetProxy*)dset)->SetProofServ(this);
04057
04058
04059
04060 TString qid = TString::Format("%s:%s",pq->GetTitle(),pq->GetName());
04061 input->Add(new TNamed("PROOF_QueryTag", qid.Data()));
04062
04063 fQuerySeqNum = pq->GetSeqNum();
04064 input->Add(new TParameter<Int_t>("PROOF_QuerySeqNum", fQuerySeqNum));
04065
04066
04067
04068 if (gEnv->Lookup("Proof.UseMergers") && !input->FindObject("PROOF_UseMergers")) {
04069 Int_t smg = gEnv->GetValue("Proof.UseMergers",-1);
04070 if (smg >= 0) {
04071 input->Add(new TParameter<Int_t>("PROOF_UseMergers", smg));
04072 PDB(kSubmerger, 2) Info("ProcessNext", "PROOF_UseMergers set to %d", smg);
04073 }
04074 }
04075
04076
04077 TIter next(input);
04078 TObject *o = 0;
04079 while ((o = next())) {
04080 PDB(kGlobal, 2) Info("ProcessNext", "adding: %s", o->GetName());
04081 fPlayer->AddInput(o);
04082 }
04083
04084
04085 if ((o = input->FindObject("MissingFiles"))) input->Remove(o);
04086
04087
04088 PDB(kGlobal, 1) Info("ProcessNext", "calling %s::Process()", fPlayer->IsA()->GetName());
04089 fPlayer->Process(dset, filename, opt, nentries, first);
04090
04091
04092 if (fPlayer->GetExitStatus() != TVirtualProofPlayer::kFinished) {
04093 Bool_t abort =
04094 (fPlayer->GetExitStatus() == TVirtualProofPlayer::kAborted) ? kTRUE : kFALSE;
04095 m.Reset(kPROOF_STOPPROCESS);
04096
04097 if (fProtocol > 18) {
04098 TProofProgressStatus* status = fPlayer->GetProgressStatus();
04099 m << status << abort;
04100 status = 0;
04101 } else if (fProtocol > 8) {
04102 m << fPlayer->GetEventsProcessed() << abort;
04103 } else {
04104 m << fPlayer->GetEventsProcessed();
04105 }
04106 fSocket->Send(m);
04107 }
04108
04109
04110 if (fDataSetManager && fPlayer->GetOutputList()) {
04111 TNamed *psr = (TNamed *) fPlayer->GetOutputList()->FindObject("PROOFSERV_RegisterDataSet");
04112 if (psr) {
04113 if (RegisterDataSets(input, fPlayer->GetOutputList()) != 0)
04114 Warning("ProcessNext", "problems registering produced datasets");
04115 fPlayer->GetOutputList()->Remove(psr);
04116 delete psr;
04117 }
04118 }
04119
04120
04121 if (fQMgr && !pq->IsDraw()) {
04122 fProof->AskStatistics();
04123 if (fQMgr->FinalizeQuery(pq, fProof, fPlayer))
04124 fQMgr->SaveQuery(pq, fMaxQueries);
04125 }
04126
04127
04128 TQueryResult *pqr = pq->CloneInfo();
04129
04130 Info("ProcessNext", "adding info about dataset '%s' in the light query result", dset->GetName());
04131 TList rin;
04132 TDSet *ds = new TDSet(dset->GetName(), dset->GetObjName());
04133 rin.Add(ds);
04134 pqr->SetInputList(&rin, kTRUE);
04135 if (fPlayer->GetExitStatus() != TVirtualProofPlayer::kAborted && fPlayer->GetOutputList()) {
04136 PDB(kGlobal, 2)
04137 Info("ProcessNext", "sending results");
04138 TQueryResult *xpq = (fProtocol > 10) ? pqr : pq;
04139 if (SendResults(fSocket, fPlayer->GetOutputList(), xpq) != 0)
04140 Warning("ProcessNext", "problems sending output list");
04141 if (slb) slb->Form("%d %lld %lld %.3f", fPlayer->GetExitStatus(), pq->GetEntries(),
04142 pq->GetBytes(), pq->GetUsedCPU());
04143 } else {
04144 if (fPlayer->GetExitStatus() != TVirtualProofPlayer::kAborted)
04145 Warning("ProcessNext","the output list is empty!");
04146 if (SendResults(fSocket) != 0)
04147 Warning("ProcessNext", "problems sending output list");
04148 if (slb) slb->Form("%d -1 -1 %.3f", fPlayer->GetExitStatus(), pq->GetUsedCPU());
04149 }
04150
04151
04152 if (fPlayer->GetExitStatus() == TVirtualProofPlayer::kAborted) {
04153 delete pqr;
04154 if (fQMgr) fQMgr->RemoveQuery(pq);
04155 } else {
04156
04157 if (!(pq->IsDraw())) {
04158 if (fQMgr && fQMgr->Queries()) {
04159 fQMgr->Queries()->Add(pqr);
04160
04161 fQMgr->Queries()->Remove(pq);
04162 }
04163
04164
04165
04166 fPlayer->RemoveQueryResult(TString::Format("%s:%s",
04167 pq->GetTitle(), pq->GetName()));
04168 }
04169 }
04170
04171 DeletePlayer();
04172 if (IsMaster() && fProof->UseDynamicStartup())
04173
04174 fProof->RemoveWorkers(0);
04175 }
04176
04177
04178 Int_t TProofServ::RegisterDataSets(TList *in, TList *out)
04179 {
04180
04181
04182 PDB(kDataset, 1) Info("RegisterDataSets", "enter");
04183
04184 if (!in || !out) return 0;
04185
04186 TString msg;
04187 TIter nxo(out);
04188 TObject *o = 0;
04189 while ((o = nxo())) {
04190
04191 TFileCollection *ds = dynamic_cast<TFileCollection*> (o);
04192 if (ds) {
04193
04194 TNamed *fcn = 0;
04195 TString tag = TString::Format("DATASET_%s", ds->GetName());
04196 if (!(fcn = (TNamed *) out->FindObject(tag))) continue;
04197
04198 TString regopt(fcn->GetTitle());
04199
04200 if (fDataSetManager) {
04201 if (fDataSetManager->TestBit(TDataSetManager::kAllowRegister)) {
04202
04203 if (ds->GetList()->GetSize() > 0) {
04204
04205 msg.Form("Registering and verifying dataset '%s' ... ", ds->GetName());
04206 SendAsynMessage(msg.Data(), kFALSE);
04207 Int_t rc = 0;
04208 FlushLogFile();
04209 { TProofServLogHandlerGuard hg(fLogFile, fSocket);
04210
04211 Bool_t allowVerify = fDataSetManager->TestBit(TDataSetManager::kAllowVerify) ? kTRUE : kFALSE;
04212 if (regopt.Contains("V") && !allowVerify)
04213 fDataSetManager->SetBit(TDataSetManager::kAllowVerify);
04214 rc = fDataSetManager->RegisterDataSet(ds->GetName(), ds, regopt);
04215
04216 if (regopt.Contains("V") && !allowVerify)
04217 fDataSetManager->ResetBit(TDataSetManager::kAllowVerify);
04218 }
04219 if (rc != 0) {
04220 Warning("RegisterDataSets",
04221 "failure registering dataset '%s'", ds->GetName());
04222 msg.Form("Registering and verifying dataset '%s' ... failed! See log for more details", ds->GetName());
04223 } else {
04224 Info("RegisterDataSets", "dataset '%s' successfully registered", ds->GetName());
04225 msg.Form("Registering and verifying dataset '%s' ... OK", ds->GetName());
04226 }
04227 SendAsynMessage(msg.Data(), kTRUE);
04228
04229 PDB(kDataset, 2) {
04230 Info("RegisterDataSets","printing collection");
04231 ds->Print("F");
04232 }
04233 } else {
04234 Warning("RegisterDataSets", "collection '%s' is empty", o->GetName());
04235 }
04236 } else {
04237 Info("RegisterDataSets", "dataset registration not allowed");
04238 return -1;
04239 }
04240 } else {
04241 Error("RegisterDataSets", "dataset manager is undefined!");
04242 return -1;
04243 }
04244
04245 out->Remove(fcn);
04246 SafeDelete(fcn);
04247 }
04248 }
04249
04250 PDB(kDataset, 1) Info("RegisterDataSets", "exit");
04251
04252 return 0;
04253 }
04254
04255
04256 void TProofServ::HandleQueryList(TMessage *mess)
04257 {
04258
04259
04260 PDB(kGlobal, 1)
04261 Info("HandleQueryList", "Enter");
04262
04263 Bool_t all;
04264 (*mess) >> all;
04265
04266 TList *ql = new TList;
04267 Int_t ntot = 0, npre = 0, ndraw= 0;
04268 if (fQMgr) {
04269 if (all) {
04270
04271 TString qdir = fQueryDir;
04272 Int_t idx = qdir.Index("session-");
04273 if (idx != kNPOS)
04274 qdir.Remove(idx);
04275 fQMgr->ScanPreviousQueries(qdir);
04276
04277 if (fQMgr->PreviousQueries()) {
04278 TIter nxq(fQMgr->PreviousQueries());
04279 TProofQueryResult *pqr = 0;
04280 while ((pqr = (TProofQueryResult *)nxq())) {
04281 ntot++;
04282 pqr->fSeqNum = ntot;
04283 ql->Add(pqr);
04284 }
04285 }
04286 }
04287
04288 npre = ntot;
04289 if (fQMgr->Queries()) {
04290
04291 TIter nxq(fQMgr->Queries());
04292 TProofQueryResult *pqr = 0;
04293 TQueryResult *pqm = 0;
04294 while ((pqr = (TProofQueryResult *)nxq())) {
04295 ntot++;
04296 pqm = pqr->CloneInfo();
04297 pqm->fSeqNum = ntot;
04298 ql->Add(pqm);
04299 }
04300 }
04301
04302 ndraw = fQMgr->DrawQueries();
04303 }
04304
04305 TMessage m(kPROOF_QUERYLIST);
04306 m << npre << ndraw << ql;
04307 fSocket->Send(m);
04308 delete ql;
04309
04310
04311 return;
04312 }
04313
04314
04315 void TProofServ::HandleRemove(TMessage *mess, TString *slb)
04316 {
04317
04318
04319 PDB(kGlobal, 1)
04320 Info("HandleRemove", "Enter");
04321
04322 TString queryref;
04323 (*mess) >> queryref;
04324
04325 if (slb) *slb = queryref;
04326
04327 if (queryref == "cleanupqueue") {
04328
04329 Int_t pend = CleanupWaitingQueries();
04330
04331 Info("HandleRemove", "%d queries removed from the waiting list", pend);
04332
04333 return;
04334 }
04335
04336 if (queryref == "cleanupdir") {
04337
04338
04339 Int_t nd = (fQMgr) ? fQMgr->CleanupQueriesDir() : -1;
04340
04341
04342 Info("HandleRemove", "%d directories removed", nd);
04343
04344 return;
04345 }
04346
04347
04348 if (fQMgr) {
04349 TProofLockPath *lck = 0;
04350 if (fQMgr->LockSession(queryref, &lck) == 0) {
04351
04352
04353 TList qtorm;
04354 fQMgr->RemoveQuery(queryref, &qtorm);
04355 CleanupWaitingQueries(kFALSE, &qtorm);
04356
04357
04358 if (lck) {
04359 gSystem->Unlink(lck->GetName());
04360 SafeDelete(lck);
04361 }
04362
04363
04364 return;
04365 }
04366 } else {
04367 Warning("HandleRemove", "query result manager undefined!");
04368 }
04369
04370
04371 Info("HandleRemove",
04372 "query %s could not be removed (unable to lock session)", queryref.Data());
04373
04374
04375 return;
04376 }
04377
04378
04379 void TProofServ::HandleRetrieve(TMessage *mess, TString *slb)
04380 {
04381
04382
04383 PDB(kGlobal, 1)
04384 Info("HandleRetrieve", "Enter");
04385
04386 TString queryref;
04387 (*mess) >> queryref;
04388
04389 if (slb) *slb = queryref;
04390
04391
04392 Int_t qry = -1;
04393 TString qdir;
04394 if (fQMgr) fQMgr->LocateQuery(queryref, qry, qdir);
04395
04396 TString fout = qdir;
04397 fout += "/query-result.root";
04398
04399 TFile *f = TFile::Open(fout,"READ");
04400 TProofQueryResult *pqr = 0;
04401 if (f) {
04402 f->ReadKeys();
04403 TIter nxk(f->GetListOfKeys());
04404 TKey *k = 0;
04405 while ((k = (TKey *)nxk())) {
04406 if (!strcmp(k->GetClassName(), "TProofQueryResult")) {
04407 pqr = (TProofQueryResult *) f->Get(k->GetName());
04408
04409 if (fProtocol < 13) {
04410 TDSet *d = 0;
04411 TObject *o = 0;
04412 TIter nxi(pqr->GetInputList());
04413 while ((o = nxi()))
04414 if ((d = dynamic_cast<TDSet *>(o)))
04415 break;
04416 d->SetWriteV3(kTRUE);
04417 }
04418 if (pqr) {
04419
04420
04421 Float_t qsz = (Float_t) f->GetSize();
04422 Int_t ilb = 0;
04423 static const char *clb[4] = { "bytes", "KB", "MB", "GB" };
04424 while (qsz > 1000. && ilb < 3) {
04425 qsz /= 1000.;
04426 ilb++;
04427 }
04428 SendAsynMessage(TString::Format("%s: sending result of %s:%s (%.1f %s)",
04429 fPrefix.Data(), pqr->GetTitle(), pqr->GetName(),
04430 qsz, clb[ilb]));
04431 fSocket->SendObject(pqr, kPROOF_RETRIEVE);
04432 } else {
04433 Info("HandleRetrieve",
04434 "query not found in file %s",fout.Data());
04435
04436 fSocket->SendObject(0, kPROOF_RETRIEVE);
04437 }
04438 break;
04439 }
04440 }
04441 f->Close();
04442 delete f;
04443 } else {
04444 Info("HandleRetrieve",
04445 "file cannot be open (%s)",fout.Data());
04446
04447 fSocket->SendObject(0, kPROOF_RETRIEVE);
04448 return;
04449 }
04450
04451
04452 return;
04453 }
04454
04455
04456 void TProofServ::HandleLibIncPath(TMessage *mess)
04457 {
04458
04459
04460 TString type;
04461 Bool_t add;
04462 TString path;
04463 (*mess) >> type >> add >> path;
04464
04465
04466 if ((type != "lib") && (type != "inc")) {
04467 Error("HandleLibIncPath","unknown action type: %s", type.Data());
04468 return;
04469 }
04470
04471
04472 path.ReplaceAll(","," ");
04473
04474
04475 TObjArray *op = 0;
04476 if (path.Length() > 0 && path != "-") {
04477 if (!(op = path.Tokenize(" "))) {
04478 Error("HandleLibIncPath","decomposing path %s", path.Data());
04479 return;
04480 }
04481 }
04482
04483 if (add) {
04484
04485 if (type == "lib") {
04486
04487
04488 TIter nxl(op, kIterBackward);
04489 TObjString *lib = 0;
04490 while ((lib = (TObjString *) nxl())) {
04491
04492 TString xlib = lib->GetName();
04493 gSystem->ExpandPathName(xlib);
04494
04495 if (!gSystem->AccessPathName(xlib, kReadPermission)) {
04496 TString newlibpath = gSystem->GetDynamicPath();
04497
04498 Int_t pos = 0;
04499 if (newlibpath.BeginsWith(".:"))
04500 pos = 2;
04501 if (newlibpath.Index(xlib) == kNPOS) {
04502 newlibpath.Insert(pos,TString::Format("%s:", xlib.Data()));
04503 gSystem->SetDynamicPath(newlibpath);
04504 }
04505 } else {
04506 Info("HandleLibIncPath",
04507 "libpath %s does not exist or cannot be read - not added", xlib.Data());
04508 }
04509 }
04510
04511
04512 if (IsMaster())
04513 fProof->AddDynamicPath(path);
04514
04515 } else {
04516
04517
04518 TIter nxi(op);
04519 TObjString *inc = 0;
04520 while ((inc = (TObjString *) nxi())) {
04521
04522 TString xinc = inc->GetName();
04523 gSystem->ExpandPathName(xinc);
04524
04525 if (!gSystem->AccessPathName(xinc, kReadPermission)) {
04526 TString curincpath = gSystem->GetIncludePath();
04527 if (curincpath.Index(xinc) == kNPOS)
04528 gSystem->AddIncludePath(TString::Format("-I%s", xinc.Data()));
04529 } else
04530 Info("HandleLibIncPath",
04531 "incpath %s does not exist or cannot be read - not added", xinc.Data());
04532 }
04533
04534
04535 if (IsMaster())
04536 fProof->AddIncludePath(path);
04537 }
04538
04539
04540 } else {
04541
04542 if (type == "lib") {
04543
04544
04545 TIter nxl(op);
04546 TObjString *lib = 0;
04547 while ((lib = (TObjString *) nxl())) {
04548
04549 TString xlib = lib->GetName();
04550 gSystem->ExpandPathName(xlib);
04551
04552 TString newlibpath = gSystem->GetDynamicPath();
04553 newlibpath.ReplaceAll(TString::Format("%s:", xlib.Data()),"");
04554 gSystem->SetDynamicPath(newlibpath);
04555 }
04556
04557
04558 if (IsMaster())
04559 fProof->RemoveDynamicPath(path);
04560
04561 } else {
04562
04563
04564 TIter nxi(op);
04565 TObjString *inc = 0;
04566 while ((inc = (TObjString *) nxi())) {
04567 TString newincpath = gSystem->GetIncludePath();
04568 newincpath.ReplaceAll(TString::Format("-I%s", inc->GetName()),"");
04569
04570 newincpath.ReplaceAll(gInterpreter->GetIncludePath(),"");
04571 gSystem->SetIncludePath(newincpath);
04572 }
04573
04574
04575 if (IsMaster())
04576 fProof->RemoveIncludePath(path);
04577 }
04578 }
04579 }
04580
04581
04582 void TProofServ::HandleCheckFile(TMessage *mess, TString *slb)
04583 {
04584
04585
04586 TString filenam;
04587 TMD5 md5;
04588 UInt_t opt = TProof::kUntar;
04589
04590 TMessage reply(kPROOF_CHECKFILE);
04591
04592
04593 (*mess) >> filenam >> md5;
04594 if ((mess->BufferSize() > mess->Length()) && (fProtocol > 8))
04595 (*mess) >> opt;
04596
04597 if (slb) *slb = filenam;
04598
04599 if (filenam.BeginsWith("-")) {
04600
04601
04602 Int_t st = 0;
04603 Bool_t err = kFALSE;
04604 filenam = filenam.Strip(TString::kLeading, '-');
04605 TString packnam = filenam;
04606 packnam.Remove(packnam.Length() - 4);
04607
04608 fPackageLock->Lock();
04609 TMD5 *md5local = TMD5::FileChecksum(fPackageDir + "/" + filenam);
04610 if (md5local && md5 == (*md5local)) {
04611 if ((opt & TProof::kRemoveOld)) {
04612
04613 st = gSystem->Exec(TString::Format("%s %s/%s", kRM, fPackageDir.Data(),
04614 packnam.Data()));
04615 if (st)
04616 Error("HandleCheckFile", "failure executing: %s %s/%s",
04617 kRM, fPackageDir.Data(), packnam.Data());
04618 }
04619
04620 char *gunzip = gSystem->Which(gSystem->Getenv("PATH"), kGUNZIP,
04621 kExecutePermission);
04622 if (gunzip) {
04623
04624 st = gSystem->Exec(TString::Format(kUNTAR, gunzip, fPackageDir.Data(),
04625 filenam.Data(), fPackageDir.Data()));
04626 if (st)
04627 Error("HandleCheckFile", "failure executing: %s",
04628 TString::Format(kUNTAR, gunzip, fPackageDir.Data(),
04629 filenam.Data(), fPackageDir.Data()).Data());
04630 delete [] gunzip;
04631 } else
04632 Error("HandleCheckFile", "%s not found", kGUNZIP);
04633
04634 if (gSystem->AccessPathName(fPackageDir + "/" + packnam, kWritePermission)) {
04635
04636 reply << (Int_t)0;
04637 if (fProtocol <= 19) reply.Reset(kPROOF_FATAL);
04638 err = kTRUE;
04639 Error("HandleCheckFile", "package %s did not unpack into %s",
04640 filenam.Data(), packnam.Data());
04641 } else {
04642
04643 TString md5f = fPackageDir + "/" + packnam + "/PROOF-INF/md5.txt";
04644 TMD5::WriteChecksum(md5f, md5local);
04645
04646 reply << (Int_t)1;
04647 PDB(kPackage, 1)
04648 Info("HandleCheckFile",
04649 "package %s installed on node", filenam.Data());
04650 }
04651 } else {
04652 reply << (Int_t)0;
04653 if (fProtocol <= 19) reply.Reset(kPROOF_FATAL);
04654 err = kTRUE;
04655 PDB(kPackage, 1)
04656 Info("HandleCheckFile",
04657 "package %s not yet on node", filenam.Data());
04658 }
04659
04660
04661
04662
04663
04664
04665
04666 if (err) {
04667
04668 gSystem->Exec(TString::Format("%s %s/%s", kRM, fPackageDir.Data(),
04669 filenam.Data()));
04670 fPackageLock->Unlock();
04671 } else if (IsMaster()) {
04672
04673 fPackageLock->Unlock();
04674 fProof->UploadPackage(fPackageDir + "/" + filenam, (TProof::EUploadPackageOpt)opt);
04675 } else {
04676
04677 fPackageLock->Unlock();
04678 }
04679 delete md5local;
04680 fSocket->Send(reply);
04681
04682 } else if (filenam.BeginsWith("+")) {
04683
04684 filenam = filenam.Strip(TString::kLeading, '+');
04685 TString packnam = filenam;
04686 packnam.Remove(packnam.Length() - 4);
04687 TString md5f = fPackageDir + "/" + packnam + "/PROOF-INF/md5.txt";
04688 fPackageLock->Lock();
04689 TMD5 *md5local = TMD5::ReadChecksum(md5f);
04690 fPackageLock->Unlock();
04691 if (md5local && md5 == (*md5local)) {
04692
04693 reply << (Int_t)1;
04694 PDB(kPackage, 1)
04695 Info("HandleCheckFile",
04696 "package %s already on node", filenam.Data());
04697 if (IsMaster())
04698 fProof->UploadPackage(fPackageDir + "/" + filenam);
04699 } else {
04700 reply << (Int_t)0;
04701 if (fProtocol <= 19) reply.Reset(kPROOF_FATAL);
04702 PDB(kPackage, 1)
04703 Info("HandleCheckFile",
04704 "package %s not yet on node", filenam.Data());
04705 }
04706 delete md5local;
04707 fSocket->Send(reply);
04708
04709 } else if (filenam.BeginsWith("=")) {
04710
04711 filenam = filenam.Strip(TString::kLeading, '=');
04712 TString packnam = filenam;
04713 packnam.Remove(packnam.Length() - 4);
04714 TString md5f = fPackageDir + "/" + packnam + "/PROOF-INF/md5.txt";
04715 fPackageLock->Lock();
04716 TMD5 *md5local = TMD5::ReadChecksum(md5f);
04717 fPackageLock->Unlock();
04718 if (md5local && md5 == (*md5local)) {
04719
04720 reply << (Int_t)1;
04721 PDB(kPackage, 1)
04722 Info("HandleCheckFile",
04723 "package %s already on node", filenam.Data());
04724 if (IsMaster())
04725 fProof->UploadPackage(fPackageDir + "/" + filenam);
04726 } else {
04727 reply << (Int_t)0;
04728 if (fProtocol <= 19) reply.Reset(kPROOF_FATAL);
04729 PDB(kPackage, 1)
04730 Info("HandleCheckFile",
04731 "package %s not yet on node", filenam.Data());
04732 }
04733 delete md5local;
04734 fSocket->Send(reply);
04735
04736 } else {
04737
04738 TString cachef = fCacheDir + "/" + filenam;
04739 fCacheLock->Lock();
04740 TMD5 *md5local = TMD5::FileChecksum(cachef);
04741
04742 if (md5local && md5 == (*md5local)) {
04743
04744 Bool_t cp = ((opt & TProof::kCp || opt & TProof::kCpBin) || (fProtocol <= 19)) ? kTRUE : kFALSE;
04745 if (cp) {
04746 Bool_t cpbin = (opt & TProof::kCpBin) ? kTRUE : kFALSE;
04747 CopyFromCache(filenam, cpbin);
04748 }
04749 reply << (Int_t)1;
04750 PDB(kCache, 1)
04751 Info("HandleCheckFile", "file %s already on node", filenam.Data());
04752 } else {
04753 reply << (Int_t)0;
04754 if (fProtocol <= 19) reply.Reset(kPROOF_FATAL);
04755 PDB(kCache, 1)
04756 Info("HandleCheckFile", "file %s not yet on node", filenam.Data());
04757 }
04758 delete md5local;
04759 fSocket->Send(reply);
04760 fCacheLock->Unlock();
04761 }
04762 }
04763
04764
04765 Int_t TProofServ::HandleCache(TMessage *mess, TString *slb)
04766 {
04767
04768
04769 PDB(kGlobal, 1)
04770 Info("HandleCache", "Enter");
04771
04772 Int_t status = 0;
04773 Int_t type = 0;
04774 Bool_t all = kFALSE;
04775 TMessage msg;
04776 Bool_t fromglobal = kFALSE;
04777
04778
04779 TString noth;
04780 const char *k = (IsMaster()) ? "Mst" : "Wrk";
04781 noth.Form("%s-%s", k, fOrdinal.Data());
04782
04783 TList *optls = 0;
04784 TString packagedir(fPackageDir), package, pdir, ocwd, file;
04785 (*mess) >> type;
04786 switch (type) {
04787 case TProof::kShowCache:
04788 (*mess) >> all;
04789 printf("*** File cache %s:%s ***\n", gSystem->HostName(),
04790 fCacheDir.Data());
04791 fflush(stdout);
04792 PDB(kCache, 1) {
04793 gSystem->Exec(TString::Format("%s -a %s", kLS, fCacheDir.Data()));
04794 } else {
04795 gSystem->Exec(TString::Format("%s %s", kLS, fCacheDir.Data()));
04796 }
04797 if (IsMaster() && all)
04798 fProof->ShowCache(all);
04799 LogToMaster();
04800 if (slb) slb->Form("%d %d", type, all);
04801 break;
04802 case TProof::kClearCache:
04803 file = "";
04804 if ((mess->BufferSize() > mess->Length())) (*mess) >> file;
04805 fCacheLock->Lock();
04806 if (file.IsNull() || file == "*") {
04807 gSystem->Exec(TString::Format("%s %s/* %s/.*.binversion", kRM, fCacheDir.Data(), fCacheDir.Data()));
04808 } else {
04809 gSystem->Exec(TString::Format("%s %s/%s", kRM, fCacheDir.Data(), file.Data()));
04810 }
04811 fCacheLock->Unlock();
04812 if (IsMaster())
04813 fProof->ClearCache(file);
04814 if (slb) slb->Form("%d %s", type, file.Data());
04815 break;
04816 case TProof::kShowPackages:
04817 (*mess) >> all;
04818 if (fGlobalPackageDirList && fGlobalPackageDirList->GetSize() > 0) {
04819
04820 TIter nxd(fGlobalPackageDirList);
04821 TNamed *nm = 0;
04822 while ((nm = (TNamed *)nxd())) {
04823 printf("*** Global Package cache %s %s:%s ***\n",
04824 nm->GetName(), gSystem->HostName(), nm->GetTitle());
04825 fflush(stdout);
04826 gSystem->Exec(TString::Format("%s %s", kLS, nm->GetTitle()));
04827 printf("\n");
04828 fflush(stdout);
04829 }
04830 }
04831 printf("*** Package cache %s:%s ***\n", gSystem->HostName(),
04832 fPackageDir.Data());
04833 fflush(stdout);
04834 gSystem->Exec(TString::Format("%s %s", kLS, fPackageDir.Data()));
04835 if (IsMaster() && all)
04836 fProof->ShowPackages(all);
04837 LogToMaster();
04838 if (slb) slb->Form("%d %d", type, all);
04839 break;
04840 case TProof::kClearPackages:
04841 status = UnloadPackages();
04842 if (status == 0) {
04843 fPackageLock->Lock();
04844 gSystem->Exec(TString::Format("%s %s/*", kRM, fPackageDir.Data()));
04845 fPackageLock->Unlock();
04846 if (IsMaster())
04847 status = fProof->ClearPackages();
04848 }
04849 if (slb) slb->Form("%d %d", type, status);
04850 break;
04851 case TProof::kClearPackage:
04852 (*mess) >> package;
04853 status = UnloadPackage(package);
04854 if (status == 0) {
04855 fPackageLock->Lock();
04856
04857 gSystem->Exec(TString::Format("%s %s/%s", kRM, fPackageDir.Data(),
04858 package.Data()));
04859 if (IsMaster())
04860 gSystem->Exec(TString::Format("%s %s/%s.par", kRM, fPackageDir.Data(),
04861 package.Data()));
04862 fPackageLock->Unlock();
04863 if (IsMaster())
04864 status = fProof->ClearPackage(package);
04865 }
04866 if (slb) slb->Form("%d %s %d", type, package.Data(), status);
04867 break;
04868 case TProof::kBuildPackage:
04869 (*mess) >> package;
04870
04871
04872 pdir = fPackageDir + "/" + package;
04873
04874 fromglobal = kFALSE;
04875 if (gSystem->AccessPathName(pdir, kReadPermission) ||
04876 gSystem->AccessPathName(pdir + "/PROOF-INF", kReadPermission)) {
04877
04878 if (fGlobalPackageDirList && fGlobalPackageDirList->GetSize() > 0) {
04879
04880 TIter nxd(fGlobalPackageDirList);
04881 TNamed *nm = 0;
04882 while ((nm = (TNamed *)nxd())) {
04883 pdir.Form("%s/%s", nm->GetTitle(), package.Data());
04884 if (!gSystem->AccessPathName(pdir, kReadPermission) &&
04885 !gSystem->AccessPathName(pdir + "/PROOF-INF", kReadPermission)) {
04886
04887 fromglobal = kTRUE;
04888 packagedir = nm->GetTitle();
04889 break;
04890 }
04891 pdir = "";
04892 }
04893 if (pdir.Length() <= 0) {
04894
04895 SendAsynMessage(TString::Format("%s: kBuildPackage: failure locating %s ...",
04896 noth.Data(), package.Data()));
04897 break;
04898 }
04899 }
04900 }
04901
04902 if (IsMaster() && !fromglobal) {
04903
04904 fProof->UploadPackage(pdir + ".par");
04905 }
04906 fPackageLock->Lock();
04907
04908 if (!status) {
04909
04910 PDB(kPackage, 1)
04911 Info("HandleCache",
04912 "kBuildPackage: package %s exists and has PROOF-INF directory", package.Data());
04913
04914 ocwd = gSystem->WorkingDirectory();
04915 gSystem->ChangeDirectory(pdir);
04916
04917
04918 if (IsMaster())
04919 fProof->BuildPackage(package, TProof::kBuildOnSlavesNoWait);
04920
04921
04922 if (!gSystem->AccessPathName("PROOF-INF/BUILD.sh")) {
04923
04924 SendAsynMessage(TString::Format("%s: building %s ...", noth.Data(), package.Data()));
04925
04926
04927
04928 Bool_t savever = kFALSE;
04929 TString v;
04930 Int_t rev = -1;
04931 FILE *f = fopen("PROOF-INF/proofvers.txt", "r");
04932 if (f) {
04933 TString r;
04934 v.Gets(f);
04935 r.Gets(f);
04936 rev = (!r.IsNull() && r.IsDigit()) ? r.Atoi() : -1;
04937 fclose(f);
04938 }
04939 if (!f || v != gROOT->GetVersion() ||
04940 (gROOT->GetSvnRevision() > 0 && rev != gROOT->GetSvnRevision())) {
04941 if (!fromglobal || !gSystem->AccessPathName(pdir, kWritePermission)) {
04942 savever = kTRUE;
04943 SendAsynMessage(TString::Format("%s: %s: version change (current: %s:%d,"
04944 " build: %s:%d): cleaning ... ",
04945 noth.Data(), package.Data(), gROOT->GetVersion(),
04946 gROOT->GetSvnRevision(), v.Data(), rev));
04947
04948 gSystem->ChangeDirectory(packagedir);
04949
04950 gSystem->Exec(TString::Format("%s %s", kRM, pdir.Data()));
04951
04952 char *gunzip = gSystem->Which(gSystem->Getenv("PATH"), kGUNZIP,
04953 kExecutePermission);
04954 if (gunzip) {
04955 TString par;
04956 par.Form("%s.par", pdir.Data());
04957
04958 TString cmd;
04959 cmd.Form(kUNTAR3, gunzip, par.Data());
04960 status = gSystem->Exec(cmd);
04961 if (status) {
04962 Error("HandleCache", "kBuildPackage: failure executing: %s", cmd.Data());
04963 } else {
04964
04965 TMD5 *md5local = TMD5::FileChecksum(par);
04966 if (md5local) {
04967 TString md5f = packagedir + "/" + package + "/PROOF-INF/md5.txt";
04968 TMD5::WriteChecksum(md5f, md5local);
04969
04970 gSystem->ChangeDirectory(pdir);
04971
04972 SafeDelete(md5local);
04973 } else {
04974 Error("HandleCache", "kBuildPackage: failure calculating MD5sum for '%s'", par.Data());
04975 }
04976 }
04977 delete [] gunzip;
04978 } else
04979 Error("HandleCache", "kBuildPackage: %s not found", kGUNZIP);
04980 } else {
04981 SendAsynMessage(TString::Format("%s: %s: ROOT version inconsistency (current: %s, build: %s):"
04982 " global package: cannot re-build!!! ",
04983 noth.Data(), package.Data(), gROOT->GetVersion(), v.Data()));
04984 }
04985 }
04986
04987 if (!status) {
04988
04989
04990
04991
04992
04993 TString ipath(gSystem->GetIncludePath());
04994 ipath.ReplaceAll("\"","");
04995 TString cmd;
04996 cmd.Form("export ROOTINCLUDEPATH=\"%s\" ; PROOF-INF/BUILD.sh", ipath.Data());
04997 {
04998 TProofServLogHandlerGuard hg(cmd, fSocket);
04999 }
05000 if (!(status = TProofServLogHandler::GetCmdRtn())) {
05001
05002 if (savever) {
05003 f = fopen("PROOF-INF/proofvers.txt", "w");
05004 if (f) {
05005 fputs(gROOT->GetVersion(), f);
05006 fputs(TString::Format("\n%d",gROOT->GetSvnRevision()), f);
05007 fclose(f);
05008 }
05009 }
05010 }
05011 }
05012 } else {
05013
05014 PDB(kPackage, 1)
05015 Info("HandleCache", "no PROOF-INF/BUILD.sh found for package %s", package.Data());
05016 }
05017 gSystem->ChangeDirectory(ocwd);
05018 }
05019
05020 fPackageLock->Unlock();
05021
05022 if (status) {
05023
05024 SendAsynMessage(TString::Format("%s: failure building %s ... (status: %d)", noth.Data(), package.Data(), status));
05025 } else {
05026
05027 if (IsMaster())
05028 status = fProof->BuildPackage(package, TProof::kCollectBuildResults);
05029 PDB(kPackage, 1)
05030 Info("HandleCache", "package %s successfully built", package.Data());
05031 }
05032 if (slb) slb->Form("%d %s %d", type, package.Data(), status);
05033 break;
05034 case TProof::kLoadPackage:
05035 (*mess) >> package;
05036
05037
05038 if (fEnabledPackages->FindObject(package)) {
05039 Info("HandleCache",
05040 "package %s already loaded", package.Data());
05041 break;
05042 }
05043
05044
05045 pdir = fPackageDir + "/" + package;
05046
05047 if (gSystem->AccessPathName(pdir, kReadPermission)) {
05048
05049 if (fGlobalPackageDirList && fGlobalPackageDirList->GetSize() > 0) {
05050
05051 TIter nxd(fGlobalPackageDirList);
05052 TNamed *nm = 0;
05053 while ((nm = (TNamed *)nxd())) {
05054 pdir.Form("%s/%s", nm->GetTitle(), package.Data());
05055 if (!gSystem->AccessPathName(pdir, kReadPermission)) {
05056
05057 break;
05058 }
05059 pdir = "";
05060 }
05061 if (pdir.Length() <= 0) {
05062
05063 SendAsynMessage(TString::Format("%s: kLoadPackage: failure locating %s ...",
05064 noth.Data(), package.Data()));
05065 break;
05066 }
05067 }
05068 }
05069
05070 ocwd = gSystem->WorkingDirectory();
05071 gSystem->ChangeDirectory(pdir);
05072
05073
05074 fPackageLock->Lock();
05075
05076
05077 if (!gSystem->AccessPathName("PROOF-INF/SETUP.C")) {
05078
05079 TString setup, setupfn;
05080 setup.Form("SETUP_%x", package.Hash());
05081
05082 setupfn.Form("%s/%s.C", gSystem->TempDirectory(), setup.Data());
05083 TMacro setupmc("PROOF-INF/SETUP.C");
05084 TObjString *setupline = setupmc.GetLineWith("SETUP(");
05085 if (setupline) {
05086 TString setupstring(setupline->GetString());
05087 setupstring.ReplaceAll("SETUP(", TString::Format("%s(", setup.Data()));
05088 setupline->SetString(setupstring);
05089 } else {
05090
05091 SendAsynMessage(TString::Format("%s: warning: macro '%s/PROOF-INF/SETUP.C' does not contain a SETUP()"
05092 " function", noth.Data(), package.Data()));
05093 }
05094 setupmc.SaveSource(setupfn.Data());
05095
05096 if (gROOT->LoadMacro(setupfn.Data()) != 0) {
05097
05098 SendAsynMessage(TString::Format("%s: error: macro '%s/PROOF-INF/SETUP.C' could not be loaded:"
05099 " cannot continue",
05100 noth.Data(), package.Data()));
05101 status = -1;
05102 } else {
05103
05104 TFunction *fun = (TFunction *) gROOT->GetListOfGlobalFunctions()->FindObject(setup);
05105 if (!fun) {
05106
05107 SendAsynMessage(TString::Format("%s: error: function SETUP() not found in macro '%s/PROOF-INF/SETUP.C':"
05108 " cannot continue",
05109 noth.Data(), package.Data()));
05110 status = -1;
05111 } else {
05112 TMethodCall callEnv;
05113
05114 if (fun->GetNargs() == 0) {
05115
05116 callEnv.InitWithPrototype(setup.Data(),"");
05117 if ((mess->BufferSize() > mess->Length())) {
05118 (*mess) >> optls;
05119 SendAsynMessage(TString::Format("%s: warning: loaded SETUP() does not take any argument:"
05120 " the specified argument will be ignored", noth.Data()));
05121 }
05122 } else if (fun->GetNargs() == 1) {
05123 TMethodArg *arg = (TMethodArg *) fun->GetListOfMethodArgs()->First();
05124 if (arg) {
05125
05126 if ((mess->BufferSize() > mess->Length())) (*mess) >> optls;
05127
05128 TString argsig(arg->GetTitle());
05129 if (argsig.BeginsWith("TList")) {
05130 callEnv.InitWithPrototype(setup.Data(),"TList *");
05131 callEnv.ResetParam();
05132 callEnv.SetParam((Long_t) optls);
05133 } else if (argsig.BeginsWith("const char")) {
05134 callEnv.InitWithPrototype(setup.Data(),"const char *");
05135 callEnv.ResetParam();
05136 TObjString *os = optls ? dynamic_cast<TObjString *>(optls->First()) : 0;
05137 if (os) {
05138 callEnv.SetParam((Long_t) os->GetName());
05139 } else {
05140 if (optls && optls->First()) {
05141 SendAsynMessage(TString::Format("%s: warning: found object argument of type %s:"
05142 " SETUP expects 'const char *': ignoring",
05143 noth.Data(), optls->First()->ClassName()));
05144 }
05145 callEnv.SetParam((Long_t) 0);
05146 }
05147 } else {
05148
05149 SendAsynMessage(TString::Format("%s: error: unsupported SETUP signature: SETUP(%s)"
05150 " cannot continue", noth.Data(), arg->GetTitle()));
05151 status = -1;
05152 }
05153 } else {
05154
05155 SendAsynMessage(TString::Format("%s: error: cannot get information about the SETUP() argument:"
05156 " cannot continue", noth.Data()));
05157 status = -1;
05158 }
05159 } else if (fun->GetNargs() > 1) {
05160
05161 SendAsynMessage(TString::Format("%s: error: function SETUP() can have at most a 'TList *' argument:"
05162 " cannot continue", noth.Data()));
05163 status = -1;
05164 }
05165
05166 Long_t setuprc = (status == 0) ? 0 : -1;
05167 if (status == 0) {
05168 callEnv.Execute(setuprc);
05169 if (setuprc < 0) status = -1;
05170 }
05171 }
05172 }
05173 if (!gSystem->AccessPathName(setupfn.Data())) gSystem->Unlink(setupfn.Data());
05174 }
05175
05176
05177 fPackageLock->Unlock();
05178
05179 gSystem->ChangeDirectory(ocwd);
05180
05181 if (status < 0) {
05182
05183
05184 SendAsynMessage(TString::Format("%s: failure loading %s ...", noth.Data(), package.Data()));
05185
05186 } else {
05187
05188
05189 gSystem->Symlink(pdir, package);
05190
05191
05192
05193 gSystem->AddIncludePath(TString("-I") + package);
05194
05195
05196 gROOT->ProcessLine(TString(".include ") + package);
05197
05198
05199 fEnabledPackages->Add(new TObjString(package));
05200 if (IsMaster()) {
05201 if (optls && optls->GetSize() > 0) {
05202
05203 status = fProof->LoadPackage(package, kFALSE, optls);
05204 } else {
05205
05206 status = fProof->LoadPackage(package);
05207 }
05208 }
05209
05210 PDB(kPackage, 1)
05211 Info("HandleCache", "package %s successfully loaded", package.Data());
05212 }
05213 if (slb) slb->Form("%d %s %d", type, package.Data(), status);
05214 break;
05215 case TProof::kShowEnabledPackages:
05216 (*mess) >> all;
05217 if (IsMaster()) {
05218 if (all)
05219 printf("*** Enabled packages on master %s on %s\n",
05220 fOrdinal.Data(), gSystem->HostName());
05221 else
05222 printf("*** Enabled packages ***\n");
05223 } else {
05224 printf("*** Enabled packages on slave %s on %s\n",
05225 fOrdinal.Data(), gSystem->HostName());
05226 }
05227 {
05228 TIter next(fEnabledPackages);
05229 while (TObjString *str = (TObjString*) next())
05230 printf("%s\n", str->GetName());
05231 }
05232 if (IsMaster() && all)
05233 fProof->ShowEnabledPackages(all);
05234 LogToMaster();
05235 if (slb) slb->Form("%d %d", type, all);
05236 break;
05237 case TProof::kShowSubCache:
05238 (*mess) >> all;
05239 if (IsMaster() && all)
05240 fProof->ShowCache(all);
05241 LogToMaster();
05242 if (slb) slb->Form("%d %d", type, all);
05243 break;
05244 case TProof::kClearSubCache:
05245 file = "";
05246 if ((mess->BufferSize() > mess->Length())) (*mess) >> file;
05247 if (IsMaster())
05248 fProof->ClearCache(file);
05249 if (slb) slb->Form("%d %s", type, file.Data());
05250 break;
05251 case TProof::kShowSubPackages:
05252 (*mess) >> all;
05253 if (IsMaster() && all)
05254 fProof->ShowPackages(all);
05255 LogToMaster();
05256 if (slb) slb->Form("%d %d", type, all);
05257 break;
05258 case TProof::kDisableSubPackages:
05259 if (IsMaster())
05260 fProof->DisablePackages();
05261 if (slb) slb->Form("%d", type);
05262 break;
05263 case TProof::kDisableSubPackage:
05264 (*mess) >> package;
05265 if (IsMaster())
05266 fProof->DisablePackage(package);
05267 if (slb) slb->Form("%d %s", type, package.Data());
05268 break;
05269 case TProof::kBuildSubPackage:
05270 (*mess) >> package;
05271 if (IsMaster())
05272 fProof->BuildPackage(package);
05273 if (slb) slb->Form("%d %s", type, package.Data());
05274 break;
05275 case TProof::kUnloadPackage:
05276 (*mess) >> package;
05277 status = UnloadPackage(package);
05278 if (IsMaster() && status == 0)
05279 status = fProof->UnloadPackage(package);
05280 if (slb) slb->Form("%d %s %d", type, package.Data(), status);
05281 break;
05282 case TProof::kDisablePackage:
05283 (*mess) >> package;
05284 fPackageLock->Lock();
05285
05286 gSystem->Exec(TString::Format("%s %s/%s", kRM, fPackageDir.Data(),
05287 package.Data()));
05288 gSystem->Exec(TString::Format("%s %s/%s.par", kRM, fPackageDir.Data(),
05289 package.Data()));
05290 fPackageLock->Unlock();
05291 if (IsMaster())
05292 fProof->DisablePackage(package);
05293 if (slb) slb->Form("%d %s", type, package.Data());
05294 break;
05295 case TProof::kUnloadPackages:
05296 status = UnloadPackages();
05297 if (IsMaster() && status == 0)
05298 status = fProof->UnloadPackages();
05299 if (slb) slb->Form("%d %s %d", type, package.Data(), status);
05300 break;
05301 case TProof::kDisablePackages:
05302 fPackageLock->Lock();
05303 gSystem->Exec(TString::Format("%s %s/*", kRM, fPackageDir.Data()));
05304 fPackageLock->Unlock();
05305 if (IsMaster())
05306 fProof->DisablePackages();
05307 if (slb) slb->Form("%d %s", type, package.Data());
05308 break;
05309 case TProof::kListEnabledPackages:
05310 msg.Reset(kPROOF_PACKAGE_LIST);
05311 msg << type << fEnabledPackages;
05312 fSocket->Send(msg);
05313 if (slb) slb->Form("%d", type);
05314 break;
05315 case TProof::kListPackages:
05316 {
05317 TList *pack = new TList;
05318 void *dir = gSystem->OpenDirectory(fPackageDir);
05319 if (dir) {
05320 TString pac(gSystem->GetDirEntry(dir));
05321 while (pac.Length() > 0) {
05322 if (pac.EndsWith(".par")) {
05323 pac.ReplaceAll(".par","");
05324 pack->Add(new TObjString(pac.Data()));
05325 }
05326 pac = gSystem->GetDirEntry(dir);
05327 }
05328 }
05329 gSystem->FreeDirectory(dir);
05330 msg.Reset(kPROOF_PACKAGE_LIST);
05331 msg << type << pack;
05332 fSocket->Send(msg);
05333 }
05334 if (slb) slb->Form("%d", type);
05335 break;
05336 case TProof::kLoadMacro:
05337
05338 (*mess) >> package;
05339
05340
05341
05342 if (IsMaster())
05343 fProof->Load(package, kFALSE, kTRUE);
05344
05345
05346 fCacheLock->Lock();
05347
05348
05349
05350 CopyFromCache(package, kTRUE);
05351
05352
05353 Info("HandleCache", "loading macro %s ...", package.Data());
05354 gROOT->ProcessLine(TString::Format(".L %s", package.Data()));
05355
05356
05357 CopyToCache(package, 1);
05358
05359
05360 fCacheLock->Unlock();
05361
05362
05363
05364 if (IsMaster())
05365 fProof->Load(package, kFALSE, kFALSE);
05366
05367
05368 LogToMaster();
05369
05370 if (slb) slb->Form("%d %s", type, package.Data());
05371 break;
05372 default:
05373 Error("HandleCache", "unknown type %d", type);
05374 break;
05375 }
05376
05377
05378 return status;
05379 }
05380
05381
05382 void TProofServ::HandleWorkerLists(TMessage *mess)
05383 {
05384
05385
05386 PDB(kGlobal, 1)
05387 Info("HandleWorkerLists", "Enter");
05388
05389 Int_t type = 0;
05390 TString ord;
05391
05392 (*mess) >> type;
05393
05394 switch (type) {
05395 case TProof::kActivateWorker:
05396 (*mess) >> ord;
05397 if (fProof) {
05398 Int_t nact = fProof->GetListOfActiveSlaves()->GetSize();
05399 Int_t nactmax = fProof->GetListOfSlaves()->GetSize() -
05400 fProof->GetListOfBadSlaves()->GetSize();
05401 if (nact < nactmax) {
05402 fProof->ActivateWorker(ord);
05403 Int_t nactnew = fProof->GetListOfActiveSlaves()->GetSize();
05404 if (ord == "*") {
05405 if (nactnew == nactmax) {
05406 Info("HandleWorkerList","all workers (re-)activated");
05407 } else {
05408 Info("HandleWorkerList","%d workers could not be (re-)activated", nactmax - nactnew);
05409 }
05410 } else {
05411 if (nactnew == (nact + 1)) {
05412 Info("HandleWorkerList","worker %s (re-)activated", ord.Data());
05413 } else {
05414 Info("HandleWorkerList","worker %s could not be (re-)activated;"
05415 " # of actives: %d --> %d", ord.Data(), nact, nactnew);
05416 }
05417 }
05418 } else {
05419 Info("HandleWorkerList","all workers are already active");
05420 }
05421 } else {
05422 Warning("HandleWorkerList","undefined PROOF session: protocol error?");
05423 }
05424 break;
05425 case TProof::kDeactivateWorker:
05426 (*mess) >> ord;
05427 if (fProof) {
05428 Int_t nact = fProof->GetListOfActiveSlaves()->GetSize();
05429 if (nact > 0) {
05430 fProof->DeactivateWorker(ord);
05431 Int_t nactnew = fProof->GetListOfActiveSlaves()->GetSize();
05432 if (ord == "*") {
05433 if (nactnew == 0) {
05434 Info("HandleWorkerList","all workers deactivated");
05435 } else {
05436 Info("HandleWorkerList","%d workers could not be deactivated", nactnew);
05437 }
05438 } else {
05439 if (nactnew == (nact - 1)) {
05440 Info("HandleWorkerList","worker %s deactivated", ord.Data());
05441 } else {
05442 Info("HandleWorkerList","worker %s could not be deactivated:"
05443 " # of actives: %d --> %d", ord.Data(), nact, nactnew);
05444 }
05445 }
05446 } else {
05447 Info("HandleWorkerList","all workers are already inactive");
05448 }
05449 } else {
05450 Warning("HandleWorkerList","undefined PROOF session: protocol error?");
05451 }
05452 break;
05453 default:
05454 Warning("HandleWorkerList","unknown action type (%d)", type);
05455 }
05456 }
05457
05458
05459 TProofServ::EQueryAction TProofServ::GetWorkers(TList *workers,
05460 Int_t & ,
05461 Bool_t )
05462 {
05463
05464
05465
05466
05467 TProofResourcesStatic *resources =
05468 new TProofResourcesStatic(fConfDir, fConfFile);
05469 fConfFile = resources->GetFileName();
05470 PDB(kGlobal,1)
05471 Info("GetWorkers", "using PROOF config file: %s", fConfFile.Data());
05472
05473
05474 TProofNodeInfo *master = resources->GetMaster();
05475 if (!master) {
05476 PDB(kAll,1)
05477 Info("GetWorkers",
05478 "no appropriate master line found in %s", fConfFile.Data());
05479 return kQueryStop;
05480 } else {
05481
05482 if (fImage.IsNull() && strlen(master->GetImage()) > 0)
05483 fImage = master->GetImage();
05484 }
05485
05486
05487 if (workers) {
05488 if (resources->GetSubmasters() && resources->GetSubmasters()->GetSize() > 0) {
05489 PDB(kAll,1)
05490 resources->GetSubmasters()->Print();
05491 TProofNodeInfo *ni = 0;
05492 TIter nw(resources->GetSubmasters());
05493 while ((ni = (TProofNodeInfo *) nw()))
05494 workers->Add(new TProofNodeInfo(*ni));
05495 } else if (resources->GetWorkers() && resources->GetWorkers()->GetSize() > 0) {
05496 PDB(kAll,1)
05497 resources->GetWorkers()->Print();
05498 TProofNodeInfo *ni = 0;
05499 TIter nw(resources->GetWorkers());
05500 while ((ni = (TProofNodeInfo *) nw()))
05501 workers->Add(new TProofNodeInfo(*ni));
05502 }
05503 }
05504
05505
05506 return kQueryOK;
05507 }
05508
05509
05510 FILE *TProofServ::SetErrorHandlerFile(FILE *ferr)
05511 {
05512
05513
05514
05515
05516 FILE *oldferr = fgErrorHandlerFile;
05517 fgErrorHandlerFile = (ferr) ? ferr : stderr;
05518 return oldferr;
05519 }
05520
05521
05522 void TProofServ::ErrorHandler(Int_t level, Bool_t abort, const char *location,
05523 const char *msg)
05524 {
05525
05526
05527
05528 if (gErrorIgnoreLevel == kUnset) {
05529 gErrorIgnoreLevel = 0;
05530 if (gEnv) {
05531 TString lvl = gEnv->GetValue("Root.ErrorIgnoreLevel", "Print");
05532 if (!lvl.CompareTo("Print", TString::kIgnoreCase))
05533 gErrorIgnoreLevel = kPrint;
05534 else if (!lvl.CompareTo("Info", TString::kIgnoreCase))
05535 gErrorIgnoreLevel = kInfo;
05536 else if (!lvl.CompareTo("Warning", TString::kIgnoreCase))
05537 gErrorIgnoreLevel = kWarning;
05538 else if (!lvl.CompareTo("Error", TString::kIgnoreCase))
05539 gErrorIgnoreLevel = kError;
05540 else if (!lvl.CompareTo("Break", TString::kIgnoreCase))
05541 gErrorIgnoreLevel = kBreak;
05542 else if (!lvl.CompareTo("SysError", TString::kIgnoreCase))
05543 gErrorIgnoreLevel = kSysError;
05544 else if (!lvl.CompareTo("Fatal", TString::kIgnoreCase))
05545 gErrorIgnoreLevel = kFatal;
05546 }
05547 }
05548
05549 if (level < gErrorIgnoreLevel)
05550 return;
05551
05552
05553 if (level >= kError && gProofServ)
05554 gProofServ->LogToMaster();
05555
05556 Bool_t tosyslog = (fgLogToSysLog > 2) ? kTRUE : kFALSE;
05557
05558 const char *type = 0;
05559 ELogLevel loglevel = kLogInfo;
05560
05561 Int_t ipos = (location) ? strlen(location) : 0;
05562
05563 if (level >= kPrint) {
05564 loglevel = kLogInfo;
05565 type = "Print";
05566 }
05567 if (level >= kInfo) {
05568 loglevel = kLogInfo;
05569 char *ps = location ? (char *) strrchr(location, '|') : (char *)0;
05570 if (ps) {
05571 ipos = (int)(ps - (char *)location);
05572 type = "SvcMsg";
05573 } else {
05574 type = "Info";
05575 }
05576 }
05577 if (level >= kWarning) {
05578 loglevel = kLogWarning;
05579 type = "Warning";
05580 }
05581 if (level >= kError) {
05582 loglevel = kLogErr;
05583 type = "Error";
05584 }
05585 if (level >= kBreak) {
05586 loglevel = kLogErr;
05587 type = "*** Break ***";
05588 }
05589 if (level >= kSysError) {
05590 loglevel = kLogErr;
05591 type = "SysError";
05592 }
05593 if (level >= kFatal) {
05594 loglevel = kLogErr;
05595 type = "Fatal";
05596 }
05597
05598
05599 TString buf;
05600
05601
05602 TTimeStamp ts;
05603 TString st(ts.AsString("lc"),19);
05604
05605 if (!location || ipos == 0 ||
05606 (level >= kPrint && level < kInfo) ||
05607 (level >= kBreak && level < kSysError)) {
05608 fprintf(fgErrorHandlerFile, "%s %5d %s | %s: %s\n", st(11,8).Data(),
05609 gSystem->GetPid(),
05610 (gProofServ ? gProofServ->GetPrefix() : "proof"),
05611 type, msg);
05612 if (tosyslog)
05613 buf.Form("%s: %s:%s", fgSysLogEntity.Data(), type, msg);
05614 } else {
05615 fprintf(fgErrorHandlerFile, "%s %5d %s | %s in <%.*s>: %s\n", st(11,8).Data(),
05616 gSystem->GetPid(),
05617 (gProofServ ? gProofServ->GetPrefix() : "proof"),
05618 type, ipos, location, msg);
05619 if (tosyslog)
05620 buf.Form("%s: %s:<%.*s>: %s", fgSysLogEntity.Data(), type, ipos, location, msg);
05621 }
05622 fflush(fgErrorHandlerFile);
05623
05624 if (tosyslog)
05625 gSystem->Syslog(loglevel, buf);
05626
05627 if (abort) {
05628
05629 static Bool_t recursive = kFALSE;
05630
05631 if (gProofServ != 0 && !recursive) {
05632 recursive = kTRUE;
05633 gProofServ->GetSocket()->Send(kPROOF_FATAL);
05634 recursive = kFALSE;
05635 }
05636
05637 fprintf(fgErrorHandlerFile, "aborting\n");
05638 fflush(fgErrorHandlerFile);
05639 gSystem->StackTrace();
05640 gSystem->Abort();
05641 }
05642 }
05643
05644
05645 Int_t TProofServ::CopyFromCache(const char *macro, Bool_t cpbin)
05646 {
05647
05648
05649
05650
05651 if (!macro || strlen(macro) <= 0)
05652
05653 return -1;
05654
05655
05656 TString name = macro;
05657 TString acmode, args, io;
05658 name = gSystem->SplitAclicMode(name, acmode, args, io);
05659
05660 PDB(kGlobal,1)
05661 Info("CopyFromCache","enter: names: %s, %s", macro, name.Data());
05662
05663
05664 Bool_t locked = (fCacheLock->IsLocked()) ? kTRUE : kFALSE;
05665 if (!locked) fCacheLock->Lock();
05666
05667
05668 Bool_t assertfile = kFALSE;
05669 TString srcname(name);
05670 Int_t dot = srcname.Last('.');
05671 if (dot != kNPOS) {
05672 srcname.Remove(dot);
05673 srcname += ".*";
05674 } else {
05675 assertfile = kTRUE;
05676 }
05677 srcname.Insert(0, TString::Format("%s/",fCacheDir.Data()));
05678 dot = (dot != kNPOS) ? srcname.Last('.') : dot;
05679
05680 if (assertfile) {
05681 if (gSystem->AccessPathName(srcname)) {
05682 PDB(kCache,1)
05683 Info("CopyFromCache", "file %s not in cache", srcname.Data());
05684 if (!locked) fCacheLock->Unlock();
05685 return 0;
05686 }
05687 }
05688 PDB(kCache,1)
05689 Info("CopyFromCache", "retrieving %s from cache", srcname.Data());
05690 gSystem->Exec(TString::Format("%s %s .", kCP, srcname.Data()));
05691
05692
05693 if (!cpbin) {
05694
05695 if (!locked) fCacheLock->Unlock();
05696 return 0;
05697 }
05698
05699
05700 TString binname = name;
05701 dot = binname.Last('.');
05702 if (dot != kNPOS) {
05703 binname.Replace(dot,1,"_");
05704 binname += ".";
05705 } else {
05706 PDB(kCache,1)
05707 Info("CopyFromCache",
05708 "non-standard name structure: %s ('.' missing)", name.Data());
05709
05710 if (!locked) fCacheLock->Unlock();
05711 return 0;
05712 }
05713
05714
05715 TString vername;
05716 vername.Form(".%s", name.Data());
05717 Int_t dotv = vername.Last('.');
05718 if (dotv != kNPOS)
05719 vername.Remove(dotv);
05720 vername += ".binversion";
05721
05722
05723 TString v;
05724 Int_t rev = -1;
05725 Bool_t okfil = kFALSE;
05726 FILE *f = fopen(TString::Format("%s/%s", fCacheDir.Data(), vername.Data()), "r");
05727 if (f) {
05728 TString r;
05729 v.Gets(f);
05730 r.Gets(f);
05731 rev = (!r.IsNull() && r.IsDigit()) ? r.Atoi() : -1;
05732 fclose(f);
05733 okfil = kTRUE;
05734 }
05735
05736 Bool_t okver = (v != gROOT->GetVersion()) ? kFALSE : kTRUE;
05737 Bool_t okrev = (gROOT->GetSvnRevision() > 0 && rev != gROOT->GetSvnRevision()) ? kFALSE : kTRUE;
05738 if (!okfil || !okver || !okrev) {
05739 PDB(kCache,1)
05740 Info("CopyFromCache",
05741 "removing binaries: 'file': %s, 'ROOT version': %s, 'ROOT revision': %s",
05742 (okfil ? "OK" : "not OK"), (okver ? "OK" : "not OK"), (okrev ? "OK" : "not OK") );
05743
05744 binname += "*";
05745 gSystem->Exec(TString::Format("%s %s/%s", kRM, fCacheDir.Data(), binname.Data()));
05746
05747 gSystem->Exec(TString::Format("%s %s/%s", kRM, fCacheDir.Data(), vername.Data()));
05748
05749 if (!locked) fCacheLock->Unlock();
05750 return 0;
05751 }
05752
05753
05754 void *dirp = gSystem->OpenDirectory(fCacheDir);
05755 if (dirp) {
05756 const char *e = 0;
05757 while ((e = gSystem->GetDirEntry(dirp))) {
05758 if (!strncmp(e, binname.Data(), binname.Length())) {
05759 TString fncache;
05760 fncache.Form("%s/%s", fCacheDir.Data(), e);
05761 Bool_t docp = kTRUE;
05762 FileStat_t stlocal, stcache;
05763 if (!gSystem->GetPathInfo(fncache, stcache)) {
05764 Int_t rc = gSystem->GetPathInfo(e, stlocal);
05765 if (rc == 0 && (stlocal.fMtime >= stcache.fMtime))
05766 docp = kFALSE;
05767
05768 if (docp) {
05769 TMD5 *md5local = TMD5::FileChecksum(e);
05770 TMD5 *md5cache = TMD5::FileChecksum(fncache);
05771 if (md5local && md5cache && md5local == md5cache) docp = kFALSE;
05772 SafeDelete(md5local);
05773 SafeDelete(md5cache);
05774 }
05775
05776 if (docp) {
05777 gSystem->Exec(TString::Format("%s %s", kRM, e));
05778 PDB(kCache,1)
05779 Info("CopyFromCache",
05780 "retrieving %s from cache", fncache.Data());
05781 gSystem->Exec(TString::Format("%s %s %s", kCP, fncache.Data(), e));
05782 }
05783 }
05784 }
05785 }
05786 gSystem->FreeDirectory(dirp);
05787 }
05788
05789
05790 if (!locked) fCacheLock->Unlock();
05791
05792
05793 return 0;
05794 }
05795
05796
05797 Int_t TProofServ::CopyToCache(const char *macro, Int_t opt)
05798 {
05799
05800
05801
05802
05803
05804
05805
05806
05807
05808
05809 if (!macro || strlen(macro) <= 0 || opt < 0 || opt > 1)
05810
05811 return -1;
05812
05813
05814 TString name = macro;
05815 TString acmode, args, io;
05816 name = gSystem->SplitAclicMode(name, acmode, args, io);
05817
05818 PDB(kGlobal,1)
05819 Info("CopyToCache","enter: opt: %d, names: %s, %s", opt, macro, name.Data());
05820
05821
05822 TString binname = name;
05823 Int_t dot = binname.Last('.');
05824 if (dot != kNPOS)
05825 binname.Replace(dot,1,"_");
05826
05827
05828 TString vername;
05829 vername.Form(".%s", name.Data());
05830 dot = vername.Last('.');
05831 if (dot != kNPOS)
05832 vername.Remove(dot);
05833 vername += ".binversion";
05834 Bool_t savever = kFALSE;
05835
05836
05837 Bool_t locked = (fCacheLock->IsLocked()) ? kTRUE : kFALSE;
05838 if (!locked) fCacheLock->Lock();
05839
05840
05841 if (opt == 0) {
05842
05843 PDB(kCache,1)
05844 Info("CopyToCache",
05845 "caching %s/%s ...", fCacheDir.Data(), name.Data());
05846 gSystem->Exec(TString::Format("%s %s %s", kCP, name.Data(), fCacheDir.Data()));
05847
05848 if (dot != kNPOS) {
05849 binname += ".*";
05850 PDB(kCache,1)
05851 Info("CopyToCache", "opt = 0: removing binaries '%s'", binname.Data());
05852 gSystem->Exec(TString::Format("%s %s/%s", kRM, fCacheDir.Data(), binname.Data()));
05853 gSystem->Exec(TString::Format("%s %s/%s", kRM, fCacheDir.Data(), vername.Data()));
05854 }
05855 } else if (opt == 1) {
05856
05857 if (dot != kNPOS) {
05858 binname += ".";
05859 void *dirp = gSystem->OpenDirectory(".");
05860 if (dirp) {
05861 const char *e = 0;
05862 while ((e = gSystem->GetDirEntry(dirp))) {
05863 if (!strncmp(e, binname.Data(), binname.Length())) {
05864 Bool_t docp = kTRUE;
05865 FileStat_t stlocal, stcache;
05866 if (!gSystem->GetPathInfo(e, stlocal)) {
05867 TString fncache;
05868 fncache.Form("%s/%s", fCacheDir.Data(), e);
05869 Int_t rc = gSystem->GetPathInfo(fncache, stcache);
05870 if (rc == 0 && (stlocal.fMtime <= stcache.fMtime)) {
05871 docp = kFALSE;
05872 if (rc == 0) rc = -1;
05873 }
05874
05875 if (docp) {
05876 TMD5 *md5local = TMD5::FileChecksum(e);
05877 TMD5 *md5cache = TMD5::FileChecksum(fncache);
05878 if (md5local && md5cache && md5local == md5cache) docp = kFALSE;
05879 SafeDelete(md5local);
05880 SafeDelete(md5cache);
05881 if (!docp) rc = -2;
05882 }
05883
05884 if (docp) {
05885 gSystem->Exec(TString::Format("%s %s", kRM, fncache.Data()));
05886 PDB(kCache,1)
05887 Info("CopyToCache","caching %s ... (reason: %d)", e, rc);
05888 gSystem->Exec(TString::Format("%s %s %s", kCP, e, fncache.Data()));
05889 savever = kTRUE;
05890 }
05891 }
05892 }
05893 }
05894 gSystem->FreeDirectory(dirp);
05895 }
05896
05897 if (savever) {
05898 PDB(kCache,1)
05899 Info("CopyToCache","updating version file %s ...", vername.Data());
05900 FILE *f = fopen(TString::Format("%s/%s", fCacheDir.Data(), vername.Data()), "w");
05901 if (f) {
05902 fputs(gROOT->GetVersion(), f);
05903 fputs(TString::Format("\n%d",gROOT->GetSvnRevision()), f);
05904 fclose(f);
05905 }
05906 }
05907 }
05908 }
05909
05910
05911 if (!locked) fCacheLock->Unlock();
05912
05913
05914 return 0;
05915 }
05916
05917
05918 void TProofServ::MakePlayer()
05919 {
05920
05921
05922 TVirtualProofPlayer *p = 0;
05923
05924
05925 DeletePlayer();
05926
05927 if (IsParallel()) {
05928
05929 p = fProof->MakePlayer();
05930 } else {
05931
05932 p = TVirtualProofPlayer::Create("slave", 0, fSocket);
05933 if (IsMaster())
05934 fProof->SetPlayer(p);
05935 }
05936
05937
05938 fPlayer = p;
05939 }
05940
05941
05942 void TProofServ::DeletePlayer()
05943 {
05944
05945
05946 if (IsMaster()) {
05947 if (fProof) fProof->SetPlayer(0);
05948 } else {
05949 SafeDelete(fPlayer);
05950 }
05951 fPlayer = 0;
05952 }
05953
05954
05955 Int_t TProofServ::GetPriority()
05956 {
05957
05958
05959
05960
05961
05962
05963
05964
05965
05966
05967
05968
05969
05970
05971
05972
05973 TString sqlserv = gEnv->GetValue("ProofServ.QueryLogDB","");
05974 TString sqluser = gEnv->GetValue("ProofServ.QueryLogUser","");
05975 TString sqlpass = gEnv->GetValue("ProofServ.QueryLogPasswd","");
05976
05977 Int_t priority = 100;
05978
05979 if (sqlserv == "")
05980 return priority;
05981
05982 TString sql;
05983 sql.Form("SELECT priority WHERE group='%s' FROM proofpriority", fGroup.Data());
05984
05985
05986 TSQLServer *db = TSQLServer::Connect(sqlserv, sqluser, sqlpass);
05987
05988 if (!db || db->IsZombie()) {
05989 Error("GetPriority", "failed to connect to SQL server %s as %s %s",
05990 sqlserv.Data(), sqluser.Data(), sqlpass.Data());
05991 printf("%s\n", sql.Data());
05992 } else {
05993 TSQLResult *res = db->Query(sql);
05994
05995 if (!res) {
05996 Error("GetPriority", "query into proofpriority failed");
05997 printf("%s\n", sql.Data());
05998 } else {
05999 TSQLRow *row = res->Next();
06000 priority = atoi(row->GetField(0));
06001 delete row;
06002 }
06003 delete res;
06004 }
06005 delete db;
06006
06007 return priority;
06008 }
06009
06010
06011 Int_t TProofServ::SendAsynMessage(const char *msg, Bool_t lf)
06012 {
06013
06014
06015
06016
06017
06018 static TMessage m(kPROOF_MESSAGE);
06019
06020
06021
06022 PDB(kAsyn,1)
06023 Info("SendAsynMessage","%s", (msg ? msg : "(null)"));
06024
06025 if (fSocket && msg) {
06026 m.Reset(kPROOF_MESSAGE);
06027 m << TString(msg) << lf;
06028 return fSocket->Send(m);
06029 }
06030
06031
06032 return -1;
06033 }
06034
06035
06036 void TProofServ::FlushLogFile()
06037 {
06038
06039
06040
06041
06042 off_t lend = lseek(fileno(stdout), (off_t)0, SEEK_END);
06043 if (lend >= 0) lseek(fLogFileDes, lend, SEEK_SET);
06044 }
06045
06046
06047 void TProofServ::HandleException(Int_t sig)
06048 {
06049
06050
06051 Error("HandleException", "caugth exception triggered by signal '%d' %s",
06052 sig, fgLastMsg.Data());
06053
06054 TString emsg;
06055 emsg.Form("%s: caught exception triggered by signal '%d' %s",
06056 GetOrdinal(), sig, fgLastMsg.Data());
06057
06058 SendAsynMessage(emsg.Data());
06059
06060 gSystem->Exit(sig);
06061 }
06062
06063
06064 Int_t TProofServ::HandleDataSets(TMessage *mess, TString *slb)
06065 {
06066
06067
06068 if (gDebug > 0)
06069 Info("HandleDataSets", "enter");
06070
06071
06072 if (!fDataSetManager) {
06073 Warning("HandleDataSets", "no data manager is available to fullfil the request");
06074 return -1;
06075 }
06076
06077
06078 TString dsUser, dsGroup, dsName, dsTree, uri, opt;
06079 Int_t rc = 0;
06080
06081
06082 Int_t type = 0;
06083 (*mess) >> type;
06084
06085 switch (type) {
06086 case TProof::kCheckDataSetName:
06087
06088
06089 {
06090 (*mess) >> uri;
06091 if (slb) slb->Form("%d %s", type, uri.Data());
06092 if (fDataSetManager->ExistsDataSet(uri))
06093
06094 return -1;
06095 }
06096 break;
06097 case TProof::kRegisterDataSet:
06098
06099 {
06100 if (fDataSetManager->TestBit(TDataSetManager::kAllowRegister)) {
06101 (*mess) >> uri;
06102 (*mess) >> opt;
06103 if (slb) slb->Form("%d %s %s", type, uri.Data(), opt.Data());
06104
06105 TFileCollection *dataSet =
06106 dynamic_cast<TFileCollection*> ((mess->ReadObject(TFileCollection::Class())));
06107 if (!dataSet || dataSet->GetList()->GetSize() == 0) {
06108 Error("HandleDataSets", "can not save an empty list.");
06109 return -1;
06110 }
06111
06112 rc = fDataSetManager->RegisterDataSet(uri, dataSet, opt);
06113 delete dataSet;
06114 return rc;
06115 } else {
06116 Info("HandleDataSets", "dataset registration not allowed");
06117 if (slb) slb->Form("%d notallowed", type);
06118 return -1;
06119 }
06120 }
06121 break;
06122
06123 case TProof::kShowDataSets:
06124 {
06125 (*mess) >> uri >> opt;
06126 if (slb) slb->Form("%d %s %s", type, uri.Data(), opt.Data());
06127
06128 fDataSetManager->ShowDataSets(uri, opt);
06129 }
06130 break;
06131
06132 case TProof::kGetDataSets:
06133 {
06134 (*mess) >> uri >> opt;
06135 if (slb) slb->Form("%d %s %s", type, uri.Data(), opt.Data());
06136
06137 UInt_t omsk = (UInt_t)TDataSetManager::kExport;
06138 Ssiz_t kLite = opt.Index(":lite:", 0, TString::kIgnoreCase);
06139 if (kLite != kNPOS) {
06140 omsk |= (UInt_t)TDataSetManager::kReadShort;
06141 opt.Remove(kLite, strlen(":lite:"));
06142 }
06143 TMap *returnMap = fDataSetManager->GetDataSets(uri, omsk);
06144
06145 if (returnMap && !opt.IsNull()) {
06146
06147 TMap *rmap = new TMap;
06148 TObject *k = 0;
06149 TFileCollection *fc = 0, *xfc = 0;
06150 TIter nxd(returnMap);
06151 while ((k = nxd()) && (fc = (TFileCollection *) returnMap->GetValue(k))) {
06152
06153 if ((xfc = fc->GetFilesOnServer(opt.Data()))) {
06154 rmap->Add(new TObjString(k->GetName()), xfc);
06155 }
06156 }
06157 returnMap->DeleteAll();
06158 if (rmap->GetSize() > 0) {
06159 returnMap = rmap;
06160 } else {
06161 Info("HandleDataSets", "no dataset found on server '%s'", opt.Data());
06162 delete rmap;
06163 returnMap = 0;
06164 }
06165 }
06166 if (returnMap) {
06167
06168 fSocket->SendObject(returnMap, kMESS_OK);
06169 returnMap->DeleteAll();
06170 } else {
06171
06172 return -1;
06173 }
06174 }
06175 break;
06176 case TProof::kGetDataSet:
06177 {
06178 (*mess) >> uri >> opt;
06179 if (slb) slb->Form("%d %s %s", type, uri.Data(), opt.Data());
06180
06181 TFileCollection *fileList = fDataSetManager->GetDataSet(uri,opt);
06182 if (fileList) {
06183 fSocket->SendObject(fileList, kMESS_OK);
06184 delete fileList;
06185 } else {
06186
06187 return -1;
06188 }
06189 }
06190 break;
06191 case TProof::kRemoveDataSet:
06192 {
06193 if (fDataSetManager->TestBit(TDataSetManager::kAllowRegister)) {
06194 (*mess) >> uri;
06195 if (slb) slb->Form("%d %s", type, uri.Data());
06196 if (!fDataSetManager->RemoveDataSet(uri)) {
06197
06198 return -1;
06199 }
06200 } else {
06201 Info("HandleDataSets", "dataset creation / removal not allowed");
06202 if (slb) slb->Form("%d notallowed", type);
06203 return -1;
06204 }
06205 }
06206 break;
06207 case TProof::kVerifyDataSet:
06208 {
06209 if (fDataSetManager->TestBit(TDataSetManager::kAllowVerify)) {
06210 (*mess) >> uri >> opt;
06211 if (slb) slb->Form("%d %s %s", type, uri.Data(), opt.Data());
06212 TProofServLogHandlerGuard hg(fLogFile, fSocket);
06213 rc = fDataSetManager->ScanDataSet(uri, opt);
06214
06215
06216
06217
06218
06219
06220 } else {
06221 Info("HandleDataSets", "dataset verification not allowed");
06222 return -1;
06223 }
06224 }
06225 break;
06226 case TProof::kGetQuota:
06227 {
06228 if (fDataSetManager->TestBit(TDataSetManager::kCheckQuota)) {
06229 if (slb) slb->Form("%d", type);
06230 TMap *groupQuotaMap = fDataSetManager->GetGroupQuotaMap();
06231 if (groupQuotaMap) {
06232
06233 fSocket->SendObject(groupQuotaMap, kMESS_OK);
06234 } else {
06235 return -1;
06236 }
06237 } else {
06238 Info("HandleDataSets", "quota control disabled");
06239 if (slb) slb->Form("%d disabled", type);
06240 return -1;
06241 }
06242 }
06243 break;
06244 case TProof::kShowQuota:
06245 {
06246 if (fDataSetManager->TestBit(TDataSetManager::kCheckQuota)) {
06247 if (slb) slb->Form("%d", type);
06248 (*mess) >> opt;
06249
06250 fDataSetManager->ShowQuota(opt);
06251 } else {
06252 Info("HandleDataSets", "quota control disabled");
06253 if (slb) slb->Form("%d disabled", type);
06254 }
06255 }
06256 break;
06257 case TProof::kSetDefaultTreeName:
06258 {
06259 if (fDataSetManager->TestBit(TDataSetManager::kAllowRegister)) {
06260 (*mess) >> uri;
06261 if (slb) slb->Form("%d %s", type, uri.Data());
06262 rc = fDataSetManager->ScanDataSet(uri, (UInt_t)TDataSetManager::kSetDefaultTree);
06263 } else {
06264 Info("HandleDataSets", "kSetDefaultTreeName: modification of dataset info not allowed");
06265 if (slb) slb->Form("%d notallowed", type);
06266 return -1;
06267 }
06268 }
06269 break;
06270 case TProof::kCache:
06271 {
06272 (*mess) >> uri >> opt;
06273 if (slb) slb->Form("%d %s %s", type, uri.Data(), opt.Data());
06274 if (opt == "show") {
06275
06276 fDataSetManager->ShowCache(uri);
06277 } else if (opt == "clear") {
06278
06279 fDataSetManager->ClearCache(uri);
06280 } else {
06281 Error("HandleDataSets", "kCache: unknown action: %s", opt.Data());
06282 }
06283 }
06284 break;
06285 default:
06286 rc = -1;
06287 Error("HandleDataSets", "unknown type %d", type);
06288 break;
06289 }
06290
06291
06292 return rc;
06293 }
06294
06295
06296 void TProofServ::HandleSubmerger(TMessage *mess)
06297 {
06298
06299
06300
06301 Int_t type = 0;
06302 (*mess) >> type;
06303
06304 TString msg;
06305 switch (type) {
06306 case TProof::kOutputSize:
06307 break;
06308
06309 case TProof::kSendOutput:
06310 {
06311 Bool_t deleteplayer = kTRUE;
06312 if (!IsMaster()) {
06313 if (fMergingMonitor) {
06314 Info("HandleSubmerger", "kSendOutput: interrupting ...");
06315 fMergingMonitor->Interrupt();
06316 }
06317 if (fMergingSocket) {
06318 if (fMergingMonitor) fMergingMonitor->Remove(fMergingSocket);
06319 fMergingSocket->Close();
06320 SafeDelete(fMergingSocket);
06321 }
06322
06323 TString name;
06324 Int_t port = 0;
06325 Int_t merger_id = -1;
06326 (*mess) >> merger_id >> name >> port;
06327 PDB(kSubmerger, 1)
06328 Info("HandleSubmerger","worker %s redirected to merger #%d %s:%d", fOrdinal.Data(), merger_id, name.Data(), port);
06329
06330 TSocket *t = 0;
06331 if (name.Length() > 0 && port > 0 && (t = new TSocket(name, port)) && t->IsValid()) {
06332
06333 PDB(kSubmerger, 2) Info("HandleSubmerger",
06334 "kSendOutput: worker asked for sending output to merger #%d %s:%d",
06335 merger_id, name.Data(), port);
06336
06337 if (SendResults(t, fPlayer->GetOutputList()) != 0) {
06338 msg.Form("worker %s cannot send results to merger #%d at %s:%d", GetPrefix(), merger_id, name.Data(), port);
06339 PDB(kSubmerger, 2) Info("HandleSubmerger",
06340 "kSendOutput: %s - inform the master", msg.Data());
06341 SendAsynMessage(msg);
06342
06343 TMessage answ(kPROOF_SUBMERGER);
06344 answ << Int_t(TProof::kMergerDown);
06345 answ << merger_id;
06346 fSocket->Send(answ);
06347 } else {
06348
06349 TMessage answ(kPROOF_SUBMERGER);
06350 answ << Int_t(TProof::kOutputSent);
06351 answ << merger_id;
06352 fSocket->Send(answ);
06353
06354 PDB(kSubmerger, 2) Info("HandleSubmerger", "kSendOutput: worker sent its output");
06355 fSocket->Send(kPROOF_SETIDLE);
06356 SetIdle(kTRUE);
06357 SendLogFile();
06358 }
06359 } else {
06360
06361 if (name == "master") {
06362 PDB(kSubmerger, 2) Info("HandleSubmerger",
06363 "kSendOutput: worker was asked for sending output to master");
06364 if (SendResults(fSocket, fPlayer->GetOutputList()) != 0)
06365 Warning("HandleSubmerger", "problems sending output list");
06366
06367 fSocket->Send(kPROOF_SETIDLE);
06368 SetIdle(kTRUE);
06369 SendLogFile();
06370
06371 } else if (!t || !(t->IsValid())) {
06372 msg.Form("worker %s could not open a valid socket to merger #%d at %s:%d",
06373 GetPrefix(), merger_id, name.Data(), port);
06374 PDB(kSubmerger, 2) Info("HandleSubmerger",
06375 "kSendOutput: %s - inform the master", msg.Data());
06376 SendAsynMessage(msg);
06377
06378 TMessage answ(kPROOF_SUBMERGER);
06379 answ << Int_t(TProof::kMergerDown);
06380 answ << merger_id;
06381 fSocket->Send(answ);
06382 deleteplayer = kFALSE;
06383 }
06384
06385 if (t) SafeDelete(t);
06386
06387 }
06388
06389 } else {
06390 Error("HandleSubmerger", "kSendOutput: received not on worker");
06391 }
06392
06393
06394 if (deleteplayer) DeletePlayer();
06395 }
06396 break;
06397 case TProof::kBeMerger:
06398 {
06399 Bool_t deleteplayer = kTRUE;
06400 if (!IsMaster()) {
06401 Int_t merger_id = -1;
06402
06403 Int_t connections = 0;
06404 (*mess) >> merger_id >> connections;
06405 PDB(kSubmerger, 2)
06406 Info("HandleSubmerger", "worker %s established as merger", fOrdinal.Data());
06407
06408 PDB(kSubmerger, 2)
06409 Info("HandleSubmerger",
06410 "kBeMerger: worker asked for being merger #%d for %d connections",
06411 merger_id, connections);
06412
06413 TVirtualProofPlayer *mergerPlayer = TVirtualProofPlayer::Create("remote",fProof,0);
06414 PDB(kSubmerger, 2) Info("HandleSubmerger",
06415 "kBeMerger: mergerPlayer created (%p) ", mergerPlayer);
06416
06417
06418 if (AcceptResults(connections, mergerPlayer)) {
06419 PDB(kSubmerger, 2)
06420 Info("HandleSubmerger", "kBeMerger: all outputs from workers accepted");
06421
06422 PDB(kSubmerger, 2)
06423 Info("","adding own output to the list on %s", fOrdinal.Data());
06424
06425
06426
06427
06428
06429
06430 TIter nxo(fPlayer->GetOutputList());
06431 TObject * o = 0;
06432 while ((o = nxo())) {
06433 if ((mergerPlayer->AddOutputObject(o) != 1)) {
06434
06435
06436 PDB(kSubmerger, 2) Info("HandleSocketInput", "removing merged object (%p)", o);
06437 fPlayer->GetOutputList()->Remove(o);
06438 }
06439 }
06440 PDB(kSubmerger, 2) Info("HandleSubmerger","kBeMerger: own outputs added");
06441 PDB(kSubmerger, 2) Info("HandleSubmerger","starting delayed merging on %s", fOrdinal.Data());
06442
06443
06444 mergerPlayer->MergeOutput();
06445
06446 PDB(kSubmerger, 2) Info("HandleSubmerger", "delayed merging on %s finished ", fOrdinal.Data());
06447 PDB(kSubmerger, 2) Info("HandleSubmerger", "%s sending results to master ", fOrdinal.Data());
06448
06449 if (SendResults(fSocket, mergerPlayer->GetOutputList()) != 0)
06450 Warning("HandleSubmerger","kBeMerger: problems sending output list");
06451 mergerPlayer->GetOutputList()->SetOwner(kTRUE);
06452 delete mergerPlayer;
06453
06454 PDB(kSubmerger, 2) Info("HandleSubmerger","kBeMerger: results sent to master");
06455
06456 fSocket->Send(kPROOF_SETIDLE);
06457 SetIdle(kTRUE);
06458 SendLogFile();
06459 } else {
06460
06461 TMessage answ(kPROOF_SUBMERGER);
06462 answ << Int_t(TProof::kMergerDown);
06463 answ << merger_id;
06464 fSocket->Send(answ);
06465 deleteplayer = kFALSE;
06466 }
06467 } else {
06468 Error("HandleSubmerger","kSendOutput: received not on worker");
06469 }
06470
06471
06472 if (deleteplayer) DeletePlayer();
06473 }
06474 break;
06475
06476 case TProof::kMergerDown:
06477 break;
06478
06479 case TProof::kStopMerging:
06480 {
06481
06482 PDB(kSubmerger, 2) Info("HandleSubmerger", "kStopMerging");
06483 if (fMergingMonitor) {
06484 Info("HandleSubmerger", "kStopMerging: interrupting ...");
06485 fMergingMonitor->Interrupt();
06486 }
06487 }
06488 break;
06489
06490 case TProof::kOutputSent:
06491 break;
06492 }
06493 }
06494
06495
06496 void TProofServ::HandleFork(TMessage *)
06497 {
06498
06499
06500 Info("HandleFork", "fork cloning not implemented");
06501 }
06502
06503
06504 Int_t TProofServ::Fork()
06505 {
06506
06507
06508
06509
06510
06511 #ifndef WIN32
06512
06513 pid_t pid;
06514 if ((pid = fork()) < 0) {
06515 Error("Fork", "failed to fork");
06516 return pid;
06517 }
06518
06519
06520 if (!pid) return pid;
06521
06522
06523 if (!fReaperTimer) {
06524 fReaperTimer = new TReaperTimer(1000);
06525 fReaperTimer->Start(-1);
06526 }
06527
06528
06529 fReaperTimer->AddPid(pid);
06530
06531
06532 return pid;
06533 #else
06534 Warning("Fork", "Functionality not provided under windows");
06535 return -1;
06536 #endif
06537 }
06538
06539
06540 void TProofServ::ResolveKeywords(TString &fname, const char *path)
06541 {
06542
06543
06544
06545 if (fname.Contains("<user>")) {
06546 if (gProofServ && gProofServ->GetUser() && strlen(gProofServ->GetUser())) {
06547 fname.ReplaceAll("<user>", gProofServ->GetUser());
06548 } else {
06549 fname.ReplaceAll("<user>", "nouser");
06550 }
06551 }
06552
06553 if (fname.Contains("<u>")) {
06554 if (gProofServ && gProofServ->GetUser() && strlen(gProofServ->GetUser())) {
06555 TString u(gProofServ->GetUser()[0]);
06556 fname.ReplaceAll("<u>", u);
06557 } else {
06558 fname.ReplaceAll("<u>", "n");
06559 }
06560 }
06561
06562 if (fname.Contains("<group>")) {
06563 if (gProofServ && gProofServ->GetGroup() && strlen(gProofServ->GetGroup()))
06564 fname.ReplaceAll("<group>", gProofServ->GetGroup());
06565 else
06566 fname.ReplaceAll("<group>", "default");
06567 }
06568
06569 if (fname.Contains("<stag>")) {
06570 if (gProofServ && gProofServ->GetSessionTag() && strlen(gProofServ->GetSessionTag()))
06571 fname.ReplaceAll("<stag>", gProofServ->GetSessionTag());
06572 else
06573 ::Warning("TProofServ::ResolveKeywords", "session tag undefined: ignoring");
06574 }
06575
06576 if (fname.Contains("<ord>")) {
06577 if (gProofServ && gProofServ->GetOrdinal() && strlen(gProofServ->GetOrdinal()))
06578 fname.ReplaceAll("<ord>", gProofServ->GetOrdinal());
06579 else
06580 ::Warning("TProofServ::ResolveKeywords", "ordinal number undefined: ignoring");
06581 }
06582
06583 if (fname.Contains("<qnum>")) {
06584 if (gProofServ && gProofServ->GetQuerySeqNum() && gProofServ->GetQuerySeqNum() > 0)
06585 fname.ReplaceAll("<qnum>", TString::Format("%d", gProofServ->GetQuerySeqNum()).Data());
06586 else
06587 ::Warning("TProofServ::ResolveKeywords", "query seqeuntial number undefined: ignoring");
06588 }
06589
06590 if (fname.Contains("<file>") && path && strlen(path) > 0) {
06591 fname.ReplaceAll("<file>", path);
06592 }
06593 }
06594
06595
06596 Int_t TProofServ::GetSessionStatus()
06597 {
06598
06599
06600
06601
06602
06603
06604
06605
06606 R__LOCKGUARD(fQMtx);
06607 Int_t st = (fIdle) ? 0 : 1;
06608 if (fIdle && fWaitingQueries->GetSize() > 0) st = 3;
06609 return st;
06610 }
06611
06612
06613 Int_t TProofServ::UpdateSessionStatus(Int_t xst)
06614 {
06615
06616
06617
06618
06619 FILE *fs = fopen(fAdminPath.Data(), "w");
06620 if (fs) {
06621 Int_t st = (xst < 0) ? GetSessionStatus() : xst;
06622 fprintf(fs, "%d", st);
06623 fclose(fs);
06624 PDB(kGlobal, 2)
06625 Info("UpdateSessionStatus", "status (=%d) update in path: %s", st, fAdminPath.Data());
06626 } else {
06627 return -errno;
06628 }
06629
06630 return 0;
06631 }
06632
06633
06634 Bool_t TProofServ::IsIdle()
06635 {
06636
06637 R__LOCKGUARD(fQMtx);
06638 return fIdle;
06639 }
06640
06641
06642 void TProofServ::SetIdle(Bool_t st)
06643 {
06644
06645 R__LOCKGUARD(fQMtx);
06646 fIdle = st;
06647 }
06648
06649
06650 Bool_t TProofServ::IsWaiting()
06651 {
06652
06653 R__LOCKGUARD(fQMtx);
06654 if (fIdle && fWaitingQueries->GetSize() > 0) return kTRUE;
06655 return kFALSE;
06656 }
06657
06658
06659 Int_t TProofServ::WaitingQueries()
06660 {
06661
06662 R__LOCKGUARD(fQMtx);
06663 return fWaitingQueries->GetSize();
06664 }
06665
06666
06667 Int_t TProofServ::QueueQuery(TProofQueryResult *pq)
06668 {
06669
06670
06671 R__LOCKGUARD(fQMtx);
06672 fWaitingQueries->Add(pq);
06673 return fWaitingQueries->GetSize();
06674 }
06675
06676
06677 TProofQueryResult *TProofServ::NextQuery()
06678 {
06679
06680
06681 R__LOCKGUARD(fQMtx);
06682 TProofQueryResult *pq = (TProofQueryResult *) fWaitingQueries->First();
06683 fWaitingQueries->Remove(pq);
06684 return pq;
06685 }
06686
06687
06688 Int_t TProofServ::CleanupWaitingQueries(Bool_t del, TList *qls)
06689 {
06690
06691
06692
06693 R__LOCKGUARD(fQMtx);
06694 Int_t ncq = 0;
06695 if (qls) {
06696 TIter nxq(qls);
06697 TObject *o = 0;
06698 while ((o = nxq())) {
06699 if (fWaitingQueries->FindObject(o)) ncq++;
06700 fWaitingQueries->Remove(o);
06701 if (del) delete o;
06702 }
06703 } else {
06704 ncq = fWaitingQueries->GetSize();
06705 fWaitingQueries->SetOwner(del);
06706 fWaitingQueries->Delete();
06707 }
06708
06709 return ncq;
06710 }
06711
06712
06713 void TProofServ::SetLastMsg(const char *lastmsg)
06714 {
06715
06716
06717 fgLastMsg = lastmsg;
06718 }
06719
06720
06721 Long_t TProofServ::GetVirtMemMax()
06722 {
06723
06724 return fgVirtMemMax;
06725 }
06726
06727 Long_t TProofServ::GetResMemMax()
06728 {
06729
06730 return fgResMemMax;
06731 }
06732
06733 Float_t TProofServ::GetMemHWM()
06734 {
06735
06736 return fgMemHWM;
06737 }
06738
06739 Float_t TProofServ::GetMemStop()
06740 {
06741
06742 return fgMemStop;
06743 }
06744
06745
06746 Int_t TProofLockPath::Lock()
06747 {
06748
06749
06750
06751 const char *pname = GetName();
06752
06753 if (gSystem->AccessPathName(pname))
06754 fLockId = open(pname, O_CREAT|O_RDWR, 0644);
06755 else
06756 fLockId = open(pname, O_RDWR);
06757
06758 if (fLockId == -1) {
06759 SysError("Lock", "cannot open lock file %s", pname);
06760 return -1;
06761 }
06762
06763 PDB(kPackage, 2)
06764 Info("Lock", "%d: locking file %s ...", gSystem->GetPid(), pname);
06765
06766 #if !defined(R__WIN32) && !defined(R__WINGCC)
06767 if (lockf(fLockId, F_LOCK, (off_t) 1) == -1) {
06768 SysError("Lock", "error locking %s", pname);
06769 close(fLockId);
06770 fLockId = -1;
06771 return -1;
06772 }
06773 #endif
06774
06775 PDB(kPackage, 2)
06776 Info("Lock", "%d: file %s locked", gSystem->GetPid(), pname);
06777
06778 return 0;
06779 }
06780
06781
06782 Int_t TProofLockPath::Unlock()
06783 {
06784
06785
06786
06787 if (!IsLocked())
06788 return 0;
06789
06790 PDB(kPackage, 2)
06791 Info("Lock", "%d: unlocking file %s ...", gSystem->GetPid(), GetName());
06792
06793 lseek(fLockId, 0, SEEK_SET);
06794 #if !defined(R__WIN32) && !defined(R__WINGCC)
06795 if (lockf(fLockId, F_ULOCK, (off_t)1) == -1) {
06796 SysError("Unlock", "error unlocking %s", GetName());
06797 close(fLockId);
06798 fLockId = -1;
06799 return -1;
06800 }
06801 #endif
06802
06803 PDB(kPackage, 2)
06804 Info("Unlock", "%d: file %s unlocked", gSystem->GetPid(), GetName());
06805
06806 close(fLockId);
06807 fLockId = -1;
06808
06809 return 0;
06810 }