| 1 | #![allow (unused_imports, unused_macros)] // items are used by the macro |
| 2 | |
| 3 | use crate::cell::UnsafeCell; |
| 4 | use crate::future::{Future, poll_fn}; |
| 5 | use crate::mem; |
| 6 | use crate::pin::Pin; |
| 7 | use crate::task::{Context, Poll, ready}; |
| 8 | |
| 9 | /// Polls multiple futures simultaneously, returning a tuple |
| 10 | /// of all results once complete. |
| 11 | /// |
| 12 | /// While `join!(a, b).await` is similar to `(a.await, b.await)`, |
| 13 | /// `join!` polls both futures concurrently and is therefore more efficient. |
| 14 | /// |
| 15 | /// # Examples |
| 16 | /// |
| 17 | /// ``` |
| 18 | /// #![feature(future_join)] |
| 19 | /// |
| 20 | /// use std::future::join; |
| 21 | /// |
| 22 | /// async fn one() -> usize { 1 } |
| 23 | /// async fn two() -> usize { 2 } |
| 24 | /// |
| 25 | /// # let _ = async { |
| 26 | /// let x = join!(one(), two()).await; |
| 27 | /// assert_eq!(x, (1, 2)); |
| 28 | /// # }; |
| 29 | /// ``` |
| 30 | /// |
| 31 | /// `join!` is variadic, so you can pass any number of futures: |
| 32 | /// |
| 33 | /// ``` |
| 34 | /// #![feature(future_join)] |
| 35 | /// |
| 36 | /// use std::future::join; |
| 37 | /// |
| 38 | /// async fn one() -> usize { 1 } |
| 39 | /// async fn two() -> usize { 2 } |
| 40 | /// async fn three() -> usize { 3 } |
| 41 | /// |
| 42 | /// # let _ = async { |
| 43 | /// let x = join!(one(), two(), three()).await; |
| 44 | /// assert_eq!(x, (1, 2, 3)); |
| 45 | /// # }; |
| 46 | /// ``` |
| 47 | #[unstable (feature = "future_join" , issue = "91642" )] |
| 48 | pub macro join( $($fut:expr),+ $(,)? ) { |
| 49 | // Funnel through an internal macro not to leak implementation details. |
| 50 | join_internal! { |
| 51 | current_position: [] |
| 52 | futures_and_positions: [] |
| 53 | munching: [ $($fut)+ ] |
| 54 | } |
| 55 | } |
| 56 | |
| 57 | // FIXME(danielhenrymantilla): a private macro should need no stability guarantee. |
| 58 | #[unstable (feature = "future_join" , issue = "91642" )] |
| 59 | /// To be able to *name* the i-th future in the tuple (say we want the .4-th), |
| 60 | /// the following trick will be used: `let (_, _, _, _, it, ..) = tuple;` |
| 61 | /// In order to do that, we need to generate a `i`-long repetition of `_`, |
| 62 | /// for each i-th fut. Hence the recursive muncher approach. |
| 63 | macro join_internal { |
| 64 | // Recursion step: map each future with its "position" (underscore count). |
| 65 | ( |
| 66 | // Accumulate a token for each future that has been expanded: "_ _ _". |
| 67 | current_position: [ |
| 68 | $($underscores:tt)* |
| 69 | ] |
| 70 | // Accumulate Futures and their positions in the tuple: `_0th () _1st ( _ ) …`. |
| 71 | futures_and_positions: [ |
| 72 | $($acc:tt)* |
| 73 | ] |
| 74 | // Munch one future. |
| 75 | munching: [ |
| 76 | $current:tt |
| 77 | $($rest:tt)* |
| 78 | ] |
| 79 | ) => ( |
| 80 | join_internal! { |
| 81 | current_position: [ |
| 82 | $($underscores)* |
| 83 | _ |
| 84 | ] |
| 85 | futures_and_positions: [ |
| 86 | $($acc)* |
| 87 | $current ( $($underscores)* ) |
| 88 | ] |
| 89 | munching: [ |
| 90 | $($rest)* |
| 91 | ] |
| 92 | } |
| 93 | ), |
| 94 | |
| 95 | // End of recursion: generate the output future. |
| 96 | ( |
| 97 | current_position: $_:tt |
| 98 | futures_and_positions: [ |
| 99 | $( |
| 100 | $fut_expr:tt ( $($pos:tt)* ) |
| 101 | )* |
| 102 | ] |
| 103 | // Nothing left to munch. |
| 104 | munching: [] |
| 105 | ) => ( |
| 106 | match ( $( MaybeDone::Future($fut_expr), )* ) { futures => async { |
| 107 | let mut futures = futures; |
| 108 | // SAFETY: this is `pin_mut!`. |
| 109 | let mut futures = unsafe { Pin::new_unchecked(&mut futures) }; |
| 110 | poll_fn(move |cx| { |
| 111 | let mut done = true; |
| 112 | // For each `fut`, pin-project to it, and poll it. |
| 113 | $( |
| 114 | // SAFETY: pinning projection |
| 115 | let fut = unsafe { |
| 116 | futures.as_mut().map_unchecked_mut(|it| { |
| 117 | let ( $($pos,)* fut, .. ) = it; |
| 118 | fut |
| 119 | }) |
| 120 | }; |
| 121 | // Despite how tempting it may be to `let () = ready!(fut.poll(cx));` |
| 122 | // doing so would defeat the point of `join!`: to start polling eagerly all |
| 123 | // of the futures, to allow parallelizing the waits. |
| 124 | done &= fut.poll(cx).is_ready(); |
| 125 | )* |
| 126 | if !done { |
| 127 | return Poll::Pending; |
| 128 | } |
| 129 | // All ready; time to extract all the outputs. |
| 130 | |
| 131 | // SAFETY: `.take_output()` does not break the `Pin` invariants for that `fut`. |
| 132 | let futures = unsafe { |
| 133 | futures.as_mut().get_unchecked_mut() |
| 134 | }; |
| 135 | Poll::Ready( |
| 136 | ($( |
| 137 | { |
| 138 | let ( $($pos,)* fut, .. ) = &mut *futures; |
| 139 | fut.take_output().unwrap() |
| 140 | } |
| 141 | ),*) // <- no trailing comma since we don't want 1-tuples. |
| 142 | ) |
| 143 | }).await |
| 144 | }} |
| 145 | ), |
| 146 | } |
| 147 | |
| 148 | /// Future used by `join!` that stores it's output to |
| 149 | /// be later taken and doesn't panic when polled after ready. |
| 150 | /// |
| 151 | /// This type is public in a private module for use by the macro. |
| 152 | #[allow (missing_debug_implementations)] |
| 153 | #[unstable (feature = "future_join" , issue = "91642" )] |
| 154 | pub enum MaybeDone<F: Future> { |
| 155 | Future(F), |
| 156 | Done(F::Output), |
| 157 | Taken, |
| 158 | } |
| 159 | |
| 160 | #[unstable (feature = "future_join" , issue = "91642" )] |
| 161 | impl<F: Future> MaybeDone<F> { |
| 162 | pub fn take_output(&mut self) -> Option<F::Output> { |
| 163 | match *self { |
| 164 | MaybeDone::Done(_) => match mem::replace(self, Self::Taken) { |
| 165 | MaybeDone::Done(val: ::Output) => Some(val), |
| 166 | _ => unreachable!(), |
| 167 | }, |
| 168 | _ => None, |
| 169 | } |
| 170 | } |
| 171 | } |
| 172 | |
| 173 | #[unstable (feature = "future_join" , issue = "91642" )] |
| 174 | impl<F: Future> Future for MaybeDone<F> { |
| 175 | type Output = (); |
| 176 | |
| 177 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| 178 | // SAFETY: pinning in structural for `f` |
| 179 | unsafe { |
| 180 | // Do not mix match ergonomics with unsafe. |
| 181 | match *self.as_mut().get_unchecked_mut() { |
| 182 | MaybeDone::Future(ref mut f: &mut F) => { |
| 183 | let val: ::Output = ready!(Pin::new_unchecked(f).poll(cx)); |
| 184 | self.set(Self::Done(val)); |
| 185 | } |
| 186 | MaybeDone::Done(_) => {} |
| 187 | MaybeDone::Taken => unreachable!(), |
| 188 | } |
| 189 | } |
| 190 | |
| 191 | Poll::Ready(()) |
| 192 | } |
| 193 | } |
| 194 | |