1 | //! Traits and functions used to implement parallel iteration. These are |
2 | //! low-level details -- users of parallel iterators should not need to |
3 | //! interact with them directly. See [the `plumbing` README][r] for a general overview. |
4 | //! |
5 | //! [r]: https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/README.md |
6 | |
7 | use crate::join_context; |
8 | |
9 | use super::IndexedParallelIterator; |
10 | |
11 | use std::cmp; |
12 | use std::usize; |
13 | |
14 | /// The `ProducerCallback` trait is a kind of generic closure, |
15 | /// [analogous to `FnOnce`][FnOnce]. See [the corresponding section in |
16 | /// the plumbing README][r] for more details. |
17 | /// |
18 | /// [r]: https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/README.md#producer-callback |
19 | /// [FnOnce]: https://doc.rust-lang.org/std/ops/trait.FnOnce.html |
20 | pub trait ProducerCallback<T> { |
21 | /// The type of value returned by this callback. Analogous to |
22 | /// [`Output` from the `FnOnce` trait][Output]. |
23 | /// |
24 | /// [Output]: https://doc.rust-lang.org/std/ops/trait.FnOnce.html#associatedtype.Output |
25 | type Output; |
26 | |
27 | /// Invokes the callback with the given producer as argument. The |
28 | /// key point of this trait is that this method is generic over |
29 | /// `P`, and hence implementors must be defined for any producer. |
30 | fn callback<P>(self, producer: P) -> Self::Output |
31 | where |
32 | P: Producer<Item = T>; |
33 | } |
34 | |
35 | /// A `Producer` is effectively a "splittable `IntoIterator`". That |
36 | /// is, a producer is a value which can be converted into an iterator |
37 | /// at any time: at that point, it simply produces items on demand, |
38 | /// like any iterator. But what makes a `Producer` special is that, |
39 | /// *before* we convert to an iterator, we can also **split** it at a |
40 | /// particular point using the `split_at` method. This will yield up |
41 | /// two producers, one producing the items before that point, and one |
42 | /// producing the items after that point (these two producers can then |
43 | /// independently be split further, or be converted into iterators). |
44 | /// In Rayon, this splitting is used to divide between threads. |
45 | /// See [the `plumbing` README][r] for further details. |
46 | /// |
47 | /// Note that each producer will always produce a fixed number of |
48 | /// items N. However, this number N is not queryable through the API; |
49 | /// the consumer is expected to track it. |
50 | /// |
51 | /// NB. You might expect `Producer` to extend the `IntoIterator` |
52 | /// trait. However, [rust-lang/rust#20671][20671] prevents us from |
53 | /// declaring the DoubleEndedIterator and ExactSizeIterator |
54 | /// constraints on a required IntoIterator trait, so we inline |
55 | /// IntoIterator here until that issue is fixed. |
56 | /// |
57 | /// [r]: https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/README.md |
58 | /// [20671]: https://github.com/rust-lang/rust/issues/20671 |
59 | pub trait Producer: Send + Sized { |
60 | /// The type of item that will be produced by this producer once |
61 | /// it is converted into an iterator. |
62 | type Item; |
63 | |
64 | /// The type of iterator we will become. |
65 | type IntoIter: Iterator<Item = Self::Item> + DoubleEndedIterator + ExactSizeIterator; |
66 | |
67 | /// Convert `self` into an iterator; at this point, no more parallel splits |
68 | /// are possible. |
69 | fn into_iter(self) -> Self::IntoIter; |
70 | |
71 | /// The minimum number of items that we will process |
72 | /// sequentially. Defaults to 1, which means that we will split |
73 | /// all the way down to a single item. This can be raised higher |
74 | /// using the [`with_min_len`] method, which will force us to |
75 | /// create sequential tasks at a larger granularity. Note that |
76 | /// Rayon automatically normally attempts to adjust the size of |
77 | /// parallel splits to reduce overhead, so this should not be |
78 | /// needed. |
79 | /// |
80 | /// [`with_min_len`]: ../trait.IndexedParallelIterator.html#method.with_min_len |
81 | fn min_len(&self) -> usize { |
82 | 1 |
83 | } |
84 | |
85 | /// The maximum number of items that we will process |
86 | /// sequentially. Defaults to MAX, which means that we can choose |
87 | /// not to split at all. This can be lowered using the |
88 | /// [`with_max_len`] method, which will force us to create more |
89 | /// parallel tasks. Note that Rayon automatically normally |
90 | /// attempts to adjust the size of parallel splits to reduce |
91 | /// overhead, so this should not be needed. |
92 | /// |
93 | /// [`with_max_len`]: ../trait.IndexedParallelIterator.html#method.with_max_len |
94 | fn max_len(&self) -> usize { |
95 | usize::MAX |
96 | } |
97 | |
98 | /// Split into two producers; one produces items `0..index`, the |
99 | /// other `index..N`. Index must be less than or equal to `N`. |
100 | fn split_at(self, index: usize) -> (Self, Self); |
101 | |
102 | /// Iterate the producer, feeding each element to `folder`, and |
103 | /// stop when the folder is full (or all elements have been consumed). |
104 | /// |
105 | /// The provided implementation is sufficient for most iterables. |
106 | fn fold_with<F>(self, folder: F) -> F |
107 | where |
108 | F: Folder<Self::Item>, |
109 | { |
110 | folder.consume_iter(self.into_iter()) |
111 | } |
112 | } |
113 | |
114 | /// A consumer is effectively a [generalized "fold" operation][fold], |
115 | /// and in fact each consumer will eventually be converted into a |
116 | /// [`Folder`]. What makes a consumer special is that, like a |
117 | /// [`Producer`], it can be **split** into multiple consumers using |
118 | /// the `split_at` method. When a consumer is split, it produces two |
119 | /// consumers, as well as a **reducer**. The two consumers can be fed |
120 | /// items independently, and when they are done the reducer is used to |
121 | /// combine their two results into one. See [the `plumbing` |
122 | /// README][r] for further details. |
123 | /// |
124 | /// [r]: https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/README.md |
125 | /// [fold]: https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.fold |
126 | /// [`Folder`]: trait.Folder.html |
127 | /// [`Producer`]: trait.Producer.html |
128 | pub trait Consumer<Item>: Send + Sized { |
129 | /// The type of folder that this consumer can be converted into. |
130 | type Folder: Folder<Item, Result = Self::Result>; |
131 | |
132 | /// The type of reducer that is produced if this consumer is split. |
133 | type Reducer: Reducer<Self::Result>; |
134 | |
135 | /// The type of result that this consumer will ultimately produce. |
136 | type Result: Send; |
137 | |
138 | /// Divide the consumer into two consumers, one processing items |
139 | /// `0..index` and one processing items from `index..`. Also |
140 | /// produces a reducer that can be used to reduce the results at |
141 | /// the end. |
142 | fn split_at(self, index: usize) -> (Self, Self, Self::Reducer); |
143 | |
144 | /// Convert the consumer into a folder that can consume items |
145 | /// sequentially, eventually producing a final result. |
146 | fn into_folder(self) -> Self::Folder; |
147 | |
148 | /// Hint whether this `Consumer` would like to stop processing |
149 | /// further items, e.g. if a search has been completed. |
150 | fn full(&self) -> bool; |
151 | } |
152 | |
/// The `Folder` trait encapsulates [the standard fold
/// operation][fold]. It can be fed many items using the `consume`
/// method. At the end, once all items have been consumed, it can then
/// be converted (using `complete`) into a final value.
///
/// [fold]: https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.fold
pub trait Folder<Item>: Sized {
    /// The type of result that will ultimately be produced by the folder.
    type Result;

