XrdMonDecPacketDecoder.cc

Go to the documentation of this file.
00001 /*****************************************************************************/
00002 /*                                                                           */
00003 /*                        XrdMonDecPacketDecoder.cc                          */
00004 /*                                                                           */
00005 /* (c) 2004 by the Board of Trustees of the Leland Stanford, Jr., University */
00006 /*                            All Rights Reserved                            */
00007 /*       Produced by Jacek Becla for Stanford University under contract      */
00008 /*              DE-AC02-76SF00515 with the Department of Energy              */
00009 /*****************************************************************************/
00010 
00011 // $Id: XrdMonDecPacketDecoder.cc 24468 2008-06-22 16:47:03Z ganis $
00012 
00013 #include "XrdMon/XrdMonException.hh"
00014 #include "XrdMon/XrdMonCommon.hh"
00015 #include "XrdMon/XrdMonHeader.hh"
00016 #include "XrdMon/XrdMonUtils.hh"
00017 #include "XrdMon/XrdMonErrors.hh"
00018 #include "XrdMon/XrdMonDecPacketDecoder.hh"
00019 #include "XrdMon/XrdMonDecTraceInfo.hh"
00020 #include "XrdSys/XrdSysPlatform.hh"
00021 #include "XrdXrootd/XrdXrootdMonData.hh"
00022 #include <stdio.h>
00023 #include <netinet/in.h>
00024 using std::cerr;
00025 using std::cout;
00026 using std::endl;
00027 
00028 // for light decoding in real time
00029 XrdMonDecPacketDecoder::XrdMonDecPacketDecoder(const char* baseDir, 
00030                                                const char* rtLogDir, 
00031                                                int rtBufSize)
00032     : _sink(baseDir, rtLogDir, rtBufSize, false, 2),
00033       _stopNow(false),
00034       _upToTime(0)
00035 {}
00036 
00037 XrdMonDecPacketDecoder::XrdMonDecPacketDecoder(const char* baseDir,
00038                                                bool saveTraces,
00039                                                int maxTraceLogSize,
00040                                                kXR_int32 upToTime)
00041     : _sink(baseDir, 0, 8, saveTraces, maxTraceLogSize),
00042       _stopNow(false),
00043       _upToTime(upToTime)
00044 {}
00045 
00046 void
00047 XrdMonDecPacketDecoder::init(dictid_t min, 
00048                              dictid_t max, 
00049                              const string& senderHP)
00050 {
00051     _sink.init(min, max, senderHP);
00052 }    
00053 
00054 // true if up-to-time reached and decoding should stop
00055 void
00056 XrdMonDecPacketDecoder::operator()(const XrdMonHeader& header,
00057                                    const char* packet,
00058                                    senderid_t senderId)
00059 {
00060     int len = header.packetLen() - HDRLEN;
00061     //cout << "header " << header << endl;
00062     
00063     if ( len < 1 ) {
00064         cout << "Warning: Ignoring empty packet" << endl;
00065         return;
00066     }
00067 
00068     if ( header.stodChanged(senderId) ) {
00069         _sink.registerXrdRestart(header.stod(), senderId);
00070     }
00071 
00072     switch ( header.packetType() ) {
00073         case PACKET_TYPE_TRACE : {
00074             decodeTracePacket(packet+HDRLEN, len, senderId);
00075             break;
00076         }
00077         case PACKET_TYPE_DICT : {
00078             decodeDictPacket(packet+HDRLEN, len, senderId);
00079             break;
00080         }
00081         case PACKET_TYPE_USER : {
00082             decodeUserPacket(packet+HDRLEN, len, senderId);
00083             break;
00084         }
00085         case PACKET_TYPE_STAGE : {
00086             decodeStagePacket(packet+HDRLEN, len, senderId);
00087             break;
00088         }
00089         
00090         default: {
00091             cerr << "Unsupported packet type: " << header.packetType() << endl;
00092         }
00093     }
00094     _sink.setLastSeq(header.seqNo());
00095 }
00096 
00097 void
00098 XrdMonDecPacketDecoder::reset(senderid_t senderId)
00099 {
00100     _sink.reset(senderId);
00101 }
00102 
00103 // packet should point to data after header
00104 void
00105 XrdMonDecPacketDecoder::decodeTracePacket(const char* packet, 
00106                                           int len, 
00107                                           senderid_t senderId)
00108 {
00109     // decode first packet - time window
00110     if ( static_cast<kXR_char>(*packet) != XROOTD_MON_WINDOW ) {
00111         char buf[256];
00112         sprintf(buf, "Expected time window packet (1st packet), got %i, ", (int)*packet);
00113         throw XrdMonException(ERR_NOTATIMEWINDOW, buf);
00114     }
00115     TimePair t = decodeTime(packet);
00116     if ( _upToTime != 0 && _upToTime <= t.first ) {
00117         cout << "reached the up-to-time, will stop decoding now" << endl;
00118         _stopNow = true;
00119         return;
00120     }
00121     kXR_int32 begTime = t.second;
00122     int offset = TRACELEN;
00123 
00124     //cout << "Decoded time (first) " << t.first << " " << t.second << endl;
00125     
00126     while ( offset < len ) {
00127         CalcTime ct = prepareTimestamp(packet, offset, len, begTime);
00128         int elemNo = 0;
00129         while ( offset<ct.endOffset ) {
00130             kXR_char infoType = static_cast<kXR_char>(*(packet+offset));
00131             kXR_int32 timestamp = begTime + (kXR_int32) (elemNo++ * ct.timePerTrace);
00132             if ( !(infoType & XROOTD_MON_RWREQUESTMASK) ) {
00133                 decodeRWRequest(packet+offset, timestamp, senderId);
00134             } else if ( infoType == XROOTD_MON_OPEN ) {
00135                 decodeOpen(packet+offset, timestamp, senderId);
00136             } else if ( infoType == XROOTD_MON_CLOSE ) {
00137                 decodeClose(packet+offset, timestamp, senderId);
00138             } else if ( infoType == XROOTD_MON_DISC ) {
00139                 decodeDisconnect(packet+offset, timestamp, senderId);
00140             } else {
00141                 char buf[256];
00142                 sprintf(buf, "Unsupported infoType of trace packet: %i", (int) infoType);
00143                 throw XrdMonException(ERR_INVALIDINFOTYPE, buf);
00144             }
00145             offset += TRACELEN;
00146         }
00147         begTime = ct.begTimeNextWindow;
00148         offset += TRACELEN; // skip window trace which was already read
00149     }
00150 }
00151 
00152 // packet should point to data after header
00153 void
00154 XrdMonDecPacketDecoder::decodeDictPacket(const char* packet, 
00155                                          int len, 
00156                                          senderid_t senderId)
00157 {
00158     kXR_int32 x32;
00159     memcpy(&x32, packet, sizeof(kXR_int32));
00160     dictid_t dictId = ntohl(x32);
00161     
00162     _sink.addDictId(dictId, 
00163                     packet+sizeof(kXR_int32), 
00164                     len-sizeof(kXR_int32),
00165                     senderId);
00166 }
00167 
00168 // packet should point to data after header
00169 void
00170 XrdMonDecPacketDecoder::decodeUserPacket(const char* packet,
00171                                          int len,
00172                                          senderid_t senderId)
00173 {
00174     kXR_int32 x32;
00175     memcpy(&x32, packet, sizeof(kXR_int32));
00176     dictid_t dictId = ntohl(x32);
00177     
00178     _sink.addUserId(dictId,
00179                     packet+sizeof(kXR_int32),
00180                     len-sizeof(kXR_int32),
00181                     senderId);
00182 }
00183 
00184 // packet should point to data after header
00185 void
00186 XrdMonDecPacketDecoder::decodeStagePacket(const char* packet, 
00187                                           int len, 
00188                                           senderid_t senderId)
00189 {
00190     kXR_int32 x32;
00191     memcpy(&x32, packet, sizeof(kXR_int32));
00192     dictid_t dictId = ntohl(x32);
00193 
00194     _sink.addStageInfo(dictId, 
00195                        packet+sizeof(kXR_int32), 
00196                        len-sizeof(kXR_int32),
00197                        senderId);
00198 }
00199 
00200 XrdMonDecPacketDecoder::TimePair
00201 XrdMonDecPacketDecoder::decodeTime(const char* packet)
00202 {
00203     struct X {
00204         kXR_int32 endT;
00205         kXR_int32 begT;
00206     } x;
00207 
00208     memcpy(&x, packet+sizeof(kXR_int64), sizeof(X));
00209     return TimePair(ntohl(x.endT), ntohl(x.begT));
00210 }
00211 
00212 void
00213 XrdMonDecPacketDecoder::decodeRWRequest(const char* packet,
00214                                         kXR_int32 timestamp, 
00215                                         senderid_t senderId)
00216 {
00217     XrdXrootdMonTrace trace;
00218     memcpy(&trace, packet, sizeof(XrdXrootdMonTrace));
00219     kXR_int64 tOffset = ntohll(trace.arg0.val);
00220     kXR_int32 tLen    = ntohl (trace.arg1.buflen);
00221     kXR_unt32 dictId  = ntohl (trace.arg2.dictid);
00222 
00223     if ( tOffset < 0 ) {
00224         throw XrdMonException(ERR_NEGATIVEOFFSET);
00225     }
00226     char rwReq = 'r';
00227     if ( tLen<0 ) {
00228         rwReq = 'w';
00229         tLen *= -1;
00230     }
00231 
00232     XrdMonDecTraceInfo traceInfo(tOffset, tLen, rwReq, timestamp);
00233     _sink.add(dictId, traceInfo, senderId);
00234 }
00235 
00236 void
00237 XrdMonDecPacketDecoder::decodeOpen(const char* packet,
00238                                    kXR_int32 timestamp, 
00239                                    senderid_t senderId)
00240 {
00241     XrdXrootdMonTrace trace;
00242     memcpy(&trace, packet, sizeof(XrdXrootdMonTrace));
00243     kXR_unt32 dictId = ntohl(trace.arg2.dictid);
00244     // mask needed to hide id[0] value which is stored
00245     // in the top most byte
00246 #if _FILE_OFFSET_BITS==64
00247     kXR_int64 maskHBO = 0x00ffffffffffffffLL;
00248 #else
00249     kXR_int64 maskHBO = 0x00ffffff;
00250 #endif
00251     kXR_int64 maskNBO = 0;
00252     h2nll(maskHBO, maskNBO);
00253     kXR_int64 maskedVal = trace.arg0.val & maskNBO;
00254     kXR_int64 fSize = 0;
00255     n2hll(maskedVal, fSize);
00256     
00257     _sink.openFile(dictId, timestamp, senderId, fSize);
00258 }
00259 
00260 void
00261 XrdMonDecPacketDecoder::decodeClose(const char* packet,
00262                                     kXR_int32 timestamp, 
00263                                     senderid_t senderId)
00264 {
00265     XrdXrootdMonTrace trace;
00266     memcpy(&trace, packet, sizeof(XrdXrootdMonTrace));
00267     kXR_unt32 dictId = ntohl(trace.arg2.dictid);
00268     kXR_unt32 tR     = ntohl(trace.arg0.rTot[1]);
00269     kXR_unt32 tW     = ntohl(trace.arg1.wTot);
00270     char rShift      = trace.arg0.id[1];
00271     char wShift      = trace.arg0.id[2];
00272     kXR_int64 realR  = tR; realR = realR << rShift;
00273     kXR_int64 realW  = tW; realW = realW << wShift;
00274 
00275     //cout << "decoded close file, dict " << dictId 
00276     //     << ", total r " << tR << " shifted " << (int) rShift << ", or " << realR
00277     //     << ", total w " << tW << " shifted " << (int) wShift << ", or " << realW
00278     //     << endl;
00279 
00280     _sink.closeFile(dictId, realR, realW, timestamp, senderId);
00281 }
00282 
00283 void
00284 XrdMonDecPacketDecoder::decodeDisconnect(const char* packet, 
00285                                          kXR_int32 timestamp, 
00286                                          senderid_t senderId)
00287 {
00288     XrdXrootdMonTrace trace;
00289     memcpy(&trace, packet, sizeof(XrdXrootdMonTrace));
00290     kXR_int32 sec    = ntohl(trace.arg1.buflen);
00291     kXR_unt32 dictId = ntohl(trace.arg2.dictid);
00292 
00293     cout << "decoded user disconnect, dict " << dictId
00294          << ", sec = " << sec << ", t = " << timestamp << endl;
00295 
00296     _sink.addUserDisconnect(dictId, sec, timestamp, senderId);
00297 }
00298 
00299 XrdMonDecPacketDecoder::CalcTime
00300 XrdMonDecPacketDecoder::prepareTimestamp(const char* packet, 
00301                                          int& offset, 
00302                                          int len, 
00303                                          kXR_int32& begTime)
00304 {
00305     // look for time window
00306     int x = offset;
00307     int noElems = 0;
00308     while ( static_cast<kXR_char>(*(packet+x)) != XROOTD_MON_WINDOW ) {
00309         if ( x >= len ) {
00310             throw XrdMonException(ERR_NOTATIMEWINDOW, 
00311                               "Expected time window packet (last packet)");
00312         }
00313         x += TRACELEN;
00314         ++noElems;
00315     }
00316 
00317     // cout << "Found timestamp, offset " << x 
00318     //     << " after " << noElems << " elements" << endl;
00319     
00320     // decode time window
00321     TimePair t = decodeTime(packet+x);
00322 
00323     // cout << "decoded time " << t.first << " " << t.second << endl;
00324     
00325     if ( begTime > t.first ) {
00326         char buf[256];
00327         sprintf(buf, "Wrong time: %d > %d at offset %d, will fix", begTime, t.first, x);
00328         cout << buf << endl;
00329         begTime = t.first;
00330         //throw XrdMonException(ERR_INVALIDTIME, buf);
00331     }
00332 
00333     float timePerTrace = ((float)(t.first - begTime)) / noElems;
00334     //cout << "timepertrace = " << timePerTrace << endl;
00335     
00336     return CalcTime(timePerTrace, t.second, x);
00337 }

Generated on Tue Jul 5 14:46:42 2011 for ROOT_528-00b_version by  doxygen 1.5.1