DABC (Data Acquisition Backbone Core)  2.9.9
Manager.cxx
Go to the documentation of this file.
1 // $Id: Manager.cxx 4476 2020-04-15 14:12:38Z linev $
2 
3 /************************************************************
4  * The Data Acquisition Backbone Core (DABC) *
5  ************************************************************
6  * Copyright (C) 2009 - *
7  * GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
8  * Planckstr. 1, 64291 Darmstadt, Germany *
9  * Contact: http://dabc.gsi.de *
10  ************************************************************
11  * This software can be used under the GPL license *
12  * agreements as stated in LICENSE.txt file *
13  * which is part of the distribution. *
14  ************************************************************/
15 
16 #include "dabc/Manager.h"
17 
18 #include <dirent.h>
19 #include <fnmatch.h>
20 #include <sys/types.h>
21 #include <sys/stat.h>
22 #include <unistd.h>
23 #include <signal.h>
24 #include <cmath>
25 
26 #include "dabc/api.h"
27 #include "dabc/defines.h"
28 
29 #include "dabc/Transport.h"
30 #include "dabc/MemoryPool.h"
31 #include "dabc/Iterator.h"
32 #include "dabc/BinaryFileIO.h"
33 #include "dabc/Configuration.h"
34 #include "dabc/ConnectionManager.h"
35 #include "dabc/CpuInfoModule.h"
36 #include "dabc/MultiplexerModule.h"
37 #include "dabc/SocketFactory.h"
38 #include "dabc/Publisher.h"
39 
40 namespace dabc {
41 
43 
// Factory subclass that declares creation hooks for modules, objects, threads,
// data outputs and data inputs. The out-of-class definitions follow later in
// this file.
44  class StdManagerFactory : public Factory {
45  public:
// Forwards the factory name to the Factory base class.
46  StdManagerFactory(const std::string &name) : Factory(name) { }
47 
// Creates a module of the given class name; defined below.
48  virtual Module* CreateModule(const std::string &classname, const std::string &modulename, Command cmd);
49 
// Creates a generic object of the given class name; defined below.
50  virtual Reference CreateObject(const std::string &classname, const std::string &objname, dabc::Command cmd);
51 
// Creates a thread object of the given class name; defined below.
52  virtual Reference CreateThread(Reference parent, const std::string &classname, const std::string &thrdname, const std::string &thrddev, Command cmd);
53 
// Creates a data output selected by the string typ.
54  virtual DataOutput* CreateDataOutput(const std::string &typ);
55 
// Creates a data input selected by the string typ.
56  virtual DataInput* CreateDataInput(const std::string &typ);
57  };
58 
59 
67  struct DependPair {
70  int fire;
71 
72  DependPair() : src(), tgt(0), fire(0) {}
73  DependPair(Object* _src, Object* _tgt) : src(_src), tgt(_tgt), fire(0) {}
74  DependPair(const DependPair& d) : src(d.src), tgt(d.tgt), fire(d.fire) {}
75  };
76 
// Named std::list of DependPair records; adds no members of its own.
77  class DependPairList : public std::list<DependPair> {};
78 
79 
82  std::string remote_recv;
83  bool only_change;
84  std::string name_mask;
85  std::string fullname_mask;
86  int queue;
87 
89  recv(),
90  remote_recv(),
91  only_change(false),
92  name_mask(),
93  fullname_mask(),
94  queue(0)
95  {}
96 
97  bool match(const std::string &parname, int event, const std::string &fullname)
98  {
99  if (only_change && (event!=parModified)) return false;
100 
101  if (name_mask.length()>0)
102  return Object::NameMatch(parname, name_mask);
103  if (fullname_mask.length() > 0)
104  return Object::NameMatch(fullname, fullname_mask);
105  return true;
106  }
107 
108  };
109 
// Named std::list of ParamEventReceiverRec records; adds no members of its own.
110  class ParamEventReceiverList : public std::list<ParamEventReceiverRec> {};
111 
112  class BlockingOutput : public DataOutput {
113  protected:
114  double fBlockTm;
115  bool fSleep;
117 
118  public:
119  BlockingOutput(const dabc::Url& url) :
120  DataOutput(url),
121  fBlockTm(1.),
122  fSleep(true),
123  fErrCounter(0)
124  {
125  fBlockTm = url.GetOptionDouble("time", 1.);
126  fSleep = url.GetOptionBool("sleep", true);
127  fErrCounter = url.GetOptionInt("err", 0);
128  }
129 
130  unsigned Write_Buffer(Buffer& buf)
131  {
132  buf.Release();
133  if (fErrCounter > 0)
134  if (--fErrCounter == 0) return do_Error;
135 
136  if (fSleep) {
138  } else {
139  TimeStamp tm;
140  tm.GetNow();
141  int cnt(0);
142  while (!tm.Expired(fBlockTm)) cnt++;
143  }
144  return do_Ok;
145  }
146  };
147 }
148 
// Creates one of the module classes known to this factory.
// classname  - fully qualified class name; recognised values are
//              "dabc::CpuInfoModule", "dabc::ConnectionManager",
//              "dabc::MultiplexerModule" and "dabc::RepeaterModule".
// modulename - name passed to the constructor of the new module.
// cmd        - command passed to the constructor of the new module.
// Returns a newly allocated module, or 0 when classname is not recognised.
149 dabc::Module* dabc::StdManagerFactory::CreateModule(const std::string &classname, const std::string &modulename, Command cmd)
150 {
151  if (classname == "dabc::CpuInfoModule")
152  return new CpuInfoModule(modulename, cmd);
153 
154  if (classname == "dabc::ConnectionManager")
155  return new ConnectionManager(modulename, cmd);
156 
157  if (classname == "dabc::MultiplexerModule")
158  return new MultiplexerModule(modulename, cmd);
159 
160  if (classname == "dabc::RepeaterModule")
161  return new dabc::RepeaterModule(modulename, cmd);
162 
// unknown class name - this factory cannot create it
163  return 0;
164 }
165 
// Creates an object by class name.
// For "dabc::Publisher" a new dabc::Publisher(objname, cmd) is returned;
// every other class name is delegated to dabc::Factory::CreateObject.
166 dabc::Reference dabc::StdManagerFactory::CreateObject(const std::string &classname, const std::string &objname, dabc::Command cmd)
167 {
168  if (classname == "dabc::Publisher")
169  return new dabc::Publisher(objname, cmd);
170 
// fall back to the base class implementation
171  return dabc::Factory::CreateObject(classname, objname, cmd);
172 }
173 
174 
// Creates a Thread object when classname is empty or equals typeThread.
// parent, thrdname and cmd are passed to the Thread constructor;
// thrddev is not used by this implementation.
// Returns a Reference built from the new thread pointer; for any other
// classname the Reference is built from a null pointer.
175 dabc::Reference dabc::StdManagerFactory::CreateThread(Reference parent, const std::string &classname, const std::string &thrdname, const std::string &thrddev, Command cmd)
176 {
177  dabc::Thread* thrd = 0;
178 
179  if (classname.empty() || (classname == typeThread))
180  thrd = new Thread(parent, thrdname, cmd);
181 
182  return Reference(thrd);
183 }
184 
185 
187 {
188  dabc::Url url(typ);
189  if (url.GetProtocol()=="bin") {
190  return new dabc::BinaryFileOutput(url);
191  } else
192  if (url.GetProtocol() == "block") {
193  return new dabc::BlockingOutput(url);
194  }
195 
196  return 0;
197 }
198 
200 {
201  dabc::Url url(typ);
202  if (url.GetProtocol()=="bin") {
203  return new dabc::BinaryFileInput(url);
204  }
205 
206  return 0;
207 }
208 
209 
212 namespace dabc {
214  friend class dabc::Manager;
215 
216  static bool fAutoDestroy;
217 
218  public:
221  {
222  //printf("Vary last action mgr = %p\n", dabc::mgr());
223  if (dabc::mgr() && fAutoDestroy) {
224  printf("Destroy manager\n");
225  dabc::mgr()->HaltManager();
226  dabc::mgr.Destroy();
227  printf("Destroy manager done\n");
228  }
229  }
230  };
231 }
232 
234 
236 
237 // ******************************************************************
238 
240 {
242 }
243 
244 
245 dabc::Manager::Manager(const std::string &managername, Configuration* cfg) :
246  Worker(0, managername),
247  fMgrStoppedTime(),
248  fAppFinished(false),
249  fMgrMutex(0),
250  fDestroyQueue(0),
251  fParsQueue(1024),
252  fTimedPars(0),
254  fDepend(0),
255  fCfg(cfg),
256  fCfgHost(),
257  fNodeId(0),
258  fNumNodes(1),
259  fLocalAddress(),
262 {
263  fInstance = this;
265 
266  if (dabc::mgr.null()) {
267  dabc::mgr = dabc::ManagerRef(this);
269  }
270 
271  // set unique ID for application, which can be used to identify instance
272  fLocalAddress = dabc::format("local_pid%d", (int) getpid());
273 
274  if (cfg) {
275  fCfgHost = cfg->MgrHost();
276  fNodeId = cfg->MgrNodeId();
278 
279  std::string layout = cfg->ThreadsLayout();
280 
281  if (layout=="minimal") fThrLayout = layoutMinimalistic; else
282  if (layout=="permodule") fThrLayout = layoutPerModule; else
283  if (layout=="balanced") fThrLayout = layoutBalanced; else
284  if (layout=="maximal") fThrLayout = layoutMaximal; else layout.clear();
285 
286  if (!layout.empty()) DOUT0("Set threads layout to %s", layout.c_str());
287  }
288 
289  // we create a recursive mutex to avoid problems in the Folder::GetFolder method,
290  // where a constructor is called under a locked mutex;
291  // it is not a big problem when the mutex is locked several times from one thread
292 
293  fMgrMutex = new Mutex(true);
294 
295  fDepend = new DependPairList;
296 
297  fLastCreatedDevName.clear();
298 
300 
301  // this should automatically add all factories to the manager
303 
304  ProcessFactory(new dabc::SocketFactory("sockets"));
305 
306  // append factories, which are created too fast
308  for (unsigned n=0;n<sizeof(fFirstFactories)/sizeof(void*); n++)
309  if (fFirstFactories[n]) {
311  fFirstFactories[n] = 0;
312  }
313  }
314 
316 
317  ActivateTimeout(1.);
318 }
319 
321 {
322  // stop the thread so that it does not try to access objects which we are deleting directly;
323  // normally, the last operation in the main() program must be a HaltManager(true)
324  // call, which suspends and erases all items in the manager
325 
326 // dabc::SetDebugLevel(3);
327 // dabc::SetDebugLevel(3);
328 
329  DOUT3("~Manager -> DestroyQueue");
330 
331  if (fDestroyQueue && (fDestroyQueue->GetSize()>0))
332  EOUT("Manager destroy queue is not empty");
333 
335 
336  delete fDestroyQueue; fDestroyQueue = 0;
337 
338  if (fTimedPars!=0) {
339  if (fTimedPars->GetSize() > 0) {
340  EOUT("Manager timed parameters list not empty");
341  }
342 
343  delete fTimedPars;
344  fTimedPars = 0;
345  }
346 
347  DOUT3("~Manager -> ~fDepend");
348 
349  if (fDepend && (fDepend->size()>0))
350  EOUT("Dependencies parameters list not empty");
351 
352  delete fDepend; fDepend = 0;
353 
354  DOUT3("~Manager -> ~fMgrMutex");
355 
356  delete fMgrMutex; fMgrMutex = 0;
357 
358  if (dabc::mgr()==this) {
359  DOUT1("Normal EXIT");
362  } else {
363  EOUT("What ??? !!!");
364  }
365 
366  fInstance = 0;
367  fInstanceId = 0;
368 }
369 
370 
372 {
373  ThreadRef thrd = thread();
374  // only for case, when manager thread does not run its own main loop,
375  // we could help it to finish processing
376  if (thrd.IsRealThrd()) thrd.Release();
377 
379 
381 
382  RemoveChilds();
383 
384  // run dummy event loop several seconds to complete events which may be submitted there
385 
386  int cnt = 0;
387  TimeStamp tm1 = dabc::Now();
388  double halttime = fCfg ? fCfg->GetHaltTime() : 0.;
389  if (halttime<=0.) halttime = 0.7;
390 
391  do {
392 
393  cnt++;
394 
396 
398 
399  if (!thrd.null()) {
400  thrd()->SingleLoop(0, 0.001);
401  if ((thrd()->TotalNumberOfEvents()==0) || tm1.Expired(halttime*0.7)) thrd.Release();
402  } else {
403  dabc::Sleep(0.001);
404  }
405 
406  } while ((dabc::Thread::NumThreadInstances() > 0) && !tm1.Expired(halttime));
407 
408  TimeStamp tm2 = dabc::Now();
409 
411  EOUT("!!!!!!!!! There are still %u threads - anyway declare manager halted spent: %5.3f s!!!!!!", dabc::Thread::NumThreadInstances(), tm2 - tm1);
412  } else {
413  DOUT1("All threads stopped after %5.3f s (loop count = %d)", tm2-tm1, cnt);
414  }
415 
416 #ifdef DABC_EXTRA_CHECKS
417  dabc::Object::DebugObject();
418 #endif
419 
421 
422  DOUT3("dabc::Manager::HaltManager done refcnt = %u", fObjectRefCnt);
423 }
424 
426 {
427  ReferencesVector* vect = 0;
428 
429  // this is references, which want to be informed that object destroyed
430  ReferencesVector clr_vect;
431  PointersVector ptr_vect;
432 
433  // this is references which need to be released
434  ReferencesVector rel_vect;
435 
436  // stole destroy queue - to avoid any kind of copy
437  {
438  LockGuard lock(fMgrMutex);
439 
440  vect = fDestroyQueue;
441  fDestroyQueue = 0;
442 
443  DependPairList::iterator iter = fDepend->begin();
444  while (iter != fDepend->end()) {
445  if (iter->fire != 0) {
446  if (iter->fire & 2) {
447  clr_vect.Add(iter->src);
448  ptr_vect.push_back(iter->tgt);
449  } else
450  if (iter->fire & 1) {
451  rel_vect.Add(iter->src);
452  }
453 
454  fDepend->erase(iter++);
455  } else
456  iter++;
457  }
458  }
459 
460  // first inform about dependencies
461  for (unsigned n=0;n<clr_vect.GetSize();n++) {
462  Object* obj = clr_vect.GetObject(n);
463  Worker* w = dynamic_cast<Worker*> (obj);
464  if (w!=0) {
465  if (obj->IsLogging())
466  DOUT0("Submit worker %p to thread", obj);
467  dabc::Command cmd("ObjectDestroyed");
468  cmd.SetPtr("#Object", ptr_vect[n]);
470 
471  if (w->Submit(cmd)) continue;
472  }
473 
474  obj->ObjectDestroyed((Object*)ptr_vect[n]);
475  }
476 
477  // clear all references
478  clr_vect.Clear();
479  ptr_vect.clear();
480  rel_vect.Clear();
481 
482  if (vect==0) return false;
483 
484  ParamEventReceiverList::iterator iter = fParEventsReceivers->begin();
485 
486  while (iter!=fParEventsReceivers->end()) {
487  if (vect->HasObject(iter->recv()))
488  fParEventsReceivers->erase(iter++);
489  else
490  iter++;
491  }
492 
493  DOUT3("MGR: Process destroy QUEUE vect = %u", (vect ? vect->GetSize() : 0));
494 
495  // FIXME: remove timed parameters, otherwise it is not possible to delete it
496  // if (fTimedPars!=0)
497  // for(unsigned n=0;n<vect->GetSize();n++)
498  // fTimedPars->Remove(vect->GetObject(n));
499 
500  vect->Clear(true);
501  delete vect;
502 
503  return true;
504 }
505 
507 {
508  // method called from parameter itself therefore we do not need mutex to access parameter field
509 
510  if (par==0) return;
511 
512  // first analyze parameters without manager mutex - we do not need it now
513 
514  if ((evid==parModified) && !par->IsDeliverAllEvents()) {
515  // check if event of that parameter in the queue
516  LockGuard lock(ObjectMutex());
517  for (unsigned n=0;n<fParsQueue.Size();n++)
518  if ((fParsQueue.Item(n).par == par) && (fParsQueue.Item(n).event==evid)) return;
519  }
520 
521  bool fire = false;
522 
523  Parameter parref(par);
524 
525  {
526  // now add record to the queue
527 
528  //LockGuard lock(ObjectMutex());
529  DABC_LOCKGUARD(ObjectMutex(), "Inserting new event into fParsQueue");
530 
531  fire = fParsQueue.Size() == 0;
532 
533  // add parameter event to the queue
534  ParamRec* rec = fParsQueue.PushEmpty();
535 
536  if (rec) {
537  // memset(rec, 0, sizeof(ParamRec));
538  rec->par << parref; // we are trying to avoid parameter locking under locked queue mutex
539  rec->event = evid;
540  }
541 
542  }
543 
544 // DOUT0("FireParamEvent id %d par %s", evid, par->GetName());
545 
546  if (fire) FireEvent(evntManagerParam);
547 }
548 
550 {
551  // do not process more than 100 events a time
552  int maxcnt = 100;
553 
554  while (maxcnt-->0) {
555 
556  ParamRec rec;
557 
558  {
559  DABC_LOCKGUARD(ObjectMutex(), "Extracting event from fParsQueue");
560  // LockGuard lock(ObjectMutex());
561  if (fParsQueue.Size()==0) return false;
562  rec.par << fParsQueue.Front().par;
563  rec.event = fParsQueue.Front().event;
564  fParsQueue.PopOnly();
565  }
566 
567  if (rec.par.null()) continue;
568 
569  bool checkadd(false), checkremove(false);
570 
571  switch (rec.event) {
572  case parCreated:
573  checkadd = rec.par.NeedTimeout();
574  break;
575 
576  case parConfigured:
577  checkadd = rec.par.NeedTimeout();
578  checkremove = !checkadd;
579  DOUT2("Parameter %s configured checkadd %s", rec.par.GetName(), DBOOL(checkadd));
580  break;
581 
582  case parModified:
583  break;
584 
585  case parDestroy:
586  checkremove = rec.par.NeedTimeout();
587  break;
588  }
589 
590  if (checkadd) {
591  rec.par()->SetCleanupBit();
592  if (fTimedPars==0)
594  if (!fTimedPars->HasObject(rec.par())) {
595  Reference ref = rec.par;
596  fTimedPars->Add(ref);
597  }
598  }
599 
600  if (checkremove && fTimedPars) {
601  fTimedPars->Remove(rec.par());
602  if (fTimedPars->GetSize()==0) {
603  delete fTimedPars;
604  fTimedPars = 0;
605  }
606  }
607 
608  // from here we provide parameter to the external world
609 
610  bool attrmodified = rec.par.TakeAttrModified();
611 
612  if (rec.event == parConfigured) {
613  attrmodified = true;
614  rec.event = parModified;
615  }
616 
617  std::string fullname, value;
618 
619  FillItemName(rec.par(), fullname);
620 
621  for (ParamEventReceiverList::iterator iter = fParEventsReceivers->begin();
622  iter != fParEventsReceivers->end(); iter++) {
623 
624  if (!iter->match(rec.par.GetName(), rec.event, fullname)) continue;
625 
626  // TODO: provide complete list of fields - mean complete xml record ?????
627  if (value.length()==0)
628  value = rec.par.Value().AsStr();
629 
630  if (iter->queue > 1000) {
631  EOUT("Too many events for receiver %s - block any following", iter->recv.GetName());
632  continue;
633  }
634 
635  if (!iter->recv.null() && !iter->recv.CanSubmitCommand()) {
636  DOUT4("receiver %s cannot be used to submit command - ignore", iter->recv.GetName());
637  continue;
638  }
639 
640  // TODO: probably one could use special objects and not command to deliver parameter events to receivers
641  CmdParameterEvent evnt(fullname, value, rec.event, attrmodified);
642 
643  evnt.SetPtr("#Iterator", &(*iter));
644  evnt.SetBool("#no_warnings",true);
645 
646  iter->queue++;
647 
648  Assign(evnt);
649 
650  if (iter->remote_recv.length() > 0) {
651  evnt.SetReceiver(iter->remote_recv);
652  GetCommandChannel().Submit(evnt);
653  } else
654  iter->recv.Submit(evnt);
655  }
656 
657  }
658 
659  // generate one more event - we do not process all of records
661 
662  // generate parameter event from the manager thread
663  return true;
664 }
665 
667 {
668  DOUT5("MGR::ProcessEvent %s", evnt.asstring().c_str());
669 
670  switch (evnt.GetCode()) {
671  case evntDestroyObj:
673  break;
674  case evntManagerParam:
676  break;
677  default:
678  Worker::ProcessEvent(evnt);
679  break;
680  }
681  DOUT5("MGR::ProcessEvent %s done", evnt.asstring().c_str());
682 }
683 
685 {
686  Module* m = 0;
687  Iterator iter(this);
688 
689  while (iter.next_cast(m, m==0)) {
690  if (m->IsRunning()) return true;
691  }
692 
693  return false;
694 }
695 
696 
// Looks up a child item of the manager by name.
// name    - item name; when islocal is false it is first passed through
//           DecomposeAddress to split it into server and item parts.
// islocal - when true, name is used directly as the item name.
// Returns the result of FindChildRef for the item name, or 0 when the address
// cannot be decomposed, is not local after decomposition, or the item name is
// empty.
697 dabc::Reference dabc::Manager::FindItem(const std::string &name, bool islocal)
698 {
699  std::string server, itemname;
700 
701  if (islocal) {
702  itemname = name;
703  } else {
// NOTE(review): islocal is presumably updated by DecomposeAddress (passed by reference) - confirm
704  if (!DecomposeAddress(name, islocal, server, itemname)) return 0;
// items addressed on another server are not searched here
705  if (!islocal) return 0;
706  }
707 
708  if (itemname.empty()) return 0;
709 
710  return FindChildRef(itemname.c_str());
711 }
712 
714 {
715  return FindItem(name);
716 }
717 
718 
// Finds an item by name via FindItem and passes it through a PortRef before
// returning it as a generic Reference.
// NOTE(review): presumably the PortRef conversion yields a null reference when
// the item is not a port - confirm against the PortRef implementation.
719 dabc::Reference dabc::Manager::FindPort(const std::string &name)
720 {
721  PortRef ref = FindItem(name);
722 
723  return ref;
724 }
725 
726 
// Finds an item by name via FindItem and passes it through a MemoryPoolRef
// before returning it as a generic Reference.
// NOTE(review): presumably the MemoryPoolRef conversion yields a null reference
// when the item is not a memory pool - confirm against MemoryPoolRef.
727 dabc::Reference dabc::Manager::FindPool(const std::string &name)
728 {
729  MemoryPoolRef ref = FindItem(name);
730 
731  return ref;
732 }
733 
735 {
736  DeviceRef ref = FindItem(name);
737 
738  return ref;
739 }
740 
742 {
744 }
745 
// Fills itemname with the name of the object ptr, built by
// ptr->FillFullName(itemname, this).
// ptr      - object to name; when 0, itemname is left empty.
// itemname - output string, always cleared first.
// compact  - when false, itemname starts with "/" before the name is filled in.
746 void dabc::Manager::FillItemName(const Object* ptr, std::string& itemname, bool compact)
747 {
748  itemname.clear();
749 
750  if (ptr==0) return;
751 
752  if (!compact) itemname = "/";
753  ptr->FillFullName(itemname, this);
754 }
755 
757 {
758  // check if command dedicated for other node, module and so on
759  // returns: cmd_ignore - command will be processed in manager itself
760  // cmd_postponed - command redirected to other instance
761  // .. - command is executed
// NOTE(review): the fall-through case returns whatever Worker::PreviewCommand
// returns - presumably cmd_ignore, confirm against Worker.
762 
763  std::string url = cmd.GetReceiver();
764 
765  bool islocal(true);
766  std::string server,itemname;
767 
// only commands with a receiver address that can be decomposed are routed here
768  if (!url.empty() && DecomposeAddress(url, islocal, server, itemname)) {
769 
// the "#local_cmd" flag forces local handling regardless of the address
770  if (cmd.GetBool("#local_cmd")) islocal = true;
771 
772  if (!islocal)
773  DOUT3("MGR: Preview command %s item %s tgtnode %s", cmd.GetName(), url.c_str(), server.c_str());
774 
775  if (!islocal) {
776 
// non-local receiver: hand the command over to the command channel
777  if (GetCommandChannel().Submit(cmd)) return cmd_postponed;
778 
779  EOUT("Cannot submit command to remote server %s", server.c_str());
780  return cmd_false;
781 
782  } else
783  if (!itemname.empty()) {
784 
// local receiver: first try to deliver the command to a worker with that name
785  WorkerRef worker = FindItem(itemname, true);
786 
787  if (!worker.null()) {
788  cmd.RemoveReceiver();
789  worker.Submit(cmd);
790  return cmd_postponed;
791  }
792 
// no worker found: a set-parameter command may instead address a parameter
793  if (cmd.IsName(CmdSetParameter::CmdName())) {
794  Parameter par = FindItem(itemname, true);
795 
796  if (!par.null()) {
797  cmd.RemoveReceiver();
798  return par.ExecuteChange(cmd);
799  }
800  }
801 
802  EOUT("Did not found receiver %s for command %s", itemname.c_str(), cmd.GetName());
803 
804  return cmd_false;
805  }
806 
807  }
808 
// no receiver, address not decomposable, or empty local item name
809  return Worker::PreviewCommand(cmd);
810 }
811 
// Helper macro: collects all children of GetFactoriesFolder() into a local
// ReferencesVector, then runs the statement(s) given as arg once for every
// child that dynamic_casts to Factory*. Inside arg the current factory is
// available as the local variable `factory`. Because arg is expanded inside
// the for loop, a `break` in arg stops the iteration. Children that are not
// Factory instances are skipped.
812 #define FOR_EACH_FACTORY(arg) \
813 { \
814  ReferencesVector factories; \
815  GetFactoriesFolder().GetAllChildRef(&factories); \
816  Factory* factory = nullptr; \
817  for (unsigned n=0; n<factories.GetSize(); n++) { \
818  factory = dynamic_cast<Factory*> (factories.GetObject(n)); \
819  if (factory==0) continue; \
820  arg \
821  } \
822 }
823 
824 dabc::WorkerRef dabc::Manager::DoCreateModule(const std::string &classname, const std::string &modulename, Command cmd)
825 {
826  ModuleRef mdl = FindModule(modulename);
827 
828  if (!mdl.null()) {
829  DOUT4("Module name %s already exists", modulename.c_str());
830 
831  } else {
832 
834  mdl = factory->CreateModule(classname, modulename, cmd);
835  if (!mdl.null()) break;
836  )
837 
838  if (mdl.null()) {
839  EOUT("Cannot create module of type %s", classname.c_str());
840  return mdl;
841  }
842 
843  std::string thrdname = mdl.Cfg(xmlThreadAttr, cmd).AsStr();
844 
845  if (thrdname.empty())
846  switch (GetThreadsLayout()) {
847  case layoutMinimalistic: thrdname = ThreadName(); break;
848  case layoutPerModule: thrdname = modulename + "Thrd"; break;
849  case layoutBalanced: thrdname = modulename + "Thrd"; break;
850  case layoutMaximal: thrdname = modulename + "Thrd"; break;
851  default: thrdname = modulename + "Thrd"; break;
852  }
853 
854  mdl.MakeThreadForWorker(thrdname);
855 
856  mdl.ConnectPoolHandles();
857  }
858 
859  return mdl;
860 }
861 
862 
863 dabc::Reference dabc::Manager::DoCreateObject(const std::string &classname, const std::string &objname, Command cmd)
864 {
865 
866  dabc::Reference ref;
867 
869  ref = factory->CreateObject(classname, objname, cmd);
870  if (!ref.null()) break;
871  )
872 
873  return ref;
874 }
875 
876 
878 {
879  DOUT5("MGR: Execute %s\n%s", cmd.GetName(), cmd.SaveToXml(false).c_str());
880 
881  int cmd_res = cmd_true;
882 
883  if (cmd.IsName(CmdCreateModule::CmdName())) {
884  std::string classname = cmd.GetStr(xmlClassAttr);
885  std::string modulename = cmd.GetStr(CmdCreateModule::ModuleArg());
886 
887  ModuleRef ref = DoCreateModule(classname, modulename, cmd);
888  cmd_res = cmd_bool(!ref.null());
889 
890  } else
891  if (cmd.IsName("InitFactories")) {
893  factory->Initialize();
894  )
895  cmd_res = cmd_true;
896  } else
897  if (cmd.IsName(CmdCreateApplication::CmdName())) {
898  std::string classname = cmd.GetStr("AppClass");
899 
900  if (classname.empty()) classname = typeApplication;
901 
902  ApplicationRef appref = app();
903 
904  if (!appref.null() && (classname == appref.ClassName())) {
905  DOUT2("Application of class %s already exists", classname.c_str());
906  } else
907  if (classname != typeApplication) {
908  appref.Destroy();
909 
911  appref = factory->CreateApplication(classname, cmd);
912  if (!appref.null()) break;
913  )
914  } else {
915  appref.Destroy();
916 
918 
919  appref = new Application();
920 
921  if (func) appref()->SetInitFunc((Application::ExternalFunction*)func);
922  }
923 
924 
925  std::string appthrd = appref.Cfg(xmlThreadAttr, cmd).AsStr();
926 
927  if (appthrd.empty())
928  switch (GetThreadsLayout()) {
929  case layoutMinimalistic: appthrd = ThreadName(); break;
930  default: appthrd = AppThrdName(); break;
931  }
932 
933 
934  appref.MakeThreadForWorker(appthrd);
935 
936  cmd_res = cmd_bool(!appref.null());
937 
938  if (appref.null()) EOUT("Cannot create application of class %s", classname.c_str());
939  else DOUT2("Application of class %s thrd %s created", classname.c_str(), appthrd.c_str());
940 
941  } else
942 
943  if (cmd.IsName(CmdCreateDevice::CmdName())) {
944  std::string classname = cmd.GetStr("DevClass");
945  std::string devname = cmd.GetStr("DevName");
946  if (devname.empty()) devname = classname;
947 
948  WorkerRef dev = FindDevice(devname);
949 
950  cmd_res = cmd_false;
951 
952  if (!dev.null()) {
953  if (classname == dev.ClassName()) {
954  DOUT4("Device %s of class %s already exists", devname.c_str(), classname.c_str());
955  cmd_res = cmd_true;
956  } else {
957  EOUT("Device %s has other class name %s than requested", devname.c_str(), dev.ClassName(), classname.c_str());
958  }
959  } else {
961  dev = factory->CreateDevice(classname, devname, cmd);
962  if (!dev.null()) break;
963  )
964 
965  if (dev.null()) {
966  EOUT("Cannot create device %s of class %s", devname.c_str(), classname.c_str());
967  } else {
968 
969  std::string thrdname = dev.Cfg(xmlThreadAttr, cmd).AsStr();
970 
971  if (thrdname.empty())
972  switch (GetThreadsLayout()) {
973  case layoutMinimalistic: thrdname = ThreadName(); break;
974  case layoutPerModule:
975  case layoutBalanced:
976  case layoutMaximal: thrdname = devname + "Thrd"; break;
977  default: thrdname = devname + "Thrd"; break;
978  }
979 
980  dev.MakeThreadForWorker(thrdname);
981 
982  cmd_res = cmd_true;
983  }
984  }
985 
986  if ((cmd_res==cmd_true) && !devname.empty()) {
987  LockGuard guard(fMgrMutex);
988  fLastCreatedDevName = devname;
989  }
990  } else
991  if (cmd.IsName(CmdDestroyDevice::CmdName())) {
992  std::string devname = cmd.GetStr("DevName");
993 
994  Reference dev = FindDevice(devname);
995 
996  cmd_res = cmd_bool(!dev.null());
997 
998  dev.Destroy();
999  } else
1000  if (cmd.IsName(CmdCreateTransport::CmdName())) {
1001 
1002  CmdCreateTransport crcmd = cmd;
1003  // TODO: make url xml node attribute as for parameter
1004 
1005  std::string trkind = crcmd.TransportKind();
1006  std::string portname = crcmd.PortName();
1007 
1008  PortRef port = FindPort(portname);
1009  if (trkind.empty()) {
1010  trkind = port.Cfg("url", cmd).AsStr();
1011 
1012  Url url(trkind);
1013  if (url.IsValid()) {
1014 
1015  bool hasoptions = url.GetOptions().length() > 0;
1016 
1017  for (int cnt = 0; cnt < 3; cnt++) {
1018  std::string optname = "urlopt";
1019  if (cnt>0) dabc::formats(optname,"urlopt%d",cnt);
1020  std::string tropt = port.Cfg(optname, cmd).AsStr();
1021  if (tropt.length() > 0) {
1022  trkind.append(hasoptions ? "&" : "?");
1023  trkind.append(tropt);
1024  hasoptions = true;
1025  }
1026  }
1027  }
1028  }
1029 
1030  if (port.null()) {
1031  EOUT("Ports %s not found - cannot create transport", crcmd.PortName().c_str());
1032  return cmd_false;
1033  }
1034 
1035  if (trkind.empty()) {
1036  // disconnect will be done via special command
1037  port.Disconnect();
1038  return cmd_true;
1039  }
1040 
1041  WorkerRef dev = FindDevice(trkind);
1042 
1043  Url url(trkind);
1044  if (dev.null() && url.IsValid() && ((url.GetProtocol()=="dev") || (url.GetProtocol()=="device")))
1045  dev = FindDevice(url.GetFullName());
1046 
1047  if (!dev.null()) {
1048  cmd.SetStr("url", trkind); // provide complete url to the device
1049  dev.Submit(cmd);
1050  return cmd_postponed;
1051  }
1052 
1053  PortRef port2 = FindPort(trkind);
1054  if (!port2.null()) {
1055  // this is local connection between two ports
1056  cmd_res = dabc::LocalTransport::ConnectPorts(port2, port, cmd);
1057  // connect also bind ports (if exists)
1058  if (cmd_res == cmd_true)
1059  cmd_res = dabc::LocalTransport::ConnectPorts(port.GetBindPort(), port2.GetBindPort(), cmd);
1060 
1061  return cmd_res;
1062  }
1063 
1064 
1065  cmd_res = cmd_false;
1066 
1067  DOUT1("%s create %s", port.ItemName().c_str(), trkind.c_str());
1068 
1069  ModuleRef tr;
1071  tr = factory->CreateTransport(port, trkind, cmd);
1072  if (!tr.null()) break;
1073  )
1074 
1075  if (!tr.null()) {
1076  if (portname != crcmd.PortName()) {
1077  DOUT0("Original port name %s for created transport was changed on the fly by %s", portname.c_str(), crcmd.PortName().c_str());
1078  portname = crcmd.PortName();
1079  port = FindPort(portname);
1080  if (port.null()) { tr.Destroy(); return cmd_false; }
1081  }
1082 
1083  std::string thrdname = port.Cfg(xmlThreadAttr, cmd).AsStr();
1084  if (thrdname.empty())
1085  switch (GetThreadsLayout()) {
1086  case layoutMinimalistic: thrdname = ThreadName(); break;
1087  case layoutPerModule: thrdname = port.GetModule().ThreadName(); break;
1088  case layoutBalanced: thrdname = port.GetModule().ThreadName() + (port.IsInput() ? "Inp" : "Out"); break;
1089  case layoutMaximal: thrdname = port.GetModule().ThreadName() + port.GetName(); break;
1090  default: thrdname = port.GetModule().ThreadName(); break;
1091  }
1092 
1093  DOUT3("Creating thread %s for transport", thrdname.c_str());
1094 
1095  if (!tr.MakeThreadForWorker(thrdname)) {
1096  EOUT("Fail to create thread for transport");
1097  tr.Destroy();
1098  } else {
1099  tr.ConnectPoolHandles();
1100  cmd_res = cmd_true;
1101  if (port.IsInput())
1102  dabc::LocalTransport::ConnectPorts(tr.FindPort(tr.OutputName(0, false)), port, cmd);
1103  if (port.IsOutput())
1104  dabc::LocalTransport::ConnectPorts(port, tr.FindPort(tr.InputName(0, false)), cmd);
1105  }
1106 
1107  DOUT3("Created transport for port %s is port connected %s", port.ItemName().c_str(), DBOOL(port.IsConnected()));
1108  }
1109  } else
1110 
1111  if (cmd.IsName(CmdDestroyTransport::CmdName())) {
1112  PortRef portref = FindPort(cmd.GetStr("PortName"));
1113  if (!portref.Disconnect())
1114  cmd_res = cmd_false;
1115  } else
1116 
1117  if (cmd.IsName(CmdCreateAny::CmdName())) {
1118  void* res = 0;
1120  res = factory->CreateAny(cmd.GetStr("ClassName"), cmd.GetStr("ObjectName"), cmd);
1121  if (res != 0) break;
1122  )
1123  cmd.SetPtr("ObjectPtr", res);
1124  cmd_res = cmd_true;
1125  } else
1126 
1127  if (cmd.IsName(CmdCreateThread::CmdName())) {
1128  const std::string &thrdname = cmd.GetStr(CmdCreateThread::ThrdNameArg());
1129  const std::string &thrdclass = cmd.GetStr("ThrdClass");
1130  const std::string &thrddev = cmd.GetStr("ThrdDev");
1131 
1132  ThreadRef thrd = DoCreateThread(thrdname, thrdclass, thrddev, cmd);
1133 
1134  if (thrd.null()) {
1135  cmd_res = cmd_false;
1136  } else {
1138  cmd_res = cmd_true;
1139  }
1140 
1141  } else
1142  if (cmd.IsName(CmdCreateMemoryPool::CmdName())) {
1143  cmd_res = cmd_bool(DoCreateMemoryPool(cmd));
1144  } else
1145  if (cmd.IsName(CmdCreateObject::CmdName())) {
1146  cmd.SetRef("Object", DoCreateObject(cmd.GetStr("ClassName"), cmd.GetStr("ObjName"), cmd));
1147  cmd_res = cmd_true;
1148  } else
1149  if (cmd.IsName(CmdCreateDataInput::CmdName())) {
1150  std::string kind = cmd.GetStr("Kind");
1151  DataInput* res = 0;
1153  res = factory->CreateDataInput(kind);
1154  if (res != 0) break;
1155  )
1156  cmd.SetPtr("DataInput", res);
1157  cmd_res = cmd_true;
1158  } else
1159  if (cmd.IsName(CmdCleanupApplication::CmdName())) {
1160  cmd_res = DoCleanupApplication();
1161  } else
1163 
1164  std::string name = cmd.GetStr(CmdModule::ModuleArg());
1165  dabc::WorkerRef ref;
1166 
1167  if (name.compare("*")==0)
1168  ref = app();
1169  else
1170  ref = FindModule(name);
1171 
1172  if (ref.Submit(cmd))
1173  cmd_res = cmd_postponed;
1174  else
1175  cmd_res = cmd_false;
1176  } else
1177  if (cmd.IsName(CmdDeleteModule::CmdName())) {
1179 
1180  cmd_res = cmd_bool(!ref.null());
1181 
1182  DOUT2("Stop and delete module %s", ref.GetName());
1183 
1184  ref.Destroy();
1185 
1186  DOUT2("Stop and delete module done");
1187 
1188  } else
1189  if (cmd.IsName(CmdDeletePool::CmdName())) {
1190  FindPool(cmd.GetStr("PoolName")).Destroy();
1191  } else
1192  if (cmd.IsName("Print")) {
1194  Thread* thrd = 0;
1195  while (iter1.next_cast(thrd, false))
1196  DOUT1("Thrd: %s", thrd->GetName());
1197 
1198  dabc::Iterator iter2(this);
1199  Module* m = 0;
1200  while (iter2.next_cast(m, false))
1201  DOUT1("Module: %s", m->GetName());
1202  } else
1203 
1204  // these are two special commands with postponed execution
1205  if (cmd.IsName(CmdGetNodesState::CmdName())) {
1206  GetCommandChannel().Submit(cmd);
1207 
1208  cmd_res = cmd_postponed;
1209  } else
1210  if (cmd.IsName("Ping")) {
1211  DOUT2("!!! PING !!!");
1212  cmd_res = cmd_true;
1213  } else
1214  if (cmd.IsName("ParameterEventSubscription")) {
1215  Worker* worker = (Worker*) cmd.GetPtr("Worker");
1216  std::string mask = cmd.GetStr("Mask");
1217  std::string remote = cmd.GetStr("RemoteWorker");
1218 
1219  if (cmd.GetBool("IsSubscribe")) {
1220 
1221  // first add empty record to avoid usage of copy constructor
1224 
1225  if (worker) worker->SetCleanupBit();
1226  rec.recv = worker;
1227  rec.remote_recv = remote;
1228  rec.name_mask = mask;
1229  rec.only_change = cmd.GetBool("OnlyChange");
1230  } else {
1231  ParamEventReceiverList::iterator iter = fParEventsReceivers->begin();
1232  while (iter!=fParEventsReceivers->end()) {
1233  if ((iter->name_mask==mask) && (iter->recv == worker) && (iter->remote_recv == remote))
1234  fParEventsReceivers->erase(iter++);
1235  else
1236  iter++;
1237  }
1238  }
1239  cmd_res = cmd_true;
1240  } else
1241  if (cmd.IsName("StopManagerMainLoop")) {
1243  } else
1244  cmd_res = cmd_false;
1245 
1246  return cmd_res;
1247 }
1248 
1250 {
1251  LockGuard guard(fMgrMutex);
1252  return fLastCreatedDevName;
1253 }
1254 
1256 {
1257  if (cmd.null()) return false;
1258 
1259  std::string poolname = cmd.GetStr(xmlPoolName);
1260  if (poolname.empty()) {
1261  EOUT("Pool name is not specified");
1262  return false;
1263  }
1264 
1265  MemoryPoolRef ref = FindPool(poolname);
1266  if (ref.null()) {
1267  ref = new dabc::MemoryPool(poolname, true);
1268 
1269  ref()->Reconstruct(cmd);
1270 
1271  // TODO: make thread name for pool configurable
1272  ref()->AssignToThread(thread());
1273  ref.Start();
1274  }
1275 
1276  return !ref.null();
1277 }
1278 
1280 {
1281  // ReplyCommand return true, when command can be safely deleted
1282 
1283  if (cmd.IsName(CmdParameterEvent::CmdName())) {
1284 
1285  void* origin = cmd.GetPtr("#Iterator");
1286 
1287  for (ParamEventReceiverList::iterator iter = fParEventsReceivers->begin();
1288  iter != fParEventsReceivers->end(); iter++) {
1289 
1290  if (&(*iter) == origin) {
1291  iter->queue--;
1292  if (iter->queue<0)
1293  DOUT2("Internal error - parameters event queue negative");
1294  return true;
1295  }
1296  }
1297 
1298  DOUT2("Did not find original record with receiver for parameter events");
1299 
1300  return true;
1301  }
1302 
1303  if (cmd.IsName(CmdStateTransition::CmdName())) {
1304  // manager receive reply on this command only during normal shutdown
1305  fAppFinished = true;
1306  return true;
1307  }
1308 
1309  return dabc::Worker::ReplyCommand(cmd);
1310 }
1311 
1313 {
1314  Execute("Print");
1315 }
1316 
1318 {
1319  Object *obj = ref();
1320 
1321  if (!obj) return true;
1322 
1323  if (obj->IsLogging())
1324  DOUT0("dabc::Manager::DestroyObject %p %s", obj, obj->GetName());
1325 
1326  bool fire = false;
1327 
1328  {
1329  LockGuard lock(fMgrMutex);
1330 
1331  // analyze that object presented in some dependency lists and mark record to process it
1332  DependPairList::iterator iter = fDepend->begin();
1333  while (iter != fDepend->end()) {
1334  if (iter->src() == obj) {
1335  iter->fire = iter->fire | 1;
1336  if (obj->IsLogging())
1337  DOUT0("Find object %p as dependency source", obj);
1338  }
1339 
1340  if (iter->tgt == obj) {
1341  iter->fire = iter->fire | 2;
1342  if (obj->IsLogging())
1343  DOUT0("Find object %p as dependency target", obj);
1344  }
1345 
1346  iter++;
1347  }
1348 
1349  if (fDestroyQueue==0) {
1351  fire = true;
1352  }
1353 
1354  fDestroyQueue->Add(ref);
1355  }
1356 
1357  // FIXME: check that thread is working - probably we can destroy ourself if manager does not active
1358  if (fire) FireEvent(evntDestroyObj);
1359 
1360  return true;
1361 }
1362 
1364 {
1365  // destroy application if exist
1366  app().Destroy();
1367 
1369 
1371 
1372  return true;
1373 }
1374 
/**
 * Waits for roughly tmout (a value <= 0 returns immediately), optionally
 * showing a countdown on stdout.
 *
 * Two strategies are used, depending on what CurrentThread() returns:
 *  - null reference: the caller is simply put to sleep with dabc::Sleep();
 *  - valid reference: the event loop of that thread is run in slices, so the
 *    thread keeps processing its events while "sleeping".
 *
 * \param tmout   time to wait; presumably seconds (matches the 1-unit steps of the countdown) - TODO confirm
 * \param prefix  optional text printed before the countdown; when null nothing is printed
 */
