24 if (res.
null())
return;
26 std::string kind = cmd.
GetStr(
"textkind");
47 int num = cmd.
GetInt(
"NumHdrs");
48 for (
int n=0;n<num;++n) {
51 store.
SetField(name.c_str(), value.c_str());
54 if (res.
SaveTo(store,
false)) {
70 EOUT(
"No raw data when requesting hierarchy");
75 EOUT(
"Error decoding hierarchy data from buffer");
109 if (
Cfg(
"manager",cmd).AsBool(
false))
139 if (!fMgrHiearchy.null()) {
140 DOUT3(
"dabc::Publisher::BeforeModuleStart mgr path %s", fMgrPath.c_str());
142 fPublishers.back().id = fCnt++;
143 fPublishers.back().path = fMgrPath;
144 fPublishers.back().worker = ItemName();
145 fPublishers.back().fulladdr = WorkerAddress(
true);
146 fPublishers.back().hier = fMgrHiearchy();
147 fPublishers.back().local =
true;
149 fLocal.GetFolder(fMgrPath,
true);
152 ActivateTimeout(0.1);
159 for (PublishersList::iterator iter = fPublishers.begin(); iter != fPublishers.end(); iter++) {
160 if (!iter->local) iter->lastglvers = 0;
169 bool is_any_global(
false);
170 bool rebuild_global = fLocal.GetVersion() > fLastLocalVers;
204 for (PublishersList::iterator iter = fPublishers.begin(); iter != fPublishers.end(); iter++) {
207 is_any_global =
true;
208 if (iter->version > iter->lastglvers) rebuild_global =
true;
211 if (iter->waiting_publisher)
continue;
213 iter->waiting_publisher =
true;
215 if (iter->hier == fMgrHiearchy())
223 fMgrHiearchy.Update(curr);
231 ApplyEntryDiff(iter->id, diff, fMgrHiearchy.GetVersion());
236 bool dostore =
false;
238 cmd.
SetUInt(
"version", iter->version);
239 cmd.
SetPtr(
"hierarchy", iter->hier);
240 cmd.
SetUInt(
"recid", iter->id);
241 if (iter->store && iter->store->CheckForNextStore(storetm, fStorePeriod, fTimeLimit)) {
242 cmd.
SetPtr(
"store", iter->store);
248 Command cmd(
"GetLocalHierarchy");
250 cmd.
SetUInt(
"version", iter->version);
251 cmd.
SetUInt(
"recid", iter->id);
259 if (rebuild_global && is_any_global) {
263 fGlobal.Create(
"Global");
267 fGlobal.Duplicate(fLocal);
268 fLastLocalVers = fLocal.GetVersion();
270 for (PublishersList::iterator iter = fPublishers.begin(); iter != fPublishers.end(); iter++) {
272 if (iter->local || (iter->version==0))
continue;
276 fGlobal.Duplicate(iter->rem);
278 iter->lastglvers = iter->version;
283 if (!is_any_global) {
288 for (SubscribersList::iterator iter = fSubscribers.begin(); iter != fSubscribers.end(); iter++) {
289 if (iter->waiting_worker)
continue;
291 if (iter->hlimit < 0)
continue;
301 for (SubscribersList::iterator iter = fSubscribers.begin(); iter != fSubscribers.end(); iter++) {
302 if (iter->waiting_worker)
continue;
304 if (iter->hlimit >= 0)
continue;
308 if (h.
null()) {
EOUT(
"Subscribed path %s not found", iter->path.c_str());
continue; }
314 PublishersList::iterator iter = fPublishers.begin();
315 while (iter != fPublishers.end()) {
316 if (iter->id == recid)
break;
320 if (iter == fPublishers.end()) {
321 EOUT(
"Get reply for non-existing id %u", recid);
325 iter->waiting_publisher =
false;
329 EOUT(
"Command failed for rec %u addr %s errcnt %d", recid, iter->fulladdr.c_str(), iter->errcnt);
334 iter->version = version;
346 EOUT(
"Did not found local folder %s ", iter->path.c_str());
349 iter->rem.UpdateFromBuffer(diff);
352 DOUT5(
"LOCAL ver %u diff %u itemver %u \n%s", fLocal.GetVersion(), diff.
GetTotalSize(), iter->version, fLocal.SaveToXml().c_str());
355 CheckDnsSubscribers();
363 if (cmd.
IsName(CmdPublisher::CmdName())) {
370 if (cmd.
IsName(
"GetLocalHierarchy")) {
387 if (islocal) *islocal = fGlobal.null();
389 if (path.empty() || (path==
"/"))
return top;
396 if (asproducer && (itemname.length()==0))
return false;
405 producer_name = itemname;
406 DOUT3(
"Producer:%s request:%s item:%s", producer_name.c_str(), request_name.c_str(), itemname.c_str());
410 for (PublishersList::iterator iter = fPublishers.begin(); iter != fPublishers.end(); iter++) {
411 if (iter->local)
continue;
414 if (h.
null())
continue;
421 producer_name = itemname;
426 if (!h.
null() || (itemname.length()<2))
return !producer_name.empty() || !!asproducer;
430 std::string item1 = itemname;
432 size_t pos = item1.find_last_of(
"/", item1.length()-2);
433 if (pos == std::string::npos)
return false;
435 if ((pos == 0) && asproducer)
return false;
437 std::string sub = item1.substr(pos+1);
442 if (IdentifyItem(asproducer, item1, islocal, producer_name, request_name)) {
443 if ((sub.length()>0) && (request_name.length()>0) && (request_name[request_name.length()-1]!=
'/')) request_name.append(
"/");
444 request_name.append(sub);
454 std::string producer_name, request_name;
457 DOUT3(
"PUBLISHER CMD %s ITEM %s", cmd.
GetName(), itemname.c_str());
459 if (!IdentifyItem(
true, itemname, islocal, producer_name, request_name)) {
460 DOUT2(
"Not found producer for item %s", itemname.c_str());
464 DOUT3(
"ITEM %s PRODUCER %s REQUEST %s", itemname.c_str(), producer_name.c_str(), request_name.c_str());
466 bool producer_local(
true);
467 std::string producer_server, producer_item;
469 if (!
dabc::mgr.DecomposeAddress(producer_name, producer_local, producer_server, producer_item)) {
470 EOUT(
"Wrong address specified as producer %s", producer_name.c_str());
474 if (islocal || producer_local) {
478 for (PublishersList::iterator iter = fPublishers.begin(); iter != fPublishers.end(); iter++) {
479 if (!iter->local)
continue;
481 if ((iter->worker != producer_item) && (iter->worker != std::string(
"/") + producer_item))
continue;
486 DOUT3(
"Submit GET command to %s subitem %s", producer_item.c_str(), request_name.c_str());
488 cmd.
SetPtr(
"hierarchy", iter->hier);
489 cmd.
SetStr(
"subitem", request_name);
494 EOUT(
"Not found producer %s, which is correspond to item %s", producer_item.c_str(), itemname.c_str());
499 EOUT(
"Command to get item %s already was analyzed - something went wrong", itemname.c_str());
516 if (def.
null())
return nullptr;
520 std::string request_name;
522 if (producer_name.empty())
return nullptr;
525 DOUT3(
"Create normal command %s for path %s", def.
GetName(), path.c_str());
532 DOUT3(
"Create hierarchy command %s for path %s", request_name.c_str(), path.c_str());
536 res.
ChangeName(dabc::CmdHierarchyExec::CmdName());
537 res.
SetStr(
"Item", request_name);
541 dabc::Url url(std::string(
"execute?") + query);
548 if (part.empty())
break;
550 size_t p = part.find(
"=");
551 if ((p==std::string::npos) || (p==0) || (p==part.length()-1))
break;
553 std::string parname = part.substr(0,p);
554 std::string parvalue = part.substr(p+1);
557 while ((pos = parvalue.find(
"%20")) != std::string::npos)
558 parvalue.replace(pos, 3,
" ");
562 if ((parvalue.length()>1) && ((parvalue[0]==
'\'') || (parvalue[0]==
'\"'))
563 && (parvalue[0] == parvalue[parvalue.length()-1])) {
565 parvalue.resize(parvalue.length()-1);
573 std::vector<std::string> vect;
574 std::vector<double> dblvect;
575 std::vector<int64_t> intvect;
578 res.
SetStr(parname, parvalue);
587 for (
unsigned n=0;n<vect.size();n++) {
590 if (
str_to_double(vect[n].c_str(), &ddd)) dblvect.push_back(ddd);
591 if (
str_to_lint(vect[n].c_str(), &iii)) intvect.push_back(iii); // store the integer parsed by str_to_lint; the double ddd belongs to dblvect only
594 if (intvect.size()==vect.size()) res.
SetField(parname, intvect);
else
595 if (dblvect.size()==vect.size()) res.
SetField(parname, dblvect);
else
598 }
else if (parname ==
"tmout") {
607 res.
SetStr(parname, parvalue);
611 }
while (!part.empty());
622 if (cmd.
IsName(
"OwnCommand")) {
624 std::string path = cmd.
GetStr(
"Path");
625 std::string worker = cmd.
GetStr(
"Worker");
626 bool ismgrpath =
false;
627 if (path.find(
"$MGR$")==0) {
630 path = fMgrPath + path;
632 if (path.find(
"$CONTEXT$")==0) {
637 switch (cmd.
GetInt(
"cmdid")) {
640 for (PublishersList::iterator iter = fPublishers.begin(); iter != fPublishers.end(); iter++) {
641 if (iter->path == path) {
642 EOUT(
"Path %s already registered!!!", path.c_str());
650 DOUT0(
"Path %s is registered in manager hierarchy, treat it individually", path.c_str());
653 EOUT(
"Path %s already present in the hierarchy", path.c_str());
658 DOUT3(
"PUBLISH folder %s", path.c_str());
661 fPublishers.back().id = fCnt++;
662 fPublishers.back().path = path;
663 fPublishers.back().worker = worker;
665 fPublishers.back().hier = cmd.
GetPtr(
"Hierarchy");
666 fPublishers.back().local =
true;
667 fPublishers.back().mgrsubitem = ismgrpath;
669 if (!fStoreDir.empty()) {
670 if (fStoreSel.empty() || (path.find(fStoreSel) == 0)) {
671 DOUT1(
"Create store for %s", path.c_str());
673 fPublishers.back().store->SetBasePath(fStoreDir + path);
677 fLocal.GetFolder(path,
true);
687 for (PublishersList::iterator iter = fPublishers.begin(); iter != fPublishers.end(); iter++) {
688 if (iter->local && (iter->path == path) && (iter->worker == worker)) {
690 if (!fLocal.RemoveEmptyFolders(path))
691 EOUT(
"Not found local entry with path %s", path.c_str());
693 fPublishers.erase(iter);
699 return cmd_bool(find);
707 if (h.
null()) { h = fGlobal.
GetFolder(path); islocal =
false; }
709 EOUT(
"Path %s not exists", path.c_str());
720 entry.
local = islocal;
728 SubscribersList::iterator iter = fSubscribers.begin();
729 while (iter != fSubscribers.end()) {
730 if ((iter->worker == worker) && (iter->path == path))
731 fSubscribers.erase(iter++);
741 DOUT2(
"Publisher removes worker %s", worker.c_str());
743 PublishersList::iterator iter = fPublishers.begin();
744 while (iter!=fPublishers.end()) {
745 if (iter->worker == worker) {
746 DOUT2(
"Publisher removes path %s of worker %s", iter->path.c_str(), worker.c_str());
747 if (iter->local) fLocal.GetFolder(iter->path).Destroy();
748 fPublishers.erase(iter++);
754 SubscribersList::iterator iter2 = fSubscribers.begin();
755 while (iter2 != fSubscribers.end()) {
756 if (iter2->worker == worker)
757 fSubscribers.erase(iter2++);
769 for (PublishersList::iterator iter = fPublishers.begin(); iter != fPublishers.end(); iter++) {
770 if ((iter->fulladdr == remoteaddr) && !iter->local) {
771 EOUT(
"Path %s already registered!!!", path.c_str());
777 fPublishers.back().id = fCnt++;
778 fPublishers.back().path =
"";
779 fPublishers.back().worker = worker;
780 fPublishers.back().fulladdr = remoteaddr;
781 fPublishers.back().hier = 0;
782 fPublishers.back().local =
false;
783 fPublishers.back().rem.Create(
"remote");
785 DOUT3(
"PUBLISH NODE %s", path.c_str());
791 EOUT(
"BAD OwnCommand ID");
795 if (cmd.
IsName(
"GetLocalHierarchy")) {
800 cmd.
SetUInt(
"version", fLocal.GetVersion());
804 if (cmd.
IsName(CmdGetNamesList::CmdName())) {
805 std::string path = cmd.
GetStr(
"path");
809 DOUT3(
"Get names list %s query %s", path.c_str(), cmd.
GetStr(
"query").c_str());
813 if (!RedirectCommand(cmd, path))
return cmd_false;
814 DOUT3(
"ITEM %s CAN PROVIDE MORE!!!", path.c_str());
822 if (cmd.
IsName(
"CreateExeCmd")) {
827 cmd.
SetRef(
"ExeCmd", res);
831 if (cmd.
IsName(
"CmdUIKind")) {
834 std::string item_name, request_name, uri = cmd.
GetStr(
"uri");
837 if (!IdentifyItem(
false, uri, islocal, item_name, request_name))
return cmd_false;
842 cmd.
SetStr(
"ui_kind",
"__user__");
844 if (request_name.empty()) request_name = h.
GetField(
"_UserFileMain").
AsStr();
845 cmd.
SetStr(
"fname", request_name);
847 cmd.
SetStr(
"path", item_name);
848 cmd.
SetStr(
"fname", request_name);
855 size_t pos = request_name.rfind(
"/");
856 if ((pos != std::string::npos) && (pos != request_name.length()-1)) {
857 cmd.
SetStr(
"path", item_name + request_name.substr(0, pos));
858 cmd.
SetStr(
"fname", request_name.substr(pos+1));
865 if (cmd.
IsName(
"CmdNeedAuth")) {
866 std::string path = cmd.
GetStr(
"path");
880 cmd.
SetInt(
"need_auth", res);
885 if (cmd.
IsName(CmdGetBinary::CmdName())) {
889 std::string itemname = cmd.
GetStr(
"Item");
892 if (cmd.
GetStr(
"Kind") ==
"execute") {
893 std::string query = cmd.
GetStr(
"Query");
911 if (!RedirectCommand(cmd, itemname))
return cmd_false;
924 if (
null())
return false;
937 cmd.
SetStr(
"Worker", workername);
938 cmd.
SetPtr(
"Hierarchy", hier);
940 if (!sync)
return Submit(cmd);
948 if (
null())
return "__error__";
953 if (Execute(cmd) !=
cmd_true)
return "__error__";
955 path = cmd.
GetStr(
"path");
956 fname = cmd.
GetStr(
"fname");
957 return cmd.
GetStr(
"ui_kind");
962 if (
null())
return -1;
967 if (Execute(cmd) !=
cmd_true)
return -1;
969 return cmd.
GetInt(
"need_auth");
977 if (
null())
return res;
980 cmd.
SetStr(
"path", fullname);
981 cmd.
SetStr(
"query", query);
983 if (Execute(cmd) !=
cmd_true)
return res;
985 res = cmd.
GetRef(
"ExeCmd");
987 DOUT3(
"Produce command %p - now submit", res());
989 if (res.
null())
return res;
998 if (
null())
return 0;
1013 if (
null())
return res;
1018 if (Execute(cmd) !=
cmd_true)
return res;
Reference on memory from memory pool.
BufferSize_t GetTotalSize() const
Return total size of all buffer segments.
Command used to produce custom binary data for entries published in the hierarchy.
static void SetResNamesList(dabc::Command &cmd, Hierarchy &res)
static Hierarchy GetResNamesList(dabc::Command &cmd)
Command submitted to worker when item in hierarchy defined as DABC.Command and used to produce custom...
Represents command with its arguments.
void SetPtr(const std::string &name, void *p)
Set pointer argument for the command.
unsigned GetUInt(const std::string &name, unsigned dflt=0) const
bool SetStr(const std::string &name, const char *value)
bool SetBool(const std::string &name, bool v)
bool SetInt(const std::string &name, int v)
bool SetRawData(Buffer rawdata)
Set raw data to the command, which can be transported also between nodes.
std::string GetStr(const std::string &name, const std::string &dflt="") const
Command & SetTimeout(double tm)
Set maximum time which can be used for command execution.
bool SetUInt(const std::string &name, unsigned v)
Command & SetReceiver(const std::string &itemname)
These methods prepare the command so that one can submit it to the manager like: dabc::mgr....
bool GetBool(const std::string &name, bool dflt=false) const
Reference GetRef(const std::string &name)
Returns reference from the command, can be called only once.
bool SetDouble(const std::string &name, double v)
int GetInt(const std::string &name, int dflt=0) const
Buffer GetRawData()
Returns reference on raw data. Can be called only once - raw data reference will be cleaned.
void ChangeName(const std::string &name)
Change command name, should not be used for remote commands.
bool SetRef(const std::string &name, Reference ref)
Set reference to the command.
void * GetPtr(const std::string &name, void *deflt=0) const
Get pointer argument from the command.
Class for holding GMT time with precision of nanoseconds.
Class used for direct storage of records in JSON/XML form.
void CloseNode(const char *nodename)
void SetField(const char *name, const char *value)
void CreateNode(const char *nodename)
Represents objects hierarchy of remote (or local) DABC process.
void DisableReadingAsChild()
Disable reading of element when it appears as child in the structure.
Hierarchy FindChild(const char *name)
Return child element from hierarchy.
const RecordField & Field(const std::string &name) const
dabc::Buffer SaveToBuffer(unsigned kind=stream_Full, uint64_t version=0, unsigned hlimit=0)
Save hierarchy in binary form, relative to specified version.
void Create(const std::string &name, bool withmutex=false)
Create top-level object with specified name.
std::string FindBinaryProducer(std::string &request_name, bool topmost=true)
Search for parent element, where binary_producer property is specified. Returns name of binary produce...
bool UpdateFromBuffer(const dabc::Buffer &buf, HierarchyStreamKind kind=stream_Full)
Apply modification to hierarchy, using stored binary data
bool ReadFromBuffer(const dabc::Buffer &buf)
Read hierarchy from buffer.
void BuildNew(Reference top)
Build objects hierarchy, referenced by top.
void SetVersion(uint64_t v)
Change version of the item, only for advanced usage.
std::string ComposeAddress(const std::string &server, const std::string &itemname="")
std::string ItemName(bool compact=true) const
Produce string, which can be used as name argument in dabc::mgr.FindItem(name) call.
const char * GetName() const
Returns name of the object, thread safe
int NeedAuth(const std::string &path)
Returns 1 - need auth, 0 - no need auth, -1 - undefined.
Buffer GetBinary(const std::string &fullname, const std::string &kind, const std::string &query, double tmout=5.)
Return different kinds of binary data, depends from kind.
Command ExeCmd(const std::string &fullname, const std::string &query)
Execute item is command, providing parameters in query.
std::string UserInterfaceKind(const char *uri, std::string &path, std::string &fname)
Returns "" - undefined, "__tree__" – tree hierarchy "__single__" – single element "__file__" – just a...
Hierarchy GetItem(const std::string &fullname, const std::string &query="", double tmout=5.)
bool OwnCommand(int id, const std::string &path, const std::string &workername, void *hier=nullptr)
virtual bool ReplyCommand(Command cmd)
Reimplement this method to react on command reply. Return true if command can be destroyed by framewor...
Hierarchy GetWorkItem(const std::string &path, bool *islocal=nullptr)
Return hierarchy item selected for work.
int fFileLimit
! selected hierarchy path for storage like 'MBS' or 'FESA/server'
dabc::Command CreateExeCmd(const std::string &path, const std::string &query, dabc::Command tgt=nullptr)
virtual double ProcessTimeout(double last_diff)
virtual const char * ClassName() const
Returns class name of the object instance.
bool IdentifyItem(bool asproducer, const std::string &itemname, bool &islocal, std::string &producer_name, std::string &request_name)
Try to find a producer which potentially could deliver the item. It could happen that the item does not exist in ...
std::string fStoreSel
! directory to store data
bool RedirectCommand(dabc::Command cmd, const std::string &itemname)
Command redirected to local modules or remote publisher, where it should be processed. Primary usage -...
Publisher(const std::string &name, dabc::Command cmd=nullptr)
void CheckDnsSubscribers()
bool ApplyEntryDiff(unsigned recid, dabc::Buffer &buf, uint64_t version, bool witherror=false)
Hierarchy fLocal
! this is hierarchy of all known items, including remote, used only when any global hierarchies are e...
std::string fStoreDir
! this is manager hierarchy, published by ourselfs
double fStorePeriod
! maximum time of store file, in seconds
virtual void OnThreadAssigned()
! how often storage is triggered
static const char * DfltName()
void InvalidateGlobal()
Method marks that global version is out of date and should be rebuild.
int fTimeLimit
! maximum size of store file, in MB
virtual int ExecuteCommand(Command cmd)
Main method where commands are executed.
Hierarchy fMgrHiearchy
! path for manager
bool AsBool(bool dflt=false) const
std::string AsStr(const std::string &dflt="") const
int64_t AsInt(int64_t dflt=0) const
double AsDouble(double dflt=0.) const
static bool StrToStrVect(const char *str, std::vector< std::string > &vect, bool verbose=true)
RecordField GetField(const std::string &name) const
bool HasField(const std::string &name) const
bool SetField(const std::string &name, const RecordField &v)
bool RemoveField(const std::string &name)
bool SaveTo(HStore &store, bool create_node=true)
Store hierarchy in json/xml form
void Release()
Releases reference on the object.
const char * GetName() const
Return name of referenced object, if object not assigned, returns "---".
Reference GetParentRef() const
Returns reference on parent object.
bool IsName(const char *name) const
Returns true if object name is the same as specified one.
Reference GetFolder(const std::string &name, bool force=false)
Return folder of specified name, no special symbols are allowed.
bool null() const
Returns true if reference contains nullptr.
Uniform Resource Locator interpreter.
std::string GetOptionsPart(int number=0) const
bool HasOption(const std::string &optname) const
int GetOptionInt(const std::string &optname, int dflt=0) const
void SetOptions(const std::string &opt)
Method allows to set URL options directly to be able use all Get methods.
bool Execute(Command cmd, double tmout=-1.)
Active object, which is working inside dabc::Thread.
virtual void OnThreadAssigned()
RecordField Cfg(const std::string &name, Command cmd=nullptr) const
Returns configuration field of specified name. Configuration value of specified name searched in follo...
virtual int ExecuteCommand(Command cmd)
Main method where commands are executed.
virtual bool ReplyCommand(Command cmd)
Reimplement this method to react on command reply. Return true if command can be destroyed by framewor...
bool str_to_double(const char *val, double *res)
Convert string to double value.
std::string format(const char *fmt,...)
const char * prop_producer
bool str_to_lint(const char *val, long *res)
Convert string to long integer value.