DABC (Data Acquisition Backbone Core)  2.9.9
Port.cxx
Go to the documentation of this file.
1 // $Id: Port.cxx 4694 2021-02-23 13:54:37Z linev $
2 
3 /************************************************************
4  * The Data Acquisition Backbone Core (DABC) *
5  ************************************************************
6  * Copyright (C) 2009 - *
7  * GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
8  * Planckstr. 1, 64291 Darmstadt, Germany *
9  * Contact: http://dabc.gsi.de *
10  ************************************************************
11  * This software can be used under the GPL license *
12  * agreements as stated in LICENSE.txt file *
13  * which is part of the distribution. *
14  ************************************************************/
15 
16 #include "dabc/Port.h"
17 
18 #include <cstdlib>
19 
20 #include "dabc/Manager.h"
21 #include "dabc/MemoryPool.h"
22 
23 
24 dabc::Port::Port(int kind, Reference parent, const std::string &name, unsigned queuesize) :
25  ModuleItem(kind, parent, name),
26  fQueueCapacity(queuesize),
27  fRate(),
28  fSignal(SignalConfirm),
29  fQueue(),
30  fBindName(),
31  fRateName(),
32  fMaxLoopLength(0),
33  fReconnectPeriod(-1),
34  fReconnectLimit(-1),
35  fDoingReconnect(false),
36  fOnError()
37 {
38 }
39 
41 {
42  DOUT3("PORT %s: destructor %p queue %p", GetName(), this, fQueue());
43 }
44 
45 
47 {
48  fQueueCapacity = Cfg(xmlQueueAttr).AsInt(fQueueCapacity);
49  fMaxLoopLength = Cfg(xmlLoopAttr).AsInt(fMaxLoopLength);
50  std::string signal = Cfg(xmlSignalAttr).AsStr();
51  if (signal == "none") fSignal = SignalNone; else
52  if ((signal == "confirm") || (signal == "normal")) fSignal = SignalConfirm; else
53  if (signal == "oper") fSignal = SignalConfirm; else
54  if (signal == "every") fSignal = SignalEvery;
55  fBindName = Cfg(xmlBindAttr).AsStr(fBindName);
56  fRateName = Cfg(xmlRateAttr).AsStr(fRateName);
57 
58  ConfigureOnError(Cfg("onerror").AsStr());
59 
60  ConfigureReconnect(Cfg(xmlReconnectAttr).AsDouble(-1.), Cfg(xmlNumReconnAttr).AsInt(-1));
61 }
62 
63 
65 {
66  if (IsConnected()) {
67  EOUT("Cannot change signaling kind with connected port!!!");
68  return false;
69  }
70 
71  fSignal = kind;
72  return true;
73 }
74 
76 {
78 
79  if (!req.null() || !force) return req;
80 
81  // at the moment when first connection request will be created,
82  // connection manager should be already there
83 
84  req = new dabc::ConnectionObject(this, dabc::mgr.ComposeAddress("",ItemName()));
85 
86  ConfigIO io(dabc::mgr()->cfg());
87 
88  io.ReadRecordField(this, dabc::ConnectionObject::ObjectName(), 0, &(req()->Fields()));
89 
90  req()->FireParEvent(parCreated);
91 
92  return req;
93 }
94 
95 
97 {
99 }
100 
102 {
103  dabc::LockGuard lock(ObjectMutex());
104  return fQueueCapacity;
105 }
106 
107 void dabc::Port::SetBindName(const std::string &name)
108 {
109  dabc::LockGuard lock(ObjectMutex());
110  fBindName = name;
111 }
112 
113 std::string dabc::Port::GetBindName() const
114 {
115  dabc::LockGuard lock(ObjectMutex());
116  return fBindName;
117 }
118 
119 
121 {
122  fQueue << ref;
123  if (fQueue.null()) return;
124 
125  fQueue()->SetConnected(IsInput());
126 
127  // change capacity field under mutex, while it can be accessed outside the thread
128  dabc::LockGuard lock(ObjectMutex());
129  fQueueCapacity = fQueue()->Capacity();
130 }
131 
132 
134 {
135  fQueue.PortActivated(GetType(), true);
136 }
137 
139 {
140  fQueue.PortActivated(GetType(), false);
141 }
142 
144 {
145  Disconnect();
146  fRate.Release();
147 }
148 
150 {
151  DOUT3("Port %s cleanup inp:%s out:%s", ItemName().c_str(), DBOOL(IsInput()), DBOOL(IsOutput()));
152 
153  // remove queue
154  Disconnect();
155 
156  fRate.Release();
157 
159 }
160 
161 
163 {
164  fRate = ref;
165 
166  if (fRate.GetUnits().empty())
167  fRate.SetUnits("MB");
168 
169  // TODO: do we need dependency on the rate parameter or it should remain until we release it
170  // dabc::mgr()->RegisterDependency(this, fInpRate());
171 }
172 
174 {
175  switch (SignalingKind()) {
176  case SignalNone:
177  return 0;
178  case SignalConfirm:
179  case SignalOperation:
180  return fMaxLoopLength ? fMaxLoopLength : QueueCapacity();
181  case Port::SignalEvery:
182  return -1;
183  }
184  return -1;
185 }
186 
188 {
189  if (IsInput()) return fQueue.SubmitCommandTo(false, cmd);
190  if (IsOutput()) return fQueue.SubmitCommandTo(true, cmd);
191  return false;
192 }
193 
194 bool dabc::Port::TryNextReconnect(bool caused_by_error, bool can_disconnect)
195 {
196 // if (!caused_by_error) return false;
197 
198  if ((fReconnectPeriod>0) && ((fReconnectLimit < 0) || (fReconnectLimit-- > 0))) return true;
199 
200  SetDoingReconnect(false);
201 
202  if (fOnError == "none") {
203  // do nothing
204  } else if (fOnError == "stop") {
205  DOUT0("Stop module %s due to error on port %s", DNAME(GetParent()), ItemName().c_str());
206  StopModule();
207  } else if (fOnError == "exit") {
208  DOUT0("Exit application due to error on port %s", ItemName().c_str());
210  } else if (fOnError == "abort") {
211  DOUT0("Abort application due to error on port %s", ItemName().c_str());
212  abort();
213  } else if (can_disconnect) {
214  Disconnect();
215  }
216 
217  return false;
218 }
219 
220 
221 // ================================================================
222 
224 {
225  if (GetObject()==0) return Port::SignalNone;
226  dabc::Command cmd("GetSignalingKind");
227  cmd.SetStr("Port", GetObject()->GetName());
228  if (GetModule().Execute(cmd) == cmd_true)
229  return cmd.GetInt("Kind");
230  return Port::SignalNone;
231 }
232 
233 bool dabc::PortRef::Disconnect(bool witherr)
234 {
235  if (GetObject()==0) return false;
236  dabc::Command cmd("DisconnectPort");
237  cmd.SetStr("Port", GetObject()->GetName());
238  cmd.SetBool("WithErr", witherr);
239  return GetModule().Execute(cmd) == cmd_true;
240 }
241 
243 {
244  std::string name;
245  if (GetObject()) name = GetObject()->GetBindName();
246  if (name.empty()) return 0;
247  return GetModule().FindChild(name.c_str());
248 }
249 
251 {
252  if (GetObject()==0) return false;
253  dabc::Command cmd("IsPortConnected");
254  cmd.SetStr("Port", GetObject()->GetName());
255  return GetModule().Execute(cmd) == cmd_true;
256 }
257 
258 
259 dabc::ConnectionRequest dabc::PortRef::MakeConnReq(const std::string &url, bool isserver)
260 {
262 
263  if (null() || GetModule().null()) return req;
264 
265  dabc::Command cmd("MakeConnReq");
266  cmd.SetStr("Port", GetObject()->GetName());
267  cmd.SetStr("Url", url);
268  cmd.SetBool("IsServer", isserver);
269 
270  if (GetModule().Execute(cmd) == cmd_true)
271  req = cmd.GetRef("ConnReq");
272 
273  return req;
274 }
275 
276 // ====================================================================================
277 
278 
280  const std::string &name,
281  unsigned queuesize) :
282  Port(dabc::mitInpPort, parent, name, queuesize)
283 {
284  // only can do it here, while in Port Cfg() cannot correctly locate InputPort as class
285 
287 }
288 
290 {
291  DOUT4("PORT: destructor %p", this);
292 }
293 
295 {
296  while (cnt>0) {
297  if (!CanRecv()) return false;
298  dabc::Buffer buf = Recv();
299  buf.Release();
300  cnt--;
301  }
302 
303  return true;
304 }
305 
307 {
308  switch (SignalingKind()) {
309  case SignalNone:
310  return 0;
311  case SignalConfirm:
312  case SignalOperation:
313  return CanRecv() ? 1 : 0;
314  case Port::SignalEvery:
315  return NumCanRecv();
316  }
317  return 0;
318 }
319 
321 {
322  Buffer buf;
323  fQueue.Recv(buf);
324  fRate.SetValue(buf.GetTotalSize()/1024./1024.);
325  return buf;
326 }
327 
328 
329 // ====================================================================================
330 
332  const std::string &name,
333  unsigned queuesize) :
334  Port(dabc::mitOutPort, parent, name, queuesize),
335  fSendallFlag(false)
336 {
337  // only can do it here, while in Port Cfg() cannot correctly locate OutputPort as class
339 }
340 
342 {
343  DOUT4("PORT: destructor %p", this);
344 }
345 
346 
348 {
349  fRate.SetValue(buf.GetTotalSize()/1024./1024.);
350 
351  bool res = fQueue.Send(buf);
352 
353  if (!buf.null() && !fQueue.null() && res)
354  EOUT("Should not happen queue %p buf %p", fQueue.GetObject(), buf.GetObject());
355 
356  if (!res) {
357  EOUT("PORT %s fail to send buffer %u", ItemName().c_str(), buf.GetTotalSize());
358  buf.Release();
359  }
360 
361  return res;
362 }
363 
364 
366 {
367  switch (SignalingKind()) {
368  case SignalNone:
369  return 0;
370  case SignalConfirm:
371  case SignalOperation:
372  return CanSend() ? 1 : 0;
373  case Port::SignalEvery:
374  return NumCanSend();
375  }
376  return 0;
377 }
378 
379 
380 // ====================================================================================
381 
382 
384  Reference pool,
385  const std::string &name,
386  unsigned queuesize) :
387  Port(dabc::mitPool, parent, name, queuesize),
388  fPool(pool)
389 {
390  // only can do it here, while in Port Cfg() cannot correctly locate PoolHandle as class
392 }
393 
394 
396 {
397  DOUT4("PoolHandle: destructor %p", this);
398 }
399 
400 
402 {
403  switch (SignalingKind()) {
404  case SignalNone:
405  return 0;
406  case SignalConfirm:
407  case SignalOperation:
408  return (NumRequestedBuffer() > 0) ? 1 : 0;
409  case Port::SignalEvery:
410  return NumRequestedBuffer();
411  }
412  return 0;
413 }
414 
415 
417 {
418  if (QueueCapacity()==0) return ((MemoryPool*)fPool())->TakeBuffer(size);
419 
420  return TakeRequestedBuffer();
421 }
422 
Reference on memory from memory pool.
Definition: Buffer.h:135
BufferSize_t GetTotalSize() const
Return total size of all buffer segments.
Definition: Buffer.cxx:91
Represents command with its arguments.
Definition: Command.h:99
bool SetStr(const std::string &name, const char *value)
Definition: Command.h:134
bool SetBool(const std::string &name, bool v)
Definition: Command.h:141
Reference GetRef(const std::string &name)
Returns reference from the command, can be called only once.
Definition: Command.cxx:175
int GetInt(const std::string &name, int dflt=0) const
Definition: Command.h:139
Interface class between xml configuration and dabc objects.
Definition: ConfigIO.h:38
bool ReadRecordField(Object *obj, const std::string &name, RecordField *field, RecordFieldsMap *fieldsmap)
Definition: ConfigIO.cxx:132
Container for connection parameters.
static const char * ObjectName()
Connection request.
Buffer Recv()
Definition: Port.cxx:320
bool SkipBuffers(unsigned cnt=1)
Remove buffer from the input queue.
Definition: Port.cxx:294
InputPort(Reference parent, const std::string &name, unsigned queuesize)
Definition: Port.cxx:279
virtual ~InputPort()
Definition: Port.cxx:289
virtual unsigned NumStartEvents()
Return number of events which should be produced when async module starts.
Definition: Port.cxx:306
Lock guard for posix mutex.
Definition: threads.h:127
void StopApplication()
Definition: Manager.cxx:2231
Base class for module items like ports, timers, pool handles.
Definition: ModuleItem.h:68
OutputPort(Reference parent, const std::string &name, unsigned queuesize)
Definition: Port.cxx:331
virtual unsigned NumStartEvents()
Return number of events which should be produced when async module starts.
Definition: Port.cxx:365
bool Send(dabc::Buffer &buf)
Definition: Port.cxx:347
virtual ~OutputPort()
Definition: Port.cxx:341
Parameter class
Definition: Parameter.h:163
Parameter & SetUnits(const std::string &unit)
Set units field of parameter.
Definition: Parameter.h:256
virtual unsigned NumStartEvents()
Return number of events which should be produced when async module starts.
Definition: Port.cxx:401
virtual ~PoolHandle()
Definition: Port.cxx:395
PoolHandle(Reference parent, Reference pool, const std::string &name, unsigned queuesize)
Definition: Port.cxx:383
Buffer TakeBuffer(BufferSize_t size=0)
Definition: Port.cxx:416
Reference on the dabc::Port class
Definition: Port.h:195
bool IsConnected()
Returns true if port is connected.
Definition: Port.cxx:250
int GetSignalingKind()
Returns signaling method configured for the port.
Definition: Port.cxx:223
PortRef GetBindPort()
Return reference on the bind port.
Definition: Port.cxx:242
bool Disconnect(bool witherr=false)
Disconnect port
Definition: Port.cxx:233
ConnectionRequest MakeConnReq(const std::string &url, bool isserver=false)
Create connection request to specified url.
Definition: Port.cxx:259
Base class for input and output ports.
Definition: Port.h:47
void SetRateMeter(const Parameter &ref)
Set port ratemeter - must be used from module thread.
Definition: Port.cxx:162
unsigned QueueCapacity() const
Method returns actual queue capacity of the port, object mutex is used.
Definition: Port.cxx:101
ConnectionRequest GetConnReq(bool force=false)
Return reference on existing request object.
Definition: Port.cxx:75
Port(int kind, Reference parent, const std::string &name, unsigned queuesize)
Definition: Port.cxx:24
bool SubmitCommandToTransport(Command cmd)
Submit command to connected transport.
Definition: Port.cxx:187
bool SetSignaling(EventsProducing kind)
Specifies how often port event will be produced.
Definition: Port.cxx:64
int GetMaxLoopLength()
Return maximum number of events, which could be processed at once.
Definition: Port.cxx:173
void SetBindName(const std::string &name)
Set name of bind port - when input and output ports should use same transport.
Definition: Port.cxx:107
bool TryNextReconnect(bool caused_by_error, bool can_disconnect=true)
Returns true when reconnection should be attempted.
Definition: Port.cxx:194
virtual void DoStop()
Definition: Port.cxx:138
virtual void DoStart()
Definition: Port.cxx:133
virtual ~Port()
Definition: Port.cxx:40
void RemoveConnReq()
Remove connection request - it does not automatically means that port will be disconnected.
Definition: Port.cxx:96
virtual void ObjectCleanup()
Inherited method, should cleanup everything.
Definition: Port.cxx:149
std::string GetBindName() const
Returns name of bind port.
Definition: Port.cxx:113
virtual void DoCleanup()
Called when module object is cleaned up - should release all references if any.
Definition: Port.cxx:143
void SetQueue(Reference &ref)
Definition: Port.cxx:120
void ReadPortConfiguration()
Should be called from constructors of derived classes to read port configuration like queue and so on...
Definition: Port.cxx:46
EventsProducing
Definition: Port.h:58
@ SignalEvery
Definition: Port.h:63
@ SignalNone
Definition: Port.h:59
Reference on the arbitrary object
Definition: Reference.h:73
void Release()
Releases reference on the object.
Definition: Reference.cxx:138
Object * GetObject() const
Return pointer on the object.
Definition: Reference.h:129
Reference FindChild(const char *name) const
Searches for child in referenced object.
Definition: Reference.cxx:210
bool null() const
Returns true if reference contains nullptr.
Definition: Reference.h:151
virtual void ObjectCleanup()
Central cleanup method for worker.
Definition: Worker.cxx:238
#define DOUT0(args ...)
Definition: logging.h:156
#define DOUT3(args ...)
Definition: logging.h:176
#define EOUT(args ...)
Definition: logging.h:150
#define DNAME(arg)
Definition: logging.h:193
#define DBOOL(arg)
Definition: logging.h:191
#define DOUT4(args ...)
Definition: logging.h:182
XMLNodePointer_t GetParent(XMLNodePointer_t xmlnode)
Definition: XmlEngine.cxx:917
Event manipulation API.
Definition: api.h:23
uint32_t BufferSize_t
Definition: Buffer.h:32
@ mitOutPort
Definition: ModuleItem.h:31
@ mitPool
Definition: ModuleItem.h:32
@ mitInpPort
Definition: ModuleItem.h:30
const char * xmlRateAttr
Definition: Object.cxx:39
ManagerRef mgr
Definition: Manager.cxx:42
@ parCreated
produced once when parameter is created
Definition: Parameter.h:41
const char * xmlNumReconnAttr
Definition: ConfigBase.cxx:45
const char * xmlReconnectAttr
Definition: ConfigBase.cxx:44
const char * xmlSignalAttr
Definition: Object.cxx:38
const char * xmlLoopAttr
Definition: Object.cxx:40
const char * xmlBindAttr
Definition: Object.cxx:37
const char * xmlQueueAttr
Definition: Object.cxx:36
@ cmd_true
Definition: Command.h:38