1375 void dabc::Manager::Sleep(double tmout, const char* prefix)
1376 {
1377  if (tmout<=0.) return;
1378 
1379  ThreadRef thrd = CurrentThread();
1380 
1381  if (thrd.null()) {
1382  if (prefix) {
// countdown mode: tmout is rounded to an integer and one dabc::Sleep(1) is done per step,
// the "\b\b\b" sequence overwrites the previously printed 3-character counter
1383  fprintf(stdout, "%s ", prefix);
1384  int sec = lrint(tmout);
1385  while (sec-->0) {
1386  fprintf(stdout, "\b\b\b%3d", sec);
1387  fflush(stdout);
1388  dabc::Sleep(1);
1389  }
1390  fprintf(stdout, "\n");
1391  } else {
1392  dabc::Sleep(tmout);
1393  }
1394  } else {
1395  if (prefix) {
1396  fprintf(stdout, "%s ", prefix);
1397  fflush(stdout);
1398  }
1399 
// absolute deadline, so time spent inside RunEventLoop() does not accumulate as drift
1400  TimeStamp finish = dabc::Now() + tmout;
1401  double remain;
1402 
1403  while ((remain = finish - dabc::Now()) > 0) {
1404 
1405  if (prefix) {
1406  fprintf(stdout, "\b\b\b%3d", (int) lrint(remain));
1407  fflush(stdout);
1408  }
1409 
// never run the event loop for more than 1 unit at once, so the counter is refreshed regularly
1410  thrd.RunEventLoop(remain > 1. ? 1 : remain);
1411  }
1412 
1413  if (prefix) {
1414  fprintf(stdout, "\b\b\b\n");
1415  fflush(stdout);
1416  }
1417  }
1418 }
1419 
1420 
/**
 * Returns the address of the node with the given id, derived from the
 * node name stored in the configuration (fCfg->NodeName()).
 *
 * An empty string is returned when nodeid is outside [0, fNumNodes) or when
 * no configuration object is available. The manager mutex is held for the
 * whole call.
 *
 * NOTE(review): listing line 1430 (the statement that produces the result
 * from `url`) is absent from this listing, so the exact form of the returned
 * address cannot be confirmed from here.
 */
