DABC (Data Acquisition Backbone Core)  2.9.9
DataTransport.h
Go to the documentation of this file.
1 // $Id: DataTransport.h 4470 2020-04-15 12:58:02Z linev $
2 
3 /************************************************************
4  * The Data Acquisition Backbone Core (DABC) *
5  ************************************************************
6  * Copyright (C) 2009 - *
7  * GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
8  * Planckstr. 1, 64291 Darmstadt, Germany *
9  * Contact: http://dabc.gsi.de *
10  ************************************************************
11  * This software can be used under the GPL license *
12  * agreements as stated in LICENSE.txt file *
13  * which is part of the distribution. *
14  ************************************************************/
15 
16 #ifndef DABC_DataTransport
17 #define DABC_DataTransport
18 
19 #ifndef DABC_Transport
20 #include "dabc/Transport.h"
21 #endif
22 
23 #ifndef DABC_MemoryPool
24 #include "dabc/MemoryPool.h"
25 #endif
26 
27 #ifndef DABC_DataIO
28 #include "dabc/DataIO.h"
29 #endif
30 
31 namespace dabc {
32 
// Command class derived from dabc::Command; the DABC_COMMAND macro is given
// the class name and the command name string "DataInputClosed".
// NOTE(review): presumably issued when a data input has been closed - confirm against the sender.
41  class CmdDataInputClosed : public Command {
42  DABC_COMMAND(CmdDataInputClosed, "DataInputClosed");
43  };
44 
// Command class derived from dabc::Command; the DABC_COMMAND macro is given
// the class name and the command name string "DataInputFailed".
// NOTE(review): presumably issued when a data input has failed - confirm against the sender.
45  class CmdDataInputFailed : public Command {
46  DABC_COMMAND(CmdDataInputFailed, "DataInputFailed");
47  };
48 
49 
/** Base class for input transport implementations. */
// NOTE(review): this listing skips several numbered lines of the header, so some
// declarations that belong to this class are not visible here (see notes below).
50  class InputTransport : public Transport {
51 
52  // enum EDataEvents { evCallBack = evntModuleLast };
53 
    // States of the input transport (type of fInpState).
    // NOTE(review): inpInit, inpBegin, inpReady and inpError are used in
    // SuitableStateForStartStop() below, but their enumerators are not visible in this
    // listing (numbers 55, 57, 60, 67 and 68 are skipped) - check the original header.
54  enum EInputStates {
56  inpInitTimeout, // waiting timeout read_size
58  inpSizeCallBack, // wait for call-back with buffer size
59  inpCheckSize, // in this state one should check the returned size argument
61  inpWaitBuffer, // in this state we are waiting for the buffer to be delivered
62  inpCheckBuffer, // here size of buffer will be checked
63  inpHasBuffer, // buffer is ready for use
64  inpCallBack, // in this mode transport waits for call-back
65  inpCompleting, // one needs to complete the operation
66  inpComplitTimeout,// waiting for timeout after Read_Complete
69  inpEnd, // in this state we need to generate EOF buffer and close input
70  inpReconnect, // reconnection state - transport tries to recreate input object
71  inpClosed
72  };
73 
74  protected:
75 
    // NOTE(review): members fInput, fInpState, fCurrentBuf, fPoolRef, fActivateWorkaround
    // and fStopRequested also belong to this class, but their declarations
    // (listing lines 76, 78, 79, 82, 84 and 86) are not shown here.

    /** if true, fInput object must be destroyed */
77  bool fInputOwner;
    /** indicates that input has data, but there is no buffer of required size */
80  unsigned fNextDataSize;
81  unsigned fPoolChangeCounter;
    /** number of extra buffers provided to the transport addon */
83  unsigned fExtraBufs;
    /** when specified, transport tries to reconnect */
85  std::string fReconnect;
87 
88 
    /** Method can be used in custom transport to start pool monitoring */
90  void RequestPoolMonitoring();
91 
    /** Methods activated by Port, when transport starts/stops */
92  virtual bool StartTransport();
93  virtual bool StopTransport();
94 
95  void ChangeState(EInputStates state);
96 
    /** Returns true if the state is considered suitable to stop the transport:
      * fInpState is one of inpInit, inpBegin, inpReady, inpError or inpClosed. */
    // NOTE(review): the declaration line `bool SuitableStateForStartStop()` with its opening
    // brace (listing line 98) is missing from this listing; only the body is shown.
99  return (fInpState == inpInit) ||
100  (fInpState == inpBegin) ||
101  (fInpState == inpReady) ||
102  (fInpState == inpError) ||
103  (fInpState == inpClosed);
104  }
105 
106  void CloseInput();
107 
108  virtual void TransportCleanup();
109 
    /** Method called by framework when at least one buffer can be sent to output port */
110  virtual bool ProcessSend(unsigned port);
111 
    /** Method called by framework when at least one buffer is available in pool handle */
112  virtual bool ProcessBuffer(unsigned pool);
113 
    /** Method called by framework when timer event is produced */
114  virtual void ProcessTimerEvent(unsigned timer);
115 
    /** Main method where commands are executed */
116  virtual int ExecuteCommand(Command cmd);
117 
118  public:
119 
    /** Constructor. The input object may be omitted (inp = 0).
      * NOTE(review): owner presumably initializes fInputOwner, i.e. whether the
      * transport destroys the input object - confirm in the implementation. */
120  InputTransport(dabc::Command cmd, const PortRef& inpport, DataInput* inp = 0, bool owner = false);
121  virtual ~InputTransport();
122 
    /** Assign input object, set addon if exists */
124  void SetDataInput(DataInput* inp, bool owner);
125 
    /** Set URL used to reconnect the data input */
127  void EnableReconnect(const std::string &reconn);
128 
129  // in an implementation the user can get informed when something changed in the memory pool;
    // the default implementation is empty
130  virtual void ProcessPoolChanged(MemoryPool* pool) {}
131 
132  // This method MUST be called by transport, when Read_Start returns di_CallBack
133  // It is the only way to "restart" the event loop in the transport
134  void Read_CallBack(unsigned compl_res = di_Ok);
135  };
136 
137 // ======================================================================================
138 
/** Base class for output transport implementations. */
// NOTE(review): this listing skips several numbered lines of the header, so some
// declarations that belong to this class are not visible here (see notes below).
146  class OutputTransport : public Transport {
147 
149 
    // States of the output transport (EOutputStates, the type of fOutState).
    // NOTE(review): the opening line of the enumeration and the enumerators
    // outFinishWriting, outError and outClosed (used in StateAsStr() below) are not
    // visible in this listing - check the original header.
151  outReady, // initial state, next buffer can be written
152  outInitTimeout, // state when timeout should be completed before next check can be done
153  outWaitCallback, // waiting for callback which informs when writing can be started
154  outStartWriting, // we can apply buffer to start writing
155  outWaitFinishCallback, // waiting for callback when buffer writing is finished
158  outClosing, // closing transport
160  outRetry // waiting for retry
161  };
162 
163  protected:
164 
167 
    // NOTE(review): further members used below (e.g. fOutState) and those named in the
    // generated documentation (fCurrentBuf, fStopRequested) are declared on listing lines
    // that are not shown here.

    /** if retry option enabled, transport will try to reinit output */
