DABC (Data Acquisition Backbone Core)  2.9.9
BnetMasterModule.cxx
Go to the documentation of this file.
1 // $Id: BnetMasterModule.cxx 4707 2021-02-26 11:29:21Z linev $
2 
3 /************************************************************
4  * The Data Acquisition Backbone Core (DABC) *
5  ************************************************************
6  * Copyright (C) 2009 - *
7  * GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
8  * Planckstr. 1, 64291 Darmstadt, Germany *
9  * Contact: http://dabc.gsi.de *
10  ************************************************************
11  * This software can be used under the GPL license *
12  * agreements as stated in LICENSE.txt file *
13  * which is part of the distribution. *
14  ************************************************************/
15 
16 #include "hadaq/BnetMasterModule.h"
17 
18 #include "dabc/Publisher.h"
19 #include "dabc/Iterator.h"
20 
21 #include "hadaq/HadaqTypeDefs.h"
22 
24  dabc::ModuleAsync(name, cmd)
25 {
26  fControl = Cfg("Controller", cmd).AsBool(false);
27  fMaxRunSize = Cfg("MaxRunSize", cmd).AsUInt(2000);
28 
29  double period = Cfg("period", cmd).AsDouble(fControl ? 0.2 : 1);
30  CreateTimer("update", period);
31 
32  fSameBuildersCnt = 0;
33 
34  fCmdCnt = 1;
35  fCmdReplies = 0;
36  fCmdQuality = 1.;
37 
38  fRefreshCnt = 1;
39  fRefreshReplies = 0;
40 
41  fCtrlId = 1;
42  fNewRunTm.GetNow();
43  fCtrlTm.GetNow();
44  fCtrlCnt = 0;
45  fCtrlError = false;
46  fCtrlErrorCnt = 0;
47  fCtrlSzLimit = 0; // no need to do something
48 
49  // more fine measurement of events rate
52 
53  fWorkerHierarchy.Create("Bnet");
54 
55  fWorkerHierarchy.SetField("_player","DABC.BnetControl");
56 
58  item.SetField(dabc::prop_kind, "Text");
59  item.SetField("value", "");
60  item.SetField("_hidden", "true");
61 
62  item = fWorkerHierarchy.CreateHChild("Builders");
63  item.SetField(dabc::prop_kind, "Text");
64  item.SetField("value", "");
65  item.SetField("_hidden", "true");
66 
67  item = fWorkerHierarchy.CreateHChild("LastPrefix");
68  item.SetField(dabc::prop_kind, "Text");
69  item.SetField("value", "");
70  item.SetField("_hidden", "true");
71 
72  item = fWorkerHierarchy.CreateHChild("LastCalibr");
73  item.SetField(dabc::prop_kind, "Text");
74  item.SetField("value", "");
75  item.SetField("_hidden", "true");
76 
77  item = fWorkerHierarchy.CreateHChild("MasterRunId"); // runid configured by master when starting RUN
78  item.SetField(dabc::prop_kind, "Text");
79  item.SetField("value", "0");
80  item.SetField("_hidden", "true");
81 
82  item = fWorkerHierarchy.CreateHChild("RunningCmd"); // currently running cmd
83  item.SetField(dabc::prop_kind, "Text");
84  item.SetField("value", "");
85  item.SetField("_hidden", "true");
86 
87  CreatePar("State").SetFld(dabc::prop_kind, "Text").SetValue("Init");
88  CreatePar("Quality").SetFld(dabc::prop_kind, "Text").SetValue("0.5");
89 
90  CreatePar("RunId").SetFld(dabc::prop_kind, "Text").SetValue("--");
91  CreatePar("RunIdStr").SetFld(dabc::prop_kind, "Text").SetValue("--");
92  CreatePar("RunPrefix").SetFld(dabc::prop_kind, "Text").SetValue("--");
93 
94  CreatePar("DataRate").SetUnits("MB").SetFld(dabc::prop_kind,"rate").SetFld("#record", true);
95  CreatePar("EventsRate").SetUnits("Ev").SetFld(dabc::prop_kind,"rate").SetFld("#record", true);
96  CreatePar("LostRate").SetUnits("Ev").SetFld(dabc::prop_kind,"rate").SetFld("#record", true);
97 
98  CreatePar("TotalEvents").SetValue("0");
99  CreatePar("TotalLost").SetValue("0");
100 
101  if (fControl) {
102  CreateCmdDef("StartRun").AddArg("prefix", "string", true, "run")
103  .AddArg("oninit", "int", false, "0");
104  CreateCmdDef("StopRun");
105  CreateCmdDef("RefreshRun");
106  CreateCmdDef("ResetDAQ");
107  }
108 
109  // read calibration from file
111 
112  // Publish(fWorkerHierarchy, "$CONTEXT$/BNET");
113  PublishPars("$CONTEXT$/BNET");
114 
115  DOUT0("BNET MASTER Control %s period %3.1f ", DBOOL(fControl), period);
116 }
117 
118 void hadaq::BnetMasterModule::AddItem(std::vector<std::string> &items, std::vector<std::string> &nodes, const std::string &item, const std::string &node)
119 {
120  auto iter1 = items.begin();
121  auto iter2 = nodes.begin();
122  while (iter1 != items.end()) {
123  if (*iter1 > item) {
124  items.insert(iter1, item);
125  nodes.insert(iter2, node);
126  return;
127  }
128  ++iter1;
129  ++iter2;
130  }
131 
132  items.emplace_back(item);
133  nodes.emplace_back(node);
134 }
135 
136 void hadaq::BnetMasterModule::PreserveLastCalibr(bool do_write, double quality, unsigned runid)
137 {
138  dabc::Hierarchy item = fWorkerHierarchy.GetHChild("LastCalibr");
139  if (!item) return;
140 
141  dabc::DateTime tm;
142 
143  FILE* f = fopen("lastcalibr.txt", do_write ? "w" : "r");
144  if (!f) {
145  EOUT("FAIL to open file lastcalibr.txt for %s", do_write ? "writing" : "reading");
146  return;
147  }
148 
149  if (do_write) {
150  tm.GetNow();
151  fprintf(f,"%lu\n", (long unsigned) tm.AsJSDate());
152  fprintf(f,"%f\n", quality);
153  fprintf(f,"%u\n", runid);
154  } else {
155  long unsigned tm_js = 0;
156  if (fscanf(f,"%lu", &tm_js) != 1) EOUT("Fail to get time from lastcalibr.txt");
157  tm.SetJSDate(tm_js);
158  if (fscanf(f,"%lf", &quality) != 1) EOUT("Fail to get quality from lastcalibr.txt");
159  if (fscanf(f,"%u", &runid) != 1) EOUT("Fail to get runid from lastcalibr.txt");
160 
161  if (!do_write) fCalibrRunId = runid;
162  }
163  fclose(f);
164 
165  std::string info = dabc::format("%s quality = %5.2f run = %s", tm.AsString(0,true).c_str(), quality, hadaq::FormatFilename(runid,0).c_str());
166 
167  DOUT0("CALIBR INFO %s", info.c_str());
168 
169  item.SetField("value", info);
170  item.SetField("time", tm.AsJSString());
171  item.SetField("quality", quality);
172  item.SetField("runid", runid);
173 }
174 
175 
177 {
178  if (cmd.IsName(dabc::CmdGetNamesList::CmdName())) {
179  //DOUT0("Get hierarchy");
181  dabc::Iterator iter(h);
182  std::vector<std::string> binp, bbuild, nodes_inp, nodes_build;
183  while (iter.next()) {
184  dabc::Hierarchy item = iter.ref();
185  if (item.HasField("_bnet")) {
186  std::string kind = item.GetField("_bnet").AsStr(),
187  producer = item.GetField("_producer").AsStr();
188 
189  std::size_t pos = producer.find_last_of("/");
190  if (pos != std::string::npos) producer.resize(pos);
191 
192  if (kind == "sender") AddItem(binp, nodes_inp, item.ItemName(), producer);
193  if (kind == "receiver") AddItem(bbuild, nodes_build, item.ItemName(), producer);
194  //DOUT0("Get BNET %s", item.ItemName().c_str());
195  }
196  }
197 
198  if ((fLastBuilders.size()>0) && (fLastBuilders == bbuild)) {
199  fSameBuildersCnt++;
200  if (!fInitRunCmd.null() && (fSameBuildersCnt > cmd.GetInt("oninit"))) {
201  DOUT0("DETECTED SAME BUILDERS %d", fSameBuildersCnt);
202 
203  fInitRunCmd.SetBool("#verified", true);
204  int res = ExecuteCommand(fInitRunCmd);
205  if (res != dabc::cmd_postponed)
206  fInitRunCmd.Reply(res);
207  else
208  fInitRunCmd.Release();
209  }
210 
211  } else {
212  fSameBuildersCnt = 0;
213  }
214 
215  fLastBuilders = bbuild;
216 
217  fWorkerHierarchy.GetHChild("Inputs").SetField("value", binp);
218  fWorkerHierarchy.GetHChild("Inputs").SetField("nodes", nodes_inp);
219  fWorkerHierarchy.GetHChild("Builders").SetField("value", bbuild);
220  fWorkerHierarchy.GetHChild("Builders").SetField("nodes", nodes_build);
221 
222  if (fCtrlCnt != 0) {
223  if (!fCtrlTm.Expired()) return true;
224  if (fCtrlCnt > 0) { fCtrlError = true; EOUT("Fail to get %d control records", fCtrlCnt); }
225  }
226 
227  if (fCtrlError)
228  fCtrlErrorCnt++;
229  else
230  fCtrlErrorCnt = 0;
231 
232  fCtrlCnt = 0;
233  fCtrlId++;
234  fCtrlError = false;
235  fCtrlTm.GetNow(3.); // use 3 sec timeout for control requests
236 
237  fCtrlStateQuality = 1;
238  fCtrlStateName = "";
239  fCtrlData = 0.;
240  fCtrlEvents = 0.;
241  fCtrlLost = 0.;
242  fCtrlInpNodesCnt = 0;
243  fCtrlInpNodesExpect = 0;
244  fCtrlBldNodesCnt = 0;
245  fCtrlBldNodesExpect = 0;
246 
247  fCtrlRunId = 0;
248  fCtrlRunPrefix = "";
249 
250  fCurrentLost = fCurrentEvents = fCurrentData = 0;
251 
252  dabc::WorkerRef publ = GetPublisher();
253 
254  for (unsigned n=0;n<bbuild.size();++n) {
255  dabc::CmdGetBinary subcmd(bbuild[n], "hierarchy", "childs");
256  subcmd.SetInt("#bnet_ctrl_id", fCtrlId);
257  subcmd.SetTimeout(10);
258  publ.Submit(Assign(subcmd));
259  fCtrlCnt++;
260  }
261 
262  for (unsigned n=0;n<binp.size();++n) {
263  dabc::CmdGetBinary subcmd(binp[n], "hierarchy", "childs");
264  subcmd.SetInt("#bnet_ctrl_id", fCtrlId);
265  subcmd.SetTimeout(10);
266  publ.Submit(Assign(subcmd));
267  fCtrlCnt++;
268  }
269 
270  if (fCtrlCnt == 0) {
271  fCtrlStateQuality = 0.;
272  fCtrlStateName = "NoNodes";
273  } else if (binp.size()==0) {
274  fCtrlStateQuality = 0.;
275  fCtrlStateName = "NoInputs";
276  } else if (bbuild.size() == 0) {
277  fCtrlStateQuality = 0.;
278  fCtrlStateName = "NoBuilders";
279  } else if (fCtrlErrorCnt > 5) {
280  fCtrlStateQuality = 0.1;
281  fCtrlStateName = "LostControl";
282  }
283 
284  if (!fCtrlStateName.empty()) {
285  SetParValue("State", "NoNodes");
286  SetParValue("Quality", fCtrlStateQuality);
287  }
288 
289  return true;
290 
291  } else if (cmd.HasField("#bnet_cnt")) {
292  // this commands used to send file requests
293 
294  DOUT0("Get %s reply id:%d expecting:%d replies:%d cmd:%s", cmd.GetName(), cmd.GetInt("#bnet_cnt"), fCmdCnt, fCmdReplies, fCurrentFileCmd.GetName());
295 
296  if (!fCurrentFileCmd.null() && (cmd.GetInt("#bnet_cnt") == fCmdCnt)) {
297 
298  bool stop_calibr = fCurrentFileCmd.IsName("StopRun") && fCurrentFileCmd.GetBool("#calibr_run");
299 
300  if (stop_calibr && cmd.HasField("quality"))
301  if (cmd.GetDouble("quality") < fCmdQuality)
302  fCmdQuality = cmd.GetDouble("quality");
303 
304  if (--fCmdReplies <= 0) {
305 
306  fCalibrRunId = fCurrentFileCmd.GetUInt("#calibr_runid");
307 
308  fCurrentFileCmd.Reply(dabc::cmd_true);
309 
310  fWorkerHierarchy.GetHChild("RunningCmd").SetField("value","");
311 
312  if (stop_calibr) PreserveLastCalibr(true, fCmdQuality, fCalibrRunId);
313  }
314  }
315 
316  } else if (cmd.HasField("#refresh_cnt")) {
317 
318  if (!fCurrentRefreshCmd.null() && (cmd.GetInt("#refresh_cnt") == fRefreshCnt)) {
319  double q = cmd.GetDouble("quality", 1.),
320  q0 = fCurrentRefreshCmd.GetDouble("quality", 1.);
321  if (q < q0) {
322  q0 = q;
323  fCurrentRefreshCmd.SetDouble("quality", q0);
324  }
325 
326  if (--fRefreshReplies <= 0) {
327  fCurrentRefreshCmd.Reply(dabc::cmd_true);
328  PreserveLastCalibr(true, q0, fCalibrRunId);
329  }
330  }
331 
332  return true;
333 
334  } else if (cmd.GetInt("#bnet_ctrl_id") == fCtrlId) {
335  // this commands used to send control requests
336 
337  fCtrlCnt--;
338 
339  if (!cmd.GetResult() || cmd.IsTimedout()) fCtrlError = true;
340 
342  dabc::Iterator iter(h);
343 
344  bool is_builder = false;
345 
346  while (iter.next()) {
347  dabc::Hierarchy item = iter.ref();
348  if (!item.HasField("_bnet")) {
349  if (item.IsName("HadaqData") && is_builder) fCtrlData += item.GetField("value").AsDouble(); else
350  if (item.IsName("HadaqEvents") && is_builder) fCtrlEvents += item.GetField("value").AsDouble(); else
351  if (item.IsName("HadaqLostEvents") && is_builder) fCtrlLost += item.GetField("value").AsDouble();
352  continue;
353  }
354  // normally only that item should be used
355 
356  if (item.GetField("_bnet").AsStr() == "receiver") is_builder = true;
357 
358  if (is_builder) fCtrlBldNodesCnt++;
359  else fCtrlInpNodesCnt++;
360 
361  std::string state = item.GetField("state").AsStr();
362  double quality = item.GetField("quality").AsDouble();
363 
364  if (fCtrlStateName.empty() || (quality < fCtrlStateQuality)) {
365  fCtrlStateQuality = quality;
366  fCtrlStateName = state;
367  }
368 
369  if (is_builder) {
370  fCurrentLost += item.GetField("discard_events").AsUInt();
371  fCurrentEvents += item.GetField("build_events").AsUInt();
372  fCurrentData += item.GetField("build_data").AsUInt();
373 
374  if (!fTotalData) DOUT0("FIRST TIME GET DATA %d %lu", fCtrlCnt, item.GetField("build_data").AsUInt());
375 
376  // check maximal size of the run
377  if (fNewRunTm.Expired() && (fCtrlSzLimit > 0) && (fMaxRunSize > 0) && (item.GetField("runsize").AsUInt() > fMaxRunSize*1e6))
378  fCtrlSzLimit = 2;
379 
380  // check current runid
381  unsigned runid = item.GetField("runid").AsUInt();
382  std::string runprefix = item.GetField("runprefix").AsStr();
383 
384  if (runid && !runprefix.empty()) {
385  if (!fCtrlRunId) {
386  fCtrlRunId = runid;
387  fCtrlRunPrefix = runprefix;
388  } else if ((fCtrlRunId != runid) || (fCtrlRunPrefix != runprefix)) {
389  if ((fCtrlStateQuality > 0) && fNewRunTm.Expired()) {
390  fCtrlStateName = "RunMismatch";
391  fCtrlStateQuality = 0;
392  }
393  }
394  }
395 
396  int ninputs = item.GetField("ninputs").AsInt();
397  if (fCtrlInpNodesExpect == 0) fCtrlInpNodesExpect = ninputs;
398 
399  if ((fCtrlInpNodesExpect != ninputs) && (fCtrlStateQuality > 0)) {
400  fCtrlStateName = "InputsMismatch";
401  fCtrlStateQuality = 0;
402  }
403 
404  } else {
405  int nbuilders = item.GetField("nbuilders").AsInt();
406  if (fCtrlBldNodesExpect==0) fCtrlBldNodesExpect = nbuilders;
407  if ((fCtrlBldNodesExpect != nbuilders) && (fCtrlStateQuality > 0)) {
408  fCtrlStateName = "BuildersMismatch";
409  fCtrlStateQuality = 0;
410  }
411  }
412 
413  // DOUT0("BNET reply from %s state %s sz %u", item.GetField("_bnet").AsStr().c_str(), item.GetField("state").AsStr().c_str(), item.GetField("runsize").AsUInt());
414  }
415 
416  if (fCtrlCnt == 0) {
417  if (fCtrlStateName.empty()) {
418  fCtrlStateName = "Ready";
419  fCtrlStateQuality = 1.;
420  }
421  if ((fCtrlInpNodesCnt == 0) && (fCtrlStateQuality > 0)) {
422  fCtrlStateName = "NoInputs";
423  fCtrlStateQuality = 0;
424  }
425 
426  if ((fCtrlInpNodesExpect != fCtrlInpNodesCnt) && (fCtrlStateQuality > 0)) {
427  fCtrlStateName = "InputsMismatch";
428  fCtrlStateQuality = 0;
429  }
430 
431  if ((fCtrlBldNodesCnt == 0) && (fCtrlStateQuality > 0)) {
432  fCtrlStateName = "NoBuilders";
433  fCtrlStateQuality = 0;
434  }
435  if ((fCtrlBldNodesExpect != fCtrlBldNodesCnt) && (fCtrlStateQuality > 0)) {
436  fCtrlStateName = "BuildersMismatch";
437  fCtrlStateQuality = 0;
438  }
439 
440  SetParValue("State", fCtrlStateName);
441  SetParValue("Quality", fCtrlStateQuality);
442  SetParValue("RunId", fCtrlRunId);
443  SetParValue("RunIdStr", fCtrlRunId ? hadaq::FormatFilename(fCtrlRunId,0) : std::string("0"));
444  SetParValue("RunPrefix", fCtrlRunPrefix);
445 
446  fWorkerHierarchy.GetHChild("LastPrefix").SetField("value", fCtrlRunPrefix);
447 
448  DOUT3("BNET control sequence ready state %s overlimit %s", fCtrlStateName.c_str(), DBOOL(fCtrlSzLimit>1));
449 
450  bool do_set = (fTotalEvents || fTotalLost) && (fCurrentLost >= fTotalLost) && (fCurrentEvents >= fTotalEvents) && (fCurrentData >= fTotalData);
451 
452  if (do_set) {
453  double spent = fLastRateTm.SpentTillNow();
454  spent = (spent > 1e-3) ? 1./spent : 0.;
455 
456  fCtrlLost = (fCurrentLost-fTotalLost)*spent;
457  fCtrlEvents = (fCurrentEvents-fTotalEvents)*spent;
458  fCtrlData = (fCurrentData-fTotalData)*spent/1024./1024.;
459 
460  if ((fCtrlEvents > 1e9) || (fCtrlLost > 1e9) || (fCtrlData > 1e9)) do_set = false;
461  }
462 
463  fTotalLost = fCurrentLost;
464  fTotalEvents = fCurrentEvents;
465  fTotalData = fCurrentData;
466  fLastRateTm.GetNow();
467 
468  if (do_set) {
469  Par("DataRate").SetValue(fCtrlData);
470  Par("EventsRate").SetValue(fCtrlEvents);
471  Par("LostRate").SetValue(fCtrlLost);
472  }
473 
474  SetParValue("TotalEvents", fTotalEvents);
475  SetParValue("TotalLost", fTotalLost);
476 
477  if (fControl && (fCtrlSzLimit > 1) && fCurrentFileCmd.null()) {
478  fCtrlSzLimit = 0;
479  // this is a place, where new run automatically started
480  dabc::Command newrun("StartRun");
481  newrun.SetTimeout(45);
482  Submit(newrun);
483  }
484  }
485  }
486 
487  return dabc::Module::ReplyCommand(cmd);
488 }
489 
491 {
492 }
493 
495 {
497 
498  dabc::WorkerRef publ = GetPublisher();
499 
500  publ.Submit(Assign(cmd));
501 
502  if (!fCurrentFileCmd.null() && fCurrentFileCmd.IsTimedout()) {
503  EOUT("Abort RUN command %s", fCurrentFileCmd.GetName());
504  fCurrentFileCmd.Reply(dabc::cmd_false);
505  fWorkerHierarchy.GetHChild("RunningCmd").SetField("value","");
506  }
507 
508  if (!fInitRunCmd.null() && fInitRunCmd.IsTimedout())
509  fInitRunCmd.Reply(dabc::cmd_false);
510 }
511 
513 {
514  if (cmd.IsName("StartRun") || cmd.IsName("StopRun")) {
515 
516  if (!fCurrentFileCmd.null()) {
517  EOUT("Ignore run command - previous %s not completed", fCurrentFileCmd.GetName());
518  return dabc::cmd_false;
519  }
520 
521  bool isstart = cmd.IsName("StartRun");
522 
523  // DOUT0("Command %s oninit %s", cmd.GetName(), cmd.GetStr("oninit").c_str());
524 
525  if (isstart && (cmd.GetInt("oninit")>0) && !cmd.GetBool("#verified")) {
526  DOUT0("Detect START RUN with oninit flag!!!!");
527 
528  // this is entry point for StartRun command during initialization
529  // we remember it and checks that at least two time same list of input nodes are collected
530  if (!fInitRunCmd.null()) fInitRunCmd.Reply(dabc::cmd_false);
531  fInitRunCmd = cmd;
532  fSameBuildersCnt = 0; // reset counter
533  if (!cmd.IsTimeoutSet()) cmd.SetTimeout(30. + cmd.GetInt("oninit")*2.);
534  return dabc::cmd_postponed;
535  }
536 
537  std::vector<std::string> builders = fWorkerHierarchy.GetHChild("Builders").GetField("value").AsStrVect();
538  if (builders.size() == 0) return dabc::cmd_true;
539 
540  dabc::WorkerRef publ = GetPublisher();
541  if (publ.null()) return dabc::cmd_false;
542 
543  fCurrentFileCmd = cmd;
544  if (isstart) {
545  fCtrlSzLimit = 1; // allow to control size limits
546  fNewRunTm.GetNow(5); // do not check new run earlier than after 5 seconds
547  }
548  fCmdCnt++;
549  fCmdReplies = 0;
550  fCmdQuality = 1.;
551 
552  if (!cmd.IsTimeoutSet() || (cmd.TimeTillTimeout() < 60)) {
553  DOUT0("INCREASE cmd %s TIMEOUT from %4.1f to 60 sec", cmd.GetName(), cmd.TimeTillTimeout());
554  cmd.SetTimeout(60.);
555  }
556 
557  double main_tmout = cmd.TimeTillTimeout() - 1;
558 
559  std::string query, prefix;
560  unsigned runid = 0;
561  unsigned lastrunid = fWorkerHierarchy.GetHChild("MasterRunId").GetField("value").AsUInt();
562  std::string lastprefix = fWorkerHierarchy.GetHChild("LastPrefix").GetField("value").AsStr();
563  if (isstart) {
564  prefix = cmd.GetStr("prefix");
565  if (prefix == "NO_FILE" || prefix == "--" || (prefix.empty() && (lastprefix == "--" || lastprefix.empty())))
566  isstart = false;
567  }
568  if (isstart) {
569  runid = cmd.GetUInt("runid");
570  if (runid == 0)
571  runid = hadaq::CreateRunId();
572  query = dabc::format("mode=start&runid=%u", runid);
573  if (!prefix.empty()) {
574  query.append("&prefix=");
575  query.append(prefix);
576  }
577  DOUT0("Starting new run %s", query.c_str());
578  } else {
579  query = "mode=stop";
580  }
581 
582  if (isstart)
583  fWorkerHierarchy.GetHChild("RunningCmd").SetField("value", dabc::format("Start %s %u", prefix.c_str(), runid));
584  else
585  fWorkerHierarchy.GetHChild("RunningCmd").SetField("value", dabc::format("Stop %s %u", lastprefix.c_str(), lastrunid));
586 
587  fWorkerHierarchy.GetHChild("MasterRunId").SetField("value", runid);
588 
589  if (isstart && !prefix.empty())
590  fWorkerHierarchy.GetHChild("LastPrefix").SetField("value", prefix);
591 
592  DOUT0("MASTER cmd:%s doing:%s query:%s prefix:%s lastprefix:%s lastrunid:%u cmdcnt:%d", cmd.GetName(), (isstart ? "START" : "STOP"), query.c_str(), prefix.c_str(), lastprefix.c_str(), lastrunid, fCmdCnt);
593 
594  for (unsigned n=0; n<builders.size(); ++n) {
595  dabc::CmdGetBinary subcmd(builders[n] + "/BnetFileControl", "execute", query);
596  subcmd.SetInt("#bnet_cnt", fCmdCnt);
597  subcmd.SetTimeout(main_tmout);
598  fCmdReplies++;
599  publ.Submit(Assign(subcmd));
600  DOUT0("Submit bldr cmd %s %s %f", subcmd.GetName(), DBOOL(subcmd.IsTimeoutSet()), subcmd.TimeTillTimeout());
601  }
602 
603  query = "";
604  if (isstart && (prefix == "tc") && lastprefix.empty()) {
605  query = dabc::format("mode=start&runid=%u", runid);
606  } else if (!isstart && (lastprefix == "tc")) {
607  query = dabc::format("mode=stop&runid=%u", lastrunid);
608  }
609 
610  if (!query.empty()) {
611  fCurrentFileCmd.SetBool("#calibr_run", true);
612  fCurrentFileCmd.SetUInt("#calibr_runid", lastrunid);
613 
614  // trigger calibration start for all TDCs
615  std::vector<std::string> inputs = fWorkerHierarchy.GetHChild("Inputs").GetField("value").AsStrVect();
616 
617  for (unsigned n=0; n<inputs.size(); ++n) {
618  dabc::CmdGetBinary subcmd(inputs[n] + "/BnetCalibrControl", "execute", query);
619  subcmd.SetInt("#bnet_cnt", fCmdCnt);
620  subcmd.SetTimeout(main_tmout);
621  fCmdReplies++;
622  publ.Submit(Assign(subcmd));
623  DOUT0("Submit input cmd %s %s %f", subcmd.GetName(), DBOOL(subcmd.IsTimeoutSet()), subcmd.TimeTillTimeout());
624  }
625  }
626 
627  return dabc::cmd_postponed;
628 
629  } else if (cmd.IsName("RefreshRun")) {
630 
631  if (!fCurrentRefreshCmd.null()) {
632  EOUT("Ignore run command - previous %s not completed", fCurrentRefreshCmd.GetName());
633  return dabc::cmd_false;
634  }
635 
636  dabc::WorkerRef publ = GetPublisher();
637  if (publ.null()) return dabc::cmd_false;
638 
639  fCurrentRefreshCmd = cmd;
640  fRefreshCnt++;
641  fRefreshReplies = 0;
642 
643  fCurrentRefreshCmd.SetDouble("quality", 1.0);
644 
645  // trigger calibration start for all TDCs
646  std::vector<std::string> inputs = fWorkerHierarchy.GetHChild("Inputs").GetField("value").AsStrVect();
647 
648  for (unsigned n = 0; n < inputs.size(); ++n) {
649  dabc::CmdGetBinary subcmd(inputs[n] + "/BnetCalibrRefresh", "execute", "");
650  subcmd.SetInt("#refresh_cnt", fRefreshCnt);
651  subcmd.SetTimeout(10.);
652  fRefreshReplies++;
653  publ.Submit(Assign(subcmd));
654  }
655 
656  return dabc::cmd_postponed;
657  } else if (cmd.IsName("ResetDAQ")) {
658  std::vector<std::string> builders = fWorkerHierarchy.GetHChild("Builders").GetField("value").AsStrVect();
659  std::vector<std::string> inputs = fWorkerHierarchy.GetHChild("Inputs").GetField("value").AsStrVect();
660 
661  dabc::WorkerRef publ = GetPublisher();
662  if (publ.null()) return dabc::cmd_false;
663 
664  for (unsigned n=0; n<inputs.size(); ++n) {
665  dabc::CmdGetBinary subcmd(inputs[n], "cmd.json", "command=DropAllBuffers");
666  publ.Submit(subcmd);
667  }
668 
669  for (unsigned n=0; n<builders.size(); ++n) {
670  dabc::CmdGetBinary subcmd(builders[n], "cmd.json", "command=DropAllBuffers");
671  publ.Submit(subcmd);
672  }
673 
674  return dabc::cmd_true;
675  }
676 
677  return dabc::cmd_true;
678 
679 }
Command used to produce custom binary data for published in hierarchy entries.
Definition: Publisher.h:33
Command to request names list.
Definition: Publisher.h:58
static Hierarchy GetResNamesList(dabc::Command &cmd)
Definition: Publisher.cxx:63
CommandDefinition & AddArg(const std::string &name, const std::string &kind="string", bool required=true, const RecordField &dflt=RecordField())
Definition: Parameter.cxx:593
Represents command with its arguments.
Definition: Command.h:99
double TimeTillTimeout(double extra=0.) const
Returns time which remains until command should be timed out.
Definition: Command.cxx:132
unsigned GetUInt(const std::string &name, unsigned dflt=0) const
Definition: Command.h:148
double GetDouble(const std::string &name, double dflt=0.) const
Definition: Command.h:145
bool SetInt(const std::string &name, int v)
Definition: Command.h:138
std::string GetStr(const std::string &name, const std::string &dflt="") const
Definition: Command.h:136
Command & SetTimeout(double tm)
Set maximum time which can be used for command execution.
Definition: Command.cxx:108
bool IsTimeoutSet() const
Return true if timeout was specified.
Definition: Command.cxx:124
int GetResult() const
Definition: Command.h:174
bool GetBool(const std::string &name, bool dflt=false) const
Definition: Command.h:142
bool SetDouble(const std::string &name, double v)
Definition: Command.h:144
bool IsTimedout() const
Returns true if command timeout is expired.
Definition: Command.h:195
int GetInt(const std::string &name, int dflt=0) const
Definition: Command.h:139
Class for holding GMT time with precision of nanoseconds.
Definition: timing.h:190
std::string AsJSString(int ndecimal=3) const
convert string into sec.frac format, can be interpret directly in JavaScript ISO 8601 standard is use...
Definition: timing.cxx:212
void SetJSDate(uint64_t jsdate)
Set value in form of JS date - milliseconds since 1.1.1970.
Definition: timing.h:224
uint64_t AsJSDate() const
Return date and time in JS format - number of millisecond since 1.1.1970.
Definition: timing.cxx:158
std::string AsString(int ndecimal=0, bool localtime=false) const
convert string into human-readable format, cannot be interpret directly in JavaScript
Definition: timing.cxx:164
DateTime & GetNow()
Definition: timing.cxx:147
Represents objects hierarchy of remote (or local) DABC process.
Definition: Hierarchy.h:285
Hierarchy CreateHChild(const std::string &name, bool allowslahes=false, bool sortorder=false)
Create child item in hierarchy with specified name If allowslahes enabled, instead of subfolders item...
Definition: Hierarchy.h:392
std::string ItemName() const
Name which is used as item name in hierarchy.
Definition: Hierarchy.cxx:1133
Hierarchy GetHChild(const std::string &name, bool allowslahes=false, bool force=false, bool sortorder=false)
Return child, if necessary creates with full subfolder If force specified, missing childs and folders...
Definition: Hierarchy.cxx:944
void Create(const std::string &name, bool withmutex=false)
Create top-level object with specified name.
Definition: Hierarchy.cxx:934
Iterator over objects hierarchy
Definition: Iterator.h:36
Object * next(bool goinside=true)
Definition: Iterator.cxx:44
Reference ref() const
Definition: Iterator.h:50
virtual Parameter CreatePar(const std::string &name, const std::string &kind="")
Definition: Module.cxx:129
virtual bool ReplyCommand(Command cmd)
Reimplement this method to react on command reply Return true if command can be destroyed by framewor...
Definition: Module.h:240
unsigned CreateTimer(const std::string &name, double period_sec=-1., bool synchron=false)
Definition: Module.cxx:109
bool SetValue(const RecordField &v)
Set parameter value.
Definition: Parameter.h:205
Parameter & SetFld(const std::string &name, const RecordField &v)
Definition: Parameter.h:272
Parameter & SetUnits(const std::string &unit)
Set units field of parameter.
Definition: Parameter.h:256
uint64_t AsUInt(uint64_t dflt=0) const
Definition: Record.cxx:525
bool AsBool(bool dflt=false) const
Definition: Record.cxx:477
std::string AsStr(const std::string &dflt="") const
Definition: Record.cxx:749
int64_t AsInt(int64_t dflt=0) const
Definition: Record.cxx:501
double AsDouble(double dflt=0.) const
Definition: Record.cxx:549
RecordField GetField(const std::string &name) const
Definition: Record.h:510
bool HasField(const std::string &name) const
Definition: Record.h:498
bool SetField(const std::string &name, const RecordField &v)
Definition: Record.h:516
const char * GetName() const
Return name of referenced object, if object not assigned, returns "---".
Definition: Reference.cxx:167
bool IsName(const char *name) const
Returns true if object name is the same as specified one.
Definition: Reference.cxx:177
bool null() const
Returns true if reference contains nullptr.
Definition: Reference.h:151
Reference on dabc::Worker
Definition: Worker.h:466
bool Submit(Command cmd)
Definition: Worker.cxx:1139
CommandDefinition CreateCmdDef(const std::string &name)
Definition: Worker.cxx:603
Hierarchy fWorkerHierarchy
place for publishing of worker parameters
Definition: Worker.h:168
RecordField Cfg(const std::string &name, Command cmd=nullptr) const
Returns configuration field of specified name Configuration value of specified name searched in follo...
Definition: Worker.cxx:521
virtual bool PublishPars(const std::string &path)
Definition: Worker.cxx:1081
void PreserveLastCalibr(bool do_write=false, double quality=1., unsigned runid=0)
virtual int ExecuteCommand(dabc::Command cmd)
Main method where commands are executed.
dabc::TimeStamp fLastRateTm
last time ratemeter was updated
dabc::TimeStamp fNewRunTm
time when last control count was send
int fRefreshCnt
currently running refresh command
int fCtrlCnt
how many control replies are awaited
int fCmdReplies
number of replies for current command
double fCmdQuality
current command quality, used when creating calibration
virtual bool ReplyCommand(dabc::Command cmd)
Reimplement this method to react on command reply Return true if command can be destroyed by framewor...
uint64_t fCurrentData
current value
uint64_t fTotalData
last value
unsigned fMaxRunSize
maximal run size in MB
uint64_t fTotalLost
last value
uint64_t fTotalEvents
last value
uint64_t fCurrentLost
current value
BnetMasterModule(const std::string &name, dabc::Command cmd=nullptr)
uint64_t fCurrentEvents
current value
virtual void BeforeModuleStart()
int fCtrlErrorCnt
number of consequent control errors
int fCtrlSzLimit
0 - do nothing, 1 - start checking (after start run), 2 - exced
virtual void ProcessTimerEvent(unsigned timer)
Method called by framework when timer event is produced.
int fCtrlId
counter for control requests
int fSameBuildersCnt
how many time same number of inputs was detected
bool fCtrlError
if there are error during current communication loop
dabc::TimeStamp fCtrlTm
time when last control count was send
bool fControl
when true, master actively controls BNET nodes and switches to new RUNs
int fRefreshReplies
number of replies for current command
void AddItem(std::vector< std::string > &items, std::vector< std::string > &nodes, const std::string &item, const std::string &node)
int fCmdCnt
just counter to avoid mismatch
#define DOUT0(args ...)
Definition: logging.h:156
#define DOUT3(args ...)
Definition: logging.h:176
#define EOUT(args ...)
Definition: logging.h:150
#define DBOOL(arg)
Definition: logging.h:191
Event manipulation API.
Definition: api.h:23
std::string format(const char *fmt,...)
Definition: string.cxx:49
const char * prop_kind
Definition: Hierarchy.cxx:29
@ cmd_postponed
Definition: Command.h:42
@ cmd_false
Definition: Command.h:37
@ cmd_true
Definition: Command.h:38
std::string FormatFilename(uint32_t runid, uint16_t ebid=0)
uint32_t CreateRunId()
void GetNow()
Method to acquire current time stamp.
Definition: timing.h:137