DABC (Data Acquisition Backbone Core)  2.9.9
UdpTransport.cxx
Go to the documentation of this file.
1 // $Id: UdpTransport.cxx 4707 2021-02-26 11:29:21Z linev $
2 
3 /********************************************************************
4  * The Data Acquisition Backbone Core (DABC)
5  ********************************************************************
6  * Copyright (C) 2009-
7  * GSI Helmholtzzentrum fuer Schwerionenforschung GmbH
8  * Planckstr. 1
9  * 64291 Darmstadt
10  * Germany
11  * Contact: http://dabc.gsi.de
12  ********************************************************************
13  * This software can be used under the GPL license agreements as stated
14  * in LICENSE.txt file which is part of the distribution.
15  ********************************************************************/
16 
17 #include "hadaq/UdpTransport.h"
18 
19 #include <cerrno>
20 #include <cmath>
21 #include <unistd.h>
22 #include <cstdio>
23 #include <cstdlib>
24 #include <sched.h>
25 
26 #include <netinet/in.h>
27 #include <sys/types.h>
28 #include <sys/socket.h>
29 #include <sys/syscall.h>
30 
31 
// Packet buffer size used by the NewAddon constructor when no positive MTU is supplied.
// NOTE(review): 0xFFF0 (65520) is larger than the 65,507 / 0xFFE3 limit quoted in the
// next comment - presumably a deliberate round-up; confirm.
32 // according to specification maximal UDP packet is 65,507 or 0xFFE3
33 #define DEFAULT_MTU 0xFFF0
34 
// Constructs the UDP receiver addon for socket fd and port nport.
// Argument handling visible in the initializer list:
//  - mtu:     a non-positive value selects DEFAULT_MTU
//  - maxloop: values below 1 are raised to 1
//  - reduce:  clamped to the range [0.1, 1.]
//  - lost:    stored as fLostRate; a positive value arms the artificial-loss
//             counter (fLostCnt = 1), otherwise fLostCnt = -1
// The body allocates a scratch buffer of fMTU bytes with malloc().
// NOTE(review): the malloc() result is not checked for nullptr.
35 hadaq::NewAddon::NewAddon(int fd, int nport, int mtu, bool debug, int maxloop, double reduce, double lost) :
36  dabc::SocketAddon(fd),
37  TransportInfo(nport),
38  fTgtPtr(),
39  fMTU(mtu > 0 ? mtu : DEFAULT_MTU),
40  fMtuBuffer(nullptr),
41  fSkipCnt(0),
42  fSendCnt(0),
43  fMaxLoopCnt(maxloop > 1 ? maxloop : 1),
44  fReduce(reduce < 0.1 ? 0.1 : (reduce > 1. ? 1. : reduce)),
45  fLostRate(lost),
46  fLostCnt(lost>0 ? 1 : -1),
47  fDebug(debug),
48  fRunning(false),
49  fMaxProcDist(0.)
50 {
51  fMtuBuffer = malloc(fMTU);
52 }
53 
55 {
 // NOTE(review): the listing line with this definition's signature (listing line 54)
 // is absent from this extract; presumably this is the NewAddon destructor - confirm.
 // Releases the fMtuBuffer memory with free().
56  free(fMtuBuffer);
57 }
58 
60 {
 // NOTE(review): the signature line (listing line 59) and the body of the else
 // branch (listing line 68) are absent from this extract.
 // On a socket-read event, and only while fRunning is set, reads pending UDP data
 // and then re-enables input notification.
61  if (evnt.GetCode() == evntSocketRead) {
62
63  // DOUT0("Addon %d get read event", fNPort);
64
65  // ignore events when not waiting for the new data
66  if (fRunning) { ReadUdp(); SetDoingInput(true); }
67  } else {
69  }
70 }
71 
// Handles the start/stop messages for this addon.
//  - "TransportWantToStart": takes a fresh fLastProcTm time stamp, resets fMaxProcDist,
//    sets fRunning and enables socket input; returns 0.
//  - "TransportWantToStop": clears fRunning and disables socket input; returns 0.
// Any other message is forwarded to dabc::SocketAddon::Notify and its result is returned.
72 long hadaq::NewAddon::Notify(const std::string &msg, int arg)
73 {
74  if (msg == "TransportWantToStart") {
75  fLastProcTm.GetNow();
76  fMaxProcDist = 0;
77  fRunning = true;
78  SetDoingInput(true);
79  return 0;
80  }
81
82  if (msg == "TransportWantToStop") {
83  fRunning = false;
84  SetDoingInput(false);
85  return 0;
86  }
87
88  return dabc::SocketAddon::Notify(msg, arg);
89 }
90 
91 
93 {
 // NOTE(review): the signature line (listing line 92) is absent from this extract;
 // callers in this file use it as `addon->CloseBuffer()` with a boolean result.
 // Finalizes the buffer currently referenced by fTgtPtr.
 // Returns false when no buffer is attached or nothing was written into it;
 // otherwise tags the buffer as a HADAQ transport unit, sets its total size,
 // detaches fTgtPtr, bumps the send/produced counters and returns true.
94  if (fTgtPtr.null()) return false;
 // presumably the number of bytes filled so far - confirm against dabc::Pointer
95  unsigned fill_sz = fTgtPtr.distance_to_ownbuf();
96  if (fill_sz == 0) return false;
97
98  fTgtPtr.buf().SetTypeId(hadaq::mbt_HadaqTransportUnit);
99  fTgtPtr.buf().SetTotalSize(fill_sz);
100  fTgtPtr.reset();
101
102  fSendCnt++;
103  fTotalProducedBuffers++;
104  return true;
105 }
106 
107 
109 {
 // NOTE(review): the signature line (listing line 108) is absent from this extract;
 // callers in this file invoke it as `ReadUdp()`.
 // Receives up to fMaxLoopCnt UDP packets without blocking and appends the valid
 // ones to the buffer referenced by fTgtPtr.
 // Returns false when not running, when no transport is attached, when no buffer
 // could be obtained, on socket close/error, or after a packet had to be thrown away
 // into the scratch buffer; returns true when the loop ends normally or the socket
 // has no more data (EAGAIN).
110  if (!fRunning) return false;
111
112  hadaq::NewTransport* tr = dynamic_cast<hadaq::NewTransport*> (fWorker());
113  if (!tr) { EOUT("No transport assigned"); return false; }
114
 // in debug mode track the longest interval between two calls
115  if (fDebug) {
116  double tm = fLastProcTm.SpentTillNow(true);
117  if (tm > fMaxProcDist) fMaxProcDist = tm;
118  }
119
120  void *tgt = nullptr;
121
 // No buffer attached: try to get one. The first 10 consecutive failures just return
 // (counted in fTotalArtificialSkip); after that packets are read into fMtuBuffer
 // so that they can be discarded.
122  if (fTgtPtr.null()) {
123  if (!tr->AssignNewBuffer(0, this)) {
124  if (fSkipCnt++<10) { fTotalArtificialSkip++; return false; }
125  tgt = fMtuBuffer;
126  }
127  }
128
 // NOTE(review): if fMtuBuffer were nullptr (failed malloc) this comparison with the
 // initial tgt == nullptr would be false even when a real buffer is attached.
129  if (tgt != fMtuBuffer) {
130  if (fTgtPtr.rawsize() < fMTU) {
131  DOUT0("UDP:%d Should never happen - rest size is smaller than MTU", fNPort);
132  return false;
133  }
134  fSkipCnt = 0;
135  }
136
137  int cnt = fMaxLoopCnt;
138
139  while (cnt-- > 0) {
140
 // when writing into a real buffer, receive directly at the current write position
141  if (tgt != fMtuBuffer) tgt = fTgtPtr.ptr();
142
143  /* this was old form which is not necessary - socket is already bind with the port */
144  // socklen_t socklen = sizeof(fSockAddr);
145  // ssize_t res = recvfrom(Socket(), fTgtPtr.ptr(), fMTU, 0, (sockaddr*) &fSockAddr, &socklen);
146
147  ssize_t res = recv(Socket(), tgt, fMTU, MSG_DONTWAIT);
148
149  if (res == 0) {
150  DOUT0("UDP:%d Seems to be, socket was closed", fNPort);
151  return false;
152  }
153
154  if (res < 0) {
155  // socket does not have data, one should enable event processing
156  // otherwise we need to poll for the new data
 // NOTE(review): only EAGAIN is tested; EINTR (and EWOULDBLOCK where it differs)
 // is reported as a socket error - confirm this is intended.
157  if (errno == EAGAIN) break;
158  EOUT("Socket error");
159  return false;
160  }
161
162  if ((fLostCnt > 0) && (--fLostCnt == 0)) {
163  // artificial drop of received UDP packet
 // next drop after 1/fLostRate packets scaled by a random factor in [0.5, 1.5], but at least 3
164  fLostCnt = (int) (1 / fLostRate * (0.5 + 1.* rand() / RAND_MAX));
165  if (fLostCnt < 3) fLostCnt = 3;
166  fTotalArtificialLosts++;
167  continue;
168  }
169
170  hadaq::HadTu* hadTu = (hadaq::HadTu*) tgt;
171  int msgsize = hadTu->GetPaddedSize() + 32; // trb sender adds a 32 byte control trailer identical to event header
172
173  std::string errmsg;
174
 // NOTE(review): `res` is ssize_t but is formatted with %d here and below -
 // confirm this is safe for dabc::format on 64-bit platforms.
175  if (res != msgsize) {
176  errmsg = dabc::format("Send buffer %d differ from message size %d - ignore it", res, msgsize);
177  } else
178  if (memcmp((char*) hadTu + hadTu->GetPaddedSize(), (char*) hadTu, 32) != 0) {
179  fTotalDiscard32Packet++;
180  errmsg = "Trailing 32 bytes do not match to header - ignore packet";
181  }
182
 // invalid packet: optionally hex-dump it (debug mode and debug level above 2), count it as discarded
183  if (!errmsg.empty()) {
184  DOUT3("UDP:%d %s", fNPort, errmsg.c_str());
185  if (fDebug && (dabc::lgr()->GetDebugLevel()>2)) {
186  errmsg = dabc::format(" Packet length %d", res);
187  uint32_t* ptr = (uint32_t*) hadTu;
 // eight 32-bit words per printed line; NOTE(review): unsigned n is compared with signed res/4
188  for (unsigned n=0;n<res/4;n++) {
189  if (n%8 == 0) {
190  printf(" %s\n", errmsg.c_str());
191  errmsg = dabc::format("0x%04x:", n*4);
192  }
193
194  errmsg.append(dabc::format(" 0x%08x", (unsigned) ptr[n]));
195  }
196  printf(" %s\n",errmsg.c_str());
197  }
198
199  fTotalDiscardPacket++;
200  fTotalDiscardBytes+=res;
201  continue;
202  }
203
204  if (tgt == fMtuBuffer) {
205  // skip single MTU
206  fTotalDiscardPacket++;
207  fTotalDiscardBytes+=res;
208  return false;
209  }
210
211  fTotalRecvPacket++;
212  fTotalRecvBytes += res;
213
 // advance only by the padded payload size; the 32-byte trailer is not included in the shift
214  fTgtPtr.shift(hadTu->GetPaddedSize());
215
216  // when rest size is smaller than MTU, one should close buffer
217  if ((fTgtPtr.rawsize() < fMTU) || (fTgtPtr.consumed_size() > fReduce)) {
218  CloseBuffer();
219  tr->BufferReady();
220  if (!tr->AssignNewBuffer(0,this)) return false;
221  }
222  }
223
224  return true; // indicate that buffer reading will be finished by callback
225 }
226 
// Creates a UDP socket and binds it to port nport.
//  - host:      when non-empty and not equal to "host", the address resolved by
//               getaddrinfo() is tried first; otherwise (or if that bind fails) the
//               socket is bound using a zero-initialized sockaddr_in with only family
//               and port set
//  - rcvbuflen: when positive, requested as SO_RCVBUF and the effective value is read back
// Returns the socket descriptor on success, -1 on failure (the socket is closed first).
// NOTE(review): listing line 232, which opens the block closed at listing line 236,
// is absent from this extract.
// NOTE(review): the addrinfo list returned by getaddrinfo() is not released with
// freeaddrinfo() anywhere in the visible code.
// NOTE(review): hints use AF_UNSPEC while the socket is PF_INET, and the return value
// of getaddrinfo() is ignored; a failed host bind silently falls through to the
// generic bind below - confirm this is intended.
227 int hadaq::NewAddon::OpenUdp(const std::string &host, int nport, int rcvbuflen)
228 {
229  int fd = socket(PF_INET, SOCK_DGRAM, 0);
230  if (fd < 0) return -1;
231
233  EOUT("Cannot set non-blocking mode for UDP socket %d", fd);
234  close(fd);
235  return -1;
236  }
237
238  if (rcvbuflen > 0) {
239  // for hadaq application: set receive buffer length _before_ bind:
240  // int rcvBufLenReq = 1 * (1 << 20);
241  socklen_t rcvBufLenLen = sizeof(rcvbuflen);
242  if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &rcvbuflen, rcvBufLenLen) == -1) {
243  EOUT("Fail to setsockopt SO_RCVBUF %s", strerror(errno));
244  }
245
 // read back the value actually in effect; failures of either call are only logged
