TXProofMgr.cxx

Go to the documentation of this file.
00001 // @(#)root/proofx:$Id: TXProofMgr.cxx 34254 2010-06-30 16:29:36Z ganis $
00002 // Author: Gerardo Ganis  12/12/2005
00003 
00004 /*************************************************************************
00005  * Copyright (C) 1995-2005, Rene Brun and Fons Rademakers.               *
00006  * All rights reserved.                                                  *
00007  *                                                                       *
00008  * For the licensing terms see $ROOTSYS/LICENSE.                         *
00009  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
00010  *************************************************************************/
00011 
00012 //////////////////////////////////////////////////////////////////////////
00013 //                                                                      //
00014 // TXProofMgr                                                           //
00015 //                                                                      //
00016 // The PROOF manager interacts with the PROOF server coordinator to     //
00017 // create or destroy a PROOF session, attach to or detach from          //
00018 // existing one, and to monitor any client activity on the cluster.     //
00019 // At most one manager instance per server is allowed.                  //
00020 //                                                                      //
00021 //////////////////////////////////////////////////////////////////////////
00022 
00023 #include <errno.h>
00024 #ifdef WIN32
00025 #include <io.h>
00026 #endif
00027 
00028 #include "Getline.h"
00029 #include "TList.h"
00030 #include "TObjArray.h"
00031 #include "TObjString.h"
00032 #include "TProof.h"
00033 #include "TProofLog.h"
00034 #include "TXProofMgr.h"
00035 #include "TXSocket.h"
00036 #include "TXSocketHandler.h"
00037 #include "TROOT.h"
00038 #include "TStopwatch.h"
00039 #include "TSysEvtHandler.h"
00040 #include "XProofProtocol.h"
00041 
00042 ClassImp(TXProofMgr)
00043 
00044 //
00045 //----- ProofMgr Interrupt signal handler
00046 //
00047 class TProofMgrInterruptHandler : public TSignalHandler {
00048 private:
00049    TProofMgr *fMgr;
00050 
00051    TProofMgrInterruptHandler(const TProofMgrInterruptHandler&); // Not implemented
00052    TProofMgrInterruptHandler& operator=(const TProofMgrInterruptHandler&); // Not implemented
00053 public:
00054    TProofMgrInterruptHandler(TProofMgr *mgr)
00055       : TSignalHandler(kSigInterrupt, kFALSE), fMgr(mgr) { }
00056    Bool_t Notify();
00057 };
00058 
00059 //______________________________________________________________________________
00060 Bool_t TProofMgrInterruptHandler::Notify()
00061 {
00062    // TProofMgr interrupt handler.
00063 
00064    // Only on clients
00065    if (isatty(0) != 0 && isatty(1) != 0) {
00066       TString u = fMgr->GetUrl();
00067       Printf("Opening new connection to %s", u.Data());
00068       TXSocket *s = new TXSocket(u, 'C', kPROOF_Protocol,
00069                                  kXPROOF_Protocol, 0, -1, (TXHandler *)fMgr);
00070       if (s && s->IsValid()) {
00071          // Set the interrupt flag on the server
00072          s->CtrlC();
00073       }
00074    }
00075    return kTRUE;
00076 }
00077 
00078 // Autoloading hooks.
00079 // These are needed to avoid using the plugin manager which may create
00080 // problems in multi-threaded environments.
00081 TProofMgr *GetTXProofMgr(const char *url, Int_t l, const char *al)
00082 { return ((TProofMgr *) new TXProofMgr(url, l, al)); }
00083 
00084 class TXProofMgrInit {
00085 public:
00086    TXProofMgrInit() {
00087       TProofMgr::SetTXProofMgrHook(&GetTXProofMgr);
00088 }};
00089 static TXProofMgrInit gxproofmgr_init;
00090 
00091 //______________________________________________________________________________
00092 TXProofMgr::TXProofMgr(const char *url, Int_t dbg, const char *alias)
00093           : TProofMgr(url, dbg, alias)
00094 {
00095    // Create a PROOF manager for the standard (old) environment.
00096 
00097    // Set the correct servert type
00098    fServType = kXProofd;
00099 
00100    // Initialize
00101    if (Init(dbg) != 0) {
00102       // Failure: make sure the socket is deleted so that its lack of
00103       // validity is correctly transmitted
00104       SafeDelete(fSocket);
00105    }
00106 }
00107 
00108 //______________________________________________________________________________
00109 Int_t TXProofMgr::Init(Int_t)
00110 {
00111    // Do real initialization: open the connection and set the relevant
00112    // variables.
00113    // Login and authentication are dealt with at this level, if required.
00114    // Return 0 in case of success, 1 if the remote server is a 'proofd',
00115    // -1 in case of error.
00116 
00117    // Here we make sure that the port is explicitly specified in the URL,
00118    // even when it matches the default value
00119    TString u = fUrl.GetUrl(kTRUE);
00120 
00121    fSocket = 0;
00122    if (!(fSocket = new TXSocket(u, 'C', kPROOF_Protocol,
00123                                 kXPROOF_Protocol, 0, -1, this)) ||
00124        !(fSocket->IsValid())) {
00125       if (!fSocket || !(fSocket->IsServProofd()))
00126          if (gDebug > 0)
00127             Error("Init", "while opening the connection to %s - exit (error: %d)",
00128                           u.Data(), (fSocket ? fSocket->GetOpenError() : -1));
00129       if (fSocket && fSocket->IsServProofd())
00130          fServType = TProofMgr::kProofd;
00131       return -1;
00132    }
00133 
00134    // Protocol run by remote PROOF server
00135    fRemoteProtocol = fSocket->GetRemoteProtocol();
00136 
00137    // We add the manager itself for correct destruction
00138    {  R__LOCKGUARD2(gROOTMutex);
00139       gROOT->GetListOfSockets()->Remove(fSocket);
00140    }
00141 
00142    // Set interrupt PROOF handler from now on
00143    fIntHandler = new TProofMgrInterruptHandler(this);
00144 
00145    // We are done
00146    return 0;
00147 }
00148 
00149 //______________________________________________________________________________
00150 TXProofMgr::~TXProofMgr()
00151 {
00152    // Destructor: close the connection
00153 
00154    SetInvalid();
00155 }
00156 
00157 //______________________________________________________________________________
00158 void TXProofMgr::SetInvalid()
00159 {
00160    // Invalidate this manager by closing the connection
00161 
00162    if (fSocket)
00163       fSocket->Close("P");
00164    SafeDelete(fSocket);
00165 
00166    // Avoid destroying twice
00167    {  R__LOCKGUARD2(gROOTMutex);
00168       gROOT->GetListOfSockets()->Remove(this);
00169    }
00170 }
00171 
00172 //______________________________________________________________________________
00173 TProof *TXProofMgr::AttachSession(TProofDesc *d, Bool_t gui)
00174 {
00175    // Dummy version provided for completeness. Just returns a pointer to
00176    // existing session 'id' (as shown by TProof::QuerySessions) or 0 if 'id' is
00177    // not valid. The boolena 'gui' should be kTRUE when invoked from the GUI.
00178 
00179    if (!IsValid()) {
00180       Warning("AttachSession","invalid TXProofMgr - do nothing");
00181       return 0;
00182    }
00183    if (!d) {
00184       Warning("AttachSession","invalid description object - do nothing");
00185       return 0;
00186    }
00187 
00188    if (d->GetProof())
00189       // Nothing to do if already in contact with proofserv
00190       return d->GetProof();
00191 
00192    // Re-compose url
00193    TString u(Form("%s/?%d", fUrl.GetUrl(kTRUE), d->GetRemoteId()));
00194 
00195    // We need this to set correctly the kUsingSessionGui bit before the first
00196    // feedback messages arrive
00197    if (gui)
00198       u += "GUI";
00199 
00200    // Attach
00201    TProof *p = new TProof(u, 0, 0, gDebug, 0, this);
00202    if (p && p->IsValid()) {
00203 
00204       // Set reference manager
00205       p->SetManager(this);
00206 
00207       // Save record about this session
00208       Int_t st = (p->IsIdle()) ? TProofDesc::kIdle
00209                                  : TProofDesc::kRunning;
00210       d->SetStatus(st);
00211       d->SetProof(p);
00212 
00213       // Set session tag
00214       p->SetName(d->GetName());
00215 
00216    } else {
00217       // Session creation failed
00218       Error("AttachSession", "attaching to PROOF session");
00219    }
00220    return p;
00221 }
00222 
00223 //______________________________________________________________________________
00224 void TXProofMgr::DetachSession(Int_t id, Option_t *opt)
00225 {
00226    // Detach session with 'id' from its proofserv. The 'id' is the number
00227    // shown by QuerySessions. The correspondent TProof object is deleted.
00228    // If id == 0 all the known sessions are detached.
00229    // Option opt="S" or "s" forces session shutdown.
00230 
00231    if (!IsValid()) {
00232       Warning("DetachSession","invalid TXProofMgr - do nothing");
00233       return;
00234    }
00235 
00236    if (id > 0) {
00237       // Single session request
00238       TProofDesc *d = GetProofDesc(id);
00239       if (d) {
00240          if (fSocket)
00241             fSocket->DisconnectSession(d->GetRemoteId(), opt);
00242          TProof *p = d->GetProof();
00243          fSessions->Remove(d);
00244          SafeDelete(p);
00245          delete d;
00246       }
00247    } else if (id == 0) {
00248 
00249       // Requesto to destroy all sessions
00250       if (fSocket) {
00251          TString o = Form("%sA",opt);
00252          fSocket->DisconnectSession(-1, o);
00253       }
00254       if (fSessions) {
00255          // Delete PROOF sessions
00256          TIter nxd(fSessions);
00257          TProofDesc *d = 0;
00258          while ((d = (TProofDesc *)nxd())) {
00259             TProof *p = d->GetProof();
00260             SafeDelete(p);
00261          }
00262          fSessions->Delete();
00263       }
00264    }
00265 
00266    return;
00267 }
00268 
00269 //______________________________________________________________________________
00270 void TXProofMgr::DetachSession(TProof *p, Option_t *opt)
00271 {
00272    // Detach session 'p' from its proofserv. The instance 'p' is invalidated
00273    // and should be deleted by the caller
00274 
00275    if (!IsValid()) {
00276       Warning("DetachSession","invalid TXProofMgr - do nothing");
00277       return;
00278    }
00279 
00280    if (p) {
00281       // Single session request
00282       TProofDesc *d = GetProofDesc(p);
00283       if (d) {
00284          if (fSocket)
00285             fSocket->DisconnectSession(d->GetRemoteId(), opt);
00286          fSessions->Remove(d);
00287          p->Close(opt);
00288          delete d;
00289       }
00290    }
00291 
00292    return;
00293 }
00294 
00295 //______________________________________________________________________________
00296 Bool_t TXProofMgr::MatchUrl(const char *url)
00297 {
00298    // Checks if 'url' refers to the same 'user@host:port' entity as the URL
00299    // in memory. TProofMgr::MatchUrl cannot be used here because of the
00300    // 'double' default port, implying an additional check on the port effectively
00301    // open.
00302 
00303    if (!IsValid()) {
00304       Warning("MatchUrl","invalid TXProofMgr - do nothing");
00305       return 0;
00306    }
00307 
00308    TUrl u(url);
00309 
00310    // Correct URL protocol
00311    if (!strcmp(u.GetProtocol(), TUrl("a").GetProtocol()))
00312       u.SetProtocol("proof");
00313 
00314    if (u.GetPort() == TUrl("a").GetPort()) {
00315       // Set default port
00316       Int_t port = gSystem->GetServiceByName("proofd");
00317       if (port < 0)
00318          port = 1093;
00319       u.SetPort(port);
00320    }
00321 
00322    // Now we can check
00323    if (!strcmp(u.GetHostFQDN(), fUrl.GetHost()))
00324       if (u.GetPort() == fUrl.GetPort() ||
00325           u.GetPort() == fSocket->GetPort())
00326          if (strlen(u.GetUser()) <= 0 || !strcmp(u.GetUser(),fUrl.GetUser()))
00327             return kTRUE;
00328 
00329    // Match failed
00330    return kFALSE;
00331 }
00332 
00333 //______________________________________________________________________________
00334 void TXProofMgr::ShowWorkers()
00335 {
00336    // Show available workers
00337 
00338    if (!IsValid()) {
00339       Warning("ShowWorkers","invalid TXProofMgr - do nothing");
00340       return;
00341    }
00342 
00343    // Send the request
00344    TObjString *os = fSocket->SendCoordinator(kQueryWorkers);
00345    if (os) {
00346       TObjArray *oa = TString(os->GetName()).Tokenize(TString("&"));
00347       if (oa) {
00348          TIter nxos(oa);
00349          TObjString *to = 0;
00350          while ((to = (TObjString *) nxos()))
00351             // Now parse them ...
00352             Printf("+  %s", to->GetName());
00353       }
00354    }
00355 }
00356 
00357 //______________________________________________________________________________
00358 TList *TXProofMgr::QuerySessions(Option_t *opt)
00359 {
00360    // Get list of sessions accessible to this manager
00361 
00362    if (opt && !strncasecmp(opt,"L",1))
00363       // Just return the existing list
00364       return fSessions;
00365 
00366    // Nothing to do if not in contact with proofserv
00367    if (!IsValid()) {
00368       Warning("QuerySessions","invalid TXProofMgr - do nothing");
00369       return 0;
00370    }
00371 
00372    // Create list if not existing
00373    if (!fSessions) {
00374       fSessions = new TList();
00375       fSessions->SetOwner();
00376    }
00377 
00378    // Send the request
00379    TList *ocl = new TList;
00380    TObjString *os = fSocket->SendCoordinator(kQuerySessions);
00381    if (os) {
00382       TObjArray *oa = TString(os->GetName()).Tokenize(TString("|"));
00383       if (oa) {
00384          TProofDesc *d = 0;
00385          TIter nxos(oa);
00386          TObjString *to = (TObjString *) nxos();
00387          while ((to = (TObjString *) nxos())) {
00388             // Now parse them ...
00389             char al[256];
00390             char tg[256];
00391             Int_t id = -1, st = -1, nc = 0;
00392             sscanf(to->GetName(),"%d %s %s %d %d", &id, tg, al, &st, &nc);
00393             // Add to the list, if not already there
00394             if (!(d = (TProofDesc *) fSessions->FindObject(tg))) {
00395                Int_t locid = fSessions->GetSize() + 1;
00396                d = new TProofDesc(tg, al, GetUrl(), locid, id, st, 0);
00397                fSessions->Add(d);
00398             } else {
00399                // Set missing / update info
00400                d->SetStatus(st);
00401                d->SetRemoteId(id);
00402                d->SetTitle(al);
00403             }
00404             // Add to the list for final garbage collection
00405             ocl->Add(new TObjString(tg));
00406          }
00407          SafeDelete(oa);
00408       }
00409       SafeDelete(os);
00410    }
00411 
00412    // Printout and Garbage collection
00413    if (fSessions->GetSize() > 0) {
00414       TIter nxd(fSessions);
00415       TProofDesc *d = 0;
00416       while ((d = (TProofDesc *)nxd())) {
00417          if (ocl->FindObject(d->GetName())) {
00418             if (opt && !strncasecmp(opt,"S",1))
00419                d->Print("");
00420          } else {
00421             fSessions->Remove(d);
00422             SafeDelete(d);
00423          }
00424       }
00425    }
00426 
00427    // We are done
00428    return fSessions;
00429 }
00430 
00431 //_____________________________________________________________________________
00432 Bool_t TXProofMgr::HandleInput(const void *)
00433 {
00434    // Handle asynchronous input on the socket
00435 
00436    if (fSocket && fSocket->IsValid()) {
00437       TMessage *mess;
00438       if (fSocket->Recv(mess) >= 0) {
00439          Int_t what = mess->What();
00440          if (gDebug > 0)
00441             Info("HandleInput", "%p: got message type: %d", this, what);
00442          switch (what) {
00443             case kPROOF_TOUCH:
00444                fSocket->RemoteTouch();
00445                break;
00446             default:
00447                Warning("HandleInput", "%p: got unknown message type: %d", this, what);
00448                break;
00449          }
00450       }
00451    } else {
00452       Warning("HandleInput", "%p: got message but socket is invalid!", this);
00453    }
00454 
00455    // We are done
00456    return kTRUE;
00457 }
00458 
00459 //_____________________________________________________________________________
00460 Bool_t TXProofMgr::HandleError(const void *in)
00461 {
00462    // Handle error on the input socket
00463 
00464    XHandleErr_t *herr = in ? (XHandleErr_t *)in : 0;
00465 
00466    // Try reconnection
00467    if (fSocket && herr && (herr->fOpt == 1)) {
00468       fSocket->Reconnect();
00469       if (fSocket && fSocket->IsValid()) {
00470          if (gDebug > 0)
00471             Printf("ProofMgr: connection to coordinator at %s re-established",
00472                    fUrl.GetUrl());
00473          return kFALSE;
00474       }
00475    }
00476    Printf("TXProofMgr::HandleError: %p: got called ...", this);
00477 
00478    // Interrupt any PROOF session in Collect
00479    if (fSessions && fSessions->GetSize() > 0) {
00480       TIter nxd(fSessions);
00481       TProofDesc *d = 0;
00482       while ((d = (TProofDesc *)nxd())) {
00483          TProof *p = (TProof *) d->GetProof();
00484          if (p)
00485             p->InterruptCurrentMonitor();
00486       }
00487    }
00488    if (gDebug > 0)
00489       Printf("TXProofMgr::HandleError: %p: DONE ... ", this);
00490 
00491    // We are done
00492    return kTRUE;
00493 }
00494 
00495 //______________________________________________________________________________
00496 Int_t TXProofMgr::Reset(Bool_t hard, const char *usr)
00497 {
00498    // Send a cleanup request for the sessions associated with the current user.
00499    // If 'hard' is true sessions are signalled for termination and moved to
00500    // terminate at all stages (top master, sub-master, workers). Otherwise
00501    // (default) only top-master sessions are asked to terminate, triggering
00502    // a gentle session termination. In all cases all sessions should be gone
00503    // after a few (2 or 3) session checking cycles.
00504    // A user with superuser privileges can also asks cleaning for an different
00505    // user, specified by 'usr', or for all users (usr = *)
00506    // Return 0 on success, -1 in case of error.
00507 
00508    // Nothing to do if not in contact with proofserv
00509    if (!IsValid()) {
00510       Warning("Reset","invalid TXProofMgr - do nothing");
00511       return -1;
00512    }
00513 
00514    Int_t h = (hard) ? 1 : 0;
00515    fSocket->SendCoordinator(kCleanupSessions, usr, h);
00516 
00517    return 0;
00518 }
00519 
00520 //_____________________________________________________________________________
00521 TProofLog *TXProofMgr::GetSessionLogs(Int_t isess,
00522                                       const char *stag, const char *pattern)
00523 {
00524    // Get logs or log tails from last session associated with this manager
00525    // instance.
00526    // The arguments allow to specify a session different from the last one:
00527    //      isess   specifies a position relative to the last one, i.e. 1
00528    //              for the next to last session; the absolute value is taken
00529    //              so -1 and 1 are equivalent.
00530    //      stag    specifies the unique tag of the wanted session
00531    // The special value stag = "NR" allows to just initialize the TProofLog
00532    // object w/o retrieving the files; this may be useful when the number
00533    // of workers is large and only a subset of logs is required.
00534    // If 'stag' is specified 'isess' is ignored (unless stag = "NR").
00535    // If 'pattern' is specified only the lines containing it are retrieved
00536    // (remote grep functionality); to filter out a pattern 'pat' use
00537    // pattern = "-v pat".
00538    // Returns a TProofLog object (to be deleted by the caller) on success,
00539    // 0 if something wrong happened.
00540 
00541    // Nothing to do if not in contact with proofserv
00542    if (!IsValid()) {
00543       Warning("GetSessionLogs","invalid TXProofMgr - do nothing");
00544       return 0;
00545    }
00546 
00547    TProofLog *pl = 0;
00548 
00549    // The absolute value of isess counts
00550    isess = (isess > 0) ? -isess : isess;
00551 
00552    // Special option in stag
00553    bool retrieve = 1;
00554    TString sesstag(stag);
00555    if (sesstag == "NR") {
00556       retrieve = 0;
00557       sesstag = "";
00558    }
00559 
00560    // Get the list of paths
00561    TObjString *os = fSocket->SendCoordinator(kQueryLogPaths, sesstag.Data(), isess);
00562 
00563    // Analyse it now
00564    Int_t ii = 0;
00565    if (os) {
00566       TString rs(os->GetName());
00567       Ssiz_t from = 0;
00568       // The session tag
00569       TString tag;
00570       if (!rs.Tokenize(tag, from, "|")) {
00571          Warning("GetSessionLogs", "Session tag undefined: corruption?\n"
00572                                    " (received string: %s)", os->GetName());
00573          return (TProofLog *)0;
00574       }
00575       // The pool url
00576       TString purl;
00577       if (!rs.Tokenize(purl, from, "|")) {
00578          Warning("GetSessionLogs", "Pool URL undefined: corruption?\n"
00579                                    " (received string: %s)", os->GetName());
00580          return (TProofLog *)0;
00581       }
00582       // Create the instance now
00583       if (!pl)
00584          pl = new TProofLog(tag, GetUrl(), this);
00585 
00586       // Per-node info
00587       TString to;
00588       while (rs.Tokenize(to, from, "|")) {
00589          if (!to.IsNull()) {
00590             TString ord(to);
00591             ord.Strip(TString::kLeading, ' ');
00592             TString url(ord);
00593             if ((ii = ord.Index(" ")) != kNPOS)
00594                ord.Remove(ii);
00595             if ((ii = url.Index(" ")) != kNPOS)
00596                url.Remove(0, ii + 1);
00597             // Add to the list (special tag for valgrind outputs)
00598             if (url.Contains(".valgrind")) ord += "-valgrind";
00599             pl->Add(ord, url);
00600             // Notify
00601             if (gDebug > 1)
00602                Info("GetSessionLogs", "ord: %s, url: %s", ord.Data(), url.Data());
00603          }
00604       }
00605       // Cleanup
00606       SafeDelete(os);
00607       // Retrieve the default part if required
00608       if (pl && retrieve) {
00609          if (pattern && strlen(pattern) > 0)
00610             pl->Retrieve("*", TProofLog::kGrep, 0, pattern);
00611          else
00612             pl->Retrieve();
00613       }
00614    }
00615 
00616    // Done
00617    return pl;
00618 }
00619 
00620 //______________________________________________________________________________
00621 TObjString *TXProofMgr::ReadBuffer(const char *fin, Long64_t ofs, Int_t len)
00622 {
00623    // Read, via the coordinator, 'len' bytes from offset 'ofs' of 'file'.
00624    // Returns a TObjString with the content or 0, in case of failure
00625 
00626    // Nothing to do if not in contact with proofserv
00627    if (!IsValid()) {
00628       Warning("ReadBuffer","invalid TXProofMgr - do nothing");
00629       return (TObjString *)0;
00630    }
00631 
00632    // Send the request
00633    return fSocket->SendCoordinator(kReadBuffer, fin, len, ofs, 0);
00634 }
00635 
00636 //______________________________________________________________________________
00637 TObjString *TXProofMgr::ReadBuffer(const char *fin, const char *pattern)
00638 {
00639    // Read, via the coordinator, lines containing 'pattern' in 'file'.
00640    // Returns a TObjString with the content or 0, in case of failure
00641 
00642    // Nothing to do if not in contact with proofserv
00643    if (!IsValid()) {
00644       Warning("ReadBuffer","invalid TXProofMgr - do nothing");
00645       return (TObjString *)0;
00646    }
00647 
00648    // Prepare the buffer
00649    Int_t plen = strlen(pattern);
00650    Int_t lfi = strlen(fin);
00651    char *buf = new char[lfi + plen + 1];
00652    memcpy(buf, fin, lfi);
00653    memcpy(buf+lfi, pattern, plen);
00654    buf[lfi+plen] = 0;
00655 
00656    // Send the request
00657    return fSocket->SendCoordinator(kReadBuffer, buf, plen, 0, 1);
00658 }
00659 
00660 //______________________________________________________________________________
00661 void TXProofMgr::ShowROOTVersions()
00662 {
00663    // Display what ROOT versions are available on the cluster
00664 
00665    // Nothing to do if not in contact with proofserv
00666    if (!IsValid()) {
00667       Warning("ShowROOTVersions","invalid TXProofMgr - do nothing");
00668       return;
00669    }
00670 
00671    // Send the request
00672    TObjString *os = fSocket->SendCoordinator(kQueryROOTVersions);
00673    if (os) {
00674       // Display it
00675       Printf("----------------------------------------------------------\n");
00676       Printf("Available versions (tag ROOT-vers remote-path PROOF-version):\n");
00677       Printf("%s", os->GetName());
00678       Printf("----------------------------------------------------------");
00679       SafeDelete(os);
00680    }
00681 
00682    // We are done
00683    return;
00684 }
00685 
00686 //______________________________________________________________________________
00687 void TXProofMgr::SetROOTVersion(const char *tag)
00688 {
00689    // Set the default ROOT version to be used
00690 
00691    // Nothing to do if not in contact with proofserv
00692    if (!IsValid()) {
00693       Warning("SetROOTVersion","invalid TXProofMgr - do nothing");
00694       return;
00695    }
00696 
00697    // Send the request
00698    fSocket->SendCoordinator(kROOTVersion, tag);
00699 
00700    // We are done
00701    return;
00702 }
00703 
00704 //______________________________________________________________________________
00705 Int_t TXProofMgr::SendMsgToUsers(const char *msg, const char *usr)
00706 {
00707    // Send a message to connected users. Only superusers can do this.
00708    // The first argument specifies the message or the file from where to take
00709    // the message.
00710    // The second argument specifies the user to which to send the message: if
00711    // empty or null the message is send to all the connected users.
00712    // return 0 in case of success, -1 in case of error
00713 
00714    Int_t rc = 0;
00715 
00716    // Check input
00717    if (!msg || strlen(msg) <= 0) {
00718       Error("SendMsgToUsers","no message to send - do nothing");
00719       return -1;
00720    }
00721 
00722    // Buffer (max 32K)
00723    const Int_t kMAXBUF = 32768;
00724    char buf[kMAXBUF] = {0};
00725    char *p = &buf[0];
00726    Int_t space = kMAXBUF - 1;
00727    Int_t len = 0;
00728    Int_t lusr = 0;
00729 
00730    // A specific user?
00731    if (usr && strlen(usr) > 0 && (strlen(usr) != 1 || usr[0] != '*')) {
00732       lusr = (strlen(usr) + 3);
00733       sprintf(buf, "u:%s ", usr);
00734       p += lusr;
00735       space -= lusr;
00736    }
00737 
00738    // Is it from file ?
00739    if (!gSystem->AccessPathName(msg, kFileExists)) {
00740       // From file: can we read it ?
00741       if (gSystem->AccessPathName(msg, kReadPermission)) {
00742          Error("SendMsgToUsers","request to read message from unreadable file '%s'", msg);
00743          return -1;
00744       }
00745       // Open the file
00746       FILE *f = 0;
00747       if (!(f = fopen(msg, "r"))) {
00748          Error("SendMsgToUsers", "file '%s' cannot be open", msg);
00749          return -1;
00750       }
00751       // Determine the number of bytes to be read from the file.
00752       Int_t left = (Int_t) lseek(fileno(f), (off_t) 0, SEEK_END);
00753       lseek(fileno(f), (off_t) 0, SEEK_SET);
00754       // Now readout from file
00755       Int_t wanted = left;
00756       if (wanted > space) {
00757          wanted = space;
00758          Warning("SendMsgToUsers",
00759                  "requested to send %d bytes: max size is %d bytes: truncating", left, space);
00760       }
00761       do {
00762          while ((len = read(fileno(f), p, wanted)) < 0 &&
00763                   TSystem::GetErrno() == EINTR)
00764             TSystem::ResetErrno();
00765          if (len < 0) {
00766             SysError("SendMsgToUsers", "error reading file");
00767             break;
00768          }
00769 
00770          // Update counters
00771          left -= len;
00772          p += len;
00773          wanted = (left > kMAXBUF-1) ? kMAXBUF-1 : left;
00774 
00775       } while (len > 0 && left > 0);
00776    } else {
00777       // Add the message to the buffer
00778       len = strlen(msg);
00779       if (len > space) {
00780          Warning("SendMsgToUsers",
00781                  "requested to send %d bytes: max size is %d bytes: truncating", len, space);
00782          len = space;
00783       }
00784       memcpy(p, msg, len);
00785    }
00786 
00787    // Null-terminate
00788    buf[len + lusr] = 0;
00789 
00790    // Send the request
00791    fSocket->SendCoordinator(kSendMsgToUser, buf);
00792 
00793    return rc;
00794 }
00795 
00796 //______________________________________________________________________________
00797 void TXProofMgr::Grep(const char *what, const char *how, const char *where)
00798 {
00799    // Run 'grep' on the nodes
00800 
00801    // Nothing to do if not in contact with proofserv
00802    if (!IsValid()) {
00803       Warning("Grep","invalid TXProofMgr - do nothing");
00804       return;
00805    }
00806    // Server may not support it
00807    if (fSocket->GetXrdProofdVersion() < 1006) {
00808       Warning("Grep", "functionality not supported by server");
00809       return;
00810    }
00811 
00812    // Send the request
00813    TObjString *os = Exec(kGrep, what, how, where);
00814 
00815    // Show the result, if any
00816    if (os) Printf("%s", os->GetName());
00817 
00818    // Cleanup
00819    SafeDelete(os);
00820 }
00821 
00822 //______________________________________________________________________________
00823 void TXProofMgr::Find(const char *what, const char *how, const char *where)
00824 {
00825    // Run 'find' on the nodes
00826 
00827    // Nothing to do if not in contact with proofserv
00828    if (!IsValid()) {
00829       Warning("Find","invalid TXProofMgr - do nothing");
00830       return;
00831    }
00832    // Server may not support it
00833    if (fSocket->GetXrdProofdVersion() < 1006) {
00834       Warning("Find", "functionality not supported by server (XrdProofd version: %d)",
00835                       fSocket->GetXrdProofdVersion());
00836       return;
00837    }
00838 
00839    // Send the request
00840    TObjString *os = Exec(kFind, what, how, where);
00841 
00842    // Show the result, if any
00843    if (os) Printf("%s", os->GetName());
00844 
00845    // Cleanup
00846    SafeDelete(os);
00847 }
00848 
00849 //______________________________________________________________________________
00850 void TXProofMgr::Ls(const char *what, const char *how, const char *where)
00851 {
00852    // Run 'ls' on the nodes
00853 
00854    // Nothing to do if not in contact with proofserv
00855    if (!IsValid()) {
00856       Warning("Ls","invalid TXProofMgr - do nothing");
00857       return;
00858    }
00859    // Server may not support it
00860    if (fSocket->GetXrdProofdVersion() < 1006) {
00861       Warning("Ls", "functionality not supported by server");
00862       return;
00863    }
00864 
00865    // Send the request
00866    TObjString *os = Exec(kLs, what, how, where);
00867 
00868    // Show the result, if any
00869    if (os) Printf("%s", os->GetName());
00870 
00871    // Cleanup
00872    SafeDelete(os);
00873 }
00874 
00875 //______________________________________________________________________________
00876 void TXProofMgr::More(const char *what, const char *how, const char *where)
00877 {
00878    // Run 'more' on the nodes
00879 
00880    // Nothing to do if not in contact with proofserv
00881    if (!IsValid()) {
00882       Warning("More","invalid TXProofMgr - do nothing");
00883       return;
00884    }
00885    // Server may not support it
00886    if (fSocket->GetXrdProofdVersion() < 1006) {
00887       Warning("More", "functionality not supported by server");
00888       return;
00889    }
00890 
00891    // Send the request
00892    TObjString *os = Exec(kMore, what, how, where);
00893 
00894    // Show the result, if any
00895    if (os) Printf("%s", os->GetName());
00896 
00897    // Cleanup
00898    SafeDelete(os);
00899 }
00900 
00901 //______________________________________________________________________________
00902 Int_t TXProofMgr::Rm(const char *what, const char *how, const char *where)
00903 {
00904    // Run 'rm' on the nodes. The user is prompted before removal, unless 'how'
00905    // contains "--force" or a combination of single letter options including 'f',
00906    // e.g. "-fv".
00907 
00908    // Nothing to do if not in contact with proofserv
00909    if (!IsValid()) {
00910       Warning("Rm","invalid TXProofMgr - do nothing");
00911       return -1;
00912    }
00913    // Server may not support it
00914    if (fSocket->GetXrdProofdVersion() < 1006) {
00915       Warning("Rm", "functionality not supported by server");
00916       return -1;
00917    }
00918 
00919    TString prompt, ans("Y"), opt(how);
00920    Bool_t force = kFALSE;
00921    if (!opt.IsNull()) {
00922       TString t;
00923       Int_t from = 0;
00924       while (!force && opt.Tokenize(t, from, " ")) {
00925          if (t == "--force") {
00926             force = kTRUE;
00927          } else if (t.BeginsWith("-") && !t.BeginsWith("--") && t.Contains("f")) {
00928             force = kTRUE;
00929          }
00930       }
00931    }
00932 
00933    if (!force && isatty(0) != 0 && isatty(1) != 0) {
00934       // Really remove the file?
00935       prompt.Form("Do you really want to remove '%s'? [N/y]", what);
00936       ans = "";
00937       while (ans != "N" && ans != "Y") {
00938          ans = Getline(prompt.Data());
00939          ans.Remove(TString::kTrailing, '\n');
00940          if (ans == "") ans = "N";
00941          ans.ToUpper();
00942          if (ans != "N" && ans != "Y")
00943             Printf("Please answer y, Y, n or N");
00944       }
00945    }
00946 
00947    if (ans == "Y") {
00948       // Send the request
00949       TObjString *os = Exec(kRm, what, how, where);
00950       // Show the result, if any
00951       if (os) {
00952          if (gDebug > 1) Printf("%s", os->GetName());
00953          // Cleanup
00954          SafeDelete(os);
00955          // Success
00956          return 0;
00957       }
00958       // Failure
00959       return -1;
00960    }
00961    // Done
00962    return 0;
00963 }
00964 
00965 //______________________________________________________________________________
00966 void TXProofMgr::Tail(const char *what, const char *how, const char *where)
00967 {
00968    // Run 'tail' on the nodes
00969 
00970    // Nothing to do if not in contact with proofserv
00971    if (!IsValid()) {
00972       Warning("Tail","invalid TXProofMgr - do nothing");
00973       return;
00974    }
00975    // Server may not support it
00976    if (fSocket->GetXrdProofdVersion() < 1006) {
00977       Warning("Tail", "functionality not supported by server");
00978       return;
00979    }
00980 
00981    // Send the request
00982    TObjString *os = Exec(kTail, what, how, where);
00983 
00984    // Show the result, if any
00985    if (os) Printf("%s", os->GetName());
00986 
00987    // Cleanup
00988    SafeDelete(os);
00989 }
00990 
00991 //______________________________________________________________________________
00992 Int_t TXProofMgr::Md5sum(const char *what, TString &sum, const char *where)
00993 {
00994    // Run 'md5sum' on one of the nodes
00995 
00996    // Nothing to do if not in contact with proofserv
00997    if (!IsValid()) {
00998       Warning("Md5sum","invalid TXProofMgr - do nothing");
00999       return -1;
01000    }
01001    // Server may not support it
01002    if (fSocket->GetXrdProofdVersion() < 1006) {
01003       Warning("Md5sum", "functionality not supported by server");
01004       return -1;
01005    }
01006 
01007    if (where && !strcmp(where, "all")) {
01008       Warning("Md5sum","cannot run on all nodes at once: please specify one");
01009       return -1;
01010    }
01011 
01012    // Send the request
01013    TObjString *os = Exec(kMd5sum, what, 0, where);
01014 
01015    // Show the result, if any
01016    if (os) {
01017       if (gDebug > 1) Printf("%s", os->GetName());
01018       sum = os->GetName();
01019       // Cleanup
01020       SafeDelete(os);
01021       // Success
01022       return 0;
01023    }
01024    // Failure
01025    return -1;
01026 }
01027 
01028 //______________________________________________________________________________
01029 Int_t TXProofMgr::Stat(const char *what, FileStat_t &st, const char *where)
01030 {
01031    // Run 'stat' on one of the nodes
01032 
01033    // Nothing to do if not in contact with proofserv
01034    if (!IsValid()) {
01035       Warning("Stat","invalid TXProofMgr - do nothing");
01036       return -1;
01037    }
01038    // Server may not support it
01039    if (fSocket->GetXrdProofdVersion() < 1006) {
01040       Warning("Stat", "functionality not supported by server");
01041       return -1;
01042    }
01043 
01044    if (where && !strcmp(where, "all")) {
01045       Warning("Stat","cannot run on all nodes at once: please specify one");
01046       return -1;
01047    }
01048 
01049    // Send the request
01050    TObjString *os = Exec(kStat, what, 0, where);
01051 
01052    // Show the result, if any
01053    if (os) {
01054       if (gDebug > 1) Printf("%s", os->GetName());
01055       Int_t    mode, uid, gid, islink;
01056       Long_t   dev, ino, mtime;
01057       Long64_t size;
01058 #ifdef R__WIN32
01059       sscanf(os->GetName(), "%ld %ld %d %d %d %I64d %ld %d", &dev, &ino, &mode,
01060                             &uid, &gid, &size, &mtime, &islink);
01061 #else
01062       sscanf(os->GetName(), "%ld %ld %d %d %d %lld %ld %d", &dev, &ino, &mode,
01063                             &uid, &gid, &size, &mtime, &islink);
01064 #endif
01065       if (dev == -1)
01066          return -1;
01067       st.fDev    = dev;
01068       st.fIno    = ino;
01069       st.fMode   = mode;
01070       st.fUid    = uid;
01071       st.fGid    = gid;
01072       st.fSize   = size;
01073       st.fMtime  = mtime;
01074       st.fIsLink = (islink == 1);
01075 
01076       // Cleanup
01077       SafeDelete(os);
01078       // Success
01079       return 0;
01080    }
01081    // Failure
01082    return -1;
01083 }
01084 
01085 //______________________________________________________________________________
01086 TObjString *TXProofMgr::Exec(Int_t action,
01087                              const char *what, const char *how, const char *where)
01088 {
01089    // Execute 'action' (see EAdminExecType in 'XProofProtocol.h') at 'where'
01090    // (default master), with options 'how', on 'what'. The option specified by
01091    // 'how' are typically unix option for the relate commands. In addition to
01092    // the unix authorizations, the limitations are:
01093    //
01094    //      action = kRm        limited to the sandbox (but basic dirs cannot be
01095    //                          removed) and on files owned by the user in the
01096    //                          allowed directories
01097    //      action = kTail      option '-f' is not supported and will be ignored
01098    //
01099 
01100    // Nothing to do if not in contact with proofserv
01101    if (!IsValid()) {
01102       Warning("Exec","invalid TXProofMgr - do nothing");
01103       return (TObjString *)0;
01104    }
01105    // Server may not support it
01106    if (fSocket->GetXrdProofdVersion() < 1006) {
01107       Warning("Exec", "functionality not supported by server");
01108       return (TObjString *)0;
01109    }
01110    // Check 'what'
01111    if (!what || strlen(what) <= 0) {
01112       Error("Exec","specifying a path is mandatory");
01113       return (TObjString *)0;
01114    }
01115    // Check the options
01116    TString opt(how);
01117    if (action == kTail && !opt.IsNull()) {
01118       // Keep only static options: -c, --bytes=N, -n , --lines=N, -N
01119       TString opts(how), o;
01120       Int_t from = 0;
01121       Bool_t isc = kFALSE, isn = kFALSE;
01122       while (opts.Tokenize(o, from, " ")) {
01123          // Skip values not starting with '-' is not argument to '-c' or '-n'
01124          if (!o.BeginsWith("-") && !isc && isn) continue;
01125          if (isc) {
01126             opt.Form("-c %s", o.Data());
01127             isc = kFALSE;
01128          }
01129          if (isn) {
01130             opt.Form("-n %s", o.Data());
01131             isn = kFALSE;
01132          }
01133          if (o == "-c") {
01134             isc = kTRUE;
01135          } else if (o == "-n") {
01136             isn = kTRUE;
01137          } else if (o == "--bytes=" || o == "--lines=") {
01138             opt = o;
01139          } else if (o.BeginsWith("-")) {
01140             o.Remove(TString::kLeading,'-');
01141             if (o.IsDigit()) opt.Form("-%s", o.Data());
01142          }
01143       }
01144    }
01145 
01146    // Build the command line
01147    TString cmd(where);
01148    if (cmd.IsNull()) cmd.Form("%s:%d", fUrl.GetHost(), fUrl.GetPort());
01149    cmd += "|";
01150    cmd += what;
01151    cmd += "|";
01152    cmd += opt;
01153 
01154    // On clients, handle Ctrl-C during collection
01155    if (fIntHandler) fIntHandler->Add();
01156 
01157    // Send the request
01158    TObjString *os = fSocket->SendCoordinator(kExec, cmd.Data(), action);
01159 
01160    // On clients, handle Ctrl-C during collection
01161    if (fIntHandler) fIntHandler->Remove();
01162 
01163    // Done
01164    return os;
01165 }
01166 
01167 //______________________________________________________________________________
01168 Int_t TXProofMgr::GetFile(const char *remote, const char *local, const char *opt)
01169 {
01170    // Get file 'remote' into 'local' from the master.
01171    // If opt contains "force", the file, if it exists remotely, is copied in all cases,
01172    // otherwise a check is done on the MD5sum.
01173    // If opt contains "silent" standard notificatons are not printed (errors and
01174    // warnings and prompts still are).
01175    // Return 0 on success, -1 on error.
01176 
01177    Int_t rc = -1;
01178    // Nothing to do if not in contact with proofserv
01179    if (!IsValid()) {
01180       Warning("GetFile", "invalid TXProofMgr - do nothing");
01181       return rc;
01182    }
01183    // Server may not support it
01184    if (fSocket->GetXrdProofdVersion() < 1006) {
01185       Warning("GetFile", "functionality not supported by server");
01186       return rc;
01187    }
01188 
01189    // Check remote path name
01190    TString filerem(remote);
01191    if (filerem.IsNull()) {
01192       Error("GetFile", "remote file path undefined");
01193       return rc;
01194    }
01195 
01196    // Parse option
01197    TString oo(opt);
01198    oo.ToUpper();
01199    Bool_t force = (oo.Contains("FORCE")) ? kTRUE : kFALSE;
01200    Bool_t silent = (oo.Contains("SILENT")) ? kTRUE : kFALSE;
01201 
01202    // Check local path name
01203    TString fileloc(local);
01204    if (fileloc.IsNull()) {
01205       // Set the same as the remote one, in the working dir
01206       fileloc = gSystem->BaseName(filerem);
01207    }
01208    gSystem->ExpandPathName(fileloc);
01209 
01210    // Default open and mode flags
01211 #ifdef WIN32
01212    UInt_t openflags =  O_WRONLY | O_BINARY;
01213 #else
01214    UInt_t openflags =  O_WRONLY;
01215 #endif
01216    UInt_t openmode = 0600;
01217 
01218    // Get information about the local file
01219    UserGroup_t *ugloc = 0;
01220    Int_t rcloc = 0;
01221    FileStat_t stloc;
01222    if ((rcloc = gSystem->GetPathInfo(fileloc, stloc)) == 0) {
01223       if (R_ISDIR(stloc.fMode)) {
01224          // Add the filename of the remote file and re-check
01225          if (!fileloc.EndsWith("/")) fileloc += "/";
01226          fileloc += gSystem->BaseName(filerem);
01227          // Get again the status of the path
01228          rcloc = gSystem->GetPathInfo(fileloc, stloc);
01229       }
01230       if (rcloc == 0) {
01231          // It exists already. If it is not a regular file we cannot continue
01232          if (!R_ISREG(stloc.fMode)) {
01233             if (!silent)
01234                Printf("[GetFile] local file '%s' exists and is not regular: cannot continue",
01235                       fileloc.Data());
01236             return rc;
01237          }
01238          // Get our info
01239          if (!(ugloc = gSystem->GetUserInfo(gSystem->GetUid()))) {
01240             Error("GetFile", "cannot get user info for additional checks");
01241             return rc;
01242          }
01243          // Can we delete or overwrite it ?
01244          Bool_t owner = (ugloc->fUid == stloc.fUid && ugloc->fGid == stloc.fGid) ? kTRUE : kFALSE;
01245          Bool_t group = (!owner && ugloc->fGid == stloc.fGid) ? kTRUE : kFALSE;
01246          Bool_t other = (!owner && !group) ? kTRUE : kFALSE;
01247          delete ugloc;
01248          if ((owner && !(stloc.fMode & kS_IWUSR)) ||
01249              (group && !(stloc.fMode & kS_IWGRP)) || (other && !(stloc.fMode & kS_IWOTH))) {
01250             if (!silent) {
01251                Printf("[GetFile] file '%s' exists: no permission to delete or overwrite the file", fileloc.Data());
01252                Printf("[GetFile] ownership: owner: %d, group: %d, other: %d", owner, group, other);
01253                Printf("[GetFile] mode: %x", stloc.fMode);
01254             }
01255             return rc;
01256          }
01257          // In case we open the file, we need to truncate it
01258          openflags |=  O_CREAT | O_TRUNC;
01259       } else {
01260          // In case we open the file, we need to create it
01261          openflags |=  O_CREAT;
01262       }
01263    } else {
01264       // In case we open the file, we need to create it
01265       openflags |=  O_CREAT;
01266    }
01267 
01268    // Check the remote file exists and get it check sum
01269    TString remsum;
01270    if (Md5sum(filerem, remsum) != 0) {
01271       if (!silent)
01272          Printf("[GetFile] remote file '%s' does not exists or cannot be read", filerem.Data());
01273       return rc;
01274    }
01275 
01276    // If the file exists already locally, check if it is different
01277    bool same = 0;
01278    if (rcloc == 0 && !force) {
01279       TMD5 *md5loc = TMD5::FileChecksum(fileloc);
01280       if (md5loc) {
01281          if (remsum == md5loc->AsString()) {
01282             if (!silent) {
01283                Printf("[GetFile] local file '%s' and remote file '%s' have the same MD5 check sum",
01284                       fileloc.Data(), filerem.Data());
01285                Printf("[GetFile] use option 'force' to override");
01286             }
01287             same = 1;
01288          }
01289          delete md5loc;
01290       }
01291 
01292       // If a different file with the same name exists already, ask what to do
01293       if (!same) {
01294          char *a = Getline("Local file exists already: would you like to overwrite it? [N/y]");
01295          if (a[0] == 'n' || a[0] == 'N' || a[0] == '\0') return 0;
01296       } else {
01297          return 0;
01298       }
01299    }
01300 
01301    // Open the local file for writing
01302    Int_t fdout = open(fileloc, openflags, openmode);
01303    if (fdout < 0) {
01304       Error("GetFile", "could not open local file '%s' for writing: errno: %d", local, errno);
01305       return rc;
01306    }
01307 
01308    // Build the command line
01309    TString cmd(filerem);
01310 
01311    // Disable TXSocket handling while receiving the file (CpProgress processes
01312    // pending events and this could screw-up synchronization in the TXSocket pipe)
01313    gSystem->RemoveFileHandler(TXSocketHandler::GetSocketHandler());
01314 
01315    // Send the request
01316    TStopwatch watch;
01317    watch.Start();
01318    TObjString *os = fSocket->SendCoordinator(kGetFile, cmd.Data());
01319 
01320    if (os) {
01321       // The message contains the size
01322       Long64_t size;
01323       sscanf(os->GetName(), "%lld", &size);
01324       if (size <= 0) {
01325          Error("GetFile", "received null or negative size: %lld", size);
01326          close(fdout);
01327          return rc;
01328       }
01329 
01330       // Receive the file
01331       const Int_t kMAXBUF = 16384;  //32768  //16384  //65536;
01332       char buf[kMAXBUF];
01333 
01334       rc = 0;
01335       Int_t rec, r;
01336       Long64_t filesize = 0, left = 0;
01337       while (rc == 0 && filesize < size) {
01338          left = size - filesize;
01339          if (left > kMAXBUF) left = kMAXBUF;
01340          rec = fSocket->RecvRaw(&buf, left);
01341          filesize = (rec > 0) ? (filesize + rec) : filesize;
01342          if (rec > 0) {
01343             char *p = buf;
01344             r = rec;
01345             while (r) {
01346                Int_t w = 0;
01347                while ((w = write(fdout, p, r)) < 0 && TSystem::GetErrno() == EINTR)
01348                   TSystem::ResetErrno();
01349                if (w < 0) {
01350                   SysError("GetFile", "error writing to unit: %d", fdout);
01351                   rc = -1;
01352                   break;
01353                }
01354                r -= w;
01355                p += w;
01356             }
01357             // Basic progress bar
01358             CpProgress("GetFile", filesize, size, &watch);
01359          } else if (rec < 0) {
01360             rc = -1;
01361             Error("GetFile", "error during receiving file");
01362             break;
01363          }
01364       }
01365       // Finalize the progress bar
01366       CpProgress("GetFile", filesize, size, &watch, kTRUE);
01367 
01368    } else {
01369       Error("GetFile", "size not received");
01370       rc = -1;
01371    }
01372 
01373    // Restore socket handling while receiving the file
01374    gSystem->AddFileHandler(TXSocketHandler::GetSocketHandler());
01375 
01376    // Close local file
01377    close(fdout);
01378    watch.Stop();
01379    watch.Reset();
01380 
01381    if (rc == 0) {
01382       // Check if everything went fine
01383       TMD5 *md5loc = TMD5::FileChecksum(fileloc);
01384       if (!md5loc) {
01385          Error("GetFile", "cannot get MD5 checksum of the new local file '%s'", fileloc.Data());
01386          rc = -1;
01387       } else if (remsum != md5loc->AsString()) {
01388          Error("GetFile", "checksums for the local copy and the remote file differ: {rem:%s,loc:%s}",
01389                            remsum.Data(), md5loc->AsString());
01390          rc = -1;
01391          delete md5loc;
01392       }
01393    }
01394    // Done
01395    return rc;
01396 }
01397 
01398 //______________________________________________________________________________
01399 Int_t TXProofMgr::PutFile(const char *local, const char *remote, const char *opt)
01400 {
01401    // Put file 'local'to 'remote' to the master
01402    // If opt is "force", the file, if it exists remotely, is copied in all cases,
01403    // otherwise a check is done on the MD5sum.
01404    // Return 0 on success, -1 on error
01405 
01406    Int_t rc = -1;
01407    // Nothing to do if not in contact with proofserv
01408    if (!IsValid()) {
01409       Warning("PutFile", "invalid TXProofMgr - do nothing");
01410       return rc;
01411    }
01412    // Server may not support it
01413    if (fSocket->GetXrdProofdVersion() < 1006) {
01414       Warning("PutFile", "functionality not supported by server");
01415       return rc;
01416    }
01417 
01418    // Check local path name
01419    TString fileloc(local);
01420    if (fileloc.IsNull()) {
01421       Error("PutFile", "local file path undefined");
01422       return rc;
01423    }
01424    gSystem->ExpandPathName(fileloc);
01425 
01426    // Parse option
01427    TString oo(opt);
01428    oo.ToUpper();
01429    Bool_t force = (oo == "FORCE") ? kTRUE : kFALSE;
01430 
01431    // Check remote path name
01432    TString filerem(remote);
01433    if (filerem.IsNull()) {
01434       // Set the same as the local one, in the working dir
01435       filerem.Form("~/%s", gSystem->BaseName(fileloc));
01436    } else if (filerem.EndsWith("/")) {
01437       // Remote path is a directory: add the file name as in the local one
01438       filerem += gSystem->BaseName(fileloc);
01439    }
01440 
01441    // Default open flags
01442 #ifdef WIN32
01443    UInt_t openflags =  O_RDONLY | O_BINARY;
01444 #else
01445    UInt_t openflags =  O_RDONLY;
01446 #endif
01447 
01448    // Get information about the local file
01449    Int_t rcloc = 0;
01450    FileStat_t stloc;
01451    if ((rcloc = gSystem->GetPathInfo(fileloc, stloc)) != 0 || !R_ISREG(stloc.fMode)) {
01452       // It dies not exists or it is not a regular file: we cannot continue
01453       const char *why = (rcloc == 0) ? "is not regular" : "does not exists";
01454       Printf("[PutFile] local file '%s' %s: cannot continue", fileloc.Data(), why);
01455       return rc;
01456    }
01457    // Get our info
01458    UserGroup_t *ugloc = 0;
01459    if (!(ugloc = gSystem->GetUserInfo(gSystem->GetUid()))) {
01460       Error("PutFile", "cannot get user info for additional checks");
01461       return rc;
01462    }
01463    // Can we read it ?
01464    Bool_t owner = (ugloc->fUid == stloc.fUid && ugloc->fGid == stloc.fGid) ? kTRUE : kFALSE;
01465    Bool_t group = (!owner && ugloc->fGid == stloc.fGid) ? kTRUE : kFALSE;
01466    Bool_t other = (!owner && !group) ? kTRUE : kFALSE;
01467    delete ugloc;
01468    if ((owner && !(stloc.fMode & kS_IRUSR)) ||
01469        (group && !(stloc.fMode & kS_IRGRP)) || (other && !(stloc.fMode & kS_IROTH))) {
01470       Printf("[PutFile] file '%s': no permission to read the file", fileloc.Data());
01471       Printf("[PutFile] ownership: owner: %d, group: %d, other: %d", owner, group, other);
01472       Printf("[PutFile] mode: %x", stloc.fMode);
01473       return rc;
01474    }
01475 
01476    // Local MD5 sum
01477    TString locsum;
01478    TMD5 *md5loc = TMD5::FileChecksum(fileloc);
01479    if (!md5loc) {
01480       Error("PutFile", "cannot calculate the check sum for '%s'", fileloc.Data());
01481       return rc;
01482    } else {
01483       locsum = md5loc->AsString();
01484       delete md5loc;
01485    }
01486 
01487    // Check the remote file exists and get it check sum
01488    Bool_t same = kFALSE;
01489    FileStat_t strem;
01490    TString remsum;
01491    if (Stat(filerem, strem) == 0) {
01492       if (Md5sum(filerem, remsum) != 0) {
01493          Printf("[PutFile] remote file exists but the check sum calculation failed");
01494          return rc;
01495       }
01496       // Check sums
01497       if (remsum == locsum) {
01498          if (!force) {
01499             Printf("[PutFile] local file '%s' and remote file '%s' have the same MD5 check sum",
01500                               fileloc.Data(), filerem.Data());
01501             Printf("[PutFile] use option 'force' to override");
01502          }
01503          same = kTRUE;
01504       }
01505       if (!force) {
01506          // If a different file with the same name exists already, ask what to do
01507          if (!same) {
01508             char *a = Getline("Remote file exists already: would you like to overwrite it? [N/y]");
01509             if (a[0] == 'n' || a[0] == 'N' || a[0] == '\0') return 0;
01510             force = kTRUE;
01511          } else {
01512             return 0;
01513          }
01514       }
01515    }
01516 
01517    // Open the local file
01518    int fd = open(fileloc.Data(), openflags);
01519    if (fd < 0) {
01520       Error("PutFile", "cannot open file '%s': %d", fileloc.Data(), errno);
01521       return -1;
01522    }
01523 
01524    // Build the command line: 'path size [opt]'
01525    TString cmd;
01526    cmd.Form("%s %lld", filerem.Data(), stloc.fSize);
01527    if (force) cmd += " force";
01528 
01529    // Disable TXSocket handling while sending the file (CpProgress processes
01530    // pending events and this could screw-up synchronization in the TXSocket pipe)
01531    gSystem->RemoveFileHandler(TXSocketHandler::GetSocketHandler());
01532 
01533    // Send the request
01534    TStopwatch watch;
01535    watch.Start();
01536    TObjString *os = fSocket->SendCoordinator(kPutFile, cmd.Data());
01537 
01538    if (os) {
01539 
01540       // Send over the file
01541       const Int_t kMAXBUF = 16384;  //32768  //16384  //65536;
01542       char buf[kMAXBUF];
01543 
01544       Long64_t pos = 0;
01545       lseek(fd, pos, SEEK_SET);
01546 
01547       rc = 0;
01548       while (rc == 0 && pos < stloc.fSize) {
01549          Long64_t left = stloc.fSize - pos;
01550          if (left > kMAXBUF) left = kMAXBUF;
01551          Int_t siz;
01552          while ((siz = read(fd, &buf[0], left)) < 0 && TSystem::GetErrno() == EINTR)
01553             TSystem::ResetErrno();
01554          if (siz < 0 || siz != left) {
01555             Error("PutFile", "error reading from file: errno: %d", errno);
01556             rc = -1;
01557             break;
01558          }
01559          Int_t src = 0;
01560          if ((src = fSocket->fConn->WriteRaw((void *)&buf[0], left)) != left) {
01561             Error("PutFile", "error sending over: errno: %d (rc: %d)", TSystem::GetErrno(), src);
01562             rc = -1;
01563             break;
01564          }
01565          // Basic progress bar
01566          CpProgress("PutFile", pos, stloc.fSize, &watch);
01567          // Re-position
01568          pos += left;
01569       }
01570       // Finalize the progress bar
01571       CpProgress("PutFile", pos, stloc.fSize, &watch, kTRUE);
01572 
01573    } else {
01574       Error("PutFile", "command could not be executed");
01575       rc = -1;
01576    }
01577 
01578    // Restore TXSocket handling
01579    gSystem->AddFileHandler(TXSocketHandler::GetSocketHandler());
01580 
01581    // Close local file
01582    close(fd);
01583    watch.Stop();
01584    watch.Reset();
01585 
01586    if (rc == 0) {
01587       // Check if everything went fine
01588       if (Md5sum(filerem, remsum) != 0) {
01589          Printf("[PutFile] cannot get MD5 checksum of the new remote file '%s'", filerem.Data());
01590          rc = -1;
01591       } else if (remsum != locsum) {
01592          Printf("[PutFile] checksums for the local copy and the remote file differ: {rem:%s, loc:%s}",
01593                            remsum.Data(), locsum.Data());
01594          rc = -1;
01595       }
01596    }
01597 
01598    // Done
01599    return rc;
01600 }
01601 
01602 //______________________________________________________________________________
01603 void TXProofMgr::CpProgress(const char *pfx, Long64_t bytes,
01604                             Long64_t size, TStopwatch *watch, Bool_t cr)
01605 {
01606    // Print file copy progress.
01607 
01608    // Protection
01609    if (!pfx || size == 0 || !watch) return;
01610 
01611    fprintf(stderr, "[%s] Total %.02f MB\t|", pfx, (Double_t)size/1048576);
01612 
01613    for (int l = 0; l < 20; l++) {
01614       if (size > 0) {
01615          if (l < 20*bytes/size)
01616             fprintf(stderr, "=");
01617          else if (l == 20*bytes/size)
01618             fprintf(stderr, ">");
01619          else if (l > 20*bytes/size)
01620             fprintf(stderr, ".");
01621       } else
01622          fprintf(stderr, "=");
01623    }
01624    // Allow to update the GUI while uploading files
01625    gSystem->ProcessEvents();
01626    watch->Stop();
01627    Double_t copytime = watch->RealTime();
01628    fprintf(stderr, "| %.02f %% [%.01f MB/s]\r",
01629            100.0*(size?(bytes/size):1), bytes/copytime/1048576.);
01630    if (cr) fprintf(stderr, "\n");
01631    watch->Continue();
01632 }
01633 
01634 //______________________________________________________________________________
01635 Int_t TXProofMgr::Cp(const char *src, const char *dst, const char *fmt)
01636 {
01637    // Copy files in/out of the sandbox. Either 'src' or 'dst' must be in the
01638    // sandbox.
01639    // Return 0 on success, -1 on error
01640 
01641    Int_t rc = -1;
01642    // Nothing to do if not in contact with proofserv
01643    if (!IsValid()) {
01644       Warning("Cp", "invalid TXProofMgr - do nothing");
01645       return rc;
01646    }
01647    // Server may not support it
01648    if (fSocket->GetXrdProofdVersion() < 1006) {
01649       Warning("Cp", "functionality not supported by server");
01650       return rc;
01651    }
01652 
01653    // Check source path name
01654    TString filesrc(src);
01655    if (filesrc.IsNull()) {
01656       Error("Cp", "source file path undefined");
01657       return rc;
01658    }
01659    // Check destination path name
01660    TString filedst(dst);
01661    if (filedst.IsNull()) {
01662       filedst = gSystem->BaseName(TUrl(filesrc.Data()).GetFile());
01663    } else if (filedst.EndsWith("/")) {
01664       // Remote path is a directory: add the file name as in the local one
01665       filedst += gSystem->BaseName(filesrc);
01666    }
01667 
01668    // Make sure that local files are in the format file://host/<file> otherwise
01669    // the URL class in the server will not parse them correctly
01670    TUrl usrc = TUrl(filesrc.Data(), kTRUE).GetUrl();
01671    filesrc = usrc.GetUrl();
01672    if (!strcmp(usrc.GetProtocol(), "file"))
01673       filesrc.Form("file://host/%s", usrc.GetFileAndOptions());
01674    TUrl udst = TUrl(filedst.Data(), kTRUE).GetUrl();
01675    filedst = udst.GetUrl();
01676    if (!strcmp(udst.GetProtocol(), "file"))
01677       filedst.Form("file://host/%s", udst.GetFileAndOptions());
01678 
01679    // Prepare the command
01680    TString cmd;
01681    cmd.Form("%s %s %s", filesrc.Data(), filedst.Data(), (fmt ? fmt : ""));
01682 
01683    // On clients, handle Ctrl-C during collection
01684    if (fIntHandler) fIntHandler->Add();
01685 
01686    // Send the request
01687    TObjString *os = fSocket->SendCoordinator(kCpFile, cmd.Data());
01688 
01689    // On clients, handle Ctrl-C during collection
01690    if (fIntHandler) fIntHandler->Remove();
01691 
01692    // Show the result, if any
01693    if (os) {
01694       if (gDebug > 0) Printf("%s", os->GetName());
01695       rc = 0;
01696    }
01697 
01698    // Done
01699    return rc;
01700 }

Generated on Tue Jul 5 14:52:27 2011 for ROOT_528-00b_version by  doxygen 1.5.1