00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016 const char *XrdClientConnCVSID = "$Id: XrdClientConn.cc 38011 2011-02-08 18:35:57Z ganis $";
00017
00018 #include "XrdClient/XrdClientDebug.hh"
00019
00020 #include "XrdClient/XrdClientConnMgr.hh"
00021 #include "XrdClient/XrdClientConn.hh"
00022 #include "XrdClient/XrdClientLogConnection.hh"
00023 #include "XrdClient/XrdClientPhyConnection.hh"
00024 #include "XrdClient/XrdClientProtocol.hh"
00025
00026 #include "XrdOuc/XrdOucErrInfo.hh"
00027 #include "XrdSec/XrdSecInterface.hh"
00028 #include "XrdNet/XrdNetDNS.hh"
00029 #include "XrdClient/XrdClientUrlInfo.hh"
00030 #include "XrdClient/XrdClientEnv.hh"
00031 #include "XrdClient/XrdClientAbs.hh"
00032
00033 #include "XrdClient/XrdClientSid.hh"
00034
00035 #include "XrdSys/XrdSysPriv.hh"
00036
00037
00038
00039
00040 #if defined(__solaris__)
00041 #include <sys/isa_defs.h>
00042 #if defined(_ILP32) && (_FILE_OFFSET_BITS != 32)
00043 #undef _FILE_OFFSET_BITS
00044 #define _FILE_OFFSET_BITS 32
00045 #undef _LARGEFILE_SOURCE
00046 #endif
00047 #endif
00048
00049 #ifndef WIN32
00050 #include <dlfcn.h>
00051 #ifndef __macos__
00052 #include <link.h>
00053 #endif
00054 #endif
00055
00056 #include <stdio.h>
00057 #include <stdlib.h>
00058 #ifndef WIN32
00059 #include <pwd.h>
00060 #include <sys/types.h>
00061 #include <unistd.h>
00062 #else
00063 #include <process.h>
00064 #include "XrdSys/XrdWin32.hh"
00065 #endif
00066 #include <string.h>
00067 #include <ctype.h>
00068
00069 #define SafeDelete(x) { if (x) { delete x; x = 0; } }
00070
00071
00072
00073 typedef XrdSecProtocol *(*XrdSecGetProt_t)(const char *, const struct sockaddr &,
00074 const XrdSecParameters &, XrdOucErrInfo *);
00075
00076 XrdOucHash<XrdClientConn::SessionIDInfo> XrdClientConn::fSessionIDRepo;
00077
00078
00079 XrdClientConnectionMgr *XrdClientConn::fgConnectionMgr = 0;
00080 XrdOucString XrdClientConn::fgClientHostDomain;
00081
00082
00083 void ParseRedirHost(XrdOucString &host, XrdOucString &opaque, XrdOucString &token)
00084 {
00085
00086
00087
00088 int pos;
00089
00090 token = "";
00091 opaque = "";
00092
00093 if ( (pos = host.find('?')) != STR_NPOS ) {
00094 opaque.assign(host,pos+1);
00095 host.erasefromend(host.length()-pos);
00096
00097 if ( (pos = opaque.find('?')) != STR_NPOS ) {
00098 token.assign(host,pos+1);
00099 opaque.erasefromend(opaque.length()-pos);
00100 }
00101
00102 }
00103
00104 }
00105
00106
00107 void ParseRedir(XrdClientMessage* xmsg, int &port, XrdOucString &host, XrdOucString &opaque, XrdOucString &token)
00108 {
00109
00110
00111
00112
00113
00114 struct ServerResponseBody_Redirect* redirdata =
00115 (struct ServerResponseBody_Redirect*)xmsg->GetData();
00116
00117 port = 0;
00118
00119 if (redirdata) {
00120 XrdOucString h(redirdata->host);
00121 ParseRedirHost(h, opaque, token);
00122 host = h;
00123 port = ntohl(redirdata->port);
00124 }
00125 }
00126
00127
00128
00129
00130
00131 XrdClientConn::XrdClientConn(): fOpenError((XErrorCode)0), fUrl(""),
00132 fLBSUrl(0),
00133 fConnected(false),
00134 fGettingAccessToSrv(false),
00135 fMainReadCache(0),
00136 fREQWaitRespData(0),
00137 fREQWaitTimeLimit(0),
00138 fREQConnectWaitTimeLimit(0) {
00139
00140 ClearLastServerError();
00141 memset(&LastServerResp, 0, sizeof(LastServerResp));
00142 LastServerResp.status = kXR_noResponsesYet;
00143
00144 fREQUrl.Clear();
00145 fREQWait = new XrdSysCondVar(0);
00146 fREQConnectWait = new XrdSysCondVar(0);
00147 fREQWaitResp = new XrdSysCondVar(0);
00148 fWriteWaitAck = new XrdSysCondVar(0);
00149
00150 fRedirHandler = 0;
00151 fUnsolMsgHandler = 0;
00152
00153
00154 fGlobalRedirLastUpdateTimestamp = time(0);
00155 fGlobalRedirCnt = 0;
00156 fMaxGlobalRedirCnt = EnvGetLong(NAME_MAXREDIRECTCOUNT);
00157
00158 fOpenSockFD = -1;
00159
00160
00161 if (!fgConnectionMgr) {
00162 if (!(fgConnectionMgr = new XrdClientConnectionMgr())) {
00163 Error("XrdClientConn::XrdClientConn", "initializing connection manager");
00164 }
00165
00166 char buf[255];
00167 gethostname(buf, sizeof(buf));
00168 fgClientHostDomain = GetDomainToMatch(buf);
00169
00170 if (fgClientHostDomain == "")
00171 Error("XrdClientConn",
00172 "Error resolving this host's domain name." );
00173
00174 XrdOucString goodDomainsRE = fgClientHostDomain;
00175 goodDomainsRE += "|*";
00176
00177 if (EnvGetString(NAME_REDIRDOMAINALLOW_RE) == 0)
00178 EnvPutString(NAME_REDIRDOMAINALLOW_RE, goodDomainsRE.c_str());
00179 if (EnvGetString(NAME_REDIRDOMAINDENY_RE) == 0)
00180 EnvPutString(NAME_REDIRDOMAINDENY_RE, "<unknown>");
00181 if (EnvGetString(NAME_CONNECTDOMAINALLOW_RE) == 0)
00182 EnvPutString(NAME_CONNECTDOMAINALLOW_RE, goodDomainsRE.c_str());
00183 if (EnvGetString(NAME_CONNECTDOMAINDENY_RE) == 0)
00184 EnvPutString(NAME_CONNECTDOMAINDENY_RE, "<unknown>");
00185 }
00186
00187
00188 fServerType = kSTNone;
00189 }
00190
00191
00192 XrdClientConn::~XrdClientConn()
00193 {
00194
00195
00196
00197 Disconnect(FALSE);
00198
00199
00200 if (fMainReadCache && (DebugLevel() >= XrdClientDebug::kUSERDEBUG))
00201 fMainReadCache->PrintPerfCounters();
00202
00203 if (fLBSUrl) delete fLBSUrl;
00204
00205 if (fMainReadCache) delete fMainReadCache;
00206 fMainReadCache = 0;
00207
00208 delete fREQWait;
00209 fREQWait = 0;
00210
00211 delete fREQConnectWait;
00212 fREQConnectWait = 0;
00213
00214 delete fREQWaitResp;
00215 fREQWaitResp = 0;
00216
00217 delete fWriteWaitAck;
00218 fWriteWaitAck = 0;
00219
00220 }
00221
00222
00223 short XrdClientConn::Connect(XrdClientUrlInfo Host2Conn,
00224 XrdClientAbsUnsolMsgHandler *unsolhandler)
00225 {
00226
00227
00228
00229
00230
00231
00232
00233
00234 short logid;
00235 logid = -1;
00236 fPrimaryStreamid = 0;
00237 fLogConnID = 0;
00238
00239 CheckREQConnectWaitState();
00240
00241 Info(XrdClientDebug::kHIDEBUG,
00242 "XrdClientConn", "Trying to connect to " <<
00243 Host2Conn.HostAddr << ":" << Host2Conn.Port);
00244
00245 logid = ConnectionManager->Connect(Host2Conn);
00246
00247 Info(XrdClientDebug::kHIDEBUG,
00248 "Connect", "Connect(" << Host2Conn.Host << ", " <<
00249 Host2Conn.Port << ") returned " <<
00250 logid );
00251
00252 if (logid < 0) {
00253 Error("XrdNetFile",
00254 "Error creating logical connection to " <<
00255 Host2Conn.Host << ":" << Host2Conn.Port );
00256
00257 fLogConnID = logid;
00258 fConnected = FALSE;
00259 return -1;
00260 }
00261
00262 fConnected = TRUE;
00263
00264 fLogConnID = logid;
00265 fPrimaryStreamid = ConnectionManager->GetConnection(fLogConnID)->Streamid();
00266
00267 ConnectionManager->GetConnection(fLogConnID)->UnsolicitedMsgHandler = unsolhandler;
00268 fUnsolMsgHandler = unsolhandler;
00269
00270 return logid;
00271 }
00272
00273
00274 void XrdClientConn::Disconnect(bool ForcePhysicalDisc)
00275 {
00276
00277
00278 ConnectionManager->SidManager()->GetAllOutstandingWriteRequests(fPrimaryStreamid, fWriteReqsToRetry);
00279
00280 if (fMainReadCache && (DebugLevel() >= XrdClientDebug::kDUMPDEBUG) ) fMainReadCache->PrintCache();
00281
00282 if (fConnected)
00283 ConnectionManager->Disconnect(fLogConnID, ForcePhysicalDisc);
00284
00285 fConnected = FALSE;
00286 }
00287
00288
00289 XrdClientMessage *XrdClientConn::ClientServerCmd(ClientRequest *req, const void *reqMoreData,
00290 void **answMoreDataAllocated,
00291 void *answMoreData, bool HasToAlloc,
00292 int substreamid)
00293 {
00294
00295
00296
00297
00298
00299
00300
00301
00302
00303
00304
00305
00306
00307
00308
00309
00310
00311
00312
00313
00314
00315 int len;
00316
00317
00318 size_t TotalBlkSize = 0;
00319
00320 void *tmpMoreData;
00321 XReqErrorType errorType = kOK;
00322
00323 XrdClientMessage *xmsg = 0;
00324
00325
00326
00327
00328
00329
00330 do {
00331
00332
00333
00334
00335
00336
00337
00338
00339 len = sizeof(ClientRequest);
00340
00341
00342
00343
00344
00345 SetSID(req->header.streamid);
00346
00347 errorType = WriteToServer(req, reqMoreData, fLogConnID, substreamid);
00348
00349
00350
00351
00352
00353 TotalBlkSize = 0;
00354
00355
00356 tmpMoreData = 0;
00357 if ((answMoreData != 0) && !HasToAlloc)
00358 tmpMoreData = answMoreData;
00359
00360
00361 do {
00362
00363 XrdClientConn::EThreeStateReadHandler whatToDo;
00364
00365 delete xmsg;
00366
00367 xmsg = ReadPartialAnswer(errorType, TotalBlkSize, req, HasToAlloc,
00368 &tmpMoreData, whatToDo);
00369
00370
00371
00372 if (xmsg && fMainReadCache && (req->header.requestid == kXR_read) &&
00373 ((xmsg->HeaderStatus() == kXR_oksofar) ||
00374 (xmsg->HeaderStatus() == kXR_ok)))
00375
00376 fMainReadCache->SubmitXMessage(xmsg, req->read.offset + TotalBlkSize - xmsg->fHdr.dlen,
00377 req->read.offset + TotalBlkSize - 1);
00378
00379 if (whatToDo == kTSRHReturnNullMex) {
00380 delete xmsg;
00381 return 0;
00382 }
00383
00384 if (whatToDo == kTSRHReturnMex)
00385 return xmsg;
00386
00387 if (xmsg && (xmsg->HeaderStatus() == kXR_oksofar) &&
00388 (xmsg->DataLen() == 0))
00389 return xmsg;
00390
00391 } while (xmsg && (xmsg->HeaderStatus() == kXR_oksofar));
00392
00393 } while ((fGlobalRedirCnt < fMaxGlobalRedirCnt) &&
00394 !IsOpTimeLimitElapsed(time(0)) &&
00395 xmsg && (xmsg->HeaderStatus() == kXR_redirect));
00396
00397
00398
00399 if (HasToAlloc && (answMoreDataAllocated)) {
00400 *answMoreDataAllocated = tmpMoreData;
00401 }
00402
00403
00404 if (xmsg && (xmsg->HeaderStatus() == kXR_ok) && TotalBlkSize)
00405 xmsg->fHdr.dlen = TotalBlkSize;
00406
00407 return xmsg;
00408 }
00409
00410
00411 bool XrdClientConn::SendGenCommand(ClientRequest *req, const void *reqMoreData,
00412 void **answMoreDataAllocated,
00413 void *answMoreData, bool HasToAlloc,
00414 char *CmdName,
00415 int substreamid) {
00416
00417
00418
00419 short retry = 0;
00420 bool resp = FALSE, abortcmd = FALSE;
00421
00422 string orig_fname,new_fname;
00423 if (req->header.requestid == kXR_open && reqMoreData)
00424 orig_fname.assign((const char *)reqMoreData);
00425
00426
00427
00428 if (req->header.requestid == kXR_open)
00429 fOpenError = (XErrorCode)0;
00430
00431
00432
00433
00434
00435 while (!abortcmd && !resp) {
00436 abortcmd = FALSE;
00437
00438
00439 CheckREQPauseState();
00440
00441
00442
00443 Info(XrdClientDebug::kHIDEBUG,
00444 "SendGenCommand","Sending command " << CmdName);
00445
00446
00447 if ( (req->header.requestid == kXR_open) &&
00448 (GetServerProtocol() < 0x00000270) ) {
00449 if (req->open.options & kXR_retstat)
00450 req->open.options ^= kXR_retstat;
00451
00452 Info(XrdClientDebug::kHIDEBUG, "SendGenCommand",
00453 "Old server proto version(" << GetServerProtocol() <<
00454 ". kXR_retstat is now disabled. Current open options: " << req->open.options);
00455 }
00456
00457
00458 kXR_int32 oldlen = 0;
00459 if (req->header.requestid == kXR_open && reqMoreData) {
00460 oldlen = req->open.dlen;
00461 new_fname = orig_fname;
00462 if (fRedirOpaque.length()) {
00463 new_fname += "?";
00464 new_fname += string(fRedirOpaque.c_str());
00465 }
00466 reqMoreData = new_fname.c_str();
00467 req->open.dlen = new_fname.length();
00468 }
00469
00470 XrdClientMessage *cmdrespMex = ClientServerCmd(req, reqMoreData,
00471 answMoreDataAllocated,
00472 answMoreData, HasToAlloc,
00473 substreamid);
00474
00475 if (req->header.requestid == kXR_open && reqMoreData) {
00476 req->open.dlen = oldlen;
00477 }
00478
00479
00480
00481 if (cmdrespMex)
00482 memcpy(&LastServerResp, &cmdrespMex->fHdr,sizeof(struct ServerResponseHeader));
00483
00484
00485 if (IsOpTimeLimitElapsed(time(0))) {
00486 Error("SendGenCommand",
00487 "Max time limit elapsed for request " <<
00488 convertRequestIdToChar(req->header.requestid) <<
00489 ". Aborting command.");
00490 abortcmd = TRUE;
00491
00492 } else
00493
00494 if (fGlobalRedirCnt >= fMaxGlobalRedirCnt) {
00495 Error("SendGenCommand",
00496 "Too many redirections for request " <<
00497 convertRequestIdToChar(req->header.requestid) <<
00498 ". Aborting command.");
00499
00500 abortcmd = TRUE;
00501 }
00502 else {
00503
00504
00505
00506 if (!cmdrespMex || cmdrespMex->IsError()) {
00507
00508 Info(XrdClientDebug::kHIDEBUG,
00509 "SendGenCommand", "Got (and maybe recovered) an error from " <<
00510 fUrl.Host << ":" << fUrl.Port);
00511
00512
00513
00514
00515 if (req->header.requestid != kXR_open)
00516 retry++;
00517
00518 if (retry > kXR_maxReqRetry) {
00519 Error("SendGenCommand",
00520 "Too many errors communication errors with server"
00521 ". Aborting command.");
00522
00523 abortcmd = TRUE;
00524 }
00525
00526 else
00527 if (req->header.requestid == kXR_bind) {
00528 Info(XrdClientDebug::kHIDEBUG,
00529 "SendGenCommand", "Parallel stream bind failure. Aborting request." <<
00530 fUrl.Host << ":" << fUrl.Port);
00531
00532 abortcmd = TRUE;
00533 }
00534
00535
00536 else {
00537
00538
00539
00540
00541
00542 if ( (LastServerResp.status != kXR_ok) &&
00543 ( (req->header.requestid == kXR_read) ||
00544 (req->header.requestid == kXR_write) ||
00545 (req->header.requestid == kXR_sync) ||
00546 (req->header.requestid == kXR_close) ) ) {
00547
00548 Info(XrdClientDebug::kHIDEBUG,
00549 "SendGenCommand", "Recovery failure detected. Aborting request." <<
00550 fUrl.Host << ":" << fUrl.Port);
00551
00552 abortcmd = TRUE;
00553
00554 }
00555 else
00556 abortcmd = FALSE;
00557
00558 }
00559 } else {
00560
00561
00562
00563 resp = CheckResp(&cmdrespMex->fHdr, CmdName);
00564 retry++;
00565
00566
00567
00568
00569 if (!resp) {
00570
00571
00572
00573 if (cmdrespMex->fHdr.status == kXR_waitresp) {
00574
00575 kXR_int32 *maxwait = (kXR_int32 *)cmdrespMex->GetData();
00576 kXR_int32 mw;
00577
00578 if (maxwait)
00579 mw = ntohl(*maxwait);
00580 else mw = 30;
00581
00582 if (!WaitResp(mw)) {
00583
00584
00585 memcpy(&LastServerResp, &fREQWaitRespData->resphdr,
00586 sizeof(struct ServerResponseHeader));
00587
00588
00589
00590
00591 if (fREQWaitRespData->resphdr.status == kXR_wait) {
00592 cmdrespMex->fHdr.status = kXR_wait;
00593 if (fREQWaitRespData->resphdr.dlen)
00594 memcpy(cmdrespMex->GetData(), fREQWaitRespData->respdata, sizeof(kXR_int32));
00595 else memset(cmdrespMex->GetData(), 0, sizeof(kXR_int32));
00596
00597 CheckErrorStatus(cmdrespMex, retry, CmdName);
00598
00599 resp = false;
00600 }
00601 else {
00602
00603 if (HasToAlloc) {
00604 *answMoreDataAllocated = malloc(LastServerResp.dlen);
00605 memcpy(*answMoreDataAllocated,
00606 &fREQWaitRespData->respdata,
00607 LastServerResp.dlen);
00608 }
00609 else {
00610
00611 memcpy(answMoreData,
00612 &fREQWaitRespData->respdata,
00613 LastServerResp.dlen);
00614
00615 }
00616
00617
00618 resp = true;
00619 }
00620
00621 free( fREQWaitRespData);
00622 fREQWaitRespData = 0;
00623
00624 abortcmd = false;
00625
00626 }
00627
00628
00629 }
00630 else {
00631
00632 abortcmd = CheckErrorStatus(cmdrespMex, retry, CmdName);
00633
00634
00635
00636 if (req->header.requestid == kXR_open)
00637 req->open.options &= ((kXR_unt16)~kXR_refresh);
00638 }
00639 }
00640
00641 if (retry > kXR_maxReqRetry) {
00642 Error("SendGenCommand",
00643 "Too many errors messages from server."
00644 " Aborting command.");
00645
00646 abortcmd = TRUE;
00647 }
00648 }
00649 }
00650
00651 delete cmdrespMex;
00652 }
00653
00654 return (!abortcmd);
00655 }
00656
00657
00658 bool XrdClientConn::CheckHostDomain(XrdOucString hostToCheck)
00659 {
00660
00661
00662 static XrdOucHash<int> knownHosts;
00663 static XrdOucString alloweddomains = EnvGetString(NAME_REDIRDOMAINALLOW_RE);
00664 static XrdOucString denieddomains = EnvGetString(NAME_REDIRDOMAINDENY_RE);
00665
00666
00667 int *he = knownHosts.Find(hostToCheck.c_str());
00668 if (he)
00669 return (*he == 1) ? TRUE : FALSE;
00670
00671
00672 XrdOucString domain = GetDomainToMatch(hostToCheck);
00673
00674
00675 if (domain.length() <= 0) {
00676 Error("CheckHostDomain", "Error resolving domain name for " <<
00677 hostToCheck << ". Denying access.");
00678 return FALSE;
00679 }
00680 Info(XrdClientDebug::kHIDEBUG, "CheckHostDomain", "Resolved [" << hostToCheck <<
00681 "]'s domain name into [" << domain << "]" );
00682
00683
00684
00685 if (DomainMatcher(domain, denieddomains) ) {
00686 knownHosts.Add(hostToCheck.c_str(), new int(0));
00687 Error("CheckHostDomain", "Access denied to the domain of [" << hostToCheck << "].");
00688 return FALSE;
00689 }
00690
00691
00692
00693 if (DomainMatcher(domain, alloweddomains) ) {
00694 knownHosts.Add(hostToCheck.c_str(), new int(1));
00695 Info(XrdClientDebug::kHIDEBUG, "CheckHostDomain",
00696 "Access granted to the domain of [" << hostToCheck << "].");
00697 return TRUE;
00698 }
00699
00700 Error("CheckHostDomain", "Access to domain " << domain <<
00701 " is not allowed nor denied: deny.");
00702
00703 return FALSE;
00704
00705 }
00706
00707
00708 bool XrdClientConn::DomainMatcher(XrdOucString dom, XrdOucString domlist)
00709 {
00710
00711
00712
00713
00714
00715
00716
00717
00718
00719
00720
00721 Info(XrdClientDebug::kHIDEBUG, "DomainMatcher",
00722 "search for '"<<dom<<"' in '"<<domlist<<"'");
00723
00724
00725 if (domlist.length() > 0) {
00726 XrdOucString domain;
00727 int nm = 0, from = 0;
00728 while ((from = domlist.tokenize(domain, from, '|')) != STR_NPOS) {
00729 Info(XrdClientDebug::kDUMPDEBUG, "DomainMatcher",
00730 "checking domain: "<<domain);
00731 nm = dom.matches(domain.c_str());
00732 if (nm > 0) {
00733 Info(XrdClientDebug::kHIDEBUG, "DomainMatcher",
00734 "domain: "<<domain<<" matches '"<<dom
00735 <<"' (matching chars: "<<nm<<")");
00736 return TRUE;
00737 }
00738 }
00739 }
00740 Info(XrdClientDebug::kHIDEBUG, "DomainMatcher",
00741 "no domain matching '"<<dom<<"' found in '"<<domlist<<"'");
00742
00743 return FALSE;
00744 }
00745
00746
00747 bool XrdClientConn::CheckResp(struct ServerResponseHeader *resp, const char *method)
00748 {
00749
00750
00751
00752
00753 if (MatchStreamid(resp)) {
00754
00755
00756 if (resp->status == kXR_redirect) {
00757
00758 Error(method, "Error in handling a redirection.");
00759 return FALSE;
00760 }
00761
00762 if ((resp->status != kXR_ok) && (resp->status != kXR_authmore))
00763
00764 return FALSE;
00765
00766 return TRUE;
00767
00768 } else {
00769 Error(method, "The return message doesn't belong to this client.");
00770 return FALSE;
00771 }
00772 }
00773
00774
00775 bool XrdClientConn::MatchStreamid(struct ServerResponseHeader *ServerResponse)
00776 {
00777
00778
00779
00780
00781 return ( memcmp(ServerResponse->streamid,
00782 &fPrimaryStreamid,
00783 sizeof(ServerResponse->streamid)) == 0 );
00784 }
00785
00786
00787 void XrdClientConn::SetSID(kXR_char *sid) {
00788
00789
00790 memcpy((void *)sid, (const void*)&fPrimaryStreamid, 2);
00791 }
00792
00793
00794
00795 XReqErrorType XrdClientConn::WriteToServer(ClientRequest *req,
00796 const void* reqMoreData,
00797 short LogConnID,
00798 int substreamid) {
00799
00800
00801 ClientRequest req_netfmt = *req;
00802 XrdClientLogConnection *lgc = 0;
00803 XrdClientPhyConnection *phyc = 0;
00804
00805 if (DebugLevel() >= XrdClientDebug::kDUMPDEBUG)
00806 smartPrintClientHeader(req);
00807
00808 lgc = ConnectionManager->GetConnection(LogConnID);
00809 if (!lgc) {
00810 Error("WriteToServer",
00811 "Unknown logical conn " << LogConnID);
00812
00813 return kWRITE;
00814 }
00815
00816 phyc = lgc->GetPhyConnection();
00817 if (!phyc) {
00818 Error("WriteToServer",
00819 "Cannot find physical conn for logid " << LogConnID);
00820
00821 return kWRITE;
00822 }
00823
00824 clientMarshall(&req_netfmt);
00825
00826
00827 {
00828 XrdClientPhyConnLocker pcl(phyc);
00829
00830
00831
00832
00833 short len = sizeof(req->header);
00834
00835
00836 int writeres;
00837 if ( req->header.requestid == kXR_bind )
00838 writeres = ConnectionManager->WriteRaw(LogConnID, &req_netfmt, len, substreamid);
00839 else
00840 writeres = ConnectionManager->WriteRaw(LogConnID, &req_netfmt, len, 0);
00841
00842 fLastDataBytesSent = req->header.dlen;
00843
00844
00845
00846 if (writeres < 0) {
00847 Error("WriteToServer",
00848 "Error sending " << len << " bytes in the header part"
00849 " to server [" <<
00850 fUrl.Host << ":" << fUrl.Port << "].");
00851
00852 return kWRITE;
00853 }
00854
00855
00856
00857 if (req->header.dlen > 0) {
00858
00859
00860
00861
00862 writeres = ConnectionManager->WriteRaw(LogConnID, reqMoreData,
00863 req->header.dlen, substreamid);
00864
00865
00866
00867 if (writeres < 0) {
00868 Error("WriteToServer",
00869 "Error sending " << req->header.dlen << " bytes in the data part"
00870 " to server [" <<
00871 fUrl.Host << ":" << fUrl.Port << "].");
00872
00873 return kWRITE;
00874 }
00875 }
00876
00877 fLastDataBytesSent = req->header.dlen;
00878 return kOK;
00879 }
00880 }
00881
00882
00883 bool XrdClientConn::CheckErrorStatus(XrdClientMessage *mex, short &Retry, char *CmdName)
00884 {
00885
00886
00887 if (mex->HeaderStatus() == kXR_redirect) {
00888
00889 Error("CheckErrorStatus",
00890 "Error while being redirected for request " << CmdName );
00891 return TRUE;
00892 }
00893
00894 if (mex->HeaderStatus() == kXR_error) {
00895
00896
00897
00898 struct ServerResponseBody_Error *body_err;
00899
00900 body_err = (struct ServerResponseBody_Error *)(mex->GetData());
00901
00902 if (body_err) {
00903
00904
00905 fOpenError = (XErrorCode)ntohl(body_err->errnum);
00906
00907 Info(XrdClientDebug::kNODEBUG, "CheckErrorStatus", "Server [" << GetCurrentUrl().HostWPort <<
00908 "] declared: " <<
00909 (const char*)body_err->errmsg << "(error code: " << fOpenError << ")");
00910
00911
00912 memset(&LastServerError, 0, sizeof(LastServerError));
00913 memcpy(&LastServerError, body_err, mex->DataLen());
00914 LastServerError.errnum = fOpenError;
00915
00916 }
00917 return TRUE;
00918 }
00919
00920 if (mex->HeaderStatus() == kXR_wait) {
00921
00922
00923
00924 struct ServerResponseBody_Wait *body_wait;
00925
00926 body_wait = (struct ServerResponseBody_Wait *)mex->GetData();
00927
00928 if (body_wait) {
00929
00930 if (mex->DataLen() > 4)
00931 Info(XrdClientDebug::kUSERDEBUG, "CheckErrorStatus", "Server [" <<
00932 fUrl.Host << ":" << fUrl.Port <<
00933 "] requested " << ntohl(body_wait->seconds) << " seconds"
00934 " of wait. Server message is " << body_wait->infomsg)
00935 else
00936 Info(XrdClientDebug::kUSERDEBUG, "CheckErrorStatus", "Server [" <<
00937 fUrl.Host << ":" << fUrl.Port <<
00938 "] requested " << ntohl(body_wait->seconds) << " seconds"
00939 " of wait")
00940
00941
00942 int cmw = (getenv("XRDCLIENTMAXWAIT")) ? atoi(getenv("XRDCLIENTMAXWAIT")) : -1;
00943 int bws = (int)ntohl(body_wait->seconds);
00944 if ((cmw > -1) && cmw < bws) {
00945 Error("CheckErrorStatus", "XROOTD MaxWait forced - file is offline"
00946 ". Aborting command. " << cmw << " : " << bws);
00947 Retry= kXR_maxReqRetry;
00948 return TRUE;
00949 }
00950
00951
00952
00953 int newbws = bws;
00954 if (bws <= 0) newbws = 1;
00955 if (bws > 1800) newbws = 10;
00956 if (bws != newbws)
00957 Error("CheckErrorStatus", "Sleep time fixed from " << bws << " to " << newbws);
00958
00959
00960 sleep(newbws);
00961 }
00962
00963
00964 Retry--;
00965 return FALSE;
00966 }
00967
00968
00969 Error("CheckErrorStatus",
00970 "Answer from server [" <<
00971 fUrl.Host << ":" << fUrl.Port <<
00972 "] not recognized after executing " << CmdName);
00973
00974 return TRUE;
00975 }
00976
00977
00978 XrdClientMessage *XrdClientConn::ReadPartialAnswer(XReqErrorType &errorType,
00979 size_t &TotalBlkSize,
00980 ClientRequest *req,
00981 bool HasToAlloc, void** tmpMoreData,
00982 EThreeStateReadHandler &what_to_do)
00983 {
00984
00985
00986 int len;
00987 XrdClientMessage *Xmsg = 0;
00988 void *tmp2MoreData;
00989
00990
00991 if (errorType == kOK) {
00992
00993 len = sizeof(ServerResponseHeader);
00994
00995 Info(XrdClientDebug::kHIDEBUG, "ReadPartialAnswer",
00996 "Reading a XrdClientMessage from the server [" <<
00997 fUrl.Host << ":" << fUrl.Port << "]...");
00998
00999
01000
01001
01002
01003
01004 Xmsg = ConnectionManager->ReadMsg(fLogConnID);
01005
01006 fLastDataBytesRecv = Xmsg ? Xmsg->DataLen() : 0;
01007
01008 if ( !Xmsg || (Xmsg->IsError()) ) {
01009 Info(XrdClientDebug::kNODEBUG, "ReadPartialAnswer", "Failed to read msg from connmgr"
01010 " (server [" << fUrl.Host << ":" << fUrl.Port << "]). Retrying ...");
01011
01012 if (HasToAlloc) {
01013 if (*tmpMoreData)
01014 free(*tmpMoreData);
01015 *tmpMoreData = 0;
01016 }
01017 errorType = kREAD;
01018 }
01019 else
01020
01021 Xmsg->Unmarshall();
01022 }
01023
01024 if (Xmsg != 0)
01025 if (DebugLevel() >= XrdClientDebug::kDUMPDEBUG)
01026 smartPrintServerHeader(&Xmsg->fHdr);
01027
01028
01029
01030 if ((errorType == kOK) && (Xmsg->DataLen() > 0)) {
01031
01032
01033
01034 if ( (Xmsg->HeaderStatus() == kXR_ok) ||
01035 (Xmsg->HeaderStatus() == kXR_oksofar) ||
01036 (Xmsg->HeaderStatus() == kXR_authmore) )
01037 {
01038
01039
01040
01041
01042
01043
01044
01045
01046
01047 if (HasToAlloc) {
01048 tmp2MoreData = realloc(*tmpMoreData, TotalBlkSize + Xmsg->DataLen());
01049 if (!tmp2MoreData) {
01050
01051 Error("ReadPartialAnswer", "Error reallocating " <<
01052 TotalBlkSize << " bytes.");
01053
01054 free(*tmpMoreData);
01055 *tmpMoreData = 0;
01056 what_to_do = kTSRHReturnNullMex;
01057
01058 delete Xmsg;
01059
01060 return 0;
01061 }
01062 *tmpMoreData = tmp2MoreData;
01063 }
01064
01065
01066
01067 if (*tmpMoreData)
01068 memcpy(((kXR_char *)(*tmpMoreData)) + TotalBlkSize,
01069 Xmsg->GetData(), Xmsg->DataLen());
01070
01071
01072
01073
01074
01075
01076
01077
01078
01079
01080
01081
01082 TotalBlkSize += Xmsg->DataLen();
01083
01084 } else {
01085
01086 Info(XrdClientDebug::kHIDEBUG, "ReadPartialAnswer",
01087 "Server [" <<
01088 fUrl.Host << ":" << fUrl.Port << "] answered [" <<
01089 convertRespStatusToChar(Xmsg->fHdr.status) <<
01090 "] (" << Xmsg->fHdr.status << ")");
01091 }
01092 }
01093
01094
01095
01096
01097
01098
01099
01100
01101
01102
01103
01104
01105
01106
01107
01108
01109
01110 if ( (errorType == kREAD) ||
01111 (errorType == kWRITE) ||
01112 isRedir(&Xmsg->fHdr) )
01113 {
01114
01115
01116
01117 ESrvErrorHandlerRetval Return = HandleServerError(errorType, Xmsg, req);
01118
01119 if (Return == kSEHRReturnMsgToCaller) {
01120
01121
01122
01123
01124
01125 if (HasToAlloc) {
01126 free(*tmpMoreData);
01127 *tmpMoreData = 0;
01128 }
01129
01130
01131 what_to_do = kTSRHReturnMex;
01132 return Xmsg;
01133 }
01134
01135 if (Return == kSEHRReturnNoMsgToCaller) {
01136
01137
01138
01139
01140
01141
01142
01143 if (HasToAlloc) {
01144 free(*tmpMoreData);
01145 *tmpMoreData = 0;
01146 }
01147
01148 delete Xmsg;
01149 Xmsg = 0;
01150
01151 what_to_do = kTSRHReturnMex;
01152 return Xmsg;
01153 }
01154 }
01155
01156 what_to_do = kTSRHContinue;
01157 return Xmsg;
01158 }
01159
01160
01161
01162 bool XrdClientConn::GetAccessToSrv()
01163 {
01164
01165
01166
01167
01168
01169
01170
01171
01172 XrdClientLogConnection *logconn = ConnectionManager->GetConnection(fLogConnID);
01173
01174
01175 if (fGettingAccessToSrv) {
01176 logconn->GetPhyConnection()->StartReader();
01177 return true;
01178 }
01179
01180 fGettingAccessToSrv = true;
01181
01182 switch ((fServerType = DoHandShake(fLogConnID))) {
01183 case kSTError:
01184 Info(XrdClientDebug::kNODEBUG,
01185 "GetAccessToSrv",
01186 "HandShake failed with server [" <<
01187 fUrl.Host << ":" << fUrl.Port << "]");
01188
01189 Disconnect(TRUE);
01190
01191 fGettingAccessToSrv = false;
01192 return FALSE;
01193
01194 case kSTNone:
01195 Info(XrdClientDebug::kNODEBUG,
01196 "GetAccessToSrv", "The server on [" <<
01197 fUrl.Host << ":" << fUrl.Port << "] is unknown");
01198
01199 Disconnect(TRUE);
01200
01201 fGettingAccessToSrv = false;
01202 return FALSE;
01203
01204 case kSTRootd:
01205
01206 if (EnvGetLong(NAME_KEEPSOCKOPENIFNOTXRD) == 1) {
01207 Info(XrdClientDebug::kHIDEBUG,
01208 "GetAccessToSrv","Ok: the server on [" <<
01209 fUrl.Host << ":" << fUrl.Port <<
01210 "] is a rootd. Saving socket for later use.");
01211
01212 fOpenSockFD = logconn->GetPhyConnection()->SaveSocket();
01213 Disconnect(TRUE);
01214 ConnectionManager->GarbageCollect();
01215 break;
01216
01217 } else {
01218
01219 Info(XrdClientDebug::kHIDEBUG,
01220 "GetAccessToSrv","Ok: the server on [" <<
01221 fUrl.Host << ":" << fUrl.Port << "] is a rootd."
01222 " Not supported.");
01223
01224 Disconnect(TRUE);
01225
01226 fGettingAccessToSrv = false;
01227 return FALSE;
01228 }
01229
01230 case kSTBaseXrootd:
01231
01232 Info(XrdClientDebug::kHIDEBUG,
01233 "GetAccessToSrv",
01234 "Ok: the server on [" <<
01235 fUrl.Host << ":" << fUrl.Port << "] is an xrootd redirector.");
01236
01237 logconn->GetPhyConnection()->SetTTL(EnvGetLong(NAME_LBSERVERCONN_TTL));
01238
01239 break;
01240
01241 case kSTDataXrootd:
01242
01243 Info( XrdClientDebug::kHIDEBUG,
01244 "GetAccessToSrv",
01245 "Ok, the server on [" <<
01246 fUrl.Host << ":" << fUrl.Port << "] is an xrootd data server.");
01247
01248 logconn->GetPhyConnection()->SetTTL(EnvGetLong(NAME_DATASERVERCONN_TTL));
01249
01250 break;
01251 }
01252
01253 bool retval = false;
01254
01255
01256 XrdClientPhyConnection *phyc = logconn->GetPhyConnection();
01257 if (!phyc) {
01258 fGettingAccessToSrv = false;
01259 return false;
01260 }
01261
01262 XrdClientPhyConnLocker pl(phyc);
01263
01264
01265 if (fServerType != kSTRootd) {
01266
01267 phyc = logconn->GetPhyConnection();
01268 if (!phyc || !phyc->IsValid()) {
01269 Error( "GetAccessToSrv", "Physical connection disappeared.");
01270 fGettingAccessToSrv = false;
01271 return false;
01272 }
01273
01274
01275 phyc->StartReader();
01276
01277 if (phyc->IsLogged() == kNo)
01278 retval = DoLogin();
01279 else {
01280
01281 Info( XrdClientDebug::kHIDEBUG,
01282 "GetAccessToSrv", "Reusing physical connection to server [" <<
01283 fUrl.Host << ":" << fUrl.Port << "]).");
01284
01285 retval = true;
01286 }
01287 }
01288 else
01289 retval = true;
01290
01291 fGettingAccessToSrv = false;
01292 return retval;
01293 }
01294
01295
01296 ERemoteServerType XrdClientConn::DoHandShake(short int log) {
01297
01298 struct ServerInitHandShake xbody;
01299 ERemoteServerType type;
01300
01301
01302 XrdClientLogConnection *lcn = ConnectionManager->GetConnection(log);
01303
01304 if (!lcn) return kSTError;
01305
01306 XrdClientPhyConnection *phyconn = lcn->GetPhyConnection();
01307
01308 if (!phyconn || !phyconn->IsValid()) return kSTError;
01309
01310
01311 {
01312 XrdClientPhyConnLocker pl(phyconn);
01313
01314 if (phyconn->fServerType == kSTBaseXrootd) {
01315
01316 Info(XrdClientDebug::kUSERDEBUG,
01317 "DoHandShake",
01318 "The physical channel is already bound to a load balancer"
01319 " server [" <<
01320 fUrl.Host << ":" << fUrl.Port << "]. No handshake is needed.");
01321
01322 fServerProto = phyconn->fServerProto;
01323
01324 if (!fLBSUrl || (fLBSUrl->Host == "")) {
01325
01326 Info(XrdClientDebug::kHIDEBUG,
01327 "DoHandShake", "Setting Load Balancer Server Url = " <<
01328 fUrl.GetUrl() );
01329
01330
01331 fLBSUrl = new XrdClientUrlInfo(fUrl.GetUrl());
01332 if(!fLBSUrl) {
01333 Error("DoHandShake","Object creation "
01334 " failed. Probable system resources exhausted.");
01335 abort();
01336 }
01337 }
01338 return kSTBaseXrootd;
01339 }
01340
01341
01342 if (phyconn->fServerType == kSTDataXrootd) {
01343
01344 if (DebugLevel() >= XrdClientDebug::kHIDEBUG)
01345 Info(XrdClientDebug::kHIDEBUG,
01346 "DoHandShake",
01347 "The physical channel is already bound to the data server"
01348 " [" << fUrl.Host << ":" << fUrl.Port << "]. No handshake is needed.");
01349
01350 fServerProto = phyconn->fServerProto;
01351
01352 return kSTDataXrootd;
01353 }
01354
01355
01356 type = phyconn->DoHandShake(xbody);
01357 if (type == kSTError) return type;
01358
01359
01360
01361
01362 fServerProto = xbody.protover;
01363
01364
01365
01366 phyconn->fServerProto = fServerProto;
01367
01368 if (type == kSTBaseXrootd) {
01369
01370 if (!fLBSUrl || (fLBSUrl->Host == "")) {
01371
01372 Info(XrdClientDebug::kHIDEBUG, "DoHandShake", "Setting Load Balancer Server Url = " <<
01373 fUrl.GetUrl() );
01374
01375
01376 fLBSUrl = new XrdClientUrlInfo(fUrl.GetUrl());
01377 if (!fLBSUrl) {
01378 Error("DoHandShake","Object creation failed.");
01379 abort();
01380 }
01381 }
01382
01383 }
01384
01385 return type;
01386
01387
01388 }
01389 }
01390
01391
01392 bool XrdClientConn::DoLogin()
01393 {
01394
01395
01396
01397 ClientRequest reqhdr;
01398 bool resp;
01399
01400
01401 memset( &reqhdr, 0, sizeof(reqhdr));
01402
01403 SetSID(reqhdr.header.streamid);
01404 reqhdr.header.requestid = kXR_login;
01405 reqhdr.login.capver[0] = XRD_CLIENT_CAPVER;
01406 reqhdr.login.pid = getpid();
01407
01408
01409 XrdOucString User = fUrl.User;
01410 if (User.length() <= 0) {
01411
01412 #ifndef WIN32
01413 struct passwd *u = getpwuid(getuid());
01414 if (u >= 0)
01415 User = u->pw_name;
01416 #else
01417 char name[256];
01418 DWORD length = sizeof (name);
01419 GetUserName(name, &length);
01420 User = name;
01421 #endif
01422 }
01423 if (User.length() > 0)
01424 strncpy( (char *)reqhdr.login.username, User.c_str(), 8 );
01425 else
01426 strcpy( (char *)reqhdr.login.username, "????" );
01427
01428
01429
01430 XrdOucString effUser = User;
01431 #ifndef WIN32
01432 if (!getuid()) {
01433 if (getenv("XrdClientEUSER")) effUser = getenv("XrdClientEUSER");
01434 }
01435 XrdSysPrivGuard guard(effUser.c_str());
01436 if (!guard.Valid() && !getuid()) {
01437
01438 fOpenError = kXR_NotAuthorized;
01439 LastServerError.errnum = fOpenError;
01440 XrdOucString emsg("Cannot set effective uid for user: ");
01441 emsg += effUser;
01442 strcpy(LastServerError.errmsg, emsg.c_str());
01443 Error("DoLogin", emsg << ". Exiting.");
01444 return false;
01445 }
01446 #endif
01447
01448
01449
01450 reqhdr.header.dlen = fRedirInternalToken.length();
01451
01452
01453 Info(XrdClientDebug::kHIDEBUG,
01454 "DoLogin",
01455 "Logging into the server [" << fUrl.Host << ":" << fUrl.Port <<
01456 "]. pid=" << reqhdr.login.pid << " uid=" << reqhdr.login.username);
01457
01458 {
01459 XrdClientLogConnection *l = ConnectionManager->GetConnection(fLogConnID);
01460 XrdClientPhyConnection *p = 0;
01461 if (l) p = l->GetPhyConnection();
01462 if (p) p->SetLogged(kNo);
01463 else {
01464 Error("DoLogin",
01465 "Logical connection disappeared before request?!? Srv: [" << fUrl.Host << ":" << fUrl.Port <<
01466 "]. Exiting.");
01467 return false;
01468 }
01469 }
01470
01471
01472 char *plist = 0;
01473 resp = SendGenCommand(&reqhdr, fRedirInternalToken.c_str(),
01474 (void **)&plist, 0,
01475 TRUE, (char *)"XrdClientConn::DoLogin");
01476
01477
01478 XrdSecProtocol *secp = 0;
01479 SessionIDInfo *prevsessid = 0;
01480 XrdOucString sessname;
01481 XrdOucString sessdump;
01482 if (resp && LastServerResp.dlen && plist) {
01483
01484 plist = (char *)realloc(plist, LastServerResp.dlen+1);
01485
01486 plist[LastServerResp.dlen]=0;
01487
01488 char *pauth = 0;
01489 int lenauth = 0;
01490 if ((fServerProto >= 0x240) && (LastServerResp.dlen >= 16)) {
01491
01492 if (XrdClientDebug::kHIDEBUG <= DebugLevel()) {
01493 char b[20];
01494 for (unsigned int i = 0; i < 16; i++) {
01495 snprintf(b, 20, "%.2x", plist[i]);
01496 sessdump += b;
01497 }
01498 Info(XrdClientDebug::kHIDEBUG,
01499 "DoLogin","Got session ID: " << sessdump);
01500 }
01501
01502
01503
01504 char buf[20];
01505 snprintf(buf, 20, "%d", fUrl.Port);
01506
01507 sessname = fUrl.HostAddr;
01508 if (sessname.length() <= 0)
01509 sessname = fUrl.Host;
01510
01511 sessname += ":";
01512 sessname += buf;
01513
01514 prevsessid = fSessionIDRepo.Find(sessname.c_str());
01515
01516
01517 if (LastServerResp.dlen > 16) {
01518 Info(XrdClientDebug::kHIDEBUG, "DoLogin","server requires authentication");
01519 pauth = plist+16;
01520 lenauth = LastServerResp.dlen-15;
01521 }
01522
01523
01524 } else {
01525
01526 Info(XrdClientDebug::kHIDEBUG, "DoLogin","server requires authentication");
01527 pauth = plist;
01528 lenauth = LastServerResp.dlen+1;
01529 }
01530
01531
01532 if (pauth) {
01533
01534 char *cenv = 0;
01535
01536
01537 if (EnvGetLong(NAME_DEBUG) > 0) {
01538 cenv = new char[18];
01539 sprintf(cenv, "XrdSecDEBUG=%ld",EnvGetLong(NAME_DEBUG));
01540 putenv(cenv);
01541 }
01542
01543
01544 cenv = new char[User.length()+12];
01545 sprintf(cenv, "XrdSecUSER=%s",User.c_str());
01546 putenv(cenv);
01547
01548
01549 cenv = new char[fUrl.Host.length()+12];
01550 sprintf(cenv, "XrdSecHOST=%s",fUrl.Host.c_str());
01551 putenv(cenv);
01552
01553 secp = DoAuthentication(pauth, lenauth);
01554 resp = (secp != 0) ? 1 : 0;
01555 }
01556
01557
01558 if (prevsessid) {
01559
01560
01561
01562
01563 if (XrdClientDebug::kHIDEBUG <= DebugLevel()) {
01564 XrdOucString sessdump;
01565 char b[20];
01566 for (unsigned int i = 0; i < sizeof(prevsessid->id); i++) {
01567 snprintf(b, 20, "%.2x", prevsessid->id[i]);
01568 sessdump += b;
01569 }
01570 Info(XrdClientDebug::kHIDEBUG,
01571 "DoLogin","Found prev session info for " << sessname <<
01572 ": " << sessdump);
01573 }
01574
01575 memset( &reqhdr, 0, sizeof(reqhdr));
01576 SetSID(reqhdr.header.streamid);
01577 reqhdr.header.requestid = kXR_endsess;
01578
01579 memcpy(reqhdr.endsess.sessid, prevsessid->id, sizeof(prevsessid->id));
01580
01581
01582 Info(XrdClientDebug::kHIDEBUG,
01583 "DoLogin","Trying to terminate previous session.");
01584
01585 SendGenCommand(&reqhdr, 0, 0, 0,
01586 FALSE, (char *)"XrdClientConn::Endsess");
01587
01588
01589 for (unsigned int i=0; i < 16; i++)
01590 prevsessid->id[i] = plist[i];
01591
01592
01593
01594 } else {
01595 Info(XrdClientDebug::kHIDEBUG,
01596 "DoLogin","No prev session info for " << sessname);
01597
01598
01599 SessionIDInfo *newsessid = new SessionIDInfo;
01600
01601 for (int i=0; i < int(sizeof(newsessid->id)); i++)
01602 newsessid->id[i] = plist[i];
01603
01604 fSessionIDRepo.Rep(sessname.c_str(), newsessid);
01605 }
01606
01607 }
01608
01609
01610 {
01611 XrdClientLogConnection *l = ConnectionManager->GetConnection(fLogConnID);
01612 XrdClientPhyConnection *p = 0;
01613 if (l) p = l->GetPhyConnection();
01614 if (!p) {
01615 Error("DoLogin",
01616 "Logical connection disappeared after request?!? Srv: [" << fUrl.Host << ":" << fUrl.Port <<
01617 "]. Exiting.");
01618 return false;
01619 }
01620
01621 if (resp) {
01622 p->SetLogged(kYes);
01623 p->SetSecProtocol(secp);
01624 }
01625 else Disconnect(true);
01626 }
01627
01628 if (plist)
01629 free(plist);
01630
01631 return resp;
01632
01633 }
01634
01635
01636 XrdSecProtocol *XrdClientConn::DoAuthentication(char *plist, int plsiz)
01637 {
01638
01639
01640
01641 static XrdSecGetProt_t getp = 0;
01642 XrdSecProtocol *protocol = (XrdSecProtocol *)0;
01643
01644 if (!plist || plsiz <= 0)
01645 return protocol;
01646
01647 Info(XrdClientDebug::kHIDEBUG, "DoAuthentication",
01648 "host " << fUrl.Host << " sent a list of " << plsiz << " bytes");
01649
01650
01651
01652 struct sockaddr_in netaddr;
01653 char **hosterrmsg = 0;
01654 if (XrdNetDNS::getHostAddr((char *)fUrl.HostAddr.c_str(),
01655 (struct sockaddr &)netaddr, hosterrmsg) <= 0) {
01656 Info(XrdClientDebug::kUSERDEBUG, "DoAuthentication",
01657 "getHostAddr said '" << *hosterrmsg << "'");
01658 return protocol;
01659 }
01660 netaddr.sin_port = fUrl.Port;
01661
01662
01663
01664 XrdSecParameters *secToken = 0;
01665 XrdSecCredentials *credentials = 0;
01666
01667
01668
01669 char *bpar = (char *)malloc(plsiz + 1);
01670 if (bpar) memcpy(bpar, plist, plsiz);
01671 bpar[plsiz] = 0;
01672 XrdSecParameters Parms(bpar, plsiz + 1);
01673
01674
01675 if (!getp) {
01676
01677 void *lh = 0;
01678 if (!(lh = dlopen("libXrdSec.so", RTLD_NOW))) {
01679 Info(XrdClientDebug::kHIDEBUG, "DoAuthentication",
01680 "unable to load libXrdSec.so");
01681
01682 fOpenError = kXR_NotAuthorized;
01683 LastServerError.errnum = fOpenError;
01684 strcpy(LastServerError.errmsg, "unable to load libXrdSec.so");
01685 return protocol;
01686 }
01687
01688
01689 if (!(getp = (XrdSecGetProt_t) dlsym(lh, "XrdSecGetProtocol"))) {
01690 Info(XrdClientDebug::kHIDEBUG, "DoAuthentication",
01691 "unable to load XrdSecGetProtocol()");
01692
01693 fOpenError = kXR_NotAuthorized;
01694 LastServerError.errnum = fOpenError;
01695 strcpy(LastServerError.errmsg, "unable to load XrdSecGetProtocol()");
01696 return protocol;
01697 }
01698 }
01699
01700
01701
01702
01703 while ((protocol = (*getp)((char *)fUrl.Host.c_str(),
01704 (const struct sockaddr &)netaddr, Parms, 0))) {
01705
01706
01707 XrdOucString protname = protocol->Entity.prot;
01708
01709
01710 XrdOucErrInfo ei;
01711 credentials = protocol->getCredentials(0, &ei);
01712 if (!credentials) {
01713 Info(XrdClientDebug::kHIDEBUG, "DoAuthentication",
01714 "cannot obtain credentials (protocol: "<<
01715 protname<<")");
01716
01717 fOpenError = kXR_NotAuthorized;
01718 LastServerError.errnum = fOpenError;
01719 strcpy(LastServerError.errmsg, "cannot obtain credentials for protocol: ");
01720 strcat(LastServerError.errmsg, ei.getErrText());
01721 protocol->Delete();
01722 protocol = 0;
01723 continue;
01724 } else {
01725 Info(XrdClientDebug::kHIDEBUG, "DoAuthentication",
01726 "credentials size: "<< credentials->size);
01727 }
01728
01729
01730 ClientRequest reqhdr;
01731 memset(reqhdr.auth.reserved, 0, 12);
01732 memset(reqhdr.auth.credtype, 0, 4 );
01733 memcpy(reqhdr.auth.credtype, protname.c_str(), protname.length() > 4 ? 4 : protname.length() );
01734
01735 LastServerResp.status = kXR_authmore;
01736 char *srvans = 0;
01737 while (LastServerResp.status == kXR_authmore) {
01738 bool resp = false;
01739
01740
01741
01742 reqhdr.header.dlen = credentials->size;
01743 SetSID(reqhdr.header.streamid);
01744 reqhdr.header.requestid = kXR_auth;
01745 resp = SendGenCommand(&reqhdr, credentials->buffer, (void **)&srvans, 0, TRUE,
01746 (char *)"XrdClientConn::DoAuthentication");
01747 SafeDelete(credentials);
01748 Info(XrdClientDebug::kHIDEBUG, "DoAuthentication",
01749 "server reply: status: "<<
01750 LastServerResp.status <<
01751 " dlen: "<< LastServerResp.dlen);
01752 if (resp && (LastServerResp.status == kXR_authmore)) {
01753
01754
01755
01756
01757 secToken = new XrdSecParameters(srvans,LastServerResp.dlen);
01758
01759
01760 credentials = protocol->getCredentials(secToken, &ei);
01761 SafeDelete(secToken);
01762 srvans = 0;
01763 if (!credentials) {
01764 Info(XrdClientDebug::kUSERDEBUG, "DoAuthentication",
01765 "cannot obtain credentials");
01766
01767 fOpenError = kXR_NotAuthorized;
01768 LastServerError.errnum = fOpenError;
01769 strcpy(LastServerError.errmsg, "cannot obtain credentials: ");
01770 strcat(LastServerError.errmsg, ei.getErrText());
01771 protocol->Delete();
01772 protocol = 0;
01773 break;
01774 } else {
01775 Info(XrdClientDebug::kHIDEBUG, "DoAuthentication",
01776 "credentials size " << credentials->size);
01777 }
01778 } else {
01779
01780
01781 if (LastServerResp.status == kXR_error) {
01782
01783
01784 if (LastServerError.errmsg)
01785 Error("DoAuthentication", LastServerError.errmsg);
01786
01787 protocol->Delete();
01788 protocol = 0;
01789
01790 break;
01791 }
01792
01793 if (!resp) {
01794
01795
01796 protocol->Delete();
01797 protocol = 0;
01798
01799 break;
01800 }
01801
01802 }
01803 }
01804
01805
01806 if (protocol) break;
01807 }
01808
01809 if (!protocol) {
01810 Info(XrdClientDebug::kHIDEBUG, "DoAuthentication",
01811 "unable to get protocol object.");
01812
01813 fOpenError = kXR_NotAuthorized;
01814 LastServerError.errnum = fOpenError;
01815 strcpy(LastServerError.errmsg, "unable to get protocol object.");
01816 }
01817
01818
01819
01820 return protocol;
01821 }
01822
01823
01824 XrdClientConn::ESrvErrorHandlerRetval
01825 XrdClientConn::HandleServerError(XReqErrorType &errorType, XrdClientMessage *xmsg,
01826 ClientRequest *req)
01827 {
01828
01829
01830 int newport;
01831 XrdOucString newhost;
01832
01833 bool noRedirError = (fMaxGlobalRedirCnt == 1 && xmsg && isRedir(&xmsg->fHdr));
01834
01835
01836
01837
01838
01839
01840 if (!noRedirError) {
01841 if ((errorType == kREAD) || (errorType == kWRITE)) {
01842 Disconnect(TRUE);
01843
01844 if (fMainReadCache)
01845 fMainReadCache->RemovePlaceholders();
01846 } else
01847 Disconnect(FALSE);
01848 }
01849
01850
01851 do {
01852
01853
01854 fGlobalRedirCnt++;
01855
01856 Info(XrdClientDebug::kHIDEBUG,
01857 "HandleServerError",
01858 "Redir count=" << fGlobalRedirCnt);
01859
01860 if ( fGlobalRedirCnt >= fMaxGlobalRedirCnt ) {
01861 if (noRedirError) {
01862
01863
01864 newhost = "";
01865 newport = 0;
01866
01867 ParseRedir(xmsg, newport, newhost, fRedirOpaque, fRedirInternalToken);
01868
01869
01870
01871
01872
01873
01874
01875 fGlobalRedirCnt = 0;
01876 return kSEHRReturnMsgToCaller;
01877 } else {
01878 return kSEHRContinue;
01879 }
01880 }
01881
01882
01883 if (IsOpTimeLimitElapsed(time(0))) return kSEHRContinue;
01884
01885 newhost = "";
01886 newport = 0;
01887
01888 if ((errorType == kREAD) ||
01889 (errorType == kWRITE) ||
01890 (errorType == kREDIRCONNECT)) {
01891
01892 bool cangoaway = ( fRedirHandler &&
01893 fRedirHandler->CanRedirOnError() );
01894
01895
01896
01897
01898
01899
01900
01901
01902
01903
01904 if ( (fREQUrl.Host.length() > 0) ) {
01905
01906
01907
01908
01909 newhost = fREQUrl.Host;
01910 newport = fREQUrl.Port;
01911
01912 ParseRedirHost(newhost, fRedirOpaque, fRedirInternalToken);
01913
01914
01915
01916 fREQUrl.Clear();
01917 }
01918 else
01919 if ( cangoaway && fLBSUrl && (fLBSUrl->GetUrl().length() > 0) ) {
01920
01921
01922
01923
01924 newhost = fLBSUrl->Host;
01925 newport = fLBSUrl->Port;
01926 }
01927 else {
01928
01929 Error("HandleServerError",
01930 "Communication error"
01931 " with server [" << fUrl.Host << ":" << fUrl.Port <<
01932 "]. Rebouncing here.");
01933
01934 if (fUrl.Host.length()) newhost = fUrl.Host;
01935 else
01936 newhost = fUrl.HostAddr;
01937
01938 newport = fUrl.Port;
01939 }
01940
01941 } else if (isRedir(&xmsg->fHdr)) {
01942
01943
01944 newhost = "";
01945 newport = 0;
01946
01947
01948 ParseRedir(xmsg, newport, newhost, fRedirOpaque, fRedirInternalToken);
01949
01950
01951
01952 }
01953
01954
01955
01956 CheckPort(newport);
01957
01958 if ((newhost.length() > 0) && newport) {
01959 XrdClientUrlInfo NewUrl(fUrl.GetUrl());
01960
01961 if (DebugLevel() >= XrdClientDebug::kUSERDEBUG)
01962 Info(XrdClientDebug::kUSERDEBUG,
01963 "HandleServerError",
01964 "Received redirection to [" << newhost << ":" << newport <<
01965 "]. Token=[" << fRedirInternalToken << "]" <<
01966 "]. Opaque=[" << fRedirOpaque << "].");
01967
01968 errorType = kOK;
01969
01970 NewUrl.Host = NewUrl.HostAddr = newhost;
01971 NewUrl.Port = newport;
01972 NewUrl.SetAddrFromHost();
01973
01974
01975 if ( !CheckHostDomain(newhost) ) {
01976 Error("HandleServerError",
01977 "Redirection to a server out-of-domain disallowed. Abort.");
01978 abort();
01979 }
01980
01981 errorType = GoToAnotherServer(NewUrl);
01982 }
01983 else {
01984
01985 Error("HandleServerError",
01986 "Received redirection to [" << newhost << ":" << newport <<
01987 "]. Token=[" << fRedirInternalToken << "]" <<
01988 "]. Opaque=[" << fRedirOpaque << "]. No server to go...");
01989
01990 errorType = kREDIRCONNECT;
01991 }
01992
01993
01994 if (errorType == kREDIRCONNECT) {
01995 if (LastServerError.errnum == kXR_NotAuthorized)
01996 return kSEHRReturnMsgToCaller;
01997
01998 sleep(EnvGetLong(NAME_RECONNECTWAIT));
01999 }
02000
02001
02002
02003
02004
02005
02006 } while ((errorType == kREDIRCONNECT) && (LastServerError.errnum != kXR_NotAuthorized));
02007
02008
02009
02010 if (!IsConnected()) {
02011 Error("HandleServerError",
02012 "Not connected. Internal error. Abort.");
02013 abort();
02014 }
02015
02016
02017
02018
02019
02020
02021
02022
02023 if ((req->header.requestid == kXR_open) ||
02024 (req->header.requestid == kXR_login)) return kSEHRReturnNoMsgToCaller;
02025
02026
02027
02028 char localfhandle[4];
02029 bool wasopen, newopenok;
02030
02031 if (fRedirHandler) {
02032 newopenok = fRedirHandler->OpenFileWhenRedirected(localfhandle, wasopen);
02033 if (newopenok && wasopen) {
02034
02035
02036
02037
02038
02039 PutFilehandleInRequest(req, localfhandle);
02040
02041
02042
02043
02044
02045 if (xmsg && !xmsg->IsError())
02046 return kSEHRContinue;
02047 else
02048 return kSEHRReturnNoMsgToCaller;
02049 }
02050
02051 if (!newopenok) return kSEHRContinue;
02052
02053 }
02054
02055
02056
02057 if (!fRedirHandler) {
02058
02059
02060 if (xmsg && !xmsg->IsError())
02061 return kSEHRContinue;
02062 else
02063 return kSEHRReturnNoMsgToCaller;
02064 }
02065
02066
02067
02068 return kSEHRContinue;
02069 }
02070
02071
02072 XReqErrorType XrdClientConn::GoToAnotherServer(XrdClientUrlInfo &newdest)
02073 {
02074
02075
02076 fGettingAccessToSrv = false;
02077
02078 if (!newdest.Port) newdest.Port = 1094;
02079 if (newdest.HostAddr == "") newdest.HostAddr = newdest.Host;
02080
02081 if ( (fLogConnID = Connect( newdest, fUnsolMsgHandler)) == -1) {
02082
02083
02084
02085 Error("GoToAnotherServer", "Error connecting to [" <<
02086 newdest.Host << ":" << newdest.Port);
02087
02088
02089 return kREDIRCONNECT;
02090 }
02091
02092
02093
02094
02095
02096 fUrl = newdest;
02097
02098 if (IsConnected() && !GetAccessToSrv()) {
02099 Error("GoToAnotherServer", "Error handshaking to [" <<
02100 newdest.Host.c_str() << ":" << newdest.Port << "]");
02101 return kREDIRCONNECT;
02102 }
02103
02104 fPrimaryStreamid = ConnectionManager->GetConnection(fLogConnID)->Streamid();
02105
02106 return kOK;
02107 }
02108
02109
02110 XReqErrorType XrdClientConn::GoBackToRedirector() {
02111
02112
02113
02114 Disconnect(false);
02115 if (fGlobalRedirCnt) fGlobalRedirCnt--;
02116 return (fLBSUrl ? GoToAnotherServer(*fLBSUrl) : kOK);
02117 }
02118
02119
02120 XrdOucString XrdClientConn::GetDomainToMatch(XrdOucString hostname) {
02121
02122
02123
02124
02125
02126 char *fullname, *err;
02127
02128
02129 XrdOucString res = ParseDomainFromHostname(hostname);
02130 if (res.length() > 0)
02131 return res;
02132
02133
02134
02135 err =
02136 fullname = XrdNetDNS::getHostName((char *)hostname.c_str(), &err);
02137
02138 if ( strcmp(fullname, (char *)"0.0.0.0") ) {
02139
02140
02141
02142 Info(XrdClientDebug::kHIDEBUG,
02143 "GetDomainToMatch", "GetHostName(" << hostname <<
02144 ") returned name=" << fullname);
02145
02146 res = ParseDomainFromHostname(fullname);
02147
02148 if (res == "") {
02149 Info(XrdClientDebug::kHIDEBUG,
02150 "GetDomainToMatch", "No domain contained in " << fullname);
02151
02152 res = ParseDomainFromHostname(hostname);
02153 }
02154 if (res == "") {
02155 Info(XrdClientDebug::kHIDEBUG,
02156 "GetDomainToMatch", "No domain contained in " << hostname);
02157
02158 res = hostname;
02159 }
02160
02161 } else {
02162
02163 Info(XrdClientDebug::kHIDEBUG,
02164 "GetDomainToMatch", "GetHostName(" << hostname << ") returned a non valid address. errtxt=" << err);
02165
02166 res = ParseDomainFromHostname(hostname);
02167 }
02168
02169 Info(XrdClientDebug::kHIDEBUG,
02170 "GetDomainToMatch", "GetDomain(" << hostname << ") --> " << res);
02171
02172
02173 if (fullname) free(fullname);
02174
02175 return res;
02176 }
02177
02178
02179 XrdOucString XrdClientConn::ParseDomainFromHostname(XrdOucString hostname)
02180 {
02181
02182
02183 XrdOucString res;
02184 int idot = hostname.find('.');
02185 if (idot != STR_NPOS)
02186 res.assign(hostname, idot+1);
02187
02188 return res;
02189 }
02190
02191
02192 void XrdClientConn::CheckPort(int &port) {
02193
02194 if(port <= 0) {
02195
02196 Info(XrdClientDebug::kHIDEBUG,
02197 "checkPort",
02198 "TCP port not specified. Trying to get it from /etc/services...");
02199
02200 struct servent *S = getservbyname("rootd", "tcp");
02201 if(!S) {
02202
02203 Info(XrdClientDebug::kHIDEBUG,
02204 "checkPort", "Service rootd not specified in /etc/services. "
02205 "Using default IANA tcp port 1094");
02206 port = 1094;
02207 } else {
02208 Info(XrdClientDebug::kNODEBUG,
02209 "checkPort", "Found tcp port " << ntohs(S->s_port) <<
02210 " in /etc/service");
02211
02212 port = (int)ntohs(S->s_port);
02213 }
02214
02215 }
02216 }
02217
02218
02219
02220 long XrdClientConn::GetDataFromCache(const void *buffer, long long begin_offs,
02221 long long end_offs, bool PerfCalc,
02222 XrdClientIntvList &missingblks, long &outstandingblks) {
02223
02224
02225
02226
02227 if (!fMainReadCache)
02228 return FALSE;
02229
02230 return ( fMainReadCache->GetDataIfPresent(buffer,
02231 begin_offs,
02232 end_offs,
02233 PerfCalc,
02234 missingblks, outstandingblks) );
02235 }
02236
02237
02238 bool XrdClientConn::SubmitDataToCache(XrdClientMessage *xmsg, long long begin_offs,
02239 long long end_offs) {
02240
02241 if (xmsg && fMainReadCache &&
02242 ((xmsg->HeaderStatus() == kXR_oksofar) ||
02243 (xmsg->HeaderStatus() == kXR_ok)))
02244
02245 fMainReadCache->SubmitXMessage(xmsg, begin_offs, end_offs);
02246
02247 return true;
02248 }
02249
02250
02251 bool XrdClientConn::SubmitRawDataToCache(const void *buffer,
02252 long long begin_offs,
02253 long long end_offs) {
02254
02255
02256 if (fMainReadCache) {
02257 if (!fMainReadCache->SubmitRawData(buffer, begin_offs, end_offs))
02258 free(const_cast<void *>(buffer));
02259 }
02260
02261 return true;
02262
02263 }
02264
02265
02266
02267 XReqErrorType XrdClientConn::WriteToServer_Async(ClientRequest *req,
02268 const void* reqMoreData,
02269 int substreamid) {
02270
02271
02272
02273
02274
02275
02276
02277 if (!ConnectionManager->SidManager()->GetNewSid(fPrimaryStreamid, req))
02278 return kNOMORESTREAMS;
02279
02280
02281
02282
02283
02284
02285 if (fMainReadCache && (req->header.requestid == kXR_write)) {
02286
02287
02288
02289 void *locbuf = malloc(req->header.dlen);
02290 if (!locbuf) {
02291 Error("WriteToServer_Async", "Error allocating " <<
02292 req->header.dlen << " bytes.");
02293 return kGENERICERR;
02294 }
02295
02296 memcpy(locbuf, reqMoreData, req->header.dlen);
02297
02298 if (!fMainReadCache->SubmitRawData(locbuf, req->write.offset, req->write.offset+req->header.dlen-1, true))
02299 free(locbuf);
02300 }
02301
02302 return WriteToServer(req, reqMoreData, fLogConnID, substreamid);
02303
02304 }
02305
02306
02307
02308 bool XrdClientConn::PanicClose() {
02309 ClientRequest closeFileRequest;
02310
02311 memset(&closeFileRequest, 0, sizeof(closeFileRequest) );
02312
02313 SetSID(closeFileRequest.header.streamid);
02314
02315 closeFileRequest.close.requestid = kXR_close;
02316
02317
02318
02319 closeFileRequest.close.dlen = 0;
02320
02321 WriteToServer(&closeFileRequest, 0, fLogConnID);
02322
02323 return TRUE;
02324 }
02325
02326
02327
02328 void XrdClientConn::CheckREQPauseState() {
02329
02330
02331
02332
02333
02334 time_t timenow;
02335
02336
02337 fREQWait->Lock();
02338
02339
02340 while (1) {
02341 timenow = time(0);
02342
02343 if ((timenow < fREQWaitTimeLimit) && !IsOpTimeLimitElapsed(timenow)) {
02344
02345 time_t tt = xrdmin(fREQWaitTimeLimit - timenow, 10);
02346
02347 fREQWait->Wait(tt);
02348 }
02349 else break;
02350 }
02351
02352
02353 fREQWait->UnLock();
02354 }
02355
02356
02357 void XrdClientConn::CheckREQConnectWaitState() {
02358
02359
02360
02361
02362
02363 time_t timenow;
02364
02365
02366 fREQConnectWait->Lock();
02367
02368
02369 while (1) {
02370 timenow = time(0);
02371
02372 if ((timenow < fREQConnectWaitTimeLimit) && !IsOpTimeLimitElapsed(timenow)) {
02373
02374 time_t tt = xrdmin(fREQWaitTimeLimit - timenow, 10);
02375
02376 fREQConnectWait->Wait(tt);
02377 }
02378 else break;
02379 }
02380
02381
02382 fREQConnectWait->UnLock();
02383 }
02384
02385
02386 bool XrdClientConn::WaitResp(int secsmax) {
02387
02388
02389
02390
02391
02392
02393 int rc = false;
02394
02395 Info(XrdClientDebug::kHIDEBUG,
02396 "WaitResp", "Waiting response for " << secsmax << " secs." );
02397
02398
02399 fREQWaitResp->Lock();
02400
02401 time_t timelimit = time(0)+secsmax;
02402
02403 while (!fREQWaitRespData) {
02404 rc = true;
02405 time_t timenow = time(0);
02406
02407 if ((timenow < timelimit) && !IsOpTimeLimitElapsed(timenow)) {
02408
02409 time_t tt = xrdmin(timelimit - timenow, 10);
02410 fREQWaitResp->Wait(tt);
02411
02412
02413
02414 if (fREQWaitRespData) {
02415 rc = false;
02416 break;
02417 }
02418
02419 }
02420 else break;
02421
02422
02423 }
02424
02425
02426 fREQWaitResp->UnLock();
02427
02428 if (rc) {
02429 Info(XrdClientDebug::kHIDEBUG,
02430 "WaitResp", "Timeout elapsed.");
02431 }
02432 else {
02433 Info(XrdClientDebug::kHIDEBUG,
02434 "WaitResp", "Got an unsolicited response. Data=" << fREQWaitRespData);
02435 }
02436
02437 return rc;
02438 }
02439
02440
02441
02442 UnsolRespProcResult XrdClientConn::ProcessAsynResp(XrdClientMessage *unsolmsg) {
02443
02444
02445
02446
02447
02448 if (unsolmsg->GetStatusCode() != XrdClientMessage::kXrdMSC_ok) {
02449 fREQWaitResp->Lock();
02450
02451
02452 fREQWaitRespData = (ServerResponseBody_Attn_asynresp *)malloc( sizeof(struct ServerResponseBody_Attn_asynresp) );
02453 memset( fREQWaitRespData, 0, sizeof(struct ServerResponseBody_Attn_asynresp) );
02454
02455 fREQWaitRespData->resphdr.status = kXR_wait;
02456 fREQWaitRespData->resphdr.dlen = sizeof(kXR_int32);
02457
02458 kXR_int32 i = htonl(1);
02459 memcpy(&fREQWaitRespData->respdata, &i, sizeof(i));
02460
02461 fREQWaitResp->Signal();
02462 fREQWaitResp->UnLock();
02463 return kUNSOL_CONTINUE;
02464 }
02465
02466
02467 ServerResponseBody_Attn_asynresp *ar;
02468 ar = (ServerResponseBody_Attn_asynresp *)unsolmsg->GetData();
02469
02470
02471
02472
02473 if ( !MatchStreamid(&ar->resphdr) ) return kUNSOL_CONTINUE;
02474
02475 Info(XrdClientDebug::kHIDEBUG,
02476 "ProcessAsynResp", "Streamid matched." );
02477
02478 fREQWaitResp->Lock();
02479
02480
02481
02482 fREQWaitRespData = ar;
02483
02484
02485 clientUnmarshall(&fREQWaitRespData->resphdr);
02486
02487 if (DebugLevel() >= XrdClientDebug::kDUMPDEBUG)
02488 smartPrintServerHeader(&fREQWaitRespData->resphdr);
02489
02490
02491 memcpy(&LastServerResp, &fREQWaitRespData->resphdr, sizeof(struct ServerResponseHeader));
02492
02493
02494 switch (fREQWaitRespData->resphdr.status) {
02495 case kXR_error: {
02496
02497
02498
02499 struct ServerResponseBody_Error *body_err;
02500
02501 body_err = (struct ServerResponseBody_Error *)(&fREQWaitRespData->respdata);
02502
02503 if (body_err) {
02504
02505
02506 kXR_int32 fErr = (XErrorCode)ntohl(body_err->errnum);
02507
02508 Info(XrdClientDebug::kNODEBUG, "ProcessAsynResp", "Server declared: " <<
02509 (const char*)body_err->errmsg << "(error code: " << fErr << ")");
02510
02511
02512 memset(&LastServerError, 0, sizeof(LastServerError));
02513 memcpy(&LastServerError, body_err, xrdmin(fREQWaitRespData->resphdr.dlen, (kXR_int32)(sizeof(LastServerError)-1) ));
02514 LastServerError.errnum = fErr;
02515
02516 }
02517
02518 break;
02519 }
02520
02521 case kXR_redirect: {
02522
02523
02524
02525
02526
02527 struct ServerResponseBody_Redirect *rd;
02528 rd = (struct ServerResponseBody_Redirect *)fREQWaitRespData->respdata;
02529
02530
02531 if (rd && (strlen(rd->host) > 0)) {
02532 Info(XrdClientDebug::kUSERDEBUG,
02533 "ProcessAsynResp", "Requested sync redir (via async response) to " << rd->host <<
02534 ":" << ntohl(rd->port));
02535
02536 SetRequestedDestHost(rd->host, ntohl(rd->port));
02537
02538
02539
02540 Disconnect(FALSE);
02541 }
02542
02543
02544
02545 fREQWaitRespData = (ServerResponseBody_Attn_asynresp *)malloc( sizeof(struct ServerResponseBody_Attn_asynresp) );
02546 memset( fREQWaitRespData, 0, sizeof(struct ServerResponseBody_Attn_asynresp) );
02547
02548 fREQWaitRespData->resphdr.status = kXR_wait;
02549 fREQWaitRespData->resphdr.dlen = sizeof(kXR_int32);
02550
02551 kXR_int32 i = htonl(1);
02552 memcpy(&fREQWaitRespData->respdata, &i, sizeof(i));
02553
02554 free(unsolmsg->DonateData());
02555 break;
02556 }
02557 }
02558
02559 unsolmsg->DonateData();
02560
02561
02562
02563 fREQWaitResp->Signal();
02564
02565 fREQWaitResp->UnLock();
02566
02567
02568 return kUNSOL_DISPOSE;
02569 }
02570
02571
02572
02573
02574
02575
02576
02577 int XrdClientConn::GetParallelStreamToUse(int reqsperstream) {
02578
02579 XrdClientLogConnection *lgc = 0;
02580 XrdClientPhyConnection *phyc = 0;
02581
02582 lgc = ConnectionManager->GetConnection(fLogConnID);
02583 if (!lgc) {
02584 Error("GetParallelStreamToUse",
02585 "Unknown logical conn " << fLogConnID);
02586
02587 return kWRITE;
02588 }
02589
02590 phyc = lgc->GetPhyConnection();
02591 if (!phyc) {
02592 Error("GetParallelStreamToUse",
02593 "Cannot find physical conn for logid " << fLogConnID);
02594
02595 return kWRITE;
02596 }
02597
02598 return phyc->GetSockIdHint(reqsperstream);
02599 }
02600
02601
02602 bool XrdClientConn::IsPhyConnConnected() {
02603
02604
02605 XrdClientLogConnection *lgc = 0;
02606 XrdClientPhyConnection *phyc = 0;
02607
02608 lgc = ConnectionManager->GetConnection(fLogConnID);
02609 if (!lgc) return false;
02610
02611 phyc = lgc->GetPhyConnection();
02612 if (!phyc) return false;
02613
02614 return phyc->IsValid();
02615 }
02616
02617 int XrdClientConn:: GetParallelStreamCount() {
02618
02619 XrdClientLogConnection *lgc = 0;
02620 XrdClientPhyConnection *phyc = 0;
02621
02622 lgc = ConnectionManager->GetConnection(fLogConnID);
02623 if (!lgc) {
02624 Error("GetParallelStreamCount",
02625 "Unknown logical conn " << fLogConnID);
02626
02627 return 0;
02628 }
02629
02630 phyc = lgc->GetPhyConnection();
02631 if (!phyc) {
02632 Error("GetParallelStreamCount",
02633 "Cannot find physical conn for logid " << fLogConnID);
02634
02635 return 0;
02636 }
02637
02638 return phyc->GetSockIdCount();
02639
02640 }
02641
02642
02643
02644 XrdClientPhyConnection *XrdClientConn::GetPhyConn(int LogConnID) {
02645
02646
02647 XrdClientLogConnection *log;
02648
02649 log = ConnectionManager->GetConnection(LogConnID);
02650 if (log) return log->GetPhyConnection();
02651 return 0;
02652
02653 }
02654
02655
02656 bool XrdClientConn::DoWriteSoftCheckPoint() {
02657
02658
02659
02660
02661
02662
02663
02664
02665
02666
02667 ConnectionManager->SidManager()->GetFailedOutstandingWriteRequests(fPrimaryStreamid, fWriteReqsToRetry);
02668
02669 for (int it = 0; it < fWriteReqsToRetry.GetSize(); it++) {
02670
02671 ClientRequest req;
02672 req = fWriteReqsToRetry[it];
02673
02674
02675
02676 void *data = fMainReadCache->FindBlk(req.write.offset, req.write.offset+req.write.dlen-1);
02677
02678
02679
02680 if (data) {
02681
02682 req.write.pathid = 0;
02683 bool ok = SendGenCommand(&req, data, 0, 0,
02684 false, (char *)"Write_checkpoint");
02685
02686 UnPinCacheBlk(req.write.offset, req.write.offset+req.write.dlen-1);
02687
02688
02689 if (!ok) return false;
02690
02691 }
02692 else {
02693 Error("DoWriteSoftCheckPoint", "Checkpoint data disappeared.");
02694 return false;
02695 }
02696
02697 }
02698
02699
02700 fWriteReqsToRetry.Clear();
02701 return true;
02702 }
02703
02704 bool XrdClientConn::DoWriteHardCheckPoint() {
02705
02706
02707
02708
02709
02710 while(1) {
02711 if (ConnectionManager->SidManager()->GetOutstandingWriteRequestCnt(fPrimaryStreamid) == 0) return true;
02712 if (!DoWriteSoftCheckPoint()) return false;
02713 if (ConnectionManager->SidManager()->GetOutstandingWriteRequestCnt(fPrimaryStreamid) == 0) return true;
02714
02715
02716 fWriteWaitAck->Wait(1);
02717 }
02718
02719 }
02720
02721
02722
02723
02724 void XrdClientConn::SetOpTimeLimit(int delta_secs) {
02725 fOpTimeLimit = time(0)+delta_secs;
02726 }
02727
02728 bool XrdClientConn::IsOpTimeLimitElapsed(time_t timenow) {
02729 return (timenow > fOpTimeLimit);
02730 }