XrdMpxStats.cc

Go to the documentation of this file.
00001 /******************************************************************************/
00002 /*                                                                            */
00003 /*                        X r d M p x S t a t s . c c                         */
00004 /*                                                                            */
00005 /* (c) 2009 by the Board of Trustees of the Leland Stanford, Jr., University  */
00006 /*                            All Rights Reserved                             */
00007 /*   Produced by Andrew Hanushevsky for Stanford University under contract    */
00008 /*              DE-AC02-76-SFO0515 with the Department of Energy              */
00009 /******************************************************************************/
00010 
00011 /*   $Id: XrdMpxStats.cc 35287 2010-09-14 21:19:35Z ganis $           */
00012 
00013 const char *XrdMpxStatsCVSID = "$Id: XrdMpxStats.cc 35287 2010-09-14 21:19:35Z ganis $";
00014 
00015 #include <stdio.h>
00016 #include <unistd.h>
00017 #include <sys/types.h>
00018 #include <sys/socket.h>
00019 #include <sys/uio.h>
00020 
00021 #include "XrdNet/XrdNetDNS.hh"
00022 #include "XrdNet/XrdNetOpts.hh"
00023 #include "XrdNet/XrdNetSocket.hh"
00024 #include "XrdOuc/XrdOucTokenizer.hh"
00025 #include "XrdSys/XrdSysError.hh"
00026 #include "XrdSys/XrdSysLogger.hh"
00027 #include "XrdSys/XrdSysHeaders.hh"
00028 #include "XrdSys/XrdSysPlatform.hh"
00029 #include "XrdSys/XrdSysPthread.hh"
00030   
00031 /******************************************************************************/
00032 /*                      G l o b a l   V a r i a b l e s                       */
00033 /******************************************************************************/
00034 
00035 namespace XrdMpx
00036 {
00037        XrdSysLogger       Logger;
00038 
00039        XrdSysError        Say(&Logger, "mpxstats");
00040 
00041 static const int          addSender = 0x0001;
00042 
00043        int                Opts;
00044 
00045        int                Debug;
00046 };
00047 
00048 using namespace XrdMpx;
00049 
00050 /******************************************************************************/
00051 /*                         L o c a l   C l a s s e s                          */
00052 /******************************************************************************/
00053 /******************************************************************************/
00054 /*                             X r d M p x V a r                              */
00055 /******************************************************************************/
00056   
00057 class  XrdMpxVar
00058 {
00059 public:
00060 
00061       int   Pop(const char *vName);
00062 
00063       int   Push(const char *vName);
00064 
00065       void  Reset() {vEnd = vBuff; vNum = -1; *vBuff = 0;}
00066 
00067 const char *Var() {return vBuff;}
00068 
00069             XrdMpxVar() : vFence(vBuff + sizeof(vBuff) - 1) {Reset();}
00070            ~XrdMpxVar() {}
00071 
00072 private:
00073 
00074 static const int   vMax = 15;
00075              char *vEnd, *vFence, *vStack[vMax+1], vBuff[1024];
00076              int   vNum;
00077 };
00078 
00079 /******************************************************************************/
00080 /*                        X r d M p x V a r : : P o p                         */
00081 /******************************************************************************/
00082   
00083 int XrdMpxVar::Pop(const char *vName)
00084 {
00085     if (Debug) cerr <<"Pop:  " <<(vName ? vName : "") <<"; var=" <<vBuff <<endl;
00086     if (vNum < 0 || (vName && strcmp(vStack[vNum], vName))) return 0;
00087     vEnd = vStack[vNum]-1; *vEnd = '\0'; vNum--;
00088     return 1;
00089 }
00090 
00091 /******************************************************************************/
00092 /*                       X r d M p x V a r : : P u s h                        */
00093 /******************************************************************************/
00094   
00095 int XrdMpxVar::Push(const char *vName)
00096 {
00097    int n = strlen(vName);
00098 
00099    if (Debug) cerr <<"Push: " <<vName <<"; var=" <<vBuff <<endl;
00100    if (vNum >= vMax) return 0;
00101    if (vNum >= 0) *vEnd++ = '.';
00102       else         vEnd = vBuff;
00103    if (vEnd+n+1 >= vFence) return 0;
00104    strcpy(vEnd, vName);
00105    vStack[++vNum] = vEnd;
00106    vEnd += n;
00107    return 1;
00108 }
00109 
00110 /******************************************************************************/
00111 /*                             X r d M p x X m l                              */
00112 /******************************************************************************/
00113   
00114 class XrdMpxXml
00115 {
00116 public:
00117 
00118 enum fmtType {fmtCGI, fmtFlat, fmtXML};
00119 
00120 int Format(const char *Host, char *ibuff, char *obuff);
00121 
00122     XrdMpxXml(fmtType ft) : fType(ft)
00123                           {if (ft == fmtCGI) {vSep = '='; vSfx = '&';}
00124                               else           {vSep = ' '; vSfx = '\n';}
00125                           }
00126    ~XrdMpxXml() {}
00127 
00128 private:
00129 
00130 struct VarInfo
00131       {const char *Name;
00132              char *Data;
00133       };
00134 
00135 char *Add(char *Buff, const char *Var, const char *Val);
00136 void  getVars(XrdOucTokenizer &Data, VarInfo Var[]);
00137 int   xmlErr(const char *t1, const char *t2=0, const char *t3=0);
00138 
00139 fmtType fType;
00140 char    vSep;
00141 char    vSfx;
00142 };
00143 
00144 /******************************************************************************/
00145 /*                     X r d M p x X m l : : F o r m a t                      */
00146 /******************************************************************************/
00147   
00148 int XrdMpxXml::Format(const char *Host, char *ibuff, char *obuff)
00149 {
00150    static const char *Hdr0 = "<statistics ";
00151    static const int   H0Len = strlen(Hdr0);
00152 
00153    XrdMpxVar       xVar;
00154    XrdOucTokenizer Data(ibuff);
00155    VarInfo vHead[] = {{"tod", 0}, {"ver", 0}, {"src", 0}, {"tos", 0},
00156                       {"pgm", 0}, {"ins", 0}, {"pid", 0}, {0, 0}};
00157    VarInfo vStat[] = {{"id",  0}, {0, 0}};
00158    VarInfo vTail[] = {{"toe", 0}, {0, 0}};
00159    char *lP = ibuff, *oP = obuff, *tP, *vP;
00160    int i, rc;
00161 
00162 // Insert a newline for the first '>'
00163 //
00164    if (!(lP = (char *)index(lP, '>')))
00165       return xmlErr("Invalid xml stream: ", ibuff);
00166    *lP++ = '\n';
00167 
00168 // Now make the input tokenizable
00169 //
00170    while(*lP)
00171         {if (*lP == '>' || (*lP == '<' && *(lP+1) == '/')) *lP = ' ';
00172          lP++;
00173         }
00174 
00175 // The first token better be '<statistics'
00176 //
00177    if (!(lP = Data.GetLine()) || strncmp(Hdr0, lP, H0Len))
00178       return xmlErr("Stream does not start with '<statistics'.");
00179    Data.GetToken(); getVars(Data, vHead);
00180 
00181 // Output the vars in the headers as 'stats..var'
00182 //
00183    for (i = 0; vHead[i].Name; i++)
00184        {if (vHead[i].Data) oP = Add(oP, vHead[i].Name, vHead[i].Data);}
00185 
00186 // Add in the host name, if supplied
00187 //
00188    if (Host) oP = Add(oP, "host", Host);
00189 
00190 // Get the remainder
00191 //
00192    if (!Data.GetLine()) return xmlErr("Null xml stream after header.");
00193 
00194 // The following segment reads all of the "stats" entries
00195 //
00196    while((tP = Data.GetToken()) && strcmp(tP, "/statistics"))
00197         {     if (*tP == '/')
00198                  {if (!xVar.Pop(strcmp("/stats", tP) ? tP+1 : 0))
00199                      return xmlErr(tP, "invalid end for ", xVar.Var());
00200                  }
00201          else if (*tP == '<')
00202                  {if (strcmp("<stats", tP)) rc = xVar.Push(tP+1);
00203                      else {getVars(Data, vStat);
00204                            rc = (vStat[0].Data ? xVar.Push(vStat[0].Data)
00205                                                : xVar.Push(tP+1));
00206                           }
00207                   if (!rc) return xmlErr("Nesting too deep for ", xVar.Var());
00208                  }
00209          else    {if ((vP = index(tP, '<'))) *vP = '\0';
00210                   if (*tP == '"')
00211                      {i = strlen(tP)-1;
00212                       if (*(tP+i) == '"') {*(tP+i) = '\0'; i = 1;}
00213                      } else i = 0;
00214                   oP = Add(oP, xVar.Var(), tP+i);
00215                   if (vP) {*vP = '<';
00216                            if (vP != tP) memset(tP, ' ', vP - tP);
00217                            Data.RetToken();
00218                           }
00219                  }
00220         }
00221    if (!tP) return xmlErr("Missing '</statistics>' in xml stream.");
00222    getVars(Data, vTail);
00223    if (vTail[0].Data) oP = Add(oP, vTail[0].Name, vTail[0].Data);
00224    if (*(oP-1) == '&') oP--;
00225    *oP++ = '\n';
00226    return oP - obuff;
00227 }
00228 
00229 /******************************************************************************/
00230 /*                        X r d M p x X m l : : A d d                         */
00231 /******************************************************************************/
00232   
00233 char *XrdMpxXml::Add(char *Buff, const char *Var, const char *Val)
00234 {
00235    strcpy(Buff, Var); Buff += strlen(Var);
00236    *Buff++ = vSep;
00237    strcpy(Buff, Val); Buff += strlen(Val);
00238    *Buff++ = vSfx;
00239    return Buff;
00240 }
00241 
00242 /******************************************************************************/
00243 /*                                                                            */
00244 /*                    X r d M p x X m l : : g e t V a r s                     */
00245 /*                                                                            */
00246 /******************************************************************************/
00247   
00248 void XrdMpxXml::getVars(XrdOucTokenizer &Data, VarInfo Var[])
00249 {
00250    char *tVar, *tVal;
00251    int i;
00252 
00253 // Initialize the data pointers to null
00254 //
00255    i = 0;
00256    while(Var[i].Name) Var[i++].Data = 0;
00257 
00258 // Get all of the variables/values and return where possible
00259 //
00260    while((tVar = Data.GetToken()) && *tVar != '<' && *tVar != '/')
00261         {if (!(tVal = (char *)index(tVar, '='))) continue;
00262          *tVal++ = '\0';
00263          if (*tVal == '"')
00264             {tVal++, i = strlen(tVal);
00265              if (*(tVal+i-1) == '"') *(tVal+i-1) = '\0';
00266             }
00267          i = 0;
00268          while(Var[i].Name)
00269               {if (!strcmp(Var[i].Name, tVar)) {Var[i].Data = tVal; break;}
00270                   else i++;
00271               }
00272         }
00273    if (tVar && (*tVar == '<' || *tVar == '/')) Data.RetToken();
00274 }
00275 
00276 /******************************************************************************/
00277 /*                     X r d M p x X m l : : x m l E r r                      */
00278 /******************************************************************************/
00279   
00280 int XrdMpxXml::xmlErr(const char *t1, const char *t2, const char *t3)
00281 {
00282    Say.Emsg(":", t1, t2, t3);
00283    return 0;
00284 }
00285   
00286 /******************************************************************************/
00287 /*                             X r d M p x O u t                              */
00288 /******************************************************************************/
00289   
00290 class XrdMpxOut
00291 {
00292 public:
00293 
00294 struct statsBuff
00295       {statsBuff      *Next;
00296        struct sockaddr From;
00297        int             Dlen;
00298        char            Data[8190];
00299        char            Pad[2];
00300      };
00301 
00302 void       Add(statsBuff *sbP);
00303 
00304 statsBuff *getBuff();
00305 
00306 void      *Run(XrdMpxXml *xP);
00307 
00308            XrdMpxOut() : Ready(0), inQ(0), Free(0) {}
00309           ~XrdMpxOut() {}
00310 
00311 private:
00312 
00313 XrdSysMutex     myMutex;
00314 XrdSysSemaphore Ready;
00315 
00316 statsBuff      *inQ;
00317 statsBuff      *Free;
00318 };
00319 
00320 /******************************************************************************/
00321 /*                        X r d M p x O u t : : A d d                         */
00322 /******************************************************************************/
00323   
00324 void XrdMpxOut::Add(statsBuff *sbP)
00325 {
00326 
00327 // Add this to the queue and signal the processing thread
00328 //
00329    myMutex.Lock();
00330    sbP->Next = inQ;
00331    inQ = sbP;
00332    Ready.Post();
00333    myMutex.UnLock();
00334 }
00335 
00336 /******************************************************************************/
00337 /*                    X r d M p x O u t : : g e t B u f f                     */
00338 /******************************************************************************/
00339   
00340 XrdMpxOut::statsBuff *XrdMpxOut::getBuff()
00341 {
00342    statsBuff *sbP;
00343 
00344 // Use an available buffer or allocate one
00345 //
00346    myMutex.Lock();
00347    if ((sbP = Free)) Free = sbP->Next;
00348       else sbP = new statsBuff;
00349    myMutex.UnLock();
00350    return sbP;
00351 }
00352 
00353 /******************************************************************************/
00354 /*                        X r d M p x O u t : : R u n                         */
00355 /******************************************************************************/
00356   
00357 void *XrdMpxOut::Run(XrdMpxXml *xP)
00358 {
00359    char *bP, *Host=0, obuff[sizeof(statsBuff)*2];
00360    statsBuff *sbP;
00361    int wLen, rc;
00362 
00363 // Simply loop formating and outputing the buffers
00364 //
00365    while(1)
00366         {Ready.Wait();
00367          myMutex.Lock();
00368          if ((sbP = inQ)) inQ = sbP->Next;
00369          myMutex.UnLock();
00370          if (!sbP) continue;
00371          if (xP)
00372             {Host = (Opts & addSender ? XrdNetDNS::getHostName(sbP->From) : 0);
00373              wLen = xP->Format(Host, sbP->Data, obuff);
00374              bP = obuff;
00375              if (Host) free(Host);
00376             } else {
00377              bP = sbP->Data;
00378              *(bP + sbP->Dlen) = '\n';
00379              wLen = sbP->Dlen+1;
00380             }
00381 
00382          while(wLen > 0)
00383               {do {rc = write(STDOUT_FILENO, bP, wLen);}
00384                   while(rc < 0 && errno == EINTR);
00385                wLen -= rc; bP += rc;
00386               }
00387 
00388          myMutex.Lock(); sbP->Next = Free; Free = sbP; myMutex.UnLock();
00389         }
00390 
00391 // Should never get here
00392 //
00393    return (void *)0;
00394 }
00395 
00396 /******************************************************************************/
00397 /*                        G l o b a l   O b j e c t s                         */
00398 /******************************************************************************/
00399   
00400 namespace XrdMpx
00401 {
00402 XrdMpxOut statsQ;
00403 };
00404 
00405 /******************************************************************************/
00406 /*                     T h r e a d   I n t e r f a c e s                      */
00407 /******************************************************************************/
00408   
00409 void *mainOutput(void *parg)
00410 {
00411     XrdMpxXml *xP = static_cast<XrdMpxXml *>(parg);
00412     return statsQ.Run(xP);
00413 }
00414 
00415 /******************************************************************************/
00416 /*                                 U s a g e                                  */
00417 /******************************************************************************/
00418   
00419 void Usage(int rc)
00420 {
00421    cerr <<"\nUsage: mpxstats [-f {cgi|flat|xml}] -p <port> [-s]" <<endl;
00422    exit(rc);
00423 }
00424 
00425 /******************************************************************************/
00426 /*                                  m a i n                                   */
00427 /******************************************************************************/
00428   
00429 int main(int argc, char *argv[])
00430 {
00431    extern char *optarg;
00432    extern int opterr, optopt;
00433    sigset_t myset;
00434    pthread_t tid;
00435    XrdMpxXml::fmtType fType = XrdMpxXml::fmtXML;
00436    XrdMpxOut::statsBuff *sbP = 0;
00437    XrdNetSocket mySocket(&Say);
00438    XrdMpxXml *xP = 0;
00439    SOCKLEN_t fromLen;
00440    int Port = 0, retc, udpFD;
00441    char buff[64], c;
00442 
00443 // Process the options
00444 //
00445    opterr = 0; Debug = 0; Opts = 0;
00446    if (argc > 1 && '-' == *argv[1]) 
00447       while ((c = getopt(argc,argv,"df:p:s")) && ((unsigned char)c != 0xff))
00448      { switch(c)
00449        {
00450        case 'd': Debug = 1;
00451                  break;
00452        case 'f':      if (!strcmp(optarg, "cgi" )) fType = XrdMpxXml::fmtCGI;
00453                  else if (!strcmp(optarg, "flat")) fType = XrdMpxXml::fmtFlat;
00454                  else if (!strcmp(optarg, "xml" )) fType = XrdMpxXml::fmtXML;
00455                  else {Say.Emsg(":", "Invalid format - ", optarg); Usage(1);}
00456                  break;
00457        case 'h': Usage(0);
00458        case 'p': if (!(Port = atoi(optarg)))
00459                     {Say.Emsg(":", "Invalid port number - ", optarg); Usage(1);}
00460                  break;
00461        case 's': Opts |= addSender;
00462                  break;
00463        default:  sprintf(buff,"'%c'", optopt);
00464                  if (c == ':') Say.Emsg(":", buff, "value not specified.");
00465                     else Say.Emsg(0, buff, "option is invalid");
00466                  Usage(1);
00467        }
00468      }
00469 
00470 // Make sure port has been specified
00471 //
00472    if (!Port) {Say.Emsg(":", "Port has not been specified."); Usage(1);}
00473 
00474 // Turn off sigpipe and host a variety of others before we start any threads
00475 //
00476    signal(SIGPIPE, SIG_IGN);  // Solaris optimization
00477    sigemptyset(&myset);
00478    sigaddset(&myset, SIGPIPE);
00479    sigaddset(&myset, SIGCHLD);
00480    pthread_sigmask(SIG_BLOCK, &myset, NULL);
00481 
00482 // Set the default stack size here
00483 //
00484    if (sizeof(long) > 4) XrdSysThread::setStackSize((size_t)1048576);
00485       else               XrdSysThread::setStackSize((size_t)786432);
00486 
00487 // Create a UDP socket and bind it to a port
00488 //
00489    if (mySocket.Open(0, Port, XRDNET_SERVER|XRDNET_UDPSOCKET, 0) < 0)
00490       {Say.Emsg(":", -mySocket.LastError(), "create udp socket"); exit(4);}
00491    udpFD = mySocket.Detach();
00492 
00493 // Establish format
00494 //
00495    if (fType != XrdMpxXml::fmtXML) xP = new XrdMpxXml(fType);
00496 
00497 // Now run a thread to output whatever we get
00498 //
00499    if ((retc = XrdSysThread::Run(&tid, mainOutput, (void *)xP,
00500                                  XRDSYSTHREAD_BIND, "Output")))
00501       {Say.Emsg(":", retc, "create output thread"); exit(4);}
00502 
00503 // Now simply wait for the messages
00504 //
00505    while(1)
00506         {sbP = statsQ.getBuff();
00507          fromLen = sizeof(sbP->From);
00508          retc = recvfrom(udpFD, sbP->Data, sizeof(sbP->Data), 0,
00509                                &sbP->From, &fromLen);
00510          if (retc < 0) {Say.Emsg(":", retc, "recv udp message"); exit(8);}
00511          sbP->Dlen = retc;
00512          statsQ.Add(sbP);
00513         }
00514 
00515 // Should never get here
00516 //
00517    return 0;
00518 }

Generated on Tue Jul 5 14:46:16 2011 for ROOT_528-00b_version by  doxygen 1.5.1