00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013 const char *XrdBwmLoggerCVSID = "$Id: XrdBwmLogger.cc 30949 2009-11-02 16:37:58Z ganis $";
00014
00015 #include <ctype.h>
00016 #include <stddef.h>
00017 #include <stdlib.h>
00018 #include <stdio.h>
00019 #include <string.h>
00020
00021 #include "XrdBwm/XrdBwmLogger.hh"
00022 #include "XrdSys/XrdSysError.hh"
00023 #include "XrdOuc/XrdOucProg.hh"
00024 #include "XrdOuc/XrdOucStream.hh"
00025 #include "XrdNet/XrdNetOpts.hh"
00026 #include "XrdNet/XrdNetSocket.hh"
00027 #include "XrdSys/XrdSysPlatform.hh"
00028
00029
00030
00031
00032
00033 class XrdBwmLoggerMsg
00034 {
00035 public:
00036
00037 static const int msgSize = 2048;
00038
00039 XrdBwmLoggerMsg *next;
00040 char Text[msgSize];
00041 int Tlen;
00042
00043 XrdBwmLoggerMsg() : next(0), Tlen(0) {}
00044
00045 ~XrdBwmLoggerMsg() {}
00046 };
00047
00048
00049
00050
00051
00052 void *XrdBwmLoggerSend(void *pp)
00053 {
00054 XrdBwmLogger *lP = (XrdBwmLogger *)pp;
00055 lP->sendEvents();
00056 return (void *)0;
00057 }
00058
00059
00060
00061
00062
00063 XrdBwmLogger::XrdBwmLogger(const char *Target)
00064 {
00065
00066
00067
00068 theTarget = strdup(Target);
00069 eDest = 0;
00070 theProg = 0;
00071 msgFirst = msgLast = msgFree = 0;
00072 tid = 0;
00073 msgFD = 0;
00074 endIT = 0;
00075 theEOL= '\n';
00076 }
00077
00078
00079
00080
00081
00082 XrdBwmLogger::~XrdBwmLogger()
00083 {
00084 XrdBwmLoggerMsg *tp;
00085
00086
00087
00088
00089
00090 endIT = 1;
00091 if (tid) XrdSysThread::Kill(tid);
00092
00093
00094
00095 qMut.Lock();
00096 while ((tp = msgFirst)) {msgFirst = tp->next; delete tp;}
00097 if (theTarget) free(theTarget);
00098 if (msgFD >= 0) close(msgFD);
00099 if (theProg) delete theProg;
00100 qMut.UnLock();
00101
00102
00103
00104 fMut.Lock();
00105 while ((tp = msgFree)) {msgFree = tp->next; delete tp;}
00106 fMut.UnLock();
00107 }
00108
00109
00110
00111
00112
00113 void XrdBwmLogger::Event(Info &eInfo)
00114 {
00115 static int warnings = 0;
00116 XrdBwmLoggerMsg *tp;
00117
00118
00119
00120 if (!(tp = getMsg()))
00121 {if ((++warnings & 0xff) == 1)
00122 eDest->Emsg("Notify", "Ran out of logger message objects;",
00123 eInfo.Tident, "event not logged.");
00124 return;
00125 }
00126
00127
00128
00129 tp->Tlen = snprintf(tp->Text, XrdBwmLoggerMsg::msgSize,
00130 "<stats id=\"bwm\"><tid>%s</tid><lfn>%s</lfn>"
00131 "<lcl>%s</lcl><rmt>%s</rmt><flow>%c</flow>"
00132 "<at>%ld</at><bt>%ld</bt><ct>%ld</ct>"
00133 "<iq>%d</iq><oq>%d</oq><xq>%d</xq>"
00134 "<sz>%lld<sz><esec>%d</esec></stats>%c",
00135 eInfo.Tident, eInfo.Lfn, eInfo.lclNode, eInfo.rmtNode,
00136 eInfo.Flow, eInfo.ATime, eInfo.BTime, eInfo.CTime,
00137 eInfo.numqIn, eInfo.numqOut, eInfo.numqXeq, eInfo.Size,
00138 eInfo.ESec, theEOL);
00139
00140
00141
00142 tp->next = 0;
00143 qMut.Lock();
00144 if (msgLast) {msgLast->next = tp; msgLast = tp;}
00145 else msgFirst = msgLast = tp;
00146 qMut.UnLock();
00147 qSem.Post();
00148 }
00149
00150
00151
00152
00153
00154 void XrdBwmLogger::sendEvents(void)
00155 {
00156 XrdBwmLoggerMsg *tp;
00157 const char *theData[2] = {0,0};
00158 int theDlen[2] = {0,0};
00159
00160
00161
00162
00163
00164 while(1)
00165 {qSem.Wait();
00166 qMut.Lock();
00167 if (endIT) break;
00168 if ((tp = msgFirst) && !(msgFirst = tp->next)) msgLast = 0;
00169 qMut.UnLock();
00170 if (tp)
00171 {if (!theProg) Feed(tp->Text, tp->Tlen);
00172 else {theData[0] = tp->Text; theDlen[0] = tp->Tlen;
00173 theProg->Feed(theData, theDlen);
00174
00175 }
00176 retMsg(tp);
00177 }
00178 }
00179 qMut.UnLock();
00180 }
00181
00182
00183
00184
00185
00186 int XrdBwmLogger::Start(XrdSysError *eobj)
00187 {
00188 int rc;
00189
00190
00191
00192 eDest = eobj;
00193
00194
00195
00196 if (!strcmp("*", theTarget)) {msgFD = -1; theEOL = '\0';}
00197 else if (*theTarget == '>')
00198 {XrdNetSocket *msgSock;
00199 if (!(msgSock = XrdNetSocket::Create(eobj, theTarget+1, 0, 0660,
00200 XRDNET_FIFO))) return -1;
00201 msgFD = msgSock->Detach();
00202 delete msgSock;
00203 }
00204 else {
00205
00206 if (theProg) return 0;
00207 theProg = new XrdOucProg(eobj);
00208
00209
00210
00211 if (theProg->Setup(theTarget, eobj)) return -1;
00212 if ((rc = theProg->Start()))
00213 {eobj->Emsg("Logger", rc, "start event collector"); return -1;}
00214 }
00215
00216
00217
00218 if ((rc = XrdSysThread::Run(&tid, XrdBwmLoggerSend, static_cast<void *>(this),
00219 0, "Log message sender")))
00220 {eobj->Emsg("Logger", rc, "create log message sender thread");
00221 return -1;
00222 }
00223
00224
00225
00226 return 0;
00227 }
00228
00229
00230
00231
00232
00233
00234
00235
00236 int XrdBwmLogger::Feed(const char *data, int dlen)
00237 {
00238 int retc;
00239
00240
00241
00242 if (msgFD < 0) {eDest->Say("", data); return 0;}
00243
00244
00245
00246 do { retc = write(msgFD, (const void *)data, (size_t)dlen);}
00247 while (retc < 0 && errno == EINTR);
00248 if (retc < 0)
00249 {eDest->Emsg("Feed", errno, "write to logger socket", theTarget);
00250 return -1;
00251 }
00252
00253
00254
00255 return 0;
00256 }
00257
00258
00259
00260
00261
00262 XrdBwmLoggerMsg *XrdBwmLogger::getMsg()
00263 {
00264 XrdBwmLoggerMsg *tp;
00265
00266
00267
00268 fMut.Lock();
00269
00270
00271
00272 if (msgsInQ >= maxmInQ) tp = 0;
00273 else {if ((tp = msgFree)) msgFree = tp->next;
00274 else tp = new XrdBwmLoggerMsg();
00275 msgsInQ++;
00276 }
00277
00278
00279
00280 fMut.UnLock();
00281 return tp;
00282 }
00283
00284
00285
00286
00287
00288 void XrdBwmLogger::retMsg(XrdBwmLoggerMsg *tp)
00289 {
00290
00291
00292
00293 fMut.Lock();
00294 tp->next = msgFree;
00295 msgFree = tp;
00296 msgsInQ--;
00297 fMut.UnLock();
00298 }