1use super::plumbing::*;
2use super::*;
3use std::sync::atomic::{AtomicBool, Ordering};
4
5pub(super) fn find<I, P>(pi: I, find_op: P) -> Option<I::Item>
6where
7 I: ParallelIterator,
8 P: Fn(&I::Item) -> bool + Sync,
9{
10 let found = AtomicBool::new(false);
11 let consumer = FindConsumer::new(&find_op, &found);
12 pi.drive_unindexed(consumer)
13}
14
15struct FindConsumer<'p, P> {
16 find_op: &'p P,
17 found: &'p AtomicBool,
18}
19
20impl<'p, P> FindConsumer<'p, P> {
21 fn new(find_op: &'p P, found: &'p AtomicBool) -> Self {
22 FindConsumer { find_op, found }
23 }
24}
25
26impl<'p, T, P: 'p> Consumer<T> for FindConsumer<'p, P>
27where
28 T: Send,
29 P: Fn(&T) -> bool + Sync,
30{
31 type Folder = FindFolder<'p, T, P>;
32 type Reducer = FindReducer;
33 type Result = Option<T>;
34
35 fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) {
36 (self.split_off_left(), self, FindReducer)
37 }
38
39 fn into_folder(self) -> Self::Folder {
40 FindFolder {
41 find_op: self.find_op,
42 found: self.found,
43 item: None,
44 }
45 }
46
47 fn full(&self) -> bool {
48 self.found.load(Ordering::Relaxed)
49 }
50}
51
52impl<'p, T, P: 'p> UnindexedConsumer<T> for FindConsumer<'p, P>
53where
54 T: Send,
55 P: Fn(&T) -> bool + Sync,
56{
57 fn split_off_left(&self) -> Self {
58 FindConsumer::new(self.find_op, self.found)
59 }
60
61 fn to_reducer(&self) -> Self::Reducer {
62 FindReducer
63 }
64}
65
66struct FindFolder<'p, T, P> {
67 find_op: &'p P,
68 found: &'p AtomicBool,
69 item: Option<T>,
70}
71
72impl<'p, T, P> Folder<T> for FindFolder<'p, T, P>
73where
74 P: Fn(&T) -> bool + 'p,
75{
76 type Result = Option<T>;
77
78 fn consume(mut self, item: T) -> Self {
79 if (self.find_op)(&item) {
80 self.found.store(true, Ordering::Relaxed);
81 self.item = Some(item);
82 }
83 self
84 }
85
86 fn consume_iter<I>(mut self, iter: I) -> Self
87 where
88 I: IntoIterator<Item = T>,
89 {
90 fn not_full<T>(found: &AtomicBool) -> impl Fn(&T) -> bool + '_ {
91 move |_| !found.load(Ordering::Relaxed)
92 }
93
94 self.item = iter
95 .into_iter()
96 // stop iterating if another thread has found something
97 .take_while(not_full(self.found))
98 .find(self.find_op);
99 if self.item.is_some() {
100 self.found.store(true, Ordering::Relaxed)
101 }
102 self
103 }
104
105 fn complete(self) -> Self::Result {
106 self.item
107 }
108
109 fn full(&self) -> bool {
110 self.found.load(Ordering::Relaxed)
111 }
112}
113
114struct FindReducer;
115
116impl<T> Reducer<Option<T>> for FindReducer {
117 fn reduce(self, left: Option<T>, right: Option<T>) -> Option<T> {
118 left.or(right)
119 }
120}
121