TProofLite.cxx

Go to the documentation of this file.
00001 // @(#)root/proof:$Id: TProofLite.cxx 38349 2011-03-09 19:05:39Z ganis $
00002 // Author: G. Ganis March 2008
00003 
00004 /*************************************************************************
00005  * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers.               *
00006  * All rights reserved.                                                  *
00007  *                                                                       *
00008  * For the licensing terms see $ROOTSYS/LICENSE.                         *
00009  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
00010  *************************************************************************/
00011 
00012 //////////////////////////////////////////////////////////////////////////
00013 //                                                                      //
00014 // TProofLite                                                           //
00015 //                                                                      //
00016 // This class starts a PROOF session on the local machine: no daemons,  //
00017 // client and master merged, communications via UNIX-like sockets.      //
00018 // By default the number of workers started is NumberOfCores+1; a       //
00019 // different number can be forced on construction.                      //
00020 //                                                                      //
00021 //////////////////////////////////////////////////////////////////////////
00022 
00023 #include "TProofLite.h"
00024 
00025 #ifdef WIN32
00026 #   include <io.h>
00027 #   include "snprintf.h"
00028 #endif
00029 #include "RConfigure.h"
00030 #include "TDSet.h"
00031 #include "TEnv.h"
00032 #include "TError.h"
00033 #include "TFile.h"
00034 #include "TFileCollection.h"
00035 #include "TFileInfo.h"
00036 #include "THashList.h"
00037 #include "TMessage.h"
00038 #include "TMonitor.h"
00039 #include "TObjString.h"
00040 #include "TPluginManager.h"
00041 #include "TDataSetManager.h"
00042 #include "TProofQueryResult.h"
00043 #include "TProofServ.h"
00044 #include "TQueryResultManager.h"
00045 #include "TROOT.h"
00046 #include "TServerSocket.h"
00047 #include "TSlave.h"
00048 #include "TSortedList.h"
00049 #include "TTree.h"
00050 #include "TVirtualProofPlayer.h"
00051 #include "TSelector.h"
00052 
00053 ClassImp(TProofLite)
00054 
00055 Int_t TProofLite::fgWrksMax = -2; // Unitialized max number of workers
00056 
00057 //______________________________________________________________________________
00058 TProofLite::TProofLite(const char *url, const char *conffile, const char *confdir,
00059                        Int_t loglevel, const char *alias, TProofMgr *mgr)
00060 {
00061    // Create a PROOF environment. Starting PROOF involves either connecting
00062    // to a master server, which in turn will start a set of slave servers, or
00063    // directly starting as master server (if master = ""). Masterurl is of
00064    // the form: [proof[s]://]host[:port]. Conffile is the name of the config
00065    // file describing the remote PROOF cluster (this argument alows you to
00066    // describe different cluster configurations).
00067    // The default is proof.conf. Confdir is the directory where the config
00068    // file and other PROOF related files are (like motd and noproof files).
00069    // Loglevel is the log level (default = 1). User specified custom config
00070    // files will be first looked for in $HOME/.conffile.
00071 
00072    fUrl.SetUrl(url);
00073 
00074    // This may be needed during init
00075    fManager = mgr;
00076 
00077    // Default server type
00078    fServType = TProofMgr::kProofLite;
00079 
00080    // Default query mode
00081    fQueryMode = kSync;
00082 
00083    // Client and master are merged
00084    fMasterServ = kTRUE;
00085    SetBit(TProof::kIsClient);
00086    SetBit(TProof::kIsMaster);
00087 
00088    // Flag that we are a client
00089    if (!gSystem->Getenv("ROOTPROOFCLIENT")) gSystem->Setenv("ROOTPROOFCLIENT","");
00090    
00091    // Protocol and Host
00092    fUrl.SetProtocol("proof");
00093    fUrl.SetHost("__lite__");
00094    fUrl.SetPort(1093);
00095 
00096    // User
00097    if (strlen(fUrl.GetUser()) <= 0) {
00098       // Get user logon name
00099       UserGroup_t *pw = gSystem->GetUserInfo();
00100       if (pw) {
00101          fUrl.SetUser(pw->fUser);
00102          delete pw;
00103       }
00104    }
00105    fMaster = gSystem->HostName();
00106 
00107    // Analysise the conffile field
00108    ParseConfigField(conffile);
00109 
00110    // Determine the number of workers giving priority to users request.
00111    // Otherwise use the system information, if available, or just start
00112    // the minimal number, i.e. 2 .
00113    if ((fNWorkers = GetNumberOfWorkers(url)) > 0) {
00114 
00115       Printf(" +++ Starting PROOF-Lite with %d workers +++", fNWorkers);
00116       // Init the session now
00117       Init(url, conffile, confdir, loglevel, alias);
00118    }
00119 
00120    // For final cleanup
00121    if (!gROOT->GetListOfProofs()->FindObject(this))
00122       gROOT->GetListOfProofs()->Add(this);
00123 
00124    // Still needed by the packetizers: needs to be changed
00125    gProof = this;
00126 }
00127 
00128 //______________________________________________________________________________
00129 Int_t TProofLite::Init(const char *, const char *conffile,
00130                        const char *confdir, Int_t loglevel, const char *)
00131 {
00132    // Start the PROOF environment. Starting PROOF involves either connecting
00133    // to a master server, which in turn will start a set of slave servers, or
00134    // directly starting as master server (if master = ""). For a description
00135    // of the arguments see the TProof ctor. Returns the number of started
00136    // master or slave servers, returns 0 in case of error, in which case
00137    // fValid remains false.
00138 
00139    R__ASSERT(gSystem);
00140 
00141    fValid = kFALSE;
00142 
00143    if (TestBit(TProof::kIsMaster)) {
00144       // Fill default conf file and conf dir
00145       if (!conffile || strlen(conffile) == 0)
00146          fConfFile = kPROOF_ConfFile;
00147       if (!confdir  || strlen(confdir) == 0)
00148          fConfDir  = kPROOF_ConfDir;
00149    } else {
00150       fConfDir     = confdir;
00151       fConfFile    = conffile;
00152    }
00153 
00154    // The sandbox for this session
00155    if (CreateSandbox() != 0) {
00156       Error("Init", "could not create/assert sandbox for this session");
00157       return 0;
00158    }
00159 
00160    // UNIX path for communication with workers
00161    TString sockpathdir = gEnv->GetValue("ProofLite.SockPathDir", gSystem->TempDirectory());
00162    if (sockpathdir.IsNull()) sockpathdir = gSystem->TempDirectory();
00163    if (sockpathdir(sockpathdir.Length()-1) == '/') sockpathdir.Remove(sockpathdir.Length()-1);
00164    fSockPath.Form("%s/plite-%d", sockpathdir.Data(), gSystem->GetPid());
00165    if (fSockPath.Length() > 104) {
00166       // Sort of hardcoded limit length for Unix systems
00167       Error("Init", "Unix socket path '%s' is too long (%d bytes):",
00168                     fSockPath.Data(), fSockPath.Length());
00169       Error("Init", "use 'ProofLite.SockPathDir' to create it under a directory different"
00170                     " from '%s'", sockpathdir.Data());
00171       return 0;
00172    }
00173 
00174    fLogLevel       = loglevel;
00175    fProtocol       = kPROOF_Protocol;
00176    fSendGroupView  = kTRUE;
00177    fImage          = "<local>";
00178    fIntHandler     = 0;
00179    fStatus         = 0;
00180    fRecvMessages   = new TList;
00181    fRecvMessages->SetOwner(kTRUE);
00182    fSlaveInfo      = 0;
00183    fChains         = new TList;
00184    fAvailablePackages = 0;
00185    fEnabledPackages = 0;
00186    fEndMaster      = TestBit(TProof::kIsMaster) ? kTRUE : kFALSE;
00187    fInputData      = 0;
00188    ResetBit(TProof::kNewInputData);
00189 
00190    // Timeout for some collect actions
00191    fCollectTimeout = gEnv->GetValue("Proof.CollectTimeout", -1);
00192 
00193    fProgressDialog        = 0;
00194    fProgressDialogStarted = kFALSE;
00195 
00196    // Client logging of messages from the workers
00197    fRedirLog = kFALSE;
00198    if (TestBit(TProof::kIsClient)) {
00199       fLogFileName = Form("%s/session-%s.log", fWorkDir.Data(), GetName());
00200       if ((fLogFileW = fopen(fLogFileName.Data(), "w")) == 0)
00201          Error("Init", "could not create temporary logfile %s", fLogFileName.Data());
00202       if ((fLogFileR = fopen(fLogFileName.Data(), "r")) == 0)
00203          Error("Init", "could not open logfile %s for reading", fLogFileName.Data());
00204    }
00205    fLogToWindowOnly = kFALSE;
00206 
00207    fCacheLock = new TProofLockPath(TString::Format("%s/%s%s", gSystem->TempDirectory(),
00208                                    kPROOF_CacheLockFile,
00209                                    TString(fCacheDir).ReplaceAll("/","%").Data()));
00210 
00211    // Create 'queries' locker instance and lock it
00212    fQueryLock = new TProofLockPath(TString::Format("%s/%s%s-%s", gSystem->TempDirectory(),
00213                                    kPROOF_QueryLockFile, GetName(),
00214                                    TString(fQueryDir).ReplaceAll("/","%").Data()));
00215    fQueryLock->Lock();
00216    // Create the query manager
00217    fQMgr = new TQueryResultManager(fQueryDir, GetName(), fWorkDir,
00218                                    fQueryLock, fLogFileW);
00219 
00220    // Apply quotas, if any
00221    if (fQMgr && fQMgr->ApplyMaxQueries(10) != 0)
00222       Warning("Init", "problems applying fMaxQueries");
00223 
00224    if (InitDataSetManager() != 0)
00225       Warning("Init", "problems initializing the dataset manager");
00226 
00227    // Status of cluster
00228    fNotIdle = 0;
00229 
00230    // Query type
00231    fSync = kTRUE;
00232 
00233    // List of queries
00234    fQueries = 0;
00235    fOtherQueries = 0;
00236    fDrawQueries = 0;
00237    fMaxDrawQueries = 1;
00238    fSeqNum = 0;
00239 
00240    // Remote ID of the session
00241    fSessionID = -1;
00242 
00243    // Part of active query
00244    fWaitingSlaves = 0;
00245 
00246    // Make remote PROOF player
00247    fPlayer = 0;
00248    MakePlayer("lite");
00249 
00250    fFeedback = new TList;
00251    fFeedback->SetOwner();
00252    fFeedback->SetName("FeedbackList");
00253    AddInput(fFeedback);
00254 
00255    // Sort workers by descending performance index
00256    fSlaves           = new TSortedList(kSortDescending);
00257    fActiveSlaves     = new TList;
00258    fInactiveSlaves   = new TList;
00259    fUniqueSlaves     = new TList;
00260    fAllUniqueSlaves  = new TList;
00261    fNonUniqueMasters = new TList;
00262    fBadSlaves        = new TList;
00263    fAllMonitor       = new TMonitor;
00264    fActiveMonitor    = new TMonitor;
00265    fUniqueMonitor    = new TMonitor;
00266    fAllUniqueMonitor = new TMonitor;
00267    fCurrentMonitor   = 0;
00268    fServSock         = 0;
00269 
00270    // Control how to start the workers; copy-on-write (fork) is *very*
00271    // experimental and available on Unix only.
00272    fForkStartup      = kFALSE;
00273    if (gEnv->GetValue("ProofLite.ForkStartup", 0) != 0) {
00274 #ifndef WIN32
00275       fForkStartup   = kTRUE;
00276 #else
00277       Warning("Init", "fork-based workers startup is not available on Windows - ignoring");
00278 #endif
00279    }
00280 
00281    fPackageLock             = 0;
00282    fEnabledPackagesOnClient = 0;
00283    fLoadedMacros            = 0;
00284    fGlobalPackageDirList    = 0;
00285    if (TestBit(TProof::kIsClient)) {
00286 
00287       // List of directories where to look for global packages
00288       TString globpack = gEnv->GetValue("Proof.GlobalPackageDirs","");
00289       if (globpack.Length() > 0) {
00290          Int_t ng = 0;
00291          Int_t from = 0;
00292          TString ldir;
00293          while (globpack.Tokenize(ldir, from, ":")) {
00294             if (gSystem->AccessPathName(ldir, kReadPermission)) {
00295                Warning("Init", "directory for global packages %s does not"
00296                                " exist or is not readable", ldir.Data());
00297             } else {
00298                // Add to the list, key will be "G<ng>", i.e. "G0", "G1", ...
00299                TString key = Form("G%d", ng++);
00300                if (!fGlobalPackageDirList) {
00301                   fGlobalPackageDirList = new THashList();
00302                   fGlobalPackageDirList->SetOwner();
00303                }
00304                fGlobalPackageDirList->Add(new TNamed(key,ldir));
00305             }
00306          }
00307       }
00308 
00309       TString lockpath(fPackageDir);
00310       lockpath.ReplaceAll("/", "%");
00311       lockpath.Insert(0, TString::Format("%s/%s", gSystem->TempDirectory(), kPROOF_PackageLockFile));
00312       fPackageLock = new TProofLockPath(lockpath.Data());
00313 
00314       fEnabledPackagesOnClient = new TList;
00315       fEnabledPackagesOnClient->SetOwner();
00316    }
00317 
00318    // Start workers
00319    if (SetupWorkers(0) != 0) {
00320       Error("Init", "problems setting up workers");
00321       return 0;
00322    }
00323 
00324    // we are now properly initialized
00325    fValid = kTRUE;
00326 
00327    // De-activate monitor (will be activated in Collect)
00328    fAllMonitor->DeActivateAll();
00329 
00330    // By default go into parallel mode
00331    GoParallel(9999, kFALSE);
00332 
00333    // Send relevant initial state to slaves
00334    SendInitialState();
00335 
00336    SetActive(kFALSE);
00337 
00338    if (IsValid()) {
00339       // Activate input handler
00340       ActivateAsyncInput();
00341       // Set PROOF to running state
00342       SetRunStatus(TProof::kRunning);
00343    }
00344    // We register the session as a socket so that cleanup is done properly
00345    R__LOCKGUARD2(gROOTMutex);
00346    gROOT->GetListOfSockets()->Add(this);
00347 
00348    return fActiveSlaves->GetSize();
00349 }
00350 //______________________________________________________________________________
00351 TProofLite::~TProofLite()
00352 {
00353    // Destructor
00354 
00355    // Shutdown the workers
00356    RemoveWorkers(0);
00357 
00358    if (!(fQMgr && fQMgr->Queries() && fQMgr->Queries()->GetSize())) {
00359       // needed in case fQueryDir is on NFS ?!
00360       gSystem->MakeDirectory(fQueryDir+"/.delete");
00361       gSystem->Exec(Form("%s %s", kRM, fQueryDir.Data()));
00362    }
00363 
00364    // Remove lock file
00365    if (fQueryLock) {
00366       gSystem->Unlink(fQueryLock->GetName());
00367       fQueryLock->Unlock();
00368    }
00369 
00370    // Cleanup the socket
00371    SafeDelete(fServSock);
00372    gSystem->Unlink(fSockPath);
00373 }
00374 
00375 //______________________________________________________________________________
00376 Int_t TProofLite::GetNumberOfWorkers(const char *url)
00377 {
00378    // Static method to determine the number of workers giving priority to users request.
00379    // Otherwise use the system information, if available, or just start
00380    // the minimal number, i.e. 2 .
00381 
00382    Bool_t notify = kFALSE;
00383    if (fgWrksMax == -2) {
00384       // Find the max number of workers, if any
00385       TString sysname = "system.rootrc";
00386 #ifdef ROOTETCDIR
00387       char *s = gSystem->ConcatFileName(ROOTETCDIR, sysname);
00388 #else
00389       TString etc = gRootDir;
00390 #ifdef WIN32
00391       etc += "\\etc";
00392 #else
00393       etc += "/etc";
00394 #endif
00395       char *s = gSystem->ConcatFileName(etc, sysname);
00396 #endif
00397       TEnv sysenv(0);
00398       sysenv.ReadFile(s, kEnvGlobal);
00399       fgWrksMax = sysenv.GetValue("ProofLite.MaxWorkers", -1);
00400       // Notify once the user if its will is changed
00401       notify = kTRUE;
00402       if (s) delete[] s;
00403    }
00404    if (fgWrksMax == 0) {
00405       ::Error("TProofLite::GetNumberOfWorkers",
00406               "PROOF-Lite disabled by the system administrator: sorry!");
00407       return 0;
00408    }
00409 
00410    Int_t nWorkers = -1;
00411    if (url && strlen(url)) {
00412       TString o(url);
00413       Int_t in = o.Index("workers=");
00414       if (in != kNPOS) {
00415          o.Remove(0, in + strlen("workers="));
00416          while (!o.IsDigit())
00417             o.Remove(o.Length()-1);
00418          nWorkers = (!o.IsNull()) ? o.Atoi() : nWorkers;
00419       }
00420    }
00421    if (nWorkers <= 0) {
00422       nWorkers = gEnv->GetValue("ProofLite.Workers", -1);
00423       if (nWorkers <= 0) {
00424          SysInfo_t si;
00425          if (gSystem->GetSysInfo(&si) == 0 && si.fCpus > 2) {
00426             nWorkers = si.fCpus;
00427          } else {
00428             // Two workers by default
00429             nWorkers = 2;
00430          }
00431          if (notify) notify = kFALSE;
00432       }
00433    }
00434    // Apply the max, if any
00435    if (fgWrksMax > 0 && fgWrksMax < nWorkers) {
00436       if (notify)
00437          ::Warning("TProofLite::GetNumberOfWorkers", "number of PROOF-Lite workers limited by"
00438                                                      " the system administrator to %d", fgWrksMax);
00439       nWorkers = fgWrksMax;
00440    }
00441 
00442    // Done
00443    return nWorkers;
00444 }
00445 
00446 //______________________________________________________________________________
00447 Int_t TProofLite::SetupWorkers(Int_t opt, TList *startedWorkers)
00448 {
00449    // Start up PROOF workers.
00450 
00451    // Create server socket on the assigned UNIX sock path
00452    if (!fServSock) {
00453       if ((fServSock = new TServerSocket(fSockPath))) {
00454          R__LOCKGUARD2(gROOTMutex);
00455          // Remove from the list so that cleanup can be done in the correct order
00456          gROOT->GetListOfSockets()->Remove(fServSock);
00457       }
00458    }
00459    if (!fServSock || !fServSock->IsValid()) {
00460       Error("SetupWorkers",
00461             "unable to create server socket for internal communications");
00462       SetBit(kInvalidObject);
00463       return -1;
00464    }
00465 
00466    // Create a monitor and add the socket to it
00467    TMonitor *mon = new TMonitor;
00468    mon->Add(fServSock);
00469 
00470    TList started;
00471    TSlave *wrk = 0;
00472    Int_t nWrksDone = 0, nWrksTot = -1;
00473    TString fullord;
00474 
00475    if (opt == 0) {
00476       nWrksTot = fForkStartup ? 1 : fNWorkers;
00477       // Now we create the worker applications which will call us back to finalize
00478       // the setup
00479       Int_t ord = 0;
00480       for (; ord < nWrksTot; ord++) {
00481 
00482          // Ordinal for this worker server
00483          fullord = Form("0.%d", ord);
00484 
00485          // Create environment files
00486          SetProofServEnv(fullord);
00487 
00488          // Create worker server and add to the list
00489          if ((wrk = CreateSlave("lite", fullord, 100, fImage, fWorkDir)))
00490             started.Add(wrk);
00491 
00492          // Notify
00493          NotifyStartUp("Opening connections to workers", ++nWrksDone, nWrksTot);
00494 
00495       } //end of worker loop
00496    } else {
00497       if (!fForkStartup) {
00498          Warning("SetupWorkers", "standard startup: workers already started");
00499          return -1;
00500       }
00501       nWrksTot = fNWorkers - 1;
00502       // Now we create the worker applications which will call us back to finalize
00503       // the setup
00504       TString clones;
00505       Int_t ord = 0;
00506       for (; ord < nWrksTot; ord++) {
00507 
00508          // Ordinal for this worker server
00509          fullord.Form("0.%d", ord + 1);
00510          if (!clones.IsNull()) clones += " ";
00511          clones += fullord;
00512 
00513          // Create worker server and add to the list
00514          if ((wrk = CreateSlave("lite", fullord, -1, fImage, fWorkDir)))
00515             started.Add(wrk);
00516 
00517          // Notify
00518          NotifyStartUp("Opening connections to workers", ++nWrksDone, nWrksTot);
00519 
00520       } //end of worker loop
00521 
00522       // Send the request
00523       TMessage m(kPROOF_FORK);
00524       m << clones;
00525       Broadcast(m, kActive);
00526    }
00527 
00528    // Wait for call backs
00529    nWrksDone = 0;
00530    nWrksTot = started.GetSize();
00531    Int_t nSelects = 0;
00532    Int_t to = gEnv->GetValue("ProofLite.StartupTimeOut", 5) * 1000;
00533    while (started.GetSize() > 0 && nSelects < nWrksTot) {
00534 
00535       // Wait for activity on the socket for max 5 secs
00536       TSocket *xs = mon->Select(to);
00537 
00538       // Count attempts and check
00539       nSelects++;
00540       if (xs == (TSocket *) -1) continue;
00541 
00542       // Get the connection
00543       TSocket *s = fServSock->Accept();
00544       if (s && s->IsValid()) {
00545          // Receive ordinal
00546          TMessage *msg = 0;
00547          s->Recv(msg);
00548          if (msg) {
00549             TString ord;
00550             *msg >> ord;
00551             // Find who is calling back
00552             if ((wrk = (TSlave *) started.FindObject(ord))) {
00553                // Remove it from the started list
00554                started.Remove(wrk);
00555 
00556                // Assign tis socket the selected worker
00557                wrk->SetSocket(s);
00558                // Remove socket from global TROOT socket list. Only the TProof object,
00559                // representing all worker sockets, will be added to this list. This will
00560                // ensure the correct termination of all proof servers in case the
00561                // root session terminates.
00562                {  R__LOCKGUARD2(gROOTMutex);
00563                   gROOT->GetListOfSockets()->Remove(s);
00564                }
00565                if (wrk->IsValid()) {
00566                   // Set the input handler
00567                   wrk->SetInputHandler(new TProofInputHandler(this, wrk->GetSocket()));
00568                   // Set fParallel to 1 for workers since they do not
00569                   // report their fParallel with a LOG_DONE message
00570                   wrk->fParallel = 1;
00571                   // Finalize setup of the server
00572                   wrk->SetupServ(TSlave::kSlave, 0);
00573                }
00574 
00575                // Monitor good workers
00576                if (wrk->IsValid()) {
00577                   fSlaves->Add(wrk);
00578                   if (opt == 1) fActiveSlaves->Add(wrk);
00579                   fAllMonitor->Add(wrk->GetSocket());
00580                   // Record also in the list for termination
00581                   if (startedWorkers) startedWorkers->Add(wrk);
00582                   // Notify startup operations
00583                   NotifyStartUp("Setting up worker servers", ++nWrksDone, nWrksTot);
00584                } else {
00585                   // Flag as bad
00586                   fBadSlaves->Add(wrk);
00587                }
00588             }
00589          }
00590       }
00591    }
00592 
00593    // Cleanup the monitor and the server socket
00594    mon->DeActivateAll();
00595    delete mon;
00596 
00597    // Create Progress dialog, if needed
00598    if (!gROOT->IsBatch() && !fProgressDialog) {
00599       if ((fProgressDialog =
00600          gROOT->GetPluginManager()->FindHandler("TProofProgressDialog")))
00601          if (fProgressDialog->LoadPlugin() == -1)
00602             fProgressDialog = 0;
00603    }
00604 
00605    if (opt == 1) {
00606       // Collect replies
00607       Collect(kActive);
00608       // Update group view
00609       SendGroupView();
00610       // By default go into parallel mode
00611       SetParallel(9999, 0);
00612    }
00613    // Done
00614    return 0;
00615 }
00616 
00617 //______________________________________________________________________________
00618 void TProofLite::NotifyStartUp(const char *action, Int_t done, Int_t tot)
00619 {
00620    // Notify setting-up operation message
00621 
00622    Int_t frac = (Int_t) (done*100.)/tot;
00623    char msg[512] = {0};
00624    if (frac >= 100) {
00625       snprintf(msg, 512, "%s: OK (%d workers)                 \n",
00626                    action, tot);
00627    } else {
00628       snprintf(msg, 512, "%s: %d out of %d (%d %%)\r",
00629                    action, done, tot, frac);
00630    }
00631    fprintf(stderr,"%s", msg);
00632 }
00633 
00634 //______________________________________________________________________________
00635 Int_t TProofLite::SetProofServEnv(const char *ord)
00636 {
00637    // Create environment files for worker 'ord'
00638 
00639    // Check input
00640    if (!ord || strlen(ord) <= 0) {
00641       Error("SetProofServEnv", "ordinal string undefined");
00642       return -1;
00643    }
00644 
00645    // ROOT env file
00646    TString rcfile(Form("%s/worker-%s.rootrc", fWorkDir.Data(), ord));
00647    FILE *frc = fopen(rcfile.Data(), "w");
00648    if (!frc) {
00649       Error("SetProofServEnv", "cannot open rc file %s", rcfile.Data());
00650       return -1;
00651    }
00652 
00653    // The session working dir depends on the role
00654    fprintf(frc,"# The session working dir\n");
00655    fprintf(frc,"ProofServ.SessionDir: %s/worker-%s\n", fWorkDir.Data(), ord);
00656 
00657    // The session unique tag
00658    fprintf(frc,"# Session tag\n");
00659    fprintf(frc,"ProofServ.SessionTag: %s\n", GetName());
00660 
00661    // Log / Debug level
00662    fprintf(frc,"# Proof Log/Debug level\n");
00663    fprintf(frc,"Proof.DebugLevel: %d\n", gDebug);
00664 
00665    // Ordinal number
00666    fprintf(frc,"# Ordinal number\n");
00667    fprintf(frc,"ProofServ.Ordinal: %s\n", ord);
00668 
00669    // ROOT Version tag
00670    fprintf(frc,"# ROOT Version tag\n");
00671    fprintf(frc,"ProofServ.RootVersionTag: %s\n", gROOT->GetVersion());
00672 
00673    // Work dir
00674    TString sandbox = gEnv->GetValue("ProofServ.Sandbox", fSandbox);
00675    gSystem->ExpandPathName(sandbox);
00676    fprintf(frc,"# Users sandbox\n");
00677    fprintf(frc, "ProofServ.Sandbox: %s\n", sandbox.Data());
00678 
00679    // Cache dir
00680    fprintf(frc,"# Users cache\n");
00681    fprintf(frc, "ProofServ.CacheDir: %s\n", fCacheDir.Data());
00682 
00683    // Package dir
00684    fprintf(frc,"# Users packages\n");
00685    fprintf(frc, "ProofServ.PackageDir: %s\n", fPackageDir.Data());
00686 
00687    // Image
00688    fprintf(frc,"# Server image\n");
00689    fprintf(frc, "ProofServ.Image: %s\n", fImage.Data());
00690 
00691    // Set Open socket
00692    fprintf(frc,"# Open socket\n");
00693    fprintf(frc, "ProofServ.OpenSock: %s\n", fSockPath.Data());
00694 
00695    // Client Protocol
00696    fprintf(frc,"# Client Protocol\n");
00697    fprintf(frc, "ProofServ.ClientVersion: %d\n", kPROOF_Protocol);
00698 
00699    // ROOT env file created
00700    fclose(frc);
00701 
00702    // System env file
00703    TString envfile(Form("%s/worker-%s.env", fWorkDir.Data(), ord));
00704    FILE *fenv = fopen(envfile.Data(), "w");
00705    if (!fenv) {
00706       Error("SetProofServEnv", "cannot open env file %s", envfile.Data());
00707       return -1;
00708    }
00709    // ROOTSYS
00710 #ifdef R__HAVE_CONFIG
00711    fprintf(fenv, "export ROOTSYS=%s\n", ROOTPREFIX);
00712 #else
00713    fprintf(fenv, "export ROOTSYS=%s\n", gSystem->Getenv("ROOTSYS"));
00714 #endif
00715    // Conf dir
00716 #ifdef R__HAVE_CONFIG
00717    fprintf(fenv, "export ROOTCONFDIR=%s\n", ROOTETCDIR);
00718 #else
00719    fprintf(fenv, "export ROOTCONFDIR=%s\n", gSystem->Getenv("ROOTSYS"));
00720 #endif
00721    // TMPDIR
00722    fprintf(fenv, "export TMPDIR=%s\n", gSystem->TempDirectory());
00723    // Log file in the log dir
00724    TString logfile(Form("%s/worker-%s.log", fWorkDir.Data(), ord));
00725    fprintf(fenv, "export ROOTPROOFLOGFILE=%s\n", logfile.Data());
00726    // RC file
00727    fprintf(fenv, "export ROOTRCFILE=%s\n", rcfile.Data());
00728    // ROOT version tag (needed in building packages)
00729    fprintf(fenv, "export ROOTVERSIONTAG=%s\n", gROOT->GetVersion());
00730    // This flag can be used to identify the type of worker; for example, in BUILD.sh or SETUP.C ...
00731    fprintf(fenv, "export ROOTPROOFLITE=%d\n", fNWorkers);
00732    // Set the user envs
00733    if (fgProofEnvList) {
00734       TString namelist;
00735       TIter nxenv(fgProofEnvList);
00736       TNamed *env = 0;
00737       while ((env = (TNamed *)nxenv())) {
00738          TString senv(env->GetTitle());
00739          ResolveKeywords(senv, logfile.Data());
00740          fprintf(fenv, "export %s=%s\n", env->GetName(), senv.Data());
00741          if (namelist.Length() > 0)
00742             namelist += ',';
00743          namelist += env->GetName();
00744       }
00745       fprintf(fenv, "export PROOF_ALLVARS=%s\n", namelist.Data());
00746    }
00747 
00748    // System env file created
00749    fclose(fenv);
00750 
00751    // Done
00752    return 0;
00753 }
00754 
00755 //__________________________________________________________________________
00756 void TProofLite::ResolveKeywords(TString &s, const char *logfile)
00757 {
00758    // Resolve some keywords in 's'
00759    //    <logfileroot>, <user>, <rootsys>
00760 
00761    if (!logfile) return;
00762 
00763    // Log file
00764    if (s.Contains("<logfilewrk>") && logfile) {
00765       TString lfr(logfile);
00766       if (lfr.EndsWith(".log")) lfr.Remove(lfr.Last('.'));
00767       s.ReplaceAll("<logfilewrk>", lfr.Data());
00768    }
00769 
00770    // user
00771    if (gSystem->Getenv("USER") && s.Contains("<user>")) {
00772       s.ReplaceAll("<user>", gSystem->Getenv("USER"));
00773    }
00774 
00775    // rootsys
00776    if (gSystem->Getenv("ROOTSYS") && s.Contains("<rootsys>")) {
00777       s.ReplaceAll("<rootsys>", gSystem->Getenv("ROOTSYS"));
00778    }
00779 }
00780 
00781 //______________________________________________________________________________
00782 Int_t TProofLite::CreateSandbox()
00783 {
00784    // Create the sandbox for this session
00785 
00786    // Make sure the sandbox area exist and is writable
00787    fSandbox = gEnv->GetValue("ProofLite.Sandbox", "");
00788    if (fSandbox.IsNull())
00789       fSandbox = gEnv->GetValue("Proof.Sandbox", TString::Format("~/%s", kPROOF_WorkDir));
00790    gSystem->ExpandPathName(fSandbox);
00791    if (AssertPath(fSandbox, kTRUE) != 0) return -1;
00792 
00793    // Package Dir
00794    fPackageDir = gEnv->GetValue("Proof.PackageDir", "");
00795    if (fPackageDir.IsNull())
00796       fPackageDir.Form("%s/%s", fSandbox.Data(), kPROOF_PackDir);
00797    if (AssertPath(fPackageDir, kTRUE) != 0) return -1;
00798 
00799    // Cache Dir
00800    fCacheDir = gEnv->GetValue("Proof.CacheDir", "");
00801    if (fCacheDir.IsNull())
00802       fCacheDir.Form("%s/%s", fSandbox.Data(), kPROOF_CacheDir);
00803    if (AssertPath(fCacheDir, kTRUE) != 0) return -1;
00804 
00805    // Data Set Dir
00806    fDataSetDir = gEnv->GetValue("Proof.DataSetDir", "");
00807    if (fDataSetDir.IsNull())
00808       fDataSetDir.Form("%s/%s", fSandbox.Data(), kPROOF_DataSetDir);
00809    if (AssertPath(fDataSetDir, kTRUE) != 0) return -1;
00810 
00811    // Session unique tag (name of this TProof instance)
00812    TString stag;
00813    stag.Form("%s-%d-%d", gSystem->HostName(), (int)time(0), gSystem->GetPid());
00814    SetName(stag.Data());
00815 
00816    // Subpath for this session in the fSandbox (<sandbox>/path-to-working-dir)
00817    TString sessdir(gSystem->WorkingDirectory());
00818    sessdir.ReplaceAll(gSystem->HomeDirectory(),"");
00819    sessdir.ReplaceAll("/","-");
00820    sessdir.Replace(0,1,"/",1);
00821    sessdir.Insert(0, fSandbox.Data());
00822 
00823    // Session working and queries dir
00824    fWorkDir.Form("%s/session-%s", sessdir.Data(), stag.Data());
00825    if (AssertPath(fWorkDir, kTRUE) != 0) return -1;
00826 
00827    // Create symlink to the last session
00828    TString lastsess;
00829    lastsess.Form("%s/last-lite-session", sessdir.Data());
00830    gSystem->Unlink(lastsess);
00831    gSystem->Symlink(fWorkDir, lastsess);
00832 
00833    // Queries Dir: local to the working dir, unless required differently
00834    fQueryDir = gEnv->GetValue("Proof.QueryDir", "");
00835    if (fQueryDir.IsNull())
00836       fQueryDir.Form("%s/%s", sessdir.Data(), kPROOF_QueryDir);
00837    if (AssertPath(fQueryDir, kTRUE) != 0) return -1;
00838 
00839    // Cleanup old sessions dirs
00840    CleanupSandbox();
00841 
00842    // Done
00843    return 0;
00844 }
00845 
00846 //______________________________________________________________________________
00847 void TProofLite::Print(Option_t *option) const
00848 {
00849    // Print status of PROOF-Lite cluster.
00850 
00851    if (IsParallel())
00852       Printf("*** PROOF-Lite cluster (parallel mode, %d workers):", GetParallel());
00853    else
00854       Printf("*** PROOF-Lite cluster (sequential mode)");
00855 
00856    Printf("Host name:                  %s", gSystem->HostName());
00857    Printf("User:                       %s", GetUser());
00858    TString ver(gROOT->GetVersion());
00859    if (gROOT->GetSvnRevision() > 0)
00860       ver += Form("|r%d", gROOT->GetSvnRevision());
00861    if (gSystem->Getenv("ROOTVERSIONTAG"))
00862       ver += Form("|%s", gSystem->Getenv("ROOTVERSIONTAG"));
00863    Printf("ROOT version|rev|tag:       %s", ver.Data());
00864    Printf("Architecture-Compiler:      %s-%s", gSystem->GetBuildArch(),
00865                                                gSystem->GetBuildCompilerVersion());
00866    Printf("Protocol version:           %d", GetClientProtocol());
00867    Printf("Working directory:          %s", gSystem->WorkingDirectory());
00868    Printf("Communication path:         %s", fSockPath.Data());
00869    Printf("Log level:                  %d", GetLogLevel());
00870    Printf("Number of workers:          %d", GetNumberOfSlaves());
00871    Printf("Number of active workers:   %d", GetNumberOfActiveSlaves());
00872    Printf("Number of unique workers:   %d", GetNumberOfUniqueSlaves());
00873    Printf("Number of inactive workers: %d", GetNumberOfInactiveSlaves());
00874    Printf("Number of bad workers:      %d", GetNumberOfBadSlaves());
00875    Printf("Total MB's processed:       %.2f", float(GetBytesRead())/(1024*1024));
00876    Printf("Total real time used (s):   %.3f", GetRealTime());
00877    Printf("Total CPU time used (s):    %.3f", GetCpuTime());
00878    if (TString(option).Contains("a", TString::kIgnoreCase) && GetNumberOfSlaves()) {
00879       Printf("List of workers:");
00880       TIter nextslave(fSlaves);
00881       while (TSlave* sl = dynamic_cast<TSlave*>(nextslave())) {
00882          if (sl->IsValid())
00883             sl->Print(option);
00884       }
00885    }
00886 }
00887 
00888 //______________________________________________________________________________
00889 TProofQueryResult *TProofLite::MakeQueryResult(Long64_t nent, const char *opt,
00890                                                Long64_t fst, TDSet *dset,
00891                                                const char *selec)
00892 {
00893    // Create a TProofQueryResult instance for this query.
00894 
00895    // Increment sequential number
00896    Int_t seqnum = -1;
00897    if (fQMgr) {
00898       fQMgr->IncrementSeqNum();
00899       seqnum = fQMgr->SeqNum();
00900    }
00901 
00902    // Create the instance and add it to the list
00903    TProofQueryResult *pqr = new TProofQueryResult(seqnum, opt,
00904                                                   fPlayer->GetInputList(), nent,
00905                                                   fst, dset, selec,
00906                                                   (dset ? dset->GetEntryList() : 0));
00907    // Title is the session identifier
00908    pqr->SetTitle(GetName());
00909 
00910    return pqr;
00911 }
00912 
00913 //______________________________________________________________________________
00914 void TProofLite::SetQueryRunning(TProofQueryResult *pq)
00915 {
00916    // Set query in running state.
00917 
00918    // Record current position in the log file at start
00919    fflush(fLogFileW);
00920    Int_t startlog = lseek(fileno(fLogFileW), (off_t) 0, SEEK_END);
00921 
00922    // Add some header to logs
00923    Printf(" ");
00924    Info("SetQueryRunning", "starting query: %d", pq->GetSeqNum());
00925 
00926    // Build the list of loaded PAR packages
00927    TString parlist = "";
00928    TIter nxp(fEnabledPackagesOnClient);
00929    TObjString *os= 0;
00930    while ((os = (TObjString *)nxp())) {
00931       if (parlist.Length() <= 0)
00932          parlist = os->GetName();
00933       else
00934          parlist += Form(";%s",os->GetName());
00935    }
00936 
00937    // Set in running state
00938    pq->SetRunning(startlog, parlist, GetParallel());
00939 
00940    // Bytes and CPU at start (we will calculate the differential at end)
00941    pq->SetProcessInfo(pq->GetEntries(), GetCpuTime(), GetBytesRead());
00942 }
00943 
00944 //______________________________________________________________________________
00945 Long64_t TProofLite::DrawSelect(TDSet *dset, const char *varexp,
00946                                 const char *selection, Option_t *option,
00947                                 Long64_t nentries, Long64_t first)
00948 {
00949    // Execute the specified drawing action on a data set (TDSet).
00950    // Event- or Entry-lists should be set in the data set object using
00951    // TDSet::SetEntryList.
00952    // Returns -1 in case of error or number of selected events otherwise.
00953 
00954    if (!IsValid()) return -1;
00955 
00956    // Make sure that asynchronous processing is not active
00957    if (!IsIdle()) {
00958       Info("DrawSelect","not idle, asynchronous Draw not supported");
00959       return -1;
00960    }
00961    TString opt(option);
00962    Int_t idx = opt.Index("ASYN", 0, TString::kIgnoreCase);
00963    if (idx != kNPOS)
00964       opt.Replace(idx,4,"");
00965 
00966    // Fill the internal variables
00967    fVarExp = varexp;
00968    fSelection = selection;
00969    
00970    return Process(dset, "draw:", opt, nentries, first);
00971 }
00972 
00973 //______________________________________________________________________________
00974 Long64_t TProofLite::Process(TDSet *dset, const char *selector, Option_t *option,
00975                              Long64_t nentries, Long64_t first)
00976 {
00977    // Process a data set (TDSet) using the specified selector (.C) file.
00978    // Entry- or event-lists should be set in the data set object using
00979    // TDSet::SetEntryList.
00980    // The return value is -1 in case of error and TSelector::GetStatus() in
00981    // in case of success.
00982 
00983    // For the time being cannot accept other queries if not idle, even if in async
00984    // mode; needs to set up an event handler to manage that
00985 
00986    // Resolve query mode
00987    fSync = (GetQueryMode(option) == kSync);
00988    if (!fSync) {
00989       Info("Process","asynchronous mode not yet supported in PROOF-Lite");
00990       return -1;
00991    }
00992 
00993    if (!IsIdle()) {
00994       // Notify submission
00995       Info("Process", "not idle: cannot accept queries");
00996       return -1;
00997    }
00998 
00999    // Cleanup old temporary datasets
01000    if (IsIdle() && fRunningDSets && fRunningDSets->GetSize() > 0) {
01001       fRunningDSets->SetOwner(kTRUE);
01002       fRunningDSets->Delete();
01003    }
01004 
01005    if (!IsValid() || !fQMgr || !fPlayer) {
01006       Error("Process", "invalid sesion or query-result manager undefined!");
01007       return -1;
01008    }
01009 
01010    // Make sure that all enabled workers get some work, unless stated
01011    // differently
01012    if (!fPlayer->GetInputList()->FindObject("PROOF_MaxSlavesPerNode"))
01013       SetParameter("PROOF_MaxSlavesPerNode", (Long_t)fNWorkers);
01014 
01015    Bool_t hasNoData = (!dset || (dset && dset->TestBit(TDSet::kEmpty))) ? kTRUE : kFALSE;
01016 
01017    // If just a name was given to identify the dataset, retrieve it from the
01018    // local files
01019    // Make sure the dataset contains the information needed
01020    if ((!hasNoData) && dset->GetListOfElements()->GetSize() == 0) {
01021       TString emsg;
01022       if (TProof::AssertDataSet(dset, fPlayer->GetInputList(), fDataSetManager, emsg) != 0) {
01023          Error("Process", "from AssertDataSet: %s", emsg.Data());
01024          return -1;
01025       }
01026       if (dset->GetListOfElements()->GetSize() == 0) {
01027          Error("Process", "no files to process!");
01028          return -1;
01029       }
01030    }
01031 
01032    TString selec(selector), varexp, selection, objname;
01033    // If a draw query, extract the relevant info
01034    if (selec.BeginsWith("draw:")) {
01035       varexp = fVarExp;
01036       selection = fSelection;
01037       // Decode now the expression
01038       if (fPlayer->GetDrawArgs(varexp, selection, option, selec, objname) != 0) {
01039          Error("Process", "draw query: error parsing arguments '%s', '%s', '%s'",
01040                           varexp.Data(), selection.Data(), option);
01041          return -1;
01042       }
01043    }
01044 
01045    // Create instance of query results (the data set is added after Process)
01046    TProofQueryResult *pq = MakeQueryResult(nentries, option, first, 0, selec);
01047 
01048    // If not a draw action add the query to the main list
01049    if (!(pq->IsDraw())) {
01050       if (fQMgr->Queries()) fQMgr->Queries()->Add(pq);
01051       // Also save it to queries dir
01052       fQMgr->SaveQuery(pq);
01053    }
01054 
01055    // Set the query number
01056    fSeqNum = pq->GetSeqNum();
01057 
01058    // Set in running state
01059    SetQueryRunning(pq);
01060 
01061    // Save to queries dir, if not standard draw
01062    if (!(pq->IsDraw()))
01063       fQMgr->SaveQuery(pq);
01064    else
01065       fQMgr->IncrementDrawQueries();
01066 
01067    // Start or reset the progress dialog
01068    if (!gROOT->IsBatch()) {
01069       Int_t dsz = (dset && dset->GetListOfElements()) ? dset->GetListOfElements()->GetSize() : -1;
01070       if (fProgressDialog &&
01071           !TestBit(kUsingSessionGui) && TestBit(kUseProgressDialog)) {
01072          if (!fProgressDialogStarted) {
01073             fProgressDialog->ExecPlugin(5, this, selec.Data(), dsz,
01074                                            first, nentries);
01075             fProgressDialogStarted = kTRUE;
01076          } else {
01077             ResetProgressDialog(selec.Data(), dsz, first, nentries);
01078          }
01079       }
01080       ResetBit(kUsingSessionGui);
01081    }
01082 
01083    // Add query results to the player lists
01084    if (!(pq->IsDraw()))
01085       fPlayer->AddQueryResult(pq);
01086 
01087    // Set query currently processed
01088    fPlayer->SetCurrentQuery(pq);
01089 
01090    // Make sure the unique query tag is available as TNamed object in the
01091    // input list so that it can be used in TSelectors for monitoring
01092    TNamed *qtag = (TNamed *) fPlayer->GetInputList()->FindObject("PROOF_QueryTag");
01093    if (qtag) {
01094       qtag->SetTitle(Form("%s:%s",pq->GetTitle(),pq->GetName()));
01095    } else {
01096       TObject *o = fPlayer->GetInputList()->FindObject("PROOF_QueryTag");
01097       if (o) fPlayer->GetInputList()->Remove(o);
01098       fPlayer->AddInput(new TNamed("PROOF_QueryTag",
01099                                    Form("%s:%s",pq->GetTitle(),pq->GetName())));
01100    }
01101 
01102    // Set PROOF to running state
01103    SetRunStatus(TProof::kRunning);
01104 
01105    // deactivate the default application interrupt handler
01106    // ctrl-c's will be forwarded to PROOF to stop the processing
01107    TSignalHandler *sh = 0;
01108    if (fSync) {
01109       if (gApplication)
01110          sh = gSystem->RemoveSignalHandler(gApplication->GetSignalHandler());
01111    }
01112 
01113    // Start the additional workers now if using fork-based startup
01114    TList *startedWorkers = 0;
01115    if (fForkStartup) {
01116       startedWorkers = new TList;
01117       startedWorkers->SetOwner(kFALSE);
01118       SetupWorkers(1, startedWorkers);
01119    }
01120 
01121    Long64_t rv = 0;
01122    if (!(pq->IsDraw())) {
01123       rv = fPlayer->Process(dset, selec, option, nentries, first);
01124    } else {
01125       rv = fPlayer->DrawSelect(dset, varexp, selection, option, nentries, first);
01126    }
01127 
01128    if (fSync) {
01129 
01130       // Terminate additional workers if using fork-based startup
01131       if (fForkStartup && startedWorkers) {
01132          RemoveWorkers(startedWorkers);
01133          SafeDelete(startedWorkers);
01134       }
01135 
01136       // reactivate the default application interrupt handler
01137       if (sh)
01138          gSystem->AddSignalHandler(sh);
01139 
01140       // Return number of events processed
01141       if (fPlayer->GetExitStatus() != TVirtualProofPlayer::kFinished) {
01142          Bool_t abort = (fPlayer->GetExitStatus() == TVirtualProofPlayer::kAborted)
01143                      ? kTRUE : kFALSE;
01144          if (abort) fPlayer->StopProcess(kTRUE);
01145          Emit("StopProcess(Bool_t)", abort);
01146       }
01147 
01148       // In PROOFLite this has to be done once only in TProofLite::Process
01149       pq->SetOutputList(fPlayer->GetOutputList(), kFALSE);
01150       // If the last object, notify the GUI that the result arrived
01151       QueryResultReady(Form("%s:%s", pq->GetTitle(), pq->GetName()));
01152       // Processing is over
01153       UpdateDialog();
01154 
01155       // Save the data set into the TQueryResult (should be done after Process to avoid
01156       // improper deletion during collection)
01157       if (rv == 0 && dset && pq->GetInputList()) {
01158          pq->GetInputList()->Add(dset);
01159          if (dset->GetEntryList())
01160             pq->GetInputList()->Add(dset->GetEntryList());
01161       }
01162 
01163       // Register any dataset produced during this processing, if required
01164       if (fDataSetManager && fPlayer->GetOutputList()) {
01165          TNamed *psr = (TNamed *) fPlayer->GetOutputList()->FindObject("PROOFSERV_RegisterDataSet");
01166          if (psr) {
01167             if (RegisterDataSets(fPlayer->GetInputList(), fPlayer->GetOutputList()) != 0)
01168                Warning("ProcessNext", "problems registering produced datasets");
01169             fPlayer->GetOutputList()->Remove(psr);
01170             delete psr;
01171          }
01172       }
01173 
01174       // Complete filling of the TQueryResult instance
01175       AskStatistics();
01176       if (!(pq->IsDraw())) {
01177          if (fQMgr->FinalizeQuery(pq, this, fPlayer)) {
01178             // Automatic saving is controlled by ProofLite.AutoSaveQueries
01179             if (!strcmp(gEnv->GetValue("ProofLite.AutoSaveQueries", "off"), "on"))
01180                fQMgr->SaveQuery(pq, -1);
01181          }
01182       }
01183 
01184       // Remove aborted queries from the list
01185       if (fPlayer && fPlayer->GetExitStatus() == TVirtualProofPlayer::kAborted) {
01186          if (fPlayer->GetListOfResults()) fPlayer->GetListOfResults()->Remove(pq);
01187          if (fQMgr) fQMgr->RemoveQuery(pq);
01188       } else {
01189          // If the last object, notify the GUI that the result arrived
01190          QueryResultReady(Form("%s:%s", pq->GetTitle(), pq->GetName()));
01191          // Keep in memory only light info about a query
01192          if (!(pq->IsDraw())) {
01193             if (fQMgr && fQMgr->Queries())
01194                // Remove from the fQueries list
01195                fQMgr->Queries()->Remove(pq);
01196          }
01197          // To get the prompt back
01198          TString msg;
01199          msg.Form("Lite-0: all output objects have been merged                                                         ");
01200          fprintf(stderr, "%s\n", msg.Data());
01201       }
01202 
01203    }
01204 
01205    // Done
01206    return rv;
01207 }
01208 
01209 //______________________________________________________________________________
01210 Int_t TProofLite::CreateSymLinks(TList *files)
01211 {
01212    // Create in each worker sandbox symlinks to the files in the list
01213    // Used to make the caceh information available to workers.
01214 
01215    Int_t rc = 0;
01216    if (files) {
01217       TIter nxf(files);
01218       TObjString *os = 0;
01219       while ((os = (TObjString *) nxf())) {
01220          // Expand target
01221          TString tgt(os->GetName());
01222          gSystem->ExpandPathName(tgt);
01223          // Loop over active workers
01224          TIter nxw(fActiveSlaves);
01225          TSlave *wrk = 0;
01226          while ((wrk = (TSlave *) nxw())) {
01227             // Link name
01228             TString lnk = Form("%s/%s", wrk->GetWorkDir(), gSystem->BaseName(os->GetName()));
01229             gSystem->Unlink(lnk);
01230             if (gSystem->Symlink(tgt, lnk) != 0) {
01231                rc++;
01232                Warning("CreateSymLinks", "problems creating sym link: %s", lnk.Data());
01233             }
01234          }
01235       }
01236    } else {
01237       Warning("CreateSymLinks", "files list is undefined");
01238    }
01239    // Done
01240    return rc;
01241 }
01242 
01243 //______________________________________________________________________________
01244 Int_t TProofLite::InitDataSetManager()
01245 {
01246    // Initialize the dataset manager from directives or from defaults
01247    // Return 0 on success, -1 on failure
01248 
01249    fDataSetManager = 0;
01250 
01251    // Default user and group
01252    TString user("???"), group("default");
01253    UserGroup_t *pw = gSystem->GetUserInfo();
01254    if (pw) {
01255       user = pw->fUser;
01256       delete pw;
01257    }
01258 
01259    // Dataset manager instance via plug-in
01260    TPluginHandler *h = 0;
01261    TString dsm = gEnv->GetValue("Proof.DataSetManager", "");
01262    if (!dsm.IsNull()) {
01263       // Get plugin manager to load the appropriate TDataSetManager
01264       if (gROOT->GetPluginManager()) {
01265          // Find the appropriate handler
01266          h = gROOT->GetPluginManager()->FindHandler("TDataSetManager", dsm);
01267          if (h && h->LoadPlugin() != -1) {
01268             // make instance of the dataset manager
01269             fDataSetManager =
01270                reinterpret_cast<TDataSetManager*>(h->ExecPlugin(3, group.Data(),
01271                                                          user.Data(), dsm.Data()));
01272          }
01273       }
01274    }
01275    if (fDataSetManager && fDataSetManager->TestBit(TObject::kInvalidObject)) {
01276       Warning("InitDataSetManager", "dataset manager plug-in initialization failed");
01277       SafeDelete(fDataSetManager);
01278    }
01279 
01280    // If no valid dataset manager has been created we instantiate the default one
01281    if (!fDataSetManager) {
01282       TString opts("Av:");
01283       TString dsetdir = gEnv->GetValue("ProofServ.DataSetDir", "");
01284       if (dsetdir.IsNull()) {
01285          // Use the default in the sandbox
01286          dsetdir = fDataSetDir;
01287          opts += "Sb:";
01288       }
01289       // Find the appropriate handler
01290       if (!h) {
01291          h = gROOT->GetPluginManager()->FindHandler("TDataSetManager", "file");
01292          if (h && h->LoadPlugin() == -1) h = 0;
01293       }
01294       if (h) {
01295          // make instance of the dataset manager
01296          fDataSetManager = reinterpret_cast<TDataSetManager*>(h->ExecPlugin(3,
01297                            group.Data(), user.Data(),
01298                            Form("dir:%s opt:%s", dsetdir.Data(), opts.Data())));
01299       }
01300       if (fDataSetManager && fDataSetManager->TestBit(TObject::kInvalidObject)) {
01301          Warning("InitDataSetManager", "default dataset manager plug-in initialization failed");
01302          SafeDelete(fDataSetManager);
01303       }
01304    }
01305 
01306    if (gDebug > 0 && fDataSetManager) {
01307       Info("InitDataSetManager", "datasetmgr Cq: %d, Ar: %d, Av: %d, Ti: %d, Sb: %d",
01308             fDataSetManager->TestBit(TDataSetManager::kCheckQuota),
01309             fDataSetManager->TestBit(TDataSetManager::kAllowRegister),
01310             fDataSetManager->TestBit(TDataSetManager::kAllowVerify),
01311             fDataSetManager->TestBit(TDataSetManager::kTrustInfo),
01312             fDataSetManager->TestBit(TDataSetManager::kIsSandbox));
01313    }
01314 
01315    // Done
01316    return (fDataSetManager ? 0 : -1);
01317 }
01318 
01319 //______________________________________________________________________________
01320 void TProofLite::ShowCache(Bool_t)
01321 {
01322    // List contents of file cache. If all is true show all caches also on
01323    // slaves. If everything is ok all caches are to be the same.
01324 
01325    if (!IsValid()) return;
01326 
01327    Printf("*** Local file cache %s ***", fCacheDir.Data());
01328    gSystem->Exec(Form("%s %s", kLS, fCacheDir.Data()));
01329 }
01330 
01331 //______________________________________________________________________________
01332 void TProofLite::ClearCache(const char *file)
01333 {
01334     // Remove files from all file caches.
01335 
01336    if (!IsValid()) return;
01337 
01338    fCacheLock->Lock();
01339    if (!file || strlen(file) <= 0) {
01340       gSystem->Exec(Form("%s %s/*", kRM, fCacheDir.Data()));
01341    } else {
01342       gSystem->Exec(Form("%s %s/%s", kRM, fCacheDir.Data(), file));
01343    }
01344    fCacheLock->Unlock();
01345 }
01346 
01347 //______________________________________________________________________________
01348 Int_t TProofLite::Load(const char *macro, Bool_t notOnClient, Bool_t uniqueOnly,
01349                        TList *wrks)
01350 {
01351    // Copy the specified macro in the cache directory. The macro file is
01352    // uploaded if new or updated. If existing, the corresponding header
01353    // basename(macro).h or .hh, is also uploaded. For the other arguments
01354    // see TProof::Load().
01355    // Returns 0 in case of success and -1 in case of error.
01356 
01357    if (!IsValid()) return -1;
01358 
01359    if (!macro || !strlen(macro)) {
01360       Error("Load", "need to specify a macro name");
01361       return -1;
01362    }
01363 
01364    TString macs(macro), mac;
01365    Int_t from = 0;
01366    while (macs.Tokenize(mac, from, ",")) {
01367       if (CopyMacroToCache(mac) < 0) return -1;
01368    }
01369 
01370    return TProof::Load(macro, notOnClient, uniqueOnly, wrks);
01371 }
01372 
01373 //______________________________________________________________________________
01374 Int_t TProofLite::CopyMacroToCache(const char *macro, Int_t headerRequired,
01375                                    TSelector **selector, Int_t opt)
01376 {
01377    // Copy a macro, and its possible associated .h[h] file,
01378    // to the cache directory, from where the workers can get the file.
01379    // If headerRequired is 1, return -1 in case the header is not found.
01380    // If headerRequired is 0, try to copy header too.
01381    // If headerRequired is -1, don't look for header, only copy macro.
01382    // If the selector pionter is not 0, consider the macro to be a selector
01383    // and try to load the selector and set it to the pointer.
01384    // The mask 'opt' is an or of ESendFileOpt:
01385    //       kCpBin   (0x8)     Retrieve from the cache the binaries associated
01386    //                          with the file
01387    //       kCp      (0x10)    Retrieve the files from the cache
01388    // Return -1 in case of error, 0 otherwise.
01389 
01390    // Relevant pointers
01391    TString cacheDir = fCacheDir;
01392    gSystem->ExpandPathName(cacheDir);
01393    TProofLockPath *cacheLock = fCacheLock;
01394 
01395    // Split out the aclic mode, if any
01396    TString name = macro;
01397    TString acmode, args, io;
01398    name = gSystem->SplitAclicMode(name, acmode, args, io);
01399 
01400    PDB(kGlobal,1)
01401       Info("CopyMacroToCache", "enter: names: %s, %s", macro, name.Data());
01402 
01403    // Make sure that the file exists
01404    if (gSystem->AccessPathName(name, kReadPermission)) {
01405       Error("CopyMacroToCache", "file %s not found or not readable", name.Data());
01406       return -1;
01407    }
01408 
01409    // Update the macro path
01410    TString mp(TROOT::GetMacroPath());
01411    TString np(gSystem->DirName(name));
01412    if (!np.IsNull()) {
01413       np += ":";
01414       if (!mp.BeginsWith(np) && !mp.Contains(":"+np)) {
01415          Int_t ip = (mp.BeginsWith(".:")) ? 2 : 0;
01416          mp.Insert(ip, np);
01417          TROOT::SetMacroPath(mp);
01418          if (gDebug > 0)
01419             Info("CopyMacroToCache", "macro path set to '%s'", TROOT::GetMacroPath());
01420       }
01421    }
01422 
01423    // Check the header file
01424    Int_t dot = name.Last('.');
01425    const char *hext[] = { ".h", ".hh", "" };
01426    TString hname, checkedext;
01427    Int_t i = 0;
01428    while (strlen(hext[i]) > 0) {
01429       hname = name(0, dot);
01430       hname += hext[i];
01431       if (!gSystem->AccessPathName(hname, kReadPermission))
01432          break;
01433       if (!checkedext.IsNull()) checkedext += ",";
01434       checkedext += hext[i];
01435       hname = "";
01436       i++;
01437    }
01438    if (hname.IsNull() && headerRequired == 1) {
01439       Error("CopyMacroToCache", "header file for %s not found or not readable "
01440             "(checked extensions: %s)", name.Data(), checkedext.Data());
01441       return -1;
01442    }
01443    if (headerRequired < 0)
01444       hname = "";
01445 
01446    cacheLock->Lock();
01447 
01448    // Check these files with those in the cache (if any)
01449    Bool_t useCacheBinaries = kFALSE;
01450    TString cachedname = Form("%s/%s", cacheDir.Data(), gSystem->BaseName(name));
01451    TString cachedhname;
01452    if (!hname.IsNull())
01453       cachedhname = Form("%s/%s", cacheDir.Data(), gSystem->BaseName(hname));
01454    if (!gSystem->AccessPathName(cachedname, kReadPermission)) {
01455       TMD5 *md5 = TMD5::FileChecksum(name);
01456       TMD5 *md5cache = TMD5::FileChecksum(cachedname);
01457       if (md5 && md5cache && (*md5 == *md5cache))
01458          useCacheBinaries = kTRUE;
01459       if (!hname.IsNull()) {
01460          if (!gSystem->AccessPathName(cachedhname, kReadPermission)) {
01461             TMD5 *md5h = TMD5::FileChecksum(hname);
01462             TMD5 *md5hcache = TMD5::FileChecksum(cachedhname);
01463             if (md5h && md5hcache && (*md5h != *md5hcache))
01464                useCacheBinaries = kFALSE;
01465             SafeDelete(md5h);
01466             SafeDelete(md5hcache);
01467          }
01468       }
01469       SafeDelete(md5);
01470       SafeDelete(md5cache);
01471    }
01472 
01473    // Create version file name template
01474    TString vername(Form(".%s", name.Data()));
01475    dot = vername.Last('.');
01476    if (dot != kNPOS)
01477       vername.Remove(dot);
01478    vername += ".binversion";
01479    Bool_t savever = kFALSE;
01480 
01481    // Check binary version
01482    if (useCacheBinaries) {
01483       TString v;
01484       Int_t rev = -1;
01485       FILE *f = fopen(Form("%s/%s", cacheDir.Data(), vername.Data()), "r");
01486       if (f) {
01487          TString r;
01488          v.Gets(f);
01489          r.Gets(f);
01490          rev = (!r.IsNull() && r.IsDigit()) ? r.Atoi() : -1;
01491          fclose(f);
01492       }
01493       if (!f || v != gROOT->GetVersion() ||
01494           (gROOT->GetSvnRevision() > 0 && rev != gROOT->GetSvnRevision()))
01495          useCacheBinaries = kFALSE;
01496    }
01497 
01498    // Create binary name template
01499    TString binname = gSystem->BaseName(name);
01500    dot = binname.Last('.');
01501    if (dot != kNPOS)
01502       binname.Replace(dot,1,"_");
01503    binname += ".";
01504 
01505    FileStat_t stlocal, stcache;
01506    void *dirp = 0;
01507    if (useCacheBinaries) {
01508       // Loop over binaries in the cache and copy them locally if newer then the local
01509       // versions or there is no local version
01510       dirp = gSystem->OpenDirectory(cacheDir);
01511       if (dirp) {
01512          const char *e = 0;
01513          while ((e = gSystem->GetDirEntry(dirp))) {
01514             if (!strncmp(e, binname.Data(), binname.Length())) {
01515                TString fncache = Form("%s/%s", cacheDir.Data(), e);
01516                Bool_t docp = kTRUE;
01517                if (!gSystem->GetPathInfo(fncache, stcache)) {
01518                   Int_t rc = gSystem->GetPathInfo(e, stlocal);
01519                   if (rc == 0 && (stlocal.fMtime >= stcache.fMtime))
01520                      docp = kFALSE;
01521                   // Copy the file, if needed
01522                   if (docp) {
01523                      gSystem->Exec(Form("%s %s", kRM, e));
01524                      PDB(kGlobal,2)
01525                         Info("CopyMacroToCache",
01526                              "retrieving %s from cache", fncache.Data());
01527                      gSystem->Exec(Form("%s %s %s", kCP, fncache.Data(), e));
01528                   }
01529                }
01530             }
01531          }
01532          gSystem->FreeDirectory(dirp);
01533       }
01534    }
01535    cacheLock->Unlock();
01536 
01537    if (selector) {
01538       // Now init the selector in optimized way
01539       if (!(*selector = TSelector::GetSelector(macro))) {
01540          Error("CopyMacroToCache", "could not create a selector from %s", macro);
01541          return -1;
01542       }
01543    }
01544 
01545    cacheLock->Lock();
01546 
01547    TList *cachedFiles = new TList;
01548    // Save information in the cache now for later usage
01549    dirp = gSystem->OpenDirectory(".");
01550    if (dirp) {
01551       const char *e = 0;
01552       while ((e = gSystem->GetDirEntry(dirp))) {
01553          if (!strncmp(e, binname.Data(), binname.Length())) {
01554             Bool_t docp = kTRUE;
01555             if (!gSystem->GetPathInfo(e, stlocal)) {
01556                TString fncache = Form("%s/%s", cacheDir.Data(), e);
01557                Int_t rc = gSystem->GetPathInfo(fncache, stcache);
01558                if (rc == 0 && (stlocal.fMtime <= stcache.fMtime))
01559                   docp = kFALSE;
01560                // Copy the file, if needed
01561                if (docp) {
01562                   gSystem->Exec(Form("%s %s", kRM, fncache.Data()));
01563                   PDB(kGlobal,2)
01564                      Info("CopyMacroToCache","caching %s ...", e);
01565                   gSystem->Exec(Form("%s %s %s", kCP, e, fncache.Data()));
01566                   savever = kTRUE;
01567                }
01568                if (opt & kCpBin)
01569                   cachedFiles->Add(new TObjString(fncache.Data()));
01570             }
01571          }
01572       }
01573       gSystem->FreeDirectory(dirp);
01574    }
01575 
01576    // Save binary version if requested
01577    if (savever) {
01578       FILE *f = fopen(Form("%s/%s", cacheDir.Data(), vername.Data()), "w");
01579       if (f) {
01580          fputs(gROOT->GetVersion(), f);
01581          fputs(Form("\n%d",gROOT->GetSvnRevision()), f);
01582          fclose(f);
01583       }
01584    }
01585 
01586    // Save also the selector info, if needed
01587    if (!useCacheBinaries) {
01588       gSystem->Exec(Form("%s %s", kRM, cachedname.Data()));
01589       PDB(kGlobal,2)
01590          Info("CopyMacroToCache","caching %s ...", name.Data());
01591       gSystem->Exec(Form("%s %s %s", kCP, name.Data(), cachedname.Data()));
01592       if (!hname.IsNull()) {
01593          gSystem->Exec(Form("%s %s", kRM, cachedhname.Data()));
01594          PDB(kGlobal,2)
01595             Info("CopyMacroToCache","caching %s ...", hname.Data());
01596          gSystem->Exec(Form("%s %s %s", kCP, hname.Data(), cachedhname.Data()));
01597       }
01598    }
01599    if (opt & kCp) {
01600       cachedFiles->Add(new TObjString(cachedname.Data()));
01601       if (!hname.IsNull())
01602          cachedFiles->Add(new TObjString(cachedhname.Data()));
01603    }
01604 
01605    cacheLock->Unlock();
01606 
01607    // Create symlinks
01608    if (opt & (kCp | kCpBin))
01609       CreateSymLinks(cachedFiles);
01610 
01611    cachedFiles->SetOwner();
01612    delete cachedFiles;
01613 
01614    return 0;
01615 }
01616 
01617 //______________________________________________________________________________
01618 Int_t TProofLite::CleanupSandbox()
01619 {
01620    // Remove old sessions dirs keep at most 'Proof.MaxOldSessions' (default 10)
01621 
01622    Int_t maxold = gEnv->GetValue("Proof.MaxOldSessions", 10);
01623 
01624    if (maxold < 0) return 0;
01625 
01626    TSortedList *olddirs = new TSortedList(kFALSE);
01627 
01628    TString sandbox = gSystem->DirName(fWorkDir.Data());
01629 
01630    void *dirp = gSystem->OpenDirectory(sandbox);
01631    if (dirp) {
01632       const char *e = 0;
01633       while ((e = gSystem->GetDirEntry(dirp))) {
01634          if (!strncmp(e, "session-", 8) && !strstr(e, GetName())) {
01635             TString d(e);
01636             Int_t i = d.Last('-');
01637             if (i != kNPOS) d.Remove(i);
01638             i = d.Last('-');
01639             if (i != kNPOS) d.Remove(0,i+1);
01640             TString path = Form("%s/%s", sandbox.Data(), e);
01641             olddirs->Add(new TNamed(d, path));
01642          }
01643       }
01644       gSystem->FreeDirectory(dirp);
01645    }
01646 
01647    // Clean it up, if required
01648    Bool_t notify = kTRUE;
01649    while (olddirs->GetSize() > maxold) {
01650       if (notify && gDebug > 0)
01651          Printf("Cleaning sandbox at: %s", sandbox.Data());
01652       notify = kFALSE;
01653       TNamed *n = (TNamed *) olddirs->Last();
01654       if (n) {
01655          gSystem->Exec(Form("%s %s", kRM, n->GetTitle()));
01656          olddirs->Remove(n);
01657          delete n;
01658       }
01659    }
01660 
01661    // Cleanup
01662    olddirs->SetOwner();
01663    delete olddirs;
01664 
01665    // Done
01666    return 0;
01667 }
01668 
01669 //______________________________________________________________________________
01670 TList *TProofLite::GetListOfQueries(Option_t *opt)
01671 {
01672    // Get the list of queries.
01673 
01674    Bool_t all = ((strchr(opt,'A') || strchr(opt,'a'))) ? kTRUE : kFALSE;
01675 
01676    TList *ql = new TList;
01677    Int_t ntot = 0, npre = 0, ndraw= 0;
01678    if (fQMgr) {
01679       if (all) {
01680          // Rescan
01681          TString qdir = fQueryDir;
01682          Int_t idx = qdir.Index("session-");
01683          if (idx != kNPOS)
01684             qdir.Remove(idx);
01685          fQMgr->ScanPreviousQueries(qdir);
01686          // Gather also information about previous queries, if any
01687          if (fQMgr->PreviousQueries()) {
01688             TIter nxq(fQMgr->PreviousQueries());
01689             TProofQueryResult *pqr = 0;
01690             while ((pqr = (TProofQueryResult *)nxq())) {
01691                ntot++;
01692                pqr->fSeqNum = ntot;
01693                ql->Add(pqr);
01694             }
01695          }
01696       }
01697 
01698       npre = ntot;
01699       if (fQMgr->Queries()) {
01700          // Add info about queries in this session
01701          TIter nxq(fQMgr->Queries());
01702          TProofQueryResult *pqr = 0;
01703          TQueryResult *pqm = 0;
01704          while ((pqr = (TProofQueryResult *)nxq())) {
01705             ntot++;
01706             pqm = pqr->CloneInfo();
01707             pqm->fSeqNum = ntot;
01708             ql->Add(pqm);
01709          }
01710       }
01711       // Number of draw queries
01712       ndraw = fQMgr->DrawQueries();
01713    }
01714 
01715    fOtherQueries = npre;
01716    fDrawQueries = ndraw;
01717    if (fQueries) {
01718       fQueries->Delete();
01719       delete fQueries;
01720       fQueries = 0;
01721    }
01722    fQueries = ql;
01723 
01724    // This should have been filled by now
01725    return fQueries;
01726 }
01727 
01728 //______________________________________________________________________________
01729 Bool_t TProofLite::RegisterDataSet(const char *uri,
01730                                    TFileCollection *dataSet, const char* optStr)
01731 {
01732    // Register the 'dataSet' on the cluster under the current
01733    // user, group and the given 'dataSetName'.
01734    // Fails if a dataset named 'dataSetName' already exists, unless 'optStr'
01735    // contains 'O', in which case the old dataset is overwritten.
01736    // If 'optStr' contains 'V' the dataset files are verified (default no
01737    // verification).
01738    // Returns kTRUE on success.
01739 
01740    if (!fDataSetManager) {
01741       Info("RegisterDataSet", "dataset manager not available");
01742       return kFALSE;
01743    }
01744 
01745    if (!uri || strlen(uri) <= 0) {
01746       Info("RegisterDataSet", "specifying a dataset name is mandatory");
01747       return kFALSE;
01748    }
01749 
01750    Bool_t result = kTRUE;
01751    if (fDataSetManager->TestBit(TDataSetManager::kAllowRegister)) {
01752       // Check the list
01753       if (!dataSet || dataSet->GetList()->GetSize() == 0) {
01754          Error("RegisterDataSet", "can not save an empty list.");
01755          result = kFALSE;
01756       }
01757       // Register the dataset (quota checks are done inside here)
01758       result = (fDataSetManager->RegisterDataSet(uri, dataSet, optStr) == 0)
01759              ? kTRUE : kFALSE;
01760    } else {
01761       Info("RegisterDataSets", "dataset registration not allowed");
01762       result = kFALSE;
01763    }
01764 
01765    if (!result)
01766       Error("RegisterDataSet", "dataset was not saved");
01767 
01768    // Done
01769    return result;
01770 }
01771 
01772 //______________________________________________________________________________
01773 Int_t TProofLite::SetDataSetTreeName(const char *dataset, const char *treename)
01774 {
01775    // Set/Change the name of the default tree. The tree name may contain
01776    // subdir specification in the form "subdir/name".
01777    // Returns 0 on success, -1 otherwise.
01778 
01779    if (!fDataSetManager) {
01780       Info("ExistsDataSet", "dataset manager not available");
01781       return kFALSE;
01782    }
01783 
01784    if (!dataset || strlen(dataset) <= 0) {
01785       Info("SetDataSetTreeName", "specifying a dataset name is mandatory");
01786       return -1;
01787    }
01788 
01789    if (!treename || strlen(treename) <= 0) {
01790       Info("SetDataSetTreeName", "specifying a tree name is mandatory");
01791       return -1;
01792    }
01793 
01794    TUri uri(dataset);
01795    TString fragment(treename);
01796    if (!fragment.BeginsWith("/")) fragment.Insert(0, "/");
01797    uri.SetFragment(fragment);
01798 
01799    return fDataSetManager->ScanDataSet(uri.GetUri().Data(),
01800                                       (UInt_t)TDataSetManager::kSetDefaultTree);
01801 }
01802 
01803 //______________________________________________________________________________
01804 Bool_t TProofLite::ExistsDataSet(const char *uri)
01805 {
01806    // Returns kTRUE if 'dataset' described by 'uri' exists, kFALSE otherwise
01807 
01808    if (!fDataSetManager) {
01809       Info("ExistsDataSet", "dataset manager not available");
01810       return kFALSE;
01811    }
01812 
01813    if (!uri || strlen(uri) <= 0) {
01814       Error("ExistsDataSet", "dataset name missing");
01815       return kFALSE;
01816    }
01817 
01818    // Check if the dataset exists
01819    return fDataSetManager->ExistsDataSet(uri);
01820 }
01821 
01822 //______________________________________________________________________________
01823 TMap *TProofLite::GetDataSets(const char *uri, const char *srvex)
01824 {
01825    // lists all datasets that match given uri
01826 
01827    if (!fDataSetManager) {
01828       Info("GetDataSets", "dataset manager not available");
01829       return (TMap *)0;
01830    }
01831 
01832    // Get the datasets and return the map
01833    if (srvex && strlen(srvex) > 0) {
01834       return fDataSetManager->GetSubDataSets(uri, srvex);
01835    } else {
01836       UInt_t opt = (UInt_t)TDataSetManager::kExport;
01837       return fDataSetManager->GetDataSets(uri, opt);
01838    }
01839 }
01840 
01841 //______________________________________________________________________________
01842 void TProofLite::ShowDataSets(const char *uri, const char *opt)
01843 {
01844    // Shows datasets in locations that match the uri
01845    // By default shows the user's datasets and global ones
01846 
01847    if (!fDataSetManager) {
01848       Info("GetDataSet", "dataset manager not available");
01849       return;
01850    }
01851 
01852    fDataSetManager->ShowDataSets(uri, opt);
01853 }
01854 
01855 //______________________________________________________________________________
01856 TFileCollection *TProofLite::GetDataSet(const char *uri, const char *)
01857 {
01858    // Get a list of TFileInfo objects describing the files of the specified
01859    // dataset.
01860 
01861    if (!fDataSetManager) {
01862       Info("GetDataSet", "dataset manager not available");
01863       return (TFileCollection *)0;
01864    }
01865 
01866    if (!uri || strlen(uri) <= 0) {
01867       Info("GetDataSet", "specifying a dataset name is mandatory");
01868       return 0;
01869    }
01870 
01871    // Return the list
01872    return fDataSetManager->GetDataSet(uri);
01873 }
01874 
01875 //______________________________________________________________________________
01876 Int_t TProofLite::RemoveDataSet(const char *uri, const char *)
01877 {
01878    // Remove the specified dataset from the PROOF cluster.
01879    // Files are not deleted.
01880 
01881    if (!fDataSetManager) {
01882       Info("RemoveDataSet", "dataset manager not available");
01883       return -1;
01884    }
01885 
01886    if (fDataSetManager->TestBit(TDataSetManager::kAllowRegister)) {
01887       if (!fDataSetManager->RemoveDataSet(uri)) {
01888          // Failure
01889          return -1;
01890       }
01891    } else {
01892       Info("RemoveDataSet", "dataset creation / removal not allowed");
01893       return -1;
01894    }
01895 
01896    // Done
01897    return 0;
01898 }
01899 
01900 //______________________________________________________________________________
01901 Int_t TProofLite::VerifyDataSet(const char *uri, const char *)
01902 {
01903    // Verify if all files in the specified dataset are available.
01904    // Print a list and return the number of missing files.
01905 
01906    if (!fDataSetManager) {
01907       Info("VerifyDataSet", "dataset manager not available");
01908       return -1;
01909    }
01910 
01911    Int_t rc = -1;
01912    if (fDataSetManager->TestBit(TDataSetManager::kAllowVerify)) {
01913       rc = fDataSetManager->ScanDataSet(uri);
01914    } else {
01915       Info("VerifyDataSet", "dataset verification not allowed");
01916       return -1;
01917    }
01918 
01919    // Done
01920    return rc;
01921 }
01922 
01923 //______________________________________________________________________________
01924 void TProofLite::ClearDataSetCache(const char *dataset)
01925 {
01926    // Clear the content of the dataset cache, if any (matching 'dataset', if defined).
01927 
01928    if (fDataSetManager) fDataSetManager->ClearCache(dataset);
01929    // Done
01930    return;
01931 }
01932 
01933 //______________________________________________________________________________
01934 void TProofLite::ShowDataSetCache(const char *dataset)
01935 {
01936    // Display the content of the dataset cache, if any (matching 'dataset', if defined).
01937 
01938    // For PROOF-Lite act locally
01939    if (fDataSetManager) fDataSetManager->ShowCache(dataset);
01940    // Done
01941    return;
01942 }
01943 
01944 //______________________________________________________________________________
01945 void TProofLite::SendInputDataFile()
01946 {
01947    // Make sure that the input data objects are available to the workers in a
01948    // dedicated file in the cache; the objects are taken from the dedicated list
01949    // and / or the specified file.
01950    // If the fInputData is empty the specified file is sent over.
01951    // If there is no specified file, a file named "inputdata.root" is created locally
01952    // with the content of fInputData and sent over to the master.
01953    // If both fInputData and the specified file are not empty, a copy of the file
01954    // is made locally and augmented with the content of fInputData.
01955 
01956    // Prepare the file
01957    TString dataFile;
01958    PrepareInputDataFile(dataFile);
01959 
01960    // Make sure it is in the cache, if not empty
01961    if (dataFile.Length() > 0) {
01962 
01963       if (!dataFile.BeginsWith(fCacheDir)) {
01964          // Destination
01965          TString dst;
01966          dst.Form("%s/%s", fCacheDir.Data(), gSystem->BaseName(dataFile));
01967          // Remove it first if it exists
01968          if (!gSystem->AccessPathName(dst))
01969             gSystem->Unlink(dst);
01970          // Copy the file
01971          gSystem->CopyFile(dataFile, dst);
01972       }
01973 
01974       // Set the name in the input list so that the workers can find it
01975       AddInput(new TNamed("PROOF_InputDataFile", Form("%s", gSystem->BaseName(dataFile))));
01976    }
01977 }
01978 
01979 //______________________________________________________________________________
01980 Int_t TProofLite::Remove(const char *ref, Bool_t all)
01981 {
01982    // Handle remove request.
01983 
01984    PDB(kGlobal, 1)
01985       Info("Remove", "Enter: %s, %d", ref, all);
01986 
01987    if (all) {
01988       // Remove also local copies, if any
01989       if (fPlayer)
01990          fPlayer->RemoveQueryResult(ref);
01991    }
01992 
01993    TString queryref(ref);
01994 
01995    if (queryref == "cleanupdir") {
01996 
01997       // Cleanup previous sessions results
01998       Int_t nd = (fQMgr) ? fQMgr->CleanupQueriesDir() : -1;
01999 
02000       // Notify
02001       Info("Remove", "%d directories removed", nd);
02002       // We are done
02003       return 0;
02004    }
02005 
02006 
02007    if (fQMgr) {
02008       TProofLockPath *lck = 0;
02009       if (fQMgr->LockSession(queryref, &lck) == 0) {
02010 
02011          // Remove query
02012          fQMgr->RemoveQuery(queryref, 0);
02013 
02014          // Unlock and remove the lock file
02015          if (lck) {
02016             gSystem->Unlink(lck->GetName());
02017             SafeDelete(lck);
02018          }
02019 
02020          // We are done
02021          return 0;
02022       }
02023    } else {
02024       Warning("Remove", "query result manager undefined!");
02025    }
02026 
02027    // Notify failure
02028    Info("Remove",
02029         "query %s could not be removed (unable to lock session)", queryref.Data());
02030 
02031    // Done
02032    return -1;
02033 }
02034 
02035 //______________________________________________________________________________
02036 TTree *TProofLite::GetTreeHeader(TDSet *dset)
02037 {
02038    // Creates a tree header (a tree with nonexisting files) object for
02039    // the DataSet.
02040 
02041    TTree *t = 0;
02042    if (!dset) {
02043       Error("GetTreeHeader", "undefined TDSet");
02044       return t;
02045    }
02046 
02047    dset->Reset();
02048    TDSetElement *e = dset->Next();
02049    Long64_t entries = 0;
02050    TFile *f = 0;
02051    if (!e) {
02052       PDB(kGlobal, 1) Info("GetTreeHeader", "empty TDSet");
02053    } else {
02054       f = TFile::Open(e->GetFileName());
02055       t = 0;
02056       if (f) {
02057          t = (TTree*) f->Get(e->GetObjName());
02058          if (t) {
02059             t->SetMaxVirtualSize(0);
02060             t->DropBaskets();
02061             entries = t->GetEntries();
02062 
02063             // compute #entries in all the files
02064             while ((e = dset->Next()) != 0) {
02065                TFile *f1 = TFile::Open(e->GetFileName());
02066                if (f1) {
02067                   TTree *t1 = (TTree*) f1->Get(e->GetObjName());
02068                   if (t1) {
02069                      entries += t1->GetEntries();
02070                      delete t1;
02071                   }
02072                   delete f1;
02073                }
02074             }
02075             t->SetMaxEntryLoop(entries);   // this field will hold the total number of entries ;)
02076          }
02077       }
02078    }
02079    // Done
02080    return t;
02081 }
02082 
02083 //______________________________________________________________________________
02084 void TProofLite::FindUniqueSlaves()
02085 {
02086    // Add to the fUniqueSlave list the active slaves that have a unique
02087    // (user) file system image. This information is used to transfer files
02088    // only once to nodes that share a file system (an image). Submasters
02089    // which are not in fUniqueSlaves are put in the fNonUniqueMasters
02090    // list. That list is used to trigger the transferring of files to
02091    // the submaster's unique slaves without the need to transfer the file
02092    // to the submaster.
02093 
02094    fUniqueSlaves->Clear();
02095    fUniqueMonitor->RemoveAll();
02096    fAllUniqueSlaves->Clear();
02097    fAllUniqueMonitor->RemoveAll();
02098    fNonUniqueMasters->Clear();
02099 
02100    if (fActiveSlaves->GetSize() <= 0) return;
02101 
02102    TSlave *wrk = dynamic_cast<TSlave*>(fActiveSlaves->First());
02103    if (!wrk) {
02104       Error("FindUniqueSlaves", "first object in fActiveSlaves not a TSlave: embarrasing!");
02105       return;
02106    }
02107    fUniqueSlaves->Add(wrk);
02108    fAllUniqueSlaves->Add(wrk);
02109    fUniqueMonitor->Add(wrk->GetSocket());
02110    fAllUniqueMonitor->Add(wrk->GetSocket());
02111 
02112    // will be actiavted in Collect()
02113    fUniqueMonitor->DeActivateAll();
02114    fAllUniqueMonitor->DeActivateAll();
02115 }
02116 
02117 //______________________________________________________________________________
02118 Int_t TProofLite::RegisterDataSets(TList *in, TList *out)
02119 {
02120    // Register TFileCollections in 'out' as datasets according to the rules in 'in'
02121 
02122    PDB(kDataset, 1) Info("RegisterDataSets", "enter");
02123 
02124    if (!in || !out) return 0;
02125 
02126    TString msg;
02127    TIter nxo(out);
02128    TObject *o = 0;
02129    while ((o = nxo())) {
02130       // Only file collections TFileCollection
02131       TFileCollection *ds = dynamic_cast<TFileCollection*> (o);
02132       if (ds) {
02133          // The tag and register option
02134          TNamed *fcn = 0;
02135          TString tag = TString::Format("DATASET_%s", ds->GetName());
02136          if (!(fcn = (TNamed *) out->FindObject(tag))) continue;
02137          // Register option
02138          TString regopt(fcn->GetTitle());
02139          // Register this dataset
02140          if (fDataSetManager) {
02141             if (fDataSetManager->TestBit(TDataSetManager::kAllowRegister)) {
02142                // Extract the list
02143                if (ds->GetList()->GetSize() > 0) {
02144                   // Register the dataset (quota checks are done inside here)
02145                   msg.Form("Registering and verifying dataset '%s' ... ", ds->GetName());
02146                   Info("RegisterDataSets", "%s", msg.Data());
02147                   // Always allow verification for this action
02148                   Bool_t allowVerify = fDataSetManager->TestBit(TDataSetManager::kAllowVerify) ? kTRUE : kFALSE;
02149                   if (regopt.Contains("V") && !allowVerify)
02150                      fDataSetManager->SetBit(TDataSetManager::kAllowVerify);
02151                   Int_t rc = fDataSetManager->RegisterDataSet(ds->GetName(), ds, regopt);
02152                   // Reset to the previous state if needed
02153                   if (regopt.Contains("V") && !allowVerify)
02154                      fDataSetManager->ResetBit(TDataSetManager::kAllowVerify);
02155                   if (rc != 0) {
02156                      Warning("RegisterDataSets",
02157                               "failure registering dataset '%s'", ds->GetName());
02158                      msg.Form("Registering and verifying dataset '%s' ... failed! See log for more details", ds->GetName());
02159                   } else {
02160                      Info("RegisterDataSets", "dataset '%s' successfully registered", ds->GetName());
02161                      msg.Form("Registering and verifying dataset '%s' ... OK", ds->GetName());
02162                   }
02163                   Info("RegisterDataSets", "%s", msg.Data());
02164                   // Notify
02165                   PDB(kDataset, 2) {
02166                      Info("RegisterDataSets","printing collection");
02167                      ds->Print("F");
02168                   }
02169                } else {
02170                   Warning("RegisterDataSets", "collection '%s' is empty", o->GetName());
02171                }
02172             } else {
02173                Info("RegisterDataSets", "dataset registration not allowed");
02174                return -1;
02175             }
02176          } else {
02177             Error("RegisterDataSets", "dataset manager is undefined!");
02178             return -1;
02179          }
02180          // Cleanup temporary stuff
02181          out->Remove(fcn);
02182          SafeDelete(fcn);
02183       }
02184    }
02185 
02186    PDB(kDataset, 1) Info("RegisterDataSets", "exit");
02187    // Done
02188    return 0;
02189 }

Generated on Tue Jul 5 14:51:38 2011 for ROOT_528-00b_version by  doxygen 1.5.1