1/*
2 Copyright (c) 2005-2023 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17#ifndef __TBB_parallel_scan_H
18#define __TBB_parallel_scan_H
19
20#include <functional>
21
22#include "detail/_config.h"
23#include "detail/_namespace_injection.h"
24#include "detail/_exception.h"
25#include "detail/_task.h"
26
27#include "profiling.h"
28#include "partitioner.h"
29#include "blocked_range.h"
30#include "task_group.h"
31
32namespace tbb {
33namespace detail {
34namespace d1 {
35
//! Used to indicate that the initial scan is being performed.
/** Tag type passed to Body::operator() during the first (summary) pass.
    @ingroup algorithms */
struct pre_scan_tag {
    static bool is_final_scan() { return false; }
    operator bool() { return false; }
};
42
//! Used to indicate that the final scan is being performed.
/** Tag type passed to Body::operator() during the second (final) pass.
    @ingroup algorithms */
struct final_scan_tag {
    static bool is_final_scan() { return true; }
    operator bool() { return true; }
};
49
// Forward declaration: interior node of the scan tree (defined below).
template<typename Range, typename Body>
struct sum_node;
52
#if __TBB_CPP20_CONCEPTS_PRESENT
} // namespace d1
namespace d0 {

// Constrains the Body of the imperative parallel_scan form: it must be
// splittable and support pre-scan, final-scan, reverse_join and assign.
template <typename Body, typename Range>
concept parallel_scan_body = splittable<Body> &&
                             requires( Body& body, const Range& range, Body& other ) {
                                 body(range, tbb::detail::d1::pre_scan_tag{});
                                 body(range, tbb::detail::d1::final_scan_tag{});
                                 body.reverse_join(other);
                                 body.assign(other);
                             };

// Constrains the scan functor of the functional parallel_scan form:
// invocable as (range, running value, is_final) with a result convertible to Value.
template <typename Function, typename Range, typename Value>
concept parallel_scan_function = std::invocable<const std::remove_reference_t<Function>&,
                                                const Range&, const Value&, bool> &&
                                 std::convertible_to<std::invoke_result_t<const std::remove_reference_t<Function>&,
                                                                          const Range&, const Value&, bool>,
                                                     Value>;

// Constrains the combine (reverse-join) functor of the functional form:
// invocable on two values with a result convertible to Value.
template <typename Combine, typename Value>
concept parallel_scan_combine = std::invocable<const std::remove_reference_t<Combine>&,
                                               const Value&, const Value&> &&
                                std::convertible_to<std::invoke_result_t<const std::remove_reference_t<Combine>&,
                                                                         const Value&, const Value&>,
                                                    Value>;

} // namespace d0
namespace d1 {
#endif // __TBB_CPP20_CONCEPTS_PRESENT
83
//! Performs final scan for a leaf
/** Task that applies the user Body over one leaf subrange with final_scan_tag,
    optionally publishes its Body state into a caller-provided slot, and then
    releases its parent sum_node (or the root wait_context).
    @ingroup algorithms */
template<typename Range, typename Body>
struct final_sum : public task {
private:
    using sum_node_type = sum_node<Range, Body>;
    Body m_body;
    //! Storage for the subrange; constructed lazily in finish_construction().
    aligned_space<Range> m_range;
    //! Where to put result of last subrange, or nullptr if not last subrange.
    Body* m_stuff_last;

    wait_context& m_wait_context;
    sum_node_type* m_parent = nullptr;
public:
    small_object_allocator m_allocator;
    //! Construct with a body split off from the user's body.
    final_sum( Body& body, wait_context& w_o, small_object_allocator& alloc ) :
        m_body(body, split()), m_wait_context(w_o), m_allocator(alloc) {
        poison_pointer(m_stuff_last);
    }

