DABC (Data Acquisition Backbone Core)  2.9.9
SorterModule.cxx
Go to the documentation of this file.
1 // $Id: SorterModule.cxx 4480 2020-04-15 14:40:06Z linev $
2 
3 /************************************************************
4  * The Data Acquisition Backbone Core (DABC) *
5  ************************************************************
6  * Copyright (C) 2009 - *
7  * GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
8  * Planckstr. 1, 64291 Darmstadt, Germany *
9  * Contact: http://dabc.gsi.de *
10  ************************************************************
11  * This software can be used under the GPL license *
12  * agreements as stated in LICENSE.txt file *
13  * which is part of the distribution. *
14  ************************************************************/
15 
16 #include "hadaq/SorterModule.h"
17 
18 #include <algorithm>
19 
20 #include "hadaq/Iterator.h"
21 
22 
// Constructor of the sorter module.
// Initialises the flush counter to 5, zeroes the buffer counter, last-return code and
// both input-queue indexes, and default-constructs the subevent list and output buffer/pointer.
// Reads the flush timeout from the configuration (default 0.3) and creates "FlushTimer"
// only when that value is positive.
// NOTE(review): this listing skips original source lines 35 and 42, so statements
// that belong to the constructor may not be visible here.
23 hadaq::SorterModule::SorterModule(const std::string &name, dabc::Command cmd) :
24  dabc::ModuleAsync(name, cmd),
25  fFlushCnt(5),
26  fBufCnt(0),
27  fLastRet(0),
28  fNextBufIndx(0),
29  fReadyBufIndx(0),
30  fSubs(),
31  fOutBuf(),
32  fOutPtr()
33 {
34  // we need at least one input and one output port
36 
 // flush period is taken from the configuration/command, 0.3 when not specified
37  double flushtime = Cfg(dabc::xmlFlushTimeout, cmd).AsDouble(0.3);
38 
 // a non-positive value means no flush timer is created
39  if (flushtime > 0.)
40  CreateTimer("FlushTimer", flushtime);
41 
 // 0xffffffff is used throughout this file as "no trigger seen yet"
43  fLastTrigger = 0xffffffff;
44 
 // pre-allocate room for subevent records to limit reallocations
45  fSubs.reserve(1024);
46 }
47 
49 {
 // NOTE(review): the function header is missing from this listing; the body matches
 // DecremntInputIndex(unsigned cnt), as called elsewhere in this file - confirm.
50  // adjust bookkeeping after *cnt* buffers were removed from the front of the input queue
51  // all references to the removed buffers are dropped
52 
 // shift both queue indexes down by cnt, clamping at zero
53  if (fNextBufIndx>cnt) fNextBufIndx-=cnt; else fNextBufIndx = 0;
54  if (fReadyBufIndx>cnt) fReadyBufIndx-=cnt; else fReadyBufIndx = 0;
55 
 // tgt is the write position used to compact the surviving records in place
56  unsigned tgt(0);
57 
58  for (unsigned n=0;n<fSubs.size();n++) {
 // records pointing into one of the removed buffers are discarded
59  if (fSubs[n].buf<cnt) continue;
 // surviving records get their buffer index re-based
60  fSubs[n].buf-=cnt;
61  if (n!=tgt) fSubs[tgt] = fSubs[n];
62  tgt++;
63  }
64 
 // cut off the tail left over after compaction
65  fSubs.resize(tgt);
66 }
67 
69 {
 // NOTE(review): the function header is missing from this listing; the body matches
 // bool RemoveUsedSubevents(unsigned num), as called elsewhere in this file - confirm.
70  // remove the first *num* (already used) entries from the subs list
71  // return true if any buffer was skipped from the input queue
72 
73  if (num==0) return false;
74 
75  if (num >= fSubs.size()) {
76  // this is full clear of indexed data
 // find the highest buffer index referenced by any record
77  unsigned maxbuf(0);
78  for (unsigned n=0;n<fSubs.size();n++)
79  if (fSubs[n].buf>maxbuf) maxbuf = fSubs[n].buf;
80 
81  fSubs.clear();
82 
 // buffers 0..maxbuf are no longer referenced: re-base indexes, then drop them from input 0
83  DecremntInputIndex(maxbuf+1);
84 
85  SkipInputBuffers(0, maxbuf+1);
86 
87  return true;
88  }
89 
 // 0xffffffff acts as "no remaining record seen yet"
90  unsigned minbuf(0xffffffff);
91 
92  // find the smallest buffer index still referenced by the remaining entries
 // and shift those entries to the front of the list
93  for (unsigned n=num;n<fSubs.size();n++) {
94  if (fSubs[n].buf<minbuf) minbuf = fSubs[n].buf;
95  fSubs[n-num] = fSubs[n];
96  }
97  fSubs.resize(fSubs.size() - num);
98 
99  if (minbuf>0) {
100  // we skip at least one buffer
 // all buffers below minbuf are no longer referenced by any remaining record
101  DecremntInputIndex(minbuf);
102  SkipInputBuffers(0, minbuf);
103  return true; // indicate that buffers were removed
104  }
105 
106  return false; // no buffers removed
107 }
108 
109 
111 {
 // NOTE(review): the function header is missing from this listing; the comment near
 // the end of the body refers to this routine as "retransmit" - confirm the name.
 //
 // Steps performed, in order:
 //  1. scan not yet indexed input buffers and append one SubsRec per subevent to fSubs
 //  2. sort fSubs if new records were added
 //  3. if the head input buffer was accepted "as is", forward it unchanged
 //  4. otherwise copy subevents from fSubs into fOutBuf following the trigger sequence
 //  5. send fOutBuf when a flush is required and drop used records/input buffers
 // Returns true when the caller should invoke the routine again.
 // fLastRet records the exit path: 50/40 EOF at queue head (blocked/forwarded),
 // 30 buffer forwarded as is, 20 cannot send, 10 no output buffer available,
 // 60/70 normal end with/without flush.
112  bool new_data = false, full_recv_queue = RecvQueueFull(), flush_data = false;
113 
114  while (fNextBufIndx < NumCanRecv()) {
115 
116  // remember state of the queue before we access it
117  full_recv_queue = RecvQueueFull();
118 
119  dabc::Buffer buf = RecvQueueItem(0, fNextBufIndx);
120 
121  // special handling for EOF buffer
122  // either flush all data or just forward EOF buffer
123  if (buf.GetTypeId()==dabc::mbt_EOF) {
 // EOF at the very front of the queue: nothing is pending before it, forward it directly
124  if (fNextBufIndx==0) {
125  if (!CanSend()) { fLastRet = 50; return false; }
126  buf = Recv();
127  DecremntInputIndex();
128  Send(buf);
129  fFlushCnt = 5;
130  fLastRet = 40;
131  return true;
132  }
 // EOF behind other buffers: stop scanning and force the flush of indexed data
133  flush_data = true;
134  break;
135  }
136 
137  hadaq::ReadIterator iter(buf);
138  fBufCnt++;
 // remember whether this buffer is the only contributor to fSubs
139  bool was_empty = fSubs.size() == 0;
140 
141  // scan buffer
142  while (iter.NextSubeventsBlock())
143  while (iter.NextSubEvent()) {
144  SubsRec rec;
145  rec.subevnt = iter.subevnt();
146  rec.buf = fNextBufIndx;
 // trigger number is shifted right by 8 bits and masked with fTriggersRange-1
 // (the mask assumes fTriggersRange is a power of two - TODO confirm)
147  rec.trig = (iter.subevnt()->GetTrigNr() >> 8) & (fTriggersRange-1);
148  rec.sz = iter.subevnt()->GetPaddedSize();
149 
150  // DOUT1("Event 0x%06x size %3u", rec.trig, rec.sz);
151 
152  fSubs.push_back(rec);
153  new_data = true;
154  }
155 
156  // check if buffer can be used as is
157  // all ids are in order and continue the previous values
 // (each Diff(prev, trig) must be exactly 1)
158  if ((fReadyBufIndx == fNextBufIndx) && was_empty) {
159  uint32_t prev = fLastTrigger;
160  bool ok(true);
161  for (unsigned n=0;n<fSubs.size();n++) {
162  if (prev!=0xffffffff) {
163  ok = Diff(prev, fSubs[n].trig)==1;
164  if (!ok) break;
165  }
166  prev = fSubs[n].trig;
167  }
168 
169  if (ok) {
 // buffer is accepted unchanged: it is counted as ready and its records are dropped
170  fLastTrigger = prev;
171  fReadyBufIndx++;
172  fSubs.clear(); // no need to keep array
173  new_data = false;
174  }
175  }
176 
177  fNextBufIndx++;
178  }
179 
180  // sort current array
181  if (new_data)
182  std::sort(fSubs.begin(), fSubs.end(), SubsComp(this));
183 
184  // simple case - retransmit buffer from input to output
185  if ((fReadyBufIndx>0) && CanSend() && CanRecv()) {
186  dabc::Buffer buf = Recv();
187  DecremntInputIndex();
188  Send(buf);
189  fFlushCnt = 5;
190  fLastRet = 30;
191  return true;
192  }
193 
194  // no need to try if cannot send buffer
195  if (!CanSend()) { fLastRet = 20; return false; }
196 
 // lazily acquire the output buffer and point the write pointer at its start
197  if (fOutBuf.null()) {
198  if (!CanTakeBuffer()) { fLastRet = 10; return false; }
199  fOutBuf = TakeBuffer();
200  fOutBuf.SetTypeId(hadaq::mbt_HadaqSubevents);
201  fOutPtr.reset(fOutBuf);
202  }
203 
204  // one could allow gaps in the trigger IDs if more than 2 items in the input queue
 // cnt counts the records of fSubs consumed (copied or skipped) in this call
205  unsigned cnt = 0;
206  while (cnt < fSubs.size()) {
 // without a previous trigger the first record is always accepted
207  int diff = 1;
208  if (fLastTrigger!=0xffffffff)
209  diff = Diff(fLastTrigger, fSubs[cnt].trig);
210 
211  if (diff!=1) {
212 
213  if (diff<0) {
214  EOUT("Buf:%3d problem in sorting - older events appeared. Most probably, flush time has wrong value", fBufCnt);
215  cnt++; // skip subevent
216  continue;
217  }
218 
219  // if buffer for such subevents in two last buffers, wait for next data
220  // if EOF buffer was seen before, flush subevents immediately
 // NOTE(review): buf + 2 > fNextBufIndx holds only for buf >= fNextBufIndx - 1,
 // i.e. the last scanned buffer - confirm whether "two last buffers" is intended
221  if ((fSubs[cnt].buf + 2 > fNextBufIndx) && !full_recv_queue && !flush_data) break;
222 
223  DOUT3("Buf:%3d Saw difference %d with trigger 0x%06x cnt:%u", fBufCnt, diff, fSubs[cnt].trig, fOutPtr.distance_to_ownbuf());
224 
225  DOUT3("Allow gap full:%s numcanrecv:%u indx:%u nextbufind:%u", DBOOL(full_recv_queue), NumCanRecv(), fSubs[cnt].buf, fNextBufIndx);
226 
227  // even after the gap, event taken into output buffer
228  }
229 
230  // check if output buffer has enough space
 // if not, request a flush; the current record stays in fSubs for the next call
231  if (fOutPtr.fullsize() < fSubs[cnt].sz) { flush_data = true; break; }
232 
233  memcpy(fOutPtr(), fSubs[cnt].subevnt, fSubs[cnt].sz);
234  fOutPtr.shift(fSubs[cnt].sz);
235 
236  fLastTrigger = fSubs[cnt++].trig;
237  }
238 
 // a full input queue always forces the output to be sent
239  if (full_recv_queue) flush_data = true;
240 
 // send only when something was actually written into the output buffer
241  if (flush_data && (fOutPtr.distance_to_ownbuf()>0)) {
242  fOutBuf.SetTotalSize(fOutPtr.distance_to_ownbuf());
243  fOutPtr.reset();
244  Send(fOutBuf);
245  fFlushCnt = 5;
246  }
247 
248  // if buffers were removed from input queue, call retransmit again
249  if (RemoveUsedSubevents(cnt)) flush_data = true;
250 
251  fLastRet = flush_data ? 60 : 70;
252 
253  return flush_data;
254 }
255 
256 
258 {
 // NOTE(review): the function header is missing from this listing; presumably this is
 // the body of ProcessTimerEvent(unsigned) - confirm.
259  // timer events used for data flush
260  // if after 3 timer events no data was sent, any data filled into output buffer will be sent
261  // if nothing happened after 6 timer events, any indexed data will be placed into output buffer and sent
262 
263  if (!CanSend()) return; // first of all, check if we can send data
264 
 // fFlushCnt is reset to 5 on every send in this file, so the first two ticks return here
265  if (--fFlushCnt > 2) return;
266 
267  // flush buffer if any data is accumulated
268  unsigned len = fOutPtr.distance_to_ownbuf();
269  if (len>0) {
270  // DOUT1("Buf:%3d Flush output counter %d subs.size %u nextbuf:%u numcanrev:%u lastret:%d", fBufCnt, fFlushCnt, fSubs.size(), fNextBufIndx, NumCanRecv(), fLastRet);
271  fOutBuf.SetTotalSize(len);
272  fOutPtr.reset();
273  Send(fOutBuf);
274  fFlushCnt = 5;
275  return;
276  }
277 
278  if (fFlushCnt >= 0) return;
279  // send any remained data and clear buffers
 // NOTE(review): no statement follows this comment in the visible code, so the flush
 // of indexed data described at the top of the function is not performed here
280 
281 }
282 
284 {
 // NOTE(review): the function header and the final return statement (original source
 // line 290) are missing from this listing; presumably this is ExecuteCommand - confirm.
 // "GetHadaqTransportInfo" is handed to the transport of input 0; when the submission
 // succeeds the command is reported as postponed, otherwise it is answered with cmd_true
285  if (cmd.IsName("GetHadaqTransportInfo")) {
286  if (SubmitCommandToTransport(InputName(0), cmd)) return dabc::cmd_postponed;
287  return dabc::cmd_true;
288  }
289 
291 }
Reference to memory from a memory pool.
Definition: Buffer.h:135
unsigned GetTypeId() const
Definition: Buffer.h:152
Represents command with its arguments.
Definition: Command.h:99
virtual int ExecuteCommand(Command cmd)
Main method where commands are executed.
Definition: Module.h:232
unsigned CreateTimer(const std::string &name, double period_sec=-1., bool synchron=false)
Definition: Module.cxx:109
void EnsurePorts(unsigned numinp=0, unsigned numout=0, const std::string &poolname="")
Method ensures that at least the specified number of input and output ports will be created.
Definition: Module.cxx:66
uint64_t AsUInt(uint64_t dflt=0) const
Definition: Record.cxx:525
double AsDouble(double dflt=0.) const
Definition: Record.cxx:549
bool IsName(const char *name) const
Returns true if object name is the same as specified one.
Definition: Reference.cxx:177
RecordField Cfg(const std::string &name, Command cmd=nullptr) const
Returns configuration field of specified name Configuration value of specified name searched in follo...
Definition: Worker.cxx:521
Read iterator for HADAQ events/subevents.
Definition: Iterator.h:39
hadaq::RawSubevent * subevnt() const
Definition: Iterator.h:90
bool NextSubEvent()
Used for sub-events iteration inside current block.
Definition: Iterator.cxx:199
bool NextSubeventsBlock()
Depending on the buffer type, calls NextHadTu() or NextEvent()
Definition: Iterator.cxx:166
virtual int ExecuteCommand(dabc::Command cmd)
Main method where commands are executed.
SorterModule(const std::string &name, dabc::Command cmd=nullptr)
void DecremntInputIndex(unsigned cnt=1)
uint32_t fLastTrigger
last trigger copied into output
Definition: SorterModule.h:60
uint32_t fTriggersRange
valid range for the triggers, normally 0x1000000
Definition: SorterModule.h:59
virtual void ProcessTimerEvent(unsigned)
Method called by framework when timer event is produced.
std::vector< SubsRec > fSubs
vector with subevents data in the buffers
Definition: SorterModule.h:63
bool RemoveUsedSubevents(unsigned num)
#define DOUT3(args ...)
Definition: logging.h:176
#define EOUT(args ...)
Definition: logging.h:150
#define DBOOL(arg)
Definition: logging.h:191
Event manipulation API.
Definition: api.h:23
const char * xmlWorkPool
Definition: Object.cxx:46
const char * xmlFlushTimeout
Definition: Object.cxx:61
@ mbt_EOF
Definition: Buffer.h:45
@ cmd_postponed
Definition: Command.h:42
@ cmd_true
Definition: Command.h:38
@ mbt_HadaqSubevents
Definition: HadaqTypeDefs.h:26
const char * xmlHadaqTrignumRange
uint32_t GetPaddedSize() const
Definition: defines.h:184
uint32_t GetTrigNr() const
Definition: defines.h:274
uint32_t buf
buffer indx
Definition: SorterModule.h:44
uint32_t sz
padded size
Definition: SorterModule.h:45
void * subevnt
direct pointer on subevent
Definition: SorterModule.h:42
uint32_t trig
trigger number
Definition: SorterModule.h:43