1421 std::string dabc::Manager::GetNodeAddress(int nodeid)
1422 {
1423  LockGuard lock(fMgrMutex);
1424 
1425  if ((nodeid<0) || (nodeid>=fNumNodes)) return std::string();
1426 
1427  if (!fCfg) return std::string();
1428 
1429  Url url(fCfg->NodeName(nodeid));
1431 }
1433 
1435 {
1436  return fLocalAddress;
1437 }
1438 
/**
 * Builds a full "dabc://server/item" address.
 *
 * \param server    server part; when empty GetLocalAddress() is used, and when
 *                  that is empty too, "localhost"
 * \param itemname  optional item path appended after the server; a '/' separator
 *                  is inserted unless itemname already starts with one
 * \return composed address, always starting with "dabc://"
 */
1439 std::string dabc::Manager::ComposeAddress(const std::string &server, const std::string &itemname)
1440 {
1441  std::string res = server;
1442  if (res.empty()) res = GetLocalAddress();
1443  if (res.empty()) res = "localhost";
1444 
// add protocol only if the server string does not already begin with it
1445  if (res.find("dabc://")!=0) res = std::string("dabc://") + res;
1446 
1447  if (!itemname.empty()) {
1448  if (itemname[0]!='/') res += "/";
1449  res += itemname;
1450  }
1451  return res;
1452 }
1453 
/**
 * Splits an address into server and item name and decides whether it refers
 * to the local node. Counterpart of ComposeAddress().
 *
 * \param addr       address to analyze
 * \param islocal    [out] true when the address refers to this node
 * \param server     [out] server part (host with port); cleared when addr has no protocol,
 *                   set to fLocalAddress when the address turns out to be local
 * \param itemtname  [out] item name; the complete addr when no protocol is present
 * \return false when addr cannot be parsed as URL or uses a protocol other than "dabc"
 *         (output arguments are left untouched in that case), true otherwise
 */
