XrdMonCtrArchiver.cc

Go to the documentation of this file.
00001 /*****************************************************************************/
00002 /*                                                                           */
00003 /*                           XrdMonCtrArchiver.cc                            */
00004 /*                                                                           */
00005 /* (c) 2005 by the Board of Trustees of the Leland Stanford, Jr., University */
00006 /*                            All Rights Reserved                            */
00007 /*       Produced by Jacek Becla for Stanford University under contract      */
00008 /*              DE-AC02-76SF00515 with the Department of Energy              */
00009 /*****************************************************************************/
00010 
00011 // $Id: XrdMonCtrArchiver.cc 24468 2008-06-22 16:47:03Z ganis $
00012 
00013 #include "XrdMon/XrdMonCommon.hh"
00014 #include "XrdMon/XrdMonCtrAdmin.hh"
00015 #include "XrdMon/XrdMonCtrArchiver.hh"
00016 #include "XrdMon/XrdMonCtrBuffer.hh"
00017 #include "XrdMon/XrdMonCtrPacket.hh"
00018 #include "XrdMon/XrdMonCtrWriter.hh"
00019 #include "XrdMon/XrdMonDecPacketDecoder.hh"
00020 #include "XrdMon/XrdMonErrors.hh"
00021 #include "XrdMon/XrdMonException.hh"
00022 #include "XrdMon/XrdMonSenderInfo.hh"
00023 
00024 #include "XrdSys/XrdSysHeaders.hh"
00025 
00026 #include <sys/time.h>
00027 
00028 using std::cout;
00029 using std::endl;
00030 
00031 int XrdMonCtrArchiver::_decHDFlushDelay = -1;
00032 int XrdMonCtrArchiver::_decRTFlushDelay = -1;
00033 
00034 XrdMonCtrArchiver::XrdMonCtrArchiver(const char* cBaseDir, 
00035                                      const char* dBaseDir,
00036                                      const char* rtLogDir,
00037                                      kXR_int64 maxLogSize,
00038                                      int ctrBufSize,
00039                                      int rtBufSize,
00040                                      bool onlineDec,
00041                                      bool rtDec)
00042     : _decoder(0), 
00043       _currentTime(0),
00044       _heartbeat(1) // force taking timestamp first time
00045 {
00046     XrdMonCtrWriter::setBaseDir(cBaseDir);
00047     XrdMonCtrWriter::setMaxLogSize(maxLogSize);
00048     XrdMonCtrWriter::setBufferSize(ctrBufSize);
00049     
00050     if ( onlineDec ) {
00051         _decoder = new XrdMonDecPacketDecoder(dBaseDir, rtLogDir, rtBufSize);
00052         // BTW, MT-safety inside Sink
00053         if ( 0 != pthread_create(&_decHDFlushThread, 
00054                                  0, 
00055                                  decHDFlushHeartBeat,
00056                                  (void*)_decoder) ) {
00057             throw XrdMonException(ERR_PTHREADCREATE, 
00058                                   "Failed to create thread");
00059         }
00060         if ( 0 != rtDec ) {
00061             if ( 0 != pthread_create(&_decRTFlushThread, 
00062                                      0, 
00063                                      decRTFlushHeartBeat,
00064                                      (void*)_decoder) ) {
00065                 throw XrdMonException(ERR_PTHREADCREATE, 
00066                                       "Failed to create thread");
00067             }
00068         }
00069     }
00070 }
00071 
00072 XrdMonCtrArchiver::~XrdMonCtrArchiver()
00073 {
00074     _decoder->flushRealTimeData();
00075     delete _decoder;
00076     _decoder = 0;
00077 
00078     // go through all writers and shut them down
00079     int i, s = _writers.size();
00080     for (i=0 ; i<s ; i++) {
00081         delete _writers[i];
00082     }
00083     _writers.clear();
00084 
00085     XrdMonSenderInfo::shutdown();
00086 }
00087 
00088 void
00089 XrdMonCtrArchiver::operator()()
00090 {
00091     XrdMonCtrBuffer* pb = XrdMonCtrBuffer::instance();
00092     while ( 1 ) {
00093         try {
00094             if ( 0 == --_heartbeat ) {
00095                 check4InactiveSenders();
00096             }
00097             XrdMonCtrPacket* p = pb->pop_front();
00098             archivePacket(p);
00099             delete p;
00100         } catch (XrdMonException& e) {
00101             if ( e.err() == SIG_SHUTDOWNNOW ) {
00102                 return;
00103             }
00104             e.printItOnce();
00105         }
00106     }
00107 }
00108 
00109 // this function runs in a separate thread, wakes up
00110 // every now and then and triggers "history data" flushing
00111 extern "C" void*
00112 decHDFlushHeartBeat(void* arg)
00113 {
00114     if ( XrdMonCtrArchiver::_decHDFlushDelay == -1 ) {
00115         return (void*)0; // should never happen
00116     }
00117     XrdMonDecPacketDecoder* myDecoder = (XrdMonDecPacketDecoder*) arg;
00118     if ( 0 == myDecoder ) {
00119         throw XrdMonException(ERR_PTHREADCREATE, 
00120                               "invalid archiver passed");
00121     }
00122     while ( 1 ) {
00123         sleep(XrdMonCtrArchiver::_decHDFlushDelay);
00124         myDecoder->flushHistoryData();
00125     }
00126 
00127     return (void*)0;
00128 }
00129 
00130 // this function runs in a separate thread, wakes up
00131 // every now and then and triggers "current data" flushing
00132 extern "C" void*
00133 decRTFlushHeartBeat(void* arg)
00134 {
00135     if ( XrdMonCtrArchiver::_decRTFlushDelay == -1 ) {
00136         return (void*)0; // should never happen
00137     }
00138     XrdMonDecPacketDecoder* myDecoder = (XrdMonDecPacketDecoder*) arg;
00139     if ( 0 == myDecoder ) {
00140         throw XrdMonException(ERR_PTHREADCREATE, 
00141                               "invalid archiver passed");
00142     }
00143     while ( 1 ) {
00144         sleep(XrdMonCtrArchiver::_decRTFlushDelay);
00145         if ( 0 != myDecoder ) {
00146             myDecoder->flushRealTimeData();
00147         }
00148     }
00149 
00150     return (void*)0;
00151 }
00152 
00153 void
00154 XrdMonCtrArchiver::check4InactiveSenders()
00155 {
00156     _heartbeat = TIMESTAMP_FREQ;
00157     struct timeval tv;
00158     gettimeofday(&tv, 0);
00159     _currentTime = tv.tv_sec;
00160     
00161     long allowed = _currentTime - MAX_INACTIVITY;
00162     int i, s = _writers.size();
00163     for (i=0 ; i<s ; i++) {
00164         if ( _writers[i]->lastActivity() < allowed ) {
00165             cout << "No activity for " << MAX_INACTIVITY << " sec., "
00166                  << "closing all files for sender " 
00167                  << XrdMonSenderInfo::id2HostPortStr(i) << endl;
00168             _writers[i]->forceClose();
00169         }
00170     }
00171 }
00172 
00173 void
00174 XrdMonCtrArchiver::archivePacket(XrdMonCtrPacket* p)
00175 {
00176     XrdMonHeader header;
00177     header.decode(p->buf);
00178 
00179     if ( XrdMonCtrAdmin::isAdminPacket(header) ) {
00180         kXR_int16 command = 0, arg = 0;
00181         XrdMonCtrAdmin::decodeAdminPacket(p->buf, command, arg);
00182         XrdMonCtrAdmin::doIt(command, arg);
00183         return;
00184     }
00185 
00186     senderid_t senderId = XrdMonSenderInfo::convert2Id(p->sender);
00187 
00188     XrdMonCtrWriter* w = 0;
00189     
00190     if ( _writers.size() <= senderId ) {
00191         w = new XrdMonCtrWriter(senderId, header.stod());
00192         _writers.push_back(w);
00193     } else {
00194         w = _writers[senderId];
00195         if ( w->prevStod() != header.stod() ) {
00196             cout << "\n* * * *   XRD RESTARTED for " 
00197                  << XrdMonSenderInfo::id2HostPortStr(senderId) 
00198                  << ": " << w->prevStod() << " != " << header.stod()
00199                  << "    * * * *\n" << endl;
00200             delete w;
00201             _writers[senderId] = w = 
00202                 new XrdMonCtrWriter(senderId, header.stod());
00203             _decoder->reset(senderId);
00204         }
00205     }
00206     
00207     w->operator()(p->buf, header, _currentTime);
00208 
00209     if ( 0 != _decoder ) {
00210         _decoder->operator()(header, p->buf, senderId);
00211     }    
00212 }

Generated on Tue Jul 5 14:46:38 2011 for ROOT_528-00b_version by  doxygen 1.5.1