1use super::plumbing::*;
2use super::ParallelIterator;
3
4use std::iter::{self, Sum};
5use std::marker::PhantomData;
6
7pub(super) fn sum<PI, S>(pi: PI) -> S
8where
9 PI: ParallelIterator,
10 S: Send + Sum<PI::Item> + Sum,
11{
12 pi.drive_unindexed(SumConsumer::new())
13}
14
/// Adds two values of the same type using nothing but their `Sum` impl.
///
/// The operands are fed to `Sum::sum` as a two-item sequence, `left` first
/// and `right` second, so no `Add` bound is required on `T`.
fn add<T: Sum>(left: T, right: T) -> T {
    let operands = iter::once(left).chain(iter::once(right));
    operands.sum()
}
18
/// Stateless consumer that tags a summation with its result type `S`.
///
/// The only field is a zero-sized `PhantomData<*const S>`, so no `S` value is
/// ever stored here. A raw-pointer marker is not automatically `Send`, which
/// is why this module provides a manual `Send` impl for the type.
struct SumConsumer<S>
where
    S: Send,
{
    _marker: PhantomData<*const S>,
}
22
23unsafe impl<S: Send> Send for SumConsumer<S> {}
24
25impl<S: Send> SumConsumer<S> {
26 fn new() -> SumConsumer<S> {
27 SumConsumer {
28 _marker: PhantomData,
29 }
30 }
31}
32
33impl<S, T> Consumer<T> for SumConsumer<S>
34where
35 S: Send + Sum<T> + Sum,
36{
37 type Folder = SumFolder<S>;
38 type Reducer = Self;
39 type Result = S;
40
41 fn split_at(self, _index: usize) -> (Self, Self, Self) {
42 (SumConsumer::new(), SumConsumer::new(), SumConsumer::new())
43 }
44
45 fn into_folder(self) -> Self::Folder {
46 SumFolder {
47 sum: iter::empty::<T>().sum(),
48 }
49 }
50
51 fn full(&self) -> bool {
52 false
53 }
54}
55
56impl<S, T> UnindexedConsumer<T> for SumConsumer<S>
57where
58 S: Send + Sum<T> + Sum,
59{
60 fn split_off_left(&self) -> Self {
61 SumConsumer::new()
62 }
63
64 fn to_reducer(&self) -> Self::Reducer {
65 SumConsumer::new()
66 }
67}
68
69impl<S> Reducer<S> for SumConsumer<S>
70where
71 S: Send + Sum,
72{
73 fn reduce(self, left: S, right: S) -> S {
74 add(left, right)
75 }
76}
77
/// Accumulator created by `SumConsumer::into_folder`; it carries the running
/// total while items are consumed.
struct SumFolder<S> {
    /// Sum of everything consumed so far.
    sum: S,
}
81
82impl<S, T> Folder<T> for SumFolder<S>
83where
84 S: Sum<T> + Sum,
85{
86 type Result = S;
87
88 fn consume(self, item: T) -> Self {
89 SumFolder {
90 sum: add(self.sum, iter::once(item).sum()),
91 }
92 }
93
94 fn consume_iter<I>(self, iter: I) -> Self
95 where
96 I: IntoIterator<Item = T>,
97 {
98 SumFolder {
99 sum: add(self.sum, iter.into_iter().sum()),
100 }
101 }
102
103 fn complete(self) -> S {
104 self.sum
105 }
106
107 fn full(&self) -> bool {
108 false
109 }
110}
111