SRS-control 0.1.4
Loading...
Searching...
No Matches
Application.cpp
Go to the documentation of this file.
1#include <string_view>
2
3#include <fmt/ranges.h>
4#include <spdlog/spdlog.h>
5
6#include <srs/Application.hpp>
9
10namespace srs
11{
13 : io_work_guard_{ asio::make_work_guard(io_context_) }
14 , fec_strand_{ asio::make_strand(io_context_.get_executor()) }
15 {
16 spdlog::set_pattern("[%H:%M:%S] [%^%=7l%$] [thread %t] %v");
17 spdlog::info("Welcome to SRS Application");
18 workflow_handler_ = std::make_unique<workflow::Handler>(this);
19 }
20
21 AppExitHelper::~AppExitHelper() noexcept { app_->end_of_work(); }
22
24 {
25 spdlog::trace("Application: Resetting io context workguard ... ");
26 io_work_guard_.reset();
27 if (not status_.is_acq_off.load())
28 {
29 spdlog::critical(
30 "Failed to close srs system! Please manually close the system with\n\nsrs_control --acq-off\n");
31 }
32 // TODO: set a timer
33 //
34 if (working_thread_.joinable())
35 {
36 spdlog::debug("Application: Wait until working thread finishes ...");
37 // io_context_.stop();
38 working_thread_.join();
39 spdlog::debug("io context is stoped");
40 }
41 spdlog::debug("Application: working thread is finished");
42 spdlog::debug("Application has exited.");
43 }
44
45 App::~App() noexcept
46 {
47 spdlog::debug("Calling the destructor of App ... ");
48 signal_set_.cancel();
49
50 // Turn off SRS data acquisition
52
53 if (status_.is_acq_on.load())
54 {
55 spdlog::debug("Turning srs system off ...");
56 switch_off();
57 }
58 else
59 {
61 }
62 set_status_acq_on(false);
63 }
64
65 void App::init()
66 {
68 signal_set_.async_wait(
69 [this](const boost::system::error_code& error, auto)
70 {
71 if (error == asio::error::operation_aborted)
72 {
73 return;
74 }
75 exit();
76 spdlog::info("Calling SIGINT from monitoring thread");
77 });
78 auto monitoring_action = [this]()
79 {
80 try
81 {
82 io_context_.join();
83 }
84 catch (const std::exception& ex)
85 {
86 spdlog::critical("Exception on working thread occured: {}", ex.what());
87 }
88 };
89 working_thread_ = std::jthread{ monitoring_action };
90 }
91
93 {
94
95 auto res = status_.wait_for_status(
96 [](const auto& status)
97 {
98 spdlog::debug("Waiting for reading status false");
99 return not status.is_reading.load();
100 });
101
102 if (not res)
103 {
104 spdlog::critical("TIMEOUT during waiting for status is_reading false.");
105 }
106 }
107
108 // This will be called by Ctrl-C interrupt
110 {
111 spdlog::debug("App::exit is called");
112 status_.is_on_exit.store(true);
114 workflow_handler_->stop();
115 }
116
117 void App::set_print_mode(common::DataPrintMode mode) { workflow_handler_->set_print_mode(mode); }
118 void App::set_output_filenames(const std::vector<std::string>& filenames)
119 {
120 workflow_handler_->set_output_filenames(filenames);
121 }
122
123 void App::set_remote_endpoint(std::string_view remote_ip, int port_number)
124 {
125 auto resolver = udp::resolver{ io_context_ };
126 spdlog::debug("Set the remote socket with ip: {} and port: {}", remote_ip, port_number);
127 auto udp_endpoints = resolver.resolve(udp::v4(), remote_ip, fmt::format("{}", port_number));
128 remote_endpoint_ = *udp_endpoints.begin();
129 }
130
131 void App::add_remote_fec_endpoint(std::string_view remote_ip, int port_number)
132 {
133 auto resolver = udp::resolver{ io_context_ };
134 spdlog::debug("Add the remote FEC with ip: {} and port: {}", remote_ip, port_number);
135 auto udp_endpoints = resolver.resolve(udp::v4(), remote_ip, fmt::format("{}", port_number));
136
137 if (udp_endpoints.begin() == udp_endpoints.end())
138 {
139 spdlog::debug("Failed to add the FEC remote point");
140 return;
141 }
142 remote_fec_endpoints_.push_back(*udp_endpoints.begin());
143 }
144
146 {
147 auto connection_info = connection::Info{ this };
148 connection_info.local_port_number = configurations_.fec_control_local_port;
149
150 for (const auto& remote_endpoint : remote_fec_endpoints_)
151 {
152 auto fec_connection = std::make_shared<connection::Starter>(connection_info);
153 fec_connection->set_remote_endpoint(remote_endpoint);
154 asio::post(fec_strand_, [fec_connection = std::move(fec_connection)]() { fec_connection->acq_on(); });
155 }
156 }
157
159 {
160 const auto waiting_time = std::chrono::seconds{ 4 };
161 auto is_ok = wait_for_status(
162 [](const Status& status)
163 {
164 spdlog::debug("Waiting for acq_on status true ...");
165 return status.is_acq_on.load();
166 },
167 waiting_time);
168
169 if (not is_ok)
170 {
171 throw std::runtime_error("TIMEOUT during waiting for status is_acq_on true.");
172 }
173 auto connection_info = connection::Info{ this };
174 connection_info.local_port_number = configurations_.fec_control_local_port;
175
176 for (const auto& remote_endpoint : remote_fec_endpoints_)
177 {
178 auto fec_connection = std::make_shared<connection::Stopper>(connection_info);
179 fec_connection->set_remote_endpoint(remote_endpoint);
180 fec_connection->acq_off();
181 }
182 spdlog::info("SRS system is turned off");
184 }
185
186 void App::read_data(bool is_non_stop)
187 {
188 auto connection_info = connection::Info{ this };
189 connection_info.local_port_number = configurations_.fec_data_receive_port;
190 data_reader_ = std::make_shared<connection::DataReader>(connection_info, workflow_handler_.get());
191 data_reader_->start(is_non_stop);
192 }
193
194 void App::start_workflow(bool is_blocking) { workflow_handler_->start(is_blocking); }
196
198 {
199 for (const auto& fec_ips : configurations_.remote_fec_ips)
200 {
202 }
203 }
204} // namespace srs
~AppExitHelper() noexcept
~App() noexcept
asio::executor_work_guard< io_context_type::executor_type > io_work_guard_
void init()
void wait_for_finish()
void read_data(bool is_non_stop=true)
void set_remote_endpoint(std::string_view remote_ip, int port_number)
Config configurations_
void exit()
void set_status_acq_on(bool val=true)
std::shared_ptr< connection::DataReader > data_reader_
void end_of_work()
Status status_
void add_remote_fec_endpoint(std::string_view remote_ip, int port_number)
asio::signal_set signal_set_
std::vector< udp::endpoint > remote_fec_endpoints_
std::jthread working_thread_
void set_print_mode(common::DataPrintMode mode)
void switch_off()
void set_output_filenames(const std::vector< std::string > &filenames)
std::unique_ptr< workflow::Handler > workflow_handler_
void start_workflow(bool is_blocking=true)
auto wait_for_status(auto &&condition, std::chrono::seconds time_duration=common::DEFAULT_STATUS_WAITING_TIME_SECONDS) -> bool
asio::strand< io_context_type::executor_type > fec_strand_
udp::endpoint remote_endpoint_
void set_remote_fec_endpoints()
io_context_type io_context_
void switch_on()
void wait_for_reading_finish()
void set_status_acq_off(bool val=true)
constexpr auto DEFAULT_SRS_CONTROL_PORT
std::atomic< bool > is_acq_on
Definition AppStatus.hpp:11