1 | //! Parallel iterator types for [results][std::result] |
2 | //! |
3 | //! You will rarely need to interact with this module directly unless you need |
4 | //! to name one of the iterator types. |
5 | //! |
6 | //! [std::result]: https://doc.rust-lang.org/stable/std/result/ |
7 | |
8 | use crate::iter::plumbing::*; |
9 | use crate::iter::*; |
10 | use std::sync::Mutex; |
11 | |
12 | use crate::option; |
13 | |
14 | /// Parallel iterator over a result |
15 | #[derive (Debug, Clone)] |
16 | pub struct IntoIter<T: Send> { |
17 | inner: option::IntoIter<T>, |
18 | } |
19 | |
20 | impl<T: Send, E> IntoParallelIterator for Result<T, E> { |
21 | type Item = T; |
22 | type Iter = IntoIter<T>; |
23 | |
24 | fn into_par_iter(self) -> Self::Iter { |
25 | IntoIter { |
26 | inner: self.ok().into_par_iter(), |
27 | } |
28 | } |
29 | } |
30 | |
31 | delegate_indexed_iterator! { |
32 | IntoIter<T> => T, |
33 | impl<T: Send> |
34 | } |
35 | |
36 | /// Parallel iterator over an immutable reference to a result |
37 | #[derive (Debug)] |
38 | pub struct Iter<'a, T: Sync> { |
39 | inner: option::IntoIter<&'a T>, |
40 | } |
41 | |
42 | impl<'a, T: Sync> Clone for Iter<'a, T> { |
43 | fn clone(&self) -> Self { |
44 | Iter { |
45 | inner: self.inner.clone(), |
46 | } |
47 | } |
48 | } |
49 | |
50 | impl<'a, T: Sync, E> IntoParallelIterator for &'a Result<T, E> { |
51 | type Item = &'a T; |
52 | type Iter = Iter<'a, T>; |
53 | |
54 | fn into_par_iter(self) -> Self::Iter { |
55 | Iter { |
56 | inner: self.as_ref().ok().into_par_iter(), |
57 | } |
58 | } |
59 | } |
60 | |
61 | delegate_indexed_iterator! { |
62 | Iter<'a, T> => &'a T, |
63 | impl<'a, T: Sync + 'a> |
64 | } |
65 | |
66 | /// Parallel iterator over a mutable reference to a result |
67 | #[derive (Debug)] |
68 | pub struct IterMut<'a, T: Send> { |
69 | inner: option::IntoIter<&'a mut T>, |
70 | } |
71 | |
72 | impl<'a, T: Send, E> IntoParallelIterator for &'a mut Result<T, E> { |
73 | type Item = &'a mut T; |
74 | type Iter = IterMut<'a, T>; |
75 | |
76 | fn into_par_iter(self) -> Self::Iter { |
77 | IterMut { |
78 | inner: self.as_mut().ok().into_par_iter(), |
79 | } |
80 | } |
81 | } |
82 | |
83 | delegate_indexed_iterator! { |
84 | IterMut<'a, T> => &'a mut T, |
85 | impl<'a, T: Send + 'a> |
86 | } |
87 | |
88 | /// Collect an arbitrary `Result`-wrapped collection. |
89 | /// |
90 | /// If any item is `Err`, then all previous `Ok` items collected are |
91 | /// discarded, and it returns that error. If there are multiple errors, the |
92 | /// one returned is not deterministic. |
93 | impl<C, T, E> FromParallelIterator<Result<T, E>> for Result<C, E> |
94 | where |
95 | C: FromParallelIterator<T>, |
96 | T: Send, |
97 | E: Send, |
98 | { |
99 | fn from_par_iter<I>(par_iter: I) -> Self |
100 | where |
101 | I: IntoParallelIterator<Item = Result<T, E>>, |
102 | { |
103 | fn ok<T, E>(saved: &Mutex<Option<E>>) -> impl Fn(Result<T, E>) -> Option<T> + '_ { |
104 | move |item| match item { |
105 | Ok(item) => Some(item), |
106 | Err(error) => { |
107 | // We don't need a blocking `lock()`, as anybody |
108 | // else holding the lock will also be writing |
109 | // `Some(error)`, and then ours is irrelevant. |
110 | if let Ok(mut guard) = saved.try_lock() { |
111 | if guard.is_none() { |
112 | *guard = Some(error); |
113 | } |
114 | } |
115 | None |
116 | } |
117 | } |
118 | } |
119 | |
120 | let saved_error = Mutex::new(None); |
121 | let collection = par_iter |
122 | .into_par_iter() |
123 | .map(ok(&saved_error)) |
124 | .while_some() |
125 | .collect(); |
126 | |
127 | match saved_error.into_inner().unwrap() { |
128 | Some(error) => Err(error), |
129 | None => Ok(collection), |
130 | } |
131 | } |
132 | } |
133 | |