1454 bool dabc::Manager::DecomposeAddress(const std::string &addr, bool& islocal, std::string& server, std::string& itemtname)
1455 {
1456 
1457  dabc::Url url;
1458 
1459 // DOUT0("Url %s valid %d protocol %s host %s file %s", name, url.IsValid(), url.GetProtocol().c_str(), url.GetHostName().c_str(), url.GetFileName().c_str());
1460 
1461  if (!url.SetUrl(addr, false)) return false;
1462 
// no protocol at all - plain local item name
1463  if (url.GetProtocol().length()==0) {
1464  islocal = true;
1465  server.clear();
1466  itemtname = addr;
1467  return true;
1468  }
1469 
1470  if (url.GetProtocol().compare("dabc")!=0) return false;
1471 
1472  islocal = false;
1473  server = url.GetHostNameWithPort();
1474  itemtname = url.GetFileName();
1475 
// server of the form "node<N>" is an alias for the node with id N;
// if the number cannot be parsed the server string is used literally
1476  int nodeid = -1;
1477  if (server.compare(0, 4, "node")==0) {
1478  if (!str_to_int(server.c_str() + 4, &nodeid)) nodeid = -1;
1479  }
1480 
1481  if (nodeid>=0) server = GetNodeAddress(nodeid);
1482 
// local when node id matches our own, or server equals our address or "localhost"
1483  if ((nodeid>=0) && (fNodeId==nodeid)) islocal = true; else
1484  if (server == fLocalAddress) islocal = true; else
1485  if (server == "localhost") islocal = true;
1486 
1487  if (islocal) server = fLocalAddress;
1488 
1489  return true;
1490 }
1491 
1492 
/**
 * Records a dependency pair (src, tgt) in the manager's fDepend list.
 *
 * \param src            source object of the dependency
 * \param tgt            target object; its cleanup bit is set
 * \param bidirectional  when true the reverse pair (tgt, src) is registered as well
 * \return false when either pointer is null, otherwise true
 *         (or the result of the reverse registration when bidirectional)
 */
1493 bool dabc::Manager::RegisterDependency(Object* src, Object* tgt, bool bidirectional)
1494 {
1495  if ((src==0) || (tgt==0)) return false;
1496 
1497  // one only need cleanup for tgt, for src Reference will force object call
1498  tgt->SetCleanupBit();
1499 
1500  // we create record outside that mutexes not block each other
1501  DependPair rec(src,tgt);
1502 
1503  {
1504  LockGuard guard(fMgrMutex);
1505 
1506  fDepend->push_back(rec);
1507  }
1508 
1509  if (!bidirectional) return true;
1510 
// reverse direction is registered by a second, non-bidirectional call
1511  return RegisterDependency(tgt, src, false);
1512 }
1513 
/**
 * Removes the first dependency record matching (src, tgt) from fDepend.
 *
 * The matching record is copied out and erased while fMgrMutex is held; the
 * reference kept in the copy is released only after the mutex is unlocked.
 *
 * \param src            source object of the dependency
 * \param tgt            target object of the dependency
 * \param bidirectional  when true the reverse pair (tgt, src) is removed as well
 * \return false when either pointer is null; otherwise true, also when no
 *         matching record was found
 */
1514 bool dabc::Manager::UnregisterDependency(Object* src, Object* tgt, bool bidirectional)
1515 {
1516  if ((src==0) || (tgt==0)) return false;
1517 
1518  DependPair rec;
1519 
1520  {
1521  LockGuard guard(fMgrMutex);
1522  DependPairList::iterator iter = fDepend->begin();
1523  while (iter != fDepend->end()) {
1524  if ((iter->src() == src) && (iter->tgt == tgt)) {
1525  rec = *iter; // we should not release reference inside manager mutex
1526  fDepend->erase(iter++);
1527  break;
1528  } else
1529  iter++;
1530  }
1531  }
1532 
1533  // just do it in clear code and outside manager mutex
1534  rec.src.Release();
1535 
1536  if (!bidirectional) return true;
1537 
1538  return UnregisterDependency(tgt, src, false);
1539 }
1540 
/**
 * Timeout handler of the manager: forwards last_diff to ProcessTimeout() of
 * the entries kept in fTimedPars.
 *
 * \param last_diff  value passed through unchanged to every entry
 * \return always 1.
 *
 * NOTE(review): listing lines 1543 and 1548 are absent from this listing;
 * the latter presumably obtains `par` from fTimedPars for index n - confirm
 * against the original source.
 */
1541 double dabc::Manager::ProcessTimeout(double last_diff)
1542 {
1544 
1545  // we can process timeouts without mutex while vector can be only changed from the thread itself
1546  if (fTimedPars!=0)
1547  for (unsigned n=0; n<fTimedPars->GetSize(); n++) {
1549  if (par) par->ProcessTimeout(last_diff);
1550  }
1551 
1552  return 1.;
1553 }
1554 
1557 
1560 
1562 {
1563  if (factory==0) return;
1564 
1565  DOUT2("Instantiate factory %s", factory->GetName());
1566 
1568  fInstance->GetFactoriesFolder(true).AddChild(factory);
1569  return;
1570  }
1571 
1572  // printf("Manager is not exists when factory %s is created\n", factory->GetName());
1573 
1575  for (unsigned n=0;n<sizeof(fFirstFactories)/sizeof(void*); n++)
1576  if (fFirstFactories[n] == 0) {
1577  fFirstFactories[n] = factory;
1578  break;
1579  }
1580  } else {
1581  // printf("Init first factories arrary %u\n", (unsigned) (sizeof(fFirstFactories)/sizeof(void*)));
1583  fFirstFactories[0] = factory;
1584  for (unsigned n=1;n<sizeof(fFirstFactories)/sizeof(void*); n++) fFirstFactories[n] = 0;
1585  }
1586 
1587 }
1588 
1590 {
1591  // during shutdown do not try to reopen log file
1593 
1594  DOUT0("Process CTRL-C signal");
1595 
1596  if (fMgrStoppedTime.null()) {
1598  return;
1599  }
1600 
1601  double spent = fMgrStoppedTime.SpentTillNow();
1602 
1603  // TODO: make 10 second configurable
1604  if (spent<10.) return;
1605 
1606  DOUT0("Ctrl-C repeated more than after 10 sec out of main loop - force manager halt");
1607 
1608  HaltManager();
1609 
1610  DOUT0("Exit after Ctrl-C");
1611 
1612  exit(0);
1613 }
1614 
1616 {
1617  // during shutdown do not try to reopen log file
1619 
1620  DOUT0("Process signal SIGPIPE - Socket error from plug-in libraries?");
1621 
1622  if (fMgrStoppedTime.null()) {
1624  return;
1625  }
1626 
1627 }
1628 
1629 
1630 
1632 {
1633  DOUT2("Enter dabc::Manager::RunManagerMainLoop");
1634 
1635  ThreadRef thrd = thread();
1636  if (thrd.null()) return;
1637 
1638  if (!fMgrStoppedTime.null()) {
1639  DOUT1("Manager stopped before entering to the mainloop - stop running");
1640  return;
1641  }
1642 
1643  if (runtime>0)
1644  DOUT0("Application mainloop will run for %3.1f s", runtime);
1645  else
1646  DOUT0("Application mainloop is now running");
1647  DOUT0(" Press Ctrl-C for stop");
1648 
1649  if (thrd.IsRealThrd()) {
1650  DOUT3("Manager has normal thread - just wait until application modules are stopped");
1651  } else {
1652  DOUT3("Run manager thread mainloop inside main process");
1653 
1654  // to be sure that timeout processing is active
1655  // only via timeout one should be able to stop processing of main loop
1656  ActivateTimeout(1.);
1657  }
1658 
1659  TimeStamp starttm = dabc::Now();
1660 
1661  bool appstopped = false;
1662 
1663  ApplicationRef appref = app();
1664 
1665  // we run even loop in units of 0.1 sec
1666  // TODO: make 0.1 sec configurable
1667  double period = 0.1;
1668 
1669  while (true) {
1670 
1671  if (thrd.IsRealThrd())
1672  dabc::Sleep(period);
1673  else
1674  thrd.RunEventLoop(period);
1675 
1676  if (appstopped && fAppFinished) break;
1677 
1678  // check if stop time was not set
1679  if (fMgrStoppedTime.null()) {
1680  if ((runtime <= 0) || !starttm.Expired(runtime)) continue;
1681  DOUT0("++++++++ Set stop time while runtime expired");
1683  }
1684 
1685  period = 0.001; // perform checks more often
1686 
1687  if (!appstopped) {
1688  appstopped = true;
1690  }
1691 
1692  if (fMgrStoppedTime.Expired(10.)) break; // TODO: make 10 second configurable
1693  }
1694 
1695  DOUT2("Exit dabc::Manager::RunManagerMainLoop");
1696 }
1697 
1698 
1700 {
1701  struct timeval tv;
1702  fd_set fds;
1703  tv.tv_sec = 0;
1704  tv.tv_usec = 0;
1705  FD_ZERO(&fds);
1706  FD_SET(STDIN_FILENO, &fds);
1707  select(STDIN_FILENO+1, &fds, NULL, NULL, &tv);
1708  return (FD_ISSET(STDIN_FILENO, &fds));
1709 }
1710 
1711 #include <iostream>
1712 
/**
 * Simple interactive command shell, driven from stdin.
 *
 * The loop alternates between giving the manager thread time to work and
 * polling stdin for a typed line. A line is parsed into a dabc::Command and
 * handled as follows:
 *  - "quit", "exit", ".q", "q"  - leave the loop;
 *  - "connect <node>"            - ping the node and remember it as target;
 *  - "close" / "disconnect"      - forward to the command channel and forget the target;
 *  - "update"                    - fetch hierarchy of the target node;
 *  - "ls"                        - print the last fetched hierarchy as XML;
 *  - "get <path> [history]"      - request one item and print it as XML;
 *  - anything else               - sent to the target node with a timeout of 5.
 * All commands except "connect" require a connected target node.
 *
 * The loop also ends when runtime has expired (only checked if runtime > 0)
 * or when fMgrStoppedTime has been set.
 *
 * \param runtime  maximum time the shell should run; values <= 0 mean no limit
 * \param remnode  when not empty, "connect <remnode>" is executed as very first command
 */
