| 1 | #![cfg_attr (not(feature = "full" ), allow(dead_code))] |
| 2 | #![cfg_attr (not(feature = "rt" ), allow(unreachable_pub))] |
| 3 | |
| 4 | //! Utilities for improved cooperative scheduling. |
| 5 | //! |
| 6 | //! ### Cooperative scheduling |
| 7 | //! |
| 8 | //! A single call to [`poll`] on a top-level task may potentially do a lot of |
| 9 | //! work before it returns `Poll::Pending`. If a task runs for a long period of |
| 10 | //! time without yielding back to the executor, it can starve other tasks |
| 11 | //! waiting on that executor to execute them, or drive underlying resources. |
| 12 | //! Since Rust does not have a runtime, it is difficult to forcibly preempt a |
| 13 | //! long-running task. Instead, this module provides an opt-in mechanism for |
| 14 | //! futures to collaborate with the executor to avoid starvation. |
| 15 | //! |
| 16 | //! Consider a future like this one: |
| 17 | //! |
| 18 | //! ``` |
| 19 | //! # use tokio_stream::{Stream, StreamExt}; |
| 20 | //! async fn drop_all<I: Stream + Unpin>(mut input: I) { |
| 21 | //! while let Some(_) = input.next().await {} |
| 22 | //! } |
| 23 | //! ``` |
| 24 | //! |
| 25 | //! It may look harmless, but consider what happens under heavy load if the |
| 26 | //! input stream is _always_ ready. If we spawn `drop_all`, the task will never |
| 27 | //! yield, and will starve other tasks and resources on the same executor. |
| 28 | //! |
| 29 | //! To account for this, Tokio has explicit yield points in a number of library |
| 30 | //! functions, which force tasks to return to the executor periodically. |
| 31 | //! |
| 32 | //! |
| 33 | //! #### unconstrained |
| 34 | //! |
| 35 | //! If necessary, [`task::unconstrained`] lets you opt a future out of Tokio's cooperative |
| 36 | //! scheduling. When a future is wrapped with `unconstrained`, it will never be forced to yield to |
| 37 | //! Tokio. For example: |
| 38 | //! |
| 39 | //! ``` |
| 40 | //! # #[tokio::main] |
| 41 | //! # async fn main() { |
| 42 | //! use tokio::{task, sync::mpsc}; |
| 43 | //! |
| 44 | //! let fut = async { |
| 45 | //! let (tx, mut rx) = mpsc::unbounded_channel(); |
| 46 | //! |
| 47 | //! for i in 0..1000 { |
| 48 | //! let _ = tx.send(()); |
| 49 | //! // This will always be ready. If coop was in effect, this code would be forced to yield |
| 50 | //! // periodically. However, if left unconstrained, then this code will never yield. |
| 51 | //! rx.recv().await; |
| 52 | //! } |
| 53 | //! }; |
| 54 | //! |
| 55 | //! task::coop::unconstrained(fut).await; |
| 56 | //! # } |
| 57 | //! ``` |
| 58 | //! [`poll`]: method@std::future::Future::poll |
| 59 | //! [`task::unconstrained`]: crate::task::unconstrained() |
| 60 | |
| 61 | cfg_rt! { |
| 62 | mod consume_budget; |
| 63 | pub use consume_budget::consume_budget; |
| 64 | |
| 65 | mod unconstrained; |
| 66 | pub use unconstrained::{unconstrained, Unconstrained}; |
| 67 | } |
| 68 | |
// Sketch of the `drop_all` loop from the module documentation with an explicit
// yield point added on every iteration:
//
// ```ignore
// # use tokio_stream::{Stream, StreamExt};
// async fn drop_all<I: Stream + Unpin>(mut input: I) {
//     while let Some(_) = input.next().await {
//         tokio::coop::proceed().await;
//     }
// }
// ```
//
// The `proceed` future will coordinate with the executor to make sure that
// every so often control is yielded back to the executor so it can run other
// tasks.
//
// # Placing yield points
//
// Voluntary yield points should be placed _after_ at least some work has been
// done. If they are not, a future sufficiently deep in the task hierarchy may
// end up _never_ getting to run because of the number of yield points that
// inevitably appear before it is reached. In general, you will want yield
// points to only appear in "leaf" futures -- those that do not themselves poll
// other futures. By doing this, you avoid double-counting each iteration of
// the outer future against the cooperating budget.
| 91 | |
| 92 | use crate::runtime::context; |
| 93 | |
/// Opaque type tracking the amount of "work" a task may still do before
/// yielding back to the scheduler.
///
/// `Some(n)` means `n` units of work remain; `None` means the budget is
/// unconstrained and never runs out.
#[derive(Debug, Copy, Clone)]
pub(crate) struct Budget(Option<u8>);