    /// Consume the next item and return the new sequential state.
    fn consume(self, item: Item) -> Self;

    /// Consume items from `iter` until the folder is full or the iterator
    /// is exhausted, and return the new sequential state.
    ///
    /// This method is **optional**. The default calls `consume` for each
    /// item and, after every call, checks `full`, stopping as soon as it
    /// returns `true`. Because the check happens *after* consuming, the
    /// first item (if any) is consumed even when the folder was already
    /// full on entry; callers that care should check `full` beforehand.
    ///
    /// The main reason to override it is if you can provide a more
    /// specialized, efficient implementation.
    fn consume_iter<I>(mut self, iter: I) -> Self
    where
        I: IntoIterator<Item = Item>,
    {
        for item in iter {
            self = self.consume(item);
            if self.full() {
                break;
            }
        }
        self
    }

    /// Finish consuming items, produce final result.
    fn complete(self) -> Self::Result;

    /// Hint whether this `Folder` would like to stop processing
    /// further items, e.g. if a search has been completed.
    fn full(&self) -> bool;
}
194 | |
/// The last step of a split `Consumer`.
///
/// Once a consumer has been divided in two and both parts have been fully
/// processed, two partial results remain; a reducer merges them into a
/// single result. See [the `plumbing` README][r] for further details.
///
/// [r]: https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/README.md
pub trait Reducer<R> {
    /// Merges the left half's result with the right half's result after a
    /// split.
    fn reduce(self, left: R, right: R) -> R;
}
207 | |
208 | /// A stateless consumer can be freely copied. These consumers can be |
209 | /// used like regular consumers, but they also support a |
210 | /// `split_off_left` method that does not take an index to split, but |
211 | /// simply splits at some arbitrary point (`for_each`, for example, |
212 | /// produces an unindexed consumer). |
213 | pub trait UnindexedConsumer<I>: Consumer<I> { |
214 | /// Splits off a "left" consumer and returns it. The `self` |
215 | /// consumer should then be used to consume the "right" portion of |
216 | /// the data. (The ordering matters for methods like find_first -- |
217 | /// values produced by the returned value are given precedence |
218 | /// over values produced by `self`.) Once the left and right |
219 | /// halves have been fully consumed, you should reduce the results |
220 | /// with the result of `to_reducer`. |
221 | fn split_off_left(&self) -> Self; |
222 | |
223 | /// Creates a reducer that can be used to combine the results from |
224 | /// a split consumer. |
225 | fn to_reducer(&self) -> Self::Reducer; |
226 | } |
227 | |
228 | /// A variant on `Producer` which does not know its exact length or |
229 | /// cannot represent it in a `usize`. These producers act like |
230 | /// ordinary producers except that they cannot be told to split at a |
231 | /// particular point. Instead, you just ask them to split 'somewhere'. |
232 | /// |
233 | /// (In principle, `Producer` could extend this trait; however, it |
234 | /// does not because to do so would require producers to carry their |
235 | /// own length with them.) |
236 | pub trait UnindexedProducer: Send + Sized { |
237 | /// The type of item returned by this producer. |
238 | type Item; |
239 | |
240 | /// Split midway into a new producer if possible, otherwise return `None`. |
241 | fn split(self) -> (Self, Option<Self>); |
242 | |
243 | /// Iterate the producer, feeding each element to `folder`, and |
244 | /// stop when the folder is full (or all elements have been consumed). |
245 | fn fold_with<F>(self, folder: F) -> F |
246 | where |
247 | F: Folder<Self::Item>; |
248 | } |
249 | |
/// Policy object deciding when a job should be divided further.
///
/// It implements adaptive "thief-splitting": the initial budget gives every
/// worker thread a share of the work, and the budget is replenished whenever
/// a job is actually stolen by a different thread.
#[derive(Clone, Copy)]
struct Splitter {
    /// Roughly how many more times this job would like to be split. Each
    /// split halves this value, so the effective number of pieces ends up
    /// being its `next_power_of_two()`.
    splits: usize,
}
262 | |
263 | impl Splitter { |
264 | #[inline ] |
265 | fn new() -> Splitter { |
266 | Splitter { |
267 | splits: crate::current_num_threads(), |
268 | } |
269 | } |
270 | |
271 | #[inline ] |
272 | fn try_split(&mut self, stolen: bool) -> bool { |
273 | let Splitter { splits } = *self; |
274 | |
275 | if stolen { |
276 | // This job was stolen! Reset the number of desired splits to the |
277 | // thread count, if that's more than we had remaining anyway. |
278 | self.splits = cmp::max(crate::current_num_threads(), self.splits / 2); |
279 | true |
280 | } else if splits > 0 { |
281 | // We have splits remaining, make it so. |
282 | self.splits /= 2; |
283 | true |
284 | } else { |
285 | // Not stolen, and no more splits -- we're done! |
286 | false |
287 | } |
288 | } |
289 | } |
290 | |
291 | /// The length splitter is built on thief-splitting, but additionally takes |
292 | /// into account the remaining length of the iterator. |
293 | #[derive(Clone, Copy)] |
294 | struct LengthSplitter { |
295 | inner: Splitter, |
296 | |
297 | /// The smallest we're willing to divide into. Usually this is just 1, |
298 | /// but you can choose a larger working size with `with_min_len()`. |
299 | min: usize, |
300 | } |
301 | |
302 | impl LengthSplitter { |
303 | /// Creates a new splitter based on lengths. |
304 | /// |
305 | /// The `min` is a hard lower bound. We'll never split below that, but |
306 | /// of course an iterator might start out smaller already. |
307 | /// |
308 | /// The `max` is an upper bound on the working size, used to determine |
309 | /// the minimum number of times we need to split to get under that limit. |
310 | /// The adaptive algorithm may very well split even further, but never |
311 | /// smaller than the `min`. |
312 | #[inline ] |
313 | fn new(min: usize, max: usize, len: usize) -> LengthSplitter { |
314 | let mut splitter = LengthSplitter { |
315 | inner: Splitter::new(), |
316 | min: cmp::max(min, 1), |
317 | }; |
318 | |
319 | // Divide the given length by the max working length to get the minimum |
320 | // number of splits we need to get under that max. This rounds down, |
321 | // but the splitter actually gives `next_power_of_two()` pieces anyway. |
322 | // e.g. len 12345 / max 100 = 123 min_splits -> 128 pieces. |
323 | let min_splits = len / cmp::max(max, 1); |
324 | |
325 | // Only update the value if it's not splitting enough already. |
326 | if min_splits > splitter.inner.splits { |
327 | splitter.inner.splits = min_splits; |
328 | } |
329 | |
330 | splitter |
331 | } |
332 | |
333 | #[inline ] |
334 | fn try_split(&mut self, len: usize, stolen: bool) -> bool { |
335 | // If splitting wouldn't make us too small, try the inner splitter. |
336 | len / 2 >= self.min && self.inner.try_split(stolen) |
337 | } |
338 | } |
339 | |
340 | /// This helper function is used to "connect" a parallel iterator to a |
341 | /// consumer. It will convert the `par_iter` into a producer P and |
342 | /// then pull items from P and feed them to `consumer`, splitting and |
343 | /// creating parallel threads as needed. |
344 | /// |
345 | /// This is useful when you are implementing your own parallel |
346 | /// iterators: it is often used as the definition of the |
347 | /// [`drive_unindexed`] or [`drive`] methods. |
348 | /// |
349 | /// [`drive_unindexed`]: ../trait.ParallelIterator.html#tymethod.drive_unindexed |
350 | /// [`drive`]: ../trait.IndexedParallelIterator.html#tymethod.drive |
351 | pub fn bridge<I, C>(par_iter: I, consumer: C) -> C::Result |
352 | where |
353 | I: IndexedParallelIterator, |
354 | C: Consumer<I::Item>, |
355 | { |
356 | let len = par_iter.len(); |
357 | return par_iter.with_producer(Callback { len, consumer }); |
358 | |
359 | struct Callback<C> { |
360 | len: usize, |
361 | consumer: C, |
362 | } |
363 | |
364 | impl<C, I> ProducerCallback<I> for Callback<C> |
365 | where |
366 | C: Consumer<I>, |
367 | { |
368 | type Output = C::Result; |
369 | fn callback<P>(self, producer: P) -> C::Result |
370 | where |
371 | P: Producer<Item = I>, |
372 | { |
373 | bridge_producer_consumer(self.len, producer, self.consumer) |
374 | } |
375 | } |
376 | } |
377 | |
378 | /// This helper function is used to "connect" a producer and a |
379 | /// consumer. You may prefer to call [`bridge`], which wraps this |
380 | /// function. This function will draw items from `producer` and feed |
381 | /// them to `consumer`, splitting and creating parallel tasks when |
382 | /// needed. |
383 | /// |
384 | /// This is useful when you are implementing your own parallel |
385 | /// iterators: it is often used as the definition of the |
386 | /// [`drive_unindexed`] or [`drive`] methods. |
387 | /// |
388 | /// [`bridge`]: fn.bridge.html |
389 | /// [`drive_unindexed`]: ../trait.ParallelIterator.html#tymethod.drive_unindexed |
390 | /// [`drive`]: ../trait.IndexedParallelIterator.html#tymethod.drive |
391 | pub fn bridge_producer_consumer<P, C>(len: usize, producer: P, consumer: C) -> C::Result |
392 | where |
393 | P: Producer, |
394 | C: Consumer<P::Item>, |
395 | { |
396 | let splitter = LengthSplitter::new(producer.min_len(), producer.max_len(), len); |
397 | return helper(len, false, splitter, producer, consumer); |
398 | |
399 | fn helper<P, C>( |
400 | len: usize, |
401 | migrated: bool, |
402 | mut splitter: LengthSplitter, |
403 | producer: P, |
404 | consumer: C, |
405 | ) -> C::Result |
406 | where |
407 | P: Producer, |
408 | C: Consumer<P::Item>, |
409 | { |
410 | if consumer.full() { |
411 | consumer.into_folder().complete() |
412 | } else if splitter.try_split(len, migrated) { |
413 | let mid = len / 2; |
414 | let (left_producer, right_producer) = producer.split_at(mid); |
415 | let (left_consumer, right_consumer, reducer) = consumer.split_at(mid); |
416 | let (left_result, right_result) = join_context( |
417 | |context| { |
418 | helper( |
419 | mid, |
420 | context.migrated(), |
421 | splitter, |
422 | left_producer, |
423 | left_consumer, |
424 | ) |
425 | }, |
426 | |context| { |
427 | helper( |
428 | len - mid, |
429 | context.migrated(), |
430 | splitter, |
431 | right_producer, |
432 | right_consumer, |
433 | ) |
434 | }, |
435 | ); |
436 | reducer.reduce(left_result, right_result) |
437 | } else { |
438 | producer.fold_with(consumer.into_folder()).complete() |
439 | } |
440 | } |
441 | } |
442 | |
443 | /// A variant of [`bridge_producer_consumer`] where the producer is an unindexed producer. |
444 | /// |
445 | /// [`bridge_producer_consumer`]: fn.bridge_producer_consumer.html |
446 | pub fn bridge_unindexed<P, C>(producer: P, consumer: C) -> C::Result |
447 | where |
448 | P: UnindexedProducer, |
449 | C: UnindexedConsumer<P::Item>, |
450 | { |
451 | let splitter = Splitter::new(); |
452 | bridge_unindexed_producer_consumer(false, splitter, producer, consumer) |
453 | } |
454 | |
455 | fn bridge_unindexed_producer_consumer<P, C>( |
456 | migrated: bool, |
457 | mut splitter: Splitter, |
458 | producer: P, |
459 | consumer: C, |
460 | ) -> C::Result |
461 | where |
462 | P: UnindexedProducer, |
463 | C: UnindexedConsumer<P::Item>, |
464 | { |
465 | if consumer.full() { |
466 | consumer.into_folder().complete() |
467 | } else if splitter.try_split(migrated) { |
468 | match producer.split() { |
469 | (left_producer, Some(right_producer)) => { |
470 | let (reducer, left_consumer, right_consumer) = |
471 | (consumer.to_reducer(), consumer.split_off_left(), consumer); |
472 | let bridge = bridge_unindexed_producer_consumer; |
473 | let (left_result, right_result) = join_context( |
474 | |context| bridge(context.migrated(), splitter, left_producer, left_consumer), |
475 | |context| bridge(context.migrated(), splitter, right_producer, right_consumer), |
476 | ); |
477 | reducer.reduce(left_result, right_result) |
478 | } |
479 | (producer, None) => producer.fold_with(consumer.into_folder()).complete(), |
480 | } |
481 | } else { |
482 | producer.fold_with(consumer.into_folder()).complete() |
483 | } |
484 | } |
485 | |