00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
// Revision identification string embedded in the executable.
const char *XrdMpxStatsCVSID =
    "$Id: XrdMpxStats.cc 35287 2010-09-14 21:19:35Z ganis $";
00014
00015 #include <stdio.h>
00016 #include <unistd.h>
00017 #include <sys/types.h>
00018 #include <sys/socket.h>
00019 #include <sys/uio.h>
00020
00021 #include "XrdNet/XrdNetDNS.hh"
00022 #include "XrdNet/XrdNetOpts.hh"
00023 #include "XrdNet/XrdNetSocket.hh"
00024 #include "XrdOuc/XrdOucTokenizer.hh"
00025 #include "XrdSys/XrdSysError.hh"
00026 #include "XrdSys/XrdSysLogger.hh"
00027 #include "XrdSys/XrdSysHeaders.hh"
00028 #include "XrdSys/XrdSysPlatform.hh"
00029 #include "XrdSys/XrdSysPthread.hh"
00030
00031
00032
00033
00034
00035 namespace XrdMpx
00036 {
00037 XrdSysLogger Logger;
00038
00039 XrdSysError Say(&Logger, "mpxstats");
00040
00041 static const int addSender = 0x0001;
00042
00043 int Opts;
00044
00045 int Debug;
00046 };
00047
00048 using namespace XrdMpx;
00049
00050
00051
00052
00053
00054
00055
00056
// Builds a dotted variable name ("a.b.c") from a stack of name components.
// Push() appends a component, Pop() removes the innermost one and Var()
// returns the name as it currently stands.
class XrdMpxVar
{
public:

    // Remove the innermost component. When vName is non-null it must match
    // that component. Returns 1 on success and 0 otherwise.
    int Pop(const char *vName);

    // Append vName as a new innermost component. Returns 1 on success and
    // 0 when the component cannot be added.
    int Push(const char *vName);

    // Forget every component and make the name empty.
    void Reset()
    {
        vEnd   = vBuff;
        vNum   = -1;
        *vBuff = 0;
    }

    // The name text held in the internal buffer.
    const char *Var()
    {
        return vBuff;
    }

    XrdMpxVar() : vFence(vBuff + sizeof(vBuff) - 1)
    {
        Reset();
    }

    ~XrdMpxVar() {}

private:

    static const int vMax = 15;       // Highest usable index into vStack

