1 | #![cfg_attr (not(feature = "full" ), allow(dead_code))] |
2 | |
//! Yield points for improved cooperative scheduling.
//!
//! Documentation for this can be found in the [`tokio::task`] module.
//!
//! [`tokio::task`]: crate::task
8 | |
9 | // ```ignore |
10 | // # use tokio_stream::{Stream, StreamExt}; |
11 | // async fn drop_all<I: Stream + Unpin>(mut input: I) { |
12 | // while let Some(_) = input.next().await { |
13 | // tokio::coop::proceed().await; |
14 | // } |
15 | // } |
16 | // ``` |
17 | // |
18 | // The `proceed` future will coordinate with the executor to make sure that |
19 | // every so often control is yielded back to the executor so it can run other |
20 | // tasks. |
21 | // |
22 | // # Placing yield points |
23 | // |
24 | // Voluntary yield points should be placed _after_ at least some work has been |
25 | // done. If they are not, a future sufficiently deep in the task hierarchy may |
26 | // end up _never_ getting to run because of the number of yield points that |
27 | // inevitably appear before it is reached. In general, you will want yield |
28 | // points to only appear in "leaf" futures -- those that do not themselves poll |
29 | // other futures. By doing this, you avoid double-counting each iteration of |
30 | // the outer future against the cooperating budget. |
31 | |
32 | use crate::runtime::context; |
33 | |
/// Opaque counter of how much more "work" a task is allowed to perform before
/// it has to yield back to the scheduler.
///
/// `Some(n)` means `n` units of work remain; `None` means the budget is
/// unconstrained.
#[derive(Clone, Copy, Debug)]
pub(crate) struct Budget(Option<u8>);
38 | |
/// Outcome of a single attempt to decrement a [`Budget`].
///
/// Derives the same `Debug`/`Clone`/`Copy` set as `Budget` so the two small
/// value types can be inspected and passed around in the same way.
#[derive(Debug, Clone, Copy)]
pub(crate) struct BudgetDecrement {
    /// `true` when the decrement was allowed: either the budget is
    /// unconstrained or it still had units left.
    success: bool,
    /// `true` when this particular decrement took a constrained budget down
    /// to zero.
    hit_zero: bool,
}
43 | |
44 | impl Budget { |
45 | /// Budget assigned to a task on each poll. |
46 | /// |
47 | /// The value itself is chosen somewhat arbitrarily. It needs to be high |
48 | /// enough to amortize wakeup and scheduling costs, but low enough that we |
49 | /// do not starve other tasks for too long. The value also needs to be high |
50 | /// enough that particularly deep tasks are able to do at least some useful |
51 | /// work at all. |
52 | /// |
53 | /// Note that as more yield points are added in the ecosystem, this value |
54 | /// will probably also have to be raised. |
55 | const fn initial() -> Budget { |
56 | Budget(Some(128)) |
57 | } |
58 | |
59 | /// Returns an unconstrained budget. Operations will not be limited. |
60 | pub(super) const fn unconstrained() -> Budget { |
61 | Budget(None) |
62 | } |
63 | |
64 | fn has_remaining(self) -> bool { |
65 | self.0.map_or(true, |budget| budget > 0) |
66 | } |
67 | } |
68 | |
69 | /// Runs the given closure with a cooperative task budget. When the function |
70 | /// returns, the budget is reset to the value prior to calling the function. |
71 | #[inline (always)] |
72 | pub(crate) fn budget<R>(f: impl FnOnce() -> R) -> R { |
73 | with_budget(Budget::initial(), f) |
74 | } |
75 | |
76 | /// Runs the given closure with an unconstrained task budget. When the function returns, the budget |
77 | /// is reset to the value prior to calling the function. |
78 | #[inline (always)] |
79 | pub(crate) fn with_unconstrained<R>(f: impl FnOnce() -> R) -> R { |
80 | with_budget(Budget::unconstrained(), f) |
81 | } |
82 | |
83 | #[inline (always)] |
84 | fn with_budget<R>(budget: Budget, f: impl FnOnce() -> R) -> R { |
85 | struct ResetGuard { |
86 | prev: Budget, |
87 | } |
88 | |
89 | impl Drop for ResetGuard { |
90 | fn drop(&mut self) { |
91 | let _ = context::budget(|cell| { |
92 | cell.set(self.prev); |
93 | }); |
94 | } |
95 | } |
96 | |
97 | #[allow (unused_variables)] |
98 | let maybe_guard = context::budget(|cell| { |
99 | let prev = cell.get(); |
100 | cell.set(budget); |
101 | |
102 | ResetGuard { prev } |
103 | }); |
104 | |
105 | // The function is called regardless even if the budget is not successfully |
106 | // set due to the thread-local being destroyed. |
107 | f() |
108 | } |
109 | |
110 | #[inline (always)] |
111 | pub(crate) fn has_budget_remaining() -> bool { |
112 | // If the current budget cannot be accessed due to the thread-local being |
113 | // shutdown, then we assume there is budget remaining. |
114 | context::budget(|cell| cell.get().has_remaining()).unwrap_or(true) |
115 | } |
116 | |
117 | cfg_rt_multi_thread! { |
118 | /// Sets the current task's budget. |
119 | pub(crate) fn set(budget: Budget) { |
120 | let _ = context::budget(|cell| cell.set(budget)); |
121 | } |
122 | } |
123 | |
124 | cfg_rt! { |
125 | /// Forcibly removes the budgeting constraints early. |
126 | /// |
127 | /// Returns the remaining budget |
128 | pub(crate) fn stop() -> Budget { |
129 | context::budget(|cell| { |
130 | let prev = cell.get(); |
131 | cell.set(Budget::unconstrained()); |
132 | prev |
133 | }).unwrap_or(Budget::unconstrained()) |
134 | } |
135 | } |
136 | |
137 | cfg_coop! { |
138 | use std::cell::Cell; |
139 | use std::task::{Context, Poll}; |
140 | |
141 | #[must_use ] |
142 | pub(crate) struct RestoreOnPending(Cell<Budget>); |
143 | |
144 | impl RestoreOnPending { |
145 | pub(crate) fn made_progress(&self) { |
146 | self.0.set(Budget::unconstrained()); |
147 | } |
148 | } |
149 | |
150 | impl Drop for RestoreOnPending { |
151 | fn drop(&mut self) { |
152 | // Don't reset if budget was unconstrained or if we made progress. |
153 | // They are both represented as the remembered budget being unconstrained. |
154 | let budget = self.0.get(); |
155 | if !budget.is_unconstrained() { |
156 | let _ = context::budget(|cell| { |
157 | cell.set(budget); |
158 | }); |
159 | } |
160 | } |
161 | } |
162 | |
163 | /// Returns `Poll::Pending` if the current task has exceeded its budget and should yield. |
164 | /// |
165 | /// When you call this method, the current budget is decremented. However, to ensure that |
166 | /// progress is made every time a task is polled, the budget is automatically restored to its |
167 | /// former value if the returned `RestoreOnPending` is dropped. It is the caller's |
168 | /// responsibility to call `RestoreOnPending::made_progress` if it made progress, to ensure |
169 | /// that the budget empties appropriately. |
170 | /// |
171 | /// Note that `RestoreOnPending` restores the budget **as it was before `poll_proceed`**. |
172 | /// Therefore, if the budget is _further_ adjusted between when `poll_proceed` returns and |
173 | /// `RestRestoreOnPending` is dropped, those adjustments are erased unless the caller indicates |
174 | /// that progress was made. |
175 | #[inline ] |
176 | pub(crate) fn poll_proceed(cx: &mut Context<'_>) -> Poll<RestoreOnPending> { |
177 | context::budget(|cell| { |
178 | let mut budget = cell.get(); |
179 | |
180 | let decrement = budget.decrement(); |
181 | |
182 | if decrement.success { |
183 | let restore = RestoreOnPending(Cell::new(cell.get())); |
184 | cell.set(budget); |
185 | |
186 | // avoid double counting |
187 | if decrement.hit_zero { |
188 | inc_budget_forced_yield_count(); |
189 | } |
190 | |
191 | Poll::Ready(restore) |
192 | } else { |
193 | cx.waker().wake_by_ref(); |
194 | Poll::Pending |
195 | } |
196 | }).unwrap_or(Poll::Ready(RestoreOnPending(Cell::new(Budget::unconstrained())))) |
197 | } |
198 | |
199 | cfg_rt! { |
200 | cfg_metrics! { |
201 | #[inline (always)] |
202 | fn inc_budget_forced_yield_count() { |
203 | let _ = context::with_current(|handle| { |
204 | handle.scheduler_metrics().inc_budget_forced_yield_count(); |
205 | }); |
206 | } |
207 | } |
208 | |
209 | cfg_not_metrics! { |
210 | #[inline (always)] |
211 | fn inc_budget_forced_yield_count() {} |
212 | } |
213 | } |
214 | |
215 | cfg_not_rt! { |
216 | #[inline (always)] |
217 | fn inc_budget_forced_yield_count() {} |
218 | } |
219 | |
220 | impl Budget { |
221 | /// Decrements the budget. Returns `true` if successful. Decrementing fails |
222 | /// when there is not enough remaining budget. |
223 | fn decrement(&mut self) -> BudgetDecrement { |
224 | if let Some(num) = &mut self.0 { |
225 | if *num > 0 { |
226 | *num -= 1; |
227 | |
228 | let hit_zero = *num == 0; |
229 | |
230 | BudgetDecrement { success: true, hit_zero } |
231 | } else { |
232 | BudgetDecrement { success: false, hit_zero: false } |
233 | } |
234 | } else { |
235 | BudgetDecrement { success: true, hit_zero: false } |
236 | } |
237 | } |
238 | |
239 | fn is_unconstrained(self) -> bool { |
240 | self.0.is_none() |
241 | } |
242 | } |
243 | } |
244 | |
#[cfg(all(test, not(loom)))]
mod test {
    use super::*;

