XrdProofWorker.cxx

Go to the documentation of this file.
00001 // @(#)root/proofd:$Id: XrdProofWorker.cxx 36936 2010-11-25 13:52:27Z rdm $
00002 // Author: Gerardo Ganis  June 2007
00003 
00004 /*************************************************************************
00005  * Copyright (C) 1995-2005, Rene Brun and Fons Rademakers.               *
00006  * All rights reserved.                                                  *
00007  *                                                                       *
00008  * For the licensing terms see $ROOTSYS/LICENSE.                         *
00009  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
00010  *************************************************************************/
00011 
00012 //////////////////////////////////////////////////////////////////////////
00013 //                                                                      //
00014 // XrdProofWorker                                                       //
00015 //                                                                      //
00016 // Authors: G. Ganis, CERN, 2007                                        //
00017 //                                                                      //
00018 // Class with information about a potential worker.                     //
00019 // A list of instances of this class is built using the config file or  //
00020 // the information collected from the resource discoverers.             //
00021 //                                                                      //
00022 //////////////////////////////////////////////////////////////////////////
00023 
00024 #include <time.h>
00025 
00026 #include "XrdProofdAux.h"
00027 #include "XrdProofWorker.h"
00028 #include "XrdProofdProofServ.h"
00029 #include "XrdClient/XrdClientUrlInfo.hh"
00030 #include "XrdNet/XrdNetDNS.hh"
00031 #include "XProofProtocol.h"
00032 
00033 // Tracing utilities
00034 #include "XrdProofdTrace.h"
00035 
00036 //______________________________________________________________________________
00037 XrdProofWorker::XrdProofWorker(const char *str)
00038    : fExport(256), fType('W'), fPort(-1), fPerfIdx(100), fActive(1)
00039 {
00040    // Constructor from a config file-like string
00041 
00042    fMutex = new XrdSysRecMutex;
00043 
00044    // Make sure we got something to parse
00045    if (!str || strlen(str) <= 0)
00046       return;
00047 
00048    // The actual work is done by Reset()
00049    Reset(str);
00050 }
00051 //__________________________________________________________________________
00052 XrdProofWorker::~XrdProofWorker()
00053 {
00054    // Destructor
00055 
00056    SafeDelete(fMutex);
00057 }
00058 
00059 //______________________________________________________________________________
00060 void XrdProofWorker::Reset(const char *str)
00061 {
00062    // Set content from a config file-like string
00063    XPDLOC(NMGR, "Worker::Reset")
00064 
00065 
00066    // Reinit vars
00067    fExport = "";
00068    fType = 'W';
00069    fHost = "";
00070    fPort = XPD_DEF_PORT;
00071    fPerfIdx = 100;
00072    fImage = "";
00073    fWorkDir = "";
00074    fMsd = "";
00075    fId = "";
00076 
00077    // Make sure we got something to parse
00078    if (!str || strlen(str) <= 0)
00079       return;
00080 
00081    // Tokenize the string
00082    XrdOucString s(str);
00083 
00084    // First token is the type
00085    XrdOucString tok;
00086    XrdOucString typestr = "master|submaster|worker|slave";
00087    int from = s.tokenize(tok, 0, ' ');
00088    if (from == STR_NPOS || typestr.find(tok) == STR_NPOS)
00089       return;
00090    if (tok == "submaster")
00091       fType = 'S';
00092    else if (tok == "master")
00093       fType = 'M';
00094 
00095    // Next token is the user@host:port string, make sure it is a full qualified host name
00096    if ((from = s.tokenize(tok, from, ' ')) == STR_NPOS)
00097       return;
00098    XrdClientUrlInfo ui(tok.c_str());
00099    // Take the user name, if specified
00100    fUser = ui.User;
00101    char *err;
00102    char *fullHostName = XrdNetDNS::getHostName((char *)ui.Host.c_str(), &err);
00103    if (!fullHostName || !strcmp(fullHostName, "0.0.0.0")) {
00104       TRACE(XERR, "DNS could not resolve '" << ui.Host << "'");
00105       return;
00106    }
00107    fHost = fullHostName;
00108    SafeFree(fullHostName);
00109    // Take the port, if specified
00110    fPort = (ui.Port > 0) ? ui.Port : fPort;
00111 
00112    // and then the remaining options
00113    while ((from = s.tokenize(tok, from, ' ')) != STR_NPOS) {
00114       if (tok.beginswith("workdir=")) {
00115          // Working dir
00116          tok.replace("workdir=", "");
00117          fWorkDir = tok;
00118       } else if (tok.beginswith("image=")) {
00119          // Image
00120          tok.replace("image=", "");
00121          fImage = tok;
00122       } else if (tok.beginswith("msd=")) {
00123          // Mass storage domain
00124          tok.replace("msd=", "");
00125          fMsd = tok;
00126       } else if (tok.beginswith("port=")) {
00127          // Port
00128          tok.replace("port=", "");
00129          fPort = strtol(tok.c_str(), (char **)0, 10);
00130       } else if (tok.beginswith("perf=")) {
00131          // Performance index
00132          tok.replace("perf=", "");
00133          fPerfIdx = strtol(tok.c_str(), (char **)0, 10);
00134       } else if (!tok.beginswith("repeat=")) {
00135          // Unknown
00136          TRACE(XERR, "ignoring unknown option '" << tok << "'");
00137       }
00138    }
00139 }
00140 
00141 //______________________________________________________________________________
00142 bool XrdProofWorker::Matches(const char *host)
00143 {
00144    // Check compatibility of host with this instance.
00145    // return 1 if compatible.
00146 
00147    return ((fHost.matches(host)) ? 1 : 0);
00148 }
00149 
00150 //______________________________________________________________________________
00151 bool XrdProofWorker::Matches(XrdProofWorker *wrk)
00152 {
00153    // Set content from a config file-like string
00154 
00155    // Check if 'wrk' is on the same node that 'this'; used to find the unique
00156    // worker nodes.
00157    // return 1 if the node is the same.
00158 
00159    if (wrk) {
00160       // Check Host names
00161       if (wrk->fHost == fHost) {
00162          // Check ports
00163          int pa = (fPort > 0) ? fPort : XPD_DEF_PORT;
00164          int pb = (wrk->fPort > 0) ? wrk->fPort : XPD_DEF_PORT;
00165          if (pa == pb)
00166             return 1;
00167       }
00168    }
00169 
00170    // They do not match
00171    return 0;
00172 }
00173 
00174 //______________________________________________________________________________
00175 const char *XrdProofWorker::Export(const char *ord)
00176 {
00177    // Export current content in a form understood by parsing algorithms
00178    // inside the PROOF session, i.e.
00179    // <type>|<user@host>|<port>|<ord>|-|<perfidx>|<img>|<workdir>|<msd>
00180    XPDLOC(NMGR, "Worker::Export")
00181 
00182    fExport = fType;
00183 
00184    // Add user@host
00185    fExport += '|' ;
00186    if (fUser.length() > 0) {
00187       fExport += fUser;
00188       fExport += "@";
00189    }
00190    fExport += fHost;
00191 
00192    // Add port
00193    if (fPort > 0) {
00194       fExport += '|' ;
00195       fExport += fPort;
00196    } else
00197       fExport += "|-";
00198 
00199    // Ordinal only if passed as argument
00200    if (ord && strlen(ord) > 0) {
00201       // Add ordinal
00202       fExport += '|' ;
00203       fExport += ord;
00204    } else {
00205       // No ordinal at this level
00206       fExport += "|-";
00207    }
00208    // ID at this level
00209    fExport += "|-";
00210 
00211    // Add performance index
00212    fExport += '|' ;
00213    fExport += fPerfIdx;
00214 
00215    // Add image
00216    if (fImage.length() > 0) {
00217       fExport += '|' ;
00218       fExport += fImage;
00219    } else
00220       fExport += "|-";
00221 
00222    // Add workdir
00223    if (fWorkDir.length() > 0) {
00224       fExport += '|' ;
00225       fExport += fWorkDir;
00226    } else
00227       fExport += "|-";
00228 
00229    // Add mass storage domain
00230    if (fMsd.length() > 0) {
00231       fExport += '|' ;
00232       fExport += fMsd;
00233    } else
00234       fExport += "|-";
00235 
00236    // We are done
00237    TRACE(DBG, "sending: " << fExport);
00238    return fExport.c_str();
00239 }
00240 
00241 //______________________________________________________________________________
00242 int XrdProofWorker::GetNActiveSessions()
00243 {
00244    // Calculate the number of workers existing on this node which are
00245    // currently running.
00246    // TODO: optimally, one could contact the packetizer and count the
00247    // opened files.
00248 
00249    int myRunning = 0;
00250    std::list<XrdProofdProofServ *>::iterator iter;
00251    XrdSysMutexHelper mhp(fMutex);
00252    for (iter = fProofServs.begin(); iter != fProofServs.end(); ++iter) {
00253       if (*iter) {
00254          if ((*iter)->Status() == kXPD_running)
00255             myRunning++;
00256       }
00257    }
00258    return myRunning;
00259 }
00260 
00261 //______________________________________________________________________________
00262 void XrdProofWorker::MergeProofServs(const XrdProofWorker &other)
00263 {
00264    // Merge session objects from the other worker object in order to merge all
00265    // the objects in only one. This was added to support hybrid satatically and
00266    // dinamically Bonjour workers discovery.
00267 
00268    std::list<XrdProofdProofServ *>::const_iterator iter;
00269    XrdSysMutexHelper mhp(fMutex);
00270    for (iter = other.fProofServs.begin(); iter != other.fProofServs.end(); ++iter) {
00271       this->fProofServs.push_back(*iter);
00272    }
00273 }
00274 
00275 //______________________________________________________________________________
00276 void XrdProofWorker::Sort(std::list<XrdProofWorker *> *lst,
00277                           bool (*f)(XrdProofWorker *&lhs, XrdProofWorker *&rhs))
00278 {
00279    // Sort ascendingly the list according to the comparing algorithm defined
00280    // by 'f'; 'f' should return 'true' if 'rhs' > 'lhs'.
00281    // This is implemented because on Solaris where std::list::sort() does not
00282    // support an alternative comparison algorithm.
00283 
00284    // Check argument
00285    if (!lst)
00286       return;
00287 
00288    // If empty or just one element, nothing to do
00289    if (lst->size() < 2)
00290       return;
00291 
00292    // Fill a temp array with the current status
00293    XrdProofWorker **ta = new XrdProofWorker *[lst->size() - 1];
00294    std::list<XrdProofWorker *>::iterator i = lst->begin();
00295    i++; // skip master
00296    int n = 0;
00297    for (; i != lst->end(); ++i)
00298       ta[n++] = *i;
00299 
00300    // Now start the loops
00301    XrdProofWorker *tmp = 0;
00302    bool notyet = 1;
00303    int jold = 0;
00304    while (notyet) {
00305       int j = jold;
00306       while (j < n - 1) {
00307          if (f(ta[j], ta[j+1]))
00308             break;
00309          j++;
00310       }
00311       if (j >= n - 1) {
00312          notyet = 0;
00313       } else {
00314          jold = j + 1;
00315          XPDSWAP(ta[j], ta[j+1], tmp);
00316          int k = j;
00317          while (k > 0) {
00318             if (!f(ta[k], ta[k-1])) {
00319                XPDSWAP(ta[k], ta[k-1], tmp);
00320             } else {
00321                break;
00322             }
00323             k--;
00324          }
00325       }
00326    }
00327 
00328    // Empty the original list
00329    XrdProofWorker *mst = lst->front();
00330    lst->clear();
00331    lst->push_back(mst);
00332 
00333    // Fill it again
00334    while (n--)
00335       lst->push_back(ta[n]);
00336 
00337    // Clean up
00338    delete[] ta;
00339 }

Generated on Tue Jul 5 14:51:52 2011 for ROOT_528-00b_version by  doxygen 1.5.1