00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #include <stdlib.h>
00024
00025 #include "RConfigure.h"
00026 #include "TApplication.h"
00027 #include "TSlave.h"
00028 #include "TSlaveLite.h"
00029 #include "TProof.h"
00030 #include "TSystem.h"
00031 #include "TEnv.h"
00032 #include "TROOT.h"
00033 #include "TUrl.h"
00034 #include "TMessage.h"
00035 #include "TError.h"
00036 #include "TVirtualMutex.h"
00037 #include "TThread.h"
00038 #include "TSocket.h"
00039 #include "TObjString.h"
00040
00041 ClassImp(TSlave)
00042
00043
// Optional constructor hook consulted by Create(); it stays null until
// SetTXSlaveHook() installs a function.
00044 TSlave_t TSlave::fgTXSlaveHook = 0;
00045
00046
00047 TSlave::TSlave(const char *url, const char *ord, Int_t perf,
00048 const char *image, TProof *proof, Int_t stype,
00049 const char *workdir, const char *msd)
00050 : fImage(image), fProofWorkDir(workdir),
00051 fWorkDir(workdir), fPort(-1),
00052 fOrdinal(ord), fPerfIdx(perf),
00053 fProtocol(0), fSocket(0), fProof(proof),
00054 fInput(0), fBytesRead(0), fRealTime(0),
00055 fCpuTime(0), fSlaveType((ESlaveType)stype), fStatus(TSlave::kInvalid),
00056 fParallel(0), fMsd(msd)
00057 {
00058
00059 fName = TUrl(url).GetHostFQDN();
00060 fPort = TUrl(url).GetPort();
00061
00062 Init(url, -1, stype);
00063 }
00064
00065
00066 TSlave::TSlave()
00067 {
00068
00069
00070 fPort = -1;
00071 fOrdinal = "-1";
00072 fPerfIdx = -1;
00073 fProof = 0;
00074 fSlaveType = kMaster;
00075 fProtocol = 0;
00076 fSocket = 0;
00077 fInput = 0;
00078 fBytesRead = 0;
00079 fRealTime = 0;
00080 fCpuTime = 0;
00081 fStatus = kInvalid;
00082 fParallel = 0;
00083 }
00084
00085
00086 void TSlave::Init(const char *host, Int_t port, Int_t stype)
00087 {
00088
00089
00090
00091
00092
00093
00094 TString proto = fProof->fUrl.GetProtocol();
00095 proto.Insert(5, 'd');
00096
00097 TUrl hurl(host);
00098 hurl.SetProtocol(proto);
00099 if (port > 0)
00100 hurl.SetPort(port);
00101
00102
00103 TString iam;
00104 if (fProof->IsMaster() && stype == kSlave) {
00105 iam = "Master";
00106 hurl.SetOptions("SM");
00107 } else if (fProof->IsMaster() && stype == kMaster) {
00108 iam = "Master";
00109 hurl.SetOptions("MM");
00110 } else if (!fProof->IsMaster() && stype == kMaster) {
00111 iam = "Local Client";
00112 hurl.SetOptions("MC");
00113 } else {
00114 Error("Init","Impossible PROOF <-> SlaveType Configuration Requested");
00115 R__ASSERT(0);
00116 }
00117
00118
00119
00120
00121
00122 Int_t wsize = 65536;
00123 fSocket = TSocket::CreateAuthSocket(hurl.GetUrl(), 0, wsize, fSocket);
00124
00125 if (!fSocket || !fSocket->IsAuthenticated()) {
00126 SafeDelete(fSocket);
00127 return;
00128 }
00129
00130
00131
00132
00133
00134 {
00135 R__LOCKGUARD2(gROOTMutex);
00136 gROOT->GetListOfSockets()->Remove(fSocket);
00137 }
00138
00139 R__LOCKGUARD2(gProofMutex);
00140
00141
00142 fUser = fSocket->GetSecContext()->GetUser();
00143 PDB(kGlobal,3) {
00144 Info("Init","%s: fUser is .... %s", iam.Data(), fUser.Data());
00145 }
00146
00147 if (fSocket->GetRemoteProtocol() >= 14 ) {
00148 TMessage m(kPROOF_SETENV);
00149
00150 const TList *envs = TProof::GetEnvVars();
00151 if (envs != 0 ) {
00152 TIter next(envs);
00153 for (TObject *o = next(); o != 0; o = next()) {
00154 TNamed *env = dynamic_cast<TNamed*>(o);
00155 if (env != 0) {
00156 TString def = Form("%s=%s", env->GetName(), env->GetTitle());
00157 const char *p = def.Data();
00158 m << p;
00159 }
00160 }
00161 }
00162 fSocket->Send(m);
00163 } else {
00164 Info("Init","** NOT ** Sending kPROOF_SETENV RemoteProtocol : %d",
00165 fSocket->GetRemoteProtocol());
00166 }
00167
00168 char buf[512];
00169 fSocket->Recv(buf, sizeof(buf));
00170 if (strcmp(buf, "Okay")) {
00171 Printf("%s", buf);
00172 SafeDelete(fSocket);
00173 return;
00174 }
00175
00176 }
00177
00178
00179 Int_t TSlave::SetupServ(Int_t stype, const char *conffile)
00180 {
00181
00182
00183
00184
00185
00186
00187 Int_t what;
00188 char buf[512];
00189 if (fSocket->Recv(buf, sizeof(buf), what) <= 0) {
00190 Error("SetupServ", "failed to receive slave startup message");
00191 SafeDelete(fSocket);
00192 return -1;
00193 }
00194
00195 if (what == kMESS_NOTOK) {
00196 SafeDelete(fSocket);
00197 return -1;
00198 }
00199
00200
00201
00202 if (fSocket->Send(kPROOF_Protocol, kROOTD_PROTOCOL) != 2*sizeof(Int_t)) {
00203 Error("SetupServ", "failed to send local PROOF protocol");
00204 SafeDelete(fSocket);
00205 return -1;
00206 }
00207
00208 if (fSocket->Recv(fProtocol, what) != 2*sizeof(Int_t)) {
00209 Error("SetupServ", "failed to receive remote PROOF protocol");
00210 SafeDelete(fSocket);
00211 return -1;
00212 }
00213
00214
00215 if (fProtocol < 4) {
00216 Error("SetupServ", "incompatible PROOF versions (remote version"
00217 " must be >= 4, is %d)", fProtocol);
00218 SafeDelete(fSocket);
00219 return -1;
00220 }
00221
00222 fProof->fProtocol = fProtocol;
00223
00224 if (fProtocol < 5) {
00225
00226
00227 Bool_t isMaster = (stype == kMaster);
00228 TString wconf = isMaster ? TString(conffile) : fProofWorkDir;
00229 if (OldAuthSetup(isMaster, wconf) != 0) {
00230 Error("SetupServ", "OldAuthSetup: failed to setup authentication");
00231 SafeDelete(fSocket);
00232 return -1;
00233 }
00234 } else {
00235
00236
00237 TMessage mess;
00238 if (stype == kMaster)
00239 mess << fUser << fOrdinal << TString(conffile);
00240 else
00241 mess << fUser << fOrdinal << fProofWorkDir;
00242
00243 if (fSocket->Send(mess) < 0) {
00244 Error("SetupServ", "failed to send ordinal and config info");
00245 SafeDelete(fSocket);
00246 return -1;
00247 }
00248 }
00249
00250
00251 fSocket->SetOption(kNoDelay, 1);
00252
00253
00254 fStatus = kActive;
00255
00256
00257 return 0;
00258 }
00259
00260
00261 void TSlave::Init(TSocket *s, Int_t stype)
00262 {
00263
00264
00265
00266 fSocket = s;
00267 TSlave::Init(s->GetInetAddress().GetHostName(), s->GetPort(), stype);
00268 }
00269
00270
00271 TSlave::~TSlave()
00272 {
00273
00274
00275 Close();
00276 }
00277
00278
00279 void TSlave::Close(Option_t *opt)
00280 {
00281
00282
00283 if (fSocket) {
00284
00285
00286 if (!(fProof->IsMaster()) && !strncasecmp(opt,"S",1)) {
00287
00288 Interrupt(TProof::kShutdownInterrupt);
00289 }
00290
00291
00292
00293 TSecContext *sc = fSocket->GetSecContext();
00294 if (sc && sc->IsActive()) {
00295 TIter last(sc->GetSecContextCleanup(), kIterBackward);
00296 TSecContextCleanup *nscc = 0;
00297 while ((nscc = (TSecContextCleanup *)last())) {
00298 if (nscc->GetType() == TSocket::kPROOFD &&
00299 nscc->GetProtocol() < 9) {
00300 sc->DeActivate("");
00301 break;
00302 }
00303 }
00304 }
00305 }
00306
00307 SafeDelete(fInput);
00308 SafeDelete(fSocket);
00309 }
00310
00311
00312 Int_t TSlave::Compare(const TObject *obj) const
00313 {
00314
00315
00316 const TSlave *sl = dynamic_cast<const TSlave*>(obj);
00317
00318 if (!sl) {
00319 Error("Compare", "input is not a TSlave object");
00320 return 0;
00321 }
00322
00323 if (fPerfIdx > sl->GetPerfIdx()) return 1;
00324 if (fPerfIdx < sl->GetPerfIdx()) return -1;
00325 const char *myord = GetOrdinal();
00326 const char *otherord = sl->GetOrdinal();
00327 while (myord && otherord) {
00328 Int_t myval = atoi(myord);
00329 Int_t otherval = atoi(otherord);
00330 if (myval < otherval) return 1;
00331 if (myval > otherval) return -1;
00332 myord = strchr(myord, '.');
00333 if (myord) myord++;
00334 otherord = strchr(otherord, '.');
00335 if (otherord) otherord++;
00336 }
00337 if (myord) return -1;
00338 if (otherord) return 1;
00339 return 0;
00340 }
00341
00342
00343 void TSlave::Print(Option_t *) const
00344 {
00345
00346
00347 TString sc;
00348
00349 const char *sst[] = { "invalid" , "valid", "inactive" };
00350 Int_t st = fSocket ? ((fStatus == kInactive) ? 2 : 1) : 0;
00351
00352 Printf("*** Worker %s (%s)", fOrdinal.Data(), sst[st]);
00353 Printf(" Host name: %s", GetName());
00354 Printf(" Port number: %d", GetPort());
00355 Printf(" Worker session tag: %s", GetSessionTag());
00356 Printf(" ROOT version|rev|tag: %s", GetROOTVersion());
00357 Printf(" Architecture-Compiler: %s", GetArchCompiler());
00358 if (fSocket) {
00359 if (strlen(GetGroup()) > 0) {
00360 Printf(" User/Group: %s/%s", GetUser(), GetGroup());
00361 } else {
00362 Printf(" User: %s", GetUser());
00363 }
00364 if (fSocket->GetSecContext())
00365 Printf(" Security context: %s", fSocket->GetSecContext()->AsString(sc));
00366 Printf(" Proofd protocol version: %d", fSocket->GetRemoteProtocol());
00367 Printf(" Image name: %s", GetImage());
00368 Printf(" Working directory: %s", GetWorkDir());
00369 Printf(" Performance index: %d", GetPerfIdx());
00370 Printf(" MB's processed: %.2f", float(GetBytesRead())/(1024*1024));
00371 Printf(" MB's sent: %.2f", float(fSocket->GetBytesRecv())/(1024*1024));
00372 Printf(" MB's received: %.2f", float(fSocket->GetBytesSent())/(1024*1024));
00373 Printf(" Real time used (s): %.3f", GetRealTime());
00374 Printf(" CPU time used (s): %.3f", GetCpuTime());
00375 }
00376 }
00377
00378
00379 void TSlave::SetInputHandler(TFileHandler *ih)
00380 {
00381
00382
00383
00384 fInput = ih;
00385 fInput->Add();
00386 }
00387
00388
00389 Int_t TSlave::OldAuthSetup(Bool_t master, TString wconf)
00390 {
00391
00392
00393 static OldSlaveAuthSetup_t oldAuthSetupHook = 0;
00394
00395 if (!oldAuthSetupHook) {
00396
00397 TString authlib = "libRootAuth";
00398 char *p = 0;
00399
00400 if ((p = gSystem->DynamicPathName(authlib, kTRUE))) {
00401 delete[] p;
00402 if (gSystem->Load(authlib) == -1) {
00403 Error("OldAuthSetup", "can't load %s",authlib.Data());
00404 return kFALSE;
00405 }
00406 } else {
00407 Error("OldAuthSetup", "can't locate %s",authlib.Data());
00408 return -1;
00409 }
00410
00411
00412 Func_t f = gSystem->DynFindSymbol(authlib,"OldSlaveAuthSetup");
00413 if (f)
00414 oldAuthSetupHook = (OldSlaveAuthSetup_t)(f);
00415 else {
00416 Error("OldAuthSetup", "can't find OldSlaveAuthSetup");
00417 return -1;
00418 }
00419 }
00420
00421
00422 if (oldAuthSetupHook) {
00423 return (*oldAuthSetupHook)(fSocket, master, fOrdinal, wconf);
00424 } else {
00425 Error("OldAuthSetup", "hook to method OldSlaveAuthSetup is undefined");
00426 return -1;
00427 }
00428 }
00429
00430
00431 TSlave *TSlave::Create(const char *url, const char *ord, Int_t perf,
00432 const char *image, TProof *proof, Int_t stype,
00433 const char *workdir, const char *msd)
00434 {
00435
00436
00437
00438 TSlave *s = 0;
00439
00440
00441 if (!strcmp(url, "lite")) {
00442 return new TSlaveLite(ord, perf, image, proof, stype, workdir, msd);
00443 }
00444
00445
00446 Bool_t tryxpd = kTRUE;
00447 if (!(proof->IsMaster())) {
00448 if (proof->IsProofd())
00449 tryxpd = kFALSE;
00450 } else {
00451 if (gApplication &&
00452 (gApplication->Argc() < 3 || strncmp(gApplication->Argv(2),"xpd",3)))
00453 tryxpd = kFALSE;
00454 }
00455
00456
00457
00458 if (!fgTXSlaveHook) {
00459
00460
00461 TString proofxlib = "libProofx";
00462 char *p = 0;
00463 if ((p = gSystem->DynamicPathName(proofxlib, kTRUE))) {
00464 delete[] p;
00465 if (gSystem->Load(proofxlib) == -1)
00466 ::Error("TSlave::Create", "can't load %s", proofxlib.Data());
00467 } else
00468 ::Error("TSlave::Create", "can't locate %s", proofxlib.Data());
00469 }
00470
00471
00472 if (fgTXSlaveHook && tryxpd) {
00473 s = (*fgTXSlaveHook)(url, ord, perf, image, proof, stype, workdir, msd);
00474 } else {
00475 s = new TSlave(url, ord, perf, image, proof, stype, workdir, msd);
00476 }
00477
00478 return s;
00479 }
00480
00481
00482 Int_t TSlave::Ping()
00483 {
00484
00485
00486
00487 if (!IsValid()) return -1;
00488
00489 TMessage mess(kPROOF_PING | kMESS_ACK);
00490 fSocket->Send(mess);
00491 if (fSocket->Send(mess) == -1) {
00492 Warning("Ping","%s: acknowledgement not received", GetOrdinal());
00493 return -1;
00494 }
00495 return 0;
00496 }
00497
00498
00499 void TSlave::Interrupt(Int_t type)
00500 {
00501
00502
00503
00504 if (!IsValid()) return;
00505
00506 char oobc = (char) type;
00507 const int kBufSize = 1024;
00508 char waste[kBufSize];
00509
00510
00511 if (fSocket->SendRaw(&oobc, 1, kOob) <= 0) {
00512 Error("Interrupt", "error sending oobc to slave %s", GetOrdinal());
00513 return;
00514 }
00515
00516 if (type == TProof::kHardInterrupt) {
00517 char oob_byte;
00518 int n, nch, nbytes = 0, nloop = 0;
00519
00520
00521 while ((n = fSocket->RecvRaw(&oob_byte, 1, kOob)) < 0) {
00522 if (n == -2) {
00523
00524
00525
00526
00527
00528
00529
00530
00531
00532
00533 fSocket->GetOption(kBytesToRead, nch);
00534 if (nch == 0) {
00535 gSystem->Sleep(1000);
00536 continue;
00537 }
00538
00539 if (nch > kBufSize) nch = kBufSize;
00540 n = fSocket->RecvRaw(waste, nch);
00541 if (n <= 0) {
00542 Error("Interrupt", "error receiving waste from slave %s",
00543 GetOrdinal());
00544 break;
00545 }
00546 nbytes += n;
00547 } else if (n == -3) {
00548
00549
00550
00551 gSystem->Sleep(100);
00552 if (++nloop > 100) {
00553 Error("Interrupt", "server %s does not respond", GetOrdinal());
00554 break;
00555 }
00556 } else {
00557 Error("Interrupt", "error receiving OOB from server %s",
00558 GetOrdinal());
00559 break;
00560 }
00561 }
00562
00563
00564
00565
00566
00567 while (1) {
00568 int atmark;
00569
00570 fSocket->GetOption(kAtMark, atmark);
00571
00572 if (atmark)
00573 break;
00574
00575
00576 fSocket->GetOption(kBytesToRead, nch);
00577 if (nch == 0) {
00578 gSystem->Sleep(1000);
00579 continue;
00580 }
00581
00582 if (nch > kBufSize) nch = kBufSize;
00583 n = fSocket->RecvRaw(waste, nch);
00584 if (n <= 0) {
00585 Error("Interrupt", "error receiving waste (2) from slave %s",
00586 GetOrdinal());
00587 break;
00588 }
00589 nbytes += n;
00590 }
00591 if (nbytes > 0) {
00592 if (fProof->IsMaster())
00593 Info("Interrupt", "slave %s:%s synchronized: %d bytes discarded",
00594 GetName(), GetOrdinal(), nbytes);
00595 else
00596 Info("Interrupt", "PROOF synchronized: %d bytes discarded", nbytes);
00597 }
00598
00599
00600 fProof->Collect(this);
00601
00602 } else if (type == TProof::kSoftInterrupt) {
00603
00604
00605 fProof->Collect(this);
00606
00607 } else if (type == TProof::kShutdownInterrupt) {
00608
00609 ;
00610
00611 } else {
00612
00613
00614 fProof->Collect(this);
00615 }
00616 }
00617
00618
00619 void TSlave::StopProcess(Bool_t abort, Int_t timeout)
00620 {
00621
00622
00623
00624 TMessage msg(kPROOF_STOPPROCESS);
00625 msg << abort;
00626 if (fProof->fProtocol > 9)
00627 msg << timeout;
00628 fSocket->Send(msg);
00629 }
00630
00631
00632 TObjString *TSlave::SendCoordinator(Int_t, const char *, Int_t)
00633 {
00634
00635
00636
00637 if (gDebug > 0)
00638 Info("SendCoordinator","method not implemented for this communication layer");
00639 return 0;
00640 }
00641
00642
00643 void TSlave::SetAlias(const char *)
00644 {
00645
00646
00647
00648
00649 if (gDebug > 0)
00650 Info("SetAlias","method not implemented for this communication layer");
00651 return;
00652 }
00653
00654
00655 void TSlave::SetTXSlaveHook(TSlave_t xslavehook)
00656 {
00657
00658 fgTXSlaveHook = xslavehook;
00659 }