1/*
2 Copyright (c) 2005-2021 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17#ifndef __TBB_parallel_scan_H
18#define __TBB_parallel_scan_H
19
20#include <functional>
21
22#include "detail/_config.h"
23#include "detail/_namespace_injection.h"
24#include "detail/_exception.h"
25#include "detail/_task.h"
26
27#include "profiling.h"
28#include "partitioner.h"
29#include "blocked_range.h"
30#include "task_group.h"
31
32namespace tbb {
33namespace detail {
34namespace d1 {
35
//! Tag type indicating the first (preprocessing) pass of the scan.
/** @ingroup algorithms */
struct pre_scan_tag {
    //! The preprocessing pass is never the final one.
    static bool is_final_scan() {
        return false;
    }
    //! Implicit conversion mirrors is_final_scan().
    operator bool() {
        return is_final_scan();
    }
};
42
//! Tag type indicating the second (final) pass of the scan.
/** @ingroup algorithms */
struct final_scan_tag {
    //! This tag always denotes the final pass.
    static bool is_final_scan() {
        return true;
    }
    //! Implicit conversion mirrors is_final_scan().
    operator bool() {
        return is_final_scan();
    }
};
49
//! Forward declaration: interior node of the scan task tree (defined below).
template<typename Range, typename Body>
struct sum_node;
52
53#if __TBB_CPP20_CONCEPTS_PRESENT
54} // namespace d1
55namespace d0 {
56
//! Constrains the Body type of the imperative parallel_scan overloads:
//! it must be splittable, invocable with both scan tags, and support
//! reverse_join and assign (see \ref parallel_scan_body_req).
template <typename Body, typename Range>
concept parallel_scan_body = splittable<Body> &&
                             requires( Body& body, const Range& range, Body& other ) {
                                 body(range, tbb::detail::d1::pre_scan_tag{});
                                 body(range, tbb::detail::d1::final_scan_tag{});
                                 body.reverse_join(other);
                                 body.assign(other);
                             };
65
//! Constrains the Scan functor of the functional parallel_scan overloads:
//! callable as func(range, value, bool) yielding something convertible to Value.
template <typename Function, typename Range, typename Value>
concept parallel_scan_function = requires( const std::remove_reference_t<Function>& func,
                                           const Range& range, const Value& value ) {
    { func(range, value, true) } -> std::convertible_to<Value>;
};
71
//! Constrains the ReverseJoin combiner of the functional parallel_scan overloads:
//! callable as combine(lhs, rhs) yielding something convertible to Value.
template <typename Combine, typename Value>
concept parallel_scan_combine = requires( const std::remove_reference_t<Combine>& combine,
                                          const Value& lhs, const Value& rhs ) {
    { combine(lhs, rhs) } -> std::convertible_to<Value>;
};
77
78} // namespace d0
79namespace d1 {
80#endif // __TBB_CPP20_CONCEPTS_PRESENT
81
//! Performs final scan for a leaf
/** Leaf task of the scan: carries a private copy of the user Body, applies it
    with final_scan_tag over its subrange, and also serves as the holder of a
    partial sum between the two passes.
    @ingroup algorithms */
template<typename Range, typename Body>
struct final_sum : public task {
private:
    using sum_node_type = sum_node<Range, Body>;
    Body m_body;
    //! Storage for the subrange; the Range is constructed lazily in finish_construction().
    aligned_space<Range> m_range;
    //! Where to put result of last subrange, or nullptr if not last subrange.
    Body* m_stuff_last;

    wait_context& m_wait_context;
    sum_node_type* m_parent = nullptr;
public:
    small_object_allocator m_allocator;
    //! Construct from the user body via its splitting constructor.
    final_sum( Body& body, wait_context& w_o, small_object_allocator& alloc ) :
        m_body(body, split()), m_wait_context(w_o), m_allocator(alloc) {
        poison_pointer(m_stuff_last);
    }

    //! Construct from another final_sum, splitting its body.
    final_sum( final_sum& sum, small_object_allocator& alloc ) :
        m_body(sum.m_body, split()), m_wait_context(sum.m_wait_context), m_allocator(alloc) {
        poison_pointer(m_stuff_last);
    }

    ~final_sum() {
        // NOTE(review): unconditionally destroys the placed Range, so it assumes
        // finish_construction() ran before destruction — callers appear to
        // guarantee this (see start_scan::run); confirm if adding new call sites.
        m_range.begin()->~Range();
    }
    //! Second-phase initialization: attach to the parent node and record the subrange.
    void finish_construction( sum_node_type* parent, const Range& range, Body* stuff_last ) {
        __TBB_ASSERT( m_parent == nullptr, nullptr );
        m_parent = parent;
        new( m_range.begin() ) Range(range);
        m_stuff_last = stuff_last;
    }
private:
    // Drop this task's reference on its parent; return the parent when this was
    // the last reference (so it is ready to run), or release the root
    // wait_context when there is no parent.
    sum_node_type* release_parent() {
        call_itt_task_notify(releasing, m_parent);
        if (m_parent) {
            auto parent = m_parent;
            m_parent = nullptr;
            if (parent->ref_count.fetch_sub(1) == 1) {
                return parent;
            }
        }
        else
            m_wait_context.release();
        return nullptr;
    }
    // Release the parent and free this task; returns the next task to run (may be nullptr).
    sum_node_type* finalize(const execution_data& ed){
        sum_node_type* next_task = release_parent();
        m_allocator.delete_object<final_sum>(this, ed);
        return next_task;
    }

public:
    //! Run the final scan over the stored subrange; store the result if this is the last subrange.
    task* execute(execution_data& ed) override {
        m_body( *m_range.begin(), final_scan_tag() );
        if( m_stuff_last )
            m_stuff_last->assign(m_body);

        return finalize(ed);
    }
    task* cancel(execution_data& ed) override {
        return finalize(ed);
    }
    //! Apply the body over r with the given scan tag.
    template<typename Tag>
    void operator()( const Range& r, Tag tag ) {
        m_body( r, tag );
    }
    //! Fold another final_sum's body state in front of this one.
    void reverse_join( final_sum& a ) {
        m_body.reverse_join(a.m_body);
    }
    //! Fold a raw body's state in front of this one.
    void reverse_join( Body& body ) {
        m_body.reverse_join(body);
    }
    //! Copy this task's body state out to the user's body.
    void assign_to( Body& body ) {
        body.assign(m_body);
    }
    void self_destroy(const execution_data& ed) {
        m_allocator.delete_object<final_sum>(this, ed);
    }
};
164
165//! Split work to be done in the scan.
166/** @ingroup algorithms */
167template<typename Range, typename Body>
168struct sum_node : public task {
169private:
170 using final_sum_type = final_sum<Range,Body>;
171public:
172 final_sum_type *m_incoming;
173 final_sum_type *m_body;
174 Body *m_stuff_last;
175private:
176 final_sum_type *m_left_sum;
177 sum_node *m_left;
178 sum_node *m_right;
179 bool m_left_is_final;
180 Range m_range;
181 wait_context& m_wait_context;
182 sum_node* m_parent;
183 small_object_allocator m_allocator;
184public:
185 std::atomic<unsigned int> ref_count{0};
186 sum_node( const Range range, bool left_is_final_, sum_node* parent, wait_context& w_o, small_object_allocator& alloc ) :
187 m_stuff_last(nullptr),
188 m_left_sum(nullptr),
189 m_left(nullptr),
190 m_right(nullptr),
191 m_left_is_final(left_is_final_),
192 m_range(range),
193 m_wait_context(w_o),
194 m_parent(parent),
195 m_allocator(alloc)
196 {
197 if( m_parent )
198 m_parent->ref_count.fetch_add(1);
199 // Poison fields that will be set by second pass.
200 poison_pointer(m_body);
201 poison_pointer(m_incoming);
202 }
203
204 ~sum_node() {
205 if (m_parent)
206 m_parent->ref_count.fetch_sub(1);
207 }
208private:
209 sum_node* release_parent() {
210 call_itt_task_notify(releasing, m_parent);
211 if (m_parent) {
212 auto parent = m_parent;
213 m_parent = nullptr;
214 if (parent->ref_count.fetch_sub(1) == 1) {
215 return parent;
216 }
217 }
218 else
219 m_wait_context.release();
220 return nullptr;
221 }
222 task* create_child( const Range& range, final_sum_type& body, sum_node* child, final_sum_type* incoming, Body* stuff_last ) {
223 if( child ) {
224 __TBB_ASSERT( is_poisoned(child->m_body) && is_poisoned(child->m_incoming), nullptr );
225 child->prepare_for_execution(body, incoming, stuff_last);
226 return child;
227 } else {
228 body.finish_construction(this, range, stuff_last);
229 return &body;
230 }
231 }
232
233 sum_node* finalize(const execution_data& ed) {
234 sum_node* next_task = release_parent();
235 m_allocator.delete_object<sum_node>(this, ed);
236 return next_task;
237 }
238
239public:
240 void prepare_for_execution(final_sum_type& body, final_sum_type* incoming, Body *stuff_last) {
241 this->m_body = &body;
242 this->m_incoming = incoming;
243 this->m_stuff_last = stuff_last;
244 }
245 task* execute(execution_data& ed) override {
246 if( m_body ) {
247 if( m_incoming )
248 m_left_sum->reverse_join( *m_incoming );
249 task* right_child = this->create_child(Range(m_range,split()), *m_left_sum, m_right, m_left_sum, m_stuff_last);
250 task* left_child = m_left_is_final ? nullptr : this->create_child(m_range, *m_body, m_left, m_incoming, nullptr);
251 ref_count = (left_child != nullptr) + (right_child != nullptr);
252 m_body = nullptr;
253 if( left_child ) {
254 spawn(t&: *right_child, ctx&: *ed.context);
255 return left_child;
256 } else {
257 return right_child;
258 }
259 } else {
260 return finalize(ed);
261 }
262 }
263 task* cancel(execution_data& ed) override {
264 return finalize(ed);
265 }
266 void self_destroy(const execution_data& ed) {
267 m_allocator.delete_object<sum_node>(this, ed);
268 }
269 template<typename range,typename body,typename partitioner>
270 friend struct start_scan;
271
272 template<typename range,typename body>
273 friend struct finish_scan;
274};
275
//! Combine partial results
/** Continuation that runs after both children produced by one split of
    start_scan have finished the first pass. It decides whether a second
    (final-scan) pass is needed for the associated sum_node, cleans up
    otherwise, and propagates completion up the finish_scan chain.
    @ingroup algorithms */
template<typename Range, typename Body>
struct finish_scan : public task {
private:
    using sum_node_type = sum_node<Range,Body>;
    using final_sum_type = final_sum<Range,Body>;
    // Where the children publish their partial sum; may be nullptr when no total is requested.
    final_sum_type** const m_sum_slot;
    // Slot in the caller where the surviving sum_node (if any) is returned.
    sum_node_type*& m_return_slot;
    small_object_allocator m_allocator;
public:
    //! Body allocated when the right child was treated as stolen; nullptr otherwise.
    std::atomic<final_sum_type*> m_right_zombie;
    sum_node_type& m_result;
    //! Starts at 2: one reference per child task of the split.
    std::atomic<unsigned int> ref_count{2};
    finish_scan* m_parent;
    wait_context& m_wait_context;
    task* execute(execution_data& ed) override {
        __TBB_ASSERT( m_result.ref_count.load() == static_cast<unsigned int>((m_result.m_left!=nullptr)+(m_result.m_right!=nullptr)), nullptr );
        if( m_result.m_left )
            m_result.m_left_is_final = false;
        final_sum_type* right_zombie = m_right_zombie.load(std::memory_order_acquire);
        // If the right child ran with its own (zombie) body, fold the left
        // partial sum in front of the published sum.
        if( right_zombie && m_sum_slot )
            (*m_sum_slot)->reverse_join(*m_result.m_left_sum);
        __TBB_ASSERT( !m_return_slot, nullptr );
        if( right_zombie || m_result.m_right ) {
            // A second pass is required: hand the node back to the caller.
            m_return_slot = &m_result;
        } else {
            m_result.self_destroy(ed);
        }
        // A zombie body that nobody references any more can be destroyed now.
        if( right_zombie && !m_sum_slot && !m_result.m_right ) {
            right_zombie->self_destroy(ed);
            m_right_zombie.store(nullptr, std::memory_order_relaxed);
        }
        return finalize(ed);
    }
    task* cancel(execution_data& ed) override {
        return finalize(ed);
    }
    finish_scan(sum_node_type*& return_slot, final_sum_type** sum, sum_node_type& result_, finish_scan* parent, wait_context& w_o, small_object_allocator& alloc) :
        m_sum_slot(sum),
        m_return_slot(return_slot),
        m_allocator(alloc),
        m_right_zombie(nullptr),
        m_result(result_),
        m_parent(parent),
        m_wait_context(w_o)
    {
        __TBB_ASSERT( !m_return_slot, nullptr );
    }
private:
    // Drop this task's reference on its parent; return the parent if it became
    // ready to run, or release the root wait_context when there is no parent.
    finish_scan* release_parent() {
        call_itt_task_notify(releasing, m_parent);
        if (m_parent) {
            auto parent = m_parent;
            m_parent = nullptr;
            if (parent->ref_count.fetch_sub(1) == 1) {
                return parent;
            }
        }
        else
            m_wait_context.release();
        return nullptr;
    }
    // Release the parent and free this task; returns the next task to run (may be nullptr).
    finish_scan* finalize(const execution_data& ed) {
        finish_scan* next_task = release_parent();
        m_allocator.delete_object<finish_scan>(this, ed);
        return next_task;
    }
};
345
//! Initial task to split the work
/** First-pass task: recursively splits the range, spawning right children and
    continuing with the left part itself, and builds the sum_node/finish_scan
    tree that drives the second pass.
    @ingroup algorithms */
template<typename Range, typename Body, typename Partitioner>
struct start_scan : public task {
private:
    using sum_node_type = sum_node<Range,Body>;
    using final_sum_type = final_sum<Range,Body>;
    using finish_pass1_type = finish_scan<Range,Body>;
    // Slot where the surviving sum_node for the second pass is returned.
    std::reference_wrapper<sum_node_type*> m_return_slot;
    Range m_range;
    // Body used for this subtree; may be rebound to a zombie copy when stolen.
    std::reference_wrapper<final_sum_type> m_body;
    typename Partitioner::partition_type m_partition;
    /** Non-null if caller is requesting total. */
    final_sum_type** m_sum_slot;
    bool m_is_final;
    bool m_is_right_child;

    finish_pass1_type* m_parent;
    small_object_allocator m_allocator;
    wait_context& m_wait_context;

    // Drop this task's reference on its parent finish_scan; return the parent if
    // it became ready to run, or release the root wait_context when there is no parent.
    finish_pass1_type* release_parent() {
        call_itt_task_notify(releasing, m_parent);
        if (m_parent) {
            auto parent = m_parent;
            m_parent = nullptr;
            if (parent->ref_count.fetch_sub(1) == 1) {
                return parent;
            }
        }
        else
            m_wait_context.release();
        return nullptr;
    }

    // Release the parent and free this task; returns the next task to run (may be nullptr).
    finish_pass1_type* finalize( const execution_data& ed ) {
        finish_pass1_type* next_task = release_parent();
        m_allocator.delete_object<start_scan>(this, ed);
        return next_task;
    }

public:
    task* execute( execution_data& ) override;
    task* cancel( execution_data& ed ) override {
        return finalize(ed);
    }
    //! Splitting constructor: *this becomes the right child over the upper half of parent's range.
    start_scan( sum_node_type*& return_slot, start_scan& parent, small_object_allocator& alloc ) :
        m_return_slot(return_slot),
        m_range(parent.m_range,split()),
        m_body(parent.m_body),
        m_partition(parent.m_partition,split()),
        m_sum_slot(parent.m_sum_slot),
        m_is_final(parent.m_is_final),
        m_is_right_child(true),
        m_parent(parent.m_parent),
        m_allocator(alloc),
        m_wait_context(parent.m_wait_context)
    {
        __TBB_ASSERT( !m_return_slot, nullptr );
        parent.m_is_right_child = false;
    }

    //! Root constructor covering the whole range.
    start_scan( sum_node_type*& return_slot, const Range& range, final_sum_type& body, const Partitioner& partitioner, wait_context& w_o, small_object_allocator& alloc ) :
        m_return_slot(return_slot),
        m_range(range),
        m_body(body),
        m_partition(partitioner),
        m_sum_slot(nullptr),
        m_is_final(true),
        m_is_right_child(false),
        m_parent(nullptr),
        m_allocator(alloc),
        m_wait_context(w_o)
    {
        __TBB_ASSERT( !m_return_slot, nullptr );
    }

    //! Entry point: runs the (up to) two-pass scan over range with the given partitioner.
    static void run( const Range& range, Body& body, const Partitioner& partitioner ) {
        if( !range.empty() ) {
            task_group_context context(PARALLEL_SCAN);

            using start_pass1_type = start_scan<Range,Body,Partitioner>;
            sum_node_type* root = nullptr;
            wait_context w_ctx{1};
            small_object_allocator alloc{};

            // temp_body accumulates scan state; reverse_join seeds it from body's initial state.
            auto& temp_body = *alloc.new_object<final_sum_type>(body, w_ctx, alloc);
            temp_body.reverse_join(body);

            auto& pass1 = *alloc.new_object<start_pass1_type>(/*m_return_slot=*/root, range, temp_body, partitioner, w_ctx, alloc);

            execute_and_wait(pass1, context, w_ctx, context);
            if( root ) {
                // Second pass: the surviving tree performs the final scan and
                // writes the total into the user's body.
                root->prepare_for_execution(temp_body, nullptr, &body);
                w_ctx.reserve();
                execute_and_wait(*root, context, w_ctx, context);
            } else {
                // Whole range was processed by a single leaf: copy the result
                // back and clean up (finish_construction places the Range so
                // temp_body's destructor is well-defined).
                temp_body.assign_to(body);
                temp_body.finish_construction(nullptr, range, nullptr);
                alloc.delete_object<final_sum_type>(&temp_body);
            }
        }
    }
};
450
//! First-pass worker: either processes the range as a leaf or splits it,
//! creating the sum_node / finish_scan pair and a right-child task.
template<typename Range, typename Body, typename Partitioner>
task* start_scan<Range,Body,Partitioner>::execute( execution_data& ed ) {
    // Inspecting m_parent->result.left_sum would ordinarily be a race condition.
    // But we inspect it only if we are not a stolen task, in which case we
    // know that task assigning to m_parent->result.left_sum has completed.
    __TBB_ASSERT(!m_is_right_child || m_parent, "right child is never an orphan");
    bool treat_as_stolen = m_is_right_child && (is_stolen(ed) || &m_body.get()!=m_parent->m_result.m_left_sum);
    if( treat_as_stolen ) {
        // Invocation is for right child that has been really stolen or needs to be virtually stolen
        small_object_allocator alloc{};
        // Give this task its own body copy ("zombie"); the parent finish_scan takes ownership.
        final_sum_type* right_zombie = alloc.new_object<final_sum_type>(m_body, alloc);
        m_parent->m_right_zombie.store(right_zombie, std::memory_order_release);
        m_body = *right_zombie;
        m_is_final = false;
    }
    task* next_task = nullptr;
    if( (m_is_right_child && !treat_as_stolen) || !m_range.is_divisible() || m_partition.should_execute_range(ed) ) {
        // Leaf: scan the whole remaining range with the appropriate tag.
        if( m_is_final )
            m_body(m_range, final_scan_tag());
        else if( m_sum_slot )
            m_body(m_range, pre_scan_tag());
        if( m_sum_slot )
            *m_sum_slot = &m_body.get();
        __TBB_ASSERT( !m_return_slot, nullptr );

        next_task = finalize(ed);
    } else {
        // Split: allocate the sum_node for pass 2 and a finish_scan continuation.
        small_object_allocator alloc{};
        auto result = alloc.new_object<sum_node_type>(m_range,/*m_left_is_final=*/m_is_final, m_parent? &m_parent->m_result: nullptr, m_wait_context, alloc);

        auto new_parent = alloc.new_object<finish_pass1_type>(m_return_slot, m_sum_slot, *result, m_parent, m_wait_context, alloc);
        m_parent = new_parent;

        // Split off right child
        auto& right_child = *alloc.new_object<start_scan>(/*m_return_slot=*/result->m_right, *this, alloc);

        spawn(right_child, *ed.context);

        // Continue as the left child over the (now halved) m_range.
        m_sum_slot = &result->m_left_sum;
        m_return_slot = result->m_left;

        __TBB_ASSERT( !m_return_slot, nullptr );
        next_task = this;
    }
    return next_task;
}
497
498template<typename Range, typename Value, typename Scan, typename ReverseJoin>
499class lambda_scan_body {
500 Value m_sum_slot;
501 const Value& identity_element;
502 const Scan& m_scan;
503 const ReverseJoin& m_reverse_join;
504public:
505 void operator=(const lambda_scan_body&) = delete;
506 lambda_scan_body(const lambda_scan_body&) = default;
507
508 lambda_scan_body( const Value& identity, const Scan& scan, const ReverseJoin& rev_join )
509 : m_sum_slot(identity)
510 , identity_element(identity)
511 , m_scan(scan)
512 , m_reverse_join(rev_join) {}
513
514 lambda_scan_body( lambda_scan_body& b, split )
515 : m_sum_slot(b.identity_element)
516 , identity_element(b.identity_element)
517 , m_scan(b.m_scan)
518 , m_reverse_join(b.m_reverse_join) {}
519
520 template<typename Tag>
521 void operator()( const Range& r, Tag tag ) {
522 m_sum_slot = m_scan(r, m_sum_slot, tag);
523 }
524
525 void reverse_join( lambda_scan_body& a ) {
526 m_sum_slot = m_reverse_join(a.m_sum_slot, m_sum_slot);
527 }
528
529 void assign( lambda_scan_body& b ) {
530 m_sum_slot = b.m_sum_slot;
531 }
532
533 Value result() const {
534 return m_sum_slot;
535 }
536};
537
538// Requirements on Range concept are documented in blocked_range.h
539
540/** \page parallel_scan_body_req Requirements on parallel_scan body
541 Class \c Body implementing the concept of parallel_scan body must define:
542 - \code Body::Body( Body&, split ); \endcode Splitting constructor.
543 Split \c b so that \c this and \c b can accumulate separately
544 - \code Body::~Body(); \endcode Destructor
545 - \code void Body::operator()( const Range& r, pre_scan_tag ); \endcode
546 Preprocess iterations for range \c r
547 - \code void Body::operator()( const Range& r, final_scan_tag ); \endcode
548 Do final processing for iterations of range \c r
    - \code void Body::reverse_join( Body& a ); \endcode
                                        Merge preprocessing state of \c a into \c this, where \c a was
                                        created earlier from \c b by b's splitting constructor
    - \code void Body::assign( Body& b ); \endcode
                                        Assign state of \c b to \c this
**/
553
554/** \name parallel_scan
555 See also requirements on \ref range_req "Range" and \ref parallel_scan_body_req "parallel_scan Body". **/
556//@{
557
558//! Parallel prefix with default partitioner
559/** @ingroup algorithms **/
560template<typename Range, typename Body>
561 __TBB_requires(tbb_range<Range> && parallel_scan_body<Body, Range>)
562void parallel_scan( const Range& range, Body& body ) {
563 start_scan<Range, Body, auto_partitioner>::run(range,body,__TBB_DEFAULT_PARTITIONER());
564}
565
566//! Parallel prefix with simple_partitioner
567/** @ingroup algorithms **/
568template<typename Range, typename Body>
569 __TBB_requires(tbb_range<Range> && parallel_scan_body<Body, Range>)
570void parallel_scan( const Range& range, Body& body, const simple_partitioner& partitioner ) {
571 start_scan<Range, Body, simple_partitioner>::run(range, body, partitioner);
572}
573
574//! Parallel prefix with auto_partitioner
575/** @ingroup algorithms **/
576template<typename Range, typename Body>
577 __TBB_requires(tbb_range<Range> && parallel_scan_body<Body, Range>)
578void parallel_scan( const Range& range, Body& body, const auto_partitioner& partitioner ) {
579 start_scan<Range,Body,auto_partitioner>::run(range, body, partitioner);
580}
581
582//! Parallel prefix with default partitioner
583/** @ingroup algorithms **/
584template<typename Range, typename Value, typename Scan, typename ReverseJoin>
585 __TBB_requires(tbb_range<Range> && parallel_scan_function<Scan, Range, Value> &&
586 parallel_scan_combine<ReverseJoin, Value>)
587Value parallel_scan( const Range& range, const Value& identity, const Scan& scan, const ReverseJoin& reverse_join ) {
588 lambda_scan_body<Range, Value, Scan, ReverseJoin> body(identity, scan, reverse_join);
589 parallel_scan(range, body, __TBB_DEFAULT_PARTITIONER());
590 return body.result();
591}
592
593//! Parallel prefix with simple_partitioner
594/** @ingroup algorithms **/
595template<typename Range, typename Value, typename Scan, typename ReverseJoin>
596 __TBB_requires(tbb_range<Range> && parallel_scan_function<Scan, Range, Value> &&
597 parallel_scan_combine<ReverseJoin, Value>)
598Value parallel_scan( const Range& range, const Value& identity, const Scan& scan, const ReverseJoin& reverse_join,
599 const simple_partitioner& partitioner ) {
600 lambda_scan_body<Range, Value, Scan, ReverseJoin> body(identity, scan, reverse_join);
601 parallel_scan(range, body, partitioner);
602 return body.result();
603}
604
605//! Parallel prefix with auto_partitioner
606/** @ingroup algorithms **/
607template<typename Range, typename Value, typename Scan, typename ReverseJoin>
608 __TBB_requires(tbb_range<Range> && parallel_scan_function<Scan, Range, Value> &&
609 parallel_scan_combine<ReverseJoin, Value>)
610Value parallel_scan( const Range& range, const Value& identity, const Scan& scan, const ReverseJoin& reverse_join,
611 const auto_partitioner& partitioner ) {
612 lambda_scan_body<Range, Value, Scan, ReverseJoin> body(identity, scan, reverse_join);
613 parallel_scan(range, body, partitioner);
614 return body.result();
615}
616
617} // namespace d1
618} // namespace detail
619
620inline namespace v1 {
621 using detail::d1::parallel_scan;
622 using detail::d1::pre_scan_tag;
623 using detail::d1::final_scan_tag;
624} // namespace v1
625
626} // namespace tbb
627
628#endif /* __TBB_parallel_scan_H */
629
630

source code of include/oneapi/tbb/parallel_scan.h