SRS-control 0.1.4
 
Loading...
Searching...
No Matches
TaskDiagram.cpp
Go to the documentation of this file.
6
7namespace srs::workflow
8{
9 TaskDiagram::TaskDiagram(Handler* data_processor, asio::thread_pool& thread_pool)
10 : raw_to_delim_raw_converter_{ thread_pool }
11 , struct_deserializer_{ thread_pool }
12 , struct_proto_converter_{ thread_pool }
13 , proto_serializer_{ thread_pool }
14 , proto_delim_serializer_{ thread_pool }
15 , writers_{ data_processor }
16 {
17 coro_ = generate_starting_coro(thread_pool.get_executor());
18 common::coro_sync_start(coro_, false, asio::use_awaitable);
19 }
20
21 auto TaskDiagram::analysis_one(tbb::concurrent_bounded_queue<process::SerializableMsgBuffer>& data_queue,
22 bool is_blocking) -> bool
23 {
24 auto pop_res = true;
25 reset();
26 if (is_blocking)
27 {
28 data_queue.pop(binary_data_);
29 }
30 else
31 {
32 pop_res = data_queue.try_pop(binary_data_);
33 }
34 if (pop_res)
35 {
36 auto res = run_processes(false);
37 if (not res.has_value())
38 {
39 throw std::runtime_error(fmt::format("{}", res.error()));
40 }
41 }
42 return pop_res;
43 }
44
46 {
47 spdlog::debug("Closing analysis task workflows ...");
48 reset();
49 auto res = run_processes(true);
50 if (not res.has_value())
51 {
52 spdlog::error("{}", res.error());
53 }
54 }
55
/**
 * @brief Wire up the futures of all processing stages, attach the writers and
 *        wait until the writers have finished.
 *
 * The future built from the starting coroutine is shared and feeds both the
 * raw-to-delimited-raw converter and the struct deserializer. The struct
 * deserializer feeds the struct-to-proto converter, whose future in turn feeds
 * both proto serializers. Each writer is connected to the future matching its
 * type or its convert mode.
 *
 * @param is_stopped Forwarded to common::create_coro_future; when true, extra
 *                   shutdown log messages are emitted.
 * @return Always a value-holding expected; no error is produced in this function.
 */
auto TaskDiagram::run_processes(bool is_stopped) -> std::expected<void, std::string_view>
{
    // Head of the chain; shared because several stages consume it.
    auto source_fut = common::create_coro_future(coro_, is_stopped).share();

    auto delim_raw_fut = raw_to_delim_raw_converter_.create_future(source_fut, writers_);
    auto struct_fut = struct_deserializer_.create_future(source_fut, writers_);
    auto proto_fut = struct_proto_converter_.create_future(struct_fut, writers_);
    auto proto_bin_fut = proto_serializer_.create_future(proto_fut, writers_);
    auto proto_delim_bin_fut = proto_delim_serializer_.create_future(proto_fut, writers_);

    if (is_stopped)
    {
        spdlog::debug("Shutting down all data writers...");
    }

    // Selects, per writer, which stage's future the writer consumes.
    auto attach_writer = [&](auto& writer)
    {
        using WriterType = std::remove_cvref_t<decltype(writer)>;
        const auto mode = writer.get_convert_mode();

        if constexpr (not WriterType::IsStructType)
        {
            switch (mode)
            {
                case raw:
                    return writer.write(source_fut);
                case raw_frame:
                    return writer.write(delim_raw_fut);
                case proto:
                    return writer.write(proto_bin_fut);
                case proto_frame:
                    return writer.write(proto_delim_bin_fut);
                default:
                    // Unrecognised mode: hand back a default-constructed future.
                    return boost::unique_future<std::optional<int>>{};
            }
        }
        else
        {
            // Struct-type writers always consume the deserialized structure.
            return writer.write(struct_fut);
        }
    };

    writers_.write_with(attach_writer);
    writers_.wait_for_finished();

    if (is_stopped)
    {
        spdlog::info("All data consumers are finished.");
    }
    return {};
}
103
104 // NOLINTNEXTLINE(readability-static-accessed-through-instance)
105 auto TaskDiagram::generate_starting_coro(asio::any_io_executor /*unused*/) -> StartingCoroType
106 {
107 while (true)
108 {
109 auto data = std::string_view{ binary_data_.data() };
110 auto is_terminated = co_yield (data);
111 if (is_terminated)
112 {
113 spdlog::debug("Shutting down starting coroutine.");
114 co_return;
115 }
116 }
117 }
118
120 {
121 writers_.reset();
122 binary_data_.clear();
123 }
124
125 void TaskDiagram::set_output_filenames(const std::vector<std::string>& filenames)
126 {
127 writers_.set_output_filenames(filenames);
128 }
129} // namespace srs::workflow
TaskDiagram(Handler *data_processor, asio::thread_pool &thread_pool)
process::Raw2DelimRawConverter raw_to_delim_raw_converter_
process::ProtoSerializer proto_serializer_
process::StructDeserializer struct_deserializer_
process::SerializableMsgBuffer binary_data_
void set_output_filenames(const std::vector< std::string > &filenames)
auto generate_starting_coro(asio::any_io_executor) -> StartingCoroType
process::Struct2ProtoConverter struct_proto_converter_
auto analysis_one(tbb::concurrent_bounded_queue< process::SerializableMsgBuffer > &data_queue, bool is_blocking) -> bool
asio::experimental::coro< std::string_view(bool)> StartingCoroType
process::ProtoDelimSerializer proto_delim_serializer_
auto run_processes(bool is_stopped) -> std::expected< void, std::string_view >
auto create_coro_future(auto &coro, auto &&pre_fut)
void coro_sync_start(auto &coro, auto &&... args)