DABC (Data Acquisition Backbone Core)  2.9.9
LocalTransport.h
Go to the documentation of this file.
1 // $Id: LocalTransport.h 4210 2019-02-12 19:55:03Z linev $
2 
3 /************************************************************
4  * The Data Acquisition Backbone Core (DABC) *
5  ************************************************************
6  * Copyright (C) 2009 - *
7  * GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
8  * Planckstr. 1, 64291 Darmstadt, Germany *
9  * Contact: http://dabc.gsi.de *
10  ************************************************************
11  * This software can be used under the GPL license *
12  * agreements as stated in LICENSE.txt file *
13  * which is part of the distribution. *
14  ************************************************************/
15 
16 #ifndef DABC_LocalTransport
17 #define DABC_LocalTransport
18 
19 #ifndef DABC_Worker
20 #include "dabc/Worker.h"
21 #endif
22 
23 #ifndef DABC_BuffersQueue
24 #include "dabc/BuffersQueue.h"
25 #endif
26 
27 
28 namespace dabc {
29 
30  class Port;
31  class LocalTransportRef;
32 
40  class LocalTransport : public Object {
41 
42  friend class Port;
43  friend class LocalTransportRef;
44 
45  protected:
46 
48  bool fWithMutex;
49 
51  unsigned fOutId;
53  unsigned fSignalOut;
54 
56  unsigned fInpId;
58  unsigned fSignalInp;
59 
60  unsigned fConnected;
61 
64 
65  enum { MaskInp = 0x1, MaskOut = 0x2, MaskConn = 0x3 };
66 
67  LocalTransport(unsigned capacity, bool withmutex);
68 
69  virtual ~LocalTransport();
70 
71  void SetConnected(bool isinp) { LockGuard lock(QueueMutex()); fConnected |= isinp ? MaskInp : MaskOut; }
72 
73  void ConfirmEvent(bool isoutput);
74 
75  void PortActivated(int itemkind, bool on);
76 
77  void CleanupQueue();
78 
79  bool IsConnected() const
80  {
81  LockGuard lock(QueueMutex());
82  return (fConnected == MaskConn);
83  }
84 
86  unsigned NumCanSend() const
87  {
88  LockGuard lock(QueueMutex());
89  if (!fQueue.Full()) return fQueue.Capacity() - fQueue.Size();
90  // when queue is full and transport in non-blocking state, one buffer can be add (oldest will be lost)
92  }
93 
97  bool CanSend() const { return NumCanSend() > 0; }
98 
99  bool Send(Buffer& buf);
100 
101  bool CanRecv() const { LockGuard lock(QueueMutex()); return !fQueue.Empty(); }
102 
103  bool Recv(Buffer& buf);
104 
105  void SignalWhenFull();
106 
107  inline Mutex* QueueMutex() const { return fWithMutex ? ObjectMutex() : 0; }
108 
109  inline void EnableMutex() { fWithMutex = true; }
110 
111  // no need for mutex - capacity is not changed until destructor call
112  unsigned Capacity() const { LockGuard lock(QueueMutex()); return fQueue.Capacity(); }
113 
114  unsigned Size() const { LockGuard lock(QueueMutex()); return fQueue.Size(); }
115 
117 
118  unsigned Full() const { LockGuard lock(QueueMutex()); return fQueue.Full(); }
119 
120  Buffer Item(unsigned indx) const { LockGuard lock(QueueMutex()); return fQueue.Item(indx); }
121 
122  void Disconnect(bool isinp, bool witherr = false);
123 
124  public:
125 
126  static int ConnectPorts(Reference port1ref, Reference port2ref, Command cmd = nullptr);
127  };
128 
129  // ________________________________________________________________________
130 
137  class LocalTransportRef : public Reference {
139 
140  bool IsConnected() const { return GetObject() ? GetObject()->IsConnected() : false; }
141 
142  unsigned NumCanSend() const { return GetObject() ? GetObject()->NumCanSend() : 1; }
143 
144  bool CanSend() const { return GetObject() ? GetObject()->CanSend() : true; }
145 
146  bool Send(Buffer& buf) { if (GetObject()) return GetObject()->Send(buf); buf.Release(); return true; }
147 
148  bool CanRecv() const { return GetObject() ? GetObject()->CanRecv() : false; }
149 
150  bool Recv(Buffer& buf) { return GetObject() ? GetObject()->Recv(buf) : false; }
151 
152  void SignalWhenFull() { if (GetObject()) GetObject()->SignalWhenFull(); }
153 
154  void Disconnect(bool isinp, bool witherr = false) { if (GetObject()) GetObject()->Disconnect(isinp, witherr); Release(); }
155 
156  void PortActivated(int itemkind, bool on) { if (GetObject()) GetObject()->PortActivated(itemkind, on); }
157 
158  void ConfirmEvent(bool isoutput) { if (GetObject()) GetObject()->ConfirmEvent(isoutput); }
159 
160  unsigned Size() const { return GetObject() ? GetObject()->Size() : 0; }
161 
162  BufferSize_t TotalBuffersSize() const { return GetObject() ? GetObject()->TotalBuffersSize() : 0; }
163 
164  Buffer Item(unsigned indx) const { return GetObject() ? GetObject()->Item(indx) : Buffer(); }
165 
166  bool Full() const { return GetObject() ? GetObject()->Full() : false; }
167 
168  bool SubmitCommandTo(bool to_input, Command cmd)
169  {
170  if (!GetObject()) return false;
171  return to_input ? GetObject()->fInp.Submit(cmd) : GetObject()->fOut.Submit(cmd);
172  }
173  };
174 
175 
176 }
177 
178 #endif
#define DABC_REFERENCE(RefClass, ParentClass, T)
Definition: Reference.h:222
Reference on memory from memory pool.
Definition: Buffer.h:135
Queue of buffers
Definition: BuffersQueue.h:39
bool Empty() const
Definition: BuffersQueue.h:85
BufferSize_t TotalBuffersSize() const
Definition: BuffersQueue.h:98
unsigned Size() const
Definition: BuffersQueue.h:79
unsigned Capacity() const
Definition: BuffersQueue.h:81
Buffer Item(unsigned n) const
Returns reference on the Buffer in the queue, one can create any kind of buffer copies from it.
Definition: BuffersQueue.h:93
bool Full() const
Definition: BuffersQueue.h:83
Represents command with its arguments.
Definition: Command.h:99
Reference on the dabc::LocalTransport
bool SubmitCommandTo(bool to_input, Command cmd)
void ConfirmEvent(bool isoutput)
unsigned NumCanSend() const
bool Send(Buffer &buf)
BufferSize_t TotalBuffersSize() const
void Disconnect(bool isinp, bool witherr=false)
Buffer Item(unsigned indx) const
unsigned Size() const
void PortActivated(int itemkind, bool on)
bool Recv(Buffer &buf)
Transport between two ports on the same node
void Disconnect(bool isinp, bool witherr=false)
bool fBlockWhenConnected
should queue block when input port connected, default true
bool Recv(Buffer &buf)
void SetConnected(bool isinp)
Buffer Item(unsigned indx) const
unsigned Full() const
bool CanSend() const
Returns true when send operation will add buffer into the queue When queue is not connected,...
bool fBlockWhenUnconnected
should queue block when input port not connected, default false
static int ConnectPorts(Reference port1ref, Reference port2ref, Command cmd=nullptr)
BufferSize_t TotalBuffersSize() const
bool IsConnected() const
unsigned Size() const
unsigned NumCanSend() const
How many buffers can be add to the queue.
unsigned Capacity() const
void ConfirmEvent(bool isoutput)
LocalTransport(unsigned capacity, bool withmutex)
Mutex * QueueMutex() const
void PortActivated(int itemkind, bool on)
bool Send(Buffer &buf)
Lock guard for posix mutex.
Definition: threads.h:127
posix pthread mutex
Definition: threads.h:61
Base class for most of the DABC classes.
Definition: Object.h:116
Mutex * ObjectMutex() const
Returns mutex, used for protection of Object data members.
Definition: Object.h:190
Base class for input and output ports.
Definition: Port.h:47
Reference on the arbitrary object
Definition: Reference.h:73
void Release()
Releases reference on the object.
Definition: Reference.cxx:138
Object * GetObject() const
Return pointer on the object.
Definition: Reference.h:129
Reference on dabc::Worker
Definition: Worker.h:466
Event manipulation API.
Definition: api.h:23
uint32_t BufferSize_t
Definition: Buffer.h:32