1use super::plumbing::*;
2use super::*;
3use std::sync::atomic::{AtomicBool, Ordering};
4
5/// `WhileSome` is an iterator that yields the `Some` elements of an iterator,
6/// halting as soon as any `None` is produced.
7///
8/// This struct is created by the [`while_some()`] method on [`ParallelIterator`]
9///
10/// [`while_some()`]: trait.ParallelIterator.html#method.while_some
11/// [`ParallelIterator`]: trait.ParallelIterator.html
12#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
13#[derive(Debug, Clone)]
14pub struct WhileSome<I: ParallelIterator> {
15 base: I,
16}
17
18impl<I> WhileSome<I>
19where
20 I: ParallelIterator,
21{
22 /// Creates a new `WhileSome` iterator.
23 pub(super) fn new(base: I) -> Self {
24 WhileSome { base }
25 }
26}
27
28impl<I, T> ParallelIterator for WhileSome<I>
29where
30 I: ParallelIterator<Item = Option<T>>,
31 T: Send,
32{
33 type Item = T;
34
35 fn drive_unindexed<C>(self, consumer: C) -> C::Result
36 where
37 C: UnindexedConsumer<Self::Item>,
38 {
39 let full = AtomicBool::new(false);
40 let consumer1 = WhileSomeConsumer {
41 base: consumer,
42 full: &full,
43 };
44 self.base.drive_unindexed(consumer1)
45 }
46}
47
48/// ////////////////////////////////////////////////////////////////////////
49/// Consumer implementation
50
51struct WhileSomeConsumer<'f, C> {
52 base: C,
53 full: &'f AtomicBool,
54}
55
56impl<'f, T, C> Consumer<Option<T>> for WhileSomeConsumer<'f, C>
57where
58 C: Consumer<T>,
59 T: Send,
60{
61 type Folder = WhileSomeFolder<'f, C::Folder>;
62 type Reducer = C::Reducer;
63 type Result = C::Result;
64
65 fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
66 let (left, right, reducer) = self.base.split_at(index);
67 (
68 WhileSomeConsumer { base: left, ..self },
69 WhileSomeConsumer {
70 base: right,
71 ..self
72 },
73 reducer,
74 )
75 }
76
77 fn into_folder(self) -> Self::Folder {
78 WhileSomeFolder {
79 base: self.base.into_folder(),
80 full: self.full,
81 }
82 }
83
84 fn full(&self) -> bool {
85 self.full.load(Ordering::Relaxed) || self.base.full()
86 }
87}
88
89impl<'f, T, C> UnindexedConsumer<Option<T>> for WhileSomeConsumer<'f, C>
90where
91 C: UnindexedConsumer<T>,
92 T: Send,
93{
94 fn split_off_left(&self) -> Self {
95 WhileSomeConsumer {
96 base: self.base.split_off_left(),
97 ..*self
98 }
99 }
100
101 fn to_reducer(&self) -> Self::Reducer {
102 self.base.to_reducer()
103 }
104}
105
106struct WhileSomeFolder<'f, C> {
107 base: C,
108 full: &'f AtomicBool,
109}
110
111impl<'f, T, C> Folder<Option<T>> for WhileSomeFolder<'f, C>
112where
113 C: Folder<T>,
114{
115 type Result = C::Result;
116
117 fn consume(mut self, item: Option<T>) -> Self {
118 match item {
119 Some(item) => self.base = self.base.consume(item),
120 None => self.full.store(true, Ordering::Relaxed),
121 }
122 self
123 }
124
125 fn consume_iter<I>(mut self, iter: I) -> Self
126 where
127 I: IntoIterator<Item = Option<T>>,
128 {
129 fn some<T>(full: &AtomicBool) -> impl Fn(&Option<T>) -> bool + '_ {
130 move |x| match *x {
131 Some(_) => !full.load(Ordering::Relaxed),
132 None => {
133 full.store(true, Ordering::Relaxed);
134 false
135 }
136 }
137 }
138
139 self.base = self.base.consume_iter(
140 iter.into_iter()
141 .take_while(some(self.full))
142 .map(Option::unwrap),
143 );
144 self
145 }
146
147 fn complete(self) -> C::Result {
148 self.base.complete()
149 }
150
151 fn full(&self) -> bool {
152 self.full.load(Ordering::Relaxed) || self.base.full()
153 }
154}
155