1 | #![cfg_attr (not(feature = "full" ), allow(dead_code))] |
2 | |
3 | //! Yield points for improved cooperative scheduling. |
4 | //! |
5 | //! Documentation for this can be found in the [`tokio::task`] module. |
6 | //! |
//! [`tokio::task`]: crate::task
8 | |
9 | // ```ignore |
10 | // # use tokio_stream::{Stream, StreamExt}; |
11 | // async fn drop_all<I: Stream + Unpin>(mut input: I) { |
12 | // while let Some(_) = input.next().await { |
13 | // tokio::coop::proceed().await; |
14 | // } |
15 | // } |
16 | // ``` |
17 | // |
18 | // The `proceed` future will coordinate with the executor to make sure that |
19 | // every so often control is yielded back to the executor so it can run other |
20 | // tasks. |
21 | // |
22 | // # Placing yield points |
23 | // |
24 | // Voluntary yield points should be placed _after_ at least some work has been |
25 | // done. If they are not, a future sufficiently deep in the task hierarchy may |
26 | // end up _never_ getting to run because of the number of yield points that |
27 | // inevitably appear before it is reached. In general, you will want yield |
28 | // points to only appear in "leaf" futures -- those that do not themselves poll |
29 | // other futures. By doing this, you avoid double-counting each iteration of |
30 | // the outer future against the cooperating budget. |
31 | |
32 | use crate::runtime::context; |
33 | |
/// Opaque allowance of "work" that a task may still perform before it should
/// yield back to the scheduler.
///
/// `Some(n)` is a constrained budget with `n` units left; `None` is an
/// unconstrained budget that never runs out.
#[derive(Clone, Copy, Debug)]
pub(crate) struct Budget(Option<u8>);
38 | |
/// Outcome of a single attempt to take one unit out of a `Budget`.
pub(crate) struct BudgetDecrement {
    // `true` when the decrement was allowed: either a unit was taken from a
    // constrained budget, or the budget was unconstrained.
    success: bool,
    // `true` only when this decrement brought a constrained budget down to
    // exactly zero.
    hit_zero: bool,
}
43 | |
44 | impl Budget { |
45 | /// Budget assigned to a task on each poll. |
46 | /// |
47 | /// The value itself is chosen somewhat arbitrarily. It needs to be high |
48 | /// enough to amortize wakeup and scheduling costs, but low enough that we |
49 | /// do not starve other tasks for too long. The value also needs to be high |
50 | /// enough that particularly deep tasks are able to do at least some useful |
51 | /// work at all. |
52 | /// |
53 | /// Note that as more yield points are added in the ecosystem, this value |
54 | /// will probably also have to be raised. |
55 | const fn initial() -> Budget { |
56 | Budget(Some(128)) |
57 | } |
58 | |
59 | /// Returns an unconstrained budget. Operations will not be limited. |
60 | pub(super) const fn unconstrained() -> Budget { |
61 | Budget(None) |
62 | } |
63 | |
64 | fn has_remaining(self) -> bool { |
65 | self.0.map_or(default:true, |budget: u8| budget > 0) |
66 | } |
67 | } |
68 | |
69 | /// Runs the given closure with a cooperative task budget. When the function |
70 | /// returns, the budget is reset to the value prior to calling the function. |
71 | #[inline (always)] |
72 | pub(crate) fn budget<R>(f: impl FnOnce() -> R) -> R { |
73 | with_budget(Budget::initial(), f) |
74 | } |
75 | |
76 | /// Runs the given closure with an unconstrained task budget. When the function returns, the budget |
77 | /// is reset to the value prior to calling the function. |
78 | #[inline (always)] |
79 | pub(crate) fn with_unconstrained<R>(f: impl FnOnce() -> R) -> R { |
80 | with_budget(Budget::unconstrained(), f) |
81 | } |
82 | |
83 | #[inline (always)] |
84 | fn with_budget<R>(budget: Budget, f: impl FnOnce() -> R) -> R { |
85 | struct ResetGuard { |
86 | prev: Budget, |
87 | } |
88 | |
89 | impl Drop for ResetGuard { |
90 | fn drop(&mut self) { |
91 | let _ = context::budget(|cell| { |
92 | cell.set(self.prev); |
93 | }); |
94 | } |
95 | } |
96 | |
97 | #[allow (unused_variables)] |
98 | let maybe_guard = context::budget(|cell| { |
99 | let prev = cell.get(); |
100 | cell.set(budget); |
101 | |
102 | ResetGuard { prev } |
103 | }); |
104 | |
105 | // The function is called regardless even if the budget is not successfully |
106 | // set due to the thread-local being destroyed. |
107 | f() |
108 | } |
109 | |
110 | #[inline (always)] |
111 | pub(crate) fn has_budget_remaining() -> bool { |
112 | // If the current budget cannot be accessed due to the thread-local being |
113 | // shutdown, then we assume there is budget remaining. |
114 | context::budget(|cell| cell.get().has_remaining()).unwrap_or(default:true) |
115 | } |
116 | |
117 | cfg_rt_multi_thread! { |
118 | /// Sets the current task's budget. |
119 | pub(crate) fn set(budget: Budget) { |
120 | let _ = context::budget(|cell| cell.set(budget)); |
121 | } |
122 | } |
123 | |
124 | cfg_rt! { |
125 | /// Forcibly removes the budgeting constraints early. |
126 | /// |
127 | /// Returns the remaining budget |
128 | pub(crate) fn stop() -> Budget { |
129 | context::budget(|cell| { |
130 | let prev = cell.get(); |
131 | cell.set(Budget::unconstrained()); |
132 | prev |
133 | }).unwrap_or(Budget::unconstrained()) |
134 | } |
135 | } |
136 | |
137 | cfg_coop! { |
138 | use pin_project_lite::pin_project; |
139 | use std::cell::Cell; |
140 | use std::future::Future; |
141 | use std::pin::Pin; |
142 | use std::task::{ready, Context, Poll}; |
143 | |
/// Guard handed out by `poll_proceed` that remembers a budget to write back
/// when it is dropped.
///
/// The remembered budget lives in a `Cell` so that `made_progress` can
/// replace it through a shared reference. A remembered budget that is
/// unconstrained means "do not restore anything on drop".
#[must_use ]
pub(crate) struct RestoreOnPending(Cell<Budget>);
146 | |
147 | impl RestoreOnPending { |
148 | pub(crate) fn made_progress(&self) { |
149 | self.0.set(Budget::unconstrained()); |
150 | } |
151 | } |
152 | |
153 | impl Drop for RestoreOnPending { |
154 | fn drop(&mut self) { |
155 | // Don't reset if budget was unconstrained or if we made progress. |
156 | // They are both represented as the remembered budget being unconstrained. |
157 | let budget = self.0.get(); |
158 | if !budget.is_unconstrained() { |
159 | let _ = context::budget(|cell| { |
160 | cell.set(budget); |
161 | }); |
162 | } |
163 | } |
164 | } |
165 | |
166 | /// Returns `Poll::Pending` if the current task has exceeded its budget and should yield. |
167 | /// |
168 | /// When you call this method, the current budget is decremented. However, to ensure that |
169 | /// progress is made every time a task is polled, the budget is automatically restored to its |
170 | /// former value if the returned `RestoreOnPending` is dropped. It is the caller's |
171 | /// responsibility to call `RestoreOnPending::made_progress` if it made progress, to ensure |
172 | /// that the budget empties appropriately. |
173 | /// |
174 | /// Note that `RestoreOnPending` restores the budget **as it was before `poll_proceed`**. |
175 | /// Therefore, if the budget is _further_ adjusted between when `poll_proceed` returns and |
176 | /// `RestRestoreOnPending` is dropped, those adjustments are erased unless the caller indicates |
177 | /// that progress was made. |
178 | #[inline ] |
179 | pub(crate) fn poll_proceed(cx: &mut Context<'_>) -> Poll<RestoreOnPending> { |
180 | context::budget(|cell| { |
181 | let mut budget = cell.get(); |
182 | |
183 | let decrement = budget.decrement(); |
184 | |
185 | if decrement.success { |
186 | let restore = RestoreOnPending(Cell::new(cell.get())); |
187 | cell.set(budget); |
188 | |
189 | // avoid double counting |
190 | if decrement.hit_zero { |
191 | inc_budget_forced_yield_count(); |
192 | } |
193 | |
194 | Poll::Ready(restore) |
195 | } else { |
196 | cx.waker().wake_by_ref(); |
197 | Poll::Pending |
198 | } |
199 | }).unwrap_or(Poll::Ready(RestoreOnPending(Cell::new(Budget::unconstrained())))) |
200 | } |
201 | |
// `inc_budget_forced_yield_count` has three mutually exclusive definitions,
// selected by the surrounding cfg macros. Only the variant inside both
// `cfg_rt!` and `cfg_unstable_metrics!` does any work; the other two are
// no-ops so that callers need no conditional compilation of their own.
cfg_rt! {
    cfg_unstable_metrics! {
        // Bumps the forced-yield counter on the current handle's scheduler
        // metrics. The result of `context::with_current` is discarded.
        #[inline(always)]
        fn inc_budget_forced_yield_count() {
            let _ = context::with_current(|handle| {
                handle.scheduler_metrics().inc_budget_forced_yield_count();
            });
        }
    }

    cfg_not_unstable_metrics! {
        // No-op variant used under `cfg_not_unstable_metrics!`.
        #[inline(always)]
        fn inc_budget_forced_yield_count() {}
    }
}

cfg_not_rt! {
    // No-op variant used under `cfg_not_rt!`.
    #[inline(always)]
    fn inc_budget_forced_yield_count() {}
}
222 | |
223 | impl Budget { |
224 | /// Decrements the budget. Returns `true` if successful. Decrementing fails |
225 | /// when there is not enough remaining budget. |
226 | fn decrement(&mut self) -> BudgetDecrement { |
227 | if let Some(num) = &mut self.0 { |
228 | if *num > 0 { |
229 | *num -= 1; |
230 | |
231 | let hit_zero = *num == 0; |
232 | |
233 | BudgetDecrement { success: true, hit_zero } |
234 | } else { |
235 | BudgetDecrement { success: false, hit_zero: false } |
236 | } |
237 | } else { |
238 | BudgetDecrement { success: true, hit_zero: false } |
239 | } |
240 | } |
241 | |
242 | fn is_unconstrained(self) -> bool { |
243 | self.0.is_none() |
244 | } |
245 | } |
246 | |
pin_project! {
    /// Future wrapper to ensure cooperative scheduling.
    ///
    /// Each time this wrapper is polled, `poll_proceed` is called first; if the budget is
    /// exhausted the wrapper returns `Poll::Pending` without polling the inner future. If the
    /// inner future resolves, `RestoreOnPending::made_progress` is called before this future
    /// resolves with the inner future's output. If the inner future is pending, this future
    /// is pending as well.
    ///
    /// The wrapped future is stored in the pinned field `fut`.
    #[must_use = "futures do nothing unless polled" ]
    pub(crate) struct Coop<F: Future> {
        #[pin]
        pub(crate) fut: F,
    }
}
261 | |
262 | impl<F: Future> Future for Coop<F> { |
263 | type Output = F::Output; |
264 | |
265 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
266 | let coop = ready!(poll_proceed(cx)); |
267 | let me = self.project(); |
268 | if let Poll::Ready(ret) = me.fut.poll(cx) { |
269 | coop.made_progress(); |
270 | Poll::Ready(ret) |
271 | } else { |
272 | Poll::Pending |
273 | } |
274 | } |
275 | } |
276 | |
/// Wraps `fut` in a [`Coop`] so that it runs under a budget constraint for cooperative
/// scheduling.
///
/// If the budget is exhausted when the returned future is polled, control is yielded back to
/// the runtime instead of polling `fut`.
#[inline ]
pub(crate) fn cooperative<F: Future>(fut: F) -> Coop<F> {
    Coop { fut }
}
284 | } |
285 | |
// Unit tests for the budget bookkeeping; not compiled under loom.
#[cfg (all(test, not(loom)))]
mod test {
    use super::*;

