1 | //! Traits and functions used to implement parallel iteration. These are |
2 | //! low-level details -- users of parallel iterators should not need to |
3 | //! interact with them directly. See [the `plumbing` README][r] for a general overview. |
4 | //! |
5 | //! [r]: https://github.com/rayon-rs/rayon/blob/main/src/iter/plumbing/README.md |
6 | |
7 | use crate::join_context; |
8 | |
9 | use super::IndexedParallelIterator; |
10 | |
11 | use std::usize; |
12 | |
13 | /// The `ProducerCallback` trait is a kind of generic closure, |
14 | /// [analogous to `FnOnce`][FnOnce]. See [the corresponding section in |
15 | /// the plumbing README][r] for more details. |
16 | /// |
17 | /// [r]: https://github.com/rayon-rs/rayon/blob/main/src/iter/plumbing/README.md#producer-callback |
18 | /// [FnOnce]: https://doc.rust-lang.org/std/ops/trait.FnOnce.html |
19 | pub trait ProducerCallback<T> { |
20 | /// The type of value returned by this callback. Analogous to |
21 | /// [`Output` from the `FnOnce` trait][Output]. |
22 | /// |
23 | /// [Output]: https://doc.rust-lang.org/std/ops/trait.FnOnce.html#associatedtype.Output |
24 | type Output; |
25 | |
26 | /// Invokes the callback with the given producer as argument. The |
27 | /// key point of this trait is that this method is generic over |
28 | /// `P`, and hence implementors must be defined for any producer. |
29 | fn callback<P>(self, producer: P) -> Self::Output |
30 | where |
31 | P: Producer<Item = T>; |
32 | } |
33 | |
34 | /// A `Producer` is effectively a "splittable `IntoIterator`". That |
35 | /// is, a producer is a value which can be converted into an iterator |
36 | /// at any time: at that point, it simply produces items on demand, |
37 | /// like any iterator. But what makes a `Producer` special is that, |
38 | /// *before* we convert to an iterator, we can also **split** it at a |
39 | /// particular point using the `split_at` method. This will yield up |
40 | /// two producers, one producing the items before that point, and one |
41 | /// producing the items after that point (these two producers can then |
42 | /// independently be split further, or be converted into iterators). |
43 | /// In Rayon, this splitting is used to divide between threads. |
44 | /// See [the `plumbing` README][r] for further details. |
45 | /// |
46 | /// Note that each producer will always produce a fixed number of |
47 | /// items N. However, this number N is not queryable through the API; |
48 | /// the consumer is expected to track it. |
49 | /// |
50 | /// NB. You might expect `Producer` to extend the `IntoIterator` |
51 | /// trait. However, [rust-lang/rust#20671][20671] prevents us from |
52 | /// declaring the DoubleEndedIterator and ExactSizeIterator |
53 | /// constraints on a required IntoIterator trait, so we inline |
54 | /// IntoIterator here until that issue is fixed. |
55 | /// |
56 | /// [r]: https://github.com/rayon-rs/rayon/blob/main/src/iter/plumbing/README.md |
57 | /// [20671]: https://github.com/rust-lang/rust/issues/20671 |
58 | pub trait Producer: Send + Sized { |
59 | /// The type of item that will be produced by this producer once |
60 | /// it is converted into an iterator. |
61 | type Item; |
62 | |
63 | /// The type of iterator we will become. |
64 | type IntoIter: Iterator<Item = Self::Item> + DoubleEndedIterator + ExactSizeIterator; |
65 | |
66 | /// Convert `self` into an iterator; at this point, no more parallel splits |
67 | /// are possible. |
68 | fn into_iter(self) -> Self::IntoIter; |
69 | |
70 | /// The minimum number of items that we will process |
71 | /// sequentially. Defaults to 1, which means that we will split |
72 | /// all the way down to a single item. This can be raised higher |
73 | /// using the [`with_min_len`] method, which will force us to |
74 | /// create sequential tasks at a larger granularity. Note that |
75 | /// Rayon automatically normally attempts to adjust the size of |
76 | /// parallel splits to reduce overhead, so this should not be |
77 | /// needed. |
78 | /// |
79 | /// [`with_min_len`]: ../trait.IndexedParallelIterator.html#method.with_min_len |
80 | fn min_len(&self) -> usize { |
81 | 1 |
82 | } |
83 | |
84 | /// The maximum number of items that we will process |
85 | /// sequentially. Defaults to MAX, which means that we can choose |
86 | /// not to split at all. This can be lowered using the |
87 | /// [`with_max_len`] method, which will force us to create more |
88 | /// parallel tasks. Note that Rayon automatically normally |
89 | /// attempts to adjust the size of parallel splits to reduce |
90 | /// overhead, so this should not be needed. |
91 | /// |
92 | /// [`with_max_len`]: ../trait.IndexedParallelIterator.html#method.with_max_len |
93 | fn max_len(&self) -> usize { |
94 | usize::MAX |
95 | } |
96 | |
97 | /// Split into two producers; one produces items `0..index`, the |
98 | /// other `index..N`. Index must be less than or equal to `N`. |
99 | fn split_at(self, index: usize) -> (Self, Self); |
100 | |
101 | /// Iterate the producer, feeding each element to `folder`, and |
102 | /// stop when the folder is full (or all elements have been consumed). |
103 | /// |
104 | /// The provided implementation is sufficient for most iterables. |
105 | fn fold_with<F>(self, folder: F) -> F |
106 | where |
107 | F: Folder<Self::Item>, |
108 | { |
109 | folder.consume_iter(self.into_iter()) |
110 | } |
111 | } |
112 | |
113 | /// A consumer is effectively a [generalized "fold" operation][fold], |
114 | /// and in fact each consumer will eventually be converted into a |
115 | /// [`Folder`]. What makes a consumer special is that, like a |
116 | /// [`Producer`], it can be **split** into multiple consumers using |
117 | /// the `split_at` method. When a consumer is split, it produces two |
118 | /// consumers, as well as a **reducer**. The two consumers can be fed |
119 | /// items independently, and when they are done the reducer is used to |
120 | /// combine their two results into one. See [the `plumbing` |
121 | /// README][r] for further details. |
122 | /// |
123 | /// [r]: https://github.com/rayon-rs/rayon/blob/main/src/iter/plumbing/README.md |
124 | /// [fold]: https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.fold |
125 | /// [`Folder`]: trait.Folder.html |
126 | /// [`Producer`]: trait.Producer.html |
127 | pub trait Consumer<Item>: Send + Sized { |
128 | /// The type of folder that this consumer can be converted into. |
129 | type Folder: Folder<Item, Result = Self::Result>; |
130 | |
131 | /// The type of reducer that is produced if this consumer is split. |
132 | type Reducer: Reducer<Self::Result>; |
133 | |
134 | /// The type of result that this consumer will ultimately produce. |
135 | type Result: Send; |
136 | |
137 | /// Divide the consumer into two consumers, one processing items |
138 | /// `0..index` and one processing items from `index..`. Also |
139 | /// produces a reducer that can be used to reduce the results at |
140 | /// the end. |
141 | fn split_at(self, index: usize) -> (Self, Self, Self::Reducer); |
142 | |
143 | /// Convert the consumer into a folder that can consume items |
144 | /// sequentially, eventually producing a final result. |
145 | fn into_folder(self) -> Self::Folder; |
146 | |
147 | /// Hint whether this `Consumer` would like to stop processing |
148 | /// further items, e.g. if a search has been completed. |
149 | fn full(&self) -> bool; |
150 | } |
151 | |
/// `Folder` captures [the standard fold operation][fold]. Items are
/// pushed into it one at a time through `consume`; when there is
/// nothing left to feed, `complete` turns the folder into its final
/// value.
///
/// [fold]: https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.fold
pub trait Folder<Item>: Sized {
    /// The type of the value the folder ultimately produces.
    type Result;

