SRS-control 0.1.4
Loading...
Searching...
No Matches
TaskDiagram.hpp
Go to the documentation of this file.
1#pragma once
2
3#include "srs/Application.hpp"
10#include <algorithm>
11#include <atomic>
12#include <cstddef>
13#include <cstdint>
14#include <gsl/gsl-lite.hpp>
15#include <optional>
16#include <string_view>
17#include <taskflow/core/executor.hpp>
18#include <taskflow/core/task.hpp>
19#include <taskflow/core/taskflow.hpp>
20#include <vector>
21
22#ifdef USE_ONEAPI_TBB
23#include <oneapi/tbb/concurrent_queue.h>
24#else
25#include <tbb/concurrent_queue.h>
26#endif
27
28namespace srs::workflow
29{
31 {
32 public:
// Constructor. `explicit`, so a Handler* is never implicitly converted into a TaskDiagram.
// n_lines defaults to 1.
// NOTE(review): n_lines presumably sets the number of parallel taskflow lines - confirm in the .cpp.
33 explicit TaskDiagram(Handler* data_processor, std::size_t n_lines = 1);
34
// NOTE(review): these aliases mirror the signature in the commented-out generate_coro()
// declaration further down (std::size_t(bool)) - confirm whether they are still used.
35 using InputType = bool;
36 using OutputType = std::size_t;
// Declared here, defined out of line. Takes the bounded queue of message buffers and a
// read-only atomic stop flag.
37 void construct_taskflow_and_run(tbb::concurrent_bounded_queue<process::SerializableMsgBuffer>& data_queue,
38 const std::atomic<bool>& is_stopped);
// Declared here, defined out of line. Takes the same queue type plus the index of one line.
39 void run_task(tbb::concurrent_bounded_queue<process::SerializableMsgBuffer>& data_queue,
40 std::size_t line_number);
// Declared here, defined out of line.
41 void run();
// Const query returning bool; defined out of line.
// NOTE(review): not marked [[nodiscard]] unlike the other const getters in this class.
42 auto is_taskflow_abort_ready() const -> bool;
43 [[nodiscard]] auto operator()(std::size_t line_number) const -> std::string_view
44 {
45 return raw_data_[line_number].data();
46 }
47
48 [[nodiscard]] auto get_data_bytes() const -> uint64_t { return total_read_data_bytes_.load(); }
49 [[nodiscard]] auto get_n_lines() const -> std::size_t { return n_lines_; }
50
51 auto get_struct_data() -> const auto* { return struct_deserializer_converter_(0); }
52 // auto generate_coro() -> asio::experimental::coro<std::size_t(bool)>;
53
54 private:
// Line count returned by get_n_lines(); defaults to 1.
55 std::size_t n_lines_ = 1;
// Taskflow executor object owned by this class.
56 tf::Executor tf_executor_;
// Top-level taskflow graph.
57 tf::Taskflow main_taskflow_;
// A collection of taskflow graphs.
// NOTE(review): presumably one entry per line - confirm in the .cpp.
58 std::vector<tf::Taskflow> taskflow_lines_;
// A collection of atomic stop flags.
// NOTE(review): presumably one flag per line - confirm in the .cpp.
59 std::vector<std::atomic<bool>> is_pipeline_stopped_;
// Raw message buffers; operator()(line_number) indexes this vector.
60 std::vector<process::SerializableMsgBuffer> raw_data_;
// NOTE(review): source lines 61-68 are not shown in this listing. Members used elsewhere in
// the class but not declared in view (e.g. struct_deserializer_converter_, used by
// get_struct_data()) are presumably declared there.
62
68
// Atomic byte counter, initialised to 0; read by get_data_bytes().
69 std::atomic<uint64_t> total_read_data_bytes_ = 0;
// Non-null pointer to the writer manager; create_task() calls is_convert_required() on it.
70 gsl::not_null<writer::Manager*> writers_;
71
// Declared here, defined out of line. Takes the taskflow to operate on and a line index.
72 void construct_taskflow_line(tf::Taskflow& taskflow, std::size_t line_number);
73
74 // TODO: Add converter concept here
75 auto create_task(auto& converter, tf::Taskflow& taskflow, std::size_t line_number) -> std::optional<tf::Task>
76 {
77 auto is_required = std::ranges::all_of(converter.DataConvertOptions,
78 [this](auto converter_option)
79 { return writers_->is_convert_required(converter_option); });
80 if (not is_required)
81 {
82 return {};
83 }
84 return taskflow.emplace([this, line_number, &converter]() { converter.run_task(*this, line_number); })
85 .name(converter.get_name_str());
86 }
87 };
88} // namespace srs::workflow
std::atomic< uint64_t > total_read_data_bytes_
process::Raw2DelimRawConverter raw_to_delim_raw_converter_
auto get_n_lines() const -> std::size_t
process::ProtoDelimSerializer proto_delim_serializer_converter_
auto is_taskflow_abort_ready() const -> bool
process::SerializableMsgBuffer binary_data_
std::vector< process::SerializableMsgBuffer > raw_data_
process::Struct2ProtoConverter struct_to_proto_converter_
process::ProtoSerializer proto_serializer_converter_
process::StructDeserializer struct_deserializer_converter_
auto get_data_bytes() const -> uint64_t
std::vector< std::atomic< bool > > is_pipeline_stopped_
auto create_task(auto &converter, tf::Taskflow &taskflow, std::size_t line_number) -> std::optional< tf::Task >
void construct_taskflow_and_run(tbb::concurrent_bounded_queue< process::SerializableMsgBuffer > &data_queue, const std::atomic< bool > &is_stopped)
void run_task(tbb::concurrent_bounded_queue< process::SerializableMsgBuffer > &data_queue, std::size_t line_number)
std::vector< tf::Taskflow > taskflow_lines_
gsl::not_null< writer::Manager * > writers_
void construct_taskflow_line(tf::Taskflow &taskflow, std::size_t line_number)
auto get_struct_data() -> const auto *
TaskDiagram(Handler *data_processor, std::size_t n_lines=1)