1713 void dabc::Manager::RunManagerCmdLoop(double runtime, const std::string &remnode)
1714 {
1715  DOUT0("Enter dabc::Manager::RunManagerCmdLoop");
1716 
1717  ThreadRef thrd = thread();
1718 
1719  if (thrd.null()) return;
1720 
1721  if (GetCommandChannel().null()) {
1722  EOUT("No command channel found");
1723  return;
1724  }
1725 
1726  if (thrd.IsRealThrd()) {
1727  DOUT3("Manager has normal thread - just wait until application modules are stopped");
1728  } else {
1729  DOUT3("Run manager thread mainloop inside main process");
1730 
1731  // to be sure that timeout processing is active
1732  // only via timeout one should be able to stop processing of main loop
1733  ActivateTimeout(1.);
1734  }
1735 
1736  TimeStamp start = dabc::Now();
1737 
// node to which commands are currently addressed, empty while not connected
1738  std::string tgtnode;
1739 
1740  Hierarchy rem_hierarchy;
1741 
1742  bool first(true);
1743 
1744  while (true) {
1745 
1746  // we run the event loop (or sleep) in slices of 0.001 - the value passed below
1747  // TODO: make the slice length configurable
1748  if (thrd.IsRealThrd())
1749  dabc::Sleep(0.001);
1750  else
1751  thrd.RunEventLoop(0.001);
1752 
1753  if ((runtime>0) && start.Expired(runtime)) {
1754  DOUT0("run time is over");
1755  break;
1756  }
1757 
1758  if (!fMgrStoppedTime.null()) {
1759  DOUT0("break command shell");
1760  break;
1761  }
1762 
1763  std::string str;
1764 
1765  if (first && !remnode.empty()) {
// emulate a typed "connect" command on the first pass
1766  first = false;
1767  str = std::string("connect ") + remnode;
1768  printf("cmd>%s\n",str.c_str());
1769  } else {
1770  first = false;
// non-blocking check, so the loop above keeps running while the user types nothing
1771  if (inputAvailable()<=0) continue;
1772  printf("cmd>"); fflush(stdout);
// NOTE(review): the result of std::getline is not checked. When stdin is at
// end-of-file str stays empty and the loop just continues; a descriptor at EOF
// is normally still reported as readable, so this may spin printing "cmd>" -
// confirm, and consider leaving the loop when getline fails.
1773  std::getline(std::cin, str);
1774  }
1775 
1776  if (str.empty()) continue;
1777 
1778  if ((str=="quit") || (str=="exit") || (str==".q") || (str=="q")) break;
1779 
1780  dabc::Command cmd;
1781  if (!cmd.ReadFromCmdString(str)) {
1782  EOUT("Wrong syntax %s", str.c_str());
1783  continue;
1784  }
1785 
1786  if (cmd.IsName("connect")) {
1787  std::string node = dabc::MakeNodeName(cmd.GetStr("Arg0"));
1788 
1789  if (node.empty()) {
1790  EOUT("Node name not specified correctly");
1791  continue;
1792  }
1793 
1794  tgtnode = node;
1795 
// connection is verified by a "Ping" command; on failure the target is dropped again
1796  dabc::Command cmd2("Ping");
1797  cmd2.SetReceiver(tgtnode);
1798  cmd2.SetTimeout(5.);
1799 
1800  int res = GetCommandChannel().Execute(cmd2);
1801 
1802  if (res == cmd_true) {
1803  DOUT0("Connection to %s done", tgtnode.c_str());
1804  } else {
1805  DOUT0("FAIL connection to %s", tgtnode.c_str());
1806  tgtnode.clear();
1807  }
1808 
1809  continue;
1810  }
1811 
// everything below needs a connected target node
1812  if (tgtnode.empty()) {
1813  DOUT0("Tgt node not connected, command %s not executed", cmd.GetName());
1814  continue;
1815  }
1816 
1817  if (cmd.IsName("close") || cmd.IsName("disconnect")) {
1818  cmd.SetStr("host", tgtnode);
1819  GetCommandChannel().Execute(cmd);
1820  tgtnode.clear();
1821  rem_hierarchy.Release();
1822  continue;
1823  }
1824 
1825  if (cmd.IsName("update")) {
1826 
1827  rem_hierarchy = dabc::GetNodeHierarchy(tgtnode);
1828 
1829  continue;
1830  }
1831 
1832  if (cmd.IsName("ls")) {
1833  if (!rem_hierarchy.null())
1834  DOUT0("xml = ver %u \n%s", (unsigned) rem_hierarchy.GetVersion(), rem_hierarchy.SaveToXml().c_str());
1835  else
1836  DOUT0("No hierarchy available");
1837  continue;
1838  }
1839 
1840  if (cmd.IsName("get")) {
1841  std::string path = cmd.GetStr("Arg0");
1842  int hlimit = cmd.GetInt("Arg1");
1843 
// optional second argument is turned into a "history=<N>" query
1844  std::string query;
1845  if (hlimit>0) query = dabc::format("history=%d",hlimit);
1846 
// NOTE(review): unlike the generic path at the end of the loop, no receiver is
// set on cmd2 here; presumably the path argument identifies the target - confirm.
1847  CmdGetBinary cmd2(path, "hierarchy", query);
1848  cmd2.SetTimeout(5);
1849 
1850  if (GetCommandChannel().Execute(cmd2)!=cmd_true) {
1851  DOUT0("Fail to get item %s", path.c_str());
1852  continue;
1853  }
1854 
1855  dabc::Hierarchy res;
1856  res.Create("get");
1857  res.SetVersion(cmd2.GetUInt("version"));
1858  res.ReadFromBuffer(cmd2.GetRawData());
1859 
1860  DOUT0("GET:%s len:%d RES = \n%s", path.c_str(), hlimit, res.SaveToXml().c_str());
1861 
1862  continue;
1863  }
1864 
1865 
// any other command is delivered as is to the target node
1866  cmd.SetReceiver(tgtnode);
1867  cmd.SetTimeout(5.);
1868 
1869  int res = GetCommandChannel().Execute(cmd);
1870 
1871  if (res == cmd_timedout)
1872  DOUT0("Command %s timeout", cmd.GetName());
1873  else
1874  DOUT0("Command %s res = %d", cmd.GetName(), res);
1875  }
1876 }
1877 
1878 
1879 
1881 {
1882  while (cfg.FindItem(xmlContext)) {
1883  if (!fCfgHost.empty())
1884  if (!cfg.CheckAttr(xmlHostAttr, fCfgHost.c_str())) continue;
1885 
1886  if (fCfgHost != GetName())
1887  if (!cfg.CheckAttr(xmlNameAttr, GetName())) continue;
1888 
1889  return true;
1890  }
1891 
1892  return false;
1893 }
1894 
1895 
1896 // =================================== classes from ManagerRef class ==================================
1897 
1898 
/**
 * Looks up a thread by name among the children of the threads folder.
 *
 * \param name            name of the thread
 * \param required_class  when not empty, the found thread must report
 *                        CompatibleClass(required_class), otherwise it is not returned
 * \return reference on the thread, or a null reference when nothing suitable exists
 */
1899 dabc::ThreadRef dabc::Manager::FindThread(const std::string &name, const std::string &required_class)
1900 {
1901  ThreadRef ref = GetThreadsFolder().FindChild(name.c_str());
1902 
1903  if (ref.null()) return ref;
1904 
// thread exists but has the wrong class - drop the reference, a null reference is returned
1905  if (!required_class.empty() && !ref()->CompatibleClass(required_class)) ref.Release();
1906 
1907  return ref;
1908 }
1909 
1911 {
1912  ReferencesVector vect;
1913 
1914  if (GetThreadsFolder().GetAllChildRef(&vect))
1915  while (vect.GetSize()>0) {
1916  ThreadRef thrd = vect.TakeLast();
1917  if (thrd.IsItself()) return thrd;
1918  }
1919 
1920  return ThreadRef();
1921 }
1922 
/**
 * Returns an existing compatible thread with the requested name or creates
 * and starts a new one.
 *
 * If a thread with the name exists but is not compatible with thrdclass, the
 * names "<thrdname>_0", "<thrdname>_1", ... are tried (up to 1000 attempts)
 * until either a compatible thread or an unused name is found. When all
 * attempts hit incompatible threads the process is terminated with exit(765).
 *
 * \param thrdname   requested thread name
 * \param thrdclass  requested thread class, may be empty
 * \param thrddev    passed through to the factory's CreateThread()
 * \param cmd        passed through to the factory's CreateThread()
 * \return reference on the thread; the reference is destroyed again when the
 *         thread could not be started
 *
 * NOTE(review): listing line 1947 is absent from this listing; judging by the
 * closing ")" on line 1950 it presumably opens a macro that iterates over the
 * registered factories (providing `factory`) - confirm against the original source.
 */
1923 dabc::ThreadRef dabc::Manager::DoCreateThread(const std::string &thrdname, const std::string &thrdclass, const std::string &thrddev, Command cmd)
1924 {
1925  std::string newname = thrdname;
1926  int basecnt = 0;
1927 
1928  ThreadRef thrd = FindThread(newname);
1929 
// walk through candidate names while they are occupied by incompatible threads
1930  while ((basecnt<1000) && !thrd.null()) {
1931  if (thrd()->CompatibleClass(thrdclass)) return thrd;
1932 
1933  EOUT("Thread %s of class %s exists and incompatible with %s class",
1934  thrdname.c_str(), thrd()->ClassName(), (thrdclass.empty() ? "---" : thrdclass.c_str()) );
1935 
1936  newname = dabc::format("%s_%d", thrdname.c_str(), basecnt++);
1937  thrd = FindThread(newname);
1938  }
1939 
1940  if (!thrd.null()) {
1941  EOUT("Too many incompatible threads with name %s", thrdname.c_str());
1942  exit(765);
1943  }
1944 
1945  DOUT3("CreateThread %s of class %s, is any %p", newname.c_str(), (thrdclass.empty() ? "---" : thrdclass.c_str()), thrd());
1946 
// the first factory which delivers a thread wins
1948  thrd = factory->CreateThread(GetThreadsFolder(true), thrdclass, newname, thrddev, cmd);
1949  if (!thrd.null()) break;
1950  )
1951 
1952  DOUT3("CreateThread %s done %p", newname.c_str(), thrd());
1953 
// only for the thread named MgrThrdName() the configuration decides how it is started
1954  bool noraml_thread = true;
1955  if ((newname == MgrThrdName()) && cfg())
1956  noraml_thread = cfg()->NormalMainThread();
1957 
1958  DOUT3("Starting thread %s as normal %s refcnt %d", thrd.GetName(), DBOOL(noraml_thread), thrd.NumReferences());
1959 
1960  if (!thrd.null())
1961  if (!thrd()->Start(10, noraml_thread)) {
1962  EOUT("Thread %s cannot be started!!!", newname.c_str());
1963  thrd.Destroy();
1964  }
1965 
1966  DOUT3("Create thread %s of class %s thrd %p refcnt %d done", newname.c_str(), (thrdclass.empty() ? "---" : thrdclass.c_str()), thrd(), thrd.NumReferences());
1967 
1968  return thrd;
1969 }
1970 
1971 
1972 // ========================================== ManagerRef methods ================================
1973 
1974 
/**
 * Executes a CmdCreateApplication command built from the given class name
 * and thread name and returns the command result as bool.
 */
1975 bool dabc::ManagerRef::CreateApplication(const std::string &classname, const std::string &appthrd)
1976 {
1977  return Execute(CmdCreateApplication(classname, appthrd));
1978 }
1979 
1980 
/**
 * Executes a CmdCreateModule command and, on success, looks the module up by
 * its name.
 *
 * \return reference on the module, or an empty ModuleRef when the command failed
 */
1981 dabc::ModuleRef dabc::ManagerRef::CreateModule(const std::string &classname, const std::string &modulename, const std::string &thrdname)
1982 {
1983  CmdCreateModule cmd(classname, modulename, thrdname);
1984 
1985  return Execute(cmd) ? FindModule(modulename) : dabc::ModuleRef();
1986 }
1987 
/**
 * Executes a CmdCreateThread command and returns the resulting thread.
 *
 * The thread is looked up by the name read back from the command
 * (cmd.GetThrdName()), not by thrdname - presumably because the manager may
 * have chosen a different name; verify against the command handler.
 *
 * \return reference on the thread, or an empty ThreadRef unless the command returned cmd_true
 */
1988 dabc::ThreadRef dabc::ManagerRef::CreateThread(const std::string &thrdname, const std::string &classname, const std::string &devname)
1989 {
1990  CmdCreateThread cmd(thrdname, classname, devname);
1991 
1992  return Execute(cmd) == cmd_true ? FindThread(cmd.GetThrdName()) : dabc::ThreadRef();
1993 }
1994 
1996 {
1997  return GetObject() ? GetObject()->CurrentThread() : dabc::ThreadRef();
1998 }
1999 
2000 
/**
 * Executes a CmdCreateDevice command for the given class and device name and
 * returns the command result as bool.
 */
2001 bool dabc::ManagerRef::CreateDevice(const std::string &classname, const std::string &devname)
2002 {
2003  return Execute(CmdCreateDevice(classname, devname));
2004 }
2005 
2006 
/**
 * Executes a CmdDestroyDevice command for the named device and returns the
 * command result as bool.
 */
2007 bool dabc::ManagerRef::DeleteDevice(const std::string &devname)
2008 {
2009  return Execute(CmdDestroyDevice(devname));
2010 }
2011 
2012 
2014 {
2015  return GetObject() ? GetObject()->FindDevice(name) : dabc::WorkerRef();
2016 }
2017 
2019 {
2020  return GetObject() ? GetObject()->FindModule(name) : dabc::ModuleRef();
2021 }
2022 
/**
 * Executes a CmdStartModule command for the named module; the command result
 * is discarded.
 */
2023 void dabc::ManagerRef::StartModule(const std::string &modulename)
2024 {
2025  Execute(dabc::CmdStartModule(modulename));
2026 }
2027 
/**
 * Executes a CmdStopModule command for the named module; the command result
 * is discarded.
 */
2028 void dabc::ManagerRef::StopModule(const std::string &modulename)
2029 {
2030  Execute(dabc::CmdStopModule(modulename));
2031 }
2032 
2034 {
2035  return Execute(CmdStartModule("*"));
2036 }
2037 
2039 {
2040  return Execute(CmdStopModule("*"));
2041 }
2042 
/**
 * Executes a CmdDeleteModule command for the named module and returns the
 * command result as bool.
 */
2043 bool dabc::ManagerRef::DeleteModule(const std::string &name)
2044 {
2045  return Execute(CmdDeleteModule(name));
2046 }
2047 
/**
 * Executes a CmdDeletePool command for the named memory pool and returns the
 * command result as bool.
 */
2048 bool dabc::ManagerRef::DeletePool(const std::string &name)
2049 {
2050  return Execute(CmdDeletePool(name));
2051 }
2052 
2053 
2055 {
2056  return GetObject() ? GetObject()->FindItem(name) : Reference();
2057 }
2058 
/**
 * Forwards to FindPort() of the referenced manager.
 *
 * \return what the manager returns, or an empty Reference when this
 *         ManagerRef does not hold an object
 */
2059 dabc::Reference dabc::ManagerRef::FindPort(const std::string &portname)
2060 {
2061  return GetObject() ? GetObject()->FindPort(portname) : Reference();
2062 }
2063 
2065 {
2066  return GetObject() ? GetObject()->FindPool(name) : Reference();
2067 }
2068 
/**
 * Searches for a parameter via FindItem() of the referenced manager.
 *
 * \return Parameter created from the found item, or from an empty Reference
 *         when this ManagerRef does not hold an object
 */
