| 1 | use super::plumbing::*; | 
| 2 | use super::*; | 
|---|
| 3 | use std::cell::Cell; | 
|---|
| 4 | use std::sync::atomic::{AtomicUsize, Ordering}; | 
|---|
| 5 |  | 
|---|
// Out-of-line `test` submodule; only compiled when building tests.
#[cfg(test)]
mod test;
|---|
| 8 |  | 
|---|
// The key optimization for find_first is that a consumer can stop its search if
// some consumer to its left already found a match (and similarly for consumers
// to the right for find_last). To make this work, all consumers need some
// notion of their position in the data relative to other consumers, including
// unindexed consumers that have no built-in notion of position.
//
// To solve this, we assign each consumer a lower and upper bound for an
// imaginary "range" of data that it consumes. The initial consumer starts with
// the range 0..usize::max_value(). The split divides this range in half so that
// one resulting consumer has the range 0..(usize::max_value() / 2), and the
// other has (usize::max_value() / 2)..usize::max_value(). Every subsequent
// split divides the range in half again until it cannot be split anymore
// (i.e. its length is 1), in which case the split returns two consumers with
// the same range. In that case both consumers will continue to consume all
// their data regardless of whether a better match is found, but the reducer
// will still return the correct answer.
|---|
| 25 |  | 
|---|
/// Which end of the input wins when more than one item matches.
#[derive(Copy, Clone)]
enum MatchPosition {
    /// A smaller position is better (used by `find_first`).
    Leftmost,
    /// A larger position is better (used by `find_last`).
    Rightmost,
}
|---|
| 31 |  | 
|---|
| 32 | /// Returns true if pos1 is a better match than pos2 according to MatchPosition | 
|---|
| 33 | #[ inline] | 
|---|
| 34 | fn better_position(pos1: usize, pos2: usize, mp: MatchPosition) -> bool { | 
|---|
| 35 | match mp { | 
|---|
| 36 | MatchPosition::Leftmost => pos1 < pos2, | 
|---|
| 37 | MatchPosition::Rightmost => pos1 > pos2, | 
|---|
| 38 | } | 
|---|
| 39 | } | 
|---|
| 40 |  | 
|---|
| 41 | pub(super) fn find_first<I, P>(pi: I, find_op: P) -> Option<I::Item> | 
|---|
| 42 | where | 
|---|
| 43 | I: ParallelIterator, | 
|---|
| 44 | P: Fn(&I::Item) -> bool + Sync, | 
|---|
| 45 | { | 
|---|
| 46 | let best_found: AtomicUsize = AtomicUsize::new(usize::max_value()); | 
|---|
| 47 | let consumer: FindConsumer<'_, P> = FindConsumer::new(&find_op, MatchPosition::Leftmost, &best_found); | 
|---|
| 48 | pi.drive_unindexed(consumer) | 
|---|
| 49 | } | 
|---|
| 50 |  | 
|---|
| 51 | pub(super) fn find_last<I, P>(pi: I, find_op: P) -> Option<I::Item> | 
|---|
| 52 | where | 
|---|
| 53 | I: ParallelIterator, | 
|---|
| 54 | P: Fn(&I::Item) -> bool + Sync, | 
|---|
| 55 | { | 
|---|
| 56 | let best_found: AtomicUsize = AtomicUsize::new(0); | 
|---|
| 57 | let consumer: FindConsumer<'_, P> = FindConsumer::new(&find_op, MatchPosition::Rightmost, &best_found); | 
|---|
| 58 | pi.drive_unindexed(consumer) | 
|---|
| 59 | } | 
|---|
| 60 |  | 
|---|
| 61 | struct FindConsumer<'p, P> { | 
|---|
| 62 | find_op: &'p P, | 
|---|
| 63 | lower_bound: Cell<usize>, | 
|---|
| 64 | upper_bound: usize, | 
|---|
| 65 | match_position: MatchPosition, | 
|---|
| 66 | best_found: &'p AtomicUsize, | 
|---|
| 67 | } | 
|---|
| 68 |  | 
|---|
| 69 | impl<'p, P> FindConsumer<'p, P> { | 
|---|
| 70 | fn new(find_op: &'p P, match_position: MatchPosition, best_found: &'p AtomicUsize) -> Self { | 
|---|
| 71 | FindConsumer { | 
|---|
| 72 | find_op, | 
|---|
| 73 | lower_bound: Cell::new(0), | 
|---|
| 74 | upper_bound: usize::max_value(), | 
|---|
| 75 | match_position, | 
|---|
| 76 | best_found, | 
|---|
| 77 | } | 
|---|
| 78 | } | 
|---|
| 79 |  | 
|---|
| 80 | fn current_index(&self) -> usize { | 
|---|
| 81 | match self.match_position { | 
|---|
| 82 | MatchPosition::Leftmost => self.lower_bound.get(), | 
|---|
| 83 | MatchPosition::Rightmost => self.upper_bound, | 
|---|
| 84 | } | 
|---|
| 85 | } | 
|---|
| 86 | } | 
|---|
| 87 |  | 
|---|
| 88 | impl<'p, T, P> Consumer<T> for FindConsumer<'p, P> | 
|---|
| 89 | where | 
|---|
| 90 | T: Send, | 
|---|
| 91 | P: Fn(&T) -> bool + Sync, | 
|---|
| 92 | { | 
|---|
| 93 | type Folder = FindFolder<'p, T, P>; | 
|---|
| 94 | type Reducer = FindReducer; | 
|---|
| 95 | type Result = Option<T>; | 
|---|
| 96 |  | 
|---|
| 97 | fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) { | 
|---|
| 98 | let dir = self.match_position; | 
|---|
| 99 | ( | 
|---|
| 100 | self.split_off_left(), | 
|---|
| 101 | self, | 
|---|
| 102 | FindReducer { | 
|---|
| 103 | match_position: dir, | 
|---|
| 104 | }, | 
|---|
| 105 | ) | 
|---|
| 106 | } | 
|---|
| 107 |  | 
|---|
| 108 | fn into_folder(self) -> Self::Folder { | 
|---|
| 109 | FindFolder { | 
|---|
| 110 | find_op: self.find_op, | 
|---|
| 111 | boundary: self.current_index(), | 
|---|
| 112 | match_position: self.match_position, | 
|---|
| 113 | best_found: self.best_found, | 
|---|
| 114 | item: None, | 
|---|
| 115 | } | 
|---|
| 116 | } | 
|---|
| 117 |  | 
|---|
| 118 | fn full(&self) -> bool { | 
|---|
| 119 | // can stop consuming if the best found index so far is *strictly* | 
|---|
| 120 | // better than anything this consumer will find | 
|---|
| 121 | better_position( | 
|---|
| 122 | self.best_found.load(Ordering::Relaxed), | 
|---|
| 123 | self.current_index(), | 
|---|
| 124 | self.match_position, | 
|---|
| 125 | ) | 
|---|
| 126 | } | 
|---|
| 127 | } | 
|---|
| 128 |  | 
|---|
| 129 | impl<'p, T, P> UnindexedConsumer<T> for FindConsumer<'p, P> | 
|---|
| 130 | where | 
|---|
| 131 | T: Send, | 
|---|
| 132 | P: Fn(&T) -> bool + Sync, | 
|---|
| 133 | { | 
|---|
| 134 | fn split_off_left(&self) -> Self { | 
|---|
| 135 | // Upper bound for one consumer will be lower bound for the other. This | 
|---|
| 136 | // overlap is okay, because only one of the bounds will be used for | 
|---|
| 137 | // comparing against best_found; the other is kept only to be able to | 
|---|
| 138 | // divide the range in half. | 
|---|
| 139 | // | 
|---|
| 140 | // When the resolution of usize has been exhausted (i.e. when | 
|---|
| 141 | // upper_bound = lower_bound), both results of this split will have the | 
|---|
| 142 | // same range. When that happens, we lose the ability to tell one | 
|---|
| 143 | // consumer to stop working when the other finds a better match, but the | 
|---|
| 144 | // reducer ensures that the best answer is still returned (see the test | 
|---|
| 145 | // above). | 
|---|
| 146 | let old_lower_bound = self.lower_bound.get(); | 
|---|
| 147 | let median = old_lower_bound + ((self.upper_bound - old_lower_bound) / 2); | 
|---|
| 148 | self.lower_bound.set(median); | 
|---|
| 149 |  | 
|---|
| 150 | FindConsumer { | 
|---|
| 151 | find_op: self.find_op, | 
|---|
| 152 | lower_bound: Cell::new(old_lower_bound), | 
|---|
| 153 | upper_bound: median, | 
|---|
| 154 | match_position: self.match_position, | 
|---|
| 155 | best_found: self.best_found, | 
|---|
| 156 | } | 
|---|
| 157 | } | 
|---|
| 158 |  | 
|---|
| 159 | fn to_reducer(&self) -> Self::Reducer { | 
|---|
| 160 | FindReducer { | 
|---|
| 161 | match_position: self.match_position, | 
|---|
| 162 | } | 
|---|
| 163 | } | 
|---|
| 164 | } | 
|---|
| 165 |  | 
|---|
| 166 | struct FindFolder<'p, T, P> { | 
|---|
| 167 | find_op: &'p P, | 
|---|
| 168 | boundary: usize, | 
|---|
| 169 | match_position: MatchPosition, | 
|---|
| 170 | best_found: &'p AtomicUsize, | 
|---|
| 171 | item: Option<T>, | 
|---|
| 172 | } | 
|---|
| 173 |  | 
|---|
| 174 | impl<'p, P: 'p + Fn(&T) -> bool, T> Folder<T> for FindFolder<'p, T, P> { | 
|---|
| 175 | type Result = Option<T>; | 
|---|
| 176 |  | 
|---|
| 177 | fn consume(mut self, item: T) -> Self { | 
|---|
| 178 | let found_best_in_range = match self.match_position { | 
|---|
| 179 | MatchPosition::Leftmost => self.item.is_some(), | 
|---|
| 180 | MatchPosition::Rightmost => false, | 
|---|
| 181 | }; | 
|---|
| 182 |  | 
|---|
| 183 | if !found_best_in_range && (self.find_op)(&item) { | 
|---|
| 184 | // Continuously try to set best_found until we succeed or we | 
|---|
| 185 | // discover a better match was already found. | 
|---|
| 186 | let mut current = self.best_found.load(Ordering::Relaxed); | 
|---|
| 187 | loop { | 
|---|
| 188 | if better_position(current, self.boundary, self.match_position) { | 
|---|
| 189 | break; | 
|---|
| 190 | } | 
|---|
| 191 | match self.best_found.compare_exchange_weak( | 
|---|
| 192 | current, | 
|---|
| 193 | self.boundary, | 
|---|
| 194 | Ordering::Relaxed, | 
|---|
| 195 | Ordering::Relaxed, | 
|---|
| 196 | ) { | 
|---|
| 197 | Ok(_) => { | 
|---|
| 198 | self.item = Some(item); | 
|---|
| 199 | break; | 
|---|
| 200 | } | 
|---|
| 201 | Err(v) => current = v, | 
|---|
| 202 | } | 
|---|
| 203 | } | 
|---|
| 204 | } | 
|---|
| 205 | self | 
|---|
| 206 | } | 
|---|
| 207 |  | 
|---|
| 208 | fn complete(self) -> Self::Result { | 
|---|
| 209 | self.item | 
|---|
| 210 | } | 
|---|
| 211 |  | 
|---|
| 212 | fn full(&self) -> bool { | 
|---|
| 213 | let found_best_in_range = match self.match_position { | 
|---|
| 214 | MatchPosition::Leftmost => self.item.is_some(), | 
|---|
| 215 | MatchPosition::Rightmost => false, | 
|---|
| 216 | }; | 
|---|
| 217 |  | 
|---|
| 218 | found_best_in_range | 
|---|
| 219 | || better_position( | 
|---|
| 220 | self.best_found.load(Ordering::Relaxed), | 
|---|
| 221 | self.boundary, | 
|---|
| 222 | self.match_position, | 
|---|
| 223 | ) | 
|---|
| 224 | } | 
|---|
| 225 | } | 
|---|
| 226 |  | 
|---|
| 227 | struct FindReducer { | 
|---|
| 228 | match_position: MatchPosition, | 
|---|
| 229 | } | 
|---|
| 230 |  | 
|---|
| 231 | impl<T> Reducer<Option<T>> for FindReducer { | 
|---|
| 232 | fn reduce(self, left: Option<T>, right: Option<T>) -> Option<T> { | 
|---|
| 233 | match self.match_position { | 
|---|
| 234 | MatchPosition::Leftmost => left.or(optb:right), | 
|---|
| 235 | MatchPosition::Rightmost => right.or(optb:left), | 
|---|
| 236 | } | 
|---|
| 237 | } | 
|---|
| 238 | } | 
|---|
| 239 |  | 
|---|