// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
//
// Copyright (C) 2020-2021 Intel Corporation

#include "gtbbexecutor.hpp"

#if defined(HAVE_TBB) && (TBB_INTERFACE_VERSION < 12000)
// TODO: the TBB task API was deprecated and has been removed as of TBB_INTERFACE_VERSION 12000

#include "utils/itt.hpp"

#include <opencv2/gapi/own/assert.hpp>
#include <opencv2/gapi/util/copy_through_move.hpp>
#include "logger.hpp" // GAPI_LOG

#include <tbb/task.h>
#include <memory> // unique_ptr

#include <atomic>
#include <condition_variable>

#include <chrono>

#define ASSERT(expr) GAPI_DbgAssert(expr)

#define LOG_INFO(tag, ...) GAPI_LOG_INFO(tag, __VA_ARGS__)
#define LOG_WARNING(tag, ...) GAPI_LOG_WARNING(tag, __VA_ARGS__)
#define LOG_DEBUG(tag, ...) GAPI_LOG_DEBUG(tag, __VA_ARGS__)


namespace cv { namespace gimpl { namespace parallel {

namespace detail {
// some helper stuff to deal with tbb::task related entities
namespace tasking {

enum class use_tbb_scheduler_bypass {
    NO,
    YES
};

inline void assert_graph_is_running(tbb::task* root) {
    // tbb::task::wait_for_all() blocks the calling thread until the task ref_count drops to 1.
    // So if the root task ref_count is greater than 1, the graph still has work to do and,
    // accordingly, wait_for_all() has not returned yet.
    ASSERT(root->ref_count() > 1);
}

// made a template to break circular dependencies
template<typename body_t>
struct functor_task : tbb::task {
    body_t body;

    template<typename arg_t>
    functor_task(arg_t&& a) : body(std::forward<arg_t>(a)) {}

    tbb::task* execute() override {
        assert_graph_is_running(parent());

        auto reuse_current_task = body();
        // if needed, tell TBB to execute the current task once again
        return (use_tbb_scheduler_bypass::YES == reuse_current_task) ? (recycle_as_continuation(), this) : nullptr;
    }
    ~functor_task() {
        assert_graph_is_running(parent());
    }
};

template<typename body_t>
auto allocate_task(tbb::task* root, body_t const& body) -> functor_task<body_t>* {
    return new (tbb::task::allocate_additional_child_of(*root)) functor_task<body_t>{body};
}

template<typename body_t>
void spawn_no_assert(tbb::task* root, body_t const& body) {
    tbb::task::spawn(*allocate_task(root, body));
}

template<typename body_t>
void batch_spawn(size_t count, tbb::task* root, body_t const& body, bool do_assert_graph_is_running = true) {
    GAPI_ITT_STATIC_LOCAL_HANDLE(ittTbbSpawnReadyBlocks, "spawn ready blocks");
    GAPI_ITT_AUTO_TRACE_GUARD(ittTbbSpawnReadyBlocks);
    if (do_assert_graph_is_running) {
        assert_graph_is_running(root);
    }

    for (size_t i = 0; i < count; i++) {
        spawn_no_assert(root, body);
    }
}


struct destroy_tbb_task {
    void operator()(tbb::task* t) const { if (t) tbb::task::destroy(*t); }
};

using root_t = std::unique_ptr<tbb::task, destroy_tbb_task>;

root_t inline create_root(tbb::task_group_context& ctx) {
    root_t root{new (tbb::task::allocate_root(ctx)) tbb::empty_task};
    root->set_ref_count(1); // required by wait_for_all(), as it waits until the counter drops to 1
    return root;
}

std::size_t inline tg_context_traits() {
    // Specify tbb::task_group_context::concurrent_wait in the traits to ask the TBB scheduler not to change
    // the ref_count of the task we wait on (the root) when the wait is complete.
    return tbb::task_group_context::default_traits | tbb::task_group_context::concurrent_wait;
}
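
// A minimal sketch of how the helpers above are meant to compose (illustrative only;
// it mirrors what graph::exec_ctx and execute() below actually do):
//
//     tbb::task_group_context tg_ctx{tbb::task_group_context::bound, tg_context_traits()};
//     auto root = create_root(tg_ctx);            // ref_count == 1
//     batch_spawn(n, root.get(), some_body, /*do_assert_graph_is_running*/ false);
//     root->wait_for_all();                       // returns once ref_count drops back to 1
//
// (some_body is a placeholder.) Thanks to the concurrent_wait trait the ref_count is not
// touched by the completed wait, so more children may be spawned and waited for again on
// the very same root.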

} // namespace tasking

namespace async {
struct async_tasks_t {
    std::atomic<size_t> count {0};
    std::condition_variable cv;
    std::mutex mtx;
};

enum class wake_tbb_master {
    NO,
    YES
};

void inline wake_master(async_tasks_t& async_tasks, wake_tbb_master wake_master) {
    // TODO: seems that this can be relaxed
    auto active_async_tasks = --async_tasks.count;

    if ((active_async_tasks == 0) || (wake_master == wake_tbb_master::YES)) {
        // This was the last async task, or we were asked to wake the TBB master up
        // (e.g. there are new TBB tasks to execute)
        GAPI_ITT_STATIC_LOCAL_HANDLE(ittTbbUnlockMasterThread, "Unlocking master thread");
        GAPI_ITT_AUTO_TRACE_GUARD(ittTbbUnlockMasterThread);
        // While the decrement of async_tasks_t::count is atomic, it might occur after the waiting
        // thread has read its value but _before_ it actually starts waiting on the condition variable.
        // So, acquiring the lock is needed to guarantee that the current condition check (if any) in the
        // waiting thread (possibly running in parallel with the async_tasks_t::count decrement above)
        // completes _before_ the signal is issued. Therefore, when notify_one is called, the waiting
        // thread is either sleeping on the condition variable or running a new check which is guaranteed
        // to pick up the new value and return from wait().

        // There is no need to _hold_ the lock while signaling, only to acquire it.
        std::unique_lock<std::mutex> {async_tasks.mtx}; // Acquire and release the lock.
        async_tasks.cv.notify_one();
    }
}
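
// For reference, the waiting side in execute() below pairs with wake_master() like this (sketch):
//
//     std::unique_lock<std::mutex> lk(async_tasks.mtx);
//     async_tasks.cv.wait(lk, [&]{ return async_work_done() || !tbb_work_done(); });
//
// Acquiring async_tasks.mtx here before notify_one() closes the window between the waiter's
// predicate check and its actual sleep, so the decrement above cannot be missed.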

struct master_thread_sleep_lock_t
{
    struct sleep_unlock {
        void operator()(async_tasks_t* t) const {
            ASSERT(t);
            wake_master(*t, wake_tbb_master::NO);
        }
    };

    std::unique_ptr<async_tasks_t, sleep_unlock> guard;

    master_thread_sleep_lock_t() = default;
    master_thread_sleep_lock_t(async_tasks_t* async_tasks_ptr) : guard(async_tasks_ptr) {
        // TODO: seems that this can be relaxed
        ++(guard->count);
    }

    void unlock(wake_tbb_master wake) {
        if (auto* p = guard.release()) {
            wake_master(*p, wake);
        }
    }
};

master_thread_sleep_lock_t inline lock_sleep_master(async_tasks_t& async_tasks) {
    return {&async_tasks};
}
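
// Intended lifecycle (sketch; the real call site is graph::task_body::operator() below):
// the lock is taken before an async task is launched and released from its completion callback.
//
//     auto sleep_lock = lock_sleep_master(ctx.async_tasks);      // ++count
//     launch_async_work([sleep_lock = std::move(sleep_lock)]() mutable {
//         // ... push ready dependants, possibly enqueue new TBB work ...
//         sleep_lock.unlock(wake_tbb_master::YES);               // --count, may notify the master
//     });
//
// (launch_async_work is a placeholder name. If unlock() is never called, the guard's deleter
// still decrements the counter, just without forcing a wake-up.)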

enum class is_tbb_work_present {
    NO,
    YES
};

// RAII object to block the TBB master thread (the one that does wait_for_all()).
// N.B.: wait_for_all() returns control when the root ref_count drops to 1.
struct root_wait_lock_t {
    struct root_decrement_ref_count {
        void operator()(tbb::task* t) const {
            ASSERT(t);
            auto result = t->decrement_ref_count();
            ASSERT(result >= 1);
        }
    };

    std::unique_ptr<tbb::task, root_decrement_ref_count> guard;

    root_wait_lock_t() = default;
    root_wait_lock_t(tasking::root_t& root, is_tbb_work_present& previous_state) : guard{root.get()} {
        // Block the master thread while the *this object is alive.
        auto new_root_ref_count = root->add_ref_count(1);
        previous_state = (new_root_ref_count == 2) ? is_tbb_work_present::NO : is_tbb_work_present::YES;
    }
};

root_wait_lock_t inline lock_wait_master(tasking::root_t& root, is_tbb_work_present& previous_state) {
    return root_wait_lock_t{root, previous_state};
}
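
// Usage sketch (mirrors the async callback in graph::task_body below): bump the root ref_count
// so that wait_for_all() does not return before the tasks we are about to enqueue get spawned:
//
//     auto was_active = is_tbb_work_present::NO;
//     auto wait_lock  = util::copy_through_move(lock_wait_master(root, was_active)); // ref_count += 1
//     arena.enqueue([wait_lock, /*...*/]() {
//         // spawn the ready tasks here; the captured wait_lock is destroyed together with
//         // the functor afterwards, dropping the extra reference (ref_count -= 1)
//     });
//
// was_active reports whether the root already had pending children (ref_count > 1) before the bump.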

} // namespace async

inline tile_node* pop(prio_items_queue_t& q) {
    tile_node* node = nullptr;
    bool popped = q.try_pop(node);
    ASSERT(popped && "queue should be non-empty as we push items to it before we spawn");
    return node;
}

namespace graph {
// Returns: the number of items actually pushed into the queue
std::size_t inline push_ready_dependants(prio_items_queue_t& q, tile_node* node) {
    GAPI_ITT_STATIC_LOCAL_HANDLE(ittTbbAddReadyBlocksToQueue, "add ready blocks to queue");
    GAPI_ITT_AUTO_TRACE_GUARD(ittTbbAddReadyBlocksToQueue);
    std::size_t ready_items = 0;
    // enable dependent tasks
    for (auto* dependant : node->dependants) {
        // fetch_sub returns the previous value
        if (1 == dependant->dependency_count.fetch_sub(1)) {
            // the tile node is ready for execution, add it to the queue
            q.push(dependant);
            ++ready_items;
        }
    }
    return ready_items;
}
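
// Worked example: a node B with dependency_count == 2 (it depends on A1 and A2) is pushed to the
// queue only by whichever of A1/A2 finishes last: the first fetch_sub(1) returns 2 and does
// nothing, the second returns 1 and triggers the push of B.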

struct exec_ctx {
    tbb::task_arena& arena;
    prio_items_queue_t& q;
    tbb::task_group_context tg_ctx;
    tasking::root_t root;
    detail::async::async_tasks_t async_tasks;
    std::atomic<size_t> executed {0};

    exec_ctx(tbb::task_arena& arena_, prio_items_queue_t& q_)
        : arena(arena_), q(q_),
          // As the traits argument goes last, explicitly specify the (default) value of the first argument
          tg_ctx{tbb::task_group_context::bound, tasking::tg_context_traits()},
          root(tasking::create_root(tg_ctx))
    {}
};

// At the moment there are no suitable tools to manage TBB priorities on a task-by-task basis.
// Instead, a priority queue is used to respect tile_node priorities.
// Also, a TBB task is not bound to any particular tile_node until it is actually executed.

// Strictly speaking there are two graphs here:
// - The G-API one, described by the connected tile_node instances.
//   This graph is:
//    - Known beforehand and does not change during execution (i.e. it is static)
//    - Contains both TBB and non-TBB parts
//    - Prioritized (i.e. every node has an assigned execution priority)
//
// - The TBB task tree, which is:
//    - Flat (has only two levels: root and leaves)
//    - Dynamic, i.e. new leaves are added on demand as new TBB tasks are spawned
//    - Describes only the TBB/CPU part of the whole graph
//    - Non-prioritized (i.e. all tasks are created equal)

// The class below represents the TBB task payload.
//
// Each instance basically does three things:
// 1. Gets a tile_node item from the top of the queue
// 2. Executes its body
// 3. Pushes dependent tile_nodes to the queue once they are ready
//
struct task_body {
    exec_ctx& ctx;

    std::size_t push_ready_dependants(tile_node* node) const {
        return graph::push_ready_dependants(ctx.q, node);
    }

    void spawn_clones(std::size_t items) const {
        tasking::batch_spawn(items, ctx.root.get(), *this);
    }

    task_body(exec_ctx& ctx_) : ctx(ctx_) {}
    tasking::use_tbb_scheduler_bypass operator()() const {
        ASSERT(!ctx.q.empty() && "Spawned task with no job to do?");

        tile_node* node = detail::pop(ctx.q);

        auto result = tasking::use_tbb_scheduler_bypass::NO;
        // execute the task

        if (auto p = util::get_if<tile_node::sync_task_body>(&(node->task_body))) {
            // synchronous task
            p->body();

            std::size_t ready_items = push_ready_dependants(node);

            if (ready_items > 0) {
                // spawn one task less and tell TBB to reuse (recycle) the current task
                spawn_clones(ready_items - 1);
                result = tasking::use_tbb_scheduler_bypass::YES;
            }
        }
        else {
            LOG_DEBUG(NULL, "Async task");
            using namespace detail::async;
            using util::copy_through_move;

            auto block_master = copy_through_move(lock_sleep_master(ctx.async_tasks));

            auto self_copy = *this;
            auto callback = [node, block_master, self_copy] () mutable /*due to block_master.get().unlock()*/ {
                LOG_DEBUG(NULL, "Async task callback is called");
                // Implicitly unlock the master right at the end of the callback
                auto master_sleep_lock = std::move(block_master);
                std::size_t ready_items = self_copy.push_ready_dependants(node);
                if (ready_items > 0) {
                    auto master_was_active = is_tbb_work_present::NO;
                    {
                        GAPI_ITT_STATIC_LOCAL_HANDLE(ittTbbEnqueueSpawnReadyBlocks, "enqueueing a spawn of ready blocks");
                        GAPI_ITT_AUTO_TRACE_GUARD(ittTbbEnqueueSpawnReadyBlocks);
                        // Force the master thread (the one that does wait_for_all()) to (actively) wait for the
                        // enqueued tasks, and unlock it right after all dependent tasks are spawned.

                        auto root_wait_lock = copy_through_move(lock_wait_master(self_copy.ctx.root, master_was_active));

                        // TODO: add a test to cover proper holding of root_wait_lock
                        // As the calling thread is most likely not a TBB one, instead of spawning TBB tasks
                        // directly we enqueue a task which will spawn them.
                        // For the master thread not to leave wait_for_all() prematurely,
                        // hold the root_wait_lock until the needed tasks are actually spawned.
                        self_copy.ctx.arena.enqueue([ready_items, self_copy, root_wait_lock]() {
                            self_copy.spawn_clones(ready_items);
                            // TODO: why do we need this? Either write a descriptive comment or remove it
                            volatile auto unused = root_wait_lock.get().guard.get();
                            util::suppress_unused_warning(unused);
                        });
                    }
                    // Wake the master thread (if any) to pick up the enqueued tasks iff:
                    // 1. there is new TBB work to do, and
                    // 2. the master thread was sleeping on the condition variable waiting for async tasks to
                    //    complete (i.e. there was no active TBB work before: root->ref_count() was == 1)
                    auto wake_master = (master_was_active == is_tbb_work_present::NO) ?
                                                            wake_tbb_master::YES : wake_tbb_master::NO;
                    master_sleep_lock.get().unlock(wake_master);
                }
            };

            auto& body = util::get<tile_node::async_task_body>(node->task_body).body;
            body(std::move(callback), node->total_order_index);
        }

        ctx.executed++;
        // reset dependency_count to initial state to simplify re-execution of the same graph
        node->dependency_count = node->dependencies;

        return result;
    }
};
} // namespace graph
} // namespace detail
}}} // namespace cv::gimpl::parallel

void cv::gimpl::parallel::execute(prio_items_queue_t& q) {
    // get a reference to the current task_arena (i.e. the one we are running in)
#if TBB_INTERFACE_VERSION > 9002
    using attach_t = tbb::task_arena::attach;
#else
    using attach_t = tbb::internal::attach;
#endif

    tbb::task_arena arena{attach_t{}};
    execute(q, arena);
}

void cv::gimpl::parallel::execute(prio_items_queue_t& q, tbb::task_arena& arena) {
    using namespace detail;
    graph::exec_ctx ctx{arena, q};

    arena.execute(
        [&]() {
            // The passed-in queue is assumed to contain the starting tasks,
            // i.e. ones with no (or already resolved) dependencies
            auto num_start_tasks = q.size();

            // TODO: use recursive spawning and task soft affinity for faster task distribution
            // As the graph is just starting and no task has been spawned yet,
            // assert_graph_is_running(root) will not hold, so spawn without the assert
            tasking::batch_spawn(num_start_tasks, ctx.root.get(), graph::task_body{ctx}, /* do_assert_graph_is_running */ false);

            using namespace std::chrono;
            high_resolution_clock timer;

            auto tbb_work_done   = [&ctx]() { return 1 == ctx.root->ref_count(); };
            auto async_work_done = [&ctx]() { return 0 == ctx.async_tasks.count; };
            do {
                // First participate in the execution of the TBB graph till there are no more ready tasks.
                ctx.root->wait_for_all();

                if (!async_work_done()) { // Wait on the condition variable iff there is active async work
                    auto start = timer.now();
                    std::unique_lock<std::mutex> lk(ctx.async_tasks.mtx);
                    // Wait (probably by sleeping) until all async tasks are completed or new TBB tasks are created.
                    // FIXME: Use TBB resumable tasks here to avoid blocking a TBB thread
                    ctx.async_tasks.cv.wait(lk, [&]{ return async_work_done() || !tbb_work_done(); });

                    LOG_INFO(NULL, "Slept for " << duration_cast<milliseconds>(timer.now() - start).count() << " ms \n");
                }
            }
            while (!tbb_work_done() || !async_work_done());

            ASSERT(tbb_work_done() && async_work_done() && "Graph is still running?");
        }
    );

    LOG_INFO(NULL, "Done. Executed " << ctx.executed << " tasks");
}
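
// A minimal usage sketch of the public entry point (illustrative only; tile_node construction
// and dependency wiring are simplified, and the node bodies are placeholders):
//
//     using namespace cv::gimpl::parallel;
//     prio_items_queue_t q;
//     // ... create tile_node instances, fill their dependants / dependency_count ...
//     // push only the starting nodes (those with dependency_count == 0) into q, then:
//     tbb::task_arena arena;
//     execute(q, arena);   // blocks until both the TBB and the async parts of the graph are done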

std::ostream& cv::gimpl::parallel::operator<<(std::ostream& o, tile_node const& n) {
    o << "("
      << " at:"    << &n << ","
      << "indx: "  << n.total_order_index << ","
      << "deps #:" << n.dependency_count.value << ", "
      << "prods:"  << n.dependants.size();

    o << "[";
    for (auto* d : n.dependants) {
        o << d << ",";
    }
    o << "]";

    o << ")";
    return o;
}

#endif // HAVE_TBB && TBB_INTERFACE_VERSION