2069 dabc::Parameter dabc::ManagerRef::FindPar(const std::string &parname)
2070 {
2071  return GetObject() ? GetObject()->FindItem(parname) : Reference();
2072 }
2073 
2075 {
2076  return GetObject() ? GetObject()->app() : dabc::ApplicationRef();
2077 }
2078 
2080 {
2081  // this method must delete all threads, modules and pools and clean device drivers
2082 
2083  return Execute(CmdCleanupApplication());
2084 }
2085 
2087 {
2088  return GetObject() ? GetObject()->NodeId() : 0;
2089 }
2090 
2092 {
2093  return GetObject() ? GetObject()->NumNodes() : 0;
2094 }
2095 
/**
 * Subscribes a worker to (or unsubscribes it from) parameter events selected
 * by a mask.
 *
 * The mask is analyzed with DecomposeAddress(). For a local mask the worker
 * pointer itself is put into the command and the command is executed
 * synchronously. For a remote mask the worker is identified by its composed
 * address in the "RemoteWorker" field, the command is addressed to the remote
 * server and only submitted.
 *
 * \param ptr              worker which should receive the events
 * \param subscribe        true to subscribe, false to unsubscribe
 * \param mask             parameter name mask, optionally with "dabc://server" part
 * \param onlychangeevent  stored in the "OnlyChange" field of the command
 * \return false when ptr is null or mask cannot be decomposed; otherwise the
 *         result of Execute() (local) or Submit() (remote)
 */
2096 bool dabc::ManagerRef::ParameterEventSubscription(Worker* ptr, bool subscribe, const std::string &mask, bool onlychangeevent)
2097 {
2098  if (ptr == 0) return false;
2099 
2100  // TODO: by the subscription to remote node first register receiver on local node and
2101  // only then submit registration to remote.
2102  // One should avoid multiple parallel subscription to remote node
2103 
2104  std::string server, itemname;
2105  bool islocal(true);
2106 
2107  if (!DecomposeAddress(mask, islocal, server, itemname)) {
2108  EOUT("Wrong parameter mask %s", mask.c_str());
2109  return false;
2110  }
2111 
2112  Command cmd("ParameterEventSubscription");
2113 
2114  cmd.SetBool("IsSubscribe", subscribe);
2115  cmd.SetStr("Mask", mask);
2116  cmd.SetBool("OnlyChange", onlychangeevent);
2117 
2118  if (islocal) {
2119  // this is registration for local parameters
2120  cmd.SetPtr("Worker", ptr);
2121  return Execute(cmd);
2122  }
2123 
// remote side cannot use our pointer - the worker is identified by its full address instead
2124  cmd.SetStr("RemoteWorker", ComposeAddress("", ptr->ItemName()));
2125  cmd.SetReceiver(ComposeAddress(server));
2126 
2127  // do registration asynchronously
2128  return Submit(cmd);
2129 }
2130 
/**
 * Tells whether a name should be treated as a local item.
 *
 * \return true when name cannot be parsed as URL or its protocol is not
 *         "dabc"; false for any name with "dabc" protocol (the host part is
 *         not inspected here)
 */
2131 bool dabc::ManagerRef::IsLocalItem(const std::string &name)
2132 {
2133  Url url;
2134  if (!url.SetUrl(name, false)) return true;
2135  return url.GetProtocol().compare("dabc") != 0;
2136 }
2137 
/**
 * Connects two ports given by name.
 *
 * When both ports are found locally they are connected immediately with
 * dabc::LocalTransport::ConnectPorts() and an empty ConnectionRequest is
 * returned. When exactly one port is local and the other name is a remote
 * ("dabc" protocol) address, a connection request is created on the local
 * port; a connection manager module is created first if it does not exist.
 *
 * An empty ConnectionRequest is also returned on failure: no manager object,
 * a local name which is not found, both ports not found, or no command channel.
 *
 * \param port1name  name of the first port, normally the output
 * \param port2name  name of the second port, normally the input
 *
 * NOTE(review): listing lines 2153 and 2181 are absent from this listing; the
 * latter presumably declares `req` - confirm against the original source.
 */
2138 dabc::ConnectionRequest dabc::ManagerRef::Connect(const std::string &port1name, const std::string &port2name)
2139 {
2140  // configure connection between two ports
2141  // Normally port1 is output and port2 is input port1->port2
2142  // But bidirectional connection can be established as well
2143 
2144  if (GetObject()==0) return dabc::ConnectionRequest();
2145 
2146  PortRef port1 = FindPort(port1name);
2147  PortRef port2 = FindPort(port2name);
2148 
2149  if (!port1.null() && !port2.null()) {
2150  // make local connection immediately
2151  dabc::LocalTransport::ConnectPorts(port1, port2);
2152  // connect also bind ports (if exists)
2154  return dabc::ConnectionRequest();
2155  }
2156 
// a name without "dabc" protocol must resolve to an existing local port
2157  if (IsLocalItem(port1name) && port1.null()) {
2158  EOUT("Did not found port %s", port1name.c_str());
2159  return dabc::ConnectionRequest();
2160  }
2161 
2162  if (IsLocalItem(port2name) && port2.null()) {
2163  EOUT("Did not found port %s", port2name.c_str());
2164  return dabc::ConnectionRequest();
2165  }
2166 
2167  if (port1.null() && port2.null()) return dabc::ConnectionRequest();
2168 
2169  if (GetObject()->GetCommandChannel().null()) {
2170  EOUT("Not possible to establish remote connection without command channel");
2171  return dabc::ConnectionRequest();
2172  }
2173 
2174  ModuleRef m = FindModule(Manager::ConnMgrName());
2175 
2176  if (m.null())
2177  CreateModule("dabc::ConnectionManager", Manager::ConnMgrName(), Manager::MgrThrdName());
2178 
2179  DOUT2("Connect ports %s %p %s %p", port1name.c_str(), port1(), port2name.c_str(), port2());
2180 
2182 
// at this point exactly one of the two ports is local; the boolean argument
// differs between the two calls (true for port1, false for port2)
2183  if (!port1.null())
2184  req = port1.MakeConnReq(port2name, true);
2185 
2186  if (!port2.null())
2187  req = port2.MakeConnReq(port1name, false);
2188 
2189  // if not configured differently, specify
2190  if (req.GetConnDevice().empty() && GetObject())
2191  req.SetConnDevice(GetObject()->GetLastCreatedDevName());
2192 
2193  return req;
2194 }
2195 
2196 
/**
 * Sends an "ActivateConnections" command to the connection manager module.
 *
 * \param tmout  timeout assigned to the command
 * \param sync   when true, SyncWorker() is called first and the command is
 *               executed synchronously; when false the command is only submitted
 * \return true when no connection manager module exists (nothing to do),
 *         otherwise the result of Execute() or Submit()
 */
2197 bool dabc::ManagerRef::ActivateConnections(double tmout, bool sync)
2198 {
2199  // ensure that all commands are executed, for instance creation of connection manager is done
2200  if (sync) SyncWorker();
2201 
2202  ModuleRef conn = FindModule(Manager::ConnMgrName());
2203  if (conn.null()) return true;
2204 
2205  dabc::Command cmd("ActivateConnections");
2206  cmd.SetTimeout(tmout);
2207  return sync ? conn.Execute(cmd) : conn.Submit(cmd);
2208 }
2209 
/**
 * Executes a CmdCreateTransport command for an existing port.
 *
 * \param portname       name of the port, must be found by FindPort()
 * \param transportkind  passed through to the command
 * \param thrdname       passed through to the command
 * \return false when the port is not found, otherwise the command result as bool
 */
2210 bool dabc::ManagerRef::CreateTransport(const std::string &portname, const std::string &transportkind, const std::string &thrdname)
2211 {
2212  PortRef port = FindPort(portname);
2213 
2214  if (port.null()) return false;
2215 
2216  return Execute(CmdCreateTransport(portname, transportkind, thrdname));
2217 }
2218 
/**
 * Executes a CmdCreateAny command with the given class and object name.
 *
 * \return pointer stored by the command handler in the "ObjectPtr" field, or
 *         0 unless the command returned cmd_true; the caller has to know the
 *         real type behind the void pointer
 */
2219 void* dabc::ManagerRef::CreateAny(const std::string &classname, const std::string &objname)
2220 {
2221  CmdCreateAny cmd;
2222  cmd.SetStr("ClassName", classname);
2223  cmd.SetStr("ObjectName", objname);
2224 
2225  if (Execute(cmd) != cmd_true) return 0;
2226 
2227  return cmd.GetPtr("ObjectPtr");
2228 }
2229 
2230 
2232 {
2233  // Manager will be stopped regularly if it is in running
2234 
2235  Submit(dabc::Command("StopManagerMainLoop"));
2236 
2237 }
2238 
/**
 * Executes a CmdCreateMemoryPool command for the named pool.
 *
 * \param poolname    name of the pool
 * \param buffersize  first argument of CmdCreateMemoryPool::SetMem()
 * \param numbuffers  second argument of CmdCreateMemoryPool::SetMem()
 * \return the command result as bool
 */
2239 bool dabc::ManagerRef::CreateMemoryPool(const std::string &poolname,
2240  unsigned buffersize,
2241  unsigned numbuffers)
2242 {
2243 
2244  CmdCreateMemoryPool cmd(poolname);
2245 
2246  cmd.SetMem(buffersize, numbuffers);
2247 
2248  return Execute(cmd);
2249 }
2250 
/**
 * Executes a CmdCreateObject command with the given class and object name.
 *
 * \return reference taken from the "Object" field of the command, or an empty
 *         Reference when the command failed
 */
2251 dabc::Reference dabc::ManagerRef::CreateObject(const std::string &classname, const std::string &objname)
2252 {
2253  CmdCreateObject cmd(classname, objname);
2254 
2255  if (!Execute(cmd)) return dabc::Reference();
2256 
2257  return cmd.GetRef("Object");
2258 }
2259 
2261 {
2262  CmdCreateDataInput cmd;
2263  cmd.SetStr("Kind", kind);
2264  if (!Execute(cmd)) return 0;
2265 
2266  return (dabc::DataInput*) cmd.GetPtr("DataInput");
2267 }
2268 
/**
 * Waits for tmout. With a manager object behind this reference the call is
 * forwarded to its Sleep(tmout, prefix); without one a plain dabc::Sleep(tmout)
 * is done and prefix is ignored.
 */
2269 void dabc::ManagerRef::Sleep(double tmout, const char* prefix)
2270 {
2271  if (GetObject())
2272  GetObject()->Sleep(tmout, prefix);
2273  else
2274  dabc::Sleep(tmout);
2275 }
2276 
/**
 * Creates the command channel object ("SocketCommandChannel") of the manager
 * unless one already exists.
 *
 * \param withserver     stored in the "WithServer" field; when true server host and
 *                       port are configured as well
 * \param serv_port      server port used when the configuration does not provide a
 *                       positive one; if this is not positive either, defaultDabcPort is used
 * \param allow_clients  stored in the "ClientsAllowed" field
 * \return false when this reference is null or the object could not be created;
 *         true when the channel already existed or was created
 */
