DABC (Data Acquisition Backbone Core)  2.9.9
RunModule.cxx
Go to the documentation of this file.
1 // $Id: RunModule.cxx 4772 2021-05-04 08:27:16Z linev $
2 
3 /************************************************************
4  * The Data Acquisition Backbone Core (DABC) *
5  ************************************************************
6  * Copyright (C) 2009 - *
7  * GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
8  * Planckstr. 1, 64291 Darmstadt, Germany *
9  * Contact: http://dabc.gsi.de *
10  ************************************************************
11  * This software can be used under the GPL license *
12  * agreements as stated in LICENSE.txt file *
13  * which is part of the distribution. *
14  ************************************************************/
15 
16 #include "stream/RunModule.h"
17 
18 #include "dabc/Manager.h"
19 #include "dabc/Factory.h"
20 #include "dabc/Iterator.h"
21 #include "dabc/Buffer.h"
22 #include "dabc/Publisher.h"
23 #include "dabc/Url.h"
24 #include "dabc/BinaryFile.h"
25 
27 
28 #include "hadaq/Iterator.h"
29 #include "mbs/Iterator.h"
30 #include "hadaq/TdcProcessor.h"
31 #include "hadaq/TrbProcessor.h"
32 #include "hadaq/HldProcessor.h"
33 
34 #include <cstdlib>
35 
36 #include "base/Buffer.h"
37 #include "base/StreamProc.h"
38 #include "stream/DabcProcMgr.h"
39 
40 // ==================================================================================
41 
// Constructs the module from its configuration and the creation command.
//
// Reads "parallel" (default 0) and "fillcolor" (default 3) via Cfg(), and the
// "initfunc" pointer and "fileurl" string from the command. The value of
// fParallel selects what is set up below:
//   > 0  : one output port per parallel worker and a "KeepAlive" timer;
//   == 0 : no outputs, "Update" timer, status/control items in the hierarchy;
//   < 0  : no outputs, auto-stop disabled, "Update" timer.
// When fParallel >= 0 and no init function was passed in, stream_engine.cpp is
// compiled into librunstream.so with the system compiler, the library is
// loaded and the "stream_engine" symbol is looked up.
//
// NOTE(review): this listing appears to have lost several source lines - the
// loop inside `if (fParallel > 0)` has no body and the listing numbers skip
// inside each error branch - consult the original file before changing logic.
42 stream::RunModule::RunModule(const std::string &name, dabc::Command cmd) :
43  dabc::ModuleAsync(name, cmd),
44  fParallel(0),
45  fInitFunc(nullptr),
46  fStopMode(-1111),
47  fProcMgr(nullptr),
48  fAsf(),
49  fFileUrl(),
50  fDidMerge(false),
51  fTotalSize(0),
52  fTotalEvnts(0),
53  fTotalOutEvnts(0)
54 {
55  fParallel = Cfg("parallel", cmd).AsInt(0);
56 
57  fDefaultFill = Cfg("fillcolor", cmd).AsInt(3);
58 
59  // one input; one output per parallel worker, no outputs when fParallel <= 0
60  EnsurePorts(1, fParallel<0 ? 0 : fParallel);
61 
62  if (fParallel > 0) {
64  for (unsigned n=0;n<NumOutputs();++n)
66  }
67 
68  fInitFunc = cmd.GetPtr("initfunc");
69 
70  fFileUrl = cmd.GetStr("fileurl");
71 
72 
73  if ((fParallel>=0) && (fInitFunc==0)) {
74  // first generate and load init func
75 
   // NOTE(review): the clamp happens after EnsurePorts() already used the
   // unclamped value; presumably 999 matches the three-digit worker suffix
   // ("%s%03d") used when worker modules are named - confirm.
76  if (fParallel>999) fParallel=999;
77 
78  // ensure that all histos on all branches present
79  hadaq::TdcProcessor::SetAllHistos(true);
80 
81  const char *dabcsys = getenv("DABCSYS");
82  const char *streamsys = getenv("STREAMSYS");
83  if (!dabcsys) {
84  EOUT("DABCSYS variable not set, cannot run stream framework");
86  return;
87  }
88 
89  if (!streamsys) {
90  EOUT("STREAMSYS variable not set, cannot run stream framework");
92  return;
93  }
94 
   // the autotdc include path is added only when "use_autotdc" is set and
   // `ls first.C` fails, i.e. no first.C is found in the working directory
95  std::string extra_include;
96  if (cmd.GetBool("use_autotdc"))
97  if (system("ls first.C >/dev/null 2>/dev/null") != 0)
98  extra_include = dabc::format("-I%s/applications/autotdc", streamsys);
99 
   // when second.C exists, -D_SECOND_ is passed to the compiler
100  bool second = system("ls second.C >/dev/null 2>/dev/null") == 0;
101 
102 #if defined(__MACH__) /* Apple OSX section */
103  const char *compiler = "clang++";
104  const char *ldflags = "";
105 #else
106  const char *compiler = "g++";
107  const char *ldflags = "-Wl,--no-as-needed";
108 #endif
109 
   // NOTE(review): the command line is assembled from environment variables
   // and executed through system(); paths containing spaces or shell
   // metacharacters are not quoted.
110  std::string exec = dabc::format("%s %s/plugins/stream/src/stream_engine.cpp -O2 -fPIC -Wall -std=c++11 -I. -I%s/include %s %s"
111  "-shared -Wl,-soname,librunstream.so %s -Wl,-rpath,%s/lib -Wl,-rpath,%s/lib -o librunstream.so",
112  compiler, dabcsys, streamsys, extra_include.c_str(),
113  (second ? "-D_SECOND_ " : ""), ldflags, dabcsys, streamsys);
114 
   // remove any previously built library before compiling
115  system("rm -f ./librunstream.so");
116 
117  DOUT0("Executing %s", exec.c_str());
118 
119  int res = system(exec.c_str());
120 
121  if (res!=0) {
122  EOUT("Fail to compile first.C/second.C scripts. Abort");
124  return;
125  }
126 
127  if (!dabc::Factory::LoadLibrary("librunstream.so")) {
128  EOUT("Fail to load generated librunstream.so library");
130  return;
131  }
132 
133  fInitFunc = dabc::Factory::FindSymbol("stream_engine");
134  if (fInitFunc==0) {
135  EOUT("Fail to find stream_engine function in librunstream.so library");
137  return;
138  }
139  }
140 
141 
142  if (fParallel>=0) {
143  fAsf = Cfg("asf",cmd).AsStr();
144  // when no "asf" file is configured, or in parallel mode, auto-stop is disabled
145  if ((fAsf.length()==0) || (fParallel>0)) SetAutoStop(false);
146  CreatePar("Events").SetRatemeter(false, 3.).SetUnits("Ev");
147  } else {
148  SetAutoStop(false);
149  }
150 
151  fWorkerHierarchy.Create("Worker");
152 
153  if (fParallel<=0) {
154  CreateTimer("Update", 1.);
155 
156  fWorkerHierarchy.CreateHChild("Status");
157  fWorkerHierarchy.SetField("_player", "DABC.StreamControl");
158 
159  CreatePar("EventsRate").SetRatemeter(false, 3.).SetUnits("Ev");
160  CreatePar("DataRate").SetRatemeter(false, 3.).SetUnits("MB");
161 
   // command definitions published under Control/, handled in ExecuteCommand()
162  dabc::CommandDefinition cmddef = fWorkerHierarchy.CreateHChild("Control/StartRootFile");
163  cmddef.SetField(dabc::prop_kind, "DABC.Command");
164  // cmddef.SetField(dabc::prop_auth, true); // require authentication
165  cmddef.AddArg("fname", "string", true, "file.root");
166  cmddef.AddArg("kind", "int", false, "2");
167  cmddef.AddArg("maxsize", "int", false, "1900");
168 
169  cmddef = fWorkerHierarchy.CreateHChild("Control/StopRootFile");
170  cmddef.SetField(dabc::prop_kind, "DABC.Command");
171  // cmddef.SetField(dabc::prop_auth, true); // require authentication
172  } else {
173  CreateTimer("KeepAlive", 0.1);
174  }
175 
176  Publish(fWorkerHierarchy, dabc::format("$CONTEXT$/%s", GetName()));
177 
   // the "AutoSave" timer is created only for configured intervals above 1
178  double interval = Cfg("AutoSave", cmd).AsDouble(0);
179  if (interval > 1) CreateTimer("AutoSave", interval);
180 }
181 
183 {
184  if (fProcMgr) {
185  delete fProcMgr;
186  fProcMgr = nullptr;
187  }
188 }
189 
// Function type of the init entry point: takes no arguments and returns an
// untyped pointer. The module casts its stored init-function pointer to
// `entryfunc*` before calling it.
using entryfunc = void* ();
191 
192 
// Body of OnThreadAssigned().
// NOTE(review): the definition header is not present in this listing; the
// cross-reference index gives `virtual void OnThreadAssigned()` at
// RunModule.cxx:193. Some interior lines are missing as well (for example `m`
// is used below without a visible declaration).
//
// With an init function and fParallel <= 0 the module creates its own
// DabcProcMgr, runs the init function and applies the options of fFileUrl.
// With an init function and fParallel > 0 it creates fParallel worker modules
// named "<own name>NNN", connects output n to input 0 of worker n and starts
// each worker.
194 {
196 
197  if ((fInitFunc!=nullptr) && (fParallel<=0)) {
198 
199  entryfunc* func = (entryfunc*) fInitFunc;
200 
201  fProcMgr = new DabcProcMgr;
202  fProcMgr->SetDefaultFill(fDefaultFill);
203  fProcMgr->SetTop(fWorkerHierarchy, fParallel==0);
204 
   // run the init function; its return value is ignored
205  func();
206 
207  if (fFileUrl.length() > 0) {
208  dabc::Url url(fFileUrl);
209 
210  std::string fname = url.GetFullName();
211 
   // NOTE(review): for names shorter than 5 characters `length() - 5`
   // wraps around; with a 4-character name it equals npos, which is also
   // what rfind() returns when ".root" is absent, so the test passes.
212  if (fname.rfind(".root") == fname.length() - 5) {
213  fProcMgr->SetTriggeredAnalysis(true);
214  int kind = url.GetOptionInt("kind", -1);
215  if (kind!=-1) fProcMgr->SetStoreKind(kind);
216  if (!fProcMgr->CreateStore(fFileUrl.c_str()))
217  EOUT("Fail to create store %s - check if libDabcRoot.so plugin in the xml file", fFileUrl.c_str());
218  }
219 
   // -111 is the "option not given" marker for both URL options below
220  int hlevel = url.GetOptionInt("hlevel", -111);
221  if (hlevel != -111) fProcMgr->SetHistFilling(hlevel);
222 
223  int hldfilter = url.GetOptionInt("hldfilter", -111);
   // NOTE(review): the created object is not stored here; presumably it
   // registers itself with the current processors manager - confirm.
224  if (hldfilter>=0) new hadaq::HldFilter(hldfilter);
225  }
226 
227  // remove pointer, let other modules to create and use it
228  base::ProcMgr::ClearInstancePointer();
229 
230  if (fProcMgr->IsStreamAnalysis()) {
231  EOUT("Stream analysis kind is not supported in DABC engine");
233  }
234  } else
235  if ((fParallel>0) && (fInitFunc!=nullptr)) {
236  for (int n=0;n<fParallel;n++) {
   // worker name is the own name plus a three-digit index
237  std::string mname = dabc::format("%s%03d", GetName(), n);
238  dabc::CmdCreateModule cmd("stream::RunModule", mname);
   // workers reuse the already loaded init function and run with parallel = -1
239  cmd.SetPtr("initfunc", fInitFunc);
240  cmd.SetInt("parallel", -1);
241 
242  DOUT0("Create module %s", mname.c_str());
243 
244  dabc::mgr.Execute(cmd);
245 
247 
248  DOUT0("Connect %s ->%s", OutputName(n,true).c_str(), m.InputName(0).c_str());
249  dabc::Reference r = dabc::mgr.Connect(OutputName(n,true), m.InputName(0));
250  DOUT0("Connect output %u connected %s", n, DBOOL(IsOutputConnected(n)));
251 
252  m.Start();
253  }
254  }
255 
256  DOUT0("!!!! Assigned to thread %s !!!!!", thread().GetName());
257 }
258 
// Body of ProduceMergedHierarchy().
// NOTE(review): the definition header is not present in this listing; the
// cross-reference index gives `void ProduceMergedHierarchy()` at
// RunModule.cxx:259. The declarations of `main` and `m` used below are also
// missing from the listing.
//
// Runs at most once (guarded by fDidMerge) and only when fParallel > 0.
// Stops every worker module, requests its hierarchy with a "GetHierarchy"
// command and adds the histogram bins of each further worker onto the
// hierarchy taken from the first one. The merged result is saved only when
// fAsf is not empty.
260 {
261  if (fDidMerge || (fParallel<=0)) return;
262 
263  fDidMerge = true;
264 
265  dabc::PublisherRef publ = GetPublisher();
266 
268  int nhist = 0;
269 
270  DOUT0("Can now merge histograms");
271  for (int n=0;n<fParallel;n++) {
272 
273  std::string mname = dabc::format("%s%03d", GetName(), n);
274 
276 
277  m.Stop();
278 
279  dabc::Command cmd("GetHierarchy");
280  m.Execute(cmd);
281 
282  dabc::Hierarchy h = cmd.GetRef("hierarchy");
283 
   // the first hierarchy received becomes the merge target
284  if (main.null()) {
285  DOUT0("Adopt histograms from %s", mname.c_str());
286  main = h;
287  continue;
288  }
289 
   // walk both hierarchies in lock-step; any difference in item count or
   // item name is treated as a mismatch
290  dabc::Iterator iter1(main), iter2(h);
291  bool miss = false;
292  while (iter1.next()) {
293  if (!iter2.next()) { miss = true; break; }
294  if (strcmp(iter1.name(),iter2.name())!=0) { miss = true; break; }
295 
296  // merge histograms till the end
297  dabc::Hierarchy item1 = iter1.ref();
298  dabc::Hierarchy item2 = iter2.ref();
299 
   // only items flagged "_dabc_hist" whose "bins" arrays have equal size are summed
300  if (item1.HasField("_dabc_hist") && item2.HasField("_dabc_hist") &&
301  (item1.GetFieldPtr("bins")!=0) && (item2.GetFieldPtr("bins")!=0) &&
302  (item1.GetFieldPtr("bins")->GetArraySize() == item2.GetFieldPtr("bins")->GetArraySize())) {
303  double* arr1 = item1.GetFieldPtr("bins")->GetDoubleArr();
304  double* arr2 = item2.GetFieldPtr("bins")->GetDoubleArr();
   // summation starts at index 3 for "ROOT.TH1D" and at index 6 otherwise
   // (pre-increment in the loop below); presumably the leading entries
   // hold axis description rather than bin content - TODO confirm
305  int indx = item1.GetField("_kind").AsStr()=="ROOT.TH1D" ? 2 : 5;
306  int len = item1.GetFieldPtr("bins")->GetArraySize();
   // histograms are counted once, while merging the second worker
307  if (n==1) nhist++;
308 
309  if (arr1 && arr2)
310  while (++indx<len) arr1[indx]+=arr2[indx];
311  }
312  }
313 
314  if (miss) {
315  EOUT("!!!!!!!!!!!!!! MISMATCH - CANNOT MERGE HISTOGRAMS !!!!!!!!!!!!");
317  return;
318  } else {
319  DOUT0("Merged %d histograms from %s", nhist, mname.c_str());
320  }
321  }
322 
323  if (fAsf.length()>0)
324  SaveHierarchy(main.SaveToBuffer());
325 }
326 
327 
// Body of ExecuteCommand().
// NOTE(review): the definition header is not present in this listing; the
// cross-reference index gives `virtual int ExecuteCommand(dabc::Command cmd)`
// at RunModule.cxx:328. The statement that follows the last `if` (listing
// line 376) is missing as well, so the fall-through result is not visible.
//
// Handling order: commands accepted by the processors manager, hierarchy
// commands "Control/StartRootFile" and "Control/StopRootFile", then
// "SlaveFinished" and "GetHierarchy".
329 {
330  if (fProcMgr && fProcMgr->ExecuteHCommand(cmd)) {
331  if (fProcMgr->IsWorking()) ActivateInput(); // when working set, just ensure that module reads input
332  return dabc::cmd_true;
333  }
334 
335  if (cmd.IsName(dabc::CmdHierarchyExec::CmdName())) {
336  std::string cmdpath = cmd.GetStr("Item");
337  DOUT0("Execute command %s", cmdpath.c_str());
338 
339  if (cmdpath == "Control/StartRootFile") {
340  std::string fname = cmd.GetStr("fname","file.root");
341  int kind = cmd.GetInt("kind", 2);
342  int maxsize = cmd.GetInt("maxsize", 1900);
   // maxsize is passed on as an option appended to the file name
343  fname += dabc::format("?maxsize=%d", maxsize);
   // cmd_true is returned even when there is no processors manager or
   // CreateStore() fails
344  if (fProcMgr)
345  if (fProcMgr->CreateStore(fname.c_str())) {
346  // only in triggered mode storing is allowed
347  if (fProcMgr->IsRawAnalysis()) fProcMgr->SetTriggeredAnalysis(true);
348  fProcMgr->SetStoreKind(kind);
349  fProcMgr->UserPreLoop(0, true);
350 
351  }
352  return dabc::cmd_true;
353 
354  } else
355  if (cmdpath == "Control/StopRootFile") {
356  if (fProcMgr) fProcMgr->CloseStore();
357 
358  return dabc::cmd_true;
359  } else
360  return dabc::cmd_false;
361  }
362 
   // each "SlaveFinished" decrements fStopMode; when it reaches zero the
   // histograms are merged and this module stops itself
363  if (cmd.IsName("SlaveFinished")) {
364  if (--fStopMode == 0) {
365  ProduceMergedHierarchy();
366  DOUT0("Stop ourself");
367  Stop();
368  }
369  return dabc::cmd_true;
370  } else
371  if (cmd.IsName("GetHierarchy")) {
   // hand the own hierarchy back to the caller through the command
372  cmd.SetRef("hierarchy", fWorkerHierarchy);
373  return dabc::cmd_true;
374  }
375 
377 }
378 
380 {
381  DOUT0("START STREAM MODULE %s inp %s", GetName(), DBOOL(IsInputConnected(0)));
382 
383  if (fProcMgr) fProcMgr->UserPreLoop();
384 }
385 
// Body of SaveHierarchy().
// NOTE(review): the definition header is not present in this listing; the
// cross-reference index gives `void SaveHierarchy(dabc::Buffer buf)` at
// RunModule.cxx:386. The declaration of `f` (listing line 392) is missing too.
//
// Writes the buffer into a temporary file h.bin in the working directory and
// runs the external `dabc_root` tool to convert it into the file named by
// fAsf. h.bin is removed only when the conversion succeeds.
387 {
   // nothing to store for an empty buffer
388  if (buf.GetTotalSize()==0) return;
389 
390  DOUT0("store hierarchy size %d in temporary h.bin file", buf.GetTotalSize());
391  {
393  system("rm -f h.bin");
394  if (f.OpenWriting("h.bin")) {
   // header first, then every buffer segment as payload
395  if (f.WriteBufHeader(buf.GetTotalSize(), buf.GetTypeId()))
396  for (unsigned n=0;n<buf.NumSegments();n++)
397  f.WriteBufPayload(buf.SegmentPtr(n), buf.SegmentSize(n));
398  f.Close();
399  }
400  }
401 
   // NOTE(review): fAsf is appended to the shell command unquoted
402  std::string args("dabc_root -skip-zero -h h.bin -o ");
403  args += fAsf;
404 
405  DOUT0("Calling: %s", args.c_str());
406 
407  int res = system(args.c_str());
408 
   // on failure h.bin is kept so it can be inspected
409  if (res!=0) EOUT("Fail to convert DABC histograms in ROOT file, check h.bin file");
410  else system("rm -f h.bin");
411 }
412 
414 {
415  if (fProcMgr) fProcMgr->UserPostLoop();
416 
417  // DOUT0("!!!! thread on start %s !!!!!", thread().GetName());
418 
419  DOUT0("STOP STREAM MODULE %s data %lu evnts %lu outevents %lu %s", GetName(), fTotalSize, fTotalEvnts, fTotalOutEvnts, (fTotalEvnts == fTotalOutEvnts ? "ok" : "MISSMATCH"));
420 
421  if (fParallel > 0) {
422  ProduceMergedHierarchy();
423  } else if (fAsf.length()>0) {
424  SaveHierarchy(fWorkerHierarchy.SaveToBuffer());
425  }
426 
427  DestroyPar("Events");
428 }
429 
430 bool stream::RunModule::ProcessNextEvent(void* evnt, unsigned evntsize)
431 {
432  if (fProcMgr==0) return false;
433 
434  fTotalEvnts++;
435 
436  if (fParallel==0) Par("Events").SetValue(1);
437 
438  // TODO - later we need to use DABC buffer here to allow more complex
439  // analysis when many dabc buffers required at the same time to analyze data
440 
441  base::Buffer bbuf;
442 
443  bbuf.makereferenceof(evnt, evntsize);
444 
445  bbuf().kind = base::proc_TRBEvent;
446  bbuf().boardid = 0;
447  bbuf().format = 0;
448 
449  fProcMgr->ProvideRawData(bbuf);
450 
451  base::Event* outevent = 0;
452 
453  // scan new data
454  bool new_event = fProcMgr->AnalyzeNewData(outevent);
455 
456  while (new_event) {
457  fTotalOutEvnts++;
458 
459  fProcMgr->ProcessEvent(outevent);
460 
461  new_event = fProcMgr->ProduceNextEvent(outevent);
462  }
463 
464  delete outevent;
465 
466  return true;
467 }
468 
469 
471 {
472  if (fProcMgr && !fProcMgr->IsWorking()) return false;
473 
474  dabc::Buffer buf = Recv();
475 
476  if (fParallel==0) Par("DataRate").SetValue(buf.GetTotalSize()/1024./1024.);
477 
478  if (buf.GetTypeId() == dabc::mbt_EOF) {
479  if (fParallel<0) {
480  std::string main = GetName();
481  main.resize(main.length()-3);
482  dabc::mgr.FindModule(main).Submit(dabc::Command("SlaveFinished"));
483  }
484  return true;
485  }
486 
487  fTotalSize += buf.GetTotalSize();
488 
489  if (buf.GetTypeId() == mbs::mbt_MbsEvents) {
490  mbs::ReadIterator iter(buf);
491  while (iter.NextEvent()) {
492  if (iter.NextSubEvent())
493  ProcessNextEvent(iter.rawdata(), iter.rawdatasize());
494  }
495  } else {
496  hadaq::ReadIterator iter(buf);
497  while (iter.NextEvent()) {
498  ProcessNextEvent(iter.evnt(), iter.evntsize());
499  }
500  }
501 
502  return true;
503 }
504 
506 {
507  if ((fStopMode != -1111) || (fParallel <= 0)) return;
508 
509  DOUT0("Inject EOF to finish parallel jobs");
510 
511  SendToAllOutputs(buf);
512 
513  fStopMode = fParallel;
514  SendToAllOutputs(buf);
515 }
516 
518 {
519  while (CanRecv()) {
520 
521  unsigned indx(0), max(0), min(10);
522  for (unsigned n=0;n<NumOutputs();n++) {
523  unsigned cansend = NumCanSend(n);
524  if (cansend > max) { max = cansend; indx = n; }
525  if (cansend < min) min = cansend;
526  }
527 
528  // one need at least one output to be able send something
529  if (max==0) return false;
530 
531  // in case of EOF one need that all outputs can accept at least one buffer
532  if ((RecvQueueItem().GetTypeId() == dabc::mbt_EOF) && (min == 0))
533  return false;
534 
535  dabc::Buffer buf = Recv();
536 
537  if (buf.GetTypeId() == dabc::mbt_EOF) {
538  GenerateEOF(buf);
539  return false;
540  }
541 
542  fTotalSize += buf.GetTotalSize();
543 
544  int cnt = 0;
545 
546  if (buf.GetTypeId() == mbs::mbt_MbsEvents) {
547  mbs::ReadIterator iter(buf);
548  while (iter.NextEvent()) cnt++;
549  } else {
550  hadaq::ReadIterator iter(buf);
551  while (iter.NextEvent()) cnt++;
552  }
553 
554  fTotalEvnts+=cnt;
555 
556  Par("Events").SetValue(cnt);
557 
558  // DOUT0("Send buffer to output %d\n", indx);
559 
560  Send(indx, buf);
561  }
562 
563  // all possible buffers are processed, no reason to invoke method once again
564  return false;
565 }
566 
568 {
569  if (fParallel<=0)
570  return ProcessNextBuffer();
571 
572  return RedistributeBuffers();
573 }
574 
// Body of ProcessTimerEvent().
// NOTE(review): the definition header is not present in this listing; the
// cross-reference index gives `virtual void ProcessTimerEvent(unsigned)` at
// RunModule.cxx:575, and the body names the argument `timer`. Listing line
// 592 (between TakeBuffer() and GenerateEOF()) is missing as well.
//
// "AutoSave"  - saves all histograms of the processors manager.
// "KeepAlive" - redistributes pending buffers; once events were seen and the
//               input is no longer connected, triggers GenerateEOF().
// any other   - refreshes the "Status" folder of the worker hierarchy.
576 {
577  if (TimerName(timer) == "AutoSave") {
578  if (fProcMgr) fProcMgr->SaveAllHistograms();
579  return;
580  }
581 
582  if (TimerName(timer) == "KeepAlive") {
583  // std::string s = dabc::format("numcanrecv %u isconnected %s cansend", NumCanRecv(), DBOOL(IsPortConnected(InputName())));
584  // for (unsigned n=0;n<NumOutputs();n++)
585  // s.append(dabc::format(" %u:%u", n, NumCanSend(n)));
586  // DOUT0("keep alive %s", s.c_str());
587 
588  RedistributeBuffers();
589 
590  if ((fTotalEvnts > 0) && !IsPortConnected(InputName())) {
591  dabc::Buffer buf = TakeBuffer();
593  GenerateEOF(buf);
594  }
595 
596  return;
597  }
598 
599 
   // NOTE(review): fProcMgr is dereferenced here without a null check,
   // unlike the "AutoSave" branch above - confirm it cannot be null when
   // this timer fires.
600  hadaq::HldProcessor *hld = dynamic_cast<hadaq::HldProcessor*> (fProcMgr->FindProc("HLD"));
601  if (!hld) return;
602 
603  dabc::Hierarchy folder = fWorkerHierarchy.FindChild("Status");
604 
605  folder.SetField("EventsRate", Par("EventsRate").GetField("value").AsDouble());
606  folder.SetField("EventsCount", (int64_t) fTotalEvnts);
607  folder.SetField("StoreInfo", fProcMgr->GetStoreInfo());
608 
   // one status item per TRB processor known to the HLD processor
609  for (unsigned n=0;n<hld->NumberOfTRB();n++) {
610  hadaq::TrbProcessor* trb = hld->GetTRB(n);
611  if (!trb) continue;
612 
613  dabc::Hierarchy item = folder.CreateHChild(trb->GetName());
614 
   // logitem is passed as an empty hierarchy
615  dabc::Hierarchy logitem;
616 
617  TdcCalibrationModule::SetTRBStatus(item, logitem, trb);
618  }
619 
620 }
621 
void * entryfunc()
Definition: RunModule.cxx:190
Generic file storage for DABC buffers.
Definition: BinaryFile.h:181
bool WriteBufPayload(const void *ptr, uint64_t sz)
Definition: BinaryFile.h:288
bool OpenWriting(const char *fname)
Definition: BinaryFile.h:227
bool WriteBufHeader(uint64_t size, uint64_t typ=0)
Definition: BinaryFile.h:266
Reference on memory from memory pool.
Definition: Buffer.h:135
unsigned NumSegments() const
Returns number of segment in buffer.
Definition: Buffer.h:163
unsigned SegmentSize(unsigned n=0) const
Returns size on the segment, no any boundary checks.
Definition: Buffer.h:174
unsigned GetTypeId() const
Definition: Buffer.h:152
BufferSize_t GetTotalSize() const
Return total size of all buffer segments.
Definition: Buffer.cxx:91
void SetTypeId(unsigned tid)
Definition: Buffer.h:151
void * SegmentPtr(unsigned n=0) const
Returns pointer on the segment, no any boundary checks.
Definition: Buffer.h:171
Command definition class.
Definition: Parameter.h:333
CommandDefinition & AddArg(const std::string &name, const std::string &kind="string", bool required=true, const RecordField &dflt=RecordField())
Definition: Parameter.cxx:593
Represents command with its arguments.
Definition: Command.h:99
void SetPtr(const std::string &name, void *p)
Set pointer argument for the command.
Definition: Command.cxx:151
bool SetInt(const std::string &name, int v)
Definition: Command.h:138
std::string GetStr(const std::string &name, const std::string &dflt="") const
Definition: Command.h:136
bool GetBool(const std::string &name, bool dflt=false) const
Definition: Command.h:142
Reference GetRef(const std::string &name)
Returns reference from the command, can be called only once.
Definition: Command.cxx:175
int GetInt(const std::string &name, int dflt=0) const
Definition: Command.h:139
bool SetRef(const std::string &name, Reference ref)
Set reference to the command.
Definition: Command.cxx:168
void * GetPtr(const std::string &name, void *deflt=0) const
Get pointer argument from the command.
Definition: Command.cxx:158
static void * FindSymbol(const std::string &symbol)
Definition: Factory.cxx:49
static bool LoadLibrary(const std::string &fname)
Definition: Factory.cxx:26
Represents objects hierarchy of remote (or local) DABC process.
Definition: Hierarchy.h:285
Hierarchy FindChild(const char *name)
Return child element from hierarchy.
Definition: Hierarchy.h:362
Hierarchy CreateHChild(const std::string &name, bool allowslahes=false, bool sortorder=false)
Create child item in hierarchy with specified name If allowslahes enabled, instead of subfolders item...
Definition: Hierarchy.h:392
void Create(const std::string &name, bool withmutex=false)
Create top-level object with specified name.
Definition: Hierarchy.cxx:934
Iterator over objects hierarchy
Definition: Iterator.h:36
const char * name() const
Definition: Iterator.h:57
Object * next(bool goinside=true)
Definition: Iterator.cxx:44
Reference ref() const
Definition: Iterator.h:50
void StopApplication()
Definition: Manager.cxx:2231
ConnectionRequest Connect(const std::string &port1, const std::string &port2)
Request connection between two ports.
Definition: Manager.cxx:2138
ModuleRef FindModule(const std::string &name)
Definition: Manager.cxx:2018
Reference on dabc::Module class
Definition: Module.h:275
bool Stop()
Definition: Module.h:289
bool Start()
Definition: Module.h:287
std::string InputName(unsigned n=0, bool itemname=true)
Return item name of the input, can be used in connect command.
Definition: Module.cxx:1056
bool SetPortSignaling(const std::string &name, Port::EventsProducing signal)
Definition: Module.cxx:791
virtual Parameter CreatePar(const std::string &name, const std::string &kind="")
Definition: Module.cxx:129
void SetAutoStop(bool on=true)
If set, module will be automatically stopped when all i/o ports are disconnected.
Definition: Module.h:115
std::string OutputName(unsigned indx=0, bool fullname=false) const
Definition: Module.cxx:181
virtual void OnThreadAssigned()
Definition: Module.cxx:79
std::string InputName(unsigned indx=0, bool fullname=false) const
Definition: Module.cxx:188
unsigned NumOutputs() const
Definition: Module.h:141
virtual int ExecuteCommand(Command cmd)
Main method where commands are executed.
Definition: Module.h:232
unsigned CreateTimer(const std::string &name, double period_sec=-1., bool synchron=false)
Definition: Module.cxx:109
void EnsurePorts(unsigned numinp=0, unsigned numout=0, const std::string &poolname="")
Method ensure that at least specified number of input and output ports will be created.
Definition: Module.cxx:66
const char * GetName() const
Returns name of the object, thread safe
Definition: Object.h:295
Parameter & SetUnits(const std::string &unit)
Set units field of parameter.
Definition: Parameter.h:256
Parameter & SetRatemeter(bool synchron=false, double interval=1.0)
Converts parameter in ratemeter - all values will be summed up and divided on specified interval.
Definition: Parameter.cxx:365
@ SignalEvery
Definition: Port.h:63
int64_t GetArraySize() const
Definition: Record.h:317
double * GetDoubleArr() const
Definition: Record.h:329
std::string AsStr(const std::string &dflt="") const
Definition: Record.cxx:749
int64_t AsInt(int64_t dflt=0) const
Definition: Record.cxx:501
double AsDouble(double dflt=0.) const
Definition: Record.cxx:549
RecordField GetField(const std::string &name) const
Definition: Record.h:510
RecordField * GetFieldPtr(const std::string &name) const
Definition: Record.h:513
bool HasField(const std::string &name) const
Definition: Record.h:498
bool SetField(const std::string &name, const RecordField &v)
Definition: Record.h:516
Reference on the arbitrary object
Definition: Reference.h:73
bool IsName(const char *name) const
Returns true if object name is the same as specified one.
Definition: Reference.cxx:177
Uniform Resource Locator interpreter.
Definition: Url.h:33
std::string GetFullName() const
Definition: Url.cxx:124
int GetOptionInt(const std::string &optname, int dflt=0) const
Definition: Url.cxx:290
bool Execute(Command cmd, double tmout=-1.)
Definition: Worker.cxx:1147
bool Submit(Command cmd)
Definition: Worker.cxx:1139
Hierarchy fWorkerHierarchy
place for publishing of worker parameters
Definition: Worker.h:168
RecordField Cfg(const std::string &name, Command cmd=nullptr) const
Returns configuration field of specified name Configuration value of specified name searched in follo...
Definition: Worker.cxx:521
virtual bool Publish(const Hierarchy &h, const std::string &path)
Definition: Worker.cxx:1075
Read iterator for HADAQ events/subevents.
Definition: Iterator.h:39
bool NextEvent()
Used for ready HLD events.
Definition: Iterator.cxx:127
unsigned evntsize() const
Definition: Iterator.h:82
hadaq::RawEvent * evnt() const
Definition: Iterator.h:81
Read iterator for MBS events/subevents.
Definition: Iterator.h:40
unsigned rawdatasize() const
Definition: Iterator.h:88
void * rawdata() const
Definition: Iterator.h:87
void SetDefaultFill(int fillcol=3)
Definition: DabcProcMgr.h:51
void * fInitFunc
how many parallel processes to start
Definition: RunModule.h:40
virtual bool ProcessRecv(unsigned port)
Method called by framework when at least one buffer available in input port.
Definition: RunModule.cxx:567
void GenerateEOF(dabc::Buffer buf)
Definition: RunModule.cxx:505
virtual void AfterModuleStop()
Definition: RunModule.cxx:413
virtual void BeforeModuleStart()
Definition: RunModule.cxx:379
virtual int ExecuteCommand(dabc::Command cmd)
Main method where commands are executed.
Definition: RunModule.cxx:328
virtual void ProcessTimerEvent(unsigned)
Method called by framework when timer event is produced.
Definition: RunModule.cxx:575
bool ProcessNextEvent(void *evnt, unsigned evntsize)
Definition: RunModule.cxx:430
std::string fFileUrl
! configured file URL - module used to produce output
Definition: RunModule.h:44
virtual void OnThreadAssigned()
Definition: RunModule.cxx:193
bool RedistributeBuffers()
Definition: RunModule.cxx:517
int fDefaultFill
! default fill color for 1-D histograms
Definition: RunModule.h:49
virtual ~RunModule()
Definition: RunModule.cxx:182
bool ProcessNextBuffer()
Definition: RunModule.cxx:470
std::string fAsf
Definition: RunModule.h:43
RunModule(const std::string &name, dabc::Command cmd=nullptr)
Definition: RunModule.cxx:42
void SaveHierarchy(dabc::Buffer buf)
Definition: RunModule.cxx:386
void ProduceMergedHierarchy()
Definition: RunModule.cxx:259
static void SetTRBStatus(dabc::Hierarchy &item, dabc::Hierarchy &logitem, hadaq::TrbProcessor *trb, bool change_progress=true, int *res_progress=nullptr, double *res_quality=nullptr, std::string *res_state=nullptr, std::vector< std::string > *res_msgs=nullptr, bool acknowledge_quality=false)
int main(int numc, char *args[])
Definition: dabc_exe.cxx:68
#define DOUT0(args ...)
Definition: logging.h:156
#define EOUT(args ...)
Definition: logging.h:150
#define DBOOL(arg)
Definition: logging.h:191
Event manipulation API.
Definition: api.h:23
ManagerRef mgr
Definition: Manager.cxx:42
std::string format(const char *fmt,...)
Definition: string.cxx:49
const char * prop_kind
Definition: Hierarchy.cxx:29
@ mbt_EOF
Definition: Buffer.h:45
@ cmd_false
Definition: Command.h:37
@ cmd_true
Definition: Command.h:38
@ mbt_MbsEvents
Definition: MbsTypeDefs.h:348