19 #include "dabc/Factory.h"
30 #include "hadaq/TdcProcessor.h"
31 #include "hadaq/TrbProcessor.h"
32 #include "hadaq/HldProcessor.h"
36 #include "base/Buffer.h"
37 #include "base/StreamProc.h"
43 dabc::ModuleAsync(name, cmd),
79 hadaq::TdcProcessor::SetAllHistos(
true);
81 const char *dabcsys = getenv(
"DABCSYS");
82 const char *streamsys = getenv(
"STREAMSYS");
84 EOUT(
"DABCSYS variable not set, cannot run stream framework");
90 EOUT(
"STREAMSYS variable not set, cannot run stream framework");
95 std::string extra_include;
97 if (system(
"ls first.C >/dev/null 2>/dev/null") != 0)
98 extra_include =
dabc::format(
"-I%s/applications/autotdc", streamsys);
100 bool second = system(
"ls second.C >/dev/null 2>/dev/null") == 0;
102 #if defined(__MACH__)
103 const char *compiler =
"clang++";
104 const char *ldflags =
"";
106 const char *compiler =
"g++";
107 const char *ldflags =
"-Wl,--no-as-needed";
110 std::string exec =
dabc::format(
"%s %s/plugins/stream/src/stream_engine.cpp -O2 -fPIC -Wall -std=c++11 -I. -I%s/include %s %s"
111 "-shared -Wl,-soname,librunstream.so %s -Wl,-rpath,%s/lib -Wl,-rpath,%s/lib -o librunstream.so",
112 compiler, dabcsys, streamsys, extra_include.c_str(),
113 (second ?
"-D_SECOND_ " :
""), ldflags, dabcsys, streamsys);
115 system(
"rm -f ./librunstream.so");
117 DOUT0(
"Executing %s", exec.c_str());
119 int res = system(exec.c_str());
122 EOUT(
"Fail to compile first.C/second.C scripts. Abort");
128 EOUT(
"Fail to load generated librunstream.so library");
135 EOUT(
"Fail to find stream_engine function in librunstream.so library");
165 cmddef.
AddArg(
"fname",
"string",
true,
"file.root");
166 cmddef.
AddArg(
"kind",
"int",
false,
"2");
167 cmddef.
AddArg(
"maxsize",
"int",
false,
"1900");
178 double interval =
Cfg(
"AutoSave", cmd).
AsDouble(0);
179 if (interval > 1)
CreateTimer(
"AutoSave", interval);
197 if ((fInitFunc!=
nullptr) && (fParallel<=0)) {
203 fProcMgr->SetTop(fWorkerHierarchy, fParallel==0);
207 if (fFileUrl.length() > 0) {
212 if (fname.rfind(
".root") == fname.length() - 5) {
213 fProcMgr->SetTriggeredAnalysis(
true);
215 if (kind!=-1) fProcMgr->SetStoreKind(kind);
216 if (!fProcMgr->CreateStore(fFileUrl.c_str()))
217 EOUT(
"Fail to create store %s - check if libDabcRoot.so plugin in the xml file", fFileUrl.c_str());
221 if (hlevel != -111) fProcMgr->SetHistFilling(hlevel);
224 if (hldfilter>=0)
new hadaq::HldFilter(hldfilter);
228 base::ProcMgr::ClearInstancePointer();
230 if (fProcMgr->IsStreamAnalysis()) {
231 EOUT(
"Stream analysis kind is not supported in DABC engine");
235 if ((fParallel>0) && (fInitFunc!=
nullptr)) {
236 for (
int n=0;n<fParallel;n++) {
237 std::string mname =
dabc::format(
"%s%03d", GetName(), n);
239 cmd.
SetPtr(
"initfunc", fInitFunc);
240 cmd.
SetInt(
"parallel", -1);
242 DOUT0(
"Create module %s", mname.c_str());
248 DOUT0(
"Connect %s ->%s", OutputName(n,
true).c_str(), m.
InputName(0).c_str());
250 DOUT0(
"Connect output %u connected %s", n,
DBOOL(IsOutputConnected(n)));
256 DOUT0(
"!!!! Assigned to thread %s !!!!!", thread().GetName());
261 if (fDidMerge || (fParallel<=0))
return;
270 DOUT0(
"Can now merge histograms");
271 for (
int n=0;n<fParallel;n++) {
273 std::string mname =
dabc::format(
"%s%03d", GetName(), n);
285 DOUT0(
"Adopt histograms from %s", mname.c_str());
292 while (iter1.next()) {
293 if (!iter2.
next()) { miss =
true;
break; }
294 if (strcmp(iter1.name(),iter2.
name())!=0) { miss =
true;
break; }
305 int indx = item1.
GetField(
"_kind").
AsStr()==
"ROOT.TH1D" ? 2 : 5;
310 while (++indx<len) arr1[indx]+=arr2[indx];
315 EOUT(
"!!!!!!!!!!!!!! MISMATCH - CANNOT MERGE HISTOGRAMS !!!!!!!!!!!!");
319 DOUT0(
"Merged %d histograms from %s", nhist, mname.c_str());
324 SaveHierarchy(
main.SaveToBuffer());
330 if (fProcMgr && fProcMgr->ExecuteHCommand(cmd)) {
331 if (fProcMgr->IsWorking()) ActivateInput();
335 if (cmd.
IsName(dabc::CmdHierarchyExec::CmdName())) {
336 std::string cmdpath = cmd.
GetStr(
"Item");
337 DOUT0(
"Execute command %s", cmdpath.c_str());
339 if (cmdpath ==
"Control/StartRootFile") {
340 std::string fname = cmd.
GetStr(
"fname",
"file.root");
341 int kind = cmd.
GetInt(
"kind", 2);
342 int maxsize = cmd.
GetInt(
"maxsize", 1900);
345 if (fProcMgr->CreateStore(fname.c_str())) {
347 if (fProcMgr->IsRawAnalysis()) fProcMgr->SetTriggeredAnalysis(
true);
348 fProcMgr->SetStoreKind(kind);
349 fProcMgr->UserPreLoop(0,
true);
355 if (cmdpath ==
"Control/StopRootFile") {
356 if (fProcMgr) fProcMgr->CloseStore();
363 if (cmd.
IsName(
"SlaveFinished")) {
364 if (--fStopMode == 0) {
365 ProduceMergedHierarchy();
366 DOUT0(
"Stop ourself");
371 if (cmd.
IsName(
"GetHierarchy")) {
372 cmd.
SetRef(
"hierarchy", fWorkerHierarchy);
381 DOUT0(
"START STREAM MODULE %s inp %s", GetName(),
DBOOL(IsInputConnected(0)));
383 if (fProcMgr) fProcMgr->UserPreLoop();
393 system(
"rm -f h.bin");
402 std::string args(
"dabc_root -skip-zero -h h.bin -o ");
405 DOUT0(
"Calling: %s", args.c_str());
407 int res = system(args.c_str());
409 if (res!=0)
EOUT(
"Fail to convert DABC histograms in ROOT file, check h.bin file");
410 else system(
"rm -f h.bin");
415 if (fProcMgr) fProcMgr->UserPostLoop();
419 DOUT0(
"STOP STREAM MODULE %s data %lu evnts %lu outevents %lu %s", GetName(), fTotalSize, fTotalEvnts, fTotalOutEvnts, (fTotalEvnts == fTotalOutEvnts ?
"ok" :
"MISSMATCH"));
422 ProduceMergedHierarchy();
423 }
else if (fAsf.length()>0) {
424 SaveHierarchy(fWorkerHierarchy.SaveToBuffer());
427 DestroyPar(
"Events");
432 if (fProcMgr==0)
return false;
436 if (fParallel==0) Par(
"Events").SetValue(1);
443 bbuf.makereferenceof(evnt, evntsize);
445 bbuf().kind = base::proc_TRBEvent;
449 fProcMgr->ProvideRawData(bbuf);
451 base::Event* outevent = 0;
454 bool new_event = fProcMgr->AnalyzeNewData(outevent);
459 fProcMgr->ProcessEvent(outevent);
461 new_event = fProcMgr->ProduceNextEvent(outevent);
472 if (fProcMgr && !fProcMgr->IsWorking())
return false;
476 if (fParallel==0) Par(
"DataRate").SetValue(buf.
GetTotalSize()/1024./1024.);
480 std::string
main = GetName();
507 if ((fStopMode != -1111) || (fParallel <= 0))
return;
509 DOUT0(
"Inject EOF to finish parallel jobs");
511 SendToAllOutputs(buf);
513 fStopMode = fParallel;
514 SendToAllOutputs(buf);
521 unsigned indx(0), max(0), min(10);
522 for (
unsigned n=0;n<NumOutputs();n++) {
523 unsigned cansend = NumCanSend(n);
524 if (cansend > max) { max = cansend; indx = n; }
525 if (cansend < min) min = cansend;
529 if (max==0)
return false;
532 if ((RecvQueueItem().GetTypeId() ==
dabc::mbt_EOF) && (min == 0))
556 Par(
"Events").SetValue(cnt);
570 return ProcessNextBuffer();
572 return RedistributeBuffers();
577 if (TimerName(timer) ==
"AutoSave") {
578 if (fProcMgr) fProcMgr->SaveAllHistograms();
582 if (TimerName(timer) ==
"KeepAlive") {
588 RedistributeBuffers();
590 if ((fTotalEvnts > 0) && !IsPortConnected(InputName())) {
600 hadaq::HldProcessor *hld =
dynamic_cast<hadaq::HldProcessor*
> (fProcMgr->FindProc(
"HLD"));
605 folder.
SetField(
"EventsRate", Par(
"EventsRate").GetField(
"value").AsDouble());
606 folder.
SetField(
"EventsCount", (int64_t) fTotalEvnts);
607 folder.
SetField(
"StoreInfo", fProcMgr->GetStoreInfo());
609 for (
unsigned n=0;n<hld->NumberOfTRB();n++) {
610 hadaq::TrbProcessor* trb = hld->GetTRB(n);
Generic file storage for DABC buffers.
bool WriteBufPayload(const void *ptr, uint64_t sz)
bool OpenWriting(const char *fname)
bool WriteBufHeader(uint64_t size, uint64_t typ=0)
Reference on memory from memory pool.
unsigned NumSegments() const
Returns the number of segments in the buffer.
unsigned SegmentSize(unsigned n=0) const
Returns the size of the segment; no boundary checks are performed.
unsigned GetTypeId() const
BufferSize_t GetTotalSize() const
Return total size of all buffer segments.
void SetTypeId(unsigned tid)
void * SegmentPtr(unsigned n=0) const
Returns a pointer to the segment; no boundary checks are performed.
Command definition class.
CommandDefinition & AddArg(const std::string &name, const std::string &kind="string", bool required=true, const RecordField &dflt=RecordField())
Represents command with its arguments.
void SetPtr(const std::string &name, void *p)
Set pointer argument for the command.
bool SetInt(const std::string &name, int v)
std::string GetStr(const std::string &name, const std::string &dflt="") const
bool GetBool(const std::string &name, bool dflt=false) const
Reference GetRef(const std::string &name)
Returns reference from the command, can be called only once.
int GetInt(const std::string &name, int dflt=0) const
bool SetRef(const std::string &name, Reference ref)
Set reference to the command.
void * GetPtr(const std::string &name, void *deflt=0) const
Get pointer argument from the command.
static void * FindSymbol(const std::string &symbol)
static bool LoadLibrary(const std::string &fname)
Represents objects hierarchy of remote (or local) DABC process.
Hierarchy FindChild(const char *name)
Return child element from hierarchy.
Hierarchy CreateHChild(const std::string &name, bool allowslahes=false, bool sortorder=false)
Create a child item in the hierarchy with the specified name. If allowslahes is enabled, instead of subfolders item...
void Create(const std::string &name, bool withmutex=false)
Create top-level object with specified name.
Iterator over objects hierarchy
const char * name() const
Object * next(bool goinside=true)
ConnectionRequest Connect(const std::string &port1, const std::string &port2)
Request connection between two ports.
ModuleRef FindModule(const std::string &name)
Reference on dabc::Module class
std::string InputName(unsigned n=0, bool itemname=true)
Return item name of the input, can be used in connect command.
bool SetPortSignaling(const std::string &name, Port::EventsProducing signal)
virtual Parameter CreatePar(const std::string &name, const std::string &kind="")
void SetAutoStop(bool on=true)
If set, module will be automatically stopped when all i/o ports are disconnected.
std::string OutputName(unsigned indx=0, bool fullname=false) const
virtual void OnThreadAssigned()
std::string InputName(unsigned indx=0, bool fullname=false) const
unsigned NumOutputs() const
virtual int ExecuteCommand(Command cmd)
Main method where commands are executed.
unsigned CreateTimer(const std::string &name, double period_sec=-1., bool synchron=false)
void EnsurePorts(unsigned numinp=0, unsigned numout=0, const std::string &poolname="")
Method ensure that at least specified number of input and output ports will be created.
const char * GetName() const
Returns name of the object, thread safe
Parameter & SetUnits(const std::string &unit)
Set units field of parameter.
Parameter & SetRatemeter(bool synchron=false, double interval=1.0)
Converts the parameter into a ratemeter - all values will be summed up and divided by the specified interval.
int64_t GetArraySize() const
double * GetDoubleArr() const
std::string AsStr(const std::string &dflt="") const
int64_t AsInt(int64_t dflt=0) const
double AsDouble(double dflt=0.) const
RecordField GetField(const std::string &name) const
RecordField * GetFieldPtr(const std::string &name) const
bool HasField(const std::string &name) const
bool SetField(const std::string &name, const RecordField &v)
Reference on the arbitrary object
bool IsName(const char *name) const
Returns true if object name is the same as specified one.
Uniform Resource Locator interpreter.
std::string GetFullName() const
int GetOptionInt(const std::string &optname, int dflt=0) const
bool Execute(Command cmd, double tmout=-1.)
Hierarchy fWorkerHierarchy
place for publishing of worker parameters
RecordField Cfg(const std::string &name, Command cmd=nullptr) const
Returns the configuration field of the specified name. Configuration value of specified name searched in follo...
virtual bool Publish(const Hierarchy &h, const std::string &path)
Read iterator for HADAQ events/subevents.
bool NextEvent()
Used for ready HLD events.
unsigned evntsize() const
hadaq::RawEvent * evnt() const
Read iterator for MBS events/subevents.
unsigned rawdatasize() const
void SetDefaultFill(int fillcol=3)
void * fInitFunc
how many parallel processes to start
virtual bool ProcessRecv(unsigned port)
Method called by framework when at least one buffer available in input port.
void GenerateEOF(dabc::Buffer buf)
virtual void AfterModuleStop()
virtual void BeforeModuleStart()
virtual int ExecuteCommand(dabc::Command cmd)
Main method where commands are executed.
virtual void ProcessTimerEvent(unsigned)
Method called by framework when timer event is produced.
bool ProcessNextEvent(void *evnt, unsigned evntsize)
std::string fFileUrl
! configured file URL - module used to produce output
virtual void OnThreadAssigned()
bool RedistributeBuffers()
int fDefaultFill
! default fill color for 1-D histograms
RunModule(const std::string &name, dabc::Command cmd=nullptr)
void SaveHierarchy(dabc::Buffer buf)
void ProduceMergedHierarchy()
static void SetTRBStatus(dabc::Hierarchy &item, dabc::Hierarchy &logitem, hadaq::TrbProcessor *trb, bool change_progress=true, int *res_progress=nullptr, double *res_quality=nullptr, std::string *res_state=nullptr, std::vector< std::string > *res_msgs=nullptr, bool acknowledge_quality=false)
int main(int numc, char *args[])
std::string format(const char *fmt,...)