00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014 const char *XrdOfsEvsCVSID = "$Id: XrdOfsEvs.cc 35287 2010-09-14 21:19:35Z ganis $";
00015
00016 #include <ctype.h>
00017 #include <stdarg.h>
00018 #include <stddef.h>
00019 #include <stdlib.h>
00020 #include <stdio.h>
00021 #include <string.h>
00022 #include <sys/stat.h>
00023
00024 #include "XrdOfs/XrdOfsEvs.hh"
00025 #include "XrdSys/XrdSysError.hh"
00026 #include "XrdOuc/XrdOucProg.hh"
00027 #include "XrdOuc/XrdOucStream.hh"
00028 #include "XrdNet/XrdNetOpts.hh"
00029 #include "XrdNet/XrdNetSocket.hh"
00030 #include "XrdSys/XrdSysPlatform.hh"
00031
00032
00033
00034
00035
00036 class XrdOfsEvsMsg
00037 {
00038 public:
00039
00040 XrdOfsEvsMsg *next;
00041 char *text;
00042 int tlen;
00043 int isBig;
00044
00045 XrdOfsEvsMsg(char *tval=0, int big=0)
00046 {text = tval; tlen=0; isBig = big; next=0;}
00047
00048 ~XrdOfsEvsMsg() {if (text) free(text);}
00049 };
00050
00051
00052
00053
00054
00055 void *XrdOfsEvsSend(void *pp)
00056 {
00057 XrdOfsEvs *evs = (XrdOfsEvs *)pp;
00058 evs->sendEvents();
00059 return (void *)0;
00060 }
00061
00062
00063
00064
00065
00066 XrdOfsEvsFormat XrdOfsEvs::MsgFmt[XrdOfsEvs::nCount];
00067
00068 const int XrdOfsEvs::minMsgSize;
00069 const int XrdOfsEvs::maxMsgSize;
00070
00071
00072
00073
00074
00075 void XrdOfsEvsFormat::Def(evFlags theFlags, const char *Fmt, ...)
00076 {
00077 va_list ap;
00078 int theVal, i = 0;
00079
00080
00081
00082 if (Format) return;
00083
00084
00085
00086 Flags = theFlags;
00087 Format = Fmt;
00088 memset(Args, 0, sizeof(Args));
00089
00090
00091
00092 va_start(ap, Fmt);
00093 while((theVal = va_arg(ap, int)) >= 0)
00094 Args[i++] = static_cast<XrdOfsEvsInfo::evArg>(theVal);
00095 va_end(ap);
00096 }
00097
00098
00099
00100
00101
00102 XrdOfsEvs::XrdOfsEvs(Event theEvents, const char *Target, int minq, int maxq)
00103 {
00104
00105
00106
00107 enEvents = static_cast<Event>(theEvents & enMask);
00108 endIT = 0;
00109 theTarget = strdup(Target);
00110 eDest = 0;
00111 theProg = 0;
00112 maxMin = minq; maxMax = maxq;
00113 msgFirst = msgLast = msgFreeMax = msgFreeMin = 0;
00114 numMax = numMin = 0;
00115 tid = 0;
00116 msgFD = -1;
00117
00118
00119
00120
00121
00122
00123 MsgFmt[Chmod & Mask].Def(XrdOfsEvsFormat::cvtMode, "%s chmod %s %s\n",
00124 XrdOfsEvsInfo::evTID,
00125 XrdOfsEvsInfo::evFMODE, XrdOfsEvsInfo::evLFN1, -1);
00126
00127
00128 MsgFmt[Closer & Mask].Def(XrdOfsEvsFormat::Null, "%s closer %s\n",
00129 XrdOfsEvsInfo::evTID, XrdOfsEvsInfo::evLFN1, -1);
00130
00131
00132
00133 MsgFmt[Closew & Mask].Def(XrdOfsEvsFormat::Null, "%s closew %s\n",
00134 XrdOfsEvsInfo::evTID, XrdOfsEvsInfo::evLFN1, -1);
00135
00136
00137
00138 MsgFmt[Create & Mask].Def(XrdOfsEvsFormat::cvtMode, "%s create %s %s\n",
00139 XrdOfsEvsInfo::evTID,
00140 XrdOfsEvsInfo::evFMODE, XrdOfsEvsInfo::evLFN1, -1);
00141
00142
00143 MsgFmt[Mkdir & Mask].Def(XrdOfsEvsFormat::cvtMode, "%s mkdir %s %s\n",
00144 XrdOfsEvsInfo::evTID,
00145 XrdOfsEvsInfo::evFMODE, XrdOfsEvsInfo::evLFN1, -1);
00146
00147
00148 MsgFmt[Mv & Mask].Def(XrdOfsEvsFormat::Null, "%s mv %s %s\n",
00149 XrdOfsEvsInfo::evTID,
00150 XrdOfsEvsInfo::evLFN1, XrdOfsEvsInfo::evLFN2, -1);
00151
00152
00153 MsgFmt[Openr & Mask].Def(XrdOfsEvsFormat::Null, "%s openr %s\n",
00154 XrdOfsEvsInfo::evTID, XrdOfsEvsInfo::evLFN1, -1);
00155
00156
00157
00158 MsgFmt[Openw & Mask].Def(XrdOfsEvsFormat::Null, "%s openw %s\n",
00159 XrdOfsEvsInfo::evTID, XrdOfsEvsInfo::evLFN1, -1);
00160
00161
00162
00163 MsgFmt[Rm & Mask].Def(XrdOfsEvsFormat::Null, "%s rm %s\n",
00164 XrdOfsEvsInfo::evTID, XrdOfsEvsInfo::evLFN1, -1);
00165
00166
00167
00168 MsgFmt[Rmdir & Mask].Def(XrdOfsEvsFormat::Null, "%s rmdir %s\n",
00169 XrdOfsEvsInfo::evTID, XrdOfsEvsInfo::evLFN1, -1);
00170
00171
00172
00173 MsgFmt[Trunc & Mask].Def(XrdOfsEvsFormat::cvtFSize,"%s trunc %s\n",
00174 XrdOfsEvsInfo::evTID, XrdOfsEvsInfo::evFSIZE,-1);
00175
00176
00177
00178 MsgFmt[Fwrite & Mask].Def(XrdOfsEvsFormat::Null, "%s fwrite %s\n",
00179 XrdOfsEvsInfo::evTID, XrdOfsEvsInfo::evLFN1, -1);
00180 }
00181
00182
00183
00184
00185
00186 XrdOfsEvs::~XrdOfsEvs()
00187 {
00188 XrdOfsEvsMsg *tp;
00189
00190
00191
00192
00193
00194 endIT = 1;
00195 if (tid) XrdSysThread::Kill(tid);
00196
00197
00198
00199 qMut.Lock();
00200 while ((tp = msgFirst)) {msgFirst = tp->next; delete tp;}
00201 if (theTarget) free(theTarget);
00202 if (msgFD >= 0)close(msgFD);
00203 if (theProg) delete theProg;
00204 qMut.UnLock();
00205
00206
00207
00208 fMut.Lock();
00209 while ((tp = msgFreeMax)) {msgFreeMax = tp->next; delete tp;}
00210 while ((tp = msgFreeMin)) {msgFreeMin = tp->next; delete tp;}
00211 fMut.UnLock();
00212 }
00213
00214
00215
00216
00217
00218 void XrdOfsEvs::Notify(Event eID, XrdOfsEvsInfo &Info)
00219 {
00220 static int warnings = 0;
00221 XrdOfsEvsFormat *fP;
00222 XrdOfsEvsMsg *tp;
00223 char modebuff[8], sizebuff[16];
00224 int eNum, isBig = (eID & Mv), msgSize = (isBig ? maxMsgSize : minMsgSize);
00225
00226
00227
00228 eNum = eID & Mask;
00229 if (eNum < 0 || eNum >= nCount) return;
00230
00231
00232
00233 fP = &MsgFmt[eNum];
00234 if (fP->Flags & XrdOfsEvsFormat::cvtMode)
00235 {sprintf(modebuff, "%o", static_cast<int>((Info.FMode() & S_IAMB)));
00236 Info.Set(XrdOfsEvsInfo::evFMODE, modebuff);
00237 } else Info.Set(XrdOfsEvsInfo::evFMODE, "$FMODE");
00238 if (fP->Flags & XrdOfsEvsFormat::cvtFSize)
00239 {sprintf(sizebuff, "%lld", Info.FSize());
00240 Info.Set(XrdOfsEvsInfo::evFSIZE, sizebuff);
00241 } else Info.Set(XrdOfsEvsInfo::evFSIZE, "$FSIZE");
00242
00243
00244
00245 if (!(tp = getMsg(isBig)))
00246 {if ((++warnings & 0xff) == 1)
00247 eDest->Emsg("Notify", "Ran out of message objects;", eName(eNum),
00248 "event notification not sent.");
00249 return;
00250 }
00251
00252
00253
00254 tp->tlen = fP->SNP(Info, tp->text, msgSize);
00255
00256
00257
00258 tp->next = 0;
00259 qMut.Lock();
00260 if (msgLast) {msgLast->next = tp; msgLast = tp;}
00261 else msgFirst = msgLast = tp;
00262 qMut.UnLock();
00263 qSem.Post();
00264 }
00265
00266
00267
00268
00269
00270 int XrdOfsEvs::Parse(XrdSysError &Eroute, XrdOfsEvs::Event eNum, char *mText)
00271 {
00272 static struct valVar {const char *vname;
00273 XrdOfsEvsInfo::evArg vnum;
00274 XrdOfsEvsFormat::evFlags vopt;}
00275 Vars[] = {
00276 {"TID", XrdOfsEvsInfo::evTID, XrdOfsEvsFormat::Null},
00277 {"LFN", XrdOfsEvsInfo::evLFN1, XrdOfsEvsFormat::Null},
00278 {"LFN1", XrdOfsEvsInfo::evLFN1, XrdOfsEvsFormat::Null},
00279 {"CGI", XrdOfsEvsInfo::evCGI1, XrdOfsEvsFormat::Null},
00280 {"CGI1", XrdOfsEvsInfo::evCGI1, XrdOfsEvsFormat::Null},
00281 {"LFN2", XrdOfsEvsInfo::evLFN2, XrdOfsEvsFormat::Null},
00282 {"CGI2", XrdOfsEvsInfo::evCGI2, XrdOfsEvsFormat::Null},
00283 {"FMODE", XrdOfsEvsInfo::evFMODE, XrdOfsEvsFormat::cvtMode},
00284 {"FSIZE", XrdOfsEvsInfo::evFSIZE, XrdOfsEvsFormat::cvtFSize}
00285 };
00286 int numvars = sizeof(Vars)/sizeof(struct valVar);
00287 char parms[1024], *pP = parms;
00288 char *pE = parms+sizeof(parms)-((XrdOfsEvsInfo::evARGS*2)-8);
00289 char varbuff[16], *bVar, *eVar;
00290 int i, j, aNum = 0, Args[XrdOfsEvsInfo::evARGS] = {0};
00291 XrdOfsEvsFormat::evFlags ArgOpts = XrdOfsEvsFormat::freeFmt;
00292
00293
00294
00295 parms[0] = '\0';
00296 while(*mText && pP < pE)
00297 {if (*mText == '\\' && *(mText+1) == '$')
00298 {*pP++ = '$'; mText += 2; continue;}
00299 else if (*mText != '$') {*pP++ = *mText++; continue;}
00300 bVar = mText+1;
00301 if (*mText == '{') {eVar = index(mText, '}'); j = 1;}
00302 else if (*mText == '[') {eVar = index(mText, ']'); j = 1;}
00303 else {eVar = bVar; while(isalpha(*eVar)) eVar++; j = 0;}
00304 i = eVar - bVar;
00305 if (i < 1 || i >= (int)sizeof(varbuff))
00306 {Eroute.Emsg("Parse","Invalid notifymsg variable starting at",mText);
00307 return 1;
00308 }
00309 strncpy(varbuff, bVar, i); varbuff[i] = '\0';
00310 for (i = 0; i < numvars; i++)
00311 if (!strcmp(varbuff, Vars[i].vname)) break;
00312 if (i >= numvars)
00313 {Eroute.Emsg("Parse", "Unknown notifymsg variable -",varbuff);
00314 return 1;
00315 }
00316 if (aNum >= XrdOfsEvsInfo::evARGS)
00317 {Eroute.Say("Parse", "Too many notifymsg variables"); return 1;}
00318 strcpy(pP, "%s"); pP += 2;
00319 Args[aNum++] = Vars[i].vnum;
00320 ArgOpts = static_cast<XrdOfsEvsFormat::evFlags>(ArgOpts|Vars[i].vopt);
00321 mText = eVar+j;
00322 }
00323
00324
00325
00326 if (pP >= pE)
00327 {Eroute.Emsg("Parse","notifymsg text too long");return 1;}
00328 if (!parms[0])
00329 {Eroute.Emsg("Parse","notifymsg text not specified");return 1;}
00330
00331
00332
00333 strcpy(pP, "\n");
00334 eNum = static_cast<Event>(eNum & Mask);
00335 MsgFmt[eNum].Set(ArgOpts, strdup(parms), Args);
00336
00337
00338
00339 return 0;
00340 }
00341
00342
00343
00344
00345
00346 void XrdOfsEvs::sendEvents(void)
00347 {
00348 XrdOfsEvsMsg *tp;
00349 const char *theData[2] = {0,0};
00350 int theDlen[2] = {0,0};
00351
00352
00353
00354
00355
00356 while(1)
00357 {qSem.Wait();
00358 qMut.Lock();
00359 if (endIT) break;
00360 if ((tp = msgFirst) && !(msgFirst = tp->next)) msgLast = 0;
00361 qMut.UnLock();
00362 if (tp)
00363 {if (!theProg) Feed(tp->text, tp->tlen);
00364 else {theData[0] = tp->text; theDlen[0] = tp->tlen;
00365 theProg->Feed(theData, theDlen);
00366 }
00367 retMsg(tp);
00368 }
00369 }
00370 qMut.UnLock();
00371 }
00372
00373
00374
00375
00376
00377 int XrdOfsEvs::Start(XrdSysError *eobj)
00378 {
00379 int rc;
00380
00381
00382
00383 eDest = eobj;
00384
00385
00386
00387 if (*theTarget == '>')
00388 {XrdNetSocket *msgSock;
00389 if (!(msgSock = XrdNetSocket::Create(eobj,theTarget+1,0,0660,XRDNET_FIFO)))
00390 return -1;
00391 msgFD = msgSock->Detach();
00392 delete msgSock;
00393
00394 } else {
00395
00396
00397
00398 if (theProg) return 0;
00399 theProg = new XrdOucProg(eobj);
00400
00401
00402
00403 if (theProg->Setup(theTarget, eobj)) return -1;
00404 if ((rc = theProg->Start()))
00405 {eobj->Emsg("Evs", rc, "start event collector"); return -1;}
00406 }
00407
00408
00409
00410 if ((rc = XrdSysThread::Run(&tid, XrdOfsEvsSend, static_cast<void *>(this),
00411 0, "Event notification sender")))
00412 {eobj->Emsg("Evs", rc, "create event notification thread");
00413 return -1;
00414 }
00415
00416
00417
00418 return 0;
00419 }
00420
00421
00422
00423
00424
00425
00426
00427
00428 const char *XrdOfsEvs::eName(int eNum)
00429 {
00430 static const char *eventName[] = {"Chmod", "closer", "closew", "create",
00431 "fwrite", "mkdir", "mv", "openr",
00432 "opnw", "rm", "rmdir", "trunc"};
00433
00434 eNum = (eNum & Mask);
00435 return (eNum < 0 || eNum >= nCount ? "?" : eventName[eNum]);
00436 }
00437
00438
00439
00440
00441
00442 int XrdOfsEvs::Feed(const char *data, int dlen)
00443 {
00444 int retc;
00445
00446
00447
00448 do { retc = write(msgFD, (const void *)data, (size_t)dlen);}
00449 while (retc < 0 && errno == EINTR);
00450 if (retc < 0)
00451 {eDest->Emsg("EvsFeed", errno, "write to event socket", theTarget);
00452 return -1;
00453 }
00454
00455
00456
00457 return 0;
00458 }
00459
00460
00461
00462
00463
00464 XrdOfsEvsMsg *XrdOfsEvs::getMsg(int bigmsg)
00465 {
00466 XrdOfsEvsMsg *tp;
00467 int msz = 0;
00468
00469
00470
00471 fMut.Lock();
00472
00473
00474
00475 if (bigmsg)
00476 if ((tp = msgFreeMax)) msgFreeMax = tp->next;
00477 else msz = maxMsgSize;
00478 else if ((tp = msgFreeMin)) msgFreeMin = tp->next;
00479 else msz = minMsgSize;
00480
00481
00482
00483 if (!tp && (numMax + numMin) < (maxMax + maxMin))
00484 {if ((tp = new XrdOfsEvsMsg((char *)malloc(msz), bigmsg)))
00485 {if (!(tp->text)) {delete tp; tp = 0;}
00486 else if (bigmsg) numMax++;
00487 else numMin++;
00488 }
00489 }
00490
00491
00492
00493 fMut.UnLock();
00494 return tp;
00495 }
00496
00497
00498
00499
00500
00501 void XrdOfsEvs::retMsg(XrdOfsEvsMsg *tp)
00502 {
00503
00504
00505
00506 fMut.Lock();
00507
00508
00509
00510 if (tp->isBig)
00511 if (numMax > maxMax) {delete tp; numMax--;}
00512 else {tp->next = msgFreeMax; msgFreeMax = tp;}
00513 else
00514 if (numMin > maxMin) {delete tp; numMin--;}
00515 else {tp->next = msgFreeMin; msgFreeMin = tp;}
00516
00517
00518
00519 fMut.UnLock();
00520 }