7#include <boost/asio/awaitable.hpp>
8#include <boost/asio/detached.hpp>
9#include <boost/asio/impl/co_spawn.hpp>
10#include <boost/asio/use_awaitable.hpp>
16#include <spdlog/common.h>
17#include <spdlog/pattern_formatter.h>
18#include <spdlog/sinks/stdout_color_sinks.h>
20#include <oneapi/tbb/concurrent_queue.h>
22#include <tbb/concurrent_queue.h>
25#include <spdlog/spdlog.h>
42 console_ = spdlog::stdout_color_st(
"Data Monitor");
43 auto console_format = std::make_unique<spdlog::pattern_formatter>(
44 "[%H:%M:%S] <%n> %v", spdlog::pattern_time_type::local, std::string(
""));
45 console_->set_formatter(std::move(console_format));
46 console_->flush_on(spdlog::level::info);
47 console_->set_level(spdlog::level::info);
48 const auto buffer_size =
processor_->get_app().get_config().data_buffer_size;
49 const auto& task_workflow =
processor_->get_data_workflow();
52 co_await clock_.async_wait(asio::use_awaitable);
55 auto read_total_bytes_count =
processor_->get_read_data_bytes();
56 auto read_total_hits_count =
processor_->get_processed_hit_number();
57 auto frame_counts =
processor_->get_frame_counts();
58 auto write_total_bytes_count = task_workflow.get_data_bytes();
59 auto drop_total_bytes_count =
processor_->get_drop_data_bytes();
61 auto time_now = std::chrono::steady_clock::now();
62 auto time_duration = std::chrono::duration_cast<std::chrono::microseconds>(time_now -
last_print_time_);
77 const auto time_duration_us =
static_cast<double>(time_duration.count());
82 const auto frame_rate = (frame_counts_diff == 0) ? 0.
83 : bytes_read /
static_cast<double>(frame_counts_diff) /
84 static_cast<double>(buffer_size) * 100.;
87 console_->info(
"read|write|drop rate: {} ({:>2.0f}%) | {} | {} {}. Press \"Ctrl-C\" to stop the program \r",
102 const auto scale = (read_speed_MBps < 1.) ? 1000. : 1.;
103 unit_string_ = (read_speed_MBps < 1.) ? fmt::format(fmt::emphasis::bold,
"KB/s")
104 : fmt::format(fmt::emphasis::bold,
"MB/s");
106 fmt::format(fg(fmt::color::yellow) | fmt::emphasis::bold,
"{:>7.5}", scale * read_speed_MBps);
108 fmt::format(fg(fmt::color::spring_green) | fmt::emphasis::bold,
"{:^7.5}", scale * write_speed_MBps);
110 fmt::format(fg(fmt::color::orange_red) | fmt::emphasis::bold,
"{:<7.5}", scale * drop_speed_MBps);
118 spdlog::debug(
"DataMonitor: rate polling clock is killed.");
124 ,
monitor_{ this, &(control->get_io_context()) }
132 spdlog::debug(
"------->> Analysis workflow: main loop starts.");
141 spdlog::trace(
"Analysis workflow: entering workflow loop");
146 catch (tbb::user_abort& ex)
148 spdlog::trace(
"Analysis workflow: {}", ex.what());
150 spdlog::debug(
"<<------- Analysis workflow: main loop is done.");
164 auto expected =
false;
165 spdlog::trace(
"Analysis workflow: trying to stop ... ");
166 spdlog::trace(
"Analysis workflow: current is_stopped status: {}",
is_stopped_.load());
167 if (
is_stopped_.compare_exchange_strong(expected,
true))
169 spdlog::trace(
"------->> Analysis workflow shutdown: Try to monitor and tasks.");
178 spdlog::trace(
"<<------- Analysis workflow shutdown: monitor and tasks successfully stopped.");
185 throw std::runtime_error(
"task_diagram is still nullptr");
192 const auto data_size = read_data.size();
195 auto is_success =
data_queue_.try_emplace(read_data);
199 spdlog::trace(
"Analysis workflow: Data queue is full and message is lost. Try to increase its capacity!");
The primary interface class of SRS-Control.
uint64_t last_frame_counts_
std::string write_speed_string_
double current_received_bytes_MBps_
std::chrono::milliseconds period_
std::string drop_speed_string_
uint64_t last_write_data_bytes_
double current_write_bytes_MBps_
std::chrono::time_point< std::chrono::steady_clock > last_print_time_
asio::steady_timer clock_
gsl::not_null< Handler * > processor_
DataMonitor(Handler *processor, io_context_type *io_context)
std::shared_ptr< spdlog::logger > console_
double current_drop_bytes_MBps_
uint64_t last_read_data_bytes_
auto print_cycle() -> asio::awaitable< void >
uint64_t last_processed_hit_num_
gsl::not_null< io_context_type * > io_context_
uint64_t last_drop_data_bytes_
std::string read_speed_string_
void read_data_once(std::span< BufferElementType > read_data)
auto get_data_workflow() const -> const TaskDiagram &
tbb::concurrent_bounded_queue< process::SerializableMsgBuffer > data_queue_
std::atomic< uint64_t > total_read_data_bytes_
Handler(App *control, std::size_t n_lines=1)
std::unique_ptr< TaskDiagram > task_diagram_
gsl::not_null< App * > app_
common::DataPrintMode print_mode_
std::atomic< bool > is_stopped_
std::atomic< uint64_t > total_drop_data_bytes_
std::atomic< uint64_t > total_frame_counts_
constexpr auto DEFAULT_DATA_QUEUE_SIZE
asio::thread_pool io_context_type