XrdOfsEvs.cc

Go to the documentation of this file.
00001 /******************************************************************************/
00002 /*                                                                            */
00003 /*                          X r d O f s E v s . c c                           */
00004 /*                                                                            */
00005 /* (c) 2005 by the Board of Trustees of the Leland Stanford, Jr., University  */
00006 /*                            All Rights Reserved                             */
00007 /*   Produced by Andrew Hanushevsky for Stanford University under contract    */
00008 /*              DE-AC02-76-SFO0515 with the Department of Energy              */
00009 /*             Based on code developed by Derek Feichtinger, CERN.            */
00010 /******************************************************************************/
00011   
00012 //         $Id: XrdOfsEvs.cc 35287 2010-09-14 21:19:35Z ganis $
00013 
00014 const char *XrdOfsEvsCVSID = "$Id: XrdOfsEvs.cc 35287 2010-09-14 21:19:35Z ganis $";
00015 
00016 #include <ctype.h>
00017 #include <stdarg.h>
00018 #include <stddef.h>
00019 #include <stdlib.h>
00020 #include <stdio.h>
00021 #include <string.h>
00022 #include <sys/stat.h>
00023 
00024 #include "XrdOfs/XrdOfsEvs.hh"
00025 #include "XrdSys/XrdSysError.hh"
00026 #include "XrdOuc/XrdOucProg.hh"
00027 #include "XrdOuc/XrdOucStream.hh"
00028 #include "XrdNet/XrdNetOpts.hh"
00029 #include "XrdNet/XrdNetSocket.hh"
00030 #include "XrdSys/XrdSysPlatform.hh"
00031 
00032 /******************************************************************************/
00033 /*                         L o c a l   C l a s s e s                          */
00034 /******************************************************************************/
00035 
// A single queued notification: a malloc'ed text buffer plus the link used to
// chain the block on either the pending queue or one of the free lists.
//
class XrdOfsEvsMsg
{
public:

XrdOfsEvsMsg *next;    // Next block in whatever list holds this one
char         *text;    // Message buffer; released with free() on destruction
int           tlen;    // Length of the formatted text in the buffer
int           isBig;   // Non-zero when the block belongs to the "big" pool

// Take ownership of the buffer tval (may be null) and remember the pool.
//
             XrdOfsEvsMsg(char *tval=0, int big=0)
                         : next(0), text(tval), tlen(0), isBig(big) {}

// Release the owned buffer, if there is one.
//
            ~XrdOfsEvsMsg()
                         {if (text) free(text);
                         }
};
00050 
00051 /******************************************************************************/
00052 /*                     E x t e r n a l   L i n k a g e s                      */
00053 /******************************************************************************/
00054   
00055 void *XrdOfsEvsSend(void *pp)
00056 {
00057      XrdOfsEvs *evs = (XrdOfsEvs *)pp;
00058      evs->sendEvents();
00059      return (void *)0;
00060 }
00061   
00062 /******************************************************************************/
00063 /*                    S t a t i c   D e f i n i t i o n s                     */
00064 /******************************************************************************/
00065 
00066 XrdOfsEvsFormat XrdOfsEvs::MsgFmt[XrdOfsEvs::nCount];
00067 
00068 const int       XrdOfsEvs::minMsgSize;
00069 const int       XrdOfsEvs::maxMsgSize;
00070 
00071 /******************************************************************************/
00072 /*                     X r d E v s F o r m a t : : D e f                      */
00073 /******************************************************************************/
00074   
00075 void XrdOfsEvsFormat::Def(evFlags theFlags, const char *Fmt, ...)
00076 {
00077    va_list ap;
00078    int theVal, i = 0;
00079 
00080 // Return if already defined
00081 //
00082    if (Format) return;
00083 
00084 // Set flags and format. Prepare the arg vector
00085 //
00086    Flags = theFlags; 
00087    Format = Fmt;
00088    memset(Args, 0, sizeof(Args));
00089 
00090 // Pick up all arguments
00091 //
00092    va_start(ap, Fmt);
00093    while((theVal = va_arg(ap, int)) >= 0) 
00094         Args[i++] = static_cast<XrdOfsEvsInfo::evArg>(theVal);
00095    va_end(ap);
00096 }
00097 
00098 /******************************************************************************/
00099 /*                           C o n s t r u c t o r                            */
00100 /******************************************************************************/
00101   
00102 XrdOfsEvs::XrdOfsEvs(Event theEvents, const char *Target, int minq, int maxq)
00103 {
00104 
00105 // Set common variables
00106 //
00107    enEvents = static_cast<Event>(theEvents & enMask);
00108    endIT = 0;
00109    theTarget = strdup(Target);
00110    eDest = 0; 
00111    theProg = 0;
00112    maxMin = minq; maxMax = maxq;
00113    msgFirst = msgLast = msgFreeMax = msgFreeMin = 0;
00114    numMax = numMin = 0; 
00115    tid = 0;
00116    msgFD = -1;
00117 
00118 // Initialize all static format entries that have not been initialized yet.
00119 // Note that format may be specified prior to this object being created!
00120 //
00121 // <tid> chmod  <mode> <path>
00122 //
00123    MsgFmt[Chmod  & Mask].Def(XrdOfsEvsFormat::cvtMode, "%s chmod %s %s\n",
00124                              XrdOfsEvsInfo::evTID,
00125                              XrdOfsEvsInfo::evFMODE, XrdOfsEvsInfo::evLFN1, -1);
00126 // <tid> closer <path>
00127 //
00128    MsgFmt[Closer & Mask].Def(XrdOfsEvsFormat::Null,    "%s closer %s\n",
00129                              XrdOfsEvsInfo::evTID,   XrdOfsEvsInfo::evLFN1, -1);
00130                                               
00131 // <tid> closew <path>
00132 //
00133    MsgFmt[Closew & Mask].Def(XrdOfsEvsFormat::Null,    "%s closew %s\n",
00134                              XrdOfsEvsInfo::evTID,   XrdOfsEvsInfo::evLFN1, -1);
00135                                               
00136 // <tid> create <mode> <path>
00137 //
00138    MsgFmt[Create & Mask].Def(XrdOfsEvsFormat::cvtMode, "%s create %s %s\n",
00139                              XrdOfsEvsInfo::evTID,
00140                              XrdOfsEvsInfo::evFMODE, XrdOfsEvsInfo::evLFN1, -1);
00141 // <tid> mkdir  <mode> <path>
00142 //
00143    MsgFmt[Mkdir  & Mask].Def(XrdOfsEvsFormat::cvtMode, "%s mkdir %s %s\n",
00144                              XrdOfsEvsInfo::evTID,
00145                              XrdOfsEvsInfo::evFMODE, XrdOfsEvsInfo::evLFN1, -1);
00146 // <tid> mv     <path> <path>
00147 //
00148    MsgFmt[Mv     & Mask].Def(XrdOfsEvsFormat::Null,    "%s mv %s %s\n",
00149                              XrdOfsEvsInfo::evTID,
00150                              XrdOfsEvsInfo::evLFN1,  XrdOfsEvsInfo::evLFN2, -1);
00151 // <tid> openr  <path>
00152 //
00153    MsgFmt[Openr  & Mask].Def(XrdOfsEvsFormat::Null,    "%s openr %s\n",
00154                              XrdOfsEvsInfo::evTID,   XrdOfsEvsInfo::evLFN1, -1);
00155                                               
00156 // <tid> openw  <path>
00157 //
00158    MsgFmt[Openw  & Mask].Def(XrdOfsEvsFormat::Null,    "%s openw %s\n",
00159                              XrdOfsEvsInfo::evTID,   XrdOfsEvsInfo::evLFN1, -1);
00160                                               
00161 // <tid> rm     <path>
00162 //
00163    MsgFmt[Rm     & Mask].Def(XrdOfsEvsFormat::Null,    "%s rm %s\n",
00164                              XrdOfsEvsInfo::evTID,   XrdOfsEvsInfo::evLFN1, -1);
00165                                               
00166 // <tid> rmdir  <path>
00167 //
00168    MsgFmt[Rmdir  & Mask].Def(XrdOfsEvsFormat::Null,    "%s rmdir %s\n",
00169                              XrdOfsEvsInfo::evTID,   XrdOfsEvsInfo::evLFN1, -1);
00170                                               
00171 // <tid> trunc  <size>
00172 //
00173    MsgFmt[Trunc  & Mask].Def(XrdOfsEvsFormat::cvtFSize,"%s trunc %s\n",
00174                              XrdOfsEvsInfo::evTID,   XrdOfsEvsInfo::evFSIZE,-1);
00175                                               
00176 // <tid> fwrite <path>
00177 //
00178    MsgFmt[Fwrite & Mask].Def(XrdOfsEvsFormat::Null,    "%s fwrite %s\n",
00179                              XrdOfsEvsInfo::evTID,   XrdOfsEvsInfo::evLFN1, -1);
00180 }
00181 
00182 /******************************************************************************/
00183 /*                            D e s t r u c t o r                             */
00184 /******************************************************************************/
00185 
00186 XrdOfsEvs::~XrdOfsEvs()
00187 {
00188   XrdOfsEvsMsg *tp;
00189 
00190 // Kill the notification thread. This may cause a msg block to be orphaned
00191 // but, in practice, this object does not really get deleted after being 
00192 // started. So, the problem is moot.
00193 //
00194    endIT = 1;
00195    if (tid) XrdSysThread::Kill(tid);
00196 
00197 // Release all queued message bocks
00198 //
00199   qMut.Lock();
00200   while ((tp = msgFirst)) {msgFirst = tp->next; delete tp;}
00201   if (theTarget) free(theTarget);
00202   if (msgFD >= 0)close(msgFD);
00203   if (theProg)   delete theProg;
00204   qMut.UnLock();
00205 
00206 // Release all free message blocks
00207 //
00208   fMut.Lock();
00209   while ((tp = msgFreeMax)) {msgFreeMax = tp->next; delete tp;}
00210   while ((tp = msgFreeMin)) {msgFreeMin = tp->next; delete tp;}
00211   fMut.UnLock();
00212 }
00213 
00214 /******************************************************************************/
00215 /*                                N o t i f y                                 */
00216 /******************************************************************************/
00217   
00218 void XrdOfsEvs::Notify(Event eID, XrdOfsEvsInfo &Info)
00219 {
00220    static int warnings = 0;
00221    XrdOfsEvsFormat *fP;
00222    XrdOfsEvsMsg *tp;
00223    char modebuff[8], sizebuff[16];
00224    int eNum, isBig = (eID & Mv), msgSize = (isBig ? maxMsgSize : minMsgSize);
00225 
00226 // Validate event number and set event name
00227 //
00228    eNum = eID & Mask;
00229    if (eNum < 0 || eNum >= nCount) return;
00230 
00231 // Check if we need to do any conversions
00232 //
00233    fP = &MsgFmt[eNum];
00234    if (fP->Flags & XrdOfsEvsFormat::cvtMode)
00235       {sprintf(modebuff, "%o", static_cast<int>((Info.FMode() & S_IAMB)));
00236        Info.Set(XrdOfsEvsInfo::evFMODE, modebuff);
00237       } else Info.Set(XrdOfsEvsInfo::evFMODE, "$FMODE");
00238    if (fP->Flags & XrdOfsEvsFormat::cvtFSize)
00239       {sprintf(sizebuff, "%lld", Info.FSize());
00240        Info.Set(XrdOfsEvsInfo::evFSIZE, sizebuff);
00241       } else Info.Set(XrdOfsEvsInfo::evFSIZE, "$FSIZE");
00242 
00243 // Get a message block
00244 //
00245    if (!(tp = getMsg(isBig)))
00246       {if ((++warnings & 0xff) == 1)
00247           eDest->Emsg("Notify", "Ran out of message objects;", eName(eNum),
00248                                 "event notification not sent.");
00249           return;
00250       }
00251 
00252 // Format the message
00253 //
00254    tp->tlen = fP->SNP(Info, tp->text, msgSize);
00255 
00256 // Put the message on the queue and return
00257 //
00258    tp->next = 0;
00259    qMut.Lock();
00260    if (msgLast) {msgLast->next = tp; msgLast = tp;}
00261       else msgFirst = msgLast = tp;
00262    qMut.UnLock();
00263    qSem.Post();
00264 }
00265 
00266 /******************************************************************************/
00267 /*                                 P a r s e                                  */
00268 /******************************************************************************/
00269   
00270 int XrdOfsEvs::Parse(XrdSysError &Eroute, XrdOfsEvs::Event eNum, char *mText)
00271 {
00272     static struct valVar {const char              *vname;
00273                           XrdOfsEvsInfo::evArg     vnum;
00274                           XrdOfsEvsFormat::evFlags vopt;}
00275         Vars[] = {
00276         {"TID",     XrdOfsEvsInfo::evTID,   XrdOfsEvsFormat::Null},
00277         {"LFN",     XrdOfsEvsInfo::evLFN1,  XrdOfsEvsFormat::Null},
00278         {"LFN1",    XrdOfsEvsInfo::evLFN1,  XrdOfsEvsFormat::Null},
00279         {"CGI",     XrdOfsEvsInfo::evCGI1,  XrdOfsEvsFormat::Null},
00280         {"CGI1",    XrdOfsEvsInfo::evCGI1,  XrdOfsEvsFormat::Null},
00281         {"LFN2",    XrdOfsEvsInfo::evLFN2,  XrdOfsEvsFormat::Null},
00282         {"CGI2",    XrdOfsEvsInfo::evCGI2,  XrdOfsEvsFormat::Null},
00283         {"FMODE",   XrdOfsEvsInfo::evFMODE, XrdOfsEvsFormat::cvtMode},
00284         {"FSIZE",   XrdOfsEvsInfo::evFSIZE, XrdOfsEvsFormat::cvtFSize}
00285        };
00286    int numvars = sizeof(Vars)/sizeof(struct valVar);
00287    char parms[1024], *pP = parms;
00288    char *pE = parms+sizeof(parms)-((XrdOfsEvsInfo::evARGS*2)-8);
00289    char varbuff[16], *bVar, *eVar;
00290    int  i, j, aNum = 0, Args[XrdOfsEvsInfo::evARGS] = {0};
00291    XrdOfsEvsFormat::evFlags ArgOpts = XrdOfsEvsFormat::freeFmt;
00292 
00293 // Parse the text
00294 //
00295    parms[0] = '\0';
00296    while(*mText && pP < pE)
00297         {if (*mText == '\\' && *(mText+1) == '$')
00298             {*pP++ = '$'; mText += 2; continue;}
00299             else if (*mText != '$') {*pP++ = *mText++; continue;}
00300          bVar = mText+1;
00301               if (*mText == '{') {eVar = index(mText, '}'); j = 1;}
00302          else if (*mText == '[') {eVar = index(mText, ']'); j = 1;}
00303          else {eVar = bVar; while(isalpha(*eVar)) eVar++;   j = 0;}
00304          i = eVar - bVar;
00305          if (i < 1 || i >= (int)sizeof(varbuff))
00306             {Eroute.Emsg("Parse","Invalid notifymsg variable starting at",mText);
00307              return 1;
00308             }
00309          strncpy(varbuff, bVar, i); varbuff[i] = '\0';
00310          for (i = 0; i < numvars; i++)
00311              if (!strcmp(varbuff, Vars[i].vname)) break;
00312          if (i >= numvars)
00313             {Eroute.Emsg("Parse", "Unknown notifymsg variable -",varbuff);
00314              return 1;
00315             }
00316          if (aNum >= XrdOfsEvsInfo::evARGS)
00317             {Eroute.Say("Parse", "Too many notifymsg variables"); return 1;}
00318          strcpy(pP, "%s"); pP += 2;
00319          Args[aNum++] = Vars[i].vnum; 
00320          ArgOpts = static_cast<XrdOfsEvsFormat::evFlags>(ArgOpts|Vars[i].vopt);
00321          mText = eVar+j;
00322         }
00323 
00324 // Check if we overran the buffer or didn't have any text
00325 //
00326    if (pP >= pE)
00327       {Eroute.Emsg("Parse","notifymsg text too long");return 1;}
00328    if (!parms[0])
00329       {Eroute.Emsg("Parse","notifymsg text not specified");return 1;}
00330 
00331 // Set the format
00332 //
00333    strcpy(pP, "\n");
00334    eNum = static_cast<Event>(eNum & Mask);
00335    MsgFmt[eNum].Set(ArgOpts, strdup(parms), Args);
00336 
00337 // All done
00338 //
00339    return 0;
00340 }
00341 
00342 /******************************************************************************/
00343 /*                            s e n d E v e n t s                             */
00344 /******************************************************************************/
00345   
00346 void XrdOfsEvs::sendEvents(void)
00347 {
00348    XrdOfsEvsMsg *tp;
00349    const char *theData[2] = {0,0};
00350          int   theDlen[2] = {0,0};
00351 
00352 // This is an endless loop that just gets things off the event queue and
00353 // send them out. This allows us to only hang a simgle thread should the
00354 // receiver get blocked, instead of the whole process.
00355 //
00356    while(1)
00357         {qSem.Wait();
00358          qMut.Lock();
00359          if (endIT) break;
00360          if ((tp = msgFirst) && !(msgFirst = tp->next)) msgLast = 0;
00361          qMut.UnLock();
00362          if (tp) 
00363             {if (!theProg) Feed(tp->text, tp->tlen);
00364                 else {theData[0] = tp->text; theDlen[0] = tp->tlen;
00365                       theProg->Feed(theData, theDlen);
00366                      }
00367              retMsg(tp);
00368             }
00369          }
00370    qMut.UnLock();
00371 }
00372 
00373 /******************************************************************************/
00374 /*                                 S t a r t                                  */
00375 /******************************************************************************/
00376   
00377 int XrdOfsEvs::Start(XrdSysError *eobj)
00378 {
00379    int rc;
00380 
00381 // Set the error object pointer
00382 //
00383    eDest = eobj;
00384 
00385 // Check if we need to create a socket to a path
00386 //
00387    if (*theTarget == '>')
00388       {XrdNetSocket *msgSock;
00389        if (!(msgSock = XrdNetSocket::Create(eobj,theTarget+1,0,0660,XRDNET_FIFO)))
00390           return -1;
00391        msgFD = msgSock->Detach();
00392        delete msgSock;
00393 
00394       } else {
00395 
00396       // Allocate a new program object if we don't have one
00397       //
00398          if (theProg) return 0;
00399          theProg = new XrdOucProg(eobj);
00400 
00401      // Setup the program
00402      //
00403         if (theProg->Setup(theTarget, eobj)) return -1;
00404         if ((rc = theProg->Start()))
00405            {eobj->Emsg("Evs", rc, "start event collector"); return -1;}
00406     }
00407 
00408 // Now start a thread to get messages and send them to the collector
00409 //
00410    if ((rc = XrdSysThread::Run(&tid, XrdOfsEvsSend, static_cast<void *>(this),
00411                           0, "Event notification sender")))
00412       {eobj->Emsg("Evs", rc, "create event notification thread");
00413        return -1;
00414       }
00415 
00416 // All done
00417 //
00418    return 0;
00419 }
00420 
00421 /******************************************************************************/
00422 /*                       P r i v a t e   M e t h o d s                        */
00423 /******************************************************************************/
00424 /******************************************************************************/
00425 /*                                 e N a m e                                  */
00426 /******************************************************************************/
00427   
00428 const char *XrdOfsEvs::eName(int eNum)
00429 {
00430   static const char *eventName[] = {"Chmod",  "closer", "closew", "create",
00431                                     "fwrite", "mkdir",  "mv",     "openr",
00432                                     "opnw",   "rm",     "rmdir",  "trunc"};
00433 
00434   eNum = (eNum & Mask);
00435   return (eNum < 0 || eNum >= nCount ? "?" : eventName[eNum]);
00436 }
00437 
00438 /******************************************************************************/
00439 /*                                  F e e d                                   */
00440 /******************************************************************************/
00441   
00442 int XrdOfsEvs::Feed(const char *data, int dlen)
00443 {
00444    int retc;
00445 
00446 // Write the data. ince this is a udp socket all the data goes or none does
00447 //
00448   do { retc = write(msgFD, (const void *)data, (size_t)dlen);}
00449        while (retc < 0 && errno == EINTR);
00450   if (retc < 0)
00451      {eDest->Emsg("EvsFeed", errno, "write to event socket", theTarget);
00452       return -1;
00453      }
00454 
00455 // All done
00456 //
00457    return 0;
00458 }
00459 
00460 /******************************************************************************/
00461 /*                                g e t M s g                                 */
00462 /******************************************************************************/
00463 
00464 XrdOfsEvsMsg *XrdOfsEvs::getMsg(int bigmsg)
00465 {
00466    XrdOfsEvsMsg *tp;
00467    int msz = 0;
00468 
00469 // Lock the free queue
00470 //
00471    fMut.Lock();
00472 
00473 // Get a free element from the big or small queue, as needed
00474 //
00475    if (bigmsg)
00476         if ((tp = msgFreeMax)) msgFreeMax = tp->next;
00477            else msz = maxMsgSize;
00478    else if ((tp = msgFreeMin)) msgFreeMin = tp->next;
00479            else msz = minMsgSize;
00480 
00481 // Check if we have to allocate a new item
00482 //
00483    if (!tp && (numMax + numMin) < (maxMax + maxMin))
00484       {if ((tp = new XrdOfsEvsMsg((char *)malloc(msz), bigmsg)))
00485           {if (!(tp->text)) {delete tp; tp = 0;}
00486               else if (bigmsg) numMax++;
00487                       else     numMin++;
00488           }
00489       }
00490 
00491 // Unlock and return result
00492 //
00493    fMut.UnLock();
00494    return tp;
00495 }
00496 
00497 /******************************************************************************/
00498 /*                                r e t M s g                                 */
00499 /******************************************************************************/
00500 
00501 void XrdOfsEvs::retMsg(XrdOfsEvsMsg *tp)
00502 {
00503 
00504 // Lock the free queue
00505 //
00506    fMut.Lock();
00507 
00508 // Check if we exceeded the hold quotax
00509 //
00510    if (tp->isBig)
00511       if (numMax > maxMax) {delete tp; numMax--;}
00512          else {tp->next = msgFreeMax; msgFreeMax = tp;}
00513       else
00514       if (numMin > maxMin) {delete tp; numMin--;}
00515          else {tp->next = msgFreeMin; msgFreeMin = tp;}
00516 
00517 // Unlock and return
00518 //
00519    fMut.UnLock();
00520 }

Generated on Tue Jul 5 14:46:44 2011 for ROOT_528-00b_version by  doxygen 1.5.1