DABC (Data Acquisition Backbone Core)  2.9.9
Publisher.h
Go to the documentation of this file.
1 // $Id: Publisher.h 4472 2020-04-15 13:33:18Z linev $
2 
3 /************************************************************
4  * The Data Acquisition Backbone Core (DABC) *
5  ************************************************************
6  * Copyright (C) 2009 - *
7  * GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
8  * Planckstr. 1, 64291 Darmstadt, Germany *
9  * Contact: http://dabc.gsi.de *
10  ************************************************************
11  * This software can be used under the GPL license *
12  * agreements as stated in LICENSE.txt file *
13  * which is part of the distribution. *
14  ************************************************************/
15 
16 #ifndef DABC_Publisher
17 #define DABC_Publisher
18 
19 #ifndef DABC_Worker
20 #include "dabc/Worker.h"
21 #endif
22 
23 namespace dabc {
24 
25  class PublisherRef;
26  class HierarchyStore;
27 
28  class CmdPublisher : public Command {
29  DABC_COMMAND(CmdPublisher, "CmdPublisher");
30  };
31 
33  class CmdGetBinary : public Command {
34  DABC_COMMAND(CmdGetBinary, "CmdGetBinary");
35 
36  CmdGetBinary(const std::string &path, const std::string &kind, const std::string &query) :
37  Command(CmdName())
38  {
39  SetStr("Item", path);
40  SetStr("Kind", kind);
41  SetStr("Query", query);
42  }
43  };
44 
47  class CmdHierarchyExec : public Command {
48  DABC_COMMAND(CmdHierarchyExec, "CmdHierarchyExec");
49 
50  CmdHierarchyExec(const std::string &path) :
51  Command(CmdName())
52  {
53  SetStr("Item", path);
54  }
55  };
56 
58  class CmdGetNamesList : public Command {
59  DABC_COMMAND(CmdGetNamesList, "CmdGetNamesList");
60 
61  CmdGetNamesList(const std::string &kind, const std::string &path, const std::string &query) :
62  Command(CmdName())
63  {
64  SetStr("textkind", kind);
65  SetStr("path", path);
66  SetStr("query", query);
67  }
68 
69  void AddHeader(const std::string &name, const std::string &value, bool withquotes = true)
70  {
71  int num = GetInt("NumHdrs");
72  SetStr(dabc::format("OptHdrName%d", num), name);
73  SetStr(dabc::format("OptHdrValue%d", num), withquotes ? std::string("\"") + value + "\"" : value);
74  SetInt("NumHdrs", num+1);
75  }
76 
77  static void SetResNamesList(dabc::Command &cmd, Hierarchy& res);
78 
80  };
81 
82 
83  class CmdSubscriber : public Command {
84  DABC_COMMAND(CmdSubscriber, "CmdSubscriber");
85 
86  void SetEntry(const Hierarchy& e) { SetRef("entry", e); }
87  Hierarchy GetEntry() { return GetRef("entry"); }
88  };
89 
90  struct PublisherEntry {
91  unsigned id; // unique id in the worker
92  std::string path; // absolute path in hierarchy
93  std::string worker; // worker which is manages hierarchy
94  std::string fulladdr; // address, used to identify producer of the hierarchy branch
95  void* hier; // local hierarchy pointer, one not allowed to use it from publisher
96  uint64_t version; // last requested version
97  uint64_t lastglvers; // last version, used to build global
98  bool local; // is entry from local node
99  bool mgrsubitem; // if true, belongs to manager hierarchy
100  int errcnt; // counter of consequent errors
101  bool waiting_publisher; // indicate if next request is submitted
102  Hierarchy rem; // remote hierarchy
103  HierarchyStore* store; // store object for the registered hierarchy
104 
106  id(0), path(), worker(), fulladdr(), hier(0),
107  version(0), lastglvers(0), local(true), mgrsubitem(false),
108  errcnt(0), waiting_publisher(false), rem(), store(0) {}
109 
110  // implement copy constructor to avoid any extra copies which are not necessary
112  id(src.id), path(), worker(), fulladdr(), hier(0),
113  version(0), lastglvers(0), local(true), mgrsubitem(false),
114  errcnt(0), waiting_publisher(false), rem(), store(0) {}
115 
116  ~PublisherEntry();
117 
118  // implement assign operator to avoid any extra copies which are not necessary
120  {
121  id = src.id;
122  return *this;
123  }
124  };
125 
127  unsigned id; // unique id in the worker
128  std::string path; // absolute path in hierarchy
129  std::string worker; // worker which is manages hierarchy
130  bool local; // is entry from local node
131  int hlimit; // history limit -1 - DNS, 0 - no history
132  Hierarchy fEntry; // entry which is provided to the worker as result
133  uint64_t version; // last version provided to subscriber
134  bool waiting_worker; // when enabled, we are waiting while worker process entry
135 
137  id(0), path(), worker(), local(true),
138  hlimit(0), fEntry(), version(0), waiting_worker(false) {}
139  };
140 
146  class Publisher : public dabc::Worker {
147 
148  friend class PublisherRef;
149 
150  protected:
151 
153 
155 
156  uint64_t fLastLocalVers;
157 
158  typedef std::list<PublisherEntry> PublishersList;
159 
160  typedef std::list<SubscriberEntry> SubscribersList;
161 
163 
165 
166  unsigned fCnt;
167 
168  std::string fMgrPath;
170 
171  std::string fStoreDir;
172  std::string fStoreSel;
175  double fStorePeriod;
176 
177  virtual void OnThreadAssigned();
178 
179  virtual double ProcessTimeout(double last_diff);
180 
181  virtual int ExecuteCommand(Command cmd);
182 
183  virtual bool ReplyCommand(Command cmd);
184 
185  dabc::Command CreateExeCmd(const std::string &path, const std::string &query, dabc::Command tgt = nullptr);
186 
187  void CheckDnsSubscribers();
188 
190  void InvalidateGlobal();
191 
192  bool ApplyEntryDiff(unsigned recid, dabc::Buffer& buf, uint64_t version, bool witherror = false);
193 
194  bool DoStorage() const { return !fStoreDir.empty(); }
195 
197  Hierarchy GetWorkItem(const std::string &path, bool *islocal = nullptr);
198 
202  bool RedirectCommand(dabc::Command cmd, const std::string &itemname);
203 
207  bool IdentifyItem(bool asproducer, const std::string &itemname, bool &islocal, std::string &producer_name, std::string &request_name);
208 
209  public:
210 
211  Publisher(const std::string &name, dabc::Command cmd = nullptr);
212  virtual ~Publisher();
213 
214  static const char* DfltName() { return "/publ"; }
215 
216  virtual const char* ClassName() const { return "Publisher"; }
217  };
218 
219  // ==============================================================
220 
221  class PublisherRef : public WorkerRef {
223 
224  bool Register(const std::string &path, const std::string &workername, void* hierarchy)
225  { return OwnCommand(1, path, workername, hierarchy); }
226 
227  bool Unregister(const std::string &path, const std::string &workername, void* hierarchy)
228  { return OwnCommand(2, path, workername, hierarchy); }
229 
230  bool Subscribe(const std::string &path, const std::string &workername)
231  { return OwnCommand(3, path, workername); }
232 
233  bool Unsubscribe(const std::string &path, const std::string &workername)
234  { return OwnCommand(4, path, workername); }
235 
236  bool RemoveWorker(const std::string &workername, bool sync = true)
237  { return OwnCommand(sync ? 5 : -5, "", workername); }
238 
239  bool AddRemote(const std::string &remnode, const std::string &workername)
240  { return OwnCommand(6, remnode, workername); }
241 
246  std::string UserInterfaceKind(const char *uri, std::string &path, std::string &fname);
247 
249  int NeedAuth(const std::string &path);
250 
252  Buffer GetBinary(const std::string &fullname, const std::string &kind, const std::string &query, double tmout = 5.);
253 
254  Hierarchy GetItem(const std::string &fullname, const std::string &query = "", double tmout = 5.);
255 
257  Command ExeCmd(const std::string &fullname, const std::string &query);
258 
259  protected:
260  bool OwnCommand(int id, const std::string &path, const std::string &workername, void *hier = nullptr);
261  };
262 
263 }
264 
265 #endif
#define DABC_REFERENCE(RefClass, ParentClass, T)
Definition: Reference.h:222
Reference on memory from memory pool.
Definition: Buffer.h:135
Command used to produce custom binary data for entries published in the hierarchy.
Definition: Publisher.h:33
DABC_COMMAND(CmdGetBinary, "CmdGetBinary")
CmdGetBinary(const std::string &path, const std::string &kind, const std::string &query)
Definition: Publisher.h:36
Command to request names list.
Definition: Publisher.h:58
static void SetResNamesList(dabc::Command &cmd, Hierarchy &res)
Definition: Publisher.cxx:22
DABC_COMMAND(CmdGetNamesList, "CmdGetNamesList")
static Hierarchy GetResNamesList(dabc::Command &cmd)
Definition: Publisher.cxx:63
CmdGetNamesList(const std::string &kind, const std::string &path, const std::string &query)
Definition: Publisher.h:61
void AddHeader(const std::string &name, const std::string &value, bool withquotes=true)
Definition: Publisher.h:69
Command submitted to worker when item in hierarchy defined as DABC.Command and used to produce custom...
Definition: Publisher.h:47
DABC_COMMAND(CmdHierarchyExec, "CmdHierarchyExec")
CmdHierarchyExec(const std::string &path)
Definition: Publisher.h:50
DABC_COMMAND(CmdPublisher, "CmdPublisher")
void SetEntry(const Hierarchy &e)
Definition: Publisher.h:86
DABC_COMMAND(CmdSubscriber, "CmdSubscriber")
Hierarchy GetEntry()
Definition: Publisher.h:87
Represents command with its arguments.
Definition: Command.h:99
bool SetStr(const std::string &name, const char *value)
Definition: Command.h:134
bool SetInt(const std::string &name, int v)
Definition: Command.h:138
Reference GetRef(const std::string &name)
Returns reference from the command, can be called only once.
Definition: Command.cxx:175
int GetInt(const std::string &name, int dflt=0) const
Definition: Command.h:139
bool SetRef(const std::string &name, Reference ref)
Set reference to the command.
Definition: Command.cxx:168
Represents objects hierarchy of remote (or local) DABC process.
Definition: Hierarchy.h:285
int NeedAuth(const std::string &path)
Returns 1 - need auth, 0 - no need auth, -1 - undefined.
Definition: Publisher.cxx:960
Buffer GetBinary(const std::string &fullname, const std::string &kind, const std::string &query, double tmout=5.)
Returns different kinds of binary data, depending on kind.
Definition: Publisher.cxx:996
bool Unregister(const std::string &path, const std::string &workername, void *hierarchy)
Definition: Publisher.h:227
bool RemoveWorker(const std::string &workername, bool sync=true)
Definition: Publisher.h:236
Command ExeCmd(const std::string &fullname, const std::string &query)
Execute item as command, providing parameters in query.
Definition: Publisher.cxx:974
bool Unsubscribe(const std::string &path, const std::string &workername)
Definition: Publisher.h:233
std::string UserInterfaceKind(const char *uri, std::string &path, std::string &fname)
Returns "" - undefined, "__tree__" – tree hierarchy "__single__" – single element "__file__" – just a...
Definition: Publisher.cxx:946
Hierarchy GetItem(const std::string &fullname, const std::string &query="", double tmout=5.)
Definition: Publisher.cxx:1009
bool AddRemote(const std::string &remnode, const std::string &workername)
Definition: Publisher.h:239
bool Subscribe(const std::string &path, const std::string &workername)
Definition: Publisher.h:230
bool OwnCommand(int id, const std::string &path, const std::string &workername, void *hier=nullptr)
Definition: Publisher.cxx:922
bool Register(const std::string &path, const std::string &workername, void *hierarchy)
Definition: Publisher.h:224
Module manages published hierarchies and provides optimized access to them.
Definition: Publisher.h:146
uint64_t fLastLocalVers
! this is hierarchy only for entries from local modules
Definition: Publisher.h:156
virtual bool ReplyCommand(Command cmd)
Reimplement this method to react on command reply Return true if command can be destroyed by framewor...
Definition: Publisher.cxx:361
std::list< SubscriberEntry > SubscribersList
Definition: Publisher.h:160
Hierarchy GetWorkItem(const std::string &path, bool *islocal=nullptr)
Return hierarchy item selected for work.
Definition: Publisher.cxx:382
int fFileLimit
! selected hierarchy path for storage like 'MBS' or 'FESA/server'
Definition: Publisher.h:173
std::string fMgrPath
! counter for new records
Definition: Publisher.h:168
dabc::Command CreateExeCmd(const std::string &path, const std::string &query, dabc::Command tgt=nullptr)
Definition: Publisher.cxx:512
virtual double ProcessTimeout(double last_diff)
Definition: Publisher.cxx:165
virtual const char * ClassName() const
Returns class name of the object instance.
Definition: Publisher.h:216
bool IdentifyItem(bool asproducer, const std::string &itemname, bool &islocal, std::string &producer_name, std::string &request_name)
Try to find producer which potentially could deliver item. It could happen that item does not exist in ...
Definition: Publisher.cxx:394
std::string fStoreSel
! directory to store data
Definition: Publisher.h:172
bool RedirectCommand(dabc::Command cmd, const std::string &itemname)
Command redirected to local modules or remote publisher, where it should be processed. Primary usage -...
Definition: Publisher.cxx:452
bool DoStorage() const
Definition: Publisher.h:194
Publisher(const std::string &name, dabc::Command cmd=nullptr)
Definition: Publisher.cxx:96
PublishersList fPublishers
Definition: Publisher.h:162
virtual ~Publisher()
Definition: Publisher.cxx:123
void CheckDnsSubscribers()
Definition: Publisher.cxx:299
bool ApplyEntryDiff(unsigned recid, dabc::Buffer &buf, uint64_t version, bool witherror=false)
Definition: Publisher.cxx:312
Hierarchy fLocal
! this is hierarchy of all known items, including remote, used only when any global hierarchies are e...
Definition: Publisher.h:154
std::string fStoreDir
! this is manager hierarchy, published by ourselves
Definition: Publisher.h:171
unsigned fCnt
Definition: Publisher.h:166
double fStorePeriod
! maximum time of store file, in seconds
Definition: Publisher.h:175
virtual void OnThreadAssigned()
! how often storage is triggered
Definition: Publisher.cxx:128
static const char * DfltName()
Definition: Publisher.h:214
SubscribersList fSubscribers
Definition: Publisher.h:164
void InvalidateGlobal()
Method marks that global version is out of date and should be rebuilt.
Definition: Publisher.cxx:155
int fTimeLimit
! maximum size of store file, in MB
Definition: Publisher.h:174
virtual int ExecuteCommand(Command cmd)
Main method where commands are executed.
Definition: Publisher.cxx:620
std::list< PublisherEntry > PublishersList
! last local version, used to build global list
Definition: Publisher.h:158
Hierarchy fMgrHiearchy
! path for manager
Definition: Publisher.h:169
Hierarchy fGlobal
Definition: Publisher.h:152
Reference on dabc::Worker
Definition: Worker.h:466
Active object, which is working inside dabc::Thread.
Definition: Worker.h:116
Event manipulation API.
Definition: api.h:23
std::string format(const char *fmt,...)
Definition: string.cxx:49
HierarchyStore * store
Definition: Publisher.h:103
std::string path
Definition: Publisher.h:92
PublisherEntry & operator=(const PublisherEntry &src)
Definition: Publisher.h:119
uint64_t lastglvers
Definition: Publisher.h:97
PublisherEntry(const PublisherEntry &src)
Definition: Publisher.h:111
std::string fulladdr
Definition: Publisher.h:94
std::string worker
Definition: Publisher.h:93
std::string path
Definition: Publisher.h:128
std::string worker
Definition: Publisher.h:129