00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #include "RConfig.h"
00022
00023
00024 #ifdef R__ALPHA
00025 #define _XOPEN_SOURCE_EXTENDED
00026 #endif
00027 #include <stdio.h>
00028 #include <string.h>
00029 #ifdef R__ALPHA
00030 #undef _XOPEN_SOURCE_EXTENDED
00031 #endif
00032
00033 #include <stdlib.h>
00034 #include <unistd.h>
00035 #include <signal.h>
00036 #include <sys/socket.h>
00037 #include <netinet/in.h>
00038 #include <netinet/tcp.h>
00039 #include <arpa/inet.h>
00040 #include <netdb.h>
00041 #include <fcntl.h>
00042 #include <errno.h>
00043 #if defined(_AIX)
00044 #include <strings.h>
00045 #endif
00046
00047 #if (defined(R__AIX) && !defined(_AIX43)) || \
00048 (defined(R__SUNGCC3) && !defined(__arch64__))
00049 # define USE_SIZE_T
00050 #elif defined(R__GLIBC) || defined(R__FBSD) || \
00051 (defined(R__SUNGCC3) && defined(__arch64__)) || \
00052 defined(R__OBSD) || defined(MAC_OS_X_VERSION_10_4) || \
00053 (defined(R__AIX) && defined(_AIX43))
00054 # define USE_SOCKLEN_T
00055 #endif
00056
00057 #include "rpdp.h"
00058
00059 extern int gDebug;
00060
00061 namespace ROOT {
00062
00063 extern ErrorHandler_t gErrSys;
00064
00065 int gParallel = 0;   // socket count: set to 'size' by NetParOpen(), reset to 0 by NetParClose()
00066
00067 static int gMaxFd;   // largest descriptor placed in gFdSet by InitSelect(); -1 when the set is empty
00068 static int *gPSockFd;   // array of socket descriptors, allocated in NetParOpen()
00069 static int *gWriteBytesLeft;   // per-socket count of bytes NetParSend() still has to send
00070 static int *gReadBytesLeft;   // per-socket count of bytes NetParRecv() still has to receive
00071 static char **gWritePtr;   // per-socket cursor into the buffer handed to NetParSend()
00072 static char **gReadPtr;   // per-socket cursor into the buffer handed to NetParRecv()
00073 static fd_set gFdSet;   // descriptor set built by InitSelect(); copied before every select() call
00074
00075
00076 static void InitSelect(int nsock)
00077 {
00078
00079
00080 FD_ZERO(&gFdSet);
00081 gMaxFd = -1;
00082 for (int i = 0; i < nsock; i++) {
00083 FD_SET(gPSockFd[i], &gFdSet);
00084 if (gPSockFd[i] > gMaxFd)
00085 gMaxFd = gPSockFd[i];
00086 }
00087 }
00088
00089
00090 int NetParSend(const void *buf, int len)
00091 {
00092
00093
00094
00095 int i, alen = len, nsock = gParallel;
00096
00097
00098 if (len < 4096)
00099 nsock = 1;
00100
00101 for (i = 0; i < nsock; i++) {
00102 gWriteBytesLeft[i] = len/nsock;
00103 gWritePtr[i] = (char *)buf + (i*gWriteBytesLeft[i]);
00104 }
00105 gWriteBytesLeft[i-1] += len%nsock;
00106
00107 InitSelect(nsock);
00108
00109
00110 while (len > 0) {
00111
00112 fd_set writeReady = gFdSet;
00113
00114 int isel = select(gMaxFd+1, 0, &writeReady, 0, 0);
00115 if (isel < 0) {
00116 ErrorInfo("NetParSend: error on select");
00117 return -1;
00118 }
00119
00120 for (i = 0; i < nsock; i++) {
00121 if (FD_ISSET(gPSockFd[i], &writeReady)) {
00122 if (gWriteBytesLeft[i] > 0) {
00123 int ilen;
00124 again:
00125 ilen = send(gPSockFd[i], gWritePtr[i], gWriteBytesLeft[i], 0);
00126 if (ilen < 0) {
00127 if (GetErrno() == EAGAIN)
00128 goto again;
00129 ErrorInfo("NetParSend: error sending for socket %d (%d)",
00130 i, gPSockFd[i]);
00131 return -1;
00132 }
00133 gWriteBytesLeft[i] -= ilen;
00134 gWritePtr[i] += ilen;
00135 len -= ilen;
00136 }
00137 }
00138 }
00139 }
00140
00141 return alen;
00142 }
00143
00144
00145 int NetParRecv(void *buf, int len)
00146 {
00147
00148
00149
00150 int i, alen = len, nsock = gParallel;
00151
00152
00153 if (len < 4096)
00154 nsock = 1;
00155
00156 for (i = 0; i < nsock; i++) {
00157 gReadBytesLeft[i] = len/nsock;
00158 gReadPtr[i] = (char *)buf + (i*gReadBytesLeft[i]);
00159 }
00160 gReadBytesLeft[i-1] += len%nsock;
00161
00162 InitSelect(nsock);
00163
00164
00165 while (len > 0) {
00166
00167 fd_set readReady = gFdSet;
00168
00169 int isel = select(gMaxFd+1, &readReady, 0, 0, 0);
00170 if (isel < 0) {
00171 ErrorInfo("NetParRecv: error on select");
00172 return -1;
00173 }
00174
00175 for (i = 0; i < nsock; i++) {
00176 if (FD_ISSET(gPSockFd[i], &readReady)) {
00177 if (gReadBytesLeft[i] > 0) {
00178 int ilen = recv(gPSockFd[i], gReadPtr[i], gReadBytesLeft[i], 0);
00179 if (ilen < 0) {
00180 ErrorInfo("NetParRecv: error receiving for socket %d (%d)",
00181 i, gPSockFd[i]);
00182 return -1;
00183 } else if (ilen == 0) {
00184 ErrorInfo("NetParRecv: EOF on socket %d (%d)",
00185 i, gPSockFd[i]);
00186 return 0;
00187 }
00188 gReadBytesLeft[i] -= ilen;
00189 gReadPtr[i] += ilen;
00190 len -= ilen;
00191 }
00192 }
00193 }
00194 }
00195
00196 return alen;
00197 }
00198
00199
00200 int NetParOpen(int port, int size)
00201 {
00202
00203
00204
00205 struct sockaddr_in remote_addr;
00206 memset(&remote_addr, 0, sizeof(remote_addr));
00207
00208 #if defined(USE_SIZE_T)
00209 size_t remlen = sizeof(remote_addr);
00210 #elif defined(USE_SOCKLEN_T)
00211 socklen_t remlen = sizeof(remote_addr);
00212 #else
00213 int remlen = sizeof(remote_addr);
00214 #endif
00215
00216 if (!getpeername(NetGetSockFd(), (struct sockaddr *)&remote_addr, &remlen)) {
00217 remote_addr.sin_family = AF_INET;
00218 remote_addr.sin_port = htons(port);
00219
00220 gPSockFd = new int[size];
00221
00222 for (int i = 0; i < size; i++) {
00223 if ((gPSockFd[i] = socket(AF_INET, SOCK_STREAM, 0)) < 0)
00224 Error(gErrSys, kErrFatal, "NetParOpen: can't create socket %d (%d)",
00225 i, gPSockFd[i]);
00226
00227 NetSetOptions(kROOTD, gPSockFd[i], 65535);
00228
00229 if (connect(gPSockFd[i], (struct sockaddr *)&remote_addr, remlen) < 0)
00230 Error(gErrSys, kErrFatal, "NetParOpen: can't connect socket %d (%d)",
00231 i, gPSockFd[i]);
00232
00233
00234 int val;
00235 if ((val = fcntl(gPSockFd[i], F_GETFL, 0)) < 0)
00236 Error(gErrSys, kErrFatal, "NetParOpen: can't get control flags");
00237 val |= O_NONBLOCK;
00238 if (fcntl(gPSockFd[i], F_SETFL, val) < 0)
00239 Error(gErrSys, kErrFatal, "NetParOpen: can't make socket non blocking");
00240 }
00241
00242 gWriteBytesLeft = new int[size];
00243 gReadBytesLeft = new int[size];
00244 gWritePtr = new char*[size];
00245 gReadPtr = new char*[size];
00246
00247
00248 NetClose();
00249
00250 gParallel = size;
00251
00252 if (gDebug > 0)
00253 ErrorInfo("NetParOpen: %d parallel connections established", size);
00254
00255 } else
00256 Error(gErrSys, kErrFatal, "NetParOpen: can't get peer name");
00257
00258 return gParallel;
00259 }
00260
00261
00262 void NetParClose()
00263 {
00264
00265
00266 for (int i = 0; i < gParallel; i++)
00267 close(gPSockFd[i]);
00268
00269 if (gDebug > 0) {
00270 std::string host;
00271 NetGetRemoteHost(host);
00272 ErrorInfo("NetParClose: closing %d-stream connection to host %s",
00273 gParallel, host.data());
00274 }
00275
00276 delete [] gPSockFd;
00277 delete [] gWriteBytesLeft;
00278 delete [] gReadBytesLeft;
00279 delete [] gWritePtr;
00280 delete [] gReadPtr;
00281
00282 gParallel = 0;
00283 }
00284
00285 }