1//! An asynchronously awaitable `CancellationToken`.
2//! The token allows to signal a cancellation request to one or more tasks.
3pub(crate) mod guard;
4mod tree_node;
5
6use crate::loom::sync::Arc;
7use core::future::Future;
8use core::pin::Pin;
9use core::task::{Context, Poll};
10
11use guard::DropGuard;
12use pin_project_lite::pin_project;
13
14/// A token which can be used to signal a cancellation request to one or more
15/// tasks.
16///
17/// Tasks can call [`CancellationToken::cancelled()`] in order to
18/// obtain a Future which will be resolved when cancellation is requested.
19///
20/// Cancellation can be requested through the [`CancellationToken::cancel`] method.
21///
22/// # Examples
23///
24/// ```no_run
25/// use tokio::select;
26/// use tokio_util::sync::CancellationToken;
27///
28/// #[tokio::main]
29/// async fn main() {
30/// let token = CancellationToken::new();
31/// let cloned_token = token.clone();
32///
33/// let join_handle = tokio::spawn(async move {
34/// // Wait for either cancellation or a very long time
35/// select! {
36/// _ = cloned_token.cancelled() => {
37/// // The token was cancelled
38/// 5
39/// }
40/// _ = tokio::time::sleep(std::time::Duration::from_secs(9999)) => {
41/// 99
42/// }
43/// }
44/// });
45///
46/// tokio::spawn(async move {
47/// tokio::time::sleep(std::time::Duration::from_millis(10)).await;
48/// token.cancel();
49/// });
50///
51/// assert_eq!(5, join_handle.await.unwrap());
52/// }
53/// ```
54pub struct CancellationToken {
55 inner: Arc<tree_node::TreeNode>,
56}
57
58pin_project! {
59 /// A Future that is resolved once the corresponding [`CancellationToken`]
60 /// is cancelled.
61 #[must_use = "futures do nothing unless polled"]
62 pub struct WaitForCancellationFuture<'a> {
63 cancellation_token: &'a CancellationToken,
64 #[pin]
65 future: tokio::sync::futures::Notified<'a>,
66 }
67}
68
69// ===== impl CancellationToken =====
70
71impl core::fmt::Debug for CancellationToken {
72 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
73 f&mut DebugStruct<'_, '_>.debug_struct("CancellationToken")
74 .field(name:"is_cancelled", &self.is_cancelled())
75 .finish()
76 }
77}
78
79impl Clone for CancellationToken {
80 fn clone(&self) -> Self {
81 tree_node::increase_handle_refcount(&self.inner);
82 CancellationToken {
83 inner: self.inner.clone(),
84 }
85 }
86}
87
88impl Drop for CancellationToken {
89 fn drop(&mut self) {
90 tree_node::decrease_handle_refcount(&self.inner);
91 }
92}
93
94impl Default for CancellationToken {
95 fn default() -> CancellationToken {
96 CancellationToken::new()
97 }
98}
99
100impl CancellationToken {
101 /// Creates a new CancellationToken in the non-cancelled state.
102 pub fn new() -> CancellationToken {
103 CancellationToken {
104 inner: Arc::new(tree_node::TreeNode::new()),
105 }
106 }
107
108 /// Creates a `CancellationToken` which will get cancelled whenever the
109 /// current token gets cancelled.
110 ///
111 /// If the current token is already cancelled, the child token will get
112 /// returned in cancelled state.
113 ///
114 /// # Examples
115 ///
116 /// ```no_run
117 /// use tokio::select;
118 /// use tokio_util::sync::CancellationToken;
119 ///
120 /// #[tokio::main]
121 /// async fn main() {
122 /// let token = CancellationToken::new();
123 /// let child_token = token.child_token();
124 ///
125 /// let join_handle = tokio::spawn(async move {
126 /// // Wait for either cancellation or a very long time
127 /// select! {
128 /// _ = child_token.cancelled() => {
129 /// // The token was cancelled
130 /// 5
131 /// }
132 /// _ = tokio::time::sleep(std::time::Duration::from_secs(9999)) => {
133 /// 99
134 /// }
135 /// }
136 /// });
137 ///
138 /// tokio::spawn(async move {
139 /// tokio::time::sleep(std::time::Duration::from_millis(10)).await;
140 /// token.cancel();
141 /// });
142 ///
143 /// assert_eq!(5, join_handle.await.unwrap());
144 /// }
145 /// ```
146 pub fn child_token(&self) -> CancellationToken {
147 CancellationToken {
148 inner: tree_node::child_node(&self.inner),
149 }
150 }
151
152 /// Cancel the [`CancellationToken`] and all child tokens which had been
153 /// derived from it.
154 ///
155 /// This will wake up all tasks which are waiting for cancellation.
156 ///
157 /// Be aware that cancellation is not an atomic operation. It is possible
158 /// for another thread running in parallel with a call to `cancel` to first
159 /// receive `true` from `is_cancelled` on one child node, and then receive
160 /// `false` from `is_cancelled` on another child node. However, once the
161 /// call to `cancel` returns, all child nodes have been fully cancelled.
162 pub fn cancel(&self) {
163 tree_node::cancel(&self.inner);
164 }
165
166 /// Returns `true` if the `CancellationToken` is cancelled.
167 pub fn is_cancelled(&self) -> bool {
168 tree_node::is_cancelled(&self.inner)
169 }
170
171 /// Returns a `Future` that gets fulfilled when cancellation is requested.
172 ///
173 /// The future will complete immediately if the token is already cancelled
174 /// when this method is called.
175 ///
176 /// # Cancel safety
177 ///
178 /// This method is cancel safe.
179 pub fn cancelled(&self) -> WaitForCancellationFuture<'_> {
180 WaitForCancellationFuture {
181 cancellation_token: self,
182 future: self.inner.notified(),
183 }
184 }
185
186 /// Creates a `DropGuard` for this token.
187 ///
188 /// Returned guard will cancel this token (and all its children) on drop
189 /// unless disarmed.
190 pub fn drop_guard(self) -> DropGuard {
191 DropGuard { inner: Some(self) }
192 }
193}
194
195// ===== impl WaitForCancellationFuture =====
196
197impl<'a> core::fmt::Debug for WaitForCancellationFuture<'a> {
198 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
199 f.debug_struct(name:"WaitForCancellationFuture").finish()
200 }
201}
202
203impl<'a> Future for WaitForCancellationFuture<'a> {
204 type Output = ();
205
206 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
207 let mut this: Projection<'_, '_> = self.project();
208 loop {
209 if this.cancellation_token.is_cancelled() {
210 return Poll::Ready(());
211 }
212
213 // No wakeups can be lost here because there is always a call to
214 // `is_cancelled` between the creation of the future and the call to
215 // `poll`, and the code that sets the cancelled flag does so before
216 // waking the `Notified`.
217 if this.future.as_mut().poll(cx).is_pending() {
218 return Poll::Pending;
219 }
220
221 this.future.set(this.cancellation_token.inner.notified());
222 }
223 }
224}
225