| 1 | /* |
| 2 | Copyright (c) 2005-2021 Intel Corporation |
| 3 | |
| 4 | Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | you may not use this file except in compliance with the License. |
| 6 | You may obtain a copy of the License at |
| 7 | |
| 8 | http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | |
| 10 | Unless required by applicable law or agreed to in writing, software |
| 11 | distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | See the License for the specific language governing permissions and |
| 14 | limitations under the License. |
| 15 | */ |
| 16 | |
| 17 | #ifndef __TBB_task_arena_H |
| 18 | #define __TBB_task_arena_H |
| 19 | |
| 20 | #include "detail/_namespace_injection.h" |
| 21 | #include "detail/_task.h" |
| 22 | #include "detail/_exception.h" |
| 23 | #include "detail/_aligned_space.h" |
| 24 | #include "detail/_small_object_pool.h" |
| 25 | |
| 26 | #if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS |
| 27 | #include "detail/_task_handle.h" |
| 28 | #endif |
| 29 | |
| 30 | #if __TBB_ARENA_BINDING |
| 31 | #include "info.h" |
| 32 | #endif /*__TBB_ARENA_BINDING*/ |
| 33 | |
| 34 | namespace tbb { |
| 35 | namespace detail { |
| 36 | |
| 37 | namespace d1 { |
| 38 | |
| 39 | template<typename F, typename R> |
| 40 | class task_arena_function : public delegate_base { |
| 41 | F &my_func; |
| 42 | aligned_space<R> my_return_storage; |
| 43 | bool my_constructed{false}; |
| 44 | // The function should be called only once. |
| 45 | bool operator()() const override { |
| 46 | new (my_return_storage.begin()) R(my_func()); |
| 47 | return true; |
| 48 | } |
| 49 | public: |
| 50 | task_arena_function(F& f) : my_func(f) {} |
| 51 | // The function can be called only after operator() and only once. |
| 52 | R consume_result() { |
| 53 | my_constructed = true; |
| 54 | return std::move(*(my_return_storage.begin())); |
| 55 | } |
| 56 | ~task_arena_function() override { |
| 57 | if (my_constructed) { |
| 58 | my_return_storage.begin()->~R(); |
| 59 | } |
| 60 | } |
| 61 | }; |
| 62 | |
| 63 | template<typename F> |
| 64 | class task_arena_function<F,void> : public delegate_base { |
| 65 | F &my_func; |
| 66 | bool operator()() const override { |
| 67 | my_func(); |
| 68 | return true; |
| 69 | } |
| 70 | public: |
| 71 | task_arena_function(F& f) : my_func(f) {} |
| 72 | void consume_result() const {} |
| 73 | |
| 74 | friend class task_arena_base; |
| 75 | }; |
| 76 | |
| 77 | class task_arena_base; |
| 78 | class task_scheduler_observer; |
| 79 | } // namespace d1 |
| 80 | |
| 81 | namespace r1 { |
| 82 | class arena; |
| 83 | struct task_arena_impl; |
| 84 | |
| 85 | TBB_EXPORT void __TBB_EXPORTED_FUNC observe(d1::task_scheduler_observer&, bool); |
| 86 | TBB_EXPORT void __TBB_EXPORTED_FUNC initialize(d1::task_arena_base&); |
| 87 | TBB_EXPORT void __TBB_EXPORTED_FUNC terminate(d1::task_arena_base&); |
| 88 | TBB_EXPORT bool __TBB_EXPORTED_FUNC attach(d1::task_arena_base&); |
| 89 | TBB_EXPORT void __TBB_EXPORTED_FUNC execute(d1::task_arena_base&, d1::delegate_base&); |
| 90 | TBB_EXPORT void __TBB_EXPORTED_FUNC wait(d1::task_arena_base&); |
| 91 | TBB_EXPORT int __TBB_EXPORTED_FUNC max_concurrency(const d1::task_arena_base*); |
| 92 | TBB_EXPORT void __TBB_EXPORTED_FUNC isolate_within_arena(d1::delegate_base& d, std::intptr_t); |
| 93 | |
| 94 | TBB_EXPORT void __TBB_EXPORTED_FUNC enqueue(d1::task&, d1::task_arena_base*); |
| 95 | TBB_EXPORT void __TBB_EXPORTED_FUNC enqueue(d1::task&, d1::task_group_context&, d1::task_arena_base*); |
| 96 | TBB_EXPORT void __TBB_EXPORTED_FUNC submit(d1::task&, d1::task_group_context&, arena*, std::uintptr_t); |
| 97 | } // namespace r1 |
| 98 | |
| 99 | namespace d2 { |
| 100 | #if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS |
| 101 | inline void enqueue_impl(task_handle&& th, d1::task_arena_base* ta) { |
| 102 | if (th == nullptr) { |
| 103 | throw_exception(exception_id::bad_task_handle); |
| 104 | } |
| 105 | |
| 106 | auto& ctx = task_handle_accessor::ctx_of(th); |
| 107 | |
| 108 | // Do not access th after release |
| 109 | r1::enqueue(*task_handle_accessor::release(th), ctx, ta); |
| 110 | } |
| 111 | #endif// __TBB_PREVIEW_TASK_GROUP_EXTENSIONS |
| 112 | |
| 113 | } |
| 114 | namespace d1 { |
| 115 | |
| 116 | static constexpr int priority_stride = INT_MAX / 4; |
| 117 | |
| 118 | class task_arena_base { |
| 119 | friend struct r1::task_arena_impl; |
| 120 | friend void r1::observe(d1::task_scheduler_observer&, bool); |
| 121 | public: |
| 122 | enum class priority : int { |
| 123 | low = 1 * priority_stride, |
| 124 | normal = 2 * priority_stride, |
| 125 | high = 3 * priority_stride |
| 126 | }; |
| 127 | #if __TBB_ARENA_BINDING |
| 128 | using constraints = tbb::detail::d1::constraints; |
| 129 | #endif /*__TBB_ARENA_BINDING*/ |
| 130 | protected: |
| 131 | //! Special settings |
| 132 | intptr_t my_version_and_traits; |
| 133 | |
| 134 | std::atomic<do_once_state> my_initialization_state; |
| 135 | |
| 136 | //! NULL if not currently initialized. |
| 137 | std::atomic<r1::arena*> my_arena; |
| 138 | static_assert(sizeof(std::atomic<r1::arena*>) == sizeof(r1::arena*), |
| 139 | "To preserve backward compatibility we need the equal size of an atomic pointer and a pointer" ); |
| 140 | |
| 141 | //! Concurrency level for deferred initialization |
| 142 | int my_max_concurrency; |
| 143 | |
| 144 | //! Reserved slots for external threads |
| 145 | unsigned my_num_reserved_slots; |
| 146 | |
| 147 | //! Arena priority |
| 148 | priority my_priority; |
| 149 | |
| 150 | //! The NUMA node index to which the arena will be attached |
| 151 | numa_node_id my_numa_id; |
| 152 | |
| 153 | //! The core type index to which arena will be attached |
| 154 | core_type_id my_core_type; |
| 155 | |
| 156 | //! Number of threads per core |
| 157 | int my_max_threads_per_core; |
| 158 | |
| 159 | // Backward compatibility checks. |
| 160 | core_type_id core_type() const { |
| 161 | return (my_version_and_traits & core_type_support_flag) == core_type_support_flag ? my_core_type : automatic; |
| 162 | } |
| 163 | int max_threads_per_core() const { |
| 164 | return (my_version_and_traits & core_type_support_flag) == core_type_support_flag ? my_max_threads_per_core : automatic; |
| 165 | } |
| 166 | |
| 167 | enum { |
| 168 | default_flags = 0 |
| 169 | , core_type_support_flag = 1 |
| 170 | }; |
| 171 | |
| 172 | task_arena_base(int max_concurrency, unsigned reserved_for_masters, priority a_priority) |
| 173 | : my_version_and_traits(default_flags | core_type_support_flag) |
| 174 | , my_initialization_state(do_once_state::uninitialized) |
| 175 | , my_arena(nullptr) |
| 176 | , my_max_concurrency(max_concurrency) |
| 177 | , my_num_reserved_slots(reserved_for_masters) |
| 178 | , my_priority(a_priority) |
| 179 | , my_numa_id(automatic) |
| 180 | , my_core_type(automatic) |
| 181 | , my_max_threads_per_core(automatic) |
| 182 | {} |
| 183 | |
| 184 | #if __TBB_ARENA_BINDING |
| 185 | task_arena_base(const constraints& constraints_, unsigned reserved_for_masters, priority a_priority) |
| 186 | : my_version_and_traits(default_flags | core_type_support_flag) |
| 187 | , my_initialization_state(do_once_state::uninitialized) |
| 188 | , my_arena(nullptr) |
| 189 | , my_max_concurrency(constraints_.max_concurrency) |
| 190 | , my_num_reserved_slots(reserved_for_masters) |
| 191 | , my_priority(a_priority) |
| 192 | , my_numa_id(constraints_.numa_id) |
| 193 | #if __TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT |
| 194 | , my_core_type(constraints_.core_type) |
| 195 | , my_max_threads_per_core(constraints_.max_threads_per_core) |
| 196 | #else |
| 197 | , my_core_type(automatic) |
| 198 | , my_max_threads_per_core(automatic) |
| 199 | #endif |
| 200 | {} |
| 201 | #endif /*__TBB_ARENA_BINDING*/ |
| 202 | public: |
| 203 | //! Typedef for number of threads that is automatic. |
| 204 | static const int automatic = -1; |
| 205 | static const int not_initialized = -2; |
| 206 | }; |
| 207 | |
| 208 | template<typename R, typename F> |
| 209 | R isolate_impl(F& f) { |
| 210 | task_arena_function<F, R> func(f); |
| 211 | r1::isolate_within_arena(d&: func, /*isolation*/ 0); |
| 212 | return func.consume_result(); |
| 213 | } |
| 214 | |
| 215 | template <typename F> |
| 216 | class enqueue_task : public task { |
| 217 | small_object_allocator m_allocator; |
| 218 | const F m_func; |
| 219 | |
| 220 | void finalize(const execution_data& ed) { |
| 221 | m_allocator.delete_object(this, ed); |
| 222 | } |
| 223 | task* execute(execution_data& ed) override { |
| 224 | m_func(); |
| 225 | finalize(ed); |
| 226 | return nullptr; |
| 227 | } |
| 228 | task* cancel(execution_data&) override { |
| 229 | __TBB_ASSERT_RELEASE(false, "Unhandled exception from enqueue task is caught" ); |
| 230 | return nullptr; |
| 231 | } |
| 232 | public: |
| 233 | enqueue_task(const F& f, small_object_allocator& alloc) : m_allocator(alloc), m_func(f) {} |
| 234 | enqueue_task(F&& f, small_object_allocator& alloc) : m_allocator(alloc), m_func(std::move(f)) {} |
| 235 | }; |
| 236 | |
| 237 | template<typename F> |
| 238 | void enqueue_impl(F&& f, task_arena_base* ta) { |
| 239 | small_object_allocator alloc{}; |
| 240 | r1::enqueue(*alloc.new_object<enqueue_task<typename std::decay<F>::type>>(std::forward<F>(f), alloc), ta); |
| 241 | } |
| 242 | /** 1-to-1 proxy representation class of scheduler's arena |
| 243 | * Constructors set up settings only, real construction is deferred till the first method invocation |
| 244 | * Destructor only removes one of the references to the inner arena representation. |
| 245 | * Final destruction happens when all the references (and the work) are gone. |
| 246 | */ |
| 247 | class task_arena : public task_arena_base { |
| 248 | |
| 249 | void mark_initialized() { |
| 250 | __TBB_ASSERT( my_arena.load(std::memory_order_relaxed), "task_arena initialization is incomplete" ); |
| 251 | my_initialization_state.store(i: do_once_state::initialized, m: std::memory_order_release); |
| 252 | } |
| 253 | |
| 254 | template<typename R, typename F> |
| 255 | R execute_impl(F& f) { |
| 256 | initialize(); |
| 257 | task_arena_function<F, R> func(f); |
| 258 | r1::execute(*this, func); |
| 259 | return func.consume_result(); |
| 260 | } |
| 261 | public: |
| 262 | //! Creates task_arena with certain concurrency limits |
| 263 | /** Sets up settings only, real construction is deferred till the first method invocation |
| 264 | * @arg max_concurrency specifies total number of slots in arena where threads work |
| 265 | * @arg reserved_for_masters specifies number of slots to be used by external threads only. |
| 266 | * Value of 1 is default and reflects behavior of implicit arenas. |
| 267 | **/ |
| 268 | task_arena(int max_concurrency_ = automatic, unsigned reserved_for_masters = 1, |
| 269 | priority a_priority = priority::normal) |
| 270 | : task_arena_base(max_concurrency_, reserved_for_masters, a_priority) |
| 271 | {} |
| 272 | |
| 273 | #if __TBB_ARENA_BINDING |
| 274 | //! Creates task arena pinned to certain NUMA node |
| 275 | task_arena(const constraints& constraints_, unsigned reserved_for_masters = 1, |
| 276 | priority a_priority = priority::normal) |
| 277 | : task_arena_base(constraints_, reserved_for_masters, a_priority) |
| 278 | {} |
| 279 | |
| 280 | //! Copies settings from another task_arena |
| 281 | task_arena(const task_arena &s) // copy settings but not the reference or instance |
| 282 | : task_arena_base( |
| 283 | constraints{} |
| 284 | .set_numa_id(s.my_numa_id) |
| 285 | .set_max_concurrency(s.my_max_concurrency) |
| 286 | #if __TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT |
| 287 | .set_core_type(s.my_core_type) |
| 288 | .set_max_threads_per_core(s.my_max_threads_per_core) |
| 289 | #endif |
| 290 | , s.my_num_reserved_slots, s.my_priority) |
| 291 | {} |
| 292 | #else |
| 293 | //! Copies settings from another task_arena |
| 294 | task_arena(const task_arena& a) // copy settings but not the reference or instance |
| 295 | : task_arena_base(a.my_max_concurrency, a.my_num_reserved_slots, a.my_priority) |
| 296 | {} |
| 297 | #endif /*__TBB_ARENA_BINDING*/ |
| 298 | |
| 299 | //! Tag class used to indicate the "attaching" constructor |
| 300 | struct attach {}; |
| 301 | |
| 302 | //! Creates an instance of task_arena attached to the current arena of the thread |
| 303 | explicit task_arena( attach ) |
| 304 | : task_arena_base(automatic, 1, priority::normal) // use default settings if attach fails |
| 305 | { |
| 306 | if (r1::attach(*this)) { |
| 307 | mark_initialized(); |
| 308 | } |
| 309 | } |
| 310 | |
| 311 | //! Forces allocation of the resources for the task_arena as specified in constructor arguments |
| 312 | void initialize() { |
| 313 | atomic_do_once(initializer: [this]{ r1::initialize(*this); }, state&: my_initialization_state); |
| 314 | } |
| 315 | |
| 316 | //! Overrides concurrency level and forces initialization of internal representation |
| 317 | void initialize(int max_concurrency_, unsigned reserved_for_masters = 1, |
| 318 | priority a_priority = priority::normal) |
| 319 | { |
| 320 | __TBB_ASSERT(!my_arena.load(std::memory_order_relaxed), "Impossible to modify settings of an already initialized task_arena" ); |
| 321 | if( !is_active() ) { |
| 322 | my_max_concurrency = max_concurrency_; |
| 323 | my_num_reserved_slots = reserved_for_masters; |
| 324 | my_priority = a_priority; |
| 325 | r1::initialize(*this); |
| 326 | mark_initialized(); |
| 327 | } |
| 328 | } |
| 329 | |
| 330 | #if __TBB_ARENA_BINDING |
| 331 | void initialize(constraints constraints_, unsigned reserved_for_masters = 1, |
| 332 | priority a_priority = priority::normal) |
| 333 | { |
| 334 | __TBB_ASSERT(!my_arena.load(std::memory_order_relaxed), "Impossible to modify settings of an already initialized task_arena" ); |
| 335 | if( !is_active() ) { |
| 336 | my_numa_id = constraints_.numa_id; |
| 337 | my_max_concurrency = constraints_.max_concurrency; |
| 338 | #if __TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT |
| 339 | my_core_type = constraints_.core_type; |
| 340 | my_max_threads_per_core = constraints_.max_threads_per_core; |
| 341 | #endif |
| 342 | my_num_reserved_slots = reserved_for_masters; |
| 343 | my_priority = a_priority; |
| 344 | r1::initialize(*this); |
| 345 | mark_initialized(); |
| 346 | } |
| 347 | } |
| 348 | #endif /*__TBB_ARENA_BINDING*/ |
| 349 | |
| 350 | //! Attaches this instance to the current arena of the thread |
| 351 | void initialize(attach) { |
| 352 | // TODO: decide if this call must be thread-safe |
| 353 | __TBB_ASSERT(!my_arena.load(std::memory_order_relaxed), "Impossible to modify settings of an already initialized task_arena" ); |
| 354 | if( !is_active() ) { |
| 355 | if ( !r1::attach(*this) ) { |
| 356 | r1::initialize(*this); |
| 357 | } |
| 358 | mark_initialized(); |
| 359 | } |
| 360 | } |
| 361 | |
| 362 | //! Removes the reference to the internal arena representation. |
| 363 | //! Not thread safe wrt concurrent invocations of other methods. |
| 364 | void terminate() { |
| 365 | if( is_active() ) { |
| 366 | r1::terminate(*this); |
| 367 | my_initialization_state.store(i: do_once_state::uninitialized, m: std::memory_order_relaxed); |
| 368 | } |
| 369 | } |
| 370 | |
| 371 | //! Removes the reference to the internal arena representation, and destroys the external object. |
| 372 | //! Not thread safe wrt concurrent invocations of other methods. |
| 373 | ~task_arena() { |
| 374 | terminate(); |
| 375 | } |
| 376 | |
| 377 | //! Returns true if the arena is active (initialized); false otherwise. |
| 378 | //! The name was chosen to match a task_scheduler_init method with the same semantics. |
| 379 | bool is_active() const { |
| 380 | return my_initialization_state.load(m: std::memory_order_acquire) == do_once_state::initialized; |
| 381 | } |
| 382 | |
| 383 | //! Enqueues a task into the arena to process a functor, and immediately returns. |
| 384 | //! Does not require the calling thread to join the arena |
| 385 | |
| 386 | template<typename F> |
| 387 | void enqueue(F&& f) { |
| 388 | initialize(); |
| 389 | enqueue_impl(std::forward<F>(f), this); |
| 390 | } |
| 391 | |
| 392 | //! Enqueues a task into the arena to process a functor wrapped in task_handle, and immediately returns. |
| 393 | //! Does not require the calling thread to join the arena |
| 394 | #if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS |
| 395 | void enqueue(d2::task_handle&& th) { |
| 396 | initialize(); |
| 397 | d2::enqueue_impl(std::move(th), this); |
| 398 | } |
| 399 | #endif //__TBB_PREVIEW_TASK_GROUP_EXTENSIONS |
| 400 | |
| 401 | //! Joins the arena and executes a mutable functor, then returns |
| 402 | //! If not possible to join, wraps the functor into a task, enqueues it and waits for task completion |
| 403 | //! Can decrement the arena demand for workers, causing a worker to leave and free a slot to the calling thread |
| 404 | //! Since C++11, the method returns the value returned by functor (prior to C++11 it returns void). |
| 405 | template<typename F> |
| 406 | auto execute(F&& f) -> decltype(f()) { |
| 407 | return execute_impl<decltype(f())>(f); |
| 408 | } |
| 409 | |
| 410 | #if __TBB_EXTRA_DEBUG |
| 411 | //! Returns my_num_reserved_slots |
| 412 | int debug_reserved_slots() const { |
| 413 | // Handle special cases inside the library |
| 414 | return my_num_reserved_slots; |
| 415 | } |
| 416 | |
| 417 | //! Returns my_max_concurrency |
| 418 | int debug_max_concurrency() const { |
| 419 | // Handle special cases inside the library |
| 420 | return my_max_concurrency; |
| 421 | } |
| 422 | |
| 423 | //! Wait for all work in the arena to be completed |
| 424 | //! Even submitted by other application threads |
| 425 | //! Joins arena if/when possible (in the same way as execute()) |
| 426 | void debug_wait_until_empty() { |
| 427 | initialize(); |
| 428 | r1::wait(*this); |
| 429 | } |
| 430 | #endif //__TBB_EXTRA_DEBUG |
| 431 | |
| 432 | //! Returns the maximal number of threads that can work inside the arena |
| 433 | int max_concurrency() const { |
| 434 | // Handle special cases inside the library |
| 435 | return (my_max_concurrency > 1) ? my_max_concurrency : r1::max_concurrency(this); |
| 436 | } |
| 437 | |
| 438 | friend void submit(task& t, task_arena& ta, task_group_context& ctx, bool as_critical) { |
| 439 | __TBB_ASSERT(ta.is_active(), nullptr); |
| 440 | call_itt_task_notify(t: releasing, ptr: &t); |
| 441 | r1::submit(t, ctx, ta.my_arena.load(m: std::memory_order_relaxed), as_critical ? 1 : 0); |
| 442 | } |
| 443 | }; |
| 444 | |
| 445 | //! Executes a mutable functor in isolation within the current task arena. |
| 446 | //! Since C++11, the method returns the value returned by functor (prior to C++11 it returns void). |
| 447 | template<typename F> |
| 448 | inline auto isolate(F&& f) -> decltype(f()) { |
| 449 | return isolate_impl<decltype(f())>(f); |
| 450 | } |
| 451 | |
| 452 | //! Returns the index, aka slot number, of the calling thread in its current arena |
| 453 | inline int current_thread_index() { |
| 454 | slot_id idx = r1::execution_slot(nullptr); |
| 455 | return idx == slot_id(-1) ? task_arena_base::not_initialized : int(idx); |
| 456 | } |
| 457 | |
| 458 | #if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS |
| 459 | inline bool is_inside_task() { |
| 460 | return nullptr != current_context(); |
| 461 | } |
| 462 | #endif //__TBB_PREVIEW_TASK_GROUP_EXTENSIONS |
| 463 | |
| 464 | //! Returns the maximal number of threads that can work inside the arena |
| 465 | inline int max_concurrency() { |
| 466 | return r1::max_concurrency(nullptr); |
| 467 | } |
| 468 | |
| 469 | #if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS |
| 470 | inline void enqueue(d2::task_handle&& th) { |
| 471 | d2::enqueue_impl(std::move(th), nullptr); |
| 472 | } |
| 473 | |
| 474 | template<typename F> |
| 475 | inline void enqueue(F&& f) { |
| 476 | enqueue_impl(std::forward<F>(f), nullptr); |
| 477 | } |
| 478 | #endif //__TBB_PREVIEW_TASK_GROUP_EXTENSIONS |
| 479 | |
| 480 | using r1::submit; |
| 481 | |
| 482 | } // namespace d1 |
| 483 | } // namespace detail |
| 484 | |
| 485 | inline namespace v1 { |
| 486 | using detail::d1::task_arena; |
| 487 | |
| 488 | #if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS |
| 489 | using detail::d1::is_inside_task; |
| 490 | #endif |
| 491 | |
| 492 | namespace this_task_arena { |
| 493 | using detail::d1::current_thread_index; |
| 494 | using detail::d1::max_concurrency; |
| 495 | using detail::d1::isolate; |
| 496 | |
| 497 | #if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS |
| 498 | using detail::d1::enqueue; |
| 499 | #endif |
| 500 | } // namespace this_task_arena |
| 501 | |
| 502 | } // inline namespace v1 |
| 503 | |
| 504 | } // namespace tbb |
| 505 | #endif /* __TBB_task_arena_H */ |
| 506 | |