246  int rcvBufLenRet = 0;
247  if (getsockopt(fd, SOL_SOCKET, SO_RCVBUF, &rcvBufLenRet, &rcvBufLenLen) == -1) {
248  EOUT("fail to getsockopt SO_RCVBUF, ...): %s", strerror(errno));
249  }
250
251  if (rcvBufLenRet < rcvbuflen) {
252  EOUT("UDP receive buffer length (%d) smaller than requested buffer length (%d)", rcvBufLenRet, rcvbuflen);
 // NOTE(review): rcvbuflen is not used again in the visible code after this assignment
253  rcvbuflen = rcvBufLenRet;
254  } else {
255  DOUT0("SO_RCVBUF Configured %ld Actual %ld", (long) rcvbuflen, (long) rcvBufLenRet);
256  }
257  }
258
259  if ((host.length()>0) && (host!="host")) {
260  struct addrinfo hints, *info = nullptr;
261
262  memset(&hints, 0, sizeof(hints));
263  hints.ai_flags = AI_PASSIVE;
264  hints.ai_family = AF_UNSPEC; //AF_INET;
265  hints.ai_socktype = SOCK_DGRAM;
266
267  std::string service = std::to_string(nport);
268
269  getaddrinfo(host.c_str(), service.c_str(), &hints, &info);
270
 // only the first entry of the result list is tried
271  if (info && bind(fd, info->ai_addr, info->ai_addrlen) == 0) return fd;
272  }
273
 // fallback: bind by port only, address part left zeroed
