1 | #![cfg_attr (not(feature = "full" ), allow(dead_code))] |
2 | #![cfg_attr (not(feature = "rt" ), allow(unreachable_pub))] |
3 | |
4 | //! Utilities for improved cooperative scheduling. |
5 | //! |
6 | //! ### Cooperative scheduling |
7 | //! |
//! A single call to [`poll`] on a top-level task may potentially do a lot of
//! work before it returns `Poll::Pending`. If a task runs for a long period of
//! time without yielding back to the executor, it can starve other tasks that
//! are waiting for that executor to run them or to drive underlying resources.
12 | //! Since Rust does not have a runtime, it is difficult to forcibly preempt a |
13 | //! long-running task. Instead, this module provides an opt-in mechanism for |
14 | //! futures to collaborate with the executor to avoid starvation. |
15 | //! |
16 | //! Consider a future like this one: |
17 | //! |
18 | //! ``` |
19 | //! # use tokio_stream::{Stream, StreamExt}; |
20 | //! async fn drop_all<I: Stream + Unpin>(mut input: I) { |
21 | //! while let Some(_) = input.next().await {} |
22 | //! } |
23 | //! ``` |
24 | //! |
25 | //! It may look harmless, but consider what happens under heavy load if the |
26 | //! input stream is _always_ ready. If we spawn `drop_all`, the task will never |
27 | //! yield, and will starve other tasks and resources on the same executor. |
28 | //! |
29 | //! To account for this, Tokio has explicit yield points in a number of library |
30 | //! functions, which force tasks to return to the executor periodically. |
31 | //! |
32 | //! |
33 | //! #### unconstrained |
34 | //! |
35 | //! If necessary, [`task::unconstrained`] lets you opt a future out of Tokio's cooperative |
36 | //! scheduling. When a future is wrapped with `unconstrained`, it will never be forced to yield to |
37 | //! Tokio. For example: |
38 | //! |
39 | //! ``` |
40 | //! # #[tokio::main] |
41 | //! # async fn main() { |
42 | //! use tokio::{task, sync::mpsc}; |
43 | //! |
44 | //! let fut = async { |
45 | //! let (tx, mut rx) = mpsc::unbounded_channel(); |
46 | //! |
47 | //! for i in 0..1000 { |
48 | //! let _ = tx.send(()); |
49 | //! // This will always be ready. If coop was in effect, this code would be forced to yield |
50 | //! // periodically. However, if left unconstrained, then this code will never yield. |
51 | //! rx.recv().await; |
52 | //! } |
53 | //! }; |
54 | //! |
55 | //! task::coop::unconstrained(fut).await; |
56 | //! # } |
57 | //! ``` |
58 | //! [`poll`]: method@std::future::Future::poll |
59 | //! [`task::unconstrained`]: crate::task::unconstrained() |
60 | |
61 | cfg_rt! { |
62 | mod consume_budget; |
63 | pub use consume_budget::consume_budget; |
64 | |
65 | mod unconstrained; |
66 | pub use unconstrained::{unconstrained, Unconstrained}; |
67 | } |
68 | |
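// Internal note (plain `//` comments, so not rendered by rustdoc): the
// `drop_all` example from the module documentation could cooperate with the
// executor by awaiting a voluntary yield point on each iteration. The
// `tokio::coop::proceed` call below is illustrative; it is not defined in the
// code visible in this file.
//
// ```ignore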
70 | // # use tokio_stream::{Stream, StreamExt}; |
71 | // async fn drop_all<I: Stream + Unpin>(mut input: I) { |
72 | // while let Some(_) = input.next().await { |
73 | // tokio::coop::proceed().await; |
74 | // } |
75 | // } |
76 | // ``` |
77 | // |
78 | // The `proceed` future will coordinate with the executor to make sure that |
79 | // every so often control is yielded back to the executor so it can run other |
80 | // tasks. |
81 | // |
82 | // # Placing yield points |
83 | // |
84 | // Voluntary yield points should be placed _after_ at least some work has been |
85 | // done. If they are not, a future sufficiently deep in the task hierarchy may |
86 | // end up _never_ getting to run because of the number of yield points that |
87 | // inevitably appear before it is reached. In general, you will want yield |
88 | // points to only appear in "leaf" futures -- those that do not themselves poll |
89 | // other futures. By doing this, you avoid double-counting each iteration of |
90 | // the outer future against the cooperating budget. |
91 | |
92 | use crate::runtime::context; |
93 | |
/// Opaque type tracking the amount of "work" a task may still do before
/// yielding back to the scheduler.
///
/// `Some(n)` means `n` more units of work may be performed; `None` means the
/// budget is unconstrained.
#[derive(Debug, Copy, Clone)]
pub(crate) struct Budget(Option<u8>);

/// Outcome of trying to take one unit of work out of a `Budget`.
pub(crate) struct BudgetDecrement {
    /// `true` if a unit was available, or if the budget is unconstrained.
    success: bool,
    /// `true` if this decrement consumed the last remaining unit.
    hit_zero: bool,
}

impl Budget {
    /// Budget assigned to a task on each poll.
    ///
    /// The value itself is chosen somewhat arbitrarily. It needs to be high
    /// enough to amortize wakeup and scheduling costs, but low enough that we
    /// do not starve other tasks for too long. The value also needs to be high
    /// enough that particularly deep tasks are able to do at least some useful
    /// work at all.
    ///
    /// Note that as more yield points are added in the ecosystem, this value
    /// will probably also have to be raised.
    const fn initial() -> Budget {
        Budget(Some(128))
    }

    /// Returns an unconstrained budget. Operations will not be limited.
    pub(crate) const fn unconstrained() -> Budget {
        Budget(None)
    }

    /// Returns `true` if more work may be done under this budget.
    ///
    /// An unconstrained budget always has work remaining; a constrained one
    /// has work remaining while its counter is greater than zero.
    fn has_remaining(self) -> bool {
        self.0.map_or(true, |budget| budget > 0)
    }
}
128 | |
129 | /// Runs the given closure with a cooperative task budget. When the function |
130 | /// returns, the budget is reset to the value prior to calling the function. |
131 | #[inline (always)] |
132 | pub(crate) fn budget<R>(f: impl FnOnce() -> R) -> R { |
133 | with_budget(Budget::initial(), f) |
134 | } |
135 | |
136 | /// Runs the given closure with an unconstrained task budget. When the function returns, the budget |
137 | /// is reset to the value prior to calling the function. |
138 | #[inline (always)] |
139 | pub(crate) fn with_unconstrained<R>(f: impl FnOnce() -> R) -> R { |
140 | with_budget(Budget::unconstrained(), f) |
141 | } |
142 | |
143 | #[inline (always)] |
144 | fn with_budget<R>(budget: Budget, f: impl FnOnce() -> R) -> R { |
145 | struct ResetGuard { |
146 | prev: Budget, |
147 | } |
148 | |
149 | impl Drop for ResetGuard { |
150 | fn drop(&mut self) { |
151 | let _ = context::budget(|cell| { |
152 | cell.set(self.prev); |
153 | }); |
154 | } |
155 | } |
156 | |
157 | #[allow (unused_variables)] |
158 | let maybe_guard = context::budget(|cell| { |
159 | let prev = cell.get(); |
160 | cell.set(budget); |
161 | |
162 | ResetGuard { prev } |
163 | }); |
164 | |
165 | // The function is called regardless even if the budget is not successfully |
166 | // set due to the thread-local being destroyed. |
167 | f() |
168 | } |
169 | |
170 | /// Returns `true` if there is still budget left on the task. |
171 | /// |
172 | /// # Examples |
173 | /// |
174 | /// This example defines a `Timeout` future that requires a given `future` to complete before the |
175 | /// specified duration elapses. If it does, its result is returned; otherwise, an error is returned |
176 | /// and the future is canceled. |
177 | /// |
178 | /// Note that the future could exhaust the budget before we evaluate the timeout. Using `has_budget_remaining`, |
179 | /// we can detect this scenario and ensure the timeout is always checked. |
180 | /// |
181 | /// ``` |
182 | /// # use std::future::Future; |
183 | /// # use std::pin::{pin, Pin}; |
184 | /// # use std::task::{ready, Context, Poll}; |
185 | /// # use tokio::task::coop; |
186 | /// # use tokio::time::Sleep; |
187 | /// pub struct Timeout<T> { |
188 | /// future: T, |
189 | /// delay: Pin<Box<Sleep>>, |
190 | /// } |
191 | /// |
192 | /// impl<T> Future for Timeout<T> |
193 | /// where |
194 | /// T: Future + Unpin, |
195 | /// { |
196 | /// type Output = Result<T::Output, ()>; |
197 | /// |
198 | /// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
199 | /// let this = Pin::into_inner(self); |
200 | /// let future = Pin::new(&mut this.future); |
201 | /// let delay = Pin::new(&mut this.delay); |
202 | /// |
203 | /// // check if the future is ready |
204 | /// let had_budget_before = coop::has_budget_remaining(); |
205 | /// if let Poll::Ready(v) = future.poll(cx) { |
206 | /// return Poll::Ready(Ok(v)); |
207 | /// } |
208 | /// let has_budget_now = coop::has_budget_remaining(); |
209 | /// |
210 | /// // evaluate the timeout |
211 | /// if let (true, false) = (had_budget_before, has_budget_now) { |
212 | /// // it is the underlying future that exhausted the budget |
213 | /// ready!(pin!(coop::unconstrained(delay)).poll(cx)); |
214 | /// } else { |
215 | /// ready!(delay.poll(cx)); |
216 | /// } |
217 | /// return Poll::Ready(Err(())); |
218 | /// } |
219 | /// } |
220 | ///``` |
221 | #[inline (always)] |
222 | #[cfg_attr (docsrs, doc(cfg(feature = "rt" )))] |
223 | pub fn has_budget_remaining() -> bool { |
224 | // If the current budget cannot be accessed due to the thread-local being |
225 | // shutdown, then we assume there is budget remaining. |
226 | context::budget(|cell| cell.get().has_remaining()).unwrap_or(default:true) |
227 | } |
228 | |
229 | cfg_rt_multi_thread! { |
230 | /// Sets the current task's budget. |
231 | pub(crate) fn set(budget: Budget) { |
232 | let _ = context::budget(|cell| cell.set(budget)); |
233 | } |
234 | } |
235 | |
236 | cfg_rt! { |
237 | /// Forcibly removes the budgeting constraints early. |
238 | /// |
239 | /// Returns the remaining budget |
240 | pub(crate) fn stop() -> Budget { |
241 | context::budget(|cell| { |
242 | let prev = cell.get(); |
243 | cell.set(Budget::unconstrained()); |
244 | prev |
245 | }).unwrap_or(Budget::unconstrained()) |
246 | } |
247 | } |
248 | |
249 | cfg_coop! { |
250 | use pin_project_lite::pin_project; |
251 | use std::cell::Cell; |
252 | use std::future::Future; |
253 | use std::pin::Pin; |
254 | use std::task::{ready, Context, Poll}; |
255 | |
256 | #[must_use ] |
257 | pub(crate) struct RestoreOnPending(Cell<Budget>); |
258 | |
259 | impl RestoreOnPending { |
260 | pub(crate) fn made_progress(&self) { |
261 | self.0.set(Budget::unconstrained()); |
262 | } |
263 | } |
264 | |
265 | impl Drop for RestoreOnPending { |
266 | fn drop(&mut self) { |
267 | // Don't reset if budget was unconstrained or if we made progress. |
268 | // They are both represented as the remembered budget being unconstrained. |
269 | let budget = self.0.get(); |
270 | if !budget.is_unconstrained() { |
271 | let _ = context::budget(|cell| { |
272 | cell.set(budget); |
273 | }); |
274 | } |
275 | } |
276 | } |
277 | |
278 | /// Returns `Poll::Pending` if the current task has exceeded its budget and should yield. |
279 | /// |
280 | /// When you call this method, the current budget is decremented. However, to ensure that |
281 | /// progress is made every time a task is polled, the budget is automatically restored to its |
282 | /// former value if the returned `RestoreOnPending` is dropped. It is the caller's |
283 | /// responsibility to call `RestoreOnPending::made_progress` if it made progress, to ensure |
284 | /// that the budget empties appropriately. |
285 | /// |
286 | /// Note that `RestoreOnPending` restores the budget **as it was before `poll_proceed`**. |
287 | /// Therefore, if the budget is _further_ adjusted between when `poll_proceed` returns and |
288 | /// `RestRestoreOnPending` is dropped, those adjustments are erased unless the caller indicates |
289 | /// that progress was made. |
290 | #[inline ] |
291 | pub(crate) fn poll_proceed(cx: &mut Context<'_>) -> Poll<RestoreOnPending> { |
292 | context::budget(|cell| { |
293 | let mut budget = cell.get(); |
294 | |
295 | let decrement = budget.decrement(); |
296 | |
297 | if decrement.success { |
298 | let restore = RestoreOnPending(Cell::new(cell.get())); |
299 | cell.set(budget); |
300 | |
301 | // avoid double counting |
302 | if decrement.hit_zero { |
303 | inc_budget_forced_yield_count(); |
304 | } |
305 | |
306 | Poll::Ready(restore) |
307 | } else { |
308 | register_waker(cx); |
309 | Poll::Pending |
310 | } |
311 | }).unwrap_or(Poll::Ready(RestoreOnPending(Cell::new(Budget::unconstrained())))) |
312 | } |
313 | |
314 | /// Returns `Poll::Ready` if the current task has budget to consume, and `Poll::Pending` otherwise. |
315 | /// |
316 | /// Note that in contrast to `poll_proceed`, this method does not consume any budget and is used when |
317 | /// polling for budget availability. |
318 | #[inline ] |
319 | pub(crate) fn poll_budget_available(cx: &mut Context<'_>) -> Poll<()> { |
320 | if has_budget_remaining() { |
321 | Poll::Ready(()) |
322 | } else { |
323 | register_waker(cx); |
324 | |
325 | Poll::Pending |
326 | } |
327 | } |
328 | |
329 | cfg_rt! { |
330 | cfg_unstable_metrics! { |
331 | #[inline (always)] |
332 | fn inc_budget_forced_yield_count() { |
333 | let _ = context::with_current(|handle| { |
334 | handle.scheduler_metrics().inc_budget_forced_yield_count(); |
335 | }); |
336 | } |
337 | } |
338 | |
339 | cfg_not_unstable_metrics! { |
340 | #[inline (always)] |
341 | fn inc_budget_forced_yield_count() {} |
342 | } |
343 | |
344 | fn register_waker(cx: &mut Context<'_>) { |
345 | context::defer(cx.waker()); |
346 | } |
347 | } |
348 | |
349 | cfg_not_rt! { |
350 | #[inline (always)] |
351 | fn inc_budget_forced_yield_count() {} |
352 | |
353 | fn register_waker(cx: &mut Context<'_>) { |
354 | cx.waker().wake_by_ref() |
355 | } |
356 | } |
357 | |
358 | impl Budget { |
359 | /// Decrements the budget. Returns `true` if successful. Decrementing fails |
360 | /// when there is not enough remaining budget. |
361 | fn decrement(&mut self) -> BudgetDecrement { |
362 | if let Some(num) = &mut self.0 { |
363 | if *num > 0 { |
364 | *num -= 1; |
365 | |
366 | let hit_zero = *num == 0; |
367 | |
368 | BudgetDecrement { success: true, hit_zero } |
369 | } else { |
370 | BudgetDecrement { success: false, hit_zero: false } |
371 | } |
372 | } else { |
373 | BudgetDecrement { success: true, hit_zero: false } |
374 | } |
375 | } |
376 | |
377 | fn is_unconstrained(self) -> bool { |
378 | self.0.is_none() |
379 | } |
380 | } |
381 | |
382 | pin_project! { |
383 | /// Future wrapper to ensure cooperative scheduling. |
384 | /// |
385 | /// When being polled `poll_proceed` is called before the inner future is polled to check |
386 | /// if the inner future has exceeded its budget. If the inner future resolves, this will |
387 | /// automatically call `RestoreOnPending::made_progress` before resolving this future with |
388 | /// the result of the inner one. If polling the inner future is pending, polling this future |
389 | /// type will also return a `Poll::Pending`. |
390 | #[must_use = "futures do nothing unless polled" ] |
391 | pub(crate) struct Coop<F: Future> { |
392 | #[pin] |
393 | pub(crate) fut: F, |
394 | } |
395 | } |
396 | |
397 | impl<F: Future> Future for Coop<F> { |
398 | type Output = F::Output; |
399 | |
400 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
401 | let coop = ready!(poll_proceed(cx)); |
402 | let me = self.project(); |
403 | if let Poll::Ready(ret) = me.fut.poll(cx) { |
404 | coop.made_progress(); |
405 | Poll::Ready(ret) |
406 | } else { |
407 | Poll::Pending |
408 | } |
409 | } |
410 | } |
411 | |
412 | /// Run a future with a budget constraint for cooperative scheduling. |
413 | /// If the future exceeds its budget while being polled, control is yielded back to the |
414 | /// runtime. |
415 | #[inline ] |
416 | pub(crate) fn cooperative<F: Future>(fut: F) -> Coop<F> { |
417 | Coop { fut } |
418 | } |
419 | } |
420 | |
#[cfg(all(test, not(loom)))]
mod test {
    use super::*;

    #[cfg(all(target_family = "wasm", not(target_os = "wasi")))]
    use wasm_bindgen_test::wasm_bindgen_test as test;

    /// Snapshot of the current budget; unconstrained if the cell is
    /// inaccessible.
    fn get() -> Budget {
        let current = context::budget(|cell| cell.get());
        current.unwrap_or(Budget::unconstrained())
    }

    #[test]
    fn budgeting() {
        use std::future::poll_fn;
        use tokio_test::*;

        // One `poll_proceed` call from inside a throwaway task context,
        // asserted to be ready.
        let proceed = || assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
        let full = Budget::initial().0.unwrap();

        // Outside of `budget`, the budget is unconstrained and stays so.
        assert!(get().0.is_none());

        let guard = proceed();

        assert!(get().0.is_none());
        drop(guard);
        assert!(get().0.is_none());

        budget(|| {
            assert_eq!(get().0, Some(full));

            let guard = proceed();
            assert_eq!(get().0.unwrap(), full - 1);
            drop(guard);
            // no progress was reported, so the unit is given back
            assert_eq!(get().0, Some(full));

            let guard = proceed();
            assert_eq!(get().0.unwrap(), full - 1);
            guard.made_progress();
            drop(guard);
            // progress _was_ reported, so the decrement sticks
            assert_eq!(get().0.unwrap(), full - 1);

            let guard = proceed();
            assert_eq!(get().0.unwrap(), full - 2);
            guard.made_progress();
            drop(guard);
            assert_eq!(get().0.unwrap(), full - 2);

            // A nested `budget` starts from a fresh initial budget.
            budget(|| {
                assert_eq!(get().0, Some(full));

                let guard = proceed();
                assert_eq!(get().0.unwrap(), full - 1);
                guard.made_progress();
                drop(guard);
                assert_eq!(get().0.unwrap(), full - 1);
            });

            // Leaving the nested scope brings the outer budget back.
            assert_eq!(get().0.unwrap(), full - 2);
        });

        assert!(get().0.is_none());

        budget(|| {
            let n = get().0.unwrap();

            // Use up the whole budget, reporting progress each time.
            (0..n).for_each(|_| proceed().made_progress());

            let mut exhausted = task::spawn(poll_fn(|cx| {
                let guard = std::task::ready!(poll_proceed(cx));
                guard.made_progress();
                Poll::Ready(())
            }));

            // With nothing left, the next `poll_proceed` must yield.
            assert_pending!(exhausted.poll());
        });
    }
}
500 | |