1 | #![warn (rust_2018_idioms)] |
2 | // Wasi does not support panic recovery or threading |
3 | #![cfg (all(feature = "full" , not(target_os = "wasi" )))] |
4 | |
5 | use tokio::net::TcpListener; |
6 | use tokio::runtime; |
7 | use tokio_test::{assert_ok, assert_pending}; |
8 | |
9 | use futures::task::{waker_ref, ArcWake}; |
10 | use std::future::Future; |
11 | use std::net::TcpStream; |
12 | use std::pin::Pin; |
13 | use std::sync::{mpsc, Arc, Mutex}; |
14 | use std::task::Context; |
15 | |
/// Minimal hand-rolled task: a future pinned on the heap and guarded by a
/// mutex so it can be polled through a shared `Arc<Task<T>>` handle.
struct Task<T> {
    future: Mutex<Pin<Box<T>>>,
}
19 | |
impl<T: Send> ArcWake for Task<T> {
    fn wake_by_ref(_: &Arc<Self>) {
        // Deliberately a no-op: this fake executor ignores wakeups, which
        // simulates an executor that is in the process of shutting down
        // (see `test_drop_on_notify`).
    }
}
25 | |
26 | impl<T> Task<T> { |
27 | fn new(future: T) -> Task<T> { |
28 | Task { |
29 | future: Mutex::new(Box::pin(future)), |
30 | } |
31 | } |
32 | } |
33 | |
#[test]
fn test_drop_on_notify() {
    // When the reactor receives a kernel notification, it notifies the
    // task that holds the associated socket. If this notification results in
    // the task being dropped, the socket will also be dropped.
    //
    // Previously, there was a deadlock scenario where the reactor, while
    // notifying, held a lock and the task being dropped attempted to acquire
    // that same lock in order to clean up state.
    //
    // To simulate this case, we create a fake executor that does nothing when
    // the task is notified. This simulates an executor in the process of
    // shutting down. Then, when the task handle is dropped, the task itself is
    // dropped.

    // Single-threaded runtime with the IO driver enabled.
    let rt = runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();

    // Channel used to smuggle the listener's bound address out of the task.
    let (addr_tx, addr_rx) = mpsc::channel();

    // Define a task that just drains the listener
    let task = Arc::new(Task::new(async move {
        // Create a listener
        let listener = assert_ok!(TcpListener::bind("127.0.0.1:0" ).await);

        // Send the address
        let addr = listener.local_addr().unwrap();
        addr_tx.send(addr).unwrap();

        // Never completes; keeps the listener registered with the driver.
        loop {
            let _ = listener.accept().await;
        }
    }));

    {
        // Poll the task once inside the runtime context so the listener gets
        // registered with the IO driver. No connection exists yet, so the
        // poll must return Pending. The context's waker is the no-op
        // `ArcWake` above, standing in for a shut-down executor.
        let _enter = rt.enter();
        let waker = waker_ref(&task);
        let mut cx = Context::from_waker(&waker);
        assert_pending!(task.future.lock().unwrap().as_mut().poll(&mut cx));
    }

    // Get the address
    let addr = addr_rx.recv().unwrap();

    // Drop the sole handle, destroying the task (and the future that owns
    // the registered listener) outside of any reactor notification.
    drop(task);

    // Establish a connection to the acceptor
    let _s = TcpStream::connect(addr).unwrap();

    // Force the reactor to turn so it processes the readiness event for the
    // now-dropped registration; the test passes if this neither deadlocks
    // nor panics.
    rt.block_on(async {});
}
88 | |
#[test]
#[should_panic(
    expected = "A Tokio 1.x context was found, but IO is disabled. Call `enable_io` on the runtime builder to enable IO."
)]
fn panics_when_io_disabled() {
    // Build a current-thread runtime WITHOUT calling `enable_io`/`enable_all`,
    // so no IO driver is present.
    let rt = runtime::Builder::new_current_thread().build().unwrap();

    rt.block_on(async {
        // Binding the std listener succeeds on its own; handing it to tokio
        // requires an IO driver, so `from_std` must panic with the message
        // asserted above.
        let std_listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
        let _ = tokio::net::TcpListener::from_std(std_listener);
    });
}
101 | |