171  double fRetryPeriod;
172 
173  void SetDataOutput(DataOutput* out, bool owner);
174 
175  void CloseOutput();
176 
    /** Returns true if the state is considered suitable to stop the transport:
      * fOutState is one of outReady, outError or outClosed. */
    // NOTE(review): the declaration line `bool SuitableStateForStartStop()` (listing
    // line 178) is missing from this listing; only the body is shown.
179  {
180  return (fOutState == outReady) ||
181  (fOutState == outError) ||
182  (fOutState == outClosed);
183  }
184 
    /** Returns state in string form; "undefined" when fOutState matches no known state */
186  std::string StateAsStr() const
187  {
188  switch (fOutState) {
189  case outReady: return "Ready";
190  case outInitTimeout: return "InitTimeout";
191  case outWaitCallback: return "WaitCallback";
192  case outStartWriting: return "StartWriting";
193  case outWaitFinishCallback: return "WaitFinishCallback";
194  case outFinishWriting: return "FinishWriting";
195  case outError: return "Error";
196  case outClosing: return "Closing";
197  case outClosed: return "Closed";
198  case outRetry: return "Retry";
199  }
200  return "undefined";
201  }
202 
203  void ChangeState(EOutputStates state);
204 
205  void CloseOnError();
206 
    /** Methods activated by Port, when transport starts/stops */
207  virtual bool StartTransport();
208  virtual bool StopTransport();
209 
210  virtual void TransportCleanup();
211 
212  virtual void ProcessEvent(const EventId&);
213 
    /** Method called by framework when at least one buffer is available in input port */
214  virtual bool ProcessRecv(unsigned port);
    /** Method called by framework when timer event is produced */
215  virtual void ProcessTimerEvent(unsigned);
216 
    /** Main method where commands are executed */
