20 #include <sys/types.h>
27 #include "dabc/defines.h"
97 bool match(
const std::string &parname,
int event,
const std::string &fullname)
151 if (classname ==
"dabc::CpuInfoModule")
154 if (classname ==
"dabc::ConnectionManager")
157 if (classname ==
"dabc::MultiplexerModule")
160 if (classname ==
"dabc::RepeaterModule")
168 if (classname ==
"dabc::Publisher")
179 if (classname.empty() || (classname ==
typeThread))
180 thrd =
new Thread(parent, thrdname, cmd);
224 printf(
"Destroy manager\n");
227 printf(
"Destroy manager done\n");
286 if (!layout.empty())
DOUT0(
"Set threads layout to %s", layout.c_str());
329 DOUT3(
"~Manager -> DestroyQueue");
332 EOUT(
"Manager destroy queue is not empty");
340 EOUT(
"Manager timed parameters list not empty");
347 DOUT3(
"~Manager -> ~fDepend");
350 EOUT(
"Dependencies parameters list not empty");
354 DOUT3(
"~Manager -> ~fMgrMutex");
359 DOUT1(
"Normal EXIT");
363 EOUT(
"What ??? !!!");
389 if (halttime<=0.) halttime = 0.7;
400 thrd()->SingleLoop(0, 0.001);
401 if ((thrd()->TotalNumberOfEvents()==0) || tm1.
Expired(halttime*0.7)) thrd.
Release();
413 DOUT1(
"All threads stopped after %5.3f s (loop count = %d)", tm2-tm1, cnt);
416 #ifdef DABC_EXTRA_CHECKS
417 dabc::Object::DebugObject();
443 DependPairList::iterator iter =
fDepend->begin();
444 while (iter !=
fDepend->end()) {
445 if (iter->fire != 0) {
446 if (iter->fire & 2) {
447 clr_vect.
Add(iter->src);
448 ptr_vect.push_back(iter->tgt);
450 if (iter->fire & 1) {
451 rel_vect.
Add(iter->src);
461 for (
unsigned n=0;n<clr_vect.
GetSize();n++) {
466 DOUT0(
"Submit worker %p to thread", obj);
468 cmd.
SetPtr(
"#Object", ptr_vect[n]);
471 if (w->
Submit(cmd))
continue;
482 if (vect==0)
return false;
493 DOUT3(
"MGR: Process destroy QUEUE vect = %u", (vect ? vect->
GetSize() : 0));
569 bool checkadd(
false), checkremove(
false);
578 checkremove = !checkadd;
591 rec.
par()->SetCleanupBit();
617 std::string fullname, value;
627 if (value.length()==0)
630 if (iter->queue > 1000) {
631 EOUT(
"Too many events for receiver %s - block any following", iter->recv.GetName());
635 if (!iter->recv.null() && !iter->recv.CanSubmitCommand()) {
636 DOUT4(
"receiver %s cannot be used to submit command - ignore", iter->recv.GetName());
643 evnt.
SetPtr(
"#Iterator", &(*iter));
644 evnt.
SetBool(
"#no_warnings",
true);
650 if (iter->remote_recv.length() > 0) {
654 iter->recv.Submit(evnt);
699 std::string server, itemname;
705 if (!islocal)
return 0;
708 if (itemname.empty())
return 0;
752 if (!compact) itemname =
"/";
766 std::string server,itemname;
770 if (cmd.
GetBool(
"#local_cmd")) islocal =
true;
773 DOUT3(
"MGR: Preview command %s item %s tgtnode %s", cmd.
GetName(), url.c_str(), server.c_str());
779 EOUT(
"Cannot submit command to remote server %s", server.c_str());
783 if (!itemname.empty()) {
787 if (!worker.
null()) {
793 if (cmd.
IsName(CmdSetParameter::CmdName())) {
802 EOUT(
"Did not found receiver %s for command %s", itemname.c_str(), cmd.
GetName());
812 #define FOR_EACH_FACTORY(arg) \
814 ReferencesVector factories; \
815 GetFactoriesFolder().GetAllChildRef(&factories); \
816 Factory* factory = nullptr; \
817 for (unsigned n=0; n<factories.GetSize(); n++) { \
818 factory = dynamic_cast<Factory*> (factories.GetObject(n)); \
819 if (factory==0) continue; \
829 DOUT4(
"Module name %s already exists", modulename.c_str());
834 mdl = factory->CreateModule(classname, modulename, cmd);
835 if (!mdl.
null())
break;
839 EOUT(
"Cannot create module of type %s", classname.c_str());
845 if (thrdname.empty())
851 default: thrdname = modulename +
"Thrd";
break;
869 ref = factory->CreateObject(classname, objname, cmd);
870 if (!ref.
null())
break;
891 if (cmd.
IsName(
"InitFactories")) {
893 factory->Initialize();
897 if (cmd.
IsName(CmdCreateApplication::CmdName())) {
898 std::string classname = cmd.
GetStr(
"AppClass");
905 DOUT2(
"Application of class %s already exists", classname.c_str());
911 appref = factory->CreateApplication(classname, cmd);
912 if (!appref.
null())
break;
938 if (appref.
null())
EOUT(
"Cannot create application of class %s", classname.c_str());
939 else DOUT2(
"Application of class %s thrd %s created", classname.c_str(), appthrd.c_str());
944 std::string classname = cmd.
GetStr(
"DevClass");
945 std::string devname = cmd.
GetStr(
"DevName");
946 if (devname.empty()) devname = classname;
954 DOUT4(
"Device %s of class %s already exists", devname.c_str(), classname.c_str());
957 EOUT(
"Device %s has other class name %s than requested", devname.c_str(), dev.
ClassName(), classname.c_str());
961 dev = factory->CreateDevice(classname, devname, cmd);
962 if (!dev.
null())
break;
966 EOUT(
"Cannot create device %s of class %s", devname.c_str(), classname.c_str());
971 if (thrdname.empty())
977 default: thrdname = devname +
"Thrd";
break;
986 if ((cmd_res==
cmd_true) && !devname.empty()) {
992 std::string devname = cmd.
GetStr(
"DevName");
1000 if (cmd.
IsName(CmdCreateTransport::CmdName())) {
1006 std::string portname = crcmd.
PortName();
1009 if (trkind.empty()) {
1010 trkind = port.
Cfg(
"url", cmd).
AsStr();
1015 bool hasoptions = url.
GetOptions().length() > 0;
1017 for (
int cnt = 0; cnt < 3; cnt++) {
1018 std::string optname =
"urlopt";
1020 std::string tropt = port.
Cfg(optname, cmd).
AsStr();
1021 if (tropt.length() > 0) {
1022 trkind.append(hasoptions ?
"&" :
"?");
1023 trkind.append(tropt);
1031 EOUT(
"Ports %s not found - cannot create transport", crcmd.
PortName().c_str());
1035 if (trkind.empty()) {
1048 cmd.
SetStr(
"url", trkind);
1054 if (!port2.
null()) {
1067 DOUT1(
"%s create %s", port.
ItemName().c_str(), trkind.c_str());
1071 tr = factory->CreateTransport(port, trkind, cmd);
1072 if (!tr.
null())
break;
1076 if (portname != crcmd.
PortName()) {
1077 DOUT0(
"Original port name %s for created transport was changed on the fly by %s", portname.c_str(), crcmd.
PortName().c_str());
1084 if (thrdname.empty())
1093 DOUT3(
"Creating thread %s for transport", thrdname.c_str());
1096 EOUT(
"Fail to create thread for transport");
1111 if (cmd.
IsName(CmdDestroyTransport::CmdName())) {
1117 if (cmd.
IsName(CmdCreateAny::CmdName())) {
1120 res = factory->CreateAny(cmd.
GetStr(
"ClassName"), cmd.
GetStr(
"ObjectName"), cmd);
1121 if (res != 0)
break;
1123 cmd.
SetPtr(
"ObjectPtr", res);
1129 const std::string &thrdclass = cmd.
GetStr(
"ThrdClass");
1130 const std::string &thrddev = cmd.
GetStr(
"ThrdDev");
1142 if (cmd.
IsName(CmdCreateMemoryPool::CmdName())) {
1145 if (cmd.
IsName(CmdCreateObject::CmdName())) {
1149 if (cmd.
IsName(CmdCreateDataInput::CmdName())) {
1150 std::string kind = cmd.
GetStr(
"Kind");
1153 res = factory->CreateDataInput(kind);
1154 if (res != 0)
break;
1156 cmd.
SetPtr(
"DataInput", res);
1159 if (cmd.
IsName(CmdCleanupApplication::CmdName())) {
1167 if (name.compare(
"*")==0)
1186 DOUT2(
"Stop and delete module done");
1192 if (cmd.
IsName(
"Print")) {
1205 if (cmd.
IsName(CmdGetNodesState::CmdName())) {
1210 if (cmd.
IsName(
"Ping")) {
1211 DOUT2(
"!!! PING !!!");
1214 if (cmd.
IsName(
"ParameterEventSubscription")) {
1216 std::string mask = cmd.
GetStr(
"Mask");
1217 std::string remote = cmd.
GetStr(
"RemoteWorker");
1219 if (cmd.
GetBool(
"IsSubscribe")) {
1233 if ((iter->name_mask==mask) && (iter->recv == worker) && (iter->remote_recv == remote))
1241 if (cmd.
IsName(
"StopManagerMainLoop")) {
1257 if (cmd.
null())
return false;
1260 if (poolname.empty()) {
1261 EOUT(
"Pool name is not specified");
1269 ref()->Reconstruct(cmd);
1272 ref()->AssignToThread(
thread());
1283 if (cmd.
IsName(CmdParameterEvent::CmdName())) {
1285 void* origin = cmd.
GetPtr(
"#Iterator");
1290 if (&(*iter) == origin) {
1293 DOUT2(
"Internal error - parameters event queue negative");
1298 DOUT2(
"Did not find original record with receiver for parameter events");
1303 if (cmd.
IsName(CmdStateTransition::CmdName())) {
1321 if (!obj)
return true;
1324 DOUT0(
"dabc::Manager::DestroyObject %p %s", obj, obj->
GetName());
1332 DependPairList::iterator iter =
fDepend->begin();
1333 while (iter !=
fDepend->end()) {
1334 if (iter->src() == obj) {
1335 iter->fire = iter->fire | 1;
1337 DOUT0(
"Find object %p as dependency source", obj);
1340 if (iter->tgt == obj) {
1341 iter->fire = iter->fire | 2;
1343 DOUT0(
"Find object %p as dependency target", obj);
1377 if (tmout<=0.)
return;
1383 fprintf(stdout,
"%s ", prefix);
1384 int sec = lrint(tmout);
1386 fprintf(stdout,
"\b\b\b%3d", sec);
1390 fprintf(stdout,
"\n");
1396 fprintf(stdout,
"%s ", prefix);
1403 while ((remain = finish -
dabc::Now()) > 0) {
1406 fprintf(stdout,
"\b\b\b%3d", (
int) lrint(remain));
1414 fprintf(stdout,
"\b\b\b\n");
1425 if ((nodeid<0) || (nodeid>=
fNumNodes))
return std::string();
1427 if (!
fCfg)
return std::string();
1441 std::string res = server;
1443 if (res.empty()) res =
"localhost";
1445 if (res.find(
"dabc://")!=0) res = std::string(
"dabc://") + res;
1447 if (!itemname.empty()) {
1448 if (itemname[0]!=
'/') res +=
"/";
1461 if (!url.
SetUrl(addr,
false))
return false;
1470 if (url.
GetProtocol().compare(
"dabc")!=0)
return false;
1477 if (server.compare(0, 4,
"node")==0) {
1478 if (!
str_to_int(server.c_str() + 4, &nodeid)) nodeid = -1;
1483 if ((nodeid>=0) && (
fNodeId==nodeid)) islocal =
true;
else
1485 if (server ==
"localhost") islocal =
true;
1495 if ((src==0) || (tgt==0))
return false;
1509 if (!bidirectional)
return true;
1516 if ((src==0) || (tgt==0))
return false;
1522 DependPairList::iterator iter =
fDepend->begin();
1523 while (iter !=
fDepend->end()) {
1524 if ((iter->src() == src) && (iter->tgt == tgt)) {
1536 if (!bidirectional)
return true;
1563 if (factory==0)
return;
1594 DOUT0(
"Process CTRL-C signal");
1604 if (spent<10.)
return;
1606 DOUT0(
"Ctrl-C repeated more than after 10 sec out of main loop - force manager halt");
1610 DOUT0(
"Exit after Ctrl-C");
1620 DOUT0(
"Process signal SIGPIPE - Socket error from plug-in libraries?");
1633 DOUT2(
"Enter dabc::Manager::RunManagerMainLoop");
1636 if (thrd.
null())
return;
1639 DOUT1(
"Manager stopped before entering to the mainloop - stop running");
1644 DOUT0(
"Application mainloop will run for %3.1f s", runtime);
1646 DOUT0(
"Application mainloop is now running");
1647 DOUT0(
" Press Ctrl-C for stop");
1650 DOUT3(
"Manager has normal thread - just wait until application modules are stopped");
1652 DOUT3(
"Run manager thread mainloop inside main process");
1661 bool appstopped =
false;
1667 double period = 0.1;
1680 if ((runtime <= 0) || !starttm.
Expired(runtime))
continue;
1681 DOUT0(
"++++++++ Set stop time while runtime expired");
1695 DOUT2(
"Exit dabc::Manager::RunManagerMainLoop");
1706 FD_SET(STDIN_FILENO, &fds);
1707 select(STDIN_FILENO+1, &fds, NULL, NULL, &tv);
1708 return (FD_ISSET(STDIN_FILENO, &fds));
1715 DOUT0(
"Enter dabc::Manager::RunManagerCmdLoop");
1719 if (thrd.
null())
return;
1721 if (GetCommandChannel().
null()) {
1722 EOUT(
"No command channel found");
1727 DOUT3(
"Manager has normal thread - just wait until application modules are stopped");
1729 DOUT3(
"Run manager thread mainloop inside main process");
1733 ActivateTimeout(1.);
1738 std::string tgtnode;
1753 if ((runtime>0) && start.
Expired(runtime)) {
1754 DOUT0(
"run time is over");
1758 if (!fMgrStoppedTime.null()) {
1759 DOUT0(
"break command shell");
1765 if (first && !remnode.empty()) {
1767 str = std::string(
"connect ") + remnode;
1768 printf(
"cmd>%s\n",str.c_str());
1772 printf(
"cmd>"); fflush(stdout);
1773 std::getline(std::cin, str);
1776 if (str.empty())
continue;
1778 if ((str==
"quit") || (str==
"exit") || (str==
".q") || (str==
"q"))
break;
1782 EOUT(
"Wrong syntax %s", str.c_str());
1786 if (cmd.
IsName(
"connect")) {
1790 EOUT(
"Node name not specified correctly");
1800 int res = GetCommandChannel().Execute(cmd2);
1803 DOUT0(
"Connection to %s done", tgtnode.c_str());
1805 DOUT0(
"FAIL connection to %s", tgtnode.c_str());
1812 if (tgtnode.empty()) {
1813 DOUT0(
"Tgt node not connected, command %s not executed", cmd.
GetName());
1818 cmd.
SetStr(
"host", tgtnode);
1819 GetCommandChannel().Execute(cmd);
1825 if (cmd.
IsName(
"update")) {
1833 if (!rem_hierarchy.
null())
1836 DOUT0(
"No hierarchy available");
1841 std::string path = cmd.
GetStr(
"Arg0");
1842 int hlimit = cmd.
GetInt(
"Arg1");
1845 if (hlimit>0) query =
dabc::format(
"history=%d",hlimit);
1850 if (GetCommandChannel().Execute(cmd2)!=
cmd_true) {
1851 DOUT0(
"Fail to get item %s", path.c_str());
1860 DOUT0(
"GET:%s len:%d RES = \n%s", path.c_str(), hlimit, res.
SaveToXml().c_str());
1869 int res = GetCommandChannel().Execute(cmd);
1883 if (!fCfgHost.empty())
1886 if (fCfgHost != GetName())
1903 if (ref.
null())
return ref;
1905 if (!required_class.empty() && !ref()->CompatibleClass(required_class)) ref.
Release();
1914 if (GetThreadsFolder().GetAllChildRef(&vect))
1925 std::string newname = thrdname;
1930 while ((basecnt<1000) && !thrd.
null()) {
1931 if (thrd()->CompatibleClass(thrdclass))
return thrd;
1933 EOUT(
"Thread %s of class %s exists and incompatible with %s class",
1934 thrdname.c_str(), thrd()->ClassName(), (thrdclass.empty() ?
"---" : thrdclass.c_str()) );
1936 newname =
dabc::format(
"%s_%d", thrdname.c_str(), basecnt++);
1937 thrd = FindThread(newname);
1941 EOUT(
"Too many incompatible threads with name %s", thrdname.c_str());
1945 DOUT3(
"CreateThread %s of class %s, is any %p", newname.c_str(), (thrdclass.empty() ?
"---" : thrdclass.c_str()), thrd());
1948 thrd = factory->CreateThread(GetThreadsFolder(
true), thrdclass, newname, thrddev, cmd);
1949 if (!thrd.
null())
break;
1952 DOUT3(
"CreateThread %s done %p", newname.c_str(), thrd());
1954 bool noraml_thread =
true;
1955 if ((newname == MgrThrdName()) && cfg())
1956 noraml_thread = cfg()->NormalMainThread();
1961 if (!thrd()->Start(10, noraml_thread)) {
1962 EOUT(
"Thread %s cannot be started!!!", newname.c_str());
1966 DOUT3(
"Create thread %s of class %s thrd %p refcnt %d done", newname.c_str(), (thrdclass.empty() ?
"---" : thrdclass.c_str()), thrd(), thrd.
NumReferences());
1997 return GetObject() ? GetObject()->CurrentThread() :
dabc::ThreadRef();
2015 return GetObject() ? GetObject()->FindDevice(name) :
dabc::WorkerRef();
2020 return GetObject() ? GetObject()->FindModule(name) :
dabc::ModuleRef();
2056 return GetObject() ? GetObject()->FindItem(name) :
Reference();
2061 return GetObject() ? GetObject()->FindPort(portname) :
Reference();
2066 return GetObject() ? GetObject()->FindPool(name) :
Reference();
2071 return GetObject() ? GetObject()->FindItem(parname) :
Reference();
2088 return GetObject() ? GetObject()->NodeId() : 0;
2093 return GetObject() ? GetObject()->NumNodes() : 0;
2098 if (ptr == 0)
return false;
2104 std::string server, itemname;
2107 if (!DecomposeAddress(mask, islocal, server, itemname)) {
2108 EOUT(
"Wrong parameter mask %s", mask.c_str());
2112 Command cmd(
"ParameterEventSubscription");
2114 cmd.
SetBool(
"IsSubscribe", subscribe);
2115 cmd.
SetStr(
"Mask", mask);
2116 cmd.
SetBool(
"OnlyChange", onlychangeevent);
2120 cmd.
SetPtr(
"Worker", ptr);
2121 return Execute(cmd);
2134 if (!url.
SetUrl(name,
false))
return true;
2146 PortRef port1 = FindPort(port1name);
2147 PortRef port2 = FindPort(port2name);
2149 if (!port1.
null() && !port2.
null()) {
2157 if (IsLocalItem(port1name) && port1.
null()) {
2158 EOUT(
"Did not found port %s", port1name.c_str());
2162 if (IsLocalItem(port2name) && port2.
null()) {
2163 EOUT(
"Did not found port %s", port2name.c_str());
2169 if (GetObject()->GetCommandChannel().
null()) {
2170 EOUT(
"Not possible to establish remote connection without command channel");
2179 DOUT2(
"Connect ports %s %p %s %p", port1name.c_str(), port1(), port2name.c_str(), port2());
2200 if (sync) SyncWorker();
2203 if (conn.
null())
return true;
2212 PortRef port = FindPort(portname);
2214 if (port.
null())
return false;
2222 cmd.
SetStr(
"ClassName", classname);
2223 cmd.
SetStr(
"ObjectName", objname);
2225 if (Execute(cmd) !=
cmd_true)
return 0;
2227 return cmd.
GetPtr(
"ObjectPtr");
2240 unsigned buffersize,
2241 unsigned numbuffers)
2246 cmd.
SetMem(buffersize, numbuffers);
2248 return Execute(cmd);
2257 return cmd.
GetRef(
"Object");
2263 cmd.
SetStr(
"Kind", kind);
2264 if (!Execute(cmd))
return 0;
2272 GetObject()->Sleep(tmout, prefix);
2279 if (
null())
return false;
2282 if (!ref.
null())
return true;
2285 cmd.
SetBool(
"WithServer", withserver);
2286 cmd.
SetBool(
"ClientsAllowed", allow_clients);
2290 if (GetObject()->cfg()) {
2291 port = GetObject()->cfg()->MgrPort();
2292 host = GetObject()->cfg()->MgrHost();
2295 if (port<=0) port = serv_port;
2297 cmd.
SetStr(
"ServerHost", host);
2298 cmd.
SetInt(
"ServerPort", port);
2303 if (ref.
null())
return false;
2305 std::string localaddr = cmd.
GetStr(
"localaddr");
2306 if (!localaddr.empty() )
2307 GetObject()->fLocalAddress = localaddr;
2318 if (!ref.
null())
return true;
#define FOR_EACH_FACTORY(arg)
dabc::AutoDestroyClass auto_destroy_instance
Reference on dabc::Application class
Base class for user-specific applications.
void * ExternalFunction()
static const char * stHalted()
Binary file output object.
unsigned Write_Buffer(Buffer &buf)
Start writing of buffer to output.
BlockingOutput(const dabc::Url &url)
int fErrCounter
counter till error
Reference on memory from memory pool.
static const char * CmdName()
void SetMem(unsigned buffersize, unsigned numbuffers, unsigned align=0)
static const char * CmdName()
static const char * ThrdNameArg()
std::string GetThrdName() const
static const char * CmdName()
std::string PortName() const
std::string TransportKind() const
static const char * CmdName()
static const char * CmdName()
static const char * CmdName()
Command used to produce custom binary data for published in hierarchy entries.
static const char * ModuleArg()
This command used to distribute parameter event to receivers.
static const char * CmdName()
static const char * CmdName()
Represents command with its arguments.
bool ReadFromCmdString(const std::string &str)
Read command from string, which is typed in std output.
void SetPtr(const std::string &name, void *p)
Set pointer argument for the command.
unsigned GetUInt(const std::string &name, unsigned dflt=0) const
bool SetStr(const std::string &name, const char *value)
bool SetBool(const std::string &name, bool v)
bool SetInt(const std::string &name, int v)
std::string GetStr(const std::string &name, const std::string &dflt="") const
Command & SetTimeout(double tm)
Set maximum time which can be used for command execution.
Command & SetReceiver(const std::string &itemname)
These methods prepare command so, that one can submit command to the manager like: dabc::mgr....
bool GetBool(const std::string &name, bool dflt=false) const
Reference GetRef(const std::string &name)
Returns reference from the command, can be called only once.
int GetInt(const std::string &name, int dflt=0) const
Buffer GetRawData()
Returns reference on raw data. Can be called only once - raw data reference will be cleaned.
void SetPriority(int prio)
Set command priority, defines how fast command should be treated. In special cases priority allows to ...
std::string GetReceiver() const
bool SetRef(const std::string &name, Reference ref)
Set reference to the command.
void * GetPtr(const std::string &name, void *deflt=0) const
Get pointer argument from the command.
std::string NodeName(unsigned id)
returns nodename of specified context
Interface class between xml configuration and dabc objects.
bool FindItem(const char *name)
bool CheckAttr(const char *name, const char *value)
Check if item, found by FindItem routine, has attribute with specified value.
Full-functional class for reading configuration from xml files.
double GetHaltTime()
Returns time, required to halt DABC process.
std::string ThreadsLayout()
std::string MgrHost() const
std::string InitFuncName()
static std::string GetLocalHost()
Connections manager class.
void SetConnDevice(const std::string &dev)
std::string GetConnDevice() const
Device name which may be used to create connection (depends on url)
Module provides CPU information
Interface for implementing any kind of data output.
Reference on dabc::Device class
Factory for user-specific classes
static void * FindSymbol(const std::string &symbol)
virtual Reference CreateObject(const std::string &classname, const std::string &objname, Command cmd)
Factory method to create object.
Represents objects hierarchy of remote (or local) DABC process.
uint64_t GetVersion() const
Returns actual version of hierarchy entry.
void Create(const std::string &name, bool withmutex=false)
Create top-level object with specified name.
bool ReadFromBuffer(const dabc::Buffer &buf)
Read hierarchy from buffer.
void SetVersion(uint64_t v)
Change version of the item, only for advanced usage.
Iterator over objects hierarchy
bool next_cast(T *&ptr, bool goinside=true)
static int ConnectPorts(Reference port1ref, Reference port2ref, Command cmd=nullptr)
Lock guard for posix mutex.
virtual void LogFile(const char *fname)
static Logger * Instance()
static void DisableLogReopen()
static void CheckTimeout()
Reference on manager object
WorkerRef FindDevice(const std::string &name)
void StopModule(const std::string &modulename)
bool ActivateConnections(double tmout, bool sync=true)
ThreadRef CurrentThread()
Returns reference on the thread, which is now active.
DataInput * CreateDataInput(const std::string &kind)
Create data input, using factories methods.
Reference FindPool(const std::string &name)
ModuleRef CreateModule(const std::string &classname, const std::string &modulename, const std::string &thrdname="")
bool CreateTransport(const std::string &portname, const std::string &transportkind="", const std::string &thrdname="")
Reference CreateObject(const std::string &classname, const std::string &objname)
Reference FindPort(const std::string &port)
ConnectionRequest Connect(const std::string &port1, const std::string &port2)
Request connection between two ports.
bool DeleteModule(const std::string &name)
void Sleep(double tmout, const char *prefix=0)
Sleep for specified time, keep thread event loop running. See Manager::Sleep() method for more details...
void StartModule(const std::string &modulename)
Parameter FindPar(const std::string &parname)
bool DeletePool(const std::string &name)
bool CleanupApplication()
Method safely deletes all object created for application - modules, devices, memory pools.
Reference FindItem(const std::string &name)
bool DeleteDevice(const std::string &devname)
bool ParameterEventSubscription(Worker *ptr, bool subscribe, const std::string &mask, bool onlychangeevent=true)
bool CreateApplication(const std::string &classname="", const std::string &appthrd="")
bool IsLocalItem(const std::string &name)
Returns true, if name of the item should specify name in local context or from remote node.
bool CreateMemoryPool(const std::string &poolname, unsigned buffersize=0, unsigned numbuffers=0)
Generic method to create memory pool.
void * CreateAny(const std::string &classname, const std::string &objname="")
ModuleRef FindModule(const std::string &name)
bool CreateDevice(const std::string &classname, const std::string &devname)
bool CreatePublisher()
Create publisher, which manage all published hierarchies.
bool CreateControl(bool withserver, int serv_port=0, bool allow_clients=true)
Create command channel. Parameter withserver defines if server socket will be created,...
ThreadRef CreateThread(const std::string &thrdname, const std::string &classname="", const std::string &devname="")
Manager of everything in DABC
WorkerRef DoCreateModule(const std::string &classname, const std::string &modulename, Command cmd)
ModuleRef FindModule(const std::string &name)
static const char * ConnMgrName()
bool IsAnyModuleRunning()
virtual int PreviewCommand(Command cmd)
This method called before command will be executed.
std::string ComposeAddress(const std::string &server, const std::string &itemtname="")
Provides string, which can be used as receiver argument.
void Sleep(double tmout, const char *prefix=0)
Perform sleeping with event loop running.
void ProduceParameterEvent(ParameterContainer *par, int evid)
bool fAppFinished
when true, reply from application was received
std::string GetLastCreatedDevName()
virtual void Print(int lvl=0)
Displays on std output list of running threads and modules.
Reference DoCreateObject(const std::string &classname, const std::string &objname="", Command cmd=nullptr)
std::string fLastCreatedDevName
name of last created device, automatically used for connection establishing
static void ProcessFactory(Factory *factory)
ThreadsLayout fThrLayout
defines distribution of threads
static Factory * fFirstFactories[10]
first factories, which are coming before manager is created
virtual bool Find(ConfigIO &cfg)
Method to locate object in xml file.
WorkerRef FindDevice(const std::string &name)
bool UnregisterDependency(Object *src, Object *tgt, bool bidirectional=false)
Unregister dependency between objects.
static int fInstanceId
magic number which indicates that instance is initialized
Reference FindItem(const std::string &itemname, bool islocal=false)
Find object in manager hierarchy with specified itemname.
static const char * AppThrdName()
TimeStamp fMgrStoppedTime
indicate when manager mainloop was stopped
void RunManagerCmdLoop(double runtime=0., const std::string &remnode="")
Runs manager command loop - when command shell is used.
ReferencesVector * fDestroyQueue
virtual void ProcessEvent(const EventId &)
bool ProcessParameterEvents()
ThreadRef FindThread(const std::string &name, const std::string &required_class="")
static const char * CmdChlName()
bool ProcessDestroyQueue()
Reference FindPort(const std::string &name)
virtual int ExecuteCommand(Command cmd)
Main method where commands are executed.
static void SetAutoDestroy(bool on)
Method sets flag whether manager instance should be destroyed when process finishes.
virtual bool ReplyCommand(Command cmd)
Reimplement this method to react on command reply. Return true if command can be destroyed by framewor...
bool DoCleanupApplication()
ParamEventReceiverList * fParEventsReceivers
list of workers, registered for receiving of parameter events. Used only from manager thread,...
ReferencesVector * fTimedPars
static const char * MgrThrdName()
bool DoCreateMemoryPool(Command cmd)
bool RegisterDependency(Object *src, Object *tgt, bool bidirectional=false)
Register dependency between objects.
static int fFirstFactoriesId
magic number which should be set when fFirstFactories initialized for the first time
RecordsQueue< ParamRec > fParsQueue
void ProcessCtrlCSignal()
Reference GetThreadsFolder(bool force=false)
Return reference on folder with all registered threads.
Reference FindPool(const std::string &name)
WorkerRef GetCommandChannel()
Return reference on command channel.
void RunManagerMainLoop(double runtime=0.)
Runs manager mainloop.
std::string GetNodeAddress(int nodeid)
Return address of the node to be able communicate with it.
virtual double ProcessTimeout(double last_diff)
std::string fLocalAddress
Identifier for the current application. Only set when control instance is created. Depending on configu...
ThreadRef CurrentThread()
void HaltManager()
Delete all modules and stop manager thread.
std::string GetLocalAddress()
Return address of current application.
ThreadsLayout GetThreadsLayout() const
void FillItemName(const Object *ptr, std::string &itemname, bool compact=true)
Method should be used to produce name of object, which can be used as item name in different Find met...
Configuration * cfg() const
ThreadRef DoCreateThread(const std::string &thrdname, const std::string &classname="", const std::string &devname="", Command cmd=nullptr)
Create thread with specified name and class name.
static Manager * fInstance
pointer on current manager instance
bool DecomposeAddress(const std::string &url, bool &islocal, std::string &server, std::string &itemtname)
From address like dabc://nodeabc:988/item/subitem extracts server (with port) and itemname. If server n...
Reference GetFactoriesFolder(bool force=false)
Return reference on folder with factories.
virtual bool DestroyObject(Reference ref)
Delete derived from Object class object in manager thread.
Reference on dabc::MemoryPool class
WorkerRef GetModule() const
Reference on dabc::Module class
std::string OutputName(unsigned n=0, bool itemname=true)
Return item name of the output, can be used in connect command.
PortRef FindPort(const std::string &name)
Return reference on the port.
bool ConnectPoolHandles()
Method called by manager to establish connection to pools. TODO: while used from devices,...
std::string InputName(unsigned n=0, bool itemname=true)
Return item name of the input, can be used in connect command.
Base for dabc::ModuleSync and dabc::ModuleAsync classes.
bool IsRunning() const
Returns true if module is running.
Base class for most of the DABC classes.
void FillFullName(std::string &fullname, Object *upto, bool exclude_top_parent=false) const
Method used to produce full item name.
std::string ItemName(bool compact=true) const
Produce string, which can be used as name argument in dabc::mgr.FindItem(name) call.
const char * GetName() const
Returns name of the object, thread safe
Mutex * ObjectMutex() const
Returns mutex, used for protection of Object data members.
static void InspectGarbageCollector()
Methods to inspect how many object pointers remain.
int fObjectRefCnt
accounts how many references existing on the object, thread safe
static bool NameMatch(const std::string &name, const std::string &mask)
Check if name matches to specified mask.
Reference FindChildRef(const char *name, bool force=false) const
returns reference on child object with given name
bool RemoveChilds(bool cleanup=true)
Remove all childs.
virtual void ObjectDestroyed(Object *)
Method called by the manager when registered dependent object is destroyed. Should be used in user cla...
bool IsLogging() const
Return true if object selected for logging, thread safe
void SetCleanupBit()
Method sets cleanup bit so that object will be cleaned up in all registered objects. Used only by manager ...
Container for parameter object.
void ProcessTimeout(double last_dif)
Method called from manager thread when parameter configured as asynchronous.
bool IsDeliverAllEvents() const
If true, all events must be delivered to the consumer.
bool TakeAttrModified()
Returns true if any parameter attribute was modified since last call to this method.
RecordField Value() const
Returns parameter value.
int ExecuteChange(Command cmd)
Specifies that parameter produce 'modified' events synchronous with changes of parameter.
bool NeedTimeout()
Returns true if parameter object requires timeout processing.
Specialized vector with pointers.
Reference on the dabc::Port class
bool IsOutput() const
Returns true if it is output port.
bool IsConnected()
Returns true if port is connected.
bool IsInput() const
Returns true if it is input port.
PortRef GetBindPort()
Return reference on the bind port.
bool Disconnect(bool witherr=false)
Disconnect port
ConnectionRequest MakeConnReq(const std::string &url, bool isserver=false)
Create connection request to specified url.
Module manages published hierarchies and provides optimized access to them
static const char * DfltName()
std::string AsStr(const std::string &dflt="") const
std::string SaveToXml(unsigned mask=0)
Store record in XML form.
Reference on the arbitrary object
bool AddChild(Object *obj)
Add child to list of object children.
void Release()
Releases reference on the object.
const char * ClassName() const
Return class name of referenced object, if object not assigned, returns "---".
const char * GetName() const
Return name of referenced object, if object not assigned, returns "---".
Reference FindChild(const char *name) const
Searches for child in referenced object.
bool IsName(const char *name) const
Returns true if object name is the same as specified one.
unsigned NumReferences() const
Returns number of references on the object.
bool null() const
Returns true if reference contains nullptr.
std::string ItemName(bool compact=true) const
Produce string, which can be used as name argument in dabc::mgr.FindItem(name) call.
void Destroy()
Releases reference and starts destruction of referenced object.
Vector of dabc::Reference objects.
unsigned GetSize() const
Returns number of items in vector.
bool HasObject(Object *ptr)
Return true if vector has pointer on the object.
Reference TakeLast()
Remove last reference from vector.
bool Clear(bool asowner=false)
Clear all references, if owner specified objects will be destroyed.
bool Add(Reference &ref)
Add reference to the vector.
bool Remove(Object *obj)
Remove reference on specified object
Object * GetObject(unsigned n) const
Returns pointer on the object.
Factory for socket classes
virtual Module * CreateModule(const std::string &classname, const std::string &modulename, Command cmd)
Factory method to create module.
virtual DataOutput * CreateDataOutput(const std::string &typ)
Factory method to create data output.
virtual Reference CreateThread(Reference parent, const std::string &classname, const std::string &thrdname, const std::string &thrddev, Command cmd)
Factory method to create thread.
virtual Reference CreateObject(const std::string &classname, const std::string &objname, dabc::Command cmd)
Factory method to create object.
virtual DataInput * CreateDataInput(const std::string &typ)
Factory method to create data input.
StdManagerFactory(const std::string &name)
Reference on the dabc::Thread class
void RunEventLoop(double tmout=1.)
Runs thread event loop for specified time.
Represent thread functionality.
static unsigned NumThreadInstances()
Uniform Resource Locator interpreter.
bool SetUrl(const std::string &url, bool showerr=true)
std::string GetFullName() const
bool GetOptionBool(const std::string &optname, bool dflt=false) const
std::string GetFileName() const
int GetOptionInt(const std::string &optname, int dflt=0) const
std::string GetProtocol() const
std::string GetHostNameWithPort(int dfltport=0) const
std::string GetOptions() const
double GetOptionDouble(const std::string &optname, double dflt=0.) const
Reference on dabc::Worker
std::string ThreadName() const
Returns thread name of worker assigned.
RecordField Cfg(const std::string &name, Command cmd=nullptr) const
Returns configuration record of specified name.
bool Execute(Command cmd, double tmout=-1.)
bool MakeThreadForWorker(const std::string &thrdname="")
Active object, which is working inside dabc::Thread.
virtual int PreviewCommand(Command cmd)
This method is called before command is executed.
bool ActivateTimeout(double tmout_sec)
Method used to produce timeout events in the worker.
static int cmd_bool(bool v)
virtual void ProcessEvent(const EventId &)
bool Submit(Command cmd)
Submit command for execution in the processor.
bool Execute(Command cmd, double tmout=-1.)
Execute command in the processor.
Command Assign(Command cmd)
Assign command with processor before command is submitted to other processor. This produce ReplyComm...
bool MakeThreadForWorker(const std::string &thrdname="")
Creates appropriate thread for worker and assign worker to the thread.
std::string ThreadName() const
Returns name of the worker thread; thread-safe
ThreadRef thread()
Return reference on the worker thread; thread-safe.
bool DettachFromThread()
Detach worker from the thread; later worker can be assigned to some other thread. Method especially us...
virtual bool ReplyCommand(Command cmd)
Reimplement this method to react on command reply. Return true if command can be destroyed by framewor...
bool FireEvent(uint16_t evid)
Hierarchy GetNodeHierarchy(const std::string &nodeaddr)
Function request hierarchy of objects on remote node.
void formats(std::string &sbuf, const char *fmt,...)
std::string format(const char *fmt,...)
@ parConfigured
event only for manager, used to react on reconfiguration of parameter
@ parCreated
produced once when parameter is created
@ parModified
produced when parameter value is modified, either on every change or after a time interval (default = 1 sec)
@ parDestroy
produced once when parameter is destroyed
const char * xmlAppDfltName
const char * xmlThreadAttr
void SetDebugPrefix(const char *prefix=0)
const char * xmlClassAttr
std::string MakeNodeName(const std::string &arg)
Function creates node name, which can be supplied as receiver of dabc commands.
bool str_to_int(const char *val, int *res)
Convert string to integer value.
const char * typeApplication
Keeps dependency between two objects.
Object * tgt
when this object is deleted, DependPair::src will be informed
Reference src
reference on object which wants to be informed when DependPair::tgt object is deleted
int fire
how to process the pair: 0 - remain, 1 - inform src, 2 - just delete
DependPair(Object *_src, Object *_tgt)
DependPair(const DependPair &d)
Event structure, exchanged between DABC threads.
std::string asstring() const
Parameter par
reference on the parameter
WorkerRef recv
only workers can be receiver of the parameters events
std::string remote_recv
address of remote receiver of parameter events
bool match(const std::string &parname, int event, const std::string &fullname)
bool only_change
specify if only parameter-change events are produced
std::string name_mask
mask only for parameter names, useful when only specific names are of interest
std::string fullname_mask
mask for parameter full names, necessary when full parameter name is important
int queue
number of parameter events submitted; if bigger than some limit, events will be skipped
Class for acquiring and holding timestamps.
double SpentTillNow() const
Method returns time in seconds, spent from the time kept in TimeStamp instance. If time was not set befo...
bool null() const
Returns true if time stamp is not initialized or its value less than 0.
bool Expired(double interval=0.) const
Method returns true if specified time interval expired relative to time, kept in TimeStamp instance.
void GetNow()
Method to acquire current time stamp.
#define DABC_LOCKGUARD(mutex, info)