TProofCondor.cxx

Go to the documentation of this file.
00001 // @(#)root/proof:$Id: TProofCondor.cxx 36116 2010-10-06 13:58:05Z ganis $
00002 // Author: Fons Rademakers   13/02/97
00003 
00004 /*************************************************************************
00005  * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers.               *
00006  * All rights reserved.                                                  *
00007  *                                                                       *
00008  * For the licensing terms see $ROOTSYS/LICENSE.                         *
00009  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
00010  *************************************************************************/
00011 
00012 //////////////////////////////////////////////////////////////////////////
00013 //                                                                      //
00014 // TProofCondor                                                         //
00015 //                                                                      //
00016 // This class controls a PROOF cluster whose worker servers are         //
00017 // started on nodes claimed through Condor (TCondor). Nodes are read    //
00018 // from the PROOF config file; when the config file name is empty,      //
00019 // all available Condor slots are claimed instead.                      //
00020 //                                                                      //
00021 //////////////////////////////////////////////////////////////////////////
00022 
00023 #include "TProofCondor.h"
00024 
00025 #include "TCondor.h"
00026 #include "TList.h"
00027 #include "TMap.h"
00028 #include "TMessage.h"
00029 #include "TMonitor.h"
00030 #include "TProofNodeInfo.h"
00031 #include "TProofResourcesStatic.h"
00032 #include "TProofServ.h"
00033 #include "TSlave.h"
00034 #include "TSocket.h"
00035 #include "TString.h"
00036 #include "TTimer.h"
00037 
00038 ClassImp(TProofCondor)
00039 
00040 //______________________________________________________________________________
00041 TProofCondor::TProofCondor(const char *masterurl, const char *conffile,
00042                            const char *confdir, Int_t loglevel,
00043                            const char *, TProofMgr *mgr)
00044   : fCondor(0), fTimer(0)
00045 {
00046    // Start proof using condor
00047 
00048    // Default initializations
00049    InitMembers();
00050 
00051    // This may be needed during init
00052    fManager = mgr;
00053 
00054    fUrl = TUrl(masterurl);
00055 
00056    if (!conffile || strlen(conffile) == 0) {
00057       conffile = kPROOF_ConfFile;
00058    } else if (!strncasecmp(conffile, "condor:", 7)) {
00059       conffile+=7;
00060    }
00061 
00062    if (!confdir  || strlen(confdir) == 0) {
00063       confdir = kPROOF_ConfDir;
00064    }
00065 
00066    Init(masterurl, conffile, confdir, loglevel);
00067 }
00068 
00069 //______________________________________________________________________________
00070 TProofCondor::~TProofCondor()
00071 {
00072    // Clean up Condor PROOF environment.
00073 
00074    SafeDelete(fCondor);
00075    SafeDelete(fTimer);
00076 }
00077 
00078 //______________________________________________________________________________
00079 Bool_t TProofCondor::StartSlaves(Bool_t)
00080 {
00081    // Setup Condor workers using dynamic information
00082 
00083    fCondor = new TCondor;
00084    TString jobad = GetJobAd();
00085 
00086    fImage = fCondor->GetImage(gSystem->HostName());
00087    if (fImage.Length() == 0) {
00088       Error("StartSlaves", "Empty Condor image found for system %s",
00089             gSystem->HostName());
00090       return kFALSE;
00091    }
00092 
00093    TList claims;
00094    if (fConfFile.IsNull()) {
00095       // startup all slaves if no config file given
00096       TList *condorclaims = fCondor->Claim(9999, jobad);
00097       TIter nextclaim(condorclaims);
00098       while (TObject *o = nextclaim()) claims.Add(o);
00099    } else {
00100       // parse config file
00101       TProofResourcesStatic *resources = new TProofResourcesStatic(fConfDir, fConfFile);
00102       fConfFile = resources->GetFileName(); // Update the global file name (with path)
00103       PDB(kGlobal,1) Info("StartSlaves", "using PROOF config file: %s", fConfFile.Data());
00104 
00105       // Get all workers
00106       TList *workerList = resources->GetWorkers();
00107       if (workerList->GetSize() == 0) {
00108          Error("StartSlaves", "Found no condorworkers in %s", fConfFile.Data());
00109          return kFALSE;
00110       }
00111 
00112       // check for valid slave lines and claim condor nodes
00113       Int_t ord = 0;
00114 
00115       // Loop over all workers and start them
00116       TListIter next(workerList);
00117       TObject *to;
00118       TProofNodeInfo *worker;
00119       int nSlavesDone = 0;
00120       while ((to = next())) {
00121          // Get the next worker from the list
00122          worker = (TProofNodeInfo *)to;
00123 
00124          // Read back worker node info
00125          const Char_t *image = worker->GetImage().Data();
00126          const Char_t *workdir = worker->GetWorkDir().Data();
00127          Int_t perfidx = worker->GetPerfIndex();
00128 
00129          gSystem->Sleep(10 /* ms */);
00130          TCondorSlave* csl = fCondor->Claim(worker->GetNodeName().Data(), jobad);
00131          if (csl) {
00132             csl->fPerfIdx = perfidx;
00133             csl->fImage = image;
00134             csl->fWorkDir = gSystem->ExpandPathName(workdir);
00135             TString fullord = TString(gProofServ->GetOrdinal()) + "." + ((Long_t) ord);
00136             csl->fOrdinal = fullord.Data();
00137             claims.Add(csl);
00138             ord++;
00139          }
00140 
00141          // Notify claim creation
00142          nSlavesDone++;
00143          TMessage m(kPROOF_SERVERSTARTED);
00144          m << TString("Creating COD Claim") << workerList->GetSize()
00145          << nSlavesDone << (csl != 0);
00146          gProofServ->GetSocket()->Send(m);
00147 
00148       } // end while (worker loop)
00149 
00150       // Cleanup
00151       delete resources;
00152       resources = 0;
00153    } // end else (parse config file)
00154 
00155    Long_t delay = 500; // timer delay 0.5s
00156    Int_t ntries = 20; // allow 20 tries (must be > 1 for algorithm to work)
00157    Int_t trial = 1;
00158    Int_t idx = 0;
00159 
00160    int nClaims = claims.GetSize();
00161    int nClaimsDone = 0;
00162    while (claims.GetSize() > 0) {
00163       TCondorSlave* c = 0;
00164 
00165       // Get Condor Slave
00166       if (trial == 1) {
00167          c = dynamic_cast<TCondorSlave*>(claims.At(idx));
00168       } else {
00169          TPair *p = dynamic_cast<TPair*>(claims.At(idx));
00170          if (p) {
00171             TTimer *t = dynamic_cast<TTimer*>(p->Value());
00172             if (t) {
00173                // wait remaining time
00174                Long64_t wait = t->GetAbsTime()-gSystem->Now();
00175                if (wait > 0) gSystem->Sleep((UInt_t)wait);
00176                c = dynamic_cast<TCondorSlave*>(p->Key());
00177             }
00178          }
00179       }
00180 
00181       // create slave
00182       TSlave *slave = 0;
00183       if (c) slave = CreateSlave(Form("%s:%d", c->fHostname.Data(), c->fPort), c->fOrdinal,
00184                                                c->fPerfIdx, c->fImage, c->fWorkDir);
00185 
00186       // add slave to appropriate list
00187       if (trial < ntries) {
00188          if (slave && slave->IsValid()) {
00189             fSlaves->Add(slave);
00190             if (trial == 1) {
00191                claims.Remove(c);
00192             } else {
00193                TPair *p = dynamic_cast<TPair*>(claims.Remove(c));
00194                if (p) {
00195                   TTimer *xt = dynamic_cast<TTimer*>(p->Value());
00196                   if (xt) delete xt;
00197                   delete p;
00198                }
00199             }
00200             nClaimsDone++;
00201             TMessage m(kPROOF_SERVERSTARTED);
00202             m << TString("Opening connections to workers") << nClaims
00203                << nClaimsDone << kTRUE;
00204             gProofServ->GetSocket()->Send(m);
00205          } else if (slave) {
00206             if (trial == 1) {
00207                TTimer* timer = new TTimer(delay);
00208                TPair *p = new TPair(c, timer);
00209                claims.RemoveAt(idx);
00210                claims.AddAt(p, idx);
00211             } else {
00212                TPair *p = dynamic_cast<TPair*>(claims.At(idx));
00213                if (p && p->Value()) {
00214                   TTimer *xt = dynamic_cast<TTimer*>(p->Value());
00215                   if (xt) xt->Reset();
00216                }
00217             }
00218             delete slave;
00219             idx++;
00220          } else {
00221             Warning("StartSlaves", "could not create TSlave object!");
00222          }
00223       } else {
00224          if (slave) {
00225             fSlaves->Add(slave);
00226             TPair *p = dynamic_cast<TPair*>(claims.Remove(c));
00227             if (p && p->Value()) {
00228                TTimer *xt = dynamic_cast<TTimer*>(p->Value());
00229                delete xt;
00230             }
00231             if (p) delete p;
00232 
00233             nClaimsDone++;
00234             TMessage m(kPROOF_SERVERSTARTED);
00235             m << TString("Opening connections to workers") << nClaims
00236                << nClaimsDone << slave->IsValid();
00237             gProofServ->GetSocket()->Send(m);
00238          } else {
00239             Warning("StartSlaves", "could not create TSlave object!");
00240          }
00241       }
00242 
00243       if (idx>=claims.GetSize()) {
00244          trial++;
00245          idx = 0;
00246       }
00247    }
00248 
00249    // Here we finalize the server startup: in this way the bulk
00250    // of remote operations are almost parallelized
00251    TIter nxsl(fSlaves);
00252    TSlave *sl = 0;
00253    int nSlavesDone = 0, nSlavesTotal = fSlaves->GetSize();
00254    while ((sl = (TSlave *) nxsl())) {
00255 
00256       // Finalize setup of the server
00257       if (sl->IsValid()) {
00258          sl->SetupServ(TSlave::kSlave, 0);
00259       }
00260 
00261       if (sl->IsValid()) {
00262          fAllMonitor->Add(sl->GetSocket());
00263       } else {
00264          fBadSlaves->Add(sl);
00265       }
00266 
00267       // Notify end of startup operations
00268       nSlavesDone++;
00269       TMessage m(kPROOF_SERVERSTARTED);
00270       m << TString("Setting up worker servers") << nSlavesTotal
00271          << nSlavesDone << sl->IsValid();
00272       gProofServ->GetSocket()->Send(m);
00273    }
00274 
00275    return kTRUE;
00276 }
00277 
00278 //______________________________________________________________________________
00279 void TProofCondor::SetActive(Bool_t active)
00280 {
00281    // Suspend or resume PROOF via Condor.
00282 
00283    if (fTimer == 0) {
00284       fTimer = new TTimer();
00285    }
00286    if (active) {
00287       PDB(kCondor,1) Info("SetActive","-- Condor Resume --");
00288       fTimer->Stop();
00289       if (fCondor->GetState() == TCondor::kSuspended)
00290          fCondor->Resume();
00291    } else {
00292 #if 1
00293       return; // don't suspend for the moment
00294 #else
00295       Int_t delay = 60000; // milli seconds
00296       PDB(kCondor,1) Info("SetActive","-- Delayed Condor Suspend (%d msec / to %lld) --",
00297                           delay, delay + Long64_t(gSystem->Now()));
00298       fTimer->Connect("Timeout()", "TCondor", fCondor, "Suspend()");
00299       fTimer->Start(10000, kTRUE); // single shot
00300 #endif
00301    }
00302 }
00303 
00304 //______________________________________________________________________________
00305 TString TProofCondor::GetJobAd()
00306 {
00307    // Get job Ad
00308 
00309    TString ad;
00310 
00311    ad = "JobUniverse = 5\n"; // vanilla
00312    ad += Form("Cmd = \"%s/bin/proofd\"\n", GetConfDir());
00313    ad += Form("Iwd = \"%s\"\n", gSystem->TempDirectory());
00314    ad += "In = \"/dev/null\"\n";
00315    ad += Form("Out = \"%s/proofd.out.$(Port)\"\n", gSystem->TempDirectory());
00316    ad += Form("Err = \"%s/proofd.err.$(Port)\"\n", gSystem->TempDirectory());
00317    ad += Form("Args = \"-f -p $(Port) -d %d %s\"\n", GetLogLevel(), GetConfDir());
00318 
00319    return ad;
00320 }

Generated on Tue Jul 5 14:51:38 2011 for ROOT_528-00b_version by  doxygen 1.5.1