    //! Construct with a body split off from another final_sum's body.
    final_sum( final_sum& sum, small_object_allocator& alloc ) :
        m_body(sum.m_body, split()), m_wait_context(sum.m_wait_context), m_allocator(alloc) {
        poison_pointer(m_stuff_last);
    }

    ~final_sum() {
        // m_range was constructed via placement new, so destroy it explicitly.
        m_range.begin()->~Range();
    }
    //! Second-phase initialization: bind parent node, subrange and result slot.
    void finish_construction( sum_node_type* parent, const Range& range, Body* stuff_last ) {
        __TBB_ASSERT( m_parent == nullptr, nullptr );
        m_parent = parent;
        new( m_range.begin() ) Range(range);
        m_stuff_last = stuff_last;
    }
private:
    //! Drop our reference on the parent; return it if we were the last child
    //! to finish, or release the root wait_context when there is no parent.
    sum_node_type* release_parent() {
        call_itt_task_notify(releasing, m_parent);
        if (m_parent) {
            auto parent = m_parent;
            m_parent = nullptr;
            if (parent->ref_count.fetch_sub(1) == 1) {
                return parent;
            }
        }
        else
            m_wait_context.release();
        return nullptr;
    }
    //! Release the parent and self-destroy; returns the next task to run, if any.
    sum_node_type* finalize(const execution_data& ed){
        sum_node_type* next_task = release_parent();
        m_allocator.delete_object<final_sum>(this, ed);
        return next_task;
    }

public:
    task* execute(execution_data& ed) override {
        // Run the final scan over this leaf's subrange.
        m_body( *m_range.begin(), final_scan_tag() );
        if( m_stuff_last )
            m_stuff_last->assign(m_body);

        return finalize(ed);
    }
    task* cancel(execution_data& ed) override {
        return finalize(ed);
    }
    //! Apply the wrapped body to a range with the given scan tag.
    template<typename Tag>
    void operator()( const Range& r, Tag tag ) {
        m_body( r, tag );
    }
    //! Merge another final_sum's preprocessing state into this one.
    void reverse_join( final_sum& a ) {
        m_body.reverse_join(a.m_body);
    }
    //! Merge a raw Body's preprocessing state into this one.
    void reverse_join( Body& body ) {
        m_body.reverse_join(body);
    }
    //! Copy this task's body state out to the user's body.
    void assign_to( Body& body ) {
        body.assign(m_body);
    }
    void self_destroy(const execution_data& ed) {
        m_allocator.delete_object<final_sum>(this, ed);
    }
};
166
167//! Split work to be done in the scan.
168/** @ingroup algorithms */
169template<typename Range, typename Body>
170struct sum_node : public task {
171private:
172 using final_sum_type = final_sum<Range,Body>;
173public:
174 final_sum_type *m_incoming;
175 final_sum_type *m_body;
176 Body *m_stuff_last;
177private:
178 final_sum_type *m_left_sum;
179 sum_node *m_left;
180 sum_node *m_right;
181 bool m_left_is_final;
182 Range m_range;
183 wait_context& m_wait_context;
184 sum_node* m_parent;
185 small_object_allocator m_allocator;
186public:
187 std::atomic<unsigned int> ref_count{0};
188 sum_node( const Range range, bool left_is_final_, sum_node* parent, wait_context& w_o, small_object_allocator& alloc ) :
189 m_stuff_last(nullptr),
190 m_left_sum(nullptr),
191 m_left(nullptr),
192 m_right(nullptr),
193 m_left_is_final(left_is_final_),
194 m_range(range),
195 m_wait_context(w_o),
196 m_parent(parent),
197 m_allocator(alloc)
198 {
199 if( m_parent )
200 m_parent->ref_count.fetch_add(i: 1);
201 // Poison fields that will be set by second pass.
202 poison_pointer(m_body);
203 poison_pointer(m_incoming);
204 }
205
206 ~sum_node() {
207 if (m_parent)
208 m_parent->ref_count.fetch_sub(i: 1);
209 }
210private:
211 sum_node* release_parent() {
212 call_itt_task_notify(releasing, m_parent);
213 if (m_parent) {
214 auto parent = m_parent;
215 m_parent = nullptr;
216 if (parent->ref_count.fetch_sub(1) == 1) {
217 return parent;
218 }
219 }
220 else
221 m_wait_context.release();
222 return nullptr;
223 }
224 task* create_child( const Range& range, final_sum_type& body, sum_node* child, final_sum_type* incoming, Body* stuff_last ) {
225 if( child ) {
226 __TBB_ASSERT( is_poisoned(child->m_body) && is_poisoned(child->m_incoming), nullptr );
227 child->prepare_for_execution(body, incoming, stuff_last);
228 return child;
229 } else {
230 body.finish_construction(this, range, stuff_last);
231 return &body;
232 }
233 }
234
235 sum_node* finalize(const execution_data& ed) {
236 sum_node* next_task = release_parent();
237 m_allocator.delete_object<sum_node>(this, ed);
238 return next_task;
239 }
240
241public:
242 void prepare_for_execution(final_sum_type& body, final_sum_type* incoming, Body *stuff_last) {
243 this->m_body = &body;
244 this->m_incoming = incoming;
245 this->m_stuff_last = stuff_last;
246 }
247 task* execute(execution_data& ed) override {
248 if( m_body ) {
249 if( m_incoming )
250 m_left_sum->reverse_join( *m_incoming );
251 task* right_child = this->create_child(range: Range(m_range,split()), body&: *m_left_sum, child: m_right, incoming: m_left_sum, stuff_last: m_stuff_last);
252 task* left_child = m_left_is_final ? nullptr : this->create_child(range: m_range, body&: *m_body, child: m_left, incoming: m_incoming, stuff_last: nullptr);
253 ref_count = (left_child != nullptr) + (right_child != nullptr);
254 m_body = nullptr;
255 if( left_child ) {
256 spawn(t&: *right_child, ctx&: *ed.context);
257 return left_child;
258 } else {
259 return right_child;
260 }
261 } else {
262 return finalize(ed);
263 }
264 }
265 task* cancel(execution_data& ed) override {
266 return finalize(ed);
267 }
268 void self_destroy(const execution_data& ed) {
269 m_allocator.delete_object<sum_node>(this, ed);
270 }
271 template<typename range,typename body,typename partitioner>
272 friend struct start_scan;
273
274 template<typename range,typename body>
275 friend struct finish_scan;
276};
277
//! Combine partial results
/** Continuation task executed after both halves of a split start_scan finish
    their first pass. Decides whether a second (final-scan) pass is needed,
    publishes the sum_node for it, and merges a stolen right child's sum.
    @ingroup algorithms */
template<typename Range, typename Body>
struct finish_scan : public task {
private:
    using sum_node_type = sum_node<Range,Body>;
    using final_sum_type = final_sum<Range,Body>;
    //! Where the caller wants the combined pre-scan sum, or nullptr.
    final_sum_type** const m_sum_slot;
    //! Slot where the sum_node is published if a second pass is needed.
    sum_node_type*& m_return_slot;
    small_object_allocator m_allocator;
public:
    //! Body split off for a stolen right child; owned here until consumed.
    std::atomic<final_sum_type*> m_right_zombie;
    sum_node_type& m_result;
    //! Starts at 2: one reference per child half of the split.
    std::atomic<unsigned int> ref_count{2};
    finish_scan* m_parent;
    wait_context& m_wait_context;
    task* execute(execution_data& ed) override {
        __TBB_ASSERT( m_result.ref_count.load() == static_cast<unsigned int>((m_result.m_left!=nullptr)+(m_result.m_right!=nullptr)), nullptr );
        if( m_result.m_left )
            m_result.m_left_is_final = false;
        // Acquire pairs with the release store made by a stolen right child.
        final_sum_type* right_zombie = m_right_zombie.load(std::memory_order_acquire);
        if( right_zombie && m_sum_slot )
            (*m_sum_slot)->reverse_join(*m_result.m_left_sum);
        __TBB_ASSERT( !m_return_slot, nullptr );
        // Publish the node only when a second pass is actually required.
        if( right_zombie || m_result.m_right ) {
            m_return_slot = &m_result;
        } else {
            m_result.self_destroy(ed);
        }
        // Destroy the zombie body if nobody will consume its sum.
        if( right_zombie && !m_sum_slot && !m_result.m_right ) {
            right_zombie->self_destroy(ed);
            m_right_zombie.store(nullptr, std::memory_order_relaxed);
        }
        return finalize(ed);
    }
    task* cancel(execution_data& ed) override {
        return finalize(ed);
    }
    finish_scan(sum_node_type*& return_slot, final_sum_type** sum, sum_node_type& result_, finish_scan* parent, wait_context& w_o, small_object_allocator& alloc) :
        m_sum_slot(sum),
        m_return_slot(return_slot),
        m_allocator(alloc),
        m_right_zombie(nullptr),
        m_result(result_),
        m_parent(parent),
        m_wait_context(w_o)
    {
        __TBB_ASSERT( !m_return_slot, nullptr );
    }
private:
    //! Drop our reference on the parent continuation; return it if we were the
    //! last child, or release the root wait_context if there is no parent.
    finish_scan* release_parent() {
        call_itt_task_notify(releasing, m_parent);
        if (m_parent) {
            auto parent = m_parent;
            m_parent = nullptr;
            if (parent->ref_count.fetch_sub(1) == 1) {
                return parent;
            }
        }
        else
            m_wait_context.release();
        return nullptr;
    }
    //! Release the parent and self-destroy; returns the next task, if any.
    finish_scan* finalize(const execution_data& ed) {
        finish_scan* next_task = release_parent();
        m_allocator.delete_object<finish_scan>(this, ed);
        return next_task;
    }
};
347
//! Initial task to split the work
/** First-pass task: recursively splits the range, runs pre-scans (or a direct
    final scan while still leftmost) over the leaves, and builds the sum_node
    tree consumed by the second pass. run() drives both passes.
    @ingroup algorithms */
template<typename Range, typename Body, typename Partitioner>
struct start_scan : public task {
private:
    using sum_node_type = sum_node<Range,Body>;
    using final_sum_type = final_sum<Range,Body>;
    using finish_pass1_type = finish_scan<Range,Body>;
    //! Slot where the tree root (if any) is published for the second pass.
    std::reference_wrapper<sum_node_type*> m_return_slot;
    Range m_range;
    //! Body used for scanning this subrange (rebindable on steal).
    std::reference_wrapper<final_sum_type> m_body;
    typename Partitioner::partition_type m_partition;
    /** Non-null if caller is requesting total. */
    final_sum_type** m_sum_slot;
    //! True while a final scan (rather than a pre-scan) may be performed.
    bool m_is_final;
    bool m_is_right_child;