2277 bool dabc::ManagerRef::CreateControl(bool withserver, int serv_port, bool allow_clients)
2278 {
2279  if (null()) return false;
2280 
2281  WorkerRef ref = GetCommandChannel();
2282  if (!ref.null()) return true;
2283 
2284  dabc::CmdCreateObject cmd("SocketCommandChannel", dabc::Manager::CmdChlName());
2285  cmd.SetBool("WithServer", withserver);
2286  cmd.SetBool("ClientsAllowed", allow_clients);
2287  if (withserver) {
2288  int port = 0;
2289  std::string host;
// values from the configuration have priority over the serv_port argument
2290  if (GetObject()->cfg()) {
2291  port = GetObject()->cfg()->MgrPort();
2292  host = GetObject()->cfg()->MgrHost();
2293  if (host.empty()) host = dabc::Configuration::GetLocalHost();
2294  }
2295  if (port<=0) port = serv_port;
2296  if (port<=0) port = defaultDabcPort;
2297  cmd.SetStr("ServerHost", host);
2298  cmd.SetInt("ServerPort", port);
2299  }
2300 
// object is created directly via the manager, the command is used here only as argument container
2301  ref = GetObject()->DoCreateObject("SocketCommandChannel", dabc::Manager::CmdChlName(), cmd);
2302 
2303  if (ref.null()) return false;
2304 
// when a "localaddr" field is present in the command afterwards, it becomes the manager's local address
2305  std::string localaddr = cmd.GetStr("localaddr");
2306  if (!localaddr.empty() /* && localaddr.find("localhost") == std::string::npos*/)
2307  GetObject()->fLocalAddress = localaddr;
2308 
2309  ref.MakeThreadForWorker("CmdThrd");
2310 
2311  return true;
2312 }
2313 
2314 
2316 {
2317  PublisherRef ref = FindItem(dabc::Publisher::DfltName());
2318  if (!ref.null()) return true;
2319 
2321  ref.MakeThreadForWorker("PublisherThrd");
2322 
2323  return true;
2324 }
int inputAvailable()
Definition: Manager.cxx:1699
#define FOR_EACH_FACTORY(arg)
Definition: Manager.cxx:812
dabc::AutoDestroyClass auto_destroy_instance
Definition: Manager.cxx:235
Reference on dabc::Application class
Definition: Application.h:178
Base class for user-specific applications.
Definition: Application.h:73
void * ExternalFunction()
Definition: Application.h:80
static const char * stHalted()
Definition: Application.h:83
static bool fAutoDestroy
Definition: Manager.cxx:216
Binary file input object.
Definition: BinaryFileIO.h:36
Binary file output object.
Definition: BinaryFileIO.h:67
unsigned Write_Buffer(Buffer &buf)
Start writing of buffer to output.
Definition: Manager.cxx:130
BlockingOutput(const dabc::Url &url)
Definition: Manager.cxx:119
int fErrCounter
counter till error
Definition: Manager.cxx:116
Reference on memory from memory pool.
Definition: Buffer.h:135
static const char * CmdName()
Definition: Manager.h:123
void SetMem(unsigned buffersize, unsigned numbuffers, unsigned align=0)
Definition: MemoryPool.h:227
static const char * CmdName()
Definition: Manager.h:73
static const char * ThrdNameArg()
Definition: Manager.h:148
std::string GetThrdName() const
Definition: Manager.h:158
static const char * CmdName()
Definition: Manager.h:146
std::string PortName() const
Definition: Manager.h:195
std::string TransportKind() const
Definition: Manager.h:196
static const char * CmdName()
Definition: Manager.h:99
static const char * CmdName()
Definition: Manager.h:54
static const char * CmdName()
Definition: Manager.h:135
Command used to produce custom binary data for published in hierarchy entries.
Definition: Publisher.h:33
static const char * ModuleArg()
Definition: Manager.h:63
This command used to distribute parameter event to receivers.
Definition: Worker.h:517
static const char * CmdName()
Definition: Manager.h:85
static const char * CmdName()
Definition: Manager.h:92
Represents command with its arguments.
Definition: Command.h:99
bool ReadFromCmdString(const std::string &str)
Read command from string, which is typed in std output.
Definition: Command.cxx:292
void SetPtr(const std::string &name, void *p)
Set pointer argument for the command.
Definition: Command.cxx:151
unsigned GetUInt(const std::string &name, unsigned dflt=0) const
Definition: Command.h:148
bool SetStr(const std::string &name, const char *value)
Definition: Command.h:134
bool SetBool(const std::string &name, bool v)
Definition: Command.h:141
bool SetInt(const std::string &name, int v)
Definition: Command.h:138
std::string GetStr(const std::string &name, const std::string &dflt="") const
Definition: Command.h:136
Command & SetTimeout(double tm)
Set maximum time which can be used for command execution.
Definition: Command.cxx:108
Command & SetReceiver(const std::string &itemname)
These methods prepare command so, that one can submit command to the manager like: dabc::mgr....
Definition: Command.h:264
bool GetBool(const std::string &name, bool dflt=false) const
Definition: Command.h:142
Reference GetRef(const std::string &name)
Returns reference from the command, can be called only once.
Definition: Command.cxx:175
int GetInt(const std::string &name, int dflt=0) const
Definition: Command.h:139
Buffer GetRawData()
Returns reference on raw data Can be called only once - raw data reference will be cleaned.
Definition: Command.cxx:347
void RemoveReceiver()
Definition: Command.h:268
void SetPriority(int prio)
Set command priority, defines how fast command should be treated In special cases priority allows to ...
Definition: Command.h:213
std::string GetReceiver() const
Definition: Command.h:267
bool SetRef(const std::string &name, Reference ref)
Set reference to the command.
Definition: Command.cxx:168
void * GetPtr(const std::string &name, void *deflt=0) const
Get pointer argument from the command.
Definition: Command.cxx:158
std::string NodeName(unsigned id)
returns nodename of specified context
Definition: ConfigBase.cxx:349
Interface class between xml configuration and dabc objects.
Definition: ConfigIO.h:38
bool FindItem(const char *name)
Definition: ConfigIO.cxx:42
bool CheckAttr(const char *name, const char *value)
Check if item, found by FindItem routine, has attribute with specified value.
Definition: ConfigIO.cxx:63
Full-functional class to reading configuration from xml files.
Definition: Configuration.h:34
double GetHaltTime()
Returns time, required to halt DABC process.
int MgrNodeId() const
Definition: Configuration.h:57
std::string ThreadsLayout()
std::string MgrHost() const
Definition: Configuration.h:54
std::string InitFuncName()
static std::string GetLocalHost()
int MgrNumNodes() const
Definition: Configuration.h:58
Connections manager class.
Connection request.
void SetConnDevice(const std::string &dev)
std::string GetConnDevice() const
Device name which may be used to create connection (depends on url)
Module provides CPU information
Definition: CpuInfoModule.h:34
Interface for implementing any kind of data input.
Definition: DataIO.h:61
Interface for implementing any kind of data output.
Definition: DataIO.h:158
Reference on dabc::Device class
Definition: Device.h:79
Factory for user-specific classes
Definition: Factory.h:47
static void * FindSymbol(const std::string &symbol)
Definition: Factory.cxx:49
virtual Reference CreateObject(const std::string &classname, const std::string &objname, Command cmd)
Factory method to create object.
Definition: Factory.h:58
Represents objects hierarchy of remote (or local) DABC process.
Definition: Hierarchy.h:285
uint64_t GetVersion() const
Returns actual version of hierarchy entry.
Definition: Hierarchy.h:349
void Create(const std::string &name, bool withmutex=false)
Create top-level object with specified name.
Definition: Hierarchy.cxx:934
bool ReadFromBuffer(const dabc::Buffer &buf)
Read hierarchy from buffer.
Definition: Hierarchy.cxx:896
void SetVersion(uint64_t v)
Change version of the item, only for advanced usage.
Definition: Hierarchy.h:352
Iterator over objects hierarchy
Definition: Iterator.h:36
bool next_cast(T *&ptr, bool goinside=true)
Definition: Iterator.h:60
static int ConnectPorts(Reference port1ref, Reference port2ref, Command cmd=nullptr)
Lock guard for posix mutex.
Definition: threads.h:127
virtual void LogFile(const char *fname)
Definition: logging.cxx:210
static Logger * Instance()
Definition: logging.h:104
static void DisableLogReopen()
Definition: logging.cxx:405
static void CheckTimeout()
Definition: logging.cxx:398
Reference on manager object
Definition: Manager.h:560
WorkerRef FindDevice(const std::string &name)
Definition: Manager.cxx:2013
void StopModule(const std::string &modulename)
Definition: Manager.cxx:2028
bool ActivateConnections(double tmout, bool sync=true)
Definition: Manager.cxx:2197
ThreadRef CurrentThread()
Returns reference on the thread, which is now active.
Definition: Manager.cxx:1995
DataInput * CreateDataInput(const std::string &kind)
Create data input, using factories methods.
Definition: Manager.cxx:2260
Reference FindPool(const std::string &name)
Definition: Manager.cxx:2064
void StopApplication()
Definition: Manager.cxx:2231
ApplicationRef app()
Definition: Manager.cxx:2074
ModuleRef CreateModule(const std::string &classname, const std::string &modulename, const std::string &thrdname="")
Definition: Manager.cxx:1981
bool CreateTransport(const std::string &portname, const std::string &transportkind="", const std::string &thrdname="")
Definition: Manager.cxx:2210
Reference CreateObject(const std::string &classname, const std::string &objname)
Definition: Manager.cxx:2251
Reference FindPort(const std::string &port)
Definition: Manager.cxx:2059
ConnectionRequest Connect(const std::string &port1, const std::string &port2)
Request connection between two ports.
Definition: Manager.cxx:2138
bool DeleteModule(const std::string &name)
Definition: Manager.cxx:2043
void Sleep(double tmout, const char *prefix=0)
Sleep for specified time, keep thread event loop running. See Manager::Sleep() method for more details...
Definition: Manager.cxx:2269
void StartModule(const std::string &modulename)
Definition: Manager.cxx:2023
Parameter FindPar(const std::string &parname)
Definition: Manager.cxx:2069
bool DeletePool(const std::string &name)
Definition: Manager.cxx:2048
bool CleanupApplication()
Method safely deletes all object created for application - modules, devices, memory pools.
Definition: Manager.cxx:2079
Reference FindItem(const std::string &name)
Definition: Manager.cxx:2054
bool DeleteDevice(const std::string &devname)
Definition: Manager.cxx:2007
bool StartAllModules()
Definition: Manager.cxx:2033
bool ParameterEventSubscription(Worker *ptr, bool subscribe, const std::string &mask, bool onlychangeevent=true)
Definition: Manager.cxx:2096
int NumNodes() const
Definition: Manager.cxx:2091
bool CreateApplication(const std::string &classname="", const std::string &appthrd="")
Definition: Manager.cxx:1975
bool IsLocalItem(const std::string &name)
Returns true, if name of the item should specify name in local context or from remote node.
Definition: Manager.cxx:2131
bool CreateMemoryPool(const std::string &poolname, unsigned buffersize=0, unsigned numbuffers=0)
Generic method to create memory pool.
Definition: Manager.cxx:2239
bool StopAllModules()
Definition: Manager.cxx:2038
void * CreateAny(const std::string &classname, const std::string &objname="")
Definition: Manager.cxx:2219
ModuleRef FindModule(const std::string &name)
Definition: Manager.cxx:2018
bool CreateDevice(const std::string &classname, const std::string &devname)
Definition: Manager.cxx:2001
bool CreatePublisher()
Create publisher, which manages all published hierarchies.
Definition: Manager.cxx:2315
bool CreateControl(bool withserver, int serv_port=0, bool allow_clients=true)
Create command channel. Parameter withserver defines if server socket will be created,...
Definition: Manager.cxx:2277
ThreadRef CreateThread(const std::string &thrdname, const std::string &classname="", const std::string &devname="")
Definition: Manager.cxx:1988
int NodeId() const
Definition: Manager.cxx:2086
Manager of everything in DABC
Definition: Manager.h:291
WorkerRef DoCreateModule(const std::string &classname, const std::string &modulename, Command cmd)
Definition: Manager.cxx:824
ModuleRef FindModule(const std::string &name)
Definition: Manager.cxx:713
static const char * ConnMgrName()
Definition: Manager.h:478
bool IsAnyModuleRunning()
Definition: Manager.cxx:684
virtual int PreviewCommand(Command cmd)
This method called before command will be executed.
Definition: Manager.cxx:756
std::string ComposeAddress(const std::string &server, const std::string &itemtname="")
Provides string, which can be used as receiver argument.
Definition: Manager.cxx:1439
void Sleep(double tmout, const char *prefix=0)
Perform sleeping with event loop running.
Definition: Manager.cxx:1375
void ProduceParameterEvent(ParameterContainer *par, int evid)
Definition: Manager.cxx:506
bool fAppFinished
when true, reply from application was received
Definition: Manager.h:341
std::string GetLastCreatedDevName()
Definition: Manager.cxx:1249
virtual void Print(int lvl=0)
Displays on std output list of running threads and modules.
Definition: Manager.cxx:1312
Reference DoCreateObject(const std::string &classname, const std::string &objname="", Command cmd=nullptr)
Definition: Manager.cxx:863
std::string fLastCreatedDevName
name of last created device, automatically used for connection establishing
Definition: Manager.h:373
static void ProcessFactory(Factory *factory)
Definition: Manager.cxx:1561
std::string fCfgHost
Definition: Manager.h:361
ThreadsLayout fThrLayout
defines distribution of threads
Definition: Manager.h:371
static Factory * fFirstFactories[10]
first factories, which are coming before manager is created
Definition: Manager.h:309
virtual bool Find(ConfigIO &cfg)
Method to locate object in xml file.
Definition: Manager.cxx:1880
WorkerRef FindDevice(const std::string &name)
Definition: Manager.cxx:734
bool UnregisterDependency(Object *src, Object *tgt, bool bidirectional=false)
Unregister dependency between objects.
Definition: Manager.cxx:1514
DependPairList * fDepend
Definition: Manager.h:358
static int fInstanceId
magic number which indicates that instance is initialized
Definition: Manager.h:308
ApplicationRef app()
Definition: Manager.cxx:741
Reference FindItem(const std::string &itemname, bool islocal=false)
Find object in manager hierarchy with specified itemname.
Definition: Manager.cxx:697
static const char * AppThrdName()
Definition: Manager.h:477
TimeStamp fMgrStoppedTime
indicate when manager mainloop was stopped
Definition: Manager.h:340
@ MagicInstanceId
Definition: Manager.h:302
void RunManagerCmdLoop(double runtime=0., const std::string &remnode="")
Runs manager command loop - when command shell is used.
Definition: Manager.cxx:1713
ReferencesVector * fDestroyQueue
Definition: Manager.h:346
virtual void ProcessEvent(const EventId &)
Definition: Manager.cxx:666
bool ProcessParameterEvents()
Definition: Manager.cxx:549
ThreadRef FindThread(const std::string &name, const std::string &required_class="")
Definition: Manager.cxx:1899
static const char * CmdChlName()
Definition: Manager.h:479
bool ProcessDestroyQueue()
Definition: Manager.cxx:425
Reference FindPort(const std::string &name)
Definition: Manager.cxx:719
virtual int ExecuteCommand(Command cmd)
Main method where commands are executed.
Definition: Manager.cxx:877
static void SetAutoDestroy(bool on)
Method set flag if manager instance should be destroyed when process finished.
Definition: Manager.cxx:239
virtual bool ReplyCommand(Command cmd)
Reimplement this method to react on command reply Return true if command can be destroyed by framewor...
Definition: Manager.cxx:1279
bool DoCleanupApplication()
Definition: Manager.cxx:1363
ParamEventReceiverList * fParEventsReceivers
list of workers, registered for receiving of parameter events Used only from manager thread,...
Definition: Manager.h:355
ReferencesVector * fTimedPars
Definition: Manager.h:351
static const char * MgrThrdName()
Definition: Manager.h:476
bool DoCreateMemoryPool(Command cmd)
Definition: Manager.cxx:1255
void ProcessPipeSignal()
Definition: Manager.cxx:1615
@ evntDestroyObj
Definition: Manager.h:315
@ evntManagerParam
Definition: Manager.h:315
bool RegisterDependency(Object *src, Object *tgt, bool bidirectional=false)
Register dependency between objects.
Definition: Manager.cxx:1493
static int fFirstFactoriesId
magic number which should be set when fFirstFactories initialized for the first time
Definition: Manager.h:310
RecordsQueue< ParamRec > fParsQueue
Definition: Manager.h:348
void ProcessCtrlCSignal()
Definition: Manager.cxx:1589
Reference GetThreadsFolder(bool force=false)
Return reference on folder with all registered threads.
Definition: Manager.h:430
Reference FindPool(const std::string &name)
Definition: Manager.cxx:727
WorkerRef GetCommandChannel()
Return reference on command channel.
Definition: Manager.h:416
void RunManagerMainLoop(double runtime=0.)
Runs manager mainloop.
Definition: Manager.cxx:1631
std::string GetNodeAddress(int nodeid)
Return address of the node to be able communicate with it.
Definition: Manager.cxx:1421
virtual double ProcessTimeout(double last_diff)
Definition: Manager.cxx:1541
std::string fLocalAddress
Identifier for the current application Only set when control instance is created Depending on configu...
Definition: Manager.h:369
Configuration * fCfg
Definition: Manager.h:360
ThreadRef CurrentThread()
Definition: Manager.cxx:1910
Mutex * fMgrMutex
Definition: Manager.h:344
void HaltManager()
Delete all modules and stop manager thread.
Definition: Manager.cxx:371
std::string GetLocalAddress()
Return address of current application.
Definition: Manager.cxx:1434
ThreadsLayout GetThreadsLayout() const
Definition: Manager.h:498
void FillItemName(const Object *ptr, std::string &itemname, bool compact=true)
Method should be used to produce name of object, which can be used as item name in different Find met...
Definition: Manager.cxx:746
virtual ~Manager()
Definition: Manager.cxx:320
Configuration * cfg() const
Definition: Manager.h:502
ThreadRef DoCreateThread(const std::string &thrdname, const std::string &classname="", const std::string &devname="", Command cmd=nullptr)
Create thread with specified name and class name.
Definition: Manager.cxx:1923
static Manager * fInstance
pointer on current manager instance
Definition: Manager.h:307
bool DecomposeAddress(const std::string &url, bool &islocal, std::string &server, std::string &itemtname)
From address like dabc://nodeabc:988/item/subtim extracts server (with port) and itemname If server n...
Definition: Manager.cxx:1454
int fNumNodes
Definition: Manager.h:364
Reference GetFactoriesFolder(bool force=false)
Return reference on folder with factories.
Definition: Manager.h:423
virtual bool DestroyObject(Reference ref)
Delete derived from Object class object in manager thread.
Definition: Manager.cxx:1317
Reference on dabc::MemoryPool class
Definition: MemoryPool.h:245
WorkerRef GetModule() const
Definition: ModuleItem.cxx:66
Reference on dabc::Module class
Definition: Module.h:275
std::string OutputName(unsigned n=0, bool itemname=true)
Return item name of the output, can be used in connect command.
Definition: Module.cxx:1066
bool Start()
Definition: Module.h:287
PortRef FindPort(const std::string &name)
Return reference on the port.
Definition: Module.cxx:1031
bool ConnectPoolHandles()
Method called by manager to establish connection to pools TODO: while used from devices,...
Definition: Module.cxx:999
std::string InputName(unsigned n=0, bool itemname=true)
Return item name of the input, can be used in connect command.
Definition: Module.cxx:1056
Base for dabc::ModuleSync and dabc::ModuleAsync classes.
Definition: Module.h:42
bool IsRunning() const
Returns true if module is running.
Definition: Module.h:109
Multiplexer module.
posix pthread mutex
Definition: threads.h:61
Base class for most of the DABC classes.
Definition: Object.h:116
void FillFullName(std::string &fullname, Object *upto, bool exclude_top_parent=false) const
Method used to produce full item name.
Definition: Object.cxx:1061
std::string ItemName(bool compact=true) const
Produce string, which can be used as name argument in dabc::mgr.FindItem(name) call.
Definition: Object.cxx:1076
const char * GetName() const
Returns name of the object, thread safe
Definition: Object.h:295
Mutex * ObjectMutex() const
Returns mutex, used for protection of Object data members.
Definition: Object.h:190
static void InspectGarbageCollector()
Method to inspect how many object pointers remain.
Definition: Object.cxx:1099
int fObjectRefCnt
accounts how many references existing on the object, thread safe
Definition: Object.h:179
static bool NameMatch(const std::string &name, const std::string &mask)
Check if name matches to specified mask.
Definition: Object.cxx:1004
Reference FindChildRef(const char *name, bool force=false) const
returns reference on child object with given name
Definition: Object.cxx:735
bool RemoveChilds(bool cleanup=true)
Remove all childs.
Definition: Object.cxx:821
virtual void ObjectDestroyed(Object *)
Method called by the manager when registered dependent object is destroyed Should be used in user cla...
Definition: Object.h:226
bool IsLogging() const
Return true if object selected for logging, thread safe
Definition: Object.cxx:173
friend class Manager
Definition: Object.h:117
void SetCleanupBit()
Method set cleanup bit that object will be cleaned up in all registered objects Used only by manager ...
Definition: Object.cxx:297
Container for parameter object.
Definition: Parameter.h:52
void ProcessTimeout(double last_dif)
Method called from manager thread when parameter configured as asynchronous.
Definition: Parameter.cxx:216
bool IsDeliverAllEvents() const
If true, all events must be delivered to the consumer.
Definition: Parameter.cxx:67
Parameter class
Definition: Parameter.h:163
bool TakeAttrModified()
Returns true if any parameter attribute was modified since last call to this method.
Definition: Parameter.cxx:346
RecordField Value() const
Returns parameter value.
Definition: Parameter.h:202
int ExecuteChange(Command cmd)
Specifies that parameter produce 'modified' events synchronous with changes of parameter.
Definition: Parameter.cxx:508
bool NeedTimeout()
Returns true if parameter object requires timeout processing.
Definition: Parameter.cxx:339
Specialized vector with pointers.
Definition: Queue.h:340
Reference on the dabc::Port class
Definition: Port.h:195
bool IsOutput() const
Returns true if it is output port.
Definition: Port.h:202
bool IsConnected()
Returns true if port is connected.
Definition: Port.cxx:250
bool IsInput() const
Returns true if it is input port.
Definition: Port.h:199
PortRef GetBindPort()
Return reference on the bind port.
Definition: Port.cxx:242
bool Disconnect(bool witherr=false)
Disconnect port
Definition: Port.cxx:233
ConnectionRequest MakeConnReq(const std::string &url, bool isserver=false)
Create connection request to specified url.
Definition: Port.cxx:259
Module manages published hierarchies and provides optimized access to them
Definition: Publisher.h:146
static const char * DfltName()
Definition: Publisher.h:214
std::string AsStr(const std::string &dflt="") const
Definition: Record.cxx:749
std::string SaveToXml(unsigned mask=0)
Store record in XML form.
Definition: Record.cxx:1666
Reference on the arbitrary object
Definition: Reference.h:73
bool AddChild(Object *obj)
Add child to list of object children.
Definition: Reference.cxx:188
void Release()
Releases reference on the object.
Definition: Reference.cxx:138
const char * ClassName() const
Return class name of referenced object, if object not assigned, returns "---".
Definition: Reference.cxx:172
const char * GetName() const
Return name of referenced object, if object not assigned, returns "---".
Definition: Reference.cxx:167
Reference FindChild(const char *name) const
Searches for child in referenced object.
Definition: Reference.cxx:210
bool IsName(const char *name) const
Returns true if object name is the same as specified one.
Definition: Reference.cxx:177
unsigned NumReferences() const
Returns number of references on the object.
Definition: Reference.cxx:112
bool null() const
Returns true if reference contains nullptr.
Definition: Reference.h:151
std::string ItemName(bool compact=true) const
Produce string, which can be used as name argument in dabc::mgr.FindItem(name) call.
Definition: Reference.cxx:241
void Destroy()
Releases reference and starts destruction of referenced object.
Definition: Reference.cxx:148
Vector of dabc::Reference objects.
unsigned GetSize() const
Returns number of items in vector.
bool HasObject(Object *ptr)
Return true if vector has pointer on the object.
Reference TakeLast()
Remove last reference from vector.
bool Clear(bool asowner=false)
Clear all references, if owner specified objects will be destroyed.
bool Add(Reference &ref)
Add reference to the vector.
bool Remove(Object *obj)
Remove reference on specified object
Object * GetObject(unsigned n) const
Returns pointer on the object.
Repeater module.
Factory for socket classes
Definition: SocketFactory.h:30
virtual Module * CreateModule(const std::string &classname, const std::string &modulename, Command cmd)
Factory method to create module.
Definition: Manager.cxx:149
virtual DataOutput * CreateDataOutput(const std::string &typ)
Factory method to create data output.
Definition: Manager.cxx:186
virtual Reference CreateThread(Reference parent, const std::string &classname, const std::string &thrdname, const std::string &thrddev, Command cmd)
Factory method to create thread.
Definition: Manager.cxx:175
virtual Reference CreateObject(const std::string &classname, const std::string &objname, dabc::Command cmd)
Factory method to create object.
Definition: Manager.cxx:166
virtual DataInput * CreateDataInput(const std::string &typ)
Factory method to create data input.
Definition: Manager.cxx:199
StdManagerFactory(const std::string &name)
Definition: Manager.cxx:46
Reference on the dabc::Thread class
Definition: Thread.h:482
void RunEventLoop(double tmout=1.)
Runs thread event loop for specified time.
Definition: Thread.cxx:1406
bool IsRealThrd() const
Definition: Thread.h:505
bool IsItself() const
Definition: Thread.h:496
Represent thread functionality.
Definition: Thread.h:109
static unsigned NumThreadInstances()
Definition: Thread.h:331
Uniform Resource Locator interpreter.
Definition: Url.h:33
bool SetUrl(const std::string &url, bool showerr=true)
Definition: Url.cxx:46
std::string GetFullName() const
Definition: Url.cxx:124
bool GetOptionBool(const std::string &optname, bool dflt=false) const
Definition: Url.cxx:314
std::string GetFileName() const
Definition: Url.h:62
int GetOptionInt(const std::string &optname, int dflt=0) const
Definition: Url.cxx:290
std::string GetProtocol() const
Definition: Url.h:57
std::string GetHostNameWithPort(int dfltport=0) const
Definition: Url.cxx:139
std::string GetOptions() const
Definition: Url.h:63
double GetOptionDouble(const std::string &optname, double dflt=0.) const
Definition: Url.cxx:302
bool IsValid() const
Definition: Url.h:55
Reference on dabc::Worker
Definition: Worker.h:466
std::string ThreadName() const
Returns thread name of worker assigned.
Definition: Worker.h:489
RecordField Cfg(const std::string &name, Command cmd=nullptr) const
Returns configuration record of specified name.
Definition: Worker.h:482
bool Execute(Command cmd, double tmout=-1.)
Definition: Worker.cxx:1147
bool MakeThreadForWorker(const std::string &thrdname="")
Definition: Worker.h:498
bool Submit(Command cmd)
Definition: Worker.cxx:1139
Active object, which is working inside dabc::Thread.
Definition: Worker.h:116
virtual int PreviewCommand(Command cmd)
This method called before command will be executed.
Definition: Worker.cxx:650
bool ActivateTimeout(double tmout_sec)
Method used to produce timeout events in the worker.
Definition: Worker.cxx:385
@ priorityMagic
Definition: Worker.h:210
static int cmd_bool(bool v)
Definition: Worker.h:197
virtual void ProcessEvent(const EventId &)
Definition: Worker.cxx:499
bool Submit(Command cmd)
Submit command for execution in the processor.
Definition: Worker.cxx:960
bool Execute(Command cmd, double tmout=-1.)
Execute command in the processor.
Definition: Worker.cxx:877
Command Assign(Command cmd)
! Assign command with processor before command be submitted to other processor This produce ReplyComm...
Definition: Worker.cxx:933
bool MakeThreadForWorker(const std::string &thrdname="")
Creates appropriate thread for worker and assign worker to the thread.
Definition: Worker.cxx:302
std::string ThreadName() const
Returns name of the worker thread; thread-safe
Definition: Worker.cxx:186
ThreadRef thread()
Return reference on the worker thread; thread-safe.
Definition: Worker.cxx:158
bool DettachFromThread()
Detach worker from the thread, later worker can be assigned to some other thread Method especially us...
Definition: Worker.cxx:372
virtual bool ReplyCommand(Command cmd)
Reimplement this method to react on command reply Return true if command can be destroyed by framewor...
Definition: Worker.cxx:856
bool FireEvent(uint16_t evid)
Definition: Worker.h:341
#define DOUT2(args ...)
Definition: logging.h:170
#define DOUT0(args ...)
Definition: logging.h:156
#define DOUT5(args ...)
Definition: logging.h:188
#define DOUT3(args ...)
Definition: logging.h:176
#define EOUT(args ...)
Definition: logging.h:150
#define DOUT1(args ...)
Definition: logging.h:162
#define DBOOL(arg)
Definition: logging.h:191
#define DOUT4(args ...)
Definition: logging.h:182
Event manipulation API.
Definition: api.h:23
void Sleep(double tm)
Definition: timing.cxx:129
@ layoutMaximal
Definition: Manager.h:278
@ layoutPerModule
Definition: Manager.h:276
@ layoutBalanced
Definition: Manager.h:277
@ layoutMinimalistic
Definition: Manager.h:275
Hierarchy GetNodeHierarchy(const std::string &nodeaddr)
Function request hierarchy of objects on remote node.
Definition: api.cxx:162
TimeStamp Now()
Definition: timing.h:260
void formats(std::string &sbuf, const char *fmt,...)
Definition: string.cxx:26
ManagerRef mgr
Definition: Manager.cxx:42
std::string format(const char *fmt,...)
Definition: string.cxx:49
@ parConfigured
event only for manager, used to react on reconfiguration of parameter
Definition: Parameter.h:42
@ parCreated
produced once when parameter is created
Definition: Parameter.h:41
@ parModified
produced when parameter value modified. Either every change or after time interval (default = 1 sec)
Definition: Parameter.h:43
@ parDestroy
produced once when parameter is destroyed
Definition: Parameter.h:44
const char * xmlAppDfltName
Definition: ConfigBase.cxx:31
@ do_Ok
Definition: DataIO.h:142
@ do_Error
Definition: DataIO.h:147
const char * xmlThreadAttr
Definition: ConfigBase.cxx:38
const char * xmlContext
Definition: ConfigBase.cxx:29
const char * typeThread
Definition: Object.cxx:77
void SetDebugPrefix(const char *prefix=0)
Definition: logging.cxx:478
const char * xmlPoolName
Definition: Object.cxx:45
const char * xmlClassAttr
Definition: ConfigBase.cxx:36
std::string MakeNodeName(const std::string &arg)
Function creates node name, which can be supplied as receiver of dabc commands.
Definition: api.cxx:124
const char * xmlNameAttr
Definition: ConfigBase.cxx:33
@ cmd_postponed
Definition: Command.h:42
@ cmd_false
Definition: Command.h:37
@ cmd_timedout
Definition: Command.h:40
@ cmd_true
Definition: Command.h:38
const char * xmlHostAttr
Definition: ConfigBase.cxx:34
bool str_to_int(const char *val, int *res)
Convert string to integer value.
Definition: string.cxx:142
@ defaultDabcPort
Definition: ConfigBase.h:92
const char * typeApplication
Definition: Object.cxx:81
Keeps dependency between two objects.
Definition: Manager.cxx:67
Object * tgt
when this object deleted, DependPair::src will be informed
Definition: Manager.cxx:69
Reference src
reference on object which want to be informed when DependPair::tgt object is deleted
Definition: Manager.cxx:68
int fire
how to process pair: 0 - remain, 1 - inform src, 2 - just delete
Definition: Manager.cxx:70
DependPair(Object *_src, Object *_tgt)
Definition: Manager.cxx:73
DependPair(const DependPair &d)
Definition: Manager.cxx:74
Event structure, exchanged between DABC threads.
Definition: Thread.h:70
std::string asstring() const
Definition: Thread.cxx:37
uint16_t GetCode() const
Definition: Thread.h:92
Parameter par
reference on the parameter
Definition: Manager.h:318
WorkerRef recv
only workers can be receiver of the parameters events
Definition: Manager.cxx:81
std::string remote_recv
address of remote receiver of parameter events
Definition: Manager.cxx:82
bool match(const std::string &parname, int event, const std::string &fullname)
Definition: Manager.cxx:97
bool only_change
specify if only parameter-change events are produced
Definition: Manager.cxx:83
std::string name_mask
mask only for parameter names, useful when only specific names are interested
Definition: Manager.cxx:84
std::string fullname_mask
mask for parameter full names, necessary when full parameter name is important
Definition: Manager.cxx:85
int queue
number of parameters events submitted, if bigger than some limit events will be skipped
Definition: Manager.cxx:86
Class for acquiring and holding timestamps.
Definition: timing.h:40
double SpentTillNow() const
Method return time in second, spent from the time kept in TimeStamp instance If time was not set befo...
Definition: timing.h:144
bool null() const
Returns true if time stamp is not initialized or its value less than 0.
Definition: timing.h:131
bool Expired(double interval=0.) const
Method returns true if specified time interval expired relative to time, kept in TimeStamp instance.
Definition: timing.h:163
void GetNow()
Method to acquire current time stamp.
Definition: timing.h:137
#define DABC_LOCKGUARD(mutex, info)
Definition: threads.h:117