1 | use super::plumbing::*; |
2 | use super::*; |
3 | use std::cell::Cell; |
4 | use std::sync::atomic::{AtomicUsize, Ordering}; |
5 | |
6 | #[cfg (test)] |
7 | mod test; |
8 | |
9 | // The key optimization for find_first is that a consumer can stop its search if |
10 | // some consumer to its left already found a match (and similarly for consumers |
11 | // to the right for find_last). To make this work, all consumers need some |
12 | // notion of their position in the data relative to other consumers, including |
13 | // unindexed consumers that have no built-in notion of position. |
14 | // |
15 | // To solve this, we assign each consumer a lower and upper bound for an |
16 | // imaginary "range" of data that it consumes. The initial consumer starts with |
17 | // the range 0..usize::max_value(). The split divides this range in half so that |
18 | // one resulting consumer has the range 0..(usize::max_value() / 2), and the |
// other has (usize::max_value() / 2)..usize::max_value(). Every subsequent
// split divides the range in half again. Eventually a range becomes too small
// to divide (its length drops to 1 or 0), and from then on a split can return
// two consumers that report the same position. In that case both consumers
// will continue to consume all their data regardless of whether a better match
// is found, but the reducer will still return the correct answer.
25 | |
/// Which end of the data a search prefers: `Leftmost` is used by `find_first`,
/// `Rightmost` by `find_last`.
#[derive(Clone, Copy)]
enum MatchPosition {
    Leftmost,
    Rightmost,
}
31 | |
32 | /// Returns true if pos1 is a better match than pos2 according to MatchPosition |
33 | #[inline ] |
34 | fn better_position(pos1: usize, pos2: usize, mp: MatchPosition) -> bool { |
35 | match mp { |
36 | MatchPosition::Leftmost => pos1 < pos2, |
37 | MatchPosition::Rightmost => pos1 > pos2, |
38 | } |
39 | } |
40 | |
41 | pub(super) fn find_first<I, P>(pi: I, find_op: P) -> Option<I::Item> |
42 | where |
43 | I: ParallelIterator, |
44 | P: Fn(&I::Item) -> bool + Sync, |
45 | { |
46 | let best_found = AtomicUsize::new(usize::max_value()); |
47 | let consumer = FindConsumer::new(&find_op, MatchPosition::Leftmost, &best_found); |
48 | pi.drive_unindexed(consumer) |
49 | } |
50 | |
51 | pub(super) fn find_last<I, P>(pi: I, find_op: P) -> Option<I::Item> |
52 | where |
53 | I: ParallelIterator, |
54 | P: Fn(&I::Item) -> bool + Sync, |
55 | { |
56 | let best_found = AtomicUsize::new(0); |
57 | let consumer = FindConsumer::new(&find_op, MatchPosition::Rightmost, &best_found); |
58 | pi.drive_unindexed(consumer) |
59 | } |
60 | |
61 | struct FindConsumer<'p, P> { |
62 | find_op: &'p P, |
63 | lower_bound: Cell<usize>, |
64 | upper_bound: usize, |
65 | match_position: MatchPosition, |
66 | best_found: &'p AtomicUsize, |
67 | } |
68 | |
69 | impl<'p, P> FindConsumer<'p, P> { |
70 | fn new(find_op: &'p P, match_position: MatchPosition, best_found: &'p AtomicUsize) -> Self { |
71 | FindConsumer { |
72 | find_op, |
73 | lower_bound: Cell::new(0), |
74 | upper_bound: usize::max_value(), |
75 | match_position, |
76 | best_found, |
77 | } |
78 | } |
79 | |
80 | fn current_index(&self) -> usize { |
81 | match self.match_position { |
82 | MatchPosition::Leftmost => self.lower_bound.get(), |
83 | MatchPosition::Rightmost => self.upper_bound, |
84 | } |
85 | } |
86 | } |
87 | |
88 | impl<'p, T, P> Consumer<T> for FindConsumer<'p, P> |
89 | where |
90 | T: Send, |
91 | P: Fn(&T) -> bool + Sync, |
92 | { |
93 | type Folder = FindFolder<'p, T, P>; |
94 | type Reducer = FindReducer; |
95 | type Result = Option<T>; |
96 | |
97 | fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) { |
98 | let dir = self.match_position; |
99 | ( |
100 | self.split_off_left(), |
101 | self, |
102 | FindReducer { |
103 | match_position: dir, |
104 | }, |
105 | ) |
106 | } |
107 | |
108 | fn into_folder(self) -> Self::Folder { |
109 | FindFolder { |
110 | find_op: self.find_op, |
111 | boundary: self.current_index(), |
112 | match_position: self.match_position, |
113 | best_found: self.best_found, |
114 | item: None, |
115 | } |
116 | } |
117 | |
118 | fn full(&self) -> bool { |
119 | // can stop consuming if the best found index so far is *strictly* |
120 | // better than anything this consumer will find |
121 | better_position( |
122 | self.best_found.load(Ordering::Relaxed), |
123 | self.current_index(), |
124 | self.match_position, |
125 | ) |
126 | } |
127 | } |
128 | |
129 | impl<'p, T, P> UnindexedConsumer<T> for FindConsumer<'p, P> |
130 | where |
131 | T: Send, |
132 | P: Fn(&T) -> bool + Sync, |
133 | { |
134 | fn split_off_left(&self) -> Self { |
135 | // Upper bound for one consumer will be lower bound for the other. This |
136 | // overlap is okay, because only one of the bounds will be used for |
137 | // comparing against best_found; the other is kept only to be able to |
138 | // divide the range in half. |
139 | // |
140 | // When the resolution of usize has been exhausted (i.e. when |
141 | // upper_bound = lower_bound), both results of this split will have the |
142 | // same range. When that happens, we lose the ability to tell one |
143 | // consumer to stop working when the other finds a better match, but the |
144 | // reducer ensures that the best answer is still returned (see the test |
145 | // above). |
146 | let old_lower_bound = self.lower_bound.get(); |
147 | let median = old_lower_bound + ((self.upper_bound - old_lower_bound) / 2); |
148 | self.lower_bound.set(median); |
149 | |
150 | FindConsumer { |
151 | find_op: self.find_op, |
152 | lower_bound: Cell::new(old_lower_bound), |
153 | upper_bound: median, |
154 | match_position: self.match_position, |
155 | best_found: self.best_found, |
156 | } |
157 | } |
158 | |
159 | fn to_reducer(&self) -> Self::Reducer { |
160 | FindReducer { |
161 | match_position: self.match_position, |
162 | } |
163 | } |
164 | } |
165 | |
166 | struct FindFolder<'p, T, P> { |
167 | find_op: &'p P, |
168 | boundary: usize, |
169 | match_position: MatchPosition, |
170 | best_found: &'p AtomicUsize, |
171 | item: Option<T>, |
172 | } |
173 | |
174 | impl<'p, P: 'p + Fn(&T) -> bool, T> Folder<T> for FindFolder<'p, T, P> { |
175 | type Result = Option<T>; |
176 | |
177 | fn consume(mut self, item: T) -> Self { |
178 | let found_best_in_range = match self.match_position { |
179 | MatchPosition::Leftmost => self.item.is_some(), |
180 | MatchPosition::Rightmost => false, |
181 | }; |
182 | |
183 | if !found_best_in_range && (self.find_op)(&item) { |
184 | // Continuously try to set best_found until we succeed or we |
185 | // discover a better match was already found. |
186 | let mut current = self.best_found.load(Ordering::Relaxed); |
187 | loop { |
188 | if better_position(current, self.boundary, self.match_position) { |
189 | break; |
190 | } |
191 | match self.best_found.compare_exchange_weak( |
192 | current, |
193 | self.boundary, |
194 | Ordering::Relaxed, |
195 | Ordering::Relaxed, |
196 | ) { |
197 | Ok(_) => { |
198 | self.item = Some(item); |
199 | break; |
200 | } |
201 | Err(v) => current = v, |
202 | } |
203 | } |
204 | } |
205 | self |
206 | } |
207 | |
208 | fn complete(self) -> Self::Result { |
209 | self.item |
210 | } |
211 | |
212 | fn full(&self) -> bool { |
213 | let found_best_in_range = match self.match_position { |
214 | MatchPosition::Leftmost => self.item.is_some(), |
215 | MatchPosition::Rightmost => false, |
216 | }; |
217 | |
218 | found_best_in_range |
219 | || better_position( |
220 | self.best_found.load(Ordering::Relaxed), |
221 | self.boundary, |
222 | self.match_position, |
223 | ) |
224 | } |
225 | } |
226 | |
227 | struct FindReducer { |
228 | match_position: MatchPosition, |
229 | } |
230 | |
231 | impl<T> Reducer<Option<T>> for FindReducer { |
232 | fn reduce(self, left: Option<T>, right: Option<T>) -> Option<T> { |
233 | match self.match_position { |
234 | MatchPosition::Leftmost => left.or(right), |
235 | MatchPosition::Rightmost => right.or(left), |
236 | } |
237 | } |
238 | } |
239 | |