DABC (Data Acquisition Backbone Core)  2.9.9
Module.cxx
Go to the documentation of this file.
1 // $Id: Module.cxx 4727 2021-03-13 18:14:44Z linev $
2 
3 /************************************************************
4  * The Data Acquisition Backbone Core (DABC) *
5  ************************************************************
6  * Copyright (C) 2009 - *
7  * GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
8  * Planckstr. 1, 64291 Darmstadt, Germany *
9  * Contact: http://dabc.gsi.de *
10  ************************************************************
11  * This software can be used under the GPL license *
12  * agreements as stated in LICENSE.txt file *
13  * which is part of the distribution. *
14  ************************************************************/
15 
16 #include "dabc/Module.h"
17 
18 #include "dabc/MemoryPool.h"
19 #include "dabc/Manager.h"
20 #include "dabc/Publisher.h"
21 
22 // __________________________________________________________________
23 
24 dabc::Module::Module(const std::string &name, Command cmd) :
25  Worker(MakePair(name.empty() ? cmd.GetStr("Name","module") : name)),
26  fRunState(false),
27  fInputs(),
28  fOutputs(),
29  fPools(),
30  fTimers(),
31  fSysTimerIndex((unsigned)-1),
32  fAutoStop(true),
33  fDfltPool(),
34  fInfoParName(),
35  fPublishPars()
36 {
37  std::string poolname = Cfg(dabc::xmlPoolName, cmd).AsStr();
38  int numinp = Cfg(dabc::xmlNumInputs, cmd).AsInt(0);
39  int numout = Cfg(dabc::xmlNumOutputs, cmd).AsInt(0);
40  fAutoStop = Cfg("autostop", cmd).AsBool(fAutoStop);
41  fPublishPars = Cfg("publish", cmd).AsStr();
42 
43  DOUT2("Create module %s with pool:%s numinp:%d numout:%d", GetName(), poolname.c_str(), numinp, numout);
44 
45  EnsurePorts(numinp, numout, poolname);
46 
47  // we will use 3 priority levels:
48  // 0 - normal for i/o ,
49  // 1 - for commands and replies,
50  // 2 - for sys commands (in modules thread itself)
51 
52 // CreateCmdDef("SetPriority").AddArg("Priority", "int", true);
53 //
54 // CreateCmdDef(CmdSetParameter::CmdName()).AddArg("ParName", "string", true).AddArg("ParValue", "string", true);
55 
56  DOUT3("++++++++++ dabc::Module::Module() %s done", GetName());
57 }
58 
60 {
61  DOUT3("dabc::Module::~Module() %s", GetName());
62 
63  if (fRunState) EOUT("Module %s destroyed in running state", GetName());
64 }
65 
66 void dabc::Module::EnsurePorts(unsigned numinp, unsigned numout, const std::string &poolname)
67 {
68  while (NumInputs() < numinp)
69  CreateInput(format("Input%u", NumInputs()));
70 
71  while (NumOutputs() < numout)
72  CreateOutput(format("Output%u", NumOutputs()));
73 
74  if (!poolname.empty() && (NumPools()==0) && ((NumInputs() + NumOutputs())>0))
75  CreatePoolHandle(poolname);
76 }
77 
78 
80 {
81  DOUT5("Module %s on thread assigned", GetName());
82 
83  if (!fPublishPars.empty())
84  PublishPars(fPublishPars);
85 
86  for (unsigned n=0;n<fItems.size();n++) {
87  ModuleItem* item = fItems[n];
88  if (item && !item->HasThread() && item->ItemNeedThread())
89  item->AssignToThread(thread(), true);
90  }
91 }
92 
93 std::string dabc::Module::TimerName(unsigned indx, bool fullname) const
94 {
95  if (indx >= fTimers.size()) return "";
96  if (fullname) return fTimers[indx]->ItemName();
97  return fTimers[indx]->GetName();
98 }
99 
100 unsigned dabc::Module::FindTimer(const std::string &name)
101 {
102  if (!name.empty())
103  for (unsigned n=0;n<fTimers.size();n++)
104  if (fTimers[n]->IsName(name.c_str())) return n;
105  return (unsigned) -1;
106 }
107 
108 
109 unsigned dabc::Module::CreateTimer(const std::string &name, double period_sec, bool synchron)
110 {
111  unsigned indx = FindTimer(name);
112  if (IsValidTimer(indx)) return indx;
113 
114  bool systimer = !IsValidTimer(fSysTimerIndex) && (name.find("Sys")==0);
115 
116  dabc::Timer* timer = new Timer(this, systimer, name, period_sec, synchron);
117 
118  AddModuleItem(timer);
119 
120  timer->SetItemSubId(fTimers.size());
121 
122  if (systimer) fSysTimerIndex = timer->ItemSubId();
123 
124  fTimers.push_back(timer);
125 
126  return timer->ItemSubId();
127 }
128 
129 dabc::Parameter dabc::Module::CreatePar(const std::string &name, const std::string &kind)
130 {
131  dabc::Parameter par = dabc::Worker::CreatePar(name, kind);
132 
133  if (kind == "info") {
134  std::string itemname = par.ItemName();
135  dabc::LockGuard lock(ObjectMutex());
136  if (fInfoParName.empty())
137  fInfoParName = itemname;
138  }
139 
140  return par;
141 }
142 
143 void dabc::Module::SetInfoParName(const std::string &parname)
144 {
145  dabc::LockGuard lock(ObjectMutex());
146  fInfoParName = parname;
147 }
148 
149 
150 std::string dabc::Module::GetInfoParName() const
151 {
152  dabc::LockGuard lock(ObjectMutex());
153  return fInfoParName;
154 }
155 
156 
157 unsigned dabc::Module::FindOutput(const std::string &name) const
158 {
159  if (!name.empty())
160  for (unsigned n=0;n<fOutputs.size();n++)
161  if (fOutputs[n]->IsName(name)) return n;
162  return (unsigned) -1;
163 }
164 
165 unsigned dabc::Module::FindInput(const std::string &name) const
166 {
167  if (!name.empty())
168  for (unsigned n=0;n<fInputs.size();n++)
169  if (fInputs[n]->IsName(name)) return n;
170  return (unsigned) -1;
171 }
172 
173 unsigned dabc::Module::FindPool(const std::string &name) const
174 {
175  if (!name.empty())
176  for (unsigned n=0;n<fPools.size();n++)
177  if (fPools[n]->IsName(name)) return n;
178  return (unsigned) -1;
179 }
180 
181 std::string dabc::Module::OutputName(unsigned indx, bool fullname) const
182 {
183  if (indx>=fOutputs.size()) return "";
184  if (fullname) return fOutputs[indx]->ItemName();
185  return fOutputs[indx]->GetName();
186 }
187 
188 std::string dabc::Module::InputName(unsigned indx, bool fullname) const
189 {
190  if (indx>=fInputs.size()) return "";
191  if (fullname) return fInputs[indx]->ItemName();
192  return fInputs[indx]->GetName();
193 }
194 
195 std::string dabc::Module::PoolName(unsigned indx, bool fullname) const
196 {
197  if (indx>=fPools.size()) return "";
198  if (fullname) return fPools[indx]->ItemName();
199  return fPools[indx]->GetName();
200 }
201 
202 
203 unsigned dabc::Module::CreateUserItem(const std::string &name)
204 {
205  unsigned indx = FindUserItem(name);
206  if (IsValidUserItem(indx)) return indx;
207 
208  ModuleItem* item = new ModuleItem(mitUser, this, name);
209 
210  AddModuleItem(item);
211 
212  item->SetItemSubId(fUsers.size());
213 
214  fUsers.push_back(item);
215 
216  return item->ItemSubId();
217 }
218 
219 unsigned dabc::Module::FindUserItem(const std::string &name)
220 {
221  if (!name.empty())
222  for (unsigned n=0;n<fUsers.size();n++)
223  if (fUsers[n]->IsName(name)) return n;
224  return (unsigned) -1;
225 }
226 
227 std::string dabc::Module::UserItemName(unsigned indx, bool fullname) const
228 {
229  if (indx>=fUsers.size()) return "";
230  if (fullname) return fUsers[indx]->ItemName();
231  return fUsers[indx]->GetName();
232 }
233 
234 void dabc::Module::ProduceUserItemEvent(unsigned indx, unsigned cnt)
235 {
236  while (IsValidUserItem(indx) && cnt--)
237  FireEvent(evntUser, fUsers[indx]->ItemId());
238 }
239 
241 {
242  DOUT3("Start module %s thrd %s", GetName(), ThreadName().c_str());
243 
244  if (thread().IsItself()) return DoStart();
245 
246  return Execute("StartModule") == cmd_true;
247 }
248 
250 {
251  DOUT3("Stop module %s thrd %s done", GetName(), ThreadName().c_str());
252 
253  if (thread().IsItself()) return DoStop();
254 
255  return Execute("StopModule") == cmd_true;
256 }
257 
259 {
260  if (fDfltPool.null()) fDfltPool = dabc::mgr.FindPool(xmlWorkPool);
261 
262  MemoryPool* pool = dynamic_cast<MemoryPool*> (fDfltPool());
263 
264  if (pool!=0) return pool->TakeBuffer();
265 
266  return dabc::Buffer();
267 }
268 
269 bool dabc::Module::DisconnectPort(const std::string &portname, bool witherr)
270 {
271  PortRef port = FindPort(portname);
272 
273  if (port.null()) return false;
274 
275  port()->Disconnect(witherr);
276 
277  // we should process event (disregard running state) to allow module react on such action
278  ProcessItemEvent(port(), witherr ? evntPortError : evntPortDisconnect);
279 
280  return true;
281 }
282 
283 
285 {
286  SetWorkerPriority(pri);
287 
288  for (unsigned n=0;n<fItems.size();n++)
289  if (fItems[n]) fItems[n]->SetItemPriority(pri);
290 }
291 
293 {
294  for (unsigned n=0;n<fItems.size();n++) {
295  Port* port = dynamic_cast<Port*> (fItems[n]);
296  if (port) port->Disconnect(witherr);
297  }
298 }
299 
300 
301 bool dabc::Module::SubmitCommandToTransport(const std::string &portname, Command cmd)
302 {
303  PortRef port = FindPort(portname);
304 
305  if (port.null()) return false;
306 
307  return port()->SubmitCommandToTransport(cmd);
308 }
309 
310 
312 {
313  // this hook in command execution routine allows us to "preview"
314  // command before it actually executed
315  // if it is system command, just execute it immediately
316 
317  int cmd_res = cmd_ignore;
318 
319  DOUT3("Module:%s PreviewCommand %s", GetName(), cmd.GetName());
320 
321  if (cmd.HasField("_for_the_port_")) {
322  // redirect submitted command to the port transport
323  std::string portname = cmd.GetStr("_for_the_port_");
324  cmd.RemoveField("_for_the_port_");
325  if (SubmitCommandToTransport(portname, cmd)) cmd_res = cmd_postponed;
326  else cmd_res = cmd_false;
327  } else
328  if (cmd.IsName("SetQueue")) {
329  PortRef port = FindPort(cmd.GetStr("Port"));
330  Reference q = cmd.GetRef("Queue");
331  if (port.null()) {
332  EOUT("Wrong port id when assigning queue");
333  cmd_res = cmd_false;
334  } else {
335  port()->SetQueue(q);
336  cmd_res = cmd_true;
337  }
338  } else
339  if (cmd.IsName("StartModule") || cmd.IsName(CmdStartModule::CmdName()))
340  cmd_res = cmd_bool(DoStart());
341  else
342  if (cmd.IsName("StopModule")|| cmd.IsName(CmdStopModule::CmdName()))
343  cmd_res = cmd_bool(DoStop());
344  else
345  if (cmd.IsName("SetPriority")) {
346  if (fThread()) {
347  fThread()->SetPriority(cmd.GetInt("Priority",0));
348  cmd_res = cmd_true;
349  } else
350  cmd_res = cmd_false;
351  } else
352  if (cmd.IsName("CheckConnected")) {
353  cmd_res = cmd_true;
354  for (unsigned n=0;n<NumInputs();n++)
355  if (!Input(n)->IsConnected()) cmd_res = cmd_false;
356  for (unsigned n=0;n<NumOutputs();n++)
357  if (!Output(n)->IsConnected()) cmd_res = cmd_false;
358  } else
359  if (cmd.IsName("IsInputConnect")) {
360  unsigned ninp = cmd.GetUInt("Number");
361  cmd_res = cmd_bool((ninp<NumInputs()) && Input(ninp)->IsConnected());
362  } else
363  if (cmd.IsName("IsOutputConnect")) {
364  unsigned nout = cmd.GetUInt("Number");
365  cmd_res = cmd_bool((nout<NumOutputs()) && Output(nout)->IsConnected());
366  } else
367  if (cmd.IsName("DisconnectPort")) {
368  cmd_res = cmd_bool(DisconnectPort(cmd.GetStr("Port"), cmd.GetBool("WithErr")));
369  } else
370  if (cmd.IsName("IsPortConnected")) {
371  PortRef port = FindPort(cmd.GetStr("Port"));
372  if (!port.null())
373  cmd_res = cmd_bool(port()->IsConnected());
374  else
375  cmd_res = cmd_false;
376  } else
377  if (cmd.IsName("GetSignalingKind")) {
378  PortRef port = FindPort(cmd.GetStr("Port"));
379  if (!port.null()) {
380  cmd_res = cmd_true;
381  cmd.SetInt("Kind", port()->SignalingKind());
382  } else
383  cmd_res = cmd_false;
384  } else
385 
386  if (cmd.IsName("GetPoolHandle")) {
387  unsigned cnt = cmd.GetUInt("Number");
388 
389  // in any case command returns true, but reference set only for the pools
390  cmd_res = cmd_true;
391 
392  for (unsigned indx=0;indx<fPools.size();indx++) {
393  PoolHandle* pool = fPools[indx];
394  if (pool->QueueCapacity()==0) continue;
395 
396  if (cnt>0) { cnt--; continue; }
397 
398  cmd.SetRef("Port", PortRef(pool));
399  cmd.SetRef("Pool", pool->fPool);
400  break;
401  }
402  } else
403  if (cmd.IsName("GetNumInputs")) {
404  cmd.SetUInt("Number", NumInputs());
405  cmd_res = cmd_true;
406  } else
407  if (cmd.IsName("GetNumOutputs")) {
408  cmd.SetUInt("Number", NumOutputs());
409  cmd_res = cmd_true;
410  } else
411  if (cmd.IsName("GetInputName")) {
412  unsigned id = cmd.GetUInt("Id");
413  bool asitem = cmd.GetBool("AsItem");
414  if (id < NumInputs()) {
415  cmd.SetStr("Name", asitem ? Input(id)->ItemName() : std::string(Input(id)->GetName()));
416  cmd_res = cmd_true;
417  } else {
418  cmd_res = cmd_false;
419  }
420  } else
421  if (cmd.IsName("GetOutputName")) {
422  unsigned id = cmd.GetUInt("Id");
423  bool asitem = cmd.GetBool("AsItem");
424  if (id < NumOutputs()) {
425  cmd.SetStr("Name", asitem ? Output(id)->ItemName() : std::string(Output(id)->GetName()));
426  cmd_res = cmd_true;
427  } else {
428  cmd_res = cmd_false;
429  }
430  } else
431  if (cmd.IsName("MakeConnReq")) {
432  dabc::PortRef port = FindPort(cmd.GetStr("Port"));
433 
434  if (!port.null()) {
435  dabc::ConnectionRequest req = port()->GetConnReq(true);
436 
437  req.SetInitState();
438 
439  req.SetRemoteUrl(cmd.GetStr("Url"));
440  req.SetServerSide(cmd.GetBool("IsServer"));
441 
442  std::string thrdname = port.Cfg(xmlThreadAttr).AsStr();
443  if (thrdname.empty())
444  switch (dabc::mgr.GetThreadsLayout()) {
445  case dabc::layoutMinimalistic: thrdname = dabc::mgr.ThreadName(); break;
446  case dabc::layoutPerModule: thrdname = ThreadName(); break;
447  case dabc::layoutBalanced: thrdname = ThreadName() + (port.IsInput() ? "Inp" : "Out"); break;
448  case dabc::layoutMaximal: thrdname = ThreadName() + port.GetName(); break;
449  default: thrdname = ThreadName(); break;
450  }
451  req.SetConnThread(thrdname);
452 
453  req.SetUseAckn(port.Cfg(xmlUseacknAttr).AsBool(false));
454 
455  req.SetOptional(port.Cfg(xmlOptionalAttr).AsBool(false));
456 
457  req.SetConnDevice(port.Cfg(xmlDeviceAttr).AsStr());
458 
459  req.SetConnTimeout(port.Cfg(xmlTimeoutAttr).AsDouble(10.));
460 
461  cmd.SetRef("ConnReq", req);
462 
463  cmd_res = cmd_true;
464  } else {
465  cmd_res = cmd_false;
466  }
467  } else
468  if (cmd.IsName(dabc::CmdGetBinary::CmdName()) && (cmd.GetStr("Kind")=="module.json")) {
469 
470  dabc::Record info;
471 
472  info.CreateRecord(GetName());
473 
474  info.SetField("NumInputs", NumInputs());
475  info.SetField("NumOutputs", NumOutputs());
476  info.SetField("NumPools", NumPools());
477 
478  std::vector<int64_t> outq, inpq, cansend, canrecv, cantake;
479  for (unsigned indx=0;indx<NumOutputs();++indx) {
480  outq.push_back(OutputQueueCapacity(indx));
481  cansend.push_back(fOutputs[indx]->NumCanSend());
482  }
483  for (unsigned indx=0;indx<NumInputs();++indx) {
484  inpq.push_back(InputQueueCapacity(indx));
485  canrecv.push_back(fInputs[indx]->NumCanRecv());
486  }
487  for (unsigned indx=0;indx<NumPools();++indx) {
488  cantake.push_back(fPools[indx]->CanTakeBuffer() ? 0 : 1);
489  }
490 
491  info.SetField("InputQueueCapacity", inpq);
492  info.SetField("OutputQueueCapacity", outq);
493  info.SetField("NumCanSend", cansend);
494  info.SetField("NumCanRecv", canrecv);
495  info.SetField("NumCanTake", cantake);
496 
497  cmd.SetStr("StringReply", info.SaveToJson());
498 
499  cmd_res = cmd_true;
500  } else
501  if (cmd.IsName(dabc::CmdGetBinary::CmdName()) && (cmd.GetStr("Kind")=="transport.json") && !FindPort(cmd.GetStr("subitem")).null()) {
502  std::string portname = cmd.GetStr("subitem");
503  cmd.RemoveField("subitem");
504  if (SubmitCommandToTransport(portname, cmd)) cmd_res = cmd_postponed;
505  else cmd_res = cmd_false;
506  } else
507  cmd_res = Worker::PreviewCommand(cmd);
508 
509  if (cmd_res!=cmd_ignore)
510  DOUT3("Module:%s PreviewCommand %s res=%d", GetName(), cmd.GetName(), cmd_res);
511 
512  return cmd_res;
513 }
514 
515 
517 {
518  DOUT4("Module::Find %p name = %s parent %p", this, GetName(), GetParent());
519 
520  if (GetParent()==0) return false;
521 
522  // module will always have tag "Module", class could be specified with attribute
523  while (cfg.FindItem(xmlModuleNode)) {
524  if (cfg.CheckAttr(xmlNameAttr, GetName())) return true;
525  }
526 
527  return false;
528 }
529 
531 {
532  cont->Field(xmlNumInputs).SetInt(NumInputs());
533  cont->Field(xmlNumOutputs).SetInt(NumOutputs());
534 }
535 
537 {
538  if (IsRunning()) DoStop();
539 
540  ModuleCleanup();
541 
542  DOUT3("Module cleanup %s numchilds %u", GetName(), NumChilds());
543 
544  for (unsigned n=0;n<fItems.size();n++)
545  if (fItems[n]) fItems[n]->DoCleanup();
546 
547  fDfltPool.Release();
548 
549  fSysTimerIndex = -1;
550 
552 }
553 
554 double dabc::Module::ProcessTimeout(double last_diff)
555 {
556  if (fSysTimerIndex < fTimers.size())
557  return fTimers[fSysTimerIndex]->ProcessTimeout(last_diff);
558 
559  return -1.;
560 }
561 
562 
564 {
565  if (IsRunning()) return true;
566 
567  DOUT3("dabc::Module::DoStart() %s", GetName());
568 
569  BeforeModuleStart();
570 
571  fRunState = true;
572 
573  for (unsigned n=0;n<fItems.size();n++)
574  if (fItems[n]) fItems[n]->DoStart();
575 
576  // treat special case of sys timer here - enable timeout of module itself
577  if ((fSysTimerIndex < fTimers.size()) && (fTimers[fSysTimerIndex]->fPeriod>0))
578  ActivateTimeout(fTimers[fSysTimerIndex]->fPeriod);
579 
580 
581  DOUT3("dabc::Module::DoStart() %s done", GetName());
582 
583  return true;
584 }
585 
587 {
588  DOUT3("dabc::Module::DoStop() %s", GetName());
589 
590  if (!IsRunning()) return true;
591 
592  // treat special case of sys timer here - disable timeout of module
593  if (fSysTimerIndex < fTimers.size()) ActivateTimeout(-1);
594 
595  for (unsigned n=0;n<fItems.size();n++)
596  if (fItems[n]) fItems[n]->DoStop();
597 
598  fRunState = false;
599 
600  AfterModuleStop();
601 
602  DOUT3("dabc::Module::DoStop() %s done", GetName());
603 
604  return true;
605 }
606 
607 unsigned dabc::Module::CreatePoolHandle(const std::string &poolname, unsigned queue)
608 {
609  unsigned index = FindPool(poolname);
610  if (IsValidPool(index)) return index;
611 
612  dabc::MemoryPoolRef pool = dabc::mgr.FindPool(poolname);
613 
614  if (pool.null()) {
615  EOUT("Pool %s not exists - cannot connect to module %s", poolname.c_str(), GetName());
616  return (unsigned) -1;
617  }
618 
619  PoolHandle* handle = new PoolHandle(this, pool, poolname, queue);
620 
621  AddModuleItem(handle);
622 
623  handle->SetItemSubId(fPools.size());
624  fPools.push_back(handle);
625 
626  return handle->ItemSubId();
627 }
628 
630 {
631  // at that place one cannot use any dynamic_cast to inherited types,
632  // while constructor of item is not completely finished
633 
634  unsigned id = fItems.size();
635 
636  fItems.push_back(item);
637 
638  item->SetItemId(id);
639 
640  if (id>moduleitemMaxId) {
641  EOUT("Item id is too big, event propagation will not work");
642  exit(104);
643  }
644 
645  if (HasThread() && item->ItemNeedThread())
646  item->AssignToThread(thread(), true);
647 
648 // DOUT0("Module:%s Add item:%s Id:%d", GetName(), item->GetName(), id);
649 }
650 
652 {
653  unsigned id = item->ItemId();
654 
655  for (unsigned n=0;n<fInputs.size();n++) {
656  if (fInputs[n] == item) {
657  fInputs.erase(fInputs.begin()+n);
658  } else {
659  fInputs[n]->SetItemSubId(n);
660  }
661  }
662 
663  for (unsigned n=0;n<fOutputs.size();n++) {
664  if (fOutputs[n] == item) {
665  fOutputs.erase(fOutputs.begin()+n);
666  } else {
667  fOutputs[n]->SetItemSubId(n);
668  }
669  }
670 
671  for (unsigned n=0;n<fPools.size();n++) {
672  if (fPools[n] == item) {
673  fPools.erase(fPools.begin()+n);
674  } else {
675  fPools[n]->SetItemSubId(n);
676  }
677  }
678 
679  for (unsigned n=0;n<fTimers.size();n++) {
680  if (fTimers[n] == item) {
681  fTimers.erase(fTimers.begin()+n);
682  } else {
683  fTimers[n]->SetItemSubId(n);
684  }
685  }
686 
687  for (unsigned n=0;n<fUsers.size();n++) {
688  if (fUsers[n] == item) {
689  fUsers.erase(fUsers.begin()+n);
690  } else {
691  fUsers[n]->SetItemSubId(n);
692  }
693  }
694 
695  fItems[id] = 0;
696 }
697 
698 
699 dabc::PortRef dabc::Module::FindPort(const std::string &name) const
700 {
701  return FindChildRef(name.c_str());
702 }
703 
704 unsigned dabc::Module::CreateInput(const std::string &name, unsigned queue)
705 {
706  unsigned indx = FindInput(name);
707  if (IsValidInput(indx)) return indx;
708 
709  if (queue == 0) return (unsigned) -1;
710 
711  InputPort* port = new InputPort(this, name, queue);
712 
713  AddModuleItem(port);
714 
715  port->SetItemSubId(fInputs.size());
716  fInputs.push_back(port);
717 
718  if (!port->fRateName.empty() && port->fRate.null()) {
719  if (Par(port->fRateName).null())
720  CreatePar(port->fRateName).SetRatemeter(false, 3.).SetUnits("MB");
721  port->SetRateMeter(Par(port->fRateName));
722  }
723 
724  return port->ItemSubId();
725 }
726 
727 unsigned dabc::Module::CreateOutput(const std::string &name, unsigned queue)
728 {
729  unsigned indx = FindOutput(name);
730  if (IsValidOutput(indx)) return indx;
731 
732  if (queue == 0) return (unsigned) -1;
733 
734  OutputPort* port = new OutputPort(this, name, queue);
735 
736  AddModuleItem(port);
737 
738  port->SetItemSubId(fOutputs.size());
739  fOutputs.push_back(port);
740 
741  if (!port->fRateName.empty() && port->fRate.null()) {
742  if (Par(port->fRateName).null())
743  CreatePar(port->fRateName).SetRatemeter(false, 3.).SetUnits("MB");
744  port->SetRateMeter(Par(port->fRateName));
745  }
746 
747  return port->ItemSubId();
748 }
749 
750 bool dabc::Module::BindPorts(const std::string &inpname, const std::string &outname)
751 {
752  unsigned inpindx = FindInput(inpname);
753  unsigned outindx = FindOutput(outname);
754 
755  if (IsValidInput(inpindx) && IsValidOutput(outindx)) {
756  fInputs[inpindx]->SetBindName(outname);
757  fOutputs[outindx]->SetBindName(inpname);
758  return true;
759  }
760 
761  return false;
762 }
763 
764 void dabc::Module::ProduceInputEvent(unsigned indx, unsigned cnt)
765 {
766  // TODO: should we produce such event automatically ???
767 
768  while (IsValidInput(indx) && cnt--)
769  FireEvent(evntInput, fInputs[indx]->ItemId());
770 }
771 
772 void dabc::Module::ProducePoolEvent(unsigned indx, unsigned cnt)
773 {
774  while (IsValidPool(indx) && cnt--)
775  FireEvent(evntInput, fPools[indx]->ItemId());
776 }
777 
778 
779 void dabc::Module::ProduceOutputEvent(unsigned indx, unsigned cnt)
780 {
781  while (IsValidOutput(indx) && cnt--)
782  FireEvent(evntOutput, fOutputs[indx]->ItemId());
783 }
784 
785 bool dabc::Module::IsPortConnected(const std::string &name) const
786 {
787  return FindPort(name).IsConnected();
788 }
789 
790 
791 bool dabc::Module::SetPortSignaling(const std::string &name, Port::EventsProducing signal)
792 {
793  PortRef port = FindPort(name);
794  if (!port.null()) {
795  port()->SetSignaling(signal);
796  return true;
797  }
798 
799  return false;
800 }
801 
802 bool dabc::Module::SetPortRatemeter(const std::string &name, const Parameter& ref)
803 {
804  PortRef port = FindPort(name);
805  if (!port.null()) {
806  port()->SetRateMeter(ref);
807  return true;
808  }
809  return true;
810 }
811 
812 bool dabc::Module::SetPortLoopLength(const std::string &name, unsigned cnt)
813 {
814  PortRef port = FindPort(name);
815  if (!port.null()) {
816  port()->SetMaxLoopLength(cnt);
817  return true;
818  }
819  return true;
820 }
821 
823 {
824  switch (evid.GetCode()) {
825  case evntInput:
826  case evntOutput:
827  case evntInputReinj:
828  case evntOutputReinj:
829  case evntTimeout:
830  case evntUser:
831  if (IsRunning())
832  ProcessItemEvent(GetItem(evid.GetArg()), evid.GetCode());
833  break;
834  case evntPortConnect: {
835  // deliver event to the user disregard running state
836 
837  Port* port = dynamic_cast<Port*> (GetItem(evid.GetArg()));
838 
839  if (port)
841 
842  ProcessItemEvent(GetItem(evid.GetArg()), evid.GetCode());
843 
844  break;
845  }
846 
847  case evntPortDisconnect:
848  case evntPortError: {
849 
850  bool iserror = (evid.GetCode() == evntPortError), dostop = false;
851 
852  Port* port = dynamic_cast<Port*> (GetItem(evid.GetArg()));
853 
854  // DOUT0("PORT DISCONNECTED iserr %s port %s autostop %s", DBOOL(iserror), port->ItemName().c_str(), DBOOL(fAutoStop));
855 
856  if (port!=0) {
857 
858  ConnectionRequest req = port->GetConnReq();
859 
860  if (!req.null()) {
861  if (req.IsOptional()) {
863  } else {
865  dostop = fAutoStop && IsRunning(); // stop module if port is not optional
866  }
867  }
868 
869  DOUT3("Module %s running %s get disconnect event for port %s connected %s err %s", GetName(), DBOOL(IsRunning()), port->ItemName().c_str(), DBOOL(port->IsConnected()), DBOOL(iserror));
870 
871  //InputPort* inp = dynamic_cast<InputPort*> (port);
872  //if (inp) DOUT0("Input still can recv %u buffers", inp->NumCanRecv());
873 
874  // deliver event to the user disregard running state
875  ProcessItemEvent(GetItem(evid.GetArg()), evid.GetCode());
876 
877  // if reconnect is specified and port is not declared as non-automatic
878  // if port was connected with connect manager, let do work by connection manager
879  if (req.null() && port->TryNextReconnect(iserror, fAutoStop)) {
880  std::string timername = dabc::format("ConnTimer_%s", port->GetName());
881 
882  ConnTimer* timer = dynamic_cast<ConnTimer*> (FindChild(timername.c_str()));
883 
884  if (timer==0) {
885  timer = new ConnTimer(this, timername, port->GetName());
886  AddModuleItem(timer);
887  }
888 
889  timer->fErrorFlag = iserror;
890 
891  port->SetDoingReconnect(true);
892  timer->Activate(port->GetReconnectPeriod() > 0 ? port->GetReconnectPeriod() : 1.);
893 
894  DOUT1("Module %s will try to reconnect port %s", GetName(), port->ItemName().c_str());
895 
896  return;
897  }
898  }
899 
900  if (fAutoStop && IsRunning() && !dostop) {
901  for (unsigned n=0;n<NumOutputs();n++)
902  if (Output(n)->IsConnected() || Output(n)->IsDoingReconnect()) return;
903 
904  for (unsigned n=0;n<NumInputs();n++)
905  if (Input(n)->IsConnected() || Input(n)->IsDoingReconnect()) return;
906 
907  dostop = true;
908  }
909 
910  if (dostop) {
911  DOUT2("Module %s automatically stopped while all connections are now disconnected", GetName());
912  DoStop();
913  }
914 
915  break;
916  }
917  case evntConnStart:
918  case evntConnStop: {
919  Port* port = dynamic_cast<Port*> (GetItem(evid.GetArg()));
920 
921  ProcessConnectionActivated(port->GetName(), evid.GetCode() == evntConnStart);
922 
923  break;
924  }
925 
926  default:
928  break;
929  }
930 }
931 
932 bool dabc::Module::CanSendToAllOutputs(bool exclude_disconnected) const
933 {
934  for(unsigned n=0;n<NumOutputs();n++) {
935  OutputPort* out = Output(n);
936  if (exclude_disconnected && !out->IsConnected()) continue;
937  if (!out->CanSend()) return false;
938  }
939  return true;
940 }
941 
943 {
944  if (buf.null()) return;
945 
946  unsigned last_can_send = NumOutputs();
947  for(unsigned n=0;n<NumOutputs();n++) {
948  OutputPort* out = Output(n);
949  out->fSendallFlag = out->CanSend();
950  if (out->fSendallFlag) last_can_send = n;
951  }
952 
953  for(unsigned n=0;n<NumOutputs();n++) {
954  OutputPort* out = Output(n);
955  if (!out->fSendallFlag) continue;
956 
957  if (n==last_can_send) {
958  Output(n)->Send(buf);
959  if (!buf.null()) { EOUT("buffer not null after sending to output %u", n); exit(333); }
960  } else {
961  dabc::Buffer dupl = buf.Duplicate();
962  Output(n)->Send(dupl);
963  if (!dupl.null()) { EOUT("buffer not null after sending to output %u", n); exit(333); }
964  }
965  }
966 
967  if ((last_can_send != NumOutputs()) && !buf.null()) {
968  EOUT("Should never happens buf %u!!!", buf.GetTotalSize());
969  exit(333);
970  }
971 
972  buf.Release();
973 }
974 
975 
976 // ==========================================================================
977 
978 
980 {
981  if (GetObject()==0) return false;
982 
983  dabc::Command cmd("IsInputConnect");
984  cmd.SetInt("Number", ninp);
985  return Execute(cmd) == cmd_true;
986 }
987 
990 {
991  if (GetObject()==0) return false;
992 
993  dabc::Command cmd("IsOutputConnect");
994  cmd.SetInt("Number", ninp);
995  return Execute(cmd) == cmd_true;
996 }
997 
998 
1000 {
1001  if (GetObject()==0) return false;
1002 
1003  unsigned cnt(0);
1004 
1005  while (true) {
1006  dabc::Command cmd("GetPoolHandle");
1007  cmd.SetUInt("Number", cnt++);
1008 
1009  if (Execute(cmd) != cmd_true) return false;
1010 
1011  PortRef portinp = cmd.GetRef("Port");
1012  MemoryPoolRef poolref = cmd.GetRef("Pool");
1013 
1014  if (portinp.null()) break;
1015 
1016  if (poolref.null()) {
1017  EOUT("Something went wrong with connection to the pools");
1018  exit(543);
1019  }
1020 
1021  DOUT3("@@@@@ Create requester for item %s", portinp.ItemName().c_str());
1022 
1023  PortRef portout = poolref.CreateNewRequester();
1024 
1025  dabc::LocalTransport::ConnectPorts(portout, portinp);
1026  }
1027 
1028  return true;
1029 }
1030 
1032 {
1033  return FindChild(name.c_str());
1034 }
1035 
1036 bool dabc::ModuleRef::IsPortConnected(const std::string &name)
1037 {
1038  dabc::Command cmd("IsPortConnected");
1039  cmd.SetStr("Port", name);
1040  return Execute(cmd) == cmd_true;
1041 }
1042 
1044 {
1045  dabc::Command cmd("GetNumInputs");
1046  return (Execute(cmd)==cmd_true) ? cmd.GetUInt("Number") : 0;
1047 }
1048 
1050 {
1051  dabc::Command cmd("GetNumOutputs");
1052  return (Execute(cmd)==cmd_true) ? cmd.GetUInt("Number") : 0;
1053 }
1054 
1055 
1056 std::string dabc::ModuleRef::InputName(unsigned n, bool itemname)
1057 {
1058  dabc::Command cmd("GetInputName");
1059  cmd.SetUInt("Id", n);
1060  cmd.SetBool("AsItem", itemname);
1061 
1062  if (Execute(cmd)==cmd_true) return cmd.GetStr("Name");
1063  return std::string();
1064 }
1065 
1066 std::string dabc::ModuleRef::OutputName(unsigned n, bool itemname)
1067 {
1068  dabc::Command cmd("GetOutputName");
1069  cmd.SetUInt("Id", n);
1070  cmd.SetBool("AsItem", itemname);
1071 
1072  if (Execute(cmd)==cmd_true) return cmd.GetStr("Name");
1073  return std::string();
1074 }
Reference on memory from memory pool.
Definition: Buffer.h:135
Buffer Duplicate() const
Duplicates instance of Buffer with new segments list independent from source.
Definition: Buffer.cxx:192
BufferSize_t GetTotalSize() const
Return total size of all buffer segments.
Definition: Buffer.cxx:91
static const char * CmdName()
Definition: Manager.h:85
static const char * CmdName()
Definition: Manager.h:92
Represents command with its arguments.
Definition: Command.h:99
unsigned GetUInt(const std::string &name, unsigned dflt=0) const
Definition: Command.h:148
bool SetStr(const std::string &name, const char *value)
Definition: Command.h:134
bool SetBool(const std::string &name, bool v)
Definition: Command.h:141
bool SetInt(const std::string &name, int v)
Definition: Command.h:138
std::string GetStr(const std::string &name, const std::string &dflt="") const
Definition: Command.h:136
bool SetUInt(const std::string &name, unsigned v)
Definition: Command.h:147
bool GetBool(const std::string &name, bool dflt=false) const
Definition: Command.h:142
Reference GetRef(const std::string &name)
Returns reference from the command, can be called only once.
Definition: Command.cxx:175
int GetInt(const std::string &name, int dflt=0) const
Definition: Command.h:139
bool SetRef(const std::string &name, Reference ref)
Set reference to the command.
Definition: Command.cxx:168
Interface class between xml configuration and dabc objects.
Definition: ConfigIO.h:38
bool FindItem(const char *name)
Definition: ConfigIO.cxx:42
bool CheckAttr(const char *name, const char *value)
Check if item, found by FindItem routine, has attribute with specified value.
Definition: ConfigIO.cxx:63
Special timer to reestablish port connections in the module.
Definition: ModuleItem.h:197
bool fErrorFlag
indicate why reconnection was started
Definition: ModuleItem.h:203
void Activate(double period)
Definition: ModuleItem.h:209
@ sDisconnected
connection is down by user, will not be reconnected
@ sConnected
connection is up and working
@ sBroken
connection is broken and should be reactivated by connection manager
Connection request.
void SetServerSide(bool isserver=true)
void SetUseAckn(bool on=true)
bool IsOptional() const
indicate if connection is optional and therefore may be ignored during failure or long timeout
void ChangeState(ConnectionObject::EState state, bool force)
void SetConnDevice(const std::string &dev)
void SetRemoteUrl(const std::string &url)
void SetConnThread(const std::string &name)
void SetConnTimeout(double tm)
void SetInitState()
Change state of the connection to init that other parameters can be changed.
void SetOptional(bool on=true)
Input port.
Definition: Port.h:244
static int ConnectPorts(Reference port1ref, Reference port2ref, Command cmd=nullptr)
Lock guard for posix mutex.
Definition: threads.h:127
Reference FindPool(const std::string &name)
Definition: Manager.cxx:2064
Reference on dabc::MemoryPool class
Definition: MemoryPool.h:245
Reference CreateNewRequester()
Definition: MemoryPool.cxx:696
Buffer TakeBuffer(BufferSize_t size=0)
Returns Buffer object with exclusive access rights.
Definition: MemoryPool.cxx:409
Base class for module items like ports, timers, pool handles.
Definition: ModuleItem.h:68
virtual bool ItemNeedThread() const
Definition: ModuleItem.h:81
void SetItemSubId(unsigned id)
Definition: ModuleItem.h:84
void SetItemId(unsigned id)
Definition: ModuleItem.h:83
unsigned ItemId() const
Definition: ModuleItem.h:104
unsigned ItemSubId() const
Definition: ModuleItem.h:105
std::string OutputName(unsigned n=0, bool itemname=true)
Return item name of the output, can be used in connect command.
Definition: Module.cxx:1066
unsigned NumOutputs()
Returns number of outputs in the module.
Definition: Module.cxx:1049
bool IsInputConnected(unsigned ninp)
Returns true if specified input is connected.
Definition: Module.cxx:979
bool IsPortConnected(const std::string &name)
Returns true if port with specified name is connected - thread safe.
Definition: Module.cxx:1036
unsigned NumInputs()
Returns number of inputs in the module.
Definition: Module.cxx:1043
PortRef FindPort(const std::string &name)
Return reference on the port.
Definition: Module.cxx:1031
bool IsOutputConnected(unsigned ninp)
Returns true if specified output is connected.
Definition: Module.cxx:989
bool ConnectPoolHandles()
Method called by manager to establish connection to pools TODO: while used from devices,...
Definition: Module.cxx:999
std::string InputName(unsigned n=0, bool itemname=true)
Return item name of the input, can be used in connect command.
Definition: Module.cxx:1056
bool SubmitCommandToTransport(const std::string &portname, Command cmd)
Submits command to transport, assigned with the port.
Definition: Module.cxx:301
bool SetPortSignaling(const std::string &name, Port::EventsProducing signal)
Definition: Module.cxx:791
unsigned FindOutput(const std::string &name) const
Definition: Module.cxx:157
virtual void ObjectCleanup()
Inherited method, called during module destroy.
Definition: Module.cxx:536
virtual bool Find(ConfigIO &cfg)
Method to locate object in xml file.
Definition: Module.cxx:516
unsigned FindPool(const std::string &name) const
Definition: Module.cxx:173
void RemoveModuleItem(ModuleItem *item)
Definition: Module.cxx:651
void ProduceOutputEvent(unsigned indx=0, unsigned cnt=1)
Definition: Module.cxx:779
virtual Parameter CreatePar(const std::string &name, const std::string &kind="")
Definition: Module.cxx:129
bool IsPortConnected(const std::string &name) const
Definition: Module.cxx:785
virtual void BuildFieldsMap(RecordFieldsMap *cont)
Fill fields map, which is relevant for the object Objects hierarchy produced from dabc::Manager.
Definition: Module.cxx:530
virtual bool DoStop()
Definition: Module.cxx:586
virtual bool DoStart()
Definition: Module.cxx:563
Buffer TakeDfltBuffer()
Definition: Module.cxx:258
std::string OutputName(unsigned indx=0, bool fullname=false) const
Definition: Module.cxx:181
virtual void OnThreadAssigned()
Definition: Module.cxx:79
bool CanSendToAllOutputs(bool exclude_disconnected=true) const
Definition: Module.cxx:932
std::string PoolName(unsigned indx=0, bool fullname=false) const
Definition: Module.cxx:195
virtual void ProcessEvent(const EventId &)
Definition: Module.cxx:822
bool BindPorts(const std::string &inpname, const std::string &outname)
Bind input and output ports that both will share same connection.
Definition: Module.cxx:750
std::string InputName(unsigned indx=0, bool fullname=false) const
Definition: Module.cxx:188
void ProduceUserItemEvent(unsigned indx=0, unsigned cnt=1)
Definition: Module.cxx:234
unsigned FindInput(const std::string &name) const
Definition: Module.cxx:165
unsigned FindUserItem(const std::string &name)
Definition: Module.cxx:219
unsigned CreatePoolHandle(const std::string &poolname, unsigned queue=10)
Creates handle for memory pool, which preserves reference on memory pool and provides fast access to ...
Definition: Module.cxx:607
virtual void SetModulePriority(int pri=-1)
Definition: Module.cxx:284
void SetInfoParName(const std::string &name)
Definition: Module.cxx:143
unsigned CreateUserItem(const std::string &name)
Definition: Module.cxx:203
bool SetPortLoopLength(const std::string &name, unsigned cnt)
Definition: Module.cxx:812
void AddModuleItem(ModuleItem *item)
Definition: Module.cxx:629
std::string GetInfoParName() const
Definition: Module.cxx:150
bool fAutoStop
module will automatically stop when all i/o ports will be disconnected
Definition: Module.h:62
virtual int PreviewCommand(Command cmd)
This method called before command will be executed.
Definition: Module.cxx:311
virtual double ProcessTimeout(double last_diff)
Definition: Module.cxx:554
std::string UserItemName(unsigned indx=0, bool fullname=false) const
Definition: Module.cxx:227
unsigned CreateInput(const std::string &name, unsigned queue=10)
Definition: Module.cxx:704
void DisconnectAllPorts(bool witherr=false)
Method disconnects all module ports, should be called only from Module thread.
Definition: Module.cxx:292
void ProduceInputEvent(unsigned indx=0, unsigned cnt=1)
Definition: Module.cxx:764
unsigned CreateOutput(const std::string &name, unsigned queue=10)
Definition: Module.cxx:727
std::string fPublishPars
path where module pars will be published
Definition: Module.h:65
void ProducePoolEvent(unsigned indx=0, unsigned cnt=1)
Definition: Module.cxx:772
unsigned CreateTimer(const std::string &name, double period_sec=-1., bool synchron=false)
Definition: Module.cxx:109
PortRef FindPort(const std::string &name) const
Definition: Module.cxx:699
bool Stop()
Stops execution of the module code.
Definition: Module.cxx:249
std::string TimerName(unsigned n=0, bool fullname=false) const
Definition: Module.cxx:93
bool DisconnectPort(const std::string &name, bool witherr=false)
Disconnect port from transport.
Definition: Module.cxx:269
bool Start()
Starts execution of the module code.
Definition: Module.cxx:240
void EnsurePorts(unsigned numinp=0, unsigned numout=0, const std::string &poolname="")
Method ensure that at least specified number of input and output ports will be created.
Definition: Module.cxx:66
bool SetPortRatemeter(const std::string &name, const Parameter &ref)
Definition: Module.cxx:802
virtual ~Module()
Definition: Module.cxx:59
void SendToAllOutputs(Buffer &buf)
Definition: Module.cxx:942
unsigned FindTimer(const std::string &name)
Definition: Module.cxx:100
Module(const std::string &name, Command cmd)
Definition: Module.cxx:24
std::string ItemName(bool compact=true) const
Produce string, which can be used as name argument in dabc::mgr.FindItem(name) call.
Definition: Object.cxx:1076
const char * GetName() const
Returns name of the object, thread safe
Definition: Object.h:295
Output port.
Definition: Port.h:301
bool fSendallFlag
Definition: Port.h:309
bool CanSend() const
Returns true if user can send get buffer via the port.
Definition: Port.h:325
Parameter class
Definition: Parameter.h:163
Handle for pool connection.
Definition: Port.h:347
Reference fPool
Definition: Port.h:355
Reference on the dabc::Port class
Definition: Port.h:195
bool IsInput() const
Returns true if it is input port.
Definition: Port.h:199
bool Disconnect(bool witherr=false)
Disconnect port
Definition: Port.cxx:233
Base class for input and output ports.
Definition: Port.h:47
void SetRateMeter(const Parameter &ref)
Set port ratemeter - must be used from module thread.
Definition: Port.cxx:162
unsigned QueueCapacity() const
Method returns actual queue capacity of the port, object mutex is used.
Definition: Port.cxx:101
ConnectionRequest GetConnReq(bool force=false)
Return reference on existing request object.
Definition: Port.cxx:75
void Disconnect(bool witherr=false)
Definition: Port.h:117
Parameter fRate
parameter for rate calculations
Definition: Port.h:68
bool TryNextReconnect(bool caused_by_error, bool can_disconnect=true)
Returns true when reconnection should be attempted.
Definition: Port.cxx:194
double GetReconnectPeriod() const
Definition: Port.h:142
bool IsConnected() const
Method can only be used from thread itself.
Definition: Port.h:126
void SetDoingReconnect(bool on=true)
Definition: Port.h:147
std::string fRateName
name of rate parameter, which should be assigned to port
Definition: Port.h:72
EventsProducing
Definition: Port.h:58
bool AsBool(bool dflt=false) const
Definition: Record.cxx:477
std::string AsStr(const std::string &dflt="") const
Definition: Record.cxx:749
int64_t AsInt(int64_t dflt=0) const
Definition: Record.cxx:501
double AsDouble(double dflt=0.) const
Definition: Record.cxx:549
bool SetInt(int64_t v)
Definition: Record.cxx:1073
RecordField & Field(const std::string &name)
Direct access to the fields.
Definition: Record.h:401
bool HasField(const std::string &name) const
Definition: Record.h:498
bool SetField(const std::string &name, const RecordField &v)
Definition: Record.h:516
std::string SaveToJson(unsigned mask=0)
Store record in JSON form.
Definition: Record.cxx:1658
bool RemoveField(const std::string &name)
Definition: Record.h:501
virtual void CreateRecord(const std::string &name)
Definition: Record.cxx:1674
Reference on the arbitrary object
Definition: Reference.h:73
void Release()
Releases reference on the object.
Definition: Reference.cxx:138
const char * GetName() const
Return name of referenced object, if object not assigned, returns "---".
Definition: Reference.cxx:167
bool IsName(const char *name) const
Returns true if object name is the same as specified one.
Definition: Reference.cxx:177
bool null() const
Returns true if reference contains nullptr.
Definition: Reference.h:151
std::string ItemName(bool compact=true) const
Produce string, which can be used as name argument in dabc::mgr.FindItem(name) call.
Definition: Reference.cxx:241
Provides timer event to the module.
Definition: ModuleItem.h:149
std::string ThreadName() const
Returns thread name of worker assigned.
Definition: Worker.h:489
RecordField Cfg(const std::string &name, Command cmd=nullptr) const
Returns configuration record of specified name.
Definition: Worker.h:482
Active object, which is working inside dabc::Thread.
Definition: Worker.h:116
virtual int PreviewCommand(Command cmd)
This method called before command will be executed.
Definition: Worker.cxx:650
virtual void ProcessEvent(const EventId &)
Definition: Worker.cxx:499
bool AssignToThread(ThreadRef thrd, bool sync=true)
Assign worker to thread, worker becomes active immediately.
Definition: Worker.cxx:326
RecordField Cfg(const std::string &name, Command cmd=nullptr) const
Returns configuration field of specified name Configuration value of specified name searched in follo...
Definition: Worker.cxx:521
bool HasThread() const
Indicates if pointer on thread is not zero; thread-safe.
Definition: Worker.cxx:151
virtual void ObjectCleanup()
Central cleanup method for worker.
Definition: Worker.cxx:238
virtual Parameter CreatePar(const std::string &name, const std::string &kind="")
Definition: Worker.cxx:558
#define DOUT2(args ...)
Definition: logging.h:170
#define DOUT5(args ...)
Definition: logging.h:188
#define DOUT3(args ...)
Definition: logging.h:176
#define EOUT(args ...)
Definition: logging.h:150
#define DOUT1(args ...)
Definition: logging.h:162
#define DBOOL(arg)
Definition: logging.h:191
#define DOUT4(args ...)
Definition: logging.h:182
XMLNodePointer_t GetParent(XMLNodePointer_t xmlnode)
Definition: XmlEngine.cxx:917
@ layoutMaximal
Definition: Manager.h:278
@ layoutPerModule
Definition: Manager.h:276
@ layoutBalanced
Definition: Manager.h:277
@ layoutMinimalistic
Definition: Manager.h:275
const char * xmlWorkPool
Definition: Object.cxx:46
const char * xmlNumOutputs
Definition: Object.cxx:55
const char * xmlTimeoutAttr
Definition: ConfigBase.cxx:42
@ mitUser
Definition: ModuleItem.h:36
const char * xmlUseacknAttr
Definition: ConfigBase.cxx:39
const char * xmlDeviceAttr
Definition: ConfigBase.cxx:37
ManagerRef mgr
Definition: Manager.cxx:42
std::string format(const char *fmt,...)
Definition: string.cxx:49
const char * xmlOptionalAttr
Definition: ConfigBase.cxx:40
const char * xmlThreadAttr
Definition: ConfigBase.cxx:38
@ evntInputReinj
Definition: ModuleItem.h:43
@ evntConnStop
Definition: ModuleItem.h:50
@ evntPortDisconnect
Definition: ModuleItem.h:47
@ evntUser
Definition: ModuleItem.h:52
@ evntTimeout
Definition: ModuleItem.h:45
@ evntInput
Definition: ModuleItem.h:41
@ evntPortConnect
Definition: ModuleItem.h:46
@ evntConnStart
Definition: ModuleItem.h:49
@ evntOutputReinj
Definition: ModuleItem.h:44
@ evntPortError
Definition: ModuleItem.h:48
@ evntOutput
Definition: ModuleItem.h:42
const char * xmlModuleNode
Definition: Object.cxx:33
const char * xmlPoolName
Definition: Object.cxx:45
@ moduleitemMaxId
Definition: ModuleItem.h:57
const char * xmlNameAttr
Definition: ConfigBase.cxx:33
@ cmd_postponed
Definition: Command.h:42
@ cmd_false
Definition: Command.h:37
@ cmd_ignore
Definition: Command.h:41
@ cmd_true
Definition: Command.h:38
const char * xmlNumInputs
Definition: Object.cxx:54
Event structure, exchanged between DABC threads.
Definition: Thread.h:70
uint32_t GetArg() const
Definition: Thread.h:94
uint16_t GetCode() const
Definition: Thread.h:92