1#![warn(rust_2018_idioms)]
2#![cfg(feature = "full")]
3
4// All io tests that deal with shutdown is currently ignored because there are known bugs in with
5// shutting down the io driver while concurrently registering new resources. See
6// https://github.com/tokio-rs/tokio/pull/3569#pullrequestreview-612703467 for more details.
7//
8// When this has been fixed we want to re-enable these tests.
9
10use std::time::Duration;
11use tokio::runtime::{Handle, Runtime};
12use tokio::sync::mpsc;
13#[cfg(not(target_os = "wasi"))]
14use tokio::{net, time};
15
16#[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
17macro_rules! multi_threaded_rt_test {
18 ($($t:tt)*) => {
19 mod threaded_scheduler_4_threads_only {
20 use super::*;
21
22 $($t)*
23
24 fn rt() -> Runtime {
25 tokio::runtime::Builder::new_multi_thread()
26 .worker_threads(4)
27 .enable_all()
28 .build()
29 .unwrap()
30 }
31 }
32
33 mod threaded_scheduler_1_thread_only {
34 use super::*;
35
36 $($t)*
37
38 fn rt() -> Runtime {
39 tokio::runtime::Builder::new_multi_thread()
40 .worker_threads(1)
41 .enable_all()
42 .build()
43 .unwrap()
44 }
45 }
46 }
47}
48
49#[cfg(not(target_os = "wasi"))]
50macro_rules! rt_test {
51 ($($t:tt)*) => {
52 mod current_thread_scheduler {
53 use super::*;
54
55 $($t)*
56
57 fn rt() -> Runtime {
58 tokio::runtime::Builder::new_current_thread()
59 .enable_all()
60 .build()
61 .unwrap()
62 }
63 }
64
65 mod threaded_scheduler_4_threads {
66 use super::*;
67
68 $($t)*
69
70 fn rt() -> Runtime {
71 tokio::runtime::Builder::new_multi_thread()
72 .worker_threads(4)
73 .enable_all()
74 .build()
75 .unwrap()
76 }
77 }
78
79 mod threaded_scheduler_1_thread {
80 use super::*;
81
82 $($t)*
83
84 fn rt() -> Runtime {
85 tokio::runtime::Builder::new_multi_thread()
86 .worker_threads(1)
87 .enable_all()
88 .build()
89 .unwrap()
90 }
91 }
92 }
93}
94
95// ==== runtime independent futures ======
96
97#[test]
98fn basic() {
99 test_with_runtimes(|| {
100 let one = Handle::current().block_on(async { 1 });
101 assert_eq!(1, one);
102 });
103}
104
105#[test]
106fn bounded_mpsc_channel() {
107 test_with_runtimes(|| {
108 let (tx, mut rx) = mpsc::channel(1024);
109
110 Handle::current().block_on(tx.send(42)).unwrap();
111
112 let value = Handle::current().block_on(rx.recv()).unwrap();
113 assert_eq!(value, 42);
114 });
115}
116
117#[test]
118fn unbounded_mpsc_channel() {
119 test_with_runtimes(|| {
120 let (tx, mut rx) = mpsc::unbounded_channel();
121
122 let _ = tx.send(42);
123
124 let value = Handle::current().block_on(rx.recv()).unwrap();
125 assert_eq!(value, 42);
126 })
127}
128
129#[cfg(not(target_os = "wasi"))] // Wasi doesn't support file operations or bind
130rt_test! {
131 use tokio::fs;
132 // ==== spawn blocking futures ======
133
134 #[test]
135 fn basic_fs() {
136 let rt = rt();
137 let _enter = rt.enter();
138
139 let contents = Handle::current()
140 .block_on(fs::read_to_string("Cargo.toml"))
141 .unwrap();
142 assert!(contents.contains("https://tokio.rs"));
143 }
144
145 #[test]
146 fn fs_shutdown_before_started() {
147 let rt = rt();
148 let _enter = rt.enter();
149 rt.shutdown_timeout(Duration::from_secs(1000));
150
151 let err: std::io::Error = Handle::current()
152 .block_on(fs::read_to_string("Cargo.toml"))
153 .unwrap_err();
154
155 assert_eq!(err.kind(), std::io::ErrorKind::Other);
156
157 let inner_err = err.get_ref().expect("no inner error");
158 assert_eq!(inner_err.to_string(), "background task failed");
159 }
160
161 #[test]
162 fn basic_spawn_blocking() {
163 use tokio::task::spawn_blocking;
164 let rt = rt();
165 let _enter = rt.enter();
166
167 let answer = Handle::current()
168 .block_on(spawn_blocking(|| {
169 std::thread::sleep(Duration::from_millis(100));
170 42
171 }))
172 .unwrap();
173
174 assert_eq!(answer, 42);
175 }
176
177 #[test]
178 fn spawn_blocking_after_shutdown_fails() {
179 use tokio::task::spawn_blocking;
180 let rt = rt();
181 let _enter = rt.enter();
182 rt.shutdown_timeout(Duration::from_secs(1000));
183
184 let join_err = Handle::current()
185 .block_on(spawn_blocking(|| {
186 std::thread::sleep(Duration::from_millis(100));
187 42
188 }))
189 .unwrap_err();
190
191 assert!(join_err.is_cancelled());
192 }
193
194 #[test]
195 fn spawn_blocking_started_before_shutdown_continues() {
196 use tokio::task::spawn_blocking;
197 let rt = rt();
198 let _enter = rt.enter();
199
200 let handle = spawn_blocking(|| {
201 std::thread::sleep(Duration::from_secs(1));
202 42
203 });
204
205 rt.shutdown_timeout(Duration::from_secs(1000));
206
207 let answer = Handle::current().block_on(handle).unwrap();
208
209 assert_eq!(answer, 42);
210 }
211
212 // ==== net ======
213
214 #[test]
215 fn tcp_listener_bind() {
216 let rt = rt();
217 let _enter = rt.enter();
218
219 Handle::current()
220 .block_on(net::TcpListener::bind("127.0.0.1:0"))
221 .unwrap();
222 }
223
224 // All io tests are ignored for now. See above why that is.
225 #[ignore]
226 #[test]
227 fn tcp_listener_connect_after_shutdown() {
228 let rt = rt();
229 let _enter = rt.enter();
230
231 rt.shutdown_timeout(Duration::from_secs(1000));
232
233 let err = Handle::current()
234 .block_on(net::TcpListener::bind("127.0.0.1:0"))
235 .unwrap_err();
236
237 assert_eq!(err.kind(), std::io::ErrorKind::Other);
238 assert_eq!(
239 err.get_ref().unwrap().to_string(),
240 "A Tokio 1.x context was found, but it is being shutdown.",
241 );
242 }
243
244 // All io tests are ignored for now. See above why that is.
245 #[ignore]
246 #[test]
247 fn tcp_listener_connect_before_shutdown() {
248 let rt = rt();
249 let _enter = rt.enter();
250
251 let bind_future = net::TcpListener::bind("127.0.0.1:0");
252
253 rt.shutdown_timeout(Duration::from_secs(1000));
254
255 let err = Handle::current().block_on(bind_future).unwrap_err();
256
257 assert_eq!(err.kind(), std::io::ErrorKind::Other);
258 assert_eq!(
259 err.get_ref().unwrap().to_string(),
260 "A Tokio 1.x context was found, but it is being shutdown.",
261 );
262 }
263
264 #[test]
265 fn udp_socket_bind() {
266 let rt = rt();
267 let _enter = rt.enter();
268
269 Handle::current()
270 .block_on(net::UdpSocket::bind("127.0.0.1:0"))
271 .unwrap();
272 }
273
274 // All io tests are ignored for now. See above why that is.
275 #[ignore]
276 #[test]
277 fn udp_stream_bind_after_shutdown() {
278 let rt = rt();
279 let _enter = rt.enter();
280
281 rt.shutdown_timeout(Duration::from_secs(1000));
282
283 let err = Handle::current()
284 .block_on(net::UdpSocket::bind("127.0.0.1:0"))
285 .unwrap_err();
286
287 assert_eq!(err.kind(), std::io::ErrorKind::Other);
288 assert_eq!(
289 err.get_ref().unwrap().to_string(),
290 "A Tokio 1.x context was found, but it is being shutdown.",
291 );
292 }
293
294 // All io tests are ignored for now. See above why that is.
295 #[ignore]
296 #[test]
297 fn udp_stream_bind_before_shutdown() {
298 let rt = rt();
299 let _enter = rt.enter();
300
301 let bind_future = net::UdpSocket::bind("127.0.0.1:0");
302
303 rt.shutdown_timeout(Duration::from_secs(1000));
304
305 let err = Handle::current().block_on(bind_future).unwrap_err();
306
307 assert_eq!(err.kind(), std::io::ErrorKind::Other);
308 assert_eq!(
309 err.get_ref().unwrap().to_string(),
310 "A Tokio 1.x context was found, but it is being shutdown.",
311 );
312 }
313
314 // All io tests are ignored for now. See above why that is.
315 #[ignore]
316 #[cfg(unix)]
317 #[test]
318 fn unix_listener_bind_after_shutdown() {
319 let rt = rt();
320 let _enter = rt.enter();
321
322 let dir = tempfile::tempdir().unwrap();
323 let path = dir.path().join("socket");
324
325 rt.shutdown_timeout(Duration::from_secs(1000));
326
327 let err = net::UnixListener::bind(path).unwrap_err();
328
329 assert_eq!(err.kind(), std::io::ErrorKind::Other);
330 assert_eq!(
331 err.get_ref().unwrap().to_string(),
332 "A Tokio 1.x context was found, but it is being shutdown.",
333 );
334 }
335
336 // All io tests are ignored for now. See above why that is.
337 #[ignore]
338 #[cfg(unix)]
339 #[test]
340 fn unix_listener_shutdown_after_bind() {
341 let rt = rt();
342 let _enter = rt.enter();
343
344 let dir = tempfile::tempdir().unwrap();
345 let path = dir.path().join("socket");
346
347 let listener = net::UnixListener::bind(path).unwrap();
348
349 rt.shutdown_timeout(Duration::from_secs(1000));
350
351 // this should not timeout but fail immediately since the runtime has been shutdown
352 let err = Handle::current().block_on(listener.accept()).unwrap_err();
353
354 assert_eq!(err.kind(), std::io::ErrorKind::Other);
355 assert_eq!(err.get_ref().unwrap().to_string(), "reactor gone");
356 }
357
358 // All io tests are ignored for now. See above why that is.
359 #[ignore]
360 #[cfg(unix)]
361 #[test]
362 fn unix_listener_shutdown_after_accept() {
363 let rt = rt();
364 let _enter = rt.enter();
365
366 let dir = tempfile::tempdir().unwrap();
367 let path = dir.path().join("socket");
368
369 let listener = net::UnixListener::bind(path).unwrap();
370
371 let accept_future = listener.accept();
372
373 rt.shutdown_timeout(Duration::from_secs(1000));
374
375 // this should not timeout but fail immediately since the runtime has been shutdown
376 let err = Handle::current().block_on(accept_future).unwrap_err();
377
378 assert_eq!(err.kind(), std::io::ErrorKind::Other);
379 assert_eq!(err.get_ref().unwrap().to_string(), "reactor gone");
380 }
381
382 // ==== nesting ======
383
384 #[test]
385 #[should_panic(
386 expected = "Cannot start a runtime from within a runtime. This happens because a function (like `block_on`) attempted to block the current thread while the thread is being used to drive asynchronous tasks."
387 )]
388 fn nesting() {
389 fn some_non_async_function() -> i32 {
390 Handle::current().block_on(time::sleep(Duration::from_millis(10)));
391 1
392 }
393
394 let rt = rt();
395
396 rt.block_on(async { some_non_async_function() });
397 }
398
399 #[test]
400 fn spawn_after_runtime_dropped() {
401 use futures::future::FutureExt;
402
403 let rt = rt();
404
405 let handle = rt.block_on(async move {
406 Handle::current()
407 });
408
409 let jh1 = handle.spawn(futures::future::pending::<()>());
410
411 drop(rt);
412
413 let jh2 = handle.spawn(futures::future::pending::<()>());
414
415 let err1 = jh1.now_or_never().unwrap().unwrap_err();
416 let err2 = jh2.now_or_never().unwrap().unwrap_err();
417 assert!(err1.is_cancelled());
418 assert!(err2.is_cancelled());
419 }
420}
421
422#[cfg(not(target_os = "wasi"))]
423multi_threaded_rt_test! {
424 #[cfg(unix)]
425 #[test]
426 fn unix_listener_bind() {
427 let rt = rt();
428 let _enter = rt.enter();
429
430 let dir = tempfile::tempdir().unwrap();
431 let path = dir.path().join("socket");
432
433 let listener = net::UnixListener::bind(path).unwrap();
434
435 // this should timeout and not fail immediately since the runtime has not been shutdown
436 let _: tokio::time::error::Elapsed = Handle::current()
437 .block_on(tokio::time::timeout(
438 Duration::from_millis(10),
439 listener.accept(),
440 ))
441 .unwrap_err();
442 }
443
444 // ==== timers ======
445
446 // `Handle::block_on` doesn't work with timer futures on a current thread runtime as there is no
447 // one to drive the timers so they will just hang forever. Therefore they are not tested.
448
449 #[test]
450 fn sleep() {
451 let rt = rt();
452 let _enter = rt.enter();
453
454 Handle::current().block_on(time::sleep(Duration::from_millis(100)));
455 }
456
457 #[test]
458 #[should_panic(expected = "A Tokio 1.x context was found, but it is being shutdown.")]
459 fn sleep_before_shutdown_panics() {
460 let rt = rt();
461 let _enter = rt.enter();
462
463 let f = time::sleep(Duration::from_millis(100));
464
465 rt.shutdown_timeout(Duration::from_secs(1000));
466
467 Handle::current().block_on(f);
468 }
469
470 #[test]
471 #[should_panic(expected = "A Tokio 1.x context was found, but it is being shutdown.")]
472 fn sleep_after_shutdown_panics() {
473 let rt = rt();
474 let _enter = rt.enter();
475
476 rt.shutdown_timeout(Duration::from_secs(1000));
477
478 Handle::current().block_on(time::sleep(Duration::from_millis(100)));
479 }
480}
481
482// ==== utils ======
483
484/// Create a new multi threaded runtime
485#[cfg(not(target_os = "wasi"))]
486fn new_multi_thread(n: usize) -> Runtime {
487 tokio::runtime::Builder::new_multi_thread()
488 .worker_threads(n)
489 .enable_all()
490 .build()
491 .unwrap()
492}
493
494/// Create a new single threaded runtime
495fn new_current_thread() -> Runtime {
496 tokio::runtime::Builder::new_current_thread()
497 .enable_all()
498 .build()
499 .unwrap()
500}
501
502/// Utility to test things on both kinds of runtimes both before and after shutting it down.
503fn test_with_runtimes<F>(f: F)
504where
505 F: Fn(),
506{
507 {
508 let rt = new_current_thread();
509 let _enter = rt.enter();
510 f();
511
512 rt.shutdown_timeout(Duration::from_secs(1000));
513 f();
514 }
515
516 #[cfg(not(target_os = "wasi"))]
517 {
518 let rt = new_multi_thread(1);
519 let _enter = rt.enter();
520 f();
521
522 rt.shutdown_timeout(Duration::from_secs(1000));
523 f();
524 }
525
526 #[cfg(not(target_os = "wasi"))]
527 {
528 let rt = new_multi_thread(4);
529 let _enter = rt.enter();
530 f();
531
532 rt.shutdown_timeout(Duration::from_secs(1000));
533 f();
534 }
535}
536