| 1 | use super::plumbing::*; | 
| 2 | use super::ParallelIterator; | 
| 3 |  | 
| 4 | pub(super) fn reduce<PI, R, ID, T>(pi: PI, identity: ID, reduce_op: R) -> T | 
| 5 | where | 
| 6 |     PI: ParallelIterator<Item = T>, | 
| 7 |     R: Fn(T, T) -> T + Sync, | 
| 8 |     ID: Fn() -> T + Sync, | 
| 9 |     T: Send, | 
| 10 | { | 
| 11 |     let consumer: ReduceConsumer<'_, R, ID> = ReduceConsumer { | 
| 12 |         identity: &identity, | 
| 13 |         reduce_op: &reduce_op, | 
| 14 |     }; | 
| 15 |     pi.drive_unindexed(consumer) | 
| 16 | } | 
| 17 |  | 
| 18 | struct ReduceConsumer<'r, R, ID> { | 
| 19 |     identity: &'r ID, | 
| 20 |     reduce_op: &'r R, | 
| 21 | } | 
| 22 |  | 
| 23 | impl<'r, R, ID> Copy for ReduceConsumer<'r, R, ID> {} | 
| 24 |  | 
| 25 | impl<'r, R, ID> Clone for ReduceConsumer<'r, R, ID> { | 
| 26 |     fn clone(&self) -> Self { | 
| 27 |         *self | 
| 28 |     } | 
| 29 | } | 
| 30 |  | 
| 31 | impl<'r, R, ID, T> Consumer<T> for ReduceConsumer<'r, R, ID> | 
| 32 | where | 
| 33 |     R: Fn(T, T) -> T + Sync, | 
| 34 |     ID: Fn() -> T + Sync, | 
| 35 |     T: Send, | 
| 36 | { | 
| 37 |     type Folder = ReduceFolder<'r, R, T>; | 
| 38 |     type Reducer = Self; | 
| 39 |     type Result = T; | 
| 40 |  | 
| 41 |     fn split_at(self, _index: usize) -> (Self, Self, Self) { | 
| 42 |         (self, self, self) | 
| 43 |     } | 
| 44 |  | 
| 45 |     fn into_folder(self) -> Self::Folder { | 
| 46 |         ReduceFolder { | 
| 47 |             reduce_op: self.reduce_op, | 
| 48 |             item: (self.identity)(), | 
| 49 |         } | 
| 50 |     } | 
| 51 |  | 
| 52 |     fn full(&self) -> bool { | 
| 53 |         false | 
| 54 |     } | 
| 55 | } | 
| 56 |  | 
| 57 | impl<'r, R, ID, T> UnindexedConsumer<T> for ReduceConsumer<'r, R, ID> | 
| 58 | where | 
| 59 |     R: Fn(T, T) -> T + Sync, | 
| 60 |     ID: Fn() -> T + Sync, | 
| 61 |     T: Send, | 
| 62 | { | 
| 63 |     fn split_off_left(&self) -> Self { | 
| 64 |         *self | 
| 65 |     } | 
| 66 |  | 
| 67 |     fn to_reducer(&self) -> Self::Reducer { | 
| 68 |         *self | 
| 69 |     } | 
| 70 | } | 
| 71 |  | 
| 72 | impl<'r, R, ID, T> Reducer<T> for ReduceConsumer<'r, R, ID> | 
| 73 | where | 
| 74 |     R: Fn(T, T) -> T + Sync, | 
| 75 | { | 
| 76 |     fn reduce(self, left: T, right: T) -> T { | 
| 77 |         (self.reduce_op)(left, right) | 
| 78 |     } | 
| 79 | } | 
| 80 |  | 
| 81 | struct ReduceFolder<'r, R, T> { | 
| 82 |     reduce_op: &'r R, | 
| 83 |     item: T, | 
| 84 | } | 
| 85 |  | 
| 86 | impl<'r, R, T> Folder<T> for ReduceFolder<'r, R, T> | 
| 87 | where | 
| 88 |     R: Fn(T, T) -> T, | 
| 89 | { | 
| 90 |     type Result = T; | 
| 91 |  | 
| 92 |     fn consume(self, item: T) -> Self { | 
| 93 |         ReduceFolder { | 
| 94 |             reduce_op: self.reduce_op, | 
| 95 |             item: (self.reduce_op)(self.item, item), | 
| 96 |         } | 
| 97 |     } | 
| 98 |  | 
| 99 |     fn consume_iter<I>(self, iter: I) -> Self | 
| 100 |     where | 
| 101 |         I: IntoIterator<Item = T>, | 
| 102 |     { | 
| 103 |         ReduceFolder { | 
| 104 |             reduce_op: self.reduce_op, | 
| 105 |             item: iter.into_iter().fold(self.item, self.reduce_op), | 
| 106 |         } | 
| 107 |     } | 
| 108 |  | 
| 109 |     fn complete(self) -> T { | 
| 110 |         self.item | 
| 111 |     } | 
| 112 |  | 
| 113 |     fn full(&self) -> bool { | 
| 114 |         false | 
| 115 |     } | 
| 116 | } | 
| 117 |  |