// Numeric identifiers of the MBS system processes, one SYS__<process> macro per process.
// NOTE(review): presumably these are indices into per-process status arrays such as
// bh_running[SYS__N_MAX_PROCS] - confirm against the status structure declaration.
// The values visible here are not contiguous: 6 and 14..19 do not appear in this listing.
25 #define SYS__read_meb 0
26 #define SYS__collector 1
27 #define SYS__transport 2
28 #define SYS__event_serv 3
29 #define SYS__msg_log 4
30 #define SYS__dispatch 5
32 #define SYS__sbs_mon 7
33 #define SYS__read_cam_slav 8
34 #define SYS__esone_serv 9
35 #define SYS__stream_serv 10
36 #define SYS__histogram 11
37 #define SYS__prompt 12
38 #define SYS__daq_rate 13
45 #define SYS__vme_serv 20
// Version constants, all currently set to 1.
// NOTE(review): judging by the names only, these look like layout versions of the
// setup / SET_ML / SET_MO structures - verify where they are compared.
47 #define VERSION__SETUP 1
48 #define VERSION__SET_ML 1
49 #define VERSION__SET_MO 1
53 dabc::SocketIOAddon(fd),
59 SetDeleteWorkerOnClose(
false);
72 EOUT(
"Get thread assigned in not-init state - check why");
90 EOUT(
"Status timeout - delete addon");
92 EOUT(
"Status timeout - delete addon called");
98 if (fState == ioRecvHeader) {
100 if(fStatus.l_endian != 1) fSwapping =
true;
105 if ((fStatus.l_version != 51) && (fStatus.l_version != 62) && (fStatus.l_version != 63)) {
106 EOUT(
"Unsupported status version %u", (
unsigned) fStatus.l_version);
111 StartRecv(&fStatus.bh_daqst_initalized, (fStatus.l_fix_lw-7)*4);
115 if (fState == ioRecvData) {
118 mbs::SwapData(&fStatus.bh_daqst_initalized, (fStatus.l_fix_lw-7)*4 - (19 * fStatus.l_sbs__str_len_64));
126 EOUT(
"Wrong state when recv data");
133 mbs::MonitorSlowControl(name,
"Mbs", cmd),
143 fWaitingLogger(true),
148 fMbsNode = Cfg(
"node", cmd).AsStr();
149 fAlias = Cfg(
"alias", cmd).AsStr(fMbsNode);
150 fPeriod = Cfg(
"period", cmd).AsDouble(1.);
151 fRateInterval = Cfg(
"rateinterval", cmd).AsDouble(1.);
152 fHistory = Cfg(
"history", cmd).AsInt(200);
153 bool publish = Cfg(
"publish", cmd).AsBool(
true);
154 fPrintf = Cfg(
"printf", cmd).AsBool(
false);
156 fFileStateName =
"MbsFileOpen";
157 fAcqStateName =
"MbsAcqRunning";
158 fSetupStateName =
"MbsSetupLoaded";
159 fRateIntervalName =
"MbsRateInterval";
160 fHistoryName =
"MbsHistoryDepth";
162 if (Cfg(
"stat", cmd).AsStr() ==
"true")
165 fStatPort = Cfg(
"stat", cmd).AsInt(0);
167 if (Cfg(
"logger", cmd).AsStr() ==
"true")
170 fLoggerPort = Cfg(
"logger", cmd).AsInt(0);
172 if (Cfg(
"cmd", cmd).AsStr() ==
"true")
175 fCmdPort = Cfg(
"cmd", cmd).AsInt(0);
177 fHierarchy.Create(
"MBS");
236 SetRateInterval(fRateInterval);
237 SetHistoryDepth(fHistory);
243 cmddef.
AddArg(
"cmd",
"string",
true,
"show rate");
248 cmddef_rate.
AddArg(
"time",
"double",
true,
"1.0");
253 cmddef_hist.
AddArg(
"entries",
"int",
true,
"100");
258 ui.
SetField(
"_UserFilePath",
"${DABCSYS}/plugins/mbs/htm/");
259 ui.
SetField(
"_UserFileMain",
"main.htm");
261 CreateTimer(
"update", 5.);
262 CreateTimer(
"MbsUpdate", fPeriod);
270 Publish(fHierarchy, std::string(
"/MBS/") + fAlias);
272 if (fLoggerPort <= 0) NewMessage(
"!!! logger not activated !!!");
282 if (fLoggerPort > 0) {
283 DaqLogWorker* logger =
new DaqLogWorker(
this,
"DaqLogger", fMbsNode, fLoggerPort);
284 logger->AssignToThread(thread());
288 CreateCommandWorker();
296 fWaitingLogger =
false;
299 if (!wrk.
null())
return;
301 DOUT0(
"Create command worker");
305 remcmd =
new PrompterWorker(
this,
"DaqCmd", fMbsNode, fCmdPort);
306 }
else if (fCmdPort > 0) {
307 remcmd =
new DaqRemCmdWorker(
this,
"DaqCmd", fMbsNode, fCmdPort);
311 if (!fWaitingCmd.null())
312 remcmd->
Submit(fWaitingCmd);
318 int bStreams_n = 0, bBuffers_n = 0, bEvents_n = 0, bData_n = 0;
319 int bStreams_r = 0, bBuffers_r = 0, bEvents_r = 0, bData_r = 0;
320 int bSrvStreams_n = 0, bSrvEvents_n = 0, bSrvBuffers_n = 0, bSrvData_n = 0;
321 int bSrvStreams_r = 0, bSrvEvents_r = 0, bSrvBuffers_r = 0, bSrvData_r = 0;
322 int bFilename = 0, bFileFilled = 0, bFileData = 0, bFileData_r = 0, bFileIndex = 0;
323 int bStreamsFree = 0, bStreamsFilled = 0, bStreamsSrv = 0;
325 int bl_file = 0, bl_server = 0, bl_mbs = 0, bl_streams = 0;
327 int l_free_stream(0), l_trans_stream(0), l_serv_stream(0);
335 strncpy(c_line, options.c_str(),
sizeof(c_line)-1);
337 if (strstr (c_line,
"-nst") != NULL )
339 if (strstr (c_line,
"-rst") != NULL )
341 if (strstr (c_line,
"-nsst") != NULL )
343 if (strstr (c_line,
"-rsst") != NULL )
345 if (strstr (c_line,
"-est") != NULL )
347 if (strstr (c_line,
"-fst") != NULL )
349 if (strstr (c_line,
"-kst") != NULL )
351 if (strstr (c_line,
"-nbu") != NULL )
353 if (strstr (c_line,
"-rbu") != NULL )
355 if (strstr (c_line,
"-nsbu") != NULL )
357 if (strstr (c_line,
"-rsbu") != NULL )
359 if (strstr (c_line,
"-nev") != NULL )
361 if (strstr (c_line,
"-rev") != NULL )
363 if (strstr (c_line,
"-nsev") != NULL )
365 if (strstr (c_line,
"-rsev") != NULL )
367 if (strstr (c_line,
"-nda") != NULL )
369 if (strstr (c_line,
"-rda") != NULL )
371 if (strstr (c_line,
"-nsda") != NULL )
373 if (strstr (c_line,
"-rsda") != NULL )
375 if (strstr (c_line,
"-sfi") != NULL )
377 if (strstr (c_line,
"-ffi") != NULL )
379 if (strstr (c_line,
"-rfi") != NULL )
381 if (strstr (c_line,
"-nfi") != NULL )
383 if (strstr (c_line,
"-ifi") != NULL )
385 if (strstr (c_line,
"-mbs") != NULL )
400 if (strstr (c_line,
"-u") != NULL )
414 if (strstr (c_line,
"-fi") != NULL )
422 if (strstr (c_line,
"-st") != NULL )
430 if (strstr (c_line,
"-se") != NULL )
448 if (strstr (c_line,
"-ra") != NULL )
466 bl_mbs = bData_n + bEvents_n + bBuffers_n + bStreams_n + bData_r + bEvents_r + bBuffers_r + bStreams_r;
467 bl_server = bSrvData_n + bSrvEvents_n + bSrvBuffers_n + bSrvStreams_n + bSrvData_r + bSrvEvents_r + bSrvBuffers_r
469 bl_streams = bStreamsFree + bStreamsFilled + bStreamsSrv;
470 bl_file = bFileData + bFileFilled + bFileData_r + bFilename + bFileIndex;
472 memset(c_head0, 0,
sizeof(c_head0));
473 memset(c_head, 0,
sizeof(c_head));
476 strncat (c_head0,
" Event building",
sizeof(c_head0)-1);
479 strncat (c_head,
" MB ",
sizeof(c_head)-1);
480 strncat (c_head0,
" ",
sizeof(c_head0)-1);
484 strncat (c_head,
" Events ",
sizeof(c_head)-1);
485 strncat (c_head0,
" ",
sizeof(c_head0)-1);
489 strncat (c_head,
" Buffers ",
sizeof(c_head)-1);
490 strncat (c_head0,
" ",
sizeof(c_head0)-1);
494 strncat (c_head,
"Streams ",
sizeof(c_head)-1);
495 strncat (c_head0,
" ",
sizeof(c_head0)-1);
499 strncat (c_head,
" Kb/sec ",
sizeof(c_head)-1);
500 strncat (c_head0,
" ",
sizeof(c_head0)-1);
504 strncat (c_head,
" Ev/sec ",
sizeof(c_head)-1);
505 strncat (c_head0,
" ",
sizeof(c_head0)-1);
509 strncat (c_head,
"Buf/sec ",
sizeof(c_head)-1);
510 strncat (c_head0,
" ",
sizeof(c_head0)-1);
514 strncat (c_head,
"Str/sec ",
sizeof(c_head)-1);
515 strncat (c_head0,
" ",
sizeof(c_head0)-1);
517 c_head0[strlen (c_head0) - 15] = 0;
519 bl_server = bSrvData_n + bSrvEvents_n + bSrvBuffers_n + bSrvStreams_n + bSrvData_r + bSrvEvents_r + bSrvBuffers_r
523 strncat (c_head0,
"| Server ",
sizeof(c_head0)-1);
524 strncat (c_head,
"|",
sizeof(c_head)-1);
527 strncat (c_head,
" MB ",
sizeof(c_head)-1);
528 strncat (c_head0,
" ",
sizeof(c_head0)-1);
532 strncat (c_head,
" Events ",
sizeof(c_head)-1);
533 strncat (c_head0,
" ",
sizeof(c_head0)-1);
537 strncat (c_head,
" Buffers ",
sizeof(c_head)-1);
538 strncat (c_head0,
" ",
sizeof(c_head0)-1);
542 strncat (c_head,
"Streams ",
sizeof(c_head)-1);
543 strncat (c_head0,
" ",
sizeof(c_head0)-1);
547 strncat (c_head,
" Kb/sec ",
sizeof(c_head)-1);
548 strncat (c_head0,
" ",
sizeof(c_head0)-1);
552 strncat (c_head,
" Ev/sec ",
sizeof(c_head)-1);
553 strncat (c_head0,
" ",
sizeof(c_head0)-1);
557 strncat (c_head,
"Buf/sec ",
sizeof(c_head)-1);
558 strncat (c_head0,
" ",
sizeof(c_head0)-1);
562 strncat (c_head,
"Str/sec ",
sizeof(c_head)-1);
563 strncat (c_head0,
" ",
sizeof(c_head0)-1);
565 c_head0[strlen (c_head0) - 8] = 0;
567 bl_streams = bStreamsFree + bStreamsFilled + bStreamsSrv;
570 strncat (c_head0,
"| Streams ",
sizeof(c_head0)-1);
571 strncat (c_head,
"|",
sizeof(c_head)-1);
574 strncat (c_head,
"Empty ",
sizeof(c_head)-1);
575 strncat (c_head0,
" ",
sizeof(c_head0)-1);
579 strncat (c_head,
"Full ",
sizeof(c_head)-1);
580 strncat (c_head0,
" ",
sizeof(c_head0)-1);
584 strncat (c_head,
"Hold ",
sizeof(c_head)-1);
585 strncat (c_head0,
" ",
sizeof(c_head0)-1);
587 c_head0[strlen (c_head0) - 9] = 0;
589 bl_file = bFileData + bFileFilled + bFileData_r + bFilename + bFileIndex;
592 strncat (c_head0,
"| File output ",
sizeof(c_head0)-1);
594 strncat (c_head0,
" ",
sizeof(c_head0)-1);
595 strncat (c_head,
"|",
sizeof(c_head)-1);
597 strncat (c_head,
" MB ",
sizeof(c_head)-1);
599 strncat (c_head,
"Filled ",
sizeof(c_head)-1);
601 strncat (c_head,
" Kb/sec ",
sizeof(c_head)-1);
603 strncat (c_head,
"Index ",
sizeof(c_head)-1);
605 strncat (c_head,
"Filename",
sizeof(c_head)-1);
618 double r_new_kb = new_daqst->
bl_n_kbyte * 1.024;
631 double r_old_kb = old_daqst->
bl_n_kbyte * 1.024;
638 double r_rate_buf = (r_new_buf - r_old_buf) / diff_time;
639 double r_rate_kb = (r_new_kb - r_old_kb) / diff_time;
640 double r_rate_evt = (r_new_evt - r_old_evt) / diff_time;
641 double r_rate_stream = (r_new_stream - r_old_stream) / diff_time;
642 double r_rate_evsrv_evt = (r_new_evsrv_evt - r_old_evsrv_evt) / diff_time;
645 double r_rate_file_kb = (r_new_file_kb - r_old_file_kb) / diff_time;
646 double r_rate_strsrv_str = (r_new_strsrv_str - r_old_strsrv_str) / diff_time / bl_n_ev_buf;
647 double r_rate_strsrv_buf = (r_new_strsrv_buf - r_old_strsrv_buf) / diff_time;
648 double r_rate_strsrv_kb = (r_new_strsrv_kb - r_old_strsrv_kb) / diff_time;
650 memset(c_out, 0,
sizeof(c_out));
651 memset(c_line, 0,
sizeof(c_line));
656 snprintf (c_line,
sizeof(c_line),
"%10.0f ", r_new_buf / 1000000. * bl_ev_buf_len);
657 strncat (c_out, c_line,
sizeof(c_out)-1);
661 snprintf (c_line,
sizeof(c_line),
"%10u ", (
unsigned) new_daqst->
bl_n_events);
662 strncat (c_out, c_line,
sizeof(c_out)-1);
666 snprintf (c_line,
sizeof(c_line),
"%10u ", (
unsigned) new_daqst->
bl_n_buffers);
667 strncat (c_out, c_line,
sizeof(c_out)-1);
671 snprintf (c_line,
sizeof(c_line),
"%7u ", (
unsigned) new_daqst->
bl_n_bufstream);
672 strncat (c_out, c_line,
sizeof(c_out)-1);
676 snprintf (c_line,
sizeof(c_line),
"%7.1f ", r_rate_kb);
677 strncat (c_out, c_line,
sizeof(c_out)-1);
681 snprintf (c_line,
sizeof(c_line),
"%7.0f ", r_rate_evt);
682 strncat (c_out, c_line,
sizeof(c_out)-1);
686 snprintf (c_line,
sizeof(c_line),
"%7.0f ", r_rate_buf);
687 strncat (c_out, c_line,
sizeof(c_out)-1);
691 snprintf (c_line,
sizeof(c_line),
"%7.0f ", r_rate_stream);
692 strncat (c_out, c_line,
sizeof(c_out)-1);
697 strncat (c_out,
"|",
sizeof(c_out)-1);
700 snprintf (c_line,
sizeof(c_line),
"%10.0f ", r_new_strsrv_kb / 1000.);
701 strncat (c_out, c_line,
sizeof(c_out)-1);
706 strncat (c_out, c_line,
sizeof(c_out)-1);
710 snprintf (c_line,
sizeof(c_line),
"%10u ", (
unsigned) new_daqst->
bl_n_strserv_bufs);
711 strncat (c_out, c_line,
sizeof(c_out)-1);
715 snprintf (c_line,
sizeof(c_line),
"%7u ", (
unsigned) new_daqst->
bl_n_strserv_bufs / bl_n_ev_buf);
716 strncat (c_out, c_line,
sizeof(c_out)-1);
720 snprintf (c_line,
sizeof(c_line),
"%7.1f ", r_rate_strsrv_kb);
721 strncat (c_out, c_line,
sizeof(c_out)-1);
725 snprintf (c_line,
sizeof(c_line),
"%7.0f ", r_rate_evsrv_evt);
726 strncat (c_out, c_line,
sizeof(c_out)-1);
730 snprintf (c_line,
sizeof(c_line),
"%7.0f ", r_rate_strsrv_buf);
731 strncat (c_out, c_line,
sizeof(c_out)-1);
735 snprintf (c_line,
sizeof(c_line),
"%7.0f ", r_rate_strsrv_str);
736 strncat (c_out, c_line,
sizeof(c_out)-1);
741 strncat (c_out,
"|",
sizeof(c_out)-1);
744 snprintf (c_line,
sizeof(c_line),
"%5d ", l_free_stream);
745 strncat (c_out, c_line,
sizeof(c_out)-1);
749 snprintf (c_line,
sizeof(c_line),
"%4d ", l_trans_stream);
750 strncat (c_out, c_line,
sizeof(c_out)-1);
754 snprintf (c_line,
sizeof(c_line),
"%4d ", l_serv_stream);
755 strncat (c_out, c_line,
sizeof(c_out)-1);
760 strncat (c_out,
"|",
sizeof(c_out)-1);
763 snprintf (c_line,
sizeof(c_line),
"%8.0f ", r_new_file_kb / 1000.);
764 strncat (c_out, c_line,
sizeof(c_out)-1);
768 snprintf (c_line,
sizeof(c_line),
"%5.1f %% ", new_daqst->
l_file_size > 0 ? r_new_file_kb / new_daqst->
l_file_size * 0.1 : 0.);
769 strncat (c_out, c_line,
sizeof(c_out)-1);
773 snprintf (c_line,
sizeof(c_line),
"%8.1f ", r_rate_file_kb);
774 strncat (c_out, c_line,
sizeof(c_out)-1);
778 snprintf (c_line,
sizeof(c_line),
" %04u ", (
unsigned) new_daqst->
l_file_cur);
779 strncat (c_out, c_line,
sizeof(c_out)-1);
783 snprintf (c_line,
sizeof(c_line),
"%s", new_daqst->
c_file_name);
784 strncat (c_out, c_line,
sizeof(c_out)-1);
787 strncat (c_out,
" op",
sizeof(c_out)-1);
789 strncat (c_out,
" cl",
sizeof(c_out)-1);
794 if (fCounter % 20 == 0) {
805 fHierarchy.GetHChild(
"DataRate").SetField(
"value",
dabc::format(
"%3.1f", r_rate_kb));
806 fHierarchy.GetHChild(
"DataRate").SetFieldModified(
"value");
807 fHierarchy.GetHChild(
"EventRate").SetField(
"value",
dabc::format(
"%3.1f", r_rate_evt));
808 fHierarchy.GetHChild(
"EventRate").SetFieldModified(
"value");
809 fHierarchy.GetHChild(
"ServerRate").SetField(
"value",
dabc::format(
"%3.1f", r_rate_strsrv_kb));
810 fHierarchy.GetHChild(
"ServerRate").SetFieldModified(
"value");
811 fHierarchy.GetHChild(
"FileRate").SetField(
"value",
dabc::format(
"%3.1f", r_rate_file_kb));
812 fHierarchy.GetHChild(
"FileRate").SetFieldModified(
"value");
816 std::string prefix = std::string(
"MBS.") + fMbsNode + std::string(
".");
817 fRec.AddDouble(prefix +
"DataRate", r_rate_kb,
true);
818 fRec.AddDouble(prefix +
"EventRate", r_rate_evt,
true);
819 fRec.AddDouble(prefix +
"ServerRate", r_rate_strsrv_kb,
true);
820 fRec.AddDouble(prefix +
"FileRate", r_rate_file_kb,
true);
823 fHierarchy.MarkChangedItems();
828 if (TimerName(timer) !=
"MbsUpdate") {
833 if (fMbsNode.empty()) {
836 double v1 = 100. * (1.3 + sin(
dabc::Now().AsDouble()/5.));
837 fHierarchy.GetHChild(
"DataRate").SetField(
"value",
dabc::format(
"%4.2f", v1));
839 v1 = 100. * (1.3 + cos(
dabc::Now().AsDouble()/8.));
840 fHierarchy.GetHChild(
"EventRate").SetField(
"value",
dabc::format(
"%4.2f", v1));
842 fHierarchy.GetHChild(
"rate_log").SetField(
"value",
dabc::format(
"| Header | Entry | Rate |"));
843 fHierarchy.GetHChild(
"rate_log").SetField(
"value",
dabc::format(
"| | %5d | %6.2f |", fCounter,v1));
845 fHierarchy.MarkChangedItems();
853 if (!fAddon.null() || (fStatPort<=0))
return;
857 EOUT(
"FAIL status port %d for node %s", fStatPort, fMbsNode.c_str());
859 AssignAddon(
new DaqStatusAddon(fd));
867 CreateCommandWorker();
877 fHierarchy.MarkChangedItems();
880 if (fPrintf) printf(
"%s\n", msg.c_str());
886 if (!fPrintf)
return;
887 if (res>=0) printf(
"replcmd>%s res=%s\n", cmd.c_str(),
DBOOL(res));
888 else printf(
"sendcmd>%s\n", cmd.c_str());
897 double tmdiff = stamp.
AsDouble() - fStatStamp.AsDouble();
899 EOUT(
"Wrong time calculation");
902 double deltaT=fabs((tmdiff-fRateInterval)/tmdiff);
903 DOUT3(
"NEW STATUS with rate interval:%f , dt=%f\n, delta=%f", fRateInterval, tmdiff, deltaT);
904 if ((tmdiff>0) && ((deltaT<1/fRateInterval) || (ceil(tmdiff) > 1 + fRateInterval) ) )
907 if (!fStatus.null()){
908 FillStatistic(
"-u",
"rate_log", &fStatus, &stat, tmdiff);
909 FillStatistic(
"-rev -rda -nev -nda",
"rash_log", &fStatus, &stat, tmdiff);
910 FillStatistic(
"-rev -rda -nev -nda -rsda",
"rast_log", &fStatus, &stat, tmdiff);
911 FillStatistic(
"-rev -rda -nev -nda -rsda -fi",
"ratf_log", &fStatus, &stat, tmdiff);
913 DOUT3(
"Filled statistics with rate interval:%f after dt=%f\n", fRateInterval, tmdiff);
917 memcpy(&fStatus, &stat,
sizeof(stat));
940 fHierarchy.MarkChangedItems();
943 DOUT0(
"UpdateFileState Could not find hierarchy child %s", fFileStateName.c_str());
953 fHierarchy.MarkChangedItems();
956 DOUT0(
"UpdateMbsState Could not find hierarchy child %s", fAcqStateName.c_str());
967 fHierarchy.MarkChangedItems();
972 DOUT0(
"UpdateSetupState Could not find hierarchy child %s", fSetupStateName.c_str());
986 fHierarchy.MarkChangedItems();
987 DOUT0(
"Changed rate interval to %f seconds", t);
989 DOUT0(
"SetRateInterval Could not find hierarchy child %s", fRateIntervalName.c_str());
1000 fHierarchy.MarkChangedItems();
1001 DOUT0(
"Changed history depth to %d entries", entries);
1003 DOUT0(
"SetHistoryDepth Could not find hierarchy child %s", fHistoryName.c_str());
1011 fHierarchy.GetHChild(
"DataRate").EnableHistory(0,
true);
1012 fHierarchy.GetHChild(
"DataRate").EnableHistory(fHistory,
true);
1013 fHierarchy.GetHChild(
"EventRate").EnableHistory(0,
true);
1014 fHierarchy.GetHChild(
"EventRate").EnableHistory(fHistory,
true);
1015 fHierarchy.GetHChild(
"ServerRate").EnableHistory(0,
true);
1016 fHierarchy.GetHChild(
"ServerRate").EnableHistory(fHistory,
true);
1017 fHierarchy.GetHChild(
"FileRate").EnableHistory(0,
true);
1018 fHierarchy.GetHChild(
"FileRate").EnableHistory(fHistory,
true);
1020 fHierarchy.GetHChild(
"rate_log").EnableHistory(0,
true);
1021 fHierarchy.GetHChild(
"rate_log").EnableHistory(fHistory,
true);
1022 fHierarchy.GetHChild(
"rash_log").EnableHistory(0,
true);
1023 fHierarchy.GetHChild(
"rash_log").EnableHistory(fHistory,
true);
1024 fHierarchy.GetHChild(
"rast_log").EnableHistory(0,
true);
1025 fHierarchy.GetHChild(
"rast_log").EnableHistory(fHistory,
true);
1026 fHierarchy.GetHChild(
"ratf_log").EnableHistory(0,
true);
1027 fHierarchy.GetHChild(
"ratf_log").EnableHistory(fHistory,
true);
1034 if (cmd.
IsName(
"ProcessDaqStatus")) {
1040 AssignAddon(
nullptr);
1043 }
else if (cmd.
IsName(dabc::CmdHierarchyExec::CmdName())) {
1045 std::string cmdpath = cmd.
GetStr(
"Item");
1049 if (cmdpath ==
"CmdMbs") {
1051 if (fWaitingLogger) {
1059 if ((fCmdPort <= 0) || wrk.
null())
1065 }
else if (cmdpath ==
"CmdSetRateInterval") {
1066 DOUT0(
"ExecuteCommand sees CmdSetRateInterval");
1067 double deltat = cmd.
GetDouble(
"time", 3.0);
1068 SetRateInterval(deltat);
1069 SetHistoryDepth(fHistory);
1071 }
else if (cmdpath ==
"CmdSetHistoryDepth") {
1072 DOUT0(
"ExecuteCommand sees CmdSetHistoryDepth");
1073 int entries = cmd.
GetInt(
"entries", 200);
1074 SetHistoryDepth(entries);
1079 }
else if (cmd.
IsName(
"DeleteWorkers")) {
1083 wrk = FindChildRef(
"DaqLogger");
1086 AssignAddon(
nullptr);
1102 dabc::Worker(parent, name),
1116 if (!fAddon.null())
return true;
1120 EOUT(
"Fail open log %d port on node %s", fPort, fMbsNode.c_str());
1128 memset(&fRec, 0,
sizeof(fRec));
1139 if (!CreateAddon()) ActivateTimeout(5);
1141 DOUT2(
"mbs::DaqLogWorker::OnThreadAssigned parent = %p",
GetParent());
1147 if (CreateAddon())
return -1;
1160 if (fRec.iOrder==1) {
1162 if (fRec.iType == 1) {
1163 DOUT4(
"Keep alive message from MBS logger");
1165 DOUT2(
"Get MSG: %s",fRec.fBuffer);
1171 memset(&fRec, 0,
sizeof(fRec));
1174 if (add) add->
StartRecv(&fRec,
sizeof(fRec));
1186 EOUT(
"Problem with logger - reconnect");
1187 AssignAddon(
nullptr);
1199 const std::string &mbsnode,
int port) :
1200 dabc::Worker(parent, name),
1203 fCmds(
dabc::CommandsQueue::kindPostponed),
1211 DOUT3(
"Destroy DaqRemCmdWorker");
1223 if (!fAddon.null())
return true;
1227 EOUT(
"Fail open command port %d on node %s", fPort, fMbsNode.c_str());
1235 DOUT2(
"ADDON:%p Created cmd socket %d to mbs %s:%d", addon, fd, fMbsNode.c_str(), fPort);
1240 memset(&fRecvBuf, 0,
sizeof(fRecvBuf));
1241 addon->
StartRecv(&fRecvBuf,
sizeof(fRecvBuf));
1258 if (!fRecvBuf.CheckByteOrder()) {
1259 EOUT(
"Fail to decode data in receive buffer");
1261 if (fRecvBuf.l_cmdid == 0xffffffff) {
1265 if ((fCmds.Size()>0) && (fState == ioWaitReply)) {
1268 DOUT3(
"mbs::DaqRemCmdWorker get reply for the command id %u", (
unsigned) fRecvBuf.l_cmdid);
1270 bool res = fRecvBuf.l_status==0;
1272 if (fSendCmdId!= fRecvBuf.l_cmdid) {
1273 EOUT(
"Mismatch of command id in the MBS reply");
1278 if (pl) pl->
NewSendCommand(fCmds.Front().GetStr(
"cmd"), res ? 1 : 0);
1280 fCmds.Pop().ReplyBool(res);
1287 EOUT(
"ADDON disappear !!!");
1288 ActivateTimeout(3.);
1290 memset(&fRecvBuf, 0,
sizeof(fRecvBuf));
1291 addon->
StartRecv(&fRecvBuf,
sizeof(fRecvBuf));
1299 AssignAddon(
nullptr);
1300 if ((fState==ioWaitReply) && (fCmds.Size()>0)) {
1305 ActivateTimeout(1.);
1314 if (CreateAddon()) ProcessNextMbsCommand();
1321 if (cmd.
IsName(dabc::CmdHierarchyExec::CmdName())) {
1323 ProcessNextMbsCommand();
1333 if (fState != ioInit)
return;
1335 if (fCmds.Size()==0)
return;
1339 if ((addon==0) || !addon->
IsSocket()) {
1340 EOUT(
"Something went wrong");
1344 std::string mbscmd = fCmds.Front().GetStr(
"cmd");
1345 if (mbscmd.length() >=
sizeof(fSendBuf.c_cmd)-1) {
1346 EOUT(
"Send command too long %u", mbscmd.length());
1348 ProcessNextMbsCommand();
1352 DOUT2(
"Send MBS-CMD: %s", mbscmd.c_str());
1357 fState = ioWaitReply;
1359 if (++fSendCmdId > 0x7fff0000) fSendCmdId = 1;
1361 memset(&fSendBuf, 0,
sizeof(fSendBuf));
1362 fSendBuf.l_order = 1;
1363 fSendBuf.l_cmdid = fSendCmdId;
1364 fSendBuf.l_status = 0;
1365 strncpy(fSendBuf.c_cmd, mbscmd.c_str(),
sizeof(fSendBuf.c_cmd)-1);
1366 addon->
StartSend(&fSendBuf,
sizeof(fSendBuf));
1372 const std::string &mbsnode,
int port) :
1373 dabc::Worker(parent, name),
1377 fCmds(
dabc::CommandsQueue::kindPostponed),
1381 printf(
"Create prompter client with prefix %s\n", fPrefix.c_str());
1386 DOUT3(
"Destroy PrompterWorker");
1398 if (!fAddon.null())
return true;
1402 EOUT(
"Fail open command port %d on node %s", fPort, fMbsNode.c_str());
1410 DOUT2(
"ADDON:%p Created cmd socket %d to mbs %s:%d", addon, fd, fMbsNode.c_str(), fPort);
1415 memset(fRecvBuf, 0,
sizeof(fRecvBuf));
1416 addon->
StartRecv(fRecvBuf,
sizeof(fRecvBuf));
1432 if ((fCmds.Size()>0) && (fState == ioWaitReply)) {
1440 DOUT3(
"mbs::PrompterWorker get reply for the command id %u", (
unsigned) fRecvBuf[1]);
1442 if (fRecvBuf[0]!=1) {
1443 EOUT(
"Wrong reply from the prompter");
1446 if (fRecvBuf[1]!=0) {
1451 if (pl) pl->
NewSendCommand(fCmds.Front().GetStr(
"cmd"), res ? 1 : 0);
1453 fCmds.Pop().ReplyBool(res);
1460 EOUT(
"ADDON disappear !!!");
1461 ActivateTimeout(3.);
1463 memset(fRecvBuf, 0,
sizeof(fRecvBuf));
1464 addon->
StartRecv(fRecvBuf,
sizeof(fRecvBuf));
1472 AssignAddon(
nullptr);
1473 if ((fState==ioWaitReply) && (fCmds.Size()>0)) {
1478 ActivateTimeout(1.);
1487 if (CreateAddon()) ProcessNextMbsCommand();
1494 if (cmd.
IsName(dabc::CmdHierarchyExec::CmdName())) {
1496 ProcessNextMbsCommand();
1506 if (fState != ioInit)
return;
1508 if (fCmds.Size()==0)
return;
1512 if ((addon==0) || !addon->
IsSocket()) {
1513 EOUT(
"Something went wrong");
1517 std::string mbscmd = fCmds.Front().GetStr(
"cmd");
1518 if (mbscmd.length() >=
sizeof(fSendBuf) - fPrefix.length()) {
1519 EOUT(
"Send command too long %u", mbscmd.length());
1520 fCmds.Pop().ReplyBool(
false);
1521 ProcessNextMbsCommand();
1525 DOUT2(
"Send MBS-CMD: %s", mbscmd.c_str());
1530 fState = ioWaitReply;
1532 strcpy(fSendBuf, fPrefix.c_str());
1533 strcat(fSendBuf, mbscmd.c_str());
1534 addon->
StartSend(fSendBuf,
sizeof(fSendBuf));
Command definition class.
CommandDefinition & AddArg(const std::string &name, const std::string &kind="string", bool required=true, const RecordField &dflt=RecordField())
Represents command with its arguments.
double GetDouble(const std::string &name, double dflt=0.) const
std::string GetStr(const std::string &name, const std::string &dflt="") const
int GetInt(const std::string &name, int dflt=0) const
Represents objects hierarchy of remote (or local) DABC process.
void MarkChangedItems(uint64_t tm=0)
If any field was modified, item will be marked with new version.
Hierarchy CreateHChild(const std::string &name, bool allowslahes=false, bool sortorder=false)
Create a child item in the hierarchy with the specified name. If allowslahes is enabled, instead of subfolders item...
Hierarchy GetHChild(const std::string &name, bool allowslahes=false, bool force=false, bool sortorder=false)
Return the child, creating it together with its full subfolder path if necessary. If force is specified, missing children and folders...
void EnableHistory(unsigned length=100, bool withchilds=false)
Activate history production for the selected element and its children.
virtual void OnThreadAssigned()
virtual int ExecuteCommand(Command cmd)
Main method where commands are executed.
bool SetField(const std::string &name, const RecordField &v)
bool SetFieldModified(const std::string &name, bool on=true)
Reference on the arbitrary object
bool IsName(const char *name) const
Returns true if object name is the same as specified one.
bool null() const
Returns true if reference contains nullptr.
void Destroy()
Releases the reference and starts destruction of the referenced object.
@ evntSocketSendInfo
event delivered to worker when write is completed
@ evntSocketRecvInfo
event delivered to worker when read is completed
@ evntSocketCloseInfo
event delivered to worker when socket is closed
@ evntSocketErrorInfo
event delivered to worker when error is detected
void SetDeliverEventsToWorker(bool on=true)
Socket addon for handling I/O events.
bool StartSend(const void *buf, unsigned size, const void *buf2=0, unsigned size2=0, const void *buf3=0, unsigned size3=0)
bool StartRecv(void *buf, size_t size)
static std::string DefineHostName(bool force=true)
Return current host name.
static int StartClient(const std::string &host, int nport, bool nonblocking=true)
virtual void OnThreadAssigned()
bool ActivateTimeout(double tmout_sec)
Reference on dabc::Worker
Active object, which is working inside dabc::Thread.
virtual void OnThreadAssigned()
virtual void ProcessEvent(const EventId &)
bool Submit(Command cmd)
Submit command for execution in the processor.
bool AssignToThread(ThreadRef thrd, bool sync=true)
Assign worker to thread, worker becomes active immediately.
virtual int ExecuteCommand(Command cmd)
Main method where commands are executed.
DaqLogWorker(const dabc::Reference &parent, const std::string &name, const std::string &mbsnode, int port)
virtual void ProcessEvent(const dabc::EventId &)
virtual double ProcessTimeout(double last_diff)
virtual void OnThreadAssigned()
void ProcessNextMbsCommand()
virtual void OnThreadAssigned()
virtual ~DaqRemCmdWorker()
virtual double ProcessTimeout(double last_diff)
virtual void ProcessEvent(const dabc::EventId &)
DaqRemCmdWorker(const dabc::Reference &parent, const std::string &name, const std::string &mbsnode, int port)
virtual int ExecuteCommand(dabc::Command cmd)
Main method where commands are executed.
Addon class to retrieve status record from MBS server
virtual void OnThreadAssigned()
virtual void OnRecvCompleted()
Method called when receive operation is completed.
virtual double ProcessTimeout(double last_diff)
mbs::DaqStatus & GetStatus()
virtual unsigned WriteRecRawData(void *ptr, unsigned maxsize)
virtual void ProcessTimerEvent(unsigned timer)
Method called by framework when timer event is produced.
Interface module for MBS monitoring and control.
virtual void OnThreadAssigned()
void SetRateInterval(double t)
set sampling interval for rates calculation
void NewMessage(const std::string &msg)
Called by LogWorker to inform about new message.
void FillStatistic(const std::string &options, const std::string &itemname, mbs::DaqStatus *old_daqst, mbs::DaqStatus *new_daqst, double difftime)
virtual int ExecuteCommand(dabc::Command cmd)
Main method where commands are executed.
void UpdateSetupState(int isloaded)
update mbs setup loaded state
void NewSendCommand(const std::string &cmd, int res=-1)
Called by CmdWorker to report that a new command was sent (or that a reply was received)
void UpdateMbsState(int isrunning)
update mbs acq running state
Monitor(const std::string &name, dabc::Command cmd=nullptr)
void UpdateFileState(int isopen)
update file on/off state
void SetHistoryDepth(int entries)
set depth of variable history buffer
void CreateCommandWorker()
void LoggerAddonCreated()
Called by LogWorker to inform that connection is established.
void NewStatus(mbs::DaqStatus &stat)
Called by DaqStatusAddon to inform about new daq status.
virtual void ProcessTimerEvent(unsigned timer)
Method called by framework when timer event is produced.
virtual unsigned WriteRecRawData(void *ptr, unsigned maxsize)
virtual void ProcessEvent(const dabc::EventId &)
void ProcessNextMbsCommand()
virtual double ProcessTimeout(double last_diff)
PrompterWorker(const dabc::Reference &parent, const std::string &name, const std::string &mbsnode, int port)
virtual ~PrompterWorker()
virtual void OnThreadAssigned()
virtual int ExecuteCommand(dabc::Command cmd)
Main method where commands are executed.
XMLNodePointer_t GetParent(XMLNodePointer_t xmlnode)
std::string format(const char *fmt,...)
Support for MBS - standard GSI DAQ.
void SwapData(void *data, unsigned bytessize)
Event structure, exchanged between DABC threads.
Class for acquiring and holding timestamps.
void GetNow()
Method to acquire current time stamp.
double AsDouble() const
Return time stamp in form of double (in seconds)
uint32_t bl_no_stream_buf
uint32_t bl_n_evserv_events
uint32_t bh_acqui_running
uint32_t bh_running[SYS__N_MAX_PROCS]
uint32_t bl_n_strserv_bufs
uint32_t bl_n_strserv_kbytes