00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013 #include "XrdMon/XrdMonErrors.hh"
00014 #include "XrdMon/XrdMonException.hh"
00015 #include "XrdMon/XrdMonDecSink.hh"
00016 #include "XrdMon/XrdMonDecTraceInfo.hh"
00017 #include "XrdMon/XrdMonDecStageInfo.hh"
00018 #include "XrdMon/XrdMonSenderInfo.hh"
00019 #include "XrdMon/XrdMonUtils.hh"
00020
00021 #include <netinet/in.h>
00022 #include <sys/time.h>
00023 #include <iomanip>
00024 #include <unistd.h>
00025 using std::cerr;
00026 using std::cout;
00027 using std::endl;
00028 using std::ios;
00029 using std::map;
00030 using std::setw;
00031
00032 const kXR_unt16 XrdMonDecSink::VER_FREQ = 1000;
00033
00034 XrdMonDecSink::XrdMonDecSink(const char* baseDir,
00035 const char* rtLogDir,
00036 int rtBufSize,
00037 bool saveTraces,
00038 int maxTraceLogSize)
00039 : _verFreqCount(VER_FREQ),
00040 _rtLogger(0),
00041 _saveTraces(saveTraces),
00042 _tCacheSize(32*1024),
00043 _traceLogNumber(0),
00044 _maxTraceLogSize(maxTraceLogSize),
00045 _lastSeq(0xFF),
00046 _uniqueDictId(1),
00047 _uniqueUserId(1)
00048 {
00049 if ( maxTraceLogSize < 2 ) {
00050 cerr << "Trace log size must be > 2MB" << endl;
00051 throw XrdMonException(ERR_INVALIDARG, "Trace log size must be > 2MB");
00052 }
00053
00054 _path = baseDir;
00055 _path += "/";
00056 _jnlPath = _path + "/jnl";
00057 _path += generateTimestamp();
00058 _path += "_";
00059 _dictPath = _path + "dict.ascii";
00060 _userPath = _path + "user.ascii";
00061 _xrdRestartLog = baseDir;
00062 _xrdRestartLog += "/xrdRestarts.ascii";
00063
00064 if ( 0 == access(_dictPath.c_str(), F_OK) ) {
00065 string s("File "); s += _dictPath;
00066 s += " exists. Move it somewhere else first.";
00067 throw XrdMonException(ERR_INVALIDARG, s);
00068 }
00069 if ( _saveTraces ) {
00070 _tCache.reserve(_tCacheSize+1);
00071 string fTPath = _path + "trace000.ascii";
00072 if ( 0 == access(fTPath.c_str(), F_OK) ) {
00073 string s("File "); s += fTPath;
00074 s += " exists. Move it somewhere else first.";
00075 throw XrdMonException(ERR_INVALIDARG, s);
00076 }
00077 }
00078
00079 if ( 0 != rtLogDir ) {
00080 initRT(rtLogDir, rtBufSize);
00081 } else {
00082 loadUniqueIdsAndSeq();
00083 }
00084 }
00085
00086 XrdMonDecSink::~XrdMonDecSink()
00087 {
00088 flushClosedDicts();
00089
00090 reportLostPackets();
00091 _lost.clear();
00092
00093
00094 if ( 0 == _rtLogger ) {
00095 flushTCache();
00096 checkpoint();
00097 }
00098 {
00099 XrdSysMutexHelper mh; mh.Lock(&_dMutex);
00100 int i, dcacheSize = _dCache.size();
00101 for ( i=0; i<dcacheSize ; ++i ) {
00102 resetDMap(i);
00103 delete _dCache[i];
00104 _dCache[i] = 0;
00105 }
00106 }
00107 {
00108 XrdSysMutexHelper mh; mh.Lock(&_uMutex);
00109 int i, ucacheSize = _uCache.size();
00110 for ( i=0; i<ucacheSize ; ++i ) {
00111 resetUMap(i);
00112 delete _uCache[i];
00113 _uCache[i] = 0;
00114 }
00115 }
00116
00117
00118 _rtLogger->flush();
00119 delete _rtLogger;
00120
00121
00122 fstream f(_rtMaxIdsPath.c_str(), ios::out);
00123 f << "o " << _uniqueDictId << '\n'
00124 << "u " << _uniqueUserId << endl;
00125 f.close();
00126
00127
00128 unlink(_rtFlagPath.c_str());
00129 }
00130
00131 struct connectDictIdsWithCache : public std::unary_function<XrdMonDecDictInfo*, void> {
00132 connectDictIdsWithCache(map<dictid_t, XrdMonDecDictInfo*>& dC) : _cache(dC){}
00133 void operator()(XrdMonDecDictInfo* di) {
00134 dictid_t id = di->xrdId();
00135 _cache[id] = di;
00136 }
00137 map<dictid_t, XrdMonDecDictInfo*>& _cache;
00138 };
00139
00140 void
00141 XrdMonDecSink::init(dictid_t min, dictid_t max, const string& senderHP)
00142 {
00143
00144
00145 vector<XrdMonDecDictInfo*> diVector = loadActiveDictInfo();
00146
00147
00148
00149
00150
00151 ::abort();
00152 }
00153
00154 void
00155 XrdMonDecSink::initRT(const char* rtLogDir,
00156 int rtBufSize)
00157 {
00158 _rtFlagPath = rtLogDir;
00159 _rtFlagPath += "/rtRunning.flag";
00160 _rtMaxIdsPath = rtLogDir;
00161 _rtMaxIdsPath += "/rtMax.jnl";
00162
00163
00164
00165 if( (access(_rtFlagPath.c_str(), F_OK)) != -1 ) {
00166 string s("Can't start rtDecoder: either collector is already running, or it has been stopped uncleanly, in which case you need to run cleanup utility first");
00167 throw XrdMonException(ERR_UNKNOWN, s);
00168 }
00169
00170
00171 fstream f(_rtFlagPath.c_str(), fstream::out);
00172 f.close();
00173
00174 char* rtLogName = new char [strlen(rtLogDir) + 32];
00175 sprintf(rtLogName, "%s/rtLog.txt", rtLogDir);
00176
00177 char* rtLogNLock = new char [strlen(rtLogDir) + 32];
00178 sprintf(rtLogNLock, "%s/rtLog.lock", rtLogDir);
00179 _rtLogger = new XrdMonBufferedOutput(rtLogName, rtLogNLock, rtBufSize);
00180 addVersion();
00181
00182 delete [] rtLogName;
00183 delete [] rtLogNLock;
00184
00185
00186 f.open(_rtMaxIdsPath.c_str(), ios::in);
00187 if ( f.is_open() ) {
00188 do {
00189 char line[64];
00190 f.getline(line, 64);
00191 char type;
00192 int number;
00193 sscanf(line, "%c %i", &type, &number);
00194 if ( type == 'o' && number > _uniqueDictId + 1 ) {
00195 _uniqueDictId = number + 1;
00196 }
00197 if ( type == 'u' && number > _uniqueUserId + 1 ) {
00198 _uniqueUserId = number + 1;
00199 }
00200 } while ( ! f.eof() );
00201 f.close();
00202 unlink(_rtMaxIdsPath.c_str());
00203 cout << "Updated uniqueIds from jnl file. "
00204 << "uniqueDictId: " << _uniqueDictId << ", "
00205 << "uniqueUserId: " << _uniqueUserId << endl;
00206 }
00207 }
00208
00209 void
00210 XrdMonDecSink::addDictId(dictid_t xrdId,
00211 const char* theString,
00212 int len,
00213 senderid_t senderId)
00214 {
00215 XrdSysMutexHelper mh; mh.Lock(&_dMutex);
00216 dmap_t* dMap = 0;
00217 if ( _dCache.size() <= senderId ) {
00218 dMap = new dmap_t;
00219 _dCache.push_back(dMap);
00220 } else {
00221 dMap = _dCache[senderId];
00222 }
00223
00224 dmapitr_t itr = dMap->find(xrdId);
00225 if ( itr != dMap->end() ) {
00226 cerr << "Error: dictID " << xrdId << " already in cache." << endl;
00227 return;
00228
00229 }
00230
00231 XrdMonDecDictInfo* di;
00232 (*dMap)[xrdId] = di = new XrdMonDecDictInfo(xrdId, _uniqueDictId++,
00233 theString, len, senderId);
00234
00235
00236
00237
00238
00239 }
00240
00241
00242 void
00243 XrdMonDecSink::addStageInfo(dictid_t xrdId,
00244 const char* theString,
00245 int len,
00246 senderid_t senderId)
00247 {
00248
00249
00250 dictid_t uniqueId2use = 0;
00251 if ( xrdId > 0 ) {
00252 uniqueId2use = _uniqueDictId++;
00253 }
00254
00255 XrdMonDecStageInfo* si;
00256 si = new XrdMonDecStageInfo(xrdId, uniqueId2use,
00257 theString, len, senderId);
00258
00259 if ( 0 != _rtLogger ) {
00260 _rtLogger->add( si->writeRT2Buffer() );
00261 if ( --_verFreqCount < 1 ) {
00262 addVersion();
00263 _verFreqCount = VER_FREQ;
00264 }
00265 }
00266 delete si;
00267 }
00268
00269
00270 void
00271 XrdMonDecSink::addUserId(dictid_t usrId,
00272 const char* theString,
00273 int len,
00274 senderid_t senderId)
00275 {
00276 XrdSysMutexHelper mh; mh.Lock(&_uMutex);
00277 umap_t* uMap = 0;
00278 if ( _uCache.size() <= senderId ) {
00279 uMap = new umap_t;
00280 _uCache.push_back(uMap);
00281 } else {
00282 uMap = _uCache[senderId];
00283 }
00284
00285 umapitr_t itr = uMap->find(usrId);
00286 if ( itr != uMap->end() ) {
00287 cerr << "Error: userID " << usrId << " already in cache." << endl;
00288 return;
00289
00290 }
00291
00292 XrdMonDecUserInfo* ui;
00293 (*uMap)[usrId] = ui
00294 = new XrdMonDecUserInfo(usrId, _uniqueUserId++,
00295 theString, len, senderId);
00296
00297
00298 if ( 0 != _rtLogger ) {
00299 _rtLogger->add( ui->writeRT2Buffer(XrdMonDecUserInfo::CONNECT) );
00300 if ( --_verFreqCount < 1 ) {
00301 addVersion();
00302 _verFreqCount = VER_FREQ;
00303 }
00304 }
00305 }
00306
00307 void
00308 XrdMonDecSink::add(dictid_t xrdId,
00309 XrdMonDecTraceInfo& trace,
00310 senderid_t senderId)
00311 {
00312 static long totalNoTraces = 0;
00313 static long noLostTraces = 0;
00314 if ( ++totalNoTraces % 500001 == 500000 ) {
00315 cout << noLostTraces << " lost since last time" << endl;
00316 noLostTraces = 0;
00317 }
00318
00319 XrdSysMutexHelper mh; mh.Lock(&_dMutex);
00320 dmap_t* dMap = 0;
00321 if ( _dCache.size() <= senderId ) {
00322 dMap = new dmap_t;
00323 _dCache.push_back(dMap);
00324 } else {
00325 dMap = _dCache[senderId];
00326 }
00327
00328 dmapitr_t itr = dMap->find(xrdId);
00329 if ( itr == dMap->end() ) {
00330 registerLostPacket(xrdId, "Add trace");
00331 return;
00332 }
00333 XrdMonDecDictInfo* di = itr->second;
00334
00335 trace.setUniqueId(di->uniqueId());
00336
00337 if ( ! di->addTrace(trace) ) {
00338 return;
00339 }
00340 if ( _saveTraces ) {
00341
00342
00343 _tCache.push_back(trace);
00344 if ( _tCache.size() >= _tCacheSize ) {
00345 flushTCache();
00346 }
00347 cout << "FIXME: tcache one for all servers now, good enough?" << endl;
00348 :: abort();
00349 }
00350 }
00351
00352 void
00353 XrdMonDecSink::addUserDisconnect(dictid_t xrdId,
00354 kXR_int32 sec,
00355 kXR_int32 timestamp,
00356 senderid_t senderId)
00357 {
00358 XrdSysMutexHelper mh; mh.Lock(&_uMutex);
00359
00360 umap_t* uMap = 0;
00361 if ( _uCache.size() <= senderId ) {
00362 uMap = new umap_t;
00363 _uCache.push_back(uMap);
00364 } else {
00365 uMap = _uCache[senderId];
00366 }
00367
00368 umapitr_t itr = uMap->find(xrdId);
00369 if ( itr == uMap->end() ) {
00370 registerLostPacket(xrdId, "User disconnect");
00371 return;
00372 }
00373 itr->second->setDisconnectInfo(sec, timestamp);
00374
00375 if ( 0 != _rtLogger ) {
00376 _rtLogger->add( itr->second->writeRT2Buffer(XrdMonDecUserInfo::DISCONNECT) );
00377 if ( --_verFreqCount < 1 ) {
00378 addVersion();
00379 _verFreqCount = VER_FREQ;
00380 }
00381 }
00382 }
00383
00384 void
00385 XrdMonDecSink::openFile(dictid_t xrdId,
00386 kXR_int32 timestamp,
00387 senderid_t senderId,
00388 kXR_int64 fSize)
00389 {
00390 XrdSysMutexHelper mh; mh.Lock(&_dMutex);
00391 dmap_t* dMap = 0;
00392 if ( _dCache.size() <= senderId ) {
00393 dMap = new dmap_t;
00394 _dCache.push_back(dMap);
00395 } else {
00396 dMap = _dCache[senderId];
00397 }
00398
00399 dmapitr_t itr = dMap->find(xrdId);
00400 if ( itr == dMap->end() ) {
00401 registerLostPacket(xrdId, "Open file");
00402 cout << "requested open file " << xrdId << ", xrdId not found" << endl;
00403 return;
00404 }
00405
00406 cout << "Opening file " << xrdId << endl;
00407 itr->second->openFile(timestamp, fSize);
00408
00409 if ( 0 != _rtLogger ) {
00410 _rtLogger->add( itr->second->writeRT2BufferOpenFile(fSize) );
00411 if ( --_verFreqCount < 1 ) {
00412 addVersion();
00413 _verFreqCount = VER_FREQ;
00414 }
00415 }
00416 }
00417
00418 void
00419 XrdMonDecSink::closeFile(dictid_t xrdId,
00420 kXR_int64 bytesR,
00421 kXR_int64 bytesW,
00422 kXR_int32 timestamp,
00423 senderid_t senderId)
00424 {
00425 XrdSysMutexHelper mh; mh.Lock(&_dMutex);
00426 dmap_t* dMap = 0;
00427 if ( _dCache.size() <= senderId ) {
00428 dMap = new dmap_t;
00429 _dCache.push_back(dMap);
00430 } else {
00431 dMap = _dCache[senderId];
00432 }
00433
00434 dmapitr_t itr = dMap->find(xrdId);
00435 if ( itr == dMap->end() ) {
00436 registerLostPacket(xrdId, "Close file");
00437 return;
00438 }
00439
00440
00441
00442 itr->second->closeFile(bytesR, bytesW, timestamp);
00443
00444 if ( 0 != _rtLogger ) {
00445 _rtLogger->add(itr->second->writeRT2BufferCloseFile());
00446 if ( --_verFreqCount < 1 ) {
00447 addVersion();
00448 _verFreqCount = VER_FREQ;
00449 }
00450 }
00451 }
00452
00453 void
00454 XrdMonDecSink::loadUniqueIdsAndSeq()
00455 {
00456 if ( 0 == access(_jnlPath.c_str(), F_OK) ) {
00457 char buf[32];
00458 fstream f(_jnlPath.c_str(), ios::in);
00459 f.read(buf, sizeof(sequen_t)+2*sizeof(dictid_t));
00460 f.close();
00461
00462 memcpy(&_lastSeq, buf, sizeof(sequen_t));
00463 kXR_int32 v32;
00464
00465 memcpy(&v32, buf+sizeof(sequen_t), sizeof(kXR_int32));
00466 _uniqueDictId = ntohl(v32);
00467
00468 memcpy(&v32, buf+sizeof(sequen_t)+sizeof(dictid_t), sizeof(kXR_int32));
00469 _uniqueUserId = ntohl(v32);
00470
00471 cout << "Loaded from jnl file: "
00472 << "seq " << (int) _lastSeq
00473 << ", uniqueDictId " << _uniqueDictId
00474 << ", uniqueUserId " << _uniqueUserId
00475 << endl;
00476 }
00477 }
00478
00479 void
00480 XrdMonDecSink::flushClosedDicts()
00481 {
00482 fstream fD(_dictPath.c_str(), ios::out | ios::app);
00483 enum { BUFSIZE = 1024*1024 };
00484 string buf;
00485 buf.reserve(BUFSIZE);
00486
00487 int curLen = 0, sizeBefore = 0, sizeAfter = 0;
00488 {
00489 XrdSysMutexHelper mh; mh.Lock(&_dMutex);
00490 int i, dcacheSize = _dCache.size();
00491 for ( i=0; i<dcacheSize ; ++i ) {
00492 dmap_t* m = _dCache[i];
00493 sizeBefore += m->size();
00494 flushOneDMap(_dCache[i], curLen, BUFSIZE, buf, fD);
00495 sizeAfter += m->size();
00496 }
00497 }
00498
00499 if ( curLen > 0 ) {
00500 fD.write(buf.c_str(), curLen);
00501
00502 }
00503 fD.close();
00504 cout << "flushed (d) " << sizeBefore-sizeAfter
00505 << ", left " << sizeAfter << endl;
00506 }
00507
00508 void
00509 XrdMonDecSink::flushOneDMap(dmap_t* m,
00510 int& curLen,
00511 const int BUFSIZE,
00512 string& buf,
00513 fstream& fD)
00514 {
00515 vector<dictid_t> forDeletion;
00516 dmapitr_t itr;
00517 for ( itr=m->begin() ; itr != m->end() ; ++itr ) {
00518 XrdMonDecDictInfo* di = itr->second;
00519 if ( di != 0 && di->isClosed() ) {
00520 const char* dString = di->convert2string();
00521 int strLen = strlen(dString);
00522 if ( curLen == 0 ) {
00523 buf = dString;
00524 } else {
00525 if ( curLen + strLen >= BUFSIZE ) {
00526 fD.write(buf.c_str(), curLen);
00527 curLen = 0;
00528
00529 buf = dString;
00530 } else {
00531 buf += dString;
00532 }
00533 }
00534 curLen += strLen;
00535 delete itr->second;
00536 forDeletion.push_back(itr->first);
00537 }
00538 }
00539 int s = forDeletion.size();
00540 for (int i=0 ; i<s ; ++i) {
00541 m->erase(forDeletion[i]);
00542 }
00543 }
00544
00545 void
00546 XrdMonDecSink::flushUserCache()
00547 {
00548 fstream fD(_userPath.c_str(), ios::app);
00549 enum { BUFSIZE = 1024*1024 };
00550
00551 string buf;
00552 buf.reserve(BUFSIZE);
00553
00554 int curLen = 0, sizeBefore = 0, sizeAfter = 0;
00555 {
00556 XrdSysMutexHelper mh; mh.Lock(&_uMutex);
00557 int i, ucacheSize = _uCache.size();
00558 for ( i=0 ; i<ucacheSize ; ++i ) {
00559 umap_t* m = _uCache[i];
00560 sizeBefore += m->size();
00561 flushOneUMap(m, curLen, BUFSIZE, buf, fD);
00562 sizeAfter += m->size();
00563 }
00564 }
00565
00566 if ( curLen > 0 ) {
00567 fD.write(buf.c_str(), curLen);
00568 cout << "flushed to disk: \n" << buf << endl;
00569 }
00570 fD.close();
00571 cout << "flushed (u) " << sizeBefore-sizeAfter << ", left "
00572 << sizeAfter << endl;
00573 }
00574
00575 void
00576 XrdMonDecSink::flushOneUMap(umap_t* m,
00577 int& curLen,
00578 const int BUFSIZE,
00579 string& buf,
00580 fstream& fD)
00581 {
00582 vector <dictid_t> forDeletion;
00583 umapitr_t itr;
00584 for ( itr=m->begin() ; itr != m->end() ; ++itr ) {
00585 XrdMonDecUserInfo* di = itr->second;
00586 if ( di != 0 && di->readyToBeStored() ) {
00587 const char* dString = di->convert2string();
00588 int strLen = strlen(dString);
00589 if ( curLen == 0 ) {
00590 buf = dString;
00591 } else {
00592 if ( curLen + strLen >= BUFSIZE ) {
00593 fD.write(buf.c_str(), curLen);
00594 curLen = 0;
00595 buf = dString;
00596 } else {
00597 buf += dString;
00598 }
00599 }
00600 curLen += strLen;
00601 delete itr->second;
00602 forDeletion.push_back(itr->first);
00603 }
00604 }
00605 int s = forDeletion.size();
00606 for (int i=0 ; i<s ; ++i) {
00607 m->erase(forDeletion[i]);
00608 }
00609 }
00610
00611
00612
00613 void
00614 XrdMonDecSink::flushTCache()
00615 {
00616 if ( _tCache.size() == 0 ) {
00617 return;
00618 }
00619
00620 fstream f;
00621 enum { BUFSIZE = 32*1024 };
00622 char buf[BUFSIZE];
00623 int curLen = 0;
00624 int s = _tCache.size();
00625 char oneTrace[256];
00626 for (int i=0 ; i<s ; ++i) {
00627 _tCache[i].convertToString(oneTrace);
00628 int strLen = strlen(oneTrace);
00629 if ( curLen == 0 ) {
00630 strcpy(buf, oneTrace);
00631 } else {
00632 if ( curLen + strLen >= BUFSIZE ) {
00633 write2TraceFile(f, buf, curLen);
00634 curLen = 0;
00635
00636 strcpy(buf, oneTrace);
00637 } else {
00638 strcat(buf, oneTrace);
00639 }
00640 }
00641 curLen += strLen;
00642 }
00643 if ( curLen > 0 ) {
00644 write2TraceFile(f, buf, curLen);
00645
00646 }
00647 _tCache.clear();
00648 f.close();
00649 }
00650
00651
00652 void
00653 XrdMonDecSink::checkpoint()
00654 {
00655 ::abort();
00656
00657
00658
00659
00660
00661
00662
00663
00664
00665
00666
00667
00668
00669
00670
00671
00672
00673
00674
00675
00676
00677
00678
00679
00680
00681
00682
00683
00684
00685
00686
00687
00688
00689
00690
00691
00692
00693
00694
00695
00696
00697
00698
00699
00700
00701
00702
00703
00704
00705
00706
00707
00708 }
00709
00710
00711 void
00712 XrdMonDecSink::openTraceFile(fstream& f)
00713 {
00714
00715
00716
00717
00718
00719
00720 cout << "trace log file open NOT IMPLEMENTED " << endl;
00721 ::abort();
00722 }
00723
00724
00725 void
00726 XrdMonDecSink::write2TraceFile(fstream& f,
00727 const char* buf,
00728 int len)
00729 {
00730 if ( ! f.is_open() ) {
00731 openTraceFile(f);
00732 }
00733 kXR_int64 tobeSize = len + f.tellp();
00734 if ( tobeSize > _maxTraceLogSize*1024*1024 ) {
00735 f.close();
00736 ++_traceLogNumber;
00737 openTraceFile(f);
00738
00739 }
00740 f.write(buf, len);
00741 }
00742
00743 vector<XrdMonDecDictInfo*>
00744 XrdMonDecSink::loadActiveDictInfo()
00745 {
00746 vector<XrdMonDecDictInfo*> v;
00747
00748 if ( 0 != access(_jnlPath.c_str(), F_OK) ) {
00749 return v;
00750 }
00751
00752 fstream f(_jnlPath.c_str(), ios::in);
00753 f.seekg(0, ios::end);
00754 int fSize = f.tellg();
00755 int pos = sizeof(sequen_t) + sizeof(kXR_int32);
00756 if ( fSize - pos == 0 ) {
00757 return v;
00758 }
00759 f.seekg(pos);
00760 char* buf = new char[fSize-pos];
00761 f.read(buf, fSize-pos);
00762
00763 int bufPos = 0;
00764 while ( bufPos < fSize-pos ) {
00765 v.push_back( new XrdMonDecDictInfo(buf, bufPos) );
00766 }
00767 delete [] buf;
00768
00769 return v;
00770 }
00771
00772 void
00773 XrdMonDecSink::registerLostPacket(dictid_t xrdId, const char* descr)
00774 {
00775 map<dictid_t, long>::iterator lostItr = _lost.find(xrdId);
00776 if ( lostItr == _lost.end() ) {
00777 cerr << descr << ": cannot find dictID " << xrdId << endl;
00778 _lost[xrdId] = 1;
00779 } else {
00780 ++lostItr->second;
00781 }
00782 }
00783
00784 void
00785 XrdMonDecSink::flushHistoryData()
00786 {
00787 cout << "Flushing decoded data..." << endl;
00788 flushClosedDicts();
00789 flushUserCache();
00790 }
00791
00792 void
00793 XrdMonDecSink::reset(senderid_t senderId)
00794 {
00795 flushClosedDicts();
00796
00797 {
00798 XrdSysMutexHelper mh; mh.Lock(&_dMutex);
00799 resetDMap(senderId);
00800 }
00801
00802 {
00803 XrdSysMutexHelper mh; mh.Lock(&_uMutex);
00804 resetUMap(senderId);
00805 }
00806 }
00807
00808 void
00809 XrdMonDecSink::resetDMap(senderid_t senderId)
00810 {
00811 if ( senderId >= _dCache.size() ) {
00812 return;
00813 }
00814 dmap_t* m = _dCache[senderId];
00815 dmapitr_t itr;
00816 for ( itr=m->begin() ; itr != m->end() ; ++itr ) {
00817 delete itr->second;
00818 }
00819 m->clear();
00820 }
00821
00822 void
00823 XrdMonDecSink::resetUMap(senderid_t senderId)
00824 {
00825 if ( senderId >= _uCache.size() ) {
00826 return;
00827 }
00828 umap_t* m = _uCache[senderId];
00829 umapitr_t itr;
00830 for ( itr=m->begin() ; itr != m->end() ; ++itr ) {
00831 delete itr->second;
00832 }
00833 m->clear();
00834 }
00835
00836 void
00837 XrdMonDecSink::reportLostPackets()
00838 {
00839 int size = _lost.size();
00840 if ( size > 0 ) {
00841 cout << "Lost " << size << " dictIds {id, #lostTraces}: ";
00842 map<dictid_t, long>::iterator lostItr = _lost.begin();
00843 while ( lostItr != _lost.end() ) {
00844 cout << "{"<< lostItr->first << ", " << lostItr->second << "} ";
00845 ++lostItr;
00846 }
00847 cout << endl;
00848 }
00849 }
00850
00851 void
00852 XrdMonDecSink::registerXrdRestart(kXR_int32 stod, senderid_t senderId)
00853 {
00854 char t[24];
00855 timestamp2string(stod, t, GMT);
00856 const char* h = XrdMonSenderInfo::id2Host(senderId);
00857
00858 if ( 0 != _rtLogger ) {
00859 char buf[512];
00860 sprintf(buf, "r\t%s\t%s\n", h, t);
00861 _rtLogger->add(buf);
00862 }
00863
00864 fstream f(_xrdRestartLog.c_str(), ios::out | ios::app);
00865 f << h << '\t' << t << endl;
00866 f.close();
00867 }
00868
00869 void
00870 XrdMonDecSink::addVersion()
00871 {
00872 char buf[16];
00873 sprintf(buf, "v\t%03d\n", XRDMON_VERSION);
00874 _rtLogger->add(buf);
00875 }