00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013 const char *XrdPollCVSID = "$Id: XrdPoll.cc 28902 2009-06-11 12:36:21Z ganis $";
00014
00015 #include <fcntl.h>
00016 #include <unistd.h>
00017 #include <stdio.h>
00018 #include <stdlib.h>
00019
00020 #include "XrdSys/XrdSysError.hh"
00021 #include "XrdSys/XrdSysPlatform.hh"
00022 #include "XrdSys/XrdSysPthread.hh"
00023 #include "Xrd/XrdLink.hh"
00024 #include "Xrd/XrdProtocol.hh"
00025 #define TRACELINK lp
00026 #include "Xrd/XrdTrace.hh"
00027
00028 #if defined(_DEVPOLL)
00029 #include "Xrd/XrdPollDev.hh"
00030 #elif defined(_EPOLL) && defined(__linux__)
00031 #include "Xrd/XrdPollE.hh"
00032 #else
00033 #include "Xrd/XrdPollPoll.hh"
00034 #endif
00035
00036
00037
00038
00039
00040 class XrdPoll_End : public XrdProtocol
00041 {
00042 public:
00043
00044 void DoIt() {}
00045
00046 XrdProtocol *Match(XrdLink *lp) {return (XrdProtocol *)0;}
00047
00048 int Process(XrdLink *lp) {return -1;}
00049
00050 void Recycle(XrdLink *lp, int x, const char *y) {}
00051
00052 int Stats(char *buff, int blen, int do_sync=0) {return 0;}
00053
00054 XrdPoll_End() : XrdProtocol("link termination") {}
00055 ~XrdPoll_End() {}
00056 };
00057
00058
00059
00060
00061
00062 XrdPoll *XrdPoll::Pollers[XRD_NUMPOLLERS] = {0, 0, 0};
00063
00064 XrdSysMutex XrdPoll::doingAttach;
00065
00066 const char *XrdPoll::TraceID = "Poll";
00067
00068 extern XrdSysError XrdLog;
00069
00070 extern XrdOucTrace XrdTrace;
00071
00072
00073
00074
00075
00076 struct XrdPollArg
00077 {XrdPoll *Poller;
00078 int retcode;
00079 XrdSysSemaphore PollSync;
00080
00081 XrdPollArg() : PollSync(0, "poll sync") {}
00082 ~XrdPollArg() {}
00083 };
00084
00085
00086 void *XrdStartPolling(void *parg)
00087 {
00088 struct XrdPollArg *PArg = (struct XrdPollArg *)parg;
00089 PArg->Poller->Start(&(PArg->PollSync), PArg->retcode);
00090 return (void *)0;
00091 }
00092
00093
00094
00095
00096
00097 XrdPoll::XrdPoll()
00098 {
00099 int fildes[2];
00100
00101 TID=0;
00102 numAttached=numEnabled=numEvents=numInterrupts=0;
00103
00104 if (pipe(fildes) == 0)
00105 {CmdFD = fildes[1]; fcntl(CmdFD, F_SETFD, FD_CLOEXEC);
00106 ReqFD = fildes[0]; fcntl(ReqFD, F_SETFD, FD_CLOEXEC);
00107 } else {
00108 CmdFD = ReqFD = -1;
00109 XrdLog.Emsg("Poll", errno, "create poll pipe");
00110 }
00111 PipeBuff = 0;
00112 PipeBlen = 0;
00113 PipePoll.fd = ReqFD;
00114 PipePoll.events = POLLIN | POLLRDNORM;
00115 }
00116
00117
00118
00119
00120
00121 int XrdPoll::Attach(XrdLink *lp)
00122 {
00123 int i;
00124 XrdPoll *pp;
00125
00126
00127
00128 doingAttach.Lock();
00129
00130
00131
00132 pp = Pollers[0];
00133 for (i = 1; i < XRD_NUMPOLLERS; i++)
00134 if (pp->numAttached > Pollers[i]->numAttached) pp = Pollers[i];
00135
00136
00137
00138 if (!pp->Include(lp)) {doingAttach.UnLock(); return 0;}
00139
00140
00141
00142 lp->Poller = pp;
00143 pp->numAttached++;
00144 doingAttach.UnLock();
00145 TRACEI(POLL, "FD " <<lp->FD <<" attached to poller " <<pp->PID <<"; num=" <<pp->numAttached);
00146 return 1;
00147 }
00148
00149
00150
00151
00152
00153 void XrdPoll::Detach(XrdLink *lp)
00154 {
00155 XrdPoll *pp;
00156
00157
00158
00159 if (!(pp = lp->Poller)) return;
00160
00161
00162
00163 pp->Exclude(lp);
00164
00165
00166
00167 doingAttach.Lock();
00168 if (!pp->numAttached)
00169 {XrdLog.Emsg("Poll","Underflow detaching", lp->ID); abort();}
00170 pp->numAttached--;
00171 doingAttach.UnLock();
00172 TRACEI(POLL, "FD " <<lp->FD <<" detached from poller " <<pp->PID <<"; num=" <<pp->numAttached);
00173 }
00174
00175
00176
00177
00178
00179 int XrdPoll::Finish(XrdLink *lp, const char *etxt)
00180 {
00181 static XrdPoll_End LinkEnd;
00182
00183
00184
00185 if (lp->Protocol == &LinkEnd)
00186 {TRACEI(POLL, "Link " <<lp->FD <<" already terminating; "
00187 <<(etxt ? etxt : "") <<" request ignored.");
00188 return 0;
00189 }
00190
00191
00192
00193 lp->ProtoAlt = lp->Protocol;
00194 lp->Protocol = static_cast<XrdProtocol *>(&LinkEnd);
00195 if (etxt)
00196 {if (lp->Etext) free(lp->Etext);
00197 lp->Etext = strdup(etxt);
00198 } else etxt = "reason unknown";
00199 TRACEI(POLL, "Link " <<lp->FD <<" terminating: " <<etxt);
00200 return 1;
00201 }
00202
00203
00204
00205
00206
00207
00208
00209
00210 int XrdPoll::getRequest()
00211 {
00212 ssize_t rlen;
00213 int rc;
00214
00215
00216
00217 if (!PipeBlen)
00218 {PipeBuff = (char *)&ReqBuff; PipeBlen = sizeof(ReqBuff);}
00219
00220
00221
00222
00223 do {rc = poll(&PipePoll, 1, 0);}
00224 while(rc < 0 && (errno == EAGAIN || errno == EINTR));
00225 if (rc < 1) return 0;
00226
00227
00228
00229
00230 do {rlen = read(ReqFD, PipeBuff, PipeBlen);}
00231 while(rlen < 0 && errno == EINTR);
00232 if (rlen <= 0)
00233 {if (rlen) XrdLog.Emsg("Poll", errno, "read from request pipe");
00234 return 0;
00235 }
00236
00237
00238
00239
00240 if (!(PipeBlen -= rlen)) return 1;
00241 PipeBuff += rlen;
00242 TRACE(POLL, "Poller " <<PID <<" still needs " <<PipeBlen <<" req pipe bytes");
00243 return 0;
00244 }
00245
00246
00247
00248
00249
00250 char *XrdPoll::Poll2Text(short events)
00251 {
00252 if (events & POLLERR) return strdup("socket error");
00253
00254 if (events & POLLHUP) return strdup("client disconnected");
00255
00256 if (events & POLLNVAL) return strdup("client closed socket");
00257
00258 {char buff[64];
00259 sprintf(buff, "unusual event (%.4x)", events);
00260 return strdup(buff);
00261 }
00262 return (char *)0;
00263 }
00264
00265
00266
00267
00268
00269 int XrdPoll::Setup(int numfd)
00270 {
00271 pthread_t tid;
00272 int maxfd, retc, i;
00273 struct XrdPollArg PArg;
00274
00275
00276
00277 maxfd = (numfd / XRD_NUMPOLLERS) + 16;
00278
00279
00280
00281 for (i = 0; i < XRD_NUMPOLLERS; i++)
00282 {if (!(Pollers[i] = newPoller(i, maxfd))) return 0;
00283 Pollers[i]->PID = i;
00284
00285
00286
00287 PArg.Poller = Pollers[i];
00288 PArg.retcode= 0;
00289 TRACE(POLL, "Starting poller " <<i);
00290 if ((retc = XrdSysThread::Run(&tid,XrdStartPolling,(void *)&PArg,
00291 XRDSYSTHREAD_BIND, "Poller")))
00292 {XrdLog.Emsg("Poll", retc, "create poller thread"); return 0;}
00293 Pollers[i]->TID = tid;
00294 PArg.PollSync.Wait();
00295 if (PArg.retcode)
00296 {XrdLog.Emsg("Poll", PArg.retcode, "start poller");
00297 return 0;
00298 }
00299 }
00300
00301
00302
00303 return 1;
00304 }
00305
00306
00307
00308
00309
00310 int XrdPoll::Stats(char *buff, int blen, int do_sync)
00311 {
00312 static const char statfmt[] = "<stats id=\"poll\"><att>%d</att>"
00313 "<en>%d</en><ev>%d</ev><int>%d</int></stats>";
00314 int i, numatt = 0, numen = 0, numev = 0, numint = 0;
00315 XrdPoll *pp;
00316
00317
00318
00319 if (!buff) return (sizeof(statfmt)+(4*16))*XRD_NUMPOLLERS;
00320
00321
00322
00323
00324
00325 for (i = 0; i < XRD_NUMPOLLERS; i++)
00326 {pp = Pollers[i];
00327 numatt += pp->numAttached;
00328 numen += pp->numEnabled;
00329 numev += pp->numEvents;
00330 numint += pp->numInterrupts;
00331 }
00332
00333
00334
00335 return snprintf(buff, blen, statfmt, numatt, numen, numev, numint);
00336 }
00337
00338
00339
00340
00341
00342 #if defined(_DEVPOLL)
00343 #include "Xrd/XrdPollDev.icc"
00344 #elif defined(_EPOLL) && defined(__linux__)
00345 #include "Xrd/XrdPollE.icc"
00346 #else
00347 #include "Xrd/XrdPollPoll.icc"
00348 #endif