00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
// Version-control identification string for this translation unit (expanded
// "$Id$" keyword); it is not referenced anywhere else in this file.
const char *XrdNetLinkCVSID = "$Id: XrdNetLink.cc 35287 2010-09-14 21:19:35Z ganis $";
00014
00015 #include <fcntl.h>
00016 #ifndef WIN32
00017 #include <poll.h>
00018 #include <string.h>
00019 #include <sys/types.h>
00020 #include <sys/socket.h>
00021 #else
00022 #include "XrdSys/XrdWin32.hh"
00023 #endif
00024
00025 #include "XrdNet/XrdNet.hh"
00026 #include "XrdNet/XrdNetBuffer.hh"
00027 #include "XrdNet/XrdNetDNS.hh"
00028 #include "XrdNet/XrdNetLink.hh"
00029 #include "XrdNet/XrdNetPeer.hh"
00030 #include "XrdSys/XrdSysError.hh"
00031 #include "XrdSys/XrdSysPlatform.hh"
00032 #include "XrdOuc/XrdOucStream.hh"
00033 #include "XrdOuc/XrdOucTokenizer.hh"
00034
00035
00036
00037
00038
// Static state shared by every XrdNetLink object.

// Mutex held while LinkStack, numlink and maxlink are modified
// (see Alloc(), Recycle() and Set()).
XrdSysMutex XrdNetLink::LinkList;
// Stack of recycled link objects that Alloc() may hand out again.
XrdOucStack<XrdNetLink> XrdNetLink::LinkStack;
// Recycle() deletes a link instead of stacking it once numlink reaches this.
int XrdNetLink::maxlink = 16;
// Number of link objects currently sitting on LinkStack.
int XrdNetLink::numlink = 0;
// Descriptor for /dev/null, opened during static initialisation; a deferred
// Close() dup2()s it over the link's own descriptor.
int XrdNetLink::devNull = open("/dev/null", O_RDONLY);
00044
00045
00046
00047
00048
00049 XrdNetLink *XrdNetLink::Alloc(XrdSysError *erp, XrdNet *Net, XrdNetPeer &Peer,
00050 XrdNetBufferQ *bq, int opts)
00051 {
00052 XrdNetLink *lp;
00053
00054
00055
00056 LinkList.Lock();
00057 if ((lp = LinkStack.Pop())) numlink--;
00058 LinkList.UnLock();
00059
00060
00061
00062 if (!lp) lp = new XrdNetLink(erp, bq);
00063 else if (bq != lp->BuffQ)
00064 {if (lp->recvbuff) {lp->recvbuff->Recycle(); lp->recvbuff = 0;}
00065 if (lp->sendbuff) {lp->sendbuff->Recycle(); lp->sendbuff = 0;}
00066 lp->BuffQ = bq;
00067 }
00068
00069
00070
00071 if (!lp) return lp;
00072 lp->noclose = (opts & XRDNETLINK_NOCLOSE);
00073 lp->isReset = 0;
00074
00075
00076
00077 if (Peer.InetBuff)
00078 { lp->recvbuff = Peer.InetBuff;
00079 if (!(lp->Bucket = new XrdOucTokenizer(Peer.InetBuff->data)))
00080 {lp->Recycle(); lp = 0;}
00081 } else if (!(opts & XRDNETLINK_NOSTREAM))
00082 {if (!(lp->Stream = new XrdOucStream(erp)))
00083 {lp->Recycle(); lp = 0;}
00084 else lp->Stream->Attach(Peer.fd);
00085 }
00086
00087
00088
00089 if (lp)
00090 {memcpy((void *)&(lp->InetAddr), (const void *)&Peer.InetAddr,
00091 sizeof(Peer.InetAddr));
00092 if (Peer.InetName) lp->Lname = strdup(Peer.InetName);
00093 else lp->Lname = XrdNetDNS::getHostName(Peer.InetAddr);
00094 lp->Sname = strdup(lp->Lname);
00095 Net->Trim(lp->Sname);
00096 lp->FD = Peer.fd;
00097 }
00098
00099
00100
00101 return lp;
00102 }
00103
00104
00105
00106
00107
00108 int XrdNetLink::Close(int defer)
00109 {
00110
00111
00112
00113
00114 if(!defer) rdMutex.Lock();
00115 wrMutex.Lock();
00116
00117
00118
00119
00120
00121 if (!defer)
00122 {if (Stream) {Stream->Detach(); delete Stream; Stream = 0;}
00123 if (Bucket) {delete Bucket; Bucket = 0;}
00124 if (recvbuff) {recvbuff->Recycle(); recvbuff = 0;}
00125 if (sendbuff) {sendbuff->Recycle(); sendbuff = 0;}
00126 if (Lname) {free(Lname); Lname = 0;}
00127 if (FD >= 0 && !noclose) close(FD);
00128 FD = -1;
00129 } else if (FD >= 0 && !isReset)
00130 {dup2(devNull, FD); isReset = 1;}
00131
00132
00133
00134 wrMutex.UnLock();
00135 if (!defer) rdMutex.UnLock();
00136 return 0;
00137 }
00138
00139
00140
00141
00142
00143 char *XrdNetLink::GetLine()
00144 {
00145 if (Stream) return Stream->GetLine();
00146 else if (Bucket) return Bucket->GetLine();
00147 else if (recvbuff && recvbuff->dlen) return recvbuff->data;
00148 return 0;
00149 }
00150
00151
00152
00153
00154
00155 char *XrdNetLink::GetToken(char **rest)
00156 {
00157 if (Stream) return Stream->GetToken(rest);
00158 else if (Bucket) return Bucket->GetToken(rest);
00159 return 0;
00160 }
00161
00162 char *XrdNetLink::GetToken()
00163 {
00164 if (Stream) return Stream->GetToken();
00165 else if (Bucket) return Bucket->GetToken();
00166 return 0;
00167 }
00168
00169
00170
00171
00172
00173 int XrdNetLink::LastError()
00174 {
00175 return (Stream ? Stream->LastError() : 0);
00176 }
00177
00178
00179
00180
00181
00182 int XrdNetLink::OK2Recv(int timeout)
00183 {
00184 struct pollfd polltab = {FD, POLLIN|POLLRDNORM, 0};
00185 int retc;
00186
00187 do {retc = poll(&polltab, 1, timeout);} while(retc < 0 && errno == EINTR);
00188
00189 return (retc == 1 && (polltab.revents & (POLLIN | POLLRDNORM)));
00190 }
00191
00192
00193
00194
00195
00196 void XrdNetLink::Recycle()
00197 {
00198
00199
00200
00201 if (numlink >= maxlink) {delete this; return;}
00202 Close();
00203
00204
00205
00206 LinkList.Lock();
00207 LinkStack.Push(&LinkLink);
00208 numlink++;
00209 LinkList.UnLock();
00210 return;
00211 }
00212
00213
00214
00215
00216
00217 int XrdNetLink::Recv(char *Buff, int Blen)
00218 {
00219 ssize_t rlen;
00220
00221
00222
00223 rdMutex.Lock();
00224 do {rlen = read(FD, Buff, Blen);} while(rlen < 0 && errno == EINTR);
00225 rdMutex.UnLock();
00226
00227 if (rlen >= 0) return int(rlen);
00228 eDest->Emsg("Link", errno, "recieve from", Lname);
00229 return -1;
00230 }
00231
00232
00233
00234
00235
00236 void XrdNetLink::RetToken()
00237 {
00238 if (Stream) Stream->RetToken();
00239 else if (Bucket) Bucket->RetToken();
00240 }
00241
00242
00243
00244
00245
00246 int XrdNetLink::Send(const char *Buff, int Blen, int tmo)
00247 {
00248 int retc;
00249
00250 if (!Blen && !(Blen = strlen(Buff))) return 0;
00251 if ('\n' != Buff[Blen-1])
00252 {const struct iovec iodata[2] = {{IOV_INIT((char *)Buff, Blen)},
00253 {IOV_INIT((char *)"\n", 1)}};
00254 return Send(iodata, 2, tmo);
00255 }
00256
00257 wrMutex.Lock();
00258
00259 if (tmo >= 0 && !OK2Send(tmo))
00260 {wrMutex.UnLock(); return -2;}
00261
00262 if (Stream)
00263 do {retc = write(FD, Buff, Blen);}
00264 while (retc < 0 && errno == EINTR);
00265 else
00266 do {retc = sendto(FD, (Sokdata_t)Buff, Blen, 0,
00267 (struct sockaddr *)&InetAddr, sizeof(InetAddr));}
00268 while (retc < 0 && errno == EINTR);
00269 if (retc < 0) return retErr(errno);
00270 wrMutex.UnLock();
00271 return 0;
00272 }
00273
00274 int XrdNetLink::Send(const void *Buff, int Blen, int tmo)
00275 {
00276 int retc;
00277
00278 wrMutex.Lock();
00279
00280 if (tmo >= 0 && !OK2Send(tmo))
00281 {wrMutex.UnLock(); return -2;}
00282
00283 if (Stream)
00284 do {retc = write(FD, Buff, Blen);}
00285 while (retc < 0 && errno == EINTR);
00286 else
00287 do {retc = sendto(FD, (Sokdata_t)Buff, Blen, 0,
00288 (struct sockaddr *)&InetAddr, sizeof(InetAddr));}
00289 while (retc < 0 && errno == EINTR);
00290 if (retc < 0) return retErr(errno);
00291 wrMutex.UnLock();
00292 return 0;
00293 }
00294
00295 int XrdNetLink::Send(const char *dest, const char *Buff, int Blen, int tmo)
00296 {
00297 int retc;
00298 struct sockaddr destip;
00299
00300 if (!Blen && !(Blen = strlen(Buff))) return 0;
00301 if ('\n' != Buff[Blen-1])
00302 {const struct iovec iodata[2] = {{IOV_INIT((char *)Buff, Blen)},
00303 {IOV_INIT((char *)"\n", 1)}};
00304 return Send(dest, iodata, 2, tmo);
00305 }
00306
00307 if (!XrdNetDNS::Host2Dest(dest, destip))
00308 {eDest->Emsg("Link", dest, "is unreachable");
00309 return -1;
00310 }
00311
00312 if (Stream)
00313 {eDest->Emsg("Link", "Unable to send msg to", dest, "on a stream socket");
00314 return -1;
00315 }
00316
00317 wrMutex.Lock();
00318
00319 if (tmo >= 0 && !OK2Send(tmo, dest))
00320 {wrMutex.UnLock(); return -2;}
00321
00322 do {retc = sendto(FD, (Sokdata_t)Buff, Blen, 0,
00323 (struct sockaddr *)&destip, sizeof(destip));}
00324 while (retc < 0 && errno == EINTR);
00325
00326 if (retc < 0) return retErr(errno, dest);
00327 wrMutex.UnLock();
00328 return 0;
00329 }
00330
00331 int XrdNetLink::Send(const struct iovec iov[], int iovcnt, int tmo)
00332 {
00333 int i, dsz, retc;
00334 char *bp;
00335
00336 wrMutex.Lock();
00337
00338 if (tmo >= 0 && !OK2Send(tmo))
00339 {wrMutex.UnLock(); return -2;}
00340
00341 if (Stream)
00342 do {retc = writev(FD, iov, iovcnt);}
00343 while (retc < 0 && errno == EINTR);
00344 else
00345 {if (!sendbuff && !(sendbuff = BuffQ->Alloc())) return retErr(ENOMEM);
00346 dsz = sendbuff->BuffSize(); bp = sendbuff->data;
00347 for (i = 0; i < iovcnt; i++)
00348 {dsz -= iov[i].iov_len;
00349 if (dsz < 0) return retErr(EMSGSIZE);
00350 memcpy((void *)bp,(const void *)iov[i].iov_base,iov[i].iov_len);
00351 bp += iov[i].iov_len;
00352 }
00353 do {retc = sendto(FD, (Sokdata_t)sendbuff->data, (int)(bp-sendbuff->data), 0,
00354 (struct sockaddr *)&InetAddr, sizeof(InetAddr));}
00355 while (retc < 0 && errno == EINTR);
00356 }
00357
00358 if (retc < 0) return retErr(errno);
00359 wrMutex.UnLock();
00360 return 0;
00361 }
00362
00363 int XrdNetLink::Send(const char *dest,const struct iovec iov[],int iovcnt,int tmo)
00364 {
00365 int i, dsz, retc;
00366 char *bp;
00367 struct sockaddr destip;
00368
00369 if (!XrdNetDNS::Host2Dest(dest, destip))
00370 {eDest->Emsg("Link", dest, "is unreachable");
00371 return -1;
00372 }
00373
00374 if (Stream)
00375 {eDest->Emsg("Link", "Unable to send msg to", dest, "on a stream socket");
00376 return -1;
00377 }
00378
00379 wrMutex.Lock();
00380
00381 if (tmo >= 0 && !OK2Send(tmo, dest))
00382 {wrMutex.UnLock(); return -2;}
00383
00384 if (!sendbuff && !(sendbuff = BuffQ->Alloc())) return retErr(ENOMEM);
00385 dsz = sendbuff->BuffSize(); bp = sendbuff->data;
00386 for (i = 0; i < iovcnt; i++)
00387 {dsz -= iov[i].iov_len;
00388 if (dsz < 0) return retErr(EMSGSIZE);
00389 memcpy((void *)bp,(const void *)iov[i].iov_base,iov[i].iov_len);
00390 bp += iov[i].iov_len;
00391 }
00392 do {retc = sendto(FD, (Sokdata_t)sendbuff->data, (int)(bp-sendbuff->data), 0,
00393 &destip, sizeof(destip));}
00394 while (retc < 0 && errno == EINTR);
00395
00396 if (retc < 0) return retErr(errno, dest);
00397 wrMutex.UnLock();
00398 return 0;
00399 }
00400
00401
00402
00403
00404
00405 void XrdNetLink::Set(int maxl)
00406 {
00407
00408
00409
00410 LinkList.Lock();
00411 maxlink = maxl;
00412 LinkList.UnLock();
00413 return;
00414 }
00415
00416
00417
00418
00419
00420 void XrdNetLink::SetOpts(int opts)
00421 {
00422
00423
00424
00425 if (opts & XRDNETLINK_NOBLOCK) fcntl(FD, F_SETFL, O_NONBLOCK);
00426 }
00427
00428
00429
00430
00431
00432
00433
00434
00435 int XrdNetLink::OK2Send(int timeout, const char *dest)
00436 {
00437 struct pollfd polltab = {FD, POLLOUT|POLLWRNORM, 0};
00438 int retc;
00439
00440 do {retc = poll(&polltab, 1, timeout);} while(retc < 0 && errno == EINTR);
00441
00442 if (retc == 0 || !(polltab.revents & (POLLOUT | POLLWRNORM)))
00443 eDest->Emsg("Link",(dest ? dest : Lname), "is blocked.");
00444 else if (retc < 0)
00445 eDest->Emsg("Link",errno,"poll",(dest ? dest : Lname));
00446 else return 1;
00447 return 0;
00448 }
00449
00450
00451
00452
00453
00454 int XrdNetLink::retErr(int ecode, const char *dest)
00455 {
00456 wrMutex.UnLock();
00457 eDest->Emsg("Link", ecode, "send to", (dest ? dest : Lname));
00458 return (EWOULDBLOCK == ecode || EAGAIN == ecode ? -2 : -1);
00459 }