3#include <boost/asio/experimental/awaitable_operators.hpp>
4#include <boost/asio/experimental/coro.hpp>
6#include <gsl/gsl-lite.hpp>
7#include <spdlog/spdlog.h>
17 template <
int buffer_size = common::SMALL_READ_MSG_BUFFER_SIZE>
18 class Base :
public std::enable_shared_from_this<Base<buffer_size>>
21 explicit Base(
const Info& info, std::string name)
23 ,
name_{ std::move(name) }
24 ,
app_{ info.control }
26 spdlog::debug(
"Creating connection {} with buffer size: {}",
name_, buffer_size);
32 void on_fail() { spdlog::debug(
"default on_fail is called!"); }
35 void listen(
this auto&& self,
bool is_non_stop =
false);
36 void communicate(
this auto&& self,
const std::vector<CommunicateEntryType>& data, uint16_t address);
41 void set_socket(std::unique_ptr<asio::ip::udp::socket> socket) {
socket_ = std::move(socket); }
55 [[nodiscard]]
auto get_name() const -> const std::
string& {
return name_; }
72 std::string
name_ =
"ConnectionBase";
82 void encode_write_msg(
const std::vector<CommunicateEntryType>& data, uint16_t address);
86 -> asio::awaitable<void>;
93 template <
int buffer_size>
96 spdlog::trace(
"Connection {}: Sending data ...",
connection->get_name());
97 auto data_size =
co_await connection->socket_->async_send_to(
98 asio::buffer(
connection->write_msg_buffer_.data()),
connection->remote_endpoint_, asio::use_awaitable);
99 spdlog::debug(
"Connection {}: Message is sent.",
connection->get_name());
100 spdlog::trace(
"Connection {}: {} bytes data sent with {:02x}",
103 fmt::join(
connection->write_msg_buffer_.data(),
" "));
106 template <
int buffer_size>
109 auto send_msg = std::string_view{};
114 auto msg =
co_yield data_size;
116 if (not msg.has_value())
123 send_msg = msg.value();
128 template <
int buffer_size>
130 -> asio::awaitable<void>
132 using asio::experimental::awaitable_operators::operator||;
134 spdlog::debug(
"Connection {}: starting to listen ...",
connection->get_name());
136 auto io_context =
co_await asio::this_coro::executor;
137 auto timer = asio::steady_timer{ io_context };
140 timer.expires_at(
decltype(timer)::time_point::max());
144 timer.expires_after(std::chrono::seconds{
connection->timeout_seconds_ });
154 auto receive_data_size =
co_await (
155 connection->socket_->async_receive(asio::buffer(
connection->read_msg_buffer_), asio::use_awaitable) ||
156 timer.async_wait(asio::use_awaitable));
157 if (not is_non_stop and std::holds_alternative<std::monostate>(receive_data_size))
161 spdlog::error(
"Connection {}: Message listening TIMEOUT after {} seconds.",
168 spdlog::info(
"Connection {}: Message listening TIMEOUT after {} seconds.",
174 auto read_msg = std::span{
connection->read_msg_buffer_.data(), std::get<std::size_t>(receive_data_size) };
187 template <
int buffer_size>
190 connection->signal_set_ = std::make_unique<asio::signal_set>(
co_await asio::this_coro::executor, SIGINT);
191 spdlog::trace(
"Connection {}: waiting for signals",
connection->get_name());
192 auto [error, sig_num] =
co_await connection->signal_set_->async_wait(asio::as_tuple(asio::use_awaitable));
193 if (error == asio::error::operation_aborted)
195 spdlog::trace(
"Connection {}: Signal ended with {}",
connection->get_name(), error.message());
201 "Connection {}: Signal ID {} is called with {:?}!",
connection->get_name(), sig_num, error.message());
206 template <
int buffer_size>
209 auto socket = std::make_unique<udp::socket>(
210 app_->get_io_context(), udp::endpoint{ udp::v4(), static_cast<asio::ip::port_type>(port_number) });
211 spdlog::debug(
"Connection {}: Openning the socket from ip: {} with port: {}",
213 socket->local_endpoint().address().to_string(),
214 socket->local_endpoint().port());
231 template <
int buffer_size>
234 using asio::experimental::awaitable_operators::operator||;
235 if (self.socket_ ==
nullptr)
237 self.socket_ = self.new_shared_socket(self.local_port_number_);
240 co_spawn(self.app_->get_io_context(),
244 spdlog::trace(
"Connection {}: spawned listen coroutine", self.name_);
247 template <
int buffer_size>
249 const std::vector<CommunicateEntryType>& data,
253 self.encode_write_msg(data, address);
254 co_spawn(self.app_->get_io_context(),
send_message(self.shared_from_this()), asio::detached);
255 spdlog::trace(
"Connection {}: spawned write coroutine", self.name_);
258 template <
int buffer_size>
264 spdlog::trace(
"Connection {}: Closing the socket ...",
name_);
269 spdlog::trace(
"Connection {}: cannelling signal ...",
name_);
271 spdlog::trace(
"Connection {}: signal is cancelled.",
name_);
273 spdlog::trace(
"Connection {}: Socket is closed and cancelled.",
name_);
static auto send_message(std::shared_ptr< Base > connection) -> asio::awaitable< void >
static auto timer_countdown(auto *connection) -> asio::awaitable< void >
process::SerializableMsgBuffer write_msg_buffer_
void set_send_message(const RangedData auto &msg)
auto get_read_msg_buffer() const -> const ReadBufferType< buffer_size > &
void encode_write_msg(const std::vector< CommunicateEntryType > &data, uint16_t address)
std::unique_ptr< asio::signal_set > signal_set_
udp::endpoint remote_endpoint_
void set_timeout_seconds(int val)
void set_socket(std::unique_ptr< asio::ip::udp::socket > socket)
static auto listen_message(SharedConnectionPtr auto connection, bool is_non_stop=false) -> asio::awaitable< void >
auto is_continuous() const -> bool
void listen(this auto &&self, bool is_non_stop=false)
void reset_read_msg_buffer()
void read_data_handle(std::span< BufferElementType > read_data)
auto get_local_port_number() const -> int
auto send_continuous_message() -> asio::experimental::coro< int(std::optional< std::string_view >)>
std::span< const char > continuous_send_msg_
auto get_name() const -> const std::string &
std::unique_ptr< udp::socket > socket_
auto get_remote_endpoint() -> const udp::endpoint &
std::atomic< bool > is_socket_closed_
void set_remote_endpoint(asio::ip::udp::endpoint endpoint)
void communicate(this auto &&self, const std::vector< CommunicateEntryType > &data, uint16_t address)
auto new_shared_socket(int port_number) -> std::unique_ptr< udp::socket >
void set_continuous(bool is_continuous=true)
static auto signal_handling(SharedConnectionPtr auto connection) -> asio::awaitable< void >
ReadBufferType< buffer_size > read_msg_buffer_
gsl::not_null< App * > app_
Base(const Info &info, std::string name)
auto get_socket() -> const udp::socket &
std::array< BufferElementType, buffer_size > ReadBufferType
constexpr auto WRITE_COMMAND_BITS
constexpr auto INIT_COUNT_VALUE
constexpr auto DEFAULT_TYPE_BITS
constexpr auto ZERO_UINT16_PADDING
constexpr auto COMMAND_LENGTH_BITS
constexpr auto DEFAULT_TIMEOUT_SECONDS
constexpr auto get_shared_from_this(auto &&obj)