TApplicationRemote.cxx

Go to the documentation of this file.
00001 // @(#)root/net:$Id: TApplicationRemote.cxx 36116 2010-10-06 13:58:05Z ganis $
00002 // Author: G. Ganis  10/5/2007
00003 
00004 /*************************************************************************
00005  * Copyright (C) 1995-2007, Rene Brun and Fons Rademakers.               *
00006  * All rights reserved.                                                  *
00007  *                                                                       *
00008  * For the licensing terms see $ROOTSYS/LICENSE.                         *
00009  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
00010  *************************************************************************/
00011 
00012 //////////////////////////////////////////////////////////////////////////
00013 //                                                                      //
00014 // TApplicationRemote                                                   //
00015 //                                                                      //
00016 // TApplicationRemote maps a remote session. It starts a remote session //
00017 // and takes care of redirecting the commands to be processed to the    //
00018 // remote session, to collect the graphic output objects and to display //
00019 // them locally.                                                        //
00020 //                                                                      //
00021 //////////////////////////////////////////////////////////////////////////
00022 
00023 #include <errno.h>
00024 
00025 #include "TApplicationRemote.h"
00026 
00027 #include "TBrowser.h"
00028 #include "TDirectory.h"
00029 #include "TError.h"
00030 #include "THashList.h"
00031 #include "TMonitor.h"
00032 #include "TRandom.h"
00033 #include "TROOT.h"
00034 #include "TServerSocket.h"
00035 #include "TSystem.h"
00036 #include "TRemoteObject.h"
00037 #ifdef WIN32
00038 #include <io.h>
00039 #include <sys/types.h>
00040 #endif
00041 
00042 //
00043 // TApplicationRemote Interrupt signal handler
00044 //______________________________________________________________________________
00045 Bool_t TARInterruptHandler::Notify()
00046 {
00047    // TApplicationRemote interrupt handler.
00048 
00049    Info("Notify","Processing interrupt signal ...");
00050 
00051    // Handle interrupt condition on socket(s)
00052    fApplicationRemote->Interrupt(kRRI_Hard);
00053 
00054    return kTRUE;
00055 }
00056 
00057 
00058 ClassImp(TApplicationRemote)
00059 
00060 static const char *gScript = "roots";
00061 static const char *gScriptCmd = "\\\"%s %d localhost:%d/%s -d=%d\\\"";
00062 #ifndef WIN32
00063 static const char *gSshCmd = "ssh %s -f4 %s -R %d:localhost:%d sh -c \
00064    \"'(sh=\\`basename \'\\\\\\$SHELL\'\\`; \
00065    if test xbash = x\'\\\\\\$sh\' -o xsh = x\'\\\\\\$sh\' -o xzsh = x\'\\\\\\$sh\' -o xdash = x\'\\\\\\$sh\'; then \
00066       \'\\\\\\$SHELL\' -l -c %s; \
00067    elif test xcsh = x\'\\\\\\$sh\' -o xtcsh = x\'\\\\\\$sh\' -o xksh = x\'\\\\\\$sh\'; then \
00068       \'\\\\\\$SHELL\' -c %s; \
00069    else \
00070       echo \\\"Unknown shell \'\\\\\\$SHELL\'\\\"; \
00071    fi)'\"";
00072 #else
00073 static const char *gSshCmd = "ssh %s -f4 %s -R %d:localhost:%d sh -c \
00074    \"'(sh=`basename $SHELL`; \
00075    if test xbash = x$sh -o xsh = x$sh -o xzsh = x$sh -o xdash = x$sh; then \
00076       $SHELL -l -c %s; \
00077    elif test xcsh = x$sh -o xtcsh = x$sh -o xksh = x$sh; then \
00078       $SHELL -c %s; \
00079    else \
00080       echo \"Unknown shell $SHELL\"; \
00081    fi)'\"";
00082 #endif
00083 
00084 Int_t TApplicationRemote::fgPortAttempts = 100; // number of attempts to find a port
00085 Int_t TApplicationRemote::fgPortLower =  49152; // lower bound for ports
00086 Int_t TApplicationRemote::fgPortUpper =  65535; // upper bound for ports
00087 
00088 //______________________________________________________________________________
00089 TApplicationRemote::TApplicationRemote(const char *url, Int_t debug,
00090                                        const char *script)
00091                    : TApplication(), fUrl(url)
00092 {
00093    // Main constructor: start a remote session at 'url' accepting callbacks
00094    // on local port 'port'; if port is already in use scan up to 'scan - 1'
00095    // ports starting from port + 1, i.e. port + 1, ... , port + scan - 1
00096 
00097    // Unique name (used also in the prompt)
00098    fName = fUrl.GetHost();
00099    if (strlen(fUrl.GetOptions()) > 0)
00100       fName += Form("-%s", fUrl.GetOptions());
00101    UserGroup_t *pw = gSystem->GetUserInfo(gSystem->GetEffectiveUid());
00102    TString user = (pw) ? pw->fUser : "";
00103    SafeDelete(pw);
00104    if (strlen(fUrl.GetUser()) > 0 && user != fUrl.GetUser())
00105       fName.Insert(0,Form("%s@", fUrl.GetUser()));
00106 
00107    fIntHandler = 0;
00108    fSocket = 0;
00109    fMonitor = 0;
00110    fFileList = 0;
00111    fWorkingDir = 0;
00112    fRootFiles = 0;
00113    fReceivedObject = 0;
00114    ResetBit(kCollecting);
00115 
00116    // Create server socket; generate randomly a port to find a free one
00117    Int_t port = -1;
00118    Int_t na = fgPortAttempts;
00119    Long64_t now = gSystem->Now();
00120    gRandom->SetSeed((UInt_t)now);
00121    TServerSocket *ss = 0;
00122    while (na--) {
00123       port = (Int_t) (gRandom->Rndm() * (fgPortUpper - fgPortLower)) + fgPortLower;
00124       ss = new TServerSocket(port);
00125       if (ss->IsValid())
00126          break;
00127    }
00128    if (!ss || !ss->IsValid()) {
00129       Error("TApplicationRemote","unable to find a free port for connections");
00130       SetBit(kInvalidObject);
00131       return;
00132    }
00133 
00134    // Create a monitor and add the socket to it
00135    TMonitor *mon = new TMonitor;
00136    mon->Add(ss);
00137 
00138    // Start the remote server
00139    Int_t rport = (port < fgPortUpper) ? port + 1 : port - 1;
00140    TString sc = gScript;
00141    if (script && *script) {
00142       // script is enclosed by " ", so ignore first " char
00143       if (script[1] == '<') {
00144          if (script[2])
00145             sc.Form("source %s; %s", script+2, gScript);
00146          else
00147             Error("TApplicationRemote", "illegal script name <");
00148       } else
00149          sc = script;
00150    }
00151    sc.ReplaceAll("\"","");
00152    TString userhost = fUrl.GetHost();
00153    if (strlen(fUrl.GetUser()) > 0)
00154       userhost.Insert(0, Form("%s@", fUrl.GetUser()));
00155    const char *verb = "";
00156    if (debug > 0)
00157       verb = "-v";
00158    TString scriptCmd;
00159    scriptCmd.Form(gScriptCmd, sc.Data(), kRRemote_Protocol, rport, fUrl.GetFile(), debug);
00160    TString cmd;
00161    cmd.Form(gSshCmd, verb, userhost.Data(), rport, port, scriptCmd.Data(), scriptCmd.Data());
00162 #ifdef WIN32
00163    // make sure that the Gpad and GUI libs are loaded
00164    TApplication::NeedGraphicsLibs();
00165    gApplication->InitializeGraphics();
00166 #endif
00167    if (gDebug > 0)
00168       Info("TApplicationRemote", "executing: %s", cmd.Data());
00169    if (gSystem->Exec(cmd) != 0) {
00170       Info("TApplicationRemote", "an error occured during SSH connection");
00171       mon->DeActivateAll();
00172       delete mon;
00173       delete ss;
00174       SafeDelete(fSocket);
00175       SetBit(kInvalidObject);
00176       return;
00177    }
00178 
00179    // Wait for activity on the socket
00180    mon->Select();
00181 
00182    // Get the connection
00183    fSocket = ss->Accept();
00184 
00185    // Cleanup the monitor and the server socket
00186    mon->DeActivateAll();
00187    delete mon;
00188    delete ss;
00189 
00190    // Receive the startup message
00191    Int_t what;
00192    char buf[512];
00193    if (fSocket->Recv(buf, sizeof(buf), what) <= 0) {
00194       Error("TApplicationRemote", "failed to receive startup message");
00195       SafeDelete(fSocket);
00196       SetBit(kInvalidObject);
00197       return;
00198    }
00199    Printf("%s", buf);
00200 
00201    // Receive the protocol version run remotely
00202    if (fSocket->Recv(fProtocol, what) != 2*sizeof(Int_t)) {
00203       Error("TApplicationRemote", "failed to receive remote server protocol");
00204       SafeDelete(fSocket);
00205       SetBit(kInvalidObject);
00206       return;
00207    }
00208    if (fProtocol != kRRemote_Protocol)
00209       Info("TApplicationRemote","server runs a different protocol version: %d (vs %d)",
00210                      fProtocol, kRRemote_Protocol);
00211 
00212    TMessage *msg = 0;
00213    // Receive the protocol version run remotely
00214    if (fSocket->Recv(msg) < 0 || msg->What() != kMESS_ANY) {
00215       Error("TApplicationRemote", "failed to receive server info - protocol error");
00216       SafeDelete(fSocket);
00217       SetBit(kInvalidObject);
00218       return;
00219    }
00220 
00221    // Real host name and full path to remote log
00222    TString hostname;
00223    (*msg) >> hostname >> fLogFilePath;
00224    fUrl.SetHost(hostname);
00225 
00226    // Monitor the socket
00227    fMonitor = new TMonitor;
00228    fMonitor->Add(fSocket);
00229 
00230    // Set interrupt handler from now on
00231    fIntHandler = new TARInterruptHandler(this);
00232 
00233    // To get the right cleaning sequence
00234    gROOT->GetListOfSockets()->Remove(fSocket);
00235    gROOT->GetListOfSockets()->Add(this);
00236 
00237    fRootFiles = new TList;
00238    fRootFiles->SetName("Files");
00239 
00240    // Collect startup notifications
00241    Collect();
00242 
00243    // Done
00244    return;
00245 }
00246 
00247 //______________________________________________________________________________
00248 TApplicationRemote::~TApplicationRemote()
00249 {
00250    // Destructor
00251 
00252    gROOT->GetListOfSockets()->Remove(this);
00253    Terminate(0);
00254 }
00255 
00256 //______________________________________________________________________________
00257 Int_t TApplicationRemote::Broadcast(const TMessage &mess)
00258 {
00259    // Broadcast a message to the remote session.
00260    // Returns 0 on success, -1 in case of error.
00261 
00262    if (!IsValid()) return -1;
00263 
00264    if (fSocket->Send(mess) == -1) {
00265       Error("Broadcast", "could not send message");
00266       return -1;
00267    }
00268    // Done
00269    return 0;
00270 }
00271 
00272 //______________________________________________________________________________
00273 Int_t TApplicationRemote::Broadcast(const char *str, Int_t kind, Int_t type)
00274 {
00275    // Broadcast a character string buffer to the remote session.
00276    // Use kind to set the TMessage what field.
00277    // Returns 0 on success, -1 in case of error.
00278 
00279    TMessage mess(kind);
00280    if (kind == kMESS_ANY)
00281       mess << type;
00282    if (str) mess.WriteString(str);
00283    return Broadcast(mess);
00284 }
00285 
00286 //______________________________________________________________________________
00287 Int_t TApplicationRemote::BroadcastObject(const TObject *obj, Int_t kind)
00288 {
00289    // Broadcast an object to the remote session.
00290    // Use kind to set the TMessage what field.
00291    // Returns 0 on success, -1 in case of error.
00292 
00293    TMessage mess(kind);
00294    mess.WriteObject(obj);
00295    return Broadcast(mess);
00296 }
00297 
00298 //______________________________________________________________________________
00299 Int_t TApplicationRemote::BroadcastRaw(const void *buffer, Int_t length)
00300 {
00301    // Broadcast a raw buffer of specified length to the remote session.
00302    // Returns 0 on success, -1 in case of error.
00303 
00304    if (!IsValid()) return -1;
00305 
00306    if (fSocket->SendRaw(buffer, length) == -1) {
00307       Error("Broadcast", "could not send raw buffer");
00308       return -1;
00309    }
00310    // Done
00311    return 0;
00312 }
00313 
00314 //______________________________________________________________________________
00315 Int_t TApplicationRemote::Collect(Long_t timeout)
00316 {
00317    // Collect responses from the remote server.
00318    // Returns the number of messages received.
00319    // If timeout >= 0, wait at most timeout seconds (timeout = -1 by default,
00320    // which means wait forever).
00321 
00322    // Activate monitoring
00323    fMonitor->ActivateAll();
00324    if (!fMonitor->GetActive())
00325        return 0;
00326 
00327    // Timeout counter
00328    Long_t nto = timeout;
00329    if (gDebug > 2)
00330       Info("Collect","active: %d", fMonitor->GetActive());
00331 
00332    // On clients, handle Ctrl-C during collection
00333    if (fIntHandler)
00334       fIntHandler->Add();
00335 
00336    // We are now going to collect from the server
00337    SetBit(kCollecting);
00338 
00339    Int_t rc = 0, cnt = 0;
00340    while (fMonitor->GetActive() && (nto < 0 || nto > 0)) {
00341 
00342       // Wait for a ready socket
00343       TSocket *s = fMonitor->Select(1000);
00344 
00345       if (s && s != (TSocket *)(-1)) {
00346          // Get and analyse the info it did receive
00347          if ((rc = CollectInput()) != 0) {
00348             // Deactivate it if we are done with it
00349             fMonitor->DeActivate(s);
00350             if (gDebug > 2)
00351                Info("Collect","deactivating %p", s);
00352          }
00353 
00354          // Update counter (if no error occured)
00355          if (rc >= 0)
00356             cnt++;
00357 
00358       } else {
00359          // If not timed-out, exit if not stopped or not aborted
00360          // (player exits status is finished in such a case); otherwise,
00361          // we still need to collect the partial output info
00362          if (!s)
00363             fMonitor->DeActivateAll();
00364          // Decrease the timeout counter if requested
00365          if (s == (TSocket *)(-1) && nto > 0)
00366             nto--;
00367       }
00368    }
00369 
00370    // Collection is over
00371    ResetBit(kCollecting);
00372 
00373    // If timed-out, deactivate everything
00374    if (nto == 0)
00375       fMonitor->DeActivateAll();
00376 
00377    // Deactivate Ctrl-C special handler
00378    if (fIntHandler)
00379       fIntHandler->Remove();
00380 
00381    return cnt;
00382 }
00383 
00384 //______________________________________________________________________________
00385 Int_t TApplicationRemote::CollectInput()
00386 {
00387    // Collect and analyze available input from the socket.
00388    // Returns 0 on success, -1 if any failure occurs.
00389 
00390    TMessage *mess;
00391    Int_t rc = 0;
00392 
00393    char      str[512];
00394    TObject  *obj;
00395    Int_t     what;
00396    Bool_t    delete_mess = kTRUE;
00397 
00398    if (fSocket->Recv(mess) < 0) {
00399       SetBit(kInvalidObject);
00400       SafeDelete(fSocket);
00401       return -1;
00402    }
00403    if (!mess) {
00404       // we get here in case the remote server died
00405       SetBit(kInvalidObject);
00406       SafeDelete(fSocket);
00407       return -1;
00408    }
00409 
00410    what = mess->What();
00411 
00412    if (gDebug > 2)
00413       Info("CollectInput","what %d", what);
00414 
00415    switch (what) {
00416 
00417       case kMESS_OBJECT:
00418          {  // The server sent over an object: read it in memory
00419             TObject *o = mess->ReadObject(mess->GetClass());
00420             // If a canvas, draw it
00421             if (TString(o->ClassName()) == "TCanvas")
00422                o->Draw();
00423             else if (TString(o->ClassName()) == "TRemoteObject") {
00424                TRemoteObject *robj = (TRemoteObject *)o;
00425                if (TString(robj->GetClassName()) == "TSystemDirectory") {
00426                   if (fWorkingDir == 0) {
00427                      fWorkingDir = (TRemoteObject *)o;
00428                   }
00429                }
00430             }
00431             else if (TString(o->ClassName()) == "TList") {
00432                TList *list = (TList *)o;
00433                TRemoteObject *robj = (TRemoteObject *)list->First();
00434                if (robj && (TString(robj->GetClassName()) == "TFile")) {
00435                   TIter next(list);
00436                   while ((robj = (TRemoteObject *)next())) {
00437                      if (!fRootFiles->FindObject(robj->GetName()))
00438                         fRootFiles->Add(robj);
00439                   }
00440                   gROOT->RefreshBrowsers();
00441                }
00442             }
00443             fReceivedObject = o;
00444          }
00445          break;
00446 
00447       case kMESS_ANY:
00448          // Generic message: read out the type
00449          {  Int_t type;
00450             (*mess) >> type;
00451 
00452             if (gDebug > 2)
00453                Info("CollectInput","type %d", type);
00454 
00455             switch (type) {
00456 
00457                case kRRT_GetObject:
00458                   // send server the object it asks for
00459                   mess->ReadString(str, sizeof(str));
00460                   obj = gDirectory->Get(str);
00461                   if (obj) {
00462                      fSocket->SendObject(obj);
00463                   } else {
00464                      Warning("CollectInput",
00465                              "server requested an object that we do not have");
00466                      fSocket->Send(kMESS_NOTOK);
00467                   }
00468                   break;
00469 
00470                case kRRT_Fatal:
00471                   // Fatal error
00472                   SafeDelete(fSocket);
00473                   rc = -1;
00474                   break;
00475 
00476                case kRRT_LogFile:
00477                   {  Int_t size;
00478                      (*mess) >> size;
00479                      RecvLogFile(size);
00480                   }
00481                   break;
00482 
00483                case kRRT_LogDone:
00484                   {  Int_t st;
00485                     (*mess) >> st;
00486                      if (st < 0) {
00487                         // Problem: object should not be used
00488                         SetBit(kInvalidObject);
00489                      }
00490                      if (gDebug > 1)
00491                         Info("CollectInput","kRTT_LogDone: status %d", st);
00492                      rc = 1;
00493                   }
00494                   break;
00495 
00496                case kRRT_Message:
00497                   {  TString msg;
00498                      Bool_t lfeed;
00499                      (*mess) >> msg >> lfeed;
00500                      if (lfeed)
00501                         fprintf(stderr,"%s\n", msg.Data());
00502                      else
00503                         fprintf(stderr,"%s\r", msg.Data());
00504                   }
00505                   break;
00506 
00507                case kRRT_SendFile:
00508                   {  TString fname;
00509                      (*mess) >> fname;
00510                      // Prepare the reply
00511                      TMessage m(kMESS_ANY);
00512                      m << (Int_t) kRRT_SendFile;
00513                      // The server needs a file: we send also the related header
00514                      // if we have it.
00515                      char *imp = gSystem->Which(TROOT::GetMacroPath(), fname, kReadPermission);
00516                      if (!imp) {
00517                         Error("CollectInput", "file %s not found in path(s) %s",
00518                                          fname.Data(), TROOT::GetMacroPath());
00519                         m << (Bool_t) kFALSE;
00520                         Broadcast(m);
00521                      } else {
00522                         TString impfile = imp;
00523                         delete [] imp;
00524                         Int_t dot = impfile.Last('.');
00525 
00526                         // Is there any associated header file
00527                         Bool_t hasHeader = kTRUE;
00528                         TString headfile = impfile;
00529                         if (dot != kNPOS)
00530                            headfile.Remove(dot);
00531                         headfile += ".h";
00532                         if (gSystem->AccessPathName(headfile, kReadPermission)) {
00533                            TString h = headfile;
00534                            headfile.Remove(dot);
00535                            headfile += ".hh";
00536                            if (gSystem->AccessPathName(headfile, kReadPermission)) {
00537                               hasHeader = kFALSE;
00538                               if (gDebug > 0)
00539                                  Info("CollectInput", "no associated header file"
00540                                                  " found: tried: %s %s",
00541                                                  h.Data(), headfile.Data());
00542                            }
00543                         }
00544 
00545                         // Send files now;
00546                         m << (Bool_t) kTRUE;
00547                         Broadcast(m);
00548                         if (SendFile(impfile, kForce) == -1) {
00549                            Info("CollectInput", "problems sending file %s", impfile.Data());
00550                            return 0;
00551                         }
00552                         if (hasHeader) {
00553                            Broadcast(m);
00554                            if (SendFile(headfile, kForce) == -1) {
00555                               Info("CollectInput", "problems sending file %s", headfile.Data());
00556                               return 0;
00557                            }
00558                         }
00559                      }
00560                      // End of transmission
00561                      m.Reset(kMESS_ANY);
00562                      m << (Int_t) kRRT_SendFile;
00563                      m << (Bool_t) kFALSE;
00564                      Broadcast(m);
00565                   }
00566                   break;
00567 
00568                default:
00569                   Warning("CollectInput","unknown type received from server: %d", type);
00570                   break;
00571 
00572             }
00573          }
00574          break;
00575 
00576       default:
00577          Error("CollectInput", "unknown command received from server: %d", what);
00578          SetBit(kInvalidObject);
00579          SafeDelete(fSocket);
00580          rc = -1;
00581          break;
00582    }
00583 
00584    // Cleanup
00585    if (delete_mess)
00586       delete mess;
00587 
00588    // We are done successfully
00589    return rc;
00590 }
00591 
00592 
00593 //______________________________________________________________________________
00594 void TApplicationRemote::RecvLogFile(Int_t size)
00595 {
00596    // Receive the log file from the server
00597 
00598    const Int_t kMAXBUF = 16384;  //32768  //16384  //65536;
00599    char buf[kMAXBUF];
00600 
00601    // Append messages to active logging unit
00602    Int_t fdout = fileno(stdout);
00603    if (fdout < 0) {
00604       Warning("RecvLogFile", "file descriptor for outputs undefined (%d):"
00605                              " will not log msgs", fdout);
00606       return;
00607    }
00608    lseek(fdout, (off_t) 0, SEEK_END);
00609 
00610    Int_t  left, rec, r;
00611    Long_t filesize = 0;
00612 
00613    while (filesize < size) {
00614       left = Int_t(size - filesize);
00615       if (left > kMAXBUF)
00616          left = kMAXBUF;
00617       rec = fSocket->RecvRaw(&buf, left);
00618       filesize = (rec > 0) ? (filesize + rec) : filesize;
00619       if (rec > 0) {
00620 
00621          char *p = buf;
00622          r = rec;
00623          while (r) {
00624             Int_t w;
00625 
00626             w = write(fdout, p, r);
00627 
00628             if (w < 0) {
00629                SysError("RecvLogFile", "error writing to unit: %d", fdout);
00630                break;
00631             }
00632             r -= w;
00633             p += w;
00634          }
00635       } else if (rec < 0) {
00636          Error("RecvLogFile", "error during receiving log file");
00637          break;
00638       }
00639    }
00640 }
00641 
00642 //______________________________________________________________________________
00643 Int_t TApplicationRemote::SendObject(const TObject *obj)
00644 {
00645    // Send object to server.
00646    // Return 0 on success, -1 in case of error.
00647 
00648    if (!IsValid() || !obj) return -1;
00649 
00650    TMessage mess(kMESS_OBJECT);
00651    mess.WriteObject(obj);
00652    return Broadcast(mess);
00653 }
00654 
00655 //______________________________________________________________________________
00656 Bool_t TApplicationRemote::CheckFile(const char *file, Long_t modtime)
00657 {
00658    // Check if a file needs to be send to the server. Use the following
00659    // algorithm:
00660    //   - check if file appears in file map
00661    //     - if yes, get file's modtime and check against time in map,
00662    //       if modtime not same get md5 and compare against md5 in map,
00663    //       if not same return kTRUE.
00664    //     - if no, get file's md5 and modtime and store in file map, ask
00665    //       slave if file exists with specific md5, if yes return kFALSE,
00666    //       if no return kTRUE.
00667    // Returns kTRUE in case file needs to be send, returns kFALSE in case
00668    // file is already on remote node.
00669 
00670    Bool_t sendto = kFALSE;
00671 
00672    if (!IsValid()) return -1;
00673 
00674    // The filename for the cache
00675    TString fn = gSystem->BaseName(file);
00676 
00677    // Check if the file is already in the cache
00678    TARFileStat *fs = 0;
00679    if (fFileList && (fs = (TARFileStat *) fFileList->FindObject(fn))) {
00680       // File in cache
00681       if (fs->fModtime != modtime) {
00682          TMD5 *md5 = TMD5::FileChecksum(file);
00683          if (md5) {
00684             if ((*md5) != fs->fMD5) {
00685                sendto       = kTRUE;
00686                fs->fMD5      = *md5;
00687                fs->fModtime  = modtime;
00688             }
00689             delete md5;
00690          } else {
00691             Error("CheckFile", "could not calculate local MD5 check sum - dont send");
00692             return kFALSE;
00693          }
00694       }
00695    } else {
00696       // file not in the cache
00697       TMD5 *md5 = TMD5::FileChecksum(file);
00698       if (md5) {
00699          fs = new TARFileStat(fn, *md5, modtime);
00700          if (!fFileList)
00701             fFileList = new THashList;
00702          fFileList->Add(fs);
00703          delete md5;
00704       } else {
00705          Error("CheckFile", "could not calculate local MD5 check sum - dont send");
00706          return kFALSE;
00707       }
00708       TMessage mess(kMESS_ANY);
00709       mess << Int_t(kRRT_CheckFile) << TString(gSystem->BaseName(file)) << fs->fMD5;
00710       fSocket->Send(mess);
00711 
00712       TMessage *reply;
00713       fSocket->Recv(reply);
00714       if (reply) {
00715          if (reply->What() == kMESS_ANY) {
00716             // Check the type
00717             Int_t type;
00718             Bool_t uptodate;
00719             (*reply) >> type >> uptodate;
00720             if (type != kRRT_CheckFile) {
00721                // Protocol error
00722                Warning("CheckFile", "received wrong type:"
00723                                     " %d (expected %d): protocol error?",
00724                                     type, (Int_t)kRRT_CheckFile);
00725             }
00726             sendto = uptodate ? kFALSE : kTRUE;
00727          } else {
00728             // Protocol error
00729             Error("CheckFile", "received wrong message: %d (expected %d)",
00730                                reply->What(), kMESS_ANY);
00731          }
00732       } else {
00733          Error("CheckFile", "received empty message");
00734       }
00735       // Collect logs
00736       Collect();
00737    }
00738 
00739    // Done
00740    return sendto;
00741 }
00742 
00743 //______________________________________________________________________________
00744 Int_t TApplicationRemote::SendFile(const char *file, Int_t opt, const char *rfile)
00745 {
00746    // Send a file to the server. Return 0 on success, -1 in case of error.
00747    // If defined, the full path of the remote path will be rfile.
00748    // The mask 'opt' is an or of ESendFileOpt:
00749    //
00750    //       kAscii  (0x0)      if set true ascii file transfer is used
00751    //       kBinary (0x1)      if set true binary file transfer is used
00752    //       kForce  (0x2)      if not set an attempt is done to find out
00753    //                          whether the file really needs to be downloaded
00754    //                          (a valid copy may already exist in the cache
00755    //                          from a previous run)
00756    //
00757 
00758    if (!IsValid()) return -1;
00759 
00760 #ifndef R__WIN32
00761    Int_t fd = open(file, O_RDONLY);
00762 #else
00763    Int_t fd = open(file, O_RDONLY | O_BINARY);
00764 #endif
00765    if (fd < 0) {
00766       SysError("SendFile", "cannot open file %s", file);
00767       return -1;
00768    }
00769 
00770    // Get info about the file
00771    Long64_t size;
00772    Long_t id, flags, modtime;
00773    if (gSystem->GetPathInfo(file, &id, &size, &flags, &modtime) == 1) {
00774       Error("SendFile", "cannot stat file %s", file);
00775       return -1;
00776    }
00777    if (size == 0) {
00778       Error("SendFile", "empty file %s", file);
00779       return -1;
00780    }
00781 
00782    // Decode options
00783    Bool_t bin   = (opt & kBinary)  ? kTRUE : kFALSE;
00784    Bool_t force = (opt & kForce)   ? kTRUE : kFALSE;
00785 
00786    const Int_t kMAXBUF = 32768;  //16384  //65536;
00787    char buf[kMAXBUF];
00788 
00789    const char *fnam = (rfile) ? rfile : gSystem->BaseName(file);
00790 
00791    Bool_t sendto = force ? kTRUE : CheckFile(file, modtime);
00792 
00793    // The value of 'size' is used as flag remotely, so we need to
00794    // reset it to 0 if we are not going to send the file
00795    size = sendto ? size : 0;
00796 
00797    if (gDebug > 1 && size > 0)
00798       Info("SendFile", "sending file %s", file);
00799 
00800    snprintf(buf, kMAXBUF, "%s %d %lld", fnam, bin, size);
00801    if (Broadcast(buf, kMESS_ANY, kRRT_File) == -1) {
00802       SafeDelete(fSocket);
00803       return -1;
00804    }
00805 
00806    if (sendto) {
00807 
00808       lseek(fd, 0, SEEK_SET);
00809 
00810       Int_t len;
00811       do {
00812          while ((len = read(fd, buf, kMAXBUF)) < 0 && TSystem::GetErrno() == EINTR)
00813             TSystem::ResetErrno();
00814 
00815          if (len < 0) {
00816             SysError("SendFile", "error reading from file %s", file);
00817             Interrupt();
00818             close(fd);
00819             return -1;
00820          }
00821 
00822          if (len > 0 && fSocket->SendRaw(buf, len) == -1) {
00823             SysError("SendFile", "error writing to server @ %s:%d (now offline)",
00824                      fUrl.GetHost(), fUrl.GetPort());
00825             SafeDelete(fSocket);
00826             break;
00827          }
00828 
00829       } while (len > 0);
00830    }
00831    close(fd);
00832 
00833    // Get the log (during collection this will be done at the end
00834    if (!TestBit(kCollecting))
00835       Collect();
00836 
00837    // Done
00838    return IsValid() ? 0 : -1;
00839 }
00840 
00841 //______________________________________________________________________________
00842 void TApplicationRemote::Terminate(Int_t status)
00843 {
00844    // Terminate this session
00845 
00846    TMessage mess(kMESS_ANY);
00847    mess << (Int_t)kRRT_Terminate << status;
00848    Broadcast(mess);
00849 
00850    SafeDelete(fRootFiles);
00851    SafeDelete(fMonitor);
00852    SafeDelete(fSocket);
00853 }
00854 
00855 //______________________________________________________________________________
00856 void TApplicationRemote::SetPortParam(Int_t lower, Int_t upper, Int_t attempts)
00857 {
00858    // Set port parameters for tunnelling. A value of -1 means unchanged
00859 
00860    if (lower > -1)
00861       fgPortLower = lower;
00862    if (upper > -1)
00863       fgPortUpper = upper;
00864    if (attempts > -1)
00865       fgPortAttempts = attempts;
00866 
00867    ::Info("TApplicationRemote::SetPortParam","port scan: %d attempts in [%d,%d]",
00868           fgPortAttempts, fgPortLower, fgPortUpper);
00869 }
00870 
00871 //______________________________________________________________________________
00872 Long_t TApplicationRemote::ProcessLine(const char *line, Bool_t, Int_t *)
00873 {
00874    // Parse a single command line and forward the request to the remote server
00875    // where it will be processed. The line is either a C++ statement or an
00876    // interpreter command starting with a ".".
00877    // Return the return value of the command casted to a long.
00878 
00879    if (!line || !*line) return 0;
00880 
00881    if (!strncasecmp(line, ".q", 2)) {
00882       // terminate the session
00883       gApplication->ProcessLine(".R -close");
00884       return 0;
00885    }
00886 
00887    if (!strncmp(line, "?", 1)) {
00888       Help(line);
00889       return 1;
00890    }
00891 
00892    fReceivedObject = 0;
00893 
00894    // Init graphics
00895    InitializeGraphics();
00896 
00897    // Ok, now we pack the command and we send it over for processing
00898    Broadcast(line, kMESS_CINT);
00899 
00900    // And collect the results
00901    Collect();
00902 
00903    // Done
00904    return (Long_t)fReceivedObject;
00905    return 1;
00906 }
00907 
00908 //______________________________________________________________________________
00909 void TApplicationRemote::Print(Option_t *opt) const
00910 {
00911    // Print some info about this instance
00912 
00913    TString s(Form("OBJ: TApplicationRemote     %s", fName.Data()));
00914    Printf("%s", s.Data());
00915    if (opt && opt[0] == 'F') {
00916       s = "    url: ";
00917       if (strlen(fUrl.GetUser()) > 0)
00918          s += Form("%s@", fUrl.GetUser());
00919       s += fUrl.GetHostFQDN();
00920       s += Form("  logfile: %s", fLogFilePath.Data());
00921       Printf("%s", s.Data());
00922    }
00923 }
00924 //______________________________________________________________________________
00925 void TApplicationRemote::Interrupt(Int_t type)
00926 {
00927    // Send interrupt OOB byte to server.
00928    // Returns 0 if ok, -1 in case of error
00929 
00930    if (!IsValid()) return;
00931 
00932    fInterrupt = kTRUE;
00933 
00934 #if 1
00935    Info("Interrupt", "*** Ctrl-C not yet enabled *** (type= %d)", type);
00936    return;
00937 #else
00938 
00939    char oobc = (char) type;
00940    const int kBufSize = 1024;
00941    char waste[kBufSize];
00942 
00943    // Send one byte out-of-band message to server
00944    if (fSocket->SendRaw(&oobc, 1, kOob) <= 0) {
00945       Error("Interrupt", "error sending oobc to server");
00946       return;
00947    }
00948 
00949    if (type == kRRI_Hard) {
00950       char  oob_byte;
00951       int   n, nch, nbytes = 0, nloop = 0;
00952 
00953       // Receive the OOB byte
00954       while ((n = fSocket->RecvRaw(&oob_byte, 1, kOob)) < 0) {
00955          if (n == -2) {   // EWOULDBLOCK
00956             //
00957             // The OOB data has not yet arrived: flush the input stream
00958             //
00959             // In some systems (Solaris) regular recv() does not return upon
00960             // receipt of the oob byte, which makes the below call to recv()
00961             // block indefinitely if there are no other data in the queue.
00962             // FIONREAD ioctl can be used to check if there are actually any
00963             // data to be flushed.  If not, wait for a while for the oob byte
00964             // to arrive and try to read it again.
00965             //
00966             fSocket->GetOption(kBytesToRead, nch);
00967             if (nch == 0) {
00968                gSystem->Sleep(1000);
00969                continue;
00970             }
00971 
00972             if (nch > kBufSize) nch = kBufSize;
00973             n = fSocket->RecvRaw(waste, nch);
00974             if (n <= 0) {
00975                Error("Interrupt", "error receiving waste from server");
00976                break;
00977             }
00978             nbytes += n;
00979          } else if (n == -3) {   // EINVAL
00980             //
00981             // The OOB data has not arrived yet
00982             //
00983             gSystem->Sleep(100);
00984             if (++nloop > 100) {  // 10 seconds time-out
00985                Error("Interrupt", "server does not respond");
00986                break;
00987             }
00988          } else {
00989             Error("Interrupt", "error receiving OOB from server");
00990             break;
00991          }
00992       }
00993 
00994       //
00995       // Continue flushing the input socket stream until the OOB
00996       // mark is reached
00997       //
00998       while (1) {
00999          int atmark;
01000 
01001          fSocket->GetOption(kAtMark, atmark);
01002 
01003          if (atmark)
01004             break;
01005 
01006          // find out number of bytes to read before atmark
01007          fSocket->GetOption(kBytesToRead, nch);
01008          if (nch == 0) {
01009             gSystem->Sleep(1000);
01010             continue;
01011          }
01012 
01013          if (nch > kBufSize) nch = kBufSize;
01014          n = fSocket->RecvRaw(waste, nch);
01015          if (n <= 0) {
01016             Error("Interrupt", "error receiving waste (2) from server");
01017             break;
01018          }
01019          nbytes += n;
01020       }
01021       if (nbytes > 0)
01022          Info("Interrupt", "server synchronized: %d bytes discarded", nbytes);
01023 
01024       // Get log file from server after a hard interrupt
01025       Collect();
01026 
01027    } else if (type == kRRI_Soft) {
01028 
01029       // Get log file from server after a soft interrupt
01030       Collect();
01031 
01032    } else if (type == kRRI_Shutdown) {
01033 
01034       ; // nothing expected to be returned
01035 
01036    } else {
01037 
01038       // Unexpected message, just receive log file
01039       Collect();
01040    }
01041 #endif
01042 }
01043 
01044 //______________________________________________________________________________
01045 void TApplicationRemote::Browse(TBrowser *b)
01046 {
01047    // Browse remote application (working directory and ROOT files).
01048 
01049    b->Add(fRootFiles, "ROOT Files");
01050    b->Add(fWorkingDir, fWorkingDir->GetTitle());
01051    gROOT->RefreshBrowsers();
01052 }

Generated on Tue Jul 5 14:46:10 2011 for ROOT_528-00b_version by  doxygen 1.5.1