1use super::plumbing::*;
2use super::ParallelIterator;
3use super::Try;
4
5use std::ops::ControlFlow::{self, Break, Continue};
6use std::sync::atomic::{AtomicBool, Ordering};
7
8pub(super) fn try_reduce<PI, R, ID, T>(pi: PI, identity: ID, reduce_op: R) -> T
9where
10 PI: ParallelIterator<Item = T>,
11 R: Fn(T::Output, T::Output) -> T + Sync,
12 ID: Fn() -> T::Output + Sync,
13 T: Try + Send,
14{
15 let full: AtomicBool = AtomicBool::new(false);
16 let consumer: TryReduceConsumer<'_, R, …> = TryReduceConsumer {
17 identity: &identity,
18 reduce_op: &reduce_op,
19 full: &full,
20 };
21 pi.drive_unindexed(consumer)
22}
23
/// Consumer state for a fallible reduction.
///
/// Every field is a shared borrow, so the value can be handed out freely to
/// each side of a split.
struct TryReduceConsumer<'r, R, ID> {
    /// Called to obtain the starting output value of a new folder.
    identity: &'r ID,
    /// Combines two output values into a new fallible value.
    reduce_op: &'r R,
    /// Flag shared between all consumers and folders of one reduction.
    full: &'r AtomicBool,
}
29
30impl<'r, R, ID> Copy for TryReduceConsumer<'r, R, ID> {}
31
32impl<'r, R, ID> Clone for TryReduceConsumer<'r, R, ID> {
33 fn clone(&self) -> Self {
34 *self
35 }
36}
37
38impl<'r, R, ID, T> Consumer<T> for TryReduceConsumer<'r, R, ID>
39where
40 R: Fn(T::Output, T::Output) -> T + Sync,
41 ID: Fn() -> T::Output + Sync,
42 T: Try + Send,
43{
44 type Folder = TryReduceFolder<'r, R, T>;
45 type Reducer = Self;
46 type Result = T;
47
48 fn split_at(self, _index: usize) -> (Self, Self, Self) {
49 (self, self, self)
50 }
51
52 fn into_folder(self) -> Self::Folder {
53 TryReduceFolder {
54 reduce_op: self.reduce_op,
55 control: Continue((self.identity)()),
56 full: self.full,
57 }
58 }
59
60 fn full(&self) -> bool {
61 self.full.load(order:Ordering::Relaxed)
62 }
63}
64
65impl<'r, R, ID, T> UnindexedConsumer<T> for TryReduceConsumer<'r, R, ID>
66where
67 R: Fn(T::Output, T::Output) -> T + Sync,
68 ID: Fn() -> T::Output + Sync,
69 T: Try + Send,
70{
71 fn split_off_left(&self) -> Self {
72 *self
73 }
74
75 fn to_reducer(&self) -> Self::Reducer {
76 *self
77 }
78}
79
80impl<'r, R, ID, T> Reducer<T> for TryReduceConsumer<'r, R, ID>
81where
82 R: Fn(T::Output, T::Output) -> T + Sync,
83 T: Try,
84{
85 fn reduce(self, left: T, right: T) -> T {
86 match (left.branch(), right.branch()) {
87 (Continue(left: ::Output), Continue(right: ::Output)) => (self.reduce_op)(left, right),
88 (Break(r: ::Residual), _) | (_, Break(r: ::Residual)) => T::from_residual(r),
89 }
90 }
91}
92
93struct TryReduceFolder<'r, R, T: Try> {
94 reduce_op: &'r R,
95 control: ControlFlow<T::Residual, T::Output>,
96 full: &'r AtomicBool,
97}
98
99impl<'r, R, T> Folder<T> for TryReduceFolder<'r, R, T>
100where
101 R: Fn(T::Output, T::Output) -> T,
102 T: Try,
103{
104 type Result = T;
105
106 fn consume(mut self, item: T) -> Self {
107 let reduce_op = self.reduce_op;
108 self.control = match (self.control, item.branch()) {
109 (Continue(left), Continue(right)) => reduce_op(left, right).branch(),
110 (control @ Break(_), _) | (_, control @ Break(_)) => control,
111 };
112 if let Break(_) = self.control {
113 self.full.store(true, Ordering::Relaxed);
114 }
115 self
116 }
117
118 fn complete(self) -> T {
119 match self.control {
120 Continue(c) => T::from_output(c),
121 Break(r) => T::from_residual(r),
122 }
123 }
124
125 fn full(&self) -> bool {
126 match self.control {
127 Break(_) => true,
128 _ => self.full.load(Ordering::Relaxed),
129 }
130 }
131}
132