19 #include "dabc/Factory.h" 
   30 #include "hadaq/TdcProcessor.h" 
   31 #include "hadaq/TrbProcessor.h" 
   32 #include "hadaq/HldProcessor.h" 
   36 #include "base/Buffer.h" 
   37 #include "base/StreamProc.h" 
   43    dabc::ModuleAsync(name, cmd),
 
   79       hadaq::TdcProcessor::SetAllHistos(
true);
 
   81       const char *dabcsys = getenv(
"DABCSYS");
 
   82       const char *streamsys = getenv(
"STREAMSYS");
 
   84          EOUT(
"DABCSYS variable not set, cannot run stream framework");
 
   90          EOUT(
"STREAMSYS variable not set, cannot run stream framework");
 
   95       std::string extra_include;
 
   97          if (system(
"ls first.C >/dev/null 2>/dev/null") != 0)
 
   98             extra_include = 
dabc::format(
"-I%s/applications/autotdc", streamsys);
 
  100       bool second = system(
"ls second.C >/dev/null 2>/dev/null") == 0;
 
  102 #if defined(__MACH__)  
  103       const char *compiler = 
"clang++";
 
  104       const char *ldflags = 
"";
 
  106       const char *compiler = 
"g++";
 
  107       const char *ldflags = 
"-Wl,--no-as-needed";
 
  110       std::string exec = 
dabc::format(
"%s %s/plugins/stream/src/stream_engine.cpp -O2 -fPIC -Wall -std=c++11 -I. -I%s/include %s %s" 
  111             "-shared -Wl,-soname,librunstream.so %s -Wl,-rpath,%s/lib -Wl,-rpath,%s/lib  -o librunstream.so",
 
  112             compiler, dabcsys, streamsys, extra_include.c_str(),
 
  113             (second ? 
"-D_SECOND_ " : 
""), ldflags, dabcsys, streamsys);
 
  115       system(
"rm -f ./librunstream.so");
 
  117       DOUT0(
"Executing %s", exec.c_str());
 
  119       int res = system(exec.c_str());
 
  122          EOUT(
"Fail to compile first.C/second.C scripts. Abort");
 
  128          EOUT(
"Fail to load generated librunstream.so library");
 
  135          EOUT(
"Fail to find stream_engine function in librunstream.so library");
 
  165       cmddef.
AddArg(
"fname", 
"string", 
true, 
"file.root");
 
  166       cmddef.
AddArg(
"kind", 
"int", 
false, 
"2");
 
  167       cmddef.
AddArg(
"maxsize", 
"int", 
false, 
"1900");
 
  178    double interval = 
Cfg(
"AutoSave", cmd).
AsDouble(0);
 
  179    if (interval > 1) 
