00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include "XrdProofdPlatform.h"
00023
00024 #ifdef OLDXRDOUC
00025 # include "XrdOuc/XrdOucError.hh"
00026 # include "XrdOuc/XrdOucLogger.hh"
00027 #else
00028 # include "XrdSys/XrdSysError.hh"
00029 # include "XrdSys/XrdSysLogger.hh"
00030 #endif
00031 #include "XrdSys/XrdSysPriv.hh"
00032 #include "XrdOuc/XrdOucStream.hh"
00033
00034 #include "XrdVersion.hh"
00035 #include "Xrd/XrdBuffer.hh"
00036 #include "XrdNet/XrdNetDNS.hh"
00037
00038 #include "XrdProofdClient.h"
00039 #include "XrdProofdClientMgr.h"
00040 #include "XrdProofdConfig.h"
00041 #include "XrdProofdManager.h"
00042 #include "XrdProofdNetMgr.h"
00043 #include "XrdProofdPriorityMgr.h"
00044 #include "XrdProofdProofServMgr.h"
00045 #include "XrdProofdProtocol.h"
00046 #include "XrdProofdResponse.h"
00047 #include "XrdProofdProofServ.h"
00048 #include "XrdProofSched.h"
00049
00050
00051 #include "XrdProofdTrace.h"
00052 XrdOucTrace *XrdProofdTrace = 0;
00053
00054
00055 static XrdSysLogger gMainLogger;
00056
00057
00058
00059 int XrdProofdProtocol::fgCount = 0;
00060 XrdObjectQ<XrdProofdProtocol>
00061 XrdProofdProtocol::fgProtStack("ProtStack",
00062 "xproofd protocol anchor");
00063 XrdSysRecMutex XrdProofdProtocol::fgBMutex;
00064 XrdBuffManager *XrdProofdProtocol::fgBPool = 0;
00065 int XrdProofdProtocol::fgMaxBuffsz= 0;
00066 XrdSysError XrdProofdProtocol::fgEDest(0, "xpd");
00067 XrdSysLogger *XrdProofdProtocol::fgLogger = 0;
00068
00069
00070 bool XrdProofdProtocol::fgConfigDone = 0;
00071
00072 int XrdProofdProtocol::fgReadWait = 0;
00073
00074 XrdProofdManager *XrdProofdProtocol::fgMgr = 0;
00075
00076
00077 int XrdProofdProtocol::fgEUidAtStartup = -1;
00078
00079
00080 #define MAX_ARGS 128
00081
00082
00083 #ifndef XPDCOND
00084 #define XPDCOND(n,ns) ((n == -1 && ns == -1) || (n > 0 && n >= ns))
00085 #endif
00086 #ifndef XPDSETSTRING
00087 #define XPDSETSTRING(n,ns,c,s) \
00088 { if (XPDCOND(n,ns)) { \
00089 SafeFree(c); c = strdup(s.c_str()); ns = n; }}
00090 #endif
00091
00092 #ifndef XPDADOPTSTRING
00093 #define XPDADOPTSTRING(n,ns,c,s) \
00094 { char *t = 0; \
00095 XPDSETSTRING(n, ns, t, s); \
00096 if (t && strlen(t)) { \
00097 SafeFree(c); c = t; \
00098 } else \
00099 SafeFree(t); }
00100 #endif
00101
00102 #ifndef XPDSETINT
00103 #define XPDSETINT(n,ns,i,s) \
00104 { if (XPDCOND(n,ns)) { \
00105 i = strtol(s.c_str(),0,10); ns = n; }}
00106 #endif
00107
00108 typedef struct {
00109 kXR_int32 ptyp;
00110 kXR_int32 rlen;
00111 kXR_int32 pval;
00112 kXR_int32 styp;
00113 } hs_response_t;
00114
00115 typedef struct ResetCtrlcGuard {
00116 XrdProofdProtocol *xpd;
00117 int type;
00118 ResetCtrlcGuard(XrdProofdProtocol *p, int t) : xpd(p), type(t) { }
00119 ~ResetCtrlcGuard() { if (xpd && type != kXP_ctrlc) xpd->ResetCtrlC(); }
00120 } ResetCtrlcGuard_t;
00121
00122
00123
00124 class XrdProofdProtCfg : public XrdProofdConfig {
00125 public:
00126 int fPort;
00127 XrdProofdProtCfg(const char *cfg, XrdSysError *edest = 0);
00128 int DoDirective(XrdProofdDirective *, char *, XrdOucStream *, bool);
00129 void RegisterDirectives();
00130 };
00131
00132
00133 XrdProofdProtCfg::XrdProofdProtCfg(const char *cfg, XrdSysError *edest)
00134 : XrdProofdConfig(cfg, edest)
00135 {
00136
00137
00138 fPort = -1;
00139 RegisterDirectives();
00140 }
00141
00142
00143 void XrdProofdProtCfg::RegisterDirectives()
00144 {
00145
00146
00147 Register("port", new XrdProofdDirective("port", this, &DoDirectiveClass));
00148 Register("xrd.protocol", new XrdProofdDirective("xrd.protocol", this, &DoDirectiveClass));
00149 }
00150
00151
00152 int XrdProofdProtCfg::DoDirective(XrdProofdDirective *d,
00153 char *val, XrdOucStream *cfg, bool)
00154 {
00155
00156
00157 if (!d) return -1;
00158
00159 XrdOucString port(val);
00160 if (d->fName == "xrd.protocol") {
00161 port = cfg->GetWord();
00162 port.replace("xproofd:", "");
00163 } else if (d->fName != "port") {
00164 return -1;
00165 }
00166 if (port.length() > 0) {
00167 fPort = strtol(port.c_str(), 0, 10);
00168 }
00169 fPort = (fPort < 0) ? XPD_DEF_PORT : fPort;
00170 return 0;
00171 }
00172
00173
00174 extern "C" {
00175
00176 XrdProtocol *XrdgetProtocol(const char *, char *parms, XrdProtocol_Config *pi)
00177 {
00178
00179
00180
00181
00182
00183 if (XrdProofdProtocol::Configure(parms, pi)) {
00184
00185 return (XrdProtocol *) new XrdProofdProtocol();
00186 }
00187 return (XrdProtocol *)0;
00188 }
00189
00190
00191 int XrdgetProtocolPort(const char * , char * , XrdProtocol_Config *pi)
00192 {
00193
00194
00195
00196 XrdProofdProtCfg pcfg(pi->ConfigFN, pi->eDest);
00197
00198 XrdProofdTrace = new XrdOucTrace(pi->eDest);
00199 pcfg.Config(0);
00200
00201
00202 int port = XPD_DEF_PORT;
00203
00204 if (pcfg.fPort > 0) {
00205 port = pcfg.fPort;
00206 } else {
00207 port = (pi && pi->Port > 0) ? pi->Port : XPD_DEF_PORT;
00208 }
00209
00210 return port;
00211 }}
00212
00213
00214 XrdProofdProtocol::XrdProofdProtocol()
00215 : XrdProtocol("xproofd protocol handler"), fProtLink(this)
00216 {
00217
00218 fLink = 0;
00219 fArgp = 0;
00220 fPClient = 0;
00221 fSecClient = 0;
00222 fAuthProt = 0;
00223 fResponses.reserve(10);
00224
00225
00226 Reset();
00227 }
00228
00229
00230 XrdProofdResponse *XrdProofdProtocol::Response(kXR_unt16 sid)
00231 {
00232
00233 XPDLOC(ALL, "Protocol::Response")
00234
00235 TRACE(HDBG, "sid: "<<sid<<", size: "<<fResponses.size());
00236
00237 if (sid > 0)
00238 if (sid <= fResponses.size())
00239 return fResponses[sid-1];
00240
00241 return (XrdProofdResponse *)0;
00242 }
00243
00244
00245 XrdProofdResponse *XrdProofdProtocol::GetNewResponse(kXR_unt16 sid)
00246 {
00247
00248 XPDLOC(ALL, "Protocol::GetNewResponse")
00249
00250 XrdOucString msg;
00251 XPDFORM(msg, "sid: %d", sid);
00252 if (sid > 0) {
00253 if (sid > fResponses.size()) {
00254 if (sid > fResponses.capacity()) {
00255 int newsz = (sid < 2 * fResponses.capacity()) ? 2 * fResponses.capacity() : sid+1 ;
00256 fResponses.reserve(newsz);
00257 if (TRACING(DBG)) {
00258 msg += " new capacity: ";
00259 msg += (int) fResponses.capacity();
00260 }
00261 }
00262 int nnew = sid - fResponses.size();
00263 while (nnew--)
00264 fResponses.push_back(new XrdProofdResponse());
00265 if (TRACING(DBG)) {
00266 msg += "; new size: ";
00267 msg += (int) fResponses.size();
00268 }
00269 }
00270 } else {
00271 TRACE(XERR,"wrong sid: "<<sid);
00272 return (XrdProofdResponse *)0;
00273 }
00274
00275 TRACE(DBG, msg);
00276
00277
00278 return fResponses[sid-1];
00279 }
00280
00281
00282 XrdProtocol *XrdProofdProtocol::Match(XrdLink *lp)
00283 {
00284
00285
00286 struct ClientInitHandShake hsdata;
00287 char *hsbuff = (char *)&hsdata;
00288
00289 static hs_response_t hsresp = {0, 0, htonl(XPROOFD_VERSBIN), 0};
00290
00291 XrdProofdProtocol *xp;
00292 int dlen;
00293
00294
00295 if ((dlen = lp->Peek(hsbuff,sizeof(hsdata),fgReadWait)) != sizeof(hsdata)) {
00296 if (dlen <= 0) lp->setEtext("Match: handshake not received");
00297 return (XrdProtocol *)0;
00298 }
00299
00300
00301 hsdata.third = ntohl(hsdata.third);
00302 if (dlen != sizeof(hsdata) || hsdata.first || hsdata.second
00303 || !(hsdata.third == 1) || hsdata.fourth || hsdata.fifth) return 0;
00304
00305
00306 if (!lp->Send((char *)&hsresp, sizeof(hsresp))) {
00307 lp->setEtext("Match: handshake failed");
00308 return (XrdProtocol *)0;
00309 }
00310
00311
00312 int len = sizeof(hsdata);
00313 if (lp->Recv(hsbuff, len) != len) {
00314 lp->setEtext("Match: reread failed");
00315 return (XrdProtocol *)0;
00316 }
00317
00318
00319 if (!(xp = fgProtStack.Pop()))
00320 xp = new XrdProofdProtocol();
00321
00322
00323 xp->fLink = lp;
00324 strcpy(xp->fSecEntity.prot, "host");
00325 xp->fSecEntity.host = strdup((char *)lp->Host());
00326
00327
00328 kXR_int32 dum[2];
00329 if (xp->GetData("dummy",(char *)&dum[0],sizeof(dum)) != 0) {
00330 xp->Recycle(0,0,0);
00331 return (XrdProtocol *)0;
00332 }
00333
00334
00335 return (XrdProtocol *)xp;
00336 }
00337
00338
00339 int XrdProofdProtocol::Stats(char *buff, int blen, int)
00340 {
00341
00342
00343
00344 static char statfmt[] = "<stats id=\"xproofd\"><num>%ld</num></stats>";
00345
00346
00347 if (!buff)
00348 return sizeof(statfmt)+16;
00349
00350
00351 return snprintf(buff, blen, statfmt, fgCount);
00352 }
00353
00354
00355 void XrdProofdProtocol::Reset()
00356 {
00357
00358
00359
00360 fLink = 0;
00361 fPid = -1;
00362 fArgp = 0;
00363 fStatus = 0;
00364 fClntCapVer = 0;
00365 fConnType = kXPD_ClientMaster;
00366 fSuperUser = 0;
00367 fPClient = 0;
00368 fCID = -1;
00369 fTraceID = "";
00370 fAdminPath = "";
00371 if (fAuthProt) {
00372 fAuthProt->Delete();
00373 fAuthProt = 0;
00374 }
00375 memset(&fSecEntity, 0, sizeof(fSecEntity));
00376
00377 std::vector<XrdProofdResponse *>::iterator ii = fResponses.begin();
00378 while (ii != fResponses.end()) {
00379 delete *ii;
00380 ii++;
00381 }
00382 fResponses.clear();
00383 }
00384
00385
00386 int XrdProofdProtocol::Configure(char *, XrdProtocol_Config *pi)
00387 {
00388
00389
00390
00391 XPDLOC(ALL, "Protocol::Configure")
00392
00393 XrdOucString mp;
00394
00395
00396 if (fgConfigDone)
00397 return 1;
00398 fgConfigDone = 1;
00399
00400
00401 fgLogger = pi->eDest->logger();
00402 fgEDest.logger(fgLogger);
00403 if (XrdProofdTrace) delete XrdProofdTrace;
00404 XrdProofdTrace = new XrdOucTrace(&fgEDest);
00405 fgBPool = pi->BPool;
00406 fgReadWait = pi->readWait;
00407
00408
00409 fgMaxBuffsz = fgBPool->MaxSize();
00410
00411
00412
00413
00414 fgProtStack.Set(pi->Sched, XrdProofdTrace, TRACE_MEM);
00415 fgProtStack.Set((pi->ConnMax/3 ? pi->ConnMax/3 : 30), 60*60);
00416
00417
00418
00419
00420
00421 XrdProofdTrace->What = TRACE_DOMAINS;
00422 TRACESET(XERR, 1);
00423 TRACESET(LOGIN, 1);
00424 TRACESET(RSP, 0);
00425 if (pi->DebugON)
00426 XrdProofdTrace->What |= (TRACE_REQ | TRACE_FORK);
00427
00428
00429
00430 fgEUidAtStartup = geteuid();
00431 if (!getuid()) XrdSysPriv::ChangePerm((uid_t)0, (gid_t)0);
00432
00433
00434
00435 fgMgr = new XrdProofdManager(pi, &fgEDest);
00436 if (fgMgr->Config(0)) return 0;
00437 mp = "global manager created";
00438 TRACE(ALL, mp);
00439
00440
00441 TRACE(ALL, "xproofd protocol version "<<XPROOFD_VERSION<<
00442 " build "<<XrdVERSION<<" successfully loaded");
00443
00444
00445 return 1;
00446 }
00447
00448
00449 int XrdProofdProtocol::Process(XrdLink *)
00450 {
00451
00452
00453 XPDLOC(ALL, "Protocol::Process")
00454
00455 int rc = 0;
00456 TRACEP(this, DBG, "instance: " << this);
00457
00458
00459 if ((rc = GetData("request", (char *)&fRequest, sizeof(fRequest))) != 0)
00460 return rc;
00461 TRACEP(this, HDBG, "after GetData: rc: " << rc);
00462
00463
00464 fRequest.header.requestid = ntohs(fRequest.header.requestid);
00465 fRequest.header.dlen = ntohl(fRequest.header.dlen);
00466
00467
00468 kXR_unt16 sid;
00469 memcpy((void *)&sid, (const void *)&(fRequest.header.streamid[0]), 2);
00470 XrdProofdResponse *response = 0;
00471 if (!(response = Response(sid))) {
00472 if (!(response = GetNewResponse(sid))) {
00473 TRACEP(this, XERR, "could not get Response instance for rid: "<< sid);
00474 return rc;
00475 }
00476 }
00477
00478 response->Set(fRequest.header.streamid);
00479 response->Set(fLink);
00480
00481 TRACEP(this, REQ, "sid: " << sid << ", req id: " << fRequest.header.requestid <<
00482 " (" << XrdProofdAux::ProofRequestTypes(fRequest.header.requestid)<<
00483 ")" << ", dlen: " <<fRequest.header.dlen);
00484
00485
00486
00487 if (fRequest.header.dlen < 0) {
00488 response->Send(kXR_ArgInvalid, "Process: Invalid request data length");
00489 return fLink->setEtext("Process: protocol data length error");
00490 }
00491
00492
00493
00494
00495 if (fRequest.header.requestid != kXP_sendmsg && fRequest.header.dlen) {
00496 if ((fArgp = GetBuff(fRequest.header.dlen+1, fArgp)) == 0) {
00497 response->Send(kXR_ArgTooLong, "fRequest.argument is too long");
00498 return rc;
00499 }
00500 if ((rc = GetData("arg", fArgp->buff, fRequest.header.dlen)))
00501 return rc;
00502 fArgp->buff[fRequest.header.dlen] = '\0';
00503 }
00504
00505
00506 return Process2();
00507 }
00508
00509
00510 int XrdProofdProtocol::Process2()
00511 {
00512
00513
00514 XPDLOC(ALL, "Protocol::Process2")
00515
00516 int rc = 0;
00517 XPD_SETRESP(this, "Process2");
00518
00519 TRACEP(this, REQ, "req id: " << fRequest.header.requestid << " (" <<
00520 XrdProofdAux::ProofRequestTypes(fRequest.header.requestid) << ")");
00521
00522 ResetCtrlcGuard_t ctrlcguard(this, fRequest.header.requestid);
00523
00524
00525 if (fStatus && (fStatus & XPD_LOGGEDIN)) {
00526
00527 TouchAdminPath();
00528
00529 if (!fPClient) {
00530 TRACEP(this, XERR, "client undefined!!! ");
00531 response->Send(kXR_InvalidRequest,"client undefined!!! ");
00532 return 0;
00533 }
00534 bool formgr = 0;
00535 switch(fRequest.header.requestid) {
00536 case kXP_ctrlc:
00537 rc = CtrlC();
00538 break;
00539 case kXP_touch:
00540
00541 fPClient->Touch(1);
00542 break;
00543 case kXP_interrupt:
00544 rc = Interrupt();
00545 break;
00546 case kXP_ping:
00547 rc = Ping();
00548 break;
00549 case kXP_sendmsg:
00550 rc = SendMsg();
00551 break;
00552 case kXP_urgent:
00553 rc = Urgent();
00554 break;
00555 default:
00556 formgr = 1;
00557 }
00558 if (!formgr) {
00559
00560 if (!fLink || (fLink->FDnum() <= 0)) {
00561 TRACE(XERR, "link is undefined! ");
00562 return -1;
00563 }
00564 return rc;
00565 }
00566 }
00567
00568
00569 rc = fgMgr->Process(this);
00570
00571 if (!fLink || (fLink->FDnum() <= 0)) {
00572 TRACE(XERR, "link is undefined! ");
00573 return -1;
00574 }
00575 return rc;
00576 }
00577
00578
00579 void XrdProofdProtocol::Recycle(XrdLink *, int, const char *)
00580 {
00581
00582 XPDLOC(ALL, "Protocol::Recycle")
00583
00584 const char *srvtype[6] = {"ANY", "MasterWorker", "MasterMaster",
00585 "ClientMaster", "Internal", "Admin"};
00586 XrdOucString buf;
00587
00588
00589 if (fPClient)
00590 XPDFORM(buf, "user %s disconnected; type: %s", fPClient->User(),
00591 srvtype[fConnType+1]);
00592 else
00593 XPDFORM(buf, "user disconnected; type: %s", srvtype[fConnType+1]);
00594 TRACEP(this, LOGIN, buf);
00595
00596
00597 if (fArgp) {
00598 fgBPool->Release(fArgp);
00599 fArgp = 0;
00600 }
00601
00602
00603 XrdProofdClient *pmgr = fPClient;
00604
00605 if (pmgr) {
00606
00607
00608 if (!Internal()) {
00609
00610
00611 if (fgMgr && fgMgr->ClientMgr()) {
00612 TRACE(HDBG, "fAdminPath: "<<fAdminPath);
00613 XPDFORM(buf, "%s %p %d %d", fAdminPath.c_str(), pmgr, fCID, fPid);
00614 TRACE(DBG, "sending to ClientMgr: "<<buf);
00615 fgMgr->ClientMgr()->Pipe()->Post(XrdProofdClientMgr::kClientDisconnect, buf.c_str());
00616 }
00617
00618 } else {
00619
00620
00621
00622
00623 if (fgMgr && fgMgr->SessionMgr()) {
00624 TRACE(HDBG, "fAdminPath: "<<fAdminPath);
00625 buf.assign(fAdminPath, fAdminPath.rfind('/') + 1, -1);
00626 TRACE(DBG, "sending to ProofServMgr: "<<buf);
00627 fgMgr->SessionMgr()->Pipe()->Post(XrdProofdProofServMgr::kSessionRemoval, buf.c_str());
00628 }
00629 }
00630 }
00631
00632
00633 Reset();
00634
00635
00636 fgProtStack.Push(&fProtLink);
00637 }
00638
00639
00640 XrdBuffer *XrdProofdProtocol::GetBuff(int quantum, XrdBuffer *argp)
00641 {
00642
00643
00644 XPDLOC(ALL, "Protocol::GetBuff")
00645
00646 TRACE(HDBG, "len: "<<quantum);
00647
00648
00649
00650 if (argp) {
00651 if (quantum >= argp->bsize / 2 && quantum <= argp->bsize)
00652 return argp;
00653 }
00654
00655
00656 XrdSysMutexHelper mh(fgBMutex);
00657 if (argp)
00658 fgBPool->Release(argp);
00659
00660
00661 if ((argp = fgBPool->Obtain(quantum)) == 0) {
00662 TRACE(XERR, "could not get requested buffer (size: "<<quantum<<
00663 ") = insufficient memory");
00664 } else {
00665 TRACE(HDBG, "quantum: "<<quantum<<
00666 ", buff: "<<(void *)(argp->buff)<<", bsize:"<<argp->bsize);
00667 }
00668
00669
00670 return argp;
00671 }
00672
00673
00674 void XrdProofdProtocol::ReleaseBuff(XrdBuffer *argp)
00675 {
00676
00677
00678 XrdSysMutexHelper mh(fgBMutex);
00679 fgBPool->Release(argp);
00680 }
00681
00682
00683 int XrdProofdProtocol::GetData(const char *dtype, char *buff, int blen)
00684 {
00685
00686 XPDLOC(ALL, "Protocol::GetData")
00687
00688 int rlen;
00689
00690
00691
00692 TRACEP(this, HDBG, "dtype: "<<(dtype ? dtype : " - ")<<", blen: "<<blen);
00693
00694
00695 rlen = fLink->Recv(buff, blen, fgReadWait);
00696 if (rlen < 0) {
00697 if (rlen != -ENOMSG && rlen != -ECONNRESET) {
00698 XrdOucString emsg = "link read error: errno: ";
00699 emsg += -rlen;
00700 TRACEP(this, XERR, emsg.c_str());
00701 return (fLink ? fLink->setEtext(emsg.c_str()) : -1);
00702 } else {
00703 TRACEP(this, HDBG, "connection closed by peer (errno: "<<-rlen<<")");
00704 return -1;
00705 }
00706 }
00707 if (rlen < blen) {
00708 TRACEP(this, DBG, dtype << " timeout; read " <<rlen <<" of " <<blen <<" bytes - rescheduling");
00709 return 1;
00710 }
00711 TRACEP(this, HDBG, "rlen: "<<rlen);
00712
00713 return 0;
00714 }
00715
00716
00717 int XrdProofdProtocol::SendData(XrdProofdProofServ *xps,
00718 kXR_int32 sid, XrdSrvBuffer **buf, bool savebuf)
00719 {
00720
00721 XPDLOC(ALL, "Protocol::SendData")
00722
00723 int rc = 0;
00724
00725 TRACEP(this, HDBG, "length: "<<fRequest.header.dlen<<" bytes ");
00726
00727
00728 int len = fRequest.header.dlen;
00729
00730
00731 int quantum = (len > fgMaxBuffsz ? fgMaxBuffsz : len);
00732
00733
00734 XrdBuffer *argp = XrdProofdProtocol::GetBuff(quantum);
00735 if (!argp) return -1;
00736
00737
00738 XrdOucString msg;
00739 while (len > 0) {
00740
00741 XrdProofdResponse *response = (sid > -1) ? xps->Response() : 0;
00742
00743 if ((rc = GetData("data", argp->buff, quantum))) {
00744 { XrdSysMutexHelper mh(fgBMutex); fgBPool->Release(argp); }
00745 return -1;
00746 }
00747 if (buf && !(*buf) && savebuf)
00748 *buf = new XrdSrvBuffer(argp->buff, quantum, 1);
00749
00750 if (sid > -1) {
00751 if (TRACING(HDBG))
00752 XPDFORM(msg, "EXT: server ID: %d, sending: %d bytes", sid, quantum);
00753 if (!response || response->Send(kXR_attn, kXPD_msgsid, sid,
00754 argp->buff, quantum) != 0) {
00755 { XrdSysMutexHelper mh(fgBMutex); fgBPool->Release(argp); }
00756 XPDFORM(msg, "EXT: server ID: %d, problems sending: %d bytes to server",
00757 sid, quantum);
00758 TRACEP(this, XERR, msg);
00759 return -1;
00760 }
00761 } else {
00762
00763
00764 int cid = ntohl(fRequest.sendrcv.cid);
00765 if (TRACING(HDBG))
00766 XPDFORM(msg, "INT: client ID: %d, sending: %d bytes", cid, quantum);
00767 if (xps->SendData(cid, argp->buff, quantum) != 0) {
00768 { XrdSysMutexHelper mh(fgBMutex); fgBPool->Release(argp); }
00769 XPDFORM(msg, "INT: client ID: %d, problems sending: %d bytes to client",
00770 cid, quantum);
00771 TRACEP(this, XERR, msg);
00772 return -1;
00773 }
00774 }
00775 TRACEP(this, HDBG, msg);
00776
00777 len -= quantum;
00778 if (len < quantum)
00779 quantum = len;
00780 }
00781
00782
00783 { XrdSysMutexHelper mh(fgBMutex); fgBPool->Release(argp); }
00784
00785
00786 return 0;
00787 }
00788
00789
00790 int XrdProofdProtocol::SendDataN(XrdProofdProofServ *xps,
00791 XrdSrvBuffer **buf, bool savebuf)
00792 {
00793
00794
00795
00796 XPDLOC(ALL, "Protocol::SendDataN")
00797
00798 int rc = 0;
00799
00800 TRACEP(this, HDBG, "length: "<<fRequest.header.dlen<<" bytes ");
00801
00802
00803 int len = fRequest.header.dlen;
00804
00805
00806 int quantum = (len > fgMaxBuffsz ? fgMaxBuffsz : len);
00807
00808
00809 XrdBuffer *argp = XrdProofdProtocol::GetBuff(quantum);
00810 if (!argp) return -1;
00811
00812
00813 while (len > 0) {
00814 if ((rc = GetData("data", argp->buff, quantum))) {
00815 XrdProofdProtocol::ReleaseBuff(argp);
00816 return -1;
00817 }
00818 if (buf && !(*buf) && savebuf)
00819 *buf = new XrdSrvBuffer(argp->buff, quantum, 1);
00820
00821
00822 if (xps->SendDataN(argp->buff, quantum) != 0) {
00823 XrdProofdProtocol::ReleaseBuff(argp);
00824 return -1;
00825 }
00826
00827
00828 len -= quantum;
00829 if (len < quantum)
00830 quantum = len;
00831 }
00832
00833
00834 XrdProofdProtocol::ReleaseBuff(argp);
00835
00836
00837 return 0;
00838 }
00839
00840
00841 int XrdProofdProtocol::SendMsg()
00842 {
00843
00844 XPDLOC(ALL, "Protocol::SendMsg")
00845
00846 static const char *crecv[5] = {"master proofserv", "top master",
00847 "client", "undefined", "any"};
00848 int rc = 0;
00849
00850 XPD_SETRESP(this, "SendMsg");
00851
00852
00853 int psid = ntohl(fRequest.sendrcv.sid);
00854 int opt = ntohl(fRequest.sendrcv.opt);
00855
00856 XrdOucString msg;
00857
00858 XrdProofdProofServ *xps = 0;
00859 if (!fPClient || !(xps = fPClient->GetServer(psid))) {
00860 XPDFORM(msg, "%s: session ID not found: %d", (Internal() ? "INT" : "EXT"), psid);
00861 TRACEP(this, XERR, msg.c_str());
00862 response->Send(kXR_InvalidRequest, msg.c_str());
00863 return 0;
00864 }
00865
00866
00867 int len = fRequest.header.dlen;
00868
00869 if (!Internal()) {
00870
00871 if (TRACING(HDBG)) {
00872
00873 XPDFORM(msg, "EXT: sending %d bytes to proofserv (psid: %d, xps: %p, status: %d,"
00874 " cid: %d)", len, psid, xps, xps->Status(), fCID);
00875 TRACEP(this, HDBG, msg.c_str());
00876 }
00877
00878
00879 if (fCID == -1) {
00880 TRACEP(this, REQ, "EXT: error getting clientSID");
00881 response->Send(kXP_ServerError,"EXT: getting clientSID");
00882 return 0;
00883 }
00884 if (SendData(xps, fCID)) {
00885 TRACEP(this, REQ, "EXT: error sending message to proofserv");
00886 response->Send(kXP_reconnecting,"EXT: sending message to proofserv");
00887 return 0;
00888 }
00889
00890
00891 response->Send();
00892
00893 } else {
00894
00895 if (TRACING(HDBG)) {
00896
00897 XPDFORM(msg, "INT: sending %d bytes to client/master (psid: %d, xps: %p, status: %d)",
00898 len, psid, xps, xps->Status());
00899 TRACEP(this, HDBG, msg.c_str());
00900 }
00901 bool saveStartMsg = 0;
00902 XrdSrvBuffer *savedBuf = 0;
00903
00904 if (opt & kXPD_setidle) {
00905 TRACEP(this, DBG, "INT: setting proofserv in 'idle' state");
00906 xps->SetStatus(kXPD_idle);
00907 PostSession(-1, fPClient->UI().fUser.c_str(),
00908 fPClient->UI().fGroup.c_str(), xps);
00909 } else if (opt & kXPD_querynum) {
00910 TRACEP(this, DBG, "INT: got message with query number");
00911 } else if (opt & kXPD_startprocess) {
00912 TRACEP(this, DBG, "INT: setting proofserv in 'running' state");
00913 xps->SetStatus(kXPD_running);
00914 PostSession(1, fPClient->UI().fUser.c_str(),
00915 fPClient->UI().fGroup.c_str(), xps);
00916
00917 xps->DeleteStartMsg();
00918 saveStartMsg = 1;
00919 } else if (opt & kXPD_logmsg) {
00920
00921
00922 if (xps->Status() == kXPD_running) {
00923 TRACEP(this, DBG, "INT: broadcasting log message");
00924 opt |= kXPD_fb_prog;
00925 }
00926 }
00927 bool fbprog = (opt & kXPD_fb_prog);
00928
00929 if (!fbprog) {
00930
00931
00932 if (SendData(xps, -1, &savedBuf, saveStartMsg) != 0) {
00933 response->Send(kXP_reconnecting,
00934 "SendMsg: INT: session is reconnecting: retry later");
00935 return 0;
00936 }
00937 } else {
00938
00939 if (SendDataN(xps, &savedBuf, saveStartMsg) != 0) {
00940 response->Send(kXP_reconnecting,
00941 "SendMsg: INT: session is reconnecting: retry later");
00942 return 0;
00943 }
00944 }
00945
00946 if (saveStartMsg)
00947 xps->SetStartMsg(savedBuf);
00948
00949 if (TRACING(DBG)) {
00950 int ii = xps->SrvType();
00951 if (ii > 3) ii = 3;
00952 if (ii < 0) ii = 4;
00953 XPDFORM(msg, "INT: message sent to %s (%d bytes)", crecv[ii], len);
00954 TRACEP(this, DBG, msg);
00955 }
00956
00957 response->Send();
00958 }
00959
00960
00961 return 0;
00962 }
00963
00964
00965 int XrdProofdProtocol::Urgent()
00966 {
00967
00968 XPDLOC(ALL, "Protocol::Urgent")
00969
00970 unsigned int rc = 0;
00971
00972 XPD_SETRESP(this, "Urgent");
00973
00974
00975 int psid = ntohl(fRequest.proof.sid);
00976 int type = ntohl(fRequest.proof.int1);
00977 int int1 = ntohl(fRequest.proof.int2);
00978 int int2 = ntohl(fRequest.proof.int3);
00979
00980 TRACEP(this, REQ, "psid: "<<psid<<", type: "<< type);
00981
00982
00983 XrdProofdProofServ *xps = 0;
00984 if (!fPClient || !(xps = fPClient->GetServer(psid))) {
00985 TRACEP(this, XERR, "session ID not found: "<<psid);
00986 response->Send(kXR_InvalidRequest,"Urgent: session ID not found");
00987 return 0;
00988 }
00989
00990 TRACEP(this, DBG, "xps: "<<xps<<", status: "<<xps->Status());
00991
00992
00993 if (!xps->Match(psid)) {
00994 response->Send(kXP_InvalidRequest,"Urgent: IDs do not match - do nothing");
00995 return 0;
00996 }
00997
00998
00999 if (!xps->Response()) {
01000 response->Send(kXP_InvalidRequest,"Urgent: session response object undefined - do nothing");
01001 return 0;
01002 }
01003
01004
01005 int len = 3 *sizeof(kXR_int32);
01006 char *buf = new char[len];
01007
01008 kXR_int32 itmp = static_cast<kXR_int32>(htonl(type));
01009 memcpy(buf, &itmp, sizeof(kXR_int32));
01010
01011 itmp = static_cast<kXR_int32>(htonl(int1));
01012 memcpy(buf + sizeof(kXR_int32), &itmp, sizeof(kXR_int32));
01013
01014 itmp = static_cast<kXR_int32>(htonl(int2));
01015 memcpy(buf + 2 * sizeof(kXR_int32), &itmp, sizeof(kXR_int32));
01016
01017 if (xps->Response()->Send(kXR_attn, kXPD_urgent, buf, len) != 0) {
01018 response->Send(kXP_ServerError,
01019 "Urgent: could not propagate request to proofsrv");
01020 return 0;
01021 }
01022
01023
01024 response->Send();
01025 TRACEP(this, DBG, "request propagated to proofsrv");
01026
01027
01028 return 0;
01029 }
01030
01031
01032 int XrdProofdProtocol::Interrupt()
01033 {
01034
01035 XPDLOC(ALL, "Protocol::Interrupt")
01036
01037 int rc = 0;
01038
01039 XPD_SETRESP(this, "Interrupt");
01040
01041
01042 int psid = ntohl(fRequest.interrupt.sid);
01043 int type = ntohl(fRequest.interrupt.type);
01044 TRACEP(this, REQ, "psid: "<<psid<<", type:"<<type);
01045
01046
01047 XrdProofdProofServ *xps = 0;
01048 if (!fPClient || !(xps = fPClient->GetServer(psid))) {
01049 TRACEP(this, XERR, "session ID not found: "<<psid);
01050 response->Send(kXR_InvalidRequest,"Interrupt: session ID not found");
01051 return 0;
01052 }
01053
01054 if (xps) {
01055
01056
01057 if (!xps->Match(psid)) {
01058 response->Send(kXP_InvalidRequest,"Interrupt: IDs do not match - do nothing");
01059 return 0;
01060 }
01061
01062 XrdOucString msg;
01063 XPDFORM(msg, "xps: %p, link ID: %s, proofsrv PID: %d",
01064 xps, xps->Response()->TraceID(), xps->SrvPID());
01065 TRACEP(this, DBG, msg.c_str());
01066
01067
01068 if (xps->Response()->Send(kXR_attn, kXPD_interrupt, type) != 0) {
01069 response->Send(kXP_ServerError,
01070 "Interrupt: could not propagate interrupt code to proofsrv");
01071 return 0;
01072 }
01073
01074
01075 response->Send();
01076 TRACEP(this, DBG, "interrupt propagated to proofsrv");
01077 }
01078
01079
01080 return 0;
01081 }
01082
01083
01084 int XrdProofdProtocol::Ping()
01085 {
01086
01087
01088
01089
01090 XPDLOC(ALL, "Protocol::Ping")
01091
01092 int rc = 0;
01093 if (Internal()) {
01094 if (TRACING(HDBG)) {
01095 XPD_SETRESP(this, "Ping");
01096 TRACEP(this, HDBG, "INT: nothing to do ");
01097 }
01098 return 0;
01099 }
01100 XPD_SETRESP(this, "Ping");
01101
01102
01103 int psid = ntohl(fRequest.sendrcv.sid);
01104 int asyncopt = ntohl(fRequest.sendrcv.opt);
01105
01106 TRACEP(this, REQ, "psid: "<<psid<<", async: "<<asyncopt);
01107
01108
01109
01110 XrdProofdProofServ *xps = 0;
01111 if (!fPClient || (psid > -1 && !(xps = fPClient->GetServer(psid)))) {
01112 TRACEP(this, XERR, "session ID not found: "<<psid);
01113 response->Send(kXR_InvalidRequest,"session ID not found");
01114 return 0;
01115 }
01116
01117
01118 kXR_int32 pingres = (psid > -1) ? 0 : 1;
01119 if (psid > -1 && xps && xps->IsValid()) {
01120
01121 TRACEP(this, DBG, "EXT: psid: "<<psid);
01122
01123
01124 kXR_int32 checkfq = fgMgr->SessionMgr()->CheckFrequency();
01125
01126
01127 if (asyncopt == 1) {
01128 TRACEP(this, DBG, "EXT: async: notifying timeout to client: "<<checkfq<<" secs");
01129 response->Send(kXR_ok, checkfq);
01130 }
01131
01132
01133 XrdOucString path(xps->AdminPath());
01134 if (path.length() <= 0) {
01135 TRACEP(this, XERR, "EXT: admin path is empty! - protocol error");
01136 if (asyncopt == 0)
01137 response->Send(kXP_ServerError, "EXT: admin path is empty! - protocol error");
01138 return 0;
01139 }
01140 path += ".status";
01141
01142
01143 int now = time(0);
01144
01145
01146 struct stat st0;
01147 if (stat(path.c_str(), &st0) != 0) {
01148 TRACEP(this, XERR, "EXT: cannot stat admin path: "<<path);
01149 if (asyncopt == 0)
01150 response->Send(kXP_ServerError, "EXT: cannot stat admin path");
01151 return 0;
01152 }
01153
01154
01155 int pid = xps->SrvPID();
01156
01157 if (XrdProofdAux::VerifyProcessByID(pid) != 0) {
01158
01159 if ((now - st0.st_mtime) > checkfq - 5) {
01160
01161 if (xps->VerifyProofServ(1) != 0) {
01162 TRACEP(this, XERR, "EXT: could not send verify request to proofsrv");
01163 if (asyncopt == 0)
01164 response->Send(kXP_ServerError, "EXT: could not verify reuqest to proofsrv");
01165 return 0;
01166 }
01167
01168 struct stat st1;
01169 int ns = checkfq;
01170 while (ns--) {
01171 if (stat(path.c_str(), &st1) == 0) {
01172 if (st1.st_mtime > st0.st_mtime) {
01173 pingres = 1;
01174 break;
01175 }
01176 }
01177
01178 TRACEP(this, DBG, "EXT: waiting "<<ns<<" secs for session "<<pid<<
01179 " to touch the admin path");
01180 sleep(1);
01181 }
01182
01183 } else {
01184
01185 pingres = 1;
01186 }
01187 } else {
01188
01189 pingres = 0;
01190 }
01191
01192
01193 TRACEP(this, DBG, "EXT: notified the result to client: "<<pingres);
01194 if (asyncopt == 0) {
01195 response->Send(kXR_ok, pingres);
01196 } else {
01197
01198 int len = sizeof(kXR_int32);
01199 char *buf = new char[len];
01200
01201 kXR_int32 ifw = (kXR_int32)0;
01202 ifw = static_cast<kXR_int32>(htonl(ifw));
01203 memcpy(buf, &ifw, sizeof(kXR_int32));
01204 response->Send(kXR_attn, kXPD_ping, buf, len);
01205 }
01206 return 0;
01207 } else if (psid > -1) {
01208
01209 TRACEP(this, XERR, "session ID not found: "<<psid);
01210 }
01211
01212
01213 response->Send(kXR_ok, pingres);
01214
01215
01216 return 0;
01217 }
01218
01219
01220 void XrdProofdProtocol::PostSession(int on, const char *u, const char *g,
01221 XrdProofdProofServ *xps)
01222 {
01223
01224 XPDLOC(ALL, "Protocol::PostSession")
01225
01226
01227 if (fgMgr && fgMgr->PriorityMgr()) {
01228 int pid = (xps) ? xps->SrvPID() : -1;
01229 if (pid < 0) {
01230 TRACE(XERR, "undefined session or process id");
01231 return;
01232 }
01233 XrdOucString buf;
01234 XPDFORM(buf, "%d %s %s %d", on, u, g, pid);
01235
01236 if (fgMgr->PriorityMgr()->Pipe()->Post(XrdProofdPriorityMgr::kChangeStatus,
01237 buf.c_str()) != 0) {
01238 TRACE(XERR, "problem posting the prority manager pipe");
01239 }
01240 }
01241
01242 if (fgMgr && fgMgr->ProofSched()) {
01243 if (on == -1 && xps && xps->SrvType() == kXPD_TopMaster) {
01244 TRACE(DBG, "posting the scheduler pipe");
01245 if (fgMgr->ProofSched()->Pipe()->Post(XrdProofSched::kReschedule, 0) != 0) {
01246 TRACE(XERR, "problem posting the scheduler pipe");
01247 }
01248 }
01249 }
01250
01251 if (fgMgr && fgMgr->SessionMgr()) {
01252 if (fgMgr->SessionMgr()->Pipe()->Post(XrdProofdProofServMgr::kChgSessionSt, 0) != 0) {
01253 TRACE(XERR, "problem posting the session manager pipe");
01254 }
01255 }
01256
01257 return;
01258 }
01259
01260
01261 void XrdProofdProtocol::TouchAdminPath()
01262 {
01263
01264 XPDLOC(ALL, "Protocol::TouchAdminPath")
01265
01266 XPD_SETRESPV(this, "TouchAdminPath");
01267 TRACEP(this, HDBG, fAdminPath);
01268
01269 if (fAdminPath.length() > 0) {
01270 int rc = 0;
01271 if ((rc = XrdProofdAux::Touch(fAdminPath.c_str())) != 0) {
01272
01273
01274
01275 XrdOucString apath = fAdminPath;
01276 if (rc == -ENOENT && Internal()) {
01277 apath.replace("/activesessions/", "/terminatedsessions/");
01278 apath.replace(".status", "");
01279 rc = XrdProofdAux::Touch(apath.c_str());
01280 }
01281 if (rc != 0) {
01282 const char *type = Internal() ? "internal" : "external";
01283 TRACEP(this, XERR, type<<": problems touching "<<apath<<"; errno: "<<-rc);
01284 }
01285 }
01286 }
01287
01288 return;
01289 }
01290
01291
01292 int XrdProofdProtocol::CtrlC()
01293 {
01294
01295 XPDLOC(ALL, "Protocol::CtrlC")
01296
01297 TRACEP(this, ALL, "handling request");
01298
01299 { XrdSysMutexHelper mhp(fCtrlcMutex);
01300 fIsCtrlC = 1;
01301 }
01302
01303
01304 if (fgMgr) {
01305 if (fgMgr->SrvType() != kXPD_Worker) {
01306 if (fgMgr->NetMgr()) {
01307 fgMgr->NetMgr()->BroadcastCtrlC(Client()->User());
01308 }
01309 }
01310 }
01311
01312
01313 return 0;
01314 }