1#include <boost/asio/ip/address.hpp>
// Turn an "ip:port" string into a UDP endpoint by splitting it at a colon and resolving
// the two halves with an asio UDP resolver built on `thread_pool`.
// Declared to return std::optional<asio::ip::udp::endpoint>.
//
// NOTE(review): this listing is discontinuous — the embedded numbers jump (14 -> 16, 17 -> 19,
// 19 -> 22, 28 -> 31) and the braces, the `return` statements and the test on `err_code` are
// not visible, so which paths yield an empty optional cannot be confirmed from this view.
13 auto convert_str_to_endpoint(asio::thread_pool& thread_pool, std::string_view ip_port)
14 -> std::optional<asio::ip::udp::endpoint>
// Position of the FIRST ':' in the input.
// NOTE(review): an IPv6 literal contains several ':' and would be cut inside the address —
// presumably only IPv4 addresses / host names are expected here; confirm.
16 const auto colon_pos = ip_port.find(
':');
// NOTE(review): `ip_port` is a std::string_view; std::string::npos has the same value as
// std::string_view::npos, so the comparison works, but the latter would match the type.
17 if (colon_pos == std::string::npos)
// Reported when the input contains no ':' separator at all.
19 spdlog::critical(
"Ill format socket string {:?}. Please set it as \"ip:port\"", ip_port);
// Host part: everything before the colon. Port part: everything after it.
22 auto ip_string = ip_port.substr(0, colon_pos);
23 auto port_str = ip_port.substr(colon_pos + 1);
// The resolver is handed `err_code`, so resolution failures are reported through it.
25 auto err_code = boost::system::error_code{};
26 auto resolver = asio::ip::udp::resolver{ thread_pool };
// NOTE(review): `resolver::query` is deprecated in newer Boost.Asio releases in favour of the
// resolve(host, service, ec) overloads — assumes `asio` aliases boost::asio; confirm before upgrading.
27 auto query = asio::ip::udp::resolver::query{ std::string{ ip_string }, std::string{ port_str } };
28 auto iter = resolver.resolve(query, err_code);
// Logs the full input string together with the resolver's error message; the condition that
// guards this call is not visible in this listing.
31 spdlog::critical(
"Cannot query the ip address {:?}. Error code: {}", ip_port, err_code.message());
// Report whether output to a ROOT file is possible; declared to return bool.
//
// NOTE(review): only the signature and one log call are visible (embedded numbers jump
// 37 -> 42). The condition that selects the error path and the returned values are not shown —
// presumably a compile-time check for ROOT support; confirm against the full source.
37 auto check_root_dependency() ->
bool
// Emitted on the path where ROOT output is unavailable.
42 spdlog::error(
"Cannot output to a root file. Please make sure the program is "
43 "built with the ROOT library.");
// Fragment of a function body — presumably Manager::is_convert_required(dependee); the header
// and the range passed to any_of (embedded line 60) are not visible in this listing.
//
// Returns true when at least one element of that range satisfies the predicate below.
59 return std::ranges::any_of(
// Predicate over pair-like elements: `.first` is handed to convert_option_has_dependency
// together with the captured `dependee`, `.second` is compared as a count.
61 [dependee](
const auto& option_count) ->
bool
62 {
// An entry matches only if its count is positive AND convert_option_has_dependency
// returns true for (dependee, entry.first).
return option_count.second > 0 && convert_option_has_dependency(dependee, option_count.first); });
// Fragment — presumably from Manager::add_binary_file; the container receiving the call is not
// visible (likely binary_files_; confirm). Registers a BinaryFile writer under `filename`.
// NOTE(review): std::make_unique<BinaryFile>(...) is evaluated before try_emplace runs, so a
// BinaryFile is constructed and then destroyed even when `filename` is already a key — verify
// that its constructor/destructor have no side effects on an existing output file.
71 .try_emplace(filename, std::make_unique<BinaryFile>(app.get_io_context(), filename, deser_mode))
// Fragment — presumably from Manager::add_udp_file; header, braces and the path taken when
// `endpoint` is empty are not visible in this listing.
// Here `filename` is treated as an "ip:port" string and resolved to a UDP endpoint.
78 auto endpoint = convert_str_to_endpoint(app.get_io_context(), filename);
// Only register a UDP writer when the string could be converted to an endpoint.
79 if (endpoint.has_value())
// Keyed by the original string; try_emplace leaves an already-present key untouched.
// The expression continues past this line (a member access follows the call), not visible here.
81 return udp_files_.try_emplace(filename, std::make_unique<UDP>(app, std::move(endpoint.value()), deser_mode))
// Fragment — presumably from Manager::add_root_file; the container receiving the call is not
// visible. Registers a RootFile writer under `filename`, constructed with the mode "RECREATE".
// NOTE(review): the RootFile is constructed before try_emplace checks for an existing key; if
// "RECREATE" is forwarded to ROOT's TFile (presumably — confirm), a duplicate filename would
// still re-create the file on disk before the insertion is rejected.
93 .try_emplace(filename, std::make_unique<RootFile>(app.get_io_context(), filename.c_str(),
"RECREATE"))
// Fragment — presumably the body of Manager::add_json_file. Registers a Json writer under
// `filename` in json_files_ and returns try_emplace's `.second`: true when a new entry was
// inserted, false when the key was already present.
103 return json_files_.try_emplace(filename, std::make_unique<Json>(app.get_io_context(), filename)).second;
// Fragment — presumably the body of Manager::set_output_filenames(filenames). Only the loop
// header, one guard and three log calls are visible (embedded numbers jump 110 -> 119 -> 129
// -> 142 -> 147); the dispatch on file type and the add_* calls are not shown.
110 for (
const auto& filename : filenames)
// Logged for a filename whose extension is not recognised; the test that leads here is not visible.
119 spdlog::error(
"Extension of the filename {:?} cannot be recognized!", filename);
// Guard on ROOT availability; what happens when it fails is not visible in this listing.
129 if (not check_root_dependency())
// Logged when an output source is added — presumably after an add_* call succeeds; confirm.
142 spdlog::info(
"Add the output source {:?}", filename);
// Logged for a filename that was already added — presumably when an add_* call returns
// false; confirm.
147 spdlog::error(
"The filename {:?} has been already added!", filename);
Manager(workflow::Handler *processor)
std::map< std::string, std::unique_ptr< BinaryFile > > binary_files_
auto add_binary_file(const std::string &filename, process::DataConvertOptions deser_mode) -> bool
auto add_root_file(const std::string &filename) -> bool
auto is_convert_required(process::DataConvertOptions dependee) const -> bool
std::map< std::string, std::unique_ptr< Json > > json_files_
void set_output_filenames(const std::vector< std::string > &filenames)
std::vector< boost::unique_future< std::optional< int > > > write_futures_
std::map< process::DataConvertOptions, int > convert_count_map_
std::map< std::string, std::unique_ptr< UDP > > udp_files_
workflow::Handler * workflow_handler_
auto add_udp_file(const std::string &filename, process::DataConvertOptions deser_mode) -> bool
auto add_json_file(const std::string &filename) -> bool
auto get_filetype_from_filename(std::string_view filename) -> std::tuple< DataWriterOption, process::DataConvertOptions >