TProofServ.cxx

Go to the documentation of this file.
00001 // @(#)root/proof:$Id: TProofServ.cxx 37943 2011-02-02 14:36:55Z ganis $
00002 // Author: Fons Rademakers   16/02/97
00003 
00004 /*************************************************************************
00005  * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers.               *
00006  * All rights reserved.                                                  *
00007  *                                                                       *
00008  * For the licensing terms see $ROOTSYS/LICENSE.                         *
00009  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
00010  *************************************************************************/
00011 
00012 //////////////////////////////////////////////////////////////////////////
00013 //                                                                      //
00014 // TProofServ                                                           //
00015 //                                                                      //
00016 // TProofServ is the PROOF server. It can act either as the master      //
00017 // server or as a slave server, depending on its startup arguments. It  //
00018 // receives and handles message coming from the client or from the      //
00019 // master server.                                                       //
00020 //                                                                      //
00021 //////////////////////////////////////////////////////////////////////////
00022 
00023 #include "RConfigure.h"
00024 #include "RConfig.h"
00025 #include "Riostream.h"
00026 
00027 #ifdef WIN32
00028    #include <process.h>
00029    #include <io.h>
00030    #include "snprintf.h"
00031    typedef long off_t;
00032 #endif
00033 #include <errno.h>
00034 #include <time.h>
00035 #include <fcntl.h>
00036 #include <sys/types.h>
00037 #include <sys/stat.h>
00038 #ifndef WIN32
00039 #include <sys/wait.h>
00040 #endif
00041 #include <cstdlib>
00042 
00043 // To handle exceptions
00044 #include <exception>
00045 #include <new>
00046 
00047 #if (defined(__FreeBSD__) && (__FreeBSD__ < 4)) || \
00048     (defined(__APPLE__) && (!defined(MAC_OS_X_VERSION_10_3) || \
00049      (MAC_OS_X_VERSION_MAX_ALLOWED < MAC_OS_X_VERSION_10_3)))
00050 #include <sys/file.h>
00051 #define lockf(fd, op, sz)   flock((fd), (op))
00052 #ifndef F_LOCK
00053 #define F_LOCK             (LOCK_EX | LOCK_NB)
00054 #endif
00055 #ifndef F_ULOCK
00056 #define F_ULOCK             LOCK_UN
00057 #endif
00058 #endif
00059 
00060 #include "TProofServ.h"
00061 #include "TDSetProxy.h"
00062 #include "TEnv.h"
00063 #include "TError.h"
00064 #include "TEventList.h"
00065 #include "TEntryList.h"
00066 #include "TException.h"
00067 #include "TFile.h"
00068 #include "THashList.h"
00069 #include "TInterpreter.h"
00070 #include "TKey.h"
00071 #include "TMessage.h"
00072 #include "TVirtualPerfStats.h"
00073 #include "TProofDebug.h"
00074 #include "TProof.h"
00075 #include "TVirtualProofPlayer.h"
00076 #include "TProofQueryResult.h"
00077 #include "TQueryResultManager.h"
00078 #include "TRegexp.h"
00079 #include "TROOT.h"
00080 #include "TSocket.h"
00081 #include "TStopwatch.h"
00082 #include "TSystem.h"
00083 #include "TTimeStamp.h"
00084 #include "TUrl.h"
00085 #include "TPluginManager.h"
00086 #include "TObjString.h"
00087 #include "compiledata.h"
00088 #include "TProofResourcesStatic.h"
00089 #include "TProofNodeInfo.h"
00090 #include "TFileInfo.h"
00091 #include "TMutex.h"
00092 #include "TClass.h"
00093 #include "TSQLServer.h"
00094 #include "TSQLResult.h"
00095 #include "TSQLRow.h"
00096 #include "TPRegexp.h"
00097 #include "TParameter.h"
00098 #include "TMap.h"
00099 #include "TSortedList.h"
00100 #include "TParameter.h"
00101 #include "TFileCollection.h"
00102 #include "TLockFile.h"
00103 #include "TDataSetManagerFile.h"
00104 #include "TProofProgressStatus.h"
00105 #include "TServerSocket.h"
00106 #include "TMonitor.h"
00107 #include "TFunction.h"
00108 #include "TMethodArg.h"
00109 #include "TMethodCall.h"
00110 
00111 // global proofserv handle
00112 TProofServ *gProofServ = 0;
00113 
00114 // debug hook
00115 static volatile Int_t gProofServDebug = 1;
00116 
00117 // Syslog control
00118 Int_t TProofServ::fgLogToSysLog = 0;
00119 TString TProofServ::fgSysLogService("proof");
00120 TString TProofServ::fgSysLogEntity("undef:default");
00121 
00122 // File where to log: default stderr
00123 FILE *TProofServ::fgErrorHandlerFile = 0;
00124 
00125 // To control allowed actions while processing
00126 Int_t TProofServ::fgRecursive = 0;
00127 
00128 // Last message before exceptions
00129 TString TProofServ::fgLastMsg("<undef>");
00130 
00131 // Memory controllers
00132 Long_t TProofServ::fgVirtMemMax = -1;
00133 Long_t TProofServ::fgResMemMax = -1;
00134 Float_t TProofServ::fgMemHWM = 0.80;
00135 Float_t TProofServ::fgMemStop = 0.95;
00136 
00137 //----- Termination signal handler ---------------------------------------------
00138 //______________________________________________________________________________
00139 class TProofServTerminationHandler : public TSignalHandler {
00140    TProofServ  *fServ;
00141 public:
00142    TProofServTerminationHandler(TProofServ *s)
00143       : TSignalHandler(kSigTermination, kFALSE) { fServ = s; }
00144    Bool_t  Notify();
00145 };
00146 
00147 //______________________________________________________________________________
00148 Bool_t TProofServTerminationHandler::Notify()
00149 {
00150    // Handle this interrupt
00151 
00152    Printf("Received SIGTERM: terminating");
00153    fServ->HandleTermination();
00154    return kTRUE;
00155 }
00156 
00157 //----- Interrupt signal handler -----------------------------------------------
00158 //______________________________________________________________________________
00159 class TProofServInterruptHandler : public TSignalHandler {
00160    TProofServ  *fServ;
00161 public:
00162    TProofServInterruptHandler(TProofServ *s)
00163       : TSignalHandler(kSigUrgent, kFALSE) { fServ = s; }
00164    Bool_t  Notify();
00165 };
00166 
00167 //______________________________________________________________________________
00168 Bool_t TProofServInterruptHandler::Notify()
00169 {
00170    // Handle this interrupt
00171 
00172    fServ->HandleUrgentData();
00173    if (TROOT::Initialized()) {
00174       Throw(GetSignal());
00175    }
00176    return kTRUE;
00177 }
00178 
00179 //----- SigPipe signal handler -------------------------------------------------
00180 //______________________________________________________________________________
00181 class TProofServSigPipeHandler : public TSignalHandler {
00182    TProofServ  *fServ;
00183 public:
00184    TProofServSigPipeHandler(TProofServ *s) : TSignalHandler(kSigPipe, kFALSE)
00185       { fServ = s; }
00186    Bool_t  Notify();
00187 };
00188 
00189 //______________________________________________________________________________
00190 Bool_t TProofServSigPipeHandler::Notify()
00191 {
00192    // Handle this signal
00193 
00194    fServ->HandleSigPipe();
00195    return kTRUE;
00196 }
00197 
00198 //----- Input handler for messages from parent or master -----------------------
00199 //______________________________________________________________________________
00200 class TProofServInputHandler : public TFileHandler {
00201    TProofServ  *fServ;
00202 public:
00203    TProofServInputHandler(TProofServ *s, Int_t fd) : TFileHandler(fd, 1)
00204       { fServ = s; }
00205    Bool_t Notify();
00206    Bool_t ReadNotify() { return Notify(); }
00207 };
00208 
00209 //______________________________________________________________________________
00210 Bool_t TProofServInputHandler::Notify()
00211 {
00212    // Handle this input
00213 
00214    fServ->HandleSocketInput();
00215    return kTRUE;
00216 }
00217 
00218 TString TProofServLogHandler::fgPfx = ""; // Default prefix to be prepended to messages
00219 Int_t TProofServLogHandler::fgCmdRtn = 0; // Return code of the command execution (available only
00220                                           // after closing the pipe)
00221 //______________________________________________________________________________
00222 TProofServLogHandler::TProofServLogHandler(const char *cmd,
00223                                              TSocket *s, const char *pfx)
00224                      : TFileHandler(-1, 1), fSocket(s), fPfx(pfx)
00225 {
00226    // Execute 'cmd' in a pipe and handle output messages from the related file
00227 
00228    ResetBit(kFileIsPipe);
00229    fgCmdRtn = 0;
00230    fFile = 0;
00231    if (s && cmd) {
00232       fFile = gSystem->OpenPipe(cmd, "r");
00233       if (fFile) {
00234          SetFd(fileno(fFile));
00235          // Notify what already in the file
00236          Notify();
00237          // Used in the destructor
00238          SetBit(kFileIsPipe);
00239       } else {
00240          fSocket = 0;
00241          Error("TProofServLogHandler", "executing command in pipe");
00242          fgCmdRtn = -1;
00243       }
00244    } else {
00245       Error("TProofServLogHandler",
00246             "undefined command (%p) or socket (%p)", (int *)cmd, s);
00247    }
00248 }
00249 //______________________________________________________________________________
00250 TProofServLogHandler::TProofServLogHandler(FILE *f, TSocket *s, const char *pfx)
00251                      : TFileHandler(-1, 1), fSocket(s), fPfx(pfx)
00252 {
00253    // Handle available message from the open file 'f'
00254 
00255    ResetBit(kFileIsPipe);
00256    fgCmdRtn = 0;
00257    fFile = 0;
00258    if (s && f) {
00259       fFile = f;
00260       SetFd(fileno(fFile));
00261       // Notify what already in the file
00262       Notify();
00263    } else {
00264       Error("TProofServLogHandler", "undefined file (%p) or socket (%p)", f, s);
00265    }
00266 }
00267 //______________________________________________________________________________
00268 TProofServLogHandler::~TProofServLogHandler()
00269 {
00270    // Handle available message in the open file
00271 
00272    if (TestBit(kFileIsPipe) && fFile) {
00273       Int_t rc = gSystem->ClosePipe(fFile);
00274 #ifdef WIN32
00275       fgCmdRtn = rc;
00276 #else
00277       fgCmdRtn = WIFEXITED(rc) ? WEXITSTATUS(rc) : -1;
00278 #endif
00279    }
00280    fFile = 0;
00281    fSocket = 0;
00282    ResetBit(kFileIsPipe);
00283 }
00284 //______________________________________________________________________________
00285 Bool_t TProofServLogHandler::Notify()
00286 {
00287    // Handle available message in the open file
00288 
00289    if (IsValid()) {
00290       TMessage m(kPROOF_MESSAGE);
00291       // Read buffer
00292       char line[4096];
00293       char *plf = 0;
00294       while (fgets(line, sizeof(line), fFile)) {
00295          if ((plf = strchr(line, '\n')))
00296             *plf = 0;
00297          // Create log string
00298          TString log;
00299          if (fPfx.Length() > 0) {
00300             // Prepend prefix specific to this instance
00301             log.Form("%s: %s", fPfx.Data(), line);
00302          } else if (fgPfx.Length() > 0) {
00303             // Prepend default prefix
00304             log.Form("%s: %s", fgPfx.Data(), line);
00305          } else {
00306             // Nothing to prepend
00307             log = line;
00308          }
00309          // Send the message one level up
00310          m.Reset(kPROOF_MESSAGE);
00311          m << log;
00312          fSocket->Send(m);
00313       }
00314    }
00315    return kTRUE;
00316 }
00317 //______________________________________________________________________________
00318 void TProofServLogHandler::SetDefaultPrefix(const char *pfx)
00319 {
00320    // Static method to set the default prefix
00321 
00322    fgPfx = pfx;
00323 }
00324 //______________________________________________________________________________
00325 Int_t TProofServLogHandler::GetCmdRtn()
00326 {
00327    // Static method to get the return code from the execution of a command via
00328    // the pipe. This is always 0 when the log handler is not used with a pipe
00329 
00330    return fgCmdRtn;
00331 }
00332 
00333 //______________________________________________________________________________
00334 TProofServLogHandlerGuard::TProofServLogHandlerGuard(const char *cmd, TSocket *s,
00335                                                      const char *pfx, Bool_t on)
00336 {
00337    // Init a guard for executing a command in a pipe
00338 
00339    fExecHandler = 0;
00340    if (cmd && on) {
00341       fExecHandler = new TProofServLogHandler(cmd, s, pfx);
00342       if (fExecHandler->IsValid()) {
00343          gSystem->AddFileHandler(fExecHandler);
00344       } else {
00345          Error("TProofServLogHandlerGuard","invalid handler");
00346       }
00347    } else {
00348       if (on)
00349          Error("TProofServLogHandlerGuard","undefined command");
00350    }
00351 }
00352 
00353 //______________________________________________________________________________
00354 TProofServLogHandlerGuard::TProofServLogHandlerGuard(FILE *f, TSocket *s,
00355                                                      const char *pfx, Bool_t on)
00356 {
00357    // Init a guard for executing a command in a pipe
00358 
00359    fExecHandler = 0;
00360    if (f && on) {
00361       fExecHandler = new TProofServLogHandler(f, s, pfx);
00362       if (fExecHandler->IsValid()) {
00363          gSystem->AddFileHandler(fExecHandler);
00364       } else {
00365          Error("TProofServLogHandlerGuard","invalid handler");
00366       }
00367    } else {
00368       if (on)
00369          Error("TProofServLogHandlerGuard","undefined file");
00370    }
00371 }
00372 
00373 //______________________________________________________________________________
00374 TProofServLogHandlerGuard::~TProofServLogHandlerGuard()
00375 {
00376    // Close a guard for executing a command in a pipe
00377 
00378    if (fExecHandler && fExecHandler->IsValid()) {
00379       gSystem->RemoveFileHandler(fExecHandler);
00380       SafeDelete(fExecHandler);
00381    }
00382 }
00383 
00384 //--- Special timer to control delayed shutdowns ----------------------------//
00385 //______________________________________________________________________________
00386 Bool_t TShutdownTimer::Notify()
00387 {
00388    // Handle expiration of the shutdown timer. In the case of low activity the
00389    // process will be aborted.
00390 
00391    if (gDebug > 0)
00392       Info ("Notify","checking activity on the input socket");
00393 
00394    // Check activity on the socket
00395    TSocket *xs = 0;
00396    if (fProofServ && (xs = fProofServ->GetSocket())) {
00397       TTimeStamp now;
00398       TTimeStamp ts = xs->GetLastUsage();
00399       Long_t dt = (Long_t)(now.GetSec() - ts.GetSec()) * 1000 +
00400                   (Long_t)(now.GetNanoSec() - ts.GetNanoSec()) / 1000000 ;
00401       Int_t to = gEnv->GetValue("ProofServ.ShutdonwTimeout", 20);
00402       if (dt > to * 60000) {
00403          Printf("TShutdownTimer::Notify: input socket: %p: did not show any activity"
00404                          " during the last %d mins: aborting", xs, to);
00405          // At this point we lost our controller: we need to abort to avoid
00406          // hidden timeouts or loops
00407          gSystem->Abort();
00408       } else {
00409          if (gDebug > 0)
00410             Info("Notify", "input socket: %p: show activity"
00411                            " %ld secs ago", xs, dt / 60000);
00412       }
00413    }
00414    Start(-1, kFALSE);
00415    return kTRUE;
00416 }
00417 
00418 //--- Synchronous timer used to reap children processes change of state ------//
00419 //______________________________________________________________________________
00420 TReaperTimer::~TReaperTimer()
00421 {
00422    // Destructor
00423 
00424    if (fChildren) {
00425       fChildren->SetOwner(kTRUE);
00426       delete fChildren;
00427       fChildren = 0;
00428    }
00429 }
00430 
00431 //______________________________________________________________________________
00432 void TReaperTimer::AddPid(Int_t pid)
00433 {
00434    // Add an entry for 'pid' in the internal list
00435 
00436    if (pid > 0) {
00437       if (!fChildren)
00438          fChildren = new TList;
00439       TString spid;
00440       spid.Form("%d", pid);
00441       fChildren->Add(new TParameter<Int_t>(spid.Data(), pid));
00442       TurnOn();
00443    }
00444 }
00445 
00446 //______________________________________________________________________________
00447 Bool_t TReaperTimer::Notify()
00448 {
00449    // Check if any of the registered children has changed its state.
00450    // Unregister those that are gone.
00451 
00452    if (fChildren) {
00453       TIter nxp(fChildren);
00454       TParameter<Int_t> *p = 0;
00455       while ((p = (TParameter<Int_t> *)nxp())) {
00456          int status;
00457 #ifndef WIN32
00458          pid_t pid;
00459          do {
00460             pid = waitpid(p->GetVal(), &status, WNOHANG);
00461          } while (pid < 0 && errno == EINTR);
00462 #else
00463          intptr_t pid;
00464          pid = _cwait(&status, (intptr_t)p->GetVal(), 0);
00465 #endif
00466          if (pid > 0 && pid == p->GetVal()) {
00467             // Remove from the list
00468             fChildren->Remove(p);
00469             delete p;
00470          }
00471       }
00472    }
00473 
00474    // Stop the timer if no children
00475    if (!fChildren || fChildren->GetSize() <= 0) {
00476       Stop();
00477    } else {
00478       // Needed for the next shot
00479       Reset();
00480    }
00481    return kTRUE;
00482 }
00483 
00484 //--- Special timer to to terminate idle sessions ----------------------------//
00485 //______________________________________________________________________________
00486 Bool_t TIdleTOTimer::Notify()
00487 {
00488    // Handle expiration of the idle timer. The session will just be terminated.
00489 
00490    Info ("Notify", "session idle for more then %lld secs: terminating", Long64_t(fTime)/1000);
00491 
00492    if (fProofServ) {
00493       // Set the status to timed-out
00494       Int_t uss_rc = -1;
00495       if ((uss_rc = fProofServ->UpdateSessionStatus(4)) != 0)
00496          Warning("Notify", "problems updating session status (errno: %d)", -uss_rc);
00497       // Send a terminate request
00498       TString msg;
00499       if (fProofServ->GetProtocol() < 29) {
00500          msg.Form("\n//\n// PROOF session at %s (%s) terminated because idle for more than %lld secs\n"
00501                   "// Please IGNORE any error message possibly displayed below\n//",
00502                   gSystem->HostName(), fProofServ->GetSessionTag(), Long64_t(fTime)/1000);
00503       } else {
00504          msg.Form("\n//\n// PROOF session at %s (%s) terminated because idle for more than %lld secs\n//",
00505                   gSystem->HostName(), fProofServ->GetSessionTag(), Long64_t(fTime)/1000);
00506       }
00507       fProofServ->SendAsynMessage(msg.Data());
00508       fProofServ->Terminate(0);
00509       Reset();
00510       Stop();
00511    } else {
00512       Warning("Notify", "fProofServ undefined!");
00513       Start(-1, kTRUE);
00514    }
00515    return kTRUE;
00516 }
00517 
00518 ClassImp(TProofServ)
00519 
00520 // Hook to the constructor. This is needed to avoid using the plugin manager
00521 // which may create problems in multi-threaded environments.
00522 extern "C" {
00523    TApplication *GetTProofServ(Int_t *argc, char **argv, FILE *flog)
00524    { return new TProofServ(argc, argv, flog); }
00525 }
00526 
00527 //______________________________________________________________________________
00528 TProofServ::TProofServ(Int_t *argc, char **argv, FILE *flog)
00529        : TApplication("proofserv", argc, argv, 0, -1)
00530 {
00531    // Main constructor. Create an application environment. The TProofServ
00532    // environment provides an eventloop via inheritance of TApplication.
00533    // Actual server creation work is done in CreateServer() to allow
00534    // overloading.
00535 
00536    // Read session specific rootrc file
00537    TString rcfile = gSystem->Getenv("ROOTRCFILE") ? gSystem->Getenv("ROOTRCFILE")
00538                                                   : "session.rootrc";
00539    if (!gSystem->AccessPathName(rcfile, kReadPermission))
00540       gEnv->ReadFile(rcfile, kEnvChange);
00541 
00542    // Upper limit on Virtual Memory (in kB)
00543    fgVirtMemMax = gEnv->GetValue("Proof.VirtMemMax",-1);
00544    if (fgVirtMemMax < 0 && gSystem->Getenv("PROOF_VIRTMEMMAX")) {
00545       Long_t mmx = strtol(gSystem->Getenv("PROOF_VIRTMEMMAX"), 0, 10);
00546       if (mmx < kMaxLong && mmx > 0)
00547          fgVirtMemMax = mmx * 1024;
00548    }
00549    // Old variable for backward compatibility
00550    if (fgVirtMemMax < 0 && gSystem->Getenv("ROOTPROOFASHARD")) {
00551       Long_t mmx = strtol(gSystem->Getenv("ROOTPROOFASHARD"), 0, 10);
00552       if (mmx < kMaxLong && mmx > 0)
00553          fgVirtMemMax = mmx * 1024;
00554    }
00555    // Upper limit on Resident Memory (in kB)
00556    fgResMemMax = gEnv->GetValue("Proof.ResMemMax",-1);
00557    if (fgResMemMax < 0 && gSystem->Getenv("PROOF_RESMEMMAX")) {
00558       Long_t mmx = strtol(gSystem->Getenv("PROOF_RESMEMMAX"), 0, 10);
00559       if (mmx < kMaxLong && mmx > 0)
00560          fgResMemMax = mmx * 1024;
00561    }
00562    // Thresholds for warnings and stop processing
00563    fgMemStop = gEnv->GetValue("Proof.MemStop", 0.95);
00564    fgMemHWM = gEnv->GetValue("Proof.MemHWM", 0.80);
00565    if (fgVirtMemMax > 0 || fgResMemMax > 0) {
00566       if ((fgMemStop < 0.) || (fgMemStop > 1.)) {
00567          Warning("TProofServ", "requested memory fraction threshold to stop processing"
00568                                " (MemStop) out of range [0,1] - ignoring");
00569          fgMemStop = 0.95;
00570       }
00571       if ((fgMemHWM < 0.) || (fgMemHWM > fgMemStop)) {
00572          Warning("TProofServ", "requested memory fraction threshold for warning and finer monitoring"
00573                                " (MemHWM) out of range [0,MemStop] - ignoring");
00574          fgMemHWM = 0.80;
00575       }
00576    }   
00577 
00578    // Wait (loop) to allow debugger to connect
00579    Bool_t test = (*argc >= 4 && !strcmp(argv[3], "test")) ? kTRUE : kFALSE;
00580    if ((gEnv->GetValue("Proof.GdbHook",0) == 3 && !test) ||
00581        (gEnv->GetValue("Proof.GdbHook",0) == 4 && test)) {
00582       while (gProofServDebug)
00583          ;
00584    }
00585 
00586    // Test instance
00587    if (*argc >= 4)
00588       if (!strcmp(argv[3], "test"))
00589          fService = "prooftest";
00590 
00591    // crude check on number of arguments
00592    if (*argc < 2) {
00593       Error("TProofServ", "Must have at least 1 arguments (see  proofd).");
00594       exit(1);
00595    }
00596 
00597    // Set global to this instance
00598    gProofServ = this;
00599 
00600    // Log control flags
00601    fSendLogToMaster = kFALSE;
00602 
00603    // Abort on higher than kSysError's and set error handler
00604    gErrorAbortLevel = kSysError + 1;
00605    SetErrorHandlerFile(stderr);
00606    SetErrorHandler(ErrorHandler);
00607 
00608    fNcmd            = 0;
00609    fGroupPriority   = 100;
00610    fInterrupt       = kFALSE;
00611    fProtocol        = 0;
00612    fOrdinal         = gEnv->GetValue("ProofServ.Ordinal", "-1");
00613    fGroupId         = -1;
00614    fGroupSize       = 0;
00615    fRealTime        = 0.0;
00616    fCpuTime         = 0.0;
00617    fProof           = 0;
00618    fPlayer          = 0;
00619    fSocket          = 0;
00620    fEnabledPackages = new TList;
00621    fEnabledPackages->SetOwner();
00622 
00623    fTotSessions     = -1;
00624    fActSessions     = -1;
00625    fEffSessions     = -1.;
00626 
00627    fGlobalPackageDirList = 0;
00628 
00629    fLogFile         = flog;
00630    fLogFileDes      = -1;
00631 
00632    fArchivePath     = "";
00633    // Init lockers
00634    fPackageLock     = 0;
00635    fCacheLock       = 0;
00636    fQueryLock       = 0;
00637 
00638    fQMgr            = 0;
00639    fQMtx            = new TMutex(kTRUE);
00640    fWaitingQueries  = new TList;
00641    fIdle            = kTRUE;
00642    fQuerySeqNum     = -1;
00643 
00644    fQueuedMsg       = new TList;
00645 
00646    fRealTimeLog     = kFALSE;
00647 
00648    fShutdownTimer   = 0;
00649    fReaperTimer     = 0;
00650    fIdleTOTimer     = 0;
00651 
00652    fInflateFactor   = 1000;
00653 
00654    fDataSetManager  = 0; // Initialized in Setup()
00655 
00656    fInputHandler    = 0;
00657 
00658    // Quotas disabled by default
00659    fMaxQueries      = -1;
00660    fMaxBoxSize      = -1;
00661    fHWMBoxSize      = -1;
00662 
00663    // Submerger quantities
00664    fMergingSocket   = 0;
00665    fMergingMonitor  = 0;
00666    fMergedWorkers   = 0;
00667 
00668    // Bit to flg high-memory footprint
00669    ResetBit(TProofServ::kHighMemory);
00670    
00671    // Max message size
00672    fMsgSizeHWM = gEnv->GetValue("ProofServ.MsgSizeHWM", 1000000);
00673 
00674    // Message compression
00675    fCompressMsg     = gEnv->GetValue("ProofServ.CompressMessage", 0);
00676 
00677    gProofDebugLevel = gEnv->GetValue("Proof.DebugLevel",0);
00678    fLogLevel = gProofDebugLevel;
00679 
00680    gProofDebugMask = (TProofDebug::EProofDebugMask) gEnv->GetValue("Proof.DebugMask",~0);
00681    if (gProofDebugLevel > 0)
00682       Info("TProofServ", "DebugLevel %d Mask 0x%x", gProofDebugLevel, gProofDebugMask);
00683 
00684    // Parse options
00685    GetOptions(argc, argv);
00686 
00687    // Default prefix in the form '<role>-<ordinal>'
00688    fPrefix = (IsMaster() ? "Mst-" : "Wrk-");
00689    if (test) fPrefix = "Test";
00690    if (fOrdinal != "-1")
00691       fPrefix += fOrdinal;
00692    TProofServLogHandler::SetDefaultPrefix(fPrefix);
00693 
00694    // Syslog control
00695    TString slog = gEnv->GetValue("ProofServ.LogToSysLog", "");
00696    if (!(slog.IsNull())) {
00697       if (slog.IsDigit()) {
00698          fgLogToSysLog = slog.Atoi();
00699       } else {
00700          char c = (slog[0] == 'M' || slog[0] == 'm') ? 'm' : 'a';
00701          c = (slog[0] == 'W' || slog[0] == 'w') ? 'w' : c;
00702          Bool_t dosyslog = ((c == 'm' && IsMaster()) ||
00703                             (c == 'w' && !IsMaster()) || c == 'a') ? kTRUE : kFALSE;
00704          if (dosyslog) {
00705             slog.Remove(0,1);
00706             if (slog.IsDigit()) fgLogToSysLog = slog.Atoi();
00707             if (fgLogToSysLog <= 0)
00708                Warning("TProofServ", "request for syslog logging ineffective!");
00709          }
00710       }
00711    }
00712    // Initialize proper service if required
00713    if (fgLogToSysLog > 0) {
00714       fgSysLogService = (IsMaster()) ? "proofm" : "proofw";
00715       if (fOrdinal != "-1") fgSysLogService += TString::Format("-%s", fOrdinal.Data());
00716       gSystem->Openlog(fgSysLogService, kLogPid | kLogCons, kLogLocal5);
00717    }
00718 
00719    // Enable optimized sending of streamer infos to use embedded backward/forward
00720    // compatibility support between different ROOT versions and different versions of
00721    // users classes
00722    Bool_t enableSchemaEvolution = gEnv->GetValue("Proof.SchemaEvolution",1);
00723    if (enableSchemaEvolution) {
00724       TMessage::EnableSchemaEvolutionForAll();
00725    } else {
00726       Info("TProofServ", "automatic schema evolution in TMessage explicitely disabled");
00727    }
00728 }
00729 
00730 //______________________________________________________________________________
00731 Int_t TProofServ::CreateServer()
00732 {
00733    // Finalize the server setup. If master, create the TProof instance to talk
00734    // to the worker or submaster nodes.
00735    // Return 0 on success, -1 on error
00736 
00737    // Get socket to be used (setup in proofd)
00738    TString opensock = gSystem->Getenv("ROOTOPENSOCK");
00739    if (opensock.Length() <= 0)
00740       opensock = gEnv->GetValue("ProofServ.OpenSock", "-1");
00741    Int_t sock = opensock.Atoi();
00742    if (sock <= 0) {
00743       Fatal("CreateServer", "Invalid socket descriptor number (%d)", sock);
00744       return -1;
00745    }
00746    fSocket = new TSocket(sock);
00747 
00748    // Set compression level, if any
00749    fSocket->SetCompressionLevel(fCompressMsg);
00750 
00751    // debug hooks
00752    if (IsMaster()) {
00753       // wait (loop) in master to allow debugger to connect
00754       if (gEnv->GetValue("Proof.GdbHook",0) == 1) {
00755          while (gProofServDebug)
00756             ;
00757       }
00758    } else {
00759       // wait (loop) in slave to allow debugger to connect
00760       if (gEnv->GetValue("Proof.GdbHook",0) == 2) {
00761          while (gProofServDebug)
00762             ;
00763       }
00764    }
00765 
00766    if (gProofDebugLevel > 0)
00767       Info("CreateServer", "Service %s ConfDir %s IsMaster %d\n",
00768            GetService(), GetConfDir(), (Int_t)fMasterServ);
00769 
00770    if (Setup() != 0) {
00771       // Setup failure
00772       LogToMaster();
00773       SendLogFile();
00774       Terminate(0);
00775       return -1;
00776    }
00777 
00778    // Set the default prefix in the form '<role>-<ordinal>' (it was already done
00779    // in the constructor, but for standard PROOF the ordinal number is only set in
00780    // Setup(), so we need to do it again here)
00781    TString pfx = (IsMaster() ? "Mst-" : "Wrk-");
00782    pfx += GetOrdinal();
00783    TProofServLogHandler::SetDefaultPrefix(pfx);
00784 
00785    if (!fLogFile) {
00786       RedirectOutput();
00787       // If for some reason we failed setting a redirection fole for the logs
00788       // we cannot continue
00789       if (!fLogFile || (fLogFileDes = fileno(fLogFile)) < 0) {
00790          LogToMaster();
00791          SendLogFile(-98);
00792          Terminate(0);
00793          return -1;
00794       }
00795    } else {
00796       // Use the file already open by pmain
00797       if ((fLogFileDes = fileno(fLogFile)) < 0) {
00798          LogToMaster();
00799          SendLogFile(-98);
00800          Terminate(0);
00801          return -1;
00802       }
00803    }
00804 
00805    // Send message of the day to the client
00806    if (IsMaster()) {
00807       if (CatMotd() == -1) {
00808          LogToMaster();
00809          SendLogFile(-99);
00810          Terminate(0);
00811          return -1;
00812       }
00813    }
00814 
00815    // Everybody expects iostream to be available, so load it...
00816    ProcessLine("#include <iostream>", kTRUE);
00817    ProcessLine("#include <string>",kTRUE); // for std::string iostream.
00818 
00819    // The following libs are also useful to have, make sure they are loaded...
00820    //gROOT->LoadClass("TMinuit",     "Minuit");
00821    //gROOT->LoadClass("TPostScript", "Postscript");
00822 
00823    // Load user functions
00824    const char *logon;
00825    logon = gEnv->GetValue("Proof.Load", (char *)0);
00826    if (logon) {
00827       char *mac = gSystem->Which(TROOT::GetMacroPath(), logon, kReadPermission);
00828       if (mac)
00829          ProcessLine(TString::Format(".L %s", logon), kTRUE);
00830       delete [] mac;
00831    }
00832 
00833    // Execute logon macro
00834    logon = gEnv->GetValue("Proof.Logon", (char *)0);
00835    if (logon && !NoLogOpt()) {
00836       char *mac = gSystem->Which(TROOT::GetMacroPath(), logon, kReadPermission);
00837       if (mac)
00838          ProcessFile(logon);
00839       delete [] mac;
00840    }
00841 
00842    // Save current interpreter context
00843    gInterpreter->SaveContext();
00844    gInterpreter->SaveGlobalsContext();
00845 
00846    // Install interrupt and message input handlers
00847    gSystem->AddSignalHandler(new TProofServTerminationHandler(this));
00848    gSystem->AddSignalHandler(new TProofServInterruptHandler(this));
00849    fInputHandler = new TProofServInputHandler(this, sock);
00850    gSystem->AddFileHandler(fInputHandler);
00851 
00852    // if master, start slave servers
00853    if (IsMaster()) {
00854       TString master = "proof://__master__";
00855       TInetAddress a = gSystem->GetSockName(sock);
00856       if (a.IsValid()) {
00857          master += ":";
00858          master += a.GetPort();
00859       }
00860 
00861       // Get plugin manager to load appropriate TProof from
00862       TPluginManager *pm = gROOT->GetPluginManager();
00863       if (!pm) {
00864          Error("CreateServer", "no plugin manager found");
00865          SendLogFile(-99);
00866          Terminate(0);
00867          return -1;
00868       }
00869 
00870       // Find the appropriate handler
00871       TPluginHandler *h = pm->FindHandler("TProof", fConfFile);
00872       if (!h) {
00873          Error("CreateServer", "no plugin found for TProof with a"
00874                              " config file of '%s'", fConfFile.Data());
00875          SendLogFile(-99);
00876          Terminate(0);
00877          return -1;
00878       }
00879 
00880       // load the plugin
00881       if (h->LoadPlugin() == -1) {
00882          Error("CreateServer", "plugin for TProof could not be loaded");
00883          SendLogFile(-99);
00884          Terminate(0);
00885          return -1;
00886       }
00887 
00888       // make instance of TProof
00889       fProof = reinterpret_cast<TProof*>(h->ExecPlugin(5, master.Data(),
00890                                                           fConfFile.Data(),
00891                                                           GetConfDir(),
00892                                                           fLogLevel, 0));
00893       if (!fProof || !fProof->IsValid()) {
00894          Error("CreateServer", "plugin for TProof could not be executed");
00895          SafeDelete(fProof);
00896          SendLogFile(-99);
00897          Terminate(0);
00898          return -1;
00899       }
00900       // Find out if we are a master in direct contact only with workers
00901       fEndMaster = fProof->IsEndMaster();
00902 
00903       SendLogFile();
00904    }
00905 
00906    // Setup the shutdown timer
00907    if (!fShutdownTimer) {
00908       // Check activity on socket every 5 mins
00909       fShutdownTimer = new TShutdownTimer(this, 300000);
00910       fShutdownTimer->Start(-1, kFALSE);
00911    }
00912 
00913    // Check if schema evolution is effective: clients running versions <=17 do not
00914    // support that: send a warning message
00915    if (fProtocol <= 17) {
00916       TString msg;
00917       msg.Form("Warning: client version is too old: automatic schema evolution is ineffective.\n"
00918                "         This may generate compatibility problems between streamed objects.\n"
00919                "         The advise is to move to ROOT >= 5.21/02 .");
00920       SendAsynMessage(msg.Data());
00921    }
00922 
00923    // Setup the idle timer
00924    if (IsMaster() && !fIdleTOTimer) {
00925       // Check activity on socket every 5 mins
00926       Int_t idle_to = gEnv->GetValue("ProofServ.IdleTimeout", -1);
00927       if (idle_to > 0) {
00928          fIdleTOTimer = new TIdleTOTimer(this, idle_to * 1000);
00929          fIdleTOTimer->Start(-1, kTRUE);
00930          if (gProofDebugLevel > 0)
00931             Info("CreateServer", " idle timer started (%d secs)", idle_to);
00932       } else if (gProofDebugLevel > 0) {
00933          Info("CreateServer", " idle timer not started (no idle timeout requested)");
00934       }
00935    }
00936 
00937    // Done
00938    return 0;
00939 }
00940 
00941 //______________________________________________________________________________
00942 TProofServ::~TProofServ()
00943 {
00944    // Cleanup. Not really necessary since after this dtor there is no
00945    // live anyway.
00946 
00947    SafeDelete(fWaitingQueries);
00948    SafeDelete(fQMtx);
00949    SafeDelete(fEnabledPackages);
00950    SafeDelete(fSocket);
00951    SafeDelete(fPackageLock);
00952    SafeDelete(fCacheLock);
00953    SafeDelete(fQueryLock);
00954    SafeDelete(fGlobalPackageDirList);
00955    close(fLogFileDes);
00956 }
00957 
00958 //______________________________________________________________________________
00959 Int_t TProofServ::CatMotd()
00960 {
00961    // Print message of the day (in the file pointed by the env PROOFMOTD
00962    // or from fConfDir/etc/proof/motd). The motd is not shown more than
00963    // once a day. If the file pointed by env PROOFNOPROOF exists (or the
00964    // file fConfDir/etc/proof/noproof exists), show its contents and close
00965    // the connection.
00966 
00967    TString lastname;
00968    FILE   *motd;
00969    Bool_t  show = kFALSE;
00970 
00971    // If we are disabled just print the message and close the connection
00972    TString motdname(GetConfDir());
00973    // The env variable PROOFNOPROOF allows to put the file in an alternative
00974    // location not overwritten by a new installation
00975    if (gSystem->Getenv("PROOFNOPROOF")) {
00976       motdname = gSystem->Getenv("PROOFNOPROOF");
00977    } else {
00978       motdname += "/etc/proof/noproof";
00979    }
00980    if ((motd = fopen(motdname, "r"))) {
00981       Int_t c;
00982       printf("\n");
00983       while ((c = getc(motd)) != EOF)
00984          putchar(c);
00985       fclose(motd);
00986       printf("\n");
00987 
00988       return -1;
00989    }
00990 
00991    // get last modification time of the file ~/proof/.prooflast
00992    lastname = TString(GetWorkDir()) + "/.prooflast";
00993    char *last = gSystem->ExpandPathName(lastname.Data());
00994    Long64_t size;
00995    Long_t id, flags, modtime, lasttime;
00996    if (gSystem->GetPathInfo(last, &id, &size, &flags, &lasttime) == 1)
00997       lasttime = 0;
00998 
00999    // show motd at least once per day
01000    if (time(0) - lasttime > (time_t)86400)
01001       show = kTRUE;
01002 
01003    // The env variable PROOFMOTD allows to put the file in an alternative
01004    // location not overwritten by a new installation
01005    if (gSystem->Getenv("PROOFMOTD")) {
01006       motdname = gSystem->Getenv("PROOFMOTD");
01007    } else {
01008       motdname = GetConfDir();
01009       motdname += "/etc/proof/motd";
01010    }
01011    if (gSystem->GetPathInfo(motdname, &id, &size, &flags, &modtime) == 0) {
01012       if (modtime > lasttime || show) {
01013          if ((motd = fopen(motdname, "r"))) {
01014             Int_t c;
01015             printf("\n");
01016             while ((c = getc(motd)) != EOF)
01017                putchar(c);
01018             fclose(motd);
01019             printf("\n");
01020          }
01021       }
01022    }
01023 
01024    if (lasttime)
01025       gSystem->Unlink(last);
01026    Int_t fd = creat(last, 0600);
01027    if (fd >= 0) close(fd);
01028    delete [] last;
01029 
01030    return 0;
01031 }
01032 
01033 //______________________________________________________________________________
01034 TObject *TProofServ::Get(const char *namecycle)
01035 {
01036    // Get object with name "name;cycle" (e.g. "aap;2") from master or client.
01037    // This method is called by TDirectory::Get() in case the object can not
01038    // be found locally.
01039 
01040    fSocket->Send(namecycle, kPROOF_GETOBJECT);
01041 
01042    TObject *idcur = 0;
01043 
01044    Bool_t notdone = kTRUE;
01045    while (notdone) {
01046       TMessage *mess = 0;
01047       if (fSocket->Recv(mess) < 0)
01048          return 0;
01049       Int_t what = mess->What();
01050       if (what == kMESS_OBJECT) {
01051          idcur = mess->ReadObject(mess->GetClass());
01052          notdone = kFALSE;
01053       } else {
01054          Int_t xrc = HandleSocketInput(mess, kFALSE);
01055          if (xrc == -1) {
01056             Error("Get", "command %d cannot be executed while processing", what);
01057          } else if (xrc == -2) {
01058             Error("Get", "unknown command %d ! Protocol error?", what);
01059          }
01060       }
01061       delete mess;
01062    }
01063 
01064    return idcur;
01065 }
01066 
01067 //______________________________________________________________________________
01068 void TProofServ::RestartComputeTime()
01069 {
01070    // Reset the compute time
01071 
01072    fCompute.Stop();
01073    if (fPlayer) {
01074       TProofProgressStatus *status = fPlayer->GetProgressStatus();
01075       if (status) status->SetLearnTime(fCompute.RealTime());
01076       Info("RestartComputeTime", "compute time restarted after %f secs (%d entries)",
01077                                  fCompute.RealTime(), fPlayer->GetLearnEntries());
01078    }
01079    fCompute.Start(kFALSE);
01080 }
01081 
01082 //______________________________________________________________________________
01083 TDSetElement *TProofServ::GetNextPacket(Long64_t totalEntries)
01084 {
01085    // Get next range of entries to be processed on this server.
01086 
01087    Long64_t bytesRead = 0;
01088 
01089    if (gPerfStats) bytesRead = gPerfStats->GetBytesRead();
01090 
01091    if (fCompute.Counter() > 0)
01092       fCompute.Stop();
01093 
01094    TMessage req(kPROOF_GETPACKET);
01095    Double_t cputime = fCompute.CpuTime();
01096    Double_t realtime = fCompute.RealTime();
01097 
01098    // Apply inflate factor, if needed
01099    PDB(kLoop, 2)
01100       Info("GetNextPacket", "inflate factor: %d"
01101                             " (realtime: %f, cputime: %f, entries: %lld)",
01102                             fInflateFactor, realtime, cputime, totalEntries);
01103    if (fInflateFactor > 1000) {
01104       UInt_t sleeptime = (UInt_t) (cputime * (fInflateFactor - 1000)) ;
01105       Int_t i = 0;
01106       for (i = kSigBus ; i <= kSigUser2 ; i++)
01107          gSystem->IgnoreSignal((ESignals)i, kTRUE);
01108       gSystem->Sleep(sleeptime);
01109       for (i = kSigBus ; i <= kSigUser2 ; i++)
01110          gSystem->IgnoreSignal((ESignals)i, kFALSE);
01111       realtime += sleeptime / 1000.;
01112       PDB(kLoop, 2)
01113          Info("GetNextPacket","slept %d millisec", sleeptime);
01114    }
01115 
01116    if (fProtocol > 18) {
01117       req << fLatency.RealTime();
01118       TProofProgressStatus *status = 0;
01119       if (fPlayer)
01120          status = fPlayer->GetProgressStatus();
01121       else {
01122          Error("GetNextPacket", "no progress status object");
01123          return 0;
01124       }
01125       // the CPU and wallclock proc times are kept in the TProofServ and here
01126       // added to the status object in the fPlayer.
01127       if (status->GetEntries() > 0) {
01128          PDB(kLoop, 2) status->Print(GetOrdinal());
01129          status->IncProcTime(realtime);
01130          status->IncCPUTime(cputime);
01131       }
01132       req << status;
01133       // Send tree cache information
01134       Long64_t cacheSize = (fPlayer) ? fPlayer->GetCacheSize() : -1;
01135       Int_t learnent = (fPlayer) ? fPlayer->GetLearnEntries() : -1;
01136       req << cacheSize << learnent;
01137 
01138       // Sent over the number of entries in the file, used by packetizer do not relying
01139       // on initial validation. Also, -1 means that the file could not be open, which is
01140       // used to flag files as missing
01141       req << totalEntries;
01142 
01143       PDB(kLoop, 1) {
01144          PDB(kLoop, 2) status->Print();
01145          Info("GetNextPacket","cacheSize: %lld, learnent: %d", cacheSize, learnent);
01146       }
01147       status = 0; // status is owned by the player.
01148    } else {
01149       req << fLatency.RealTime() << realtime << cputime
01150           << bytesRead << totalEntries;
01151       if (fPlayer)
01152          req << fPlayer->GetEventsProcessed();
01153    }
01154 
01155    fLatency.Start();
01156    Int_t rc = fSocket->Send(req);
01157    if (rc <= 0) {
01158       Error("GetNextPacket","Send() failed, returned %d", rc);
01159       return 0;
01160    }
01161 
01162    TDSetElement  *e = 0;
01163    Bool_t notdone = kTRUE;
01164    while (notdone) {
01165 
01166       TMessage *mess;
01167       if ((rc = fSocket->Recv(mess)) <= 0) {
01168          fLatency.Stop();
01169          Error("GetNextPacket","Recv() failed, returned %d", rc);
01170          return 0;
01171       }
01172 
01173       Int_t xrc = 0;
01174       TString file, dir, obj;
01175 
01176       Int_t what = mess->What();
01177 
01178       switch (what) {
01179          case kPROOF_GETPACKET:
01180 
01181             fLatency.Stop();
01182             (*mess) >> e;
01183             if (e != 0) {
01184                fCompute.Start();
01185                PDB(kLoop, 2) Info("GetNextPacket", "'%s' '%s' '%s' %lld %lld",
01186                                  e->GetFileName(), e->GetDirectory(),
01187                                  e->GetObjName(), e->GetFirst(),e->GetNum());
01188             } else {
01189                PDB(kLoop, 2) Info("GetNextPacket", "Done");
01190             }
01191             notdone = kFALSE;
01192             break;
01193 
01194          case kPROOF_STOPPROCESS:
01195             // if a kPROOF_STOPPROCESS message is returned to kPROOF_GETPACKET
01196             // GetNextPacket() will return 0 and the TPacketizer and hence
01197             // TEventIter will be stopped
01198             fLatency.Stop();
01199             PDB(kLoop, 2) Info("GetNextPacket:kPROOF_STOPPROCESS","received");
01200             break;
01201 
01202          default:
01203             xrc = HandleSocketInput(mess, kFALSE);
01204             if (xrc == -1) {
01205                Error("GetNextPacket", "command %d cannot be executed while processing", what);
01206             } else if (xrc == -2) {
01207                Error("GetNextPacket", "unknown command %d ! Protocol error?", what);
01208             }
01209             break;
01210       }
01211 
01212       delete mess;
01213 
01214    }
01215 
01216    // Done
01217    return e;
01218 }
01219 
01220 //______________________________________________________________________________
01221 void TProofServ::GetOptions(Int_t *argc, char **argv)
01222 {
01223    // Get and handle command line options. Fixed format:
01224    // "proofserv"|"proofslave" <confdir>
01225 
01226    if (*argc <= 1) {
01227       Fatal("GetOptions", "Must be started from proofd with arguments");
01228       exit(1);
01229    }
01230 
01231    if (!strcmp(argv[1], "proofserv")) {
01232       fMasterServ = kTRUE;
01233       fEndMaster = kTRUE;
01234    } else if (!strcmp(argv[1], "proofslave")) {
01235       fMasterServ = kFALSE;
01236       fEndMaster = kFALSE;
01237    } else {
01238       Fatal("GetOptions", "Must be started as 'proofserv' or 'proofslave'");
01239       exit(1);
01240    }
01241 
01242    fService = argv[1];
01243 
01244    // Confdir
01245    if (!(gSystem->Getenv("ROOTCONFDIR"))) {
01246       Fatal("GetOptions", "ROOTCONFDIR shell variable not set");
01247       exit(1);
01248    }
01249    fConfDir = gSystem->Getenv("ROOTCONFDIR");
01250 }
01251 
01252 //______________________________________________________________________________
01253 void TProofServ::HandleSocketInput()
01254 {
01255    // Handle input coming from the client or from the master server.
01256 
01257    // The idle timeout guard: stops the timer and restarts when we return from here
01258    TIdleTOTimerGuard itg(fIdleTOTimer);
01259 
01260    Bool_t all = (fgRecursive > 0) ? kFALSE : kTRUE;
01261    fgRecursive++;
01262 
01263    TMessage *mess;
01264    Int_t rc = 0;
01265    TString exmsg;
01266 
01267    try {
01268    
01269       // Get message
01270       if (fSocket->Recv(mess) <= 0 || !mess) {
01271          // Pending: do something more intelligent here
01272          // but at least get a message in the log file
01273          Error("HandleSocketInput", "retrieving message from input socket");
01274          Terminate(0);
01275          return;
01276       }
01277       Int_t what = mess->What();
01278       PDB(kCollect, 1)
01279          Info("HandleSocketInput", "got type %d from '%s'", what, fSocket->GetTitle());
01280 
01281       fNcmd++;
01282 
01283       if (fProof) fProof->SetActive();
01284 
01285       Bool_t doit = kTRUE;
01286 
01287       while (doit) {
01288 
01289          // Process the message
01290          rc = HandleSocketInput(mess, all);
01291          if (rc < 0) {
01292             TString emsg;
01293             if (rc == -1) {
01294                emsg.Form("HandleSocketInput: command %d cannot be executed while processing", what);
01295             } else if (rc == -3) {
01296                emsg.Form("HandleSocketInput: message %d undefined! Protocol error?", what);
01297             } else {
01298                emsg.Form("HandleSocketInput: unknown command %d! Protocol error?", what);
01299             }
01300             SendAsynMessage(emsg.Data());
01301          } else if (rc == 2) {
01302             // Add to the queue
01303             fQueuedMsg->Add(mess);
01304             PDB(kGlobal, 1)
01305                Info("HandleSocketInput", "message of type %d enqueued; sz: %d",
01306                                           what, fQueuedMsg->GetSize());
01307             mess = 0;
01308          }
01309 
01310          // Still something to do?
01311          doit = 0;
01312          if (fgRecursive == 1 && fQueuedMsg->GetSize() > 0) {
01313             // Add to the queue
01314             PDB(kCollect, 1)
01315                Info("HandleSocketInput", "processing enqueued message of type %d; left: %d",
01316                                           what, fQueuedMsg->GetSize());
01317             all = 1;
01318             SafeDelete(mess);
01319             mess = (TMessage *) fQueuedMsg->First();
01320             if (mess) fQueuedMsg->Remove(mess);
01321             doit = 1;
01322          }
01323       }
01324    
01325    } catch (std::bad_alloc &) {
01326       // Memory allocation problem:
01327       exmsg.Form("caught exception 'bad_alloc' (memory leak?) %s", fgLastMsg.Data());
01328    } catch (std::exception &exc) {
01329       // Standard exception caught
01330       exmsg.Form("caught standard exception '%s' %s", exc.what(), fgLastMsg.Data());
01331    } catch (int i) {
01332       // Other exception caught
01333       exmsg.Form("caught exception throwing %d %s", i, fgLastMsg.Data());
01334    } catch (const char *str) {
01335       // Other exception caught
01336       exmsg.Form("caught exception throwing '%s' %s", str, fgLastMsg.Data());
01337    } catch (...) {
01338       // Caught other exception
01339       exmsg.Form("caught exception <unknown> %s", fgLastMsg.Data());
01340    }
01341 
01342    // Terminate on exception
01343    if (!exmsg.IsNull()) {
01344       // Save info in the log file too
01345       Error("HandleSocketInput", "%s", exmsg.Data());
01346       // Try to warn the user
01347       SendAsynMessage(TString::Format("%s: %s", GetOrdinal(), exmsg.Data()));
01348       // Terminate
01349       Terminate(0);
01350    }
01351    
01352    // Terminate also if a high memory footprint was detected before the related
01353    // exception was thrwon
01354    if (TestBit(TProofServ::kHighMemory)) {
01355       // Save info in the log file too
01356       exmsg.Form("high-memory footprint detected during Process(...) - terminating");
01357       Error("HandleSocketInput", "%s", exmsg.Data());
01358       // Try to warn the user
01359       SendAsynMessage(TString::Format("%s: %s", GetOrdinal(), exmsg.Data()));
01360       // Terminate
01361       Terminate(0);
01362    }
01363 
01364    fgRecursive--;
01365 
01366    if (fProof) {
01367       // If something wrong went on during processing and we do not have
01368       // any worker anymore, we shutdown this session
01369       Bool_t masterOnly = gEnv->GetValue("Proof.MasterOnly", kFALSE);
01370       Int_t ngwrks = fProof->GetListOfActiveSlaves()->GetSize() + fProof->GetListOfInactiveSlaves()->GetSize();
01371       if (rc == 0 && ngwrks == 0 && !masterOnly) {
01372          SendAsynMessage(" *** No workers left: cannot continue! Terminating ... *** ");
01373          Terminate(0);
01374       }
01375       fProof->SetActive(kFALSE);
01376       // Reset PROOF to running state
01377       fProof->SetRunStatus(TProof::kRunning);
01378    }
01379 
01380    // Cleanup
01381    SafeDelete(mess);
01382 }
01383 
01384 //______________________________________________________________________________
01385 Int_t TProofServ::HandleSocketInput(TMessage *mess, Bool_t all)
01386 {
01387    // Process input coming from the client or from the master server.
01388    // If 'all' is kFALSE, process only those messages that can be handled
01389    // during qurey processing.
01390    // Returns -1 if the message could not be processed, <-1 if something went
01391    // wrong. Returns 1 if the action may have changed the parallel state.
01392    // Returns 2 if the message has to be enqueued.
01393    // Returns 0 otherwise
01394 
01395    static TStopwatch timer;
01396    char str[2048];
01397    Bool_t aborted = kFALSE;
01398 
01399    if (!mess) return -3;
01400 
01401    Int_t what = mess->What();
01402    PDB(kCollect, 1)
01403       Info("HandleSocketInput", "processing message type %d from '%s'",
01404                                 what, fSocket->GetTitle());
01405 
01406    timer.Start();
01407 
01408    Int_t rc = 0;
01409    TString slb;
01410    TString *pslb = (fgLogToSysLog > 0) ? &slb : (TString *)0;
01411 
01412    switch (what) {
01413 
01414       case kMESS_CINT:
01415          if (all) {
01416             mess->ReadString(str, sizeof(str));
01417             // Make sure that the relevant files are available
01418             TString fn;
01419             if (TProof::GetFileInCmd(str, fn))
01420                CopyFromCache(fn, 1);
01421             if (IsParallel()) {
01422                fProof->SendCommand(str);
01423             } else {
01424                PDB(kGlobal, 1)
01425                   Info("HandleSocketInput:kMESS_CINT", "processing: %s...", str);
01426                ProcessLine(str);
01427             }
01428             LogToMaster();
01429          } else {
01430             rc = -1;
01431          }
01432          SendLogFile();
01433          if (pslb) slb = str;
01434          break;
01435 
01436       case kMESS_STRING:
01437          if (all) {
01438             mess->ReadString(str, sizeof(str));
01439          } else {
01440             rc = -1;
01441          }
01442          break;
01443 
01444       case kMESS_OBJECT:
01445          if (all) {
01446             mess->ReadObject(mess->GetClass());
01447          } else {
01448             rc = -1;
01449          }
01450          break;
01451 
01452       case kPROOF_GROUPVIEW:
01453          if (all) {
01454             mess->ReadString(str, sizeof(str));
01455             // coverity[secure_coding]
01456             sscanf(str, "%d %d", &fGroupId, &fGroupSize);
01457          } else {
01458             rc = -1;
01459          }
01460          break;
01461 
01462       case kPROOF_LOGLEVEL:
01463          {  UInt_t mask;
01464             mess->ReadString(str, sizeof(str));
01465             sscanf(str, "%d %u", &fLogLevel, &mask);
01466             gProofDebugLevel = fLogLevel;
01467             gProofDebugMask  = (TProofDebug::EProofDebugMask) mask;
01468             if (IsMaster())
01469                fProof->SetLogLevel(fLogLevel, mask);
01470          }
01471          break;
01472 
01473       case kPROOF_PING:
01474          {  if (IsMaster())
01475                fProof->Ping();
01476             // do nothing (ping is already acknowledged)
01477          }
01478          break;
01479 
01480       case kPROOF_PRINT:
01481          mess->ReadString(str, sizeof(str));
01482          Print(str);
01483          LogToMaster();
01484          SendLogFile();
01485          break;
01486 
01487       case kPROOF_RESET:
01488          if (all) {
01489             mess->ReadString(str, sizeof(str));
01490             Reset(str);
01491          } else {
01492             rc = -1;
01493          }
01494          break;
01495 
01496       case kPROOF_STATUS:
01497          Warning("HandleSocketInput:kPROOF_STATUS",
01498                "kPROOF_STATUS message is obsolete");
01499          fSocket->Send(fProof->GetParallel(), kPROOF_STATUS);
01500          break;
01501 
01502       case kPROOF_GETSTATS:
01503          SendStatistics();
01504          break;
01505 
01506       case kPROOF_GETPARALLEL:
01507          SendParallel();
01508          break;
01509 
01510       case kPROOF_STOP:
01511          if (all) {
01512             if (IsMaster()) {
01513                TString ord;
01514                *mess >> ord;
01515                PDB(kGlobal, 1)
01516                   Info("HandleSocketInput:kPROOF_STOP", "request for worker %s", ord.Data());
01517                if (fProof) fProof->TerminateWorker(ord);
01518             } else {
01519                PDB(kGlobal, 1)
01520                   Info("HandleSocketInput:kPROOF_STOP", "got request to terminate");
01521                Terminate(0);
01522             }
01523          } else {
01524             rc = -1;
01525          }
01526          break;
01527 
01528       case kPROOF_STOPPROCESS:
01529          if (all) {
01530             // this message makes only sense when the query is being processed,
01531             // however the message can also be received if the user pressed
01532             // ctrl-c, so ignore it!
01533             PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_STOPPROCESS","enter");
01534          } else {
01535             Long_t timeout = -1;
01536             (*mess) >> aborted;
01537             if (fProtocol > 9)
01538                (*mess) >> timeout;
01539             PDB(kGlobal, 1)
01540                Info("HandleSocketInput:kPROOF_STOPPROCESS",
01541                     "recursive mode: enter %d, %ld", aborted, timeout);
01542             if (fProof)
01543                // On the master: propagate further
01544                fProof->StopProcess(aborted, timeout);
01545             else
01546                // Worker: actually stop processing
01547                if (fPlayer)
01548                   fPlayer->StopProcess(aborted, timeout);
01549          }
01550          break;
01551 
01552       case kPROOF_PROCESS:
01553          {
01554             TProofServLogHandlerGuard hg(fLogFile, fSocket, "", fRealTimeLog);
01555             PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_PROCESS","enter");
01556             HandleProcess(mess, pslb);
01557             // The log file is send either in HandleProcess or HandleSubmergers.
01558             // The reason is that the order of various messages depend on the
01559             // processing mode (sync/async) and/or merging mode
01560          }
01561          break;
01562 
01563       case kPROOF_QUERYLIST:
01564          {
01565             HandleQueryList(mess);
01566             // Notify
01567             SendLogFile();
01568          }
01569          break;
01570 
01571       case kPROOF_REMOVE:
01572          {
01573             HandleRemove(mess, pslb);
01574             // Notify
01575             SendLogFile();
01576          }
01577          break;
01578 
01579       case kPROOF_RETRIEVE:
01580          {
01581             HandleRetrieve(mess, pslb);
01582             // Notify
01583             SendLogFile();
01584          }
01585          break;
01586 
01587       case kPROOF_ARCHIVE:
01588          {
01589             HandleArchive(mess, pslb);
01590             // Notify
01591             SendLogFile();
01592          }
01593          break;
01594 
01595       case kPROOF_MAXQUERIES:
01596          {  PDB(kGlobal, 1)
01597                Info("HandleSocketInput:kPROOF_MAXQUERIES", "Enter");
01598             TMessage m(kPROOF_MAXQUERIES);
01599             m << fMaxQueries;
01600             fSocket->Send(m);
01601             // Notify
01602             SendLogFile();
01603          }
01604          break;
01605 
01606       case kPROOF_CLEANUPSESSION:
01607          if (all) {
01608             PDB(kGlobal, 1)
01609                Info("HandleSocketInput:kPROOF_CLEANUPSESSION", "Enter");
01610             TString stag;
01611             (*mess) >> stag;
01612             if (fQMgr && fQMgr->CleanupSession(stag) == 0) {
01613                Printf("Session %s cleaned up", stag.Data());
01614             } else {
01615                Printf("Could not cleanup session %s", stag.Data());
01616             }
01617          } else {
01618             rc = -1;
01619          }
01620          // Notify
01621          SendLogFile();
01622          break;
01623 
01624       case kPROOF_GETENTRIES:
01625          {  PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETENTRIES", "Enter");
01626             Bool_t         isTree;
01627             TString        filename;
01628             TString        dir;
01629             TString        objname("undef");
01630             Long64_t       entries = -1;
01631 
01632             if (all) {
01633                (*mess) >> isTree >> filename >> dir >> objname;
01634                PDB(kGlobal, 2) Info("HandleSocketInput:kPROOF_GETENTRIES",
01635                                     "Report size of object %s (%s) in dir %s in file %s",
01636                                     objname.Data(), isTree ? "T" : "O",
01637                                     dir.Data(), filename.Data());
01638                entries = TDSet::GetEntries(isTree, filename, dir, objname);
01639                PDB(kGlobal, 2) Info("HandleSocketInput:kPROOF_GETENTRIES",
01640                                     "Found %lld %s", entries, isTree ? "entries" : "objects");
01641             } else {
01642                rc = -1;
01643             }
01644             TMessage answ(kPROOF_GETENTRIES);
01645             answ << entries << objname;
01646             SendLogFile(); // in case of error messages
01647             fSocket->Send(answ);
01648             PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETENTRIES", "Done");
01649          }
01650          break;
01651 
01652       case kPROOF_CHECKFILE:
01653          if (!all && fProtocol <= 19) {
01654             // Come back later
01655             rc = 2;
01656          } else {
01657             // Handle file checking request
01658             HandleCheckFile(mess, pslb);
01659          }
01660          break;
01661 
01662       case kPROOF_SENDFILE:
01663          if (!all && fProtocol <= 19) {
01664             // Come back later
01665             rc = 2;
01666          } else {
01667             mess->ReadString(str, sizeof(str));
01668             Long_t size;
01669             Int_t  bin, fw = 1;
01670             char   name[1024];
01671             if (fProtocol > 5) {
01672                sscanf(str, "%1023s %d %ld %d", name, &bin, &size, &fw);
01673             } else {
01674                sscanf(str, "%1023s %d %ld", name, &bin, &size);
01675             }
01676             TString fnam(name);
01677             Bool_t copytocache = kTRUE;
01678             if (fnam.BeginsWith("cache:")) {
01679                fnam.ReplaceAll("cache:", TString::Format("%s/", fCacheDir.Data()));
01680                copytocache = kFALSE;
01681             }
01682             if (size > 0) {
01683                ReceiveFile(fnam, bin ? kTRUE : kFALSE, size);
01684             } else {
01685                // Take it from the cache
01686                if (!fnam.BeginsWith(fCacheDir.Data())) {
01687                   fnam.Insert(0, TString::Format("%s/", fCacheDir.Data()));
01688                }
01689             }
01690             // copy file to cache if not a PAR file
01691             if (copytocache && size > 0 &&
01692                 strncmp(fPackageDir, name, fPackageDir.Length()))
01693                CopyToCache(name, 0);
01694             if (IsMaster() && fw == 1) {
01695                Int_t opt = TProof::kForward | TProof::kCp;
01696                if (bin)
01697                   opt |= TProof::kBinary;
01698                PDB(kGlobal, 1)
01699                   Info("HandleSocketInput","forwarding file: %s", fnam.Data());
01700                if (fProof->SendFile(fnam, opt, (copytocache ? "cache" : "")) < 0) {
01701                   Error("HandleSocketInput", "forwarding file: %s", fnam.Data());
01702                }
01703             }
01704             if (fProtocol > 19) fSocket->Send(kPROOF_SENDFILE);
01705          }
01706          break;
01707 
01708       case kPROOF_LOGFILE:
01709          {
01710             Int_t start, end;
01711             (*mess) >> start >> end;
01712             PDB(kGlobal, 1)
01713                Info("HandleSocketInput:kPROOF_LOGFILE",
01714                     "Logfile request - byte range: %d - %d", start, end);
01715 
01716             LogToMaster();
01717             SendLogFile(0, start, end);
01718          }
01719          break;
01720 
01721       case kPROOF_PARALLEL:
01722          if (all) {
01723             if (IsMaster()) {
01724                Int_t nodes;
01725                Bool_t random = kFALSE;
01726                (*mess) >> nodes;
01727                if ((mess->BufferSize() > mess->Length()))
01728                   (*mess) >> random;
01729                if (fProof) fProof->SetParallel(nodes, random);
01730                rc = 1;
01731             }
01732          } else {
01733             rc = -1;
01734          }
01735          // Notify
01736          SendLogFile();
01737          break;
01738 
01739       case kPROOF_CACHE:
01740          if (!all && fProtocol <= 19) {
01741             // Come back later
01742             rc = 2;
01743          } else {
01744             TProofServLogHandlerGuard hg(fLogFile, fSocket, "", fRealTimeLog);
01745             PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_CACHE","enter");
01746             Int_t status = HandleCache(mess, pslb);
01747             // Notify
01748             SendLogFile(status);
01749          }
01750          break;
01751 
01752       case kPROOF_WORKERLISTS:
01753          if (all) {
01754             if (IsMaster())
01755                HandleWorkerLists(mess);
01756             else
01757                Warning("HandleSocketInput:kPROOF_WORKERLISTS",
01758                        "Action meaning-less on worker nodes: protocol error?");
01759          } else {
01760             rc = -1;
01761          }
01762          // Notify
01763          SendLogFile();
01764          break;
01765 
01766       case kPROOF_GETSLAVEINFO:
01767          if (all) {
01768             PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETSLAVEINFO", "Enter");
01769             if (IsMaster()) {
01770                TList *info = fProof->GetListOfSlaveInfos();
01771                TMessage answ(kPROOF_GETSLAVEINFO);
01772                answ << info;
01773                fSocket->Send(answ);
01774             } else {
01775                TMessage answ(kPROOF_GETSLAVEINFO);
01776                TList *info = new TList;
01777                TSlaveInfo *wi = new TSlaveInfo(GetOrdinal(), TUrl(gSystem->HostName()).GetHostFQDN(), 0, "", GetDataDir());
01778                SysInfo_t si;
01779                gSystem->GetSysInfo(&si);
01780                wi->SetSysInfo(si);
01781                info->Add(wi);
01782                answ << (TList *)info;
01783                fSocket->Send(answ);
01784                info->SetOwner(kTRUE);
01785                delete info;
01786             }
01787 
01788             PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETSLAVEINFO", "Done");
01789          } else {
01790             TMessage answ(kPROOF_GETSLAVEINFO);
01791             answ << (TList *)0;
01792             fSocket->Send(answ);
01793             rc = -1;
01794          }
01795          break;
01796 
01797       case kPROOF_GETTREEHEADER:
01798          if (all) {
01799             PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETTREEHEADER", "Enter");
01800 
01801             TVirtualProofPlayer *p = TVirtualProofPlayer::Create("slave", 0, fSocket);
01802             p->HandleGetTreeHeader(mess);
01803             delete p;
01804 
01805             PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETTREEHEADER", "Done");
01806          } else {
01807             TMessage answ(kPROOF_GETTREEHEADER);
01808             answ << TString("Failed") << (TObject *)0;
01809             fSocket->Send(answ);
01810             rc = -1;
01811          }
01812          break;
01813 
01814       case kPROOF_GETOUTPUTLIST:
01815          {  PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETOUTPUTLIST", "Enter");
01816             TList* outputList = 0;
01817             if (IsMaster()) {
01818                outputList = fProof->GetOutputList();
01819                if (!outputList)
01820                   outputList = new TList();
01821             } else {
01822                outputList = new TList();
01823                if (fProof->GetPlayer()) {
01824                   TList *olist = fProof->GetPlayer()->GetOutputList();
01825                   TIter next(olist);
01826                   TObject *o;
01827                   while ( (o = next()) ) {
01828                      outputList->Add(new TNamed(o->GetName(), ""));
01829                   }
01830                }
01831             }
01832             outputList->SetOwner();
01833             TMessage answ(kPROOF_GETOUTPUTLIST);
01834             answ << outputList;
01835             fSocket->Send(answ);
01836             delete outputList;
01837             PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETOUTPUTLIST", "Done");
01838          }
01839          break;
01840 
01841       case kPROOF_VALIDATE_DSET:
01842          if (all) {
01843             PDB(kGlobal, 1)
01844                Info("HandleSocketInput:kPROOF_VALIDATE_DSET", "Enter");
01845 
01846             TDSet* dset = 0;
01847             (*mess) >> dset;
01848 
01849             if (IsMaster()) fProof->ValidateDSet(dset);
01850             else dset->Validate();
01851 
01852             TMessage answ(kPROOF_VALIDATE_DSET);
01853             answ << dset;
01854             fSocket->Send(answ);
01855             delete dset;
01856             PDB(kGlobal, 1)
01857                Info("HandleSocketInput:kPROOF_VALIDATE_DSET", "Done");
01858          } else {
01859             rc = -1;
01860          }
01861          // Notify
01862          SendLogFile();
01863          break;
01864 
01865       case kPROOF_DATA_READY:
01866          if (all) {
01867             PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_DATA_READY", "Enter");
01868             TMessage answ(kPROOF_DATA_READY);
01869             if (IsMaster()) {
01870                Long64_t totalbytes = 0, bytesready = 0;
01871                Bool_t dataready = fProof->IsDataReady(totalbytes, bytesready);
01872                answ << dataready << totalbytes << bytesready;
01873             } else {
01874                Error("HandleSocketInput:kPROOF_DATA_READY",
01875                      "This message should not be sent to slaves");
01876                answ << kFALSE << Long64_t(0) << Long64_t(0);
01877             }
01878             fSocket->Send(answ);
01879             PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_DATA_READY", "Done");
01880          } else {
01881             TMessage answ(kPROOF_DATA_READY);
01882             answ << kFALSE << Long64_t(0) << Long64_t(0);
01883             fSocket->Send(answ);
01884             rc = -1;
01885          }
01886          // Notify
01887          SendLogFile();
01888          break;
01889 
01890       case kPROOF_DATASETS:
01891          {  Int_t xrc = -1;
01892             if (fProtocol > 16) {
01893                xrc = HandleDataSets(mess, pslb);
01894             } else {
01895                Error("HandleSocketInput", "old client: no or incompatible dataset support");
01896             }
01897             SendLogFile(xrc);
01898          }
01899          break;
01900 
01901       case kPROOF_SUBMERGER:
01902          {  HandleSubmerger(mess);
01903          }
01904          break;
01905 
01906       case kPROOF_LIB_INC_PATH:
01907          if (all) {
01908             HandleLibIncPath(mess);
01909          } else {
01910             rc = -1;
01911          }
01912          // Notify the client
01913          SendLogFile();
01914          break;
01915 
01916       case kPROOF_REALTIMELOG:
01917          {  Bool_t on;
01918             (*mess) >> on;
01919             PDB(kGlobal, 1)
01920                Info("HandleSocketInput:kPROOF_REALTIMELOG",
01921                     "setting real-time logging %s", (on ? "ON" : "OFF"));
01922             fRealTimeLog = on;
01923             // Forward the request to lower levels
01924             if (IsMaster())
01925                fProof->SetRealTimeLog(on);
01926          }
01927          break;
01928 
01929       case kPROOF_FORK:
01930          if (all) {
01931             HandleFork(mess);
01932             LogToMaster();
01933          } else {
01934             rc = -1;
01935          }
01936          SendLogFile();
01937          break;
01938 
01939       case kPROOF_STARTPROCESS:
01940          if (all) {
01941             // This message resumes the session; should not come during processing.
01942 
01943             if (WaitingQueries() == 0) {
01944                Error("HandleSocketInput", "no queries enqueued");
01945                break;
01946             }
01947 
01948             // Similar to handle process
01949             // get the list of workers and start them
01950             TList *workerList = (fProof->UseDynamicStartup()) ? new TList : (TList *)0;
01951             Int_t pc = 0;
01952             EQueryAction retVal = GetWorkers(workerList, pc, kTRUE);
01953 
01954             if (retVal == TProofServ::kQueryOK) {
01955                Int_t ret = 0;
01956                if (workerList && (ret = fProof->AddWorkers(workerList)) < 0) {
01957                   Error("HandleSocketInput", "adding a list of worker nodes returned: %d", ret);
01958                } else {
01959                   ProcessNext(pslb);
01960                   // Set idle
01961                   SetIdle(kTRUE);
01962                   // Signal the client that we are idle
01963                   TMessage m(kPROOF_SETIDLE);
01964                   Bool_t waiting = (WaitingQueries() > 0) ? kTRUE : kFALSE;
01965                   m << waiting;
01966                   fSocket->Send(m);
01967                }
01968             } else {
01969                if (retVal == TProofServ::kQueryStop) {
01970                   Error("HandleSocketInput", "error getting list of worker nodes");
01971                } else if (retVal != TProofServ::kQueryEnqueued) {
01972                   Warning("HandleSocketInput", "query was re-queued!");
01973                } else {
01974                   Error("HandleSocketInput", "unexpected answer: %d", retVal);
01975                   break;
01976                }
01977             }
01978 
01979          }
01980          break;
01981 
01982       case kPROOF_GOASYNC:
01983          {  // The client requested to switch to asynchronous mode:
01984             // communicate the sequential number of the running query for later
01985             // identification, if any
01986             if (!IsIdle() && fPlayer) {
01987                // Get query currently being processed
01988                TProofQueryResult *pq = (TProofQueryResult *) fPlayer->GetCurrentQuery();
01989                TMessage m(kPROOF_QUERYSUBMITTED);
01990                m << pq->GetSeqNum() << kFALSE;
01991                fSocket->Send(m);
01992             } else {
01993                // Idle or undefined: nothing to do; ignore
01994                SendAsynMessage("Processing request to go asynchronous:"
01995                                " idle or undefined player - ignoring");
01996             }
01997          }
01998          break;
01999 
02000       default:
02001          Error("HandleSocketInput", "unknown command %d", what);
02002          rc = -2;
02003          break;
02004    }
02005 
02006    fRealTime += (Float_t)timer.RealTime();
02007    fCpuTime  += (Float_t)timer.CpuTime();
02008 
02009    if (!(slb.IsNull()) || fgLogToSysLog > 1) {
02010       TString s;
02011       s.Form("%s %d %.3f %.3f %s", fgSysLogEntity.Data(),
02012                                    what, timer.RealTime(), timer.CpuTime(), slb.Data());
02013       gSystem->Syslog(kLogNotice, s.Data());
02014    }
02015 
02016    // Done
02017    return rc;
02018 }
02019 
02020 //______________________________________________________________________________
02021 Bool_t TProofServ::AcceptResults(Int_t connections, TVirtualProofPlayer *mergerPlayer)
02022 {
02023    // Accept and merge results from a set of workers
02024 
02025    TMessage *mess = new TMessage();
02026    Int_t mergedWorkers = 0;
02027 
02028    PDB(kSubmerger, 1)  Info("AcceptResults", "enter");
02029 
02030    // Overall result of this procedure
02031    Bool_t result = kTRUE;
02032 
02033    fMergingMonitor = new TMonitor();
02034    fMergingMonitor->Add(fMergingSocket);
02035 
02036    Int_t numworkers = 0;
02037    while (fMergingMonitor->GetActive() > 0 && mergedWorkers <  connections) {
02038 
02039       TSocket *s = fMergingMonitor->Select();
02040       if (!s) {
02041          Info("AcceptResults", "interrupt!");
02042          result = kFALSE;
02043          break;
02044       }
02045 
02046       if (s == fMergingSocket) {
02047          // New incoming connection
02048          TSocket *sw = fMergingSocket->Accept();
02049          fMergingMonitor->Add(sw);
02050 
02051          PDB(kSubmerger, 2)
02052             Info("AcceptResults", "connection from a worker accepted on merger %s ",
02053                                   fOrdinal.Data());
02054          // All assigned workers are connected
02055          if (++numworkers >= connections)
02056             fMergingMonitor->Remove(fMergingSocket);
02057       } else {
02058          s->Recv(mess);
02059          PDB(kSubmerger, 2)
02060             Info("AcceptResults", "message received: %d ", (mess ? mess->What() : 0));
02061          if (!mess) {
02062             Error("AcceptResults", "message received: %p ", mess);
02063             continue;
02064          }
02065          Int_t type = 0;
02066 
02067          // Read output objec(s) from the received message
02068          while ((mess->BufferSize() > mess->Length())) {
02069             (*mess) >> type;
02070 
02071             PDB(kSubmerger, 2) Info("AcceptResults", " type %d ", type);
02072             if (type == 2) {
02073                mergedWorkers++;
02074                PDB(kSubmerger, 2)
02075                   Info("AcceptResults",
02076                        "a new worker has been mergerd. Total merged workers: %d",
02077                        mergedWorkers);
02078             }
02079             TObject *o = mess->ReadObject(TObject::Class());
02080             if (mergerPlayer->AddOutputObject(o) == 1) {
02081                // Remove the object if it has been merged
02082                PDB(kSubmerger, 2)  Info("AcceptResults", "removing %p (has been merged)", o);
02083                SafeDelete(o);
02084             } else
02085                PDB(kSubmerger, 2) Info("AcceptResults", "%p not merged yet", o);
02086          }
02087       }
02088    }
02089    fMergingMonitor->DeActivateAll();
02090 
02091    TList* sockets = fMergingMonitor->GetListOfDeActives();
02092    Int_t size = sockets->GetSize();
02093    for (Int_t i =0; i< size; ++i){
02094       ((TSocket*)(sockets->At(i)))->Close();
02095       PDB(kSubmerger, 2) Info("AcceptResults", "closing socket");
02096       delete ((TSocket*)(sockets->At(i)));
02097    }
02098 
02099    fMergingMonitor->RemoveAll();
02100    SafeDelete(fMergingMonitor);
02101 
02102    PDB(kSubmerger, 2) Info("AcceptResults", "exit: %d", result);
02103    return result;
02104 }
02105 
02106 //______________________________________________________________________________
02107 void TProofServ::HandleUrgentData()
02108 {
02109    // Handle Out-Of-Band data sent by the master or client.
02110 
02111    char  oob_byte;
02112    Int_t n, nch, wasted = 0;
02113 
02114    const Int_t kBufSize = 1024;
02115    char waste[kBufSize];
02116 
02117    // Real-time notification of messages
02118    TProofServLogHandlerGuard hg(fLogFile, fSocket, "", fRealTimeLog);
02119 
02120    PDB(kGlobal, 5)
02121       Info("HandleUrgentData", "handling oob...");
02122 
02123    // Receive the OOB byte
02124    while ((n = fSocket->RecvRaw(&oob_byte, 1, kOob)) < 0) {
02125       if (n == -2) {   // EWOULDBLOCK
02126          //
02127          // The OOB data has not yet arrived: flush the input stream
02128          //
02129          // In some systems (Solaris) regular recv() does not return upon
02130          // receipt of the oob byte, which makes the below call to recv()
02131          // block indefinitely if there are no other data in the queue.
02132          // FIONREAD ioctl can be used to check if there are actually any
02133          // data to be flushed.  If not, wait for a while for the oob byte
02134          // to arrive and try to read it again.
02135          //
02136          fSocket->GetOption(kBytesToRead, nch);
02137          if (nch == 0) {
02138             gSystem->Sleep(1000);
02139             continue;
02140          }
02141 
02142          if (nch > kBufSize) nch = kBufSize;
02143          n = fSocket->RecvRaw(waste, nch);
02144          if (n <= 0) {
02145             Error("HandleUrgentData", "error receiving waste");
02146             break;
02147          }
02148          wasted = 1;
02149       } else {
02150          Error("HandleUrgentData", "error receiving OOB");
02151          return;
02152       }
02153    }
02154 
02155    PDB(kGlobal, 5)
02156       Info("HandleUrgentData", "got OOB byte: %d\n", oob_byte);
02157 
02158    if (fProof) fProof->SetActive();
02159 
02160    switch (oob_byte) {
02161 
02162       case TProof::kHardInterrupt:
02163          Info("HandleUrgentData", "*** Hard Interrupt");
02164 
02165          // If master server, propagate interrupt to slaves
02166          if (IsMaster())
02167             fProof->Interrupt(TProof::kHardInterrupt);
02168 
02169          // Flush input socket
02170          while (1) {
02171             Int_t atmark;
02172 
02173             fSocket->GetOption(kAtMark, atmark);
02174 
02175             if (atmark) {
02176                // Send the OOB byte back so that the client knows where
02177                // to stop flushing its input stream of obsolete messages
02178                n = fSocket->SendRaw(&oob_byte, 1, kOob);
02179                if (n <= 0)
02180                   Error("HandleUrgentData", "error sending OOB");
02181                break;
02182             }
02183 
02184             // find out number of bytes to read before atmark
02185             fSocket->GetOption(kBytesToRead, nch);
02186             if (nch == 0) {
02187                gSystem->Sleep(1000);
02188                continue;
02189             }
02190 
02191             if (nch > kBufSize) nch = kBufSize;
02192             n = fSocket->RecvRaw(waste, nch);
02193             if (n <= 0) {
02194                Error("HandleUrgentData", "error receiving waste (2)");
02195                break;
02196             }
02197          }
02198 
02199          SendLogFile();
02200 
02201          break;
02202 
02203       case TProof::kSoftInterrupt:
02204          Info("HandleUrgentData", "Soft Interrupt");
02205 
02206          // If master server, propagate interrupt to slaves
02207          if (IsMaster())
02208             fProof->Interrupt(TProof::kSoftInterrupt);
02209 
02210          if (wasted) {
02211             Error("HandleUrgentData", "soft interrupt flushed stream");
02212             break;
02213          }
02214 
02215          Interrupt();
02216 
02217          SendLogFile();
02218 
02219          break;
02220 
02221       case TProof::kShutdownInterrupt:
02222          Info("HandleUrgentData", "Shutdown Interrupt");
02223 
02224          // If master server, propagate interrupt to slaves
02225          if (IsMaster())
02226             fProof->Interrupt(TProof::kShutdownInterrupt);
02227 
02228          Terminate(0);
02229 
02230          break;
02231 
02232       default:
02233          Error("HandleUrgentData", "unexpected OOB byte");
02234          break;
02235    }
02236 
02237    if (fProof) fProof->SetActive(kFALSE);
02238 }
02239 
02240 //______________________________________________________________________________
02241 void TProofServ::HandleSigPipe()
02242 {
02243    // Called when the client is not alive anymore (i.e. when kKeepAlive
02244    // has failed).
02245 
02246    // Real-time notification of messages
02247    TProofServLogHandlerGuard hg(fLogFile, fSocket, "", fRealTimeLog);
02248 
02249    if (IsMaster()) {
02250       // Check if we are here because client is closed. Try to ping client,
02251       // if that works it we are here because some slave died
02252       if (fSocket->Send(kPROOF_PING | kMESS_ACK) < 0) {
02253          Info("HandleSigPipe", "keepAlive probe failed");
02254          // Tell slaves we are going to close since there is no client anymore
02255 
02256          fProof->SetActive();
02257          fProof->Interrupt(TProof::kShutdownInterrupt);
02258          fProof->SetActive(kFALSE);
02259          Terminate(0);
02260       }
02261    } else {
02262       Info("HandleSigPipe", "keepAlive probe failed");
02263       Terminate(0);  // will not return from here....
02264    }
02265 }
02266 
02267 //______________________________________________________________________________
02268 Bool_t TProofServ::IsParallel() const
02269 {
02270    // True if in parallel mode.
02271 
02272    if (IsMaster() && fProof)
02273       return fProof->IsParallel();
02274 
02275    // false in case we are a slave
02276    return kFALSE;
02277 }
02278 
02279 //______________________________________________________________________________
02280 void TProofServ::Print(Option_t *option) const
02281 {
02282    // Print status of slave server.
02283 
02284    if (IsMaster() && fProof)
02285       fProof->Print(option);
02286    else
02287       Printf("This is worker %s", gSystem->HostName());
02288 }
02289 
02290 //______________________________________________________________________________
02291 void TProofServ::RedirectOutput(const char *dir, const char *mode)
02292 {
02293    // Redirect stdout to a log file. This log file will be flushed to the
02294    // client or master after each command.
02295 
02296    char logfile[512];
02297 
02298    TString sdir = (dir && strlen(dir) > 0) ? dir : fSessionDir.Data();
02299    if (IsMaster()) {
02300       snprintf(logfile, 512, "%s/master-%s.log", sdir.Data(), fOrdinal.Data());
02301    } else {
02302       snprintf(logfile, 512, "%s/worker-%s.log", sdir.Data(), fOrdinal.Data());
02303    }
02304 
02305    if ((freopen(logfile, mode, stdout)) == 0)
02306       SysError("RedirectOutput", "could not freopen stdout (%s)", logfile);
02307 
02308    if ((dup2(fileno(stdout), fileno(stderr))) < 0)
02309       SysError("RedirectOutput", "could not redirect stderr");
02310 
02311    if ((fLogFile = fopen(logfile, "r")) == 0)
02312       SysError("RedirectOutput", "could not open logfile '%s'", logfile);
02313 
02314    // from this point on stdout and stderr are properly redirected
02315    if (fProtocol < 4 && fWorkDir != TString::Format("~/%s", kPROOF_WorkDir)) {
02316       Warning("RedirectOutput", "no way to tell master (or client) where"
02317               " to upload packages");
02318    }
02319 }
02320 
02321 //______________________________________________________________________________
02322 void TProofServ::Reset(const char *dir)
02323 {
02324    // Reset PROOF environment to be ready for execution of next command.
02325 
02326    // First go to new directory. Check first that we got a reasonable path;
02327    // in PROOF-Lite it may not be the case
02328    TString dd(dir);
02329    if (!dd.BeginsWith("proofserv")) {
02330       Int_t ic = dd.Index(":");
02331       if (ic != kNPOS)
02332          dd.Replace(0, ic, "proofserv");
02333    }
02334    gDirectory->cd(dd.Data());
02335 
02336    // Clear interpreter environment.
02337    gROOT->Reset();
02338 
02339    // Make sure current directory is empty (don't delete anything when
02340    // we happen to be in the ROOT memory only directory!?)
02341    if (gDirectory != gROOT) {
02342       gDirectory->Delete();
02343    }
02344 
02345    if (IsMaster()) fProof->SendCurrentState();
02346 }
02347 
02348 //______________________________________________________________________________
02349 Int_t TProofServ::ReceiveFile(const char *file, Bool_t bin, Long64_t size)
02350 {
02351    // Receive a file, either sent by a client or a master server.
02352    // If bin is true it is a binary file, other wise it is an ASCII
02353    // file and we need to check for Windows \r tokens. Returns -1 in
02354    // case of error, 0 otherwise.
02355 
02356    if (size <= 0) return 0;
02357 
02358    // open file, overwrite already existing file
02359    Int_t fd = open(file, O_CREAT | O_TRUNC | O_WRONLY, 0600);
02360    if (fd < 0) {
02361       SysError("ReceiveFile", "error opening file %s", file);
02362       return -1;
02363    }
02364 
02365    const Int_t kMAXBUF = 16384;  //32768  //16384  //65536;
02366    char buf[kMAXBUF], cpy[kMAXBUF];
02367 
02368    Int_t    left, r;
02369    Long64_t filesize = 0;
02370 
02371    while (filesize < size) {
02372       left = Int_t(size - filesize);
02373       if (left > kMAXBUF)
02374          left = kMAXBUF;
02375       r = fSocket->RecvRaw(&buf, left);
02376       if (r > 0) {
02377          char *p = buf;
02378 
02379          filesize += r;
02380          while (r) {
02381             Int_t w;
02382 
02383             if (!bin) {
02384                Int_t k = 0, i = 0, j = 0;
02385                char *q;
02386                while (i < r) {
02387                   if (p[i] == '\r') {
02388                      i++;
02389                      k++;
02390                   }
02391                   cpy[j++] = buf[i++];
02392                }
02393                q = cpy;
02394                r -= k;
02395                w = write(fd, q, r);
02396             } else {
02397                w = write(fd, p, r);
02398             }
02399 
02400             if (w < 0) {
02401                SysError("ReceiveFile", "error writing to file %s", file);
02402                close(fd);
02403                return -1;
02404             }
02405             r -= w;
02406             p += w;
02407          }
02408       } else if (r < 0) {
02409          Error("ReceiveFile", "error during receiving file %s", file);
02410          close(fd);
02411          return -1;
02412       }
02413    }
02414 
02415    close(fd);
02416 
02417    chmod(file, 0644);
02418 
02419    return 0;
02420 }
02421 
02422 //______________________________________________________________________________
02423 void TProofServ::Run(Bool_t retrn)
02424 {
02425    // Main server eventloop.
02426 
02427    // Setup the server
02428    if (CreateServer() == 0) {
02429 
02430       // Run the main event loop
02431       TApplication::Run(retrn);
02432    }
02433 }
02434 
02435 //______________________________________________________________________________
02436 void TProofServ::SendLogFile(Int_t status, Int_t start, Int_t end)
02437 {
02438    // Send log file to master.
02439    // If start > -1 send only bytes in the range from start to end,
02440    // if end <= start send everything from start.
02441 
02442    // Determine the number of bytes left to be read from the log file.
02443    fflush(stdout);
02444 
02445    // On workers we do not send the logs to masters (to avoid duplication of
02446    // text) unless asked explicitely, e.g. after an Exec(...) request.
02447    if (!IsMaster()) {
02448       if (!fSendLogToMaster) {
02449          FlushLogFile();
02450       } else {
02451          // Decide case by case
02452          LogToMaster(kFALSE);
02453       }
02454    }
02455 
02456    off_t ltot=0, lnow=0;
02457    Int_t left = -1;
02458    Bool_t adhoc = kFALSE;
02459 
02460    if (fLogFileDes > -1) {
02461       ltot = lseek(fileno(stdout),   (off_t) 0, SEEK_END);
02462       lnow = lseek(fLogFileDes, (off_t) 0, SEEK_CUR);
02463 
02464       if (ltot >= 0 && lnow >= 0) {
02465          if (start > -1) {
02466             lseek(fLogFileDes, (off_t) start, SEEK_SET);
02467             if (end <= start || end > ltot)
02468                end = ltot;
02469             left = (Int_t)(end - start);
02470             if (end < ltot)
02471                left++;
02472             adhoc = kTRUE;
02473          } else {
02474             left = (Int_t)(ltot - lnow);
02475          }
02476       }
02477    }
02478 
02479    if (left > 0) {
02480       fSocket->Send(left, kPROOF_LOGFILE);
02481 
02482       const Int_t kMAXBUF = 32768;  //16384  //65536;
02483       char buf[kMAXBUF];
02484       Int_t wanted = (left > kMAXBUF) ? kMAXBUF : left;
02485       Int_t len;
02486       do {
02487          while ((len = read(fLogFileDes, buf, wanted)) < 0 &&
02488                 TSystem::GetErrno() == EINTR)
02489             TSystem::ResetErrno();
02490 
02491          if (len < 0) {
02492             SysError("SendLogFile", "error reading log file");
02493             break;
02494          }
02495 
02496          if (end == ltot && len == wanted)
02497             buf[len-1] = '\n';
02498 
02499          if (fSocket->SendRaw(buf, len) < 0) {
02500             SysError("SendLogFile", "error sending log file");
02501             break;
02502          }
02503 
02504          // Update counters
02505          left -= len;
02506          wanted = (left > kMAXBUF) ? kMAXBUF : left;
02507 
02508       } while (len > 0 && left > 0);
02509    }
02510 
02511    // Restore initial position if partial send
02512    if (adhoc && lnow >=0 )
02513       lseek(fLogFileDes, lnow, SEEK_SET);
02514 
02515    TMessage mess(kPROOF_LOGDONE);
02516    if (IsMaster())
02517       mess << status << (fProof ? fProof->GetParallel() : 0);
02518    else
02519       mess << status << (Int_t) 1;
02520 
02521    fSocket->Send(mess);
02522 
02523    PDB(kGlobal, 1) Info("SendLogFile", "kPROOF_LOGDONE sent");
02524 }
02525 
02526 //______________________________________________________________________________
02527 void TProofServ::SendStatistics()
02528 {
02529    // Send statistics of slave server to master or client.
02530 
02531    Long64_t bytesread = TFile::GetFileBytesRead();
02532    Float_t cputime = fCpuTime, realtime = fRealTime;
02533    if (IsMaster()) {
02534       bytesread = fProof->GetBytesRead();
02535       cputime = fProof->GetCpuTime();
02536       realtime = fProof->GetRealTime();
02537    }
02538 
02539    TMessage mess(kPROOF_GETSTATS);
02540    TString workdir = gSystem->WorkingDirectory();  // expect TString on other side
02541    mess << bytesread << realtime << cputime << workdir;
02542    if (fProtocol >= 4) mess << TString(gProofServ->GetWorkDir());
02543    mess << TString(gProofServ->GetImage());
02544    fSocket->Send(mess);
02545 }
02546 
02547 //______________________________________________________________________________
02548 void TProofServ::SendParallel(Bool_t async)
02549 {
02550    // Send number of parallel nodes to master or client.
02551 
02552    Int_t nparallel = 0;
02553    if (IsMaster()) {
02554       fProof->AskParallel();
02555       nparallel = fProof->GetParallel();
02556    } else {
02557       nparallel = 1;
02558    }
02559 
02560    TMessage mess(kPROOF_GETPARALLEL);
02561    mess << nparallel << async;
02562    fSocket->Send(mess);
02563 }
02564 
02565 //______________________________________________________________________________
02566 Int_t TProofServ::UnloadPackage(const char *package)
02567 {
02568    // Removes link to package in working directory,
02569    // removes entry from include path,
02570    // removes entry from enabled package list,
02571    // does not currently remove entry from interpreter include path.
02572    // Returns -1 in case of error, 0 otherwise.
02573 
02574    TObjString *pack = (TObjString *) fEnabledPackages->FindObject(package);
02575    if (pack) {
02576 
02577       // Remove entry from include path
02578       TString aclicincpath = gSystem->GetIncludePath();
02579       TString cintincpath = gInterpreter->GetIncludePath();
02580       // remove interpreter part of gSystem->GetIncludePath()
02581       aclicincpath.Remove(aclicincpath.Length() - cintincpath.Length() - 1);
02582       // remove package's include path
02583       aclicincpath.ReplaceAll(TString(" -I") + package, "");
02584       gSystem->SetIncludePath(aclicincpath);
02585 
02586       //TODO reset interpreter include path
02587 
02588       // remove entry from enabled packages list
02589       delete fEnabledPackages->Remove(pack);
02590       PDB(kPackage, 1)
02591          Info("UnloadPackage",
02592               "package %s successfully unloaded", package);
02593    }
02594 
02595    // Cleanup the link, if there
02596    if (!gSystem->AccessPathName(package))
02597       if (gSystem->Unlink(package) != 0)
02598          Warning("UnloadPackage", "unable to remove symlink to %s", package);
02599 
02600    // We are done
02601    return 0;
02602 }
02603 
02604 //______________________________________________________________________________
02605 Int_t TProofServ::UnloadPackages()
02606 {
02607    // Unloads all enabled packages. Returns -1 in case of error, 0 otherwise.
02608 
02609    // Iterate over packages and remove each package
02610    TIter nextpackage(fEnabledPackages);
02611    while (TObjString* objstr = dynamic_cast<TObjString*>(nextpackage()))
02612       if (UnloadPackage(objstr->String()) != 0)
02613          return -1;
02614 
02615    PDB(kPackage, 1)
02616       Info("UnloadPackages",
02617            "packages successfully unloaded");
02618 
02619    return 0;
02620 }
02621 
02622 //______________________________________________________________________________
02623 Int_t TProofServ::Setup()
02624 {
02625    // Print the ProofServ logo on standard output.
02626    // Return 0 on success, -1 on failure
02627 
02628    char str[512];
02629 
02630    if (IsMaster()) {
02631       snprintf(str, 512, "**** Welcome to the PROOF server @ %s ****", gSystem->HostName());
02632    } else {
02633       snprintf(str, 512, "**** PROOF slave server @ %s started ****", gSystem->HostName());
02634    }
02635 
02636    if (fSocket->Send(str) != 1+static_cast<Int_t>(strlen(str))) {
02637       Error("Setup", "failed to send proof server startup message");
02638       return -1;
02639    }
02640 
02641    // exchange protocol level between client and master and between
02642    // master and slave
02643    Int_t what;
02644    if (fSocket->Recv(fProtocol, what) != 2*sizeof(Int_t)) {
02645       Error("Setup", "failed to receive remote proof protocol");
02646       return -1;
02647    }
02648    if (fSocket->Send(kPROOF_Protocol, kROOTD_PROTOCOL) != 2*sizeof(Int_t)) {
02649       Error("Setup", "failed to send local proof protocol");
02650       return -1;
02651    }
02652 
02653    // If old version, setup authentication related stuff
02654    if (fProtocol < 5) {
02655       TString wconf;
02656       if (OldAuthSetup(wconf) != 0) {
02657          Error("Setup", "OldAuthSetup: failed to setup authentication");
02658          return -1;
02659       }
02660       if (IsMaster()) {
02661          fConfFile = wconf;
02662          fWorkDir.Form("~/%s", kPROOF_WorkDir);
02663       } else {
02664          if (fProtocol < 4) {
02665             fWorkDir.Form("~/%s", kPROOF_WorkDir);
02666          } else {
02667             fWorkDir = wconf;
02668             if (fWorkDir.IsNull()) fWorkDir.Form("~/%s", kPROOF_WorkDir);
02669          }
02670       }
02671    } else {
02672 
02673       // Receive some useful information
02674       TMessage *mess;
02675       if ((fSocket->Recv(mess) <= 0) || !mess) {
02676          Error("Setup", "failed to receive ordinal and config info");
02677          return -1;
02678       }
02679       if (IsMaster()) {
02680          (*mess) >> fUser >> fOrdinal >> fConfFile;
02681          fWorkDir = gEnv->GetValue("ProofServ.Sandbox", TString::Format("~/%s", kPROOF_WorkDir));
02682       } else {
02683          (*mess) >> fUser >> fOrdinal >> fWorkDir;
02684          if (fWorkDir.IsNull())
02685             fWorkDir = gEnv->GetValue("ProofServ.Sandbox", TString::Format("~/%s", kPROOF_WorkDir));
02686       }
02687       // Set the correct prefix
02688       if (fOrdinal != "-1")
02689          fPrefix += fOrdinal;
02690       TProofServLogHandler::SetDefaultPrefix(fPrefix);
02691       delete mess;
02692    }
02693 
02694    if (IsMaster()) {
02695 
02696       // strip off any prooftype directives
02697       TString conffile = fConfFile;
02698       conffile.Remove(0, 1 + conffile.Index(":"));
02699 
02700       // parse config file to find working directory
02701       TProofResourcesStatic resources(fConfDir, conffile);
02702       if (resources.IsValid()) {
02703          if (resources.GetMaster()) {
02704             TString tmpWorkDir = resources.GetMaster()->GetWorkDir();
02705             if (tmpWorkDir != "")
02706                fWorkDir = tmpWorkDir;
02707          }
02708       } else {
02709          Info("Setup", "invalid config file %s (missing or unreadable",
02710                         resources.GetFileName().Data());
02711       }
02712    }
02713 
02714    // Set $HOME and $PATH. The HOME directory was already set to the
02715    // user's home directory by proofd.
02716    gSystem->Setenv("HOME", gSystem->HomeDirectory());
02717 
02718    // Add user name in case of non default workdir
02719    if (fWorkDir.BeginsWith("/") &&
02720       !fWorkDir.BeginsWith(gSystem->HomeDirectory())) {
02721       if (!fWorkDir.EndsWith("/"))
02722          fWorkDir += "/";
02723       UserGroup_t *u = gSystem->GetUserInfo();
02724       if (u) {
02725          fWorkDir += u->fUser;
02726          delete u;
02727       }
02728    }
02729 
02730    // Goto to the main PROOF working directory
02731    char *workdir = gSystem->ExpandPathName(fWorkDir.Data());
02732    fWorkDir = workdir;
02733    delete [] workdir;
02734    if (gProofDebugLevel > 0)
02735       Info("Setup", "working directory set to %s", fWorkDir.Data());
02736 
02737    // host first name
02738    TString host = gSystem->HostName();
02739    if (host.Index(".") != kNPOS)
02740       host.Remove(host.Index("."));
02741 
02742    // Session tag
02743    fSessionTag.Form("%s-%s-%ld-%d", fOrdinal.Data(), host.Data(),
02744                     (Long_t)TTimeStamp().GetSec(),gSystem->GetPid());
02745    fTopSessionTag = fSessionTag;
02746 
02747    // create session directory and make it the working directory
02748    fSessionDir = fWorkDir;
02749    if (IsMaster())
02750       fSessionDir += "/master-";
02751    else
02752       fSessionDir += "/slave-";
02753    fSessionDir += fSessionTag;
02754 
02755    // Common setup
02756    if (SetupCommon() != 0) {
02757       Error("Setup", "common setup failed");
02758       return -1;
02759    }
02760 
02761    // Incoming OOB should generate a SIGURG
02762    fSocket->SetOption(kProcessGroup, gSystem->GetPid());
02763 
02764    // Send packets off immediately to reduce latency
02765    fSocket->SetOption(kNoDelay, 1);
02766 
02767    // Check every two hours if client is still alive
02768    fSocket->SetOption(kKeepAlive, 1);
02769 
02770    // Done
02771    return 0;
02772 }
02773 
02774 //______________________________________________________________________________
02775 Int_t TProofServ::SetupCommon()
02776 {
02777    // Common part (between TProofServ and TXProofServ) of the setup phase.
02778    // Return 0 on success, -1 on error
02779 
02780    // deny write access for group and world
02781    gSystem->Umask(022);
02782 
02783 #ifdef R__UNIX
02784    TString bindir;
02785 # ifdef ROOTBINDIR
02786    bindir = ROOTBINDIR;
02787 # else
02788    bindir = gSystem->Getenv("ROOTSYS");
02789    if (!bindir.IsNull()) bindir += "/bin";
02790 # endif
02791 # ifdef COMPILER
02792    TString compiler = COMPILER;
02793    if (compiler.Index("is ") != kNPOS)
02794       compiler.Remove(0, compiler.Index("is ") + 3);
02795    compiler = gSystem->DirName(compiler);
02796    if (!bindir.IsNull()) bindir += ":";
02797    bindir += compiler;
02798 #endif
02799    if (!bindir.IsNull()) bindir += ":";
02800    bindir += "/bin:/usr/bin:/usr/local/bin";
02801    // Add bindir to PATH
02802    TString path(gSystem->Getenv("PATH"));
02803    if (!path.IsNull()) path.Insert(0, ":");
02804    path.Insert(0, bindir);
02805    gSystem->Setenv("PATH", path);
02806 #endif
02807 
02808    if (gSystem->AccessPathName(fWorkDir)) {
02809       gSystem->mkdir(fWorkDir, kTRUE);
02810       if (!gSystem->ChangeDirectory(fWorkDir)) {
02811          Error("SetupCommon", "can not change to PROOF directory %s",
02812                fWorkDir.Data());
02813          return -1;
02814       }
02815    } else {
02816       if (!gSystem->ChangeDirectory(fWorkDir)) {
02817          gSystem->Unlink(fWorkDir);
02818          gSystem->mkdir(fWorkDir, kTRUE);
02819          if (!gSystem->ChangeDirectory(fWorkDir)) {
02820             Error("SetupCommon", "can not change to PROOF directory %s",
02821                      fWorkDir.Data());
02822             return -1;
02823          }
02824       }
02825    }
02826 
02827    // Set group
02828    fGroup = gEnv->GetValue("ProofServ.ProofGroup", "default");
02829 
02830    // Check and make sure "cache" directory exists
02831    fCacheDir = gEnv->GetValue("ProofServ.CacheDir",
02832                                TString::Format("%s/%s", fWorkDir.Data(), kPROOF_CacheDir));
02833    ResolveKeywords(fCacheDir);
02834    if (gSystem->AccessPathName(fCacheDir))
02835       gSystem->mkdir(fCacheDir, kTRUE);
02836    if (gProofDebugLevel > 0)
02837       Info("SetupCommon", "cache directory set to %s", fCacheDir.Data());
02838    fCacheLock =
02839       new TProofLockPath(TString::Format("%s/%s%s",
02840                          gSystem->TempDirectory(), kPROOF_CacheLockFile,
02841                          TString(fCacheDir).ReplaceAll("/","%").Data()));
02842 
02843    // Check and make sure "packages" directory exists
02844    fPackageDir = gEnv->GetValue("ProofServ.PackageDir",
02845                                  TString::Format("%s/%s", fWorkDir.Data(), kPROOF_PackDir));
02846    ResolveKeywords(fPackageDir);
02847    if (gSystem->AccessPathName(fPackageDir))
02848       gSystem->mkdir(fPackageDir, kTRUE);
02849    if (gProofDebugLevel > 0)
02850       Info("SetupCommon", "package directory set to %s", fPackageDir.Data());
02851    fPackageLock =
02852       new TProofLockPath(TString::Format("%s/%s%s",
02853                          gSystem->TempDirectory(), kPROOF_PackageLockFile,
02854                          TString(fPackageDir).ReplaceAll("/","%").Data()));
02855 
02856    // Check and make sure "data" directory exists
02857    fDataDir = gEnv->GetValue("ProofServ.DataDir","");
02858    if (fDataDir.IsNull()) {
02859       // Use default
02860       fDataDir.Form("%s/%s/<ord>/<stag>", fWorkDir.Data(), kPROOF_DataDir);
02861    }
02862    ResolveKeywords(fDataDir);
02863    if (gSystem->AccessPathName(fDataDir))
02864       gSystem->mkdir(fDataDir, kTRUE);
02865    if (gProofDebugLevel > 0)
02866       Info("SetupCommon", "data directory set to %s", fDataDir.Data());
02867 
02868    // List of directories where to look for global packages
02869    TString globpack = gEnv->GetValue("Proof.GlobalPackageDirs","");
02870    if (globpack.Length() > 0) {
02871       Int_t ng = 0;
02872       Int_t from = 0;
02873       TString ldir;
02874       while (globpack.Tokenize(ldir, from, ":")) {
02875          if (gSystem->AccessPathName(ldir, kReadPermission)) {
02876             Warning("SetupCommon", "directory for global packages %s does not"
02877                              " exist or is not readable", ldir.Data());
02878          } else {
02879             // Add to the list, key will be "G<ng>", i.e. "G0", "G1", ...
02880             TString key;
02881             key.Form("G%d", ng++);
02882             if (!fGlobalPackageDirList) {
02883                fGlobalPackageDirList = new THashList();
02884                fGlobalPackageDirList->SetOwner();
02885             }
02886             fGlobalPackageDirList->Add(new TNamed(key,ldir));
02887             Info("SetupCommon", "directory for global packages %s added to the list",
02888                           ldir.Data());
02889             FlushLogFile();
02890          }
02891       }
02892    }
02893 
02894    // Check the session dir
02895    if (fSessionDir != gSystem->WorkingDirectory()) {
02896       ResolveKeywords(fSessionDir);
02897       if (gSystem->AccessPathName(fSessionDir))
02898          gSystem->mkdir(fSessionDir, kTRUE);
02899       if (!gSystem->ChangeDirectory(fSessionDir)) {
02900          Error("SetupCommon", "can not change to working directory %s",
02901                               fSessionDir.Data());
02902          return -1;
02903       }
02904    }
02905    gSystem->Setenv("PROOF_SANDBOX", fSessionDir);
02906    if (gProofDebugLevel > 0)
02907       Info("SetupCommon", "session dir is %s", fSessionDir.Data());
02908 
02909    // On masters, check and make sure that "queries" and "datasets"
02910    // directories exist
02911    if (IsMaster()) {
02912 
02913       // Make sure that the 'queries' dir exist
02914       fQueryDir = fWorkDir;
02915       fQueryDir += TString("/") + kPROOF_QueryDir;
02916       ResolveKeywords(fQueryDir);
02917       if (gSystem->AccessPathName(fQueryDir))
02918          gSystem->mkdir(fQueryDir, kTRUE);
02919       fQueryDir += TString("/session-") + fTopSessionTag;
02920       if (gSystem->AccessPathName(fQueryDir))
02921          gSystem->mkdir(fQueryDir, kTRUE);
02922       if (gProofDebugLevel > 0)
02923          Info("SetupCommon", "queries dir is %s", fQueryDir.Data());
02924 
02925       // Create 'queries' locker instance and lock it
02926       fQueryLock = new TProofLockPath(TString::Format("%s/%s%s-%s",
02927                        gSystem->TempDirectory(),
02928                        kPROOF_QueryLockFile, fTopSessionTag.Data(),
02929                        TString(fQueryDir).ReplaceAll("/","%").Data()));
02930       fQueryLock->Lock();
02931       // Create the query manager
02932       fQMgr = new TQueryResultManager(fQueryDir, fSessionTag, fSessionDir,
02933                                       fQueryLock, 0);
02934    }
02935 
02936    // Server image
02937    fImage = gEnv->GetValue("ProofServ.Image", "");
02938 
02939    // Get the group priority
02940    if (IsMaster()) {
02941       // Send session tag to client
02942       TMessage m(kPROOF_SESSIONTAG);
02943       m << fTopSessionTag;
02944       if (GetProtocol() > 24) m << fGroup;
02945       fSocket->Send(m);
02946       // Group priority
02947       fGroupPriority = GetPriority();
02948       // Dataset manager instance via plug-in
02949       TPluginHandler *h = 0;
02950       TString dsms = gEnv->GetValue("Proof.DataSetManager", "");
02951       if (!dsms.IsNull()) {
02952          TString dsm;
02953          Int_t from  = 0;
02954          dsms.Tokenize(dsm, from, ",");
02955          // Get plugin manager to load the appropriate TDataSetManager
02956          if (gROOT->GetPluginManager()) {
02957             // Find the appropriate handler
02958             h = gROOT->GetPluginManager()->FindHandler("TDataSetManager", dsm);
02959             if (h && h->LoadPlugin() != -1) {
02960                // make instance of the dataset manager
02961                fDataSetManager =
02962                   reinterpret_cast<TDataSetManager*>(h->ExecPlugin(3, fGroup.Data(),
02963                                                           fUser.Data(), dsm.Data()));
02964             }
02965          }
02966          // Check the result of the dataset manager initialization
02967          if (fDataSetManager && fDataSetManager->TestBit(TObject::kInvalidObject)) {
02968             Warning("SetupCommon", "dataset manager plug-in initialization failed");
02969             SendAsynMessage("TXProofServ::SetupCommon: dataset manager plug-in initialization failed");
02970             SafeDelete(fDataSetManager);
02971          }
02972       } else {
02973          // Initialize the default dataset manager
02974          TString opts("Av:");
02975          TString dsetdir = gEnv->GetValue("ProofServ.DataSetDir", "");
02976          if (dsetdir.IsNull()) {
02977             // Use the default in the sandbox
02978             dsetdir.Form("%s/%s", fWorkDir.Data(), kPROOF_DataSetDir);
02979             if (gSystem->AccessPathName(fDataSetDir))
02980                gSystem->MakeDirectory(fDataSetDir);
02981             opts += "Sb:";
02982          }
02983          // Find the appropriate handler
02984          if (!h) {
02985             h = gROOT->GetPluginManager()->FindHandler("TDataSetManager", "file");
02986             if (h && h->LoadPlugin() == -1) h = 0;
02987          }
02988          if (h) {
02989             // make instance of the dataset manager
02990             TString oo = TString::Format("dir:%s opt:%s", dsetdir.Data(), opts.Data());
02991             fDataSetManager = reinterpret_cast<TDataSetManager*>(h->ExecPlugin(3,
02992                               fGroup.Data(), fUser.Data(), oo.Data()));
02993          }
02994          if (fDataSetManager && fDataSetManager->TestBit(TObject::kInvalidObject)) {
02995             Warning("SetupCommon", "default dataset manager plug-in initialization failed");
02996             SafeDelete(fDataSetManager);
02997          }
02998       }
02999    }
03000 
03001    // Quotas
03002    TString quotas = gEnv->GetValue(TString::Format("ProofServ.UserQuotas.%s", fUser.Data()),"");
03003    if (quotas.IsNull())
03004       quotas = gEnv->GetValue(TString::Format("ProofServ.UserQuotasByGroup.%s", fGroup.Data()),"");
03005    if (quotas.IsNull())
03006       quotas = gEnv->GetValue("ProofServ.UserQuotas", "");
03007    if (!quotas.IsNull()) {
03008       // Parse it; format ("maxquerykept:10 hwmsz:800m maxsz:1g")
03009       TString tok;
03010       Ssiz_t from = 0;
03011       while (quotas.Tokenize(tok, from, " ")) {
03012          // Set max number of query results to keep
03013          if (tok.BeginsWith("maxquerykept=")) {
03014             tok.ReplaceAll("maxquerykept=","");
03015             if (tok.IsDigit())
03016                fMaxQueries = tok.Atoi();
03017             else
03018                Info("SetupCommon",
03019                     "parsing 'maxquerykept' :ignoring token %s : not a digit", tok.Data());
03020          }
03021          // Set High-Water-Mark or max on the sandbox size
03022          const char *ksz[2] = {"hwmsz=", "maxsz="};
03023          for (Int_t j = 0; j < 2; j++) {
03024             if (tok.BeginsWith(ksz[j])) {
03025                tok.ReplaceAll(ksz[j],"");
03026                Long64_t fact = -1;
03027                if (!tok.IsDigit()) {
03028                   // Parse (k, m, g)
03029                   tok.ToLower();
03030                   const char *s[3] = {"k", "m", "g"};
03031                   Int_t i = 0, k = 1024;
03032                   while (fact < 0) {
03033                      if (tok.EndsWith(s[i]))
03034                         fact = k;
03035                      else
03036                         k *= 1024;
03037                   }
03038                   tok.Remove(tok.Length()-1);
03039                }
03040                if (tok.IsDigit()) {
03041                   if (j == 0)
03042                      fHWMBoxSize = (fact > 0) ? tok.Atoi() * fact : tok.Atoi();
03043                   else
03044                      fMaxBoxSize = (fact > 0) ? tok.Atoi() * fact : tok.Atoi();
03045                } else {
03046                   TString ssz(ksz[j], strlen(ksz[j])-1);
03047                   Info("SetupCommon", "parsing '%s' : ignoring token %s", ssz.Data(), tok.Data());
03048                }
03049             }
03050          }
03051       }
03052    }
03053 
03054    // Apply quotas, if any
03055    if (IsMaster() && fQMgr)
03056       if (fQMgr->ApplyMaxQueries(fMaxQueries) != 0)
03057          Warning("SetupCommon", "problems applying fMaxQueries");
03058 
03059    // Send "ROOTversion|ArchCompiler" flag
03060    if (fProtocol > 12) {
03061       TString vac = gROOT->GetVersion();
03062       if (gROOT->GetSvnRevision() > 0)
03063          vac += TString::Format(":r%d", gROOT->GetSvnRevision());
03064       TString rtag = gEnv->GetValue("ProofServ.RootVersionTag", "");
03065       if (rtag.Length() > 0)
03066          vac += TString::Format(":%s", rtag.Data());
03067       vac += TString::Format("|%s-%s",gSystem->GetBuildArch(), gSystem->GetBuildCompilerVersion());
03068       TMessage m(kPROOF_VERSARCHCOMP);
03069       m << vac;
03070       fSocket->Send(m);
03071    }
03072 
03073    // Set user vars in TProof
03074    TString all_vars(gSystem->Getenv("PROOF_ALLVARS"));
03075    TString name;
03076    Int_t from = 0;
03077    while (all_vars.Tokenize(name, from, ",")) {
03078       if (!name.IsNull()) {
03079          TString value = gSystem->Getenv(name);
03080          TProof::AddEnvVar(name, value);
03081       }
03082    }
03083 
03084    if (fgLogToSysLog > 0) {
03085       // Set the syslog entity (all the information is available now)
03086       if (!(fUser.IsNull()) && !(fGroup.IsNull())) {
03087          fgSysLogEntity.Form("%s:%s", fUser.Data(), fGroup.Data());
03088       } else if (!(fUser.IsNull()) && fGroup.IsNull()) {
03089          fgSysLogEntity.Form("%s:default", fUser.Data());
03090       } else if (fUser.IsNull() && !(fGroup.IsNull())) {
03091          fgSysLogEntity.Form("undef:%s", fGroup.Data());
03092       }
03093       // Log the beginning of this session
03094       TString s;
03095       s.Form("%s 0 %.3f %.3f", fgSysLogEntity.Data(), fRealTime, fCpuTime);
03096       gSystem->Syslog(kLogNotice, s.Data());
03097    }
03098 
03099    if (gProofDebugLevel > 0)
03100       Info("SetupCommon", "successfully completed");
03101 
03102    // Done
03103    return 0;
03104 }
03105 
03106 //______________________________________________________________________________
03107 void TProofServ::Terminate(Int_t status)
03108 {
03109    // Terminate the proof server.
03110 
03111    if (fgLogToSysLog > 0) {
03112       TString s;
03113       s.Form("%s -1 %.3f %.3f %d", fgSysLogEntity.Data(), fRealTime, fCpuTime, status);
03114       gSystem->Syslog(kLogNotice, s.Data());
03115    }
03116 
03117    // Notify the memory footprint
03118    ProcInfo_t pi;
03119    if (!gSystem->GetProcInfo(&pi)){
03120       Info("Terminate", "process memory footprint: %ld/%ld kB virtual, %ld/%ld kB resident ",
03121                         pi.fMemVirtual, fgVirtMemMax, pi.fMemResident, fgResMemMax);
03122    }
03123 
03124    // Cleanup session directory
03125    if (status == 0) {
03126       // make sure we remain in a "connected" directory
03127       gSystem->ChangeDirectory("/");
03128       // needed in case fSessionDir is on NFS ?!
03129       gSystem->MakeDirectory(fSessionDir+"/.delete");
03130       gSystem->Exec(TString::Format("%s %s", kRM, fSessionDir.Data()));
03131    }
03132 
03133    // Cleanup queries directory if empty
03134    if (IsMaster()) {
03135       if (!(fQMgr && fQMgr->Queries() && fQMgr->Queries()->GetSize())) {
03136          // make sure we remain in a "connected" directory
03137          gSystem->ChangeDirectory("/");
03138          // needed in case fQueryDir is on NFS ?!
03139          gSystem->MakeDirectory(fQueryDir+"/.delete");
03140          gSystem->Exec(TString::Format("%s %s", kRM, fQueryDir.Data()));
03141          // Remove lock file
03142          if (fQueryLock)
03143             gSystem->Unlink(fQueryLock->GetName());
03144       }
03145 
03146       // Unlock the query dir owned by this session
03147       if (fQueryLock)
03148          fQueryLock->Unlock();
03149    }
03150 
03151    // Cleanup data directory if empty
03152    if (!fDataDir.IsNull() && !gSystem->AccessPathName(fDataDir, kWritePermission)) {
03153      if (UnlinkDataDir(fDataDir))
03154         Info("Terminate", "data directory '%s' has been removed", fDataDir.Data());
03155    }
03156 
03157    // Remove input handler to avoid spurious signals in socket
03158    // selection for closing activities executed upon exit()
03159    TIter next(gSystem->GetListOfFileHandlers());
03160    TObject *fh = 0;
03161    while ((fh = next())) {
03162       TProofServInputHandler *ih = dynamic_cast<TProofServInputHandler *>(fh);
03163       if (ih)
03164          gSystem->RemoveFileHandler(ih);
03165    }
03166 
03167    // Stop processing events
03168    gSystem->ExitLoop();
03169 
03170    // Exit() is called in pmain
03171 }
03172 
03173 //______________________________________________________________________________
03174 Bool_t TProofServ::UnlinkDataDir(const char *path)
03175 {
03176    // Scan recursively the datadir and unlink it if empty
03177    // Return kTRUE if it can be unlinked, kFALSE otherwise
03178 
03179    if (!path || strlen(path) <= 0) return kFALSE;
03180 
03181    Bool_t dorm = kTRUE;
03182    void *dirp = gSystem->OpenDirectory(path);
03183    if (dirp) {
03184       TString fpath;
03185       const char *ent = 0;
03186       while (dorm && (ent = gSystem->GetDirEntry(dirp))) {
03187          if (!strcmp(ent, ".") || !strcmp(ent, "..")) continue;
03188          fpath.Form("%s/%s", path, ent);
03189          FileStat_t st;
03190          if (gSystem->GetPathInfo(fpath, st) == 0 && R_ISDIR(st.fMode)) {
03191             dorm = UnlinkDataDir(fpath);
03192          } else {
03193             dorm = kFALSE;
03194          }
03195       }
03196    } else {
03197       // Cannot open the directory
03198       dorm = kFALSE;
03199    }
03200 
03201     // Do remove, if required
03202    if (dorm && gSystem->Unlink(path) != 0)
03203       Warning("UnlinkDataDir", "data directory '%s' is empty but could not be removed", path);
03204    // done
03205    return dorm;
03206 }
03207 
03208 //______________________________________________________________________________
03209 Bool_t TProofServ::IsActive()
03210 {
03211    // Static function that returns kTRUE in case we are a PROOF server.
03212 
03213    return gProofServ ? kTRUE : kFALSE;
03214 }
03215 
03216 //______________________________________________________________________________
03217 TProofServ *TProofServ::This()
03218 {
03219    // Static function returning pointer to global object gProofServ.
03220    // Mainly for use via CINT, where the gProofServ symbol might be
03221    // deleted from the symbol table.
03222 
03223    return gProofServ;
03224 }
03225 
03226 //______________________________________________________________________________
03227 Int_t TProofServ::OldAuthSetup(TString &conf)
03228 {
03229    // Setup authentication related stuff for old versions.
03230    // Provided for backward compatibility.
03231 
03232    OldProofServAuthSetup_t oldAuthSetupHook = 0;
03233 
03234    if (!oldAuthSetupHook) {
03235       // Load libraries needed for (server) authentication ...
03236       TString authlib = "libRootAuth";
03237       char *p = 0;
03238       // The generic one
03239       if ((p = gSystem->DynamicPathName(authlib, kTRUE))) {
03240          delete[] p;
03241          if (gSystem->Load(authlib) == -1) {
03242             Error("OldAuthSetup", "can't load %s",authlib.Data());
03243             return kFALSE;
03244          }
03245       } else {
03246          Error("OldAuthSetup", "can't locate %s",authlib.Data());
03247          return -1;
03248       }
03249       //
03250       // Locate OldProofServAuthSetup
03251       Func_t f = gSystem->DynFindSymbol(authlib,"OldProofServAuthSetup");
03252       if (f)
03253          oldAuthSetupHook = (OldProofServAuthSetup_t)(f);
03254       else {
03255          Error("OldAuthSetup", "can't find OldProofServAuthSetup");
03256          return -1;
03257       }
03258    }
03259    //
03260    // Setup
03261    return (*oldAuthSetupHook)(fSocket, IsMaster(), fProtocol,
03262                               fUser, fOrdinal, conf);
03263 }
03264 
03265 //______________________________________________________________________________
03266 TProofQueryResult *TProofServ::MakeQueryResult(Long64_t nent,
03267                                                const char *opt,
03268                                                TList *inlist, Long64_t fst,
03269                                                TDSet *dset, const char *selec,
03270                                                TObject *elist)
03271 {
03272    // Create a TProofQueryResult instance for this query.
03273 
03274    // Increment sequential number
03275    Int_t seqnum = -1;
03276    if (fQMgr) {
03277       fQMgr->IncrementSeqNum();
03278       seqnum = fQMgr->SeqNum();
03279    }
03280 
03281    // Locally we always use the current streamer
03282    Bool_t olds = (dset && dset->TestBit(TDSet::kWriteV3)) ? kTRUE : kFALSE;
03283    if (olds)
03284       dset->SetWriteV3(kFALSE);
03285 
03286    // Create the instance and add it to the list
03287    TProofQueryResult *pqr = new TProofQueryResult(seqnum, opt, inlist, nent,
03288                                                   fst, dset, selec, elist);
03289    // Title is the session identifier
03290    pqr->SetTitle(gSystem->BaseName(fQueryDir));
03291 
03292    // Restore old streamer info
03293    if (olds)
03294       dset->SetWriteV3(kTRUE);
03295 
03296    return pqr;
03297 }
03298 
03299 //______________________________________________________________________________
03300 void TProofServ::SetQueryRunning(TProofQueryResult *pq)
03301 {
03302    // Set query in running state.
03303 
03304    // Record current position in the log file at start
03305    fflush(stdout);
03306    Int_t startlog = lseek(fileno(stdout), (off_t) 0, SEEK_END);
03307 
03308    // Add some header to logs
03309    Printf(" ");
03310    Info("SetQueryRunning", "starting query: %d", pq->GetSeqNum());
03311 
03312    // Build the list of loaded PAR packages
03313    TString parlist = "";
03314    TIter nxp(fEnabledPackages);
03315    TObjString *os= 0;
03316    while ((os = (TObjString *)nxp())) {
03317       if (parlist.Length() <= 0)
03318          parlist = os->GetName();
03319       else
03320          parlist += TString::Format(";%s",os->GetName());
03321    }
03322 
03323    if (fProof) {
03324       // Set in running state
03325       pq->SetRunning(startlog, parlist, fProof->GetParallel());
03326 
03327       // Bytes and CPU at start (we will calculate the differential at end)
03328       pq->SetProcessInfo(pq->GetEntries(),
03329                         fProof->GetCpuTime(), fProof->GetBytesRead());
03330    } else {
03331       // Set in running state
03332       pq->SetRunning(startlog, parlist, -1);
03333 
03334       // Bytes and CPU at start (we will calculate the differential at end)
03335       pq->SetProcessInfo(pq->GetEntries(), float(0.), 0);
03336    }
03337 }
03338 
03339 //______________________________________________________________________________
03340 void TProofServ::HandleArchive(TMessage *mess, TString *slb)
03341 {
03342    // Handle archive request.
03343 
03344    PDB(kGlobal, 1)
03345       Info("HandleArchive", "Enter");
03346 
03347    TString queryref;
03348    TString path;
03349    (*mess) >> queryref >> path;
03350 
03351    if (slb) slb->Form("%s %s", queryref.Data(), path.Data());
03352 
03353    // If this is a set default action just save the default
03354    if (queryref == "Default") {
03355       fArchivePath = path;
03356       Info("HandleArchive",
03357            "default path set to %s", fArchivePath.Data());
03358       return;
03359    }
03360 
03361    Int_t qry = -1;
03362    TString qdir;
03363    TProofQueryResult *pqr = fQMgr ? fQMgr->LocateQuery(queryref, qry, qdir) : 0;
03364    TProofQueryResult *pqm = pqr;
03365 
03366    if (path.Length() <= 0) {
03367       if (fArchivePath.Length() <= 0) {
03368          Info("HandleArchive",
03369               "archive paths are not defined - do nothing");
03370          return;
03371       }
03372       if (qry > 0) {
03373          path.Form("%s/session-%s-%d.root",
03374                    fArchivePath.Data(), fTopSessionTag.Data(), qry);
03375       } else {
03376          path = queryref;
03377          path.ReplaceAll(":q","-");
03378          path.Insert(0, TString::Format("%s/",fArchivePath.Data()));
03379          path += ".root";
03380       }
03381    }
03382 
03383    // Build file name for specific query
03384    if (!pqr || qry < 0) {
03385       TString fout = qdir;
03386       fout += "/query-result.root";
03387 
03388       TFile *f = TFile::Open(fout,"READ");
03389       pqr = 0;
03390       if (f) {
03391          f->ReadKeys();
03392          TIter nxk(f->GetListOfKeys());
03393          TKey *k =  0;
03394          while ((k = (TKey *)nxk())) {
03395             if (!strcmp(k->GetClassName(), "TProofQueryResult")) {
03396                pqr = (TProofQueryResult *) f->Get(k->GetName());
03397                if (pqr)
03398                   break;
03399             }
03400          }
03401          f->Close();
03402          delete f;
03403       } else {
03404          Info("HandleArchive",
03405               "file cannot be open (%s)",fout.Data());
03406          return;
03407       }
03408    }
03409 
03410    if (pqr) {
03411 
03412       PDB(kGlobal, 1) Info("HandleArchive",
03413                            "archive path for query #%d: %s",
03414                            qry, path.Data());
03415       TFile *farc = 0;
03416       if (gSystem->AccessPathName(path))
03417          farc = TFile::Open(path,"NEW");
03418       else
03419          farc = TFile::Open(path,"UPDATE");
03420       if (!farc || !(farc->IsOpen())) {
03421          Info("HandleArchive",
03422               "archive file cannot be open (%s)",path.Data());
03423          return;
03424       }
03425       farc->cd();
03426 
03427       // Update query status
03428       pqr->SetArchived(path);
03429       if (pqm)
03430          pqm->SetArchived(path);
03431 
03432       // Write to file
03433       pqr->Write();
03434 
03435       // Update temporary files too
03436       if (qry > -1 && fQMgr)
03437          fQMgr->SaveQuery(pqr);
03438 
03439       // Notify
03440       Info("HandleArchive",
03441            "results of query %s archived to file %s",
03442            queryref.Data(), path.Data());
03443    }
03444 
03445    // Done
03446    return;
03447 }
03448 
03449 //______________________________________________________________________________
03450 void TProofServ::HandleProcess(TMessage *mess, TString *slb)
03451 {
03452    // Handle processing request.
03453 
03454    PDB(kGlobal, 1)
03455       Info("HandleProcess", "Enter");
03456 
03457    // Nothing to do for slaves if we are not idle
03458    if (!IsTopMaster() && !IsIdle())
03459       return;
03460 
03461    TDSet *dset;
03462    TString filename, opt;
03463    TList *input;
03464    Long64_t nentries, first;
03465    TEventList *evl = 0;
03466    TEntryList *enl = 0;
03467    Bool_t sync;
03468 
03469    (*mess) >> dset >> filename >> input >> opt >> nentries >> first >> evl >> sync;
03470    // Get entry list information, if any (support started with fProtocol == 15)
03471    if ((mess->BufferSize() > mess->Length()) && fProtocol > 14)
03472       (*mess) >> enl;
03473    Bool_t hasNoData = (!dset || dset->TestBit(TDSet::kEmpty)) ? kTRUE : kFALSE;
03474 
03475    // Priority to the entry list
03476    TObject *elist = (enl) ? (TObject *)enl : (TObject *)evl;
03477    if (enl && evl)
03478       // Cannot specify both at the same time
03479       SafeDelete(evl);
03480    if ((!hasNoData) && elist)
03481       dset->SetEntryList(elist);
03482 
03483    if (IsTopMaster()) {
03484 
03485       // Make sure the dataset contains the information needed
03486       if ((!hasNoData) && dset->GetListOfElements()->GetSize() == 0) {
03487          TString emsg;
03488          if (TProof::AssertDataSet(dset, input, fDataSetManager, emsg) != 0) {
03489             SendAsynMessage(TString::Format("AssertDataSet on %s: %s",
03490                                  fPrefix.Data(), emsg.Data()));
03491             Error("HandleProcess", "AssertDataSet: %s", emsg.Data());
03492             // To terminate collection
03493             if (sync) SendLogFile();
03494             return;
03495          }
03496       }
03497 
03498       TProofQueryResult *pq = 0;
03499 
03500       // Create instance of query results; we set ownership of the input list
03501       // to the TQueryResult object, to avoid too many instantiations
03502       pq = MakeQueryResult(nentries, opt, 0, first, 0, filename, 0);
03503 
03504       // Prepare the input list and transfer it into the TQueryResult object
03505       if (dset) input->Add(dset);
03506       if (elist) input->Add(elist);
03507       pq->SetInputList(input, kTRUE);
03508 
03509       // Clear the list
03510       input->Clear("nodelete");
03511       SafeDelete(input);
03512 
03513       // Save input data, if any
03514       TString emsg;
03515       if (TProof::SaveInputData(pq, fCacheDir.Data(), emsg) != 0)
03516          Warning("HandleProcess", "could not save input data: %s", emsg.Data());
03517 
03518       // If not a draw action add the query to the main list
03519       if (!(pq->IsDraw())) {
03520          if (fQMgr) {
03521             if (fQMgr->Queries()) fQMgr->Queries()->Add(pq);
03522             // Also save it to queries dir
03523             fQMgr->SaveQuery(pq);
03524          }
03525       }
03526 
03527       // Add anyhow to the waiting lists
03528       QueueQuery(pq);
03529 
03530       // Call get Workers
03531       // if we are not idle the scheduler will just enqueue the query and
03532       // send a resume message later.
03533 
03534       Bool_t enqueued = kFALSE;
03535       Int_t pc = 0;
03536       // if the session does not have workers and is in the dynamic mode
03537       if (fProof->UseDynamicStartup()) {
03538          // get the a list of workers and start them
03539          TList* workerList = new TList();
03540          EQueryAction retVal = GetWorkers(workerList, pc);
03541          if (retVal == TProofServ::kQueryStop) {
03542             Error("HandleProcess", "error getting list of worker nodes");
03543             // To terminate collection
03544             if (sync) SendLogFile();
03545             return;
03546          } else if (retVal == TProofServ::kQueryEnqueued) {
03547             // change to an asynchronous query
03548             enqueued = kTRUE;
03549             Info("HandleProcess", "query %d enqueued", pq->GetSeqNum());
03550          } else if (Int_t ret = fProof->AddWorkers(workerList) < 0) {
03551             Error("HandleProcess", "Adding a list of worker nodes returned: %d",
03552                   ret);
03553             // To terminate collection
03554             if (sync) SendLogFile();
03555             return;
03556          }
03557       } else {
03558          EQueryAction retVal = GetWorkers(0, pc);
03559          if (retVal == TProofServ::kQueryStop) {
03560             Error("HandleProcess", "error getting list of worker nodes");
03561             // To terminate collection
03562             if (sync) SendLogFile();
03563             return;
03564          } else if (retVal == TProofServ::kQueryEnqueued) {
03565             // change to an asynchronous query
03566             enqueued = kTRUE;
03567             Info("HandleProcess", "query %d enqueued", pq->GetSeqNum());
03568          } else if (retVal != TProofServ::kQueryOK) {
03569             Error("HandleProcess", "unknown return value: %d", retVal);
03570             // To terminate collection
03571             if (sync) SendLogFile();
03572             return;
03573          }
03574       }
03575 
03576       // If the client submission was asynchronous, signal the submission of
03577       // the query and communicate the assigned sequential number for later
03578       // identification
03579       TMessage m(kPROOF_QUERYSUBMITTED);
03580       if (!sync || enqueued) {
03581          m << pq->GetSeqNum() << kFALSE;
03582          fSocket->Send(m);
03583       }
03584 
03585       // Nothing more to do if we are not idle
03586       if (!IsIdle()) {
03587          // Notify submission
03588          Info("HandleProcess",
03589               "query \"%s:%s\" submitted", pq->GetTitle(), pq->GetName());
03590          return;
03591       }
03592 
03593       // Process
03594       // in the static mode, if a session is enqueued it will be processed after current query
03595       // (there is no way to enqueue if idle).
03596       // in the dynamic mode we will process here only if the session was idle and got workers!
03597       Bool_t doprocess = kFALSE;
03598       while (WaitingQueries() > 0 && !enqueued) {
03599          doprocess = kTRUE;
03600          //
03601          ProcessNext(slb);
03602          // avoid processing async queries sent during processing in dyn mode
03603          if (fProof->UseDynamicStartup())
03604             enqueued = kTRUE;
03605 
03606       } // Loop on submitted queries
03607 
03608       // Set idle
03609       SetIdle(kTRUE);
03610 
03611       // Reset mergers
03612       fProof->ResetMergers();
03613 
03614       // kPROOF_SETIDLE sets the client to idle; in asynchronous mode clients monitor
03615       // TProof::IsIdle for to check the readiness of a query, so we need to send this
03616       // before to be sure thatn everything about a query is received by the client
03617       if (!sync) SendLogFile();
03618 
03619       // Signal the client that we are idle
03620       if (doprocess) {
03621          m.Reset(kPROOF_SETIDLE);
03622          Bool_t waiting = (WaitingQueries() > 0) ? kTRUE : kFALSE;
03623          m << waiting;
03624          fSocket->Send(m);
03625       }
03626 
03627       // In synchronous mode TProof::Collect is terminated by the reception of the
03628       // log file and subsequent submissions are controlled by TProof::IsIdle(), so
03629       // this must be last one to be sent
03630       if (sync) SendLogFile();
03631 
03632       // Set idle
03633       SetIdle(kTRUE);
03634 
03635    } else {
03636 
03637       // Set not idle
03638       SetIdle(kFALSE);
03639 
03640       // Cleanup the player
03641       Bool_t deleteplayer = kTRUE;
03642       MakePlayer();
03643 
03644       // Setup data set
03645       if (dset && (dset->IsA() == TDSetProxy::Class()))
03646          ((TDSetProxy*)dset)->SetProofServ(this);
03647 
03648       // Get input data, if any
03649       TString emsg;
03650       if (TProof::GetInputData(input, fCacheDir.Data(), emsg) != 0)
03651          Warning("HandleProcess", "could not get input data: %s", emsg.Data());
03652 
03653       // Get query sequential number
03654       if (TProof::GetParameter(input, "PROOF_QuerySeqNum", fQuerySeqNum) != 0)
03655          Warning("HandleProcess", "could not get query sequential number!");
03656 
03657       // Make the ordinal number available in the selector
03658       TObject *nord = 0;
03659       while ((nord = input->FindObject("PROOF_Ordinal")))
03660          input->Remove(nord);
03661       input->Add(new TNamed("PROOF_Ordinal", GetOrdinal()));
03662 
03663       // Set input
03664       TIter next(input);
03665       TObject *o = 0;
03666       while ((o = next())) {
03667          PDB(kGlobal, 2) Info("HandleProcess", "adding: %s", o->GetName());
03668          fPlayer->AddInput(o);
03669       }
03670 
03671       // Signal the master that we are starting processing
03672       fSocket->Send(kPROOF_STARTPROCESS);
03673 
03674       // Process
03675       PDB(kGlobal, 1) Info("HandleProcess", "calling %s::Process()", fPlayer->IsA()->GetName());
03676       fPlayer->Process(dset, filename, opt, nentries, first);
03677 
03678       // Return number of events processed
03679       TMessage m(kPROOF_STOPPROCESS);
03680       Bool_t abort = (fPlayer->GetExitStatus() != TVirtualProofPlayer::kAborted) ? kFALSE : kTRUE;
03681       if (fProtocol > 18) {
03682          TProofProgressStatus* status =
03683             new TProofProgressStatus(fPlayer->GetEventsProcessed(),
03684                                     gPerfStats?gPerfStats->GetBytesRead():0);
03685          if (status)
03686             m << status << abort;
03687          if (slb)
03688             slb->Form("%d %lld %lld", fPlayer->GetExitStatus(),
03689                                       status->GetEntries(), status->GetBytesRead());
03690          SafeDelete(status);
03691       } else {
03692          m << fPlayer->GetEventsProcessed() << abort;
03693          if (slb)
03694             slb->Form("%d %lld -1", fPlayer->GetExitStatus(), fPlayer->GetEventsProcessed());
03695       }
03696 
03697       fSocket->Send(m);
03698       PDB(kGlobal, 2)
03699          Info("TProofServ::Handleprocess",
03700               "worker %s has finished processing with %d objects in output list",
03701               GetOrdinal(), fPlayer->GetOutputList()->GetEntries());
03702 
03703       // Cleanup the input data set info
03704       SafeDelete(dset);
03705       SafeDelete(enl);
03706       SafeDelete(evl);
03707 
03708       // Check if we are in merging mode (i.e. parameter PROOF_UseMergers exists)
03709       Bool_t isInMergingMode = kFALSE;
03710       if (!(TestBit(TProofServ::kHighMemory))) {
03711          Int_t nm = 0;
03712          if (TProof::GetParameter(input, "PROOF_UseMergers", nm) == 0) {
03713             isInMergingMode = (nm >= 0) ? kTRUE : kFALSE;
03714          }
03715       }
03716       PDB(kGlobal, 2) Info("HandleProcess", "merging mode check: %d", isInMergingMode);
03717 
03718       if (!IsMaster() && isInMergingMode &&
03719           fPlayer->GetExitStatus() != TVirtualProofPlayer::kAborted && fPlayer->GetOutputList()) {
03720          // Worker in merging mode.
03721          //----------------------------
03722          // First, it reports only the size of its output to the master
03723          // + port on which it can possibly accept outputs from other workers if it becomes a merger
03724          // Master will later tell it where it should send the output (either to the master or to some merger)
03725          // or if it should become a merger
03726 
03727          TMessage msg_osize(kPROOF_SUBMERGER);
03728          msg_osize << Int_t(TProof::kOutputSize);
03729          msg_osize << fPlayer->GetOutputList()->GetEntries();
03730 
03731          fMergingSocket = new TServerSocket(0);
03732          Int_t merge_port = 0;
03733          if (fMergingSocket) {
03734             PDB(kGlobal, 2)
03735                Info("HandleProcess", "possible port for merging connections: %d",
03736                                      fMergingSocket->GetLocalPort());
03737             merge_port = fMergingSocket->GetLocalPort();
03738          }
03739          msg_osize << merge_port;
03740          fSocket->Send(msg_osize);
03741 
03742          // Set idle
03743          SetIdle(kTRUE);
03744 
03745          // Do not cleanup the player yet: it will be used in sub-merging activities
03746          deleteplayer = kFALSE;
03747 
03748          PDB(kSubmerger, 2) Info("HandleProcess", "worker %s has finished", fOrdinal.Data());
03749 
03750       } else {
03751          // Sub-master OR worker not in merging mode
03752          // ---------------------------------------------
03753          if (fPlayer->GetExitStatus() != TVirtualProofPlayer::kAborted && fPlayer->GetOutputList()) {
03754             PDB(kGlobal, 2)  Info("HandleProcess", "sending result directly to master");
03755             if (SendResults(fSocket, fPlayer->GetOutputList()) != 0)
03756                Warning("HandleProcess","problems sending output list");
03757          } else {
03758             if (fPlayer->GetExitStatus() != TVirtualProofPlayer::kAborted)
03759                Warning("HandleProcess","the output list is empty!");
03760             if (SendResults(fSocket) != 0)
03761                Warning("HandleProcess", "problems sending output list");
03762          }
03763 
03764          // Masters reset the mergers, if any
03765          if (IsMaster()) fProof->ResetMergers();
03766 
03767          // Signal the master that we are idle
03768          fSocket->Send(kPROOF_SETIDLE);
03769 
03770          // Set idle
03771          SetIdle(kTRUE);
03772 
03773          // Notify the user
03774          SendLogFile();
03775       }
03776       // Make also sure the input list objects are deleted
03777       fPlayer->GetInputList()->SetOwner(0);
03778       input->SetOwner();
03779       SafeDelete(input);
03780 
03781       // Cleanup if required
03782       if (deleteplayer) DeletePlayer();
03783    }
03784 
03785    PDB(kGlobal, 1) Info("HandleProcess", "done");
03786 
03787    // Done
03788    return;
03789 }
03790 
03791 //______________________________________________________________________________
03792 Int_t TProofServ::SendResults(TSocket *sock, TList *outlist, TQueryResult *pq)
03793 {
03794    // Sends all objects from the given list to the specified socket
03795 
03796    PDB(kOutput, 2) Info("SendResults", "enter");
03797 
03798    TString msg;
03799    if (fProtocol > 23 && outlist) {
03800       // Send objects in bunches of max fMsgSizeHWM bytes to optimize transfer
03801       // Objects are merged one-by-one by the client
03802       // Messages for objects
03803       TMessage mbuf(kPROOF_OUTPUTOBJECT);
03804       // Objects in the output list
03805       Int_t olsz = outlist->GetSize();
03806       if (IsTopMaster() && pq) {
03807          msg.Form("%s: merging output objects ... done                                     ",
03808                        fPrefix.Data());
03809          SendAsynMessage(msg.Data());
03810          // Message for the client
03811          msg.Form("%s: objects merged; sending output: %d objs", fPrefix.Data(), olsz);
03812          SendAsynMessage(msg.Data(), kFALSE);
03813          // Send light query info
03814          mbuf << (Int_t) 0;
03815          mbuf.WriteObject(pq);
03816          if (sock->Send(mbuf) < 0) return -1;
03817       }
03818       // Objects in the output list
03819       Int_t ns = 0, np = 0;
03820       TIter nxo(outlist);
03821       TObject *o = 0;
03822       Int_t totsz = 0, objsz = 0;
03823       mbuf.Reset();
03824       while ((o = nxo())) {
03825          if (mbuf.Length() > fMsgSizeHWM) {
03826             PDB(kOutput, 1)
03827                Info("SendResults",
03828                     "message has %d bytes: limit of %lld bytes reached - sending ...",
03829                     mbuf.Length(), fMsgSizeHWM);
03830             // Compress the message, if required; for these messages we do it already
03831             // here so we get the size; TXSocket does not do it twice.
03832             if (fCompressMsg > 0) {
03833                mbuf.SetCompressionLevel(fCompressMsg);
03834                mbuf.Compress();
03835                objsz = mbuf.CompLength();
03836             } else {
03837                objsz = mbuf.Length();
03838             }
03839             totsz += objsz;
03840             if (IsTopMaster()) {
03841                msg.Form("%s: objects merged; sending obj %d/%d (%d bytes)   ",
03842                               fPrefix.Data(), ns, olsz, objsz);
03843                SendAsynMessage(msg.Data(), kFALSE);
03844             }
03845             if (sock->Send(mbuf) < 0) return -1;
03846             // Reset the message
03847             mbuf.Reset();
03848             np = 0;
03849          }
03850          ns++;
03851          np++;
03852          mbuf << (Int_t) ((ns >= olsz) ? 2 : 1);
03853          mbuf << o;
03854       }
03855       if (np > 0) {
03856          // Compress the message, if required; for these messages we do it already
03857          // here so we get the size; TXSocket does not do it twice.
03858          if (fCompressMsg > 0) {
03859             mbuf.SetCompressionLevel(fCompressMsg);
03860             mbuf.Compress();
03861             objsz = mbuf.CompLength();
03862          } else {
03863             objsz = mbuf.Length();
03864          }
03865          totsz += objsz;
03866          if (IsTopMaster()) {
03867             msg.Form("%s: objects merged; sending obj %d/%d (%d bytes)     ",
03868                            fPrefix.Data(), ns, olsz, objsz);
03869             SendAsynMessage(msg.Data(), kFALSE);
03870          }
03871          if (sock->Send(mbuf) < 0) return -1;
03872       }
03873       if (IsTopMaster()) {
03874          // Send total size
03875          msg.Form("%s: grand total: sent %d objects, size: %d bytes                            ",
03876                                         fPrefix.Data(), olsz, totsz);
03877          SendAsynMessage(msg.Data());
03878       }
03879    } else if (fProtocol > 10 && outlist) {
03880 
03881       // Send objects one-by-one to optimize transfer and merging
03882       // Messages for objects
03883       TMessage mbuf(kPROOF_OUTPUTOBJECT);
03884       // Objects in the output list
03885       Int_t olsz = outlist->GetSize();
03886       if (IsTopMaster() && pq) {
03887          msg.Form("%s: merging output objects ... done                                     ",
03888                        fPrefix.Data());
03889          SendAsynMessage(msg.Data());
03890          // Message for the client
03891          msg.Form("%s: objects merged; sending output: %d objs", fPrefix.Data(), olsz);
03892          SendAsynMessage(msg.Data(), kFALSE);
03893          // Send light query info
03894          mbuf << (Int_t) 0;
03895          mbuf.WriteObject(pq);
03896          if (sock->Send(mbuf) < 0) return -1;
03897       }
03898 
03899       Int_t ns = 0;
03900       Int_t totsz = 0, objsz = 0;
03901       TIter nxo(fPlayer->GetOutputList());
03902       TObject *o = 0;
03903       while ((o = nxo())) {
03904          ns++;
03905          mbuf.Reset();
03906          Int_t type = (Int_t) ((ns >= olsz) ? 2 : 1);
03907          mbuf << type;
03908          mbuf.WriteObject(o);
03909          // Compress the message, if required; for these messages we do it already
03910          // here so we get the size; TXSocket does not do it twice.
03911          if (fCompressMsg > 0) {
03912             mbuf.SetCompressionLevel(fCompressMsg);
03913             mbuf.Compress();
03914             objsz = mbuf.CompLength();
03915          } else {
03916             objsz = mbuf.Length();
03917          }
03918          totsz += objsz;
03919          if (IsTopMaster()) {
03920             msg.Form("%s: objects merged; sending obj %d/%d (%d bytes)   ",
03921                            fPrefix.Data(), ns, olsz, objsz);
03922             SendAsynMessage(msg.Data(), kFALSE);
03923          }
03924          if (sock->Send(mbuf) < 0) return -1;
03925       }
03926       // Total size
03927       if (IsTopMaster()) {
03928          // Send total size
03929          msg.Form("%s: grand total: sent %d objects, size: %d bytes       ",
03930                                         fPrefix.Data(), olsz, totsz);
03931          SendAsynMessage(msg.Data());
03932       }
03933 
03934    } else if (IsTopMaster() && fProtocol > 6 && outlist) {
03935 
03936       // Buffer to be sent
03937       TMessage mbuf(kPROOF_OUTPUTLIST);
03938       mbuf.WriteObject(pq);
03939       // Sizes
03940       Int_t blen = mbuf.CompLength();
03941       Int_t olsz = outlist->GetSize();
03942       // Message for the client
03943       msg.Form("%s: sending output: %d objs, %d bytes", fPrefix.Data(), olsz, blen);
03944       SendAsynMessage(msg.Data(), kFALSE);
03945       if (sock->Send(mbuf) < 0) return -1;
03946 
03947    } else {
03948       if (outlist) {
03949          PDB(kGlobal, 2) Info("SendResults", "sending output list");
03950       } else {
03951          PDB(kGlobal, 2) Info("SendResults", "notifying failure or abort");
03952       }
03953       if (sock->SendObject(outlist, kPROOF_OUTPUTLIST) < 0) return -1;
03954    }
03955 
03956    PDB(kOutput,2) Info("SendResults", "done");
03957 
03958    // Done
03959    return 0;
03960 }
03961 
03962 //______________________________________________________________________________
03963 void TProofServ::ProcessNext(TString *slb)
03964 {
03965    // process the next query from the queue of submitted jobs.
03966    // to be called on the top master only.
03967 
03968    TDSet *dset = 0;
03969    TString filename, opt;
03970    TList *input = 0;
03971    Long64_t nentries = -1, first = 0;
03972 
03973    TObject *elist = 0;
03974    TProofQueryResult *pq = 0;
03975 
03976    // Process
03977 
03978    // Get next query info (also removes query from the list)
03979    pq = NextQuery();
03980    if (pq) {
03981 
03982       // Set not idle
03983       SetIdle(kFALSE);
03984       opt      = pq->GetOptions();
03985       input    = pq->GetInputList();
03986       nentries = pq->GetEntries();
03987       first    = pq->GetFirst();
03988       filename = pq->GetSelecImp()->GetName();
03989       Ssiz_t id = opt.Last('#');
03990       if (id != kNPOS && id < opt.Length() - 1) {
03991          filename += opt(id + 1, opt.Length());
03992          // Remove it from 'opt' so user found on the workers what they specified
03993          opt.Remove(id);
03994       }
03995       // Attach to data set and entry- (or event-) list (if any)
03996       TObject *o = 0;
03997       if ((o = pq->GetInputObject("TDSet"))) {
03998          dset = (TDSet *) o;
03999       } else {
04000          // Should never get here
04001          Error("ProcessNext", "no TDset object: cannot continue");
04002          return;
04003       }
04004       elist = 0;
04005       if ((o = pq->GetInputObject("TEntryList")))
04006          elist = o;
04007       else if ((o = pq->GetInputObject("TEventList")))
04008          elist = o;
04009       //
04010       // Expand selector files
04011       if (pq->GetSelecImp()) {
04012          gSystem->Exec(TString::Format("%s %s", kRM, pq->GetSelecImp()->GetName()));
04013          pq->GetSelecImp()->SaveSource(pq->GetSelecImp()->GetName());
04014       }
04015       if (pq->GetSelecHdr() &&
04016           !strstr(pq->GetSelecHdr()->GetName(), "TProofDrawHist")) {
04017          gSystem->Exec(TString::Format("%s %s", kRM, pq->GetSelecHdr()->GetName()));
04018          pq->GetSelecHdr()->SaveSource(pq->GetSelecHdr()->GetName());
04019       }
04020    } else {
04021       // Should never get here
04022       Error("ProcessNext", "empty waiting queries list!");
04023       return;
04024    }
04025 
04026    // Set in running state
04027    SetQueryRunning(pq);
04028 
04029    // Save to queries dir, if not standard draw
04030    if (fQMgr) {
04031       if (!(pq->IsDraw()))
04032          fQMgr->SaveQuery(pq);
04033       else
04034          fQMgr->IncrementDrawQueries();
04035       fQMgr->ResetTime();
04036    }
04037 
04038    // Signal the client that we are starting a new query
04039    TMessage m(kPROOF_STARTPROCESS);
04040    m << TString(pq->GetSelecImp()->GetName())
04041      << dset->GetNumOfFiles()
04042      << pq->GetFirst() << pq->GetEntries();
04043    fSocket->Send(m);
04044 
04045    // Create player
04046    MakePlayer();
04047 
04048    // Add query results to the player lists
04049    fPlayer->AddQueryResult(pq);
04050 
04051    // Set query currently processed
04052    fPlayer->SetCurrentQuery(pq);
04053 
04054    // Setup data set
04055    if (dset->IsA() == TDSetProxy::Class())
04056       ((TDSetProxy*)dset)->SetProofServ(this);
04057 
04058    // Add the unique query tag as TNamed object to the input list
04059    // so that it is available in TSelectors for monitoring
04060    TString qid = TString::Format("%s:%s",pq->GetTitle(),pq->GetName());
04061    input->Add(new TNamed("PROOF_QueryTag", qid.Data()));
04062    //  ... and the sequential number
04063    fQuerySeqNum = pq->GetSeqNum();
04064    input->Add(new TParameter<Int_t>("PROOF_QuerySeqNum", fQuerySeqNum));
04065 
04066    // Check whether we have to enforce the use of submergers, but only if the user did
04067    // not express itself on the subject
04068    if (gEnv->Lookup("Proof.UseMergers") && !input->FindObject("PROOF_UseMergers")) {
04069       Int_t smg = gEnv->GetValue("Proof.UseMergers",-1);
04070       if (smg >= 0) {
04071          input->Add(new TParameter<Int_t>("PROOF_UseMergers", smg));
04072          PDB(kSubmerger, 2) Info("ProcessNext", "PROOF_UseMergers set to %d", smg);
04073       }
04074    }
04075 
04076    // Set input
04077    TIter next(input);
04078    TObject *o = 0;
04079    while ((o = next())) {
04080       PDB(kGlobal, 2) Info("ProcessNext", "adding: %s", o->GetName());
04081       fPlayer->AddInput(o);
04082    }
04083 
04084    // Remove the list of the missing files from the original list, if any
04085    if ((o = input->FindObject("MissingFiles"))) input->Remove(o);
04086 
04087    // Process
04088    PDB(kGlobal, 1) Info("ProcessNext", "calling %s::Process()", fPlayer->IsA()->GetName());
04089    fPlayer->Process(dset, filename, opt, nentries, first);
04090 
04091    // Return number of events processed
04092    if (fPlayer->GetExitStatus() != TVirtualProofPlayer::kFinished) {
04093       Bool_t abort =
04094          (fPlayer->GetExitStatus() == TVirtualProofPlayer::kAborted) ? kTRUE : kFALSE;
04095       m.Reset(kPROOF_STOPPROCESS);
04096       // message sent from worker to the master
04097       if (fProtocol > 18) {
04098          TProofProgressStatus* status = fPlayer->GetProgressStatus();
04099          m << status << abort;
04100          status = 0; // the status belongs to the player.
04101       } else if (fProtocol > 8) {
04102          m << fPlayer->GetEventsProcessed() << abort;
04103       } else {
04104          m << fPlayer->GetEventsProcessed();
04105       }
04106       fSocket->Send(m);
04107    }
04108 
04109    // Register any dataset produced during this processing, if required
04110    if (fDataSetManager && fPlayer->GetOutputList()) {
04111       TNamed *psr = (TNamed *) fPlayer->GetOutputList()->FindObject("PROOFSERV_RegisterDataSet");
04112       if (psr) {
04113          if (RegisterDataSets(input, fPlayer->GetOutputList()) != 0)
04114             Warning("ProcessNext", "problems registering produced datasets");
04115          fPlayer->GetOutputList()->Remove(psr);
04116          delete psr;
04117       }
04118    }
04119 
04120    // Complete filling of the TQueryResult instance
04121    if (fQMgr && !pq->IsDraw()) {
04122       fProof->AskStatistics();
04123       if (fQMgr->FinalizeQuery(pq, fProof, fPlayer))
04124          fQMgr->SaveQuery(pq, fMaxQueries);
04125    }
04126 
04127    // Send back the results
04128    TQueryResult *pqr = pq->CloneInfo();
04129    // At least the TDSet name in the light object
04130    Info("ProcessNext", "adding info about dataset '%s' in the light query result", dset->GetName());
04131    TList rin;
04132    TDSet *ds = new TDSet(dset->GetName(), dset->GetObjName());
04133    rin.Add(ds);
04134    pqr->SetInputList(&rin, kTRUE);
04135    if (fPlayer->GetExitStatus() != TVirtualProofPlayer::kAborted && fPlayer->GetOutputList()) {
04136       PDB(kGlobal, 2)
04137          Info("ProcessNext", "sending results");
04138       TQueryResult *xpq = (fProtocol > 10) ? pqr : pq;
04139       if (SendResults(fSocket, fPlayer->GetOutputList(), xpq) != 0)
04140          Warning("ProcessNext", "problems sending output list");
04141       if (slb) slb->Form("%d %lld %lld %.3f", fPlayer->GetExitStatus(), pq->GetEntries(),
04142                                               pq->GetBytes(), pq->GetUsedCPU());
04143    } else {
04144       if (fPlayer->GetExitStatus() != TVirtualProofPlayer::kAborted)
04145          Warning("ProcessNext","the output list is empty!");
04146       if (SendResults(fSocket) != 0)
04147          Warning("ProcessNext", "problems sending output list");
04148       if (slb) slb->Form("%d -1 -1 %.3f", fPlayer->GetExitStatus(), pq->GetUsedCPU());
04149    }
04150 
04151    // Remove aborted queries from the list
04152    if (fPlayer->GetExitStatus() == TVirtualProofPlayer::kAborted) {
04153       delete pqr;
04154       if (fQMgr) fQMgr->RemoveQuery(pq);
04155    } else {
04156       // Keep in memory only light infor about a query
04157       if (!(pq->IsDraw())) {
04158          if (fQMgr && fQMgr->Queries()) {
04159             fQMgr->Queries()->Add(pqr);
04160             // Remove from the fQueries list
04161             fQMgr->Queries()->Remove(pq);
04162          }
04163          // These removes 'pq' from the internal player list and
04164          // deletes it; in this way we do not attempt a double delete
04165          // when destroying the player
04166          fPlayer->RemoveQueryResult(TString::Format("%s:%s",
04167                                     pq->GetTitle(), pq->GetName()));
04168       }
04169    }
04170 
04171    DeletePlayer();
04172    if (IsMaster() && fProof->UseDynamicStartup())
04173       // stop the workers
04174       fProof->RemoveWorkers(0);
04175 }
04176 
04177 //______________________________________________________________________________
04178 Int_t TProofServ::RegisterDataSets(TList *in, TList *out)
04179 {
04180    // Register TFileCollections in 'out' as datasets according to the rules in 'in'
04181 
04182    PDB(kDataset, 1) Info("RegisterDataSets", "enter");
04183 
04184    if (!in || !out) return 0;
04185 
04186    TString msg;
04187    TIter nxo(out);
04188    TObject *o = 0;
04189    while ((o = nxo())) {
04190       // Only file collections TFileCollection
04191       TFileCollection *ds = dynamic_cast<TFileCollection*> (o);
04192       if (ds) {
04193          // The tag and register option
04194          TNamed *fcn = 0;
04195          TString tag = TString::Format("DATASET_%s", ds->GetName());
04196          if (!(fcn = (TNamed *) out->FindObject(tag))) continue;
04197          // Register option
04198          TString regopt(fcn->GetTitle());
04199          // Register this dataset
04200          if (fDataSetManager) {
04201             if (fDataSetManager->TestBit(TDataSetManager::kAllowRegister)) {
04202                // Extract the list
04203                if (ds->GetList()->GetSize() > 0) {
04204                   // Register the dataset (quota checks are done inside here)
04205                   msg.Form("Registering and verifying dataset '%s' ... ", ds->GetName());
04206                   SendAsynMessage(msg.Data(), kFALSE);
04207                   Int_t rc = 0;
04208                   FlushLogFile();
04209                   {  TProofServLogHandlerGuard hg(fLogFile,  fSocket);
04210                      // Always allow verification for this action
04211                      Bool_t allowVerify = fDataSetManager->TestBit(TDataSetManager::kAllowVerify) ? kTRUE : kFALSE;
04212                      if (regopt.Contains("V") && !allowVerify)
04213                         fDataSetManager->SetBit(TDataSetManager::kAllowVerify);
04214                      rc = fDataSetManager->RegisterDataSet(ds->GetName(), ds, regopt);
04215                      // Reset to the previous state if needed
04216                      if (regopt.Contains("V") && !allowVerify)
04217                         fDataSetManager->ResetBit(TDataSetManager::kAllowVerify);
04218                   }
04219                   if (rc != 0) {
04220                      Warning("RegisterDataSets",
04221                               "failure registering dataset '%s'", ds->GetName());
04222                      msg.Form("Registering and verifying dataset '%s' ... failed! See log for more details", ds->GetName());
04223                   } else {
04224                      Info("RegisterDataSets", "dataset '%s' successfully registered", ds->GetName());
04225                      msg.Form("Registering and verifying dataset '%s' ... OK", ds->GetName());
04226                   }
04227                   SendAsynMessage(msg.Data(), kTRUE);
04228                   // Notify
04229                   PDB(kDataset, 2) {
04230                      Info("RegisterDataSets","printing collection");
04231                      ds->Print("F");
04232                   }
04233                } else {
04234                   Warning("RegisterDataSets", "collection '%s' is empty", o->GetName());
04235                }
04236             } else {
04237                Info("RegisterDataSets", "dataset registration not allowed");
04238                return -1;
04239             }
04240          } else {
04241             Error("RegisterDataSets", "dataset manager is undefined!");
04242             return -1;
04243          }
04244          // Cleanup temporary stuff
04245          out->Remove(fcn);
04246          SafeDelete(fcn);
04247       }
04248    }
04249 
04250    PDB(kDataset, 1) Info("RegisterDataSets", "exit");
04251    // Done
04252    return 0;
04253 }
04254 
04255 //______________________________________________________________________________
04256 void TProofServ::HandleQueryList(TMessage *mess)
04257 {
04258    // Handle request for list of queries.
04259 
04260    PDB(kGlobal, 1)
04261       Info("HandleQueryList", "Enter");
04262 
04263    Bool_t all;
04264    (*mess) >> all;
04265 
04266    TList *ql = new TList;
04267    Int_t ntot = 0, npre = 0, ndraw= 0;
04268    if (fQMgr) {
04269       if (all) {
04270          // Rescan
04271          TString qdir = fQueryDir;
04272          Int_t idx = qdir.Index("session-");
04273          if (idx != kNPOS)
04274             qdir.Remove(idx);
04275          fQMgr->ScanPreviousQueries(qdir);
04276          // Send also information about previous queries, if any
04277          if (fQMgr->PreviousQueries()) {
04278             TIter nxq(fQMgr->PreviousQueries());
04279             TProofQueryResult *pqr = 0;
04280             while ((pqr = (TProofQueryResult *)nxq())) {
04281                ntot++;
04282                pqr->fSeqNum = ntot;
04283                ql->Add(pqr);
04284             }
04285          }
04286       }
04287 
04288       npre = ntot;
04289       if (fQMgr->Queries()) {
04290          // Add info about queries in this session
04291          TIter nxq(fQMgr->Queries());
04292          TProofQueryResult *pqr = 0;
04293          TQueryResult *pqm = 0;
04294          while ((pqr = (TProofQueryResult *)nxq())) {
04295             ntot++;
04296             pqm = pqr->CloneInfo();
04297             pqm->fSeqNum = ntot;
04298             ql->Add(pqm);
04299          }
04300       }
04301       // Number of draw queries
04302       ndraw = fQMgr->DrawQueries();
04303    }
04304 
04305    TMessage m(kPROOF_QUERYLIST);
04306    m << npre << ndraw << ql;
04307    fSocket->Send(m);
04308    delete ql;
04309 
04310    // Done
04311    return;
04312 }
04313 
04314 //______________________________________________________________________________
04315 void TProofServ::HandleRemove(TMessage *mess, TString *slb)
04316 {
04317    // Handle remove request.
04318 
04319    PDB(kGlobal, 1)
04320       Info("HandleRemove", "Enter");
04321 
04322    TString queryref;
04323    (*mess) >> queryref;
04324 
04325    if (slb) *slb = queryref;
04326 
04327    if (queryref == "cleanupqueue") {
04328       // Remove pending requests
04329       Int_t pend = CleanupWaitingQueries();
04330       // Notify
04331       Info("HandleRemove", "%d queries removed from the waiting list", pend);
04332       // We are done
04333       return;
04334    }
04335 
04336    if (queryref == "cleanupdir") {
04337 
04338       // Cleanup previous sessions results
04339       Int_t nd = (fQMgr) ? fQMgr->CleanupQueriesDir() : -1;
04340 
04341       // Notify
04342       Info("HandleRemove", "%d directories removed", nd);
04343       // We are done
04344       return;
04345    }
04346 
04347 
04348    if (fQMgr) {
04349       TProofLockPath *lck = 0;
04350       if (fQMgr->LockSession(queryref, &lck) == 0) {
04351 
04352          // Remove query
04353          TList qtorm;
04354          fQMgr->RemoveQuery(queryref, &qtorm);
04355          CleanupWaitingQueries(kFALSE, &qtorm);
04356 
04357          // Unlock and remove the lock file
04358          if (lck) {
04359             gSystem->Unlink(lck->GetName());
04360             SafeDelete(lck);
04361          }
04362 
04363          // We are done
04364          return;
04365       }
04366    } else {
04367       Warning("HandleRemove", "query result manager undefined!");
04368    }
04369 
04370    // Notify failure
04371    Info("HandleRemove",
04372         "query %s could not be removed (unable to lock session)", queryref.Data());
04373 
04374    // Done
04375    return;
04376 }
04377 
04378 //______________________________________________________________________________
04379 void TProofServ::HandleRetrieve(TMessage *mess, TString *slb)
04380 {
04381    // Handle retrieve request.
04382 
04383    PDB(kGlobal, 1)
04384       Info("HandleRetrieve", "Enter");
04385 
04386    TString queryref;
04387    (*mess) >> queryref;
04388 
04389    if (slb) *slb = queryref;
04390 
04391    // Parse reference string
04392    Int_t qry = -1;
04393    TString qdir;
04394    if (fQMgr) fQMgr->LocateQuery(queryref, qry, qdir);
04395 
04396    TString fout = qdir;
04397    fout += "/query-result.root";
04398 
04399    TFile *f = TFile::Open(fout,"READ");
04400    TProofQueryResult *pqr = 0;
04401    if (f) {
04402       f->ReadKeys();
04403       TIter nxk(f->GetListOfKeys());
04404       TKey *k =  0;
04405       while ((k = (TKey *)nxk())) {
04406          if (!strcmp(k->GetClassName(), "TProofQueryResult")) {
04407             pqr = (TProofQueryResult *) f->Get(k->GetName());
04408             // For backward compatibility
04409             if (fProtocol < 13) {
04410                TDSet *d = 0;
04411                TObject *o = 0;
04412                TIter nxi(pqr->GetInputList());
04413                while ((o = nxi()))
04414                   if ((d = dynamic_cast<TDSet *>(o)))
04415                      break;
04416                d->SetWriteV3(kTRUE);
04417             }
04418             if (pqr) {
04419 
04420                // Message for the client
04421                Float_t qsz = (Float_t) f->GetSize();
04422                Int_t ilb = 0;
04423                static const char *clb[4] = { "bytes", "KB", "MB", "GB" };
04424                while (qsz > 1000. && ilb < 3) {
04425                   qsz /= 1000.;
04426                   ilb++;
04427                }
04428                SendAsynMessage(TString::Format("%s: sending result of %s:%s (%.1f %s)",
04429                                                fPrefix.Data(), pqr->GetTitle(), pqr->GetName(),
04430                                                qsz, clb[ilb]));
04431                fSocket->SendObject(pqr, kPROOF_RETRIEVE);
04432             } else {
04433                Info("HandleRetrieve",
04434                     "query not found in file %s",fout.Data());
04435                // Notify not found
04436                fSocket->SendObject(0, kPROOF_RETRIEVE);
04437             }
04438             break;
04439          }
04440       }
04441       f->Close();
04442       delete f;
04443    } else {
04444       Info("HandleRetrieve",
04445            "file cannot be open (%s)",fout.Data());
04446       // Notify not found
04447       fSocket->SendObject(0, kPROOF_RETRIEVE);
04448       return;
04449    }
04450 
04451    // Done
04452    return;
04453 }
04454 
04455 //______________________________________________________________________________
04456 void TProofServ::HandleLibIncPath(TMessage *mess)
04457 {
04458    // Handle lib, inc search paths modification request
04459 
04460    TString type;
04461    Bool_t add;
04462    TString path;
04463    (*mess) >> type >> add >> path;
04464 
04465    // Check type of action
04466    if ((type != "lib") && (type != "inc")) {
04467       Error("HandleLibIncPath","unknown action type: %s", type.Data());
04468       return;
04469    }
04470 
04471    // Separators can be either commas or blanks
04472    path.ReplaceAll(","," ");
04473 
04474    // Decompose lists
04475    TObjArray *op = 0;
04476    if (path.Length() > 0 && path != "-") {
04477       if (!(op = path.Tokenize(" "))) {
04478          Error("HandleLibIncPath","decomposing path %s", path.Data());
04479          return;
04480       }
04481    }
04482 
04483    if (add) {
04484 
04485       if (type == "lib") {
04486 
04487          // Add libs
04488          TIter nxl(op, kIterBackward);
04489          TObjString *lib = 0;
04490          while ((lib = (TObjString *) nxl())) {
04491             // Expand path
04492             TString xlib = lib->GetName();
04493             gSystem->ExpandPathName(xlib);
04494             // Add to the dynamic lib search path if it exists and can be read
04495             if (!gSystem->AccessPathName(xlib, kReadPermission)) {
04496                TString newlibpath = gSystem->GetDynamicPath();
04497                // In the first position after the working dir
04498                Int_t pos = 0;
04499                if (newlibpath.BeginsWith(".:"))
04500                   pos = 2;
04501                if (newlibpath.Index(xlib) == kNPOS) {
04502                   newlibpath.Insert(pos,TString::Format("%s:", xlib.Data()));
04503                   gSystem->SetDynamicPath(newlibpath);
04504                }
04505             } else {
04506                Info("HandleLibIncPath",
04507                     "libpath %s does not exist or cannot be read - not added", xlib.Data());
04508             }
04509          }
04510 
04511          // Forward the request, if required
04512          if (IsMaster())
04513             fProof->AddDynamicPath(path);
04514 
04515       } else {
04516 
04517          // Add incs
04518          TIter nxi(op);
04519          TObjString *inc = 0;
04520          while ((inc = (TObjString *) nxi())) {
04521             // Expand path
04522             TString xinc = inc->GetName();
04523             gSystem->ExpandPathName(xinc);
04524             // Add to the dynamic lib search path if it exists and can be read
04525             if (!gSystem->AccessPathName(xinc, kReadPermission)) {
04526                TString curincpath = gSystem->GetIncludePath();
04527                if (curincpath.Index(xinc) == kNPOS)
04528                   gSystem->AddIncludePath(TString::Format("-I%s", xinc.Data()));
04529             } else
04530                Info("HandleLibIncPath",
04531                     "incpath %s does not exist or cannot be read - not added", xinc.Data());
04532          }
04533 
04534          // Forward the request, if required
04535          if (IsMaster())
04536             fProof->AddIncludePath(path);
04537       }
04538 
04539 
04540    } else {
04541 
04542       if (type == "lib") {
04543 
04544          // Remove libs
04545          TIter nxl(op);
04546          TObjString *lib = 0;
04547          while ((lib = (TObjString *) nxl())) {
04548             // Expand path
04549             TString xlib = lib->GetName();
04550             gSystem->ExpandPathName(xlib);
04551             // Remove from the dynamic lib search path
04552             TString newlibpath = gSystem->GetDynamicPath();
04553             newlibpath.ReplaceAll(TString::Format("%s:", xlib.Data()),"");
04554             gSystem->SetDynamicPath(newlibpath);
04555          }
04556 
04557          // Forward the request, if required
04558          if (IsMaster())
04559             fProof->RemoveDynamicPath(path);
04560 
04561       } else {
04562 
04563          // Remove incs
04564          TIter nxi(op);
04565          TObjString *inc = 0;
04566          while ((inc = (TObjString *) nxi())) {
04567             TString newincpath = gSystem->GetIncludePath();
04568             newincpath.ReplaceAll(TString::Format("-I%s", inc->GetName()),"");
04569             // Remove the interpreter path (added anyhow internally)
04570             newincpath.ReplaceAll(gInterpreter->GetIncludePath(),"");
04571             gSystem->SetIncludePath(newincpath);
04572          }
04573 
04574          // Forward the request, if required
04575          if (IsMaster())
04576             fProof->RemoveIncludePath(path);
04577       }
04578    }
04579 }
04580 
04581 //______________________________________________________________________________
void TProofServ::HandleCheckFile(TMessage *mess, TString *slb)
{
   // Handle file checking request.
   // The message contains a file name and its MD5 checksum. For protocol > 8,
   // if more data is available, an option word follows (default
   // TProof::kUntar). The first character of the file name selects the action:
   //   '-'  : install a PAR file found in fPackageDir. The steps are:
   //          - verify its checksum;
   //          - optionally remove the old package directory
   //            (TProof::kRemoveOld);
   //          - untar it with gunzip;
   //          - store the checksum in <package>/PROOF-INF/md5.txt.
   //          On error the PAR file is deleted; on success, a master forwards
   //          the package to its workers.
   //   '+'  : check the checksum stored in <package>/PROOF-INF/md5.txt. On a
   //          match, a master also uploads the package to its workers.
   //   '='  : currently the same as '+'.
   //   none : check the file in the file cache (fCacheDir). On a match it is
   //          copied via CopyFromCache when TProof::kCp or TProof::kCpBin is
   //          set (always for protocol <= 19).
   // The reply (kPROOF_CHECKFILE) carries (Int_t)1 on success and (Int_t)0 on
   // failure. For protocol <= 19 a failure reply is reset to a kPROOF_FATAL
   // message instead.
   // If 'slb' is given, it is set to the file name before the prefix is
   // stripped.

   TString filenam;
   TMD5    md5;
   UInt_t  opt = TProof::kUntar;

   TMessage reply(kPROOF_CHECKFILE);

   // Parse message; the option word is optional (only read if data remains)
   (*mess) >> filenam >> md5;
   if ((mess->BufferSize() > mess->Length()) && (fProtocol > 8))
      (*mess) >> opt;

   if (slb) *slb = filenam;

   if (filenam.BeginsWith("-")) {
      // install package:
      // compare md5's, untar, store md5 in PROOF-INF
      // (the par file is removed only in case of error, see below)
      Int_t  st  = 0;
      Bool_t err = kFALSE;
      filenam = filenam.Strip(TString::kLeading, '-');
      TString packnam = filenam;
      packnam.Remove(packnam.Length() - 4);  // strip off ".par"
      // compare md5's to check if transmission was ok
      // (the package lock is released in all paths of the if-else further below)
      fPackageLock->Lock();
      TMD5 *md5local = TMD5::FileChecksum(fPackageDir + "/" + filenam);
      if (md5local && md5 == (*md5local)) {
         if ((opt & TProof::kRemoveOld)) {
            // remove any previous package directory with same name
            st = gSystem->Exec(TString::Format("%s %s/%s", kRM, fPackageDir.Data(),
                               packnam.Data()));
            if (st)
               Error("HandleCheckFile", "failure executing: %s %s/%s",
                     kRM, fPackageDir.Data(), packnam.Data());
         }
         // find gunzip...
         char *gunzip = gSystem->Which(gSystem->Getenv("PATH"), kGUNZIP,
                                       kExecutePermission);
         if (gunzip) {
            // untar package
            st = gSystem->Exec(TString::Format(kUNTAR, gunzip, fPackageDir.Data(),
                               filenam.Data(), fPackageDir.Data()));
            if (st)
               Error("HandleCheckFile", "failure executing: %s",
                     TString::Format(kUNTAR, gunzip, fPackageDir.Data(),
                          filenam.Data(), fPackageDir.Data()).Data());
            delete [] gunzip;
         } else
            Error("HandleCheckFile", "%s not found", kGUNZIP);
         // check that fPackageDir/packnam now exists; this check (not the
         // exit codes above) decides whether the installation succeeded
         if (gSystem->AccessPathName(fPackageDir + "/" + packnam, kWritePermission)) {
            // par file did not unpack itself in the expected directory, failure
            reply << (Int_t)0;
            if (fProtocol <= 19) reply.Reset(kPROOF_FATAL);
            err = kTRUE;
            Error("HandleCheckFile", "package %s did not unpack into %s",
                                     filenam.Data(), packnam.Data());
         } else {
            // store md5 in package/PROOF-INF/md5.txt
            TString md5f = fPackageDir + "/" + packnam + "/PROOF-INF/md5.txt";
            TMD5::WriteChecksum(md5f, md5local);
            // Notify the client
            reply << (Int_t)1;
            PDB(kPackage, 1)
               Info("HandleCheckFile",
                    "package %s installed on node", filenam.Data());
         }
      } else {
         // checksum missing or different: the transfer is considered failed
         reply << (Int_t)0;
         if (fProtocol <= 19) reply.Reset(kPROOF_FATAL);
         err = kTRUE;
         PDB(kPackage, 1)
            Info("HandleCheckFile",
                 "package %s not yet on node", filenam.Data());
      }

      // Note: Originally an fPackageLock->Unlock() call was made
      // after the if-else statement below. With multilevel masters,
      // submasters still check to make sure the package exists with
      // the correct md5 checksum and need to do a read lock there.
      // As yet locking is not that sophisicated so the lock must
      // be released below before the call to fProof->UploadPackage().
      if (err) {
         // delete par file in case of error
         gSystem->Exec(TString::Format("%s %s/%s", kRM, fPackageDir.Data(),
                       filenam.Data()));
         fPackageLock->Unlock();
      } else if (IsMaster()) {
         // forward to workers
         fPackageLock->Unlock();
         fProof->UploadPackage(fPackageDir + "/" + filenam, (TProof::EUploadPackageOpt)opt);
      } else {
         // Unlock in all cases
         fPackageLock->Unlock();
      }
      delete md5local;
      fSocket->Send(reply);

   } else if (filenam.BeginsWith("+")) {
      // check file in package directory
      filenam = filenam.Strip(TString::kLeading, '+');
      TString packnam = filenam;
      packnam.Remove(packnam.Length() - 4);  // strip off ".par"
      TString md5f = fPackageDir + "/" + packnam + "/PROOF-INF/md5.txt";
      // the lock is held only while reading the stored checksum
      fPackageLock->Lock();
      TMD5 *md5local = TMD5::ReadChecksum(md5f);
      fPackageLock->Unlock();
      if (md5local && md5 == (*md5local)) {
         // package already on server (the directory lock was already released above)
         reply << (Int_t)1;
         PDB(kPackage, 1)
            Info("HandleCheckFile",
                 "package %s already on node", filenam.Data());
         if (IsMaster())
            fProof->UploadPackage(fPackageDir + "/" + filenam);
      } else {
         reply << (Int_t)0;
         if (fProtocol <= 19) reply.Reset(kPROOF_FATAL);
         PDB(kPackage, 1)
            Info("HandleCheckFile",
                 "package %s not yet on node", filenam.Data());
      }
      delete md5local;
      fSocket->Send(reply);

   } else if (filenam.BeginsWith("=")) {
      // check file in package directory
      // NOTE(review): this branch is currently identical to the '+' one; the
      // lock is taken in any case while reading the stored checksum
      filenam = filenam.Strip(TString::kLeading, '=');
      TString packnam = filenam;
      packnam.Remove(packnam.Length() - 4);  // strip off ".par"
      TString md5f = fPackageDir + "/" + packnam + "/PROOF-INF/md5.txt";
      fPackageLock->Lock();
      TMD5 *md5local = TMD5::ReadChecksum(md5f);
      fPackageLock->Unlock();
      if (md5local && md5 == (*md5local)) {
         // package already on server (the directory lock was already released above)
         reply << (Int_t)1;
         PDB(kPackage, 1)
            Info("HandleCheckFile",
                 "package %s already on node", filenam.Data());
         if (IsMaster())
            fProof->UploadPackage(fPackageDir + "/" + filenam);
      } else {
         reply << (Int_t)0;
         if (fProtocol <= 19) reply.Reset(kPROOF_FATAL);
         PDB(kPackage, 1)
            Info("HandleCheckFile",
                 "package %s not yet on node", filenam.Data());
      }
      delete md5local;
      fSocket->Send(reply);

   } else {
      // check file in cache directory; the cache lock is held until the
      // reply has been sent
      TString cachef = fCacheDir + "/" + filenam;
      fCacheLock->Lock();
      TMD5 *md5local = TMD5::FileChecksum(cachef);

      if (md5local && md5 == (*md5local)) {
         // copy file from cache to working directory
         // (always done for old clients, protocol <= 19)
         Bool_t cp = ((opt & TProof::kCp || opt & TProof::kCpBin) || (fProtocol <= 19)) ? kTRUE : kFALSE;
         if (cp) {
            Bool_t cpbin = (opt & TProof::kCpBin) ? kTRUE : kFALSE;
            CopyFromCache(filenam, cpbin);
         }
         reply << (Int_t)1;
         PDB(kCache, 1)
            Info("HandleCheckFile", "file %s already on node", filenam.Data());
      } else {
         reply << (Int_t)0;
         if (fProtocol <= 19) reply.Reset(kPROOF_FATAL);
         PDB(kCache, 1)
            Info("HandleCheckFile", "file %s not yet on node", filenam.Data());
      }
      delete md5local;
      fSocket->Send(reply);
      fCacheLock->Unlock();
   }
}
04763 
04764 //______________________________________________________________________________
04765 Int_t TProofServ::HandleCache(TMessage *mess, TString *slb)
04766 {
04767    // Handle here all cache and package requests.
04768 
04769    PDB(kGlobal, 1)
04770       Info("HandleCache", "Enter");
04771 
04772    Int_t status = 0;
04773    Int_t type = 0;
04774    Bool_t all = kFALSE;
04775    TMessage msg;
04776    Bool_t fromglobal = kFALSE;
04777 
04778    // Notification message
04779    TString noth;
04780    const char *k = (IsMaster()) ? "Mst" : "Wrk";
04781    noth.Form("%s-%s", k, fOrdinal.Data());
04782 
04783    TList *optls = 0;
04784    TString packagedir(fPackageDir), package, pdir, ocwd, file;
04785    (*mess) >> type;
04786    switch (type) {
04787       case TProof::kShowCache:
04788          (*mess) >> all;
04789          printf("*** File cache %s:%s ***\n", gSystem->HostName(),
04790                 fCacheDir.Data());
04791          fflush(stdout);
04792          PDB(kCache, 1) {
04793             gSystem->Exec(TString::Format("%s -a %s", kLS, fCacheDir.Data()));
04794          } else {
04795             gSystem->Exec(TString::Format("%s %s", kLS, fCacheDir.Data()));
04796          }
04797          if (IsMaster() && all)
04798             fProof->ShowCache(all);
04799          LogToMaster();
04800          if (slb) slb->Form("%d %d", type, all);
04801          break;
04802       case TProof::kClearCache:
04803          file = "";
04804          if ((mess->BufferSize() > mess->Length())) (*mess) >> file;
04805          fCacheLock->Lock();
04806          if (file.IsNull() || file == "*") {
04807             gSystem->Exec(TString::Format("%s %s/* %s/.*.binversion", kRM, fCacheDir.Data(), fCacheDir.Data()));
04808          } else {
04809             gSystem->Exec(TString::Format("%s %s/%s", kRM, fCacheDir.Data(), file.Data()));
04810          }
04811          fCacheLock->Unlock();
04812          if (IsMaster())
04813             fProof->ClearCache(file);
04814          if (slb) slb->Form("%d %s", type, file.Data());
04815          break;
04816       case TProof::kShowPackages:
04817          (*mess) >> all;
04818          if (fGlobalPackageDirList && fGlobalPackageDirList->GetSize() > 0) {
04819             // Scan the list of global packages dirs
04820             TIter nxd(fGlobalPackageDirList);
04821             TNamed *nm = 0;
04822             while ((nm = (TNamed *)nxd())) {
04823                printf("*** Global Package cache %s %s:%s ***\n",
04824                       nm->GetName(), gSystem->HostName(), nm->GetTitle());
04825                fflush(stdout);
04826                gSystem->Exec(TString::Format("%s %s", kLS, nm->GetTitle()));
04827                printf("\n");
04828                fflush(stdout);
04829             }
04830          }
04831          printf("*** Package cache %s:%s ***\n", gSystem->HostName(),
04832                 fPackageDir.Data());
04833          fflush(stdout);
04834          gSystem->Exec(TString::Format("%s %s", kLS, fPackageDir.Data()));
04835          if (IsMaster() && all)
04836             fProof->ShowPackages(all);
04837          LogToMaster();
04838          if (slb) slb->Form("%d %d", type, all);
04839          break;
04840       case TProof::kClearPackages:
04841          status = UnloadPackages();
04842          if (status == 0) {
04843             fPackageLock->Lock();
04844             gSystem->Exec(TString::Format("%s %s/*", kRM, fPackageDir.Data()));
04845             fPackageLock->Unlock();
04846             if (IsMaster())
04847                status = fProof->ClearPackages();
04848          }
04849          if (slb) slb->Form("%d %d", type, status);
04850          break;
04851       case TProof::kClearPackage:
04852          (*mess) >> package;
04853          status = UnloadPackage(package);
04854          if (status == 0) {
04855             fPackageLock->Lock();
04856             // remove package directory and par file
04857             gSystem->Exec(TString::Format("%s %s/%s", kRM, fPackageDir.Data(),
04858                           package.Data()));
04859             if (IsMaster())
04860                gSystem->Exec(TString::Format("%s %s/%s.par", kRM, fPackageDir.Data(),
04861                              package.Data()));
04862             fPackageLock->Unlock();
04863             if (IsMaster())
04864                status = fProof->ClearPackage(package);
04865          }
04866          if (slb) slb->Form("%d %s %d", type, package.Data(), status);
04867          break;
04868       case TProof::kBuildPackage:
04869          (*mess) >> package;
04870 
04871          // always follows BuildPackage so no need to check for PROOF-INF
04872          pdir = fPackageDir + "/" + package;
04873 
04874          fromglobal = kFALSE;
04875          if (gSystem->AccessPathName(pdir, kReadPermission) ||
04876              gSystem->AccessPathName(pdir + "/PROOF-INF", kReadPermission)) {
04877             // Is there a global package with this name?
04878             if (fGlobalPackageDirList && fGlobalPackageDirList->GetSize() > 0) {
04879                // Scan the list of global packages dirs
04880                TIter nxd(fGlobalPackageDirList);
04881                TNamed *nm = 0;
04882                while ((nm = (TNamed *)nxd())) {
04883                   pdir.Form("%s/%s", nm->GetTitle(), package.Data());
04884                   if (!gSystem->AccessPathName(pdir, kReadPermission) &&
04885                       !gSystem->AccessPathName(pdir + "/PROOF-INF", kReadPermission)) {
04886                      // Package found, stop searching
04887                      fromglobal = kTRUE;
04888                      packagedir = nm->GetTitle();
04889                      break;
04890                   }
04891                   pdir = "";
04892                }
04893                if (pdir.Length() <= 0) {
04894                   // Package not found
04895                   SendAsynMessage(TString::Format("%s: kBuildPackage: failure locating %s ...",
04896                                        noth.Data(), package.Data()));
04897                   break;
04898                }
04899             }
04900          }
04901 
04902          if (IsMaster() && !fromglobal) {
04903             // make sure package is available on all slaves, even new ones
04904             fProof->UploadPackage(pdir + ".par");
04905          }
04906          fPackageLock->Lock();
04907 
04908          if (!status) {
04909 
04910             PDB(kPackage, 1)
04911                Info("HandleCache",
04912                     "kBuildPackage: package %s exists and has PROOF-INF directory", package.Data());
04913 
04914             ocwd = gSystem->WorkingDirectory();
04915             gSystem->ChangeDirectory(pdir);
04916 
04917             // forward build command to slaves, but don't wait for results
04918             if (IsMaster())
04919                fProof->BuildPackage(package, TProof::kBuildOnSlavesNoWait);
04920 
04921             // check for BUILD.sh and execute
04922             if (!gSystem->AccessPathName("PROOF-INF/BUILD.sh")) {
04923                // Notify the upper level
04924                SendAsynMessage(TString::Format("%s: building %s ...", noth.Data(), package.Data()));
04925 
04926                // read version from file proofvers.txt, and if current version is
04927                // not the same do a "BUILD.sh clean"
04928                Bool_t savever = kFALSE;
04929                TString v;
04930                Int_t rev = -1;
04931                FILE *f = fopen("PROOF-INF/proofvers.txt", "r");
04932                if (f) {
04933                   TString r;
04934                   v.Gets(f);
04935                   r.Gets(f);
04936                   rev = (!r.IsNull() && r.IsDigit()) ? r.Atoi() : -1;
04937                   fclose(f);
04938                }
04939                if (!f || v != gROOT->GetVersion() ||
04940                   (gROOT->GetSvnRevision() > 0 && rev != gROOT->GetSvnRevision())) {
04941                   if (!fromglobal || !gSystem->AccessPathName(pdir, kWritePermission)) {
04942                      savever = kTRUE;
04943                      SendAsynMessage(TString::Format("%s: %s: version change (current: %s:%d,"
04944                                           " build: %s:%d): cleaning ... ",
04945                                           noth.Data(), package.Data(), gROOT->GetVersion(),
04946                                           gROOT->GetSvnRevision(), v.Data(), rev));
04947                      // Hard cleanup: go up the dir tree
04948                      gSystem->ChangeDirectory(packagedir);
04949                      // remove package directory
04950                      gSystem->Exec(TString::Format("%s %s", kRM, pdir.Data()));
04951                      // find gunzip...
04952                      char *gunzip = gSystem->Which(gSystem->Getenv("PATH"), kGUNZIP,
04953                                                    kExecutePermission);
04954                      if (gunzip) {
04955                         TString par;
04956                         par.Form("%s.par", pdir.Data());
04957                         // untar package
04958                         TString cmd;
04959                         cmd.Form(kUNTAR3, gunzip, par.Data());
04960                         status = gSystem->Exec(cmd);
04961                         if (status) {
04962                            Error("HandleCache", "kBuildPackage: failure executing: %s", cmd.Data());
04963                         } else {
04964                            // Store md5 in package/PROOF-INF/md5.txt
04965                            TMD5 *md5local = TMD5::FileChecksum(par);
04966                            if (md5local) {
04967                               TString md5f = packagedir + "/" + package + "/PROOF-INF/md5.txt";
04968                               TMD5::WriteChecksum(md5f, md5local);
04969                               // Go down to the package directory
04970                               gSystem->ChangeDirectory(pdir);
04971                               // Cleanup
04972                               SafeDelete(md5local);
04973                            } else {
04974                               Error("HandleCache", "kBuildPackage: failure calculating MD5sum for '%s'", par.Data());
04975                            }
04976                         }
04977                         delete [] gunzip;
04978                      } else
04979                         Error("HandleCache", "kBuildPackage: %s not found", kGUNZIP);
04980                   } else {
04981                      SendAsynMessage(TString::Format("%s: %s: ROOT version inconsistency (current: %s, build: %s):"
04982                                           " global package: cannot re-build!!! ",
04983                                           noth.Data(), package.Data(), gROOT->GetVersion(), v.Data()));
04984                   }
04985                }
04986 
04987                if (!status) {
04988                   // To build the package we execute PROOF-INF/BUILD.sh via a pipe
04989                   // so that we can send back the log in (almost) real-time to the
04990                   // (impatient) client. Note that this operation will block, so
04991                   // the messages from builds on the workers will reach the client
04992                   // shortly after the master ones.
04993                   TString ipath(gSystem->GetIncludePath());
04994                   ipath.ReplaceAll("\"","");
04995                   TString cmd;
04996                   cmd.Form("export ROOTINCLUDEPATH=\"%s\" ; PROOF-INF/BUILD.sh", ipath.Data());
04997                   {
04998                      TProofServLogHandlerGuard hg(cmd, fSocket);
04999                   }
05000                   if (!(status = TProofServLogHandler::GetCmdRtn())) {
05001                      // Success: write version file
05002                      if (savever) {
05003                         f = fopen("PROOF-INF/proofvers.txt", "w");
05004                         if (f) {
05005                            fputs(gROOT->GetVersion(), f);
05006                            fputs(TString::Format("\n%d",gROOT->GetSvnRevision()), f);
05007                            fclose(f);
05008                         }
05009                      }
05010                   }
05011                }
05012             } else {
05013                // Notify the user
05014                PDB(kPackage, 1)
05015                   Info("HandleCache", "no PROOF-INF/BUILD.sh found for package %s", package.Data());
05016             }
05017             gSystem->ChangeDirectory(ocwd);
05018          }
05019 
05020          fPackageLock->Unlock();
05021 
05022          if (status) {
05023             // Notify the upper level
05024             SendAsynMessage(TString::Format("%s: failure building %s ... (status: %d)", noth.Data(), package.Data(), status));
05025          } else {
05026             // collect built results from slaves
05027             if (IsMaster())
05028                status = fProof->BuildPackage(package, TProof::kCollectBuildResults);
05029             PDB(kPackage, 1)
05030                Info("HandleCache", "package %s successfully built", package.Data());
05031          }
05032          if (slb) slb->Form("%d %s %d", type, package.Data(), status);
05033          break;
05034       case TProof::kLoadPackage:
05035          (*mess) >> package;
05036 
05037          // If already loaded don't do it again
05038          if (fEnabledPackages->FindObject(package)) {
05039             Info("HandleCache",
05040                  "package %s already loaded", package.Data());
05041             break;
05042          }
05043 
05044          // always follows BuildPackage so no need to check for PROOF-INF
05045          pdir = fPackageDir + "/" + package;
05046 
05047          if (gSystem->AccessPathName(pdir, kReadPermission)) {
05048             // Is there a global package with this name?
05049             if (fGlobalPackageDirList && fGlobalPackageDirList->GetSize() > 0) {
05050                // Scan the list of global packages dirs
05051                TIter nxd(fGlobalPackageDirList);
05052                TNamed *nm = 0;
05053                while ((nm = (TNamed *)nxd())) {
05054                   pdir.Form("%s/%s", nm->GetTitle(), package.Data());
05055                   if (!gSystem->AccessPathName(pdir, kReadPermission)) {
05056                      // Package found, stop searching
05057                      break;
05058                   }
05059                   pdir = "";
05060                }
05061                if (pdir.Length() <= 0) {
05062                   // Package not found
05063                   SendAsynMessage(TString::Format("%s: kLoadPackage: failure locating %s ...",
05064                                        noth.Data(), package.Data()));
05065                   break;
05066                }
05067             }
05068          }
05069 
05070          ocwd = gSystem->WorkingDirectory();
05071          gSystem->ChangeDirectory(pdir);
05072 
05073          // We have to be atomic here
05074          fPackageLock->Lock();
05075 
05076          // Check for SETUP.C and execute
05077          if (!gSystem->AccessPathName("PROOF-INF/SETUP.C")) {
05078             // We need to change the name of the function to avoid problems when we load more packages
05079             TString setup, setupfn;
05080             setup.Form("SETUP_%x", package.Hash());
05081             // Remove special characters
05082             setupfn.Form("%s/%s.C", gSystem->TempDirectory(), setup.Data());
05083             TMacro setupmc("PROOF-INF/SETUP.C");
05084             TObjString *setupline = setupmc.GetLineWith("SETUP(");
05085             if (setupline) {
05086                TString setupstring(setupline->GetString());
05087                setupstring.ReplaceAll("SETUP(", TString::Format("%s(", setup.Data()));
05088                setupline->SetString(setupstring);
05089             } else {
05090                // Macro does not contain SETUP()
05091                SendAsynMessage(TString::Format("%s: warning: macro '%s/PROOF-INF/SETUP.C' does not contain a SETUP()"
05092                                                " function", noth.Data(), package.Data()));
05093             }
05094             setupmc.SaveSource(setupfn.Data());
05095             // Load the macro
05096             if (gROOT->LoadMacro(setupfn.Data()) != 0) {
05097                // Macro could not be loaded
05098                SendAsynMessage(TString::Format("%s: error: macro '%s/PROOF-INF/SETUP.C' could not be loaded:"
05099                                                 " cannot continue",
05100                                                 noth.Data(), package.Data()));
05101                status = -1;
05102             } else {
05103                // Check the signature
05104                TFunction *fun = (TFunction *) gROOT->GetListOfGlobalFunctions()->FindObject(setup);
05105                if (!fun) {
05106                   // Notify the upper level
05107                   SendAsynMessage(TString::Format("%s: error: function SETUP() not found in macro '%s/PROOF-INF/SETUP.C':"
05108                                                    " cannot continue",
05109                                                    noth.Data(), package.Data()));
05110                   status = -1;
05111                } else {
05112                   TMethodCall callEnv;
05113                   // Check the number of arguments
05114                   if (fun->GetNargs() == 0) {
05115                      // No arguments (basic signature)
05116                      callEnv.InitWithPrototype(setup.Data(),"");
05117                      if ((mess->BufferSize() > mess->Length())) {
05118                         (*mess) >> optls;
05119                         SendAsynMessage(TString::Format("%s: warning: loaded SETUP() does not take any argument:"
05120                                                         " the specified argument will be ignored", noth.Data()));
05121                      }
05122                   } else if (fun->GetNargs() == 1) {
05123                      TMethodArg *arg = (TMethodArg *) fun->GetListOfMethodArgs()->First();
05124                      if (arg) {
05125                         // Get argument
05126                         if ((mess->BufferSize() > mess->Length())) (*mess) >> optls;
05127                         // Check argument type
05128                         TString argsig(arg->GetTitle());
05129                         if (argsig.BeginsWith("TList")) {
05130                            callEnv.InitWithPrototype(setup.Data(),"TList *");
05131                            callEnv.ResetParam();
05132                            callEnv.SetParam((Long_t) optls);
05133                         } else if (argsig.BeginsWith("const char")) {
05134                            callEnv.InitWithPrototype(setup.Data(),"const char *");
05135                            callEnv.ResetParam();
05136                            TObjString *os = optls ? dynamic_cast<TObjString *>(optls->First()) : 0;
05137                            if (os) {
05138                               callEnv.SetParam((Long_t) os->GetName());
05139                            } else {
05140                               if (optls && optls->First()) {
05141                                  SendAsynMessage(TString::Format("%s: warning: found object argument of type %s:"
05142                                                                  " SETUP expects 'const char *': ignoring",
05143                                                                  noth.Data(), optls->First()->ClassName()));
05144                               }
05145                               callEnv.SetParam((Long_t) 0);
05146                            }
05147                         } else {
05148                            // Notify the upper level
05149                            SendAsynMessage(TString::Format("%s: error: unsupported SETUP signature: SETUP(%s)"
05150                                                             " cannot continue", noth.Data(), arg->GetTitle()));
05151                            status = -1;
05152                         }
05153                      } else {
05154                         // Notify the upper level
05155                         SendAsynMessage(TString::Format("%s: error: cannot get information about the SETUP() argument:"
05156                                                          " cannot continue", noth.Data()));
05157                         status = -1;
05158                      }
05159                   } else if (fun->GetNargs() > 1) {
05160                      // Notify the upper level
05161                      SendAsynMessage(TString::Format("%s: error: function SETUP() can have at most a 'TList *' argument:"
05162                                                       " cannot continue", noth.Data()));
05163                      status = -1;
05164                   }
05165                   // Execute
05166                   Long_t setuprc = (status == 0) ? 0 : -1;
05167                   if (status == 0) {
05168                      callEnv.Execute(setuprc);
05169                      if (setuprc < 0) status = -1;
05170                   }
05171                }
05172             }
05173             if (!gSystem->AccessPathName(setupfn.Data())) gSystem->Unlink(setupfn.Data());
05174          }
05175 
05176          // End of atomicity
05177          fPackageLock->Unlock();
05178 
05179          gSystem->ChangeDirectory(ocwd);
05180 
05181          if (status < 0) {
05182 
05183             // Notify the upper level
05184             SendAsynMessage(TString::Format("%s: failure loading %s ...", noth.Data(), package.Data()));
05185 
05186          } else {
05187 
05188             // create link to package in working directory
05189             gSystem->Symlink(pdir, package);
05190 
05191             // add package to list of include directories to be searched
05192             // by ACliC
05193             gSystem->AddIncludePath(TString("-I") + package);
05194 
05195             // add package to list of include directories to be searched by CINT
05196             gROOT->ProcessLine(TString(".include ") + package);
05197 
05198             // if successful add to list and propagate to slaves
05199             fEnabledPackages->Add(new TObjString(package));
05200             if (IsMaster()) {
05201                if (optls && optls->GetSize() > 0) {
05202                   // List argument
05203                   status = fProof->LoadPackage(package, kFALSE, optls);
05204                } else {
05205                   // No argument
05206                   status = fProof->LoadPackage(package);
05207                }
05208             }
05209 
05210             PDB(kPackage, 1)
05211                Info("HandleCache", "package %s successfully loaded", package.Data());
05212          }
05213          if (slb) slb->Form("%d %s %d", type, package.Data(), status);
05214          break;
05215       case TProof::kShowEnabledPackages:
05216          (*mess) >> all;
05217          if (IsMaster()) {
05218             if (all)
05219                printf("*** Enabled packages on master %s on %s\n",
05220                       fOrdinal.Data(), gSystem->HostName());
05221             else
05222                printf("*** Enabled packages ***\n");
05223          } else {
05224             printf("*** Enabled packages on slave %s on %s\n",
05225                    fOrdinal.Data(), gSystem->HostName());
05226          }
05227          {
05228             TIter next(fEnabledPackages);
05229             while (TObjString *str = (TObjString*) next())
05230                printf("%s\n", str->GetName());
05231          }
05232          if (IsMaster() && all)
05233             fProof->ShowEnabledPackages(all);
05234          LogToMaster();
05235          if (slb) slb->Form("%d %d", type, all);
05236          break;
05237       case TProof::kShowSubCache:
05238          (*mess) >> all;
05239          if (IsMaster() && all)
05240             fProof->ShowCache(all);
05241          LogToMaster();
05242          if (slb) slb->Form("%d %d", type, all);
05243          break;
05244       case TProof::kClearSubCache:
05245          file = "";
05246          if ((mess->BufferSize() > mess->Length())) (*mess) >> file;
05247          if (IsMaster())
05248             fProof->ClearCache(file);
05249          if (slb) slb->Form("%d %s", type, file.Data());
05250          break;
05251       case TProof::kShowSubPackages:
05252          (*mess) >> all;
05253          if (IsMaster() && all)
05254             fProof->ShowPackages(all);
05255          LogToMaster();
05256          if (slb) slb->Form("%d %d", type, all);
05257          break;
05258       case TProof::kDisableSubPackages:
05259          if (IsMaster())
05260             fProof->DisablePackages();
05261          if (slb) slb->Form("%d", type);
05262          break;
05263       case TProof::kDisableSubPackage:
05264          (*mess) >> package;
05265          if (IsMaster())
05266             fProof->DisablePackage(package);
05267          if (slb) slb->Form("%d %s", type, package.Data());
05268          break;
05269       case TProof::kBuildSubPackage:
05270          (*mess) >> package;
05271          if (IsMaster())
05272             fProof->BuildPackage(package);
05273          if (slb) slb->Form("%d %s", type, package.Data());
05274          break;
05275       case TProof::kUnloadPackage:
05276          (*mess) >> package;
05277          status = UnloadPackage(package);
05278          if (IsMaster() && status == 0)
05279             status = fProof->UnloadPackage(package);
05280          if (slb) slb->Form("%d %s %d", type, package.Data(), status);
05281          break;
05282       case TProof::kDisablePackage:
05283          (*mess) >> package;
05284          fPackageLock->Lock();
05285          // remove package directory and par file
05286          gSystem->Exec(TString::Format("%s %s/%s", kRM, fPackageDir.Data(),
05287                        package.Data()));
05288          gSystem->Exec(TString::Format("%s %s/%s.par", kRM, fPackageDir.Data(),
05289                        package.Data()));
05290          fPackageLock->Unlock();
05291          if (IsMaster())
05292             fProof->DisablePackage(package);
05293          if (slb) slb->Form("%d %s", type, package.Data());
05294          break;
05295       case TProof::kUnloadPackages:
05296          status = UnloadPackages();
05297          if (IsMaster() && status == 0)
05298             status = fProof->UnloadPackages();
05299          if (slb) slb->Form("%d %s %d", type, package.Data(), status);
05300          break;
05301       case TProof::kDisablePackages:
05302          fPackageLock->Lock();
05303          gSystem->Exec(TString::Format("%s %s/*", kRM, fPackageDir.Data()));
05304          fPackageLock->Unlock();
05305          if (IsMaster())
05306             fProof->DisablePackages();
05307          if (slb) slb->Form("%d %s", type, package.Data());
05308          break;
05309       case TProof::kListEnabledPackages:
05310          msg.Reset(kPROOF_PACKAGE_LIST);
05311          msg << type << fEnabledPackages;
05312          fSocket->Send(msg);
05313          if (slb) slb->Form("%d", type);
05314          break;
05315       case TProof::kListPackages:
05316          {
05317             TList *pack = new TList;
05318             void *dir = gSystem->OpenDirectory(fPackageDir);
05319             if (dir) {
05320                TString pac(gSystem->GetDirEntry(dir));
05321                while (pac.Length() > 0) {
05322                   if (pac.EndsWith(".par")) {
05323                      pac.ReplaceAll(".par","");
05324                      pack->Add(new TObjString(pac.Data()));
05325                   }
05326                   pac = gSystem->GetDirEntry(dir);
05327                }
05328             }
05329             gSystem->FreeDirectory(dir);
05330             msg.Reset(kPROOF_PACKAGE_LIST);
05331             msg << type << pack;
05332             fSocket->Send(msg);
05333          }
05334          if (slb) slb->Form("%d", type);
05335          break;
05336       case TProof::kLoadMacro:
05337 
05338          (*mess) >> package;
05339 
05340          // By first forwarding the load command to the unique workers
05341          // and only then loading locally we load/build in parallel
05342          if (IsMaster())
05343             fProof->Load(package, kFALSE, kTRUE);
05344 
05345          // Atomic action
05346          fCacheLock->Lock();
05347 
05348          // Load locally; the implementation and header files (and perhaps
05349          // the binaries) are already in the cache
05350          CopyFromCache(package, kTRUE);
05351 
05352          // Load the macro
05353          Info("HandleCache", "loading macro %s ...", package.Data());
05354          gROOT->ProcessLine(TString::Format(".L %s", package.Data()));
05355 
05356          // Cache binaries, if any new
05357          CopyToCache(package, 1);
05358 
05359          // Release atomicity
05360          fCacheLock->Unlock();
05361 
05362          // Now we collect the result from the unique workers and send the load request
05363          // to the other workers (no compilation)
05364          if (IsMaster())
05365             fProof->Load(package, kFALSE, kFALSE);
05366 
05367          // Notify the upper level
05368          LogToMaster();
05369 
05370          if (slb) slb->Form("%d %s", type, package.Data());
05371          break;
05372       default:
05373          Error("HandleCache", "unknown type %d", type);
05374          break;
05375    }
05376 
05377    // We are done
05378    return status;
05379 }
05380 
05381 //______________________________________________________________________________
05382 void TProofServ::HandleWorkerLists(TMessage *mess)
05383 {
05384    // Handle here all requests to modify worker lists
05385 
05386    PDB(kGlobal, 1)
05387       Info("HandleWorkerLists", "Enter");
05388 
05389    Int_t type = 0;
05390    TString ord;
05391 
05392    (*mess) >> type;
05393 
05394    switch (type) {
05395       case TProof::kActivateWorker:
05396          (*mess) >> ord;
05397          if (fProof) {
05398             Int_t nact = fProof->GetListOfActiveSlaves()->GetSize();
05399             Int_t nactmax = fProof->GetListOfSlaves()->GetSize() -
05400                             fProof->GetListOfBadSlaves()->GetSize();
05401             if (nact < nactmax) {
05402                fProof->ActivateWorker(ord);
05403                Int_t nactnew = fProof->GetListOfActiveSlaves()->GetSize();
05404                if (ord == "*") {
05405                   if (nactnew == nactmax) {
05406                      Info("HandleWorkerList","all workers (re-)activated");
05407                   } else {
05408                      Info("HandleWorkerList","%d workers could not be (re-)activated", nactmax - nactnew);
05409                   }
05410                } else {
05411                   if (nactnew == (nact + 1)) {
05412                      Info("HandleWorkerList","worker %s (re-)activated", ord.Data());
05413                   } else {
05414                      Info("HandleWorkerList","worker %s could not be (re-)activated;"
05415                                              " # of actives: %d --> %d", ord.Data(), nact, nactnew);
05416                   }
05417                }
05418             } else {
05419                Info("HandleWorkerList","all workers are already active");
05420             }
05421          } else {
05422             Warning("HandleWorkerList","undefined PROOF session: protocol error?");
05423          }
05424          break;
05425       case TProof::kDeactivateWorker:
05426          (*mess) >> ord;
05427          if (fProof) {
05428             Int_t nact = fProof->GetListOfActiveSlaves()->GetSize();
05429             if (nact > 0) {
05430                fProof->DeactivateWorker(ord);
05431                Int_t nactnew = fProof->GetListOfActiveSlaves()->GetSize();
05432                if (ord == "*") {
05433                   if (nactnew == 0) {
05434                      Info("HandleWorkerList","all workers deactivated");
05435                   } else {
05436                      Info("HandleWorkerList","%d workers could not be deactivated", nactnew);
05437                   }
05438                } else {
05439                   if (nactnew == (nact - 1)) {
05440                      Info("HandleWorkerList","worker %s deactivated", ord.Data());
05441                   } else {
05442                      Info("HandleWorkerList","worker %s could not be deactivated:"
05443                                              " # of actives: %d --> %d", ord.Data(), nact, nactnew);
05444                   }
05445                }
05446             } else {
05447                Info("HandleWorkerList","all workers are already inactive");
05448             }
05449          } else {
05450             Warning("HandleWorkerList","undefined PROOF session: protocol error?");
05451          }
05452          break;
05453       default:
05454          Warning("HandleWorkerList","unknown action type (%d)", type);
05455    }
05456 }
05457 
05458 //______________________________________________________________________________
05459 TProofServ::EQueryAction TProofServ::GetWorkers(TList *workers,
05460                                                 Int_t & /* prioritychange */,
05461                                                 Bool_t /* resume */)
05462 {
05463    // Get list of workers to be used from now on.
05464    // The list must be provided by the caller.
05465 
05466    // Parse the config file
05467    TProofResourcesStatic *resources =
05468       new TProofResourcesStatic(fConfDir, fConfFile);
05469    fConfFile = resources->GetFileName(); // Update the global file name (with path)
05470    PDB(kGlobal,1)
05471          Info("GetWorkers", "using PROOF config file: %s", fConfFile.Data());
05472 
05473    // Get the master
05474    TProofNodeInfo *master = resources->GetMaster();
05475    if (!master) {
05476       PDB(kAll,1)
05477          Info("GetWorkers",
05478               "no appropriate master line found in %s", fConfFile.Data());
05479       return kQueryStop;
05480    } else {
05481       // Set image if not yet done and available
05482       if (fImage.IsNull() && strlen(master->GetImage()) > 0)
05483          fImage = master->GetImage();
05484    }
05485 
05486    // Fill submaster or worker list
05487    if (workers) {
05488       if (resources->GetSubmasters() && resources->GetSubmasters()->GetSize() > 0) {
05489          PDB(kAll,1)
05490             resources->GetSubmasters()->Print();
05491          TProofNodeInfo *ni = 0;
05492          TIter nw(resources->GetSubmasters());
05493          while ((ni = (TProofNodeInfo *) nw()))
05494             workers->Add(new TProofNodeInfo(*ni));
05495       } else if (resources->GetWorkers() && resources->GetWorkers()->GetSize() > 0) {
05496          PDB(kAll,1)
05497             resources->GetWorkers()->Print();
05498          TProofNodeInfo *ni = 0;
05499          TIter nw(resources->GetWorkers());
05500          while ((ni = (TProofNodeInfo *) nw()))
05501             workers->Add(new TProofNodeInfo(*ni));
05502       }
05503    }
05504 
05505    // We are done
05506    return kQueryOK;
05507 }
05508 
05509 //______________________________________________________________________________
05510 FILE *TProofServ::SetErrorHandlerFile(FILE *ferr)
05511 {
05512    // Set the file stream where to log (default stderr).
05513    // If ferr == 0 the default is restored.
05514    // Returns current setting.
05515 
05516    FILE *oldferr = fgErrorHandlerFile;
05517    fgErrorHandlerFile = (ferr) ? ferr : stderr;
05518    return oldferr;
05519 }
05520 
05521 //______________________________________________________________________________
05522 void TProofServ::ErrorHandler(Int_t level, Bool_t abort, const char *location,
05523                               const char *msg)
05524 {
05525    // The PROOF error handler function. It prints the message on fgErrorHandlerFile and
05526    // if abort is set it aborts the application.
05527 
05528    if (gErrorIgnoreLevel == kUnset) {
05529       gErrorIgnoreLevel = 0;
05530       if (gEnv) {
05531          TString lvl = gEnv->GetValue("Root.ErrorIgnoreLevel", "Print");
05532          if (!lvl.CompareTo("Print", TString::kIgnoreCase))
05533             gErrorIgnoreLevel = kPrint;
05534          else if (!lvl.CompareTo("Info", TString::kIgnoreCase))
05535             gErrorIgnoreLevel = kInfo;
05536          else if (!lvl.CompareTo("Warning", TString::kIgnoreCase))
05537             gErrorIgnoreLevel = kWarning;
05538          else if (!lvl.CompareTo("Error", TString::kIgnoreCase))
05539             gErrorIgnoreLevel = kError;
05540          else if (!lvl.CompareTo("Break", TString::kIgnoreCase))
05541             gErrorIgnoreLevel = kBreak;
05542          else if (!lvl.CompareTo("SysError", TString::kIgnoreCase))
05543             gErrorIgnoreLevel = kSysError;
05544          else if (!lvl.CompareTo("Fatal", TString::kIgnoreCase))
05545             gErrorIgnoreLevel = kFatal;
05546       }
05547    }
05548 
05549    if (level < gErrorIgnoreLevel)
05550       return;
05551 
05552    // Always communicate errors via SendLogFile
05553    if (level >= kError && gProofServ)
05554       gProofServ->LogToMaster();
05555 
05556    Bool_t tosyslog = (fgLogToSysLog > 2) ? kTRUE : kFALSE;
05557 
05558    const char *type   = 0;
05559    ELogLevel loglevel = kLogInfo;
05560 
05561    Int_t ipos = (location) ? strlen(location) : 0;
05562 
05563    if (level >= kPrint) {
05564       loglevel = kLogInfo;
05565       type = "Print";
05566    }
05567    if (level >= kInfo) {
05568       loglevel = kLogInfo;
05569       char *ps = location ? (char *) strrchr(location, '|') : (char *)0;
05570       if (ps) {
05571          ipos = (int)(ps - (char *)location);
05572          type = "SvcMsg";
05573       } else {
05574          type = "Info";
05575       }
05576    }
05577    if (level >= kWarning) {
05578       loglevel = kLogWarning;
05579       type = "Warning";
05580    }
05581    if (level >= kError) {
05582       loglevel = kLogErr;
05583       type = "Error";
05584    }
05585    if (level >= kBreak) {
05586       loglevel = kLogErr;
05587       type = "*** Break ***";
05588    }
05589    if (level >= kSysError) {
05590       loglevel = kLogErr;
05591       type = "SysError";
05592    }
05593    if (level >= kFatal) {
05594       loglevel = kLogErr;
05595       type = "Fatal";
05596    }
05597 
05598 
05599    TString buf;
05600 
05601    // Time stamp
05602    TTimeStamp ts;
05603    TString st(ts.AsString("lc"),19);
05604 
05605    if (!location || ipos == 0 ||
05606        (level >= kPrint && level < kInfo) ||
05607        (level >= kBreak && level < kSysError)) {
05608       fprintf(fgErrorHandlerFile, "%s %5d %s | %s: %s\n", st(11,8).Data(),
05609                                   gSystem->GetPid(),
05610                                  (gProofServ ? gProofServ->GetPrefix() : "proof"),
05611                                   type, msg);
05612       if (tosyslog)
05613          buf.Form("%s: %s:%s", fgSysLogEntity.Data(), type, msg);
05614    } else {
05615       fprintf(fgErrorHandlerFile, "%s %5d %s | %s in <%.*s>: %s\n", st(11,8).Data(),
05616                                   gSystem->GetPid(),
05617                                  (gProofServ ? gProofServ->GetPrefix() : "proof"),
05618                                   type, ipos, location, msg);
05619       if (tosyslog)
05620          buf.Form("%s: %s:<%.*s>: %s", fgSysLogEntity.Data(), type, ipos, location, msg);
05621    }
05622    fflush(fgErrorHandlerFile);
05623 
05624    if (tosyslog)
05625       gSystem->Syslog(loglevel, buf);
05626 
05627    if (abort) {
05628 
05629       static Bool_t recursive = kFALSE;
05630 
05631       if (gProofServ != 0 && !recursive) {
05632          recursive = kTRUE;
05633          gProofServ->GetSocket()->Send(kPROOF_FATAL);
05634          recursive = kFALSE;
05635       }
05636 
05637       fprintf(fgErrorHandlerFile, "aborting\n");
05638       fflush(fgErrorHandlerFile);
05639       gSystem->StackTrace();
05640       gSystem->Abort();
05641    }
05642 }
05643 
05644 //______________________________________________________________________________
05645 Int_t TProofServ::CopyFromCache(const char *macro, Bool_t cpbin)
05646 {
05647    // Retrieve any files related to 'macro' from the cache directory.
05648    // If 'cpbin' is true, the associated binaries are retrieved as well.
05649    // Returns 0 on success, -1 otherwise
05650 
05651    if (!macro || strlen(macro) <= 0)
05652       // Invalid inputs
05653       return -1;
05654 
05655    // Split out the aclic mode, if any
05656    TString name = macro;
05657    TString acmode, args, io;
05658    name = gSystem->SplitAclicMode(name, acmode, args, io);
05659 
05660    PDB(kGlobal,1)
05661       Info("CopyFromCache","enter: names: %s, %s", macro, name.Data());
05662 
05663    // Atomic action
05664    Bool_t locked = (fCacheLock->IsLocked()) ? kTRUE : kFALSE;
05665    if (!locked) fCacheLock->Lock();
05666 
05667    // Get source from the cache
05668    Bool_t assertfile = kFALSE;
05669    TString srcname(name);
05670    Int_t dot = srcname.Last('.');
05671    if (dot != kNPOS) {
05672       srcname.Remove(dot);
05673       srcname += ".*";
05674    } else {
05675       assertfile = kTRUE;
05676    }
05677    srcname.Insert(0, TString::Format("%s/",fCacheDir.Data()));
05678    dot = (dot != kNPOS) ? srcname.Last('.') : dot;
05679    // Assert the file if asked (to silence warnings from 'cp')
05680    if (assertfile) {
05681       if (gSystem->AccessPathName(srcname)) {
05682          PDB(kCache,1)
05683             Info("CopyFromCache", "file %s not in cache", srcname.Data());
05684          if (!locked) fCacheLock->Unlock();
05685          return 0;
05686       }
05687    }
05688    PDB(kCache,1)
05689       Info("CopyFromCache", "retrieving %s from cache", srcname.Data());
05690    gSystem->Exec(TString::Format("%s %s .", kCP, srcname.Data()));
05691 
05692    // Check if we are done
05693    if (!cpbin) {
05694       // End of atomicity
05695       if (!locked) fCacheLock->Unlock();
05696       return 0;
05697    }
05698 
05699    // Create binary name template
05700    TString binname = name;
05701    dot = binname.Last('.');
05702    if (dot != kNPOS) {
05703       binname.Replace(dot,1,"_");
05704       binname += ".";
05705    } else {
05706       PDB(kCache,1)
05707          Info("CopyFromCache",
05708               "non-standard name structure: %s ('.' missing)", name.Data());
05709       // Done
05710       if (!locked) fCacheLock->Unlock();
05711       return 0;
05712    }
05713 
05714    // Binary version file name
05715    TString vername;
05716    vername.Form(".%s", name.Data());
05717    Int_t dotv = vername.Last('.');
05718    if (dotv != kNPOS)
05719       vername.Remove(dotv);
05720    vername += ".binversion";
05721 
05722    // Check binary version
05723    TString v;
05724    Int_t rev = -1;
05725    Bool_t okfil = kFALSE;
05726    FILE *f = fopen(TString::Format("%s/%s", fCacheDir.Data(), vername.Data()), "r");
05727    if (f) {
05728       TString r;
05729       v.Gets(f);
05730       r.Gets(f);
05731       rev = (!r.IsNull() && r.IsDigit()) ? r.Atoi() : -1;
05732       fclose(f);
05733       okfil = kTRUE;
05734    }
05735 
05736    Bool_t okver = (v != gROOT->GetVersion()) ? kFALSE : kTRUE;
05737    Bool_t okrev = (gROOT->GetSvnRevision() > 0 && rev != gROOT->GetSvnRevision()) ? kFALSE : kTRUE;
05738    if (!okfil || !okver || !okrev) {
05739    PDB(kCache,1)
05740       Info("CopyFromCache",
05741            "removing binaries: 'file': %s, 'ROOT version': %s, 'ROOT revision': %s",
05742            (okfil ? "OK" : "not OK"), (okver ? "OK" : "not OK"), (okrev ? "OK" : "not OK") );
05743       // Remove all existing binaries
05744       binname += "*";
05745       gSystem->Exec(TString::Format("%s %s/%s", kRM, fCacheDir.Data(), binname.Data()));
05746       // ... and the binary version file
05747       gSystem->Exec(TString::Format("%s %s/%s", kRM, fCacheDir.Data(), vername.Data()));
05748       // Done
05749       if (!locked) fCacheLock->Unlock();
05750       return 0;
05751    }
05752 
05753    // Retrieve existing binaries, if any
05754    void *dirp = gSystem->OpenDirectory(fCacheDir);
05755    if (dirp) {
05756       const char *e = 0;
05757       while ((e = gSystem->GetDirEntry(dirp))) {
05758          if (!strncmp(e, binname.Data(), binname.Length())) {
05759             TString fncache;
05760             fncache.Form("%s/%s", fCacheDir.Data(), e);
05761             Bool_t docp = kTRUE;
05762             FileStat_t stlocal, stcache;
05763             if (!gSystem->GetPathInfo(fncache, stcache)) {
05764                Int_t rc = gSystem->GetPathInfo(e, stlocal);
05765                if (rc == 0 && (stlocal.fMtime >= stcache.fMtime))
05766                   docp = kFALSE;
05767                // If a copy candidate, check also the MD5
05768                if (docp) {
05769                   TMD5 *md5local = TMD5::FileChecksum(e);
05770                   TMD5 *md5cache = TMD5::FileChecksum(fncache);
05771                   if (md5local && md5cache && md5local == md5cache) docp = kFALSE;
05772                   SafeDelete(md5local);
05773                   SafeDelete(md5cache);
05774                }
05775                // Copy the file, if needed
05776                if (docp) {
05777                   gSystem->Exec(TString::Format("%s %s", kRM, e));
05778                   PDB(kCache,1)
05779                      Info("CopyFromCache",
05780                           "retrieving %s from cache", fncache.Data());
05781                   gSystem->Exec(TString::Format("%s %s %s", kCP, fncache.Data(), e));
05782                }
05783             }
05784          }
05785       }
05786       gSystem->FreeDirectory(dirp);
05787    }
05788 
05789    // End of atomicity
05790    if (!locked) fCacheLock->Unlock();
05791 
05792    // Done
05793    return 0;
05794 }
05795 
05796 //______________________________________________________________________________
05797 Int_t TProofServ::CopyToCache(const char *macro, Int_t opt)
05798 {
05799    // Copy files related to 'macro' to the cache directory.
05800    // Action depends on 'opt':
05801    //
05802    //    opt = 0         copy 'macro' to cache and delete from cache any binary
05803    //                    related to name; e.g. if macro = bla.C, the binaries are
05804    //                    bla_C.so, bla_C.rootmap, ...
05805    //    opt = 1         copy the binaries related to macro to the cache
05806    //
05807    // Returns 0 on success, -1 otherwise
05808 
05809    if (!macro || strlen(macro) <= 0 || opt < 0 || opt > 1)
05810       // Invalid inputs
05811       return -1;
05812 
05813    // Split out the aclic mode, if any
05814    TString name = macro;
05815    TString acmode, args, io;
05816    name = gSystem->SplitAclicMode(name, acmode, args, io);
05817 
05818    PDB(kGlobal,1)
05819       Info("CopyToCache","enter: opt: %d, names: %s, %s", opt, macro, name.Data());
05820 
05821    // Create binary name template
05822    TString binname = name;
05823    Int_t dot = binname.Last('.');
05824    if (dot != kNPOS)
05825       binname.Replace(dot,1,"_");
05826 
05827    // Create version file name template
05828    TString vername;
05829    vername.Form(".%s", name.Data());
05830    dot = vername.Last('.');
05831    if (dot != kNPOS)
05832       vername.Remove(dot);
05833    vername += ".binversion";
05834    Bool_t savever = kFALSE;
05835 
05836    // Atomic action
05837    Bool_t locked = (fCacheLock->IsLocked()) ? kTRUE : kFALSE;
05838    if (!locked) fCacheLock->Lock();
05839 
05840    // Action depends on 'opt'
05841    if (opt == 0) {
05842       // Save name to cache
05843       PDB(kCache,1)
05844          Info("CopyToCache",
05845               "caching %s/%s ...", fCacheDir.Data(), name.Data());
05846       gSystem->Exec(TString::Format("%s %s %s", kCP, name.Data(), fCacheDir.Data()));
05847       // If needed, remove from the cache any existing binary related to 'name'
05848       if (dot != kNPOS) {
05849          binname += ".*";
05850          PDB(kCache,1)
05851             Info("CopyToCache", "opt = 0: removing binaries '%s'", binname.Data());
05852          gSystem->Exec(TString::Format("%s %s/%s", kRM, fCacheDir.Data(), binname.Data()));
05853          gSystem->Exec(TString::Format("%s %s/%s", kRM, fCacheDir.Data(), vername.Data()));
05854       }
05855    } else if (opt == 1) {
05856       // If needed, copy to the cache any existing binary related to 'name'.
05857       if (dot != kNPOS) {
05858          binname += ".";
05859          void *dirp = gSystem->OpenDirectory(".");
05860          if (dirp) {
05861             const char *e = 0;
05862             while ((e = gSystem->GetDirEntry(dirp))) {
05863                if (!strncmp(e, binname.Data(), binname.Length())) {
05864                   Bool_t docp = kTRUE;
05865                   FileStat_t stlocal, stcache;
05866                   if (!gSystem->GetPathInfo(e, stlocal)) {
05867                      TString fncache;
05868                      fncache.Form("%s/%s", fCacheDir.Data(), e);
05869                      Int_t rc = gSystem->GetPathInfo(fncache, stcache);
05870                      if (rc == 0 && (stlocal.fMtime <= stcache.fMtime)) {
05871                         docp = kFALSE;
05872                         if (rc == 0) rc = -1;
05873                      }
05874                      // If a copy candidate, check also the MD5
05875                      if (docp) {
05876                         TMD5 *md5local = TMD5::FileChecksum(e);
05877                         TMD5 *md5cache = TMD5::FileChecksum(fncache);
05878                         if (md5local && md5cache && md5local == md5cache) docp = kFALSE;
05879                         SafeDelete(md5local);
05880                         SafeDelete(md5cache);
05881                         if (!docp) rc = -2;
05882                      }
05883                      // Copy the file, if needed
05884                      if (docp) {
05885                         gSystem->Exec(TString::Format("%s %s", kRM, fncache.Data()));
05886                         PDB(kCache,1)
05887                            Info("CopyToCache","caching %s ... (reason: %d)", e, rc);
05888                         gSystem->Exec(TString::Format("%s %s %s", kCP, e, fncache.Data()));
05889                         savever = kTRUE;
05890                      }
05891                   }
05892                }
05893             }
05894             gSystem->FreeDirectory(dirp);
05895          }
05896          // Save binary version if requested
05897          if (savever) {
05898             PDB(kCache,1)
05899                Info("CopyToCache","updating version file %s ...", vername.Data());
05900             FILE *f = fopen(TString::Format("%s/%s", fCacheDir.Data(), vername.Data()), "w");
05901             if (f) {
05902                fputs(gROOT->GetVersion(), f);
05903                fputs(TString::Format("\n%d",gROOT->GetSvnRevision()), f);
05904                fclose(f);
05905             }
05906          }
05907       }
05908    }
05909 
05910    // End of atomicity
05911    if (!locked) fCacheLock->Unlock();
05912 
05913    // Done
05914    return 0;
05915 }
05916 
05917 //______________________________________________________________________________
05918 void TProofServ::MakePlayer()
05919 {
05920    // Make player instance.
05921 
05922    TVirtualProofPlayer *p = 0;
05923 
05924    // Cleanup first
05925    DeletePlayer();
05926 
05927    if (IsParallel()) {
05928       // remote mode
05929       p = fProof->MakePlayer();
05930    } else {
05931       // slave or sequential mode
05932       p = TVirtualProofPlayer::Create("slave", 0, fSocket);
05933       if (IsMaster())
05934          fProof->SetPlayer(p);
05935    }
05936 
05937    // set player
05938    fPlayer = p;
05939 }
05940 
05941 //______________________________________________________________________________
05942 void TProofServ::DeletePlayer()
05943 {
05944    // Delete player instance.
05945 
05946    if (IsMaster()) {
05947       if (fProof) fProof->SetPlayer(0);
05948    } else {
05949       SafeDelete(fPlayer);
05950    }
05951    fPlayer = 0;
05952 }
05953 
05954 //______________________________________________________________________________
05955 Int_t TProofServ::GetPriority()
05956 {
05957    // Get the processing priority for the group the user belongs too. This
05958    // prioroty is a number (0 - 100) determined by a scheduler (third
05959    // party process) based on some basic priority the group has, e.g.
05960    // we might want to give users in a specific group (e.g. promptana)
05961    // a higher priority than users in other groups, and on the analysis
05962    // of historical logging data (i.e. usage of CPU by the group in a
05963    // previous time slot, as recorded in TPerfStats::WriteQueryLog()).
05964    //
05965    // Currently the group priority is obtained by a query in a SQL DB
05966    // table proofpriority, which has the format:
05967    // CREATE TABLE proofpriority (
05968    //   id            INT NOT NULL PRIMARY KEY AUTO_INCREMENT,
05969    //   group         VARCHAR(32) NOT NULL,
05970    //   priority      INT
05971    //)
05972 
05973    TString sqlserv = gEnv->GetValue("ProofServ.QueryLogDB","");
05974    TString sqluser = gEnv->GetValue("ProofServ.QueryLogUser","");
05975    TString sqlpass = gEnv->GetValue("ProofServ.QueryLogPasswd","");
05976 
05977    Int_t priority = 100;
05978 
05979    if (sqlserv == "")
05980       return priority;
05981 
05982    TString sql;
05983    sql.Form("SELECT priority WHERE group='%s' FROM proofpriority", fGroup.Data());
05984 
05985    // open connection to SQL server
05986    TSQLServer *db =  TSQLServer::Connect(sqlserv, sqluser, sqlpass);
05987 
05988    if (!db || db->IsZombie()) {
05989       Error("GetPriority", "failed to connect to SQL server %s as %s %s",
05990             sqlserv.Data(), sqluser.Data(), sqlpass.Data());
05991       printf("%s\n", sql.Data());
05992    } else {
05993       TSQLResult *res = db->Query(sql);
05994 
05995       if (!res) {
05996          Error("GetPriority", "query into proofpriority failed");
05997          printf("%s\n", sql.Data());
05998       } else {
05999          TSQLRow *row = res->Next();   // first row is header
06000          priority = atoi(row->GetField(0));
06001          delete row;
06002       }
06003       delete res;
06004    }
06005    delete db;
06006 
06007    return priority;
06008 }
06009 
06010 //______________________________________________________________________________
06011 Int_t TProofServ::SendAsynMessage(const char *msg, Bool_t lf)
06012 {
06013    // Send an asychronous message to the master / client .
06014    // Masters will forward up the message to the client.
06015    // The client prints 'msg' of stderr and adds a '\n'/'\r' depending on
06016    // 'lf' being kTRUE (default) or kFALSE.
06017    // Returns the return value from TSocket::Send(TMessage &) .
06018    static TMessage m(kPROOF_MESSAGE);
06019 
06020    // To leave a track in the output file ... if requested
06021    // (clients will be notified twice)
06022    PDB(kAsyn,1)
06023       Info("SendAsynMessage","%s", (msg ? msg : "(null)"));
06024 
06025    if (fSocket && msg) {
06026       m.Reset(kPROOF_MESSAGE);
06027       m << TString(msg) << lf;
06028       return fSocket->Send(m);
06029    }
06030 
06031    // No message
06032    return -1;
06033 }
06034 
06035 //______________________________________________________________________________
06036 void TProofServ::FlushLogFile()
06037 {
06038    // Reposition the read pointer in the log file to the very end.
06039    // This allows to "hide" useful debug messages during normal operations
06040    // while preserving the possibility to have them in case of problems.
06041 
06042    off_t lend = lseek(fileno(stdout), (off_t)0, SEEK_END);
06043    if (lend >= 0) lseek(fLogFileDes, lend, SEEK_SET);
06044 }
06045 
06046 //______________________________________________________________________________
06047 void TProofServ::HandleException(Int_t sig)
06048 {
06049    // Exception handler: we do not try to recover here, just exit.
06050 
06051    Error("HandleException", "caugth exception triggered by signal '%d' %s",
06052                             sig, fgLastMsg.Data());
06053    // Description
06054    TString emsg;
06055    emsg.Form("%s: caught exception triggered by signal '%d' %s",
06056              GetOrdinal(), sig, fgLastMsg.Data());
06057    // Try to warn the user
06058    SendAsynMessage(emsg.Data());
06059 
06060    gSystem->Exit(sig);
06061 }
06062 
06063 //______________________________________________________________________________
06064 Int_t TProofServ::HandleDataSets(TMessage *mess, TString *slb)
06065 {
06066    // Handle here requests about datasets.
06067 
06068    if (gDebug > 0)
06069       Info("HandleDataSets", "enter");
06070 
06071    // We need a dataset manager
06072    if (!fDataSetManager) {
06073       Warning("HandleDataSets", "no data manager is available to fullfil the request");
06074       return -1;
06075    }
06076 
06077    // Used in most cases
06078    TString dsUser, dsGroup, dsName, dsTree, uri, opt;
06079    Int_t rc = 0;
06080 
06081    // Message type
06082    Int_t type = 0;
06083    (*mess) >> type;
06084 
06085    switch (type) {
06086       case TProof::kCheckDataSetName:
06087          //
06088          // Check whether this dataset exist
06089          {
06090             (*mess) >> uri;
06091             if (slb) slb->Form("%d %s", type, uri.Data());
06092             if (fDataSetManager->ExistsDataSet(uri))
06093                // Dataset name does exist
06094                return -1;
06095          }
06096          break;
06097       case TProof::kRegisterDataSet:
06098          // list size must be above 0
06099          {
06100             if (fDataSetManager->TestBit(TDataSetManager::kAllowRegister)) {
06101                (*mess) >> uri;
06102                (*mess) >> opt;
06103                if (slb) slb->Form("%d %s %s", type, uri.Data(), opt.Data());
06104                // Extract the list
06105                TFileCollection *dataSet =
06106                   dynamic_cast<TFileCollection*> ((mess->ReadObject(TFileCollection::Class())));
06107                if (!dataSet || dataSet->GetList()->GetSize() == 0) {
06108                   Error("HandleDataSets", "can not save an empty list.");
06109                   return -1;
06110                }
06111                // Register the dataset (quota checks are done inside here)
06112                rc = fDataSetManager->RegisterDataSet(uri, dataSet, opt);
06113                delete dataSet;
06114                return rc;
06115             } else {
06116                Info("HandleDataSets", "dataset registration not allowed");
06117                if (slb) slb->Form("%d notallowed", type);
06118                return -1;
06119             }
06120          }
06121          break;
06122 
06123       case TProof::kShowDataSets:
06124          {
06125             (*mess) >> uri >> opt;
06126             if (slb) slb->Form("%d %s %s", type, uri.Data(), opt.Data());
06127             // Show content
06128             fDataSetManager->ShowDataSets(uri, opt);
06129          }
06130          break;
06131 
06132       case TProof::kGetDataSets:
06133          {
06134             (*mess) >> uri >> opt;
06135             if (slb) slb->Form("%d %s %s", type, uri.Data(), opt.Data());
06136             // Get the datasets and fill a map
06137             UInt_t omsk = (UInt_t)TDataSetManager::kExport;
06138             Ssiz_t kLite = opt.Index(":lite:", 0, TString::kIgnoreCase);
06139             if (kLite != kNPOS) {
06140                omsk |= (UInt_t)TDataSetManager::kReadShort;
06141                opt.Remove(kLite, strlen(":lite:"));
06142             }
06143             TMap *returnMap = fDataSetManager->GetDataSets(uri, omsk);
06144             // If defines, option gives the name of a server for which to extract the information
06145             if (returnMap && !opt.IsNull()) {
06146                // The return map will be in the form   </group/user/datasetname> --> <dataset>
06147                TMap *rmap = new TMap;
06148                TObject *k = 0;
06149                TFileCollection *fc = 0, *xfc = 0;
06150                TIter nxd(returnMap);
06151                while ((k = nxd()) && (fc = (TFileCollection *) returnMap->GetValue(k))) {
06152                   // Get subset on specified server, if any
06153                   if ((xfc = fc->GetFilesOnServer(opt.Data()))) {
06154                      rmap->Add(new TObjString(k->GetName()), xfc);
06155                   }
06156                }
06157                returnMap->DeleteAll();
06158                if (rmap->GetSize() > 0) {
06159                   returnMap = rmap;
06160                } else {
06161                   Info("HandleDataSets", "no dataset found on server '%s'", opt.Data());
06162                   delete rmap;
06163                   returnMap = 0;
06164                }
06165             }
06166             if (returnMap) {
06167                // Send them back
06168                fSocket->SendObject(returnMap, kMESS_OK);
06169                returnMap->DeleteAll();
06170             } else {
06171                // Failure
06172                return -1;
06173             }
06174          }
06175          break;
06176       case TProof::kGetDataSet:
06177          {
06178             (*mess) >> uri >> opt;
06179             if (slb) slb->Form("%d %s %s", type, uri.Data(), opt.Data());
06180             // Get the list
06181             TFileCollection *fileList = fDataSetManager->GetDataSet(uri,opt);
06182             if (fileList) {
06183                fSocket->SendObject(fileList, kMESS_OK);
06184                delete fileList;
06185             } else {
06186                // Failure
06187                return -1;
06188             }
06189          }
06190          break;
06191       case TProof::kRemoveDataSet:
06192          {
06193             if (fDataSetManager->TestBit(TDataSetManager::kAllowRegister)) {
06194                (*mess) >> uri;
06195                if (slb) slb->Form("%d %s", type, uri.Data());
06196                if (!fDataSetManager->RemoveDataSet(uri)) {
06197                   // Failure
06198                   return -1;
06199                }
06200             } else {
06201                Info("HandleDataSets", "dataset creation / removal not allowed");
06202                if (slb) slb->Form("%d notallowed", type);
06203                return -1;
06204             }
06205          }
06206          break;
06207       case TProof::kVerifyDataSet:
06208          {
06209             if (fDataSetManager->TestBit(TDataSetManager::kAllowVerify)) {
06210                (*mess) >> uri >> opt;
06211                if (slb) slb->Form("%d %s %s", type, uri.Data(), opt.Data());
06212                TProofServLogHandlerGuard hg(fLogFile,  fSocket);
06213                rc = fDataSetManager->ScanDataSet(uri, opt);
06214                // TODO: verify in parallel:
06215                //  - dataset = GetDataSet(uri)
06216                //  - TList flist; TDataSetManager::ScanDataSet(dataset, ..., &flist)
06217                //  - fPlayer->Process( ... flist ...) // needs to be developed
06218                //  - dataset->Integrate(flist) (perhaps automatic; flist object owned by dataset)
06219                //  - RegisterDataSet(uri, dataset, "OT")
06220             } else {
06221                Info("HandleDataSets", "dataset verification not allowed");
06222                return -1;
06223             }
06224          }
06225          break;
06226       case TProof::kGetQuota:
06227          {
06228             if (fDataSetManager->TestBit(TDataSetManager::kCheckQuota)) {
06229                if (slb) slb->Form("%d", type);
06230                TMap *groupQuotaMap = fDataSetManager->GetGroupQuotaMap();
06231                if (groupQuotaMap) {
06232                   // Send result
06233                   fSocket->SendObject(groupQuotaMap, kMESS_OK);
06234                } else {
06235                   return -1;
06236                }
06237             } else {
06238                Info("HandleDataSets", "quota control disabled");
06239                if (slb) slb->Form("%d disabled", type);
06240                return -1;
06241             }
06242          }
06243          break;
06244       case TProof::kShowQuota:
06245          {
06246             if (fDataSetManager->TestBit(TDataSetManager::kCheckQuota)) {
06247                if (slb) slb->Form("%d", type);
06248                (*mess) >> opt;
06249                // Display quota information
06250                fDataSetManager->ShowQuota(opt);
06251             } else {
06252                Info("HandleDataSets", "quota control disabled");
06253                if (slb) slb->Form("%d disabled", type);
06254             }
06255          }
06256          break;
06257       case TProof::kSetDefaultTreeName:
06258          {
06259             if (fDataSetManager->TestBit(TDataSetManager::kAllowRegister)) {
06260                (*mess) >> uri;
06261                if (slb) slb->Form("%d %s", type, uri.Data());
06262                rc = fDataSetManager->ScanDataSet(uri, (UInt_t)TDataSetManager::kSetDefaultTree);
06263             } else {
06264                Info("HandleDataSets", "kSetDefaultTreeName: modification of dataset info not allowed");
06265                if (slb) slb->Form("%d notallowed", type);
06266                return -1;
06267             }
06268          }
06269          break;
06270       case TProof::kCache:
06271          {
06272             (*mess) >> uri >> opt;
06273             if (slb) slb->Form("%d %s %s", type, uri.Data(), opt.Data());
06274             if (opt == "show") {
06275                // Show cache content
06276                fDataSetManager->ShowCache(uri);
06277             } else if (opt == "clear") {
06278                // Clear cache content
06279                fDataSetManager->ClearCache(uri);
06280             } else {
06281                Error("HandleDataSets", "kCache: unknown action: %s", opt.Data());
06282             }
06283          }
06284          break;
06285       default:
06286          rc = -1;
06287          Error("HandleDataSets", "unknown type %d", type);
06288          break;
06289    }
06290 
06291    // We are done
06292    return rc;
06293 }
06294 
06295 //______________________________________________________________________________
06296 void TProofServ::HandleSubmerger(TMessage *mess)
06297 {
06298    // Handle a message of type kPROOF_SUBMERGER
06299 
06300    // Message type
06301    Int_t type = 0;
06302    (*mess) >> type;
06303 
06304    TString msg;
06305    switch (type) {
06306       case TProof::kOutputSize:
06307          break;
06308 
06309       case TProof::kSendOutput:
06310          {
06311             Bool_t deleteplayer = kTRUE;
06312             if (!IsMaster()) {
06313                if (fMergingMonitor) {
06314                   Info("HandleSubmerger", "kSendOutput: interrupting ...");
06315                   fMergingMonitor->Interrupt();
06316                }
06317                if (fMergingSocket) {
06318                   if (fMergingMonitor) fMergingMonitor->Remove(fMergingSocket);
06319                   fMergingSocket->Close();
06320                   SafeDelete(fMergingSocket);
06321                }
06322 
06323                TString name;
06324                Int_t port = 0;
06325                Int_t merger_id = -1;
06326                (*mess) >> merger_id >> name >> port;
06327                PDB(kSubmerger, 1)
06328                   Info("HandleSubmerger","worker %s redirected to merger #%d %s:%d", fOrdinal.Data(), merger_id, name.Data(), port);
06329 
06330                TSocket *t = 0;
06331                if (name.Length() > 0 && port > 0 && (t = new TSocket(name, port)) && t->IsValid()) {
06332 
06333                   PDB(kSubmerger, 2) Info("HandleSubmerger",
06334                                           "kSendOutput: worker asked for sending output to merger #%d %s:%d",
06335                                           merger_id, name.Data(), port);
06336 
06337                   if (SendResults(t, fPlayer->GetOutputList()) != 0) {
06338                      msg.Form("worker %s cannot send results to merger #%d at %s:%d", GetPrefix(), merger_id, name.Data(), port);
06339                      PDB(kSubmerger, 2) Info("HandleSubmerger",
06340                                              "kSendOutput: %s - inform the master", msg.Data());
06341                      SendAsynMessage(msg);
06342                      // Results not send
06343                      TMessage answ(kPROOF_SUBMERGER);
06344                      answ << Int_t(TProof::kMergerDown);
06345                      answ << merger_id;
06346                      fSocket->Send(answ);
06347                   } else {
06348                      // Worker informs master that it had sent its output to the merger
06349                      TMessage answ(kPROOF_SUBMERGER);
06350                      answ << Int_t(TProof::kOutputSent);
06351                      answ << merger_id;
06352                      fSocket->Send(answ);
06353 
06354                      PDB(kSubmerger, 2) Info("HandleSubmerger", "kSendOutput: worker sent its output");
06355                      fSocket->Send(kPROOF_SETIDLE);
06356                      SetIdle(kTRUE);
06357                      SendLogFile();
06358                   }
06359                } else {
06360 
06361                   if (name == "master") {
06362                      PDB(kSubmerger, 2) Info("HandleSubmerger",
06363                                              "kSendOutput: worker was asked for sending output to master");
06364                      if (SendResults(fSocket, fPlayer->GetOutputList()) != 0)
06365                         Warning("HandleSubmerger", "problems sending output list");
06366                      // Signal the master that we are idle
06367                      fSocket->Send(kPROOF_SETIDLE);
06368                      SetIdle(kTRUE);
06369                      SendLogFile();
06370 
06371                   } else if (!t || !(t->IsValid())) {
06372                      msg.Form("worker %s could not open a valid socket to merger #%d at %s:%d",
06373                               GetPrefix(), merger_id, name.Data(), port);
06374                      PDB(kSubmerger, 2) Info("HandleSubmerger",
06375                                              "kSendOutput: %s - inform the master", msg.Data());
06376                      SendAsynMessage(msg);
06377                      // Results not send
06378                      TMessage answ(kPROOF_SUBMERGER);
06379                      answ << Int_t(TProof::kMergerDown);
06380                      answ << merger_id;
06381                      fSocket->Send(answ);
06382                      deleteplayer = kFALSE;
06383                   }
06384 
06385                   if (t) SafeDelete(t);
06386 
06387                }
06388 
06389             } else {
06390                Error("HandleSubmerger", "kSendOutput: received not on worker");
06391             }
06392 
06393             // Cleanup
06394             if (deleteplayer) DeletePlayer();
06395          }
06396          break;
06397       case TProof::kBeMerger:
06398          {
06399             Bool_t deleteplayer = kTRUE;
06400             if (!IsMaster()) {
06401                Int_t merger_id = -1;
06402                //Int_t merger_port = 0;
06403                Int_t connections = 0;
06404                (*mess) >> merger_id  >> connections;
06405                PDB(kSubmerger, 2)
06406                   Info("HandleSubmerger", "worker %s established as merger", fOrdinal.Data());
06407 
06408                PDB(kSubmerger, 2)
06409                   Info("HandleSubmerger",
06410                        "kBeMerger: worker asked for being merger #%d for %d connections",
06411                        merger_id, connections);
06412 
06413                TVirtualProofPlayer *mergerPlayer =  TVirtualProofPlayer::Create("remote",fProof,0);
06414                PDB(kSubmerger, 2) Info("HandleSubmerger",
06415                                        "kBeMerger: mergerPlayer created (%p) ", mergerPlayer);
06416 
06417                // Accept results from assigned workers
06418                if (AcceptResults(connections, mergerPlayer)) {
06419                   PDB(kSubmerger, 2)
06420                      Info("HandleSubmerger", "kBeMerger: all outputs from workers accepted");
06421 
06422                   PDB(kSubmerger, 2)
06423                      Info("","adding own output to the list on %s", fOrdinal.Data());
06424 
06425                   // Add own results to the output list.
06426                   // On workers the player does not own the output list, which is owned
06427                   // by the selector and deleted in there
06428                   // On workers the player does not own the output list, which is owned
06429                   // by the selector and deleted in there
06430                   TIter nxo(fPlayer->GetOutputList());
06431                   TObject * o = 0;
06432                   while ((o = nxo())) {
06433                      if ((mergerPlayer->AddOutputObject(o) != 1)) {
06434                         // Remove the object if it has not been merged: it is owned
06435                         // now by the merger player (in its output list)
06436                         PDB(kSubmerger, 2) Info("HandleSocketInput", "removing merged object (%p)", o);
06437                         fPlayer->GetOutputList()->Remove(o);
06438                      }
06439                   }
06440                   PDB(kSubmerger, 2) Info("HandleSubmerger","kBeMerger: own outputs added");
06441                   PDB(kSubmerger, 2) Info("HandleSubmerger","starting delayed merging on %s", fOrdinal.Data());
06442 
06443                   // Delayed merging if neccessary
06444                   mergerPlayer->MergeOutput();
06445 
06446                   PDB(kSubmerger, 2) Info("HandleSubmerger", "delayed merging on %s finished ", fOrdinal.Data());
06447                   PDB(kSubmerger, 2) Info("HandleSubmerger", "%s sending results to master ", fOrdinal.Data());
06448                   // Send merged results to master
06449                   if (SendResults(fSocket, mergerPlayer->GetOutputList()) != 0)
06450                      Warning("HandleSubmerger","kBeMerger: problems sending output list");
06451                   mergerPlayer->GetOutputList()->SetOwner(kTRUE);
06452                   delete mergerPlayer;
06453 
06454                   PDB(kSubmerger, 2) Info("HandleSubmerger","kBeMerger: results sent to master");
06455                   // Signal the master that we are idle
06456                   fSocket->Send(kPROOF_SETIDLE);
06457                   SetIdle(kTRUE);
06458                   SendLogFile();
06459                } else {
06460                   // Results from all assigned workers not accepted
06461                   TMessage answ(kPROOF_SUBMERGER);
06462                   answ << Int_t(TProof::kMergerDown);
06463                   answ << merger_id;
06464                   fSocket->Send(answ);
06465                   deleteplayer = kFALSE;
06466                }
06467             } else {
06468                Error("HandleSubmerger","kSendOutput: received not on worker");
06469             }
06470 
06471             // Cleanup
06472             if (deleteplayer) DeletePlayer();
06473          }
06474          break;
06475 
06476       case TProof::kMergerDown:
06477          break;
06478 
06479       case TProof::kStopMerging:
06480          {
06481             // Received only in case of forced termination of merger by master
06482             PDB(kSubmerger, 2)  Info("HandleSubmerger", "kStopMerging");
06483             if (fMergingMonitor) {
06484                Info("HandleSubmerger", "kStopMerging: interrupting ...");
06485                fMergingMonitor->Interrupt();
06486             }
06487          }
06488          break;
06489 
06490       case TProof::kOutputSent:
06491          break;
06492    }
06493 }
06494 
06495 //______________________________________________________________________________
06496 void TProofServ::HandleFork(TMessage *)
06497 {
06498    // Cloning itself via fork. Not implemented
06499 
06500    Info("HandleFork", "fork cloning not implemented");
06501 }
06502 
06503 //______________________________________________________________________________
06504 Int_t TProofServ::Fork()
06505 {
06506    // Fork a child.
06507    // If successful, return 0 in the child process and the child pid in the parent
06508    // process. The child pid is registered for reaping.
06509    // Return <0 in the parent process in case of failure.
06510 
06511 #ifndef WIN32
06512    // Fork
06513    pid_t pid;
06514    if ((pid = fork()) < 0) {
06515       Error("Fork", "failed to fork");
06516       return pid;
06517    }
06518 
06519    // Nothing else to do in the child
06520    if (!pid) return pid;
06521 
06522    // Make sure that the reaper timer is started
06523    if (!fReaperTimer) {
06524       fReaperTimer = new TReaperTimer(1000);
06525       fReaperTimer->Start(-1);
06526    }
06527 
06528    // Register the new child
06529    fReaperTimer->AddPid(pid);
06530 
06531    // Done
06532    return pid;
06533 #else
06534    Warning("Fork", "Functionality not provided under windows");
06535    return -1;
06536 #endif
06537 }
06538 
06539 //______________________________________________________________________________
06540 void TProofServ::ResolveKeywords(TString &fname, const char *path)
06541 {
06542    // Replace <ord>, <user>, <u>, <group>, <stag>, <qnum> and <file> placeholders in fname
06543 
06544    // Replace <user>, if any
06545    if (fname.Contains("<user>")) {
06546       if (gProofServ && gProofServ->GetUser() && strlen(gProofServ->GetUser())) {
06547          fname.ReplaceAll("<user>", gProofServ->GetUser());
06548       } else {
06549          fname.ReplaceAll("<user>", "nouser");
06550       }
06551    }
06552    // Replace <us>, if any
06553    if (fname.Contains("<u>")) {
06554       if (gProofServ && gProofServ->GetUser() && strlen(gProofServ->GetUser())) {
06555          TString u(gProofServ->GetUser()[0]);
06556          fname.ReplaceAll("<u>", u);
06557       } else {
06558          fname.ReplaceAll("<u>", "n");
06559       }
06560    }
06561    // Replace <group>, if any
06562    if (fname.Contains("<group>")) {
06563       if (gProofServ && gProofServ->GetGroup() && strlen(gProofServ->GetGroup()))
06564          fname.ReplaceAll("<group>", gProofServ->GetGroup());
06565       else
06566          fname.ReplaceAll("<group>", "default");
06567    }
06568    // Replace <stag>, if any
06569    if (fname.Contains("<stag>")) {
06570       if (gProofServ && gProofServ->GetSessionTag() && strlen(gProofServ->GetSessionTag()))
06571          fname.ReplaceAll("<stag>", gProofServ->GetSessionTag());
06572       else
06573          ::Warning("TProofServ::ResolveKeywords", "session tag undefined: ignoring");
06574    }
06575    // Replace <ord>, if any
06576    if (fname.Contains("<ord>")) {
06577       if (gProofServ && gProofServ->GetOrdinal() && strlen(gProofServ->GetOrdinal()))
06578          fname.ReplaceAll("<ord>", gProofServ->GetOrdinal());
06579       else
06580          ::Warning("TProofServ::ResolveKeywords", "ordinal number undefined: ignoring");
06581    }
06582    // Replace <qnum>, if any
06583    if (fname.Contains("<qnum>")) {
06584       if (gProofServ && gProofServ->GetQuerySeqNum() && gProofServ->GetQuerySeqNum() > 0)
06585          fname.ReplaceAll("<qnum>", TString::Format("%d", gProofServ->GetQuerySeqNum()).Data());
06586       else
06587          ::Warning("TProofServ::ResolveKeywords", "query seqeuntial number undefined: ignoring");
06588    }
06589    // Replace <file>, if any
06590    if (fname.Contains("<file>") && path && strlen(path) > 0) {
06591       fname.ReplaceAll("<file>", path);
06592    }
06593 }
06594 
06595 //______________________________________________________________________________
06596 Int_t TProofServ::GetSessionStatus()
06597 {
06598    // Return the status of this session:
06599    //     0     idle
06600    //     1     running
06601    //     2     being terminated  (currently unused)
06602    //     3     queued
06603    //     4     idle timed-out (not set in here but in TIdleTOTimer::Notify)
06604    // This is typically run in the reader thread, so access needs to be protected
06605 
06606    R__LOCKGUARD(fQMtx);
06607    Int_t st = (fIdle) ? 0 : 1;
06608    if (fIdle && fWaitingQueries->GetSize() > 0) st = 3;
06609    return st;
06610 }
06611 
06612 //______________________________________________________________________________
06613 Int_t TProofServ::UpdateSessionStatus(Int_t xst)
06614 {
06615    // Update the session status in the relevant file. The status is taken from
06616    // GetSessionStatus() unless xst >= 0, in which case xst is used.
06617    // Return 0 on success, -errno if the file could not be opened.
06618 
06619    FILE *fs = fopen(fAdminPath.Data(), "w");
06620    if (fs) {
06621       Int_t st = (xst < 0) ? GetSessionStatus() : xst;
06622       fprintf(fs, "%d", st);
06623       fclose(fs);
06624       PDB(kGlobal, 2)
06625          Info("UpdateSessionStatus", "status (=%d) update in path: %s", st, fAdminPath.Data());
06626    } else {
06627       return -errno;
06628    }
06629    // Done
06630    return 0;
06631 }
06632 
06633 //______________________________________________________________________________
06634 Bool_t TProofServ::IsIdle()
06635 {
06636    // Return the idle status
06637    R__LOCKGUARD(fQMtx);
06638    return fIdle;
06639 }
06640 
06641 //______________________________________________________________________________
06642 void TProofServ::SetIdle(Bool_t st)
06643 {
06644    // Change the idle status
06645    R__LOCKGUARD(fQMtx);
06646    fIdle = st;
06647 }
06648 
06649 //______________________________________________________________________________
06650 Bool_t TProofServ::IsWaiting()
06651 {
06652    // Return kTRUE if the session is waiting for the OK to start processing
06653    R__LOCKGUARD(fQMtx);
06654    if (fIdle && fWaitingQueries->GetSize() > 0) return kTRUE;
06655    return kFALSE;
06656 }
06657 
06658 //______________________________________________________________________________
06659 Int_t TProofServ::WaitingQueries()
06660 {
06661    // Return the number of waiting queries
06662    R__LOCKGUARD(fQMtx);
06663    return fWaitingQueries->GetSize();
06664 }
06665 
06666 //______________________________________________________________________________
06667 Int_t TProofServ::QueueQuery(TProofQueryResult *pq)
06668 {
06669    // Add a query to the waiting list
06670    // Returns the number of queries in the list
06671    R__LOCKGUARD(fQMtx);
06672    fWaitingQueries->Add(pq);
06673    return fWaitingQueries->GetSize();
06674 }
06675 
06676 //______________________________________________________________________________
06677 TProofQueryResult *TProofServ::NextQuery()
06678 {
06679    // Get the next query from the waiting list.
06680    // The query is removed from the list.
06681    R__LOCKGUARD(fQMtx);
06682    TProofQueryResult *pq = (TProofQueryResult *) fWaitingQueries->First();
06683    fWaitingQueries->Remove(pq);
06684    return pq;
06685 }
06686 
06687 //______________________________________________________________________________
06688 Int_t TProofServ::CleanupWaitingQueries(Bool_t del, TList *qls)
06689 {
06690    // Cleanup the waiting queries list. The objects are deleted if 'del' is true.
06691    // If 'qls' is non null, only objects in 'qls' are removed.
06692    // Returns the number of cleanup queries
06693    R__LOCKGUARD(fQMtx);
06694    Int_t ncq = 0;
06695    if (qls) {
06696       TIter nxq(qls);
06697       TObject *o = 0;
06698       while ((o = nxq())) {
06699          if (fWaitingQueries->FindObject(o)) ncq++;
06700          fWaitingQueries->Remove(o);
06701          if (del) delete o;
06702       }
06703    } else {
06704       ncq = fWaitingQueries->GetSize();
06705       fWaitingQueries->SetOwner(del);
06706       fWaitingQueries->Delete();
06707    }
06708    // Done
06709    return ncq;
06710 }
06711 
06712 //______________________________________________________________________________
06713 void TProofServ::SetLastMsg(const char *lastmsg)
06714 {
06715    // Set the message to be sent back in case of exceptions
06716 
06717    fgLastMsg = lastmsg;
06718 }
06719 
06720 //______________________________________________________________________________
06721 Long_t TProofServ::GetVirtMemMax()
06722 {
06723    // VirtMemMax getter
06724    return fgVirtMemMax;
06725 }
06726 //______________________________________________________________________________
06727 Long_t TProofServ::GetResMemMax()
06728 {
06729    // ResMemMax getter
06730    return fgResMemMax;
06731 }
06732 //______________________________________________________________________________
06733 Float_t TProofServ::GetMemHWM()
06734 {
06735    // MemHWM getter
06736    return fgMemHWM;
06737 }
06738 //______________________________________________________________________________
06739 Float_t TProofServ::GetMemStop()
06740 {
06741    // MemStop getter
06742    return fgMemStop;
06743 }
06744 
06745 //______________________________________________________________________________
06746 Int_t TProofLockPath::Lock()
06747 {
06748    // Locks the directory. Waits if lock is hold by an other process.
06749    // Returns 0 on success, -1 in case of error.
06750 
06751    const char *pname = GetName();
06752 
06753    if (gSystem->AccessPathName(pname))
06754       fLockId = open(pname, O_CREAT|O_RDWR, 0644);
06755    else
06756       fLockId = open(pname, O_RDWR);
06757 
06758    if (fLockId == -1) {
06759       SysError("Lock", "cannot open lock file %s", pname);
06760       return -1;
06761    }
06762 
06763    PDB(kPackage, 2)
06764       Info("Lock", "%d: locking file %s ...", gSystem->GetPid(), pname);
06765    // lock the file
06766 #if !defined(R__WIN32) && !defined(R__WINGCC)
06767    if (lockf(fLockId, F_LOCK, (off_t) 1) == -1) {
06768       SysError("Lock", "error locking %s", pname);
06769       close(fLockId);
06770       fLockId = -1;
06771       return -1;
06772    }
06773 #endif
06774 
06775    PDB(kPackage, 2)
06776       Info("Lock", "%d: file %s locked", gSystem->GetPid(), pname);
06777 
06778    return 0;
06779 }
06780 
06781 //______________________________________________________________________________
06782 Int_t TProofLockPath::Unlock()
06783 {
06784    // Unlock the directory. Returns 0 in case of success,
06785    // -1 in case of error.
06786 
06787    if (!IsLocked())
06788       return 0;
06789 
06790    PDB(kPackage, 2)
06791       Info("Lock", "%d: unlocking file %s ...", gSystem->GetPid(), GetName());
06792    // unlock the file
06793    lseek(fLockId, 0, SEEK_SET);
06794 #if !defined(R__WIN32) && !defined(R__WINGCC)
06795    if (lockf(fLockId, F_ULOCK, (off_t)1) == -1) {
06796       SysError("Unlock", "error unlocking %s", GetName());
06797       close(fLockId);
06798       fLockId = -1;
06799       return -1;
06800    }
06801 #endif
06802 
06803    PDB(kPackage, 2)
06804       Info("Unlock", "%d: file %s unlocked", gSystem->GetPid(), GetName());
06805 
06806    close(fLockId);
06807    fLockId = -1;
06808 
06809    return 0;
06810 }

Generated on Tue Jul 5 14:51:41 2011 for ROOT_528-00b_version by  doxygen 1.5.1