XrdClientPhyConnection.cc

Go to the documentation of this file.
00001 //////////////////////////////////////////////////////////////////////////
00002 //                                                                      //
00003 // XrdClientPhyConnection                                               //
00004 // Author: Fabrizio Furano (INFN Padova, 2004)                          //
00005 // Adapted from TXNetFile (root.cern.ch) originally done by             //
00006 //  Alvise Dorigo, Fabrizio Furano                                      //
00007 //          INFN Padova, 2003                                           //
00008 //                                                                      //
00009 // Class handling physical connections to xrootd servers                //
00010 //                                                                      //
00011 //////////////////////////////////////////////////////////////////////////
00012 
00013 //       $Id: XrdClientPhyConnection.cc 30949 2009-11-02 16:37:58Z ganis $
00014 
00015 const char *XrdClientPhyConnectionCVSID = "$Id: XrdClientPhyConnection.cc 30949 2009-11-02 16:37:58Z ganis $";
00016 
00017 #include <time.h>
00018 #include <stdlib.h>
00019 #include "XrdClient/XrdClientPhyConnection.hh"
00020 #include "XrdClient/XrdClientDebug.hh"
00021 #include "XrdClient/XrdClientMessage.hh"
00022 #include "XrdClient/XrdClientEnv.hh"
00023 #include "XrdClient/XrdClientSid.hh"
00024 #include "XrdSec/XrdSecInterface.hh"
00025 #ifndef WIN32
00026 #include <sys/socket.h>
00027 #else
00028 #include <Winsock2.h>
00029 #endif
00030 
00031 
00032 #define READERCOUNT (xrdmin(50, EnvGetLong(NAME_MULTISTREAMCNT)+1))
00033 
00034 //____________________________________________________________________________
00035 void *SocketReaderThread(void * arg, XrdClientThread *thr)
00036 {
00037    // This thread is the base for the async capabilities of XrdClientPhyConnection
00038    // It repeatedly keeps reading from the socket, while feeding the
00039    // MsqQ with a stream of XrdClientMessages containing what's happening
00040    // at the socket level
00041 
00042    // Mask all allowed signals
00043    if (thr->MaskSignal(0) != 0)
00044       Error("SocketReaderThread", "Warning: problems masking signals");
00045 
00046    XrdClientPhyConnection *thisObj;
00047 
00048    Info(XrdClientDebug::kHIDEBUG,
00049         "SocketReaderThread",
00050         "Reader Thread starting.");
00051 
00052    thr->SetCancelDeferred();
00053    thr->SetCancelOn();
00054 
00055    thisObj = (XrdClientPhyConnection *)arg;
00056 
00057    thisObj->StartedReader();
00058 
00059    while (1) {
00060      thr->SetCancelOff();
00061      thisObj->BuildMessage(TRUE, TRUE);
00062      thr->SetCancelOn();
00063 
00064      if (thisObj->CheckAutoTerm())
00065         break;
00066    }
00067 
00068    Info(XrdClientDebug::kHIDEBUG,
00069         "SocketReaderThread",
00070         "Reader Thread exiting.");
00071 
00072    return 0;
00073 }
00074 
00075 //____________________________________________________________________________
00076 XrdClientPhyConnection::XrdClientPhyConnection(XrdClientAbsUnsolMsgHandler *h,
00077                                                XrdClientSid *sid):
00078     fMStreamsGoing(false), fReaderCV(0), fLogConnCnt(0), fSidManager(sid),
00079     fServerProto(0) {
00080 
00081    // Constructor
00082    fServerType = kSTNone;
00083 
00084    // Immediate destruction of this object is always a bad idea
00085    fTTLsec = 30;
00086 
00087    Touch();
00088 
00089    fServer.Clear();
00090 
00091    SetLogged(kNo);
00092 
00093    fRequestTimeout = EnvGetLong(NAME_REQUESTTIMEOUT);
00094 
00095    UnsolicitedMsgHandler = h;
00096 
00097    for (int i = 0; i < READERCOUNT; i++)
00098      fReaderthreadhandler[i] = 0;
00099    fReaderthreadrunning = 0;
00100 
00101    fSecProtocol = 0;
00102 }
00103 
00104 //____________________________________________________________________________
00105 XrdClientPhyConnection::~XrdClientPhyConnection()
00106 {
00107    // Destructor
00108   Info(XrdClientDebug::kUSERDEBUG,
00109        "XrdClientPhyConnection",
00110        "Destroying. [" << fServer.Host << ":" << fServer.Port << "]");
00111 
00112    Disconnect();
00113 
00114      if (fSocket) {
00115         delete fSocket;
00116         fSocket = 0;
00117    }
00118 
00119    UnlockChannel();
00120 
00121    if (fReaderthreadrunning) 
00122       for (int i = 0; i < READERCOUNT; i++)
00123         if (fReaderthreadhandler[i]) {
00124           fReaderthreadhandler[i]->Cancel();
00125           fReaderthreadhandler[i]->Join();
00126           delete fReaderthreadhandler[i];
00127         }
00128 
00129    if (fSecProtocol) {
00130       // This insures that the right destructor is called
00131       // (Do not do C++ delete).
00132       fSecProtocol->Delete();
00133       fSecProtocol = 0;
00134    }
00135 }
00136 
00137 //____________________________________________________________________________
00138 bool XrdClientPhyConnection::Connect(XrdClientUrlInfo RemoteHost, bool isUnix)
00139 {
00140    // Connect to remote server
00141    XrdSysMutexHelper l(fMutex);
00142 
00143 
00144    if (isUnix) {
00145       Info(XrdClientDebug::kHIDEBUG, "Connect", "Connecting to " << RemoteHost.File);
00146    } else {
00147       Info(XrdClientDebug::kHIDEBUG,
00148       "Connect", "Connecting to [" << RemoteHost.Host << ":" << RemoteHost.Port << "]");
00149    } 
00150 
00151    if (EnvGetLong(NAME_MULTISTREAMCNT))
00152        fSocket = new XrdClientPSock(RemoteHost);
00153    else
00154        fSocket = new XrdClientSock(RemoteHost);
00155 
00156    if(!fSocket) {
00157       Error("Connect","Unable to create a client socket. Aborting.");
00158       abort();
00159    }
00160 
00161    fSocket->TryConnect(isUnix);
00162 
00163    if (!fSocket->IsConnected()) {
00164       if (isUnix) {
00165          Error("Connect", "can't open UNIX connection to " << RemoteHost.File);
00166       } else {
00167          Error("Connect", "can't open connection to [" <<
00168                RemoteHost.Host << ":" << RemoteHost.Port << "]");
00169       }
00170       Disconnect();
00171 
00172      return FALSE;
00173    }
00174 
00175    Touch();
00176 
00177    fTTLsec = EnvGetLong(NAME_DATASERVERCONN_TTL);
00178 
00179    if (isUnix) {
00180       Info(XrdClientDebug::kHIDEBUG, "Connect", "Connected to " << RemoteHost.File);
00181    } else {
00182       Info(XrdClientDebug::kHIDEBUG, "Connect", "Connected to [" <<
00183            RemoteHost.Host << ":" << RemoteHost.Port << "]");
00184    }
00185 
00186    fServer = RemoteHost;
00187 
00188    {
00189       XrdSysMutexHelper l(fMutex);
00190       fReaderthreadrunning = 0;
00191    }
00192 
00193    return TRUE;
00194 }
00195 
00196 //____________________________________________________________________________
00197 void XrdClientPhyConnection::StartReader() {
00198    bool running;
00199 
00200    {
00201       XrdSysMutexHelper l(fMutex);
00202       running = fReaderthreadrunning;
00203    }
00204    // Start reader thread
00205 
00206    // Parametric asynchronous stuff.
00207    // If we are going Sync, then nothing has to be done,
00208    // otherwise the reader thread must be started
00209    if ( !running ) {
00210 
00211       Info(XrdClientDebug::kHIDEBUG,
00212            "StartReader", "Starting reader thread...");
00213 
00214       int rdcnt = READERCOUNT;
00215       if (fServerType == kSTBaseXrootd) rdcnt = 1;
00216 
00217       for (int i = 0; i < rdcnt; i++) {
00218 
00219       // Now we launch  the reader thread
00220       fReaderthreadhandler[i] = new XrdClientThread(SocketReaderThread);
00221       if (!fReaderthreadhandler[i]) {
00222          Error("PhyConnection",
00223                "Can't create reader thread: out of system resources");
00224 // HELP: what do we do here
00225          exit(-1);
00226       }
00227 
00228       if (fReaderthreadhandler[i]->Run(this)) {
00229          Error("PhyConnection",
00230                "Can't run reader thread: out of system resources. Critical error.");
00231 // HELP: what do we do here
00232          exit(-1);
00233       }
00234 
00235       if (fReaderthreadhandler[i]->Detach())
00236       {
00237          Error("PhyConnection", "Thread detach failed");
00238          //return;
00239       }
00240 
00241       }
00242       // sleep until at least one thread starts running, which hopefully
00243       // is not forever.
00244       int maxRetries = 10;
00245       while (--maxRetries >= 0) {
00246          {  XrdSysMutexHelper l(fMutex);
00247             if (fReaderthreadrunning)
00248                break;
00249          }
00250          fReaderCV.Wait(100);
00251       }
00252    }
00253 }
00254 
00255 
00256 //____________________________________________________________________________
00257 void XrdClientPhyConnection::StartedReader() {
00258    XrdSysMutexHelper l(fMutex);
00259    fReaderthreadrunning++;
00260    fReaderCV.Post();
00261 }
00262 
00263 //____________________________________________________________________________
00264 bool XrdClientPhyConnection::ReConnect(XrdClientUrlInfo RemoteHost)
00265 {
00266    // Re-connection attempt
00267 
00268    Disconnect();
00269    return Connect(RemoteHost);
00270 }
00271 
00272 //____________________________________________________________________________
00273 void XrdClientPhyConnection::Disconnect()
00274 {
00275    XrdSysMutexHelper l(fMutex);
00276 
00277    // Disconnect from remote server
00278 
00279    if (fSocket) {
00280       Info(XrdClientDebug::kHIDEBUG,
00281            "PhyConnection", "Disconnecting socket...");
00282       fSocket->Disconnect();
00283 
00284    }
00285 
00286    // We do not destroy the socket here. The socket will be destroyed
00287    // in CheckAutoTerm or in the ConnMgr
00288 }
00289 
00290 //____________________________________________________________________________
00291 bool XrdClientPhyConnection::CheckAutoTerm() {
00292    bool doexit = FALSE;
00293 
00294   {
00295    XrdSysMutexHelper l(fMutex);
00296 
00297    // Parametric asynchronous stuff
00298    // If we are going async, we might be willing to term ourself
00299    if ( !IsValid() ) {
00300 
00301          Info(XrdClientDebug::kHIDEBUG,
00302               "CheckAutoTerm", "Self-Cancelling reader thread.");
00303 
00304          {
00305             XrdSysMutexHelper l(fMutex);
00306             fReaderthreadrunning--;
00307          }
00308 
00309          //delete fSocket;
00310          //fSocket = 0;
00311 
00312          doexit = TRUE;
00313       }
00314 
00315   }
00316 
00317 
00318   if (doexit) {
00319         UnlockChannel();
00320         return true;
00321    }
00322 
00323   return false;
00324 }
00325 
00326 
00327 //____________________________________________________________________________
00328 void XrdClientPhyConnection::Touch()
00329 {
00330    // Set last-use-time to present time
00331    XrdSysMutexHelper l(fMutex);
00332 
00333    time_t t = time(0);
00334 
00335    //Info(XrdClientDebug::kDUMPDEBUG,
00336    //   "Touch",
00337    //   "Setting last use to current time" << t);
00338 
00339    fLastUseTimestamp = t;
00340 }
00341 
00342 //____________________________________________________________________________
00343 int XrdClientPhyConnection::ReadRaw(void *buf, int len, int substreamid,
00344                            int *usedsubstreamid) {
00345    // Receive 'len' bytes from the connected server and store them in 'buf'.
00346    // Return 0 if OK. 
00347    // If substreamid = -1 then
00348    //  gets length bytes from any par socket, and returns the usedsubstreamid
00349    //   where it got the bytes from
00350    // Otherwise read bytes from the specified substream. 0 is the main one.
00351 
00352    int res;
00353 
00354 
00355    if (IsValid()) {
00356 
00357       Info(XrdClientDebug::kDUMPDEBUG,
00358            "ReadRaw",
00359            "Reading from " <<
00360            fServer.Host << ":" << fServer.Port);
00361 
00362       res = fSocket->RecvRaw(buf, len, substreamid, usedsubstreamid);
00363 
00364       if ((res < 0) && (res != TXSOCK_ERR_TIMEOUT) && errno ) {
00365          //strerror_r(errno, errbuf, sizeof(buf));
00366 
00367          Info(XrdClientDebug::kHIDEBUG,
00368               "ReadRaw", "Read error on " <<
00369               fServer.Host << ":" << fServer.Port << ". errno=" << errno );
00370       }
00371 
00372       // If a socket error comes, then we disconnect
00373       // but we have not to disconnect in the case of a timeout
00374       if (((res < 0) && (res == TXSOCK_ERR)) ||
00375           (!fSocket->IsConnected())) {
00376 
00377          Info(XrdClientDebug::kHIDEBUG,
00378               "ReadRaw", 
00379               "Disconnection reported on" <<
00380               fServer.Host << ":" << fServer.Port);
00381 
00382          Disconnect();
00383       }
00384 
00385 
00386       // Let's dump the received bytes
00387       if ((res > 0) && (DebugLevel() > XrdClientDebug::kDUMPDEBUG)) {
00388           XrdOucString s = "   ";
00389           char b[256]; 
00390 
00391           for (int i = 0; i < xrdmin(res, 256); i++) {
00392               sprintf(b, "%.2x ", *((unsigned char *)buf + i));
00393               s += b;
00394               if (!((i + 1) % 16)) s += "\n   ";
00395           }
00396 
00397           Info(XrdClientDebug::kHIDEBUG,
00398                "ReadRaw", "Read " << res <<  "bytes. Dump:" << endl << s << endl);
00399 
00400       }
00401 
00402       return res;
00403    }
00404    else {
00405       // Socket already destroyed or disconnected
00406       Info(XrdClientDebug::kUSERDEBUG,
00407            "ReadRaw", "Socket is disconnected.");
00408 
00409       return TXSOCK_ERR;
00410    }
00411 
00412 }
00413 
00414 //____________________________________________________________________________
00415 XrdClientMessage *XrdClientPhyConnection::ReadMessage(int streamid) {
00416    // Gets a full loaded XrdClientMessage from this phyconn.
00417    // May be a pure msg pick from a queue
00418 
00419    Touch();
00420    return fMsgQ.GetMsg(streamid, fRequestTimeout );
00421 
00422  }
00423 
00424 //____________________________________________________________________________
00425 XrdClientMessage *XrdClientPhyConnection::BuildMessage(bool IgnoreTimeouts, bool Enqueue)
00426 {
00427    // Builds an XrdClientMessage, and makes it read its header/data from the socket
00428    // Also put automatically the msg into the queue
00429 
00430    XrdClientMessage *m;
00431    struct SidInfo *parallelsid = 0;
00432    UnsolRespProcResult res = kUNSOL_KEEP;
00433 
00434    m = new XrdClientMessage();
00435    if (!m) {
00436       Error("BuildMessage",
00437             "Cannot create a new Message. Aborting.");
00438       abort();
00439    }
00440 
00441    {
00442 //     fMultireadMutex.Lock();
00443      m->ReadRaw(this);
00444 //     fMultireadMutex.UnLock();
00445    }
00446 
00447    parallelsid = (fSidManager) ? fSidManager->GetSidInfo(m->HeaderSID()) : 0;
00448 
00449    if ( parallelsid || (m->IsAttn()) || (m->GetStatusCode() == XrdClientMessage::kXrdMSC_readerr)) {
00450       
00451 
00452       // Here we insert the PhyConn-level support for unsolicited responses
00453       // Some of them will be propagated in some way to the upper levels
00454       // The path should be
00455       //  here -> XrdClientConnMgr -> all the involved XrdClientLogConnections ->
00456       //   -> all the corresponding XrdClient
00457 
00458       if (m->GetStatusCode() == XrdClientMessage::kXrdMSC_readerr) {
00459           Info(XrdClientDebug::kDUMPDEBUG,
00460                "BuildMessage"," propagating a communication error message.");
00461       }
00462       else {
00463           Info(XrdClientDebug::kDUMPDEBUG,
00464                "BuildMessage"," propagating unsol id " << m->HeaderSID());
00465       }
00466 
00467       Touch();
00468       res = HandleUnsolicited(m);
00469 
00470 
00471 
00472    }
00473    
00474    if (Enqueue && !parallelsid && !m->IsAttn() && (m->GetStatusCode() != XrdClientMessage::kXrdMSC_readerr)) {
00475        // If we have to ignore the socket timeouts, then we have not to
00476        // feed the queue with them. In this case, the newly created XrdClientMessage
00477        // has to be freed.
00478        //if ( !IgnoreTimeouts || !m->IsError() )
00479 
00480        //bool waserror;
00481 
00482        if (IgnoreTimeouts) {
00483 
00484            if (m->GetStatusCode() != XrdClientMessage::kXrdMSC_timeout) {
00485                //waserror = m->IsError();
00486 
00487                Info(XrdClientDebug::kDUMPDEBUG,
00488                     "BuildMessage"," posting id "<<m->HeaderSID());
00489 
00490                fMsgQ.PutMsg(m);
00491 
00492                //if (waserror)
00493                //   for (int kk=0; kk < 10; kk++) fMsgQ.PutMsg(0);
00494            }
00495            else {
00496 
00497                Info(XrdClientDebug::kDUMPDEBUG,
00498                     "BuildMessage"," deleting id "<<m->HeaderSID());
00499 
00500                delete m;
00501                m = 0;
00502            }
00503 
00504        } else
00505            fMsgQ.PutMsg(m);
00506    }
00507    else {
00508 
00509 
00510        // The purpose of this message ends here
00511        if ( (parallelsid) && (res != kUNSOL_KEEP) &&
00512             (m->GetStatusCode() != XrdClientMessage::kXrdMSC_readerr) )
00513          if (fSidManager && (m->HeaderStatus() != kXR_oksofar))
00514             fSidManager->ReleaseSid(m->HeaderSID());
00515        
00516        //       if (m->GetStatusCode() != XrdClientMessage::kXrdMSC_readerr) {
00517        delete m;
00518        m = 0;
00519        //       }
00520 
00521    }
00522   
00523    return m;
00524 }
00525 
00526 //____________________________________________________________________________
00527 UnsolRespProcResult XrdClientPhyConnection::HandleUnsolicited(XrdClientMessage *m)
00528 {
00529    // Local processing of unsolicited responses is done here
00530 
00531    bool ProcessingToGo = TRUE;
00532    struct ServerResponseBody_Attn *attnbody;
00533 
00534    Touch();
00535 
00536    // Local pre-processing of the unsolicited XrdClientMessage
00537    attnbody = (struct ServerResponseBody_Attn *)m->GetData();
00538 
00539    if (attnbody && (m->IsAttn())) {
00540       attnbody->actnum = ntohl(attnbody->actnum);
00541 
00542       switch (attnbody->actnum) {
00543       case kXR_asyncms:
00544          // A message arrived from the server. Let's print it.
00545          Info(XrdClientDebug::kNODEBUG,
00546               "HandleUnsolicited",
00547               "Message from " <<
00548               fServer.Host << ":" << fServer.Port << ". '" <<
00549               attnbody->parms << "'");
00550 
00551          ProcessingToGo = FALSE;
00552          break;
00553 
00554       case kXR_asyncab:
00555          // The server requested to abort the execution!!!!
00556          Info(XrdClientDebug::kNODEBUG,
00557               "HandleUnsolicited",
00558               "******* Abort request received ******* Server: " <<
00559               fServer.Host << ":" << fServer.Port << ". Msg: '" <<
00560               attnbody->parms << "'");
00561          
00562          exit(255);
00563 
00564          ProcessingToGo = FALSE;
00565          break;
00566       }
00567    }
00568 
00569    // Now we propagate the message to the interested object, if any
00570    // It could be some sort of upper layer of the architecture
00571    if (ProcessingToGo) {
00572       UnsolRespProcResult retval;
00573 
00574       retval = SendUnsolicitedMsg(this, m);
00575 
00576       // Request post-processing
00577       if (attnbody && (m->IsAttn())) {
00578          switch (attnbody->actnum) {
00579 
00580          case kXR_asyncrd:
00581             // After having set all the belonging object, we disconnect.
00582             // The next commands will redirect-on-error where we want
00583 
00584             Disconnect();
00585             break;
00586       
00587          case kXR_asyncdi:
00588             // After having set all the belonging object, we disconnect.
00589             // The next connection attempt will behave as requested,
00590             // i.e. waiting some time before reconnecting
00591 
00592             Disconnect();
00593             break;
00594 
00595          } // switch
00596       }
00597       return retval;
00598 
00599    }
00600    else 
00601       return kUNSOL_CONTINUE;
00602 }
00603 
00604 //____________________________________________________________________________
00605 int XrdClientPhyConnection::WriteRaw(const void *buf, int len, int substreamid) {
00606     // Send 'len' bytes located at 'buf' to the connected server.
00607     // Return number of bytes sent.
00608     // usesubstreams tells if we have to select a substream to send the data through or
00609     // the main stream is to be used
00610     // substreamid == 0 means to use the main stream
00611 
00612    int res;
00613 
00614    Touch();
00615 
00616    if (IsValid()) {
00617 
00618       Info(XrdClientDebug::kDUMPDEBUG,
00619            "WriteRaw",
00620            "Writing to substreamid " <<
00621            substreamid);
00622     
00623       res = fSocket->SendRaw(buf, len, substreamid);
00624 
00625       if ((res < 0)  && (res != TXSOCK_ERR_TIMEOUT) && errno) {
00626          //strerror_r(errno, errbuf, sizeof(buf));
00627 
00628          Info(XrdClientDebug::kHIDEBUG,
00629               "WriteRaw", "Write error on " <<
00630               fServer.Host << ":" << fServer.Port << ". errno=" << errno );
00631 
00632       }
00633 
00634       // If a socket error comes, then we disconnect (and destroy the fSocket)
00635       if ((res < 0) || (!fSocket) || (!fSocket->IsConnected())) {
00636 
00637          Info(XrdClientDebug::kHIDEBUG,
00638               "WriteRaw", 
00639               "Disconnection reported on" <<
00640               fServer.Host << ":" << fServer.Port);
00641 
00642          Disconnect();
00643       }
00644 
00645       Touch();
00646       return( res );
00647    }
00648    else {
00649       // Socket already destroyed or disconnected
00650       Info(XrdClientDebug::kUSERDEBUG,
00651            "WriteRaw",
00652            "Socket is disconnected.");
00653       return TXSOCK_ERR;
00654    }
00655 }
00656 
00657 
00658 //____________________________________________________________________________
00659 bool XrdClientPhyConnection::ExpiredTTL()
00660 {
00661    // Check expiration time
00662    return( (time(0) - fLastUseTimestamp) > fTTLsec );
00663 }
00664 
00665 //____________________________________________________________________________
00666 void XrdClientPhyConnection::LockChannel()
00667 {
00668    // Lock 
00669    fRwMutex.Lock();
00670 }
00671 
00672 //____________________________________________________________________________
00673 void XrdClientPhyConnection::UnlockChannel()
00674 {
00675    // Unlock
00676    fRwMutex.UnLock();
00677 }
00678 
00679 //_____________________________________________________________________________
00680 ERemoteServerType XrdClientPhyConnection::DoHandShake(ServerInitHandShake &xbody,
00681                                                       int substreamid)
00682 {
00683    // Performs initial hand-shake with the server in order to understand which 
00684    // kind of server is there at the other side and to make the server know who 
00685    // we are
00686    struct ClientInitHandShake initHS;
00687    ServerResponseType type;
00688    ERemoteServerType typeres = kSTNone;
00689 
00690    int writeres, readres, len;
00691 
00692    // Set field in network byte order
00693    memset(&initHS, 0, sizeof(initHS));
00694    initHS.fourth = (kXR_int32)htonl(4);
00695    initHS.fifth  = (kXR_int32)htonl(2012);
00696 
00697 
00698    // Send to the server the initial hand-shaking message asking for the 
00699    // kind of server
00700    len = sizeof(initHS);
00701 
00702    Info(XrdClientDebug::kHIDEBUG,
00703         "DoHandShake",
00704         "HandShake step 1: Sending " << len << " bytes.");
00705 
00706    writeres = WriteRaw(&initHS, len, substreamid);
00707 
00708    if (writeres < 0) {
00709       Info(XrdClientDebug::kNODEBUG,"DoHandShake", "Failed to send " << len <<
00710             " bytes. Retrying ...");
00711 
00712       return kSTError;
00713    }
00714 
00715    // Read from server the first 4 bytes
00716    len = sizeof(type);
00717 
00718    Info(XrdClientDebug::kHIDEBUG,
00719         "DoHandShake",
00720         "HandShake step 2: Reading " << len <<
00721         " bytes.");
00722  
00723    //
00724    // Read returns the return value of TSocket->RecvRaw... that returns the 
00725    // return value of recv (unix low level syscall)
00726    //
00727    readres = ReadRaw(&type, 
00728                      len, substreamid); // Reads 4(2+2) bytes
00729                
00730    if (readres < 0) {
00731       Info(XrdClientDebug::kNODEBUG, "DoHandShake", "Failed to read " << len <<
00732             " bytes. Retrying ...");
00733 
00734       return kSTError;
00735    }
00736 
00737    // to host byte order
00738    type = ntohl(type);
00739 
00740    // Check if the server is the eXtended rootd or not, checking the value 
00741    // of type
00742    if (type == 0) { // ok, eXtended!
00743 
00744       len = sizeof(xbody);
00745 
00746       Info(XrdClientDebug::kHIDEBUG,
00747            "DoHandShake",
00748            "HandShake step 3: Reading " << len << 
00749            " bytes.");
00750 
00751       readres = ReadRaw(&xbody, len, substreamid); // Read 12(4+4+4) bytes
00752 
00753       if (readres < 0) {
00754          Error("DoHandShake", "Error reading " << len << 
00755                " bytes.");
00756 
00757          return kSTError;
00758       }
00759 
00760       ServerInitHandShake2HostFmt(&xbody);
00761 
00762       Info(XrdClientDebug::kHIDEBUG,
00763            "DoHandShake",
00764            "Server protocol: " << xbody.protover << " type: " << xbody.msgval);
00765 
00766       // check if the eXtended rootd is a data server
00767       switch (xbody.msgval) {
00768 
00769       case kXR_DataServer:
00770          // This is a data server
00771          typeres = kSTDataXrootd;
00772          break;
00773 
00774       case kXR_LBalServer:
00775          typeres = kSTBaseXrootd;
00776          break;
00777       }
00778       
00779    } else {
00780 
00781       // We are here if it wasn't an XRootd
00782       // and we need to complete the reading
00783       if (type == 8)
00784          typeres = kSTRootd;
00785       else 
00786          // We dunno the server type
00787          typeres = kSTNone;
00788    }
00789 
00790    fServerType = typeres;
00791    return typeres;
00792 }
00793 
00794 //____________________________________________________________________________
00795 void XrdClientPhyConnection::CountLogConn(int d)
00796 {
00797    // Modify countre of logical connections using this phyconn
00798    fMutex.Lock();
00799    fLogConnCnt += d;
00800    fMutex.UnLock();
00801 }
00802 
00803 
00804 bool XrdClientPhyConnection::TestAndSetMStreamsGoing() {
00805   XrdSysMutexHelper mtx(fMutex);
00806   bool retval = fMStreamsGoing;
00807   fMStreamsGoing = true;
00808   return retval;
00809 }
00810 
00811 bool XrdClientPhyConnection::IsValid() {
00812   XrdSysMutexHelper l(fMutex);
00813   return ( (fSocket != 0) && fSocket->IsConnected());
00814 }
00815 
00816 ELoginState XrdClientPhyConnection::IsLogged() {
00817   const XrdSysMutexHelper l(fMutex);
00818   return fLogged;
00819 }

Generated on Tue Jul 5 14:46:18 2011 for ROOT_528-00b_version by  doxygen 1.5.1