00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013 #include "XrdMon/XrdMonCommon.hh"
00014 #include "XrdMon/XrdMonTimer.hh"
00015 #include "XrdMon/XrdMonCtrBuffer.hh"
00016 #include "XrdMon/XrdMonCtrPacket.hh"
00017 #include <sys/socket.h>
00018 #include <assert.h>
00019
00020
00021 #define PRINT_SPEED
00022
00023
00024 #include "XrdMon/XrdMonCtrDebug.hh"
00025 #include "XrdMon/XrdMonSenderInfo.hh"
00026
00027 #include <iomanip>
00028 #include "XrdSys/XrdSysHeaders.hh"
00029 using std::cout;
00030
00031 namespace XrdMonCtrCollector {
00032 int port = DEFAULT_PORT;
00033 }
00034
00035 void
00036 printSpeed()
00037 {
00038 static kXR_int64 noP = 0;
00039 static XrdMonTimer t;
00040 if ( 0 == noP ) {
00041 t.start();
00042 }
00043 ++noP;
00044 const kXR_int64 EVERY = 1001;
00045 if ( noP % EVERY == EVERY-1) {
00046 double elapsed = t.stop();
00047 cout << noP << " packets received in " << elapsed
00048 << " sec (" << EVERY/elapsed << " Hz)" << endl;
00049 t.reset(); t.start();
00050 }
00051 }
00052
00053 extern "C" void* receivePackets(void*)
00054 {
00055 struct sockaddr_in sAddress;
00056
00057 int socket_ = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
00058 assert( -1 != socket_ );
00059
00060 memset((char *) &sAddress, sizeof(sAddress), 0);
00061 sAddress.sin_family = AF_INET;
00062 sAddress.sin_port = htons(XrdMonCtrCollector::port);
00063 sAddress.sin_addr.s_addr = htonl(INADDR_ANY);
00064
00065 if ( -1 == bind(socket_,
00066 (struct sockaddr*)&sAddress,
00067 sizeof(sAddress)) ) {
00068 cerr << "Failed to bind, likely port "
00069 << XrdMonCtrCollector::port << " in use" << endl;
00070 ::abort();
00071 }
00072
00073 XrdMonCtrBuffer* pb = XrdMonCtrBuffer::instance();
00074 cout << "Ready to receive data..." << endl;
00075 while ( 1 ) {
00076 XrdMonCtrPacket* packet = new XrdMonCtrPacket(MAXPACKETSIZE);
00077 socklen_t slen = sizeof(packet->sender);
00078 if ( -1 == recvfrom(socket_,
00079 packet->buf,
00080 MAXPACKETSIZE,
00081 0,
00082 (sockaddr* )(&(packet->sender)),
00083 &slen) ) {
00084 cerr << "Failed to receive data" << endl;
00085 ::abort();
00086 }
00087 #ifdef DEBUG
00088 static kXR_int32 packetNo = 0;
00089 ++packetNo;
00090 {
00091 XrdMonCtrXrdSysMutexHelper mh; mh.Lock(&XrdMonCtrDebug::_mutex);
00092 cout << "Received packet no "
00093 << setw(5) << packetNo << " from "
00094 << XrdMonSenderInfo::hostPort(packet->sender) << endl;
00095 }
00096 #endif
00097
00098 pb->push_back(packet);
00099
00100 #ifdef PRINT_SPEED
00101 printSpeed();
00102 #endif
00103 }
00104 return 0;
00105 }