1use super::plumbing::*;
2use super::*;
3
4struct BlocksCallback<S, C> {
5 sizes: S,
6 consumer: C,
7 len: usize,
8}
9
10impl<T, S, C> ProducerCallback<T> for BlocksCallback<S, C>
11where
12 C: UnindexedConsumer<T>,
13 S: Iterator<Item = usize>,
14{
15 type Output = C::Result;
16
17 fn callback<P: Producer<Item = T>>(mut self, mut producer: P) -> Self::Output {
18 let mut remaining_len = self.len;
19 let mut consumer = self.consumer;
20
21 // we need a local variable for the accumulated results
22 // we call the reducer's identity by splitting at 0
23 let (left_consumer, right_consumer, _) = consumer.split_at(0);
24 let mut leftmost_res = left_consumer.into_folder().complete();
25 consumer = right_consumer;
26
27 // now we loop on each block size
28 while remaining_len > 0 && !consumer.full() {
29 // we compute the next block's size
30 let size = self.sizes.next().unwrap_or(std::usize::MAX);
31 let capped_size = remaining_len.min(size);
32 remaining_len -= capped_size;
33
34 // split the producer
35 let (left_producer, right_producer) = producer.split_at(capped_size);
36 producer = right_producer;
37
38 // split the consumer
39 let (left_consumer, right_consumer, _) = consumer.split_at(capped_size);
40 consumer = right_consumer;
41
42 leftmost_res = consumer.to_reducer().reduce(
43 leftmost_res,
44 bridge_producer_consumer(capped_size, left_producer, left_consumer),
45 );
46 }
47 leftmost_res
48 }
49}
50
51/// `ExponentialBlocks` is a parallel iterator that consumes itself as a sequence
52/// of parallel blocks of increasing sizes (exponentially).
53///
54/// This struct is created by the [`by_exponential_blocks()`] method on [`IndexedParallelIterator`]
55///
56/// [`by_exponential_blocks()`]: trait.IndexedParallelIterator.html#method.by_exponential_blocks
57/// [`IndexedParallelIterator`]: trait.IndexedParallelIterator.html
58#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
59#[derive(Debug, Clone)]
60pub struct ExponentialBlocks<I> {
61 base: I,
62}
63
64impl<I> ExponentialBlocks<I> {
65 pub(super) fn new(base: I) -> Self {
66 Self { base }
67 }
68}
69
70impl<I> ParallelIterator for ExponentialBlocks<I>
71where
72 I: IndexedParallelIterator,
73{
74 type Item = I::Item;
75
76 fn drive_unindexed<C>(self, consumer: C) -> C::Result
77 where
78 C: UnindexedConsumer<Self::Item>,
79 {
80 let first: usize = crate::current_num_threads();
81 let callback: BlocksCallback, …> = BlocksCallback {
82 consumer,
83 sizes: std::iter::successors(first:Some(first), succ:exponential_size),
84 len: self.base.len(),
85 };
86 self.base.with_producer(callback)
87 }
88}
89
90fn exponential_size(size: &usize) -> Option<usize> {
91 Some(size.saturating_mul(2))
92}
93
94/// `UniformBlocks` is a parallel iterator that consumes itself as a sequence
95/// of parallel blocks of constant sizes.
96///
97/// This struct is created by the [`by_uniform_blocks()`] method on [`IndexedParallelIterator`]
98///
99/// [`by_uniform_blocks()`]: trait.IndexedParallelIterator.html#method.by_uniform_blocks
100/// [`IndexedParallelIterator`]: trait.IndexedParallelIterator.html
101#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
102#[derive(Debug, Clone)]
103pub struct UniformBlocks<I> {
104 base: I,
105 block_size: usize,
106}
107
108impl<I> UniformBlocks<I> {
109 pub(super) fn new(base: I, block_size: usize) -> Self {
110 Self { base, block_size }
111 }
112}
113
114impl<I> ParallelIterator for UniformBlocks<I>
115where
116 I: IndexedParallelIterator,
117{
118 type Item = I::Item;
119
120 fn drive_unindexed<C>(self, consumer: C) -> C::Result
121 where
122 C: UnindexedConsumer<Self::Item>,
123 {
124 let callback: BlocksCallback, …> = BlocksCallback {
125 consumer,
126 sizes: std::iter::repeat(self.block_size),
127 len: self.base.len(),
128 };
129 self.base.with_producer(callback)
130 }
131}
132