XrdMonSndDummyXrootdApp.cc

Go to the documentation of this file.
00001 /*****************************************************************************/
00002 /*                                                                           */
00003 /*                        XrdMonSndDummyXrootdApp.cc                         */
00004 /*                                                                           */
00005 /* (c) 2005 by the Board of Trustees of the Leland Stanford, Jr., University */
00006 /*                            All Rights Reserved                            */
00007 /*       Produced by Jacek Becla for Stanford University under contract      */
00008 /*              DE-AC02-76SF00515 with the Department of Energy              */
00009 /*****************************************************************************/
00010 
00011 // $Id: XrdMonSndDummyXrootdApp.cc 24468 2008-06-22 16:47:03Z ganis $
00012 
00013 #include "XrdMon/XrdMonArgParser.hh"
00014 #include "XrdMon/XrdMonArgParserConvert.hh"
00015 #include "XrdMon/XrdMonUtils.hh"
00016 #include "XrdMon/XrdMonSndCoder.hh"
00017 #include "XrdMon/XrdMonSndDebug.hh"
00018 #include "XrdMon/XrdMonSndDictEntry.hh"
00019 #include "XrdMon/XrdMonSndDummyXrootd.hh"
00020 #include "XrdMon/XrdMonSndTraceCache.hh"
00021 #include "XrdMon/XrdMonSndTraceEntry.hh"
00022 #include "XrdMon/XrdMonSndTransmitter.hh"
00023 
00024 #include <assert.h>
00025 #include <unistd.h>  /* usleep */
00026 #include <sys/time.h>
00027 using namespace XrdMonArgParserConvert;
00028 
00029 // known problems with 2 and 4
00030 //const kXR_int64 NOCALLS = 8640000;   24h worth
00031 const kXR_int64 NOCALLS = 1000000;
00032 const kXR_int16 maxNoXrdMonSndPackets = 50;
00033 
00034 void
00035 printHelp()
00036 {
00037     cout << "\nxrdmonDummySender\n"
00038          << "    [-host <hostName>]\n"
00039          << "    [-port <portNr>]\n"
00040          << "\n"
00041          << "-host <hostName>         Name of the receiver's host.\n"
00042          << "                         Default value is \"" << DEFAULT_HOST << "\".\n"
00043          << "-port <portNr>           Port number of the receiver's host\n"
00044          << "                         Default valus is \"" << DEFAULT_PORT << "\".\n"
00045          << endl;
00046 }
00047 
00048 void
00049 doDictionaryXrdMonSndPacket(XrdMonSndDummyXrootd& xrootd, 
00050                             XrdMonSndCoder& coder,
00051                             XrdMonSndTransmitter& transmitter,
00052                             kXR_int64& noP)
00053 {
00054     XrdMonSndDictEntry m = xrootd.newXrdMonSndDictEntry();
00055     cout << m << endl;
00056 
00057     XrdMonSndDictEntry::CompactEntry ce = m.code();
00058     
00059     if ( 0 == coder.prepare2Transfer(ce) ) {
00060         transmitter(coder.packet());
00061         coder.reset();
00062         ++noP;
00063     }
00064 }
00065 
00066 void
00067 doStageXrdMonSndPacket(XrdMonSndDummyXrootd& xrootd, 
00068                        XrdMonSndCoder& coder,
00069                        XrdMonSndTransmitter& transmitter,
00070                        kXR_int64& noP)
00071 {
00072     XrdMonSndStageEntry m = xrootd.newXrdMonSndStageEntry();
00073     cout << "about to send this stage package:"  << m << endl;
00074 
00075     XrdMonSndStageEntry::CompactEntry ce = m.code();
00076     
00077     if ( 0 == coder.prepare2Transfer(ce) ) {
00078         transmitter(coder.packet());
00079         coder.reset();
00080         ++noP;
00081     }
00082 }
00083 
00084 void
00085 doTraceXrdMonSndPacket(XrdMonSndDummyXrootd& xrootd,
00086                        XrdMonSndCoder& coder, 
00087                        XrdMonSndTransmitter& transmitter,
00088                        XrdMonSndTraceCache& cache, 
00089                        kXR_int64& noP)
00090 {
00091     XrdMonSndTraceEntry de = xrootd.newXrdMonSndTraceEntry();
00092     // add to buffer, perhaps transmit
00093     cache.add(de);
00094     if ( ! cache.bufferFull() ) {
00095         return;
00096     }
00097     
00098     if ( 0 == coder.prepare2Transfer(cache.getVector()) ) {
00099         cache.clear();
00100         transmitter(coder.packet());
00101         coder.reset();
00102         noP++;
00103     }
00104 }
00105 
00106 void
00107 closeFiles(XrdMonSndDummyXrootd& xrootd,
00108            XrdMonSndCoder& coder, 
00109            XrdMonSndTransmitter& transmitter,
00110            kXR_int64& noP,
00111            bool justOne)
00112 {
00113     vector<kXR_int32> closedFiles;
00114     if ( justOne ) {
00115         kXR_int32 id = xrootd.closeOneFile();
00116         closedFiles.push_back(id);
00117     } else {
00118         xrootd.closeFiles(closedFiles);
00119     }
00120     
00121     int s = closedFiles.size();
00122     int pos = 0;
00123     unsigned int i;
00124     while ( pos < s ) {
00125         vector<kXR_int32> v;
00126         for (i=0 ; i<XrdMonSndTraceCache::NODATAELEMS-2 && pos<s ; ++i, ++pos) {
00127             v.push_back(closedFiles.back());
00128             closedFiles.pop_back();
00129         }
00130         coder.prepare2Transfer(v);
00131         transmitter(coder.packet());
00132         noP++;
00133     }
00134 }
00135 
00136 // XrdMonSndDummyXrootd - main class
00137 
00138 int main(int argc, char* argv[])
00139 {
00140     XrdMonArgParser::ArgImpl<const char*, Convert2String>
00141         arg_host("-host", DEFAULT_HOST);
00142     XrdMonArgParser::ArgImpl<int, Convert2Int> 
00143         arg_port("-port", DEFAULT_PORT);
00144 
00145     try {
00146         XrdMonArgParser argParser;
00147         argParser.registerExpectedArg(&arg_host);
00148         argParser.registerExpectedArg(&arg_port);
00149         argParser.parseArguments(argc, argv);
00150     } catch (XrdMonException& e) {
00151         e.printIt();
00152         printHelp();
00153         return 1;
00154     }
00155 
00156     const char* inputPathFile = "./paths.txt";
00157 
00158     kXR_int32 seed = 12345;
00159 
00160     srand(seed);
00161     
00162     XrdMonSndDummyXrootd::NEWUSERFREQUENCY  =  200;
00163     XrdMonSndDummyXrootd::NEWPROCFREQUENCY  =   50;
00164     kXR_int16 NEWDICTENTRYFREQUENCY =  8000;
00165     kXR_int16 calls2NewXrdMonSndDictEntry  =     1;    
00166     
00167     XrdMonSndDebug::initialize();
00168 
00169     XrdMonSndDummyXrootd xrootd;
00170     assert ( !xrootd.initialize(inputPathFile) );
00171     
00172     XrdMonSndTraceCache cache;
00173     XrdMonSndCoder coder;
00174     XrdMonSndTransmitter transmitter;
00175 
00176     assert ( !transmitter.initialize(arg_host.myVal(), arg_port.myVal()) );
00177     kXR_int64 noP = 0;
00178 
00179     while ( 0 != access("start.txt", F_OK) ) {
00180         static bool warned = false;
00181         if ( ! warned ) {
00182             cout << "Waiting for start.txt file\n";
00183             warned = true;
00184         }
00185         sleep(1);
00186     }
00187 
00188     bool sendLight = true;
00189     
00190     if ( sendLight ) { // use this loop to test light decoder
00191         cout << "\n***** sending LIGHT data *****\n" << endl;
00192         for ( kXR_int64 i=0 ; i<NOCALLS ; i++ ) {
00193             calls2NewXrdMonSndDictEntry = NEWDICTENTRYFREQUENCY;
00194             doDictionaryXrdMonSndPacket(xrootd, coder, transmitter, noP);
00195             doStageXrdMonSndPacket(xrootd, coder, transmitter, noP);
00196             if ( i % 3 ) { // every 3 opens, close one file...
00197                 closeFiles(xrootd, coder, transmitter, noP, true);
00198             }
00199             if ( noP >= maxNoXrdMonSndPackets-2 ) {
00200                 break;
00201             }
00202             if ( 0 == access("stop.txt", F_OK) ) {
00203                 break;
00204             }
00205             if ( i%1001 == 1000 ) {
00206                 usleep(1);
00207             }
00208         }
00209     } else { //  use this loop to test full tracing
00210         cout << "\n***** sending BULK data *****\n" << endl;
00211         for ( kXR_int64 i=0 ; i<NOCALLS ; i++ ) {
00212             if ( ! --calls2NewXrdMonSndDictEntry ) {
00213                 calls2NewXrdMonSndDictEntry = NEWDICTENTRYFREQUENCY;
00214                 doDictionaryXrdMonSndPacket(xrootd, coder, transmitter, noP);
00215             } else {
00216                 doTraceXrdMonSndPacket(xrootd, coder, transmitter, cache, noP);            
00217             }
00218             if ( noP >= maxNoXrdMonSndPackets-2 ) {
00219                 break;
00220             }
00221             if ( 0 == access("stop.txt", F_OK) ) {
00222                 break;
00223             }
00224             if ( i%1001 == 1000 ) {
00225                 usleep(1);
00226             }
00227         }
00228     }
00229 
00230     if ( XrdMonSndDebug::verbose(XrdMonSndDebug::Sending) ) {
00231         cout << "Flushing cache" << endl;
00232     }
00233     if ( 0 == coder.prepare2Transfer(cache.getVector()) ) {
00234         cache.clear();
00235         transmitter(coder.packet());
00236         coder.reset();
00237         noP++;
00238     }
00239 
00240     closeFiles(xrootd, coder, transmitter, noP, false);
00241 
00242     // set shutdown signal
00243     //XrdMonSndAdminEntry ae;
00244     //ae.setShutdown();
00245     //coder.prepare2Transfer(ae);
00246     //transmitter(coder.packet());
00247 
00248     transmitter.shutdown();
00249     coder.printStats();
00250     
00251     return 0;
00252 }

Generated on Tue Jul 5 14:46:43 2011 for ROOT_528-00b_version by  doxygen 1.5.1