00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #include "TProofSuperMaster.h"
00024 #include "TString.h"
00025 #include "TObjString.h"
00026 #include "TError.h"
00027 #include "TList.h"
00028 #include "TSortedList.h"
00029 #include "TSlave.h"
00030 #include "TMap.h"
00031 #include "TProofServ.h"
00032 #include "TSocket.h"
00033 #include "TMonitor.h"
00034 #include "TSemaphore.h"
00035 #include "TDSet.h"
00036 #include "TPluginManager.h"
00037 #include "TVirtualProofPlayer.h"
00038 #include "TMessage.h"
00039 #include "TUrl.h"
00040 #include "TProofResourcesStatic.h"
00041 #include "TProofNodeInfo.h"
00042 #include "TROOT.h"
00043
00044 ClassImp(TProofSuperMaster)
00045
00046
00047 TProofSuperMaster::TProofSuperMaster(const char *masterurl, const char *conffile,
00048 const char *confdir, Int_t loglevel,
00049 const char *alias, TProofMgr *mgr)
00050 {
00051
00052
00053
00054 InitMembers();
00055
00056
00057 fManager = mgr;
00058
00059 fUrl = TUrl(masterurl);
00060
00061 if (!conffile || strlen(conffile) == 0)
00062 conffile = kPROOF_ConfFile;
00063 else if (!strncasecmp(conffile, "sm:", 3))
00064 conffile+=3;
00065 if (!confdir || strlen(confdir) == 0)
00066 confdir = kPROOF_ConfDir;
00067
00068
00069 fMasterServ = kTRUE;
00070 ResetBit(TProof::kIsClient);
00071 SetBit(TProof::kIsMaster);
00072 SetBit(TProof::kIsTopMaster);
00073
00074 Init(masterurl, conffile, confdir, loglevel, alias);
00075
00076
00077 gROOT->GetListOfProofs()->Add(this);
00078 }
00079
00080
00081 Bool_t TProofSuperMaster::StartSlaves(Bool_t)
00082 {
00083
00084
00085
00086
00087
00088
00089
00090
00091
00092 Int_t pc = 0;
00093 TList *submasterList = new TList;
00094
00095 if (gProofServ->GetWorkers(submasterList, pc) == TProofServ::kQueryStop) {
00096 Error("StartSlaves", "getting list of submaster nodes");
00097 return kFALSE;
00098 }
00099 fImage = gProofServ->GetImage();
00100 if (fImage.IsNull())
00101 fImage = Form("%s:%s", TUrl(gSystem->HostName()).GetHostFQDN(),
00102 gProofServ->GetWorkDir());
00103
00104 UInt_t nSubmasters = submasterList->GetSize();
00105 UInt_t nSubmastersDone = 0;
00106 Int_t ord = 0;
00107 TList validSubmasters;
00108 TList validPairs;
00109 validPairs.SetOwner();
00110
00111
00112 TListIter next(submasterList);
00113 TObject *to;
00114 TProofNodeInfo *submaster;
00115 while ((to = next())) {
00116
00117 submaster = (TProofNodeInfo *)to;
00118 const Char_t *conffile = submaster->GetConfig();
00119 const Char_t *image = submaster->GetImage();
00120 const Char_t *msd = submaster->GetMsd();
00121 Int_t sport = submaster->GetPort();
00122 if (sport == -1)
00123 sport = fUrl.GetPort();
00124
00125 TString fullord = TString(gProofServ->GetOrdinal()) + "." + ((Long_t) ord);
00126
00127
00128 TUrl u(Form("%s:%d", submaster->GetNodeName().Data(), sport));
00129
00130 if (strlen(gProofServ->GetGroup()) > 0) {
00131
00132 if (strlen(u.GetUser()) <= 0)
00133 u.SetUser(gProofServ->GetUser());
00134 u.SetPasswd(gProofServ->GetGroup());
00135 }
00136 TSlave *slave =
00137 CreateSubmaster(u.GetUrl(), fullord, image, msd);
00138
00139
00140
00141 Bool_t submasterOk = kTRUE;
00142 fSlaves->Add(slave);
00143 if (slave->IsValid()) {
00144 validPairs.Add(new TPair(slave, new TObjString(conffile)));
00145 } else {
00146 submasterOk = kFALSE;
00147 fBadSlaves->Add(slave);
00148 }
00149
00150 PDB(kGlobal,3)
00151 Info("StartSlaves","submaster on host %s created and"
00152 " added to list", submaster->GetNodeName().Data());
00153
00154
00155 nSubmastersDone++;
00156 TMessage m(kPROOF_SERVERSTARTED);
00157 m << TString("Opening connections to submasters") << nSubmasters
00158 << nSubmastersDone << submasterOk;
00159 gProofServ->GetSocket()->Send(m);
00160
00161 ord++;
00162
00163 }
00164
00165
00166 SafeDelete(submasterList);
00167
00168 nSubmastersDone = 0;
00169
00170
00171
00172 TIter nxsc(&validPairs);
00173 TPair *sc = 0;
00174 while ((sc = (TPair *) nxsc())) {
00175
00176 TSlave *sl = (TSlave *) sc->Key();
00177 TObjString *cf = (TObjString *) sc->Value();
00178 sl->SetupServ(TSlave::kMaster, cf->GetName());
00179
00180
00181 Bool_t submasterOk = kTRUE;
00182 if (sl->IsValid()) {
00183
00184
00185 if (fProtocol == 1) {
00186 Error("StartSlaves", "master and submaster protocols"
00187 " not compatible (%d and %d)",
00188 kPROOF_Protocol, fProtocol);
00189 submasterOk = kFALSE;
00190 fBadSlaves->Add(sl);
00191 } else {
00192 fAllMonitor->Add(sl->GetSocket());
00193 validSubmasters.Add(sl);
00194 }
00195 } else {
00196 submasterOk = kFALSE;
00197 fBadSlaves->Add(sl);
00198 }
00199
00200
00201 nSubmastersDone++;
00202 TMessage m(kPROOF_SERVERSTARTED);
00203 m << TString("Setting up submasters") << nSubmasters
00204 << nSubmastersDone << submasterOk;
00205 gProofServ->GetSocket()->Send(m);
00206 }
00207
00208 Collect(kAll);
00209 TIter nextSubmaster(&validSubmasters);
00210 while (TSlave* sl = dynamic_cast<TSlave*>(nextSubmaster())) {
00211 if (sl->GetStatus() == -99) {
00212 Error("StartSlaves", "not allowed to connect to PROOF master server");
00213 fBadSlaves->Add(sl);
00214 continue;
00215 }
00216
00217 if (!sl->IsValid()) {
00218 Error("StartSlaves", "failed to setup connection with PROOF master server");
00219 fBadSlaves->Add(sl);
00220 continue;
00221 }
00222 }
00223
00224 return kTRUE;
00225 }
00226
00227
00228 Long64_t TProofSuperMaster::Process(TDSet *set, const char *selector, Option_t *option,
00229 Long64_t nentries, Long64_t first)
00230 {
00231
00232
00233
00234
00235
00236
00237 if (!IsValid()) return -1;
00238
00239 R__ASSERT(GetPlayer());
00240
00241 if (GetProgressDialog())
00242 GetProgressDialog()->ExecPlugin(5, this, selector, set->GetListOfElements()->GetSize(),
00243 first, nentries);
00244
00245 return GetPlayer()->Process(set, selector, option, nentries, first);
00246 }
00247
00248
00249 void TProofSuperMaster::ValidateDSet(TDSet *dset)
00250 {
00251
00252
00253 if (dset->ElementsValid()) return;
00254
00255
00256 dset->ResetBit(TDSet::kValidityChecked);
00257 dset->ResetBit(TDSet::kSomeInvalid);
00258
00259 TList msds;
00260 msds.SetOwner();
00261
00262 TList smholder;
00263 smholder.SetOwner();
00264 TList elemholder;
00265 elemholder.SetOwner();
00266
00267
00268 TIter nextSubmaster(GetListOfActiveSlaves());
00269 while (TSlave *sl = dynamic_cast<TSlave*>(nextSubmaster())) {
00270 TList *smlist = 0;
00271 TPair *p = dynamic_cast<TPair*>(msds.FindObject(sl->GetMsd()));
00272 if (!p) {
00273 smlist = new TList;
00274 smlist->SetName(sl->GetMsd());
00275
00276 smholder.Add(smlist);
00277 TList *elemlist = new TSortedList(kSortDescending);
00278 elemlist->SetName(TString(sl->GetMsd())+"_elem");
00279 elemholder.Add(elemlist);
00280 msds.Add(new TPair(smlist, elemlist));
00281 } else {
00282 smlist = dynamic_cast<TList*>(p->Key());
00283 }
00284 if (smlist) smlist->Add(sl);
00285 }
00286
00287 TIter nextElem(dset->GetListOfElements());
00288 while (TDSetElement *elem = dynamic_cast<TDSetElement*>(nextElem())) {
00289 if (elem->GetValid()) continue;
00290 TPair *p = dynamic_cast<TPair*>(msds.FindObject(elem->GetMsd()));
00291 if (p && p->Value()) {
00292 TList *xl = dynamic_cast<TList*>(p->Value());
00293 if (xl) xl->Add(elem);
00294 } else {
00295 Error("ValidateDSet", "no mass storage domain '%s' associated"
00296 " with available submasters",
00297 elem->GetMsd());
00298 return;
00299 }
00300 }
00301
00302
00303 TList usedsms;
00304 TIter nextSM(&msds);
00305 SetDSet(dset);
00306 while (TPair *msd = dynamic_cast<TPair*>(nextSM())) {
00307 TList *sms = dynamic_cast<TList*>(msd->Key());
00308 TList *setelements = dynamic_cast<TList*>(msd->Value());
00309
00310
00311 Int_t nsms = sms ? sms->GetSize() : -1;
00312 Int_t nelements = setelements ? setelements->GetSize() : -1;
00313 for (Int_t i=0; i<nsms; i++) {
00314
00315 TDSet set(dset->GetType(), dset->GetObjName(),
00316 dset->GetDirectory());
00317 for (Int_t j = (i*nelements)/nsms;
00318 j < ((i+1)*nelements)/nsms;
00319 j++) {
00320 TDSetElement *elem = setelements ?
00321 dynamic_cast<TDSetElement*>(setelements->At(j)) : (TDSetElement *)0;
00322 if (elem) {
00323 set.Add(elem->GetFileName(), elem->GetObjName(),
00324 elem->GetDirectory(), elem->GetFirst(),
00325 elem->GetNum(), elem->GetMsd());
00326 }
00327 }
00328
00329 if (set.GetListOfElements()->GetSize()>0) {
00330 TMessage mesg(kPROOF_VALIDATE_DSET);
00331 mesg << &set;
00332
00333 TSlave *sl = dynamic_cast<TSlave*>(sms->At(i));
00334 if (sl) {
00335 PDB(kGlobal,1)
00336 Info("ValidateDSet",
00337 "Sending TDSet with %d elements to worker %s"
00338 " to be validated", set.GetListOfElements()->GetSize(),
00339 sl->GetOrdinal());
00340 sl->GetSocket()->Send(mesg);
00341 usedsms.Add(sl);
00342 } else {
00343 Warning("ValidateDSet", "not a TSlave object");
00344 }
00345 }
00346 }
00347 }
00348
00349 PDB(kGlobal,1)
00350 Info("ValidateDSet","Calling Collect");
00351 Collect(&usedsms);
00352 SetDSet(0);
00353 }
00354
00355
00356 TVirtualProofPlayer *TProofSuperMaster::MakePlayer(const char *player, TSocket *s)
00357 {
00358
00359
00360
00361
00362 if (!player)
00363 player = "sm";
00364
00365 SetPlayer(TVirtualProofPlayer::Create(player, this, s));
00366 return GetPlayer();
00367 }
00368