DABC (Data Acquisition Backbone Core)  2.9.9
ModuleAsync.cxx
Go to the documentation of this file.
1 // $Id: ModuleAsync.cxx 4476 2020-04-15 14:12:38Z linev $
2 
3 /************************************************************
4  * The Data Acquisition Backbone Core (DABC) *
5  ************************************************************
6  * Copyright (C) 2009 - *
7  * GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
8  * Planckstr. 1, 64291 Darmstadt, Germany *
9  * Contact: http://dabc.gsi.de *
10  ************************************************************
11  * This software can be used under the GPL license *
12  * agreements as stated in LICENSE.txt file *
13  * which is part of the distribution. *
14  ************************************************************/
15 
16 #include "dabc/ModuleAsync.h"
17 
18 
20 {
21 }
22 
24 {
25  InputPort *inp = Input(indx);
26  return inp ? inp->QueueFull() : false;
27 }
28 
30 {
31  InputPort *inp = Input(indx);
32  if (inp) inp->SignalWhenFull();
33 }
34 
35 dabc::Buffer dabc::ModuleAsync::RecvQueueItem(unsigned indx, unsigned nbuf)
36 {
37  InputPort *inp = Input(indx);
38  if (inp) return inp->Item(nbuf);
39  return dabc::Buffer();
40 }
41 
42 dabc::Buffer dabc::ModuleAsync::PoolQueueItem(unsigned poolindex, unsigned nbuf)
43 {
44  PoolHandle* pool = Pool(poolindex);
45  if (pool) return pool->Item(nbuf);
46  return dabc::Buffer();
47 }
48 
50 {
51  switch (evid) {
52  case evntInput:
53  ((Port*) item)->ConfirmEvent();
54  /* no break */
55  case evntInputReinj:
56  if (item->GetType() == mitPool)
57  ProcessPoolEvent(item->fSubId);
58  else
59  ProcessInputEvent(item->fSubId);
60  break;
61  case evntOutput:
62  ((Port*) item)->ConfirmEvent();
63  /* no break */
64  case evntOutputReinj:
65  ProcessOutputEvent(item->fSubId);
66  break;
67  case evntTimeout:
68  ProcessTimerEvent(item->fSubId);
69  break;
70  case evntPortConnect:
71  ProcessConnectEvent(item->GetName(), true);
72  break;
73  case evntPortDisconnect:
74  case evntPortError:
75  ProcessConnectEvent(item->GetName(), false);
76  break;
77  case evntUser:
78  ProcessUserEvent(item->fSubId);
79  break;
80  default:
81  break;
82  }
83 }
84 
86 {
87  if (!dabc::Module::DoStart()) return false;
88 
89  // TODO: in case of every event generate appropriate number of events
90  for (unsigned n=0;n<NumInputs();n++)
91  ProduceInputEvent(n, fInputs[n]->NumStartEvents());
92 
93  for (unsigned n=0;n<NumOutputs();n++)
94  ProduceOutputEvent(n, fOutputs[n]->NumStartEvents());
95 
96  for (unsigned n=0;n<NumPools();n++)
97  ProducePoolEvent(n, fPools[n]->NumStartEvents());
98 
99  return true;
100 }
101 
103 {
104  InputPort* inp = Input(port);
105 
106  if ((inp!=0) && IsRunning() && inp->CanRecv())
107  FireEvent(evntInputReinj, inp->ItemId());
108 }
109 
111 {
112  InputPort* inp = Input(port);
113 
114  int cnt = inp->GetMaxLoopLength();
115 
116  while (IsRunning() && inp->CanRecv()) {
117  if (!ProcessRecv(port)) return;
118  if (cnt<0) return;
119  if (cnt-- == 0) {
120  DOUT3("Port %s performed too many receive operations - break the loop", inp->ItemName().c_str());
121  FireEvent(evntInputReinj, inp->ItemId());
122  return;
123  }
124  }
125 }
126 
128 {
129  OutputPort* out = Output(port);
130 
131  if ((out!=0) && IsRunning() && out->CanSend())
132  FireEvent(evntOutputReinj, out->ItemId());
133 }
134 
136 {
137  OutputPort* out = Output(port);
138 
139  int cnt = out->GetMaxLoopLength();
140 
141  while (IsRunning() && out->CanSend()) {
142  if (!ProcessSend(port)) return;
143  if (cnt<0) return;
144  if (cnt-- == 0) {
145  DOUT3("Port %s performed too many send operations - break the loop", out->ItemName().c_str());
146  FireEvent(evntOutputReinj, out->ItemId());
147  return;
148  }
149  }
150 }
151 
152 void dabc::ModuleAsync::ActivatePool(unsigned poolindex)
153 {
154  PoolHandle* pool = Pool(poolindex);
155 
156  if ((pool!=0) && IsRunning() && pool->CanTakeBuffer())
157  FireEvent(evntOutputReinj, pool->ItemId());
158 }
159 
160 
162 {
163  // DOUT0("Module %s process pool event %u", GetName(), pool->NumRequestedBuffer());
164 
165  PoolHandle* pool = Pool(indx);
166 
167  int cnt = pool->GetMaxLoopLength();
168 
169  while (IsRunning() && pool->CanTakeBuffer()) {
170  if (!ProcessBuffer(indx)) return;
171  if (cnt<0) return;
172  if (cnt-- == 0) {
173  DOUT3("Pool %s performed too many send operations - break the loop", pool->GetName());
174  FireEvent(evntInputReinj, pool->ItemId());
175  return;
176  }
177  }
178 }
179 
180 
Reference on memory from memory pool.
Definition: Buffer.h:135
Input port.
Definition: Port.h:244
Buffer Item(unsigned indx)
Definition: Port.h:270
bool CanRecv() const
Returns true if user can get (receive) buffer from the port.
Definition: Port.h:273
void SignalWhenFull()
This method tells the framework that a signal must be issued when the queue becomes full.
Definition: Port.h:280
bool QueueFull()
Returns true, when input queue is full and cannot get more buffers.
Definition: Port.h:268
void ActivatePool(unsigned pool)
Produces event for specified pool handle.
virtual void ProcessPoolEvent(unsigned pool)
Method called by framework when pool event is produced.
bool RecvQueueFull(unsigned port=0)
Returns true if the receive queue is full and blocks input.
Definition: ModuleAsync.cxx:23
Buffer RecvQueueItem(unsigned port=0, unsigned nbuf=0)
Returns buffer from receive queue of the input port.
Definition: ModuleAsync.cxx:35
virtual void ProcessOutputEvent(unsigned port)
Method called by framework when output event is produced.
void ActivateInput(unsigned port=0)
Produces event for specified input port. Should be used when processing was stopped due to return fals...
void SignalRecvWhenFull(unsigned port=0)
Let input signal only when queue is full.
Definition: ModuleAsync.cxx:29
virtual ~ModuleAsync()
Destructor of ModuleAsync class.
Definition: ModuleAsync.cxx:19
virtual void ProcessItemEvent(ModuleItem *item, uint16_t evid)
Generic event processing method [internal].
Definition: ModuleAsync.cxx:49
virtual bool DoStart()
Activate module [internal].
Definition: ModuleAsync.cxx:85
Buffer PoolQueueItem(unsigned pool=0, unsigned nbuf=0)
Returns buffer from queue assigned with the pool.
Definition: ModuleAsync.cxx:42
void ActivateOutput(unsigned port=0)
Produces event for specified output port.
virtual void ProcessInputEvent(unsigned port)
Method called by framework when input event is produced.
Base class for module items like ports, timers, pool handles.
Definition: ModuleItem.h:68
int GetType() const
Definition: ModuleItem.h:103
unsigned fSubId
Definition: ModuleItem.h:77
unsigned ItemId() const
Definition: ModuleItem.h:104
virtual bool DoStart()
Definition: Module.cxx:563
std::string ItemName(bool compact=true) const
Produce string, which can be used as name argument in dabc::mgr.FindItem(name) call.
Definition: Object.cxx:1076
const char * GetName() const
Returns name of the object, thread safe
Definition: Object.h:295
Output port.
Definition: Port.h:301
bool CanSend() const
Returns true if user can send a buffer via the port.
Definition: Port.h:325
Handle for pool connection.
Definition: Port.h:347
bool CanTakeBuffer() const
Definition: Port.h:388
Buffer Item(unsigned indx)
Definition: Port.h:386
Base class for input and output ports.
Definition: Port.h:47
int GetMaxLoopLength()
Return maximum number of events, which could be processed at once.
Definition: Port.cxx:173
#define DOUT3(args ...)
Definition: logging.h:176
@ mitPool
Definition: ModuleItem.h:32
@ evntInputReinj
Definition: ModuleItem.h:43
@ evntPortDisconnect
Definition: ModuleItem.h:47
@ evntUser
Definition: ModuleItem.h:52
@ evntTimeout
Definition: ModuleItem.h:45
@ evntInput
Definition: ModuleItem.h:41
@ evntPortConnect
Definition: ModuleItem.h:46
@ evntOutputReinj
Definition: ModuleItem.h:44
@ evntPortError
Definition: ModuleItem.h:48
@ evntOutput
Definition: ModuleItem.h:42