00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013 #include "XrdMon/XrdMonCommon.hh"
00014 #include "XrdMon/XrdMonDecPreProcess.hh"
00015 #include "XrdMon/XrdMonErrors.hh"
00016 #include "XrdMon/XrdMonException.hh"
00017 #include "XrdMon/XrdMonHeader.hh"
00018 #include "XrdMon/XrdMonUtils.hh"
00019
00020 #include <iomanip>
00021 #include "XrdSys/XrdSysHeaders.hh"
00022 #include <sstream>
00023 using std::cout;
00024 using std::cerr;
00025 using std::endl;
00026 using std::ios;
00027 using std::setprecision;
00028 using std::setw;
00029 using std::stringstream;
00030
00031 XrdMonDecPreProcess::XrdMonDecPreProcess(fstream& theFile,
00032 kXR_int64 fSize,
00033 sequen_t lastSeq,
00034 kXR_int32 ignoreIfBefore,
00035 vector< pair<packetlen_t, kXR_int64> >& allPackets)
00036 : _file(theFile),
00037 _fSize(fSize),
00038 _tempBufPos(-1),
00039 _markNextSlotAsSpecial(false),
00040 _ignoreIfBefore(ignoreIfBefore),
00041 _allPackets(allPackets),
00042 _lastSeq(lastSeq)
00043 {}
00044
00045 void
00046 XrdMonDecPreProcess::operator()()
00047 {
00048 _allPackets.reserve(64*1024);
00049 _lostPackets.reserve(12);
00050
00051 _file.seekg(0, ios::beg);
00052
00053 checkFile();
00054 reportAndThrowIfTooBad();
00055
00056
00057 _file.seekg(0, ios::beg);
00058 }
00059
00060 void
00061 XrdMonDecPreProcess::checkFile()
00062 {
00063 kXR_int32 xrdStartTime = 0;
00064
00065 enum { RBUFSIZE = 1024*1024 };
00066 char rBuf[RBUFSIZE];
00067
00068 while ( _fSize > _file.tellg() ) {
00069
00070 kXR_int64 fPos = _file.tellg();
00071
00072 int no2Read = _fSize-fPos > RBUFSIZE ? RBUFSIZE : _fSize-fPos;
00073
00074 _file.read(rBuf, no2Read);
00075 cout << "Read " << no2Read << " bytes" << endl;
00076
00077 kXR_int64 fPosEnd = _file.tellg();
00078 int rBufMax = fPosEnd - fPos;
00079
00080
00081 int rPos = 0;
00082 while ( rPos < rBufMax ) {
00083 int noBytesRead = processOnePacket(rBuf+rPos, rBufMax-rPos,
00084 fPos+rPos, xrdStartTime);
00085 if ( noBytesRead == -1 ) {
00086 break;
00087 }
00088 rPos += noBytesRead;
00089 }
00090 _file.seekg(fPos+rBufMax - (rBufMax-rPos));
00091 }
00092 }
00093
00094 int
00095 XrdMonDecPreProcess::processOnePacket(const char* buf,
00096 int bytesLeft,
00097 kXR_int64 fPos,
00098 kXR_int32& xrdStartTime)
00099 {
00100 XrdMonDecOnePacket packet;
00101 int noBytesRead = packet.init(buf, bytesLeft, fPos);
00102 if ( noBytesRead == -1 ) {
00103 return -1;
00104 }
00105
00106 if ( packet.stod() < _ignoreIfBefore ) {
00107 cout << "Ignoring " << packet
00108 << ", timestamp " << packet.stod() << endl;
00109 XrdMonDecOnePacket::resetNextNr();
00110 return noBytesRead;
00111 }
00112 if ( packet.myNr() == 0 ) {
00113 xrdStartTime = packet.stod();
00114 cout << "xrd start time " << xrdStartTime
00115 << " --> " << timestamp2string(xrdStartTime) << endl;
00116 } else if ( packet.stod() != xrdStartTime ) {
00117
00118 stringstream ss(stringstream::out);
00119 ss << "xrd start time changed " << packet.stod()
00120 << ", this is not supported";
00121 throw XrdMonException(ERR_INTERNALERR, ss.str());
00122 }
00123
00124 cout << "XrdMonDecPreprocessing " << packet << endl;
00125
00126 if ( packet.myNr() == 64*1024 ) {
00127 double perc = (100*(double)_file.tellg())/_fSize;
00128 int todo = (int) ((100-perc) * 64 * 1024 / perc);
00129 cout << "Processed 64K packets, currently at "
00130 << _file.tellg() << ", total size " << _fSize
00131 << " (" << perc << "%), ~" << todo
00132 << " packets todo" << endl;
00133 _allPackets.reserve(todo + 64*1024 + 8 * 1024);
00134 }
00135
00136 kXR_char expected = previousSeq() + 1;
00137
00138 int gap = packet.seq() - expected;
00139
00140 if ( 0 == gap ) {
00141 keepPacket(packet);
00142 return noBytesRead;
00143 }
00144 if ( gap < 0 || ((255 + expected - packet.seq()) < TBUFSIZE) ) {
00145
00146 if ( outOfOrder(packet) ) {
00147 return noBytesRead;
00148 }
00149 }
00150 if ( gap < 0 ) {
00151 gap = 256 - expected + packet.seq();
00152 }
00153 while ( gap-- ) {
00154 XrdMonDecOnePacket lostPacket(XrdMonDecOnePacket::LOST);
00155 keepPacket(lostPacket);
00156 cout << "Possibly lost packet" << endl;
00157 }
00158 keepPacket(packet);
00159 return noBytesRead;
00160 }
00161
00162 void
00163 XrdMonDecPreProcess::keepPacket(XrdMonDecOnePacket& packet)
00164 {
00165 _allPackets.push_back(
00166 pair<packetlen_t, kXR_int64>(packet.len(), packet.fPos())
00167 );
00168 add2TempBuf(packet);
00169 }
00170
00171 void
00172 XrdMonDecPreProcess::add2TempBuf(XrdMonDecOnePacket& packet)
00173 {
00174 if ( _tempBufPos < MAXTBUFELEM ) {
00175 _tempBuf[++_tempBufPos] = packet;
00176 } else {
00177 if ( _tempBuf[0].isLost() ) {
00178 _lostPackets.push_back(_tempBuf[0].myNr());
00179 }
00180 int i = 1;
00181 while ( i<TBUFSIZE ) {
00182 _tempBuf[i-1] = _tempBuf[i];
00183 ++i;
00184 }
00185 _tempBuf[MAXTBUFELEM] = packet;
00186 }
00187 }
00188
00189 kXR_char
00190 XrdMonDecPreProcess::previousSeq() const
00191 {
00192 if ( _tempBufPos == -1 ) {
00193 return _lastSeq;
00194 }
00195 for (int i=_tempBufPos ; i>=0 ; --i) {
00196 int s = _tempBuf[i].seq();
00197 if ( s >= 0 ) {
00198 return s;
00199 }
00200 }
00201 return 0xFF;
00202 }
00203
00204
00205 bool
00206 XrdMonDecPreProcess::outOfOrder(XrdMonDecOnePacket& packet)
00207 {
00208 int pos = _tempBufPos;
00209 while ( pos > 0 ) {
00210 --pos;
00211 if ( _tempBuf[pos].seq() == XrdMonDecOnePacket::LOST ) {
00212 if ( _tempBuf[pos-1].seq() + 1 == packet.seq() &&
00213 _tempBuf[pos+1].seq() - 1 == packet.seq() ) {
00214 cout << "Out of order packet arrived, seq "
00215 << packet.seq() << endl;
00216
00217 _tempBuf[pos] = packet;
00218
00219 int apPos = _allPackets.size() - (MAXTBUFELEM - pos) - 1;
00220 _allPackets[apPos].first = packet.len();
00221 _allPackets[apPos].second = packet.fPos();
00222 cout << "set length " << packet.len()
00223 << " for position " << apPos
00224 << ", seq is " << packet.seq() << endl;
00225
00226 _oooPackets.push_back(apPos);
00227 return true;
00228 }
00229 }
00230 }
00231 return false;
00232 }
00233
00234 void
00235 XrdMonDecPreProcess::reportAndThrowIfTooBad()
00236 {
00237 int noLost = _lostPackets.size();
00238 int noOOO = _oooPackets.size();
00239 int i, totalNoPackets = _allPackets.size();
00240
00241 double perc = (100*(double)noLost)/totalNoPackets;
00242 cout << noLost << " packets lost out of " << totalNoPackets
00243 << " (" << setprecision(2) << perc << "%)"
00244 << ", " << noOOO << " out-of-order packets" << endl;
00245 if ( noLost > 0 ) {
00246 cout << "Lost: ";
00247 for (i=0 ; i<noLost ; ++i) {
00248 cout << _lostPackets[i] << ", ";
00249 }
00250 cout << endl;
00251 }
00252 if ( noOOO > 0 ) {
00253 cout << "OOO: ";
00254 for (i=0 ; i<noOOO ; ++i) {
00255 cout << _oooPackets[i] << ", ";
00256 }
00257 cout << endl;
00258 }
00259
00260 cout << "dictIds: min=" << XrdMonDecOnePacket::minDictId()
00261 << ", max=" << XrdMonDecOnePacket::maxDictId() << endl;
00262
00263 if ( perc > 5 ) {
00264 stringstream ss(stringstream::out);
00265 ss << "Too many lost packets: " << noLost
00266 << " (" << perc << "%)";
00267 throw XrdMonException(ERR_TOOMANYLOST, ss.str());
00268 }
00269
00270
00271
00272
00273
00274
00275
00276
00277
00278
00279 }