| 1 | use super::plumbing::*; |
| 2 | use super::*; |
| 3 | use rayon_core::join; |
| 4 | use std::cmp; |
| 5 | use std::iter; |
| 6 | |
| 7 | /// `Chain` is an iterator that joins `b` after `a` in one continuous iterator. |
| 8 | /// This struct is created by the [`chain()`] method on [`ParallelIterator`] |
| 9 | /// |
| 10 | /// [`chain()`]: trait.ParallelIterator.html#method.chain |
| 11 | /// [`ParallelIterator`]: trait.ParallelIterator.html |
| 12 | #[must_use = "iterator adaptors are lazy and do nothing unless consumed" ] |
| 13 | #[derive (Debug, Clone)] |
| 14 | pub struct Chain<A, B> |
| 15 | where |
| 16 | A: ParallelIterator, |
| 17 | B: ParallelIterator<Item = A::Item>, |
| 18 | { |
| 19 | a: A, |
| 20 | b: B, |
| 21 | } |
| 22 | |
| 23 | impl<A, B> Chain<A, B> |
| 24 | where |
| 25 | A: ParallelIterator, |
| 26 | B: ParallelIterator<Item = A::Item>, |
| 27 | { |
| 28 | /// Creates a new `Chain` iterator. |
| 29 | pub(super) fn new(a: A, b: B) -> Self { |
| 30 | Chain { a, b } |
| 31 | } |
| 32 | } |
| 33 | |
| 34 | impl<A, B> ParallelIterator for Chain<A, B> |
| 35 | where |
| 36 | A: ParallelIterator, |
| 37 | B: ParallelIterator<Item = A::Item>, |
| 38 | { |
| 39 | type Item = A::Item; |
| 40 | |
| 41 | fn drive_unindexed<C>(self, consumer: C) -> C::Result |
| 42 | where |
| 43 | C: UnindexedConsumer<Self::Item>, |
| 44 | { |
| 45 | let Chain { a, b } = self; |
| 46 | |
| 47 | // If we returned a value from our own `opt_len`, then the collect consumer in particular |
| 48 | // will balk at being treated like an actual `UnindexedConsumer`. But when we do know the |
| 49 | // length, we can use `Consumer::split_at` instead, and this is still harmless for other |
| 50 | // truly-unindexed consumers too. |
| 51 | let (left, right, reducer) = if let Some(len) = a.opt_len() { |
| 52 | consumer.split_at(len) |
| 53 | } else { |
| 54 | let reducer = consumer.to_reducer(); |
| 55 | (consumer.split_off_left(), consumer, reducer) |
| 56 | }; |
| 57 | |
| 58 | let (a, b) = join(|| a.drive_unindexed(left), || b.drive_unindexed(right)); |
| 59 | reducer.reduce(a, b) |
| 60 | } |
| 61 | |
| 62 | fn opt_len(&self) -> Option<usize> { |
| 63 | self.a.opt_len()?.checked_add(self.b.opt_len()?) |
| 64 | } |
| 65 | } |
| 66 | |
| 67 | impl<A, B> IndexedParallelIterator for Chain<A, B> |
| 68 | where |
| 69 | A: IndexedParallelIterator, |
| 70 | B: IndexedParallelIterator<Item = A::Item>, |
| 71 | { |
| 72 | fn drive<C>(self, consumer: C) -> C::Result |
| 73 | where |
| 74 | C: Consumer<Self::Item>, |
| 75 | { |
| 76 | let Chain { a, b } = self; |
| 77 | let (left, right, reducer) = consumer.split_at(a.len()); |
| 78 | let (a, b) = join(|| a.drive(left), || b.drive(right)); |
| 79 | reducer.reduce(a, b) |
| 80 | } |
| 81 | |
| 82 | fn len(&self) -> usize { |
| 83 | self.a.len().checked_add(self.b.len()).expect("overflow" ) |
| 84 | } |
| 85 | |
| 86 | fn with_producer<CB>(self, callback: CB) -> CB::Output |
| 87 | where |
| 88 | CB: ProducerCallback<Self::Item>, |
| 89 | { |
| 90 | let a_len = self.a.len(); |
| 91 | return self.a.with_producer(CallbackA { |
| 92 | callback, |
| 93 | a_len, |
| 94 | b: self.b, |
| 95 | }); |
| 96 | |
| 97 | struct CallbackA<CB, B> { |
| 98 | callback: CB, |
| 99 | a_len: usize, |
| 100 | b: B, |
| 101 | } |
| 102 | |
| 103 | impl<CB, B> ProducerCallback<B::Item> for CallbackA<CB, B> |
| 104 | where |
| 105 | B: IndexedParallelIterator, |
| 106 | CB: ProducerCallback<B::Item>, |
| 107 | { |
| 108 | type Output = CB::Output; |
| 109 | |
| 110 | fn callback<A>(self, a_producer: A) -> Self::Output |
| 111 | where |
| 112 | A: Producer<Item = B::Item>, |
| 113 | { |
| 114 | self.b.with_producer(CallbackB { |
| 115 | callback: self.callback, |
| 116 | a_len: self.a_len, |
| 117 | a_producer, |
| 118 | }) |
| 119 | } |
| 120 | } |
| 121 | |
| 122 | struct CallbackB<CB, A> { |
| 123 | callback: CB, |
| 124 | a_len: usize, |
| 125 | a_producer: A, |
| 126 | } |
| 127 | |
| 128 | impl<CB, A> ProducerCallback<A::Item> for CallbackB<CB, A> |
| 129 | where |
| 130 | A: Producer, |
| 131 | CB: ProducerCallback<A::Item>, |
| 132 | { |
| 133 | type Output = CB::Output; |
| 134 | |
| 135 | fn callback<B>(self, b_producer: B) -> Self::Output |
| 136 | where |
| 137 | B: Producer<Item = A::Item>, |
| 138 | { |
| 139 | let producer = ChainProducer::new(self.a_len, self.a_producer, b_producer); |
| 140 | self.callback.callback(producer) |
| 141 | } |
| 142 | } |
| 143 | } |
| 144 | } |
| 145 | |
| 146 | /// //////////////////////////////////////////////////////////////////////// |
| 147 | |
| 148 | struct ChainProducer<A, B> |
| 149 | where |
| 150 | A: Producer, |
| 151 | B: Producer<Item = A::Item>, |
| 152 | { |
| 153 | a_len: usize, |
| 154 | a: A, |
| 155 | b: B, |
| 156 | } |
| 157 | |
| 158 | impl<A, B> ChainProducer<A, B> |
| 159 | where |
| 160 | A: Producer, |
| 161 | B: Producer<Item = A::Item>, |
| 162 | { |
| 163 | fn new(a_len: usize, a: A, b: B) -> Self { |
| 164 | ChainProducer { a_len, a, b } |
| 165 | } |
| 166 | } |
| 167 | |
| 168 | impl<A, B> Producer for ChainProducer<A, B> |
| 169 | where |
| 170 | A: Producer, |
| 171 | B: Producer<Item = A::Item>, |
| 172 | { |
| 173 | type Item = A::Item; |
| 174 | type IntoIter = ChainSeq<A::IntoIter, B::IntoIter>; |
| 175 | |
| 176 | fn into_iter(self) -> Self::IntoIter { |
| 177 | ChainSeq::new(self.a.into_iter(), self.b.into_iter()) |
| 178 | } |
| 179 | |
| 180 | fn min_len(&self) -> usize { |
| 181 | cmp::max(self.a.min_len(), self.b.min_len()) |
| 182 | } |
| 183 | |
| 184 | fn max_len(&self) -> usize { |
| 185 | cmp::min(self.a.max_len(), self.b.max_len()) |
| 186 | } |
| 187 | |
| 188 | fn split_at(self, index: usize) -> (Self, Self) { |
| 189 | if index <= self.a_len { |
| 190 | let a_rem = self.a_len - index; |
| 191 | let (a_left, a_right) = self.a.split_at(index); |
| 192 | let (b_left, b_right) = self.b.split_at(0); |
| 193 | ( |
| 194 | ChainProducer::new(index, a_left, b_left), |
| 195 | ChainProducer::new(a_rem, a_right, b_right), |
| 196 | ) |
| 197 | } else { |
| 198 | let (a_left, a_right) = self.a.split_at(self.a_len); |
| 199 | let (b_left, b_right) = self.b.split_at(index - self.a_len); |
| 200 | ( |
| 201 | ChainProducer::new(self.a_len, a_left, b_left), |
| 202 | ChainProducer::new(0, a_right, b_right), |
| 203 | ) |
| 204 | } |
| 205 | } |
| 206 | |
| 207 | fn fold_with<F>(self, mut folder: F) -> F |
| 208 | where |
| 209 | F: Folder<A::Item>, |
| 210 | { |
| 211 | folder = self.a.fold_with(folder); |
| 212 | if folder.full() { |
| 213 | folder |
| 214 | } else { |
| 215 | self.b.fold_with(folder) |
| 216 | } |
| 217 | } |
| 218 | } |
| 219 | |
/// Sequential counterpart of the chain: a thin wrapper around
/// [`iter::Chain`] that additionally implements `ExactSizeIterator`.
///
/// Every iteration method with a specialised implementation in
/// `iter::Chain` that can be overridden here (`nth`, `count`, `last`,
/// `fold`, `rfold`) is forwarded, so wrapping does not silently fall back to
/// the element-by-element default implementations.
struct ChainSeq<A, B> {
    chain: iter::Chain<A, B>,
}

