00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013 #include "XrdMon/XrdMonCommon.hh"
00014 #include "XrdMon/XrdMonCtrAdmin.hh"
00015 #include "XrdMon/XrdMonCtrArchiver.hh"
00016 #include "XrdMon/XrdMonCtrBuffer.hh"
00017 #include "XrdMon/XrdMonCtrPacket.hh"
00018 #include "XrdMon/XrdMonCtrWriter.hh"
00019 #include "XrdMon/XrdMonDecPacketDecoder.hh"
00020 #include "XrdMon/XrdMonErrors.hh"
00021 #include "XrdMon/XrdMonException.hh"
00022 #include "XrdMon/XrdMonSenderInfo.hh"
00023
00024 #include "XrdSys/XrdSysHeaders.hh"
00025
00026 #include <sys/time.h>
00027
00028 using std::cout;
00029 using std::endl;
00030
00031 int XrdMonCtrArchiver::_decHDFlushDelay = -1;
00032 int XrdMonCtrArchiver::_decRTFlushDelay = -1;
00033
00034 XrdMonCtrArchiver::XrdMonCtrArchiver(const char* cBaseDir,
00035 const char* dBaseDir,
00036 const char* rtLogDir,
00037 kXR_int64 maxLogSize,
00038 int ctrBufSize,
00039 int rtBufSize,
00040 bool onlineDec,
00041 bool rtDec)
00042 : _decoder(0),
00043 _currentTime(0),
00044 _heartbeat(1)
00045 {
00046 XrdMonCtrWriter::setBaseDir(cBaseDir);
00047 XrdMonCtrWriter::setMaxLogSize(maxLogSize);
00048 XrdMonCtrWriter::setBufferSize(ctrBufSize);
00049
00050 if ( onlineDec ) {
00051 _decoder = new XrdMonDecPacketDecoder(dBaseDir, rtLogDir, rtBufSize);
00052
00053 if ( 0 != pthread_create(&_decHDFlushThread,
00054 0,
00055 decHDFlushHeartBeat,
00056 (void*)_decoder) ) {
00057 throw XrdMonException(ERR_PTHREADCREATE,
00058 "Failed to create thread");
00059 }
00060 if ( 0 != rtDec ) {
00061 if ( 0 != pthread_create(&_decRTFlushThread,
00062 0,
00063 decRTFlushHeartBeat,
00064 (void*)_decoder) ) {
00065 throw XrdMonException(ERR_PTHREADCREATE,
00066 "Failed to create thread");
00067 }
00068 }
00069 }
00070 }
00071
00072 XrdMonCtrArchiver::~XrdMonCtrArchiver()
00073 {
00074 _decoder->flushRealTimeData();
00075 delete _decoder;
00076 _decoder = 0;
00077
00078
00079 int i, s = _writers.size();
00080 for (i=0 ; i<s ; i++) {
00081 delete _writers[i];
00082 }
00083 _writers.clear();
00084
00085 XrdMonSenderInfo::shutdown();
00086 }
00087
00088 void
00089 XrdMonCtrArchiver::operator()()
00090 {
00091 XrdMonCtrBuffer* pb = XrdMonCtrBuffer::instance();
00092 while ( 1 ) {
00093 try {
00094 if ( 0 == --_heartbeat ) {
00095 check4InactiveSenders();
00096 }
00097 XrdMonCtrPacket* p = pb->pop_front();
00098 archivePacket(p);
00099 delete p;
00100 } catch (XrdMonException& e) {
00101 if ( e.err() == SIG_SHUTDOWNNOW ) {
00102 return;
00103 }
00104 e.printItOnce();
00105 }
00106 }
00107 }
00108
00109
00110
00111 extern "C" void*
00112 decHDFlushHeartBeat(void* arg)
00113 {
00114 if ( XrdMonCtrArchiver::_decHDFlushDelay == -1 ) {
00115 return (void*)0;
00116 }
00117 XrdMonDecPacketDecoder* myDecoder = (XrdMonDecPacketDecoder*) arg;
00118 if ( 0 == myDecoder ) {
00119 throw XrdMonException(ERR_PTHREADCREATE,
00120 "invalid archiver passed");
00121 }
00122 while ( 1 ) {
00123 sleep(XrdMonCtrArchiver::_decHDFlushDelay);
00124 myDecoder->flushHistoryData();
00125 }
00126
00127 return (void*)0;
00128 }
00129
00130
00131
00132 extern "C" void*
00133 decRTFlushHeartBeat(void* arg)
00134 {
00135 if ( XrdMonCtrArchiver::_decRTFlushDelay == -1 ) {
00136 return (void*)0;
00137 }
00138 XrdMonDecPacketDecoder* myDecoder = (XrdMonDecPacketDecoder*) arg;
00139 if ( 0 == myDecoder ) {
00140 throw XrdMonException(ERR_PTHREADCREATE,
00141 "invalid archiver passed");
00142 }
00143 while ( 1 ) {
00144 sleep(XrdMonCtrArchiver::_decRTFlushDelay);
00145 if ( 0 != myDecoder ) {
00146 myDecoder->flushRealTimeData();
00147 }
00148 }
00149
00150 return (void*)0;
00151 }
00152
00153 void
00154 XrdMonCtrArchiver::check4InactiveSenders()
00155 {
00156 _heartbeat = TIMESTAMP_FREQ;
00157 struct timeval tv;
00158 gettimeofday(&tv, 0);
00159 _currentTime = tv.tv_sec;
00160
00161 long allowed = _currentTime - MAX_INACTIVITY;
00162 int i, s = _writers.size();
00163 for (i=0 ; i<s ; i++) {
00164 if ( _writers[i]->lastActivity() < allowed ) {
00165 cout << "No activity for " << MAX_INACTIVITY << " sec., "
00166 << "closing all files for sender "
00167 << XrdMonSenderInfo::id2HostPortStr(i) << endl;
00168 _writers[i]->forceClose();
00169 }
00170 }
00171 }
00172
00173 void
00174 XrdMonCtrArchiver::archivePacket(XrdMonCtrPacket* p)
00175 {
00176 XrdMonHeader header;
00177 header.decode(p->buf);
00178
00179 if ( XrdMonCtrAdmin::isAdminPacket(header) ) {
00180 kXR_int16 command = 0, arg = 0;
00181 XrdMonCtrAdmin::decodeAdminPacket(p->buf, command, arg);
00182 XrdMonCtrAdmin::doIt(command, arg);
00183 return;
00184 }
00185
00186 senderid_t senderId = XrdMonSenderInfo::convert2Id(p->sender);
00187
00188 XrdMonCtrWriter* w = 0;
00189
00190 if ( _writers.size() <= senderId ) {
00191 w = new XrdMonCtrWriter(senderId, header.stod());
00192 _writers.push_back(w);
00193 } else {
00194 w = _writers[senderId];
00195 if ( w->prevStod() != header.stod() ) {
00196 cout << "\n* * * * XRD RESTARTED for "
00197 << XrdMonSenderInfo::id2HostPortStr(senderId)
00198 << ": " << w->prevStod() << " != " << header.stod()
00199 << " * * * *\n" << endl;
00200 delete w;
00201 _writers[senderId] = w =
00202 new XrdMonCtrWriter(senderId, header.stod());
00203 _decoder->reset(senderId);
00204 }
00205 }
00206
00207 w->operator()(p->buf, header, _currentTime);
00208
00209 if ( 0 != _decoder ) {
00210 _decoder->operator()(header, p->buf, senderId);
00211 }
00212 }