SRS-control 0.1.4
 
Loading...
Searching...
No Matches
ConnectionBase.hpp
Go to the documentation of this file.
1#pragma once
2
3#include <boost/asio/experimental/awaitable_operators.hpp>
4#include <boost/asio/experimental/coro.hpp>
5#include <fmt/ranges.h>
6#include <gsl/gsl-lite.hpp>
7#include <spdlog/spdlog.h>
8#include <srs/Application.hpp>
13
14namespace srs::connection
15{
16 // derive from enable_shared_from_this to make sure object still alive in the coroutine
// Common base for UDP connections: holds the socket, the remote endpoint and the
// message buffers, and exposes the listen / communicate entry points.
// `buffer_size` is the template argument of the read buffer type returned by
// get_read_msg_buffer().
// NOTE(review): several members used in this file (local_port_number_,
// timeout_seconds_, read_msg_buffer_, write_msg_buffer_, counter_ and
// close_socket()) have no declaration visible in this listing - confirm against
// the real header before editing the class.
17 template <int buffer_size = common::SMALL_READ_MSG_BUFFER_SIZE>
18 class Base : public std::enable_shared_from_this<Base<buffer_size>>
19 {
20 public:
// Copies the local port number and the App pointer out of `info`, takes
// ownership of `name`, and logs the creation at debug level.
21 explicit Base(const Info& info, std::string name)
22 : local_port_number_{ info.local_port_number }
23 , name_{ std::move(name) }
24 , app_{ info.control }
25 {
26 spdlog::debug("Creating connection {} with buffer size: {}", name_, buffer_size);
27 }
28
29 // Default hooks; a derived class may provide its own versions of these.
// read_data_handle: the default implementation ignores the data.
30 void read_data_handle(std::span<BufferElementType> read_data) {}
// close: forwards to close_socket().
31 void close() { close_socket(); }
// on_fail: the default implementation only logs a debug message.
32 void on_fail() { spdlog::debug("default on_fail is called!"); }
// Returns the executor of the application's io_context.
33 auto get_executor() { return app_->get_io_context().get_executor(); }
34
// Both use an explicit object parameter (`this auto&& self`), so `self` has
// the static type of the object the call is made on.
35 void listen(this auto&& self, bool is_non_stop = false);
36 void communicate(this auto&& self, const std::vector<CommunicateEntryType>& data, uint16_t address);
37
// Returns an asio experimental coro that yields an int and is resumed with
// an optional string_view.
38 auto send_continuous_message() -> asio::experimental::coro<int(std::optional<std::string_view>)>;
39
40 // Setters:
41 void set_socket(std::unique_ptr<asio::ip::udp::socket> socket) { socket_ = std::move(socket); }
42 void set_remote_endpoint(asio::ip::udp::endpoint endpoint) { remote_endpoint_ = std::move(endpoint); }
43 void set_timeout_seconds(int val) { timeout_seconds_ = val; }
44
// Stores a span over the elements of `msg` without copying them, so `msg`
// must stay alive for as long as the stored span is used.
45 void set_send_message(const RangedData auto& msg)
46 {
47 continuous_send_msg_ = std::span{ msg.begin(), msg.end() };
48 }
49
50 // Getters:
51 [[nodiscard]] auto get_read_msg_buffer() const -> const ReadBufferType<buffer_size>&
52 {
53 return read_msg_buffer_;
54 }
55 [[nodiscard]] auto get_name() const -> const std::string& { return name_; }
56 [[nodiscard]] auto get_app() -> App& { return *app_; }
// Dereferences socket_ unconditionally; socket_ must have been set before
// this is called.
57 auto get_socket() -> const udp::socket& { return *socket_; }
58 auto get_remote_endpoint() -> const udp::endpoint& { return remote_endpoint_; }
59 [[nodiscard]] auto get_local_port_number() const -> int { return local_port_number_; }
60 [[nodiscard]] auto is_continuous() const -> bool { return is_continuous_; }
61
62 protected:
// Creates a UDP socket for the given port (defined below the class).
63 auto new_shared_socket(int port_number) -> std::unique_ptr<udp::socket>;
66
67 private:
// Value reported by is_continuous().
68 bool is_continuous_ = false;
// Checked by the listening coroutine and by the close routine.
70 std::atomic<bool> is_socket_closed_{ false };
// Name used in all log messages of this connection.
72 std::string name_ = "ConnectionBase";
// Non-owning, never-null pointer to the application object.
73 gsl::not_null<App*> app_;
74 std::unique_ptr<udp::socket> socket_;
75 udp::endpoint remote_endpoint_;
// Non-owning view assigned by set_send_message().
77 std::span<const char> continuous_send_msg_;
// Created by signal_handling(); null until then.
78 std::unique_ptr<asio::signal_set> signal_set_;
81
82 void encode_write_msg(const std::vector<CommunicateEntryType>& data, uint16_t address);
83 static auto signal_handling(SharedConnectionPtr auto connection) -> asio::awaitable<void>;
// NOTE(review): no definition of timer_countdown is visible in this listing.
84 static auto timer_countdown(auto* connection) -> asio::awaitable<void>;
85 static auto listen_message(SharedConnectionPtr auto connection, bool is_non_stop = false)
86 -> asio::awaitable<void>;
87 static auto send_message(std::shared_ptr<Base> connection) -> asio::awaitable<void>;
// Overwrites every element of the read buffer with 0.
88 void reset_read_msg_buffer() { std::fill(read_msg_buffer_.begin(), read_msg_buffer_.end(), 0); }
89 };
90
91 // Member function definitions:
92
93 template <int buffer_size>
94 auto Base<buffer_size>::send_message(std::shared_ptr<Base<buffer_size>> connection) -> asio::awaitable<void>
95 {
96 spdlog::trace("Connection {}: Sending data ...", connection->get_name());
97 auto data_size = co_await connection->socket_->async_send_to(
98 asio::buffer(connection->write_msg_buffer_.data()), connection->remote_endpoint_, asio::use_awaitable);
99 spdlog::debug("Connection {}: Message is sent.", connection->get_name());
100 spdlog::trace("Connection {}: {} bytes data sent with {:02x}",
101 connection->get_name(),
102 data_size,
103 fmt::join(connection->write_msg_buffer_.data(), " "));
104 }
105
106 template <int buffer_size>
107 auto Base<buffer_size>::send_continuous_message() -> asio::experimental::coro<int(std::optional<std::string_view>)>
108 {
109 auto send_msg = std::string_view{};
110 auto data_size = 0;
111 while (true)
112 {
113 data_size = (not send_msg.empty()) ? socket_->send_to(asio::buffer(send_msg), remote_endpoint_) : 0;
114 auto msg = co_yield data_size;
115
116 if (not msg.has_value())
117 {
118 close();
119 co_return;
120 }
121 else
122 {
123 send_msg = msg.value();
124 }
125 }
126 }
127
// Listening coroutine: repeatedly receives a datagram into the connection's
// read buffer and hands the received bytes to read_data_handle(), racing each
// receive against a timeout timer.
// NOTE(review): the line with the function name and parameter list is missing
// from this listing. The body uses `connection` and `is_non_stop`, which matches
// the in-class declaration of listen_message - confirm against the real header.
128 template <int buffer_size>
130 -> asio::awaitable<void>
131 {
132 using asio::experimental::awaitable_operators::operator||;
133
134 spdlog::debug("Connection {}: starting to listen ...", connection->get_name());
135
136 auto io_context = co_await asio::this_coro::executor;
137 auto timer = asio::steady_timer{ io_context };
// Non-stop mode pushes the expiry to the largest representable time point;
// otherwise the timer expires after the connection's timeout_seconds_.
138 if (is_non_stop)
139 {
140 timer.expires_at(decltype(timer)::time_point::max());
141 }
142 else
143 {
144 timer.expires_after(std::chrono::seconds{ connection->timeout_seconds_ });
145 }
// NOTE(review): the timer is armed once here and not re-armed inside the loop,
// so in continuous mode the deadline counts from the start of listening rather
// than from the last received message - confirm this is intended.
146
147 while (true)
148 {
// Leaves without calling connection->close() when the socket is already
// closed or flagged as closed.
149 if (not connection->socket_->is_open() or connection->is_socket_closed_.load())
150 {
151 co_return;
152 }
153
// Awaits the receive and the timer together. The result is a variant that
// holds the received byte count, or std::monostate when the timer finished.
154 auto receive_data_size = co_await (
155 connection->socket_->async_receive(asio::buffer(connection->read_msg_buffer_), asio::use_awaitable) ||
156 timer.async_wait(asio::use_awaitable));
// Timeout branch: reported as an error (plus on_fail()) for a non-continuous
// connection and as info for a continuous one; the loop ends either way.
157 if (not is_non_stop and std::holds_alternative<std::monostate>(receive_data_size))
158 {
159 if (not connection->is_continuous())
160 {
161 spdlog::error("Connection {}: Message listening TIMEOUT after {} seconds.",
162 connection->get_name(),
163 connection->timeout_seconds_);
164 connection->on_fail();
165 }
166 else
167 {
168 spdlog::info("Connection {}: Message listening TIMEOUT after {} seconds.",
169 connection->get_name(),
170 connection->timeout_seconds_);
171 }
172 break;
173 }
// View over exactly the bytes that were received in this round.
174 auto read_msg = std::span{ connection->read_msg_buffer_.data(), std::get<std::size_t>(receive_data_size) };
175 connection->read_data_handle(read_msg);
176 // spdlog::info("Connection {}: received {} bytes data", connection->get_name(), read_msg.size());
177
// Clear the buffer, then stop after one message unless the connection is
// continuous and the application is not exiting.
178 connection->reset_read_msg_buffer();
179 if (not connection->is_continuous() or connection->get_app().get_status().is_on_exit.load())
180 {
181 break;
182 }
183 }
184 connection->close();
185 }
186
// Coroutine that waits for SIGINT and closes the connection when it arrives.
// NOTE(review): the line with the function name and parameter list is missing
// from this listing. The body uses `connection`, which matches the in-class
// declaration of signal_handling - confirm against the real header.
187 template <int buffer_size>
189 {
// The signal set is stored on the connection so that the close routine can
// cancel the pending wait.
190 connection->signal_set_ = std::make_unique<asio::signal_set>(co_await asio::this_coro::executor, SIGINT);
191 spdlog::trace("Connection {}: waiting for signals", connection->get_name());
// as_tuple delivers the error code as a value instead of throwing.
192 auto [error, sig_num] = co_await connection->signal_set_->async_wait(asio::as_tuple(asio::use_awaitable));
// operation_aborted means the wait was cancelled; only a trace is written.
193 if (error == asio::error::operation_aborted)
194 {
195 spdlog::trace("Connection {}: Signal ended with {}", connection->get_name(), error.message());
196 }
197 else
198 {
// Any other completion: print a newline, trace the signal and close.
199 fmt::print("\n");
200 spdlog::trace(
201 "Connection {}: Signal ID {} is called with {:?}!", connection->get_name(), sig_num, error.message());
202 connection->close();
203 }
204 }
205
206 template <int buffer_size>
207 auto Base<buffer_size>::new_shared_socket(int port_number) -> std::unique_ptr<udp::socket>
208 {
209 auto socket = std::make_unique<udp::socket>(
210 app_->get_io_context(), udp::endpoint{ udp::v4(), static_cast<asio::ip::port_type>(port_number) });
211 spdlog::debug("Connection {}: Openning the socket from ip: {} with port: {}",
212 name_,
213 socket->local_endpoint().address().to_string(),
214 socket->local_endpoint().port());
215 local_port_number_ = socket->local_endpoint().port();
216 return socket;
217 }
218
// Serializes the outgoing message into write_msg_buffer_, using counter_,
// `address` and `data`.
// NOTE(review): several lines of this function are missing from this listing.
// The first serialize(...) call is cut off after `address,`, so its remaining
// arguments are unknown - confirm against the real header.
219 template <int size>
220 void Base<size>::encode_write_msg(const std::vector<CommunicateEntryType>& data, uint16_t address)
221 {
222 write_msg_buffer_.serialize(counter_,
224 address,
228 write_msg_buffer_.serialize(data);
229 }
230
// Starts listening on this connection: creates the socket on demand and spawns
// a detached coroutine on the application's io_context.
// NOTE(review): the co_spawn argument line(s) between the io_context and
// asio::detached are missing from this listing. Presumably the spawned
// awaitable combines listen_message and signal_handling with operator||,
// given the using-declaration below - confirm against the real header.
231 template <int buffer_size>
232 void Base<buffer_size>::listen(this auto&& self, bool is_non_stop)
233 {
234 using asio::experimental::awaitable_operators::operator||;
// Create the socket only if none was injected through set_socket().
235 if (self.socket_ == nullptr)
236 {
237 self.socket_ = self.new_shared_socket(self.local_port_number_);
238 }
239
240 co_spawn(self.app_->get_io_context(),
243 asio::detached);
244 spdlog::trace("Connection {}: spawned listen coroutine", self.name_);
245 }
246
247 template <int buffer_size>
248 void Base<buffer_size>::communicate(this auto&& self,
249 const std::vector<CommunicateEntryType>& data,
250 uint16_t address)
251 {
252 self.listen();
253 self.encode_write_msg(data, address);
254 co_spawn(self.app_->get_io_context(), send_message(self.shared_from_this()), asio::detached);
255 spdlog::trace("Connection {}: spawned write coroutine", self.name_);
256 }
257
// Closes the socket and cancels the pending signal wait; does nothing when the
// socket is already flagged as closed.
// NOTE(review): the line with the function name and parameter list is missing
// from this listing. Presumably this is close_socket(), which close() forwards
// to - confirm against the real header.
258 template <int buffer_size>
260 {
// NOTE(review): load() followed by store(true) is not one atomic step, so two
// concurrent callers could both enter this branch. A single exchange(true)
// would avoid that if concurrent calls are possible - confirm.
261 if (not is_socket_closed_.load())
262 {
263 is_socket_closed_.store(true);
264 spdlog::trace("Connection {}: Closing the socket ...", name_);
265 // socket_->cancel();
// Dereferences socket_ unconditionally; socket_ must not be null here.
266 socket_->close();
// signal_set_ is null unless the signal-handling coroutine has created it.
267 if (signal_set_ != nullptr)
268 {
269 spdlog::trace("Connection {}: cannelling signal ...", name_);
270 signal_set_->cancel();
271 spdlog::trace("Connection {}: signal is cancelled.", name_);
272 }
273 spdlog::trace("Connection {}: Socket is closed and cancelled.", name_);
274 }
275 }
276} // namespace srs::connection
static auto send_message(std::shared_ptr< Base > connection) -> asio::awaitable< void >
static auto timer_countdown(auto *connection) -> asio::awaitable< void >
process::SerializableMsgBuffer write_msg_buffer_
void set_send_message(const RangedData auto &msg)
auto get_read_msg_buffer() const -> const ReadBufferType< buffer_size > &
void encode_write_msg(const std::vector< CommunicateEntryType > &data, uint16_t address)
std::unique_ptr< asio::signal_set > signal_set_
udp::endpoint remote_endpoint_
void set_timeout_seconds(int val)
void set_socket(std::unique_ptr< asio::ip::udp::socket > socket)
static auto listen_message(SharedConnectionPtr auto connection, bool is_non_stop=false) -> asio::awaitable< void >
auto is_continuous() const -> bool
void listen(this auto &&self, bool is_non_stop=false)
void read_data_handle(std::span< BufferElementType > read_data)
auto get_local_port_number() const -> int
auto send_continuous_message() -> asio::experimental::coro< int(std::optional< std::string_view >)>
std::span< const char > continuous_send_msg_
auto get_name() const -> const std::string &
std::unique_ptr< udp::socket > socket_
auto get_remote_endpoint() -> const udp::endpoint &
std::atomic< bool > is_socket_closed_
void set_remote_endpoint(asio::ip::udp::endpoint endpoint)
void communicate(this auto &&self, const std::vector< CommunicateEntryType > &data, uint16_t address)
auto new_shared_socket(int port_number) -> std::unique_ptr< udp::socket >
void set_continuous(bool is_continuous=true)
static auto signal_handling(SharedConnectionPtr auto connection) -> asio::awaitable< void >
ReadBufferType< buffer_size > read_msg_buffer_
gsl::not_null< App * > app_
auto get_app() -> App &
Base(const Info &info, std::string name)
auto get_socket() -> const udp::socket &
std::array< BufferElementType, buffer_size > ReadBufferType
constexpr auto WRITE_COMMAND_BITS
constexpr auto INIT_COUNT_VALUE
constexpr auto DEFAULT_TYPE_BITS
constexpr auto ZERO_UINT16_PADDING
constexpr auto COMMAND_LENGTH_BITS
constexpr auto DEFAULT_TIMEOUT_SECONDS
constexpr auto get_shared_from_this(auto &&obj)