1 | #![warn (rust_2018_idioms)] |
2 | #![cfg (feature = "full" )] |
3 | |
4 | use futures::{ |
5 | future::{pending, ready}, |
6 | FutureExt, |
7 | }; |
8 | |
9 | use tokio::runtime; |
10 | use tokio::sync::{mpsc, oneshot}; |
11 | use tokio::task::{self, LocalSet}; |
12 | use tokio::time; |
13 | |
14 | #[cfg (not(target_os = "wasi" ))] |
15 | use std::cell::Cell; |
16 | use std::sync::atomic::AtomicBool; |
17 | #[cfg (not(target_os = "wasi" ))] |
18 | use std::sync::atomic::AtomicUsize; |
19 | use std::sync::atomic::Ordering; |
20 | #[cfg (not(target_os = "wasi" ))] |
21 | use std::sync::atomic::Ordering::SeqCst; |
22 | use std::time::Duration; |
23 | |
24 | #[tokio::test (flavor = "current_thread" )] |
25 | async fn local_current_thread_scheduler() { |
26 | LocalSet::new() |
27 | .run_until(async { |
28 | task::spawn_local(async {}).await.unwrap(); |
29 | }) |
30 | .await; |
31 | } |
32 | |
33 | #[cfg (not(target_os = "wasi" ))] // Wasi doesn't support threads |
34 | #[tokio::test (flavor = "multi_thread" )] |
35 | async fn local_threadpool() { |
36 | thread_local! { |
37 | static ON_RT_THREAD: Cell<bool> = Cell::new(false); |
38 | } |
39 | |
40 | ON_RT_THREAD.with(|cell| cell.set(true)); |
41 | |
42 | LocalSet::new() |
43 | .run_until(async { |
44 | assert!(ON_RT_THREAD.with(|cell| cell.get())); |
45 | task::spawn_local(async { |
46 | assert!(ON_RT_THREAD.with(|cell| cell.get())); |
47 | }) |
48 | .await |
49 | .unwrap(); |
50 | }) |
51 | .await; |
52 | } |
53 | |
54 | #[cfg (not(target_os = "wasi" ))] // Wasi doesn't support threads |
55 | #[tokio::test (flavor = "multi_thread" )] |
56 | async fn localset_future_threadpool() { |
57 | thread_local! { |
58 | static ON_LOCAL_THREAD: Cell<bool> = Cell::new(false); |
59 | } |
60 | |
61 | ON_LOCAL_THREAD.with(|cell| cell.set(true)); |
62 | |
63 | let local = LocalSet::new(); |
64 | local.spawn_local(async move { |
65 | assert!(ON_LOCAL_THREAD.with(|cell| cell.get())); |
66 | }); |
67 | local.await; |
68 | } |
69 | |
70 | #[cfg (not(target_os = "wasi" ))] // Wasi doesn't support threads |
71 | #[tokio::test (flavor = "multi_thread" )] |
72 | async fn localset_future_timers() { |
73 | static RAN1: AtomicBool = AtomicBool::new(false); |
74 | static RAN2: AtomicBool = AtomicBool::new(false); |
75 | |
76 | let local = LocalSet::new(); |
77 | local.spawn_local(async move { |
78 | time::sleep(Duration::from_millis(5)).await; |
79 | RAN1.store(true, Ordering::SeqCst); |
80 | }); |
81 | local.spawn_local(async move { |
82 | time::sleep(Duration::from_millis(10)).await; |
83 | RAN2.store(true, Ordering::SeqCst); |
84 | }); |
85 | local.await; |
86 | assert!(RAN1.load(Ordering::SeqCst)); |
87 | assert!(RAN2.load(Ordering::SeqCst)); |
88 | } |
89 | |
90 | #[tokio::test ] |
91 | async fn localset_future_drives_all_local_futs() { |
92 | static RAN1: AtomicBool = AtomicBool::new(false); |
93 | static RAN2: AtomicBool = AtomicBool::new(false); |
94 | static RAN3: AtomicBool = AtomicBool::new(false); |
95 | |
96 | let local = LocalSet::new(); |
97 | local.spawn_local(async move { |
98 | task::spawn_local(async { |
99 | task::yield_now().await; |
100 | RAN3.store(true, Ordering::SeqCst); |
101 | }); |
102 | task::yield_now().await; |
103 | RAN1.store(true, Ordering::SeqCst); |
104 | }); |
105 | local.spawn_local(async move { |
106 | task::yield_now().await; |
107 | RAN2.store(true, Ordering::SeqCst); |
108 | }); |
109 | local.await; |
110 | assert!(RAN1.load(Ordering::SeqCst)); |
111 | assert!(RAN2.load(Ordering::SeqCst)); |
112 | assert!(RAN3.load(Ordering::SeqCst)); |
113 | } |
114 | |
115 | #[cfg (not(target_os = "wasi" ))] // Wasi doesn't support threads |
116 | #[tokio::test (flavor = "multi_thread" )] |
117 | async fn local_threadpool_timer() { |
118 | // This test ensures that runtime services like the timer are properly |
119 | // set for the local task set. |
120 | thread_local! { |
121 | static ON_RT_THREAD: Cell<bool> = Cell::new(false); |
122 | } |
123 | |
124 | ON_RT_THREAD.with(|cell| cell.set(true)); |
125 | |
126 | LocalSet::new() |
127 | .run_until(async { |
128 | assert!(ON_RT_THREAD.with(|cell| cell.get())); |
129 | let join = task::spawn_local(async move { |
130 | assert!(ON_RT_THREAD.with(|cell| cell.get())); |
131 | time::sleep(Duration::from_millis(10)).await; |
132 | assert!(ON_RT_THREAD.with(|cell| cell.get())); |
133 | }); |
134 | join.await.unwrap(); |
135 | }) |
136 | .await; |
137 | } |
/// `spawn_local` may be called while only a `LocalSet::enter` guard is held;
/// the task is then completed once the set is run with `block_on`.
#[test]
fn enter_guard_spawn() {
    let set = LocalSet::new();
    let _guard = set.enter();

    // Spawned before any runtime has been built, with only the guard held.
    let handle = task::spawn_local(async { true });

    let rt = runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();

    // Run the local task set.
    set.block_on(&rt, async move {
        let finished = handle.await.unwrap();
        assert!(finished);
    });
}
153 | |
#[cfg(not(target_os = "wasi"))] // Wasi doesn't support panic recovery
#[test]
// This will panic, since the thread that calls `block_on` cannot use
// in-place blocking inside of `block_on`.
#[should_panic]
fn local_threadpool_blocking_in_place() {
    thread_local! {
        static ON_RT_THREAD: Cell<bool> = Cell::new(false);
    }

    ON_RT_THREAD.with(|flag| flag.set(true));

    let rt = runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();

    let local = LocalSet::new();
    local.block_on(&rt, async {
        assert!(ON_RT_THREAD.with(Cell::get));
        task::spawn_local(async move {
            assert!(ON_RT_THREAD.with(Cell::get));
            task::block_in_place(|| {});
            assert!(ON_RT_THREAD.with(Cell::get));
        })
        .await
        .unwrap();
    });
}
180 | |
181 | #[cfg (not(target_os = "wasi" ))] // Wasi doesn't support threads |
182 | #[tokio::test (flavor = "multi_thread" )] |
183 | async fn local_threadpool_blocking_run() { |
184 | thread_local! { |
185 | static ON_RT_THREAD: Cell<bool> = Cell::new(false); |
186 | } |
187 | |
188 | ON_RT_THREAD.with(|cell| cell.set(true)); |
189 | |
190 | LocalSet::new() |
191 | .run_until(async { |
192 | assert!(ON_RT_THREAD.with(|cell| cell.get())); |
193 | let join = task::spawn_local(async move { |
194 | assert!(ON_RT_THREAD.with(|cell| cell.get())); |
195 | task::spawn_blocking(|| { |
196 | assert!( |
197 | !ON_RT_THREAD.with(|cell| cell.get()), |
198 | "blocking must not run on the local task set's thread" |
199 | ); |
200 | }) |
201 | .await |
202 | .unwrap(); |
203 | assert!(ON_RT_THREAD.with(|cell| cell.get())); |
204 | }); |
205 | join.await.unwrap(); |
206 | }) |
207 | .await; |
208 | } |
209 | |
210 | #[cfg (not(target_os = "wasi" ))] // Wasi doesn't support threads |
211 | #[tokio::test (flavor = "multi_thread" )] |
212 | async fn all_spawns_are_local() { |
213 | use futures::future; |
214 | thread_local! { |
215 | static ON_RT_THREAD: Cell<bool> = Cell::new(false); |
216 | } |
217 | |
218 | ON_RT_THREAD.with(|cell| cell.set(true)); |
219 | |
220 | LocalSet::new() |
221 | .run_until(async { |
222 | assert!(ON_RT_THREAD.with(|cell| cell.get())); |
223 | let handles = (0..128) |
224 | .map(|_| { |
225 | task::spawn_local(async { |
226 | assert!(ON_RT_THREAD.with(|cell| cell.get())); |
227 | }) |
228 | }) |
229 | .collect::<Vec<_>>(); |
230 | for joined in future::join_all(handles).await { |
231 | joined.unwrap(); |
232 | } |
233 | }) |
234 | .await; |
235 | } |
236 | |
237 | #[cfg (not(target_os = "wasi" ))] // Wasi doesn't support threads |
238 | #[tokio::test (flavor = "multi_thread" )] |
239 | async fn nested_spawn_is_local() { |
240 | thread_local! { |
241 | static ON_RT_THREAD: Cell<bool> = Cell::new(false); |
242 | } |
243 | |
244 | ON_RT_THREAD.with(|cell| cell.set(true)); |
245 | |
246 | LocalSet::new() |
247 | .run_until(async { |
248 | assert!(ON_RT_THREAD.with(|cell| cell.get())); |
249 | task::spawn_local(async { |
250 | assert!(ON_RT_THREAD.with(|cell| cell.get())); |
251 | task::spawn_local(async { |
252 | assert!(ON_RT_THREAD.with(|cell| cell.get())); |
253 | task::spawn_local(async { |
254 | assert!(ON_RT_THREAD.with(|cell| cell.get())); |
255 | task::spawn_local(async { |
256 | assert!(ON_RT_THREAD.with(|cell| cell.get())); |
257 | }) |
258 | .await |
259 | .unwrap(); |
260 | }) |
261 | .await |
262 | .unwrap(); |
263 | }) |
264 | .await |
265 | .unwrap(); |
266 | }) |
267 | .await |
268 | .unwrap(); |
269 | }) |
270 | .await; |
271 | } |
272 | |
/// A local task's `JoinHandle` may be awaited from a task created with
/// `task::spawn`, while the local task itself still runs on the local thread.
#[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
#[test]
fn join_local_future_elsewhere() {
    thread_local! {
        static ON_RT_THREAD: Cell<bool> = Cell::new(false);
    }

    ON_RT_THREAD.with(|flag| flag.set(true));

    let rt = runtime::Runtime::new().unwrap();
    let local = LocalSet::new();
    local.block_on(&rt, async move {
        // The local task waits on this channel so that it is still alive
        // when the other task starts awaiting its handle.
        let (release_tx, release_rx) = oneshot::channel();

        let local_task = task::spawn_local(async move {
            assert!(
                ON_RT_THREAD.with(Cell::get),
                "local task must run on local thread, no matter where it is awaited"
            );
            release_rx.await.unwrap();

            "hello world"
        });

        let remote_task = task::spawn(async move {
            assert!(
                !ON_RT_THREAD.with(Cell::get),
                "spawned task should be on a worker"
            );

            release_tx.send(()).expect("task shouldn't have ended yet");

            local_task.await.expect("task should complete successfully");
        });

        remote_task.await.unwrap()
    });
}
308 | |
309 | // Tests for <https://github.com/tokio-rs/tokio/issues/4973> |
310 | #[cfg (not(target_os = "wasi" ))] // Wasi doesn't support threads |
311 | #[tokio::test (flavor = "multi_thread" )] |
312 | async fn localset_in_thread_local() { |
313 | thread_local! { |
314 | static LOCAL_SET: LocalSet = LocalSet::new(); |
315 | } |
316 | |
317 | // holds runtime thread until end of main fn. |
318 | let (_tx, rx) = oneshot::channel::<()>(); |
319 | let handle = tokio::runtime::Handle::current(); |
320 | |
321 | std::thread::spawn(move || { |
322 | LOCAL_SET.with(|local_set| { |
323 | handle.block_on(local_set.run_until(async move { |
324 | let _ = rx.await; |
325 | })) |
326 | }); |
327 | }); |
328 | } |
329 | |
/// Dropping a `LocalSet` must drop the tasks it still holds, releasing the
/// values those tasks captured.
#[test]
fn drop_cancels_tasks() {
    use std::rc::Rc;

    // This test reproduces issue #1842
    let rt = rt();
    let outer = Rc::new(());
    let captured = Rc::clone(&outer);

    let (started_tx, started_rx) = oneshot::channel();

    let local = LocalSet::new();
    local.spawn_local(async move {
        // Take ownership of the clone inside the task.
        let _held = captured;

        started_tx.send(()).unwrap();
        // Never completes, so the task is still pending when the set is dropped.
        futures::future::pending::<()>().await;
    });

    // Run the set only until the task has reported that it started.
    local.block_on(&rt, async {
        started_rx.await.unwrap();
    });
    drop(local);
    drop(rt);

    // Only `outer` may remain once the task has been dropped.
    assert_eq!(1, Rc::strong_count(&outer));
}
358 | |
/// Runs a test function in a separate thread, and panics if the test does not
/// complete within the specified timeout, or if the test function panics.
///
/// This is intended for running tests whose failure mode is a hang or infinite
/// loop that cannot be detected otherwise.
///
/// # Panics
///
/// Panics if `f` has not finished within `timeout`, or if the thread running
/// `f` panicked.
fn with_timeout(timeout: Duration, f: impl FnOnce() + Send + 'static) {
    use std::sync::mpsc::RecvTimeoutError;

    let (done_tx, done_rx) = std::sync::mpsc::channel();
    let thread = std::thread::spawn(move || {
        f();

        // Send a message on the channel so that the test thread can
        // determine if we have entered an infinite loop:
        done_tx.send(()).unwrap();
    });

    // Since the failure mode of this test is an infinite loop, rather than
    // something we can easily make assertions about, we'll run it in a
    // thread. When the test thread finishes, it will send a message on a
    // channel to this thread. We'll wait for that message for at most
    // `timeout`, and if we don't receive it, we assume the test thread has
    // hung.
    //
    // Callers should pass a generous timeout so that a slow CI machine does
    // not cause spurious failures.
    match done_rx.recv_timeout(timeout) {
        // `Duration`'s `Debug` output already carries its unit (e.g. `60s`),
        // so the message must not append another one.
        Err(RecvTimeoutError::Timeout) => panic!(
            "test did not complete within {:?}, \
             we have (probably) entered an infinite loop!",
            timeout,
        ),
        // Did the test thread panic? We'll find out for sure when we `join`
        // with it.
        Err(RecvTimeoutError::Disconnected) => {}
        // Test completed successfully!
        Ok(()) => {}
    }

    thread.join().expect("test thread should not panic!")
}
400 | |
#[cfg_attr(
    target_os = "wasi",
    ignore = "`unwrap()` in `with_timeout()` panics on Wasi"
)]
#[test]
fn drop_cancels_remote_tasks() {
    // This test reproduces issue #1885.
    with_timeout(Duration::from_secs(60), || {
        let (tx, mut rx) = mpsc::channel::<()>(1024);

        let rt = rt();

        let set = LocalSet::new();
        // The task keeps receiving until the channel is closed.
        set.spawn_local(async move { while let Some(()) = rx.recv().await {} });

        // Give the set a short window in which to run the task.
        set.block_on(&rt, async {
            time::sleep(Duration::from_millis(1)).await;
        });

        drop(tx);

        // This enters an infinite loop if the remote notified tasks are not
        // properly cancelled.
        drop(set);
    });
}
426 | |
#[cfg_attr(
    target_os = "wasi",
    ignore = "FIXME: `task::spawn_local().await.unwrap()` panics on Wasi"
)]
#[test]
fn local_tasks_wake_join_all() {
    // This test reproduces issue #2460.
    with_timeout(Duration::from_secs(60), || {
        use futures::future::join_all;
        use tokio::task::LocalSet;

        let rt = rt();
        let set = LocalSet::new();

        // Each of the 128 tasks spawns one more local task and waits for it.
        let handles: Vec<_> = (0..128)
            .map(|_| {
                set.spawn_local(async move {
                    tokio::task::spawn_local(async move {}).await.unwrap();
                })
            })
            .collect();

        let all_done = join_all(handles);
        rt.block_on(set.run_until(all_done));
    });
}
451 | |
#[cfg(not(target_os = "wasi"))] // Wasi doesn't support panic recovery
#[test]
fn local_tasks_are_polled_after_tick() {
    // This test depends on timing, so it gets up to five attempts: four whose
    // panics are caught, and a final one whose panic fails the test.
    const GUARDED_ATTEMPTS: usize = 4;

    // `any` stops at the first attempt that finishes without panicking.
    let passed = (0..GUARDED_ATTEMPTS)
        .any(|_| std::panic::catch_unwind(local_tasks_are_polled_after_tick_inner).is_ok());

    if !passed {
        // Every guarded attempt failed. Try one more time without catching
        // panics. If it fails again, the test fails.
        local_tasks_are_polled_after_tick_inner();
    }
}
468 | |
469 | #[cfg (not(target_os = "wasi" ))] // Wasi doesn't support panic recovery |
470 | #[tokio::main(flavor = "current_thread" )] |
471 | async fn local_tasks_are_polled_after_tick_inner() { |
472 | // Reproduces issues #1899 and #1900 |
473 | |
474 | static RX1: AtomicUsize = AtomicUsize::new(0); |
475 | static RX2: AtomicUsize = AtomicUsize::new(0); |
476 | const EXPECTED: usize = 500; |
477 | |
478 | RX1.store(0, SeqCst); |
479 | RX2.store(0, SeqCst); |
480 | |
481 | let (tx, mut rx) = mpsc::unbounded_channel(); |
482 | |
483 | let local = LocalSet::new(); |
484 | |
485 | local |
486 | .run_until(async { |
487 | let task2 = task::spawn(async move { |
488 | // Wait a bit |
489 | time::sleep(Duration::from_millis(10)).await; |
490 | |
491 | let mut oneshots = Vec::with_capacity(EXPECTED); |
492 | |
493 | // Send values |
494 | for _ in 0..EXPECTED { |
495 | let (oneshot_tx, oneshot_rx) = oneshot::channel(); |
496 | oneshots.push(oneshot_tx); |
497 | tx.send(oneshot_rx).unwrap(); |
498 | } |
499 | |
500 | time::sleep(Duration::from_millis(10)).await; |
501 | |
502 | for tx in oneshots.drain(..) { |
503 | tx.send(()).unwrap(); |
504 | } |
505 | |
506 | loop { |
507 | time::sleep(Duration::from_millis(20)).await; |
508 | let rx1 = RX1.load(SeqCst); |
509 | let rx2 = RX2.load(SeqCst); |
510 | |
511 | if rx1 == EXPECTED && rx2 == EXPECTED { |
512 | break; |
513 | } |
514 | } |
515 | }); |
516 | |
517 | while let Some(oneshot) = rx.recv().await { |
518 | RX1.fetch_add(1, SeqCst); |
519 | |
520 | task::spawn_local(async move { |
521 | oneshot.await.unwrap(); |
522 | RX2.fetch_add(1, SeqCst); |
523 | }); |
524 | } |
525 | |
526 | task2.await.unwrap(); |
527 | }) |
528 | .await; |
529 | } |
530 | |
531 | #[tokio::test ] |
532 | async fn acquire_mutex_in_drop() { |
533 | use futures::future::pending; |
534 | |
535 | let (tx1, rx1) = oneshot::channel(); |
536 | let (tx2, rx2) = oneshot::channel(); |
537 | let local = LocalSet::new(); |
538 | |
539 | local.spawn_local(async move { |
540 | let _ = rx2.await; |
541 | unreachable!(); |
542 | }); |
543 | |
544 | local.spawn_local(async move { |
545 | let _ = rx1.await; |
546 | tx2.send(()).unwrap(); |
547 | unreachable!(); |
548 | }); |
549 | |
550 | // Spawn a task that will never notify |
551 | local.spawn_local(async move { |
552 | pending::<()>().await; |
553 | tx1.send(()).unwrap(); |
554 | }); |
555 | |
556 | // Tick the loop |
557 | local |
558 | .run_until(async { |
559 | task::yield_now().await; |
560 | }) |
561 | .await; |
562 | |
563 | // Drop the LocalSet |
564 | drop(local); |
565 | } |
566 | |
567 | #[tokio::test ] |
568 | async fn spawn_wakes_localset() { |
569 | let local = LocalSet::new(); |
570 | futures::select! { |
571 | _ = local.run_until(pending::<()>()).fuse() => unreachable!(), |
572 | ret = async { local.spawn_local(ready(())).await.unwrap()}.fuse() => ret |
573 | } |
574 | } |
575 | |
576 | /// Checks that the task wakes up with `enter`. |
577 | /// Reproduces <https://github.com/tokio-rs/tokio/issues/5020>. |
578 | #[tokio::test ] |
579 | async fn sleep_with_local_enter_guard() { |
580 | let local = LocalSet::new(); |
581 | let _guard = local.enter(); |
582 | |
583 | let (tx, rx) = oneshot::channel(); |
584 | |
585 | local |
586 | .run_until(async move { |
587 | tokio::task::spawn_local(async move { |
588 | time::sleep(Duration::ZERO).await; |
589 | |
590 | tx.send(()).expect("failed to send" ); |
591 | }); |
592 | assert_eq!(rx.await, Ok(())); |
593 | }) |
594 | .await; |
595 | } |
596 | |
/// A runtime and a `LocalSet` stored together in one thread-local value can
/// be used to run a future that calls `spawn_local`.
#[test]
fn store_local_set_in_thread_local_with_runtime() {
    use tokio::runtime::Runtime;

    /// A current-thread runtime bundled with a `LocalSet`.
    struct RtAndLocalSet {
        rt: Runtime,
        local: LocalSet,
    }

    impl RtAndLocalSet {
        fn new() -> Self {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .unwrap();
            Self {
                rt,
                local: LocalSet::new(),
            }
        }

        /// Spawns one empty local task from inside `run_until`.
        async fn inner_method(&self) {
            let spawn_one = async move {
                tokio::task::spawn_local(async {});
            };
            self.local.run_until(spawn_one).await
        }

        /// Drives `inner_method` on the bundled runtime.
        fn method(&self) {
            self.rt.block_on(self.inner_method());
        }
    }

    thread_local! {
        static CURRENT: RtAndLocalSet = RtAndLocalSet::new();
    }

    CURRENT.with(|current| current.method());
}
638 | |
#[cfg(tokio_unstable)]
mod unstable {
    use tokio::runtime::UnhandledPanic;
    use tokio::task::LocalSet;

