1/*
2 Copyright (c) 2005-2021 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17#ifndef __TBB_parallel_invoke_H
18#define __TBB_parallel_invoke_H
19
20#include "detail/_config.h"
21#include "detail/_namespace_injection.h"
22#include "detail/_exception.h"
23#include "detail/_task.h"
24#include "detail/_template_helpers.h"
25#include "detail/_small_object_pool.h"
26
27#include "task_group.h"
28
29#include <tuple>
30#include <atomic>
31#include <utility>
32
33namespace tbb {
34namespace detail {
35namespace d1 {
36
37//! Simple task object, executing user method
38template<typename Function, typename WaitObject>
39struct function_invoker : public task {
40 function_invoker(const Function& function, WaitObject& wait_ctx) :
41 my_function(function),
42 parent_wait_ctx(wait_ctx)
43 {}
44
45 task* execute(execution_data& ed) override {
46 my_function();
47 parent_wait_ctx.release(ed);
48 call_itt_task_notify(destroy, this);
49 return nullptr;
50 }
51
52 task* cancel(execution_data& ed) override {
53 parent_wait_ctx.release(ed);
54 return nullptr;
55 }
56
57 const Function& my_function;
58 WaitObject& parent_wait_ctx;
59}; // struct function_invoker
60
61//! Task object for managing subroots in trinary task trees.
62// Endowed with additional synchronization logic (compatible with wait object intefaces) to support
63// continuation passing execution. This task spawns 2 function_invoker tasks with first and second functors
64// and then executes first functor by itself. But only the last executed functor must destruct and deallocate
65// the subroot task.
66template<typename F1, typename F2, typename F3>
67struct invoke_subroot_task : public task {
68 wait_context& root_wait_ctx;
69 std::atomic<unsigned> ref_count{0};
70 bool child_spawned = false;
71
72 const F1& self_invoked_functor;
73 function_invoker<F2, invoke_subroot_task<F1, F2, F3>> f2_invoker;
74 function_invoker<F3, invoke_subroot_task<F1, F2, F3>> f3_invoker;
75
76 task_group_context& my_execution_context;
77 small_object_allocator my_allocator;
78
79 invoke_subroot_task(const F1& f1, const F2& f2, const F3& f3, wait_context& wait_ctx, task_group_context& context,
80 small_object_allocator& alloc) :
81 root_wait_ctx(wait_ctx),
82 self_invoked_functor(f1),
83 f2_invoker(f2, *this),
84 f3_invoker(f3, *this),
85 my_execution_context(context),
86 my_allocator(alloc)
87 {
88 root_wait_ctx.reserve();
89 }
90
91 void finalize(const execution_data& ed) {
92 root_wait_ctx.release();
93
94 my_allocator.delete_object(this, ed);
95 }
96
97 void release(const execution_data& ed) {
98 __TBB_ASSERT(ref_count > 0, nullptr);
99 call_itt_task_notify(releasing, this);
100 if( --ref_count == 0 ) {
101 call_itt_task_notify(acquired, this);
102 finalize(ed);
103 }
104 }
105
106 task* execute(execution_data& ed) override {
107 ref_count.fetch_add(i: 3, m: std::memory_order_relaxed);
108 spawn(f3_invoker, my_execution_context);
109 spawn(f2_invoker, my_execution_context);
110 self_invoked_functor();
111
112 release(ed);
113 return nullptr;
114 }
115
116 task* cancel(execution_data& ed) override {
117 if( ref_count > 0 ) { // detect children spawn
118 release(ed);
119 } else {
120 finalize(ed);
121 }
122 return nullptr;
123 }
124}; // struct subroot_task
125
126class invoke_root_task {
127public:
128 invoke_root_task(wait_context& wc) : my_wait_context(wc) {}
129 void release(const execution_data&) {
130 my_wait_context.release();
131 }
132private:
133 wait_context& my_wait_context;
134};
135
136template<typename F1>
137void invoke_recursive_separation(wait_context& root_wait_ctx, task_group_context& context, const F1& f1) {
138 root_wait_ctx.reserve(delta: 1);
139 invoke_root_task root(root_wait_ctx);
140 function_invoker<F1, invoke_root_task> invoker1(f1, root);
141
142 execute_and_wait(invoker1, context, root_wait_ctx, context);
143}
144
145template<typename F1, typename F2>
146void invoke_recursive_separation(wait_context& root_wait_ctx, task_group_context& context, const F1& f1, const F2& f2) {
147 root_wait_ctx.reserve(delta: 2);
148 invoke_root_task root(root_wait_ctx);
149 function_invoker<F1, invoke_root_task> invoker1(f1, root);
150 function_invoker<F2, invoke_root_task> invoker2(f2, root);
151
152 spawn(invoker1, context);
153 execute_and_wait(invoker2, context, root_wait_ctx, context);
154}
155
156template<typename F1, typename F2, typename F3>
157void invoke_recursive_separation(wait_context& root_wait_ctx, task_group_context& context, const F1& f1, const F2& f2, const F3& f3) {
158 root_wait_ctx.reserve(delta: 3);
159 invoke_root_task root(root_wait_ctx);
160 function_invoker<F1, invoke_root_task> invoker1(f1, root);
161 function_invoker<F2, invoke_root_task> invoker2(f2, root);
162 function_invoker<F3, invoke_root_task> invoker3(f3, root);
163
164 //TODO: implement sub root for two tasks (measure performance)
165 spawn(invoker1, context);
166 spawn(invoker2, context);
167 execute_and_wait(invoker3, context, root_wait_ctx, context);
168}
169
170template<typename F1, typename F2, typename F3, typename... Fs>
171void invoke_recursive_separation(wait_context& root_wait_ctx, task_group_context& context,
172 const F1& f1, const F2& f2, const F3& f3, const Fs&... fs) {
173 small_object_allocator alloc{};
174 auto sub_root = alloc.new_object<invoke_subroot_task<F1, F2, F3>>(f1, f2, f3, root_wait_ctx, context, alloc);
175 spawn(*sub_root, context);
176
177 invoke_recursive_separation(root_wait_ctx, context, fs...);
178}
179
180template<typename... Fs>
181void parallel_invoke_impl(task_group_context& context, const Fs&... fs) {
182 static_assert(sizeof...(Fs) >= 2, "Parallel invoke may be called with at least two callable");
183 wait_context root_wait_ctx{0};
184
185 invoke_recursive_separation(root_wait_ctx, context, fs...);
186}
187
188template<typename F1, typename... Fs>
189void parallel_invoke_impl(const F1& f1, const Fs&... fs) {
190 static_assert(sizeof...(Fs) >= 1, "Parallel invoke may be called with at least two callable");
191 task_group_context context(PARALLEL_INVOKE);
192 wait_context root_wait_ctx{0};
193
194 invoke_recursive_separation(root_wait_ctx, context, fs..., f1);
195}
196
//! Moves the last argument of the variadic pack to the front before calling parallel_invoke_impl,
//! so that a user-provided task_group_context (passed last) is handled as the first parameter.
// Primary template: the tuple collects the types already walked over, the trailing pack holds the
// types that are still to be examined.
template <typename Tuple, typename... Fs>
struct invoke_helper;

// Recursive step: append the head of the remaining pack to the collected tuple.
template <typename... Seen, typename Head, typename... Rest>
struct invoke_helper<std::tuple<Seen...>, Head, Rest...> : invoke_helper<std::tuple<Seen..., Head>, Rest...> {};

// Terminal step: a single type is left, which is the last argument of the original call.
template <typename... Fs, typename Last/*task_group_context or callable*/>
struct invoke_helper<std::tuple<Fs...>, Last> {
    void operator()(Fs&&... leading, Last&& last) {
        parallel_invoke_impl(std::forward<Last>(last), std::forward<Fs>(leading)...);
    }
};
210
211//! Parallel execution of several function objects
212// We need to pass parameter pack through forwarding reference,
213// since this pack may contain task_group_context that must be passed via lvalue non-const reference
214template<typename... Fs>
215void parallel_invoke(Fs&&... fs) {
216 invoke_helper<std::tuple<>, Fs...>()(std::forward<Fs>(fs)...);
217}
218
219} // namespace d1
220} // namespace detail
221
222inline namespace v1 {
223using detail::d1::parallel_invoke;
224} // namespace v1
225
226} // namespace tbb
227#endif /* __TBB_parallel_invoke_H */
228

// source code of include/oneapi/tbb/parallel_invoke.h