00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013 const char *XrdClientPSockCVSID = "$Id: XrdClientPSock.cc 33978 2010-06-18 10:02:18Z ganis $";
00014
00015 #include <memory>
00016 #include <errno.h>
00017 #include <string.h>
00018 #include <assert.h>
00019 #include <math.h>
00020 #include "XrdClient/XrdClientPSock.hh"
00021 #include "XrdSys/XrdSysLogger.hh"
00022 #include "XrdNet/XrdNetSocket.hh"
00023 #include "XrdClient/XrdClientDebug.hh"
00024 #include "XrdClient/XrdClientEnv.hh"
00025
00026 #ifdef __solaris__
00027 #include <sunmath.h>
00028 #endif
00029
00030 #ifndef WIN32
00031 #include <unistd.h>
00032 #include <sys/poll.h>
00033 #else
00034 #include "XrdSys/XrdWin32.hh"
00035 #endif
00036
00037
00038 XrdClientPSock::XrdClientPSock(XrdClientUrlInfo Host, int windowsize):
00039 XrdClientSock(Host, windowsize) {
00040
00041 lastsidhint = 0;
00042 fReinit_fd = true;
00043
00044 }
00045
00046
00047 XrdClientPSock::~XrdClientPSock()
00048 {
00049
00050 Disconnect();
00051 }
00052
00053
00054 int CloseSockFunc(int K, int V, void *arg) {
00055 ::close(V);
00056
00057
00058 return -1;
00059 }
00060
00061 void XrdClientPSock::Disconnect()
00062 {
00063
00064 XrdSysMutexHelper mtx(fMutex);
00065
00066 fConnected = FALSE;
00067
00068
00069 fSocketPool.Apply( CloseSockFunc, 0 );
00070
00071 fSocketIdPool.Purge();
00072 fSocketIdRepo.Clear();
00073
00074 }
00075
00076
00077
00078
00079
00080 struct FdSetSockFuncPars {
00081 struct fdinfo *fdnfo;
00082 XrdOucRash<XrdClientSock::Sockdescr, XrdClientSock::Sockid> *banned;
00083 };
00084
00085 int FdSetSockFunc(int sockid, int sockdescr, void *arg) {
00086 struct FdSetSockFuncPars *pars = (struct FdSetSockFuncPars *)arg;
00087 struct fdinfo *fds = pars->fdnfo;
00088
00089
00090
00091
00092
00093 if ( (sockdescr >= 0) && (!pars->banned->Find(sockdescr)) ) {
00094 FD_SET(sockdescr, &fds->fdset);
00095 fds->maxfd = xrdmax(fds->maxfd, sockdescr);
00096 }
00097
00098
00099
00100 return 0;
00101 }
00102
00103
00104 int XrdClientPSock::RecvRaw(void* buffer, int length, int substreamid,
00105 int *usedsubstreamid)
00106 {
00107
00108 time_t starttime;
00109 int bytesread = 0;
00110 int selRet;
00111
00112 struct fdinfo locfdinfo;
00113
00114
00115
00116
00117
00118
00119
00120 if (!fConnected) {
00121 Error("XrdClientPSock::RecvRaw", "Not connected.");
00122 return TXSOCK_ERR;
00123 }
00124 if (GetMainSock() < 0) {
00125 Error("XrdClientPSock::RecvRaw", "cannot find main socket.");
00126 return TXSOCK_ERR;
00127 }
00128
00129
00130 starttime = time(0);
00131
00132 while (bytesread < length) {
00133
00134
00135
00136 do {
00137
00138 if (fReinit_fd) {
00139
00140 Info(XrdClientDebug::kDUMPDEBUG, "XrdClientPSock::RecvRaw", "Reconstructing global fd table.");
00141
00142 XrdSysMutexHelper mtx(fMutex);
00143
00144 FD_ZERO(&globalfdinfo.fdset);
00145 globalfdinfo.maxfd = 0;
00146
00147
00148 struct FdSetSockFuncPars fdpars;
00149 fdpars.fdnfo = &globalfdinfo;
00150 fdpars.banned = &fSocketNYHandshakedIdPool;
00151
00152 fSocketPool.Apply( FdSetSockFunc, (void *)&fdpars );
00153 fReinit_fd = false;
00154 }
00155
00156
00157
00158
00159
00160
00161 if (substreamid == -1) {
00162
00163
00164 locfdinfo = globalfdinfo;
00165
00166 } else {
00167
00168 XrdSysMutexHelper mtx(fMutex);
00169
00170 FD_ZERO(&locfdinfo.fdset);
00171 locfdinfo.maxfd = 0;
00172
00173 int sock = GetSock(substreamid);
00174 if (sock >= 0) {
00175 FD_SET(sock, &locfdinfo.fdset);
00176 locfdinfo.maxfd = sock;
00177
00178 }
00179 else {
00180 Error("XrdClientPSock::RecvRaw", "since we entered RecvRaw, the substreamid " <<
00181 substreamid << " has been removed.");
00182
00183
00184
00185 if (substreamid == 0)
00186 return TXSOCK_ERR;
00187 else {
00188 XrdSysMutexHelper mtx(fMutex);
00189 if (sock >= 0)
00190 FD_CLR(sock, &globalfdinfo.fdset);
00191
00192 RemoveParallelSock(substreamid);
00193
00194 return TXSOCK_ERR_TIMEOUT;
00195 }
00196 }
00197 }
00198
00199
00200
00201
00202
00203 if ((time(0) - starttime) > EnvGetLong(NAME_REQUESTTIMEOUT)) {
00204
00205 return TXSOCK_ERR_TIMEOUT;
00206 }
00207
00208 struct timeval tv = { 0, 100000 };
00209
00210
00211 errno = 0;
00212 selRet = select(locfdinfo.maxfd+1, &locfdinfo.fdset, NULL, NULL, &tv);
00213
00214 if ( (selRet < 0) && (errno != EINTR) && (errno != EAGAIN) ) {
00215 Error("XrdClientPSock::RecvRaw", "Error in select() : " <<
00216 ::strerror(errno));
00217
00218 ReinitFDTable();
00219 return TXSOCK_ERR;
00220 }
00221
00222 } while (selRet <= 0 && !fRDInterrupt);
00223
00224
00225
00226
00227
00228 if (GetMainSock() < 0) {
00229 Error("XrdClientPSock::RecvRaw", "since we entered RecvRaw, the main socket "
00230 "file descriptor has been removed.");
00231 return TXSOCK_ERR;
00232 }
00233
00234
00235 if (fRDInterrupt) {
00236 fRDInterrupt = 0;
00237 Error("XrdClientPSock::RecvRaw", "got interrupt");
00238 return TXSOCK_ERR_INTERRUPT;
00239 }
00240
00241
00242
00243 for (int ii = 0; ii <= locfdinfo.maxfd; ii++) {
00244
00245 if (FD_ISSET(ii, &locfdinfo.fdset)) {
00246 int n = 0;
00247
00248 do {
00249 errno = 0;
00250 n = ::recv(ii, static_cast<char *>(buffer) + bytesread,
00251 length - bytesread, 0);
00252 } while (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR));
00253
00254
00255 if ((n <= 0) && (errno != EINTR)) {
00256 Error("XrdClientPSock::RecvRaw", "Error reading from socket " << ii << ". n=" << n <<
00257 " Error:'" <<
00258 ::strerror(errno) << "'");
00259
00260
00261
00262 if (( GetSockId(ii) == 0 ) || ( GetSockId(ii) == -1 ))
00263 return TXSOCK_ERR;
00264 else {
00265 XrdSysMutexHelper mtx(fMutex);
00266 FD_CLR(ii, &globalfdinfo.fdset);
00267 RemoveParallelSock(GetSockId(ii));
00268
00269 return TXSOCK_ERR_TIMEOUT;
00270 }
00271
00272 }
00273
00274 if (n > 0) bytesread += n;
00275
00276
00277
00278
00279 FD_ZERO(&locfdinfo.fdset);
00280 FD_SET(ii, &locfdinfo.fdset);
00281 locfdinfo.maxfd = ii;
00282 substreamid = GetSockId(ii);
00283
00284 if (usedsubstreamid) *usedsubstreamid = GetSockId(ii);
00285
00286
00287
00288 break;
00289 }
00290 }
00291
00292 }
00293
00294
00295
00296
00297 return bytesread;
00298 }
00299
00300 int XrdClientPSock::SendRaw(const void* buffer, int length, int substreamid) {
00301
00302 int sfd = GetSock(substreamid);
00303
00304 Info(XrdClientDebug::kDUMPDEBUG,
00305 "SendRaw",
00306 "Writing to substreamid " <<
00307 substreamid << " mapped to socket fd " << sfd);
00308
00309 return XrdClientSock::SendRaw(buffer, length, sfd);
00310
00311 }
00312
00313
00314 void XrdClientPSock::TryConnect(bool isUnix) {
00315
00316
00317 if (fConnected) {
00318 assert(GetMainSock() >= 0);
00319 return;
00320 }
00321
00322 int s = TryConnect_low(isUnix);
00323
00324 if (s >= 0) {
00325 XrdSysMutexHelper mtx(fMutex);
00326
00327 int z = 0;
00328 fSocketPool.Rep(0, s);
00329 fSocketIdPool.Rep(s, z);
00330
00331 }
00332
00333 }
00334
00335 XrdClientSock::Sockdescr XrdClientPSock::TryConnectParallelSock(int port, int windowsz, Sockid &newid) {
00336
00337 int s = TryConnect_low(false, port, windowsz);
00338
00339 if (s >= 0) {
00340
00341 XrdSysMutexHelper mtx(fMutex);
00342
00343
00344
00345
00346 BanSockDescr(s, newid);
00347
00348
00349 fSocketPool.Rep(newid, s);
00350 fSocketIdPool.Rep(s, newid);
00351
00352 }
00353
00354 return s;
00355 }
00356
00357 int XrdClientPSock::RemoveParallelSock(int sockid) {
00358
00359 XrdSysMutexHelper mtx(fMutex);
00360
00361 int s = GetSock(sockid);
00362
00363 if (s >= 0) ::close(s);
00364
00365 fSocketIdPool.Del(s);
00366 fSocketPool.Del(sockid);
00367
00368 for (int i = 0; i < fSocketIdRepo.GetSize(); i++)
00369 if (fSocketIdRepo[i] == sockid) {
00370 fSocketIdRepo.Erase(i);
00371 break;
00372 }
00373
00374 return 0;
00375 }
00376
00377 int XrdClientPSock::EstablishParallelSock(Sockid tmpsockid, Sockid newsockid) {
00378 XrdSysMutexHelper mtx(fMutex);
00379
00380 Sockdescr s = GetSock(tmpsockid);
00381 if (s >= 0) {
00382
00383 fSocketPool.Del(tmpsockid);
00384 fSocketIdPool.Del(s);
00385
00386 fSocketPool.Rep(newsockid, s);
00387 fSocketIdPool.Rep(s, newsockid);
00388 fSocketIdRepo.Push_back(newsockid);
00389
00390 Info(XrdClientDebug::kUSERDEBUG,
00391 "XrdClientSock::EstablishParallelSock", "Sockid " << newsockid << " established.");
00392
00393 return 0;
00394 }
00395
00396 return -1;
00397
00398 }
00399
00400 int XrdClientPSock::GetSockIdHint(int reqsperstream) {
00401
00402 XrdSysMutexHelper mtx(fMutex);
00403
00404
00405
00406 if (fSocketIdRepo.GetSize() > 0) {
00407 int tmp = lastsidhint+1;
00408 lastsidhint = ( ( tmp % (fSocketIdRepo.GetSize()*reqsperstream) ) );
00409 }
00410 else lastsidhint = 0;
00411
00412 return fSocketIdRepo[lastsidhint / reqsperstream];
00413
00414
00415 }
00416
00417
00418
00419 void XrdClientPSock::PauseSelectOnSubstream(int substreamid) {
00420 XrdSysMutexHelper mtx(fMutex);
00421
00422 int sock = GetSock(substreamid);
00423
00424 if (sock >= 0)
00425 FD_CLR(sock, &globalfdinfo.fdset);
00426
00427 }
00428
00429
00430 void XrdClientPSock::RestartSelectOnSubstream(int substreamid) {
00431 XrdSysMutexHelper mtx(fMutex);
00432
00433 int sock = GetSock(substreamid);
00434
00435 if (sock >= 0)
00436 FD_SET(sock, &globalfdinfo.fdset);
00437
00438 }