    /// With `UnhandledPanic::ShutdownRuntime` set on the `LocalSet`, a panic
    /// in a spawned task must surface as a panic with the expected message.
    #[tokio::test]
    #[should_panic(
        expected = "a spawned task panicked and the LocalSet is configured to shutdown on unhandled panic"
    )]
    async fn shutdown_on_panic() {
        let mut local = LocalSet::new();
        local.unhandled_panic(UnhandledPanic::ShutdownRuntime);
        local
            .run_until(async {
                tokio::task::spawn_local(async {
                    panic!("boom");
                });

                // Never completes on its own.
                futures::future::pending::<()>().await;
            })
            .await;
    }

    // This test verifies that, when the task driving `run_until` has already
    // consumed budget, the `run_until` future has less budget than a "spawned"
    // task.
    //
    // "Budget" is a fuzzy metric as the Tokio runtime is able to change values
    // internally. This is why the test compares two counters instead of
    // asserting on concrete numbers.
    #[tokio::test]
    async fn run_until_does_not_get_own_budget() {
        // Consume some budget
        tokio::task::consume_budget().await;

        LocalSet::new()
            .run_until(async {
                // Counts `consume_budget` calls made during a single poll
                // inside a task created with `tokio::spawn`.
                let fresh_task = tokio::spawn(async {
                    let mut spawned_n = 0;

                    {
                        let mut probe = tokio_test::task::spawn(async {
                            loop {
                                spawned_n += 1;
                                tokio::task::consume_budget().await;
                            }
                        });
                        // Poll once
                        assert!(!probe.poll().is_ready());
                    }

                    spawned_n
                });

                // The same count, taken directly inside the `run_until` future.
                let mut run_until_n = 0;
                {
                    let mut probe = tokio_test::task::spawn(async {
                        loop {
                            run_until_n += 1;
                            tokio::task::consume_budget().await;
                        }
                    });
                    // Poll once
                    assert!(!probe.poll().is_ready());
                }

                let spawned_n = fresh_task.await.unwrap();
                assert_ne!(spawned_n, 0);
                assert_ne!(run_until_n, 0);
                assert!(spawned_n > run_until_n);
            })
            .await
    }
}
711 | |
712 | fn rt() -> runtime::Runtime { |
713 | tokio::runtime::Builder::new_current_thread() |
714 | .enable_all() |
715 | .build() |
716 | .unwrap() |
717 | } |
718 | |