| 1 | use super::plumbing::*; | 
| 2 | use super::*; | 
| 3 | use std::fmt; | 
| 4 | use std::sync::atomic::{AtomicBool, Ordering}; | 
| 5 |  | 
| 6 | /// `SkipAnyWhile` is an iterator that skips over elements from anywhere in `I` | 
| 7 | /// until the callback returns `false`. | 
| 8 | /// This struct is created by the [`skip_any_while()`] method on [`ParallelIterator`] | 
| 9 | /// | 
| 10 | /// [`skip_any_while()`]: trait.ParallelIterator.html#method.skip_any_while | 
| 11 | /// [`ParallelIterator`]: trait.ParallelIterator.html | 
| 12 | #[must_use  = "iterator adaptors are lazy and do nothing unless consumed" ] | 
| 13 | #[derive (Clone)] | 
| 14 | pub struct SkipAnyWhile<I: ParallelIterator, P> { | 
| 15 |     base: I, | 
| 16 |     predicate: P, | 
| 17 | } | 
| 18 |  | 
| 19 | impl<I: ParallelIterator + fmt::Debug, P> fmt::Debug for SkipAnyWhile<I, P> { | 
| 20 |     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | 
| 21 |         f&mut DebugStruct<'_, '_>.debug_struct("SkipAnyWhile" ) | 
| 22 |             .field(name:"base" , &self.base) | 
| 23 |             .finish() | 
| 24 |     } | 
| 25 | } | 
| 26 |  | 
| 27 | impl<I, P> SkipAnyWhile<I, P> | 
| 28 | where | 
| 29 |     I: ParallelIterator, | 
| 30 | { | 
| 31 |     /// Creates a new `SkipAnyWhile` iterator. | 
| 32 |     pub(super) fn new(base: I, predicate: P) -> Self { | 
| 33 |         SkipAnyWhile { base, predicate } | 
| 34 |     } | 
| 35 | } | 
| 36 |  | 
| 37 | impl<I, P> ParallelIterator for SkipAnyWhile<I, P> | 
| 38 | where | 
| 39 |     I: ParallelIterator, | 
| 40 |     P: Fn(&I::Item) -> bool + Sync + Send, | 
| 41 | { | 
| 42 |     type Item = I::Item; | 
| 43 |  | 
| 44 |     fn drive_unindexed<C>(self, consumer: C) -> C::Result | 
| 45 |     where | 
| 46 |         C: UnindexedConsumer<Self::Item>, | 
| 47 |     { | 
| 48 |         let consumer1: SkipAnyWhileConsumer<'_, …, …> = SkipAnyWhileConsumer { | 
| 49 |             base: consumer, | 
| 50 |             predicate: &self.predicate, | 
| 51 |             skipping: &AtomicBool::new(true), | 
| 52 |         }; | 
| 53 |         self.base.drive_unindexed(consumer:consumer1) | 
| 54 |     } | 
| 55 | } | 
| 56 |  | 
// ////////////////////////////////////////////////////////////////////////
// Consumer implementation