217  virtual int ExecuteCommand(dabc::Command cmd);
218 
219  public:
220 
221  OutputTransport(dabc::Command cmd, const PortRef& outport, DataOutput* out, bool owner);
222  virtual ~OutputTransport();
223 
    /** Fires the evCallBack event with arg as event argument.
      * NOTE(review): presumably to be called by the output implementation when an
      * asynchronous operation completes - confirm against ProcessEvent(). */
224  void Write_CallBack(unsigned arg) { FireEvent(evCallBack, arg); }
225  };
226 
227 };
228 
229 #endif
Reference on memory from memory pool.
Definition: Buffer.h:135
Base class for input transport implementations.
Definition: DataTransport.h:41
DABC_COMMAND(CmdDataInputClosed, "DataInputClosed")
DABC_COMMAND(CmdDataInputFailed, "DataInputFailed")
Represents command with its arguments.
Definition: Command.h:99
Interface for implementing any kind of data input.
Definition: DataIO.h:61
Interface for implementing any kind of data output.
Definition: DataIO.h:158
Buffer fCurrentBuf
currently used buffer
Definition: DataTransport.h:79
void EnableReconnect(const std::string &reconn)
Set URL used to reconnect the data input.
virtual void TransportCleanup()
void Read_CallBack(unsigned compl_res=di_Ok)
virtual int ExecuteCommand(Command cmd)
Main method where commands are executed.
unsigned fExtraBufs
number of extra buffers provided to the transport addon
Definition: DataTransport.h:83
void ChangeState(EInputStates state)
void RequestPoolMonitoring()
Method can be used in custom transport to start pool monitoring.
unsigned fNextDataSize
indicates that input has data, but there is no buffer of the required size
Definition: DataTransport.h:80
virtual void ProcessTimerEvent(unsigned timer)
Method called by framework when timer event is produced.
void SetDataInput(DataInput *inp, bool owner)
Assign input object, set addon if exists.
std::string fReconnect
when specified, transport tries to reconnect
Definition: DataTransport.h:85
DataInput * fInput
input object
Definition: DataTransport.h:76
virtual void ProcessPoolChanged(MemoryPool *pool)
EInputStates fInpState
state of transport
Definition: DataTransport.h:78
virtual bool StartTransport()
Methods activated by Port, when transport starts/stops.
virtual bool StopTransport()
unsigned fPoolChangeCounter
Definition: DataTransport.h:81
bool fInputOwner
if true, fInput object must be destroyed
Definition: DataTransport.h:77
bool fActivateWorkaround
special flag for hadaq transport
Definition: DataTransport.h:84
InputTransport(dabc::Command cmd, const PortRef &inpport, DataInput *inp=0, bool owner=false)
virtual bool ProcessSend(unsigned port)
Method called by framework when at least one buffer can be sent to output port.
virtual bool ProcessBuffer(unsigned pool)
Method called by framework when at least one buffer is available in pool handle.
bool fStopRequested
if true transport will be stopped when next suitable state is achieved
Definition: DataTransport.h:86
MemoryPoolRef fPoolRef
Definition: DataTransport.h:82
bool SuitableStateForStartStop()
Returns true if the state is considered suitable to stop the transport.
Definition: DataTransport.h:98
Reference on dabc::MemoryPool class
Definition: MemoryPool.h:245
Base class for output transport implementations.
void ChangeState(EOutputStates state)
bool SuitableStateForStartStop()
Returns true if the state is considered suitable to stop the transport.
Buffer fCurrentBuf
currently used buffer
virtual int ExecuteCommand(dabc::Command cmd)
Main method where commands are executed.
std::string StateAsStr() const
Returns state in string form.
void SetDataOutput(DataOutput *out, bool owner)
virtual void ProcessEvent(const EventId &)
bool fStopRequested
if true transport will be stopped when next suitable state is achieved
virtual void TransportCleanup()
double fRetryPeriod
if retry option enabled, transport will try to reinit output
OutputTransport(dabc::Command cmd, const PortRef &outport, DataOutput *out, bool owner)
EOutputStates fOutState
void Write_CallBack(unsigned arg)
virtual bool StartTransport()
Methods activated by Port, when transport starts/stops.
virtual bool ProcessRecv(unsigned port)
Method called by framework when at least one buffer is available in input port.
virtual bool StopTransport()
virtual void ProcessTimerEvent(unsigned)
Method called by framework when timer event is produced.
Reference on the dabc::Port class
Definition: Port.h:195
Base class for transport implementations.
Definition: Transport.h:37
bool FireEvent(uint16_t evid)
Definition: Worker.h:341
Event manipulation API.
Definition: api.h:23
@ evntModuleLast
Definition: ModuleItem.h:51
@ di_Ok
Definition: DataIO.h:38
Event structure, exchanged between DABC threads.
Definition: Thread.h:70