10#include <boost/asio/error.hpp>
11#include <boost/asio/executor_work_guard.hpp>
12#include <boost/asio/strand.hpp>
13#include <boost/system/detail/error_code.hpp>
19#include <fmt/format.h>
22#include <spdlog/spdlog.h>
36 spdlog::set_pattern(
"[%H:%M:%S][%^%=6l%$][%t] %v");
37 spdlog::info(
"Welcome to SRS Application");
43 spdlog::trace(
"Application: Resetting io context workguard ... ");
46 switch_off_status == std::future_status::ready)
48 spdlog::info(
"Application: FECs has been switched off successfully.");
50 else if (switch_off_status == std::future_status::timeout)
52 spdlog::warn(
"Application: TIMEOUT while waiting for switching off process to finish.");
54 else if (switch_off_status == std::future_status::deferred)
56 spdlog::warn(
"Application: Switching off of FECs is pending.");
58 else if (not switch_off_status.has_value())
60 spdlog::debug(
"Application: FECs don't need switching off due to: {}", switch_off_status.error().message());
67 spdlog::debug(
"Application: Wait until working thread finishes ...");
69 spdlog::debug(
"Application: All tasks in main io_context are finished.");
71 spdlog::debug(
"Application has exited.");
76 spdlog::debug(
"Application: Calling the destructor ");
77 auto err = boost::system::error_code{};
86 [
this](
const boost::system::error_code& error,
auto)
88 if (error == asio::error::operation_aborted)
93 spdlog::info(
"Application: Calling SIGINT from monitoring thread");
95 auto monitoring_action = [
this]()
101 catch (
const std::exception& ex)
103 spdlog::critical(
"Application: Exception on working thread occurred: {}", ex.what());
117 if (status.has_value() and status.value() == std::future_status::timeout)
119 spdlog::warn(
"Application: TIMEOUT during waiting for the closing of input data stream.");
127 spdlog::debug(
"Application: exit is called");
135 switch_on_status == std::future_status::ready)
137 spdlog::debug(
"Application: FECs has been switched on successfully.");
140 else if (switch_on_status == std::future_status::timeout)
143 "Application: TIMEOUT during waiting for FECs to be switched on. Skipping switching off process.");
145 else if (switch_on_status == std::future_status::deferred)
147 spdlog::warn(
"Application: Switching on of FECs is pending. Skipping switching off process.");
151 spdlog::info(
"Application: FECs were not switched on. Skipping switching off process.");
165 spdlog::info(
"Application: Add the remote FEC with ip: {} and port: {}", remote_ip, port_number);
166 auto udp_endpoints = resolver.resolve(udp::v4(), remote_ip, fmt::format(
"{}", port_number));
168 if (udp_endpoints.begin() == udp_endpoints.end())
170 spdlog::debug(
"Application: Failed to add the FEC remote point");
176 template <
typename T>
182 [
this, connection_name](
const std::shared_ptr<connection::FecCommandSocket>& socket)
186 auto fec_connection = std::make_shared<T>(connection_name);
187 fec_connection->set_remote_endpoint(remote_endpoint);
188 fec_connection->send_message_from(socket);
190 auto res_fut = socket->cancel_listen_after(
io_context_);
191 socket->launch_actions();
198 spdlog::info(
"Application: Switching on FEC devices ...");
204 spdlog::info(
"Application: Switching off FEC devices ...");
210 spdlog::info(
"Application: Starting input data stream ...");
213 .transform([
this](
auto socket) {
data_socket_ = std::move(socket); });
214 spdlog::debug(
"data stream is using the buffer size: {}",
config_.data_buffer_size);
217 if (not fut.has_value())
220 "Application: Cannot establish the connection for the input data stream because the local port number "
221 "{} is not available.",
222 config_.fec_data_receive_port);
231 spdlog::info(
"Application: Starting input data analysis workflow ...");
240 if (not switch_future->valid())
242 return std::unexpected{ asio::error::make_error_code(asio::error::basic_errors::invalid_argument) };
244 return switch_future.transform(
245 [](
const std::future<void>& switch_fut)
247 if (switch_fut.valid())
251 return std::future_status::ready;
257 for (
const auto& fec_ips :
config_.remote_fec_ips)
~App() noexcept
Destructor of the App class.
asio::executor_work_guard< io_context_type::executor_type > io_work_guard_
Asio io_context work guard.
void init()
Initialization of internal members.
SwitchFutureType switch_on_future_
void wait_for_finish()
Manually wait for the working thread to finish.
void read_data(bool is_non_stop=true)
Start reading the input data stream from the port specified by srs::Config::fec_data_receive_port....
void set_output_filenames(const std::vector< std::string > &filenames, std::size_t n_lines=1)
Set the output filenames.
void add_remote_fec_endpoint(std::string_view remote_ip, int port_number)
asio::signal_set signal_set_
User signal handler for interrupts.
std::vector< udp::endpoint > remote_fec_endpoints_
Remote endpoints of FEC devices.
void exit_and_switch_off()
void start_workflow()
Start data analysis workflow, triggering data conversions.
std::expected< std::future< void >, boost::system::error_code > SwitchFutureType
std::jthread working_thread_
Main working thread.
void set_print_mode(common::DataPrintMode mode)
Set the print mode.
void switch_off()
Establish the communications to the available FECs to stop the data acquisition.
std::expected< std::future_status, boost::system::error_code > SwitchFutureStatusType
std::unique_ptr< workflow::Handler > workflow_handler_
The handler to the analysis working flow.
App()
Constructor of the App class.
asio::strand< io_context_type::executor_type > fec_strand_
FEC communication strand for synchronous communications.
void set_remote_fec_endpoints()
std::shared_ptr< connection::DataSocket > data_socket_
Communication to the main input data stream.
static auto wait_for_switch_action(const SwitchFutureType &switch_future) -> SwitchFutureStatusType
SwitchFutureType switch_off_future_
io_context_type io_context_
Asio io_context that manages the task scheduling and network IO.
std::jthread workflow_thread_
Main thread to run workflow.
void switch_on()
Establish the communications to the available FECs to start the data acquisition.
auto switch_FECs(std::string_view connection_name) -> SwitchFutureType
void wait_for_reading_finish()
void action_after_destructor()
static auto create(int port_number, io_context_type &io_context, Args... args) -> std::expected< std::shared_ptr< SocketType >, boost::system::error_code >
~AppExitHelper() noexcept
Destructor calling srs::App::end_of_work method.
constexpr auto DEFAULT_STATUS_WAITING_TIME_SECONDS
DataPrintMode
Print mode of the status line.
constexpr auto DEFAULT_SRS_CONTROL_PORT