DABC (Data Acquisition Backbone Core)  2.9.9
LocalTransport.cxx
Go to the documentation of this file.
1 // $Id: LocalTransport.cxx 3781 2017-12-05 12:55:40Z linev $
2 
3 /************************************************************
4  * The Data Acquisition Backbone Core (DABC) *
5  ************************************************************
6  * Copyright (C) 2009 - *
7  * GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
8  * Planckstr. 1, 64291 Darmstadt, Germany *
9  * Contact: http://dabc.gsi.de *
10  ************************************************************
11  * This software can be used under the GPL license *
12  * agreements as stated in LICENSE.txt file *
13  * which is part of the distribution. *
14  ************************************************************/
15 
16 #include "dabc/LocalTransport.h"
17 
18 #include "dabc/MemoryPool.h"
19 
20 
21 dabc::LocalTransport::LocalTransport(unsigned capacity, bool withmutex) :
22  dabc::Object("queue"),
23  fQueue(capacity),
24  fWithMutex(withmutex),
25  fOut(),
26  fOutId(0),
27  fOutSignKind(0),
28  fSignalOut(3), // signal output after first operation
29  fInp(),
30  fInpId(0),
31  fInpSignKind(0),
32  fSignalInp(3), // signal input after any first operation
33  fConnected(0),
34  fBlockWhenUnconnected(false),
35  fBlockWhenConnected(true)
36 {
37  SetFlag(flAutoDestroy, true);
38 
39  DOUT3("Create buffers queue %p", this);
40 }
41 
// NOTE(review): the signature line of this definition (listing line 42) is not
// present in this listing. The body looks like the class destructor
// (final consistency check plus queue cleanup) - confirm against the header.
//
// Reports an error if the queue still has a connected side, and empties the
// queue via CleanupQueue() when buffers are still stored in it.
43 {
44 // DOUT3("Destroy dabc::LocalTransport %p size %u", this, fQueue.Size());
45 
// a non-zero mask means at least one side never called Disconnect()
46  if (fConnected!=0)
47  EOUT("Queue was not correctly disconnected %u", fConnected);
48 
// remaining buffers are dropped silently (the error report below is disabled)
49  if (fQueue.Size() != 0) {
50  // EOUT("!!! QUEUE WAS NOT cleaned up");
51  CleanupQueue();
52  }
53 }
54 
55 
57 {
58  // TODO: check if we need mutex
59  // TODO: check if we need to copy buffer (different pools)
60  // TODO: check if thread boundary crossed, that not many references on the buffer exists
61  // TODO: check if every buffer must be signaled
62 
63  if (buf.null()) return true;
64 
65 // DOUT0("Local transport %p send buffer %u", this, (unsigned) buf.SegmentId(0));
66 
67 
68  dabc::Buffer skipbuf;
69  dabc::WorkerRef mdl;
70  unsigned id(0);
71 
72  {
73  dabc::LockGuard lock(QueueMutex());
74 
75  // when send operation invoked in not connected state, one could reject buffer
76  // but ptobably reconnection will be started therefore try to add buffer into the queue
77  //if (fConnected != MaskConn) {
78  // DOUT1("Local transport %s ignore buffer while not fully connected mask %u inp %s out %s",
79  // GetName(), fConnected, (fInp.null() ? "---" : fInp.GetName()), (fOut.null() ? "---" : fOut.GetName()));
80  // return false;
81  // }
82 
83  if (buf.NumReferences() > 1)
84  EOUT("Buffer ref cnt %d bigger than 1, which means extra buffer instance inside thread", buf.NumReferences());
85 
86  // printf("PUSH: %s -> %s cap:%u sz:%u conn: %s flags: %s %s\n",
87  // fOut.ItemName().c_str(), fInp.ItemName().c_str(), fQueue.Capacity(), fQueue.Size(),
88  // DBOOL(fConnected == MaskConn), DBOOL(fBlockWhenConnected), DBOOL(fBlockWhenUnconnected));
89 
90  // when queue is full and transport in non-blocking mode, skip latest buffer
91  if (fQueue.Full() && !((fConnected == MaskConn) ? fBlockWhenConnected : fBlockWhenUnconnected)) {
92  fQueue.PopBuffer(skipbuf);
93  }
94 
95  if (!fQueue.PushBuffer(buf)) {
96  EOUT("Not able to push buffer into the %s -> %s queue, mutex: %s skipped: %s, queuefull: %s %u %s, connected: %s, blflags: %s %s",
97  fOut.ItemName().c_str(), fInp.ItemName().c_str(), DBOOL(QueueMutex()!=0), DBOOL(!skipbuf.null()), DBOOL(fQueue.Full()), fQueue.Size(), fQueue.Capacity(),
98  DBOOL(fConnected == MaskConn), DBOOL(fBlockWhenConnected), DBOOL(fBlockWhenUnconnected));
99  }
100 
101  if (!buf.null()) { EOUT("Something went wrong - buffer is not null here"); exit(3); }
102 
103  if (fSignalOut==2) fSignalOut = 3; // mark that output operation done
104 
105  bool makesig(false);
106 
107  // only if input port still connected, deliver events to it
108  if (fConnected & MaskInp)
109  switch (fInpSignKind) {
110  case Port::SignalNone: return true;
111 
112  case Port::SignalConfirm:
113  if (fSignalInp == 3) { makesig = true; fSignalInp = 1; }
114  break;
115 
117  if (fSignalInp == 3) { makesig = true; fSignalInp = 2; }
118  break;
119 
120  case Port::SignalEvery:
121  makesig = true;
122  break;
123  }
124 
125 // DOUT1("QUEUE %p SEND inp:%u out:%u makesig:%s", this, fSignalInp, fSignalOut, DBOOL(makesig));
126 
127  // make reference under mutex - insure that something will not change in between
128  if (makesig) {
129  mdl = fInp;
130  id = fInpId;
131  }
132  }
133 
134  skipbuf.Release();
135 
136  mdl.FireEvent(evntInput, id);
137 
138  return true;
139 }
140 
142 {
143  dabc::WorkerRef mdl;
144  unsigned id(0);
145 
146  {
147  dabc::LockGuard lock(QueueMutex());
148 
149  if (!buf.null()) { EOUT("AAAAAAAAAA"); exit(432); }
150 
151  fQueue.PopBuffer(buf);
152 
153  if (fSignalInp == 2) fSignalInp = 3;
154 
155  bool makesig(false);
156 
157  switch (fOutSignKind) {
158  case Port::SignalNone: return true;
159 
160  case Port::SignalConfirm:
161  // if operation was confirmed by sender, we could signal immediately
162  if (fSignalOut == 3) { makesig = true; fSignalOut = 1; }
163  break;
164 
166  if (fSignalOut == 3) { makesig = true; fSignalOut = 2; }
167  break;
168 
169  case Port::SignalEvery:
170  makesig = true;
171  break;
172  }
173 
174 // DOUT1("QUEUE %p RECV inp:%u out:%u makesig:%s", this, fSignalInp, fSignalOut, DBOOL(makesig));
175 
176  // signal output event only if sender did something after previous event
177  if (makesig) {
178  // make reference under mutex - insure that something will not change in between
179  mdl = fOut;
180  id = fOutId;
181  }
182  }
183 
184  mdl.FireEvent(evntOutput, id);
185 
186  return true;
187 }
188 
// NOTE(review): the signature line of this method (listing line 189) is not
// present in this listing; the method returns no value (see the bare `return;`).
// Its name cannot be determined from here - confirm against the header.
//
// Behaviour as visible below:
//  - queue full: the input-side worker (fInp/fInpId) is selected and evntInput is fired;
//  - otherwise the output-side signalling logic is run and, if it decides to signal,
//    the output-side worker (fOut/fOutId) receives evntOutput.
// The event is fired after the mutex scope is closed.
190 {
191  // Main motivation for the method - set queue in the state that it signal
192  // input port when queue is full.
193 
194  // But another use is like dummy read method.
195  // Means is there is enough space in the queue, output event will be
196  // generated simulating that input port read data from the queue
197 
198 
// mdl stays empty and evnt/id stay 0 when no signal is selected below
199  dabc::WorkerRef mdl;
200  unsigned id(0), evnt(0);
201 
202  {
203  dabc::LockGuard lock(QueueMutex());
204 
205  if (fQueue.Full()) {
206  mdl = fInp;
207  id = fInpId;
208  evnt = evntInput;
209  } else {
// treated like a completed input operation: state 2 is advanced to 3
210  if (fSignalInp == 2) fSignalInp = 3;
211 
212  bool makesig(false);
213 
214  switch (fOutSignKind) {
// no signalling configured for the output side: leave without firing anything
215  case Port::SignalNone: return;
216 
217  case Port::SignalConfirm:
218  // if operation was confirmed by sender, we could signal immediately
219  if (fSignalOut == 3) { makesig = true; fSignalOut = 1; }
220  break;
221 
// NOTE(review): listing line 222 is missing here; by elimination against the other
// labels it is presumably `case Port::SignalOperation:` - confirm against the original file.
223  if (fSignalOut == 3) { makesig = true; fSignalOut = 2; }
224  break;
225 
226  case Port::SignalEvery:
227  makesig = true;
228  break;
229  }
230 
231  // signal output event only if sender did something after previous event
232  if (makesig) {
233 
234  DOUT3("Producing output signal from DummyRecv");
235 
236  // make reference under mutex - insure that something will not change in between
237  mdl = fOut;
238  id = fOutId;
239  evnt = evntOutput;
240  }
241  }
242  }
243 
// called unconditionally, also when mdl was never assigned
// NOTE(review): presumably FireEvent on an empty reference does nothing - verify
244  mdl.FireEvent(evnt, id);
245 }
246 
247 
248 void dabc::LocalTransport::ConfirmEvent(bool fromoutputport)
249 {
250  // method only called by ports, which are configured as Port::SignalConfirm
251 
252 
253  dabc::LockGuard lock(QueueMutex());
254 
255  if (fromoutputport) {
256  // after current output event is confirmed we are normally
257  // should first send buffer in the queue and only than next recv operation on other side
258  // will produce new output event
259  // if queue full at this moment, no any send is possible and therefore
260  // we just waiting for next recv operation
261 
262  fSignalOut = fQueue.Full() ? 3 : 2;
263  } else {
264  // after current input event confirmed we are normally
265  // should first take buffer from the queue and only than next send operation
266  // will produce new input event
267  // but if queue is empty at this moment, no any recv operation possible and therefore
268  // we just waiting for next send operation
269 
270  fSignalInp = fQueue.Empty() ? 3 : 2; // we are waiting first recv operation
271  }
272 
273 // DOUT0("QUEUE %p Conf inp:%u out:%u", this, fSignalInp, fSignalOut);
274 
275 }
276 
277 void dabc::LocalTransport::Disconnect(bool isinp, bool witherr)
278 {
279  dabc::WorkerRef m1, m2;
280  unsigned id1, id2;
281 
282  bool cleanup(false);
283 
284  {
285  // we remove all references from queue itself
286  dabc::LockGuard lock(QueueMutex());
287 
288  id1 = fInpId;
289  if (isinp) {
290  m1 << fInp;
291  fInpId = 0;
292  } else {
293  m1 = fInp; // we will use reference to deliver signal
294  }
295 
296  id2 = fOutId;
297  if (!isinp) {
298  m2 << fOut;
299  fOutId = 0;
300  } else {
301  m2 = fOut;
302  }
303  fConnected = fConnected & ~(isinp ? MaskInp : MaskOut);
304  if (fConnected == 0) cleanup = true;
305  }
306 
307  DOUT3("Queue %p disconnected witherr %s isinp %s conn %u m1:%s m2:%s", this, DBOOL(witherr), DBOOL(isinp), fConnected, m1.GetName(), m2.GetName());
308 
309  if (!isinp) m1.FireEvent(witherr ? evntPortError : evntPortDisconnect, id1);
310 
311  if (isinp) m2.FireEvent(witherr ? evntPortError : evntPortDisconnect, id2);
312 
313  m1.Release();
314 
315  m2.Release();
316 
317  if (cleanup) {
318  DOUT3("Perform queue %p cleanup by disconnect", this);
319  CleanupQueue();
320  }
321 }
322 
323 
// NOTE(review): the signature line of this definition (listing line 324) is not
// present in this listing; presumably this is the CleanupQueue() called elsewhere
// in this file - confirm against the header.
//
// Delegates to the queue's own Cleanup(), passing the queue mutex to it.
325 {
326  fQueue.Cleanup(QueueMutex());
327 }
328 
329 
330 void dabc::LocalTransport::PortActivated(int itemkind, bool on)
331 {
332  dabc::WorkerRef other;
333  unsigned otherid(0);
334 
335  {
336  dabc::LockGuard lock(QueueMutex());
337 
338  if (itemkind == mitOutPort) {
339  other = fInp;
340  otherid = fInpId;
341  } else {
342  other = fOut;
343  otherid = fOutId;
344  }
345  }
346 
347  other.FireEvent(on ? evntConnStart : evntConnStop, otherid);
348 }
349 
351 {
352  if (port1ref.null() && port2ref.null()) return cmd_true;
353 
354  PortRef port_out = port1ref;
355  PortRef port_inp = port2ref;
356 
357  if (port_out.null() || port_inp.null()) return cmd_false;
358 
359  std::string blocking = port_out.Cfg("blocking", cmd).AsStr();
360  if (blocking.empty()) blocking = port_inp.Cfg("blocking").AsStr("connected");
361 
362  DOUT2("Connect ports %s -> %s", port_out.ItemName().c_str(), port_inp.ItemName().c_str());
363 
364  ModuleRef m1 = port_out.GetModule();
365  ModuleRef m2 = port_inp.GetModule();
366 
367  LocalTransportRef q_out = port_out()->fQueue;
368  LocalTransportRef q_inp = port_inp()->fQueue;
369 
370  if (!q_out.null() && !q_inp.null()) {
371  EOUT("Both ports have existing queues - should not happen");
372  q_out.Release();
373  q_inp.Release();
374  }
375 
376  bool withmutex(true), assign_out(true), assign_inp(true);
377 
378  if (m1.IsSameThread(m2)) {
379  DOUT3("!!!! Can create queue without mutex !!!");
380  withmutex = false;
381  }
382 
383  unsigned queuesize = port_out.QueueCapacity() > port_inp.QueueCapacity() ?
384  port_out.QueueCapacity() : port_inp.QueueCapacity();
385 
387 
388  if (!q_out.null()) {
389  q << q_out;
390  if (withmutex) q()->EnableMutex();
391  assign_out = false;
392 
393  DOUT3("REUSE queue of output port %s", port_out.ItemName().c_str());
394  } else
395  if (!q_inp.null()) {
396  q << q_inp;
397  if (withmutex) q()->EnableMutex();
398  assign_inp = false;
399  DOUT3("REUSE queue of input port %s", port_inp.ItemName().c_str());
400  } else {
401  q = new LocalTransport(queuesize, withmutex);
402  }
403 
404  if (blocking == "disconnected") {
405  q()->fBlockWhenUnconnected = true;
406  q()->fBlockWhenConnected = false;
407  } else
408  if (blocking == "never") {
409  DOUT0("Never block output port %s", port_out.ItemName().c_str());
410  q()->fBlockWhenUnconnected = false;
411  q()->fBlockWhenConnected = false;
412  } else
413  if (blocking == "always") {
414  q()->fBlockWhenUnconnected = true;
415  q()->fBlockWhenConnected = true;
416  } else {
417  q()->fBlockWhenUnconnected = false;
418  q()->fBlockWhenConnected = true;
419  }
420 
421  if (assign_inp) {
422  q()->fInp = m2;
423  q()->fInpId = port_inp.ItemId();
424  q()->fInpSignKind = port_inp.GetSignalingKind();
425  }
426 
427  if (assign_out) {
428  q()->fOut = m1;
429  q()->fOutId = port_out.ItemId();
430  q()->fOutSignKind = port_out.GetSignalingKind();
431  }
432 
433  // first of all, we must connect input port
434  if (assign_inp) {
435  dabc::Command cmd2("SetQueue");
436  cmd2.SetStr("Port", port_inp.GetName());
437  cmd2.SetRef("Queue", q);
438  if (!m2.Execute(cmd2)) return cmd_false;
439  }
440 
441  // than assign output port
442  if (assign_out) {
443  dabc::Command cmd1("SetQueue");
444  cmd1.SetStr("Port", port_out.GetName());
445  cmd1.SetRef("Queue", q);
446  if (!m1.Execute(cmd1)) return cmd_false;
447  }
448 
449  bool m1running = m1.IsRunning();
450  bool m2running = m2.IsRunning();
451 
452  // inform each module that it's port is connected
453  m1.FireEvent(evntPortConnect, port_out.ItemId());
454  m2.FireEvent(evntPortConnect, port_inp.ItemId());
455 
456  // inform modules if another is already running
457  if (m1running && !m2running)
458  m2.FireEvent(evntConnStart, port_inp.ItemId());
459 
460  if (!m1running && m2running)
461  m1.FireEvent(evntConnStart, port_out.ItemId());
462 
463  return cmd_true;
464 }
Reference on memory from memory pool.
Definition: Buffer.h:135
Represents command with its arguments.
Definition: Command.h:99
bool SetStr(const std::string &name, const char *value)
Definition: Command.h:134
bool SetRef(const std::string &name, Reference ref)
Set reference to the command.
Definition: Command.cxx:168
Reference on the dabc::LocalTransport
Transport between two ports on the same node
void Disconnect(bool isinp, bool witherr=false)
bool Recv(Buffer &buf)
static int ConnectPorts(Reference port1ref, Reference port2ref, Command cmd=nullptr)
void ConfirmEvent(bool isoutput)
LocalTransport(unsigned capacity, bool withmutex)
void PortActivated(int itemkind, bool on)
bool Send(Buffer &buf)
Lock guard for posix mutex.
Definition: threads.h:127
WorkerRef GetModule() const
Definition: ModuleItem.cxx:66
unsigned ItemId() const
Definition: ModuleItem.h:121
Reference on dabc::Module class
Definition: Module.h:275
bool IsRunning() const
Definition: Module.h:285
Base class for most of the DABC classes.
Definition: Object.h:116
void SetFlag(unsigned fl, bool on=true)
Change value of selected flag, not thread safe
Definition: Object.h:187
@ flAutoDestroy
object will be automatically destroyed when no references exist, normally set in constructor,...
Definition: Object.h:167
Reference on the dabc::Port class
Definition: Port.h:195
int GetSignalingKind()
Returns signaling method configured for the port.
Definition: Port.cxx:223
unsigned QueueCapacity() const
Returns queue capacity of the port.
Definition: Port.h:205
@ SignalEvery
Definition: Port.h:63
@ SignalOperation
Definition: Port.h:62
@ SignalNone
Definition: Port.h:59
@ SignalConfirm
Definition: Port.h:60
std::string AsStr(const std::string &dflt="") const
Definition: Record.cxx:749
Reference on the arbitrary object
Definition: Reference.h:73
void Release()
Releases reference on the object.
Definition: Reference.cxx:138
const char * GetName() const
Return name of referenced object, if object not assigned, returns "---".
Definition: Reference.cxx:167
unsigned NumReferences() const
Returns number of references on the object.
Definition: Reference.cxx:112
bool null() const
Returns true if reference contains nullptr.
Definition: Reference.h:151
std::string ItemName(bool compact=true) const
Produce string, which can be used as name argument in dabc::mgr.FindItem(name) call.
Definition: Reference.cxx:241
Reference on dabc::Worker
Definition: Worker.h:466
RecordField Cfg(const std::string &name, Command cmd=nullptr) const
Returns configuration record of specified name.
Definition: Worker.h:482
bool FireEvent(uint16_t evid, uint32_t arg)
Definition: Worker.h:508
bool Execute(Command cmd, double tmout=-1.)
Definition: Worker.cxx:1147
bool IsSameThread(const WorkerRef &ref)
Returns true if two workers share same thread.
Definition: Worker.cxx:1170
#define DOUT2(args ...)
Definition: logging.h:170
#define DOUT0(args ...)
Definition: logging.h:156
#define DOUT3(args ...)
Definition: logging.h:176
#define EOUT(args ...)
Definition: logging.h:150
#define DBOOL(arg)
Definition: logging.h:191
Event manipulation API.
Definition: api.h:23
@ mitOutPort
Definition: ModuleItem.h:31
@ evntConnStop
Definition: ModuleItem.h:50
@ evntPortDisconnect
Definition: ModuleItem.h:47
@ evntInput
Definition: ModuleItem.h:41
@ evntPortConnect
Definition: ModuleItem.h:46
@ evntConnStart
Definition: ModuleItem.h:49
@ evntPortError
Definition: ModuleItem.h:48
@ evntOutput
Definition: ModuleItem.h:42
@ cmd_false
Definition: Command.h:37
@ cmd_true
Definition: Command.h:38