00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013 #include "XrdMon/XrdMonArgParser.hh"
00014 #include "XrdMon/XrdMonArgParserConvert.hh"
00015 #include "XrdMon/XrdMonUtils.hh"
00016 #include "XrdMon/XrdMonSndCoder.hh"
00017 #include "XrdMon/XrdMonSndDebug.hh"
00018 #include "XrdMon/XrdMonSndDictEntry.hh"
00019 #include "XrdMon/XrdMonSndDummyXrootd.hh"
00020 #include "XrdMon/XrdMonSndTraceCache.hh"
00021 #include "XrdMon/XrdMonSndTraceEntry.hh"
00022 #include "XrdMon/XrdMonSndTransmitter.hh"
00023
00024 #include <assert.h>
00025 #include <unistd.h>
00026 #include <sys/time.h>
00027 using namespace XrdMonArgParserConvert;
00028
00029
00030
// Upper bound on the number of iterations of the sending loops in main().
00031 const kXR_int64 NOCALLS = 1000000;
// Packet budget: the sending loops in main() stop once the number of packets
// sent reaches maxNoXrdMonSndPackets-2.
00032 const kXR_int16 maxNoXrdMonSndPackets = 50;
00033
00034 void
00035 printHelp()
00036 {
00037 cout << "\nxrdmonDummySender\n"
00038 << " [-host <hostName>]\n"
00039 << " [-port <portNr>]\n"
00040 << "\n"
00041 << "-host <hostName> Name of the receiver's host.\n"
00042 << " Default value is \"" << DEFAULT_HOST << "\".\n"
00043 << "-port <portNr> Port number of the receiver's host\n"
00044 << " Default valus is \"" << DEFAULT_PORT << "\".\n"
00045 << endl;
00046 }
00047
00048 void
00049 doDictionaryXrdMonSndPacket(XrdMonSndDummyXrootd& xrootd,
00050 XrdMonSndCoder& coder,
00051 XrdMonSndTransmitter& transmitter,
00052 kXR_int64& noP)
00053 {
00054 XrdMonSndDictEntry m = xrootd.newXrdMonSndDictEntry();
00055 cout << m << endl;
00056
00057 XrdMonSndDictEntry::CompactEntry ce = m.code();
00058
00059 if ( 0 == coder.prepare2Transfer(ce) ) {
00060 transmitter(coder.packet());
00061 coder.reset();
00062 ++noP;
00063 }
00064 }
00065
00066 void
00067 doStageXrdMonSndPacket(XrdMonSndDummyXrootd& xrootd,
00068 XrdMonSndCoder& coder,
00069 XrdMonSndTransmitter& transmitter,
00070 kXR_int64& noP)
00071 {
00072 XrdMonSndStageEntry m = xrootd.newXrdMonSndStageEntry();
00073 cout << "about to send this stage package:" << m << endl;
00074
00075 XrdMonSndStageEntry::CompactEntry ce = m.code();
00076
00077 if ( 0 == coder.prepare2Transfer(ce) ) {
00078 transmitter(coder.packet());
00079 coder.reset();
00080 ++noP;
00081 }
00082 }
00083
00084 void
00085 doTraceXrdMonSndPacket(XrdMonSndDummyXrootd& xrootd,
00086 XrdMonSndCoder& coder,
00087 XrdMonSndTransmitter& transmitter,
00088 XrdMonSndTraceCache& cache,
00089 kXR_int64& noP)
00090 {
00091 XrdMonSndTraceEntry de = xrootd.newXrdMonSndTraceEntry();
00092
00093 cache.add(de);
00094 if ( ! cache.bufferFull() ) {
00095 return;
00096 }
00097
00098 if ( 0 == coder.prepare2Transfer(cache.getVector()) ) {
00099 cache.clear();
00100 transmitter(coder.packet());
00101 coder.reset();
00102 noP++;
00103 }
00104 }
00105
00106 void
00107 closeFiles(XrdMonSndDummyXrootd& xrootd,
00108 XrdMonSndCoder& coder,
00109 XrdMonSndTransmitter& transmitter,
00110 kXR_int64& noP,
00111 bool justOne)
00112 {
00113 vector<kXR_int32> closedFiles;
00114 if ( justOne ) {
00115 kXR_int32 id = xrootd.closeOneFile();
00116 closedFiles.push_back(id);
00117 } else {
00118 xrootd.closeFiles(closedFiles);
00119 }
00120
00121 int s = closedFiles.size();
00122 int pos = 0;
00123 unsigned int i;
00124 while ( pos < s ) {
00125 vector<kXR_int32> v;
00126 for (i=0 ; i<XrdMonSndTraceCache::NODATAELEMS-2 && pos<s ; ++i, ++pos) {
00127 v.push_back(closedFiles.back());
00128 closedFiles.pop_back();
00129 }
00130 coder.prepare2Transfer(v);
00131 transmitter(coder.packet());
00132 noP++;
00133 }
00134 }
00135
00136
00137
00138 int main(int argc, char* argv[])
00139 {
00140 XrdMonArgParser::ArgImpl<const char*, Convert2String>
00141 arg_host("-host", DEFAULT_HOST);
00142 XrdMonArgParser::ArgImpl<int, Convert2Int>
00143 arg_port("-port", DEFAULT_PORT);
00144
00145 try {
00146 XrdMonArgParser argParser;
00147 argParser.registerExpectedArg(&arg_host);
00148 argParser.registerExpectedArg(&arg_port);
00149 argParser.parseArguments(argc, argv);
00150 } catch (XrdMonException& e) {
00151 e.printIt();
00152 printHelp();
00153 return 1;
00154 }
00155
00156 const char* inputPathFile = "./paths.txt";
00157
00158 kXR_int32 seed = 12345;
00159
00160 srand(seed);
00161
00162 XrdMonSndDummyXrootd::NEWUSERFREQUENCY = 200;
00163 XrdMonSndDummyXrootd::NEWPROCFREQUENCY = 50;
00164 kXR_int16 NEWDICTENTRYFREQUENCY = 8000;
00165 kXR_int16 calls2NewXrdMonSndDictEntry = 1;
00166
00167 XrdMonSndDebug::initialize();
00168
00169 XrdMonSndDummyXrootd xrootd;
00170 assert ( !xrootd.initialize(inputPathFile) );
00171
00172 XrdMonSndTraceCache cache;
00173 XrdMonSndCoder coder;
00174 XrdMonSndTransmitter transmitter;
00175
00176 assert ( !transmitter.initialize(arg_host.myVal(), arg_port.myVal()) );
00177 kXR_int64 noP = 0;
00178
00179 while ( 0 != access("start.txt", F_OK) ) {
00180 static bool warned = false;
00181 if ( ! warned ) {
00182 cout << "Waiting for start.txt file\n";
00183 warned = true;
00184 }
00185 sleep(1);
00186 }
00187
00188 bool sendLight = true;
00189
00190 if ( sendLight ) {
00191 cout << "\n***** sending LIGHT data *****\n" << endl;
00192 for ( kXR_int64 i=0 ; i<NOCALLS ; i++ ) {
00193 calls2NewXrdMonSndDictEntry = NEWDICTENTRYFREQUENCY;
00194 doDictionaryXrdMonSndPacket(xrootd, coder, transmitter, noP);
00195 doStageXrdMonSndPacket(xrootd, coder, transmitter, noP);
00196 if ( i % 3 ) {
00197 closeFiles(xrootd, coder, transmitter, noP, true);
00198 }
00199 if ( noP >= maxNoXrdMonSndPackets-2 ) {
00200 break;
00201 }
00202 if ( 0 == access("stop.txt", F_OK) ) {
00203 break;
00204 }
00205 if ( i%1001 == 1000 ) {
00206 usleep(1);
00207 }
00208 }
00209 } else {
00210 cout << "\n***** sending BULK data *****\n" << endl;
00211 for ( kXR_int64 i=0 ; i<NOCALLS ; i++ ) {
00212 if ( ! --calls2NewXrdMonSndDictEntry ) {
00213 calls2NewXrdMonSndDictEntry = NEWDICTENTRYFREQUENCY;
00214 doDictionaryXrdMonSndPacket(xrootd, coder, transmitter, noP);
00215 } else {
00216 doTraceXrdMonSndPacket(xrootd, coder, transmitter, cache, noP);
00217 }
00218 if ( noP >= maxNoXrdMonSndPackets-2 ) {
00219 break;
00220 }
00221 if ( 0 == access("stop.txt", F_OK) ) {
00222 break;
00223 }
00224 if ( i%1001 == 1000 ) {
00225 usleep(1);
00226 }
00227 }
00228 }
00229
00230 if ( XrdMonSndDebug::verbose(XrdMonSndDebug::Sending) ) {
00231 cout << "Flushing cache" << endl;
00232 }
00233 if ( 0 == coder.prepare2Transfer(cache.getVector()) ) {
00234 cache.clear();
00235 transmitter(coder.packet());
00236 coder.reset();
00237 noP++;
00238 }
00239
00240 closeFiles(xrootd, coder, transmitter, noP, false);
00241
00242
00243
00244
00245
00246
00247
00248 transmitter.shutdown();
00249 coder.printStats();
00250
00251 return 0;
00252 }