1use super::plumbing::*;
2use super::*;
3use std::sync::atomic::{AtomicUsize, Ordering};
4
5/// `SkipAny` is an iterator that skips over `n` elements from anywhere in `I`.
6/// This struct is created by the [`skip_any()`] method on [`ParallelIterator`]
7///
8/// [`skip_any()`]: trait.ParallelIterator.html#method.skip_any
9/// [`ParallelIterator`]: trait.ParallelIterator.html
10#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
11#[derive(Clone, Debug)]
12pub struct SkipAny<I: ParallelIterator> {
13 base: I,
14 count: usize,
15}
16
17impl<I> SkipAny<I>
18where
19 I: ParallelIterator,
20{
21 /// Creates a new `SkipAny` iterator.
22 pub(super) fn new(base: I, count: usize) -> Self {
23 SkipAny { base, count }
24 }
25}
26
27impl<I> ParallelIterator for SkipAny<I>
28where
29 I: ParallelIterator,
30{
31 type Item = I::Item;
32
33 fn drive_unindexed<C>(self, consumer: C) -> C::Result
34 where
35 C: UnindexedConsumer<Self::Item>,
36 {
37 let consumer1: SkipAnyConsumer<'_, C> = SkipAnyConsumer {
38 base: consumer,
39 count: &AtomicUsize::new(self.count),
40 };
41 self.base.drive_unindexed(consumer:consumer1)
42 }
43}
44
45/// ////////////////////////////////////////////////////////////////////////
46/// Consumer implementation
47
48struct SkipAnyConsumer<'f, C> {
49 base: C,
50 count: &'f AtomicUsize,
51}
52
53impl<'f, T, C> Consumer<T> for SkipAnyConsumer<'f, C>
54where
55 C: Consumer<T>,
56 T: Send,
57{
58 type Folder = SkipAnyFolder<'f, C::Folder>;
59 type Reducer = C::Reducer;
60 type Result = C::Result;
61
62 fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
63 let (left, right, reducer) = self.base.split_at(index);
64 (
65 SkipAnyConsumer { base: left, ..self },
66 SkipAnyConsumer {
67 base: right,
68 ..self
69 },
70 reducer,
71 )
72 }
73
74 fn into_folder(self) -> Self::Folder {
75 SkipAnyFolder {
76 base: self.base.into_folder(),
77 count: self.count,
78 }
79 }
80
81 fn full(&self) -> bool {
82 self.base.full()
83 }
84}
85
86impl<'f, T, C> UnindexedConsumer<T> for SkipAnyConsumer<'f, C>
87where
88 C: UnindexedConsumer<T>,
89 T: Send,
90{
91 fn split_off_left(&self) -> Self {
92 SkipAnyConsumer {
93 base: self.base.split_off_left(),
94 ..*self
95 }
96 }
97
98 fn to_reducer(&self) -> Self::Reducer {
99 self.base.to_reducer()
100 }
101}
102
103struct SkipAnyFolder<'f, C> {
104 base: C,
105 count: &'f AtomicUsize,
106}
107
108fn checked_decrement(u: &AtomicUsize) -> bool {
109 uResult.fetch_update(set_order:Ordering::Relaxed, fetch_order:Ordering::Relaxed, |u: usize| u.checked_sub(1))
110 .is_ok()
111}
112
113impl<'f, T, C> Folder<T> for SkipAnyFolder<'f, C>
114where
115 C: Folder<T>,
116{
117 type Result = C::Result;
118
119 fn consume(mut self, item: T) -> Self {
120 if !checked_decrement(self.count) {
121 self.base = self.base.consume(item);
122 }
123 self
124 }
125
126 fn consume_iter<I>(mut self, iter: I) -> Self
127 where
128 I: IntoIterator<Item = T>,
129 {
130 self.base = self.base.consume_iter(
131 iter.into_iter()
132 .skip_while(move |_| checked_decrement(self.count)),
133 );
134 self
135 }
136
137 fn complete(self) -> C::Result {
138 self.base.complete()
139 }
140
141 fn full(&self) -> bool {
142 self.base.full()
143 }
144}
145