    char *vEnd;                       // Current end of the name in vBuff
    char *vFence;                     // Address of the last byte of vBuff
    char *vStack[vMax+1];             // Start of each component in vBuff
    char  vBuff[1024];                // Storage for the dotted name
    int   vNum;                       // Index of the innermost component, -1 if none
};
00078
00079
00080
00081
00082
00083 int XrdMpxVar::Pop(const char *vName)
00084 {
00085 if (Debug) cerr <<"Pop: " <<(vName ? vName : "") <<"; var=" <<vBuff <<endl;
00086 if (vNum < 0 || (vName && strcmp(vStack[vNum], vName))) return 0;
00087 vEnd = vStack[vNum]-1; *vEnd = '\0'; vNum--;
00088 return 1;
00089 }
00090
00091
00092
00093
00094
00095 int XrdMpxVar::Push(const char *vName)
00096 {
00097 int n = strlen(vName);
00098
00099 if (Debug) cerr <<"Push: " <<vName <<"; var=" <<vBuff <<endl;
00100 if (vNum >= vMax) return 0;
00101 if (vNum >= 0) *vEnd++ = '.';
00102 else vEnd = vBuff;
00103 if (vEnd+n+1 >= vFence) return 0;
00104 strcpy(vEnd, vName);
00105 vStack[++vNum] = vEnd;
00106 vEnd += n;
00107 return 1;
00108 }
00109
00110
00111
00112
00113
00114 class XrdMpxXml
00115 {
00116 public:
00117
00118 enum fmtType {fmtCGI, fmtFlat, fmtXML};
00119
00120 int Format(const char *Host, char *ibuff, char *obuff);
00121
00122 XrdMpxXml(fmtType ft) : fType(ft)
00123 {if (ft == fmtCGI) {vSep = '='; vSfx = '&';}
00124 else {vSep = ' '; vSfx = '\n';}
00125 }
00126 ~XrdMpxXml() {}
00127
00128 private:
00129
00130 struct VarInfo
00131 {const char *Name;
00132 char *Data;
00133 };
00134
00135 char *Add(char *Buff, const char *Var, const char *Val);
00136 void getVars(XrdOucTokenizer &Data, VarInfo Var[]);
00137 int xmlErr(const char *t1, const char *t2=0, const char *t3=0);
00138
00139 fmtType fType;
00140 char vSep;
00141 char vSfx;
00142 };
00143
00144
00145
00146
00147
00148 int XrdMpxXml::Format(const char *Host, char *ibuff, char *obuff)
00149 {
00150 static const char *Hdr0 = "<statistics ";
00151 static const int H0Len = strlen(Hdr0);
00152
00153 XrdMpxVar xVar;
00154 XrdOucTokenizer Data(ibuff);
00155 VarInfo vHead[] = {{"tod", 0}, {"ver", 0}, {"src", 0}, {"tos", 0},
00156 {"pgm", 0}, {"ins", 0}, {"pid", 0}, {0, 0}};
00157 VarInfo vStat[] = {{"id", 0}, {0, 0}};
00158 VarInfo vTail[] = {{"toe", 0}, {0, 0}};
00159 char *lP = ibuff, *oP = obuff, *tP, *vP;
00160 int i, rc;
00161
00162
00163
00164 if (!(lP = (char *)index(lP, '>')))
00165 return xmlErr("Invalid xml stream: ", ibuff);
00166 *lP++ = '\n';
00167
00168
00169
00170 while(*lP)
00171 {if (*lP == '>' || (*lP == '<' && *(lP+1) == '/')) *lP = ' ';
00172 lP++;
00173 }
00174
00175
00176
00177 if (!(lP = Data.GetLine()) || strncmp(Hdr0, lP, H0Len))
00178 return xmlErr("Stream does not start with '<statistics'.");
00179 Data.GetToken(); getVars(Data, vHead);
00180
00181
00182
00183 for (i = 0; vHead[i].Name; i++)
00184 {if (vHead[i].Data) oP = Add(oP, vHead[i].Name, vHead[i].Data);}
00185
00186
00187
00188 if (Host) oP = Add(oP, "host", Host);
00189
00190
00191
00192 if (!Data.GetLine()) return xmlErr("Null xml stream after header.");
00193
00194
00195
00196 while((tP = Data.GetToken()) && strcmp(tP, "/statistics"))
00197 { if (*tP == '/')
00198 {if (!xVar.Pop(strcmp("/stats", tP) ? tP+1 : 0))
00199 return xmlErr(tP, "invalid end for ", xVar.Var());
00200 }
00201 else if (*tP == '<')
00202 {if (strcmp("<stats", tP)) rc = xVar.Push(tP+1);
00203 else {getVars(Data, vStat);
00204 rc = (vStat[0].Data ? xVar.Push(vStat[0].Data)
00205 : xVar.Push(tP+1));
00206 }
00207 if (!rc) return xmlErr("Nesting too deep for ", xVar.Var());
00208 }
00209 else {if ((vP = index(tP, '<'))) *vP = '\0';
00210 if (*tP == '"')
00211 {i = strlen(tP)-1;
00212 if (*(tP+i) == '"') {*(tP+i) = '\0'; i = 1;}
00213 } else i = 0;
00214 oP = Add(oP, xVar.Var(), tP+i);
00215 if (vP) {*vP = '<';
00216 if (vP != tP) memset(tP, ' ', vP - tP);
00217 Data.RetToken();
00218 }
00219 }
00220 }
00221 if (!tP) return xmlErr("Missing '</statistics>' in xml stream.");
00222 getVars(Data, vTail);
00223 if (vTail[0].Data) oP = Add(oP, vTail[0].Name, vTail[0].Data);
00224 if (*(oP-1) == '&') oP--;
00225 *oP++ = '\n';
00226 return oP - obuff;
00227 }
00228
00229
00230
00231
00232
00233 char *XrdMpxXml::Add(char *Buff, const char *Var, const char *Val)
00234 {
00235 strcpy(Buff, Var); Buff += strlen(Var);
00236 *Buff++ = vSep;
00237 strcpy(Buff, Val); Buff += strlen(Val);
00238 *Buff++ = vSfx;
00239 return Buff;
00240 }
00241
00242
00243
00244
00245
00246
00247
00248 void XrdMpxXml::getVars(XrdOucTokenizer &Data, VarInfo Var[])
00249 {
00250 char *tVar, *tVal;
00251 int i;
00252
00253
00254
00255 i = 0;
00256 while(Var[i].Name) Var[i++].Data = 0;
00257
00258
00259
00260 while((tVar = Data.GetToken()) && *tVar != '<' && *tVar != '/')
00261 {if (!(tVal = (char *)index(tVar, '='))) continue;
00262 *tVal++ = '\0';
00263 if (*tVal == '"')
00264 {tVal++, i = strlen(tVal);
00265 if (*(tVal+i-1) == '"') *(tVal+i-1) = '\0';
00266 }
00267 i = 0;
00268 while(Var[i].Name)
00269 {if (!strcmp(Var[i].Name, tVar)) {Var[i].Data = tVal; break;}
00270 else i++;
00271 }
00272 }
00273 if (tVar && (*tVar == '<' || *tVar == '/')) Data.RetToken();
00274 }
00275
00276
00277
00278
00279
00280 int XrdMpxXml::xmlErr(const char *t1, const char *t2, const char *t3)
00281 {
00282 Say.Emsg(":", t1, t2, t3);
00283 return 0;
00284 }
00285
00286
00287
00288
00289
00290 class XrdMpxOut
00291 {
00292 public:
00293
00294 struct statsBuff
00295 {statsBuff *Next;
00296 struct sockaddr From;
00297 int Dlen;
00298 char Data[8190];
00299 char Pad[2];
00300 };
00301
00302 void Add(statsBuff *sbP);
00303
00304 statsBuff *getBuff();
00305
00306 void *Run(XrdMpxXml *xP);
00307
00308 XrdMpxOut() : Ready(0), inQ(0), Free(0) {}
00309 ~XrdMpxOut() {}
00310
00311 private:
00312
00313 XrdSysMutex myMutex;
00314 XrdSysSemaphore Ready;
00315
00316 statsBuff *inQ;
00317 statsBuff *Free;
00318 };
00319
00320
00321
00322
00323
00324 void XrdMpxOut::Add(statsBuff *sbP)
00325 {
00326
00327
00328
00329 myMutex.Lock();
00330 sbP->Next = inQ;
00331 inQ = sbP;
00332 Ready.Post();
00333 myMutex.UnLock();
00334 }
00335
00336
00337
00338
00339
00340 XrdMpxOut::statsBuff *XrdMpxOut::getBuff()
00341 {
00342 statsBuff *sbP;
00343
00344
00345
00346 myMutex.Lock();
00347 if ((sbP = Free)) Free = sbP->Next;
00348 else sbP = new statsBuff;
00349 myMutex.UnLock();
00350 return sbP;
00351 }
00352
00353
00354
00355
00356
00357 void *XrdMpxOut::Run(XrdMpxXml *xP)
00358 {
00359 char *bP, *Host=0, obuff[sizeof(statsBuff)*2];
00360 statsBuff *sbP;
00361 int wLen, rc;
00362
00363
00364
00365 while(1)
00366 {Ready.Wait();
00367 myMutex.Lock();
00368 if ((sbP = inQ)) inQ = sbP->Next;
00369 myMutex.UnLock();
00370 if (!sbP) continue;
00371 if (xP)
00372 {Host = (Opts & addSender ? XrdNetDNS::getHostName(sbP->From) : 0);
00373 wLen = xP->Format(Host, sbP->Data, obuff);
00374 bP = obuff;
00375 if (Host) free(Host);
00376 } else {
00377 bP = sbP->Data;
00378 *(bP + sbP->Dlen) = '\n';
00379 wLen = sbP->Dlen+1;
00380 }
00381
00382 while(wLen > 0)
00383 {do {rc = write(STDOUT_FILENO, bP, wLen);}
00384 while(rc < 0 && errno == EINTR);
00385 wLen -= rc; bP += rc;
00386 }
00387
00388 myMutex.Lock(); sbP->Next = Free; Free = sbP; myMutex.UnLock();
00389 }
00390
00391
00392
00393 return (void *)0;
00394 }
00395
00396
00397
00398
00399
00400 namespace XrdMpx
00401 {
00402 XrdMpxOut statsQ;
00403 };
00404
00405
00406
00407
00408
00409 void *mainOutput(void *parg)
00410 {
00411 XrdMpxXml *xP = static_cast<XrdMpxXml *>(parg);
00412 return statsQ.Run(xP);
00413 }
00414
00415
00416
00417
00418
00419 void Usage(int rc)
00420 {
00421 cerr <<"\nUsage: mpxstats [-f {cgi|flat|xml}] -p <port> [-s]" <<endl;
00422 exit(rc);
00423 }
00424
00425
00426
00427
00428
00429 int main(int argc, char *argv[])
00430 {
00431 extern char *optarg;
00432 extern int opterr, optopt;
00433 sigset_t myset;
00434 pthread_t tid;
00435 XrdMpxXml::fmtType fType = XrdMpxXml::fmtXML;
00436 XrdMpxOut::statsBuff *sbP = 0;
00437 XrdNetSocket mySocket(&Say);
00438 XrdMpxXml *xP = 0;
00439 SOCKLEN_t fromLen;
00440 int Port = 0, retc, udpFD;
00441 char buff[64], c;
00442
00443
00444
00445 opterr = 0; Debug = 0; Opts = 0;
00446 if (argc > 1 && '-' == *argv[1])
00447 while ((c = getopt(argc,argv,"df:p:s")) && ((unsigned char)c != 0xff))
00448 { switch(c)
00449 {
00450 case 'd': Debug = 1;
00451 break;
00452 case 'f': if (!strcmp(optarg, "cgi" )) fType = XrdMpxXml::fmtCGI;
00453 else if (!strcmp(optarg, "flat")) fType = XrdMpxXml::fmtFlat;
00454 else if (!strcmp(optarg, "xml" )) fType = XrdMpxXml::fmtXML;
00455 else {Say.Emsg(":", "Invalid format - ", optarg); Usage(1);}
00456 break;
00457 case 'h': Usage(0);
00458 case 'p': if (!(Port = atoi(optarg)))
00459 {Say.Emsg(":", "Invalid port number - ", optarg); Usage(1);}
00460 break;
00461 case 's': Opts |= addSender;
00462 break;
00463 default: sprintf(buff,"'%c'", optopt);
00464 if (c == ':') Say.Emsg(":", buff, "value not specified.");
00465 else Say.Emsg(0, buff, "option is invalid");
00466 Usage(1);
00467 }
00468 }
00469
00470
00471
00472 if (!Port) {Say.Emsg(":", "Port has not been specified."); Usage(1);}
00473
00474
00475
00476 signal(SIGPIPE, SIG_IGN);
00477 sigemptyset(&myset);
00478 sigaddset(&myset, SIGPIPE);
00479 sigaddset(&myset, SIGCHLD);
00480 pthread_sigmask(SIG_BLOCK, &myset, NULL);
00481
00482
00483
00484 if (sizeof(long) > 4) XrdSysThread::setStackSize((size_t)1048576);
00485 else XrdSysThread::setStackSize((size_t)786432);
00486
00487
00488
00489 if (mySocket.Open(0, Port, XRDNET_SERVER|XRDNET_UDPSOCKET, 0) < 0)
00490 {Say.Emsg(":", -mySocket.LastError(), "create udp socket"); exit(4);}
00491 udpFD = mySocket.Detach();
00492
00493
00494
00495 if (fType != XrdMpxXml::fmtXML) xP = new XrdMpxXml(fType);
00496
00497
00498
00499 if ((retc = XrdSysThread::Run(&tid, mainOutput, (void *)xP,
00500 XRDSYSTHREAD_BIND, "Output")))
00501 {Say.Emsg(":", retc, "create output thread"); exit(4);}
00502
00503
00504
00505 while(1)
00506 {sbP = statsQ.getBuff();
00507 fromLen = sizeof(sbP->From);
00508 retc = recvfrom(udpFD, sbP->Data, sizeof(sbP->Data), 0,
00509 &sbP->From, &fromLen);
00510 if (retc < 0) {Say.Emsg(":", retc, "recv udp message"); exit(8);}
00511 sbP->Dlen = retc;
00512 statsQ.Add(sbP);
00513 }
00514
00515
00516
00517 return 0;
00518 }