XrdMonDecSink.cc

Go to the documentation of this file.
00001 /*****************************************************************************/
00002 /*                                                                           */
00003 /*                             XrdMonDecSink.cc                              */
00004 /*                                                                           */
00005 /* (c) 2004 by the Board of Trustees of the Leland Stanford, Jr., University */
00006 /*                            All Rights Reserved                            */
00007 /*       Produced by Jacek Becla for Stanford University under contract      */
00008 /*              DE-AC02-76SF00515 with the Department of Energy              */
00009 /*****************************************************************************/
00010 
00011 // $Id: XrdMonDecSink.cc 24468 2008-06-22 16:47:03Z ganis $
00012 
00013 #include "XrdMon/XrdMonErrors.hh"
00014 #include "XrdMon/XrdMonException.hh"
00015 #include "XrdMon/XrdMonDecSink.hh"
00016 #include "XrdMon/XrdMonDecTraceInfo.hh"
00017 #include "XrdMon/XrdMonDecStageInfo.hh"
00018 #include "XrdMon/XrdMonSenderInfo.hh"
00019 #include "XrdMon/XrdMonUtils.hh"
00020 
00021 #include <netinet/in.h>
00022 #include <sys/time.h> // FIXME - remove when xrootd supports openfile
00023 #include <iomanip>
00024 #include <unistd.h>
00025 using std::cerr;
00026 using std::cout;
00027 using std::endl;
00028 using std::ios;
00029 using std::map;
00030 using std::setw;
00031 
// Reload value for _verFreqCount: in real-time mode the counter is
// decremented for every record added to the RT log and, once it drops
// below 1, addVersion() is called and the counter is reset to VER_FREQ.
const kXR_unt16 XrdMonDecSink::VER_FREQ = 1000;
00033 
00034 XrdMonDecSink::XrdMonDecSink(const char* baseDir,
00035                              const char* rtLogDir,
00036                              int rtBufSize,
00037                              bool saveTraces,
00038                              int maxTraceLogSize)
00039     : _verFreqCount(VER_FREQ),
00040       _rtLogger(0),
00041       _saveTraces(saveTraces),
00042       _tCacheSize(32*1024), // 32*1024 * 32 bytes = 1 MB FIXME-configurable?
00043       _traceLogNumber(0),
00044       _maxTraceLogSize(maxTraceLogSize),
00045       _lastSeq(0xFF),
00046       _uniqueDictId(1),
00047       _uniqueUserId(1)
00048 {
00049     if ( maxTraceLogSize < 2  ) {
00050         cerr << "Trace log size must be > 2MB" << endl;
00051         throw XrdMonException(ERR_INVALIDARG, "Trace log size must be > 2MB");
00052     }
00053 
00054     _path = baseDir;
00055     _path += "/";
00056     _jnlPath = _path + "/jnl";
00057     _path += generateTimestamp();
00058     _path += "_";
00059     _dictPath = _path + "dict.ascii";
00060     _userPath = _path + "user.ascii";
00061     _xrdRestartLog = baseDir;
00062     _xrdRestartLog += "/xrdRestarts.ascii";
00063     
00064     if ( 0 == access(_dictPath.c_str(), F_OK) ) {
00065         string s("File "); s += _dictPath;
00066         s += " exists. Move it somewhere else first.";
00067         throw XrdMonException(ERR_INVALIDARG, s);
00068     }
00069     if ( _saveTraces ) {
00070         _tCache.reserve(_tCacheSize+1);
00071         string fTPath = _path + "trace000.ascii";
00072         if ( 0 == access(fTPath.c_str(), F_OK) ) {
00073             string s("File "); s += fTPath;
00074             s += " exists. Move it somewhere else first.";
00075             throw XrdMonException(ERR_INVALIDARG, s);
00076         }
00077     }
00078 
00079     if ( 0 != rtLogDir ) {
00080         initRT(rtLogDir, rtBufSize);
00081     } else {
00082         loadUniqueIdsAndSeq();
00083     }
00084 }
00085 
00086 XrdMonDecSink::~XrdMonDecSink()
00087 {
00088     flushClosedDicts();
00089 
00090     reportLostPackets();
00091     _lost.clear();
00092 
00093 
00094     if ( 0 == _rtLogger ) {
00095         flushTCache();
00096         checkpoint();
00097     }    
00098     {
00099         XrdSysMutexHelper mh; mh.Lock(&_dMutex);
00100         int i, dcacheSize = _dCache.size();
00101         for ( i=0; i<dcacheSize ; ++i ) {
00102             resetDMap(i);
00103             delete _dCache[i];
00104             _dCache[i] = 0;
00105         }
00106     }
00107     {
00108         XrdSysMutexHelper mh; mh.Lock(&_uMutex);
00109         int i, ucacheSize = _uCache.size();
00110         for ( i=0; i<ucacheSize ; ++i ) {
00111             resetUMap(i);
00112             delete _uCache[i];
00113             _uCache[i] = 0;
00114         }
00115     }
00116 
00117 
00118     _rtLogger->flush();
00119     delete _rtLogger;
00120 
00121     // save ids in jnl file
00122     fstream f(_rtMaxIdsPath.c_str(), ios::out);
00123     f << "o " << _uniqueDictId << '\n'
00124       << "u " << _uniqueUserId << endl;
00125     f.close();
00126 
00127     // remove the flag indicating that collector/decoder is running
00128     unlink(_rtFlagPath.c_str());
00129 }
00130 
00131 struct connectDictIdsWithCache : public std::unary_function<XrdMonDecDictInfo*, void> {
00132     connectDictIdsWithCache(map<dictid_t, XrdMonDecDictInfo*>& dC) : _cache(dC){}
00133     void operator()(XrdMonDecDictInfo* di) {
00134         dictid_t id = di->xrdId();
00135         _cache[id] = di;
00136     }
00137     map<dictid_t, XrdMonDecDictInfo*>& _cache;
00138 };
00139 
00140 void
00141 XrdMonDecSink::init(dictid_t min, dictid_t max, const string& senderHP)
00142 {
00143     // read jnl file, create vector<XrdMonDecDictInfo*> of active 
00144     // XrdMonDecDictInfo objects
00145     vector<XrdMonDecDictInfo*> diVector = loadActiveDictInfo();
00146 
00147     // connect active XrdMonDecDictInfo objects to the cache
00148     //std::for_each(diVector.begin(),
00149     //              diVector.end(),
00150     //              connectDictIdsWithCache(_dCache));
00151     ::abort();
00152 }
00153 
00154 void
00155 XrdMonDecSink::initRT(const char* rtLogDir,
00156                       int rtBufSize)
00157 {
00158     _rtFlagPath = rtLogDir;
00159     _rtFlagPath += "/rtRunning.flag";
00160     _rtMaxIdsPath = rtLogDir;
00161     _rtMaxIdsPath += "/rtMax.jnl";
00162 
00163     // check if another collector/decoder is not running
00164     // or if the old one was closed correctly
00165     if( (access(_rtFlagPath.c_str(), F_OK)) != -1 ) {
00166         string s("Can't start rtDecoder: either collector is already running, or it has been stopped uncleanly, in which case you need to run cleanup utility first");
00167         throw XrdMonException(ERR_UNKNOWN, s);
00168     }
00169 
00170     // create the flag indicating that collector/decoder is running
00171     fstream f(_rtFlagPath.c_str(), fstream::out);
00172     f.close();
00173 
00174     char* rtLogName = new char [strlen(rtLogDir) + 32];
00175     sprintf(rtLogName, "%s/rtLog.txt", rtLogDir);
00176 
00177     char* rtLogNLock = new char [strlen(rtLogDir) + 32];
00178     sprintf(rtLogNLock, "%s/rtLog.lock", rtLogDir);
00179     _rtLogger = new XrdMonBufferedOutput(rtLogName, rtLogNLock, rtBufSize);
00180     addVersion();
00181     
00182     delete [] rtLogName;
00183     delete [] rtLogNLock;
00184     
00185     // read in unique ids from jnl file
00186     f.open(_rtMaxIdsPath.c_str(), ios::in);
00187     if ( f.is_open() ) {
00188         do {
00189             char line[64];
00190             f.getline(line, 64);
00191             char type;
00192             int number;
00193             sscanf(line, "%c %i", &type, &number);
00194             if ( type == 'o' && number > _uniqueDictId + 1 ) {
00195                 _uniqueDictId = number + 1;
00196             }
00197             if ( type == 'u' && number > _uniqueUserId + 1 ) {
00198                 _uniqueUserId = number + 1;
00199             }
00200         } while ( ! f.eof() );
00201         f.close();
00202         unlink(_rtMaxIdsPath.c_str());
00203         cout << "Updated uniqueIds from jnl file. "
00204              << "uniqueDictId: " << _uniqueDictId << ", "
00205              << "uniqueUserId: " << _uniqueUserId << endl;
00206     }
00207 }
00208 
00209 void
00210 XrdMonDecSink::addDictId(dictid_t xrdId, 
00211                          const char* theString, 
00212                          int len,
00213                          senderid_t senderId)
00214 {
00215     XrdSysMutexHelper mh; mh.Lock(&_dMutex);
00216     dmap_t* dMap = 0;
00217     if ( _dCache.size() <= senderId ) {
00218         dMap = new dmap_t;
00219         _dCache.push_back(dMap);
00220     } else {
00221         dMap = _dCache[senderId];
00222     }
00223 
00224     dmapitr_t itr = dMap->find(xrdId);
00225     if ( itr != dMap->end() ) {
00226         cerr << "Error: dictID " << xrdId << " already in cache." << endl;
00227         return;
00228         //throw XrdMonException(ERR_DICTIDINCACHE, buf);
00229     }
00230     
00231     XrdMonDecDictInfo* di;
00232     (*dMap)[xrdId] = di = new XrdMonDecDictInfo(xrdId, _uniqueDictId++, 
00233                                                 theString, len, senderId);
00234     
00235     // cout << "Added dictInfo to sink: " << *di << endl;
00236 
00237     // FIXME: remove this line when xrootd supports openFile
00238     // struct timeval tv; gettimeofday(&tv, 0); openFile(xrdId, tv.tv_sec-8640000);
00239 }
00240 
00241 
00242 void
00243 XrdMonDecSink::addStageInfo(dictid_t xrdId, 
00244                             const char* theString, 
00245                             int len,
00246                             senderid_t senderId)
00247 {
00248     // FIXME: simplify code below once the dictId 
00249     // is properlysent from xrootd
00250     dictid_t uniqueId2use = 0;
00251     if ( xrdId > 0 ) {
00252         uniqueId2use = _uniqueDictId++;
00253     }
00254 
00255     XrdMonDecStageInfo* si;
00256     si = new XrdMonDecStageInfo(xrdId, uniqueId2use, 
00257                                 theString, len, senderId);
00258 
00259     if ( 0 != _rtLogger ) {
00260         _rtLogger->add( si->writeRT2Buffer() );
00261         if ( --_verFreqCount < 1 ) {
00262             addVersion();
00263             _verFreqCount = VER_FREQ;
00264         }
00265     }
00266     delete si;
00267 }
00268 
00269 
00270 void
00271 XrdMonDecSink::addUserId(dictid_t usrId, 
00272                          const char* theString, 
00273                          int len,
00274                          senderid_t senderId)
00275 {
00276     XrdSysMutexHelper mh; mh.Lock(&_uMutex);
00277     umap_t* uMap = 0;
00278     if ( _uCache.size() <= senderId ) {
00279         uMap = new umap_t;
00280         _uCache.push_back(uMap);
00281     } else {
00282         uMap = _uCache[senderId];
00283     }
00284 
00285     umapitr_t itr = uMap->find(usrId);
00286     if ( itr != uMap->end() ) {
00287         cerr << "Error: userID " << usrId << " already in cache." << endl;
00288         return;
00289         //throw XrdMonException(ERR_USERIDINCACHE, buf);
00290     }
00291     
00292     XrdMonDecUserInfo* ui;
00293     (*uMap)[usrId] = ui 
00294         = new XrdMonDecUserInfo(usrId, _uniqueUserId++, 
00295                                 theString, len, senderId);
00296     // cout << "Added userInfo to sink: " << *ui << endl;
00297 
00298     if ( 0 != _rtLogger ) {
00299         _rtLogger->add( ui->writeRT2Buffer(XrdMonDecUserInfo::CONNECT) );
00300         if ( --_verFreqCount < 1 ) {
00301             addVersion();
00302             _verFreqCount = VER_FREQ;
00303         }
00304     }
00305 }
00306 
00307 void
00308 XrdMonDecSink::add(dictid_t xrdId,
00309                    XrdMonDecTraceInfo& trace,
00310                    senderid_t senderId)
00311 {
00312     static long totalNoTraces = 0;
00313     static long noLostTraces  = 0;
00314     if ( ++totalNoTraces % 500001 == 500000 ) {
00315         cout << noLostTraces << " lost since last time" << endl;
00316         noLostTraces = 0;
00317     }
00318 
00319     XrdSysMutexHelper mh; mh.Lock(&_dMutex);
00320     dmap_t* dMap = 0;
00321     if ( _dCache.size() <= senderId ) {
00322         dMap = new dmap_t;
00323         _dCache.push_back(dMap);
00324     } else {
00325         dMap = _dCache[senderId];
00326     }
00327     
00328     dmapitr_t itr = dMap->find(xrdId);
00329     if ( itr == dMap->end() ) {
00330         registerLostPacket(xrdId, "Add trace");
00331         return;
00332     }
00333     XrdMonDecDictInfo* di = itr->second;
00334     
00335     trace.setUniqueId(di->uniqueId());
00336     
00337     if ( ! di->addTrace(trace) ) {
00338         return; // something wrong with this trace, ignore it
00339     }
00340     if ( _saveTraces ) {
00341         //cout << "Adding trace to sink (dictid=" 
00342         //<< xrdId << ") " << trace << endl;
00343         _tCache.push_back(trace);
00344         if ( _tCache.size() >= _tCacheSize ) {
00345             flushTCache();
00346         }
00347         cout << "FIXME: tcache one for all servers now, good enough?" << endl;
00348         :: abort();
00349     }
00350 }
00351 
00352 void
00353 XrdMonDecSink::addUserDisconnect(dictid_t xrdId,
00354                                  kXR_int32 sec,
00355                                  kXR_int32 timestamp,
00356                                  senderid_t senderId)
00357 {
00358     XrdSysMutexHelper mh; mh.Lock(&_uMutex);
00359 
00360     umap_t* uMap = 0;
00361     if ( _uCache.size() <= senderId ) {
00362         uMap = new umap_t;
00363         _uCache.push_back(uMap);
00364     } else {
00365         uMap = _uCache[senderId];
00366     }
00367 
00368     umapitr_t itr = uMap->find(xrdId);
00369     if ( itr == uMap->end() ) {
00370         registerLostPacket(xrdId, "User disconnect");
00371         return;
00372     }
00373     itr->second->setDisconnectInfo(sec, timestamp);
00374     
00375     if ( 0 != _rtLogger ) {
00376         _rtLogger->add( itr->second->writeRT2Buffer(XrdMonDecUserInfo::DISCONNECT) );
00377         if ( --_verFreqCount < 1 ) {
00378             addVersion();
00379             _verFreqCount = VER_FREQ;
00380         }
00381     }    
00382 }
00383 
00384 void
00385 XrdMonDecSink::openFile(dictid_t xrdId,
00386                         kXR_int32 timestamp,
00387                         senderid_t senderId,
00388                         kXR_int64 fSize)
00389 {
00390     XrdSysMutexHelper mh; mh.Lock(&_dMutex);
00391     dmap_t* dMap = 0;
00392     if ( _dCache.size() <= senderId ) {
00393         dMap = new dmap_t;
00394         _dCache.push_back(dMap);
00395     } else {
00396         dMap = _dCache[senderId];
00397     }
00398     
00399     dmapitr_t itr = dMap->find(xrdId);
00400     if ( itr == dMap->end() ) {
00401         registerLostPacket(xrdId, "Open file");
00402         cout << "requested open file " << xrdId << ", xrdId not found" << endl;
00403         return;
00404     }
00405 
00406     cout << "Opening file " << xrdId << endl;
00407     itr->second->openFile(timestamp, fSize);
00408 
00409     if ( 0 != _rtLogger ) {
00410         _rtLogger->add( itr->second->writeRT2BufferOpenFile(fSize) );
00411         if ( --_verFreqCount < 1 ) {
00412             addVersion();
00413             _verFreqCount = VER_FREQ;
00414         }
00415     }
00416 }
00417 
00418 void
00419 XrdMonDecSink::closeFile(dictid_t xrdId, 
00420                          kXR_int64 bytesR, 
00421                          kXR_int64 bytesW, 
00422                          kXR_int32 timestamp,
00423                          senderid_t senderId)
00424 {
00425     XrdSysMutexHelper mh; mh.Lock(&_dMutex);
00426     dmap_t* dMap = 0;
00427     if ( _dCache.size() <= senderId ) {
00428         dMap = new dmap_t;
00429         _dCache.push_back(dMap);
00430     } else {
00431         dMap = _dCache[senderId];
00432     }
00433     
00434     dmapitr_t itr = dMap->find(xrdId);
00435     if ( itr == dMap->end() ) {
00436         registerLostPacket(xrdId, "Close file");
00437         return;
00438     }
00439 
00440     //cout << "Closing file id= " << xrdId << " r= " 
00441     //     << bytesR << " w= " << bytesW << endl;
00442     itr->second->closeFile(bytesR, bytesW, timestamp);
00443 
00444     if ( 0 != _rtLogger ) {
00445         _rtLogger->add(itr->second->writeRT2BufferCloseFile());
00446         if ( --_verFreqCount < 1 ) {
00447             addVersion();
00448             _verFreqCount = VER_FREQ;
00449         }
00450     }
00451 }
00452 
00453 void
00454 XrdMonDecSink::loadUniqueIdsAndSeq()
00455 {
00456     if ( 0 == access(_jnlPath.c_str(), F_OK) ) {
00457         char buf[32];
00458         fstream f(_jnlPath.c_str(), ios::in);
00459         f.read(buf, sizeof(sequen_t)+2*sizeof(dictid_t));
00460         f.close();
00461 
00462         memcpy(&_lastSeq, buf, sizeof(sequen_t));
00463         kXR_int32 v32;
00464 
00465         memcpy(&v32, buf+sizeof(sequen_t), sizeof(kXR_int32));
00466         _uniqueDictId = ntohl(v32);
00467 
00468         memcpy(&v32, buf+sizeof(sequen_t)+sizeof(dictid_t), sizeof(kXR_int32));
00469         _uniqueUserId = ntohl(v32);
00470 
00471         cout << "Loaded from jnl file: "
00472              << "seq " << (int) _lastSeq
00473              << ", uniqueDictId " << _uniqueDictId 
00474              << ", uniqueUserId " << _uniqueUserId 
00475              << endl;
00476     }
00477 }
00478 
00479 void
00480 XrdMonDecSink::flushClosedDicts()
00481 {
00482     fstream fD(_dictPath.c_str(), ios::out | ios::app);
00483     enum { BUFSIZE = 1024*1024 };
00484     string buf;
00485     buf.reserve(BUFSIZE);
00486 
00487     int curLen = 0, sizeBefore = 0, sizeAfter = 0;
00488     {
00489         XrdSysMutexHelper mh; mh.Lock(&_dMutex);
00490         int i, dcacheSize = _dCache.size();
00491         for ( i=0; i<dcacheSize ; ++i ) {
00492             dmap_t* m = _dCache[i];
00493             sizeBefore += m->size();
00494             flushOneDMap(_dCache[i], curLen, BUFSIZE, buf, fD);
00495             sizeAfter += m->size();
00496         }
00497     }
00498     
00499     if ( curLen > 0 ) {
00500         fD.write(buf.c_str(), curLen);
00501         //cout << "flushed to disk: \n" << buf << endl;
00502     }
00503     fD.close();
00504     cout << "flushed (d) " << sizeBefore-sizeAfter 
00505          << ", left " << sizeAfter << endl;
00506 }
00507 
00508 void
00509 XrdMonDecSink::flushOneDMap(dmap_t* m,
00510                             int& curLen,
00511                             const int BUFSIZE, 
00512                             string& buf, 
00513                             fstream& fD)
00514 {
00515     vector<dictid_t> forDeletion;
00516     dmapitr_t itr;
00517     for ( itr=m->begin() ; itr != m->end() ; ++itr ) {
00518         XrdMonDecDictInfo* di = itr->second;
00519         if ( di != 0 && di->isClosed() ) {
00520             const char* dString = di->convert2string();
00521             int strLen = strlen(dString);
00522             if ( curLen == 0 ) {
00523                 buf = dString;
00524             } else {
00525                 if ( curLen + strLen >= BUFSIZE ) {
00526                     fD.write(buf.c_str(), curLen);
00527                     curLen = 0;
00528                     //cout << "flushed to disk: \n" << buf << endl;
00529                     buf = dString;
00530                 } else {
00531                     buf += dString;
00532                 }
00533             }
00534             curLen += strLen;
00535             delete itr->second;
00536             forDeletion.push_back(itr->first);
00537         }
00538     }
00539     int s = forDeletion.size();
00540     for (int i=0 ; i<s ; ++i) {
00541         m->erase(forDeletion[i]);
00542     }
00543 }
00544 
00545 void
00546 XrdMonDecSink::flushUserCache()
00547 {
00548     fstream fD(_userPath.c_str(), ios::app);
00549     enum { BUFSIZE = 1024*1024 };
00550     
00551     string buf;
00552     buf.reserve(BUFSIZE);
00553 
00554     int curLen = 0, sizeBefore = 0, sizeAfter = 0;
00555     {
00556         XrdSysMutexHelper mh; mh.Lock(&_uMutex);
00557         int i, ucacheSize = _uCache.size();
00558         for ( i=0 ; i<ucacheSize ; ++i ) {
00559             umap_t* m = _uCache[i];
00560             sizeBefore += m->size();
00561             flushOneUMap(m, curLen, BUFSIZE, buf, fD);
00562             sizeAfter += m->size();
00563         }        
00564     }
00565     
00566     if ( curLen > 0 ) {
00567         fD.write(buf.c_str(), curLen);
00568         cout << "flushed to disk: \n" << buf << endl;
00569     }
00570     fD.close();
00571     cout << "flushed (u) " << sizeBefore-sizeAfter << ", left " 
00572          << sizeAfter << endl;
00573 }
00574 
00575 void
00576 XrdMonDecSink::flushOneUMap(umap_t* m,
00577                             int& curLen,
00578                             const int BUFSIZE,
00579                             string& buf, 
00580                             fstream& fD)
00581 {
00582     vector <dictid_t> forDeletion;
00583     umapitr_t itr;
00584     for ( itr=m->begin() ; itr != m->end() ; ++itr ) {
00585         XrdMonDecUserInfo* di = itr->second;
00586         if ( di != 0 && di->readyToBeStored() ) {
00587             const char* dString = di->convert2string();
00588             int strLen = strlen(dString);
00589             if ( curLen == 0 ) {
00590                 buf = dString;
00591             } else {
00592                 if ( curLen + strLen >= BUFSIZE ) {
00593                     fD.write(buf.c_str(), curLen);
00594                     curLen = 0;
00595                     buf = dString;
00596                 } else {
00597                     buf += dString;
00598                 }
00599             }
00600             curLen += strLen;
00601             delete itr->second;
00602             forDeletion.push_back(itr->first);
00603         }
00604     }
00605     int s = forDeletion.size();
00606     for (int i=0 ; i<s ; ++i) {
00607         m->erase(forDeletion[i]);
00608     }
00609 }
00610 
00611 
00612 // used for offline processing of (full monitoring with traces) only
00613 void
00614 XrdMonDecSink::flushTCache()
00615 {
00616     if ( _tCache.size() == 0 ) {
00617         return;
00618     }
00619 
00620     fstream f;
00621     enum { BUFSIZE = 32*1024 };    
00622     char buf[BUFSIZE];
00623     int curLen = 0;
00624     int s = _tCache.size();
00625     char oneTrace[256];
00626     for (int i=0 ; i<s ; ++i) {
00627         _tCache[i].convertToString(oneTrace);
00628         int strLen = strlen(oneTrace);
00629         if ( curLen == 0 ) {
00630             strcpy(buf, oneTrace);
00631         } else {
00632             if ( curLen + strLen >= BUFSIZE ) {
00633                 write2TraceFile(f, buf, curLen);                
00634                 curLen = 0;
00635                 //cout << "flushed traces to disk: \n" << buf << endl;
00636                 strcpy(buf, oneTrace);
00637             } else {
00638                 strcat(buf, oneTrace);
00639             }
00640         }
00641         curLen += strLen;
00642     }
00643     if ( curLen > 0 ) {
00644         write2TraceFile(f, buf, curLen);
00645         //cout << "flushed traces to disk: \n" << buf << endl;
00646     }
00647     _tCache.clear();
00648     f.close();
00649 }
00650 
// used for offline processing of (full monitoring with traces) only
//
// Currently disabled: the function aborts the process unconditionally.
// The block comment below keeps the earlier implementation, which wrote
// _lastSeq, _uniqueDictId and _uniqueUserId followed by all XrdMonDecDictInfo
// objects that are not closed to the jnl file. That code walks _dCache as
// one single map (itr->second), whereas the rest of this file indexes
// _dCache per sender, so it has to be ported before it can be re-enabled.
void
XrdMonDecSink::checkpoint()
{
    ::abort();
    /*
    enum { BUFSIZE = 1024*1024 };    
    char buf[BUFSIZE];
    int bufPos = 0;
    
    // open jnl file
    fstream f(_jnlPath.c_str(), ios::out);

    // save lastSeq and uniqueIds
    memcpy(buf+bufPos, &_lastSeq, sizeof(sequen_t));
    bufPos += sizeof(sequen_t);
    kXR_int32 v = htonl(_uniqueDictId);
    memcpy(buf+bufPos, &v, sizeof(dictid_t));
    bufPos += sizeof(dictid_t);
    v = htonl(_uniqueUserId);
    memcpy(buf+bufPos, &v, sizeof(dictid_t));
    bufPos += sizeof(dictid_t);
    
    // save all active XrdMonDecDictInfos
    int nr =0;
    map<dictid_t, XrdMonDecDictInfo*>::iterator itr;
    {
        vector<dictid_t> forDeletion;
        XrdSysMutexHelper mh; mh.Lock(&_dMutex);
        for ( itr=_dCache.begin() ; itr != _dCache.end() ; ++itr ) {
            XrdMonDecDictInfo* di = itr->second;
            if ( di != 0 && ! di->isClosed() ) {
                ++nr;
                if ( di->stringSize() + bufPos >= BUFSIZE ) {
                    f.write(buf, bufPos);
                    bufPos = 0;
                }
                di->writeSelf2buf(buf, bufPos); // this will increment bufPos
                delete itr->second;
                forDeletion.push_back(itr->first);
            }
        }
        int s = forDeletion.size();
        for (int i=0 ; i<s ; ++i) {
            _dCache.erase(forDeletion[i]);
        }
    }
    if ( bufPos > 0 ) {
        f.write(buf, bufPos);
    }
    f.close();
    cout << "Saved in jnl file seq " << (int) _lastSeq
         << ", uniqueDictId " << _uniqueDictId 
         << ", uniqueUserId " << _uniqueUserId
         << " and " << nr << " XrdMonDecDictInfo objects." 
         << endl;
    */
}
00709 
// used for offline processing of (full monitoring with traces) only
//
// Not implemented: prints a message and aborts the process; the stream f
// is left untouched. The commented-out lines show the intended behavior,
// i.e. opening <_path>trace<NNN>.ascii in append mode, where NNN is
// _traceLogNumber padded to three digits.
void
XrdMonDecSink::openTraceFile(fstream& f)
{
    //stringstream ss(stringstream::out);
    //ss << _path << "trace"
    //   << setw(3) << setfill('0') << _traceLogNumber
    //   << ".ascii";
    //string fPath = ss.str();
    //f.open(fPath.c_str(), ios::out | ios::app);
    cout << "trace log file open NOT IMPLEMENTED " << endl;
    ::abort();
}
00723 
00724 // used for offline processing of (full monitoring with traces) only
00725 void
00726 XrdMonDecSink::write2TraceFile(fstream& f, 
00727                                const char* buf,
00728                                int len)
00729 {
00730     if ( ! f.is_open() ) {
00731         openTraceFile(f);
00732     }
00733     kXR_int64 tobeSize = len + f.tellp();
00734     if (  tobeSize > _maxTraceLogSize*1024*1024 ) {
00735         f.close();
00736         ++_traceLogNumber;
00737         openTraceFile(f);
00738         
00739     }
00740     f.write(buf, len);
00741 }
00742 
00743 vector<XrdMonDecDictInfo*>
00744 XrdMonDecSink::loadActiveDictInfo()
00745 {
00746     vector<XrdMonDecDictInfo*> v;
00747 
00748     if ( 0 != access(_jnlPath.c_str(), F_OK) ) {
00749         return v;
00750     }
00751 
00752     fstream f(_jnlPath.c_str(), ios::in);
00753     f.seekg(0, ios::end);
00754     int fSize = f.tellg();
00755     int pos = sizeof(sequen_t) + sizeof(kXR_int32);
00756     if ( fSize - pos == 0 ) {
00757         return v; // no active XrdMonDecDictInfo objects
00758     }
00759     f.seekg(pos); // skip seq and uniqueId
00760     char* buf = new char[fSize-pos];
00761     f.read(buf, fSize-pos);
00762 
00763     int bufPos = 0;
00764     while ( bufPos < fSize-pos ) {
00765         v.push_back( new XrdMonDecDictInfo(buf, bufPos) );
00766     }
00767     delete [] buf;
00768     
00769     return v;
00770 }    
00771 
00772 void
00773 XrdMonDecSink::registerLostPacket(dictid_t xrdId, const char* descr)
00774 {
00775     map<dictid_t, long>::iterator lostItr = _lost.find(xrdId);
00776     if ( lostItr == _lost.end() ) {
00777         cerr << descr << ": cannot find dictID " << xrdId << endl;
00778         _lost[xrdId] = 1;
00779     } else {
00780         ++lostItr->second;
00781     }
00782 }
00783 
// Announces the flush on stdout, then flushes the closed dictionary
// entries followed by the user cache.
void
XrdMonDecSink::flushHistoryData()
{
    cout << "Flushing decoded data..." << endl;
    flushClosedDicts();
    flushUserCache();
}
00791 
00792 void
00793 XrdMonDecSink::reset(senderid_t senderId)
00794 {
00795     flushClosedDicts();
00796 
00797     {
00798         XrdSysMutexHelper mh; mh.Lock(&_dMutex);
00799         resetDMap(senderId);
00800     }
00801     
00802     {
00803         XrdSysMutexHelper mh; mh.Lock(&_uMutex);
00804         resetUMap(senderId);
00805     }    
00806 }
00807 
00808 void
00809 XrdMonDecSink::resetDMap(senderid_t senderId)
00810 {
00811     if ( senderId >= _dCache.size() ) {
00812         return;
00813     }
00814     dmap_t* m = _dCache[senderId];
00815     dmapitr_t itr;
00816     for ( itr=m->begin() ; itr != m->end() ; ++itr ) {
00817         delete itr->second;
00818     }
00819     m->clear();
00820 }
00821 
00822 void
00823 XrdMonDecSink::resetUMap(senderid_t senderId)
00824 {
00825     if ( senderId >= _uCache.size() ) {
00826         return;
00827     }    
00828     umap_t* m = _uCache[senderId];
00829     umapitr_t itr;
00830     for ( itr=m->begin() ; itr != m->end() ; ++itr ) {
00831         delete itr->second;
00832     }
00833     m->clear();
00834 }
00835 
00836 void
00837 XrdMonDecSink::reportLostPackets()
00838 {
00839     int size = _lost.size();
00840     if ( size > 0 ) {
00841         cout << "Lost " << size << " dictIds {id, #lostTraces}: ";
00842         map<dictid_t, long>::iterator lostItr = _lost.begin();
00843         while ( lostItr != _lost.end() ) {
00844             cout << "{"<< lostItr->first << ", " << lostItr->second << "} ";
00845             ++lostItr;
00846         }    
00847         cout << endl;
00848     }
00849 }
00850 
00851 void
00852 XrdMonDecSink::registerXrdRestart(kXR_int32 stod, senderid_t senderId)
00853 {
00854     char t[24];
00855     timestamp2string(stod, t, GMT);
00856     const char* h = XrdMonSenderInfo::id2Host(senderId);
00857 
00858     if ( 0 != _rtLogger ) {
00859         char buf[512];
00860         sprintf(buf, "r\t%s\t%s\n", h, t);
00861         _rtLogger->add(buf);
00862     }
00863 
00864     fstream f(_xrdRestartLog.c_str(), ios::out | ios::app);
00865     f << h << '\t' << t << endl;
00866     f.close();
00867 }
00868 
00869 void
00870 XrdMonDecSink::addVersion() 
00871 {
00872     char buf[16];
00873     sprintf(buf, "v\t%03d\n", XRDMON_VERSION);
00874     _rtLogger->add(buf);
00875 }

Generated on Tue Jul 5 14:46:43 2011 for ROOT_528-00b_version by  doxygen 1.5.1