1 | use super::plumbing::*; |
2 | use super::*; |
3 | |
/// State handed to the base iterator's `with_producer` so that its producer
/// can be consumed one block at a time.
struct BlocksCallback<S, C> {
    /// Iterator yielding the requested size of each successive block.
    sizes: S,
    /// Consumer that receives every item of the base iterator.
    consumer: C,
    /// Total number of items the base iterator will produce.
    len: usize,
}
9 | |
10 | impl<T, S, C> ProducerCallback<T> for BlocksCallback<S, C> |
11 | where |
12 | C: UnindexedConsumer<T>, |
13 | S: Iterator<Item = usize>, |
14 | { |
15 | type Output = C::Result; |
16 | |
17 | fn callback<P: Producer<Item = T>>(mut self, mut producer: P) -> Self::Output { |
18 | let mut remaining_len = self.len; |
19 | let mut consumer = self.consumer; |
20 | |
21 | // we need a local variable for the accumulated results |
22 | // we call the reducer's identity by splitting at 0 |
23 | let (left_consumer, right_consumer, _) = consumer.split_at(0); |
24 | let mut leftmost_res = left_consumer.into_folder().complete(); |
25 | consumer = right_consumer; |
26 | |
27 | // now we loop on each block size |
28 | while remaining_len > 0 && !consumer.full() { |
29 | // we compute the next block's size |
30 | let size = self.sizes.next().unwrap_or(std::usize::MAX); |
31 | let capped_size = remaining_len.min(size); |
32 | remaining_len -= capped_size; |
33 | |
34 | // split the producer |
35 | let (left_producer, right_producer) = producer.split_at(capped_size); |
36 | producer = right_producer; |
37 | |
38 | // split the consumer |
39 | let (left_consumer, right_consumer, _) = consumer.split_at(capped_size); |
40 | consumer = right_consumer; |
41 | |
42 | leftmost_res = consumer.to_reducer().reduce( |
43 | leftmost_res, |
44 | bridge_producer_consumer(capped_size, left_producer, left_consumer), |
45 | ); |
46 | } |
47 | leftmost_res |
48 | } |
49 | } |
50 | |
/// `ExponentialBlocks` is a parallel iterator that consumes its base iterator
/// as a sequence of parallel blocks whose sizes grow exponentially.
///
/// This struct is created by the [`by_exponential_blocks()`] method on
/// [`IndexedParallelIterator`].
///
/// [`by_exponential_blocks()`]: trait.IndexedParallelIterator.html#method.by_exponential_blocks
/// [`IndexedParallelIterator`]: trait.IndexedParallelIterator.html
#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
#[derive(Debug, Clone)]
pub struct ExponentialBlocks<I> {
    /// The wrapped iterator whose items are handed out block by block.
    base: I,
}
63 | |
64 | impl<I> ExponentialBlocks<I> { |
65 | pub(super) fn new(base: I) -> Self { |
66 | Self { base } |
67 | } |
68 | } |
69 | |
70 | impl<I> ParallelIterator for ExponentialBlocks<I> |
71 | where |
72 | I: IndexedParallelIterator, |
73 | { |
74 | type Item = I::Item; |
75 | |
76 | fn drive_unindexed<C>(self, consumer: C) -> C::Result |
77 | where |
78 | C: UnindexedConsumer<Self::Item>, |
79 | { |
80 | let first: usize = crate::current_num_threads(); |
81 | let callback: BlocksCallback, …> = BlocksCallback { |
82 | consumer, |
83 | sizes: std::iter::successors(first:Some(first), succ:exponential_size), |
84 | len: self.base.len(), |
85 | }; |
86 | self.base.with_producer(callback) |
87 | } |
88 | } |
89 | |
/// Returns the size of the block following one of `size` items: twice as
/// large, saturating at `usize::MAX` instead of overflowing.
///
/// Always returns `Some`, so a sequence built from it never ends.
fn exponential_size(size: &usize) -> Option<usize> {
    let doubled = size.saturating_mul(2);
    Some(doubled)
}
93 | |
/// `UniformBlocks` is a parallel iterator that consumes its base iterator as a
/// sequence of parallel blocks of constant size.
///
/// This struct is created by the [`by_uniform_blocks()`] method on
/// [`IndexedParallelIterator`].
///
/// [`by_uniform_blocks()`]: trait.IndexedParallelIterator.html#method.by_uniform_blocks
/// [`IndexedParallelIterator`]: trait.IndexedParallelIterator.html
#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
#[derive(Debug, Clone)]
pub struct UniformBlocks<I> {
    /// The wrapped iterator whose items are handed out block by block.
    base: I,
    /// Number of items requested for every block.
    block_size: usize,
}
107 | |
108 | impl<I> UniformBlocks<I> { |
109 | pub(super) fn new(base: I, block_size: usize) -> Self { |
110 | Self { base, block_size } |
111 | } |
112 | } |
113 | |
114 | impl<I> ParallelIterator for UniformBlocks<I> |
115 | where |
116 | I: IndexedParallelIterator, |
117 | { |
118 | type Item = I::Item; |
119 | |
120 | fn drive_unindexed<C>(self, consumer: C) -> C::Result |
121 | where |
122 | C: UnindexedConsumer<Self::Item>, |
123 | { |
124 | let callback: BlocksCallback, …> = BlocksCallback { |
125 | consumer, |
126 | sizes: std::iter::repeat(self.block_size), |
127 | len: self.base.len(), |
128 | }; |
129 | self.base.with_producer(callback) |
130 | } |
131 | } |
132 | |