| 1 | //! Parallel iterator types for [results][std::result] |
| 2 | //! |
| 3 | //! You will rarely need to interact with this module directly unless you need |
| 4 | //! to name one of the iterator types. |
| 5 | //! |
| 6 | //! [std::result]: https://doc.rust-lang.org/stable/std/result/ |
| 7 | |
| 8 | use crate::iter::plumbing::*; |
| 9 | use crate::iter::*; |
| 10 | use std::sync::Mutex; |
| 11 | |
| 12 | use crate::option; |
| 13 | |
| 14 | /// Parallel iterator over a result |
| 15 | #[derive (Debug, Clone)] |
| 16 | pub struct IntoIter<T: Send> { |
| 17 | inner: option::IntoIter<T>, |
| 18 | } |
| 19 | |
| 20 | impl<T: Send, E> IntoParallelIterator for Result<T, E> { |
| 21 | type Item = T; |
| 22 | type Iter = IntoIter<T>; |
| 23 | |
| 24 | fn into_par_iter(self) -> Self::Iter { |
| 25 | IntoIter { |
| 26 | inner: self.ok().into_par_iter(), |
| 27 | } |
| 28 | } |
| 29 | } |
| 30 | |
// NOTE(review): `delegate_indexed_iterator!` is defined outside this file.
// Judging by its name and arguments, it presumably implements the (indexed)
// parallel iterator traits for `IntoIter<T>` with item type `T` by forwarding
// to the `inner` field — confirm against the macro definition.
delegate_indexed_iterator! {
    IntoIter<T> => T,
    impl<T: Send>
}
| 35 | |
| 36 | /// Parallel iterator over an immutable reference to a result |
| 37 | #[derive (Debug)] |
| 38 | pub struct Iter<'a, T: Sync> { |
| 39 | inner: option::IntoIter<&'a T>, |
| 40 | } |
| 41 | |
| 42 | impl<'a, T: Sync> Clone for Iter<'a, T> { |
| 43 | fn clone(&self) -> Self { |
| 44 | Iter { |
| 45 | inner: self.inner.clone(), |
| 46 | } |
| 47 | } |
| 48 | } |
| 49 | |
| 50 | impl<'a, T: Sync, E> IntoParallelIterator for &'a Result<T, E> { |
| 51 | type Item = &'a T; |
| 52 | type Iter = Iter<'a, T>; |
| 53 | |
| 54 | fn into_par_iter(self) -> Self::Iter { |
| 55 | Iter { |
| 56 | inner: self.as_ref().ok().into_par_iter(), |
| 57 | } |
| 58 | } |
| 59 | } |
| 60 | |
// NOTE(review): `delegate_indexed_iterator!` is defined outside this file.
// Judging by its name and arguments, it presumably implements the (indexed)
// parallel iterator traits for `Iter<'a, T>` with item type `&'a T` by
// forwarding to the `inner` field — confirm against the macro definition.
delegate_indexed_iterator! {
    Iter<'a, T> => &'a T,
    impl<'a, T: Sync + 'a>
}
| 65 | |
| 66 | /// Parallel iterator over a mutable reference to a result |
| 67 | #[derive (Debug)] |
| 68 | pub struct IterMut<'a, T: Send> { |
| 69 | inner: option::IntoIter<&'a mut T>, |
| 70 | } |
| 71 | |
| 72 | impl<'a, T: Send, E> IntoParallelIterator for &'a mut Result<T, E> { |
| 73 | type Item = &'a mut T; |
| 74 | type Iter = IterMut<'a, T>; |
| 75 | |
| 76 | fn into_par_iter(self) -> Self::Iter { |
| 77 | IterMut { |
| 78 | inner: self.as_mut().ok().into_par_iter(), |
| 79 | } |
| 80 | } |
| 81 | } |
| 82 | |
// NOTE(review): `delegate_indexed_iterator!` is defined outside this file.
// Judging by its name and arguments, it presumably implements the (indexed)
// parallel iterator traits for `IterMut<'a, T>` with item type `&'a mut T` by
// forwarding to the `inner` field — confirm against the macro definition.
delegate_indexed_iterator! {
    IterMut<'a, T> => &'a mut T,
    impl<'a, T: Send + 'a>
}
| 87 | |
| 88 | /// Collect an arbitrary `Result`-wrapped collection. |
| 89 | /// |
| 90 | /// If any item is `Err`, then all previous `Ok` items collected are |
| 91 | /// discarded, and it returns that error. If there are multiple errors, the |
| 92 | /// one returned is not deterministic. |
| 93 | impl<C, T, E> FromParallelIterator<Result<T, E>> for Result<C, E> |
| 94 | where |
| 95 | C: FromParallelIterator<T>, |
| 96 | T: Send, |
| 97 | E: Send, |
| 98 | { |
| 99 | fn from_par_iter<I>(par_iter: I) -> Self |
| 100 | where |
| 101 | I: IntoParallelIterator<Item = Result<T, E>>, |
| 102 | { |
| 103 | fn ok<T, E>(saved: &Mutex<Option<E>>) -> impl Fn(Result<T, E>) -> Option<T> + '_ { |
| 104 | move |item| match item { |
| 105 | Ok(item) => Some(item), |
| 106 | Err(error) => { |
| 107 | // We don't need a blocking `lock()`, as anybody |
| 108 | // else holding the lock will also be writing |
| 109 | // `Some(error)`, and then ours is irrelevant. |
| 110 | if let Ok(mut guard) = saved.try_lock() { |
| 111 | if guard.is_none() { |
| 112 | *guard = Some(error); |
| 113 | } |
| 114 | } |
| 115 | None |
| 116 | } |
| 117 | } |
| 118 | } |
| 119 | |
| 120 | let saved_error = Mutex::new(None); |
| 121 | let collection = par_iter |
| 122 | .into_par_iter() |
| 123 | .map(ok(&saved_error)) |
| 124 | .while_some() |
| 125 | .collect(); |
| 126 | |
| 127 | match saved_error.into_inner().unwrap() { |
| 128 | Some(error) => Err(error), |
| 129 | None => Ok(collection), |
| 130 | } |
| 131 | } |
| 132 | } |
| 133 | |