00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include <time.h>
00025
00026 #include "XrdProofdAux.h"
00027 #include "XrdProofWorker.h"
00028 #include "XrdProofdProofServ.h"
00029 #include "XrdClient/XrdClientUrlInfo.hh"
00030 #include "XrdNet/XrdNetDNS.hh"
00031 #include "XProofProtocol.h"
00032
00033
00034 #include "XrdProofdTrace.h"
00035
00036
00037 XrdProofWorker::XrdProofWorker(const char *str)
00038 : fExport(256), fType('W'), fPort(-1), fPerfIdx(100), fActive(1)
00039 {
00040
00041
00042 fMutex = new XrdSysRecMutex;
00043
00044
00045 if (!str || strlen(str) <= 0)
00046 return;
00047
00048
00049 Reset(str);
00050 }
00051
00052 XrdProofWorker::~XrdProofWorker()
00053 {
00054
00055
00056 SafeDelete(fMutex);
00057 }
00058
00059
00060 void XrdProofWorker::Reset(const char *str)
00061 {
00062
00063 XPDLOC(NMGR, "Worker::Reset")
00064
00065
00066
00067 fExport = "";
00068 fType = 'W';
00069 fHost = "";
00070 fPort = XPD_DEF_PORT;
00071 fPerfIdx = 100;
00072 fImage = "";
00073 fWorkDir = "";
00074 fMsd = "";
00075 fId = "";
00076
00077
00078 if (!str || strlen(str) <= 0)
00079 return;
00080
00081
00082 XrdOucString s(str);
00083
00084
00085 XrdOucString tok;
00086 XrdOucString typestr = "master|submaster|worker|slave";
00087 int from = s.tokenize(tok, 0, ' ');
00088 if (from == STR_NPOS || typestr.find(tok) == STR_NPOS)
00089 return;
00090 if (tok == "submaster")
00091 fType = 'S';
00092 else if (tok == "master")
00093 fType = 'M';
00094
00095
00096 if ((from = s.tokenize(tok, from, ' ')) == STR_NPOS)
00097 return;
00098 XrdClientUrlInfo ui(tok.c_str());
00099
00100 fUser = ui.User;
00101 char *err;
00102 char *fullHostName = XrdNetDNS::getHostName((char *)ui.Host.c_str(), &err);
00103 if (!fullHostName || !strcmp(fullHostName, "0.0.0.0")) {
00104 TRACE(XERR, "DNS could not resolve '" << ui.Host << "'");
00105 return;
00106 }
00107 fHost = fullHostName;
00108 SafeFree(fullHostName);
00109
00110 fPort = (ui.Port > 0) ? ui.Port : fPort;
00111
00112
00113 while ((from = s.tokenize(tok, from, ' ')) != STR_NPOS) {
00114 if (tok.beginswith("workdir=")) {
00115
00116 tok.replace("workdir=", "");
00117 fWorkDir = tok;
00118 } else if (tok.beginswith("image=")) {
00119
00120 tok.replace("image=", "");
00121 fImage = tok;
00122 } else if (tok.beginswith("msd=")) {
00123
00124 tok.replace("msd=", "");
00125 fMsd = tok;
00126 } else if (tok.beginswith("port=")) {
00127
00128 tok.replace("port=", "");
00129 fPort = strtol(tok.c_str(), (char **)0, 10);
00130 } else if (tok.beginswith("perf=")) {
00131
00132 tok.replace("perf=", "");
00133 fPerfIdx = strtol(tok.c_str(), (char **)0, 10);
00134 } else if (!tok.beginswith("repeat=")) {
00135
00136 TRACE(XERR, "ignoring unknown option '" << tok << "'");
00137 }
00138 }
00139 }
00140
00141
00142 bool XrdProofWorker::Matches(const char *host)
00143 {
00144
00145
00146
00147 return ((fHost.matches(host)) ? 1 : 0);
00148 }
00149
00150
00151 bool XrdProofWorker::Matches(XrdProofWorker *wrk)
00152 {
00153
00154
00155
00156
00157
00158
00159 if (wrk) {
00160
00161 if (wrk->fHost == fHost) {
00162
00163 int pa = (fPort > 0) ? fPort : XPD_DEF_PORT;
00164 int pb = (wrk->fPort > 0) ? wrk->fPort : XPD_DEF_PORT;
00165 if (pa == pb)
00166 return 1;
00167 }
00168 }
00169
00170
00171 return 0;
00172 }
00173
00174
00175 const char *XrdProofWorker::Export(const char *ord)
00176 {
00177
00178
00179
00180 XPDLOC(NMGR, "Worker::Export")
00181
00182 fExport = fType;
00183
00184
00185 fExport += '|' ;
00186 if (fUser.length() > 0) {
00187 fExport += fUser;
00188 fExport += "@";
00189 }
00190 fExport += fHost;
00191
00192
00193 if (fPort > 0) {
00194 fExport += '|' ;
00195 fExport += fPort;
00196 } else
00197 fExport += "|-";
00198
00199
00200 if (ord && strlen(ord) > 0) {
00201
00202 fExport += '|' ;
00203 fExport += ord;
00204 } else {
00205
00206 fExport += "|-";
00207 }
00208
00209 fExport += "|-";
00210
00211
00212 fExport += '|' ;
00213 fExport += fPerfIdx;
00214
00215
00216 if (fImage.length() > 0) {
00217 fExport += '|' ;
00218 fExport += fImage;
00219 } else
00220 fExport += "|-";
00221
00222
00223 if (fWorkDir.length() > 0) {
00224 fExport += '|' ;
00225 fExport += fWorkDir;
00226 } else
00227 fExport += "|-";
00228
00229
00230 if (fMsd.length() > 0) {
00231 fExport += '|' ;
00232 fExport += fMsd;
00233 } else
00234 fExport += "|-";
00235
00236
00237 TRACE(DBG, "sending: " << fExport);
00238 return fExport.c_str();
00239 }
00240
00241
00242 int XrdProofWorker::GetNActiveSessions()
00243 {
00244
00245
00246
00247
00248
00249 int myRunning = 0;
00250 std::list<XrdProofdProofServ *>::iterator iter;
00251 XrdSysMutexHelper mhp(fMutex);
00252 for (iter = fProofServs.begin(); iter != fProofServs.end(); ++iter) {
00253 if (*iter) {
00254 if ((*iter)->Status() == kXPD_running)
00255 myRunning++;
00256 }
00257 }
00258 return myRunning;
00259 }
00260
00261
00262 void XrdProofWorker::MergeProofServs(const XrdProofWorker &other)
00263 {
00264
00265
00266
00267
00268 std::list<XrdProofdProofServ *>::const_iterator iter;
00269 XrdSysMutexHelper mhp(fMutex);
00270 for (iter = other.fProofServs.begin(); iter != other.fProofServs.end(); ++iter) {
00271 this->fProofServs.push_back(*iter);
00272 }
00273 }
00274
00275
00276 void XrdProofWorker::Sort(std::list<XrdProofWorker *> *lst,
00277 bool (*f)(XrdProofWorker *&lhs, XrdProofWorker *&rhs))
00278 {
00279
00280
00281
00282
00283
00284
00285 if (!lst)
00286 return;
00287
00288
00289 if (lst->size() < 2)
00290 return;
00291
00292
00293 XrdProofWorker **ta = new XrdProofWorker *[lst->size() - 1];
00294 std::list<XrdProofWorker *>::iterator i = lst->begin();
00295 i++;
00296 int n = 0;
00297 for (; i != lst->end(); ++i)
00298 ta[n++] = *i;
00299
00300
00301 XrdProofWorker *tmp = 0;
00302 bool notyet = 1;
00303 int jold = 0;
00304 while (notyet) {
00305 int j = jold;
00306 while (j < n - 1) {
00307 if (f(ta[j], ta[j+1]))
00308 break;
00309 j++;
00310 }
00311 if (j >= n - 1) {
00312 notyet = 0;
00313 } else {
00314 jold = j + 1;
00315 XPDSWAP(ta[j], ta[j+1], tmp);
00316 int k = j;
00317 while (k > 0) {
00318 if (!f(ta[k], ta[k-1])) {
00319 XPDSWAP(ta[k], ta[k-1], tmp);
00320 } else {
00321 break;
00322 }
00323 k--;
00324 }
00325 }
00326 }
00327
00328
00329 XrdProofWorker *mst = lst->front();
00330 lst->clear();
00331 lst->push_back(mst);
00332
00333
00334 while (n--)
00335 lst->push_back(ta[n]);
00336
00337
00338 delete[] ta;
00339 }