1 | use super::noop::*; |
2 | use super::plumbing::*; |
3 | use super::ParallelIterator; |
4 | |
5 | pub(super) fn for_each<I, F, T>(pi: I, op: &F) |
6 | where |
7 | I: ParallelIterator<Item = T>, |
8 | F: Fn(T) + Sync, |
9 | T: Send, |
10 | { |
11 | let consumer: ForEachConsumer<'_, F> = ForEachConsumer { op }; |
12 | pi.drive_unindexed(consumer) |
13 | } |
14 | |
/// Consumer/folder that does nothing but invoke a borrowed callback on each
/// item it receives.
///
/// Only a shared reference to the callback is stored, so producing another
/// consumer for a split merely copies that reference.
struct ForEachConsumer<'f, F> {
    /// Borrowed callback invoked once per consumed item.
    op: &'f F,
}
18 | |
19 | impl<'f, F, T> Consumer<T> for ForEachConsumer<'f, F> |
20 | where |
21 | F: Fn(T) + Sync, |
22 | { |
23 | type Folder = ForEachConsumer<'f, F>; |
24 | type Reducer = NoopReducer; |
25 | type Result = (); |
26 | |
27 | fn split_at(self, _index: usize) -> (Self, Self, NoopReducer) { |
28 | (self.split_off_left(), self, NoopReducer) |
29 | } |
30 | |
31 | fn into_folder(self) -> Self { |
32 | self |
33 | } |
34 | |
35 | fn full(&self) -> bool { |
36 | false |
37 | } |
38 | } |
39 | |
40 | impl<'f, F, T> Folder<T> for ForEachConsumer<'f, F> |
41 | where |
42 | F: Fn(T) + Sync, |
43 | { |
44 | type Result = (); |
45 | |
46 | fn consume(self, item: T) -> Self { |
47 | (self.op)(item); |
48 | self |
49 | } |
50 | |
51 | fn consume_iter<I>(self, iter: I) -> Self |
52 | where |
53 | I: IntoIterator<Item = T>, |
54 | { |
55 | iter.into_iter().for_each(self.op); |
56 | self |
57 | } |
58 | |
59 | fn complete(self) {} |
60 | |
61 | fn full(&self) -> bool { |
62 | false |
63 | } |
64 | } |
65 | |
66 | impl<'f, F, T> UnindexedConsumer<T> for ForEachConsumer<'f, F> |
67 | where |
68 | F: Fn(T) + Sync, |
69 | { |
70 | fn split_off_left(&self) -> Self { |
71 | ForEachConsumer { op: self.op } |
72 | } |
73 | |
74 | fn to_reducer(&self) -> NoopReducer { |
75 | NoopReducer |
76 | } |
77 | } |
78 | |