DABC (Data Acquisition Backbone Core)  2.9.9
NetworkTransport.cxx
Go to the documentation of this file.
1 // $Id: NetworkTransport.cxx 4476 2020-04-15 14:12:38Z linev $
2 
3 /************************************************************
4  * The Data Acquisition Backbone Core (DABC) *
5  ************************************************************
6  * Copyright (C) 2009 - *
7  * GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
8  * Planckstr. 1, 64291 Darmstadt, Germany *
9  * Contact: http://dabc.gsi.de *
10  ************************************************************
11  * This software can be used under the GPL license *
12  * agreements as stated in LICENSE.txt file *
13  * which is part of the distribution. *
14  ************************************************************/
15 
16 #include "dabc/NetworkTransport.h"
17 
18 #include "dabc/Manager.h"
19 #include "dabc/Pointer.h"
20 
21 dabc::NetworkTransport::NetworkTransport(dabc::Command cmd, const PortRef& inpport, const PortRef& outport, bool useackn, WorkerAddon* addon) :
22  dabc::Transport(cmd, inpport, outport),
23  fNet(0),
24  fTransportId(0),
25  fUseAckn(useackn),
26  fInputQueueCapacity(0),
27  fOutputQueueCapacity(0),
28  fNumRecs(0),
29  fRecsCounter(0),
30  fRecs(0),
31  fOutputQueueSize(0),
32  fAcknAllowedOper(0),
33  fAcknSendQueue(),
34  fAcknSendBufBusy(false),
35  fInputQueueSize(0),
36  fFirstAckn(true),
37  fAcknReadyCounter(0),
38  fFullHeaderSize(0),
39  fInlineDataSize(0),
40  fStartBufReq(false)
41 {
42  AssignAddon(addon);
43 
44  fNet = (NetworkInetrface*) fAddon.Notify("GetNetworkTransportInetrface");
45 
46  if (fNet==0) {
47  EOUT("Cannot obtain network addon for the NetworkTransport");
48  exit(345);
49  }
50 
51  if (IsInputTransport())
53 
54  if (IsOutputTransport())
56 
57  DOUT2("Create new net transport inp %s out %s ackn %s", DBOOL(IsInputTransport()), DBOOL(IsOutputTransport()), DBOOL(fUseAckn));
58 
59  if (fUseAckn) {
62  // TODO: do we need here increment output queue size???
63  }
64 
65 
66  if (inpport.QueueCapacity() > 0) {
67 
68  if (NumPools()==0) {
69  EOUT("No memory pool specified to provided buffers for network transport");
70  exit(444);
71  }
72 
73  // use time to request buffer again
74  if (!IsAutoPool()) CreateTimer("SysTimer");
75  }
76 
77 
78  fInputQueueSize = 0;
79  fFirstAckn = true;
81 
82  fInlineDataSize = 32; // TODO: configure via port properties
84 
86  fRecsCounter = 0;
87  fNumUsedRecs = 0;
88  if (fNumRecs>0) {
89  fRecs = new NetIORec[fNumRecs];
90  for (uint32_t n=0;n<fNumRecs;n++) {
91  fRecs[n].used = false;
92  fRecs[n].kind = 0;
93  fRecs[n].extras = 0;
94  // fRecs[n].buf.Release();
95  fRecs[n].header = 0;
96  fRecs[n].inlinebuf = 0;
97  }
98  }
99 
100  if (IsInputTransport() && fUseAckn)
102 
103  if (IsInputTransport() && (NumPools()==0)) {
104  EOUT("Pool required for input transport or for the acknowledge queue");
105  return;
106  }
107 
110 }
111 
113 {
114  DOUT2("#### ~NetworkTransport fRecs %p", fRecs);
115 }
116 
118 {
120 
121  DOUT3("NetworkTransport::TransportCleanup");
122 
123  // at this moment net should be destroyed by the addon cleanup
124  fNet = 0;
125 
126  fAcknSendQueue.Reset();
127 
128  if (fNumRecs>0) {
129  for (uint32_t n=0;n<fNumRecs;n++) {
130  fRecs[n].used = false;
131  fRecs[n].buf.Release();
132  }
133 
134  delete[] fRecs;
135  }
136  fRecs = 0;
137  fNumRecs = 0;
138  fNumUsedRecs = 0;
139 }
140 
141 
142 void dabc::NetworkTransport::SetRecHeader(uint32_t recid, void* header)
143 {
144  fRecs[recid].header = header;
145  if (fInlineDataSize > 0)
146  fRecs[recid].inlinebuf = (char*) header + sizeof(NetworkHeader);
147 }
148 
149 uint32_t dabc::NetworkTransport::TakeRec(Buffer& buf, uint32_t kind, uint32_t extras)
150 {
151  if (fNumRecs == 0) return 0;
152 
153  uint32_t cnt = fNumRecs;
154  uint32_t recid = 0;
155  while (cnt-->0) {
156  recid = fRecsCounter;
157  fRecsCounter = (fRecsCounter+1) % fNumRecs;
158  if (!fRecs[recid].used) {
159  fRecs[recid].used = true;
160  fRecs[recid].kind = kind;
161  fRecs[recid].buf << buf;
162  fRecs[recid].extras = extras;
163  fNumUsedRecs++;
164  return recid;
165  }
166  }
167 
168  EOUT("Cannot allocate NetIORec. Halt");
169  EOUT("SendQueue %u RecvQueue %u NumRecs %u used %u", fOutputQueueSize, fInputQueueSize, fNumRecs, fNumUsedRecs);
170  return fNumRecs;
171 }
172 
174 {
175  if (recid<fNumRecs) {
176  if (!fRecs[recid].buf.null()) EOUT("Buffer is not empty when record is released !!!!");
177  fRecs[recid].used = false;
178  fNumUsedRecs--;
179  } else {
180  EOUT("Error recid %u", recid);
181  }
182 }
183 
184 
186 {
187  // Returns 0 - failure, 1 - only the header should be sent, 2 - header and buffer should be sent
188 
189  NetworkHeader* hdr = (NetworkHeader*) fRecs[recid].header;
190 
191  if (hdr==0) return 0;
192 
193  hdr->chkword = 123;
194  hdr->kind = fRecs[recid].kind;
195  if (fRecs[recid].buf.null()) {
196  hdr->size = 0;
197  hdr->typid = 0;
198 
199  if (hdr->kind & netot_HdrSend) {
200  hdr->typid = mbt_AcknCounter;
201  hdr->size = fRecs[recid].extras;
202  }
203 
204  return 1;
205  }
206 
207  hdr->typid = fRecs[recid].buf.GetTypeId();
208  hdr->size = fRecs[recid].buf.GetTotalSize();
209 
210  // copy content of the buffer in the inline buffer
211  if ((hdr->size>0) && fRecs[recid].inlinebuf && (hdr->size<=fInlineDataSize)) {
212  fRecs[recid].buf.CopyTo(fRecs[recid].inlinebuf, hdr->size);
213  return 1;
214  }
215 
216  return 2;
217 }
218 
219 void dabc::NetworkTransport::FillRecvQueue(Buffer* freebuf, bool onlyfreebuf)
220 {
221  // method used to keep receive queue filled
222  // Sometime one need to reinject buffer, which was received as "fast",
223  // therefore its processing finished in transport thread and we can
224  // use it again in receive queue.
225 
226 // DOUT0("FillRecvQueue isinp:%s port:%p pool:%p", DBOOL(IsInputTransport()), Output(), Pool());
227 
228  if (isTransportError()) return;
229 
230  unsigned newitems(0), numcansubmit(0);
231 
232  if (IsInputTransport()) {
233  if (NumPools()==0) {
234  EOUT("No memory pool in input transport");
235  CloseTransport(true);
236  return;
237  }
238  if (NumOutputs()==0) {
239  EOUT("No output port for input transport");
240  CloseTransport(true);
241  return;
242  }
243  numcansubmit = NumCanSend();
244 
245 // DOUT0("FillRecvQueue submitlimit %u acutalsize %u", numcansubmit, fInputQueueSize));
246 
247  } else {
248  // if no input is specified, one only need queue for ackn
249  numcansubmit = fInputQueueCapacity;
250  }
251 
252  while (fInputQueueSize < numcansubmit) {
253  Buffer buf;
254 
255  if (IsInputTransport()) {
256  // only with input transport we need buffers
257  if (freebuf) buf << *freebuf;
258 
259  if (buf.null()) buf = TakeBuffer();
260 
261  if (buf.null()) {
262  if (IsAutoPool()) ShootTimer("SysTimer", 0.001);
263  break;
264  }
265  }
266 
267  uint32_t recvrec = TakeRec(buf, netot_Recv);
268  fInputQueueSize++;
269  newitems++;
270  fNet->SubmitRecv(recvrec);
271 
272  // if we want to reuse only free buffer, just break and do not try to submit any new requests
273  if (freebuf && onlyfreebuf) break;
274  }
275 
276  // no need to release additional buffer if it was not used, it will be done in upper method
277  // if (freebuf) freebuf->Release();
278 
279  CheckAcknReadyCounter(newitems);
280 }
281 
283 {
284  // check if count of newly submitted recv buffers exceed limit
285  // after which one should send acknowledge packet to receiver
286 
287  DOUT5("CheckAcknReadyCounter ackn:%s pool:%s inp:%s", DBOOL(fUseAckn), PoolName().c_str(), DBOOL(IsInputTransport()));
288 
289  if (!fUseAckn || (NumPools()==0) || !IsInputTransport()) return false;
290 
291  fAcknReadyCounter+=newitems;
292 
293  if (fAcknSendBufBusy) return false;
294 
295  unsigned ackn_limit = fFirstAckn ? fInputQueueCapacity : fInputQueueCapacity/2;
296  if (ackn_limit<1) ackn_limit = 1;
297 
298  DOUT5("fAcknReadyCounter = %d limit = %d", fAcknReadyCounter, ackn_limit);
299 
300  // check if we need to send ackn packet
301  if (fAcknReadyCounter<ackn_limit) return false;
302 
303  fAcknSendBufBusy = true;
304 
305  fAcknReadyCounter -= ackn_limit;
306 
307  fFirstAckn = false;
308 
309  dabc::Buffer buf;
310 
311  uint32_t recid = TakeRec(buf, netot_HdrSend, ackn_limit);
312 
313  fNet->SubmitSend(recid);
314 
315  return true;
316 }
317 
319 {
320  while ((fAcknAllowedOper>0) && (fAcknSendQueue.Size()>0)) {
321  uint32_t recid = fAcknSendQueue.Pop();
322  fAcknAllowedOper--;
323  fNet->SubmitSend(recid);
324  }
325 }
326 
328 {
329  if (recid>=fNumRecs) { EOUT("Recid fail %u %u", recid, fNumRecs); return; }
330 
331  bool checkackn(false);
332 
333  fRecs[recid].buf.Release();
334 
335  if (fRecs[recid].kind & netot_Send) {
336  // normal send
337  fOutputQueueSize--;
338 
339  if (!CanRecv()) {
340  EOUT("One cannot recieve buffer!!!!");
341  exit(333);
342  }
343 
344  Recv().Release();
345 
346  } else
347  if (fRecs[recid].kind & netot_HdrSend) {
348  fAcknSendBufBusy = false;
349  checkackn = true;
350  } else {
351  EOUT("Wrong kind=%u in ProcessSendCompl", fRecs[recid].kind);
352  }
353 
354  ReleaseRec(recid);
355 
356  // we releasing buffer out of locked area, while it can make indirect call
357  // back to transport instance via memory pool event handling
358 
359  if (checkackn) CheckAcknReadyCounter(0);
360 }
361 
363 {
364  // method return true, if fast packet was received and transport
365  // should try to speed up its threads and probably, for some time switch
366  // in polling mode
367 
368  if (recid>=fNumRecs) {
369  EOUT("Recid fail tr %p %u %u", this, recid, fNumRecs);
370 // return;
371  exit(107);
372  }
373 
374  NetworkHeader* hdr = (NetworkHeader*) fRecs[recid].header;
375 
376  if (hdr->chkword != 123) {
377  EOUT("Error in network header magic number");
378  ReleaseRec(recid);
379  CloseTransport(true);
380  return;
381  }
382 
383  // check special case when only the network header is sent and nothing else
384  // for the moment this only works with AcknCounter, later it can be extended for other applications
385  if (hdr->kind & netot_HdrSend) {
386  uint32_t extras = hdr->size;
387 
388  fAcknAllowedOper += extras;
389  SubmitAllowedSendOperations();
390 
391  fInputQueueSize--;
392 
393  Buffer buf;
394 
395  buf << fRecs[recid].buf;
396 
397  ReleaseRec(recid);
398 
399  FillRecvQueue(&buf);
400 
401  } else {
402 
403  fInputQueueSize--;
404 
405  Buffer buf;
406 
407  buf << fRecs[recid].buf;
408 
409  buf.SetTotalSize(hdr->size);
410  buf.SetTypeId(hdr->typid);
411 
412  if ((hdr->size>0) && (hdr->size <= fInlineDataSize))
413  Pointer(buf).copyfrom(fRecs[recid].inlinebuf, hdr->size);
414 
415  ReleaseRec(recid);
416 
417  Send(buf);
418  }
419 
420 }
421 
422 
424 {
426 
427  FillRecvQueue();
428 }
429 
430 
432 {
433  unsigned numbufs = NumCanRecv(port);
434 
435  while (fOutputQueueSize < numbufs) {
436  // we create copy of the buffer, which will be used in the transport
437  // original reference will remain in the port queue until send operation is completed
438  Buffer buf = RecvQueueItem(port, fOutputQueueSize);
439 
440  uint32_t recid = TakeRec(buf, netot_Send);
441  if (recid==fNumRecs) {
442  EOUT("No available recs!!!");
443  exit(543);
444  }
445 
446  fOutputQueueSize++;
447 
448  // from this moment buf should be used from record directly
449  if (fAcknSendQueue.Capacity() > 0) {
450  fAcknSendQueue.Push(recid);
451  } else {
452  fNet->SubmitSend(recid);
453  }
454  }
455 
456  SubmitAllowedSendOperations();
457 }
458 
460 {
461  // when consumer take buffers from the queue, one can try to submit more recv operations
462  FillRecvQueue();
463 }
464 
466 {
467  FillRecvQueue();
468 }
469 
471 {
472  FillRecvQueue();
473 }
474 
475 
477 {
479  FillRecvQueue();
480  return true;
481 }
482 
484 {
486 }
487 
488 void dabc::NetworkTransport::GetRequiredQueuesSizes(const PortRef& port, unsigned& input_size, unsigned& output_size)
489 {
490  PortRef inpport, outport;
491 
492  if (port.IsInput()) {
493  inpport = port;
494  outport = inpport.GetBindPort();
495  } else {
496  outport = port;
497  inpport = outport.GetBindPort();
498  }
499 
500  input_size = inpport.QueueCapacity() + AcknoledgeQueueLength;
501  output_size = outport.QueueCapacity() + AcknoledgeQueueLength;
502 }
503 
504 
505 bool dabc::NetworkTransport::Make(const ConnectionRequest& req, WorkerAddon* addon, const std::string &devthrdname)
506 {
507  PortRef port = req.GetPort();
508 
509  if (req.null() || port.null()) {
510  EOUT("Port or request disappear while connection is ready");
511  delete addon;
512  return false;
513  }
514 
515  PortRef inpport, outport;
516 
517  if (port.IsInput()) {
518  inpport << port;
519  outport = inpport.GetBindPort();
520  } else {
521  outport << port;
522  inpport = outport.GetBindPort();
523  }
524 
525  std::string newthrdname = req.GetConnThread();
526  if (newthrdname.empty()) newthrdname = devthrdname;
527 
529  cmd.SetPoolName(req.GetPoolName());
530 
531  TransportRef tr = new NetworkTransport(cmd, inpport, outport, req.GetUseAckn(), addon);
532 
533  if (tr.MakeThreadForWorker(newthrdname)) {
534  tr.ConnectPoolHandles();
535  if (!inpport.null())
537  if (!outport.null())
539 
540  DOUT0("!!!!!! NETWORK TRANSPORT IS CREATED !!!!");
541  return true;
542 
543  }
544 
545  EOUT("No thread for transport");
546  tr.Destroy();
547  return false;
548 }
Reference on memory from memory pool.
Definition: Buffer.h:135
void SetTotalSize(BufferSize_t len)
Set total length of the buffer to specified value Size cannot be bigger than original size of the buf...
Definition: Buffer.cxx:99
void SetTypeId(unsigned tid)
Definition: Buffer.h:151
void SetPoolName(const std::string &name)
Definition: Manager.h:193
Represents command with its arguments.
Definition: Command.h:99
Connection request.
bool GetUseAckn() const
Use of acknowledge in protocol.
std::string GetPoolName() const
std::string GetConnThread() const
Thread name for transport.
Reference GetPort() const
static int ConnectPorts(Reference port1ref, Reference port2ref, Command cmd=nullptr)
bool ConnectPoolHandles()
Method called by manager to establish connection to pools TODO: while used from devices,...
Definition: Module.cxx:999
unsigned NumPools() const
Definition: Module.h:156
virtual void OnThreadAssigned()
Definition: Module.cxx:79
bool IsAutoPool(unsigned indx=0) const
Returns true when handle automatically delivers buffers via the connection.
Definition: Module.h:162
unsigned CreateTimer(const std::string &name, double period_sec=-1., bool synchron=false)
Definition: Module.cxx:109
Network interface.
virtual void AllocateNet(unsigned fulloutputqueue, unsigned fullinputqueue)=0
Network transport.
void SetRecHeader(uint32_t recid, void *header)
uint32_t TakeRec(Buffer &buf, uint32_t kind=0, uint32_t extras=0)
NetworkInetrface * fNet
virtual void ProcessPoolEvent(unsigned pool)
Method called by framework when pool event is produced.
void ReleaseRec(uint32_t recid)
virtual void ProcessTimerEvent(unsigned timer)
Method called by framework when timer event is produced.
virtual void TransportCleanup()
virtual bool StartTransport()
Methods activated by Port, when transport starts/stops.
void FillRecvQueue(Buffer *freebuf=0, bool onlyfreebuf=false)
virtual void ProcessInputEvent(unsigned port)
Method called by framework when input event is produced.
void ProcessRecvCompl(uint32_t recid)
virtual void OnThreadAssigned()
bool CheckAcknReadyCounter(unsigned newitems=0)
void ProcessSendCompl(uint32_t recid)
NetIORecsQueue fAcknSendQueue
NetworkTransport(dabc::Command cmd, const PortRef &inpport, const PortRef &outport, bool useackn, WorkerAddon *addon)
int PackHeader(uint32_t recid)
static bool Make(const ConnectionRequest &req, WorkerAddon *addon, const std::string &devthrdname="")
virtual void ProcessOutputEvent(unsigned port)
Method called by framework when output event is produced.
static void GetRequiredQueuesSizes(const PortRef &port, unsigned &input_size, unsigned &output_size)
Manipulator with dabc::Buffer class.
Definition: Pointer.h:34
BufferSize_t copyfrom(const Pointer &src, BufferSize_t sz=0)
Returns actual size copied.
Definition: Pointer.cxx:54
Reference on the dabc::Port class
Definition: Port.h:195
bool IsInput() const
Returns true if it is input port.
Definition: Port.h:199
PortRef GetBindPort()
Return reference on the bind port.
Definition: Port.cxx:242
unsigned QueueCapacity() const
Returns queue capacity of the port.
Definition: Port.h:205
void Allocate(unsigned capacity)
Definition: Queue.h:115
bool null() const
Returns true if reference contains nullptr.
Definition: Reference.h:151
void Destroy()
Release reference and starts destroyment of referenced object.
Definition: Reference.cxx:148
Reference on dabc::Transport class
Definition: Transport.h:109
PortRef InputPort()
Definition: Transport.h:114
PortRef OutputPort()
Definition: Transport.h:112
Base class for transport implementations.
Definition: Transport.h:37
bool IsInputTransport() const
Definition: Transport.h:94
virtual bool StopTransport()
Definition: Transport.cxx:231
virtual bool StartTransport()
Methods activated by Port, when transport starts/stops.
Definition: Transport.cxx:220
bool IsOutputTransport() const
Definition: Transport.h:95
virtual void TransportCleanup()
Definition: Transport.h:77
long Notify(const std::string &cmd, int arg=0)
Definition: Worker.h:104
Generic addon for dabc::Worker.
Definition: Worker.h:49
bool MakeThreadForWorker(const std::string &thrdname="")
Definition: Worker.h:498
void AssignAddon(WorkerAddon *addon)
Assigns addon to the worker Should be called before worker assigned to the thread.
Definition: Worker.cxx:277
WorkerAddonRef fAddon
extension of worker for some special events
Definition: Worker.h:152
#define DOUT2(args ...)
Definition: logging.h:170
#define DOUT0(args ...)
Definition: logging.h:156
#define DOUT5(args ...)
Definition: logging.h:188
#define DOUT3(args ...)
Definition: logging.h:176
#define EOUT(args ...)
Definition: logging.h:150
#define DBOOL(arg)
Definition: logging.h:191
Event manipulation API.
Definition: api.h:23
const unsigned AcknoledgeQueueLength
Definition: Transport.cxx:23
@ mbt_AcknCounter
Definition: Buffer.h:43