/// Outcome of one attempt to take a unit of work from a [`Budget`].
pub(crate) struct BudgetDecrement {
    /// `true` when the unit was granted (always the case for an unconstrained
    /// budget); `false` when a constrained budget was already at zero.
    success: bool,
    /// `true` when this decrement is the one that brought a constrained
    /// budget down to zero.
    hit_zero: bool,
}

impl Budget {
    /// Budget assigned to a task on each poll.
    ///
    /// The value itself is chosen somewhat arbitrarily. It needs to be high
    /// enough to amortize wakeup and scheduling costs, but low enough that we
    /// do not starve other tasks for too long. The value also needs to be high
    /// enough that particularly deep tasks are able to do at least some useful
    /// work at all.
    ///
    /// Note that as more yield points are added in the ecosystem, this value
    /// will probably also have to be raised.
    const fn initial() -> Budget {
        Budget(Some(128))
    }

    /// Returns an unconstrained budget. Operations will not be limited.
    pub(crate) const fn unconstrained() -> Budget {
        Budget(None)
    }

    /// Returns `true` if more work may be done under this budget: either the
    /// budget is unconstrained, or at least one unit is left.
    fn has_remaining(self) -> bool {
        self.0.map_or(true, |budget| budget > 0)
    }
}
| 128 | |
| 129 | /// Runs the given closure with a cooperative task budget. When the function |
| 130 | /// returns, the budget is reset to the value prior to calling the function. |
| 131 | #[inline (always)] |
| 132 | pub(crate) fn budget<R>(f: impl FnOnce() -> R) -> R { |
| 133 | with_budget(Budget::initial(), f) |
| 134 | } |
| 135 | |
| 136 | /// Runs the given closure with an unconstrained task budget. When the function returns, the budget |
| 137 | /// is reset to the value prior to calling the function. |
| 138 | #[inline (always)] |
| 139 | pub(crate) fn with_unconstrained<R>(f: impl FnOnce() -> R) -> R { |
| 140 | with_budget(Budget::unconstrained(), f) |
| 141 | } |
| 142 | |
| 143 | #[inline (always)] |
| 144 | fn with_budget<R>(budget: Budget, f: impl FnOnce() -> R) -> R { |
| 145 | struct ResetGuard { |
| 146 | prev: Budget, |
| 147 | } |
| 148 | |
| 149 | impl Drop for ResetGuard { |
| 150 | fn drop(&mut self) { |
| 151 | let _ = context::budget(|cell| { |
| 152 | cell.set(self.prev); |
| 153 | }); |
| 154 | } |
| 155 | } |
| 156 | |
| 157 | #[allow (unused_variables)] |
| 158 | let maybe_guard = context::budget(|cell| { |
| 159 | let prev = cell.get(); |
| 160 | cell.set(budget); |
| 161 | |
| 162 | ResetGuard { prev } |
| 163 | }); |
| 164 | |
| 165 | // The function is called regardless even if the budget is not successfully |
| 166 | // set due to the thread-local being destroyed. |
| 167 | f() |
| 168 | } |
| 169 | |
| 170 | /// Returns `true` if there is still budget left on the task. |
| 171 | /// |
| 172 | /// # Examples |
| 173 | /// |
| 174 | /// This example defines a `Timeout` future that requires a given `future` to complete before the |
| 175 | /// specified duration elapses. If it does, its result is returned; otherwise, an error is returned |
| 176 | /// and the future is canceled. |
| 177 | /// |
| 178 | /// Note that the future could exhaust the budget before we evaluate the timeout. Using `has_budget_remaining`, |
| 179 | /// we can detect this scenario and ensure the timeout is always checked. |
| 180 | /// |
| 181 | /// ``` |
| 182 | /// # use std::future::Future; |
| 183 | /// # use std::pin::{pin, Pin}; |
| 184 | /// # use std::task::{ready, Context, Poll}; |
| 185 | /// # use tokio::task::coop; |
| 186 | /// # use tokio::time::Sleep; |
| 187 | /// pub struct Timeout<T> { |
| 188 | /// future: T, |
| 189 | /// delay: Pin<Box<Sleep>>, |
| 190 | /// } |
| 191 | /// |
| 192 | /// impl<T> Future for Timeout<T> |
| 193 | /// where |
| 194 | /// T: Future + Unpin, |
| 195 | /// { |
| 196 | /// type Output = Result<T::Output, ()>; |
| 197 | /// |
| 198 | /// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| 199 | /// let this = Pin::into_inner(self); |
| 200 | /// let future = Pin::new(&mut this.future); |
| 201 | /// let delay = Pin::new(&mut this.delay); |
| 202 | /// |
| 203 | /// // check if the future is ready |
| 204 | /// let had_budget_before = coop::has_budget_remaining(); |
| 205 | /// if let Poll::Ready(v) = future.poll(cx) { |
| 206 | /// return Poll::Ready(Ok(v)); |
| 207 | /// } |
| 208 | /// let has_budget_now = coop::has_budget_remaining(); |
| 209 | /// |
| 210 | /// // evaluate the timeout |
| 211 | /// if let (true, false) = (had_budget_before, has_budget_now) { |
| 212 | /// // it is the underlying future that exhausted the budget |
| 213 | /// ready!(pin!(coop::unconstrained(delay)).poll(cx)); |
| 214 | /// } else { |
| 215 | /// ready!(delay.poll(cx)); |
| 216 | /// } |
| 217 | /// return Poll::Ready(Err(())); |
| 218 | /// } |
| 219 | /// } |
| 220 | ///``` |
| 221 | #[inline (always)] |
| 222 | #[cfg_attr (docsrs, doc(cfg(feature = "rt" )))] |
| 223 | pub fn has_budget_remaining() -> bool { |
| 224 | // If the current budget cannot be accessed due to the thread-local being |
| 225 | // shutdown, then we assume there is budget remaining. |
| 226 | context::budget(|cell| cell.get().has_remaining()).unwrap_or(default:true) |
| 227 | } |
| 228 | |
| 229 | cfg_rt_multi_thread! { |
| 230 | /// Sets the current task's budget. |
| 231 | pub(crate) fn set(budget: Budget) { |
| 232 | let _ = context::budget(|cell| cell.set(budget)); |
| 233 | } |
| 234 | } |
| 235 | |
| 236 | cfg_rt! { |
| 237 | /// Forcibly removes the budgeting constraints early. |
| 238 | /// |
| 239 | /// Returns the remaining budget |
| 240 | pub(crate) fn stop() -> Budget { |
| 241 | context::budget(|cell| { |
| 242 | let prev = cell.get(); |
| 243 | cell.set(Budget::unconstrained()); |
| 244 | prev |
| 245 | }).unwrap_or(Budget::unconstrained()) |
| 246 | } |
| 247 | } |
| 248 | |
| 249 | cfg_coop! { |
| 250 | use pin_project_lite::pin_project; |
| 251 | use std::cell::Cell; |
| 252 | use std::future::Future; |
| 253 | use std::pin::Pin; |
| 254 | use std::task::{ready, Context, Poll}; |
| 255 | |
| 256 | #[must_use ] |
| 257 | pub(crate) struct RestoreOnPending(Cell<Budget>); |
| 258 | |
| 259 | impl RestoreOnPending { |
| 260 | pub(crate) fn made_progress(&self) { |
| 261 | self.0.set(Budget::unconstrained()); |
| 262 | } |
| 263 | } |
| 264 | |
| 265 | impl Drop for RestoreOnPending { |
| 266 | fn drop(&mut self) { |
| 267 | // Don't reset if budget was unconstrained or if we made progress. |
| 268 | // They are both represented as the remembered budget being unconstrained. |
| 269 | let budget = self.0.get(); |
| 270 | if !budget.is_unconstrained() { |
| 271 | let _ = context::budget(|cell| { |
| 272 | cell.set(budget); |
| 273 | }); |
| 274 | } |
| 275 | } |
| 276 | } |
| 277 | |
| 278 | /// Returns `Poll::Pending` if the current task has exceeded its budget and should yield. |
| 279 | /// |
| 280 | /// When you call this method, the current budget is decremented. However, to ensure that |
| 281 | /// progress is made every time a task is polled, the budget is automatically restored to its |
| 282 | /// former value if the returned `RestoreOnPending` is dropped. It is the caller's |
| 283 | /// responsibility to call `RestoreOnPending::made_progress` if it made progress, to ensure |
| 284 | /// that the budget empties appropriately. |
| 285 | /// |
| 286 | /// Note that `RestoreOnPending` restores the budget **as it was before `poll_proceed`**. |
| 287 | /// Therefore, if the budget is _further_ adjusted between when `poll_proceed` returns and |
| 288 | /// `RestRestoreOnPending` is dropped, those adjustments are erased unless the caller indicates |
| 289 | /// that progress was made. |
| 290 | #[inline ] |
| 291 | pub(crate) fn poll_proceed(cx: &mut Context<'_>) -> Poll<RestoreOnPending> { |
| 292 | context::budget(|cell| { |
| 293 | let mut budget = cell.get(); |
| 294 | |
| 295 | let decrement = budget.decrement(); |
| 296 | |
| 297 | if decrement.success { |
| 298 | let restore = RestoreOnPending(Cell::new(cell.get())); |
| 299 | cell.set(budget); |
| 300 | |
| 301 | // avoid double counting |
| 302 | if decrement.hit_zero { |
| 303 | inc_budget_forced_yield_count(); |
| 304 | } |
| 305 | |
| 306 | Poll::Ready(restore) |
| 307 | } else { |
| 308 | register_waker(cx); |
| 309 | Poll::Pending |
| 310 | } |
| 311 | }).unwrap_or(Poll::Ready(RestoreOnPending(Cell::new(Budget::unconstrained())))) |
| 312 | } |
| 313 | |
| 314 | /// Returns `Poll::Ready` if the current task has budget to consume, and `Poll::Pending` otherwise. |
| 315 | /// |
| 316 | /// Note that in contrast to `poll_proceed`, this method does not consume any budget and is used when |
| 317 | /// polling for budget availability. |
| 318 | #[inline ] |
| 319 | pub(crate) fn poll_budget_available(cx: &mut Context<'_>) -> Poll<()> { |
| 320 | if has_budget_remaining() { |
| 321 | Poll::Ready(()) |
| 322 | } else { |
| 323 | register_waker(cx); |
| 324 | |
| 325 | Poll::Pending |
| 326 | } |
| 327 | } |
| 328 | |
| 329 | cfg_rt! { |
| 330 | cfg_unstable_metrics! { |
| 331 | #[inline (always)] |
| 332 | fn inc_budget_forced_yield_count() { |
| 333 | let _ = context::with_current(|handle| { |
| 334 | handle.scheduler_metrics().inc_budget_forced_yield_count(); |
| 335 | }); |
| 336 | } |
| 337 | } |
| 338 | |
| 339 | cfg_not_unstable_metrics! { |
| 340 | #[inline (always)] |
| 341 | fn inc_budget_forced_yield_count() {} |
| 342 | } |
| 343 | |
| 344 | fn register_waker(cx: &mut Context<'_>) { |
| 345 | context::defer(cx.waker()); |
| 346 | } |
| 347 | } |
| 348 | |
| 349 | cfg_not_rt! { |
| 350 | #[inline (always)] |
| 351 | fn inc_budget_forced_yield_count() {} |
| 352 | |
| 353 | fn register_waker(cx: &mut Context<'_>) { |
| 354 | cx.waker().wake_by_ref() |
| 355 | } |
| 356 | } |
| 357 | |
| 358 | impl Budget { |
| 359 | /// Decrements the budget. Returns `true` if successful. Decrementing fails |
| 360 | /// when there is not enough remaining budget. |
| 361 | fn decrement(&mut self) -> BudgetDecrement { |
| 362 | if let Some(num) = &mut self.0 { |
| 363 | if *num > 0 { |
| 364 | *num -= 1; |
| 365 | |
| 366 | let hit_zero = *num == 0; |
| 367 | |
| 368 | BudgetDecrement { success: true, hit_zero } |
| 369 | } else { |
| 370 | BudgetDecrement { success: false, hit_zero: false } |
| 371 | } |
| 372 | } else { |
| 373 | BudgetDecrement { success: true, hit_zero: false } |
| 374 | } |
| 375 | } |
| 376 | |
| 377 | fn is_unconstrained(self) -> bool { |
| 378 | self.0.is_none() |
| 379 | } |
| 380 | } |
| 381 | |
| 382 | pin_project! { |
| 383 | /// Future wrapper to ensure cooperative scheduling. |
| 384 | /// |
| 385 | /// When being polled `poll_proceed` is called before the inner future is polled to check |
| 386 | /// if the inner future has exceeded its budget. If the inner future resolves, this will |
| 387 | /// automatically call `RestoreOnPending::made_progress` before resolving this future with |
| 388 | /// the result of the inner one. If polling the inner future is pending, polling this future |
| 389 | /// type will also return a `Poll::Pending`. |
| 390 | #[must_use = "futures do nothing unless polled" ] |
| 391 | pub(crate) struct Coop<F: Future> { |
| 392 | #[pin] |
| 393 | pub(crate) fut: F, |
| 394 | } |
| 395 | } |
| 396 | |
| 397 | impl<F: Future> Future for Coop<F> { |
| 398 | type Output = F::Output; |
| 399 | |
| 400 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| 401 | let coop = ready!(poll_proceed(cx)); |
| 402 | let me = self.project(); |
| 403 | if let Poll::Ready(ret) = me.fut.poll(cx) { |
| 404 | coop.made_progress(); |
| 405 | Poll::Ready(ret) |
| 406 | } else { |
| 407 | Poll::Pending |
| 408 | } |
| 409 | } |
| 410 | } |
| 411 | |
| 412 | /// Run a future with a budget constraint for cooperative scheduling. |
| 413 | /// If the future exceeds its budget while being polled, control is yielded back to the |
| 414 | /// runtime. |
| 415 | #[inline ] |
| 416 | pub(crate) fn cooperative<F: Future>(fut: F) -> Coop<F> { |
| 417 | Coop { fut } |
| 418 | } |
| 419 | } |
| 420 | |
#[cfg(all(test, not(loom)))]
mod test {
    use super::*;

