1#![warn(rust_2018_idioms)]
2#![cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi doesn't support threads
3
4use tokio::{runtime, task, time};
5use tokio_test::assert_ok;
6
7use std::thread;
8use std::time::Duration;
9
10mod support {
11 pub(crate) mod mpsc_stream;
12}
13
14#[tokio::test]
15async fn basic_blocking() {
16 // Run a few times
17 for _ in 0..100 {
18 let out = assert_ok!(
19 tokio::spawn(async {
20 assert_ok!(
21 task::spawn_blocking(|| {
22 thread::sleep(Duration::from_millis(5));
23 "hello"
24 })
25 .await
26 )
27 })
28 .await
29 );
30
31 assert_eq!(out, "hello");
32 }
33}
34
35#[tokio::test(flavor = "multi_thread")]
36async fn block_in_blocking() {
37 // Run a few times
38 for _ in 0..100 {
39 let out = assert_ok!(
40 tokio::spawn(async {
41 assert_ok!(
42 task::spawn_blocking(|| {
43 task::block_in_place(|| {
44 thread::sleep(Duration::from_millis(5));
45 });
46 "hello"
47 })
48 .await
49 )
50 })
51 .await
52 );
53
54 assert_eq!(out, "hello");
55 }
56}
57
58#[tokio::test(flavor = "multi_thread")]
59async fn block_in_block() {
60 // Run a few times
61 for _ in 0..100 {
62 let out = assert_ok!(
63 tokio::spawn(async {
64 task::block_in_place(|| {
65 task::block_in_place(|| {
66 thread::sleep(Duration::from_millis(5));
67 });
68 "hello"
69 })
70 })
71 .await
72 );
73
74 assert_eq!(out, "hello");
75 }
76}
77
78#[tokio::test(flavor = "current_thread")]
79#[should_panic]
80async fn no_block_in_current_thread_scheduler() {
81 task::block_in_place(|| {});
82}
83
84#[test]
85fn yes_block_in_threaded_block_on() {
86 let rt = runtime::Runtime::new().unwrap();
87 rt.block_on(async {
88 task::block_in_place(|| {});
89 });
90}
91
92#[test]
93#[should_panic]
94fn no_block_in_current_thread_block_on() {
95 let rt = runtime::Builder::new_current_thread().build().unwrap();
96 rt.block_on(async {
97 task::block_in_place(|| {});
98 });
99}
100
101#[test]
102fn can_enter_current_thread_rt_from_within_block_in_place() {
103 let outer = tokio::runtime::Runtime::new().unwrap();
104
105 outer.block_on(async {
106 tokio::task::block_in_place(|| {
107 let inner = tokio::runtime::Builder::new_current_thread()
108 .build()
109 .unwrap();
110
111 inner.block_on(async {})
112 })
113 });
114}
115
116#[test]
117#[cfg(panic = "unwind")]
118fn useful_panic_message_when_dropping_rt_in_rt() {
119 use std::panic::{catch_unwind, AssertUnwindSafe};
120
121 let outer = tokio::runtime::Runtime::new().unwrap();
122
123 let result = catch_unwind(AssertUnwindSafe(|| {
124 outer.block_on(async {
125 let _ = tokio::runtime::Builder::new_current_thread()
126 .build()
127 .unwrap();
128 });
129 }));
130
131 assert!(result.is_err());
132 let err = result.unwrap_err();
133 let err: &'static str = err.downcast_ref::<&'static str>().unwrap();
134
135 assert!(
136 err.contains("Cannot drop a runtime"),
137 "Wrong panic message: {:?}",
138 err
139 );
140}
141
142#[test]
143fn can_shutdown_with_zero_timeout_in_runtime() {
144 let outer = tokio::runtime::Runtime::new().unwrap();
145
146 outer.block_on(async {
147 let rt = tokio::runtime::Builder::new_current_thread()
148 .build()
149 .unwrap();
150 rt.shutdown_timeout(Duration::from_nanos(0));
151 });
152}
153
154#[test]
155fn can_shutdown_now_in_runtime() {
156 let outer = tokio::runtime::Runtime::new().unwrap();
157
158 outer.block_on(async {
159 let rt = tokio::runtime::Builder::new_current_thread()
160 .build()
161 .unwrap();
162 rt.shutdown_background();
163 });
164}
165
166#[test]
167fn coop_disabled_in_block_in_place() {
168 let outer = tokio::runtime::Builder::new_multi_thread()
169 .enable_time()
170 .build()
171 .unwrap();
172
173 let (tx, rx) = support::mpsc_stream::unbounded_channel_stream();
174
175 for i in 0..200 {
176 tx.send(i).unwrap();
177 }
178 drop(tx);
179
180 outer.block_on(async move {
181 let jh = tokio::spawn(async move {
182 tokio::task::block_in_place(move || {
183 futures::executor::block_on(async move {
184 use tokio_stream::StreamExt;
185 assert_eq!(rx.fold(0, |n, _| n + 1).await, 200);
186 })
187 })
188 });
189
190 tokio::time::timeout(Duration::from_secs(1), jh)
191 .await
192 .expect("timed out (probably hanging)")
193 .unwrap()
194 });
195}
196
197#[test]
198fn coop_disabled_in_block_in_place_in_block_on() {
199 let (done_tx, done_rx) = std::sync::mpsc::channel();
200 let done = done_tx.clone();
201 thread::spawn(move || {
202 let outer = tokio::runtime::Runtime::new().unwrap();
203
204 let (tx, rx) = support::mpsc_stream::unbounded_channel_stream();
205
206 for i in 0..200 {
207 tx.send(i).unwrap();
208 }
209 drop(tx);
210
211 outer.block_on(async move {
212 tokio::task::block_in_place(move || {
213 futures::executor::block_on(async move {
214 use tokio_stream::StreamExt;
215 assert_eq!(rx.fold(0, |n, _| n + 1).await, 200);
216 })
217 })
218 });
219
220 let _ = done.send(Ok(()));
221 });
222
223 thread::spawn(move || {
224 thread::sleep(Duration::from_secs(1));
225 let _ = done_tx.send(Err("timed out (probably hanging)"));
226 });
227
228 done_rx.recv().unwrap().unwrap();
229}
230
231#[cfg(feature = "test-util")]
232#[tokio::test(start_paused = true)]
233async fn blocking_when_paused() {
234 // Do not auto-advance time when we have started a blocking task that has
235 // not yet finished.
236 time::timeout(
237 Duration::from_secs(3),
238 task::spawn_blocking(|| thread::sleep(Duration::from_millis(1))),
239 )
240 .await
241 .expect("timeout should not trigger")
242 .expect("blocking task should finish");
243
244 // Really: Do not auto-advance time, even if the timeout is short and the
245 // blocking task runs for longer than that. It doesn't matter: Tokio time
246 // is paused; system time is not.
247 time::timeout(
248 Duration::from_millis(1),
249 task::spawn_blocking(|| thread::sleep(Duration::from_millis(50))),
250 )
251 .await
252 .expect("timeout should not trigger")
253 .expect("blocking task should finish");
254}
255
256#[cfg(feature = "test-util")]
257#[tokio::test(start_paused = true)]
258async fn blocking_task_wakes_paused_runtime() {
259 let t0 = std::time::Instant::now();
260 time::timeout(
261 Duration::from_secs(15),
262 task::spawn_blocking(|| thread::sleep(Duration::from_millis(1))),
263 )
264 .await
265 .expect("timeout should not trigger")
266 .expect("blocking task should finish");
267 assert!(
268 t0.elapsed() < Duration::from_secs(10),
269 "completing a spawn_blocking should wake the scheduler if it's parked while time is paused"
270 );
271}
272
273#[cfg(feature = "test-util")]
274#[tokio::test(start_paused = true)]
275async fn unawaited_blocking_task_wakes_paused_runtime() {
276 let t0 = std::time::Instant::now();
277
278 // When this task finishes, time should auto-advance, even though the
279 // JoinHandle has not been awaited yet.
280 let a = task::spawn_blocking(|| {
281 thread::sleep(Duration::from_millis(1));
282 });
283
284 crate::time::sleep(Duration::from_secs(15)).await;
285 a.await.expect("blocking task should finish");
286 assert!(
287 t0.elapsed() < Duration::from_secs(10),
288 "completing a spawn_blocking should wake the scheduler if it's parked while time is paused"
289 );
290}
291
292#[cfg(panic = "unwind")]
293#[cfg(feature = "test-util")]
294#[tokio::test(start_paused = true)]
295async fn panicking_blocking_task_wakes_paused_runtime() {
296 let t0 = std::time::Instant::now();
297 let result = time::timeout(
298 Duration::from_secs(15),
299 task::spawn_blocking(|| {
300 thread::sleep(Duration::from_millis(1));
301 panic!("blocking task panicked");
302 }),
303 )
304 .await
305 .expect("timeout should not trigger");
306 assert!(result.is_err(), "blocking task should have panicked");
307 assert!(
308 t0.elapsed() < Duration::from_secs(10),
309 "completing a spawn_blocking should wake the scheduler if it's parked while time is paused"
310 );
311}
312