XrdLink.cc

Go to the documentation of this file.
00001 /******************************************************************************/
00002 /*                                                                            */
00003 /*                            X r d L i n k . c c                             */
00004 /*                                                                            */
00005 /* (c) 2004 by the Board of Trustees of the Leland Stanford, Jr., University  */
00006 /*       All Rights Reserved. See XrdInfo.cc for complete License Terms       */
00007 /*   Produced by Andrew Hanushevsky for Stanford University under contract    */
00008 /*              DE-AC03-76-SFO0515 with the Department of Energy              */
00009 /******************************************************************************/
00010   
00011 //           $Id: XrdLink.cc 38011 2011-02-08 18:35:57Z ganis $
00012 
// Version-control identification string for this source file. It is a
// non-static global, so the revision tag is emitted into the object file
// (presumably so it can be located with tools such as ident/strings).
const char *XrdLinkCVSID = "$Id: XrdLink.cc 38011 2011-02-08 18:35:57Z ganis $";
00014 
#include <poll.h>
#include <signal.h>
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/uio.h>

#include <new>
00022 
00023 #ifdef __linux__
00024 #include <netinet/tcp.h>
00025 #if !defined(TCP_CORK)
00026 #undef HAVE_SENDFILE
00027 #endif
00028 #endif
00029 
00030 #ifdef HAVE_SENDFILE
00031 
00032 #ifndef __macos__
00033 #include <sys/sendfile.h>
00034 #endif
00035 
00036 #endif
00037 
00038 #include "XrdNet/XrdNetDNS.hh"
00039 #include "XrdNet/XrdNetPeer.hh"
00040 #include "XrdSys/XrdSysError.hh"
00041 #include "XrdSys/XrdSysPlatform.hh"
00042 
00043 #include "Xrd/XrdBuffer.hh"
00044 #include "Xrd/XrdLink.hh"
00045 #include "Xrd/XrdInet.hh"
00046 #include "Xrd/XrdPoll.hh"
00047 #include "Xrd/XrdScheduler.hh"
00048 #define  TRACELINK this
00049 #include "Xrd/XrdTrace.hh"
00050  
00051 /******************************************************************************/
00052 /*                         L o c a l   C l a s s e s                          */
00053 /******************************************************************************/
00054 /******************************************************************************/
00055 /*                    C l a s s   x r d _ L i n k S c a n                     */
00056 /******************************************************************************/
00057   
00058 class XrdLinkScan : XrdJob
00059 {
00060 public:
00061 
00062 void          DoIt() {idleScan();}
00063 
00064               XrdLinkScan(int im, int it, const char *lt="idle link scan") :
00065                                            XrdJob(lt)
00066                           {idleCheck = im; idleTicks = it;}
00067              ~XrdLinkScan() {}
00068 
00069 private:
00070 
00071              void   idleScan();
00072 
00073              int    idleCheck;
00074              int    idleTicks;
00075 
00076 static const char *TraceID;
00077 };
00078   
00079 /******************************************************************************/
00080 /*                               G l o b a l s                                */
00081 /******************************************************************************/
00082   
// Server-wide objects defined in other translation units.
extern XrdSysError     XrdLog;

extern XrdScheduler    XrdSched;

extern XrdInet        *XrdNetTCP;
extern XrdOucTrace     XrdTrace;

// sfOK tells callers whether the sendfile() path of Send(const sfVec*, int)
// is available: 1 only when HAVE_SENDFILE survived the checks at the top of
// this file. The Linux send path also clears it if the socket can't be corked.
#if defined(HAVE_SENDFILE)
       int             XrdLink::sfOK = 1;
#else
       int             XrdLink::sfOK = 0;
#endif

// Link table. LinkTab[fd] points to the XrdLink object for descriptor fd and
// LinkBat[fd] holds its XRDLINK_* state. LinkAlloc is how many link objects
// Alloc() creates at once. LTLast is the highest slot marked in use (raised
// in Alloc(), lowered in Close()). Those functions access the table under
// LTMutex.
       XrdLink       **XrdLink::LinkTab;
       char           *XrdLink::LinkBat;
       unsigned int    XrdLink::LinkAlloc;
       int             XrdLink::LTLast = -1;
       XrdSysMutex     XrdLink::LTMutex;

       const char     *XrdLink::TraceID = "Link";

// Aggregate link statistics; Alloc() updates the connection counters while
// holding statsMutex.
       long long       XrdLink::LinkBytesIn   = 0;
       long long       XrdLink::LinkBytesOut  = 0;
       long long       XrdLink::LinkConTime   = 0;
       long long       XrdLink::LinkCountTot  = 0;
       int             XrdLink::LinkCount     = 0;
       int             XrdLink::LinkCountMax  = 0;
       int             XrdLink::LinkTimeOuts  = 0;
       int             XrdLink::LinkStalls    = 0;
       int             XrdLink::LinkSfIntr    = 0;
       XrdSysMutex     XrdLink::statsMutex;

       const char     *XrdLinkScan::TraceID = "LinkScan";
// Read-only descriptor on /dev/null. A deferred Close() dup2()s it over a
// link's socket so the fd number stays reserved after the connection is gone.
       int             XrdLink::devNull = open("/dev/null", O_RDONLY);
// NOTE(review): killWait/waitKill are not used in this excerpt; per their
// comments they order kill-vs-wait steps (units not visible -- confirm).
       short           XrdLink::killWait= 3;  // Kill then wait
       short           XrdLink::waitKill= 4;  // Wait then kill

