1use super::plumbing::*;
2use super::*;
3
4use std::fmt::{self, Debug};
5
/// `Update` is an iterator that mutates the elements of an
/// underlying iterator before they are yielded.
///
/// This struct is created by the [`update()`] method on [`ParallelIterator`].
///
/// [`update()`]: trait.ParallelIterator.html#method.update
/// [`ParallelIterator`]: trait.ParallelIterator.html
// No trait bounds are placed on the type parameters here: the struct only
// stores the two values, so the bounds belong on the impl blocks that
// actually call into `I` or `F`.
#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
#[derive(Clone)]
pub struct Update<I, F> {
    /// The wrapped iterator whose items get updated.
    base: I,
    /// The operation applied to each item through a mutable reference.
    update_op: F,
}
19
20impl<I: ParallelIterator + Debug, F> Debug for Update<I, F> {
21 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
22 f.debug_struct("Update").field("base", &self.base).finish()
23 }
24}
25
26impl<I, F> Update<I, F>
27where
28 I: ParallelIterator,
29{
30 /// Creates a new `Update` iterator.
31 pub(super) fn new(base: I, update_op: F) -> Self {
32 Update { base, update_op }
33 }
34}
35
36impl<I, F> ParallelIterator for Update<I, F>
37where
38 I: ParallelIterator,
39 F: Fn(&mut I::Item) + Send + Sync,
40{
41 type Item = I::Item;
42
43 fn drive_unindexed<C>(self, consumer: C) -> C::Result
44 where
45 C: UnindexedConsumer<Self::Item>,
46 {
47 let consumer1 = UpdateConsumer::new(consumer, &self.update_op);
48 self.base.drive_unindexed(consumer1)
49 }
50
51 fn opt_len(&self) -> Option<usize> {
52 self.base.opt_len()
53 }
54}
55
56impl<I, F> IndexedParallelIterator for Update<I, F>
57where
58 I: IndexedParallelIterator,
59 F: Fn(&mut I::Item) + Send + Sync,
60{
61 fn drive<C>(self, consumer: C) -> C::Result
62 where
63 C: Consumer<Self::Item>,
64 {
65 let consumer1 = UpdateConsumer::new(consumer, &self.update_op);
66 self.base.drive(consumer1)
67 }
68
69 fn len(&self) -> usize {
70 self.base.len()
71 }
72
73 fn with_producer<CB>(self, callback: CB) -> CB::Output
74 where
75 CB: ProducerCallback<Self::Item>,
76 {
77 return self.base.with_producer(Callback {
78 callback,
79 update_op: self.update_op,
80 });
81
82 struct Callback<CB, F> {
83 callback: CB,
84 update_op: F,
85 }
86
87 impl<T, F, CB> ProducerCallback<T> for Callback<CB, F>
88 where
89 CB: ProducerCallback<T>,
90 F: Fn(&mut T) + Send + Sync,
91 {
92 type Output = CB::Output;
93
94 fn callback<P>(self, base: P) -> CB::Output
95 where
96 P: Producer<Item = T>,
97 {
98 let producer = UpdateProducer {
99 base,
100 update_op: &self.update_op,
101 };
102 self.callback.callback(producer)
103 }
104 }
105 }
106}
107
// ////////////////////////////////////////////////////////////////////////
// Producer implementation

/// Pairs a base producer with a shared reference to the update operation.
struct UpdateProducer<'f, P, F> {
    /// The wrapped producer.
    base: P,
    /// Borrowed update operation.
    update_op: &'f F,
}
114
115impl<'f, P, F> Producer for UpdateProducer<'f, P, F>
116where
117 P: Producer,
118 F: Fn(&mut P::Item) + Send + Sync,
119{
120 type Item = P::Item;
121 type IntoIter = UpdateSeq<P::IntoIter, &'f F>;
122
123 fn into_iter(self) -> Self::IntoIter {
124 UpdateSeq {
125 base: self.base.into_iter(),
126 update_op: self.update_op,
127 }
128 }
129
130 fn min_len(&self) -> usize {
131 self.base.min_len()
132 }
133 fn max_len(&self) -> usize {
134 self.base.max_len()
135 }
136
137 fn split_at(self, index: usize) -> (Self, Self) {
138 let (left, right) = self.base.split_at(index);
139 (
140 UpdateProducer {
141 base: left,
142 update_op: self.update_op,
143 },
144 UpdateProducer {
145 base: right,
146 update_op: self.update_op,
147 },
148 )
149 }
150
151 fn fold_with<G>(self, folder: G) -> G
152 where
153 G: Folder<Self::Item>,
154 {
155 let folder1 = UpdateFolder {
156 base: folder,
157 update_op: self.update_op,
158 };
159 self.base.fold_with(folder1).base
160 }
161}
162
// ////////////////////////////////////////////////////////////////////////
// Consumer implementation

