XrdMonSndCoder.cc

Go to the documentation of this file.
00001 /*****************************************************************************/
00002 /*                                                                           */
00003 /*                            XrdMonSndCoder.cc                              */
00004 /*                                                                           */
00005 /* (c) 2005 by the Board of Trustees of the Leland Stanford, Jr., University */
00006 /*                            All Rights Reserved                            */
00007 /*       Produced by Jacek Becla for Stanford University under contract      */
00008 /*              DE-AC02-76SF00515 with the Department of Energy              */
00009 /*****************************************************************************/
00010 
00011 // $Id: XrdMonSndCoder.cc 24468 2008-06-22 16:47:03Z ganis $
00012 
00013 #include "XrdMon/XrdMonSndCoder.hh"
00014 #include "XrdXrootd/XrdXrootdMonData.hh"
00015 #include <sys/time.h>
00016 #include <iomanip>
00017 using std::setw;
00018 
00019 kXR_int32 XrdMonSndCoder::_serverStartTime = 0;
00020 
00021 XrdMonSndCoder::XrdMonSndCoder()
00022     : _sequenceNo(0),
00023       _noDict(0),
00024       _noOpen(0),
00025       _noClose(0),
00026       _noTrace(0),
00027       _noTime(0)
00028 {
00029     if ( 0 == _serverStartTime ) {
00030         struct timeval tv;
00031         gettimeofday(&tv, 0);
00032         _serverStartTime = tv.tv_sec;
00033     }
00034 }
00035 
00036 int
00037 XrdMonSndCoder::prepare2Transfer(const XrdMonSndAdminEntry& ae)
00038 {
00039     kXR_int32 packetSize = HDRLEN + ae.size();
00040 
00041     int ret = reinitXrdMonSndPacket(packetSize, PACKET_TYPE_ADMIN);
00042     if ( 0 != ret ) {
00043         return ret;
00044     }
00045 
00046     add_kXR_int16(ae.command());
00047     add_kXR_int16(ae.arg());
00048 
00049     return 0;
00050 }
00051 
00052 int
00053 XrdMonSndCoder::prepare2Transfer(const vector<XrdMonSndTraceEntry>& vector)
00054 {
00055     kXR_int16 noElems = vector.size() + 3; // 3: 3 time entries
00056     if (vector.size() == 0 ) {
00057         noElems = 0;
00058     }
00059     
00060     kXR_int32 packetSize = HDRLEN + noElems * TRACEELEMLEN;
00061     if ( packetSize > MAXPACKETSIZE ) {
00062         cerr << "Internal error: cached too many entries: " << noElems
00063              << ", MAXPACKETSIZE = " << MAXPACKETSIZE;
00064         noElems = (MAXPACKETSIZE-HDRLEN) / TRACEELEMLEN;
00065         cerr << " Will send only " << noElems << endl;
00066     }
00067 
00068     int ret = reinitXrdMonSndPacket(packetSize, PACKET_TYPE_TRACE);
00069     if ( 0 != ret ) {
00070         return ret;
00071     }
00072 
00073     kXR_int16 middle = noElems/2;
00074     kXR_int32 curTime = time(0);
00075     for (kXR_int16 i=0 ; i<noElems-3 ; i++ ) {
00076         if (i== 0) { // add time entry
00077             add_Mark(XROOTD_MON_WINDOW);
00078             add_kXR_int32(curTime); // prev window ended
00079             add_kXR_int32(curTime);   // this window started
00080             ++_noTime;
00081             if ( XrdMonSndDebug::verbose(XrdMonSndDebug::SPacket) ) {
00082                 cout << "Adding time window {" << curTime << ", " 
00083                      << curTime << "}" << ", elem no " << i << endl;
00084             }
00085         }
00086         if (i== middle) { // add time entry
00087             add_Mark(XROOTD_MON_WINDOW);
00088             add_kXR_int32(curTime); // prev window ended
00089             add_kXR_int32(curTime);   // this window started
00090             ++_noTime;
00091             if ( XrdMonSndDebug::verbose(XrdMonSndDebug::SPacket) ) {
00092                 cout << "Adding time window {" << curTime << ", " 
00093                      << curTime << "}" << ", elem no " << i << endl;
00094             }
00095         }
00096         const XrdMonSndTraceEntry& de = vector[i];
00097         add_kXR_int64(de.offset());
00098         add_kXR_int32(de.length());
00099         add_kXR_int32(de.id()    );
00100         if (i==noElems-4) {
00101             add_Mark(XROOTD_MON_WINDOW);
00102             add_kXR_int32(curTime); // prev window ended
00103             add_kXR_int32(curTime);   // this window started
00104             ++_noTime;
00105             if ( XrdMonSndDebug::verbose(XrdMonSndDebug::SPacket) ) {
00106                 cout << "Adding time window {" << curTime << ", " 
00107                      << curTime << "}" << ", elem no " << i << endl;
00108             }
00109         }
00110     }
00111     _noTrace += vector.size();
00112     
00113     return 0;
00114 }
00115 
00116 
00117 pair<char, kXR_unt32>
00118 XrdMonSndCoder::generateBigNumber(const char* descr)
00119 {
00120     kXR_int64 xOrg = 1000000000000LL + rand();
00121     char nuToShift = 0;
00122     kXR_int64 x = xOrg;
00123     while ( x > 4294967296LL ) {
00124         ++nuToShift;
00125         x = x >> 1;
00126     }
00127     cout << "Want to send #" << descr << " " << xOrg
00128          << ", sending " << x << " noShifted " 
00129          << (int) nuToShift << endl;
00130 
00131     return pair<char, kXR_unt32>(nuToShift, static_cast<kXR_unt32>(x));
00132 }
00133 
00134 int 
00135 XrdMonSndCoder::prepare2Transfer(const vector<kXR_int32>& vector)
00136 {
00137     kXR_int16 noElems = vector.size() + 2; // 2: 2 time entries
00138     int8_t sizeOfXrdMonSndTraceEntry = sizeof(kXR_int64)+sizeof(kXR_int32)+sizeof(kXR_int32);
00139     kXR_int32 packetSize = HDRLEN + noElems * sizeOfXrdMonSndTraceEntry;
00140     if ( packetSize > MAXPACKETSIZE ) {
00141         cerr << "Internal error: cached too many entries: " << noElems
00142              << ", MAXPACKETSIZE = " << MAXPACKETSIZE;
00143         noElems = (MAXPACKETSIZE-HDRLEN) / sizeOfXrdMonSndTraceEntry;
00144         cerr << " Will send only " << noElems << endl;
00145     }
00146 
00147     int ret = reinitXrdMonSndPacket(packetSize, PACKET_TYPE_TRACE);
00148     if ( 0 != ret ) {
00149         return ret;
00150     }
00151 
00152     kXR_int32 curTime = time(0);
00153 
00154     struct XrdXrootdMonTrace trace;
00155     memset(&trace, 0, sizeof(XrdXrootdMonTrace));
00156     trace.arg0.id[0]  = XROOTD_MON_WINDOW;
00157     trace.arg1.Window = 
00158     trace.arg2.Window = htonl(curTime);
00159     memcpy(writeHere(), &trace, sizeof(XrdXrootdMonTrace));
00160     _putOffset += sizeof(XrdXrootdMonTrace);
00161     
00162     ++_noTime;
00163     if ( XrdMonSndDebug::verbose(XrdMonSndDebug::SPacket) ) {
00164         cout << "Adding time window {" << curTime << ", " 
00165              << curTime << "}" << ", elem no 0" << endl;
00166     }
00167 
00168     for (kXR_int16 i=0 ; i<noElems-2 ; i++ ) {
00169         static int largeNr = 0; // from time to time need to send very large nr
00170         kXR_unt32 rT, wT;
00171 
00172         memset(&trace, 0, sizeof(XrdXrootdMonTrace));
00173         trace.arg0.id[0]   = XROOTD_MON_CLOSE;
00174         if ( ++ largeNr % 11 == 10 ) {
00175             // generate # bytes read/writen (larger than 2^32, shifted)
00176             pair<char, kXR_unt32> bigR = generateBigNumber("read");
00177             pair<char, kXR_unt32> bigW = generateBigNumber("write");
00178 
00179             trace.arg0.id[1] = bigR.first;
00180             trace.arg0.id[2] = bigW.first;
00181             rT = bigR.second;
00182             wT = bigW.second;
00183         } else {
00184             // generate # bytes read/writen (smaller than 2^32)
00185             rT = (kXR_unt32) rand();
00186             wT = (kXR_unt32) rand() / 512;
00187         }
00188 
00189         trace.arg0.rTot[1] = htonl(rT);
00190         trace.arg1.wTot    = htonl(wT);
00191         trace.arg2.dictid  = htonl(vector[i]);
00192         memcpy(writeHere(), &trace, sizeof(XrdXrootdMonTrace));
00193         _putOffset += sizeof(XrdXrootdMonTrace);
00194 
00195         ++_noClose;
00196         cout << "closing file, dictid " << vector[i] 
00197              << ", r=" << rT
00198              << ", w=" << wT << endl;
00199     }
00200 
00201     memset(&trace, 0, sizeof(XrdXrootdMonTrace));
00202     trace.arg0.id[0]  = XROOTD_MON_WINDOW;
00203     trace.arg1.Window = 
00204     trace.arg2.Window = htonl(curTime);
00205     memcpy(writeHere(), &trace, sizeof(XrdXrootdMonTrace));
00206     _putOffset += sizeof(XrdXrootdMonTrace);
00207 
00208     ++_noTime;
00209     if ( XrdMonSndDebug::verbose(XrdMonSndDebug::SPacket) ) {
00210         cout << "Adding time window {" << curTime << ", " 
00211              << curTime << "}" << ", elem no " << noElems-2 << endl;
00212     }
00213     return 0;
00214 }
00215 
00216 
00217 
00218 int
00219 XrdMonSndCoder::prepare2Transfer(const XrdMonSndDictEntry::CompactEntry& ce)
00220 {
00221     kXR_int32 packetSize = HDRLEN + ce.size();
00222 
00223     int ret = reinitXrdMonSndPacket(packetSize, PACKET_TYPE_DICT);
00224     if ( 0 != ret ) {
00225         return ret;
00226     }
00227 
00228     add_kXR_int32(ce.id);
00229     add_string  (ce.others);
00230 
00231     ++_noDict;
00232     
00233     return 0;
00234 }
00235 
00236 
00237 int
00238 XrdMonSndCoder::prepare2Transfer(const XrdMonSndStageEntry::CompactEntry& ce)
00239 {
00240     kXR_int32 packetSize = HDRLEN + ce.size();
00241 
00242     int ret = reinitXrdMonSndPacket(packetSize, PACKET_TYPE_STAGE);
00243     if ( 0 != ret ) {
00244         return ret;
00245     }
00246 
00247     add_kXR_int32(ce.id);
00248     add_string  (ce.others);
00249 
00250     ++_noDict;
00251     
00252     return 0;
00253 }
00254 
00255 int
00256 XrdMonSndCoder::reinitXrdMonSndPacket(packetlen_t newSize, char packetCode)
00257 {
00258     _putOffset = 0;
00259     int ret = _packet.init(newSize);
00260     if ( 0 != ret ) {
00261         return ret;
00262     }
00263 
00264     if ( XrdMonSndDebug::verbose(XrdMonSndDebug::SPacket) ) {
00265         cout << "XrdMonSndPacket " << packetCode 
00266              << ", size " << setw(5) << newSize 
00267              << ", sequenceNo " << setw(3) << (int) _sequenceNo 
00268              << ", time " << _serverStartTime
00269              << " prepared for sending" << endl;
00270     }
00271     add_int08_t(packetCode);
00272     add_int08_t(_sequenceNo++);
00273     add_kXR_unt16(newSize);
00274     add_kXR_int32(_serverStartTime);
00275 
00276     return 0;
00277 }
00278 
00279 void
00280 XrdMonSndCoder::printStats() const
00281 {
00282     cout <<   "dict="    << _noDict
00283          << ", noOpen="  << _noOpen
00284          << ", noClose=" << _noClose
00285          << ", noTrace=" << _noTrace
00286          << ", noTime="  << _noTime << endl;
00287 }
00288 

Generated on Tue Jul 5 14:46:43 2011 for ROOT_528-00b_version by  doxygen 1.5.1