DABC (Data Acquisition Backbone Core)  2.9.9
ModuleSync.cxx
Go to the documentation of this file.
1 // $Id: ModuleSync.cxx 4476 2020-04-15 14:12:38Z linev $
2 
3 /************************************************************
4  * The Data Acquisition Backbone Core (DABC) *
5  ************************************************************
6  * Copyright (C) 2009 - *
7  * GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
8  * Planckstr. 1, 64291 Darmstadt, Germany *
9  * Contact: http://dabc.gsi.de *
10  ************************************************************
11  * This software can be used under the GPL license *
12  * agreements as stated in LICENSE.txt file *
13  * which is part of the distribution. *
14  ************************************************************/
15 
16 #include "dabc/ModuleSync.h"
17 
18 dabc::ModuleSync::ModuleSync(const std::string &name, Command cmd) :
19  Module(name, cmd),
20  fTmoutExcept(false),
21  fDisconnectExcept(false),
22  fSyncCommands(false),
23  fNewCommands(0),
24  fWaitItem(0),
25  fWaitId(0),
26  fWaitRes(false),
27  fInsideMainLoop(false)
28 {
29 }
30 
32 {
33  // if module was not yet halted, make sure that mainloop of the module is leaved
34 
35  if (IsRunning()) {
36  EOUT("Problem in sync module %s destructor - cannot leave normally main loop, must crash :-O", GetName());
37  }
38 
39  if (fNewCommands!=0) {
40  EOUT("Some commands remain event in module %s destructor - BAD", GetName());
41  delete fNewCommands;
42  fNewCommands = 0;
43  }
44 }
45 
46 bool dabc::ModuleSync::WaitConnect(const std::string &name, double timeout)
47 {
48  PortRef port = FindPort(name);
49 
50  if (port.null()) return false;
51 
52  uint16_t evid(evntModuleNone);
53 
54  do {
55  // we are using direct pointer, while method can only be used from inside thread
56  if (port()->IsConnected()) return true;
57 
58  if (evid == evntPortConnect)
59  throw Exception(ex_Connect, "Get connection event when port is not connected", port.ItemName());
60 
61  } while (WaitItemEvent(timeout, port(), &evid));
62 
63  return false;
64 
65 }
66 
67 bool dabc::ModuleSync::Send(unsigned indx, Buffer &buf, double timeout)
68 {
69  OutputPort* port = Output(indx);
70 
71  if ((port==0) || buf.null()) return false;
72 
73  uint16_t evid(evntModuleNone);
74 
75  // one need Keeper to release buffer in case of exceptions
76  do {
77 
78  if ((evid == evntPortDisconnect) || (evid==evntPortError))
79  if (IsDisconnectExcept())
80  throw Exception(ex_Disconnect, "Port disconnected when sending buffer", port->ItemName());
81 
82  if (port->CanSend())
83  return port->Send(buf);
84 
85  } while (WaitItemEvent(timeout, port, &evid));
86 
87  return false;
88 }
89 
90 
91 dabc::Buffer dabc::ModuleSync::Recv(unsigned indx, double timeout)
92 {
93  InputPort* port = Input(indx);
94 
95  if (port==0) return Buffer();
96 
97  uint16_t evid(evntModuleNone);
98 
99  do {
100  if ((evid == evntPortDisconnect) || (evid==evntPortError))
101  if (IsDisconnectExcept())
102  throw Exception(ex_Disconnect, "Port disconnected when receiving buffer", port->ItemName());
103 
104  if (port->CanRecv())
105  return port->Recv();
106 
107  } while (WaitItemEvent(timeout, port, &evid));
108 
109  return Buffer();
110 }
111 
112 dabc::Buffer dabc::ModuleSync::TakeBuffer(unsigned poolindx, double timeout)
113 {
114  PoolHandle* handle = Pool(poolindx);
115  if (handle==0) return (poolindx==0) ? TakeDfltBuffer() : Buffer();
116 
117  do {
118  if (handle->CanTakeBuffer())
119  return handle->TakeBuffer();
120  } while (WaitItemEvent(timeout, handle));
121 
122  return Buffer();
123 }
124 
125 
126 dabc::Buffer dabc::ModuleSync::RecvFromAny(unsigned* indx, double timeout)
127 {
128  uint16_t evid(evntModuleNone);
129  ModuleItem* resitem(0);
130  unsigned shift(0);
131 
132  do {
133  if ((evid == evntPortDisconnect) || (evid==evntPortError))
134  if (IsDisconnectExcept())
135  throw Exception(ex_Disconnect, "Port disconnected when receiving buffer", resitem ? resitem->ItemName() : "");
136 
137  if (NumInputs() == 0) return Buffer();
138 
139  if (resitem && (evid == evntInput))
140  shift = resitem->fSubId;
141  else
142  shift = 0;
143 
144  for (unsigned n=0; n < NumInputs(); n++) {
145  InputPort* p = fInputs[(n+shift) % NumInputs()];
146  if (p->CanRecv()) {
147  if (indx) *indx = p->fSubId;
148  return p->Recv();
149  }
150  }
151  } while (WaitItemEvent(timeout, 0, &evid, &resitem));
152 
153  return Buffer();
154 }
155 
156 
157 bool dabc::ModuleSync::WaitInput(unsigned indx, unsigned minqueuesize, double timeout)
158 {
159  InputPort* port = Input(indx);
160 
161  if (port==0) return false;
162 
163  uint16_t evid(evntModuleNone);
164 
165  do {
166  if ((evid == evntPortDisconnect) || (evid==evntPortError))
167  if (IsDisconnectExcept())
168  throw Exception(ex_Disconnect, "Port disconnected when waiting for input buffers", port->ItemName());
169 
170  if (port->NumCanRecv() >= minqueuesize) return true;
171  } while (WaitItemEvent(timeout, port, &evid));
172 
173  return false;
174 }
175 
177 {
178  AsyncProcessCommands();
179 
180  if (!SingleLoop(timeout))
181  throw dabc::Exception(ex_Stop, "Module stopped", ItemName());
182 
183  return true;
184 }
185 
186 uint16_t dabc::ModuleSync::WaitEvent(double timeout)
187 {
188  uint16_t evid = 0;
189 
190  if (!WaitItemEvent(timeout, 0, &evid)) evid = 0;
191 
192  return evid;
193 }
194 
196 {
197  if (cmd.IsName("StartModule")) {
198  // module already running
199  if (IsRunning()) return cmd_true;
200 
201  if (fInsideMainLoop) return cmd_bool(DoStart());
202 
203  return cmd_bool(ActivateMainLoop());
204  } else
205  if (cmd.IsName("StopModule")) {
206  if (!IsRunning()) return cmd_true;
207 
208  if (!fInsideMainLoop) {
209  EOUT("Something wrong, module %s runs without main loop ????", GetName());
210  return cmd_false;
211  }
212 
213  AsyncProcessCommands();
214 
215  return cmd_bool(DoStop());
216  }
217 
218  int cmd_res = Module::PreviewCommand(cmd);
219 
220  // asynchronous execution possible only in running mode,
221  // when module is stopped, commands will be executed immediately
222  if (!fSyncCommands && (cmd_res==cmd_ignore) && IsRunning()) {
223 
224  if (fNewCommands==0)
225  fNewCommands = new CommandsQueue(CommandsQueue::kindSubmit);
226 
227  fNewCommands->Push(cmd);
228  cmd_res = cmd_postponed;
229  }
230 
231  return cmd_res;
232 }
233 
235 {
236  Stop();
237 
238  DOUT1("Stop module %s until restart", GetName());
239 
240  double tmout = -1.;
241 
242  WaitItemEvent(tmout);
243 
244  DOUT1("Finish StopUntilRestart for module %s", GetName());
245 }
246 
247 
249 {
250  if (fNewCommands!=0) {
251  EOUT("Some commands remain even when module %s is cleaned up - BAD", GetName());
252  AsyncProcessCommands();
253  }
254 
255  DOUT4("ModuleSync::ObjectCleanup %s", GetName());
256 
258 }
259 
261 {
262  if (fNewCommands==0) return;
263 
264  while (fNewCommands->Size()>0) {
265  Command cmd = fNewCommands->Pop();
266  int cmd_res = ExecuteCommand(cmd);
267  if (cmd_res>=0) cmd.Reply(cmd_res);
268  }
269 
270  delete fNewCommands;
271  fNewCommands = 0;
272 }
273 
275 {
276  if ((evid==evntInput) || (evid==evntOutput))
277  ((Port*) item)->ConfirmEvent();
278 
279  // no need to store any consequent events
280  if (fWaitRes) return;
281 
282  if (item==0) { EOUT("Zero item !!!!!!!!!!!!!"); }
283 
284  if ((fWaitItem==item) || (fWaitItem==0)) {
285  fWaitRes = true;
286  fWaitId = evid;
287  fWaitItem = item;
288  }
289 }
290 
291 bool dabc::ModuleSync::WaitItemEvent(double& tmout, ModuleItem* item, uint16_t *resevid, ModuleItem** resitem)
292 {
293  if (tmout<0) return false;
294 
295  fWaitItem = item;
296  fWaitId = 0;
297  fWaitRes = false;
298 
299  TimeStamp last_tm, tm;
300 
301  // if module not in running state, wait item event will block main loop completely
302  while (!fWaitRes || !IsRunning()) {
303 
304  // account timeout only in running state
305  if ((tmout>=0) && IsRunning()) {
306  tm.GetNow();
307 
308  if (!last_tm.null()) {
309  tmout -= (tm - last_tm);
310  if (tmout<0) {
311  if (IsTmoutExcept())
312  throw Exception(ex_Timeout, "Operation timeout for item", ItemName());
313  else
314  return false;
315  }
316  }
317 
318  last_tm = tm;
319  } else
320  last_tm.Reset();
321 
322  // SingleLoop return false only when Worker should be halted,
323  // we use this to stop module and break recursion
324 
325  if (!SingleLoop(tmout)) throw Exception(ex_Stop, "Module stopped when waiting for", ItemName());
326  }
327 
328  if (resevid!=0) *resevid = fWaitId;
329  if (resitem!=0) *resitem = fWaitItem;
330 
331  return true; // it is normal exit
332 }
333 
335 {
336  DoStart();
337 
338  try {
339  fInsideMainLoop = true;
340 
341  MainLoop();
342 
343  fInsideMainLoop = false;
344  } catch (...) {
345  fInsideMainLoop = false;
346  throw;
347  }
348 }
349 
351 {
352  fInsideMainLoop = false;
353 
354  if (IsRunning()) {
355  AsyncProcessCommands();
356  DoStop();
357  }
358 
359  DOUT3("Stop sync module %s", GetName());
360 }
Reference on memory from memory pool.
Definition: Buffer.h:135
Represents command with its arguments.
Definition: Command.h:99
void Reply(int res=cmd_noresult)
Replied on the command.
Definition: Command.cxx:225
Queue of commands
Definition: CommandsQueue.h:39
@ kindSubmit
command submitted for execution
Definition: CommandsQueue.h:45
DABC exception.
Definition: Exception.h:57
Input port.
Definition: Port.h:244
unsigned NumCanRecv()
Defines how many buffers can be received.
Definition: Port.h:262
Buffer Recv()
Definition: Port.cxx:320
bool CanRecv() const
Returns true if user can get (receive) buffer from the port.
Definition: Port.h:273
Base class for module items like ports, timers, pool handles.
Definition: ModuleItem.h:68
unsigned fSubId
Definition: ModuleItem.h:77
bool WaitItemEvent(double &tmout, ModuleItem *item=0, uint16_t *resevid=0, ModuleItem **resitem=0)
Definition: ModuleSync.cxx:291
Buffer RecvFromAny(unsigned *port=0, double timeout=-1)
Receive buffer from any input port.
Definition: ModuleSync.cxx:126
Buffer TakeBuffer(unsigned pool=0, double timeout=-1)
Take buffer from memory pool.
Definition: ModuleSync.cxx:112
bool ModuleWorking(double timeout=0.)
Returns true when module in running state.
Definition: ModuleSync.cxx:176
bool WaitConnect(const std::string &name, double timeout=-1)
Waits for connection for specified port.
Definition: ModuleSync.cxx:46
virtual void DoWorkerMainLoop()
Internal - entrance function for main loop execution.
Definition: ModuleSync.cxx:334
virtual void ProcessItemEvent(ModuleItem *item, uint16_t evid)
Internal - central method of events processing.
Definition: ModuleSync.cxx:274
bool WaitInput(unsigned indx, unsigned minqueuesize=1, double timeout=-1)
Waits until input port has specified number of buffers in the queue.
Definition: ModuleSync.cxx:157
bool Send(unsigned indx, Buffer &buf, double timeout=-1)
Send buffer via specified output port.
Definition: ModuleSync.cxx:67
virtual void DoWorkerAfterMainLoop()
Internal - function executed after leaving main loop.
Definition: ModuleSync.cxx:350
virtual ~ModuleSync()
Destructor.
Definition: ModuleSync.cxx:31
virtual void ObjectCleanup()
Internal DABC method.
Definition: ModuleSync.cxx:248
void AsyncProcessCommands()
Internal - process commands which are submitted to sync queue.
Definition: ModuleSync.cxx:260
virtual int PreviewCommand(Command cmd)
Internal - preview command before execution.
Definition: ModuleSync.cxx:195
uint16_t WaitEvent(double timeout=-1)
Waits for any event.
Definition: ModuleSync.cxx:186
Buffer Recv(unsigned indx=0, double timeout=-1)
Receive buffer from input port.
Definition: ModuleSync.cxx:91
void StopUntilRestart()
Call this method from main loop if one want suspend of module execution until new Start of the module...
Definition: ModuleSync.cxx:234
Base for dabc::ModuleSync and dabc::ModuleAsync classes.
Definition: Module.h:42
virtual void ObjectCleanup()
Inherited method, called during module destroy.
Definition: Module.cxx:536
virtual int PreviewCommand(Command cmd)
This method called before command will be executed.
Definition: Module.cxx:311
friend class ModuleSync
Definition: Module.h:48
std::string ItemName(bool compact=true) const
Produce string, which can be used as name argument in dabc::mgr.FindItem(name) call.
Definition: Object.cxx:1076
Output port.
Definition: Port.h:301
bool CanSend() const
Returns true if user can send get buffer via the port.
Definition: Port.h:325
bool Send(dabc::Buffer &buf)
Definition: Port.cxx:347
Handle for pool connection.
Definition: Port.h:347
bool CanTakeBuffer() const
Definition: Port.h:388
Buffer TakeBuffer(BufferSize_t size=0)
Definition: Port.cxx:416
Reference on the dabc::Port class
Definition: Port.h:195
Base class for input and output ports.
Definition: Port.h:47
bool IsName(const char *name) const
Returns true if object name is the same as specified one.
Definition: Reference.cxx:177
bool null() const
Returns true if reference contains nullptr.
Definition: Reference.h:151
std::string ItemName(bool compact=true) const
Produce string, which can be used as name argument in dabc::mgr.FindItem(name) call.
Definition: Reference.cxx:241
#define DOUT3(args ...)
Definition: logging.h:176
#define EOUT(args ...)
Definition: logging.h:150
#define DOUT1(args ...)
Definition: logging.h:162
#define DOUT4(args ...)
Definition: logging.h:182
@ evntPortDisconnect
Definition: ModuleItem.h:47
@ evntInput
Definition: ModuleItem.h:41
@ evntPortConnect
Definition: ModuleItem.h:46
@ evntPortError
Definition: ModuleItem.h:48
@ evntOutput
Definition: ModuleItem.h:42
@ evntModuleNone
Definition: ModuleItem.h:40
@ ex_Connect
Definition: Exception.h:33
@ ex_Stop
Definition: Exception.h:29
@ ex_Disconnect
Definition: Exception.h:34
@ ex_Timeout
Definition: Exception.h:30
@ cmd_postponed
Definition: Command.h:42
@ cmd_false
Definition: Command.h:37
@ cmd_ignore
Definition: Command.h:41
@ cmd_true
Definition: Command.h:38
Class for acquiring and holding timestamps.
Definition: timing.h:40
bool null() const
Returns true if time stamp is not initialized or its value less than 0.
Definition: timing.h:131
void Reset()
Set time stamp value to null.
Definition: timing.h:134
void GetNow()
Method to acquire current time stamp.
Definition: timing.h:137