SRS-control 0.1.4
Loading...
Searching...
No Matches
Handler.cpp
Go to the documentation of this file.
2#include "srs/Application.hpp"
7#include <boost/asio/awaitable.hpp>
8#include <boost/asio/detached.hpp>
9#include <boost/asio/impl/co_spawn.hpp>
10#include <boost/asio/use_awaitable.hpp>
11#include <chrono>
12#include <cstddef>
13#include <fmt/color.h>
14#include <memory>
15#include <span>
16#include <spdlog/common.h>
17#include <spdlog/pattern_formatter.h>
18#include <spdlog/sinks/stdout_color_sinks.h>
19#ifdef USE_ONEAPI_TBB
20#include <oneapi/tbb/concurrent_queue.h>
21#else
22#include <tbb/concurrent_queue.h>
23#endif
24
25#include <spdlog/spdlog.h>
26#include <stdexcept>
27#include <string>
28#include <utility>
29
30namespace srs::workflow
31{
33 : processor_{ processor }
34 , io_context_{ io_context }
36 {
37 }
38
// Periodic rate-reporting coroutine.
//
// Once per `period_` it samples the cumulative counters exposed by `processor_`
// (and the written-bytes counter of its data workflow), turns the difference to
// the previous sample into rates, caches those rates in the `current_*` members
// and logs one status line through `console_`.
//
// Returns: asio::awaitable<void>. The loop below has no exit condition of its own.
// NOTE(review): presumably the coroutine ends when `clock_.cancel()` makes the
// pending `async_wait` complete with an error that `use_awaitable` surfaces as an
// exception -- confirm against the Asio documentation.
// NOTE(review): listing lines 86, 88 and 90-92 are absent from this rendering, so
// the arguments of the `console_->info(...)` call are only partly visible here.
39 auto DataMonitor::print_cycle() -> asio::awaitable<void>
40 {
41 clock_.expires_after(period_);
    // NOTE(review): spdlog's factory functions register the logger by name; if this
    // coroutine is spawned a second time the name "Data Monitor" may already be
    // taken -- confirm whether that throws and guard if so.
42 console_ = spdlog::stdout_color_st("Data Monitor");
    // Third constructor argument is the end-of-line string: it is left empty here and
    // the message itself ends in '\r' (see the info() call below), presumably so each
    // report overwrites the previous one on the terminal.
43 auto console_format = std::make_unique<spdlog::pattern_formatter>(
44 "[%H:%M:%S] <%n> %v", spdlog::pattern_time_type::local, std::string(""));
45 console_->set_formatter(std::move(console_format));
46 console_->flush_on(spdlog::level::info);
47 console_->set_level(spdlog::level::info);
    // Loop invariants: configured buffer size and the workflow whose written-bytes
    // counter is polled.
48 const auto buffer_size = processor_->get_app().get_config().data_buffer_size;
49 const auto& task_workflow = processor_->get_data_workflow();
50 while (true)
51 {
    // Sleep until the timer fires, then immediately re-arm it for the next period.
52 co_await clock_.async_wait(asio::use_awaitable);
53 clock_.expires_after(period_);
54
    // Snapshot of the cumulative (since-start) counters.
55 auto read_total_bytes_count = processor_->get_read_data_bytes();
56 auto read_total_hits_count = processor_->get_processed_hit_number();
57 auto frame_counts = processor_->get_frame_counts();
58 auto write_total_bytes_count = task_workflow.get_data_bytes();
59 auto drop_total_bytes_count = processor_->get_drop_data_bytes();
60
    // Elapsed time since the previous report, measured rather than assumed to equal
    // `period_`. NOTE(review): on the first pass the result depends on the initial
    // value of `last_print_time_`, which is not visible in this file.
61 auto time_now = std::chrono::steady_clock::now();
62 auto time_duration = std::chrono::duration_cast<std::chrono::microseconds>(time_now - last_print_time_);
63
    // Increments since the previous report.
64 auto bytes_read = static_cast<double>(read_total_bytes_count - last_read_data_bytes_);
65 auto bytes_write = static_cast<double>(write_total_bytes_count - last_write_data_bytes_);
66 auto bytes_drop = static_cast<double>(drop_total_bytes_count - last_drop_data_bytes_);
67 auto hits_processed = static_cast<double>(read_total_hits_count - last_processed_hit_num_);
68 auto frame_counts_diff = frame_counts - last_frame_counts_;
69
    // Remember this sample as the baseline for the next report.
70 last_read_data_bytes_ = read_total_bytes_count;
71 last_write_data_bytes_ = write_total_bytes_count;
72 last_drop_data_bytes_ = drop_total_bytes_count;
73 last_processed_hit_num_ = read_total_hits_count;
74 last_frame_counts_ = frame_counts;
75 last_print_time_ = time_now;
76
    // bytes per microsecond is numerically equal to 10^6 bytes per second, hence the
    // "MBps" member names; the hit rate is scaled by 1e6 to get hits per second.
    // A zero elapsed time would make these divisions yield inf/NaN.
77 const auto time_duration_us = static_cast<double>(time_duration.count());
78 current_received_bytes_MBps_ = bytes_read / time_duration_us;
79 current_write_bytes_MBps_ = bytes_write / time_duration_us;
80 current_drop_bytes_MBps_ = bytes_drop / time_duration_us;
81 current_hits_ps_ = hits_processed / time_duration_us * 1e6;
    // Despite its name this is not a rate: it is the average number of bytes read per
    // frame in this interval, expressed as a percentage of `buffer_size` (0 when no
    // frame was counted, which also avoids dividing by zero).
82 const auto frame_rate = (frame_counts_diff == 0) ? 0.
83 : bytes_read / static_cast<double>(frame_counts_diff) /
84 static_cast<double>(buffer_size) * 100.;
85
87 console_->info("read|write|drop rate: {} ({:>2.0f}%) | {} | {} {}. Press \"Ctrl-C\" to stop the program \r",
89 frame_rate,
93 }
94 }
95
97 {
98 const auto read_speed_MBps = current_received_bytes_MBps_;
99 const auto write_speed_MBps = current_write_bytes_MBps_;
100 const auto drop_speed_MBps = current_drop_bytes_MBps_;
101
102 const auto scale = (read_speed_MBps < 1.) ? 1000. : 1.;
103 unit_string_ = (read_speed_MBps < 1.) ? fmt::format(fmt::emphasis::bold, "KB/s")
104 : fmt::format(fmt::emphasis::bold, "MB/s");
106 fmt::format(fg(fmt::color::yellow) | fmt::emphasis::bold, "{:>7.5}", scale * read_speed_MBps);
108 fmt::format(fg(fmt::color::spring_green) | fmt::emphasis::bold, "{:^7.5}", scale * write_speed_MBps);
110 fmt::format(fg(fmt::color::orange_red) | fmt::emphasis::bold, "{:<7.5}", scale * drop_speed_MBps);
111 }
112
// Starts the periodic report: spawns print_cycle() as a coroutine on the executor
// behind `io_context_` (an asio::thread_pool according to the io_context_type alias)
// with the `asio::detached` completion token, so nothing observes its completion
// or any exception it ends with.
113 void DataMonitor::start() { asio::co_spawn(*io_context_, print_cycle(), asio::detached); }
114
116 {
117 clock_.cancel();
118 spdlog::debug("DataMonitor: rate polling clock is killed.");
119 }
120
// Constructs the handler.
//
// control  Owning application. It is dereferenced in the initializer list (to fetch
//          the io context handed to `monitor_`), so it must not be null.
// n_lines  Stored in `n_lines_` and later passed to the TaskDiagram constructor.
//
// `monitor_` is given `this`, so it keeps a pointer back to this Handler.
// NOTE(review): listing line 126 (inside the constructor body) is absent from this
// rendering; whatever statement it holds is not documented here.
121 Handler::Handler(App* control, std::size_t n_lines)
122 : n_lines_{ n_lines }
123 , app_{ control }
124 , monitor_{ this, &(control->get_io_context()) }
125 {
127 }
128
130 {
131 is_stopped_.store(false);
132 spdlog::debug("------->> Analysis workflow: main loop starts.");
133 task_diagram_ = std::make_unique<TaskDiagram>(this, n_lines_);
134
135 if (print_mode_ == print_speed)
136 {
137 monitor_.start();
138 }
139 try
140 {
141 spdlog::trace("Analysis workflow: entering workflow loop");
142 // TODO: Use direct binary data
143
144 task_diagram_->construct_taskflow_and_run(data_queue_, is_stopped_);
145 }
146 catch (tbb::user_abort& ex)
147 {
148 spdlog::trace("Analysis workflow: {}", ex.what());
149 }
150 spdlog::debug("<<------- Analysis workflow: main loop is done.");
151 }
152
154 {
155 // stop();
156 spdlog::debug("Analysis workflow: total read data bytes: {} ", total_read_data_bytes_.load());
157 spdlog::debug("Analysis workflow: total frame counts: {} ", total_frame_counts_.load());
158 }
159 // DataProcessor::~DataProcessor() = default;
160
162 {
163 // CAS operation to guarantee the thread safety
164 auto expected = false;
165 spdlog::trace("Analysis workflow: trying to stop ... ");
166 spdlog::trace("Analysis workflow: current is_stopped status: {}", is_stopped_.load());
167 if (is_stopped_.compare_exchange_strong(expected, true))
168 {
169 spdlog::trace("------->> Analysis workflow shutdown: Try to monitor and tasks.");
170 monitor_.stop();
171
172 while (not task_diagram_->is_taskflow_abort_ready())
173 {
174 // spdlog::trace("waiting for task diagram to be abort ready");
175 }
176 data_queue_.abort();
177
178 spdlog::trace("<<------- Analysis workflow shutdown: monitor and tasks successfully stopped.");
179 }
180 }
// Body of `Handler::get_data_workflow() const -> const TaskDiagram&`; the signature
// line (listing line 181) is absent from this rendering.
// Returns a reference to the object owned by `task_diagram_`.
// Throws std::runtime_error when `task_diagram_` has not been created yet.
182 {
183 if (task_diagram_ == nullptr)
184 {
185 throw std::runtime_error("task_diagram is still nullptr");
186 }
187 return *task_diagram_;
188 }
189
// Accounts for one received buffer and offers it to the processing queue.
//
// read_data  View of the received data; its element count is what gets added to
//            the byte counters. NOTE(review): that equals a byte count only if
//            BufferElementType is one byte wide -- confirm.
//
// The read counter is increased unconditionally. If the non-blocking enqueue is
// rejected, the same size is added to the drop counter and the data is discarded
// with only a trace-level log message.
// NOTE(review): listing line 194 is absent from this rendering.
190 void Handler::read_data_once(std::span<BufferElementType> read_data)
191 {
192 const auto data_size = read_data.size();
193 total_read_data_bytes_ += data_size;
    // try_emplace does not wait for room; it reports failure instead.
195 auto is_success = data_queue_.try_emplace(read_data);
196 if (not is_success)
197 {
198 total_drop_data_bytes_ += data_size;
199 spdlog::trace("Analysis workflow: Data queue is full and message is lost. Try to increase its capacity!");
200 }
201 }
202} // namespace srs::workflow
The primary interface class of SRS-Control.
std::string write_speed_string_
Definition Handler.hpp:69
std::chrono::milliseconds period_
Definition Handler.hpp:58
std::string drop_speed_string_
Definition Handler.hpp:70
std::chrono::time_point< std::chrono::steady_clock > last_print_time_
Definition Handler.hpp:57
asio::steady_timer clock_
Definition Handler.hpp:56
gsl::not_null< Handler * > processor_
Definition Handler.hpp:53
DataMonitor(Handler *processor, io_context_type *io_context)
Definition Handler.cpp:32
std::shared_ptr< spdlog::logger > console_
Definition Handler.hpp:55
auto print_cycle() -> asio::awaitable< void >
Definition Handler.cpp:39
gsl::not_null< io_context_type * > io_context_
Definition Handler.hpp:54
std::string read_speed_string_
Definition Handler.hpp:68
void read_data_once(std::span< BufferElementType > read_data)
Definition Handler.cpp:190
auto get_data_workflow() const -> const TaskDiagram &
Definition Handler.cpp:181
tbb::concurrent_bounded_queue< process::SerializableMsgBuffer > data_queue_
Definition Handler.hpp:134
std::atomic< uint64_t > total_read_data_bytes_
Definition Handler.hpp:125
Handler(App *control, std::size_t n_lines=1)
Definition Handler.cpp:121
std::unique_ptr< TaskDiagram > task_diagram_
Definition Handler.hpp:135
gsl::not_null< App * > app_
Definition Handler.hpp:129
common::DataPrintMode print_mode_
Definition Handler.hpp:124
std::atomic< bool > is_stopped_
Definition Handler.hpp:122
std::atomic< uint64_t > total_drop_data_bytes_
Definition Handler.hpp:126
std::atomic< uint64_t > total_frame_counts_
Definition Handler.hpp:128
constexpr auto DEFAULT_DATA_QUEUE_SIZE
asio::thread_pool io_context_type