TProofPEAC.cxx

Go to the documentation of this file.
00001 // @(#)root/peac:$Id: TProofPEAC.cxx 25918 2008-10-22 15:00:04Z ganis $
00002 // Author: Maarten Ballintijn    21/10/2004
00003 // Author: Kris Gulbrandsen      21/10/2004
00004 
00005 /*************************************************************************
00006  * Copyright (C) 1995-2005, Rene Brun and Fons Rademakers.               *
00007  * All rights reserved.                                                  *
00008  *                                                                       *
00009  * For the licensing terms see $ROOTSYS/LICENSE.                         *
00010  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
00011  *************************************************************************/
00012 
00013 //////////////////////////////////////////////////////////////////////////
00014 //                                                                      //
00015 // TProofPEAC                                                           //
00016 //                                                                      //
00017 // This class implements the setup of a PROOF session using PEAC        //
00018 //                                                                      //
00019 //////////////////////////////////////////////////////////////////////////
00020 
00021 #include "TClarens.h"
00022 #include "TCondor.h"
00023 #include "TDSet.h"
00024 #include "TEnv.h"
00025 #include "TError.h"
00026 #include "TList.h"
00027 #include "TLM.h"
00028 #include "TMonitor.h"
00029 #include "TProofPEAC.h"
00030 #include "TProofServ.h"
00031 #include "TSlave.h"
00032 #include "TSystem.h"
00033 #include "TTimer.h"
00034 #include "TUrl.h"
00035 
00036 ClassImp(TProofPEAC)
00037 
00038 
00039 //______________________________________________________________________________
00040 TProofPEAC::TProofPEAC(const char *masterurl, const char *sessionid,
00041                        const char *confdir, Int_t loglevel,
00042                        const char *, TProofMgr *mgr)
00043    : fCondor(0), fTimer(0)
00044 {
00045    // Start PEAC proof session
00046 
00047    // This may be needed during init
00048    fManager = mgr;
00049 
00050    if (!strncasecmp(sessionid, "peac:", 5))
00051       sessionid+=5;
00052 
00053    Init(masterurl, sessionid, confdir, loglevel);
00054 
00055 }
00056 
00057 //______________________________________________________________________________
00058 TProofPEAC::~TProofPEAC()
00059 {
00060    // Destroy PEAC proof session
00061 
00062    delete fCondor;
00063    delete fTimer;
00064    if (fLM) {
00065       delete fHeartbeatTimer;
00066       fHeartbeatTimer = 0;
00067       fLM->EndSession(fSession);
00068       delete fLM;
00069       fLM = 0;
00070    }
00071 
00072 }
00073 
00074 //------------------------------------------------------------------------------
00075 Bool_t TProofPEAC::StartSlaves(Bool_t,Bool_t)
00076 {
00077 
00078    if (IsMaster()) {
00079 
00080       TClarens::Init();
00081       const Char_t *lmUrl = gEnv->GetValue("PEAC.LmUrl",
00082                                            "http://localhost:8080/clarens/");
00083       fLM = gClarens->CreateLM(lmUrl);
00084       if (!fLM) {
00085          Error("StartSlaves", "Could not connect to local manager for url '%s'",
00086                               lmUrl);
00087          return kFALSE;
00088       }
00089 
00090       TUrl url(lmUrl);
00091       TString lm = url.GetHost();
00092       Int_t lmPort = url.GetPort();
00093       fSession = fConfFile;
00094 
00095       PDB(kGlobal,1) Info("StartSlaves", "PEAC mode: host: %s  port: %d  session: %s",
00096                           lm.Data(), lmPort, fSession.Data());
00097 
00098       TList* config = 0;
00099       if(!fLM->StartSession(fSession, config, fHBPeriod)) {
00100          Error("StartSlaves", "Could not start session '%s' for local manager '%s'",
00101                               fSession.Data(), lmUrl);
00102          return kFALSE;
00103       }
00104 
00105       TList csl;
00106 
00107       TIter NextSlave(config);
00108       Int_t ord = 0;
00109       TString jobad;
00110       while (TLM::TSlaveParams *sp = dynamic_cast<TLM::TSlaveParams*>(NextSlave())) {
00111 
00112          PDB(kGlobal,1) Info("StartSlaves", "node: %s", sp->fNode.Data());
00113 
00114          // create slave server
00115 
00116          if (sp->fType == "inetd") {
00117             TString fullord = TString(gProofServ->GetOrdinal()) + "." + ((Long_t) ord);
00118             ord++;
00119             TSlave *slave = CreateSlave(sp->fNode, fullord,
00120                                         sp->fPerfidx, sp->fImg, Form("~/%s", kPROOF_WorkDir));
00121             fSlaves->Add(slave);
00122             if (slave->IsValid()) {
00123                fAllMonitor->Add(slave->GetSocket());
00124                PDB(kGlobal,3)
00125                  Info("StartSlaves", "slave on host %s created and added to list",
00126                       sp->fNode.Data());
00127             } else {
00128                fBadSlaves->Add(slave);
00129                PDB(kGlobal,3)
00130                  Info("StartSlaves", "slave on host %s created and added to list of bad slaves",
00131                       sp->fNode.Data());
00132             }
00133          } else if (sp->fType == "cod") {
00134             if (fCondor == 0) {
00135                fCondor = new TCondor;
00136                jobad = GetJobAd();
00137 
00138                fImage = fCondor->GetImage(gSystem->HostName());
00139                if (fImage.Length() == 0) {
00140                   Error("StartSlaves", "no image found for node %s",
00141                         gSystem->HostName());
00142                   delete fCondor;
00143                   fCondor = 0;
00144                }
00145             }
00146 
00147             if (fCondor != 0) {
00148                TCondorSlave *c = fCondor->Claim(sp->fNode, jobad);
00149 
00150                if (c != 0) {
00151                   csl.Add(c);
00152                } else {
00153                   Info("StartSlaves", "node: %s not claimed", sp->fNode.Data());
00154                }
00155             }
00156          } else {
00157             Error("StartSlaves", "unknown slave type (%s)", sp->fType.Data());
00158          }
00159       }
00160       delete config;
00161 
00162       TIter next(&csl);
00163       TCondorSlave *cs;
00164       while ((cs = (TCondorSlave*)next()) != 0) {
00165          // Get slave FQDN ...
00166          TString SlaveFqdn;
00167          TInetAddress SlaveAddr = gSystem->GetHostByName(cs->fHostname);
00168          if (SlaveAddr.IsValid())
00169             SlaveFqdn = SlaveAddr.GetHostName();
00170 
00171          // who do we believe for perf & img, Condor for the moment
00172          TString fullord = TString(gProofServ->GetOrdinal()) + "." + ((Long_t) ord);
00173          ord++;
00174          TSlave *slave = CreateSlave(cs->fHostname, fullord,
00175                                      cs->fPerfIdx, cs->fImage, Form("~/%s", kPROOF_WorkDir));
00176 
00177          fSlaves->Add(slave);
00178          if (slave->IsValid()) {
00179             fAllMonitor->Add(slave->GetSocket());
00180             PDB(kGlobal,3)
00181               Info("StartSlaves", "slave on host %s created and added to list (port %d)",
00182                cs->fHostname.Data(),cs->fPort);
00183          } else {
00184             fBadSlaves->Add(slave);
00185             PDB(kGlobal,3)
00186               Info("StartSlaves", "slave on host %s created and added to list of bad slaves (port %d)",
00187                 cs->fHostname.Data(),cs->fPort);
00188          }
00189       }
00190 
00191       //create and start heartbeat timer
00192       fHeartbeatTimer = new TTimer;
00193       fHeartbeatTimer->Connect("Timeout()", "TProofPEAC", this, "SendHeartbeat()");
00194       fHeartbeatTimer->Start(fHBPeriod*1000, kFALSE);
00195    } else {
00196       return TProof::StartSlaves(kTRUE);
00197    }
00198 
00199    return kTRUE;
00200 }
00201 
00202 //______________________________________________________________________________
00203 void TProofPEAC::Close(Option_t *option)
00204 {
00205 
00206    TProof::Close(option);
00207 
00208    if (fLM) {
00209       delete fHeartbeatTimer;
00210       fHeartbeatTimer = 0;
00211       fLM->EndSession(fSession);
00212       delete fLM;
00213       fLM = 0;
00214    }
00215 
00216 }
00217 
00218 //______________________________________________________________________________
00219 void TProofPEAC::SetActive(Bool_t active)
00220 {
00221    // Suspend or resume PROOF via Condor.
00222 
00223    if (fCondor) {
00224       if (fTimer == 0) {
00225          fTimer = new TTimer();
00226       }
00227       if (active) {
00228          PDB(kCondor,1) Info("SetActive","-- Condor Resume --");
00229          fTimer->Stop();
00230          if (fCondor->GetState() == TCondor::kSuspended)
00231             fCondor->Resume();
00232       } else {
00233          Int_t delay = 10000; // milli seconds
00234          PDB(kCondor,1) Info("SetActive","-- Delayed Condor Suspend (%d msec) --", delay);
00235          fTimer->Connect("Timeout()", "TCondor", fCondor, "Suspend()");
00236          fTimer->Start(10000, kTRUE); // single shot
00237       }
00238    }
00239 }
00240 
00241 //______________________________________________________________________________
00242 TString TProofPEAC::GetJobAd()
00243 {
00244    TString ad;
00245 
00246    ad = "JobUniverse = 5\n"; // vanilla
00247    ad += Form("Cmd = \"%s/bin/proofd\"\n", GetConfDir());
00248    ad += "Iwd = \"/tmp\"\n";
00249    ad += "In = \"/dev/null\"\n";
00250    ad += "Out = \"/tmp/proofd.out.$(Port)\"\n";
00251    ad += "Err = \"/tmp/proofd.err.$(Port)\"\n";
00252    ad += Form("Args = \"-f -p $(Port) -d %d %s\"\n", GetLogLevel(), GetConfDir());
00253 
00254    return ad;
00255 }
00256 
00257 //______________________________________________________________________________
00258 Bool_t TProofPEAC::IsDataReady(Long64_t &totalbytes, Long64_t &bytesready)
00259 {
00260    Bool_t dataready = kFALSE;
00261    if (IsMaster()) {
00262       dataready = fLM ? fLM->DataReady(fSession, bytesready, totalbytes) : kFALSE;
00263       ////// for testing
00264       //static Long64_t total = 10;
00265       //static Long64_t ready = -1;
00266       //ready++;
00267       //totalbytes=total;
00268       //bytesready=ready;
00269       if (totalbytes>bytesready) dataready=kFALSE;
00270       //////
00271    } else {
00272       dataready = TProof::IsDataReady(totalbytes, bytesready);
00273    }
00274    return dataready;
00275 }
00276 
00277 //______________________________________________________________________________
00278 void TProofPEAC::SendHeartbeat()
00279 {
00280    if (fLM) fLM->Heartbeat(fSession);
00281 }

Generated on Tue Jul 5 14:50:51 2011 for ROOT_528-00b_version by  doxygen 1.5.1