1 | use crate::runtime::task::{Header, RawTask}; |
2 | |
3 | use std::fmt; |
4 | use std::future::Future; |
5 | use std::marker::PhantomData; |
6 | use std::panic::{RefUnwindSafe, UnwindSafe}; |
7 | use std::pin::Pin; |
8 | use std::task::{Context, Poll, Waker}; |
9 | |
10 | cfg_rt! { |
11 | /// An owned permission to join on a task (await its termination). |
12 | /// |
13 | /// This can be thought of as the equivalent of [`std::thread::JoinHandle`] |
14 | /// for a Tokio task rather than a thread. Note that the background task |
15 | /// associated with this `JoinHandle` started running immediately when you |
16 | /// called spawn, even if you have not yet awaited the `JoinHandle`. |
17 | /// |
18 | /// A `JoinHandle` *detaches* the associated task when it is dropped, which |
19 | /// means that there is no longer any handle to the task, and no way to `join` |
20 | /// on it. |
21 | /// |
22 | /// This `struct` is created by the [`task::spawn`] and [`task::spawn_blocking`] |
23 | /// functions. |
24 | /// |
25 | /// # Cancel safety |
26 | /// |
27 | /// The `&mut JoinHandle<T>` type is cancel safe. If it is used as the event |
28 | /// in a `tokio::select!` statement and some other branch completes first, |
29 | /// then it is guaranteed that the output of the task is not lost. |
30 | /// |
31 | /// If a `JoinHandle` is dropped, then the task continues running in the |
32 | /// background and its return value is lost. |
33 | /// |
34 | /// # Examples |
35 | /// |
36 | /// Creation from [`task::spawn`]: |
37 | /// |
38 | /// ``` |
39 | /// use tokio::task; |
40 | /// |
41 | /// # async fn doc() { |
42 | /// let join_handle: task::JoinHandle<_> = task::spawn(async { |
43 | /// // some work here |
44 | /// }); |
45 | /// # } |
46 | /// ``` |
47 | /// |
48 | /// Creation from [`task::spawn_blocking`]: |
49 | /// |
50 | /// ``` |
51 | /// use tokio::task; |
52 | /// |
53 | /// # async fn doc() { |
54 | /// let join_handle: task::JoinHandle<_> = task::spawn_blocking(|| { |
55 | /// // some blocking work here |
56 | /// }); |
57 | /// # } |
58 | /// ``` |
59 | /// |
60 | /// The generic parameter `T` in `JoinHandle<T>` is the return type of the spawned task. |
61 | /// If the return value is an `i32`, the join handle has type `JoinHandle<i32>`: |
62 | /// |
63 | /// ``` |
64 | /// use tokio::task; |
65 | /// |
66 | /// # async fn doc() { |
67 | /// let join_handle: task::JoinHandle<i32> = task::spawn(async { |
68 | /// 5 + 3 |
69 | /// }); |
70 | /// # } |
71 | /// |
72 | /// ``` |
73 | /// |
74 | /// If the task does not have a return value, the join handle has type `JoinHandle<()>`: |
75 | /// |
76 | /// ``` |
77 | /// use tokio::task; |
78 | /// |
79 | /// # async fn doc() { |
80 | /// let join_handle: task::JoinHandle<()> = task::spawn(async { |
81 | /// println!("I return nothing."); |
82 | /// }); |
83 | /// # } |
84 | /// ``` |
85 | /// |
86 | /// Note that `handle.await` doesn't give you the return type directly. It is wrapped in a |
87 | /// `Result` because panics in the spawned task are caught by Tokio. The `?` operator has |
88 | /// to be double chained to extract the returned value: |
89 | /// |
90 | /// ``` |
91 | /// use tokio::task; |
92 | /// use std::io; |
93 | /// |
94 | /// #[tokio::main] |
95 | /// async fn main() -> io::Result<()> { |
96 | /// let join_handle: task::JoinHandle<Result<i32, io::Error>> = tokio::spawn(async { |
97 | /// Ok(5 + 3) |
98 | /// }); |
99 | /// |
100 | /// let result = join_handle.await??; |
101 | /// assert_eq!(result, 8); |
102 | /// Ok(()) |
103 | /// } |
104 | /// ``` |
105 | /// |
106 | /// If the task panics, the error is a [`JoinError`] that contains the panic: |
107 | /// |
108 | /// ``` |
109 | /// use tokio::task; |
110 | /// use std::io; |
111 | /// use std::panic; |
112 | /// |
113 | /// #[tokio::main] |
114 | /// async fn main() -> io::Result<()> { |
115 | /// let join_handle: task::JoinHandle<Result<i32, io::Error>> = tokio::spawn(async { |
116 | /// panic!("boom"); |
117 | /// }); |
118 | /// |
119 | /// let err = join_handle.await.unwrap_err(); |
120 | /// assert!(err.is_panic()); |
121 | /// Ok(()) |
122 | /// } |
123 | /// |
124 | /// ``` |
125 | /// Child being detached and outliving its parent: |
126 | /// |
127 | /// ```no_run |
128 | /// use tokio::task; |
129 | /// use tokio::time; |
130 | /// use std::time::Duration; |
131 | /// |
132 | /// # #[tokio::main] async fn main() { |
133 | /// let original_task = task::spawn(async { |
134 | /// let _detached_task = task::spawn(async { |
135 | /// // Here we sleep to make sure that the first task returns before. |
136 | /// time::sleep(Duration::from_millis(10)).await; |
137 | /// // This will be called, even though the JoinHandle is dropped. |
138 | /// println!("♫ Still alive ♫"); |
139 | /// }); |
140 | /// }); |
141 | /// |
142 | /// original_task.await.expect("The task being joined has panicked"); |
143 | /// println!("Original task is joined."); |
144 | /// |
145 | /// // We make sure that the new task has time to run, before the main |
146 | /// // task returns. |
147 | /// |
148 | /// time::sleep(Duration::from_millis(1000)).await; |
149 | /// # } |
150 | /// ``` |
151 | /// |
152 | /// [`task::spawn`]: crate::task::spawn() |
153 | /// [`task::spawn_blocking`]: crate::task::spawn_blocking |
154 | /// [`std::thread::JoinHandle`]: std::thread::JoinHandle |
155 | /// [`JoinError`]: crate::task::JoinError |
156 | pub struct JoinHandle<T> { |
157 | raw: RawTask, |
158 | _p: PhantomData<T>, |
159 | } |
160 | } |
161 | |
162 | unsafe impl<T: Send> Send for JoinHandle<T> {} |
163 | unsafe impl<T: Send> Sync for JoinHandle<T> {} |
164 | |
165 | impl<T> UnwindSafe for JoinHandle<T> {} |
166 | impl<T> RefUnwindSafe for JoinHandle<T> {} |
167 | |
168 | impl<T> JoinHandle<T> { |
169 | pub(super) fn new(raw: RawTask) -> JoinHandle<T> { |
170 | JoinHandle { |
171 | raw, |
172 | _p: PhantomData, |
173 | } |
174 | } |
175 | |
176 | /// Abort the task associated with the handle. |
177 | /// |
178 | /// Awaiting a cancelled task might complete as usual if the task was |
179 | /// already completed at the time it was cancelled, but most likely it |
180 | /// will fail with a [cancelled] `JoinError`. |
181 | /// |
182 | /// See also [the module level docs] for more information on cancellation. |
183 | /// |
184 | /// ```rust |
185 | /// use tokio::time; |
186 | /// |
187 | /// # #[tokio::main(flavor = "current_thread" , start_paused = true)] |
188 | /// # async fn main() { |
189 | /// let mut handles = Vec::new(); |
190 | /// |
191 | /// handles.push(tokio::spawn(async { |
192 | /// time::sleep(time::Duration::from_secs(10)).await; |
193 | /// true |
194 | /// })); |
195 | /// |
196 | /// handles.push(tokio::spawn(async { |
197 | /// time::sleep(time::Duration::from_secs(10)).await; |
198 | /// false |
199 | /// })); |
200 | /// |
201 | /// for handle in &handles { |
202 | /// handle.abort(); |
203 | /// } |
204 | /// |
205 | /// for handle in handles { |
206 | /// assert!(handle.await.unwrap_err().is_cancelled()); |
207 | /// } |
208 | /// # } |
209 | /// ``` |
210 | /// |
211 | /// [cancelled]: method@super::error::JoinError::is_cancelled |
212 | /// [the module level docs]: crate::task#cancellation |
213 | pub fn abort(&self) { |
214 | self.raw.remote_abort(); |
215 | } |
216 | |
217 | /// Checks if the task associated with this `JoinHandle` has finished. |
218 | /// |
219 | /// Please note that this method can return `false` even if [`abort`] has been |
220 | /// called on the task. This is because the cancellation process may take |
221 | /// some time, and this method does not return `true` until it has |
222 | /// completed. |
223 | /// |
224 | /// ```rust |
225 | /// use tokio::time; |
226 | /// |
227 | /// # #[tokio::main(flavor = "current_thread" , start_paused = true)] |
228 | /// # async fn main() { |
229 | /// let handle1 = tokio::spawn(async { |
230 | /// // do some stuff here |
231 | /// }); |
232 | /// let handle2 = tokio::spawn(async { |
233 | /// // do some other stuff here |
234 | /// time::sleep(time::Duration::from_secs(10)).await; |
235 | /// }); |
236 | /// // Wait for the task to finish |
237 | /// handle2.abort(); |
238 | /// time::sleep(time::Duration::from_secs(1)).await; |
239 | /// assert!(handle1.is_finished()); |
240 | /// assert!(handle2.is_finished()); |
241 | /// # } |
242 | /// ``` |
243 | /// [`abort`]: method@JoinHandle::abort |
244 | pub fn is_finished(&self) -> bool { |
245 | let state = self.raw.header().state.load(); |
246 | state.is_complete() |
247 | } |
248 | |
249 | /// Set the waker that is notified when the task completes. |
250 | pub(crate) fn set_join_waker(&mut self, waker: &Waker) { |
251 | if self.raw.try_set_join_waker(waker) { |
252 | // In this case the task has already completed. We wake the waker immediately. |
253 | waker.wake_by_ref(); |
254 | } |
255 | } |
256 | |
257 | /// Returns a new `AbortHandle` that can be used to remotely abort this task. |
258 | /// |
259 | /// Awaiting a task cancelled by the `AbortHandle` might complete as usual if the task was |
260 | /// already completed at the time it was cancelled, but most likely it |
261 | /// will fail with a [cancelled] `JoinError`. |
262 | /// |
263 | /// ```rust |
264 | /// use tokio::{time, task}; |
265 | /// |
266 | /// # #[tokio::main(flavor = "current_thread" , start_paused = true)] |
267 | /// # async fn main() { |
268 | /// let mut handles = Vec::new(); |
269 | /// |
270 | /// handles.push(tokio::spawn(async { |
271 | /// time::sleep(time::Duration::from_secs(10)).await; |
272 | /// true |
273 | /// })); |
274 | /// |
275 | /// handles.push(tokio::spawn(async { |
276 | /// time::sleep(time::Duration::from_secs(10)).await; |
277 | /// false |
278 | /// })); |
279 | /// |
280 | /// let abort_handles: Vec<task::AbortHandle> = handles.iter().map(|h| h.abort_handle()).collect(); |
281 | /// |
282 | /// for handle in abort_handles { |
283 | /// handle.abort(); |
284 | /// } |
285 | /// |
286 | /// for handle in handles { |
287 | /// assert!(handle.await.unwrap_err().is_cancelled()); |
288 | /// } |
289 | /// # } |
290 | /// ``` |
291 | /// [cancelled]: method@super::error::JoinError::is_cancelled |
292 | pub fn abort_handle(&self) -> super::AbortHandle { |
293 | self.raw.ref_inc(); |
294 | super::AbortHandle::new(self.raw) |
295 | } |
296 | |
297 | /// Returns a [task ID] that uniquely identifies this task relative to other |
298 | /// currently spawned tasks. |
299 | /// |
300 | /// **Note**: This is an [unstable API][unstable]. The public API of this type |
301 | /// may break in 1.x releases. See [the documentation on unstable |
302 | /// features][unstable] for details. |
303 | /// |
304 | /// [task ID]: crate::task::Id |
305 | /// [unstable]: crate#unstable-features |
306 | #[cfg (tokio_unstable)] |
307 | #[cfg_attr (docsrs, doc(cfg(tokio_unstable)))] |
308 | pub fn id(&self) -> super::Id { |
309 | // Safety: The header pointer is valid. |
310 | unsafe { Header::get_id(self.raw.header_ptr()) } |
311 | } |
312 | } |
313 | |
314 | impl<T> Unpin for JoinHandle<T> {} |
315 | |
316 | impl<T> Future for JoinHandle<T> { |
317 | type Output = super::Result<T>; |
318 | |
319 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
320 | ready!(crate::trace::trace_leaf(cx)); |
321 | let mut ret = Poll::Pending; |
322 | |
323 | // Keep track of task budget |
324 | let coop = ready!(crate::runtime::coop::poll_proceed(cx)); |
325 | |
326 | // Try to read the task output. If the task is not yet complete, the |
327 | // waker is stored and is notified once the task does complete. |
328 | // |
329 | // The function must go via the vtable, which requires erasing generic |
330 | // types. To do this, the function "return" is placed on the stack |
331 | // **before** calling the function and is passed into the function using |
332 | // `*mut ()`. |
333 | // |
334 | // Safety: |
335 | // |
336 | // The type of `T` must match the task's output type. |
337 | unsafe { |
338 | self.raw |
339 | .try_read_output(&mut ret as *mut _ as *mut (), cx.waker()); |
340 | } |
341 | |
342 | if ret.is_ready() { |
343 | coop.made_progress(); |
344 | } |
345 | |
346 | ret |
347 | } |
348 | } |
349 | |
350 | impl<T> Drop for JoinHandle<T> { |
351 | fn drop(&mut self) { |
352 | if self.raw.state().drop_join_handle_fast().is_ok() { |
353 | return; |
354 | } |
355 | |
356 | self.raw.drop_join_handle_slow(); |
357 | } |
358 | } |
359 | |
360 | impl<T> fmt::Debug for JoinHandle<T> |
361 | where |
362 | T: fmt::Debug, |
363 | { |
364 | fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { |
365 | // Safety: The header pointer is valid. |
366 | let id_ptr: NonNull = unsafe { Header::get_id_ptr(self.raw.header_ptr()) }; |
367 | let id: &Id = unsafe { id_ptr.as_ref() }; |
368 | fmt.debug_struct("JoinHandle" ).field(name:"id" , value:id).finish() |
369 | } |
370 | } |
371 | |