DABC (Data Acquisition Backbone Core)  2.9.9
Monitor.cxx
Go to the documentation of this file.
1 // $Id: Monitor.cxx 4774 2021-05-10 07:08:06Z linev $
2 
3 /************************************************************
4  * The Data Acquisition Backbone Core (DABC) *
5  ************************************************************
6  * Copyright (C) 2009 - *
7  * GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
8  * Planckstr. 1, 64291 Darmstadt, Germany *
9  * Contact: http://dabc.gsi.de *
10  ************************************************************
11  * This software can be used under the GPL license *
12  * agreements as stated in LICENSE.txt file *
13  * which is part of the distribution. *
14  ************************************************************/
15 
16 #include "mbs/Monitor.h"
17 
18 #include <cstdlib>
19 #include <cstring>
20 #include <cstdio>
21 #include <cmath>
22 
23 #include "dabc/Publisher.h"
24 
// Numeric identifiers of MBS tasks. In this file they are used as indices into
// mbs::DaqStatus::bh_running[] (SYS__transport, SYS__event_serv, SYS__util,
// SYS__stream_serv); the remaining entries are not referenced in the visible code.
// NOTE(review): values presumably mirror the task table of the MBS system headers - confirm.
25 #define SYS__read_meb 0
26 #define SYS__collector 1
27 #define SYS__transport 2
28 #define SYS__event_serv 3
29 #define SYS__msg_log 4
30 #define SYS__dispatch 5
31 #define SYS__util 6
32 #define SYS__sbs_mon 7
33 #define SYS__read_cam_slav 8
34 #define SYS__esone_serv 9
35 #define SYS__stream_serv 10
36 #define SYS__histogram 11
37 #define SYS__prompt 12
38 #define SYS__daq_rate 13
39 #define SYS__smi 14
40 #define SYS__ds 15
41 #define SYS__dr 16
42 #define SYS__ar 17
43 #define SYS__rirec 18
44 #define SYS__to 19
45 #define SYS__vme_serv 20
46 
// Version numbers of MBS setup structures; none of them is referenced in the
// visible part of this file.
// NOTE(review): presumably kept for parity with the MBS headers - confirm before removing.
47 #define VERSION__SETUP 1
48 #define VERSION__SET_ML 1
49 #define VERSION__SET_MO 1
50 
51 
53  dabc::SocketIOAddon(fd),
54  fState(ioInit),
55  fStatus(),
56  fSwapping(false),
57  fSendCmd(0)
58 {
59  SetDeleteWorkerOnClose(false);
60 }
61 
62 
64 {
65  // DOUT0("mbs::DaqStatusAddon::OnThreadAssigned socket %d", fSocket);
66 
68 
69  if (fState == ioDone) fState = ioInit;
70 
71  if (fState != ioInit) {
72  EOUT("Get thread assigned in not-init state - check why");
73  exit(234);
74  }
75 
77 
78  memset(&fStatus, 0, sizeof(mbs::DaqStatus));
79  StartRecv(&fStatus, 28);
80 
81  fSendCmd = 1;
82  StartSend(&fSendCmd, sizeof(fSendCmd));
83 
84  // check that status data are received in reasonable time
85  ActivateTimeout(5.);
86 }
87 
88 double mbs::DaqStatusAddon::ProcessTimeout(double last_diff)
89 {
90  EOUT("Status timeout - delete addon");
91  DeleteAddonItself();
92  EOUT("Status timeout - delete addon called");
93  return -1;
94 }
95 
97 {
98  if (fState == ioRecvHeader) {
99 
100  if(fStatus.l_endian != 1) fSwapping = true;
101  if(fSwapping) mbs::SwapData(&fStatus, 28);
102 
103  fState = ioRecvData;
104 
105  if ((fStatus.l_version != 51) && (fStatus.l_version != 62) && (fStatus.l_version != 63)) {
106  EOUT("Unsupported status version %u", (unsigned) fStatus.l_version);
107  DeleteAddonItself();
108  return;
109  }
110 
111  StartRecv(&fStatus.bh_daqst_initalized, (fStatus.l_fix_lw-7)*4);
112  return;
113  }
114 
115  if (fState == ioRecvData) {
116 
117  if (fSwapping)
118  mbs::SwapData(&fStatus.bh_daqst_initalized, (fStatus.l_fix_lw-7)*4 - (19 * fStatus.l_sbs__str_len_64));
119 
120  fState = ioDone;
121 
122  SubmitWorkerCmd(dabc::Command("ProcessDaqStatus"));
123  return;
124  }
125 
126  EOUT("Wrong state when recv data");
127  DeleteAddonItself();
128 }
129 
130 // =========================================================================
131 
132 mbs::Monitor::Monitor(const std::string &name, dabc::Command cmd) :
133  mbs::MonitorSlowControl(name, "Mbs", cmd),
134  fHierarchy(),
135  fCounter(0),
136  fMbsNode(),
137  fPeriod(1.),
138  fRateInterval(1.),
139  fHistory(100),
140  fStatPort(0),
141  fLoggerPort(0),
142  fCmdPort(0),
143  fWaitingLogger(true),
144  fStatus(),
145  fStatStamp(),
146  fPrintf(false)
147 {
148  fMbsNode = Cfg("node", cmd).AsStr();
149  fAlias = Cfg("alias", cmd).AsStr(fMbsNode);
150  fPeriod = Cfg("period", cmd).AsDouble(1.);
151  fRateInterval = Cfg("rateinterval", cmd).AsDouble(1.);
152  fHistory = Cfg("history", cmd).AsInt(200);
153  bool publish = Cfg("publish", cmd).AsBool(true);
154  fPrintf = Cfg("printf", cmd).AsBool(false);
155 
156  fFileStateName = "MbsFileOpen";
157  fAcqStateName = "MbsAcqRunning";
158  fSetupStateName = "MbsSetupLoaded";
159  fRateIntervalName = "MbsRateInterval";
160  fHistoryName = "MbsHistoryDepth";
161 
162  if (Cfg("stat", cmd).AsStr() == "true")
163  fStatPort = 6008;
164  else
165  fStatPort = Cfg("stat", cmd).AsInt(0);
166 
167  if (Cfg("logger", cmd).AsStr() == "true")
168  fLoggerPort = 6007;
169  else
170  fLoggerPort = Cfg("logger", cmd).AsInt(0);
171 
172  if (Cfg("cmd", cmd).AsStr() == "true")
173  fCmdPort = 6019;
174  else
175  fCmdPort = Cfg("cmd", cmd).AsInt(0);
176 
177  fHierarchy.Create("MBS");
178 
179  // this is just emulation, later one need list of real variables
180  dabc::Hierarchy item = fHierarchy.CreateHChild("DataRate");
181  item.SetField(dabc::prop_kind, "rate");
182  if (fHistory>1) item.EnableHistory(fHistory);
183 
184  item = fHierarchy.CreateHChild("EventRate");
185  item.SetField(dabc::prop_kind, "rate");
186  if (fHistory>1) item.EnableHistory(fHistory);
187 
188  item = fHierarchy.CreateHChild("ServerRate");
189  item.SetField(dabc::prop_kind, "rate");
190  if (fHistory>1) item.EnableHistory(fHistory);
191 
192  item = fHierarchy.CreateHChild("FileRate");
193  item.SetField(dabc::prop_kind, "rate");
194  if (fHistory>1) item.EnableHistory(fHistory);
195 
196  item = fHierarchy.CreateHChild("logger");
197  item.SetField(dabc::prop_kind, "log");
198  if (fHistory>1) item.EnableHistory(fHistory);
199 
200  item = fHierarchy.CreateHChild("rate_log");
201  item.SetField(dabc::prop_kind, "log");
202  if (fHistory>1) item.EnableHistory(fHistory);
203 
204  item = fHierarchy.CreateHChild("rash_log");
205  item.SetField(dabc::prop_kind, "log");
206  if (fHistory>1) item.EnableHistory(fHistory);
207 
208  item = fHierarchy.CreateHChild("rast_log");
209  item.SetField(dabc::prop_kind, "log");
210  if (fHistory>1) item.EnableHistory(fHistory);
211 
212  item = fHierarchy.CreateHChild("ratf_log");
213  item.SetField(dabc::prop_kind, "log");
214  if (fHistory>1) item.EnableHistory(fHistory);
215 
216  item = fHierarchy.CreateHChild(fFileStateName);
217  item.SetField(dabc::prop_kind, "log");
218  if (fHistory>1) item.EnableHistory(fHistory);
219 
220  item = fHierarchy.CreateHChild(fAcqStateName);
221  item.SetField(dabc::prop_kind, "log");
222  if (fHistory>1) item.EnableHistory(fHistory);
223 
224  item = fHierarchy.CreateHChild(fSetupStateName);
225  item.SetField(dabc::prop_kind, "log");
226  if (fHistory>1) item.EnableHistory(fHistory);
227 
228  item = fHierarchy.CreateHChild(fRateIntervalName);
229  item.SetField(dabc::prop_kind, "log");
230  if (fHistory>1) item.EnableHistory(fHistory);
231 
232  item = fHierarchy.CreateHChild(fHistoryName);
233  item.SetField(dabc::prop_kind, "log");
234  if (fHistory>1) item.EnableHistory(fHistory);
235 
236  SetRateInterval(fRateInterval); // update exported parameters here
237  SetHistoryDepth(fHistory);
238 
239  if (fCmdPort > 0) {
240  dabc::CommandDefinition cmddef = fHierarchy.CreateHChild("CmdMbs");
241  cmddef.SetField(dabc::prop_kind, "DABC.Command");
242  cmddef.SetField(dabc::prop_auth, true); // require authentication
243  cmddef.AddArg("cmd", "string", true, "show rate");
244 
245  dabc::CommandDefinition cmddef_rate = fHierarchy.CreateHChild("CmdSetRateInterval");
246  cmddef_rate.SetField(dabc::prop_kind, "DABC.Command");
247  cmddef_rate.SetField(dabc::prop_auth, true); // require authentication
248  cmddef_rate.AddArg("time", "double", true, "1.0");
249 
250  dabc::CommandDefinition cmddef_hist = fHierarchy.CreateHChild("CmdSetHistoryDepth");
251  cmddef_hist.SetField(dabc::prop_kind, "DABC.Command");
252  cmddef_hist.SetField(dabc::prop_auth, true); // require authentication
253  cmddef_hist.AddArg("entries", "int", true, "100");
254  }
255 
256  dabc::Hierarchy ui = fHierarchy.CreateHChild("ControlGUI");
257  ui.SetField(dabc::prop_kind, "DABC.HTML");
258  ui.SetField("_UserFilePath", "${DABCSYS}/plugins/mbs/htm/");
259  ui.SetField("_UserFileMain", "main.htm");
260 
261  CreateTimer("update", 5.);
262  CreateTimer("MbsUpdate", fPeriod);
263 
264  fCounter = 0;
265 
266  memset(&fStatus, 0, sizeof(mbs::DaqStatus));
267 
268  // from this point on Publisher want to get regular update for the hierarchy
269  if (publish)
270  Publish(fHierarchy, std::string("/MBS/") + fAlias);
271 
272  if (fLoggerPort <= 0) NewMessage("!!! logger not activated !!!");
273 }
274 
275 
277 {
278 }
279 
281 {
282  if (fLoggerPort > 0) {
283  DaqLogWorker* logger = new DaqLogWorker(this, "DaqLogger", fMbsNode, fLoggerPort);
284  logger->AssignToThread(thread());
285  } else {
286 // if the logger is not used, the command worker can be created directly,
287 // otherwise wait until the log addon is connected
288  CreateCommandWorker();
289  }
290 
292 }
293 
295 {
296  fWaitingLogger = false;
297 
298  dabc::WorkerRef wrk = FindChildRef("DaqCmd");
299  if (!wrk.null()) return;
300 
301  DOUT0("Create command worker");
302 
303  dabc::Worker* remcmd = nullptr;
304  if (IsPrompter()) {
305  remcmd = new PrompterWorker(this, "DaqCmd", fMbsNode, fCmdPort);
306  } else if (fCmdPort > 0) {
307  remcmd = new DaqRemCmdWorker(this, "DaqCmd", fMbsNode, fCmdPort);
308  }
309  if (remcmd) {
310  remcmd->AssignToThread(thread());
311  if (!fWaitingCmd.null())
312  remcmd->Submit(fWaitingCmd);
313  }
314 }
315 
316 void mbs::Monitor::FillStatistic(const std::string &options, const std::string &itemname, mbs::DaqStatus* old_daqst, mbs::DaqStatus* new_daqst, double diff_time)
317 {
318  int bStreams_n = 0, bBuffers_n = 0, bEvents_n = 0, bData_n = 0;
319  int bStreams_r = 0, bBuffers_r = 0, bEvents_r = 0, bData_r = 0;
320  int bSrvStreams_n = 0, bSrvEvents_n = 0, bSrvBuffers_n = 0, bSrvData_n = 0;
321  int bSrvStreams_r = 0, bSrvEvents_r = 0, bSrvBuffers_r = 0, bSrvData_r = 0;
322  int bFilename = 0, bFileFilled = 0, bFileData = 0, bFileData_r = 0, bFileIndex = 0;
323  int bStreamsFree = 0, bStreamsFilled = 0, bStreamsSrv = 0;
324 
325  int bl_file = 0, bl_server = 0, bl_mbs = 0, bl_streams = 0;
326 
327  int l_free_stream(0), l_trans_stream(0), l_serv_stream(0);
328 
329  char c_head0[1000];
330  char c_head[1000];
331  char c_out[1100];
332 
333  char c_line[1000];
334 
335  strncpy(c_line, options.c_str(), sizeof(c_line)-1);
336 
337  if (strstr (c_line, "-nst") != NULL )
338  bStreams_n = 1;
339  if (strstr (c_line, "-rst") != NULL )
340  bStreams_r = 1;
341  if (strstr (c_line, "-nsst") != NULL )
342  bSrvStreams_n = 1;
343  if (strstr (c_line, "-rsst") != NULL )
344  bSrvStreams_r = 1;
345  if (strstr (c_line, "-est") != NULL )
346  bStreamsFree = 1;
347  if (strstr (c_line, "-fst") != NULL )
348  bStreamsFilled = 1;
349  if (strstr (c_line, "-kst") != NULL )
350  bStreamsSrv = 1;
351  if (strstr (c_line, "-nbu") != NULL )
352  bBuffers_n = 1;
353  if (strstr (c_line, "-rbu") != NULL )
354  bBuffers_r = 1;
355  if (strstr (c_line, "-nsbu") != NULL )
356  bSrvBuffers_n = 1;
357  if (strstr (c_line, "-rsbu") != NULL )
358  bSrvBuffers_r = 1;
359  if (strstr (c_line, "-nev") != NULL )
360  bEvents_n = 1;
361  if (strstr (c_line, "-rev") != NULL )
362  bEvents_r = 1;
363  if (strstr (c_line, "-nsev") != NULL )
364  bSrvEvents_n = 1;
365  if (strstr (c_line, "-rsev") != NULL )
366  bSrvEvents_r = 1;
367  if (strstr (c_line, "-nda") != NULL )
368  bData_n = 1;
369  if (strstr (c_line, "-rda") != NULL )
370  bData_r = 1;
371  if (strstr (c_line, "-nsda") != NULL )
372  bSrvData_n = 1;
373  if (strstr (c_line, "-rsda") != NULL )
374  bSrvData_r = 1;
375  if (strstr (c_line, "-sfi") != NULL )
376  bFileData = 1;
377  if (strstr (c_line, "-ffi") != NULL )
378  bFileFilled = 1;
379  if (strstr (c_line, "-rfi") != NULL )
380  bFileData_r = 1;
381  if (strstr (c_line, "-nfi") != NULL )
382  bFilename = 1;
383  if (strstr (c_line, "-ifi") != NULL )
384  bFileIndex = 1;
385  if (strstr (c_line, "-mbs") != NULL )
386  {
387  // printf ("-mbs # mbs part\n");
388  bStreams_n = 1;
389  bStreams_r = 1;
390  bStreamsFree = 1;
391  bStreamsFilled = 1;
392  bStreamsSrv = 1;
393  bBuffers_n = 1;
394  bBuffers_r = 1;
395  bEvents_n = 1;
396  bEvents_r = 1;
397  bData_n = 1;
398  bData_r = 1;
399  }
400  if (strstr (c_line, "-u") != NULL )
401  {
402  // printf ("-user # default selection\n");
403  bEvents_n = 1;
404  bEvents_r = 1;
405  bData_n = 1;
406  bData_r = 1;
407  if (new_daqst->bh_running[SYS__event_serv])
408  bSrvEvents_r = 1;
409  if (new_daqst->bh_running[SYS__stream_serv])
410  bSrvData_r = 1;
411  bFileData_r = 1;
412  bFileIndex = 1;
413  }
414  if (strstr (c_line, "-fi") != NULL )
415  {
416  //printf ("-fi[le] # all file switches\n");
417  bFilename = 1;
418  bFileData = 1;
419  bFileFilled = 1;
420  bFileData_r = 1;
421  }
422  if (strstr (c_line, "-st") != NULL )
423  {
424  // printf ("-st[reams] # stream usage\n");
425  bStreamsFree = 1;
426  bStreamsFilled = 1;
427  bStreamsSrv = 1;
428  bSrvStreams_r = 1;
429  }
430  if (strstr (c_line, "-se") != NULL )
431  {
432  // printf ("-se[rver] # all server switches\n");
433  if (new_daqst->bh_running[SYS__event_serv])
434  {
435  bSrvEvents_n = 1;
436  bSrvEvents_r = 1;
437  }
438  if (new_daqst->bh_running[SYS__stream_serv])
439  {
440  bSrvBuffers_n = 1;
441  bSrvBuffers_r = 1;
442  bSrvData_n = 1;
443  bSrvData_r = 1;
444  bSrvStreams_r = 1;
445  bSrvStreams_n = 1;
446  }
447  }
448  if (strstr (c_line, "-ra") != NULL )
449  {
450  // printf ("-ra[tes] # all rates\n");
451  bStreams_r = 1;
452  bBuffers_r = 1;
453  bEvents_r = 1;
454  bData_r = 1;
455  bFileData_r = 1;
456  if (new_daqst->bh_running[SYS__event_serv])
457  bSrvEvents_r = 1;
458  if (new_daqst->bh_running[SYS__stream_serv])
459  {
460  bSrvStreams_r = 1;
461  bSrvBuffers_r = 1;
462  bSrvData_r = 1;
463  }
464  }
465 
466  bl_mbs = bData_n + bEvents_n + bBuffers_n + bStreams_n + bData_r + bEvents_r + bBuffers_r + bStreams_r;
467  bl_server = bSrvData_n + bSrvEvents_n + bSrvBuffers_n + bSrvStreams_n + bSrvData_r + bSrvEvents_r + bSrvBuffers_r
468  + bSrvStreams_r;
469  bl_streams = bStreamsFree + bStreamsFilled + bStreamsSrv;
470  bl_file = bFileData + bFileFilled + bFileData_r + bFilename + bFileIndex;
471 
472  memset(c_head0, 0, sizeof(c_head0));
473  memset(c_head, 0, sizeof(c_head));
474  if (bl_mbs)
475  {
476  strncat (c_head0, " Event building", sizeof(c_head0)-1);
477  if (bData_n)
478  {
479  strncat (c_head, " MB ", sizeof(c_head)-1);
480  strncat (c_head0, " ", sizeof(c_head0)-1);
481  }
482  if (bEvents_n)
483  {
484  strncat (c_head, " Events ", sizeof(c_head)-1);
485  strncat (c_head0, " ", sizeof(c_head0)-1);
486  }
487  if (bBuffers_n)
488  {
489  strncat (c_head, " Buffers ", sizeof(c_head)-1);
490  strncat (c_head0, " ", sizeof(c_head0)-1);
491  }
492  if (bStreams_n)
493  {
494  strncat (c_head, "Streams ", sizeof(c_head)-1);
495  strncat (c_head0, " ", sizeof(c_head0)-1);
496  }
497  if (bData_r)
498  {
499  strncat (c_head, " Kb/sec ", sizeof(c_head)-1);
500  strncat (c_head0, " ", sizeof(c_head0)-1);
501  }
502  if (bEvents_r)
503  {
504  strncat (c_head, " Ev/sec ", sizeof(c_head)-1);
505  strncat (c_head0, " ", sizeof(c_head0)-1);
506  }
507  if (bBuffers_r)
508  {
509  strncat (c_head, "Buf/sec ", sizeof(c_head)-1);
510  strncat (c_head0, " ", sizeof(c_head0)-1);
511  }
512  if (bStreams_r)
513  {
514  strncat (c_head, "Str/sec ", sizeof(c_head)-1);
515  strncat (c_head0, " ", sizeof(c_head0)-1);
516  }
517  c_head0[strlen (c_head0) - 15] = 0;
518  }
519  bl_server = bSrvData_n + bSrvEvents_n + bSrvBuffers_n + bSrvStreams_n + bSrvData_r + bSrvEvents_r + bSrvBuffers_r
520  + bSrvStreams_r;
521  if (bl_server)
522  {
523  strncat (c_head0, "| Server ", sizeof(c_head0)-1);
524  strncat (c_head, "|", sizeof(c_head)-1);
525  if (bSrvData_n)
526  {
527  strncat (c_head, " MB ", sizeof(c_head)-1);
528  strncat (c_head0, " ", sizeof(c_head0)-1);
529  }
530  if (bSrvEvents_n)
531  {
532  strncat (c_head, " Events ", sizeof(c_head)-1);
533  strncat (c_head0, " ", sizeof(c_head0)-1);
534  }
535  if (bSrvBuffers_n)
536  {
537  strncat (c_head, " Buffers ", sizeof(c_head)-1);
538  strncat (c_head0, " ", sizeof(c_head0)-1);
539  }
540  if (bSrvStreams_n)
541  {
542  strncat (c_head, "Streams ", sizeof(c_head)-1);
543  strncat (c_head0, " ", sizeof(c_head0)-1);
544  }
545  if (bSrvData_r)
546  {
547  strncat (c_head, " Kb/sec ", sizeof(c_head)-1);
548  strncat (c_head0, " ", sizeof(c_head0)-1);
549  }
550  if (bSrvEvents_r)
551  {
552  strncat (c_head, " Ev/sec ", sizeof(c_head)-1);
553  strncat (c_head0, " ", sizeof(c_head0)-1);
554  }
555  if (bSrvBuffers_r)
556  {
557  strncat (c_head, "Buf/sec ", sizeof(c_head)-1);
558  strncat (c_head0, " ", sizeof(c_head0)-1);
559  }
560  if (bSrvStreams_r)
561  {
562  strncat (c_head, "Str/sec ", sizeof(c_head)-1);
563  strncat (c_head0, " ", sizeof(c_head0)-1);
564  }
565  c_head0[strlen (c_head0) - 8] = 0;
566  }
567  bl_streams = bStreamsFree + bStreamsFilled + bStreamsSrv;
568  if (bl_streams)
569  {
570  strncat (c_head0, "| Streams ", sizeof(c_head0)-1);
571  strncat (c_head, "|", sizeof(c_head)-1);
572  if (bStreamsFree)
573  {
574  strncat (c_head, "Empty ", sizeof(c_head)-1);
575  strncat (c_head0, " ", sizeof(c_head0)-1);
576  }
577  if (bStreamsFilled)
578  {
579  strncat (c_head, "Full ", sizeof(c_head)-1);
580  strncat (c_head0, " ", sizeof(c_head0)-1);
581  }
582  if (bStreamsSrv)
583  {
584  strncat (c_head, "Hold ", sizeof(c_head)-1);
585  strncat (c_head0, " ", sizeof(c_head0)-1);
586  }
587  c_head0[strlen (c_head0) - 9] = 0;
588  }
589  bl_file = bFileData + bFileFilled + bFileData_r + bFilename + bFileIndex;
590  if (bl_file)
591  {
592  strncat (c_head0, "| File output ", sizeof(c_head0)-1);
593  if (bFilename)
594  strncat (c_head0, " ", sizeof(c_head0)-1);
595  strncat (c_head, "|", sizeof(c_head)-1);
596  if (bFileData)
597  strncat (c_head, " MB ", sizeof(c_head)-1);
598  if (bFileFilled)
599  strncat (c_head, "Filled ", sizeof(c_head)-1);
600  if (bFileData_r)
601  strncat (c_head, " Kb/sec ", sizeof(c_head)-1);
602  if (bFileIndex)
603  strncat (c_head, "Index ", sizeof(c_head)-1);
604  if (bFilename)
605  strncat (c_head, "Filename", sizeof(c_head)-1);
606  }
607 
608  uint32_t bl_ev_buf_len = new_daqst->l_block_size;
609  uint32_t bl_n_ev_buf = new_daqst->bl_no_stream_buf;
610  //uint32_t bl_n_stream = new_daqst->bl_no_streams;
611 
612 
613  double r_new_buf = new_daqst->bl_n_buffers;
614  double r_new_evt = new_daqst->bl_n_events;
615  double r_new_stream = new_daqst->bl_n_bufstream;
616  double r_new_evsrv_evt = new_daqst->bl_n_evserv_events;
617  //double r_new_evsrv_kb = new_daqst->bl_n_evserv_kbytes * 1.024;
618  double r_new_kb = new_daqst->bl_n_kbyte * 1.024;
619  //double r_new_tape_kb = new_daqst->bl_n_kbyte_tape * 1.024;
620  double r_new_file_kb = new_daqst->bl_n_kbyte_file * 1.024;
621  double r_new_strsrv_str = new_daqst->bl_n_strserv_bufs;
622  double r_new_strsrv_buf = new_daqst->bl_n_strserv_bufs;
623  double r_new_strsrv_kb = new_daqst->bl_n_strserv_kbytes * 1.024;
624 
625 
626  double r_old_buf = old_daqst->bl_n_buffers;
627  double r_old_evt = old_daqst->bl_n_events;
628  double r_old_stream = old_daqst->bl_n_bufstream;
629  double r_old_evsrv_evt = old_daqst->bl_n_evserv_events;
630  //double r_old_evsrv_kb = old_daqst->bl_n_evserv_kbytes * 1.024;
631  double r_old_kb = old_daqst->bl_n_kbyte * 1.024;
632  //double r_old_tape_kb = old_daqst->bl_n_kbyte_tape * 1.024;
633  double r_old_file_kb = old_daqst->bl_n_kbyte_file * 1.024;
634  double r_old_strsrv_str = old_daqst->bl_n_strserv_bufs;
635  double r_old_strsrv_buf = old_daqst->bl_n_strserv_bufs;
636  double r_old_strsrv_kb = old_daqst->bl_n_strserv_kbytes * 1.024;
637 
638  double r_rate_buf = (r_new_buf - r_old_buf) / diff_time;
639  double r_rate_kb = (r_new_kb - r_old_kb) / diff_time;
640  double r_rate_evt = (r_new_evt - r_old_evt) / diff_time;
641  double r_rate_stream = (r_new_stream - r_old_stream) / diff_time;
642  double r_rate_evsrv_evt = (r_new_evsrv_evt - r_old_evsrv_evt) / diff_time;
643  //double r_rate_evsrv_kb = (r_new_evsrv_kb - r_old_evsrv_kb) / diff_time;
644  //double r_rate_tape_kb = (r_new_tape_kb - r_old_tape_kb) / diff_time;
645  double r_rate_file_kb = (r_new_file_kb - r_old_file_kb) / diff_time;
646  double r_rate_strsrv_str = (r_new_strsrv_str - r_old_strsrv_str) / diff_time / bl_n_ev_buf;
647  double r_rate_strsrv_buf = (r_new_strsrv_buf - r_old_strsrv_buf) / diff_time;
648  double r_rate_strsrv_kb = (r_new_strsrv_kb - r_old_strsrv_kb) / diff_time;
649 
650  memset(c_out, 0, sizeof(c_out));
651  memset(c_line, 0, sizeof(c_line));
652  if (bl_mbs)
653  {
654  if (bData_n)
655  {
656  snprintf (c_line, sizeof(c_line), "%10.0f ", r_new_buf / 1000000. * bl_ev_buf_len);
657  strncat (c_out, c_line, sizeof(c_out)-1);
658  }
659  if (bEvents_n)
660  {
661  snprintf (c_line, sizeof(c_line), "%10u ", (unsigned) new_daqst->bl_n_events);
662  strncat (c_out, c_line, sizeof(c_out)-1);
663  }
664  if (bBuffers_n)
665  {
666  snprintf (c_line, sizeof(c_line), "%10u ", (unsigned) new_daqst->bl_n_buffers);
667  strncat (c_out, c_line, sizeof(c_out)-1);
668  }
669  if (bStreams_n)
670  {
671  snprintf (c_line, sizeof(c_line), "%7u ", (unsigned) new_daqst->bl_n_bufstream);
672  strncat (c_out, c_line, sizeof(c_out)-1);
673  }
674  if (bData_r)
675  {
676  snprintf (c_line, sizeof(c_line), "%7.1f ", r_rate_kb);
677  strncat (c_out, c_line, sizeof(c_out)-1);
678  }
679  if (bEvents_r)
680  {
681  snprintf (c_line, sizeof(c_line), "%7.0f ", r_rate_evt);
682  strncat (c_out, c_line, sizeof(c_out)-1);
683  }
684  if (bBuffers_r)
685  {
686  snprintf (c_line, sizeof(c_line), "%7.0f ", r_rate_buf);
687  strncat (c_out, c_line, sizeof(c_out)-1);
688  }
689  if (bStreams_r)
690  {
691  snprintf (c_line, sizeof(c_line), "%7.0f ", r_rate_stream);
692  strncat (c_out, c_line, sizeof(c_out)-1);
693  }
694  }
695  if (bl_server)
696  {
697  strncat (c_out, "|", sizeof(c_out)-1);
698  if (bSrvData_n)
699  {
700  snprintf (c_line, sizeof(c_line), "%10.0f ", r_new_strsrv_kb / 1000.);
701  strncat (c_out, c_line, sizeof(c_out)-1);
702  }
703  if (bSrvEvents_n)
704  {
705  snprintf (c_line, sizeof(c_line), "%10u ", (unsigned) new_daqst->bl_n_evserv_events);
706  strncat (c_out, c_line, sizeof(c_out)-1);
707  }
708  if (bSrvBuffers_n)
709  {
710  snprintf (c_line, sizeof(c_line), "%10u ", (unsigned) new_daqst->bl_n_strserv_bufs);
711  strncat (c_out, c_line, sizeof(c_out)-1);
712  }
713  if (bSrvStreams_n)
714  {
715  snprintf (c_line, sizeof(c_line), "%7u ", (unsigned) new_daqst->bl_n_strserv_bufs / bl_n_ev_buf);
716  strncat (c_out, c_line, sizeof(c_out)-1);
717  }
718  if (bSrvData_r)
719  {
720  snprintf (c_line, sizeof(c_line), "%7.1f ", r_rate_strsrv_kb);
721  strncat (c_out, c_line, sizeof(c_out)-1);
722  }
723  if (bSrvEvents_r)
724  {
725  snprintf (c_line, sizeof(c_line), "%7.0f ", r_rate_evsrv_evt);
726  strncat (c_out, c_line, sizeof(c_out)-1);
727  }
728  if (bSrvBuffers_r)
729  {
730  snprintf (c_line, sizeof(c_line), "%7.0f ", r_rate_strsrv_buf);
731  strncat (c_out, c_line, sizeof(c_out)-1);
732  }
733  if (bSrvStreams_r)
734  {
735  snprintf (c_line, sizeof(c_line), "%7.0f ", r_rate_strsrv_str);
736  strncat (c_out, c_line, sizeof(c_out)-1);
737  }
738  }
739  if (bl_streams)
740  {
741  strncat (c_out, "|", sizeof(c_out)-1);
742  if (bStreamsFree)
743  {
744  snprintf (c_line, sizeof(c_line), "%5d ", l_free_stream);
745  strncat (c_out, c_line, sizeof(c_out)-1);
746  }
747  if (bStreamsFilled)
748  {
749  snprintf (c_line, sizeof(c_line), "%4d ", l_trans_stream);
750  strncat (c_out, c_line, sizeof(c_out)-1);
751  }
752  if (bStreamsSrv)
753  {
754  snprintf (c_line, sizeof(c_line), "%4d ", l_serv_stream);
755  strncat (c_out, c_line, sizeof(c_out)-1);
756  }
757  }
758  if (bl_file)
759  {
760  strncat (c_out, "|", sizeof(c_out)-1);
761  if (bFileData)
762  {
763  snprintf (c_line, sizeof(c_line), "%8.0f ", r_new_file_kb / 1000.);
764  strncat (c_out, c_line, sizeof(c_out)-1);
765  }
766  if (bFileFilled)
767  {
768  snprintf (c_line, sizeof(c_line), "%5.1f %% ", new_daqst->l_file_size > 0 ? r_new_file_kb / new_daqst->l_file_size * 0.1 : 0.);
769  strncat (c_out, c_line, sizeof(c_out)-1);
770  }
771  if (bFileData_r)
772  {
773  snprintf (c_line, sizeof(c_line), "%8.1f ", r_rate_file_kb);
774  strncat (c_out, c_line, sizeof(c_out)-1);
775  }
776  if (bFileIndex)
777  {
778  snprintf (c_line, sizeof(c_line), " %04u ", (unsigned) new_daqst->l_file_cur);
779  strncat (c_out, c_line, sizeof(c_out)-1);
780  }
781  if (bFilename)
782  {
783  snprintf (c_line, sizeof(c_line), "%s", new_daqst->c_file_name);
784  strncat (c_out, c_line, sizeof(c_out)-1);
785  }
786  if (new_daqst->l_open_file)
787  strncat (c_out, " op", sizeof(c_out)-1);
788  else
789  strncat (c_out, " cl", sizeof(c_out)-1);
790  }
791 
792  dabc::Hierarchy item = fHierarchy.GetHChild(itemname);
793 
794  if (fCounter % 20 == 0) {
795  item.SetField("value", c_head0);
796  item.MarkChangedItems();
797  item.SetField("value", c_head);
798  item.MarkChangedItems();
799  }
800  item.SetField("value", c_out);
801  item.SetFieldModified("value");
802 
803  if (options=="-u") {
804  // printf("%s\n",c_out);
805  fHierarchy.GetHChild("DataRate").SetField("value", dabc::format("%3.1f", r_rate_kb));
806  fHierarchy.GetHChild("DataRate").SetFieldModified("value");
807  fHierarchy.GetHChild("EventRate").SetField("value", dabc::format("%3.1f", r_rate_evt));
808  fHierarchy.GetHChild("EventRate").SetFieldModified("value");
809  fHierarchy.GetHChild("ServerRate").SetField("value", dabc::format("%3.1f", r_rate_strsrv_kb));
810  fHierarchy.GetHChild("ServerRate").SetFieldModified("value");
811  fHierarchy.GetHChild("FileRate").SetField("value", dabc::format("%3.1f", r_rate_file_kb));
812  fHierarchy.GetHChild("FileRate").SetFieldModified("value");
813  }
814 
815  if (fDoRec) {
816  std::string prefix = std::string("MBS.") + fMbsNode + std::string(".");
817  fRec.AddDouble(prefix + "DataRate", r_rate_kb, true);
818  fRec.AddDouble(prefix + "EventRate", r_rate_evt, true);
819  fRec.AddDouble(prefix + "ServerRate", r_rate_strsrv_kb, true);
820  fRec.AddDouble(prefix + "FileRate", r_rate_file_kb, true);
821  }
822 
823  fHierarchy.MarkChangedItems();
824 }
825 
826 void mbs::Monitor::ProcessTimerEvent(unsigned timer)
827 {
828  if (TimerName(timer) != "MbsUpdate") {
830  return;
831  }
832 
833  if (fMbsNode.empty()) {
834  fCounter++;
835 
836  double v1 = 100. * (1.3 + sin(dabc::Now().AsDouble()/5.));
837  fHierarchy.GetHChild("DataRate").SetField("value", dabc::format("%4.2f", v1));
838 
839  v1 = 100. * (1.3 + cos(dabc::Now().AsDouble()/8.));
840  fHierarchy.GetHChild("EventRate").SetField("value", dabc::format("%4.2f", v1));
841 
842  fHierarchy.GetHChild("rate_log").SetField("value", dabc::format("| Header | Entry | Rate |"));
843  fHierarchy.GetHChild("rate_log").SetField("value", dabc::format("| | %5d | %6.2f |", fCounter,v1));
844 
845  fHierarchy.MarkChangedItems();
846  return;
847  }
848 
849 // DOUT0("+++++++++++++++++++++++++++ Process timer!!!");
850 
851 // this indicates that the addon is active and we should not touch it
852  // SL 20.05.2015: allow to access status record also with prompter
853  if (!fAddon.null() || (fStatPort<=0)) return;
854 
855  int fd = dabc::SocketThread::StartClient(fMbsNode, fStatPort);
856  if (fd<=0)
857  EOUT("FAIL status port %d for node %s", fStatPort, fMbsNode.c_str());
858  else
859  AssignAddon(new DaqStatusAddon(fd));
860 }
861 
862 
864 {
865  // if logger addon connected,
866  // one can create command worker then
867  CreateCommandWorker();
868 }
869 
870 void mbs::Monitor::NewMessage(const std::string &msg)
871 {
872  dabc::Hierarchy item = fHierarchy.GetHChild("logger");
873 
874  if (!item.null()) {
875  item.SetField("value", msg);
876  item.SetFieldModified("value");
877  fHierarchy.MarkChangedItems();
878  }
879 
880  if (fPrintf) printf("%s\n", msg.c_str());
881 }
882 
883 
884 void mbs::Monitor::NewSendCommand(const std::string &cmd, int res)
885 {
886  if (!fPrintf) return;
887  if (res>=0) printf("replcmd>%s res=%s\n", cmd.c_str(), DBOOL(res));
888  else printf("sendcmd>%s\n", cmd.c_str());
889 }
890 
891 
893 {
894  dabc::TimeStamp stamp;
895  stamp.GetNow();
896 
897  double tmdiff = stamp.AsDouble() - fStatStamp.AsDouble();
898  if (tmdiff<=0) {
899  EOUT("Wrong time calculation");
900  return;
901  }
902  double deltaT=fabs((tmdiff-fRateInterval)/tmdiff); // JAM smooth glitches between timer period and time stamp by this
903  DOUT3("NEW STATUS with rate interval:%f , dt=%f\n, delta=%f", fRateInterval, tmdiff, deltaT);
904  if ((tmdiff>0) && ((deltaT<1/fRateInterval) || (ceil(tmdiff) > 1 + fRateInterval) ) )
905  {
906  // last term ensures that we enter this section if fRateInterval is interactively changed to lower values
907  if (!fStatus.null()){
908  FillStatistic("-u", "rate_log", &fStatus, &stat, tmdiff);
909  FillStatistic("-rev -rda -nev -nda", "rash_log", &fStatus, &stat, tmdiff);
910  FillStatistic("-rev -rda -nev -nda -rsda", "rast_log", &fStatus, &stat, tmdiff);
911  FillStatistic("-rev -rda -nev -nda -rsda -fi", "ratf_log", &fStatus, &stat, tmdiff);
912 
913  DOUT3("Filled statistics with rate interval:%f after dt=%f\n", fRateInterval, tmdiff);
914  fCounter++;
915  }
916 
917  memcpy(&fStatus, &stat, sizeof(stat));
918  fStatStamp = stamp;
919  }
920  DOUT3("Got acquisition running=%d, file open=%d", stat.bh_acqui_running, stat.l_open_file);
921 
922  UpdateSetupState((stat.bh_setup_loaded) && (stat.bh_running[SYS__util]));
923  // <- after shutdown, check also if util task is still there, bh_setup_loaded is not reset
924  UpdateMbsState(stat.bh_acqui_running);
925  UpdateFileState((stat.l_open_file) && (stat.bh_running[SYS__transport]));
926  // <- after shutdown, check also if transport task is still there, l_open_file is not reset
927 
928 
929 
930 
931 }
932 
933 
935 {
936  dabc::Hierarchy chld = fHierarchy.GetHChild(fFileStateName);
937  if (!chld.null()) {
938  chld.SetField("value", dabc::format("%d", on));
939  // par.ScanParamFields(&chld()->Fields());
940  fHierarchy.MarkChangedItems();
941  // DOUT0("ChangeFileState to %d", on);
942  } else {
943  DOUT0("UpdateFileState Could not find hierarchy child %s", fFileStateName.c_str());
944  }
945 }
946 
947 void mbs::Monitor::UpdateMbsState(int on)
948 {
949  dabc::Hierarchy chld = fHierarchy.GetHChild(fAcqStateName);
950  if (!chld.null()) {
951  chld.SetField("value", dabc::format("%d", on));
952  // par.ScanParamFields(&chld()->Fields());
953  fHierarchy.MarkChangedItems();
954  // DOUT0("ChangeMBSState to %d", on);
955  } else {
956  DOUT0("UpdateMbsState Could not find hierarchy child %s", fAcqStateName.c_str());
957  }
958 }
959 
961 {
962  dabc::Hierarchy chld=fHierarchy.GetHChild(fSetupStateName);
963  if (!chld.null())
964  {
965  chld.SetField("value", dabc::format("%d", on));
966  //par.ScanParamFields(&chld()->Fields());
967  fHierarchy.MarkChangedItems();
968  //DOUT0("ChangeSetup state to %d", on);
969  }
970  else
971  {
972  DOUT0("UpdateSetupState Could not find hierarchy child %s", fSetupStateName.c_str());
973  }
974 
975 }
976 
977 void mbs::Monitor::SetRateInterval(double t)
978 {
979  if (t < fPeriod)
980  t = fPeriod;
981  fRateInterval = t;
982  dabc::Hierarchy chld = fHierarchy.GetHChild(fRateIntervalName);
983  if (!chld.null()) {
984  chld.SetField("value", dabc::format("%f", t));
985  // par.ScanParamFields(&chld()->Fields());
986  fHierarchy.MarkChangedItems();
987  DOUT0("Changed rate interval to %f seconds", t);
988  } else {
989  DOUT0("SetRateInterval Could not find hierarchy child %s", fRateIntervalName.c_str());
990  }
991 }
992 
993 void mbs::Monitor::SetHistoryDepth(int entries)
994 {
995  fHistory = entries;
996  dabc::Hierarchy chld = fHierarchy.GetHChild(fHistoryName);
997  if (!chld.null()) {
998  chld.SetField("value", dabc::format("%d", entries));
999  // par.ScanParamFields(&chld()->Fields());
1000  fHierarchy.MarkChangedItems();
1001  DOUT0("Changed history depth to %d entries", entries);
1002  } else {
1003  DOUT0("SetHistoryDepth Could not find hierarchy child %s", fHistoryName.c_str());
1004  }
1005 
1006  // here activate it immediately, does it work recursively?
1007  // fHierarchy.EnableHistory(0,true);
1008  // fHierarchy.EnableHistory(fHistory,true);
1009  // NO, lets update explicitely all interesting records:
1010 
1011  fHierarchy.GetHChild("DataRate").EnableHistory(0, true);
1012  fHierarchy.GetHChild("DataRate").EnableHistory(fHistory, true);
1013  fHierarchy.GetHChild("EventRate").EnableHistory(0, true);
1014  fHierarchy.GetHChild("EventRate").EnableHistory(fHistory, true);
1015  fHierarchy.GetHChild("ServerRate").EnableHistory(0, true);
1016  fHierarchy.GetHChild("ServerRate").EnableHistory(fHistory, true);
1017  fHierarchy.GetHChild("FileRate").EnableHistory(0, true);
1018  fHierarchy.GetHChild("FileRate").EnableHistory(fHistory, true);
1019 
1020  fHierarchy.GetHChild("rate_log").EnableHistory(0, true);
1021  fHierarchy.GetHChild("rate_log").EnableHistory(fHistory, true);
1022  fHierarchy.GetHChild("rash_log").EnableHistory(0, true);
1023  fHierarchy.GetHChild("rash_log").EnableHistory(fHistory, true);
1024  fHierarchy.GetHChild("rast_log").EnableHistory(0, true);
1025  fHierarchy.GetHChild("rast_log").EnableHistory(fHistory, true);
1026  fHierarchy.GetHChild("ratf_log").EnableHistory(0, true);
1027  fHierarchy.GetHChild("ratf_log").EnableHistory(fHistory, true);
1028 
1029  fCounter = 0; // to print heading on top
1030 }
1031 
// Body of mbs::Monitor::ExecuteCommand(dabc::Command cmd): dispatches commands by name.
// NOTE(review): the signature line (1032) and the last statement before the closing
// brace (1089) are absent from this listing - restore them from the original source.
1033 {
     // "ProcessDaqStatus": pass the status held by the DaqStatusAddon (if the current
     // addon is one) to NewStatus(), then release the addon.
1034  if (cmd.IsName("ProcessDaqStatus")) {
1035  mbs::DaqStatusAddon *tr = dynamic_cast<mbs::DaqStatusAddon *>(fAddon());
1036 
1037  if (tr)
1038  NewStatus(tr->GetStatus());
1039 
1040  AssignAddon(nullptr);
1041 
1042  return dabc::cmd_true;
1043  } else if (cmd.IsName(dabc::CmdHierarchyExec::CmdName())) {
1044 
     // hierarchy commands are distinguished by their "Item" field
1045  std::string cmdpath = cmd.GetStr("Item");
1046 
1047  // if (cmdpath != "CmdMbs") return dabc::cmd_false;
1048 
1049  if (cmdpath == "CmdMbs") {
1050 
     // while fWaitingLogger is set only one command is parked in fWaitingCmd;
     // a command parked earlier is answered with cmd_false
1051  if (fWaitingLogger) {
1052  if (!fWaitingCmd.null()) fWaitingCmd.Reply(dabc::cmd_false);
1053  fWaitingCmd = cmd;
1054  return dabc::cmd_postponed;
1055  }
1056 
1057  dabc::WorkerRef wrk = FindChildRef("DaqCmd");
1058 
     // refuse when no command port is configured or the "DaqCmd" child does not exist
1059  if ((fCmdPort <= 0) || wrk.null())
1060  return dabc::cmd_false;
1061 
     // the child worker receives the command; it is reported as postponed here
1062  wrk.Submit(cmd);
1063 
1064  return dabc::cmd_postponed;
1065  } else if (cmdpath == "CmdSetRateInterval") {
1066  DOUT0("ExecuteCommand sees CmdSetRateInterval");
1067  double deltat = cmd.GetDouble("time", 3.0); // JAM todo: put string identifier to define or static variable
1068  SetRateInterval(deltat);
1069  SetHistoryDepth(fHistory); // need to clear old history entries when changing sampling period
1070  return dabc::cmd_true;
1071  } else if (cmdpath == "CmdSetHistoryDepth") {
1072  DOUT0("ExecuteCommand sees CmdSetHistoryDepth");
1073  int entries = cmd.GetInt("entries", 200); // JAM todo: put string identifier to define or static variable
1074  SetHistoryDepth(entries);
1075  return dabc::cmd_true;
1076  } else {
     // unknown hierarchy item
1077  return dabc::cmd_false;
1078  }
1079  } else if (cmd.IsName("DeleteWorkers")) {
1080  // command used to delete workers in api.cxx
     // NOTE(review): this branch has no return of its own and falls through to the
     // statement missing at line 1089.
1081  dabc::WorkerRef wrk = FindChildRef("DaqCmd");
1082  wrk.Destroy();
1083  wrk = FindChildRef("DaqLogger");
1084  wrk.Destroy();
1085  // also delete addon
1086  AssignAddon(nullptr);
1087  }
1088 
1090 }
1091 
1092 unsigned mbs::Monitor::WriteRecRawData(void* ptr, unsigned maxsize)
1093 {
1094  unsigned len = mbs::MonitorSlowControl::WriteRecRawData(ptr,maxsize);
1095  fRec.Clear();
1096  return len;
1097 }
1098 
1099 // =====================================================================
1100 
1101 mbs::DaqLogWorker::DaqLogWorker(const dabc::Reference& parent, const std::string &name, const std::string &mbsnode, int port) :
1102  dabc::Worker(parent, name),
1103  fMbsNode(mbsnode),
1104  fPort(port),
1105  fFirstRecv(true)
1106 {
1107 }
1108 
1110 {
1111 }
1112 
1113 
// Body of the DaqLogWorker helper invoked as CreateAddon(): makes sure a socket addon
// connected to fMbsNode:fPort exists and a receive into fRec is started.
// Returns true when an addon exists afterwards, false when the connection failed.
// NOTE(review): the signature line (1114) and the statement at line 1124 that declares
// and creates `add` are absent from this listing - restore them from the original source.
1115 {
     // already connected - nothing to do
1116  if (!fAddon.null()) return true;
1117 
1118  int fd = dabc::SocketThread::StartClient(fMbsNode, fPort);
1119  if (fd<=0) {
1120  EOUT("Fail open log %d port on node %s", fPort, fMbsNode.c_str());
1121  return false;
1122  }
1123 
     // socket events are delivered to this worker instead of being handled by the addon
1125  add->SetDeliverEventsToWorker(true);
1126  AssignAddon(add);
1127 
     // clear the record and start receiving the first one
1128  memset(&fRec, 0, sizeof(fRec));
1129  add->StartRecv(&fRec, sizeof(fRec));
1130 
1131  return true;
1132 }
1133 
1134 
// Body of mbs::DaqLogWorker::OnThreadAssigned(): tries to connect to the logger.
// NOTE(review): the signature line (1135) and the statement at line 1137 are absent
// from this listing - restore them from the original source.
1136 {
1138 
     // when the connection cannot be created now, a timeout of 5 is activated to retry
1139  if (!CreateAddon()) ActivateTimeout(5);
1140 
1141  DOUT2("mbs::DaqLogWorker::OnThreadAssigned parent = %p", GetParent());
1142 }
1143 
1144 double mbs::DaqLogWorker::ProcessTimeout(double last_diff)
1145 {
1146  // use timeout to reconnect with the logger
1147  if (CreateAddon()) return -1;
1148  return 5.;
1149 }
1150 
1151 
// Body of mbs::DaqLogWorker::ProcessEvent(const dabc::EventId &): reacts on events of the
// logger socket, selected by evnt.GetCode().
// NOTE(review): the signature line (1152), the case labels (1155, 1184, 1185) and the
// statement of the default branch (1191) are absent from this listing; the branch
// descriptions below are inferred from the remaining statements - confirm.
1153 {
1154  switch (evnt.GetCode()) {
     // NOTE(review): case label missing; this branch evaluates the record in fRec.
1156 
     // swap the first three 32-bit words when iOrder is not 1
     // (presumably the sender has the opposite byte order)
1157  if (fRec.iOrder!=1)
1158  mbs::SwapData(&fRec, 3 * sizeof(int32_t));
1159 
     // the record is evaluated only when iOrder equals 1 after the optional swap
1160  if (fRec.iOrder==1) {
1161 
1162  if (fRec.iType == 1) {
1163  DOUT4("Keep alive message from MBS logger");
1164  } else {
1165  DOUT2("Get MSG: %s",fRec.fBuffer);
     // the message text is handed to the parent when it is a mbs::Monitor
1166  mbs::Monitor* pl = dynamic_cast<mbs::Monitor*> (GetParent());
1167  if (pl) pl->NewMessage(fRec.fBuffer);
1168  }
1169  }
1170 
     // clear the record and start receiving the next one
1171  memset(&fRec, 0, sizeof(fRec));
1172 
1173  dabc::SocketIOAddon* add = dynamic_cast<dabc::SocketIOAddon*>(fAddon());
1174  if (add) add->StartRecv(&fRec, sizeof(fRec));
1175 
     // after the very first record the parent monitor is told once that the logger works
1176  if (fFirstRecv) {
1177  fFirstRecv = false;
1178  mbs::Monitor* pl = dynamic_cast<mbs::Monitor*> (GetParent());
1179  if (pl) pl->LoggerAddonCreated();
1180  }
1181 
1182  break;
1183  }
     // NOTE(review): case labels missing; this branch drops the addon and activates
     // a timeout of 1 so that the connection is created again.
1186  EOUT("Problem with logger - reconnect");
1187  AssignAddon(nullptr);
1188  ActivateTimeout(1);
1189  break;
1190  default:
1192  }
1193 
1194 }
1195 
1196 // =================================================================
1197 
1198 mbs::DaqRemCmdWorker::DaqRemCmdWorker(const dabc::Reference& parent, const std::string &name,
1199  const std::string &mbsnode, int port) :
1200  dabc::Worker(parent, name),
1201  fMbsNode(mbsnode),
1202  fPort(port),
1203  fCmds(dabc::CommandsQueue::kindPostponed),
1204  fState(ioInit),
1205  fSendCmdId(1)
1206 {
1207 }
1208 
1210 {
1211  DOUT3("Destroy DaqRemCmdWorker");
1212 }
1213 
// Body of mbs::DaqRemCmdWorker::OnThreadAssigned(): tries to open the command connection.
// NOTE(review): the signature line (1214) and the statement at line 1216 are absent
// from this listing - restore them from the original source.
1215 {
1217 
     // result is ignored; CreateAddon() activates a retry timeout itself on failure
1218  CreateAddon();
1219 }
1220 
1222 {
1223  if (!fAddon.null()) return true;
1224 
1225  int fd = dabc::SocketThread::StartClient(fMbsNode, fPort);
1226  if (fd<=0) {
1227  EOUT("Fail open command port %d on node %s", fPort, fMbsNode.c_str());
1228  ActivateTimeout(5);
1229  return false;
1230  }
1231 
1232  dabc::SocketIOAddon* addon = new dabc::SocketIOAddon(fd);
1233  addon->SetDeliverEventsToWorker(true);
1234 
1235  DOUT2("ADDON:%p Created cmd socket %d to mbs %s:%d", addon, fd, fMbsNode.c_str(), fPort);
1236 
1237  AssignAddon(addon);
1238 
1239  // in any case receive next buffer
1240  memset(&fRecvBuf, 0, sizeof(fRecvBuf));
1241  addon->StartRecv(&fRecvBuf, sizeof(fRecvBuf));
1242 
1243  return true;
1244 }
1245 
1246 
1247 
// Body of mbs::DaqRemCmdWorker::ProcessEvent(const dabc::EventId &): reacts on events of the
// command socket, selected by evnt.GetCode().
// NOTE(review): the signature line (1248), the case labels (1251, 1255, 1296, 1297) and the
// statement of the default branch (1308) are absent from this listing; the branch
// descriptions below are inferred from the remaining statements - confirm.
1249 {
1250  switch (evnt.GetCode()) {
     // NOTE(review): case label missing.
1252  // this is just confirmation that data was sent - do nothing
1253  break;
1254  }
     // NOTE(review): case label missing; this branch evaluates the buffer in fRecvBuf.
1256  //DOUT0("mbs::DaqRemCmdWorker get evntSocketRecvInfo");
1257 
1258  if (!fRecvBuf.CheckByteOrder()) {
1259  EOUT("Fail to decode data in receive buffer");
1260  } else
1261  if (fRecvBuf.l_cmdid == 0xffffffff) {
1262  // keep-alive buffer
1263  //DOUT0("mbs::DaqRemCmdWorker keep alive buffer");
1264  } else
     // anything else counts as reply only while a command is waiting for one
1265  if ((fCmds.Size()>0) && (fState == ioWaitReply)) {
1266  // TODO: when reply command - check result
1267 
1268  DOUT3("mbs::DaqRemCmdWorker get reply for the command id %u", (unsigned) fRecvBuf.l_cmdid);
1269 
     // success requires zero status and the id of the command sent last
1270  bool res = fRecvBuf.l_status==0;
1271 
1272  if (fSendCmdId!= fRecvBuf.l_cmdid) {
1273  EOUT("Mismatch of command id in the MBS reply");
1274  res = false;
1275  }
1276 
     // report the result to the parent when it is a mbs::Monitor
1277  mbs::Monitor* pl = dynamic_cast<mbs::Monitor*> (GetParent());
1278  if (pl) pl->NewSendCommand(fCmds.Front().GetStr("cmd"), res ? 1 : 0);
1279 
     // NOTE(review): the state returns to ioInit, but a further queued command is not
     // started from here - check whether ProcessNextMbsCommand() should be called.
1280  fCmds.Pop().ReplyBool(res);
1281  fState = ioInit;
1282  }
1283 
1284  dabc::SocketIOAddon* addon = dynamic_cast<dabc::SocketIOAddon*> (fAddon());
1285 
1286  if (addon==0) {
1287  EOUT("ADDON disappear !!!");
1288  ActivateTimeout(3.);
1289  } else {
     // clear the buffer and start receiving the next one
1290  memset(&fRecvBuf, 0, sizeof(fRecvBuf));
1291  addon->StartRecv(&fRecvBuf, sizeof(fRecvBuf));
1292  }
1293 
1294  break;
1295  }
     // NOTE(review): case labels missing.
