XrdMonCtrCollector.cc

Go to the documentation of this file.
00001 /*****************************************************************************/
00002 /*                                                                           */
00003 /*                          XrdMonCtrCollector.cc                            */
00004 /*                                                                           */
00005 /* (c) 2005 by the Board of Trustees of the Leland Stanford, Jr., University */
00006 /*                            All Rights Reserved                            */
00007 /*       Produced by Jacek Becla for Stanford University under contract      */
00008 /*              DE-AC02-76SF00515 with the Department of Energy              */
00009 /*****************************************************************************/
00010 
00011 // $Id: XrdMonCtrCollector.cc 24468 2008-06-22 16:47:03Z ganis $
00012 
00013 #include "XrdMon/XrdMonCommon.hh"
00014 #include "XrdMon/XrdMonTimer.hh"
00015 #include "XrdMon/XrdMonCtrBuffer.hh"
00016 #include "XrdMon/XrdMonCtrPacket.hh"
00017 #include <sys/socket.h>
00018 #include <assert.h>
00019 
00020 //#define DEBUG
00021 #define PRINT_SPEED
00022 
00023 // for DEBUG/PRINT_SPEED only
00024 #include "XrdMon/XrdMonCtrDebug.hh"
00025 #include "XrdMon/XrdMonSenderInfo.hh"
00026 
00027 #include <iomanip>
00028 #include "XrdSys/XrdSysHeaders.hh"
00029 using std::cout;
00030 
00031 namespace XrdMonCtrCollector {
00032     int port = DEFAULT_PORT;
00033 }
00034 
00035 void
00036 printSpeed()
00037 {
00038     static kXR_int64 noP = 0;
00039     static XrdMonTimer t;
00040     if ( 0 == noP ) {
00041         t.start();
00042     }
00043     ++noP;
00044     const kXR_int64 EVERY = 1001;
00045     if ( noP % EVERY == EVERY-1) {
00046         double elapsed = t.stop();
00047         cout << noP << " packets received in " << elapsed 
00048              << " sec (" << EVERY/elapsed << " Hz)" << endl;
00049         t.reset(); t.start();
00050     }
00051 }
00052 
00053 extern "C" void* receivePackets(void*)
00054 {
00055     struct sockaddr_in sAddress;
00056 
00057     int socket_ = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);    
00058     assert( -1 != socket_ );
00059 
00060     memset((char *) &sAddress, sizeof(sAddress), 0);
00061     sAddress.sin_family = AF_INET;
00062     sAddress.sin_port = htons(XrdMonCtrCollector::port);
00063     sAddress.sin_addr.s_addr = htonl(INADDR_ANY);
00064 
00065     if ( -1 == bind(socket_, 
00066                     (struct sockaddr*)&sAddress, 
00067                     sizeof(sAddress)) ) {
00068         cerr << "Failed to bind, likely port " 
00069              << XrdMonCtrCollector::port << " in use" << endl;
00070         ::abort();
00071     }
00072 
00073     XrdMonCtrBuffer* pb = XrdMonCtrBuffer::instance();
00074     cout << "Ready to receive data..." << endl;
00075     while ( 1 ) {
00076         XrdMonCtrPacket* packet = new XrdMonCtrPacket(MAXPACKETSIZE);
00077         socklen_t slen = sizeof(packet->sender);
00078         if ( -1 == recvfrom(socket_, 
00079                             packet->buf, 
00080                             MAXPACKETSIZE, 
00081                             0, 
00082                             (sockaddr* )(&(packet->sender)), 
00083                             &slen) ) {
00084             cerr << "Failed to receive data" << endl;
00085             ::abort();
00086         }
00087 #ifdef DEBUG
00088         static kXR_int32 packetNo = 0;
00089         ++packetNo;
00090         {
00091             XrdMonCtrXrdSysMutexHelper mh; mh.Lock(&XrdMonCtrDebug::_mutex);
00092             cout << "Received packet no " 
00093                  << setw(5) << packetNo << " from " 
00094                  << XrdMonSenderInfo::hostPort(packet->sender) << endl;
00095         }
00096 #endif
00097 
00098         pb->push_back(packet);
00099 
00100 #ifdef PRINT_SPEED
00101         printSpeed();
00102 #endif
00103     }
00104     return 0;
00105 }

Generated on Tue Jul 5 14:46:39 2011 for ROOT_528-00b_version by  doxygen 1.5.1