XrdClientPSock.cc

Go to the documentation of this file.
00001 //////////////////////////////////////////////////////////////////////////
00002 //                                                                      //
00003 // XrdClientPSock                                                       //
00004 //                                                                      //
00005 // Author: Fabrizio Furano (INFN Padova, 2006)                          //
00006 //                                                                      //
00007 // Client Socket with parallel streams and timeout features using XrdNet//
00008 //                                                                      //
00009 //////////////////////////////////////////////////////////////////////////
00010 
00011 //         $Id: XrdClientPSock.cc 33978 2010-06-18 10:02:18Z ganis $
00012 
00013 const char *XrdClientPSockCVSID = "$Id: XrdClientPSock.cc 33978 2010-06-18 10:02:18Z ganis $";
00014 
00015 #include <memory>
00016 #include <errno.h>
00017 #include <string.h>
00018 #include <assert.h>
00019 #include <math.h>
00020 #include "XrdClient/XrdClientPSock.hh"
00021 #include "XrdSys/XrdSysLogger.hh"
00022 #include "XrdNet/XrdNetSocket.hh"
00023 #include "XrdClient/XrdClientDebug.hh"
00024 #include "XrdClient/XrdClientEnv.hh"
00025 
00026 #ifdef __solaris__
00027 #include <sunmath.h>
00028 #endif
00029 
00030 #ifndef WIN32
00031 #include <unistd.h>
00032 #include <sys/poll.h>
00033 #else
00034 #include "XrdSys/XrdWin32.hh"
00035 #endif
00036 
00037 //_____________________________________________________________________________
00038 XrdClientPSock::XrdClientPSock(XrdClientUrlInfo Host, int windowsize):
00039     XrdClientSock(Host, windowsize) {
00040 
00041     lastsidhint = 0;
00042     fReinit_fd = true;
00043     
00044 }
00045 
00046 //_____________________________________________________________________________
00047 XrdClientPSock::~XrdClientPSock()
00048 {
00049    // Destructor
00050    Disconnect();
00051 }
00052 
00053 //_____________________________________________________________________________
int CloseSockFunc(int K, int V, void *arg) {
    // Callback passed to the socket pool's Apply() by Disconnect().
    // V is the socket descriptor to close; the key K and the user
    // argument arg are not used.
    static_cast<void>(K);
    static_cast<void>(arg);

    ::close(V);

    // Returning a value < 0 also makes the item be deleted
    const int deleteThisItem = -1;
    return deleteThisItem;
}
00060 //_____________________________________________________________________________
00061 void XrdClientPSock::Disconnect()
00062 {
00063   // Close the connection
00064   XrdSysMutexHelper mtx(fMutex);
00065 
00066   fConnected = FALSE;
00067     
00068   // Make the SocketPool invoke the closing of all sockets
00069   fSocketPool.Apply( CloseSockFunc, 0 );
00070     
00071   fSocketIdPool.Purge();
00072   fSocketIdRepo.Clear();
00073 
00074 }
00075 
00076 
00077 
00078 //_____________________________________________________________________________
00079 
00080 struct FdSetSockFuncPars {
00081    struct fdinfo *fdnfo;
00082    XrdOucRash<XrdClientSock::Sockdescr, XrdClientSock::Sockid> *banned;
00083 };
00084 
00085 int FdSetSockFunc(int sockid, int sockdescr, void *arg) {
00086    struct FdSetSockFuncPars *pars = (struct FdSetSockFuncPars *)arg;
00087    struct fdinfo *fds = pars->fdnfo;
00088    
00089 
00090    // There could some sockets in the "banned" state
00091    // I.e. still in the process of being handshaked, but present in the tables
00092    // Those sockets must not be taken into acct by the global selecting mechanism
00093    if ( (sockdescr >= 0) && (!pars->banned->Find(sockdescr)) ) {
00094       FD_SET(sockdescr, &fds->fdset);
00095       fds->maxfd = xrdmax(fds->maxfd, sockdescr);
00096    }
00097 
00098 
00099    // And we continue
00100    return 0;
00101 }
00102 
00103 //_____________________________________________________________________________
00104 int XrdClientPSock::RecvRaw(void* buffer, int length, int substreamid,
00105                            int *usedsubstreamid)
00106 {
00107   // Read bytes following carefully the timeout rules
00108   time_t starttime;
00109   int bytesread = 0;
00110   int selRet;
00111   // The local set of interesting sock descriptors
00112   struct fdinfo locfdinfo;
00113 
00114    // We cycle reading data.
00115    // An exit occurs if:
00116    // We have all the data we are waiting for
00117    // Or a timeout occurs
00118    // The connection is closed by the other peer
00119 
00120    if (!fConnected) {
00121        Error("XrdClientPSock::RecvRaw", "Not connected.");
00122        return TXSOCK_ERR;
00123    }
00124    if (GetMainSock() < 0) {
00125        Error("XrdClientPSock::RecvRaw", "cannot find main socket.");
00126        return TXSOCK_ERR;
00127    }
00128 
00129 
00130    starttime = time(0);
00131 
00132    while (bytesread < length) {
00133 
00134      // We cycle on the select, ignoring the possible interruptions
00135      // We are waiting for something to come from the socket(s)
00136      do {
00137         
00138        if (fReinit_fd) {
00139          // we want to reconstruct the global fd_set
00140          Info(XrdClientDebug::kDUMPDEBUG, "XrdClientPSock::RecvRaw", "Reconstructing global fd table.");
00141 
00142          XrdSysMutexHelper mtx(fMutex);
00143 
00144          FD_ZERO(&globalfdinfo.fdset);
00145          globalfdinfo.maxfd = 0;
00146 
00147          // We are interested in any sock, except for the banned ones
00148          struct FdSetSockFuncPars fdpars;
00149          fdpars.fdnfo = &globalfdinfo;
00150          fdpars.banned = &fSocketNYHandshakedIdPool;
00151 
00152          fSocketPool.Apply( FdSetSockFunc, (void *)&fdpars );
00153          fReinit_fd = false;
00154        }
00155 
00156        // If we already read something, then we are stuck to a single socket
00157        // waiting for the completion of its read
00158        // This is reflected in the local fdset hence we don't have to touch it
00159        //       if ((!bytesread) || (substreamid == -1)) {
00160 
00161        if (substreamid == -1) {
00162            // We are interested in any sock and we are not stuck
00163            // to any in particular so we take the global fdset
00164            locfdinfo = globalfdinfo;
00165            
00166          } else {
00167            // we are using a single specified sock
00168            XrdSysMutexHelper mtx(fMutex);
00169 
00170            FD_ZERO(&locfdinfo.fdset);
00171            locfdinfo.maxfd = 0;
00172            
00173            int sock = GetSock(substreamid);
00174            if (sock >= 0) {
00175              FD_SET(sock, &locfdinfo.fdset);
00176              locfdinfo.maxfd = sock;
00177              
00178            }
00179            else {
00180              Error("XrdClientPSock::RecvRaw", "since we entered RecvRaw, the substreamid " <<
00181                    substreamid << " has been removed.");
00182            
00183              // A dropped parallel stream is not considered
00184              // as an error
00185              if (substreamid == 0)
00186                return TXSOCK_ERR;
00187              else {
00188                XrdSysMutexHelper mtx(fMutex);
00189                if (sock >= 0)
00190                   FD_CLR(sock, &globalfdinfo.fdset);
00191 
00192                RemoveParallelSock(substreamid);
00193                //ReinitFDTable();
00194                return TXSOCK_ERR_TIMEOUT;
00195              }
00196            }
00197          }
00198 
00199          //       }
00200 
00201        
00202          // If too much time has elapsed, then we return an error
00203          if ((time(0) - starttime) > EnvGetLong(NAME_REQUESTTIMEOUT)) {
00204 
00205             return TXSOCK_ERR_TIMEOUT;
00206          }
00207 
00208          struct timeval tv = { 0, 100000 }; // .1 second as timeout step
00209 
00210          // Wait for some events from the socket pool
00211          errno = 0;
00212          selRet = select(locfdinfo.maxfd+1, &locfdinfo.fdset, NULL, NULL, &tv);
00213 
00214          if ( (selRet < 0) && (errno != EINTR) && (errno != EAGAIN) ) {
00215              Error("XrdClientPSock::RecvRaw", "Error in select() : " <<
00216                    ::strerror(errno));
00217 
00218              ReinitFDTable();
00219              return TXSOCK_ERR;
00220          }
00221 
00222       } while (selRet <= 0 && !fRDInterrupt);
00223 
00224       // If we are here, selRet is > 0 why?
00225       //  Because the timeout and the select error are handled inside the previous loop
00226       // But we could have been requested to interrupt
00227 
00228       if (GetMainSock() < 0) {
00229          Error("XrdClientPSock::RecvRaw", "since we entered RecvRaw, the main socket "
00230                "file descriptor has been removed.");
00231          return TXSOCK_ERR;
00232       }
00233 
00234       // If we have been interrupt, reset the interrupt and exit
00235       if (fRDInterrupt) {
00236          fRDInterrupt = 0;
00237          Error("XrdClientPSock::RecvRaw", "got interrupt");
00238          return TXSOCK_ERR_INTERRUPT;
00239       }
00240 
00241       // First of all, we check if there is something to read from any sock.
00242       // the first we find is ok for now
00243       for (int ii = 0; ii <= locfdinfo.maxfd; ii++) {
00244 
00245           if (FD_ISSET(ii, &locfdinfo.fdset)) {
00246               int n = 0;
00247 
00248               do {
00249                  errno = 0;
00250                  n = ::recv(ii, static_cast<char *>(buffer) + bytesread,
00251                             length - bytesread, 0);
00252               } while (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR));
00253 
00254               // If we read nothing, the connection has been closed by the other side
00255               if ((n <= 0)  && (errno != EINTR)) {
00256                 Error("XrdClientPSock::RecvRaw", "Error reading from socket " << ii << ". n=" << n <<
00257                       " Error:'" <<
00258                       ::strerror(errno) << "'");
00259 
00260                   // A dropped parallel stream is not considered
00261                   // as an error
00262                   if (( GetSockId(ii) == 0 ) || ( GetSockId(ii) == -1 ))
00263                       return TXSOCK_ERR;
00264                   else {
00265                     XrdSysMutexHelper mtx(fMutex);
00266                     FD_CLR(ii, &globalfdinfo.fdset);
00267                     RemoveParallelSock(GetSockId(ii));
00268                     //ReinitFDTable();
00269                     return TXSOCK_ERR_TIMEOUT;
00270                   }
00271 
00272               }
00273               
00274               if (n > 0) bytesread += n;
00275               
00276               // If we need to loop more than once to get the whole amount
00277               // of requested bytes, then we have to select only on this fd which
00278               // started providing a chunk of data
00279               FD_ZERO(&locfdinfo.fdset);
00280               FD_SET(ii, &locfdinfo.fdset);
00281               locfdinfo.maxfd = ii;
00282               substreamid = GetSockId(ii);
00283 
00284               if (usedsubstreamid) *usedsubstreamid = GetSockId(ii);
00285 
00286               // We got some data, hence we stop scanning the fd list,
00287               // but we remain stuck to the socket which started providing data
00288               break;
00289           }
00290       }
00291 
00292    } // while
00293 
00294    // Return number of bytes received
00295    // And also usedparsockid has been initialized with the sockid we got something from
00296 
00297    return bytesread;
00298 }
00299 
00300 int XrdClientPSock::SendRaw(const void* buffer, int length, int substreamid) {
00301 
00302     int sfd = GetSock(substreamid);
00303 
00304     Info(XrdClientDebug::kDUMPDEBUG,
00305          "SendRaw",
00306          "Writing to substreamid " <<
00307          substreamid << " mapped to socket fd " << sfd);
00308 
00309     return XrdClientSock::SendRaw(buffer, length, sfd);
00310 
00311 }
00312 
00313 //_____________________________________________________________________________
00314 void XrdClientPSock::TryConnect(bool isUnix) {
00315     // Already connected - we are done.
00316     //
00317     if (fConnected) {
00318         assert(GetMainSock() >= 0);
00319         return;
00320     }
00321 
00322     int s = TryConnect_low(isUnix);
00323 
00324     if (s >= 0) {
00325         XrdSysMutexHelper mtx(fMutex);
00326 
00327         int z = 0;
00328         fSocketPool.Rep(0, s);
00329         fSocketIdPool.Rep(s, z);
00330         //      fSocketIdRepo.Push_back(z);
00331     }
00332 
00333 }
00334 
00335 XrdClientSock::Sockdescr XrdClientPSock::TryConnectParallelSock(int port, int windowsz, Sockid &newid) {
00336 
00337     int s = TryConnect_low(false, port, windowsz);
00338 
00339     if (s >= 0) {
00340 
00341         XrdSysMutexHelper mtx(fMutex);
00342 
00343         // Now we have a good connection, valid from the TCP point of view
00344 
00345         // But we prevent the socket from appearing in the global fd table for now
00346         BanSockDescr(s, newid);
00347 
00348         // We put the descriptor and the id in the tables
00349         fSocketPool.Rep(newid, s);
00350         fSocketIdPool.Rep(s, newid);
00351 
00352     }
00353 
00354     return s;
00355 }
00356 
00357 int XrdClientPSock::RemoveParallelSock(int sockid) {
00358 
00359     XrdSysMutexHelper mtx(fMutex);
00360 
00361     int s = GetSock(sockid);
00362 
00363     if (s >= 0) ::close(s);
00364 
00365     fSocketIdPool.Del(s);
00366     fSocketPool.Del(sockid);
00367 
00368     for (int i = 0; i < fSocketIdRepo.GetSize(); i++)
00369         if (fSocketIdRepo[i] == sockid) {
00370             fSocketIdRepo.Erase(i);
00371             break;
00372         }
00373 
00374     return 0;
00375 }
00376 
00377 int XrdClientPSock::EstablishParallelSock(Sockid tmpsockid, Sockid newsockid) {
00378     XrdSysMutexHelper mtx(fMutex);
00379 
00380     Sockdescr s = GetSock(tmpsockid);
00381     if (s >= 0) {
00382   
00383         fSocketPool.Del(tmpsockid);
00384         fSocketIdPool.Del(s);
00385 
00386         fSocketPool.Rep(newsockid, s);
00387         fSocketIdPool.Rep(s, newsockid);
00388         fSocketIdRepo.Push_back(newsockid);
00389 
00390         Info(XrdClientDebug::kUSERDEBUG,
00391              "XrdClientSock::EstablishParallelSock", "Sockid " << newsockid << " established.");
00392 
00393         return 0;
00394     }
00395 
00396     return -1;
00397 
00398 }
00399 
00400 int XrdClientPSock::GetSockIdHint(int reqsperstream) {
00401 
00402   XrdSysMutexHelper mtx(fMutex);
00403 
00404   // A round robin through the secondary streams. We avoid
00405   // requesting data through the main one because it can become a bottleneck
00406   if (fSocketIdRepo.GetSize() > 0) {
00407      int tmp = lastsidhint+1;
00408      lastsidhint = ( ( tmp % (fSocketIdRepo.GetSize()*reqsperstream) )  );
00409   }
00410   else lastsidhint = 0;
00411 
00412   return fSocketIdRepo[lastsidhint / reqsperstream];
00413   //return (random() % (fSocketIdRepo.GetSize()+1));
00414 
00415 }
00416 
00417 
00418 
00419 void XrdClientPSock::PauseSelectOnSubstream(int substreamid) {
00420   XrdSysMutexHelper mtx(fMutex);
00421 
00422    int sock = GetSock(substreamid);
00423 
00424    if (sock >= 0)
00425       FD_CLR(sock, &globalfdinfo.fdset);
00426 
00427 }
00428 
00429 
00430 void XrdClientPSock::RestartSelectOnSubstream(int substreamid) {
00431   XrdSysMutexHelper mtx(fMutex);
00432 
00433    int sock = GetSock(substreamid);
00434 
00435    if (sock >= 0)
00436       FD_SET(sock, &globalfdinfo.fdset);
00437 
00438 }

Generated on Tue Jul 5 14:46:18 2011 for ROOT_528-00b_version by  doxygen 1.5.1