00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015 const char *XrdClientPhyConnectionCVSID = "$Id: XrdClientPhyConnection.cc 30949 2009-11-02 16:37:58Z ganis $";
00016
00017 #include <time.h>
00018 #include <stdlib.h>
00019 #include "XrdClient/XrdClientPhyConnection.hh"
00020 #include "XrdClient/XrdClientDebug.hh"
00021 #include "XrdClient/XrdClientMessage.hh"
00022 #include "XrdClient/XrdClientEnv.hh"
00023 #include "XrdClient/XrdClientSid.hh"
00024 #include "XrdSec/XrdSecInterface.hh"
00025 #ifndef WIN32
00026 #include <sys/socket.h>
00027 #else
00028 #include <Winsock2.h>
00029 #endif
00030
00031
00032 #define READERCOUNT (xrdmin(50, EnvGetLong(NAME_MULTISTREAMCNT)+1))
00033
00034
00035 void *SocketReaderThread(void * arg, XrdClientThread *thr)
00036 {
00037
00038
00039
00040
00041
00042
00043 if (thr->MaskSignal(0) != 0)
00044 Error("SocketReaderThread", "Warning: problems masking signals");
00045
00046 XrdClientPhyConnection *thisObj;
00047
00048 Info(XrdClientDebug::kHIDEBUG,
00049 "SocketReaderThread",
00050 "Reader Thread starting.");
00051
00052 thr->SetCancelDeferred();
00053 thr->SetCancelOn();
00054
00055 thisObj = (XrdClientPhyConnection *)arg;
00056
00057 thisObj->StartedReader();
00058
00059 while (1) {
00060 thr->SetCancelOff();
00061 thisObj->BuildMessage(TRUE, TRUE);
00062 thr->SetCancelOn();
00063
00064 if (thisObj->CheckAutoTerm())
00065 break;
00066 }
00067
00068 Info(XrdClientDebug::kHIDEBUG,
00069 "SocketReaderThread",
00070 "Reader Thread exiting.");
00071
00072 return 0;
00073 }
00074
00075
00076 XrdClientPhyConnection::XrdClientPhyConnection(XrdClientAbsUnsolMsgHandler *h,
00077 XrdClientSid *sid):
00078 fMStreamsGoing(false), fReaderCV(0), fLogConnCnt(0), fSidManager(sid),
00079 fServerProto(0) {
00080
00081
00082 fServerType = kSTNone;
00083
00084
00085 fTTLsec = 30;
00086
00087 Touch();
00088
00089 fServer.Clear();
00090
00091 SetLogged(kNo);
00092
00093 fRequestTimeout = EnvGetLong(NAME_REQUESTTIMEOUT);
00094
00095 UnsolicitedMsgHandler = h;
00096
00097 for (int i = 0; i < READERCOUNT; i++)
00098 fReaderthreadhandler[i] = 0;
00099 fReaderthreadrunning = 0;
00100
00101 fSecProtocol = 0;
00102 }
00103
00104
00105 XrdClientPhyConnection::~XrdClientPhyConnection()
00106 {
00107
00108 Info(XrdClientDebug::kUSERDEBUG,
00109 "XrdClientPhyConnection",
00110 "Destroying. [" << fServer.Host << ":" << fServer.Port << "]");
00111
00112 Disconnect();
00113
00114 if (fSocket) {
00115 delete fSocket;
00116 fSocket = 0;
00117 }
00118
00119 UnlockChannel();
00120
00121 if (fReaderthreadrunning)
00122 for (int i = 0; i < READERCOUNT; i++)
00123 if (fReaderthreadhandler[i]) {
00124 fReaderthreadhandler[i]->Cancel();
00125 fReaderthreadhandler[i]->Join();
00126 delete fReaderthreadhandler[i];
00127 }
00128
00129 if (fSecProtocol) {
00130
00131
00132 fSecProtocol->Delete();
00133 fSecProtocol = 0;
00134 }
00135 }
00136
00137
00138 bool XrdClientPhyConnection::Connect(XrdClientUrlInfo RemoteHost, bool isUnix)
00139 {
00140
00141 XrdSysMutexHelper l(fMutex);
00142
00143
00144 if (isUnix) {
00145 Info(XrdClientDebug::kHIDEBUG, "Connect", "Connecting to " << RemoteHost.File);
00146 } else {
00147 Info(XrdClientDebug::kHIDEBUG,
00148 "Connect", "Connecting to [" << RemoteHost.Host << ":" << RemoteHost.Port << "]");
00149 }
00150
00151 if (EnvGetLong(NAME_MULTISTREAMCNT))
00152 fSocket = new XrdClientPSock(RemoteHost);
00153 else
00154 fSocket = new XrdClientSock(RemoteHost);
00155
00156 if(!fSocket) {
00157 Error("Connect","Unable to create a client socket. Aborting.");
00158 abort();
00159 }
00160
00161 fSocket->TryConnect(isUnix);
00162
00163 if (!fSocket->IsConnected()) {
00164 if (isUnix) {
00165 Error("Connect", "can't open UNIX connection to " << RemoteHost.File);
00166 } else {
00167 Error("Connect", "can't open connection to [" <<
00168 RemoteHost.Host << ":" << RemoteHost.Port << "]");
00169 }
00170 Disconnect();
00171
00172 return FALSE;
00173 }
00174
00175 Touch();
00176
00177 fTTLsec = EnvGetLong(NAME_DATASERVERCONN_TTL);
00178
00179 if (isUnix) {
00180 Info(XrdClientDebug::kHIDEBUG, "Connect", "Connected to " << RemoteHost.File);
00181 } else {
00182 Info(XrdClientDebug::kHIDEBUG, "Connect", "Connected to [" <<
00183 RemoteHost.Host << ":" << RemoteHost.Port << "]");
00184 }
00185
00186 fServer = RemoteHost;
00187
00188 {
00189 XrdSysMutexHelper l(fMutex);
00190 fReaderthreadrunning = 0;
00191 }
00192
00193 return TRUE;
00194 }
00195
00196
00197 void XrdClientPhyConnection::StartReader() {
00198 bool running;
00199
00200 {
00201 XrdSysMutexHelper l(fMutex);
00202 running = fReaderthreadrunning;
00203 }
00204
00205
00206
00207
00208
00209 if ( !running ) {
00210
00211 Info(XrdClientDebug::kHIDEBUG,
00212 "StartReader", "Starting reader thread...");
00213
00214 int rdcnt = READERCOUNT;
00215 if (fServerType == kSTBaseXrootd) rdcnt = 1;
00216
00217 for (int i = 0; i < rdcnt; i++) {
00218
00219
00220 fReaderthreadhandler[i] = new XrdClientThread(SocketReaderThread);
00221 if (!fReaderthreadhandler[i]) {
00222 Error("PhyConnection",
00223 "Can't create reader thread: out of system resources");
00224
00225 exit(-1);
00226 }
00227
00228 if (fReaderthreadhandler[i]->Run(this)) {
00229 Error("PhyConnection",
00230 "Can't run reader thread: out of system resources. Critical error.");
00231
00232 exit(-1);
00233 }
00234
00235 if (fReaderthreadhandler[i]->Detach())
00236 {
00237 Error("PhyConnection", "Thread detach failed");
00238
00239 }
00240
00241 }
00242
00243
00244 int maxRetries = 10;
00245 while (--maxRetries >= 0) {
00246 { XrdSysMutexHelper l(fMutex);
00247 if (fReaderthreadrunning)
00248 break;
00249 }
00250 fReaderCV.Wait(100);
00251 }
00252 }
00253 }
00254
00255
00256
00257 void XrdClientPhyConnection::StartedReader() {
00258 XrdSysMutexHelper l(fMutex);
00259 fReaderthreadrunning++;
00260 fReaderCV.Post();
00261 }
00262
00263
00264 bool XrdClientPhyConnection::ReConnect(XrdClientUrlInfo RemoteHost)
00265 {
00266
00267
00268 Disconnect();
00269 return Connect(RemoteHost);
00270 }
00271
00272
00273 void XrdClientPhyConnection::Disconnect()
00274 {
00275 XrdSysMutexHelper l(fMutex);
00276
00277
00278
00279 if (fSocket) {
00280 Info(XrdClientDebug::kHIDEBUG,
00281 "PhyConnection", "Disconnecting socket...");
00282 fSocket->Disconnect();
00283
00284 }
00285
00286
00287
00288 }
00289
00290
00291 bool XrdClientPhyConnection::CheckAutoTerm() {
00292 bool doexit = FALSE;
00293
00294 {
00295 XrdSysMutexHelper l(fMutex);
00296
00297
00298
00299 if ( !IsValid() ) {
00300
00301 Info(XrdClientDebug::kHIDEBUG,
00302 "CheckAutoTerm", "Self-Cancelling reader thread.");
00303
00304 {
00305 XrdSysMutexHelper l(fMutex);
00306 fReaderthreadrunning--;
00307 }
00308
00309
00310
00311
00312 doexit = TRUE;
00313 }
00314
00315 }
00316
00317
00318 if (doexit) {
00319 UnlockChannel();
00320 return true;
00321 }
00322
00323 return false;
00324 }
00325
00326
00327
00328 void XrdClientPhyConnection::Touch()
00329 {
00330
00331 XrdSysMutexHelper l(fMutex);
00332
00333 time_t t = time(0);
00334
00335
00336
00337
00338
00339 fLastUseTimestamp = t;
00340 }
00341
00342
00343 int XrdClientPhyConnection::ReadRaw(void *buf, int len, int substreamid,
00344 int *usedsubstreamid) {
00345
00346
00347
00348
00349
00350
00351
00352 int res;
00353
00354
00355 if (IsValid()) {
00356
00357 Info(XrdClientDebug::kDUMPDEBUG,
00358 "ReadRaw",
00359 "Reading from " <<
00360 fServer.Host << ":" << fServer.Port);
00361
00362 res = fSocket->RecvRaw(buf, len, substreamid, usedsubstreamid);
00363
00364 if ((res < 0) && (res != TXSOCK_ERR_TIMEOUT) && errno ) {
00365
00366
00367 Info(XrdClientDebug::kHIDEBUG,
00368 "ReadRaw", "Read error on " <<
00369 fServer.Host << ":" << fServer.Port << ". errno=" << errno );
00370 }
00371
00372
00373
00374 if (((res < 0) && (res == TXSOCK_ERR)) ||
00375 (!fSocket->IsConnected())) {
00376
00377 Info(XrdClientDebug::kHIDEBUG,
00378 "ReadRaw",
00379 "Disconnection reported on" <<
00380 fServer.Host << ":" << fServer.Port);
00381
00382 Disconnect();
00383 }
00384
00385
00386
00387 if ((res > 0) && (DebugLevel() > XrdClientDebug::kDUMPDEBUG)) {
00388 XrdOucString s = " ";
00389 char b[256];
00390
00391 for (int i = 0; i < xrdmin(res, 256); i++) {
00392 sprintf(b, "%.2x ", *((unsigned char *)buf + i));
00393 s += b;
00394 if (!((i + 1) % 16)) s += "\n ";
00395 }
00396
00397 Info(XrdClientDebug::kHIDEBUG,
00398 "ReadRaw", "Read " << res << "bytes. Dump:" << endl << s << endl);
00399
00400 }
00401
00402 return res;
00403 }
00404 else {
00405
00406 Info(XrdClientDebug::kUSERDEBUG,
00407 "ReadRaw", "Socket is disconnected.");
00408
00409 return TXSOCK_ERR;
00410 }
00411
00412 }
00413
00414
00415 XrdClientMessage *XrdClientPhyConnection::ReadMessage(int streamid) {
00416
00417
00418
00419 Touch();
00420 return fMsgQ.GetMsg(streamid, fRequestTimeout );
00421
00422 }
00423
00424
00425 XrdClientMessage *XrdClientPhyConnection::BuildMessage(bool IgnoreTimeouts, bool Enqueue)
00426 {
00427
00428
00429
00430 XrdClientMessage *m;
00431 struct SidInfo *parallelsid = 0;
00432 UnsolRespProcResult res = kUNSOL_KEEP;
00433
00434 m = new XrdClientMessage();
00435 if (!m) {
00436 Error("BuildMessage",
00437 "Cannot create a new Message. Aborting.");
00438 abort();
00439 }
00440
00441 {
00442
00443 m->ReadRaw(this);
00444
00445 }
00446
00447 parallelsid = (fSidManager) ? fSidManager->GetSidInfo(m->HeaderSID()) : 0;
00448
00449 if ( parallelsid || (m->IsAttn()) || (m->GetStatusCode() == XrdClientMessage::kXrdMSC_readerr)) {
00450
00451
00452
00453
00454
00455
00456
00457
00458 if (m->GetStatusCode() == XrdClientMessage::kXrdMSC_readerr) {
00459 Info(XrdClientDebug::kDUMPDEBUG,
00460 "BuildMessage"," propagating a communication error message.");
00461 }
00462 else {
00463 Info(XrdClientDebug::kDUMPDEBUG,
00464 "BuildMessage"," propagating unsol id " << m->HeaderSID());
00465 }
00466
00467 Touch();
00468 res = HandleUnsolicited(m);
00469
00470
00471
00472 }
00473
00474 if (Enqueue && !parallelsid && !m->IsAttn() && (m->GetStatusCode() != XrdClientMessage::kXrdMSC_readerr)) {
00475
00476
00477
00478
00479
00480
00481
00482 if (IgnoreTimeouts) {
00483
00484 if (m->GetStatusCode() != XrdClientMessage::kXrdMSC_timeout) {
00485
00486
00487 Info(XrdClientDebug::kDUMPDEBUG,
00488 "BuildMessage"," posting id "<<m->HeaderSID());
00489
00490 fMsgQ.PutMsg(m);
00491
00492
00493
00494 }
00495 else {
00496
00497 Info(XrdClientDebug::kDUMPDEBUG,
00498 "BuildMessage"," deleting id "<<m->HeaderSID());
00499
00500 delete m;
00501 m = 0;
00502 }
00503
00504 } else
00505 fMsgQ.PutMsg(m);
00506 }
00507 else {
00508
00509
00510
00511 if ( (parallelsid) && (res != kUNSOL_KEEP) &&
00512 (m->GetStatusCode() != XrdClientMessage::kXrdMSC_readerr) )
00513 if (fSidManager && (m->HeaderStatus() != kXR_oksofar))
00514 fSidManager->ReleaseSid(m->HeaderSID());
00515
00516
00517 delete m;
00518 m = 0;
00519
00520
00521 }
00522
00523 return m;
00524 }
00525
00526
00527 UnsolRespProcResult XrdClientPhyConnection::HandleUnsolicited(XrdClientMessage *m)
00528 {
00529
00530
00531 bool ProcessingToGo = TRUE;
00532 struct ServerResponseBody_Attn *attnbody;
00533
00534 Touch();
00535
00536
00537 attnbody = (struct ServerResponseBody_Attn *)m->GetData();
00538
00539 if (attnbody && (m->IsAttn())) {
00540 attnbody->actnum = ntohl(attnbody->actnum);
00541
00542 switch (attnbody->actnum) {
00543 case kXR_asyncms:
00544
00545 Info(XrdClientDebug::kNODEBUG,
00546 "HandleUnsolicited",
00547 "Message from " <<
00548 fServer.Host << ":" << fServer.Port << ". '" <<
00549 attnbody->parms << "'");
00550
00551 ProcessingToGo = FALSE;
00552 break;
00553
00554 case kXR_asyncab:
00555
00556 Info(XrdClientDebug::kNODEBUG,
00557 "HandleUnsolicited",
00558 "******* Abort request received ******* Server: " <<
00559 fServer.Host << ":" << fServer.Port << ". Msg: '" <<
00560 attnbody->parms << "'");
00561
00562 exit(255);
00563
00564 ProcessingToGo = FALSE;
00565 break;
00566 }
00567 }
00568
00569
00570
00571 if (ProcessingToGo) {
00572 UnsolRespProcResult retval;
00573
00574 retval = SendUnsolicitedMsg(this, m);
00575
00576
00577 if (attnbody && (m->IsAttn())) {
00578 switch (attnbody->actnum) {
00579
00580 case kXR_asyncrd:
00581
00582
00583
00584 Disconnect();
00585 break;
00586
00587 case kXR_asyncdi:
00588
00589
00590
00591
00592 Disconnect();
00593 break;
00594
00595 }
00596 }
00597 return retval;
00598
00599 }
00600 else
00601 return kUNSOL_CONTINUE;
00602 }
00603
00604
00605 int XrdClientPhyConnection::WriteRaw(const void *buf, int len, int substreamid) {
00606
00607
00608
00609
00610
00611
00612 int res;
00613
00614 Touch();
00615
00616 if (IsValid()) {
00617
00618 Info(XrdClientDebug::kDUMPDEBUG,
00619 "WriteRaw",
00620 "Writing to substreamid " <<
00621 substreamid);
00622
00623 res = fSocket->SendRaw(buf, len, substreamid);
00624
00625 if ((res < 0) && (res != TXSOCK_ERR_TIMEOUT) && errno) {
00626
00627
00628 Info(XrdClientDebug::kHIDEBUG,
00629 "WriteRaw", "Write error on " <<
00630 fServer.Host << ":" << fServer.Port << ". errno=" << errno );
00631
00632 }
00633
00634
00635 if ((res < 0) || (!fSocket) || (!fSocket->IsConnected())) {
00636
00637 Info(XrdClientDebug::kHIDEBUG,
00638 "WriteRaw",
00639 "Disconnection reported on" <<
00640 fServer.Host << ":" << fServer.Port);
00641
00642 Disconnect();
00643 }
00644
00645 Touch();
00646 return( res );
00647 }
00648 else {
00649
00650 Info(XrdClientDebug::kUSERDEBUG,
00651 "WriteRaw",
00652 "Socket is disconnected.");
00653 return TXSOCK_ERR;
00654 }
00655 }
00656
00657
00658
00659 bool XrdClientPhyConnection::ExpiredTTL()
00660 {
00661
00662 return( (time(0) - fLastUseTimestamp) > fTTLsec );
00663 }
00664
00665
00666 void XrdClientPhyConnection::LockChannel()
00667 {
00668
00669 fRwMutex.Lock();
00670 }
00671
00672
00673 void XrdClientPhyConnection::UnlockChannel()
00674 {
00675
00676 fRwMutex.UnLock();
00677 }
00678
00679
00680 ERemoteServerType XrdClientPhyConnection::DoHandShake(ServerInitHandShake &xbody,
00681 int substreamid)
00682 {
00683
00684
00685
00686 struct ClientInitHandShake initHS;
00687 ServerResponseType type;
00688 ERemoteServerType typeres = kSTNone;
00689
00690 int writeres, readres, len;
00691
00692
00693 memset(&initHS, 0, sizeof(initHS));
00694 initHS.fourth = (kXR_int32)htonl(4);
00695 initHS.fifth = (kXR_int32)htonl(2012);
00696
00697
00698
00699
00700 len = sizeof(initHS);
00701
00702 Info(XrdClientDebug::kHIDEBUG,
00703 "DoHandShake",
00704 "HandShake step 1: Sending " << len << " bytes.");
00705
00706 writeres = WriteRaw(&initHS, len, substreamid);
00707
00708 if (writeres < 0) {
00709 Info(XrdClientDebug::kNODEBUG,"DoHandShake", "Failed to send " << len <<
00710 " bytes. Retrying ...");
00711
00712 return kSTError;
00713 }
00714
00715
00716 len = sizeof(type);
00717
00718 Info(XrdClientDebug::kHIDEBUG,
00719 "DoHandShake",
00720 "HandShake step 2: Reading " << len <<
00721 " bytes.");
00722
00723
00724
00725
00726
00727 readres = ReadRaw(&type,
00728 len, substreamid);
00729
00730 if (readres < 0) {
00731 Info(XrdClientDebug::kNODEBUG, "DoHandShake", "Failed to read " << len <<
00732 " bytes. Retrying ...");
00733
00734 return kSTError;
00735 }
00736
00737
00738 type = ntohl(type);
00739
00740
00741
00742 if (type == 0) {
00743
00744 len = sizeof(xbody);
00745
00746 Info(XrdClientDebug::kHIDEBUG,
00747 "DoHandShake",
00748 "HandShake step 3: Reading " << len <<
00749 " bytes.");
00750
00751 readres = ReadRaw(&xbody, len, substreamid);
00752
00753 if (readres < 0) {
00754 Error("DoHandShake", "Error reading " << len <<
00755 " bytes.");
00756
00757 return kSTError;
00758 }
00759
00760 ServerInitHandShake2HostFmt(&xbody);
00761
00762 Info(XrdClientDebug::kHIDEBUG,
00763 "DoHandShake",
00764 "Server protocol: " << xbody.protover << " type: " << xbody.msgval);
00765
00766
00767 switch (xbody.msgval) {
00768
00769 case kXR_DataServer:
00770
00771 typeres = kSTDataXrootd;
00772 break;
00773
00774 case kXR_LBalServer:
00775 typeres = kSTBaseXrootd;
00776 break;
00777 }
00778
00779 } else {
00780
00781
00782
00783 if (type == 8)
00784 typeres = kSTRootd;
00785 else
00786
00787 typeres = kSTNone;
00788 }
00789
00790 fServerType = typeres;
00791 return typeres;
00792 }
00793
00794
00795 void XrdClientPhyConnection::CountLogConn(int d)
00796 {
00797
00798 fMutex.Lock();
00799 fLogConnCnt += d;
00800 fMutex.UnLock();
00801 }
00802
00803
00804 bool XrdClientPhyConnection::TestAndSetMStreamsGoing() {
00805 XrdSysMutexHelper mtx(fMutex);
00806 bool retval = fMStreamsGoing;
00807 fMStreamsGoing = true;
00808 return retval;
00809 }
00810
00811 bool XrdClientPhyConnection::IsValid() {
00812 XrdSysMutexHelper l(fMutex);
00813 return ( (fSocket != 0) && fSocket->IsConnected());
00814 }
00815
00816 ELoginState XrdClientPhyConnection::IsLogged() {
00817 const XrdSysMutexHelper l(fMutex);
00818 return fLogged;
00819 }