SRS-control 0.1.4
Loading...
Searching...
No Matches
Application.cpp
Go to the documentation of this file.
1#include "srs/Application.hpp"
9
10#include <boost/asio/error.hpp>
11#include <boost/asio/executor_work_guard.hpp>
12#include <boost/asio/strand.hpp>
13#include <boost/system/detail/error_code.hpp>
14#include <chrono>
15#include <cstddef>
16#include <exception>
17#include <expected>
18#include <fmt/base.h>
19#include <fmt/format.h>
20#include <future>
21#include <memory>
22#include <spdlog/spdlog.h>
23#include <string>
24#include <string_view>
25#include <thread>
26#include <vector>
27
28namespace srs
29{
30 internal::AppExitHelper::~AppExitHelper() noexcept { app_->action_after_destructor(); }
31
33 : io_work_guard_{ asio::make_work_guard(io_context_) }
34 , fec_strand_{ asio::make_strand(io_context_.get_executor()) }
35 {
36 spdlog::set_pattern("[%H:%M:%S][%^%=6l%$][%t] %v");
37 spdlog::info("Welcome to SRS Application");
38 workflow_handler_ = std::make_unique<workflow::Handler>(this);
39 }
40
42 {
43 spdlog::trace("Application: Resetting io context workguard ... ");
44 io_work_guard_.reset();
45 if (auto switch_off_status = wait_for_switch_action(switch_off_future_);
46 switch_off_status == std::future_status::ready)
47 {
48 spdlog::info("Application: FECs has been switched off successfully.");
49 }
50 else if (switch_off_status == std::future_status::timeout)
51 {
52 spdlog::warn("Application: TIMEOUT while waiting for switching off process to finish.");
53 }
54 else if (switch_off_status == std::future_status::deferred)
55 {
56 spdlog::warn("Application: Switching off of FECs is pending.");
57 }
58 else if (not switch_off_status.has_value())
59 {
60 spdlog::debug("Application: FECs don't need switching off due to: {}", switch_off_status.error().message());
61 }
62
63 // TODO: set a timer
64 //
65 if (working_thread_.joinable())
66 {
67 spdlog::debug("Application: Wait until working thread finishes ...");
68 working_thread_.join();
69 spdlog::debug("Application: All tasks in main io_context are finished.");
70 }
71 spdlog::debug("Application has exited.");
72 }
73
74 App::~App() noexcept
75 {
76 spdlog::debug("Application: Calling the destructor ");
77 auto err = boost::system::error_code{};
78 signal_set_.cancel(err);
79 signal_set_.clear(err);
80 }
81
82 void App::init()
83 {
85 signal_set_.async_wait(
86 [this](const boost::system::error_code& error, auto)
87 {
88 if (error == asio::error::operation_aborted)
89 {
90 return;
91 }
93 spdlog::info("Application: Calling SIGINT from monitoring thread");
94 });
95 auto monitoring_action = [this]()
96 {
97 try
98 {
99 io_context_.join();
100 }
101 catch (const std::exception& ex)
102 {
103 spdlog::critical("Application: Exception on working thread occurred: {}", ex.what());
104 }
105 };
106 working_thread_ = std::jthread{ monitoring_action };
107 }
108
110 {
111 if (data_socket_ == nullptr)
112 {
113 return;
114 }
115 data_socket_->cancel();
116 auto status = data_socket_->wait_for_listen_finish(common::DEFAULT_STATUS_WAITING_TIME_SECONDS);
117 if (status.has_value() and status.value() == std::future_status::timeout)
118 {
119 spdlog::warn("Application: TIMEOUT during waiting for the closing of input data stream.");
120 }
121 }
122
123 // This will be called by Ctrl-C interrupt
125 {
126 fmt::println("");
127 spdlog::debug("Application: exit is called");
129 workflow_handler_->stop();
130
131 // Turn off SRS data acquisition
133
134 if (auto switch_on_status = wait_for_switch_action(switch_on_future_);
135 switch_on_status == std::future_status::ready)
136 {
137 spdlog::debug("Application: FECs has been switched on successfully.");
138 switch_off();
139 }
140 else if (switch_on_status == std::future_status::timeout)
141 {
142 spdlog::warn(
143 "Application: TIMEOUT during waiting for FECs to be switched on. Skipping switching off process.");
144 }
145 else if (switch_on_status == std::future_status::deferred)
146 {
147 spdlog::warn("Application: Switching on of FECs is pending. Skipping switching off process.");
148 }
149 else
150 {
151 spdlog::info("Application: FECs were not switched on. Skipping switching off process.");
152 }
153 }
154
155 void App::set_print_mode(common::DataPrintMode mode) { workflow_handler_->set_print_mode(mode); }
156 void App::set_output_filenames(const std::vector<std::string>& filenames, std::size_t n_lines)
157 {
158 workflow_handler_->set_n_pipelines(n_lines);
159 workflow_handler_->set_output_filenames(filenames);
160 }
161
162 void App::add_remote_fec_endpoint(std::string_view remote_ip, int port_number)
163 {
164 auto resolver = udp::resolver{ io_context_ };
165 spdlog::info("Application: Add the remote FEC with ip: {} and port: {}", remote_ip, port_number);
166 auto udp_endpoints = resolver.resolve(udp::v4(), remote_ip, fmt::format("{}", port_number));
167
168 if (udp_endpoints.begin() == udp_endpoints.end())
169 {
170 spdlog::debug("Application: Failed to add the FEC remote point");
171 return;
172 }
173 remote_fec_endpoints_.push_back(*udp_endpoints.begin());
174 }
175
176 template <typename T>
177 auto App::switch_FECs(std::string_view connection_name) -> SwitchFutureType
178 {
181 .transform(
182 [this, connection_name](const std::shared_ptr<connection::FecCommandSocket>& socket)
183 {
184 for (const auto& remote_endpoint : remote_fec_endpoints_)
185 {
186 auto fec_connection = std::make_shared<T>(connection_name);
187 fec_connection->set_remote_endpoint(remote_endpoint);
188 fec_connection->send_message_from(socket);
189 }
190 auto res_fut = socket->cancel_listen_after(io_context_);
191 socket->launch_actions();
192 return res_fut;
193 });
194 }
195
197 {
198 spdlog::info("Application: Switching on FEC devices ...");
200 }
201
203 {
204 spdlog::info("Application: Switching off FEC devices ...");
206 }
207
208 void App::read_data(bool /*is_non_stop*/)
209 {
210 spdlog::info("Application: Starting input data stream ...");
212 config_.fec_data_receive_port, io_context_, workflow_handler_.get())
213 .transform([this](auto socket) { data_socket_ = std::move(socket); });
214 spdlog::debug("data stream is using the buffer size: {}", config_.data_buffer_size);
215 data_socket_->set_buffer_size(config_.data_buffer_size);
216
217 if (not fut.has_value())
218 {
219 spdlog::critical(
220 "Application: Cannot establish the connection for the input data stream because the local port number "
221 "{} is not available.",
222 config_.fec_data_receive_port);
223 }
224 }
225
227 {
228 workflow_thread_ = std::jthread(
229 [this]()
230 {
231 spdlog::info("Application: Starting input data analysis workflow ...");
232 workflow_handler_->start();
233 });
234 }
235
237
239 {
240 if (not switch_future->valid())
241 {
242 return std::unexpected{ asio::error::make_error_code(asio::error::basic_errors::invalid_argument) };
243 }
244 return switch_future.transform(
245 [](const std::future<void>& switch_fut)
246 {
247 if (switch_fut.valid())
248 {
249 return switch_fut.wait_for(common::DEFAULT_STATUS_WAITING_TIME_SECONDS);
250 };
251 return std::future_status::ready;
252 });
253 }
254
256 {
257 for (const auto& fec_ips : config_.remote_fec_ips)
258 {
260 }
261 }
262} // namespace srs
~App() noexcept
Destructor of the App class.
asio::executor_work_guard< io_context_type::executor_type > io_work_guard_
Asio io_context work guard.
void init()
Initialization of internal members.
SwitchFutureType switch_on_future_
void wait_for_finish()
Manually wait for the working thread to finish.
void read_data(bool is_non_stop=true)
Start reading the input data stream from the port specified by srs::Config::fec_data_receive_port....
void set_output_filenames(const std::vector< std::string > &filenames, std::size_t n_lines=1)
Set the output filenames.
void add_remote_fec_endpoint(std::string_view remote_ip, int port_number)
asio::signal_set signal_set_
User signal handler for interrupts.
std::vector< udp::endpoint > remote_fec_endpoints_
Remote endpoints of FEC devices.
void exit_and_switch_off()
void start_workflow()
Start data analysis workflow, triggering data conversions.
std::expected< std::future< void >, boost::system::error_code > SwitchFutureType
std::jthread working_thread_
Main working thread.
void set_print_mode(common::DataPrintMode mode)
Set the print mode.
void switch_off()
Establish the communications to the available FECs to stop the data acquisition.
std::expected< std::future_status, boost::system::error_code > SwitchFutureStatusType
std::unique_ptr< workflow::Handler > workflow_handler_
The handler to the analysis working flow.
App()
Constructor of the App class.
asio::strand< io_context_type::executor_type > fec_strand_
FEC communication strand for synchronous communications.
void set_remote_fec_endpoints()
std::shared_ptr< connection::DataSocket > data_socket_
Communication to the main input data stream.
static auto wait_for_switch_action(const SwitchFutureType &switch_future) -> SwitchFutureStatusType
SwitchFutureType switch_off_future_
Config config_
io_context_type io_context_
Asio io_context that manages the task scheduling and network IO.
std::jthread workflow_thread_
Main thread to run workflow.
void switch_on()
Establish the communications to the available FECs to start the data acquisition.
auto switch_FECs(std::string_view connection_name) -> SwitchFutureType
void wait_for_reading_finish()
void action_after_destructor()
static auto create(int port_number, io_context_type &io_context, Args... args) -> std::expected< std::shared_ptr< SocketType >, boost::system::error_code >
~AppExitHelper() noexcept
Destructor calling srs::App::end_of_work method.
constexpr auto DEFAULT_STATUS_WAITING_TIME_SECONDS
DataPrintMode
Print mode of the status line.
constexpr auto DEFAULT_SRS_CONTROL_PORT