    #[cfg(all(target_family = "wasm", not(target_os = "wasi")))]
    use wasm_bindgen_test::wasm_bindgen_test as test;

    /// Reads the current budget, treating an inaccessible context as
    /// unconstrained.
    fn get() -> Budget {
        context::budget(|cell| cell.get()).unwrap_or(Budget::unconstrained())
    }

    /// Walks through nested `budget` scopes and checks how `poll_proceed`,
    /// `made_progress`, and dropping the guard affect the remaining budget.
    #[test]
    fn budgeting() {
        use std::future::poll_fn;
        use tokio_test::*;

        let initial = Budget::initial().0.unwrap();
        // Remaining units of a constrained budget.
        let remaining = || get().0.unwrap();
        // One successful `poll_proceed` from inside a mock task.
        let proceed = || assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));

        // Outside of any `budget` scope the budget is unconstrained and stays so.
        assert!(get().0.is_none());

        let guard = proceed();

        assert!(get().0.is_none());
        drop(guard);
        assert!(get().0.is_none());

        budget(|| {
            assert_eq!(get().0, Some(initial));

            let guard = proceed();
            assert_eq!(remaining(), initial - 1);
            drop(guard);
            // we didn't make progress
            assert_eq!(get().0, Some(initial));

            let guard = proceed();
            assert_eq!(remaining(), initial - 1);
            guard.made_progress();
            drop(guard);
            // we _did_ make progress
            assert_eq!(remaining(), initial - 1);

            let guard = proceed();
            assert_eq!(remaining(), initial - 2);
            guard.made_progress();
            drop(guard);
            assert_eq!(remaining(), initial - 2);

            // A nested scope starts from a fresh budget...
            budget(|| {
                assert_eq!(get().0, Some(initial));

                let guard = proceed();
                assert_eq!(remaining(), initial - 1);
                guard.made_progress();
                drop(guard);
                assert_eq!(remaining(), initial - 1);
            });

            // ...and the outer budget is back once it ends.
            assert_eq!(remaining(), initial - 2);
        });

        assert!(get().0.is_none());

        budget(|| {
            let units = remaining();

            // Spend the whole budget.
            for _ in 0..units {
                let guard = proceed();
                guard.made_progress();
            }

            let mut task = task::spawn(poll_fn(|cx| {
                let guard = std::task::ready!(poll_proceed(cx));
                guard.made_progress();
                Poll::Ready(())
            }));

            // With nothing left, the next `poll_proceed` must yield.
            assert_pending!(task.poll());
        });
    }
}
| 500 | |