    /// Takes the next item and returns the updated sequential state.
    fn consume(self, item: Item) -> Self;

    /// Feeds items from `iter` into the folder until it is full, and
    /// returns the updated sequential state.
    ///
    /// Implementing this method is **optional**. The default walks
    /// over `iter`, calls `consume` for each item and, after every
    /// item, stops as soon as `full` reports `true`.
    ///
    /// Override it when a more specialized, more efficient
    /// implementation is available.
    fn consume_iter<I>(self, iter: I) -> Self
    where
        I: IntoIterator<Item = Item>,
    {
        // `Err` is used purely as the "stop early" signal; both variants
        // carry the folder, so whichever one comes back is the answer.
        let outcome = iter.into_iter().try_fold(self, |folder, item| {
            let folder = folder.consume(item);
            if folder.full() {
                Err(folder)
            } else {
                Ok(folder)
            }
        });

        match outcome {
            Ok(folder) | Err(folder) => folder,
        }
    }

    /// Ends consumption and produces the final result.
    fn complete(self) -> Self::Result;

    /// Hints that this `Folder` would like to stop receiving items,
    /// e.g. because a search has already completed.
    fn full(&self) -> bool;
}
193 | |
/// A reducer is the last stage of a `Consumer`. Once a consumer has
/// been split in two and both halves have been processed to
/// completion, two results remain; the reducer merges them into a
/// single result. See [the `plumbing` README][r] for further details.
///
/// [r]: https://github.com/rayon-rs/rayon/blob/main/src/iter/plumbing/README.md
pub trait Reducer<Result> {
    /// Merges the `left` and `right` final results into one. This
    /// runs after a split.
    fn reduce(self, left: Result, right: Result) -> Result;
}
206 | |
207 | /// A stateless consumer can be freely copied. These consumers can be |
208 | /// used like regular consumers, but they also support a |
209 | /// `split_off_left` method that does not take an index to split, but |
210 | /// simply splits at some arbitrary point (`for_each`, for example, |
211 | /// produces an unindexed consumer). |
212 | pub trait UnindexedConsumer<I>: Consumer<I> { |
213 | /// Splits off a "left" consumer and returns it. The `self` |
214 | /// consumer should then be used to consume the "right" portion of |
215 | /// the data. (The ordering matters for methods like find_first -- |
216 | /// values produced by the returned value are given precedence |
217 | /// over values produced by `self`.) Once the left and right |
218 | /// halves have been fully consumed, you should reduce the results |
219 | /// with the result of `to_reducer`. |
220 | fn split_off_left(&self) -> Self; |
221 | |
222 | /// Creates a reducer that can be used to combine the results from |
223 | /// a split consumer. |
224 | fn to_reducer(&self) -> Self::Reducer; |
225 | } |
226 | |
227 | /// A variant on `Producer` which does not know its exact length or |
228 | /// cannot represent it in a `usize`. These producers act like |
229 | /// ordinary producers except that they cannot be told to split at a |
230 | /// particular point. Instead, you just ask them to split 'somewhere'. |
231 | /// |
232 | /// (In principle, `Producer` could extend this trait; however, it |
233 | /// does not because to do so would require producers to carry their |
234 | /// own length with them.) |
235 | pub trait UnindexedProducer: Send + Sized { |
236 | /// The type of item returned by this producer. |
237 | type Item; |
238 | |
239 | /// Split midway into a new producer if possible, otherwise return `None`. |
240 | fn split(self) -> (Self, Option<Self>); |
241 | |
242 | /// Iterate the producer, feeding each element to `folder`, and |
243 | /// stop when the folder is full (or all elements have been consumed). |
244 | fn fold_with<F>(self, folder: F) -> F |
245 | where |
246 | F: Folder<Self::Item>; |
247 | } |
248 | |
249 | /// A splitter controls the policy for splitting into smaller work items. |
250 | /// |
251 | /// Thief-splitting is an adaptive policy that starts by splitting into |
252 | /// enough jobs for every worker thread, and then resets itself whenever a |
253 | /// job is actually stolen into a different thread. |
254 | #[derive (Clone, Copy)] |
255 | struct Splitter { |
256 | /// The `splits` tell us approximately how many remaining times we'd |
257 | /// like to split this job. We always just divide it by two though, so |
258 | /// the effective number of pieces will be `next_power_of_two()`. |
259 | splits: usize, |
260 | } |
261 | |
262 | impl Splitter { |
263 | #[inline ] |
264 | fn new() -> Splitter { |
265 | Splitter { |
266 | splits: crate::current_num_threads(), |
267 | } |
268 | } |
269 | |
270 | #[inline ] |
271 | fn try_split(&mut self, stolen: bool) -> bool { |
272 | let Splitter { splits } = *self; |
273 | |
274 | if stolen { |
275 | // This job was stolen! Reset the number of desired splits to the |
276 | // thread count, if that's more than we had remaining anyway. |
277 | self.splits = Ord::max(crate::current_num_threads(), self.splits / 2); |
278 | true |
279 | } else if splits > 0 { |
280 | // We have splits remaining, make it so. |
281 | self.splits /= 2; |
282 | true |
283 | } else { |
284 | // Not stolen, and no more splits -- we're done! |
285 | false |
286 | } |
287 | } |
288 | } |
289 | |
290 | /// The length splitter is built on thief-splitting, but additionally takes |
291 | /// into account the remaining length of the iterator. |
292 | #[derive (Clone, Copy)] |
293 | struct LengthSplitter { |
294 | inner: Splitter, |
295 | |
296 | /// The smallest we're willing to divide into. Usually this is just 1, |
297 | /// but you can choose a larger working size with `with_min_len()`. |
298 | min: usize, |
299 | } |
300 | |
301 | impl LengthSplitter { |
302 | /// Creates a new splitter based on lengths. |
303 | /// |
304 | /// The `min` is a hard lower bound. We'll never split below that, but |
305 | /// of course an iterator might start out smaller already. |
306 | /// |
307 | /// The `max` is an upper bound on the working size, used to determine |
308 | /// the minimum number of times we need to split to get under that limit. |
309 | /// The adaptive algorithm may very well split even further, but never |
310 | /// smaller than the `min`. |
311 | #[inline ] |
312 | fn new(min: usize, max: usize, len: usize) -> LengthSplitter { |
313 | let mut splitter = LengthSplitter { |
314 | inner: Splitter::new(), |
315 | min: Ord::max(min, 1), |
316 | }; |
317 | |
318 | // Divide the given length by the max working length to get the minimum |
319 | // number of splits we need to get under that max. This rounds down, |
320 | // but the splitter actually gives `next_power_of_two()` pieces anyway. |
321 | // e.g. len 12345 / max 100 = 123 min_splits -> 128 pieces. |
322 | let min_splits = len / Ord::max(max, 1); |
323 | |
324 | // Only update the value if it's not splitting enough already. |
325 | if min_splits > splitter.inner.splits { |
326 | splitter.inner.splits = min_splits; |
327 | } |
328 | |
329 | splitter |
330 | } |
331 | |
332 | #[inline ] |
333 | fn try_split(&mut self, len: usize, stolen: bool) -> bool { |
334 | // If splitting wouldn't make us too small, try the inner splitter. |
335 | len / 2 >= self.min && self.inner.try_split(stolen) |
336 | } |
337 | } |
338 | |
339 | /// This helper function is used to "connect" a parallel iterator to a |
340 | /// consumer. It will convert the `par_iter` into a producer P and |
341 | /// then pull items from P and feed them to `consumer`, splitting and |
342 | /// creating parallel threads as needed. |
343 | /// |
344 | /// This is useful when you are implementing your own parallel |
345 | /// iterators: it is often used as the definition of the |
346 | /// [`drive_unindexed`] or [`drive`] methods. |
347 | /// |
348 | /// [`drive_unindexed`]: ../trait.ParallelIterator.html#tymethod.drive_unindexed |
349 | /// [`drive`]: ../trait.IndexedParallelIterator.html#tymethod.drive |
350 | pub fn bridge<I, C>(par_iter: I, consumer: C) -> C::Result |
351 | where |
352 | I: IndexedParallelIterator, |
353 | C: Consumer<I::Item>, |
354 | { |
355 | let len: usize = par_iter.len(); |
356 | return par_iter.with_producer(Callback { len, consumer }); |
357 | |
358 | struct Callback<C> { |
359 | len: usize, |
360 | consumer: C, |
361 | } |
362 | |
363 | impl<C, I> ProducerCallback<I> for Callback<C> |
364 | where |
365 | C: Consumer<I>, |
366 | { |
367 | type Output = C::Result; |
368 | fn callback<P>(self, producer: P) -> C::Result |
369 | where |
370 | P: Producer<Item = I>, |
371 | { |
372 | bridge_producer_consumer(self.len, producer, self.consumer) |
373 | } |
374 | } |
375 | } |
376 | |
377 | /// This helper function is used to "connect" a producer and a |
378 | /// consumer. You may prefer to call [`bridge`], which wraps this |
379 | /// function. This function will draw items from `producer` and feed |
380 | /// them to `consumer`, splitting and creating parallel tasks when |
381 | /// needed. |
382 | /// |
383 | /// This is useful when you are implementing your own parallel |
384 | /// iterators: it is often used as the definition of the |
385 | /// [`drive_unindexed`] or [`drive`] methods. |
386 | /// |
387 | /// [`bridge`]: fn.bridge.html |
388 | /// [`drive_unindexed`]: ../trait.ParallelIterator.html#tymethod.drive_unindexed |
389 | /// [`drive`]: ../trait.IndexedParallelIterator.html#tymethod.drive |
390 | pub fn bridge_producer_consumer<P, C>(len: usize, producer: P, consumer: C) -> C::Result |
391 | where |
392 | P: Producer, |
393 | C: Consumer<P::Item>, |
394 | { |
395 | let splitter = LengthSplitter::new(producer.min_len(), producer.max_len(), len); |
396 | return helper(len, false, splitter, producer, consumer); |
397 | |
398 | fn helper<P, C>( |
399 | len: usize, |
400 | migrated: bool, |
401 | mut splitter: LengthSplitter, |
402 | producer: P, |
403 | consumer: C, |
404 | ) -> C::Result |
405 | where |
406 | P: Producer, |
407 | C: Consumer<P::Item>, |
408 | { |
409 | if consumer.full() { |
410 | consumer.into_folder().complete() |
411 | } else if splitter.try_split(len, migrated) { |
412 | let mid = len / 2; |
413 | let (left_producer, right_producer) = producer.split_at(mid); |
414 | let (left_consumer, right_consumer, reducer) = consumer.split_at(mid); |
415 | let (left_result, right_result) = join_context( |
416 | |context| { |
417 | helper( |
418 | mid, |
419 | context.migrated(), |
420 | splitter, |
421 | left_producer, |
422 | left_consumer, |
423 | ) |
424 | }, |
425 | |context| { |
426 | helper( |
427 | len - mid, |
428 | context.migrated(), |
429 | splitter, |
430 | right_producer, |
431 | right_consumer, |
432 | ) |
433 | }, |
434 | ); |
435 | reducer.reduce(left_result, right_result) |
436 | } else { |
437 | producer.fold_with(consumer.into_folder()).complete() |
438 | } |
439 | } |
440 | } |
441 | |
442 | /// A variant of [`bridge_producer_consumer`] where the producer is an unindexed producer. |
443 | /// |
444 | /// [`bridge_producer_consumer`]: fn.bridge_producer_consumer.html |
445 | pub fn bridge_unindexed<P, C>(producer: P, consumer: C) -> C::Result |
446 | where |
447 | P: UnindexedProducer, |
448 | C: UnindexedConsumer<P::Item>, |
449 | { |
450 | let splitter: Splitter = Splitter::new(); |
451 | bridge_unindexed_producer_consumer(migrated:false, splitter, producer, consumer) |
452 | } |
453 | |
454 | fn bridge_unindexed_producer_consumer<P, C>( |
455 | migrated: bool, |
456 | mut splitter: Splitter, |
457 | producer: P, |
458 | consumer: C, |
459 | ) -> C::Result |
460 | where |
461 | P: UnindexedProducer, |
462 | C: UnindexedConsumer<P::Item>, |
463 | { |
464 | if consumer.full() { |
465 | consumer.into_folder().complete() |
466 | } else if splitter.try_split(stolen:migrated) { |
467 | match producer.split() { |
468 | (left_producer: P, Some(right_producer: P)) => { |
469 | let (reducer: ::Item>>::Reducer, left_consumer: C, right_consumer: C) = |
470 | (consumer.to_reducer(), consumer.split_off_left(), consumer); |
471 | let bridge: fn bridge_unindexed_producer_consumer<…, …>(…) -> … = bridge_unindexed_producer_consumer; |
472 | let (left_result: ::Item>>::Result, right_result: ::Item>>::Result) = join_context( |
473 | |context| bridge(context.migrated(), splitter, left_producer, left_consumer), |
474 | |context: FnContext| bridge(context.migrated(), splitter, right_producer, right_consumer), |
475 | ); |
476 | reducer.reduce(left_result, right_result) |
477 | } |
478 | (producer: P, None) => producer.fold_with(consumer.into_folder()).complete(), |
479 | } |
480 | } else { |
481 | producer.fold_with(consumer.into_folder()).complete() |
482 | } |
483 | } |
484 | |