SRS-control 0.1.4
Loading...
Searching...
No Matches
FecSwitchSocket.cpp
Go to the documentation of this file.
1#include "FecSwitchSocket.hpp"
2#include "srs/connections/ConnectionBase.hpp" // IWYU pragma: keep
7#include "srs/utils/UDPFormatters.hpp" // IWYU pragma: keep
8#include <algorithm>
9#include <boost/asio/awaitable.hpp>
10#include <boost/asio/deferred.hpp>
11#include <boost/asio/experimental/cancellation_condition.hpp>
12#include <boost/asio/experimental/parallel_group.hpp>
13#include <boost/asio/impl/co_spawn.hpp>
14#include <boost/asio/strand.hpp>
15#include <boost/asio/use_future.hpp>
16#include <fmt/ranges.h>
17#include <memory>
18#include <mutex>
19#include <ranges>
20#include <span>
21#include <spdlog/spdlog.h>
22#include <string>
23#include <utility>
24
25namespace srs::connection
26{
27 namespace
28 {
29 auto remove_once_if(auto& connections, auto unitary_opt) -> bool
30 {
31 auto iter = std::ranges::find_if(
32 connections, [unitary_opt](const auto& connection) -> bool { return not unitary_opt(connection); });
33 if (iter == connections.end())
34 {
35 return false;
36 }
37 std::iter_swap(iter, connections.end() - 1);
38 connections.pop_back();
39 return true;
40 }
41
42 auto print_connections(const auto& connections) -> std::string
43 {
44 auto join_connection = [](const auto& connection)
45 { return fmt::format("{:02x}", fmt::join(connection->get_response_msg(), " ")); };
46 return fmt::format("{}", fmt::join(connections | std::views::transform(join_connection), "\n\t"));
47 }
48 } // namespace
49
51 : SpecialSocket{ port_number, io_context }
52 , strand_{ asio::make_strand(io_context.get_executor()) }
53
54 {
56 }
57
58 void FecCommandSocket::register_send_action_imp(asio::awaitable<void> action,
59 const std::shared_ptr<SmallConnection>& connection)
60 {
61 auto time = get_time_us();
62 spdlog::debug(
63 "Registering data sent to the remote endpoint {} at time {} us", connection->get_remote_endpoint(), time);
64 // asio::co_spawn(strand_, std::move(action), asio::detached);
65 auto co_action = asio::co_spawn(strand_, std::move(action), asio::deferred);
66 action_queue_.push_back(std::move(co_action));
67
68 auto lock = std::lock_guard{ mut_ };
69 const auto& remote_endpoint = connection->get_remote_endpoint();
70 auto& connections = all_connections_.try_emplace(remote_endpoint, SmallConnections{}).first->second;
71 connections.push_back(connection);
72 }
73
75 {
76 asio::experimental::make_parallel_group(std::move(action_queue_))
77 .async_wait(asio::experimental::wait_for_all(), asio::use_future)
78 .get();
79 }
80
82 std::span<char> response,
83 SmallConnections& connections)
84 {
85 if (connections.empty())
86 {
87 spdlog::debug("Response from the remote endpoint {} to the local port {} is received. But no further "
88 "message from the endpoint is not required anymore!",
89 endpoint,
90 get_port());
91 return;
92 }
93 auto res = remove_once_if(connections,
94 [response](const std::shared_ptr<SmallConnection>& connection) -> bool
95 {
96 auto is_same = connection->check_response(response);
97 return not is_same;
98 });
99 if (res)
100 {
101 spdlog::debug("Response from the remote endpoint {} is recognized by the local socket with port {}",
102 endpoint,
103 get_port());
104 // print_available_responses();
105 }
106 else
107 {
108 spdlog::warn("From remote endpoint: {}, local socket with port {} received an unrecognized msg: {:02x}",
109 endpoint,
110 get_port(),
111 fmt::join(response, " "));
113 }
114 }
115
117 {
118 spdlog::info(
119 "Available responses are:\n\t{}",
120 fmt::join(all_connections_ | std::views::values |
121 std::views::filter([](const auto& connections) -> bool { return not connections.empty(); }) |
122 std::views::transform([](const auto& connections) { return print_connections(connections); }),
123 "\n\t"));
124 }
125
127 {
128 spdlog::error(
129 "TIMEOUT during local port {} waiting for the responses from the following ip/port:\n\t{}",
130 get_port(),
131 fmt::join(
133 std::views::filter([](const auto& key_value) -> bool { return not key_value.second.empty(); }) |
134 std::views::transform(
135 [](const auto& key_value)
136 { return fmt::format("{}: {}", key_value.first, print_connections(key_value.second)); }),
137 "\n\t"));
138 }
139
140 void FecCommandSocket::response_handler(const UDPEndpoint& endpoint, std::span<char> response)
141 {
142 auto lock = std::lock_guard{ mut_ };
143 spdlog::trace("Local port {} received a response from a remote endpoint {}: \n\t{:02x}",
144 get_port(),
145 endpoint,
146 fmt::join(response, " "));
147 auto connections_iter = all_connections_.find(endpoint);
148 if (connections_iter == all_connections_.end())
149 {
150 spdlog::error("Local socket with port {} received an unknown remote endpoint: {}", get_port(), endpoint);
151 return;
152 }
153 deregister_connection(endpoint, response, connections_iter->second);
154 }
155
157 {
158
159 return std::ranges::all_of(all_connections_ | std::views::values,
160 [](const auto& connections) -> bool { return connections.empty(); });
161 }
162} // namespace srs::connection
FecCommandSocket(int port_number, io_context_type &io_context)
void deregister_connection(const UDPEndpoint &endpoint, std::span< char > response, SmallConnections &connections)
std::map< UDPEndpoint, SmallConnections > all_connections_
std::vector< ActionType > action_queue_
void register_send_action_imp(asio::awaitable< void > action, const std::shared_ptr< SmallConnection > &connection)
asio::strand< io_context_type::executor_type > strand_
void response_handler(const UDPEndpoint &endpoint, std::span< char > response)
std::vector< std::shared_ptr< SmallConnection > > SmallConnections
constexpr auto SMALL_READ_MSG_BUFFER_SIZE
boost::asio::ip::basic_endpoint< boost::asio::ip::udp > UDPEndpoint
asio::thread_pool io_context_type