1 | #![warn (rust_2018_idioms)] |
2 | #![cfg (all(feature = "full" , not(target_os = "wasi" )))] // Wasi doesn't support threads |
3 | |
4 | use tokio::{runtime, task, time}; |
5 | use tokio_test::assert_ok; |
6 | |
7 | use std::thread; |
8 | use std::time::Duration; |
9 | |
// Helper modules shared by these tests; `mpsc_stream` provides the
// `unbounded_channel_stream` constructor used by the coop tests in this file.
mod support {
    pub(crate) mod mpsc_stream;
}
13 | |
14 | #[tokio::test ] |
15 | async fn basic_blocking() { |
16 | // Run a few times |
17 | for _ in 0..100 { |
18 | let out = assert_ok!( |
19 | tokio::spawn(async { |
20 | assert_ok!( |
21 | task::spawn_blocking(|| { |
22 | thread::sleep(Duration::from_millis(5)); |
23 | "hello" |
24 | }) |
25 | .await |
26 | ) |
27 | }) |
28 | .await |
29 | ); |
30 | |
31 | assert_eq!(out, "hello" ); |
32 | } |
33 | } |
34 | |
35 | #[tokio::test (flavor = "multi_thread" )] |
36 | async fn block_in_blocking() { |
37 | // Run a few times |
38 | for _ in 0..100 { |
39 | let out = assert_ok!( |
40 | tokio::spawn(async { |
41 | assert_ok!( |
42 | task::spawn_blocking(|| { |
43 | task::block_in_place(|| { |
44 | thread::sleep(Duration::from_millis(5)); |
45 | }); |
46 | "hello" |
47 | }) |
48 | .await |
49 | ) |
50 | }) |
51 | .await |
52 | ); |
53 | |
54 | assert_eq!(out, "hello" ); |
55 | } |
56 | } |
57 | |
58 | #[tokio::test (flavor = "multi_thread" )] |
59 | async fn block_in_block() { |
60 | // Run a few times |
61 | for _ in 0..100 { |
62 | let out = assert_ok!( |
63 | tokio::spawn(async { |
64 | task::block_in_place(|| { |
65 | task::block_in_place(|| { |
66 | thread::sleep(Duration::from_millis(5)); |
67 | }); |
68 | "hello" |
69 | }) |
70 | }) |
71 | .await |
72 | ); |
73 | |
74 | assert_eq!(out, "hello" ); |
75 | } |
76 | } |
77 | |
78 | #[tokio::test (flavor = "current_thread" )] |
79 | #[should_panic ] |
80 | async fn no_block_in_current_thread_scheduler() { |
81 | task::block_in_place(|| {}); |
82 | } |
83 | |
/// Checks that `block_in_place` can be called directly inside `block_on` of
/// a runtime created with `Runtime::new()` without panicking.
#[test]
fn yes_block_in_threaded_block_on() {
    let threaded = runtime::Runtime::new().unwrap();
    threaded.block_on(async { task::block_in_place(|| ()) });
}
91 | |
/// Expects `block_in_place` to panic when called inside `block_on` of a
/// runtime built with `Builder::new_current_thread()`.
#[test]
#[should_panic]
fn no_block_in_current_thread_block_on() {
    let single_threaded = runtime::Builder::new_current_thread().build().unwrap();
    single_threaded.block_on(async { task::block_in_place(|| ()) });
}
100 | |
/// Builds a `current_thread` runtime inside `block_in_place` (itself running
/// under an outer runtime's `block_on`) and drives an empty future on it.
#[test]
fn can_enter_current_thread_rt_from_within_block_in_place() {
    let outer_rt = runtime::Runtime::new().unwrap();

    outer_rt.block_on(async {
        task::block_in_place(|| {
            // The nested runtime is created, used and dropped entirely
            // within the `block_in_place` closure.
            let built = runtime::Builder::new_current_thread().build();
            let nested_rt = built.unwrap();
            nested_rt.block_on(async {})
        })
    });
}
115 | |
/// Drops a freshly built `current_thread` runtime inside another runtime's
/// `block_on` and checks that the resulting panic payload is a `&'static str`
/// containing "Cannot drop a runtime".
#[test]
#[cfg(panic = "unwind")]
fn useful_panic_message_when_dropping_rt_in_rt() {
    use std::panic::{catch_unwind, AssertUnwindSafe};

    let outer_rt = runtime::Runtime::new().unwrap();

    let unwind_outcome = catch_unwind(AssertUnwindSafe(|| {
        outer_rt.block_on(async {
            // Build the nested runtime and drop it immediately, while still
            // inside the outer runtime's async context.
            drop(runtime::Builder::new_current_thread().build().unwrap());
        });
    }));

    assert!(unwind_outcome.is_err());
    let payload = unwind_outcome.unwrap_err();
    let message: &'static str = payload.downcast_ref::<&'static str>().unwrap();

    assert!(
        message.contains("Cannot drop a runtime"),
        "Wrong panic message: {:?}",
        message
    );
}
141 | |
/// Checks that a nested `current_thread` runtime can be shut down with a
/// zero-length `shutdown_timeout` from inside another runtime's `block_on`.
#[test]
fn can_shutdown_with_zero_timeout_in_runtime() {
    let outer_rt = runtime::Runtime::new().unwrap();

    outer_rt.block_on(async {
        let nested_rt = runtime::Builder::new_current_thread().build().unwrap();
        nested_rt.shutdown_timeout(Duration::from_nanos(0));
    });
}
153 | |
/// Checks that a nested `current_thread` runtime can be shut down with
/// `shutdown_background` from inside another runtime's `block_on`.
#[test]
fn can_shutdown_now_in_runtime() {
    let outer_rt = runtime::Runtime::new().unwrap();

    outer_rt.block_on(async {
        let nested_rt = runtime::Builder::new_current_thread().build().unwrap();
        nested_rt.shutdown_background();
    });
}
165 | |
/// Drains a pre-filled 200-item stream with `futures::executor::block_on`
/// inside `block_in_place` in a spawned task, and fails if the task has not
/// completed within one second.
#[test]
fn coop_disabled_in_block_in_place() {
    use tokio_stream::StreamExt;

    let outer_rt = runtime::Builder::new_multi_thread()
        .enable_time()
        .build()
        .unwrap();

    // Fill the channel up front and close it so the stream terminates.
    let (sender, stream) = support::mpsc_stream::unbounded_channel_stream();
    for item in 0..200 {
        sender.send(item).unwrap();
    }
    drop(sender);

    outer_rt.block_on(async move {
        let drain_task = tokio::spawn(async move {
            task::block_in_place(move || {
                futures::executor::block_on(async move {
                    let seen = stream.fold(0, |count, _| count + 1).await;
                    assert_eq!(seen, 200);
                })
            })
        });

        // Bound the wait so that a stalled drain fails instead of hanging.
        let joined = time::timeout(Duration::from_secs(1), drain_task).await;
        joined.expect("timed out (probably hanging)").unwrap()
    });
}
196 | |
/// Drains a pre-filled 200-item stream with `futures::executor::block_on`
/// inside `block_in_place`, called directly from `block_on` on a worker
/// thread. A second thread reports a timeout after one second; whichever
/// message arrives first decides the test outcome.
#[test]
fn coop_disabled_in_block_in_place_in_block_on() {
    let (watchdog_tx, outcome_rx) = std::sync::mpsc::channel();
    let worker_tx = watchdog_tx.clone();

    // Worker: runs the scenario and reports success when it finishes.
    thread::spawn(move || {
        let outer_rt = runtime::Runtime::new().unwrap();

        let (sender, stream) = support::mpsc_stream::unbounded_channel_stream();
        for item in 0..200 {
            sender.send(item).unwrap();
        }
        drop(sender);

        outer_rt.block_on(async move {
            task::block_in_place(move || {
                futures::executor::block_on(async move {
                    use tokio_stream::StreamExt;
                    let seen = stream.fold(0, |count, _| count + 1).await;
                    assert_eq!(seen, 200);
                })
            })
        });

        let _ = worker_tx.send(Ok(()));
    });

    // Watchdog: reports an error if the worker has not finished in time.
    thread::spawn(move || {
        thread::sleep(Duration::from_secs(1));
        let _ = watchdog_tx.send(Err("timed out (probably hanging)"));
    });

    let first_report = outcome_rx.recv().unwrap();
    first_report.unwrap();
}
230 | |
/// With the test started in paused-time mode, checks that a `time::timeout`
/// wrapped around a `spawn_blocking` task does not fire before the blocking
/// task completes.
#[cfg(feature = "test-util")]
#[tokio::test(start_paused = true)]
async fn blocking_when_paused() {
    // Do not auto-advance time when we have started a blocking task that has
    // not yet finished.
    let short_task = task::spawn_blocking(|| thread::sleep(Duration::from_millis(1)));
    let short_outcome = time::timeout(Duration::from_secs(3), short_task).await;
    short_outcome
        .expect("timeout should not trigger")
        .expect("blocking task should finish");

    // Really: Do not auto-advance time, even if the timeout is short and the
    // blocking task runs for longer than that. It doesn't matter: Tokio time
    // is paused; system time is not.
    let long_task = task::spawn_blocking(|| thread::sleep(Duration::from_millis(50)));
    let long_outcome = time::timeout(Duration::from_millis(1), long_task).await;
    long_outcome
        .expect("timeout should not trigger")
        .expect("blocking task should finish");
}
255 | |
/// With the test started in paused-time mode, awaits a short `spawn_blocking`
/// task under a 15-second Tokio timeout and checks, using the wall clock,
/// that the whole thing took less than 10 real seconds.
#[cfg(feature = "test-util")]
#[tokio::test(start_paused = true)]
async fn blocking_task_wakes_paused_runtime() {
    // Wall-clock reference; unaffected by Tokio's paused clock.
    let wall_start = std::time::Instant::now();

    let blocking = task::spawn_blocking(|| thread::sleep(Duration::from_millis(1)));
    let outcome = time::timeout(Duration::from_secs(15), blocking).await;
    outcome
        .expect("timeout should not trigger")
        .expect("blocking task should finish");

    assert!(
        wall_start.elapsed() < Duration::from_secs(10),
        "completing a spawn_blocking should wake the scheduler if it's parked while time is paused"
    );
}
272 | |
/// With the test started in paused-time mode, starts a short `spawn_blocking`
/// task without awaiting it, sleeps 15 seconds of Tokio time, then joins the
/// task and checks, using the wall clock, that less than 10 real seconds
/// have passed.
#[cfg(feature = "test-util")]
#[tokio::test(start_paused = true)]
async fn unawaited_blocking_task_wakes_paused_runtime() {
    // Wall-clock reference; unaffected by Tokio's paused clock.
    let wall_start = std::time::Instant::now();

    // When this task finishes, time should auto-advance, even though the
    // JoinHandle has not been awaited yet.
    let blocking = task::spawn_blocking(|| {
        thread::sleep(Duration::from_millis(1));
    });

    // Go through the imported `time` module directly, as the other tests in
    // this file do, rather than reaching it via a `crate::` path.
    time::sleep(Duration::from_secs(15)).await;
    blocking.await.expect("blocking task should finish");
    assert!(
        wall_start.elapsed() < Duration::from_secs(10),
        "completing a spawn_blocking should wake the scheduler if it's parked while time is paused"
    );
}
291 | |
/// With the test started in paused-time mode, awaits a `spawn_blocking` task
/// that panics, under a 15-second Tokio timeout. Checks that the join result
/// is an error and, using the wall clock, that less than 10 real seconds
/// have passed.
#[cfg(panic = "unwind")]
#[cfg(feature = "test-util")]
#[tokio::test(start_paused = true)]
async fn panicking_blocking_task_wakes_paused_runtime() {
    // Wall-clock reference; unaffected by Tokio's paused clock.
    let wall_start = std::time::Instant::now();

    let doomed = task::spawn_blocking(|| {
        thread::sleep(Duration::from_millis(1));
        panic!("blocking task panicked");
    });
    let join_outcome = time::timeout(Duration::from_secs(15), doomed)
        .await
        .expect("timeout should not trigger");

    assert!(join_outcome.is_err(), "blocking task should have panicked");
    assert!(
        wall_start.elapsed() < Duration::from_secs(10),
        "completing a spawn_blocking should wake the scheduler if it's parked while time is paused"
    );
}
312 | |