1use super::plumbing::*;
2use super::*;
3
4/// `Flatten` turns each element to a parallel iterator, then flattens these iterators
5/// together. This struct is created by the [`flatten()`] method on [`ParallelIterator`].
6///
7/// [`flatten()`]: trait.ParallelIterator.html#method.flatten
8/// [`ParallelIterator`]: trait.ParallelIterator.html
9#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
10#[derive(Debug, Clone)]
11pub struct Flatten<I: ParallelIterator> {
12 base: I,
13}
14
15impl<I> Flatten<I>
16where
17 I: ParallelIterator,
18 I::Item: IntoParallelIterator,
19{
20 /// Creates a new `Flatten` iterator.
21 pub(super) fn new(base: I) -> Self {
22 Flatten { base }
23 }
24}
25
26impl<I> ParallelIterator for Flatten<I>
27where
28 I: ParallelIterator,
29 I::Item: IntoParallelIterator,
30{
31 type Item = <I::Item as IntoParallelIterator>::Item;
32
33 fn drive_unindexed<C>(self, consumer: C) -> C::Result
34 where
35 C: UnindexedConsumer<Self::Item>,
36 {
37 let consumer: FlattenConsumer = FlattenConsumer::new(base:consumer);
38 self.base.drive_unindexed(consumer)
39 }
40}
41
42/// ////////////////////////////////////////////////////////////////////////
43/// Consumer implementation
44
45struct FlattenConsumer<C> {
46 base: C,
47}
48
49impl<C> FlattenConsumer<C> {
50 fn new(base: C) -> Self {
51 FlattenConsumer { base }
52 }
53}
54
55impl<T, C> Consumer<T> for FlattenConsumer<C>
56where
57 C: UnindexedConsumer<T::Item>,
58 T: IntoParallelIterator,
59{
60 type Folder = FlattenFolder<C, C::Result>;
61 type Reducer = C::Reducer;
62 type Result = C::Result;
63
64 fn split_at(self, index: usize) -> (Self, Self, C::Reducer) {
65 let (left, right, reducer) = self.base.split_at(index);
66 (
67 FlattenConsumer::new(left),
68 FlattenConsumer::new(right),
69 reducer,
70 )
71 }
72
73 fn into_folder(self) -> Self::Folder {
74 FlattenFolder {
75 base: self.base,
76 previous: None,
77 }
78 }
79
80 fn full(&self) -> bool {
81 self.base.full()
82 }
83}
84
85impl<T, C> UnindexedConsumer<T> for FlattenConsumer<C>
86where
87 C: UnindexedConsumer<T::Item>,
88 T: IntoParallelIterator,
89{
90 fn split_off_left(&self) -> Self {
91 FlattenConsumer::new(self.base.split_off_left())
92 }
93
94 fn to_reducer(&self) -> Self::Reducer {
95 self.base.to_reducer()
96 }
97}
98
99struct FlattenFolder<C, R> {
100 base: C,
101 previous: Option<R>,
102}
103
104impl<T, C> Folder<T> for FlattenFolder<C, C::Result>
105where
106 C: UnindexedConsumer<T::Item>,
107 T: IntoParallelIterator,
108{
109 type Result = C::Result;
110
111 fn consume(self, item: T) -> Self {
112 let par_iter = item.into_par_iter();
113 let consumer = self.base.split_off_left();
114 let result = par_iter.drive_unindexed(consumer);
115
116 let previous = match self.previous {
117 None => Some(result),
118 Some(previous) => {
119 let reducer = self.base.to_reducer();
120 Some(reducer.reduce(previous, result))
121 }
122 };
123
124 FlattenFolder {
125 base: self.base,
126 previous,
127 }
128 }
129
130 fn complete(self) -> Self::Result {
131 match self.previous {
132 Some(previous) => previous,
133 None => self.base.into_folder().complete(),
134 }
135 }
136
137 fn full(&self) -> bool {
138 self.base.full()
139 }
140}
141