DABC (Data Acquisition Backbone Core)  2.9.9
CombinerModule.h
Go to the documentation of this file.
1 // $Id: CombinerModule.h 4707 2021-02-26 11:29:21Z linev $
2 
3 /************************************************************
4  * The Data Acquisition Backbone Core (DABC) *
5  ************************************************************
6  * Copyright (C) 2009 - *
7  * GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
8  * Planckstr. 1, 64291 Darmstadt, Germany *
9  * Contact: http://dabc.gsi.de *
10  ************************************************************
11  * This software can be used under the GPL license *
12  * agreements as stated in LICENSE.txt file *
13  * which is part of the distribution. *
14  ************************************************************/
15 
16 #ifndef HADAQ_CombinerModule
17 #define HADAQ_CombinerModule
18 
19 #ifndef DABC_ModuleAsync
20 #include "dabc/ModuleAsync.h"
21 #endif
22 
23 #ifndef DABC_Profiler
24 #include "dabc/Profiler.h"
25 #endif
26 
27 #ifndef HADAQ_HadaqTypeDefs
28 #include "hadaq/HadaqTypeDefs.h"
29 #endif
30 
31 #ifndef HADAQ_Iterator
32 #include "hadaq/Iterator.h"
33 #endif
34 
35 #define HADAQ_NEVTIDS 64UL /* must be 2^n */
36 #define HADAQ_NEVTIDS_IN_FILE 0UL /* must be 2^n */
37 #define HADAQ_RINGSIZE 100
38 
39 namespace hadaq {
40 
47  class TerminalModule;
48 
50 
51  friend class TerminalModule;
52 
53  protected:
54 
55  struct InputCfg {
58  bool has_data;
59  uint32_t data_size;
60  uint32_t fTrigNr;
61  uint32_t fLastTrigNr;
62  uint32_t fTrigTag;
63  uint32_t fTrigType;
64  unsigned fErrorBitsCnt{0};
65  unsigned fHubId{0};
66  unsigned fUdpPort{0};
67  float fQueueLevel;
69  bool fDataError;
70  bool fEmpty;
71  void* fInfo;
74  unsigned fLostTrig;
75  unsigned fDroppedTrig;
77  unsigned fRingCnt;
78  unsigned fResort;
82  std::string fCalibr;
83  bool fCalibrReq;
85  std::string fCalibrState;
86  double fCalibrQuality;
87  uint64_t fHubLastSize;
88  uint64_t fHubPrevSize;
90 
91 
93  subevnt(0),
94  evnt(0),
95  has_data(false),
96  data_size(0),
97  fTrigNr(0),
98  fLastTrigNr(0xffffffff),
99  fTrigTag(0),
100  fTrigType(0),
101  fQueueLevel(0),
103  fDataError(false),
104  fEmpty(true),
105  fInfo(0),
106  fQueueCapacity(0),
107  fNumCanRecv(0),
108  fLostTrig(0),
109  fDroppedTrig(0),
110  fRingCnt(0),
111  fResort(false),
112  fIter(),
113  fResortIter(),
114  fResortIndx(-1),
115  fCalibr(),
116  fCalibrReq(false),
117  fCalibrProgr(0),
118  fCalibrState(),
119  fCalibrQuality(0.),
120  fHubLastSize(0),
121  fHubPrevSize(0),
122  fHubSizeTmCnt(0)
123  {
124  for (int i=0;i<HADAQ_RINGSIZE;i++)
125  fTrigNumRing[i]=0;
126  }
127 
128  void Reset(bool complete = false)
129  {
130  // used to reset current subevent
131  subevnt = 0;
132  evnt = 0;
133  has_data = false;
134  data_size = 0;
135  fTrigNr = 0;
136  fTrigTag = 0;
137  fTrigType = 0;
138  fDataError = false;
139  // fHubId = 0;
140  fEmpty = true;
141  // do not clear last fill level and last trig id
142  if (complete) {
143  fLastTrigNr = 0xffffffff;
144  fUdpPort = 0;
145  }
146  }
147 
148  void Close()
149  {
150  // used to close all iterators
151  fIter.Close();
152  fResortIter.Close();
153  fResortIndx = -1;
154  }
155 
156  std::string TriggerRingAsStr(int RingSize) {
157  std::string sbuf;
158 
159  unsigned cnt = fRingCnt, prev = 0;
160  if ((int)cnt < RingSize) cnt += HADAQ_RINGSIZE;
161  bool first = true;
162  cnt -= RingSize;
163  for (int n=0;n<RingSize;n++) {
164  if (first && (fTrigNumRing[cnt]==0) && (n!=RingSize-1)) {
165  // ignore 0 in the beginning
166  } else if (first) {
167  sbuf.append(dabc::format(" 0x%06x",(unsigned)fTrigNumRing[cnt]));
168  first = false;
169  } else {
170  sbuf.append(dabc::format(" %d",(int)fTrigNumRing[cnt] - (int)fTrigNumRing[prev]));
171  }
172 
173  prev = cnt; cnt = (cnt+1) % HADAQ_RINGSIZE;
174  }
175  return sbuf;
176  }
177  };
178 
179 
180  /* master stream for event building*/
181  //unsigned fMasterChannel;
182 
185 
188 
192 
193  uint32_t fLastTrigNr;
194 
195  std::vector<InputCfg> fCfg;
196 
198 
200  int32_t fEBId;
201  pid_t fPID;
203  bool fSkipEmpty;
204 
206 
207  int fNumReadBuffers; //< / SL: workaround counter, which indicates how many buffers were taken from queues
208 
209  unsigned fSpecialItemId;
211  double fLastEventRate;
212 
213  bool fCheckTag;
214 
215  bool fBNETsend;
216  bool fBNETrecv;
220  std::string fBNETCalibrDir;
221  std::string fBNETCalibrPackScript;
223 
227 
228  std::string fDataRateName;
229  std::string fDataDroppedRateName;
230  std::string fEventRateName;
231  std::string fLostEventRateName;
232  std::string fInfoName;
233 
234  uint64_t fDataRateCnt;
236  uint64_t fEventRateCnt;
238 
239  uint64_t fRunRecvBytes;
240  uint64_t fRunBuildEvents;
241  uint64_t fRunDroppedData;
242  uint64_t fRunDiscEvents;
243  uint64_t fRunTagErrors;
244  uint64_t fRunDataErrors;
245 
246  uint64_t fAllRecvBytes;
247  uint64_t fAllBuildEvents;
249  uint64_t fAllDiscEvents;
250  uint64_t fAllDroppedData;
251  uint64_t fAllFullDrops;
252 
254 
256  std::string fPrefix;
257 
259  uint32_t fRunNumber;
260 
262  uint32_t fEpicsRunNumber;
263 
267 
271 
272  bool fExtraDebug;
277  double fMaxProcDist;
278 
279  std::string fBnetInfo;
280  std::string fBnetStat;
281 
282  long fBldCalls{0};
283  long fInpCalls{0};
284  long fOutCalls{0};
285  long fBufCalls{0};
286  long fTimerCalls{0};
288 
289  bool BuildEvent();
290 
291  bool FlushOutputBuffer();
292 
293  void DoInputSnapshot(unsigned ninp);
294 
295  void BeforeModuleStart() override;
296 
297  void AfterModuleStop() override;
298 
299  bool ShiftToNextHadTu(unsigned ninp);
300 
302  bool ShiftToNextEvent(unsigned ninp, bool fast = false, bool dropped = false);
303 
305  bool ShiftToNextSubEvent(unsigned ninp, bool fast = false, bool dropped = false);
306 
308  bool ShiftToNextBuffer(unsigned ninp);
309 
310  /* cleanup input buffers in case of too large eventnumber mismatch*/
311  bool DropAllInputBuffers();
312 
313  //uint32_t CurrEventId(unsigned int ninp) const { return fCfg[ninp].curr_evnt_num; }
314 
315  void SetInfo(const std::string &info, bool forceinfo = false);
316 
317 
318  /* Methods to export run begin to oracle via text file*/
319  void StoreRunInfoStart();
320 
321  /* Methods to export run end and statistics to oracle via text file
322  if this is called on eventbuilder exit, we use local time instead ioc/runnumber time*/
323  void StoreRunInfoStop(bool onexit=false, unsigned newrunid = 0);
324 
325  void ResetInfoCounters();
326 
327  /* stolen from daqdata/hadaq/logger.c to keep oracle export output format of numbers*/
328  char* Unit(unsigned long v);
329 
330  /* we synthetise old and new filenames ourselves, since all communication to hldoutput is faulty,
331  because of timeshift between getting new runid and open/close of actual files*/
332  std::string GenerateFileName(unsigned runid);
333 
335 
336  int DestinationPort(uint32_t trignr);
337  bool CheckDestination(uint32_t trignr);
338  void UpdateBnetInfo();
339 
340  void StartEventsBuilding();
341 
342  public:
343  CombinerModule(const std::string &name, dabc::Command cmd = nullptr);
344  virtual ~CombinerModule();
345 
346  void ModuleCleanup() override;
347 
348  void ProcessPoolEvent(unsigned) override { fBufCalls++; StartEventsBuilding(); }
349  void ProcessInputEvent(unsigned) override { fInpCalls++; StartEventsBuilding(); }
350  void ProcessOutputEvent(unsigned) override { fOutCalls++; StartEventsBuilding(); }
351 
352  void ProcessTimerEvent(unsigned timer) override;
353 
354  void ProcessUserEvent(unsigned item) override;
355 
356  int ExecuteCommand(dabc::Command cmd) override;
357 
358  bool ReplyCommand(dabc::Command cmd) override;
359 
360  int CalcTrigNumDiff(const uint32_t &prev, const uint32_t &next);
361  };
362 
363 }
364 
365 #endif
Represents command with its arguments.
Definition: Command.h:99
Base class for user-derived code, implementing event-processing.
Definition: ModuleAsync.h:32
int fBNETbunch
number of events delivered to same event builder
void ProcessUserEvent(unsigned item) override
Method called by framework when custom user event is produced.
std::vector< InputCfg > fCfg
all input-dependent configurations
bool ShiftToNextEvent(unsigned ninp, bool fast=false, bool dropped=false)
Shifts to next event in the input queue.
bool fExtraDebug
when true, extra debug output is created
uint32_t fRunNumber
run id from timeofday for eventbuilding
long fBufCalls
number of buffer processing calls
std::string fDataDroppedRateName
long fOutCalls
number of output processing calls
dabc::TimeStamp fLastDropTm
timer used to avoid dropping data too often
uint32_t fEpicsRunNumber
most recent run id from epics, for multi eventbuilder mode
void AfterModuleStop() override
void SetInfo(const std::string &info, bool forceinfo=false)
bool ShiftToNextHadTu(unsigned ninp)
bool CheckDestination(uint32_t trignr)
int32_t fEBId
eventbuilder id <- node id
dabc::TimeStamp fLastBuildTm
last time when a complete event was built
bool fSpecialFired
if user event was already fired
double fLastEventRate
last event rate
bool fBNETsend
indicate that combiner used as BNET sender
bool ShiftToNextBuffer(unsigned ninp)
Method should be used to skip current buffer from the queue.
double fEventBuildTimeout
timeout in seconds since the last complete event, after which previous buffers are dropped
std::string fLostEventRateName
bool fHadesTriggerType
When true, read trigger type as in original hades event builders.
dabc::TimeStamp fLastDebugTm
timer used to generate infrequent debug output
uint64_t fAllBuildEvents
number of built events
void StoreRunInfoStop(bool onexit=false, unsigned newrunid=0)
unsigned fEventIdCount[HADAQ_NEVTIDS]
void DoInputSnapshot(unsigned ninp)
dabc::Profiler fBldProfiler
profiler of build event performance
int fBNETNumSend
number of BNET senders
void ProcessOutputEvent(unsigned) override
Method called by framework when output event is produced.
std::string GenerateFileName(unsigned runid)
long fBldCalls
number of build event calls
uint64_t fAllFullDrops
number of complete drops
char * Unit(unsigned long v)
uint32_t fMaxHadaqTrigger
Defines trigger sequence number range for overflow.
long fInpCalls
number of input processing calls
uint64_t fRunBuildEvents
number of built events
unsigned fSpecialItemId
item used to create user events
bool fBNETrecv
indicate that second-level event building is performed
std::string fBnetInfo
info for showing of bnet sender
pid_t fPID
process id of combiner module
dabc::Command fBnetFileCmd
current running bnet file command
void ProcessInputEvent(unsigned) override
Method called by framework when input event is produced.
bool fEvnumDiffStatistics
if true, account difference of subsequent build event numbers as lost events if false,...
uint32_t fLastTrigNr
number of the last built event
int DestinationPort(uint32_t trignr)
int ExecuteCommand(dabc::Command cmd) override
Main method where commands are executed.
void ProcessTimerEvent(unsigned timer) override
Method called by framework when timer event is produced.
bool ShiftToNextSubEvent(unsigned ninp, bool fast=false, bool dropped=false)
Shifts to next subevent in the input queue.
std::string fBnetStat
general-purpose statistics in text form
std::string fEventRateName
dabc::TimeStamp fLastProcTm
last time when event building was called
std::string fBNETCalibrDir
name of extra directory where to store calibrations
void BeforeModuleStart() override
std::string fRunInfoToOraFilename
void ModuleCleanup() override
Method, which can be reimplemented by user and should cleanup all references on buffers and other obj...
int CalcTrigNumDiff(const uint32_t &prev, const uint32_t &next)
bool fSkipEmpty
skip empty subevents in final event, default true
dabc::Command fBnetRefreshCmd
current running refresh command
int fTriggerNrTolerance
maximum allowed difference of trigger numbers (subevent sequence number)
uint64_t fAllBuildEventsLimit
maximal number of events to build
double fMaxProcDist
maximal time between calls to BuildEvent method
bool ReplyCommand(dabc::Command cmd) override
Reimplement this method to react on command reply Return true if command can be destroyed by framewor...
long fTimerCalls
number of timer events calls
void ProcessPoolEvent(unsigned) override
Method called by framework when pool event is produced.
CombinerModule(const std::string &name, dabc::Command cmd=nullptr)
dabc::Command fBnetCalibrCmd
current running bnet calibration command
std::string fBNETCalibrPackScript
name of script to pack calibration files
int fBNETNumRecv
number of BNET receivers
Read iterator for HADAQ events/subevents.
Definition: Iterator.h:39
Terminal for HADAQ event builder.
Write iterator for HADAQ events/subevents.
Definition: Iterator.h:104
#define HADAQ_NEVTIDS
#define HADAQ_RINGSIZE
std::string format(const char *fmt,...)
Definition: string.cxx:49
Support for HADAQ - HADES DAQ
Definition: api.h:27
Class for acquiring and holding timestamps.
Definition: timing.h:40
uint32_t fTrigNr
keeps current trigger sequence number
ReadIterator fIter
main iterator
unsigned fRingCnt
where next value will be written
ReadIterator fResortIter
additional iterator to check resort
bool fCalibrReq
when true, request was sent
int fResortIndx
index of buffer in the queue used for resort iterator (-1 - off)
bool fEmpty
indicates if input has empty data
uint32_t fLastTrigNr
keeps previous trigger sequence number - used to control data lost
unsigned fResort
enables resorting of events
unsigned fUdpPort
if configured, port id
float fQueueLevel
current input queue fill level
bool fDataError
indicates if subevent has data error bit set in header id
double fCalibrQuality
calibration quality
unsigned fHubId
subevent id from given input
int fNumCanRecv
Number of buffers that can be received.
int fQueueCapacity
capacity of input queue
uint32_t fLastEvtBuildTrigId
remembers id of the last built event
std::string fCalibr
name of calibration module, used only in terminal
std::string fCalibrState
calibration state
std::string TriggerRingAsStr(int RingSize)
bool has_data
when true, input has data (subevent or bunch of sub events)
unsigned fErrorBitsCnt
number of subevents with non-zero error bits
uint32_t data_size
padded size of current subevent, required in output buffer
uint32_t fTrigTag
keeps current trigger tag
hadaq::RawSubevent * subevnt
actual subevent
unsigned fLostTrig
number of lost triggers (never received by the combiner)
int fHubSizeTmCnt
counts how many times the data was the same
void Reset(bool complete=false)
uint32_t fTrigNumRing[HADAQ_RINGSIZE]
values of last seen TU ID
void * fInfo
Direct pointer to transport info, used only for debugging.
int fCalibrProgr
calibration progress
unsigned fDroppedTrig
number of dropped triggers (received but dropped by the combiner)
uint32_t fTrigType
current subevent trigger type
hadaq::RawEvent * evnt
actual event
Hadaq event structure.
Definition: defines.h:443
Hadaq subevent structure.
Definition: defines.h:262