9#include <boost/asio/awaitable.hpp>
10#include <boost/asio/deferred.hpp>
11#include <boost/asio/experimental/cancellation_condition.hpp>
12#include <boost/asio/experimental/parallel_group.hpp>
13#include <boost/asio/impl/co_spawn.hpp>
14#include <boost/asio/strand.hpp>
15#include <boost/asio/use_future.hpp>
16#include <fmt/ranges.h>
21#include <spdlog/spdlog.h>
/// Removes at most one element from `connections`; element order is not preserved.
///
/// Looks for the first element for which `unitary_opt(element)` evaluates to
/// false (the callable's result is negated before it reaches
/// std::ranges::find_if), swaps that element with the last one and pops the back.
///
/// @param connections  Container searched and shrunk in place. It must support
///                     `end() - 1` on its iterators and `pop_back()`.
/// @param unitary_opt  Unary callable, copied into the search lambda. An element
///                     is selected for removal when the callable returns false.
/// @return bool. NOTE(review): the return statements are not part of this
///         listing; presumably false when nothing was selected and true after
///         a removal - confirm against the full source.
29 auto remove_once_if(
auto& connections,
auto unitary_opt) ->
bool
// Note the negation: the search stops at the first element the callable rejects.
31 auto iter = std::ranges::find_if(
32 connections, [unitary_opt](
const auto& connection) ->
bool {
return not unitary_opt(connection); });
// Reached when no element was selected; the body of this branch is not visible
// in this listing.
33 if (iter == connections.end())
// Move the selected element to the back so it can be dropped with pop_back()
// without shifting the remaining elements.
37 std::iter_swap(iter, connections.end() - 1);
38 connections.pop_back();
/// Renders the response messages of all `connections` as a single string.
///
/// For every connection, the elements of `connection->get_response_msg()` are
/// formatted with the `{:02x}` spec and separated by single spaces. The
/// per-connection strings are then joined with "\n\t".
///
/// @param connections  Range of pointer-like objects (accessed through `->`)
///                     that expose `get_response_msg()`.
/// @return The joined text as a std::string.
42 auto print_connections(
const auto& connections) -> std::string
// Formats one connection's response message as space-separated `{:02x}` fields.
44 auto join_connection = [](
const auto& connection)
45 {
return fmt::format(
"{:02x}", fmt::join(connection->get_response_msg(),
" ")); };
// Map every connection through the formatter above and join the pieces; the
// "\n\t" separator puts each following entry on its own tab-indented line.
46 return fmt::format(
"{}", fmt::join(connections | std::views::transform(join_connection),
"\n\t"));
52 ,
strand_{ asio::make_strand(io_context.get_executor()) }
59 const std::shared_ptr<SmallConnection>&
connection)
63 "Registering data sent to the remote endpoint {} at time {} us",
connection->get_remote_endpoint(), time);
65 auto co_action = asio::co_spawn(
strand_, std::move(action), asio::deferred);
68 auto lock = std::lock_guard{
mut_ };
69 const auto& remote_endpoint =
connection->get_remote_endpoint();
76 asio::experimental::make_parallel_group(std::move(
action_queue_))
77 .async_wait(asio::experimental::wait_for_all(), asio::use_future)
82 std::span<char> response,
85 if (connections.empty())
87 spdlog::debug(
"Response from the remote endpoint {} to the local port {} is received. But no further "
88 "message from the endpoint is not required anymore!",
93 auto res = remove_once_if(connections,
94 [response](
const std::shared_ptr<SmallConnection>&
connection) ->
bool
96 auto is_same =
connection->check_response(response);
101 spdlog::debug(
"Response from the remote endpoint {} is recognized by the local socket with port {}",
108 spdlog::warn(
"From remote endpoint: {}, local socket with port {} received an unrecognized msg: {:02x}",
111 fmt::join(response,
" "));
119 "Available responses are:\n\t{}",
121 std::views::filter([](
const auto& connections) ->
bool {
return not connections.empty(); }) |
122 std::views::transform([](
const auto& connections) {
return print_connections(connections); }),
129 "TIMEOUT during local port {} waiting for the responses from the following ip/port:\n\t{}",
133 std::views::filter([](
const auto& key_value) ->
bool {
return not key_value.second.empty(); }) |
134 std::views::transform(
135 [](
const auto& key_value)
136 {
return fmt::format(
"{}: {}", key_value.first, print_connections(key_value.second)); }),
142 auto lock = std::lock_guard{
mut_ };
143 spdlog::trace(
"Local port {} received a response from a remote endpoint {}: \n\t{:02x}",
146 fmt::join(response,
" "));
150 spdlog::error(
"Local socket with port {} received an unknown remote endpoint: {}",
get_port(), endpoint);
160 [](
const auto& connections) ->
bool {
return connections.empty(); });
FecCommandSocket(int port_number, io_context_type &io_context)
void deregister_connection(const UDPEndpoint &endpoint, std::span< char > response, SmallConnections &connections)
std::map< UDPEndpoint, SmallConnections > all_connections_
std::vector< ActionType > action_queue_
auto get_time_us() const -> auto
auto is_finished() -> bool
void register_send_action_imp(asio::awaitable< void > action, const std::shared_ptr< SmallConnection > &connection)
void print_available_responses() const
asio::strand< io_context_type::executor_type > strand_
void response_handler(const UDPEndpoint &endpoint, std::span< char > response)
std::vector< char > read_msg_buffer_
std::vector< std::shared_ptr< SmallConnection > > SmallConnections
constexpr auto SMALL_READ_MSG_BUFFER_SIZE
boost::asio::ip::basic_endpoint< boost::asio::ip::udp > UDPEndpoint
asio::thread_pool io_context_type