SRS-control 0.1.4
Loading...
Searching...
No Matches
DataWriter.cpp
Go to the documentation of this file.
1#include "DataWriter.hpp"
4#include <boost/asio/ip/udp.hpp>
5#include <boost/asio/thread_pool.hpp>
6#include <boost/system/detail/error_code.hpp>
7#include <boost/thread/futures/wait_for_all.hpp>
8#include <magic_enum/magic_enum.hpp>
9#include <map>
10#include <memory>
11#include <optional>
12#include <ranges>
13#include <spdlog/spdlog.h>
19#include <string>
20#include <string_view>
21#include <utility>
22#include <vector>
23
24namespace srs::writer
25{
26 namespace
27 {
28 auto convert_str_to_endpoint(asio::thread_pool& thread_pool, std::string_view ip_port)
29 -> std::optional<asio::ip::udp::endpoint>
30 {
31 const auto question_pos = ip_port.find('?');
32 const auto colon_pos = ip_port.find(':');
33 if (colon_pos == std::string::npos)
34 {
35 spdlog::critical("Ill format socket string {:?}. Please set it as \"ip:port\"", ip_port);
36 return {};
37 }
38 auto ip_string =
39 (question_pos == std::string::npos) ? ip_port.substr(0, colon_pos) : ip_port.substr(0, question_pos);
40 auto port_str = ip_port.substr(colon_pos + 1);
41
42 auto err_code = boost::system::error_code{};
43 auto resolver = asio::ip::udp::resolver{ thread_pool };
44 auto iter = resolver.resolve(std::string{ ip_string }, std::string{ port_str }, err_code).begin();
45 if (err_code)
46 {
47 spdlog::critical("Cannot query the ip address {:?}. Error code: {}", ip_port, err_code.message());
48 return {};
49 }
50 return *iter;
51 }
52
53 auto check_root_dependency() -> bool
54 {
55#ifdef HAS_ROOT
56 return true;
57#else
58 spdlog::error("Cannot output to a root file. Please make sure the program is "
59 "built with the ROOT library.");
60 return false;
61#endif
62 }
63
64 } // namespace
65
67 : workflow_handler_{ processor }
68 {
69 }
70
// Destructor defined out of line in this translation unit; it uses the compiler-generated behaviour.
71 Manager::~Manager() = default;
72
74 {
75 auto is_required = false;
77 [&is_required, dependee](std::string_view, auto& writer)
78 { is_required |= convert_option_has_dependency(dependee, writer.get_required_conversion()); });
79 return is_required;
80 }
81
83 {
84 constexpr auto conversions = magic_enum::enum_values<process::DataConvertOptions>();
85 return std::views::transform(conversions,
86 [this](const auto conversion) -> std::pair<process::DataConvertOptions, bool>
87 { return std::pair{ conversion, is_convert_required(conversion) }; }) |
88 std::ranges::to<std::map<process::DataConvertOptions, bool>>();
89 }
90
91 void Manager::wait_for_finished() { boost::wait_for_all(write_futures_.begin(), write_futures_.end()); }
92
93 auto Manager::add_binary_file(const std::string& filename, process::DataConvertOptions prev_conversion) -> bool
94 {
95 return binary_files_
96 .try_emplace(filename,
97 std::make_unique<BinaryFile>(filename, prev_conversion, workflow_handler_->get_n_lines()))
98 .second;
99 }
100
101 auto Manager::add_udp_file(const std::string& filename, process::DataConvertOptions prev_conversion) -> bool
102 {
103 auto& app = workflow_handler_->get_app();
104 auto endpoint = convert_str_to_endpoint(app.get_io_context(), filename);
105 if (endpoint.has_value())
106 {
107 return udp_files_
108 .try_emplace(filename,
109 std::make_unique<UDP>(app.get_io_context(),
110 std::move(endpoint.value()),
111 workflow_handler_->get_n_lines(),
112 prev_conversion))
113 .second;
114 }
115 return false;
116 }
117
118 auto Manager::add_root_file([[maybe_unused]] const std::string& filename,
119 [[maybe_unused]] process::DataConvertOptions prev_conversion) -> bool
120 {
121#ifdef HAS_ROOT
122 return root_files_
123 .try_emplace(
124 filename,
125 std::make_unique<RootFile>(filename.c_str(), prev_conversion, workflow_handler_->get_n_lines()))
126 .second;
127#else
128 return false;
129#endif
130 }
131
132 auto Manager::add_json_file(const std::string& filename, process::DataConvertOptions prev_conversion) -> bool
133 {
134 return json_files_
135 .try_emplace(filename, std::make_unique<Json>(filename, prev_conversion, workflow_handler_->get_n_lines()))
136 .second;
137 }
138
139 void Manager::set_output_filenames(const std::vector<std::string>& filenames)
140 {
141 for (const auto& filename : filenames)
142 {
143 if (filename.empty())
144 {
145 continue;
146 }
147 const auto [filetype, convert_mode] = get_filetype_from_filename(filename);
148
149 auto is_ok = false;
150
151 switch (filetype)
152 {
153 case no_output:
154 spdlog::error("Extension of the filename {:?} cannot be recognized!", filename);
155 continue;
156 break;
157 case bin:
158 is_ok = add_binary_file(filename, convert_mode);
159 break;
160 case udp:
161 is_ok = add_udp_file(filename, convert_mode);
162 break;
163 case root:
164 if (not check_root_dependency())
165 {
166 continue;
167 }
168 is_ok = add_root_file(filename, convert_mode);
169 break;
170 case json:
171 is_ok = add_json_file(filename, convert_mode);
172 break;
173 }
174
175 if (is_ok)
176 {
177 spdlog::info("Add the output source {:?}", filename);
178 }
179 else
180 {
181 spdlog::error("The filename {:?} has been already added!", filename);
182 }
183 }
184 }
185} // namespace srs::writer
DataConvertOptions
Manager(workflow::Handler *processor)
auto add_root_file(const std::string &filename, process::DataConvertOptions prev_conversion) -> bool
std::map< std::string, std::unique_ptr< BinaryFile > > binary_files_
auto is_convert_required(process::DataConvertOptions dependee) const -> bool
std::map< std::string, std::unique_ptr< Json > > json_files_
auto generate_conversion_req_map() const -> std::map< process::DataConvertOptions, bool >
void set_output_filenames(const std::vector< std::string > &filenames)
std::map< std::string, std::unique_ptr< UDP > > udp_files_
auto add_udp_file(const std::string &filename, process::DataConvertOptions prev_conversion) -> bool
auto add_binary_file(const std::string &filename, process::DataConvertOptions prev_conversion) -> bool
workflow::Handler * workflow_handler_
std::vector< boost::unique_future< std::optional< std::size_t > > > write_futures_
auto add_json_file(const std::string &filename, process::DataConvertOptions prev_conversion) -> bool
void do_for_each_writer(WriterVisitor auto visitor)
asio::ip::udp udp
Definition main.cpp:9
auto get_filetype_from_filename(std::string_view filename) -> std::tuple< DataWriterOption, process::DataConvertOptions >