00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013 #ifndef XRDMONDECSINK_HH
00014 #define XRDMONDECSINK_HH
00015
00016 #include "XrdMon/XrdMonDecDictInfo.hh"
00017 #include "XrdMon/XrdMonDecTraceInfo.hh"
00018 #include "XrdMon/XrdMonDecUserInfo.hh"
00019 #include "XrdMon/XrdMonDecStageInfo.hh"
00020 #include "XrdMon/XrdMonBufferedOutput.hh"
00021 #include "XrdSys/XrdSysPthread.hh"
00022 #include <algorithm>
00023 #include <fstream>
00024 #include <map>
00025 #include <vector>
00026 using std::fstream;
00027 using std::map;
00028 using std::pair;
00029 using std::vector;
00030
00031 class XrdMonDecSink {
00032 public:
00033 XrdMonDecSink(const char* baseDir,
00034 const char* rtLogDir,
00035 int rtBufSize,
00036 bool saveTraces,
00037 int maxTraceLogSize);
00038 ~XrdMonDecSink();
00039
00040 void init(dictid_t min, dictid_t max, const string& senderHP);
00041 sequen_t lastSeq() const { return _lastSeq; }
00042 void registerXrdRestart(kXR_int32 stod, senderid_t senderId);
00043
00044 void setLastSeq(sequen_t seq) { _lastSeq = seq; }
00045
00046 void addDictId(dictid_t xrdId,
00047 const char* theString,
00048 int len,
00049 senderid_t senderId);
00050 void addStageInfo(dictid_t xrdId,
00051 const char* theString,
00052 int len,
00053 senderid_t senderId);
00054 void addUserId(dictid_t xrdId,
00055 const char* theString,
00056 int len,
00057 senderid_t senderId);
00058 void add(dictid_t xrdId,
00059 XrdMonDecTraceInfo& trace,
00060 senderid_t senderId);
00061 void addUserDisconnect(dictid_t xrdId,
00062 kXR_int32 sec,
00063 kXR_int32 timestamp,
00064 senderid_t senderId);
00065 void openFile(dictid_t dictId,
00066 kXR_int32 timestamp,
00067 senderid_t senderId,
00068 kXR_int64 fSize);
00069 void closeFile(dictid_t dictId,
00070 kXR_int64 bytesR,
00071 kXR_int64 bytesW,
00072 kXR_int32 timestamp,
00073 senderid_t senderId);
00074 void flushHistoryData();
00075 void flushRealTimeData() { if ( 0 != _rtLogger ) _rtLogger->flush(); }
00076
00077 void reset(senderid_t senderId);
00078
00079 private:
00080 typedef map<dictid_t, XrdMonDecDictInfo*> dmap_t;
00081 typedef map<dictid_t, XrdMonDecUserInfo*> umap_t;
00082 typedef map<dictid_t, XrdMonDecDictInfo*>::iterator dmapitr_t;
00083 typedef map<dictid_t, XrdMonDecUserInfo*>::iterator umapitr_t;
00084
00085 void initRT(const char* rtLogDir, int rtBufSize);
00086 void addVersion();
00087
00088 void loadUniqueIdsAndSeq();
00089 vector<XrdMonDecDictInfo*> loadActiveDictInfo();
00090 void flushClosedDicts();
00091 void flushUserCache();
00092 void flushTCache();
00093 void checkpoint();
00094 void openTraceFile(fstream& f);
00095 void write2TraceFile(fstream& f, const char* buf, int len);
00096 void registerLostPacket(dictid_t id, const char* descr);
00097 void reportLostPackets();
00098
00099 void flushOneDMap(dmap_t* m, int& curLen, const int BUFSIZE,
00100 string& buf, fstream& fD);
00101 void flushOneUMap(umap_t* m, int& curLen, const int BUFSIZE,
00102 string& buf, fstream& fD);
00103
00104 void resetDMap(senderid_t senderId);
00105 void resetUMap(senderid_t senderId);
00106
00107 private:
00108
00109
00110 static const kXR_unt16 VER_FREQ;
00111
00112 kXR_unt16 _verFreqCount;
00113
00114
00115 vector< dmap_t* > _dCache;
00116 vector< umap_t* > _uCache;
00117
00118
00119
00120
00121 XrdSysMutex _dMutex;
00122 XrdSysMutex _uMutex;
00123
00124 XrdMonBufferedOutput* _rtLogger;
00125
00126 bool _saveTraces;
00127 typedef vector<XrdMonDecTraceInfo> TraceVector;
00128 TraceVector _tCache;
00129 kXR_unt32 _tCacheSize;
00130 kXR_unt16 _traceLogNumber;
00131 kXR_int64 _maxTraceLogSize;
00132
00133 map<dictid_t, long> _lost;
00134
00135 sequen_t _lastSeq;
00136 dictid_t _uniqueDictId;
00137 dictid_t _uniqueUserId;
00138
00139 string _path;
00140 string _jnlPath;
00141 string _dictPath;
00142 string _userPath;
00143 string _rtFlagPath;
00144 string _rtMaxIdsPath;
00145 string _xrdRestartLog;
00146 };
00147
00148 #endif