25 #define SYS__read_meb         0 
   26 #define SYS__collector        1 
   27 #define SYS__transport        2 
   28 #define SYS__event_serv       3 
   29 #define SYS__msg_log          4 
   30 #define SYS__dispatch         5 
   32 #define SYS__sbs_mon          7 
   33 #define SYS__read_cam_slav    8 
   34 #define SYS__esone_serv       9 
   35 #define SYS__stream_serv     10 
   36 #define SYS__histogram       11 
   37 #define SYS__prompt          12 
   38 #define SYS__daq_rate        13 
   45 #define SYS__vme_serv        20 
   47 #define VERSION__SETUP  1 
   48 #define VERSION__SET_ML 1 
   49 #define VERSION__SET_MO 1 
   53    dabc::SocketIOAddon(fd),
 
   59    SetDeleteWorkerOnClose(
false);
 
   72       EOUT(
"Get thread assigned in not-init state - check why");
 
   90    EOUT(
"Status timeout - delete addon");
 
   92    EOUT(
"Status timeout - delete addon called");
 
   98    if (fState == ioRecvHeader) {
 
  100       if(fStatus.l_endian != 1) fSwapping = 
true;
 
  105       if ((fStatus.l_version != 51) && (fStatus.l_version != 62) && (fStatus.l_version != 63))  {
 
  106          EOUT(
"Unsupported status version %u", (
unsigned) fStatus.l_version);
 
  111       StartRecv(&fStatus.bh_daqst_initalized, (fStatus.l_fix_lw-7)*4);
 
  115    if (fState == ioRecvData) {
 
  118          mbs::SwapData(&fStatus.bh_daqst_initalized, (fStatus.l_fix_lw-7)*4 - (19 * fStatus.l_sbs__str_len_64));
 
  126    EOUT(
"Wrong state when recv data");
 
  133    mbs::MonitorSlowControl(name, 
"Mbs", cmd),
 
  143    fWaitingLogger(true),
 
  148    fMbsNode = Cfg(
"node", cmd).AsStr();
 
  149    fAlias = Cfg(
"alias", cmd).AsStr(fMbsNode);
 
  150    fPeriod = Cfg(
"period", cmd).AsDouble(1.);
 
  151    fRateInterval = Cfg(
"rateinterval", cmd).AsDouble(1.);
 
  152    fHistory = Cfg(
"history", cmd).AsInt(200);
 
  153    bool publish = Cfg(
"publish", cmd).AsBool(
true);
 
  154    fPrintf = Cfg(
"printf", cmd).AsBool(
false);
 
  156    fFileStateName = 
"MbsFileOpen";
 
  157    fAcqStateName = 
"MbsAcqRunning";
 
  158    fSetupStateName = 
"MbsSetupLoaded";
 
  159    fRateIntervalName = 
"MbsRateInterval";
 
  160    fHistoryName = 
"MbsHistoryDepth";
 
  162    if (Cfg(
"stat", cmd).AsStr() == 
"true")
 
  165       fStatPort = Cfg(
"stat", cmd).AsInt(0);
 
  167    if (Cfg(
"logger", cmd).AsStr() == 
"true")
 
  170       fLoggerPort = Cfg(
"logger", cmd).AsInt(0);
 
  172    if (Cfg(
"cmd", cmd).AsStr() == 
"true")
 
  175       fCmdPort = Cfg(
"cmd", cmd).AsInt(0);
 
  177    fHierarchy.Create(
"MBS");
 
  236    SetRateInterval(fRateInterval); 
 
  237    SetHistoryDepth(fHistory);
 
  243       cmddef.
AddArg(
"cmd", 
"string", 
true, 
"show rate");
 
  248       cmddef_rate.
AddArg(
"time", 
"double", 
true, 
"1.0");
 
  253       cmddef_hist.
AddArg(
"entries", 
"int", 
true, 
"100");
 
  258    ui.
SetField(
"_UserFilePath", 
"${DABCSYS}/plugins/mbs/htm/");
 
  259    ui.
SetField(
"_UserFileMain", 
"main.htm");
 
  261    CreateTimer(
"update", 5.);
 
  262    CreateTimer(
"MbsUpdate", fPeriod);
 
  270       Publish(fHierarchy, std::string(
"/MBS/") + fAlias);
 
  272    if (fLoggerPort <= 0) NewMessage(
"!!! logger not activated !!!");
 
  282    if (fLoggerPort > 0) {
 
  283       DaqLogWorker* logger = 
new DaqLogWorker(
this, 
"DaqLogger", fMbsNode, fLoggerPort);
 
  284       logger->AssignToThread(thread());
 
  288       CreateCommandWorker();
 
  296    fWaitingLogger = 
false;
 
  299    if (!wrk.
null()) 
return;
 
  301    DOUT0(
"Create command worker");
 
  305       remcmd = 
new PrompterWorker(
this, 
"DaqCmd", fMbsNode, fCmdPort);
 
  306    } 
else if (fCmdPort > 0) {
 
  307       remcmd = 
new DaqRemCmdWorker(
this, 
"DaqCmd", fMbsNode, fCmdPort);
 
  311       if (!fWaitingCmd.null())
 
  312          remcmd->
Submit(fWaitingCmd);
 
  318    int bStreams_n = 0, bBuffers_n = 0, bEvents_n = 0, bData_n = 0;
 
  319    int bStreams_r = 0, bBuffers_r = 0, bEvents_r = 0, bData_r = 0;
 
  320    int bSrvStreams_n = 0, bSrvEvents_n = 0, bSrvBuffers_n = 0, bSrvData_n = 0;
 
  321    int bSrvStreams_r = 0, bSrvEvents_r = 0, bSrvBuffers_r = 0, bSrvData_r = 0;
 
  322    int bFilename = 0, bFileFilled = 0, bFileData = 0, bFileData_r = 0, bFileIndex = 0;
 
  323    int bStreamsFree = 0, bStreamsFilled = 0, bStreamsSrv = 0;
 
  325    int bl_file = 0, bl_server = 0, bl_mbs = 0, bl_streams = 0;
 
  327    int l_free_stream(0), l_trans_stream(0), l_serv_stream(0);
 
  335    strncpy(c_line, options.c_str(), 
sizeof(c_line)-1);
 
  337    if (strstr (c_line, 
"-nst") != NULL )
 
  339    if (strstr (c_line, 
"-rst") != NULL )
 
  341    if (strstr (c_line, 
"-nsst") != NULL )
 
  343    if (strstr (c_line, 
"-rsst") != NULL )
 
  345    if (strstr (c_line, 
"-est") != NULL )
 
  347    if (strstr (c_line, 
"-fst") != NULL )
 
  349    if (strstr (c_line, 
"-kst") != NULL )
 
  351    if (strstr (c_line, 
"-nbu") != NULL )
 
  353    if (strstr (c_line, 
"-rbu") != NULL )
 
  355    if (strstr (c_line, 
"-nsbu") != NULL )
 
  357    if (strstr (c_line, 
"-rsbu") != NULL )
 
  359    if (strstr (c_line, 
"-nev") != NULL )
 
  361    if (strstr (c_line, 
"-rev") != NULL )
 
  363    if (strstr (c_line, 
"-nsev") != NULL )
 
  365    if (strstr (c_line, 
"-rsev") != NULL )
 
  367    if (strstr (c_line, 
"-nda") != NULL )
 
  369    if (strstr (c_line, 
"-rda") != NULL )
 
  371    if (strstr (c_line, 
"-nsda") != NULL )
 
  373    if (strstr (c_line, 
"-rsda") != NULL )
 
  375    if (strstr (c_line, 
"-sfi") != NULL )
 
  377    if (strstr (c_line, 
"-ffi") != NULL )
 
  379    if (strstr (c_line, 
"-rfi") != NULL )
 
  381    if (strstr (c_line, 
"-nfi") != NULL )
 
  383    if (strstr (c_line, 
"-ifi") != NULL )
 
  385    if (strstr (c_line, 
"-mbs") != NULL )
 
  400    if (strstr (c_line, 
"-u") != NULL )
 
  414    if (strstr (c_line, 
"-fi") != NULL )
 
  422    if (strstr (c_line, 
"-st") != NULL )
 
  430    if (strstr (c_line, 
"-se") != NULL )
 
  448    if (strstr (c_line, 
"-ra") != NULL )
 
  466    bl_mbs = bData_n + bEvents_n + bBuffers_n + bStreams_n + bData_r + bEvents_r + bBuffers_r + bStreams_r;
 
  467    bl_server = bSrvData_n + bSrvEvents_n + bSrvBuffers_n + bSrvStreams_n + bSrvData_r + bSrvEvents_r + bSrvBuffers_r
 
  469    bl_streams = bStreamsFree + bStreamsFilled + bStreamsSrv;
 
  470    bl_file = bFileData + bFileFilled + bFileData_r + bFilename + bFileIndex;
 
  472    memset(c_head0, 0, 
sizeof(c_head0));
 
  473    memset(c_head, 0, 
sizeof(c_head));
 
  476      strncat (c_head0, 
" Event building", 
sizeof(c_head0)-1);
 
  479        strncat (c_head, 
"   MB      ", 
sizeof(c_head)-1);
 
  480        strncat (c_head0, 
"           ", 
sizeof(c_head0)-1);
 
  484        strncat (c_head, 
"   Events  ", 
sizeof(c_head)-1);
 
  485        strncat (c_head0, 
"           ", 
sizeof(c_head0)-1);
 
  489        strncat (c_head, 
"   Buffers ", 
sizeof(c_head)-1);
 
  490        strncat (c_head0, 
"           ", 
sizeof(c_head0)-1);
 
  494        strncat (c_head, 
"Streams ", 
sizeof(c_head)-1);
 
  495        strncat (c_head0, 
"        ", 
sizeof(c_head0)-1);
 
  499        strncat (c_head, 
" Kb/sec ", 
sizeof(c_head)-1);
 
  500        strncat (c_head0, 
"        ", 
sizeof(c_head0)-1);
 
  504        strncat (c_head, 
" Ev/sec ", 
sizeof(c_head)-1);
 
  505        strncat (c_head0, 
"        ", 
sizeof(c_head0)-1);
 
  509        strncat (c_head, 
"Buf/sec ", 
sizeof(c_head)-1);
 
  510        strncat (c_head0, 
"        ", 
sizeof(c_head0)-1);
 
  514        strncat (c_head, 
"Str/sec ", 
sizeof(c_head)-1);
 
  515        strncat (c_head0, 
"        ", 
sizeof(c_head0)-1);
 
  517      c_head0[strlen (c_head0) - 15] = 0;
 
  519    bl_server = bSrvData_n + bSrvEvents_n + bSrvBuffers_n + bSrvStreams_n + bSrvData_r + bSrvEvents_r + bSrvBuffers_r
 
  523      strncat (c_head0, 
"| Server ", 
sizeof(c_head0)-1);
 
  524      strncat (c_head, 
"|", 
sizeof(c_head)-1);
 
  527        strncat (c_head, 
"   MB      ", 
sizeof(c_head)-1);
 
  528        strncat (c_head0, 
"           ", 
sizeof(c_head0)-1);
 
  532        strncat (c_head, 
"  Events   ", 
sizeof(c_head)-1);
 
  533        strncat (c_head0, 
"           ", 
sizeof(c_head0)-1);
 
  537        strncat (c_head, 
"  Buffers  ", 
sizeof(c_head)-1);
 
  538        strncat (c_head0, 
"           ", 
sizeof(c_head0)-1);
 
  542        strncat (c_head, 
"Streams ", 
sizeof(c_head)-1);
 
  543        strncat (c_head0, 
"        ", 
sizeof(c_head0)-1);
 
  547        strncat (c_head, 
" Kb/sec ", 
sizeof(c_head)-1);
 
  548        strncat (c_head0, 
"        ", 
sizeof(c_head0)-1);
 
  552        strncat (c_head, 
" Ev/sec ", 
sizeof(c_head)-1);
 
  553        strncat (c_head0, 
"        ", 
sizeof(c_head0)-1);
 
  557        strncat (c_head, 
"Buf/sec ", 
sizeof(c_head)-1);
 
  558        strncat (c_head0, 
"        ", 
sizeof(c_head0)-1);
 
  562        strncat (c_head, 
"Str/sec ", 
sizeof(c_head)-1);
 
  563        strncat (c_head0, 
"        ", 
sizeof(c_head0)-1);
 
  565      c_head0[strlen (c_head0) - 8] = 0;
 
  567    bl_streams = bStreamsFree + bStreamsFilled + bStreamsSrv;
 
  570      strncat (c_head0, 
"| Streams ", 
sizeof(c_head0)-1);
 
  571      strncat (c_head, 
"|", 
sizeof(c_head)-1);
 
  574        strncat (c_head, 
"Empty ", 
sizeof(c_head)-1);
 
  575        strncat (c_head0, 
"      ", 
sizeof(c_head0)-1);
 
  579        strncat (c_head, 
"Full ", 
sizeof(c_head)-1);
 
  580        strncat (c_head0, 
"     ", 
sizeof(c_head0)-1);
 
  584        strncat (c_head, 
"Hold ", 
sizeof(c_head)-1);
 
  585        strncat (c_head0, 
"     ", 
sizeof(c_head0)-1);
 
  587      c_head0[strlen (c_head0) - 9] = 0;
 
  589    bl_file = bFileData + bFileFilled + bFileData_r + bFilename + bFileIndex;
 
  592      strncat (c_head0, 
"| File output ", 
sizeof(c_head0)-1);
 
  594        strncat (c_head0, 
"                ", 
sizeof(c_head0)-1);
 
  595      strncat (c_head, 
"|", 
sizeof(c_head)-1);
 
  597        strncat (c_head, 
"    MB   ", 
sizeof(c_head)-1);
 
  599        strncat (c_head, 
"Filled  ", 
sizeof(c_head)-1);
 
  601        strncat (c_head, 
"  Kb/sec ", 
sizeof(c_head)-1);
 
  603        strncat (c_head, 
"Index ", 
sizeof(c_head)-1);
 
  605        strncat (c_head, 
"Filename", 
sizeof(c_head)-1);
 
  618    double r_new_kb = new_daqst->
bl_n_kbyte * 1.024;
 
  631    double r_old_kb = old_daqst->
bl_n_kbyte * 1.024;
 
  638    double r_rate_buf = (r_new_buf - r_old_buf) / diff_time;
 
  639    double r_rate_kb = (r_new_kb - r_old_kb) / diff_time;
 
  640    double r_rate_evt = (r_new_evt - r_old_evt) / diff_time;
 
  641    double r_rate_stream = (r_new_stream - r_old_stream) / diff_time;
 
  642    double r_rate_evsrv_evt = (r_new_evsrv_evt - r_old_evsrv_evt) / diff_time;
 
  645    double r_rate_file_kb = (r_new_file_kb - r_old_file_kb) / diff_time;
 
  646    double r_rate_strsrv_str = (r_new_strsrv_str - r_old_strsrv_str) / diff_time / bl_n_ev_buf;
 
  647    double r_rate_strsrv_buf = (r_new_strsrv_buf - r_old_strsrv_buf) / diff_time;
 
  648    double r_rate_strsrv_kb = (r_new_strsrv_kb - r_old_strsrv_kb) / diff_time;
 
  650    memset(c_out, 0, 
sizeof(c_out));
 
  651    memset(c_line, 0, 
sizeof(c_line));
 
  656        snprintf (c_line, 
sizeof(c_line), 
"%10.0f ", r_new_buf / 1000000. * bl_ev_buf_len);
 
  657        strncat (c_out, c_line, 
sizeof(c_out)-1);
 
  661        snprintf (c_line, 
sizeof(c_line), 
"%10u ", (
unsigned) new_daqst->
bl_n_events);
 
  662        strncat (c_out, c_line, 
sizeof(c_out)-1);
 
  666        snprintf (c_line, 
sizeof(c_line), 
"%10u ", (
unsigned) new_daqst->
bl_n_buffers);
 
  667        strncat (c_out, c_line, 
sizeof(c_out)-1);
 
  671        snprintf (c_line, 
sizeof(c_line), 
"%7u ", (
unsigned) new_daqst->
bl_n_bufstream);
 
  672        strncat (c_out, c_line, 
sizeof(c_out)-1);
 
  676        snprintf (c_line, 
sizeof(c_line), 
"%7.1f ", r_rate_kb);
 
  677        strncat (c_out, c_line, 
sizeof(c_out)-1);
 
  681        snprintf (c_line, 
sizeof(c_line), 
"%7.0f ", r_rate_evt);
 
  682        strncat (c_out, c_line, 
sizeof(c_out)-1);
 
  686        snprintf (c_line, 
sizeof(c_line), 
"%7.0f ", r_rate_buf);
 
  687        strncat (c_out, c_line, 
sizeof(c_out)-1);
 
  691        snprintf (c_line, 
sizeof(c_line), 
"%7.0f ", r_rate_stream);
 
  692        strncat (c_out, c_line, 
sizeof(c_out)-1);
 
  697      strncat (c_out, 
"|", 
sizeof(c_out)-1);
 
  700        snprintf (c_line, 
sizeof(c_line), 
"%10.0f ", r_new_strsrv_kb / 1000.);
 
  701        strncat (c_out, c_line, 
sizeof(c_out)-1);
 
  706        strncat (c_out, c_line, 
sizeof(c_out)-1);
 
  710        snprintf (c_line, 
sizeof(c_line), 
"%10u ", (
unsigned) new_daqst->
bl_n_strserv_bufs);
 
  711        strncat (c_out, c_line, 
sizeof(c_out)-1);
 
  715        snprintf (c_line, 
sizeof(c_line), 
"%7u ", (
unsigned) new_daqst->
bl_n_strserv_bufs / bl_n_ev_buf);
 
  716        strncat (c_out, c_line, 
sizeof(c_out)-1);
 
  720        snprintf (c_line, 
sizeof(c_line), 
"%7.1f ", r_rate_strsrv_kb);
 
  721        strncat (c_out, c_line, 
sizeof(c_out)-1);
 
  725        snprintf (c_line, 
sizeof(c_line), 
"%7.0f ", r_rate_evsrv_evt);
 
  726        strncat (c_out, c_line, 
sizeof(c_out)-1);
 
  730        snprintf (c_line, 
sizeof(c_line), 
"%7.0f ", r_rate_strsrv_buf);
 
  731        strncat (c_out, c_line, 
sizeof(c_out)-1);
 
  735        snprintf (c_line, 
sizeof(c_line), 
"%7.0f ", r_rate_strsrv_str);
 
  736        strncat (c_out, c_line, 
sizeof(c_out)-1);
 
  741      strncat (c_out, 
"|", 
sizeof(c_out)-1);
 
  744        snprintf (c_line, 
sizeof(c_line), 
"%5d ", l_free_stream);
 
  745        strncat (c_out, c_line, 
sizeof(c_out)-1);
 
  749        snprintf (c_line, 
sizeof(c_line), 
"%4d ", l_trans_stream);
 
  750        strncat (c_out, c_line, 
sizeof(c_out)-1);
 
  754        snprintf (c_line, 
sizeof(c_line), 
"%4d ", l_serv_stream);
 
  755        strncat (c_out, c_line, 
sizeof(c_out)-1);
 
  760      strncat (c_out, 
"|", 
sizeof(c_out)-1);
 
  763        snprintf (c_line, 
sizeof(c_line), 
"%8.0f ", r_new_file_kb / 1000.);
 
  764        strncat (c_out, c_line, 
sizeof(c_out)-1);
 
  768        snprintf (c_line, 
sizeof(c_line), 
"%5.1f %% ", new_daqst->
l_file_size > 0 ? r_new_file_kb / new_daqst->
l_file_size * 0.1  : 0.);
 
  769        strncat (c_out, c_line, 
sizeof(c_out)-1);
 
  773        snprintf (c_line, 
sizeof(c_line), 
"%8.1f ", r_rate_file_kb);
 
  774        strncat (c_out, c_line, 
sizeof(c_out)-1);
 
  778        snprintf (c_line, 
sizeof(c_line), 
" %04u ", (
unsigned) new_daqst->
l_file_cur);
 
  779        strncat (c_out, c_line, 
sizeof(c_out)-1);
 
  783        snprintf (c_line, 
sizeof(c_line), 
"%s", new_daqst->
c_file_name);
 
  784        strncat (c_out, c_line, 
sizeof(c_out)-1);
 
  787        strncat (c_out, 
" op", 
sizeof(c_out)-1);
 
  789        strncat (c_out, 
" cl", 
sizeof(c_out)-1);
 
  794    if (fCounter % 20 == 0) {
 
  805       fHierarchy.GetHChild(
"DataRate").SetField(
"value", 
dabc::format(
"%3.1f", r_rate_kb));
 
  806       fHierarchy.GetHChild(
"DataRate").SetFieldModified(
"value");
 
  807       fHierarchy.GetHChild(
"EventRate").SetField(
"value", 
dabc::format(
"%3.1f", r_rate_evt));
 
  808       fHierarchy.GetHChild(
"EventRate").SetFieldModified(
"value");
 
  809       fHierarchy.GetHChild(
"ServerRate").SetField(
"value", 
dabc::format(
"%3.1f", r_rate_strsrv_kb));
 
  810       fHierarchy.GetHChild(
"ServerRate").SetFieldModified(
"value");
 
  811       fHierarchy.GetHChild(
"FileRate").SetField(
"value", 
dabc::format(
"%3.1f", r_rate_file_kb));
 
  812       fHierarchy.GetHChild(
"FileRate").SetFieldModified(
"value");
 
  816       std::string prefix = std::string(
"MBS.") + fMbsNode + std::string(
".");
 
  817       fRec.AddDouble(prefix + 
"DataRate", r_rate_kb, 
true);
 
  818       fRec.AddDouble(prefix + 
"EventRate", r_rate_evt, 
true);
 
  819       fRec.AddDouble(prefix + 
"ServerRate", r_rate_strsrv_kb, 
true);
 
  820       fRec.AddDouble(prefix + 
"FileRate", r_rate_file_kb, 
true);
 
  823    fHierarchy.MarkChangedItems();
 
  828    if (TimerName(timer) != 
"MbsUpdate") {
 
  833    if (fMbsNode.empty()) {
 
  836        double v1 = 100. * (1.3 + sin(
dabc::Now().AsDouble()/5.));
 
  837        fHierarchy.GetHChild(
"DataRate").SetField(
"value", 
dabc::format(
"%4.2f", v1));
 
  839        v1 = 100. * (1.3 + cos(
dabc::Now().AsDouble()/8.));
 
  840        fHierarchy.GetHChild(
"EventRate").SetField(
"value", 
dabc::format(
"%4.2f", v1));
 
  842        fHierarchy.GetHChild(
"rate_log").SetField(
"value", 
dabc::format(
"| Header  |   Entry      |      Rate  |"));
 
  843        fHierarchy.GetHChild(
"rate_log").SetField(
"value", 
dabc::format(
"|         |    %5d       |     %6.2f  |", fCounter,v1));
 
  845        fHierarchy.MarkChangedItems();
 
  853    if (!fAddon.null() || (fStatPort<=0)) 
return;
 
  857       EOUT(
"FAIL status port %d for node %s", fStatPort, fMbsNode.c_str());
 
  859       AssignAddon(
new DaqStatusAddon(fd));
 
  867    CreateCommandWorker();
 
  877       fHierarchy.MarkChangedItems();
 
  880    if (fPrintf) printf(
"%s\n", msg.c_str());
 
  886    if (!fPrintf) 
return;
 
  887    if (res>=0) printf(
"replcmd>%s res=%s\n", cmd.c_str(), 
DBOOL(res));
 
  888           else printf(
"sendcmd>%s\n", cmd.c_str());
 
  897    double tmdiff = stamp.
AsDouble() - fStatStamp.AsDouble();
 
  899       EOUT(
"Wrong time calculation");
 
  902    double deltaT=fabs((tmdiff-fRateInterval)/tmdiff); 
 
  903    DOUT3(
"NEW STATUS with rate interval:%f , dt=%f\n, delta=%f", fRateInterval, tmdiff, deltaT);
 
  904    if ((tmdiff>0) && ((deltaT<1/fRateInterval) || (ceil(tmdiff) > 1 + fRateInterval) ) )
 
  907      if (!fStatus.null()){
 
  908       FillStatistic(
"-u", 
"rate_log", &fStatus, &stat, tmdiff);
 
  909       FillStatistic(
"-rev -rda -nev -nda", 
"rash_log", &fStatus, &stat, tmdiff);
 
  910       FillStatistic(
"-rev -rda -nev -nda -rsda", 
"rast_log", &fStatus, &stat, tmdiff);
 
  911       FillStatistic(
"-rev -rda -nev -nda -rsda -fi", 
"ratf_log", &fStatus, &stat, tmdiff);
 
  913       DOUT3(
"Filled statistics with rate interval:%f after dt=%f\n", fRateInterval, tmdiff);
 
  917       memcpy(&fStatus, &stat, 
sizeof(stat));
 
  940       fHierarchy.MarkChangedItems();
 
  943       DOUT0(
"UpdateFileState Could not find hierarchy child %s", fFileStateName.c_str());
 
  953       fHierarchy.MarkChangedItems();
 
  956       DOUT0(
"UpdateMbsState Could not find hierarchy child %s", fAcqStateName.c_str());
 
  967          fHierarchy.MarkChangedItems();
 
  972         DOUT0(
"UpdateSetupState Could not find hierarchy child %s", fSetupStateName.c_str());
 
  986       fHierarchy.MarkChangedItems();
 
  987       DOUT0(
"Changed rate interval to %f seconds", t);
 
  989       DOUT0(
"SetRateInterval Could not find hierarchy child %s", fRateIntervalName.c_str());
 
 1000       fHierarchy.MarkChangedItems();
 
 1001       DOUT0(
"Changed history depth to %d entries", entries);
 
 1003       DOUT0(
"SetHistoryDepth  Could not find hierarchy child %s", fHistoryName.c_str());
 
 1011    fHierarchy.GetHChild(
"DataRate").EnableHistory(0, 
true);
 
 1012    fHierarchy.GetHChild(
"DataRate").EnableHistory(fHistory, 
true);
 
 1013    fHierarchy.GetHChild(
"EventRate").EnableHistory(0, 
true);
 
 1014    fHierarchy.GetHChild(
"EventRate").EnableHistory(fHistory, 
true);
 
 1015    fHierarchy.GetHChild(
"ServerRate").EnableHistory(0, 
true);
 
 1016    fHierarchy.GetHChild(
"ServerRate").EnableHistory(fHistory, 
true);
 
 1017    fHierarchy.GetHChild(
"FileRate").EnableHistory(0, 
true);
 
 1018    fHierarchy.GetHChild(
"FileRate").EnableHistory(fHistory, 
true);
 
 1020    fHierarchy.GetHChild(
"rate_log").EnableHistory(0, 
true);
 
 1021    fHierarchy.GetHChild(
"rate_log").EnableHistory(fHistory, 
true);
 
 1022    fHierarchy.GetHChild(
"rash_log").EnableHistory(0, 
true);
 
 1023    fHierarchy.GetHChild(
"rash_log").EnableHistory(fHistory, 
true);
 
 1024    fHierarchy.GetHChild(
"rast_log").EnableHistory(0, 
true);
 
 1025    fHierarchy.GetHChild(
"rast_log").EnableHistory(fHistory, 
true);
 
 1026    fHierarchy.GetHChild(
"ratf_log").EnableHistory(0, 
true);
 
 1027    fHierarchy.GetHChild(
"ratf_log").EnableHistory(fHistory, 
true);
 
 1034    if (cmd.
IsName(
"ProcessDaqStatus")) {
 
 1040       AssignAddon(
nullptr);
 
 1043    } 
else if (cmd.
IsName(dabc::CmdHierarchyExec::CmdName())) {
 
 1045       std::string cmdpath = cmd.
GetStr(
"Item");
 
 1049       if (cmdpath == 
"CmdMbs") {
 
 1051          if (fWaitingLogger) {
 
 1059          if ((fCmdPort <= 0) || wrk.
null())
 
 1065       } 
else if (cmdpath == 
"CmdSetRateInterval") {
 
 1066          DOUT0(
"ExecuteCommand  sees CmdSetRateInterval");
 
 1067          double deltat = cmd.
GetDouble(
"time", 3.0); 
 
 1068          SetRateInterval(deltat);
 
 1069          SetHistoryDepth(fHistory); 
 
 1071       } 
else if (cmdpath == 
"CmdSetHistoryDepth") {
 
 1072          DOUT0(
"ExecuteCommand  sees CmdSetHistoryDepth");
 
 1073          int entries = cmd.
GetInt(
"entries", 200); 
 
 1074          SetHistoryDepth(entries);
 
 1079    } 
else if (cmd.
IsName(
"DeleteWorkers")) {
 
 1083       wrk = FindChildRef(
"DaqLogger");
 
 1086       AssignAddon(
nullptr);
 
 1102    dabc::Worker(parent, name),
 
 1116    if (!fAddon.null()) 
return true;
 
 1120       EOUT(
"Fail open log %d port on node %s", fPort, fMbsNode.c_str());
 
 1128    memset(&fRec, 0, 
sizeof(fRec));
 
 1139    if (!CreateAddon()) ActivateTimeout(5);
 
 1141    DOUT2(
"mbs::DaqLogWorker::OnThreadAssigned parent = %p", 
GetParent());
 
 1147    if (CreateAddon()) 
return -1;
 
 1160          if (fRec.iOrder==1) {
 
 1162             if (fRec.iType == 1) {
 
 1163                DOUT4(
"Keep alive message from MBS logger");
 
 1165                DOUT2(
"Get MSG: %s",fRec.fBuffer);
 
 1171          memset(&fRec, 0, 
sizeof(fRec));
 
 1174          if (add) add->
StartRecv(&fRec, 
sizeof(fRec));
 
 1186          EOUT(
"Problem with logger - reconnect");
 
 1187          AssignAddon(
nullptr);
 
 1199                                      const std::string &mbsnode, 
int port) :
 
 1200    dabc::Worker(parent, name),
 
 1203    fCmds(
dabc::CommandsQueue::kindPostponed),
 
 1211    DOUT3(
"Destroy DaqRemCmdWorker");
 
 1223    if (!fAddon.null()) 
return true;
 
 1227       EOUT(
"Fail open command port %d on node %s", fPort, fMbsNode.c_str());
 
 1235    DOUT2(
"ADDON:%p Created cmd socket %d to mbs %s:%d", addon, fd, fMbsNode.c_str(), fPort);
 
 1240    memset(&fRecvBuf, 0, 
sizeof(fRecvBuf));
 
 1241    addon->
StartRecv(&fRecvBuf, 
sizeof(fRecvBuf));
 
 1258          if (!fRecvBuf.CheckByteOrder()) {
 
 1259             EOUT(
"Fail to decode data in receive buffer");
 
 1261          if (fRecvBuf.l_cmdid == 0xffffffff) {
 
 1265          if ((fCmds.Size()>0) && (fState == ioWaitReply)) {
 
 1268             DOUT3(
"mbs::DaqRemCmdWorker get reply for the command id %u", (
unsigned) fRecvBuf.l_cmdid);
 
 1270             bool res = fRecvBuf.l_status==0;
 
 1272             if (fSendCmdId!= fRecvBuf.l_cmdid) {
 
 1273                EOUT(
"Mismatch of command id in the MBS reply");
 
 1278             if (pl) pl->
NewSendCommand(fCmds.Front().GetStr(
"cmd"), res ? 1 : 0);
 
 1280             fCmds.Pop().ReplyBool(res);
 
 1287             EOUT(
"ADDON disappear !!!");
 
 1288             ActivateTimeout(3.);
 
 1290             memset(&fRecvBuf, 0, 
sizeof(fRecvBuf));
 
 1291             addon->
StartRecv(&fRecvBuf, 
sizeof(fRecvBuf));
 
 1299          AssignAddon(
nullptr);
 
 1300          if ((fState==ioWaitReply) && (fCmds.Size()>0)) {
 
 1305          ActivateTimeout(1.);
 
 1314    if (CreateAddon()) ProcessNextMbsCommand();
 
 1321    if (cmd.
IsName(dabc::CmdHierarchyExec::CmdName())) {
 
 1323       ProcessNextMbsCommand();
 
 1333    if (fState != ioInit) 
return;
 
 1335    if (fCmds.Size()==0) 
return;
 
 1339    if ((addon==0) || !addon->
IsSocket()) {
 
 1340       EOUT(
"Something went wrong");
 
 1344    std::string mbscmd = fCmds.Front().GetStr(
"cmd");
 
 1345    if (mbscmd.length() >= 
sizeof(fSendBuf.c_cmd)-1) {
 
 1346       EOUT(
"Send command too long %u", mbscmd.length());
 
 1348       ProcessNextMbsCommand();
 
 1352    DOUT2(
"Send MBS-CMD:  %s", mbscmd.c_str());
 
 1357    fState = ioWaitReply;
 
 1359    if (++fSendCmdId > 0x7fff0000) fSendCmdId = 1;
 
 1361    memset(&fSendBuf, 0, 
sizeof(fSendBuf));
 
 1362    fSendBuf.l_order = 1;
 
 1363    fSendBuf.l_cmdid = fSendCmdId;
 
 1364    fSendBuf.l_status = 0;
 
 1365    strncpy(fSendBuf.c_cmd, mbscmd.c_str(), 
sizeof(fSendBuf.c_cmd)-1);
 
 1366    addon->
StartSend(&fSendBuf, 
sizeof(fSendBuf));
 
 1372                                      const std::string &mbsnode, 
int port) :
 
 1373    dabc::Worker(parent, name),
 
 1377    fCmds(
dabc::CommandsQueue::kindPostponed),
 
 1381    printf(
"Create prompter client with prefix %s\n", fPrefix.c_str());
 
 1386    DOUT3(
"Destroy PrompterWorker");
 
 1398    if (!fAddon.null()) 
return true;
 
 1402       EOUT(
"Fail open command port %d on node %s", fPort, fMbsNode.c_str());
 
 1410    DOUT2(
"ADDON:%p Created cmd socket %d to mbs %s:%d", addon, fd, fMbsNode.c_str(), fPort);
 
 1415    memset(fRecvBuf, 0, 
sizeof(fRecvBuf));
 
 1416    addon->
StartRecv(fRecvBuf, 
sizeof(fRecvBuf));
 
 1432          if ((fCmds.Size()>0) && (fState == ioWaitReply)) {
 
 1440             DOUT3(
"mbs::PrompterWorker get reply for the command id %u", (
unsigned) fRecvBuf[1]);
 
 1442             if (fRecvBuf[0]!=1) {
 
 1443                EOUT(
"Wrong reply from the prompter");
 
 1446             if (fRecvBuf[1]!=0) {
 
 1451             if (pl) pl->
NewSendCommand(fCmds.Front().GetStr(
"cmd"), res ? 1 : 0);
 
 1453             fCmds.Pop().ReplyBool(res);
 
 1460             EOUT(
"ADDON disappear !!!");
 
 1461             ActivateTimeout(3.);
 
 1463             memset(fRecvBuf, 0, 
sizeof(fRecvBuf));
 
 1464             addon->
StartRecv(fRecvBuf, 
sizeof(fRecvBuf));
 
 1472          AssignAddon(
nullptr);
 
 1473          if ((fState==ioWaitReply) && (fCmds.Size()>0)) {
 
 1478          ActivateTimeout(1.);
 
 1487    if (CreateAddon()) ProcessNextMbsCommand();
 
 1494    if (cmd.
IsName(dabc::CmdHierarchyExec::CmdName())) {
 
 1496       ProcessNextMbsCommand();
 
 1506    if (fState != ioInit) 
return;
 
 1508    if (fCmds.Size()==0) 
return;
 
 1512    if ((addon==0) || !addon->
IsSocket()) {
 
 1513       EOUT(
"Something went wrong");
 
 1517    std::string mbscmd = fCmds.Front().GetStr(
"cmd");
 
 1518    if (mbscmd.length() >= 
sizeof(fSendBuf) - fPrefix.length()) {
 
 1519       EOUT(
"Send command too long %u", mbscmd.length());
 
 1520       fCmds.Pop().ReplyBool(
false);
 
 1521       ProcessNextMbsCommand();
 
 1525    DOUT2(
"Send MBS-CMD:  %s", mbscmd.c_str());
 
 1530    fState = ioWaitReply;
 
 1532    strcpy(fSendBuf, fPrefix.c_str());
 
 1533    strcat(fSendBuf, mbscmd.c_str());
 
 1534    addon->
StartSend(fSendBuf, 
sizeof(fSendBuf));
 
Command definition class.
CommandDefinition & AddArg(const std::string &name, const std::string &kind="string", bool required=true, const RecordField &dflt=RecordField())
Represents command with its arguments.
double GetDouble(const std::string &name, double dflt=0.) const
std::string GetStr(const std::string &name, const std::string &dflt="") const
int GetInt(const std::string &name, int dflt=0) const
Represents objects hierarchy of remote (or local) DABC process.
void MarkChangedItems(uint64_t tm=0)
If any field was modified, item will be marked with new version.
Hierarchy CreateHChild(const std::string &name, bool allowslahes=false, bool sortorder=false)
Create child item in hierarchy with specified name If allowslahes enabled, instead of subfolders item...
Hierarchy GetHChild(const std::string &name, bool allowslahes=false, bool force=false, bool sortorder=false)
Return child, if necessary creates with full subfolder If force specified, missing childs and folders...
void EnableHistory(unsigned length=100, bool withchilds=false)
Activate history production for selected element and its childs.
virtual void OnThreadAssigned()
virtual int ExecuteCommand(Command cmd)
Main method where commands are executed.
bool SetField(const std::string &name, const RecordField &v)
bool SetFieldModified(const std::string &name, bool on=true)
Reference on the arbitrary object
bool IsName(const char *name) const
Returns true if object name is the same as specified one.
bool null() const
Returns true if reference contains nullptr.
void Destroy()
Release reference and starts destroyment of referenced object.
@ evntSocketSendInfo
event delivered to worker when write is completed
@ evntSocketRecvInfo
event delivered to worker when read is completed
@ evntSocketCloseInfo
event delivered to worker when socket is closed
@ evntSocketErrorInfo
event delivered to worker when error is detected
void SetDeliverEventsToWorker(bool on=true)
Socket addon for handling I/O events.
bool StartSend(const void *buf, unsigned size, const void *buf2=0, unsigned size2=0, const void *buf3=0, unsigned size3=0)
bool StartRecv(void *buf, size_t size)
static std::string DefineHostName(bool force=true)
Return current host name.
static int StartClient(const std::string &host, int nport, bool nonblocking=true)
virtual void OnThreadAssigned()
bool ActivateTimeout(double tmout_sec)
Reference on dabc::Worker
Active object, which is working inside dabc::Thread.
virtual void OnThreadAssigned()
virtual void ProcessEvent(const EventId &)
bool Submit(Command cmd)
Submit command for execution in the processor.
bool AssignToThread(ThreadRef thrd, bool sync=true)
Assign worker to thread, worker becomes active immediately.
virtual int ExecuteCommand(Command cmd)
Main method where commands are executed.
DaqLogWorker(const dabc::Reference &parent, const std::string &name, const std::string &mbsnode, int port)
virtual void ProcessEvent(const dabc::EventId &)
virtual double ProcessTimeout(double last_diff)
virtual void OnThreadAssigned()
void ProcessNextMbsCommand()
virtual void OnThreadAssigned()
virtual ~DaqRemCmdWorker()
virtual double ProcessTimeout(double last_diff)
virtual void ProcessEvent(const dabc::EventId &)
DaqRemCmdWorker(const dabc::Reference &parent, const std::string &name, const std::string &mbsnode, int port)
virtual int ExecuteCommand(dabc::Command cmd)
Main method where commands are executed.
Addon class to retrieve status record from MBS server
virtual void OnThreadAssigned()
virtual void OnRecvCompleted()
Method called when receive operation is completed.
virtual double ProcessTimeout(double last_diff)
mbs::DaqStatus & GetStatus()
virtual unsigned WriteRecRawData(void *ptr, unsigned maxsize)
virtual void ProcessTimerEvent(unsigned timer)
Method called by framework when timer event is produced.
Interface module for MBS monitoring and control.
virtual void OnThreadAssigned()
void SetRateInterval(double t)
set sampling interval for rates calculation
void NewMessage(const std::string &msg)
Called by LogWorker to inform about new message.
void FillStatistic(const std::string &options, const std::string &itemname, mbs::DaqStatus *old_daqst, mbs::DaqStatus *new_daqst, double difftime)
virtual int ExecuteCommand(dabc::Command cmd)
Main method where commands are executed.
void UpdateSetupState(int isloaded)
update mbs setup loaded state
void NewSendCommand(const std::string &cmd, int res=-1)
Called by CmdWorker to inform about new command send (or getting reply)
void UpdateMbsState(int isrunning)
update mbs acq running state
Monitor(const std::string &name, dabc::Command cmd=nullptr)
void UpdateFileState(int isopen)
update file on/off state
void SetHistoryDepth(int entries)
set depth of variable history buffer
void CreateCommandWorker()
void LoggerAddonCreated()
Called by LogWorker to inform that connection is established.
void NewStatus(mbs::DaqStatus &stat)
Called by DaqStatusAddon to inform about new daq status.
virtual void ProcessTimerEvent(unsigned timer)
Method called by framework when timer event is produced.
virtual unsigned WriteRecRawData(void *ptr, unsigned maxsize)
virtual void ProcessEvent(const dabc::EventId &)
void ProcessNextMbsCommand()
virtual double ProcessTimeout(double last_diff)
PrompterWorker(const dabc::Reference &parent, const std::string &name, const std::string &mbsnode, int port)
virtual ~PrompterWorker()
virtual void OnThreadAssigned()
virtual int ExecuteCommand(dabc::Command cmd)
Main method where commands are executed.
XMLNodePointer_t GetParent(XMLNodePointer_t xmlnode)
std::string format(const char *fmt,...)
Support for MBS - standard GSI DAQ.
void SwapData(void *data, unsigned bytessize)
Event structure, exchanged between DABC threads.
Class for acquiring and holding timestamps.
void GetNow()
Method to acquire current time stamp.
double AsDouble() const
Return time stamp in form of double (in seconds)
uint32_t bl_no_stream_buf
uint32_t bl_n_evserv_events
uint32_t bh_acqui_running
uint32_t bh_running[SYS__N_MAX_PROCS]
uint32_t bl_n_strserv_bufs
uint32_t bl_n_strserv_kbytes