| 1 | #![ cfg_attr(not(feature = "full"), allow(dead_code))] | 
| 2 |  | 
|---|
| 3 | //! Yield points for improved cooperative scheduling. | 
|---|
| 4 | //! | 
|---|
| 5 | //! Documentation for this can be found in the [`tokio::task`] module. | 
|---|
| 6 | //! | 
|---|
| 7 | //! [`tokio::task`]: crate::task. | 
|---|
| 8 |  | 
|---|
| 9 | // ```ignore | 
|---|
| 10 | // # use tokio_stream::{Stream, StreamExt}; | 
|---|
| 11 | // async fn drop_all<I: Stream + Unpin>(mut input: I) { | 
|---|
| 12 | //     while let Some(_) = input.next().await { | 
|---|
| 13 | //         tokio::coop::proceed().await; | 
|---|
| 14 | //     } | 
|---|
| 15 | // } | 
|---|
| 16 | // ``` | 
|---|
| 17 | // | 
|---|
| 18 | // The `proceed` future will coordinate with the executor to make sure that | 
|---|
| 19 | // every so often control is yielded back to the executor so it can run other | 
|---|
| 20 | // tasks. | 
|---|
| 21 | // | 
|---|
| 22 | // # Placing yield points | 
|---|
| 23 | // | 
|---|
| 24 | // Voluntary yield points should be placed _after_ at least some work has been | 
|---|
| 25 | // done. If they are not, a future sufficiently deep in the task hierarchy may | 
|---|
| 26 | // end up _never_ getting to run because of the number of yield points that | 
|---|
| 27 | // inevitably appear before it is reached. In general, you will want yield | 
|---|
| 28 | // points to only appear in "leaf" futures -- those that do not themselves poll | 
|---|
| 29 | // other futures. By doing this, you avoid double-counting each iteration of | 
|---|
| 30 | // the outer future against the cooperating budget. | 
|---|
| 31 |  | 
|---|
| 32 | use crate::runtime::context; | 
|---|
| 33 |  | 
|---|
| 34 | /// Opaque type tracking the amount of "work" a task may still do before | 
|---|
| 35 | /// yielding back to the scheduler. | 
|---|
| 36 | #[ derive(Debug, Copy, Clone)] | 
|---|
| 37 | pub(crate) struct Budget(Option<u8>); | 
|---|
| 38 |  | 
|---|
| 39 | pub(crate) struct BudgetDecrement { | 
|---|
| 40 | success: bool, | 
|---|
| 41 | hit_zero: bool, | 
|---|
| 42 | } | 
|---|
| 43 |  | 
|---|
| 44 | impl Budget { | 
|---|
| 45 | /// Budget assigned to a task on each poll. | 
|---|
| 46 | /// | 
|---|
| 47 | /// The value itself is chosen somewhat arbitrarily. It needs to be high | 
|---|
| 48 | /// enough to amortize wakeup and scheduling costs, but low enough that we | 
|---|
| 49 | /// do not starve other tasks for too long. The value also needs to be high | 
|---|
| 50 | /// enough that particularly deep tasks are able to do at least some useful | 
|---|
| 51 | /// work at all. | 
|---|
| 52 | /// | 
|---|
| 53 | /// Note that as more yield points are added in the ecosystem, this value | 
|---|
| 54 | /// will probably also have to be raised. | 
|---|
| 55 | const fn initial() -> Budget { | 
|---|
| 56 | Budget(Some(128)) | 
|---|
| 57 | } | 
|---|
| 58 |  | 
|---|
| 59 | /// Returns an unconstrained budget. Operations will not be limited. | 
|---|
| 60 | pub(super) const fn unconstrained() -> Budget { | 
|---|
| 61 | Budget(None) | 
|---|
| 62 | } | 
|---|
| 63 |  | 
|---|
| 64 | fn has_remaining(self) -> bool { | 
|---|
| 65 | self.0.map_or(default:true, |budget: u8| budget > 0) | 
|---|
| 66 | } | 
|---|
| 67 | } | 
|---|
| 68 |  | 
|---|
| 69 | /// Runs the given closure with a cooperative task budget. When the function | 
|---|
| 70 | /// returns, the budget is reset to the value prior to calling the function. | 
|---|
| 71 | #[ inline(always)] | 
|---|
| 72 | pub(crate) fn budget<R>(f: impl FnOnce() -> R) -> R { | 
|---|
| 73 | with_budget(Budget::initial(), f) | 
|---|
| 74 | } | 
|---|
| 75 |  | 
|---|
| 76 | /// Runs the given closure with an unconstrained task budget. When the function returns, the budget | 
|---|
| 77 | /// is reset to the value prior to calling the function. | 
|---|
| 78 | #[ inline(always)] | 
|---|
| 79 | pub(crate) fn with_unconstrained<R>(f: impl FnOnce() -> R) -> R { | 
|---|
| 80 | with_budget(Budget::unconstrained(), f) | 
|---|
| 81 | } | 
|---|
| 82 |  | 
|---|
| 83 | #[ inline(always)] | 
|---|
| 84 | fn with_budget<R>(budget: Budget, f: impl FnOnce() -> R) -> R { | 
|---|
| 85 | struct ResetGuard { | 
|---|
| 86 | prev: Budget, | 
|---|
| 87 | } | 
|---|
| 88 |  | 
|---|
| 89 | impl Drop for ResetGuard { | 
|---|
| 90 | fn drop(&mut self) { | 
|---|
| 91 | let _ = context::budget(|cell| { | 
|---|
| 92 | cell.set(self.prev); | 
|---|
| 93 | }); | 
|---|
| 94 | } | 
|---|
| 95 | } | 
|---|
| 96 |  | 
|---|
| 97 | #[ allow(unused_variables)] | 
|---|
| 98 | let maybe_guard = context::budget(|cell| { | 
|---|
| 99 | let prev = cell.get(); | 
|---|
| 100 | cell.set(budget); | 
|---|
| 101 |  | 
|---|
| 102 | ResetGuard { prev } | 
|---|
| 103 | }); | 
|---|
| 104 |  | 
|---|
| 105 | // The function is called regardless even if the budget is not successfully | 
|---|
| 106 | // set due to the thread-local being destroyed. | 
|---|
| 107 | f() | 
|---|
| 108 | } | 
|---|
| 109 |  | 
|---|
| 110 | #[ inline(always)] | 
|---|
| 111 | pub(crate) fn has_budget_remaining() -> bool { | 
|---|
| 112 | // If the current budget cannot be accessed due to the thread-local being | 
|---|
| 113 | // shutdown, then we assume there is budget remaining. | 
|---|
| 114 | context::budget(|cell| cell.get().has_remaining()).unwrap_or(default:true) | 
|---|
| 115 | } | 
|---|
| 116 |  | 
|---|
| 117 | cfg_rt_multi_thread! { | 
|---|
| 118 | /// Sets the current task's budget. | 
|---|
| 119 | pub(crate) fn set(budget: Budget) { | 
|---|
| 120 | let _ = context::budget(|cell| cell.set(budget)); | 
|---|
| 121 | } | 
|---|
| 122 | } | 
|---|
| 123 |  | 
|---|
| 124 | cfg_rt! { | 
|---|
| 125 | /// Forcibly removes the budgeting constraints early. | 
|---|
| 126 | /// | 
|---|
| 127 | /// Returns the remaining budget | 
|---|
| 128 | pub(crate) fn stop() -> Budget { | 
|---|
| 129 | context::budget(|cell| { | 
|---|
| 130 | let prev = cell.get(); | 
|---|
| 131 | cell.set(Budget::unconstrained()); | 
|---|
| 132 | prev | 
|---|
| 133 | }).unwrap_or(Budget::unconstrained()) | 
|---|
| 134 | } | 
|---|
| 135 | } | 
|---|
| 136 |  | 
|---|
| 137 | cfg_coop! { | 
|---|
| 138 | use pin_project_lite::pin_project; | 
|---|
| 139 | use std::cell::Cell; | 
|---|
| 140 | use std::future::Future; | 
|---|
| 141 | use std::pin::Pin; | 
|---|
| 142 | use std::task::{ready, Context, Poll}; | 
|---|
| 143 |  | 
|---|
| 144 | #[ must_use] | 
|---|
| 145 | pub(crate) struct RestoreOnPending(Cell<Budget>); | 
|---|
| 146 |  | 
|---|
| 147 | impl RestoreOnPending { | 
|---|
| 148 | pub(crate) fn made_progress(&self) { | 
|---|
| 149 | self.0.set(Budget::unconstrained()); | 
|---|
| 150 | } | 
|---|
| 151 | } | 
|---|
| 152 |  | 
|---|
| 153 | impl Drop for RestoreOnPending { | 
|---|
| 154 | fn drop(&mut self) { | 
|---|
| 155 | // Don't reset if budget was unconstrained or if we made progress. | 
|---|
| 156 | // They are both represented as the remembered budget being unconstrained. | 
|---|
| 157 | let budget = self.0.get(); | 
|---|
| 158 | if !budget.is_unconstrained() { | 
|---|
| 159 | let _ = context::budget(|cell| { | 
|---|
| 160 | cell.set(budget); | 
|---|
| 161 | }); | 
|---|
| 162 | } | 
|---|
| 163 | } | 
|---|
| 164 | } | 
|---|
| 165 |  | 
|---|
| 166 | /// Returns `Poll::Pending` if the current task has exceeded its budget and should yield. | 
|---|
| 167 | /// | 
|---|
| 168 | /// When you call this method, the current budget is decremented. However, to ensure that | 
|---|
| 169 | /// progress is made every time a task is polled, the budget is automatically restored to its | 
|---|
| 170 | /// former value if the returned `RestoreOnPending` is dropped. It is the caller's | 
|---|
| 171 | /// responsibility to call `RestoreOnPending::made_progress` if it made progress, to ensure | 
|---|
| 172 | /// that the budget empties appropriately. | 
|---|
| 173 | /// | 
|---|
| 174 | /// Note that `RestoreOnPending` restores the budget **as it was before `poll_proceed`**. | 
|---|
| 175 | /// Therefore, if the budget is _further_ adjusted between when `poll_proceed` returns and | 
|---|
| 176 | /// `RestRestoreOnPending` is dropped, those adjustments are erased unless the caller indicates | 
|---|
| 177 | /// that progress was made. | 
|---|
| 178 | #[ inline] | 
|---|
| 179 | pub(crate) fn poll_proceed(cx: &mut Context<'_>) -> Poll<RestoreOnPending> { | 
|---|
| 180 | context::budget(|cell| { | 
|---|
| 181 | let mut budget = cell.get(); | 
|---|
| 182 |  | 
|---|
| 183 | let decrement = budget.decrement(); | 
|---|
| 184 |  | 
|---|
| 185 | if decrement.success { | 
|---|
| 186 | let restore = RestoreOnPending(Cell::new(cell.get())); | 
|---|
| 187 | cell.set(budget); | 
|---|
| 188 |  | 
|---|
| 189 | // avoid double counting | 
|---|
| 190 | if decrement.hit_zero { | 
|---|
| 191 | inc_budget_forced_yield_count(); | 
|---|
| 192 | } | 
|---|
| 193 |  | 
|---|
| 194 | Poll::Ready(restore) | 
|---|
| 195 | } else { | 
|---|
| 196 | cx.waker().wake_by_ref(); | 
|---|
| 197 | Poll::Pending | 
|---|
| 198 | } | 
|---|
| 199 | }).unwrap_or(Poll::Ready(RestoreOnPending(Cell::new(Budget::unconstrained())))) | 
|---|
| 200 | } | 
|---|
| 201 |  | 
|---|
| 202 | cfg_rt! { | 
|---|
| 203 | cfg_unstable_metrics! { | 
|---|
| 204 | #[inline(always)] | 
|---|
| 205 | fn inc_budget_forced_yield_count() { | 
|---|
| 206 | let _ = context::with_current(|handle| { | 
|---|
| 207 | handle.scheduler_metrics().inc_budget_forced_yield_count(); | 
|---|
| 208 | }); | 
|---|
| 209 | } | 
|---|
| 210 | } | 
|---|
| 211 |  | 
|---|
| 212 | cfg_not_unstable_metrics! { | 
|---|
| 213 | #[inline(always)] | 
|---|
| 214 | fn inc_budget_forced_yield_count() {} | 
|---|
| 215 | } | 
|---|
| 216 | } | 
|---|
| 217 |  | 
|---|
| 218 | cfg_not_rt! { | 
|---|
| 219 | #[inline(always)] | 
|---|
| 220 | fn inc_budget_forced_yield_count() {} | 
|---|
| 221 | } | 
|---|
| 222 |  | 
|---|
| 223 | impl Budget { | 
|---|
| 224 | /// Decrements the budget. Returns `true` if successful. Decrementing fails | 
|---|
| 225 | /// when there is not enough remaining budget. | 
|---|
| 226 | fn decrement(&mut self) -> BudgetDecrement { | 
|---|
| 227 | if let Some(num) = &mut self.0 { | 
|---|
| 228 | if *num > 0 { | 
|---|
| 229 | *num -= 1; | 
|---|
| 230 |  | 
|---|
| 231 | let hit_zero = *num == 0; | 
|---|
| 232 |  | 
|---|
| 233 | BudgetDecrement { success: true, hit_zero } | 
|---|
| 234 | } else { | 
|---|
| 235 | BudgetDecrement { success: false, hit_zero: false } | 
|---|
| 236 | } | 
|---|
| 237 | } else { | 
|---|
| 238 | BudgetDecrement { success: true, hit_zero: false } | 
|---|
| 239 | } | 
|---|
| 240 | } | 
|---|
| 241 |  | 
|---|
| 242 | fn is_unconstrained(self) -> bool { | 
|---|
| 243 | self.0.is_none() | 
|---|
| 244 | } | 
|---|
| 245 | } | 
|---|
| 246 |  | 
|---|
| 247 | pin_project! { | 
|---|
| 248 | /// Future wrapper to ensure cooperative scheduling. | 
|---|
| 249 | /// | 
|---|
| 250 | /// When being polled `poll_proceed` is called before the inner future is polled to check | 
|---|
| 251 | /// if the inner future has exceeded its budget. If the inner future resolves, this will | 
|---|
| 252 | /// automatically call `RestoreOnPending::made_progress` before resolving this future with | 
|---|
| 253 | /// the result of the inner one. If polling the inner future is pending, polling this future | 
|---|
| 254 | /// type will also return a `Poll::Pending`. | 
|---|
| 255 | #[must_use = "futures do nothing unless polled"] | 
|---|
| 256 | pub(crate) struct Coop<F: Future> { | 
|---|
| 257 | #[pin] | 
|---|
| 258 | pub(crate) fut: F, | 
|---|
| 259 | } | 
|---|
| 260 | } | 
|---|
| 261 |  | 
|---|
| 262 | impl<F: Future> Future for Coop<F> { | 
|---|
| 263 | type Output = F::Output; | 
|---|
| 264 |  | 
|---|
| 265 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | 
|---|
| 266 | let coop = ready!(poll_proceed(cx)); | 
|---|
| 267 | let me = self.project(); | 
|---|
| 268 | if let Poll::Ready(ret) = me.fut.poll(cx) { | 
|---|
| 269 | coop.made_progress(); | 
|---|
| 270 | Poll::Ready(ret) | 
|---|
| 271 | } else { | 
|---|
| 272 | Poll::Pending | 
|---|
| 273 | } | 
|---|
| 274 | } | 
|---|
| 275 | } | 
|---|
| 276 |  | 
|---|
| 277 | /// Run a future with a budget constraint for cooperative scheduling. | 
|---|
| 278 | /// If the future exceeds its budget while being polled, control is yielded back to the | 
|---|
| 279 | /// runtime. | 
|---|
| 280 | #[ inline] | 
|---|
| 281 | pub(crate) fn cooperative<F: Future>(fut: F) -> Coop<F> { | 
|---|
| 282 | Coop { fut } | 
|---|
| 283 | } | 
|---|
| 284 | } | 
|---|
| 285 |  | 
|---|
| 286 | #[ cfg(all(test, not(loom)))] | 
|---|
| 287 | mod test { | 
|---|
| 288 | use super::*; | 
|---|
| 289 |  | 
|---|
| 290 | #[ cfg(all(target_family = "wasm", not(target_os = "wasi")))] | 
|---|
| 291 | use wasm_bindgen_test::wasm_bindgen_test as test; | 
|---|
| 292 |  | 
|---|
| 293 | fn get() -> Budget { | 
|---|
| 294 | context::budget(|cell| cell.get()).unwrap_or(Budget::unconstrained()) | 
|---|
| 295 | } | 
|---|
| 296 |  | 
|---|
| 297 | #[ test] | 
|---|
| 298 | fn budgeting() { | 
|---|
| 299 | use std::future::poll_fn; | 
|---|
| 300 | use tokio_test::*; | 
|---|
| 301 |  | 
|---|
| 302 | assert!(get().0.is_none()); | 
|---|
| 303 |  | 
|---|
| 304 | let coop = assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx))); | 
|---|
| 305 |  | 
|---|
| 306 | assert!(get().0.is_none()); | 
|---|
| 307 | drop(coop); | 
|---|
| 308 | assert!(get().0.is_none()); | 
|---|
| 309 |  | 
|---|
| 310 | budget(|| { | 
|---|
| 311 | assert_eq!(get().0, Budget::initial().0); | 
|---|
| 312 |  | 
|---|
| 313 | let coop = assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx))); | 
|---|
| 314 | assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 1); | 
|---|
| 315 | drop(coop); | 
|---|
| 316 | // we didn't make progress | 
|---|
| 317 | assert_eq!(get().0, Budget::initial().0); | 
|---|
| 318 |  | 
|---|
| 319 | let coop = assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx))); | 
|---|
| 320 | assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 1); | 
|---|
| 321 | coop.made_progress(); | 
|---|
| 322 | drop(coop); | 
|---|
| 323 | // we _did_ make progress | 
|---|
| 324 | assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 1); | 
|---|
| 325 |  | 
|---|
| 326 | let coop = assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx))); | 
|---|
| 327 | assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 2); | 
|---|
| 328 | coop.made_progress(); | 
|---|
| 329 | drop(coop); | 
|---|
| 330 | assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 2); | 
|---|
| 331 |  | 
|---|
| 332 | budget(|| { | 
|---|
| 333 | assert_eq!(get().0, Budget::initial().0); | 
|---|
| 334 |  | 
|---|
| 335 | let coop = assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx))); | 
|---|
| 336 | assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 1); | 
|---|
| 337 | coop.made_progress(); | 
|---|
| 338 | drop(coop); | 
|---|
| 339 | assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 1); | 
|---|
| 340 | }); | 
|---|
| 341 |  | 
|---|
| 342 | assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 2); | 
|---|
| 343 | }); | 
|---|
| 344 |  | 
|---|
| 345 | assert!(get().0.is_none()); | 
|---|
| 346 |  | 
|---|
| 347 | budget(|| { | 
|---|
| 348 | let n = get().0.unwrap(); | 
|---|
| 349 |  | 
|---|
| 350 | for _ in 0..n { | 
|---|
| 351 | let coop = assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx))); | 
|---|
| 352 | coop.made_progress(); | 
|---|
| 353 | } | 
|---|
| 354 |  | 
|---|
| 355 | let mut task = task::spawn(poll_fn(|cx| { | 
|---|
| 356 | let coop = std::task::ready!(poll_proceed(cx)); | 
|---|
| 357 | coop.made_progress(); | 
|---|
| 358 | Poll::Ready(()) | 
|---|
| 359 | })); | 
|---|
| 360 |  | 
|---|
| 361 | assert_pending!(task.poll()); | 
|---|
| 362 | }); | 
|---|
| 363 | } | 
|---|
| 364 | } | 
|---|
| 365 |  | 
|---|