    // On wasm targets other than WASI, `#[test]` below resolves to
    // `wasm_bindgen_test`.
    #[cfg (all(target_family = "wasm" , not(target_os = "wasi" )))]
    use wasm_bindgen_test::wasm_bindgen_test as test;

    // Reads the current budget, treating an inaccessible budget as
    // unconstrained.
    fn get() -> Budget {
        context::budget(|cell| cell.get()).unwrap_or(Budget::unconstrained())
    }

    #[test ]
    fn budgeting() {
        use std::future::poll_fn;
        use tokio_test::*;

        // Outside of `budget(..)` the budget is unconstrained.
        assert!(get().0.is_none());

        let coop = assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));

        // Proceeding under an unconstrained budget changes nothing, before or
        // after the guard is dropped.
        assert!(get().0.is_none());
        drop(coop);
        assert!(get().0.is_none());

        budget(|| {
            // Entering `budget(..)` installs the initial budget.
            assert_eq!(get().0, Budget::initial().0);

            let coop = assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
            assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 1);
            drop(coop);
            // we didn't make progress
            assert_eq!(get().0, Budget::initial().0);

            let coop = assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
            assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 1);
            coop.made_progress();
            drop(coop);
            // we _did_ make progress
            assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 1);

            let coop = assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
            assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 2);
            coop.made_progress();
            drop(coop);
            assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 2);

            // A nested `budget(..)` starts from a fresh initial budget...
            budget(|| {
                assert_eq!(get().0, Budget::initial().0);

                let coop = assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
                assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 1);
                coop.made_progress();
                drop(coop);
                assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 1);
            });

            // ...and the outer budget is back in place once it returns.
            assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 2);
        });

        // Leaving `budget(..)` restores the unconstrained budget.
        assert!(get().0.is_none());

        budget(|| {
            let n = get().0.unwrap();

            // Spend the entire budget, making progress each time.
            for _ in 0..n {
                let coop = assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
                coop.made_progress();
            }

            let mut task = task::spawn(poll_fn(|cx| {
                let coop = std::task::ready!(poll_proceed(cx));
                coop.made_progress();
                Poll::Ready(())
            }));

            // With nothing left, the next attempt to proceed must yield.
            assert_pending!(task.poll());
        });
    }
}
365 | |