| 1 | use super::plumbing::*; |
| 2 | use super::*; |
| 3 | use std::cell::Cell; |
| 4 | use std::sync::atomic::{AtomicUsize, Ordering}; |
| 5 | |
// Unit tests for this module live in a separate `test` submodule file and are
// only compiled for test builds.
#[cfg(test)]
mod test;
| 8 | |
// The key optimization for find_first is that a consumer can stop its search if
// some consumer to its left has already found a match (and, symmetrically for
// find_last, if some consumer to its right has). To make this work, every
// consumer needs some notion of its position in the data relative to the other
// consumers, including unindexed consumers that have no built-in notion of
// position.
//
// To solve this, we assign each consumer a lower and upper bound for an
// imaginary "range" of data that it consumes. The initial consumer starts with
// the range 0..usize::max_value(). A split divides this range in half, so that
// one resulting consumer has the range 0..(usize::max_value() / 2) and the
// other has (usize::max_value() / 2)..usize::max_value(). Every subsequent
// split halves the range again. Once the resolution of usize is exhausted (the
// lower and upper bounds coincide), a split returns two consumers with the
// same range. In that case both consumers will continue to consume all their
// data regardless of whether a better match is found, but the reducer will
// still return the correct answer.
| 25 | |
/// Which end of the data a search prefers when more than one item matches.
#[derive(Clone, Copy)]
enum MatchPosition {
    /// Prefer the match at the smallest position (the `find_first` search).
    Leftmost,
    /// Prefer the match at the largest position (the `find_last` search).
    Rightmost,
}
| 31 | |
| 32 | /// Returns true if pos1 is a better match than pos2 according to MatchPosition |
| 33 | #[inline ] |
| 34 | fn better_position(pos1: usize, pos2: usize, mp: MatchPosition) -> bool { |
| 35 | match mp { |
| 36 | MatchPosition::Leftmost => pos1 < pos2, |
| 37 | MatchPosition::Rightmost => pos1 > pos2, |
| 38 | } |
| 39 | } |
| 40 | |
| 41 | pub(super) fn find_first<I, P>(pi: I, find_op: P) -> Option<I::Item> |
| 42 | where |
| 43 | I: ParallelIterator, |
| 44 | P: Fn(&I::Item) -> bool + Sync, |
| 45 | { |
| 46 | let best_found: AtomicUsize = AtomicUsize::new(usize::max_value()); |
| 47 | let consumer: FindConsumer<'_, P> = FindConsumer::new(&find_op, MatchPosition::Leftmost, &best_found); |
| 48 | pi.drive_unindexed(consumer) |
| 49 | } |
| 50 | |
| 51 | pub(super) fn find_last<I, P>(pi: I, find_op: P) -> Option<I::Item> |
| 52 | where |
| 53 | I: ParallelIterator, |
| 54 | P: Fn(&I::Item) -> bool + Sync, |
| 55 | { |
| 56 | let best_found: AtomicUsize = AtomicUsize::new(0); |
| 57 | let consumer: FindConsumer<'_, P> = FindConsumer::new(&find_op, MatchPosition::Rightmost, &best_found); |
| 58 | pi.drive_unindexed(consumer) |
| 59 | } |
| 60 | |
| 61 | struct FindConsumer<'p, P> { |
| 62 | find_op: &'p P, |
| 63 | lower_bound: Cell<usize>, |
| 64 | upper_bound: usize, |
| 65 | match_position: MatchPosition, |
| 66 | best_found: &'p AtomicUsize, |
| 67 | } |
| 68 | |
| 69 | impl<'p, P> FindConsumer<'p, P> { |
| 70 | fn new(find_op: &'p P, match_position: MatchPosition, best_found: &'p AtomicUsize) -> Self { |
| 71 | FindConsumer { |
| 72 | find_op, |
| 73 | lower_bound: Cell::new(0), |
| 74 | upper_bound: usize::max_value(), |
| 75 | match_position, |
| 76 | best_found, |
| 77 | } |
| 78 | } |
| 79 | |
| 80 | fn current_index(&self) -> usize { |
| 81 | match self.match_position { |
| 82 | MatchPosition::Leftmost => self.lower_bound.get(), |
| 83 | MatchPosition::Rightmost => self.upper_bound, |
| 84 | } |
| 85 | } |
| 86 | } |
| 87 | |
| 88 | impl<'p, T, P> Consumer<T> for FindConsumer<'p, P> |
| 89 | where |
| 90 | T: Send, |
| 91 | P: Fn(&T) -> bool + Sync, |
| 92 | { |
| 93 | type Folder = FindFolder<'p, T, P>; |
| 94 | type Reducer = FindReducer; |
| 95 | type Result = Option<T>; |
| 96 | |
| 97 | fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) { |
| 98 | let dir = self.match_position; |
| 99 | ( |
| 100 | self.split_off_left(), |
| 101 | self, |
| 102 | FindReducer { |
| 103 | match_position: dir, |
| 104 | }, |
| 105 | ) |
| 106 | } |
| 107 | |
| 108 | fn into_folder(self) -> Self::Folder { |
| 109 | FindFolder { |
| 110 | find_op: self.find_op, |
| 111 | boundary: self.current_index(), |
| 112 | match_position: self.match_position, |
| 113 | best_found: self.best_found, |
| 114 | item: None, |
| 115 | } |
| 116 | } |
| 117 | |
| 118 | fn full(&self) -> bool { |
| 119 | // can stop consuming if the best found index so far is *strictly* |
| 120 | // better than anything this consumer will find |
| 121 | better_position( |
| 122 | self.best_found.load(Ordering::Relaxed), |
| 123 | self.current_index(), |
| 124 | self.match_position, |
| 125 | ) |
| 126 | } |
| 127 | } |
| 128 | |
| 129 | impl<'p, T, P> UnindexedConsumer<T> for FindConsumer<'p, P> |
| 130 | where |
| 131 | T: Send, |
| 132 | P: Fn(&T) -> bool + Sync, |
| 133 | { |
| 134 | fn split_off_left(&self) -> Self { |
| 135 | // Upper bound for one consumer will be lower bound for the other. This |
| 136 | // overlap is okay, because only one of the bounds will be used for |
| 137 | // comparing against best_found; the other is kept only to be able to |
| 138 | // divide the range in half. |
| 139 | // |
| 140 | // When the resolution of usize has been exhausted (i.e. when |
| 141 | // upper_bound = lower_bound), both results of this split will have the |
| 142 | // same range. When that happens, we lose the ability to tell one |
| 143 | // consumer to stop working when the other finds a better match, but the |
| 144 | // reducer ensures that the best answer is still returned (see the test |
| 145 | // above). |
| 146 | let old_lower_bound = self.lower_bound.get(); |
| 147 | let median = old_lower_bound + ((self.upper_bound - old_lower_bound) / 2); |
| 148 | self.lower_bound.set(median); |
| 149 | |
| 150 | FindConsumer { |
| 151 | find_op: self.find_op, |
| 152 | lower_bound: Cell::new(old_lower_bound), |
| 153 | upper_bound: median, |
| 154 | match_position: self.match_position, |
| 155 | best_found: self.best_found, |
| 156 | } |
| 157 | } |
| 158 | |
| 159 | fn to_reducer(&self) -> Self::Reducer { |
| 160 | FindReducer { |
| 161 | match_position: self.match_position, |
| 162 | } |
| 163 | } |
| 164 | } |
| 165 | |
| 166 | struct FindFolder<'p, T, P> { |
| 167 | find_op: &'p P, |
| 168 | boundary: usize, |
| 169 | match_position: MatchPosition, |
| 170 | best_found: &'p AtomicUsize, |
| 171 | item: Option<T>, |
| 172 | } |
| 173 | |
| 174 | impl<'p, P: 'p + Fn(&T) -> bool, T> Folder<T> for FindFolder<'p, T, P> { |
| 175 | type Result = Option<T>; |
| 176 | |
| 177 | fn consume(mut self, item: T) -> Self { |
| 178 | let found_best_in_range = match self.match_position { |
| 179 | MatchPosition::Leftmost => self.item.is_some(), |
| 180 | MatchPosition::Rightmost => false, |
| 181 | }; |
| 182 | |
| 183 | if !found_best_in_range && (self.find_op)(&item) { |
| 184 | // Update the best found index if ours is better. |
| 185 | let update = |
| 186 | self.best_found |
| 187 | .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| { |
| 188 | better_position(self.boundary, current, self.match_position) |
| 189 | .then_some(self.boundary) |
| 190 | }); |
| 191 | |
| 192 | // Save this item if our index was better or equal. |
| 193 | if update.is_ok() || update == Err(self.boundary) { |
| 194 | self.item = Some(item); |
| 195 | } |
| 196 | } |
| 197 | self |
| 198 | } |
| 199 | |
| 200 | fn complete(self) -> Self::Result { |
| 201 | self.item |
| 202 | } |
| 203 | |
| 204 | fn full(&self) -> bool { |
| 205 | let found_best_in_range = match self.match_position { |
| 206 | MatchPosition::Leftmost => self.item.is_some(), |
| 207 | MatchPosition::Rightmost => false, |
| 208 | }; |
| 209 | |
| 210 | found_best_in_range |
| 211 | || better_position( |
| 212 | self.best_found.load(Ordering::Relaxed), |
| 213 | self.boundary, |
| 214 | self.match_position, |
| 215 | ) |
| 216 | } |
| 217 | } |
| 218 | |
| 219 | struct FindReducer { |
| 220 | match_position: MatchPosition, |
| 221 | } |
| 222 | |
| 223 | impl<T> Reducer<Option<T>> for FindReducer { |
| 224 | fn reduce(self, left: Option<T>, right: Option<T>) -> Option<T> { |
| 225 | match self.match_position { |
| 226 | MatchPosition::Leftmost => left.or(optb:right), |
| 227 | MatchPosition::Rightmost => right.or(optb:left), |
| 228 | } |
| 229 | } |
| 230 | } |
| 231 | |