    #[cfg(all(target_family = "wasm", not(target_os = "wasi")))]
    use wasm_bindgen_test::wasm_bindgen_test as test;

    /// Reads the budget currently stored in the context, treating an
    /// inaccessible context as unconstrained.
    fn get() -> Budget {
        context::budget(|cell| cell.get()).unwrap_or(Budget::unconstrained())
    }

    /// Calls `poll_proceed` once from inside a mock task and asserts that it
    /// returned `Poll::Ready`, handing back the guard.
    fn proceed_ready() -> RestoreOnPending {
        use tokio_test::*;

        assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)))
    }

    #[test]
    fn budgeting() {
        use futures::future::poll_fn;
        use tokio_test::*;

        // The per-poll budget, both as stored and as a plain count.
        let initial = Budget::initial().0;
        let full = initial.unwrap();

        // Outside of `budget(..)` the budget is unconstrained, and
        // `poll_proceed` leaves it that way.
        assert!(get().0.is_none());

        let guard = proceed_ready();

        assert!(get().0.is_none());
        drop(guard);
        assert!(get().0.is_none());

        budget(|| {
            assert_eq!(get().0, initial);

            let guard = proceed_ready();
            assert_eq!(get().0.unwrap(), full - 1);
            drop(guard);
            // we didn't make progress
            assert_eq!(get().0, initial);

            let guard = proceed_ready();
            assert_eq!(get().0.unwrap(), full - 1);
            guard.made_progress();
            drop(guard);
            // we _did_ make progress
            assert_eq!(get().0.unwrap(), full - 1);

            let guard = proceed_ready();
            assert_eq!(get().0.unwrap(), full - 2);
            guard.made_progress();
            drop(guard);
            assert_eq!(get().0.unwrap(), full - 2);

            // A nested `budget(..)` starts from a fresh allowance ...
            budget(|| {
                assert_eq!(get().0, initial);

                let guard = proceed_ready();
                assert_eq!(get().0.unwrap(), full - 1);
                guard.made_progress();
                drop(guard);
                assert_eq!(get().0.unwrap(), full - 1);
            });

            // ... and the outer value is back once it returns.
            assert_eq!(get().0.unwrap(), full - 2);
        });

        assert!(get().0.is_none());

        budget(|| {
            let available = get().0.unwrap();

            // Spend the whole budget, reporting progress each time.
            for _ in 0..available {
                let guard = proceed_ready();
                guard.made_progress();
            }

            let mut exhausted = task::spawn(poll_fn(|cx| {
                let guard = ready!(poll_proceed(cx));
                guard.made_progress();
                Poll::Ready(())
            }));

            // With nothing left, the next attempt has to yield.
            assert_pending!(exhausted.poll());
        });
    }
}
324 | |