DABC (Data Acquisition Backbone Core)  2.9.9
CombinerModule.cxx
Go to the documentation of this file.
1 // $Id: CombinerModule.cxx 4747 2021-03-23 14:49:44Z linev $
2 
3 /************************************************************
4  * The Data Acquisition Backbone Core (DABC) *
5  ************************************************************
6  * Copyright (C) 2009 - *
7  * GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
8  * Planckstr. 1, 64291 Darmstadt, Germany *
9  * Contact: http://dabc.gsi.de *
10  ************************************************************
11  * This software can be used under the GPL license *
12  * agreements as stated in LICENSE.txt file *
13  * which is part of the distribution. *
14  ************************************************************/
15 
16 #include "hadaq/CombinerModule.h"
17 
18 #include <cmath>
19 #include <iostream>
20 #include <sys/types.h>
21 #include <unistd.h>
22 #include <cstdio>
23 #include <cstdlib>
24 
25 #include "dabc/Manager.h"
26 
27 #include "hadaq/UdpTransport.h"
28 
// Constructor: creates the combiner module, resets all counters and reads the
// module configuration through Cfg(..., cmd).
//   name - module name, forwarded to dabc::ModuleAsync
//   cmd  - creation command, forwarded to dabc::ModuleAsync and used as source for Cfg() lookups
// Besides plain member initialisation it creates the "FlushTimer" (and, in BNET mode,
// the "BnetTimer"), the rate parameters, the mode-specific command definitions and
// publishes the parameters in the worker hierarchy.
29 hadaq::CombinerModule::CombinerModule(const std::string &name, dabc::Command cmd) :
30  dabc::ModuleAsync(name, cmd),
31  fCfg(),
32  fOut(),
33  fFlushCounter(0),
34  fIsTerminating(false),
35  fRunToOracle(false),
36  fCheckTag(true),
37  fFlushTimeout(0.),
38  fBnetFileCmd(),
39  fEvnumDiffStatistics(true)
40 {
// NOTE(review): presumably guarantees at least one output port using the default work pool - confirm in dabc::Module
41  EnsurePorts(0, 1, dabc::xmlWorkPool);
42 
// user item used by StartEventsBuilding()/ProcessUserEvent() to re-trigger event building
43  fSpecialItemId = CreateUserItem("BuildEvents");
44  fSpecialFired = false;
45  fLastEventRate = 0.;
46  fBldProfiler.Reserve(50);
47 
// per-run statistics
48  fRunRecvBytes = 0;
49  fRunBuildEvents = 0;
50  fRunDiscEvents = 0;
51  fRunDroppedData = 0;
52  fRunTagErrors = 0;
53  fRunDataErrors = 0;
54 
// statistics accumulated over the whole module lifetime
55  fAllRecvBytes = 0;
56  fAllBuildEvents = 0;
57  fAllBuildEventsLimit = 0;
58  fAllDiscEvents = 0;
59  fAllDroppedData = 0;
60  fAllFullDrops = 0;
61  fMaxProcDist = 0;
62 
63  for (unsigned i = 0; i < HADAQ_NEVTIDS; i++)
64  fEventIdCount[i] = 0;
65 
// event builder id: taken from "NodeId", otherwise derived from the manager node id
66  fEBId = Cfg("NodeId", cmd).AsInt(-1);
67  if (fEBId<0) fEBId = dabc::mgr.NodeId()+1; // hades eb ids start with 1
68 
// BNET (builder network) operation mode and its parameters
69  fBNETsend = Cfg("BNETsend", cmd).AsBool(false);
70  fBNETrecv = Cfg("BNETrecv", cmd).AsBool(false);
71  fBNETbunch = Cfg("EB_EVENTS", cmd).AsInt(16);
72  fBNETNumRecv = Cfg("BNET_NUMRECEIVERS", cmd).AsInt(1);
73  fBNETNumSend = Cfg("BNET_NUMSENDERS", cmd).AsInt(1);
74 
75  fExtraDebug = Cfg("ExtraDebug", cmd).AsBool(true);
76 
77  fCheckTag = Cfg("CheckTag", cmd).AsBool(true);
78 
79  fSkipEmpty = Cfg("SkipEmpty", cmd).AsBool(true);
80 
81  fBNETCalibrDir = Cfg("CalibrDir", cmd).AsStr();
82  fBNETCalibrPackScript = Cfg("CalibrPack", cmd).AsStr();
83 
84  fEpicsRunNumber = 0;
85 
// 0xffffffff is used throughout this file as "no trigger number seen yet"
86  fLastTrigNr = 0xffffffff;
87  fMaxHadaqTrigger = 0;
88  fTriggerRangeMask = 0;
89 
90  if (fBNETrecv || fBNETsend)
91  fRunNumber = 0; // ignore data without valid run id at beginning!
92  else
93  fRunNumber = hadaq::CreateRunId(); // runid from configuration time.
94 
// trigger number range; the mask is range-1
// NOTE(review): the mask only selects a contiguous bit range if the configured range is a power of two - confirm
95  fMaxHadaqTrigger = Cfg(hadaq::xmlHadaqTrignumRange, cmd).AsUInt(0x1000000);
96  fTriggerRangeMask = fMaxHadaqTrigger-1;
97  DOUT1("HADAQ %s module using maxtrigger 0x%x, rangemask:0x%x", GetName(), fMaxHadaqTrigger, fTriggerRangeMask);
98  fEvnumDiffStatistics = Cfg(hadaq::xmlHadaqDiffEventStats, cmd).AsBool(true);
99 
// a tolerance of -1 (the default) selects a quarter of the trigger number range
100  fTriggerNrTolerance = Cfg(hadaq::xmlHadaqTriggerTollerance, cmd).AsInt(-1);
101  if (fTriggerNrTolerance == -1) fTriggerNrTolerance = fMaxHadaqTrigger / 4;
102  fEventBuildTimeout = Cfg(hadaq::xmlEvtbuildTimeout, cmd).AsDouble(20.0); // 20 seconds configure this optionally from xml later
103  fAllBuildEventsLimit = Cfg(hadaq::xmlMaxNumBuildEvt, cmd).AsUInt(0);
104  fHadesTriggerType = Cfg(hadaq::xmlHadesTriggerType, cmd).AsBool(false);
105  fHadesTriggerHUB = Cfg(hadaq::xmlHadesTriggerHUB, cmd).AsUInt(0x8800);
106 
107  std::string ratesprefix = "Hadaq";
108 
// one InputCfg entry per input port; "resort" is read from the port configuration
109  for (unsigned n = 0; n < NumInputs(); n++) {
110  fCfg.emplace_back();
111  fCfg[n].Reset(true);
112  fCfg[n].fResort = FindPort(InputName(n)).Cfg("resort").AsBool(false);
113  if (fCfg[n].fResort) DOUT0("Do resort on input %u",n);
114  }
115 
116  fFlushTimeout = Cfg(dabc::xmlFlushTimeout, cmd).AsDouble(1.);
117 
118  // provide timeout with period/2, but trigger flushing after 3 counts
119  // this will lead to effective flush time between FlushTimeout and FlushTimeout*1.5
// when flushing is disabled (timeout <= 0) the timer still runs with a 1 s period, see ProcessTimerEvent()
120  CreateTimer("FlushTimer", (fFlushTimeout > 0) ? fFlushTimeout/2. : 1.);
121 
122  //CreatePar("RunId");
123  //Par("RunId").SetValue(fRunNumber); // to communicate with file components
124 
125  fRunInfoToOraFilename = dabc::format("eb_runinfo2ora_%d.txt",fEBId);
126  // TODO: optionally set this name
127  fPrefix = Cfg("FilePrefix", cmd).AsStr("no");
128  fRunToOracle = Cfg("Runinfo2ora", cmd).AsBool(false);
129 
// names of the rate/info parameters, all prefixed with "Hadaq"
130  fDataRateName = ratesprefix + "Data";
131  fEventRateName = ratesprefix + "Events";
132  fLostEventRateName = ratesprefix + "LostEvents";
133  fDataDroppedRateName = ratesprefix + "DroppedData";
134  fInfoName = ratesprefix + "Info";
135 
136  CreatePar(fDataRateName).SetRatemeter(false, 3.).SetUnits("MB");
137  CreatePar(fEventRateName).SetRatemeter(false, 3.).SetUnits("Ev");
138  CreatePar(fLostEventRateName).SetRatemeter(false, 3.).SetUnits("Ev");
139  CreatePar(fDataDroppedRateName).SetRatemeter(false, 3.).SetUnits("MB");
140 
// counters accumulated between two timer events and pushed into the ratemeters in ProcessTimerEvent()
141  fDataRateCnt = fEventRateCnt = fLostEventRateCnt = fDataDroppedRateCnt = 0;
142 
// command definitions depend on the operation mode: BNET receiver, BNET sender or standalone
143  if (fBNETrecv) {
144  CreatePar("RunFileSize").SetUnits("MB").SetFld(dabc::prop_kind,"rate").SetFld("#record", true);
145  CreatePar("LtsmFileSize").SetUnits("MB").SetFld(dabc::prop_kind,"rate").SetFld("#record", true);
146  CreateCmdDef("BnetFileControl").SetField("_hidden", true);
147  } else if (fBNETsend) {
148  CreateCmdDef("BnetCalibrControl").SetField("_hidden", true);
149  CreateCmdDef("BnetCalibrRefresh").SetField("_hidden", true);
150  } else {
151  CreateCmdDef("StartHldFile")
152  .AddArg("filename", "string", true, "file.hld")
153  .AddArg(dabc::xml_maxsize, "int", false, 1500)
154  .SetArgMinMax(dabc::xml_maxsize, 1, 5000);
155  CreateCmdDef("StopHldFile");
156  CreateCmdDef("RestartHldFile");
157  }
158 
159  CreatePar(fInfoName, "info").SetSynchron(true, 2., false).SetDebugLevel(2);
160 
// a module named "Combiner" is published under the fixed path "HadaqCombiner", any other under its own name
161  if (IsName("Combiner"))
162  PublishPars("$CONTEXT$/HadaqCombiner");
163  else
164  PublishPars(dabc::format("$CONTEXT$/%s", GetName()));
165 
166  fWorkerHierarchy.SetField("_player", "DABC.HadaqDAQControl");
167 
168  if (fBNETsend) fWorkerHierarchy.SetField("_bnet", "sender");
169  if (fBNETrecv) {
170  fWorkerHierarchy.SetField("_bnet", "receiver");
171  fWorkerHierarchy.SetField("build_events", 0);
172  fWorkerHierarchy.SetField("build_data", 0);
173  fWorkerHierarchy.SetField("discard_events", 0);
174  }
175 
// in BNET mode a 1 s timer drives the status update and a "State" child item is created
176  if (fBNETsend || fBNETrecv) {
177  CreateTimer("BnetTimer", 1.); // check BNET values
178  dabc::Hierarchy item = fWorkerHierarchy.CreateHChild("State");
179  item.SetField(dabc::prop_kind, "Text");
180  item.SetField("value", "Init");
181  }
182 
183  fNumReadBuffers = 0;
184 }
185 
186 
188 {
// Only emits a debug message; the release statements below are commented out,
// the equivalent work is done in the cleanup body that follows (fOut.Close().Release(), fCfg[n].Reset()).
189  DOUT3("hadaq::CombinerModule::DTOR..does nothing now!.");
190  //fOut.Close().Release();
191  //fCfg.clear();
192 }
193 
195 {
196  DOUT0("hadaq::CombinerModule::ModuleCleanup()");
197  fIsTerminating = true;
198  StoreRunInfoStop(true); // run info with exit mode
199  fOut.Close().Release();
200 
201  for (unsigned n=0;n<fCfg.size();n++)
202  fCfg[n].Reset();
203 
204  DOUT5("hadaq::CombinerModule::ModuleCleanup() after fCfg[n].Reset()");
205 
206 // DOUT0("First %06x Last %06x Num %u Time %5.2f", firstsync, lastsync, numsync, tm2-tm1);
207 // if (numsync>0)
208 // DOUT0("Step %5.2f rate %5.2f sync/s", (lastsync-firstsync + 0.) / numsync, (numsync + 0.) / (tm2-tm1));
209 }
210 
211 
212 void hadaq::CombinerModule::SetInfo(const std::string &info, bool forceinfo)
213 {
214 // DOUT0("SET INFO: %s", info.c_str());
215 
217 
218  if (!fInfoName.empty()) par = Par(fInfoName);
219 
220  par.SetValue(info);
221  if (forceinfo)
222  par.FireModified();
223 }
224 
225 void hadaq::CombinerModule::ProcessTimerEvent(unsigned timer)
226 {
227  if (TimerName(timer) == "BnetTimer") {
228  UpdateBnetInfo();
229  return;
230  }
231 
232  if ((fFlushTimeout > 0) && (++fFlushCounter > 2)) {
233  fFlushCounter = 0;
234  dabc::ProfilerGuard grd(fBldProfiler, "flush", 30);
235  FlushOutputBuffer();
236  }
237 
238  fTimerCalls++;
239 
240  Par(fDataRateName).SetValue(fDataRateCnt/1024./1024.);
241  Par(fEventRateName).SetValue(fEventRateCnt);
242  Par(fLostEventRateName).SetValue(fLostEventRateCnt);
243  Par(fDataDroppedRateName).SetValue(fDataDroppedRateCnt/1024./1024.);
244 
245  fDataRateCnt = fEventRateCnt = fLostEventRateCnt = fDataDroppedRateCnt = 0;
246 
247  fLastEventRate = Par(fEventRateName).Value().AsDouble();
248 
249  // invoke event building, if necessary - reinjects events
250  StartEventsBuilding();
251 
252  if ((fAllBuildEventsLimit > 0) && (fAllBuildEvents >= fAllBuildEventsLimit)) {
253  FlushOutputBuffer();
254  fAllBuildEventsLimit = 0; // invoke only once
256  }
257 }
258 
260 {
261  int cnt = 10;
262  if (fLastEventRate > 1000) cnt = 20;
263  if (fLastEventRate > 30000) cnt = 50;
264 
265  while (IsRunning() && (cnt-- > 0)) {
266  // no need to continue
267  if (!BuildEvent()) return;
268  }
269 
270  if (!fSpecialFired) {
271  fSpecialFired = true;
272  // DOUT0("Fire user event %d item %u", dabc::evntUser, fSpecialItemId);
273  FireEvent(dabc::evntUser, fSpecialItemId);
274  }
275 }
276 
277 void hadaq::CombinerModule::ProcessUserEvent(unsigned item)
278 {
279  if (fSpecialItemId == item) {
280  // DOUT0("Get user event");
281  fSpecialFired = false;
282  } else {
283  EOUT("Get wrong user event");
284  }
285 
286  StartEventsBuilding();
287 }
288 
290 {
291  std::string info = dabc::format(
292  "HADAQ %s starts. Runid:%d, numinp:%u, numout:%u flush:%3.1f",
293  GetName(), (int) fRunNumber, NumInputs(), NumOutputs(), fFlushTimeout);
294 
295  SetInfo(info, true);
296  DOUT0(info.c_str());
297  fLastDropTm.GetNow();
298 
299  fLastProcTm.GetNow();
300  fLastBuildTm.GetNow();
301 
302  // direct addon pointers can be used for terminal printout
303  for (unsigned ninp=0;ninp<fCfg.size();ninp++) {
304  fCfg[ninp].fQueueCapacity = InputQueueCapacity(ninp);
305  if (fBNETrecv) continue;
306  dabc::Command cmd("GetHadaqTransportInfo");
307  cmd.SetInt("id", ninp);
308  SubmitCommandToTransport(InputName(ninp), Assign(cmd));
309  }
310 }
311 
313 {
314  std::string info = dabc::format(
315  "HADAQ %s stopped. CompleteEvents:%d, BrokenEvents:%d, DroppedData:%d, RecvBytes:%d, data errors:%d, tag errors:%d",
316  GetName(), (int) fAllBuildEvents, (int) fAllDiscEvents , (int) fAllDroppedData, (int) fAllRecvBytes ,(int) fRunDataErrors ,(int) fRunTagErrors);
317 
318  SetInfo(info, true);
319  DOUT0(info.c_str());
320 
321  // when BNET receiver module stopped, lead to application stop
322  if (fBNETrecv) dabc::mgr.StopApplication();
323 }
324 
325 
327 {
328  if (fOut.IsEmpty() || !fOut.IsBuffer()) {
329  DOUT3("FlushOutputBuffer has no buffer to flush");
330  return false;
331  }
332 
333  int dest = DestinationPort(fLastTrigNr);
334  if (dest<0) {
335  if (!CanSendToAllOutputs()) return false;
336  } else {
337  if (!CanSend(dest)) return false;
338  }
339 
340  dabc::Buffer buf = fOut.Close();
341 
342  // if (fBNETsend) DOUT0("%s FLUSH buffer", GetName());
343 
344  if (dest<0)
345  SendToAllOutputs(buf);
346  else
347  Send(dest, buf);
348 
349  fFlushCounter = 0; // indicate that next flush timeout one not need to send buffer
350 
351  return true;
352 }
353 
355 {
// NOTE(review): the definition line is not visible here; presumably this is the body of
// UpdateBnetInfo(), which ProcessTimerEvent() calls for the "BnetTimer".
// Refreshes the BNET status fields published through fWorkerHierarchy:
//  - receiver mode: requests transport statistics from the outputs and reports input queue sizes
//  - sender mode: derives per-input ("hub") rate, state, quality and progress plus a node summary
// Finally stores the profiler text in fBnetStat and resets the call counters.
356  fBldProfiler.MakeStatistic();
357 
358  dabc::ProfilerGuard grd(fBldProfiler, "info", 20);
359 
360  if (fBNETrecv) {
361 
// a pending file command which timed out is answered with cmd_false
362  if (!fBnetFileCmd.null() && fBnetFileCmd.IsTimedout()) fBnetFileCmd.Reply(dabc::cmd_false);
363 
// output 1: when the output is absent or the request cannot be submitted, report "NoFile"
364  dabc::Command cmd("GetTransportStatistic");
365  if ((NumOutputs() < 2) || !SubmitCommandToTransport(OutputName(1), Assign(cmd))) {
366  fWorkerHierarchy.SetField("runid", 0);
367  fWorkerHierarchy.SetField("runsize", 0);
368  fWorkerHierarchy.SetField("runname", std::string());
369  fWorkerHierarchy.SetField("runprefix", std::string());
370  fWorkerHierarchy.SetField("state", "NoFile");
371  fWorkerHierarchy.GetHChild("State").SetField("value", "NoFile");
372  fWorkerHierarchy.SetField("quality", 0.5); // not very bad - just inform that file not written
373  }
374 
// output 2: same request flagged with "#ltsm"; the ltsm fields are cleared when it is not available
375  dabc::Command cmd2("GetTransportStatistic");
376  cmd2.SetBool("#ltsm", true);
377  if ((NumOutputs() < 3) || !SubmitCommandToTransport(OutputName(2), Assign(cmd2))) {
378  fWorkerHierarchy.SetField("ltsmid", 0);
379  fWorkerHierarchy.SetField("ltsmsize", 0);
380  fWorkerHierarchy.SetField("ltsmname", std::string());
// NOTE(review): "ltsmrefix" looks like a typo of "ltsmprefix" (compare "runprefix" above) - verify against the readers of this field
381  fWorkerHierarchy.SetField("ltsmrefix", std::string());
382  Par("LtsmFileSize").SetValue(0.);
383  }
384 
// output 0: same request flagged with "#mbs"
385  dabc::Command cmd3("GetTransportStatistic");
386  cmd3.SetBool("#mbs", true);
387  if ((NumOutputs() < 1) || !SubmitCommandToTransport(OutputName(0), Assign(cmd3))) {
388  fWorkerHierarchy.SetField("mbsinfo", "");
389  }
390 
// number of buffers which can be received on each input, as text (fBnetInfo) and as the "queues" field
391  std::string info = "BnetRecv: ";
392  std::vector<int64_t> qsz;
393  for (unsigned n=0;n<NumInputs();++n) {
394  unsigned len = NumCanRecv(n);
395  info.append(" ");
396  info.append(std::to_string(len));
397  qsz.push_back(len);
398  }
399  fBnetInfo = info;
400 
401  fWorkerHierarchy.SetField("queues", qsz);
402  fWorkerHierarchy.SetField("ninputs", NumInputs());
403  fWorkerHierarchy.SetField("build_events", fAllBuildEvents);
404  fWorkerHierarchy.SetField("build_data", fAllRecvBytes);
405  fWorkerHierarchy.SetField("discard_events", fAllDiscEvents);
406  }
407 
408  if (fBNETsend) {
// node summary: state and quality of the worst input, smallest non-zero progress over all inputs
409  std::string node_state = "";
410  double node_quality = 1;
411  int node_progress = 0;
412 
413  std::vector<uint64_t> hubs, ports, hubs_progress, recv_sizes, recv_bufs, hubs_dropev, hubs_lostev;
414  std::vector<std::string> calibr, hubs_state, hubs_info;
415  std::vector<double> hubs_quality, hubs_rates;
416  for (unsigned n=0;n<fCfg.size();n++) {
417  InputCfg &inp = fCfg[n];
418 
419  hubs.push_back(inp.fHubId);
420  ports.push_back(inp.fUdpPort);
421  calibr.push_back(inp.fCalibr);
422 
// pending input data: queued buffers plus the remainder of the buffer currently iterated
423  unsigned nbuf = NumCanRecv(n);
424  uint64_t bufsz = TotalSizeCanRecv(n);
425 
426  if (inp.fIter.IsData()) {
427  nbuf++;
428  bufsz += inp.fIter.remained_size();
429  }
430 
431  recv_bufs.push_back(nbuf);
432  recv_sizes.push_back(bufsz);
433 
// request the calibration state only when a calibration module is named and no request is pending
434  if (!inp.fCalibrReq && !inp.fCalibr.empty()) {
435  dabc::Command cmd("GetCalibrState");
436  cmd.SetInt("indx",n);
437  cmd.SetReceiver(inp.fCalibr);
438  dabc::mgr.Submit(Assign(cmd));
439  inp.fCalibrReq = true;
440  }
441 
442  std::string hub_state = "", sinfo = "";
443  hadaq::TransportInfo *info = (hadaq::TransportInfo *) inp.fInfo;
444  double rate = 0., hub_quality = 1;
445  int hub_progress = 100;
446 
447  if (!info) {
448  sinfo = "missing transport-info";
449  } else {
// fHubSizeTmCnt counts calls since the received byte count last changed
450  inp.fHubSizeTmCnt++;
451 
// rate in MB: growth since the previous call, or - for up to 15 calls without new data -
// the growth since the previous change averaged over the calls elapsed
// NOTE(review): this is MB per second only if the body runs once per second (the "BnetTimer" period is 1.) - confirm
452  if (info->fTotalRecvBytes > inp.fHubLastSize)
453  rate = (info->fTotalRecvBytes - inp.fHubLastSize)/1024.0/1024.0;
454  else if (inp.fHubSizeTmCnt <= 15)
455  rate = (info->fTotalRecvBytes - inp.fHubPrevSize)/1024.0/1024.0/inp.fHubSizeTmCnt;
456 
// classify the input: new data restarts the counter, otherwise the state degrades with the idle count
457  if (inp.fHubLastSize != info->fTotalRecvBytes) {
458  inp.fHubSizeTmCnt = 0;
459  inp.fHubPrevSize = inp.fHubLastSize;
460  } else if ((inp.fHubSizeTmCnt > 0.75*fEventBuildTimeout) && (hub_quality > 0.1)) {
461  hub_state = "NoData";
462  hub_quality = 0.1;
463  hub_progress = 0;
464  } else if ((inp.fHubSizeTmCnt > 7) && (hub_quality > 0.6)) {
465  hub_state = "LowData";
466  hub_quality = 0.6;
467  hub_progress = 0;
468  }
469 
470  inp.fHubLastSize = info->fTotalRecvBytes;
// NOTE(review): the format below has eight %s conversions but only seven string arguments are
// visible (nothing is passed for "buf:%s") - an argument line appears to be missing; confirm against the original file
471  sinfo = dabc::format("port:%d %5.3f MB/s data:%s pkts:%s buf:%s disc:%s d32:%s drop:%s lost:%s errbits:%s ",
472  info->fNPort,
473  rate,
474  dabc::size_to_str(info->fTotalRecvBytes).c_str(),
475  dabc::number_to_str(info->fTotalRecvPacket,1).c_str(),
477  info->GetDiscardString().c_str(),
478  info->GetDiscard32String().c_str(),
479  dabc::number_to_str(inp.fDroppedTrig,0).c_str(),
480  dabc::number_to_str(inp.fLostTrig,0).c_str(),
481  dabc::number_to_str(inp.fErrorBitsCnt,0).c_str());
482 
483  sinfo += inp.TriggerRingAsStr(16);
484  }
485 
486  hubs_dropev.push_back(inp.fDroppedTrig);
487  hubs_lostev.push_back(inp.fLostTrig);
488 
// the calibration state overrides the data state when its quality is lower
489  if (!inp.fCalibr.empty() && (inp.fCalibrQuality < hub_quality)) {
490  hub_state = inp.fCalibrState;
491  hub_quality = inp.fCalibrQuality;
492  hub_progress = inp.fCalibrProgr;
493  }
494 
495  if ((hub_progress > 0) && ((node_progress == 0) || (hub_progress < node_progress)))
496  node_progress = hub_progress;
497 
498  if (hub_quality < node_quality) {
499  node_quality = hub_quality;
500  node_state = hub_state;
501  }
502 
503  hubs_state.push_back(hub_state);
504  hubs_info.push_back(sinfo);
505  hubs_quality.push_back(hub_quality);
506  hubs_progress.push_back(hub_progress);
507  hubs_rates.push_back(rate);
508  }
509 
// number of buffers which can be sent on each output, as text (fBnetInfo) and as the "queues" field
510  std::string info = "BnetSend:";
511  std::vector<int64_t> qsz;
512  for (unsigned n=0;n<NumOutputs();++n) {
513  unsigned len = NumCanSend(n);
514  info.append(" ");
515  info.append(std::to_string(len));
516  qsz.push_back(len);
517  }
518  fBnetInfo = info;
519 
// no input reported a state: the node is considered ready
520  if (node_state.empty()) {
521  node_state = "Ready";
522  node_quality = 1.;
523  node_progress = 100;
524  }
525 
526  fWorkerHierarchy.SetField("hubs", hubs);
527  fWorkerHierarchy.SetField("hubs_info", hubs_info);
528  fWorkerHierarchy.SetField("ports", ports);
529  fWorkerHierarchy.SetField("calibr", calibr);
530  fWorkerHierarchy.SetField("state", node_state);
531  fWorkerHierarchy.SetField("quality", node_quality);
532  fWorkerHierarchy.SetField("progress", node_progress);
533  fWorkerHierarchy.SetField("nbuilders", NumOutputs());
534  fWorkerHierarchy.SetField("queues", qsz);
535  fWorkerHierarchy.SetField("hubs_dropev",hubs_dropev);
536  fWorkerHierarchy.SetField("hubs_lostev",hubs_lostev);
537  fWorkerHierarchy.SetField("hubs_state", hubs_state);
538  fWorkerHierarchy.SetField("hubs_quality", hubs_quality);
539  fWorkerHierarchy.SetField("hubs_progress", hubs_progress);
540  fWorkerHierarchy.SetField("hubs_rates", hubs_rates);
541  fWorkerHierarchy.SetField("recv_bufs", recv_bufs);
542  fWorkerHierarchy.SetField("recv_sizes", recv_sizes);
543 
544  fWorkerHierarchy.GetHChild("State").SetField("value", node_state);
545  }
546 
547  fBnetStat = fBldProfiler.Format();
548 
549  // fBnetStat = dabc::format("BldStat: calls:%ld inp:%ld out:%ld buf:%ld timer:%ld", fBldCalls, fInpCalls, fOutCalls, fBufCalls, fTimerCalls);
550 
// call counters are restarted for the next reporting interval
551  fBldCalls = fInpCalls = fOutCalls = fBufCalls = fTimerCalls = 0;
552 }
553 
556 
557 
559 {
560  DOUT5("CombinerModule::ShiftToNextBuffer %d ", ninp);
561 
562  InputCfg& cfg = fCfg[ninp];
563 
564  ReadIterator& iter = (cfg.fResortIndx < 0) ? cfg.fIter : cfg.fResortIter;
565 
566  iter.Close();
567 
568  dabc::Buffer buf;
569 
570  if (cfg.fResortIndx < 0) {
571  // normal way to take next buffer
572  if(!CanRecv(ninp)) return false;
573  buf = Recv(ninp);
574  fNumReadBuffers++;
575  } else {
576  // do not try to look further than one more buffer
577  if (cfg.fResortIndx>1) return false;
578  // when doing resort, try to access buffers from the input queue
579  buf = RecvQueueItem(ninp, cfg.fResortIndx++);
580  }
581 
582  if (buf.GetTypeId() == dabc::mbt_EOF) {
583  // Stop();
584  return false;
585  }
586 
587  return iter.Reset(buf);
588 }
589 
590 bool hadaq::CombinerModule::ShiftToNextHadTu(unsigned ninp)
591 {
592  InputCfg &cfg = fCfg[ninp];
593  ReadIterator &iter = (cfg.fResortIndx < 0) ? cfg.fIter : cfg.fResortIter;
594 
595  while (true) {
596 
597  bool res = false;
598  if (iter.IsData())
599  res = iter.NextSubeventsBlock();
600 
601  if (res && iter.IsData()) return true;
602 
603  if(!ShiftToNextBuffer(ninp)) return false;
604 
605  // DOUT0("Inp%u next buffer distance %u", ninp, iter.OnlyDebug());
606  } // while (!foundhadtu)
607 
608  return false;
609 }
610 
611 
612 int hadaq::CombinerModule::CalcTrigNumDiff(const uint32_t& prev, const uint32_t& next)
613 {
614  int res = (int) (next) - prev;
615  if (res > (int) fMaxHadaqTrigger/2) res -= fMaxHadaqTrigger; else
616  if (res < (int) fMaxHadaqTrigger/-2) res += fMaxHadaqTrigger;
617  return res;
618 }
619 
620 bool hadaq::CombinerModule::ShiftToNextEvent(unsigned ninp, bool fast, bool dropped)
621 {
622  // function used to shift to next event - used in BNET builder mode
623 
624  InputCfg& cfg = fCfg[ninp];
625 
626  if (dropped && cfg.has_data) cfg.fDroppedTrig++;
627 
628  cfg.Reset(fast);
629 
630  ReadIterator& iter = cfg.fIter;
631 
632  if (!iter.NextEvent())
633  // retry in next hadtu container
634  if (!ShiftToNextHadTu(ninp)) return false;
635 
636  // no need to analyze data
637  if (fast) return true;
638 
639  // this is selected event
640  cfg.evnt = iter.evnt();
641  cfg.has_data = true;
642  cfg.data_size = cfg.evnt->AllSubeventsSize();
643 
644  uint32_t seq = cfg.evnt->GetSeqNr();
645 
646  cfg.fTrigNr = (seq >> 8) & fTriggerRangeMask;
647  cfg.fTrigTag = seq & 0xFF;
648 
649  cfg.fTrigNumRing[cfg.fRingCnt] = cfg.fTrigNr;
650  cfg.fRingCnt = (cfg.fRingCnt+1) % HADAQ_RINGSIZE;
651 
652  cfg.fEmpty = (cfg.data_size == 0);
653  cfg.fDataError = cfg.evnt->GetDataError();
654 
655  cfg.fTrigType = cfg.evnt->GetId() & 0xF;
656 
657  // int diff = CalcTrigNumDiff(cfg.fLastTrigNr,cfg.fTrigNr);
658  // if (diff != 1)
659  // DOUT0("Inp%u Diff%d %x %x distance: %u", ninp, diff, cfg.fLastTrigNr, cfg.fTrigNr, iter.OnlyDebug());
660 
661  cfg.fLastTrigNr = cfg.fTrigNr;
662 
663  return true;
664 }
665 
666 
// Advances input ninp to its next subevent and caches trigger number, tag, size,
// error flag, hub id and trigger type of that subevent in fCfg[ninp].
// In BNET receiver mode the call is forwarded to ShiftToNextEvent().
//   ninp    - input index
//   fast    - when true, return as soon as a subevent is found, without analyzing it
//   dropped - when true and the input holds data, the drop is counted in fDroppedTrig
//             (only when the current subevent was not taken through the resort iterator)
// Returns false when no further subevent can be obtained from the input.
// For inputs with fResort enabled, a subevent whose trigger number is not the direct
// successor of the last one is skipped and searched for with a second (resort) iterator.
667 bool hadaq::CombinerModule::ShiftToNextSubEvent(unsigned ninp, bool fast, bool dropped)
668 {
669  if (fBNETrecv) return ShiftToNextEvent(ninp, fast, dropped);
670 
671  DOUT5("CombinerModule::ShiftToNextSubEvent %d ", ninp);
672 
673  InputCfg &cfg = fCfg[ninp];
674 
675 #ifdef HADAQ_DEBUG
676  if (dropped && cfg.has_data)
677  fprintf(stderr, "Input%u Trig:%6x Tag:%2x DROP\n", ninp, cfg.fTrigNr, cfg.fTrigTag);
678 #endif
679 
680 
681  bool foundevent(false), doshift(true), tryresort(cfg.fResort);
682 
// the current subevent was taken through the resort iterator: mark it as consumed and
// return to the main iterator, which still points to a not yet processed subevent
683  if (cfg.fResortIndx >= 0) {
684  doshift = false; // do not shift event in main iterator
685  if (cfg.subevnt) cfg.subevnt->SetTrigNr(0xffffffff); // mark subevent as used
686  cfg.fResortIndx = -1;
687  cfg.fResortIter.Close();
688  } else {
689  // account when subevent exists but intentionally dropped
690  if (dropped && cfg.has_data) cfg.fDroppedTrig++;
691  }
692 
693  cfg.Reset(fast);
694 
695  // if (fast) DOUT0("FAST DROP on inp %d", ninp);
696 
697  while (!foundevent) {
// the iterator is re-selected on every pass because fResortIndx can change inside the loop
698  ReadIterator &iter = (cfg.fResortIndx < 0) ? cfg.fIter : cfg.fResortIter;
699 
// doshift is false only for the first pass after leaving resort mode
700  bool res = true;
701  if (doshift) res = iter.NextSubEvent();
702  doshift = true;
703 
704  if (!res || (iter.subevnt() == 0)) {
705  DOUT5("CombinerModule::ShiftToNextSubEvent %d with zero NextSubEvent()", ninp);
706 
707  // retry in next hadtu container
708  if (ShiftToNextHadTu(ninp)) continue;
709 
710  if ((cfg.fResortIndx>=0) && (NumCanRecv(ninp) > 1)) {
711  // we have at least 2 buffers in the queue and cannot find required subevent
712  // seems to be, we should use next event from normal queue
713  cfg.fResortIndx = -1;
714  cfg.fResortIter.Close();
715  doshift = false;
716  tryresort = false;
717  continue;
718  }
719 
720  // no more input buffers available
721  return false;
722  }
723 
724  // no need to analyze data
725  if (fast) return true;
726 
// resort check: accept only the subevent which directly follows the last trigger number
727  if (tryresort && (cfg.fLastTrigNr!=0xffffffff)) {
728  uint32_t trignr = iter.subevnt()->GetTrigNr();
729  if (trignr==0xffffffff) continue; // this is processed trigger, exclude it
730 
731  int diff = CalcTrigNumDiff(cfg.fLastTrigNr, (trignr >> 8) & fTriggerRangeMask);
732 
733  if (diff!=1) {
734 
// start the search with a copy of the main iterator; the main iterator itself stays where it is
735  if (cfg.fResortIndx < 0) {
736  cfg.fResortIndx = 0;
737  cfg.fResortIter = cfg.fIter;
738  }
739  continue;
740  }
741  }
742 
743  foundevent = true;
744 
745  // this is selected subevent
746  cfg.subevnt = iter.subevnt();
747  cfg.has_data = true;
748  cfg.data_size = cfg.subevnt->GetPaddedSize();
749 
// the raw trigger word holds the tag in its low 8 bits and the trigger number above them
750  cfg.fTrigNr = (cfg.subevnt->GetTrigNr() >> 8) & fTriggerRangeMask;
751  cfg.fTrigTag = cfg.subevnt->GetTrigNr() & 0xFF;
752 
753  // try to fix problem with TRB2 readout
754  // Produced sequence of trigger numbers are: 0x2bffff, 0x2b0000, 0x2c0001 and repeated every 64k events
755 
756  if (((cfg.fTrigNr & 0xffff) == 0) && // lower two bytes in trigger id are 0 (from 0x2b0000)
757  (fTriggerRangeMask > 0x100000) && // more than 4+16 bits used in trigger mask
758  (cfg.fResortIndx < 0) && // do not try to resort data, normally enabled for very special cases
759  (cfg.fLastTrigNr != 0xffffffff) && // last trigger is not dummy
760  ((cfg.fLastTrigNr & 0xffff) == 0xffff) && // lower byte of last trigger is 0xffff (from 0x2bffff)
761  ((cfg.fTrigNr & 0xffff0000) == (cfg.fLastTrigNr & 0xffff0000))) // high bytes are same in last and now (0x2b == 0x2b)
762  cfg.fTrigNr = (cfg.fLastTrigNr + 1) & fTriggerRangeMask;
763 
764 #ifdef HADAQ_DEBUG
765  fprintf(stderr, "Input%u Trig:%6x Tag:%2x\n", ninp, cfg.fTrigNr, cfg.fTrigTag);
766 #endif
767 
// remember the trigger number in the ring of recent triggers
768  cfg.fTrigNumRing[cfg.fRingCnt] = cfg.fTrigNr;
769  cfg.fRingCnt = (cfg.fRingCnt+1) % HADAQ_RINGSIZE;
770 
// a subevent not larger than its header is treated as empty
771  cfg.fEmpty = cfg.subevnt->GetSize() <= sizeof(hadaq::RawSubevent);
772  cfg.fDataError = cfg.subevnt->GetDataError();
773 
774  cfg.fHubId = cfg.subevnt->GetId() & 0xffff;
775 
776  /* Evaluate trigger type:*/
777  /* NEW for trb3: trigger type is part of decoding word*/
778  if (!fHadesTriggerType) {
779  cfg.fTrigType = cfg.subevnt->GetTrigTypeTrb3();
780  } else if (cfg.fHubId == fHadesTriggerHUB) {
// HADES mode: the trigger type is read from the top byte of the data word at index wordNr-1 of the trigger hub subevent
781  unsigned wordNr = 2;
782  uint32_t bitmask = 0xff000000; /* extended mask to contain spill on/off bit*/
783  uint32_t bitshift = 24;
784  // above from args.c defaults
785  uint32_t val = cfg.subevnt->Data(wordNr - 1);
786  cfg.fTrigType = (val & bitmask) >> bitshift;
787  //DOUT0("Inp:%u use trb2 trigger type 0x%x", ninp, cfg.fTrigType);
788  } else {
789  cfg.fTrigType = 0;
790  }
791 
792  uint32_t errorBits = cfg.subevnt->GetErrBits();
793 
// error bit values 0 and 1 are not counted
794  if ((errorBits != 0) && (errorBits != 1))
795  cfg.fErrorBitsCnt++;
796 
// a gap in the trigger number sequence is accounted as lost triggers
797  int diff = 1;
798  if (cfg.fLastTrigNr != 0xffffffff)
799  diff = CalcTrigNumDiff(cfg.fLastTrigNr, cfg.fTrigNr);
800  cfg.fLastTrigNr = cfg.fTrigNr;
801 
802  if (diff>1) cfg.fLostTrig += (diff-1);
803  }
804 
805  return true;
806 }
807 
809 {
810  DOUT0("hadaq::CombinerModule::DropAllInputBuffers()...");
811 
812  unsigned maxnumsubev(0), droppeddata(0);
813 
814  for (unsigned ninp = 0; ninp < fCfg.size(); ninp++) {
815  unsigned numsubev = 0;
816 
817  do {
818  if (fCfg[ninp].has_data) numsubev++;
819  droppeddata += fCfg[ninp].data_size;
820  } while (ShiftToNextSubEvent(ninp, true, true));
821 
822  if (numsubev>maxnumsubev) maxnumsubev = numsubev;
823 
824  fCfg[ninp].Reset();
825  fCfg[ninp].Close();
826  while (SkipInputBuffers(ninp, 100)); // drop input port queue buffers until no more there
827  }
828 
829  Par(fLostEventRateName).SetValue(maxnumsubev);
830  Par(fDataDroppedRateName).SetValue(droppeddata/1024./1024.);
831  fRunDiscEvents += maxnumsubev;
832  fAllDiscEvents += maxnumsubev;
833  fRunDroppedData += droppeddata;
834  fAllDroppedData += droppeddata;
835 
836  return true;
837 }
838 
839 int hadaq::CombinerModule::DestinationPort(uint32_t trignr)
840 {
841  if (!fBNETsend || (NumOutputs()<2)) return -1;
842 
843  return (trignr/fBNETbunch) % NumOutputs();
844 }
845 
846 bool hadaq::CombinerModule::CheckDestination(uint32_t trignr)
847 {
848  if (!fBNETsend || (fLastTrigNr==0xffffffff)) return true;
849 
850  return DestinationPort(fLastTrigNr) == DestinationPort(trignr);
851 }
852 
854 {
855  // RETURN VALUE: true - event is successfully build, recall immediately
856  // false - leave event loop for framework (other modules input is required!)
857 
858  // eventbuilding on hadtu streams here:
859 
860  // this is daq_evtbuild logic:
861  // first check eventnumber of master channel
862  // here loop over all channels: skip subevts with too old eventnumbers
863  // if event is not complete, discard this and try next master channel index
864 
865  // adjust run number that might have changed by file output
866  //fRunNumber=GetEvtbuildParValue("runId"); // PERFORMANCE?
867  // note: file outout will overwrite this number in event header to be consistent with file name
868  // for online monitor, we could live with different run numbers
869 
871  // alternative approach like a simplified mbs event building:
874  // first input loop: find out maximum trignum of all inputs = current event trignumber
875 
876 
877  dabc::ProfilerGuard grd(fBldProfiler, "bld", 0);
878 
879  fBldCalls++;
880 
881  if (fExtraDebug) {
882  double tm = fLastProcTm.SpentTillNow(true);
883  if (tm > fMaxProcDist) fMaxProcDist = tm;
884  }
885 
886  // DOUT0("hadaq::CombinerModule::BuildEvent() starts");
887 
888  unsigned masterchannel(0), min_inp(0);
889  uint32_t subeventssize(0), mineventid(0), maxeventid(0), buildevid(0);
890  bool incomplete_data(false), any_data(false);
891  int missing_inp(-1);
892 
893  grd.Next("shft");
894 
895  for (unsigned ninp = 0; ninp < fCfg.size(); ninp++) {
896  if (!fCfg[ninp].has_data)
897  if (!ShiftToNextSubEvent(ninp)) {
898  // could not get subevent data on any channel.
899  // let framework do something before next try
900  if (fExtraDebug && fLastDebugTm.Expired(2.)) {
901  DOUT1("Fail to build event while input %u is not ready numcanrecv %u maxtm = %5.3f ", ninp, NumCanRecv(ninp), fMaxProcDist);
902  fLastDebugTm.GetNow();
903  fMaxProcDist = 0;
904  }
905 
906  missing_inp = ninp;
907  incomplete_data = true;
908  continue;
909  }
910 
911  uint32_t evid = fCfg[ninp].fTrigNr;
912 
913  if (!any_data) {
914  any_data = true;
915  mineventid = evid;
916  maxeventid = evid;
917  buildevid = evid;
918  min_inp = ninp;
919  } else {
920  if (CalcTrigNumDiff(evid, maxeventid) < 0)
921  maxeventid = evid;
922 
923  if (CalcTrigNumDiff(mineventid, evid) < 0) {
924  mineventid = evid;
925  min_inp = ninp;
926  }
927  }
928  } // for ninp
929 
930  grd.Next("drp");
931 
932  // we always build event with maximum trigger id = newest event, discard incomplete older events
933  int diff = incomplete_data ? 0 : CalcTrigNumDiff(mineventid, maxeventid);
934 
935 // DOUT0("Min:%8u Max:%8u diff:%5d", mineventid, maxeventid, diff);
936 
938  // check too large triggertag difference on input channels or very long delay in building,
939  // to repair situation, try to flush all input buffers
940  if (fLastDropTm.Expired((fEventBuildTimeout > 0) ? 1.5*fEventBuildTimeout : 5.))
941  if (((fTriggerNrTolerance > 0) && (diff > fTriggerNrTolerance)) || ((fEventBuildTimeout > 0) && fLastBuildTm.Expired(fEventBuildTimeout) && any_data && (fCfg.size() > 1))) {
942 
943  std::string msg;
944  if ((fTriggerNrTolerance > 0) && (diff > fTriggerNrTolerance)) {
945  msg = dabc::format(
946  "Event id difference %d exceeding tolerance window %d (min input %u),",
947  diff, fTriggerNrTolerance, min_inp);
948  } else {
949  msg = dabc::format("No events were build since at least %.1f seconds,", fEventBuildTimeout);
950  }
951 
952  if (missing_inp >= 0) {
953  msg += dabc::format(" missing data on input %d url: %s,", missing_inp, FindPort(InputName(missing_inp)).Cfg("url").AsStr().c_str());
954  }
955  msg += " drop all!";
956 
957  SetInfo(msg, true);
958  DOUT0(msg.c_str());
959 
960 #ifdef HADAQ_DEBUG
961  fprintf(stderr, "DROP ALL\n");
962 #endif
963 
964  DropAllInputBuffers();
965 
966  fAllFullDrops++;
967 
968  if (fExtraDebug && fLastDebugTm.Expired(1.)) {
969  DOUT1("Drop all buffers");
970  fLastDebugTm.GetNow();
971  }
972  fLastDropTm.GetNow();
973 
974  return false; // retry on next set of buffers
975  }
976 
977 
978  grd.Next("chkcomp");
979 
980  if (incomplete_data) return false;
981 
982  uint32_t buildtag = fCfg[masterchannel].fTrigTag;
983 
984  // printf("build evid = %u\n", buildevid);
985 
987  // second input loop: skip all subevents until we reach current trignum
988  // select inputs which will be used for building
989  //bool eventIsBroken=false;
990  bool dataError(false), tagError(false);
991 
992  bool hasCompleteEvent = true;
993 
994  for (unsigned ninp = 0; ninp < fCfg.size(); ninp++) {
995  bool foundsubevent = false;
996  while (!foundsubevent) {
997  uint32_t trignr = fCfg[ninp].fTrigNr;
998  uint32_t trigtag = fCfg[ninp].fTrigTag;
999  bool isempty = fCfg[ninp].fEmpty;
1000  bool haserror = fCfg[ninp].fDataError;
1001  if (trignr == buildevid) {
1002 
1003  if (!isempty || !fSkipEmpty) {
1004  // check also trigtag:
1005  if (trigtag != buildtag) tagError = true;
1006  if (haserror) dataError = true;
1007  subeventssize += fCfg[ninp].data_size;
1008  }
1009  foundsubevent = true;
1010  break;
1011 
1012  } else
1013  if (CalcTrigNumDiff(trignr, buildevid) > 0) {
1014 
1015  int droppedsize = fCfg[ninp].data_size;
1016 
1017 #ifdef HADAQ_DEBUG
1018  fprintf(stderr, "Input%u TrigNr:%6x Skip while building %6x diff %u\n", ninp, trignr, buildevid, CalcTrigNumDiff(trignr, buildevid));
1019 #endif
1020 
1021  // DOUT0("Drop data inp %u size %d", ninp, droppedsize);
1022 
1023  fDataDroppedRateCnt += droppedsize;
1024 
1025  // Par(fDataDroppedRateName).SetValue(droppedsize/1024./1024.);
1026  fRunDroppedData += droppedsize;
1027  fAllDroppedData += droppedsize;
1028 
1029  if(!ShiftToNextSubEvent(ninp, false, true)) {
1030  if (fExtraDebug && fLastDebugTm.Expired(2.)) {
1031  DOUT1("Cannot shift data from input %d", ninp);
1032  fLastDebugTm.GetNow();
1033  }
1034 
1035  return false;
1036  }
1037  // try with next subevt until reaching buildevid
1038 
1039  continue;
1040  } else {
1041 
1042  // we want to build event with id, defined by input 0
1043  // but subevent in this input has number bigger than buildevid
1044  // it will not be possible to build buildevid, therefore mark it as incomplete
1045  hasCompleteEvent = false;
1046 
1047  // let also verify all other channels
1048  break;
1049  }
1050 
1051  } // while foundsubevent
1052  } // for ninpt
1053 
1054  grd.Next("buf");
1055 
1056  // here all inputs should be aligned to buildevid
1057 
1058  // for sync sequence number, check first if we have error from cts:
1059  uint32_t sequencenumber = fRunBuildEvents + 1; // HADES convention: sequencenumber 0 is "start event" of file
1060 
1061  if (fBNETsend)
1062  sequencenumber = (fCfg[masterchannel].fTrigNr << 8) | fCfg[masterchannel].fTrigTag;
1063 
1064  if (hasCompleteEvent && fCheckTag && tagError) {
1065  hasCompleteEvent = false;
1066 
1067  if (fBNETrecv) DOUT0("TAG error");
1068 
1069  fRunTagErrors++;
1070  }
1071 
1072  // provide normal buffer
1073 
1074  if (hasCompleteEvent) {
1075  if (fOut.IsBuffer() && (!fOut.IsPlaceForEvent(subeventssize) || !CheckDestination(buildevid))) {
1076  // first we close current buffer
1077  if (!FlushOutputBuffer()) {
1078  if (fExtraDebug && fLastDebugTm.Expired(1.)) {
1079  std::string sendmask;
1080  for (unsigned n=0;n<NumOutputs();n++)
1081  sendmask.append(CanSend(n) ? "o" : "x");
1082 
1083  DOUT0("FlushOutputBuffer can't send to all %u outputs sendmask = %s", NumOutputs(), sendmask.c_str());
1084  fLastDebugTm.GetNow();
1085  }
1086  return false;
1087  }
1088  }
1089  // after flushing last buffer, take next one:
1090  if (!fOut.IsBuffer()) {
1091  dabc::Buffer buf = TakeBuffer();
1092  if (buf.null()) {
1093 
1094  if (fExtraDebug && fLastDebugTm.Expired(1.)) {
1095  DOUT0("did not have new buffer - wait for it");
1096  fLastDebugTm.GetNow();
1097  }
1098 
1099  return false;
1100  }
1101  if (!fOut.Reset(buf)) {
1102  SetInfo("Cannot use buffer for output - hard error!!!!", true);
1103  buf.Release();
1105  if (fExtraDebug && fLastDebugTm.Expired(1.)) {
1106  DOUT0("Abort application completely");
1107  fLastDebugTm.GetNow();
1108  }
1109  return false;
1110  }
1111  }
1112  // now check working buffer for space:
1113  if (!fOut.IsPlaceForEvent(subeventssize)) {
1114  DOUT0("New buffer has not enough space, skip subevent!");
1115  hasCompleteEvent = false;
1116  }
1117  }
1118 
1119  // now we should be able to build event
1120  if (hasCompleteEvent) {
1121  // EVENT BUILDING IS HERE
1122 
1123  grd.Next("compl");
1124 
1125  fOut.NewEvent(sequencenumber, fRunNumber); // like in hadaq, event sequence number is independent of trigger.
1126  fRunBuildEvents++;
1127  fAllBuildEvents++;
1128 
1129  fOut.evnt()->SetDataError((dataError || tagError));
1130  if (dataError) fRunDataErrors++;
1131  if (tagError) fRunTagErrors++;
1132 
1133  unsigned trigtyp = 0;
1134  for (unsigned ninp = 0; ninp < fCfg.size(); ninp++) {
1135  trigtyp = fCfg[ninp].fTrigType;
1136  if (trigtyp) break;
1137  }
1138 
1139  // here event id, always from "cts master channel" 0
1140  unsigned currentid = trigtyp | (2 << 12); // DAQVERSION=2 for dabc
1141  //fEventIdCount[currentid & (HADAQ_NEVTIDS - 1)]++;
1142  fEventIdCount[currentid & 0xF]++; // JAM: problem with spill bit?
1143  fOut.evnt()->SetId(currentid & (HADAQ_NEVTIDS_IN_FILE - 1));
1144 
1145  grd.Next("main");
1146 
1147  // third input loop: build output event from all not empty subevents
1148  for (unsigned ninp = 0; ninp < fCfg.size(); ninp++) {
1149  if (fCfg[ninp].fEmpty && fSkipEmpty) continue;
1150  if (fBNETrecv)
1151  fOut.AddAllSubevents(fCfg[ninp].evnt);
1152  else
1153  fOut.AddSubevent(fCfg[ninp].subevnt);
1154  DoInputSnapshot(ninp); // record current state of event tag and queue level for control system
1155  } // for ninp
1156 
1157 
1158  grd.Next("after");
1159 
1160  fOut.FinishEvent();
1161 
1162  int diff = 1;
1163  if (fLastTrigNr!=0xffffffff) diff = CalcTrigNumDiff(fLastTrigNr, buildevid);
1164 
1165  //if (fBNETsend && (diff!=1))
1166  // DOUT0("%s %x %x %d", GetName(), fLastTrigNr, buildevid, diff);
1167  // if (fBNETsend) DOUT0("%s trig %x size %u", GetName(), buildevid, subeventssize);
1168 
1169 #ifdef HADAQ_DEBUG
1170  fprintf(stderr, "BUILD:%6x\n", buildevid);
1171 #endif
1172 
1173  if (fBNETrecv && fEvnumDiffStatistics && (fBNETNumRecv > 1) && (diff > fBNETbunch)) {
1174  // check if we really lost these events
1175  // int diff0 = diff;
1176 
1177  long ncycles = diff / (fBNETbunch * fBNETNumRecv);
1178 
1179  // substract big cycles
1180  diff -= ncycles * (fBNETbunch * fBNETNumRecv);
1181 
1182  // substract expected gap to previous cycle
1183  diff -= fBNETbunch * (fBNETNumRecv - 1);
1184  if (diff <= 0) diff = 1;
1185 
1186  // add lost events from big cycles
1187  diff += ncycles * fBNETbunch;
1188 
1189  // if (diff != 1) {
1190  // DOUT0("Large EVENT difference %d bunch %ld ncycles %ld final %d", diff0, fBNETbunch, ncycles, diff);
1191  //}
1192  }
1193 
1194  fLastTrigNr = buildevid;
1195 
1196  fEventRateCnt++;
1197  // Par(fEventRateName).SetValue(1);
1198 
1199  if (fEvnumDiffStatistics && (diff > 1)) {
1200 
1201  if (fExtraDebug && fLastDebugTm.Expired(1.)) {
1202  DOUT1("Events gap %d", diff-1);
1203  fLastDebugTm.GetNow();
1204  }
1205 
1206  fLostEventRateCnt += (diff-1);
1207  //Par(fLostEventRateName).SetValue(diff-1);
1208  fRunDiscEvents += (diff-1);
1209  fAllDiscEvents += (diff-1);
1210  }
1211 
1212  unsigned currentbytes = subeventssize + sizeof(hadaq::RawEvent);
1213  fRunRecvBytes += currentbytes;
1214  fAllRecvBytes += currentbytes;
1215  fDataRateCnt += currentbytes;
1216  // Par(fDataRateName).SetValue(currentbytes / 1024. / 1024.);
1217 
1218  fLastBuildTm.GetNow();
1219  } else {
1220  grd.Next("lostl", 14);
1221  fLostEventRateCnt += 1;
1222  // Par(fLostEventRateName).SetValue(1);
1223  fRunDiscEvents += 1;
1224  fAllDiscEvents += 1;
1225  } // ensure outputbuffer
1226 
1227  std::string debugmask;
1228  debugmask.resize(fCfg.size(), ' ');
1229 
1230  grd.Next("shift", 15);
1231 
1232  // FINAL loop: proceed to next subevents
1233  for (unsigned ninp = 0; ninp < fCfg.size(); ninp++)
1234  if (fCfg[ninp].fTrigNr == buildevid) {
1235  debugmask[ninp] = 'o';
1236  ShiftToNextSubEvent(ninp, false, !hasCompleteEvent);
1237  } else {
1238  debugmask[ninp] = 'x';
1239  }
1240 
1241  if (fExtraDebug && fLastDebugTm.Expired(1.)) {
1242  DOUT1("Did building as usual mask %s complete = %5s maxdist = %5.3f s", debugmask.c_str(), DBOOL(hasCompleteEvent), fMaxProcDist);
1243  fLastDebugTm.GetNow();
1244  fMaxProcDist = 0;
1245  // put here update of tid
1246  // fPID= syscall(SYS_gettid);
1247  }
1248 
1249  // return true means that method can be called again immediately
1250  // in all places one requires while loop
1251  return true; // event is build successfully. try next one
1252 }
1253 
1254 
1255 void hadaq::CombinerModule::DoInputSnapshot(unsigned ninp)
1256 {
1257  // copy here input properties at the moment of event building to stats:
1258 
1259  auto &cfg = fCfg[ninp];
1260 
1261  cfg.fNumCanRecv = NumCanRecv(ninp);
1262  cfg.fQueueLevel = (cfg.fQueueCapacity > 0) ? 1. * cfg.fNumCanRecv / cfg.fQueueCapacity : 0.;
1263  cfg.fLastEvtBuildTrigId = (cfg.fTrigNr << 8) | (cfg.fTrigTag & 0xff);
1264 }
1265 
1266 
1268 {
1269  bool do_start = false, do_stop = false;
1270 
1271  if (cmd.IsName("StartHldFile")) {
1272  do_start = do_stop = true;
1273  SetInfo("Execute StartHldFile");
1274 
1275  DOUT0("******************* START HLD FILE *************");
1276  } else if (cmd.IsName("StopHldFile")) {
1277  do_stop = true;
1278  SetInfo("Execute StopHldFile");
1279  DOUT0("******************* STOP HLD FILE *************");
1280 
1281  } else if (cmd.IsName("RestartHldFile")) {
1282  if (NumOutputs()<2) return dabc::cmd_false;
1283  SetInfo("Execute RestartHldFile");
1284  cmd.ChangeName("RestartTransport");
1285  SubmitCommandToTransport(OutputName(1), cmd);
1286  return dabc::cmd_postponed;
1287  } else if (cmd.IsName("BnetFileControl")) {
1288  if (NumOutputs()<2) return dabc::cmd_false;
1289  if (!fBnetFileCmd.null()) fBnetFileCmd.Reply(dabc::cmd_false);
1290 
1291  std::string mode = cmd.GetStr("mode");
1292 
1293  if (mode == "start") {
1294  SetInfo("Execute BnetFileControl");
1295  for (unsigned k=1;k<NumOutputs();++k) {
1296  dabc::Command subcmd("RestartTransport");
1297  subcmd.SetBool("only_prefix", true);
1298  subcmd.SetStr("prefix", cmd.GetStr("prefix"));
1299  SubmitCommandToTransport(OutputName(k), Assign(subcmd));
1300  }
1301  fBnetFileCmd = cmd;
1302  fBnetFileCmd.SetInt("#replies", NumOutputs()-1);
1303  if (!fBnetFileCmd.IsTimeoutSet()) fBnetFileCmd.SetTimeout(30);
1304  return dabc::cmd_postponed;
1305  }
1306 
1307  if (mode == "stop") {
1308  if (fRunNumber) StoreRunInfoStop();
1309  // reset runid
1310  fRunNumber = 0;
1311 
1312  FlushOutputBuffer(); // need to ensure that all output data are moved to outputs
1313 
1314  // submit dummy buffer to the HLD outputs to stop current file
1315  for (unsigned k=1;k<NumOutputs();++k) {
1316  if (CanSend(k)) {
1317  dabc::Buffer eolbuf = TakeBuffer();
1318  if (eolbuf.null()) {
1319  EOUT("FAIL to SEND EOL buffer to OUTPUT %d", k);
1320  } else {
1321  DOUT2("SEND EOL to OUTPUT %d %d", k, eolbuf.GetTotalSize());
1323  Send(k, eolbuf);
1324  }
1325  }
1326  }
1327  }
1328 
1329  return dabc::cmd_true;
1330 
1331  } else if (cmd.IsName("HCMD_DropAllBuffers")) {
1332 
1333  DropAllInputBuffers();
1334  fAllFullDrops++;
1335  fLastDropTm.GetNow();
1336 
1337  if (fBNETsend && !fIsTerminating) {
1338  for (unsigned n = 0; n < NumInputs(); n++) {
1339  fCfg[n].fErrorBitsCnt = 0;
1340  fCfg[n].fDroppedTrig = 0;
1341  fCfg[n].fLostTrig = 0;
1342  fCfg[n].fHubSizeTmCnt = 0;
1343  fCfg[n].fHubLastSize = 0;
1344  fCfg[n].fHubPrevSize = 0;
1345  dabc::Command subcmd("ResetTransportStat");
1346  SubmitCommandToTransport(InputName(n), subcmd);
1347  }
1348  }
1349 
1350  cmd.SetStrRawData("true");
1351  return dabc::cmd_true;
1352 
1353  } else if (cmd.IsName("BnetCalibrControl")) {
1354 
1355  if (!fBNETsend || fIsTerminating || (NumInputs()==0))
1356  return dabc::cmd_true;
1357 
1358  if (!fBnetCalibrCmd.null()) {
1359  EOUT("Still calibration command running");
1360  fBnetCalibrCmd.Reply(dabc::cmd_false);
1361  }
1362 
1363  fBnetCalibrCmd = cmd;
1364  fBnetCalibrCmd.SetInt("#replies", NumInputs());
1365  fBnetCalibrCmd.SetDouble("quality", 1.0);
1366 
1367  std::string rundir = "";
1368  unsigned runid = cmd.GetUInt("runid");
1369 
1370  if ((cmd.GetStr("mode") != "start") && !fBNETCalibrDir.empty() && (runid != 0)) {
1371  rundir = fBNETCalibrDir;
1372  rundir.append("/");
1373  rundir.append(hadaq::FormatFilename(runid));
1374  std::string mkdir = "mkdir -p ";
1375  mkdir.append(rundir);
1376  auto res = system(mkdir.c_str());
1377  (void) res; // avoid compiler warnings
1378  fBnetCalibrCmd.SetStr("#rundir", rundir);
1379  rundir.append("/");
1380  }
1381 
1382  DOUT0("Combiner get BnetCalibrControl mode %s rundir %s", cmd.GetStr("mode").c_str(), rundir.c_str());
1383 
1384  for (unsigned n = 0; n < NumInputs(); n++) {
1385  dabc::Command subcmd("TdcCalibrations");
1386  subcmd.SetStr("mode", cmd.GetStr("mode"));
1387  subcmd.SetStr("rundir", rundir);
1388  SubmitCommandToTransport(InputName(n), Assign(subcmd));
1389  }
1390 
1391  return dabc::cmd_postponed;
1392  } else if (cmd.IsName("BnetCalibrRefresh")) {
1393 
1394  if (!fBNETsend || fIsTerminating || (NumInputs()==0))
1395  return dabc::cmd_true;
1396 
1397  if (!fBnetRefreshCmd.null()) {
1398  EOUT("Still calibration command running");
1399  fBnetRefreshCmd.Reply(dabc::cmd_false);
1400  }
1401 
1402  fBnetRefreshCmd = cmd;
1403  fBnetRefreshCmd.SetInt("#replies", NumInputs());
1404  fBnetRefreshCmd.SetDouble("quality", 1.0);
1405 
1406  for (unsigned n = 0; n < NumInputs(); n++) {
1407  dabc::Command subcmd("CalibrRefresh");
1408  SubmitCommandToTransport(InputName(n), Assign(subcmd));
1409  }
1410 
1411  return dabc::cmd_postponed;
1412 
1413  } else {
1415  }
1416 
1417  bool res = true;
1418 
1419  if (do_stop) {
1420  if (NumOutputs()>1)
1421  res = DisconnectPort(OutputName(1));
1422 
1423  DOUT0("Stop HLD file res = %s", DBOOL(res));
1424  }
1425 
1426  if (do_start && res) {
1427  std::string fname = cmd.GetStr("filename", "file.hld");
1428  int maxsize = cmd.GetInt(dabc::xml_maxsize, 1500);
1429 
1430  std::string url = dabc::format("hld://%s?%s=%d", fname.c_str(), dabc::xml_maxsize, maxsize);
1431 
1432  // we guarantee, that at least two ports will be created
1433  EnsurePorts(0, 2);
1434 
1435  res = dabc::mgr.CreateTransport(OutputName(1, true), url);
1436 
1437  DOUT0("Start HLD file %s res = %s", fname.c_str(), DBOOL(res));
1438  }
1439 
1440  return cmd_bool(res);
1441 }
1442 
1443 
1445 {
1446  /* open ascii file eb_runinfo2ora.txt to store simple information for
1447  the started RUN. The format: start <run_id> <filename> <date> <time>
1448  where "start" is a key word which defines START RUN info. -S.Y.
1449  */
1450  if(!fRunToOracle || fRunNumber==0) return;
1451  time_t t = fRunNumber + hadaq::HADAQ_TIMEOFFSET; // new run number defines start time
1452  char ltime[20]; /* local time */
1453  struct tm tm_res;
1454  strftime(ltime, 20, "%Y-%m-%d %H:%M:%S", localtime_r(&t, &tm_res));
1455  std::string filename=GenerateFileName(fRunNumber); // new run number defines filename
1456  FILE *fp = fopen(fRunInfoToOraFilename.c_str(), "a+");
1457  if (fp) {
1458  fprintf(fp, "start %u %d %s %s\n", fRunNumber, fEBId, filename.c_str(), ltime);
1459  fclose(fp);
1460  }
1461  DOUT1("Write run info to %s - start: %lu %d %s %s ", fRunInfoToOraFilename.c_str(), fRunNumber, fEBId, filename.c_str(), ltime);
1462 
1463 }
1464 
1465 void hadaq::CombinerModule::StoreRunInfoStop(bool onexit, unsigned newrunid)
1466 {
1467  /* open ascii file eb_runinfo2ora.txt to store simple information for
1468  the stoped RUN. The format: stop <run_id> <date> <time> <events> <bytes>
1469  where "stop" is a key word which defines STOP RUN info. -S.Y.
1470  */
1471 
1472  if(!fRunToOracle || fRunNumber==0) return; // suppress void output at beginning
1473  // JAM we do not use our own time, but time of next run given by epics master
1474  // otherwise mismatch between run start time that comes before run stop time!
1475  // note that this problem also occured with old EBs
1476  // only exception: when eventbuilder is discarded we use termination time!
1477  time_t t;
1478  if(onexit || (newrunid==0))
1479  t = time(NULL);
1480  else
1481  t = newrunid + hadaq::HADAQ_TIMEOFFSET; // new run number defines stop time
1482  char ltime[20]; /* local time */
1483  struct tm tm_res;
1484  strftime(ltime, 20, "%Y-%m-%d %H:%M:%S", localtime_r(&t, &tm_res));
1485  std::string filename = GenerateFileName(fRunNumber); // old run number defines old filename
1486  FILE *fp = fopen(fRunInfoToOraFilename.c_str(), "a+");
1487  if (fp) {
1488  fprintf(fp, "stop %u %d %s %s %s ", fRunNumber, fEBId, filename.c_str(), ltime, Unit(fRunBuildEvents));
1489  fprintf(fp, "%s\n", Unit(fRunRecvBytes));
1490  fclose(fp);
1491  }
1492  DOUT1("Write run info to %s - stop: %lu %d %s %s %s %s", fRunInfoToOraFilename.c_str(), fRunNumber, fEBId, filename.c_str(), ltime, Unit(fRunBuildEvents),Unit(fRunRecvBytes));
1493 
1494 }
1495 
1497 {
1498  // DO NOT RESET COUNTERS IN BNET MODE
1499  fRunRecvBytes = 0;
1500  fRunBuildEvents = 0;
1501  fRunDiscEvents = 0;
1502  fRunDroppedData = 0;
1503  fRunTagErrors = 0;
1504  fRunDataErrors = 0;
1505 
1506  if (!fBNETrecv && !fIsTerminating)
1507  for (unsigned n = 0; n < NumInputs(); n++) {
1508  SubmitCommandToTransport(InputName(n), dabc::Command("ResetTransportStat"));
1509 
1510  fCfg[n].fLastEvtBuildTrigId = 0;
1511  }
1512 
1513  for (unsigned i = 0; i < HADAQ_NEVTIDS; i++)
1514  fEventIdCount[i] = 0;
1515 }
1516 
1517 char* hadaq::CombinerModule::Unit(unsigned long v)
1518 {
1519 
1520  // JAM stolen from old hadaq eventbuilders to keep precisely same format
1521  static char retVal[16];
1522  static char u[] = " kM";
1523  unsigned int i;
1524 
1525  for (i = 0; v >= 10000 && i < sizeof(u) - 2; v /= 1000, i++) {
1526  }
1527  snprintf(retVal, sizeof(retVal), "%4lu%c", v, u[i]);
1528 
1529  return retVal;
1530 }
1531 
1532 std::string hadaq::CombinerModule::GenerateFileName(unsigned runid)
1533 {
1534  return fPrefix + hadaq::FormatFilename(fRunNumber,fEBId) + std::string(".hld");
1535 }
1536 
1538 {
1539  if (cmd.IsName("GetHadaqTransportInfo")) {
1540  unsigned id = cmd.GetUInt("id");
1541  if (id < fCfg.size()) {
1542  fCfg[id].fInfo = cmd.GetPtr("Info");
1543  fCfg[id].fUdpPort = cmd.GetUInt("UdpPort");
1544  fCfg[id].fCalibr = cmd.GetStr("CalibrModule");
1545  }
1546  return true;
1547  } else if (cmd.IsName("GetCalibrState")) {
1548  unsigned n = cmd.GetUInt("indx");
1549  if (n < fCfg.size()) {
1550  fCfg[n].fCalibrReq = false;
1551  // fCfg[n].trb = cmd.GetUInt("trb");
1552  // fCfg[n].tdcs = cmd.GetField("tdcs").AsUIntVect();
1553  fCfg[n].fCalibrProgr = cmd.GetInt("progress");
1554  fCfg[n].fCalibrState = cmd.GetStr("state");
1555  fCfg[n].fCalibrQuality = cmd.GetDouble("quality");
1556  }
1557  return true;
1558  } else if (cmd.IsName("GetTransportStatistic")) {
1559  if (cmd.GetBool("#mbs")) {
1560  fWorkerHierarchy.SetField("mbsinfo", cmd.GetStr("MbsInfo"));
1561  return true;
1562  }
1563 
1564  unsigned runid = cmd.GetUInt("RunId");
1565  std::string runname = cmd.GetStr("RunName");
1566  std::string runprefix = cmd.GetStr("RunPrefix");
1567  unsigned runsz = cmd.GetUInt("RunSize");
1568 
1569  if (cmd.GetBool("#ltsm")) {
1570  // this is LTSM info
1571  fWorkerHierarchy.SetField("ltsmid", runid);
1572  fWorkerHierarchy.SetField("ltsmsize", runsz);
1573  fWorkerHierarchy.SetField("ltsmname", runname);
1574  fWorkerHierarchy.SetField("ltsmprefix", runprefix);
1575  Par("LtsmFileSize").SetValue(runsz/1024./1024.);
1576  return true;
1577  }
1578 
1579  fWorkerHierarchy.SetField("runid", runid);
1580  fWorkerHierarchy.SetField("runsize", runsz);
1581  fWorkerHierarchy.SetField("runname", runname);
1582  fWorkerHierarchy.SetField("runprefix", runprefix);
1583 
1584  Par("RunFileSize").SetValue(runsz/1024./1024.);
1585 
1586  std::string state = "File";
1587  double quality = 0.98;
1588  if ((Par(fEventRateName).Value().AsDouble() == 0) && (quality > 0.55)) { state = "NoData"; quality = 0.55; }
1589  if ((runid==0) && runname.empty() && (quality > 0.5)) { state = "NoFile"; quality = 0.5; }
1590 
1591  fWorkerHierarchy.SetField("state", state);
1592  fWorkerHierarchy.SetField("quality", quality);
1593  fWorkerHierarchy.GetHChild("State").SetField("value", state);
1594 
1595  return true;
1596  } else if (cmd.IsName("RestartTransport")) {
1597  int num = fBnetFileCmd.GetInt("#replies");
1598  if (num == 1) {
1599  unsigned newrunid = fBnetFileCmd.GetUInt("runid");
1600  if (fRunNumber) StoreRunInfoStop(false, newrunid);
1601  std::string newprefix = fBnetFileCmd.GetStr("prefix");
1602  if(!newprefix.empty()) fPrefix = newprefix; // need to reset prefix here for run info JAM2018
1603  // SetEvtbuildPar("prefix",hadaq::Observer::Args_prefixCode(fPrefix.c_str())); // also export changed prefix to EPICS
1604  fRunNumber = newrunid;
1605  ResetInfoCounters();
1606  StoreRunInfoStart();
1607  fBnetFileCmd.Reply(dabc::cmd_true);
1608  } else {
1609  fBnetFileCmd.SetInt("#replies", num-1);
1610  }
1611  return true;
1612  } else if (cmd.IsName("TdcCalibrations")) {
1613  int num = fBnetCalibrCmd.GetInt("#replies");
1614  double q = cmd.GetDouble("quality");
1615  if (q < fBnetCalibrCmd.GetDouble("quality"))
1616  fBnetCalibrCmd.SetDouble("quality", q);
1617 
1618  if (num == 1) {
1619 
1620  std::string rundir = fBnetCalibrCmd.GetStr("#rundir");
1621  DOUT0("COMBINER COMPLETE CALIBR PROCESSING quality %5.3f dir %s", fBnetCalibrCmd.GetDouble("quality"), rundir.c_str());;
1622 
1623  if (!fBNETCalibrPackScript.empty() && !rundir.empty() && (fBnetCalibrCmd.GetStr("mode") == "stop")) {
1624  std::string exec = fBNETCalibrPackScript;
1625  exec.append(" ");
1626  exec.append(rundir);
1627  int res = system(exec.c_str());
1628  DOUT0("EXEC %s res = %d", exec.c_str(), res);
1629  }
1630 
1631  fBnetCalibrCmd.Reply(dabc::cmd_true);
1632  } else {
1633  fBnetCalibrCmd.SetInt("#replies", num-1);
1634  }
1635  return true;
1636  } else if (cmd.IsName("CalibrRefresh")) {
1637  int num = fBnetCalibrCmd.GetInt("#replies");
1638  double q = cmd.GetDouble("quality");
1639  if (q < fBnetRefreshCmd.GetDouble("quality"))
1640  fBnetRefreshCmd.SetDouble("quality", q);
1641  fBnetRefreshCmd.SetInt("#replies", num-1);
1642  if (num == 1)
1643  fBnetRefreshCmd.Reply(dabc::cmd_true);
1644  }
1645 
1646  return dabc::ModuleAsync::ReplyCommand(cmd);
1647 }
Reference on memory from memory pool.
Definition: Buffer.h:135
unsigned GetTypeId() const
Definition: Buffer.h:152
BufferSize_t GetTotalSize() const
Return total size of all buffer segments.
Definition: Buffer.cxx:91
void SetTypeId(unsigned tid)
Definition: Buffer.h:151
Represents command with its arguments.
Definition: Command.h:99
unsigned GetUInt(const std::string &name, unsigned dflt=0) const
Definition: Command.h:148
bool SetStrRawData(const std::string &str)
Set raw data with string content.
Definition: Command.cxx:340
double GetDouble(const std::string &name, double dflt=0.) const
Definition: Command.h:145
bool SetInt(const std::string &name, int v)
Definition: Command.h:138
std::string GetStr(const std::string &name, const std::string &dflt="") const
Definition: Command.h:136
Command & SetReceiver(const std::string &itemname)
These methods prepare command so, that one can submit command to the manager like: dabc::mgr....
Definition: Command.h:264
bool GetBool(const std::string &name, bool dflt=false) const
Definition: Command.h:142
int GetInt(const std::string &name, int dflt=0) const
Definition: Command.h:139
void ChangeName(const std::string &name)
Change command name, should not be used for remote commands.
Definition: Command.cxx:62
void * GetPtr(const std::string &name, void *deflt=0) const
Get pointer argument from the command.
Definition: Command.cxx:158
Represents objects hierarchy of remote (or local) DABC process.
Definition: Hierarchy.h:285
Hierarchy CreateHChild(const std::string &name, bool allowslahes=false, bool sortorder=false)
Create child item in hierarchy with specified name If allowslahes enabled, instead of subfolders item...
Definition: Hierarchy.h:392
Special info parameter class.
Definition: Parameter.h:300
void StopApplication()
Definition: Manager.cxx:2231
bool CreateTransport(const std::string &portname, const std::string &transportkind="", const std::string &thrdname="")
Definition: Manager.cxx:2210
int NodeId() const
Definition: Manager.cxx:2086
virtual bool ReplyCommand(Command cmd)
Reimplement this method to react on command reply Return true if command can be destroyed by framewor...
Definition: Module.h:240
virtual int ExecuteCommand(Command cmd)
Main method where commands are executed.
Definition: Module.h:232
bool SetValue(const RecordField &v)
Set parameter value.
Definition: Parameter.h:205
void FireModified()
Can be called by user to signal framework that parameter was modified.
Definition: Parameter.cxx:555
bool SetField(const std::string &name, const RecordField &v)
Definition: Record.h:516
void Release()
Releases reference on the object.
Definition: Reference.cxx:138
bool IsName(const char *name) const
Returns true if object name is the same as specified one.
Definition: Reference.cxx:177
bool null() const
Returns true if reference contains nullptr.
Definition: Reference.h:151
bool Submit(Command cmd)
Definition: Worker.cxx:1139
void ProcessUserEvent(unsigned item) override
Method called by framework when custom user event is produced.
bool ShiftToNextEvent(unsigned ninp, bool fast=false, bool dropped=false)
Shifts to next event in the input queue.
void AfterModuleStop() override
void SetInfo(const std::string &info, bool forceinfo=false)
bool ShiftToNextHadTu(unsigned ninp)
bool CheckDestination(uint32_t trignr)
bool ShiftToNextBuffer(unsigned ninp)
Method should be used to skip current buffer from the queue.
void StoreRunInfoStop(bool onexit=false, unsigned newrunid=0)
void DoInputSnapshot(unsigned ninp)
std::string GenerateFileName(unsigned runid)
char * Unit(unsigned long v)
int DestinationPort(uint32_t trignr)
int ExecuteCommand(dabc::Command cmd) override
Main method where commands are executed.
void ProcessTimerEvent(unsigned timer) override
Method called by framework when timer event is produced.
bool ShiftToNextSubEvent(unsigned ninp, bool fast=false, bool dropped=false)
Shifts to next subevent in the input queue.
void BeforeModuleStart() override
void ModuleCleanup() override
Method, which can be reimplemented by user and should cleanup all references on buffers and other obj...
int CalcTrigNumDiff(const uint32_t &prev, const uint32_t &next)
bool ReplyCommand(dabc::Command cmd) override
Reimplement this method to react on command reply Return true if command can be destroyed by framewor...
CombinerModule(const std::string &name, dabc::Command cmd=nullptr)
#define HADAQ_NEVTIDS
#define HADAQ_NEVTIDS_IN_FILE
#define HADAQ_RINGSIZE
std::vector< unsigned > hubs
Definition: hldprint.cxx:996
#define DOUT2(args ...)
Definition: logging.h:170
#define DOUT0(args ...)
Definition: logging.h:156
#define DOUT5(args ...)
Definition: logging.h:188
#define DOUT3(args ...)
Definition: logging.h:176
#define EOUT(args ...)
Definition: logging.h:150
#define DOUT1(args ...)
Definition: logging.h:162
#define DBOOL(arg)
Definition: logging.h:191
Event manipulation API.
Definition: api.h:23
const char * xmlWorkPool
Definition: Object.cxx:46
const char * xml_maxsize
Definition: Object.cxx:73
ManagerRef mgr
Definition: Manager.cxx:42
std::string format(const char *fmt,...)
Definition: string.cxx:49
std::string size_to_str(unsigned long sz, int prec=1, int select=0)
Convert size to string of form like 4.2 GB or 3.7 MB.
Definition: string.cxx:75
std::string number_to_str(unsigned long num, int prec=1, int select=0)
Convert number to string of form like 4.2G or 3.7M.
Definition: string.cxx:107
const char * prop_kind
Definition: Hierarchy.cxx:29
const char * xmlFlushTimeout
Definition: Object.cxx:61
@ mbt_EOF
Definition: Buffer.h:45
@ evntUser
Definition: ModuleItem.h:52
@ cmd_postponed
Definition: Command.h:42
@ cmd_false
Definition: Command.h:37
@ cmd_true
Definition: Command.h:38
const char * xmlHadaqTriggerTollerance
const char * xmlHadesTriggerHUB
const char * xmlMaxNumBuildEvt
const char * xmlEvtbuildTimeout
const char * xmlHadaqDiffEventStats
@ mbt_HadaqStopRun
Definition: HadaqTypeDefs.h:27
@ HADAQ_TIMEOFFSET
Definition: defines.h:115
const char * xmlHadaqTrignumRange
std::string FormatFilename(uint32_t runid, uint16_t ebid=0)
const char * xmlHadesTriggerType
uint32_t CreateRunId()
Hadaq event structure.
Definition: defines.h:443
Hadaq subevent structure.
Definition: defines.h:262
int fNPort
upd port number
Definition: UdpTransport.h:43
uint64_t fTotalRecvPacket
Definition: UdpTransport.h:45
uint64_t fTotalProducedBuffers
Definition: UdpTransport.h:52
std::string GetDiscard32String()
Definition: UdpTransport.h:78
uint64_t fTotalRecvBytes
Definition: UdpTransport.h:50
std::string GetDiscardString()
Definition: UdpTransport.h:68