SRS-control 0.1.4
Loading...
Searching...
No Matches
SpecialSocketBase.hpp
Go to the documentation of this file.
1#pragma once
2
6#include <boost/asio/as_tuple.hpp>
7#include <boost/asio/awaitable.hpp>
8#include <boost/asio/experimental/awaitable_operators.hpp>
9#include <boost/asio/system_timer.hpp>
10#include <boost/asio/this_coro.hpp>
11#include <boost/asio/use_awaitable.hpp>
12#include <boost/asio/use_future.hpp>
13#include <boost/system/detail/error_code.hpp>
14#include <chrono>
15#include <concepts>
16#include <expected>
17#include <fmt/ranges.h>
18#include <future>
19#include <memory>
20#include <optional>
21#include <span>
22#include <spdlog/spdlog.h>
23#include <type_traits>
24#include <utility>
25#include <variant>
26
27namespace srs::connection
28{
29 class SpecialSocket;
30
31 template <typename SocketType>
33 requires(SocketType socket, std::shared_ptr<typename SocketType::ConnectionType> connection) {
34 requires std::derived_from<SocketType, SpecialSocket>;
35 requires not std::is_same_v<SocketType, SpecialSocket>;
36
37 asio::buffer(socket.get_response_msg_buffer());
38
40
41 { socket.response_handler(UDPEndpoint{}, std::span<char>{}) } -> std::same_as<void>;
42 { socket.is_finished() } -> std::same_as<bool>;
43 { socket.print_error() } -> std::same_as<void>;
44 { socket.register_send_action_imp(asio::awaitable<void>{}, connection) } -> std::same_as<void>;
45 };
46
/**
 * @brief Common base of the UDP sockets that listen on a local port.
 *
 * Instances are created through create() and are owned by `std::shared_ptr`, so that the
 * listening coroutine can share ownership of the socket object it works on.
 * Concrete socket types must satisfy the `SpecialSocketDerived` concept.
 */
class SpecialSocket : public std::enable_shared_from_this<SpecialSocket>
{
  public:
    /**
     * @brief Construct a `SocketType` and start listening with it.
     * @param port_number Port number handed to the socket constructor.
     * @param io_context Execution context used to construct the socket and to run the listening coroutine.
     * @param args Additional arguments passed on to the `SocketType` constructor.
     * @return The listening socket, or the error code stored in the socket after construction.
     */
    template <SpecialSocketDerived SocketType, typename... Args>
    static auto create(int port_number, io_context_type& io_context, Args... args)
        -> std::expected<std::shared_ptr<SocketType>, boost::system::error_code>;

    /**
     * @brief Register a send action for a connection by delegating to `register_send_action_imp`
     *        of the concrete socket type.
     * @param action Awaitable performing the send; it is moved into the implementation.
     * @param connection Connection the action belongs to.
     */
    void register_send_action(
        this auto&& self,
        asio::awaitable<void> action,
        const std::shared_ptr<typename std::remove_cvref_t<decltype(self)>::ConnectionType>& connection);

    /**
     * @brief Spawn the listening coroutine together with the cancel coroutine on `io_context`
     *        and store the resulting shared future (see get_future()).
     */
    void listen(this auto& self, io_context_type& io_context);

    auto is_valid() const -> bool { return is_valid_; }

    /**
     * @brief Spawn a task that waits for `waiting_time` and then cancels the cancel timer.
     * @param io_context Execution context the waiting task is spawned on.
     * @param waiting_time Time to wait before cancelling (default: 2 seconds).
     * @return Future that becomes ready when the waiting task has finished.
     */
    auto cancel_listen_after(this auto&& self,
                             io_context_type& io_context,
                             std::chrono::seconds waiting_time = std::chrono::seconds(2)) -> std::future<void>;

    // Defined outside of this header.
    // NOTE(review): presumably waits on the listen future for at most `time` — confirm in the source file.
    auto wait_for_listen_finish(std::chrono::seconds time) -> std::optional<std::future_status>;

    // getters:
    [[nodiscard]] auto get_socket() const -> auto& { return *socket_; }
    [[nodiscard]] auto get_port() const { return port_number_; }
    [[nodiscard]] auto get_socket_error_code() const -> auto { return socket_ec_; }
    auto get_future() -> auto& { return listen_future_; }

  protected:
    explicit SpecialSocket(int port_number, io_context_type& io_context);
    auto get_cancel_timer() -> auto& { return cancel_timer_; }

  private:
    bool is_valid_ = false;
    int port_number_ = 0;
    std::unique_ptr<udp::socket> socket_;
    // Result of `listen_all_connections(...) || cancel_coroutine()`, both of which are awaitable<void>.
    std::shared_future<std::variant<std::monostate, std::monostate>> listen_future_;
    boost::system::error_code socket_ec_;
    // Used to cancel the unfinished coroutine.
    asio::system_timer cancel_timer_;

    auto cancel_coroutine() -> asio::awaitable<void>;
    void bind_socket();
    void close_socket();

    // NOTE: Coroutine should always be static to avoid lifetime issue.
    template <SpecialSocketDerived SocketType>
    static auto listen_all_connections(std::shared_ptr<SocketType> socket) -> asio::awaitable<void>;
};
95
/**
 * @brief Factory for the concrete socket types.
 *
 * Builds a `SocketType` from the port number, the execution context and the extra
 * arguments. If the freshly constructed socket reports an error code, the error is
 * logged and returned; otherwise the socket starts listening and is handed to the caller.
 *
 * @return Shared pointer to the listening socket, or the error code reported by the socket.
 */
template <SpecialSocketDerived SocketType, typename... Args>
auto SpecialSocket::create(int port_number, io_context_type& io_context, Args... args)
    -> std::expected<std::shared_ptr<SocketType>, boost::system::error_code>
{
    auto new_socket =
        std::shared_ptr<SocketType>(new SocketType{ port_number, io_context, std::forward<Args>(args)... });

    // Guard clause: give up before listening if the socket already carries an error.
    if (const auto failure = new_socket->get_socket_error_code(); failure)
    {
        spdlog::critical("Local socket failed to bind to the port {} due to the error: {}",
                         new_socket->get_port(),
                         failure.message());
        return std::unexpected{ failure };
    }

    new_socket->listen(io_context);
    return new_socket;
}
113
114 template <SpecialSocketDerived SocketType>
115 auto SpecialSocket::listen_all_connections(std::shared_ptr<SocketType> socket) -> asio::awaitable<void>
116 {
117 spdlog::debug("Local socket with port {} starts to listen ...", socket->get_port());
118 socket->bind_socket();
119 auto remote_endpoint = udp::endpoint{};
120 while (true)
121 {
122 const auto [err_code, receive_size] = co_await socket->get_socket().async_receive_from(
123 asio::buffer(socket->get_response_msg_buffer()), remote_endpoint, asio::as_tuple(asio ::use_awaitable));
124 if (not err_code)
125 {
126 auto read_msg = std::span{ socket->get_response_msg_buffer().data(), receive_size };
127 [[maybe_unused]] auto remote_ec = boost::system::error_code{};
128 socket->response_handler(remote_endpoint, read_msg);
129 }
130 }
131 // TODO: close the socket here
132 spdlog::trace("Coroutine for the local socket with port {} has existed.", socket->get_port());
133 co_return;
134 }
135
137 this auto&& self,
138 asio::awaitable<void> action,
139 const std::shared_ptr<typename std::remove_cvref_t<decltype(self)>::ConnectionType>& connection)
140 {
141 spdlog::trace("Registering send action from connection {} with remote endpoint {}.",
142 connection->get_name(),
143 connection->get_remote_endpoint());
144 self.register_send_action_imp(std::move(action), connection);
145 spdlog::trace("Send action to the remote endpoint {} has been registered.",
146 connection->get_name(),
147 connection->get_remote_endpoint());
148 }
149
150 void SpecialSocket::listen(this auto& self, io_context_type& io_context)
151 {
152 using boost::asio::experimental::awaitable_operators::operator||;
153 self.listen_future_ =
154 asio::co_spawn(io_context,
155 listen_all_connections(common::get_shared_from_this(self)) || self.cancel_coroutine(),
156 asio::use_future)
157 .share();
158 }
159
161 io_context_type& io_context,
162 std::chrono::seconds waiting_time) -> std::future<void>
163 {
164 auto waiting_action = [](auto socket, std::chrono::seconds waiting_time) -> asio::awaitable<void>
165 {
166 spdlog::trace("Waiting local socket with port {} to finish listening ...", socket->get_port());
167 if (socket->is_finished())
168 {
169 spdlog::trace("Local socket with port {} finished listening.", socket->get_port());
170 co_return;
171 }
172 auto timer = asio::system_timer{ co_await asio::this_coro::executor };
173 timer.expires_after(waiting_time);
174 [[maybe_unused]] auto err_code = co_await timer.async_wait(asio::as_tuple(asio::use_awaitable));
175 socket->cancel_timer_.cancel();
176 if (not socket->is_finished())
177 {
178 socket->print_error();
179 }
180 };
181 return asio::co_spawn(
182 io_context, waiting_action(common::get_shared_from_this(self), waiting_time), asio::use_future);
183 }
184} // namespace srs::connection
auto get_socket() const -> auto &
auto cancel_listen_after(this auto &&self, io_context_type &io_context, std::chrono::seconds waiting_time=std::chrono::seconds(2)) -> std::future< void >
asio::system_timer cancel_timer_
Used to cancel the unfinished coroutine.
auto cancel_coroutine() -> asio::awaitable< void >
static auto create(int port_number, io_context_type &io_context, Args... args) -> std::expected< std::shared_ptr< SocketType >, boost::system::error_code >
auto wait_for_listen_finish(std::chrono::seconds time) -> std::optional< std::future_status >
auto get_socket_error_code() const -> auto
std::unique_ptr< udp::socket > socket_
boost::system::error_code socket_ec_
void listen(this auto &self, io_context_type &io_context)
SpecialSocket(int port_number, io_context_type &io_context)
void register_send_action(this auto &&self, asio::awaitable< void > action, const std::shared_ptr< typename std::remove_cvref_t< decltype(self)>::ConnectionType > &connection)
std::shared_future< std::variant< std::monostate, std::monostate > > listen_future_
static auto listen_all_connections(std::shared_ptr< SocketType > socket) -> asio::awaitable< void >
constexpr auto get_shared_from_this(auto &&obj)
boost::asio::ip::basic_endpoint< boost::asio::ip::udp > UDPEndpoint
asio::thread_pool io_context_type