DABC (Data Acquisition Backbone Core)  2.9.9
api.cxx
Go to the documentation of this file.
1 // $Id: api.cxx 4766 2021-04-23 11:31:22Z linev $
2 
3 /************************************************************
4  * The Data Acquisition Backbone Core (DABC) *
5  ************************************************************
6  * Copyright (C) 2009 - *
7  * GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
8  * Planckstr. 1, 64291 Darmstadt, Germany *
9  * Contact: http://dabc.gsi.de *
10  ************************************************************
11  * This software can be used under the GPL license *
12  * agreements as stated in LICENSE.txt file *
13  * which is part of the distribution. *
14  ************************************************************/
15 
16 #include "mbs/api.h"
17 
18 #include "dabc/api.h"
19 #include "dabc/Manager.h"
20 #include "dabc/Publisher.h"
21 
22 mbs::ReadoutModule::ReadoutModule(const std::string &name, dabc::Command cmd) :
23  dabc::ModuleAsync(name, cmd),
24  fIter(),
25  fLastNotFullTm(),
26  fCurBufTm(),
27  fCmd()
28 {
29  EnsurePorts(1, 0, dabc::xmlWorkPool);
30 
31  SetPortSignaling(InputName(), dabc::Port::SignalEvery);
32 
33  CreateTimer("SysTimer");
34 
35  SetAutoStop(false);
36 }
37 
39 {
40  if (cmd.IsName("NextBuffer")) {
41  // previous command not processed - cannot be
42  if (!fCmd.null()) return dabc::cmd_false;
43 
44  fCmd = cmd;
45  double tm = fCmd.TimeTillTimeout();
46 
47  if (CanRecv() || (tm<=0))
48  ProcessData();
49  else
50  ShootTimer(0, tm);
51 
53  }
54 
56 }
57 
59 {
60  // ignore input event as long as command is not specified
61  if (fCmd.null()) return;
62 
63  double maxage = fCmd.GetDouble("maxage", -1);
64 
65  int res = dabc::cmd_false;
66 
67  dabc::TimeStamp now = dabc::Now();
68 
69  bool cleanqueue = false;
70  if ((maxage>0) && InputQueueFull())
71  if (fLastNotFullTm.null() || (now - fLastNotFullTm > maxage)) {
72  cleanqueue = true;
73  }
74 
75  if (cleanqueue) fLastNotFullTm = now;
76 
77  while (CanRecv()) {
78  dabc::Buffer buf = Recv();
79 
80  fCurBufTm = now;
81 
82  // when EOF buffer received, return immediately stop
83  if (buf.GetTypeId() == dabc::mbt_EOF) {
84  fCmd.Reply(dabc::cmd_false);
85  return;
86  }
87 
88  // clean queue when it was full too long
89  // last buffer will be preserved
90  if (cleanqueue && CanRecv()) {
91  buf.Release();
92  continue;
93  }
94 
95  switch (buf.GetTypeId()) {
96  case dabc::mbt_EOF:
97  res = dabc::cmd_false;
98  break;
99  case mbs::mbt_MbsEvents:
100  res = cmd_bool(fIter.Reset(buf));
101  break;
102  default:
103  res = AcceptBuffer(buf);
104  break;
105  }
106 
107  break;
108  }
109 
110  fCmd.Reply(res);
111 }
112 
113 
115 {
116  if (!InputQueueFull()) fLastNotFullTm.GetNow();
117 
118  ProcessData();
119 }
120 
122 {
123  // if timeout happened, reply
124  ProcessData();
125 }
126 
128 {
129  if ((maxage <= 0) || fCurBufTm.null()) return true;
130  return !fCurBufTm.Expired(maxage);
131 }
132 
133 
134 // ===================================================================================
135 
136 mbs::ReadoutHandle mbs::ReadoutHandle::DoConnect(const std::string &url, const char* classname)
137 {
138  if (dabc::mgr.null()) {
140  dabc::CreateManager("dabc", -1);
141  }
142 
143  if (dabc::mgr.FindPool(dabc::xmlWorkPool).null()) {
144  if (!dabc::mgr.CreateMemoryPool(dabc::xmlWorkPool, 4*1024*1024, 40)) {
145  return 0;
146  }
147  }
148 
149  int cnt = 0;
150  std::string name;
151  do {
152  name = dabc::format("MbsReadout%d", cnt);
153  } while (!dabc::mgr.FindModule(name).null());
154 
155  mbs::ReadoutHandle mdl = dabc::mgr.CreateModule(classname, name);
156 
157  if (mdl.null()) return mdl;
158 
159  if (!dabc::mgr.CreateTransport(mdl.InputName(), url)) {
160  EOUT("Cannot create transport %s",url.c_str());
161  mdl.Release();
163  return 0;
164  }
165 
166  mdl.Start();
167 
168  return mdl;
169 }
170 
172 {
173  if (null()) return false;
174 
175  FindPort(InputName()).Disconnect();
176 
177  Stop();
178 
179  std::string name = GetName();
180  Release();
181  dabc::mgr.DeleteModule(name);
182 
183  // add timeout to let cleanup DABC sockets
184  dabc::Sleep(0.2);
185 
186  return true;
187 }
188 
189 
190 mbs::EventHeader* mbs::ReadoutHandle::NextEvent(double tmout, double maxage)
191 {
192  if (null()) return 0;
193 
194  bool intime = GetObject()->GetEventInTime(maxage);
195 
196  if (intime && GetObject()->fIter.NextEvent())
197  return GetObject()->fIter.evnt();
198 
199  dabc::Command cmd("NextBuffer");
200  // here maxage means cleanup of complete queue when queue was full for long time
201  cmd.SetDouble("maxage", 2.*maxage);
202 
203  if (!Execute(cmd, tmout)) return 0;
204 
205  if (GetObject()->fIter.NextEvent())
206  return GetObject()->fIter.evnt();
207 
208  return 0;
209 }
210 
212 {
213  return null() ? 0 : GetObject()->fIter.evnt();
214 }
215 
216 // ============================================================================================
217 
218 mbs::MonitorHandle mbs::MonitorHandle::Connect(const std::string &mbsnode, int cmdport, int logport, int statport)
219 {
220  if (dabc::mgr.null()) {
222  dabc::CreateManager("dabc", -1);
223  }
224 
225  int cnt = 0;
226  std::string name;
227  do {
228  name = dabc::format("MbsCtrl%d", cnt);
229  } while (!dabc::mgr.FindModule(name).null());
230 
231  dabc::CmdCreateModule cmd("mbs::Monitor", name);
232  cmd.SetStr("node", mbsnode);
233  cmd.SetInt("logger", logport);
234  cmd.SetInt("cmd", cmdport);
235  cmd.SetInt("stat", statport);
236  cmd.SetBool("publish", false);
237  cmd.SetBool("printf", true);
238 
239  if (!dabc::mgr.Execute(cmd)) return 0;
240 
242 
243  mdl.Start();
244 
245  return mdl;
246 }
247 
249 {
250  if (null()) return false;
251 
252  Execute("DeleteWorkers");
253 
254  Stop();
255 
256  std::string name = GetName();
257 
258  Release();
259 
260  dabc::mgr.DeleteModule(name);
261 
262  // add timeout to let cleanup DABC sockets
263  dabc::Sleep(0.2);
264 
265  return true;
266 }
267 
268 bool mbs::MonitorHandle::MbsCmd(const std::string &mbscmd, double tmout)
269 {
270  if (null()) return false;
271 
272  dabc::Command cmd = dabc::CmdHierarchyExec("CmdMbs");
273  cmd.SetStr("cmd", mbscmd);
274  cmd.SetTimeout(tmout);
275 
276  return Execute(cmd);
277 }
Reference on memory from memory pool.
Definition: Buffer.h:135
unsigned GetTypeId() const
Definition: Buffer.h:152
Command submitted to worker when item in hierarchy defined as DABC.Command and used to produce custom...
Definition: Publisher.h:47
Represents command with its arguments.
Definition: Command.h:99
double TimeTillTimeout(double extra=0.) const
Returns time which remains until command should be timed out.
Definition: Command.cxx:132
bool SetStr(const std::string &name, const char *value)
Definition: Command.h:134
bool SetBool(const std::string &name, bool v)
Definition: Command.h:141
bool SetInt(const std::string &name, int v)
Definition: Command.h:138
Command & SetTimeout(double tm)
Set maximum time which can be used for command execution.
Definition: Command.cxx:108
bool SetDouble(const std::string &name, double v)
Definition: Command.h:144
ModuleRef CreateModule(const std::string &classname, const std::string &modulename, const std::string &thrdname="")
Definition: Manager.cxx:1981
bool DeleteModule(const std::string &name)
Definition: Manager.cxx:2043
ModuleRef FindModule(const std::string &name)
Definition: Manager.cxx:2018
bool CanRecv(unsigned indx=0) const
Method return true if recv from specified port can be done.
Definition: ModuleAsync.h:80
bool Start()
Definition: Module.h:287
std::string InputName(unsigned n=0, bool itemname=true)
Return item name of the input, can be used in connect command.
Definition: Module.cxx:1056
void ShootTimer(unsigned indx, double delay_sec=0.)
Definition: Module.h:187
virtual int ExecuteCommand(Command cmd)
Main method where commands are executed.
Definition: Module.h:232
@ SignalEvery
Definition: Port.h:63
void Release()
Releases reference on the object.
Definition: Reference.cxx:138
bool IsName(const char *name) const
Returns true if object name is the same as specified one.
Definition: Reference.cxx:177
bool null() const
Returns true if reference contains nullptr.
Definition: Reference.h:151
bool Disconnect()
Release connection to the MBS node.
Definition: api.cxx:248
static MonitorHandle Connect(const std::string &mbsnode, int cmdport=6019, int logport=6007, int statport=6008)
Connect with MBS node.
Definition: api.cxx:218
bool MbsCmd(const std::string &cmd, double tmout=5.)
Execute MBS command.
Definition: api.cxx:268
Handle to organize readout of MBS data source.
Definition: api.h:74
mbs::EventHeader * GetEvent()
Get current event pointer.
Definition: api.cxx:211
bool null() const
Check if handle is initialized.
Definition: api.h:85
mbs::EventHeader * NextEvent(double tmout=1.0, double maxage=-1.)
Retrieve next event from the server One could specify timeout (how long one should wait for next even...
Definition: api.cxx:190
static ReadoutHandle DoConnect(const std::string &url, const char *classname)
Definition: api.cxx:136
bool Disconnect()
Disconnect from MBS server.
Definition: api.cxx:171
virtual int ExecuteCommand(dabc::Command cmd)
Main method where commands are executed.
Definition: api.cxx:38
dabc::Command fCmd
current nextbuffer cmd
Definition: api.h:50
virtual void ProcessTimerEvent(unsigned timer)
Method called by framework when timer event is produced.
Definition: api.cxx:121
bool GetEventInTime(double maxage)
Definition: api.cxx:127
ReadoutModule(const std::string &name, dabc::Command cmd)
Definition: api.cxx:22
void ProcessData()
Definition: api.cxx:58
virtual void ProcessInputEvent(unsigned port)
Method called by framework when input event is produced.
Definition: api.cxx:114
#define EOUT(args ...)
Definition: logging.h:150
Event manipulation API.
Definition: api.h:23
void Sleep(double tm)
Definition: timing.cxx:129
const char * xmlWorkPool
Definition: Object.cxx:46
bool CreateManager(const std::string &name, int cmd_port=-1)
Function should be used to create manager instance.
Definition: api.cxx:26
TimeStamp Now()
Definition: timing.h:260
void SetDebugLevel(int level=0)
Definition: logging.cxx:468
ManagerRef mgr
Definition: Manager.cxx:42
std::string format(const char *fmt,...)
Definition: string.cxx:49
@ mbt_EOF
Definition: Buffer.h:45
@ cmd_postponed
Definition: Command.h:42
@ cmd_false
Definition: Command.h:37
@ mbt_MbsEvents
Definition: MbsTypeDefs.h:348
Class for acquiring and holding timestamps.
Definition: timing.h:40