
Go to the documentation of this file.
00001 // @(#)root/proofd:$Id: XrdProofdNetMgr.cxx 37960 2011-02-03 09:04:41Z ganis $
00002 // Author: G. Ganis  Jan 2008
00004 /*************************************************************************
00005  * Copyright (C) 1995-2005, Rene Brun and Fons Rademakers.               *
00006  * All rights reserved.                                                  *
00007  *                                                                       *
00008  * For the licensing terms see $ROOTSYS/LICENSE.                         *
00009  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
00010  *************************************************************************/
00011 #include "XrdProofdPlatform.h"
00013 //////////////////////////////////////////////////////////////////////////
00014 //                                                                      //
00015 // XrdProofdNetMgr                                                      //
00016 //                                                                      //
00017 // Authors: G. Ganis, CERN, 2008                                        //
00018 //                                                                      //
00019 // Manages connections between PROOF server daemons                     //
00020 //                                                                      //
00021 //////////////////////////////////////////////////////////////////////////
00023 #include "XrdProofdNetMgr.h"
00025 #include "Xrd/XrdBuffer.hh"
00026 #include "XrdClient/XrdClientConst.hh"
00027 #include "XrdClient/XrdClientEnv.hh"
00028 #include "XrdClient/XrdClientMessage.hh"
00029 #include "XrdClient/XrdClientUrlInfo.hh"
00030 #include "XrdNet/XrdNetDNS.hh"
00031 #include "XrdOuc/XrdOucStream.hh"
00032 #include "XrdSys/XrdSysPlatform.hh"
00034 #include "XrdProofdClient.h"
00035 #include "XrdProofdManager.h"
00036 #include "XrdProofdProtocol.h"
00037 #include "XrdProofdResponse.h"
00038 #include "XrdProofWorker.h"
00040 // Tracing utilities
00041 #include "XrdProofdTrace.h"
00043 #include <algorithm>
00044 #include <limits>
00045 #include <math.h>
00047 //______________________________________________________________________________
00048 int MessageSender(const char *msg, int len, void *arg)
00049 {
00050    // Send up a message from the server
00052    XrdProofdResponse *r = (XrdProofdResponse *) arg;
00053    if (r) {
00054       return r->Send(kXR_attn, kXPD_srvmsg, 2, (char *) msg, len);
00055    }
00056    return -1;
00057 }
00059 //______________________________________________________________________________
00060 XrdProofdNetMgr::XrdProofdNetMgr(XrdProofdManager *mgr,
00061                                  XrdProtocol_Config *pi, XrdSysError *e)
00062    : XrdProofdConfig(pi->ConfigFN, e)
00063 {
00064    // Constructor
00065    fMgr = mgr;
00066    fResourceType = kRTNone;
00067    fPROOFcfg.fName = "";
00068    fPROOFcfg.fMtime = -1;
00069    fReloadPROOFcfg = 1;
00070    fDfltFallback = 0;
00071    fDfltWorkers.clear();
00072    fRegWorkers.clear();
00073    fWorkers.clear();
00074    fNodes.clear();
00075    fNumLocalWrks = XrdProofdAux::GetNumCPUs();
00076    fWorkerUsrCfg = 0;
00077    fRequestTO = 30;
00078    fBonjourEnabled = false;
00079 #if defined(BUILD_BONJOUR)
00080    char *host = XrdNetDNS::getHostName();
00081    fBonjourName = host ? host : "";
00082    SafeFree(host);
00083    fBonjourCores = XrdProofdAux::GetNumCPUs();
00084    fBonjourRequestedSrvType = kBonjourSrvDisabled;
00085 #endif
00087    // Configuration directives
00088    RegisterDirectives();
00089 }
00091 //__________________________________________________________________________
00092 void XrdProofdNetMgr::RegisterDirectives()
00093 {
00094    // Register config directives
00096    Register("adminreqto", new XrdProofdDirective("adminreqto", this, &DoDirectiveClass));
00097    Register("resource", new XrdProofdDirective("resource", this, &DoDirectiveClass));
00098    Register("worker", new XrdProofdDirective("worker", this, &DoDirectiveClass));
00099    Register("bonjour", new XrdProofdDirective("bonjour", this, &DoDirectiveClass));
00100    Register("localwrks", new XrdProofdDirective("localwrks", (void *)&fNumLocalWrks, &DoDirectiveInt));
00101 }
00103 //__________________________________________________________________________
00104 XrdProofdNetMgr::~XrdProofdNetMgr()
00105 {
00106    // Destructor
00108    // Cleanup the worker lists
00109    // (the nodes list points to the same object, no cleanup is needed)
00110    std::list<XrdProofWorker *>::iterator w = fRegWorkers.begin();
00111    while (w != fRegWorkers.end()) {
00112       delete *w;
00113       w = fRegWorkers.erase(w);
00114    }
00115    w = fDfltWorkers.begin();
00116    while (w != fDfltWorkers.end()) {
00117       delete *w;
00118       w = fDfltWorkers.erase(w);
00119    }
00120    fWorkers.clear();
00121 }
00123 //__________________________________________________________________________
00124 int XrdProofdNetMgr::Config(bool rcf)
00125 {
00126    // Run configuration and parse the entered config directives.
00127    // Return 0 on success, -1 on error
00128    XPDLOC(NMGR, "NetMgr::Config")
00130    // Lock the method to protect the lists.
00131    XrdSysMutexHelper mhp(fMutex);
00133    // Cleanup the worker list
00134    std::list<XrdProofWorker *>::iterator w = fWorkers.begin();
00135    while (w != fWorkers.end()) {
00136       delete *w;
00137       w = fWorkers.erase(w);
00138    }
00139    // Create a default master line
00140    XrdOucString mm("master ", 128);
00141    mm += fMgr->Host();
00142    mm += " port=";
00143    mm += fMgr->Port();
00144    fWorkers.push_back(new XrdProofWorker(mm.c_str()));
00146    // Run first the configurator
00147    if (XrdProofdConfig::Config(rcf) != 0) {
00148       XPDERR("problems parsing file ");
00149       return -1;
00150    }
00152    XrdOucString msg;
00153    msg = (rcf) ? "re-configuring" : "configuring";
00154    TRACE(ALL, msg);
00156    if (fMgr->SrvType() != kXPD_Worker || fMgr->SrvType() == kXPD_AnyServer) {
00157       TRACE(ALL, "PROOF config file: " <<
00158             ((fPROOFcfg.fName.length() > 0) ? fPROOFcfg.fName.c_str() : "none"));
00159       if (fResourceType == kRTStatic) {
00160          // Initialize the list of workers if a static config has been required
00161          // Default file path, if none specified
00162          bool dodefault = 1;
00163          if (fPROOFcfg.fName.length() > 0) {
00164             // Load file content in memory
00165             if (ReadPROOFcfg() == 0) {
00166                TRACE(ALL, "PROOF config file will " <<
00167                      ((fReloadPROOFcfg) ? "" : "not ") << "be reloaded upon change");
00168                dodefault = 0;
00169             } else {
00170                if (!fDfltFallback) {
00171                   XPDERR("unable to find valid information in PROOF config file " <<
00172                          fPROOFcfg.fName);
00173                   fPROOFcfg.fMtime = -1;
00174                   return 0;
00175                } else {
00176                   TRACE(ALL, "file " << fPROOFcfg.fName << " cannot be parsed: use default configuration to start with");
00177                }
00178             }
00179          }
00180          if (dodefault) {
00181             // Use default
00182             CreateDefaultPROOFcfg();
00183          }
00184       } else if (fResourceType == kRTNone && fWorkers.size() <= 1 && !fBonjourEnabled) {
00185          // Nothing defined: use default
00186          CreateDefaultPROOFcfg();
00187       }
00189       // Find unique nodes
00190       FindUniqueNodes();
00191    }
00193    // For connection to the other xproofds we try only once
00194    XrdProofConn::SetRetryParam(1, 1);
00195    // Request Timeout
00196    EnvPutInt(NAME_REQUESTTIMEOUT, fRequestTO);
00198    // Notification
00199    XPDFORM(msg, "%d worker nodes defined at start-up", fWorkers.size() - 1);
00200    TRACE(ALL, msg);
00202    // Done
00203    return 0;
00204 }
00206 //______________________________________________________________________________
00207 int XrdProofdNetMgr::DoDirective(XrdProofdDirective *d,
00208                                  char *val, XrdOucStream *cfg, bool rcf)
00209 {
00210    // Update the priorities of the active sessions.
00211    XPDLOC(NMGR, "NetMgr::DoDirective")
00213    if (!d)
00214       // undefined inputs
00215       return -1;
00217    if (d->fName == "resource") {
00218       return DoDirectiveResource(val, cfg, rcf);
00219    } else if (d->fName == "adminreqto") {
00220       return DoDirectiveAdminReqTO(val, cfg, rcf);
00221    } else if (d->fName == "worker") {
00222       return DoDirectiveWorker(val, cfg, rcf);
00223    } else if (d->fName == "bonjour") {
00224       return DoDirectiveBonjour(val, cfg, rcf);
00225    }
00227    TRACE(XERR, "unknown directive: " << d->fName);
00229    return -1;
00230 }
00232 //______________________________________________________________________________
00233 int XrdProofdNetMgr::DoDirectiveBonjour(char *val, XrdOucStream *cfg, bool)
00234 {
00235    XPDLOC(NMGR, "NetMgr::DoDirectiveBonjour");
00237    // Process 'bonjour' directive
00238    TRACE(DBG, "processing Bonjour directive");
00240    if (!val || !cfg)
00241       // undefined inputs
00242       return -1;
00244 #if defined(BUILD_BONJOUR)
00245    const char * cp = NULL;
00247    // The first directive must be the 'bonjour role'.
00248    if (!strcmp("register", val) || !strcmp("publish", val)) {
00249       // register and publish are synonyms.
00250       fBonjourRequestedSrvType = kBonjourSrvRegister;
00251    } else if (!strcmp("discover", val) || !strcmp("browse", val)) {
00252       fBonjourRequestedSrvType = kBonjourSrvBrowse;
00253    } else if (!strcmp("both", val) || !strcmp("all", val)) {
00254       fBonjourRequestedSrvType = kBonjourSrvBoth;
00255    } else {
00256       TRACE(XERR, "Bonjour directive unknown");
00257       return -1;
00258    }
00260    // Continue reading words until the end of the logical line. This a descending
00261    // recursive parser (LR). Doing this in that way allows users to use a custom
00262    // order of directives and improves xrootd's if/else/fi compatibility.
00263    while ((val = cfg->GetWord()) != NULL) {
00264       // Construct an XrdString for a more confortable analysis.
00265       XrdOucString s(val);
00266       // If we have line, parse the directive according to the personal rules of
00267       // each one. Note that this method allows. It would be more elegant with
00268       // switch statment, but we need to check only the beginning of the words.
00269       if (s.beginswith("servicetype=")) {
00270          cp = index(val, '=');
00271          cp++;
00272          fBonjourServiceType.assign(cp, 0);
00273          TRACE(DBG, "custom service type is " << cp);
00274       } else if (s.beginswith("name=")) {
00275          cp = index(val, '=');
00276          cp++;
00277          fBonjourName.assign(cp, 0);
00278          TRACE(DBG, "custom Bonjour name is " << cp);
00279       } else if (s.beginswith("domain=")) {
00280          cp = index(val, '=');
00281          cp++;
00282          fBonjourDomain.assign(cp, 0);
00283          TRACE(DBG, "custom Bonjour domain is " << cp);
00284       } else if (s.beginswith("cores=")) {
00285          cp = index(val, '=');
00286          cp++;
00287          fBonjourCores = strtol(cp, NULL, 10); // atoi() is not thread-safe.
00288          if (fBonjourCores <= 0) {
00289             TRACE(XERR, "number of cores not valid: " << fBonjourCores);
00290             TRACE(XERR, "Bonjour module not loaded!");
00291             return -1;
00292          }
00293          TRACE(DBG, "custom number of cores is " << cp);
00294       } else {
00295          TRACE(XERR, "Bonjour directive unknown");
00296          cfg->RetToken();
00297          return -1;
00298       }
00299    }
00300    TRACE(DBG, "custom Bonjour name is '" << fBonjourName <<"'");
00302    // Check the compatibility of the roles and give a warning to the user.
00303    if (!XrdProofdNetMgr::CheckBonjourRoleCoherence(fMgr->SrvType(), GetBonjourRequestedServiceType())) {
00304       TRACE(XERR, "Warning: xpd.role directive and xpd.bonjour service selection are not compatible");
00305    }
00307    // Register the service on bonjour.
00308    return LoadBonjourModule(fBonjourRequestedSrvType);
00310 #else
00312    TRACE(XERR, "Bonjour support is disabled");
00313    return -1;
00315 #endif
00316 }
00318 //______________________________________________________________________________
00319 void XrdProofdNetMgr::BalanceNodesOrder()
00320 {
00321    // Indices (this will be used twice).
00322    list<XrdProofWorker *>::const_iterator iter, iter2;
00323    list<XrdProofWorker *>::iterator iter3; // Not const, less efficient.
00324    // Map to store information of the balancer.
00325    map<XrdProofWorker *, BalancerInfo> info;
00326    // Node with minimum number of workers distinct to 1.
00327    unsigned int min = UINT_MAX;
00328    // Total number of nodes and per iteration assignments.
00329    unsigned int total = 0, total_perit = 0;
00330    // Number of iterations to get every node filled.
00331    unsigned int total_added = 0;
00332    // Temporary list to store the balanced configuration
00333    list<XrdProofWorker *> tempNodes;
00334    // Flag for the search and destroy loop.
00335    bool deleted;
00337    // Fill the information store with the first data (number of nodes).
00338    for (iter = fNodes.begin(); iter != fNodes.end(); iter++) {
00339       // The next code is not the same as this:
00340       //info[*iter].available = count(fWorkers.begin(), fWorkers.end(), *iter);
00341       // The previous piece of STL code only checks the pointer of the value
00342       // stored on the list, altough it is more efficient, it needs that repeated
00343       // nodes point to the same object. To allow hybrid configurations, we are
00344       // doing a 'manually' matching since statically configured nodes are
00345       // created in multiple ways.
00346       info[*iter].available = 0;
00347       for (iter2 = fWorkers.begin(); iter2 != fWorkers.end(); iter2++) {
00348          if ((*iter)->Matches(*iter2)) {
00349             info[*iter].available++;
00350          }
00351       }
00352       info[*iter].added = 0;
00353       // Calculate the minimum greater than 1.
00354       if (info[*iter].available > 1 && info[*iter].available < min)
00355          min = info[*iter].available;
00356       // Calculate the totals.
00357       total += info[*iter].available;
00358    }
00360    // Now, calculate the number of workers to add in each iteration of the
00361    // round robin, scaling to the smaller number.
00362    for (iter = fNodes.begin(); iter != fNodes.end(); iter++) {
00363       if (info[*iter].available > 1) {
00364          info[*iter].per_iteration = (unsigned int)floor((double)info[*iter].available / (double)min);
00365       } else {
00366          info[*iter].per_iteration = 1;
00367       }
00368       // Calculate the totals.
00369       total_perit += info[*iter].per_iteration;
00370    }
00372    // Since we are going to substitute the list, don't forget to recover the
00373    // default node at the fist time.
00374    tempNodes.push_back(fWorkers.front());
00376    // Finally, do the round robin assignment of nodes.
00377    // Stop when every node has its workers processed.
00378    while (total_added < total) {
00379       for (map<XrdProofWorker *, BalancerInfo>::iterator i = info.begin(); i != info.end(); i++) {
00380          if (i->second.added < i->second.available) {
00381             // Be careful with the remainders (on prime number of nodes).
00382             unsigned int to_add = xrdmin(i->second.per_iteration,
00383                                         (i->second.available - i->second.added));
00384             // Then add the nodes.
00385             for (unsigned int j = 0; j < to_add; j++) {
00386                tempNodes.push_back(i->first);
00387             }
00388             i->second.added += to_add;
00389             total_added += to_add;
00390          }
00391       }
00392    }
00394    // Since we are mergin nodes in only one object, we must merge the current
00395    // sessions of the static nodes (that can be distinct objects that represent
00396    // the same node) and delete the orphaned objects. If, in the future, we can
00397    // assure that every worker has only one object in the list, this is not more
00398    // necessary. The things needed to change are the DoDirectiveWorker, it must
00399    // search for a node before inserting it, and in the repeat directive insert
00400    // the same node always. Also the default configuration methods (there are
00401    // two in this class) must be updated.
00402    iter3 = ++(fWorkers.begin());
00403    while (iter3 != fWorkers.end()) {
00404       deleted = false;
00405       // If the worker is not in the fWorkers list, we must process it. Note that
00406       // std::count() uses a plain comparison between values, in this case, we
00407       // are comparing pointers (numbers, at the end).
00408       if (count(++(tempNodes.begin()), tempNodes.end(), *iter3) == 0) {
00409          // Search for an object that matches with this in the temp list.
00410          for (iter2 = ++(tempNodes.begin()); iter2 != tempNodes.end(); ++iter2) {
00411             if ((*iter2)->Matches(*iter3)) {
00412                // Copy data and delete the *iter object.
00413                (*iter2)->MergeProofServs(*(*iter3));
00414                deleted = true;
00415                delete *iter3;
00416                fWorkers.erase(iter3++);
00417                break;
00418             }
00419          }
00420       }
00421       // Do not forget to increase the iterator.
00422       if (!deleted)
00423          iter3++;
00424    }
00426    // Then, substitute the current fWorkers list with the balanced one.
00427    fWorkers = tempNodes;
00428 }
00430 //______________________________________________________________________________
#if defined(BUILD_BONJOUR)
static int XpdBonjourTXTToInt(const char *value, int length, int dflt)
{
   // Convert a Bonjour TXT record value of 'length' bytes to an integer.
   // The value is not assumed to be null-terminated. Returns 'dflt' when
   // there is nothing to parse.
   if (!value || length <= 0)
      return dflt;
   // strtol needs a null-terminated string: parse a terminated copy so that
   // it never reads past the value.
   char *buf = new char[length + 1];
   memcpy(buf, value, length);
   buf[length] = 0;
   int result = (int) strtol(buf, NULL, 10);
   delete [] buf;
   return result;
}

