SRS-control 0.1.4
Loading...
Searching...
No Matches
ConnectionBase.hpp
Go to the documentation of this file.
1#pragma once
2
3#include <boost/asio/experimental/awaitable_operators.hpp>
4#include <boost/asio/experimental/coro.hpp>
5#include <fmt/ranges.h>
6#include <gsl/gsl-lite.hpp>
7#include <spdlog/spdlog.h>
8#include <srs/Application.hpp>
13
14namespace srs::connection
15{
16 // Derive from enable_shared_from_this so the object stays alive while a coroutine still uses it.
// Base class of a UDP connection; buffer_size is the number of elements in the read message buffer.
// NOTE(review): this listing skips several original lines (embedded numbers 64-66, 72, 74, 79 and
// 82-83 are absent). Members used below such as local_port_number_, timeout_seconds_,
// read_msg_buffer_, write_msg_buffer_, counter_ and close_socket() are presumably declared on
// those lines - confirm against the real header.
17 template <int buffer_size = common::SMALL_READ_MSG_BUFFER_SIZE>
18 class Base : public std::enable_shared_from_this<Base<buffer_size>>
19 {
20 public:
    // Copies the local port number and the App pointer out of `info`, takes ownership of `name`
    // and logs the creation together with the buffer size.
21 explicit Base(const Info& info, std::string name)
22 : local_port_number_{ info.local_port_number }
23 , name_{ std::move(name) }
24 , app_{ info.control }
25 {
26 spdlog::debug("Creating connection {} with buffer size: {}", name_, buffer_size);
27 }
28
29 // Hooks that a derived class may redefine (they are not virtual).
    // Default data handler: does nothing. listen_message() calls it with a span over the bytes
    // of each received message.
30 void read_data_handle(std::span<BufferElementType> read_data) {}
    // Closes the connection by delegating to close_socket().
31 void close() { close_socket(); }
    // Default failure hook: only logs a debug message. listen_message() calls it after a timeout
    // when the connection is not continuous.
32 void on_fail() { spdlog::debug("default on_fail is called!"); }
    // Executor of the io_context obtained from the App.
33 auto get_executor() { return app_->get_io_context().get_executor(); }
34
    // Both functions take an explicit object parameter (`this auto&& self`), so `self` is deduced
    // as the type of the object they are called on.
35 void listen(this auto&& self, bool is_non_stop = false);
36 void communicate(this auto&& self, const std::vector<CommunicateEntryType>& data, uint16_t address);
37
    // Coroutine that, on each resumption, sends the string passed in (nothing if it is empty) and
    // yields the number of bytes sent; resuming it with an empty optional closes the connection.
38 auto send_continuous_message() -> asio::experimental::coro<int(std::optional<std::string_view>)>;
39
40 // Setters:
41 void set_socket(std::unique_ptr<asio::ip::udp::socket> socket) { socket_ = std::move(socket); }
42 void set_remote_endpoint(asio::ip::udp::endpoint endpoint) { remote_endpoint_ = std::move(endpoint); }
43 void set_timeout_seconds(int val) { timeout_seconds_ = val; }
44
    // Stores a non-owning span over the elements of `msg`; nothing is copied, so `msg` has to
    // outlive every later use of the stored span.
45 void set_send_message(const RangedData auto& msg)
46 {
47 continuous_send_msg_ = std::span{ msg.begin(), msg.end() };
48 }
49
50 // Getters:
51 [[nodiscard]] auto get_read_msg_buffer() const -> const ReadBufferType<buffer_size>&
52 {
53 return read_msg_buffer_;
54 }
55 [[nodiscard]] auto get_name() const -> const std::string& { return name_; }
56 [[nodiscard]] auto get_app() -> App& { return *app_; }
    // Dereferences socket_ without a null check, so a socket must have been set or created before.
57 auto get_socket() -> const udp::socket& { return *socket_; }
58 auto get_remote_endpoint() -> const udp::endpoint& { return remote_endpoint_; }
59 [[nodiscard]] auto get_local_port_number() const -> int { return local_port_number_; }
60 [[nodiscard]] auto is_continuous() const -> bool { return is_continuous_; }
61
62 protected:
    // Creates a UDP/IPv4 socket bound to `port_number` and records the port it was bound to.
63 auto new_shared_socket(int port_number) -> std::unique_ptr<udp::socket>;
67 auto is_connection_ok() -> bool { return is_connection_ok_; }
68
69 private:
70 bool is_continuous_ = false;
71 bool is_connection_ok_ = true;
    // Set once the socket has been closed; listen_message() checks it before each receive.
73 std::atomic<bool> is_socket_closed_{ false };
75 std::string name_ = "ConnectionBase";
76 gsl::not_null<App*> app_;
77 std::unique_ptr<udp::socket> socket_;
78 udp::endpoint remote_endpoint_;
    // Non-owning view assigned by set_send_message().
80 std::span<const char> continuous_send_msg_;
    // Created inside signal_handling(); cancelled when the socket gets closed.
81 std::unique_ptr<asio::signal_set> signal_set_;
84
    // Serializes a header (starting with counter_ and containing `address`) and then `data` into
    // write_msg_buffer_.
85 void encode_write_msg(const std::vector<CommunicateEntryType>& data, uint16_t address);
    // Coroutine that waits for SIGINT and closes the connection when the signal arrives.
86 static auto signal_handling(SharedConnectionPtr auto connection) -> asio::awaitable<void>;
    // NOTE(review): no definition of timer_countdown appears in this listing.
87 static auto timer_countdown(auto* connection) -> asio::awaitable<void>;
    // Coroutine that receives messages until a timeout, a closed socket or the end of the
    // (non-continuous) exchange.
88 static auto listen_message(SharedConnectionPtr auto connection, bool is_non_stop = false)
89 -> asio::awaitable<void>;
    // Coroutine that sends the content of write_msg_buffer_ to the remote endpoint.
90 static auto send_message(std::shared_ptr<Base> connection) -> asio::awaitable<void>;
    // Overwrites every element of the read buffer with 0.
91 void reset_read_msg_buffer() { std::fill(read_msg_buffer_.begin(), read_msg_buffer_.end(), 0); }
92 };
93
94 // Member function definitions:
95
96 template <int buffer_size>
97 auto Base<buffer_size>::send_message(std::shared_ptr<Base<buffer_size>> connection) -> asio::awaitable<void>
98 {
99 spdlog::trace("Connection {}: Sending data ...", connection->get_name());
100 auto data_size = co_await connection->socket_->async_send_to(
101 asio::buffer(connection->write_msg_buffer_.data()), connection->remote_endpoint_, asio::use_awaitable);
102 spdlog::debug("Connection {}: Message is sent.", connection->get_name());
103 spdlog::trace("Connection {}: {} bytes data sent with {:02x}",
104 connection->get_name(),
105 data_size,
106 fmt::join(connection->write_msg_buffer_.data(), " "));
107 }
108
109 template <int buffer_size>
110 auto Base<buffer_size>::send_continuous_message() -> asio::experimental::coro<int(std::optional<std::string_view>)>
111 {
112 auto send_msg = std::string_view{};
113 auto data_size = 0;
114 while (true)
115 {
116 data_size = (not send_msg.empty()) ? socket_->send_to(asio::buffer(send_msg), remote_endpoint_) : 0;
117 auto msg = co_yield data_size;
118
119 if (not msg.has_value())
120 {
121 close();
122 co_return;
123 }
124 else
125 {
126 send_msg = msg.value();
127 }
128 }
129 }
130
// Coroutine that receives messages on the connection's socket and forwards them to
// read_data_handle(), until a timeout, a closed socket, the end of a non-continuous exchange or
// the application's exit flag stops it. On leaving the loop via `break` the connection is closed.
// NOTE(review): the declarator line (embedded number 132) is missing from this listing; judging
// from the in-class declaration it is presumably
// Base<buffer_size>::listen_message(SharedConnectionPtr auto connection, bool is_non_stop) - confirm.
131 template <int buffer_size>
133 -> asio::awaitable<void>
134 {
135 using asio::experimental::awaitable_operators::operator||;
136
137 spdlog::debug("Connection {}: starting to listen ...", connection->get_name());
138
    // Despite its name, io_context holds the executor of the current coroutine.
139 auto io_context = co_await asio::this_coro::executor;
140 auto timer = asio::steady_timer{ io_context };
    // The timer is armed once, before the loop, and is not re-armed inside it: the timeout covers
    // the whole listening period rather than each individual receive.
141 if (is_non_stop)
142 {
    // Expiry at the maximum time point: the timer is not meant to fire in non-stop mode.
143 timer.expires_at(decltype(timer)::time_point::max());
144 }
145 else
146 {
147 timer.expires_after(std::chrono::seconds{ connection->timeout_seconds_ });
148 }
149
150 while (true)
151 {
    // Leave immediately (without calling close()) when the socket is no longer usable.
152 if (not connection->socket_->is_open() or connection->is_socket_closed_.load())
153 {
154 co_return;
155 }
156
    // Wait for whichever finishes first: a received message or the timer. The result is a
    // variant holding either the received byte count or std::monostate for the timer.
157 auto receive_data_size = co_await (
158 connection->socket_->async_receive(asio::buffer(connection->read_msg_buffer_), asio::use_awaitable) ||
159 timer.async_wait(asio::use_awaitable));
160 if (not is_non_stop and std::holds_alternative<std::monostate>(receive_data_size))
161 {
    // Timeout: an error plus the on_fail() hook for a single-shot connection, only an info
    // message for a continuous one.
162 if (not connection->is_continuous())
163 {
164 spdlog::error("Connection {}: Message listening TIMEOUT after {} seconds.",
165 connection->get_name(),
166 connection->timeout_seconds_);
167 connection->on_fail();
168 }
169 else
170 {
171 spdlog::info("Connection {}: Message listening TIMEOUT after {} seconds.",
172 connection->get_name(),
173 connection->timeout_seconds_);
174 }
175 break;
176 }
    // View over exactly the bytes that were received, starting at the beginning of the buffer.
177 auto read_msg = std::span{ connection->read_msg_buffer_.data(), std::get<std::size_t>(receive_data_size) };
178 connection->read_data_handle(read_msg);
179 // spdlog::info("Connection {}: received {} bytes data", connection->get_name(), read_msg.size());
180
    // Zero the buffer before the next receive.
181 connection->reset_read_msg_buffer();
    // A non-continuous connection stops after one message; a continuous one stops when the
    // application signals that it is exiting.
182 if (not connection->is_continuous() or connection->get_app().get_status().is_on_exit.load())
183 {
184 break;
185 }
186 }
187 connection->close();
188 }
189
// Coroutine that installs a SIGINT signal set on the connection and waits for it.
// If the wait ends with operation_aborted, this is only logged; for any other outcome a newline
// is printed, the event is logged and the connection is closed.
// NOTE(review): the declarator line (embedded number 191) is missing from this listing; judging
// from the in-class declaration it is presumably
// Base<buffer_size>::signal_handling(SharedConnectionPtr auto connection) -> asio::awaitable<void>.
190 template <int buffer_size>
192 {
    // The signal set is stored in the connection so that it can be cancelled from elsewhere.
193 connection->signal_set_ = std::make_unique<asio::signal_set>(co_await asio::this_coro::executor, SIGINT);
194 spdlog::trace("Connection {}: waiting for signals", connection->get_name());
    // as_tuple: the error code is returned as a value instead of being thrown.
195 auto [error, sig_num] = co_await connection->signal_set_->async_wait(asio::as_tuple(asio::use_awaitable));
196 if (error == asio::error::operation_aborted)
197 {
    // Presumably the result of signal_set_->cancel() issued when the socket is closed - confirm.
198 spdlog::trace("Connection {}: Signal ended with {}", connection->get_name(), error.message());
199 }
200 else
201 {
202 fmt::print("\n");
203 spdlog::trace(
204 "Connection {}: Signal ID {} is called with {:?}!", connection->get_name(), sig_num, error.message());
205 connection->close();
206 }
207 }
208
209 template <int buffer_size>
210 auto Base<buffer_size>::new_shared_socket(int port_number) -> std::unique_ptr<udp::socket>
211 {
212 auto socket = std::make_unique<udp::socket>(
213 app_->get_io_context(), udp::endpoint{ udp::v4(), static_cast<asio::ip::port_type>(port_number) });
214 spdlog::debug("Connection {}: Openning the socket from ip: {} with port: {}",
215 name_,
216 socket->local_endpoint().address().to_string(),
217 socket->local_endpoint().port());
218 local_port_number_ = socket->local_endpoint().port();
219 return socket;
220 }
221
// Fills write_msg_buffer_ with the message to be sent: first a header that starts with counter_
// and contains `address`, then the entries of `data`.
// NOTE(review): the argument lines with embedded numbers 226 and 228-230 are missing from this
// listing, so the first serialize() call is shown incomplete; the remaining header fields cannot
// be determined from here.
222 template <int size>
223 void Base<size>::encode_write_msg(const std::vector<CommunicateEntryType>& data, uint16_t address)
224 {
225 write_msg_buffer_.serialize(counter_,
227 address,
    // Second call: serializes the payload entries.
231 write_msg_buffer_.serialize(data);
232 }
233
// Starts listening on the connection: creates the socket if there is none yet and spawns a
// detached coroutine on the application's io_context.
// `self` is an explicit object parameter, so it has the type of the object listen() is called on.
// NOTE(review): the lines with embedded numbers 244-245 (the awaitable passed to co_spawn) are
// missing from this listing; given the `operator||` using-declaration it is presumably a
// combination of signal_handling() and listen_message() - confirm.
234 template <int buffer_size>
235 void Base<buffer_size>::listen(this auto&& self, bool is_non_stop)
236 {
237 using asio::experimental::awaitable_operators::operator||;
    // Create the socket lazily, bound to the configured local port.
238 if (self.socket_ == nullptr)
239 {
240 self.socket_ = self.new_shared_socket(self.local_port_number_);
241 }
242
    // asio::detached: the result of the spawned coroutine is not observed here.
243 co_spawn(self.app_->get_io_context(),
246 asio::detached);
247 spdlog::trace("Connection {}: spawned listen coroutine", self.name_);
248 }
249
250 // TODO: utilize listen function
// Sends `data` for `address` to the remote endpoint and listens for the reply, blocking the
// caller until both the listening and the sending coroutine have completed.
// `self` is an explicit object parameter, so it has the type of the object communicate() is
// called on.
// NOTE(review): the lines with embedded numbers 263-264 (the awaitable of the listen action) are
// missing from this listing.
251 template <int buffer_size>
252 void Base<buffer_size>::communicate(this auto&& self,
253 const std::vector<CommunicateEntryType>& data,
254 uint16_t address)
255 {
256 using asio::experimental::awaitable_operators::operator||;
    // Create the socket lazily, bound to the configured local port.
257 if (self.socket_ == nullptr)
258 {
259 self.socket_ = self.new_shared_socket(self.local_port_number_);
260 }
261
    // asio::deferred: the operation is only described here and launched by the parallel group.
262 auto listen_action = co_spawn(self.app_->get_io_context(),
265 asio::deferred);
266
    // The write buffer is filled before the send action is created.
267 self.encode_write_msg(data, address);
268 auto send_action = co_spawn(self.app_->get_io_context(), send_message(self.shared_from_this()), asio::deferred);
269 auto group = asio::experimental::make_parallel_group(std::move(listen_action), std::move(send_action));
270 auto fut = group.async_wait(asio::experimental::wait_for_all(), asio::use_future);
271 spdlog::trace("Connection {}: start communication", self.name_);
    // Blocks the calling thread until both actions are done.
    // NOTE(review): presumably the io_context is run on a different thread - confirm.
272 fut.get();
273 }
274
275 template <int buffer_size>
277 {
278 if (not is_socket_closed_.load())
279 {
280 is_socket_closed_.store(true);
281 spdlog::trace("Connection {}: Closing the socket ...", name_);
282 // socket_->cancel();
283 socket_->close();
284 if (signal_set_ != nullptr)
285 {
286 spdlog::trace("Connection {}: cancelling signal ...", name_);
287 signal_set_->cancel();
288 spdlog::trace("Connection {}: signal is cancelled.", name_);
289 }
290 spdlog::trace("Connection {}: Socket is closed and cancelled.", name_);
291 }
292 }
293} // namespace srs::connection
static auto send_message(std::shared_ptr< Base > connection) -> asio::awaitable< void >
auto is_connection_ok() -> bool
static auto timer_countdown(auto *connection) -> asio::awaitable< void >
process::SerializableMsgBuffer write_msg_buffer_
void set_send_message(const RangedData auto &msg)
auto get_read_msg_buffer() const -> const ReadBufferType< buffer_size > &
void encode_write_msg(const std::vector< CommunicateEntryType > &data, uint16_t address)
std::unique_ptr< asio::signal_set > signal_set_
udp::endpoint remote_endpoint_
void set_timeout_seconds(int val)
void set_socket(std::unique_ptr< asio::ip::udp::socket > socket)
static auto listen_message(SharedConnectionPtr auto connection, bool is_non_stop=false) -> asio::awaitable< void >
auto is_continuous() const -> bool
void listen(this auto &&self, bool is_non_stop=false)
void read_data_handle(std::span< BufferElementType > read_data)
auto get_local_port_number() const -> int
auto send_continuous_message() -> asio::experimental::coro< int(std::optional< std::string_view >)>
std::span< const char > continuous_send_msg_
auto get_name() const -> const std::string &
std::unique_ptr< udp::socket > socket_
auto get_remote_endpoint() -> const udp::endpoint &
std::atomic< bool > is_socket_closed_
void set_remote_endpoint(asio::ip::udp::endpoint endpoint)
void communicate(this auto &&self, const std::vector< CommunicateEntryType > &data, uint16_t address)
auto new_shared_socket(int port_number) -> std::unique_ptr< udp::socket >
void set_continuous(bool is_continuous=true)
static auto signal_handling(SharedConnectionPtr auto connection) -> asio::awaitable< void >
ReadBufferType< buffer_size > read_msg_buffer_
gsl::not_null< App * > app_
auto get_app() -> App &
Base(const Info &info, std::string name)
auto get_socket() -> const udp::socket &
std::array< BufferElementType, buffer_size > ReadBufferType
constexpr auto WRITE_COMMAND_BITS
constexpr auto INIT_COUNT_VALUE
constexpr auto DEFAULT_TYPE_BITS
constexpr auto ZERO_UINT16_PADDING
constexpr auto COMMAND_LENGTH_BITS
constexpr auto DEFAULT_TIMEOUT_SECONDS
constexpr auto get_shared_from_this(auto &&obj)