SRS-control 0.1.4
Loading...
Searching...
No Matches
UDPWriter.hpp
Go to the documentation of this file.
1#pragma once
2
7#include <boost/asio/buffer.hpp>
8#include <boost/asio/experimental/coro.hpp>
9#include <boost/asio/ip/udp.hpp>
10#include <boost/asio/use_awaitable.hpp>
11#include <boost/asio/uses_executor.hpp>
12#include <boost/system/detail/error_code.hpp>
13#include <boost/thread/future.hpp>
14#include <cassert>
15#include <cstddef>
16#include <memory>
17#include <optional>
18#include <spdlog/spdlog.h>
21#include <string_view>
22#include <utility>
23#include <vector>
24
25namespace srs::connection
26{
28 {
29 public:
30 explicit UDPWriterConnection(io_context_type& io_executor, asio::ip::udp::endpoint remote_endpoint)
31 : io_context_{ &io_executor }
32 , remote_endpoint_{ std::move(remote_endpoint) }
33 , socket_{ io_executor, asio::ip::udp::endpoint{ asio::ip::udp::v4(), 0 } }
34 {
35 }
36 using OutputType = std::size_t;
37 using InputType = std::string_view;
38
39 [[nodiscard]] auto get_executor() const { return io_context_->get_executor(); }
40
42 {
43 const auto read_size =
44 (not input_data.empty()) ? socket_.send_to(asio::buffer(input_data), remote_endpoint_, 0, err_) : 0;
45 return read_size;
46 }
47
48 void close()
49 {
50 auto error = boost::system::error_code{};
51 socket_.close(error);
52 if (error)
53 {
54 spdlog::warn("UDP: Failed to close the socket with endpoint: {} due to the error: {}",
55 socket_.local_endpoint(),
56 error.what());
57 }
58 }
59
60 auto send_continuous_message() -> asio::experimental::coro<OutputType(std::optional<InputType>)>
61 {
62 auto total_size = std::size_t{};
63 spdlog::debug("UDP: Starting to send data to the remote point {}", remote_endpoint_);
64 auto msg = co_yield OutputType{};
65 while (true)
66 {
67 if (not msg.has_value())
68 {
69 break;
70 }
71 const auto read_size =
72 (not msg.value().empty()) ? socket_.send_to(asio::buffer(msg.value()), remote_endpoint_) : 0;
73 total_size += read_size;
74 msg = co_yield read_size;
75 }
76 close();
77 spdlog::debug("UDP: Stopped sending the message. Sent bytes: {}", total_size);
78 co_return;
79 }
80
81 private:
82 boost::system::error_code err_;
84 asio::ip::udp::endpoint remote_endpoint_;
85 asio::ip::udp::socket socket_;
86 };
87} // namespace srs::connection
88
89namespace srs::writer
90{
91 class UDP : public process::WriterTask<DataWriterOption::udp, std::string_view, std::size_t>
92 {
93 public:
94 UDP(io_context_type& io_context,
95 asio::ip::udp::endpoint remote_endpoint,
96 std::size_t n_lines,
98 static constexpr auto IsStructType = false;
99
100 ~UDP() noexcept;
101 UDP(const UDP&) = delete;
102 UDP(UDP&&) = default;
103 UDP& operator=(const UDP&) = delete;
104 UDP& operator=(UDP&&) = default;
105
106 auto run(const OutputTo<InputType> auto& prev_data_converter, std::size_t line_number = 0) -> RunResult
107 {
108 assert(line_number < get_n_lines());
109 output_data_[line_number] = connections_[line_number]->send_sync_message(prev_data_converter(line_number));
110 return output_data_[line_number];
111 }
112
113 [[nodiscard]] auto is_deserialize_valid() const
114 {
115 return get_required_conversion() == raw or get_required_conversion() == proto;
116 }
117 [[nodiscard]] auto operator()(std::size_t line_number = 0) const -> OutputType
118 {
119 return output_data_[line_number];
120 }
121
122 private:
123 std::vector<OutputType> output_data_;
124 std::vector<std::unique_ptr<connection::UDPWriterConnection>> connections_;
126 };
127
128} // namespace srs::writer
asio::ip::udp::endpoint remote_endpoint_
Definition UDPWriter.hpp:84
boost::system::error_code err_
Definition UDPWriter.hpp:82
UDPWriterConnection(io_context_type &io_executor, asio::ip::udp::endpoint remote_endpoint)
Definition UDPWriter.hpp:30
auto send_continuous_message() -> asio::experimental::coro< OutputType(std::optional< InputType >)>
Definition UDPWriter.hpp:60
auto send_sync_message(InputType input_data) -> OutputType
Definition UDPWriter.hpp:41
std::expected< OutputType, std::string_view > RunResult
auto operator()(std::size_t line_number=0) const -> OutputType
std::vector< std::unique_ptr< connection::UDPWriterConnection > > connections_
static constexpr auto IsStructType
Definition UDPWriter.hpp:98
auto run(const OutputTo< InputType > auto &prev_data_converter, std::size_t line_number=0) -> RunResult
auto is_deserialize_valid() const
UDP(io_context_type &io_context, asio::ip::udp::endpoint remote_endpoint, std::size_t n_lines, process::DataConvertOptions deser_mode=process::DataConvertOptions::none)
Definition UDPWriter.cpp:14
~UDP() noexcept
Definition UDPWriter.cpp:28
std::vector< OutputType > output_data_
boost::asio::ip::udp udp
asio::thread_pool io_context_type