SRS-control 0.1.4
 
Loading...
Searching...
No Matches
DataWriter.cpp
Go to the documentation of this file.
1#include <boost/asio/ip/address.hpp>
2
8
9namespace srs::writer
10{
11 namespace
12 {
13 auto convert_str_to_endpoint(asio::thread_pool& thread_pool, std::string_view ip_port)
14 -> std::optional<asio::ip::udp::endpoint>
15 {
16 const auto colon_pos = ip_port.find(':');
17 if (colon_pos == std::string::npos)
18 {
19 spdlog::critical("Ill format socket string {:?}. Please set it as \"ip:port\"", ip_port);
20 return {};
21 }
22 auto ip_string = ip_port.substr(0, colon_pos);
23 auto port_str = ip_port.substr(colon_pos + 1);
24
25 auto err_code = boost::system::error_code{};
26 auto resolver = asio::ip::udp::resolver{ thread_pool };
27 auto query = asio::ip::udp::resolver::query{ std::string{ ip_string }, std::string{ port_str } };
28 auto iter = resolver.resolve(query, err_code);
29 if (err_code)
30 {
31 spdlog::critical("Cannot query the ip address {:?}. Error code: {}", ip_port, err_code.message());
32 return {};
33 }
34 return *iter;
35 }
36
37 auto check_root_dependency() -> bool
38 {
39#ifdef HAS_ROOT
40 return true;
41#else
42 spdlog::error("Cannot output to a root file. Please make sure the program is "
43 "built with the ROOT library.");
44 return false;
45#endif
46 }
47
48 } // namespace
49
51 : workflow_handler_{ processor }
52 {
53 }
54
55 Manager::~Manager() = default;
56
58 {
59 return std::ranges::any_of(
61 [dependee](const auto& option_count) -> bool
62 { return option_count.second > 0 && convert_option_has_dependency(dependee, option_count.first); });
63 }
64
65 void Manager::wait_for_finished() { boost::wait_for_all(write_futures_.begin(), write_futures_.end()); }
66
67 auto Manager::add_binary_file(const std::string& filename, process::DataConvertOptions deser_mode) -> bool
68 {
69 auto& app = workflow_handler_->get_app();
70 return binary_files_
71 .try_emplace(filename, std::make_unique<BinaryFile>(app.get_io_context(), filename, deser_mode))
72 .second;
73 }
74
75 auto Manager::add_udp_file(const std::string& filename, process::DataConvertOptions deser_mode) -> bool
76 {
77 auto& app = workflow_handler_->get_app();
78 auto endpoint = convert_str_to_endpoint(app.get_io_context(), filename);
79 if (endpoint.has_value())
80 {
81 return udp_files_.try_emplace(filename, std::make_unique<UDP>(app, std::move(endpoint.value()), deser_mode))
82 .second;
83 }
84 return false;
85 }
86
87 // NOLINTNEXTLINE
88 auto Manager::add_root_file(const std::string& filename) -> bool
89 {
90#ifdef HAS_ROOT
91 auto& app = workflow_handler_->get_app();
92 return root_files_
93 .try_emplace(filename, std::make_unique<RootFile>(app.get_io_context(), filename.c_str(), "RECREATE"))
94 .second;
95#else
96 return false;
97#endif
98 }
99
100 auto Manager::add_json_file(const std::string& filename) -> bool
101 {
102 auto& app = workflow_handler_->get_app();
103 return json_files_.try_emplace(filename, std::make_unique<Json>(app.get_io_context(), filename)).second;
104 }
105
106 void Manager::set_output_filenames(const std::vector<std::string>& filenames)
107 {
108 auto& app = workflow_handler_->get_app();
109
110 for (const auto& filename : filenames)
111 {
112 const auto [filetype, convert_mode] = get_filetype_from_filename(filename);
113
114 auto is_ok = false;
115
116 switch (filetype)
117 {
118 case no_output:
119 spdlog::error("Extension of the filename {:?} cannot be recognized!", filename);
120 continue;
121 break;
122 case bin:
123 is_ok = add_binary_file(filename, convert_mode);
124 break;
125 case udp:
126 is_ok = add_udp_file(filename, convert_mode);
127 break;
128 case root:
129 if (not check_root_dependency())
130 {
131 continue;
132 }
133 is_ok = add_root_file(filename);
134 break;
135 case json:
136 is_ok = add_json_file(filename);
137 break;
138 }
139
140 if (is_ok)
141 {
142 spdlog::info("Add the output source {:?}", filename);
143 ++(convert_count_map_.at(convert_mode));
144 }
145 else
146 {
147 spdlog::error("The filename {:?} has been already added!", filename);
148 }
149 }
150 }
151} // namespace srs::writer
Manager(workflow::Handler *processor)
std::map< std::string, std::unique_ptr< BinaryFile > > binary_files_
auto add_binary_file(const std::string &filename, process::DataConvertOptions deser_mode) -> bool
auto add_root_file(const std::string &filename) -> bool
auto is_convert_required(process::DataConvertOptions dependee) const -> bool
std::map< std::string, std::unique_ptr< Json > > json_files_
void set_output_filenames(const std::vector< std::string > &filenames)
std::vector< boost::unique_future< std::optional< int > > > write_futures_
std::map< process::DataConvertOptions, int > convert_count_map_
std::map< std::string, std::unique_ptr< UDP > > udp_files_
workflow::Handler * workflow_handler_
auto add_udp_file(const std::string &filename, process::DataConvertOptions deser_mode) -> bool
auto add_json_file(const std::string &filename) -> bool
auto get_filetype_from_filename(std::string_view filename) -> std::tuple< DataWriterOption, process::DataConvertOptions >