00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #include "MessageTypes.h"
00021 #include "TEnv.h"
00022 #include "TError.h"
00023 #include "TException.h"
00024 #include "TMonitor.h"
00025 #include "TObjString.h"
00026 #include "TProof.h"
00027 #include "TSlave.h"
00028 #include "TRegexp.h"
00029 #include "TROOT.h"
00030 #include "TUrl.h"
00031 #include "TXHandler.h"
00032 #include "TXSocket.h"
00033 #include "XProofProtocol.h"
00034
00035 #include "XrdProofConn.h"
00036
00037 #include "XrdClient/XrdClientConnMgr.hh"
00038 #include "XrdClient/XrdClientConst.hh"
00039 #include "XrdClient/XrdClientEnv.hh"
00040 #include "XrdClient/XrdClientLogConnection.hh"
00041 #include "XrdClient/XrdClientMessage.hh"
00042
00043 #ifndef WIN32
00044 #include <sys/socket.h>
00045 #else
00046 #include <Winsock2.h>
00047 #endif
00048
00049
00050 #ifdef OLDXRDOUC
00051 # include "XrdSysToOuc.h"
00052 # include "XrdOuc/XrdOucError.hh"
00053 # include "XrdOuc/XrdOucLogger.hh"
00054 #else
00055 # include "XrdSys/XrdSysError.hh"
00056 # include "XrdSys/XrdSysLogger.hh"
00057 #endif
00058 #include "XrdProofdTrace.h"
00059 XrdOucTrace *XrdProofdTrace = 0;
00060 static XrdSysLogger eLogger;
00061 static XrdSysError eDest(0, "Proofx");
00062
00063 #ifdef WIN32
00064 ULong64_t TSocket::fgBytesSent;
00065 ULong64_t TSocket::fgBytesRecv;
00066 #endif
00067
00068
00069
00070
00071
00072
00073 void TXSocket::DoError(int level, const char *location, const char *fmt, va_list va) const
00074 {
00075
00076
00077 ::ErrorHandler(level, Form("TXSocket::%s", location), fmt, va);
00078 }
00079
00080
00081
00082 class TXSocketPingHandler : public TFileHandler {
00083 TXSocket *fSocket;
00084 public:
00085 TXSocketPingHandler(TXSocket *s, Int_t fd)
00086 : TFileHandler(fd, 1) { fSocket = s; }
00087 Bool_t Notify();
00088 Bool_t ReadNotify() { return Notify(); }
00089 };
00090
00091
00092 Bool_t TXSocketPingHandler::Notify()
00093 {
00094
00095 fSocket->Ping("ping handler");
00096
00097 return kTRUE;
00098 }
00099
00100
00101 Bool_t TXSocket::fgInitDone = kFALSE;
00102
00103
00104 TXSockPipe TXSocket::fgPipe;
00105 TString TXSocket::fgLoc = "undef";
00106
00107
00108 TMutex TXSocket::fgSMtx;
00109 std::list<TXSockBuf *> TXSocket::fgSQue;
00110 Long64_t TXSockBuf::fgBuffMem = 0;
00111 Long64_t TXSockBuf::fgMemMax = 10485760;
00112
00113
00114 TXSocket::TXSocket(const char *url, Char_t m, Int_t psid, Char_t capver,
00115 const char *logbuf, Int_t loglevel, TXHandler *handler)
00116 : TSocket(), fMode(m), fLogLevel(loglevel),
00117 fBuffer(logbuf), fASem(0), fAsynProc(1),
00118 fDontTimeout(kFALSE), fRDInterrupt(kFALSE), fXrdProofdVersion(-1)
00119 {
00120
00121
00122
00123
00124
00125
00126
00127
00128
00129
00130
00131
00132
00133
00134
00135 fUrl = url;
00136
00137 eDest.logger(&eLogger);
00138 if (!XrdProofdTrace)
00139 XrdProofdTrace = new XrdOucTrace(&eDest);
00140
00141
00142 if (!fgInitDone)
00143 InitEnvs();
00144
00145
00146 if (!(fAMtx = new TMutex(kTRUE))) {
00147 Error("TXSocket", "problems initializing mutex for async queue");
00148 return;
00149 }
00150 fAQue.clear();
00151
00152
00153 if (!(fIMtx = new TMutex(kTRUE))) {
00154 Error("TXSocket", "problems initializing mutex for interrupts");
00155 return;
00156 }
00157 fILev = -1;
00158 fIForward = kFALSE;
00159
00160
00161 fByteLeft = 0;
00162 fByteCur = 0;
00163 fBufCur = 0;
00164 fServType = kPROOFD;
00165 fTcpWindowSize = -1;
00166 fRemoteProtocol = -1;
00167
00168 fSendOpt = (fMode == 'i') ? (kXPD_internal | kXPD_async) : kXPD_async;
00169 fSessionID = (fMode == 'C') ? -1 : psid;
00170 fSocket = -1;
00171
00172
00173
00174 fReference = 0;
00175
00176
00177 if (!fgPipe.IsValid()) {
00178 Error("TXSocket", "internal pipe is invalid");
00179 return;
00180 }
00181
00182
00183 TUrl u(url);
00184 fAddress = gSystem->GetHostByName(u.GetHost());
00185 u.SetProtocol("proof", kTRUE);
00186 fAddress.fPort = (u.GetPort() > 0) ? u.GetPort() : 1093;
00187
00188
00189 fHandler = handler;
00190
00191 if (url) {
00192
00193
00194
00195 char md = (fMode !='A' && fMode !='C') ? fMode : 'M';
00196 fConn = new XrdProofConn(url, md, psid, capver, this, fBuffer.Data());
00197 if (!fConn || !(fConn->IsValid())) {
00198 if (fConn->GetServType() != XrdProofConn::kSTProofd)
00199 if (gDebug > 0)
00200 Error("TXSocket", "fatal error occurred while opening a connection"
00201 " to server [%s]: %s", url, fConn->GetLastErr());
00202 return;
00203 }
00204
00205
00206 if (fMode == 'm' || fMode == 's' || fMode == 'M' || fMode == 'A') {
00207
00208 if (!Create()) {
00209
00210 Error("TXSocket", "create or attach failed (%s)",
00211 ((fConn->fLastErrMsg.length() > 0) ? fConn->fLastErrMsg.c_str() : "-"));
00212 Close();
00213 return;
00214 }
00215 }
00216
00217
00218 fUser = fConn->fUser.c_str();
00219 fHost = fConn->fHost.c_str();
00220 fPort = fConn->fPort;
00221 if (fMode == 'C') {
00222 fXrdProofdVersion = fConn->fRemoteProtocol;
00223 fRemoteProtocol = fConn->fRemoteProtocol;
00224 }
00225
00226
00227 fUrl = fConn->fUrl.GetUrl().c_str();
00228 fAddress = gSystem->GetHostByName(fConn->fUrl.Host.c_str());
00229 fAddress.fPort = fPort;
00230
00231
00232 fPid = gSystem->GetPid();
00233 }
00234 }
00235
00236
00237 TXSocket::TXSocket(const TXSocket &s) : TSocket(s),XrdClientAbsUnsolMsgHandler(s)
00238 {
00239
00240 }
00241
00242
00243 TXSocket& TXSocket::operator=(const TXSocket&)
00244 {
00245
00246 return *this;
00247 }
00248
00249
00250 TXSocket::~TXSocket()
00251 {
00252
00253
00254
00255
00256
00257 Close();
00258
00259
00260 SafeDelete(fAMtx);
00261 SafeDelete(fIMtx);
00262 }
00263
00264
00265 void TXSocket::SetLocation(const char *loc)
00266 {
00267
00268
00269 if (loc) {
00270 fgLoc = loc;
00271 fgPipe.SetLoc(loc);
00272 } else {
00273 fgLoc = "";
00274 fgPipe.SetLoc("");
00275 }
00276 }
00277
00278
00279 void TXSocket::SetSessionID(Int_t id)
00280 {
00281
00282
00283 if (id < 0 && fConn)
00284 fConn->SetAsync(0);
00285 fSessionID = id;
00286 }
00287
00288
00289 void TXSocket::DisconnectSession(Int_t id, Option_t *opt)
00290 {
00291
00292
00293
00294
00295
00296 if (!IsValid()) {
00297 if (gDebug > 0)
00298 Info("DisconnectSession","not connected: nothing to do");
00299 return;
00300 }
00301
00302 Bool_t shutdown = opt && (strchr(opt,'S') || strchr(opt,'s'));
00303 Bool_t all = opt && (strchr(opt,'A') || strchr(opt,'a'));
00304
00305 if (id > -1 || all) {
00306
00307 XPClientRequest Request;
00308 memset(&Request, 0, sizeof(Request) );
00309 fConn->SetSID(Request.header.streamid);
00310 if (shutdown)
00311 Request.proof.requestid = kXP_destroy;
00312 else
00313 Request.proof.requestid = kXP_detach;
00314 Request.proof.sid = id;
00315
00316
00317 XrdClientMessage *xrsp =
00318 fConn->SendReq(&Request, (const void *)0, 0, "DisconnectSession");
00319
00320
00321 if (!xrsp && fConn->GetLastErr())
00322 Printf("%s: %s", fHost.Data(), fConn->GetLastErr());
00323
00324
00325 SafeDelete(xrsp);
00326 }
00327 }
00328
00329
00330 void TXSocket::Close(Option_t *opt)
00331 {
00332
00333
00334
00335
00336
00337
00338 Int_t to = gEnv->GetValue("XProof.AsynProcSemTimeout", 60);
00339 if (fAsynProc.Wait(to*1000) != 0)
00340 Warning("Close", "could not hold semaphore for async messages after %d sec: closing anyhow (may give error messages)", to);
00341
00342
00343 TXSocket::fgPipe.Flush(this);
00344
00345
00346 if (!fConn) {
00347 if (gDebug > 0)
00348 Info("Close","no connection: nothing to do");
00349 fAsynProc.Post();
00350 return;
00351 }
00352
00353
00354 fConn->SetAsync(0);
00355
00356
00357 if (IsValid()) {
00358
00359
00360 TString o(opt);
00361 Int_t sessID = fSessionID;
00362 if (o.Index("#") != kNPOS) {
00363 o.Remove(0,o.Index("#")+1);
00364 if (o.Index("#") != kNPOS) {
00365 o.Remove(o.Index("#"));
00366 sessID = o.IsDigit() ? o.Atoi() : sessID;
00367 }
00368 }
00369
00370 if (sessID > -1) {
00371
00372 DisconnectSession(sessID, opt);
00373 } else {
00374
00375 fConn->Close(opt);
00376 }
00377 }
00378
00379
00380 SafeDelete(fConn);
00381
00382
00383 fAsynProc.Post();
00384 }
00385
00386
00387 UnsolRespProcResult TXSocket::ProcessUnsolicitedMsg(XrdClientUnsolMsgSender *,
00388 XrdClientMessage *m)
00389 {
00390
00391
00392
00393
00394
00395 UnsolRespProcResult rc = kUNSOL_KEEP;
00396
00397
00398 TXSemaphoreGuard semg(&fAsynProc);
00399 if (!semg.IsValid()) {
00400 Error("ProcessUnsolicitedMsg", "%p: async semaphore taken by Close()! Should not be here!", this);
00401 return kUNSOL_CONTINUE;
00402 }
00403
00404 if (!m) {
00405 if (gDebug > 2)
00406 Info("ProcessUnsolicitedMsg", "%p: got empty message: skipping", this);
00407
00408 return kUNSOL_CONTINUE;
00409 } else {
00410 if (gDebug > 2)
00411 Info("ProcessUnsolicitedMsg", "%p: got message with status: %d, len: %d bytes (ID: %d)",
00412 this, m->GetStatusCode(), m->DataLen(), m->HeaderSID());
00413 }
00414
00415
00416 if (m->IsError()) {
00417 if (m->GetStatusCode() != XrdClientMessage::kXrdMSC_timeout) {
00418 if (gDebug > 0)
00419 Info("ProcessUnsolicitedMsg","%p: got error from underlying connection", this);
00420 XHandleErr_t herr = {1, 0};
00421 if (!fHandler || fHandler->HandleError((const void *)&herr)) {
00422 if (gDebug > 0)
00423 Info("ProcessUnsolicitedMsg","%p: handler undefined or recovery failed", this);
00424
00425 fSessionID = -1;
00426 } else {
00427
00428 Touch();
00429 }
00430 } else {
00431
00432 if (gDebug > 2)
00433 Info("ProcessUnsolicitedMsg", "%p: underlying connection timed out", this);
00434 }
00435
00436 return kUNSOL_CONTINUE;
00437 }
00438
00439
00440 if (!fConn || !m->MatchStreamid(fConn->fStreamid)) {
00441 if (gDebug > 1)
00442 Info("ProcessUnsolicitedMsg", "%p: IDs do not match: {%d, %d}", this, fConn->fStreamid, m->HeaderSID());
00443 return kUNSOL_CONTINUE;
00444 }
00445
00446
00447 if (!m) {
00448 Error("ProcessUnsolicitedMsg", "undefined message - disabling");
00449 PostMsg(kPROOF_STOP);
00450 return rc;
00451 }
00452
00453 Int_t len = 0;
00454 if ((len = m->DataLen()) < (int)sizeof(kXR_int32)) {
00455 Error("ProcessUnsolicitedMsg", "empty or bad-formed message - disabling");
00456 PostMsg(kPROOF_STOP);
00457 return rc;
00458 }
00459
00460
00461 Touch();
00462
00463
00464 kXR_int32 acod = 0;
00465 memcpy(&acod, m->GetData(), sizeof(kXR_int32));
00466 if (acod > 10000)
00467 Info("ProcessUnsolicitedMsg", "%p: got acod %d (%x): message has status: %d, len: %d bytes (ID: %d)",
00468 this, acod, acod, m->GetStatusCode(), m->DataLen(), m->HeaderSID());
00469
00470
00471 void *pdata = (void *)((char *)(m->GetData()) + sizeof(kXR_int32));
00472 len -= sizeof(kXR_int32);
00473 if (gDebug > 1)
00474 Info("ProcessUnsolicitedMsg", "%p: got action: %d (%d bytes) (ID: %d)",
00475 this, acod, len, m->HeaderSID());
00476
00477 if (gDebug > 3)
00478 fgPipe.DumpReadySock();
00479
00480
00481 kXR_int32 ilev = -1;
00482 const char *lab = 0;
00483
00484 switch (acod) {
00485 case kXPD_ping:
00486
00487
00488 ilev = TProof::kPing;
00489 lab = "kXPD_ping";
00490 case kXPD_interrupt:
00491
00492
00493 lab = !lab ? "kXPD_interrupt" : lab;
00494 { R__LOCKGUARD(fIMtx);
00495 if (acod == kXPD_interrupt) {
00496 memcpy(&ilev, pdata, sizeof(kXR_int32));
00497 ilev = net2host(ilev);
00498
00499 pdata = (void *)((char *)pdata + sizeof(kXR_int32));
00500 len -= sizeof(kXR_int32);
00501 }
00502
00503 kXR_int32 ifw = 0;
00504 if (len > 0) {
00505 memcpy(&ifw, pdata, sizeof(kXR_int32));
00506 ifw = net2host(ifw);
00507 if (gDebug > 1)
00508 Info("ProcessUnsolicitedMsg","%s: forwarding option: %d", lab, ifw);
00509 }
00510
00511
00512 fILev = ilev;
00513 fIForward = (ifw == 1) ? kTRUE : kFALSE;
00514
00515
00516
00517 XHandleIn_t hin = {acod, 0, 0, 0};
00518 if (fHandler)
00519 fHandler->HandleInput((const void *)&hin);
00520 else
00521 Error("ProcessUnsolicitedMsg","handler undefined");
00522 }
00523 break;
00524 case kXPD_timer:
00525
00526
00527 {
00528 kXR_int32 opt = 1;
00529 kXR_int32 delay = 0;
00530
00531 if (len > 0) {
00532 memcpy(&opt, pdata, sizeof(kXR_int32));
00533 opt = net2host(opt);
00534 if (gDebug > 1)
00535 Info("ProcessUnsolicitedMsg","kXPD_timer: found opt: %d", opt);
00536
00537 pdata = (void *)((char *)pdata + sizeof(kXR_int32));
00538 len -= sizeof(kXR_int32);
00539 }
00540
00541 if (len > 0) {
00542 memcpy(&delay, pdata, sizeof(kXR_int32));
00543 delay = net2host(delay);
00544 if (gDebug > 1)
00545 Info("ProcessUnsolicitedMsg","kXPD_timer: found delay: %d", delay);
00546
00547 pdata = (void *)((char *)pdata + sizeof(kXR_int32));
00548 len -= sizeof(kXR_int32);
00549 }
00550
00551
00552
00553 XHandleIn_t hin = {acod, opt, delay, 0};
00554 if (fHandler)
00555 fHandler->HandleInput((const void *)&hin);
00556 else
00557 Error("ProcessUnsolicitedMsg","handler undefined");
00558 }
00559 break;
00560 case kXPD_inflate:
00561
00562
00563 {
00564 kXR_int32 inflate = 1000;
00565 if (len > 0) {
00566 memcpy(&inflate, pdata, sizeof(kXR_int32));
00567 inflate = net2host(inflate);
00568 if (gDebug > 1)
00569 Info("ProcessUnsolicitedMsg","kXPD_inflate: factor: %d", inflate);
00570
00571 pdata = (void *)((char *)pdata + sizeof(kXR_int32));
00572 len -= sizeof(kXR_int32);
00573 }
00574
00575
00576 XHandleIn_t hin = {acod, inflate, 0, 0};
00577 if (fHandler)
00578 fHandler->HandleInput((const void *)&hin);
00579 else
00580 Error("ProcessUnsolicitedMsg","handler undefined");
00581 }
00582 break;
00583 case kXPD_priority:
00584
00585
00586 {
00587 kXR_int32 priority = -1;
00588 if (len > 0) {
00589 memcpy(&priority, pdata, sizeof(kXR_int32));
00590 priority = net2host(priority);
00591 if (gDebug > 1)
00592 Info("ProcessUnsolicitedMsg","kXPD_priority: priority: %d", priority);
00593
00594 pdata = (void *)((char *)pdata + sizeof(kXR_int32));
00595 len -= sizeof(kXR_int32);
00596 }
00597
00598
00599 XHandleIn_t hin = {acod, priority, 0, 0};
00600 if (fHandler)
00601 fHandler->HandleInput((const void *)&hin);
00602 else
00603 Error("ProcessUnsolicitedMsg","handler undefined");
00604 }
00605 break;
00606 case kXPD_flush:
00607
00608
00609 {
00610
00611
00612 XHandleIn_t hin = {acod, 0, 0, 0};
00613 if (fHandler)
00614 fHandler->HandleInput((const void *)&hin);
00615 else
00616 Error("ProcessUnsolicitedMsg","handler undefined");
00617 }
00618 break;
00619 case kXPD_urgent:
00620
00621
00622 {
00623
00624 kXR_int32 type = -1;
00625 if (len > 0) {
00626 memcpy(&type, pdata, sizeof(kXR_int32));
00627 type = net2host(type);
00628 if (gDebug > 1)
00629 Info("ProcessUnsolicitedMsg","kXPD_urgent: found type: %d", type);
00630
00631 pdata = (void *)((char *)pdata + sizeof(kXR_int32));
00632 len -= sizeof(kXR_int32);
00633 }
00634
00635 kXR_int32 int1 = -1;
00636 if (len > 0) {
00637 memcpy(&int1, pdata, sizeof(kXR_int32));
00638 int1 = net2host(int1);
00639 if (gDebug > 1)
00640 Info("ProcessUnsolicitedMsg","kXPD_urgent: found int1: %d", int1);
00641
00642 pdata = (void *)((char *)pdata + sizeof(kXR_int32));
00643 len -= sizeof(kXR_int32);
00644 }
00645
00646 kXR_int32 int2 = -1;
00647 if (len > 0) {
00648 memcpy(&int2, pdata, sizeof(kXR_int32));
00649 int2 = net2host(int2);
00650 if (gDebug > 1)
00651 Info("ProcessUnsolicitedMsg","kXPD_urgent: found int2: %d", int2);
00652
00653 pdata = (void *)((char *)pdata + sizeof(kXR_int32));
00654 len -= sizeof(kXR_int32);
00655 }
00656
00657
00658
00659 XHandleIn_t hin = {acod, type, int1, int2};
00660 if (fHandler)
00661 fHandler->HandleInput((const void *)&hin);
00662 else
00663 Error("ProcessUnsolicitedMsg","handler undefined");
00664 }
00665 break;
00666 case kXPD_msg:
00667
00668
00669 { R__LOCKGUARD(fAMtx);
00670
00671
00672 TXSockBuf *b = PopUpSpare(len);
00673 if (!b) {
00674 Error("ProcessUnsolicitedMsg","could allocate spare buffer");
00675 return rc;
00676 }
00677 memcpy(b->fBuf, pdata, len);
00678 b->fLen = len;
00679
00680
00681 fBytesRecv += len;
00682
00683
00684 fAQue.push_back(b);
00685
00686
00687 fgPipe.Post(this);
00688
00689
00690 if (gDebug > 2)
00691 Info("ProcessUnsolicitedMsg","%p: %s: posting semaphore: %p (%d bytes)",
00692 this, GetTitle(), &fASem, len);
00693 fASem.Post();
00694 }
00695
00696 break;
00697 case kXPD_feedback:
00698 Info("ProcessUnsolicitedMsg",
00699 "kXPD_feedback treatment not yet implemented");
00700 break;
00701 case kXPD_srvmsg:
00702
00703
00704 {
00705
00706 kXR_int32 opt = 0;
00707 memcpy(&opt, pdata, sizeof(kXR_int32));
00708 opt = net2host(opt);
00709 if (opt >= 0 && opt <= 4) {
00710
00711 pdata = (void *)((char *)pdata + sizeof(kXR_int32));
00712 len -= sizeof(kXR_int32);
00713 } else {
00714 opt = 1;
00715 }
00716
00717 if (opt == 0) {
00718
00719 Printf("| %.*s", len, (char *)pdata);
00720 } else if (opt == 2) {
00721
00722 Printf("%.*s", len, (char *)pdata);
00723 } else if (opt == 3) {
00724
00725 fprintf(stderr, "%.*s", len, (char *)pdata);
00726 } else if (opt == 4) {
00727
00728 fprintf(stderr, "%.*s\r", len, (char *)pdata);
00729 } else {
00730
00731 Printf(" ");
00732 Printf("| Message from server:");
00733 Printf("| %.*s", len, (char *)pdata);
00734 }
00735 }
00736 break;
00737 case kXPD_errmsg:
00738
00739
00740 Printf(" ");
00741 Printf("| Error condition occured: message from server:");
00742 Printf("| %.*s", len, (char *)pdata);
00743
00744 if (fHandler)
00745 fHandler->HandleError();
00746 else
00747 Error("ProcessUnsolicitedMsg","handler undefined");
00748 break;
00749 case kXPD_msgsid:
00750
00751
00752 { R__LOCKGUARD(fAMtx);
00753
00754
00755 kXR_int32 cid = 0;
00756 memcpy(&cid, pdata, sizeof(kXR_int32));
00757 cid = net2host(cid);
00758
00759 if (gDebug > 1)
00760 Info("ProcessUnsolicitedMsg","found cid: %d", cid);
00761
00762
00763 pdata = (void *)((char *)pdata + sizeof(kXR_int32));
00764 len -= sizeof(kXR_int32);
00765
00766
00767 TXSockBuf *b = PopUpSpare(len);
00768 if (!b) {
00769 Error("ProcessUnsolicitedMsg","could allocate spare buffer");
00770 return rc;
00771 }
00772 memcpy(b->fBuf, pdata, len);
00773 b->fLen = len;
00774
00775
00776 b->fCid = cid;
00777
00778
00779 fBytesRecv += len;
00780
00781
00782 fAQue.push_back(b);
00783
00784
00785 fgPipe.Post(this);
00786
00787
00788 if (gDebug > 2)
00789 Info("ProcessUnsolicitedMsg","%p: cid: %d, posting semaphore: %p (%d bytes)",
00790 this, cid, &fASem, len);
00791 fASem.Post();
00792 }
00793
00794 break;
00795 case kXPD_wrkmortem:
00796
00797
00798 { TString what = TString::Format("%.*s", len, (char *)pdata);
00799 if (what.BeginsWith("idle-timeout")) {
00800
00801 PostMsg(kPROOF_FATAL, kPROOF_WorkerIdleTO);
00802 } else {
00803 Printf(" ");
00804 Printf("| %s", what.Data());
00805
00806 if (fHandler)
00807 fHandler->HandleError();
00808 else
00809 Error("ProcessUnsolicitedMsg","handler undefined");
00810 }
00811 }
00812 break;
00813
00814 case kXPD_touch:
00815
00816
00817 PostMsg(kPROOF_TOUCH);
00818 break;
00819 case kXPD_resume:
00820
00821
00822 PostMsg(kPROOF_STARTPROCESS);
00823 break;
00824 case kXPD_clusterinfo:
00825
00826
00827 {
00828 kXR_int32 nsess = -1, nacti = -1, neffs = -1;
00829 if (len > 0) {
00830
00831 memcpy(&nsess, pdata, sizeof(kXR_int32));
00832 nsess = net2host(nsess);
00833 pdata = (void *)((char *)pdata + sizeof(kXR_int32));
00834 len -= sizeof(kXR_int32);
00835
00836 memcpy(&nacti, pdata, sizeof(kXR_int32));
00837 nacti = net2host(nacti);
00838 pdata = (void *)((char *)pdata + sizeof(kXR_int32));
00839 len -= sizeof(kXR_int32);
00840
00841 memcpy(&neffs, pdata, sizeof(kXR_int32));
00842 neffs = net2host(neffs);
00843 pdata = (void *)((char *)pdata + sizeof(kXR_int32));
00844 len -= sizeof(kXR_int32);
00845 }
00846 if (gDebug > 1)
00847 Info("ProcessUnsolicitedMsg","kXPD_clusterinfo: # sessions: %d,"
00848 " # active: %d, # effective: %f", nsess, nacti, neffs/1000.);
00849
00850
00851 XHandleIn_t hin = {acod, nsess, nacti, neffs};
00852 if (fHandler)
00853 fHandler->HandleInput((const void *)&hin);
00854 else
00855 Error("ProcessUnsolicitedMsg","handler undefined");
00856 }
00857 break;
00858 default:
00859 Error("ProcessUnsolicitedMsg","%p: unknown action code: %d received from '%s' - disabling",
00860 this, acod, GetTitle());
00861 PostMsg(kPROOF_STOP);
00862 break;
00863 }
00864
00865
00866 return rc;
00867 }
00868
00869
00870 void TXSocket::PostMsg(Int_t type, const char *msg)
00871 {
00872
00873
00874
00875
00876
00877
00878
00879 TMessage m(type);
00880
00881
00882 if (msg && strlen(msg) > 0)
00883 m << TString(msg);
00884
00885
00886 m.SetLength();
00887
00888
00889 char *mbuf = m.Buffer();
00890 Int_t mlen = m.Length();
00891 if (m.CompBuffer()) {
00892 mbuf = m.CompBuffer();
00893 mlen = m.CompLength();
00894 }
00895
00896
00897
00898 R__LOCKGUARD(fAMtx);
00899
00900
00901 TXSockBuf *b = PopUpSpare(mlen);
00902 if (!b) {
00903 Error("PostMsg", "could allocate spare buffer");
00904 return;
00905 }
00906
00907
00908 memcpy(b->fBuf, mbuf, mlen);
00909 b->fLen = mlen;
00910
00911
00912 fBytesRecv += mlen;
00913
00914
00915 fAQue.push_back(b);
00916
00917
00918 fgPipe.Post(this);
00919
00920
00921 if (gDebug > 0)
00922 Info("PostMsg", "%p: posting type %d to semaphore: %p (%d bytes)",
00923 this, type, &fASem, mlen);
00924 fASem.Post();
00925
00926
00927 return;
00928 }
00929
00930
00931 Bool_t TXSocket::IsServProofd()
00932 {
00933
00934
00935 if (fConn && (fConn->GetServType() == XrdProofConn::kSTProofd))
00936 return kTRUE;
00937
00938
00939 return kFALSE;
00940 }
00941
00942
00943 Int_t TXSocket::GetInterrupt(Bool_t &forward)
00944 {
00945
00946
00947
00948 if (gDebug > 2)
00949 Info("GetInterrupt","%p: waiting to lock mutex %p", this, fIMtx);
00950
00951 R__LOCKGUARD(fIMtx);
00952
00953
00954 Int_t ilev = -1;
00955 forward = kFALSE;
00956
00957
00958 if (fILev == -1)
00959 Error("GetInterrupt", "value is unset (%d) - protocol error",fILev);
00960
00961
00962 ilev = fILev;
00963 forward = fIForward;
00964
00965
00966 fILev = -1;
00967 fIForward = kFALSE;
00968
00969
00970 return ilev;
00971 }
00972
00973
00974 Int_t TXSocket::Flush()
00975 {
00976
00977
00978
00979
00980 Int_t nf = 0;
00981 list<TXSockBuf *> splist;
00982 list<TXSockBuf *>::iterator i;
00983
00984 { R__LOCKGUARD(fAMtx);
00985
00986
00987 if (fAQue.size() > 0) {
00988
00989
00990 Int_t sz = fAQue.size();
00991
00992 for (i = fAQue.begin(); i != fAQue.end();) {
00993 if (*i) {
00994 splist.push_back(*i);
00995 nf += (*i)->fLen;
00996 i = fAQue.erase(i);
00997 }
00998 }
00999
01000
01001 while (sz--)
01002 fASem.TryWait();
01003 fAQue.clear();
01004 }
01005 }
01006
01007
01008 if (splist.size() > 0) {
01009 R__LOCKGUARD(&fgSMtx);
01010 for (i = splist.begin(); i != splist.end();) {
01011 fgSQue.push_back(*i);
01012 i = splist.erase(i);
01013 }
01014 }
01015
01016
01017 return nf;
01018 }
01019
01020
01021 Bool_t TXSocket::Create(Bool_t attach)
01022 {
01023
01024
01025
01026
01027 if (!IsValid()) {
01028 if (gDebug > 0)
01029 Info("Create","not connected: nothing to do");
01030 return kFALSE;
01031 }
01032
01033 Int_t retriesleft = gEnv->GetValue("XProof.CreationRetries", 4);
01034
01035 while (retriesleft--) {
01036
01037 XPClientRequest reqhdr;
01038
01039
01040 memset( &reqhdr, 0, sizeof(reqhdr));
01041 fConn->SetSID(reqhdr.header.streamid);
01042
01043
01044 if (fMode == 'A' || attach) {
01045 reqhdr.header.requestid = kXP_attach;
01046 reqhdr.proof.sid = fSessionID;
01047 } else {
01048 reqhdr.header.requestid = kXP_create;
01049 }
01050
01051
01052 reqhdr.proof.int1 = fLogLevel;
01053
01054
01055 const void *buf = (const void *)(fBuffer.Data());
01056 reqhdr.header.dlen = fBuffer.Length();
01057 if (gDebug >= 2)
01058 Info("Create", "sending %d bytes to server", reqhdr.header.dlen);
01059
01060
01061 if (gDebug > 1)
01062 Info("Create", "creating session of server %s", fUrl.Data());
01063
01064
01065 char *answData = 0;
01066 XrdClientMessage *xrsp = fConn->SendReq(&reqhdr, buf,
01067 &answData, "TXSocket::Create", 0);
01068 struct ServerResponseBody_Protocol *srvresp = (struct ServerResponseBody_Protocol *)answData;
01069
01070
01071 fBuffer = "";
01072 if (xrsp) {
01073
01074
01075
01076 void *pdata = (void *)(xrsp->GetData());
01077 Int_t len = xrsp->DataLen();
01078
01079 if (len >= (Int_t)sizeof(kXR_int32)) {
01080
01081 kXR_int32 psid = 0;
01082 memcpy(&psid, pdata, sizeof(kXR_int32));
01083 fSessionID = net2host(psid);
01084 pdata = (void *)((char *)pdata + sizeof(kXR_int32));
01085 len -= sizeof(kXR_int32);
01086 } else {
01087 Error("Create","session ID is undefined!");
01088 }
01089
01090 if (len >= (Int_t)sizeof(kXR_int16)) {
01091
01092 kXR_int16 dver = 0;
01093 memcpy(&dver, pdata, sizeof(kXR_int16));
01094 fRemoteProtocol = net2host(dver);
01095 pdata = (void *)((char *)pdata + sizeof(kXR_int16));
01096 len -= sizeof(kXR_int16);
01097 } else {
01098 Warning("Create","protocol version of the remote PROOF undefined!");
01099 }
01100
01101 if (fRemoteProtocol == 0) {
01102
01103 len += sizeof(kXR_int16);
01104 kXR_int32 dver = 0;
01105 memcpy(&dver, pdata, sizeof(kXR_int32));
01106 fRemoteProtocol = net2host(dver);
01107 pdata = (void *)((char *)pdata + sizeof(kXR_int32));
01108 len -= sizeof(kXR_int32);
01109 } else {
01110 if (len >= (Int_t)sizeof(kXR_int16)) {
01111
01112 kXR_int16 dver = 0;
01113 memcpy(&dver, pdata, sizeof(kXR_int16));
01114 fXrdProofdVersion = net2host(dver);
01115 pdata = (void *)((char *)pdata + sizeof(kXR_int16));
01116 len -= sizeof(kXR_int16);
01117 } else {
01118 Warning("Create","version of the remote XrdProofdProtocol undefined!");
01119 }
01120 }
01121
01122 if (len > 0) {
01123
01124 char *url = new char[len+1];
01125 memcpy(url, pdata, len);
01126 url[len] = 0;
01127 fBuffer = url;
01128 delete[] url;
01129 }
01130
01131
01132 SafeDelete(xrsp);
01133 if (srvresp)
01134 free(srvresp);
01135
01136
01137 return kTRUE;
01138 } else {
01139
01140 if (fConn->GetOpenError() == kXP_TooManySess) {
01141
01142 fSessionID = -1;
01143 return kFALSE;
01144 } else {
01145
01146 if ((retriesleft <= 0 || gDebug > 0) && fConn->GetLastErr())
01147 Printf("%s: %s", fHost.Data(), fConn->GetLastErr());
01148 }
01149 }
01150
01151 if (gDebug > 0)
01152 Info("Create", "creation/attachment attempt failed: %d attempts left", retriesleft);
01153 if (retriesleft <= 0)
01154 Error("Create", "%d creation/attachment attempts failed: no attempts left",
01155 gEnv->GetValue("XProof.CreationRetries", 4));
01156
01157 }
01158
01159
01160 Error("Create:",
01161 "problems creating or attaching to a remote server (%s)",
01162 ((fConn->fLastErrMsg.length() > 0) ? fConn->fLastErrMsg.c_str() : "-"));
01163 return kFALSE;
01164 }
01165
01166
01167 Int_t TXSocket::SendRaw(const void *buffer, Int_t length, ESendRecvOptions opt)
01168 {
01169
01170
01171
01172
01173
01174 TSystem::ResetErrno();
01175
01176
01177 fSendOpt = (opt == kDontBlock) ? (kXPD_async | fSendOpt)
01178 : (~kXPD_async & fSendOpt) ;
01179
01180
01181 XPClientRequest Request;
01182 memset( &Request, 0, sizeof(Request) );
01183 fConn->SetSID(Request.header.streamid);
01184 Request.sendrcv.requestid = kXP_sendmsg;
01185 Request.sendrcv.sid = fSessionID;
01186 Request.sendrcv.opt = fSendOpt;
01187 Request.sendrcv.cid = GetClientID();
01188 Request.sendrcv.dlen = length;
01189 if (gDebug >= 2)
01190 Info("SendRaw", "sending %d bytes to server", Request.sendrcv.dlen);
01191
01192
01193 XrdClientMessage *xrsp = fConn->SendReq(&Request, buffer, 0, "SendRaw");
01194
01195 if (xrsp) {
01196
01197 Int_t nsent = length;
01198
01199
01200 fBytesSent += length;
01201
01202
01203 SafeDelete(xrsp);
01204
01205
01206 Touch();
01207
01208
01209 return nsent;
01210 } else {
01211
01212 if (fConn->GetLastErr())
01213 Printf("%s: %s", fHost.Data(), fConn->GetLastErr());
01214 else
01215 Printf("%s: error occured but no message from server", fHost.Data());
01216 }
01217
01218
01219 Error("SendRaw", "%s: problems sending %d bytes to server",
01220 fHost.Data(), length);
01221 return -1;
01222 }
01223
01224
01225 Bool_t TXSocket::Ping(const char *ord)
01226 {
01227
01228
01229
01230
01231 TSystem::ResetErrno();
01232
01233 if (gDebug > 0)
01234 Info("Ping","%p: %s: sid: %d", this, ord ? ord : "int", fSessionID);
01235
01236
01237 if (!IsValid()) {
01238 Error("Ping","not connected: nothing to do");
01239 return kFALSE;
01240 }
01241
01242
01243 kXR_int32 options = (fMode == 'i') ? kXPD_internal : 0;
01244
01245
01246 XPClientRequest Request;
01247 memset( &Request, 0, sizeof(Request) );
01248 fConn->SetSID(Request.header.streamid);
01249 Request.sendrcv.requestid = kXP_ping;
01250 Request.sendrcv.sid = fSessionID;
01251 Request.sendrcv.opt = options;
01252 Request.sendrcv.dlen = 0;
01253
01254
01255 Bool_t res = kFALSE;
01256 if (fMode != 'i') {
01257 char *pans = 0;
01258 XrdClientMessage *xrsp =
01259 fConn->SendReq(&Request, (const void *)0, &pans, "Ping");
01260 kXR_int32 *pres = (kXR_int32 *) pans;
01261
01262
01263 if (xrsp && xrsp->HeaderStatus() == kXR_ok) {
01264 *pres = net2host(*pres);
01265 res = (*pres == 1) ? kTRUE : kFALSE;
01266
01267 Touch();
01268 } else {
01269
01270 if (fConn->GetLastErr())
01271 Printf("%s: %s", fHost.Data(), fConn->GetLastErr());
01272 }
01273
01274
01275 SafeDelete(xrsp);
01276
01277 } else {
01278 if (XPD::clientMarshall(&Request) == 0) {
01279 XReqErrorType e = fConn->LowWrite(&Request, 0, 0);
01280 res = (e == kOK) ? kTRUE : kFALSE;
01281 } else {
01282 Error("Ping", "%p: int: problems marshalling request", this);
01283 }
01284 }
01285
01286
01287 if (!res) {
01288 Error("Ping", "%p: %s: problems sending ping to server", this, ord ? ord : "int");
01289 } else if (gDebug > 0) {
01290 Info("Ping","%p: %s: sid: %d OK", this, ord ? ord : "int", fSessionID);
01291 }
01292
01293 return res;
01294 }
01295
01296
01297 void TXSocket::RemoteTouch()
01298 {
01299
01300
01301
01302 TSystem::ResetErrno();
01303
01304 if (gDebug > 0)
01305 Info("RemoteTouch","%p: sending touch request to %s", this, GetName());
01306
01307
01308 if (!IsValid()) {
01309 Error("RemoteTouch","not connected: nothing to do");
01310 return;
01311 }
01312
01313
01314 XPClientRequest Request;
01315 memset( &Request, 0, sizeof(Request) );
01316 fConn->SetSID(Request.header.streamid);
01317 Request.sendrcv.requestid = kXP_touch;
01318 Request.sendrcv.sid = fSessionID;
01319 Request.sendrcv.opt = 0;
01320 Request.sendrcv.dlen = 0;
01321
01322
01323 if (XPD::clientMarshall(&Request) != 0) {
01324 Error("Touch", "%p: problems marshalling request ", this);
01325 return;
01326 }
01327 if (fConn->LowWrite(&Request, 0, 0) != kOK)
01328 Error("Touch", "%p: problems sending touch request to server", this);
01329
01330
01331 return;
01332 }
01333
01334
01335 void TXSocket::CtrlC()
01336 {
01337
01338
01339
01340 TSystem::ResetErrno();
01341
01342 if (gDebug > 0)
01343 Info("CtrlC","%p: sending ctrl-c request to %s", this, GetName());
01344
01345
01346 if (!IsValid()) {
01347 Error("CtrlC","not connected: nothing to do");
01348 return;
01349 }
01350
01351
01352 XPClientRequest Request;
01353 memset( &Request, 0, sizeof(Request) );
01354 fConn->SetSID(Request.header.streamid);
01355 Request.proof.requestid = kXP_ctrlc;
01356 Request.proof.sid = 0;
01357 Request.proof.dlen = 0;
01358
01359
01360 if (XPD::clientMarshall(&Request) != 0) {
01361 Error("CtrlC", "%p: problems marshalling request ", this);
01362 return;
01363 }
01364 if (fConn->LowWrite(&Request, 0, 0) != kOK)
01365 Error("CtrlC", "%p: problems sending ctrl-c request to server", this);
01366
01367
01368 return;
01369 }
01370
01371
01372 Int_t TXSocket::PickUpReady()
01373 {
01374
01375
01376 fBufCur = 0;
01377 fByteLeft = 0;
01378 fByteCur = 0;
01379 if (gDebug > 2)
01380 Info("PickUpReady", "%p: %s: going to sleep", this, GetTitle());
01381
01382
01383 if (!fDontTimeout) {
01384 static Int_t timeout = gEnv->GetValue("XProof.ReadTimeout", 300) * 1000;
01385 static Int_t dt = 2000;
01386 Int_t to = timeout;
01387 while (to && !fRDInterrupt) {
01388 if (fASem.Wait(dt) != 0) {
01389 to -= dt;
01390 if (to <= 0) {
01391 Error("PickUpReady","error waiting at semaphore");
01392 return -1;
01393 } else {
01394 if (gDebug > 0)
01395 Info("PickUpReady", "%p: %s: got timeout: retring (%d secs)",
01396 this, GetTitle(), to/1000);
01397 }
01398 } else
01399 break;
01400 }
01401
01402 if (fRDInterrupt) {
01403 Error("PickUpReady","interrupted");
01404 fRDInterrupt = kFALSE;
01405 return -1;
01406 }
01407 } else {
01408
01409 if (fASem.Wait() != 0) {
01410 Error("PickUpReady","error waiting at semaphore");
01411 return -1;
01412 }
01413 }
01414 if (gDebug > 2)
01415 Info("PickUpReady", "%p: %s: waken up", this, GetTitle());
01416
01417 R__LOCKGUARD(fAMtx);
01418
01419
01420 if (fAQue.size() <= 0) {
01421 Error("PickUpReady","queue is empty - protocol error ?");
01422 return -1;
01423 }
01424 fBufCur = fAQue.front();
01425
01426 fAQue.pop_front();
01427
01428 if (fBufCur)
01429 fByteLeft = fBufCur->fLen;
01430
01431 if (gDebug > 2)
01432 Info("PickUpReady", "%p: %s: got message (%d bytes)",
01433 this, GetTitle(), (Int_t)(fBufCur ? fBufCur->fLen : 0));
01434
01435
01436 fBytesRecv += fBufCur->fLen;
01437
01438
01439 if (fBufCur->fCid > -1 && fBufCur->fCid != GetClientID())
01440 SetClientID(fBufCur->fCid);
01441
01442
01443 fgPipe.Clean(this);
01444
01445
01446 return 0;
01447 }
01448
01449
01450 TXSockBuf *TXSocket::PopUpSpare(Int_t size)
01451 {
01452
01453
01454
01455 TXSockBuf *buf = 0;
01456 static Int_t nBuf = 0;
01457
01458
01459 R__LOCKGUARD(&fgSMtx);
01460
01461
01462 Int_t maxsz = 0;
01463 if (fgSQue.size() > 0) {
01464 list<TXSockBuf *>::iterator i;
01465 for (i = fgSQue.begin(); i != fgSQue.end(); i++) {
01466 maxsz = ((*i)->fSiz > maxsz) ? (*i)->fSiz : maxsz;
01467 if ((*i) && (*i)->fSiz >= size) {
01468 buf = *i;
01469 if (gDebug > 2)
01470 Info("PopUpSpare","asked: %d, spare: %d/%d, REUSE buf %p, sz: %d",
01471 size, (int) fgSQue.size(), nBuf, buf, buf->fSiz);
01472
01473 fgSQue.erase(i);
01474 return buf;
01475 }
01476 }
01477
01478 buf = fgSQue.front();
01479 buf->Resize(size);
01480 if (gDebug > 2)
01481 Info("PopUpSpare","asked: %d, spare: %d/%d, maxsz: %d, RESIZE buf %p, sz: %d",
01482 size, (int) fgSQue.size(), nBuf, maxsz, buf, buf->fSiz);
01483
01484 fgSQue.pop_front();
01485 return buf;
01486 }
01487
01488
01489 char *b = (char *)malloc(size);
01490 if (b)
01491 buf = new TXSockBuf(b, size);
01492 nBuf++;
01493 if (gDebug > 2)
01494 Info("PopUpSpare","asked: %d, spare: %d/%d, maxsz: %d, NEW buf %p, sz: %d",
01495 size, (int) fgSQue.size(), nBuf, maxsz, buf, buf->fSiz);
01496
01497
01498 return buf;
01499 }
01500
01501
01502 void TXSocket::PushBackSpare()
01503 {
01504
01505
01506 R__LOCKGUARD(&fgSMtx);
01507
01508 if (gDebug > 2)
01509 Info("PushBackSpare","release buf %p, sz: %d (BuffMem: %lld)",
01510 fBufCur, fBufCur->fSiz, TXSockBuf::BuffMem());
01511
01512 if (TXSockBuf::BuffMem() < TXSockBuf::GetMemMax()) {
01513 fgSQue.push_back(fBufCur);
01514 } else {
01515 delete fBufCur;
01516 }
01517 fBufCur = 0;
01518 fByteCur = 0;
01519 fByteLeft = 0;
01520 }
01521
01522
01523 Int_t TXSocket::RecvRaw(void *buffer, Int_t length, ESendRecvOptions)
01524 {
01525
01526
01527
01528 if (!buffer || (length <= 0))
01529 return -1;
01530
01531
01532 if (!fBufCur && (PickUpReady() != 0))
01533 return -1;
01534
01535
01536 if (fByteLeft >= length) {
01537 memcpy(buffer, fBufCur->fBuf + fByteCur, length);
01538 fByteCur += length;
01539 if ((fByteLeft -= length) <= 0)
01540
01541 PushBackSpare();
01542
01543 Touch();
01544 return length;
01545 } else {
01546
01547 memcpy(buffer, fBufCur->fBuf + fByteCur, fByteLeft);
01548 Int_t at = fByteLeft;
01549 Int_t tobecopied = length - fByteLeft;
01550 PushBackSpare();
01551 while (tobecopied > 0) {
01552
01553 if (PickUpReady() != 0)
01554 return -1;
01555
01556 Int_t ncpy = (fByteLeft > tobecopied) ? tobecopied : fByteLeft;
01557 memcpy((void *)((Char_t *)buffer+at), fBufCur->fBuf, ncpy);
01558 fByteCur = ncpy;
01559 if ((fByteLeft -= ncpy) <= 0)
01560
01561 PushBackSpare();
01562
01563 tobecopied -= ncpy;
01564 at += ncpy;
01565 }
01566 }
01567
01568
01569 fBytesRecv += length;
01570 fgBytesRecv += length;
01571
01572
01573 Touch();
01574
01575 return length;
01576 }
01577
01578
01579 Int_t TXSocket::SendInterrupt(Int_t type)
01580 {
01581
01582
01583
01584 TSystem::ResetErrno();
01585
01586
01587 XPClientRequest Request;
01588 memset(&Request, 0, sizeof(Request) );
01589 fConn->SetSID(Request.header.streamid);
01590 if (type == (Int_t) TProof::kShutdownInterrupt)
01591 Request.interrupt.requestid = kXP_destroy;
01592 else
01593 Request.interrupt.requestid = kXP_interrupt;
01594 Request.interrupt.sid = fSessionID;
01595 Request.interrupt.type = type;
01596 Request.interrupt.dlen = 0;
01597
01598
01599 XrdClientMessage *xrsp =
01600 fConn->SendReq(&Request, (const void *)0, 0, "SendInterrupt");
01601 if (xrsp) {
01602
01603 Touch();
01604
01605 SafeDelete(xrsp);
01606
01607 return 0;
01608 } else {
01609
01610 if (fConn->GetLastErr())
01611 Printf("%s: %s", fHost.Data(), fConn->GetLastErr());
01612 }
01613
01614
01615 Error("SendInterrupt", "problems sending interrupt to server");
01616 return -1;
01617 }
01618
01619
01620 Int_t TXSocket::Send(const TMessage &mess)
01621 {
01622
01623
01624
01625 TSystem::ResetErrno();
01626
01627 if (mess.IsReading()) {
01628 Error("Send", "cannot send a message used for reading");
01629 return -1;
01630 }
01631
01632
01633 SendStreamerInfos(mess);
01634
01635
01636 SendProcessIDs(mess);
01637
01638 mess.SetLength();
01639
01640 if (fCompress > 0 && mess.GetCompressionLevel() == 0)
01641 const_cast<TMessage&>(mess).SetCompressionLevel(fCompress);
01642
01643 if (mess.GetCompressionLevel() > 0)
01644 const_cast<TMessage&>(mess).Compress();
01645
01646 char *mbuf = mess.Buffer();
01647 Int_t mlen = mess.Length();
01648 if (mess.CompBuffer()) {
01649 mbuf = mess.CompBuffer();
01650 mlen = mess.CompLength();
01651 }
01652
01653
01654 kXR_int32 fSendOptDefault = fSendOpt;
01655 switch (mess.What()) {
01656 case kPROOF_PROCESS:
01657 fSendOpt |= kXPD_process;
01658 break;
01659 case kPROOF_PROGRESS:
01660 case kPROOF_FEEDBACK:
01661 fSendOpt |= kXPD_fb_prog;
01662 break;
01663 case kPROOF_QUERYSUBMITTED:
01664 fSendOpt |= kXPD_querynum;
01665 fSendOpt |= kXPD_fb_prog;
01666 break;
01667 case kPROOF_STARTPROCESS:
01668 fSendOpt |= kXPD_startprocess;
01669 fSendOpt |= kXPD_fb_prog;
01670 break;
01671 case kPROOF_STOPPROCESS:
01672 fSendOpt |= kXPD_fb_prog;
01673 break;
01674 case kPROOF_SETIDLE:
01675 fSendOpt |= kXPD_setidle;
01676 fSendOpt |= kXPD_fb_prog;
01677 break;
01678 case kPROOF_LOGFILE:
01679 case kPROOF_LOGDONE:
01680 if (GetClientIDSize() <= 1)
01681 fSendOpt |= kXPD_logmsg;
01682 break;
01683 default:
01684 break;
01685 }
01686
01687 if (gDebug > 2)
01688 Info("Send", "sending type %d (%d bytes) to '%s'", mess.What(), mlen, GetTitle());
01689
01690 Int_t nsent = SendRaw(mbuf, mlen);
01691 fSendOpt = fSendOptDefault;
01692
01693 if (nsent <= 0)
01694 return nsent;
01695
01696 fBytesSent += nsent;
01697 fgBytesSent += nsent;
01698
01699 return nsent - sizeof(UInt_t);
01700 }
01701
01702
01703 Int_t TXSocket::Recv(TMessage *&mess)
01704 {
01705
01706
01707
01708
01709
01710 TSystem::ResetErrno();
01711
01712 if (!IsValid()) {
01713 mess = 0;
01714 return -5;
01715 }
01716
01717 oncemore:
01718 Int_t n;
01719 UInt_t len;
01720 if ((n = RecvRaw(&len, sizeof(UInt_t))) <= 0) {
01721 mess = 0;
01722 return n;
01723 }
01724 len = net2host(len);
01725
01726 char *buf = new char[len+sizeof(UInt_t)];
01727 if ((n = RecvRaw(buf+sizeof(UInt_t), len)) <= 0) {
01728 delete [] buf;
01729 mess = 0;
01730 return n;
01731 }
01732
01733 fBytesRecv += n + sizeof(UInt_t);
01734 fgBytesRecv += n + sizeof(UInt_t);
01735
01736 mess = new TMessage(buf, len+sizeof(UInt_t));
01737
01738
01739 if (RecvStreamerInfos(mess))
01740 goto oncemore;
01741
01742
01743 if (RecvProcessIDs(mess))
01744 goto oncemore;
01745
01746 if (mess->What() & kMESS_ACK) {
01747
01748 mess->SetWhat(mess->What() & ~kMESS_ACK);
01749 }
01750
01751 return n;
01752 }
01753
01754
01755 TObjString *TXSocket::SendCoordinator(Int_t kind, const char *msg, Int_t int2,
01756 Long64_t l64, Int_t int3, const char *)
01757 {
01758
01759
01760
01761
01762 TObjString *sout = 0;
01763
01764
01765 XPClientRequest reqhdr;
01766 const void *buf = 0;
01767 char *bout = 0;
01768 char **vout = 0;
01769 memset(&reqhdr, 0, sizeof(reqhdr));
01770 fConn->SetSID(reqhdr.header.streamid);
01771 reqhdr.header.requestid = kXP_admin;
01772 reqhdr.proof.int1 = kind;
01773 reqhdr.proof.int2 = int2;
01774 switch (kind) {
01775 case kQueryROOTVersions:
01776 case kQuerySessions:
01777 case kQueryWorkers:
01778 reqhdr.proof.sid = 0;
01779 reqhdr.header.dlen = 0;
01780 vout = (char **)&bout;
01781 break;
01782 case kCleanupSessions:
01783 reqhdr.proof.int2 = (int2 == 1) ? (kXR_int32) kXPD_AnyServer
01784 : (kXR_int32) kXPD_TopMaster;
01785 reqhdr.proof.int3 = int2;
01786 reqhdr.proof.sid = fSessionID;
01787 reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
01788 buf = (msg) ? (const void *)msg : buf;
01789 break;
01790 case kCpFile:
01791 case kGetFile:
01792 case kPutFile:
01793 case kExec:
01794 reqhdr.proof.sid = fSessionID;
01795 reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
01796 buf = (msg) ? (const void *)msg : buf;
01797 vout = (char **)&bout;
01798 break;
01799 case kQueryLogPaths:
01800 vout = (char **)&bout;
01801 case kReleaseWorker:
01802 case kSendMsgToUser:
01803 case kGroupProperties:
01804 case kSessionTag:
01805 case kSessionAlias:
01806 reqhdr.proof.sid = fSessionID;
01807 reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
01808 buf = (msg) ? (const void *)msg : buf;
01809 break;
01810 case kROOTVersion:
01811 reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
01812 buf = (msg) ? (const void *)msg : buf;
01813 break;
01814 case kGetWorkers:
01815 reqhdr.proof.sid = fSessionID;
01816 reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
01817 if (msg)
01818 buf = (const void *)msg;
01819 vout = (char **)&bout;
01820 break;
01821 case kReadBuffer:
01822 reqhdr.header.requestid = kXP_readbuf;
01823 reqhdr.readbuf.ofs = l64;
01824 reqhdr.readbuf.len = int2;
01825 if (int3 > 0 && fXrdProofdVersion < 1003) {
01826 Info("SendCoordinator", "kReadBuffer: old server (ver %d < 1003):"
01827 " grep functionality not supported", fXrdProofdVersion);
01828 return sout;
01829 }
01830 reqhdr.readbuf.int1 = int3;
01831 if (!msg || strlen(msg) <= 0) {
01832 Info("SendCoordinator", "kReadBuffer: file path undefined");
01833 return sout;
01834 }
01835 reqhdr.header.dlen = strlen(msg);
01836 buf = (const void *)msg;
01837 vout = (char **)&bout;
01838 break;
01839 default:
01840 Info("SendCoordinator", "unknown message kind: %d", kind);
01841 return sout;
01842 }
01843
01844
01845 Bool_t noterr = (gDebug > 0) ? kTRUE : kFALSE;
01846 XrdClientMessage *xrsp =
01847 fConn->SendReq(&reqhdr, buf, vout, "TXSocket::SendCoordinator", noterr);
01848
01849
01850 if (xrsp) {
01851
01852 if (bout && (xrsp->DataLen() > 0))
01853 sout = new TObjString(TString(bout,xrsp->DataLen()));
01854 if (bout)
01855 free(bout);
01856
01857 Touch();
01858 SafeDelete(xrsp);
01859 } else {
01860
01861 if (fConn->GetLastErr())
01862 Printf("%s: %s", fHost.Data(), fConn->GetLastErr());
01863 }
01864
01865
01866 return sout;
01867 }
01868
01869
01870 void TXSocket::SendUrgent(Int_t type, Int_t int1, Int_t int2)
01871 {
01872
01873
01874
01875
01876 TSystem::ResetErrno();
01877
01878
01879 XPClientRequest Request;
01880 memset(&Request, 0, sizeof(Request) );
01881 fConn->SetSID(Request.header.streamid);
01882 Request.proof.requestid = kXP_urgent;
01883 Request.proof.sid = fSessionID;
01884 Request.proof.int1 = type;
01885 Request.proof.int2 = int1;
01886 Request.proof.int3 = int2;
01887 Request.proof.dlen = 0;
01888
01889
01890 XrdClientMessage *xrsp =
01891 fConn->SendReq(&Request, (const void *)0, 0, "SendUrgent");
01892 if (xrsp) {
01893
01894 Touch();
01895
01896 SafeDelete(xrsp);
01897 } else {
01898
01899 if (fConn->GetLastErr())
01900 Printf("%s: %s", fHost.Data(), fConn->GetLastErr());
01901 }
01902
01903
01904 return;
01905 }
01906
01907
01908 void TXSocket::InitEnvs()
01909 {
01910
01911
01912
01913 Int_t deb = gEnv->GetValue("XProof.Debug", -1);
01914 EnvPutInt(NAME_DEBUG, deb);
01915 if (deb > 0) {
01916 XrdProofdTrace->What |= TRACE_REQ;
01917 if (deb > 1) {
01918 XrdProofdTrace->What |= TRACE_DBG;
01919 if (deb > 2)
01920 XrdProofdTrace->What |= TRACE_ALL;
01921 }
01922 }
01923 const char *cenv = 0;
01924
01925
01926 TString allowCO = gEnv->GetValue("XProof.ConnectDomainAllowRE", "");
01927 if (allowCO.Length() > 0)
01928 EnvPutString(NAME_CONNECTDOMAINALLOW_RE, allowCO.Data());
01929
01930
01931 TString denyCO = gEnv->GetValue("XProof.ConnectDomainDenyRE", "");
01932 if (denyCO.Length() > 0)
01933 EnvPutString(NAME_CONNECTDOMAINDENY_RE, denyCO.Data());
01934
01935
01936 XrdProofConn::SetRetryParam(-1, -1);
01937 Int_t maxRetries = gEnv->GetValue("XProof.FirstConnectMaxCnt",5);
01938 EnvPutInt(NAME_FIRSTCONNECTMAXCNT, maxRetries);
01939 Int_t connTO = gEnv->GetValue("XProof.ConnectTimeout", 2);
01940 EnvPutInt(NAME_CONNECTTIMEOUT, connTO);
01941
01942
01943 Int_t recoTO = gEnv->GetValue("XProof.ReconnectWait",
01944 DFLT_RECONNECTWAIT);
01945 if (recoTO == DFLT_RECONNECTWAIT) {
01946
01947 recoTO = gEnv->GetValue("XProof.ReconnectTimeout",
01948 DFLT_RECONNECTWAIT);
01949 }
01950 EnvPutInt(NAME_RECONNECTWAIT, recoTO);
01951
01952
01953 Int_t requTO = gEnv->GetValue("XProof.RequestTimeout", 150);
01954 EnvPutInt(NAME_REQUESTTIMEOUT, requTO);
01955
01956
01957 EnvPutInt(NAME_KEEPSOCKOPENIFNOTXRD, 0);
01958
01959
01960 TString socks4Host = gEnv->GetValue("XNet.SOCKS4Host","");
01961 Int_t socks4Port = gEnv->GetValue("XNet.SOCKS4Port",-1);
01962 if (socks4Port > 0) {
01963 if (socks4Host.IsNull())
01964
01965 socks4Host = "127.0.0.1";
01966 EnvPutString(NAME_SOCKS4HOST, socks4Host.Data());
01967 EnvPutInt(NAME_SOCKS4PORT, socks4Port);
01968 }
01969
01970
01971 TString autolog = gEnv->GetValue("XSec.Pwd.AutoLogin","1");
01972 if (autolog.Length() > 0 &&
01973 (!(cenv = gSystem->Getenv("XrdSecPWDAUTOLOG")) || strlen(cenv) <= 0))
01974 gSystem->Setenv("XrdSecPWDAUTOLOG",autolog.Data());
01975
01976
01977 TString netrc;
01978 netrc.Form("%s/.rootnetrc",gSystem->HomeDirectory());
01979 gSystem->Setenv("XrdSecNETRC", netrc.Data());
01980
01981 TString alogfile = gEnv->GetValue("XSec.Pwd.ALogFile","");
01982 if (alogfile.Length() > 0)
01983 gSystem->Setenv("XrdSecPWDALOGFILE",alogfile.Data());
01984
01985 TString verisrv = gEnv->GetValue("XSec.Pwd.VerifySrv","1");
01986 if (verisrv.Length() > 0 &&
01987 (!(cenv = gSystem->Getenv("XrdSecPWDVERIFYSRV")) || strlen(cenv) <= 0))
01988 gSystem->Setenv("XrdSecPWDVERIFYSRV",verisrv.Data());
01989
01990 TString srvpuk = gEnv->GetValue("XSec.Pwd.ServerPuk","");
01991 if (srvpuk.Length() > 0)
01992 gSystem->Setenv("XrdSecPWDSRVPUK",srvpuk.Data());
01993
01994
01995 TString cadir = gEnv->GetValue("XSec.GSI.CAdir","");
01996 if (cadir.Length() > 0)
01997 gSystem->Setenv("XrdSecGSICADIR",cadir.Data());
01998
01999 TString crldir = gEnv->GetValue("XSec.GSI.CRLdir","");
02000 if (crldir.Length() > 0)
02001 gSystem->Setenv("XrdSecGSICRLDIR",crldir.Data());
02002
02003 TString crlext = gEnv->GetValue("XSec.GSI.CRLextension","");
02004 if (crlext.Length() > 0)
02005 gSystem->Setenv("XrdSecGSICRLEXT",crlext.Data());
02006
02007 TString ucert = gEnv->GetValue("XSec.GSI.UserCert","");
02008 if (ucert.Length() > 0)
02009 gSystem->Setenv("XrdSecGSIUSERCERT",ucert.Data());
02010
02011 TString ukey = gEnv->GetValue("XSec.GSI.UserKey","");
02012 if (ukey.Length() > 0)
02013 gSystem->Setenv("XrdSecGSIUSERKEY",ukey.Data());
02014
02015 TString upxy = gEnv->GetValue("XSec.GSI.UserProxy","");
02016 if (upxy.Length() > 0)
02017 gSystem->Setenv("XrdSecGSIUSERPROXY",upxy.Data());
02018
02019 TString valid = gEnv->GetValue("XSec.GSI.ProxyValid","");
02020 if (valid.Length() > 0)
02021 gSystem->Setenv("XrdSecGSIPROXYVALID",valid.Data());
02022
02023 TString deplen = gEnv->GetValue("XSec.GSI.ProxyForward","0");
02024 if (deplen.Length() > 0 &&
02025 (!(cenv = gSystem->Getenv("XrdSecGSIPROXYDEPLEN")) || strlen(cenv) <= 0))
02026 gSystem->Setenv("XrdSecGSIPROXYDEPLEN",deplen.Data());
02027
02028 TString pxybits = gEnv->GetValue("XSec.GSI.ProxyKeyBits","");
02029 if (pxybits.Length() > 0)
02030 gSystem->Setenv("XrdSecGSIPROXYKEYBITS",pxybits.Data());
02031
02032 TString crlcheck = gEnv->GetValue("XSec.GSI.CheckCRL","1");
02033 if (crlcheck.Length() > 0 &&
02034 (!(cenv = gSystem->Getenv("XrdSecGSICRLCHECK")) || strlen(cenv) <= 0))
02035 gSystem->Setenv("XrdSecGSICRLCHECK",crlcheck.Data());
02036
02037 TString delegpxy = gEnv->GetValue("XSec.GSI.DelegProxy","0");
02038 if (delegpxy.Length() > 0 &&
02039 (!(cenv = gSystem->Getenv("XrdSecGSIDELEGPROXY")) || strlen(cenv) <= 0))
02040 gSystem->Setenv("XrdSecGSIDELEGPROXY",delegpxy.Data());
02041
02042 TString signpxy = gEnv->GetValue("XSec.GSI.SignProxy","1");
02043 if (signpxy.Length() > 0 &&
02044 (!(cenv = gSystem->Getenv("XrdSecGSISIGNPROXY")) || strlen(cenv) <= 0))
02045 gSystem->Setenv("XrdSecGSISIGNPROXY",signpxy.Data());
02046
02047
02048 if (gEnv->GetValue("XNet.PrintTAG",0) == 1)
02049 ::Info("TXSocket","(C) 2005 CERN TXSocket (XPROOF client) %s",
02050 gROOT->GetVersion());
02051
02052
02053 fgInitDone = kTRUE;
02054 }
02055
02056
02057 Int_t TXSocket::Reconnect()
02058 {
02059
02060
02061 if (gDebug > 0) {
02062 Info("Reconnect", "%p (c:%p, v:%d): trying to reconnect to %s (logid: %d)",
02063 this, fConn, (fConn ? fConn->IsValid() : 0),
02064 fUrl.Data(), fConn->GetLogConnID());
02065 }
02066
02067 if (fXrdProofdVersion < 1005) {
02068 Info("Reconnect","%p: server does not support reconnections (protocol: %d < 1005)",
02069 this, fXrdProofdVersion);
02070 return -1;
02071 }
02072
02073 if (fConn) {
02074 if (gDebug > 0)
02075 Info("Reconnect", "%p: locking phyconn: %p", this, fConn->fPhyConn);
02076 fConn->ReConnect();
02077 if (fConn->IsValid()) {
02078
02079 if (fMode == 'm' || fMode == 's' || fMode == 'M' || fMode == 'A') {
02080
02081 if (!Create(kTRUE)) {
02082
02083 Error("TXSocket", "create or attach failed (%s)",
02084 ((fConn->fLastErrMsg.length() > 0) ? fConn->fLastErrMsg.c_str() : "-"));
02085 Close();
02086 return -1;
02087 }
02088 }
02089 }
02090 }
02091
02092 if (gDebug > 0) {
02093 Info("Reconnect", "%p (c:%p): attempt %s (logid: %d)", this, fConn,
02094 ((fConn && fConn->IsValid()) ? "succeeded!" : "failed"),
02095 fConn->GetLogConnID() );
02096 }
02097
02098
02099 return ((fConn && fConn->IsValid()) ? 0 : -1);
02100 }
02101
02102
02103 TXSockBuf::TXSockBuf(Char_t *bp, Int_t sz, Bool_t own)
02104 {
02105
02106 fBuf = fMem = bp;
02107 fSiz = fLen = sz;
02108 fOwn = own;
02109 fCid = -1;
02110 fgBuffMem += sz;
02111 }
02112
02113
02114 TXSockBuf::~TXSockBuf()
02115 {
02116
02117 if (fOwn && fMem) {
02118 free(fMem);
02119 fgBuffMem -= fSiz;
02120 }
02121 }
02122
02123
02124 void TXSockBuf::Resize(Int_t sz)
02125 {
02126
02127 if (sz > fSiz) {
02128 if ((fMem = (Char_t *)realloc(fMem, sz))) {
02129 fgBuffMem += (sz - fSiz);
02130 fBuf = fMem;
02131 fSiz = sz;
02132 fLen = 0;
02133 }
02134 }
02135 }
02136
02137
02138
02139
02140
02141
02142
02143 Long64_t TXSockBuf::BuffMem()
02144 {
02145
02146
02147 return fgBuffMem;
02148 }
02149
02150
02151 Long64_t TXSockBuf::GetMemMax()
02152 {
02153
02154
02155 return fgMemMax;
02156 }
02157
02158
02159 void TXSockBuf::SetMemMax(Long64_t memmax)
02160 {
02161
02162
02163 fgMemMax = memmax > 0 ? memmax : fgMemMax;
02164 }
02165
02166
02167
02168
02169
02170
02171
02172 TXSockPipe::TXSockPipe(const char *loc) : fMutex(kTRUE), fLoc(loc)
02173 {
02174
02175
02176
02177 if (pipe(fPipe) != 0) {
02178 Printf("TXSockPipe: problem initializing pipe for socket inputs");
02179 fPipe[0] = -1;
02180 fPipe[1] = -1;
02181 return;
02182 }
02183 }
02184
02185
02186 TXSockPipe::~TXSockPipe()
02187 {
02188
02189
02190 if (fPipe[0] >= 0) close(fPipe[0]);
02191 if (fPipe[1] >= 0) close(fPipe[1]);
02192 }
02193
02194
02195
02196 Int_t TXSockPipe::Post(TSocket *s)
02197 {
02198
02199
02200
02201 if (!IsValid() || !s) return -1;
02202
02203
02204 Int_t sz = 0;
02205 { R__LOCKGUARD(&fMutex);
02206
02207 fReadySock.Add(s);
02208
02209
02210 Char_t c = 1;
02211 if (write(fPipe[1],(const void *)&c, sizeof(Char_t)) < 1) {
02212 Printf("TXSockPipe::Post: %s: can't notify pipe", fLoc.Data());
02213 return -1;
02214 }
02215 if (gDebug > 2) sz = fReadySock.GetSize();
02216 }
02217
02218 if (gDebug > 2)
02219 Printf("TXSockPipe::Post: %s: %p: pipe posted (pending %d)",
02220 fLoc.Data(), s, sz);
02221
02222 return 0;
02223 }
02224
02225
02226 Int_t TXSockPipe::Clean(TSocket *s)
02227 {
02228
02229
02230
02231 if (!IsValid() || !s) return -1;
02232
02233
02234 Int_t sz = 0;
02235 Char_t c = 0;
02236 { R__LOCKGUARD(&fMutex);
02237 if (read(fPipe[0],(void *)&c, sizeof(Char_t)) < 1) {
02238 Printf("TXSockPipe::Clean: %s: can't read from pipe", fLoc.Data());
02239 return -1;
02240 }
02241
02242 fReadySock.Remove(s);
02243
02244 if (gDebug > 2) sz = fReadySock.GetSize();
02245 }
02246
02247 if (gDebug > 2)
02248 Printf("TXSockPipe::Clean: %s: %p: pipe cleaned (pending %d)",
02249 fLoc.Data(), s, sz);
02250
02251
02252 return 0;
02253 }
02254
02255
02256 Int_t TXSockPipe::Flush(TSocket *s)
02257 {
02258
02259
02260
02261
02262 if (!IsValid() || !s) return -1;
02263
02264 TObject *o = 0;
02265
02266 { R__LOCKGUARD(&fMutex);
02267 o = fReadySock.FindObject(s);
02268
02269 while (o) {
02270
02271 fReadySock.Remove(s);
02272 o = fReadySock.FindObject(s);
02273
02274 Char_t c = 0;
02275 if (read(fPipe[0],(void *)&c, sizeof(Char_t)) < 1)
02276 Printf("TXSockPipe::Flush: %s: can't read from pipe", fLoc.Data());
02277 }
02278 }
02279
02280 ((TXSocket *)s)->Flush();
02281
02282
02283 if (gDebug > 0)
02284 Printf("TXSockPipe::Flush: %s: %p: pipe flushed", fLoc.Data(), s);
02285
02286
02287 return 0;
02288 }
02289
02290
02291 void TXSockPipe::DumpReadySock()
02292 {
02293
02294
02295 R__LOCKGUARD(&fMutex);
02296
02297 TString buf = Form("%d |", fReadySock.GetSize());
02298 TIter nxs(&fReadySock);
02299 TObject *o = 0;
02300 while ((o = nxs()))
02301 buf += Form(" %p",o);
02302 Printf("TXSockPipe::DumpReadySock: %s: list content: %s", fLoc.Data(), buf.Data());
02303 }
02304
02305
02306 TXSocket *TXSockPipe::GetLastReady()
02307 {
02308
02309
02310 R__LOCKGUARD(&fMutex);
02311
02312 return (TXSocket *) fReadySock.Last();
02313 }