    finish_pass1_type* m_parent;
    small_object_allocator m_allocator;
    wait_context& m_wait_context;

    //! Drop our reference on the parent continuation; return it if we were the
    //! last child, or release the root wait_context if there is no parent.
    finish_pass1_type* release_parent() {
        call_itt_task_notify(releasing, m_parent);
        if (m_parent) {
            auto parent = m_parent;
            m_parent = nullptr;
            if (parent->ref_count.fetch_sub(1) == 1) {
                return parent;
            }
        }
        else
            m_wait_context.release();
        return nullptr;
    }

    //! Release the parent and self-destroy; returns the next task, if any.
    finish_pass1_type* finalize( const execution_data& ed ) {
        finish_pass1_type* next_task = release_parent();
        m_allocator.delete_object<start_scan>(this, ed);
        return next_task;
    }

public:
    task* execute( execution_data& ) override;
    task* cancel( execution_data& ed ) override {
        return finalize(ed);
    }
    //! Splitting constructor: *this becomes the right child of parent's range.
    start_scan( sum_node_type*& return_slot, start_scan& parent, small_object_allocator& alloc ) :
        m_return_slot(return_slot),
        m_range(parent.m_range,split()),
        m_body(parent.m_body),
        m_partition(parent.m_partition,split()),
        m_sum_slot(parent.m_sum_slot),
        m_is_final(parent.m_is_final),
        m_is_right_child(true),
        m_parent(parent.m_parent),
        m_allocator(alloc),
        m_wait_context(parent.m_wait_context)
    {
        __TBB_ASSERT( !m_return_slot, nullptr );
        parent.m_is_right_child = false;
    }

