00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
// Revision identification string for this translation unit ("$Id: ... $" keyword form).
00015 const char *XrdCmsClientManCVSID = "$Id: XrdCmsClientMan.cc 38011 2011-02-08 18:35:57Z ganis $";
00016
#include <string.h>
#include <time.h>

#include "XrdCms/XrdCmsClientMan.hh"
#include "XrdCms/XrdCmsClientMsg.hh"
#include "XrdCms/XrdCmsLogin.hh"
#include "XrdCms/XrdCmsTrace.hh"

#include "XrdSys/XrdSysError.hh"
#include "XrdSys/XrdSysTimer.hh"

#include "Xrd/XrdInet.hh"
#include "Xrd/XrdLink.hh"
00029
00030 using namespace XrdCms;
00031
00032
00033
00034
00035
// Network object defined elsewhere; Hookup() uses it to connect to the manager.
00036 extern XrdInet *XrdXrootdNetwork;
00037
// Buffer queue shared by all instances; NetBuff is allocated from it.
// NOTE(review): the meaning of the (2048,64) arguments is not visible here -
// presumably buffer size and queue limit; confirm against XrdNetBufferQ.
00038 XrdNetBufferQ XrdCmsClientMan::BuffQ(2048,64);
00039
// Bit mask of per-instance debug flags; Hookup() clears/sets this instance's
// manMask bit under manMutex.
00040 char XrdCmsClientMan::doDebug = 0;
00041
// Not referenced in this part of the file; starts out null.
00042 char *XrdCmsClientMan::ConfigFN = 0;
00043
// Serializes updates to doDebug.
00044 XrdSysMutex XrdCmsClientMan::manMutex;
00045
00046
00047
00048
00049
00050 XrdCmsClientMan::XrdCmsClientMan(char *host, int port,
00051 int cw, int nr, int rw, int rd)
00052 : syncResp(0)
00053 {
00054 static XrdSysMutex initMutex;
00055 static int Instance = 0;
00056 char *dot;
00057
00058 Host = strdup(host);
00059 if ((dot = index(Host, '.')))
00060 {*dot = '\0'; HPfx = strdup(Host); *dot = '.';}
00061 else HPfx = strdup(Host);
00062 Port = port;
00063 Link = 0;
00064 Active = 0;
00065 Silent = 0;
00066 Suspend = 1;
00067 RecvCnt = 0;
00068 nrMax = nr;
00069 NetBuff = BuffQ.Alloc();
00070 repWMax = rw;
00071 repWait = 0;
00072 minDelay= rd;
00073 maxDelay= rd*3;
00074 chkCount= chkVal;
00075 lastUpdt= lastTOut = time(0);
00076
00077
00078
00079 dally = cw / 2 - 1;
00080 if (dally < 3) dally = 3;
00081 else if (dally > 10) dally = 10;
00082
00083
00084
00085 initMutex.Lock();
00086 manMask = 1<<Instance++;
00087 initMutex.UnLock();
00088 }
00089
00090
00091
00092
00093
00094 XrdCmsClientMan::~XrdCmsClientMan()
00095 {
00096 if (Link) Link->Close();
00097 if (Host) free(Host);
00098 if (HPfx) free(HPfx);
00099 if (NetBuff) NetBuff->Recycle();
00100 }
00101
00102
00103
00104
00105
00106 int XrdCmsClientMan::delayResp(XrdOucErrInfo &Resp)
00107 {
00108 XrdCmsResp *rp;
00109 int msgid;
00110
00111
00112
00113 if (!(msgid = Resp.getErrInfo()))
00114 {Say.Emsg("Manager", Host, "supplied invalid waitr msgid");
00115 Resp.setErrInfo(0, "redirector protocol error");
00116 syncResp.Post();
00117 return -EINVAL;
00118 }
00119
00120
00121
00122 if (!(rp = XrdCmsResp::Alloc(&Resp, msgid)))
00123 {Say.Emsg("Manager",ENOMEM,"allocate resp object for",Resp.getErrUser());
00124 Resp.setErrInfo(0, "0");
00125 syncResp.Post();
00126 return -EAGAIN;
00127 }
00128
00129
00130
00131
00132 if (msgid < maxMsgID) RespQ.Purge();
00133 maxMsgID = msgid;
00134 RespQ.Add(rp);
00135
00136
00137
00138
00139
00140 Resp.setErrInfo(0, "");
00141 syncResp.Post();
00142 return -EINPROGRESS;
00143 }
00144
00145
00146
00147
00148
00149 int XrdCmsClientMan::Send(char *msg, int mlen)
00150 {
00151 int allok = 0;
00152
00153
00154
00155 if (!mlen) mlen = strlen(msg);
00156
00157
00158
00159 if (Active)
00160 {myData.Lock();
00161 if (Link)
00162 {if (!(allok = Link->Send(msg, mlen) > 0))
00163 {Active = 0;
00164 Link->Close(1);
00165 } else SendCnt++;
00166 }
00167 myData.UnLock();
00168 }
00169
00170
00171
00172 return allok;
00173 }
00174
00175
00176
00177 int XrdCmsClientMan::Send(const struct iovec *iov, int iovcnt, int iotot)
00178 {
00179 int allok = 0;
00180
00181
00182
00183 if (Active)
00184 {myData.Lock();
00185 if (Link)
00186 {if (!(allok = Link->Send(iov, iovcnt, iotot) > 0))
00187 {Active = 0;
00188 Link->Close(1);
00189 } else SendCnt++;
00190 }
00191 myData.UnLock();
00192 }
00193
00194
00195
00196 return allok;
00197 }
00198
00199
00200
00201
00202
00203 void *XrdCmsClientMan::Start()
00204 {
00205
00206
00207
00208 do {Hookup();
00209
00210
00211
00212
00213
00214
00215
00216
00217 while(Receive())
00218 if (Response.modifier & CmsResponse::kYR_async) relayResp();
00219 else if (Response.rrCode == kYR_status) setStatus();
00220 else if (XrdCmsClientMsg::Reply(HPfx, Response, NetBuff))
00221 {if (Response.rrCode == kYR_waitresp) syncResp.Wait();}
00222
00223
00224
00225 myData.Lock();
00226 if (Link) {Link->Close(); Link = 0;}
00227 Active = 0; Suspend = 1;
00228 myData.UnLock();
00229
00230
00231
00232 Say.Emsg("ClientMan", "Disconnected from", Host);
00233 XrdSysTimer::Snooze(dally);
00234 } while(1);
00235
00236
00237
00238 return (void *)0;
00239 }
00240
00241
00242
00243
00244
00245 int XrdCmsClientMan::whatsUp(const char *user, const char *path)
00246 {
00247 EPNAME("whatsUp");
00248 int theDelay, inQ;
00249
00250
00251
00252
00253 myData.Lock();
00254 if (Active)
00255 {if (Active == RecvCnt)
00256 {if ((time(0)-lastTOut) >= repWait)
00257 {Silent++;
00258 if (Silent > nrMax)
00259 {Active = 0; Silent = 0; Suspend = 1;
00260 if (Link) Link->Close(1);
00261 } else if (Silent & 0x02 && repWait < repWMax) repWait++;
00262 }
00263 } else {Active = RecvCnt; Silent = 0; lastTOut = time(0);}
00264 }
00265
00266
00267
00268
00269 inQ = XrdCmsClientMsg::inQ();
00270 theDelay = inQ * qTime;
00271 myData.UnLock();
00272 theDelay = theDelay/1000 + (theDelay % 1000 ? 1 : 0);
00273 if (theDelay < minDelay) return minDelay;
00274 if (theDelay > maxDelay) return maxDelay;
00275
00276
00277
00278 TRACE(Redirect, user <<" no resp from " <<HPfx <<"; inQ " <<inQ <<" wait " <<theDelay <<" path=" <<path);
00279 return theDelay;
00280 }
00281
00282
00283
00284
00285
00286
00287
00288
00289 int XrdCmsClientMan::Hookup()
00290 {
00291 EPNAME("Hookup");
00292 CmsLoginData Data;
00293 XrdLink *lp;
00294 char buff[256];
00295 int rc, oldWait, tries = 12, opts = 0;
00296
00297
00298
00299 manMutex.Lock();
00300 doDebug &= ~manMask;
00301 manMutex.UnLock();
00302
00303
00304
00305
00306 do {while(!(lp = XrdXrootdNetwork->Connect(Host, Port, opts)))
00307 {XrdSysTimer::Snooze(dally);
00308 if (tries--) opts = XRDNET_NOEMSG;
00309 else {opts = 0; tries = 12;}
00310 continue;
00311 }
00312 lp->Bind(XrdSysThread::ID());
00313 memset(&Data, 0, sizeof(Data));
00314 Data.Mode = CmsLoginData::kYR_director;
00315 Data.HoldTime = static_cast<int>(getpid());
00316 if (!(rc = XrdCmsLogin::Login(lp, Data))) break;
00317 lp->Close();
00318 XrdSysTimer::Snooze(dally);
00319 } while(1);
00320
00321
00322
00323 manMutex.Lock();
00324 doDebug |= (Data.Mode & CmsLoginData::kYR_debug ? manMask : 0);
00325 manMutex.UnLock();
00326
00327
00328
00329 myData.Lock();
00330 Link = lp;
00331 Active = 1;
00332 Silent = 0;
00333 RecvCnt = 1;
00334 SendCnt = 1;
00335 Suspend = (Data.Mode & CmsLoginData::kYR_suspend);
00336
00337
00338
00339
00340 if ((oldWait = (repWait*20/100)) < 2) oldWait = 2;
00341 if (Data.HoldTime > repWMax*1000) repWait = repWMax;
00342 else if (Data.HoldTime <= 0) repWait = repWMax;
00343 else {repWait = Data.HoldTime*3;
00344 repWait = (repWait/1000) + (repWait % 1000 ? 1 : 0);
00345 if (repWait > repWMax) repWait = repWMax;
00346 else if (repWait < oldWait) repWait = oldWait;
00347 }
00348 qTime = (Data.HoldTime < 100 ? 100 : Data.HoldTime);
00349 lastTOut = time(0);
00350 myData.UnLock();
00351
00352
00353
00354 sprintf(buff, "v %d", Data.Version);
00355 Say.Emsg("ClientMan", (Suspend ? "Connected to suspended" : "Connected to"),
00356 Host, buff);
00357 DEBUG(Host <<" qt=" <<qTime <<"ms rw=" <<repWait);
00358 return 1;
00359 }
00360
00361
00362
00363
00364
00365 int XrdCmsClientMan::Receive()
00366 {
00367
00368
00369
00370
00371
00372 EPNAME("Receive")
00373 if (Link->RecvAll((char *)&Response, sizeof(Response)) > 0)
00374 {int dlen = static_cast<int>(ntohs(Response.datalen));
00375 RecvCnt++; NetBuff->dlen = dlen;
00376 DEBUG(Link->Name() <<' ' <<dlen <<" bytes on " <<Response.streamid);
00377 if (!dlen) return 1;
00378 if (dlen > NetBuff->BuffSize())
00379 Say.Emsg("ClientMan", "Excessive msg length from", Host);
00380 else return Link->RecvAll(NetBuff->data, dlen);
00381 }
00382 return 0;
00383 }
00384
00385
00386
00387
00388
00389 void XrdCmsClientMan::relayResp()
00390 {
00391 EPNAME("relayResp");
00392 XrdCmsResp *rp;
00393
00394
00395
00396 if (!(rp = RespQ.Rem(Response.streamid)))
00397 {DEBUG(Host <<" replied to non-existent request; id=" <<Response.streamid);
00398 return;
00399 }
00400
00401
00402
00403 rp->Reply(HPfx, Response, NetBuff);
00404
00405
00406
00407 NetBuff = BuffQ.Alloc();
00408 }
00409
00410
00411
00412
00413
00414 void XrdCmsClientMan::chkStatus()
00415 {
00416 static CmsUpdateRequest Updt = {{0, kYR_update, 0, 0}};
00417 time_t nowTime;
00418
00419
00420
00421 myData.Lock();
00422 if (!chkCount--)
00423 {chkCount = chkVal;
00424 nowTime = time(0);
00425 if ((nowTime - lastUpdt) >= 30)
00426 {lastUpdt = nowTime;
00427 if (Active) Link->Send((char *)&Updt, sizeof(Updt));
00428 }
00429 }
00430 myData.UnLock();
00431 }
00432
00433
00434
00435
00436
00437 void XrdCmsClientMan::setStatus()
00438 {
00439 EPNAME("setStatus");
00440 const char *State = 0, *Event = "?";
00441
00442
00443 myData.Lock();
00444 if (Response.modifier & CmsStatusRequest::kYR_Suspend)
00445 {Event = "suspend";
00446 if (!Suspend) {Suspend = 1; State = "suspended";}
00447 }
00448 else if (Response.modifier & CmsStatusRequest::kYR_Resume)
00449 {Event = "resume";
00450 if (Suspend) {Suspend = 0; State = "resumed";}
00451 }
00452 myData.UnLock();
00453
00454 DEBUG(Host <<" sent " <<Event <<" event");
00455 if (State) Say.Emsg("setStatus", "Manager", Host, State);
00456 }