21 fDisconnectExcept(false),
27 fInsideMainLoop(false)
36 EOUT(
"Problem in sync module %s destructor - cannot leave normally main loop, must crash :-O", GetName());
39 if (fNewCommands!=0) {
40 EOUT(
"Some commands remain event in module %s destructor - BAD", GetName());
50 if (port.
null())
return false;
56 if (port()->IsConnected())
return true;
61 }
while (WaitItemEvent(timeout, port(), &evid));
71 if ((port==0) || buf.
null())
return false;
79 if (IsDisconnectExcept())
83 return port->
Send(buf);
85 }
while (WaitItemEvent(timeout, port, &evid));
95 if (port==0)
return Buffer();
101 if (IsDisconnectExcept())
107 }
while (WaitItemEvent(timeout, port, &evid));
115 if (handle==0)
return (poolindx==0) ? TakeDfltBuffer() :
Buffer();
120 }
while (WaitItemEvent(timeout, handle));
134 if (IsDisconnectExcept())
137 if (NumInputs() == 0)
return Buffer();
144 for (
unsigned n=0; n < NumInputs(); n++) {
145 InputPort* p = fInputs[(n+shift) % NumInputs()];
147 if (indx) *indx = p->
fSubId;
151 }
while (WaitItemEvent(timeout, 0, &evid, &resitem));
161 if (port==0)
return false;
167 if (IsDisconnectExcept())
170 if (port->
NumCanRecv() >= minqueuesize)
return true;
171 }
while (WaitItemEvent(timeout, port, &evid));
178 AsyncProcessCommands();
180 if (!SingleLoop(timeout))
190 if (!WaitItemEvent(timeout, 0, &evid)) evid = 0;
197 if (cmd.
IsName(
"StartModule")) {
201 if (fInsideMainLoop)
return cmd_bool(DoStart());
203 return cmd_bool(ActivateMainLoop());
205 if (cmd.
IsName(
"StopModule")) {
208 if (!fInsideMainLoop) {
209 EOUT(
"Something wrong, module %s runs without main loop ????", GetName());
213 AsyncProcessCommands();
215 return cmd_bool(DoStop());
222 if (!fSyncCommands && (cmd_res==
cmd_ignore) && IsRunning()) {
227 fNewCommands->Push(cmd);
238 DOUT1(
"Stop module %s until restart", GetName());
242 WaitItemEvent(tmout);
244 DOUT1(
"Finish StopUntilRestart for module %s", GetName());
250 if (fNewCommands!=0) {
251 EOUT(
"Some commands remain even when module %s is cleaned up - BAD", GetName());
252 AsyncProcessCommands();
255 DOUT4(
"ModuleSync::ObjectCleanup %s", GetName());
262 if (fNewCommands==0)
return;
264 while (fNewCommands->Size()>0) {
265 Command cmd = fNewCommands->Pop();
266 int cmd_res = ExecuteCommand(cmd);
267 if (cmd_res>=0) cmd.
Reply(cmd_res);
277 ((
Port*) item)->ConfirmEvent();
280 if (fWaitRes)
return;
282 if (item==0) {
EOUT(
"Zero item !!!!!!!!!!!!!"); }
284 if ((fWaitItem==item) || (fWaitItem==0)) {
293 if (tmout<0)
return false;
302 while (!fWaitRes || !IsRunning()) {
305 if ((tmout>=0) && IsRunning()) {
308 if (!last_tm.
null()) {
309 tmout -= (tm - last_tm);
325 if (!SingleLoop(tmout))
throw Exception(
ex_Stop,
"Module stopped when waiting for", ItemName());
328 if (resevid!=0) *resevid = fWaitId;
329 if (resitem!=0) *resitem = fWaitItem;
339 fInsideMainLoop =
true;
343 fInsideMainLoop =
false;
345 fInsideMainLoop =
false;
352 fInsideMainLoop =
false;
355 AsyncProcessCommands();
359 DOUT3(
"Stop sync module %s", GetName());
Reference to memory from a memory pool.
Represents command with its arguments.
void Reply(int res=cmd_noresult)
Reply to the command.
@ kindSubmit
command submitted for execution
Base class for module items like ports, timers, pool handles.
bool WaitItemEvent(double &tmout, ModuleItem *item=0, uint16_t *resevid=0, ModuleItem **resitem=0)
Buffer RecvFromAny(unsigned *port=0, double timeout=-1)
Receive buffer from any input port.
Buffer TakeBuffer(unsigned pool=0, double timeout=-1)
Take buffer from memory pool.
bool ModuleWorking(double timeout=0.)
Returns true when the module is in the running state.
bool WaitConnect(const std::string &name, double timeout=-1)
Waits for connection for specified port.
virtual void DoWorkerMainLoop()
Internal - entrance function for main loop execution.
virtual void ProcessItemEvent(ModuleItem *item, uint16_t evid)
Internal - central method of events processing.
bool WaitInput(unsigned indx, unsigned minqueuesize=1, double timeout=-1)
Waits until input port has specified number of buffers in the queue.
bool Send(unsigned indx, Buffer &buf, double timeout=-1)
Send buffer via specified output port.
virtual void DoWorkerAfterMainLoop()
Internal - function executed after leaving main loop.
virtual ~ModuleSync()
Destructor.
virtual void ObjectCleanup()
Internal DABC method.
void AsyncProcessCommands()
Internal - process commands which are submitted to sync queue.
virtual int PreviewCommand(Command cmd)
Internal - preview command before execution.
uint16_t WaitEvent(double timeout=-1)
Waits for any event.
Buffer Recv(unsigned indx=0, double timeout=-1)
Receive buffer from input port.
void StopUntilRestart()
Call this method from the main loop to suspend module execution until the next Start of the module...
Base for dabc::ModuleSync and dabc::ModuleAsync classes.
virtual void ObjectCleanup()
Inherited method, called during module destruction.
virtual int PreviewCommand(Command cmd)
This method is called before the command is executed.
std::string ItemName(bool compact=true) const
Produce string, which can be used as name argument in dabc::mgr.FindItem(name) call.
bool CanSend() const
Returns true if the user can send a buffer via the port.
bool Send(dabc::Buffer &buf)
Handle for pool connection.
bool CanTakeBuffer() const
Buffer TakeBuffer(BufferSize_t size=0)
Reference to the dabc::Port class
Base class for input and output ports.
bool IsName(const char *name) const
Returns true if object name is the same as specified one.
bool null() const
Returns true if reference contains nullptr.
std::string ItemName(bool compact=true) const
Produce string, which can be used as name argument in dabc::mgr.FindItem(name) call.
Class for acquiring and holding timestamps.
bool null() const
Returns true if time stamp is not initialized or its value less than 0.
void Reset()
Set time stamp value to null.
void GetNow()
Method to acquire current time stamp.