
Go to the documentation of this file.
00001 /*****************************************************************************/
00002 /*                                                                           */
00003 /*                             XrdMonDecSink.hh                              */
00004 /*                                                                           */
00005 /* (c) 2004 by the Board of Trustees of the Leland Stanford, Jr., University */
00006 /*                            All Rights Reserved                            */
00007 /*       Produced by Jacek Becla for Stanford University under contract      */
00008 /*              DE-AC02-76SF00515 with the Department of Energy              */
00009 /*****************************************************************************/
00011 // $Id: XrdMonDecSink.hh 24468 2008-06-22 16:47:03Z ganis $
00013 #ifndef XRDMONDECSINK_HH
00014 #define XRDMONDECSINK_HH
00016 #include "XrdMon/XrdMonDecDictInfo.hh"
00017 #include "XrdMon/XrdMonDecTraceInfo.hh"
00018 #include "XrdMon/XrdMonDecUserInfo.hh"
00019 #include "XrdMon/XrdMonDecStageInfo.hh"
00020 #include "XrdMon/XrdMonBufferedOutput.hh"
00021 #include "XrdSys/XrdSysPthread.hh"
00022 #include <algorithm>
00023 #include <fstream>
00024 #include <map>
00025 #include <vector>
00026 using std::fstream;
00027 using std::map;
00028 using std::pair;
00029 using std::vector;
00031 class XrdMonDecSink {
00032 public:
00033     XrdMonDecSink(const char* baseDir,
00034                   const char* rtLogDir,
00035                   int rtBufSize,
00036                   bool saveTraces,
00037                   int maxTraceLogSize);
00038     ~XrdMonDecSink();
00040     void init(dictid_t min, dictid_t max, const string& senderHP);
00041     sequen_t lastSeq() const { return _lastSeq; }
00042     void registerXrdRestart(kXR_int32 stod, senderid_t senderId);
00044     void setLastSeq(sequen_t seq) { _lastSeq = seq; }
00046     void addDictId(dictid_t xrdId, 
00047                    const char* theString, 
00048                    int len,
00049                    senderid_t senderId);
00050     void addStageInfo(dictid_t xrdId, 
00051                       const char* theString, 
00052                       int len,
00053                       senderid_t senderId);
00054     void addUserId(dictid_t xrdId,
00055                    const char* theString,
00056                    int len,
00057                    senderid_t senderId);
00058     void add(dictid_t xrdId,
00059              XrdMonDecTraceInfo& trace,
00060              senderid_t senderId);
00061     void addUserDisconnect(dictid_t xrdId,
00062                            kXR_int32 sec,
00063                            kXR_int32 timestamp,
00064                            senderid_t senderId);
00065     void openFile(dictid_t dictId,
00066                   kXR_int32 timestamp,
00067                   senderid_t senderId,
00068                   kXR_int64 fSize);
00069     void closeFile(dictid_t dictId, 
00070                    kXR_int64 bytesR, 
00071                    kXR_int64 bytesW, 
00072                    kXR_int32 timestamp,
00073                    senderid_t senderId);
00074     void flushHistoryData();
00075     void flushRealTimeData() { if ( 0 != _rtLogger ) _rtLogger->flush(); }
00077     void reset(senderid_t senderId);
00079 private:
00080     typedef map<dictid_t, XrdMonDecDictInfo*> dmap_t;
00081     typedef map<dictid_t, XrdMonDecUserInfo*> umap_t;
00082     typedef map<dictid_t, XrdMonDecDictInfo*>::iterator dmapitr_t;
00083     typedef map<dictid_t, XrdMonDecUserInfo*>::iterator umapitr_t;
00085     void initRT(const char* rtLogDir, int rtBufSize);
00086     void addVersion();
00088     void loadUniqueIdsAndSeq();
00089     vector<XrdMonDecDictInfo*> loadActiveDictInfo();
00090     void flushClosedDicts();
00091     void flushUserCache();
00092     void flushTCache();
00093     void checkpoint();
00094     void openTraceFile(fstream& f);
00095     void write2TraceFile(fstream& f, const char* buf, int len);
00096     void registerLostPacket(dictid_t id, const char* descr);
00097     void reportLostPackets();
00099     void flushOneDMap(dmap_t* m, int& curLen, const int BUFSIZE, 
00100                       string& buf, fstream& fD);
00101     void flushOneUMap(umap_t* m, int& curLen, const int BUFSIZE, 
00102                       string& buf, fstream& fD);
00104     void resetDMap(senderid_t senderId);
00105     void resetUMap(senderid_t senderId);
00107 private:
00108     // this defines how frequently version information will be
00109     // added to the log file (every ...how many entries in the log file)
00110     static const kXR_unt16 VER_FREQ;
00112     kXR_unt16 _verFreqCount;
00115     vector< dmap_t* > _dCache;
00116     vector< umap_t* > _uCache;
00118     // The mutexes guard access to dCache, uCache respectively.
00119     // _dCache and _uCache can be accessed from different threads
00120     // (periodic data flushing inside dedicated thread)
00121     XrdSysMutex    _dMutex;
00122     XrdSysMutex    _uMutex;
00124     XrdMonBufferedOutput* _rtLogger;
00126     bool _saveTraces;
00127     typedef vector<XrdMonDecTraceInfo> TraceVector;
00128     TraceVector _tCache;
00129     kXR_unt32 _tCacheSize;
00130     kXR_unt16 _traceLogNumber;  // trace.000.ascii, 001, and so on...
00131     kXR_int64  _maxTraceLogSize; // [in MB]
00133     map<dictid_t, long> _lost; //lost dictIds -> number of lost traces
00135     sequen_t _lastSeq;
00136     dictid_t _uniqueDictId; // dictId in mySQL, unique for given xrootd host
00137     dictid_t _uniqueUserId; // userId in mySQL, unique for given xrootd host
00139     string _path;        // <basePath>/<date>_seqId_
00140     string _jnlPath;     // <basePath>/jnl
00141     string _dictPath;    // <basePath>/<YYYYMMDD_HH:MM:SS.MMM_dict.ascii
00142     string _userPath;    // <basePath>/<YYYYMMDD_HH:MM:SS.MMM_user.ascii 
00143     string _rtFlagPath;  // <rtLogDir>/rtRunning.flag
00144     string _rtMaxIdsPath;// <rtLogDir>/rtMax.jnl
00145     string _xrdRestartLog;// <basePath>/xrdRestarts.ascii
00146 };
00148 #endif /* XRDMONDECSINK_HH */

Generated on Tue Jul 5 14:46:43 2011 for ROOT_528-00b_version by  doxygen 1.5.1