1#![allow(clippy::needless_range_loop)]
2#![warn(rust_2018_idioms)]
3#![cfg(feature = "full")]
4
5// Tests to run on both current-thread & multi-thread runtime variants.
6
7macro_rules! rt_test {
8 ($($t:tt)*) => {
9 mod current_thread_scheduler {
10 $($t)*
11
12 #[cfg(not(target_os="wasi"))]
13 const NUM_WORKERS: usize = 1;
14
15 fn rt() -> Arc<Runtime> {
16 tokio::runtime::Builder::new_current_thread()
17 .enable_all()
18 .build()
19 .unwrap()
20 .into()
21 }
22 }
23
24 #[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
25 mod threaded_scheduler_4_threads {
26 $($t)*
27
28 const NUM_WORKERS: usize = 4;
29
30 fn rt() -> Arc<Runtime> {
31 tokio::runtime::Builder::new_multi_thread()
32 .worker_threads(4)
33 .enable_all()
34 .build()
35 .unwrap()
36 .into()
37 }
38 }
39
40 #[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
41 mod threaded_scheduler_1_thread {
42 $($t)*
43
44 const NUM_WORKERS: usize = 1;
45
46 fn rt() -> Arc<Runtime> {
47 tokio::runtime::Builder::new_multi_thread()
48 .worker_threads(1)
49 .enable_all()
50 .build()
51 .unwrap()
52 .into()
53 }
54 }
55
56 #[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
57 #[cfg(tokio_unstable)]
58 mod alt_threaded_scheduler_4_threads {
59 $($t)*
60
61 const NUM_WORKERS: usize = 4;
62
63 fn rt() -> Arc<Runtime> {
64 tokio::runtime::Builder::new_multi_thread()
65 .worker_threads(4)
66 .enable_all()
67 .build()
68 .unwrap()
69 .into()
70 }
71 }
72
73 #[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
74 #[cfg(tokio_unstable)]
75 mod alt_threaded_scheduler_1_thread {
76 $($t)*
77
78 const NUM_WORKERS: usize = 1;
79
80 fn rt() -> Arc<Runtime> {
81 tokio::runtime::Builder::new_multi_thread()
82 .worker_threads(1)
83 .enable_all()
84 .build()
85 .unwrap()
86 .into()
87 }
88 }
89 }
90}
91
92#[test]
93fn send_sync_bound() {
94 use tokio::runtime::Runtime;
95 fn is_send<T: Send + Sync>() {}
96
97 is_send::<Runtime>();
98}
99
100rt_test! {
101 #[cfg(not(target_os="wasi"))]
102 use tokio::net::{TcpListener, TcpStream};
103 #[cfg(not(target_os="wasi"))]
104 use tokio::io::{AsyncReadExt, AsyncWriteExt};
105
106 use tokio::runtime::Runtime;
107 use tokio::sync::oneshot;
108 use tokio::{task, time};
109
110 #[cfg(not(target_os="wasi"))]
111 use tokio_test::assert_err;
112 use tokio_test::assert_ok;
113
114 use futures::future::poll_fn;
115 use std::future::Future;
116 use std::pin::Pin;
117
118 #[cfg(not(target_os="wasi"))]
119 use std::sync::mpsc;
120
121 use std::sync::Arc;
122 use std::task::{Context, Poll};
123
124 #[cfg(not(target_os="wasi"))]
125 use std::thread;
126 use std::time::{Duration, Instant};
127
128 #[test]
129 fn block_on_sync() {
130 let rt = rt();
131
132 let mut win = false;
133 rt.block_on(async {
134 win = true;
135 });
136
137 assert!(win);
138 }
139
140
141 #[cfg(not(target_os="wasi"))]
142 #[test]
143 fn block_on_async() {
144 let rt = rt();
145
146 let out = rt.block_on(async {
147 let (tx, rx) = oneshot::channel();
148
149 thread::spawn(move || {
150 thread::sleep(Duration::from_millis(50));
151 tx.send("ZOMG").unwrap();
152 });
153
154 assert_ok!(rx.await)
155 });
156
157 assert_eq!(out, "ZOMG");
158 }
159
160 #[test]
161 fn spawn_one_bg() {
162 let rt = rt();
163
164 let out = rt.block_on(async {
165 let (tx, rx) = oneshot::channel();
166
167 tokio::spawn(async move {
168 tx.send("ZOMG").unwrap();
169 });
170
171 assert_ok!(rx.await)
172 });
173
174 assert_eq!(out, "ZOMG");
175 }
176
177 #[test]
178 fn spawn_one_join() {
179 let rt = rt();
180
181 let out = rt.block_on(async {
182 let (tx, rx) = oneshot::channel();
183
184 let handle = tokio::spawn(async move {
185 tx.send("ZOMG").unwrap();
186 "DONE"
187 });
188
189 let msg = assert_ok!(rx.await);
190
191 let out = assert_ok!(handle.await);
192 assert_eq!(out, "DONE");
193
194 msg
195 });
196
197 assert_eq!(out, "ZOMG");
198 }
199
200 #[test]
201 fn spawn_two() {
202 let rt = rt();
203
204 let out = rt.block_on(async {
205 let (tx1, rx1) = oneshot::channel();
206 let (tx2, rx2) = oneshot::channel();
207
208 tokio::spawn(async move {
209 assert_ok!(tx1.send("ZOMG"));
210 });
211
212 tokio::spawn(async move {
213 let msg = assert_ok!(rx1.await);
214 assert_ok!(tx2.send(msg));
215 });
216
217 assert_ok!(rx2.await)
218 });
219
220 assert_eq!(out, "ZOMG");
221 }
222
223 #[cfg(not(target_os="wasi"))] // Wasi does not support threads
224 #[test]
225 fn spawn_many_from_block_on() {
226 use tokio::sync::mpsc;
227
228 const ITER: usize = 200;
229
230 let rt = rt();
231
232 let out = rt.block_on(async {
233 let (done_tx, mut done_rx) = mpsc::unbounded_channel();
234
235 let mut txs = (0..ITER)
236 .map(|i| {
237 let (tx, rx) = oneshot::channel();
238 let done_tx = done_tx.clone();
239
240 tokio::spawn(async move {
241 let msg = assert_ok!(rx.await);
242 assert_eq!(i, msg);
243 assert_ok!(done_tx.send(msg));
244 });
245
246 tx
247 })
248 .collect::<Vec<_>>();
249
250 drop(done_tx);
251
252 thread::spawn(move || {
253 for (i, tx) in txs.drain(..).enumerate() {
254 assert_ok!(tx.send(i));
255 }
256 });
257
258 let mut out = vec![];
259 while let Some(i) = done_rx.recv().await {
260 out.push(i);
261 }
262
263 out.sort_unstable();
264 out
265 });
266
267 assert_eq!(ITER, out.len());
268
269 for i in 0..ITER {
270 assert_eq!(i, out[i]);
271 }
272 }
273
274 #[cfg(not(target_os="wasi"))] // Wasi does not support threads
275 #[test]
276 fn spawn_many_from_task() {
277 use tokio::sync::mpsc;
278
279 const ITER: usize = 500;
280
281 let rt = rt();
282
283 let out = rt.block_on(async {
284 tokio::spawn(async move {
285 let (done_tx, mut done_rx) = mpsc::unbounded_channel();
286
287 let mut txs = (0..ITER)
288 .map(|i| {
289 let (tx, rx) = oneshot::channel();
290 let done_tx = done_tx.clone();
291
292 tokio::spawn(async move {
293 let msg = assert_ok!(rx.await);
294 assert_eq!(i, msg);
295 assert_ok!(done_tx.send(msg));
296 });
297
298 tx
299 })
300 .collect::<Vec<_>>();
301
302 drop(done_tx);
303
304 thread::spawn(move || {
305 for (i, tx) in txs.drain(..).enumerate() {
306 assert_ok!(tx.send(i));
307 }
308 });
309
310 let mut out = vec![];
311 while let Some(i) = done_rx.recv().await {
312 out.push(i);
313 }
314
315 out.sort_unstable();
316 out
317 }).await.unwrap()
318 });
319
320 assert_eq!(ITER, out.len());
321
322 for i in 0..ITER {
323 assert_eq!(i, out[i]);
324 }
325 }
326
327 #[test]
328 fn spawn_one_from_block_on_called_on_handle() {
329 let rt = rt();
330 let (tx, rx) = oneshot::channel();
331
332 #[allow(clippy::async_yields_async)]
333 let handle = rt.handle().block_on(async {
334 tokio::spawn(async move {
335 tx.send("ZOMG").unwrap();
336 "DONE"
337 })
338 });
339
340 let out = rt.block_on(async {
341 let msg = assert_ok!(rx.await);
342
343 let out = assert_ok!(handle.await);
344 assert_eq!(out, "DONE");
345
346 msg
347 });
348
349 assert_eq!(out, "ZOMG");
350 }
351
352 #[test]
353 fn spawn_await_chain() {
354 let rt = rt();
355
356 let out = rt.block_on(async {
357 assert_ok!(tokio::spawn(async {
358 assert_ok!(tokio::spawn(async {
359 "hello"
360 }).await)
361 }).await)
362 });
363
364 assert_eq!(out, "hello");
365 }
366
367 #[test]
368 fn outstanding_tasks_dropped() {
369 let rt = rt();
370
371 let cnt = Arc::new(());
372
373 rt.block_on(async {
374 let cnt = cnt.clone();
375
376 tokio::spawn(poll_fn(move |_| {
377 assert_eq!(2, Arc::strong_count(&cnt));
378 Poll::<()>::Pending
379 }));
380 });
381
382 assert_eq!(2, Arc::strong_count(&cnt));
383
384 drop(rt);
385
386 assert_eq!(1, Arc::strong_count(&cnt));
387 }
388
389 #[test]
390 #[should_panic]
391 fn nested_rt() {
392 let rt1 = rt();
393 let rt2 = rt();
394
395 rt1.block_on(async { rt2.block_on(async { "hello" }) });
396 }
397
398 #[test]
399 fn create_rt_in_block_on() {
400 let rt1 = rt();
401 let rt2 = rt1.block_on(async { rt() });
402 let out = rt2.block_on(async { "ZOMG" });
403
404 assert_eq!(out, "ZOMG");
405 }
406
407 #[cfg(not(target_os="wasi"))] // Wasi does not support threads
408 #[test]
409 fn complete_block_on_under_load() {
410 let rt = rt();
411
412 rt.block_on(async {
413 let (tx, rx) = oneshot::channel();
414
415 // Spin hard
416 tokio::spawn(async {
417 loop {
418 yield_once().await;
419 }
420 });
421
422 thread::spawn(move || {
423 thread::sleep(Duration::from_millis(50));
424 assert_ok!(tx.send(()));
425 });
426
427 assert_ok!(rx.await);
428 });
429 }
430
431 #[cfg(not(target_os="wasi"))] // Wasi does not support threads
432 #[test]
433 fn complete_task_under_load() {
434 let rt = rt();
435
436 rt.block_on(async {
437 let (tx1, rx1) = oneshot::channel();
438 let (tx2, rx2) = oneshot::channel();
439
440 // Spin hard
441 tokio::spawn(async {
442 loop {
443 yield_once().await;
444 }
445 });
446
447 thread::spawn(move || {
448 thread::sleep(Duration::from_millis(50));
449 assert_ok!(tx1.send(()));
450 });
451
452 tokio::spawn(async move {
453 assert_ok!(rx1.await);
454 assert_ok!(tx2.send(()));
455 });
456
457 assert_ok!(rx2.await);
458 });
459 }
460
461 #[cfg(not(target_os="wasi"))] // Wasi does not support threads
462 #[test]
463 fn spawn_from_other_thread_idle() {
464 let rt = rt();
465 let handle = rt.clone();
466
467 let (tx, rx) = oneshot::channel();
468
469 thread::spawn(move || {
470 thread::sleep(Duration::from_millis(50));
471
472 handle.spawn(async move {
473 assert_ok!(tx.send(()));
474 });
475 });
476
477 rt.block_on(async move {
478 assert_ok!(rx.await);
479 });
480 }
481
482 #[cfg(not(target_os="wasi"))] // Wasi does not support threads
483 #[test]
484 fn spawn_from_other_thread_under_load() {
485 let rt = rt();
486 let handle = rt.clone();
487
488 let (tx, rx) = oneshot::channel();
489
490 thread::spawn(move || {
491 handle.spawn(async move {
492 assert_ok!(tx.send(()));
493 });
494 });
495
496 rt.block_on(async move {
497 // Spin hard
498 tokio::spawn(async {
499 loop {
500 yield_once().await;
501 }
502 });
503
504 assert_ok!(rx.await);
505 });
506 }
507
508 #[test]
509 fn sleep_at_root() {
510 let rt = rt();
511
512 let now = Instant::now();
513 let dur = Duration::from_millis(50);
514
515 rt.block_on(async move {
516 time::sleep(dur).await;
517 });
518
519 assert!(now.elapsed() >= dur);
520 }
521
522 #[test]
523 fn sleep_in_spawn() {
524 let rt = rt();
525
526 let now = Instant::now();
527 let dur = Duration::from_millis(50);
528
529 rt.block_on(async move {
530 let (tx, rx) = oneshot::channel();
531
532 tokio::spawn(async move {
533 time::sleep(dur).await;
534 assert_ok!(tx.send(()));
535 });
536
537 assert_ok!(rx.await);
538 });
539
540 assert!(now.elapsed() >= dur);
541 }
542
543 #[cfg(not(target_os="wasi"))] // Wasi does not support bind
544 #[test]
545 fn block_on_socket() {
546 let rt = rt();
547
548 rt.block_on(async move {
549 let (tx, rx) = oneshot::channel();
550
551 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
552 let addr = listener.local_addr().unwrap();
553
554 tokio::spawn(async move {
555 let _ = listener.accept().await;
556 tx.send(()).unwrap();
557 });
558
559 TcpStream::connect(&addr).await.unwrap();
560 rx.await.unwrap();
561 });
562 }
563
564 #[cfg(not(target_os="wasi"))] // Wasi does not support threads
565 #[test]
566 fn spawn_from_blocking() {
567 let rt = rt();
568
569 let out = rt.block_on(async move {
570 let inner = assert_ok!(tokio::task::spawn_blocking(|| {
571 tokio::spawn(async move { "hello" })
572 }).await);
573
574 assert_ok!(inner.await)
575 });
576
577 assert_eq!(out, "hello")
578 }
579
580 #[cfg(not(target_os="wasi"))] // Wasi does not support threads
581 #[test]
582 fn spawn_blocking_from_blocking() {
583 let rt = rt();
584
585 let out = rt.block_on(async move {
586 let inner = assert_ok!(tokio::task::spawn_blocking(|| {
587 tokio::task::spawn_blocking(|| "hello")
588 }).await);
589
590 assert_ok!(inner.await)
591 });
592
593 assert_eq!(out, "hello")
594 }
595
596 #[cfg(not(target_os="wasi"))] // Wasi does not support threads
597 #[test]
598 fn sleep_from_blocking() {
599 let rt = rt();
600
601 rt.block_on(async move {
602 assert_ok!(tokio::task::spawn_blocking(|| {
603 let now = std::time::Instant::now();
604 let dur = Duration::from_millis(1);
605
606 // use the futures' block_on fn to make sure we aren't setting
607 // any Tokio context
608 futures::executor::block_on(async {
609 tokio::time::sleep(dur).await;
610 });
611
612 assert!(now.elapsed() >= dur);
613 }).await);
614 });
615 }
616
617 #[cfg(not(target_os="wasi"))] // Wasi does not support bind
618 #[test]
619 fn socket_from_blocking() {
620 let rt = rt();
621
622 rt.block_on(async move {
623 let listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
624 let addr = assert_ok!(listener.local_addr());
625
626 let peer = tokio::task::spawn_blocking(move || {
627 // use the futures' block_on fn to make sure we aren't setting
628 // any Tokio context
629 futures::executor::block_on(async {
630 assert_ok!(TcpStream::connect(addr).await);
631 });
632 });
633
634 // Wait for the client to connect
635 let _ = assert_ok!(listener.accept().await);
636
637 assert_ok!(peer.await);
638 });
639 }
640
641 #[cfg(not(target_os="wasi"))] // Wasi does not support threads
642 #[test]
643 fn always_active_parker() {
644 // This test it to show that we will always have
645 // an active parker even if we call block_on concurrently
646
647 let rt = rt();
648 let rt2 = rt.clone();
649
650 let (tx1, rx1) = oneshot::channel();
651 let (tx2, rx2) = oneshot::channel();
652
653 let jh1 = thread::spawn(move || {
654 rt.block_on(async move {
655 rx2.await.unwrap();
656 time::sleep(Duration::from_millis(5)).await;
657 tx1.send(()).unwrap();
658 });
659 });
660
661 let jh2 = thread::spawn(move || {
662 rt2.block_on(async move {
663 tx2.send(()).unwrap();
664 time::sleep(Duration::from_millis(5)).await;
665 rx1.await.unwrap();
666 time::sleep(Duration::from_millis(5)).await;
667 });
668 });
669
670 jh1.join().unwrap();
671 jh2.join().unwrap();
672 }
673
674 #[test]
675 // IOCP requires setting the "max thread" concurrency value. The sane,
676 // default, is to set this to the number of cores. Threads that poll I/O
677 // become associated with the IOCP handle. Once those threads sleep for any
678 // reason (mutex), they yield their ownership.
679 //
680 // This test hits an edge case on windows where more threads than cores are
681 // created, none of those threads ever yield due to being at capacity, so
682 // IOCP gets "starved".
683 //
684 // For now, this is a very edge case that is probably not a real production
685 // concern. There also isn't a great/obvious solution to take. For now, the
686 // test is disabled.
687 #[cfg(not(windows))]
688 #[cfg(not(target_os="wasi"))] // Wasi does not support bind or threads
689 fn io_driver_called_when_under_load() {
690 let rt = rt();
691
692 // Create a lot of constant load. The scheduler will always be busy.
693 for _ in 0..100 {
694 rt.spawn(async {
695 loop {
696 // Don't use Tokio's `yield_now()` to avoid special defer
697 // logic.
698 futures::future::poll_fn::<(), _>(|cx| {
699 cx.waker().wake_by_ref();
700 std::task::Poll::Pending
701 }).await;
702 }
703 });
704 }
705
706 // Do some I/O work
707 rt.block_on(async {
708 let listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
709 let addr = assert_ok!(listener.local_addr());
710
711 let srv = tokio::spawn(async move {
712 let (mut stream, _) = assert_ok!(listener.accept().await);
713 assert_ok!(stream.write_all(b"hello world").await);
714 });
715
716 let cli = tokio::spawn(async move {
717 let mut stream = assert_ok!(TcpStream::connect(addr).await);
718 let mut dst = vec![0; 11];
719
720 assert_ok!(stream.read_exact(&mut dst).await);
721 assert_eq!(dst, b"hello world");
722 });
723
724 assert_ok!(srv.await);
725 assert_ok!(cli.await);
726 });
727 }
728
729 /// Tests that yielded tasks are not scheduled until **after** resource
730 /// drivers are polled.
731 ///
732 /// The OS does not guarantee when I/O events are delivered, so there may be
733 /// more yields than anticipated. This makes the test slightly flaky. To
734 /// help avoid flakiness, we run the test 10 times and only fail it after
735 /// 10 failures in a row.
736 ///
737 /// Note that if the test fails by panicking rather than by returning false,
738 /// then we fail it immediately. That kind of failure should not happen
739 /// spuriously.
740 #[test]
741 #[cfg(not(target_os="wasi"))]
742 fn yield_defers_until_park() {
743 for _ in 0..10 {
744 if yield_defers_until_park_inner() {
745 // test passed
746 return;
747 }
748
749 // Wait a bit and run the test again.
750 std::thread::sleep(std::time::Duration::from_secs(2));
751 }
752
753 panic!("yield_defers_until_park is failing consistently");
754 }
755
756 /// Implementation of `yield_defers_until_park` test. Returns `true` if the
757 /// test passed.
758 #[cfg(not(target_os="wasi"))]
759 fn yield_defers_until_park_inner() -> bool {
760 use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
761 use std::sync::Barrier;
762
763 let rt = rt();
764
765 let flag = Arc::new(AtomicBool::new(false));
766 let barrier = Arc::new(Barrier::new(NUM_WORKERS));
767
768 rt.block_on(async {
769 // Make sure other workers cannot steal tasks
770 #[allow(clippy::reversed_empty_ranges)]
771 for _ in 0..(NUM_WORKERS-1) {
772 let flag = flag.clone();
773 let barrier = barrier.clone();
774
775 tokio::spawn(async move {
776 barrier.wait();
777
778 while !flag.load(SeqCst) {
779 std::thread::sleep(std::time::Duration::from_millis(1));
780 }
781 });
782 }
783
784 barrier.wait();
785
786 let (fail_test, fail_test_recv) = oneshot::channel::<()>();
787
788 let jh = tokio::spawn(async move {
789 // Create a TCP litener
790 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
791 let addr = listener.local_addr().unwrap();
792
793 tokio::join!(
794 async {
795 // Done in a blocking manner intentionally.
796 let _socket = std::net::TcpStream::connect(addr).unwrap();
797
798 // Yield until connected
799 let mut cnt = 0;
800 while !flag.load(SeqCst){
801 tokio::task::yield_now().await;
802 cnt += 1;
803
804 if cnt >= 10 {
805 // yielded too many times; report failure and
806 // sleep forever so that the `fail_test` branch
807 // of the `select!` below triggers.
808 let _ = fail_test.send(());
809 futures::future::pending::<()>().await;
810 break;
811 }
812 }
813 },
814 async {
815 let _ = listener.accept().await.unwrap();
816 flag.store(true, SeqCst);
817 }
818 );
819 });
820
821 // Wait until the spawned task completes or fails. If no message is
822 // sent on `fail_test`, then the test succeeds. Otherwise, it fails.
823 let success = fail_test_recv.await.is_err();
824
825 if success {
826 // Check for panics in spawned task.
827 jh.abort();
828 jh.await.unwrap();
829 }
830
831 success
832 })
833 }
834
835 #[cfg(not(target_os="wasi"))] // Wasi does not support threads
836 #[test]
837 fn client_server_block_on() {
838 let rt = rt();
839 let (tx, rx) = mpsc::channel();
840
841 rt.block_on(async move { client_server(tx).await });
842
843 assert_ok!(rx.try_recv());
844 assert_err!(rx.try_recv());
845 }
846
847 #[cfg_attr(target_os = "wasi", ignore = "Wasi does not support threads or panic recovery")]
848 #[cfg(panic = "unwind")]
849 #[test]
850 fn panic_in_task() {
851 let rt = rt();
852 let (tx, rx) = oneshot::channel();
853
854 struct Boom(Option<oneshot::Sender<()>>);
855
856 impl Future for Boom {
857 type Output = ();
858
859 fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
860 panic!();
861 }
862 }
863
864 impl Drop for Boom {
865 fn drop(&mut self) {
866 assert!(std::thread::panicking());
867 self.0.take().unwrap().send(()).unwrap();
868 }
869 }
870
871 rt.spawn(Boom(Some(tx)));
872 assert_ok!(rt.block_on(rx));
873 }
874
875 #[test]
876 #[should_panic]
877 #[cfg_attr(target_os = "wasi", ignore = "Wasi does not support panic recovery")]
878 fn panic_in_block_on() {
879 let rt = rt();
880 rt.block_on(async { panic!() });
881 }
882
883 #[cfg(not(target_os="wasi"))] // Wasi does not support threads
884 async fn yield_once() {
885 let mut yielded = false;
886 poll_fn(|cx| {
887 if yielded {
888 Poll::Ready(())
889 } else {
890 yielded = true;
891 cx.waker().wake_by_ref();
892 Poll::Pending
893 }
894 })
895 .await
896 }
897
898 #[test]
899 fn enter_and_spawn() {
900 let rt = rt();
901 let handle = {
902 let _enter = rt.enter();
903 tokio::spawn(async {})
904 };
905
906 assert_ok!(rt.block_on(handle));
907 }
908
909 #[test]
910 fn eagerly_drops_futures_on_shutdown() {
911 use std::sync::mpsc;
912
913 struct Never {
914 drop_tx: mpsc::Sender<()>,
915 }
916
917 impl Future for Never {
918 type Output = ();
919
920 fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
921 Poll::Pending
922 }
923 }
924
925 impl Drop for Never {
926 fn drop(&mut self) {
927 self.drop_tx.send(()).unwrap();
928 }
929 }
930
931 let rt = rt();
932
933 let (drop_tx, drop_rx) = mpsc::channel();
934 let (run_tx, run_rx) = oneshot::channel();
935
936 rt.block_on(async move {
937 tokio::spawn(async move {
938 assert_ok!(run_tx.send(()));
939
940 Never { drop_tx }.await
941 });
942
943 assert_ok!(run_rx.await);
944 });
945
946 drop(rt);
947
948 assert_ok!(drop_rx.recv());
949 }
950
951 #[test]
952 fn wake_while_rt_is_dropping() {
953 use tokio::sync::Barrier;
954
955 struct OnDrop<F: FnMut()>(F);
956
957 impl<F: FnMut()> Drop for OnDrop<F> {
958 fn drop(&mut self) {
959 (self.0)()
960 }
961 }
962
963 let (tx1, rx1) = oneshot::channel();
964 let (tx2, rx2) = oneshot::channel();
965
966 let barrier = Arc::new(Barrier::new(3));
967 let barrier1 = barrier.clone();
968 let barrier2 = barrier.clone();
969
970 let rt = rt();
971
972 rt.spawn(async move {
973 let mut tx2 = Some(tx2);
974 let _d = OnDrop(move || {
975 let _ = tx2.take().unwrap().send(());
976 });
977
978 // Ensure a waker gets stored in oneshot 1.
979 let _ = tokio::join!(rx1, barrier1.wait());
980 });
981
982 rt.spawn(async move {
983 let mut tx1 = Some(tx1);
984 let _d = OnDrop(move || {
985 let _ = tx1.take().unwrap().send(());
986 });
987
988 // Ensure a waker gets stored in oneshot 2.
989 let _ = tokio::join!(rx2, barrier2.wait());
990 });
991
992 // Wait until every oneshot channel has been polled.
993 rt.block_on(barrier.wait());
994
995 // Drop the rt. Regardless of which task is dropped first, its destructor will wake the
996 // other task.
997 drop(rt);
998 }
999
1000 #[cfg(not(target_os="wasi"))] // Wasi doesn't support UDP or bind()
1001 #[test]
1002 fn io_notify_while_shutting_down() {
1003 use tokio::net::UdpSocket;
1004 use std::sync::Arc;
1005
1006 for _ in 1..10 {
1007 let runtime = rt();
1008
1009 runtime.block_on(async {
1010 let socket = UdpSocket::bind("127.0.0.1:0").await.unwrap();
1011 let addr = socket.local_addr().unwrap();
1012 let send_half = Arc::new(socket);
1013 let recv_half = send_half.clone();
1014
1015 tokio::spawn(async move {
1016 let mut buf = [0];
1017 loop {
1018 recv_half.recv_from(&mut buf).await.unwrap();
1019 std::thread::sleep(Duration::from_millis(2));
1020 }
1021 });
1022
1023 tokio::spawn(async move {
1024 let buf = [0];
1025 loop {
1026 send_half.send_to(&buf, &addr).await.unwrap();
1027 tokio::time::sleep(Duration::from_millis(1)).await;
1028 }
1029 });
1030
1031 tokio::time::sleep(Duration::from_millis(5)).await;
1032 });
1033 }
1034 }
1035
1036 #[cfg(not(target_os="wasi"))] // Wasi does not support threads
1037 #[test]
1038 fn shutdown_timeout() {
1039 let (tx, rx) = oneshot::channel();
1040 let runtime = rt();
1041
1042 runtime.block_on(async move {
1043 task::spawn_blocking(move || {
1044 tx.send(()).unwrap();
1045 thread::sleep(Duration::from_secs(10_000));
1046 });
1047
1048 rx.await.unwrap();
1049 });
1050
1051 Arc::try_unwrap(runtime).unwrap().shutdown_timeout(Duration::from_millis(100));
1052 }
1053
1054 #[cfg(not(target_os="wasi"))] // Wasi does not support threads
1055 #[test]
1056 fn shutdown_timeout_0() {
1057 let runtime = rt();
1058
1059 runtime.block_on(async move {
1060 task::spawn_blocking(move || {
1061 thread::sleep(Duration::from_secs(10_000));
1062 });
1063 });
1064
1065 let now = Instant::now();
1066 Arc::try_unwrap(runtime).unwrap().shutdown_timeout(Duration::from_nanos(0));
1067 assert!(now.elapsed().as_secs() < 1);
1068 }
1069
1070 #[test]
1071 fn shutdown_wakeup_time() {
1072 let runtime = rt();
1073
1074 runtime.block_on(async move {
1075 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1076 });
1077
1078 Arc::try_unwrap(runtime).unwrap().shutdown_timeout(Duration::from_secs(10_000));
1079 }
1080
1081 // This test is currently ignored on Windows because of a
1082 // rust-lang issue in thread local storage destructors.
1083 // See https://github.com/rust-lang/rust/issues/74875
1084 #[test]
1085 #[cfg(not(windows))]
1086 #[cfg_attr(target_os = "wasi", ignore = "Wasi does not support threads")]
1087 fn runtime_in_thread_local() {
1088 use std::cell::RefCell;
1089 use std::thread;
1090
1091 thread_local!(
1092 static R: RefCell<Option<Runtime>> = RefCell::new(None);
1093 );
1094
1095 thread::spawn(|| {
1096 R.with(|cell| {
1097 let rt = rt();
1098 let rt = Arc::try_unwrap(rt).unwrap();
1099 *cell.borrow_mut() = Some(rt);
1100 });
1101
1102 let _rt = rt();
1103 }).join().unwrap();
1104 }
1105
1106 #[cfg(not(target_os="wasi"))] // Wasi does not support bind
1107 async fn client_server(tx: mpsc::Sender<()>) {
1108 let server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
1109
1110 // Get the assigned address
1111 let addr = assert_ok!(server.local_addr());
1112
1113 // Spawn the server
1114 tokio::spawn(async move {
1115 // Accept a socket
1116 let (mut socket, _) = server.accept().await.unwrap();
1117
1118 // Write some data
1119 socket.write_all(b"hello").await.unwrap();
1120 });
1121
1122 let mut client = TcpStream::connect(&addr).await.unwrap();
1123
1124 let mut buf = vec![];
1125 client.read_to_end(&mut buf).await.unwrap();
1126
1127 assert_eq!(buf, b"hello");
1128 tx.send(()).unwrap();
1129 }
1130
1131 #[cfg(not(target_os = "wasi"))] // Wasi does not support bind
1132 #[test]
1133 fn local_set_block_on_socket() {
1134 let rt = rt();
1135 let local = task::LocalSet::new();
1136
1137 local.block_on(&rt, async move {
1138 let (tx, rx) = oneshot::channel();
1139
1140 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1141 let addr = listener.local_addr().unwrap();
1142
1143 task::spawn_local(async move {
1144 let _ = listener.accept().await;
1145 tx.send(()).unwrap();
1146 });
1147
1148 TcpStream::connect(&addr).await.unwrap();
1149 rx.await.unwrap();
1150 });
1151 }
1152
1153 #[cfg(not(target_os = "wasi"))] // Wasi does not support bind
1154 #[test]
1155 fn local_set_client_server_block_on() {
1156 let rt = rt();
1157 let (tx, rx) = mpsc::channel();
1158
1159 let local = task::LocalSet::new();
1160
1161 local.block_on(&rt, async move { client_server_local(tx).await });
1162
1163 assert_ok!(rx.try_recv());
1164 assert_err!(rx.try_recv());
1165 }
1166
1167 #[cfg(not(target_os = "wasi"))] // Wasi does not support bind
1168 async fn client_server_local(tx: mpsc::Sender<()>) {
1169 let server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
1170
1171 // Get the assigned address
1172 let addr = assert_ok!(server.local_addr());
1173
1174 // Spawn the server
1175 task::spawn_local(async move {
1176 // Accept a socket
1177 let (mut socket, _) = server.accept().await.unwrap();
1178
1179 // Write some data
1180 socket.write_all(b"hello").await.unwrap();
1181 });
1182
1183 let mut client = TcpStream::connect(&addr).await.unwrap();
1184
1185 let mut buf = vec![];
1186 client.read_to_end(&mut buf).await.unwrap();
1187
1188 assert_eq!(buf, b"hello");
1189 tx.send(()).unwrap();
1190 }
1191
1192 #[test]
1193 fn coop() {
1194 use std::task::Poll::Ready;
1195 use tokio::sync::mpsc;
1196
1197 let rt = rt();
1198
1199 rt.block_on(async {
1200 let (send, mut recv) = mpsc::unbounded_channel();
1201
1202 // Send a bunch of messages.
1203 for _ in 0..1_000 {
1204 send.send(()).unwrap();
1205 }
1206
1207 poll_fn(|cx| {
1208 // At least one response should return pending.
1209 for _ in 0..1_000 {
1210 if recv.poll_recv(cx).is_pending() {
1211 return Ready(());
1212 }
1213 }
1214
1215 panic!("did not yield");
1216 }).await;
1217 });
1218 }
1219
1220 #[test]
1221 fn coop_unconstrained() {
1222 use std::task::Poll::Ready;
1223 use tokio::sync::mpsc;
1224
1225 let rt = rt();
1226
1227 rt.block_on(async {
1228 let (send, mut recv) = mpsc::unbounded_channel();
1229
1230 // Send a bunch of messages.
1231 for _ in 0..1_000 {
1232 send.send(()).unwrap();
1233 }
1234
1235 tokio::task::unconstrained(poll_fn(|cx| {
1236 // All the responses should be ready.
1237 for _ in 0..1_000 {
1238 assert_eq!(recv.poll_recv(cx), Poll::Ready(Some(())));
1239 }
1240
1241 Ready(())
1242 })).await;
1243 });
1244 }
1245
1246 #[cfg(tokio_unstable)]
1247 #[test]
1248 fn coop_consume_budget() {
1249 let rt = rt();
1250
1251 rt.block_on(async {
1252 poll_fn(|cx| {
1253 let counter = Arc::new(std::sync::Mutex::new(0));
1254 let counter_clone = Arc::clone(&counter);
1255 let mut worker = Box::pin(async move {
1256 // Consume the budget until a yield happens
1257 for _ in 0..1000 {
1258 *counter.lock().unwrap() += 1;
1259 task::consume_budget().await
1260 }
1261 });
1262 // Assert that the worker was yielded and it didn't manage
1263 // to finish the whole work (assuming the total budget of 128)
1264 assert!(Pin::new(&mut worker).poll(cx).is_pending());
1265 assert!(*counter_clone.lock().unwrap() < 1000);
1266 std::task::Poll::Ready(())
1267 }).await;
1268 });
1269 }
1270
1271 // Tests that the "next task" scheduler optimization is not able to starve
1272 // other tasks.
1273 #[test]
1274 fn ping_pong_saturation() {
1275 use std::sync::atomic::{Ordering, AtomicBool};
1276 use tokio::sync::mpsc;
1277
1278 const NUM: usize = 100;
1279
1280 let rt = rt();
1281
1282 let running = Arc::new(AtomicBool::new(true));
1283
1284 rt.block_on(async {
1285 let (spawned_tx, mut spawned_rx) = mpsc::unbounded_channel();
1286
1287 let mut tasks = vec![];
1288 // Spawn a bunch of tasks that ping-pong between each other to
1289 // saturate the runtime.
1290 for _ in 0..NUM {
1291 let (tx1, mut rx1) = mpsc::unbounded_channel();
1292 let (tx2, mut rx2) = mpsc::unbounded_channel();
1293 let spawned_tx = spawned_tx.clone();
1294 let running = running.clone();
1295 tasks.push(task::spawn(async move {
1296 spawned_tx.send(()).unwrap();
1297
1298
1299 while running.load(Ordering::Relaxed) {
1300 tx1.send(()).unwrap();
1301 rx2.recv().await.unwrap();
1302 }
1303
1304 // Close the channel and wait for the other task to exit.
1305 drop(tx1);
1306 assert!(rx2.recv().await.is_none());
1307 }));
1308
1309 tasks.push(task::spawn(async move {
1310 while rx1.recv().await.is_some() {
1311 tx2.send(()).unwrap();
1312 }
1313 }));
1314 }
1315
1316 for _ in 0..NUM {
1317 spawned_rx.recv().await.unwrap();
1318 }
1319
1320 // spawn another task and wait for it to complete
1321 let handle = task::spawn(async {
1322 for _ in 0..5 {
1323 // Yielding forces it back into the local queue.
1324 task::yield_now().await;
1325 }
1326 });
1327 handle.await.unwrap();
1328 running.store(false, Ordering::Relaxed);
1329 for t in tasks {
1330 t.await.unwrap();
1331 }
1332 });
1333 }
1334
1335 #[test]
1336 #[cfg(not(target_os="wasi"))]
1337 fn shutdown_concurrent_spawn() {
1338 const NUM_TASKS: usize = 10_000;
1339 for _ in 0..5 {
1340 let (tx, rx) = std::sync::mpsc::channel();
1341 let rt = rt();
1342
1343 let mut txs = vec![];
1344
1345 for _ in 0..NUM_TASKS {
1346 let (tx, rx) = tokio::sync::oneshot::channel();
1347 txs.push(tx);
1348 rt.spawn(async move {
1349 rx.await.unwrap();
1350 });
1351 }
1352
1353 // Prime the tasks
1354 rt.block_on(async { tokio::task::yield_now().await });
1355
1356 let th = std::thread::spawn(move || {
1357 tx.send(()).unwrap();
1358 for tx in txs.drain(..) {
1359 let _ = tx.send(());
1360 }
1361 });
1362
1363 rx.recv().unwrap();
1364 drop(rt);
1365
1366 th.join().unwrap();
1367 }
1368 }
1369
1370 #[test]
1371 #[cfg_attr(target_family = "wasm", ignore)]
1372 fn wake_by_ref_from_thread_local() {
1373 wake_from_thread_local(true);
1374 }
1375
1376 #[test]
1377 #[cfg_attr(target_family = "wasm", ignore)]
1378 fn wake_by_val_from_thread_local() {
1379 wake_from_thread_local(false);
1380 }
1381
1382 fn wake_from_thread_local(by_ref: bool) {
1383 use std::cell::RefCell;
1384 use std::sync::mpsc::{channel, Sender};
1385 use std::task::Waker;
1386
1387 struct TLData {
1388 by_ref: bool,
1389 waker: Option<Waker>,
1390 done: Sender<bool>,
1391 }
1392
1393 impl Drop for TLData {
1394 fn drop(&mut self) {
1395 if self.by_ref {
1396 self.waker.take().unwrap().wake_by_ref();
1397 } else {
1398 self.waker.take().unwrap().wake();
1399 }
1400 let _ = self.done.send(true);
1401 }
1402 }
1403
1404 std::thread_local! {
1405 static TL_DATA: RefCell<Option<TLData>> = RefCell::new(None);
1406 };
1407
1408 let (send, recv) = channel();
1409
1410 std::thread::spawn(move || {
1411 let rt = rt();
1412 rt.block_on(rt.spawn(poll_fn(move |cx| {
1413 let waker = cx.waker().clone();
1414 let send = send.clone();
1415 TL_DATA.with(|tl| {
1416 tl.replace(Some(TLData {
1417 by_ref,
1418 waker: Some(waker),
1419 done: send,
1420 }));
1421 });
1422 Poll::Ready(())
1423 })))
1424 .unwrap();
1425 })
1426 .join()
1427 .unwrap();
1428
1429 assert!(recv.recv().unwrap());
1430 }
1431}
1432