00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013 #include "XrdMon/XrdMonException.hh"
00014 #include "XrdMon/XrdMonCommon.hh"
00015 #include "XrdMon/XrdMonHeader.hh"
00016 #include "XrdMon/XrdMonUtils.hh"
00017 #include "XrdMon/XrdMonErrors.hh"
00018 #include "XrdMon/XrdMonDecPacketDecoder.hh"
00019 #include "XrdMon/XrdMonDecTraceInfo.hh"
00020 #include "XrdSys/XrdSysPlatform.hh"
00021 #include "XrdXrootd/XrdXrootdMonData.hh"
00022 #include <stdio.h>
00023 #include <netinet/in.h>
00024 using std::cerr;
00025 using std::cout;
00026 using std::endl;
00027
00028
00029 XrdMonDecPacketDecoder::XrdMonDecPacketDecoder(const char* baseDir,
00030 const char* rtLogDir,
00031 int rtBufSize)
00032 : _sink(baseDir, rtLogDir, rtBufSize, false, 2),
00033 _stopNow(false),
00034 _upToTime(0)
00035 {}
00036
00037 XrdMonDecPacketDecoder::XrdMonDecPacketDecoder(const char* baseDir,
00038 bool saveTraces,
00039 int maxTraceLogSize,
00040 kXR_int32 upToTime)
00041 : _sink(baseDir, 0, 8, saveTraces, maxTraceLogSize),
00042 _stopNow(false),
00043 _upToTime(upToTime)
00044 {}
00045
00046 void
00047 XrdMonDecPacketDecoder::init(dictid_t min,
00048 dictid_t max,
00049 const string& senderHP)
00050 {
00051 _sink.init(min, max, senderHP);
00052 }
00053
00054
00055 void
00056 XrdMonDecPacketDecoder::operator()(const XrdMonHeader& header,
00057 const char* packet,
00058 senderid_t senderId)
00059 {
00060 int len = header.packetLen() - HDRLEN;
00061
00062
00063 if ( len < 1 ) {
00064 cout << "Warning: Ignoring empty packet" << endl;
00065 return;
00066 }
00067
00068 if ( header.stodChanged(senderId) ) {
00069 _sink.registerXrdRestart(header.stod(), senderId);
00070 }
00071
00072 switch ( header.packetType() ) {
00073 case PACKET_TYPE_TRACE : {
00074 decodeTracePacket(packet+HDRLEN, len, senderId);
00075 break;
00076 }
00077 case PACKET_TYPE_DICT : {
00078 decodeDictPacket(packet+HDRLEN, len, senderId);
00079 break;
00080 }
00081 case PACKET_TYPE_USER : {
00082 decodeUserPacket(packet+HDRLEN, len, senderId);
00083 break;
00084 }
00085 case PACKET_TYPE_STAGE : {
00086 decodeStagePacket(packet+HDRLEN, len, senderId);
00087 break;
00088 }
00089
00090 default: {
00091 cerr << "Unsupported packet type: " << header.packetType() << endl;
00092 }
00093 }
00094 _sink.setLastSeq(header.seqNo());
00095 }
00096
00097 void
00098 XrdMonDecPacketDecoder::reset(senderid_t senderId)
00099 {
00100 _sink.reset(senderId);
00101 }
00102
00103
00104 void
00105 XrdMonDecPacketDecoder::decodeTracePacket(const char* packet,
00106 int len,
00107 senderid_t senderId)
00108 {
00109
00110 if ( static_cast<kXR_char>(*packet) != XROOTD_MON_WINDOW ) {
00111 char buf[256];
00112 sprintf(buf, "Expected time window packet (1st packet), got %i, ", (int)*packet);
00113 throw XrdMonException(ERR_NOTATIMEWINDOW, buf);
00114 }
00115 TimePair t = decodeTime(packet);
00116 if ( _upToTime != 0 && _upToTime <= t.first ) {
00117 cout << "reached the up-to-time, will stop decoding now" << endl;
00118 _stopNow = true;
00119 return;
00120 }
00121 kXR_int32 begTime = t.second;
00122 int offset = TRACELEN;
00123
00124
00125
00126 while ( offset < len ) {
00127 CalcTime ct = prepareTimestamp(packet, offset, len, begTime);
00128 int elemNo = 0;
00129 while ( offset<ct.endOffset ) {
00130 kXR_char infoType = static_cast<kXR_char>(*(packet+offset));
00131 kXR_int32 timestamp = begTime + (kXR_int32) (elemNo++ * ct.timePerTrace);
00132 if ( !(infoType & XROOTD_MON_RWREQUESTMASK) ) {
00133 decodeRWRequest(packet+offset, timestamp, senderId);
00134 } else if ( infoType == XROOTD_MON_OPEN ) {
00135 decodeOpen(packet+offset, timestamp, senderId);
00136 } else if ( infoType == XROOTD_MON_CLOSE ) {
00137 decodeClose(packet+offset, timestamp, senderId);
00138 } else if ( infoType == XROOTD_MON_DISC ) {
00139 decodeDisconnect(packet+offset, timestamp, senderId);
00140 } else {
00141 char buf[256];
00142 sprintf(buf, "Unsupported infoType of trace packet: %i", (int) infoType);
00143 throw XrdMonException(ERR_INVALIDINFOTYPE, buf);
00144 }
00145 offset += TRACELEN;
00146 }
00147 begTime = ct.begTimeNextWindow;
00148 offset += TRACELEN;
00149 }
00150 }
00151
00152
00153 void
00154 XrdMonDecPacketDecoder::decodeDictPacket(const char* packet,
00155 int len,
00156 senderid_t senderId)
00157 {
00158 kXR_int32 x32;
00159 memcpy(&x32, packet, sizeof(kXR_int32));
00160 dictid_t dictId = ntohl(x32);
00161
00162 _sink.addDictId(dictId,
00163 packet+sizeof(kXR_int32),
00164 len-sizeof(kXR_int32),
00165 senderId);
00166 }
00167
00168
00169 void
00170 XrdMonDecPacketDecoder::decodeUserPacket(const char* packet,
00171 int len,
00172 senderid_t senderId)
00173 {
00174 kXR_int32 x32;
00175 memcpy(&x32, packet, sizeof(kXR_int32));
00176 dictid_t dictId = ntohl(x32);
00177
00178 _sink.addUserId(dictId,
00179 packet+sizeof(kXR_int32),
00180 len-sizeof(kXR_int32),
00181 senderId);
00182 }
00183
00184
00185 void
00186 XrdMonDecPacketDecoder::decodeStagePacket(const char* packet,
00187 int len,
00188 senderid_t senderId)
00189 {
00190 kXR_int32 x32;
00191 memcpy(&x32, packet, sizeof(kXR_int32));
00192 dictid_t dictId = ntohl(x32);
00193
00194 _sink.addStageInfo(dictId,
00195 packet+sizeof(kXR_int32),
00196 len-sizeof(kXR_int32),
00197 senderId);
00198 }
00199
00200 XrdMonDecPacketDecoder::TimePair
00201 XrdMonDecPacketDecoder::decodeTime(const char* packet)
00202 {
00203 struct X {
00204 kXR_int32 endT;
00205 kXR_int32 begT;
00206 } x;
00207
00208 memcpy(&x, packet+sizeof(kXR_int64), sizeof(X));
00209 return TimePair(ntohl(x.endT), ntohl(x.begT));
00210 }
00211
00212 void
00213 XrdMonDecPacketDecoder::decodeRWRequest(const char* packet,
00214 kXR_int32 timestamp,
00215 senderid_t senderId)
00216 {
00217 XrdXrootdMonTrace trace;
00218 memcpy(&trace, packet, sizeof(XrdXrootdMonTrace));
00219 kXR_int64 tOffset = ntohll(trace.arg0.val);
00220 kXR_int32 tLen = ntohl (trace.arg1.buflen);
00221 kXR_unt32 dictId = ntohl (trace.arg2.dictid);
00222
00223 if ( tOffset < 0 ) {
00224 throw XrdMonException(ERR_NEGATIVEOFFSET);
00225 }
00226 char rwReq = 'r';
00227 if ( tLen<0 ) {
00228 rwReq = 'w';
00229 tLen *= -1;
00230 }
00231
00232 XrdMonDecTraceInfo traceInfo(tOffset, tLen, rwReq, timestamp);
00233 _sink.add(dictId, traceInfo, senderId);
00234 }
00235
00236 void
00237 XrdMonDecPacketDecoder::decodeOpen(const char* packet,
00238 kXR_int32 timestamp,
00239 senderid_t senderId)
00240 {
00241 XrdXrootdMonTrace trace;
00242 memcpy(&trace, packet, sizeof(XrdXrootdMonTrace));
00243 kXR_unt32 dictId = ntohl(trace.arg2.dictid);
00244
00245
00246 #if _FILE_OFFSET_BITS==64
00247 kXR_int64 maskHBO = 0x00ffffffffffffffLL;
00248 #else
00249 kXR_int64 maskHBO = 0x00ffffff;
00250 #endif
00251 kXR_int64 maskNBO = 0;
00252 h2nll(maskHBO, maskNBO);
00253 kXR_int64 maskedVal = trace.arg0.val & maskNBO;
00254 kXR_int64 fSize = 0;
00255 n2hll(maskedVal, fSize);
00256
00257 _sink.openFile(dictId, timestamp, senderId, fSize);
00258 }
00259
00260 void
00261 XrdMonDecPacketDecoder::decodeClose(const char* packet,
00262 kXR_int32 timestamp,
00263 senderid_t senderId)
00264 {
00265 XrdXrootdMonTrace trace;
00266 memcpy(&trace, packet, sizeof(XrdXrootdMonTrace));
00267 kXR_unt32 dictId = ntohl(trace.arg2.dictid);
00268 kXR_unt32 tR = ntohl(trace.arg0.rTot[1]);
00269 kXR_unt32 tW = ntohl(trace.arg1.wTot);
00270 char rShift = trace.arg0.id[1];
00271 char wShift = trace.arg0.id[2];
00272 kXR_int64 realR = tR; realR = realR << rShift;
00273 kXR_int64 realW = tW; realW = realW << wShift;
00274
00275
00276
00277
00278
00279
00280 _sink.closeFile(dictId, realR, realW, timestamp, senderId);
00281 }
00282
00283 void
00284 XrdMonDecPacketDecoder::decodeDisconnect(const char* packet,
00285 kXR_int32 timestamp,
00286 senderid_t senderId)
00287 {
00288 XrdXrootdMonTrace trace;
00289 memcpy(&trace, packet, sizeof(XrdXrootdMonTrace));
00290 kXR_int32 sec = ntohl(trace.arg1.buflen);
00291 kXR_unt32 dictId = ntohl(trace.arg2.dictid);
00292
00293 cout << "decoded user disconnect, dict " << dictId
00294 << ", sec = " << sec << ", t = " << timestamp << endl;
00295
00296 _sink.addUserDisconnect(dictId, sec, timestamp, senderId);
00297 }
00298
00299 XrdMonDecPacketDecoder::CalcTime
00300 XrdMonDecPacketDecoder::prepareTimestamp(const char* packet,
00301 int& offset,
00302 int len,
00303 kXR_int32& begTime)
00304 {
00305
00306 int x = offset;
00307 int noElems = 0;
00308 while ( static_cast<kXR_char>(*(packet+x)) != XROOTD_MON_WINDOW ) {
00309 if ( x >= len ) {
00310 throw XrdMonException(ERR_NOTATIMEWINDOW,
00311 "Expected time window packet (last packet)");
00312 }
00313 x += TRACELEN;
00314 ++noElems;
00315 }
00316
00317
00318
00319
00320
00321 TimePair t = decodeTime(packet+x);
00322
00323
00324
00325 if ( begTime > t.first ) {
00326 char buf[256];
00327 sprintf(buf, "Wrong time: %d > %d at offset %d, will fix", begTime, t.first, x);
00328 cout << buf << endl;
00329 begTime = t.first;
00330
00331 }
00332
00333 float timePerTrace = ((float)(t.first - begTime)) / noElems;
00334
00335
00336 return CalcTime(timePerTrace, t.second, x);
00337 }