00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013 #include "XrdMon/XrdMonSndCoder.hh"
00014 #include "XrdXrootd/XrdXrootdMonData.hh"
00015 #include <sys/time.h>
00016 #include <iomanip>
00017 using std::setw;
00018
00019 kXR_int32 XrdMonSndCoder::_serverStartTime = 0;
00020
00021 XrdMonSndCoder::XrdMonSndCoder()
00022 : _sequenceNo(0),
00023 _noDict(0),
00024 _noOpen(0),
00025 _noClose(0),
00026 _noTrace(0),
00027 _noTime(0)
00028 {
00029 if ( 0 == _serverStartTime ) {
00030 struct timeval tv;
00031 gettimeofday(&tv, 0);
00032 _serverStartTime = tv.tv_sec;
00033 }
00034 }
00035
00036 int
00037 XrdMonSndCoder::prepare2Transfer(const XrdMonSndAdminEntry& ae)
00038 {
00039 kXR_int32 packetSize = HDRLEN + ae.size();
00040
00041 int ret = reinitXrdMonSndPacket(packetSize, PACKET_TYPE_ADMIN);
00042 if ( 0 != ret ) {
00043 return ret;
00044 }
00045
00046 add_kXR_int16(ae.command());
00047 add_kXR_int16(ae.arg());
00048
00049 return 0;
00050 }
00051
00052 int
00053 XrdMonSndCoder::prepare2Transfer(const vector<XrdMonSndTraceEntry>& vector)
00054 {
00055 kXR_int16 noElems = vector.size() + 3;
00056 if (vector.size() == 0 ) {
00057 noElems = 0;
00058 }
00059
00060 kXR_int32 packetSize = HDRLEN + noElems * TRACEELEMLEN;
00061 if ( packetSize > MAXPACKETSIZE ) {
00062 cerr << "Internal error: cached too many entries: " << noElems
00063 << ", MAXPACKETSIZE = " << MAXPACKETSIZE;
00064 noElems = (MAXPACKETSIZE-HDRLEN) / TRACEELEMLEN;
00065 cerr << " Will send only " << noElems << endl;
00066 }
00067
00068 int ret = reinitXrdMonSndPacket(packetSize, PACKET_TYPE_TRACE);
00069 if ( 0 != ret ) {
00070 return ret;
00071 }
00072
00073 kXR_int16 middle = noElems/2;
00074 kXR_int32 curTime = time(0);
00075 for (kXR_int16 i=0 ; i<noElems-3 ; i++ ) {
00076 if (i== 0) {
00077 add_Mark(XROOTD_MON_WINDOW);
00078 add_kXR_int32(curTime);
00079 add_kXR_int32(curTime);
00080 ++_noTime;
00081 if ( XrdMonSndDebug::verbose(XrdMonSndDebug::SPacket) ) {
00082 cout << "Adding time window {" << curTime << ", "
00083 << curTime << "}" << ", elem no " << i << endl;
00084 }
00085 }
00086 if (i== middle) {
00087 add_Mark(XROOTD_MON_WINDOW);
00088 add_kXR_int32(curTime);
00089 add_kXR_int32(curTime);
00090 ++_noTime;
00091 if ( XrdMonSndDebug::verbose(XrdMonSndDebug::SPacket) ) {
00092 cout << "Adding time window {" << curTime << ", "
00093 << curTime << "}" << ", elem no " << i << endl;
00094 }
00095 }
00096 const XrdMonSndTraceEntry& de = vector[i];
00097 add_kXR_int64(de.offset());
00098 add_kXR_int32(de.length());
00099 add_kXR_int32(de.id() );
00100 if (i==noElems-4) {
00101 add_Mark(XROOTD_MON_WINDOW);
00102 add_kXR_int32(curTime);
00103 add_kXR_int32(curTime);
00104 ++_noTime;
00105 if ( XrdMonSndDebug::verbose(XrdMonSndDebug::SPacket) ) {
00106 cout << "Adding time window {" << curTime << ", "
00107 << curTime << "}" << ", elem no " << i << endl;
00108 }
00109 }
00110 }
00111 _noTrace += vector.size();
00112
00113 return 0;
00114 }
00115
00116
00117 pair<char, kXR_unt32>
00118 XrdMonSndCoder::generateBigNumber(const char* descr)
00119 {
00120 kXR_int64 xOrg = 1000000000000LL + rand();
00121 char nuToShift = 0;
00122 kXR_int64 x = xOrg;
00123 while ( x > 4294967296LL ) {
00124 ++nuToShift;
00125 x = x >> 1;
00126 }
00127 cout << "Want to send #" << descr << " " << xOrg
00128 << ", sending " << x << " noShifted "
00129 << (int) nuToShift << endl;
00130
00131 return pair<char, kXR_unt32>(nuToShift, static_cast<kXR_unt32>(x));
00132 }
00133
00134 int
00135 XrdMonSndCoder::prepare2Transfer(const vector<kXR_int32>& vector)
00136 {
00137 kXR_int16 noElems = vector.size() + 2;
00138 int8_t sizeOfXrdMonSndTraceEntry = sizeof(kXR_int64)+sizeof(kXR_int32)+sizeof(kXR_int32);
00139 kXR_int32 packetSize = HDRLEN + noElems * sizeOfXrdMonSndTraceEntry;
00140 if ( packetSize > MAXPACKETSIZE ) {
00141 cerr << "Internal error: cached too many entries: " << noElems
00142 << ", MAXPACKETSIZE = " << MAXPACKETSIZE;
00143 noElems = (MAXPACKETSIZE-HDRLEN) / sizeOfXrdMonSndTraceEntry;
00144 cerr << " Will send only " << noElems << endl;
00145 }
00146
00147 int ret = reinitXrdMonSndPacket(packetSize, PACKET_TYPE_TRACE);
00148 if ( 0 != ret ) {
00149 return ret;
00150 }
00151
00152 kXR_int32 curTime = time(0);
00153
00154 struct XrdXrootdMonTrace trace;
00155 memset(&trace, 0, sizeof(XrdXrootdMonTrace));
00156 trace.arg0.id[0] = XROOTD_MON_WINDOW;
00157 trace.arg1.Window =
00158 trace.arg2.Window = htonl(curTime);
00159 memcpy(writeHere(), &trace, sizeof(XrdXrootdMonTrace));
00160 _putOffset += sizeof(XrdXrootdMonTrace);
00161
00162 ++_noTime;
00163 if ( XrdMonSndDebug::verbose(XrdMonSndDebug::SPacket) ) {
00164 cout << "Adding time window {" << curTime << ", "
00165 << curTime << "}" << ", elem no 0" << endl;
00166 }
00167
00168 for (kXR_int16 i=0 ; i<noElems-2 ; i++ ) {
00169 static int largeNr = 0;
00170 kXR_unt32 rT, wT;
00171
00172 memset(&trace, 0, sizeof(XrdXrootdMonTrace));
00173 trace.arg0.id[0] = XROOTD_MON_CLOSE;
00174 if ( ++ largeNr % 11 == 10 ) {
00175
00176 pair<char, kXR_unt32> bigR = generateBigNumber("read");
00177 pair<char, kXR_unt32> bigW = generateBigNumber("write");
00178
00179 trace.arg0.id[1] = bigR.first;
00180 trace.arg0.id[2] = bigW.first;
00181 rT = bigR.second;
00182 wT = bigW.second;
00183 } else {
00184
00185 rT = (kXR_unt32) rand();
00186 wT = (kXR_unt32) rand() / 512;
00187 }
00188
00189 trace.arg0.rTot[1] = htonl(rT);
00190 trace.arg1.wTot = htonl(wT);
00191 trace.arg2.dictid = htonl(vector[i]);
00192 memcpy(writeHere(), &trace, sizeof(XrdXrootdMonTrace));
00193 _putOffset += sizeof(XrdXrootdMonTrace);
00194
00195 ++_noClose;
00196 cout << "closing file, dictid " << vector[i]
00197 << ", r=" << rT
00198 << ", w=" << wT << endl;
00199 }
00200
00201 memset(&trace, 0, sizeof(XrdXrootdMonTrace));
00202 trace.arg0.id[0] = XROOTD_MON_WINDOW;
00203 trace.arg1.Window =
00204 trace.arg2.Window = htonl(curTime);
00205 memcpy(writeHere(), &trace, sizeof(XrdXrootdMonTrace));
00206 _putOffset += sizeof(XrdXrootdMonTrace);
00207
00208 ++_noTime;
00209 if ( XrdMonSndDebug::verbose(XrdMonSndDebug::SPacket) ) {
00210 cout << "Adding time window {" << curTime << ", "
00211 << curTime << "}" << ", elem no " << noElems-2 << endl;
00212 }
00213 return 0;
00214 }
00215
00216
00217
00218 int
00219 XrdMonSndCoder::prepare2Transfer(const XrdMonSndDictEntry::CompactEntry& ce)
00220 {
00221 kXR_int32 packetSize = HDRLEN + ce.size();
00222
00223 int ret = reinitXrdMonSndPacket(packetSize, PACKET_TYPE_DICT);
00224 if ( 0 != ret ) {
00225 return ret;
00226 }
00227
00228 add_kXR_int32(ce.id);
00229 add_string (ce.others);
00230
00231 ++_noDict;
00232
00233 return 0;
00234 }
00235
00236
00237 int
00238 XrdMonSndCoder::prepare2Transfer(const XrdMonSndStageEntry::CompactEntry& ce)
00239 {
00240 kXR_int32 packetSize = HDRLEN + ce.size();
00241
00242 int ret = reinitXrdMonSndPacket(packetSize, PACKET_TYPE_STAGE);
00243 if ( 0 != ret ) {
00244 return ret;
00245 }
00246
00247 add_kXR_int32(ce.id);
00248 add_string (ce.others);
00249
00250 ++_noDict;
00251
00252 return 0;
00253 }
00254
00255 int
00256 XrdMonSndCoder::reinitXrdMonSndPacket(packetlen_t newSize, char packetCode)
00257 {
00258 _putOffset = 0;
00259 int ret = _packet.init(newSize);
00260 if ( 0 != ret ) {
00261 return ret;
00262 }
00263
00264 if ( XrdMonSndDebug::verbose(XrdMonSndDebug::SPacket) ) {
00265 cout << "XrdMonSndPacket " << packetCode
00266 << ", size " << setw(5) << newSize
00267 << ", sequenceNo " << setw(3) << (int) _sequenceNo
00268 << ", time " << _serverStartTime
00269 << " prepared for sending" << endl;
00270 }
00271 add_int08_t(packetCode);
00272 add_int08_t(_sequenceNo++);
00273 add_kXR_unt16(newSize);
00274 add_kXR_int32(_serverStartTime);
00275
00276 return 0;
00277 }
00278
00279 void
00280 XrdMonSndCoder::printStats() const
00281 {
00282 cout << "dict=" << _noDict
00283 << ", noOpen=" << _noOpen
00284 << ", noClose=" << _noClose
00285 << ", noTrace=" << _noTrace
00286 << ", noTime=" << _noTime << endl;
00287 }
00288