1 | use super::plumbing::*; |
2 | use super::*; |
3 | |
4 | use std::fmt::{self, Debug}; |
5 | |
6 | impl<U, I, ID, F> Fold<I, ID, F> |
7 | where |
8 | I: ParallelIterator, |
9 | F: Fn(U, I::Item) -> U + Sync + Send, |
10 | ID: Fn() -> U + Sync + Send, |
11 | U: Send, |
12 | { |
13 | pub(super) fn new(base: I, identity: ID, fold_op: F) -> Self { |
14 | Fold { |
15 | base, |
16 | identity, |
17 | fold_op, |
18 | } |
19 | } |
20 | } |
21 | |
/// `Fold` is a parallel iterator adaptor that folds the items of a base
/// iterator into accumulators and yields the accumulated values.
///
/// This struct is created by the [`fold()`] method on [`ParallelIterator`].
///
/// [`fold()`]: trait.ParallelIterator.html#method.fold
/// [`ParallelIterator`]: trait.ParallelIterator.html
#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
#[derive(Clone)]
pub struct Fold<I, ID, F> {
    /// Iterator whose items are being folded.
    base: I,
    /// Factory that produces a fresh starting accumulator.
    identity: ID,
    /// Combines the running accumulator with the next item.
    fold_op: F,
}
34 | |
35 | impl<I: ParallelIterator + Debug, ID, F> Debug for Fold<I, ID, F> { |
36 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
37 | f.debug_struct("Fold" ).field("base" , &self.base).finish() |
38 | } |
39 | } |
40 | |
41 | impl<U, I, ID, F> ParallelIterator for Fold<I, ID, F> |
42 | where |
43 | I: ParallelIterator, |
44 | F: Fn(U, I::Item) -> U + Sync + Send, |
45 | ID: Fn() -> U + Sync + Send, |
46 | U: Send, |
47 | { |
48 | type Item = U; |
49 | |
50 | fn drive_unindexed<C>(self, consumer: C) -> C::Result |
51 | where |
52 | C: UnindexedConsumer<Self::Item>, |
53 | { |
54 | let consumer1 = FoldConsumer { |
55 | base: consumer, |
56 | fold_op: &self.fold_op, |
57 | identity: &self.identity, |
58 | }; |
59 | self.base.drive_unindexed(consumer1) |
60 | } |
61 | } |
62 | |
/// Consumer wrapper used by `Fold`: pairs the downstream consumer with
/// borrowed references to the fold closure and the identity factory.
struct FoldConsumer<'c, C, ID, F> {
    /// Downstream consumer that receives the accumulated values.
    base: C,
    /// Borrowed combining function.
    fold_op: &'c F,
    /// Borrowed factory for the starting accumulator.
    identity: &'c ID,
}
68 | |
69 | impl<'r, U, T, C, ID, F> Consumer<T> for FoldConsumer<'r, C, ID, F> |
70 | where |
71 | C: Consumer<U>, |
72 | F: Fn(U, T) -> U + Sync, |
73 | ID: Fn() -> U + Sync, |
74 | U: Send, |
75 | { |
76 | type Folder = FoldFolder<'r, C::Folder, U, F>; |
77 | type Reducer = C::Reducer; |
78 | type Result = C::Result; |
79 | |
80 | fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) { |
81 | let (left, right, reducer) = self.base.split_at(index); |
82 | ( |
83 | FoldConsumer { base: left, ..self }, |
84 | FoldConsumer { |
85 | base: right, |
86 | ..self |
87 | }, |
88 | reducer, |
89 | ) |
90 | } |
91 | |
92 | fn into_folder(self) -> Self::Folder { |
93 | FoldFolder { |
94 | base: self.base.into_folder(), |
95 | item: (self.identity)(), |
96 | fold_op: self.fold_op, |
97 | } |
98 | } |
99 | |
100 | fn full(&self) -> bool { |
101 | self.base.full() |
102 | } |
103 | } |
104 | |
105 | impl<'r, U, T, C, ID, F> UnindexedConsumer<T> for FoldConsumer<'r, C, ID, F> |
106 | where |
107 | C: UnindexedConsumer<U>, |
108 | F: Fn(U, T) -> U + Sync, |
109 | ID: Fn() -> U + Sync, |
110 | U: Send, |
111 | { |
112 | fn split_off_left(&self) -> Self { |
113 | FoldConsumer { |
114 | base: self.base.split_off_left(), |
115 | ..*self |
116 | } |
117 | } |
118 | |
119 | fn to_reducer(&self) -> Self::Reducer { |
120 | self.base.to_reducer() |
121 | } |
122 | } |
123 | |
/// Folder that keeps a running accumulator of type `U` and hands it to the
/// downstream folder `base` when it completes.
struct FoldFolder<'r, C, U, F> {
    /// Downstream folder that eventually receives the accumulator.
    base: C,
    /// Borrowed combining function.
    fold_op: &'r F,
    /// Accumulator built up so far.
    item: U,
}
129 | |
130 | impl<'r, C, ID, F, T> Folder<T> for FoldFolder<'r, C, ID, F> |
131 | where |
132 | C: Folder<ID>, |
133 | F: Fn(ID, T) -> ID + Sync, |
134 | { |
135 | type Result = C::Result; |
136 | |
137 | fn consume(self, item: T) -> Self { |
138 | let item = (self.fold_op)(self.item, item); |
139 | FoldFolder { |
140 | base: self.base, |
141 | fold_op: self.fold_op, |
142 | item, |
143 | } |
144 | } |
145 | |
146 | fn consume_iter<I>(self, iter: I) -> Self |
147 | where |
148 | I: IntoIterator<Item = T>, |
149 | { |
150 | fn not_full<C, ID, T>(base: &C) -> impl Fn(&T) -> bool + '_ |
151 | where |
152 | C: Folder<ID>, |
153 | { |
154 | move |_| !base.full() |
155 | } |
156 | |
157 | let base = self.base; |
158 | let item = iter |
159 | .into_iter() |
160 | // stop iterating if another thread has finished |
161 | .take_while(not_full(&base)) |
162 | .fold(self.item, self.fold_op); |
163 | |
164 | FoldFolder { |
165 | base, |
166 | item, |
167 | fold_op: self.fold_op, |
168 | } |
169 | } |
170 | |
171 | fn complete(self) -> C::Result { |
172 | self.base.consume(self.item).complete() |
173 | } |
174 | |
175 | fn full(&self) -> bool { |
176 | self.base.full() |
177 | } |
178 | } |
179 | |
180 | // /////////////////////////////////////////////////////////////////////////// |
181 | |
182 | impl<U, I, F> FoldWith<I, U, F> |
183 | where |
184 | I: ParallelIterator, |
185 | F: Fn(U, I::Item) -> U + Sync + Send, |
186 | U: Send + Clone, |
187 | { |
188 | pub(super) fn new(base: I, item: U, fold_op: F) -> Self { |
189 | FoldWith { |
190 | base, |
191 | item, |
192 | fold_op, |
193 | } |
194 | } |
195 | } |
196 | |
/// `FoldWith` is a parallel iterator adaptor that folds the items of a base
/// iterator into accumulators seeded from a stored initial value, and yields
/// the accumulated values.
///
/// This struct is created by the [`fold_with()`] method on [`ParallelIterator`].
///
/// [`fold_with()`]: trait.ParallelIterator.html#method.fold_with
/// [`ParallelIterator`]: trait.ParallelIterator.html
#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
#[derive(Clone)]
pub struct FoldWith<I, U, F> {
    /// Iterator whose items are being folded.
    base: I,
    /// Initial accumulator value.
    item: U,
    /// Combines the running accumulator with the next item.
    fold_op: F,
}
209 | |
210 | impl<I: ParallelIterator + Debug, U: Debug, F> Debug for FoldWith<I, U, F> { |
211 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
212 | f.debug_struct("FoldWith" ) |
213 | .field("base" , &self.base) |
214 | .field("item" , &self.item) |
215 | .finish() |
216 | } |
217 | } |
218 | |
219 | impl<U, I, F> ParallelIterator for FoldWith<I, U, F> |
220 | where |
221 | I: ParallelIterator, |
222 | F: Fn(U, I::Item) -> U + Sync + Send, |
223 | U: Send + Clone, |
224 | { |
225 | type Item = U; |
226 | |
227 | fn drive_unindexed<C>(self, consumer: C) -> C::Result |
228 | where |
229 | C: UnindexedConsumer<Self::Item>, |
230 | { |
231 | let consumer1 = FoldWithConsumer { |
232 | base: consumer, |
233 | item: self.item, |
234 | fold_op: &self.fold_op, |
235 | }; |
236 | self.base.drive_unindexed(consumer1) |
237 | } |
238 | } |
239 | |
/// Consumer wrapper used by `FoldWith`: pairs the downstream consumer with an
/// owned starting accumulator and a borrowed reference to the fold closure.
struct FoldWithConsumer<'c, C, U, F> {
    /// Downstream consumer that receives the accumulated values.
    base: C,
    /// Starting accumulator for the folder built from this consumer.
    item: U,
    /// Borrowed combining function.
    fold_op: &'c F,
}
245 | |
246 | impl<'r, U, T, C, F> Consumer<T> for FoldWithConsumer<'r, C, U, F> |
247 | where |
248 | C: Consumer<U>, |
249 | F: Fn(U, T) -> U + Sync, |
250 | U: Send + Clone, |
251 | { |
252 | type Folder = FoldFolder<'r, C::Folder, U, F>; |
253 | type Reducer = C::Reducer; |
254 | type Result = C::Result; |
255 | |
256 | fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) { |
257 | let (left, right, reducer) = self.base.split_at(index); |
258 | ( |
259 | FoldWithConsumer { |
260 | base: left, |
261 | item: self.item.clone(), |
262 | ..self |
263 | }, |
264 | FoldWithConsumer { |
265 | base: right, |
266 | ..self |
267 | }, |
268 | reducer, |
269 | ) |
270 | } |
271 | |
272 | fn into_folder(self) -> Self::Folder { |
273 | FoldFolder { |
274 | base: self.base.into_folder(), |
275 | item: self.item, |
276 | fold_op: self.fold_op, |
277 | } |
278 | } |
279 | |
280 | fn full(&self) -> bool { |
281 | self.base.full() |
282 | } |
283 | } |
284 | |
285 | impl<'r, U, T, C, F> UnindexedConsumer<T> for FoldWithConsumer<'r, C, U, F> |
286 | where |
287 | C: UnindexedConsumer<U>, |
288 | F: Fn(U, T) -> U + Sync, |
289 | U: Send + Clone, |
290 | { |
291 | fn split_off_left(&self) -> Self { |
292 | FoldWithConsumer { |
293 | base: self.base.split_off_left(), |
294 | item: self.item.clone(), |
295 | ..*self |
296 | } |
297 | } |
298 | |
299 | fn to_reducer(&self) -> Self::Reducer { |
300 | self.base.to_reducer() |
301 | } |
302 | } |
303 | |