1 | use super::plumbing::*; |
2 | use super::*; |
3 | use std::sync::atomic::{AtomicUsize, Ordering}; |
4 | |
5 | /// `TakeAny` is an iterator that iterates over `n` elements from anywhere in `I`. |
6 | /// This struct is created by the [`take_any()`] method on [`ParallelIterator`] |
7 | /// |
8 | /// [`take_any()`]: trait.ParallelIterator.html#method.take_any |
9 | /// [`ParallelIterator`]: trait.ParallelIterator.html |
10 | #[must_use = "iterator adaptors are lazy and do nothing unless consumed" ] |
11 | #[derive(Clone, Debug)] |
12 | pub struct TakeAny<I: ParallelIterator> { |
13 | base: I, |
14 | count: usize, |
15 | } |
16 | |
17 | impl<I> TakeAny<I> |
18 | where |
19 | I: ParallelIterator, |
20 | { |
21 | /// Creates a new `TakeAny` iterator. |
22 | pub(super) fn new(base: I, count: usize) -> Self { |
23 | TakeAny { base, count } |
24 | } |
25 | } |
26 | |
27 | impl<I> ParallelIterator for TakeAny<I> |
28 | where |
29 | I: ParallelIterator, |
30 | { |
31 | type Item = I::Item; |
32 | |
33 | fn drive_unindexed<C>(self, consumer: C) -> C::Result |
34 | where |
35 | C: UnindexedConsumer<Self::Item>, |
36 | { |
37 | let consumer1 = TakeAnyConsumer { |
38 | base: consumer, |
39 | count: &AtomicUsize::new(self.count), |
40 | }; |
41 | self.base.drive_unindexed(consumer1) |
42 | } |
43 | } |
44 | |
// ////////////////////////////////////////////////////////////////////////
// Consumer implementation

/// Consumer wrapper that forwards to `base` while sharing a counter of how
/// many more items may still be accepted.
struct TakeAnyConsumer<'f, C> {
    // Number of items that may still be passed on; shared by reference with
    // every consumer and folder split off from this one.
    count: &'f AtomicUsize,
    // The wrapped consumer that receives the accepted items.
    base: C,
}
52 | |
53 | impl<'f, T, C> Consumer<T> for TakeAnyConsumer<'f, C> |
54 | where |
55 | C: Consumer<T>, |
56 | T: Send, |
57 | { |
58 | type Folder = TakeAnyFolder<'f, C::Folder>; |
59 | type Reducer = C::Reducer; |
60 | type Result = C::Result; |
61 | |
62 | fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) { |
63 | let (left, right, reducer) = self.base.split_at(index); |
64 | ( |
65 | TakeAnyConsumer { base: left, ..self }, |
66 | TakeAnyConsumer { |
67 | base: right, |
68 | ..self |
69 | }, |
70 | reducer, |
71 | ) |
72 | } |
73 | |
74 | fn into_folder(self) -> Self::Folder { |
75 | TakeAnyFolder { |
76 | base: self.base.into_folder(), |
77 | count: self.count, |
78 | } |
79 | } |
80 | |
81 | fn full(&self) -> bool { |
82 | self.count.load(Ordering::Relaxed) == 0 || self.base.full() |
83 | } |
84 | } |
85 | |
86 | impl<'f, T, C> UnindexedConsumer<T> for TakeAnyConsumer<'f, C> |
87 | where |
88 | C: UnindexedConsumer<T>, |
89 | T: Send, |
90 | { |
91 | fn split_off_left(&self) -> Self { |
92 | TakeAnyConsumer { |
93 | base: self.base.split_off_left(), |
94 | ..*self |
95 | } |
96 | } |
97 | |
98 | fn to_reducer(&self) -> Self::Reducer { |
99 | self.base.to_reducer() |
100 | } |
101 | } |
102 | |
/// Folder wrapper that only hands an item to `base` after successfully
/// decrementing the shared counter.
struct TakeAnyFolder<'f, C> {
    // Number of items that may still be passed on to `base`.
    count: &'f AtomicUsize,
    // The wrapped folder that receives the accepted items.
    base: C,
}
107 | |
/// Atomically decrements `u` by one unless it is already zero.
///
/// Returns `true` when the decrement happened and `false` when the value was
/// zero, in which case it is left unchanged. All atomic accesses use relaxed
/// ordering.
fn checked_decrement(u: &AtomicUsize) -> bool {
    // `checked_sub` yields `None` at zero, which makes `fetch_update` stop
    // without storing anything and report `Err`.
    let one_less = |remaining: usize| remaining.checked_sub(1);
    matches!(
        u.fetch_update(Ordering::Relaxed, Ordering::Relaxed, one_less),
        Ok(_)
    )
}
112 | |
113 | impl<'f, T, C> Folder<T> for TakeAnyFolder<'f, C> |
114 | where |
115 | C: Folder<T>, |
116 | { |
117 | type Result = C::Result; |
118 | |
119 | fn consume(mut self, item: T) -> Self { |
120 | if checked_decrement(self.count) { |
121 | self.base = self.base.consume(item); |
122 | } |
123 | self |
124 | } |
125 | |
126 | fn consume_iter<I>(mut self, iter: I) -> Self |
127 | where |
128 | I: IntoIterator<Item = T>, |
129 | { |
130 | self.base = self.base.consume_iter( |
131 | iter.into_iter() |
132 | .take_while(move |_| checked_decrement(self.count)), |
133 | ); |
134 | self |
135 | } |
136 | |
137 | fn complete(self) -> C::Result { |
138 | self.base.complete() |
139 | } |
140 | |
141 | fn full(&self) -> bool { |
142 | self.count.load(Ordering::Relaxed) == 0 || self.base.full() |
143 | } |
144 | } |
145 | |