DABC (Data Acquisition Backbone Core)  2.9.9
MultiplexerModule.cxx
Go to the documentation of this file.
1 // $Id: MultiplexerModule.cxx 4575 2020-08-04 12:44:29Z linev $
2 
3 /************************************************************
4  * The Data Acquisition Backbone Core (DABC) *
5  ************************************************************
6  * Copyright (C) 2009 - *
7  * GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
8  * Planckstr. 1, 64291 Darmstadt, Germany *
9  * Contact: http://dabc.gsi.de *
10  ************************************************************
11  * This software can be used under the GPL license *
12  * agreements as stated in LICENSE.txt file *
13  * which is part of the distribution. *
14  ************************************************************/
15 
16 #include "dabc/MultiplexerModule.h"
17 
19  dabc::ModuleAsync(name, cmd),
20  fQueue(100),
21  fDataRateName()
22 {
23  EnsurePorts(1, 1);
24 
25  fDataRateName = Cfg("DataRateName", cmd).AsStr();
26 
27  if (!fDataRateName.empty())
28  CreatePar(fDataRateName).SetRatemeter(false, 3.).SetUnits("MB");
29 
30  for (unsigned n=0;n<NumInputs();n++) {
32  if (!fDataRateName.empty())
34  }
35 
36  DOUT0("Create multiplexer module %s with %u inputs and %u outputs", name.c_str(), NumInputs(), NumOutputs());
37 }
38 
40 {
41  fQueue.Push(port);
42 
43  if (port >= NumInputs())
44  EOUT("MultiplexerModule %s process inpevent from not existing input %u total %u", GetName(), port, NumInputs());
45 
46  CheckDataSending();
47 }
48 
50 {
51  CheckDataSending();
52 }
53 
55 {
56 // DOUT0("MultiplexerModule %s CheckDataSending queue %u canallsend:%s numout %u",
57 // GetName(), fQueue.Size(), DBOOL(CanSendToAllOutputs()), NumOutputs());
58 
59  while (CanSendToAllOutputs() && (fQueue.Size()>0)) {
60  unsigned id = fQueue.Pop();
61 
62  if (id >= NumInputs()) {
63  EOUT("MultiplexerModule %s wrong input port id %u, use 0", GetName(), id);
64  id = 0;
65  }
66 
67  dabc::Buffer buf = Recv(id);
68 
69  if (buf.null())
70  EOUT("Fail to get buffer from input %u", id);
71 // else
72 // DOUT0("Sending buffer %u to all outputs", buf.GetTotalSize());
73 
74  SendToAllOutputs(buf);
75  }
76 }
77 
78 // ========================================================================================
79 
80 
81 
82 dabc::RepeaterModule::RepeaterModule(const std::string &name, dabc::Command cmd) :
83  dabc::ModuleAsync(name, cmd)
84 {
85  EnsurePorts(1, 1);
86 
87  std::string ratemeter = Cfg("DataRateName", cmd).AsStr();
88 
89  if (!ratemeter.empty()) {
90  CreatePar(ratemeter).SetRatemeter(false, 3.).SetUnits("MB");
91 
92  for (unsigned n=0;n<NumInputs();n++)
93  SetPortRatemeter(InputName(n), Par(ratemeter));
94  }
95 
96  DOUT1("Create dabc::RepeaterModule:: %s", GetName());
97 }
98 
100 {
101  bool isout = (port < NumOutputs());
102 
103  if (isout && !CanSend(port)) return false;
104 
105  dabc::Buffer buf = Recv(port);
106  if (isout) Send(port, buf);
107  else buf.Release();
108 
109  return true;
110 }
111 
113 {
114  if (!CanRecv(port)) return false;
115 
116  dabc::Buffer buf = Recv(port);
117  Send(port, buf);
118 
119  return true;
120 }
Reference on memory from memory pool.
Definition: Buffer.h:135
Represents command with its arguments.
Definition: Command.h:99
Base class for user-derived code, implementing event-processing.
Definition: ModuleAsync.h:32
bool SetPortSignaling(const std::string &name, Port::EventsProducing signal)
Definition: Module.cxx:791
virtual Parameter CreatePar(const std::string &name, const std::string &kind="")
Definition: Module.cxx:129
std::string InputName(unsigned indx=0, bool fullname=false) const
Definition: Module.cxx:188
unsigned NumOutputs() const
Definition: Module.h:141
unsigned NumInputs() const
Definition: Module.h:148
void EnsurePorts(unsigned numinp=0, unsigned numout=0, const std::string &poolname="")
Method ensure that at least specified number of input and output ports will be created.
Definition: Module.cxx:66
bool SetPortRatemeter(const std::string &name, const Parameter &ref)
Definition: Module.cxx:802
virtual void ProcessOutputEvent(unsigned port)
Method called by framework when output event is produced.
virtual void ProcessInputEvent(unsigned port)
Method called by framework when input event is produced.
MultiplexerModule(const std::string &name, dabc::Command cmd=nullptr)
const char * GetName() const
Returns name of the object, thread safe
Definition: Object.h:295
Parameter & SetUnits(const std::string &unit)
Set units field of parameter.
Definition: Parameter.h:256
Parameter & SetRatemeter(bool synchron=false, double interval=1.0)
Converts parameter in ratemeter - all values will be summed up and divided on specified interval.
Definition: Parameter.cxx:365
@ SignalEvery
Definition: Port.h:63
std::string AsStr(const std::string &dflt="") const
Definition: Record.cxx:749
void Release()
Releases reference on the object.
Definition: Reference.cxx:138
bool null() const
Returns true if reference contains nullptr.
Definition: Reference.h:151
virtual bool ProcessSend(unsigned port)
Method called by framework when at least one buffer can be send to output port.
RepeaterModule(const std::string &name, dabc::Command cmd=nullptr)
virtual bool ProcessRecv(unsigned port)
Method called by framework when at least one buffer available in input port.
Parameter Par(const std::string &name) const
Returns reference on worker parameter object.
Definition: Worker.cxx:516
RecordField Cfg(const std::string &name, Command cmd=nullptr) const
Returns configuration field of specified name Configuration value of specified name searched in follo...
Definition: Worker.cxx:521
#define DOUT0(args ...)
Definition: logging.h:156
#define EOUT(args ...)
Definition: logging.h:150
#define DOUT1(args ...)
Definition: logging.h:162
Event manipulation API.
Definition: api.h:23