1use super::plumbing::*;
2use super::*;
3
4use std::fmt::{self, Debug};
5
6impl<U, I, ID, F> Fold<I, ID, F>
7where
8 I: ParallelIterator,
9 F: Fn(U, I::Item) -> U + Sync + Send,
10 ID: Fn() -> U + Sync + Send,
11 U: Send,
12{
13 pub(super) fn new(base: I, identity: ID, fold_op: F) -> Self {
14 Fold {
15 base,
16 identity,
17 fold_op,
18 }
19 }
20}
21
22/// `Fold` is an iterator that applies a function over an iterator producing a single value.
23/// This struct is created by the [`fold()`] method on [`ParallelIterator`]
24///
25/// [`fold()`]: trait.ParallelIterator.html#method.fold
26/// [`ParallelIterator`]: trait.ParallelIterator.html
27#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
28#[derive(Clone)]
29pub struct Fold<I, ID, F> {
30 base: I,
31 identity: ID,
32 fold_op: F,
33}
34
35impl<I: ParallelIterator + Debug, ID, F> Debug for Fold<I, ID, F> {
36 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
37 f.debug_struct("Fold").field("base", &self.base).finish()
38 }
39}
40
41impl<U, I, ID, F> ParallelIterator for Fold<I, ID, F>
42where
43 I: ParallelIterator,
44 F: Fn(U, I::Item) -> U + Sync + Send,
45 ID: Fn() -> U + Sync + Send,
46 U: Send,
47{
48 type Item = U;
49
50 fn drive_unindexed<C>(self, consumer: C) -> C::Result
51 where
52 C: UnindexedConsumer<Self::Item>,
53 {
54 let consumer1 = FoldConsumer {
55 base: consumer,
56 fold_op: &self.fold_op,
57 identity: &self.identity,
58 };
59 self.base.drive_unindexed(consumer1)
60 }
61}
62
63struct FoldConsumer<'c, C, ID, F> {
64 base: C,
65 fold_op: &'c F,
66 identity: &'c ID,
67}
68
69impl<'r, U, T, C, ID, F> Consumer<T> for FoldConsumer<'r, C, ID, F>
70where
71 C: Consumer<U>,
72 F: Fn(U, T) -> U + Sync,
73 ID: Fn() -> U + Sync,
74 U: Send,
75{
76 type Folder = FoldFolder<'r, C::Folder, U, F>;
77 type Reducer = C::Reducer;
78 type Result = C::Result;
79
80 fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
81 let (left, right, reducer) = self.base.split_at(index);
82 (
83 FoldConsumer { base: left, ..self },
84 FoldConsumer {
85 base: right,
86 ..self
87 },
88 reducer,
89 )
90 }
91
92 fn into_folder(self) -> Self::Folder {
93 FoldFolder {
94 base: self.base.into_folder(),
95 item: (self.identity)(),
96 fold_op: self.fold_op,
97 }
98 }
99
100 fn full(&self) -> bool {
101 self.base.full()
102 }
103}
104
105impl<'r, U, T, C, ID, F> UnindexedConsumer<T> for FoldConsumer<'r, C, ID, F>
106where
107 C: UnindexedConsumer<U>,
108 F: Fn(U, T) -> U + Sync,
109 ID: Fn() -> U + Sync,
110 U: Send,
111{
112 fn split_off_left(&self) -> Self {
113 FoldConsumer {
114 base: self.base.split_off_left(),
115 ..*self
116 }
117 }
118
119 fn to_reducer(&self) -> Self::Reducer {
120 self.base.to_reducer()
121 }
122}
123
124struct FoldFolder<'r, C, ID, F> {
125 base: C,
126 fold_op: &'r F,
127 item: ID,
128}
129
130impl<'r, C, ID, F, T> Folder<T> for FoldFolder<'r, C, ID, F>
131where
132 C: Folder<ID>,
133 F: Fn(ID, T) -> ID + Sync,
134{
135 type Result = C::Result;
136
137 fn consume(self, item: T) -> Self {
138 let item = (self.fold_op)(self.item, item);
139 FoldFolder {
140 base: self.base,
141 fold_op: self.fold_op,
142 item,
143 }
144 }
145
146 fn consume_iter<I>(self, iter: I) -> Self
147 where
148 I: IntoIterator<Item = T>,
149 {
150 fn not_full<C, ID, T>(base: &C) -> impl Fn(&T) -> bool + '_
151 where
152 C: Folder<ID>,
153 {
154 move |_| !base.full()
155 }
156
157 let base = self.base;
158 let item = iter
159 .into_iter()
160 // stop iterating if another thread has finished
161 .take_while(not_full(&base))
162 .fold(self.item, self.fold_op);
163
164 FoldFolder {
165 base,
166 item,
167 fold_op: self.fold_op,
168 }
169 }
170
171 fn complete(self) -> C::Result {
172 self.base.consume(self.item).complete()
173 }
174
175 fn full(&self) -> bool {
176 self.base.full()
177 }
178}
179
180// ///////////////////////////////////////////////////////////////////////////
181
182impl<U, I, F> FoldWith<I, U, F>
183where
184 I: ParallelIterator,
185 F: Fn(U, I::Item) -> U + Sync + Send,
186 U: Send + Clone,
187{
188 pub(super) fn new(base: I, item: U, fold_op: F) -> Self {
189 FoldWith {
190 base,
191 item,
192 fold_op,
193 }
194 }
195}
196
197/// `FoldWith` is an iterator that applies a function over an iterator producing a single value.
198/// This struct is created by the [`fold_with()`] method on [`ParallelIterator`]
199///
200/// [`fold_with()`]: trait.ParallelIterator.html#method.fold_with
201/// [`ParallelIterator`]: trait.ParallelIterator.html
202#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
203#[derive(Clone)]
204pub struct FoldWith<I, U, F> {
205 base: I,
206 item: U,
207 fold_op: F,
208}
209
210impl<I: ParallelIterator + Debug, U: Debug, F> Debug for FoldWith<I, U, F> {
211 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
212 f.debug_struct("FoldWith")
213 .field("base", &self.base)
214 .field("item", &self.item)
215 .finish()
216 }
217}
218
219impl<U, I, F> ParallelIterator for FoldWith<I, U, F>
220where
221 I: ParallelIterator,
222 F: Fn(U, I::Item) -> U + Sync + Send,
223 U: Send + Clone,
224{
225 type Item = U;
226
227 fn drive_unindexed<C>(self, consumer: C) -> C::Result
228 where
229 C: UnindexedConsumer<Self::Item>,
230 {
231 let consumer1 = FoldWithConsumer {
232 base: consumer,
233 item: self.item,
234 fold_op: &self.fold_op,
235 };
236 self.base.drive_unindexed(consumer1)
237 }
238}
239
240struct FoldWithConsumer<'c, C, U, F> {
241 base: C,
242 item: U,
243 fold_op: &'c F,
244}
245
246impl<'r, U, T, C, F> Consumer<T> for FoldWithConsumer<'r, C, U, F>
247where
248 C: Consumer<U>,
249 F: Fn(U, T) -> U + Sync,
250 U: Send + Clone,
251{
252 type Folder = FoldFolder<'r, C::Folder, U, F>;
253 type Reducer = C::Reducer;
254 type Result = C::Result;
255
256 fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
257 let (left, right, reducer) = self.base.split_at(index);
258 (
259 FoldWithConsumer {
260 base: left,
261 item: self.item.clone(),
262 ..self
263 },
264 FoldWithConsumer {
265 base: right,
266 ..self
267 },
268 reducer,
269 )
270 }
271
272 fn into_folder(self) -> Self::Folder {
273 FoldFolder {
274 base: self.base.into_folder(),
275 item: self.item,
276 fold_op: self.fold_op,
277 }
278 }
279
280 fn full(&self) -> bool {
281 self.base.full()
282 }
283}
284
285impl<'r, U, T, C, F> UnindexedConsumer<T> for FoldWithConsumer<'r, C, U, F>
286where
287 C: UnindexedConsumer<U>,
288 F: Fn(U, T) -> U + Sync,
289 U: Send + Clone,
290{
291 fn split_off_left(&self) -> Self {
292 FoldWithConsumer {
293 base: self.base.split_off_left(),
294 item: self.item.clone(),
295 ..*self
296 }
297 }
298
299 fn to_reducer(&self) -> Self::Reducer {
300 self.base.to_reducer()
301 }
302}
303