1 | //! Types related to the [`TaskTracker`] collection. |
2 | //! |
3 | //! See the documentation of [`TaskTracker`] for more information. |
4 | |
5 | use pin_project_lite::pin_project; |
6 | use std::fmt; |
7 | use std::future::Future; |
8 | use std::pin::Pin; |
9 | use std::sync::atomic::{AtomicUsize, Ordering}; |
10 | use std::sync::Arc; |
11 | use std::task::{Context, Poll}; |
12 | use tokio::sync::{futures::Notified, Notify}; |
13 | |
14 | #[cfg (feature = "rt" )] |
15 | use tokio::{ |
16 | runtime::Handle, |
17 | task::{JoinHandle, LocalSet}, |
18 | }; |
19 | |
20 | /// A task tracker used for waiting until tasks exit. |
21 | /// |
22 | /// This is usually used together with [`CancellationToken`] to implement [graceful shutdown]. The |
23 | /// `CancellationToken` is used to signal to tasks that they should shut down, and the |
24 | /// `TaskTracker` is used to wait for them to finish shutting down. |
25 | /// |
26 | /// The `TaskTracker` will also keep track of a `closed` boolean. This is used to handle the case |
27 | /// where the `TaskTracker` is empty, but we don't want to shut down yet. This means that the |
28 | /// [`wait`] method will wait until *both* of the following happen at the same time: |
29 | /// |
30 | /// * The `TaskTracker` must be closed using the [`close`] method. |
31 | /// * The `TaskTracker` must be empty, that is, all tasks that it is tracking must have exited. |
32 | /// |
33 | /// When a call to [`wait`] returns, it is guaranteed that all tracked tasks have exited and that |
34 | /// the destructor of the future has finished running. However, there might be a short amount of |
35 | /// time where [`JoinHandle::is_finished`] returns false. |
36 | /// |
37 | /// # Comparison to `JoinSet` |
38 | /// |
39 | /// The main Tokio crate has a similar collection known as [`JoinSet`]. The `JoinSet` type has a |
40 | /// lot more features than `TaskTracker`, so `TaskTracker` should only be used when one of its |
41 | /// unique features is required: |
42 | /// |
43 | /// 1. When tasks exit, a `TaskTracker` will allow the task to immediately free its memory. |
44 | /// 2. By not closing the `TaskTracker`, [`wait`] will be prevented from from returning even if |
45 | /// the `TaskTracker` is empty. |
46 | /// 3. A `TaskTracker` does not require mutable access to insert tasks. |
47 | /// 4. A `TaskTracker` can be cloned to share it with many tasks. |
48 | /// |
49 | /// The first point is the most important one. A [`JoinSet`] keeps track of the return value of |
50 | /// every inserted task. This means that if the caller keeps inserting tasks and never calls |
51 | /// [`join_next`], then their return values will keep building up and consuming memory, _even if_ |
52 | /// most of the tasks have already exited. This can cause the process to run out of memory. With a |
53 | /// `TaskTracker`, this does not happen. Once tasks exit, they are immediately removed from the |
54 | /// `TaskTracker`. |
55 | /// |
56 | /// # Examples |
57 | /// |
58 | /// For more examples, please see the topic page on [graceful shutdown]. |
59 | /// |
60 | /// ## Spawn tasks and wait for them to exit |
61 | /// |
62 | /// This is a simple example. For this case, [`JoinSet`] should probably be used instead. |
63 | /// |
64 | /// ``` |
65 | /// use tokio_util::task::TaskTracker; |
66 | /// |
67 | /// #[tokio::main] |
68 | /// async fn main() { |
69 | /// let tracker = TaskTracker::new(); |
70 | /// |
71 | /// for i in 0..10 { |
72 | /// tracker.spawn(async move { |
73 | /// println!("Task {} is running!" , i); |
74 | /// }); |
75 | /// } |
76 | /// // Once we spawned everything, we close the tracker. |
77 | /// tracker.close(); |
78 | /// |
79 | /// // Wait for everything to finish. |
80 | /// tracker.wait().await; |
81 | /// |
82 | /// println!("This is printed after all of the tasks." ); |
83 | /// } |
84 | /// ``` |
85 | /// |
86 | /// ## Wait for tasks to exit |
87 | /// |
88 | /// This example shows the intended use-case of `TaskTracker`. It is used together with |
89 | /// [`CancellationToken`] to implement graceful shutdown. |
90 | /// ``` |
91 | /// use tokio_util::sync::CancellationToken; |
92 | /// use tokio_util::task::TaskTracker; |
93 | /// use tokio::time::{self, Duration}; |
94 | /// |
95 | /// async fn background_task(num: u64) { |
96 | /// for i in 0..10 { |
97 | /// time::sleep(Duration::from_millis(100*num)).await; |
98 | /// println!("Background task {} in iteration {}." , num, i); |
99 | /// } |
100 | /// } |
101 | /// |
102 | /// #[tokio::main] |
103 | /// # async fn _hidden() {} |
104 | /// # #[tokio::main(flavor = "current_thread" , start_paused = true)] |
105 | /// async fn main() { |
106 | /// let tracker = TaskTracker::new(); |
107 | /// let token = CancellationToken::new(); |
108 | /// |
109 | /// for i in 0..10 { |
110 | /// let token = token.clone(); |
111 | /// tracker.spawn(async move { |
112 | /// // Use a `tokio::select!` to kill the background task if the token is |
113 | /// // cancelled. |
114 | /// tokio::select! { |
115 | /// () = background_task(i) => { |
116 | /// println!("Task {} exiting normally." , i); |
117 | /// }, |
118 | /// () = token.cancelled() => { |
119 | /// // Do some cleanup before we really exit. |
120 | /// time::sleep(Duration::from_millis(50)).await; |
121 | /// println!("Task {} finished cleanup." , i); |
122 | /// }, |
123 | /// } |
124 | /// }); |
125 | /// } |
126 | /// |
127 | /// // Spawn a background task that will send the shutdown signal. |
128 | /// { |
129 | /// let tracker = tracker.clone(); |
130 | /// tokio::spawn(async move { |
131 | /// // Normally you would use something like ctrl-c instead of |
132 | /// // sleeping. |
133 | /// time::sleep(Duration::from_secs(2)).await; |
134 | /// tracker.close(); |
135 | /// token.cancel(); |
136 | /// }); |
137 | /// } |
138 | /// |
139 | /// // Wait for all tasks to exit. |
140 | /// tracker.wait().await; |
141 | /// |
142 | /// println!("All tasks have exited now." ); |
143 | /// } |
144 | /// ``` |
145 | /// |
146 | /// [`CancellationToken`]: crate::sync::CancellationToken |
147 | /// [`JoinHandle::is_finished`]: tokio::task::JoinHandle::is_finished |
148 | /// [`JoinSet`]: tokio::task::JoinSet |
149 | /// [`close`]: Self::close |
150 | /// [`join_next`]: tokio::task::JoinSet::join_next |
151 | /// [`wait`]: Self::wait |
152 | /// [graceful shutdown]: https://tokio.rs/tokio/topics/shutdown |
153 | pub struct TaskTracker { |
154 | inner: Arc<TaskTrackerInner>, |
155 | } |
156 | |
157 | /// Represents a task tracked by a [`TaskTracker`]. |
158 | #[must_use ] |
159 | #[derive(Debug)] |
160 | pub struct TaskTrackerToken { |
161 | task_tracker: TaskTracker, |
162 | } |
163 | |
164 | struct TaskTrackerInner { |
165 | /// Keeps track of the state. |
166 | /// |
167 | /// The lowest bit is whether the task tracker is closed. |
168 | /// |
169 | /// The rest of the bits count the number of tracked tasks. |
170 | state: AtomicUsize, |
171 | /// Used to notify when the last task exits. |
172 | on_last_exit: Notify, |
173 | } |
174 | |
175 | pin_project! { |
176 | /// A future that is tracked as a task by a [`TaskTracker`]. |
177 | /// |
178 | /// The associated [`TaskTracker`] cannot complete until this future is dropped. |
179 | /// |
180 | /// This future is returned by [`TaskTracker::track_future`]. |
181 | #[must_use = "futures do nothing unless polled" ] |
182 | pub struct TrackedFuture<F> { |
183 | #[pin] |
184 | future: F, |
185 | token: TaskTrackerToken, |
186 | } |
187 | } |
188 | |
189 | pin_project! { |
190 | /// A future that completes when the [`TaskTracker`] is empty and closed. |
191 | /// |
192 | /// This future is returned by [`TaskTracker::wait`]. |
193 | #[must_use = "futures do nothing unless polled" ] |
194 | pub struct TaskTrackerWaitFuture<'a> { |
195 | #[pin] |
196 | future: Notified<'a>, |
197 | inner: Option<&'a TaskTrackerInner>, |
198 | } |
199 | } |
200 | |
201 | impl TaskTrackerInner { |
202 | #[inline ] |
203 | fn new() -> Self { |
204 | Self { |
205 | state: AtomicUsize::new(0), |
206 | on_last_exit: Notify::new(), |
207 | } |
208 | } |
209 | |
210 | #[inline ] |
211 | fn is_closed_and_empty(&self) -> bool { |
212 | // If empty and closed bit set, then we are done. |
213 | // |
214 | // The acquire load will synchronize with the release store of any previous call to |
215 | // `set_closed` and `drop_task`. |
216 | self.state.load(Ordering::Acquire) == 1 |
217 | } |
218 | |
219 | #[inline ] |
220 | fn set_closed(&self) -> bool { |
221 | // The AcqRel ordering makes the closed bit behave like a `Mutex<bool>` for synchronization |
222 | // purposes. We do this because it makes the return value of `TaskTracker::{close,reopen}` |
223 | // more meaningful for the user. Without these orderings, this assert could fail: |
224 | // ``` |
225 | // // thread 1 |
226 | // some_other_atomic.store(true, Relaxed); |
227 | // tracker.close(); |
228 | // |
229 | // // thread 2 |
230 | // if tracker.reopen() { |
231 | // assert!(some_other_atomic.load(Relaxed)); |
232 | // } |
233 | // ``` |
234 | // However, with the AcqRel ordering, we establish a happens-before relationship from the |
235 | // call to `close` and the later call to `reopen` that returned true. |
236 | let state = self.state.fetch_or(1, Ordering::AcqRel); |
237 | |
238 | // If there are no tasks, and if it was not already closed: |
239 | if state == 0 { |
240 | self.notify_now(); |
241 | } |
242 | |
243 | (state & 1) == 0 |
244 | } |
245 | |
246 | #[inline ] |
247 | fn set_open(&self) -> bool { |
248 | // See `set_closed` regarding the AcqRel ordering. |
249 | let state = self.state.fetch_and(!1, Ordering::AcqRel); |
250 | (state & 1) == 1 |
251 | } |
252 | |
253 | #[inline ] |
254 | fn add_task(&self) { |
255 | self.state.fetch_add(2, Ordering::Relaxed); |
256 | } |
257 | |
258 | #[inline ] |
259 | fn drop_task(&self) { |
260 | let state = self.state.fetch_sub(2, Ordering::Release); |
261 | |
262 | // If this was the last task and we are closed: |
263 | if state == 3 { |
264 | self.notify_now(); |
265 | } |
266 | } |
267 | |
268 | #[cold ] |
269 | fn notify_now(&self) { |
270 | // Insert an acquire fence. This matters for `drop_task` but doesn't matter for |
271 | // `set_closed` since it already uses AcqRel. |
272 | // |
273 | // This synchronizes with the release store of any other call to `drop_task`, and with the |
274 | // release store in the call to `set_closed`. That ensures that everything that happened |
275 | // before those other calls to `drop_task` or `set_closed` will be visible after this load, |
276 | // and those things will also be visible to anything woken by the call to `notify_waiters`. |
277 | self.state.load(Ordering::Acquire); |
278 | |
279 | self.on_last_exit.notify_waiters(); |
280 | } |
281 | } |
282 | |
283 | impl TaskTracker { |
284 | /// Creates a new `TaskTracker`. |
285 | /// |
286 | /// The `TaskTracker` will start out as open. |
287 | #[must_use ] |
288 | pub fn new() -> Self { |
289 | Self { |
290 | inner: Arc::new(TaskTrackerInner::new()), |
291 | } |
292 | } |
293 | |
294 | /// Waits until this `TaskTracker` is both closed and empty. |
295 | /// |
296 | /// If the `TaskTracker` is already closed and empty when this method is called, then it |
297 | /// returns immediately. |
298 | /// |
299 | /// The `wait` future is resistant against [ABA problems][aba]. That is, if the `TaskTracker` |
300 | /// becomes both closed and empty for a short amount of time, then it is guarantee that all |
301 | /// `wait` futures that were created before the short time interval will trigger, even if they |
302 | /// are not polled during that short time interval. |
303 | /// |
304 | /// # Cancel safety |
305 | /// |
306 | /// This method is cancel safe. |
307 | /// |
308 | /// However, the resistance against [ABA problems][aba] is lost when using `wait` as the |
309 | /// condition in a `tokio::select!` loop. |
310 | /// |
311 | /// [aba]: https://en.wikipedia.org/wiki/ABA_problem |
312 | #[inline ] |
313 | pub fn wait(&self) -> TaskTrackerWaitFuture<'_> { |
314 | TaskTrackerWaitFuture { |
315 | future: self.inner.on_last_exit.notified(), |
316 | inner: if self.inner.is_closed_and_empty() { |
317 | None |
318 | } else { |
319 | Some(&self.inner) |
320 | }, |
321 | } |
322 | } |
323 | |
324 | /// Close this `TaskTracker`. |
325 | /// |
326 | /// This allows [`wait`] futures to complete. It does not prevent you from spawning new tasks. |
327 | /// |
328 | /// Returns `true` if this closed the `TaskTracker`, or `false` if it was already closed. |
329 | /// |
330 | /// [`wait`]: Self::wait |
331 | #[inline ] |
332 | pub fn close(&self) -> bool { |
333 | self.inner.set_closed() |
334 | } |
335 | |
336 | /// Reopen this `TaskTracker`. |
337 | /// |
338 | /// This prevents [`wait`] futures from completing even if the `TaskTracker` is empty. |
339 | /// |
340 | /// Returns `true` if this reopened the `TaskTracker`, or `false` if it was already open. |
341 | /// |
342 | /// [`wait`]: Self::wait |
343 | #[inline ] |
344 | pub fn reopen(&self) -> bool { |
345 | self.inner.set_open() |
346 | } |
347 | |
348 | /// Returns `true` if this `TaskTracker` is [closed](Self::close). |
349 | #[inline ] |
350 | #[must_use ] |
351 | pub fn is_closed(&self) -> bool { |
352 | (self.inner.state.load(Ordering::Acquire) & 1) != 0 |
353 | } |
354 | |
355 | /// Returns the number of tasks tracked by this `TaskTracker`. |
356 | #[inline ] |
357 | #[must_use ] |
358 | pub fn len(&self) -> usize { |
359 | self.inner.state.load(Ordering::Acquire) >> 1 |
360 | } |
361 | |
362 | /// Returns `true` if there are no tasks in this `TaskTracker`. |
363 | #[inline ] |
364 | #[must_use ] |
365 | pub fn is_empty(&self) -> bool { |
366 | self.inner.state.load(Ordering::Acquire) <= 1 |
367 | } |
368 | |
369 | /// Spawn the provided future on the current Tokio runtime, and track it in this `TaskTracker`. |
370 | /// |
371 | /// This is equivalent to `tokio::spawn(tracker.track_future(task))`. |
372 | #[inline ] |
373 | #[track_caller ] |
374 | #[cfg (feature = "rt" )] |
375 | #[cfg_attr (docsrs, doc(cfg(feature = "rt" )))] |
376 | pub fn spawn<F>(&self, task: F) -> JoinHandle<F::Output> |
377 | where |
378 | F: Future + Send + 'static, |
379 | F::Output: Send + 'static, |
380 | { |
381 | tokio::task::spawn(self.track_future(task)) |
382 | } |
383 | |
384 | /// Spawn the provided future on the provided Tokio runtime, and track it in this `TaskTracker`. |
385 | /// |
386 | /// This is equivalent to `handle.spawn(tracker.track_future(task))`. |
387 | #[inline ] |
388 | #[track_caller ] |
389 | #[cfg (feature = "rt" )] |
390 | #[cfg_attr (docsrs, doc(cfg(feature = "rt" )))] |
391 | pub fn spawn_on<F>(&self, task: F, handle: &Handle) -> JoinHandle<F::Output> |
392 | where |
393 | F: Future + Send + 'static, |
394 | F::Output: Send + 'static, |
395 | { |
396 | handle.spawn(self.track_future(task)) |
397 | } |
398 | |
399 | /// Spawn the provided future on the current [`LocalSet`], and track it in this `TaskTracker`. |
400 | /// |
401 | /// This is equivalent to `tokio::task::spawn_local(tracker.track_future(task))`. |
402 | /// |
403 | /// [`LocalSet`]: tokio::task::LocalSet |
404 | #[inline ] |
405 | #[track_caller ] |
406 | #[cfg (feature = "rt" )] |
407 | #[cfg_attr (docsrs, doc(cfg(feature = "rt" )))] |
408 | pub fn spawn_local<F>(&self, task: F) -> JoinHandle<F::Output> |
409 | where |
410 | F: Future + 'static, |
411 | F::Output: 'static, |
412 | { |
413 | tokio::task::spawn_local(self.track_future(task)) |
414 | } |
415 | |
416 | /// Spawn the provided future on the provided [`LocalSet`], and track it in this `TaskTracker`. |
417 | /// |
418 | /// This is equivalent to `local_set.spawn_local(tracker.track_future(task))`. |
419 | /// |
420 | /// [`LocalSet`]: tokio::task::LocalSet |
421 | #[inline ] |
422 | #[track_caller ] |
423 | #[cfg (feature = "rt" )] |
424 | #[cfg_attr (docsrs, doc(cfg(feature = "rt" )))] |
425 | pub fn spawn_local_on<F>(&self, task: F, local_set: &LocalSet) -> JoinHandle<F::Output> |
426 | where |
427 | F: Future + 'static, |
428 | F::Output: 'static, |
429 | { |
430 | local_set.spawn_local(self.track_future(task)) |
431 | } |
432 | |
433 | /// Spawn the provided blocking task on the current Tokio runtime, and track it in this `TaskTracker`. |
434 | /// |
435 | /// This is equivalent to `tokio::task::spawn_blocking(tracker.track_future(task))`. |
436 | #[inline ] |
437 | #[track_caller ] |
438 | #[cfg (feature = "rt" )] |
439 | #[cfg (not(target_family = "wasm" ))] |
440 | #[cfg_attr (docsrs, doc(cfg(feature = "rt" )))] |
441 | pub fn spawn_blocking<F, T>(&self, task: F) -> JoinHandle<T> |
442 | where |
443 | F: FnOnce() -> T, |
444 | F: Send + 'static, |
445 | T: Send + 'static, |
446 | { |
447 | let token = self.token(); |
448 | tokio::task::spawn_blocking(move || { |
449 | let res = task(); |
450 | drop(token); |
451 | res |
452 | }) |
453 | } |
454 | |
455 | /// Spawn the provided blocking task on the provided Tokio runtime, and track it in this `TaskTracker`. |
456 | /// |
457 | /// This is equivalent to `handle.spawn_blocking(tracker.track_future(task))`. |
458 | #[inline ] |
459 | #[track_caller ] |
460 | #[cfg (feature = "rt" )] |
461 | #[cfg (not(target_family = "wasm" ))] |
462 | #[cfg_attr (docsrs, doc(cfg(feature = "rt" )))] |
463 | pub fn spawn_blocking_on<F, T>(&self, task: F, handle: &Handle) -> JoinHandle<T> |
464 | where |
465 | F: FnOnce() -> T, |
466 | F: Send + 'static, |
467 | T: Send + 'static, |
468 | { |
469 | let token = self.token(); |
470 | handle.spawn_blocking(move || { |
471 | let res = task(); |
472 | drop(token); |
473 | res |
474 | }) |
475 | } |
476 | |
477 | /// Track the provided future. |
478 | /// |
479 | /// The returned [`TrackedFuture`] will count as a task tracked by this collection, and will |
480 | /// prevent calls to [`wait`] from returning until the task is dropped. |
481 | /// |
482 | /// The task is removed from the collection when it is dropped, not when [`poll`] returns |
483 | /// [`Poll::Ready`]. |
484 | /// |
485 | /// # Examples |
486 | /// |
487 | /// Track a future spawned with [`tokio::spawn`]. |
488 | /// |
489 | /// ``` |
490 | /// # async fn my_async_fn() {} |
491 | /// use tokio_util::task::TaskTracker; |
492 | /// |
493 | /// # #[tokio::main(flavor = "current_thread" )] |
494 | /// # async fn main() { |
495 | /// let tracker = TaskTracker::new(); |
496 | /// |
497 | /// tokio::spawn(tracker.track_future(my_async_fn())); |
498 | /// # } |
499 | /// ``` |
500 | /// |
501 | /// Track a future spawned on a [`JoinSet`]. |
502 | /// ``` |
503 | /// # async fn my_async_fn() {} |
504 | /// use tokio::task::JoinSet; |
505 | /// use tokio_util::task::TaskTracker; |
506 | /// |
507 | /// # #[tokio::main(flavor = "current_thread" )] |
508 | /// # async fn main() { |
509 | /// let tracker = TaskTracker::new(); |
510 | /// let mut join_set = JoinSet::new(); |
511 | /// |
512 | /// join_set.spawn(tracker.track_future(my_async_fn())); |
513 | /// # } |
514 | /// ``` |
515 | /// |
516 | /// [`JoinSet`]: tokio::task::JoinSet |
517 | /// [`Poll::Pending`]: std::task::Poll::Pending |
518 | /// [`poll`]: std::future::Future::poll |
519 | /// [`wait`]: Self::wait |
520 | #[inline ] |
521 | pub fn track_future<F: Future>(&self, future: F) -> TrackedFuture<F> { |
522 | TrackedFuture { |
523 | future, |
524 | token: self.token(), |
525 | } |
526 | } |
527 | |
528 | /// Creates a [`TaskTrackerToken`] representing a task tracked by this `TaskTracker`. |
529 | /// |
530 | /// This token is a lower-level utility than the spawn methods. Each token is considered to |
531 | /// correspond to a task. As long as the token exists, the `TaskTracker` cannot complete. |
532 | /// Furthermore, the count returned by the [`len`] method will include the tokens in the count. |
533 | /// |
534 | /// Dropping the token indicates to the `TaskTracker` that the task has exited. |
535 | /// |
536 | /// [`len`]: TaskTracker::len |
537 | #[inline ] |
538 | pub fn token(&self) -> TaskTrackerToken { |
539 | self.inner.add_task(); |
540 | TaskTrackerToken { |
541 | task_tracker: self.clone(), |
542 | } |
543 | } |
544 | |
545 | /// Returns `true` if both task trackers correspond to the same set of tasks. |
546 | /// |
547 | /// # Examples |
548 | /// |
549 | /// ``` |
550 | /// use tokio_util::task::TaskTracker; |
551 | /// |
552 | /// let tracker_1 = TaskTracker::new(); |
553 | /// let tracker_2 = TaskTracker::new(); |
554 | /// let tracker_1_clone = tracker_1.clone(); |
555 | /// |
556 | /// assert!(TaskTracker::ptr_eq(&tracker_1, &tracker_1_clone)); |
557 | /// assert!(!TaskTracker::ptr_eq(&tracker_1, &tracker_2)); |
558 | /// ``` |
559 | #[inline ] |
560 | #[must_use ] |
561 | pub fn ptr_eq(left: &TaskTracker, right: &TaskTracker) -> bool { |
562 | Arc::ptr_eq(&left.inner, &right.inner) |
563 | } |
564 | } |
565 | |
566 | impl Default for TaskTracker { |
567 | /// Creates a new `TaskTracker`. |
568 | /// |
569 | /// The `TaskTracker` will start out as open. |
570 | #[inline ] |
571 | fn default() -> TaskTracker { |
572 | TaskTracker::new() |
573 | } |
574 | } |
575 | |
576 | impl Clone for TaskTracker { |
577 | /// Returns a new `TaskTracker` that tracks the same set of tasks. |
578 | /// |
579 | /// Since the new `TaskTracker` shares the same set of tasks, changes to one set are visible in |
580 | /// all other clones. |
581 | /// |
582 | /// # Examples |
583 | /// |
584 | /// ``` |
585 | /// use tokio_util::task::TaskTracker; |
586 | /// |
587 | /// #[tokio::main] |
588 | /// # async fn _hidden() {} |
589 | /// # #[tokio::main(flavor = "current_thread" )] |
590 | /// async fn main() { |
591 | /// let tracker = TaskTracker::new(); |
592 | /// let cloned = tracker.clone(); |
593 | /// |
594 | /// // Spawns on `tracker` are visible in `cloned`. |
595 | /// tracker.spawn(std::future::pending::<()>()); |
596 | /// assert_eq!(cloned.len(), 1); |
597 | /// |
598 | /// // Spawns on `cloned` are visible in `tracker`. |
599 | /// cloned.spawn(std::future::pending::<()>()); |
600 | /// assert_eq!(tracker.len(), 2); |
601 | /// |
602 | /// // Calling `close` is visible to `cloned`. |
603 | /// tracker.close(); |
604 | /// assert!(cloned.is_closed()); |
605 | /// |
606 | /// // Calling `reopen` is visible to `tracker`. |
607 | /// cloned.reopen(); |
608 | /// assert!(!tracker.is_closed()); |
609 | /// } |
610 | /// ``` |
611 | #[inline ] |
612 | fn clone(&self) -> TaskTracker { |
613 | Self { |
614 | inner: self.inner.clone(), |
615 | } |
616 | } |
617 | } |
618 | |
619 | fn debug_inner(inner: &TaskTrackerInner, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
620 | let state = inner.state.load(Ordering::Acquire); |
621 | let is_closed = (state & 1) != 0; |
622 | let len = state >> 1; |
623 | |
624 | f.debug_struct("TaskTracker" ) |
625 | .field("len" , &len) |
626 | .field("is_closed" , &is_closed) |
627 | .field("inner" , &(inner as *const TaskTrackerInner)) |
628 | .finish() |
629 | } |
630 | |
631 | impl fmt::Debug for TaskTracker { |
632 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
633 | debug_inner(&self.inner, f) |
634 | } |
635 | } |
636 | |
637 | impl TaskTrackerToken { |
638 | /// Returns the [`TaskTracker`] that this token is associated with. |
639 | #[inline ] |
640 | #[must_use ] |
641 | pub fn task_tracker(&self) -> &TaskTracker { |
642 | &self.task_tracker |
643 | } |
644 | } |
645 | |
646 | impl Clone for TaskTrackerToken { |
647 | /// Returns a new `TaskTrackerToken` associated with the same [`TaskTracker`]. |
648 | /// |
649 | /// This is equivalent to `token.task_tracker().token()`. |
650 | #[inline ] |
651 | fn clone(&self) -> TaskTrackerToken { |
652 | self.task_tracker.token() |
653 | } |
654 | } |
655 | |
656 | impl Drop for TaskTrackerToken { |
657 | /// Dropping the token indicates to the [`TaskTracker`] that the task has exited. |
658 | #[inline ] |
659 | fn drop(&mut self) { |
660 | self.task_tracker.inner.drop_task(); |
661 | } |
662 | } |
663 | |
664 | impl<F: Future> Future for TrackedFuture<F> { |
665 | type Output = F::Output; |
666 | |
667 | #[inline ] |
668 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> { |
669 | self.project().future.poll(cx) |
670 | } |
671 | } |
672 | |
673 | impl<F: fmt::Debug> fmt::Debug for TrackedFuture<F> { |
674 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
675 | f.debug_struct("TrackedFuture" ) |
676 | .field("future" , &self.future) |
677 | .field("task_tracker" , self.token.task_tracker()) |
678 | .finish() |
679 | } |
680 | } |
681 | |
682 | impl<'a> Future for TaskTrackerWaitFuture<'a> { |
683 | type Output = (); |
684 | |
685 | #[inline ] |
686 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { |
687 | let me = self.project(); |
688 | |
689 | let inner = match me.inner.as_ref() { |
690 | None => return Poll::Ready(()), |
691 | Some(inner) => inner, |
692 | }; |
693 | |
694 | let ready = inner.is_closed_and_empty() || me.future.poll(cx).is_ready(); |
695 | if ready { |
696 | *me.inner = None; |
697 | Poll::Ready(()) |
698 | } else { |
699 | Poll::Pending |
700 | } |
701 | } |
702 | } |
703 | |
704 | impl<'a> fmt::Debug for TaskTrackerWaitFuture<'a> { |
705 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
706 | struct Helper<'a>(&'a TaskTrackerInner); |
707 | |
708 | impl fmt::Debug for Helper<'_> { |
709 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
710 | debug_inner(self.0, f) |
711 | } |
712 | } |
713 | |
714 | f.debug_struct("TaskTrackerWaitFuture" ) |
715 | .field("future" , &self.future) |
716 | .field("task_tracker" , &self.inner.map(Helper)) |
717 | .finish() |
718 | } |
719 | } |
720 | |