XrdNetLink.cc

Go to the documentation of this file.
00001 /******************************************************************************/
00002 /*                                                                            */
00003 /*                         X r d N e t L i n k . c c                          */
00004 /*                                                                            */
00005 /* (c) 2004 by the Board of Trustees of the Leland Stanford, Jr., University  */
00006 /*                            All Rights Reserved                             */
00007 /*   Produced by Andrew Hanushevsky for Stanford University under contract    */
00008 /*              DE-AC02-76-SFO0515 with the Department of Energy              */
00009 /******************************************************************************/
00010 
00011 //          $Id: XrdNetLink.cc 35287 2010-09-14 21:19:35Z ganis $
00012 
00013 const char *XrdNetLinkCVSID = "$Id: XrdNetLink.cc 35287 2010-09-14 21:19:35Z ganis $";
00014   
00015 #include <fcntl.h>
00016 #ifndef WIN32
00017 #include <poll.h>
00018 #include <string.h>
00019 #include <sys/types.h>
00020 #include <sys/socket.h>
00021 #else
00022 #include "XrdSys/XrdWin32.hh"
00023 #endif
00024 
00025 #include "XrdNet/XrdNet.hh"
00026 #include "XrdNet/XrdNetBuffer.hh"
00027 #include "XrdNet/XrdNetDNS.hh"
00028 #include "XrdNet/XrdNetLink.hh"
00029 #include "XrdNet/XrdNetPeer.hh"
00030 #include "XrdSys/XrdSysError.hh"
00031 #include "XrdSys/XrdSysPlatform.hh"
00032 #include "XrdOuc/XrdOucStream.hh"
00033 #include "XrdOuc/XrdOucTokenizer.hh"
00034  
00035 /******************************************************************************/
00036 /*                 S t a t i c   I n i t i a l i z a t i o n                  */
00037 /******************************************************************************/
00038 
00039 XrdSysMutex             XrdNetLink::LinkList;
00040 XrdOucStack<XrdNetLink> XrdNetLink::LinkStack;
00041 int                     XrdNetLink::maxlink = 16;
00042 int                     XrdNetLink::numlink = 0;
00043 int                     XrdNetLink::devNull = open("/dev/null", O_RDONLY);
00044   
00045 /******************************************************************************/
00046 /*                                 A l l o c                                  */
00047 /******************************************************************************/
00048   
00049 XrdNetLink *XrdNetLink::Alloc(XrdSysError *erp, XrdNet *Net, XrdNetPeer &Peer,
00050                               XrdNetBufferQ *bq, int opts)
00051 {
00052    XrdNetLink *lp;
00053 
00054 // Lock the data area
00055 //
00056    LinkList.Lock();
00057    if ((lp = LinkStack.Pop())) numlink--;
00058    LinkList.UnLock();
00059 
00060 // Either return a new link or an old one
00061 //
00062    if (!lp) lp = new XrdNetLink(erp, bq);
00063       else if (bq != lp->BuffQ)
00064               {if (lp->recvbuff) {lp->recvbuff->Recycle(); lp->recvbuff = 0;}
00065                if (lp->sendbuff) {lp->sendbuff->Recycle(); lp->sendbuff = 0;}
00066                lp->BuffQ = bq;
00067               }
00068 
00069 // Unlock the data area return if no link was allocated
00070 //
00071    if (!lp) return lp;
00072    lp->noclose = (opts & XRDNETLINK_NOCLOSE);
00073    lp->isReset = 0;
00074 
00075 // Set the buffer pointer for this link
00076 //
00077   if (Peer.InetBuff) 
00078      { lp->recvbuff = Peer.InetBuff;
00079        if (!(lp->Bucket = new XrdOucTokenizer(Peer.InetBuff->data)))
00080           {lp->Recycle(); lp = 0;}
00081      } else if (!(opts & XRDNETLINK_NOSTREAM))
00082                {if (!(lp->Stream = new XrdOucStream(erp)))
00083                    {lp->Recycle(); lp = 0;}
00084                    else lp->Stream->Attach(Peer.fd);
00085                }
00086 
00087 // Establish the address and connection type of this link
00088 //
00089    if (lp) 
00090       {memcpy((void *)&(lp->InetAddr), (const void *)&Peer.InetAddr,
00091                                        sizeof(Peer.InetAddr));
00092        if (Peer.InetName) lp->Lname = strdup(Peer.InetName);
00093           else lp->Lname = XrdNetDNS::getHostName(Peer.InetAddr);
00094        lp->Sname = strdup(lp->Lname);
00095        Net->Trim(lp->Sname);
00096        lp->FD = Peer.fd;
00097        }
00098 
00099 // Return the link
00100 //
00101    return lp;
00102 }
00103   
00104 /******************************************************************************/
00105 /*                                 C l o s e                                  */
00106 /******************************************************************************/
00107   
00108 int XrdNetLink::Close(int defer)
00109 {
00110 
00111 // Make sure no output activity is occuring. Sync on input only if this is not
00112 // a deferred close. Otherwise, we will likely deadlock.
00113 //
00114    if(!defer) rdMutex.Lock();
00115               wrMutex.Lock();
00116 
00117 // If close is not to be defered until a recycle, then delete appendages and
00118 // close the file descriptor unless it is being shared. Otherwise, substitute
00119 // '/dev/null' for the file descriptor target to scuttle future reads & writes.
00120 //
00121     if (!defer)
00122        {if (Stream)   {Stream->Detach(); delete Stream; Stream = 0;}
00123         if (Bucket)   {delete Bucket; Bucket = 0;}
00124         if (recvbuff) {recvbuff->Recycle(); recvbuff = 0;}
00125         if (sendbuff) {sendbuff->Recycle(); sendbuff = 0;}
00126         if (Lname)    {free(Lname); Lname = 0;}
00127         if (FD >= 0 && !noclose) close(FD);
00128         FD = -1;
00129        } else if (FD >= 0 && !isReset)
00130                  {dup2(devNull, FD); isReset = 1;}
00131 
00132 // All done
00133 //
00134                 wrMutex.UnLock();
00135     if (!defer) rdMutex.UnLock();
00136     return 0;
00137 }
00138 
00139 /******************************************************************************/
00140 /*                               G e t L i n e                                */
00141 /******************************************************************************/
00142   
00143 char *XrdNetLink::GetLine()
00144 {
00145      if (Stream) return Stream->GetLine();
00146         else if (Bucket) return Bucket->GetLine();
00147                 else if (recvbuff && recvbuff->dlen) return recvbuff->data;
00148      return 0;
00149 }
00150 
00151 /******************************************************************************/
00152 /*                              G e t T o k e n                               */
00153 /******************************************************************************/
00154   
00155 char *XrdNetLink::GetToken(char **rest)
00156 {
00157      if (Stream)   return Stream->GetToken(rest);
00158         else if (Bucket) return Bucket->GetToken(rest);
00159      return 0;
00160 }
00161   
00162 char *XrdNetLink::GetToken()
00163 {
00164      if (Stream)   return Stream->GetToken();
00165         else if (Bucket) return Bucket->GetToken();
00166      return 0;
00167 }
00168 
00169 /******************************************************************************/
00170 /*                             L a s t E r r o r                              */
00171 /******************************************************************************/
00172   
00173 int XrdNetLink::LastError()
00174 {
00175     return (Stream ? Stream->LastError() : 0);
00176 }
00177 
00178 /******************************************************************************/
00179 /*                               O K 2 R e c v                                */
00180 /******************************************************************************/
00181   
00182 int XrdNetLink::OK2Recv(int timeout)
00183 {
00184    struct pollfd polltab = {FD, POLLIN|POLLRDNORM, 0};
00185    int retc;
00186 
00187    do {retc = poll(&polltab, 1, timeout);} while(retc < 0 && errno == EINTR);
00188 
00189    return (retc == 1 && (polltab.revents & (POLLIN | POLLRDNORM)));
00190 }
00191 
00192 /******************************************************************************/
00193 /*                               R e c y c l e                                */
00194 /******************************************************************************/
00195 
00196 void XrdNetLink::Recycle()
00197 {
00198 
00199 // Check if we have enough objects, if so, delete ourselves and return
00200 //
00201    if (numlink >= maxlink) {delete this; return;}
00202    Close();
00203 
00204 // Add the link to the recycle list
00205 //
00206    LinkList.Lock();
00207    LinkStack.Push(&LinkLink);
00208    numlink++;
00209    LinkList.UnLock();
00210    return;
00211 }
00212 
00213 /******************************************************************************/
00214 /*                                  R e c v                                   */
00215 /******************************************************************************/
00216 
00217 int XrdNetLink::Recv(char *Buff, int Blen)
00218 {
00219    ssize_t rlen;
00220 
00221 // Note that we will read only as much as is queued.
00222 //
00223    rdMutex.Lock();
00224    do {rlen = read(FD, Buff, Blen);} while(rlen < 0 && errno == EINTR);
00225    rdMutex.UnLock();
00226 
00227    if (rlen >= 0) return int(rlen);
00228    eDest->Emsg("Link", errno, "recieve from", Lname);
00229    return -1;
00230 }
00231 
00232 /******************************************************************************/
00233 /*                              R e t T o k e n                               */
00234 /******************************************************************************/
00235   
00236 void XrdNetLink::RetToken()
00237 {
00238      if (Stream) Stream->RetToken();
00239         else if (Bucket) Bucket->RetToken();
00240 }
00241 
00242 /******************************************************************************/
00243 /*                                  S e n d                                   */
00244 /******************************************************************************/
00245 
00246 int XrdNetLink::Send(const char *Buff, int Blen, int tmo)
00247 {
00248    int retc;
00249 
00250    if (!Blen && !(Blen = strlen(Buff))) return 0;
00251    if ('\n' != Buff[Blen-1])
00252       {const struct iovec iodata[2] = {{IOV_INIT((char *)Buff, Blen)},
00253                                        {IOV_INIT((char *)"\n", 1)}};
00254        return Send(iodata, 2, tmo);
00255       }
00256 
00257    wrMutex.Lock();
00258 
00259    if (tmo >= 0 && !OK2Send(tmo))
00260       {wrMutex.UnLock(); return -2;}
00261 
00262    if (Stream)
00263       do {retc = write(FD, Buff, Blen);}
00264          while (retc < 0 && errno == EINTR);
00265       else
00266       do {retc = sendto(FD, (Sokdata_t)Buff, Blen, 0,
00267                        (struct sockaddr *)&InetAddr, sizeof(InetAddr));}
00268          while (retc < 0 && errno == EINTR);
00269    if (retc < 0) return retErr(errno);
00270    wrMutex.UnLock();
00271    return 0;
00272 }
00273 
00274 int XrdNetLink::Send(const void *Buff, int Blen, int tmo)
00275 {
00276    int retc;
00277 
00278    wrMutex.Lock();
00279 
00280    if (tmo >= 0 && !OK2Send(tmo))
00281       {wrMutex.UnLock(); return -2;}
00282 
00283    if (Stream)
00284       do {retc = write(FD, Buff, Blen);}
00285          while (retc < 0 && errno == EINTR);
00286       else
00287       do {retc = sendto(FD, (Sokdata_t)Buff, Blen, 0,
00288                        (struct sockaddr *)&InetAddr, sizeof(InetAddr));}
00289          while (retc < 0 && errno == EINTR);
00290    if (retc < 0) return retErr(errno);
00291    wrMutex.UnLock();
00292    return 0; 
00293 }
00294   
00295 int XrdNetLink::Send(const char *dest, const char *Buff, int Blen, int tmo)
00296 {
00297    int retc;
00298    struct sockaddr destip;
00299 
00300    if (!Blen && !(Blen = strlen(Buff))) return 0;
00301    if ('\n' != Buff[Blen-1])
00302       {const struct iovec iodata[2] = {{IOV_INIT((char *)Buff, Blen)},
00303                                        {IOV_INIT((char *)"\n", 1)}};
00304        return Send(dest, iodata, 2, tmo);
00305       }
00306 
00307    if (!XrdNetDNS::Host2Dest(dest, destip))
00308       {eDest->Emsg("Link", dest, "is unreachable");
00309        return -1;
00310       }
00311 
00312    if (Stream)
00313       {eDest->Emsg("Link", "Unable to send msg to", dest, "on a stream socket");
00314        return -1;
00315       }
00316 
00317    wrMutex.Lock();
00318 
00319    if (tmo >= 0 && !OK2Send(tmo, dest))
00320       {wrMutex.UnLock(); return -2;}
00321 
00322    do {retc = sendto(FD, (Sokdata_t)Buff, Blen, 0,
00323                     (struct sockaddr *)&destip, sizeof(destip));}
00324        while (retc < 0 && errno == EINTR);
00325 
00326    if (retc < 0) return retErr(errno, dest);
00327    wrMutex.UnLock();
00328    return 0;
00329 }
00330 
00331 int XrdNetLink::Send(const struct iovec iov[], int iovcnt, int tmo)
00332 {
00333    int i, dsz, retc;
00334    char *bp;
00335 
00336    wrMutex.Lock();
00337 
00338    if (tmo >= 0 && !OK2Send(tmo))
00339       {wrMutex.UnLock(); return -2;}
00340 
00341    if (Stream)
00342       do {retc = writev(FD, iov, iovcnt);}
00343          while (retc < 0 && errno == EINTR);
00344       else
00345       {if (!sendbuff && !(sendbuff = BuffQ->Alloc())) return retErr(ENOMEM);
00346        dsz = sendbuff->BuffSize(); bp = sendbuff->data;
00347        for (i = 0; i < iovcnt; i++)
00348            {dsz -= iov[i].iov_len;
00349             if (dsz < 0) return retErr(EMSGSIZE);
00350             memcpy((void *)bp,(const void *)iov[i].iov_base,iov[i].iov_len);
00351             bp += iov[i].iov_len;
00352            }
00353        do {retc = sendto(FD, (Sokdata_t)sendbuff->data, (int)(bp-sendbuff->data), 0,
00354                        (struct sockaddr *)&InetAddr, sizeof(InetAddr));}
00355            while (retc < 0 && errno == EINTR);
00356        }
00357 
00358    if (retc < 0) return retErr(errno);
00359    wrMutex.UnLock();
00360    return 0;
00361 }
00362 
00363 int XrdNetLink::Send(const char *dest,const struct iovec iov[],int iovcnt,int tmo)
00364 {
00365    int i, dsz, retc;
00366    char *bp;
00367    struct sockaddr destip;
00368 
00369    if (!XrdNetDNS::Host2Dest(dest, destip))
00370       {eDest->Emsg("Link", dest, "is unreachable");
00371        return -1;
00372       }
00373 
00374    if (Stream)
00375       {eDest->Emsg("Link", "Unable to send msg to", dest, "on a stream socket");
00376        return -1;
00377       }
00378 
00379    wrMutex.Lock();
00380 
00381    if (tmo >= 0 && !OK2Send(tmo, dest))
00382       {wrMutex.UnLock(); return -2;}
00383 
00384    if (!sendbuff && !(sendbuff = BuffQ->Alloc())) return retErr(ENOMEM);
00385    dsz = sendbuff->BuffSize(); bp = sendbuff->data;
00386    for (i = 0; i < iovcnt; i++)
00387        {dsz -= iov[i].iov_len;
00388         if (dsz < 0) return retErr(EMSGSIZE);
00389         memcpy((void *)bp,(const void *)iov[i].iov_base,iov[i].iov_len);
00390         bp += iov[i].iov_len;
00391        }
00392    do {retc = sendto(FD, (Sokdata_t)sendbuff->data, (int)(bp-sendbuff->data), 0,
00393                      &destip, sizeof(destip));}
00394       while (retc < 0 && errno == EINTR);
00395 
00396    if (retc < 0) return retErr(errno, dest);
00397    wrMutex.UnLock();
00398    return 0;
00399 }
00400 
00401 /******************************************************************************/
00402 /*                                   S e t                                    */
00403 /******************************************************************************/
00404   
00405 void XrdNetLink::Set(int maxl)
00406 {
00407 
00408 // Lock the data area, set max links, unlock and return
00409 //
00410    LinkList.Lock();
00411    maxlink = maxl;
00412    LinkList.UnLock();
00413    return;
00414 }
00415  
00416 /******************************************************************************/
00417 /*                               S e t O p t s                                */
00418 /******************************************************************************/
00419   
00420 void XrdNetLink::SetOpts(int opts)
00421 {
00422 
00423 // Set options we care about
00424 //
00425    if (opts & XRDNETLINK_NOBLOCK) fcntl(FD, F_SETFL, O_NONBLOCK);
00426 }
00427   
00428 /******************************************************************************/
00429 /*                       P r i v a t e   M e t h o d s                        */
00430 /******************************************************************************/
00431 /******************************************************************************/
00432 /*                               O K 2 S e n d                                */
00433 /******************************************************************************/
00434   
00435 int XrdNetLink::OK2Send(int timeout, const char *dest)
00436 {
00437    struct pollfd polltab = {FD, POLLOUT|POLLWRNORM, 0};
00438    int retc;
00439 
00440    do {retc = poll(&polltab, 1, timeout);} while(retc < 0 && errno == EINTR);
00441 
00442    if (retc == 0 || !(polltab.revents & (POLLOUT | POLLWRNORM)))
00443       eDest->Emsg("Link",(dest ? dest : Lname), "is blocked.");
00444       else if (retc < 0)
00445               eDest->Emsg("Link",errno,"poll",(dest ? dest : Lname));
00446               else return 1;
00447    return 0;
00448 }
00449   
00450 /******************************************************************************/
00451 /*                                r e t E r r                                 */
00452 /******************************************************************************/
00453   
00454 int XrdNetLink::retErr(int ecode, const char *dest)
00455 {
00456    wrMutex.UnLock();
00457    eDest->Emsg("Link", ecode, "send to", (dest ? dest : Lname));
00458    return (EWOULDBLOCK == ecode || EAGAIN == ecode ? -2 : -1);
00459 }

Generated on Tue Jul 5 14:46:43 2011 for ROOT_528-00b_version by  doxygen 1.5.1