#![allow(clippy::needless_range_loop)]
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]

// Tests to run on both current-thread & multi-thread runtime variants.

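// The `rt_test!` macro below instantiates every test it wraps once per scheduler
// flavor; each generated module provides its own `rt()` constructor and
// `NUM_WORKERS` constant.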
macro_rules! rt_test {
    ($($t:tt)*) => {
        mod current_thread_scheduler {
            $($t)*

            #[cfg(not(target_os = "wasi"))]
            const NUM_WORKERS: usize = 1;

            fn rt() -> Arc<Runtime> {
                tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .unwrap()
                    .into()
            }
        }

        #[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
        mod threaded_scheduler_4_threads {
            $($t)*

            const NUM_WORKERS: usize = 4;

            fn rt() -> Arc<Runtime> {
                tokio::runtime::Builder::new_multi_thread()
                    .worker_threads(4)
                    .enable_all()
                    .build()
                    .unwrap()
                    .into()
            }
        }

        #[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
        mod threaded_scheduler_1_thread {
            $($t)*

            const NUM_WORKERS: usize = 1;

            fn rt() -> Arc<Runtime> {
                tokio::runtime::Builder::new_multi_thread()
                    .worker_threads(1)
                    .enable_all()
                    .build()
                    .unwrap()
                    .into()
            }
        }

        #[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
        #[cfg(tokio_unstable)]
        mod alt_threaded_scheduler_4_threads {
            $($t)*

            const NUM_WORKERS: usize = 4;

            fn rt() -> Arc<Runtime> {
                tokio::runtime::Builder::new_multi_thread()
                    .worker_threads(4)
                    .enable_all()
                    .build()
                    .unwrap()
                    .into()
            }
        }

        #[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
        #[cfg(tokio_unstable)]
        mod alt_threaded_scheduler_1_thread {
            $($t)*

            const NUM_WORKERS: usize = 1;

            fn rt() -> Arc<Runtime> {
                tokio::runtime::Builder::new_multi_thread()
                    .worker_threads(1)
                    .enable_all()
                    .build()
                    .unwrap()
                    .into()
            }
        }
    }
}

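// `Runtime` is shared across threads throughout these tests, so it must be
// `Send + Sync`.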
#[test]
fn send_sync_bound() {
    use tokio::runtime::Runtime;
    fn is_send<T: Send + Sync>() {}

    is_send::<Runtime>();
}

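// Instantiate the full test suite once for each runtime flavor defined above.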
rt_test! {
    #[cfg(not(target_os = "wasi"))]
    use tokio::net::{TcpListener, TcpStream};
    #[cfg(not(target_os = "wasi"))]
    use tokio::io::{AsyncReadExt, AsyncWriteExt};

    use tokio::runtime::Runtime;
    use tokio::sync::oneshot;
    use tokio::{task, time};

    #[cfg(not(target_os = "wasi"))]
    use tokio_test::assert_err;
    use tokio_test::assert_ok;

    use futures::future::poll_fn;
    use std::future::Future;
    use std::pin::Pin;

    #[cfg(not(target_os = "wasi"))]
    use std::sync::mpsc;

    use std::sync::Arc;
    use std::task::{Context, Poll};

    #[cfg(not(target_os = "wasi"))]
    use std::thread;
    use std::time::{Duration, Instant};

    #[test]
    fn block_on_sync() {
        let rt = rt();

        let mut win = false;
        rt.block_on(async {
            win = true;
        });

        assert!(win);
    }

    #[cfg(not(target_os = "wasi"))]
    #[test]
    fn block_on_async() {
        let rt = rt();

        let out = rt.block_on(async {
            let (tx, rx) = oneshot::channel();

            thread::spawn(move || {
                thread::sleep(Duration::from_millis(50));
                tx.send("ZOMG").unwrap();
            });

            assert_ok!(rx.await)
        });

        assert_eq!(out, "ZOMG");
    }

    #[test]
    fn spawn_one_bg() {
        let rt = rt();

        let out = rt.block_on(async {
            let (tx, rx) = oneshot::channel();

            tokio::spawn(async move {
                tx.send("ZOMG").unwrap();
            });

            assert_ok!(rx.await)
        });

        assert_eq!(out, "ZOMG");
    }

    #[test]
    fn spawn_one_join() {
        let rt = rt();

        let out = rt.block_on(async {
            let (tx, rx) = oneshot::channel();

            let handle = tokio::spawn(async move {
                tx.send("ZOMG").unwrap();
                "DONE"
            });

            let msg = assert_ok!(rx.await);

            let out = assert_ok!(handle.await);
            assert_eq!(out, "DONE");

            msg
        });

        assert_eq!(out, "ZOMG");
    }

    #[test]
    fn spawn_two() {
        let rt = rt();

        let out = rt.block_on(async {
            let (tx1, rx1) = oneshot::channel();
            let (tx2, rx2) = oneshot::channel();

            tokio::spawn(async move {
                assert_ok!(tx1.send("ZOMG"));
            });

            tokio::spawn(async move {
                let msg = assert_ok!(rx1.await);
                assert_ok!(tx2.send(msg));
            });

            assert_ok!(rx2.await)
        });

        assert_eq!(out, "ZOMG");
    }

    #[cfg(not(target_os = "wasi"))] // Wasi does not support threads
    #[test]
    fn spawn_many_from_block_on() {
        use tokio::sync::mpsc;

        const ITER: usize = 200;

        let rt = rt();

        let out = rt.block_on(async {
            let (done_tx, mut done_rx) = mpsc::unbounded_channel();

            let mut txs = (0..ITER)
                .map(|i| {
                    let (tx, rx) = oneshot::channel();
                    let done_tx = done_tx.clone();

                    tokio::spawn(async move {
                        let msg = assert_ok!(rx.await);
                        assert_eq!(i, msg);
                        assert_ok!(done_tx.send(msg));
                    });

                    tx
                })
                .collect::<Vec<_>>();

            drop(done_tx);

            thread::spawn(move || {
                for (i, tx) in txs.drain(..).enumerate() {
                    assert_ok!(tx.send(i));
                }
            });

            let mut out = vec![];
            while let Some(i) = done_rx.recv().await {
                out.push(i);
            }

            out.sort_unstable();
            out
        });

        assert_eq!(ITER, out.len());

        for i in 0..ITER {
            assert_eq!(i, out[i]);
        }
    }

    #[cfg(not(target_os = "wasi"))] // Wasi does not support threads
    #[test]
    fn spawn_many_from_task() {
        use tokio::sync::mpsc;

        const ITER: usize = 500;

        let rt = rt();

        let out = rt.block_on(async {
            tokio::spawn(async move {
                let (done_tx, mut done_rx) = mpsc::unbounded_channel();

                let mut txs = (0..ITER)
                    .map(|i| {
                        let (tx, rx) = oneshot::channel();
                        let done_tx = done_tx.clone();

                        tokio::spawn(async move {
                            let msg = assert_ok!(rx.await);
                            assert_eq!(i, msg);
                            assert_ok!(done_tx.send(msg));
                        });

                        tx
                    })
                    .collect::<Vec<_>>();

                drop(done_tx);

                thread::spawn(move || {
                    for (i, tx) in txs.drain(..).enumerate() {
                        assert_ok!(tx.send(i));
                    }
                });

                let mut out = vec![];
                while let Some(i) = done_rx.recv().await {
                    out.push(i);
                }

                out.sort_unstable();
                out
            }).await.unwrap()
        });

        assert_eq!(ITER, out.len());

        for i in 0..ITER {
            assert_eq!(i, out[i]);
        }
    }

    #[test]
    fn spawn_one_from_block_on_called_on_handle() {
        let rt = rt();
        let (tx, rx) = oneshot::channel();

        #[allow(clippy::async_yields_async)]
        let handle = rt.handle().block_on(async {
            tokio::spawn(async move {
                tx.send("ZOMG").unwrap();
                "DONE"
            })
        });

        let out = rt.block_on(async {
            let msg = assert_ok!(rx.await);

            let out = assert_ok!(handle.await);
            assert_eq!(out, "DONE");

            msg
        });

        assert_eq!(out, "ZOMG");
    }

    #[test]
    fn spawn_await_chain() {
        let rt = rt();

        let out = rt.block_on(async {
            assert_ok!(tokio::spawn(async {
                assert_ok!(tokio::spawn(async {
                    "hello"
                }).await)
            }).await)
        });

        assert_eq!(out, "hello");
    }

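    // A task that never completes must still be dropped (releasing its state)
    // when the runtime itself is dropped.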
    #[test]
    fn outstanding_tasks_dropped() {
        let rt = rt();

        let cnt = Arc::new(());

        rt.block_on(async {
            let cnt = cnt.clone();

            tokio::spawn(poll_fn(move |_| {
                assert_eq!(2, Arc::strong_count(&cnt));
                Poll::<()>::Pending
            }));
        });

        assert_eq!(2, Arc::strong_count(&cnt));

        drop(rt);

        assert_eq!(1, Arc::strong_count(&cnt));
    }

    #[test]
    #[should_panic]
    fn nested_rt() {
        let rt1 = rt();
        let rt2 = rt();

        rt1.block_on(async { rt2.block_on(async { "hello" }) });
    }

    #[test]
    fn create_rt_in_block_on() {
        let rt1 = rt();
        let rt2 = rt1.block_on(async { rt() });
        let out = rt2.block_on(async { "ZOMG" });

        assert_eq!(out, "ZOMG");
    }

    #[cfg(not(target_os = "wasi"))] // Wasi does not support threads
    #[test]
    fn complete_block_on_under_load() {
        let rt = rt();

        rt.block_on(async {
            let (tx, rx) = oneshot::channel();

            // Spin hard
            tokio::spawn(async {
                loop {
                    yield_once().await;
                }
            });

            thread::spawn(move || {
                thread::sleep(Duration::from_millis(50));
                assert_ok!(tx.send(()));
            });

            assert_ok!(rx.await);
        });
    }

    #[cfg(not(target_os = "wasi"))] // Wasi does not support threads
    #[test]
    fn complete_task_under_load() {
        let rt = rt();

        rt.block_on(async {
            let (tx1, rx1) = oneshot::channel();
            let (tx2, rx2) = oneshot::channel();

            // Spin hard
            tokio::spawn(async {
                loop {
                    yield_once().await;
                }
            });

            thread::spawn(move || {
                thread::sleep(Duration::from_millis(50));
                assert_ok!(tx1.send(()));
            });

            tokio::spawn(async move {
                assert_ok!(rx1.await);
                assert_ok!(tx2.send(()));
            });

            assert_ok!(rx2.await);
        });
    }

    #[cfg(not(target_os = "wasi"))] // Wasi does not support threads
    #[test]
    fn spawn_from_other_thread_idle() {
        let rt = rt();
        let handle = rt.clone();

        let (tx, rx) = oneshot::channel();

        thread::spawn(move || {
            thread::sleep(Duration::from_millis(50));

            handle.spawn(async move {
                assert_ok!(tx.send(()));
            });
        });

        rt.block_on(async move {
            assert_ok!(rx.await);
        });
    }

    #[cfg(not(target_os = "wasi"))] // Wasi does not support threads
    #[test]
    fn spawn_from_other_thread_under_load() {
        let rt = rt();
        let handle = rt.clone();

        let (tx, rx) = oneshot::channel();

        thread::spawn(move || {
            handle.spawn(async move {
                assert_ok!(tx.send(()));
            });
        });

        rt.block_on(async move {
            // Spin hard
            tokio::spawn(async {
                loop {
                    yield_once().await;
                }
            });

            assert_ok!(rx.await);
        });
    }

    #[test]
    fn sleep_at_root() {
        let rt = rt();

        let now = Instant::now();
        let dur = Duration::from_millis(50);

        rt.block_on(async move {
            time::sleep(dur).await;
        });

        assert!(now.elapsed() >= dur);
    }

    #[test]
    fn sleep_in_spawn() {
        let rt = rt();

        let now = Instant::now();
        let dur = Duration::from_millis(50);

        rt.block_on(async move {
            let (tx, rx) = oneshot::channel();

            tokio::spawn(async move {
                time::sleep(dur).await;
                assert_ok!(tx.send(()));
            });

            assert_ok!(rx.await);
        });

        assert!(now.elapsed() >= dur);
    }

    #[cfg(not(target_os = "wasi"))] // Wasi does not support bind
    #[test]
    fn block_on_socket() {
        let rt = rt();

        rt.block_on(async move {
            let (tx, rx) = oneshot::channel();

            let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
            let addr = listener.local_addr().unwrap();

            tokio::spawn(async move {
                let _ = listener.accept().await;
                tx.send(()).unwrap();
            });

            TcpStream::connect(&addr).await.unwrap();
            rx.await.unwrap();
        });
    }

    #[cfg(not(target_os = "wasi"))] // Wasi does not support threads
    #[test]
    fn spawn_from_blocking() {
        let rt = rt();

        let out = rt.block_on(async move {
            let inner = assert_ok!(tokio::task::spawn_blocking(|| {
                tokio::spawn(async move { "hello" })
            }).await);

            assert_ok!(inner.await)
        });

        assert_eq!(out, "hello")
    }

    #[cfg(not(target_os = "wasi"))] // Wasi does not support threads
    #[test]
    fn spawn_blocking_from_blocking() {
        let rt = rt();

        let out = rt.block_on(async move {
            let inner = assert_ok!(tokio::task::spawn_blocking(|| {
                tokio::task::spawn_blocking(|| "hello")
            }).await);

            assert_ok!(inner.await)
        });

        assert_eq!(out, "hello")
    }

    #[cfg(not(target_os = "wasi"))] // Wasi does not support threads
    #[test]
    fn sleep_from_blocking() {
        let rt = rt();

        rt.block_on(async move {
            assert_ok!(tokio::task::spawn_blocking(|| {
                let now = std::time::Instant::now();
                let dur = Duration::from_millis(1);

                // use the futures' block_on fn to make sure we aren't setting
                // any Tokio context
                futures::executor::block_on(async {
                    tokio::time::sleep(dur).await;
                });

                assert!(now.elapsed() >= dur);
            }).await);
        });
    }

    #[cfg(not(target_os = "wasi"))] // Wasi does not support bind
    #[test]
    fn socket_from_blocking() {
        let rt = rt();

        rt.block_on(async move {
            let listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
            let addr = assert_ok!(listener.local_addr());

            let peer = tokio::task::spawn_blocking(move || {
                // use the futures' block_on fn to make sure we aren't setting
                // any Tokio context
                futures::executor::block_on(async {
                    assert_ok!(TcpStream::connect(addr).await);
                });
            });

            // Wait for the client to connect
            let _ = assert_ok!(listener.accept().await);

            assert_ok!(peer.await);
        });
    }

    #[cfg(not(target_os = "wasi"))] // Wasi does not support threads
    #[test]
    fn always_active_parker() {
        // This test is to show that we will always have an active parker even
        // if we call `block_on` concurrently.

        let rt = rt();
        let rt2 = rt.clone();

        let (tx1, rx1) = oneshot::channel();
        let (tx2, rx2) = oneshot::channel();

        let jh1 = thread::spawn(move || {
            rt.block_on(async move {
                rx2.await.unwrap();
                time::sleep(Duration::from_millis(5)).await;
                tx1.send(()).unwrap();
            });
        });

        let jh2 = thread::spawn(move || {
            rt2.block_on(async move {
                tx2.send(()).unwrap();
                time::sleep(Duration::from_millis(5)).await;
                rx1.await.unwrap();
                time::sleep(Duration::from_millis(5)).await;
            });
        });

        jh1.join().unwrap();
        jh2.join().unwrap();
    }

    #[test]
    // IOCP requires setting the "max thread" concurrency value. The sane
    // default is to set this to the number of cores. Threads that poll I/O
    // become associated with the IOCP handle. Once those threads sleep for any
    // reason (e.g. a mutex), they yield their ownership.
    //
    // This test hits an edge case on Windows where more threads than cores are
    // created, and none of those threads ever yield because they are at
    // capacity, so IOCP gets "starved".
    //
    // For now, this is a rare edge case that is probably not a real production
    // concern. There also isn't a great/obvious solution to take. For now, the
    // test is disabled on Windows.
    #[cfg(not(windows))]
    #[cfg(not(target_os = "wasi"))] // Wasi does not support bind or threads
    fn io_driver_called_when_under_load() {
        let rt = rt();

        // Create a lot of constant load. The scheduler will always be busy.
        for _ in 0..100 {
            rt.spawn(async {
                loop {
                    // Don't use Tokio's `yield_now()` to avoid special defer
                    // logic.
                    futures::future::poll_fn::<(), _>(|cx| {
                        cx.waker().wake_by_ref();
                        std::task::Poll::Pending
                    }).await;
                }
            });
        }

        // Do some I/O work
        rt.block_on(async {
            let listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
            let addr = assert_ok!(listener.local_addr());

            let srv = tokio::spawn(async move {
                let (mut stream, _) = assert_ok!(listener.accept().await);
                assert_ok!(stream.write_all(b"hello world").await);
            });

            let cli = tokio::spawn(async move {
                let mut stream = assert_ok!(TcpStream::connect(addr).await);
                let mut dst = vec![0; 11];

                assert_ok!(stream.read_exact(&mut dst).await);
                assert_eq!(dst, b"hello world");
            });

            assert_ok!(srv.await);
            assert_ok!(cli.await);
        });
    }

    /// Tests that yielded tasks are not scheduled until **after** resource
    /// drivers are polled.
    ///
    /// The OS does not guarantee when I/O events are delivered, so there may be
    /// more yields than anticipated. This makes the test slightly flaky. To
    /// help avoid flakiness, we run the test 10 times and only fail it after
    /// 10 failures in a row.
    ///
    /// Note that if the test fails by panicking rather than by returning false,
    /// then we fail it immediately. That kind of failure should not happen
    /// spuriously.
    #[test]
    #[cfg(not(target_os = "wasi"))]
    fn yield_defers_until_park() {
        for _ in 0..10 {
            if yield_defers_until_park_inner() {
                // test passed
                return;
            }

            // Wait a bit and run the test again.
            std::thread::sleep(std::time::Duration::from_secs(2));
        }

        panic!("yield_defers_until_park is failing consistently");
    }

    /// Implementation of the `yield_defers_until_park` test. Returns `true` if
    /// the test passed.
    #[cfg(not(target_os = "wasi"))]
    fn yield_defers_until_park_inner() -> bool {
        use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
        use std::sync::Barrier;

        let rt = rt();

        let flag = Arc::new(AtomicBool::new(false));
        let barrier = Arc::new(Barrier::new(NUM_WORKERS));

        rt.block_on(async {
            // Make sure other workers cannot steal tasks
            #[allow(clippy::reversed_empty_ranges)]
            for _ in 0..(NUM_WORKERS - 1) {
                let flag = flag.clone();
                let barrier = barrier.clone();

                tokio::spawn(async move {
                    barrier.wait();

                    while !flag.load(SeqCst) {
                        std::thread::sleep(std::time::Duration::from_millis(1));
                    }
                });
            }

            barrier.wait();

            let (fail_test, fail_test_recv) = oneshot::channel::<()>();

            let jh = tokio::spawn(async move {
                // Create a TCP listener
                let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
                let addr = listener.local_addr().unwrap();

                tokio::join!(
                    async {
                        // Done in a blocking manner intentionally.
                        let _socket = std::net::TcpStream::connect(addr).unwrap();

                        // Yield until connected
                        let mut cnt = 0;
                        while !flag.load(SeqCst) {
                            tokio::task::yield_now().await;
                            cnt += 1;

                            if cnt >= 10 {
                                // Yielded too many times; report failure and
                                // sleep forever so that `fail_test_recv` below
                                // observes the failure.
                                let _ = fail_test.send(());
                                futures::future::pending::<()>().await;
                                break;
                            }
                        }
                    },
                    async {
                        let _ = listener.accept().await.unwrap();
                        flag.store(true, SeqCst);
                    }
                );
            });

            // Wait until the spawned task completes or fails. If no message is
            // sent on `fail_test`, then the test succeeds. Otherwise, it fails.
            let success = fail_test_recv.await.is_err();

            if success {
                // Check for panics in the spawned task.
                jh.abort();
                jh.await.unwrap();
            }

            success
        })
    }

    #[cfg(not(target_os = "wasi"))] // Wasi does not support threads
    #[test]
    fn client_server_block_on() {
        let rt = rt();
        let (tx, rx) = mpsc::channel();

        rt.block_on(async move { client_server(tx).await });

        assert_ok!(rx.try_recv());
        assert_err!(rx.try_recv());
    }

    #[cfg_attr(target_os = "wasi", ignore = "Wasi does not support threads or panic recovery")]
    #[cfg(panic = "unwind")]
    #[test]
    fn panic_in_task() {
        let rt = rt();
        let (tx, rx) = oneshot::channel();

        struct Boom(Option<oneshot::Sender<()>>);

        impl Future for Boom {
            type Output = ();

            fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
                panic!();
            }
        }

        impl Drop for Boom {
            fn drop(&mut self) {
                assert!(std::thread::panicking());
                self.0.take().unwrap().send(()).unwrap();
            }
        }

        rt.spawn(Boom(Some(tx)));
        assert_ok!(rt.block_on(rx));
    }

    #[test]
    #[should_panic]
    #[cfg_attr(target_os = "wasi", ignore = "Wasi does not support panic recovery")]
    fn panic_in_block_on() {
        let rt = rt();
        rt.block_on(async { panic!() });
    }

    #[cfg(not(target_os = "wasi"))] // Wasi does not support threads
    async fn yield_once() {
        let mut yielded = false;
        poll_fn(|cx| {
            if yielded {
                Poll::Ready(())
            } else {
                yielded = true;
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        })
        .await
    }

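    // `tokio::spawn` works from inside an `EnterGuard` returned by
    // `Runtime::enter`.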
    #[test]
    fn enter_and_spawn() {
        let rt = rt();
        let handle = {
            let _enter = rt.enter();
            tokio::spawn(async {})
        };

        assert_ok!(rt.block_on(handle));
    }

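    // Futures that are still pending at shutdown are dropped right away rather
    // than leaked.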
    #[test]
    fn eagerly_drops_futures_on_shutdown() {
        use std::sync::mpsc;

        struct Never {
            drop_tx: mpsc::Sender<()>,
        }

        impl Future for Never {
            type Output = ();

            fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
                Poll::Pending
            }
        }

        impl Drop for Never {
            fn drop(&mut self) {
                self.drop_tx.send(()).unwrap();
            }
        }

        let rt = rt();

        let (drop_tx, drop_rx) = mpsc::channel();
        let (run_tx, run_rx) = oneshot::channel();

        rt.block_on(async move {
            tokio::spawn(async move {
                assert_ok!(run_tx.send(()));

                Never { drop_tx }.await
            });

            assert_ok!(run_rx.await);
        });

        drop(rt);

        assert_ok!(drop_rx.recv());
    }

    #[test]
    fn wake_while_rt_is_dropping() {
        use tokio::sync::Barrier;

        struct OnDrop<F: FnMut()>(F);

        impl<F: FnMut()> Drop for OnDrop<F> {
            fn drop(&mut self) {
                (self.0)()
            }
        }

        let (tx1, rx1) = oneshot::channel();
        let (tx2, rx2) = oneshot::channel();

        let barrier = Arc::new(Barrier::new(3));
        let barrier1 = barrier.clone();
        let barrier2 = barrier.clone();

        let rt = rt();

        rt.spawn(async move {
            let mut tx2 = Some(tx2);
            let _d = OnDrop(move || {
                let _ = tx2.take().unwrap().send(());
            });

            // Ensure a waker gets stored in oneshot 1.
            let _ = tokio::join!(rx1, barrier1.wait());
        });

        rt.spawn(async move {
            let mut tx1 = Some(tx1);
            let _d = OnDrop(move || {
                let _ = tx1.take().unwrap().send(());
            });

            // Ensure a waker gets stored in oneshot 2.
            let _ = tokio::join!(rx2, barrier2.wait());
        });

        // Wait until every oneshot channel has been polled.
        rt.block_on(barrier.wait());

        // Drop the rt. Regardless of which task is dropped first, its
        // destructor will wake the other task.
        drop(rt);
    }

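    // I/O readiness notifications that arrive while the runtime is shutting
    // down must be handled gracefully (no panic or hang).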
    #[cfg(not(target_os = "wasi"))] // Wasi doesn't support UDP or bind()
    #[test]
    fn io_notify_while_shutting_down() {
        use tokio::net::UdpSocket;
        use std::sync::Arc;

        for _ in 1..10 {
            let runtime = rt();

            runtime.block_on(async {
                let socket = UdpSocket::bind("127.0.0.1:0").await.unwrap();
                let addr = socket.local_addr().unwrap();
                let send_half = Arc::new(socket);
                let recv_half = send_half.clone();

                tokio::spawn(async move {
                    let mut buf = [0];
                    loop {
                        recv_half.recv_from(&mut buf).await.unwrap();
                        std::thread::sleep(Duration::from_millis(2));
                    }
                });

                tokio::spawn(async move {
                    let buf = [0];
                    loop {
                        send_half.send_to(&buf, &addr).await.unwrap();
                        tokio::time::sleep(Duration::from_millis(1)).await;
                    }
                });

                tokio::time::sleep(Duration::from_millis(5)).await;
            });
        }
    }

    #[cfg(not(target_os = "wasi"))] // Wasi does not support threads
    #[test]
    fn shutdown_timeout() {
        let (tx, rx) = oneshot::channel();
        let runtime = rt();

        runtime.block_on(async move {
            task::spawn_blocking(move || {
                tx.send(()).unwrap();
                thread::sleep(Duration::from_secs(10_000));
            });

            rx.await.unwrap();
        });

        Arc::try_unwrap(runtime).unwrap().shutdown_timeout(Duration::from_millis(100));
    }

    #[cfg(not(target_os = "wasi"))] // Wasi does not support threads
    #[test]
    fn shutdown_timeout_0() {
        let runtime = rt();

        runtime.block_on(async move {
            task::spawn_blocking(move || {
                thread::sleep(Duration::from_secs(10_000));
            });
        });

        let now = Instant::now();
        Arc::try_unwrap(runtime).unwrap().shutdown_timeout(Duration::from_nanos(0));
        assert!(now.elapsed().as_secs() < 1);
    }

    #[test]
    fn shutdown_wakeup_time() {
        let runtime = rt();

        runtime.block_on(async move {
            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        });

        Arc::try_unwrap(runtime).unwrap().shutdown_timeout(Duration::from_secs(10_000));
    }

    // This test is currently ignored on Windows because of a
    // rust-lang issue in thread local storage destructors.
    // See https://github.com/rust-lang/rust/issues/74875
    #[test]
    #[cfg(not(windows))]
    #[cfg_attr(target_os = "wasi", ignore = "Wasi does not support threads")]
    fn runtime_in_thread_local() {
        use std::cell::RefCell;
        use std::thread;

        thread_local!(
            static R: RefCell<Option<Runtime>> = RefCell::new(None);
        );

        thread::spawn(|| {
            R.with(|cell| {
                let rt = rt();
                let rt = Arc::try_unwrap(rt).unwrap();
                *cell.borrow_mut() = Some(rt);
            });

            let _rt = rt();
        }).join().unwrap();
    }

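    // Helper for `client_server_block_on`: performs a small TCP exchange and
    // signals completion on `tx`.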
    #[cfg(not(target_os = "wasi"))] // Wasi does not support bind
    async fn client_server(tx: mpsc::Sender<()>) {
        let server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);

        // Get the assigned address
        let addr = assert_ok!(server.local_addr());

        // Spawn the server
        tokio::spawn(async move {
            // Accept a socket
            let (mut socket, _) = server.accept().await.unwrap();

            // Write some data
            socket.write_all(b"hello").await.unwrap();
        });

        let mut client = TcpStream::connect(&addr).await.unwrap();

        let mut buf = vec![];
        client.read_to_end(&mut buf).await.unwrap();

        assert_eq!(buf, b"hello");
        tx.send(()).unwrap();
    }

    #[cfg(not(target_os = "wasi"))] // Wasi does not support bind
    #[test]
    fn local_set_block_on_socket() {
        let rt = rt();
        let local = task::LocalSet::new();

        local.block_on(&rt, async move {
            let (tx, rx) = oneshot::channel();

            let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
            let addr = listener.local_addr().unwrap();

            task::spawn_local(async move {
                let _ = listener.accept().await;
                tx.send(()).unwrap();
            });

            TcpStream::connect(&addr).await.unwrap();
            rx.await.unwrap();
        });
    }

    #[cfg(not(target_os = "wasi"))] // Wasi does not support bind
    #[test]
    fn local_set_client_server_block_on() {
        let rt = rt();
        let (tx, rx) = mpsc::channel();

        let local = task::LocalSet::new();

        local.block_on(&rt, async move { client_server_local(tx).await });

        assert_ok!(rx.try_recv());
        assert_err!(rx.try_recv());
    }

    #[cfg(not(target_os = "wasi"))] // Wasi does not support bind
    async fn client_server_local(tx: mpsc::Sender<()>) {
        let server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);

        // Get the assigned address
        let addr = assert_ok!(server.local_addr());

        // Spawn the server
        task::spawn_local(async move {
            // Accept a socket
            let (mut socket, _) = server.accept().await.unwrap();

            // Write some data
            socket.write_all(b"hello").await.unwrap();
        });

        let mut client = TcpStream::connect(&addr).await.unwrap();

        let mut buf = vec![];
        client.read_to_end(&mut buf).await.unwrap();

        assert_eq!(buf, b"hello");
        tx.send(()).unwrap();
    }

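    // The cooperative scheduling budget forces even an always-ready resource to
    // eventually return `Pending`.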
    #[test]
    fn coop() {
        use std::task::Poll::Ready;
        use tokio::sync::mpsc;

        let rt = rt();

        rt.block_on(async {
            let (send, mut recv) = mpsc::unbounded_channel();

            // Send a bunch of messages.
            for _ in 0..1_000 {
                send.send(()).unwrap();
            }

            poll_fn(|cx| {
                // At least one response should return pending.
                for _ in 0..1_000 {
                    if recv.poll_recv(cx).is_pending() {
                        return Ready(());
                    }
                }

                panic!("did not yield");
            }).await;
        });
    }

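    // `task::unconstrained` opts out of the coop budget, so every receive
    // completes without being forced to yield.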
    #[test]
    fn coop_unconstrained() {
        use std::task::Poll::Ready;
        use tokio::sync::mpsc;

        let rt = rt();

        rt.block_on(async {
            let (send, mut recv) = mpsc::unbounded_channel();

            // Send a bunch of messages.
            for _ in 0..1_000 {
                send.send(()).unwrap();
            }

            tokio::task::unconstrained(poll_fn(|cx| {
                // All the responses should be ready.
                for _ in 0..1_000 {
                    assert_eq!(recv.poll_recv(cx), Poll::Ready(Some(())));
                }

                Ready(())
            })).await;
        });
    }

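    // `task::consume_budget` alone must eventually exhaust the coop budget and
    // force the task to yield.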
    #[cfg(tokio_unstable)]
    #[test]
    fn coop_consume_budget() {
        let rt = rt();

        rt.block_on(async {
            poll_fn(|cx| {
                let counter = Arc::new(std::sync::Mutex::new(0));
                let counter_clone = Arc::clone(&counter);
                let mut worker = Box::pin(async move {
                    // Consume the budget until a yield happens
                    for _ in 0..1000 {
                        *counter.lock().unwrap() += 1;
                        task::consume_budget().await
                    }
                });
                // Assert that the worker was forced to yield and didn't manage
                // to finish all of the work (assuming a total budget of 128).
                assert!(Pin::new(&mut worker).poll(cx).is_pending());
                assert!(*counter_clone.lock().unwrap() < 1000);
                std::task::Poll::Ready(())
            }).await;
        });
    }

    // Tests that the "next task" scheduler optimization is not able to starve
    // other tasks.
    #[test]
    fn ping_pong_saturation() {
        use std::sync::atomic::{AtomicBool, Ordering};
        use tokio::sync::mpsc;

        const NUM: usize = 100;

        let rt = rt();

        let running = Arc::new(AtomicBool::new(true));

        rt.block_on(async {
            let (spawned_tx, mut spawned_rx) = mpsc::unbounded_channel();

            let mut tasks = vec![];
            // Spawn a bunch of tasks that ping-pong between each other to
            // saturate the runtime.
            for _ in 0..NUM {
                let (tx1, mut rx1) = mpsc::unbounded_channel();
                let (tx2, mut rx2) = mpsc::unbounded_channel();
                let spawned_tx = spawned_tx.clone();
                let running = running.clone();
                tasks.push(task::spawn(async move {
                    spawned_tx.send(()).unwrap();

                    while running.load(Ordering::Relaxed) {
                        tx1.send(()).unwrap();
                        rx2.recv().await.unwrap();
                    }

                    // Close the channel and wait for the other task to exit.
                    drop(tx1);
                    assert!(rx2.recv().await.is_none());
                }));

                tasks.push(task::spawn(async move {
                    while rx1.recv().await.is_some() {
                        tx2.send(()).unwrap();
                    }
                }));
            }

            for _ in 0..NUM {
                spawned_rx.recv().await.unwrap();
            }

            // Spawn another task and wait for it to complete
            let handle = task::spawn(async {
                for _ in 0..5 {
                    // Yielding forces it back into the local queue.
                    task::yield_now().await;
                }
            });
            handle.await.unwrap();
            running.store(false, Ordering::Relaxed);
            for t in tasks {
                t.await.unwrap();
            }
        });
    }

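    // Dropping the runtime while another thread concurrently completes the
    // spawned tasks must not deadlock.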
    #[test]
    #[cfg(not(target_os = "wasi"))]
    fn shutdown_concurrent_spawn() {
        const NUM_TASKS: usize = 10_000;
        for _ in 0..5 {
            let (tx, rx) = std::sync::mpsc::channel();
            let rt = rt();

            let mut txs = vec![];

            for _ in 0..NUM_TASKS {
                let (tx, rx) = tokio::sync::oneshot::channel();
                txs.push(tx);
                rt.spawn(async move {
                    rx.await.unwrap();
                });
            }

            // Prime the tasks
            rt.block_on(async { tokio::task::yield_now().await });

            let th = std::thread::spawn(move || {
                tx.send(()).unwrap();
                for tx in txs.drain(..) {
                    let _ = tx.send(());
                }
            });

            rx.recv().unwrap();
            drop(rt);

            th.join().unwrap();
        }
    }

    #[test]
    #[cfg_attr(target_family = "wasm", ignore)]
    fn wake_by_ref_from_thread_local() {
        wake_from_thread_local(true);
    }

    #[test]
    #[cfg_attr(target_family = "wasm", ignore)]
    fn wake_by_val_from_thread_local() {
        wake_from_thread_local(false);
    }

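    // Shared implementation for the two tests above: the task's waker is stored
    // in a thread local and invoked from that thread local's destructor while
    // the thread shuts down.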
    fn wake_from_thread_local(by_ref: bool) {
        use std::cell::RefCell;
        use std::sync::mpsc::{channel, Sender};
        use std::task::Waker;

        struct TLData {
            by_ref: bool,
            waker: Option<Waker>,
            done: Sender<bool>,
        }

        impl Drop for TLData {
            fn drop(&mut self) {
                if self.by_ref {
                    self.waker.take().unwrap().wake_by_ref();
                } else {
                    self.waker.take().unwrap().wake();
                }
                let _ = self.done.send(true);
            }
        }

        std::thread_local! {
            static TL_DATA: RefCell<Option<TLData>> = RefCell::new(None);
        };

        let (send, recv) = channel();

        std::thread::spawn(move || {
            let rt = rt();
            rt.block_on(rt.spawn(poll_fn(move |cx| {
                let waker = cx.waker().clone();
                let send = send.clone();
                TL_DATA.with(|tl| {
                    tl.replace(Some(TLData {
                        by_ref,
                        waker: Some(waker),
                        done: send,
                    }));
                });
                Poll::Ready(())
            })))
            .unwrap();
        })
        .join()
        .unwrap();

        assert!(recv.recv().unwrap());
    }
}