1 | #![warn (rust_2018_idioms)] |
2 | #![cfg (feature = "full" )] |
3 | |
// All io tests that deal with shutdown are currently ignored because there are known bugs with
// shutting down the io driver while concurrently registering new resources. See
// https://github.com/tokio-rs/tokio/pull/3569#pullrequestreview-612703467 for more details.
//
// When these bugs have been fixed we want to re-enable these tests.
9 | |
10 | use std::time::Duration; |
11 | use tokio::runtime::{Handle, Runtime}; |
12 | use tokio::sync::mpsc; |
13 | #[cfg (not(target_os = "wasi" ))] |
14 | use tokio::{net, time}; |
15 | |
// Expands the given test items twice: once in a module backed by a 4-worker
// multi-thread runtime and once in a module backed by a 1-worker multi-thread
// runtime. Each generated module provides its own `rt()` constructor so the
// expanded items can call `rt()` uniformly.
#[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
macro_rules! multi_threaded_rt_test {
    ($($t:tt)*) => {
        mod threaded_scheduler_4_threads_only {
            use super::*;

            $($t)*

            // Runtime used by the tests expanded into this module.
            fn rt() -> Runtime {
                tokio::runtime::Builder::new_multi_thread()
                    .worker_threads(4)
                    .enable_all()
                    .build()
                    .unwrap()
            }
        }

        mod threaded_scheduler_1_thread_only {
            use super::*;

            $($t)*

            // Runtime used by the tests expanded into this module.
            fn rt() -> Runtime {
                tokio::runtime::Builder::new_multi_thread()
                    .worker_threads(1)
                    .enable_all()
                    .build()
                    .unwrap()
            }
        }
    }
}
48 | |
// Expands the given test items three times: once per runtime flavor
// (current-thread, 4-worker multi-thread, 1-worker multi-thread). Each
// generated module provides its own `rt()` constructor so the expanded items
// can call `rt()` uniformly.
#[cfg(not(target_os = "wasi"))]
macro_rules! rt_test {
    ($($t:tt)*) => {
        mod current_thread_scheduler {
            use super::*;

            $($t)*

            // Runtime used by the tests expanded into this module.
            fn rt() -> Runtime {
                tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .unwrap()
            }
        }

        mod threaded_scheduler_4_threads {
            use super::*;

            $($t)*

            // Runtime used by the tests expanded into this module.
            fn rt() -> Runtime {
                tokio::runtime::Builder::new_multi_thread()
                    .worker_threads(4)
                    .enable_all()
                    .build()
                    .unwrap()
            }
        }

        mod threaded_scheduler_1_thread {
            use super::*;

            $($t)*

            // Runtime used by the tests expanded into this module.
            fn rt() -> Runtime {
                tokio::runtime::Builder::new_multi_thread()
                    .worker_threads(1)
                    .enable_all()
                    .build()
                    .unwrap()
            }
        }
    }
}
94 | |
95 | // ==== runtime independent futures ====== |
96 | |
// `Handle::block_on` resolves a trivial ready future on every runtime flavor,
// both before and after shutdown.
#[test]
fn basic() {
    test_with_runtimes(|| {
        let result = Handle::current().block_on(async { 1 });
        assert_eq!(result, 1);
    });
}
104 | |
// A bounded mpsc channel can be driven end-to-end via `Handle::block_on`.
#[test]
fn bounded_mpsc_channel() {
    test_with_runtimes(|| {
        let (sender, mut receiver) = mpsc::channel(1024);

        Handle::current().block_on(sender.send(42)).unwrap();

        let received = Handle::current().block_on(receiver.recv()).unwrap();
        assert_eq!(42, received);
    });
}
116 | |
// An unbounded mpsc channel can be drained via `Handle::block_on`.
#[test]
fn unbounded_mpsc_channel() {
    test_with_runtimes(|| {
        let (tx, mut rx) = mpsc::unbounded_channel();

        // Fail loudly here rather than hanging in `recv` below if the send is
        // ever rejected (previously the `Result` was silently discarded).
        tx.send(42).unwrap();

        let value = Handle::current().block_on(rx.recv()).unwrap();
        assert_eq!(value, 42);
    });
}
128 | |
129 | #[cfg (not(target_os = "wasi" ))] // Wasi doesn't support file operations or bind |
130 | rt_test! { |
131 | use tokio::fs; |
132 | // ==== spawn blocking futures ====== |
133 | |
134 | #[test] |
135 | fn basic_fs() { |
136 | let rt = rt(); |
137 | let _enter = rt.enter(); |
138 | |
139 | let contents = Handle::current() |
140 | .block_on(fs::read_to_string("Cargo.toml" )) |
141 | .unwrap(); |
142 | assert!(contents.contains("https://tokio.rs" )); |
143 | } |
144 | |
145 | #[test] |
146 | fn fs_shutdown_before_started() { |
147 | let rt = rt(); |
148 | let _enter = rt.enter(); |
149 | rt.shutdown_timeout(Duration::from_secs(1000)); |
150 | |
151 | let err: std::io::Error = Handle::current() |
152 | .block_on(fs::read_to_string("Cargo.toml" )) |
153 | .unwrap_err(); |
154 | |
155 | assert_eq!(err.kind(), std::io::ErrorKind::Other); |
156 | |
157 | let inner_err = err.get_ref().expect("no inner error" ); |
158 | assert_eq!(inner_err.to_string(), "background task failed" ); |
159 | } |
160 | |
161 | #[test] |
162 | fn basic_spawn_blocking() { |
163 | use tokio::task::spawn_blocking; |
164 | let rt = rt(); |
165 | let _enter = rt.enter(); |
166 | |
167 | let answer = Handle::current() |
168 | .block_on(spawn_blocking(|| { |
169 | std::thread::sleep(Duration::from_millis(100)); |
170 | 42 |
171 | })) |
172 | .unwrap(); |
173 | |
174 | assert_eq!(answer, 42); |
175 | } |
176 | |
177 | #[test] |
178 | fn spawn_blocking_after_shutdown_fails() { |
179 | use tokio::task::spawn_blocking; |
180 | let rt = rt(); |
181 | let _enter = rt.enter(); |
182 | rt.shutdown_timeout(Duration::from_secs(1000)); |
183 | |
184 | let join_err = Handle::current() |
185 | .block_on(spawn_blocking(|| { |
186 | std::thread::sleep(Duration::from_millis(100)); |
187 | 42 |
188 | })) |
189 | .unwrap_err(); |
190 | |
191 | assert!(join_err.is_cancelled()); |
192 | } |
193 | |
194 | #[test] |
195 | fn spawn_blocking_started_before_shutdown_continues() { |
196 | use tokio::task::spawn_blocking; |
197 | let rt = rt(); |
198 | let _enter = rt.enter(); |
199 | |
200 | let handle = spawn_blocking(|| { |
201 | std::thread::sleep(Duration::from_secs(1)); |
202 | 42 |
203 | }); |
204 | |
205 | rt.shutdown_timeout(Duration::from_secs(1000)); |
206 | |
207 | let answer = Handle::current().block_on(handle).unwrap(); |
208 | |
209 | assert_eq!(answer, 42); |
210 | } |
211 | |
212 | // ==== net ====== |
213 | |
214 | #[test] |
215 | fn tcp_listener_bind() { |
216 | let rt = rt(); |
217 | let _enter = rt.enter(); |
218 | |
219 | Handle::current() |
220 | .block_on(net::TcpListener::bind("127.0.0.1:0" )) |
221 | .unwrap(); |
222 | } |
223 | |
224 | // All io tests are ignored for now. See above why that is. |
225 | #[ignore ] |
226 | #[test] |
227 | fn tcp_listener_connect_after_shutdown() { |
228 | let rt = rt(); |
229 | let _enter = rt.enter(); |
230 | |
231 | rt.shutdown_timeout(Duration::from_secs(1000)); |
232 | |
233 | let err = Handle::current() |
234 | .block_on(net::TcpListener::bind("127.0.0.1:0" )) |
235 | .unwrap_err(); |
236 | |
237 | assert_eq!(err.kind(), std::io::ErrorKind::Other); |
238 | assert_eq!( |
239 | err.get_ref().unwrap().to_string(), |
240 | "A Tokio 1.x context was found, but it is being shutdown." , |
241 | ); |
242 | } |
243 | |
244 | // All io tests are ignored for now. See above why that is. |
245 | #[ignore ] |
246 | #[test] |
247 | fn tcp_listener_connect_before_shutdown() { |
248 | let rt = rt(); |
249 | let _enter = rt.enter(); |
250 | |
251 | let bind_future = net::TcpListener::bind("127.0.0.1:0" ); |
252 | |
253 | rt.shutdown_timeout(Duration::from_secs(1000)); |
254 | |
255 | let err = Handle::current().block_on(bind_future).unwrap_err(); |
256 | |
257 | assert_eq!(err.kind(), std::io::ErrorKind::Other); |
258 | assert_eq!( |
259 | err.get_ref().unwrap().to_string(), |
260 | "A Tokio 1.x context was found, but it is being shutdown." , |
261 | ); |
262 | } |
263 | |
264 | #[test] |
265 | fn udp_socket_bind() { |
266 | let rt = rt(); |
267 | let _enter = rt.enter(); |
268 | |
269 | Handle::current() |
270 | .block_on(net::UdpSocket::bind("127.0.0.1:0" )) |
271 | .unwrap(); |
272 | } |
273 | |
274 | // All io tests are ignored for now. See above why that is. |
275 | #[ignore ] |
276 | #[test] |
277 | fn udp_stream_bind_after_shutdown() { |
278 | let rt = rt(); |
279 | let _enter = rt.enter(); |
280 | |
281 | rt.shutdown_timeout(Duration::from_secs(1000)); |
282 | |
283 | let err = Handle::current() |
284 | .block_on(net::UdpSocket::bind("127.0.0.1:0" )) |
285 | .unwrap_err(); |
286 | |
287 | assert_eq!(err.kind(), std::io::ErrorKind::Other); |
288 | assert_eq!( |
289 | err.get_ref().unwrap().to_string(), |
290 | "A Tokio 1.x context was found, but it is being shutdown." , |
291 | ); |
292 | } |
293 | |
294 | // All io tests are ignored for now. See above why that is. |
295 | #[ignore ] |
296 | #[test] |
297 | fn udp_stream_bind_before_shutdown() { |
298 | let rt = rt(); |
299 | let _enter = rt.enter(); |
300 | |
301 | let bind_future = net::UdpSocket::bind("127.0.0.1:0" ); |
302 | |
303 | rt.shutdown_timeout(Duration::from_secs(1000)); |
304 | |
305 | let err = Handle::current().block_on(bind_future).unwrap_err(); |
306 | |
307 | assert_eq!(err.kind(), std::io::ErrorKind::Other); |
308 | assert_eq!( |
309 | err.get_ref().unwrap().to_string(), |
310 | "A Tokio 1.x context was found, but it is being shutdown." , |
311 | ); |
312 | } |
313 | |
314 | // All io tests are ignored for now. See above why that is. |
315 | #[ignore ] |
316 | #[cfg (unix)] |
317 | #[test] |
318 | fn unix_listener_bind_after_shutdown() { |
319 | let rt = rt(); |
320 | let _enter = rt.enter(); |
321 | |
322 | let dir = tempfile::tempdir().unwrap(); |
323 | let path = dir.path().join("socket" ); |
324 | |
325 | rt.shutdown_timeout(Duration::from_secs(1000)); |
326 | |
327 | let err = net::UnixListener::bind(path).unwrap_err(); |
328 | |
329 | assert_eq!(err.kind(), std::io::ErrorKind::Other); |
330 | assert_eq!( |
331 | err.get_ref().unwrap().to_string(), |
332 | "A Tokio 1.x context was found, but it is being shutdown." , |
333 | ); |
334 | } |
335 | |
336 | // All io tests are ignored for now. See above why that is. |
337 | #[ignore ] |
338 | #[cfg (unix)] |
339 | #[test] |
340 | fn unix_listener_shutdown_after_bind() { |
341 | let rt = rt(); |
342 | let _enter = rt.enter(); |
343 | |
344 | let dir = tempfile::tempdir().unwrap(); |
345 | let path = dir.path().join("socket" ); |
346 | |
347 | let listener = net::UnixListener::bind(path).unwrap(); |
348 | |
349 | rt.shutdown_timeout(Duration::from_secs(1000)); |
350 | |
351 | // this should not timeout but fail immediately since the runtime has been shutdown |
352 | let err = Handle::current().block_on(listener.accept()).unwrap_err(); |
353 | |
354 | assert_eq!(err.kind(), std::io::ErrorKind::Other); |
355 | assert_eq!(err.get_ref().unwrap().to_string(), "reactor gone" ); |
356 | } |
357 | |
358 | // All io tests are ignored for now. See above why that is. |
359 | #[ignore ] |
360 | #[cfg (unix)] |
361 | #[test] |
362 | fn unix_listener_shutdown_after_accept() { |
363 | let rt = rt(); |
364 | let _enter = rt.enter(); |
365 | |
366 | let dir = tempfile::tempdir().unwrap(); |
367 | let path = dir.path().join("socket" ); |
368 | |
369 | let listener = net::UnixListener::bind(path).unwrap(); |
370 | |
371 | let accept_future = listener.accept(); |
372 | |
373 | rt.shutdown_timeout(Duration::from_secs(1000)); |
374 | |
375 | // this should not timeout but fail immediately since the runtime has been shutdown |
376 | let err = Handle::current().block_on(accept_future).unwrap_err(); |
377 | |
378 | assert_eq!(err.kind(), std::io::ErrorKind::Other); |
379 | assert_eq!(err.get_ref().unwrap().to_string(), "reactor gone" ); |
380 | } |
381 | |
382 | // ==== nesting ====== |
383 | |
384 | #[test] |
385 | #[should_panic ( |
386 | expected = "Cannot start a runtime from within a runtime. This happens because a function (like `block_on`) attempted to block the current thread while the thread is being used to drive asynchronous tasks." |
387 | )] |
388 | fn nesting() { |
389 | fn some_non_async_function() -> i32 { |
390 | Handle::current().block_on(time::sleep(Duration::from_millis(10))); |
391 | 1 |
392 | } |
393 | |
394 | let rt = rt(); |
395 | |
396 | rt.block_on(async { some_non_async_function() }); |
397 | } |
398 | |
399 | #[test] |
400 | fn spawn_after_runtime_dropped() { |
401 | use futures::future::FutureExt; |
402 | |
403 | let rt = rt(); |
404 | |
405 | let handle = rt.block_on(async move { |
406 | Handle::current() |
407 | }); |
408 | |
409 | let jh1 = handle.spawn(futures::future::pending::<()>()); |
410 | |
411 | drop(rt); |
412 | |
413 | let jh2 = handle.spawn(futures::future::pending::<()>()); |
414 | |
415 | let err1 = jh1.now_or_never().unwrap().unwrap_err(); |
416 | let err2 = jh2.now_or_never().unwrap().unwrap_err(); |
417 | assert!(err1.is_cancelled()); |
418 | assert!(err2.is_cancelled()); |
419 | } |
420 | } |
421 | |
422 | #[cfg (not(target_os = "wasi" ))] |
423 | multi_threaded_rt_test! { |
424 | #[cfg (unix)] |
425 | #[test] |
426 | fn unix_listener_bind() { |
427 | let rt = rt(); |
428 | let _enter = rt.enter(); |
429 | |
430 | let dir = tempfile::tempdir().unwrap(); |
431 | let path = dir.path().join("socket" ); |
432 | |
433 | let listener = net::UnixListener::bind(path).unwrap(); |
434 | |
435 | // this should timeout and not fail immediately since the runtime has not been shutdown |
436 | let _: tokio::time::error::Elapsed = Handle::current() |
437 | .block_on(tokio::time::timeout( |
438 | Duration::from_millis(10), |
439 | listener.accept(), |
440 | )) |
441 | .unwrap_err(); |
442 | } |
443 | |
444 | // ==== timers ====== |
445 | |
446 | // `Handle::block_on` doesn't work with timer futures on a current thread runtime as there is no |
447 | // one to drive the timers so they will just hang forever. Therefore they are not tested. |
448 | |
449 | #[test] |
450 | fn sleep() { |
451 | let rt = rt(); |
452 | let _enter = rt.enter(); |
453 | |
454 | Handle::current().block_on(time::sleep(Duration::from_millis(100))); |
455 | } |
456 | |
457 | #[test] |
458 | #[should_panic (expected = "A Tokio 1.x context was found, but it is being shutdown." )] |
459 | fn sleep_before_shutdown_panics() { |
460 | let rt = rt(); |
461 | let _enter = rt.enter(); |
462 | |
463 | let f = time::sleep(Duration::from_millis(100)); |
464 | |
465 | rt.shutdown_timeout(Duration::from_secs(1000)); |
466 | |
467 | Handle::current().block_on(f); |
468 | } |
469 | |
470 | #[test] |
471 | #[should_panic (expected = "A Tokio 1.x context was found, but it is being shutdown." )] |
472 | fn sleep_after_shutdown_panics() { |
473 | let rt = rt(); |
474 | let _enter = rt.enter(); |
475 | |
476 | rt.shutdown_timeout(Duration::from_secs(1000)); |
477 | |
478 | Handle::current().block_on(time::sleep(Duration::from_millis(100))); |
479 | } |
480 | } |
481 | |
482 | // ==== utils ====== |
483 | |
484 | /// Create a new multi threaded runtime |
485 | #[cfg (not(target_os = "wasi" ))] |
486 | fn new_multi_thread(n: usize) -> Runtime { |
487 | tokio::runtime::Builder::new_multi_thread() |
488 | .worker_threads(n) |
489 | .enable_all() |
490 | .build() |
491 | .unwrap() |
492 | } |
493 | |
494 | /// Create a new single threaded runtime |
495 | fn new_current_thread() -> Runtime { |
496 | tokio::runtime::Builder::new_current_thread() |
497 | .enable_all() |
498 | .build() |
499 | .unwrap() |
500 | } |
501 | |
/// Utility to test things on both kinds of runtimes both before and after shutting it down.
///
/// For each runtime flavor (current-thread, multi-thread with 1 worker,
/// multi-thread with 4 workers) the closure runs twice: once while the
/// runtime context is entered and alive, and once after
/// `shutdown_timeout` has torn the runtime down. The closure must
/// therefore behave correctly in both states.
fn test_with_runtimes<F>(f: F)
where
    F: Fn(),
{
    {
        let rt = new_current_thread();
        // Entering makes `Handle::current()` resolve inside `f`.
        let _enter = rt.enter();
        f();

        // Run again after shutdown to exercise the post-shutdown path.
        rt.shutdown_timeout(Duration::from_secs(1000));
        f();
    }

    #[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
    {
        let rt = new_multi_thread(1);
        let _enter = rt.enter();
        f();

        rt.shutdown_timeout(Duration::from_secs(1000));
        f();
    }

    #[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
    {
        let rt = new_multi_thread(4);
        let _enter = rt.enter();
        f();

        rt.shutdown_timeout(Duration::from_secs(1000));
        f();
    }
}
536 | |