274  sockaddr_in addr;
275  memset(&addr, 0, sizeof(addr));
276  addr.sin_family = AF_INET;
277  addr.sin_port = htons(nport);
278
279  if (bind(fd, (struct sockaddr *) &addr, sizeof(addr)) == 0) return fd;
280
281  close(fd);
282  return -1;
283 }
284 
285 
286 // ========================================================================================
287 
// Constructs the transport for input port inpport and attaches the given addon.
//  - addon:     gets I/O priority 1 and is assigned to this worker
//  - flush:     when positive, period of the "FlushTimer" created here
//  - heartbeat: when positive, period of the "HeartbeatTimer" created here
// fIdNumber is taken from inpport.ItemSubId().
// NOTE(review): listing lines 296 and 299 (the statements the two comments below
// refer to) are absent from this extract.
// NOTE(review): addon is dereferenced without a null check.
288 hadaq::NewTransport::NewTransport(dabc::Command cmd, const dabc::PortRef& inpport, NewAddon* addon, double flush, double heartbeat) :
289  dabc::Transport(cmd, inpport, 0),
290  fIdNumber(0),
291  fNumReadyBufs(0),
292  fBufAssigned(false),
293  fLastSendCnt(0)
294 {
295  // do not process too many events at once, give other transports a chance
297
298  // lower the priority of module/port events, let I/O events be processed faster
300
301  addon->SetIOPriority(1); // this is priority of main I/O events, higher than module events
302
303  fIdNumber = inpport.ItemSubId();
304
305  AssignAddon(addon);
306
307  if (flush > 0)
308  CreateTimer("FlushTimer", flush);
309
310  if (heartbeat > 0)
311  CreateTimer("HeartbeatTimer", heartbeat);
312
313  DOUT3("Starting hadaq::DataTransport %s id %d", GetName(), fIdNumber);
314 }
315 
317 {
 // NOTE(review): the signature line (listing line 316) is absent from this extract;
 // presumably the NewTransport destructor - confirm. The body is intentionally empty.
318 }
319 
321 {
 // NOTE(review): the signature line (listing line 320) and the final statement
 // (listing line 340) are absent from this extract, so the handling of commands
 // not matched below is not visible here.
 // Commands handled here, each returning dabc::cmd_true:
 //  - "ResetTransportStat": clears the addon counters (if the addon is a NewAddon)
 //  - "GetHadaqTransportInfo": stores the addon's TransportInfo pointer as "Info"
 //    and its port as "UdpPort" (0 when there is no addon)
 //  - "TdcCalibrations" / "CalibrRefresh": accepted and ignored
322  if (cmd.IsName("ResetTransportStat")) {
323  NewAddon *addon = dynamic_cast<NewAddon *> (fAddon());
324  if (addon) addon->ClearCounters();
325  return dabc::cmd_true;
326  }
327
328  if (cmd.IsName("GetHadaqTransportInfo")) {
329  TransportInfo *info = (TransportInfo *) (dynamic_cast<NewAddon*> (fAddon()));
330  cmd.SetPtr("Info", info);
331  cmd.SetUInt("UdpPort", info ? info->fNPort : 0);
332  return dabc::cmd_true;
333  }
334
335  if (cmd.IsName("TdcCalibrations") || cmd.IsName("CalibrRefresh")) {
336  // ignore this command
337  return dabc::cmd_true;
338  }
339
341 }
342 
344 {
 // NOTE(review): the signature line (listing line 343) and the final statement
 // (listing line 351) are absent from this extract; the return value is not visible.
 // Records the start time for debug output, tries to hand a buffer to the addon
 // and tells the addon to start receiving.
345  fLastDebugTm.GetNow();
346
347  AssignNewBuffer(0); // provide immediately buffer - if possible
348
349  fAddon.Notify("TransportWantToStart");
350
352 }
353 
355 {
 // NOTE(review): the signature line (listing line 354) and the final statement
 // (listing line 360) are absent from this extract; the return value is not visible.
 // Flushes the partially filled buffer in "onclose" mode (no new buffer is assigned
 // afterwards) and tells the addon to stop receiving.
356  FlushBuffer(true);
357
358  fAddon.Notify("TransportWantToStop");
359
361 }
362 
364 {
 // NOTE(review): the signature line (listing line 363) and the body of the final
 // else branch (listing line 386) are absent from this extract.
 // Dispatches on the timer name:
 //  - "HeartbeatTimer": polls the socket via ReadUdp() and re-enables input
 //  - "FlushTimer": flushes the current buffer and, in debug mode, prints a status
 //    line once fLastDebugTm.Expired(1.) reports true, then resets fMaxProcDist
 // NOTE(review): fAddon() is converted with a C-style cast here, while other methods
 // in this file use dynamic_cast - confirm the addon type is guaranteed.
365  std::string name = TimerName(timer);
366  if (name == "HeartbeatTimer") {
367  NewAddon *addon = (NewAddon *) fAddon();
368
369  if (addon) {
370  addon->ReadUdp();
371  addon->SetDoingInput(true);
372  }
373
374  } else if (name == "FlushTimer") {
375  FlushBuffer(false);
376
377  NewAddon *addon = (NewAddon *) fAddon();
378
379  if (addon && addon->fDebug && fLastDebugTm.Expired(1.)) {
380  DOUT1("UDP %d NumReady:%u CanTake:%u BufAssigned:%s CanSend:%u DoingInp %s maxlooptm = %5.3f", fIdNumber, fNumReadyBufs, NumCanTake(0), DBOOL(fBufAssigned), NumCanSend(0), DBOOL(addon->IsDoingInput()), addon->fMaxProcDist);
381  fLastDebugTm.GetNow();
382  addon->fMaxProcDist = 0;
383  }
384
385  } else {
387  }
388 }
389 
391 {
 // NOTE(review): the signature line (listing line 390) is absent from this extract;
 // the body uses a parameter named `port`.
 // Sends one ready buffer (taken from pool 0) to the given port, if any is ready.
 // Returns true while further ready buffers remain.
392  if (fNumReadyBufs > 0) {
393  dabc::Buffer buf = TakeBuffer(0);
394  Send(port, buf);
395  fNumReadyBufs--;
396  }
397
398  return fNumReadyBufs > 0;
399 }
400 
402 {
 // NOTE(review): the signature line (listing line 401) is absent from this extract;
 // the body uses a parameter named `pool`.
 // When a new buffer could be assigned to the addon, immediately reads pending UDP
 // data and re-enables input. Always returns false.
403  // check that required element available in the pool
404
 // NOTE(review): C-style cast and no null check before addon->ReadUdp() below;
 // ProcessTimerEvent in this file does check the pointer - confirm.
405  NewAddon* addon = (NewAddon *) fAddon();
406
407  if (AssignNewBuffer(pool, addon)) {
408  addon->ReadUdp();
409  addon->SetDoingInput(true);
410  }
411
412  return false;
413 }
414 
416 {
 // NOTE(review): the signature line (listing line 415) is absent from this extract;
 // the body uses parameters named `pool` and `addon`.
 // Attaches the next free pool buffer to the addon's write pointer (fTgtPtr).
 // Returns false when a buffer is already assigned, when the pool has no buffer
 // beyond those already counted as ready, when the addon unexpectedly still holds
 // a buffer, or on the error paths below (which also close the transport).
417  // assign new buffer to the addon
418
419  if (fBufAssigned || (NumCanTake(pool) <= fNumReadyBufs)) return false;
420
 // a null addon argument means "use the addon of this transport"
421  if (!addon) addon = (NewAddon*) fAddon();
422
423  if (addon->HasBuffer()) {
424  EOUT("should not happen");
425  return false;
426  }
427
 // ready buffers are still in the pool queue, so the next free one sits at index fNumReadyBufs
428  dabc::Buffer buf = PoolQueueItem(pool, fNumReadyBufs);
429  if (buf.null()) {
430  EOUT("Empty buffer when all checks already done - strange");
431  CloseTransport(true);
432  return false;
433  }
434
435  unsigned bufsize = (unsigned) buf.SegmentSize(0);
436
437  addon->fTgtPtr.reset(buf, 0, bufsize);
438
439  fBufAssigned = true;
440
 // NOTE(review): on this failure path fBufAssigned stays true and the addon keeps
 // the too-small buffer attached - confirm that CloseTransport() makes this harmless.
441  if (addon->fTgtPtr.rawsize() < addon->fMTU) {
442  EOUT("not enough space in the buffer - at least %u is required", addon->fMTU);
443  CloseTransport(true);
444  return false;
445  }
446
447  return true;
448 }
449 
451 {
 // NOTE(review): the signature line (listing line 450) is absent from this extract;
 // ReadUdp() and FlushBuffer() in this file call it as `BufferReady()`.
 // Marks the current buffer as finished: no buffer is assigned any more, one more
 // buffer is ready, and ready buffers are sent to output 0 for as long as possible.
452  fBufAssigned = false;
453  fNumReadyBufs++;
454
455  while (CanSend(0))
456  if (!ProcessSend(0)) break;
457 }
458 
460 {
 // NOTE(review): the signature line (listing line 459) is absent from this extract;
 // the body uses a parameter named `onclose`.
 // Closes the partially filled buffer when called on close, or when the addon has
 // not produced any buffer since the previous call (fSendCnt unchanged). Unless
 // closing, a new buffer is assigned right away.
 // NOTE(review): the dynamic_cast result is dereferenced without a null check.
461  NewAddon* addon = dynamic_cast<NewAddon*> (fAddon());
462
463  if (onclose || (fLastSendCnt == addon->fSendCnt)) {
464  if (addon->CloseBuffer()) {
465  BufferReady();
466  if (!onclose) AssignNewBuffer(0, addon);
467  }
468  }
469
 // remember the counter to detect inactivity at the next call
470  fLastSendCnt = addon->fSendCnt;
471 }
#define DEFAULT_MTU
Reference on memory from memory pool.
Definition: Buffer.h:135
unsigned SegmentSize(unsigned n=0) const
Returns the size of the segment; no boundary checks are performed.
Definition: Buffer.h:174
Represents command with its arguments.
Definition: Command.h:99
void SetPtr(const std::string &name, void *p)
Set pointer argument for the command.
Definition: Command.cxx:151
bool SetUInt(const std::string &name, unsigned v)
Definition: Command.h:147
virtual void ProcessTimerEvent(unsigned timer)
Method called by framework when timer event is produced.
Definition: ModuleAsync.h:162
unsigned ItemSubId() const
Definition: ModuleItem.h:123
std::string OutputName(unsigned indx=0, bool fullname=false) const
Definition: Module.cxx:181
virtual void SetModulePriority(int pri=-1)
Definition: Module.cxx:284
bool SetPortLoopLength(const std::string &name, unsigned cnt)
Definition: Module.cxx:812
unsigned CreateTimer(const std::string &name, double period_sec=-1., bool synchron=false)
Definition: Module.cxx:109
const char * GetName() const
Returns name of the object, thread safe
Definition: Object.h:295
void reset(void *buf, BufferSize_t sz)
Definition: Pointer.h:99
BufferSize_t rawsize() const
Definition: Pointer.h:148
Reference on the dabc::Port class
Definition: Port.h:195
bool IsName(const char *name) const
Returns true if object name is the same as specified one.
Definition: Reference.cxx:177
bool null() const
Returns true if reference contains nullptr.
Definition: Reference.h:151
virtual void ProcessEvent(const EventId &)
void SetIOPriority(int prior=1)
Method defines priority level for socket IO events.
Definition: SocketThread.h:125
void SetDoingInput(bool on=true)
Call method to indicate that object wants to read data from the socket.
Definition: SocketThread.h:69
bool IsDoingInput() const
Definition: SocketThread.h:111
static bool SetNonBlockSocket(int fd)
virtual bool StopTransport()
Definition: Transport.cxx:231
virtual bool StartTransport()
Methods activated by Port, when transport starts/stops.
Definition: Transport.cxx:220
virtual int ExecuteCommand(Command cmd)
Main method where commands are executed.
Definition: Transport.cxx:202
virtual long Notify(const std::string &, int)
Light-weight command interface, which can be used from worker.
Definition: Worker.h:82
void AssignAddon(WorkerAddon *addon)
Assigns addon to the worker. Should be called before the worker is assigned to the thread.
Definition: Worker.cxx:277
virtual ~NewAddon()
int fSendCnt
counter of send buffers since last timeout active
Definition: UdpTransport.h:104
double fMaxProcDist
maximal time between calls to BuildEvent method
Definition: UdpTransport.h:112
NewAddon(int fd, int nport, int mtu, bool debug, int maxloop, double reduce, double lost)
unsigned fMTU
maximal size of packet expected from TRB
Definition: UdpTransport.h:101
virtual long Notify(const std::string &, int)
Light-weight command interface.
void * fMtuBuffer
buffer used to skip packets when no normal buffer is available
Definition: UdpTransport.h:102
bool fDebug
when true, produce more debug output
Definition: UdpTransport.h:109
static int OpenUdp(const std::string &host, int nport, int rcvbuflen)
virtual void ProcessEvent(const dabc::EventId &)
bool HasBuffer() const
Definition: UdpTransport.h:128
dabc::Pointer fTgtPtr
pointer used to read data
Definition: UdpTransport.h:100
bool AssignNewBuffer(unsigned pool, NewAddon *addon=nullptr)
virtual bool ProcessBuffer(unsigned pool)
Method called by framework when at least one buffer is available in the pool handle.
void FlushBuffer(bool onclose=false)
virtual bool StartTransport()
Methods activated by Port, when transport starts/stops.
virtual bool ProcessSend(unsigned port)
Method called by framework when at least one buffer can be sent to the output port.
virtual void ProcessTimerEvent(unsigned timer)
Method called by framework when timer event is produced.
virtual int ExecuteCommand(dabc::Command cmd)
Main method where commands are executed.
NewTransport(dabc::Command, const dabc::PortRef &inpport, NewAddon *addon, double flush=1, double heartbeat=-1)
int fIdNumber
input port item id
Definition: UdpTransport.h:139
virtual bool StopTransport()
#define DOUT0(args ...)
Definition: logging.h:156
#define DOUT3(args ...)
Definition: logging.h:176
#define EOUT(args ...)
Definition: logging.h:150
#define DOUT1(args ...)
Definition: logging.h:162
#define DBOOL(arg)
Definition: logging.h:191
Event manipulation API.
Definition: api.h:23
std::string format(const char *fmt,...)
Definition: string.cxx:49
@ cmd_true
Definition: Command.h:38
Logger * lgr()
Definition: logging.cxx:483
@ mbt_HadaqTransportUnit
Definition: HadaqTypeDefs.h:25
Event structure, exchanged between DABC threads.
Definition: Thread.h:70
uint16_t GetCode() const
Definition: Thread.h:92
HADES transport unit header.
Definition: defines.h:138
uint32_t GetPaddedSize() const
Definition: defines.h:184
int fNPort
UDP port number
Definition: UdpTransport.h:43