4#include <boost/asio/ip/udp.hpp>
5#include <boost/asio/thread_pool.hpp>
6#include <boost/system/detail/error_code.hpp>
7#include <boost/thread/futures/wait_for_all.hpp>
8#include <magic_enum/magic_enum.hpp>
13#include <spdlog/spdlog.h>
28 auto convert_str_to_endpoint(asio::thread_pool& thread_pool, std::string_view ip_port)
29 -> std::optional<asio::ip::udp::endpoint>
31 const auto question_pos = ip_port.find(
'?');
32 const auto colon_pos = ip_port.find(
':');
33 if (colon_pos == std::string::npos)
35 spdlog::critical(
"Ill format socket string {:?}. Please set it as \"ip:port\"", ip_port);
39 (question_pos == std::string::npos) ? ip_port.substr(0, colon_pos) : ip_port.substr(0, question_pos);
40 auto port_str = ip_port.substr(colon_pos + 1);
42 auto err_code = boost::system::error_code{};
43 auto resolver = asio::ip::udp::resolver{ thread_pool };
44 auto iter = resolver.resolve(std::string{ ip_string }, std::string{ port_str }, err_code).begin();
47 spdlog::critical(
"Cannot query the ip address {:?}. Error code: {}", ip_port, err_code.message());
53 auto check_root_dependency() ->
bool
58 spdlog::error(
"Cannot output to a root file. Please make sure the program is "
59 "built with the ROOT library.");
75 auto is_required =
false;
77 [&is_required, dependee](std::string_view,
auto&
writer)
78 { is_required |= convert_option_has_dependency(dependee,
writer.get_required_conversion()); });
84 constexpr auto conversions = magic_enum::enum_values<process::DataConvertOptions>();
85 return std::views::transform(conversions,
86 [
this](
const auto conversion) -> std::pair<process::DataConvertOptions, bool>
88 std::ranges::to<std::map<process::DataConvertOptions, bool>>();
96 .try_emplace(filename,
97 std::make_unique<BinaryFile>(filename, prev_conversion,
workflow_handler_->get_n_lines()))
104 auto endpoint = convert_str_to_endpoint(app.get_io_context(), filename);
105 if (endpoint.has_value())
108 .try_emplace(filename,
109 std::make_unique<UDP>(app.get_io_context(),
110 std::move(endpoint.value()),
125 std::make_unique<RootFile>(filename.c_str(), prev_conversion,
workflow_handler_->get_n_lines()))
135 .try_emplace(filename, std::make_unique<Json>(filename, prev_conversion,
workflow_handler_->get_n_lines()))
141 for (
const auto& filename : filenames)
143 if (filename.empty())
154 spdlog::error(
"Extension of the filename {:?} cannot be recognized!", filename);
164 if (not check_root_dependency())
177 spdlog::info(
"Add the output source {:?}", filename);
181 spdlog::error(
"The filename {:?} has been already added!", filename);
Manager(workflow::Handler *processor)
auto add_root_file(const std::string &filename, process::DataConvertOptions prev_conversion) -> bool
std::map< std::string, std::unique_ptr< BinaryFile > > binary_files_
auto is_convert_required(process::DataConvertOptions dependee) const -> bool
std::map< std::string, std::unique_ptr< Json > > json_files_
auto generate_conversion_req_map() const -> std::map< process::DataConvertOptions, bool >
void set_output_filenames(const std::vector< std::string > &filenames)
std::map< std::string, std::unique_ptr< UDP > > udp_files_
auto add_udp_file(const std::string &filename, process::DataConvertOptions prev_conversion) -> bool
auto add_binary_file(const std::string &filename, process::DataConvertOptions prev_conversion) -> bool
workflow::Handler * workflow_handler_
std::vector< boost::unique_future< std::optional< std::size_t > > > write_futures_
auto add_json_file(const std::string &filename, process::DataConvertOptions prev_conversion) -> bool
void do_for_each_writer(WriterVisitor auto visitor)
auto get_filetype_from_filename(std::string_view filename) -> std::tuple< DataWriterOption, process::DataConvertOptions >