59 tbb::concurrent_bounded_queue<process::SerializableMsgBuffer>& data_queue,
60 const std::atomic<bool>& is_stopped)
63 for (
const auto [line_number, taskflow] : std::views::zip(std::views::iota(0),
taskflow_lines_))
73 const auto& conversion_req_map =
writers_->generate_conversion_req_map();
74 spdlog::debug(
"Starting taskflow with enabled writers");
75 spdlog::debug(
"Conversion requirements map: \n\t{}",
76 fmt::join(conversion_req_map | std::views::transform(
77 [](
const auto conv_req) -> std::string
80 "conversion \t {:<20} \t required: {}",
81 magic_enum::enum_name(conv_req.first),
90 tf::Pipe{ tf::PipeType::SERIAL, []([[maybe_unused]] tf::Pipeflow& pipeflow) {} },
91 tf::Pipe{ tf::PipeType::PARALLEL,
92 [
this, &data_queue, &is_stopped]([[maybe_unused]] tf::Pipeflow& pipeflow)
94 if (is_stopped.load())
100 run_task(data_queue, pipeflow.line());
106 tf::Pipe{ tf::PipeType::SERIAL, []([[maybe_unused]] tf::Pipeflow& pipeflow) {} } };
108 auto pipeline_task =
main_taskflow_.composed_of(main_pipeline).name(
"Main pipeline");
109 starting_task.precede(pipeline_task);
126 auto create_task = [writers =
writers_, line_number, &taskflow]<
typename PrevConverter,
typename ThisConverter>(
127 ThisConverter& converter,
128 std::optional<std::pair<const PrevConverter&, tf::Task>>& prev_task)
129 -> std::optional<std::pair<const ThisConverter&, tf::Task>>
131 if (not prev_task.has_value())
135 auto is_required = writers->is_convert_required(converter.get_required_conversion());
141 .emplace([line_number, &converter, &prev_converter = prev_task.value().first]()
142 { [[maybe_unused]] auto res = converter.run(prev_converter, line_number); })
143 .name(converter.get_name_str());
144 if (not prev_task.value().second.empty())
146 prev_task.value().second.precede(task);
148 return std::pair<const ThisConverter&, tf::Task>{ converter, task };
151 auto empty_task = std::optional{ std::pair<const TaskDiagram&, tf::Task>{ *
this, tf::Task{} } };
165 &proto_delim_deser_task](std::string_view filename,
auto& file_writer) ->
void
167 if constexpr (std::remove_cvref_t<
decltype(file_writer)>::IsStructType)
174 const auto convert_mode = file_writer.get_required_conversion();
175 switch (convert_mode)
190 spdlog::warn(
"unrecognized conversion {}from the file {}", convert_mode, filename);