| 1 | use super::plumbing::*; | 
| 2 | use super::*; | 
| 3 |  | 
| 4 | /// This trait abstracts the different ways we can "unzip" one parallel | 
| 5 | /// iterator into two distinct consumers, which we can handle almost | 
| 6 | /// identically apart from how to process the individual items. | 
| 7 | trait UnzipOp<T>: Sync + Send { | 
| 8 |     /// The type of item expected by the left consumer. | 
| 9 |     type Left: Send; | 
| 10 |  | 
| 11 |     /// The type of item expected by the right consumer. | 
| 12 |     type Right: Send; | 
| 13 |  | 
| 14 |     /// Consumes one item and feeds it to one or both of the underlying folders. | 
| 15 |     fn consume<FA, FB>(&self, item: T, left: FA, right: FB) -> (FA, FB) | 
| 16 |     where | 
| 17 |         FA: Folder<Self::Left>, | 
| 18 |         FB: Folder<Self::Right>; | 
| 19 |  | 
| 20 |     /// Reports whether this op may support indexed consumers. | 
| 21 |     /// - e.g. true for `unzip` where the item count passed through directly. | 
| 22 |     /// - e.g. false for `partition` where the sorting is not yet known. | 
| 23 |     fn indexable() -> bool { | 
| 24 |         false | 
| 25 |     } | 
| 26 | } | 
| 27 |  | 
| 28 | /// Runs an unzip-like operation into default `ParallelExtend` collections. | 
| 29 | fn execute<I, OP, FromA, FromB>(pi: I, op: OP) -> (FromA, FromB) | 
| 30 | where | 
| 31 |     I: ParallelIterator, | 
| 32 |     OP: UnzipOp<I::Item>, | 
| 33 |     FromA: Default + Send + ParallelExtend<OP::Left>, | 
| 34 |     FromB: Default + Send + ParallelExtend<OP::Right>, | 
| 35 | { | 
| 36 |     let mut a: FromA = FromA::default(); | 
| 37 |     let mut b: FromB = FromB::default(); | 
| 38 |     execute_into(&mut a, &mut b, pi, op); | 
| 39 |     (a, b) | 
| 40 | } | 
| 41 |  | 
| 42 | /// Runs an unzip-like operation into `ParallelExtend` collections. | 
| 43 | fn execute_into<I, OP, FromA, FromB>(a: &mut FromA, b: &mut FromB, pi: I, op: OP) | 
| 44 | where | 
| 45 |     I: ParallelIterator, | 
| 46 |     OP: UnzipOp<I::Item>, | 
| 47 |     FromA: Send + ParallelExtend<OP::Left>, | 
| 48 |     FromB: Send + ParallelExtend<OP::Right>, | 
| 49 | { | 
| 50 |     // We have no idea what the consumers will look like for these | 
| 51 |     // collections' `par_extend`, but we can intercept them in our own | 
| 52 |     // `drive_unindexed`.  Start with the left side, type `A`: | 
| 53 |     let iter: UnzipA<'_, I, OP, FromB> = UnzipA { base: pi, op, b }; | 
| 54 |     a.par_extend(iter); | 
| 55 | } | 
| 56 |  | 
| 57 | /// Unzips the items of a parallel iterator into a pair of arbitrary | 
| 58 | /// `ParallelExtend` containers. | 
| 59 | /// | 
| 60 | /// This is called by `ParallelIterator::unzip`. | 
| 61 | pub(super) fn unzip<I, A, B, FromA, FromB>(pi: I) -> (FromA, FromB) | 
| 62 | where | 
| 63 |     I: ParallelIterator<Item = (A, B)>, | 
| 64 |     FromA: Default + Send + ParallelExtend<A>, | 
| 65 |     FromB: Default + Send + ParallelExtend<B>, | 
| 66 |     A: Send, | 
| 67 |     B: Send, | 
| 68 | { | 
| 69 |     execute(pi, op:Unzip) | 
| 70 | } | 
| 71 |  | 
| 72 | /// Unzips an `IndexedParallelIterator` into two arbitrary `Consumer`s. | 
| 73 | /// | 
| 74 | /// This is called by `super::collect::unzip_into_vecs`. | 
| 75 | pub(super) fn unzip_indexed<I, A, B, CA, CB>(pi: I, left: CA, right: CB) -> (CA::Result, CB::Result) | 
| 76 | where | 
| 77 |     I: IndexedParallelIterator<Item = (A, B)>, | 
| 78 |     CA: Consumer<A>, | 
| 79 |     CB: Consumer<B>, | 
| 80 |     A: Send, | 
| 81 |     B: Send, | 
| 82 | { | 
| 83 |     let consumer: UnzipConsumer<'_, Unzip, …, …> = UnzipConsumer { | 
| 84 |         op: &Unzip, | 
| 85 |         left, | 
| 86 |         right, | 
| 87 |     }; | 
| 88 |     pi.drive(consumer) | 
| 89 | } | 
| 90 |  | 
| 91 | /// An `UnzipOp` that splits a tuple directly into the two consumers. | 
| 92 | struct Unzip; | 
| 93 |  | 
| 94 | impl<A: Send, B: Send> UnzipOp<(A, B)> for Unzip { | 
| 95 |     type Left = A; | 
| 96 |     type Right = B; | 
| 97 |  | 
| 98 |     fn consume<FA, FB>(&self, item: (A, B), left: FA, right: FB) -> (FA, FB) | 
| 99 |     where | 
| 100 |         FA: Folder<A>, | 
| 101 |         FB: Folder<B>, | 
| 102 |     { | 
| 103 |         (left.consume(item.0), right.consume(item.1)) | 
| 104 |     } | 
| 105 |  | 
| 106 |     fn indexable() -> bool { | 
| 107 |         true | 
| 108 |     } | 
| 109 | } | 
| 110 |  | 
| 111 | /// Partitions the items of a parallel iterator into a pair of arbitrary | 
| 112 | /// `ParallelExtend` containers. | 
| 113 | /// | 
| 114 | /// This is called by `ParallelIterator::partition`. | 
| 115 | pub(super) fn partition<I, A, B, P>(pi: I, predicate: P) -> (A, B) | 
| 116 | where | 
| 117 |     I: ParallelIterator, | 
| 118 |     A: Default + Send + ParallelExtend<I::Item>, | 
| 119 |     B: Default + Send + ParallelExtend<I::Item>, | 
| 120 |     P: Fn(&I::Item) -> bool + Sync + Send, | 
| 121 | { | 
| 122 |     execute(pi, op:Partition { predicate }) | 
| 123 | } | 
| 124 |  | 
| 125 | /// An `UnzipOp` that routes items depending on a predicate function. | 
| 126 | struct Partition<P> { | 
| 127 |     predicate: P, | 
| 128 | } | 
| 129 |  | 
| 130 | impl<P, T> UnzipOp<T> for Partition<P> | 
| 131 | where | 
| 132 |     P: Fn(&T) -> bool + Sync + Send, | 
| 133 |     T: Send, | 
| 134 | { | 
| 135 |     type Left = T; | 
| 136 |     type Right = T; | 
| 137 |  | 
| 138 |     fn consume<FA, FB>(&self, item: T, left: FA, right: FB) -> (FA, FB) | 
| 139 |     where | 
| 140 |         FA: Folder<T>, | 
| 141 |         FB: Folder<T>, | 
| 142 |     { | 
| 143 |         if (self.predicate)(&item) { | 
| 144 |             (left.consume(item), right) | 
| 145 |         } else { | 
| 146 |             (left, right.consume(item)) | 
| 147 |         } | 
| 148 |     } | 
| 149 | } | 
| 150 |  | 
| 151 | /// Partitions and maps the items of a parallel iterator into a pair of | 
| 152 | /// arbitrary `ParallelExtend` containers. | 
| 153 | /// | 
| 154 | /// This called by `ParallelIterator::partition_map`. | 
| 155 | pub(super) fn partition_map<I, A, B, P, L, R>(pi: I, predicate: P) -> (A, B) | 
| 156 | where | 
| 157 |     I: ParallelIterator, | 
| 158 |     A: Default + Send + ParallelExtend<L>, | 
| 159 |     B: Default + Send + ParallelExtend<R>, | 
| 160 |     P: Fn(I::Item) -> Either<L, R> + Sync + Send, | 
| 161 |     L: Send, | 
| 162 |     R: Send, | 
| 163 | { | 
| 164 |     execute(pi, op:PartitionMap { predicate }) | 
| 165 | } | 
| 166 |  | 
| 167 | /// An `UnzipOp` that routes items depending on how they are mapped `Either`. | 
| 168 | struct PartitionMap<P> { | 
| 169 |     predicate: P, | 
| 170 | } | 
| 171 |  | 
| 172 | impl<P, L, R, T> UnzipOp<T> for PartitionMap<P> | 
| 173 | where | 
| 174 |     P: Fn(T) -> Either<L, R> + Sync + Send, | 
| 175 |     L: Send, | 
| 176 |     R: Send, | 
| 177 | { | 
| 178 |     type Left = L; | 
| 179 |     type Right = R; | 
| 180 |  | 
| 181 |     fn consume<FA, FB>(&self, item: T, left: FA, right: FB) -> (FA, FB) | 
| 182 |     where | 
| 183 |         FA: Folder<L>, | 
| 184 |         FB: Folder<R>, | 
| 185 |     { | 
| 186 |         match (self.predicate)(item) { | 
| 187 |             Either::Left(item: L) => (left.consume(item), right), | 
| 188 |             Either::Right(item: R) => (left, right.consume(item)), | 
| 189 |         } | 
| 190 |     } | 
| 191 | } | 
| 192 |  | 
| 193 | /// A fake iterator to intercept the `Consumer` for type `A`. | 
| 194 | struct UnzipA<'b, I, OP, FromB> { | 
| 195 |     base: I, | 
| 196 |     op: OP, | 
| 197 |     b: &'b mut FromB, | 
| 198 | } | 
| 199 |  | 
| 200 | impl<'b, I, OP, FromB> ParallelIterator for UnzipA<'b, I, OP, FromB> | 
| 201 | where | 
| 202 |     I: ParallelIterator, | 
| 203 |     OP: UnzipOp<I::Item>, | 
| 204 |     FromB: Send + ParallelExtend<OP::Right>, | 
| 205 | { | 
| 206 |     type Item = OP::Left; | 
| 207 |  | 
| 208 |     fn drive_unindexed<C>(self, consumer: C) -> C::Result | 
| 209 |     where | 
| 210 |         C: UnindexedConsumer<Self::Item>, | 
| 211 |     { | 
| 212 |         let mut result = None; | 
| 213 |         { | 
| 214 |             // Now it's time to find the consumer for type `B` | 
| 215 |             let iter = UnzipB { | 
| 216 |                 base: self.base, | 
| 217 |                 op: self.op, | 
| 218 |                 left_consumer: consumer, | 
| 219 |                 left_result: &mut result, | 
| 220 |             }; | 
| 221 |             self.b.par_extend(iter); | 
| 222 |         } | 
| 223 |         // NB: If for some reason `b.par_extend` doesn't actually drive the | 
| 224 |         // iterator, then we won't have a result for the left side to return | 
| 225 |         // at all.  We can't fake an arbitrary consumer's result, so panic. | 
| 226 |         result.expect("unzip consumers didn't execute!" ) | 
| 227 |     } | 
| 228 |  | 
| 229 |     fn opt_len(&self) -> Option<usize> { | 
| 230 |         if OP::indexable() { | 
| 231 |             self.base.opt_len() | 
| 232 |         } else { | 
| 233 |             None | 
| 234 |         } | 
| 235 |     } | 
| 236 | } | 
| 237 |  | 
| 238 | /// A fake iterator to intercept the `Consumer` for type `B`. | 
| 239 | struct UnzipB<'r, I, OP, CA> | 
| 240 | where | 
| 241 |     I: ParallelIterator, | 
| 242 |     OP: UnzipOp<I::Item>, | 
| 243 |     CA: UnindexedConsumer<OP::Left>, | 
| 244 |     CA::Result: 'r, | 
| 245 | { | 
| 246 |     base: I, | 
| 247 |     op: OP, | 
| 248 |     left_consumer: CA, | 
| 249 |     left_result: &'r mut Option<CA::Result>, | 
| 250 | } | 
| 251 |  | 
| 252 | impl<'r, I, OP, CA> ParallelIterator for UnzipB<'r, I, OP, CA> | 
| 253 | where | 
| 254 |     I: ParallelIterator, | 
| 255 |     OP: UnzipOp<I::Item>, | 
| 256 |     CA: UnindexedConsumer<OP::Left>, | 
| 257 | { | 
| 258 |     type Item = OP::Right; | 
| 259 |  | 
| 260 |     fn drive_unindexed<C>(self, consumer: C) -> C::Result | 
| 261 |     where | 
| 262 |         C: UnindexedConsumer<Self::Item>, | 
| 263 |     { | 
| 264 |         // Now that we have two consumers, we can unzip the real iterator. | 
| 265 |         let consumer = UnzipConsumer { | 
| 266 |             op: &self.op, | 
| 267 |             left: self.left_consumer, | 
| 268 |             right: consumer, | 
| 269 |         }; | 
| 270 |  | 
| 271 |         let result = self.base.drive_unindexed(consumer); | 
| 272 |         *self.left_result = Some(result.0); | 
| 273 |         result.1 | 
| 274 |     } | 
| 275 |  | 
| 276 |     fn opt_len(&self) -> Option<usize> { | 
| 277 |         if OP::indexable() { | 
| 278 |             self.base.opt_len() | 
| 279 |         } else { | 
| 280 |             None | 
| 281 |         } | 
| 282 |     } | 
| 283 | } | 
| 284 |  | 
| 285 | /// `Consumer` that unzips into two other `Consumer`s | 
| 286 | struct UnzipConsumer<'a, OP, CA, CB> { | 
| 287 |     op: &'a OP, | 
| 288 |     left: CA, | 
| 289 |     right: CB, | 
| 290 | } | 
| 291 |  | 
| 292 | impl<'a, T, OP, CA, CB> Consumer<T> for UnzipConsumer<'a, OP, CA, CB> | 
| 293 | where | 
| 294 |     OP: UnzipOp<T>, | 
| 295 |     CA: Consumer<OP::Left>, | 
| 296 |     CB: Consumer<OP::Right>, | 
| 297 | { | 
| 298 |     type Folder = UnzipFolder<'a, OP, CA::Folder, CB::Folder>; | 
| 299 |     type Reducer = UnzipReducer<CA::Reducer, CB::Reducer>; | 
| 300 |     type Result = (CA::Result, CB::Result); | 
| 301 |  | 
| 302 |     fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) { | 
| 303 |         let (left1, left2, left_reducer) = self.left.split_at(index); | 
| 304 |         let (right1, right2, right_reducer) = self.right.split_at(index); | 
| 305 |  | 
| 306 |         ( | 
| 307 |             UnzipConsumer { | 
| 308 |                 op: self.op, | 
| 309 |                 left: left1, | 
| 310 |                 right: right1, | 
| 311 |             }, | 
| 312 |             UnzipConsumer { | 
| 313 |                 op: self.op, | 
| 314 |                 left: left2, | 
| 315 |                 right: right2, | 
| 316 |             }, | 
| 317 |             UnzipReducer { | 
| 318 |                 left: left_reducer, | 
| 319 |                 right: right_reducer, | 
| 320 |             }, | 
| 321 |         ) | 
| 322 |     } | 
| 323 |  | 
| 324 |     fn into_folder(self) -> Self::Folder { | 
| 325 |         UnzipFolder { | 
| 326 |             op: self.op, | 
| 327 |             left: self.left.into_folder(), | 
| 328 |             right: self.right.into_folder(), | 
| 329 |         } | 
| 330 |     } | 
| 331 |  | 
| 332 |     fn full(&self) -> bool { | 
| 333 |         // don't stop until everyone is full | 
| 334 |         self.left.full() && self.right.full() | 
| 335 |     } | 
| 336 | } | 
| 337 |  | 
| 338 | impl<'a, T, OP, CA, CB> UnindexedConsumer<T> for UnzipConsumer<'a, OP, CA, CB> | 
| 339 | where | 
| 340 |     OP: UnzipOp<T>, | 
| 341 |     CA: UnindexedConsumer<OP::Left>, | 
| 342 |     CB: UnindexedConsumer<OP::Right>, | 
| 343 | { | 
| 344 |     fn split_off_left(&self) -> Self { | 
| 345 |         UnzipConsumer { | 
| 346 |             op: self.op, | 
| 347 |             left: self.left.split_off_left(), | 
| 348 |             right: self.right.split_off_left(), | 
| 349 |         } | 
| 350 |     } | 
| 351 |  | 
| 352 |     fn to_reducer(&self) -> Self::Reducer { | 
| 353 |         UnzipReducer { | 
| 354 |             left: self.left.to_reducer(), | 
| 355 |             right: self.right.to_reducer(), | 
| 356 |         } | 
| 357 |     } | 
| 358 | } | 
| 359 |  | 
| 360 | /// `Folder` that unzips into two other `Folder`s | 
| 361 | struct UnzipFolder<'a, OP, FA, FB> { | 
| 362 |     op: &'a OP, | 
| 363 |     left: FA, | 
| 364 |     right: FB, | 
| 365 | } | 
| 366 |  | 
| 367 | impl<'a, T, OP, FA, FB> Folder<T> for UnzipFolder<'a, OP, FA, FB> | 
| 368 | where | 
| 369 |     OP: UnzipOp<T>, | 
| 370 |     FA: Folder<OP::Left>, | 
| 371 |     FB: Folder<OP::Right>, | 
| 372 | { | 
| 373 |     type Result = (FA::Result, FB::Result); | 
| 374 |  | 
| 375 |     fn consume(self, item: T) -> Self { | 
| 376 |         let (left: FA, right: FB) = self.op.consume(item, self.left, self.right); | 
| 377 |         UnzipFolder { | 
| 378 |             op: self.op, | 
| 379 |             left, | 
| 380 |             right, | 
| 381 |         } | 
| 382 |     } | 
| 383 |  | 
| 384 |     fn complete(self) -> Self::Result { | 
| 385 |         (self.left.complete(), self.right.complete()) | 
| 386 |     } | 
| 387 |  | 
| 388 |     fn full(&self) -> bool { | 
| 389 |         // don't stop until everyone is full | 
| 390 |         self.left.full() && self.right.full() | 
| 391 |     } | 
| 392 | } | 
| 393 |  | 
| 394 | /// `Reducer` that unzips into two other `Reducer`s | 
| 395 | struct UnzipReducer<RA, RB> { | 
| 396 |     left: RA, | 
| 397 |     right: RB, | 
| 398 | } | 
| 399 |  | 
| 400 | impl<A, B, RA, RB> Reducer<(A, B)> for UnzipReducer<RA, RB> | 
| 401 | where | 
| 402 |     RA: Reducer<A>, | 
| 403 |     RB: Reducer<B>, | 
| 404 | { | 
| 405 |     fn reduce(self, left: (A, B), right: (A, B)) -> (A, B) { | 
| 406 |         ( | 
| 407 |             self.left.reduce(left.0, right.0), | 
| 408 |             self.right.reduce(left.1, right.1), | 
| 409 |         ) | 
| 410 |     } | 
| 411 | } | 
| 412 |  | 
| 413 | impl<A, B, FromA, FromB> ParallelExtend<(A, B)> for (FromA, FromB) | 
| 414 | where | 
| 415 |     A: Send, | 
| 416 |     B: Send, | 
| 417 |     FromA: Send + ParallelExtend<A>, | 
| 418 |     FromB: Send + ParallelExtend<B>, | 
| 419 | { | 
| 420 |     fn par_extend<I>(&mut self, pi: I) | 
| 421 |     where | 
| 422 |         I: IntoParallelIterator<Item = (A, B)>, | 
| 423 |     { | 
| 424 |         execute_into(&mut self.0, &mut self.1, pi.into_par_iter(), op:Unzip); | 
| 425 |     } | 
| 426 | } | 
| 427 |  | 
| 428 | impl<L, R, A, B> ParallelExtend<Either<L, R>> for (A, B) | 
| 429 | where | 
| 430 |     L: Send, | 
| 431 |     R: Send, | 
| 432 |     A: Send + ParallelExtend<L>, | 
| 433 |     B: Send + ParallelExtend<R>, | 
| 434 | { | 
| 435 |     fn par_extend<I>(&mut self, pi: I) | 
| 436 |     where | 
| 437 |         I: IntoParallelIterator<Item = Either<L, R>>, | 
| 438 |     { | 
| 439 |         execute_into(&mut self.0, &mut self.1, pi.into_par_iter(), op:UnEither); | 
| 440 |     } | 
| 441 | } | 
| 442 |  | 
| 443 | /// An `UnzipOp` that routes items depending on their `Either` variant. | 
| 444 | struct UnEither; | 
| 445 |  | 
| 446 | impl<L, R> UnzipOp<Either<L, R>> for UnEither | 
| 447 | where | 
| 448 |     L: Send, | 
| 449 |     R: Send, | 
| 450 | { | 
| 451 |     type Left = L; | 
| 452 |     type Right = R; | 
| 453 |  | 
| 454 |     fn consume<FL, FR>(&self, item: Either<L, R>, left: FL, right: FR) -> (FL, FR) | 
| 455 |     where | 
| 456 |         FL: Folder<L>, | 
| 457 |         FR: Folder<R>, | 
| 458 |     { | 
| 459 |         match item { | 
| 460 |             Either::Left(item: L) => (left.consume(item), right), | 
| 461 |             Either::Right(item: R) => (left, right.consume(item)), | 
| 462 |         } | 
| 463 |     } | 
| 464 | } | 
| 465 |  | 
| 466 | impl<A, B, FromA, FromB> FromParallelIterator<(A, B)> for (FromA, FromB) | 
| 467 | where | 
| 468 |     A: Send, | 
| 469 |     B: Send, | 
| 470 |     FromA: Send + FromParallelIterator<A>, | 
| 471 |     FromB: Send + FromParallelIterator<B>, | 
| 472 | { | 
| 473 |     fn from_par_iter<I>(pi: I) -> Self | 
| 474 |     where | 
| 475 |         I: IntoParallelIterator<Item = (A, B)>, | 
| 476 |     { | 
| 477 |         let (a: Collector, b: Collector): (Collector<FromA>, Collector<FromB>) = pi.into_par_iter().unzip(); | 
| 478 |         (a.result.unwrap(), b.result.unwrap()) | 
| 479 |     } | 
| 480 | } | 
| 481 |  | 
| 482 | impl<L, R, A, B> FromParallelIterator<Either<L, R>> for (A, B) | 
| 483 | where | 
| 484 |     L: Send, | 
| 485 |     R: Send, | 
| 486 |     A: Send + FromParallelIterator<L>, | 
| 487 |     B: Send + FromParallelIterator<R>, | 
| 488 | { | 
| 489 |     fn from_par_iter<I>(pi: I) -> Self | 
| 490 |     where | 
| 491 |         I: IntoParallelIterator<Item = Either<L, R>>, | 
| 492 |     { | 
| 493 |         fn identity<T>(x: T) -> T { | 
| 494 |             x | 
| 495 |         } | 
| 496 |  | 
| 497 |         let (a: Collector, b: Collector): (Collector<A>, Collector<B>) = pi.into_par_iter().partition_map(identity); | 
| 498 |         (a.result.unwrap(), b.result.unwrap()) | 
| 499 |     } | 
| 500 | } | 
| 501 |  | 
| 502 | /// Shim to implement a one-time `ParallelExtend` using `FromParallelIterator`. | 
| 503 | struct Collector<FromT> { | 
| 504 |     result: Option<FromT>, | 
| 505 | } | 
| 506 |  | 
| 507 | impl<FromT> Default for Collector<FromT> { | 
| 508 |     fn default() -> Self { | 
| 509 |         Collector { result: None } | 
| 510 |     } | 
| 511 | } | 
| 512 |  | 
| 513 | impl<T, FromT> ParallelExtend<T> for Collector<FromT> | 
| 514 | where | 
| 515 |     T: Send, | 
| 516 |     FromT: Send + FromParallelIterator<T>, | 
| 517 | { | 
| 518 |     fn par_extend<I>(&mut self, pi: I) | 
| 519 |     where | 
| 520 |         I: IntoParallelIterator<Item = T>, | 
| 521 |     { | 
| 522 |         debug_assert!(self.result.is_none()); | 
| 523 |         self.result = Some(pi.into_par_iter().collect()); | 
| 524 |     } | 
| 525 | } | 
| 526 |  |