26 #include <netinet/in.h>
27 #include <sys/types.h>
28 #include <sys/socket.h>
29 #include <sys/syscall.h>
33 #define DEFAULT_MTU 0xFFF0
36 dabc::SocketAddon(fd),
43 fMaxLoopCnt(maxloop > 1 ? maxloop : 1),
44 fReduce(reduce < 0.1 ? 0.1 : (reduce > 1. ? 1. : reduce)),
46 fLostCnt(lost>0 ? 1 : -1),
61 if (evnt.
GetCode() == evntSocketRead) {
66 if (fRunning) { ReadUdp(); SetDoingInput(
true); }
74 if (msg ==
"TransportWantToStart") {
82 if (msg ==
"TransportWantToStop") {
94 if (fTgtPtr.null())
return false;
95 unsigned fill_sz = fTgtPtr.distance_to_ownbuf();
96 if (fill_sz == 0)
return false;
99 fTgtPtr.buf().SetTotalSize(fill_sz);
103 fTotalProducedBuffers++;
110 if (!fRunning)
return false;
113 if (!tr) {
EOUT(
"No transport assigned");
return false; }
116 double tm = fLastProcTm.SpentTillNow(
true);
117 if (tm > fMaxProcDist) fMaxProcDist = tm;
122 if (fTgtPtr.null()) {
124 if (fSkipCnt++<10) { fTotalArtificialSkip++;
return false; }
129 if (tgt != fMtuBuffer) {
130 if (fTgtPtr.rawsize() < fMTU) {
131 DOUT0(
"UDP:%d Should never happen - rest size is smaller than MTU", fNPort);
137 int cnt = fMaxLoopCnt;
141 if (tgt != fMtuBuffer) tgt = fTgtPtr.ptr();
147 ssize_t res = recv(Socket(), tgt, fMTU, MSG_DONTWAIT);
150 DOUT0(
"UDP:%d Seems to be, socket was closed", fNPort);
157 if (errno == EAGAIN)
break;
158 EOUT(
"Socket error");
162 if ((fLostCnt > 0) && (--fLostCnt == 0)) {
164 fLostCnt = (int) (1 / fLostRate * (0.5 + 1.* rand() / RAND_MAX));
165 if (fLostCnt < 3) fLostCnt = 3;
166 fTotalArtificialLosts++;
175 if (res != msgsize) {
176 errmsg =
dabc::format(
"Send buffer %d differ from message size %d - ignore it", res, msgsize);
178 if (memcmp((
char*) hadTu + hadTu->
GetPaddedSize(), (
char*) hadTu, 32) != 0) {
179 fTotalDiscard32Packet++;
180 errmsg =
"Trailing 32 bytes do not match to header - ignore packet";
183 if (!errmsg.empty()) {
184 DOUT3(
"UDP:%d %s", fNPort, errmsg.c_str());
185 if (fDebug && (
dabc::lgr()->GetDebugLevel()>2)) {
187 uint32_t* ptr = (uint32_t*) hadTu;
188 for (
unsigned n=0;n<res/4;n++) {
190 printf(
" %s\n", errmsg.c_str());
194 errmsg.append(
dabc::format(
" 0x%08x", (
unsigned) ptr[n]));
196 printf(
" %s\n",errmsg.c_str());
199 fTotalDiscardPacket++;
200 fTotalDiscardBytes+=res;
204 if (tgt == fMtuBuffer) {
206 fTotalDiscardPacket++;
207 fTotalDiscardBytes+=res;
212 fTotalRecvBytes += res;
217 if ((fTgtPtr.rawsize() < fMTU) || (fTgtPtr.consumed_size() > fReduce)) {
229 int fd = socket(PF_INET, SOCK_DGRAM, 0);
230 if (fd < 0)
return -1;
233 EOUT(
"Cannot set non-blocking mode for UDP socket %d", fd);
241 socklen_t rcvBufLenLen =
sizeof(rcvbuflen);
242 if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &rcvbuflen, rcvBufLenLen) == -1) {
243 EOUT(
"Fail to setsockopt SO_RCVBUF %s", strerror(errno));
246 int rcvBufLenRet = 0;
247 if (getsockopt(fd, SOL_SOCKET, SO_RCVBUF, &rcvBufLenRet, &rcvBufLenLen) == -1) {
248 EOUT(
"fail to getsockopt SO_RCVBUF, ...): %s", strerror(errno));
251 if (rcvBufLenRet < rcvbuflen) {
252 EOUT(
"UDP receive buffer length (%d) smaller than requested buffer length (%d)", rcvBufLenRet, rcvbuflen);
253 rcvbuflen = rcvBufLenRet;
255 DOUT0(
"SO_RCVBUF Configured %ld Actual %ld", (
long) rcvbuflen, (
long) rcvBufLenRet);
259 if ((host.length()>0) && (host!=
"host")) {
260 struct addrinfo hints, *info =
nullptr;
262 memset(&hints, 0,
sizeof(hints));
263 hints.ai_flags = AI_PASSIVE;
264 hints.ai_family = AF_UNSPEC;
265 hints.ai_socktype = SOCK_DGRAM;
267 std::string service = std::to_string(nport);
269 getaddrinfo(host.c_str(), service.c_str(), &hints, &info);
271 if (info && bind(fd, info->ai_addr, info->ai_addrlen) == 0)
return fd;
275 memset(&addr, 0,
sizeof(addr));
276 addr.sin_family = AF_INET;
277 addr.sin_port = htons(nport);
279 if (bind(fd, (
struct sockaddr *) &addr,
sizeof(addr)) == 0)
return fd;
289 dabc::Transport(cmd, inpport, 0),
322 if (cmd.
IsName(
"ResetTransportStat")) {
328 if (cmd.
IsName(
"GetHadaqTransportInfo")) {
335 if (cmd.
IsName(
"TdcCalibrations") || cmd.
IsName(
"CalibrRefresh")) {
345 fLastDebugTm.GetNow();
349 fAddon.Notify(
"TransportWantToStart");
358 fAddon.Notify(
"TransportWantToStop");
365 std::string name = TimerName(timer);
366 if (name ==
"HeartbeatTimer") {
374 }
else if (name ==
"FlushTimer") {
379 if (addon && addon->
fDebug && fLastDebugTm.Expired(1.)) {
380 DOUT1(
"UDP %d NumReady:%u CanTake:%u BufAssigned:%s CanSend:%u DoingInp %s maxlooptm = %5.3f", fIdNumber, fNumReadyBufs, NumCanTake(0),
DBOOL(fBufAssigned), NumCanSend(0),
DBOOL(addon->
IsDoingInput()), addon->
fMaxProcDist);
381 fLastDebugTm.GetNow();
392 if (fNumReadyBufs > 0) {
398 return fNumReadyBufs > 0;
407 if (AssignNewBuffer(pool, addon)) {
419 if (fBufAssigned || (NumCanTake(pool) <= fNumReadyBufs))
return false;
421 if (!addon) addon = (
NewAddon*) fAddon();
424 EOUT(
"should not happen");
430 EOUT(
"Empty buffer when all checks already done - strange");
431 CloseTransport(
true);
442 EOUT(
"not enough space in the buffer - at least %u is required", addon->
fMTU);
443 CloseTransport(
true);
452 fBufAssigned =
false;
456 if (!ProcessSend(0))
break;
463 if (onclose || (fLastSendCnt == addon->
fSendCnt)) {
466 if (!onclose) AssignNewBuffer(0, addon);
Reference on memory from memory pool.
unsigned SegmentSize(unsigned n=0) const
Returns size on the segment, no any boundary checks.
Represents command with its arguments.
void SetPtr(const std::string &name, void *p)
Set pointer argument for the command.
bool SetUInt(const std::string &name, unsigned v)
virtual void ProcessTimerEvent(unsigned timer)
Method called by framework when timer event is produced.
unsigned ItemSubId() const
std::string OutputName(unsigned indx=0, bool fullname=false) const
virtual void SetModulePriority(int pri=-1)
bool SetPortLoopLength(const std::string &name, unsigned cnt)
unsigned CreateTimer(const std::string &name, double period_sec=-1., bool synchron=false)
const char * GetName() const
Returns name of the object, thread safe
void reset(void *buf, BufferSize_t sz)
BufferSize_t rawsize() const
Reference on the dabc::Port class
bool IsName(const char *name) const
Returns true if object name is the same as specified one.
bool null() const
Returns true if reference contains nullptr.
virtual void ProcessEvent(const EventId &)
void SetIOPriority(int prior=1)
Method defines priority level for socket IO events.
void SetDoingInput(bool on=true)
Call method to indicate that object wants to read data from the socket.
bool IsDoingInput() const
static bool SetNonBlockSocket(int fd)
virtual bool StopTransport()
virtual bool StartTransport()
Methods activated by Port, when transport starts/stops.
virtual int ExecuteCommand(Command cmd)
Main method where commands are executed.
virtual long Notify(const std::string &, int)
Light-weight command interface, which can be used from worker.
void AssignAddon(WorkerAddon *addon)
Assigns addon to the worker Should be called before worker assigned to the thread.
int fSendCnt
counter of send buffers since last timeout active
double fMaxProcDist
maximal time between calls to BuildEvent method
NewAddon(int fd, int nport, int mtu, bool debug, int maxloop, double reduce, double lost)
unsigned fMTU
maximal size of packet expected from TRB
virtual long Notify(const std::string &, int)
Light-weight command interface.
void * fMtuBuffer
buffer used to skip packets when no normal buffer is available
bool fDebug
when true, produce more debug output
static int OpenUdp(const std::string &host, int nport, int rcvbuflen)
virtual void ProcessEvent(const dabc::EventId &)
dabc::Pointer fTgtPtr
pointer used to read data
bool AssignNewBuffer(unsigned pool, NewAddon *addon=nullptr)
virtual bool ProcessBuffer(unsigned pool)
Method called by framework when at least one buffer available in pool handle.
void FlushBuffer(bool onclose=false)
virtual bool StartTransport()
Methods activated by Port, when transport starts/stops.
virtual bool ProcessSend(unsigned port)
Method called by framework when at least one buffer can be send to output port.
virtual void ProcessTimerEvent(unsigned timer)
Method called by framework when timer event is produced.
virtual int ExecuteCommand(dabc::Command cmd)
Main method where commands are executed.
NewTransport(dabc::Command, const dabc::PortRef &inpport, NewAddon *addon, double flush=1, double heartbeat=-1)
int fIdNumber
input port item id
virtual bool StopTransport()
std::string format(const char *fmt,...)
Event structure, exchanged between DABC threads.
HADES transport unit header.
uint32_t GetPaddedSize() const
int fNPort
upd port number