SRS-control 0.1.4
Loading...
Searching...
No Matches
TaskDiagram.cpp
Go to the documentation of this file.
2#include "srs/Application.hpp"
6#include <algorithm>
7#include <atomic>
8#include <chrono>
9#include <cstddef>
10#include <fmt/format.h>
11#include <fmt/ranges.h>
12#include <optional>
13#include <ranges>
14#include <spdlog/spdlog.h>
15#include <string>
16#include <string_view>
17#include <taskflow/algorithm/pipeline.hpp>
18#include <taskflow/core/task.hpp>
19#include <taskflow/core/taskflow.hpp>
20#include <thread>
21#include <type_traits>
22#include <utility>
23#include <vector>
24
25#ifdef USE_ONEAPI_TBB
26#include <oneapi/tbb/concurrent_queue.h>
27#else
28#include <tbb/concurrent_queue.h>
29#endif
30
31namespace srs::workflow
32{
// Constructs the task diagram for `n_lines` pipeline lines.
//
// data_processor: handler that supplies the writer manager via get_writer(); it is
//                 dereferenced unconditionally, so it must not be null.
// n_lines:        number of pipeline lines; sizes both the per-line "stopped" flags
//                 and the per-line raw data buffers.
33 TaskDiagram::TaskDiagram(Handler* data_processor, std::size_t n_lines)
34 : n_lines_{ n_lines }
35 , is_pipeline_stopped_{ std::vector<std::atomic<bool>>(n_lines_) }
// NOTE(review): the listing numbering jumps from 35 to 41 here; further member
// initializers may be missing from this view - confirm against the original file.
41 , writers_{ data_processor->get_writer() }
42 {
// Every line starts out flagged as stopped.
43 for (auto& is_pipe_stopped : is_pipeline_stopped_)
44 {
45 is_pipe_stopped.store(true);
46 }
// One raw message buffer per pipeline line, indexed by line number in run_task().
47 raw_data_.resize(n_lines);
48 }
49
50 void TaskDiagram::run_task(tbb::concurrent_bounded_queue<process::SerializableMsgBuffer>& data_queue,
51 std::size_t line_number)
52 {
53 raw_data_[line_number].clear();
54 data_queue.pop(raw_data_[line_number]);
55 total_read_data_bytes_ += raw_data_[line_number].data().size();
56 }
57
// NOTE(review): the line carrying the return type and function name (listing line 58)
// is absent from this view. Judging by the parameter list and the member index at the
// end of the file, this appears to be
//   void TaskDiagram::construct_taskflow_and_run(data_queue, is_stopped)
// - confirm against the original file.
//
// Builds one taskflow per pipeline line, wires a logging "Starting" task in front of
// the main pipeline, then runs the main taskflow and waits for it to finish.
//
// data_queue: queue the parallel pipe reads messages from (via run_task()).
// is_stopped: flag polled by the parallel pipe; once it reads true the pipe asks the
//             pipeline to stop instead of processing another message.
59 tbb::concurrent_bounded_queue<process::SerializableMsgBuffer>& data_queue,
60 const std::atomic<bool>& is_stopped)
61 {
// NOTE(review): listing line 62 is missing from this view.
// Pair each entry of taskflow_lines_ with its 0-based index and populate it.
63 for (const auto [line_number, taskflow] : std::views::zip(std::views::iota(0), taskflow_lines_))
64 {
65 construct_taskflow_line(taskflow, static_cast<std::size_t>(line_number));
66 }
67
// NOTE(review): listing line 69 (the object `.emplace` is called on) is missing from
// this view; presumably main_taskflow_, which is used below - confirm.
68 auto starting_task =
70 .emplace(
71 [this]()
72 {
// Debug-log every (conversion, required) entry of the writers' requirement map.
73 const auto& conversion_req_map = writers_->generate_conversion_req_map();
74 spdlog::debug("Starting taskflow with enabled writers");
75 spdlog::debug("Conversion requirements map: \n\t{}",
76 fmt::join(conversion_req_map | std::views::transform(
77 [](const auto conv_req) -> std::string
78 {
79 return fmt::format(
80 "conversion \t {:<20} \t required: {}",
81 magic_enum::enum_name(conv_req.first),
82 conv_req.second);
83 }),
84 "\n\t"));
85 })
86 .name("Starting");
87
// Pipeline over n_lines_ lines with three pipes: a serial no-op, the parallel worker
// pipe, and a trailing serial no-op.
88 auto main_pipeline =
89 tf::Pipeline{ n_lines_,
90 tf::Pipe{ tf::PipeType::SERIAL, []([[maybe_unused]] tf::Pipeflow& pipeflow) {} },
91 tf::Pipe{ tf::PipeType::PARALLEL,
92 [this, &data_queue, &is_stopped]([[maybe_unused]] tf::Pipeflow& pipeflow)
93 {
94 if (is_stopped.load())
95 {
// NOTE(review): Taskflow documents Pipeflow::stop() as callable only from the first
// pipe; this call is made from the second pipe - verify the intended stop behavior.
96 pipeflow.stop();
97 }
98 else
99 {
// Read one message into this line's buffer, then run the line's taskflow inline.
// The per-line flag is false only while that taskflow is executing.
100 run_task(data_queue, pipeflow.line());
101 is_pipeline_stopped_[pipeflow.line()].store(false);
102 tf_executor_.corun(taskflow_lines_[pipeflow.line()]);
103 is_pipeline_stopped_[pipeflow.line()].store(true);
104 }
105 } },
106 tf::Pipe{ tf::PipeType::SERIAL, []([[maybe_unused]] tf::Pipeflow& pipeflow) {} } };
107
// main_pipeline is a local referenced by main_taskflow_; the run below is waited on
// before this function returns.
108 auto pipeline_task = main_taskflow_.composed_of(main_pipeline).name("Main pipeline");
109 starting_task.precede(pipeline_task);
110 tf_executor_.run(main_taskflow_).wait();
111 }
112
// NOTE(review): the signature line (listing line 113) is absent from this view. The
// member index at the end of the file lists
//   auto is_taskflow_abort_ready() const -> bool
// which appears to be this function - confirm against the original file.
//
// Sleeps for 10 ms, then returns true only if every per-line flag in
// is_pipeline_stopped_ currently reads true.
114 {
115 static constexpr auto SLEEP_TIME = std::chrono::milliseconds(10);
// The sleep happens on every call, before the flags are inspected.
116 std::this_thread::sleep_for(SLEEP_TIME);
117 auto res = std::ranges::all_of(is_pipeline_stopped_,
118 [](const std::atomic<bool>& is_pipe_stopped) -> bool
119 { return is_pipe_stopped.load(); });
120 return res;
121 }
122
// Populates `taskflow` with the conversion and writer tasks for one pipeline line.
//
// taskflow:    taskflow that receives the tasks.
// line_number: index of the pipeline line; forwarded to every converter/writer run().
123 void TaskDiagram::construct_taskflow_line(tf::Taskflow& taskflow, std::size_t line_number)
124 {
125 // TODO: Add converter concept here
// Helper that adds `converter` as a task fed by the converter stored in `prev_task`.
// It returns an empty optional (and adds nothing) when `prev_task` is empty or when
// the writers do not require this converter's conversion; otherwise it returns the
// converter reference paired with the newly created task.
126 auto create_task = [writers = writers_, line_number, &taskflow]<typename PrevConverter, typename ThisConverter>(
127 ThisConverter& converter,
128 std::optional<std::pair<const PrevConverter&, tf::Task>>& prev_task)
129 -> std::optional<std::pair<const ThisConverter&, tf::Task>>
130 {
131 if (not prev_task.has_value())
132 {
133 return {};
134 }
135 auto is_required = writers->is_convert_required(converter.get_required_conversion());
136 if (not is_required)
137 {
138 return {};
139 }
// The task holds references to both converters, so they must outlive the taskflow.
// The result of run() is deliberately ignored.
140 auto task = taskflow
141 .emplace([line_number, &converter, &prev_converter = prev_task.value().first]()
142 { [[maybe_unused]] auto res = converter.run(prev_converter, line_number); })
143 .name(converter.get_name_str());
// An empty predecessor handle marks a root: no dependency edge is added.
144 if (not prev_task.value().second.empty())
145 {
146 prev_task.value().second.precede(task);
147 }
148 return std::pair<const ThisConverter&, tf::Task>{ converter, task };
149 };
150
// Root of the graph: an engaged optional holding *this and an empty task handle, so
// tasks created from it get *this as their input and no predecessor.
151 auto empty_task = std::optional{ std::pair<const TaskDiagram&, tf::Task>{ *this, tf::Task{} } };
// Converter chain: raw-delimiter and struct-deserializer hang off the root,
// struct-to-proto follows the struct-deserializer, and both proto serializers follow
// struct-to-proto.
152 auto raw_delimiter_task = create_task(raw_to_delim_raw_converter_, empty_task);
153 auto struct_deser_task = create_task(struct_deserializer_converter_, empty_task);
154 auto struct_to_proto_task = create_task(struct_to_proto_converter_, struct_deser_task);
155 auto proto_deser_task = create_task(proto_serializer_converter_, struct_to_proto_task);
156 auto proto_delim_deser_task = create_task(proto_delim_serializer_converter_, struct_to_proto_task);
157 // TODO: root_deser
158
// Attach each writer behind the task that produces its input. The values returned by
// create_task are discarded here.
159 writers_->do_for_each_writer(
160 [&create_task,
161 &empty_task,
162 &raw_delimiter_task,
163 &struct_deser_task,
164 &proto_deser_task,
165 &proto_delim_deser_task](std::string_view filename, auto& file_writer) -> void
166 {
// Writers whose type sets IsStructType follow the struct-deserializer task.
167 if constexpr (std::remove_cvref_t<decltype(file_writer)>::IsStructType)
168 {
169 create_task(file_writer, struct_deser_task);
170 }
171 else
172 {
// NOTE(review): listing line 173 is missing from this view; the unqualified case
// labels below presumably rely on a using-enum declaration that was on it - confirm.
174 const auto convert_mode = file_writer.get_required_conversion();
175 switch (convert_mode)
176 {
177 case raw:
178 create_task(file_writer, empty_task);
179 break;
180 case raw_frame:
181 create_task(file_writer, raw_delimiter_task);
182 break;
183 case proto:
184 create_task(file_writer, proto_deser_task);
185 break;
186 case proto_frame:
187 create_task(file_writer, proto_delim_deser_task);
188 break;
189 default:
// Unknown modes are logged and then treated like `raw` (attached to the root).
// NOTE(review): the format string has no space between "{}" and "from".
190 spdlog::warn("unrecognized conversion {}from the file {}", convert_mode, filename);
191 create_task(file_writer, empty_task);
192 break;
193 }
194 }
195 });
196 }
197} // namespace srs::workflow
std::atomic< uint64_t > total_read_data_bytes_
process::Raw2DelimRawConverter raw_to_delim_raw_converter_
process::ProtoDelimSerializer proto_delim_serializer_converter_
auto is_taskflow_abort_ready() const -> bool
std::vector< process::SerializableMsgBuffer > raw_data_
process::Struct2ProtoConverter struct_to_proto_converter_
process::ProtoSerializer proto_serializer_converter_
process::StructDeserializer struct_deserializer_converter_
std::vector< std::atomic< bool > > is_pipeline_stopped_
auto create_task(auto &converter, tf::Taskflow &taskflow, std::size_t line_number) -> std::optional< tf::Task >
void construct_taskflow_and_run(tbb::concurrent_bounded_queue< process::SerializableMsgBuffer > &data_queue, const std::atomic< bool > &is_stopped)
void run_task(tbb::concurrent_bounded_queue< process::SerializableMsgBuffer > &data_queue, std::size_t line_number)
std::vector< tf::Taskflow > taskflow_lines_
gsl::not_null< writer::Manager * > writers_
void construct_taskflow_line(tf::Taskflow &taskflow, std::size_t line_number)
TaskDiagram(Handler *data_processor, std::size_t n_lines=1)