24 dabc::ModuleAsync(name, cmd),
53 if (fNextBufIndx>cnt) fNextBufIndx-=cnt;
else fNextBufIndx = 0;
54 if (fReadyBufIndx>cnt) fReadyBufIndx-=cnt;
else fReadyBufIndx = 0;
58 for (
unsigned n=0;n<fSubs.size();n++) {
59 if (fSubs[n].buf<cnt)
continue;
61 if (n!=tgt) fSubs[tgt] = fSubs[n];
73 if (num==0)
return false;
75 if (num >= fSubs.size()) {
78 for (
unsigned n=0;n<fSubs.size();n++)
79 if (fSubs[n].buf>maxbuf) maxbuf = fSubs[n].buf;
83 DecremntInputIndex(maxbuf+1);
85 SkipInputBuffers(0, maxbuf+1);
90 unsigned minbuf(0xffffffff);
93 for (
unsigned n=num;n<fSubs.size();n++) {
94 if (fSubs[n].buf<minbuf) minbuf = fSubs[n].buf;
95 fSubs[n-num] = fSubs[n];
97 fSubs.resize(fSubs.size() - num);
101 DecremntInputIndex(minbuf);
102 SkipInputBuffers(0, minbuf);
112 bool new_data =
false, full_recv_queue = RecvQueueFull(), flush_data =
false;
114 while (fNextBufIndx < NumCanRecv()) {
117 full_recv_queue = RecvQueueFull();
124 if (fNextBufIndx==0) {
125 if (!CanSend()) { fLastRet = 50;
return false; }
127 DecremntInputIndex();
139 bool was_empty = fSubs.size() == 0;
146 rec.
buf = fNextBufIndx;
152 fSubs.push_back(rec);
158 if ((fReadyBufIndx == fNextBufIndx) && was_empty) {
159 uint32_t prev = fLastTrigger;
161 for (
unsigned n=0;n<fSubs.size();n++) {
162 if (prev!=0xffffffff) {
163 ok = Diff(prev, fSubs[n].trig)==1;
166 prev = fSubs[n].trig;
182 std::sort(fSubs.begin(), fSubs.end(),
SubsComp(
this));
185 if ((fReadyBufIndx>0) && CanSend() && CanRecv()) {
187 DecremntInputIndex();
195 if (!CanSend()) { fLastRet = 20;
return false; }
197 if (fOutBuf.null()) {
198 if (!CanTakeBuffer()) { fLastRet = 10;
return false; }
199 fOutBuf = TakeBuffer();
201 fOutPtr.reset(fOutBuf);
206 while (cnt < fSubs.size()) {
208 if (fLastTrigger!=0xffffffff)
209 diff = Diff(fLastTrigger, fSubs[cnt].trig);
214 EOUT(
"Buf:%3d problem in sorting - older events appeared. Most probably, flush time has wrong value", fBufCnt);
221 if ((fSubs[cnt].buf + 2 > fNextBufIndx) && !full_recv_queue && !flush_data)
break;
223 DOUT3(
"Buf:%3d Saw difference %d with trigger 0x%06x cnt:%u", fBufCnt, diff, fSubs[cnt].trig, fOutPtr.distance_to_ownbuf());
225 DOUT3(
"Allow gap full:%s numcanrecv:%u indx:%u nextbufind:%u",
DBOOL(full_recv_queue), NumCanRecv(), fSubs[cnt].buf, fNextBufIndx);
231 if (fOutPtr.fullsize() < fSubs[cnt].sz) { flush_data =
true;
break; }
233 memcpy(fOutPtr(), fSubs[cnt].subevnt, fSubs[cnt].sz);
234 fOutPtr.shift(fSubs[cnt].sz);
236 fLastTrigger = fSubs[cnt++].trig;
239 if (full_recv_queue) flush_data =
true;
241 if (flush_data && (fOutPtr.distance_to_ownbuf()>0)) {
242 fOutBuf.SetTotalSize(fOutPtr.distance_to_ownbuf());
249 if (RemoveUsedSubevents(cnt)) flush_data =
true;
251 fLastRet = flush_data ? 60 : 70;
263 if (!CanSend())
return;
265 if (--fFlushCnt > 2)
return;
268 unsigned len = fOutPtr.distance_to_ownbuf();
271 fOutBuf.SetTotalSize(len);
278 if (fFlushCnt >= 0)
return;
285 if (cmd.
IsName(
"GetHadaqTransportInfo")) {
Reference to memory from the memory pool.
unsigned GetTypeId() const
Represents command with its arguments.
virtual int ExecuteCommand(Command cmd)
Main method where commands are executed.
unsigned CreateTimer(const std::string &name, double period_sec=-1., bool synchron=false)
void EnsurePorts(unsigned numinp=0, unsigned numout=0, const std::string &poolname="")
Method ensures that at least the specified number of input and output ports will be created.
uint64_t AsUInt(uint64_t dflt=0) const
double AsDouble(double dflt=0.) const
bool IsName(const char *name) const
Returns true if object name is the same as specified one.
RecordField Cfg(const std::string &name, Command cmd=nullptr) const
Returns the configuration field of the specified name. Configuration value of specified name searched in follo...
Read iterator for HADAQ events/subevents.
hadaq::RawSubevent * subevnt() const
bool NextSubEvent()
Used for sub-events iteration inside current block.
bool NextSubeventsBlock()
Depending on the buffer type, calls NextHadTu() or NextEvent()
virtual int ExecuteCommand(dabc::Command cmd)
Main method where commands are executed.
SorterModule(const std::string &name, dabc::Command cmd=nullptr)
void DecremntInputIndex(unsigned cnt=1)
uint32_t fLastTrigger
last trigger copied into output
uint32_t fTriggersRange
valid range for the triggers, normally 0x1000000
virtual void ProcessTimerEvent(unsigned)
Method called by framework when timer event is produced.
std::vector< SubsRec > fSubs
vector with subevents data in the buffers
bool RemoveUsedSubevents(unsigned num)
const char * xmlFlushTimeout
const char * xmlHadaqTrignumRange
uint32_t GetPaddedSize() const
uint32_t GetTrigNr() const
void * subevnt
direct pointer to the subevent
uint32_t trig
trigger number