00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011 #include "XrdProofdPlatform.h"
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #include "XrdProofdNetMgr.h"
00024
00025 #include "Xrd/XrdBuffer.hh"
00026 #include "XrdClient/XrdClientConst.hh"
00027 #include "XrdClient/XrdClientEnv.hh"
00028 #include "XrdClient/XrdClientMessage.hh"
00029 #include "XrdClient/XrdClientUrlInfo.hh"
00030 #include "XrdNet/XrdNetDNS.hh"
00031 #include "XrdOuc/XrdOucStream.hh"
00032 #include "XrdSys/XrdSysPlatform.hh"
00033
00034 #include "XrdProofdClient.h"
00035 #include "XrdProofdManager.h"
00036 #include "XrdProofdProtocol.h"
00037 #include "XrdProofdResponse.h"
00038 #include "XrdProofWorker.h"
00039
00040
00041 #include "XrdProofdTrace.h"
00042
00043 #include <algorithm>
00044 #include <limits>
00045 #include <math.h>
00046
00047
00048 int MessageSender(const char *msg, int len, void *arg)
00049 {
00050
00051
00052 XrdProofdResponse *r = (XrdProofdResponse *) arg;
00053 if (r) {
00054 return r->Send(kXR_attn, kXPD_srvmsg, 2, (char *) msg, len);
00055 }
00056 return -1;
00057 }
00058
00059
00060 XrdProofdNetMgr::XrdProofdNetMgr(XrdProofdManager *mgr,
00061 XrdProtocol_Config *pi, XrdSysError *e)
00062 : XrdProofdConfig(pi->ConfigFN, e)
00063 {
00064
00065 fMgr = mgr;
00066 fResourceType = kRTNone;
00067 fPROOFcfg.fName = "";
00068 fPROOFcfg.fMtime = -1;
00069 fReloadPROOFcfg = 1;
00070 fDfltFallback = 0;
00071 fDfltWorkers.clear();
00072 fRegWorkers.clear();
00073 fWorkers.clear();
00074 fNodes.clear();
00075 fNumLocalWrks = XrdProofdAux::GetNumCPUs();
00076 fWorkerUsrCfg = 0;
00077 fRequestTO = 30;
00078 fBonjourEnabled = false;
00079 #if defined(BUILD_BONJOUR)
00080 char *host = XrdNetDNS::getHostName();
00081 fBonjourName = host ? host : "";
00082 SafeFree(host);
00083 fBonjourCores = XrdProofdAux::GetNumCPUs();
00084 fBonjourRequestedSrvType = kBonjourSrvDisabled;
00085 #endif
00086
00087
00088 RegisterDirectives();
00089 }
00090
00091
00092 void XrdProofdNetMgr::RegisterDirectives()
00093 {
00094
00095
00096 Register("adminreqto", new XrdProofdDirective("adminreqto", this, &DoDirectiveClass));
00097 Register("resource", new XrdProofdDirective("resource", this, &DoDirectiveClass));
00098 Register("worker", new XrdProofdDirective("worker", this, &DoDirectiveClass));
00099 Register("bonjour", new XrdProofdDirective("bonjour", this, &DoDirectiveClass));
00100 Register("localwrks", new XrdProofdDirective("localwrks", (void *)&fNumLocalWrks, &DoDirectiveInt));
00101 }
00102
00103
00104 XrdProofdNetMgr::~XrdProofdNetMgr()
00105 {
00106
00107
00108
00109
00110 std::list<XrdProofWorker *>::iterator w = fRegWorkers.begin();
00111 while (w != fRegWorkers.end()) {
00112 delete *w;
00113 w = fRegWorkers.erase(w);
00114 }
00115 w = fDfltWorkers.begin();
00116 while (w != fDfltWorkers.end()) {
00117 delete *w;
00118 w = fDfltWorkers.erase(w);
00119 }
00120 fWorkers.clear();
00121 }
00122
00123
00124 int XrdProofdNetMgr::Config(bool rcf)
00125 {
00126
00127
00128 XPDLOC(NMGR, "NetMgr::Config")
00129
00130
00131 XrdSysMutexHelper mhp(fMutex);
00132
00133
00134 std::list<XrdProofWorker *>::iterator w = fWorkers.begin();
00135 while (w != fWorkers.end()) {
00136 delete *w;
00137 w = fWorkers.erase(w);
00138 }
00139
00140 XrdOucString mm("master ", 128);
00141 mm += fMgr->Host();
00142 mm += " port=";
00143 mm += fMgr->Port();
00144 fWorkers.push_back(new XrdProofWorker(mm.c_str()));
00145
00146
00147 if (XrdProofdConfig::Config(rcf) != 0) {
00148 XPDERR("problems parsing file ");
00149 return -1;
00150 }
00151
00152 XrdOucString msg;
00153 msg = (rcf) ? "re-configuring" : "configuring";
00154 TRACE(ALL, msg);
00155
00156 if (fMgr->SrvType() != kXPD_Worker || fMgr->SrvType() == kXPD_AnyServer) {
00157 TRACE(ALL, "PROOF config file: " <<
00158 ((fPROOFcfg.fName.length() > 0) ? fPROOFcfg.fName.c_str() : "none"));
00159 if (fResourceType == kRTStatic) {
00160
00161
00162 bool dodefault = 1;
00163 if (fPROOFcfg.fName.length() > 0) {
00164
00165 if (ReadPROOFcfg() == 0) {
00166 TRACE(ALL, "PROOF config file will " <<
00167 ((fReloadPROOFcfg) ? "" : "not ") << "be reloaded upon change");
00168 dodefault = 0;
00169 } else {
00170 if (!fDfltFallback) {
00171 XPDERR("unable to find valid information in PROOF config file " <<
00172 fPROOFcfg.fName);
00173 fPROOFcfg.fMtime = -1;
00174 return 0;
00175 } else {
00176 TRACE(ALL, "file " << fPROOFcfg.fName << " cannot be parsed: use default configuration to start with");
00177 }
00178 }
00179 }
00180 if (dodefault) {
00181
00182 CreateDefaultPROOFcfg();
00183 }
00184 } else if (fResourceType == kRTNone && fWorkers.size() <= 1 && !fBonjourEnabled) {
00185
00186 CreateDefaultPROOFcfg();
00187 }
00188
00189
00190 FindUniqueNodes();
00191 }
00192
00193
00194 XrdProofConn::SetRetryParam(1, 1);
00195
00196 EnvPutInt(NAME_REQUESTTIMEOUT, fRequestTO);
00197
00198
00199 XPDFORM(msg, "%d worker nodes defined at start-up", fWorkers.size() - 1);
00200 TRACE(ALL, msg);
00201
00202
00203 return 0;
00204 }
00205
00206
00207 int XrdProofdNetMgr::DoDirective(XrdProofdDirective *d,
00208 char *val, XrdOucStream *cfg, bool rcf)
00209 {
00210
00211 XPDLOC(NMGR, "NetMgr::DoDirective")
00212
00213 if (!d)
00214
00215 return -1;
00216
00217 if (d->fName == "resource") {
00218 return DoDirectiveResource(val, cfg, rcf);
00219 } else if (d->fName == "adminreqto") {
00220 return DoDirectiveAdminReqTO(val, cfg, rcf);
00221 } else if (d->fName == "worker") {
00222 return DoDirectiveWorker(val, cfg, rcf);
00223 } else if (d->fName == "bonjour") {
00224 return DoDirectiveBonjour(val, cfg, rcf);
00225 }
00226
00227 TRACE(XERR, "unknown directive: " << d->fName);
00228
00229 return -1;
00230 }
00231
00232
00233 int XrdProofdNetMgr::DoDirectiveBonjour(char *val, XrdOucStream *cfg, bool)
00234 {
00235 XPDLOC(NMGR, "NetMgr::DoDirectiveBonjour");
00236
00237
00238 TRACE(DBG, "processing Bonjour directive");
00239
00240 if (!val || !cfg)
00241
00242 return -1;
00243
00244 #if defined(BUILD_BONJOUR)
00245 const char * cp = NULL;
00246
00247
00248 if (!strcmp("register", val) || !strcmp("publish", val)) {
00249
00250 fBonjourRequestedSrvType = kBonjourSrvRegister;
00251 } else if (!strcmp("discover", val) || !strcmp("browse", val)) {
00252 fBonjourRequestedSrvType = kBonjourSrvBrowse;
00253 } else if (!strcmp("both", val) || !strcmp("all", val)) {
00254 fBonjourRequestedSrvType = kBonjourSrvBoth;
00255 } else {
00256 TRACE(XERR, "Bonjour directive unknown");
00257 return -1;
00258 }
00259
00260
00261
00262
00263 while ((val = cfg->GetWord()) != NULL) {
00264
00265 XrdOucString s(val);
00266
00267
00268
00269 if (s.beginswith("servicetype=")) {
00270 cp = index(val, '=');
00271 cp++;
00272 fBonjourServiceType.assign(cp, 0);
00273 TRACE(DBG, "custom service type is " << cp);
00274 } else if (s.beginswith("name=")) {
00275 cp = index(val, '=');
00276 cp++;
00277 fBonjourName.assign(cp, 0);
00278 TRACE(DBG, "custom Bonjour name is " << cp);
00279 } else if (s.beginswith("domain=")) {
00280 cp = index(val, '=');
00281 cp++;
00282 fBonjourDomain.assign(cp, 0);
00283 TRACE(DBG, "custom Bonjour domain is " << cp);
00284 } else if (s.beginswith("cores=")) {
00285 cp = index(val, '=');
00286 cp++;
00287 fBonjourCores = strtol(cp, NULL, 10);
00288 if (fBonjourCores <= 0) {
00289 TRACE(XERR, "number of cores not valid: " << fBonjourCores);
00290 TRACE(XERR, "Bonjour module not loaded!");
00291 return -1;
00292 }
00293 TRACE(DBG, "custom number of cores is " << cp);
00294 } else {
00295 TRACE(XERR, "Bonjour directive unknown");
00296 cfg->RetToken();
00297 return -1;
00298 }
00299 }
00300 TRACE(DBG, "custom Bonjour name is '" << fBonjourName <<"'");
00301
00302
00303 if (!XrdProofdNetMgr::CheckBonjourRoleCoherence(fMgr->SrvType(), GetBonjourRequestedServiceType())) {
00304 TRACE(XERR, "Warning: xpd.role directive and xpd.bonjour service selection are not compatible");
00305 }
00306
00307
00308 return LoadBonjourModule(fBonjourRequestedSrvType);
00309
00310 #else
00311
00312 TRACE(XERR, "Bonjour support is disabled");
00313 return -1;
00314
00315 #endif
00316 }
00317
00318
00319 void XrdProofdNetMgr::BalanceNodesOrder()
00320 {
00321
00322 list<XrdProofWorker *>::const_iterator iter, iter2;
00323 list<XrdProofWorker *>::iterator iter3;
00324
00325 map<XrdProofWorker *, BalancerInfo> info;
00326
00327 unsigned int min = UINT_MAX;
00328
00329 unsigned int total = 0, total_perit = 0;
00330
00331 unsigned int total_added = 0;
00332
00333 list<XrdProofWorker *> tempNodes;
00334
00335 bool deleted;
00336
00337
00338 for (iter = fNodes.begin(); iter != fNodes.end(); iter++) {
00339
00340
00341
00342
00343
00344
00345
00346 info[*iter].available = 0;
00347 for (iter2 = fWorkers.begin(); iter2 != fWorkers.end(); iter2++) {
00348 if ((*iter)->Matches(*iter2)) {
00349 info[*iter].available++;
00350 }
00351 }
00352 info[*iter].added = 0;
00353
00354 if (info[*iter].available > 1 && info[*iter].available < min)
00355 min = info[*iter].available;
00356
00357 total += info[*iter].available;
00358 }
00359
00360
00361
00362 for (iter = fNodes.begin(); iter != fNodes.end(); iter++) {
00363 if (info[*iter].available > 1) {
00364 info[*iter].per_iteration = (unsigned int)floor((double)info[*iter].available / (double)min);
00365 } else {
00366 info[*iter].per_iteration = 1;
00367 }
00368
00369 total_perit += info[*iter].per_iteration;
00370 }
00371
00372
00373
00374 tempNodes.push_back(fWorkers.front());
00375
00376
00377
00378 while (total_added < total) {
00379 for (map<XrdProofWorker *, BalancerInfo>::iterator i = info.begin(); i != info.end(); i++) {
00380 if (i->second.added < i->second.available) {
00381
00382 unsigned int to_add = xrdmin(i->second.per_iteration,
00383 (i->second.available - i->second.added));
00384
00385 for (unsigned int j = 0; j < to_add; j++) {
00386 tempNodes.push_back(i->first);
00387 }
00388 i->second.added += to_add;
00389 total_added += to_add;
00390 }
00391 }
00392 }
00393
00394
00395
00396
00397
00398
00399
00400
00401
00402 iter3 = ++(fWorkers.begin());
00403 while (iter3 != fWorkers.end()) {
00404 deleted = false;
00405
00406
00407
00408 if (count(++(tempNodes.begin()), tempNodes.end(), *iter3) == 0) {
00409
00410 for (iter2 = ++(tempNodes.begin()); iter2 != tempNodes.end(); ++iter2) {
00411 if ((*iter2)->Matches(*iter3)) {
00412
00413 (*iter2)->MergeProofServs(*(*iter3));
00414 deleted = true;
00415 delete *iter3;
00416 fWorkers.erase(iter3++);
00417 break;
00418 }
00419 }
00420 }
00421
00422 if (!deleted)
00423 iter3++;
00424 }
00425
00426
00427 fWorkers = tempNodes;
00428 }
00429
00430
#if defined(BUILD_BONJOUR)
// Callback invoked on Bonjour updates; 'context' is the XrdProofdNetMgr
// instance that subscribed (see LoadBonjourModule).
//
// Under the manager's mutex:
//   - all registered workers except the first entry are flagged inactive;
//   - for each discovered node whose record matches the registered service
//     type, the corresponding registered worker is re-activated, or a new
//     worker is created, and added to the active list once per advertised core;
//   - workers still inactive afterwards are removed from the active list;
//   - unique nodes are recomputed and the list order is re-balanced.
// Always returns NULL.
void * XrdProofdNetMgr::ProcessBonjourUpdate(void * context)
{
   XrdProofdNetMgr * mgr;
   std::list<XrdOucBonjourNode *> nodes;
   std::list<XrdOucBonjourNode *>::const_iterator idx;
   std::list<XrdProofWorker *>::iterator w;
   const XrdOucBonjourNode * i;
   XrdProofWorker * worker;
   bool haveit;
   int cores = -1;
   int recordLength = 0;

   XPDLOC(NMGR, "NetMgr::ProcessBonjourUpdate");
   TRACE(DBG, "Updating the network topology");

   mgr = static_cast<XrdProofdNetMgr *>(context);

   // Exclusive access to the worker lists
   XrdSysMutexHelper mhp(mgr->fMutex);

   // The first registered entry is always the master: create it if missing,
   // otherwise flag all the other entries as inactive; the ones still
   // advertised are re-activated below
   if (mgr->fRegWorkers.size() < 1) {
      XrdOucString mm("master ", 128);
      mm += mgr->fMgr->Host();
      mm += " port=";
      mm += mgr->fMgr->Port();
      mgr->fRegWorkers.push_back(new XrdProofWorker(mm.c_str()));
   } else {
      w = mgr->fRegWorkers.begin();
      // Skip the master
      ++w;
      for (; w != mgr->fRegWorkers.end(); ++w) {
         (*w)->fActive = false;
      }
   }

   // Work on a copy of the node list taken under the Bonjour manager's lock
   mgr->fBonjourManager->LockNodeList();
   nodes = mgr->fBonjourManager->GetCurrentNodeList();

   for (idx = nodes.begin(); idx != nodes.end(); ++idx) {

      i = *idx;
      (*i).Print();

      // A host name is mandatory
      if (!i->GetHostName() || strlen(i->GetHostName()) <= 0) {
         TRACE(ALL,"bonjour list node: empty!");
         continue;
      }
      TRACE(DBG, "parsing info for node: " << i->GetHostName() << ", port: " << i->GetPort());

      // Only nodes advertising our service type are of interest
      if (i->GetBonjourRecord().MatchesRegisteredType(mgr->GetBonjourServiceType())) {

         // Is this node already registered? (the master entry is skipped)
         w = mgr->fRegWorkers.begin();
         ++w;
         haveit = 0;
         while (w != mgr->fRegWorkers.end()) {
            TRACE(HDBG,"registerd node: "<< (*w)->fHost <<", port: "<<(*w)->fPort);
            if ((*w)->fHost == i->GetHostName() && (*w)->fPort == i->GetPort()) {
               (*w)->fActive = true;
               haveit = 1;

               // Add it to the active list, if not already there
               if (std::find(mgr->fWorkers.begin(), mgr->fWorkers.end(), *w) == mgr->fWorkers.end()) {
                  // Number of cores advertised by the node (1 if not specified)
                  const char *txt = i->GetBonjourRecord().GetTXTValue("cores", recordLength);
                  if (txt != NULL && recordLength > 0) {
                     XrdOucString trimmed(txt, recordLength);
                     cores = strtol(trimmed.c_str(), NULL, 10);
                  } else {
                     cores = 1;
                  }

                  // One entry per core
                  TRACE(HDBG, " adding "<<cores<<" for worker '"<<(*w)->fHost<<"'");
                  for (int c = 0; c < cores; c++) {
                     mgr->fWorkers.push_back(*w);
                  }
               } else {
                  TRACE(DBG, " worker(s) '"<<(*w)->fHost<<"' already in the list");
               }

               break;
            }
            ++w;
         }

         if (!haveit) {
            // New node. Get the number of cores first, so that nothing needs
            // to be released if the node has to be skipped
            const char *pbr = i->GetBonjourRecord().GetTXTValue("cores", recordLength);
            if (!pbr || recordLength <= 0) {
               TRACE(ALL, "no information about the cores available ... skip");
               continue;
            }
            // The value is handled as a length-delimited buffer: parse a
            // null-terminated copy, never the raw pointer
            char *pc = new char[recordLength + 1];
            memcpy(pc, pbr, recordLength);
            pc[recordLength] = 0;
            cores = strtol(pc, NULL, 10);
            delete [] pc;

            // Create and register the worker
            worker = new XrdProofWorker();
            worker->fHost = i->GetHostName();
            worker->fPort = i->GetPort();
            worker->fActive = true;
            const char *ntype = i->GetBonjourRecord().GetTXTValue("nodetype", recordLength);
            if (ntype != NULL) {
               worker->fType = ntype[0];
            }

            mgr->fRegWorkers.push_back(worker);
            // One entry per core in the active list
            for (int c = 0; c < cores; c++) {
               mgr->fWorkers.push_back(worker);
            }
         }
      }
   }

   // Done with the Bonjour node list
   mgr->fBonjourManager->UnLockNodeList();

   // Workers not advertised anymore leave the active list
   w = mgr->fRegWorkers.begin();
   ++w;
   while (w != mgr->fRegWorkers.end()) {
      if (!((*w)->fActive)) {
         mgr->fWorkers.remove(*w);
      }
      ++w;
   }

   // Update the list of unique nodes
   mgr->FindUniqueNodes();

   // Re-order the active list
   mgr->BalanceNodesOrder();

   return NULL;
}
#endif
00579
00580
#if defined(BUILD_BONJOUR)
// Activate the Bonjour services selected by 'srvtype'.
// kBonjourSrvRegister / kBonjourSrvBoth: publish this server, with TXT
// entries for the node type and the number of cores.
// kBonjourSrvBrowse / kBonjourSrvBoth: subscribe to updates, handled by
// ProcessBonjourUpdate.
// Returns 0 on success, -1 if the service could not be published.
int XrdProofdNetMgr::LoadBonjourModule(int srvtype)
{
   XPDLOC(NMGR, "NetMgr::LoadBonjourModule");

   const bool wantRegister = (srvtype == kBonjourSrvRegister || srvtype == kBonjourSrvBoth);
   const bool wantBrowse = (srvtype == kBonjourSrvBrowse || srvtype == kBonjourSrvBoth);

   // Manager instance for this platform
   fBonjourManager = &(XrdOucBonjourFactory::FactoryByPlatform()->GetBonjourManager());

   if (wantRegister) {
      // Record describing this service
      XrdOucBonjourRecord record(GetBonjourName(), GetBonjourServiceType(), GetBonjourDomain());

      // Node type, derived from the role of this server
      if (XrdProofdProtocol::Mgr()) {
         switch (XrdProofdProtocol::Mgr()->SrvType()) {
            case kXPD_TopMaster:
            case kXPD_Master:
               record.AddTXTRecord("nodetype", "S");
               break;
            case kXPD_AnyServer:
            case kXPD_Worker:
               record.AddTXTRecord("nodetype", "W");
               break;
            default:
               TRACE(XERR, "TXT node type is not known '" << XrdProofdProtocol::Mgr()->SrvType() << "'");
         }
      }

      // Number of cores
      record.AddTXTRecord("cores", fBonjourCores);

      // Publish
      if (fBonjourManager->RegisterService(record, fMgr->Port()) != 0) {
         TRACE(XERR, "Bonjour service could not be published");
         return -1;
      }
      TRACE(ALL, "Bonjour service was published OK");
   }

   if (wantBrowse) {
      // Get notified about nodes appearing and disappearing
      fBonjourEnabled = true;
      fBonjourManager->SubscribeForUpdates(GetBonjourServiceType(), ProcessBonjourUpdate, this);
   }

   return 0;
}
#endif
00631
00632
#if defined(BUILD_BONJOUR)
// Check whether the server role 'xrdRole' is compatible with the Bonjour
// service selection 'bonjourSrvType'.
// Both values must be in [-1, 2], otherwise false is returned. A disabled
// Bonjour service is compatible with any role. For the remaining cases the
// answer comes from a table indexed by [xrdRole + 1][bonjourSrvType].
// NOTE(review): the table lookup assumes kBonjourSrvDisabled is the only
// value below 0 that can get here - confirm against the enum definition.
bool XrdProofdNetMgr::CheckBonjourRoleCoherence(int xrdRole, int bonjourSrvType)
{
   // Rows: role + 1; columns: Bonjour service type
   const bool allowed[4][3] = {{true,  true,  true },
                               {false, true,  false},
                               {true,  true,  true },
                               {true,  false, false}};

   const bool roleInRange = (xrdRole >= -1 && xrdRole <= 2);
   const bool typeInRange = (bonjourSrvType >= -1 && bonjourSrvType <= 2);
   if (!roleInRange || !typeInRange)
      return false;

   // Nothing to check if the service is disabled
   if (bonjourSrvType == kBonjourSrvDisabled)
      return true;

   return allowed[xrdRole + 1][bonjourSrvType];
}
#endif
00653
00654
00655 int XrdProofdNetMgr::DoDirectiveAdminReqTO(char *val, XrdOucStream *cfg, bool)
00656 {
00657
00658
00659 if (!val)
00660
00661 return -1;
00662
00663
00664 if (fMgr->Host() && cfg)
00665 if (XrdProofdAux::CheckIf(cfg, fMgr->Host()) == 0)
00666 return 0;
00667
00668
00669
00670 int to = strtol(val, 0, 10);
00671 fRequestTO = (to > 0) ? to : fRequestTO;
00672 return 0;
00673 }
00674
00675
00676 int XrdProofdNetMgr::DoDirectiveResource(char *val, XrdOucStream *cfg, bool)
00677 {
00678
00679 XPDLOC(NMGR, "NetMgr::DoDirectiveResource")
00680
00681 if (!val || !cfg)
00682
00683 return -1;
00684
00685 if (!strcmp("static", val)) {
00686
00687
00688 fResourceType = kRTStatic;
00689 while ((val = cfg->GetWord()) && val[0]) {
00690 XrdOucString s(val);
00691 if (s.beginswith("ucfg:")) {
00692 fWorkerUsrCfg = s.endswith("yes") ? 1 : 0;
00693 } else if (s.beginswith("reload:")) {
00694 fReloadPROOFcfg = (s.endswith("1") || s.endswith("yes")) ? 1 : 0;
00695 } else if (s.beginswith("dfltfallback:")) {
00696 fDfltFallback = (s.endswith("1") || s.endswith("yes")) ? 1 : 0;
00697 } else if (s.beginswith("wmx:")) {
00698 } else if (s.beginswith("selopt:")) {
00699 } else {
00700
00701 fPROOFcfg.fName = val;
00702 if (fPROOFcfg.fName.beginswith("sm:")) {
00703 fPROOFcfg.fName.replace("sm:", "");
00704 }
00705 XrdProofdAux::Expand(fPROOFcfg.fName);
00706
00707 if (access(fPROOFcfg.fName.c_str(), R_OK)) {
00708 if (errno == ENOENT) {
00709 TRACE(ALL, "WARNING: configuration file does not exists: " << fPROOFcfg.fName);
00710 } else {
00711 TRACE(XERR, "configuration file cannot be read: " << fPROOFcfg.fName);
00712 fPROOFcfg.fName = "";
00713 fPROOFcfg.fMtime = -1;
00714 }
00715 }
00716 }
00717 }
00718 }
00719 return 0;
00720 }
00721
00722
00723 int XrdProofdNetMgr::DoDirectiveWorker(char *val, XrdOucStream *cfg, bool)
00724 {
00725
00726 XPDLOC(NMGR, "NetMgr::DoDirectiveWorker")
00727
00728 if (!val || !cfg)
00729
00730 return -1;
00731
00732
00733 XrdSysMutexHelper mhp(fMutex);
00734
00735
00736 cfg->RetToken();
00737 XrdOucString wrd(cfg->GetWord());
00738 if (wrd.length() > 0) {
00739
00740 XrdOucString line;
00741 char rest[2048] = {0};
00742 cfg->GetRest((char *)&rest[0], 2048);
00743 XPDFORM(line, "%s %s", wrd.c_str(), rest);
00744
00745 if (wrd == "master" || wrd == "node") {
00746
00747 XrdProofWorker *pw = new XrdProofWorker(line.c_str());
00748 if (pw->fHost.beginswith("localhost") ||
00749 pw->Matches(fMgr->Host())) {
00750
00751 XrdProofWorker *fw = fWorkers.front();
00752 fw->Reset(line.c_str());
00753 }
00754 SafeDelete(pw);
00755 } else {
00756
00757 int nr = 1;
00758 int ir = line.find("repeat=");
00759 if (ir != STR_NPOS) {
00760 XrdOucString r(line, ir + strlen("repeat="));
00761 r.erase(r.find(' '));
00762 nr = r.atoi();
00763 if (nr < 0 || !XPD_LONGOK(nr)) nr = 1;
00764 TRACE(DBG, "found repeat = " << nr);
00765 }
00766 while (nr--) {
00767
00768 XrdProofdMultiStr mline(line.c_str());
00769 if (mline.IsValid()) {
00770 TRACE(DBG, "found multi-line with: " << mline.N() << " tokens");
00771 for (int i = 0; i < mline.N(); i++) {
00772 TRACE(HDBG, "found token: " << mline.Get(i));
00773 fWorkers.push_back(new XrdProofWorker(mline.Get(i).c_str()));
00774 }
00775 } else {
00776 TRACE(DBG, "found line: " << line);
00777 fWorkers.push_back(new XrdProofWorker(line.c_str()));
00778 }
00779 }
00780 }
00781 }
00782
00783
00784
00785
00786 FindUniqueNodes();
00787
00788
00789 return 0;
00790 }
00791
00792
00793 int XrdProofdNetMgr::BroadcastCtrlC(const char *usr)
00794 {
00795
00796
00797 XPDLOC(NMGR, "NetMgr::BroadcastCtrlC")
00798
00799 int rc = 0;
00800
00801
00802 std::list<XrdProofWorker *>::iterator iw = fNodes.begin();
00803 XrdProofWorker *w = 0;
00804 while (iw != fNodes.end()) {
00805 if ((w = *iw) && w->fType != 'M') {
00806
00807 bool us = (((w->fHost.find("localhost") != STR_NPOS ||
00808 XrdOucString(fMgr->Host()).find(w->fHost.c_str()) != STR_NPOS)) &&
00809 (w->fPort == -1 || w->fPort == fMgr->Port())) ? 1 : 0;
00810 if (!us) {
00811
00812 XrdOucString u = (usr) ? usr : fMgr->EffectiveUser();
00813 u += '@';
00814 u += w->fHost;
00815 if (w->fPort != -1) {
00816 u += ':';
00817 u += w->fPort;
00818 }
00819
00820 XrdProofConn *conn = GetProofConn(u.c_str());
00821 if (conn && conn->IsValid()) {
00822
00823 XPClientRequest reqhdr;
00824 memset(&reqhdr, 0, sizeof(reqhdr));
00825 conn->SetSID(reqhdr.header.streamid);
00826 reqhdr.proof.requestid = kXP_ctrlc;
00827 reqhdr.proof.sid = 0;
00828 reqhdr.proof.dlen = 0;
00829
00830 if (XPD::clientMarshall(&reqhdr) != 0) {
00831 TRACE(XERR, "problems marshalling request");
00832 return -1;
00833 }
00834 if (conn->LowWrite(&reqhdr, 0, 0) != kOK) {
00835 TRACE(XERR, "problems sending ctrl-c request to server " << u);
00836 }
00837
00838
00839 SafeDelete(conn);
00840 }
00841 } else {
00842 TRACE(DBG, "broadcast request for ourselves: ignore");
00843 }
00844 }
00845
00846 iw++;
00847 }
00848
00849
00850 return rc;
00851 }
00852
00853
00854 int XrdProofdNetMgr::Broadcast(int type, const char *msg, const char *usr,
00855 XrdProofdResponse *r, bool notify, int subtype)
00856 {
00857
00858
00859 XPDLOC(NMGR, "NetMgr::Broadcast")
00860
00861 unsigned int nok = 0;
00862 TRACE(REQ, "type: " << type);
00863
00864
00865 std::list<XrdProofWorker *>::iterator iw = fNodes.begin();
00866 XrdProofWorker *w = 0;
00867 XrdClientMessage *xrsp = 0;
00868 while (iw != fNodes.end()) {
00869 if ((w = *iw) && w->fType != 'M') {
00870
00871 bool us = (((w->fHost.find("localhost") != STR_NPOS ||
00872 XrdOucString(fMgr->Host()).find(w->fHost.c_str()) != STR_NPOS)) &&
00873 (w->fPort == -1 || w->fPort == fMgr->Port())) ? 1 : 0;
00874 if (!us) {
00875
00876 XrdOucString u = (usr) ? usr : fMgr->EffectiveUser();
00877 u += '@';
00878 u += w->fHost;
00879 if (w->fPort != -1) {
00880 u += ':';
00881 u += w->fPort;
00882 }
00883
00884 int srvtype = (w->fType != 'W') ? (kXR_int32) kXPD_Master
00885 : (kXR_int32) kXPD_Worker;
00886 TRACE(HDBG, "sending request to " << u);
00887
00888 if (!(xrsp = Send(u.c_str(), type, msg, srvtype, r, notify, subtype))) {
00889 TRACE(XERR, "problems sending request to " << u);
00890 } else {
00891 nok++;
00892 }
00893
00894 SafeDelete(xrsp);
00895 } else {
00896 TRACE(DBG, "broadcast request for ourselves: ignore");
00897 }
00898 }
00899
00900 iw++;
00901 }
00902
00903
00904 return (nok == fNodes.size()) ? 0 : -1;
00905 }
00906
00907
00908 XrdProofConn *XrdProofdNetMgr::GetProofConn(const char *url)
00909 {
00910
00911
00912 XrdProofConn *p = 0;
00913
00914
00915 XrdOucString buf = " Manager connection from ";
00916 buf += fMgr->Host();
00917 buf += "|ord:000";
00918 char m = 'A';
00919
00920 {
00921 XrdSysMutexHelper mhp(fMutex);
00922 p = new XrdProofConn(url, m, -1, -1, 0, buf.c_str());
00923 }
00924 if (p && !(p->IsValid())) SafeDelete(p);
00925
00926
00927 return p;
00928 }
00929
00930
00931 XrdClientMessage *XrdProofdNetMgr::Send(const char *url, int type,
00932 const char *msg, int srvtype,
00933 XrdProofdResponse *r, bool notify,
00934 int subtype)
00935 {
00936
00937
00938 XPDLOC(NMGR, "NetMgr::Send")
00939
00940 XrdClientMessage *xrsp = 0;
00941 TRACE(REQ, "type: " << type);
00942
00943 if (!url || strlen(url) <= 0)
00944 return xrsp;
00945
00946
00947 XrdProofConn *conn = GetProofConn(url);
00948
00949 bool ok = 1;
00950 if (conn && conn->IsValid()) {
00951 XrdOucString notifymsg("Send: ");
00952
00953 XPClientRequest reqhdr;
00954 const void *buf = 0;
00955 char **vout = 0;
00956 memset(&reqhdr, 0, sizeof(reqhdr));
00957 conn->SetSID(reqhdr.header.streamid);
00958 reqhdr.header.requestid = kXP_admin;
00959 reqhdr.proof.int1 = type;
00960 switch (type) {
00961 case kROOTVersion:
00962 notifymsg += "change-of-ROOT version request to ";
00963 notifymsg += url;
00964 notifymsg += " msg: ";
00965 notifymsg += msg;
00966 reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
00967 buf = (msg) ? (const void *)msg : buf;
00968 break;
00969 case kCleanupSessions:
00970 notifymsg += "cleanup request to ";
00971 notifymsg += url;
00972 notifymsg += " for user: ";
00973 notifymsg += msg;
00974 reqhdr.proof.int2 = (kXR_int32) srvtype;
00975 reqhdr.proof.sid = -1;
00976 reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
00977 buf = (msg) ? (const void *)msg : buf;
00978 break;
00979 case kExec:
00980 notifymsg += "exec ";
00981 notifymsg += subtype;
00982 notifymsg += "request for ";
00983 notifymsg += msg;
00984 reqhdr.proof.int2 = (kXR_int32) subtype;
00985 reqhdr.proof.sid = -1;
00986 reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
00987 buf = (msg) ? (const void *)msg : buf;
00988 break;
00989 default:
00990 ok = 0;
00991 TRACE(XERR, "invalid request type " << type);
00992 break;
00993 }
00994
00995
00996 if (r && notify)
00997 r->Send(kXR_attn, kXPD_srvmsg, 0, (char *) notifymsg.c_str(), notifymsg.length());
00998
00999
01000 conn->SetAsync(conn, &MessageSender, (void *)r);
01001
01002
01003 if (ok)
01004 xrsp = conn->SendReq(&reqhdr, buf, vout, "NetMgr::Send");
01005
01006
01007 conn->SetAsync(0, 0, (void *)0);
01008
01009
01010 if (r && !xrsp && conn->GetLastErr()) {
01011 XrdOucString cmsg = url;
01012 cmsg += ": ";
01013 cmsg += conn->GetLastErr();
01014 r->Send(kXR_attn, kXPD_srvmsg, (char *) cmsg.c_str(), cmsg.length());
01015 }
01016
01017
01018 SafeDelete(conn);
01019
01020 } else {
01021 TRACE(XERR, "could not open connection to " << url);
01022 if (r) {
01023 XrdOucString cmsg = "failure attempting connection to ";
01024 cmsg += url;
01025 r->Send(kXR_attn, kXPD_srvmsg, (char *) cmsg.c_str(), cmsg.length());
01026 }
01027 }
01028
01029
01030 return xrsp;
01031 }
01032
01033
01034 bool XrdProofdNetMgr::IsLocal(const char *host, bool checkport)
01035 {
01036
01037
01038
01039
01040 int rc = 0;
01041 if (host && strlen(host) > 0) {
01042 XrdClientUrlInfo uu(host);
01043 if (uu.Port <= 0) uu.Port = 1093;
01044
01045 char *fqn = XrdNetDNS::getHostName(uu.Host.c_str());
01046 if (fqn && (strstr(fqn, "localhost") || !strcmp(fqn, "127.0.0.1") ||
01047 !strcmp(fMgr->Host(), fqn))) {
01048 if (!checkport || (uu.Port == fMgr->Port()))
01049 rc = 1;
01050 }
01051 SafeFree(fqn);
01052 }
01053
01054 return rc;
01055 }
01056
01057
01058 int XrdProofdNetMgr::ReadBuffer(XrdProofdProtocol *p)
01059 {
01060
01061 XPDLOC(NMGR, "NetMgr::ReadBuffer")
01062
01063 int rc = 0;
01064 XPD_SETRESP(p, "ReadBuffer");
01065
01066 XrdOucString emsg;
01067
01068
01069
01070 kXR_int64 ofs = ntohll(p->Request()->readbuf.ofs);
01071 int len = ntohl(p->Request()->readbuf.len);
01072
01073
01074 char *file = 0;
01075 char *filen = 0;
01076 char *pattern = 0;
01077 int dlen = p->Request()->header.dlen;
01078 int grep = ntohl(p->Request()->readbuf.int1);
01079 int blen = dlen;
01080 bool local = 0;
01081 if (dlen > 0 && p->Argp()->buff) {
01082 file = new char[dlen+1];
01083 memcpy(file, p->Argp()->buff, dlen);
01084 file[dlen] = 0;
01085
01086 XrdClientUrlInfo ui(file);
01087 if (ui.Host.length() > 0) {
01088
01089 local = XrdProofdNetMgr::IsLocal(ui.Host.c_str());
01090 if (local) {
01091 memcpy(file, ui.File.c_str(), ui.File.length());
01092 file[ui.File.length()] = 0;
01093 blen = ui.File.length();
01094 TRACEP(p, DBG, "file is LOCAL");
01095 }
01096 }
01097
01098 if (grep > 0) {
01099
01100 pattern = new char[len + 1];
01101 int j = blen - len;
01102 int i = 0;
01103 while (j < blen)
01104 pattern[i++] = file[j++];
01105 pattern[i] = 0;
01106 filen = strdup(file);
01107 filen[blen - len] = 0;
01108 TRACEP(p, DBG, "grep operation " << grep << ", pattern:" << pattern);
01109 }
01110 } else {
01111 emsg = "file name not found";
01112 TRACEP(p, XERR, emsg);
01113 response->Send(kXR_InvalidRequest, emsg.c_str());
01114 return 0;
01115 }
01116 if (grep) {
01117 TRACEP(p, REQ, "file: " << filen << ", ofs: " << ofs << ", len: " << len <<
01118 ", pattern: " << pattern);
01119 } else {
01120 TRACEP(p, REQ, "file: " << file << ", ofs: " << ofs << ", len: " << len);
01121 }
01122
01123
01124 int lout = len;
01125 char *buf = 0;
01126 if (local) {
01127 if (grep > 0) {
01128
01129 lout = blen;
01130 buf = ReadBufferLocal(filen, pattern, lout, grep);
01131 } else {
01132
01133 buf = ReadBufferLocal(file, ofs, lout);
01134 }
01135 } else {
01136
01137 XrdClientUrlInfo u(file);
01138 u.User = p->Client()->User() ? p->Client()->User() : fMgr->EffectiveUser();
01139 buf = ReadBufferRemote(u.GetUrl().c_str(), file, ofs, lout, grep);
01140 }
01141
01142 if (!buf) {
01143 if (lout > 0) {
01144 if (grep > 0) {
01145 if (TRACING(DBG)) {
01146 XPDFORM(emsg, "nothing found by 'grep' in %s, pattern: %s", filen, pattern);
01147 TRACEP(p, DBG, emsg);
01148 }
01149 response->Send();
01150 return 0;
01151 } else {
01152 XPDFORM(emsg, "could not read buffer from %s %s",
01153 (local) ? "local file " : "remote file ", file);
01154 TRACEP(p, XERR, emsg);
01155 response->Send(kXR_InvalidRequest, emsg.c_str());
01156 return 0;
01157 }
01158 } else {
01159
01160 if (TRACING(DBG)) {
01161 emsg = "nothing found in ";
01162 emsg += (grep > 0) ? filen : file;
01163 TRACEP(p, DBG, emsg);
01164 }
01165 }
01166 }
01167
01168
01169 response->Send(buf, lout);
01170
01171
01172 SafeFree(buf);
01173 SafeDelArray(file);
01174 SafeFree(filen);
01175 SafeDelArray(pattern);
01176
01177
01178 return 0;
01179 }
01180
01181
01182 int XrdProofdNetMgr::LocateLocalFile(XrdOucString &file)
01183 {
01184
01185
01186
01187
01188 XPDLOC(NMGR, "NetMgr::LocateLocalFile")
01189
01190
01191 if (file.length() <= 0 || file.find('*') == STR_NPOS) return 0;
01192
01193
01194 XrdOucString fn, dn;
01195 int isl = file.rfind('/');
01196 if (isl != STR_NPOS) {
01197 fn.assign(file, isl + 1, -1);
01198 dn.assign(file, 0, isl);
01199 } else {
01200 fn = file;
01201 dn = "./";
01202 }
01203
01204 XrdOucString emsg;
01205
01206 DIR *dirp = opendir(dn.c_str());
01207 if (!dirp) {
01208 XPDFORM(emsg, "cannot open '%s' - errno: %d", dn.c_str(), errno);
01209 TRACE(XERR, emsg.c_str());
01210 return -1;
01211 }
01212 struct dirent *ent = 0;
01213 XrdOucString sent;
01214 while ((ent = readdir(dirp))) {
01215 if (!strncmp(ent->d_name, ".", 1) || !strncmp(ent->d_name, "..", 2))
01216 continue;
01217
01218 sent = ent->d_name;
01219 if (sent.matches(fn.c_str()) > 0) break;
01220 sent = "";
01221 }
01222 closedir(dirp);
01223
01224
01225 if (sent.length() > 0) {
01226 XPDFORM(file, "%s%s", dn.c_str(), sent.c_str());
01227 return 0;
01228 }
01229
01230
01231 return -1;
01232 }
01233
01234
01235 char *XrdProofdNetMgr::ReadBufferLocal(const char *path, kXR_int64 ofs, int &len)
01236 {
01237
01238
01239
01240
01241
01242 XPDLOC(NMGR, "NetMgr::ReadBufferLocal")
01243
01244 XrdOucString emsg;
01245 TRACE(REQ, "file: " << path << ", ofs: " << ofs << ", len: " << len);
01246
01247
01248 if (!path || strlen(path) <= 0) {
01249 TRACE(XERR, "path undefined!");
01250 return (char *)0;
01251 }
01252
01253
01254 XrdOucString spath(path);
01255 if (LocateLocalFile(spath) != 0) {
01256 TRACE(XERR, "path cannot be resolved! (" << path << ")");
01257 return (char *)0;
01258 }
01259 const char *file = spath.c_str();
01260
01261
01262 int fd = open(file, O_RDONLY);
01263 if (fd < 0) {
01264 emsg = "could not open ";
01265 emsg += file;
01266 TRACE(XERR, emsg);
01267 return (char *)0;
01268 }
01269
01270
01271 struct stat st;
01272 if (fstat(fd, &st) != 0) {
01273 emsg = "could not get size of file with stat: errno: ";
01274 emsg += (int)errno;
01275 TRACE(XERR, emsg);
01276 close(fd);
01277 return (char *)0;
01278 }
01279 off_t ltot = st.st_size;
01280
01281
01282
01283 kXR_int64 start = ofs;
01284 off_t fst = (start < 0) ? ltot + start : start;
01285 fst = (fst < 0) ? 0 : ((fst >= ltot) ? ltot - 1 : fst);
01286
01287 kXR_int64 end = fst + len;
01288 off_t lst = (end >= ltot) ? ltot : ((end > fst) ? end : ltot);
01289 TRACE(DBG, "file size: " << ltot << ", read from: " << fst << " to " << lst);
01290
01291
01292 len = lst - fst;
01293
01294
01295 char *buf = (char *)malloc(len + 1);
01296 if (!buf) {
01297 emsg = "could not allocate enough memory on the heap: errno: ";
01298 emsg += (int)errno;
01299 XPDERR(emsg);
01300 close(fd);
01301 return (char *)0;
01302 }
01303
01304
01305 if (fst >= 0)
01306 lseek(fd, fst, SEEK_SET);
01307
01308 int left = len;
01309 int pos = 0;
01310 int nr = 0;
01311 do {
01312 while ((nr = read(fd, buf + pos, left)) < 0 && errno == EINTR)
01313 errno = 0;
01314 if (nr < 0) {
01315 TRACE(XERR, "error reading from file: errno: " << errno);
01316 break;
01317 }
01318
01319
01320 pos += nr;
01321 left -= nr;
01322
01323 } while (nr > 0 && left > 0);
01324
01325
01326 buf[len] = 0;
01327 TRACE(HDBG, "read " << nr << " bytes: " << buf);
01328
01329
01330 close(fd);
01331
01332
01333 return buf;
01334 }
01335
01336
01337 char *XrdProofdNetMgr::ReadBufferLocal(const char *path,
01338 const char *pat, int &len, int opt)
01339 {
01340
01341
01342
01343
01344
01345 XPDLOC(NMGR, "NetMgr::ReadBufferLocal")
01346
01347 XrdOucString emsg;
01348 TRACE(REQ, "file: " << path << ", pat: " << pat << ", len: " << len);
01349
01350
01351 if (!path || strlen(path) <= 0) {
01352 TRACE(XERR, "file path undefined!");
01353 return (char *)0;
01354 }
01355
01356
01357 XrdOucString spath(path);
01358 if (LocateLocalFile(spath) != 0) {
01359 TRACE(XERR, "path cannot be resolved! (" << path << ")");
01360 return (char *)0;
01361 }
01362 const char *file = spath.c_str();
01363
01364
01365 struct stat st;
01366 if (stat(file, &st) != 0) {
01367 emsg = "could not get size of file with stat: errno: ";
01368 emsg += (int)errno;
01369 TRACE(XERR, emsg);
01370 return (char *)0;
01371 }
01372 off_t ltot = st.st_size;
01373
01374
01375 char *cmd = 0;
01376 int lcmd = 0;
01377 if (pat && strlen(pat) > 0) {
01378 lcmd = strlen(pat) + strlen(file) + 20;
01379 cmd = new char[lcmd];
01380 if (opt == 2) {
01381 sprintf(cmd, "grep -v %s %s", pat, file);
01382 } else {
01383 sprintf(cmd, "grep %s %s", pat, file);
01384 }
01385 } else {
01386 lcmd = strlen(file) + 10;
01387 cmd = new char[lcmd];
01388 sprintf(cmd, "cat %s", file);
01389 }
01390 TRACE(DBG, "cmd: " << cmd);
01391
01392
01393 FILE *fp = popen(cmd, "r");
01394 if (!fp) {
01395 emsg = "could not run '";
01396 emsg += cmd;
01397 emsg += "'";
01398 TRACE(XERR, emsg);
01399 delete[] cmd;
01400 return (char *)0;
01401 }
01402 delete[] cmd;
01403
01404
01405 len = 0;
01406 char *buf = 0;
01407 char line[2048];
01408 int bufsiz = 0, left = 0, lines = 0;
01409 while ((ltot > 0) && fgets(line, sizeof(line), fp)) {
01410
01411 int llen = strlen(line);
01412 ltot -= llen;
01413 lines++;
01414
01415 if (!buf || (llen > left)) {
01416 int dsiz = 100 * ((int)((len + llen) / lines) + 1);
01417 dsiz = (dsiz > llen) ? dsiz : llen;
01418 bufsiz += dsiz;
01419 buf = (char *)realloc(buf, bufsiz + 1);
01420 left += dsiz;
01421 }
01422 if (!buf) {
01423 emsg = "could not allocate enough memory on the heap: errno: ";
01424 emsg += (int)errno;
01425 TRACE(XERR, emsg);
01426 pclose(fp);
01427 return (char *)0;
01428 }
01429
01430 memcpy(buf + len, line, llen);
01431 len += llen;
01432 left -= llen;
01433 if (TRACING(HDBG))
01434 fprintf(stderr, "line: %s", line);
01435 }
01436
01437
01438 if (buf) {
01439 if (len > 0) {
01440 buf[len] = 0;
01441 } else {
01442 free(buf);
01443 buf = 0;
01444 }
01445 }
01446
01447
01448 pclose(fp);
01449
01450
01451 return buf;
01452 }
01453
01454
01455 char *XrdProofdNetMgr::ReadBufferRemote(const char *url, const char *file,
01456 kXR_int64 ofs, int &len, int grep)
01457 {
01458
01459
01460
01461 XPDLOC(NMGR, "NetMgr::ReadBufferRemote")
01462
01463 TRACE(REQ, "url: " << (url ? url : "undef") <<
01464 ", file: " << (file ? file : "undef") << ", ofs: " << ofs <<
01465 ", len: " << len << ", grep: " << grep);
01466
01467
01468 if (!file || strlen(file) <= 0) {
01469 TRACE(XERR, "file undefined!");
01470 return (char *)0;
01471 }
01472 XrdClientUrlInfo u(url);
01473 if (!url || strlen(url) <= 0) {
01474
01475 u.TakeUrl(XrdOucString(file));
01476 if (u.User.length() <= 0) u.User = fMgr->EffectiveUser();
01477 }
01478
01479
01480 XrdProofConn *conn = GetProofConn(u.GetUrl().c_str());
01481
01482 char *buf = 0;
01483 if (conn && conn->IsValid()) {
01484
01485 XPClientRequest reqhdr;
01486 memset(&reqhdr, 0, sizeof(reqhdr));
01487 conn->SetSID(reqhdr.header.streamid);
01488 reqhdr.header.requestid = kXP_readbuf;
01489 reqhdr.readbuf.ofs = ofs;
01490 reqhdr.readbuf.len = len;
01491 reqhdr.readbuf.int1 = grep;
01492 reqhdr.header.dlen = strlen(file);
01493 const void *btmp = (const void *) file;
01494 char **vout = &buf;
01495
01496 XrdClientMessage *xrsp =
01497 conn->SendReq(&reqhdr, btmp, vout, "NetMgr::ReadBufferRemote");
01498
01499
01500 if (xrsp && buf && (xrsp->DataLen() > 0)) {
01501 len = xrsp->DataLen();
01502 } else {
01503 if (xrsp && !(xrsp->IsError()))
01504
01505 len = 0;
01506 SafeFree(buf);
01507 }
01508
01509
01510 SafeDelete(xrsp);
01511
01512
01513 SafeDelete(conn);
01514 }
01515
01516
01517 return buf;
01518 }
01519
01520
01521 char *XrdProofdNetMgr::ReadLogPaths(const char *url, const char *msg, int isess)
01522 {
01523
01524
01525 XPDLOC(NMGR, "NetMgr::ReadLogPaths")
01526
01527 TRACE(REQ, "url: " << (url ? url : "undef") <<
01528 ", msg: " << (msg ? msg : "undef") << ", isess: " << isess);
01529
01530
01531 if (!url || strlen(url) <= 0) {
01532 TRACE(XERR, "url undefined!");
01533 return (char *)0;
01534 }
01535
01536
01537 XrdProofConn *conn = GetProofConn(url);
01538
01539 char *buf = 0;
01540 if (conn && conn->IsValid()) {
01541
01542 XPClientRequest reqhdr;
01543 memset(&reqhdr, 0, sizeof(reqhdr));
01544 conn->SetSID(reqhdr.header.streamid);
01545 reqhdr.header.requestid = kXP_admin;
01546 reqhdr.proof.int1 = kQueryLogPaths;
01547 reqhdr.proof.int2 = isess;
01548 reqhdr.proof.sid = -1;
01549 reqhdr.header.dlen = strlen(msg);
01550 const void *btmp = (const void *) msg;
01551 char **vout = &buf;
01552
01553 XrdClientMessage *xrsp =
01554 conn->SendReq(&reqhdr, btmp, vout, "NetMgr::ReadLogPaths");
01555
01556
01557 if (xrsp && buf && (xrsp->DataLen() > 0)) {
01558 int len = xrsp->DataLen();
01559 buf = (char *) realloc((void *)buf, len + 1);
01560 if (buf)
01561 buf[len] = 0;
01562 } else {
01563 SafeFree(buf);
01564 }
01565
01566
01567 SafeDelete(xrsp);
01568
01569
01570 SafeDelete(conn);
01571 }
01572
01573
01574 return buf;
01575 }
01576
01577
01578 void XrdProofdNetMgr::CreateDefaultPROOFcfg()
01579 {
01580
01581
01582 XPDLOC(NMGR, "NetMgr::CreateDefaultPROOFcfg")
01583
01584 TRACE(DBG, "enter: local workers: " << fNumLocalWrks);
01585
01586
01587 XrdSysMutexHelper mhp(fMutex);
01588
01589
01590 fWorkers.clear();
01591
01592 if (fDfltWorkers.size() < 1) {
01593
01594 XrdOucString mm("master ", 128);
01595 mm += fMgr->Host();
01596 fDfltWorkers.push_back(new XrdProofWorker(mm.c_str()));
01597
01598
01599 int nwrk = fNumLocalWrks;
01600 if (nwrk > 0) {
01601 mm = "worker localhost port=";
01602 mm += fMgr->Port();
01603 while (nwrk--) {
01604 fDfltWorkers.push_back(new XrdProofWorker(mm.c_str()));
01605 TRACE(DBG, "added line: " << mm);
01606 }
01607 }
01608 }
01609
01610
01611 std::list<XrdProofWorker *>::iterator w = fDfltWorkers.begin();
01612 for (; w != fDfltWorkers.end(); w++) {
01613 fWorkers.push_back(*w);
01614 }
01615
01616 TRACE(DBG, "done: " << fWorkers.size() - 1 << " workers");
01617
01618
01619 FindUniqueNodes();
01620
01621
01622 return;
01623 }
01624
01625
01626 std::list<XrdProofWorker *> *XrdProofdNetMgr::GetActiveWorkers()
01627 {
01628
01629
01630 XPDLOC(NMGR, "NetMgr::GetActiveWorkers")
01631
01632 XrdSysMutexHelper mhp(fMutex);
01633
01634 if (fResourceType == kRTStatic && fPROOFcfg.fName.length() > 0) {
01635
01636 if (fReloadPROOFcfg && ReadPROOFcfg(1) != 0) {
01637 if (fDfltFallback) {
01638
01639 CreateDefaultPROOFcfg();
01640 TRACE(DBG, "parsing of " << fPROOFcfg.fName << " failed: use default settings");
01641 } else {
01642 TRACE(XERR, "unable to read the configuration file");
01643 return (std::list<XrdProofWorker *> *)0;
01644 }
01645 }
01646 }
01647 TRACE(DBG, "returning list with " << fWorkers.size() << " entries");
01648
01649 if (TRACING(HDBG)) Dump();
01650
01651 return &fWorkers;
01652 }
01653
01654
01655 void XrdProofdNetMgr::Dump()
01656 {
01657
01658 const char *xpdloc = "NetMgr::Dump";
01659
01660 XrdSysMutexHelper mhp(fMutex);
01661
01662 XPDPRT("+++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
01663 XPDPRT("+ Active workers status");
01664 XPDPRT("+ Size: " << fWorkers.size());
01665 XPDPRT("+ ");
01666
01667 std::list<XrdProofWorker *>::iterator iw;
01668 for (iw = fWorkers.begin(); iw != fWorkers.end(); iw++) {
01669 XPDPRT("+ wrk: " << (*iw)->fHost << ":" << (*iw)->fPort << " type:" << (*iw)->fType <<
01670 " active sessions:" << (*iw)->Active());
01671 }
01672 XPDPRT("+ ");
01673 XPDPRT("+++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
01674 }
01675
01676
01677 std::list<XrdProofWorker *> *XrdProofdNetMgr::GetNodes()
01678 {
01679
01680
01681 XPDLOC(NMGR, "NetMgr::GetNodes")
01682
01683 XrdSysMutexHelper mhp(fMutex);
01684
01685 if (fResourceType == kRTStatic && fPROOFcfg.fName.length() > 0) {
01686
01687 if (fReloadPROOFcfg && ReadPROOFcfg(1) != 0) {
01688 if (fDfltFallback) {
01689
01690 CreateDefaultPROOFcfg();
01691 TRACE(DBG, "parsing of " << fPROOFcfg.fName << " failed: use default settings");
01692 } else {
01693 TRACE(XERR, "unable to read the configuration file");
01694 return (std::list<XrdProofWorker *> *)0;
01695 }
01696 }
01697 }
01698 TRACE(DBG, "returning list with " << fNodes.size() << " entries");
01699
01700 return &fNodes;
01701 }
01702
01703
01704 int XrdProofdNetMgr::ReadPROOFcfg(bool reset)
01705 {
01706
01707
01708
01709 XPDLOC(NMGR, "NetMgr::ReadPROOFcfg")
01710
01711 TRACE(REQ, "saved time of last modification: " << fPROOFcfg.fMtime);
01712
01713
01714 XrdSysMutexHelper mhp(fMutex);
01715
01716
01717 if (fPROOFcfg.fName.length() <= 0)
01718 return -1;
01719
01720
01721 struct stat st;
01722 if (stat(fPROOFcfg.fName.c_str(), &st) != 0) {
01723
01724
01725 if (errno == ENOENT) fPROOFcfg.fMtime = -1;
01726 if (!fDfltFallback) {
01727 TRACE(XERR, "unable to stat file: " << fPROOFcfg.fName << " - errno: " << errno);
01728 } else {
01729 TRACE(ALL, "file " << fPROOFcfg.fName << " cannot be parsed: use default configuration");
01730 }
01731 return -1;
01732 }
01733 TRACE(DBG, "time of last modification: " << st.st_mtime);
01734
01735
01736 if (st.st_mtime <= fPROOFcfg.fMtime)
01737 return 0;
01738
01739
01740 fPROOFcfg.fMtime = st.st_mtime;
01741
01742
01743 FILE *fin = 0;
01744 if (!(fin = fopen(fPROOFcfg.fName.c_str(), "r"))) {
01745 if (fWorkers.size() > 1) {
01746 TRACE(XERR, "unable to fopen file: " << fPROOFcfg.fName << " - errno: " << errno);
01747 TRACE(XERR, "continuing with existing list of workers.");
01748 return 0;
01749 } else {
01750 return -1;
01751 }
01752 }
01753
01754 if (reset) {
01755
01756 fWorkers.clear();
01757 }
01758
01759
01760 if (fRegWorkers.size() < 1) {
01761 XrdOucString mm("master ", 128);
01762 mm += fMgr->Host();
01763 fRegWorkers.push_back(new XrdProofWorker(mm.c_str()));
01764 } else {
01765
01766 std::list<XrdProofWorker *>::iterator w = fRegWorkers.begin();
01767
01768 w++;
01769 for (; w != fRegWorkers.end(); w++) {
01770 (*w)->fActive = 0;
01771 }
01772 }
01773
01774
01775 int nw = 0;
01776 char lin[2048];
01777 while (fgets(lin, sizeof(lin), fin)) {
01778
01779 int p = 0;
01780 while (lin[p++] == ' ') {
01781 ;
01782 }
01783 p--;
01784 if (lin[p] == '\0' || lin[p] == '\n')
01785 continue;
01786
01787
01788 if (lin[0] == '#')
01789 continue;
01790
01791
01792 if (lin[strlen(lin)-1] == '\n')
01793 lin[strlen(lin)-1] = '\0';
01794
01795 TRACE(DBG, "found line: " << lin);
01796
01797
01798 XrdProofWorker *pw = new XrdProofWorker(lin);
01799
01800 const char *pfx[2] = { "master", "node" };
01801 if (!strncmp(lin, pfx[0], strlen(pfx[0])) ||
01802 !strncmp(lin, pfx[1], strlen(pfx[1]))) {
01803
01804 if (pw->fHost.beginswith("localhost") ||
01805 pw->Matches(fMgr->Host())) {
01806
01807 XrdProofWorker *fw = fRegWorkers.front();
01808 fw->Reset(lin);
01809 }
01810
01811 SafeDelete(pw);
01812 } else {
01813
01814 std::list<XrdProofWorker *>::iterator w = fRegWorkers.begin();
01815
01816 w++;
01817 bool haveit = 0;
01818 while (w != fRegWorkers.end()) {
01819 if (!((*w)->fActive)) {
01820 if ((*w)->fHost == pw->fHost && (*w)->fPort == pw->fPort) {
01821 (*w)->fActive = 1;
01822 haveit = 1;
01823 break;
01824 }
01825 }
01826
01827 w++;
01828 }
01829
01830 if (!haveit) {
01831
01832 fRegWorkers.push_back(pw);
01833 } else {
01834
01835 SafeDelete(pw);
01836 }
01837 }
01838 }
01839
01840
01841 std::list<XrdProofWorker *>::iterator w = fRegWorkers.begin();
01842 while (w != fRegWorkers.end()) {
01843 if ((*w)->fActive) {
01844 fWorkers.push_back(*w);
01845 nw++;
01846 }
01847 w++;
01848 }
01849
01850
01851 fclose(fin);
01852
01853
01854 if (reset)
01855 FindUniqueNodes();
01856
01857
01858 return ((nw == 0) ? -1 : 0);
01859 }
01860
01861
01862 int XrdProofdNetMgr::FindUniqueNodes()
01863 {
01864
01865
01866
01867
01868 XPDLOC(NMGR, "NetMgr::FindUniqueNodes")
01869
01870 TRACE(REQ, "# workers: " << fWorkers.size());
01871
01872
01873 fNodes.clear();
01874
01875
01876 if (fWorkers.size() > 1) {
01877 std::list<XrdProofWorker *>::iterator w = fWorkers.begin();
01878 w++;
01879 for (; w != fWorkers.end(); w++) if ((*w)->fActive) {
01880 bool add = 1;
01881 std::list<XrdProofWorker *>::iterator n;
01882 for (n = fNodes.begin() ; n != fNodes.end(); n++) {
01883 if ((*n)->Matches(*w)) {
01884 add = 0;
01885 break;
01886 }
01887 }
01888 if (add)
01889 fNodes.push_back(*w);
01890 }
01891 }
01892 TRACE(REQ, "found " << fNodes.size() << " unique nodes");
01893
01894
01895 return fNodes.size();
01896 }