DABC (Data Acquisition Backbone Core)  2.9.9
TerminalModule.cxx
Go to the documentation of this file.
1 // $Id: TerminalModule.cxx 4696 2021-02-23 14:21:24Z linev $
2 
3 /************************************************************
4  * The Data Acquisition Backbone Core (DABC) *
5  ************************************************************
6  * Copyright (C) 2009 - *
7  * GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
8  * Planckstr. 1, 64291 Darmstadt, Germany *
9  * Contact: http://dabc.gsi.de *
10  ************************************************************
11  * This software can be used under the GPL license *
12  * agreements as stated in LICENSE.txt file *
13  * which is part of the distribution. *
14  ************************************************************/
15 
16 #include "hadaq/TerminalModule.h"
17 
18 #include <cstdlib>
19 #include <unistd.h>
20 #include <sys/types.h>
21 #include <vector>
22 
23 #include "dabc/Manager.h"
24 
25 #include "hadaq/CombinerModule.h"
26 #include "hadaq/UdpTransport.h"
27 
29  dabc::ModuleAsync(name, cmd),
30  fTotalBuildEvents(0),
31  fTotalRecvBytes(0),
32  fTotalDiscEvents(0),
33  fTotalDroppedData(0),
34  fDoClear(false),
35  fDoShow(true),
36  fLastTm(),
37  fCalibr(),
38  fServPort(0),
39  fLastServCmd(),
40  fServReqRunning(false),
41  fFilePort(1),
42  fLastFileCmd(),
43  fFileReqRunning(false)
44 {
45  double period = Cfg("period", cmd).AsDouble(1);
46  fServPort = Cfg("servport", cmd).AsInt(-1);
47  fFilePort = Cfg("fileport", cmd).AsInt(-1);
48 
49  fDoClear = Cfg("clear", cmd).AsBool(false);
50  fDoShow = Cfg("show", cmd).AsBool(true);
51  fRingSize = Cfg("showtrig", cmd).AsInt(10);
52 
53  fModuleName = Cfg("mname", cmd).AsStr("Combiner");
54 
56 
57  CreateTimer("update", period);
58 
59  fLastTm.Reset();
60 
61  fWorkerHierarchy.Create("Term");
62 
64  item.SetField(dabc::prop_kind, "Text");
65  item.SetField("value", "Init");
66 
67  item = fWorkerHierarchy.CreateHChild("Output");
68  item.SetField(dabc::prop_kind, "Text");
69  item.SetField("value", "");
70 
71  item = fWorkerHierarchy.CreateHChild("Data");
72  item.SetField("value", "");
73  item.SetField("_hidden", "true");
74 
75  Publish(fWorkerHierarchy, "$CONTEXT$/Terminal");
76 }
77 
79 {
80  if (cmd.IsName("GetTransportStatistic")) {
81  if (cmd.GetBool("_file_req")) {
82  fFileReqRunning = false;
83  if (cmd.GetResult() == dabc::cmd_true) fLastFileCmd = cmd;
84  else fLastFileCmd.Release();
85  } else
86  if (cmd.GetBool("_serv_req")) {
87  fServReqRunning = false;
88  if (cmd.GetResult() == dabc::cmd_true) fLastServCmd = cmd;
89  else fLastServCmd.Release();
90  }
91  return true;
92  }
93 
94  if (!cmd.IsName("GetCalibrState")) return true;
95 
96  unsigned n = cmd.GetUInt("indx");
97  if (n < fCalibr.size()) {
98  fCalibr[n].trb = cmd.GetUInt("trb");
99  fCalibr[n].tdcs = cmd.GetField("tdcs").AsUIntVect();
100  fCalibr[n].progress = cmd.GetInt("progress");
101  fCalibr[n].state = cmd.GetStr("state");
102  fCalibr[n].send_request = false;
103  }
104 
105  return true;
106 }
107 
109 {
110  if (fDoShow) {
112  if (fDoClear) {
113  auto res = system("clear");
114  (void) res; // just avoid compiler warnings
115  }
116  }
117 }
118 
120 {
121  if (r<1e4) return dabc::format("%6.1f ev/s",r);
122  return dabc::format("%5.1f kev/s",r/1e3);
123 }
124 
126 {
127  dabc::ModuleRef m = dabc::mgr.FindModule(fModuleName);
128 
129  hadaq::CombinerModule *comb = dynamic_cast<hadaq::CombinerModule*> (m());
130  if (!comb) return;
131 
132  double delta = fLastTm.SpentTillNow(true);
133 
134  delta = (delta > 0.01) ? 1./delta : 0.;
135 
136  double rate1 = (comb->fAllBuildEvents > fTotalBuildEvents) ? (comb->fAllBuildEvents - fTotalBuildEvents) * delta : 0.,
137  rate2 = (comb->fAllRecvBytes > fTotalRecvBytes) ? (comb->fAllRecvBytes - fTotalRecvBytes) * delta : 0.,
138  rate3 = (comb->fAllDiscEvents > fTotalDiscEvents) ? (comb->fAllDiscEvents - fTotalDiscEvents) * delta : 0.,
139  rate4 = (comb->fAllDroppedData > fTotalDroppedData) ? (comb->fAllDroppedData - fTotalDroppedData) * delta : 0.;
140 
141  if (fDoShow) {
142  if (delta > 0) {
143  unsigned nlines = comb->fCfg.size() + 4;
144  if (fServPort>=0) nlines++;
145  if (fFilePort>=0) nlines++;
146  for (unsigned n=0;n<nlines;n++)
147  fputs("\033[A\033[2K",stdout);
148  rewind(stdout);
149  auto res = ftruncate(1,0);
150  (void) res; // just avoid compiler warnings
151  } else {
152  fprintf(stdout,"HADAQ terminal info:\n"
153  " disc - all discarded packets in the UDP receiver\n"
154  " err32 - 32-byte header does not match with 32-bytes footer\n"
155  " errbits - error bits not 0 and not 1\n"
156  " bufs - number of produced buffers\n"
157  " qu - input queue of combiner module\n"
158  " drop - dropped subevents (received by combiner but not useful)\n"
159  " lost - lost subevents (never seen by combiner)\n"
160  " trigger - last trigger values (after masking them in combiner module)\n"
161  " progr - progress of TDC calibration\n");
162  }
163  }
164 
165  fTotalBuildEvents = comb->fAllBuildEvents;
166  fTotalRecvBytes = comb->fAllRecvBytes;
167  fTotalDiscEvents = comb->fAllDiscEvents;
168  fTotalDroppedData = comb->fAllDroppedData;
169 
170  std::string s;
171 
172  s += "---------------------------------------------\n";
173  s += dabc::format("Events:%8s Rate:%12s Data: %10s Rate:%6.3f MB/s\n",
174  dabc::number_to_str(fTotalBuildEvents, 1).c_str(),
175  rate_to_str(rate1).c_str(),
176  dabc::size_to_str(fTotalRecvBytes).c_str(), rate2/1024./1024.);
177  s += dabc::format("Dropped:%7s Rate:%12s Data: %10s Rate:%6.3f MB/s",
178  dabc::number_to_str(fTotalDiscEvents, 1).c_str(),
179  rate_to_str(rate3).c_str(),
180  dabc::size_to_str(fTotalDroppedData).c_str(), rate4/1024./1024.);
181 
182  if (comb->fAllFullDrops>0)
183  s += dabc::format(" Total:%s\n", dabc::size_to_str(comb->fAllFullDrops, 1).c_str());
184  else
185  s += "\n";
186 
187  if (fServPort>=0) {
188  if (fLastServCmd.null()) {
189  s += dabc::format("Server: missing, failed or not found on %s/Output%d\n", fModuleName.c_str(), fServPort);
190  } else {
191  s += dabc::format("Server: clients:%d inpqueue:%d cansend:%s\n", fLastServCmd.GetInt("NumClients"), fLastServCmd.GetInt("NumCanRecv"), fLastServCmd.GetStr("NumCanSend").c_str());
192  }
193  } else if (comb->fBNETsend || comb->fBNETrecv) {
194  s += comb->fBnetInfo;
195  s += "\n";
196  }
197 
198  if (fFilePort>=0) {
199  if (fLastFileCmd.null()) {
200  s += dabc::format("File: missing, failed or not found on %s/Output%d\n", fModuleName.c_str(), fFilePort);
201  } else {
202  std::string state = fLastFileCmd.GetStr("OutputState");
203  if (state!="Ready") state = std::string(" State: ") + state;
204  else state.clear();
205  s += dabc::format("File: %8s Curr: %10s Data: %10s Name: %s%s\n",
206  dabc::number_to_str(fLastFileCmd.GetDouble("OutputFileEvents"),1).c_str(),
207  dabc::size_to_str(fLastFileCmd.GetDouble("OutputCurrFileSize")).c_str(),
208  dabc::size_to_str(fLastFileCmd.GetDouble("OutputFileSize")).c_str(),
209  fLastFileCmd.GetStr("OutputCurrFileName").c_str(),
210  state.c_str());
211  }
212  } else if (comb->fBNETsend || comb->fBNETrecv) {
213  s += comb->fBnetStat;
214  s += "\n";
215  }
216 
217  if (comb->fCfg.size() != fCalibr.size())
218  fCalibr.resize(comb->fCfg.size(), CalibrRect());
219 
220  bool istdccal = false;
221  for (unsigned n=0;n<comb->fCfg.size();n++)
222  if (comb->fCfg[n].fCalibr.length()>0) {
223  istdccal = true;
224  if (!fCalibr[n].send_request) {
225  dabc::Command cmd("GetCalibrState");
226  cmd.SetInt("indx",n);
227  cmd.SetReceiver(comb->fCfg[n].fCalibr);
228  dabc::mgr.Submit(Assign(cmd));
229  fCalibr[n].send_request = true;
230  }
231  }
232 
233  s += "inp port pkt data MB/s disc err32 bufs qu errbits drop lost";
234  if (istdccal) s += " TRB TDC progr state";
235  if (fRingSize>0) s += " triggers";
236  s += "\n";
237 
238  bool isready = true;
239 
240  dabc::Hierarchy ditem = fWorkerHierarchy.GetHChild("Data");
241  ditem.SetField("BuildEvents", fTotalBuildEvents);
242  ditem.SetField("BuildData", fTotalRecvBytes);
243  ditem.SetField("EventsRate", rate1);
244  ditem.SetField("DataRate", rate2);
245  ditem.SetField("LostEvents", fTotalDiscEvents);
246  ditem.SetField("LostData", fTotalDroppedData);
247  ditem.SetField("LostEventsRate", rate3);
248  ditem.SetField("LostDataRate", rate4);
249 
250  std::vector<int64_t> ports, recvbytes, inperrbits, inpdrop, inplost;
251  std::vector<double> inprates;
252 
253  for (unsigned n=0;n<comb->fCfg.size();n++) {
254 
255  std::string sbuf = dabc::format("%2u", n);
256 
257  hadaq::CombinerModule::InputCfg &cfg = comb->fCfg[n];
258 
260 
261  if (info==0) {
262  sbuf.append(" missing transport-info ");
263  fCalibr[n].lastrecv = 0;
264  } else {
265 
266  double rate = (info->fTotalRecvBytes > fCalibr[n].lastrecv) ? (info->fTotalRecvBytes - fCalibr[n].lastrecv) * delta : 0.;
267 
268  sbuf.append(dabc::format(" %5d %7s %9s %7.3f %6s %6s %6s",
269  info->fNPort,
270  dabc::number_to_str(info->fTotalRecvPacket,1).c_str(),
271  dabc::size_to_str(info->fTotalRecvBytes).c_str(),
272  rate/1024./1024.,
273  info->GetDiscardString().c_str(),
274  info->GetDiscard32String().c_str(),
276  fCalibr[n].lastrecv = info->fTotalRecvBytes;
277 
278  ports.push_back(info->fNPort);
279  recvbytes.push_back(info->fTotalRecvBytes);
280  inprates.push_back(rate);
281  }
282 
283  sbuf.append(dabc::format(" %3d %6s %5s %5s",
284  cfg.fNumCanRecv,
285  dabc::number_to_str(cfg.fErrorBitsCnt,0).c_str(),
286  dabc::number_to_str(cfg.fDroppedTrig,0).c_str(),
287  dabc::number_to_str(cfg.fLostTrig,0).c_str()));
288 
289  inperrbits.push_back(cfg.fErrorBitsCnt);
290  inpdrop.push_back(cfg.fDroppedTrig);
291  inplost.push_back(cfg.fLostTrig);
292 
293  if (cfg.fCalibr.length() > 0) {
294  sbuf.append(dabc::format(" 0x%04x", fCalibr[n].trb));
295 
296  std::string tdc = " [";
297  for (unsigned j=0;j<fCalibr[n].tdcs.size();j++) {
298  if (j>0) tdc.append(",");
299  if ((j>3) && (fCalibr[n].tdcs.size()>4)) { tdc.append(" ..."); break; }
300  tdc.append(dabc::format("%04x", (unsigned) fCalibr[n].tdcs[j]));
301  }
302  tdc.append("]");
303  while (tdc.length()<27) tdc.append(" ");
304  sbuf.append(tdc);
305 
306  sbuf.append(dabc::format(" %3d %10s", fCalibr[n].progress, fCalibr[n].state.c_str()));
307 
308  if (fCalibr[n].state.find("Ready")!=0) isready = false;
309  }
310 
311  s += sbuf;
312 
313  if (fRingSize>0) s += " " + cfg.TriggerRingAsStr(fRingSize);
314 
315  s += "\n";
316  }
317 
318  if (fDoShow)
319  fprintf(stdout, "%s", s.c_str());
320 
321  fWorkerHierarchy.GetHChild("State").SetField("value", isready ? "Ready" : "Init");
322  fWorkerHierarchy.GetHChild("Output").SetField("value", s);
323  ditem.SetField("inputs", ports);
324  ditem.SetField("inprecv", recvbytes);
325  ditem.SetField("inprates", inprates);
326  ditem.SetField("inperrbits", inperrbits);
327  ditem.SetField("inpdrop", inpdrop);
328  ditem.SetField("inplost", inplost);
329 
330  if (!fFileReqRunning && (fFilePort>=0)) {
331  dabc::Command cmd("GetTransportStatistic");
332  cmd.SetStr("_for_the_port_", dabc::format("Output%d", fFilePort));
333  cmd.SetBool("_file_req", true);
334  m.Submit(Assign(cmd));
335  }
336 
337  if (!fServReqRunning && (fServPort>=0)) {
338  dabc::Command cmd("GetTransportStatistic");
339  cmd.SetStr("_for_the_port_", dabc::format("Output%d", fServPort));
340  cmd.SetBool("_serv_req", true);
341  m.Submit(Assign(cmd));
342  }
343 
344 }
Represents command with its arguments.
Definition: Command.h:99
unsigned GetUInt(const std::string &name, unsigned dflt=0) const
Definition: Command.h:148
bool SetStr(const std::string &name, const char *value)
Definition: Command.h:134
bool SetBool(const std::string &name, bool v)
Definition: Command.h:141
bool SetInt(const std::string &name, int v)
Definition: Command.h:138
std::string GetStr(const std::string &name, const std::string &dflt="") const
Definition: Command.h:136
void Release()
Method used to clean command - all internal data will be cleaned, command container will be released.
Definition: Command.cxx:198
Command & SetReceiver(const std::string &itemname)
These methods prepare the command so that one can submit it to the manager like: dabc::mgr....
Definition: Command.h:264
int GetResult() const
Definition: Command.h:174
bool GetBool(const std::string &name, bool dflt=false) const
Definition: Command.h:142
int GetInt(const std::string &name, int dflt=0) const
Definition: Command.h:139
Represents objects hierarchy of remote (or local) DABC process.
Definition: Hierarchy.h:285
Hierarchy CreateHChild(const std::string &name, bool allowslahes=false, bool sortorder=false)
Create child item in hierarchy with specified name. If allowslahes is enabled, instead of subfolders item...
Definition: Hierarchy.h:392
Hierarchy GetHChild(const std::string &name, bool allowslahes=false, bool force=false, bool sortorder=false)
Return child, creating it with full subfolder if necessary. If force is specified, missing children and folders...
Definition: Hierarchy.cxx:944
void Create(const std::string &name, bool withmutex=false)
Create top-level object with specified name.
Definition: Hierarchy.cxx:934
ModuleRef FindModule(const std::string &name)
Definition: Manager.cxx:2018
Reference on dabc::Module class
Definition: Module.h:275
unsigned CreateTimer(const std::string &name, double period_sec=-1., bool synchron=false)
Definition: Module.cxx:109
std::vector< uint64_t > AsUIntVect() const
Definition: Record.cxx:631
bool AsBool(bool dflt=false) const
Definition: Record.cxx:477
std::string AsStr(const std::string &dflt="") const
Definition: Record.cxx:749
int64_t AsInt(int64_t dflt=0) const
Definition: Record.cxx:501
double AsDouble(double dflt=0.) const
Definition: Record.cxx:549
RecordField GetField(const std::string &name) const
Definition: Record.h:510
bool SetField(const std::string &name, const RecordField &v)
Definition: Record.h:516
bool IsName(const char *name) const
Returns true if object name is the same as specified one.
Definition: Reference.cxx:177
bool Submit(Command cmd)
Definition: Worker.cxx:1139
Hierarchy fWorkerHierarchy
place for publishing of worker parameters
Definition: Worker.h:168
RecordField Cfg(const std::string &name, Command cmd=nullptr) const
Returns configuration field of specified name. Configuration value of specified name is searched in follo...
Definition: Worker.cxx:521
virtual bool Publish(const Hierarchy &h, const std::string &path)
Definition: Worker.cxx:1075
std::vector< InputCfg > fCfg
all input-dependent configurations
bool fBNETsend
indicate that combiner used as BNET sender
uint64_t fAllBuildEvents
number of build events
uint64_t fAllFullDrops
number of complete drops
bool fBNETrecv
indicate that second-level event building is performed
std::string fBnetInfo
info for showing of bnet sender
std::string fBnetStat
general-purpose statistic in text form
virtual bool ReplyCommand(dabc::Command cmd)
Reimplement this method to react to a command reply. Return true if command can be destroyed by framewor...
std::string fModuleName
name of hadaq combiner module
virtual void ProcessTimerEvent(unsigned timer)
Method called by framework when timer event is produced.
virtual void BeforeModuleStart()
TerminalModule(const std::string &name, dabc::Command cmd=nullptr)
bool fDoShow
perform output
int fRingSize
number of last IDs shown
dabc::TimeStamp fLastTm
std::string rate_to_str(double r)
bool fDoClear
clear terminal when module starts
#define HADAQ_RINGSIZE
Event manipulation API.
Definition: api.h:23
void SetDebugLevel(int level=0)
Definition: logging.cxx:468
ManagerRef mgr
Definition: Manager.cxx:42
std::string format(const char *fmt,...)
Definition: string.cxx:49
std::string size_to_str(unsigned long sz, int prec=1, int select=0)
Convert size to string of form like 4.2 GB or 3.7 MB.
Definition: string.cxx:75
std::string number_to_str(unsigned long num, int prec=1, int select=0)
Convert number to string of form like 4.2G or 3.7M.
Definition: string.cxx:107
const char * prop_kind
Definition: Hierarchy.cxx:29
@ cmd_true
Definition: Command.h:38
void Reset()
Set time stamp value to null.
Definition: timing.h:134
int fNumCanRecv
Number of buffers that can be received.
std::string fCalibr
name of calibration module, used only in terminal
std::string TriggerRingAsStr(int RingSize)
unsigned fErrorBitsCnt
number of subevents with non-zero error bits
unsigned fLostTrig
number of lost triggers (never received by the combiner)
void * fInfo
Direct pointer on transport info, used only for debugging.
unsigned fDroppedTrig
number of dropped triggers (received but dropped by the combiner)
int fNPort
UDP port number
Definition: UdpTransport.h:43
uint64_t fTotalRecvPacket
Definition: UdpTransport.h:45
uint64_t fTotalProducedBuffers
Definition: UdpTransport.h:52
std::string GetDiscard32String()
Definition: UdpTransport.h:78
uint64_t fTotalRecvBytes
Definition: UdpTransport.h:50
std::string GetDiscardString()
Definition: UdpTransport.h:68