1#include <boost/asio/co_spawn.hpp>
2#include <boost/asio/detached.hpp>
5#include <spdlog/pattern_formatter.h>
6#include <spdlog/sinks/stdout_color_sinks.h>
23 console_ = spdlog::stdout_color_st(
"Data Monitor");
24 auto console_format = std::make_unique<spdlog::pattern_formatter>(
25 "[%H:%M:%S] <%n> %v", spdlog::pattern_time_type::local, std::string(
""));
26 console_->set_formatter(std::move(console_format));
27 console_->flush_on(spdlog::level::info);
28 console_->set_level(spdlog::level::info);
31 co_await clock_.async_wait(asio::use_awaitable);
34 auto total_bytes_count =
processor_->get_read_data_bytes();
35 auto total_hits_count =
processor_->get_processed_hit_number();
37 auto time_now = std::chrono::steady_clock::now();
39 auto time_duration = std::chrono::duration_cast<std::chrono::microseconds>(time_now -
last_print_time_);
47 const auto time_duration_double =
static_cast<double>(time_duration.count());
61 fmt::format(fg(fmt::color::yellow) | fmt::emphasis::bold,
"{:>7.5} KB/s", 1000. * speed_MBps);
65 speed_string_ = fmt::format(fg(fmt::color::yellow) | fmt::emphasis::bold,
"{:>7.5} MB/s", speed_MBps);
74 spdlog::debug(
"DataMonitor: rate polling clock is killed.");
79 ,
monitor_{ this, &(control->get_io_context()) }
87 spdlog::debug(
"Workflow starts.");
93 spdlog::trace(
"Workflow exit start().");
102 auto expected =
false;
103 spdlog::trace(
"Try to stop the workflow. Current is_stopped status: {}",
is_stopped_.load());
104 if (
is_stopped_.compare_exchange_strong(expected,
true))
106 spdlog::trace(
"Try to stop data monitor");
109 spdlog::trace(
"Workflow is stopped");
116 auto is_success =
data_queue_.try_emplace(read_data);
119 spdlog::critical(
"Data queue is full and message is lost. Try to increase its capacity!");
127 spdlog::trace(
"entering workflow loop");
140 auto is_reading =
app_->get_status().is_reading.load();
141 if (not(is_reading or analysis_result))
147 catch (tbb::user_abort& ex)
149 spdlog::trace(
"Workflow: {}", ex.what());
151 catch (std::exception& ex)
153 spdlog::critical(
"Exception occured: {}", ex.what());
154 app_->set_error_string(ex.what());
157 spdlog::debug(
"Workflow loop is done.\n");
174 spdlog::info(
"data: {:x}", fmt::join(raw_data,
""));
183 for (
const auto& hit_data : export_data.hit_data)
185 spdlog::info(
"{}", hit_data);
187 for (
const auto& marker_data : export_data.marker_data)
189 spdlog::info(
"{}", marker_data);
std::atomic< double > current_hits_ps_
std::chrono::milliseconds period_
std::chrono::time_point< std::chrono::steady_clock > last_print_time_
std::atomic< uint64_t > last_processed_hit_num_
asio::steady_timer clock_
gsl::not_null< Handler * > processor_
std::atomic< uint64_t > last_read_data_bytes_
DataMonitor(Handler *processor, io_context_type *io_context)
std::shared_ptr< spdlog::logger > console_
auto print_cycle() -> asio::awaitable< void >
gsl::not_null< io_context_type * > io_context_
void set_speed_string(double speed_MBps)
std::atomic< double > current_received_bytes_MBps_
std::string speed_string_
void read_data_once(std::span< BufferElementType > read_data)
std::size_t received_data_size_
tbb::concurrent_bounded_queue< process::SerializableMsgBuffer > data_queue_
std::atomic< uint64_t > total_read_data_bytes_
void start(bool is_blocking)
std::atomic< uint64_t > total_processed_hit_numer_
gsl::not_null< App * > app_
common::DataPrintMode print_mode_
std::atomic< bool > is_stopped_
TaskDiagram data_processes_
void analysis_loop(bool is_blocking)
asio::thread_pool io_context_type