// The following values are defined for LinkBat[]. We assume that FREE is 0
// (an unallocated slot therefore tests false).
//
#define XRDLINK_FREE 0x00
#define XRDLINK_USED 0x01
#define XRDLINK_IDLE 0x02
00125   
00126 /******************************************************************************/
00127 /*                           C o n s t r u c t o r                            */
00128 /******************************************************************************/
00129   
00130 XrdLink::XrdLink() : XrdJob("connection"), IOSemaphore(0, "link i/o")
00131 {
00132   Etext = 0;
00133   HostName = 0;
00134   Reset();
00135 }
00136 
00137 void XrdLink::Reset()
00138 {
00139   FD    = -1;
00140   if (Etext)    {free(Etext); Etext = 0;}
00141   if (HostName) {free(HostName); HostName = 0;}
00142   Uname[sizeof(Uname)-1] = '@';
00143   Uname[sizeof(Uname)-2] = '?';
00144   Lname[0] = '?';
00145   Lname[1] = '\0';
00146   ID       = &Uname[sizeof(Uname)-2];
00147   Comment  = ID;
00148   Next     = 0;
00149   Protocol = 0; 
00150   ProtoAlt = 0;
00151   conTime  = time(0);
00152   stallCnt = stallCntTot = 0;
00153   tardyCnt = tardyCntTot = 0;
00154   InUse    = 1;
00155   Poller   = 0; 
00156   PollEnt  = 0;
00157   isEnabled= 0;
00158   isIdle   = 0;
00159   inQ      = 0;
00160   tBound   = 0;
00161   BytesOut = BytesIn = BytesOutTot = BytesInTot = 0;
00162   doPost   = 0;
00163   LockReads= 0;
00164   KeepFD   = 0;
00165   udpbuff  = 0;
00166   Instance = 0;
00167   KillcvP  = 0;
00168   KillCnt  = 0;
00169 }
00170 
00171 /******************************************************************************/
00172 /*                                 A l l o c                                  */
00173 /******************************************************************************/
00174   
00175 XrdLink *XrdLink::Alloc(XrdNetPeer &Peer, int opts)
00176 {
00177    static XrdSysMutex  instMutex;
00178    static unsigned int myInstance = 1;
00179    XrdLink *lp;
00180    char *unp, buff[16];
00181    int bl;
00182 
00183 // Make sure that the link slot is available
00184 //
00185    LTMutex.Lock();
00186    if (LinkBat[Peer.fd])
00187       {LTMutex.UnLock();
00188        XrdLog.Emsg("Link", "attempt to reuse active link");
00189        return (XrdLink *)0;
00190       }
00191 
00192 // Check if we already have a link object in this slot. If not, allocate
00193 // a quantum of link objects and put them in the table.
00194 //
00195    if (!(lp = LinkTab[Peer.fd]))
00196       {unsigned int i;
00197        XrdLink **blp, *nlp = new XrdLink[LinkAlloc]();
00198        if (!nlp)
00199           {LTMutex.UnLock();
00200            XrdLog.Emsg("Link", ENOMEM, "create link"); 
00201            return (XrdLink *)0;
00202           }
00203        blp = &LinkTab[Peer.fd/LinkAlloc*LinkAlloc];
00204        for (i = 0; i < LinkAlloc; i++, blp++) *blp = &nlp[i];
00205        lp = LinkTab[Peer.fd];
00206       }
00207       else lp->Reset();
00208    LinkBat[Peer.fd] = XRDLINK_USED;
00209    if (Peer.fd > LTLast) LTLast = Peer.fd;
00210    LTMutex.UnLock();
00211 
00212 // Establish the instance number of this link. This is will prevent us from
00213 // sending asynchronous responses to the wrong client when the file descriptor
00214 // gets reused for connections to the same host.
00215 //
00216    instMutex.Lock();
00217    lp->Instance = myInstance++;
00218    instMutex.UnLock();
00219 
00220 // Establish the address and connection type of this link
00221 //
00222    memcpy((void *)&(lp->InetAddr), (const void *)&Peer.InetAddr,
00223           sizeof(struct sockaddr));
00224    if (Peer.InetName) strlcpy(lp->Lname, Peer.InetName, sizeof(lp->Lname));
00225       else {char *host = XrdNetDNS::getHostName(Peer.InetAddr);
00226             strlcpy(lp->Lname, host, sizeof(lp->Lname));
00227             free(host);
00228            }
00229    lp->HostName = strdup(lp->Lname);
00230    lp->HNlen = strlen(lp->HostName);
00231    XrdNetTCP->Trim(lp->Lname);
00232    bl = sprintf(buff, "?:%d", Peer.fd);
00233    unp = lp->Lname - bl - 1;
00234    strncpy(unp, buff, bl);
00235    lp->ID = unp;
00236    lp->FD = Peer.fd;
00237    lp->udpbuff = Peer.InetBuff;
00238    lp->Comment = (const char *)unp;
00239 
00240 // Set options as needed
00241 //
00242    lp->LockReads = (0 != (opts & XRDLINK_RDLOCK));
00243    lp->KeepFD    = (0 != (opts & XRDLINK_NOCLOSE));
00244 
00245 // Return the link
00246 //
00247    statsMutex.Lock();
00248    LinkCountTot++;
00249    if (LinkCountMax == LinkCount++) LinkCountMax = LinkCount;
00250    statsMutex.UnLock();
00251    return lp;
00252 }
00253   
00254 /******************************************************************************/
00255 /*                                  B i n d                                   */
00256 /******************************************************************************/
00257   
00258 void XrdLink::Bind(pthread_t tid)
00259 {
00260 
00261 // For bind operations, it's quite simple
00262 //
00263    TID = tid; 
00264    tBound = 1; 
00265 }
00266 
00267 /******************************************************************************/
00268 
00269 void XrdLink::Bind()
00270 {
00271 #ifdef __linux__
00272    pthread_t curTID = (tBound ? TID : XrdSysThread::ID());
00273 #endif
00274 
00275 // For unbind operations, we need to do some additional work. This is specific
00276 // to Linux. See the discussion under defered close in the Close() method.
00277 //
00278    if (tBound)
00279       {tBound = 0;
00280 #ifdef __linux__
00281        if (!XrdSysThread::Same(curTID, XrdSysThread::ID()))
00282           {XrdSysThread::Signal(curTID, SIGSTOP);
00283            XrdSysThread::Signal(curTID, SIGCONT);
00284           }
00285 #endif
00286       }
00287 }
00288 
00289 /******************************************************************************/
00290 /*                                C l i e n t                                 */
00291 /******************************************************************************/
00292   
00293 int XrdLink::Client(char *nbuf, int nbsz)
00294 {
00295    int ulen;
00296 
00297 // Generate full client name
00298 //
00299    if (nbsz <= 0) return 0;
00300    ulen = (Lname - ID);
00301    if ((ulen + HNlen) >= nbsz) ulen = 0;
00302       else {strncpy(nbuf, ID, ulen);
00303             strcpy(nbuf+ulen, HostName);
00304             ulen += HNlen;
00305            }
00306    return ulen;
00307 }
00308 
00309 /******************************************************************************/
00310 /*                                 C l o s e                                  */
00311 /******************************************************************************/
00312   
// Close the link.
//
// defer != 0 : socket-only close. This acts only when FD > 1, so an already
//   deferred link (negative FD) is left alone.
//   - FD is negated and Instance zeroed, which keeps the slot reserved.
//   - Unless KeepFD is set, the socket is shut down and /dev/null is dup2()'d
//     onto the same descriptor number, and any bound thread is then unbound.
//   - If dup2() fails, FD and Instance are restored and an error is logged.
//   Always returns 0.
// defer == 0 : full close.
//   - Waits (via Serialize()) until this is the last user.
//   - Recycles the protocol(s) and the udp buffer, and wakes any waiter on
//     KillcvP.
//   - Detaches from the poller, frees the link-table slot, and finally closes
//     the descriptor unless KeepFD is set.
//   Returns 0 or the errno value from close().
int XrdLink::Close(int defer)
{   int csec, fd, rc = 0;

// If a defer close is requested, we can close the descriptor but we must
// keep the slot number to prevent a new client getting the same fd number.
// Linux is peculiar in that any in-progress operations will remain in that
// state even after the FD is closed unless there is some activity either on
// the connection or an event occurs that causes an operation restart. We
// accomplish this in Linux by stopping and then starting the thread that may
// be bound to this link (see Bind()). Ugly, but that's what happens in Linux.
// We also add a bit of portability by issuing a shutdown() on the socket prior
// to closing it. On most platforms, this informs readers that the connection
// is gone (though not on Linux, sigh).
//
   opMutex.Lock();
   if (defer)
      {TRACEI(DEBUG, "Closing FD only");
       if (FD > 1)
          {// csec temporarily holds the old Instance so it can be restored
           fd = FD; FD = -FD; csec = Instance; Instance = 0;
           if (!KeepFD)
              {shutdown(fd, SHUT_RDWR);
               if (dup2(devNull, fd) < 0)
                  {FD = fd; Instance = csec;
                   XrdLog.Emsg("Link",errno,"close FD for",ID);
                  } else Bind();
              }
          }
       opMutex.UnLock();
       return 0;
      }

// Multiple protocols may be bound to this link. If it is in use, defer the
// actual close until the use count drops to one.
//
   while(InUse > 1)
      {opMutex.UnLock();
       TRACEI(DEBUG, "Close defered, use count=" <<InUse);
       Serialize();
       opMutex.Lock();
      }
   InUse--;
   Instance = 0;

// Add up the statistic for this link. csec is filled in by syncStats() and
// handed to the protocols' Recycle() below.
//
   syncStats(&csec);

// Clean this link up
//
   if (Protocol) {Protocol->Recycle(this, csec, Etext); Protocol = 0;}
   if (ProtoAlt) {ProtoAlt->Recycle(this, csec, Etext); ProtoAlt = 0;}
   if (udpbuff)  {udpbuff->Recycle();  udpbuff  = 0;}
   if (Etext) {free(Etext); Etext = 0;}
   InUse    = 0;

// At this point we can have no lock conflicts, so if someone is waiting for
// us to terminate let them know about it. Note that we will get the condvar
// mutex while we hold the opMutex. This is the required order! We will also
// zero out the pointer to the condvar while holding the opmutex.
//
   if (KillcvP) {KillcvP->  Lock(); KillcvP->Signal();
                 KillcvP->UnLock(); KillcvP = 0;
                }

// Remove ourselves from the poll table and then from the Link table. We may
// not hold on to the opMutex when we acquire the LTMutex. However, the link
// table needs to be cleaned up prior to actually closing the socket. So, we
// do some fancy footwork to prevent multiple closes of this link. A negative
// FD (deferred close) still names the reserved descriptor; FD == -1 means the
// link was already fully closed.
//
   fd = (FD < 0 ? -FD : FD);
   if (FD != -1)
      {if (Poller) {XrdPoll::Detach(this); Poller = 0;}
       FD = -1;
       opMutex.UnLock();
       LTMutex.Lock();
       LinkBat[fd] = XRDLINK_FREE;
       // Lower the high-water mark past any trailing free slots
       if (fd == LTLast) while(LTLast && !(LinkBat[LTLast])) LTLast--;
       LTMutex.UnLock();
      } else opMutex.UnLock();

// Close the file descriptor if it isn't being shared. Do it as the last
// thing because closes and accepts are not interlocked.
//
   if (fd >= 2) {if (KeepFD) rc = 0;
                    else rc = (close(fd) < 0 ? errno : 0);
                }
   if (rc) XrdLog.Emsg("Link", rc, "close", ID);
   return rc;
}
00402 
00403 /******************************************************************************/
00404 /*                                  D o I t                                   */
00405 /******************************************************************************/
00406  
00407 void XrdLink::DoIt()
00408 {
00409    int rc;
00410 
00411 // The Process() return code tells us what to do:
00412 // < 0 -> Stop getting requests, 
00413 //        -EINPROGRESS leave link disabled but otherwise all is well
00414 //        -n           Error, disable and close the link
00415 // = 0 -> OK, get next request, if allowed, o/w enable the link
00416 // > 0 -> Slow link, stop getting requests  and enable the link
00417 //
00418    if (Protocol)
00419       do {rc = Protocol->Process(this);} while (!rc && XrdSched.canStick());
00420       else {XrdLog.Emsg("Link", "Dispatch on closed link", ID);
00421             return;
00422            }
00423 
00424 // Either re-enable the link and cycle back waiting for a new request, leave
00425 // disabled, or terminate the connection.
00426 //
00427    if (rc >= 0) {if (Poller) Poller->Enable(this);}
00428       else if (rc != -EINPROGRESS) Close();
00429 }
00430   
00431 /******************************************************************************/
00432 /*                                  F i n d                                   */
00433 /******************************************************************************/
00434 
00435 // Warning: curr must be set to a value of 0 or less on the initial call and
00436 //          not touched therafter unless a null pointer is returned. When an
00437 //          actual link object pointer is returned, it's refcount is increased.
00438 //          The count is automatically decreased on the next call to Find().
00439 //
00440 XrdLink *XrdLink::Find(int &curr, XrdLinkMatch *who)
00441 {
00442    XrdLink *lp;
00443    const int MaxSeek = 16;
00444    unsigned int myINS;
00445    int i, seeklim = MaxSeek;
00446 
00447 // Do initialization
00448 //
00449    LTMutex.Lock();
00450    if (curr >= 0 && LinkTab[curr]) LinkTab[curr]->setRef(-1);
00451       else curr = -1;
00452 
00453 // Find next matching link. Since this may take some time, we periodically
00454 // release the LTMutex lock which drives up overhead but will still allow
00455 // other critical operations to occur.
00456 //
00457    for (i = curr+1; i <= LTLast; i++)
00458        {if ((lp = LinkTab[i]) && LinkBat[i] && lp->HostName)
00459            if (!who 
00460            ||   who->Match(lp->ID,lp->Lname-lp->ID-1,lp->HostName,lp->HNlen))
00461               {myINS = lp->Instance;
00462                LTMutex.UnLock();
00463                lp->setRef(1);
00464                curr = i;
00465                if (myINS == lp->Instance) return lp;
00466                LTMutex.Lock();
00467               }
00468         if (!seeklim--) {LTMutex.UnLock(); seeklim = MaxSeek; LTMutex.Lock();}
00469        }
00470 
00471 // Done scanning the table
00472 //
00473     LTMutex.UnLock();
00474     curr = -1;
00475     return 0;
00476 }
00477 
00478 /******************************************************************************/
00479 /*                               g e t N a m e                                */
00480 /******************************************************************************/
00481 
00482 // Warning: curr must be set to a value of 0 or less on the initial call and
00483 //          not touched therafter unless null is returned. Returns the length
00484 //          the name in nbuf.
00485 //
00486 int XrdLink::getName(int &curr, char *nbuf, int nbsz, XrdLinkMatch *who)
00487 {
00488    XrdLink *lp;
00489    const int MaxSeek = 16;
00490    int i, ulen = 0, seeklim = MaxSeek;
00491 
00492 // Find next matching link. Since this may take some time, we periodically
00493 // release the LTMutex lock which drives up overhead but will still allow
00494 // other critical operations to occur.
00495 //
00496    LTMutex.Lock();
00497    for (i = curr+1; i <= LTLast; i++)
00498        {if ((lp = LinkTab[i]) && LinkBat[i] && lp->HostName)
00499            if (!who 
00500            ||   who->Match(lp->ID,lp->Lname-lp->ID-1,lp->HostName,lp->HNlen))
00501               {ulen = lp->Client(nbuf, nbsz);
00502                LTMutex.UnLock();
00503                curr = i;
00504                return ulen;
00505               }
00506         if (!seeklim--) {LTMutex.UnLock(); seeklim = MaxSeek; LTMutex.Lock();}
00507        }
00508    LTMutex.UnLock();
00509 
00510 // Done scanning the table
00511 //
00512    curr = -1;
00513    return 0;
00514 }
00515 
00516 /******************************************************************************/
00517 /*                                  P e e k                                   */
00518 /******************************************************************************/
00519   
00520 int XrdLink::Peek(char *Buff, int Blen, int timeout)
00521 {
00522    XrdSysMutexHelper theMutex;
00523    struct pollfd polltab = {FD, POLLIN|POLLRDNORM, 0};
00524    ssize_t mlen;
00525    int retc;
00526 
00527 // Lock the read mutex if we need to, the helper will unlock it upon exit
00528 //
00529    if (LockReads) theMutex.Lock(&rdMutex);
00530 
00531 // Wait until we can actually read something
00532 //
00533    isIdle = 0;
00534    do {retc = poll(&polltab, 1, timeout);} while(retc < 0 && errno == EINTR);
00535    if (retc != 1)
00536       {if (retc == 0) return 0;
00537        return XrdLog.Emsg("Link", -errno, "poll", ID);
00538       }
00539 
00540 // Verify it is safe to read now
00541 //
00542    if (!(polltab.revents & (POLLIN|POLLRDNORM)))
00543       {XrdLog.Emsg("Link", XrdPoll::Poll2Text(polltab.revents),
00544                            "polling", ID);
00545        return -1;
00546       }
00547 
00548 // Do the peek.
00549 //
00550    do {mlen = recv(FD, Buff, Blen, MSG_PEEK);}
00551       while(mlen < 0 && errno == EINTR);
00552 
00553 // Return the result
00554 //
00555    if (mlen >= 0) return int(mlen);
00556    XrdLog.Emsg("Link", errno, "peek on", ID);
00557    return -1;
00558 }
00559   
00560 /******************************************************************************/
00561 /*                                  R e c v                                   */
00562 /******************************************************************************/
00563   
00564 int XrdLink::Recv(char *Buff, int Blen)
00565 {
00566    ssize_t rlen;
00567 
00568 // Note that we will read only as much as is queued. Use Recv() with a
00569 // timeout to receive as much data as possible.
00570 //
00571    if (LockReads) rdMutex.Lock();
00572    isIdle = 0;
00573    do {rlen = read(FD, Buff, Blen);} while(rlen < 0 && errno == EINTR);
00574    if (LockReads) rdMutex.UnLock();
00575 
00576    if (rlen >= 0) return int(rlen);
00577    if (FD >= 0) XrdLog.Emsg("Link", errno, "receive from", ID);
00578    return -1;
00579 }
00580 
00581 /******************************************************************************/
00582 
00583 int XrdLink::Recv(char *Buff, int Blen, int timeout)
00584 {
00585    XrdSysMutexHelper theMutex;
00586    struct pollfd polltab = {FD, POLLIN|POLLRDNORM, 0};
00587    ssize_t rlen, totlen = 0;
00588    int retc;
00589 
00590 // Lock the read mutex if we need to, the helper will unlock it upon exit
00591 //
00592    if (LockReads) theMutex.Lock(&rdMutex);
00593 
00594 // Wait up to timeout milliseconds for data to arrive
00595 //
00596    isIdle = 0;
00597    while(Blen > 0)
00598         {do {retc = poll(&polltab,1,timeout);} while(retc < 0 && errno == EINTR);
00599          if (retc != 1)
00600             {if (retc == 0)
00601                 {tardyCnt++;
00602                  if (totlen  && (++stallCnt & 0xff) == 1)
00603                     TRACEI(DEBUG, "read timed out");
00604                  return int(totlen);
00605                 }
00606              return (FD >= 0 ? XrdLog.Emsg("Link", -errno, "poll", ID) : -1);
00607             }
00608 
00609          // Verify it is safe to read now
00610          //
00611          if (!(polltab.revents & (POLLIN|POLLRDNORM)))
00612             {XrdLog.Emsg("Link", XrdPoll::Poll2Text(polltab.revents),
00613                                  "polling", ID);
00614              return -1;
00615             }
00616 
00617          // Read as much data as you can. Note that we will force an error
00618          // if we get a zero-length read after poll said it was OK.
00619          //
00620          do {rlen = recv(FD, Buff, Blen, 0);} while(rlen < 0 && errno == EINTR);
00621          if (rlen <= 0)
00622             {if (!rlen) return -ENOMSG;
00623              return (FD<0 ? -1 : XrdLog.Emsg("Link",-errno,"receive from",ID));
00624             }
00625          BytesIn += rlen; totlen += rlen; Blen -= rlen; Buff += rlen;
00626         }
00627 
00628    return int(totlen);
00629 }
00630 
00631 
00632 /******************************************************************************/
00633 /*                               R e c v A l l                                */
00634 /******************************************************************************/
00635   
00636 int XrdLink::RecvAll(char *Buff, int Blen, int timeout)
00637 {
00638    struct pollfd polltab = {FD, POLLIN|POLLRDNORM, 0};
00639    ssize_t rlen;
00640    int     retc;
00641 
00642 // Check if timeout specified. Notice that the timeout is the max we will
00643 // for some data. We will wait forever for all the data. Yeah, it's weird.
00644 //
00645    if (timeout >= 0)
00646       {do {retc = poll(&polltab,1,timeout);} while(retc < 0 && errno == EINTR);
00647        if (retc != 1)
00648           {if (!retc) return -ETIMEDOUT;
00649            XrdLog.Emsg("Link",errno,"poll",ID);
00650            return -1;
00651           }
00652        if (!(polltab.revents & (POLLIN|POLLRDNORM)))
00653           {XrdLog.Emsg("Link",XrdPoll::Poll2Text(polltab.revents),"polling",ID);
00654            return -1;
00655           }
00656       }
00657 
00658 // Note that we will block until we receive all he bytes.
00659 //
00660    if (LockReads) rdMutex.Lock();
00661    isIdle = 0;
00662    do {rlen = recv(FD,Buff,Blen,MSG_WAITALL);} while(rlen < 0 && errno == EINTR);
00663    if (LockReads) rdMutex.UnLock();
00664 
00665    if (int(rlen) == Blen) return Blen;
00666    if (!rlen) {TRACEI(DEBUG, "No RecvAll() data; errno=" <<errno);}
00667       else if (rlen > 0) XrdLog.Emsg("RecvAll","Premature end from", ID);
00668               else if (FD >= 0) XrdLog.Emsg("Link",errno,"recieve from",ID);
00669    return -1;
00670 }
00671 
00672 /******************************************************************************/
00673 /*                                  S e n d                                   */
00674 /******************************************************************************/
00675   
00676 int XrdLink::Send(const char *Buff, int Blen)
00677 {
00678    ssize_t retc = 0, bytesleft = Blen;
00679 
00680 // Get a lock
00681 //
00682    wrMutex.Lock();
00683    isIdle = 0;
00684 
00685 // Write the data out
00686 //
00687    while(bytesleft)
00688         {if ((retc = write(FD, Buff, bytesleft)) < 0)
00689             {if (errno == EINTR) continue;
00690                 else break;
00691             }
00692          BytesOut += retc; bytesleft -= retc; Buff += retc;
00693         }
00694 
00695 // All done
00696 //
00697    wrMutex.UnLock();
00698    if (retc >= 0) return Blen;
00699    XrdLog.Emsg("Link", errno, "send to", ID);
00700    return -1;
00701 }
00702 
00703 /******************************************************************************/
00704   
00705 int XrdLink::Send(const struct iovec *iov, int iocnt, int bytes)
00706 {
00707    ssize_t bytesleft, n, retc = 0;
00708    const char *Buff;
00709    int i;
00710 
00711 // Add up bytes if they were not given to us
00712 //
00713    if (!bytes) for (i = 0; i < iocnt; i++) bytes += iov[i].iov_len;
00714    bytesleft = static_cast<ssize_t>(bytes);
00715 
00716 // Get a lock and assume we will be successful (statistically we are)
00717 //
00718    wrMutex.Lock();
00719    isIdle = 0;
00720    BytesOut += bytes;
00721 
00722 // Write the data out. On some version of Unix (e.g., Linux) a writev() may
00723 // end at any time without writing all the bytes when directed to a socket.
00724 // So, we attempt to resume the writev() using a combination of write() and
00725 // a writev() continuation. This approach slowly converts a writev() to a
00726 // series of writes if need be. We must do this inline because we must hold
00727 // the lock until all the bytes are written or an error occurs.
00728 //
00729    while(bytesleft)
00730         {do {retc = writev(FD, iov, iocnt);} while(retc < 0 && errno == EINTR);
00731          if (retc >= bytesleft || retc < 0) break;
00732          bytesleft -= retc;
00733          while(retc >= (n = static_cast<ssize_t>(iov->iov_len)))
00734               {retc -= n; iov++; iocnt--;}
00735          Buff = (const char *)iov->iov_base + retc; n -= retc; iov++; iocnt--;
00736          while(n) {if ((retc = write(FD, Buff, n)) < 0)
00737                       {if (errno == EINTR) continue;
00738                           else break;
00739                       }
00740                    n -= retc; Buff += retc;
00741                   }
00742          if (retc < 0 || iocnt < 1) break;
00743         }
00744 
00745 // All done
00746 //
00747    wrMutex.UnLock();
00748    if (retc >= 0) return bytes;
00749    XrdLog.Emsg("Link", errno, "send to", ID);
00750    return -1;
00751 }
00752  
00753 /******************************************************************************/
00754 int XrdLink::Send(const struct sfVec *sfP, int sfN)
00755 {
00756 #if !defined(HAVE_SENDFILE)
00757    return -1;
00758 #else
00759 // Make sure we have valid vector count
00760 //
00761    if (sfN < 1 || sfN > sfMax)
00762       {XrdLog.Emsg("Link", EINVAL, "send file to", ID);
00763        return -1;
00764       }
00765 
00766 #ifdef __solaris__
00767     sendfilevec_t vecSF[sfMax], *vecSFP = vecSF;
00768     size_t xframt, totamt, bytes = 0;
00769     ssize_t retc;
00770     int i = 0;
00771 
00772 // Construct the sendfilev() vector
00773 //
00774    for (i = 0; i < sfN; sfP++, i++)
00775        {if (sfP->fdnum < 0)
00776            {vecSF[i].sfv_fd  = SFV_FD_SELF;
00777             vecSF[i].sfv_off = (off_t)sfP->buffer;
00778            } else {
00779             vecSF[i].sfv_fd  = sfP->fdnum;
00780             vecSF[i].sfv_off = sfP->offset;
00781            }
00782         vecSF[i].sfv_flag = 0;
00783         vecSF[i].sfv_len  = sfP->sendsz;
00784         bytes += sfP->sendsz;
00785        }
00786    totamt = bytes;
00787 
00788 // Lock the link, issue sendfilev(), and unlock the link. The documentation
00789 // is very spotty and inconsistent. We can only retry this operation under
00790 // very limited conditions.
00791 //
00792    wrMutex.Lock();
00793    isIdle = 0;
00794 do{retc = sendfilev(FD, vecSFP, sfN, &xframt);
00795 
00796 // Check if all went well and return if so (usual case)
00797 //
00798    if (xframt == bytes)
00799       {BytesOut += bytes;
00800        wrMutex.UnLock();
00801        return totamt;
00802       }
00803 
00804 // The only one we will recover from is EINTR. We cannot legally get EAGAIN.
00805 //
00806    if (retc < 0 && errno != EINTR) break;
00807 
00808 // Try to resume the transfer
00809 //
00810    if (xframt > 0)
00811       {BytesOut += xframt; bytes -= xframt; SfIntr++;
00812        while(xframt > 0 && sfN)
00813             {if ((ssize_t)xframt < (ssize_t)vecSFP->sfv_len)
00814                 {vecSFP->sfv_off += xframt; vecSFP->sfv_len -= xframt; break;}
00815              xframt -= vecSFP->sfv_len; vecSFP++; sfN--;
00816             }
00817       }
00818   } while(sfN > 0);
00819 
00820 // See if we can recover without destroying the connection
00821 //
00822    retc = (retc < 0 ? errno : ECANCELED);
00823    wrMutex.UnLock();
00824    XrdLog.Emsg("Link", retc, "send file to", ID);
00825    return -1;
00826 
00827 #elif defined(__linux__)
00828 
00829    static const int setON = 1, setOFF = 0;
00830    ssize_t retc = 0, bytesleft;
00831    off_t myOffset;
00832    int i, xfrbytes = 0, uncork = 1, xIntr = 0;
00833 
00834 // lock the link
00835 //
00836    wrMutex.Lock();
00837    isIdle = 0;
00838 
00839 // In linux we need to cork the socket. On permanent errors we do not uncork
00840 // the socket because it will be closed in short order.
00841 //
00842    if (setsockopt(FD, SOL_TCP, TCP_CORK, &setON, sizeof(setON)) < 0)
00843       {XrdLog.Emsg("Link", errno, "cork socket for", ID); 
00844        uncork = 0; sfOK = 0;
00845       }
00846 
00847 // Send the header first
00848 //
00849    for (i = 0; i < sfN; sfP++, i++)
00850        {if (sfP->fdnum < 0) retc = sendData(sfP->buffer, sfP->sendsz);
00851            else {myOffset = sfP->offset; bytesleft = sfP->sendsz;
00852                  while(bytesleft
00853                     && (retc=sendfile(FD,sfP->fdnum,&myOffset,bytesleft)) > 0)
00854                       {myOffset += retc; bytesleft -= retc; xIntr++;}
00855                 }
00856         if (retc <  0 && errno == EINTR) continue;
00857         if (retc <= 0) break;
00858         xfrbytes += sfP->sendsz;
00859        }
00860 
00861 // Diagnose any sendfile errors
00862 //
00863    if (retc <= 0)
00864       {if (retc == 0) errno = ECANCELED;
00865        wrMutex.UnLock();
00866        XrdLog.Emsg("Link", errno, "send file to", ID);
00867        return -1;
00868       }
00869 
00870 // Now uncork the socket
00871 //
00872    if (uncork && setsockopt(FD, SOL_TCP, TCP_CORK, &setOFF, sizeof(setOFF)) < 0)
00873       XrdLog.Emsg("Link", errno, "uncork socket for", ID);
00874 
00875 // All done
00876 //
00877    if (xIntr > sfN) SfIntr += (xIntr - sfN);
00878    BytesOut += xfrbytes;
00879    wrMutex.UnLock();
00880    return xfrbytes;
00881 #endif
00882 #endif
00883 }
00884 
00885 /******************************************************************************/
00886 /* private                      s e n d D a t a                               */
00887 /******************************************************************************/
00888   
00889 int XrdLink::sendData(const char *Buff, int Blen)
00890 {
00891    ssize_t retc = 0, bytesleft = Blen;
00892 
00893 // Write the data out
00894 //
00895    while(bytesleft)
00896         {if ((retc = write(FD, Buff, bytesleft)) < 0)
00897             {if (errno == EINTR) continue;
00898                 else break;
00899             }
00900          bytesleft -= retc; Buff += retc;
00901         }
00902 
00903 // All done
00904 //
00905    return retc;
00906 }
00907 
00908 /******************************************************************************/
00909 /*                              s e t E t e x t                               */
00910 /******************************************************************************/
00911 
00912 int XrdLink::setEtext(const char *text)
00913 {
00914      opMutex.Lock();
00915      if (Etext) free(Etext);
00916      Etext = (text ? strdup(text) : 0);
00917      opMutex.UnLock();
00918      return -1;
00919 }
00920   
00921 /******************************************************************************/
00922 /*                                 s e t I D                                  */
00923 /******************************************************************************/
00924   
00925 void XrdLink::setID(const char *userid, int procid)
00926 {
00927    char buff[sizeof(Uname)], *bp, *sp;
00928    int ulen;
00929 
00930    snprintf(buff, sizeof(buff), "%s.%d:%d", userid, procid, FD);
00931    ulen = strlen(buff);
00932    sp = buff + ulen - 1;
00933    bp = &Uname[sizeof(Uname)-1];
00934    if (ulen > (int)sizeof(Uname)) ulen = sizeof(Uname);
00935    *bp = '@'; bp--;
00936    while(ulen--) {*bp = *sp; bp--; sp--;}
00937    ID = bp+1;
00938    Comment = (const char *)ID;
00939 }
00940  
00941 /******************************************************************************/
00942 /*                                 S e t u p                                  */
00943 /******************************************************************************/
00944 
00945 int XrdLink::Setup(int maxfds, int idlewait)
00946 {
00947    int numalloc, iticks, ichk;
00948 
00949 // Make sure our static /dev/null fd is closed whn we exec
00950 //
00951    fcntl(devNull, F_SETFD, FD_CLOEXEC);
00952 
00953 // Compute the number of link objects we should allocate at a time. Generally,
00954 // we like to allocate 8k of them at a time but always as a power of two.
00955 //
00956    numalloc = 8192 / sizeof(XrdLink);
00957    LinkAlloc = 1;
00958    while((numalloc = numalloc/2)) LinkAlloc = LinkAlloc*2;
00959    TRACE(DEBUG, "Allocating " <<LinkAlloc <<" link objects at a time");
00960 
00961 // Create the link table
00962 //
00963    if (!(LinkTab = (XrdLink **)malloc(maxfds*sizeof(XrdLink *)+LinkAlloc)))
00964       {XrdLog.Emsg("Link", ENOMEM, "create LinkTab"); return 0;}
00965    memset((void *)LinkTab, 0, maxfds*sizeof(XrdLink *));
00966 
00967 // Create the slot status table
00968 //
00969    if (!(LinkBat = (char *)malloc(maxfds*sizeof(char)+LinkAlloc)))
00970       {XrdLog.Emsg("Link", ENOMEM, "create LinkBat"); return 0;}
00971    memset((void *)LinkBat, XRDLINK_FREE, maxfds*sizeof(char));
00972 
00973 // Create an idle connection scan job
00974 //
00975    if (idlewait)
00976       {if (!(ichk = idlewait/3)) {iticks = 1; ichk = idlewait;}
00977           else iticks = 3;
00978        XrdLinkScan *ls = new XrdLinkScan(ichk, iticks);
00979        XrdSched.Schedule((XrdJob *)ls, ichk+time(0));
00980       }
00981 
00982    return 1;
00983 }
00984   
00985 /******************************************************************************/
00986 /*                             S e r i a l i z e                              */
00987 /******************************************************************************/
00988   
00989 void XrdLink::Serialize()
00990 {
00991 
00992 // This is meant to make sure that no protocol objects are refering to this
00993 // link so that we can safely run in psuedo single thread mode for critical
00994 // functions.
00995 //
00996    opMutex.Lock();
00997    if (InUse <= 1) opMutex.UnLock();
00998       else {doPost++;
00999             opMutex.UnLock();
01000             TRACEI(DEBUG, "Waiting for link serialization; use=" <<InUse);
01001             IOSemaphore.Wait();
01002            }
01003 }
01004 
01005 /******************************************************************************/
01006 /*                                s e t K W T                                 */
01007 /******************************************************************************/
01008   
01009 void XrdLink::setKWT(int wkSec, int kwSec)
01010 {
01011    if (wkSec > 0) waitKill = static_cast<short>(wkSec);
01012    if (kwSec > 0) killWait = static_cast<short>(kwSec);
01013 }
01014   
01015 /******************************************************************************/
01016 /*                           s e t P r o t o c o l                            */
01017 /******************************************************************************/
01018   
01019 XrdProtocol *XrdLink::setProtocol(XrdProtocol *pp)
01020 {
01021 
01022 // Set new protocol.
01023 //
01024    opMutex.Lock();
01025    XrdProtocol *op = Protocol;
01026    Protocol = pp; 
01027    opMutex.UnLock();
01028    return op;
01029 }
01030 
01031 /******************************************************************************/
01032 /*                                s e t R e f                                 */
01033 /******************************************************************************/
01034   
01035 void XrdLink::setRef(int use)
01036 {
01037    opMutex.Lock();
01038    TRACEI(DEBUG,"Setting ref to " <<InUse <<'+' <<use <<" post=" <<doPost);
01039    InUse += use;
01040 
01041          if (!InUse)
01042             {InUse = 1; opMutex.UnLock();
01043              XrdLog.Emsg("Link", "Zero use count for", ID);
01044             }
01045     else if (InUse == 1 && doPost)
01046             {doPost--;
01047              IOSemaphore.Post();
01048              TRACEI(CONN, "setRef posted link");
01049              opMutex.UnLock();
01050             }
01051     else if (InUse < 0)
01052             {InUse = 1;
01053              opMutex.UnLock();
01054              XrdLog.Emsg("Link", "Negative use count for", ID);
01055             }
01056     else opMutex.UnLock();
01057 }
01058  
01059 /******************************************************************************/
01060 /*                                 S t a t s                                  */
01061 /******************************************************************************/
01062 
01063 int XrdLink::Stats(char *buff, int blen, int do_sync)
01064 {
01065    static const char statfmt[] = "<stats id=\"link\"><num>%d</num>"
01066           "<maxn>%d</maxn><tot>%lld</tot><in>%lld</in><out>%lld</out>"
01067           "<ctime>%lld</ctime><tmo>%d</tmo><stall>%d</stall>"
01068           "<sfps>%d</sfps></stats>";
01069    int i, myLTLast;
01070 
01071 // Check if actual length wanted
01072 //
01073    if (!buff) return sizeof(statfmt)+17*6;
01074 
01075 // We must synchronize the statistical counters
01076 //
01077    if (do_sync)
01078       {LTMutex.Lock(); myLTLast = LTLast; LTMutex.UnLock();
01079        for (i = 0; i <= myLTLast; i++) 
01080            if (LinkBat[i] == XRDLINK_USED && LinkTab[i]) 
01081               LinkTab[i]->syncStats();
01082       }
01083 
01084 // Obtain lock on the stats area and format it
01085 //
01086    statsMutex.Lock();
01087    i = snprintf(buff, blen, statfmt, LinkCount,   LinkCountMax, LinkCountTot,
01088                                      LinkBytesIn, LinkBytesOut, LinkConTime,
01089                                      LinkTimeOuts,LinkStalls,   LinkSfIntr);
01090    statsMutex.UnLock();
01091    return i;
01092 }
01093   
01094 /******************************************************************************/
01095 /*                             s y n c S t a t s                              */
01096 /******************************************************************************/
01097   
01098 void XrdLink::syncStats(int *ctime)
01099 {
01100 
01101 // If this is dynamic, get the opMutex lock
01102 //
01103    if (!ctime) opMutex.Lock();
01104 
01105 // Either the caller has the opMutex or this is called out of close. In either
01106 // case, we need to get the read
01107 //
01108    statsMutex.Lock();
01109    rdMutex.Lock();
01110    LinkBytesIn  += BytesIn; BytesInTot   += BytesIn;   BytesIn = 0;
01111    LinkTimeOuts += tardyCnt; tardyCntTot += tardyCnt; tardyCnt = 0;
01112    LinkStalls   += stallCnt; stallCntTot += stallCnt; stallCnt = 0;
01113    rdMutex.UnLock();
01114    wrMutex.Lock();
01115    LinkBytesOut += BytesOut; BytesOutTot += BytesOut;BytesOut = 0;
01116    LinkSfIntr   += SfIntr;   SfIntr = 0;
01117    wrMutex.UnLock();
01118    if (ctime)
01119       {*ctime = time(0) - conTime;
01120        LinkConTime += *ctime;
01121        if (!(LinkCount--)) LinkCount = 0;
01122       }
01123    statsMutex.UnLock();
01124 
01125 // Make sure the protocol updates it's statistics as well
01126 //
01127    if (Protocol) Protocol->Stats(0, 0, 1);
01128 
01129 // Clear our local counters
01130 //
01131    if (!ctime) opMutex.UnLock();
01132 }
01133  
01134 /******************************************************************************/
01135 /*                             T e r m i n a t e                              */
01136 /******************************************************************************/
01137   
01138 int XrdLink::Terminate(const XrdLink *owner, int fdnum, unsigned int inst)
01139 {
01140    XrdSysCondVar killDone(0);
01141    XrdLink *lp;
01142    char buff[1024], *cp;
01143    int wTime, didKW = KillCnt & KillXwt;
01144 
01145 // Find the correspodning link
01146 //
01147    KillCnt = KillCnt & KillMsk;
01148    if (!(lp = fd2link(fdnum, inst))) return (didKW ? -EPIPE : -ESRCH);
01149 
01150 // If this is self termination, then indicate that to the caller
01151 //
01152    if (lp == owner) return 0;
01153 
01154 // Serialize the target link
01155 //
01156    lp->Serialize();
01157    lp->opMutex.Lock();
01158 
01159 // If this link is now dead, simply ignore the request. Typically, this
01160 // indicates a race condition that the server won.
01161 //
01162    if ( lp->FD != fdnum ||   lp->Instance != inst
01163    || !(lp->Poller)     || !(lp->Protocol))
01164       {lp->opMutex.UnLock();
01165        return -EPIPE;
01166       }
01167 
01168 // Verify that the owner of this link is making the request
01169 //
01170    if (owner 
01171    && (!(cp = index(owner->ID, ':')) 
01172       || strncmp(lp->ID, owner->ID, cp-(owner->ID))
01173       || strcmp(owner->Lname, lp->Lname)))
01174       {lp->opMutex.UnLock();
01175        return -EACCES;
01176       }
01177 
01178 // Check if we have too many tries here
01179 //
01180    if (lp->KillCnt > KillMax)
01181       {lp->opMutex.UnLock();
01182        return -ETIME;
01183       }
01184    wTime = lp->KillCnt++;
01185 
01186 // Make sure we can disable this link. Of not, then force the caller to wait
01187 // a tad more than the read timeout interval.
01188 //
01189    if (!(lp->isEnabled) || lp->InUse > 1 || lp->KillcvP)
01190       {wTime = wTime*2+waitKill;
01191        KillCnt |= KillXwt;
01192        lp->opMutex.UnLock();
01193        return (wTime > 60 ? 60: wTime);
01194       }
01195 
01196 // Set the pointer to our condvar. We are holding the opMutex to prevent a race.
01197 //
01198    lp->KillcvP = &killDone;
01199    killDone.Lock();
01200 
01201 // We can now disable the link and schedule a close
01202 //
01203    snprintf(buff, sizeof(buff), "ended by %s", ID);
01204    buff[sizeof(buff)-1] = '\0';
01205    lp->Poller->Disable(lp, buff);
01206    lp->opMutex.UnLock();
01207 
01208 // Now wait for the link to shutdown. This avoids lock problems.
01209 //
01210    if (killDone.Wait(int(killWait))) {wTime += killWait; KillCnt |= KillXwt;}
01211       else wTime = -EPIPE;
01212    killDone.UnLock();
01213 
01214 // Reobtain the opmutex so that we can zero out the pointer the condvar pntr
01215 // This is really stupid code but because we don't have a way of associating
01216 // an arbitrary mutex with a condvar. But since this code is rarely executed
01217 // the ugliness is sort of tolerable.
01218 //
01219    lp->opMutex.Lock(); lp->KillcvP = 0; lp->opMutex.UnLock();
01220 
01221 // Do some tracing
01222 //
01223    TRACEI(DEBUG,"Terminate " << (wTime <= 0 ? "complete ":"timeout ") <<wTime);
01224    return wTime;
01225 }
01226 
01227 /******************************************************************************/
01228 /*                              i d l e S c a n                               */
01229 /******************************************************************************/
01230   
01231 #undef   TRACELINK
01232 #define  TRACELINK lp
01233 
01234 void XrdLinkScan::idleScan()
01235 {
01236    XrdLink *lp;
01237    int i, ltlast, lnum = 0, tmo = 0, tmod = 0;
01238 
01239 // Get the current link high watermark
01240 //
01241    XrdLink::LTMutex.Lock();
01242    ltlast = XrdLink::LTLast;
01243    XrdLink::LTMutex.UnLock();
01244 
01245 // Scan across all links looking for idle links. Links are never deallocated
01246 // so we don't need any special kind of lock for these
01247 //
01248    for (i = 0; i <= ltlast; i++)
01249        {if (XrdLink::LinkBat[i] != XRDLINK_USED 
01250         || !(lp = XrdLink::LinkTab[i])) continue;
01251         lnum++;
01252         lp->opMutex.Lock();
01253         if (lp->isIdle) tmo++;
01254         lp->isIdle++;
01255         if ((int(lp->isIdle)) < idleTicks) {lp->opMutex.UnLock(); continue;}
01256         lp->isIdle = 0;
01257         if (!(lp->Poller) || !(lp->isEnabled))
01258            XrdLog.Emsg("LinkScan","Link",lp->ID,"is disabled and idle.");
01259            else if (lp->InUse == 1)
01260                    {lp->Poller->Disable(lp, "idle timeout");
01261                     tmod++;
01262                    }
01263         lp->opMutex.UnLock();
01264        }
01265 
01266 // Trace what we did
01267 //
01268    TRACE(CONN, lnum <<" links; " <<tmo <<" idle; " <<tmod <<" force closed");
01269 
01270 // Reschedule ourselves
01271 //
01272    XrdSched.Schedule((XrdJob *)this, idleCheck+time(0));
01273 }

Generated on Tue Jul 5 14:46:14 2011 for ROOT_528-00b_version by  doxygen 1.5.1