/// Consumer that wraps an inner consumer `base` and carries borrowed access
/// to the predicate and to the `skipping` flag.
struct SkipAnyWhileConsumer<'p, C, P> {
    // The wrapped downstream consumer.
    base: C,
    // Borrowed callback that decides whether an item is skipped.
    predicate: &'p P,
    // Borrowed flag: `true` while items may still be skipped.
    skipping: &'p AtomicBool,
}
| 65 |  | 
| 66 | impl<'p, T, C, P> Consumer<T> for SkipAnyWhileConsumer<'p, C, P> | 
| 67 | where | 
| 68 |     C: Consumer<T>, | 
| 69 |     P: Fn(&T) -> bool + Sync, | 
| 70 | { | 
| 71 |     type Folder = SkipAnyWhileFolder<'p, C::Folder, P>; | 
| 72 |     type Reducer = C::Reducer; | 
| 73 |     type Result = C::Result; | 
| 74 |  | 
| 75 |     fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) { | 
| 76 |         let (left, right, reducer) = self.base.split_at(index); | 
| 77 |         ( | 
| 78 |             SkipAnyWhileConsumer { base: left, ..self }, | 
| 79 |             SkipAnyWhileConsumer { | 
| 80 |                 base: right, | 
| 81 |                 ..self | 
| 82 |             }, | 
| 83 |             reducer, | 
| 84 |         ) | 
| 85 |     } | 
| 86 |  | 
| 87 |     fn into_folder(self) -> Self::Folder { | 
| 88 |         SkipAnyWhileFolder { | 
| 89 |             base: self.base.into_folder(), | 
| 90 |             predicate: self.predicate, | 
| 91 |             skipping: self.skipping, | 
| 92 |         } | 
| 93 |     } | 
| 94 |  | 
| 95 |     fn full(&self) -> bool { | 
| 96 |         self.base.full() | 
| 97 |     } | 
| 98 | } | 
| 99 |  | 
| 100 | impl<'p, T, C, P> UnindexedConsumer<T> for SkipAnyWhileConsumer<'p, C, P> | 
| 101 | where | 
| 102 |     C: UnindexedConsumer<T>, | 
| 103 |     P: Fn(&T) -> bool + Sync, | 
| 104 | { | 
| 105 |     fn split_off_left(&self) -> Self { | 
| 106 |         SkipAnyWhileConsumer { | 
| 107 |             base: self.base.split_off_left(), | 
| 108 |             ..*self | 
| 109 |         } | 
| 110 |     } | 
| 111 |  | 
| 112 |     fn to_reducer(&self) -> Self::Reducer { | 
| 113 |         self.base.to_reducer() | 
| 114 |     } | 
| 115 | } | 
| 116 |  | 
/// Folder that wraps an inner folder `base` and carries borrowed access to
/// the predicate and to the `skipping` flag.
struct SkipAnyWhileFolder<'p, C, P> {
    // The wrapped downstream folder that receives non-skipped items.
    base: C,
    // Borrowed callback that decides whether an item is skipped.
    predicate: &'p P,
    // Borrowed flag: `true` while items may still be skipped.
    skipping: &'p AtomicBool,
}
| 122 |  | 
/// Decides whether `item` should be skipped.
///
/// Returns `true` only while `skipping` is still set *and* `predicate(item)`
/// holds. The first time the predicate returns `false`, the flag is cleared;
/// once the flag is clear the function returns `false` without calling the
/// predicate.
fn skip<T>(item: &T, skipping: &AtomicBool, predicate: &impl Fn(&T) -> bool) -> bool {
    // Fast path: skipping has already ended, so the predicate is not consulted.
    if !skipping.load(Ordering::Relaxed) {
        return false;
    }
    if predicate(item) {
        return true;
    }
    // First rejection seen here: turn skipping off. `Relaxed` is used because
    // the flag only gates this check; nothing else is published through it.
    skipping.store(false, Ordering::Relaxed);
    false
}
| 133 |  | 
| 134 | impl<'p, T, C, P> Folder<T> for SkipAnyWhileFolder<'p, C, P> | 
| 135 | where | 
| 136 |     C: Folder<T>, | 
| 137 |     P: Fn(&T) -> bool + 'p, | 
| 138 | { | 
| 139 |     type Result = C::Result; | 
| 140 |  | 
| 141 |     fn consume(mut self, item: T) -> Self { | 
| 142 |         if !skip(&item, self.skipping, self.predicate) { | 
| 143 |             self.base = self.base.consume(item); | 
| 144 |         } | 
| 145 |         self | 
| 146 |     } | 
| 147 |  | 
| 148 |     fn consume_iter<I>(mut self, iter: I) -> Self | 
| 149 |     where | 
| 150 |         I: IntoIterator<Item = T>, | 
| 151 |     { | 
| 152 |         self.base = self.base.consume_iter( | 
| 153 |             iter.into_iter() | 
| 154 |                 .skip_while(move |x| skip(x, self.skipping, self.predicate)), | 
| 155 |         ); | 
| 156 |         self | 
| 157 |     } | 
| 158 |  | 
| 159 |     fn complete(self) -> C::Result { | 
| 160 |         self.base.complete() | 
| 161 |     } | 
| 162 |  | 
| 163 |     fn full(&self) -> bool { | 
| 164 |         self.base.full() | 
| 165 |     } | 
| 166 | } | 
| 167 |  |