1298  // error, we cancel command execution and issue timeout to try again
1299  AssignAddon(nullptr);
     // only the command currently waiting for a reply is failed; others stay queued
1300  if ((fState==ioWaitReply) && (fCmds.Size()>0)) {
1301  fCmds.Pop().Reply(dabc::cmd_false);
1302  fState = ioInit;
1303  }
1304 
1305  ActivateTimeout(1.);
1306  break;
1307  default:
1309  }
1310 }
1311 
1312 double mbs::DaqRemCmdWorker::ProcessTimeout(double last_diff)
1313 {
1314  if (CreateAddon()) ProcessNextMbsCommand();
1315 
1316  return -1;
1317 }
1318 
1320 {
1321  if (cmd.IsName(dabc::CmdHierarchyExec::CmdName())) {
1322  fCmds.Push(cmd);
1323  ProcessNextMbsCommand();
1324  return dabc::cmd_postponed;
1325  }
1326 
1327  return dabc::Worker::ExecuteCommand(cmd);
1328 }
1329 
1331 {
1332  // start next command when previous is completed
1333  if (fState != ioInit) return;
1334 
1335  if (fCmds.Size()==0) return;
1336 
1337  dabc::SocketIOAddon* addon = dynamic_cast<dabc::SocketIOAddon*> (fAddon());
1338 
1339  if ((addon==0) || !addon->IsSocket()) {
1340  EOUT("Something went wrong");
1341  exit(5);
1342  }
1343 
1344  std::string mbscmd = fCmds.Front().GetStr("cmd");
1345  if (mbscmd.length() >= sizeof(fSendBuf.c_cmd)-1) {
1346  EOUT("Send command too long %u", mbscmd.length());
1347  fCmds.Pop().Reply(dabc::cmd_false);
1348  ProcessNextMbsCommand();
1349  return;
1350  }
1351 
1352  DOUT2("Send MBS-CMD: %s", mbscmd.c_str());
1353 
1354  mbs::Monitor* pl = dynamic_cast<mbs::Monitor*> (GetParent());
1355  if (pl) pl->NewSendCommand(mbscmd);
1356 
1357  fState = ioWaitReply;
1358 
1359  if (++fSendCmdId > 0x7fff0000) fSendCmdId = 1;
1360 
1361  memset(&fSendBuf, 0, sizeof(fSendBuf));
1362  fSendBuf.l_order = 1;
1363  fSendBuf.l_cmdid = fSendCmdId;
1364  fSendBuf.l_status = 0;
1365  strncpy(fSendBuf.c_cmd, mbscmd.c_str(), sizeof(fSendBuf.c_cmd)-1);
1366  addon->StartSend(&fSendBuf, sizeof(fSendBuf));
1367 }
1368 
1369 // ===============================================================================================
1370 
1371 mbs::PrompterWorker::PrompterWorker(const dabc::Reference& parent, const std::string &name,
1372  const std::string &mbsnode, int port) :
1373  dabc::Worker(parent, name),
1374  fMbsNode(mbsnode),
1375  fPort(port),
1376  fPrefix(),
1377  fCmds(dabc::CommandsQueue::kindPostponed),
1378  fState(ioInit)
1379 {
1380  fPrefix = dabc::SocketThread::DefineHostName() + ":";
1381  printf("Create prompter client with prefix %s\n", fPrefix.c_str());
1382 }
1383 
1385 {
1386  DOUT3("Destroy PrompterWorker");
1387 }
1388 
// Body of mbs::PrompterWorker::OnThreadAssigned(): tries to open the prompter connection.
// NOTE(review): the signature line (1389) and the statement at line 1391 are absent
// from this listing - restore them from the original source.
1390 {
1392 
     // result is ignored; CreateAddon() activates a retry timeout itself on failure
1393  CreateAddon();
1394 }
1395 
1397 {
1398  if (!fAddon.null()) return true;
1399 
1400  int fd = dabc::SocketThread::StartClient(fMbsNode, fPort);
1401  if (fd<=0) {
1402  EOUT("Fail open command port %d on node %s", fPort, fMbsNode.c_str());
1403  ActivateTimeout(5);
1404  return false;
1405  }
1406 
1407  dabc::SocketIOAddon* addon = new dabc::SocketIOAddon(fd);
1408  addon->SetDeliverEventsToWorker(true);
1409 
1410  DOUT2("ADDON:%p Created cmd socket %d to mbs %s:%d", addon, fd, fMbsNode.c_str(), fPort);
1411 
1412  AssignAddon(addon);
1413 
1414  // in any case receive next buffer
1415  memset(fRecvBuf, 0, sizeof(fRecvBuf));
1416  addon->StartRecv(fRecvBuf, sizeof(fRecvBuf));
1417 
1418  return true;
1419 }
1420 
1421 
1423 {
1424  switch (evnt.GetCode()) {
1426  // this is just confirmation that data was send - do nothing
1427  break;
1428  }
1430  //DOUT0("mbs::PrompterWorker get evntSocketRecvInfo");
1431 
1432  if ((fCmds.Size()>0) && (fState == ioWaitReply)) {
1433  // TODO: when reply command - check result
1434 
1435  bool res = true;
1436 
1437  if (fRecvBuf[0]!=1)
1438  mbs::SwapData(fRecvBuf, sizeof(fRecvBuf));
1439 
1440  DOUT3("mbs::PrompterWorker get reply for the command id %u", (unsigned) fRecvBuf[1]);
1441 
1442  if (fRecvBuf[0]!=1) {
1443  EOUT("Wrong reply from the prompter");
1444  res = false;
1445  } else
1446  if (fRecvBuf[1]!=0) {
1447  res = false;
1448  }
1449 
1450  mbs::Monitor* pl = dynamic_cast<mbs::Monitor*> (GetParent());
1451  if (pl) pl->NewSendCommand(fCmds.Front().GetStr("cmd"), res ? 1 : 0);
1452 
1453  fCmds.Pop().ReplyBool(res);
1454  fState = ioInit;
1455  }
1456 
1457  dabc::SocketIOAddon* addon = dynamic_cast<dabc::SocketIOAddon*> (fAddon());
1458 
1459  if (addon==0) {
1460  EOUT("ADDON disappear !!!");
1461  ActivateTimeout(3.);
1462  } else {
1463  memset(fRecvBuf, 0, sizeof(fRecvBuf));
1464  addon->StartRecv(fRecvBuf, sizeof(fRecvBuf));
1465  }
1466 
1467  break;
1468  }
1471  // error, we cancel command execution and issue timeout to try again
1472  AssignAddon(nullptr);
1473  if ((fState==ioWaitReply) && (fCmds.Size()>0)) {
1474  fCmds.Pop().Reply(dabc::cmd_false);
1475  fState = ioInit;
1476  }
1477 
1478  ActivateTimeout(1.);
1479  break;
1480  default:
1482  }
1483 }
1484 
1485 double mbs::PrompterWorker::ProcessTimeout(double last_diff)
1486 {
1487  if (CreateAddon()) ProcessNextMbsCommand();
1488 
1489  return -1;
1490 }
1491 
1493 {
1494  if (cmd.IsName(dabc::CmdHierarchyExec::CmdName())) {
1495  fCmds.Push(cmd);
1496  ProcessNextMbsCommand();
1497  return dabc::cmd_postponed;
1498  }
1499 
1500  return dabc::Worker::ExecuteCommand(cmd);
1501 }
1502 
1504 {
1505  // start next command when previous is completed
1506  if (fState != ioInit) return;
1507 
1508  if (fCmds.Size()==0) return;
1509 
1510  dabc::SocketIOAddon* addon = dynamic_cast<dabc::SocketIOAddon*> (fAddon());
1511 
1512  if ((addon==0) || !addon->IsSocket()) {
1513  EOUT("Something went wrong");
1514  exit(5);
1515  }
1516 
1517  std::string mbscmd = fCmds.Front().GetStr("cmd");
1518  if (mbscmd.length() >= sizeof(fSendBuf) - fPrefix.length()) {
1519  EOUT("Send command too long %u", mbscmd.length());
1520  fCmds.Pop().ReplyBool(false);
1521  ProcessNextMbsCommand();
1522  return;
1523  }
1524 
1525  DOUT2("Send MBS-CMD: %s", mbscmd.c_str());
1526 
1527  mbs::Monitor* pl = dynamic_cast<mbs::Monitor*> (GetParent());
1528  if (pl) pl->NewSendCommand(mbscmd);
1529 
1530  fState = ioWaitReply;
1531 
1532  strcpy(fSendBuf, fPrefix.c_str());
1533  strcat(fSendBuf, mbscmd.c_str());
1534  addon->StartSend(fSendBuf, sizeof(fSendBuf));
1535 }
Command definition class.
Definition: Parameter.h:333
CommandDefinition & AddArg(const std::string &name, const std::string &kind="string", bool required=true, const RecordField &dflt=RecordField())
Definition: Parameter.cxx:593
Represents command with its arguments.
Definition: Command.h:99
double GetDouble(const std::string &name, double dflt=0.) const
Definition: Command.h:145
std::string GetStr(const std::string &name, const std::string &dflt="") const
Definition: Command.h:136
int GetInt(const std::string &name, int dflt=0) const
Definition: Command.h:139
Represents objects hierarchy of remote (or local) DABC process.
Definition: Hierarchy.h:285
void MarkChangedItems(uint64_t tm=0)
If any field was modified, item will be marked with new version.
Definition: Hierarchy.h:330
Hierarchy CreateHChild(const std::string &name, bool allowslahes=false, bool sortorder=false)
Create child item in hierarchy with specified name. If allowslahes is enabled, instead of subfolders item...
Definition: Hierarchy.h:392
Hierarchy GetHChild(const std::string &name, bool allowslahes=false, bool force=false, bool sortorder=false)
Return child; if necessary, create it with its full subfolder path. If force is specified, missing children and folders...
Definition: Hierarchy.cxx:944
void EnableHistory(unsigned length=100, bool withchilds=false)
Activate history production for selected element and its childs.
Definition: Hierarchy.cxx:837
virtual void OnThreadAssigned()
Definition: Module.cxx:79
virtual int ExecuteCommand(Command cmd)
Main method where commands are executed.
Definition: Module.h:232
bool SetField(const std::string &name, const RecordField &v)
Definition: Record.h:516
bool SetFieldModified(const std::string &name, bool on=true)
Definition: Record.h:519
Reference on the arbitrary object
Definition: Reference.h:73
bool IsName(const char *name) const
Returns true if object name is the same as specified one.
Definition: Reference.cxx:177
bool null() const
Returns true if reference contains nullptr.
Definition: Reference.h:151
void Destroy()
Releases the reference and starts destruction of the referenced object.
Definition: Reference.cxx:148
@ evntSocketSendInfo
event delivered to worker when write is completed
Definition: SocketThread.h:96
@ evntSocketRecvInfo
event delivered to worker when read is completed
Definition: SocketThread.h:95
@ evntSocketCloseInfo
event delivered to worker when socket is closed
Definition: SocketThread.h:98
@ evntSocketErrorInfo
event delivered to worker when error is detected
Definition: SocketThread.h:97
void SetDeliverEventsToWorker(bool on=true)
Definition: SocketThread.h:121
bool IsSocket() const
Definition: SocketThread.h:109
Socket addon for handling I/O events.
Definition: SocketThread.h:144
bool StartSend(const void *buf, unsigned size, const void *buf2=0, unsigned size2=0, const void *buf3=0, unsigned size3=0)
bool StartRecv(void *buf, size_t size)
static std::string DefineHostName(bool force=true)
Return current host name.
static int StartClient(const std::string &host, int nport, bool nonblocking=true)
virtual void OnThreadAssigned()
Definition: Worker.h:73
bool ActivateTimeout(double tmout_sec)
Definition: Worker.cxx:62
Reference on dabc::Worker
Definition: Worker.h:466
bool Submit(Command cmd)
Definition: Worker.cxx:1139
Active object, which is working inside dabc::Thread.
Definition: Worker.h:116
virtual void OnThreadAssigned()
Definition: Worker.h:392
virtual void ProcessEvent(const EventId &)
Definition: Worker.cxx:499
bool Submit(Command cmd)
Submit command for execution in the processor.
Definition: Worker.cxx:960
bool AssignToThread(ThreadRef thrd, bool sync=true)
Assign worker to thread, worker becomes active immediately.
Definition: Worker.cxx:326
virtual int ExecuteCommand(Command cmd)
Main method where commands are executed.
Definition: Worker.cxx:851
DaqLogWorker(const dabc::Reference &parent, const std::string &name, const std::string &mbsnode, int port)
Definition: Monitor.cxx:1101
virtual void ProcessEvent(const dabc::EventId &)
Definition: Monitor.cxx:1152
virtual double ProcessTimeout(double last_diff)
Definition: Monitor.cxx:1144
virtual ~DaqLogWorker()
Definition: Monitor.cxx:1109
virtual void OnThreadAssigned()
Definition: Monitor.cxx:1135
void ProcessNextMbsCommand()
Definition: Monitor.cxx:1330
virtual void OnThreadAssigned()
Definition: Monitor.cxx:1214
virtual ~DaqRemCmdWorker()
Definition: Monitor.cxx:1209
virtual double ProcessTimeout(double last_diff)
Definition: Monitor.cxx:1312
virtual void ProcessEvent(const dabc::EventId &)
Definition: Monitor.cxx:1248
DaqRemCmdWorker(const dabc::Reference &parent, const std::string &name, const std::string &mbsnode, int port)
Definition: Monitor.cxx:1198
virtual int ExecuteCommand(dabc::Command cmd)
Main method where commands are executed.
Definition: Monitor.cxx:1319
Addon class to retrieve status record from MBS server
Definition: Monitor.h:37
IOState fState
Definition: Monitor.h:54
virtual void OnThreadAssigned()
Definition: Monitor.cxx:63
virtual void OnRecvCompleted()
Method called when receive operation is completed.
Definition: Monitor.cxx:96
uint32_t fSendCmd
Definition: Monitor.h:57
virtual double ProcessTimeout(double last_diff)
Definition: Monitor.cxx:88
DaqStatusAddon(int fd)
Definition: Monitor.cxx:52
mbs::DaqStatus fStatus
Definition: Monitor.h:55
mbs::DaqStatus & GetStatus()
Definition: Monitor.h:69
virtual unsigned WriteRecRawData(void *ptr, unsigned maxsize)
virtual void ProcessTimerEvent(unsigned timer)
Method called by framework when timer event is produced.
Interface module for MBS monitoring and control.
Definition: Monitor.h:224
virtual void OnThreadAssigned()
Definition: Monitor.cxx:280
void SetRateInterval(double t)
set sampling interval for rates calculation
Definition: Monitor.cxx:977
void NewMessage(const std::string &msg)
Called by LogWorker to inform about new message.
Definition: Monitor.cxx:870
void FillStatistic(const std::string &options, const std::string &itemname, mbs::DaqStatus *old_daqst, mbs::DaqStatus *new_daqst, double difftime)
Definition: Monitor.cxx:316
virtual int ExecuteCommand(dabc::Command cmd)
Main method where commands are executed.
Definition: Monitor.cxx:1032
void UpdateSetupState(int isloaded)
update mbs setup loaded state
Definition: Monitor.cxx:960
void NewSendCommand(const std::string &cmd, int res=-1)
Called by CmdWorker to inform about a new command being sent (or a reply being received)
Definition: Monitor.cxx:884
void UpdateMbsState(int isrunning)
update mbs acq running state
Definition: Monitor.cxx:947
Monitor(const std::string &name, dabc::Command cmd=nullptr)
Definition: Monitor.cxx:132
void UpdateFileState(int isopen)
update file on/off state
Definition: Monitor.cxx:934
void SetHistoryDepth(int entries)
set depth of variable history buffer
Definition: Monitor.cxx:993
void CreateCommandWorker()
Definition: Monitor.cxx:294
void LoggerAddonCreated()
Called by LogWorker to inform that connection is established.
Definition: Monitor.cxx:863
void NewStatus(mbs::DaqStatus &stat)
Called by DaqStatusAddon to inform about new daq status.
Definition: Monitor.cxx:892
virtual void ProcessTimerEvent(unsigned timer)
Method called by framework when timer event is produced.
Definition: Monitor.cxx:826
virtual unsigned WriteRecRawData(void *ptr, unsigned maxsize)
Definition: Monitor.cxx:1092
virtual ~Monitor()
Definition: Monitor.cxx:276
virtual void ProcessEvent(const dabc::EventId &)
Definition: Monitor.cxx:1422
void ProcessNextMbsCommand()
Definition: Monitor.cxx:1503
virtual double ProcessTimeout(double last_diff)
Definition: Monitor.cxx:1485
PrompterWorker(const dabc::Reference &parent, const std::string &name, const std::string &mbsnode, int port)
Definition: Monitor.cxx:1371
virtual ~PrompterWorker()
Definition: Monitor.cxx:1384
virtual void OnThreadAssigned()
Definition: Monitor.cxx:1389
virtual int ExecuteCommand(dabc::Command cmd)
Main method where commands are executed.
Definition: Monitor.cxx:1492
#define DOUT2(args ...)
Definition: logging.h:170
#define DOUT0(args ...)
Definition: logging.h:156
#define DOUT3(args ...)
Definition: logging.h:176
#define EOUT(args ...)
Definition: logging.h:150
#define DBOOL(arg)
Definition: logging.h:191
#define DOUT4(args ...)
Definition: logging.h:182
XMLNodePointer_t GetParent(XMLNodePointer_t xmlnode)
Definition: XmlEngine.cxx:917
Event manipulation API.
Definition: api.h:23
TimeStamp Now()
Definition: timing.h:260
std::string format(const char *fmt,...)
Definition: string.cxx:49
const char * prop_kind
Definition: Hierarchy.cxx:29
const char * prop_auth
Definition: Hierarchy.cxx:34
@ cmd_postponed
Definition: Command.h:42
@ cmd_false
Definition: Command.h:37
@ cmd_true
Definition: Command.h:38
Support for MBS - standard GSI DAQ.
Definition: api.h:36
void SwapData(void *data, unsigned bytessize)
#define SYS__util
Definition: Monitor.cxx:31
#define SYS__event_serv
Definition: Monitor.cxx:28
#define SYS__transport
Definition: Monitor.cxx:27
#define SYS__stream_serv
Definition: Monitor.cxx:35
Event structure, exchanged between DABC threads.
Definition: Thread.h:70
uint16_t GetCode() const
Definition: Thread.h:92
Class for acquiring and holding timestamps.
Definition: timing.h:40
void GetNow()
Method to acquire current time stamp.
Definition: timing.h:137
double AsDouble() const
Return time stamp in form of double (in seconds)
Definition: timing.h:120
uint32_t bl_n_kbyte_file
Definition: MbsTypeDefs.h:243
uint32_t bl_no_stream_buf
Definition: MbsTypeDefs.h:321
char c_file_name[256]
Definition: MbsTypeDefs.h:338
uint32_t l_file_size
Definition: MbsTypeDefs.h:261
uint32_t bl_n_evserv_events
Definition: MbsTypeDefs.h:238
uint32_t l_file_cur
Definition: MbsTypeDefs.h:260
uint32_t bh_acqui_running
Definition: MbsTypeDefs.h:227
uint32_t bl_n_events
Definition: MbsTypeDefs.h:234
uint32_t bh_setup_loaded
Definition: MbsTypeDefs.h:229
uint32_t bl_n_buffers
Definition: MbsTypeDefs.h:235
uint32_t bh_running[SYS__N_MAX_PROCS]
Definition: MbsTypeDefs.h:279
uint32_t bl_n_bufstream
Definition: MbsTypeDefs.h:236
uint32_t l_block_size
Definition: MbsTypeDefs.h:263
int32_t l_open_file
Definition: MbsTypeDefs.h:266
uint32_t bl_n_strserv_bufs
Definition: MbsTypeDefs.h:240
uint32_t bl_n_strserv_kbytes
Definition: MbsTypeDefs.h:241
uint32_t bl_n_kbyte
Definition: MbsTypeDefs.h:237