00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
// Revision identification string (expanded "$Id$" keyword) for this file.
const char *XrdXrootdMonitorCVSID =
   "$Id: XrdXrootdMonitor.cc 34000 2010-06-21 06:49:56Z ganis $";
00014
00015 #include <errno.h>
00016 #include <stdlib.h>
00017 #include <time.h>
00018 #include <unistd.h>
00019 #include <sys/socket.h>
00020 #include <sys/types.h>
00021 #if !defined(__macos__) && !defined(__FreeBSD__)
00022 #include <malloc.h>
00023 #endif
00024
00025 #include "XrdNet/XrdNet.hh"
00026 #include "XrdNet/XrdNetDNS.hh"
00027 #include "XrdNet/XrdNetPeer.hh"
00028 #include "XrdSys/XrdSysError.hh"
00029 #include "XrdSys/XrdSysPlatform.hh"
00030
00031 #include "Xrd/XrdScheduler.hh"
00032 #include "XrdXrootd/XrdXrootdMonitor.hh"
00033 #include "XrdXrootd/XrdXrootdTrace.hh"
00034
00035
00036
00037
00038
00039 XrdScheduler *XrdXrootdMonitor::Sched = 0;
00040 XrdSysError *XrdXrootdMonitor::eDest = 0;
00041 int XrdXrootdMonitor::monFD;
00042 char *XrdXrootdMonitor::Dest1 = 0;
00043 int XrdXrootdMonitor::monMode1 = 0;
00044 struct sockaddr XrdXrootdMonitor::InetAddr1;
00045 char *XrdXrootdMonitor::Dest2 = 0;
00046 int XrdXrootdMonitor::monMode2 = 0;
00047 struct sockaddr XrdXrootdMonitor::InetAddr2;
00048 XrdXrootdMonitor *XrdXrootdMonitor::altMon = 0;
00049 XrdSysMutex XrdXrootdMonitor::windowMutex;
00050 kXR_int32 XrdXrootdMonitor::startTime = 0;
00051 int XrdXrootdMonitor::monBlen = 0;
00052 int XrdXrootdMonitor::lastEnt = 0;
00053 int XrdXrootdMonitor::isEnabled = 0;
00054 int XrdXrootdMonitor::numMonitor = 0;
00055 int XrdXrootdMonitor::autoFlush = 600;
00056 int XrdXrootdMonitor::FlushTime = 0;
00057 kXR_int32 XrdXrootdMonitor::currWindow = 0;
00058 kXR_int32 XrdXrootdMonitor::sizeWindow = 60;
00059 char XrdXrootdMonitor::monINFO = 0;
00060 char XrdXrootdMonitor::monIO = 0;
00061 char XrdXrootdMonitor::monFILE = 0;
00062 char XrdXrootdMonitor::monSTAGE = 0;
00063 char XrdXrootdMonitor::monUSER = 0;
00064
00065
00066
00067
00068
00069 extern XrdOucTrace *XrdXrootdTrace;
00070
00071
00072
00073
00074
00075
00076
00077
00078 class XrdXrootdMonitor_Tick : public XrdJob
00079 {
00080 public:
00081
00082 void DoIt() {
00083 #ifndef NODEBUG
00084 const char *TraceID = "MonTick";
00085 #endif
00086 time_t Now = XrdXrootdMonitor::Tick();
00087 if (Window && Now)
00088 Sched->Schedule((XrdJob *)this, Now+Window);
00089 else {TRACE(DEBUG, "Monitor clock stopping.");}
00090 }
00091
00092 void Set(XrdScheduler *sp, int intvl) {Sched = sp; Window = intvl;}
00093
00094 XrdXrootdMonitor_Tick() : XrdJob("monitor window clock")
00095 {Sched = 0; Window = 0;}
00096 ~XrdXrootdMonitor_Tick() {}
00097
00098 private:
00099 XrdScheduler *Sched;
00100 int Window;
00101 };
00102
00103
00104
00105
00106
00107 class XrdXrootdMonitorLock
00108 {
00109 public:
00110
00111 static void Lock() {monLock.Lock();}
00112
00113 static void UnLock() {monLock.UnLock();}
00114
00115 XrdXrootdMonitorLock(XrdXrootdMonitor *theMonitor)
00116 {if (theMonitor != XrdXrootdMonitor::altMon) unLock = 0;
00117 else {unLock = 1; monLock.Lock();}
00118 }
00119 ~XrdXrootdMonitorLock() {if (unLock) monLock.UnLock();}
00120
00121 private:
00122
00123 static XrdSysMutex monLock;
00124 char unLock;
00125 };
00126
00127 XrdSysMutex XrdXrootdMonitorLock::monLock;
00128
00129
00130
00131
00132
00133 XrdXrootdMonitor::XrdXrootdMonitor()
00134 {
00135 kXR_int32 localWindow;
00136
00137
00138
00139 windowMutex.Lock();
00140 localWindow = currWindow;
00141 windowMutex.UnLock();
00142
00143
00144
00145 if (!(monBuff = (XrdXrootdMonBuff *)memalign(getpagesize(), monBlen)))
00146 eDest->Emsg("Monitor", "Unable to allocate monitor buffer.");
00147 else {nextEnt = 1;
00148 monBuff->info[0].arg0.rTot[0] = 0;
00149 monBuff->info[0].arg0.id[0] = XROOTD_MON_WINDOW;
00150 monBuff->info[0].arg1.Window =
00151 monBuff->info[0].arg2.Window =
00152 static_cast<kXR_int32>(ntohl(localWindow));
00153 }
00154 }
00155
00156
00157
00158
00159
00160 XrdXrootdMonitor::~XrdXrootdMonitor()
00161 {
00162
00163 if (monBuff) {Flush(); free(monBuff);}
00164 }
00165
00166
00167
00168
00169
00170 void XrdXrootdMonitor::appID(char *id)
00171 {
00172
00173
00174
00175 if (this == altMon || !*id) return;
00176
00177
00178
00179 if (lastWindow != currWindow) Mark();
00180 else if (nextEnt == lastEnt) Flush();
00181 monBuff->info[nextEnt].arg0.id[0] = XROOTD_MON_APPID;
00182 strncpy((char *)&monBuff->info[nextEnt].arg0.id[4], id,
00183 sizeof(XrdXrootdMonTrace)-4);
00184 }
00185
00186
00187
00188
00189
00190 XrdXrootdMonitor *XrdXrootdMonitor::Alloc(int force)
00191 {
00192 XrdXrootdMonitor *mp;
00193 int lastVal;
00194
00195
00196
00197
00198 if (!isEnabled || (isEnabled < 0 && !force)) mp = 0;
00199 else if (!monIO) mp = altMon;
00200 else if ((mp = new XrdXrootdMonitor()))
00201 if (!(mp->monBuff)) {delete mp; mp = 0;}
00202
00203
00204
00205 if (mp && isEnabled < 0)
00206 {windowMutex.Lock();
00207 lastVal = numMonitor; numMonitor++;
00208 if (!lastVal) startClock();
00209 windowMutex.UnLock();
00210 }
00211
00212
00213
00214 return mp;
00215 }
00216
00217
00218
00219
00220
00221 void XrdXrootdMonitor::Close(kXR_unt32 dictid, long long rTot, long long wTot)
00222 {
00223 XrdXrootdMonitorLock mLock(this);
00224 unsigned int rVal, wVal;
00225
00226
00227
00228 if (lastWindow != currWindow) Mark();
00229 else if (nextEnt == lastEnt) Flush();
00230 monBuff->info[nextEnt].arg0.id[0] = XROOTD_MON_CLOSE;
00231 monBuff->info[nextEnt].arg0.id[1] = do_Shift(rTot, rVal);
00232 monBuff->info[nextEnt].arg0.rTot[1] = htonl(rVal);
00233 monBuff->info[nextEnt].arg0.id[2] = do_Shift(wTot, wVal);
00234 monBuff->info[nextEnt].arg0.id[3] = 0;
00235 monBuff->info[nextEnt].arg1.wTot = htonl(wVal);
00236 monBuff->info[nextEnt++].arg2.dictid = dictid;
00237
00238
00239
00240 if (altMon && this != altMon) altMon->Dup(&monBuff->info[nextEnt-1]);
00241 }
00242
00243
00244
00245
00246
00247 void XrdXrootdMonitor::Disc(kXR_unt32 dictid, int csec)
00248 {
00249 XrdXrootdMonitorLock mLock(this);
00250
00251
00252
00253 if (this != altMon && monUSER == 1 && altMon)
00254 {altMon->Disc(dictid, csec); return;}
00255
00256
00257
00258 if (lastWindow != currWindow) Mark();
00259 else if (nextEnt == lastEnt) Flush();
00260 monBuff->info[nextEnt].arg0.rTot[0] = 0;
00261 monBuff->info[nextEnt].arg0.id[0] = XROOTD_MON_DISC;
00262 monBuff->info[nextEnt].arg1.wTot = htonl(csec);
00263 monBuff->info[nextEnt++].arg2.dictid = dictid;
00264
00265
00266
00267 if (altMon && this != altMon && monUSER == 3)
00268 altMon->Dup(&monBuff->info[nextEnt-1]);
00269 }
00270
00271
00272
00273
00274
00275 void XrdXrootdMonitor::Defaults(char *dest1, int mode1, char *dest2, int mode2)
00276 {
00277 int mmode;
00278
00279
00280
00281 if (!dest1)
00282 {mode1 = (dest1 = dest2) ? mode2 : 0;
00283 dest2 = 0; mode2 = 0;
00284 } else if (!dest2) mode2 = 0;
00285
00286
00287
00288
00289 if (Dest1) free(Dest1);
00290 Dest1 = dest1; monMode1 = mode1;
00291 if (Dest2) free(Dest2);
00292 Dest2 = dest2; monMode2 = mode2;
00293
00294
00295
00296 mmode = mode1 | mode2;
00297 isEnabled = (mmode & XROOTD_MON_ALL ? 1 :-1);
00298 monIO = (mmode & XROOTD_MON_IO ? 1 : 0);
00299 monINFO = (mmode & XROOTD_MON_INFO ? 1 : 0);
00300 monFILE = (mmode & XROOTD_MON_FILE ? 1 : 0) | monIO;
00301 monSTAGE = (mmode & XROOTD_MON_STAGE? 1 : 0);
00302 monUSER = (mmode & XROOTD_MON_USER ? 1 : 0);
00303
00304
00305
00306 if (((mode1 & XROOTD_MON_IO) && (mode1 & XROOTD_MON_USER))
00307 || ((mode2 & XROOTD_MON_IO) && (mode2 & XROOTD_MON_USER)))
00308 {if ((!(mode1 & XROOTD_MON_IO) && (mode1 & XROOTD_MON_USER))
00309 || (!(mode2 & XROOTD_MON_IO) && (mode2 & XROOTD_MON_USER))) monUSER = 3;
00310 else monUSER = 2;
00311 }
00312
00313
00314
00315 if (Dest1 == 0 && Dest2 == 0) isEnabled = 0;
00316 }
00317
00318
00319
00320 void XrdXrootdMonitor::Defaults(int msz, int wsz, int flush)
00321 {
00322
00323
00324
00325 sizeWindow = (wsz <= 0 ? 60 : wsz);
00326 autoFlush = (flush <= 0 ? 600 : flush);
00327
00328
00329
00330 if (msz <= 0) msz = 8192;
00331 else if (msz < 1024) msz = 1024;
00332 lastEnt = (msz-sizeof(XrdXrootdMonHeader))/sizeof(XrdXrootdMonTrace);
00333 monBlen = (lastEnt*sizeof(XrdXrootdMonTrace))+sizeof(XrdXrootdMonHeader);
00334 lastEnt--;
00335 startTime = htonl(time(0));
00336 }
00337
00338
00339
00340
00341
00342 void XrdXrootdMonitor::Dup(XrdXrootdMonTrace *mrec)
00343 {
00344 XrdXrootdMonitorLock mLock(this);
00345
00346
00347
00348 if (lastWindow != currWindow) Mark();
00349 else if (nextEnt == lastEnt) Flush();
00350 memcpy(&monBuff->info[nextEnt],(const void *)mrec,sizeof(XrdXrootdMonTrace));
00351 nextEnt++;
00352 }
00353
00354
00355
00356
00357
00358 int XrdXrootdMonitor::Init(XrdScheduler *sp, XrdSysError *errp)
00359 {
00360 XrdNet myNetwork(errp, 0);
00361 XrdNetPeer monDest;
00362 char *etext;
00363
00364
00365
00366 Sched = sp;
00367 eDest = errp;
00368
00369
00370
00371 if (!isEnabled) return 1;
00372
00373
00374
00375 if (!myNetwork.Relay(monDest, Dest1, XRDNET_SENDONLY)) return 0;
00376 monFD = monDest.fd;
00377
00378
00379
00380 if (!XrdNetDNS::Host2Dest(Dest1, InetAddr1, &etext))
00381 {eDest->Emsg("Monitor", "setup monitor collector;", etext);
00382 return 0;
00383 }
00384
00385
00386
00387 if (Dest2 && !XrdNetDNS::Host2Dest(Dest2, InetAddr2, &etext))
00388 {eDest->Emsg("Monitor", "setup monitor collector;", etext);
00389 return 0;
00390 }
00391
00392
00393
00394
00395 if ((monMode1 && !(monMode1 & XROOTD_MON_IO))
00396 || (monMode2 && !(monMode2 & XROOTD_MON_IO)))
00397 if (!(altMon = new XrdXrootdMonitor()) || !altMon->monBuff)
00398 {if (altMon) {delete altMon; altMon = 0;}
00399 eDest->Emsg("Monitor","allocate monitor; insufficient storage.");
00400 return 0;
00401 }
00402
00403
00404
00405 if (isEnabled > 0) startClock();
00406
00407
00408
00409 return 1;
00410 }
00411
00412
00413
00414
00415
00416 kXR_unt32 XrdXrootdMonitor::Map(const char code,
00417 const char *uname, const char *path)
00418 {
00419 static XrdSysMutex seqMutex;
00420 static unsigned int monSeqID = 1;
00421 XrdXrootdMonMap map;
00422 int size, montype;
00423 unsigned int mySeqID;
00424
00425
00426
00427 seqMutex.Lock();
00428 mySeqID = monSeqID++;
00429 seqMutex.UnLock();
00430
00431
00432
00433 map.dictid = htonl(mySeqID);
00434 strcpy(map.info, uname);
00435 size = strlen(uname);
00436 if (path)
00437 {*(map.info+size) = '\n';
00438 strlcpy(map.info+size+1, path, sizeof(map.info)-size-1);
00439 size = size + strlen(path) + 1;
00440 }
00441
00442
00443
00444 size = sizeof(XrdXrootdMonHeader)+sizeof(kXR_int32)+size;
00445 fillHeader(&map.hdr, code, size);
00446
00447
00448
00449 if (code == XROOTD_MON_MAPUSER) montype = XROOTD_MON_USER;
00450 else if (code == XROOTD_MON_MAPPATH) montype = XROOTD_MON_PATH;
00451 else if (code == XROOTD_MON_MAPSTAG) montype = XROOTD_MON_STAGE;
00452 else montype = XROOTD_MON_INFO;
00453 Send(montype, (void *)&map, size);
00454
00455
00456
00457 return map.dictid;
00458 }
00459
00460
00461
00462
00463
00464
00465 void XrdXrootdMonitor::Open(kXR_unt32 dictid, off_t fsize)
00466 {
00467 XrdXrootdMonitorLock mLock(this);
00468
00469 if (lastWindow != currWindow) Mark();
00470 else if (nextEnt == lastEnt) Flush();
00471 h2nll(fsize, monBuff->info[nextEnt].arg0.val);
00472 monBuff->info[nextEnt].arg0.id[0] = XROOTD_MON_OPEN;
00473 monBuff->info[nextEnt].arg1.buflen = 0;
00474 monBuff->info[nextEnt++].arg2.dictid = dictid;
00475
00476
00477
00478 if (altMon && this != altMon) altMon->Dup(&monBuff->info[nextEnt-1]);
00479 }
00480
00481
00482
00483
00484
00485 time_t XrdXrootdMonitor::Tick()
00486 {
00487 time_t Now;
00488 windowMutex.Lock();
00489 Now = time(0);
00490 currWindow = static_cast<kXR_int32>(Now);
00491 if (isEnabled < 0 && !numMonitor) Now = 0;
00492 windowMutex.UnLock();
00493
00494
00495
00496 if (altMon && currWindow >= FlushTime)
00497 {XrdXrootdMonitorLock::Lock();
00498 if (currWindow >= FlushTime)
00499 {if (altMon->nextEnt > 1) altMon->Flush();
00500 else FlushTime = currWindow + autoFlush;
00501 }
00502 XrdXrootdMonitorLock::UnLock();
00503 }
00504
00505
00506
00507 return Now;
00508 }
00509
00510
00511
00512
00513
00514 void XrdXrootdMonitor::unAlloc(XrdXrootdMonitor *monp)
00515 {
00516
00517
00518
00519 if (monp != altMon) delete monp;
00520
00521
00522
00523 if (isEnabled < 0)
00524 {windowMutex.Lock();
00525 numMonitor--;
00526 windowMutex.UnLock();
00527 }
00528 }
00529
00530
00531
00532
00533
00534
00535
00536
00537 unsigned char XrdXrootdMonitor::do_Shift(long long xTot, unsigned int &xVal)
00538 {
00539 const long long smask = 0x7fffffff00000000LL;
00540 unsigned char xshift = 0;
00541
00542 while(xTot & smask) {xTot = xTot >> 1LL; xshift++;}
00543 xVal = static_cast<unsigned int>(xTot);
00544
00545 return xshift;
00546 }
00547
00548
00549
00550
00551
00552 void XrdXrootdMonitor::fillHeader(XrdXrootdMonHeader *hdr,
00553 const char id, int size)
00554 { static XrdSysMutex seqMutex;
00555 static int seq = 0;
00556 int myseq;
00557
00558
00559
00560 seqMutex.Lock();
00561 myseq = 0x00ff & (seq++);
00562 seqMutex.UnLock();
00563
00564
00565
00566 hdr->code = static_cast<kXR_char>(id);
00567 hdr->pseq = static_cast<kXR_char>(myseq);
00568 hdr->plen = htons(static_cast<uint16_t>(size));
00569 hdr->stod = startTime;
00570 }
00571
00572
00573
00574
00575
00576 void XrdXrootdMonitor::Flush()
00577 {
00578 int size;
00579 kXR_int32 localWindow, now;
00580
00581
00582
00583 if (nextEnt <= 1) return;
00584
00585
00586
00587
00588 windowMutex.Lock();
00589 localWindow = currWindow;
00590 windowMutex.UnLock();
00591
00592
00593
00594 size = (nextEnt+1)*sizeof(XrdXrootdMonTrace)+sizeof(XrdXrootdMonHeader);
00595 fillHeader(&monBuff->hdr, XROOTD_MON_MAPTRCE, size);
00596
00597
00598
00599 if (monBuff->info[0].arg2.Window != localWindow) now = localWindow;
00600 else now = localWindow + sizeWindow;
00601
00602
00603
00604 monBuff->info[nextEnt].arg0.rTot[0] = 0;
00605 monBuff->info[nextEnt].arg0.id[0] = XROOTD_MON_WINDOW;
00606 monBuff->info[nextEnt].arg1.Window =
00607 monBuff->info[nextEnt].arg2.Window = htonl(now);
00608
00609 if (this != altMon) Send(XROOTD_MON_IO, (void *)monBuff, size);
00610 else {Send(XROOTD_MON_FILE, (void *)monBuff, size);
00611 FlushTime = localWindow + autoFlush;
00612 }
00613
00614 monBuff->info[0].arg0.rTot[0] = 0;
00615 monBuff->info[0].arg0.id[0] = XROOTD_MON_WINDOW;
00616 monBuff->info[0].arg1.Window =
00617 monBuff->info[0].arg2.Window = htonl(localWindow);
00618 nextEnt = 1;
00619 }
00620
00621
00622
00623
00624
00625 void XrdXrootdMonitor::Mark()
00626 {
00627 kXR_int32 localWindow;
00628
00629
00630
00631
00632 windowMutex.Lock();
00633 localWindow = currWindow;
00634 windowMutex.UnLock();
00635
00636
00637
00638 if (monBuff->info[nextEnt-1].arg0.id[0] == XROOTD_MON_WINDOW)
00639 monBuff->info[nextEnt-1].arg2.Window =
00640 static_cast<kXR_int32>(ntohl(localWindow));
00641 else if (nextEnt+8 > lastEnt) Flush();
00642 else {monBuff->info[nextEnt].arg0.rTot[0] = 0;
00643 monBuff->info[nextEnt].arg0.id[0] = XROOTD_MON_WINDOW;
00644 monBuff->info[nextEnt].arg1.Window =
00645 static_cast<kXR_int32>(ntohl(lastWindow));
00646 monBuff->info[nextEnt].arg2.Window =
00647 static_cast<kXR_int32>(ntohl(localWindow));
00648 nextEnt++;
00649 }
00650 lastWindow = localWindow;
00651 }
00652
00653
00654
00655
00656
00657 int XrdXrootdMonitor::Send(int monMode, void *buff, int blen)
00658 {
00659 #ifndef NODEBUG
00660 const char *TraceID = "Monitor";
00661 #endif
00662 static XrdSysMutex sendMutex;
00663 int rc1, rc2;
00664
00665 sendMutex.Lock();
00666 if (monMode & monMode1)
00667 {rc1 = (int)sendto(monFD, buff, blen, 0,
00668 (const struct sockaddr *)&InetAddr1, sizeof(sockaddr));
00669 TRACE(DEBUG,blen <<" bytes sent to " <<Dest1 <<" rc=" <<(rc1 ? errno : 0));
00670 }
00671 else rc1 = 0;
00672 if (monMode & monMode2)
00673 {rc2 = (int)sendto(monFD, buff, blen, 0,
00674 (const struct sockaddr *)&InetAddr2, sizeof(sockaddr));
00675 TRACE(DEBUG,blen <<" bytes sent to " <<Dest2 <<" rc=" <<(rc2 ? errno : 0));
00676 }
00677 else rc2 = 0;
00678 sendMutex.UnLock();
00679
00680 return (rc1 > rc2 ? rc1 : rc2);
00681 }
00682
00683
00684
00685
00686
00687 void XrdXrootdMonitor::startClock()
00688 {
00689 static XrdXrootdMonitor_Tick MonTick;
00690 time_t Now;
00691
00692
00693
00694 Now = time(0);
00695 currWindow = static_cast<kXR_int32>(Now);
00696 MonTick.Set(Sched, sizeWindow);
00697 FlushTime = autoFlush + currWindow;
00698 if (Sched) Sched->Schedule((XrdJob *)&MonTick, Now+sizeWindow);
00699 }