CreateTimer(
"AutoSave", interval);
 
  197    if ((fInitFunc!=
nullptr) && (fParallel<=0)) {
 
  203       fProcMgr->SetTop(fWorkerHierarchy, fParallel==0);
 
  207       if (fFileUrl.length() > 0) {
 
  212          if (fname.rfind(
".root") == fname.length() - 5) {
 
  213             fProcMgr->SetTriggeredAnalysis(
true);
 
  215             if (kind!=-1) fProcMgr->SetStoreKind(kind);
 
  216             if (!fProcMgr->CreateStore(fFileUrl.c_str()))
 
  217                EOUT(
"Fail to create store %s - check if libDabcRoot.so plugin in the xml file", fFileUrl.c_str());
 
  221          if (hlevel != -111) fProcMgr->SetHistFilling(hlevel);
 
  224          if (hldfilter>=0) 
new hadaq::HldFilter(hldfilter);
 
  228       base::ProcMgr::ClearInstancePointer();
 
  230       if (fProcMgr->IsStreamAnalysis()) {
 
  231          EOUT(
"Stream analysis kind is not supported in DABC engine");
 
  235    if ((fParallel>0) && (fInitFunc!=
nullptr))  {
 
  236       for (
int n=0;n<fParallel;n++) {
 
  237          std::string mname = 
dabc::format(
"%s%03d", GetName(), n);
 
  239          cmd.
SetPtr(
"initfunc", fInitFunc);
 
  240          cmd.
SetInt(
"parallel", -1);
 
  242          DOUT0(
"Create module %s", mname.c_str());
 
  248          DOUT0(
"Connect %s ->%s", OutputName(n,
true).c_str(), m.
InputName(0).c_str());
 
  250          DOUT0(
"Connect output %u connected %s", n, 
DBOOL(IsOutputConnected(n)));
 
  256    DOUT0(
"!!!! Assigned to thread %s  !!!!!", thread().GetName());
 
  261    if (fDidMerge || (fParallel<=0)) 
return;
 
  270    DOUT0(
"Can now merge histograms");
 
  271    for (
int n=0;n<fParallel;n++) {
 
  273       std::string mname = 
dabc::format(
"%s%03d", GetName(), n);
 
  285          DOUT0(
"Adopt histograms from %s", mname.c_str());
 
  292       while (iter1.next()) {
 
  293          if (!iter2.
next()) { miss = 
true; 
break; }
 
  294          if (strcmp(iter1.name(),iter2.
name())!=0) { miss = 
true; 
break; }
 
  305             int indx = item1.
GetField(
"_kind").
AsStr()==
"ROOT.TH1D" ? 2 : 5;
 
  310               while (++indx<len) arr1[indx]+=arr2[indx];
 
  315          EOUT(
"!!!!!!!!!!!!!! MISMATCH - CANNOT MERGE HISTOGRAMS !!!!!!!!!!!!");
 
  319          DOUT0(
"Merged %d histograms from %s", nhist, mname.c_str());
 
  324       SaveHierarchy(
main.SaveToBuffer());
 
  330    if (fProcMgr && fProcMgr->ExecuteHCommand(cmd)) {
 
  331       if (fProcMgr->IsWorking()) ActivateInput(); 
 
  335    if (cmd.
IsName(dabc::CmdHierarchyExec::CmdName())) {
 
  336       std::string cmdpath = cmd.
GetStr(
"Item");
 
  337       DOUT0(
"Execute command %s", cmdpath.c_str());
 
  339       if (cmdpath == 
"Control/StartRootFile") {
 
  340          std::string fname = cmd.
GetStr(
"fname",
"file.root");
 
  341          int kind = cmd.
GetInt(
"kind", 2);
 
  342          int maxsize = cmd.
GetInt(
"maxsize", 1900);
 
  345             if (fProcMgr->CreateStore(fname.c_str())) {
 
  347                if (fProcMgr->IsRawAnalysis()) fProcMgr->SetTriggeredAnalysis(
true);
 
  348                fProcMgr->SetStoreKind(kind);
 
  349                fProcMgr->UserPreLoop(0, 
true);
 
  355       if (cmdpath == 
"Control/StopRootFile") {
 
  356          if (fProcMgr) fProcMgr->CloseStore();
 
  363    if (cmd.
IsName(
"SlaveFinished")) {
 
  364       if (--fStopMode == 0) {
 
  365          ProduceMergedHierarchy();
 
  366          DOUT0(
"Stop ourself");
 
  371    if (cmd.
IsName(
"GetHierarchy")) {
 
  372       cmd.
SetRef(
"hierarchy", fWorkerHierarchy);
 
  381    DOUT0(
"START STREAM MODULE %s inp %s", GetName(), 
DBOOL(IsInputConnected(0)));
 
  383    if (fProcMgr) fProcMgr->UserPreLoop();
 
  393       system(
"rm -f h.bin");
 
  402    std::string args(
"dabc_root -skip-zero -h h.bin -o ");
 
  405    DOUT0(
"Calling: %s", args.c_str());
 
  407    int res = system(args.c_str());
 
  409    if (res!=0) 
EOUT(
"Fail to convert DABC histograms in ROOT file, check h.bin file");
 
  410           else system(
"rm -f h.bin");
 
  415    if (fProcMgr) fProcMgr->UserPostLoop();
 
  419    DOUT0(
"STOP STREAM MODULE %s data %lu evnts %lu outevents %lu  %s", GetName(), fTotalSize, fTotalEvnts, fTotalOutEvnts, (fTotalEvnts == fTotalOutEvnts ? 
"ok" : 
"MISSMATCH"));
 
  422       ProduceMergedHierarchy();
 
  423    } 
else if (fAsf.length()>0) {
 
  424       SaveHierarchy(fWorkerHierarchy.SaveToBuffer());
 
  427    DestroyPar(
"Events");
 
  432    if (fProcMgr==0) 
return false;
 
  436    if (fParallel==0) Par(
"Events").SetValue(1);
 
  443    bbuf.makereferenceof(evnt, evntsize);
 
  445    bbuf().kind = base::proc_TRBEvent;
 
  449    fProcMgr->ProvideRawData(bbuf);
 
  451    base::Event* outevent = 0;
 
  454    bool new_event = fProcMgr->AnalyzeNewData(outevent);
 
  459       fProcMgr->ProcessEvent(outevent);
 
  461       new_event = fProcMgr->ProduceNextEvent(outevent);
 
  472    if (fProcMgr && !fProcMgr->IsWorking()) 
return false;
 
  476    if (fParallel==0) Par(
"DataRate").SetValue(buf.
GetTotalSize()/1024./1024.);
 
  480          std::string 
main = GetName();
 
  507    if ((fStopMode != -1111) || (fParallel <= 0)) 
return;
 
  509    DOUT0(
"Inject EOF to finish parallel jobs");
 
  511    SendToAllOutputs(buf);
 
  513    fStopMode = fParallel;
 
  514    SendToAllOutputs(buf);
 
  521       unsigned indx(0), max(0), min(10);
 
  522       for (
unsigned n=0;n<NumOutputs();n++) {
 
  523          unsigned cansend = NumCanSend(n);
 
  524          if (cansend > max) { max = cansend; indx = n; }
 
  525          if (cansend < min) min = cansend;
 
  529       if (max==0) 
return false;
 
  532       if ((RecvQueueItem().GetTypeId() == 
dabc::mbt_EOF) && (min == 0))
 
  556       Par(
"Events").SetValue(cnt);
 
  570       return ProcessNextBuffer();
 
  572    return RedistributeBuffers();
 
  577    if (TimerName(timer) == 
"AutoSave") {
 
  578       if (fProcMgr) fProcMgr->SaveAllHistograms();
 
  582    if (TimerName(timer) == 
"KeepAlive") {
 
  588       RedistributeBuffers();
 
  590       if ((fTotalEvnts > 0) && !IsPortConnected(InputName())) {
 
  600    hadaq::HldProcessor *hld = 
dynamic_cast<hadaq::HldProcessor*
> (fProcMgr->FindProc(
"HLD"));
 
  605    folder.
SetField(
"EventsRate", Par(
"EventsRate").GetField(
"value").AsDouble());
 
  606    folder.
SetField(
"EventsCount", (int64_t) fTotalEvnts);
 
  607    folder.
SetField(
"StoreInfo", fProcMgr->GetStoreInfo());
 
  609    for (
unsigned n=0;n<hld->NumberOfTRB();n++) {
 
  610       hadaq::TrbProcessor* trb = hld->GetTRB(n);
 
Generic file storage for DABC buffers.
 
bool WriteBufPayload(const void *ptr, uint64_t sz)
 
bool OpenWriting(const char *fname)
 
bool WriteBufHeader(uint64_t size, uint64_t typ=0)
 
Reference on memory from memory pool.
 
unsigned NumSegments() const
Returns number of segment in buffer.
 
unsigned SegmentSize(unsigned n=0) const
Returns size on the segment, no any boundary checks.
 
unsigned GetTypeId() const
 
BufferSize_t GetTotalSize() const
Return total size of all buffer segments.
 
void SetTypeId(unsigned tid)
 
void * SegmentPtr(unsigned n=0) const
Returns pointer on the segment, no any boundary checks.
 
Command definition class.
 
CommandDefinition & AddArg(const std::string &name, const std::string &kind="string", bool required=true, const RecordField &dflt=RecordField())
 
Represents command with its arguments.
 
void SetPtr(const std::string &name, void *p)
Set pointer argument for the command.
 
bool SetInt(const std::string &name, int v)
 
std::string GetStr(const std::string &name, const std::string &dflt="") const
 
bool GetBool(const std::string &name, bool dflt=false) const
 
Reference GetRef(const std::string &name)
Returns reference from the command, can be called only once.
 
int GetInt(const std::string &name, int dflt=0) const
 
bool SetRef(const std::string &name, Reference ref)
Set reference to the command.
 
void * GetPtr(const std::string &name, void *deflt=0) const
Get pointer argument from the command.
 
static void * FindSymbol(const std::string &symbol)
 
static bool LoadLibrary(const std::string &fname)
 
Represents objects hierarchy of remote (or local) DABC process.
 
Hierarchy FindChild(const char *name)
Return child element from hierarchy.
 
Hierarchy CreateHChild(const std::string &name, bool allowslahes=false, bool sortorder=false)
Create child item in hierarchy with specified name If allowslahes enabled, instead of subfolders item...
 
void Create(const std::string &name, bool withmutex=false)
Create top-level object with specified name.
 
Iterator over objects hierarchy
 
const char * name() const
 
Object * next(bool goinside=true)
 
ConnectionRequest Connect(const std::string &port1, const std::string &port2)
Request connection between two ports.
 
ModuleRef FindModule(const std::string &name)
 
Reference on dabc::Module class
 
std::string InputName(unsigned n=0, bool itemname=true)
Return item name of the input, can be used in connect command.
 
bool SetPortSignaling(const std::string &name, Port::EventsProducing signal)
 
virtual Parameter CreatePar(const std::string &name, const std::string &kind="")
 
void SetAutoStop(bool on=true)
If set, module will be automatically stopped when all i/o ports are disconnected.
 
std::string OutputName(unsigned indx=0, bool fullname=false) const
 
virtual void OnThreadAssigned()
 
std::string InputName(unsigned indx=0, bool fullname=false) const
 
unsigned NumOutputs() const
 
virtual int ExecuteCommand(Command cmd)
Main method where commands are executed.
 
unsigned CreateTimer(const std::string &name, double period_sec=-1., bool synchron=false)
 
void EnsurePorts(unsigned numinp=0, unsigned numout=0, const std::string &poolname="")
Method ensure that at least specified number of input and output ports will be created.
 
const char * GetName() const
Returns name of the object, thread safe
 
Parameter & SetUnits(const std::string &unit)
Set units field of parameter.
 
Parameter & SetRatemeter(bool synchron=false, double interval=1.0)
Converts parameter in ratemeter - all values will be summed up and divided on specified interval.
 
int64_t GetArraySize() const
 
double * GetDoubleArr() const
 
std::string AsStr(const std::string &dflt="") const
 
int64_t AsInt(int64_t dflt=0) const
 
double AsDouble(double dflt=0.) const
 
RecordField GetField(const std::string &name) const
 
RecordField * GetFieldPtr(const std::string &name) const
 
bool HasField(const std::string &name) const
 
bool SetField(const std::string &name, const RecordField &v)
 
Reference on the arbitrary object
 
bool IsName(const char *name) const
Returns true if object name is the same as specified one.
 
Uniform Resource Locator interpreter.
 
std::string GetFullName() const
 
int GetOptionInt(const std::string &optname, int dflt=0) const
 
bool Execute(Command cmd, double tmout=-1.)
 
Hierarchy fWorkerHierarchy
place for publishing of worker parameters
 
RecordField Cfg(const std::string &name, Command cmd=nullptr) const
Returns configuration field of specified name Configuration value of specified name searched in follo...
 
virtual bool Publish(const Hierarchy &h, const std::string &path)
 
Read iterator for HADAQ events/subevents.
 
bool NextEvent()
Used for ready HLD events.
 
unsigned evntsize() const
 
hadaq::RawEvent * evnt() const
 
Read iterator for MBS events/subevents.
 
unsigned rawdatasize() const
 
void SetDefaultFill(int fillcol=3)
 
void * fInitFunc
how many parallel processes to start
 
virtual bool ProcessRecv(unsigned port)
Method called by framework when at least one buffer available in input port.
 
void GenerateEOF(dabc::Buffer buf)
 
virtual void AfterModuleStop()
 
virtual void BeforeModuleStart()
 
virtual int ExecuteCommand(dabc::Command cmd)
Main method where commands are executed.
 
virtual void ProcessTimerEvent(unsigned)
Method called by framework when timer event is produced.
 
bool ProcessNextEvent(void *evnt, unsigned evntsize)
 
std::string fFileUrl
! configured file URL - module used to produce output
 
virtual void OnThreadAssigned()
 
bool RedistributeBuffers()
 
int fDefaultFill
! default fill color for 1-D histograms
 
RunModule(const std::string &name, dabc::Command cmd=nullptr)
 
void SaveHierarchy(dabc::Buffer buf)
 
void ProduceMergedHierarchy()
 
static void SetTRBStatus(dabc::Hierarchy &item, dabc::Hierarchy &logitem, hadaq::TrbProcessor *trb, bool change_progress=true, int *res_progress=nullptr, double *res_quality=nullptr, std::string *res_state=nullptr, std::vector< std::string > *res_msgs=nullptr, bool acknowledge_quality=false)
 
int main(int numc, char *args[])
 
std::string format(const char *fmt,...)