/// Pairs a base consumer with a shared reference to the update operation.
struct UpdateConsumer<'f, C, F> {
    /// The wrapped consumer.
    base: C,
    /// Borrowed update operation.
    update_op: &'f F,
}
170
171impl<'f, C, F> UpdateConsumer<'f, C, F> {
172 fn new(base: C, update_op: &'f F) -> Self {
173 UpdateConsumer { base, update_op }
174 }
175}
176
177impl<'f, T, C, F> Consumer<T> for UpdateConsumer<'f, C, F>
178where
179 C: Consumer<T>,
180 F: Fn(&mut T) + Send + Sync,
181{
182 type Folder = UpdateFolder<'f, C::Folder, F>;
183 type Reducer = C::Reducer;
184 type Result = C::Result;
185
186 fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
187 let (left, right, reducer) = self.base.split_at(index);
188 (
189 UpdateConsumer::new(left, self.update_op),
190 UpdateConsumer::new(right, self.update_op),
191 reducer,
192 )
193 }
194
195 fn into_folder(self) -> Self::Folder {
196 UpdateFolder {
197 base: self.base.into_folder(),
198 update_op: self.update_op,
199 }
200 }
201
202 fn full(&self) -> bool {
203 self.base.full()
204 }
205}
206
207impl<'f, T, C, F> UnindexedConsumer<T> for UpdateConsumer<'f, C, F>
208where
209 C: UnindexedConsumer<T>,
210 F: Fn(&mut T) + Send + Sync,
211{
212 fn split_off_left(&self) -> Self {
213 UpdateConsumer::new(self.base.split_off_left(), self.update_op)
214 }
215
216 fn to_reducer(&self) -> Self::Reducer {
217 self.base.to_reducer()
218 }
219}
220
/// Pairs a base folder with a shared reference to the update operation.
struct UpdateFolder<'f, C, F> {
    /// The wrapped folder.
    base: C,
    /// Borrowed update operation.
    update_op: &'f F,
}
225
/// Adapts an in-place update (`Fn(&mut T)`) into a by-value mapping
/// (`Fn(T) -> T`): the returned closure takes ownership of an item, runs
/// `update_op` on it through a mutable reference, and returns the item.
fn apply<T>(update_op: impl Fn(&mut T)) -> impl Fn(T) -> T {
    move |item| {
        let mut updated = item;
        update_op(&mut updated);
        updated
    }
}
232
233impl<'f, T, C, F> Folder<T> for UpdateFolder<'f, C, F>
234where
235 C: Folder<T>,
236 F: Fn(&mut T),
237{
238 type Result = C::Result;
239
240 fn consume(self, mut item: T) -> Self {
241 (self.update_op)(&mut item);
242
243 UpdateFolder {
244 base: self.base.consume(item),
245 update_op: self.update_op,
246 }
247 }
248
249 fn consume_iter<I>(mut self, iter: I) -> Self
250 where
251 I: IntoIterator<Item = T>,
252 {
253 let update_op = self.update_op;
254 self.base = self
255 .base
256 .consume_iter(iter.into_iter().map(apply(update_op)));
257 self
258 }
259
260 fn complete(self) -> C::Result {
261 self.base.complete()
262 }
263
264 fn full(&self) -> bool {
265 self.base.full()
266 }
267}
268
/// Sequential `Update` adaptor, modelled on `itertools::adaptors::Update`.
#[derive(Clone, Debug)]
#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
struct UpdateSeq<I, F> {
    /// The wrapped sequential iterator.
    base: I,
    /// The operation applied to each item before it is yielded.
    update_op: F,
}
276
277impl<I, F> Iterator for UpdateSeq<I, F>
278where
279 I: Iterator,
280 F: Fn(&mut I::Item),
281{
282 type Item = I::Item;
283
284 fn next(&mut self) -> Option<Self::Item> {
285 let mut v = self.base.next()?;
286 (self.update_op)(&mut v);
287 Some(v)
288 }
289
290 fn size_hint(&self) -> (usize, Option<usize>) {
291 self.base.size_hint()
292 }
293
294 fn fold<Acc, G>(self, init: Acc, g: G) -> Acc
295 where
296 G: FnMut(Acc, Self::Item) -> Acc,
297 {
298 self.base.map(apply(self.update_op)).fold(init, g)
299 }
300
301 // if possible, re-use inner iterator specializations in collect
302 fn collect<C>(self) -> C
303 where
304 C: ::std::iter::FromIterator<Self::Item>,
305 {
306 self.base.map(apply(self.update_op)).collect()
307 }
308}
309
310impl<I, F> ExactSizeIterator for UpdateSeq<I, F>
311where
312 I: ExactSizeIterator,
313 F: Fn(&mut I::Item),
314{
315}
316
317impl<I, F> DoubleEndedIterator for UpdateSeq<I, F>
318where
319 I: DoubleEndedIterator,
320 F: Fn(&mut I::Item),
321{
322 fn next_back(&mut self) -> Option<Self::Item> {
323 let mut v = self.base.next_back()?;
324 (self.update_op)(&mut v);
325 Some(v)
326 }
327}
328