SRS-control 0.1.4
Loading...
Searching...
No Matches
SRSEmulator.cpp
Go to the documentation of this file.
1#include "SRSEmulator.hpp"
8#include <algorithm>
9#include <boost/asio/any_io_executor.hpp>
10#include <boost/asio/as_tuple.hpp>
11#include <boost/asio/awaitable.hpp>
12#include <boost/asio/buffer.hpp>
13#include <boost/asio/detached.hpp>
14#include <boost/asio/impl/co_spawn.hpp>
15#include <boost/asio/ip/address.hpp>
16#include <boost/asio/ip/basic_endpoint.hpp>
17#include <boost/asio/ip/udp.hpp>
18#include <boost/asio/system_timer.hpp>
19#include <boost/asio/this_coro.hpp>
20#include <boost/asio/use_awaitable.hpp>
21#include <boost/asio/use_future.hpp>
22#include <chrono>
23#include <cstddef>
24#include <fmt/ranges.h>
25#include <magic_enum/magic_enum.hpp>
26#include <map>
27#include <memory>
28#include <optional>
29#include <ranges>
30#include <spdlog/spdlog.h>
31#include <string>
32#include <string_view>
33#include <utility>
34#include <vector>
35
36namespace srs::test
37{
38 namespace
39 {
40 auto get_msg_from_receive_type(SRSEmulator::ReceiveType rec_type) -> std::string
41 {
42 using enum SRSEmulator::ReceiveType;
43 switch (rec_type)
44 {
45 case invalid:
46 return {};
47 case acq_on:
48 {
49 auto acq_on_connect = connection::Starter{};
50 return std::string{ acq_on_connect.get_response_msg() };
51 }
52 case acq_off:
53 auto acq_off_connect = connection::Stopper{};
54 return std::string{ acq_off_connect.get_response_msg() };
55 }
56 return {};
57 }
58 auto get_send_msg_from_receive_type(SRSEmulator::ReceiveType rec_type) -> std::string
59 {
60 auto buffer = process::SerializableMsgBuffer{};
61 using enum SRSEmulator::ReceiveType;
62 switch (rec_type)
63 {
64 case invalid:
65 return {};
66 case acq_on:
67 {
68 auto acq_on_connect = connection::Starter{};
69 const auto& send_suffix = acq_on_connect.get_send_suffix();
70 buffer.serialize(send_suffix);
71 return std::string{ buffer.data() };
72 }
73 case acq_off:
74 auto acq_off_connect = connection::Stopper{};
75 const auto& send_suffix = acq_off_connect.get_send_suffix();
76 buffer.serialize(send_suffix);
77 return std::string{ buffer.data() };
78 }
79 return {};
80 }
81
82 auto check_receive_msg_type(std::string_view msg) -> SRSEmulator::ReceiveType
83 {
84 using enum SRSEmulator::ReceiveType;
85 static const auto msg_map = std::map<std::string, SRSEmulator::ReceiveType>{
86 { get_send_msg_from_receive_type(acq_on), acq_on },
87 { get_send_msg_from_receive_type(acq_off), acq_off },
88 };
89 auto msg_iter = std::ranges::find_if(
90 msg_map, [msg](const auto& suffix_type) -> bool { return msg.ends_with(suffix_type.first); });
91 // auto msg_iter = msg_map.find_if(std::string{ msg }, );
92 spdlog::trace("msg_map available options {}",
93 fmt::join(msg_map | std::views::transform(
94 [](const auto& str_type) -> std::string
95 {
96 return fmt::format("{}: {:02x}",
97 magic_enum::enum_name(str_type.second),
98 fmt::join(str_type.first, " "));
99 }),
100 "\n"));
101 if (msg_iter == msg_map.end())
102 {
103 return invalid;
104 }
105 return msg_iter->second;
106 }
107 } // namespace
108
110 : config_{ config }
112 asio::ip::udp::endpoint{ asio::ip::udp::v4(),
113 static_cast<asio::ip::port_type>(config_.listen_port) } }
114 , frame_reader_{ std::string{ config.filename } }
115 {
116 data_sender_status_->expires_at(std::chrono::system_clock::time_point::max());
117 }
118
120 {
121
122 // NOLINTBEGIN (clang-analyzer-core.CallAndMessage)
123 asio::co_spawn(io_context_, listen_coro(), asio::detached);
124 // NOLINTEND (clang-analyzer-core.CallAndMessage)
125 io_context_.join();
126 }
127
129 {
130 auto timer_waiter = [](std::shared_ptr<asio::system_timer> timer) -> asio::awaitable<void>
131 { [[maybe_unused]] auto [err] = co_await timer->async_wait(asio::as_tuple(asio::use_awaitable)); };
132 asio::co_spawn(io_context_, timer_waiter(data_sender_status_), asio::use_future).get();
133 }
134
135 void SRSEmulator::do_if_acq_on(asio::any_io_executor& executor)
136 {
137 if (is_idle_.load())
138 {
139 is_idle_.store(true);
140
141 // spdlog::trace("Server: start_send_data spawning ...");
142 asio::co_spawn(executor, start_send_data(), asio::detached);
143 // spdlog::trace("Server: start_send_data spawned!");
144 }
145 }
146
148
149 auto SRSEmulator::send_response(connection::udp::endpoint endpoint, ReceiveType result_type)
150 -> asio::awaitable<void>
151 {
152 auto msg = get_msg_from_receive_type(result_type);
153 [[maybe_unused]] const auto size =
154 co_await udp_socket_.async_send_to(asio::buffer(msg), endpoint, asio::use_awaitable);
155 spdlog::trace("SRS server: sent the data {:02x} to the remote point {}", fmt::join(msg, " "), endpoint);
156 }
157
158 auto SRSEmulator::listen_coro() -> asio::awaitable<void>
159 {
160 auto msg_buffer = std::vector<char>{};
161 msg_buffer.resize(common::SMALL_READ_MSG_BUFFER_SIZE);
162 auto executor = co_await asio::this_coro::executor;
163 for (;;)
164 {
165 spdlog::trace("SRS server: listening on the local port {}", udp_socket_.local_endpoint());
166 auto remote_endpoint = asio::ip::udp::endpoint{};
167 auto msg_size =
168 co_await udp_socket_.async_receive_from(asio::buffer(msg_buffer), remote_endpoint, asio::use_awaitable);
169 auto msg = std::string_view{ msg_buffer.data(), msg_size };
170 auto result = check_receive_msg_type(msg);
171 spdlog::trace("SRS server: Request type {:?} received with the msg: {:02x}",
172 magic_enum::enum_name(result),
173 fmt::join(msg, " "));
174 asio::co_spawn(io_context_, send_response(std::move(remote_endpoint), result), asio::detached);
175
176 using enum ReceiveType;
177 switch (result)
178 {
179 case invalid:
180 break;
181 case acq_on:
182 do_if_acq_on(executor);
183 break;
184 case acq_off:
186 spdlog::trace("SRS server: existing coroutine");
187 co_return;
188 break;
189 }
190 }
191 }
192
193 auto SRSEmulator::start_send_data() -> asio::awaitable<void>
194 {
195 spdlog::info("Server: Starting to send data from emulator ...");
196 auto total_size = std::size_t{ 0 };
197 auto executor = co_await asio::this_coro::executor;
199 asio::ip::udp::endpoint{
200 asio::ip::make_address("127.0.0.1"),
201 static_cast<asio::ip::port_type>(config_.data_port) } };
202 auto send_coro =
203 common::create_coro_task([&connection]() { return connection.send_continuous_message(); }, executor);
204
205 while (not is_shutdown_.load())
206 {
207 auto read_str = frame_reader_.read_one_frame();
208 if (not read_str.has_value())
209 {
210 spdlog::critical("Server: error occurred from the reading the frame with msg: {}", read_str.error());
211 break;
212 }
213 if (read_str.value().empty())
214 {
215 if (is_continue_.load())
216 {
217 frame_reader_.reset();
218 auto read_str = frame_reader_.read_one_frame();
219 }
220 else
221 {
222 co_await send_coro.async_resume(std::optional<std::string_view>{}, asio::use_awaitable);
223 break;
224 }
225 }
226 data_sending_control_.expires_after(delay_time_);
227 co_await data_sending_control_.async_wait(asio::use_awaitable);
228
229 auto send_size = co_await send_coro.async_resume(std::optional{ read_str.value() }, asio::use_awaitable);
230 total_size += send_size.value_or(0);
231 }
232 spdlog::debug("reaching the end of send_data coroutine. Sent total data size: {}", total_size);
233 data_sender_status_->cancel();
234 }
235
236} // namespace srs::test
static auto get_send_suffix() -> const auto &
static auto get_send_suffix() -> const auto &
std::atomic< bool > is_continue_
auto send_response(connection::udp::endpoint endpoint, ReceiveType result_type) -> asio::awaitable< void >
SRSEmulator(const Config &config)
std::shared_ptr< asio::system_timer > data_sender_status_
void do_if_acq_on(asio::any_io_executor &executor)
auto start_send_data() -> asio::awaitable< void >
std::atomic< bool > is_idle_
std::atomic< bool > is_shutdown_
asio::steady_timer data_sending_control_
IOContextType io_context_
std::chrono::microseconds delay_time_
asio::ip::udp::socket udp_socket_
reader::RawFrame frame_reader_
auto listen_coro() -> asio::awaitable< void >
asio::ip::udp udp
Definition main.cpp:9
auto create_coro_task(auto task, const asio::any_io_executor &executor)
constexpr auto SMALL_READ_MSG_BUFFER_SIZE