TSlave.cxx

Go to the documentation of this file.
00001 // @(#)root/proof:$Id: TSlave.cxx 37978 2011-02-04 11:49:23Z ganis $
00002 // Author: Fons Rademakers   14/02/97
00003 
00004 /*************************************************************************
00005  * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers.               *
00006  * All rights reserved.                                                  *
00007  *                                                                       *
00008  * For the licensing terms see $ROOTSYS/LICENSE.                         *
00009  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
00010  *************************************************************************/
00011 
00012 //////////////////////////////////////////////////////////////////////////
00013 //                                                                      //
00014 // TSlave                                                               //
00015 //                                                                      //
00016 // This class describes a PROOF slave server.                           //
00017 // It contains information like the slave's host name, ordinal number,  //
00018 // performance index, socket, etc. Objects of this class can only be    //
00019 // created via TProof member functions.                                 //
00020 //                                                                      //
00021 //////////////////////////////////////////////////////////////////////////
00022 
00023 #include <stdlib.h>
00024 
00025 #include "RConfigure.h"
00026 #include "TApplication.h"
00027 #include "TSlave.h"
00028 #include "TSlaveLite.h"
00029 #include "TProof.h"
00030 #include "TSystem.h"
00031 #include "TEnv.h"
00032 #include "TROOT.h"
00033 #include "TUrl.h"
00034 #include "TMessage.h"
00035 #include "TError.h"
00036 #include "TVirtualMutex.h"
00037 #include "TThread.h"
00038 #include "TSocket.h"
00039 #include "TObjString.h"
00040 
00041 ClassImp(TSlave)
00042 
00043 // Hook for the TXSlave constructor
00044 TSlave_t TSlave::fgTXSlaveHook = 0;
00045 
00046 //______________________________________________________________________________
00047 TSlave::TSlave(const char *url, const char *ord, Int_t perf,
00048                const char *image, TProof *proof, Int_t stype,
00049                const char *workdir, const char *msd)
00050   : fImage(image), fProofWorkDir(workdir),
00051     fWorkDir(workdir), fPort(-1),
00052     fOrdinal(ord), fPerfIdx(perf),
00053     fProtocol(0), fSocket(0), fProof(proof),
00054     fInput(0), fBytesRead(0), fRealTime(0),
00055     fCpuTime(0), fSlaveType((ESlaveType)stype), fStatus(TSlave::kInvalid),
00056     fParallel(0), fMsd(msd)
00057 {
00058    // Create a PROOF slave object. Called via the TProof ctor.
00059    fName = TUrl(url).GetHostFQDN();
00060    fPort = TUrl(url).GetPort();
00061 
00062    Init(url, -1, stype);
00063 }
00064 
00065 //______________________________________________________________________________
00066 TSlave::TSlave()
00067 {
00068    // Default constructor used by derived classes
00069 
00070    fPort      = -1;
00071    fOrdinal   = "-1";
00072    fPerfIdx   = -1;
00073    fProof     = 0;
00074    fSlaveType = kMaster;
00075    fProtocol  = 0;
00076    fSocket    = 0;
00077    fInput     = 0;
00078    fBytesRead = 0;
00079    fRealTime  = 0;
00080    fCpuTime   = 0;
00081    fStatus    = kInvalid;
00082    fParallel  = 0;
00083 }
00084 
00085 //______________________________________________________________________________
00086 void TSlave::Init(const char *host, Int_t port, Int_t stype)
00087 {
00088    // Init a PROOF slave object. Called via the TSlave ctor.
00089    // The Init method is technology specific and is overwritten by derived
00090    // classes.
00091 
00092    // The url contains information about the server type: make sure
00093    // it is 'proofd' or alike
00094    TString proto = fProof->fUrl.GetProtocol();
00095    proto.Insert(5, 'd');
00096 
00097    TUrl hurl(host);
00098    hurl.SetProtocol(proto);
00099    if (port > 0)
00100       hurl.SetPort(port);
00101 
00102    // Add information about our status (Client or Master)
00103    TString iam;
00104    if (fProof->IsMaster() && stype == kSlave) {
00105       iam = "Master";
00106       hurl.SetOptions("SM");
00107    } else if (fProof->IsMaster() && stype == kMaster) {
00108       iam = "Master";
00109       hurl.SetOptions("MM");
00110    } else if (!fProof->IsMaster() && stype == kMaster) {
00111       iam = "Local Client";
00112       hurl.SetOptions("MC");
00113    } else {
00114       Error("Init","Impossible PROOF <-> SlaveType Configuration Requested");
00115       R__ASSERT(0);
00116    }
00117 
00118    // Open authenticated connection to remote PROOF slave server.
00119    // If a connection was already open (fSocket != 0), re-use it
00120    // to perform authentication (optimization needed to avoid a double
00121    // opening in case this is called by TXSlave).
00122    Int_t wsize = 65536;
00123    fSocket = TSocket::CreateAuthSocket(hurl.GetUrl(), 0, wsize, fSocket);
00124 
00125    if (!fSocket || !fSocket->IsAuthenticated()) {
00126       SafeDelete(fSocket);
00127       return;
00128    }
00129 
00130    // Remove socket from global TROOT socket list. Only the TProof object,
00131    // representing all slave sockets, will be added to this list. This will
00132    // ensure the correct termination of all proof servers in case the
00133    // root session terminates.
00134    {
00135       R__LOCKGUARD2(gROOTMutex);
00136       gROOT->GetListOfSockets()->Remove(fSocket);
00137    }
00138 
00139    R__LOCKGUARD2(gProofMutex);
00140 
00141    // Fill some useful info
00142    fUser              = fSocket->GetSecContext()->GetUser();
00143    PDB(kGlobal,3) {
00144       Info("Init","%s: fUser is .... %s", iam.Data(), fUser.Data());
00145    }
00146 
00147    if (fSocket->GetRemoteProtocol() >= 14 ) {
00148       TMessage m(kPROOF_SETENV);
00149 
00150       const TList *envs = TProof::GetEnvVars();
00151       if (envs != 0 ) {
00152          TIter next(envs);
00153          for (TObject *o = next(); o != 0; o = next()) {
00154             TNamed *env = dynamic_cast<TNamed*>(o);
00155             if (env != 0) {
00156                TString def = Form("%s=%s", env->GetName(), env->GetTitle());
00157                const char *p = def.Data();
00158                m << p;
00159             }
00160          }
00161       }
00162       fSocket->Send(m);
00163    } else {
00164       Info("Init","** NOT ** Sending kPROOF_SETENV RemoteProtocol : %d",
00165          fSocket->GetRemoteProtocol());
00166    }
00167 
00168    char buf[512];
00169    fSocket->Recv(buf, sizeof(buf));
00170    if (strcmp(buf, "Okay")) {
00171       Printf("%s", buf);
00172       SafeDelete(fSocket);
00173       return;
00174    }
00175 
00176 }
00177 
00178 //______________________________________________________________________________
00179 Int_t TSlave::SetupServ(Int_t stype, const char *conffile)
00180 {
00181    // Init a PROOF slave object. Called via the TSlave ctor.
00182    // The Init method is technology specific and is overwritten by derived
00183    // classes.
00184 
00185    // get back startup message of proofserv (we are now talking with
00186    // the real proofserver and not anymore with the proofd front-end)
00187    Int_t what;
00188    char buf[512];
00189    if (fSocket->Recv(buf, sizeof(buf), what) <= 0) {
00190       Error("SetupServ", "failed to receive slave startup message");
00191       SafeDelete(fSocket);
00192       return -1;
00193    }
00194 
00195    if (what == kMESS_NOTOK) {
00196       SafeDelete(fSocket);
00197       return -1;
00198    }
00199 
00200    // exchange protocol level between client and master and between
00201    // master and slave
00202    if (fSocket->Send(kPROOF_Protocol, kROOTD_PROTOCOL) != 2*sizeof(Int_t)) {
00203       Error("SetupServ", "failed to send local PROOF protocol");
00204       SafeDelete(fSocket);
00205       return -1;
00206    }
00207 
00208    if (fSocket->Recv(fProtocol, what) != 2*sizeof(Int_t)) {
00209       Error("SetupServ", "failed to receive remote PROOF protocol");
00210       SafeDelete(fSocket);
00211       return -1;
00212    }
00213 
00214    // protocols less than 4 are incompatible
00215    if (fProtocol < 4) {
00216       Error("SetupServ", "incompatible PROOF versions (remote version"
00217                       " must be >= 4, is %d)", fProtocol);
00218       SafeDelete(fSocket);
00219       return -1;
00220    }
00221 
00222    fProof->fProtocol   = fProtocol;   // protocol of last slave on master
00223 
00224    if (fProtocol < 5) {
00225       //
00226       // Setup authentication related stuff for ald versions
00227       Bool_t isMaster = (stype == kMaster);
00228       TString wconf = isMaster ? TString(conffile) : fProofWorkDir;
00229       if (OldAuthSetup(isMaster, wconf) != 0) {
00230          Error("SetupServ", "OldAuthSetup: failed to setup authentication");
00231          SafeDelete(fSocket);
00232          return -1;
00233       }
00234    } else {
00235       //
00236       // Send ordinal (and config) info to slave (or master)
00237       TMessage mess;
00238       if (stype == kMaster)
00239          mess << fUser << fOrdinal << TString(conffile);
00240       else
00241          mess << fUser << fOrdinal << fProofWorkDir;
00242 
00243       if (fSocket->Send(mess) < 0) {
00244          Error("SetupServ", "failed to send ordinal and config info");
00245          SafeDelete(fSocket);
00246          return -1;
00247       }
00248    }
00249 
00250    // set some socket options
00251    fSocket->SetOption(kNoDelay, 1);
00252 
00253    // Set active state
00254    fStatus = kActive;
00255 
00256    // We are done
00257    return 0;
00258 }
00259 
00260 //______________________________________________________________________________
00261 void TSlave::Init(TSocket *s, Int_t stype)
00262 {
00263    // Init a PROOF slave object using the connection opened via s. Used to
00264    // avoid double opening when an attempt via TXSlave found a remote proofd.
00265 
00266    fSocket = s;
00267    TSlave::Init(s->GetInetAddress().GetHostName(), s->GetPort(), stype);
00268 }
00269 
00270 //______________________________________________________________________________
00271 TSlave::~TSlave()
00272 {
00273    // Destroy slave.
00274 
00275    Close();
00276 }
00277 
00278 //______________________________________________________________________________
00279 void TSlave::Close(Option_t *opt)
00280 {
00281    // Close slave socket.
00282 
00283    if (fSocket) {
00284 
00285       // If local client ...
00286       if (!(fProof->IsMaster()) && !strncasecmp(opt,"S",1)) {
00287          // ... tell master and slaves to stop
00288          Interrupt(TProof::kShutdownInterrupt);
00289       }
00290 
00291       // deactivate used sec context if talking to proofd daemon running
00292       // an old protocol (sec context disactivated remotely)
00293       TSecContext *sc = fSocket->GetSecContext();
00294       if (sc && sc->IsActive()) {
00295          TIter last(sc->GetSecContextCleanup(), kIterBackward);
00296          TSecContextCleanup *nscc = 0;
00297          while ((nscc = (TSecContextCleanup *)last())) {
00298             if (nscc->GetType() == TSocket::kPROOFD &&
00299                 nscc->GetProtocol() < 9) {
00300                sc->DeActivate("");
00301                break;
00302             }
00303          }
00304       }
00305    }
00306 
00307    SafeDelete(fInput);
00308    SafeDelete(fSocket);
00309 }
00310 
00311 //______________________________________________________________________________
00312 Int_t TSlave::Compare(const TObject *obj) const
00313 {
00314    // Used to sort slaves by performance index.
00315 
00316    const TSlave *sl = dynamic_cast<const TSlave*>(obj);
00317 
00318    if (!sl) {
00319       Error("Compare", "input is not a TSlave object");
00320       return 0;
00321    }
00322 
00323    if (fPerfIdx > sl->GetPerfIdx()) return 1;
00324    if (fPerfIdx < sl->GetPerfIdx()) return -1;
00325    const char *myord = GetOrdinal();
00326    const char *otherord = sl->GetOrdinal();
00327    while (myord && otherord) {
00328       Int_t myval = atoi(myord);
00329       Int_t otherval = atoi(otherord);
00330       if (myval < otherval) return 1;
00331       if (myval > otherval) return -1;
00332       myord = strchr(myord, '.');
00333       if (myord) myord++;
00334       otherord = strchr(otherord, '.');
00335       if (otherord) otherord++;
00336    }
00337    if (myord) return -1;
00338    if (otherord) return 1;
00339    return 0;
00340 }
00341 
00342 //______________________________________________________________________________
00343 void TSlave::Print(Option_t *) const
00344 {
00345    // Printf info about slave.
00346 
00347    TString sc;
00348 
00349    const char *sst[] = { "invalid" , "valid", "inactive" };
00350    Int_t st = fSocket ? ((fStatus == kInactive) ? 2 : 1) : 0;
00351 
00352    Printf("*** Worker %s  (%s)", fOrdinal.Data(), sst[st]);
00353    Printf("    Host name:               %s", GetName());
00354    Printf("    Port number:             %d", GetPort());
00355    Printf("    Worker session tag:      %s", GetSessionTag());
00356    Printf("    ROOT version|rev|tag:    %s", GetROOTVersion());
00357    Printf("    Architecture-Compiler:   %s", GetArchCompiler());
00358    if (fSocket) {
00359       if (strlen(GetGroup()) > 0) {
00360          Printf("    User/Group:              %s/%s", GetUser(), GetGroup());
00361       } else {
00362          Printf("    User:                    %s", GetUser());
00363       }
00364       if (fSocket->GetSecContext())
00365          Printf("    Security context:        %s", fSocket->GetSecContext()->AsString(sc));
00366       Printf("    Proofd protocol version: %d", fSocket->GetRemoteProtocol());
00367       Printf("    Image name:              %s", GetImage());
00368       Printf("    Working directory:       %s", GetWorkDir());
00369       Printf("    Performance index:       %d", GetPerfIdx());
00370       Printf("    MB's processed:          %.2f", float(GetBytesRead())/(1024*1024));
00371       Printf("    MB's sent:               %.2f", float(fSocket->GetBytesRecv())/(1024*1024));
00372       Printf("    MB's received:           %.2f", float(fSocket->GetBytesSent())/(1024*1024));
00373       Printf("    Real time used (s):      %.3f", GetRealTime());
00374       Printf("    CPU time used (s):       %.3f", GetCpuTime());
00375    }
00376 }
00377 
00378 //______________________________________________________________________________
00379 void TSlave::SetInputHandler(TFileHandler *ih)
00380 {
00381    // Adopt and register input handler for this slave. Handler will be deleted
00382    // by the slave.
00383 
00384    fInput = ih;
00385    fInput->Add();
00386 }
00387 
00388 //______________________________________________________________________________
00389 Int_t TSlave::OldAuthSetup(Bool_t master, TString wconf)
00390 {
00391    // Setup authentication related stuff for old versions.
00392    // Provided for backward compatibility.
00393    static OldSlaveAuthSetup_t oldAuthSetupHook = 0;
00394 
00395    if (!oldAuthSetupHook) {
00396       // Load libraries needed for (server) authentication ...
00397       TString authlib = "libRootAuth";
00398       char *p = 0;
00399       // The generic one
00400       if ((p = gSystem->DynamicPathName(authlib, kTRUE))) {
00401          delete[] p;
00402          if (gSystem->Load(authlib) == -1) {
00403             Error("OldAuthSetup", "can't load %s",authlib.Data());
00404             return kFALSE;
00405          }
00406       } else {
00407          Error("OldAuthSetup", "can't locate %s",authlib.Data());
00408          return -1;
00409       }
00410       //
00411       // Locate OldSlaveAuthSetup
00412       Func_t f = gSystem->DynFindSymbol(authlib,"OldSlaveAuthSetup");
00413       if (f)
00414          oldAuthSetupHook = (OldSlaveAuthSetup_t)(f);
00415       else {
00416          Error("OldAuthSetup", "can't find OldSlaveAuthSetup");
00417          return -1;
00418       }
00419    }
00420    //
00421    // Setup
00422    if (oldAuthSetupHook) {
00423       return (*oldAuthSetupHook)(fSocket, master, fOrdinal, wconf);
00424    } else {
00425       Error("OldAuthSetup", "hook to method OldSlaveAuthSetup is undefined");
00426       return -1;
00427    }
00428 }
00429 
00430 //______________________________________________________________________________
00431 TSlave *TSlave::Create(const char *url, const char *ord, Int_t perf,
00432                        const char *image, TProof *proof, Int_t stype,
00433                        const char *workdir, const char *msd)
00434 {
00435    // Static method returning the appropriate TSlave object for the remote
00436    // server.
00437 
00438    TSlave *s = 0;
00439 
00440    // Check if we are setting up a lite version
00441    if (!strcmp(url, "lite")) {
00442       return new TSlaveLite(ord, perf, image, proof, stype, workdir, msd);
00443    }
00444 
00445    // No need to try a XPD connection in some well defined cases
00446    Bool_t tryxpd = kTRUE;
00447    if (!(proof->IsMaster())) {
00448       if (proof->IsProofd())
00449          tryxpd = kFALSE;
00450    } else {
00451       if (gApplication &&
00452          (gApplication->Argc() < 3 || strncmp(gApplication->Argv(2),"xpd",3)))
00453          tryxpd = kFALSE;
00454    }
00455 
00456    // We do this without the plugin manager because it blocks the CINT mutex
00457    // breaking the parallel startup
00458    if (!fgTXSlaveHook) {
00459 
00460       // Load the library containing TXSlave ...
00461       TString proofxlib = "libProofx";
00462       char *p = 0;
00463       if ((p = gSystem->DynamicPathName(proofxlib, kTRUE))) {
00464          delete[] p;
00465          if (gSystem->Load(proofxlib) == -1)
00466             ::Error("TSlave::Create", "can't load %s", proofxlib.Data());
00467       } else
00468          ::Error("TSlave::Create", "can't locate %s", proofxlib.Data());
00469    }
00470 
00471    // Load the right class
00472    if (fgTXSlaveHook && tryxpd) {
00473       s = (*fgTXSlaveHook)(url, ord, perf, image, proof, stype, workdir, msd);
00474    } else {
00475       s = new TSlave(url, ord, perf, image, proof, stype, workdir, msd);
00476    }
00477 
00478    return s;
00479 }
00480 
00481 //______________________________________________________________________________
00482 Int_t TSlave::Ping()
00483 {
00484    // Ping the remote master or slave servers.
00485    // Returns 0 if ok, -1 in case of error
00486 
00487    if (!IsValid()) return -1;
00488 
00489    TMessage mess(kPROOF_PING | kMESS_ACK);
00490    fSocket->Send(mess);
00491    if (fSocket->Send(mess) == -1) {
00492       Warning("Ping","%s: acknowledgement not received", GetOrdinal());
00493       return -1;
00494    }
00495    return 0;
00496 }
00497 
00498 //______________________________________________________________________________
00499 void TSlave::Interrupt(Int_t type)
00500 {
00501    // Send interrupt OOB byte to master or slave servers.
00502    // Returns 0 if ok, -1 in case of error
00503 
00504    if (!IsValid()) return;
00505 
00506    char oobc = (char) type;
00507    const int kBufSize = 1024;
00508    char waste[kBufSize];
00509 
00510    // Send one byte out-of-band message to server
00511    if (fSocket->SendRaw(&oobc, 1, kOob) <= 0) {
00512       Error("Interrupt", "error sending oobc to slave %s", GetOrdinal());
00513       return;
00514    }
00515 
00516    if (type == TProof::kHardInterrupt) {
00517       char  oob_byte;
00518       int   n, nch, nbytes = 0, nloop = 0;
00519 
00520       // Receive the OOB byte
00521       while ((n = fSocket->RecvRaw(&oob_byte, 1, kOob)) < 0) {
00522          if (n == -2) {   // EWOULDBLOCK
00523             //
00524             // The OOB data has not yet arrived: flush the input stream
00525             //
00526             // In some systems (Solaris) regular recv() does not return upon
00527             // receipt of the oob byte, which makes the below call to recv()
00528             // block indefinitely if there are no other data in the queue.
00529             // FIONREAD ioctl can be used to check if there are actually any
00530             // data to be flushed.  If not, wait for a while for the oob byte
00531             // to arrive and try to read it again.
00532             //
00533             fSocket->GetOption(kBytesToRead, nch);
00534             if (nch == 0) {
00535                gSystem->Sleep(1000);
00536                continue;
00537             }
00538 
00539             if (nch > kBufSize) nch = kBufSize;
00540             n = fSocket->RecvRaw(waste, nch);
00541             if (n <= 0) {
00542                Error("Interrupt", "error receiving waste from slave %s",
00543                      GetOrdinal());
00544                break;
00545             }
00546             nbytes += n;
00547          } else if (n == -3) {   // EINVAL
00548             //
00549             // The OOB data has not arrived yet
00550             //
00551             gSystem->Sleep(100);
00552             if (++nloop > 100) {  // 10 seconds time-out
00553                Error("Interrupt", "server %s does not respond", GetOrdinal());
00554                break;
00555             }
00556          } else {
00557             Error("Interrupt", "error receiving OOB from server %s",
00558                   GetOrdinal());
00559             break;
00560          }
00561       }
00562 
00563       //
00564       // Continue flushing the input socket stream until the OOB
00565       // mark is reached
00566       //
00567       while (1) {
00568          int atmark;
00569 
00570          fSocket->GetOption(kAtMark, atmark);
00571 
00572          if (atmark)
00573             break;
00574 
00575          // find out number of bytes to read before atmark
00576          fSocket->GetOption(kBytesToRead, nch);
00577          if (nch == 0) {
00578             gSystem->Sleep(1000);
00579             continue;
00580          }
00581 
00582          if (nch > kBufSize) nch = kBufSize;
00583          n = fSocket->RecvRaw(waste, nch);
00584          if (n <= 0) {
00585             Error("Interrupt", "error receiving waste (2) from slave %s",
00586                   GetOrdinal());
00587             break;
00588          }
00589          nbytes += n;
00590       }
00591       if (nbytes > 0) {
00592          if (fProof->IsMaster())
00593             Info("Interrupt", "slave %s:%s synchronized: %d bytes discarded",
00594                  GetName(), GetOrdinal(), nbytes);
00595          else
00596             Info("Interrupt", "PROOF synchronized: %d bytes discarded", nbytes);
00597       }
00598 
00599       // Get log file from master or slave after a hard interrupt
00600       fProof->Collect(this);
00601 
00602    } else if (type == TProof::kSoftInterrupt) {
00603 
00604       // Get log file from master or slave after a soft interrupt
00605       fProof->Collect(this);
00606 
00607    } else if (type == TProof::kShutdownInterrupt) {
00608 
00609       ; // nothing expected to be returned
00610 
00611    } else {
00612 
00613       // Unexpected message, just receive log file
00614       fProof->Collect(this);
00615    }
00616 }
00617 
00618 //______________________________________________________________________________
00619 void TSlave::StopProcess(Bool_t abort, Int_t timeout)
00620 {
00621    // Sent stop/abort request to PROOF server.
00622 
00623    // Notify the remote counterpart
00624    TMessage msg(kPROOF_STOPPROCESS);
00625    msg << abort;
00626    if (fProof->fProtocol > 9)
00627       msg << timeout;
00628    fSocket->Send(msg);
00629 }
00630 
00631 //______________________________________________________________________________
00632 TObjString *TSlave::SendCoordinator(Int_t, const char *, Int_t)
00633 {
00634    // Send message to intermediate coordinator. Only meaningful when there is one,
00635    // i.e. in XPD framework
00636 
00637    if (gDebug > 0)
00638       Info("SendCoordinator","method not implemented for this communication layer");
00639    return 0;
00640 }
00641 
00642 //______________________________________________________________________________
00643 void TSlave::SetAlias(const char *)
00644 {
00645    // Set an alias for this session. If reconnection is supported, the alias
00646    // will be communicated to the remote coordinator so that it can be recovered
00647    // when reconnecting
00648 
00649    if (gDebug > 0)
00650       Info("SetAlias","method not implemented for this communication layer");
00651    return;
00652 }
00653 
00654 //_____________________________________________________________________________
00655 void TSlave::SetTXSlaveHook(TSlave_t xslavehook)
00656 {
00657    // Set hook to TXSlave ctor
00658    fgTXSlaveHook = xslavehook;
00659 }

Generated on Tue Jul 5 14:51:42 2011 for ROOT_528-00b_version by  doxygen 1.5.1