1 | //! An asynchronously awaitable `CancellationToken`. |
2 | //! The token allows to signal a cancellation request to one or more tasks. |
3 | pub(crate) mod guard; |
4 | mod tree_node; |
5 | |
6 | use crate::loom::sync::Arc; |
7 | use crate::util::MaybeDangling; |
8 | use core::future::Future; |
9 | use core::pin::Pin; |
10 | use core::task::{Context, Poll}; |
11 | |
12 | use guard::DropGuard; |
13 | use pin_project_lite::pin_project; |
14 | |
15 | /// A token which can be used to signal a cancellation request to one or more |
16 | /// tasks. |
17 | /// |
18 | /// Tasks can call [`CancellationToken::cancelled()`] in order to |
19 | /// obtain a Future which will be resolved when cancellation is requested. |
20 | /// |
21 | /// Cancellation can be requested through the [`CancellationToken::cancel`] method. |
22 | /// |
23 | /// # Examples |
24 | /// |
25 | /// ```no_run |
26 | /// use tokio::select; |
27 | /// use tokio_util::sync::CancellationToken; |
28 | /// |
29 | /// #[tokio::main] |
30 | /// async fn main() { |
31 | /// let token = CancellationToken::new(); |
32 | /// let cloned_token = token.clone(); |
33 | /// |
34 | /// let join_handle = tokio::spawn(async move { |
35 | /// // Wait for either cancellation or a very long time |
36 | /// select! { |
37 | /// _ = cloned_token.cancelled() => { |
38 | /// // The token was cancelled |
39 | /// 5 |
40 | /// } |
41 | /// _ = tokio::time::sleep(std::time::Duration::from_secs(9999)) => { |
42 | /// 99 |
43 | /// } |
44 | /// } |
45 | /// }); |
46 | /// |
47 | /// tokio::spawn(async move { |
48 | /// tokio::time::sleep(std::time::Duration::from_millis(10)).await; |
49 | /// token.cancel(); |
50 | /// }); |
51 | /// |
52 | /// assert_eq!(5, join_handle.await.unwrap()); |
53 | /// } |
54 | /// ``` |
55 | pub struct CancellationToken { |
56 | inner: Arc<tree_node::TreeNode>, |
57 | } |
58 | |
59 | impl std::panic::UnwindSafe for CancellationToken {} |
60 | impl std::panic::RefUnwindSafe for CancellationToken {} |
61 | |
62 | pin_project! { |
63 | /// A Future that is resolved once the corresponding [`CancellationToken`] |
64 | /// is cancelled. |
65 | #[must_use = "futures do nothing unless polled" ] |
66 | pub struct WaitForCancellationFuture<'a> { |
67 | cancellation_token: &'a CancellationToken, |
68 | #[pin] |
69 | future: tokio::sync::futures::Notified<'a>, |
70 | } |
71 | } |
72 | |
73 | pin_project! { |
74 | /// A Future that is resolved once the corresponding [`CancellationToken`] |
75 | /// is cancelled. |
76 | /// |
77 | /// This is the counterpart to [`WaitForCancellationFuture`] that takes |
78 | /// [`CancellationToken`] by value instead of using a reference. |
79 | #[must_use = "futures do nothing unless polled" ] |
80 | pub struct WaitForCancellationFutureOwned { |
81 | // This field internally has a reference to the cancellation token, but camouflages |
82 | // the relationship with `'static`. To avoid Undefined Behavior, we must ensure |
83 | // that the reference is only used while the cancellation token is still alive. To |
84 | // do that, we ensure that the future is the first field, so that it is dropped |
85 | // before the cancellation token. |
86 | // |
87 | // We use `MaybeDanglingFuture` here because without it, the compiler could assert |
88 | // the reference inside `future` to be valid even after the destructor of that |
89 | // field runs. (Specifically, when the `WaitForCancellationFutureOwned` is passed |
90 | // as an argument to a function, the reference can be asserted to be valid for the |
91 | // rest of that function.) To avoid that, we use `MaybeDangling` which tells the |
92 | // compiler that the reference stored inside it might not be valid. |
93 | // |
94 | // See <https://users.rust-lang.org/t/unsafe-code-review-semi-owning-weak-rwlock-t-guard/95706> |
95 | // for more info. |
96 | #[pin] |
97 | future: MaybeDangling<tokio::sync::futures::Notified<'static>>, |
98 | cancellation_token: CancellationToken, |
99 | } |
100 | } |
101 | |
102 | // ===== impl CancellationToken ===== |
103 | |
104 | impl core::fmt::Debug for CancellationToken { |
105 | fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { |
106 | f.debug_struct("CancellationToken" ) |
107 | .field("is_cancelled" , &self.is_cancelled()) |
108 | .finish() |
109 | } |
110 | } |
111 | |
112 | impl Clone for CancellationToken { |
113 | /// Creates a clone of the `CancellationToken` which will get cancelled |
114 | /// whenever the current token gets cancelled, and vice versa. |
115 | fn clone(&self) -> Self { |
116 | tree_node::increase_handle_refcount(&self.inner); |
117 | CancellationToken { |
118 | inner: self.inner.clone(), |
119 | } |
120 | } |
121 | } |
122 | |
123 | impl Drop for CancellationToken { |
124 | fn drop(&mut self) { |
125 | tree_node::decrease_handle_refcount(&self.inner); |
126 | } |
127 | } |
128 | |
129 | impl Default for CancellationToken { |
130 | fn default() -> CancellationToken { |
131 | CancellationToken::new() |
132 | } |
133 | } |
134 | |
135 | impl CancellationToken { |
136 | /// Creates a new `CancellationToken` in the non-cancelled state. |
137 | pub fn new() -> CancellationToken { |
138 | CancellationToken { |
139 | inner: Arc::new(tree_node::TreeNode::new()), |
140 | } |
141 | } |
142 | |
143 | /// Creates a `CancellationToken` which will get cancelled whenever the |
144 | /// current token gets cancelled. Unlike a cloned `CancellationToken`, |
145 | /// cancelling a child token does not cancel the parent token. |
146 | /// |
147 | /// If the current token is already cancelled, the child token will get |
148 | /// returned in cancelled state. |
149 | /// |
150 | /// # Examples |
151 | /// |
152 | /// ```no_run |
153 | /// use tokio::select; |
154 | /// use tokio_util::sync::CancellationToken; |
155 | /// |
156 | /// #[tokio::main] |
157 | /// async fn main() { |
158 | /// let token = CancellationToken::new(); |
159 | /// let child_token = token.child_token(); |
160 | /// |
161 | /// let join_handle = tokio::spawn(async move { |
162 | /// // Wait for either cancellation or a very long time |
163 | /// select! { |
164 | /// _ = child_token.cancelled() => { |
165 | /// // The token was cancelled |
166 | /// 5 |
167 | /// } |
168 | /// _ = tokio::time::sleep(std::time::Duration::from_secs(9999)) => { |
169 | /// 99 |
170 | /// } |
171 | /// } |
172 | /// }); |
173 | /// |
174 | /// tokio::spawn(async move { |
175 | /// tokio::time::sleep(std::time::Duration::from_millis(10)).await; |
176 | /// token.cancel(); |
177 | /// }); |
178 | /// |
179 | /// assert_eq!(5, join_handle.await.unwrap()); |
180 | /// } |
181 | /// ``` |
182 | pub fn child_token(&self) -> CancellationToken { |
183 | CancellationToken { |
184 | inner: tree_node::child_node(&self.inner), |
185 | } |
186 | } |
187 | |
188 | /// Cancel the [`CancellationToken`] and all child tokens which had been |
189 | /// derived from it. |
190 | /// |
191 | /// This will wake up all tasks which are waiting for cancellation. |
192 | /// |
193 | /// Be aware that cancellation is not an atomic operation. It is possible |
194 | /// for another thread running in parallel with a call to `cancel` to first |
195 | /// receive `true` from `is_cancelled` on one child node, and then receive |
196 | /// `false` from `is_cancelled` on another child node. However, once the |
197 | /// call to `cancel` returns, all child nodes have been fully cancelled. |
198 | pub fn cancel(&self) { |
199 | tree_node::cancel(&self.inner); |
200 | } |
201 | |
202 | /// Returns `true` if the `CancellationToken` is cancelled. |
203 | pub fn is_cancelled(&self) -> bool { |
204 | tree_node::is_cancelled(&self.inner) |
205 | } |
206 | |
207 | /// Returns a `Future` that gets fulfilled when cancellation is requested. |
208 | /// |
209 | /// The future will complete immediately if the token is already cancelled |
210 | /// when this method is called. |
211 | /// |
212 | /// # Cancel safety |
213 | /// |
214 | /// This method is cancel safe. |
215 | pub fn cancelled(&self) -> WaitForCancellationFuture<'_> { |
216 | WaitForCancellationFuture { |
217 | cancellation_token: self, |
218 | future: self.inner.notified(), |
219 | } |
220 | } |
221 | |
222 | /// Returns a `Future` that gets fulfilled when cancellation is requested. |
223 | /// |
224 | /// The future will complete immediately if the token is already cancelled |
225 | /// when this method is called. |
226 | /// |
227 | /// The function takes self by value and returns a future that owns the |
228 | /// token. |
229 | /// |
230 | /// # Cancel safety |
231 | /// |
232 | /// This method is cancel safe. |
233 | pub fn cancelled_owned(self) -> WaitForCancellationFutureOwned { |
234 | WaitForCancellationFutureOwned::new(self) |
235 | } |
236 | |
237 | /// Creates a `DropGuard` for this token. |
238 | /// |
239 | /// Returned guard will cancel this token (and all its children) on drop |
240 | /// unless disarmed. |
241 | pub fn drop_guard(self) -> DropGuard { |
242 | DropGuard { inner: Some(self) } |
243 | } |
244 | } |
245 | |
246 | // ===== impl WaitForCancellationFuture ===== |
247 | |
248 | impl<'a> core::fmt::Debug for WaitForCancellationFuture<'a> { |
249 | fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { |
250 | f.debug_struct("WaitForCancellationFuture" ).finish() |
251 | } |
252 | } |
253 | |
254 | impl<'a> Future for WaitForCancellationFuture<'a> { |
255 | type Output = (); |
256 | |
257 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { |
258 | let mut this = self.project(); |
259 | loop { |
260 | if this.cancellation_token.is_cancelled() { |
261 | return Poll::Ready(()); |
262 | } |
263 | |
264 | // No wakeups can be lost here because there is always a call to |
265 | // `is_cancelled` between the creation of the future and the call to |
266 | // `poll`, and the code that sets the cancelled flag does so before |
267 | // waking the `Notified`. |
268 | if this.future.as_mut().poll(cx).is_pending() { |
269 | return Poll::Pending; |
270 | } |
271 | |
272 | this.future.set(this.cancellation_token.inner.notified()); |
273 | } |
274 | } |
275 | } |
276 | |
277 | // ===== impl WaitForCancellationFutureOwned ===== |
278 | |
279 | impl core::fmt::Debug for WaitForCancellationFutureOwned { |
280 | fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { |
281 | f.debug_struct("WaitForCancellationFutureOwned" ).finish() |
282 | } |
283 | } |
284 | |
285 | impl WaitForCancellationFutureOwned { |
286 | fn new(cancellation_token: CancellationToken) -> Self { |
287 | WaitForCancellationFutureOwned { |
288 | // cancellation_token holds a heap allocation and is guaranteed to have a |
289 | // stable deref, thus it would be ok to move the cancellation_token while |
290 | // the future holds a reference to it. |
291 | // |
292 | // # Safety |
293 | // |
294 | // cancellation_token is dropped after future due to the field ordering. |
295 | future: MaybeDangling::new(unsafe { Self::new_future(&cancellation_token) }), |
296 | cancellation_token, |
297 | } |
298 | } |
299 | |
300 | /// # Safety |
301 | /// The returned future must be destroyed before the cancellation token is |
302 | /// destroyed. |
303 | unsafe fn new_future( |
304 | cancellation_token: &CancellationToken, |
305 | ) -> tokio::sync::futures::Notified<'static> { |
306 | let inner_ptr = Arc::as_ptr(&cancellation_token.inner); |
307 | // SAFETY: The `Arc::as_ptr` method guarantees that `inner_ptr` remains |
308 | // valid until the strong count of the Arc drops to zero, and the caller |
309 | // guarantees that they will drop the future before that happens. |
310 | (*inner_ptr).notified() |
311 | } |
312 | } |
313 | |
314 | impl Future for WaitForCancellationFutureOwned { |
315 | type Output = (); |
316 | |
317 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { |
318 | let mut this = self.project(); |
319 | |
320 | loop { |
321 | if this.cancellation_token.is_cancelled() { |
322 | return Poll::Ready(()); |
323 | } |
324 | |
325 | // No wakeups can be lost here because there is always a call to |
326 | // `is_cancelled` between the creation of the future and the call to |
327 | // `poll`, and the code that sets the cancelled flag does so before |
328 | // waking the `Notified`. |
329 | if this.future.as_mut().poll(cx).is_pending() { |
330 | return Poll::Pending; |
331 | } |
332 | |
333 | // # Safety |
334 | // |
335 | // cancellation_token is dropped after future due to the field ordering. |
336 | this.future.set(MaybeDangling::new(unsafe { |
337 | Self::new_future(this.cancellation_token) |
338 | })); |
339 | } |
340 | } |
341 | } |
342 | |