netpar.cxx

Go to the documentation of this file.
00001 // @(#)root/rpdutils:$Id: netpar.cxx 25546 2008-09-25 21:09:09Z rdm $
00002 // Author: Fons Rademakers   06/02/2001
00003 
00004 /*************************************************************************
00005  * Copyright (C) 1995-2001, Rene Brun and Fons Rademakers.               *
00006  * All rights reserved.                                                  *
00007  *                                                                       *
00008  * For the licensing terms see $ROOTSYS/LICENSE.                         *
00009  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
00010  *************************************************************************/
00011 
00012 //////////////////////////////////////////////////////////////////////////
00013 //                                                                      //
00014 // netpar                                                               //
00015 //                                                                      //
00016 // Set of parallel network routines for rootd daemon process. To be     //
00017 // used when remote uses TPSocket to connect to rootd.                  //
00018 //                                                                      //
00019 //////////////////////////////////////////////////////////////////////////
00020 
00021 #include "RConfig.h"
00022 
00023 // avoid warning due to wrong bzero prototype (used by FD_ZERO macro)
00024 #ifdef R__ALPHA
00025 #define _XOPEN_SOURCE_EXTENDED
00026 #endif
00027 #include <stdio.h>
00028 #include <string.h>
00029 #ifdef R__ALPHA
00030 #undef _XOPEN_SOURCE_EXTENDED
00031 #endif
00032 
00033 #include <stdlib.h>
00034 #include <unistd.h>
00035 #include <signal.h>
00036 #include <sys/socket.h>
00037 #include <netinet/in.h>
00038 #include <netinet/tcp.h>
00039 #include <arpa/inet.h>
00040 #include <netdb.h>
00041 #include <fcntl.h>
00042 #include <errno.h>
00043 #if defined(_AIX)
00044 #include <strings.h>
00045 #endif
00046 
00047 #if (defined(R__AIX) && !defined(_AIX43)) || \
00048     (defined(R__SUNGCC3) && !defined(__arch64__))
00049 #   define USE_SIZE_T
00050 #elif defined(R__GLIBC) || defined(R__FBSD) || \
00051       (defined(R__SUNGCC3) && defined(__arch64__)) || \
00052       defined(R__OBSD) || defined(MAC_OS_X_VERSION_10_4) || \
00053       (defined(R__AIX) && defined(_AIX43))
00054 #   define USE_SOCKLEN_T
00055 #endif
00056 
00057 #include "rpdp.h"
00058 
00059 extern int gDebug;
00060 
00061 namespace ROOT {
00062 
00063 extern ErrorHandler_t gErrSys;
00064 
00065 int gParallel = 0;
00066 
00067 static int    gMaxFd;
00068 static int   *gPSockFd;
00069 static int   *gWriteBytesLeft;
00070 static int   *gReadBytesLeft;
00071 static char **gWritePtr;
00072 static char **gReadPtr;
00073 static fd_set gFdSet;
00074 
00075 //______________________________________________________________________________
00076 static void InitSelect(int nsock)
00077 {
00078    // Setup select masks.
00079 
00080    FD_ZERO(&gFdSet);
00081    gMaxFd = -1;
00082    for (int i = 0; i < nsock; i++) {
00083       FD_SET(gPSockFd[i], &gFdSet);
00084       if (gPSockFd[i] > gMaxFd)
00085          gMaxFd = gPSockFd[i];
00086    }
00087 }
00088 
00089 //______________________________________________________________________________
00090 int NetParSend(const void *buf, int len)
00091 {
00092    // Send buffer of specified length over the parallel sockets.
00093    // Returns len in case of success and -1 in case of error.
00094 
00095    int i, alen = len, nsock = gParallel;
00096 
00097    // If data buffer is < 4K use only one socket
00098    if (len < 4096)
00099       nsock = 1;
00100 
00101    for (i = 0; i < nsock; i++) {
00102       gWriteBytesLeft[i] = len/nsock;
00103       gWritePtr[i] = (char *)buf + (i*gWriteBytesLeft[i]);
00104    }
00105    gWriteBytesLeft[i-1] += len%nsock;
00106 
00107    InitSelect(nsock);
00108 
00109    // Send the data on the parallel sockets
00110    while (len > 0) {
00111 
00112       fd_set writeReady = gFdSet;
00113 
00114       int isel = select(gMaxFd+1, 0, &writeReady, 0, 0);
00115       if (isel < 0) {
00116          ErrorInfo("NetParSend: error on select");
00117          return -1;
00118       }
00119 
00120       for (i = 0; i < nsock; i++) {
00121          if (FD_ISSET(gPSockFd[i], &writeReady)) {
00122             if (gWriteBytesLeft[i] > 0) {
00123                int ilen;
00124 again:
00125                ilen = send(gPSockFd[i], gWritePtr[i], gWriteBytesLeft[i], 0);
00126                if (ilen < 0) {
00127                   if (GetErrno() == EAGAIN)
00128                      goto again;
00129                   ErrorInfo("NetParSend: error sending for socket %d (%d)",
00130                             i, gPSockFd[i]);
00131                   return -1;
00132                }
00133                gWriteBytesLeft[i] -= ilen;
00134                gWritePtr[i] += ilen;
00135                len -= ilen;
00136             }
00137          }
00138       }
00139    }
00140 
00141    return alen;
00142 }
00143 
00144 //______________________________________________________________________________
00145 int NetParRecv(void *buf, int len)
00146 {
00147    // Receive buffer of specified length over parallel sockets.
00148    // Returns len in case of success and -1 in case of error.
00149 
00150    int i, alen = len, nsock = gParallel;
00151 
00152    // If data buffer is < 4K use only one socket
00153    if (len < 4096)
00154       nsock = 1;
00155 
00156    for (i = 0; i < nsock; i++) {
00157       gReadBytesLeft[i] = len/nsock;
00158       gReadPtr[i] = (char *)buf + (i*gReadBytesLeft[i]);
00159    }
00160    gReadBytesLeft[i-1] += len%nsock;
00161 
00162    InitSelect(nsock);
00163 
00164    // Recieve the data on the parallel sockets
00165    while (len > 0) {
00166 
00167       fd_set readReady = gFdSet;
00168 
00169       int isel = select(gMaxFd+1, &readReady, 0, 0, 0);
00170       if (isel < 0) {
00171          ErrorInfo("NetParRecv: error on select");
00172          return -1;
00173       }
00174 
00175       for (i = 0; i < nsock; i++) {
00176          if (FD_ISSET(gPSockFd[i], &readReady)) {
00177             if (gReadBytesLeft[i] > 0) {
00178                int ilen = recv(gPSockFd[i], gReadPtr[i], gReadBytesLeft[i], 0);
00179                if (ilen < 0) {
00180                   ErrorInfo("NetParRecv: error receiving for socket %d (%d)",
00181                             i, gPSockFd[i]);
00182                   return -1;
00183                } else if (ilen == 0) {
00184                   ErrorInfo("NetParRecv: EOF on socket %d (%d)",
00185                             i, gPSockFd[i]);
00186                   return 0;
00187                }
00188                gReadBytesLeft[i] -= ilen;
00189                gReadPtr[i] += ilen;
00190                len -= ilen;
00191             }
00192          }
00193       }
00194    }
00195 
00196    return alen;
00197 }
00198 
00199 //______________________________________________________________________________
00200 int NetParOpen(int port, int size)
00201 {
00202    // Open size parallel sockets back to client. Returns 0 in case of error,
00203    // and number of parallel sockets in case of success.
00204 
00205    struct sockaddr_in remote_addr;
00206    memset(&remote_addr, 0, sizeof(remote_addr));
00207 
00208 #if defined(USE_SIZE_T)
00209    size_t remlen = sizeof(remote_addr);
00210 #elif defined(USE_SOCKLEN_T)
00211    socklen_t remlen = sizeof(remote_addr);
00212 #else
00213    int remlen = sizeof(remote_addr);
00214 #endif
00215 
00216    if (!getpeername(NetGetSockFd(), (struct sockaddr *)&remote_addr, &remlen)) {
00217       remote_addr.sin_family = AF_INET;
00218       remote_addr.sin_port   = htons(port);
00219 
00220       gPSockFd = new int[size];
00221 
00222       for (int i = 0; i < size; i++) {
00223          if ((gPSockFd[i] = socket(AF_INET, SOCK_STREAM, 0)) < 0)
00224             Error(gErrSys, kErrFatal, "NetParOpen: can't create socket %d (%d)",
00225                      i, gPSockFd[i]);
00226 
00227          NetSetOptions(kROOTD, gPSockFd[i], 65535);
00228 
00229          if (connect(gPSockFd[i], (struct sockaddr *)&remote_addr, remlen) < 0)
00230             Error(gErrSys, kErrFatal, "NetParOpen: can't connect socket %d (%d)",
00231                   i, gPSockFd[i]);
00232 
00233          // Set non-blocking
00234          int val;
00235          if ((val = fcntl(gPSockFd[i], F_GETFL, 0)) < 0)
00236             Error(gErrSys, kErrFatal, "NetParOpen: can't get control flags");
00237          val |= O_NONBLOCK;
00238          if (fcntl(gPSockFd[i], F_SETFL, val) < 0)
00239             Error(gErrSys, kErrFatal, "NetParOpen: can't make socket non blocking");
00240       }
00241 
00242       gWriteBytesLeft = new int[size];
00243       gReadBytesLeft  = new int[size];
00244       gWritePtr       = new char*[size];
00245       gReadPtr        = new char*[size];
00246 
00247       // Close initial setup socket
00248       NetClose();
00249 
00250       gParallel = size;
00251 
00252       if (gDebug > 0)
00253          ErrorInfo("NetParOpen: %d parallel connections established", size);
00254 
00255    } else
00256       Error(gErrSys, kErrFatal, "NetParOpen: can't get peer name");
00257 
00258    return gParallel;
00259 }
00260 
00261 //______________________________________________________________________________
00262 void NetParClose()
00263 {
00264    // Close parallel sockets.
00265 
00266    for (int i = 0; i < gParallel; i++)
00267       close(gPSockFd[i]);
00268 
00269    if (gDebug > 0) {
00270       std::string host;
00271       NetGetRemoteHost(host);
00272       ErrorInfo("NetParClose: closing %d-stream connection to host %s",
00273                 gParallel, host.data());
00274    }
00275 
00276    delete [] gPSockFd;
00277    delete [] gWriteBytesLeft;
00278    delete [] gReadBytesLeft;
00279    delete [] gWritePtr;
00280    delete [] gReadPtr;
00281 
00282    gParallel = 0;
00283 }
00284 
00285 } // namespace ROOT

Generated on Tue Jul 5 14:46:13 2011 for ROOT_528-00b_version by  doxygen 1.5.1