DABC (Data Acquisition Backbone Core)  2.9.9
Monitor.cxx
Go to the documentation of this file.
1 // $Id: Monitor.cxx 3887 2018-05-16 15:01:49Z linev $
2 
3 /************************************************************
4  * The Data Acquisition Backbone Core (DABC) *
5  ************************************************************
6  * Copyright (C) 2009 - *
7  * GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
8  * Planckstr. 1, 64291 Darmstadt, Germany *
9  * Contact: http://dabc.gsi.de *
10  ************************************************************
11  * This software can be used under the GPL license *
12  * agreements as stated in LICENSE.txt file *
13  * which is part of the distribution. *
14  ************************************************************/
15 
16 #include "root/Monitor.h"
17 
18 #include "TImage.h"
19 
20 #include "dabc/Url.h"
21 #include "dabc/Iterator.h"
22 #include "dabc/Publisher.h"
23 
24 #include "THttpServer.h"
25 #include "TRootSniffer.h"
26 #include "TRootSnifferStore.h"
27 #include "RVersion.h"
28 
29 class TRootSnifferStoreDabc : public TRootSnifferStore {
30 public:
32  std::string prefix;
34 
35  TRootSnifferStoreDabc(dabc::Hierarchy& _top, const std::string &_prefix) :
36  TRootSnifferStore(),
37  top(_top),
38  prefix(_prefix.c_str()),
39  curr() {}
40 
42 
43  virtual void CreateNode(Int_t lvl, const char* nodename)
44  {
45  if (curr.null()) {
46  top.Create(nodename);
47  curr = top;
48  } else {
49  curr = curr.CreateHChild(nodename, true);
50  }
51  }
52  virtual void SetField(Int_t lvl, const char* field, const char* value, Bool_t with_quotes)
53  {
54  if (!with_quotes) {
55  if (!strcmp("value", "true")) { curr.SetField(field, true); return; }
56  if (!strcmp("value", "false")) { curr.SetField(field, false); return; }
57  if ((strlen(value)>2) && (*value=='[')) {
58  dabc::RecordField f(value);
59  if (f.AsIntVect().size()>0) {
60  curr.SetField(field,f.AsIntVect());
61  return;
62  }
63  }
64  }
65 
66  if ((strcmp(field,"_autoload")==0) && (*value == '/')) {
67  TString res = "/";
68  res.Append(prefix);
69  res.Append(value);
70  res.ReplaceAll(";/", std::string("/") + prefix);
71  curr.SetField(field, res.Data());
72  } else
73  if ((strcmp(field,"_icon")==0) && (*value == '/'))
74  curr.SetField(field, std::string("/") + prefix + value);
75  else
76  curr.SetField(field, value);
77  }
78 
79  virtual void BeforeNextChild(Int_t, Int_t, Int_t)
80  {
81  }
82 
83  virtual void CloseNode(Int_t, Int_t)
84  {
86  }
87 };
88 
89 
90 // ==============================================================================
91 
92 root::Monitor::Monitor(const std::string &name, dabc::Command cmd) :
93  dabc::Worker(MakePair(name)),
94  fEnabled(false),
95  fRoot(),
96  fHierarchy(),
97  fRootCmds(dabc::CommandsQueue::kindPostponed),
98  fLastUpdate()
99 {
100  fEnabled = Cfg("enabled", cmd).AsBool(false);
101  if (!fEnabled) return;
102  fPrefix = Cfg("prefix", cmd).AsStr("ROOT");
103 }
104 
106 {
107 }
108 
109 
111 {
113 
114  if (!IsEnabled()) {
115  EOUT("sniffer was not enabled - why it is started??");
116  return;
117  }
118 
119  fHierarchy.Create("ROOT", true);
120  InitializeHierarchy();
121 
122  Publish(fHierarchy, fPrefix);
123 
124  // if timer not installed, emulate activity in ROOT by regular timeouts
125 }
126 
127 double root::Monitor::ProcessTimeout(double last_diff)
128 {
129  return 10.;
130 }
131 
132 
134 {
135  if (cmd.IsName(dabc::CmdGetBinary::CmdName()) ||
136  cmd.IsName(dabc::CmdGetNamesList::CmdName()) ||
137  cmd.IsName(dabc::CmdHierarchyExec::CmdName())) {
138  dabc::LockGuard lock(fHierarchy.GetHMutex());
139  fRootCmds.Push(cmd);
140  return dabc::cmd_postponed;
141  }
142 
143  return dabc::Worker::ExecuteCommand(cmd);
144 }
145 
146 
147 void root::Monitor::RescanHierarchy(TRootSniffer* sniff, dabc::Hierarchy& main, const char* path)
148 {
149  main.Release();
150 
151  TRootSnifferStoreDabc store(main, fPrefix);
152 
153  sniff->ScanHierarchy("DABC", path, &store);
154 }
155 
156 
157 int root::Monitor::ProcessGetBinary(THttpServer* serv, TRootSniffer* sniff, dabc::Command cmd)
158 {
159  // command executed in ROOT context without locked mutex,
160  // one can use as much ROOT as we want
161 
162  std::string itemname = cmd.GetStr("subitem");
163  std::string binkind = cmd.GetStr("Kind");
164  std::string query = cmd.GetStr("Query");
165 
166  dabc::Buffer buf;
167 
168  TString realfilename;
169  std::string filename = std::string("/") + itemname + std::string("/") + binkind;
170 
171  if (serv->IsFileRequested(filename.c_str(), realfilename)) {
172  DOUT0("Item %s Send file %s ", filename.c_str(), realfilename.Data());
173 
174  Int_t len = 0;
175  char *ptr = THttpServer::ReadFileContent(realfilename.Data(), len);
176  buf = dabc::Buffer::CreateBuffer(ptr, (unsigned) len, true);
177  cmd.SetRawData(buf);
178  return dabc::cmd_true;
179  }
180 
181  // for root.bin request verify that object must be really requested - it may be not changed at all
182  if (binkind == "root.bin") {
183 
184  // check if version was specified in query
185  uint64_t version = 0;
186  dabc::Url url(std::string("getbin?") + query);
187  if (url.IsValid() && url.HasOption("version"))
188  version = (unsigned) url.GetOptionInt("version", 0);
189 
190  ULong_t objhash = sniff->GetItemHash(itemname.c_str());
191 
192  bool binchanged = true;
193  if (version > 0) {
194  dabc::LockGuard lock(fHierarchy.GetHMutex());
195  binchanged = fHierarchy.IsBinItemChanged(itemname, objhash, version);
196  }
197 
198  if (!binchanged) {
199  buf = dabc::Buffer::CreateBuffer(8); // dummy data of 8 bytes long
200  {
201  dabc::LockGuard lock(fHierarchy.GetHMutex());
202  // here correct version number for item and master item will be filled
203  fHierarchy.FillBinHeader(itemname, cmd);
204  }
205 
206  cmd.SetRawData(buf);
207  return dabc::cmd_true;
208  }
209  }
210 
211 #if ROOT_VERSION_CODE >= ROOT_VERSION(6,13,0)
212 
213  std::string str;
214 
215  if (!sniff->Produce(itemname, binkind, query, str)) {
216  EOUT("ROOT sniffer producer fails for item %s kind %s", itemname.c_str(), binkind.c_str());
217  return dabc::cmd_false;
218  }
219 
220  buf = dabc::Buffer::CreateBuffer(str.data(), str.length(), false, true);
221 
222 #else
223 
224  void* ptr(nullptr);
225  Long_t length(0);
226  TString str;
227 
228  // use sniffer method to generate data
229 
230  if (!sniff->Produce(itemname.c_str(), binkind.c_str(), query.c_str(), ptr, length, str)) {
231  EOUT("ROOT sniffer producer fails for item %s kind %s", itemname.c_str(), binkind.c_str());
232  return dabc::cmd_false;
233  }
234 
235  if (ptr)
236  buf = dabc::Buffer::CreateBuffer(ptr, (unsigned) length, true);
237  else
238  buf = dabc::Buffer::CreateBuffer(str.Data(), str.Length(), false, true);
239 
240 #endif
241 
242  // for binary data set correct version into header
243  if (binkind == "root.bin") {
244 
245  ULong_t mhash = sniff->GetStreamerInfoHash();
246 
247  dabc::LockGuard lock(fHierarchy.GetHMutex());
248  // here correct version number for item and master item will be filled
249  fHierarchy.FillBinHeader(itemname, cmd, mhash);
250  }
251 
252  cmd.SetRawData(buf);
253 
254  return dabc::cmd_true;
255 }
256 
257 
258 void root::Monitor::ProcessActionsInRootContext(THttpServer* serv, TRootSniffer* sniff)
259 {
260  // DOUT0("ROOOOOOOT sniffer ProcessActionsInRootContext %p %s active %s", this, ClassName(), DBOOL(fWorkerActive));
261 
262  if (fLastUpdate.null() || fLastUpdate.Expired(3.)) {
263  DOUT3("Update ROOT structures");
264  RescanHierarchy(sniff, fRoot);
265  fLastUpdate.GetNow();
266 
267  // we lock mutex only at the moment when synchronize hierarchy with main
268  dabc::LockGuard lock(fHierarchy.GetHMutex());
269 
270  fHierarchy.Update(fRoot);
271  }
272 
273  bool doagain(true);
274  dabc::Command cmd;
275 
276  while (doagain) {
277  if (cmd.IsName(dabc::CmdGetBinary::CmdName())) {
278  cmd.Reply(ProcessGetBinary(serv, sniff, cmd));
279  } else
280  if (cmd.IsName(dabc::CmdGetNamesList::CmdName())) {
281  std::string item = cmd.GetStr("subitem");
282 
283  dabc::Hierarchy res;
284 
285  if (item.empty()) {
286  if (fLastUpdate.null() || fLastUpdate.Expired(3.)) {
287  RescanHierarchy(sniff, fRoot);
288  fLastUpdate.GetNow();
289  dabc::LockGuard lock(fHierarchy.GetHMutex());
290  fHierarchy.Update(fRoot);
291  }
292 
293  res = fRoot;
294  } else {
295  RescanHierarchy(sniff, res, item.c_str());
296  }
297 
298  DOUT3("Extend result %s", DBOOL(!res.null()));
299 
300  if (res.null()) cmd.ReplyFalse();
301 
303 
304  cmd.ReplyTrue();
305  } else
306  if (cmd.IsName(dabc::CmdHierarchyExec::CmdName())) {
307  if (ProcessHCommand(cmd.GetStr("Item"), cmd)) cmd.ReplyTrue();
308  }
309 
310  if (!cmd.null()) {
311  EOUT("Not processed command %s", cmd.GetName());
312  cmd.ReplyFalse();
313  }
314 
315  dabc::LockGuard lock(fHierarchy.GetHMutex());
316 
317  if (fRootCmds.Size()>0) cmd = fRootCmds.Pop();
318  doagain = !cmd.null();
319  }
320 }
321 
virtual void SetField(Int_t lvl, const char *field, const char *value, Bool_t with_quotes)
Definition: Monitor.cxx:52
dabc::Hierarchy curr
Definition: Monitor.cxx:33
virtual void CloseNode(Int_t, Int_t)
Definition: Monitor.cxx:83
virtual ~TRootSnifferStoreDabc()
Definition: Monitor.cxx:41
TRootSnifferStoreDabc(dabc::Hierarchy &_top, const std::string &_prefix)
Definition: Monitor.cxx:35
virtual void CreateNode(Int_t lvl, const char *nodename)
Definition: Monitor.cxx:43
dabc::Hierarchy & top
Definition: Monitor.cxx:31
virtual void BeforeNextChild(Int_t, Int_t, Int_t)
Definition: Monitor.cxx:79
std::string prefix
Definition: Monitor.cxx:32
Reference on memory from memory pool.
Definition: Buffer.h:135
static Buffer CreateBuffer(BufferSize_t sz)
This static method create independent buffer for any other memory pools Therefore it can be used in s...
Definition: Buffer.cxx:419
static void SetResNamesList(dabc::Command &cmd, Hierarchy &res)
Definition: Publisher.cxx:22
Represents command with its arguments.
Definition: Command.h:99
bool SetRawData(Buffer rawdata)
Set raw data to the command, which can be transported also between nodes.
Definition: Command.cxx:334
std::string GetStr(const std::string &name, const std::string &dflt="") const
Definition: Command.h:136
void ReplyFalse()
Reply on the command with false (cmd_false==0) value.
Definition: Command.h:232
void ReplyTrue()
Reply on the command with true (cmd_true==1) value.
Definition: Command.h:235
void Reply(int res=cmd_noresult)
Replied on the command.
Definition: Command.cxx:225
Represents objects hierarchy of remote (or local) DABC process.
Definition: Hierarchy.h:285
Hierarchy CreateHChild(const std::string &name, bool allowslahes=false, bool sortorder=false)
Create child item in hierarchy with specified name If allowslahes enabled, instead of subfolders item...
Definition: Hierarchy.h:392
void Create(const std::string &name, bool withmutex=false)
Create top-level object with specified name.
Definition: Hierarchy.cxx:934
Lock guard for posix mutex.
Definition: threads.h:127
std::vector< int64_t > AsIntVect() const
Definition: Record.cxx:573
bool SetField(const std::string &name, const RecordField &v)
Definition: Record.h:516
const char * GetName() const
Return name of referenced object, if object not assigned, returns "---".
Definition: Reference.cxx:167
Reference GetParentRef() const
Returns reference on parent object.
Definition: Reference.h:135
bool IsName(const char *name) const
Returns true if object name is the same as specified one.
Definition: Reference.cxx:177
bool null() const
Returns true if reference contains nullptr.
Definition: Reference.h:151
Uniform Resource Locator interpreter.
Definition: Url.h:33
virtual void OnThreadAssigned()
Definition: Worker.h:392
virtual int ExecuteCommand(Command cmd)
Main method where commands are executed.
Definition: Worker.cxx:851
virtual void OnThreadAssigned()
Definition: Monitor.cxx:110
Monitor(const std::string &name, dabc::Command cmd=nullptr)
Definition: Monitor.cxx:92
virtual double ProcessTimeout(double last_diff)
Definition: Monitor.cxx:127
virtual ~Monitor()
Definition: Monitor.cxx:105
void RescanHierarchy(TRootSniffer *sniff, dabc::Hierarchy &main, const char *path=0)
Method scans normal objects, registered in ROOT and DABC.
Definition: Monitor.cxx:147
virtual int ProcessGetBinary(THttpServer *serv, TRootSniffer *sniff, dabc::Command cmd)
Definition: Monitor.cxx:157
void ProcessActionsInRootContext(THttpServer *serv, TRootSniffer *sniff)
Definition: Monitor.cxx:258
virtual int ExecuteCommand(dabc::Command cmd)
Main method where commands are executed.
Definition: Monitor.cxx:133
int main(int numc, char *args[])
Definition: dabc_exe.cxx:68
#define DOUT0(args ...)
Definition: logging.h:156
#define DOUT3(args ...)
Definition: logging.h:176
#define EOUT(args ...)
Definition: logging.h:150
#define DBOOL(arg)
Definition: logging.h:191
Event manipulation API.
Definition: api.h:23
@ cmd_postponed
Definition: Command.h:42
@ cmd_false
Definition: Command.h:37
@ cmd_true
Definition: Command.h:38