24 dabc::ModuleAsync(name, cmd),
28 fBuildCompleteEvents(false),
37 if (fEventIdMask == 0) fEventIdMask = 0xffffffff;
45 fExcludeTime = Cfg(
"ExcludeTime", cmd).AsDouble(5.);
49 for (
unsigned n=0;n<NumInputs();n++) {
50 DOUT0(
" MBS COMBINER Port%u: Capacity %u", n, InputQueueCapacity(n));
58 fNumObligatoryInputs = NumInputs();
60 if (flushtmout>0.) CreateTimer(
"FlushTimer", flushtmout);
62 fEventRateName = ratesprefix+
"Events";
63 fDataRateName = ratesprefix+
"Data";
64 fInfoName = ratesprefix+
"Info";
65 fFileStateName= ratesprefix +
"FileOn";
67 DOUT0(
"Create rate %s", fDataRateName.c_str());
69 CreatePar(fDataRateName).SetRatemeter(
false, 3.).SetUnits(
"MB");
70 CreatePar(fEventRateName).SetRatemeter(
false, 3.).SetUnits(
"Ev");
86 CreatePar(fInfoName,
"info").SetSynchron(
true, 2.,
false);
87 CreatePar(fFileStateName).Dflt(
false);
89 PublishPars(
dabc::format(
"$CONTEXT$/%sCombinerModule",ratesprefix.c_str()));
91 SetInfo(
dabc::format(
"MBS combiner module ready. Mode: full events only:%d, subids check:%d flush:%3.1f" ,fBuildCompleteEvents,fCheckSubIds,flushtmout),
true);
96 DOUT0(
"mbs::CombinerModule::DTOR - does nothing!");
101 DOUT0(
"mbs::CombinerModule::ModuleCleanup()");
103 fOut.Close().Release();
104 for (
unsigned n=0;n<fInp.size();n++)
112 Par(fInfoName).SetValue(info);
114 if (forceinfo) Par(fInfoName).FireModified();
134 while (IsRunning() && (cnt<100) && BuildEvent()) ++cnt;
142 if (fOut.IsEmpty() || !fOut.IsBuffer())
return false;
144 if (!CanSendToAllOutputs())
return false;
150 SendToAllOutputs(buf);
159 DOUT0(
"mbs::CombinerModule::BeforeModuleStart name: %s is calling first build event...", GetName());
166 while (cnt<100 && BuildEvent()) cnt++;
168 DOUT0(
"mbs::CombinerModule::BeforeModuleStart name: %s is finished %u %u", GetName(), NumInputs(), NumOutputs());
179 fCfg[ninp].curr_evnt_num = 0;
180 fCfg[ninp].curr_evnt_special =
false;
181 fCfg[ninp].valid =
false;
184 SkipInputBuffers(ninp, 1);
192 fCfg[ninp].curr_evnt_num = 0;
193 fCfg[ninp].curr_evnt_special =
false;
194 fCfg[ninp].valid =
false;
196 bool foundevent(
false);
198 while (!foundevent) {
200 if (!fInp[ninp].IsData()) {
202 if (NumCanRecv(ninp)==0)
return false;
204 if (!fInp[ninp].Reset(RecvQueueItem(ninp, 0))) {
208 SkipInputBuffers(ninp, 1);
213 bool res = fInp[ninp].NextEvent();
215 if (!res || (fInp[ninp].evnt()==0)) {
217 SkipInputBuffers(ninp, 1);
221 if (fCfg[ninp].real_mbs && (fInp[ninp].evnt()->iTrigger>=fSpecialTriggerLimit)) {
224 fCfg[ninp].curr_evnt_special =
true;
225 fCfg[ninp].curr_evnt_num = fInp[ninp].evnt()->EventNumber();
226 DOUT1(
"Found special event with trigger %d on input %u", fInp[ninp].evnt()->iTrigger, ninp);
228 if (fCfg[ninp].real_evnt_num) {
230 fCfg[ninp].curr_evnt_num = fInp[ninp].evnt()->EventNumber() & fEventIdMask;
234 if (fCfg[ninp].no_evnt_num) {
241 fCfg[ninp].curr_evnt_num = 0;
245 if (subevnt->
fFullId == fCfg[ninp].evntsrc_fullid)
break;
246 subevnt = fInp[ninp].evnt()->NextSubEvent(subevnt);
250 uint32_t* data = (uint32_t*) (((uint8_t*) subevnt->
RawData()) + fCfg[ninp].evntsrc_shift);
252 if (fCfg[ninp].evntsrc_shift +
sizeof(uint32_t) <= subevnt->
RawDataSize()) {
254 fCfg[ninp].curr_evnt_num = *data & fEventIdMask;
257 EOUT(
"Subevent too small %u compare with required shift %u for id location", subevnt->
RawDataSize(), fCfg[ninp].evntsrc_shift);
260 EOUT(
"Did not found subevent for id location");
268 fCfg[ninp].valid =
true;
285 bool required_missing(
false);
287 for (
unsigned ninp=0; ninp<fCfg.size(); ninp++) {
290 if (IsOptionalInput(ninp))
continue;
292 if (!IsInputConnected(ninp)) required_missing =
true;
294 if (InputQueueFull(ninp) && (mostly_full<0))
295 mostly_full = (int) ninp;
298 if (required_missing && fBuildCompleteEvents) {
302 for (
unsigned ninp=0; ninp<fCfg.size(); ninp++) {
303 if (InputQueueFull(ninp) ||
304 (fCfg[ninp].no_evnt_num && (NumCanRecv(ninp) > InputQueueCapacity(ninp) / 2))) {
305 ShiftToNextBuffer(ninp);
306 SetInfo(
dabc::format(
"Skip buffer on input %u while some other input is disconnected", ninp));
307 DOUT0(
"Skip buffer on input %u",ninp);
315 int hasTriggerEvent = -1;
320 for (
unsigned ninp=0; ninp<fCfg.size(); ninp++) {
322 fCfg[ninp].selected =
false;
324 if (fCfg[ninp].last_valid_tm <= 0) fCfg[ninp].last_valid_tm = tm_now;
326 if (fInp[ninp].evnt()==0) {
327 if (!ShiftToNextEvent(ninp)) {
329 if (fCfg[ninp].no_evnt_num)
continue;
331 if ((mostly_full>=0) && !fBuildCompleteEvents)
continue;
333 if (fCfg[ninp].optional_input && (tm_now > (fCfg[ninp].last_valid_tm + fExcludeTime)))
continue;
337 fCfg[ninp].last_valid_tm = tm_now;
341 if (fCfg[ninp].no_evnt_num)
continue;
345 if (num_valid == 0) {
349 if (evid < mineventid) mineventid = evid;
else
350 if (evid > maxeventid) maxeventid = evid;
355 if (fCfg[ninp].curr_evnt_special && (hasTriggerEvent<0)) {
356 hasTriggerEvent = ninp;
357 triggereventid = evid;
362 if (num_valid==0)
return false;
369 if (hasTriggerEvent>=0) {
370 buildevid = triggereventid;
371 fCfg[hasTriggerEvent].selected =
true;
377 if (diff > fEventIdMask/2) {
378 buildevid = maxeventid;
379 diff = fEventIdMask - diff + 1;
382 if ((fEventIdTolerance > 0) && (diff > fEventIdTolerance)) {
383 SetInfo(
dabc::format(
"Event id difference %u exceeding tolerance window %u, stopping dabc!", diff, fEventIdTolerance),
true);
389 for (
unsigned ninp = 0; ninp < fCfg.size(); ninp++)
390 if (fCfg[ninp].valid && ((fCfg[ninp].curr_evnt_num == buildevid) || fCfg[ninp].no_evnt_num))
391 fCfg[ninp].selected =
true;
396 uint32_t subeventssize = 0;
398 int copyMbsHdrId = -1;
399 std::map<uint32_t, bool> subid_map;
400 unsigned num_selected_important(0), num_selected_all(0);
403 bool duplicatefound =
false;
407 bool important_input_skipped =
false;
409 int firstselected = -1;
413 for (
unsigned ninp = 0; ninp < fCfg.size(); ninp++) {
414 if (!fCfg[ninp].selected) {
416 if (fCfg[ninp].no_evnt_num)
continue;
420 if (fCfg[ninp].optional_input)
421 if (fCfg[ninp].valid || (tm_now > (fCfg[ninp].last_valid_tm + fExcludeTime)))
continue;
423 important_input_skipped =
true;
432 if (!IsOptionalInput(ninp)) {
434 if (firstselected<0) firstselected = ninp;
435 num_selected_important++;
436 if (fCfg[ninp].real_mbs && (copyMbsHdrId<0)) copyMbsHdrId = ninp;
439 subeventssize += fInp[ninp].evnt()->SubEventsSize();
442 while (fInp[ninp].NextSubEvent()) {
443 uint32_t
fullid = fInp[ninp].subevnt()->fFullId;
444 if (subid_map.find(
fullid) != subid_map.end()) {
446 duplicatefound =
true;
452 bool do_skip_data =
false;
456 if (fBuildCompleteEvents && important_input_skipped && (hasTriggerEvent<0)) {
457 SetInfo(
dabc::format(
"Skip incomplete event %u, found inputs %u required %u selected %s", buildevid, num_selected_important, NumObligatoryInputs(), sel_str.c_str()));
461 if (duplicatefound && (hasTriggerEvent<0)) {
462 SetInfo(
dabc::format(
"Skip event %u while duplicates subevents found", buildevid));
467 if (fBuildCompleteEvents && (num_selected_important < NumObligatoryInputs())) {
468 SetInfo(
dabc::format(
"Build incomplete event %u, found inputs %u required %u first %d diff %u mostly_full %d", buildevid, num_selected_important, NumObligatoryInputs(), firstselected, diff, mostly_full) );
471 if (important_input_skipped) {
472 SetInfo(
dabc::format(
"Build incomplete event %u, found inputs %u required %u first %d diff %u mostly_full %d", buildevid, num_selected_important, NumObligatoryInputs(), firstselected, diff, mostly_full) );
481 if (fOut.IsBuffer() && !fOut.IsPlaceForEvent(subeventssize))
482 if (!FlushBuffer())
return false;
484 if (!fOut.IsBuffer()) {
487 if (buf.
null())
return false;
489 if (!fOut.Reset(buf)) {
490 SetInfo(
"Cannot use buffer for output - hard error!!!!",
true);
499 if (!fOut.IsPlaceForEvent(subeventssize)) {
500 EOUT(
"Event size %u too big for buffer, skip event %u completely", subeventssize+
sizeof(
mbs::EventHeader), buildevid);
503 if (copyMbsHdrId<0) {
508 DOUT4(
"Building event %u num_valid %u", buildevid, num_valid);
509 fOut.NewEvent(buildevid);
511 for (
unsigned ninp = 0; ninp < fCfg.size(); ninp++) {
513 if (fCfg[ninp].selected) {
516 if (copyMbsHdrId<0) copyMbsHdrId = ninp;
518 if (!fInp[ninp].IsData())
519 throw dabc::Exception(
"Input has no buffer but used for event building");
522 fInp[ninp].AssignEventPointer(ptr);
529 fOut.AddSubevent(ptr);
533 fOut.evnt()->CopyHeader(fInp[copyMbsHdrId].evnt());
537 DOUT4(
"Produced event %d subevents %u", buildevid, subeventssize);
539 Par(fEventRateName).SetValue(1);
540 Par(fDataRateName).SetValue((subeventssize +
sizeof(
mbs::EventHeader))/1024./1024.);
543 if (!fOut.IsPlaceForEvent(0))
548 for (
unsigned ninp = 0; ninp < fCfg.size(); ninp++)
549 if (fCfg[ninp].selected) {
551 if (do_skip_data && fCfg[ninp].no_evnt_num)
continue;
553 if (ShiftToNextEvent(ninp))
554 fCfg[ninp].last_valid_tm = tm_now;
614 DOUT0(
"Started file %s res = %d", url.c_str(), res);
615 SetInfo(
dabc::format(
"Execute StartFile for %s, result=%d",url.c_str(), res),
true);
616 ChangeFileState(
true);
617 return cmd_bool(res);
620 FindPort(OutputName(1)).Disconnect();
621 SetInfo(
"Stopped file",
true);
622 ChangeFileState(
false);
626 if (NumOutputs()<1) {
627 EOUT(
"No ports was created for the server");
636 DOUT0(
"Started server %s res = %d", url.c_str(), res);
637 SetInfo(
dabc::format(
"Execute StartServer for %s, result=%d",url.c_str(), res),
true);
638 return cmd_bool(res);
641 FindPort(OutputName(0)).Disconnect();
642 SetInfo(
"Stopped server",
true);
650 if (cmd.
IsName(
"ConfigureInput")) {
651 unsigned ninp = cmd.
GetUInt(
"Port", 0);
653 if (ninp<fCfg.size()) {
657 fCfg[ninp].real_mbs = cmd.
GetBool(
"RealMbs", fCfg[ninp].real_mbs);
658 fCfg[ninp].real_evnt_num = cmd.
GetBool(
"RealEvntNum", fCfg[ninp].real_evnt_num);
659 fCfg[ninp].no_evnt_num = cmd.
GetBool(
"NoEvntNum", fCfg[ninp].no_evnt_num);
660 fCfg[ninp].optional_input = cmd.
GetBool(
"Optional", fCfg[ninp].optional_input);
663 fCfg[ninp].evntsrc_fullid = cmd.
GetUInt(
"EvntSrcFullId", fCfg[ninp].evntsrc_fullid);
664 fCfg[ninp].evntsrc_shift = cmd.
GetUInt(
"EvntSrcShift", fCfg[ninp].evntsrc_shift);
668 std::string ratename = cmd.
GetStr(
"RateName",
"");
669 if (!ratename.empty())
670 SetPortRatemeter(InputName(ninp), CreatePar(ratename).SetRatemeter(
false,1.));
675 if (fCfg[ninp].optional_input || fCfg[ninp].no_evnt_num) {
676 if (fNumObligatoryInputs>1) fNumObligatoryInputs--;
680 if (fCfg[ninp].no_evnt_num) {
681 fCfg[ninp].real_mbs =
false;
684 DOUT1(
"Configure input%u of module %s: RealMbs:%s RealEvntNum:%s EvntSrcFullId: 0x%x EvntSrcShift: %u",
686 DBOOL(fCfg[ninp].real_mbs),
DBOOL(fCfg[ninp].real_evnt_num),
687 fCfg[ninp].evntsrc_fullid, fCfg[ninp].evntsrc_shift);
710 SetParValue(fFileStateName, on);
Reference to memory from a memory pool.
BufferSize_t GetTotalSize() const
Return total size of all buffer segments.
Represents command with its arguments.
unsigned GetUInt(const std::string &name, unsigned dflt=0) const
std::string GetStr(const std::string &name, const std::string &dflt="") const
bool GetBool(const std::string &name, bool dflt=false) const
int GetInt(const std::string &name, int dflt=0) const
bool CreateTransport(const std::string &portname, const std::string &transportkind="", const std::string &thrdname="")
virtual int ExecuteCommand(Command cmd)
Main method where commands are executed.
Manipulator with dabc::Buffer class.
BufferSize_t shift(BufferSize_t sz)
void Release()
Releases the reference to the object.
bool IsName(const char *name) const
Returns true if the object name is the same as the specified one.
bool null() const
Returns true if reference contains nullptr.
Read iterator for HADAQ events/subevents.
void SetInfo(const std::string &info, bool forceinfo=false)
virtual unsigned int GetOverflowEventNumber() const
bool ShiftToNextEvent(unsigned ninp)
virtual int ExecuteCommand(dabc::Command cmd)
Main method where commands are executed.
CombinerModule(const std::string &name, dabc::Command cmd=nullptr)
virtual void AfterModuleStop()
virtual void ProcessTimerEvent(unsigned timer)
Method called by framework when timer event is produced.
virtual ~CombinerModule()
void ChangeFileState(bool on)
Change the file on/off state in the application.
virtual void ModuleCleanup()
Method which can be reimplemented by the user and should clean up all references to buffers and other obj...
bool ShiftToNextBuffer(unsigned ninp)
Method should be used to skip the current buffer in the queue.
virtual void BeforeModuleStart()
std::string format(const char *fmt,...)
const char * xmlFlushTimeout
const char * xmlFileSizeLimit
const char * comStartServer
const char * comStartFile
const char * xmlCombineCompleteOnly
const char * xmlSpecialTriggerLimit
const char * comStopServer
const char * xmlCheckSubeventIds
const char * xmlCombinerRatesPrefix
const char * xmlServerKind
const char * xmlEvidTolerance
const char * xmlServerPort
const char * ServerKindToStr(int kind)
static TimeStamp Now()
Method returns TimeStamp instance with current time stamp value, measured either by fast TSC (if it i...
double AsDouble() const
Return the time stamp as a double (in seconds).