XrdMonDecPreProcess.cc

Go to the documentation of this file.
00001 /*****************************************************************************/
00002 /*                                                                           */
00003 /*                           XrdMonDecPreProcess.cc                          */
00004 /*                                                                           */
00005 /* (c) 2004 by the Board of Trustees of the Leland Stanford, Jr., University */
00006 /*                            All Rights Reserved                            */
00007 /*       Produced by Jacek Becla for Stanford University under contract      */
00008 /*              DE-AC02-76SF00515 with the Department of Energy              */
00009 /*****************************************************************************/
00010 
00011 // $Id: XrdMonDecPreProcess.cc 24468 2008-06-22 16:47:03Z ganis $
00012 
00013 #include "XrdMon/XrdMonCommon.hh"
00014 #include "XrdMon/XrdMonDecPreProcess.hh"
00015 #include "XrdMon/XrdMonErrors.hh"
00016 #include "XrdMon/XrdMonException.hh"
00017 #include "XrdMon/XrdMonHeader.hh"
00018 #include "XrdMon/XrdMonUtils.hh"
00019 
00020 #include <iomanip>
00021 #include "XrdSys/XrdSysHeaders.hh"
00022 #include <sstream>
00023 using std::cout;
00024 using std::cerr;
00025 using std::endl;
00026 using std::ios;
00027 using std::setprecision;
00028 using std::setw;
00029 using std::stringstream;
00030 
00031 XrdMonDecPreProcess::XrdMonDecPreProcess(fstream& theFile, 
00032                              kXR_int64 fSize, 
00033                              sequen_t lastSeq,
00034                              kXR_int32 ignoreIfBefore,
00035                              vector< pair<packetlen_t, kXR_int64> >& allPackets)
00036     : _file(theFile),
00037       _fSize(fSize),
00038       _tempBufPos(-1),
00039       _markNextSlotAsSpecial(false),
00040       _ignoreIfBefore(ignoreIfBefore),
00041       _allPackets(allPackets),
00042       _lastSeq(lastSeq)
00043 {}
00044 
00045 void
00046 XrdMonDecPreProcess::operator()()
00047 {
00048     _allPackets.reserve(64*1024);
00049     _lostPackets.reserve(12);
00050     
00051     _file.seekg(0, ios::beg);
00052 
00053     checkFile();
00054     reportAndThrowIfTooBad();
00055 
00056     // prepare file for reading
00057     _file.seekg(0, ios::beg);
00058 }
00059 
00060 void
00061 XrdMonDecPreProcess::checkFile()
00062 {
00063     kXR_int32 xrdStartTime = 0;
00064     
00065     enum { RBUFSIZE = 1024*1024 };
00066     char rBuf[RBUFSIZE];    
00067 
00068     while ( _fSize > _file.tellg() ) {
00069         // fill buffer
00070         kXR_int64 fPos = _file.tellg(); // tellg of this read buffer
00071 
00072         int no2Read = _fSize-fPos > RBUFSIZE ? RBUFSIZE : _fSize-fPos;
00073         
00074         _file.read(rBuf, no2Read);
00075         cout << "Read " << no2Read << " bytes" << endl;
00076         
00077         kXR_int64 fPosEnd = _file.tellg();
00078         int rBufMax = fPosEnd - fPos;        
00079         
00080         // process packets until rbuf's size at least MAXPACKETSIZE
00081         int rPos = 0;
00082         while ( rPos < rBufMax ) {
00083             int noBytesRead = processOnePacket(rBuf+rPos, rBufMax-rPos, 
00084                                                fPos+rPos, xrdStartTime);
00085             if ( noBytesRead == -1 ) { // only a piece of a packet 
00086                 break;                 // left in buffer
00087             }
00088             rPos += noBytesRead;
00089         }
00090         _file.seekg(fPos+rBufMax - (rBufMax-rPos));
00091     }
00092 }
00093 
00094 int
00095 XrdMonDecPreProcess::processOnePacket(const char* buf, 
00096                                       int bytesLeft, 
00097                                       kXR_int64 fPos, 
00098                                       kXR_int32& xrdStartTime)
00099 {
00100     XrdMonDecOnePacket packet;
00101     int noBytesRead = packet.init(buf, bytesLeft, fPos);
00102     if ( noBytesRead == -1 ) {
00103         return -1;
00104     }
00105     
00106     if ( packet.stod() < _ignoreIfBefore ) {
00107         cout << "Ignoring " << packet 
00108              << ", timestamp " << packet.stod() << endl;
00109         XrdMonDecOnePacket::resetNextNr();
00110         return noBytesRead;
00111     }
00112     if ( packet.myNr() == 0 ) {
00113         xrdStartTime = packet.stod();
00114         cout << "xrd start time " << xrdStartTime
00115              << " --> " << timestamp2string(xrdStartTime) << endl;
00116     } else if ( packet.stod() != xrdStartTime ) {
00117         // BTW, FIXME memory leak in stringstream::str()
00118         stringstream ss(stringstream::out);
00119         ss << "xrd start time changed " << packet.stod() 
00120            << ", this is not supported";
00121         throw XrdMonException(ERR_INTERNALERR, ss.str());
00122     }   
00123 
00124     cout << "XrdMonDecPreprocessing " << packet << endl;
00125         
00126     if ( packet.myNr() == 64*1024 ) {
00127         double perc = (100*(double)_file.tellg())/_fSize;
00128         int todo = (int) ((100-perc) * 64 * 1024 / perc);
00129         cout << "Processed 64K packets, currently at " 
00130              << _file.tellg() << ", total size " << _fSize
00131              << " (" << perc << "%), ~" << todo 
00132              << " packets todo" << endl;
00133         _allPackets.reserve(todo + 64*1024 + 8 * 1024);
00134     }
00135         
00136     kXR_char expected = previousSeq() + 1;
00137     
00138     int gap = packet.seq() - expected;
00139 
00140     if ( 0 == gap ) {
00141         keepPacket(packet);          
00142         return noBytesRead;
00143     }
00144     if ( gap < 0 || ((255 + expected - packet.seq()) < TBUFSIZE) ) {
00145         // likely to be an out of order packet, do some studies to check
00146         if ( outOfOrder(packet) ) { // if ooo, all done inside
00147             return noBytesRead;
00148         }
00149     }
00150     if ( gap < 0 ) {
00151         gap = 256 - expected + packet.seq();
00152     }
00153     while ( gap-- ) {
00154         XrdMonDecOnePacket lostPacket(XrdMonDecOnePacket::LOST);
00155         keepPacket(lostPacket);
00156         cout << "Possibly lost packet" << endl;
00157     }
00158     keepPacket(packet);
00159     return noBytesRead;
00160 }
00161 
00162 void
00163 XrdMonDecPreProcess::keepPacket(XrdMonDecOnePacket& packet)
00164 {
00165     _allPackets.push_back(
00166              pair<packetlen_t, kXR_int64>(packet.len(), packet.fPos())
00167                          );
00168     add2TempBuf(packet);
00169 }
00170 
00171 void
00172 XrdMonDecPreProcess::add2TempBuf(XrdMonDecOnePacket& packet)
00173 {
00174     if ( _tempBufPos < MAXTBUFELEM ) {
00175         _tempBuf[++_tempBufPos] = packet;
00176     } else {
00177         if ( _tempBuf[0].isLost() ) {
00178             _lostPackets.push_back(_tempBuf[0].myNr());
00179         }
00180         int i = 1;
00181         while ( i<TBUFSIZE ) {
00182             _tempBuf[i-1] = _tempBuf[i];
00183             ++i;
00184         }
00185         _tempBuf[MAXTBUFELEM] = packet;
00186     }
00187 }
00188 
00189 kXR_char
00190 XrdMonDecPreProcess::previousSeq() const
00191 {
00192     if ( _tempBufPos == -1 ) {
00193         return _lastSeq;
00194     }
00195     for (int i=_tempBufPos ; i>=0 ; --i) {
00196         int s = _tempBuf[i].seq();
00197         if ( s >= 0 ) {
00198             return s;
00199         }
00200     }
00201     return 0xFF;
00202 }
00203 
00204 // returns position in the _packets vector where this packet belongs
00205 bool
00206 XrdMonDecPreProcess::outOfOrder(XrdMonDecOnePacket& packet)
00207 {
00208     int pos = _tempBufPos; // "pos" - position of ooo packet in the tempBuf
00209     while ( pos > 0 ) {
00210         --pos;
00211         if ( _tempBuf[pos].seq() == XrdMonDecOnePacket::LOST ) {
00212             if ( _tempBuf[pos-1].seq() + 1 == packet.seq() &&
00213                  _tempBuf[pos+1].seq() - 1 == packet.seq()   ) {
00214                 cout << "Out of order packet arrived, seq " 
00215                      << packet.seq() << endl;
00216 
00217                 _tempBuf[pos] = packet;
00218 
00219                 int apPos = _allPackets.size() - (MAXTBUFELEM - pos) - 1;
00220                 _allPackets[apPos].first  = packet.len();
00221                 _allPackets[apPos].second = packet.fPos();
00222                 cout << "set length " << packet.len()
00223                      << " for position " << apPos 
00224                      << ", seq is " << packet.seq() << endl;
00225                 
00226                 _oooPackets.push_back(apPos);
00227                 return true; // yes, ooo packet
00228             }
00229         }
00230     }
00231     return false; // not an ooo packet
00232 }
00233 
00234 void
00235 XrdMonDecPreProcess::reportAndThrowIfTooBad()
00236 {
00237     int noLost = _lostPackets.size();
00238     int noOOO  = _oooPackets.size();
00239     int i, totalNoPackets = _allPackets.size();
00240     
00241     double perc = (100*(double)noLost)/totalNoPackets;
00242     cout << noLost << " packets lost out of " << totalNoPackets
00243          << " (" << setprecision(2) << perc << "%)"
00244          << ", " << noOOO << " out-of-order packets" << endl;
00245     if ( noLost > 0 ) {
00246         cout << "Lost: ";
00247         for (i=0 ; i<noLost ; ++i) {
00248             cout << _lostPackets[i] << ", ";
00249         }
00250         cout << endl;
00251     }
00252     if ( noOOO > 0 ) {
00253         cout << "OOO: ";
00254         for (i=0 ; i<noOOO ; ++i) {
00255             cout << _oooPackets[i] << ", ";
00256         }
00257         cout << endl;
00258     }
00259 
00260     cout << "dictIds: min=" << XrdMonDecOnePacket::minDictId() 
00261          << ", max=" << XrdMonDecOnePacket::maxDictId() << endl;
00262 
00263     if ( perc > 5 ) {
00264         stringstream ss(stringstream::out);
00265         ss << "Too many lost packets: " << noLost
00266            << " (" << perc << "%)";
00267         throw XrdMonException(ERR_TOOMANYLOST, ss.str());
00268     }
00269 
00270     //cout << " ================" << endl;
00271     //cout << "allPackets:\n";
00272     //
00273     //int s = _allPackets.size();
00274     //for (i=0 ; i<s ; ++i) {
00275     //    cout << setw(3) << i 
00276     //         << setw(5) << ") len = " << setw(4) << _allPackets[i].first
00277     //         << ", fPos = " << setw(7) << _allPackets[i].second << endl;
00278     //}
00279 }

Generated on Tue Jul 5 14:46:42 2011 for ROOT_528-00b_version by  doxygen 1.5.1