1// This file is part of OpenCV project.
2// It is subject to the license terms in the LICENSE file found in the top-level directory
3// of this distribution and at http://opencv.org/license.html.
4//
5// Copyright (C) 2020-2021 Intel Corporation
6
7#include "gtbbexecutor.hpp"
8
9#if defined(HAVE_TBB) && (TBB_INTERFACE_VERSION < 12000)
10// TODO: TBB task API has been deprecated and removed in 12000
11
12#include "utils/itt.hpp"
13
14#include <opencv2/gapi/own/assert.hpp>
15#include <opencv2/gapi/util/copy_through_move.hpp>
16#include "logger.hpp" // GAPI_LOG
17
18#include <tbb/task.h>
19#include <memory> // unique_ptr
20
21#include <atomic>
22#include <condition_variable>
23
24#include <chrono>
25
// File-local shorthand: ASSERT forwards its expression to the G-API debug assertion macro.
#define ASSERT(expr)          GAPI_DbgAssert(expr)

// File-local shorthands: each forwards the tag and the message arguments to the
// G-API logging macro of the corresponding level.
#define LOG_INFO(tag, ...)    GAPI_LOG_INFO(tag, __VA_ARGS__)
#define LOG_WARNING(tag, ...) GAPI_LOG_WARNING(tag, __VA_ARGS__)
#define LOG_DEBUG(tag, ...)   GAPI_LOG_DEBUG(tag, __VA_ARGS__)
31
32
33namespace cv { namespace gimpl { namespace parallel {
34
35namespace detail {
// Helpers for dealing with tbb::task related entities
37namespace tasking {
38
// Result of a task body invocation: tells functor_task::execute() whether the current
// task object should be recycled and returned to TBB (YES) or whether execute() should
// return nullptr (NO).
enum class use_tbb_scheduler_bypass {
    NO,
    YES
};
43
// Debug-only sanity check that the graph rooted at `root` is still executing.
//
// tbb::task::wait_for_all() blocks the calling thread until the task ref_count drops to 1.
// So while the root task ref_count is greater than 1 the graph still has work to do and,
// accordingly, wait_for_all() has not returned yet.
inline void assert_graph_is_running(tbb::task* root) {
    ASSERT(root->ref_count() > 1);
}
50
51// made template to break circular dependencies
52template<typename body_t>
53struct functor_task : tbb::task {
54 body_t body;
55
56 template<typename arg_t>
57 functor_task(arg_t&& a) : body(std::forward<arg_t>(a)) {}
58
59 tbb::task * execute() override {
60 assert_graph_is_running(parent());
61
62 auto reuse_current_task = body();
63 // if needed, say TBB to execute current task once again
64 return (use_tbb_scheduler_bypass::YES == reuse_current_task) ? (recycle_as_continuation(), this) : nullptr;
65 }
66 ~functor_task() {
67 assert_graph_is_running(parent());
68 }
69};
70
71template<typename body_t>
72auto allocate_task(tbb::task* root, body_t const& body) -> functor_task<body_t>* {
73 return new(tbb::task::allocate_additional_child_of(*root)) functor_task<body_t>{body};
74}
75
76template<typename body_t>
77void spawn_no_assert(tbb::task* root, body_t const& body) {
78 tbb::task::spawn(* allocate_task(root, body));
79}
80
81template<typename body_t>
82void batch_spawn(size_t count, tbb::task* root, body_t const& body, bool do_assert_graph_is_running = true) {
83 GAPI_ITT_STATIC_LOCAL_HANDLE(ittTbbSpawnReadyBlocks, "spawn ready blocks");
84 GAPI_ITT_AUTO_TRACE_GUARD(ittTbbSpawnReadyBlocks);
85 if (do_assert_graph_is_running) {
86 assert_graph_is_running(root);
87 }
88
89 for (size_t i=0; i<count; i++) {
90 spawn_no_assert(root, body);
91 }
92}
93
94
95struct destroy_tbb_task {
96 void operator()(tbb::task* t) const { if (t) tbb::task::destroy(*t);};
97};
98
99using root_t = std::unique_ptr<tbb::task, destroy_tbb_task>;
100
101root_t inline create_root(tbb::task_group_context& ctx) {
102 root_t root{new (tbb::task::allocate_root(ctx)) tbb::empty_task};
103 root->set_ref_count(1); // required by wait_for_all, as it waits until counter drops to 1
104 return root;
105}
106
107std::size_t inline tg_context_traits() {
108 // Specify tbb::task_group_context::concurrent_wait in the traits to ask TBB scheduler not to change
109 // ref_count of the task we wait on (root) when wait is complete.
110 return tbb::task_group_context::default_traits | tbb::task_group_context::concurrent_wait;
111}
112
113} // namespace tasking
114
115namespace async {
// Shared state used to track asynchronous tasks that are in flight.
struct async_tasks_t {
    // Number of active async tasks: incremented by master_thread_sleep_lock_t's constructor,
    // decremented by wake_master().
    std::atomic<size_t>     count {0};
    // Notified by wake_master(); execute() waits on it while async work is pending.
    std::condition_variable cv;
    // Mutex used together with `cv`.
    std::mutex              mtx;
};
121
// Tells wake_master() whether the waiting thread must be notified even if other
// async tasks are still active (YES), or only when the last one has finished (NO).
enum class wake_tbb_master {
    NO,
    YES
};
126
127void inline wake_master(async_tasks_t& async_tasks, wake_tbb_master wake_master) {
128 // TODO: seems that this can be relaxed
129 auto active_async_tasks = --async_tasks.count;
130
131 if ((active_async_tasks == 0) || (wake_master == wake_tbb_master::YES)) {
132 // Was the last async task or asked to wake TBB master up(e.g. there are new TBB tasks to execute)
133 GAPI_ITT_STATIC_LOCAL_HANDLE(ittTbbUnlockMasterThread, "Unlocking master thread");
134 GAPI_ITT_AUTO_TRACE_GUARD(ittTbbUnlockMasterThread);
135 // While decrement of async_tasks_t::count is atomic, it might occur after the waiting
136 // thread has read its value but _before_ it actually starts waiting on the condition variable.
137 // So, lock acquire is needed to guarantee that current condition check (if any) in the waiting thread
138 // (possibly ran in parallel to async_tasks_t::count decrement above) is completed _before_ signal is issued.
139 // Therefore when notify_one is called, waiting thread is either sleeping on the condition variable or
140 // running a new check which is guaranteed to pick the new value and return from wait().
141
142 // There is no need to _hold_ the lock while signaling, only to acquire it.
143 std::unique_lock<std::mutex> {async_tasks.mtx}; // Acquire and release the lock.
144 async_tasks.cv.notify_one();
145 }
146}
147
// Guard that accounts for one active async task in async_tasks_t::count.
// Construction increments the counter; the matching decrement happens through wake_master(),
// either explicitly via unlock() or by the `guard` deleter when the object is destroyed.
struct master_thread_sleep_lock_t
{
    // Deleter of `guard`: frees nothing, only reports the async task as finished
    // without requesting an explicit wake-up.
    struct sleep_unlock {
        void operator()(async_tasks_t* t) const {
            ASSERT(t);
            wake_master(*t, wake_tbb_master::NO);
        }
    };

    // Non-owning pointer to the shared state; unique_ptr is used for its deleter call only.
    std::unique_ptr<async_tasks_t, sleep_unlock> guard;

    // Empty guard: tracks nothing, destruction is a no-op.
    master_thread_sleep_lock_t() = default;
    // Registers one more active async task.
    // `async_tasks_ptr` is dereferenced unconditionally, so it must not be null.
    master_thread_sleep_lock_t(async_tasks_t* async_tasks_ptr) : guard(async_tasks_ptr) {
        // TODO: seems that this can be relaxed
        ++(guard->count);
    }

    // Finishes the tracked async task now, passing `wake` on to wake_master().
    // The pointer is released first, so the deleter will not run for it again;
    // calling unlock() on an empty guard does nothing.
    void unlock(wake_tbb_master wake) {
        if (auto* p = guard.release()) {
            wake_master(*p, wake);
        }
    }
};
171
172master_thread_sleep_lock_t inline lock_sleep_master(async_tasks_t& async_tasks) {
173 return {&async_tasks};
174}
175
// State of the TBB task tree as observed by root_wait_lock_t's constructor:
// NO when the root ref_count became exactly 2 after adding the lock's own reference,
// YES otherwise.
enum class is_tbb_work_present {
    NO,
    YES
};
180
// RAII object to block the TBB master thread (the one that does wait_for_all()).
// N.B.: wait_for_all() returns control when the root ref_count drops to 1,
// so the object adds one reference to the root on construction and gives it back on destruction.
struct root_wait_lock_t {
    // Deleter of `guard`: does not destroy the task, only drops the reference
    // added in the constructor.
    struct root_decrement_ref_count{
        void operator()(tbb::task* t) const {
            ASSERT(t);
            auto result = t->decrement_ref_count();
            // After the decrement the counter is expected to stay at 1 or above
            ASSERT(result >= 1);
        }
    };

    // Non-owning pointer to the root task; unique_ptr is used for its deleter call only.
    std::unique_ptr<tbb::task, root_decrement_ref_count> guard;

    // Empty lock: holds no reference.
    root_wait_lock_t() = default;
    // Adds one reference to `root` and reports through `previous_state` whether
    // the root had other references besides its initial one at that moment.
    root_wait_lock_t(tasking::root_t& root, is_tbb_work_present& previous_state) : guard{root.get()} {
        // Block the master thread while the *this object is alive.
        auto new_root_ref_count = root->add_ref_count(1);
        previous_state = (new_root_ref_count == 2) ? is_tbb_work_present::NO : is_tbb_work_present::YES;
    }

};
202
203root_wait_lock_t inline lock_wait_master(tasking::root_t& root, is_tbb_work_present& previous_state) {
204 return root_wait_lock_t{root, previous_state};
205}
206
207} // namespace async
208
209inline tile_node* pop(prio_items_queue_t& q) {
210 tile_node* node = nullptr;
211 bool popped = q.try_pop(node);
212 ASSERT(popped && "queue should be non empty as we push items to it before we spawn");
213 return node;
214}
215
216namespace graph {
217 // Returns : number of items actually pushed into the q
218 std::size_t inline push_ready_dependants(prio_items_queue_t& q, tile_node* node) {
219 GAPI_ITT_STATIC_LOCAL_HANDLE(ittTbbAddReadyBlocksToQueue, "add ready blocks to queue");
220 GAPI_ITT_AUTO_TRACE_GUARD(ittTbbAddReadyBlocksToQueue);
221 std::size_t ready_items = 0;
222 // enable dependent tasks
223 for (auto* dependant : node->dependants) {
224 // fetch_and_sub returns previous value
225 if (1 == dependant->dependency_count.fetch_sub(1)) {
226 // tile node is ready for execution, add it to the queue
227 q.push(dependant);
228 ++ready_items;
229 }
230 }
231 return ready_items;
232 }
233
    // State of a single graph execution, shared by all task_body copies through a reference.
    struct exec_ctx {
        tbb::task_arena&                arena;        // arena the tasks are spawned/enqueued into
        prio_items_queue_t&             q;            // queue of tile_nodes ready for execution
        // N.B.: declared before `root` so that it is already constructed
        // when create_root(tg_ctx) runs in the constructor
        tbb::task_group_context         tg_ctx;
        tasking::root_t                 root;         // root of the TBB task tree
        detail::async::async_tasks_t    async_tasks;  // tracking of in-flight async tasks
        std::atomic<size_t>             executed {0}; // incremented once per task_body invocation

        exec_ctx(tbb::task_arena& arena_, prio_items_queue_t& q_)
            : arena(arena_), q(q_),
              // As the traits is last argument, explicitly specify (default) value for first argument
              tg_ctx{tbb::task_group_context::bound, tasking::tg_context_traits()},
              root(tasking::create_root(tg_ctx))
        {}
    };
249
250 // At the moment there are no suitable tools to manage TBB priorities on task by task basis.
251 // Instead priority queue is used to respect tile_node priorities.
252 // As well, TBB task is not bound to any particular tile_node until actually executed.
253
    // Strictly speaking there are two graphs here:
    // - The G-API one, described by the connected tile_node instances.
    //   This graph is:
    //    - known beforehand and does not change during the execution (i.e. static)
    //    - made of both TBB and non-TBB parts
    //    - prioritized (i.e. every node has an assigned priority of execution)
    //
    // - The TBB task tree, which is:
    //    - flat (has only two levels: root and leaves)
    //    - dynamic, i.e. new leaves are added on demand when new TBB tasks are spawned
    //    - a description of only the TBB/CPU part of the whole graph
    //    - non-prioritized (i.e. all tasks are created equal)
266
267 // Class below represents TBB task payload.
268 //
269 // Each instance basically does the three things :
270 // 1. Gets the tile_node item from the top of the queue
271 // 2. Executes its body
272 // 3. Pushes dependent tile_nodes to the queue once they are ready
273 //
    // Payload of a TBB task. Holds only a reference to the shared exec_ctx, so copies are cheap
    // and every copy works on the same queue, root task and counters.
    struct task_body {
        exec_ctx& ctx;

        // Pushes the dependants of `node` that became ready into the shared queue;
        // returns how many were pushed.
        std::size_t push_ready_dependants(tile_node* node) const {
            return graph::push_ready_dependants(ctx.q, node);
        }

        // Spawns `items` new TBB tasks, each carrying a copy of this body.
        void spawn_clones(std::size_t items) const {
            tasking::batch_spawn(items, ctx.root.get(), *this);
        }

        task_body(exec_ctx& ctx_) : ctx(ctx_) {}

        // Pops one tile_node from the queue and runs it.
        // Returns YES only for a synchronous node that made at least one dependant ready:
        // the current TBB task is then reused to handle one of the ready items.
        tasking::use_tbb_scheduler_bypass operator()() const {
            ASSERT(!ctx.q.empty() && "Spawned task with no job to do ? ");

            tile_node* node = detail::pop(ctx.q);

            auto result = tasking::use_tbb_scheduler_bypass::NO;
            // execute the task

            if (auto p = util::get_if<tile_node::sync_task_body>(&(node->task_body))) {
                // synchronous task
                p->body();

                std::size_t ready_items = push_ready_dependants(node);

                if (ready_items > 0) {
                    // spawn one task fewer and tell TBB to reuse (recycle) the current task
                    spawn_clones(ready_items - 1);
                    result = tasking::use_tbb_scheduler_bypass::YES;
                }
            }
            else {
                LOG_DEBUG(NULL, "Async task");
                using namespace detail::async;
                using util::copy_through_move;

                // Count this async task as active until the callback below has finished.
                // copy_through_move lets the guard be captured by the lambda.
                auto block_master = copy_through_move(lock_sleep_master(ctx.async_tasks));

                auto self_copy = *this;
                // Completion callback handed over to the async body.
                auto callback = [node, block_master, self_copy] () mutable /*due to block_master.get().unlock()*/ {
                    LOG_DEBUG(NULL, "Async task callback is called");
                    // Implicitly unlock master right in the end of callback
                    auto master_sleep_lock = std::move(block_master);
                    std::size_t ready_items = self_copy.push_ready_dependants(node);
                    if (ready_items > 0) {
                        auto master_was_active = is_tbb_work_present::NO;
                        {
                            GAPI_ITT_STATIC_LOCAL_HANDLE(ittTbbEnqueueSpawnReadyBlocks, "enqueueing a spawn of ready blocks");
                            GAPI_ITT_AUTO_TRACE_GUARD(ittTbbEnqueueSpawnReadyBlocks);
                            // Force master thread (one that does wait_for_all()) to (actively) wait for enqueued tasks
                            // and unlock it right after all dependent tasks are spawned.

                            auto root_wait_lock = copy_through_move(lock_wait_master(self_copy.ctx.root, master_was_active));

                            // TODO: add test to cover proper holding of root_wait_lock
                            // As the calling thread most likely is not a TBB one, instead of spawning TBB tasks directly we
                            // enqueue a task which will spawn them.
                            // For the master thread not to leave wait_for_all() prematurely,
                            // hold the root_wait_lock until the needed tasks are actually spawned.
                            self_copy.ctx.arena.enqueue([ready_items, self_copy, root_wait_lock]() {
                                self_copy.spawn_clones(ready_items);
                                // TODO: why we need this? Either write a descriptive comment or remove it
                                // NOTE(review): presumably this read only marks the captured root_wait_lock
                                // as used so that compilers do not warn about it - confirm.
                                volatile auto unused = root_wait_lock.get().guard.get();
                                util::suppress_unused_warning(unused);
                            });
                        }
                        // Wake master thread (if any) to pick up the enqueued tasks iff:
                        // 1. there is new TBB work to do, and
                        // 2. Master thread was sleeping on condition variable waiting for async tasks to complete
                        //    (There was no active work before (i.e. root->ref_count() was == 1))
                        auto wake_master = (master_was_active == is_tbb_work_present::NO) ?
                            wake_tbb_master::YES : wake_tbb_master::NO;
                        master_sleep_lock.get().unlock(wake_master);
                    }
                    // With no ready items master_sleep_lock is released by its destructor instead
                };

                auto& body = util::get<tile_node::async_task_body>(node->task_body).body;
                body(std::move(callback), node->total_order_index);
            }

            ctx.executed++;
            // reset dependency_count to initial state to simplify re-execution of the same graph
            node->dependency_count = node->dependencies;

            return result;
        }
    };
362}
363} // namespace detail
364}}} // namespace cv::gimpl::parallel
365
366void cv::gimpl::parallel::execute(prio_items_queue_t& q) {
367 // get the reference to current task_arena (i.e. one we are running in)
368#if TBB_INTERFACE_VERSION > 9002
369 using attach_t = tbb::task_arena::attach;
370#else
371 using attach_t = tbb::internal::attach;
372#endif
373
374 tbb::task_arena arena{attach_t{}};
375 execute(q, arena);
376}
377
378void cv::gimpl::parallel::execute(prio_items_queue_t& q, tbb::task_arena& arena) {
379 using namespace detail;
380 graph::exec_ctx ctx{arena, q};
381
382 arena.execute(
383 [&]() {
384 // Passed in queue is assumed to contain starting tasks, i.e. ones with no (or resolved) dependencies
385 auto num_start_tasks = q.size();
386
387 // TODO: use recursive spawning and task soft affinity for faster task distribution
388 // As graph is starting and no task has been spawned yet
389 // assert_graph_is_running(root) will not hold, so spawn without assert
390 tasking::batch_spawn(num_start_tasks, ctx.root.get(), graph::task_body{ctx}, /* assert_graph_is_running*/false);
391
392 using namespace std::chrono;
393 high_resolution_clock timer;
394
395 auto tbb_work_done = [&ctx]() { return 1 == ctx.root->ref_count(); };
396 auto async_work_done = [&ctx]() { return 0 == ctx.async_tasks.count; };
397 do {
398 // First participate in execution of TBB graph till there are no more ready tasks.
399 ctx.root->wait_for_all();
400
401 if (!async_work_done()) { // Wait on the conditional variable iff there is active async work
402 auto start = timer.now();
403 std::unique_lock<std::mutex> lk(ctx.async_tasks.mtx);
404 // Wait (probably by sleeping) until all async tasks are completed or new TBB tasks are created.
405 // FIXME: Use TBB resumable tasks here to avoid blocking TBB thread
406 ctx.async_tasks.cv.wait(lk, [&]{return async_work_done() || !tbb_work_done() ;});
407
408 LOG_INFO(NULL, "Slept for " << duration_cast<milliseconds>(timer.now() - start).count() << " ms \n");
409 }
410 }
411 while(!tbb_work_done() || !async_work_done());
412
413 ASSERT(tbb_work_done() && async_work_done() && "Graph is still running?");
414 }
415 );
416
417 LOG_INFO(NULL, "Done. Executed " << ctx.executed << " tasks");
418}
419
420std::ostream& cv::gimpl::parallel::operator<<(std::ostream& o, tile_node const& n) {
421 o << "("
422 << " at:" << &n << ","
423 << "indx: " << n.total_order_index << ","
424 << "deps #:" << n.dependency_count.value << ", "
425 << "prods:" << n.dependants.size();
426
427 o << "[";
428 for (auto* d: n.dependants) {
429 o << d << ",";
430 }
431 o << "]";
432
433 o << ")";
434 return o;
435}
436
437#endif // HAVE_TBB && TBB_INTERFACE_VERSION
438

// Source: opencv/modules/gapi/src/executor/gtbbexecutor.cpp