00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #include "TClarens.h"
00022 #include "TCondor.h"
00023 #include "TDSet.h"
00024 #include "TEnv.h"
00025 #include "TError.h"
00026 #include "TList.h"
00027 #include "TLM.h"
00028 #include "TMonitor.h"
00029 #include "TProofPEAC.h"
00030 #include "TProofServ.h"
00031 #include "TSlave.h"
00032 #include "TSystem.h"
00033 #include "TTimer.h"
00034 #include "TUrl.h"
00035
00036 ClassImp(TProofPEAC)
00037
00038
00039
00040 TProofPEAC::TProofPEAC(const char *masterurl, const char *sessionid,
00041 const char *confdir, Int_t loglevel,
00042 const char *, TProofMgr *mgr)
00043 : fCondor(0), fTimer(0)
00044 {
00045
00046
00047
00048 fManager = mgr;
00049
00050 if (!strncasecmp(sessionid, "peac:", 5))
00051 sessionid+=5;
00052
00053 Init(masterurl, sessionid, confdir, loglevel);
00054
00055 }
00056
00057
00058 TProofPEAC::~TProofPEAC()
00059 {
00060
00061
00062 delete fCondor;
00063 delete fTimer;
00064 if (fLM) {
00065 delete fHeartbeatTimer;
00066 fHeartbeatTimer = 0;
00067 fLM->EndSession(fSession);
00068 delete fLM;
00069 fLM = 0;
00070 }
00071
00072 }
00073
00074
00075 Bool_t TProofPEAC::StartSlaves(Bool_t,Bool_t)
00076 {
00077
00078 if (IsMaster()) {
00079
00080 TClarens::Init();
00081 const Char_t *lmUrl = gEnv->GetValue("PEAC.LmUrl",
00082 "http://localhost:8080/clarens/");
00083 fLM = gClarens->CreateLM(lmUrl);
00084 if (!fLM) {
00085 Error("StartSlaves", "Could not connect to local manager for url '%s'",
00086 lmUrl);
00087 return kFALSE;
00088 }
00089
00090 TUrl url(lmUrl);
00091 TString lm = url.GetHost();
00092 Int_t lmPort = url.GetPort();
00093 fSession = fConfFile;
00094
00095 PDB(kGlobal,1) Info("StartSlaves", "PEAC mode: host: %s port: %d session: %s",
00096 lm.Data(), lmPort, fSession.Data());
00097
00098 TList* config = 0;
00099 if(!fLM->StartSession(fSession, config, fHBPeriod)) {
00100 Error("StartSlaves", "Could not start session '%s' for local manager '%s'",
00101 fSession.Data(), lmUrl);
00102 return kFALSE;
00103 }
00104
00105 TList csl;
00106
00107 TIter NextSlave(config);
00108 Int_t ord = 0;
00109 TString jobad;
00110 while (TLM::TSlaveParams *sp = dynamic_cast<TLM::TSlaveParams*>(NextSlave())) {
00111
00112 PDB(kGlobal,1) Info("StartSlaves", "node: %s", sp->fNode.Data());
00113
00114
00115
00116 if (sp->fType == "inetd") {
00117 TString fullord = TString(gProofServ->GetOrdinal()) + "." + ((Long_t) ord);
00118 ord++;
00119 TSlave *slave = CreateSlave(sp->fNode, fullord,
00120 sp->fPerfidx, sp->fImg, Form("~/%s", kPROOF_WorkDir));
00121 fSlaves->Add(slave);
00122 if (slave->IsValid()) {
00123 fAllMonitor->Add(slave->GetSocket());
00124 PDB(kGlobal,3)
00125 Info("StartSlaves", "slave on host %s created and added to list",
00126 sp->fNode.Data());
00127 } else {
00128 fBadSlaves->Add(slave);
00129 PDB(kGlobal,3)
00130 Info("StartSlaves", "slave on host %s created and added to list of bad slaves",
00131 sp->fNode.Data());
00132 }
00133 } else if (sp->fType == "cod") {
00134 if (fCondor == 0) {
00135 fCondor = new TCondor;
00136 jobad = GetJobAd();
00137
00138 fImage = fCondor->GetImage(gSystem->HostName());
00139 if (fImage.Length() == 0) {
00140 Error("StartSlaves", "no image found for node %s",
00141 gSystem->HostName());
00142 delete fCondor;
00143 fCondor = 0;
00144 }
00145 }
00146
00147 if (fCondor != 0) {
00148 TCondorSlave *c = fCondor->Claim(sp->fNode, jobad);
00149
00150 if (c != 0) {
00151 csl.Add(c);
00152 } else {
00153 Info("StartSlaves", "node: %s not claimed", sp->fNode.Data());
00154 }
00155 }
00156 } else {
00157 Error("StartSlaves", "unknown slave type (%s)", sp->fType.Data());
00158 }
00159 }
00160 delete config;
00161
00162 TIter next(&csl);
00163 TCondorSlave *cs;
00164 while ((cs = (TCondorSlave*)next()) != 0) {
00165
00166 TString SlaveFqdn;
00167 TInetAddress SlaveAddr = gSystem->GetHostByName(cs->fHostname);
00168 if (SlaveAddr.IsValid())
00169 SlaveFqdn = SlaveAddr.GetHostName();
00170
00171
00172 TString fullord = TString(gProofServ->GetOrdinal()) + "." + ((Long_t) ord);
00173 ord++;
00174 TSlave *slave = CreateSlave(cs->fHostname, fullord,
00175 cs->fPerfIdx, cs->fImage, Form("~/%s", kPROOF_WorkDir));
00176
00177 fSlaves->Add(slave);
00178 if (slave->IsValid()) {
00179 fAllMonitor->Add(slave->GetSocket());
00180 PDB(kGlobal,3)
00181 Info("StartSlaves", "slave on host %s created and added to list (port %d)",
00182 cs->fHostname.Data(),cs->fPort);
00183 } else {
00184 fBadSlaves->Add(slave);
00185 PDB(kGlobal,3)
00186 Info("StartSlaves", "slave on host %s created and added to list of bad slaves (port %d)",
00187 cs->fHostname.Data(),cs->fPort);
00188 }
00189 }
00190
00191
00192 fHeartbeatTimer = new TTimer;
00193 fHeartbeatTimer->Connect("Timeout()", "TProofPEAC", this, "SendHeartbeat()");
00194 fHeartbeatTimer->Start(fHBPeriod*1000, kFALSE);
00195 } else {
00196 return TProof::StartSlaves(kTRUE);
00197 }
00198
00199 return kTRUE;
00200 }
00201
00202
00203 void TProofPEAC::Close(Option_t *option)
00204 {
00205
00206 TProof::Close(option);
00207
00208 if (fLM) {
00209 delete fHeartbeatTimer;
00210 fHeartbeatTimer = 0;
00211 fLM->EndSession(fSession);
00212 delete fLM;
00213 fLM = 0;
00214 }
00215
00216 }
00217
00218
00219 void TProofPEAC::SetActive(Bool_t active)
00220 {
00221
00222
00223 if (fCondor) {
00224 if (fTimer == 0) {
00225 fTimer = new TTimer();
00226 }
00227 if (active) {
00228 PDB(kCondor,1) Info("SetActive","-- Condor Resume --");
00229 fTimer->Stop();
00230 if (fCondor->GetState() == TCondor::kSuspended)
00231 fCondor->Resume();
00232 } else {
00233 Int_t delay = 10000;
00234 PDB(kCondor,1) Info("SetActive","-- Delayed Condor Suspend (%d msec) --", delay);
00235 fTimer->Connect("Timeout()", "TCondor", fCondor, "Suspend()");
00236 fTimer->Start(10000, kTRUE);
00237 }
00238 }
00239 }
00240
00241
00242 TString TProofPEAC::GetJobAd()
00243 {
00244 TString ad;
00245
00246 ad = "JobUniverse = 5\n";
00247 ad += Form("Cmd = \"%s/bin/proofd\"\n", GetConfDir());
00248 ad += "Iwd = \"/tmp\"\n";
00249 ad += "In = \"/dev/null\"\n";
00250 ad += "Out = \"/tmp/proofd.out.$(Port)\"\n";
00251 ad += "Err = \"/tmp/proofd.err.$(Port)\"\n";
00252 ad += Form("Args = \"-f -p $(Port) -d %d %s\"\n", GetLogLevel(), GetConfDir());
00253
00254 return ad;
00255 }
00256
00257
00258 Bool_t TProofPEAC::IsDataReady(Long64_t &totalbytes, Long64_t &bytesready)
00259 {
00260 Bool_t dataready = kFALSE;
00261 if (IsMaster()) {
00262 dataready = fLM ? fLM->DataReady(fSession, bytesready, totalbytes) : kFALSE;
00263
00264
00265
00266
00267
00268
00269 if (totalbytes>bytesready) dataready=kFALSE;
00270
00271 } else {
00272 dataready = TProof::IsDataReady(totalbytes, bytesready);
00273 }
00274 return dataready;
00275 }
00276
00277
00278 void TProofPEAC::SendHeartbeat()
00279 {
00280 if (fLM) fLM->Heartbeat(fSession);
00281 }