XrdMonCtrWriter.cc

Go to the documentation of this file.
00001 /*****************************************************************************/
00002 /*                                                                           */
00003 /*                            XrdMonCtrWriter.cc                             */
00004 /*                                                                           */
00005 /* (c) 2005 by the Board of Trustees of the Leland Stanford, Jr., University */
00006 /*                            All Rights Reserved                            */
00007 /*       Produced by Jacek Becla for Stanford University under contract      */
00008 /*              DE-AC02-76SF00515 with the Department of Energy              */
00009 /*****************************************************************************/
00010 
00011 // $Id: XrdMonCtrWriter.cc 24468 2008-06-22 16:47:03Z ganis $
00012 
00013 #include "XrdMon/XrdMonException.hh"
00014 #include "XrdMon/XrdMonCommon.hh"
00015 #include "XrdMon/XrdMonHeader.hh"
00016 #include "XrdMon/XrdMonSenderInfo.hh"
00017 #include "XrdMon/XrdMonUtils.hh"
00018 #include "XrdMon/XrdMonCtrDebug.hh"
00019 #include "XrdMon/XrdMonErrors.hh"
00020 #include "XrdMon/XrdMonCtrWriter.hh"
00021 #include "XrdSys/XrdSysHeaders.hh"
00022 
#include <assert.h>
#include <netinet/in.h> /* ntohs  */
#include <stdio.h>      /* rename */
#include <string.h>     /* memcpy, memset */
#include <unistd.h>     /* F_OK   */

#include <iomanip>
#include <new>          /* std::nothrow */
00029 using std::cout;
00030 using std::endl;
00031 using std::ios;
00032 using std::setw;
00033 
00034 string  XrdMonCtrWriter::_baseDir;
00035 kXR_int64 XrdMonCtrWriter::_maxLogSize(1024*1024*1024); // 1GB
00036 long    XrdMonCtrWriter::_totalArchived(0);
00037 
00038 // never make it < MAXPACKETSIZE
00039 kXR_int32 XrdMonCtrWriter::_bufferSize(1024*1024);      // 1MB 
00040 
00041 XrdMonCtrWriter::XrdMonCtrWriter(senderid_t senderId, kXR_int32 stod)
00042     : _prevStod(stod),
00043       _buffer(0),
00044       _bPos(0),
00045       _lastActivity(0)
00046 {
00047     assert(_bufferSize > 0);
00048 
00049     _timestamp = generateTimestamp();
00050     
00051     _sender = XrdMonSenderInfo::id2HostPort(senderId);
00052 }
00053 
00054 XrdMonCtrWriter::~XrdMonCtrWriter()
00055 {
00056     flushBuffer();
00057     closeLog();
00058     publish();
00059     delete [] _buffer;
00060 }
00061 
00062 void
00063 XrdMonCtrWriter::operator()(const char* packet, 
00064                             const XrdMonHeader& header, 
00065                             long currentTime)
00066 {
00067     _lastActivity = currentTime;
00068     
00069     // initialize buffer
00070     if ( 0 == _buffer ) {
00071         _buffer = new char [_bufferSize];
00072         if ( 0 == _buffer ) {
00073             throw XrdMonException(ERR_NOMEM,
00074                "Unable to allocate buffer - run out of memory");
00075         }
00076     }
00077 
00078     // check if there is space in buffer
00079     // if not, flush to log file
00080     if ( bufferIsFull(header.packetLen()) ) {
00081         //cout << "flushing buffer for " << _sender.first << endl;
00082         flushBuffer();
00083     }
00084 
00085     // write packet to buffer
00086     memcpy(_buffer+_bPos, packet, header.packetLen());
00087     _bPos += header.packetLen();
00088 
00089     if ( XrdMonCtrDebug::verbose(XrdMonCtrDebug::Receiving) ) {
00090         XrdSysMutexHelper mh; mh.Lock(&XrdMonCtrDebug::_mutex);
00091         //cout << " Archiving packet #" << setw(5) << ++_totalArchived 
00092         //     << " from " << _sender.first << " " << header << endl;
00093     }
00094 }
00095 
00096 void
00097 XrdMonCtrWriter::forceClose()
00098 {
00099     cout <<"forceClose not implemented" << endl;
00100 }
00101 
00102 // ===========================================
00103 // =========== private below =================
00104 // ===========================================
00105 
00106 void
00107 XrdMonCtrWriter::flushBuffer()
00108 {
00109     if ( _bPos == 0 ) {
00110         return;
00111     }
00112     
00113     if ( !logIsOpen() ) {
00114         openLog();
00115     }
00116     
00117     if ( logIsFull() ) {
00118         closeLog();
00119         publish();
00120         openLog();
00121     }
00122 
00123     _file.write(_buffer, _bPos);
00124 
00125     memset(_buffer, 0, _bufferSize);
00126     _bPos = 0;
00127 }
00128 void
00129 XrdMonCtrWriter::mkActiveLogNameDirs() const
00130 {
00131     char* b = new char [_baseDir.size() + 64];
00132 
00133     sprintf (b, "%s/%s", _baseDir.c_str(), _sender.first);
00134     mkdirIfNecessary(b);
00135 
00136     sprintf (b, "%s/%s/%d", _baseDir.c_str(), _sender.first, _sender.second);
00137     mkdirIfNecessary(b);
00138 
00139     delete [] b;
00140 }
00141 
00142 string
00143 XrdMonCtrWriter::logName(LogType t) const
00144 {
00145     char* buf = new char[_baseDir.size() + 128];    
00146     if ( t == ACTIVE ) {
00147         sprintf(buf, "%s/%s/%d/active.rcv", 
00148                 _baseDir.c_str(), _sender.first, _sender.second);
00149     } else if ( t == PERMANENT ) {
00150         sprintf(buf, "%s/%s/%d/%s_%s:%d.rcv", 
00151                 _baseDir.c_str(), _sender.first, _sender.second,
00152                 _timestamp.c_str(), _sender.first, _sender.second);
00153     } else {
00154         delete [] buf;
00155         throw XrdMonException(ERR_INVALIDARG, "in XrdMonCtrWriter::logName");
00156     }
00157     string s(buf);
00158     delete [] buf;
00159     return s;
00160 }
00161 
00162 void
00163 XrdMonCtrWriter::openLog()
00164 {
00165     mkActiveLogNameDirs();
00166     _file.open(logName(ACTIVE).c_str(), ios::out|ios::binary|ios::ate);
00167 }
00168 
00169 void
00170 XrdMonCtrWriter::closeLog()
00171 {
00172     if ( _file.is_open() ) {
00173         _file.close();
00174     }
00175 }
00176 
00177 void
00178 XrdMonCtrWriter::publish()
00179 {
00180     string src = logName(ACTIVE);
00181 
00182     if ( 0 == _bPos && 0 != access(src.c_str(), F_OK) ) {
00183         return;
00184     }
00185 
00186     string dest = logName(PERMANENT);
00187     if ( 0 != rename(src.c_str(), dest.c_str()) ) {
00188         string ss("Cannot rename "); ss += src;
00189         ss += " to "; ss += dest;
00190         throw XrdMonException(ERR_RENAME, ss);
00191     }
00192     _timestamp = generateTimestamp();
00193 }
00194 
00195 ostream&
00196 operator<<(ostream& o, const XrdMonCtrWriter& w)
00197 {
00198     o << w._sender.first << ':' << w._sender.second;
00199     return o;
00200 }

Generated on Tue Jul 5 14:46:42 2011 for ROOT_528-00b_version by  doxygen 1.5.1