TXProofServ.cxx

Go to the documentation of this file.
00001 // @(#)root/proofx:$Id: TXProofServ.cxx 36592 2010-11-11 10:43:17Z ganis $
00002 // Author: Gerardo Ganis  12/12/2005
00003 
00004 /*************************************************************************
00005  * Copyright (C) 1995-2005, Rene Brun and Fons Rademakers.               *
00006  * All rights reserved.                                                  *
00007  *                                                                       *
00008  * For the licensing terms see $ROOTSYS/LICENSE.                         *
00009  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
00010  *************************************************************************/
00011 
00012 //////////////////////////////////////////////////////////////////////////
00013 //                                                                      //
00014 // TXProofServ                                                          //
00015 //                                                                      //
00016 // TXProofServ is the XRD version of the PROOF server. It differs from  //
00017 // TProofServ only in the underlying connection technology.             //
00018 //                                                                      //
00019 //////////////////////////////////////////////////////////////////////////
00020 
00021 #include "RConfigure.h"
00022 #include "RConfig.h"
00023 #include "Riostream.h"
00024 
00025 #ifdef WIN32
00026    #include <io.h>
00027    typedef long off_t;
00028 #endif
00029 #include <sys/types.h>
00030 #include <netinet/in.h>
00031 #include <utime.h>
00032 
00033 #include "TXProofServ.h"
00034 #include "TObjString.h"
00035 #include "TEnv.h"
00036 #include "TError.h"
00037 #include "TException.h"
00038 #include "THashList.h"
00039 #include "TInterpreter.h"
00040 #include "TParameter.h"
00041 #include "TProofDebug.h"
00042 #include "TProof.h"
00043 #include "TProofPlayer.h"
00044 #include "TQueryResultManager.h"
00045 #include "TRegexp.h"
00046 #include "TClass.h"
00047 #include "TROOT.h"
00048 #include "TSystem.h"
00049 #include "TPluginManager.h"
00050 #include "TXSocketHandler.h"
00051 #include "TXUnixSocket.h"
00052 #include "compiledata.h"
00053 #include "TProofNodeInfo.h"
00054 #include "XProofProtocol.h"
00055 
00056 #include <XrdClient/XrdClientConst.hh>
00057 #include <XrdClient/XrdClientEnv.hh>
00058 
00059 
// Debug hook: CreateServer() spins in an empty loop while this flag is
// non-zero when the Proof.GdbHook setting selects this node type (1 for the
// master, 2 for a worker). Presumably it is declared volatile so that a
// debugger attached to the process can reset it and let the server continue.
static volatile Int_t gProofServDebug = 1;
00062 
00063 //----- SigPipe signal handler -------------------------------------------------
00064 //______________________________________________________________________________
00065 class TXProofServSigPipeHandler : public TSignalHandler {
00066    TXProofServ  *fServ;
00067 public:
00068    TXProofServSigPipeHandler(TXProofServ *s) : TSignalHandler(kSigInterrupt, kFALSE)
00069       { fServ = s; }
00070    Bool_t  Notify();
00071 };
00072 
00073 //______________________________________________________________________________
00074 Bool_t TXProofServSigPipeHandler::Notify()
00075 {
00076    fServ->HandleSigPipe();
00077    return kTRUE;
00078 }
00079 
00080 //----- Termination signal handler ---------------------------------------------
00081 //______________________________________________________________________________
00082 class TXProofServTerminationHandler : public TSignalHandler {
00083    TXProofServ  *fServ;
00084 public:
00085    TXProofServTerminationHandler(TXProofServ *s)
00086       : TSignalHandler(kSigTermination, kFALSE) { fServ = s; }
00087    Bool_t  Notify();
00088 };
00089 
00090 //______________________________________________________________________________
00091 Bool_t TXProofServTerminationHandler::Notify()
00092 {
00093    Printf("Received SIGTERM: terminating");
00094 
00095    fServ->HandleTermination();
00096    return kTRUE;
00097 }
00098 
00099 //----- Seg violation signal handler ---------------------------------------------
00100 //______________________________________________________________________________
00101 class TXProofServSegViolationHandler : public TSignalHandler {
00102    TXProofServ  *fServ;
00103 public:
00104    TXProofServSegViolationHandler(TXProofServ *s)
00105       : TSignalHandler(kSigSegmentationViolation, kFALSE) { fServ = s; }
00106    Bool_t  Notify();
00107 };
00108 
00109 //______________________________________________________________________________
00110 Bool_t TXProofServSegViolationHandler::Notify()
00111 {
00112    Printf("**** ");
00113    Printf("**** Segmentation violation: terminating ****");
00114    Printf("**** ");
00115    fServ->HandleTermination();
00116    return kTRUE;
00117 }
00118 
00119 //----- Input handler for messages from parent or master -----------------------
00120 //______________________________________________________________________________
00121 class TXProofServInputHandler : public TFileHandler {
00122    TXProofServ  *fServ;
00123 public:
00124    TXProofServInputHandler(TXProofServ *s, Int_t fd) : TFileHandler(fd, 1)
00125       { fServ = s; }
00126    Bool_t Notify();
00127    Bool_t ReadNotify() { return Notify(); }
00128 };
00129 
00130 //______________________________________________________________________________
00131 Bool_t TXProofServInputHandler::Notify()
00132 {
00133    fServ->HandleSocketInput();
00134    // This request has been completed: remove the client ID from the pipe
00135    ((TXUnixSocket *) fServ->GetSocket())->RemoveClientID();
00136    return kTRUE;
00137 }
00138 
00139 ClassImp(TXProofServ)
00140 
00141 // Hook to the constructor. This is needed to avoid using the plugin manager
00142 // which may create problems in multi-threaded environments.
00143 extern "C" {
00144    TApplication *GetTXProofServ(Int_t *argc, char **argv, FILE *flog)
00145    { return new TXProofServ(argc, argv, flog); }
00146 }
00147 
00148 //______________________________________________________________________________
00149 TXProofServ::TXProofServ(Int_t *argc, char **argv, FILE *flog)
00150             : TProofServ(argc, argv, flog)
00151 {
00152    // Main constructor
00153 
00154    fInterruptHandler = 0;
00155    fInputHandler = 0;
00156    fTerminated = kFALSE;
00157 
00158    // TODO:
00159    //    Int_t useFIFO = 0;
00160 /*   if (GetParameter(fProof->GetInputList(), "PROOF_UseFIFO", useFIFO) != 0) {
00161       if (useFIFO == 1)
00162          Info("", "enablig use of FIFO (if allowed by the server)");
00163       else
00164          Warning("", "unsupported strategy index (%d): ignore", strategy);
00165    }
00166 */
00167 }
00168 
00169 //______________________________________________________________________________
00170 Int_t TXProofServ::CreateServer()
00171 {
00172    // Finalize the server setup. If master, create the TProof instance to talk
00173    // the worker or submaster nodes.
00174    // Return 0 on success, -1 on error
00175 
00176    Bool_t xtest = (Argc() > 3 && !strcmp(Argv(3), "test")) ? kTRUE : kFALSE;
00177 
00178    if (gProofDebugLevel > 0)
00179       Info("CreateServer", "starting%s server creation", (xtest ? " test" : ""));
00180 
00181    // Get file descriptor for log file
00182    if (fLogFile) {
00183       // Use the file already open by pmain
00184       if ((fLogFileDes = fileno(fLogFile)) < 0) {
00185          Error("CreateServer", "resolving the log file description number");
00186          return -1;
00187       }
00188       // Hide the session start-up logs unless we are in verbose mode
00189       if (gProofDebugLevel <= 0)
00190          lseek(fLogFileDes, (off_t) 0, SEEK_END);
00191    }
00192 
00193    // Global location string in TXSocket
00194    TXSocket::SetLocation((IsMaster()) ? "master" : "slave");
00195 
00196    // Set debug level in XrdClient
00197    EnvPutInt(NAME_DEBUG, gEnv->GetValue("XNet.Debug", 0));
00198 
00199    // Get socket to be used to call back our xpd
00200    if (xtest) {
00201       // test session, just send the protocol version on the open pipe
00202       // and exit
00203       if (!(fSockPath = gSystem->Getenv("ROOTOPENSOCK"))) {
00204          Error("CreateServer", "Socket setup by xpd undefined");
00205          return -1;
00206       }
00207       Int_t fpw = (Int_t) strtol(fSockPath.Data(), 0, 10);
00208       int proto = htonl(kPROOF_Protocol);
00209       fSockPath = "";
00210       if (write(fpw, &proto, sizeof(proto)) != sizeof(proto)) {
00211          Error("CreateServer", "test: sending protocol number");
00212          return -1;
00213       }
00214       exit(0);
00215    } else {
00216       fSockPath = gEnv->GetValue("ProofServ.OpenSock", "");
00217       if (fSockPath.Length() <= 0) {
00218          Error("CreateServer", "Socket setup by xpd undefined");
00219          return -1;
00220       }
00221       TString entity = gEnv->GetValue("ProofServ.Entity", "");
00222       if (entity.Length() > 0)
00223          fSockPath.Insert(0,Form("%s/", entity.Data()));
00224    }
00225 
00226    // Get the sessions ID
00227    Int_t psid = gEnv->GetValue("ProofServ.SessionID", -1);
00228    if (psid < 0) {
00229      Error("CreateServer", "Session ID undefined");
00230      return -1;
00231    }
00232 
00233    // Call back the server
00234    fSocket = new TXUnixSocket(fSockPath, psid, -1, this);
00235    if (!fSocket || !(fSocket->IsValid())) {
00236       Error("CreateServer", "Failed to open connection to XrdProofd coordinator");
00237       return -1;
00238    }
00239    // Set compression level, if any
00240    fSocket->SetCompressionLevel(fCompressMsg);
00241 
00242    // Set the title for debugging
00243    TString tgt("client");
00244    if (fOrdinal != "0") {
00245       tgt = fOrdinal;
00246       if (tgt.Last('.') != kNPOS) tgt.Remove(tgt.Last('.'));
00247    }
00248    fSocket->SetTitle(tgt);
00249 
00250    // Set the this as reference of this socket
00251    ((TXSocket *)fSocket)->fReference = this;
00252 
00253    // Get socket descriptor
00254    Int_t sock = fSocket->GetDescriptor();
00255 
00256    // Install message input handlers
00257    fInputHandler =
00258       TXSocketHandler::GetSocketHandler(new TXProofServInputHandler(this, sock), fSocket);
00259    gSystem->AddFileHandler(fInputHandler);
00260 
00261    // Get the client ID
00262    Int_t cid = gEnv->GetValue("ProofServ.ClientID", -1);
00263    if (cid < 0) {
00264      Error("CreateServer", "Client ID undefined");
00265      SendLogFile();
00266      return -1;
00267    }
00268    ((TXSocket *)fSocket)->SetClientID(cid);
00269 
00270    // debug hooks
00271    if (IsMaster()) {
00272       // wait (loop) in master to allow debugger to connect
00273       if (gEnv->GetValue("Proof.GdbHook",0) == 1) {
00274          while (gProofServDebug)
00275             ;
00276       }
00277    } else {
00278       // wait (loop) in slave to allow debugger to connect
00279       if (gEnv->GetValue("Proof.GdbHook",0) == 2) {
00280          while (gProofServDebug)
00281             ;
00282       }
00283    }
00284 
00285    if (gProofDebugLevel > 0)
00286       Info("CreateServer", "Service: %s, ConfDir: %s, IsMaster: %d",
00287            fService.Data(), fConfDir.Data(), (Int_t)fMasterServ);
00288 
00289    if (Setup() == -1) {
00290       // Setup failure
00291       LogToMaster();
00292       SendLogFile();
00293       Terminate(0);
00294       return -1;
00295    }
00296 
00297    if (!fLogFile) {
00298       RedirectOutput();
00299       // If for some reason we failed setting a redirection file for the logs
00300       // we cannot continue
00301       if (!fLogFile || (fLogFileDes = fileno(fLogFile)) < 0) {
00302          LogToMaster();
00303          SendLogFile(-98);
00304          Terminate(0);
00305          return -1;
00306       }
00307    }
00308 
00309    // Send message of the day to the client
00310    if (IsMaster()) {
00311       if (CatMotd() == -1) {
00312          LogToMaster();
00313          SendLogFile(-99);
00314          Terminate(0);
00315          return -1;
00316       }
00317    }
00318 
00319    // Everybody expects iostream to be available, so load it...
00320    ProcessLine("#include <iostream>", kTRUE);
00321    ProcessLine("#include <string>",kTRUE); // for std::string iostream.
00322 
00323    // Load user functions
00324    const char *logon;
00325    logon = gEnv->GetValue("Proof.Load", (char *)0);
00326    if (logon) {
00327       char *mac = gSystem->Which(TROOT::GetMacroPath(), logon, kReadPermission);
00328       if (mac)
00329          ProcessLine(Form(".L %s", logon), kTRUE);
00330       delete [] mac;
00331    }
00332 
00333    // Execute logon macro
00334    logon = gEnv->GetValue("Proof.Logon", (char *)0);
00335    if (logon && !NoLogOpt()) {
00336       char *mac = gSystem->Which(TROOT::GetMacroPath(), logon, kReadPermission);
00337       if (mac)
00338          ProcessFile(logon);
00339       delete [] mac;
00340    }
00341 
00342    // Save current interpreter context
00343    gInterpreter->SaveContext();
00344    gInterpreter->SaveGlobalsContext();
00345 
00346    // if master, start slave servers
00347    if (IsMaster()) {
00348       TString master = Form("proof://%s@__master__", fUser.Data());
00349 
00350       // Add port, if defined
00351       Int_t port = gEnv->GetValue("ProofServ.XpdPort", -1);
00352       if (port > -1) {
00353          master += ":";
00354          master += port;
00355       }
00356 
00357       // Make sure that parallel startup via threads is not active
00358       // (it is broken for xpd because of the locks on gCINTMutex)
00359       gEnv->SetValue("Proof.ParallelStartup", 0);
00360 
00361       // Get plugin manager to load appropriate TProof from
00362       TPluginManager *pm = gROOT->GetPluginManager();
00363       if (!pm) {
00364          Error("CreateServer", "no plugin manager found");
00365          SendLogFile(-99);
00366          Terminate(0);
00367          return -1;
00368       }
00369 
00370       // Find the appropriate handler
00371       TPluginHandler *h = pm->FindHandler("TProof", fConfFile);
00372       if (!h) {
00373          Error("CreateServer", "no plugin found for TProof with a"
00374                              " config file of '%s'", fConfFile.Data());
00375          SendLogFile(-99);
00376          Terminate(0);
00377          return -1;
00378       }
00379 
00380       // load the plugin
00381       if (h->LoadPlugin() == -1) {
00382          Error("CreateServer", "plugin for TProof could not be loaded");
00383          SendLogFile(-99);
00384          Terminate(0);
00385          return -1;
00386       }
00387 
00388       // make instance of TProof
00389       fProof = reinterpret_cast<TProof*>(h->ExecPlugin(5, master.Data(),
00390                                                           fConfFile.Data(),
00391                                                           fConfDir.Data(),
00392                                                           fLogLevel,
00393                                                           fTopSessionTag.Data()));
00394       if (!fProof || !fProof->IsValid()) {
00395          Error("CreateServer", "plugin for TProof could not be executed");
00396          FlushLogFile();
00397          delete fProof;
00398          fProof = 0;
00399          SendLogFile(-99);
00400          Terminate(0);
00401          return -1;
00402       }
00403       // Find out if we are a master in direct contact only with workers
00404       fEndMaster = fProof->IsEndMaster();
00405 
00406       // Save worker info
00407       fProof->SaveWorkerInfo();
00408 
00409       SendLogFile();
00410    }
00411 
00412    // Setup the shutdown timer
00413    if (!fShutdownTimer) {
00414       // Check activity on socket every 5 mins
00415       fShutdownTimer = new TShutdownTimer(this, 300000);
00416       fShutdownTimer->Start(-1, kFALSE);
00417    }
00418 
00419    // Check if schema evolution is effective: clients running versions <=17 do not
00420    // support that: send a warning message
00421    if (fProtocol <= 17) {
00422       TString msg;
00423       msg.Form("Warning: client version is too old: automatic schema evolution is ineffective.\n"
00424                "         This may generate compatibility problems between streamed objects.\n"
00425                "         The advise is to move to ROOT >= 5.21/02 .");
00426       SendAsynMessage(msg.Data());
00427    }
00428 
00429    // Setup the idle timer
00430    if (IsMaster() && !fIdleTOTimer) {
00431       // Check activity on socket every 5 mins
00432       Int_t idle_to = gEnv->GetValue("ProofServ.IdleTimeout", -1);
00433       if (idle_to > 0) {
00434          fIdleTOTimer = new TIdleTOTimer(this, idle_to * 1000);
00435          fIdleTOTimer->Start(-1, kTRUE);
00436          if (gProofDebugLevel > 0)
00437             Info("CreateServer", " idle timer started (%d secs)", idle_to);
00438       } else if (gProofDebugLevel > 0) {
00439          Info("CreateServer", " idle timer not started (no idle timeout requested)");
00440       }
00441    }
00442 
00443    // Done
00444    return 0;
00445 }
00446 
00447 //______________________________________________________________________________
00448 TXProofServ::~TXProofServ()
00449 {
00450    // Cleanup. Not really necessary since after this dtor there is no
00451    // live anyway.
00452 
00453    delete fSocket;
00454 }
00455 
00456 //______________________________________________________________________________
00457 void TXProofServ::HandleUrgentData()
00458 {
00459    // Handle high priority data sent by the master or client.
00460 
00461    // Real-time notification of messages
00462    TProofServLogHandlerGuard hg(fLogFile, fSocket, "", fRealTimeLog);
00463 
00464    // Get interrupt
00465    Bool_t fw = kFALSE;
00466    Int_t iLev = ((TXSocket *)fSocket)->GetInterrupt(fw);
00467    if (iLev < 0) {
00468       Error("HandleUrgentData", "error receiving interrupt");
00469       return;
00470    }
00471 
00472    PDB(kGlobal, 2)
00473       Info("HandleUrgentData", "got interrupt: %d\n", iLev);
00474 
00475    if (fProof)
00476       fProof->SetActive();
00477 
00478    switch (iLev) {
00479 
00480       case TProof::kPing:
00481          PDB(kGlobal, 2)
00482             Info("HandleUrgentData", "*** Ping");
00483 
00484          // If master server, propagate interrupt to slaves
00485          if (fw && IsMaster()) {
00486             Int_t nbad = fProof->fActiveSlaves->GetSize() - fProof->Ping();
00487             if (nbad > 0) {
00488                Info("HandleUrgentData","%d slaves did not reply to ping",nbad);
00489             }
00490          }
00491 
00492          // Touch the admin path to show we are alive
00493          if (fAdminPath.IsNull()) {
00494             fAdminPath = gEnv->GetValue("ProofServ.AdminPath", "");
00495          }
00496 
00497          if (!fAdminPath.IsNull()) {
00498             if (!fAdminPath.EndsWith(".status")) {
00499                // Update file time stamps
00500                if (utime(fAdminPath.Data(), 0) != 0)
00501                   Info("HandleUrgentData", "problems touching path: %s", fAdminPath.Data());
00502                else
00503                   PDB(kGlobal, 2)
00504                      Info("HandleUrgentData", "touching path: %s", fAdminPath.Data());
00505             } else {
00506                // Update the status in the file
00507                //     0     idle
00508                //     1     running
00509                //     2     being terminated  (currently unused)
00510                //     3     queued
00511                //     4     idle timed-out
00512                Int_t uss_rc = UpdateSessionStatus(-1);
00513                if (uss_rc != 0)
00514                   Error("HandleUrgentData", "problems updating status path: %s (errno: %d)", fAdminPath.Data(), -uss_rc);
00515             }
00516          } else {
00517             Info("HandleUrgentData", "admin path undefined");
00518          }
00519 
00520          break;
00521 
00522       case TProof::kHardInterrupt:
00523          Info("HandleUrgentData", "*** Hard Interrupt");
00524 
00525          // If master server, propagate interrupt to slaves
00526          if (fw && IsMaster())
00527             fProof->Interrupt(TProof::kHardInterrupt);
00528 
00529          // Flush input socket
00530          ((TXSocket *)fSocket)->Flush();
00531 
00532          if (IsMaster())
00533             SendLogFile();
00534 
00535          break;
00536 
00537       case TProof::kSoftInterrupt:
00538          Info("HandleUrgentData", "Soft Interrupt");
00539 
00540          // If master server, propagate interrupt to slaves
00541          if (fw && IsMaster())
00542             fProof->Interrupt(TProof::kSoftInterrupt);
00543 
00544          Interrupt();
00545 
00546          if (IsMaster())
00547             SendLogFile();
00548 
00549          break;
00550 
00551 
00552       case TProof::kShutdownInterrupt:
00553          Info("HandleUrgentData", "Shutdown Interrupt");
00554 
00555          // When returning for here connection are closed
00556          HandleTermination();
00557 
00558          break;
00559 
00560       default:
00561          Error("HandleUrgentData", "unexpected type: %d", iLev);
00562          break;
00563    }
00564 
00565 
00566    if (fProof) fProof->SetActive(kFALSE);
00567 }
00568 
00569 //______________________________________________________________________________
00570 void TXProofServ::HandleSigPipe()
00571 {
00572    // Called when the client is not alive anymore; terminate the session.
00573 
00574    // Real-time notification of messages
00575 
00576    Info("HandleSigPipe","got sigpipe ... do nothing");
00577 }
00578 
00579 //______________________________________________________________________________
00580 void TXProofServ::HandleTermination()
00581 {
00582    // Called when the client is not alive anymore; terminate the session.
00583 
00584    // If master server, propagate interrupt to slaves
00585    // (shutdown interrupt send internally).
00586    if (IsMaster()) {
00587 
00588       // If not idle, try first to stop processing
00589       if (!fIdle) {
00590          // Remove pending requests
00591          fWaitingQueries->Delete();
00592          // Interrupt the current monitor
00593          fProof->InterruptCurrentMonitor();
00594          // Do not wait for ever, but al least 20 seconds
00595          Long_t timeout = gEnv->GetValue("Proof.ShutdownTimeout", 60);
00596          timeout = (timeout > 20) ? timeout : 20;
00597          // Processing will be aborted
00598          fProof->StopProcess(kTRUE, (Long_t) (timeout / 2));
00599          // Receive end-of-processing messages, but do not wait for ever
00600          fProof->Collect(TProof::kActive, timeout);
00601          // Still not idle
00602          if (!fIdle)
00603             Warning("HandleTermination","processing could not be stopped");
00604       }
00605       // Close the session
00606       if (fProof)
00607          fProof->Close("S");
00608    }
00609 
00610    Terminate(0);  // will not return from here....
00611 }
00612 
00613 //______________________________________________________________________________
00614 Int_t TXProofServ::Setup()
00615 {
00616    // Print the ProofServ logo on standard output.
00617    // Return 0 on success, -1 on error
00618 
00619    char str[512];
00620 
00621    if (IsMaster()) {
00622       sprintf(str, "**** Welcome to the PROOF server @ %s ****", gSystem->HostName());
00623    } else {
00624       sprintf(str, "**** PROOF worker server @ %s started ****", gSystem->HostName());
00625    }
00626 
00627    if (fSocket->Send(str) != 1+static_cast<Int_t>(strlen(str))) {
00628       Error("Setup", "failed to send proof server startup message");
00629       return -1;
00630    }
00631 
00632    // Get client protocol
00633    if ((fProtocol = gEnv->GetValue("ProofServ.ClientVersion", -1)) < 0) {
00634       Error("Setup", "remote proof protocol missing");
00635       return -1;
00636    }
00637 
00638    // The local user
00639    fUser = gEnv->GetValue("ProofServ.Entity", "");
00640    if (fUser.Length() >= 0) {
00641       if (fUser.Contains(":"))
00642          fUser.Remove(fUser.Index(":"));
00643       if (fUser.Contains("@"))
00644          fUser.Remove(fUser.Index("@"));
00645    } else {
00646       UserGroup_t *pw = gSystem->GetUserInfo();
00647       if (pw) {
00648          fUser = pw->fUser;
00649          delete pw;
00650       }
00651    }
00652 
00653    // Work dir and ...
00654    if (IsMaster()) {
00655       TString cf = gEnv->GetValue("ProofServ.ProofConfFile", "");
00656       if (cf.Length() > 0)
00657          fConfFile = cf;
00658    }
00659    fWorkDir = gEnv->GetValue("ProofServ.Sandbox", Form("~/%s", kPROOF_WorkDir));
00660 
00661    // Get Session tag
00662    if ((fTopSessionTag = gEnv->GetValue("ProofServ.SessionTag", "-1")) == "-1") {
00663       Error("Setup", "Session tag missing");
00664       return -1;
00665    }
00666    fSessionTag = fTopSessionTag;
00667    // Make sure the process ID is in the tag
00668    TString spid = Form("-%d", gSystem->GetPid());
00669    if (!fSessionTag.EndsWith(spid)) {
00670       Int_t nd = 0;
00671       if ((nd = fSessionTag.CountChar('-')) >= 2) {
00672          Int_t id = fSessionTag.Index("-", fSessionTag.Index("-") + 1);
00673          if (id != kNPOS) fSessionTag.Remove(id);
00674       } else if (nd != 1) {
00675          Warning("Setup", "Wrong number of '-' in session tag: protocol error? %s", fSessionTag.Data());
00676       }
00677       // Add this process ID
00678       fSessionTag += spid;
00679    }
00680    if (gProofDebugLevel > 0)
00681       Info("Setup", "session tags: %s, %s", fTopSessionTag.Data(), fSessionTag.Data());
00682 
00683    // Get Session dir (sandbox)
00684    if ((fSessionDir = gEnv->GetValue("ProofServ.SessionDir", "-1")) == "-1") {
00685       Error("Setup", "Session dir missing");
00686       return -1;
00687    }
00688 
00689    // Goto to the main PROOF working directory
00690    char *workdir = gSystem->ExpandPathName(fWorkDir.Data());
00691    fWorkDir = workdir;
00692    delete [] workdir;
00693    if (gProofDebugLevel > 0)
00694       Info("Setup", "working directory set to %s", fWorkDir.Data());
00695 
00696    // Common setup
00697    if (SetupCommon() != 0) {
00698       Error("Setup", "common setup failed");
00699       return -1;
00700    }
00701 
00702    // Send packages off immediately to reduce latency
00703    fSocket->SetOption(kNoDelay, 1);
00704 
00705    // Check every two hours if client is still alive
00706    fSocket->SetOption(kKeepAlive, 1);
00707 
00708    // Install SigPipe handler to handle kKeepAlive failure
00709    gSystem->AddSignalHandler(new TXProofServSigPipeHandler(this));
00710 
00711    // Install Termination handler
00712    gSystem->AddSignalHandler(new TXProofServTerminationHandler(this));
00713 
00714    // Install seg violation handler
00715    gSystem->AddSignalHandler(new TXProofServSegViolationHandler(this));
00716 
00717    if (gProofDebugLevel > 0)
00718       Info("Setup", "successfully completed");
00719 
00720    // Done
00721    return 0;
00722 }
00723 
00724 //______________________________________________________________________________
00725 TProofServ::EQueryAction TXProofServ::GetWorkers(TList *workers,
00726                                                  Int_t & /* prioritychange */,
00727                                                  Bool_t resume)
00728 {
00729    // Get list of workers to be used from now on.
00730    // The list must be provided by the caller.
00731 
00732    TProofServ::EQueryAction rc = kQueryStop;
00733 
00734    // If user config files are enabled, check them first
00735    if (gEnv->GetValue("ProofServ.UseUserCfg", 0) != 0) {
00736       Int_t pc = 1;
00737       if ((rc = TProofServ::GetWorkers(workers, pc)) == kQueryOK)
00738          return rc;
00739    }
00740 
00741    // seqnum of the query for which we call getworkers
00742    Bool_t dynamicStartup = gEnv->GetValue("Proof.DynamicStartup", kFALSE);
00743    TString seqnum = (dynamicStartup) ? "" : XPD_GW_Static;
00744    if (!fWaitingQueries->IsEmpty()) {
00745       if (resume) {
00746          seqnum += ((TProofQueryResult *)(fWaitingQueries->First()))->GetSeqNum();
00747       } else {
00748          seqnum += ((TProofQueryResult *)(fWaitingQueries->Last()))->GetSeqNum();
00749       }
00750    }
00751    // Send request to the coordinator
00752    TObjString *os =
00753       ((TXSocket *)fSocket)->SendCoordinator(kGetWorkers, seqnum.Data());
00754 
00755    // The reply contains some information about the master (image, workdir)
00756    // followed by the information about the workers; the tokens for each node
00757    // are separated by '&'
00758    if (os) {
00759       TString fl(os->GetName());
00760       if (fl.BeginsWith(XPD_GW_QueryEnqueued)) {
00761          SendAsynMessage("+++ Query cannot be processed now: enqueued");
00762          return kQueryEnqueued;
00763       }
00764 
00765       // Honour a max number of workers request (typically when running in valgrind)
00766       Int_t nwrks = -1;
00767       Bool_t pernode = kFALSE;
00768       if (gSystem->Getenv("PROOF_NWORKERS")) {
00769          TString s(gSystem->Getenv("PROOF_NWORKERS"));
00770          if (s.EndsWith("x")) {
00771             pernode = kTRUE;
00772             s.ReplaceAll("x", "");
00773          }
00774          if (s.IsDigit()) {
00775             nwrks = s.Atoi();
00776             if (nwrks > 0) {
00777                // Notify
00778                TString msg;
00779                if (pernode) {
00780                   msg.Form("+++ Starting max %d workers per node following the setting of PROOF_NWORKERS", nwrks);
00781                } else {
00782                   msg.Form("+++ Starting max %d workers following the setting of PROOF_NWORKERS", nwrks);
00783                }
00784                SendAsynMessage(msg);
00785             } else {
00786                nwrks = -1;
00787             }
00788          } else {
00789             pernode = kFALSE;
00790          }
00791       }
00792 
00793       TString tok;
00794       Ssiz_t from = 0;
00795       TList *nodecnt = (pernode) ? new TList : 0 ;
00796       if (fl.Tokenize(tok, from, "&")) {
00797          if (!tok.IsNull()) {
00798             TProofNodeInfo *master = new TProofNodeInfo(tok);
00799             if (!master) {
00800                Error("GetWorkers", "no appropriate master line got from coordinator");
00801                return kQueryStop;
00802             } else {
00803                // Set image if not yet done and available
00804                if (fImage.IsNull() && strlen(master->GetImage()) > 0)
00805                   fImage = master->GetImage();
00806                SafeDelete(master);
00807             }
00808             // Now the workers
00809             while (fl.Tokenize(tok, from, "&") && (nwrks == -1 || nwrks > 0)) {
00810                if (!tok.IsNull()) {
00811                   // We have the minimal set of information to start
00812                   rc = kQueryOK;
00813                   if (pernode && nodecnt) {
00814                      TProofNodeInfo *ni = new TProofNodeInfo(tok);
00815                      TParameter<Int_t> *p = 0;
00816                      Int_t nw = 0;
00817                      if (!(p = (TParameter<Int_t> *) nodecnt->FindObject(ni->GetNodeName().Data()))) {
00818                         p = new TParameter<Int_t>(ni->GetNodeName().Data(), nw);
00819                         nodecnt->Add(p);
00820                      }
00821                      nw = p->GetVal();
00822                      if (gDebug > 0)
00823                         Info("GetWorkers","%p: name: %s (%s) val: %d (nwrks: %d)",
00824                                           p, p->GetName(), ni->GetNodeName().Data(),  nw, nwrks);
00825                      if (nw < nwrks) {
00826                         if (workers) workers->Add(ni);
00827                         nw++;
00828                         p->SetVal(nw);
00829                      } else {
00830                         // Too many workers on this machine already
00831                         SafeDelete(ni);
00832                      }
00833                   } else {
00834                      if (workers)
00835                         workers->Add(new TProofNodeInfo(tok));
00836                      // Count down
00837                      if (nwrks != -1) nwrks--;
00838                   }
00839                }
00840             }
00841          }
00842       }
00843       // Cleanup
00844       if (nodecnt) {
00845          nodecnt->SetOwner(kTRUE);
00846          SafeDelete(nodecnt);
00847       }
00848    }
00849 
00850    // We are done
00851    return rc;
00852 }
00853 
00854 //_____________________________________________________________________________
00855 Bool_t TXProofServ::HandleError(const void *)
00856 {
00857    // Handle error on the input socket
00858 
00859    // Try reconnection
00860    if (fSocket && !fSocket->IsValid()) {
00861 
00862       fSocket->Reconnect();
00863       if (fSocket && fSocket->IsValid()) {
00864          if (gDebug > 0)
00865             Info("HandleError",
00866                "%p: connection to local coordinator re-established", this);
00867          FlushLogFile();
00868          return kFALSE;
00869       }
00870    }
00871    Printf("TXProofServ::HandleError: %p: got called ...", this);
00872 
00873    // If master server, propagate interrupt to slaves
00874    // (shutdown interrupt send internally).
00875    if (IsMaster())
00876       fProof->Close("S");
00877 
00878    // Avoid communicating back anything to the coordinator (it is gone)
00879    ((TXSocket *)fSocket)->SetSessionID(-1);
00880 
00881    Terminate(0);
00882 
00883    Printf("TXProofServ::HandleError: %p: DONE ... ", this);
00884 
00885    // We are done
00886    return kTRUE;
00887 }
00888 
00889 //_____________________________________________________________________________
00890 Bool_t TXProofServ::HandleInput(const void *in)
00891 {
00892    // Handle asynchronous input on the input socket
00893 
00894    if (gDebug > 2)
00895       Printf("TXProofServ::HandleInput %p, in: %p", this, in);
00896 
00897    XHandleIn_t *hin = (XHandleIn_t *) in;
00898    Int_t acod = (hin) ? hin->fInt1 : kXPD_msg;
00899 
00900    // Act accordingly
00901    if (acod == kXPD_ping || acod == kXPD_interrupt) {
00902       // Interrupt or Ping
00903       HandleUrgentData();
00904 
00905    } else if (acod == kXPD_flush) {
00906       // Flush stdout, so that we can access the full log file
00907       Info("HandleInput","kXPD_flush: flushing log file (stdout)");
00908       fflush(stdout);
00909 
00910    } else if (acod == kXPD_urgent) {
00911       // Get type
00912       Int_t type = hin->fInt2;
00913       switch (type) {
00914       case TXSocket::kStopProcess:
00915          {
00916             // Abort or Stop ?
00917             Bool_t abort = (hin->fInt3 != 0) ? kTRUE : kFALSE;
00918             // Timeout
00919             Int_t timeout = hin->fInt4;
00920             // Act now
00921             if (fProof)
00922                fProof->StopProcess(abort, timeout);
00923             else
00924                if (fPlayer)
00925                   fPlayer->StopProcess(abort, timeout);
00926          }
00927          break;
00928       default:
00929          Info("HandleInput","kXPD_urgent: unknown type: %d", type);
00930       }
00931 
00932    } else if (acod == kXPD_inflate) {
00933 
00934       // Set inflate factor
00935       fInflateFactor = (hin->fInt2 >= 1000) ? hin->fInt2 : fInflateFactor;
00936       // Notify
00937       Info("HandleInput", "kXPD_inflate: inflate factor set to %f",
00938            (Float_t) fInflateFactor / 1000.);
00939 
00940    } else if (acod == kXPD_priority) {
00941 
00942       // The factor is the priority to be propagated
00943       fGroupPriority = hin->fInt2;
00944       if (fProof)
00945          fProof->BroadcastGroupPriority(fGroup, fGroupPriority);
00946       // Notify
00947       Info("HandleInput", "kXPD_priority: group %s priority set to %f",
00948            fGroup.Data(), (Float_t) fGroupPriority / 100.);
00949 
00950    } else if (acod == kXPD_clusterinfo) {
00951 
00952       // Information about the cluster status
00953       fTotSessions     = hin->fInt2;
00954       fActSessions     = hin->fInt3;
00955       fEffSessions     = (hin->fInt4)/1000.;
00956       // Notify
00957       Info("HandleInput", "kXPD_clusterinfo: tot: %d, act: %d, eff: %f",
00958            fTotSessions, fActSessions, fEffSessions);
00959 
00960    } else {
00961       // Standard socket input
00962       HandleSocketInput();
00963       // This request has been completed: remove the client ID from the pipe
00964       ((TXSocket *)fSocket)->RemoveClientID();
00965    }
00966 
00967    // We are done
00968    return kTRUE;
00969 }
00970 
00971 //______________________________________________________________________________
00972 void TXProofServ::DisableTimeout()
00973 {
00974    // Disable read timeout on the underlying socket
00975 
00976    if (fSocket)
00977      ((TXSocket *)fSocket)->DisableTimeout();
00978 }
00979 
00980 //______________________________________________________________________________
00981 void TXProofServ::EnableTimeout()
00982 {
00983    // Enable read timeout on the underlying socket
00984 
00985    if (fSocket)
00986      ((TXSocket *)fSocket)->EnableTimeout();
00987 }
00988 
00989 //______________________________________________________________________________
00990 void TXProofServ::Terminate(Int_t status)
00991 {
00992    // Terminate the proof server.
00993 
00994    if (fTerminated)
00995       // Avoid doubling the exit operations
00996       exit(1);
00997    fTerminated = kTRUE;
00998 
00999    // Notify
01000    Info("Terminate", "starting session termination operations ...");
01001    if (fgLogToSysLog > 0) {
01002       TString s;
01003       s.Form("%s -1 %.3f %.3f", fgSysLogEntity.Data(), fRealTime, fCpuTime);
01004       gSystem->Syslog(kLogNotice, s.Data());
01005    }
01006 
01007    // Notify the memory footprint
01008    ProcInfo_t pi;
01009    if (!gSystem->GetProcInfo(&pi)){
01010       Info("Terminate", "process memory footprint: %ld/%ld kB virtual, %ld/%ld kB resident ",
01011                         pi.fMemVirtual, fgVirtMemMax, pi.fMemResident, fgResMemMax);
01012    }
01013 
01014    // Deactivate current monitor, if any
01015    if (fProof)
01016       fProof->SetMonitor(0, kFALSE);
01017 
01018    // Cleanup session directory
01019    if (status == 0) {
01020       // make sure we remain in a "connected" directory
01021       gSystem->ChangeDirectory("/");
01022       // needed in case fSessionDir is on NFS ?!
01023       gSystem->MakeDirectory(fSessionDir+"/.delete");
01024       gSystem->Exec(Form("%s %s", kRM, fSessionDir.Data()));
01025    }
01026 
01027    // Cleanup queries directory if empty
01028    if (IsMaster()) {
01029       if (!(fQMgr && fQMgr->Queries() && fQMgr->Queries()->GetSize())) {
01030          // make sure we remain in a "connected" directory
01031          gSystem->ChangeDirectory("/");
01032          // needed in case fQueryDir is on NFS ?!
01033          gSystem->MakeDirectory(fQueryDir+"/.delete");
01034          gSystem->Exec(Form("%s %s", kRM, fQueryDir.Data()));
01035          // Remove lock file
01036          if (fQueryLock)
01037             gSystem->Unlink(fQueryLock->GetName());
01038        }
01039 
01040       // Unlock the query dir owned by this session
01041       if (fQueryLock)
01042          fQueryLock->Unlock();
01043    } else {
01044       // Try to stop processing if any
01045       Bool_t abort = (status == 0) ? kFALSE : kTRUE;
01046       if (!fIdle && fPlayer)
01047          fPlayer->StopProcess(abort,1);
01048       gSystem->Sleep(2000);
01049    }
01050 
01051    // Cleanup data directory if empty
01052    if (!fDataDir.IsNull() && !gSystem->AccessPathName(fDataDir, kWritePermission)) {
01053      if (UnlinkDataDir(fDataDir))
01054         Info("Terminate", "data directory '%s' has been removed", fDataDir.Data());
01055    }
01056 
01057    // Remove input and signal handlers to avoid spurious "signals"
01058    // for closing activities executed upon exit()
01059    gSystem->RemoveFileHandler(fInputHandler);
01060 
01061    // Stop processing events (set a flag to exit the event loop)
01062    gSystem->ExitLoop();
01063 
01064    // We post the pipe once to wake up the main thread which is waiting for
01065    // activity on this socket; this fake activity will make it return and
01066    // eventually exit the loop.
01067    TXSocket::fgPipe.Post((TXSocket *)fSocket);
01068 
01069    // Notify
01070    Printf("Terminate: termination operations ended: quitting!");
01071 }
01072 
01073 //______________________________________________________________________________
01074 Int_t TXProofServ::LockSession(const char *sessiontag, TProofLockPath **lck)
01075 {
01076    // Try locking query area of session tagged sessiontag.
01077    // The id of the locking file is returned in fid and must be
01078    // unlocked via UnlockQueryFile(fid).
01079 
01080    // We do not need to lock our own session
01081    if (strstr(sessiontag, fTopSessionTag))
01082       return 0;
01083 
01084    if (!lck) {
01085       Info("LockSession","locker space undefined");
01086       return -1;
01087    }
01088    *lck = 0;
01089 
01090    // Check the format
01091    TString stag = sessiontag;
01092    TRegexp re("session-.*-.*-.*");
01093    Int_t i1 = stag.Index(re);
01094    if (i1 == kNPOS) {
01095       Info("LockSession","bad format: %s", sessiontag);
01096       return -1;
01097    }
01098    stag.ReplaceAll("session-","");
01099 
01100    // Drop query number, if any
01101    Int_t i2 = stag.Index(":q");
01102    if (i2 != kNPOS)
01103       stag.Remove(i2);
01104 
01105    // Make sure that parent process does not exist anylonger
01106    TString parlog = fSessionDir;
01107    parlog = parlog.Remove(parlog.Index("master-")+strlen("master-"));
01108    parlog += stag;
01109    if (!gSystem->AccessPathName(parlog)) {
01110       Info("LockSession","parent still running: do nothing");
01111       return -1;
01112    }
01113 
01114    // Lock the query lock file
01115    TString qlock = fQueryLock->GetName();
01116    qlock.ReplaceAll(fTopSessionTag, stag);
01117 
01118    if (!gSystem->AccessPathName(qlock)) {
01119       *lck = new TProofLockPath(qlock);
01120       if (((*lck)->Lock()) < 0) {
01121          Info("LockSession","problems locking query lock file");
01122          SafeDelete(*lck);
01123          return -1;
01124       }
01125    }
01126 
01127    // We are done
01128    return 0;
01129 }
01130 
01131 //______________________________________________________________________________
01132 void TXProofServ::ReleaseWorker(const char *ord)
01133 {
01134    // Send message to intermediate coordinator to release worker of last ordinal
01135    // ord.
01136 
01137    Info("ReleaseWorker","releasing: %s", ord);
01138 
01139    ((TXSocket *)fSocket)->SendCoordinator(kReleaseWorker, ord);
01140 }

Generated on Tue Jul 5 14:52:27 2011 for ROOT_528-00b_version by  doxygen 1.5.1