00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #include "RConfigure.h"
00022 #include "RConfig.h"
00023 #include "Riostream.h"
00024
00025 #ifdef WIN32
00026 #include <io.h>
00027 typedef long off_t;
00028 #endif
00029 #include <sys/types.h>
00030 #include <netinet/in.h>
00031 #include <utime.h>
00032
00033 #include "TXProofServ.h"
00034 #include "TObjString.h"
00035 #include "TEnv.h"
00036 #include "TError.h"
00037 #include "TException.h"
00038 #include "THashList.h"
00039 #include "TInterpreter.h"
00040 #include "TParameter.h"
00041 #include "TProofDebug.h"
00042 #include "TProof.h"
00043 #include "TProofPlayer.h"
00044 #include "TQueryResultManager.h"
00045 #include "TRegexp.h"
00046 #include "TClass.h"
00047 #include "TROOT.h"
00048 #include "TSystem.h"
00049 #include "TPluginManager.h"
00050 #include "TXSocketHandler.h"
00051 #include "TXUnixSocket.h"
00052 #include "compiledata.h"
00053 #include "TProofNodeInfo.h"
00054 #include "XProofProtocol.h"
00055
00056 #include <XrdClient/XrdClientConst.hh>
00057 #include <XrdClient/XrdClientEnv.hh>
00058
00059
00060
00061 static volatile Int_t gProofServDebug = 1;
00062
00063
00064
00065 class TXProofServSigPipeHandler : public TSignalHandler {
00066 TXProofServ *fServ;
00067 public:
00068 TXProofServSigPipeHandler(TXProofServ *s) : TSignalHandler(kSigInterrupt, kFALSE)
00069 { fServ = s; }
00070 Bool_t Notify();
00071 };
00072
00073
00074 Bool_t TXProofServSigPipeHandler::Notify()
00075 {
00076 fServ->HandleSigPipe();
00077 return kTRUE;
00078 }
00079
00080
00081
00082 class TXProofServTerminationHandler : public TSignalHandler {
00083 TXProofServ *fServ;
00084 public:
00085 TXProofServTerminationHandler(TXProofServ *s)
00086 : TSignalHandler(kSigTermination, kFALSE) { fServ = s; }
00087 Bool_t Notify();
00088 };
00089
00090
00091 Bool_t TXProofServTerminationHandler::Notify()
00092 {
00093 Printf("Received SIGTERM: terminating");
00094
00095 fServ->HandleTermination();
00096 return kTRUE;
00097 }
00098
00099
00100
00101 class TXProofServSegViolationHandler : public TSignalHandler {
00102 TXProofServ *fServ;
00103 public:
00104 TXProofServSegViolationHandler(TXProofServ *s)
00105 : TSignalHandler(kSigSegmentationViolation, kFALSE) { fServ = s; }
00106 Bool_t Notify();
00107 };
00108
00109
00110 Bool_t TXProofServSegViolationHandler::Notify()
00111 {
00112 Printf("**** ");
00113 Printf("**** Segmentation violation: terminating ****");
00114 Printf("**** ");
00115 fServ->HandleTermination();
00116 return kTRUE;
00117 }
00118
00119
00120
00121 class TXProofServInputHandler : public TFileHandler {
00122 TXProofServ *fServ;
00123 public:
00124 TXProofServInputHandler(TXProofServ *s, Int_t fd) : TFileHandler(fd, 1)
00125 { fServ = s; }
00126 Bool_t Notify();
00127 Bool_t ReadNotify() { return Notify(); }
00128 };
00129
00130
00131 Bool_t TXProofServInputHandler::Notify()
00132 {
00133 fServ->HandleSocketInput();
00134
00135 ((TXUnixSocket *) fServ->GetSocket())->RemoveClientID();
00136 return kTRUE;
00137 }
00138
// ROOT class-implementation boilerplate for TXProofServ (presumably paired
// with a ClassDef in TXProofServ.h).
ClassImp(TXProofServ)
00140
00141
00142
00143 extern "C" {
00144 TApplication *GetTXProofServ(Int_t *argc, char **argv, FILE *flog)
00145 { return new TXProofServ(argc, argv, flog); }
00146 }
00147
00148
00149 TXProofServ::TXProofServ(Int_t *argc, char **argv, FILE *flog)
00150 : TProofServ(argc, argv, flog)
00151 {
00152
00153
00154 fInterruptHandler = 0;
00155 fInputHandler = 0;
00156 fTerminated = kFALSE;
00157
00158
00159
00160
00161
00162
00163
00164
00165
00166
00167 }
00168
00169
00170 Int_t TXProofServ::CreateServer()
00171 {
00172
00173
00174
00175
00176 Bool_t xtest = (Argc() > 3 && !strcmp(Argv(3), "test")) ? kTRUE : kFALSE;
00177
00178 if (gProofDebugLevel > 0)
00179 Info("CreateServer", "starting%s server creation", (xtest ? " test" : ""));
00180
00181
00182 if (fLogFile) {
00183
00184 if ((fLogFileDes = fileno(fLogFile)) < 0) {
00185 Error("CreateServer", "resolving the log file description number");
00186 return -1;
00187 }
00188
00189 if (gProofDebugLevel <= 0)
00190 lseek(fLogFileDes, (off_t) 0, SEEK_END);
00191 }
00192
00193
00194 TXSocket::SetLocation((IsMaster()) ? "master" : "slave");
00195
00196
00197 EnvPutInt(NAME_DEBUG, gEnv->GetValue("XNet.Debug", 0));
00198
00199
00200 if (xtest) {
00201
00202
00203 if (!(fSockPath = gSystem->Getenv("ROOTOPENSOCK"))) {
00204 Error("CreateServer", "Socket setup by xpd undefined");
00205 return -1;
00206 }
00207 Int_t fpw = (Int_t) strtol(fSockPath.Data(), 0, 10);
00208 int proto = htonl(kPROOF_Protocol);
00209 fSockPath = "";
00210 if (write(fpw, &proto, sizeof(proto)) != sizeof(proto)) {
00211 Error("CreateServer", "test: sending protocol number");
00212 return -1;
00213 }
00214 exit(0);
00215 } else {
00216 fSockPath = gEnv->GetValue("ProofServ.OpenSock", "");
00217 if (fSockPath.Length() <= 0) {
00218 Error("CreateServer", "Socket setup by xpd undefined");
00219 return -1;
00220 }
00221 TString entity = gEnv->GetValue("ProofServ.Entity", "");
00222 if (entity.Length() > 0)
00223 fSockPath.Insert(0,Form("%s/", entity.Data()));
00224 }
00225
00226
00227 Int_t psid = gEnv->GetValue("ProofServ.SessionID", -1);
00228 if (psid < 0) {
00229 Error("CreateServer", "Session ID undefined");
00230 return -1;
00231 }
00232
00233
00234 fSocket = new TXUnixSocket(fSockPath, psid, -1, this);
00235 if (!fSocket || !(fSocket->IsValid())) {
00236 Error("CreateServer", "Failed to open connection to XrdProofd coordinator");
00237 return -1;
00238 }
00239
00240 fSocket->SetCompressionLevel(fCompressMsg);
00241
00242
00243 TString tgt("client");
00244 if (fOrdinal != "0") {
00245 tgt = fOrdinal;
00246 if (tgt.Last('.') != kNPOS) tgt.Remove(tgt.Last('.'));
00247 }
00248 fSocket->SetTitle(tgt);
00249
00250
00251 ((TXSocket *)fSocket)->fReference = this;
00252
00253
00254 Int_t sock = fSocket->GetDescriptor();
00255
00256
00257 fInputHandler =
00258 TXSocketHandler::GetSocketHandler(new TXProofServInputHandler(this, sock), fSocket);
00259 gSystem->AddFileHandler(fInputHandler);
00260
00261
00262 Int_t cid = gEnv->GetValue("ProofServ.ClientID", -1);
00263 if (cid < 0) {
00264 Error("CreateServer", "Client ID undefined");
00265 SendLogFile();
00266 return -1;
00267 }
00268 ((TXSocket *)fSocket)->SetClientID(cid);
00269
00270
00271 if (IsMaster()) {
00272
00273 if (gEnv->GetValue("Proof.GdbHook",0) == 1) {
00274 while (gProofServDebug)
00275 ;
00276 }
00277 } else {
00278
00279 if (gEnv->GetValue("Proof.GdbHook",0) == 2) {
00280 while (gProofServDebug)
00281 ;
00282 }
00283 }
00284
00285 if (gProofDebugLevel > 0)
00286 Info("CreateServer", "Service: %s, ConfDir: %s, IsMaster: %d",
00287 fService.Data(), fConfDir.Data(), (Int_t)fMasterServ);
00288
00289 if (Setup() == -1) {
00290
00291 LogToMaster();
00292 SendLogFile();
00293 Terminate(0);
00294 return -1;
00295 }
00296
00297 if (!fLogFile) {
00298 RedirectOutput();
00299
00300
00301 if (!fLogFile || (fLogFileDes = fileno(fLogFile)) < 0) {
00302 LogToMaster();
00303 SendLogFile(-98);
00304 Terminate(0);
00305 return -1;
00306 }
00307 }
00308
00309
00310 if (IsMaster()) {
00311 if (CatMotd() == -1) {
00312 LogToMaster();
00313 SendLogFile(-99);
00314 Terminate(0);
00315 return -1;
00316 }
00317 }
00318
00319
00320 ProcessLine("#include <iostream>", kTRUE);
00321 ProcessLine("#include <string>",kTRUE);
00322
00323
00324 const char *logon;
00325 logon = gEnv->GetValue("Proof.Load", (char *)0);
00326 if (logon) {
00327 char *mac = gSystem->Which(TROOT::GetMacroPath(), logon, kReadPermission);
00328 if (mac)
00329 ProcessLine(Form(".L %s", logon), kTRUE);
00330 delete [] mac;
00331 }
00332
00333
00334 logon = gEnv->GetValue("Proof.Logon", (char *)0);
00335 if (logon && !NoLogOpt()) {
00336 char *mac = gSystem->Which(TROOT::GetMacroPath(), logon, kReadPermission);
00337 if (mac)
00338 ProcessFile(logon);
00339 delete [] mac;
00340 }
00341
00342
00343 gInterpreter->SaveContext();
00344 gInterpreter->SaveGlobalsContext();
00345
00346
00347 if (IsMaster()) {
00348 TString master = Form("proof://%s@__master__", fUser.Data());
00349
00350
00351 Int_t port = gEnv->GetValue("ProofServ.XpdPort", -1);
00352 if (port > -1) {
00353 master += ":";
00354 master += port;
00355 }
00356
00357
00358
00359 gEnv->SetValue("Proof.ParallelStartup", 0);
00360
00361
00362 TPluginManager *pm = gROOT->GetPluginManager();
00363 if (!pm) {
00364 Error("CreateServer", "no plugin manager found");
00365 SendLogFile(-99);
00366 Terminate(0);
00367 return -1;
00368 }
00369
00370
00371 TPluginHandler *h = pm->FindHandler("TProof", fConfFile);
00372 if (!h) {
00373 Error("CreateServer", "no plugin found for TProof with a"
00374 " config file of '%s'", fConfFile.Data());
00375 SendLogFile(-99);
00376 Terminate(0);
00377 return -1;
00378 }
00379
00380
00381 if (h->LoadPlugin() == -1) {
00382 Error("CreateServer", "plugin for TProof could not be loaded");
00383 SendLogFile(-99);
00384 Terminate(0);
00385 return -1;
00386 }
00387
00388
00389 fProof = reinterpret_cast<TProof*>(h->ExecPlugin(5, master.Data(),
00390 fConfFile.Data(),
00391 fConfDir.Data(),
00392 fLogLevel,
00393 fTopSessionTag.Data()));
00394 if (!fProof || !fProof->IsValid()) {
00395 Error("CreateServer", "plugin for TProof could not be executed");
00396 FlushLogFile();
00397 delete fProof;
00398 fProof = 0;
00399 SendLogFile(-99);
00400 Terminate(0);
00401 return -1;
00402 }
00403
00404 fEndMaster = fProof->IsEndMaster();
00405
00406
00407 fProof->SaveWorkerInfo();
00408
00409 SendLogFile();
00410 }
00411
00412
00413 if (!fShutdownTimer) {
00414
00415 fShutdownTimer = new TShutdownTimer(this, 300000);
00416 fShutdownTimer->Start(-1, kFALSE);
00417 }
00418
00419
00420
00421 if (fProtocol <= 17) {
00422 TString msg;
00423 msg.Form("Warning: client version is too old: automatic schema evolution is ineffective.\n"
00424 " This may generate compatibility problems between streamed objects.\n"
00425 " The advise is to move to ROOT >= 5.21/02 .");
00426 SendAsynMessage(msg.Data());
00427 }
00428
00429
00430 if (IsMaster() && !fIdleTOTimer) {
00431
00432 Int_t idle_to = gEnv->GetValue("ProofServ.IdleTimeout", -1);
00433 if (idle_to > 0) {
00434 fIdleTOTimer = new TIdleTOTimer(this, idle_to * 1000);
00435 fIdleTOTimer->Start(-1, kTRUE);
00436 if (gProofDebugLevel > 0)
00437 Info("CreateServer", " idle timer started (%d secs)", idle_to);
00438 } else if (gProofDebugLevel > 0) {
00439 Info("CreateServer", " idle timer not started (no idle timeout requested)");
00440 }
00441 }
00442
00443
00444 return 0;
00445 }
00446
00447
00448 TXProofServ::~TXProofServ()
00449 {
00450
00451
00452
00453 delete fSocket;
00454 }
00455
00456
00457 void TXProofServ::HandleUrgentData()
00458 {
00459
00460
00461
00462 TProofServLogHandlerGuard hg(fLogFile, fSocket, "", fRealTimeLog);
00463
00464
00465 Bool_t fw = kFALSE;
00466 Int_t iLev = ((TXSocket *)fSocket)->GetInterrupt(fw);
00467 if (iLev < 0) {
00468 Error("HandleUrgentData", "error receiving interrupt");
00469 return;
00470 }
00471
00472 PDB(kGlobal, 2)
00473 Info("HandleUrgentData", "got interrupt: %d\n", iLev);
00474
00475 if (fProof)
00476 fProof->SetActive();
00477
00478 switch (iLev) {
00479
00480 case TProof::kPing:
00481 PDB(kGlobal, 2)
00482 Info("HandleUrgentData", "*** Ping");
00483
00484
00485 if (fw && IsMaster()) {
00486 Int_t nbad = fProof->fActiveSlaves->GetSize() - fProof->Ping();
00487 if (nbad > 0) {
00488 Info("HandleUrgentData","%d slaves did not reply to ping",nbad);
00489 }
00490 }
00491
00492
00493 if (fAdminPath.IsNull()) {
00494 fAdminPath = gEnv->GetValue("ProofServ.AdminPath", "");
00495 }
00496
00497 if (!fAdminPath.IsNull()) {
00498 if (!fAdminPath.EndsWith(".status")) {
00499
00500 if (utime(fAdminPath.Data(), 0) != 0)
00501 Info("HandleUrgentData", "problems touching path: %s", fAdminPath.Data());
00502 else
00503 PDB(kGlobal, 2)
00504 Info("HandleUrgentData", "touching path: %s", fAdminPath.Data());
00505 } else {
00506
00507
00508
00509
00510
00511
00512 Int_t uss_rc = UpdateSessionStatus(-1);
00513 if (uss_rc != 0)
00514 Error("HandleUrgentData", "problems updating status path: %s (errno: %d)", fAdminPath.Data(), -uss_rc);
00515 }
00516 } else {
00517 Info("HandleUrgentData", "admin path undefined");
00518 }
00519
00520 break;
00521
00522 case TProof::kHardInterrupt:
00523 Info("HandleUrgentData", "*** Hard Interrupt");
00524
00525
00526 if (fw && IsMaster())
00527 fProof->Interrupt(TProof::kHardInterrupt);
00528
00529
00530 ((TXSocket *)fSocket)->Flush();
00531
00532 if (IsMaster())
00533 SendLogFile();
00534
00535 break;
00536
00537 case TProof::kSoftInterrupt:
00538 Info("HandleUrgentData", "Soft Interrupt");
00539
00540
00541 if (fw && IsMaster())
00542 fProof->Interrupt(TProof::kSoftInterrupt);
00543
00544 Interrupt();
00545
00546 if (IsMaster())
00547 SendLogFile();
00548
00549 break;
00550
00551
00552 case TProof::kShutdownInterrupt:
00553 Info("HandleUrgentData", "Shutdown Interrupt");
00554
00555
00556 HandleTermination();
00557
00558 break;
00559
00560 default:
00561 Error("HandleUrgentData", "unexpected type: %d", iLev);
00562 break;
00563 }
00564
00565
00566 if (fProof) fProof->SetActive(kFALSE);
00567 }
00568
00569
00570 void TXProofServ::HandleSigPipe()
00571 {
00572
00573
00574
00575
00576 Info("HandleSigPipe","got sigpipe ... do nothing");
00577 }
00578
00579
00580 void TXProofServ::HandleTermination()
00581 {
00582
00583
00584
00585
00586 if (IsMaster()) {
00587
00588
00589 if (!fIdle) {
00590
00591 fWaitingQueries->Delete();
00592
00593 fProof->InterruptCurrentMonitor();
00594
00595 Long_t timeout = gEnv->GetValue("Proof.ShutdownTimeout", 60);
00596 timeout = (timeout > 20) ? timeout : 20;
00597
00598 fProof->StopProcess(kTRUE, (Long_t) (timeout / 2));
00599
00600 fProof->Collect(TProof::kActive, timeout);
00601
00602 if (!fIdle)
00603 Warning("HandleTermination","processing could not be stopped");
00604 }
00605
00606 if (fProof)
00607 fProof->Close("S");
00608 }
00609
00610 Terminate(0);
00611 }
00612
00613
00614 Int_t TXProofServ::Setup()
00615 {
00616
00617
00618
00619 char str[512];
00620
00621 if (IsMaster()) {
00622 sprintf(str, "**** Welcome to the PROOF server @ %s ****", gSystem->HostName());
00623 } else {
00624 sprintf(str, "**** PROOF worker server @ %s started ****", gSystem->HostName());
00625 }
00626
00627 if (fSocket->Send(str) != 1+static_cast<Int_t>(strlen(str))) {
00628 Error("Setup", "failed to send proof server startup message");
00629 return -1;
00630 }
00631
00632
00633 if ((fProtocol = gEnv->GetValue("ProofServ.ClientVersion", -1)) < 0) {
00634 Error("Setup", "remote proof protocol missing");
00635 return -1;
00636 }
00637
00638
00639 fUser = gEnv->GetValue("ProofServ.Entity", "");
00640 if (fUser.Length() >= 0) {
00641 if (fUser.Contains(":"))
00642 fUser.Remove(fUser.Index(":"));
00643 if (fUser.Contains("@"))
00644 fUser.Remove(fUser.Index("@"));
00645 } else {
00646 UserGroup_t *pw = gSystem->GetUserInfo();
00647 if (pw) {
00648 fUser = pw->fUser;
00649 delete pw;
00650 }
00651 }
00652
00653
00654 if (IsMaster()) {
00655 TString cf = gEnv->GetValue("ProofServ.ProofConfFile", "");
00656 if (cf.Length() > 0)
00657 fConfFile = cf;
00658 }
00659 fWorkDir = gEnv->GetValue("ProofServ.Sandbox", Form("~/%s", kPROOF_WorkDir));
00660
00661
00662 if ((fTopSessionTag = gEnv->GetValue("ProofServ.SessionTag", "-1")) == "-1") {
00663 Error("Setup", "Session tag missing");
00664 return -1;
00665 }
00666 fSessionTag = fTopSessionTag;
00667
00668 TString spid = Form("-%d", gSystem->GetPid());
00669 if (!fSessionTag.EndsWith(spid)) {
00670 Int_t nd = 0;
00671 if ((nd = fSessionTag.CountChar('-')) >= 2) {
00672 Int_t id = fSessionTag.Index("-", fSessionTag.Index("-") + 1);
00673 if (id != kNPOS) fSessionTag.Remove(id);
00674 } else if (nd != 1) {
00675 Warning("Setup", "Wrong number of '-' in session tag: protocol error? %s", fSessionTag.Data());
00676 }
00677
00678 fSessionTag += spid;
00679 }
00680 if (gProofDebugLevel > 0)
00681 Info("Setup", "session tags: %s, %s", fTopSessionTag.Data(), fSessionTag.Data());
00682
00683
00684 if ((fSessionDir = gEnv->GetValue("ProofServ.SessionDir", "-1")) == "-1") {
00685 Error("Setup", "Session dir missing");
00686 return -1;
00687 }
00688
00689
00690 char *workdir = gSystem->ExpandPathName(fWorkDir.Data());
00691 fWorkDir = workdir;
00692 delete [] workdir;
00693 if (gProofDebugLevel > 0)
00694 Info("Setup", "working directory set to %s", fWorkDir.Data());
00695
00696
00697 if (SetupCommon() != 0) {
00698 Error("Setup", "common setup failed");
00699 return -1;
00700 }
00701
00702
00703 fSocket->SetOption(kNoDelay, 1);
00704
00705
00706 fSocket->SetOption(kKeepAlive, 1);
00707
00708
00709 gSystem->AddSignalHandler(new TXProofServSigPipeHandler(this));
00710
00711
00712 gSystem->AddSignalHandler(new TXProofServTerminationHandler(this));
00713
00714
00715 gSystem->AddSignalHandler(new TXProofServSegViolationHandler(this));
00716
00717 if (gProofDebugLevel > 0)
00718 Info("Setup", "successfully completed");
00719
00720
00721 return 0;
00722 }
00723
00724
00725 TProofServ::EQueryAction TXProofServ::GetWorkers(TList *workers,
00726 Int_t & ,
00727 Bool_t resume)
00728 {
00729
00730
00731
00732 TProofServ::EQueryAction rc = kQueryStop;
00733
00734
00735 if (gEnv->GetValue("ProofServ.UseUserCfg", 0) != 0) {
00736 Int_t pc = 1;
00737 if ((rc = TProofServ::GetWorkers(workers, pc)) == kQueryOK)
00738 return rc;
00739 }
00740
00741
00742 Bool_t dynamicStartup = gEnv->GetValue("Proof.DynamicStartup", kFALSE);
00743 TString seqnum = (dynamicStartup) ? "" : XPD_GW_Static;
00744 if (!fWaitingQueries->IsEmpty()) {
00745 if (resume) {
00746 seqnum += ((TProofQueryResult *)(fWaitingQueries->First()))->GetSeqNum();
00747 } else {
00748 seqnum += ((TProofQueryResult *)(fWaitingQueries->Last()))->GetSeqNum();
00749 }
00750 }
00751
00752 TObjString *os =
00753 ((TXSocket *)fSocket)->SendCoordinator(kGetWorkers, seqnum.Data());
00754
00755
00756
00757
00758 if (os) {
00759 TString fl(os->GetName());
00760 if (fl.BeginsWith(XPD_GW_QueryEnqueued)) {
00761 SendAsynMessage("+++ Query cannot be processed now: enqueued");
00762 return kQueryEnqueued;
00763 }
00764
00765
00766 Int_t nwrks = -1;
00767 Bool_t pernode = kFALSE;
00768 if (gSystem->Getenv("PROOF_NWORKERS")) {
00769 TString s(gSystem->Getenv("PROOF_NWORKERS"));
00770 if (s.EndsWith("x")) {
00771 pernode = kTRUE;
00772 s.ReplaceAll("x", "");
00773 }
00774 if (s.IsDigit()) {
00775 nwrks = s.Atoi();
00776 if (nwrks > 0) {
00777
00778 TString msg;
00779 if (pernode) {
00780 msg.Form("+++ Starting max %d workers per node following the setting of PROOF_NWORKERS", nwrks);
00781 } else {
00782 msg.Form("+++ Starting max %d workers following the setting of PROOF_NWORKERS", nwrks);
00783 }
00784 SendAsynMessage(msg);
00785 } else {
00786 nwrks = -1;
00787 }
00788 } else {
00789 pernode = kFALSE;
00790 }
00791 }
00792
00793 TString tok;
00794 Ssiz_t from = 0;
00795 TList *nodecnt = (pernode) ? new TList : 0 ;
00796 if (fl.Tokenize(tok, from, "&")) {
00797 if (!tok.IsNull()) {
00798 TProofNodeInfo *master = new TProofNodeInfo(tok);
00799 if (!master) {
00800 Error("GetWorkers", "no appropriate master line got from coordinator");
00801 return kQueryStop;
00802 } else {
00803
00804 if (fImage.IsNull() && strlen(master->GetImage()) > 0)
00805 fImage = master->GetImage();
00806 SafeDelete(master);
00807 }
00808
00809 while (fl.Tokenize(tok, from, "&") && (nwrks == -1 || nwrks > 0)) {
00810 if (!tok.IsNull()) {
00811
00812 rc = kQueryOK;
00813 if (pernode && nodecnt) {
00814 TProofNodeInfo *ni = new TProofNodeInfo(tok);
00815 TParameter<Int_t> *p = 0;
00816 Int_t nw = 0;
00817 if (!(p = (TParameter<Int_t> *) nodecnt->FindObject(ni->GetNodeName().Data()))) {
00818 p = new TParameter<Int_t>(ni->GetNodeName().Data(), nw);
00819 nodecnt->Add(p);
00820 }
00821 nw = p->GetVal();
00822 if (gDebug > 0)
00823 Info("GetWorkers","%p: name: %s (%s) val: %d (nwrks: %d)",
00824 p, p->GetName(), ni->GetNodeName().Data(), nw, nwrks);
00825 if (nw < nwrks) {
00826 if (workers) workers->Add(ni);
00827 nw++;
00828 p->SetVal(nw);
00829 } else {
00830
00831 SafeDelete(ni);
00832 }
00833 } else {
00834 if (workers)
00835 workers->Add(new TProofNodeInfo(tok));
00836
00837 if (nwrks != -1) nwrks--;
00838 }
00839 }
00840 }
00841 }
00842 }
00843
00844 if (nodecnt) {
00845 nodecnt->SetOwner(kTRUE);
00846 SafeDelete(nodecnt);
00847 }
00848 }
00849
00850
00851 return rc;
00852 }
00853
00854
00855 Bool_t TXProofServ::HandleError(const void *)
00856 {
00857
00858
00859
00860 if (fSocket && !fSocket->IsValid()) {
00861
00862 fSocket->Reconnect();
00863 if (fSocket && fSocket->IsValid()) {
00864 if (gDebug > 0)
00865 Info("HandleError",
00866 "%p: connection to local coordinator re-established", this);
00867 FlushLogFile();
00868 return kFALSE;
00869 }
00870 }
00871 Printf("TXProofServ::HandleError: %p: got called ...", this);
00872
00873
00874
00875 if (IsMaster())
00876 fProof->Close("S");
00877
00878
00879 ((TXSocket *)fSocket)->SetSessionID(-1);
00880
00881 Terminate(0);
00882
00883 Printf("TXProofServ::HandleError: %p: DONE ... ", this);
00884
00885
00886 return kTRUE;
00887 }
00888
00889
00890 Bool_t TXProofServ::HandleInput(const void *in)
00891 {
00892
00893
00894 if (gDebug > 2)
00895 Printf("TXProofServ::HandleInput %p, in: %p", this, in);
00896
00897 XHandleIn_t *hin = (XHandleIn_t *) in;
00898 Int_t acod = (hin) ? hin->fInt1 : kXPD_msg;
00899
00900
00901 if (acod == kXPD_ping || acod == kXPD_interrupt) {
00902
00903 HandleUrgentData();
00904
00905 } else if (acod == kXPD_flush) {
00906
00907 Info("HandleInput","kXPD_flush: flushing log file (stdout)");
00908 fflush(stdout);
00909
00910 } else if (acod == kXPD_urgent) {
00911
00912 Int_t type = hin->fInt2;
00913 switch (type) {
00914 case TXSocket::kStopProcess:
00915 {
00916
00917 Bool_t abort = (hin->fInt3 != 0) ? kTRUE : kFALSE;
00918
00919 Int_t timeout = hin->fInt4;
00920
00921 if (fProof)
00922 fProof->StopProcess(abort, timeout);
00923 else
00924 if (fPlayer)
00925 fPlayer->StopProcess(abort, timeout);
00926 }
00927 break;
00928 default:
00929 Info("HandleInput","kXPD_urgent: unknown type: %d", type);
00930 }
00931
00932 } else if (acod == kXPD_inflate) {
00933
00934
00935 fInflateFactor = (hin->fInt2 >= 1000) ? hin->fInt2 : fInflateFactor;
00936
00937 Info("HandleInput", "kXPD_inflate: inflate factor set to %f",
00938 (Float_t) fInflateFactor / 1000.);
00939
00940 } else if (acod == kXPD_priority) {
00941
00942
00943 fGroupPriority = hin->fInt2;
00944 if (fProof)
00945 fProof->BroadcastGroupPriority(fGroup, fGroupPriority);
00946
00947 Info("HandleInput", "kXPD_priority: group %s priority set to %f",
00948 fGroup.Data(), (Float_t) fGroupPriority / 100.);
00949
00950 } else if (acod == kXPD_clusterinfo) {
00951
00952
00953 fTotSessions = hin->fInt2;
00954 fActSessions = hin->fInt3;
00955 fEffSessions = (hin->fInt4)/1000.;
00956
00957 Info("HandleInput", "kXPD_clusterinfo: tot: %d, act: %d, eff: %f",
00958 fTotSessions, fActSessions, fEffSessions);
00959
00960 } else {
00961
00962 HandleSocketInput();
00963
00964 ((TXSocket *)fSocket)->RemoveClientID();
00965 }
00966
00967
00968 return kTRUE;
00969 }
00970
00971
00972 void TXProofServ::DisableTimeout()
00973 {
00974
00975
00976 if (fSocket)
00977 ((TXSocket *)fSocket)->DisableTimeout();
00978 }
00979
00980
00981 void TXProofServ::EnableTimeout()
00982 {
00983
00984
00985 if (fSocket)
00986 ((TXSocket *)fSocket)->EnableTimeout();
00987 }
00988
00989
00990 void TXProofServ::Terminate(Int_t status)
00991 {
00992
00993
00994 if (fTerminated)
00995
00996 exit(1);
00997 fTerminated = kTRUE;
00998
00999
01000 Info("Terminate", "starting session termination operations ...");
01001 if (fgLogToSysLog > 0) {
01002 TString s;
01003 s.Form("%s -1 %.3f %.3f", fgSysLogEntity.Data(), fRealTime, fCpuTime);
01004 gSystem->Syslog(kLogNotice, s.Data());
01005 }
01006
01007
01008 ProcInfo_t pi;
01009 if (!gSystem->GetProcInfo(&pi)){
01010 Info("Terminate", "process memory footprint: %ld/%ld kB virtual, %ld/%ld kB resident ",
01011 pi.fMemVirtual, fgVirtMemMax, pi.fMemResident, fgResMemMax);
01012 }
01013
01014
01015 if (fProof)
01016 fProof->SetMonitor(0, kFALSE);
01017
01018
01019 if (status == 0) {
01020
01021 gSystem->ChangeDirectory("/");
01022
01023 gSystem->MakeDirectory(fSessionDir+"/.delete");
01024 gSystem->Exec(Form("%s %s", kRM, fSessionDir.Data()));
01025 }
01026
01027
01028 if (IsMaster()) {
01029 if (!(fQMgr && fQMgr->Queries() && fQMgr->Queries()->GetSize())) {
01030
01031 gSystem->ChangeDirectory("/");
01032
01033 gSystem->MakeDirectory(fQueryDir+"/.delete");
01034 gSystem->Exec(Form("%s %s", kRM, fQueryDir.Data()));
01035
01036 if (fQueryLock)
01037 gSystem->Unlink(fQueryLock->GetName());
01038 }
01039
01040
01041 if (fQueryLock)
01042 fQueryLock->Unlock();
01043 } else {
01044
01045 Bool_t abort = (status == 0) ? kFALSE : kTRUE;
01046 if (!fIdle && fPlayer)
01047 fPlayer->StopProcess(abort,1);
01048 gSystem->Sleep(2000);
01049 }
01050
01051
01052 if (!fDataDir.IsNull() && !gSystem->AccessPathName(fDataDir, kWritePermission)) {
01053 if (UnlinkDataDir(fDataDir))
01054 Info("Terminate", "data directory '%s' has been removed", fDataDir.Data());
01055 }
01056
01057
01058
01059 gSystem->RemoveFileHandler(fInputHandler);
01060
01061
01062 gSystem->ExitLoop();
01063
01064
01065
01066
01067 TXSocket::fgPipe.Post((TXSocket *)fSocket);
01068
01069
01070 Printf("Terminate: termination operations ended: quitting!");
01071 }
01072
01073
01074 Int_t TXProofServ::LockSession(const char *sessiontag, TProofLockPath **lck)
01075 {
01076
01077
01078
01079
01080
01081 if (strstr(sessiontag, fTopSessionTag))
01082 return 0;
01083
01084 if (!lck) {
01085 Info("LockSession","locker space undefined");
01086 return -1;
01087 }
01088 *lck = 0;
01089
01090
01091 TString stag = sessiontag;
01092 TRegexp re("session-.*-.*-.*");
01093 Int_t i1 = stag.Index(re);
01094 if (i1 == kNPOS) {
01095 Info("LockSession","bad format: %s", sessiontag);
01096 return -1;
01097 }
01098 stag.ReplaceAll("session-","");
01099
01100
01101 Int_t i2 = stag.Index(":q");
01102 if (i2 != kNPOS)
01103 stag.Remove(i2);
01104
01105
01106 TString parlog = fSessionDir;
01107 parlog = parlog.Remove(parlog.Index("master-")+strlen("master-"));
01108 parlog += stag;
01109 if (!gSystem->AccessPathName(parlog)) {
01110 Info("LockSession","parent still running: do nothing");
01111 return -1;
01112 }
01113
01114
01115 TString qlock = fQueryLock->GetName();
01116 qlock.ReplaceAll(fTopSessionTag, stag);
01117
01118 if (!gSystem->AccessPathName(qlock)) {
01119 *lck = new TProofLockPath(qlock);
01120 if (((*lck)->Lock()) < 0) {
01121 Info("LockSession","problems locking query lock file");
01122 SafeDelete(*lck);
01123 return -1;
01124 }
01125 }
01126
01127
01128 return 0;
01129 }
01130
01131
01132 void TXProofServ::ReleaseWorker(const char *ord)
01133 {
01134
01135
01136
01137 Info("ReleaseWorker","releasing: %s", ord);
01138
01139 ((TXSocket *)fSocket)->SendCoordinator(kReleaseWorker, ord);
01140 }