DABC (Data Acquisition Backbone Core)  2.9.9
Transport.cxx
Go to the documentation of this file.
1 // $Id: Transport.cxx 4476 2020-04-15 14:12:38Z linev $
2 
3 /************************************************************
4  * The Data Acquisition Backbone Core (DABC) *
5  ************************************************************
6  * Copyright (C) 2009 - *
7  * GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
8  * Planckstr. 1, 64291 Darmstadt, Germany *
9  * Contact: http://dabc.gsi.de *
10  ************************************************************
11  * This software can be used under the GPL license *
12  * agreements as stated in LICENSE.txt file *
13  * which is part of the distribution. *
14  ************************************************************/
15 
16 #include "dabc/Transport.h"
17 
18 #include <cstdlib>
19 
20 #include "dabc/Manager.h"
21 #include "dabc/Publisher.h"
22 
23 const unsigned dabc::AcknoledgeQueueLength = 2;
24 
25 
26 std::string dabc::Transport::MakeName(const PortRef& inpport, const PortRef& outport)
27 {
28  std::string name = inpport.ItemName();
29  if (name.empty()) name = outport.ItemName();
30  name += "_Transport";
31 
32  size_t pos;
33  while ((pos=name.find("/")) != std::string::npos)
34  name[pos] = '_';
35 
36  return std::string("#") + name;
37 }
38 
39 
40 dabc::Transport::Transport(dabc::Command cmd, const PortRef& inpport, const PortRef& outport) :
41  ModuleAsync(MakeName(inpport, outport)),
42  fTransportDevice(),
43  fTransportState(stInit),
44  fIsInputTransport(false),
45  fIsOutputTransport(false),
46  fTransportInfoName(),
47  fTransportInfoInterval(-1.),
48  fTransportInfoTm()
49 {
50  std::string poolname;
51 
52  if (!inpport.null()) {
53 
54  unsigned trqueue = inpport.Cfg("TransportQueue", cmd).AsUInt(1);
55  if (trqueue < inpport.QueueCapacity()) trqueue = inpport.QueueCapacity();
56 
57  poolname = inpport.Cfg(dabc::xmlPoolName, cmd).AsStr(poolname);
58 
59  CreateOutput("Output", trqueue);
60 
61  SetPortLoopLength(OutputName(), 1);
62 
63  fTransportInfoName = inpport.InfoParName();
64 
65  fIsInputTransport = true;
66  }
67 
68  if (!outport.null()) {
69 
70  unsigned trqueue = outport.Cfg("TransportQueue", cmd).AsUInt(1);
71  if (trqueue < outport.QueueCapacity()) trqueue = outport.QueueCapacity();
72 
73  poolname = outport.Cfg(dabc::xmlPoolName, cmd).AsStr(poolname);
74 
75  CreateInput("Input", trqueue);
76 
77  SetPortLoopLength(InputName(), 1);
78 
79  if (fTransportInfoName.empty())
80  fTransportInfoName = outport.InfoParName();
81 
82  fIsOutputTransport = true;
83  }
84 
85  if (!fTransportInfoName.empty()) {
86  fTransportInfoInterval = 1;
87  fTransportInfoTm.GetNow();
88  }
89 
90 
91  if (fIsInputTransport && poolname.empty()) poolname = dabc::xmlWorkPool;
92 
93 // DOUT0("Create transport inp %s out %s poolname %s", DBOOL(fIsInputTransport), DBOOL(fIsOutputTransport), poolname.c_str());
94 
95  if (!poolname.empty() && (NumPools()==0)) {
96  // TODO: one should be able to configure if transport use pool requests or not
97 
98  // for output transport one not need extra memory, just link to pool for special cases like verbs
99  CreatePoolHandle(poolname, fIsInputTransport ? 10 : 0);
100  }
101 }
102 
104 {
105  DOUT3("Transport %s DESTRUCTOR", ItemName().c_str());
106 }
107 
109 {
110  DOUT3("Transport %s CLEANUP", ItemName().c_str());
111 
112  // first let transport to cleanup itself
113  TransportCleanup();
114 
115  // than release device reference (if any)
116  fTransportDevice.Release();
117 }
118 
119 
121 {
122  if (fTransportInfoName.empty() || (fTransportInfoInterval<=0)) return false;
123 
124  return fTransportInfoTm.Expired(fTransportInfoInterval);
125 }
126 
127 void dabc::Transport::ProvideInfo(int lvl, const std::string &info)
128 {
129  if (fTransportInfoName.empty()) return;
130 
131  InfoParameter par = dabc::mgr.FindPar(fTransportInfoName);
132 
133  fTransportInfoTm.GetNow();
134 
135  if (par.null()) return;
136 
137  par.SetInfo(info);
138 }
139 
140 
141 void dabc::Transport::ProcessConnectionActivated(const std::string &name, bool on)
142 {
143  // ignore connect/disconnect from pool handles
144  if (IsValidPool(FindPool(name))) return;
145 
146  DOUT5("$$$$$$ Transport %s %p Port %s Activated %s", GetName(), this, name.c_str(), DBOOL(on));
147 
148  if (on) {
149  if ((GetTransportState()==stInit) || (GetTransportState()==stStopped)) {
150  DOUT2("Connection %s activated in transport %s - start it", name.c_str(), GetName());
151 
152  if (StartTransport()) {
153  fTransportState = stRunning;
154  } else {
155  fTransportState = stError;
156  DeleteThis();
157  }
158  } else {
159  DOUT2("Transport %s is running, ignore start message from port %s", GetName(), name.c_str());
160  }
161 
162  } else {
163  if (GetTransportState()==stRunning) {
164  DOUT2("Connection %s deactivated in transport %s - stop it", name.c_str(), GetName());
165  if (StopTransport()) {
166  fTransportState = stStopped;
167  } else {
168  fTransportState = stError;
169  DeleteThis();
170  }
171  }
172  }
173 }
174 
175 void dabc::Transport::ProcessConnectEvent(const std::string &name, bool on)
176 {
177  DOUT5("$$$$$$ Transport %s %p %s event port %s\n", GetName(), this, on ? "connect" : "DISCONNECT", name.c_str());
178 
179  // ignore connect event
180  if (on) return;
181 
182  if (IsInputTransport() && (name == OutputName())) {
183  DOUT2("Transport %s port %s is disconnected - automatic transport destroyment is started", GetName(), name.c_str());
184  DeleteThis();
185  return;
186  }
187 
188  if (IsOutputTransport() && (name == InputName())) {
189  DOUT2("Transport %s port %s is disconnected - automatic transport destroyment is started", GetName(), name.c_str());
190  DeleteThis();
191  return;
192  }
193 
194 }
195 
196 void dabc::Transport::CloseTransport(bool witherr)
197 {
198  DisconnectAllPorts(witherr);
199  DeleteThis();
200 }
201 
202 int dabc::Transport::ExecuteCommand(Command cmd)
203 {
204  if (cmd.IsName("CloseTransport")) {
205  CloseTransport(cmd.GetBool("IsError", true));
206  return cmd_true;
207  } else
208  if (cmd.IsName(dabc::CmdGetBinary::CmdName()) && (cmd.GetStr("Kind")=="transport.json")) {
209  dabc::Record info;
210  info.CreateRecord(GetName());
211  info.SetField("IsInput", IsInputTransport());
212  info.SetField("IsOutput", IsOutputTransport());
213  cmd.SetStr("StringReply", info.SaveToJson());
214  return cmd_true;
215  }
216 
217  return ModuleAsync::ExecuteCommand(cmd);
218 }
219 
221 {
222  if (IsRunning()) return true;
223 
224  DOUT3("Start transport %s", GetName());
225 
226  fAddon.Notify("StartTransport");
227 
228  return ModuleAsync::Start();
229 }
230 
232 {
233  if (!IsRunning()) return true;
234 
235  DOUT3("Stop transport %s", GetName());
236 
237  fAddon.Notify("StopTransport");
238 
239  return ModuleAsync::Stop();
240 }
241 
242 
Represents command with its arguments.
Definition: Command.h:99
Parameter FindPar(const std::string &parname)
Definition: Manager.cxx:2069
virtual int ExecuteCommand(Command cmd)
Main method where commands are executed.
Definition: Module.h:232
bool Stop()
Stops execution of the module code.
Definition: Module.cxx:249
bool Start()
Starts execution of the module code.
Definition: Module.cxx:240
std::string ItemName(bool compact=true) const
Produce string, which can be used as name argument in dabc::mgr.FindItem(name) call.
Definition: Object.cxx:1076
bool SetField(const std::string &name, const RecordField &v)
Definition: Record.h:516
std::string SaveToJson(unsigned mask=0)
Store record in JSON form.
Definition: Record.cxx:1658
virtual void CreateRecord(const std::string &name)
Definition: Record.cxx:1674
Transport(dabc::Command cmd, const PortRef &inpport=0, const PortRef &outport=0)
Definition: Transport.cxx:40
virtual bool StopTransport()
Definition: Transport.cxx:231
virtual bool StartTransport()
Methods activated by Port, when transport starts/stops.
Definition: Transport.cxx:220
virtual void ProcessConnectionActivated(const std::string &name, bool on)
Method called when module on other side is started.
Definition: Transport.cxx:141
virtual int ExecuteCommand(Command cmd)
Main method where commands are executed.
Definition: Transport.cxx:202
virtual void CloseTransport(bool witherr=false)
Definition: Transport.cxx:196
virtual void ProcessConnectEvent(const std::string &name, bool on)
Method called by framework when connection state of the item is changed.
Definition: Transport.cxx:175
virtual void ModuleCleanup()
Reimplemented method from module.
Definition: Transport.cxx:108
static std::string MakeName(const PortRef &inpport, const PortRef &outport)
Definition: Transport.cxx:26
void ProvideInfo(int lvl, const std::string &info)
Method provides transport info to specified parameter.
Definition: Transport.cxx:127
virtual ~Transport()
Definition: Transport.cxx:103
bool InfoExpected() const
Returns true when next info can be provided If info parameter was configured, one could use it regula...
Definition: Transport.cxx:120
#define DOUT2(args ...)
Definition: logging.h:170
#define DOUT5(args ...)
Definition: logging.h:188
#define DOUT3(args ...)
Definition: logging.h:176
#define DBOOL(arg)
Definition: logging.h:191
const unsigned AcknoledgeQueueLength
Definition: Transport.cxx:23
const char * xmlWorkPool
Definition: Object.cxx:46
ManagerRef mgr
Definition: Manager.cxx:42
const char * xmlPoolName
Definition: Object.cxx:45
@ cmd_true
Definition: Command.h:38