XrdPoll.cc

Go to the documentation of this file.
00001 /******************************************************************************/
00002 /*                                                                            */
00003 /*                            X r d P o l l . c c                             */
00004 /*                                                                            */
00005 /* (c) 2004 by the Board of Trustees of the Leland Stanford, Jr., University  */
00006 /*       All Rights Reserved. See XrdInfo.cc for complete License Terms       */
00007 /*   Produced by Andrew Hanushevsky for Stanford University under contract    */
00008 /*              DE-AC03-76-SFO0515 with the Department of Energy              */
00009 /******************************************************************************/
00010 
00011 //           $Id: XrdPoll.cc 28902 2009-06-11 12:36:21Z ganis $
00012 
// Version-control identification string for this source file (same $Id$ stamp
// as in the comment above).
const char *XrdPollCVSID =
      "$Id: XrdPoll.cc 28902 2009-06-11 12:36:21Z ganis $";
00014 
00015 #include <fcntl.h>
00016 #include <unistd.h>
00017 #include <stdio.h>
00018 #include <stdlib.h>
00019   
00020 #include "XrdSys/XrdSysError.hh"
00021 #include "XrdSys/XrdSysPlatform.hh"
00022 #include "XrdSys/XrdSysPthread.hh"
00023 #include "Xrd/XrdLink.hh"
00024 #include "Xrd/XrdProtocol.hh"
00025 #define  TRACELINK lp
00026 #include "Xrd/XrdTrace.hh"
00027 
00028 #if   defined(_DEVPOLL)
00029 #include "Xrd/XrdPollDev.hh"
00030 #elif defined(_EPOLL) && defined(__linux__)
00031 #include "Xrd/XrdPollE.hh"
00032 #else
00033 #include "Xrd/XrdPollPoll.hh"
00034 #endif
00035 
00036 /******************************************************************************/
00037 /*                         L o c a l   C l a s s e s                          */
00038 /******************************************************************************/
00039 
00040 class XrdPoll_End : public XrdProtocol
00041 {
00042 public:
00043 
00044 void          DoIt() {}
00045 
00046 XrdProtocol  *Match(XrdLink *lp) {return (XrdProtocol *)0;}
00047 
00048 int           Process(XrdLink *lp) {return -1;}
00049 
00050 void          Recycle(XrdLink *lp, int x, const char *y) {}
00051 
00052 int           Stats(char *buff, int blen, int do_sync=0) {return 0;}
00053 
00054       XrdPoll_End() : XrdProtocol("link termination") {}
00055      ~XrdPoll_End() {}
00056 };
00057 
00058 /******************************************************************************/
00059 /*                           G l o b a l   D a t a                            */
00060 /******************************************************************************/
00061   
00062        XrdPoll   *XrdPoll::Pollers[XRD_NUMPOLLERS] = {0, 0, 0};
00063 
00064        XrdSysMutex  XrdPoll::doingAttach;
00065 
00066        const char *XrdPoll::TraceID = "Poll";
00067 
00068 extern XrdSysError  XrdLog;
00069 
00070 extern XrdOucTrace  XrdTrace;
00071 
00072 /******************************************************************************/
00073 /*              T h r e a d   S t a r t u p   I n t e r f a c e               */
00074 /******************************************************************************/
00075 
00076 struct XrdPollArg
00077        {XrdPoll      *Poller;
00078         int            retcode;
00079         XrdSysSemaphore PollSync;
00080 
00081         XrdPollArg() : PollSync(0, "poll sync") {}
00082        ~XrdPollArg()               {}
00083        };
00084 
00085   
00086 void *XrdStartPolling(void *parg)
00087 {
00088      struct XrdPollArg *PArg = (struct XrdPollArg *)parg;
00089      PArg->Poller->Start(&(PArg->PollSync), PArg->retcode);
00090      return (void *)0;
00091 }
00092  
00093 /******************************************************************************/
00094 /*                           C o n s t r u c t o r                            */
00095 /******************************************************************************/
00096   
00097 XrdPoll::XrdPoll()
00098 {
00099    int fildes[2];
00100 
00101    TID=0;
00102    numAttached=numEnabled=numEvents=numInterrupts=0;
00103 
00104    if (pipe(fildes) == 0)
00105       {CmdFD = fildes[1]; fcntl(CmdFD, F_SETFD, FD_CLOEXEC);
00106        ReqFD = fildes[0]; fcntl(ReqFD, F_SETFD, FD_CLOEXEC);
00107       } else {
00108        CmdFD = ReqFD = -1;
00109        XrdLog.Emsg("Poll", errno, "create poll pipe");
00110       }
00111    PipeBuff        = 0;
00112    PipeBlen        = 0;
00113    PipePoll.fd     = ReqFD;
00114    PipePoll.events = POLLIN | POLLRDNORM;
00115 }
00116 
00117 /******************************************************************************/
00118 /*                                A t t a c h                                 */
00119 /******************************************************************************/
00120   
00121 int XrdPoll::Attach(XrdLink *lp)
00122 {
00123    int i;
00124    XrdPoll *pp;
00125 
00126 // We allow only one attach at a time to simplify the processing
00127 //
00128    doingAttach.Lock();
00129 
00130 // Find a poller with the smallest number of entries
00131 //
00132    pp = Pollers[0];
00133    for (i = 1; i < XRD_NUMPOLLERS; i++)
00134        if (pp->numAttached > Pollers[i]->numAttached) pp = Pollers[i];
00135 
00136 // Include this FD into the poll set of the poller
00137 //
00138    if (!pp->Include(lp)) {doingAttach.UnLock(); return 0;}
00139 
00140 // Complete the link setup
00141 //
00142    lp->Poller = pp;
00143    pp->numAttached++;
00144    doingAttach.UnLock();
00145    TRACEI(POLL, "FD " <<lp->FD <<" attached to poller " <<pp->PID <<"; num=" <<pp->numAttached);
00146    return 1;
00147 }
00148 
00149 /******************************************************************************/
00150 /*                                D e t a c h                                 */
00151 /******************************************************************************/
00152   
00153 void XrdPoll::Detach(XrdLink *lp)
00154 {
00155    XrdPoll *pp;
00156 
00157 // If link is not attached, simply return
00158 //
00159    if (!(pp = lp->Poller)) return;
00160 
00161 // Exclude this link from the associated poll set
00162 //
00163    pp->Exclude(lp);
00164 
00165 // Make sure we are consistent
00166 //
00167    doingAttach.Lock();
00168    if (!pp->numAttached)
00169       {XrdLog.Emsg("Poll","Underflow detaching", lp->ID); abort();}
00170    pp->numAttached--;
00171    doingAttach.UnLock();
00172    TRACEI(POLL, "FD " <<lp->FD <<" detached from poller " <<pp->PID <<"; num=" <<pp->numAttached);
00173 }
00174 
00175 /******************************************************************************/
00176 /*                                F i n i s h                                 */
00177 /******************************************************************************/
00178   
00179 int XrdPoll::Finish(XrdLink *lp, const char *etxt)
00180 {
00181    static XrdPoll_End LinkEnd;
00182 
00183 // If this link is already scheduled for termination, ignore this call.
00184 //
00185    if (lp->Protocol == &LinkEnd)
00186       {TRACEI(POLL, "Link " <<lp->FD <<" already terminating; "
00187                     <<(etxt ? etxt : "") <<" request ignored.");
00188        return 0;
00189       }
00190 
00191 // Set the protocol pointer to be link termination
00192 //
00193    lp->ProtoAlt = lp->Protocol;
00194    lp->Protocol = static_cast<XrdProtocol *>(&LinkEnd);
00195    if (etxt)
00196       {if (lp->Etext) free(lp->Etext);
00197        lp->Etext = strdup(etxt);
00198       } else etxt = "reason unknown";
00199    TRACEI(POLL, "Link " <<lp->FD <<" terminating: " <<etxt);
00200    return 1;
00201 }
00202   
00203 /******************************************************************************/
00204 /*                            g e t R e q u e s t                             */
00205 /******************************************************************************/
00206 
00207 // Warning: This method runs unlocked. The caller must have exclusive use of
00208 //          the ReqBuff otherwise unpredictable results will occur.
00209 
00210 int XrdPoll::getRequest()
00211 {
00212    ssize_t rlen;
00213    int rc;
00214 
00215 // See if we are to resume a read or start a fresh one
00216 //
00217    if (!PipeBlen) 
00218       {PipeBuff = (char *)&ReqBuff; PipeBlen = sizeof(ReqBuff);}
00219 
00220 // Wait for the next request. Some OS's (like Linux) don't support non-blocking
00221 // pipes. So, we must front the read with a poll.
00222 //
00223    do {rc = poll(&PipePoll, 1, 0);}
00224       while(rc < 0 && (errno == EAGAIN || errno == EINTR));
00225    if (rc < 1) return 0;
00226 
00227 // Now we can put up a read without a delay. Normally a full command will be
00228 // present. Under some heavy conditions, this may not be the case.
00229 //
00230    do {rlen = read(ReqFD, PipeBuff, PipeBlen);} 
00231       while(rlen < 0 && errno == EINTR);
00232    if (rlen <= 0)
00233       {if (rlen) XrdLog.Emsg("Poll", errno, "read from request pipe");
00234        return 0;
00235       }
00236 
00237 // Check if all the data has arrived. If not all the data is present, defer
00238 // this request until more data arrives.
00239 //
00240    if (!(PipeBlen -= rlen)) return 1;
00241    PipeBuff += rlen;
00242    TRACE(POLL, "Poller " <<PID <<" still needs " <<PipeBlen <<" req pipe bytes");
00243    return 0;
00244 }
00245 
00246 /******************************************************************************/
00247 /*                             P o l l 2 T e x t                              */
00248 /******************************************************************************/
00249   
00250 char *XrdPoll::Poll2Text(short events)
00251 {
00252    if (events & POLLERR) return strdup("socket error");
00253 
00254    if (events & POLLHUP) return strdup("client disconnected");
00255 
00256    if (events & POLLNVAL) return strdup("client closed socket");
00257 
00258   {char buff[64];
00259    sprintf(buff, "unusual event (%.4x)", events);
00260    return strdup(buff);
00261   }
00262   return (char *)0;
00263 }
00264 
00265 /******************************************************************************/
00266 /*                                 S e t u p                                  */
00267 /******************************************************************************/
00268   
00269 int XrdPoll::Setup(int numfd)
00270 {
00271    pthread_t tid;
00272    int maxfd, retc, i;
00273    struct XrdPollArg PArg;
00274 
00275 // Calculate the number of table entries per poller
00276 //
00277    maxfd  = (numfd / XRD_NUMPOLLERS) + 16;
00278 
00279 // Verify that we initialized the poller table
00280 //
00281    for (i = 0; i < XRD_NUMPOLLERS; i++)
00282        {if (!(Pollers[i] = newPoller(i, maxfd))) return 0;
00283         Pollers[i]->PID = i;
00284 
00285    // Now start a thread to handle this poller object
00286    //
00287         PArg.Poller = Pollers[i];
00288         PArg.retcode= 0;
00289         TRACE(POLL, "Starting poller " <<i);
00290         if ((retc = XrdSysThread::Run(&tid,XrdStartPolling,(void *)&PArg,
00291                                       XRDSYSTHREAD_BIND, "Poller")))
00292            {XrdLog.Emsg("Poll", retc, "create poller thread"); return 0;}
00293         Pollers[i]->TID = tid;
00294         PArg.PollSync.Wait();
00295         if (PArg.retcode)
00296            {XrdLog.Emsg("Poll", PArg.retcode, "start poller");
00297             return 0;
00298            }
00299        }
00300 
00301 // All done
00302 //
00303    return 1;
00304 }
00305 
00306 /******************************************************************************/
00307 /*                                 S t a t s                                  */
00308 /******************************************************************************/
00309   
00310 int XrdPoll::Stats(char *buff, int blen, int do_sync)
00311 {
00312    static const char statfmt[] = "<stats id=\"poll\"><att>%d</att>"
00313    "<en>%d</en><ev>%d</ev><int>%d</int></stats>";
00314    int i, numatt = 0, numen = 0, numev = 0, numint = 0;
00315    XrdPoll *pp;
00316 
00317 // Return number of bytes if so wanted
00318 //
00319    if (!buff) return (sizeof(statfmt)+(4*16))*XRD_NUMPOLLERS;
00320 
00321 // Get statistics. While we wish we could honor do_sync, doing so would be
00322 // costly and hardly worth it. So, we do not include code such as:
00323 //    x = pp->y; if (do_sync) while(x != pp->y) x = pp->y; tot += x;
00324 //
00325    for (i = 0; i < XRD_NUMPOLLERS; i++)
00326        {pp = Pollers[i];
00327         numatt += pp->numAttached; 
00328         numen  += pp->numEnabled;
00329         numev  += pp->numEvents;
00330         numint += pp->numInterrupts;
00331        }
00332 
00333 // Format and return
00334 //
00335    return snprintf(buff, blen, statfmt, numatt, numen, numev, numint);
00336 }
00337   
00338 /******************************************************************************/
00339 /*              I m p l e m e n t a t i o n   S p e c i f i c s               */
00340 /******************************************************************************/
00341   
00342 #if   defined(_DEVPOLL)
00343 #include "Xrd/XrdPollDev.icc"
00344 #elif defined(_EPOLL) && defined(__linux__)
00345 #include "Xrd/XrdPollE.icc"
00346 #else
00347 #include "Xrd/XrdPollPoll.icc"
00348 #endif

Generated on Tue Jul 5 14:46:15 2011 for ROOT_528-00b_version by  doxygen 1.5.1