14#include <gsl/gsl-lite.hpp>
17#include <taskflow/core/executor.hpp>
18#include <taskflow/core/task.hpp>
19#include <taskflow/core/taskflow.hpp>
23#include <oneapi/tbb/concurrent_queue.h>
25#include <tbb/concurrent_queue.h>
38 const std::atomic<bool>& is_stopped);
39 void run_task(tbb::concurrent_bounded_queue<process::SerializableMsgBuffer>& data_queue,
40 std::size_t line_number);
43 [[nodiscard]] auto operator()(std::
size_t line_number) const -> std::string_view
60 std::vector<process::SerializableMsgBuffer>
raw_data_;
75 auto create_task(
auto& converter, tf::Taskflow& taskflow, std::size_t line_number) -> std::optional<tf::Task>
77 auto is_required = std::ranges::all_of(converter.DataConvertOptions,
78 [
this](
auto converter_option)
79 { return writers_->is_convert_required(converter_option); });
84 return taskflow.emplace([
this, line_number, &converter]() { converter.run_task(*
this, line_number); })
85 .name(converter.get_name_str());
std::atomic< uint64_t > total_read_data_bytes_
process::Raw2DelimRawConverter raw_to_delim_raw_converter_
auto get_n_lines() const -> std::size_t
process::ProtoDelimSerializer proto_delim_serializer_converter_
auto is_taskflow_abort_ready() const -> bool
process::SerializableMsgBuffer binary_data_
tf::Taskflow main_taskflow_
std::vector< process::SerializableMsgBuffer > raw_data_
process::Struct2ProtoConverter struct_to_proto_converter_
process::ProtoSerializer proto_serializer_converter_
process::StructDeserializer struct_deserializer_converter_
auto get_data_bytes() const -> uint64_t
std::vector< std::atomic< bool > > is_pipeline_stopped_
auto create_task(auto &converter, tf::Taskflow &taskflow, std::size_t line_number) -> std::optional< tf::Task >
void construct_taskflow_and_run(tbb::concurrent_bounded_queue< process::SerializableMsgBuffer > &data_queue, const std::atomic< bool > &is_stopped)
void run_task(tbb::concurrent_bounded_queue< process::SerializableMsgBuffer > &data_queue, std::size_t line_number)
std::vector< tf::Taskflow > taskflow_lines_
gsl::not_null< writer::Manager * > writers_
tf::Executor tf_executor_
void construct_taskflow_line(tf::Taskflow &taskflow, std::size_t line_number)
auto get_struct_data() -> const auto *
TaskDiagram(Handler *data_processor, std::size_t n_lines=1)