1#![warn(rust_2018_idioms)]
2#![cfg(feature = "full")]
3
4use futures::{
5 future::{pending, ready},
6 FutureExt,
7};
8
9use tokio::runtime;
10use tokio::sync::{mpsc, oneshot};
11use tokio::task::{self, LocalSet};
12use tokio::time;
13
14#[cfg(not(target_os = "wasi"))]
15use std::cell::Cell;
16use std::sync::atomic::AtomicBool;
17#[cfg(not(target_os = "wasi"))]
18use std::sync::atomic::AtomicUsize;
19use std::sync::atomic::Ordering;
20#[cfg(not(target_os = "wasi"))]
21use std::sync::atomic::Ordering::SeqCst;
22use std::time::Duration;
23
24#[tokio::test(flavor = "current_thread")]
25async fn local_current_thread_scheduler() {
26 LocalSet::new()
27 .run_until(async {
28 task::spawn_local(async {}).await.unwrap();
29 })
30 .await;
31}
32
33#[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
34#[tokio::test(flavor = "multi_thread")]
35async fn local_threadpool() {
36 thread_local! {
37 static ON_RT_THREAD: Cell<bool> = Cell::new(false);
38 }
39
40 ON_RT_THREAD.with(|cell| cell.set(true));
41
42 LocalSet::new()
43 .run_until(async {
44 assert!(ON_RT_THREAD.with(|cell| cell.get()));
45 task::spawn_local(async {
46 assert!(ON_RT_THREAD.with(|cell| cell.get()));
47 })
48 .await
49 .unwrap();
50 })
51 .await;
52}
53
54#[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
55#[tokio::test(flavor = "multi_thread")]
56async fn localset_future_threadpool() {
57 thread_local! {
58 static ON_LOCAL_THREAD: Cell<bool> = Cell::new(false);
59 }
60
61 ON_LOCAL_THREAD.with(|cell| cell.set(true));
62
63 let local = LocalSet::new();
64 local.spawn_local(async move {
65 assert!(ON_LOCAL_THREAD.with(|cell| cell.get()));
66 });
67 local.await;
68}
69
70#[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
71#[tokio::test(flavor = "multi_thread")]
72async fn localset_future_timers() {
73 static RAN1: AtomicBool = AtomicBool::new(false);
74 static RAN2: AtomicBool = AtomicBool::new(false);
75
76 let local = LocalSet::new();
77 local.spawn_local(async move {
78 time::sleep(Duration::from_millis(5)).await;
79 RAN1.store(true, Ordering::SeqCst);
80 });
81 local.spawn_local(async move {
82 time::sleep(Duration::from_millis(10)).await;
83 RAN2.store(true, Ordering::SeqCst);
84 });
85 local.await;
86 assert!(RAN1.load(Ordering::SeqCst));
87 assert!(RAN2.load(Ordering::SeqCst));
88}
89
90#[tokio::test]
91async fn localset_future_drives_all_local_futs() {
92 static RAN1: AtomicBool = AtomicBool::new(false);
93 static RAN2: AtomicBool = AtomicBool::new(false);
94 static RAN3: AtomicBool = AtomicBool::new(false);
95
96 let local = LocalSet::new();
97 local.spawn_local(async move {
98 task::spawn_local(async {
99 task::yield_now().await;
100 RAN3.store(true, Ordering::SeqCst);
101 });
102 task::yield_now().await;
103 RAN1.store(true, Ordering::SeqCst);
104 });
105 local.spawn_local(async move {
106 task::yield_now().await;
107 RAN2.store(true, Ordering::SeqCst);
108 });
109 local.await;
110 assert!(RAN1.load(Ordering::SeqCst));
111 assert!(RAN2.load(Ordering::SeqCst));
112 assert!(RAN3.load(Ordering::SeqCst));
113}
114
115#[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
116#[tokio::test(flavor = "multi_thread")]
117async fn local_threadpool_timer() {
118 // This test ensures that runtime services like the timer are properly
119 // set for the local task set.
120 thread_local! {
121 static ON_RT_THREAD: Cell<bool> = Cell::new(false);
122 }
123
124 ON_RT_THREAD.with(|cell| cell.set(true));
125
126 LocalSet::new()
127 .run_until(async {
128 assert!(ON_RT_THREAD.with(|cell| cell.get()));
129 let join = task::spawn_local(async move {
130 assert!(ON_RT_THREAD.with(|cell| cell.get()));
131 time::sleep(Duration::from_millis(10)).await;
132 assert!(ON_RT_THREAD.with(|cell| cell.get()));
133 });
134 join.await.unwrap();
135 })
136 .await;
137}
/// A task spawned with `task::spawn_local` while a `LocalSet::enter` guard is
/// held can later be joined from that set's `block_on`.
#[test]
fn enter_guard_spawn() {
    let local = LocalSet::new();
    let _guard = local.enter();

    // No runtime exists yet at this point; only the enter guard is held.
    let join = task::spawn_local(async { true });

    let mut builder = runtime::Builder::new_current_thread();
    builder.enable_all();
    let rt = builder.build().unwrap();

    // Run the local task set.
    local.block_on(&rt, async move {
        let value = join.await.unwrap();
        assert!(value);
    });
}
153
/// Calling `block_in_place` from a local task is expected to panic.
#[cfg(not(target_os = "wasi"))] // Wasi doesn't support panic recovery
#[test]
// This will panic, since the thread that calls `block_on` cannot use
// in-place blocking inside of `block_on`.
#[should_panic]
fn local_threadpool_blocking_in_place() {
    thread_local! {
        static ON_RT_THREAD: Cell<bool> = Cell::new(false);
    }

    // Mark the thread that runs the test body.
    ON_RT_THREAD.with(|flag| flag.set(true));

    let mut builder = runtime::Builder::new_current_thread();
    builder.enable_all();
    let rt = builder.build().unwrap();

    LocalSet::new().block_on(&rt, async {
        assert!(ON_RT_THREAD.with(Cell::get));

        let blocker = task::spawn_local(async move {
            assert!(ON_RT_THREAD.with(Cell::get));
            task::block_in_place(|| {});
            assert!(ON_RT_THREAD.with(Cell::get));
        });

        // The task's panic surfaces here as a join error.
        blocker.await.unwrap();
    });
}
180
181#[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
182#[tokio::test(flavor = "multi_thread")]
183async fn local_threadpool_blocking_run() {
184 thread_local! {
185 static ON_RT_THREAD: Cell<bool> = Cell::new(false);
186 }
187
188 ON_RT_THREAD.with(|cell| cell.set(true));
189
190 LocalSet::new()
191 .run_until(async {
192 assert!(ON_RT_THREAD.with(|cell| cell.get()));
193 let join = task::spawn_local(async move {
194 assert!(ON_RT_THREAD.with(|cell| cell.get()));
195 task::spawn_blocking(|| {
196 assert!(
197 !ON_RT_THREAD.with(|cell| cell.get()),
198 "blocking must not run on the local task set's thread"
199 );
200 })
201 .await
202 .unwrap();
203 assert!(ON_RT_THREAD.with(|cell| cell.get()));
204 });
205 join.await.unwrap();
206 })
207 .await;
208}
209
210#[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
211#[tokio::test(flavor = "multi_thread")]
212async fn all_spawns_are_local() {
213 use futures::future;
214 thread_local! {
215 static ON_RT_THREAD: Cell<bool> = Cell::new(false);
216 }
217
218 ON_RT_THREAD.with(|cell| cell.set(true));
219
220 LocalSet::new()
221 .run_until(async {
222 assert!(ON_RT_THREAD.with(|cell| cell.get()));
223 let handles = (0..128)
224 .map(|_| {
225 task::spawn_local(async {
226 assert!(ON_RT_THREAD.with(|cell| cell.get()));
227 })
228 })
229 .collect::<Vec<_>>();
230 for joined in future::join_all(handles).await {
231 joined.unwrap();
232 }
233 })
234 .await;
235}
236
237#[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
238#[tokio::test(flavor = "multi_thread")]
239async fn nested_spawn_is_local() {
240 thread_local! {
241 static ON_RT_THREAD: Cell<bool> = Cell::new(false);
242 }
243
244 ON_RT_THREAD.with(|cell| cell.set(true));
245
246 LocalSet::new()
247 .run_until(async {
248 assert!(ON_RT_THREAD.with(|cell| cell.get()));
249 task::spawn_local(async {
250 assert!(ON_RT_THREAD.with(|cell| cell.get()));
251 task::spawn_local(async {
252 assert!(ON_RT_THREAD.with(|cell| cell.get()));
253 task::spawn_local(async {
254 assert!(ON_RT_THREAD.with(|cell| cell.get()));
255 task::spawn_local(async {
256 assert!(ON_RT_THREAD.with(|cell| cell.get()));
257 })
258 .await
259 .unwrap();
260 })
261 .await
262 .unwrap();
263 })
264 .await
265 .unwrap();
266 })
267 .await
268 .unwrap();
269 })
270 .await;
271}
272
/// A local task's `JoinHandle` may be awaited from a task created with
/// `task::spawn`; the local task still sees the test thread's flag, while the
/// spawned task does not.
#[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
#[test]
fn join_local_future_elsewhere() {
    thread_local! {
        static ON_RT_THREAD: Cell<bool> = Cell::new(false);
    }

    // Mark the thread that runs the test body.
    ON_RT_THREAD.with(|flag| flag.set(true));

    let rt = runtime::Runtime::new().unwrap();
    let local = LocalSet::new();
    local.block_on(&rt, async move {
        let (go_tx, go_rx) = oneshot::channel();

        // Waits for the signal from the other task before finishing.
        let local_join = task::spawn_local(async move {
            assert!(
                ON_RT_THREAD.with(Cell::get),
                "local task must run on local thread, no matter where it is awaited"
            );
            go_rx.await.unwrap();

            "hello world"
        });

        // Signals the local task and then joins it from elsewhere.
        let remote_join = task::spawn(async move {
            assert!(
                !ON_RT_THREAD.with(Cell::get),
                "spawned task should be on a worker"
            );

            go_tx.send(()).expect("task shouldn't have ended yet");

            local_join.await.expect("task should complete successfully");
        });

        remote_join.await.unwrap()
    });
}
308
309// Tests for <https://github.com/tokio-rs/tokio/issues/4973>
310#[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
311#[tokio::test(flavor = "multi_thread")]
312async fn localset_in_thread_local() {
313 thread_local! {
314 static LOCAL_SET: LocalSet = LocalSet::new();
315 }
316
317 // holds runtime thread until end of main fn.
318 let (_tx, rx) = oneshot::channel::<()>();
319 let handle = tokio::runtime::Handle::current();
320
321 std::thread::spawn(move || {
322 LOCAL_SET.with(|local_set| {
323 handle.block_on(local_set.run_until(async move {
324 let _ = rx.await;
325 }))
326 });
327 });
328}
329
/// Dropping a `LocalSet` drops its unfinished tasks: the `Rc` clone moved into
/// a never-completing task must be released afterwards.
#[test]
fn drop_cancels_tasks() {
    use std::rc::Rc;

    // This test reproduces issue #1842
    let rt = rt();
    let witness = Rc::new(());
    let held_by_task = witness.clone();

    let (started_tx, started_rx) = oneshot::channel();

    let local = LocalSet::new();
    local.spawn_local(async move {
        // Move the clone into the task so it lives as long as the task does.
        let _held = held_by_task;

        started_tx.send(()).unwrap();
        futures::future::pending::<()>().await;
    });

    // Run the set until the task has signalled that it started.
    local.block_on(&rt, async {
        started_rx.await.unwrap();
    });

    drop(local);
    drop(rt);

    // Only the original handle may remain.
    assert_eq!(1, Rc::strong_count(&witness));
}
358
/// Runs a test function in a separate thread, and panics if the test does not
/// complete within the specified timeout, or if the test function panics.
///
/// This is intended for running tests whose failure mode is a hang or infinite
/// loop that cannot be detected otherwise.
///
/// # Panics
///
/// - If `f` has not finished within `timeout`. In that case the spawned thread
///   is not joined.
/// - If the thread running `f` panicked.
fn with_timeout(timeout: Duration, f: impl FnOnce() + Send + 'static) {
    use std::sync::mpsc::RecvTimeoutError;

    let (done_tx, done_rx) = std::sync::mpsc::channel();
    let thread = std::thread::spawn(move || {
        f();

        // Send a message on the channel so that the test thread can
        // determine if we have entered an infinite loop:
        done_tx.send(()).unwrap();
    });

    // Since the failure mode of this test is an infinite loop, rather than
    // something we can easily make assertions about, we'll run it in a
    // thread. When the test thread finishes, it will send a message on a
    // channel to this thread. We'll wait for that message for at most
    // `timeout`, and if we don't receive it, we assume the test thread has
    // hung. Callers should pick a generous timeout in case CI is slow.
    match done_rx.recv_timeout(timeout) {
        // `Duration`'s `Debug` output already carries its unit (e.g. `60s`),
        // so the message must not append one.
        Err(RecvTimeoutError::Timeout) => panic!(
            "test did not complete within {:?}, \
             we have (probably) entered an infinite loop!",
            timeout,
        ),
        // Did the test thread panic? We'll find out for sure when we `join`
        // with it.
        Err(RecvTimeoutError::Disconnected) => {}
        // Test completed successfully!
        Ok(()) => {}
    }

    thread.join().expect("test thread should not panic!")
}
400
/// Dropping a `LocalSet` must terminate even when one of its tasks is woken by
/// the channel sender being dropped after the set last ran.
#[cfg_attr(
    target_os = "wasi",
    ignore = "`unwrap()` in `with_timeout()` panics on Wasi"
)]
#[test]
fn drop_cancels_remote_tasks() {
    // This test reproduces issue #1885.
    with_timeout(Duration::from_secs(60), || {
        let (sender, mut receiver) = mpsc::channel::<()>(1024);

        let rt = rt();

        let set = LocalSet::new();
        // Keeps receiving until the channel is closed.
        set.spawn_local(async move {
            loop {
                if receiver.recv().await.is_none() {
                    break;
                }
            }
        });
        set.block_on(&rt, async {
            let nap = Duration::from_millis(1);
            time::sleep(nap).await;
        });

        drop(sender);

        // This enters an infinite loop if the remote notified tasks are not
        // properly cancelled.
        drop(set);
    });
}
426
/// Joining 128 local tasks with `join_all`, each of which spawns and awaits a
/// nested local task, must complete within the timeout.
#[cfg_attr(
    target_os = "wasi",
    ignore = "FIXME: `task::spawn_local().await.unwrap()` panics on Wasi"
)]
#[test]
fn local_tasks_wake_join_all() {
    // This test reproduces issue #2460.
    with_timeout(Duration::from_secs(60), || {
        use futures::future::join_all;
        use tokio::task::LocalSet;

        let rt = rt();
        let set = LocalSet::new();

        let handles: Vec<_> = (0..128)
            .map(|_| {
                set.spawn_local(async move {
                    let nested = tokio::task::spawn_local(async move {});
                    nested.await.unwrap();
                })
            })
            .collect();

        let all_joined = set.run_until(join_all(handles));
        rt.block_on(all_joined);
    });
}
451
/// Retry wrapper around `local_tasks_are_polled_after_tick_inner`.
#[cfg(not(target_os = "wasi"))] // Wasi doesn't support panic recovery
#[test]
fn local_tasks_are_polled_after_tick() {
    // This test depends on timing, so we run it up to five times: four
    // attempts with panics caught, plus one final uncaught attempt.
    const CAUGHT_ATTEMPTS: usize = 4;

    // `any` stops at the first attempt that completes without panicking.
    let passed = (0..CAUGHT_ATTEMPTS)
        .any(|_| std::panic::catch_unwind(local_tasks_are_polled_after_tick_inner).is_ok());

    if !passed {
        // Every caught attempt failed. Try one more time without catching
        // panics. If it fails again, the test fails.
        local_tasks_are_polled_after_tick_inner();
    }
}
468
469#[cfg(not(target_os = "wasi"))] // Wasi doesn't support panic recovery
470#[tokio::main(flavor = "current_thread")]
471async fn local_tasks_are_polled_after_tick_inner() {
472 // Reproduces issues #1899 and #1900
473
474 static RX1: AtomicUsize = AtomicUsize::new(0);
475 static RX2: AtomicUsize = AtomicUsize::new(0);
476 const EXPECTED: usize = 500;
477
478 RX1.store(0, SeqCst);
479 RX2.store(0, SeqCst);
480
481 let (tx, mut rx) = mpsc::unbounded_channel();
482
483 let local = LocalSet::new();
484
485 local
486 .run_until(async {
487 let task2 = task::spawn(async move {
488 // Wait a bit
489 time::sleep(Duration::from_millis(10)).await;
490
491 let mut oneshots = Vec::with_capacity(EXPECTED);
492
493 // Send values
494 for _ in 0..EXPECTED {
495 let (oneshot_tx, oneshot_rx) = oneshot::channel();
496 oneshots.push(oneshot_tx);
497 tx.send(oneshot_rx).unwrap();
498 }
499
500 time::sleep(Duration::from_millis(10)).await;
501
502 for tx in oneshots.drain(..) {
503 tx.send(()).unwrap();
504 }
505
506 loop {
507 time::sleep(Duration::from_millis(20)).await;
508 let rx1 = RX1.load(SeqCst);
509 let rx2 = RX2.load(SeqCst);
510
511 if rx1 == EXPECTED && rx2 == EXPECTED {
512 break;
513 }
514 }
515 });
516
517 while let Some(oneshot) = rx.recv().await {
518 RX1.fetch_add(1, SeqCst);
519
520 task::spawn_local(async move {
521 oneshot.await.unwrap();
522 RX2.fetch_add(1, SeqCst);
523 });
524 }
525
526 task2.await.unwrap();
527 })
528 .await;
529}
530
531#[tokio::test]
532async fn acquire_mutex_in_drop() {
533 use futures::future::pending;
534
535 let (tx1, rx1) = oneshot::channel();
536 let (tx2, rx2) = oneshot::channel();
537 let local = LocalSet::new();
538
539 local.spawn_local(async move {
540 let _ = rx2.await;
541 unreachable!();
542 });
543
544 local.spawn_local(async move {
545 let _ = rx1.await;
546 tx2.send(()).unwrap();
547 unreachable!();
548 });
549
550 // Spawn a task that will never notify
551 local.spawn_local(async move {
552 pending::<()>().await;
553 tx1.send(()).unwrap();
554 });
555
556 // Tick the loop
557 local
558 .run_until(async {
559 task::yield_now().await;
560 })
561 .await;
562
563 // Drop the LocalSet
564 drop(local);
565}
566
567#[tokio::test]
568async fn spawn_wakes_localset() {
569 let local = LocalSet::new();
570 futures::select! {
571 _ = local.run_until(pending::<()>()).fuse() => unreachable!(),
572 ret = async { local.spawn_local(ready(())).await.unwrap()}.fuse() => ret
573 }
574}
575
576/// Checks that the task wakes up with `enter`.
577/// Reproduces <https://github.com/tokio-rs/tokio/issues/5020>.
578#[tokio::test]
579async fn sleep_with_local_enter_guard() {
580 let local = LocalSet::new();
581 let _guard = local.enter();
582
583 let (tx, rx) = oneshot::channel();
584
585 local
586 .run_until(async move {
587 tokio::task::spawn_local(async move {
588 time::sleep(Duration::ZERO).await;
589
590 tx.send(()).expect("failed to send");
591 });
592 assert_eq!(rx.await, Ok(()));
593 })
594 .await;
595}
596
/// A current-thread runtime and a `LocalSet` stored together in one
/// thread-local value can be used to run and spawn local tasks.
#[test]
fn store_local_set_in_thread_local_with_runtime() {
    use tokio::runtime::Runtime;

    /// Bundles a runtime with the local set it drives.
    struct RtAndLocalSet {
        rt: Runtime,
        local: LocalSet,
    }

    impl RtAndLocalSet {
        /// Builds a current-thread runtime and an empty local set.
        fn new() -> RtAndLocalSet {
            let mut builder = tokio::runtime::Builder::new_current_thread();
            builder.enable_all();
            let rt = builder.build().unwrap();

            RtAndLocalSet {
                rt,
                local: LocalSet::new(),
            }
        }

        /// Spawns an empty local task from inside `run_until`.
        async fn inner_method(&self) {
            self.local
                .run_until(async move {
                    tokio::task::spawn_local(async {});
                })
                .await
        }

        /// Blocks the stored runtime on `inner_method`.
        fn method(&self) {
            let fut = self.inner_method();
            self.rt.block_on(fut);
        }
    }

    thread_local! {
        static CURRENT: RtAndLocalSet = RtAndLocalSet::new();
    }

    CURRENT.with(|current| {
        current.method();
    });
}
638
#[cfg(tokio_unstable)]
mod unstable {
    use tokio::runtime::UnhandledPanic;
    use tokio::task::LocalSet;

