1use super::plumbing::*;
2use super::ParallelIterator;
3
4pub(super) fn reduce<PI, R, ID, T>(pi: PI, identity: ID, reduce_op: R) -> T
5where
6 PI: ParallelIterator<Item = T>,
7 R: Fn(T, T) -> T + Sync,
8 ID: Fn() -> T + Sync,
9 T: Send,
10{
11 let consumer: ReduceConsumer<'_, R, ID> = ReduceConsumer {
12 identity: &identity,
13 reduce_op: &reduce_op,
14 };
15 pi.drive_unindexed(consumer)
16}
17
18struct ReduceConsumer<'r, R, ID> {
19 identity: &'r ID,
20 reduce_op: &'r R,
21}
22
23impl<'r, R, ID> Copy for ReduceConsumer<'r, R, ID> {}
24
25impl<'r, R, ID> Clone for ReduceConsumer<'r, R, ID> {
26 fn clone(&self) -> Self {
27 *self
28 }
29}
30
31impl<'r, R, ID, T> Consumer<T> for ReduceConsumer<'r, R, ID>
32where
33 R: Fn(T, T) -> T + Sync,
34 ID: Fn() -> T + Sync,
35 T: Send,
36{
37 type Folder = ReduceFolder<'r, R, T>;
38 type Reducer = Self;
39 type Result = T;
40
41 fn split_at(self, _index: usize) -> (Self, Self, Self) {
42 (self, self, self)
43 }
44
45 fn into_folder(self) -> Self::Folder {
46 ReduceFolder {
47 reduce_op: self.reduce_op,
48 item: (self.identity)(),
49 }
50 }
51
52 fn full(&self) -> bool {
53 false
54 }
55}
56
57impl<'r, R, ID, T> UnindexedConsumer<T> for ReduceConsumer<'r, R, ID>
58where
59 R: Fn(T, T) -> T + Sync,
60 ID: Fn() -> T + Sync,
61 T: Send,
62{
63 fn split_off_left(&self) -> Self {
64 *self
65 }
66
67 fn to_reducer(&self) -> Self::Reducer {
68 *self
69 }
70}
71
72impl<'r, R, ID, T> Reducer<T> for ReduceConsumer<'r, R, ID>
73where
74 R: Fn(T, T) -> T + Sync,
75{
76 fn reduce(self, left: T, right: T) -> T {
77 (self.reduce_op)(left, right)
78 }
79}
80
81struct ReduceFolder<'r, R, T> {
82 reduce_op: &'r R,
83 item: T,
84}
85
86impl<'r, R, T> Folder<T> for ReduceFolder<'r, R, T>
87where
88 R: Fn(T, T) -> T,
89{
90 type Result = T;
91
92 fn consume(self, item: T) -> Self {
93 ReduceFolder {
94 reduce_op: self.reduce_op,
95 item: (self.reduce_op)(self.item, item),
96 }
97 }
98
99 fn consume_iter<I>(self, iter: I) -> Self
100 where
101 I: IntoIterator<Item = T>,
102 {
103 ReduceFolder {
104 reduce_op: self.reduce_op,
105 item: iter.into_iter().fold(self.item, self.reduce_op),
106 }
107 }
108
109 fn complete(self) -> T {
110 self.item
111 }
112
113 fn full(&self) -> bool {
114 false
115 }
116}
117