    //! Root constructor covering the whole range.
    start_scan( sum_node_type*& return_slot, const Range& range, final_sum_type& body, const Partitioner& partitioner, wait_context& w_o, small_object_allocator& alloc ) :
        m_return_slot(return_slot),
        m_range(range),
        m_body(body),
        m_partition(partitioner),
        m_sum_slot(nullptr),
        m_is_final(true),
        m_is_right_child(false),
        m_parent(nullptr),
        m_allocator(alloc),
        m_wait_context(w_o)
    {
        __TBB_ASSERT( !m_return_slot, nullptr );
    }

    //! Entry point: runs the complete (up to) two-pass scan of range with body.
    static void run( const Range& range, Body& body, const Partitioner& partitioner ) {
        if( !range.empty() ) {
            task_group_context context(PARALLEL_SCAN);

            using start_pass1_type = start_scan<Range,Body,Partitioner>;
            sum_node_type* root = nullptr;
            wait_context w_ctx{1};
            small_object_allocator alloc{};

            auto& temp_body = *alloc.new_object<final_sum_type>(body, w_ctx, alloc);
            temp_body.reverse_join(body);

            auto& pass1 = *alloc.new_object<start_pass1_type>(/*m_return_slot=*/root, range, temp_body, partitioner, w_ctx, alloc);

            // First pass; may publish a tree root into `root` if the range split.
            execute_and_wait(pass1, context, w_ctx, context);
            if( root ) {
                // Second pass: final scan over the published tree.
                root->prepare_for_execution(temp_body, nullptr, &body);
                w_ctx.reserve();
                execute_and_wait(*root, context, w_ctx, context);
            } else {
                // The range was never split: copy the result back and let
                // temp_body's destructor run via finish_construction/delete.
                temp_body.assign_to(body);
                temp_body.finish_construction(nullptr, range, nullptr);
                alloc.delete_object<final_sum_type>(&temp_body);
            }
        }
    }
};
452
template<typename Range, typename Body, typename Partitioner>
task* start_scan<Range,Body,Partitioner>::execute( execution_data& ed ) {
    // Inspecting m_parent->result.left_sum would ordinarily be a race condition.
    // But we inspect it only if we are not a stolen task, in which case we
    // know that task assigning to m_parent->result.left_sum has completed.
    __TBB_ASSERT(!m_is_right_child || m_parent, "right child is never an orphan");
    bool treat_as_stolen = m_is_right_child && (is_stolen(ed) || &m_body.get()!=m_parent->m_result.m_left_sum);
    if( treat_as_stolen ) {
        // Invocation is for right child that has been really stolen or needs to be virtually stolen
        small_object_allocator alloc{};
        // Split a fresh body for the stolen child; the parent keeps it alive
        // via m_right_zombie (release store pairs with the parent's acquire).
        final_sum_type* right_zombie = alloc.new_object<final_sum_type>(m_body, alloc);
        m_parent->m_right_zombie.store(right_zombie, std::memory_order_release);
        m_body = *right_zombie;
        m_is_final = false; // a stolen right child must pre-scan, not final-scan
    }
    task* next_task = nullptr;
    if( (m_is_right_child && !treat_as_stolen) || !m_range.is_divisible() || m_partition.should_execute_range(ed) ) {
        // Leaf case: scan this subrange directly.
        if( m_is_final )
            m_body(m_range, final_scan_tag());
        else if( m_sum_slot )
            m_body(m_range, pre_scan_tag());
        if( m_sum_slot )
            *m_sum_slot = &m_body.get();
        __TBB_ASSERT( !m_return_slot, nullptr );

        next_task = finalize(ed);
    } else {
        // Split case: build a sum_node plus a finish_scan continuation, spawn
        // the right half, and keep executing as the left half.
        small_object_allocator alloc{};
        auto result = alloc.new_object<sum_node_type>(m_range,/*m_left_is_final=*/m_is_final, m_parent? &m_parent->m_result: nullptr, m_wait_context, alloc);

        auto new_parent = alloc.new_object<finish_pass1_type>(m_return_slot, m_sum_slot, *result, m_parent, m_wait_context, alloc);
        m_parent = new_parent;

        // Split off right child
        auto& right_child = *alloc.new_object<start_scan>(/*m_return_slot=*/result->m_right, *this, alloc);

        spawn(right_child, *ed.context);

        // The left half now reports into the new sum_node.
        m_sum_slot = &result->m_left_sum;
        m_return_slot = result->m_left;

        __TBB_ASSERT( !m_return_slot, nullptr );
        next_task = this;
    }
    return next_task;
}
499
500template<typename Range, typename Value, typename Scan, typename ReverseJoin>
501class lambda_scan_body {
502 Value m_sum_slot;
503 const Value& identity_element;
504 const Scan& m_scan;
505 const ReverseJoin& m_reverse_join;
506public:
507 void operator=(const lambda_scan_body&) = delete;
508 lambda_scan_body(const lambda_scan_body&) = default;
509
510 lambda_scan_body( const Value& identity, const Scan& scan, const ReverseJoin& rev_join )
511 : m_sum_slot(identity)
512 , identity_element(identity)
513 , m_scan(scan)
514 , m_reverse_join(rev_join) {}
515
516 lambda_scan_body( lambda_scan_body& b, split )
517 : m_sum_slot(b.identity_element)
518 , identity_element(b.identity_element)
519 , m_scan(b.m_scan)
520 , m_reverse_join(b.m_reverse_join) {}
521
522 template<typename Tag>
523 void operator()( const Range& r, Tag tag ) {
524 m_sum_slot = tbb::detail::invoke(m_scan, r, m_sum_slot, tag);
525 }
526
527 void reverse_join( lambda_scan_body& a ) {
528 m_sum_slot = tbb::detail::invoke(m_reverse_join, a.m_sum_slot, m_sum_slot);
529 }
530
531 void assign( lambda_scan_body& b ) {
532 m_sum_slot = b.m_sum_slot;
533 }
534
535 Value result() const {
536 return m_sum_slot;
537 }
538};
539
540// Requirements on Range concept are documented in blocked_range.h
541
542/** \page parallel_scan_body_req Requirements on parallel_scan body
543 Class \c Body implementing the concept of parallel_scan body must define:
544 - \code Body::Body( Body&, split ); \endcode Splitting constructor.
545 Split \c b so that \c this and \c b can accumulate separately
546 - \code Body::~Body(); \endcode Destructor
547 - \code void Body::operator()( const Range& r, pre_scan_tag ); \endcode
548 Preprocess iterations for range \c r
549 - \code void Body::operator()( const Range& r, final_scan_tag ); \endcode
550 Do final processing for iterations of range \c r
551 - \code void Body::reverse_join( Body& a ); \endcode
552 Merge preprocessing state of \c a into \c this, where \c a was
553 created earlier from \c b by b's splitting constructor
554**/
555
556/** \name parallel_scan
557 See also requirements on \ref range_req "Range" and \ref parallel_scan_body_req "parallel_scan Body". **/
558//@{
559
560//! Parallel prefix with default partitioner
561/** @ingroup algorithms **/
562template<typename Range, typename Body>
563 __TBB_requires(tbb_range<Range> && parallel_scan_body<Body, Range>)
564void parallel_scan( const Range& range, Body& body ) {
565 start_scan<Range, Body, auto_partitioner>::run(range,body,__TBB_DEFAULT_PARTITIONER());
566}
567
568//! Parallel prefix with simple_partitioner
569/** @ingroup algorithms **/
570template<typename Range, typename Body>
571 __TBB_requires(tbb_range<Range> && parallel_scan_body<Body, Range>)
572void parallel_scan( const Range& range, Body& body, const simple_partitioner& partitioner ) {
573 start_scan<Range, Body, simple_partitioner>::run(range, body, partitioner);
574}
575
576//! Parallel prefix with auto_partitioner
577/** @ingroup algorithms **/
578template<typename Range, typename Body>
579 __TBB_requires(tbb_range<Range> && parallel_scan_body<Body, Range>)
580void parallel_scan( const Range& range, Body& body, const auto_partitioner& partitioner ) {
581 start_scan<Range,Body,auto_partitioner>::run(range, body, partitioner);
582}
583
584//! Parallel prefix with default partitioner
585/** @ingroup algorithms **/
586template<typename Range, typename Value, typename Scan, typename ReverseJoin>
587 __TBB_requires(tbb_range<Range> && parallel_scan_function<Scan, Range, Value> &&
588 parallel_scan_combine<ReverseJoin, Value>)
589Value parallel_scan( const Range& range, const Value& identity, const Scan& scan, const ReverseJoin& reverse_join ) {
590 lambda_scan_body<Range, Value, Scan, ReverseJoin> body(identity, scan, reverse_join);
591 parallel_scan(range, body, __TBB_DEFAULT_PARTITIONER());
592 return body.result();
593}
594
595//! Parallel prefix with simple_partitioner
596/** @ingroup algorithms **/
597template<typename Range, typename Value, typename Scan, typename ReverseJoin>
598 __TBB_requires(tbb_range<Range> && parallel_scan_function<Scan, Range, Value> &&
599 parallel_scan_combine<ReverseJoin, Value>)
600Value parallel_scan( const Range& range, const Value& identity, const Scan& scan, const ReverseJoin& reverse_join,
601 const simple_partitioner& partitioner ) {
602 lambda_scan_body<Range, Value, Scan, ReverseJoin> body(identity, scan, reverse_join);
603 parallel_scan(range, body, partitioner);
604 return body.result();
605}
606
607//! Parallel prefix with auto_partitioner
608/** @ingroup algorithms **/
609template<typename Range, typename Value, typename Scan, typename ReverseJoin>
610 __TBB_requires(tbb_range<Range> && parallel_scan_function<Scan, Range, Value> &&
611 parallel_scan_combine<ReverseJoin, Value>)
612Value parallel_scan( const Range& range, const Value& identity, const Scan& scan, const ReverseJoin& reverse_join,
613 const auto_partitioner& partitioner ) {
614 lambda_scan_body<Range, Value, Scan, ReverseJoin> body(identity, scan, reverse_join);
615 parallel_scan(range, body, partitioner);
616 return body.result();
617}
618
619} // namespace d1
620} // namespace detail
621
inline namespace v1 {
    // Names exported at tbb:: scope.
    using detail::d1::parallel_scan;
    using detail::d1::pre_scan_tag;
    using detail::d1::final_scan_tag;
} // namespace v1
627
628} // namespace tbb
629
630#endif /* __TBB_parallel_scan_H */
631

// source code of include/oneapi/tbb/parallel_scan.h