1use super::plumbing::*;
2use super::*;
3
4/// This trait abstracts the different ways we can "unzip" one parallel
5/// iterator into two distinct consumers, which we can handle almost
6/// identically apart from how to process the individual items.
7trait UnzipOp<T>: Sync + Send {
8 /// The type of item expected by the left consumer.
9 type Left: Send;
10
11 /// The type of item expected by the right consumer.
12 type Right: Send;
13
14 /// Consumes one item and feeds it to one or both of the underlying folders.
15 fn consume<FA, FB>(&self, item: T, left: FA, right: FB) -> (FA, FB)
16 where
17 FA: Folder<Self::Left>,
18 FB: Folder<Self::Right>;
19
20 /// Reports whether this op may support indexed consumers.
21 /// - e.g. true for `unzip` where the item count passed through directly.
22 /// - e.g. false for `partition` where the sorting is not yet known.
23 fn indexable() -> bool {
24 false
25 }
26}
27
28/// Runs an unzip-like operation into default `ParallelExtend` collections.
29fn execute<I, OP, FromA, FromB>(pi: I, op: OP) -> (FromA, FromB)
30where
31 I: ParallelIterator,
32 OP: UnzipOp<I::Item>,
33 FromA: Default + Send + ParallelExtend<OP::Left>,
34 FromB: Default + Send + ParallelExtend<OP::Right>,
35{
36 let mut a = FromA::default();
37 let mut b = FromB::default();
38 execute_into(&mut a, &mut b, pi, op);
39 (a, b)
40}
41
42/// Runs an unzip-like operation into `ParallelExtend` collections.
43fn execute_into<I, OP, FromA, FromB>(a: &mut FromA, b: &mut FromB, pi: I, op: OP)
44where
45 I: ParallelIterator,
46 OP: UnzipOp<I::Item>,
47 FromA: Send + ParallelExtend<OP::Left>,
48 FromB: Send + ParallelExtend<OP::Right>,
49{
50 // We have no idea what the consumers will look like for these
51 // collections' `par_extend`, but we can intercept them in our own
52 // `drive_unindexed`. Start with the left side, type `A`:
53 let iter = UnzipA { base: pi, op, b };
54 a.par_extend(iter);
55}
56
57/// Unzips the items of a parallel iterator into a pair of arbitrary
58/// `ParallelExtend` containers.
59///
60/// This is called by `ParallelIterator::unzip`.
61pub(super) fn unzip<I, A, B, FromA, FromB>(pi: I) -> (FromA, FromB)
62where
63 I: ParallelIterator<Item = (A, B)>,
64 FromA: Default + Send + ParallelExtend<A>,
65 FromB: Default + Send + ParallelExtend<B>,
66 A: Send,
67 B: Send,
68{
69 execute(pi, Unzip)
70}
71
72/// Unzips an `IndexedParallelIterator` into two arbitrary `Consumer`s.
73///
74/// This is called by `super::collect::unzip_into_vecs`.
75pub(super) fn unzip_indexed<I, A, B, CA, CB>(pi: I, left: CA, right: CB) -> (CA::Result, CB::Result)
76where
77 I: IndexedParallelIterator<Item = (A, B)>,
78 CA: Consumer<A>,
79 CB: Consumer<B>,
80 A: Send,
81 B: Send,
82{
83 let consumer = UnzipConsumer {
84 op: &Unzip,
85 left,
86 right,
87 };
88 pi.drive(consumer)
89}
90
91/// An `UnzipOp` that splits a tuple directly into the two consumers.
92struct Unzip;
93
94impl<A: Send, B: Send> UnzipOp<(A, B)> for Unzip {
95 type Left = A;
96 type Right = B;
97
98 fn consume<FA, FB>(&self, item: (A, B), left: FA, right: FB) -> (FA, FB)
99 where
100 FA: Folder<A>,
101 FB: Folder<B>,
102 {
103 (left.consume(item.0), right.consume(item.1))
104 }
105
106 fn indexable() -> bool {
107 true
108 }
109}
110
111/// Partitions the items of a parallel iterator into a pair of arbitrary
112/// `ParallelExtend` containers.
113///
114/// This is called by `ParallelIterator::partition`.
115pub(super) fn partition<I, A, B, P>(pi: I, predicate: P) -> (A, B)
116where
117 I: ParallelIterator,
118 A: Default + Send + ParallelExtend<I::Item>,
119 B: Default + Send + ParallelExtend<I::Item>,
120 P: Fn(&I::Item) -> bool + Sync + Send,
121{
122 execute(pi, Partition { predicate })
123}
124
125/// An `UnzipOp` that routes items depending on a predicate function.
126struct Partition<P> {
127 predicate: P,
128}
129
130impl<P, T> UnzipOp<T> for Partition<P>
131where
132 P: Fn(&T) -> bool + Sync + Send,
133 T: Send,
134{
135 type Left = T;
136 type Right = T;
137
138 fn consume<FA, FB>(&self, item: T, left: FA, right: FB) -> (FA, FB)
139 where
140 FA: Folder<T>,
141 FB: Folder<T>,
142 {
143 if (self.predicate)(&item) {
144 (left.consume(item), right)
145 } else {
146 (left, right.consume(item))
147 }
148 }
149}
150
151/// Partitions and maps the items of a parallel iterator into a pair of
152/// arbitrary `ParallelExtend` containers.
153///
154/// This called by `ParallelIterator::partition_map`.
155pub(super) fn partition_map<I, A, B, P, L, R>(pi: I, predicate: P) -> (A, B)
156where
157 I: ParallelIterator,
158 A: Default + Send + ParallelExtend<L>,
159 B: Default + Send + ParallelExtend<R>,
160 P: Fn(I::Item) -> Either<L, R> + Sync + Send,
161 L: Send,
162 R: Send,
163{
164 execute(pi, PartitionMap { predicate })
165}
166
167/// An `UnzipOp` that routes items depending on how they are mapped `Either`.
168struct PartitionMap<P> {
169 predicate: P,
170}
171
172impl<P, L, R, T> UnzipOp<T> for PartitionMap<P>
173where
174 P: Fn(T) -> Either<L, R> + Sync + Send,
175 L: Send,
176 R: Send,
177{
178 type Left = L;
179 type Right = R;
180
181 fn consume<FA, FB>(&self, item: T, left: FA, right: FB) -> (FA, FB)
182 where
183 FA: Folder<L>,
184 FB: Folder<R>,
185 {
186 match (self.predicate)(item) {
187 Either::Left(item) => (left.consume(item), right),
188 Either::Right(item) => (left, right.consume(item)),
189 }
190 }
191}
192
193/// A fake iterator to intercept the `Consumer` for type `A`.
194struct UnzipA<'b, I, OP, FromB> {
195 base: I,
196 op: OP,
197 b: &'b mut FromB,
198}
199
200impl<'b, I, OP, FromB> ParallelIterator for UnzipA<'b, I, OP, FromB>
201where
202 I: ParallelIterator,
203 OP: UnzipOp<I::Item>,
204 FromB: Send + ParallelExtend<OP::Right>,
205{
206 type Item = OP::Left;
207
208 fn drive_unindexed<C>(self, consumer: C) -> C::Result
209 where
210 C: UnindexedConsumer<Self::Item>,
211 {
212 let mut result = None;
213 {
214 // Now it's time to find the consumer for type `B`
215 let iter = UnzipB {
216 base: self.base,
217 op: self.op,
218 left_consumer: consumer,
219 left_result: &mut result,
220 };
221 self.b.par_extend(iter);
222 }
223 // NB: If for some reason `b.par_extend` doesn't actually drive the
224 // iterator, then we won't have a result for the left side to return
225 // at all. We can't fake an arbitrary consumer's result, so panic.
226 result.expect("unzip consumers didn't execute!")
227 }
228
229 fn opt_len(&self) -> Option<usize> {
230 if OP::indexable() {
231 self.base.opt_len()
232 } else {
233 None
234 }
235 }
236}
237
238/// A fake iterator to intercept the `Consumer` for type `B`.
239struct UnzipB<'r, I, OP, CA>
240where
241 I: ParallelIterator,
242 OP: UnzipOp<I::Item>,
243 CA: UnindexedConsumer<OP::Left>,
244 CA::Result: 'r,
245{
246 base: I,
247 op: OP,
248 left_consumer: CA,
249 left_result: &'r mut Option<CA::Result>,
250}
251
252impl<'r, I, OP, CA> ParallelIterator for UnzipB<'r, I, OP, CA>
253where
254 I: ParallelIterator,
255 OP: UnzipOp<I::Item>,
256 CA: UnindexedConsumer<OP::Left>,
257{
258 type Item = OP::Right;
259
260 fn drive_unindexed<C>(self, consumer: C) -> C::Result
261 where
262 C: UnindexedConsumer<Self::Item>,
263 {
264 // Now that we have two consumers, we can unzip the real iterator.
265 let consumer = UnzipConsumer {
266 op: &self.op,
267 left: self.left_consumer,
268 right: consumer,
269 };
270
271 let result = self.base.drive_unindexed(consumer);
272 *self.left_result = Some(result.0);
273 result.1
274 }
275
276 fn opt_len(&self) -> Option<usize> {
277 if OP::indexable() {
278 self.base.opt_len()
279 } else {
280 None
281 }
282 }
283}
284
285/// `Consumer` that unzips into two other `Consumer`s
286struct UnzipConsumer<'a, OP, CA, CB> {
287 op: &'a OP,
288 left: CA,
289 right: CB,
290}
291
292impl<'a, T, OP, CA, CB> Consumer<T> for UnzipConsumer<'a, OP, CA, CB>
293where
294 OP: UnzipOp<T>,
295 CA: Consumer<OP::Left>,
296 CB: Consumer<OP::Right>,
297{
298 type Folder = UnzipFolder<'a, OP, CA::Folder, CB::Folder>;
299 type Reducer = UnzipReducer<CA::Reducer, CB::Reducer>;
300 type Result = (CA::Result, CB::Result);
301
302 fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
303 let (left1, left2, left_reducer) = self.left.split_at(index);
304 let (right1, right2, right_reducer) = self.right.split_at(index);
305
306 (
307 UnzipConsumer {
308 op: self.op,
309 left: left1,
310 right: right1,
311 },
312 UnzipConsumer {
313 op: self.op,
314 left: left2,
315 right: right2,
316 },
317 UnzipReducer {
318 left: left_reducer,
319 right: right_reducer,
320 },
321 )
322 }
323
324 fn into_folder(self) -> Self::Folder {
325 UnzipFolder {
326 op: self.op,
327 left: self.left.into_folder(),
328 right: self.right.into_folder(),
329 }
330 }
331
332 fn full(&self) -> bool {
333 // don't stop until everyone is full
334 self.left.full() && self.right.full()
335 }
336}
337
338impl<'a, T, OP, CA, CB> UnindexedConsumer<T> for UnzipConsumer<'a, OP, CA, CB>
339where
340 OP: UnzipOp<T>,
341 CA: UnindexedConsumer<OP::Left>,
342 CB: UnindexedConsumer<OP::Right>,
343{
344 fn split_off_left(&self) -> Self {
345 UnzipConsumer {
346 op: self.op,
347 left: self.left.split_off_left(),
348 right: self.right.split_off_left(),
349 }
350 }
351
352 fn to_reducer(&self) -> Self::Reducer {
353 UnzipReducer {
354 left: self.left.to_reducer(),
355 right: self.right.to_reducer(),
356 }
357 }
358}
359
360/// `Folder` that unzips into two other `Folder`s
361struct UnzipFolder<'a, OP, FA, FB> {
362 op: &'a OP,
363 left: FA,
364 right: FB,
365}
366
367impl<'a, T, OP, FA, FB> Folder<T> for UnzipFolder<'a, OP, FA, FB>
368where
369 OP: UnzipOp<T>,
370 FA: Folder<OP::Left>,
371 FB: Folder<OP::Right>,
372{
373 type Result = (FA::Result, FB::Result);
374
375 fn consume(self, item: T) -> Self {
376 let (left, right) = self.op.consume(item, self.left, self.right);
377 UnzipFolder {
378 op: self.op,
379 left,
380 right,
381 }
382 }
383
384 fn complete(self) -> Self::Result {
385 (self.left.complete(), self.right.complete())
386 }
387
388 fn full(&self) -> bool {
389 // don't stop until everyone is full
390 self.left.full() && self.right.full()
391 }
392}
393
394/// `Reducer` that unzips into two other `Reducer`s
395struct UnzipReducer<RA, RB> {
396 left: RA,
397 right: RB,
398}
399
400impl<A, B, RA, RB> Reducer<(A, B)> for UnzipReducer<RA, RB>
401where
402 RA: Reducer<A>,
403 RB: Reducer<B>,
404{
405 fn reduce(self, left: (A, B), right: (A, B)) -> (A, B) {
406 (
407 self.left.reduce(left.0, right.0),
408 self.right.reduce(left.1, right.1),
409 )
410 }
411}
412
413impl<A, B, FromA, FromB> ParallelExtend<(A, B)> for (FromA, FromB)
414where
415 A: Send,
416 B: Send,
417 FromA: Send + ParallelExtend<A>,
418 FromB: Send + ParallelExtend<B>,
419{
420 fn par_extend<I>(&mut self, pi: I)
421 where
422 I: IntoParallelIterator<Item = (A, B)>,
423 {
424 execute_into(&mut self.0, &mut self.1, pi.into_par_iter(), Unzip);
425 }
426}
427
428impl<L, R, A, B> ParallelExtend<Either<L, R>> for (A, B)
429where
430 L: Send,
431 R: Send,
432 A: Send + ParallelExtend<L>,
433 B: Send + ParallelExtend<R>,
434{
435 fn par_extend<I>(&mut self, pi: I)
436 where
437 I: IntoParallelIterator<Item = Either<L, R>>,
438 {
439 execute_into(&mut self.0, &mut self.1, pi.into_par_iter(), UnEither);
440 }
441}
442
443/// An `UnzipOp` that routes items depending on their `Either` variant.
444struct UnEither;
445
446impl<L, R> UnzipOp<Either<L, R>> for UnEither
447where
448 L: Send,
449 R: Send,
450{
451 type Left = L;
452 type Right = R;
453
454 fn consume<FL, FR>(&self, item: Either<L, R>, left: FL, right: FR) -> (FL, FR)
455 where
456 FL: Folder<L>,
457 FR: Folder<R>,
458 {
459 match item {
460 Either::Left(item) => (left.consume(item), right),
461 Either::Right(item) => (left, right.consume(item)),
462 }
463 }
464}
465
466impl<A, B, FromA, FromB> FromParallelIterator<(A, B)> for (FromA, FromB)
467where
468 A: Send,
469 B: Send,
470 FromA: Send + FromParallelIterator<A>,
471 FromB: Send + FromParallelIterator<B>,
472{
473 fn from_par_iter<I>(pi: I) -> Self
474 where
475 I: IntoParallelIterator<Item = (A, B)>,
476 {
477 let (a, b): (Collector<FromA>, Collector<FromB>) = pi.into_par_iter().unzip();
478 (a.result.unwrap(), b.result.unwrap())
479 }
480}
481
482impl<L, R, A, B> FromParallelIterator<Either<L, R>> for (A, B)
483where
484 L: Send,
485 R: Send,
486 A: Send + FromParallelIterator<L>,
487 B: Send + FromParallelIterator<R>,
488{
489 fn from_par_iter<I>(pi: I) -> Self
490 where
491 I: IntoParallelIterator<Item = Either<L, R>>,
492 {
493 fn identity<T>(x: T) -> T {
494 x
495 }
496
497 let (a, b): (Collector<A>, Collector<B>) = pi.into_par_iter().partition_map(identity);
498 (a.result.unwrap(), b.result.unwrap())
499 }
500}
501
502/// Shim to implement a one-time `ParallelExtend` using `FromParallelIterator`.
503struct Collector<FromT> {
504 result: Option<FromT>,
505}
506
507impl<FromT> Default for Collector<FromT> {
508 fn default() -> Self {
509 Collector { result: None }
510 }
511}
512
513impl<T, FromT> ParallelExtend<T> for Collector<FromT>
514where
515 T: Send,
516 FromT: Send + FromParallelIterator<T>,
517{
518 fn par_extend<I>(&mut self, pi: I)
519 where
520 I: IntoParallelIterator<Item = T>,
521 {
522 debug_assert!(self.result.is_none());
523 self.result = Some(pi.into_par_iter().collect());
524 }
525}
526