// Wraps the item it is given with the shared `join!` rustdoc, `#[macro_export]`,
// and the docs.rs `feature = "macros"` annotation, so that several alternative
// definitions of the same macro can carry identical documentation.
macro_rules! doc {
    ($definition:item) => {
        /// Polls several async expressions concurrently and resolves once
        /// **every** one of them has finished.
        ///
        /// `join!` may only be used inside async functions, closures, and
        /// blocks.
        ///
        /// Each argument is an async expression that evaluates to a future.
        /// All of those futures are multiplexed onto the current task and
        /// driven concurrently.
        ///
        /// For branches that return `Result`, `join!` still waits for **all**
        /// branches to complete, even if some of them finish with `Err`. Reach
        /// for [`try_join!`] when the first `Err` should end the wait early.
        ///
        /// [`try_join!`]: crate::try_join
        ///
        /// # Notes
        ///
        /// The futures are stored inline, so no `Vec` needs to be allocated.
        ///
        /// ### Runtime characteristics
        ///
        /// Because every branch runs on the current task, the branches execute
        /// **concurrently** but never in **parallel**: they all share one
        /// thread, and a branch that blocks that thread stalls every other
        /// branch. When parallelism is required, spawn each async expression
        /// with [`tokio::spawn`] and hand the resulting join handles to
        /// `join!`.
        ///
        /// [`tokio::spawn`]: crate::spawn
        ///
        /// # Examples
        ///
        /// Basic join with two branches
        ///
        /// ```
        /// async fn do_stuff_async() {
        ///     // async work
        /// }
        ///
        /// async fn more_async_work() {
        ///     // more here
        /// }
        ///
        /// #[tokio::main]
        /// async fn main() {
        ///     let (first, second) = tokio::join!(
        ///         do_stuff_async(),
        ///         more_async_work());
        ///
        ///     // do something with the values
        /// }
        /// ```
        #[macro_export]
        #[cfg_attr(docsrs, doc(cfg(feature = "macros")))]
        $definition
    };
}
62 | |
// Documentation-only stand-in for `join!`: it is compiled only under
// `cfg(doc)`, and its single arm never produces working code
// (`unimplemented!()`). Presumably this exists so the rendered signature stays
// short instead of listing the internal `@` normalization arms.
//
// The matcher accepts an optional trailing comma so that it agrees with the
// entry point of the real definition (`$($e:expr),+ $(,)?`); without it,
// `join!(a, b,)` would compile normally yet fail to match in a doc build.
#[cfg(doc)]
doc! {macro_rules! join {
    ($($future:expr),* $(,)?) => { unimplemented!() }
}}
67 | |
68 | #[cfg (not(doc))] |
69 | doc! {macro_rules! join { |
70 | (@ { |
71 | // One `_` for each branch in the `join!` macro. This is not used once |
72 | // normalization is complete. |
73 | ( $($count:tt)* ) |
74 | |
75 | // The expression `0+1+1+ ... +1` equal to the number of branches. |
76 | ( $($total:tt)* ) |
77 | |
78 | // Normalized join! branches |
79 | $( ( $($skip:tt)* ) $e:expr, )* |
80 | |
81 | }) => {{ |
82 | use $crate::macros::support::{maybe_done, poll_fn, Future, Pin}; |
83 | use $crate::macros::support::Poll::{Ready, Pending}; |
84 | |
85 | // Safety: nothing must be moved out of `futures`. This is to satisfy |
86 | // the requirement of `Pin::new_unchecked` called below. |
87 | // |
88 | // We can't use the `pin!` macro for this because `futures` is a tuple |
89 | // and the standard library provides no way to pin-project to the fields |
90 | // of a tuple. |
91 | let mut futures = ( $( maybe_done($e), )* ); |
92 | |
93 | // This assignment makes sure that the `poll_fn` closure only has a |
94 | // reference to the futures, instead of taking ownership of them. This |
95 | // mitigates the issue described in |
96 | // <https://internals.rust-lang.org/t/surprising-soundness-trouble-around-pollfn/17484> |
97 | let mut futures = &mut futures; |
98 | |
99 | // Each time the future created by poll_fn is polled, a different future will be polled first |
100 | // to ensure every future passed to join! gets a chance to make progress even if |
101 | // one of the futures consumes the whole budget. |
102 | // |
103 | // This is number of futures that will be skipped in the first loop |
104 | // iteration the next time. |
105 | let mut skip_next_time: u32 = 0; |
106 | |
107 | poll_fn(move |cx| { |
108 | const COUNT: u32 = $($total)*; |
109 | |
110 | let mut is_pending = false; |
111 | |
112 | let mut to_run = COUNT; |
113 | |
114 | // The number of futures that will be skipped in the first loop iteration. |
115 | let mut skip = skip_next_time; |
116 | |
117 | skip_next_time = if skip + 1 == COUNT { 0 } else { skip + 1 }; |
118 | |
119 | // This loop runs twice and the first `skip` futures |
120 | // are not polled in the first iteration. |
121 | loop { |
122 | $( |
123 | if skip == 0 { |
124 | if to_run == 0 { |
125 | // Every future has been polled |
126 | break; |
127 | } |
128 | to_run -= 1; |
129 | |
130 | // Extract the future for this branch from the tuple. |
131 | let ( $($skip,)* fut, .. ) = &mut *futures; |
132 | |
133 | // Safety: future is stored on the stack above |
134 | // and never moved. |
135 | let mut fut = unsafe { Pin::new_unchecked(fut) }; |
136 | |
137 | // Try polling |
138 | if fut.poll(cx).is_pending() { |
139 | is_pending = true; |
140 | } |
141 | } else { |
142 | // Future skipped, one less future to skip in the next iteration |
143 | skip -= 1; |
144 | } |
145 | )* |
146 | } |
147 | |
148 | if is_pending { |
149 | Pending |
150 | } else { |
151 | Ready(($({ |
152 | // Extract the future for this branch from the tuple. |
153 | let ( $($skip,)* fut, .. ) = &mut futures; |
154 | |
155 | // Safety: future is stored on the stack above |
156 | // and never moved. |
157 | let mut fut = unsafe { Pin::new_unchecked(fut) }; |
158 | |
159 | fut.take_output().expect("expected completed future" ) |
160 | },)*)) |
161 | } |
162 | }).await |
163 | }}; |
164 | |
165 | // ===== Normalize ===== |
166 | |
167 | (@ { ( $($s:tt)* ) ( $($n:tt)* ) $($t:tt)* } $e:expr, $($r:tt)* ) => { |
168 | $crate::join!(@{ ($($s)* _) ($($n)* + 1) $($t)* ($($s)*) $e, } $($r)*) |
169 | }; |
170 | |
171 | // ===== Entry point ===== |
172 | |
173 | ( $($e:expr),+ $(,)?) => { |
174 | $crate::join!(@{ () (0) } $($e,)*) |
175 | }; |
176 | |
177 | () => { async {}.await } |
178 | }} |
179 | |