1 | //! Definition of the `TryJoinAll` combinator, waiting for all of a list of |
2 | //! futures to finish with either success or error. |
3 | |
4 | use alloc::boxed::Box; |
5 | use alloc::vec::Vec; |
6 | use core::fmt; |
7 | use core::future::Future; |
8 | use core::iter::FromIterator; |
9 | use core::mem; |
10 | use core::pin::Pin; |
11 | use core::task::{Context, Poll}; |
12 | |
13 | use super::{assert_future, join_all, IntoFuture, TryFuture, TryMaybeDone}; |
14 | |
15 | #[cfg (not(futures_no_atomic_cas))] |
16 | use crate::stream::{FuturesOrdered, TryCollect, TryStreamExt}; |
17 | use crate::TryFutureExt; |
18 | |
19 | enum FinalState<E = ()> { |
20 | Pending, |
21 | AllDone, |
22 | Error(E), |
23 | } |
24 | |
25 | /// Future for the [`try_join_all`] function. |
26 | #[must_use = "futures do nothing unless you `.await` or poll them" ] |
27 | pub struct TryJoinAll<F> |
28 | where |
29 | F: TryFuture, |
30 | { |
31 | kind: TryJoinAllKind<F>, |
32 | } |
33 | |
34 | enum TryJoinAllKind<F> |
35 | where |
36 | F: TryFuture, |
37 | { |
38 | Small { |
39 | elems: Pin<Box<[TryMaybeDone<IntoFuture<F>>]>>, |
40 | }, |
41 | #[cfg (not(futures_no_atomic_cas))] |
42 | Big { |
43 | fut: TryCollect<FuturesOrdered<IntoFuture<F>>, Vec<F::Ok>>, |
44 | }, |
45 | } |
46 | |
47 | impl<F> fmt::Debug for TryJoinAll<F> |
48 | where |
49 | F: TryFuture + fmt::Debug, |
50 | F::Ok: fmt::Debug, |
51 | F::Error: fmt::Debug, |
52 | F::Output: fmt::Debug, |
53 | { |
54 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
55 | match self.kind { |
56 | TryJoinAllKind::Small { ref elems: &Pin>]>> } => { |
57 | f.debug_struct("TryJoinAll" ).field(name:"elems" , value:elems).finish() |
58 | } |
59 | #[cfg (not(futures_no_atomic_cas))] |
60 | TryJoinAllKind::Big { ref fut: &TryCollect, …>, .. } => fmt::Debug::fmt(self:fut, f), |
61 | } |
62 | } |
63 | } |
64 | |
65 | /// Creates a future which represents either a collection of the results of the |
66 | /// futures given or an error. |
67 | /// |
68 | /// The returned future will drive execution for all of its underlying futures, |
69 | /// collecting the results into a destination `Vec<T>` in the same order as they |
70 | /// were provided. |
71 | /// |
72 | /// If any future returns an error then all other futures will be canceled and |
73 | /// an error will be returned immediately. If all futures complete successfully, |
74 | /// however, then the returned future will succeed with a `Vec` of all the |
75 | /// successful results. |
76 | /// |
77 | /// This function is only available when the `std` or `alloc` feature of this |
78 | /// library is activated, and it is activated by default. |
79 | /// |
80 | /// # See Also |
81 | /// |
82 | /// `try_join_all` will switch to the more powerful [`FuturesOrdered`] for performance |
83 | /// reasons if the number of futures is large. You may want to look into using it or |
84 | /// it's counterpart [`FuturesUnordered`][crate::stream::FuturesUnordered] directly. |
85 | /// |
86 | /// Some examples for additional functionality provided by these are: |
87 | /// |
88 | /// * Adding new futures to the set even after it has been started. |
89 | /// |
90 | /// * Only polling the specific futures that have been woken. In cases where |
91 | /// you have a lot of futures this will result in much more efficient polling. |
92 | /// |
93 | /// |
94 | /// # Examples |
95 | /// |
96 | /// ``` |
97 | /// # futures::executor::block_on(async { |
98 | /// use futures::future::{self, try_join_all}; |
99 | /// |
100 | /// let futures = vec![ |
101 | /// future::ok::<u32, u32>(1), |
102 | /// future::ok::<u32, u32>(2), |
103 | /// future::ok::<u32, u32>(3), |
104 | /// ]; |
105 | /// |
106 | /// assert_eq!(try_join_all(futures).await, Ok(vec![1, 2, 3])); |
107 | /// |
108 | /// let futures = vec![ |
109 | /// future::ok::<u32, u32>(1), |
110 | /// future::err::<u32, u32>(2), |
111 | /// future::ok::<u32, u32>(3), |
112 | /// ]; |
113 | /// |
114 | /// assert_eq!(try_join_all(futures).await, Err(2)); |
115 | /// # }); |
116 | /// ``` |
117 | pub fn try_join_all<I>(iter: I) -> TryJoinAll<I::Item> |
118 | where |
119 | I: IntoIterator, |
120 | I::Item: TryFuture, |
121 | { |
122 | let iter = iter.into_iter().map(TryFutureExt::into_future); |
123 | |
124 | #[cfg (futures_no_atomic_cas)] |
125 | { |
126 | let kind = TryJoinAllKind::Small { |
127 | elems: iter.map(TryMaybeDone::Future).collect::<Box<[_]>>().into(), |
128 | }; |
129 | |
130 | assert_future::<Result<Vec<<I::Item as TryFuture>::Ok>, <I::Item as TryFuture>::Error>, _>( |
131 | TryJoinAll { kind }, |
132 | ) |
133 | } |
134 | |
135 | #[cfg (not(futures_no_atomic_cas))] |
136 | { |
137 | let kind = match iter.size_hint().1 { |
138 | Some(max) if max <= join_all::SMALL => TryJoinAllKind::Small { |
139 | elems: iter.map(TryMaybeDone::Future).collect::<Box<[_]>>().into(), |
140 | }, |
141 | _ => TryJoinAllKind::Big { fut: iter.collect::<FuturesOrdered<_>>().try_collect() }, |
142 | }; |
143 | |
144 | assert_future::<Result<Vec<<I::Item as TryFuture>::Ok>, <I::Item as TryFuture>::Error>, _>( |
145 | TryJoinAll { kind }, |
146 | ) |
147 | } |
148 | } |
149 | |
150 | impl<F> Future for TryJoinAll<F> |
151 | where |
152 | F: TryFuture, |
153 | { |
154 | type Output = Result<Vec<F::Ok>, F::Error>; |
155 | |
156 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
157 | match &mut self.kind { |
158 | TryJoinAllKind::Small { elems } => { |
159 | let mut state = FinalState::AllDone; |
160 | |
161 | for elem in join_all::iter_pin_mut(elems.as_mut()) { |
162 | match elem.try_poll(cx) { |
163 | Poll::Pending => state = FinalState::Pending, |
164 | Poll::Ready(Ok(())) => {} |
165 | Poll::Ready(Err(e)) => { |
166 | state = FinalState::Error(e); |
167 | break; |
168 | } |
169 | } |
170 | } |
171 | |
172 | match state { |
173 | FinalState::Pending => Poll::Pending, |
174 | FinalState::AllDone => { |
175 | let mut elems = mem::replace(elems, Box::pin([])); |
176 | let results = join_all::iter_pin_mut(elems.as_mut()) |
177 | .map(|e| e.take_output().unwrap()) |
178 | .collect(); |
179 | Poll::Ready(Ok(results)) |
180 | } |
181 | FinalState::Error(e) => { |
182 | let _ = mem::replace(elems, Box::pin([])); |
183 | Poll::Ready(Err(e)) |
184 | } |
185 | } |
186 | } |
187 | #[cfg (not(futures_no_atomic_cas))] |
188 | TryJoinAllKind::Big { fut } => Pin::new(fut).poll(cx), |
189 | } |
190 | } |
191 | } |
192 | |
193 | impl<F> FromIterator<F> for TryJoinAll<F> |
194 | where |
195 | F: TryFuture, |
196 | { |
197 | fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self { |
198 | try_join_all(iter) |
199 | } |
200 | } |
201 | |