/*
    Copyright (c) 2005-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef __TBB_task_arena_H
#define __TBB_task_arena_H

#include "detail/_config.h"
#include "detail/_namespace_injection.h"
#include "detail/_task.h"
#include "detail/_exception.h"
#include "detail/_aligned_space.h"
#include "detail/_small_object_pool.h"

#if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS
#include "detail/_task_handle.h"
#endif

#if __TBB_ARENA_BINDING
#include "info.h"
#endif /*__TBB_ARENA_BINDING*/

namespace tbb {
namespace detail {

namespace d1 {

template<typename F, typename R>
class task_arena_function : public delegate_base {
    F &my_func;
    aligned_space<R> my_return_storage;
    bool my_constructed{false};
    // The function should be called only once.
    bool operator()() const override {
        new (my_return_storage.begin()) R(my_func());
        return true;
    }
public:
    task_arena_function(F& f) : my_func(f) {}
    // The function can be called only after operator() and only once.
    R consume_result() {
        my_constructed = true;
        return std::move(*(my_return_storage.begin()));
    }
    ~task_arena_function() override {
        if (my_constructed) {
            my_return_storage.begin()->~R();
        }
    }
};

template<typename F>
class task_arena_function<F,void> : public delegate_base {
    F &my_func;
    bool operator()() const override {
        my_func();
        return true;
    }
public:
    task_arena_function(F& f) : my_func(f) {}
    void consume_result() const {}

    friend class task_arena_base;
};

class task_arena_base;
class task_scheduler_observer;
} // namespace d1

namespace r1 {
class arena;
struct task_arena_impl;

TBB_EXPORT void __TBB_EXPORTED_FUNC observe(d1::task_scheduler_observer&, bool);
TBB_EXPORT void __TBB_EXPORTED_FUNC initialize(d1::task_arena_base&);
TBB_EXPORT void __TBB_EXPORTED_FUNC terminate(d1::task_arena_base&);
TBB_EXPORT bool __TBB_EXPORTED_FUNC attach(d1::task_arena_base&);
TBB_EXPORT void __TBB_EXPORTED_FUNC execute(d1::task_arena_base&, d1::delegate_base&);
TBB_EXPORT void __TBB_EXPORTED_FUNC wait(d1::task_arena_base&);
TBB_EXPORT int __TBB_EXPORTED_FUNC max_concurrency(const d1::task_arena_base*);
TBB_EXPORT void __TBB_EXPORTED_FUNC isolate_within_arena(d1::delegate_base& d, std::intptr_t);

TBB_EXPORT void __TBB_EXPORTED_FUNC enqueue(d1::task&, d1::task_arena_base*);
TBB_EXPORT void __TBB_EXPORTED_FUNC enqueue(d1::task&, d1::task_group_context&, d1::task_arena_base*);
TBB_EXPORT void __TBB_EXPORTED_FUNC submit(d1::task&, d1::task_group_context&, arena*, std::uintptr_t);
} // namespace r1

namespace d2 {
#if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS
inline void enqueue_impl(task_handle&& th, d1::task_arena_base* ta) {
    if (th == nullptr) {
        throw_exception(exception_id::bad_task_handle);
    }

    auto& ctx = task_handle_accessor::ctx_of(th);

    // Do not access th after release
    r1::enqueue(*task_handle_accessor::release(th), ctx, ta);
}
#endif // __TBB_PREVIEW_TASK_GROUP_EXTENSIONS

} // namespace d2
namespace d1 {

static constexpr int priority_stride = INT_MAX / 4;

class task_arena_base {
    friend struct r1::task_arena_impl;
    friend void r1::observe(d1::task_scheduler_observer&, bool);
public:
    enum class priority : int {
        low = 1 * priority_stride,
        normal = 2 * priority_stride,
        high = 3 * priority_stride
    };
#if __TBB_ARENA_BINDING
    using constraints = tbb::detail::d1::constraints;
#endif /*__TBB_ARENA_BINDING*/
protected:
    //! Special settings
    intptr_t my_version_and_traits;

    std::atomic<do_once_state> my_initialization_state;

    //! nullptr if not currently initialized.
    std::atomic<r1::arena*> my_arena;
    static_assert(sizeof(std::atomic<r1::arena*>) == sizeof(r1::arena*),
                  "To preserve backward compatibility, an atomic pointer must have the same size as a raw pointer");

    //! Concurrency level for deferred initialization
    int my_max_concurrency;

    //! Reserved slots for external threads
    unsigned my_num_reserved_slots;

    //! Arena priority
    priority my_priority;

    //! The NUMA node index to which the arena will be attached
    numa_node_id my_numa_id;

    //! The core type index to which the arena will be attached
    core_type_id my_core_type;

    //! Number of threads per core
    int my_max_threads_per_core;

    // Backward compatibility checks.
    core_type_id core_type() const {
        return (my_version_and_traits & core_type_support_flag) == core_type_support_flag ? my_core_type : automatic;
    }
    int max_threads_per_core() const {
        return (my_version_and_traits & core_type_support_flag) == core_type_support_flag ? my_max_threads_per_core : automatic;
    }

    enum {
        default_flags = 0
        , core_type_support_flag = 1
    };

    task_arena_base(int max_concurrency, unsigned reserved_for_masters, priority a_priority)
        : my_version_and_traits(default_flags | core_type_support_flag)
        , my_initialization_state(do_once_state::uninitialized)
        , my_arena(nullptr)
        , my_max_concurrency(max_concurrency)
        , my_num_reserved_slots(reserved_for_masters)
        , my_priority(a_priority)
        , my_numa_id(automatic)
        , my_core_type(automatic)
        , my_max_threads_per_core(automatic)
    {}

#if __TBB_ARENA_BINDING
    task_arena_base(const constraints& constraints_, unsigned reserved_for_masters, priority a_priority)
        : my_version_and_traits(default_flags | core_type_support_flag)
        , my_initialization_state(do_once_state::uninitialized)
        , my_arena(nullptr)
        , my_max_concurrency(constraints_.max_concurrency)
        , my_num_reserved_slots(reserved_for_masters)
        , my_priority(a_priority)
        , my_numa_id(constraints_.numa_id)
#if __TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT
        , my_core_type(constraints_.core_type)
        , my_max_threads_per_core(constraints_.max_threads_per_core)
#else
        , my_core_type(automatic)
        , my_max_threads_per_core(automatic)
#endif
    {}
#endif /*__TBB_ARENA_BINDING*/
public:
    //! Special value indicating that the number of threads is chosen automatically.
    static const int automatic = -1;
    static const int not_initialized = -2;
};

template<typename R, typename F>
R isolate_impl(F& f) {
    task_arena_function<F, R> func(f);
    r1::isolate_within_arena(func, /*isolation*/ 0);
    return func.consume_result();
}

template <typename F>
class enqueue_task : public task {
    small_object_allocator m_allocator;
    const F m_func;

    void finalize(const execution_data& ed) {
        m_allocator.delete_object(this, ed);
    }
    task* execute(execution_data& ed) override {
        m_func();
        finalize(ed);
        return nullptr;
    }
    task* cancel(execution_data&) override {
        __TBB_ASSERT_RELEASE(false, "Unhandled exception from enqueue task is caught");
        return nullptr;
    }
public:
    enqueue_task(const F& f, small_object_allocator& alloc) : m_allocator(alloc), m_func(f) {}
    enqueue_task(F&& f, small_object_allocator& alloc) : m_allocator(alloc), m_func(std::move(f)) {}
};

template<typename F>
void enqueue_impl(F&& f, task_arena_base* ta) {
    small_object_allocator alloc{};
    r1::enqueue(*alloc.new_object<enqueue_task<typename std::decay<F>::type>>(std::forward<F>(f), alloc), ta);
}
/** 1-to-1 proxy representation class of the scheduler's arena
 * Constructors set up settings only; real construction is deferred until the first method invocation.
 * The destructor only removes one of the references to the inner arena representation.
 * Final destruction happens when all the references (and the work) are gone.
 */
class task_arena : public task_arena_base {

    void mark_initialized() {
        __TBB_ASSERT( my_arena.load(std::memory_order_relaxed), "task_arena initialization is incomplete" );
        my_initialization_state.store(do_once_state::initialized, std::memory_order_release);
    }

    template<typename R, typename F>
    R execute_impl(F& f) {
        initialize();
        task_arena_function<F, R> func(f);
        r1::execute(*this, func);
        return func.consume_result();
    }
public:
    //! Creates a task_arena with certain concurrency limits
    /** Sets up settings only; real construction is deferred until the first method invocation.
     *  @arg max_concurrency specifies the total number of slots in the arena where threads work
     *  @arg reserved_for_masters specifies the number of slots to be used by external threads only.
     *       The default value of 1 reflects the behavior of implicit arenas.
     **/
    task_arena(int max_concurrency_ = automatic, unsigned reserved_for_masters = 1,
               priority a_priority = priority::normal)
        : task_arena_base(max_concurrency_, reserved_for_masters, a_priority)
    {}
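
    // A minimal usage sketch (illustrative comment, not part of the original header):
    // an arena limited to 4 threads, with the default single slot reserved for the
    // external (calling) thread.
    //
    //     tbb::task_arena arena(4);
    //     arena.execute([] {
    //         tbb::parallel_for(0, 100, [](int) { /* work */ });
    //     });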

#if __TBB_ARENA_BINDING
    //! Creates a task arena pinned to a certain NUMA node
    task_arena(const constraints& constraints_, unsigned reserved_for_masters = 1,
               priority a_priority = priority::normal)
        : task_arena_base(constraints_, reserved_for_masters, a_priority)
    {}
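
    // A hedged sketch of NUMA pinning (illustrative; assumes __TBB_ARENA_BINDING is
    // enabled so that tbb::info::numa_nodes() from info.h is available):
    //
    //     std::vector<tbb::numa_node_id> nodes = tbb::info::numa_nodes();
    //     tbb::task_arena numa_arena(tbb::task_arena::constraints{}.set_numa_id(nodes.front()));
    //     numa_arena.execute([] { /* work close to that node's memory */ });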

    //! Copies settings from another task_arena
    task_arena(const task_arena &s) // copy settings but not the reference or instance
        : task_arena_base(
            constraints{}
                .set_numa_id(s.my_numa_id)
                .set_max_concurrency(s.my_max_concurrency)
#if __TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT
                .set_core_type(s.my_core_type)
                .set_max_threads_per_core(s.my_max_threads_per_core)
#endif
            , s.my_num_reserved_slots, s.my_priority)
    {}
#else
    //! Copies settings from another task_arena
    task_arena(const task_arena& a) // copy settings but not the reference or instance
        : task_arena_base(a.my_max_concurrency, a.my_num_reserved_slots, a.my_priority)
    {}
#endif /*__TBB_ARENA_BINDING*/

    //! Tag class used to indicate the "attaching" constructor
    struct attach {};

    //! Creates an instance of task_arena attached to the current arena of the thread
    explicit task_arena( attach )
        : task_arena_base(automatic, 1, priority::normal) // use default settings if attach fails
    {
        if (r1::attach(*this)) {
            mark_initialized();
        }
    }
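
    // A minimal attach sketch (illustrative comment): constructed inside
    // arena.execute(), the new object refers to the arena the thread already
    // occupies; outside any arena, it falls back to the default settings above.
    //
    //     tbb::task_arena same_arena{ tbb::task_arena::attach{} };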

    //! Forces allocation of the resources for the task_arena as specified in constructor arguments
    void initialize() {
        atomic_do_once([this]{ r1::initialize(*this); }, my_initialization_state);
    }

    //! Overrides the concurrency level and forces initialization of the internal representation
    void initialize(int max_concurrency_, unsigned reserved_for_masters = 1,
                    priority a_priority = priority::normal)
    {
        __TBB_ASSERT(!my_arena.load(std::memory_order_relaxed), "Impossible to modify settings of an already initialized task_arena");
        if( !is_active() ) {
            my_max_concurrency = max_concurrency_;
            my_num_reserved_slots = reserved_for_masters;
            my_priority = a_priority;
            r1::initialize(*this);
            mark_initialized();
        }
    }
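
    // Deferred initialization in practice (illustrative comment): construction is
    // cheap, and the concurrency can still be overridden before first use.
    //
    //     tbb::task_arena arena;   // no internal representation allocated yet
    //     arena.initialize(2);     // internal representation created here
    //     arena.execute([]{ /* runs with at most 2 threads */ });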

#if __TBB_ARENA_BINDING
    void initialize(constraints constraints_, unsigned reserved_for_masters = 1,
                    priority a_priority = priority::normal)
    {
        __TBB_ASSERT(!my_arena.load(std::memory_order_relaxed), "Impossible to modify settings of an already initialized task_arena");
        if( !is_active() ) {
            my_numa_id = constraints_.numa_id;
            my_max_concurrency = constraints_.max_concurrency;
#if __TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT
            my_core_type = constraints_.core_type;
            my_max_threads_per_core = constraints_.max_threads_per_core;
#endif
            my_num_reserved_slots = reserved_for_masters;
            my_priority = a_priority;
            r1::initialize(*this);
            mark_initialized();
        }
    }
#endif /*__TBB_ARENA_BINDING*/

    //! Attaches this instance to the current arena of the thread
    void initialize(attach) {
        // TODO: decide if this call must be thread-safe
        __TBB_ASSERT(!my_arena.load(std::memory_order_relaxed), "Impossible to modify settings of an already initialized task_arena");
        if( !is_active() ) {
            if ( !r1::attach(*this) ) {
                r1::initialize(*this);
            }
            mark_initialized();
        }
    }

    //! Removes the reference to the internal arena representation.
    //! Not thread-safe with respect to concurrent invocations of other methods.
    void terminate() {
        if( is_active() ) {
            r1::terminate(*this);
            my_initialization_state.store(do_once_state::uninitialized, std::memory_order_relaxed);
        }
    }

    //! Removes the reference to the internal arena representation, and destroys the external object.
    //! Not thread-safe with respect to concurrent invocations of other methods.
    ~task_arena() {
        terminate();
    }

    //! Returns true if the arena is active (initialized); false otherwise.
    //! The name was chosen to match a task_scheduler_init method with the same semantics.
    bool is_active() const {
        return my_initialization_state.load(std::memory_order_acquire) == do_once_state::initialized;
    }

    //! Enqueues a task into the arena to process a functor, and immediately returns.
    //! Does not require the calling thread to join the arena.
    template<typename F>
    void enqueue(F&& f) {
        initialize();
        enqueue_impl(std::forward<F>(f), this);
    }
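
    // Fire-and-forget usage (illustrative comment): enqueue() returns immediately,
    // so anything the functor captures by reference must outlive its execution.
    //
    //     tbb::task_arena background;
    //     background.enqueue([]{ /* asynchronous work */ });
    //     // ...the caller continues without waiting for the functor to finish.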

    //! Enqueues a task into the arena to process a functor wrapped in a task_handle, and immediately returns.
    //! Does not require the calling thread to join the arena.
#if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS
    void enqueue(d2::task_handle&& th) {
        initialize();
        d2::enqueue_impl(std::move(th), this);
    }
#endif //__TBB_PREVIEW_TASK_GROUP_EXTENSIONS
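
    // A hedged sketch for the preview task_handle overload (illustrative; assumes
    // __TBB_PREVIEW_TASK_GROUP_EXTENSIONS is enabled and tbb::task_group::defer()
    // is available to create the handle):
    //
    //     tbb::task_group tg;
    //     tbb::task_handle h = tg.defer([]{ /* deferred work */ });
    //     arena.enqueue(std::move(h));
    //     tg.wait();  // completion is still tracked by the owning task_group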

    //! Joins the arena and executes a mutable functor, then returns.
    //! If joining is not possible, wraps the functor into a task, enqueues it, and waits for task completion.
    //! Can decrement the arena's demand for workers, causing a worker to leave and free a slot for the calling thread.
    //! Since C++11, the method returns the value returned by the functor (prior to C++11 it returns void).
    template<typename F>
    auto execute(F&& f) -> decltype(f()) {
        return execute_impl<decltype(f())>(f);
    }
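
    // Blocking usage with a return value (illustrative comment):
    //
    //     tbb::task_arena arena(8);
    //     int sum = arena.execute([] {
    //         return tbb::parallel_reduce(tbb::blocked_range<int>(0, 1000), 0,
    //             [](const tbb::blocked_range<int>& r, int acc) {
    //                 for (int i = r.begin(); i != r.end(); ++i) acc += i;
    //                 return acc;
    //             }, std::plus<int>());
    //     });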

#if __TBB_EXTRA_DEBUG
    //! Returns my_num_reserved_slots
    int debug_reserved_slots() const {
        // Handle special cases inside the library
        return my_num_reserved_slots;
    }

    //! Returns my_max_concurrency
    int debug_max_concurrency() const {
        // Handle special cases inside the library
        return my_max_concurrency;
    }

    //! Waits for all work in the arena to be completed,
    //! including work submitted by other application threads.
    //! Joins the arena if/when possible (in the same way as execute()).
    void debug_wait_until_empty() {
        initialize();
        r1::wait(*this);
    }
#endif //__TBB_EXTRA_DEBUG

    //! Returns the maximal number of threads that can work inside the arena
    int max_concurrency() const {
        // Handle special cases inside the library
        return (my_max_concurrency > 1) ? my_max_concurrency : r1::max_concurrency(this);
    }
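
    // Query sketch (illustrative comment): the limit reflects the constructor
    // arguments, or the hardware default when the arena was created with `automatic`.
    //
    //     tbb::task_arena arena;                 // automatic concurrency
    //     int limit = arena.max_concurrency();   // e.g. the number of logical CPUs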

    friend void submit(task& t, task_arena& ta, task_group_context& ctx, bool as_critical) {
        __TBB_ASSERT(ta.is_active(), nullptr);
        call_itt_task_notify(releasing, &t);
        r1::submit(t, ctx, ta.my_arena.load(std::memory_order_relaxed), as_critical ? 1 : 0);
    }
};

//! Executes a mutable functor in isolation within the current task arena.
//! Since C++11, the method returns the value returned by the functor (prior to C++11 it returns void).
template<typename F>
inline auto isolate(F&& f) -> decltype(f()) {
    return isolate_impl<decltype(f())>(f);
}
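
// Isolation sketch (illustrative comment): while blocked inside the isolated
// region, the calling thread will not pick up unrelated tasks from the
// surrounding arena.
//
//     int result = tbb::this_task_arena::isolate([] {
//         tbb::parallel_for(0, 100, [](int) { /* nested work */ });
//         return 42;
//     });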

//! Returns the index, aka slot number, of the calling thread in its current arena
inline int current_thread_index() {
    slot_id idx = r1::execution_slot(nullptr);
    return idx == slot_id(-1) ? task_arena_base::not_initialized : int(idx);
}
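
// Slot-index sketch (illustrative comment): a thread that is not in any arena
// gets task_arena::not_initialized.
//
//     int slot = tbb::this_task_arena::current_thread_index();
//     if (slot == tbb::task_arena::not_initialized) { /* not in an arena yet */ }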

#if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS
inline bool is_inside_task() {
    return nullptr != current_context();
}
#endif //__TBB_PREVIEW_TASK_GROUP_EXTENSIONS

//! Returns the maximal number of threads that can work inside the current arena
inline int max_concurrency() {
    return r1::max_concurrency(nullptr);
}
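
// Free-function sketch (illustrative comment): reports the limit of the arena the
// calling thread currently occupies, or the default concurrency outside any arena.
//
//     int n = tbb::this_task_arena::max_concurrency();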

#if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS
inline void enqueue(d2::task_handle&& th) {
    d2::enqueue_impl(std::move(th), nullptr);
}

template<typename F>
inline void enqueue(F&& f) {
    enqueue_impl(std::forward<F>(f), nullptr);
}
#endif //__TBB_PREVIEW_TASK_GROUP_EXTENSIONS
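
// this_task_arena::enqueue sketch (illustrative; preview feature): passing a null
// arena pointer above targets the calling thread's current arena.
//
//     tbb::this_task_arena::enqueue([]{ /* asynchronous work in the current arena */ });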

using r1::submit;

} // namespace d1
} // namespace detail

inline namespace v1 {
using detail::d1::task_arena;

#if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS
using detail::d1::is_inside_task;
#endif

namespace this_task_arena {
using detail::d1::current_thread_index;
using detail::d1::max_concurrency;
using detail::d1::isolate;

#if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS
using detail::d1::enqueue;
#endif
} // namespace this_task_arena

} // inline namespace v1

} // namespace tbb
#endif /* __TBB_task_arena_H */
