20 #include <sys/types.h>
30 dabc::ModuleAsync(name, cmd),
34 fIsTerminating(false),
39 fEvnumDiffStatistics(true)
43 fSpecialItemId = CreateUserItem(
"BuildEvents");
44 fSpecialFired =
false;
46 fBldProfiler.Reserve(50);
57 fAllBuildEventsLimit = 0;
66 fEBId = Cfg(
"NodeId", cmd).AsInt(-1);
69 fBNETsend = Cfg(
"BNETsend", cmd).AsBool(
false);
70 fBNETrecv = Cfg(
"BNETrecv", cmd).AsBool(
false);
71 fBNETbunch = Cfg(
"EB_EVENTS", cmd).AsInt(16);
72 fBNETNumRecv = Cfg(
"BNET_NUMRECEIVERS", cmd).AsInt(1);
73 fBNETNumSend = Cfg(
"BNET_NUMSENDERS", cmd).AsInt(1);
75 fExtraDebug = Cfg(
"ExtraDebug", cmd).AsBool(
true);
77 fCheckTag = Cfg(
"CheckTag", cmd).AsBool(
true);
79 fSkipEmpty = Cfg(
"SkipEmpty", cmd).AsBool(
true);
81 fBNETCalibrDir = Cfg(
"CalibrDir", cmd).AsStr();
82 fBNETCalibrPackScript = Cfg(
"CalibrPack", cmd).AsStr();
86 fLastTrigNr = 0xffffffff;
88 fTriggerRangeMask = 0;
90 if (fBNETrecv || fBNETsend)
96 fTriggerRangeMask = fMaxHadaqTrigger-1;
97 DOUT1(
"HADAQ %s module using maxtrigger 0x%x, rangemask:0x%x", GetName(), fMaxHadaqTrigger, fTriggerRangeMask);
101 if (fTriggerNrTolerance == -1) fTriggerNrTolerance = fMaxHadaqTrigger / 4;
107 std::string ratesprefix =
"Hadaq";
109 for (
unsigned n = 0; n < NumInputs(); n++) {
112 fCfg[n].fResort = FindPort(InputName(n)).Cfg(
"resort").AsBool(
false);
113 if (fCfg[n].fResort)
DOUT0(
"Do resort on input %u",n);
120 CreateTimer(
"FlushTimer", (fFlushTimeout > 0) ? fFlushTimeout/2. : 1.);
125 fRunInfoToOraFilename =
dabc::format(
"eb_runinfo2ora_%d.txt",fEBId);
127 fPrefix = Cfg(
"FilePrefix", cmd).AsStr(
"no");
128 fRunToOracle = Cfg(
"Runinfo2ora", cmd).AsBool(
false);
130 fDataRateName = ratesprefix +
"Data";
131 fEventRateName = ratesprefix +
"Events";
132 fLostEventRateName = ratesprefix +
"LostEvents";
133 fDataDroppedRateName = ratesprefix +
"DroppedData";
134 fInfoName = ratesprefix +
"Info";
136 CreatePar(fDataRateName).SetRatemeter(
false, 3.).SetUnits(
"MB");
137 CreatePar(fEventRateName).SetRatemeter(
false, 3.).SetUnits(
"Ev");
138 CreatePar(fLostEventRateName).SetRatemeter(
false, 3.).SetUnits(
"Ev");
139 CreatePar(fDataDroppedRateName).SetRatemeter(
false, 3.).SetUnits(
"MB");
141 fDataRateCnt = fEventRateCnt = fLostEventRateCnt = fDataDroppedRateCnt = 0;
144 CreatePar(
"RunFileSize").SetUnits(
"MB").SetFld(
dabc::prop_kind,
"rate").SetFld(
"#record",
true);
145 CreatePar(
"LtsmFileSize").SetUnits(
"MB").SetFld(
dabc::prop_kind,
"rate").SetFld(
"#record",
true);
146 CreateCmdDef(
"BnetFileControl").SetField(
"_hidden",
true);
147 }
else if (fBNETsend) {
148 CreateCmdDef(
"BnetCalibrControl").SetField(
"_hidden",
true);
149 CreateCmdDef(
"BnetCalibrRefresh").SetField(
"_hidden",
true);
151 CreateCmdDef(
"StartHldFile")
152 .AddArg(
"filename",
"string",
true,
"file.hld")
155 CreateCmdDef(
"StopHldFile");
156 CreateCmdDef(
"RestartHldFile");
159 CreatePar(fInfoName,
"info").SetSynchron(
true, 2.,
false).SetDebugLevel(2);
161 if (IsName(
"Combiner"))
162 PublishPars(
"$CONTEXT$/HadaqCombiner");
166 fWorkerHierarchy.SetField(
"_player",
"DABC.HadaqDAQControl");
168 if (fBNETsend) fWorkerHierarchy.SetField(
"_bnet",
"sender");
170 fWorkerHierarchy.SetField(
"_bnet",
"receiver");
171 fWorkerHierarchy.SetField(
"build_events", 0);
172 fWorkerHierarchy.SetField(
"build_data", 0);
173 fWorkerHierarchy.SetField(
"discard_events", 0);
176 if (fBNETsend || fBNETrecv) {
177 CreateTimer(
"BnetTimer", 1.);
189 DOUT3(
"hadaq::CombinerModule::DTOR..does nothing now!.");
196 DOUT0(
"hadaq::CombinerModule::ModuleCleanup()");
197 fIsTerminating =
true;
198 StoreRunInfoStop(
true);
199 fOut.Close().Release();
201 for (
unsigned n=0;n<fCfg.size();n++)
204 DOUT5(
"hadaq::CombinerModule::ModuleCleanup() after fCfg[n].Reset()");
218 if (!fInfoName.empty()) par = Par(fInfoName);
227 if (TimerName(timer) ==
"BnetTimer") {
232 if ((fFlushTimeout > 0) && (++fFlushCounter > 2)) {
240 Par(fDataRateName).SetValue(fDataRateCnt/1024./1024.);
241 Par(fEventRateName).SetValue(fEventRateCnt);
242 Par(fLostEventRateName).SetValue(fLostEventRateCnt);
243 Par(fDataDroppedRateName).SetValue(fDataDroppedRateCnt/1024./1024.);
245 fDataRateCnt = fEventRateCnt = fLostEventRateCnt = fDataDroppedRateCnt = 0;
247 fLastEventRate = Par(fEventRateName).Value().AsDouble();
250 StartEventsBuilding();
252 if ((fAllBuildEventsLimit > 0) && (fAllBuildEvents >= fAllBuildEventsLimit)) {
254 fAllBuildEventsLimit = 0;
262 if (fLastEventRate > 1000) cnt = 20;
263 if (fLastEventRate > 30000) cnt = 50;
265 while (IsRunning() && (cnt-- > 0)) {
267 if (!BuildEvent())
return;
270 if (!fSpecialFired) {
271 fSpecialFired =
true;
279 if (fSpecialItemId == item) {
281 fSpecialFired =
false;
283 EOUT(
"Get wrong user event");
286 StartEventsBuilding();
292 "HADAQ %s starts. Runid:%d, numinp:%u, numout:%u flush:%3.1f",
293 GetName(), (
int) fRunNumber, NumInputs(), NumOutputs(), fFlushTimeout);
297 fLastDropTm.GetNow();
299 fLastProcTm.GetNow();
300 fLastBuildTm.GetNow();
303 for (
unsigned ninp=0;ninp<fCfg.size();ninp++) {
304 fCfg[ninp].fQueueCapacity = InputQueueCapacity(ninp);
305 if (fBNETrecv)
continue;
308 SubmitCommandToTransport(InputName(ninp), Assign(cmd));
315 "HADAQ %s stopped. CompleteEvents:%d, BrokenEvents:%d, DroppedData:%d, RecvBytes:%d, data errors:%d, tag errors:%d",
316 GetName(), (
int) fAllBuildEvents, (
int) fAllDiscEvents , (
int) fAllDroppedData, (
int) fAllRecvBytes ,(
int) fRunDataErrors ,(
int) fRunTagErrors);
328 if (fOut.IsEmpty() || !fOut.IsBuffer()) {
329 DOUT3(
"FlushOutputBuffer has no buffer to flush");
333 int dest = DestinationPort(fLastTrigNr);
335 if (!CanSendToAllOutputs())
return false;
337 if (!CanSend(dest))
return false;
345 SendToAllOutputs(buf);
356 fBldProfiler.MakeStatistic();
362 if (!fBnetFileCmd.null() && fBnetFileCmd.IsTimedout()) fBnetFileCmd.Reply(
dabc::cmd_false);
365 if ((NumOutputs() < 2) || !SubmitCommandToTransport(OutputName(1), Assign(cmd))) {
366 fWorkerHierarchy.SetField(
"runid", 0);
367 fWorkerHierarchy.SetField(
"runsize", 0);
368 fWorkerHierarchy.SetField(
"runname", std::string());
369 fWorkerHierarchy.SetField(
"runprefix", std::string());
370 fWorkerHierarchy.SetField(
"state",
"NoFile");
371 fWorkerHierarchy.GetHChild(
"State").SetField(
"value",
"NoFile");
372 fWorkerHierarchy.SetField(
"quality", 0.5);
376 cmd2.SetBool(
"#ltsm",
true);
377 if ((NumOutputs() < 3) || !SubmitCommandToTransport(OutputName(2), Assign(cmd2))) {
378 fWorkerHierarchy.SetField(
"ltsmid", 0);
379 fWorkerHierarchy.SetField(
"ltsmsize", 0);
380 fWorkerHierarchy.SetField(
"ltsmname", std::string());
381 fWorkerHierarchy.SetField(
"ltsmrefix", std::string());
382 Par(
"LtsmFileSize").SetValue(0.);
386 cmd3.SetBool(
"#mbs",
true);
387 if ((NumOutputs() < 1) || !SubmitCommandToTransport(OutputName(0), Assign(cmd3))) {
388 fWorkerHierarchy.SetField(
"mbsinfo",
"");
391 std::string info =
"BnetRecv: ";
392 std::vector<int64_t> qsz;
393 for (
unsigned n=0;n<NumInputs();++n) {
394 unsigned len = NumCanRecv(n);
396 info.append(std::to_string(len));
401 fWorkerHierarchy.SetField(
"queues", qsz);
402 fWorkerHierarchy.SetField(
"ninputs", NumInputs());
403 fWorkerHierarchy.SetField(
"build_events", fAllBuildEvents);
404 fWorkerHierarchy.SetField(
"build_data", fAllRecvBytes);
405 fWorkerHierarchy.SetField(
"discard_events", fAllDiscEvents);
409 std::string node_state =
"";
410 double node_quality = 1;
411 int node_progress = 0;
413 std::vector<uint64_t>
hubs, ports, hubs_progress, recv_sizes, recv_bufs, hubs_dropev, hubs_lostev;
414 std::vector<std::string> calibr, hubs_state, hubs_info;
415 std::vector<double> hubs_quality, hubs_rates;
416 for (
unsigned n=0;n<fCfg.size();n++) {
417 InputCfg &inp = fCfg[n];
419 hubs.push_back(inp.fHubId);
420 ports.push_back(inp.fUdpPort);
421 calibr.push_back(inp.fCalibr);
423 unsigned nbuf = NumCanRecv(n);
424 uint64_t bufsz = TotalSizeCanRecv(n);
426 if (inp.fIter.IsData()) {
428 bufsz += inp.fIter.remained_size();
431 recv_bufs.push_back(nbuf);
432 recv_sizes.push_back(bufsz);
434 if (!inp.fCalibrReq && !inp.fCalibr.empty()) {
439 inp.fCalibrReq =
true;
442 std::string hub_state =
"", sinfo =
"";
444 double rate = 0., hub_quality = 1;
445 int hub_progress = 100;
448 sinfo =
"missing transport-info";
454 else if (inp.fHubSizeTmCnt <= 15)
455 rate = (info->
fTotalRecvBytes - inp.fHubPrevSize)/1024.0/1024.0/inp.fHubSizeTmCnt;
458 inp.fHubSizeTmCnt = 0;
459 inp.fHubPrevSize = inp.fHubLastSize;
460 }
else if ((inp.fHubSizeTmCnt > 0.75*fEventBuildTimeout) && (hub_quality > 0.1)) {
461 hub_state =
"NoData";
464 }
else if ((inp.fHubSizeTmCnt > 7) && (hub_quality > 0.6)) {
465 hub_state =
"LowData";
471 sinfo =
dabc::format(
"port:%d %5.3f MB/s data:%s pkts:%s buf:%s disc:%s d32:%s drop:%s lost:%s errbits:%s ",
483 sinfo += inp.TriggerRingAsStr(16);
486 hubs_dropev.push_back(inp.fDroppedTrig);
487 hubs_lostev.push_back(inp.fLostTrig);
489 if (!inp.fCalibr.empty() && (inp.fCalibrQuality < hub_quality)) {
490 hub_state = inp.fCalibrState;
491 hub_quality = inp.fCalibrQuality;
492 hub_progress = inp.fCalibrProgr;
495 if ((hub_progress > 0) && ((node_progress == 0) || (hub_progress < node_progress)))
496 node_progress = hub_progress;
498 if (hub_quality < node_quality) {
499 node_quality = hub_quality;
500 node_state = hub_state;
503 hubs_state.push_back(hub_state);
504 hubs_info.push_back(sinfo);
505 hubs_quality.push_back(hub_quality);
506 hubs_progress.push_back(hub_progress);
507 hubs_rates.push_back(rate);
510 std::string info =
"BnetSend:";
511 std::vector<int64_t> qsz;
512 for (
unsigned n=0;n<NumOutputs();++n) {
513 unsigned len = NumCanSend(n);
515 info.append(std::to_string(len));
520 if (node_state.empty()) {
521 node_state =
"Ready";
526 fWorkerHierarchy.SetField(
"hubs",
hubs);
527 fWorkerHierarchy.SetField(
"hubs_info", hubs_info);
528 fWorkerHierarchy.SetField(
"ports", ports);
529 fWorkerHierarchy.SetField(
"calibr", calibr);
530 fWorkerHierarchy.SetField(
"state", node_state);
531 fWorkerHierarchy.SetField(
"quality", node_quality);
532 fWorkerHierarchy.SetField(
"progress", node_progress);
533 fWorkerHierarchy.SetField(
"nbuilders", NumOutputs());
534 fWorkerHierarchy.SetField(
"queues", qsz);
535 fWorkerHierarchy.SetField(
"hubs_dropev",hubs_dropev);
536 fWorkerHierarchy.SetField(
"hubs_lostev",hubs_lostev);
537 fWorkerHierarchy.SetField(
"hubs_state", hubs_state);
538 fWorkerHierarchy.SetField(
"hubs_quality", hubs_quality);
539 fWorkerHierarchy.SetField(
"hubs_progress", hubs_progress);
540 fWorkerHierarchy.SetField(
"hubs_rates", hubs_rates);
541 fWorkerHierarchy.SetField(
"recv_bufs", recv_bufs);
542 fWorkerHierarchy.SetField(
"recv_sizes", recv_sizes);
544 fWorkerHierarchy.GetHChild(
"State").SetField(
"value", node_state);
547 fBnetStat = fBldProfiler.Format();
551 fBldCalls = fInpCalls = fOutCalls = fBufCalls = fTimerCalls = 0;
560 DOUT5(
"CombinerModule::ShiftToNextBuffer %d ", ninp);
562 InputCfg& cfg = fCfg[ninp];
564 ReadIterator& iter = (cfg.fResortIndx < 0) ? cfg.fIter : cfg.fResortIter;
570 if (cfg.fResortIndx < 0) {
572 if(!CanRecv(ninp))
return false;
577 if (cfg.fResortIndx>1)
return false;
579 buf = RecvQueueItem(ninp, cfg.fResortIndx++);
587 return iter.Reset(buf);
592 InputCfg &cfg = fCfg[ninp];
593 ReadIterator &iter = (cfg.fResortIndx < 0) ? cfg.fIter : cfg.fResortIter;
599 res = iter.NextSubeventsBlock();
601 if (res && iter.IsData())
return true;
603 if(!ShiftToNextBuffer(ninp))
return false;
614 int res = (int) (next) - prev;
615 if (res > (
int) fMaxHadaqTrigger/2) res -= fMaxHadaqTrigger;
else
616 if (res < (
int) fMaxHadaqTrigger/-2) res += fMaxHadaqTrigger;
624 InputCfg& cfg = fCfg[ninp];
626 if (dropped && cfg.has_data) cfg.fDroppedTrig++;
630 ReadIterator& iter = cfg.fIter;
632 if (!iter.NextEvent())
634 if (!ShiftToNextHadTu(ninp))
return false;
637 if (fast)
return true;
640 cfg.evnt = iter.evnt();
642 cfg.data_size = cfg.evnt->AllSubeventsSize();
644 uint32_t seq = cfg.evnt->GetSeqNr();
646 cfg.fTrigNr = (seq >> 8) & fTriggerRangeMask;
647 cfg.fTrigTag = seq & 0xFF;
649 cfg.fTrigNumRing[cfg.fRingCnt] = cfg.fTrigNr;
652 cfg.fEmpty = (cfg.data_size == 0);
653 cfg.fDataError = cfg.evnt->GetDataError();
655 cfg.fTrigType = cfg.evnt->GetId() & 0xF;
661 cfg.fLastTrigNr = cfg.fTrigNr;
669 if (fBNETrecv)
return ShiftToNextEvent(ninp, fast, dropped);
671 DOUT5(
"CombinerModule::ShiftToNextSubEvent %d ", ninp);
673 InputCfg &cfg = fCfg[ninp];
676 if (dropped && cfg.has_data)
677 fprintf(stderr,
"Input%u Trig:%6x Tag:%2x DROP\n", ninp, cfg.fTrigNr, cfg.fTrigTag);
681 bool foundevent(
false), doshift(
true), tryresort(cfg.fResort);
683 if (cfg.fResortIndx >= 0) {
685 if (cfg.subevnt) cfg.subevnt->SetTrigNr(0xffffffff);
686 cfg.fResortIndx = -1;
687 cfg.fResortIter.Close();
690 if (dropped && cfg.has_data) cfg.fDroppedTrig++;
697 while (!foundevent) {
698 ReadIterator &iter = (cfg.fResortIndx < 0) ? cfg.fIter : cfg.fResortIter;
701 if (doshift) res = iter.NextSubEvent();
704 if (!res || (iter.subevnt() == 0)) {
705 DOUT5(
"CombinerModule::ShiftToNextSubEvent %d with zero NextSubEvent()", ninp);
708 if (ShiftToNextHadTu(ninp))
continue;
710 if ((cfg.fResortIndx>=0) && (NumCanRecv(ninp) > 1)) {
713 cfg.fResortIndx = -1;
714 cfg.fResortIter.Close();
725 if (fast)
return true;
727 if (tryresort && (cfg.fLastTrigNr!=0xffffffff)) {
728 uint32_t trignr = iter.subevnt()->GetTrigNr();
729 if (trignr==0xffffffff)
continue;
731 int diff = CalcTrigNumDiff(cfg.fLastTrigNr, (trignr >> 8) & fTriggerRangeMask);
735 if (cfg.fResortIndx < 0) {
737 cfg.fResortIter = cfg.fIter;
746 cfg.subevnt = iter.subevnt();
748 cfg.data_size = cfg.subevnt->GetPaddedSize();
750 cfg.fTrigNr = (cfg.subevnt->GetTrigNr() >> 8) & fTriggerRangeMask;
751 cfg.fTrigTag = cfg.subevnt->GetTrigNr() & 0xFF;
756 if (((cfg.fTrigNr & 0xffff) == 0) &&
757 (fTriggerRangeMask > 0x100000) &&
758 (cfg.fResortIndx < 0) &&
759 (cfg.fLastTrigNr != 0xffffffff) &&
760 ((cfg.fLastTrigNr & 0xffff) == 0xffff) &&
761 ((cfg.fTrigNr & 0xffff0000) == (cfg.fLastTrigNr & 0xffff0000)))
762 cfg.fTrigNr = (cfg.fLastTrigNr + 1) & fTriggerRangeMask;
765 fprintf(stderr,
"Input%u Trig:%6x Tag:%2x\n", ninp, cfg.fTrigNr, cfg.fTrigTag);
768 cfg.fTrigNumRing[cfg.fRingCnt] = cfg.fTrigNr;
772 cfg.fDataError = cfg.subevnt->GetDataError();
774 cfg.fHubId = cfg.subevnt->GetId() & 0xffff;
778 if (!fHadesTriggerType) {
779 cfg.fTrigType = cfg.subevnt->GetTrigTypeTrb3();
780 }
else if (cfg.fHubId == fHadesTriggerHUB) {
782 uint32_t bitmask = 0xff000000;
783 uint32_t bitshift = 24;
785 uint32_t val = cfg.subevnt->Data(wordNr - 1);
786 cfg.fTrigType = (val & bitmask) >> bitshift;
792 uint32_t errorBits = cfg.subevnt->GetErrBits();
794 if ((errorBits != 0) && (errorBits != 1))
798 if (cfg.fLastTrigNr != 0xffffffff)
799 diff = CalcTrigNumDiff(cfg.fLastTrigNr, cfg.fTrigNr);
800 cfg.fLastTrigNr = cfg.fTrigNr;
802 if (diff>1) cfg.fLostTrig += (diff-1);
810 DOUT0(
"hadaq::CombinerModule::DropAllInputBuffers()...");
812 unsigned maxnumsubev(0), droppeddata(0);
814 for (
unsigned ninp = 0; ninp < fCfg.size(); ninp++) {
815 unsigned numsubev = 0;
818 if (fCfg[ninp].has_data) numsubev++;
819 droppeddata += fCfg[ninp].data_size;
820 }
while (ShiftToNextSubEvent(ninp,
true,
true));
822 if (numsubev>maxnumsubev) maxnumsubev = numsubev;
826 while (SkipInputBuffers(ninp, 100));
829 Par(fLostEventRateName).SetValue(maxnumsubev);
830 Par(fDataDroppedRateName).SetValue(droppeddata/1024./1024.);
831 fRunDiscEvents += maxnumsubev;
832 fAllDiscEvents += maxnumsubev;
833 fRunDroppedData += droppeddata;
834 fAllDroppedData += droppeddata;
841 if (!fBNETsend || (NumOutputs()<2))
return -1;
843 return (trignr/fBNETbunch) % NumOutputs();
848 if (!fBNETsend || (fLastTrigNr==0xffffffff))
return true;
850 return DestinationPort(fLastTrigNr) == DestinationPort(trignr);
882 double tm = fLastProcTm.SpentTillNow(
true);
883 if (tm > fMaxProcDist) fMaxProcDist = tm;
888 unsigned masterchannel(0), min_inp(0);
889 uint32_t subeventssize(0), mineventid(0), maxeventid(0), buildevid(0);
890 bool incomplete_data(
false), any_data(
false);
895 for (
unsigned ninp = 0; ninp < fCfg.size(); ninp++) {
896 if (!fCfg[ninp].has_data)
897 if (!ShiftToNextSubEvent(ninp)) {
900 if (fExtraDebug && fLastDebugTm.Expired(2.)) {
901 DOUT1(
"Fail to build event while input %u is not ready numcanrecv %u maxtm = %5.3f ", ninp, NumCanRecv(ninp), fMaxProcDist);
902 fLastDebugTm.GetNow();
907 incomplete_data =
true;
911 uint32_t evid = fCfg[ninp].fTrigNr;
920 if (CalcTrigNumDiff(evid, maxeventid) < 0)
923 if (CalcTrigNumDiff(mineventid, evid) < 0) {
933 int diff = incomplete_data ? 0 : CalcTrigNumDiff(mineventid, maxeventid);
940 if (fLastDropTm.Expired((fEventBuildTimeout > 0) ? 1.5*fEventBuildTimeout : 5.))
941 if (((fTriggerNrTolerance > 0) && (diff > fTriggerNrTolerance)) || ((fEventBuildTimeout > 0) && fLastBuildTm.Expired(fEventBuildTimeout) && any_data && (fCfg.size() > 1))) {
944 if ((fTriggerNrTolerance > 0) && (diff > fTriggerNrTolerance)) {
946 "Event id difference %d exceeding tolerance window %d (min input %u),",
947 diff, fTriggerNrTolerance, min_inp);
949 msg =
dabc::format(
"No events were build since at least %.1f seconds,", fEventBuildTimeout);
952 if (missing_inp >= 0) {
953 msg +=
dabc::format(
" missing data on input %d url: %s,", missing_inp, FindPort(InputName(missing_inp)).Cfg(
"url").AsStr().c_str());
961 fprintf(stderr,
"DROP ALL\n");
964 DropAllInputBuffers();
968 if (fExtraDebug && fLastDebugTm.Expired(1.)) {
969 DOUT1(
"Drop all buffers");
970 fLastDebugTm.GetNow();
972 fLastDropTm.GetNow();
980 if (incomplete_data)
return false;
982 uint32_t buildtag = fCfg[masterchannel].fTrigTag;
990 bool dataError(
false), tagError(
false);
992 bool hasCompleteEvent =
true;
994 for (
unsigned ninp = 0; ninp < fCfg.size(); ninp++) {
995 bool foundsubevent =
false;
996 while (!foundsubevent) {
997 uint32_t trignr = fCfg[ninp].fTrigNr;
998 uint32_t trigtag = fCfg[ninp].fTrigTag;
999 bool isempty = fCfg[ninp].fEmpty;
1000 bool haserror = fCfg[ninp].fDataError;
1001 if (trignr == buildevid) {
1003 if (!isempty || !fSkipEmpty) {
1005 if (trigtag != buildtag) tagError =
true;
1006 if (haserror) dataError =
true;
1007 subeventssize += fCfg[ninp].data_size;
1009 foundsubevent =
true;
1013 if (CalcTrigNumDiff(trignr, buildevid) > 0) {
1015 int droppedsize = fCfg[ninp].data_size;
1018 fprintf(stderr,
"Input%u TrigNr:%6x Skip while building %6x diff %u\n", ninp, trignr, buildevid, CalcTrigNumDiff(trignr, buildevid));
1023 fDataDroppedRateCnt += droppedsize;
1026 fRunDroppedData += droppedsize;
1027 fAllDroppedData += droppedsize;
1029 if(!ShiftToNextSubEvent(ninp,
false,
true)) {
1030 if (fExtraDebug && fLastDebugTm.Expired(2.)) {
1031 DOUT1(
"Cannot shift data from input %d", ninp);
1032 fLastDebugTm.GetNow();
1045 hasCompleteEvent =
false;
1059 uint32_t sequencenumber = fRunBuildEvents + 1;
1062 sequencenumber = (fCfg[masterchannel].fTrigNr << 8) | fCfg[masterchannel].fTrigTag;
1064 if (hasCompleteEvent && fCheckTag && tagError) {
1065 hasCompleteEvent =
false;
1067 if (fBNETrecv)
DOUT0(
"TAG error");
1074 if (hasCompleteEvent) {
1075 if (fOut.IsBuffer() && (!fOut.IsPlaceForEvent(subeventssize) || !CheckDestination(buildevid))) {
1077 if (!FlushOutputBuffer()) {
1078 if (fExtraDebug && fLastDebugTm.Expired(1.)) {
1079 std::string sendmask;
1080 for (
unsigned n=0;n<NumOutputs();n++)
1081 sendmask.append(CanSend(n) ?
"o" :
"x");
1083 DOUT0(
"FlushOutputBuffer can't send to all %u outputs sendmask = %s", NumOutputs(), sendmask.c_str());
1084 fLastDebugTm.GetNow();
1090 if (!fOut.IsBuffer()) {
1094 if (fExtraDebug && fLastDebugTm.Expired(1.)) {
1095 DOUT0(
"did not have new buffer - wait for it");
1096 fLastDebugTm.GetNow();
1101 if (!fOut.Reset(buf)) {
1102 SetInfo(
"Cannot use buffer for output - hard error!!!!",
true);
1105 if (fExtraDebug && fLastDebugTm.Expired(1.)) {
1106 DOUT0(
"Abort application completely");
1107 fLastDebugTm.GetNow();
1113 if (!fOut.IsPlaceForEvent(subeventssize)) {
1114 DOUT0(
"New buffer has not enough space, skip subevent!");
1115 hasCompleteEvent =
false;
1120 if (hasCompleteEvent) {
1125 fOut.NewEvent(sequencenumber, fRunNumber);
1129 fOut.evnt()->SetDataError((dataError || tagError));
1130 if (dataError) fRunDataErrors++;
1131 if (tagError) fRunTagErrors++;
1133 unsigned trigtyp = 0;
1134 for (
unsigned ninp = 0; ninp < fCfg.size(); ninp++) {
1135 trigtyp = fCfg[ninp].fTrigType;
1140 unsigned currentid = trigtyp | (2 << 12);
1142 fEventIdCount[currentid & 0xF]++;
1148 for (
unsigned ninp = 0; ninp < fCfg.size(); ninp++) {
1149 if (fCfg[ninp].fEmpty && fSkipEmpty)
continue;
1151 fOut.AddAllSubevents(fCfg[ninp].evnt);
1153 fOut.AddSubevent(fCfg[ninp].subevnt);
1154 DoInputSnapshot(ninp);
1163 if (fLastTrigNr!=0xffffffff) diff = CalcTrigNumDiff(fLastTrigNr, buildevid);
1170 fprintf(stderr,
"BUILD:%6x\n", buildevid);
1173 if (fBNETrecv && fEvnumDiffStatistics && (fBNETNumRecv > 1) && (diff > fBNETbunch)) {
1177 long ncycles = diff / (fBNETbunch * fBNETNumRecv);
1180 diff -= ncycles * (fBNETbunch * fBNETNumRecv);
1183 diff -= fBNETbunch * (fBNETNumRecv - 1);
1184 if (diff <= 0) diff = 1;
1187 diff += ncycles * fBNETbunch;
1194 fLastTrigNr = buildevid;
1199 if (fEvnumDiffStatistics && (diff > 1)) {
1201 if (fExtraDebug && fLastDebugTm.Expired(1.)) {
1202 DOUT1(
"Events gap %d", diff-1);
1203 fLastDebugTm.GetNow();
1206 fLostEventRateCnt += (diff-1);
1208 fRunDiscEvents += (diff-1);
1209 fAllDiscEvents += (diff-1);
1213 fRunRecvBytes += currentbytes;
1214 fAllRecvBytes += currentbytes;
1215 fDataRateCnt += currentbytes;
1218 fLastBuildTm.GetNow();
1220 grd.Next(
"lostl", 14);
1221 fLostEventRateCnt += 1;
1223 fRunDiscEvents += 1;
1224 fAllDiscEvents += 1;
1227 std::string debugmask;
1228 debugmask.resize(fCfg.size(),
' ');
1230 grd.Next(
"shift", 15);
1233 for (
unsigned ninp = 0; ninp < fCfg.size(); ninp++)
1234 if (fCfg[ninp].fTrigNr == buildevid) {
1235 debugmask[ninp] =
'o';
1236 ShiftToNextSubEvent(ninp,
false, !hasCompleteEvent);
1238 debugmask[ninp] =
'x';
1241 if (fExtraDebug && fLastDebugTm.Expired(1.)) {
1242 DOUT1(
"Did building as usual mask %s complete = %5s maxdist = %5.3f s", debugmask.c_str(),
DBOOL(hasCompleteEvent), fMaxProcDist);
1243 fLastDebugTm.GetNow();
1259 auto &cfg = fCfg[ninp];
1261 cfg.fNumCanRecv = NumCanRecv(ninp);
1262 cfg.fQueueLevel = (cfg.fQueueCapacity > 0) ? 1. * cfg.fNumCanRecv / cfg.fQueueCapacity : 0.;
1263 cfg.fLastEvtBuildTrigId = (cfg.fTrigNr << 8) | (cfg.fTrigTag & 0xff);
1269 bool do_start =
false, do_stop =
false;
1271 if (cmd.
IsName(
"StartHldFile")) {
1272 do_start = do_stop =
true;
1273 SetInfo(
"Execute StartHldFile");
1275 DOUT0(
"******************* START HLD FILE *************");
1276 }
else if (cmd.
IsName(
"StopHldFile")) {
1278 SetInfo(
"Execute StopHldFile");
1279 DOUT0(
"******************* STOP HLD FILE *************");
1281 }
else if (cmd.
IsName(
"RestartHldFile")) {
1283 SetInfo(
"Execute RestartHldFile");
1285 SubmitCommandToTransport(OutputName(1), cmd);
1287 }
else if (cmd.
IsName(
"BnetFileControl")) {
1291 std::string mode = cmd.
GetStr(
"mode");
1293 if (mode ==
"start") {
1294 SetInfo(
"Execute BnetFileControl");
1295 for (
unsigned k=1;k<NumOutputs();++k) {
1297 subcmd.SetBool(
"only_prefix",
true);
1298 subcmd.SetStr(
"prefix", cmd.
GetStr(
"prefix"));
1299 SubmitCommandToTransport(OutputName(k), Assign(subcmd));
1302 fBnetFileCmd.
SetInt(
"#replies", NumOutputs()-1);
1303 if (!fBnetFileCmd.IsTimeoutSet()) fBnetFileCmd.SetTimeout(30);
1307 if (mode ==
"stop") {
1308 if (fRunNumber) StoreRunInfoStop();
1312 FlushOutputBuffer();
1315 for (
unsigned k=1;k<NumOutputs();++k) {
1318 if (eolbuf.
null()) {
1319 EOUT(
"FAIL to SEND EOL buffer to OUTPUT %d", k);
1331 }
else if (cmd.
IsName(
"HCMD_DropAllBuffers")) {
1333 DropAllInputBuffers();
1335 fLastDropTm.GetNow();
1337 if (fBNETsend && !fIsTerminating) {
1338 for (
unsigned n = 0; n < NumInputs(); n++) {
1339 fCfg[n].fErrorBitsCnt = 0;
1340 fCfg[n].fDroppedTrig = 0;
1341 fCfg[n].fLostTrig = 0;
1342 fCfg[n].fHubSizeTmCnt = 0;
1343 fCfg[n].fHubLastSize = 0;
1344 fCfg[n].fHubPrevSize = 0;
1346 SubmitCommandToTransport(InputName(n), subcmd);
1353 }
else if (cmd.
IsName(
"BnetCalibrControl")) {
1355 if (!fBNETsend || fIsTerminating || (NumInputs()==0))
1358 if (!fBnetCalibrCmd.null()) {
1359 EOUT(
"Still calibration command running");
1363 fBnetCalibrCmd = cmd;
1364 fBnetCalibrCmd.
SetInt(
"#replies", NumInputs());
1365 fBnetCalibrCmd.SetDouble(
"quality", 1.0);
1367 std::string rundir =
"";
1368 unsigned runid = cmd.
GetUInt(
"runid");
1370 if ((cmd.
GetStr(
"mode") !=
"start") && !fBNETCalibrDir.empty() && (runid != 0)) {
1371 rundir = fBNETCalibrDir;
1374 std::string mkdir =
"mkdir -p ";
1375 mkdir.append(rundir);
1376 auto res = system(mkdir.c_str());
1378 fBnetCalibrCmd.SetStr(
"#rundir", rundir);
1382 DOUT0(
"Combiner get BnetCalibrControl mode %s rundir %s", cmd.
GetStr(
"mode").c_str(), rundir.c_str());
1384 for (
unsigned n = 0; n < NumInputs(); n++) {
1386 subcmd.SetStr(
"mode", cmd.
GetStr(
"mode"));
1387 subcmd.SetStr(
"rundir", rundir);
1388 SubmitCommandToTransport(InputName(n), Assign(subcmd));
1392 }
else if (cmd.
IsName(
"BnetCalibrRefresh")) {
1394 if (!fBNETsend || fIsTerminating || (NumInputs()==0))
1397 if (!fBnetRefreshCmd.null()) {
1398 EOUT(
"Still calibration command running");
1402 fBnetRefreshCmd = cmd;
1403 fBnetRefreshCmd.
SetInt(
"#replies", NumInputs());
1404 fBnetRefreshCmd.SetDouble(
"quality", 1.0);
1406 for (
unsigned n = 0; n < NumInputs(); n++) {
1408 SubmitCommandToTransport(InputName(n), Assign(subcmd));
1421 res = DisconnectPort(OutputName(1));
1426 if (do_start && res) {
1427 std::string fname = cmd.
GetStr(
"filename",
"file.hld");
1437 DOUT0(
"Start HLD file %s res = %s", fname.c_str(),
DBOOL(res));
1440 return cmd_bool(res);
1450 if(!fRunToOracle || fRunNumber==0)
return;
1454 strftime(ltime, 20,
"%Y-%m-%d %H:%M:%S", localtime_r(&t, &tm_res));
1455 std::string filename=GenerateFileName(fRunNumber);
1456 FILE *fp = fopen(fRunInfoToOraFilename.c_str(),
"a+");
1458 fprintf(fp,
"start %u %d %s %s\n", fRunNumber, fEBId, filename.c_str(), ltime);
1461 DOUT1(
"Write run info to %s - start: %lu %d %s %s ", fRunInfoToOraFilename.c_str(), fRunNumber, fEBId, filename.c_str(), ltime);
1472 if(!fRunToOracle || fRunNumber==0)
return;
1478 if(onexit || (newrunid==0))
1484 strftime(ltime, 20,
"%Y-%m-%d %H:%M:%S", localtime_r(&t, &tm_res));
1485 std::string filename = GenerateFileName(fRunNumber);
1486 FILE *fp = fopen(fRunInfoToOraFilename.c_str(),
"a+");
1488 fprintf(fp,
"stop %u %d %s %s %s ", fRunNumber, fEBId, filename.c_str(), ltime, Unit(fRunBuildEvents));
1489 fprintf(fp,
"%s\n", Unit(fRunRecvBytes));
1492 DOUT1(
"Write run info to %s - stop: %lu %d %s %s %s %s", fRunInfoToOraFilename.c_str(), fRunNumber, fEBId, filename.c_str(), ltime, Unit(fRunBuildEvents),Unit(fRunRecvBytes));
1500 fRunBuildEvents = 0;
1502 fRunDroppedData = 0;
1506 if (!fBNETrecv && !fIsTerminating)
1507 for (
unsigned n = 0; n < NumInputs(); n++) {
1508 SubmitCommandToTransport(InputName(n),
dabc::Command(
"ResetTransportStat"));
1510 fCfg[n].fLastEvtBuildTrigId = 0;
1514 fEventIdCount[i] = 0;
1521 static char retVal[16];
1522 static char u[] =
" kM";
1525 for (i = 0; v >= 10000 && i <
sizeof(u) - 2; v /= 1000, i++) {
1527 snprintf(retVal,
sizeof(retVal),
"%4lu%c", v, u[i]);
1539 if (cmd.
IsName(
"GetHadaqTransportInfo")) {
1540 unsigned id = cmd.
GetUInt(
"id");
1541 if (
id < fCfg.size()) {
1542 fCfg[id].fInfo = cmd.
GetPtr(
"Info");
1543 fCfg[id].fUdpPort = cmd.
GetUInt(
"UdpPort");
1544 fCfg[id].fCalibr = cmd.
GetStr(
"CalibrModule");
1547 }
else if (cmd.
IsName(
"GetCalibrState")) {
1548 unsigned n = cmd.
GetUInt(
"indx");
1549 if (n < fCfg.size()) {
1550 fCfg[n].fCalibrReq =
false;
1553 fCfg[n].fCalibrProgr = cmd.
GetInt(
"progress");
1554 fCfg[n].fCalibrState = cmd.
GetStr(
"state");
1555 fCfg[n].fCalibrQuality = cmd.
GetDouble(
"quality");
1558 }
else if (cmd.
IsName(
"GetTransportStatistic")) {
1560 fWorkerHierarchy.SetField(
"mbsinfo", cmd.
GetStr(
"MbsInfo"));
1564 unsigned runid = cmd.
GetUInt(
"RunId");
1565 std::string runname = cmd.
GetStr(
"RunName");
1566 std::string runprefix = cmd.
GetStr(
"RunPrefix");
1567 unsigned runsz = cmd.
GetUInt(
"RunSize");
1571 fWorkerHierarchy.SetField(
"ltsmid", runid);
1572 fWorkerHierarchy.SetField(
"ltsmsize", runsz);
1573 fWorkerHierarchy.SetField(
"ltsmname", runname);
1574 fWorkerHierarchy.SetField(
"ltsmprefix", runprefix);
1575 Par(
"LtsmFileSize").SetValue(runsz/1024./1024.);
1579 fWorkerHierarchy.SetField(
"runid", runid);
1580 fWorkerHierarchy.SetField(
"runsize", runsz);
1581 fWorkerHierarchy.SetField(
"runname", runname);
1582 fWorkerHierarchy.SetField(
"runprefix", runprefix);
1584 Par(
"RunFileSize").SetValue(runsz/1024./1024.);
1586 std::string state =
"File";
1587 double quality = 0.98;
1588 if ((Par(fEventRateName).Value().AsDouble() == 0) && (quality > 0.55)) { state =
"NoData"; quality = 0.55; }
1589 if ((runid==0) && runname.empty() && (quality > 0.5)) { state =
"NoFile"; quality = 0.5; }
1591 fWorkerHierarchy.SetField(
"state", state);
1592 fWorkerHierarchy.SetField(
"quality", quality);
1593 fWorkerHierarchy.GetHChild(
"State").SetField(
"value", state);
1596 }
else if (cmd.
IsName(
"RestartTransport")) {
1597 int num = fBnetFileCmd.GetInt(
"#replies");
1599 unsigned newrunid = fBnetFileCmd.GetUInt(
"runid");
1600 if (fRunNumber) StoreRunInfoStop(
false, newrunid);
1601 std::string newprefix = fBnetFileCmd.GetStr(
"prefix");
1602 if(!newprefix.empty()) fPrefix = newprefix;
1604 fRunNumber = newrunid;
1605 ResetInfoCounters();
1606 StoreRunInfoStart();
1609 fBnetFileCmd.SetInt(
"#replies", num-1);
1612 }
else if (cmd.
IsName(
"TdcCalibrations")) {
1613 int num = fBnetCalibrCmd.GetInt(
"#replies");
1615 if (q < fBnetCalibrCmd.GetDouble(
"quality"))
1616 fBnetCalibrCmd.SetDouble(
"quality", q);
1620 std::string rundir = fBnetCalibrCmd.GetStr(
"#rundir");
1621 DOUT0(
"COMBINER COMPLETE CALIBR PROCESSING quality %5.3f dir %s", fBnetCalibrCmd.GetDouble(
"quality"), rundir.c_str());;
1623 if (!fBNETCalibrPackScript.empty() && !rundir.empty() && (fBnetCalibrCmd.GetStr(
"mode") ==
"stop")) {
1624 std::string exec = fBNETCalibrPackScript;
1626 exec.append(rundir);
1627 int res = system(exec.c_str());
1628 DOUT0(
"EXEC %s res = %d", exec.c_str(), res);
1633 fBnetCalibrCmd.SetInt(
"#replies", num-1);
1636 }
else if (cmd.
IsName(
"CalibrRefresh")) {
1637 int num = fBnetCalibrCmd.GetInt(
"#replies");
1639 if (q < fBnetRefreshCmd.GetDouble(
"quality"))
1640 fBnetRefreshCmd.SetDouble(
"quality", q);
1641 fBnetRefreshCmd.SetInt(
"#replies", num-1);
Reference on memory from memory pool.
unsigned GetTypeId() const
BufferSize_t GetTotalSize() const
Return total size of all buffer segments.
void SetTypeId(unsigned tid)
Represents command with its arguments.
unsigned GetUInt(const std::string &name, unsigned dflt=0) const
bool SetStrRawData(const std::string &str)
Set raw data with string content.
double GetDouble(const std::string &name, double dflt=0.) const
bool SetInt(const std::string &name, int v)
std::string GetStr(const std::string &name, const std::string &dflt="") const
Command & SetReceiver(const std::string &itemname)
These methods prepare command so, that one can submit command to the manager like: dabc::mgr....
bool GetBool(const std::string &name, bool dflt=false) const
int GetInt(const std::string &name, int dflt=0) const
void ChangeName(const std::string &name)
Change command name, should not be used for remote commands.
void * GetPtr(const std::string &name, void *deflt=0) const
Get pointer argument from the command.
Represents objects hierarchy of remote (or local) DABC process.
Hierarchy CreateHChild(const std::string &name, bool allowslahes=false, bool sortorder=false)
Create child item in hierarchy with specified name If allowslahes enabled, instead of subfolders item...
Special info parameter class.
bool CreateTransport(const std::string &portname, const std::string &transportkind="", const std::string &thrdname="")
virtual bool ReplyCommand(Command cmd)
Reimplement this method to react on command reply Return true if command can be destroyed by framewor...
virtual int ExecuteCommand(Command cmd)
Main method where commands are executed.
bool SetValue(const RecordField &v)
Set parameter value.
void FireModified()
Can be called by user to signal framework that parameter was modified.
bool SetField(const std::string &name, const RecordField &v)
void Release()
Releases reference on the object.
bool IsName(const char *name) const
Returns true if object name is the same as specified one.
bool null() const
Returns true if reference contains nullptr.
void ProcessUserEvent(unsigned item) override
Method called by framework when custom user event is produced.
bool ShiftToNextEvent(unsigned ninp, bool fast=false, bool dropped=false)
Shifts to next event in the input queue.
void AfterModuleStop() override
void SetInfo(const std::string &info, bool forceinfo=false)
bool ShiftToNextHadTu(unsigned ninp)
bool CheckDestination(uint32_t trignr)
bool ShiftToNextBuffer(unsigned ninp)
Method should be used to skip current buffer from the queue.
bool DropAllInputBuffers()
void StoreRunInfoStop(bool onexit=false, unsigned newrunid=0)
void DoInputSnapshot(unsigned ninp)
std::string GenerateFileName(unsigned runid)
char * Unit(unsigned long v)
int DestinationPort(uint32_t trignr)
int ExecuteCommand(dabc::Command cmd) override
Main method where commands are executed.
void ProcessTimerEvent(unsigned timer) override
Method called by framework when timer event is produced.
bool ShiftToNextSubEvent(unsigned ninp, bool fast=false, bool dropped=false)
Shifts to next subevent in the input queue.
void BeforeModuleStart() override
void ModuleCleanup() override
Method, which can be reimplemented by user and should cleanup all references on buffers and other obj...
int CalcTrigNumDiff(const uint32_t &prev, const uint32_t &next)
bool ReplyCommand(dabc::Command cmd) override
Reimplement this method to react on command reply Return true if command can be destroyed by framewor...
CombinerModule(const std::string &name, dabc::Command cmd=nullptr)
void StartEventsBuilding()
virtual ~CombinerModule()
#define HADAQ_NEVTIDS_IN_FILE
std::vector< unsigned > hubs
std::string format(const char *fmt,...)
std::string size_to_str(unsigned long sz, int prec=1, int select=0)
Convert size to string of form like 4.2 GB or 3.7 MB.
std::string number_to_str(unsigned long num, int prec=1, int select=0)
Convert number to string of form like 4.2G or 3.7M.
const char * xmlFlushTimeout
const char * xmlHadaqTriggerTollerance
const char * xmlHadesTriggerHUB
const char * xmlMaxNumBuildEvt
const char * xmlEvtbuildTimeout
const char * xmlHadaqDiffEventStats
const char * xmlHadaqTrignumRange
std::string FormatFilename(uint32_t runid, uint16_t ebid=0)
const char * xmlHadesTriggerType
Hadaq subevent structure.
int fNPort
upd port number
uint64_t fTotalRecvPacket
uint64_t fTotalProducedBuffers
std::string GetDiscard32String()
std::string GetDiscardString()