SRS-control 0.1.4
 
Loading...
Searching...
No Matches
Handler.cpp
Go to the documentation of this file.
1#include <boost/asio/co_spawn.hpp>
2#include <boost/asio/detached.hpp>
3#include <fmt/chrono.h>
4#include <fmt/color.h>
5#include <spdlog/pattern_formatter.h>
6#include <spdlog/sinks/stdout_color_sinks.h>
7
10
11namespace srs::workflow
12{
// Member initializer list: keeps the processor whose counters are sampled and the
// io_context on which the monitoring coroutine is spawned. The constructor body is empty.
14 : processor_{ processor }
15 , io_context_{ io_context }
17 {
18 }
19
20 auto DataMonitor::print_cycle() -> asio::awaitable<void>
21 {
22 clock_.expires_after(period_);
23 console_ = spdlog::stdout_color_st("Data Monitor");
24 auto console_format = std::make_unique<spdlog::pattern_formatter>(
25 "[%H:%M:%S] <%n> %v", spdlog::pattern_time_type::local, std::string(""));
26 console_->set_formatter(std::move(console_format));
27 console_->flush_on(spdlog::level::info);
28 console_->set_level(spdlog::level::info);
29 while (true)
30 {
31 co_await clock_.async_wait(asio::use_awaitable);
32 clock_.expires_after(period_);
33
34 auto total_bytes_count = processor_->get_read_data_bytes();
35 auto total_hits_count = processor_->get_processed_hit_number();
36
37 auto time_now = std::chrono::steady_clock::now();
38
39 auto time_duration = std::chrono::duration_cast<std::chrono::microseconds>(time_now - last_print_time_);
40 auto bytes_read = static_cast<double>(total_bytes_count - last_read_data_bytes_);
41 auto hits_processed = static_cast<double>(total_hits_count - last_processed_hit_num_);
42
43 last_read_data_bytes_ = total_bytes_count;
44 last_processed_hit_num_ = total_hits_count;
45 last_print_time_ = time_now;
46
47 const auto time_duration_double = static_cast<double>(time_duration.count());
48 current_received_bytes_MBps_.store(bytes_read / time_duration_double);
49 current_hits_ps_.store(hits_processed / time_duration_double * 1e6);
50
52 console_->info("Data reading rate: {}. Press \"Ctrl-C\" to stop the program.\r", speed_string_);
53 }
54 }
55
56 void DataMonitor::set_speed_string(double speed_MBps)
57 {
58 if (speed_MBps < 1.)
59 {
61 fmt::format(fg(fmt::color::yellow) | fmt::emphasis::bold, "{:>7.5} KB/s", 1000. * speed_MBps);
62 }
63 else
64 {
65 speed_string_ = fmt::format(fg(fmt::color::yellow) | fmt::emphasis::bold, "{:>7.5} MB/s", speed_MBps);
66 }
67 }
68
69 void DataMonitor::start() { asio::co_spawn(*io_context_, print_cycle(), asio::detached); }
70
72 {
// Cancel the wait pending on the polling timer, then leave a debug trace of it.
73 clock_.cancel();
74 spdlog::debug("DataMonitor: rate polling clock is killed.");
75 }
76
// Member initializer list: stores the application pointer and hands this handler together with
// the application's io_context to the data monitor (by pointer) and to the task diagram
// (by reference). The constructor body is empty.
78 : app_{ control }
79 , monitor_{ this, &(control->get_io_context()) }
80 , data_processes_{ this, control->get_io_context() }
81 {
82 }
83
84 void Handler::start(bool is_blocking)
85 {
86 is_stopped_.store(false);
87 spdlog::debug("Workflow starts.");
88 if (print_mode_ == print_speed)
89 {
90 monitor_.start();
91 }
92 analysis_loop(is_blocking);
93 spdlog::trace("Workflow exit start().");
94 }
95
97 // DataProcessor::~DataProcessor() = default;
98
100 {
// Stops the workflow at most once: only the caller that flips is_stopped_ from false to true
// performs the shutdown steps below; any other caller finds the flag already set and does nothing.
101 // CAS operation to guarantee the thread safety
102 auto expected = false;
103 spdlog::trace("Try to stop the workflow. Current is_stopped status: {}", is_stopped_.load());
104 if (is_stopped_.compare_exchange_strong(expected, true))
105 {
106 spdlog::trace("Try to stop data monitor");
107 monitor_.stop();
// NOTE(review): analysis_loop() catches tbb::user_abort, presumably raised by this abort() on
// the queue — confirm.
108 data_queue_.abort();
109 spdlog::trace("Workflow is stopped");
110 }
111 }
112
113 void Handler::read_data_once(std::span<BufferElementType> read_data)
114 {
115 total_read_data_bytes_ += read_data.size();
116 auto is_success = data_queue_.try_emplace(read_data);
117 if (not is_success)
118 {
119 spdlog::critical("Data queue is full and message is lost. Try to increase its capacity!");
120 }
121 }
122
123 void Handler::analysis_loop(bool is_blocking)
124 {
125 try
126 {
127 spdlog::trace("entering workflow loop");
128 // TODO: Use direct binary data
129
130 while (true)
131 {
132 if (is_stopped_.load())
133 {
134 break;
135 }
136 auto analysis_result = data_processes_.analysis_one(data_queue_, is_blocking);
138 print_data();
139
140 auto is_reading = app_->get_status().is_reading.load();
141 if (not(is_reading or analysis_result))
142 {
143 break;
144 }
145 }
146 }
147 catch (tbb::user_abort& ex)
148 {
149 spdlog::trace("Workflow: {}", ex.what());
150 }
151 catch (std::exception& ex)
152 {
153 spdlog::critical("Exception occured: {}", ex.what());
154 app_->set_error_string(ex.what());
155 // app_->exit();
156 }
157 spdlog::debug("Workflow loop is done.\n");
158 }
159
161 {
// Takes the structured data from the task diagram, adds its number of hits to the running
// total and passes the same data on to the monitor.
162 const auto& struct_data = data_processes_.get_struct_data();
163
164 total_processed_hit_numer_ += struct_data.hit_data.size();
165 monitor_.update(struct_data);
166 }
167
169 {
// Logs the data of the task diagram according to print_mode_:
//  - print_raw: the raw data, each element in hexadecimal, joined without separator;
//  - print_header, print_raw or print_all: the header together with received_data_size_;
//  - print_all: additionally every hit and every marker, one log line each.
// Any other mode logs nothing here.
170 const auto& export_data = data_processes_.get_struct_data();
171 const auto& raw_data = data_processes_.get_data<TaskDiagram::raw>();
172 if (print_mode_ == print_raw)
173 {
174 spdlog::info("data: {:x}", fmt::join(raw_data, ""));
175 }
176 if (print_mode_ == print_header or print_mode_ == print_raw or print_mode_ == print_all)
177 {
178 spdlog::info("{}. Data size: {}", export_data.header, received_data_size_);
179 }
180
181 if (print_mode_ == print_all)
182 {
183 for (const auto& hit_data : export_data.hit_data)
184 {
185 spdlog::info("{}", hit_data);
186 }
187 for (const auto& marker_data : export_data.marker_data)
188 {
189 spdlog::info("{}", marker_data);
190 }
191 }
192 }
193} // namespace srs::workflow
std::atomic< double > current_hits_ps_
Definition Handler.hpp:49
std::chrono::milliseconds period_
Definition Handler.hpp:47
std::chrono::time_point< std::chrono::steady_clock > last_print_time_
Definition Handler.hpp:46
std::atomic< uint64_t > last_processed_hit_num_
Definition Handler.hpp:45
asio::steady_timer clock_
Definition Handler.hpp:43
gsl::not_null< Handler * > processor_
Definition Handler.hpp:40
std::atomic< uint64_t > last_read_data_bytes_
Definition Handler.hpp:44
DataMonitor(Handler *processor, io_context_type *io_context)
Definition Handler.cpp:13
std::shared_ptr< spdlog::logger > console_
Definition Handler.hpp:42
auto print_cycle() -> asio::awaitable< void >
Definition Handler.cpp:20
gsl::not_null< io_context_type * > io_context_
Definition Handler.hpp:41
void set_speed_string(double speed_MBps)
Definition Handler.cpp:56
std::atomic< double > current_received_bytes_MBps_
Definition Handler.hpp:48
void read_data_once(std::span< BufferElementType > read_data)
Definition Handler.cpp:113
std::size_t received_data_size_
Definition Handler.hpp:96
tbb::concurrent_bounded_queue< process::SerializableMsgBuffer > data_queue_
Definition Handler.hpp:104
std::atomic< uint64_t > total_read_data_bytes_
Definition Handler.hpp:98
void start(bool is_blocking)
Definition Handler.cpp:84
Handler(App *control)
Definition Handler.cpp:77
std::atomic< uint64_t > total_processed_hit_numer_
Definition Handler.hpp:99
gsl::not_null< App * > app_
Definition Handler.hpp:100
common::DataPrintMode print_mode_
Definition Handler.hpp:97
std::atomic< bool > is_stopped_
Definition Handler.hpp:95
TaskDiagram data_processes_
Definition Handler.hpp:105
void analysis_loop(bool is_blocking)
Definition Handler.cpp:123
asio::thread_pool io_context_type