00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #include "TXSlave.h"
00022 #include "TProof.h"
00023 #include "TProofServ.h"
00024 #include "TSystem.h"
00025 #include "TEnv.h"
00026 #include "TROOT.h"
00027 #include "TUrl.h"
00028 #include "TMessage.h"
00029 #include "TMonitor.h"
00030 #include "TError.h"
00031 #include "TSysEvtHandler.h"
00032 #include "TVirtualMutex.h"
00033 #include "TThread.h"
00034 #include "TXSocket.h"
00035 #include "TXSocketHandler.h"
00036 #include "Varargs.h"
00037 #include "XProofProtocol.h"
00038
00039 ClassImp(TXSlave)
00040
00041
00042
00043
00044
00045
00046 TSlave *GetTXSlave(const char *url, const char *ord, Int_t perf,
00047 const char *image, TProof *proof, Int_t stype,
00048 const char *workdir, const char *msd)
00049 {
00050 return ((TSlave *)(new TXSlave(url, ord, perf, image,
00051 proof, stype, workdir, msd)));
00052 }
00053
00054 class XSlaveInit {
00055 public:
00056 XSlaveInit() {
00057 TSlave::SetTXSlaveHook(&GetTXSlave);
00058 }};
00059 static XSlaveInit xslave_init;
00060
00061
00062
00063
00064
00065
00066
00067 void TXSlave::DoError(int level, const char *location, const char *fmt, va_list va) const
00068 {
00069
00070
00071 ::ErrorHandler(level, Form("TXSlave::%s", location), fmt, va);
00072 }
00073
00074
00075
00076
00077 class TXSlaveInterruptHandler : public TSignalHandler {
00078 private:
00079 TXSocket *fSocket;
00080 public:
00081 TXSlaveInterruptHandler(TXSocket *s = 0)
00082 : TSignalHandler(kSigInterrupt, kFALSE), fSocket(s) { }
00083 Bool_t Notify();
00084 };
00085
00086
00087 Bool_t TXSlaveInterruptHandler::Notify()
00088 {
00089
00090
00091 Info("Notify","Processing interrupt signal ...");
00092
00093
00094 if (fSocket)
00095 fSocket->SetInterrupt();
00096
00097 return kTRUE;
00098 }
00099
00100
00101 TXSlave::TXSlave(const char *url, const char *ord, Int_t perf,
00102 const char *image, TProof *proof, Int_t stype,
00103 const char *workdir, const char *msd) : TSlave()
00104 {
00105
00106 fImage = image;
00107 fProofWorkDir = workdir;
00108 fWorkDir = workdir;
00109 fOrdinal = ord;
00110 fPerfIdx = perf;
00111 fProof = proof;
00112 fSlaveType = (ESlaveType)stype;
00113 fMsd = msd;
00114 fIntHandler = 0;
00115 fValid = kFALSE;
00116
00117
00118 TXSocketHandler *sh = TXSocketHandler::GetSocketHandler();
00119 gSystem->AddFileHandler(sh);
00120
00121 TXSocket::SetLocation((fProof->IsMaster()) ? "master" : "client");
00122
00123 Init(url, stype);
00124 }
00125
00126
00127 void TXSlave::Init(const char *host, Int_t stype)
00128 {
00129
00130
00131
00132
00133
00134
00135
00136
00137 TUrl url(host);
00138 url.SetProtocol(fProof->fUrl.GetProtocol());
00139
00140 if (url.GetPort() == TUrl("a").GetPort()) {
00141
00142 Int_t port = gSystem->GetServiceByName("proofd");
00143 if (port < 0) {
00144 if (gDebug > 0)
00145 Info("Init","service 'proofd' not found by GetServiceByName"
00146 ": using default IANA assigned tcp port 1093");
00147 port = 1093;
00148 } else {
00149 if (gDebug > 1)
00150 Info("Init","port from GetServiceByName: %d", port);
00151 }
00152 url.SetPort(port);
00153 }
00154
00155
00156 fName = url.GetHostFQDN();
00157 fPort = url.GetPort();
00158
00159 fGroup = url.GetPasswd();
00160
00161
00162
00163
00164 TString opts(url.GetOptions());
00165 Bool_t attach = (opts.Length() > 0 && opts.IsDigit()) ? kTRUE : kFALSE;
00166 Int_t psid = (attach) ? opts.Atoi() : kPROOF_Protocol;
00167
00168
00169 TString iam;
00170 Char_t mode = 's';
00171 TString alias = fProof->GetTitle();
00172 if (fProof->IsMaster() && stype == kSlave) {
00173 iam = "Master";
00174 mode = 's';
00175
00176 alias = Form("session-%s|ord:%s", fProof->GetName(), fOrdinal.Data());
00177 } else if (fProof->IsMaster() && stype == kMaster) {
00178 iam = "Master";
00179 mode = 'm';
00180
00181 alias = Form("session-%s|ord:%s", fProof->GetName(), fOrdinal.Data());
00182 } else if (!fProof->IsMaster() && stype == kMaster) {
00183 iam = "Local Client";
00184 mode = (attach) ? 'A' : 'M';
00185 } else {
00186 Error("Init","Impossible PROOF <-> SlaveType Configuration Requested");
00187 R__ASSERT(0);
00188 }
00189
00190
00191 if (fProof->fConfFile.Length() > 0)
00192 alias += Form("|cf:%s",fProof->fConfFile.Data());
00193
00194
00195 TString envlist;
00196 if (!fProof->GetManager() ||
00197 fProof->GetManager()->GetRemoteProtocol() > 1001) {
00198
00199
00200 if (gSystem->Getenv("XrdSecPROTOCOL")) {
00201 TProof::DelEnvVar("XrdSecPROTOCOL");
00202 TProof::AddEnvVar("XrdSecPROTOCOL", gSystem->Getenv("XrdSecPROTOCOL"));
00203 }
00204 const TList *envs = TProof::GetEnvVars();
00205 if (envs != 0 ) {
00206 TIter next(envs);
00207 for (TObject *o = next(); o != 0; o = next()) {
00208 TNamed *env = dynamic_cast<TNamed*>(o);
00209 if (env != 0) {
00210 if (!envlist.IsNull())
00211 envlist += ",";
00212 envlist += Form("%s=%s", env->GetName(), env->GetTitle());
00213 }
00214 }
00215 }
00216 } else {
00217 if (fProof->GetManager() && TProof::GetEnvVars())
00218 Info("Init", "** NOT ** sending user envs - RemoteProtocol : %d",
00219 fProof->GetManager()->GetRemoteProtocol());
00220 }
00221
00222
00223 if (!envlist.IsNull())
00224 alias += Form("|envs:%s", envlist.Data());
00225
00226
00227
00228 if (!(fSocket = new TXSocket(url.GetUrl(kTRUE), mode, psid,
00229 -1, alias, fProof->GetLogLevel(), this))) {
00230 Error("Init", "while opening the connection to %s - exit", url.GetUrl(kTRUE));
00231 return;
00232 }
00233
00234
00235 if (!(fSocket->IsValid())) {
00236
00237 if (gDebug > 0)
00238 Error("Init", "some severe error occurred while opening "
00239 "the connection at %s - exit", url.GetUrl(kTRUE));
00240 SafeDelete(fSocket);
00241 return;
00242 }
00243
00244
00245 fSocket->SetTitle(fOrdinal);
00246
00247
00248 if (!fProof->GetManager() && !envlist.IsNull() &&
00249 ((TXSocket *)fSocket)->GetXrdProofdVersion() <= 1001) {
00250 Info("Init","user envs setting sent but unsupported remotely - RemoteProtocol : %d",
00251 ((TXSocket *)fSocket)->GetXrdProofdVersion());
00252 }
00253
00254
00255 ((TXSocket *)fSocket)->fReference = fProof;
00256
00257
00258 fProtocol = fSocket->GetRemoteProtocol();
00259
00260
00261 fProof->fServType = TProofMgr::kXProofd;
00262
00263
00264 fProof->fSessionID = ((TXSocket *)fSocket)->GetSessionID();
00265
00266
00267 TString dpu(((TXSocket *)fSocket)->fBuffer);
00268 if (dpu.Length() > 0)
00269 fProof->SetDataPoolUrl(dpu);
00270
00271
00272
00273
00274
00275 {
00276 R__LOCKGUARD2(gROOTMutex);
00277 gROOT->GetListOfSockets()->Remove(fSocket);
00278 }
00279
00280 R__LOCKGUARD2(gProofMutex);
00281
00282
00283 fUser = ((TXSocket *)fSocket)->fUser;
00284 PDB(kGlobal,3) {
00285 Info("Init","%s: fUser is .... %s", iam.Data(), fUser.Data());
00286 }
00287
00288
00289 fValid = kTRUE;
00290 }
00291
00292
00293 Int_t TXSlave::SetupServ(Int_t, const char *)
00294 {
00295
00296
00297
00298
00299
00300
00301 Int_t what;
00302 char buf[512];
00303 if (fSocket->Recv(buf, sizeof(buf), what) <= 0) {
00304 Error("SetupServ", "failed to receive slave startup message");
00305 Close("S");
00306 SafeDelete(fSocket);
00307 fValid = kFALSE;
00308 return -1;
00309 }
00310
00311 if (what == kMESS_NOTOK) {
00312 SafeDelete(fSocket);
00313 fValid = kFALSE;
00314 return -1;
00315 }
00316
00317
00318 if (fProtocol < 4) {
00319 Error("SetupServ", "incompatible PROOF versions (remote version "
00320 "must be >= 4, is %d)", fProtocol);
00321 SafeDelete(fSocket);
00322 fValid = kFALSE;
00323 return -1;
00324 }
00325
00326 fProof->fProtocol = fProtocol;
00327
00328
00329 fSocket->SetOption(kNoDelay, 1);
00330
00331
00332 return 0;
00333 }
00334
00335
00336 TXSlave::~TXSlave()
00337 {
00338
00339
00340 Close();
00341 }
00342
00343
00344 void TXSlave::Close(Option_t *opt)
00345 {
00346
00347
00348 if (fSocket)
00349
00350 fSocket->Close(opt);
00351
00352 SafeDelete(fInput);
00353 SafeDelete(fSocket);
00354 }
00355
00356
00357 Int_t TXSlave::Ping()
00358 {
00359
00360
00361
00362 if (!IsValid()) return -1;
00363
00364 return (((TXSocket *)fSocket)->Ping(GetOrdinal()) ? 0 : -1);
00365 }
00366
00367
00368 void TXSlave::Touch()
00369 {
00370
00371
00372 if (!IsValid()) return;
00373
00374 ((TXSocket *)fSocket)->RemoteTouch();
00375 return;
00376 }
00377
00378
00379 void TXSlave::Interrupt(Int_t type)
00380 {
00381
00382
00383
00384 if (!IsValid()) return;
00385
00386 if (type == TProof::kLocalInterrupt) {
00387
00388
00389
00390 if (fProof) {
00391
00392
00393 TMonitor *mon = fProof->fCurrentMonitor;
00394 if (mon && fSocket && mon->GetListOfActives()->FindObject(fSocket)) {
00395
00396 if (gDebug > 2)
00397 Info("Interrupt", "%p: deactivating from monitor %p", this, mon);
00398 mon->DeActivate(fSocket);
00399 }
00400 } else {
00401 Warning("Interrupt", "%p: reference to PROOF missing", this);
00402 }
00403
00404
00405 if (fSocket) {
00406 R__LOCKGUARD(((TXSocket *)fSocket)->fAMtx);
00407 TSemaphore *sem = &(((TXSocket *)fSocket)->fASem);
00408 while (sem->TryWait() != 1)
00409 sem->Post();
00410 }
00411 return;
00412 }
00413
00414 ((TXSocket *)fSocket)->SendInterrupt(type);
00415 Info("Interrupt","Interrupt of type %d sent", type);
00416 }
00417
00418
00419 void TXSlave::StopProcess(Bool_t abort, Int_t timeout)
00420 {
00421
00422
00423 if (!IsValid()) return;
00424
00425 ((TXSocket *)fSocket)->SendUrgent(TXSocket::kStopProcess, (Int_t)abort, timeout);
00426 if (gDebug > 0)
00427 Info("StopProcess", "Request of type %d sent over", abort);
00428 }
00429
00430
00431 Int_t TXSlave::GetProofdProtocol(TSocket *s)
00432 {
00433
00434
00435
00436 Int_t rproto = -1;
00437
00438 UInt_t cproto = 0;
00439 Int_t len = sizeof(cproto);
00440 memcpy((char *)&cproto,
00441 Form(" %d", TSocket::GetClientProtocol()),len);
00442 Int_t ns = s->SendRaw(&cproto, len);
00443 if (ns != len) {
00444 ::Error("TXSlave::GetProofdProtocol",
00445 "sending %d bytes to proofd server [%s:%d]",
00446 len, (s->GetInetAddress()).GetHostName(), s->GetPort());
00447 return -1;
00448 }
00449
00450
00451 Int_t ibuf[2] = {0};
00452 len = sizeof(ibuf);
00453 Int_t nr = s->RecvRaw(ibuf, len);
00454 if (nr != len) {
00455 ::Error("TXSlave::GetProofdProtocol",
00456 "reading %d bytes from proofd server [%s:%d]",
00457 len, (s->GetInetAddress()).GetHostName(), s->GetPort());
00458 return -1;
00459 }
00460 Int_t kind = net2host(ibuf[0]);
00461 if (kind == kROOTD_PROTOCOL) {
00462 rproto = net2host(ibuf[1]);
00463 } else {
00464 kind = net2host(ibuf[1]);
00465 if (kind == kROOTD_PROTOCOL) {
00466 len = sizeof(rproto);
00467 nr = s->RecvRaw(&rproto, len);
00468 if (nr != len) {
00469 ::Error("TXSlave::GetProofdProtocol",
00470 "reading %d bytes from proofd server [%s:%d]",
00471 len, (s->GetInetAddress()).GetHostName(), s->GetPort());
00472 return -1;
00473 }
00474 rproto = net2host(rproto);
00475 }
00476 }
00477 if (gDebug > 2)
00478 ::Info("TXSlave::GetProofdProtocol",
00479 "remote proofd: buf1: %d, buf2: %d rproto: %d",
00480 net2host(ibuf[0]),net2host(ibuf[1]),rproto);
00481
00482
00483 return rproto;
00484 }
00485
00486
00487 TObjString *TXSlave::SendCoordinator(Int_t kind, const char *msg, Int_t int2)
00488 {
00489
00490
00491
00492 return ((TXSocket *)fSocket)->SendCoordinator(kind, msg, int2);
00493 }
00494
00495
00496 void TXSlave::SetAlias(const char *alias)
00497 {
00498
00499
00500
00501
00502
00503 if (!IsValid()) return;
00504
00505 ((TXSocket *)fSocket)->SendCoordinator(kSessionAlias, alias);
00506
00507 return;
00508 }
00509
00510
00511 Int_t TXSlave::SendGroupPriority(const char *grp, Int_t priority)
00512 {
00513
00514
00515
00516
00517
00518 if (!IsValid()) return -1;
00519
00520 ((TXSocket *)fSocket)->SendCoordinator(kGroupProperties, grp, priority);
00521
00522 return 0;
00523 }
00524
00525
00526 Bool_t TXSlave::HandleError(const void *in)
00527 {
00528
00529
00530 XHandleErr_t *herr = in ? (XHandleErr_t *)in : 0;
00531
00532
00533 if (fSocket && herr && (herr->fOpt == 1)) {
00534
00535 ((TXSocket *)fSocket)->Reconnect();
00536 if (fSocket && fSocket->IsValid()) {
00537 if (gDebug > 0) {
00538 if (!strcmp(GetOrdinal(), "0")) {
00539 Printf("Proof: connection to master at %s:%d re-established",
00540 GetName(), GetPort());
00541 } else {
00542 Printf("Proof: connection to node '%s' at %s:%d re-established",
00543 GetOrdinal(), GetName(), GetPort());
00544 }
00545 }
00546 return kFALSE;
00547 }
00548 }
00549
00550
00551 Info("HandleError", "%p:%s:%s got called ... fProof: %p, fSocket: %p (valid: %d)",
00552 this, fName.Data(), fOrdinal.Data(), fProof, fSocket,
00553 (fSocket ? (Int_t)fSocket->IsValid() : -1));
00554
00555
00556
00557 SetInterruptHandler(kFALSE);
00558
00559 if (fProof) {
00560
00561
00562 if (fProof->fIntHandler)
00563 fProof->fIntHandler->Remove();
00564
00565 Info("HandleError", "%p: proof: %p", this, fProof);
00566
00567 if (fSocket) {
00568
00569 ((TXSocket *)fSocket)->SetSessionID(-1);
00570
00571
00572 ((TXSocket *)fSocket)->PostMsg(kPROOF_FATAL);
00573 }
00574
00575
00576 if (fProof->IsMaster()) {
00577 TString msg(Form("Worker '%s-%s' has been removed from the active list",
00578 fName.Data(), fOrdinal.Data()));
00579 TMessage m(kPROOF_MESSAGE);
00580 m << msg;
00581 if (gProofServ)
00582 gProofServ->GetSocket()->Send(m);
00583 else
00584 Warning("HandleError", "%p: global reference to TProofServ missing", this);
00585 }
00586 } else {
00587 Warning("HandleError", "%p: reference to PROOF missing", this);
00588 }
00589
00590 Printf("TXSlave::HandleError: %p: DONE ... ", this);
00591
00592
00593 return kTRUE;
00594 }
00595
00596
00597 Bool_t TXSlave::HandleInput(const void *)
00598 {
00599
00600
00601 if (fProof) {
00602
00603
00604 TMonitor *mon = fProof->fCurrentMonitor;
00605
00606 if (gDebug > 2)
00607 Info("HandleInput", "%p: %s: proof: %p, mon: %p",
00608 this, GetOrdinal(), fProof, mon);
00609
00610 if (mon && mon->IsActive(fSocket)) {
00611
00612 if (gDebug > 2)
00613 Info("HandleInput","%p: %s: posting monitor %p", this, GetOrdinal(), mon);
00614 mon->SetReady(fSocket);
00615 } else {
00616
00617 if (gDebug > 2) {
00618 if (mon) {
00619 Info("HandleInput", "%p: %s: not active in current monitor"
00620 " - calling TProof::CollectInputFrom",
00621 this, GetOrdinal());
00622 } else {
00623 Info("HandleInput", "%p: %s: calling TProof::CollectInputFrom",
00624 this, GetOrdinal());
00625 }
00626 }
00627 if (fProof->CollectInputFrom(fSocket) < 0)
00628
00629 FlushSocket();
00630 }
00631 } else {
00632 Warning("HandleInput", "%p: %s: reference to PROOF missing", this, GetOrdinal());
00633 return kFALSE;
00634 }
00635
00636
00637 return kTRUE;
00638 }
00639
00640
00641 void TXSlave::SetInterruptHandler(Bool_t on)
00642 {
00643
00644
00645 if (gDebug > 1)
00646 Info("SetInterruptHandler", "enter: %d", on);
00647
00648 if (on) {
00649 if (!fIntHandler)
00650 fIntHandler = new TXSlaveInterruptHandler((TXSocket *)fSocket);
00651 fIntHandler->Add();
00652 } else {
00653 if (fIntHandler)
00654 fIntHandler->Remove();
00655 }
00656 }
00657
00658
00659 void TXSlave::FlushSocket()
00660 {
00661
00662
00663 if (gDebug > 1)
00664 Info("FlushSocket", "enter: %p", fSocket);
00665
00666 if (fSocket)
00667 TXSocket::fgPipe.Flush(fSocket);
00668 }