1 | //! An asynchronously awaitable `CancellationToken`. |
//! The token allows signalling a cancellation request to one or more tasks.
3 | pub(crate) mod guard; |
4 | mod tree_node; |
5 | |
6 | use crate::loom::sync::Arc; |
7 | use core::future::Future; |
8 | use core::pin::Pin; |
9 | use core::task::{Context, Poll}; |
10 | |
11 | use guard::DropGuard; |
12 | use pin_project_lite::pin_project; |
13 | |
/// A token which can be used to signal a cancellation request to one or more
/// tasks.
///
/// Tasks can call [`CancellationToken::cancelled()`] in order to
/// obtain a Future which will be resolved when cancellation is requested.
///
/// Cancellation can be requested through the [`CancellationToken::cancel`] method.
///
/// Cloning a token produces another handle to the same underlying node, so
/// every clone refers to the same cancellation state. Tokens created with
/// [`CancellationToken::child_token`] are additionally cancelled whenever the
/// token they were derived from is cancelled.
///
/// # Examples
///
/// ```no_run
/// use tokio::select;
/// use tokio_util::sync::CancellationToken;
///
/// #[tokio::main]
/// async fn main() {
///     let token = CancellationToken::new();
///     let cloned_token = token.clone();
///
///     let join_handle = tokio::spawn(async move {
///         // Wait for either cancellation or a very long time
///         select! {
///             _ = cloned_token.cancelled() => {
///                 // The token was cancelled
///                 5
///             }
///             _ = tokio::time::sleep(std::time::Duration::from_secs(9999)) => {
///                 99
///             }
///         }
///     });
///
///     tokio::spawn(async move {
///         tokio::time::sleep(std::time::Duration::from_millis(10)).await;
///         token.cancel();
///     });
///
///     assert_eq!(5, join_handle.await.unwrap());
/// }
/// ```
pub struct CancellationToken {
    // Shared handle to this token's node. Every operation on the token is
    // forwarded to the `tree_node` module through this handle.
    inner: Arc<tree_node::TreeNode>,
}
57 | |
58 | pin_project! { |
59 | /// A Future that is resolved once the corresponding [`CancellationToken`] |
60 | /// is cancelled. |
61 | #[must_use = "futures do nothing unless polled" ] |
62 | pub struct WaitForCancellationFuture<'a> { |
63 | cancellation_token: &'a CancellationToken, |
64 | #[pin] |
65 | future: tokio::sync::futures::Notified<'a>, |
66 | } |
67 | } |
68 | |
69 | // ===== impl CancellationToken ===== |
70 | |
71 | impl core::fmt::Debug for CancellationToken { |
72 | fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { |
73 | f&mut DebugStruct<'_, '_>.debug_struct("CancellationToken" ) |
74 | .field(name:"is_cancelled" , &self.is_cancelled()) |
75 | .finish() |
76 | } |
77 | } |
78 | |
79 | impl Clone for CancellationToken { |
80 | fn clone(&self) -> Self { |
81 | tree_node::increase_handle_refcount(&self.inner); |
82 | CancellationToken { |
83 | inner: self.inner.clone(), |
84 | } |
85 | } |
86 | } |
87 | |
88 | impl Drop for CancellationToken { |
89 | fn drop(&mut self) { |
90 | tree_node::decrease_handle_refcount(&self.inner); |
91 | } |
92 | } |
93 | |
94 | impl Default for CancellationToken { |
95 | fn default() -> CancellationToken { |
96 | CancellationToken::new() |
97 | } |
98 | } |
99 | |
100 | impl CancellationToken { |
101 | /// Creates a new CancellationToken in the non-cancelled state. |
102 | pub fn new() -> CancellationToken { |
103 | CancellationToken { |
104 | inner: Arc::new(tree_node::TreeNode::new()), |
105 | } |
106 | } |
107 | |
108 | /// Creates a `CancellationToken` which will get cancelled whenever the |
109 | /// current token gets cancelled. |
110 | /// |
111 | /// If the current token is already cancelled, the child token will get |
112 | /// returned in cancelled state. |
113 | /// |
114 | /// # Examples |
115 | /// |
116 | /// ```no_run |
117 | /// use tokio::select; |
118 | /// use tokio_util::sync::CancellationToken; |
119 | /// |
120 | /// #[tokio::main] |
121 | /// async fn main() { |
122 | /// let token = CancellationToken::new(); |
123 | /// let child_token = token.child_token(); |
124 | /// |
125 | /// let join_handle = tokio::spawn(async move { |
126 | /// // Wait for either cancellation or a very long time |
127 | /// select! { |
128 | /// _ = child_token.cancelled() => { |
129 | /// // The token was cancelled |
130 | /// 5 |
131 | /// } |
132 | /// _ = tokio::time::sleep(std::time::Duration::from_secs(9999)) => { |
133 | /// 99 |
134 | /// } |
135 | /// } |
136 | /// }); |
137 | /// |
138 | /// tokio::spawn(async move { |
139 | /// tokio::time::sleep(std::time::Duration::from_millis(10)).await; |
140 | /// token.cancel(); |
141 | /// }); |
142 | /// |
143 | /// assert_eq!(5, join_handle.await.unwrap()); |
144 | /// } |
145 | /// ``` |
146 | pub fn child_token(&self) -> CancellationToken { |
147 | CancellationToken { |
148 | inner: tree_node::child_node(&self.inner), |
149 | } |
150 | } |
151 | |
152 | /// Cancel the [`CancellationToken`] and all child tokens which had been |
153 | /// derived from it. |
154 | /// |
155 | /// This will wake up all tasks which are waiting for cancellation. |
156 | /// |
157 | /// Be aware that cancellation is not an atomic operation. It is possible |
158 | /// for another thread running in parallel with a call to `cancel` to first |
159 | /// receive `true` from `is_cancelled` on one child node, and then receive |
160 | /// `false` from `is_cancelled` on another child node. However, once the |
161 | /// call to `cancel` returns, all child nodes have been fully cancelled. |
162 | pub fn cancel(&self) { |
163 | tree_node::cancel(&self.inner); |
164 | } |
165 | |
166 | /// Returns `true` if the `CancellationToken` is cancelled. |
167 | pub fn is_cancelled(&self) -> bool { |
168 | tree_node::is_cancelled(&self.inner) |
169 | } |
170 | |
171 | /// Returns a `Future` that gets fulfilled when cancellation is requested. |
172 | /// |
173 | /// The future will complete immediately if the token is already cancelled |
174 | /// when this method is called. |
175 | /// |
176 | /// # Cancel safety |
177 | /// |
178 | /// This method is cancel safe. |
179 | pub fn cancelled(&self) -> WaitForCancellationFuture<'_> { |
180 | WaitForCancellationFuture { |
181 | cancellation_token: self, |
182 | future: self.inner.notified(), |
183 | } |
184 | } |
185 | |
186 | /// Creates a `DropGuard` for this token. |
187 | /// |
188 | /// Returned guard will cancel this token (and all its children) on drop |
189 | /// unless disarmed. |
190 | pub fn drop_guard(self) -> DropGuard { |
191 | DropGuard { inner: Some(self) } |
192 | } |
193 | } |
194 | |
195 | // ===== impl WaitForCancellationFuture ===== |
196 | |
197 | impl<'a> core::fmt::Debug for WaitForCancellationFuture<'a> { |
198 | fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { |
199 | f.debug_struct(name:"WaitForCancellationFuture" ).finish() |
200 | } |
201 | } |
202 | |
203 | impl<'a> Future for WaitForCancellationFuture<'a> { |
204 | type Output = (); |
205 | |
206 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { |
207 | let mut this: Projection<'_, '_> = self.project(); |
208 | loop { |
209 | if this.cancellation_token.is_cancelled() { |
210 | return Poll::Ready(()); |
211 | } |
212 | |
213 | // No wakeups can be lost here because there is always a call to |
214 | // `is_cancelled` between the creation of the future and the call to |
215 | // `poll`, and the code that sets the cancelled flag does so before |
216 | // waking the `Notified`. |
217 | if this.future.as_mut().poll(cx).is_pending() { |
218 | return Poll::Pending; |
219 | } |
220 | |
221 | this.future.set(this.cancellation_token.inner.notified()); |
222 | } |
223 | } |
224 | } |
225 | |