DABC (Data Acquisition Backbone Core)  2.9.9
Port.h
Go to the documentation of this file.
1 // $Id: Port.h 4694 2021-02-23 13:54:37Z linev $
2 
3 /************************************************************
4  * The Data Acquisition Backbone Core (DABC) *
5  ************************************************************
6  * Copyright (C) 2009 - *
7  * GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
8  * Planckstr. 1, 64291 Darmstadt, Germany *
9  * Contact: http://dabc.gsi.de *
10  ************************************************************
11  * This software can be used under the GPL license *
12  * agreements as stated in LICENSE.txt file *
13  * which is part of the distribution. *
14  ************************************************************/
15 
16 #ifndef DABC_Port
17 #define DABC_Port
18 
19 #ifndef DABC_ModuleItem
20 #include "dabc/ModuleItem.h"
21 #endif
22 
23 #ifndef DABC_LocalTransport
24 #include "dabc/LocalTransport.h"
25 #endif
26 
27 #ifndef DABC_ConnectionRequest
28 #include "dabc/ConnectionRequest.h"
29 #endif
30 
31 namespace dabc {
32 
33  class PortRef;
34  class Port;
35  class Module;
36  class ModuleAsync;
37  class ModuleSync;
38  class ConnTimer;
39 
47  class Port : public ModuleItem {
48 
49  friend class Module;
50  friend class ModuleAsync;
51  friend class ModuleSync;
52  friend class PortRef;
53  friend class LocalTransport;
54  friend class ConnTimer;
55 
56  public:
57  // TODO: provide meaningful names
59  SignalNone = 0, // port will not produce any events
60  SignalConfirm, // next event can be produced when previous event is received, confirmed and
61  // event consumer performed next operation
62  SignalOperation, // next event can be produced when event consumer performs next operation
63  SignalEvery // every operation will produce event
64  };
65 
66  protected:
67  unsigned fQueueCapacity;
71  std::string fBindName;
72  std::string fRateName;
73  unsigned fMaxLoopLength;
77  std::string fOnError;
78 
79 
81  virtual void ObjectCleanup();
82 
83  virtual void DoStart();
84  virtual void DoStop();
85  virtual void DoCleanup();
86 
87  Port(int kind, Reference parent,
88  const std::string &name,
89  unsigned queuesize);
90  virtual ~Port();
91 
92  void SetQueue(Reference& ref);
93 
94  inline void ConfirmEvent()
96 
99  int GetMaxLoopLength();
100 
102  void ReadPortConfiguration();
103 
105  virtual unsigned NumStartEvents() { return 0; }
106 
109  ConnectionRequest GetConnReq(bool force = false);
110 
112  void RemoveConnReq();
113 
115  unsigned QueueCapacity() const;
116 
117  void Disconnect(bool witherr = false) { fQueue.Disconnect(IsInput(), witherr); }
118 
120  void SetBindName(const std::string &name);
121 
123  std::string GetBindName() const;
124 
126  bool IsConnected() const { return fQueue.IsConnected(); }
127 
130  void ConfigureReconnect(double period = -1, int numtry = -1)
131  {
132  fReconnectPeriod = period;
133  fReconnectLimit = numtry;
134  if ((fReconnectPeriod <= 0) && (fReconnectLimit >= 0)) fReconnectPeriod = 1.; else
135  // if ((fReconnectPeriod > 0) && (fReconnectLimit == -1)) fReconnectLimit = 10; else
136  if (fReconnectPeriod <= 0) SetDoingReconnect(false);
137  }
138 
140  bool TryNextReconnect(bool caused_by_error, bool can_disconnect = true);
141 
142  double GetReconnectPeriod() const { return fReconnectPeriod; }
143 
145  bool IsDoingReconnect() const { return fDoingReconnect; }
146 
147  void SetDoingReconnect(bool on = true) { fDoingReconnect = on; }
148 
150  void SetRateMeter(const Parameter& ref);
151 
153  void SetMaxLoopLength(unsigned cnt) { fMaxLoopLength = cnt; }
154 
156  bool SetSignaling(EventsProducing kind);
157 
160 
163 
174  void ConfigureOnError(const std::string &action = "") { fOnError = action; }
175 
176  public:
177 
178  virtual const char* ClassName() const { return "Port"; }
179 
180  virtual bool IsInput() const { return false; }
181  virtual bool IsOutput() const { return false; }
182  };
183 
184  // __________________________________________________________________________
185 
186 
195  class PortRef : public ModuleItemRef {
197 
198 
199  bool IsInput() const { return GetObject() ? GetObject()->IsInput() : false; }
200 
202  bool IsOutput() const { return GetObject() ? GetObject()->IsOutput() : false; }
203 
205  unsigned QueueCapacity() const { return GetObject() ? GetObject()->QueueCapacity() : 0; }
206 
208  int GetSignalingKind();
209 
211  bool IsConnected();
212 
214  bool Disconnect(bool witherr = false);
215 
218 
220  void ConfigureOnError(const std::string &action = "")
221  { if (GetObject()) GetObject()->ConfigureOnError(action); }
222 
224  void ConfigureReconnect(double period = 1., int numtry = -1)
225  { if (GetObject()) GetObject()->ConfigureReconnect(period, numtry); }
226 
232  ConnectionRequest MakeConnReq(const std::string &url, bool isserver = false);
233  };
234 
235 
236  // =====================================================================================
237 
244  class InputPort : public Port {
245 
246  friend class Module;
247  friend class ModuleAsync;
248  friend class ModuleSync;
249 
250  private:
251  InputPort(Reference parent,
252  const std::string &name,
253  unsigned queuesize);
254 
255  protected:
256 
257  virtual ~InputPort();
258 
259  virtual unsigned NumStartEvents();
260 
262  inline unsigned NumCanRecv() { return fQueue.Size(); }
263 
266 
268  bool QueueFull() { return fQueue.Full(); }
269 
270  Buffer Item(unsigned indx) { return fQueue.Item(indx); }
271 
273  inline bool CanRecv() const { return fQueue.CanRecv(); }
274 
275  //Buffer Recv() { Buffer buf; fQueue.Recv(buf); fRate.SetDouble(buf.GetTotalSize()/1024./1024.); return buf; }
276 
277  Buffer Recv();
278 
281 
283  bool SkipBuffers(unsigned cnt=1);
284 
285  public:
286 
287  virtual const char* ClassName() const { return "InputPort"; }
288 
289  virtual bool IsInput() const { return true; }
290  };
291 
292 
293  // =======================================================================================
294 
301  class OutputPort : public Port {
302 
303  friend class Module;
304  friend class ModuleAsync;
305  friend class ModuleSync;
306 
307  private:
308 
309  bool fSendallFlag; // flag, used by SendToAllOutputs to mark, which output must be used for sending
310 
311  OutputPort(Reference parent,
312  const std::string &name,
313  unsigned queuesize);
314 
315  protected:
316 
317  virtual ~OutputPort();
318 
319  virtual unsigned NumStartEvents();
320 
322  unsigned NumCanSend() const { return fQueue.NumCanSend(); }
323 
325  bool CanSend() const { return fQueue.CanSend(); }
326 
327 // bool Send(dabc::Buffer& buf) { fRate.SetDouble(buf.GetTotalSize()/1024./1024.); return fQueue.Send(buf); }
328 
329  bool Send(dabc::Buffer& buf);
330 
331  public:
332 
333  virtual const char* ClassName() const { return "OutputPort"; }
334 
335  virtual bool IsOutput() const { return true; }
336 
337  };
338 
339  // =======================================================================================
340 
347  class PoolHandle : public Port {
348 
349  friend class Module;
350  friend class ModuleAsync;
351  friend class ModuleSync;
352 
353  private:
354 
356 
357  PoolHandle(Reference parent,
358  Reference pool,
359  const std::string &name,
360  unsigned queuesize);
361 
362  protected:
363 
364 
365  virtual ~PoolHandle();
366 
367  virtual void DoCleanup()
368  {
369  fPool.Release();
370  Port::DoCleanup();
371  }
372 
373  virtual void ObjectCleanup()
374  {
375  fPool.Release();
377  }
378 
379  virtual unsigned NumStartEvents();
380 
381  // inline MemoryPool* Pool() const { return fPool(); }
382 
384  inline unsigned NumCanTake() const { return fQueue.Size(); }
385 
386  inline Buffer Item(unsigned indx) { return fQueue.Item(indx); }
387 
388  inline bool CanTakeBuffer() const
389  {
390  return (QueueCapacity()==0) ? true : fQueue.CanRecv();
391  }
392 
393  Buffer TakeBuffer(BufferSize_t size = 0);
394 
395  unsigned NumRequestedBuffer() const
396  {
397  return fQueue.Size();
398  }
399 
401  {
402  Buffer buf; fQueue.Recv(buf); return buf;
403  }
404 
405  public:
406 
407  virtual const char* ClassName() const { return "PoolHandle"; }
408 
409  virtual bool IsInput() const { return true; }
410  };
411 
412 }
413 
414 #endif
#define DABC_REFERENCE(RefClass, ParentClass, T)
Definition: Reference.h:222
Reference on memory from memory pool.
Definition: Buffer.h:135
Represents command with its arguments.
Definition: Command.h:99
Special timer to reestablish port connections in the module.
Definition: ModuleItem.h:197
Connection request.
Input port.
Definition: Port.h:244
unsigned NumCanRecv()
Defines how many buffers can be received.
Definition: Port.h:262
virtual bool IsInput() const
Definition: Port.h:289
Buffer Item(unsigned indx)
Definition: Port.h:270
Buffer Recv()
Definition: Port.cxx:320
bool CanRecv() const
Returns true if user can get (receive) buffer from the port.
Definition: Port.h:273
void SignalWhenFull()
This method say framework that signal must be issued when queue will be fulled.
Definition: Port.h:280
virtual const char * ClassName() const
Returns class name of the object instance.
Definition: Port.h:287
bool SkipBuffers(unsigned cnt=1)
Remove buffer from the input queue.
Definition: Port.cxx:294
bool QueueFull()
Returns true, when input queue is full and cannot get more buffers.
Definition: Port.h:268
InputPort(Reference parent, const std::string &name, unsigned queuesize)
Definition: Port.cxx:279
virtual ~InputPort()
Definition: Port.cxx:289
BufferSize_t TotalSizeCanRecv()
Defines how many buffers can be received.
Definition: Port.h:265
virtual unsigned NumStartEvents()
Return number of events which should be produced when async module starts.
Definition: Port.cxx:306
Reference on the dabc::LocalTransport
void ConfirmEvent(bool isoutput)
unsigned NumCanSend() const
BufferSize_t TotalBuffersSize() const
void Disconnect(bool isinp, bool witherr=false)
Buffer Item(unsigned indx) const
unsigned Size() const
bool Recv(Buffer &buf)
Transport between two ports on the same node
Base class for user-derived code, implementing event-processing.
Definition: ModuleAsync.h:32
Reference on dabc::ModuleItem class
Definition: ModuleItem.h:116
Base class for module items like ports, timers, pool handles.
Definition: ModuleItem.h:68
int GetType() const
Definition: ModuleItem.h:103
Base class for user-derived code, implementing main loop.
Definition: ModuleSync.h:60
Base for dabc::ModuleSync and dabc::ModuleAsync classes.
Definition: Module.h:42
Output port.
Definition: Port.h:301
bool fSendallFlag
Definition: Port.h:309
OutputPort(Reference parent, const std::string &name, unsigned queuesize)
Definition: Port.cxx:331
virtual unsigned NumStartEvents()
Return number of events which should be produced when async module starts.
Definition: Port.cxx:365
virtual const char * ClassName() const
Returns class name of the object instance.
Definition: Port.h:333
bool CanSend() const
Returns true if user can send get buffer via the port.
Definition: Port.h:325
bool Send(dabc::Buffer &buf)
Definition: Port.cxx:347
unsigned NumCanSend() const
Returns number of buffer which can be put to the queue.
Definition: Port.h:322
virtual bool IsOutput() const
Definition: Port.h:335
virtual ~OutputPort()
Definition: Port.cxx:341
Parameter class
Definition: Parameter.h:163
Handle for pool connection.
Definition: Port.h:347
virtual void ObjectCleanup()
Inherited method, should cleanup everything.
Definition: Port.h:373
virtual void DoCleanup()
Called when module object is cleaned up - should release all references if any.
Definition: Port.h:367
bool CanTakeBuffer() const
Definition: Port.h:388
Reference fPool
Definition: Port.h:355
virtual bool IsInput() const
Definition: Port.h:409
virtual unsigned NumStartEvents()
Return number of events which should be produced when async module starts.
Definition: Port.cxx:401
Buffer TakeRequestedBuffer()
Definition: Port.h:400
virtual const char * ClassName() const
Returns class name of the object instance.
Definition: Port.h:407
unsigned NumCanTake() const
Defines how many buffers can be received.
Definition: Port.h:384
virtual ~PoolHandle()
Definition: Port.cxx:395
Buffer Item(unsigned indx)
Definition: Port.h:386
PoolHandle(Reference parent, Reference pool, const std::string &name, unsigned queuesize)
Definition: Port.cxx:383
unsigned NumRequestedBuffer() const
Definition: Port.h:395
Buffer TakeBuffer(BufferSize_t size=0)
Definition: Port.cxx:416
Reference on the dabc::Port class
Definition: Port.h:195
void ConfigureReconnect(double period=1., int numtry=-1)
Configure reconnect parameters
Definition: Port.h:224
void ConfigureOnError(const std::string &action="")
Configure action in case of error.
Definition: Port.h:220
bool IsOutput() const
Returns true if it is output port.
Definition: Port.h:202
bool IsConnected()
Returns true if port is connected.
Definition: Port.cxx:250
bool IsInput() const
Returns true if it is input port.
Definition: Port.h:199
int GetSignalingKind()
Returns signaling method configured for the port.
Definition: Port.cxx:223
PortRef GetBindPort()
Return reference on the bind port.
Definition: Port.cxx:242
bool Disconnect(bool witherr=false)
Disconnect port
Definition: Port.cxx:233
unsigned QueueCapacity() const
Returns queue capacity of the port.
Definition: Port.h:205
ConnectionRequest MakeConnReq(const std::string &url, bool isserver=false)
Create connection request to specified url.
Definition: Port.cxx:259
Base class for input and output ports.
Definition: Port.h:47
void SetRateMeter(const Parameter &ref)
Set port ratemeter - must be used from module thread.
Definition: Port.cxx:162
unsigned QueueCapacity() const
Method returns actual queue capacity of the port, object mutex is used.
Definition: Port.cxx:101
ConnectionRequest GetConnReq(bool force=false)
Return reference on existing request object.
Definition: Port.cxx:75
Port(int kind, Reference parent, const std::string &name, unsigned queuesize)
Definition: Port.cxx:24
bool fDoingReconnect
true if reconnection is now active
Definition: Port.h:76
bool SubmitCommandToTransport(Command cmd)
Submit command to connected transport.
Definition: Port.cxx:187
bool SetSignaling(EventsProducing kind)
Specifies how often port event will be produced.
Definition: Port.cxx:64
virtual bool IsInput() const
Definition: Port.h:180
void Disconnect(bool witherr=false)
Definition: Port.h:117
virtual unsigned NumStartEvents()
Return number of events which should be produced when async module starts.
Definition: Port.h:105
int GetMaxLoopLength()
Return maximum number of events, which could be processed at once.
Definition: Port.cxx:173
void SetBindName(const std::string &name)
Set name of bind port - when input and output ports should use same transport.
Definition: Port.cxx:107
Parameter fRate
parameter for rate calculations
Definition: Port.h:68
bool TryNextReconnect(bool caused_by_error, bool can_disconnect=true)
Returns true when reconnection should be attempted.
Definition: Port.cxx:194
EventsProducing fSignal
which kinds of signals will be produced
Definition: Port.h:69
unsigned fQueueCapacity
configured capacity of the queue
Definition: Port.h:67
std::string fOnError
that to do in case of error
Definition: Port.h:77
double GetReconnectPeriod() const
Definition: Port.h:142
virtual bool IsOutput() const
Definition: Port.h:181
virtual void DoStop()
Definition: Port.cxx:138
bool IsConnected() const
Method can only be used from thread itself.
Definition: Port.h:126
void ConfigureReconnect(double period=-1, int numtry=-1)
Specify reconnect period and number of reconnect tries If both negative, disable reconnect.
Definition: Port.h:130
virtual void DoStart()
Definition: Port.cxx:133
void SetDoingReconnect(bool on=true)
Definition: Port.h:147
virtual ~Port()
Definition: Port.cxx:40
double fReconnectPeriod
defines how often reconnect for port should be tried, -1 disable reconnect
Definition: Port.h:74
void RemoveConnReq()
Remove connection request - it does not automatically means that port will be disconnected.
Definition: Port.cxx:96
int fReconnectLimit
number of reconnect attempts, default 10
Definition: Port.h:75
unsigned fMaxLoopLength
maximum length of single event-processing loop
Definition: Port.h:73
void ConfirmEvent()
Definition: Port.h:94
virtual void ObjectCleanup()
Inherited method, should cleanup everything.
Definition: Port.cxx:149
std::string GetBindName() const
Returns name of bind port.
Definition: Port.cxx:113
void SetMaxLoopLength(unsigned cnt)
Specifies how many events can be processed at once.
Definition: Port.h:153
LocalTransportRef fQueue
queue with buffers
Definition: Port.h:70
bool IsDoingReconnect() const
Return true if reconnection procedure started for the port.
Definition: Port.h:145
virtual void DoCleanup()
Called when module object is cleaned up - should release all references if any.
Definition: Port.cxx:143
void SetQueue(Reference &ref)
Definition: Port.cxx:120
EventsProducing SignalingKind() const
Returns configured signaling mode.
Definition: Port.h:159
std::string fBindName
name of bind port (when input and output connected to same transport)
Definition: Port.h:71
void ConfigureOnError(const std::string &action="")
Configure action in case of error.
Definition: Port.h:174
void ReadPortConfiguration()
Should be called from constructors of derived classes to read port configuration like queue and so on...
Definition: Port.cxx:46
virtual const char * ClassName() const
Returns class name of the object instance.
Definition: Port.h:178
std::string fRateName
name of rate parameter, which should be assigned to port
Definition: Port.h:72
EventsProducing
Definition: Port.h:58
@ SignalEvery
Definition: Port.h:63
@ SignalOperation
Definition: Port.h:62
@ SignalNone
Definition: Port.h:59
@ SignalConfirm
Definition: Port.h:60
Reference on the arbitrary object
Definition: Reference.h:73
void Release()
Releases reference on the object.
Definition: Reference.cxx:138
Object * GetObject() const
Return pointer on the object.
Definition: Reference.h:129
Event manipulation API.
Definition: api.h:23
uint32_t BufferSize_t
Definition: Buffer.h:32
@ mitOutPort
Definition: ModuleItem.h:31