impl<A, B> ChainSeq<A, B> {
    /// Chains `b` after `a`. Both must be exact-size iterators over the same
    /// item type.
    fn new(a: A, b: B) -> ChainSeq<A, B>
    where
        A: ExactSizeIterator,
        B: ExactSizeIterator<Item = A::Item>,
    {
        ChainSeq { chain: a.chain(b) }
    }
}

impl<A, B> Iterator for ChainSeq<A, B>
where
    A: Iterator,
    B: Iterator<Item = A::Item>,
{
    type Item = A::Item;

    fn next(&mut self) -> Option<Self::Item> {
        self.chain.next()
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        self.chain.size_hint()
    }

    // The methods below only delegate; they exist so that the inner chain's
    // (and the wrapped iterators') optimised implementations are used instead
    // of the defaults built on repeated `next` calls.

    fn nth(&mut self, n: usize) -> Option<Self::Item> {
        self.chain.nth(n)
    }

    fn count(self) -> usize {
        self.chain.count()
    }

    fn last(self) -> Option<Self::Item> {
        self.chain.last()
    }

    fn fold<Acc, F>(self, init: Acc, f: F) -> Acc
    where
        F: FnMut(Acc, Self::Item) -> Acc,
    {
        self.chain.fold(init, f)
    }
}

impl<A, B> ExactSizeIterator for ChainSeq<A, B>
where
    A: ExactSizeIterator,
    B: ExactSizeIterator<Item = A::Item>,
{
}

impl<A, B> DoubleEndedIterator for ChainSeq<A, B>
where
    A: DoubleEndedIterator,
    B: DoubleEndedIterator<Item = A::Item>,
{
    fn next_back(&mut self) -> Option<Self::Item> {
        self.chain.next_back()
    }

    // Delegates for the same reason as `fold` above.
    fn rfold<Acc, F>(self, init: Acc, f: F) -> Acc
    where
        F: FnMut(Acc, Self::Item) -> Acc,
    {
        self.chain.rfold(init, f)
    }
}
| 269 | |