00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #include "TProofCondor.h"
00024
00025 #include "TCondor.h"
00026 #include "TList.h"
00027 #include "TMap.h"
00028 #include "TMessage.h"
00029 #include "TMonitor.h"
00030 #include "TProofNodeInfo.h"
00031 #include "TProofResourcesStatic.h"
00032 #include "TProofServ.h"
00033 #include "TSlave.h"
00034 #include "TSocket.h"
00035 #include "TString.h"
00036 #include "TTimer.h"
00037
00038 ClassImp(TProofCondor) // ROOT ClassImp macro for TProofCondor (presumably hooks the class implementation into ROOT's class/dictionary machinery -- TODO confirm against Rtypes.h)
00039
00040
00041 TProofCondor::TProofCondor(const char *masterurl, const char *conffile,
00042 const char *confdir, Int_t loglevel,
00043 const char *, TProofMgr *mgr)
00044 : fCondor(0), fTimer(0)
00045 {
00046
00047
00048
00049 InitMembers();
00050
00051
00052 fManager = mgr;
00053
00054 fUrl = TUrl(masterurl);
00055
00056 if (!conffile || strlen(conffile) == 0) {
00057 conffile = kPROOF_ConfFile;
00058 } else if (!strncasecmp(conffile, "condor:", 7)) {
00059 conffile+=7;
00060 }
00061
00062 if (!confdir || strlen(confdir) == 0) {
00063 confdir = kPROOF_ConfDir;
00064 }
00065
00066 Init(masterurl, conffile, confdir, loglevel);
00067 }
00068
00069
00070 TProofCondor::~TProofCondor()
00071 {
00072
00073
00074 SafeDelete(fCondor);
00075 SafeDelete(fTimer);
00076 }
00077
00078
00079 Bool_t TProofCondor::StartSlaves(Bool_t)
00080 {
00081
00082
00083 fCondor = new TCondor;
00084 TString jobad = GetJobAd();
00085
00086 fImage = fCondor->GetImage(gSystem->HostName());
00087 if (fImage.Length() == 0) {
00088 Error("StartSlaves", "Empty Condor image found for system %s",
00089 gSystem->HostName());
00090 return kFALSE;
00091 }
00092
00093 TList claims;
00094 if (fConfFile.IsNull()) {
00095
00096 TList *condorclaims = fCondor->Claim(9999, jobad);
00097 TIter nextclaim(condorclaims);
00098 while (TObject *o = nextclaim()) claims.Add(o);
00099 } else {
00100
00101 TProofResourcesStatic *resources = new TProofResourcesStatic(fConfDir, fConfFile);
00102 fConfFile = resources->GetFileName();
00103 PDB(kGlobal,1) Info("StartSlaves", "using PROOF config file: %s", fConfFile.Data());
00104
00105
00106 TList *workerList = resources->GetWorkers();
00107 if (workerList->GetSize() == 0) {
00108 Error("StartSlaves", "Found no condorworkers in %s", fConfFile.Data());
00109 return kFALSE;
00110 }
00111
00112
00113 Int_t ord = 0;
00114
00115
00116 TListIter next(workerList);
00117 TObject *to;
00118 TProofNodeInfo *worker;
00119 int nSlavesDone = 0;
00120 while ((to = next())) {
00121
00122 worker = (TProofNodeInfo *)to;
00123
00124
00125 const Char_t *image = worker->GetImage().Data();
00126 const Char_t *workdir = worker->GetWorkDir().Data();
00127 Int_t perfidx = worker->GetPerfIndex();
00128
00129 gSystem->Sleep(10 );
00130 TCondorSlave* csl = fCondor->Claim(worker->GetNodeName().Data(), jobad);
00131 if (csl) {
00132 csl->fPerfIdx = perfidx;
00133 csl->fImage = image;
00134 csl->fWorkDir = gSystem->ExpandPathName(workdir);
00135 TString fullord = TString(gProofServ->GetOrdinal()) + "." + ((Long_t) ord);
00136 csl->fOrdinal = fullord.Data();
00137 claims.Add(csl);
00138 ord++;
00139 }
00140
00141
00142 nSlavesDone++;
00143 TMessage m(kPROOF_SERVERSTARTED);
00144 m << TString("Creating COD Claim") << workerList->GetSize()
00145 << nSlavesDone << (csl != 0);
00146 gProofServ->GetSocket()->Send(m);
00147
00148 }
00149
00150
00151 delete resources;
00152 resources = 0;
00153 }
00154
00155 Long_t delay = 500;
00156 Int_t ntries = 20;
00157 Int_t trial = 1;
00158 Int_t idx = 0;
00159
00160 int nClaims = claims.GetSize();
00161 int nClaimsDone = 0;
00162 while (claims.GetSize() > 0) {
00163 TCondorSlave* c = 0;
00164
00165
00166 if (trial == 1) {
00167 c = dynamic_cast<TCondorSlave*>(claims.At(idx));
00168 } else {
00169 TPair *p = dynamic_cast<TPair*>(claims.At(idx));
00170 if (p) {
00171 TTimer *t = dynamic_cast<TTimer*>(p->Value());
00172 if (t) {
00173
00174 Long64_t wait = t->GetAbsTime()-gSystem->Now();
00175 if (wait > 0) gSystem->Sleep((UInt_t)wait);
00176 c = dynamic_cast<TCondorSlave*>(p->Key());
00177 }
00178 }
00179 }
00180
00181
00182 TSlave *slave = 0;
00183 if (c) slave = CreateSlave(Form("%s:%d", c->fHostname.Data(), c->fPort), c->fOrdinal,
00184 c->fPerfIdx, c->fImage, c->fWorkDir);
00185
00186
00187 if (trial < ntries) {
00188 if (slave && slave->IsValid()) {
00189 fSlaves->Add(slave);
00190 if (trial == 1) {
00191 claims.Remove(c);
00192 } else {
00193 TPair *p = dynamic_cast<TPair*>(claims.Remove(c));
00194 if (p) {
00195 TTimer *xt = dynamic_cast<TTimer*>(p->Value());
00196 if (xt) delete xt;
00197 delete p;
00198 }
00199 }
00200 nClaimsDone++;
00201 TMessage m(kPROOF_SERVERSTARTED);
00202 m << TString("Opening connections to workers") << nClaims
00203 << nClaimsDone << kTRUE;
00204 gProofServ->GetSocket()->Send(m);
00205 } else if (slave) {
00206 if (trial == 1) {
00207 TTimer* timer = new TTimer(delay);
00208 TPair *p = new TPair(c, timer);
00209 claims.RemoveAt(idx);
00210 claims.AddAt(p, idx);
00211 } else {
00212 TPair *p = dynamic_cast<TPair*>(claims.At(idx));
00213 if (p && p->Value()) {
00214 TTimer *xt = dynamic_cast<TTimer*>(p->Value());
00215 if (xt) xt->Reset();
00216 }
00217 }
00218 delete slave;
00219 idx++;
00220 } else {
00221 Warning("StartSlaves", "could not create TSlave object!");
00222 }
00223 } else {
00224 if (slave) {
00225 fSlaves->Add(slave);
00226 TPair *p = dynamic_cast<TPair*>(claims.Remove(c));
00227 if (p && p->Value()) {
00228 TTimer *xt = dynamic_cast<TTimer*>(p->Value());
00229 delete xt;
00230 }
00231 if (p) delete p;
00232
00233 nClaimsDone++;
00234 TMessage m(kPROOF_SERVERSTARTED);
00235 m << TString("Opening connections to workers") << nClaims
00236 << nClaimsDone << slave->IsValid();
00237 gProofServ->GetSocket()->Send(m);
00238 } else {
00239 Warning("StartSlaves", "could not create TSlave object!");
00240 }
00241 }
00242
00243 if (idx>=claims.GetSize()) {
00244 trial++;
00245 idx = 0;
00246 }
00247 }
00248
00249
00250
00251 TIter nxsl(fSlaves);
00252 TSlave *sl = 0;
00253 int nSlavesDone = 0, nSlavesTotal = fSlaves->GetSize();
00254 while ((sl = (TSlave *) nxsl())) {
00255
00256
00257 if (sl->IsValid()) {
00258 sl->SetupServ(TSlave::kSlave, 0);
00259 }
00260
00261 if (sl->IsValid()) {
00262 fAllMonitor->Add(sl->GetSocket());
00263 } else {
00264 fBadSlaves->Add(sl);
00265 }
00266
00267
00268 nSlavesDone++;
00269 TMessage m(kPROOF_SERVERSTARTED);
00270 m << TString("Setting up worker servers") << nSlavesTotal
00271 << nSlavesDone << sl->IsValid();
00272 gProofServ->GetSocket()->Send(m);
00273 }
00274
00275 return kTRUE;
00276 }
00277
00278
00279 void TProofCondor::SetActive(Bool_t active)
00280 {
00281
00282
00283 if (fTimer == 0) {
00284 fTimer = new TTimer();
00285 }
00286 if (active) {
00287 PDB(kCondor,1) Info("SetActive","-- Condor Resume --");
00288 fTimer->Stop();
00289 if (fCondor->GetState() == TCondor::kSuspended)
00290 fCondor->Resume();
00291 } else {
00292 #if 1
00293 return;
00294 #else
00295 Int_t delay = 60000;
00296 PDB(kCondor,1) Info("SetActive","-- Delayed Condor Suspend (%d msec / to %lld) --",
00297 delay, delay + Long64_t(gSystem->Now()));
00298 fTimer->Connect("Timeout()", "TCondor", fCondor, "Suspend()");
00299 fTimer->Start(10000, kTRUE);
00300 #endif
00301 }
00302 }
00303
00304
00305 TString TProofCondor::GetJobAd()
00306 {
00307
00308
00309 TString ad;
00310
00311 ad = "JobUniverse = 5\n";
00312 ad += Form("Cmd = \"%s/bin/proofd\"\n", GetConfDir());
00313 ad += Form("Iwd = \"%s\"\n", gSystem->TempDirectory());
00314 ad += "In = \"/dev/null\"\n";
00315 ad += Form("Out = \"%s/proofd.out.$(Port)\"\n", gSystem->TempDirectory());
00316 ad += Form("Err = \"%s/proofd.err.$(Port)\"\n", gSystem->TempDirectory());
00317 ad += Form("Args = \"-f -p $(Port) -d %d %s\"\n", GetLogLevel(), GetConfDir());
00318
00319 return ad;
00320 }