// Wraps a `macro_rules! join` definition with the user-facing documentation,
// `#[macro_export]`, and the docs.rs feature badge. The file defines `join!`
// twice (a simplified stub for rustdoc and the real implementation), and both
// definitions are passed through this helper so they carry identical docs and
// attributes.
macro_rules! doc {
    ($join:item) => {
        /// Waits on multiple concurrent branches, returning when **all** branches
        /// complete.
        ///
        /// The `join!` macro must be used inside of async functions, closures, and
        /// blocks.
        ///
        /// The `join!` macro takes a list of async expressions and evaluates them
        /// concurrently on the same task. Each async expression evaluates to a future
        /// and the futures from each expression are multiplexed on the current task.
        ///
        /// When working with async expressions returning `Result`, `join!` will wait
        /// for **all** branches to complete regardless of whether any of them
        /// complete with `Err`. Use [`try_join!`] to return early when `Err` is
        /// encountered.
        ///
        /// [`try_join!`]: crate::try_join
        ///
        /// # Notes
        ///
        /// The supplied futures are stored inline and do not require allocating a
        /// `Vec`.
        ///
        /// ### Runtime characteristics
        ///
        /// By running all async expressions on the current task, the expressions are
        /// able to run **concurrently** but not in **parallel**. This means all
        /// expressions are run on the same thread and if one branch blocks the thread,
        /// all other expressions will be unable to continue. If parallelism is
        /// required, spawn each async expression using [`tokio::spawn`] and pass the
        /// join handle to `join!`.
        ///
        /// [`tokio::spawn`]: crate::spawn
        ///
        /// # Examples
        ///
        /// Basic join with two branches
        ///
        /// ```
        /// async fn do_stuff_async() {
        ///     // async work
        /// }
        ///
        /// async fn more_async_work() {
        ///     // more here
        /// }
        ///
        /// #[tokio::main]
        /// async fn main() {
        ///     let (first, second) = tokio::join!(
        ///         do_stuff_async(),
        ///         more_async_work());
        ///
        ///     // do something with the values
        /// }
        /// ```
        #[macro_export]
        #[cfg_attr(docsrs, doc(cfg(feature = "macros")))]
        $join
    };
}

// Documentation-only stub of `join!`, compiled only when rustdoc is running.
// It exists so the rendered docs show a simple signature instead of the
// internal `@`-prefixed normalization arms of the real implementation. The
// body is never meant to execute.
//
// The optional trailing comma mirrors the real macro's entry point, so any
// invocation accepted by the real macro also parses under `cfg(doc)`.
#[cfg(doc)]
doc! {macro_rules! join {
    ($($future:expr),* $(,)?) => { unimplemented!() }
}}

| 68 | #[cfg (not(doc))] |
| 69 | doc! {macro_rules! join { |
| 70 | (@ { |
| 71 | // One `_` for each branch in the `join!` macro. This is not used once |
| 72 | // normalization is complete. |
| 73 | ( $($count:tt)* ) |
| 74 | |
| 75 | // The expression `0+1+1+ ... +1` equal to the number of branches. |
| 76 | ( $($total:tt)* ) |
| 77 | |
| 78 | // Normalized join! branches |
| 79 | $( ( $($skip:tt)* ) $e:expr, )* |
| 80 | |
| 81 | }) => {{ |
| 82 | use $crate::macros::support::{maybe_done, poll_fn, Future, Pin}; |
| 83 | use $crate::macros::support::Poll::{Ready, Pending}; |
| 84 | |
| 85 | // Safety: nothing must be moved out of `futures`. This is to satisfy |
| 86 | // the requirement of `Pin::new_unchecked` called below. |
| 87 | // |
| 88 | // We can't use the `pin!` macro for this because `futures` is a tuple |
| 89 | // and the standard library provides no way to pin-project to the fields |
| 90 | // of a tuple. |
| 91 | let mut futures = ( $( maybe_done($e), )* ); |
| 92 | |
| 93 | // This assignment makes sure that the `poll_fn` closure only has a |
| 94 | // reference to the futures, instead of taking ownership of them. This |
| 95 | // mitigates the issue described in |
| 96 | // <https://internals.rust-lang.org/t/surprising-soundness-trouble-around-pollfn/17484> |
| 97 | let mut futures = &mut futures; |
| 98 | |
| 99 | // Each time the future created by poll_fn is polled, a different future will be polled first |
| 100 | // to ensure every future passed to join! gets a chance to make progress even if |
| 101 | // one of the futures consumes the whole budget. |
| 102 | // |
| 103 | // This is number of futures that will be skipped in the first loop |
| 104 | // iteration the next time. |
| 105 | let mut skip_next_time: u32 = 0; |
| 106 | |
| 107 | poll_fn(move |cx| { |
| 108 | const COUNT: u32 = $($total)*; |
| 109 | |
| 110 | let mut is_pending = false; |
| 111 | |
| 112 | let mut to_run = COUNT; |
| 113 | |
| 114 | // The number of futures that will be skipped in the first loop iteration. |
| 115 | let mut skip = skip_next_time; |
| 116 | |
| 117 | skip_next_time = if skip + 1 == COUNT { 0 } else { skip + 1 }; |
| 118 | |
| 119 | // This loop runs twice and the first `skip` futures |
| 120 | // are not polled in the first iteration. |
| 121 | loop { |
| 122 | $( |
| 123 | if skip == 0 { |
| 124 | if to_run == 0 { |
| 125 | // Every future has been polled |
| 126 | break; |
| 127 | } |
| 128 | to_run -= 1; |
| 129 | |
| 130 | // Extract the future for this branch from the tuple. |
| 131 | let ( $($skip,)* fut, .. ) = &mut *futures; |
| 132 | |
| 133 | // Safety: future is stored on the stack above |
| 134 | // and never moved. |
| 135 | let mut fut = unsafe { Pin::new_unchecked(fut) }; |
| 136 | |
| 137 | // Try polling |
| 138 | if fut.poll(cx).is_pending() { |
| 139 | is_pending = true; |
| 140 | } |
| 141 | } else { |
| 142 | // Future skipped, one less future to skip in the next iteration |
| 143 | skip -= 1; |
| 144 | } |
| 145 | )* |
| 146 | } |
| 147 | |
| 148 | if is_pending { |
| 149 | Pending |
| 150 | } else { |
| 151 | Ready(($({ |
| 152 | // Extract the future for this branch from the tuple. |
| 153 | let ( $($skip,)* fut, .. ) = &mut futures; |
| 154 | |
| 155 | // Safety: future is stored on the stack above |
| 156 | // and never moved. |
| 157 | let mut fut = unsafe { Pin::new_unchecked(fut) }; |
| 158 | |
| 159 | fut.take_output().expect("expected completed future" ) |
| 160 | },)*)) |
| 161 | } |
| 162 | }).await |
| 163 | }}; |
| 164 | |
| 165 | // ===== Normalize ===== |
| 166 | |
| 167 | (@ { ( $($s:tt)* ) ( $($n:tt)* ) $($t:tt)* } $e:expr, $($r:tt)* ) => { |
| 168 | $crate::join!(@{ ($($s)* _) ($($n)* + 1) $($t)* ($($s)*) $e, } $($r)*) |
| 169 | }; |
| 170 | |
| 171 | // ===== Entry point ===== |
| 172 | |
| 173 | ( $($e:expr),+ $(,)?) => { |
| 174 | $crate::join!(@{ () (0) } $($e,)*) |
| 175 | }; |
| 176 | |
| 177 | () => { async {}.await } |
| 178 | }} |
| 179 | |