1 | use super::plumbing::*; |
2 | use super::*; |
3 | use std::sync::atomic::{AtomicBool, Ordering}; |
4 | |
5 | pub(super) fn find<I, P>(pi: I, find_op: P) -> Option<I::Item> |
6 | where |
7 | I: ParallelIterator, |
8 | P: Fn(&I::Item) -> bool + Sync, |
9 | { |
10 | let found = AtomicBool::new(false); |
11 | let consumer = FindConsumer::new(&find_op, &found); |
12 | pi.drive_unindexed(consumer) |
13 | } |
14 | |
/// Consumer half of the search: a pair of borrows that can be handed out to
/// as many splits as needed.
struct FindConsumer<'p, P> {
    /// Predicate applied to each candidate item.
    find_op: &'p P,
    /// Shared flag that is set once some folder has found a match.
    found: &'p AtomicBool,
}

impl<'p, P> FindConsumer<'p, P> {
    /// Bundles the borrowed predicate and the shared flag into a consumer.
    fn new(find_op: &'p P, found: &'p AtomicBool) -> Self {
        Self { find_op, found }
    }
}
25 | |
26 | impl<'p, T, P: 'p> Consumer<T> for FindConsumer<'p, P> |
27 | where |
28 | T: Send, |
29 | P: Fn(&T) -> bool + Sync, |
30 | { |
31 | type Folder = FindFolder<'p, T, P>; |
32 | type Reducer = FindReducer; |
33 | type Result = Option<T>; |
34 | |
35 | fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) { |
36 | (self.split_off_left(), self, FindReducer) |
37 | } |
38 | |
39 | fn into_folder(self) -> Self::Folder { |
40 | FindFolder { |
41 | find_op: self.find_op, |
42 | found: self.found, |
43 | item: None, |
44 | } |
45 | } |
46 | |
47 | fn full(&self) -> bool { |
48 | self.found.load(Ordering::Relaxed) |
49 | } |
50 | } |
51 | |
52 | impl<'p, T, P: 'p> UnindexedConsumer<T> for FindConsumer<'p, P> |
53 | where |
54 | T: Send, |
55 | P: Fn(&T) -> bool + Sync, |
56 | { |
57 | fn split_off_left(&self) -> Self { |
58 | FindConsumer::new(self.find_op, self.found) |
59 | } |
60 | |
61 | fn to_reducer(&self) -> Self::Reducer { |
62 | FindReducer |
63 | } |
64 | } |
65 | |
66 | struct FindFolder<'p, T, P> { |
67 | find_op: &'p P, |
68 | found: &'p AtomicBool, |
69 | item: Option<T>, |
70 | } |
71 | |
72 | impl<'p, T, P> Folder<T> for FindFolder<'p, T, P> |
73 | where |
74 | P: Fn(&T) -> bool + 'p, |
75 | { |
76 | type Result = Option<T>; |
77 | |
78 | fn consume(mut self, item: T) -> Self { |
79 | if (self.find_op)(&item) { |
80 | self.found.store(true, Ordering::Relaxed); |
81 | self.item = Some(item); |
82 | } |
83 | self |
84 | } |
85 | |
86 | fn consume_iter<I>(mut self, iter: I) -> Self |
87 | where |
88 | I: IntoIterator<Item = T>, |
89 | { |
90 | fn not_full<T>(found: &AtomicBool) -> impl Fn(&T) -> bool + '_ { |
91 | move |_| !found.load(Ordering::Relaxed) |
92 | } |
93 | |
94 | self.item = iter |
95 | .into_iter() |
96 | // stop iterating if another thread has found something |
97 | .take_while(not_full(self.found)) |
98 | .find(self.find_op); |
99 | if self.item.is_some() { |
100 | self.found.store(true, Ordering::Relaxed) |
101 | } |
102 | self |
103 | } |
104 | |
105 | fn complete(self) -> Self::Result { |
106 | self.item |
107 | } |
108 | |
109 | fn full(&self) -> bool { |
110 | self.found.load(Ordering::Relaxed) |
111 | } |
112 | } |
113 | |
114 | struct FindReducer; |
115 | |
116 | impl<T> Reducer<Option<T>> for FindReducer { |
117 | fn reduce(self, left: Option<T>, right: Option<T>) -> Option<T> { |
118 | left.or(right) |
119 | } |
120 | } |
121 | |