//______________________________________________________________________________
void * XrdProofdNetMgr::ProcessBonjourUpdate(void * context)
{
   // Callback for Bonjour discovery updates. 'context' is the XrdProofdNetMgr
   // instance registered in LoadBonjourModule.
   // Each discovered node of the right service type is registered in
   // fRegWorkers (once) and added to fWorkers once per advertised core.
   // Nodes that are no longer advertised are removed from fWorkers.
   // Finally the unique nodes are recomputed and the order is balanced.
   // Always returns NULL.
   XrdProofdNetMgr * mgr;
   std::list<XrdOucBonjourNode *> nodes;
   std::list<XrdOucBonjourNode *>::const_iterator idx;
   std::list<XrdProofWorker *>::iterator w;
   const XrdOucBonjourNode * i;
   XrdProofWorker * worker;
   bool haveit;
   int cores = -1;
   int recordLength = 0;

   XPDLOC(NMGR, "NetMgr::ProcessBonjourUpdate");
   TRACE(DBG, "Updating the network topology");

   mgr = static_cast<XrdProofdNetMgr *>(context);

   // Lock the method to protect the lists.
   XrdSysMutexHelper mhp(mgr->fMutex);

   // If there are no workers registered on the fRegWorkers list, this is the
   // first time we run this updater, so we must create the default node. If not
   // we can mark all the nodes as inactive and then look for the Bonjour updates.
   if (mgr->fRegWorkers.size() < 1) {
      XrdOucString mm("master ", 128);
      mm += mgr->fMgr->Host();
      mm += " port=";
      mm += mgr->fMgr->Port();
      mgr->fRegWorkers.push_back(new XrdProofWorker(mm.c_str()));
   } else {
      // Deactivate all current active workers, skipping the master line
      w = mgr->fRegWorkers.begin();
      w++;
      for (; w != mgr->fRegWorkers.end(); w++) {
         (*w)->fActive = false;
      }
   }

   // Update the list with the new nodes.
   // Get the list, and get it locked.
   mgr->fBonjourManager->LockNodeList();
   nodes = mgr->fBonjourManager->GetCurrentNodeList();

   for (idx = nodes.begin(); idx != nodes.end(); idx++) {
      // Get the current node.
      i = *idx;
      (*i).Print();
      // Must be not empty
      if (!i->GetHostName() || strlen(i->GetHostName()) == 0) {
         TRACE(ALL,"bonjour list node: empty!");
         continue;
      }
      TRACE(DBG, "parsing info for node: " << i->GetHostName() << ", port: " << i->GetPort());
      // Filter by service type getting rid of the trailing '.'.
      if (i->GetBonjourRecord().MatchesRegisteredType(mgr->GetBonjourServiceType())) {
         // Check if we have already it
         w = mgr->fRegWorkers.begin();
         w++;
         haveit = 0;
         while (w != mgr->fRegWorkers.end()) {
            TRACE(HDBG,"registerd node: "<< (*w)->fHost <<", port: "<<(*w)->fPort);
            if ((*w)->fHost == i->GetHostName() && (*w)->fPort == i->GetPort()) {
               (*w)->fActive = true;
               haveit = 1;

               // Check if the node is on the fWorkers list.
               if (std::find(mgr->fWorkers.begin(), mgr->fWorkers.end(), *w) == mgr->fWorkers.end()) {
                  // The node is returning from being not available, maybe with
                  // a different number of cores, so re-check it (1 if the node
                  // does not advertise it).
                  const char *pcores = i->GetBonjourRecord().GetTXTValue("cores", recordLength);
                  cores = (pcores) ? XpdBonjourTXTToInt(pcores, recordLength, 1) : 1;
                  TRACE(HDBG, " adding "<<cores<<" for worker '"<<(*w)->fHost<<"'");
                  for (int c = 0; c < cores; c++) {
                     // Once per core in fWorkers
                     mgr->fWorkers.push_back(*w);
                  }
               } else {
                  TRACE(DBG, " worker(s) '"<<(*w)->fHost<<"' already in the list");
               }

               break;
            }
            // Go to next
            w++;
         }
         // If we do not have it, build a new worker object
         if (!haveit) {
            // The number of cores is mandatory for new nodes. Check it before
            // creating the worker object, so that nothing leaks on 'continue'.
            const char *pbr = i->GetBonjourRecord().GetTXTValue("cores", recordLength);
            if (!pbr || recordLength <= 0) {
               TRACE(ALL, "no information about the cores available ... skip");
               continue;
            }
            cores = XpdBonjourTXTToInt(pbr, recordLength, 0);

            // Create the new node.
            worker = new XrdProofWorker();
            worker->fHost = i->GetHostName();
            worker->fPort = i->GetPort();
            worker->fActive = true;
            if (i->GetBonjourRecord().GetTXTValue("nodetype", recordLength) != NULL) {
               worker->fType = i->GetBonjourRecord().GetTXTValue("nodetype", recordLength)[0];
            }
            // Register the node once, and add it to fWorkers once per core.
            mgr->fRegWorkers.push_back(worker);
            for (int c = 0; c < cores; c++) {
               mgr->fWorkers.push_back(worker);
            }
         }
      }
   }

   // Remove the lock on the Bonjour list.
   mgr->fBonjourManager->UnLockNodeList();

   // Remove nodes not active from fWorkers list.
   w = mgr->fRegWorkers.begin();
   w++;
   while (w != mgr->fRegWorkers.end()) {
      if (!((*w)->fActive)) {
         mgr->fWorkers.remove(*w);
      }
      w++;
   }

   // Process list.
   mgr->FindUniqueNodes();

   // Balance order.
   mgr->BalanceNodesOrder();

   return NULL;
}
#endif
00580 //______________________________________________________________________________
#if defined(BUILD_BONJOUR)
int XrdProofdNetMgr::LoadBonjourModule(int srvtype)
{
   XPDLOC(NMGR, "NetMgr::LoadBonjourModule");

   // Set up Bonjour for the requested service type 'srvtype':
   //  - register / both: publish this daemon, with its node type and number
   //    of cores in the TXT record;
   //  - browse / both: subscribe to discovery updates, handled by
   //    ProcessBonjourUpdate, and flag Bonjour as enabled.
   // Returns 0 on success, -1 if the service could not be published.

   // Keep a pointer to the platform Bonjour manager to avoid repeated lookups.
   fBonjourManager = &(XrdOucBonjourFactory::FactoryByPlatform()->GetBonjourManager());

   const bool publish = (srvtype == kBonjourSrvRegister || srvtype == kBonjourSrvBoth);
   const bool browse = (srvtype == kBonjourSrvBrowse || srvtype == kBonjourSrvBoth);

   if (publish) {
      // Default service name or a user customized one
      XrdOucBonjourRecord record(GetBonjourName(), GetBonjourServiceType(), GetBonjourDomain());

      // Node type: 'S' for (top) masters, 'W' for workers. A server that can
      // take any role is published as a worker.
      if (XrdProofdProtocol::Mgr()) {
         int role = XrdProofdProtocol::Mgr()->SrvType();
         if (role == kXPD_TopMaster || role == kXPD_Master) {
            record.AddTXTRecord("nodetype", "S");
         } else if (role == kXPD_AnyServer || role == kXPD_Worker) {
            record.AddTXTRecord("nodetype", "W");
         } else {
            TRACE(XERR, "TXT node type is not known '" << role << "'");
         }
      }

      // Number of workers (cores) offered
      record.AddTXTRecord("cores", fBonjourCores);

      if (fBonjourManager->RegisterService(record, fMgr->Port()) != 0) {
         TRACE(XERR, "Bonjour service could not be published");
         return -1;
      }
      TRACE(ALL, "Bonjour service was published OK");
   }

   if (browse) {
      // Get notified of the nodes appearing / disappearing
      fBonjourEnabled = true;
      fBonjourManager->SubscribeForUpdates(GetBonjourServiceType(), ProcessBonjourUpdate, this);
   }

   return 0;
}
#endif
00632 //______________________________________________________________________________
#if defined(BUILD_BONJOUR)
bool XrdProofdNetMgr::CheckBonjourRoleCoherence(int xrdRole, int bonjourSrvType)
{
   // Tell whether the xpd role 'xrdRole' (valid range [-1, 2]) is compatible
   // with the Bonjour service type 'bonjourSrvType'.
   // Out-of-range arguments are reported as incompatible. A disabled Bonjour
   // service is always compatible.
   //
   // Rows are indexed by role + 1; columns by service type. NOTE(review): the
   // column order assumes Browse=0, Register=1, Both=2 - confirm against the
   // kBonjourSrv* definitions.
   // Bonjour services:              Discover-Publish-Both   ALLOWED COMBINATIONS
   static const bool allowed[4][3] = {{true,  true,  true }, // -1: AnyServer
                                      {false, true,  false}, //  0: Worker
                                      {true,  true,  true }, //  1: Submaster
                                      {true,  false, false}};//  2: Master & Supermaster

   if (xrdRole < -1 || xrdRole > 2 || bonjourSrvType < -1 || bonjourSrvType > 2)
      return false;

   if (bonjourSrvType == kBonjourSrvDisabled)
      return true; // Avoids warnings when Bonjour is not enabled.

   // Only [0, 2] are valid column indices: do not rely on kBonjourSrvDisabled
   // being the only negative value that can reach this point.
   if (bonjourSrvType < 0)
      return false;

   // Add 1 to role since the constants defined in XProofProtocol.h are between
   // -1 and 2.
   return allowed[xrdRole + 1][bonjourSrvType];
}
#endif
00654 //______________________________________________________________________________
00655 int XrdProofdNetMgr::DoDirectiveAdminReqTO(char *val, XrdOucStream *cfg, bool)
00656 {
00657    // Process 'adminreqto' directive
00659    if (!val)
00660       // undefined inputs
00661       return -1;
00663    // Check deprecated 'if' directive
00664    if (fMgr->Host() && cfg)
00665       if (XrdProofdAux::CheckIf(cfg, fMgr->Host()) == 0)
00666          return 0;
00668    // Timeout on requested broadcasted to workers; there are 4 attempts,
00669    // so the real timeout is 4 x fRequestTO
00670    int to = strtol(val, 0, 10);
00671    fRequestTO = (to > 0) ? to : fRequestTO;
00672    return 0;
00673 }
00675 //______________________________________________________________________________
00676 int XrdProofdNetMgr::DoDirectiveResource(char *val, XrdOucStream *cfg, bool)
00677 {
00678    // Process 'resource' directive
00679    XPDLOC(NMGR, "NetMgr::DoDirectiveResource")
00681    if (!val || !cfg)
00682       // undefined inputs
00683       return -1;
00685    if (!strcmp("static", val)) {
00686       // We just take the path of the config file here; the
00687       // rest is used by the static scheduler
00688       fResourceType = kRTStatic;
00689       while ((val = cfg->GetWord()) && val[0]) {
00690          XrdOucString s(val);
00691          if (s.beginswith("ucfg:")) {
00692             fWorkerUsrCfg = s.endswith("yes") ? 1 : 0;
00693          } else if (s.beginswith("reload:")) {
00694             fReloadPROOFcfg = (s.endswith("1") || s.endswith("yes")) ? 1 : 0;
00695          } else if (s.beginswith("dfltfallback:")) {
00696             fDfltFallback = (s.endswith("1") || s.endswith("yes")) ? 1 : 0;
00697          } else if (s.beginswith("wmx:")) {
00698          } else if (s.beginswith("selopt:")) {
00699          } else {
00700             // Config file
00701             fPROOFcfg.fName = val;
00702             if (fPROOFcfg.fName.beginswith("sm:")) {
00703                fPROOFcfg.fName.replace("sm:", "");
00704             }
00705             XrdProofdAux::Expand(fPROOFcfg.fName);
00706             // Make sure it exists and can be read
00707             if (access(fPROOFcfg.fName.c_str(), R_OK)) {
00708                if (errno == ENOENT) {
00709                   TRACE(ALL, "WARNING: configuration file does not exists: " << fPROOFcfg.fName);
00710                } else {
00711                   TRACE(XERR, "configuration file cannot be read: " << fPROOFcfg.fName);
00712                   fPROOFcfg.fName = "";
00713                   fPROOFcfg.fMtime = -1;
00714                }
00715             }
00716          }
00717       }
00718    }
00719    return 0;
00720 }
00722 //______________________________________________________________________________
00723 int XrdProofdNetMgr::DoDirectiveWorker(char *val, XrdOucStream *cfg, bool)
00724 {
00725    // Process 'worker' directive
00726    XPDLOC(NMGR, "NetMgr::DoDirectiveWorker")
00728    if (!val || !cfg)
00729       // undefined inputs
00730       return -1;
00732    // Lock the method to protect the lists.
00733    XrdSysMutexHelper mhp(fMutex);
00735    // Get the full line (w/o heading keyword)
00736    cfg->RetToken();
00737    XrdOucString wrd(cfg->GetWord());
00738    if (wrd.length() > 0) {
00739       // Build the line
00740       XrdOucString line;
00741       char rest[2048] = {0};
00742       cfg->GetRest((char *)&rest[0], 2048);
00743       XPDFORM(line, "%s %s", wrd.c_str(), rest);
00744       // Parse it now
00745       if (wrd == "master" || wrd == "node") {
00746          // Init a master instance
00747          XrdProofWorker *pw = new XrdProofWorker(line.c_str());
00748          if (pw->fHost.beginswith("localhost") ||
00749              pw->Matches(fMgr->Host())) {
00750             // Replace the default line (the first with what found in the file)
00751             XrdProofWorker *fw = fWorkers.front();
00752             fw->Reset(line.c_str());
00753          }
00754          SafeDelete(pw);
00755       } else {
00756          // How many lines like this?
00757          int nr = 1;
00758          int ir = line.find("repeat=");
00759          if (ir != STR_NPOS) {
00760             XrdOucString r(line, ir + strlen("repeat="));
00761             r.erase(r.find(' '));
00762             nr = r.atoi();
00763             if (nr < 0 || !XPD_LONGOK(nr)) nr = 1;
00764             TRACE(DBG, "found repeat = " << nr);
00765          }
00766          while (nr--) {
00767             // Build the worker object
00768             XrdProofdMultiStr mline(line.c_str());
00769             if (mline.IsValid()) {
00770                TRACE(DBG, "found multi-line with: " << mline.N() << " tokens");
00771                for (int i = 0; i < mline.N(); i++) {
00772                   TRACE(HDBG, "found token: " << mline.Get(i));
00773                   fWorkers.push_back(new XrdProofWorker(mline.Get(i).c_str()));
00774                }
00775             } else {
00776                TRACE(DBG, "found line: " << line);
00777                fWorkers.push_back(new XrdProofWorker(line.c_str()));
00778             }
00779          }
00780       }
00781    }
00783    // Necessary for the balancer when Bonjour is enabled. Note that this balancer
00784    // can also be enabled with a static configuration. By this time is disabled
00785    // due to its experimental status.
00786    FindUniqueNodes();
00787    //BalanceNodesOrder();
00789    return 0;
00790 }
00792 //__________________________________________________________________________
00793 int XrdProofdNetMgr::BroadcastCtrlC(const char *usr)
00794 {
00795    // Broadcast a ctrlc interrupt
00796    // Return 0 on success, -1 on error
00797    XPDLOC(NMGR, "NetMgr::BroadcastCtrlC")
00799    int rc = 0;
00801    // Loop over unique nodes
00802    std::list<XrdProofWorker *>::iterator iw = fNodes.begin();
00803    XrdProofWorker *w = 0;
00804    while (iw != fNodes.end()) {
00805       if ((w = *iw) && w->fType != 'M') {
00806          // Do not send it to ourselves
00807          bool us = (((w->fHost.find("localhost") != STR_NPOS ||
00808                       XrdOucString(fMgr->Host()).find(w->fHost.c_str()) != STR_NPOS)) &&
00809                     (w->fPort == -1 || w->fPort == fMgr->Port())) ? 1 : 0;
00810          if (!us) {
00811             // Create 'url'
00812             XrdOucString u = (usr) ? usr : fMgr->EffectiveUser();
00813             u += '@';
00814             u += w->fHost;
00815             if (w->fPort != -1) {
00816                u += ':';
00817                u += w->fPort;
00818             }
00819             // Get a connection to the server
00820             XrdProofConn *conn = GetProofConn(u.c_str());
00821             if (conn && conn->IsValid()) {
00822                // Prepare request
00823                XPClientRequest reqhdr;
00824                memset(&reqhdr, 0, sizeof(reqhdr));
00825                conn->SetSID(reqhdr.header.streamid);
00826                reqhdr.proof.requestid = kXP_ctrlc;
00827                reqhdr.proof.sid = 0;
00828                reqhdr.proof.dlen = 0;
00829                // We need the right order
00830                if (XPD::clientMarshall(&reqhdr) != 0) {
00831                   TRACE(XERR, "problems marshalling request");
00832                   return -1;
00833                }
00834                if (conn->LowWrite(&reqhdr, 0, 0) != kOK) {
00835                   TRACE(XERR, "problems sending ctrl-c request to server " << u);
00836                }
00837                // Clean it up, to avoid leaving open tcp connection possibly going forever
00838                // into CLOSE_WAIT
00839                SafeDelete(conn);
00840             }
00841          } else {
00842             TRACE(DBG, "broadcast request for ourselves: ignore");
00843          }
00844       }
00845       // Next worker
00846       iw++;
00847    }
00849    // Done
00850    return rc;
00851 }
00853 //__________________________________________________________________________
00854 int XrdProofdNetMgr::Broadcast(int type, const char *msg, const char *usr,
00855                                XrdProofdResponse *r, bool notify, int subtype)
00856 {
00857    // Broadcast request to known potential sub-nodes.
00858    // Return 0 on success, -1 on error
00859    XPDLOC(NMGR, "NetMgr::Broadcast")
00861    unsigned int nok = 0;
00862    TRACE(REQ, "type: " << type);
00864    // Loop over unique nodes
00865    std::list<XrdProofWorker *>::iterator iw = fNodes.begin();
00866    XrdProofWorker *w = 0;
00867    XrdClientMessage *xrsp = 0;
00868    while (iw != fNodes.end()) {
00869       if ((w = *iw) && w->fType != 'M') {
00870          // Do not send it to ourselves
00871          bool us = (((w->fHost.find("localhost") != STR_NPOS ||
00872                       XrdOucString(fMgr->Host()).find(w->fHost.c_str()) != STR_NPOS)) &&
00873                     (w->fPort == -1 || w->fPort == fMgr->Port())) ? 1 : 0;
00874          if (!us) {
00875             // Create 'url'
00876             XrdOucString u = (usr) ? usr : fMgr->EffectiveUser();
00877             u += '@';
00878             u += w->fHost;
00879             if (w->fPort != -1) {
00880                u += ':';
00881                u += w->fPort;
00882             }
00883             // Type of server
00884             int srvtype = (w->fType != 'W') ? (kXR_int32) kXPD_Master
00885                           : (kXR_int32) kXPD_Worker;
00886             TRACE(HDBG, "sending request to " << u);
00887             // Send request
00888             if (!(xrsp = Send(u.c_str(), type, msg, srvtype, r, notify, subtype))) {
00889                TRACE(XERR, "problems sending request to " << u);
00890             } else {
00891                nok++;
00892             }
00893             // Cleanup answer
00894             SafeDelete(xrsp);
00895          } else {
00896             TRACE(DBG, "broadcast request for ourselves: ignore");
00897          }
00898       }
00899       // Next worker
00900       iw++;
00901    }
00903    // Done
00904    return (nok == fNodes.size()) ? 0 : -1;
00905 }
00907 //__________________________________________________________________________
00908 XrdProofConn *XrdProofdNetMgr::GetProofConn(const char *url)
00909 {
00910    // Get a XrdProofConn for url; create a new one if not available
00912    XrdProofConn *p = 0;
00914    // If not found create a new one
00915    XrdOucString buf = " Manager connection from ";
00916    buf += fMgr->Host();
00917    buf += "|ord:000";
00918    char m = 'A'; // log as admin
00920    {
00921       XrdSysMutexHelper mhp(fMutex);
00922       p = new XrdProofConn(url, m, -1, -1, 0, buf.c_str());
00923    }
00924    if (p && !(p->IsValid())) SafeDelete(p);
00926    // Done
00927    return p;
00928 }
00930 //__________________________________________________________________________
00931 XrdClientMessage *XrdProofdNetMgr::Send(const char *url, int type,
00932                                         const char *msg, int srvtype,
00933                                         XrdProofdResponse *r, bool notify,
00934                                         int subtype)
00935 {
00936    // Broadcast request to known potential sub-nodes.
00937    // Return 0 on success, -1 on error
00938    XPDLOC(NMGR, "NetMgr::Send")
00940    XrdClientMessage *xrsp = 0;
00941    TRACE(REQ, "type: " << type);
00943    if (!url || strlen(url) <= 0)
00944       return xrsp;
00946    // Get a connection to the server
00947    XrdProofConn *conn = GetProofConn(url);
00949    bool ok = 1;
00950    if (conn && conn->IsValid()) {
00951       XrdOucString notifymsg("Send: ");
00952       // Prepare request
00953       XPClientRequest reqhdr;
00954       const void *buf = 0;
00955       char **vout = 0;
00956       memset(&reqhdr, 0, sizeof(reqhdr));
00957       conn->SetSID(reqhdr.header.streamid);
00958       reqhdr.header.requestid = kXP_admin;
00959       reqhdr.proof.int1 = type;
00960       switch (type) {
00961          case kROOTVersion:
00962             notifymsg += "change-of-ROOT version request to ";
00963             notifymsg += url;
00964             notifymsg += " msg: ";
00965             notifymsg += msg;
00966             reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
00967             buf = (msg) ? (const void *)msg : buf;
00968             break;
00969          case kCleanupSessions:
00970             notifymsg += "cleanup request to ";
00971             notifymsg += url;
00972             notifymsg += " for user: ";
00973             notifymsg += msg;
00974             reqhdr.proof.int2 = (kXR_int32) srvtype;
00975             reqhdr.proof.sid = -1;
00976             reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
00977             buf = (msg) ? (const void *)msg : buf;
00978             break;
00979          case kExec:
00980             notifymsg += "exec ";
00981             notifymsg += subtype;
00982             notifymsg += "request for ";
00983             notifymsg += msg;
00984             reqhdr.proof.int2 = (kXR_int32) subtype;
00985             reqhdr.proof.sid = -1;
00986             reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
00987             buf = (msg) ? (const void *)msg : buf;
00988             break;
00989          default:
00990             ok = 0;
00991             TRACE(XERR, "invalid request type " << type);
00992             break;
00993       }
00995       // Notify the client that we are sending the request
00996       if (r && notify)
00997          r->Send(kXR_attn, kXPD_srvmsg, 0, (char *) notifymsg.c_str(), notifymsg.length());
00999       // Activate processing of unsolicited responses
01000       conn->SetAsync(conn, &MessageSender, (void *)r);
01002       // Send over
01003       if (ok)
01004          xrsp = conn->SendReq(&reqhdr, buf, vout, "NetMgr::Send");
01006       // Deactivate processing of unsolicited responses
01007       conn->SetAsync(0, 0, (void *)0);
01009       // Print error msg, if any
01010       if (r && !xrsp && conn->GetLastErr()) {
01011          XrdOucString cmsg = url;
01012          cmsg += ": ";
01013          cmsg += conn->GetLastErr();
01014          r->Send(kXR_attn, kXPD_srvmsg, (char *) cmsg.c_str(), cmsg.length());
01015       }
01016       // Clean it up, to avoid leaving open tcp connection possibly going forever
01017       // into CLOSE_WAIT
01018       SafeDelete(conn);
01020    } else {
01021       TRACE(XERR, "could not open connection to " << url);
01022       if (r) {
01023          XrdOucString cmsg = "failure attempting connection to ";
01024          cmsg += url;
01025          r->Send(kXR_attn, kXPD_srvmsg, (char *) cmsg.c_str(), cmsg.length());
01026       }
01027    }
01029    // Done
01030    return xrsp;
01031 }
01033 //______________________________________________________________________________
01034 bool XrdProofdNetMgr::IsLocal(const char *host, bool checkport)
01035 {
01036    // Check if 'host' is this local host. If checkport is true,
01037    // matching of the local port with the one implied by host is also checked.
01038    // Return 1 if 'local', 0 otherwise
01040    int rc = 0;
01041    if (host && strlen(host) > 0) {
01042       XrdClientUrlInfo uu(host);
01043       if (uu.Port <= 0) uu.Port = 1093;
01044       // Fully qualified name
01045       char *fqn = XrdNetDNS::getHostName(uu.Host.c_str());
01046       if (fqn && (strstr(fqn, "localhost") || !strcmp(fqn, "") ||
01047                   !strcmp(fMgr->Host(), fqn))) {
01048          if (!checkport || (uu.Port == fMgr->Port()))
01049             rc = 1;
01050       }
01051       SafeFree(fqn);
01052    }
01053    // Done
01054    return rc;
01055 }
01057 //______________________________________________________________________________
01058 int XrdProofdNetMgr::ReadBuffer(XrdProofdProtocol *p)
01059 {
01060    // Process a readbuf request
01061    XPDLOC(NMGR, "NetMgr::ReadBuffer")
01063    int rc = 0;
01064    XPD_SETRESP(p, "ReadBuffer");
01066    XrdOucString emsg;
01068    // Unmarshall the data
01069    //
01070    kXR_int64 ofs = ntohll(p->Request()->readbuf.ofs);
01071    int len = ntohl(p->Request()->readbuf.len);
01073    // Find out the file name
01074    char *file = 0;
01075    char *filen = 0;
01076    char *pattern = 0;
01077    int dlen = p->Request()->header.dlen;
01078    int grep = ntohl(p->Request()->readbuf.int1);
01079    int blen = dlen;
01080    bool local = 0;
01081    if (dlen > 0 && p->Argp()->buff) {
01082       file = new char[dlen+1];
01083       memcpy(file, p->Argp()->buff, dlen);
01084       file[dlen] = 0;
01085       // Check if local
01086       XrdClientUrlInfo ui(file);
01087       if (ui.Host.length() > 0) {
01088          // Check locality
01089          local = XrdProofdNetMgr::IsLocal(ui.Host.c_str());
01090          if (local) {
01091             memcpy(file, ui.File.c_str(), ui.File.length());
01092             file[ui.File.length()] = 0;
01093             blen = ui.File.length();
01094             TRACEP(p, DBG, "file is LOCAL");
01095          }
01096       }
01097       // If grep, extract the pattern
01098       if (grep > 0) {
01099          // 'grep' operation: len is the length of the 'pattern' to be grepped
01100          pattern = new char[len + 1];
01101          int j = blen - len;
01102          int i = 0;
01103          while (j < blen)
01104             pattern[i++] = file[j++];
01105          pattern[i] = 0;
01106          filen = strdup(file);
01107          filen[blen - len] = 0;
01108          TRACEP(p, DBG, "grep operation " << grep << ", pattern:" << pattern);
01109       }
01110    } else {
01111       emsg = "file name not found";
01112       TRACEP(p, XERR, emsg);
01113       response->Send(kXR_InvalidRequest, emsg.c_str());
01114       return 0;
01115    }
01116    if (grep) {
01117       TRACEP(p, REQ, "file: " << filen << ", ofs: " << ofs << ", len: " << len <<
01118              ", pattern: " << pattern);
01119    } else {
01120       TRACEP(p, REQ, "file: " << file << ", ofs: " << ofs << ", len: " << len);
01121    }
01123    // Get the buffer
01124    int lout = len;
01125    char *buf = 0;
01126    if (local) {
01127       if (grep > 0) {
01128          // Grep local file
01129          lout = blen; // initial length
01130          buf = ReadBufferLocal(filen, pattern, lout, grep);
01131       } else {
01132          // Read portion of local file
01133          buf = ReadBufferLocal(file, ofs, lout);
01134       }
01135    } else {
01136       // Read portion of remote file
01137       XrdClientUrlInfo u(file);
01138       u.User = p->Client()->User() ? p->Client()->User() : fMgr->EffectiveUser();
01139       buf = ReadBufferRemote(u.GetUrl().c_str(), file, ofs, lout, grep);
01140    }
01142    if (!buf) {
01143       if (lout > 0) {
01144          if (grep > 0) {
01145             if (TRACING(DBG)) {
01146                XPDFORM(emsg, "nothing found by 'grep' in %s, pattern: %s", filen, pattern);
01147                TRACEP(p, DBG, emsg);
01148             }
01149             response->Send();
01150             return 0;
01151          } else {
01152             XPDFORM(emsg, "could not read buffer from %s %s",
01153                     (local) ? "local file " : "remote file ", file);
01154             TRACEP(p, XERR, emsg);
01155             response->Send(kXR_InvalidRequest, emsg.c_str());
01156             return 0;
01157          }
01158       } else {
01159          // Just got an empty buffer
01160          if (TRACING(DBG)) {
01161             emsg = "nothing found in ";
01162             emsg += (grep > 0) ? filen : file;
01163             TRACEP(p, DBG, emsg);
01164          }
01165       }
01166    }
01168    // Send back to user
01169    response->Send(buf, lout);
01171    // Cleanup
01172    SafeFree(buf);
01173    SafeDelArray(file);
01174    SafeFree(filen);
01175    SafeDelArray(pattern);
01177    // Done
01178    return 0;
01179 }
01181 //______________________________________________________________________________
01182 int XrdProofdNetMgr::LocateLocalFile(XrdOucString &file)
01183 {
01184    // Locate the exact file path allowing for wildcards '*' in the file name.
01185    // In case of success, returns 0 and fills file wity the first matching instance.
01186    // Return -1 if no matching pat is found.
01188    XPDLOC(NMGR, "NetMgr::LocateLocalFile")
01190    // If no wild cards or empty, nothing to do
01191    if (file.length() <= 0 || file.find('*') == STR_NPOS) return 0;
01193    // Locate the file name and the dir
01194    XrdOucString fn, dn;
01195    int isl = file.rfind('/');
01196    if (isl != STR_NPOS) {
01197       fn.assign(file, isl + 1, -1);
01198       dn.assign(file, 0, isl);
01199    } else {
01200       fn = file;
01201       dn = "./";
01202    }
01204    XrdOucString emsg;
01205    // Scan the dir
01206    DIR *dirp = opendir(dn.c_str());
01207    if (!dirp) {
01208       XPDFORM(emsg, "cannot open '%s' - errno: %d", dn.c_str(), errno);
01209       TRACE(XERR, emsg.c_str());
01210       return -1;
01211    }
01212    struct dirent *ent = 0;
01213    XrdOucString sent;
01214    while ((ent = readdir(dirp))) {
01215       if (!strncmp(ent->d_name, ".", 1) || !strncmp(ent->d_name, "..", 2))
01216          continue;
01217       // Check the match
01218       sent = ent->d_name;
01219       if (sent.matches(fn.c_str()) > 0) break;
01220       sent = "";
01221    }
01222    closedir(dirp);
01224    // If found fill a new output
01225    if (sent.length() > 0) {
01226       XPDFORM(file, "%s%s", dn.c_str(), sent.c_str());
01227       return 0;
01228    }
01230    // Not found
01231    return -1;
01232 }
01234 //______________________________________________________________________________
01235 char *XrdProofdNetMgr::ReadBufferLocal(const char *path, kXR_int64 ofs, int &len)
01236 {
01237    // Read a buffer of length 'len' at offset 'ofs' of local file 'path'; the
01238    // returned buffer must be freed by the caller.
01239    // Wild cards '*' are allowed in the file name of 'path'; the first matching
01240    // instance is taken.
01241    // Returns 0 in case of error.
01242    XPDLOC(NMGR, "NetMgr::ReadBufferLocal")
01244    XrdOucString emsg;
01245    TRACE(REQ, "file: " << path << ", ofs: " << ofs << ", len: " << len);
01247    // Check input
01248    if (!path || strlen(path) <= 0) {
01249       TRACE(XERR, "path undefined!");
01250       return (char *)0;
01251    }
01253    // Locate the path resolving wild cards
01254    XrdOucString spath(path);
01255    if (LocateLocalFile(spath) != 0) {
01256       TRACE(XERR, "path cannot be resolved! (" << path << ")");
01257       return (char *)0;
01258    }
01259    const char *file = spath.c_str();
01261    // Open the file in read mode
01262    int fd = open(file, O_RDONLY);
01263    if (fd < 0) {
01264       emsg = "could not open ";
01265       emsg += file;
01266       TRACE(XERR, emsg);
01267       return (char *)0;
01268    }
01270    // Size of the output
01271    struct stat st;
01272    if (fstat(fd, &st) != 0) {
01273       emsg = "could not get size of file with stat: errno: ";
01274       emsg += (int)errno;
01275       TRACE(XERR, emsg);
01276       close(fd);
01277       return (char *)0;
01278    }
01279    off_t ltot = st.st_size;
01281    // Estimate offsets of the requested range
01282    // Start from ...
01283    kXR_int64 start = ofs;
01284    off_t fst = (start < 0) ? ltot + start : start;
01285    fst = (fst < 0) ? 0 : ((fst >= ltot) ? ltot - 1 : fst);
01286    // End at ...
01287    kXR_int64 end = fst + len;
01288    off_t lst = (end >= ltot) ? ltot : ((end > fst) ? end  : ltot);
01289    TRACE(DBG, "file size: " << ltot << ", read from: " << fst << " to " << lst);
01291    // Number of bytes to be read
01292    len = lst - fst;
01294    // Output buffer
01295    char *buf = (char *)malloc(len + 1);
01296    if (!buf) {
01297       emsg = "could not allocate enough memory on the heap: errno: ";
01298       emsg += (int)errno;
01299       XPDERR(emsg);
01300       close(fd);
01301       return (char *)0;
01302    }
01304    // Reposition, if needed
01305    if (fst >= 0)
01306       lseek(fd, fst, SEEK_SET);
01308    int left = len;
01309    int pos = 0;
01310    int nr = 0;
01311    do {
01312       while ((nr = read(fd, buf + pos, left)) < 0 && errno == EINTR)
01313          errno = 0;
01314       if (nr < 0) {
01315          TRACE(XERR, "error reading from file: errno: " << errno);
01316          break;
01317       }
01319       // Update counters
01320       pos += nr;
01321       left -= nr;
01323    } while (nr > 0 && left > 0);
01325    // Termination
01326    buf[len] = 0;
01327    TRACE(HDBG, "read " << nr << " bytes: " << buf);
01329    // Close file
01330    close(fd);
01332    // Done
01333    return buf;
01334 }
01336 //______________________________________________________________________________
01337 char *XrdProofdNetMgr::ReadBufferLocal(const char *path,
01338                                        const char *pat, int &len, int opt)
01339 {
01340    // Grep lines matching 'pat' form 'path'; the returned buffer (length in 'len')
01341    // must be freed by the caller.
01342    // Wild cards '*' are allowed in the file name of 'path'; the first matching
01343    // instance is taken.
01344    // Returns 0 in case of error.
01345    XPDLOC(NMGR, "NetMgr::ReadBufferLocal")
01347    XrdOucString emsg;
01348    TRACE(REQ, "file: " << path << ", pat: " << pat << ", len: " << len);
01350    // Check input
01351    if (!path || strlen(path) <= 0) {
01352       TRACE(XERR, "file path undefined!");
01353       return (char *)0;
01354    }
01356    // Locate the path resolving wild cards
01357    XrdOucString spath(path);
01358    if (LocateLocalFile(spath) != 0) {
01359       TRACE(XERR, "path cannot be resolved! (" << path << ")");
01360       return (char *)0;
01361    }
01362    const char *file = spath.c_str();
01364    // Size of the output
01365    struct stat st;
01366    if (stat(file, &st) != 0) {
01367       emsg = "could not get size of file with stat: errno: ";
01368       emsg += (int)errno;
01369       TRACE(XERR, emsg);
01370       return (char *)0;
01371    }
01372    off_t ltot = st.st_size;
01374    // The grep command
01375    char *cmd = 0;
01376    int lcmd = 0;
01377    if (pat && strlen(pat) > 0) {
01378       lcmd = strlen(pat) + strlen(file) + 20;
01379       cmd = new char[lcmd];
01380       if (opt == 2) {
01381          sprintf(cmd, "grep -v %s %s", pat, file);
01382       } else {
01383          sprintf(cmd, "grep %s %s", pat, file);
01384       }
01385    } else {
01386       lcmd = strlen(file) + 10;
01387       cmd = new char[lcmd];
01388       sprintf(cmd, "cat %s", file);
01389    }
01390    TRACE(DBG, "cmd: " << cmd);
01392    // Execute the command in a pipe
01393    FILE *fp = popen(cmd, "r");
01394    if (!fp) {
01395       emsg = "could not run '";
01396       emsg += cmd;
01397       emsg += "'";
01398       TRACE(XERR, emsg);
01399       delete[] cmd;
01400       return (char *)0;
01401    }
01402    delete[] cmd;
01404    // Read line by line
01405    len = 0;
01406    char *buf = 0;
01407    char line[2048];
01408    int bufsiz = 0, left = 0, lines = 0;
01409    while ((ltot > 0) && fgets(line, sizeof(line), fp)) {
01410       // Parse the line
01411       int llen = strlen(line);
01412       ltot -= llen;
01413       lines++;
01414       // (Re-)allocate the buffer
01415       if (!buf || (llen > left)) {
01416          int dsiz = 100 * ((int)((len + llen) / lines) + 1);
01417          dsiz = (dsiz > llen) ? dsiz : llen;
01418          bufsiz += dsiz;
01419          buf = (char *)realloc(buf, bufsiz + 1);
01420          left += dsiz;
01421       }
01422       if (!buf) {
01423          emsg = "could not allocate enough memory on the heap: errno: ";
01424          emsg += (int)errno;
01425          TRACE(XERR, emsg);
01426          pclose(fp);
01427          return (char *)0;
01428       }
01429       // Add line to the buffer
01430       memcpy(buf + len, line, llen);
01431       len += llen;
01432       left -= llen;
01433       if (TRACING(HDBG))
01434          fprintf(stderr, "line: %s", line);
01435    }
01437    // Check the result and terminate the buffer
01438    if (buf) {
01439       if (len > 0) {
01440          buf[len] = 0;
01441       } else {
01442          free(buf);
01443          buf = 0;
01444       }
01445    }
01447    // Close file
01448    pclose(fp);
01450    // Done
01451    return buf;
01452 }
01454 //______________________________________________________________________________
01455 char *XrdProofdNetMgr::ReadBufferRemote(const char *url, const char *file,
01456                                         kXR_int64 ofs, int &len, int grep)
01457 {
01458    // Send a read buffer request of length 'len' at offset 'ofs' for remote file
01459    // defined by 'url'; the returned buffer must be freed by the caller.
01460    // Returns 0 in case of error.
01461    XPDLOC(NMGR, "NetMgr::ReadBufferRemote")
01463    TRACE(REQ, "url: " << (url ? url : "undef") <<
01464          ", file: " << (file ? file : "undef") << ", ofs: " << ofs <<
01465          ", len: " << len << ", grep: " << grep);
01467    // Check input
01468    if (!file || strlen(file) <= 0) {
01469       TRACE(XERR, "file undefined!");
01470       return (char *)0;
01471    }
01472    XrdClientUrlInfo u(url);
01473    if (!url || strlen(url) <= 0) {
01474       // Use file as url
01475       u.TakeUrl(XrdOucString(file));
01476       if (u.User.length() <= 0) u.User = fMgr->EffectiveUser();
01477    }
01479    // Get a connection (logs in)
01480    XrdProofConn *conn = GetProofConn(u.GetUrl().c_str());
01482    char *buf = 0;
01483    if (conn && conn->IsValid()) {
01484       // Prepare request
01485       XPClientRequest reqhdr;
01486       memset(&reqhdr, 0, sizeof(reqhdr));
01487       conn->SetSID(reqhdr.header.streamid);
01488       reqhdr.header.requestid = kXP_readbuf;
01489       reqhdr.readbuf.ofs = ofs;
01490       reqhdr.readbuf.len = len;
01491       reqhdr.readbuf.int1 = grep;
01492       reqhdr.header.dlen = strlen(file);
01493       const void *btmp = (const void *) file;
01494       char **vout = &buf;
01495       // Send over
01496       XrdClientMessage *xrsp =
01497          conn->SendReq(&reqhdr, btmp, vout, "NetMgr::ReadBufferRemote");
01499       // If positive answer
01500       if (xrsp && buf && (xrsp->DataLen() > 0)) {
01501          len = xrsp->DataLen();
01502       } else {
01503          if (xrsp && !(xrsp->IsError()))
01504             // The buffer was just empty: do not call it error
01505             len = 0;
01506          SafeFree(buf);
01507       }
01509       // Clean the message
01510       SafeDelete(xrsp);
01511       // Clean it up, to avoid leaving open tcp connection possibly going forever
01512       // into CLOSE_WAIT
01513       SafeDelete(conn);
01514    }
01516    // Done
01517    return buf;
01518 }
01520 //______________________________________________________________________________
01521 char *XrdProofdNetMgr::ReadLogPaths(const char *url, const char *msg, int isess)
01522 {
01523    // Get log paths from next tier; used in multi-master setups
01524    // Returns 0 in case of error.
01525    XPDLOC(NMGR, "NetMgr::ReadLogPaths")
01527    TRACE(REQ, "url: " << (url ? url : "undef") <<
01528          ", msg: " << (msg ? msg : "undef") << ", isess: " << isess);
01530    // Check input
01531    if (!url || strlen(url) <= 0) {
01532       TRACE(XERR, "url undefined!");
01533       return (char *)0;
01534    }
01536    // Get a connection (logs in)
01537    XrdProofConn *conn = GetProofConn(url);
01539    char *buf = 0;
01540    if (conn && conn->IsValid()) {
01541       // Prepare request
01542       XPClientRequest reqhdr;
01543       memset(&reqhdr, 0, sizeof(reqhdr));
01544       conn->SetSID(reqhdr.header.streamid);
01545       reqhdr.header.requestid = kXP_admin;
01546       reqhdr.proof.int1 = kQueryLogPaths;
01547       reqhdr.proof.int2 = isess;
01548       reqhdr.proof.sid = -1;
01549       reqhdr.header.dlen = strlen(msg);
01550       const void *btmp = (const void *) msg;
01551       char **vout = &buf;
01552       // Send over
01553       XrdClientMessage *xrsp =
01554          conn->SendReq(&reqhdr, btmp, vout, "NetMgr::ReadLogPaths");
01556       // If positive answer
01557       if (xrsp && buf && (xrsp->DataLen() > 0)) {
01558          int len = xrsp->DataLen();
01559          buf = (char *) realloc((void *)buf, len + 1);
01560          if (buf)
01561             buf[len] = 0;
01562       } else {
01563          SafeFree(buf);
01564       }
01566       // Clean the message
01567       SafeDelete(xrsp);
01568       // Clean it up, to avoid leaving open tcp connection possibly going forever
01569       // into CLOSE_WAIT
01570       SafeDelete(conn);
01571    }
01573    // Done
01574    return buf;
01575 }
01577 //__________________________________________________________________________
01578 void XrdProofdNetMgr::CreateDefaultPROOFcfg()
01579 {
01580    // Fill-in fWorkers for a localhost based on the number of
01581    // workers fNumLocalWrks.
01582    XPDLOC(NMGR, "NetMgr::CreateDefaultPROOFcfg")
01584    TRACE(DBG, "enter: local workers: " << fNumLocalWrks);
01586    // Lock the method to protect the lists.
01587    XrdSysMutexHelper mhp(fMutex);
01589    // Cleanup the worker list
01590    fWorkers.clear();
01591    // The first time we need to create the default workers
01592    if (fDfltWorkers.size() < 1) {
01593       // Create a default master line
01594       XrdOucString mm("master ", 128);
01595       mm += fMgr->Host();
01596       fDfltWorkers.push_back(new XrdProofWorker(mm.c_str()));
01598       // Create 'localhost' lines for each worker
01599       int nwrk = fNumLocalWrks;
01600       if (nwrk > 0) {
01601          mm = "worker localhost port=";
01602          mm += fMgr->Port();
01603          while (nwrk--) {
01604             fDfltWorkers.push_back(new XrdProofWorker(mm.c_str()));
01605             TRACE(DBG, "added line: " << mm);
01606          }
01607       }
01608    }
01610    // Copy the list
01611    std::list<XrdProofWorker *>::iterator w = fDfltWorkers.begin();
01612    for (; w != fDfltWorkers.end(); w++) {
01613       fWorkers.push_back(*w);
01614    }
01616    TRACE(DBG, "done: " << fWorkers.size() - 1 << " workers");
01618    // Find unique nodes
01619    FindUniqueNodes();
01621    // We are done
01622    return;
01623 }
01625 //__________________________________________________________________________
01626 std::list<XrdProofWorker *> *XrdProofdNetMgr::GetActiveWorkers()
01627 {
01628    // Return the list of workers after having made sure that the info is
01629    // up-to-date
01630    XPDLOC(NMGR, "NetMgr::GetActiveWorkers")
01632    XrdSysMutexHelper mhp(fMutex);
01634    if (fResourceType == kRTStatic && fPROOFcfg.fName.length() > 0) {
01635       // Check if there were any changes in the config file
01636       if (fReloadPROOFcfg && ReadPROOFcfg(1) != 0) {
01637          if (fDfltFallback) {
01638             // Use default settings
01639             CreateDefaultPROOFcfg();
01640             TRACE(DBG, "parsing of " << fPROOFcfg.fName << " failed: use default settings");
01641          } else {
01642             TRACE(XERR, "unable to read the configuration file");
01643             return (std::list<XrdProofWorker *> *)0;
01644          }
01645       }
01646    }
01647    TRACE(DBG,  "returning list with " << fWorkers.size() << " entries");
01649    if (TRACING(HDBG)) Dump();
01651    return &fWorkers;
01652 }
01654 //__________________________________________________________________________
01655 void XrdProofdNetMgr::Dump()
01656 {
01657    // Dump status
01658    const char *xpdloc = "NetMgr::Dump";
01660    XrdSysMutexHelper mhp(fMutex);
01662    XPDPRT("+++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
01663    XPDPRT("+ Active workers status");
01664    XPDPRT("+ Size: " << fWorkers.size());
01665    XPDPRT("+ ");
01667    std::list<XrdProofWorker *>::iterator iw;
01668    for (iw = fWorkers.begin(); iw != fWorkers.end(); iw++) {
01669       XPDPRT("+ wrk: " << (*iw)->fHost << ":" << (*iw)->fPort << " type:" << (*iw)->fType <<
01670              " active sessions:" << (*iw)->Active());
01671    }
01672    XPDPRT("+ ");
01673    XPDPRT("+++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
01674 }
01676 //__________________________________________________________________________
01677 std::list<XrdProofWorker *> *XrdProofdNetMgr::GetNodes()
01678 {
01679    // Return the list of unique nodes after having made sure that the info is
01680    // up-to-date
01681    XPDLOC(NMGR, "NetMgr::GetNodes")
01683    XrdSysMutexHelper mhp(fMutex);
01685    if (fResourceType == kRTStatic && fPROOFcfg.fName.length() > 0) {
01686       // Check if there were any changes in the config file
01687       if (fReloadPROOFcfg && ReadPROOFcfg(1) != 0) {
01688          if (fDfltFallback) {
01689             // Use default settings
01690             CreateDefaultPROOFcfg();
01691             TRACE(DBG, "parsing of " << fPROOFcfg.fName << " failed: use default settings");
01692          } else {
01693             TRACE(XERR, "unable to read the configuration file");
01694             return (std::list<XrdProofWorker *> *)0;
01695          }
01696       }
01697    }
01698    TRACE(DBG, "returning list with " << fNodes.size() << " entries");
01700    return &fNodes;
01701 }
01703 //__________________________________________________________________________
01704 int XrdProofdNetMgr::ReadPROOFcfg(bool reset)
01705 {
01706    // Read PROOF config file and load the information in fWorkers.
01707    // NB: 'master' information here is ignored, because it is passed
01708    //     via the 'xpd.workdir' and 'xpd.image' config directives
01709    XPDLOC(NMGR, "NetMgr::ReadPROOFcfg")
01711    TRACE(REQ, "saved time of last modification: " << fPROOFcfg.fMtime);
01713    // Lock the method to protect the lists.
01714    XrdSysMutexHelper mhp(fMutex);
01716    // Check inputs
01717    if (fPROOFcfg.fName.length() <= 0)
01718       return -1;
01720    // Get the modification time
01721    struct stat st;
01722    if (stat(fPROOFcfg.fName.c_str(), &st) != 0) {
01723       // If the file disappeared, reset the modification time so that we are sure
01724       // to reload it if it comes back
01725       if (errno == ENOENT) fPROOFcfg.fMtime = -1;
01726       if (!fDfltFallback) {
01727          TRACE(XERR, "unable to stat file: " << fPROOFcfg.fName << " - errno: " << errno);
01728       } else {
01729          TRACE(ALL, "file " << fPROOFcfg.fName << " cannot be parsed: use default configuration");
01730       }
01731       return -1;
01732    }
01733    TRACE(DBG, "time of last modification: " << st.st_mtime);
01735    // File should be loaded only once
01736    if (st.st_mtime <= fPROOFcfg.fMtime)
01737       return 0;
01739    // Save the modification time
01740    fPROOFcfg.fMtime = st.st_mtime;
01742    // Open the defined path.
01743    FILE *fin = 0;
01744    if (!(fin = fopen(fPROOFcfg.fName.c_str(), "r"))) {
01745       if (fWorkers.size() > 1) {
01746          TRACE(XERR, "unable to fopen file: " << fPROOFcfg.fName << " - errno: " << errno);
01747          TRACE(XERR, "continuing with existing list of workers.");
01748          return 0;
01749       } else {
01750          return -1;
01751       }
01752    }
01754    if (reset) {
01755       // Cleanup the worker list
01756       fWorkers.clear();
01757    }
01759    // Add default a master line if not yet there
01760    if (fRegWorkers.size() < 1) {
01761       XrdOucString mm("master ", 128);
01762       mm += fMgr->Host();
01763       fRegWorkers.push_back(new XrdProofWorker(mm.c_str()));
01764    } else {
01765       // Deactivate all current active workers
01766       std::list<XrdProofWorker *>::iterator w = fRegWorkers.begin();
01767       // Skip the master line
01768       w++;
01769       for (; w != fRegWorkers.end(); w++) {
01770          (*w)->fActive = 0;
01771       }
01772    }
01774    // Read now the directives
01775    int nw = 0;
01776    char lin[2048];
01777    while (fgets(lin, sizeof(lin), fin)) {
01778       // Skip empty lines
01779       int p = 0;
01780       while (lin[p++] == ' ') {
01781          ;
01782       }
01783       p--;
01784       if (lin[p] == '\0' || lin[p] == '\n')
01785          continue;
01787       // Skip comments
01788       if (lin[0] == '#')
01789          continue;
01791       // Remove trailing '\n';
01792       if (lin[strlen(lin)-1] == '\n')
01793          lin[strlen(lin)-1] = '\0';
01795       TRACE(DBG, "found line: " << lin);
01797       // Parse the line
01798       XrdProofWorker *pw = new XrdProofWorker(lin);
01800       const char *pfx[2] = { "master", "node" };
01801       if (!strncmp(lin, pfx[0], strlen(pfx[0])) ||
01802           !strncmp(lin, pfx[1], strlen(pfx[1]))) {
01803          // Init a master instance
01804          if (pw->fHost.beginswith("localhost") ||
01805              pw->Matches(fMgr->Host())) {
01806             // Replace the default line (the first with what found in the file)
01807             XrdProofWorker *fw = fRegWorkers.front();
01808             fw->Reset(lin);
01809          }
01810          // Ignore it
01811          SafeDelete(pw);
01812       } else {
01813          // Check if we have already it
01814          std::list<XrdProofWorker *>::iterator w = fRegWorkers.begin();
01815          // Skip the master line
01816          w++;
01817          bool haveit = 0;
01818          while (w != fRegWorkers.end()) {
01819             if (!((*w)->fActive)) {
01820                if ((*w)->fHost == pw->fHost && (*w)->fPort == pw->fPort) {
01821                   (*w)->fActive = 1;
01822                   haveit = 1;
01823                   break;
01824                }
01825             }
01826             // Go to next
01827             w++;
01828          }
01829          // If we do not have it, build a new worker object
01830          if (!haveit) {
01831             // Keep it
01832             fRegWorkers.push_back(pw);
01833          } else {
01834             // Drop it
01835             SafeDelete(pw);
01836          }
01837       }
01838    }
01840    // Copy the active workers
01841    std::list<XrdProofWorker *>::iterator w = fRegWorkers.begin();
01842    while (w != fRegWorkers.end()) {
01843       if ((*w)->fActive) {
01844          fWorkers.push_back(*w);
01845          nw++;
01846       }
01847       w++;
01848    }
01850    // Close files
01851    fclose(fin);
01853    // Find unique nodes
01854    if (reset)
01855       FindUniqueNodes();
01857    // We are done
01858    return ((nw == 0) ? -1 : 0);
01859 }
01861 //__________________________________________________________________________
01862 int XrdProofdNetMgr::FindUniqueNodes()
01863 {
01864    // Scan fWorkers for unique nodes (stored in fNodes).
01865    // Return the number of unque nodes.
01866    // NB: 'master' information here is ignored, because it is passed
01867    //     via the 'xpd.workdir' and 'xpd.image' config directives
01868    XPDLOC(NMGR, "NetMgr::FindUniqueNodes")
01870    TRACE(REQ, "# workers: " << fWorkers.size());
01872    // Cleanup the nodes list
01873    fNodes.clear();
01875    // Build the list of unique nodes (skip the master line);
01876    if (fWorkers.size() > 1) {
01877       std::list<XrdProofWorker *>::iterator w = fWorkers.begin();
01878       w++;
01879       for (; w != fWorkers.end(); w++) if ((*w)->fActive) {
01880             bool add = 1;
01881             std::list<XrdProofWorker *>::iterator n;
01882             for (n = fNodes.begin() ; n != fNodes.end(); n++) {
01883                if ((*n)->Matches(*w)) {
01884                   add = 0;
01885                   break;
01886                }
01887             }
01888             if (add)
01889                fNodes.push_back(*w);
01890          }
01891    }
01892    TRACE(REQ, "found " << fNodes.size() << " unique nodes");
01894    // We are done
01895    return fNodes.size();
01896 }

Generated on Tue Jul 5 14:51:51 2011 for ROOT_528-00b_version by  doxygen 1.5.1