/// Waits on multiple concurrent branches, returning when **all** branches
/// complete.
///
/// The `join!` macro must be used inside of async functions, closures, and
/// blocks.
///
/// The `join!` macro takes a list of async expressions and evaluates them
/// concurrently on the same task. Each async expression evaluates to a future
/// and the futures from each expression are multiplexed on the current task.
/// The macro evaluates to a tuple holding the output of every branch, in the
/// order the branches were written. Invoking `join!()` with no branches
/// evaluates to `()`.
///
/// When working with async expressions returning `Result`, `join!` will wait
/// for **all** branches to complete regardless of whether any of them complete
/// with `Err`. Use [`try_join!`] to return early when `Err` is encountered.
///
/// [`try_join!`]: crate::try_join
///
/// # Notes
///
/// The supplied futures are stored inline and do not require allocating a
/// `Vec`.
///
/// ### Runtime characteristics
///
/// By running all async expressions on the current task, the expressions are
/// able to run **concurrently** but not in **parallel**. This means all
/// expressions are run on the same thread and if one branch blocks the thread,
/// all other expressions will be unable to continue. If parallelism is
/// required, spawn each async expression using [`tokio::spawn`] and pass the
/// join handle to `join!`.
///
/// [`tokio::spawn`]: crate::spawn
///
/// ### Fairness
///
/// Every time the combined future is polled, polling starts at a different
/// branch: the starting position advances by one on each poll and wraps
/// around after the last branch. Every branch is still polled exactly once
/// per poll of the combined future; only the order rotates, so that each
/// branch gets a chance to make progress even if one of them consumes the
/// whole budget.
///
/// # Examples
///
/// Basic join with two branches
///
/// ```
/// async fn do_stuff_async() {
///     // async work
/// }
///
/// async fn more_async_work() {
///     // more here
/// }
///
/// #[tokio::main]
/// async fn main() {
///     let (first, second) = tokio::join!(
///         do_stuff_async(),
///         more_async_work());
///
///     // do something with the values
/// }
/// ```
#[macro_export]
#[cfg_attr(docsrs, doc(cfg(feature = "macros")))]
macro_rules! join {
    // ===== Expansion of the fully normalized form =====
    (@ {
        // One `_` for each branch in the `join!` macro. This is not used once
        // normalization is complete.
        ( $($count:tt)* )

        // The expression `0+1+1+ ... +1`, which equals the number of branches.
        ( $($total:tt)* )

        // Normalized join! branches. `$skip` holds one `_` for every branch
        // that precedes this one; it is used as a tuple pattern prefix to
        // reach this branch's slot in the `futures` tuple.
        $( ( $($skip:tt)* ) $e:expr, )*

    }) => {{
        use $crate::macros::support::{maybe_done, poll_fn, Future, Pin};
        use $crate::macros::support::Poll::{Ready, Pending};

        // SAFETY: nothing must be moved out of `futures`. This is to satisfy
        // the requirement of `Pin::new_unchecked` called below.
        //
        // We can't use the `pin!` macro for this because `futures` is a tuple
        // and the standard library provides no way to pin-project to the fields
        // of a tuple.
        let mut futures = ( $( maybe_done($e), )* );

        // This assignment makes sure that the `poll_fn` closure only has a
        // reference to the futures, instead of taking ownership of them. This
        // mitigates the issue described in
        // <https://internals.rust-lang.org/t/surprising-soundness-trouble-around-pollfn/17484>
        //
        // The binding itself does not need to be `mut`: the closure only ever
        // reborrows through it with `&mut *futures`.
        let futures = &mut futures;

        // Each time the future created by poll_fn is polled, a different future
        // will be polled first to ensure every future passed to join! gets a
        // chance to make progress even if one of the futures consumes the whole
        // budget.
        //
        // This is the number of futures that will be skipped in the first loop
        // iteration the next time.
        let mut skip_next_time: u32 = 0;

        poll_fn(move |cx| {
            // Number of branches, computed from the `0 + 1 + ... + 1` token
            // sequence built up during normalization.
            const COUNT: u32 = $($total)*;

            // Set as soon as any branch reports `Pending` during this poll.
            let mut is_pending = false;

            // Number of branches that still have to be polled in this call.
            let mut to_run = COUNT;

            // The number of futures that will be skipped in the first loop
            // iteration.
            let mut skip = skip_next_time;

            // Rotate the starting branch for the next poll, wrapping to the
            // first branch after the last one.
            skip_next_time = if skip + 1 == COUNT { 0 } else { skip + 1 };

            // This loop runs twice and the first `skip` futures
            // are not polled in the first iteration. The second pass polls
            // exactly those skipped futures and then breaks out, because
            // `to_run` has reached zero.
            loop {
                $(
                    if skip == 0 {
                        if to_run == 0 {
                            // Every future has been polled
                            break;
                        }
                        to_run -= 1;

                        // Extract the future for this branch from the tuple.
                        let ( $($skip,)* fut, .. ) = &mut *futures;

                        // SAFETY: future is stored on the stack above
                        // and never moved.
                        let mut fut = unsafe { Pin::new_unchecked(fut) };

                        // Try polling
                        if fut.poll(cx).is_pending() {
                            is_pending = true;
                        }
                    } else {
                        // Future skipped, one less future to skip in the next
                        // iteration
                        skip -= 1;
                    }
                )*
            }

            if is_pending {
                Pending
            } else {
                // No branch is pending any more: collect every branch's output
                // into a tuple, in declaration order.
                Ready(($({
                    // Extract the future for this branch from the tuple.
                    let ( $($skip,)* fut, .. ) = &mut *futures;

                    // SAFETY: future is stored on the stack above
                    // and never moved.
                    let mut fut = unsafe { Pin::new_unchecked(fut) };

                    fut.take_output().expect("expected completed future")
                },)*))
            }
        }).await
    }};

    // ===== Normalize =====

    // Moves one branch expression from the unprocessed tail into the
    // normalized list: appends a `_` to the skip prefix, appends `+ 1` to the
    // branch count, and records the branch together with the skip prefix as
    // it was *before* this branch was added.
    (@ { ( $($s:tt)* ) ( $($n:tt)* ) $($t:tt)* } $e:expr, $($r:tt)* ) => {
        $crate::join!(@{ ($($s)* _) ($($n)* + 1) $($t)* ($($s)*) $e, } $($r)*)
    };

    // ===== Entry point =====

    // One or more comma-separated expressions, with an optional trailing
    // comma. Normalization starts with an empty skip prefix and a count of 0.
    ( $($e:expr),+ $(,)?) => {
        $crate::join!(@{ () (0) } $($e,)*)
    };

    // No branches at all: completes immediately with `()`.
    () => { async {}.await }
}
167 | |