00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013 #include "XrdMon/XrdMonException.hh"
00014 #include "XrdMon/XrdMonCommon.hh"
00015 #include "XrdMon/XrdMonHeader.hh"
00016 #include "XrdMon/XrdMonSenderInfo.hh"
00017 #include "XrdMon/XrdMonUtils.hh"
00018 #include "XrdMon/XrdMonCtrDebug.hh"
00019 #include "XrdMon/XrdMonErrors.hh"
00020 #include "XrdMon/XrdMonCtrWriter.hh"
00021 #include "XrdSys/XrdSysHeaders.hh"
00022
00023 #include <assert.h>
00024 #include <netinet/in.h>
00025 #include <stdio.h>
00026 #include <unistd.h>
00027
00028 #include <iomanip>
00029 using std::cout;
00030 using std::endl;
00031 using std::ios;
00032 using std::setw;
00033
00034 string XrdMonCtrWriter::_baseDir;
00035 kXR_int64 XrdMonCtrWriter::_maxLogSize(1024*1024*1024);
00036 long XrdMonCtrWriter::_totalArchived(0);
00037
00038
00039 kXR_int32 XrdMonCtrWriter::_bufferSize(1024*1024);
00040
00041 XrdMonCtrWriter::XrdMonCtrWriter(senderid_t senderId, kXR_int32 stod)
00042 : _prevStod(stod),
00043 _buffer(0),
00044 _bPos(0),
00045 _lastActivity(0)
00046 {
00047 assert(_bufferSize > 0);
00048
00049 _timestamp = generateTimestamp();
00050
00051 _sender = XrdMonSenderInfo::id2HostPort(senderId);
00052 }
00053
00054 XrdMonCtrWriter::~XrdMonCtrWriter()
00055 {
00056 flushBuffer();
00057 closeLog();
00058 publish();
00059 delete [] _buffer;
00060 }
00061
00062 void
00063 XrdMonCtrWriter::operator()(const char* packet,
00064 const XrdMonHeader& header,
00065 long currentTime)
00066 {
00067 _lastActivity = currentTime;
00068
00069
00070 if ( 0 == _buffer ) {
00071 _buffer = new char [_bufferSize];
00072 if ( 0 == _buffer ) {
00073 throw XrdMonException(ERR_NOMEM,
00074 "Unable to allocate buffer - run out of memory");
00075 }
00076 }
00077
00078
00079
00080 if ( bufferIsFull(header.packetLen()) ) {
00081
00082 flushBuffer();
00083 }
00084
00085
00086 memcpy(_buffer+_bPos, packet, header.packetLen());
00087 _bPos += header.packetLen();
00088
00089 if ( XrdMonCtrDebug::verbose(XrdMonCtrDebug::Receiving) ) {
00090 XrdSysMutexHelper mh; mh.Lock(&XrdMonCtrDebug::_mutex);
00091
00092
00093 }
00094 }
00095
00096 void
00097 XrdMonCtrWriter::forceClose()
00098 {
00099 cout <<"forceClose not implemented" << endl;
00100 }
00101
00102
00103
00104
00105
00106 void
00107 XrdMonCtrWriter::flushBuffer()
00108 {
00109 if ( _bPos == 0 ) {
00110 return;
00111 }
00112
00113 if ( !logIsOpen() ) {
00114 openLog();
00115 }
00116
00117 if ( logIsFull() ) {
00118 closeLog();
00119 publish();
00120 openLog();
00121 }
00122
00123 _file.write(_buffer, _bPos);
00124
00125 memset(_buffer, 0, _bufferSize);
00126 _bPos = 0;
00127 }
00128 void
00129 XrdMonCtrWriter::mkActiveLogNameDirs() const
00130 {
00131 char* b = new char [_baseDir.size() + 64];
00132
00133 sprintf (b, "%s/%s", _baseDir.c_str(), _sender.first);
00134 mkdirIfNecessary(b);
00135
00136 sprintf (b, "%s/%s/%d", _baseDir.c_str(), _sender.first, _sender.second);
00137 mkdirIfNecessary(b);
00138
00139 delete [] b;
00140 }
00141
00142 string
00143 XrdMonCtrWriter::logName(LogType t) const
00144 {
00145 char* buf = new char[_baseDir.size() + 128];
00146 if ( t == ACTIVE ) {
00147 sprintf(buf, "%s/%s/%d/active.rcv",
00148 _baseDir.c_str(), _sender.first, _sender.second);
00149 } else if ( t == PERMANENT ) {
00150 sprintf(buf, "%s/%s/%d/%s_%s:%d.rcv",
00151 _baseDir.c_str(), _sender.first, _sender.second,
00152 _timestamp.c_str(), _sender.first, _sender.second);
00153 } else {
00154 delete [] buf;
00155 throw XrdMonException(ERR_INVALIDARG, "in XrdMonCtrWriter::logName");
00156 }
00157 string s(buf);
00158 delete [] buf;
00159 return s;
00160 }
00161
00162 void
00163 XrdMonCtrWriter::openLog()
00164 {
00165 mkActiveLogNameDirs();
00166 _file.open(logName(ACTIVE).c_str(), ios::out|ios::binary|ios::ate);
00167 }
00168
00169 void
00170 XrdMonCtrWriter::closeLog()
00171 {
00172 if ( _file.is_open() ) {
00173 _file.close();
00174 }
00175 }
00176
00177 void
00178 XrdMonCtrWriter::publish()
00179 {
00180 string src = logName(ACTIVE);
00181
00182 if ( 0 == _bPos && 0 != access(src.c_str(), F_OK) ) {
00183 return;
00184 }
00185
00186 string dest = logName(PERMANENT);
00187 if ( 0 != rename(src.c_str(), dest.c_str()) ) {
00188 string ss("Cannot rename "); ss += src;
00189 ss += " to "; ss += dest;
00190 throw XrdMonException(ERR_RENAME, ss);
00191 }
00192 _timestamp = generateTimestamp();
00193 }
00194
00195 ostream&
00196 operator<<(ostream& o, const XrdMonCtrWriter& w)
00197 {
00198 o << w._sender.first << ':' << w._sender.second;
00199 return o;
00200 }