TProofServLite.cxx

Go to the documentation of this file.
00001 // @(#)root/proofx:$Id: TProofServLite.cxx 38146 2011-02-18 12:23:01Z ganis $
00002 // Author: Gerardo Ganis  12/12/2005
00003 
00004 /*************************************************************************
00005  * Copyright (C) 1995-2005, Rene Brun and Fons Rademakers.               *
00006  * All rights reserved.                                                  *
00007  *                                                                       *
00008  * For the licensing terms see $ROOTSYS/LICENSE.                         *
00009  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
00010  *************************************************************************/
00011 
00012 //////////////////////////////////////////////////////////////////////////
00013 //                                                                      //
00014 // TProofServLite                                                       //
00015 //                                                                      //
00016 // TProofServLite is the version of the PROOF worker server for local   //
00017 // running. The client starts directly the desired number of these      //
00018 // workers; the master and daemons are eliminated, optimizing the number//
00019 // of messages exchanged and created / destroyed.                       //
00020 //                                                                      //
00021 //////////////////////////////////////////////////////////////////////////
00022 
00023 #include "RConfigure.h"
00024 #include "RConfig.h"
00025 #include "Riostream.h"
00026 
00027 #ifdef WIN32
00028    #include <io.h>
00029    typedef long off_t;
00030    #include <snprintf.h>
00031 #else
00032 #include <netinet/in.h>
00033 #endif
00034 #include <sys/types.h>
00035 #include <cstdlib>
00036 
00037 #include "TProofServLite.h"
00038 #include "TObjString.h"
00039 #include "TEnv.h"
00040 #include "TError.h"
00041 #include "TException.h"
00042 #include "THashList.h"
00043 #include "TInterpreter.h"
00044 #include "TMessage.h"
00045 #include "TProofDebug.h"
00046 #include "TProof.h"
00047 #include "TProofPlayer.h"
00048 #include "TProofQueryResult.h"
00049 #include "TRegexp.h"
00050 #include "TClass.h"
00051 #include "TROOT.h"
00052 #include "TSystem.h"
00053 #include "TPluginManager.h"
00054 #include "TSocket.h"
00055 #include "TTimeStamp.h"
00056 #include "compiledata.h"
00057 
00058 
// Debug hook: while this is non-zero, CreateServer() and SetupOnFork() spin in
// a busy loop when the "Proof.GdbHook" setting equals 2. Nothing in this file
// ever clears it; presumably an attached debugger resets it to 0 to let the
// server proceed (hence 'volatile') -- TODO confirm.
static volatile Int_t gProofServDebug = 1;
00061 
00062 //----- Interrupt signal handler -----------------------------------------------
00063 //______________________________________________________________________________
00064 class TProofServLiteInterruptHandler : public TSignalHandler {
00065    TProofServLite  *fServ;
00066 public:
00067    TProofServLiteInterruptHandler(TProofServLite *s)
00068       : TSignalHandler(kSigUrgent, kFALSE) { fServ = s; }
00069    Bool_t  Notify();
00070 };
00071 
00072 //______________________________________________________________________________
00073 Bool_t TProofServLiteInterruptHandler::Notify()
00074 {
00075    // Handle urgent data
00076 
00077    fServ->HandleUrgentData();
00078    if (TROOT::Initialized()) {
00079       Throw(GetSignal());
00080    }
00081    return kTRUE;
00082 }
00083 
00084 //----- SigPipe signal handler -------------------------------------------------
00085 //______________________________________________________________________________
00086 class TProofServLiteSigPipeHandler : public TSignalHandler {
00087    TProofServLite  *fServ;
00088 public:
00089    TProofServLiteSigPipeHandler(TProofServLite *s) : TSignalHandler(kSigPipe, kFALSE)
00090       { fServ = s; }
00091    Bool_t  Notify();
00092 };
00093 
00094 //______________________________________________________________________________
00095 Bool_t TProofServLiteSigPipeHandler::Notify()
00096 {
00097    // Handle sig pipe
00098 
00099    fServ->HandleSigPipe();
00100    return kTRUE;
00101 }
00102 
00103 //----- Termination signal handler ---------------------------------------------
00104 //______________________________________________________________________________
00105 class TProofServLiteTerminationHandler : public TSignalHandler {
00106    TProofServLite  *fServ;
00107 public:
00108    TProofServLiteTerminationHandler(TProofServLite *s)
00109       : TSignalHandler(kSigTermination, kFALSE) { fServ = s; }
00110    Bool_t  Notify();
00111 };
00112 
00113 //______________________________________________________________________________
00114 Bool_t TProofServLiteTerminationHandler::Notify()
00115 {
00116    // Handle termination
00117 
00118    Printf("TProofServLiteTerminationHandler::Notify: wake up!");
00119 
00120    fServ->HandleTermination();
00121    return kTRUE;
00122 }
00123 
00124 //----- Seg violation signal handler ---------------------------------------------
00125 //______________________________________________________________________________
00126 class TProofServLiteSegViolationHandler : public TSignalHandler {
00127    TProofServLite  *fServ;
00128 public:
00129    TProofServLiteSegViolationHandler(TProofServLite *s)
00130       : TSignalHandler(kSigSegmentationViolation, kFALSE) { fServ = s; }
00131    Bool_t  Notify();
00132 };
00133 
00134 //______________________________________________________________________________
00135 Bool_t TProofServLiteSegViolationHandler::Notify()
00136 {
00137    // Handle seg violation
00138 
00139    Printf("**** ");
00140    Printf("**** Segmentation violation: terminating ****");
00141    Printf("**** ");
00142    fServ->HandleTermination();
00143    return kTRUE;
00144 }
00145 
00146 //----- Input handler for messages from parent or master -----------------------
00147 //______________________________________________________________________________
00148 class TProofServLiteInputHandler : public TFileHandler {
00149    TProofServLite  *fServ;
00150 public:
00151    TProofServLiteInputHandler(TProofServLite *s, Int_t fd) : TFileHandler(fd, 1)
00152       { fServ = s; }
00153    Bool_t Notify();
00154    Bool_t ReadNotify() { return Notify(); }
00155 };
00156 
00157 //______________________________________________________________________________
00158 Bool_t TProofServLiteInputHandler::Notify()
00159 {
00160    // Handle input on the socket
00161 
00162    fServ->HandleSocketInput();
00163 
00164    return kTRUE;
00165 }
00166 
00167 ClassImp(TProofServLite)
00168 
00169 // Hook to the constructor. This is needed to avoid using the plugin manager
00170 // which may create problems in multi-threaded environments.
00171 extern "C" {
00172    TApplication *GetTProofServLite(Int_t *argc, char **argv, FILE *flog)
00173    { return new TProofServLite(argc, argv, flog); }
00174 }
00175 
00176 //______________________________________________________________________________
00177 TProofServLite::TProofServLite(Int_t *argc, char **argv, FILE *flog)
00178             : TProofServ(argc, argv, flog)
00179 {
00180    // Main constructor
00181 
00182    fInterruptHandler = 0;
00183    fTerminated = kFALSE;
00184 }
00185 
00186 //______________________________________________________________________________
00187 Int_t TProofServLite::CreateServer()
00188 {
00189    // Finalize the server setup. If master, create the TProof instance to talk
00190    // the worker or submaster nodes.
00191    // Return 0 on success, -1 on error
00192 
00193    if (gProofDebugLevel > 0)
00194       Info("CreateServer", "starting server creation");
00195 
00196    // Get file descriptor for log file
00197    if (fLogFile) {
00198       // Use the file already open by pmain
00199       if ((fLogFileDes = fileno(fLogFile)) < 0) {
00200          Error("CreateServer", "resolving the log file description number");
00201          return -1;
00202       }
00203    }
00204 
00205    // Get socket to be used to call back our xpd
00206    fSockPath = gEnv->GetValue("ProofServ.OpenSock", "");
00207    if (fSockPath.Length() <= 0) {
00208       Error("CreateServer", "Socket setup by xpd undefined");
00209       return -1;
00210    }
00211    TString entity = gEnv->GetValue("ProofServ.Entity", "");
00212    if (entity.Length() > 0)
00213       fSockPath.Insert(0,TString::Format("%s/", entity.Data()));
00214 
00215    // Call back the client
00216    fSocket = new TSocket(fSockPath);
00217    if (!fSocket || !(fSocket->IsValid())) {
00218       Error("CreateServer", "Failed to open connection to the client");
00219       return -1;
00220    }
00221 
00222    // Send our ordinal, to allow the client to identify us
00223    TMessage msg;
00224    msg << fOrdinal;
00225    fSocket->Send(msg);
00226 
00227    // Get socket descriptor
00228    Int_t sock = fSocket->GetDescriptor();
00229 
00230    // Install interrupt and message input handlers
00231    fInterruptHandler = new TProofServLiteInterruptHandler(this);
00232    gSystem->AddSignalHandler(fInterruptHandler);
00233    gSystem->AddFileHandler(new TProofServLiteInputHandler(this, sock));
00234 
00235    // Wait (loop) in worker node to allow debugger to connect
00236    if (gEnv->GetValue("Proof.GdbHook",0) == 2) {
00237       while (gProofServDebug)
00238          ;
00239    }
00240 
00241    if (gProofDebugLevel > 0)
00242       Info("CreateServer", "Service: %s, ConfDir: %s, IsMaster: %d",
00243            fService.Data(), fConfDir.Data(), (Int_t)fMasterServ);
00244 
00245    if (Setup() == -1) {
00246       // Setup failure
00247       Terminate(0);
00248       SendLogFile();
00249       return -1;
00250    }
00251 
00252    if (!fLogFile) {
00253       RedirectOutput();
00254       // If for some reason we failed setting a redirection file for the logs
00255       // we cannot continue
00256       if (!fLogFile || (fLogFileDes = fileno(fLogFile)) < 0) {
00257          Terminate(0);
00258          SendLogFile(-98);
00259          return -1;
00260       }
00261    }
00262 
00263    // Everybody expects iostream to be available, so load it...
00264    ProcessLine("#include <iostream>", kTRUE);
00265    ProcessLine("#include <string>",kTRUE); // for std::string iostream.
00266 
00267    // Load user functions
00268    const char *logon;
00269    logon = gEnv->GetValue("Proof.Load", (char *)0);
00270    if (logon) {
00271       char *mac = gSystem->Which(TROOT::GetMacroPath(), logon, kReadPermission);
00272       if (mac)
00273          ProcessLine(TString::Format(".L %s", logon), kTRUE);
00274       delete [] mac;
00275    }
00276 
00277    // Execute logon macro
00278    logon = gEnv->GetValue("Proof.Logon", (char *)0);
00279    if (logon && !NoLogOpt()) {
00280       char *mac = gSystem->Which(TROOT::GetMacroPath(), logon, kReadPermission);
00281       if (mac)
00282          ProcessFile(logon);
00283       delete [] mac;
00284    }
00285 
00286    // Save current interpreter context
00287    gInterpreter->SaveContext();
00288    gInterpreter->SaveGlobalsContext();
00289 
00290    // Avoid spurious messages at first action
00291    FlushLogFile();
00292 
00293    // Done
00294    return 0;
00295 }
00296 
00297 //______________________________________________________________________________
00298 TProofServLite::~TProofServLite()
00299 {
00300    // Cleanup. Not really necessary since after this dtor there is no
00301    // live anyway.
00302 
00303    delete fSocket;
00304 }
00305 
00306 //______________________________________________________________________________
00307 void TProofServLite::HandleSigPipe()
00308 {
00309    // Called when the client is not alive anymore; terminate the session.
00310 
00311    Terminate(0);  // will not return from here....
00312 }
00313 
00314 //______________________________________________________________________________
00315 void TProofServLite::HandleTermination()
00316 {
00317    // Called when the client is not alive anymore; terminate the session.
00318 
00319    Terminate(0);  // will not return from here....
00320 }
00321 
00322 //______________________________________________________________________________
00323 Int_t TProofServLite::Setup()
00324 {
00325    // Print the ProofServ logo on standard output.
00326    // Return 0 on success, -1 on error
00327 
00328    char str[512];
00329 
00330    if (IsMaster()) {
00331       snprintf(str, 512, "**** Welcome to the PROOF server @ %s ****", gSystem->HostName());
00332    } else {
00333       snprintf(str, 512, "**** PROOF worker server @ %s started ****", gSystem->HostName());
00334    }
00335 
00336    if (fSocket->Send(str) != 1+static_cast<Int_t>(strlen(str))) {
00337       Error("Setup", "failed to send proof server startup message");
00338       return -1;
00339    }
00340 
00341    // Get client protocol
00342    if ((fProtocol = gEnv->GetValue("ProofServ.ClientVersion", -1)) < 0) {
00343       Error("Setup", "remote proof protocol missing");
00344       return -1;
00345    }
00346 
00347    // The local user
00348    UserGroup_t *pw = gSystem->GetUserInfo();
00349    if (pw) {
00350       fUser = pw->fUser;
00351       delete pw;
00352    }
00353 
00354    // Work dir and ...
00355    fWorkDir = gEnv->GetValue("ProofServ.Sandbox", TString::Format("~/%s", kPROOF_WorkDir));
00356    Info("Setup", "fWorkDir: %s", fWorkDir.Data());
00357 
00358    // Get Session tags
00359    fTopSessionTag = gEnv->GetValue("ProofServ.SessionTag", "-1");
00360    fSessionTag.Form("%s-%s-%ld-%d", fOrdinal.Data(), gSystem->HostName(),
00361                                     (Long_t)TTimeStamp().GetSec(), gSystem->GetPid());
00362    if (gProofDebugLevel > 0)
00363       Info("Setup", "session tag is %s", fSessionTag.Data());
00364    if (fTopSessionTag.IsNull()) fTopSessionTag = fSessionTag;
00365 
00366    // Send session tag to client
00367    TMessage m(kPROOF_SESSIONTAG);
00368    m << fSessionTag;
00369    fSocket->Send(m);
00370 
00371    // Get Session dir (sandbox)
00372    if ((fSessionDir = gEnv->GetValue("ProofServ.SessionDir", "-1")) == "-1") {
00373       Error("Setup", "Session dir missing");
00374       return -1;
00375    }
00376 
00377    // Link the session tag to the log file
00378    if (gSystem->Getenv("ROOTPROOFLOGFILE")) {
00379       TString logfile = gSystem->Getenv("ROOTPROOFLOGFILE");
00380       Int_t iord = logfile.Index(TString::Format("-%s", fOrdinal.Data()));
00381       if (iord != kNPOS) logfile.Remove(iord);
00382       logfile += TString::Format("-%s.log", fSessionTag.Data());
00383       gSystem->Symlink(gSystem->Getenv("ROOTPROOFLOGFILE"), logfile);
00384    }
00385 
00386    // Goto to the main PROOF working directory
00387    char *workdir = gSystem->ExpandPathName(fWorkDir.Data());
00388    fWorkDir = workdir;
00389    delete [] workdir;
00390    if (gProofDebugLevel > 0)
00391       Info("Setup", "working directory set to %s", fWorkDir.Data());
00392 
00393    // Common setup
00394    if (SetupCommon() != 0) {
00395       Error("Setup", "common setup failed");
00396       return -1;
00397    }
00398 
00399    // Check every two hours if client is still alive
00400    fSocket->SetOption(kKeepAlive, 1);
00401 
00402    // Install SigPipe handler to handle kKeepAlive failure
00403    gSystem->AddSignalHandler(new TProofServLiteSigPipeHandler(this));
00404 
00405    // Install Termination handler
00406    gSystem->AddSignalHandler(new TProofServLiteTerminationHandler(this));
00407 
00408    // Install seg violation handler
00409    gSystem->AddSignalHandler(new TProofServLiteSegViolationHandler(this));
00410 
00411    // Done
00412    return 0;
00413 }
00414 
00415 //______________________________________________________________________________
00416 void TProofServLite::Terminate(Int_t status)
00417 {
00418    // Terminate the proof server.
00419    if (fTerminated)
00420       // Avoid doubling the exit operations
00421       exit(1);
00422    fTerminated = kTRUE;
00423 
00424    // Notify
00425    Info("Terminate", "starting session termination operations ...");
00426 
00427    // Cleanup session directory
00428    if (status == 0) {
00429       // make sure we remain in a "connected" directory
00430       gSystem->ChangeDirectory("/");
00431       // needed in case fSessionDir is on NFS ?!
00432       gSystem->MakeDirectory(fSessionDir+"/.delete");
00433       gSystem->Exec(TString::Format("%s %s", kRM, fSessionDir.Data()));
00434    }
00435 
00436    // Cleanup data directory if empty
00437    if (!fDataDir.IsNull() && !gSystem->AccessPathName(fDataDir, kWritePermission)) {
00438      if (UnlinkDataDir(fDataDir))
00439         Info("Terminate", "data directory '%s' has been removed", fDataDir.Data());
00440    }
00441 
00442    // Remove input and signal handlers to avoid spurious "signals"
00443    // for closing activities executed upon exit()
00444    gSystem->RemoveSignalHandler(fInterruptHandler);
00445 
00446    // Stop processing events (set a flag to exit the event loop)
00447    gSystem->ExitLoop();
00448 
00449    // Notify
00450    Printf("Terminate: termination operations ended: quitting!");
00451 }
00452 
00453 //______________________________________________________________________________
00454 void TProofServLite::HandleFork(TMessage *mess)
00455 {
00456    // Cloning itself via fork.
00457 
00458    if (!mess) {
00459       Error("HandleFork", "empty message!");
00460       return;
00461    }
00462 
00463    // Extract the ordinals of the clones
00464    TString clones;
00465    (*mess) >> clones;
00466    PDB(kGlobal, 1)
00467       Info("HandleFork", "cloning to %s", clones.Data());
00468 
00469    TString clone;
00470    Int_t from = 0;
00471    while (clones.Tokenize(clone, from, " ")) {
00472 
00473       Int_t rc = 0;
00474       // Fork
00475       if ((rc = Fork()) < 0) {
00476          Error("HandleFork", "failed to fork %s", clone.Data());
00477          return;
00478       }
00479 
00480       // If the child, finalize the setup and return
00481       if (rc == 0) {
00482          SetupOnFork(clone.Data());
00483          return;
00484       }
00485    }
00486 
00487    // Done
00488    return;
00489 }
00490 
00491 //______________________________________________________________________________
00492 Int_t TProofServLite::SetupOnFork(const char *ord)
00493 {
00494    // Finalize the server setup afetr forking.
00495    // Return 0 on success, -1 on error
00496 
00497    if (gProofDebugLevel > 0)
00498       Info("SetupOnFork", "finalizing setup of %s", ord);
00499 
00500    // Set the ordinal
00501    fOrdinal = ord;
00502    TString sord;
00503    sord.Form("-%s", fOrdinal.Data());
00504 
00505    // Close the current log file
00506    if (fLogFile) {
00507       fclose(fLogFile);
00508       fLogFileDes = -1;
00509    }
00510 
00511    TString sdir = gSystem->DirName(fSessionDir.Data());
00512    RedirectOutput(sdir.Data(), "a");
00513    // If for some reason we failed setting a redirection file for the logs
00514    // we cannot continue
00515    if (!fLogFile || (fLogFileDes = fileno(fLogFile)) < 0) {
00516       Terminate(0);
00517       return -1;
00518    }
00519    FlushLogFile();
00520 
00521    // Eliminate existing symlink
00522    void *dirp = gSystem->OpenDirectory(sdir);
00523    if (dirp) {
00524       TString ent;
00525       const char *e = 0;
00526       while ((e = gSystem->GetDirEntry(dirp))) {
00527          ent.Form("%s/%s", sdir.Data(), e);
00528          FileStat_t st;
00529          gSystem->GetPathInfo(ent.Data(), st);
00530          if (st.fIsLink && ent.Contains(sord)) {
00531             PDB(kGlobal, 1)
00532                Info("SetupOnFork","unlinking: %s", ent.Data());
00533             gSystem->Unlink(ent);
00534          }
00535       }
00536       gSystem->FreeDirectory(dirp);
00537    }
00538 
00539    // The session tag
00540    fSessionTag.Form("%s-%d-%d", gSystem->HostName(), (int)time(0), gSystem->GetPid());
00541 
00542    // Create new symlink
00543    TString logfile = gSystem->Getenv("ROOTPROOFLOGFILE");
00544    logfile.ReplaceAll("-0.0", sord.Data());
00545    gSystem->Setenv("ROOTPROOFLOGFILE", logfile);
00546    Int_t iord = logfile.Index(sord.Data());
00547    if (iord != kNPOS) logfile.Remove(iord + sord.Length());
00548    logfile += TString::Format("-%s.log", fSessionTag.Data());
00549    gSystem->Symlink(gSystem->Getenv("ROOTPROOFLOGFILE"), logfile);
00550 
00551    // Get socket to be used to call back our xpd
00552    fSockPath = gEnv->GetValue("ProofServ.OpenSock", "");
00553    if (fSockPath.Length() <= 0) {
00554       Error("CreateServer", "Socket setup by xpd undefined");
00555       return -1;
00556    }
00557    TString entity = gEnv->GetValue("ProofServ.Entity", "");
00558    if (entity.Length() > 0)
00559       fSockPath.Insert(0, TString::Format("%s/", entity.Data()));
00560 
00561    // Call back the client
00562    fSocket = new TSocket(fSockPath);
00563    if (!fSocket || !(fSocket->IsValid())) {
00564       Error("CreateServer", "Failed to open connection to the client");
00565       return -1;
00566    }
00567 
00568    // Send our ordinal, to allow the client to identify us
00569    TMessage msg;
00570    msg << fOrdinal;
00571    fSocket->Send(msg);
00572 
00573    // Get socket descriptor
00574    Int_t sock = fSocket->GetDescriptor();
00575 
00576    // Install interrupt and message input handlers
00577    fInterruptHandler = new TProofServLiteInterruptHandler(this);
00578    gSystem->AddSignalHandler(fInterruptHandler);
00579    gSystem->AddFileHandler(new TProofServLiteInputHandler(this, sock));
00580 
00581    // Wait (loop) in worker node to allow debugger to connect
00582    if (gEnv->GetValue("Proof.GdbHook",0) == 2) {
00583       while (gProofServDebug)
00584          ;
00585    }
00586 
00587    if (gProofDebugLevel > 0)
00588       Info("SetupOnFork", "Service: %s, ConfDir: %s, IsMaster: %d",
00589            fService.Data(), fConfDir.Data(), (Int_t)fMasterServ);
00590 
00591    if (Setup() == -1) {
00592       // Setup failure
00593       Terminate(0);
00594       SendLogFile();
00595       return -1;
00596    }
00597 
00598    // Disallow the interpretation of Rtypes.h, TError.h and TGenericClassInfo.h
00599    ProcessLine("#define ROOT_Rtypes 0", kTRUE);
00600    ProcessLine("#define ROOT_TError 0", kTRUE);
00601    ProcessLine("#define ROOT_TGenericClassInfo 0", kTRUE);
00602 
00603    // Save current interpreter context
00604    gInterpreter->SaveContext();
00605    gInterpreter->SaveGlobalsContext();
00606 
00607    // Done
00608    return 0;
00609 }

Generated on Tue Jul 5 14:51:42 2011 for ROOT_528-00b_version by  doxygen 1.5.1