    /// With `UnhandledPanic::ShutdownRuntime` configured, a panic in a spawned
    /// local task makes `run_until` panic with the expected message.
    #[tokio::test]
    #[should_panic(
        expected = "a spawned task panicked and the LocalSet is configured to shutdown on unhandled panic"
    )]
    async fn shutdown_on_panic() {
        let mut set = LocalSet::new();
        set.unhandled_panic(UnhandledPanic::ShutdownRuntime);

        set.run_until(async {
            tokio::task::spawn_local(async {
                panic!("boom");
            });

            // Never completes on its own; only the task's panic ends the test.
            futures::future::pending::<()>().await;
        })
        .await;
    }

    // This test compares that, when the task driving `run_until` has already
    // consumed budget, the `run_until` future has less budget than a "spawned"
    // task.
    //
    // "Budget" is a fuzzy metric as the Tokio runtime is able to change values
    // internally. This is why the test uses indirection to test this.
    #[tokio::test]
    async fn run_until_does_not_get_own_budget() {
        // Consume some budget
        tokio::task::consume_budget().await;

        LocalSet::new()
            .run_until(async {
                // Count how many iterations a spawned task manages in one poll.
                let baseline = tokio::spawn(async {
                    let mut baseline_iters = 0;

                    {
                        let mut probe = tokio_test::task::spawn(async {
                            loop {
                                baseline_iters += 1;
                                tokio::task::consume_budget().await;
                            }
                        });
                        // Poll once
                        assert!(probe.poll().is_pending());
                    }

                    baseline_iters
                });

                // Count the same thing directly inside `run_until`.
                let mut run_until_iters = 0;
                {
                    let mut probe = tokio_test::task::spawn(async {
                        loop {
                            run_until_iters += 1;
                            tokio::task::consume_budget().await;
                        }
                    });
                    // Poll once
                    assert!(probe.poll().is_pending());
                }

                let baseline_iters = baseline.await.unwrap();
                assert_ne!(baseline_iters, 0);
                assert_ne!(run_until_iters, 0);
                assert!(baseline_iters > run_until_iters);
            })
            .await
    }
}
711
712fn rt() -> runtime::Runtime {
713 tokio::runtime::Builder::new_current_thread()
714 .enable_all()
715 .build()
716 .unwrap()
717}
718