DABC (Data Acquisition Backbone Core)  2.9.9
Publisher.cxx
Go to the documentation of this file.
1 // $Id: Publisher.cxx 4718 2021-03-12 17:09:50Z linev $
2 
3 /************************************************************
4  * The Data Acquisition Backbone Core (DABC) *
5  ************************************************************
6  * Copyright (C) 2009 - *
7  * GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
8  * Planckstr. 1, 64291 Darmstadt, Germany *
9  * Contact: http://dabc.gsi.de *
10  ************************************************************
11  * This software can be used under the GPL license *
12  * agreements as stated in LICENSE.txt file *
13  * which is part of the distribution. *
14  ************************************************************/
15 
16 #include "dabc/Publisher.h"
17 
18 #include "dabc/Manager.h"
19 #include "dabc/Url.h"
20 #include "dabc/HierarchyStore.h"
21 
23 {
24  if (res.null()) return;
25 
26  std::string kind = cmd.GetStr("textkind");
27 
28  if (kind.empty()) {
30  cmd.SetRawData(buf);
31  } else {
32  unsigned mask = 0;
33 
34  dabc::Url url;
35  url.SetOptions(cmd.GetStr("query"));
36 
37  if (url.HasOption("compact"))
39 
40  if (kind == "xml") mask |= dabc::storemask_AsXML;
41 
43 
44  dabc::HStore store(mask);
45  store.CreateNode(res.GetName());
46 
47  int num = cmd.GetInt("NumHdrs");
48  for (int n=0;n<num;++n) {
49  std::string name = cmd.GetStr(dabc::format("OptHdrName%d", n)),
50  value = cmd.GetStr(dabc::format("OptHdrValue%d", n));
51  store.SetField(name.c_str(), value.c_str());
52  }
53 
54  if (res.SaveTo(store, false)) {
55  store.CloseNode(res.GetName());
56  cmd.SetStr("astext", store.GetResult());
57  }
58  }
59 }
60 
61 // ======================================================================
62 
64 {
65  dabc::Hierarchy res;
66 
67  dabc::Buffer buf = cmd.GetRawData();
68 
69  if (buf.null()) {
70  EOUT("No raw data when requesting hierarchy");
71  return res;
72  }
73 
74  if (!res.ReadFromBuffer(buf)) {
75  EOUT("Error decoding hierarchy data from buffer");
76  res.Release();
77  }
78 
79  return res;
80 }
81 
82 
83 // ==========================================================
84 
86 {
87  if (store) {
88  store->CloseFile();
89  delete store;
90  store = nullptr;
91  }
92 }
93 
94 // ======================================================================
95 
96 dabc::Publisher::Publisher(const std::string &name, dabc::Command cmd) :
97  dabc::Worker(MakePair(name)),
98  fGlobal(),
99  fLocal(),
100  fLastLocalVers(0),
101  fPublishers(),
102  fSubscribers(),
103  fCnt(0),
104  fMgrPath(),
105  fMgrHiearchy()
106 {
107  fLocal.Create("LOCAL");
108 
109  if (Cfg("manager",cmd).AsBool(false))
110  fMgrHiearchy.Create("Manager");
111 
112  fStoreDir = Cfg("storedir", cmd).AsStr();
113  fStoreSel = Cfg("storesel", cmd).AsStr();
114  fFileLimit = Cfg("filelimit", cmd).AsInt(100);
115  fTimeLimit = Cfg("timelimit", cmd).AsInt(600);
116  fStorePeriod = Cfg("period",cmd).AsDouble(5.);
117 
118  if (!Cfg("store", cmd).AsBool()) fStoreDir.clear();
119 
120  DOUT3("PUBLISHER name:%s item:%s class:%s mgr:%s", GetName(), ItemName().c_str(), ClassName(), DBOOL(!fMgrHiearchy.null()));
121 }
122 
124 {
125 }
126 
127 
129 {
131 
132  fMgrPath = dabc::format("DABC/%s", dabc::mgr.GetName());
133 
134 // std::string addr = dabc::mgr.GetLocalAddress();
135 // size_t pos = addr.find(":");
136 // if (pos<addr.length()) addr[pos]='_';
137 // fMgrPath = std::string("DABC/")+ addr;
138 
139  if (!fMgrHiearchy.null()) {
140  DOUT3("dabc::Publisher::BeforeModuleStart mgr path %s", fMgrPath.c_str());
141  fPublishers.push_back(PublisherEntry());
142  fPublishers.back().id = fCnt++;
143  fPublishers.back().path = fMgrPath;
144  fPublishers.back().worker = ItemName();
145  fPublishers.back().fulladdr = WorkerAddress(true);
146  fPublishers.back().hier = fMgrHiearchy();
147  fPublishers.back().local = true;
148 
149  fLocal.GetFolder(fMgrPath, true);
150  }
151 
152  ActivateTimeout(0.1);
153 }
154 
156 {
157  fLastLocalVers = 0;
158 
159  for (PublishersList::iterator iter = fPublishers.begin(); iter != fPublishers.end(); iter++) {
160  if (!iter->local) iter->lastglvers = 0;
161  }
162 }
163 
164 
165 double dabc::Publisher::ProcessTimeout(double last_diff)
166 {
167 // DOUT0("dabc::Publisher::ProcessTimerEvent");
168 
169  bool is_any_global(false);
170  bool rebuild_global = fLocal.GetVersion() > fLastLocalVers;
171 /*
172  static int mycnt = 0;
173  if ((mycnt++ % 20 == 0) && !fStoreDir.empty()) {
174  HierarchyReading rr;
175  rr.SetBasePath(fStoreDir);
176  DOUT0("-------------- DO SCAN -------------");
177  rr.ScanTree();
178 
179  dabc::Hierarchy hh;
180  rr.GetStrucutre(hh);
181 
182  DOUT0("GOT\n%s", hh.SaveToXml().c_str());
183 
184  dabc::DateTime from;
185  from.SetOnlyDate("2013-09-18");
186  from.SetOnlyDate("13:05:00");
187 
188  dabc::DateTime till = from;
189  till.SetOnlyDate("14:05:00");
190 
191  dabc::Hierarchy sel = rr.GetSerie("/FESA/Test/TestRate", from, till);
192 
193  if (!sel.null())
194  DOUT0("SELECT\n%s",sel.SaveToXml(dabc::storemask_History).c_str());
195  else
196  DOUT0("???????? SELECT FAILED ?????????");
197 
198 
199  DOUT0("-------------- DID SCAN -------------");
200  }
201 */
202  DateTime storetm; // stamp used to mark time when next store operation was triggered
203 
204  for (PublishersList::iterator iter = fPublishers.begin(); iter != fPublishers.end(); iter++) {
205 
206  if (!iter->local) {
207  is_any_global = true;
208  if (iter->version > iter->lastglvers) rebuild_global = true;
209  }
210 
211  if (iter->waiting_publisher) continue;
212 
213  iter->waiting_publisher = true;
214 
215  if (iter->hier == fMgrHiearchy())
216  {
217  // first, generate current objects hierarchy
218  dabc::Hierarchy curr;
219  curr.BuildNew(dabc::mgr);
220  curr.SetField(prop_producer, WorkerAddress());
221 
222  // than use update to mark all changes
223  fMgrHiearchy.Update(curr);
224 
225  // DOUT0("MANAGER %u\n %s", fMgrHiearchy.GetVersion(), fMgrHiearchy.SaveToXml().c_str());
226 
227  // generate diff to the last requested version
228  Buffer diff = fMgrHiearchy.SaveToBuffer(dabc::stream_NamesList, iter->version);
229 
230  // and finally, apply diff to the main hierarchy
231  ApplyEntryDiff(iter->id, diff, fMgrHiearchy.GetVersion());
232 
233  } else
234  if (iter->local) {
235  CmdPublisher cmd;
236  bool dostore = false;
237  cmd.SetReceiver(iter->worker);
238  cmd.SetUInt("version", iter->version);
239  cmd.SetPtr("hierarchy", iter->hier);
240  cmd.SetUInt("recid", iter->id);
241  if (iter->store && iter->store->CheckForNextStore(storetm, fStorePeriod, fTimeLimit)) {
242  cmd.SetPtr("store", iter->store);
243  dostore = true;
244  }
245  cmd.SetTimeout(dostore ? 50. : 5.);
246  dabc::mgr.Submit(Assign(cmd));
247  } else {
248  Command cmd("GetLocalHierarchy");
249  cmd.SetReceiver(iter->fulladdr);
250  cmd.SetUInt("version", iter->version);
251  cmd.SetUInt("recid", iter->id);
252  cmd.SetTimeout(10.);
253  dabc::mgr.Submit(Assign(cmd));
254  }
255 
256 // DOUT0("Submit command to worker %s id %u", iter->worker.c_str(), iter->id);
257  }
258 
259  if (rebuild_global && is_any_global) {
260  // recreate global structures again
261 
262  fGlobal.Release();
263  fGlobal.Create("Global");
264 
265  //DOUT0("LOCAL version before:%u now:%u", (unsigned) fLastLocalVers, (unsigned) fLocal.GetVersion());
266 
267  fGlobal.Duplicate(fLocal);
268  fLastLocalVers = fLocal.GetVersion();
269 
270  for (PublishersList::iterator iter = fPublishers.begin(); iter != fPublishers.end(); iter++) {
271 
272  if (iter->local || (iter->version==0)) continue;
273 
274  //DOUT0("REMOTE %s version before:%u now:%u", iter->fulladdr.c_str(), (unsigned) iter->lastglvers, (unsigned) iter->version);
275 
276  fGlobal.Duplicate(iter->rem);
277 
278  iter->lastglvers = iter->version;
279  }
280 
281  //DOUT0("GLOBAL\n%s", fGlobal.SaveToXml().c_str());
282  } else
283  if (!is_any_global) {
284  fGlobal.Release();
285  fLastLocalVers = 0;
286  }
287 
288  for (SubscribersList::iterator iter = fSubscribers.begin(); iter != fSubscribers.end(); iter++) {
289  if (iter->waiting_worker) continue;
290 
291  if (iter->hlimit < 0) continue;
292 
293  // here direct request can be submitted, do it later, may be even not here
294  }
295 
296  return 0.5;
297 }
298 
300 {
301  for (SubscribersList::iterator iter = fSubscribers.begin(); iter != fSubscribers.end(); iter++) {
302  if (iter->waiting_worker) continue;
303 
304  if (iter->hlimit >= 0) continue;
305 
306  dabc::Hierarchy h = iter->local ? fLocal.GetFolder(iter->path) : fGlobal.GetFolder(iter->path);
307 
308  if (h.null()) { EOUT("Subscribed path %s not found", iter->path.c_str()); continue; }
309  }
310 }
311 
312 bool dabc::Publisher::ApplyEntryDiff(unsigned recid, dabc::Buffer& diff, uint64_t version, bool witherror)
313 {
314  PublishersList::iterator iter = fPublishers.begin();
315  while (iter != fPublishers.end()) {
316  if (iter->id == recid) break;
317  iter++;
318  }
319 
320  if (iter == fPublishers.end()) {
321  EOUT("Get reply for non-existing id %u", recid);
322  return false;
323  }
324 
325  iter->waiting_publisher = false;
326 
327  if (witherror) {
328  iter->errcnt++;
329  EOUT("Command failed for rec %u addr %s errcnt %d", recid, iter->fulladdr.c_str(), iter->errcnt);
330  return false;
331  }
332 
333  iter->errcnt = 0;
334  iter->version = version;
335 
336  if (iter->local) {
337 
338  dabc::Hierarchy top = fLocal.GetFolder(iter->path);
339  if (!top.null()) {
340  // we ensure that update of that item in manager hierarchy will not change its properties
341  top.UpdateFromBuffer(diff);
342  top.SetField(prop_producer, iter->fulladdr);
343  if (iter->mgrsubitem) top.DisableReadingAsChild();
344 
345  } else {
346  EOUT("Did not found local folder %s ", iter->path.c_str());
347  }
348  } else {
349  iter->rem.UpdateFromBuffer(diff);
350  }
351 
352  DOUT5("LOCAL ver %u diff %u itemver %u \n%s", fLocal.GetVersion(), diff.GetTotalSize(), iter->version, fLocal.SaveToXml().c_str());
353 
354  // check if hierarchy was changed
355  CheckDnsSubscribers();
356 
357  return true;
358 }
359 
360 
362 {
363  if (cmd.IsName(CmdPublisher::CmdName())) {
364  dabc::Buffer diff = cmd.GetRawData();
365 
366  ApplyEntryDiff(cmd.GetUInt("recid"), diff, cmd.GetUInt("version"), cmd.GetResult() != cmd_true);
367 
368  return true;
369  } else
370  if (cmd.IsName("GetLocalHierarchy")) {
371  dabc::Buffer diff = cmd.GetRawData();
372 
373  ApplyEntryDiff(cmd.GetUInt("recid"), diff, cmd.GetUInt("version"), cmd.GetResult() != cmd_true);
374 
375  return true;
376  }
377 
378  return dabc::Worker::ReplyCommand(cmd);
379 }
380 
381 
382 dabc::Hierarchy dabc::Publisher::GetWorkItem(const std::string &path, bool* islocal)
383 {
384 
385  dabc::Hierarchy top = fGlobal.null() ? fLocal : fGlobal;
386 
387  if (islocal) *islocal = fGlobal.null();
388 
389  if (path.empty() || (path=="/")) return top;
390 
391  return top.FindChild(path.c_str());
392 }
393 
394 bool dabc::Publisher::IdentifyItem(bool asproducer, const std::string &itemname, bool& islocal, std::string& producer_name, std::string& request_name)
395 {
396  if (asproducer && (itemname.length()==0)) return false;
397 
398  dabc::Hierarchy h = fLocal.GetFolder(itemname);
399  if (!h.null()) {
400  // we need to redirect command to appropriate worker (or to ourself)
401  // for local producers we need to find last (maximal depth) producer
402  if (asproducer)
403  producer_name = h.FindBinaryProducer(request_name, false);
404  else
405  producer_name = itemname;
406  DOUT3("Producer:%s request:%s item:%s", producer_name.c_str(), request_name.c_str(), itemname.c_str());
407 
408  islocal = true;
409  } else
410  for (PublishersList::iterator iter = fPublishers.begin(); iter != fPublishers.end(); iter++) {
411  if (iter->local) continue;
412 
413  h = iter->rem.GetFolder(itemname);
414  if (h.null()) continue;
415 
416  // we need to redirect command to remote node
417 
418  if (asproducer)
419  producer_name = h.FindBinaryProducer(request_name);
420  else
421  producer_name = itemname;
422  islocal = false;
423  break;
424  }
425 
426  if (!h.null() || (itemname.length()<2)) return !producer_name.empty() || !!asproducer;
427 
428  // DOUT0("Cut part from item %s", itemname.c_str());
429 
430  std::string item1 = itemname;
431  //while (item1[item1.length()-1] == '/') item1.resize(item1.length()-1);
432  size_t pos = item1.find_last_of("/", item1.length()-2);
433  if (pos == std::string::npos) return false;
434  // when searching producer, it cannot be root folder
435  if ((pos == 0) && asproducer) return false;
436 
437  std::string sub = item1.substr(pos+1); // keep slash
438  item1.resize(pos+1);
439 
440  // DOUT0("After cut item1 %s sub %s", item1.c_str(), sub.c_str());
441 
442  if (IdentifyItem(asproducer, item1, islocal, producer_name, request_name)) {
443  if ((sub.length()>0) && (request_name.length()>0) && (request_name[request_name.length()-1]!='/')) request_name.append("/");
444  request_name.append(sub);
445  return true;
446  }
447 
448  return false;
449 }
450 
451 
452 bool dabc::Publisher::RedirectCommand(dabc::Command cmd, const std::string &itemname)
453 {
454  std::string producer_name, request_name;
455  bool islocal(true);
456 
457  DOUT3("PUBLISHER CMD %s ITEM %s", cmd.GetName(), itemname.c_str());
458 
459  if (!IdentifyItem(true, itemname, islocal, producer_name, request_name)) {
460  DOUT2("Not found producer for item %s", itemname.c_str());
461  return false;
462  }
463 
464  DOUT3("ITEM %s PRODUCER %s REQUEST %s", itemname.c_str(), producer_name.c_str(), request_name.c_str());
465 
466  bool producer_local(true);
467  std::string producer_server, producer_item;
468 
469  if (!dabc::mgr.DecomposeAddress(producer_name, producer_local, producer_server, producer_item)) {
470  EOUT("Wrong address specified as producer %s", producer_name.c_str());
471  return false;
472  }
473 
474  if (islocal || producer_local) {
475  // this is local case, we need to redirect command to the appropriate worker
476  // but first we should locate hierarchy which is assigned with the worker
477 
478  for (PublishersList::iterator iter = fPublishers.begin(); iter != fPublishers.end(); iter++) {
479  if (!iter->local) continue;
480 
481  if ((iter->worker != producer_item) && (iter->worker != std::string("/") + producer_item)) continue;
482 
483  // we redirect command to local worker
484  // manager should find proper worker for execution
485 
486  DOUT3("Submit GET command to %s subitem %s", producer_item.c_str(), request_name.c_str());
487  cmd.SetReceiver(iter->worker);
488  cmd.SetPtr("hierarchy", iter->hier);
489  cmd.SetStr("subitem", request_name);
490  dabc::mgr.Submit(cmd);
491  return true;
492  }
493 
494  EOUT("Not found producer %s, which is correspond to item %s", producer_item.c_str(), itemname.c_str());
495  return false;
496  }
497 
498  if (cmd.GetBool("analyzed")) {
499  EOUT("Command to get item %s already was analyzed - something went wrong", itemname.c_str());
500  return false;
501  }
502 
503  DOUT3("Submit command to Receiver %s", dabc::mgr.ComposeAddress(producer_server, dabc::Publisher::DfltName()).c_str());
504 
505  cmd.SetReceiver(dabc::mgr.ComposeAddress(producer_server, dabc::Publisher::DfltName()));
506  cmd.SetBool("analyzed", true);
507  dabc::mgr.Submit(cmd);
508  return true;
509 }
510 
511 
512 dabc::Command dabc::Publisher::CreateExeCmd(const std::string &path, const std::string &query, dabc::Command res)
513 {
514  bool islocal(true);
515  dabc::Hierarchy def = GetWorkItem(path, &islocal);
516  if (def.null()) return nullptr;
517 
518  if (def.Field(dabc::prop_kind).AsStr()!="DABC.Command") return nullptr;
519 
520  std::string request_name;
521  std::string producer_name = def.FindBinaryProducer(request_name, !islocal);
522  if (producer_name.empty()) return nullptr;
523 
524  if (def.GetField("_parcmddef").AsBool(false)) {
525  DOUT3("Create normal command %s for path %s", def.GetName(), path.c_str());
526  if (res.null()) {
527  res = dabc::Command(def.GetName());
528  } else {
529  res.ChangeName(def.GetName());
530  }
531  } else {
532  DOUT3("Create hierarchy command %s for path %s", request_name.c_str(), path.c_str());
533  if (res.null()) {
534  res = dabc::CmdHierarchyExec(request_name);
535  } else {
536  res.ChangeName(dabc::CmdHierarchyExec::CmdName());
537  res.SetStr("Item", request_name);
538  }
539  }
540 
541  dabc::Url url(std::string("execute?") + query);
542 
543  if (url.IsValid()) {
544  int cnt = 0;
545  std::string part;
546  do {
547  part = url.GetOptionsPart(cnt++);
548  if (part.empty()) break;
549 
550  size_t p = part.find("=");
551  if ((p==std::string::npos) || (p==0) || (p==part.length()-1)) break;
552 
553  std::string parname = part.substr(0,p);
554  std::string parvalue = part.substr(p+1);
555 
556  size_t pos;
557  while ((pos = parvalue.find("%20")) != std::string::npos)
558  parvalue.replace(pos, 3, " ");
559 
560  bool quotes = false;
561 
562  if ((parvalue.length()>1) && ((parvalue[0]=='\'') || (parvalue[0]=='\"'))
563  && (parvalue[0] == parvalue[parvalue.length()-1])) {
564  parvalue.erase(0,1);
565  parvalue.resize(parvalue.length()-1);
566  quotes = true;
567  }
568 
569  // DOUT0("parname %s parvalue %s", parname.c_str(), parvalue.c_str());
570 
571  if (quotes) {
572 
573  std::vector<std::string> vect;
574  std::vector<double> dblvect;
575  std::vector<int64_t> intvect;
576 
577  if (!dabc::RecordField::StrToStrVect(parvalue.c_str(), vect)) {
578  res.SetStr(parname, parvalue);
579  continue;
580  }
581 
582  if (vect.empty()) {
583  res.SetField(parname, parvalue);
584  continue;
585  }
586 
587  for (unsigned n=0;n<vect.size();n++) {
588  double ddd;
589  long iii;
590  if (str_to_double(vect[n].c_str(), &ddd)) dblvect.push_back(ddd);
591  if (str_to_lint(vect[n].c_str(), &iii)) intvect.push_back(ddd);
592  }
593 
594  if (intvect.size()==vect.size()) res.SetField(parname, intvect); else
595  if (dblvect.size()==vect.size()) res.SetField(parname, dblvect); else
596  res.SetField(parname, vect);
597 
598  } else if (parname == "tmout") {
599  double tmout = 10;
600  if (!str_to_double(parvalue.c_str(), &tmout)) tmout = 10;
601  res.SetTimeout(tmout);
602  } else {
603  double ddd;
604  long iii;
605  if (str_to_lint(parvalue.c_str(), &iii)) res.SetInt(parname, iii); else
606  if (str_to_double(parvalue.c_str(), &ddd)) res.SetDouble(parname, ddd); else
607  res.SetStr(parname, parvalue);
608 
609  }
610 
611  } while (!part.empty());
612  }
613 
614  res.SetReceiver(producer_name);
615 
616  return res;
617 }
618 
619 
621 {
622  if (cmd.IsName("OwnCommand")) {
623 
624  std::string path = cmd.GetStr("Path");
625  std::string worker = cmd.GetStr("Worker");
626  bool ismgrpath = false;
627  if (path.find("$MGR$")==0) {
628  ismgrpath = true;
629  path.erase(0, 5);
630  path = fMgrPath + path;
631  } else
632  if (path.find("$CONTEXT$")==0) {
633  path.erase(0, 9);
634  path = dabc::format("/%s", dabc::mgr.GetName()) + path;
635  }
636 
637  switch (cmd.GetInt("cmdid")) {
638  case 1: { // REGISTER
639 
640  for (PublishersList::iterator iter = fPublishers.begin(); iter != fPublishers.end(); iter++) {
641  if (iter->path == path) {
642  EOUT("Path %s already registered!!!", path.c_str());
643  return cmd_false;
644  }
645  }
646 
647  dabc::Hierarchy h = fLocal.GetFolder(path);
648  if (!h.null()) {
649  if (ismgrpath) {
650  DOUT0("Path %s is registered in manager hierarchy, treat it individually", path.c_str());
652  } else {
653  EOUT("Path %s already present in the hierarchy", path.c_str());
654  return cmd_false;
655  }
656  }
657 
658  DOUT3("PUBLISH folder %s", path.c_str());
659 
660  fPublishers.push_back(PublisherEntry());
661  fPublishers.back().id = fCnt++;
662  fPublishers.back().path = path;
663  fPublishers.back().worker = worker;
664  fPublishers.back().fulladdr = dabc::mgr.ComposeAddress("", worker);
665  fPublishers.back().hier = cmd.GetPtr("Hierarchy");
666  fPublishers.back().local = true;
667  fPublishers.back().mgrsubitem = ismgrpath;
668 
669  if (!fStoreDir.empty()) {
670  if (fStoreSel.empty() || (path.find(fStoreSel) == 0)) {
671  DOUT1("Create store for %s", path.c_str());
672  fPublishers.back().store = new HierarchyStore();
673  fPublishers.back().store->SetBasePath(fStoreDir + path);
674  }
675  }
676 
677  fLocal.GetFolder(path, true);
678  // set immediately producer
679 
680  // ShootTimer("Timer");
681 
682  return cmd_true;
683  }
684 
685  case 2: { // UNREGISTER
686  bool find = false;
687  for (PublishersList::iterator iter = fPublishers.begin(); iter != fPublishers.end(); iter++) {
688  if (iter->local && (iter->path == path) && (iter->worker == worker)) {
689 
690  if (!fLocal.RemoveEmptyFolders(path))
691  EOUT("Not found local entry with path %s", path.c_str());
692 
693  fPublishers.erase(iter);
694  find = true;
695  break;
696  }
697  }
698 
699  return cmd_bool(find);
700  }
701 
702  case 3: { // SUBSCRIBE
703 
704  bool islocal = true;
705 
706  dabc::Hierarchy h = fLocal.GetFolder(path);
707  if (h.null()) { h = fGlobal.GetFolder(path); islocal = false; }
708  if (h.null()) {
709  EOUT("Path %s not exists", path.c_str());
710  return cmd_false;
711  }
712 
713  fSubscribers.push_back(SubscriberEntry());
714 
715  SubscriberEntry& entry = fSubscribers.back();
716 
717  entry.id = fCnt++;
718  entry.path = path;
719  entry.worker = worker;
720  entry.local = islocal;
721  entry.hlimit = 0;
722 
723  return cmd_true;
724  }
725 
726  case 4: { // UNSUBSCRIBE
727 
728  SubscribersList::iterator iter = fSubscribers.begin();
729  while (iter != fSubscribers.end()) {
730  if ((iter->worker == worker) && (iter->path == path))
731  fSubscribers.erase(iter++);
732  else
733  iter++;
734  }
735 
736  return cmd_true;
737  }
738 
739  case 5: { // REMOVE WORKER
740 
741  DOUT2("Publisher removes worker %s", worker.c_str());
742 
743  PublishersList::iterator iter = fPublishers.begin();
744  while (iter!=fPublishers.end()) {
745  if (iter->worker == worker) {
746  DOUT2("Publisher removes path %s of worker %s", iter->path.c_str(), worker.c_str());
747  if (iter->local) fLocal.GetFolder(iter->path).Destroy();
748  fPublishers.erase(iter++);
749  } else {
750  iter++;
751  }
752  }
753 
754  SubscribersList::iterator iter2 = fSubscribers.begin();
755  while (iter2 != fSubscribers.end()) {
756  if (iter2->worker == worker)
757  fSubscribers.erase(iter2++);
758  else
759  iter2++;
760  }
761 
762  return cmd_true;
763  }
764 
765  case 6: { // ADD REMOTE NODE
766 
767  std::string remoteaddr = dabc::mgr.ComposeAddress(path, dabc::Publisher::DfltName());
768 
769  for (PublishersList::iterator iter = fPublishers.begin(); iter != fPublishers.end(); iter++) {
770  if ((iter->fulladdr == remoteaddr) && !iter->local) {
771  EOUT("Path %s already registered!!!", path.c_str());
772  return cmd_false;
773  }
774  }
775 
776  fPublishers.push_back(PublisherEntry());
777  fPublishers.back().id = fCnt++;
778  fPublishers.back().path = "";
779  fPublishers.back().worker = worker;
780  fPublishers.back().fulladdr = remoteaddr;
781  fPublishers.back().hier = 0;
782  fPublishers.back().local = false;
783  fPublishers.back().rem.Create("remote");
784 
785  DOUT3("PUBLISH NODE %s", path.c_str());
786 
787  return cmd_true;
788  }
789 
790  default:
791  EOUT("BAD OwnCommand ID");
792  return cmd_false;
793  }
794  } else
795  if (cmd.IsName("GetLocalHierarchy")) {
796 
797  Buffer diff = fLocal.SaveToBuffer(dabc::stream_NamesList, cmd.GetUInt("version"));
798 
799  cmd.SetRawData(diff);
800  cmd.SetUInt("version", fLocal.GetVersion());
801 
802  return cmd_true;
803  } else
804  if (cmd.IsName(CmdGetNamesList::CmdName())) {
805  std::string path = cmd.GetStr("path");
806 
807  dabc::Hierarchy h = GetWorkItem(path);
808 
809  DOUT3("Get names list %s query %s", path.c_str(), cmd.GetStr("query").c_str());
810 
811  // if item was not found directly, try to ask producer if it can be extended
812  if (h.null() || h.HasField(dabc::prop_more) || (cmd.GetStr("query").find("more")!=std::string::npos)) {
813  if (!RedirectCommand(cmd, path)) return cmd_false;
814  DOUT3("ITEM %s CAN PROVIDE MORE!!!", path.c_str());
815  return cmd_postponed;
816  }
817 
819 
820  return cmd_true;
821  } else
822  if (cmd.IsName("CreateExeCmd")) {
823 
824  dabc::Command res = CreateExeCmd(cmd.GetStr("path"), cmd.GetStr("query"));
825  if (res.null()) return cmd_false;
826 
827  cmd.SetRef("ExeCmd", res);
828 
829  return cmd_true;
830  } else
831  if (cmd.IsName("CmdUIKind")) {
832 
833  bool islocal = true;
834  std::string item_name, request_name, uri = cmd.GetStr("uri");
835 
836  if (!uri.empty())
837  if (!IdentifyItem(false, uri, islocal, item_name, request_name)) return cmd_false;
838 
839  dabc::Hierarchy h = GetWorkItem(item_name);
840 
841  if (islocal && h.Field(dabc::prop_kind).AsStr()=="DABC.HTML") {
842  cmd.SetStr("ui_kind", "__user__");
843  cmd.SetStr("path", h.GetField("_UserFilePath").AsStr());
844  if (request_name.empty()) request_name = h.GetField("_UserFileMain").AsStr();
845  cmd.SetStr("fname", request_name);
846  } else {
847  cmd.SetStr("path", item_name);
848  cmd.SetStr("fname", request_name);
849 
850  //if (request_name.empty())
851  // cmd.SetStr("ui_kind", h.NumChilds() > 0 ? "__tree__" : "__single__");
852 
853  // publisher can only identify existing entries
854  // all extra entries (like objects members in Go4 events browser) appears as subfolders in request
855  size_t pos = request_name.rfind("/");
856  if ((pos != std::string::npos) && (pos != request_name.length()-1)) {
857  cmd.SetStr("path", item_name + request_name.substr(0, pos));
858  cmd.SetStr("fname", request_name.substr(pos+1));
859  }
860  }
861 
862  return cmd_true;
863 
864  } else
865  if (cmd.IsName("CmdNeedAuth")) {
866  std::string path = cmd.GetStr("path");
867  dabc::Hierarchy h = GetWorkItem(path);
868 
869  int res = -1;
870 
871  while (!h.null()) {
872  if (h.HasField(dabc::prop_auth)) {
873  res = h.GetField(dabc::prop_auth).AsBool() ? 1 : 0;
874  break;
875  }
876 
877  h = h.GetParentRef();
878  }
879 
880  cmd.SetInt("need_auth", res);
881 
882  return cmd_true;
883  }
884 
885  if (cmd.IsName(CmdGetBinary::CmdName())) {
886 
887  // if we get command here, we need to find destination for it
888 
889  std::string itemname = cmd.GetStr("Item");
890 
891  // this is executing of the command
892  if (cmd.GetStr("Kind") == "execute") {
893  std::string query = cmd.GetStr("Query");
894 
895  cmd.RemoveField("Item");
896  cmd.RemoveField("Kind");
897  cmd.RemoveField("Query");
898 
899  dabc::Command res = CreateExeCmd(itemname, query, cmd);
900 
901  if (res == cmd) {
902  dabc::mgr.Submit(cmd);
903  return cmd_postponed;
904  } else {
905  return cmd_false;
906  }
907  }
908 
909  // DOUT3("Publisher::CmdGetBinary for item %s", itemname.c_str());
910 
911  if (!RedirectCommand(cmd, itemname)) return cmd_false;
912 
913  return cmd_postponed;
914  }
915 
916  return dabc::Worker::ExecuteCommand(cmd);
917 }
918 
919 // ===================================================================
920 
921 
922 bool dabc::PublisherRef::OwnCommand(int id, const std::string &path, const std::string &workername, void* hier)
923 {
924  if (null()) return false;
925 
926 /* if (thread().null()) {
927  DOUT2("Trying to submit publisher command when thread not assigned - ignore");
928  return true;
929  }
930 */
931  bool sync = id > 0;
932  if (!sync) id = -id;
933 
934  Command cmd("OwnCommand");
935  cmd.SetInt("cmdid", id);
936  cmd.SetStr("Path", path);
937  cmd.SetStr("Worker", workername);
938  cmd.SetPtr("Hierarchy", hier);
939 
940  if (!sync) return Submit(cmd);
941 
942  return Execute(cmd) == cmd_true;
943 }
944 
945 
946 std::string dabc::PublisherRef::UserInterfaceKind(const char* uri, std::string& path, std::string& fname)
947 {
948  if (null()) return "__error__";
949 
950  dabc::Command cmd("CmdUIKind");
951  cmd.SetStr("uri", uri);
952 
953  if (Execute(cmd) != cmd_true) return "__error__";
954 
955  path = cmd.GetStr("path");
956  fname = cmd.GetStr("fname");
957  return cmd.GetStr("ui_kind");
958 }
959 
960 int dabc::PublisherRef::NeedAuth(const std::string &path)
961 {
962  if (null()) return -1;
963 
964  dabc::Command cmd("CmdNeedAuth");
965  cmd.SetStr("path", path);
966 
967  if (Execute(cmd) != cmd_true) return -1;
968 
969  return cmd.GetInt("need_auth");
970 }
971 
972 
973 
974 dabc::Command dabc::PublisherRef::ExeCmd(const std::string &fullname, const std::string &query)
975 {
976  dabc::Command res;
977  if (null()) return res;
978 
979  dabc::Command cmd("CreateExeCmd");
980  cmd.SetStr("path", fullname);
981  cmd.SetStr("query", query);
982 
983  if (Execute(cmd) != cmd_true) return res;
984 
985  res = cmd.GetRef("ExeCmd");
986 
987  DOUT3("Produce command %p - now submit", res());
988 
989  if (res.null()) return res;
990 
991  dabc::mgr.Execute(res);
992  return res;
993 }
994 
995 
996 dabc::Buffer dabc::PublisherRef::GetBinary(const std::string &fullname, const std::string &kind, const std::string &query, double tmout)
997 {
998  if (null()) return 0;
999 
1000  CmdGetBinary cmd(fullname, kind, query);
1001  cmd.SetTimeout(tmout);
1002 
1003  if (Execute(cmd) == cmd_true)
1004  return cmd.GetRawData();
1005 
1006  return 0;
1007 }
1008 
1009 dabc::Hierarchy dabc::PublisherRef::GetItem(const std::string &fullname, const std::string &query, double tmout)
1010 {
1011  dabc::Hierarchy res;
1012 
1013  if (null()) return res;
1014 
1015  CmdGetBinary cmd(fullname, "hierarchy", query);
1016  cmd.SetTimeout(tmout);
1017 
1018  if (Execute(cmd) != cmd_true) return res;
1019 
1020  res.Create("get");
1021  res.SetVersion(cmd.GetUInt("version"));
1022  res.ReadFromBuffer(cmd.GetRawData());
1023 
1024  return res;
1025 }
Reference on memory from memory pool.
Definition: Buffer.h:135
BufferSize_t GetTotalSize() const
Return total size of all buffer segments.
Definition: Buffer.cxx:91
Command used to produce custom binary data for published in hierarchy entries.
Definition: Publisher.h:33
static void SetResNamesList(dabc::Command &cmd, Hierarchy &res)
Definition: Publisher.cxx:22
static Hierarchy GetResNamesList(dabc::Command &cmd)
Definition: Publisher.cxx:63
Command submitted to worker when item in hierarchy defined as DABC.Command and used to produce custom...
Definition: Publisher.h:47
Represents command with its arguments.
Definition: Command.h:99
void SetPtr(const std::string &name, void *p)
Set pointer argument for the command.
Definition: Command.cxx:151
unsigned GetUInt(const std::string &name, unsigned dflt=0) const
Definition: Command.h:148
bool SetStr(const std::string &name, const char *value)
Definition: Command.h:134
bool SetBool(const std::string &name, bool v)
Definition: Command.h:141
bool SetInt(const std::string &name, int v)
Definition: Command.h:138
bool SetRawData(Buffer rawdata)
Set raw data to the command, which can be transported also between nodes.
Definition: Command.cxx:334
std::string GetStr(const std::string &name, const std::string &dflt="") const
Definition: Command.h:136
Command & SetTimeout(double tm)
Set maximum time which can be used for command execution.
Definition: Command.cxx:108
bool SetUInt(const std::string &name, unsigned v)
Definition: Command.h:147
Command & SetReceiver(const std::string &itemname)
These methods prepare command so, that one can submit command to the manager like: dabc::mgr....
Definition: Command.h:264
int GetResult() const
Definition: Command.h:174
bool GetBool(const std::string &name, bool dflt=false) const
Definition: Command.h:142
Reference GetRef(const std::string &name)
Returns reference from the command, can be called only once.
Definition: Command.cxx:175
bool SetDouble(const std::string &name, double v)
Definition: Command.h:144
int GetInt(const std::string &name, int dflt=0) const
Definition: Command.h:139
Buffer GetRawData()
Returns reference on raw data Can be called only once - raw data reference will be cleaned.
Definition: Command.cxx:347
void ChangeName(const std::string &name)
Change command name, should not be used for remote commands.
Definition: Command.cxx:62
bool SetRef(const std::string &name, Reference ref)
Set reference to the command.
Definition: Command.cxx:168
void * GetPtr(const std::string &name, void *deflt=0) const
Get pointer argument from the command.
Definition: Command.cxx:158
Class for holding GMT time with precision of nanoseconds.
Definition: timing.h:190
class, used for direct store of records in JSON/XML form
Definition: Record.h:179
void CloseNode(const char *nodename)
Definition: Record.cxx:221
void SetField(const char *name, const char *value)
Definition: Record.cxx:170
std::string GetResult()
Definition: Record.h:212
void CreateNode(const char *nodename)
Definition: Record.cxx:145
Represents objects hierarchy of remote (or local) DABC process.
Definition: Hierarchy.h:285
void DisableReadingAsChild()
Disable reading of element when it appears as child in the structure.
Definition: Hierarchy.cxx:1162
Hierarchy FindChild(const char *name)
Return child element from hierarchy.
Definition: Hierarchy.h:362
const RecordField & Field(const std::string &name) const
Definition: Hierarchy.h:304
dabc::Buffer SaveToBuffer(unsigned kind=stream_Full, uint64_t version=0, unsigned hlimit=0)
Save hierarchy in binary form, relative to specified version.
Definition: Hierarchy.cxx:873
void Create(const std::string &name, bool withmutex=false)
Create top-level object with specified name.
Definition: Hierarchy.cxx:934
std::string FindBinaryProducer(std::string &request_name, bool topmost=true)
Search for parent element, where binary_producer property is specified Returns name of binary produce...
Definition: Hierarchy.cxx:1098
bool UpdateFromBuffer(const dabc::Buffer &buf, HierarchyStreamKind kind=stream_Full)
Apply modification to hierarchy, using stored binary data
Definition: Hierarchy.cxx:912
bool ReadFromBuffer(const dabc::Buffer &buf)
Read hierarchy from buffer.
Definition: Hierarchy.cxx:896
void BuildNew(Reference top)
Build objects hierarchy, referenced by top.
Definition: Hierarchy.cxx:795
void SetVersion(uint64_t v)
Change version of the item, only for advanced usage.
Definition: Hierarchy.h:352
std::string ComposeAddress(const std::string &server, const std::string &itemname="")
Definition: Manager.h:603
std::string ItemName(bool compact=true) const
Produce string, which can be used as name argument in dabc::mgr.FindItem(name) call.
Definition: Object.cxx:1076
const char * GetName() const
Returns name of the object, thread safe
Definition: Object.h:295
int NeedAuth(const std::string &path)
Returns 1 - need auth, 0 - no need auth, -1 - undefined.
Definition: Publisher.cxx:960
Buffer GetBinary(const std::string &fullname, const std::string &kind, const std::string &query, double tmout=5.)
Return different kinds of binary data, depends from kind.
Definition: Publisher.cxx:996
Command ExeCmd(const std::string &fullname, const std::string &query)
Execute item is command, providing parameters in query.
Definition: Publisher.cxx:974
std::string UserInterfaceKind(const char *uri, std::string &path, std::string &fname)
Returns "" - undefined, "__tree__" – tree hierarchy "__single__" – single element "__file__" – just a...
Definition: Publisher.cxx:946
Hierarchy GetItem(const std::string &fullname, const std::string &query="", double tmout=5.)
Definition: Publisher.cxx:1009
bool OwnCommand(int id, const std::string &path, const std::string &workername, void *hier=nullptr)
Definition: Publisher.cxx:922
virtual bool ReplyCommand(Command cmd)
Reimplement this method to react on command reply Return true if command can be destroyed by framewor...
Definition: Publisher.cxx:361
Hierarchy GetWorkItem(const std::string &path, bool *islocal=nullptr)
Return hierarchy item selected for work.
Definition: Publisher.cxx:382
int fFileLimit
! selected hierarchy path for storage like 'MBS' or 'FESA/server'
Definition: Publisher.h:173
dabc::Command CreateExeCmd(const std::string &path, const std::string &query, dabc::Command tgt=nullptr)
Definition: Publisher.cxx:512
virtual double ProcessTimeout(double last_diff)
Definition: Publisher.cxx:165
virtual const char * ClassName() const
Returns class name of the object instance.
Definition: Publisher.h:216
bool IdentifyItem(bool asproducer, const std::string &itemname, bool &islocal, std::string &producer_name, std::string &request_name)
Try to find producer which potentially could deliver item It could happen that item is not exists in ...
Definition: Publisher.cxx:394
std::string fStoreSel
! directory to store data
Definition: Publisher.h:172
bool RedirectCommand(dabc::Command cmd, const std::string &itemname)
Command redirected to local modules or remote publisher, where it should be processed Primary usage -...
Definition: Publisher.cxx:452
Publisher(const std::string &name, dabc::Command cmd=nullptr)
Definition: Publisher.cxx:96
virtual ~Publisher()
Definition: Publisher.cxx:123
void CheckDnsSubscribers()
Definition: Publisher.cxx:299
bool ApplyEntryDiff(unsigned recid, dabc::Buffer &buf, uint64_t version, bool witherror=false)
Definition: Publisher.cxx:312
Hierarchy fLocal
! this is hierarchy of all known items, including remote, used only when any global hierarchies are e...
Definition: Publisher.h:154
std::string fStoreDir
! this is manager hierarchy, published by ourselfs
Definition: Publisher.h:171
double fStorePeriod
! maximum time of store file, in seconds
Definition: Publisher.h:175
virtual void OnThreadAssigned()
! how often storage is triggered
Definition: Publisher.cxx:128
static const char * DfltName()
Definition: Publisher.h:214
void InvalidateGlobal()
Method marks that global version is out of date and should be rebuild.
Definition: Publisher.cxx:155
int fTimeLimit
! maximum size of store file, in MB
Definition: Publisher.h:174
virtual int ExecuteCommand(Command cmd)
Main method where commands are executed.
Definition: Publisher.cxx:620
Hierarchy fMgrHiearchy
! path for manager
Definition: Publisher.h:169
bool AsBool(bool dflt=false) const
Definition: Record.cxx:477
std::string AsStr(const std::string &dflt="") const
Definition: Record.cxx:749
int64_t AsInt(int64_t dflt=0) const
Definition: Record.cxx:501
double AsDouble(double dflt=0.) const
Definition: Record.cxx:549
static bool StrToStrVect(const char *str, std::vector< std::string > &vect, bool verbose=true)
Definition: Record.cxx:983
RecordField GetField(const std::string &name) const
Definition: Record.h:510
bool HasField(const std::string &name) const
Definition: Record.h:498
bool SetField(const std::string &name, const RecordField &v)
Definition: Record.h:516
bool RemoveField(const std::string &name)
Definition: Record.h:501
bool SaveTo(HStore &store, bool create_node=true)
Store hierarchy in json/xml form
Definition: Record.h:534
void Release()
Releases reference on the object.
Definition: Reference.cxx:138
const char * GetName() const
Return name of referenced object, if object not assigned, returns "---".
Definition: Reference.cxx:167
Reference GetParentRef() const
Returns reference on parent object.
Definition: Reference.h:135
bool IsName(const char *name) const
Returns true if object name is the same as specified one.
Definition: Reference.cxx:177
Reference GetFolder(const std::string &name, bool force=false)
Return folder of specified name, no special symbols are allowed.
Definition: Reference.cxx:234
bool null() const
Returns true if reference contains nullptr.
Definition: Reference.h:151
Uniform Resource Locator interpreter.
Definition: Url.h:33
std::string GetOptionsPart(int number=0) const
Definition: Url.cxx:195
bool HasOption(const std::string &optname) const
Definition: Url.h:70
int GetOptionInt(const std::string &optname, int dflt=0) const
Definition: Url.cxx:290
void SetOptions(const std::string &opt)
Method allows to set URL options directly to be able use all Get methods.
Definition: Url.cxx:102
bool IsValid() const
Definition: Url.h:55
bool Execute(Command cmd, double tmout=-1.)
Definition: Worker.cxx:1147
bool Submit(Command cmd)
Definition: Worker.cxx:1139
Active object, which is working inside dabc::Thread.
Definition: Worker.h:116
virtual void OnThreadAssigned()
Definition: Worker.h:392
RecordField Cfg(const std::string &name, Command cmd=nullptr) const
Returns configuration field of specified name Configuration value of specified name searched in follo...
Definition: Worker.cxx:521
virtual int ExecuteCommand(Command cmd)
Main method where commands are executed.
Definition: Worker.cxx:851
virtual bool ReplyCommand(Command cmd)
Reimplement this method to react on command reply Return true if command can be destroyed by framewor...
Definition: Worker.cxx:856
#define DOUT2(args ...)
Definition: logging.h:170
#define DOUT0(args ...)
Definition: logging.h:156
#define DOUT5(args ...)
Definition: logging.h:188
#define DOUT3(args ...)
Definition: logging.h:176
#define EOUT(args ...)
Definition: logging.h:150
#define DOUT1(args ...)
Definition: logging.h:162
#define DBOOL(arg)
Definition: logging.h:191
Event manipulation API.
Definition: api.h:23
@ storemask_Compact
Definition: Record.h:168
@ storemask_AsXML
Definition: Record.h:173
bool str_to_double(const char *val, double *res)
Convert string to double value.
Definition: string.cxx:216
ManagerRef mgr
Definition: Manager.cxx:42
std::string format(const char *fmt,...)
Definition: string.cxx:49
const char * prop_kind
Definition: Hierarchy.cxx:29
const char * prop_more
Definition: Hierarchy.cxx:38
const char * prop_auth
Definition: Hierarchy.cxx:34
@ stream_NamesList
Definition: Hierarchy.h:132
const char * prop_producer
Definition: Hierarchy.cxx:32
bool str_to_lint(const char *val, long *res)
Convert string to long integer value.
Definition: string.cxx:162
@ cmd_postponed
Definition: Command.h:42
@ cmd_false
Definition: Command.h:37
@ cmd_true
Definition: Command.h:38
std::string path
Definition: Publisher.h:128
std::string worker
Definition: Publisher.h:129