1 | #![warn (rust_2018_idioms)] |
2 | #![cfg (feature = "full" )] |
3 | |
4 | use tokio::runtime::Runtime; |
5 | use tokio::sync::oneshot; |
6 | use tokio::time::{timeout, Duration}; |
7 | use tokio_test::{assert_err, assert_ok}; |
8 | |
9 | use std::future::Future; |
10 | use std::pin::Pin; |
11 | use std::sync::atomic::{AtomicBool, Ordering}; |
12 | use std::task::{Context, Poll}; |
13 | use std::thread; |
14 | |
15 | mod support { |
16 | pub(crate) mod mpsc_stream; |
17 | } |
18 | |
/// Wraps the given tokens in a block that is compiled only when the
/// `tokio_unstable` cfg is set; without that cfg the block is removed.
macro_rules! cfg_metrics {
    ( $( $body:tt )* ) => {
        #[cfg(tokio_unstable)]
        {
            $( $body )*
        }
    };
}
27 | |
/// A task spawned on the runtime must not have sent its value before any
/// thread has driven the runtime through `block_on`.
#[test]
fn spawned_task_does_not_progress_without_block_on() {
    let runtime = rt();
    let (sender, mut receiver) = oneshot::channel();

    runtime.spawn(async move {
        assert_ok!(sender.send("hello"));
    });

    // Leave time for the task to run, should the runtime (incorrectly)
    // make progress on its own.
    let grace_period = Duration::from_millis(50);
    thread::sleep(grace_period);

    // Nothing has driven the runtime yet, so the channel must still be empty.
    assert_err!(receiver.try_recv());

    // Driving the runtime lets the spawned task deliver its message.
    let received = runtime.block_on(async { assert_ok!(receiver.await) });
    assert_eq!(received, "hello");
}
46 | |
/// Counts every poll of a stream drained by a spawned task and checks that
/// each step (initial poll, one message, channel close) costs exactly the
/// expected number of polls.
#[test]
fn no_extra_poll() {
    use pin_project_lite::pin_project;
    use std::pin::Pin;
    use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};
    use std::sync::Arc;
    use std::task::{Context, Poll};
    use tokio_stream::{Stream, StreamExt};

    pin_project! {
        // Stream adapter that bumps a shared counter on every `poll_next`.
        struct TrackPolls<S> {
            npolls: Arc<AtomicUsize>,
            #[pin]
            s: S,
        }
    }

    impl<S: Stream> Stream for TrackPolls<S> {
        type Item = S::Item;

        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            let projected = self.project();
            // Record the poll before forwarding it to the wrapped stream.
            projected.npolls.fetch_add(1, SeqCst);
            projected.s.poll_next(cx)
        }
    }

    let poll_count = Arc::new(AtomicUsize::new(0));
    let (tx, rx) = support::mpsc_stream::unbounded_channel_stream::<()>();

    // Boxed and pinned before being moved into the draining task.
    let mut tracked = Box::pin(TrackPolls {
        npolls: Arc::clone(&poll_count),
        s: rx,
    });

    let runtime = rt();

    // Runs the runtime until a single `yield_now` has completed.
    let run_one_turn = || {
        runtime.block_on(async {
            tokio::task::yield_now().await;
        })
    };
    let polls_so_far = || poll_count.load(SeqCst);

    // Drain the stream until it reports the end.
    runtime.spawn(async move {
        loop {
            if tracked.next().await.is_none() {
                break;
            }
        }
    });

    run_one_turn();
    // Only the initial poll has happened.
    assert_eq!(polls_so_far(), 1);

    tx.send(()).unwrap();
    run_one_turn();
    // Two additional polls: one returning the item, one returning Pending.
    assert_eq!(polls_so_far(), 1 + 2);

    drop(tx);
    run_one_turn();
    // One final poll observing the end of the stream.
    assert_eq!(polls_so_far(), 1 + 2 + 1);
}
114 | |
/// Drops a runtime that still owns a chain of pending tasks linked through
/// oneshot channels. There are no assertions: the test passes when the drop
/// returns without any task reaching its `unreachable!()`.
#[test]
fn acquire_mutex_in_drop() {
    use futures::future::pending;
    use tokio::task;

    let runtime = rt();

    let (first_tx, first_rx) = oneshot::channel();
    let (second_tx, second_rx) = oneshot::channel();

    // Waits on the second channel.
    runtime.spawn(async move {
        let _ = second_rx.await;
        unreachable!();
    });

    // Waits on the first channel and would then signal the second one.
    runtime.spawn(async move {
        let _ = first_rx.await;
        second_tx.send(()).unwrap();
        unreachable!();
    });

    // This task parks forever, so it never gets to signal the first channel.
    runtime.spawn(async move {
        pending::<()>().await;
        first_tx.send(()).unwrap();
    });

    // Run the runtime for one turn.
    runtime.block_on(async {
        task::yield_now().await;
    });

    // Shut the runtime down while all three tasks are still pending.
    drop(runtime);
}
150 | |
/// Checks that a task still owned by the runtime is dropped while a runtime
/// handle is available: the future's `Drop` impl records whether
/// `Handle::try_current()` succeeded.
#[test]
fn drop_tasks_in_context() {
    static SAW_RUNTIME_CONTEXT: AtomicBool = AtomicBool::new(false);

    // A future that never completes; only its destructor matters.
    struct ContextOnDrop;

    impl Drop for ContextOnDrop {
        fn drop(&mut self) {
            let inside_runtime = tokio::runtime::Handle::try_current().is_ok();
            if inside_runtime {
                SAW_RUNTIME_CONTEXT.store(true, Ordering::SeqCst);
            }
        }
    }

    impl Future for ContextOnDrop {
        type Output = ();

        fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
            Poll::Pending
        }
    }

    let runtime = rt();
    runtime.spawn(ContextOnDrop);
    // Dropping the runtime drops the task it still owns.
    drop(runtime);

    assert!(SAW_RUNTIME_CONTEXT.load(Ordering::SeqCst));
}
179 | |
/// Panics inside `block_on` while the runtime still holds a task whose
/// destructor sends on a oneshot channel awaited by another task. The test
/// expects the "boom" panic to be the one that surfaces.
#[test]
#[cfg_attr(target_os = "wasi", ignore = "Wasi does not support panic recovery")]
#[should_panic(expected = "boom")]
fn wake_in_drop_after_panic() {
    // Sends on the wrapped channel when dropped.
    struct WakeOnDrop {
        tx: Option<oneshot::Sender<()>>,
    }

    impl Drop for WakeOnDrop {
        fn drop(&mut self) {
            let sender = self.tx.take().unwrap();
            sender.send(()).unwrap();
        }
    }

    let (tx, rx) = oneshot::channel::<()>();
    let rt = rt();

    rt.spawn(async move {
        let _wake_on_drop = WakeOnDrop { tx: Some(tx) };
        // Park forever so the guard is only released when the task is dropped.
        futures::future::pending::<()>().await;
    });

    // Task waiting on the receiving half.
    let _receiver_task = rt.spawn(async move { rx.await });

    rt.block_on(async {
        tokio::task::yield_now().await;
        panic!("boom");
    });
}
209 | |
/// Spawns a task that spawns a second task, and checks the value sent by the
/// inner task is received. Under `tokio_unstable` it also checks the
/// scheduling counters: no remote schedules and two local ones.
#[test]
fn spawn_two() {
    let rt = rt();

    let received = rt.block_on(async {
        let (sender, receiver) = oneshot::channel();

        // Outer task whose only job is to spawn the inner task.
        tokio::spawn(async move {
            tokio::spawn(async move {
                sender.send("ZOMG").unwrap();
            });
        });

        assert_ok!(receiver.await)
    });

    assert_eq!(received, "ZOMG");

    cfg_metrics! {
        let metrics = rt.metrics();
        // The counters are read only after the runtime has been dropped.
        drop(rt);
        assert_eq!(0, metrics.remote_schedule_count());

        // Total local schedules across all workers.
        let local = (0..metrics.num_workers())
            .fold(0, |sum, worker| sum + metrics.worker_local_schedule_count(worker));

        assert_eq!(2, local);
    }
}
241 | |
/// Completes a spawned task with a value sent from a separate OS thread.
/// Under `tokio_unstable` it also checks the scheduling counters: one remote
/// schedule and one local schedule.
#[cfg_attr(target_os = "wasi", ignore = "WASI: std::thread::spawn not supported")]
#[test]
fn spawn_remote() {
    let rt = rt();

    let received = rt.block_on(async {
        let (sender, receiver) = oneshot::channel();

        let task = tokio::spawn(async move {
            // Send the value from a plain thread after a short delay.
            std::thread::spawn(move || {
                let delay = Duration::from_millis(10);
                std::thread::sleep(delay);
                sender.send("ZOMG").unwrap();
            });

            receiver.await.unwrap()
        });

        task.await.unwrap()
    });

    assert_eq!(received, "ZOMG");

    cfg_metrics! {
        let metrics = rt.metrics();
        // The counters are read only after the runtime has been dropped.
        drop(rt);
        assert_eq!(1, metrics.remote_schedule_count());

        // Total local schedules across all workers.
        let local = (0..metrics.num_workers())
            .fold(0, |sum, worker| sum + metrics.worker_local_schedule_count(worker));

        assert_eq!(1, local);
    }
}
277 | |
/// Uses `timeout` on a runtime built without `enable_time`/`enable_all` and
/// expects the panic message that says timers are disabled.
#[test]
#[cfg_attr(target_os = "wasi", ignore = "Wasi does not support panic recovery")]
#[should_panic(
    expected = "A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers."
)]
fn timeout_panics_when_no_time_handle() {
    // No drivers are enabled on this builder.
    let mut builder = tokio::runtime::Builder::new_current_thread();
    let runtime = builder.build().unwrap();

    runtime.block_on(async {
        // The sender is kept alive so the receiver does not resolve early.
        let (_tx, rx) = oneshot::channel::<()>();
        let _ = timeout(Duration::from_millis(20), rx).await;
    });
}
293 | |
/// Tests for runtime features that are only compiled with `tokio_unstable`.
#[cfg(tokio_unstable)]
mod unstable {
    use tokio::runtime::{Builder, RngSeed, UnhandledPanic};

    /// Builds a current-thread runtime configured with
    /// `UnhandledPanic::ShutdownRuntime`.
    fn panic_shutdown_rt() -> tokio::runtime::Runtime {
        let mut builder = Builder::new_current_thread();
        builder.unhandled_panic(UnhandledPanic::ShutdownRuntime);
        builder.build().unwrap()
    }

    /// `block_on` must panic with the shutdown message once a spawned task
    /// panics.
    #[test]
    #[should_panic(
        expected = "a spawned task panicked and the runtime is configured to shut down on unhandled panic"
    )]
    fn shutdown_on_panic() {
        let runtime = panic_shutdown_rt();

        runtime.block_on(async {
            tokio::spawn(async {
                panic!("boom");
            });

            // Never resolves on its own.
            futures::future::pending::<()>().await;
        })
    }

    /// After a spawned task has panicked inside `block_on` on another thread,
    /// awaiting a task spawned afterwards must yield an error.
    #[test]
    #[cfg_attr(target_os = "wasi", ignore = "Wasi does not support panic recovery")]
    fn spawns_do_nothing() {
        use std::sync::Arc;

        let owner = Arc::new(panic_shutdown_rt());
        let worker_rt = Arc::clone(&owner);

        let worker = std::thread::spawn(move || {
            worker_rt.block_on(async {
                tokio::spawn(async {
                    panic!("boom");
                });

                futures::future::pending::<()>().await;
            })
        });
        // The thread's outcome is intentionally ignored.
        let _ = worker.join();

        let handle = owner.spawn(async {});
        let outcome = futures::executor::block_on(handle);
        assert!(outcome.is_err());
    }

    /// Every thread blocked in `block_on` on the shared runtime must end in a
    /// panic once a spawned task panics.
    #[test]
    #[cfg_attr(target_os = "wasi", ignore = "Wasi does not support panic recovery")]
    fn shutdown_all_concurrent_block_on() {
        const N: usize = 2;
        use std::sync::{mpsc, Arc};

        let shared_rt = Arc::new(panic_shutdown_rt());
        let (ready_tx, ready_rx) = mpsc::channel();

        // Start N threads that each enter `block_on`, report in, then park.
        let workers: Vec<_> = (0..N)
            .map(|_| {
                let rt = Arc::clone(&shared_rt);
                let tx = ready_tx.clone();
                std::thread::spawn(move || {
                    rt.block_on(async {
                        tx.send(()).unwrap();
                        futures::future::pending::<()>().await;
                    });
                })
            })
            .collect();

        // Wait until every thread has reported from inside `block_on`.
        (0..N).for_each(|_| ready_rx.recv().unwrap());

        shared_rt.spawn(async {
            panic!("boom");
        });

        workers
            .into_iter()
            .for_each(|worker| assert!(worker.join().is_err()));
    }

    /// Two runtimes built from the same seed must produce the same pair of
    /// random values.
    #[test]
    fn rng_seed() {
        let seed = b"bytes used to generate seed";
        let seeded_runtime = || {
            Builder::new_current_thread()
                .rng_seed(RngSeed::from_bytes(seed))
                .build()
                .unwrap()
        };

        let first_rt = seeded_runtime();
        let first_draws = first_rt.block_on(async {
            (
                tokio::macros::support::thread_rng_n(100),
                tokio::macros::support::thread_rng_n(100),
            )
        });

        let second_rt = seeded_runtime();
        let second_draws = second_rt.block_on(async {
            (
                tokio::macros::support::thread_rng_n(100),
                tokio::macros::support::thread_rng_n(100),
            )
        });

        assert_eq!(first_draws, second_draws);
    }

    /// Two runtimes built from the same seed must agree on the values drawn
    /// in each of two successive `block_on` calls.
    #[test]
    fn rng_seed_multi_enter() {
        // Draws two values from the runtime's thread-local RNG.
        fn draw_pair() -> (u32, u32) {
            let first = tokio::macros::support::thread_rng_n(100);
            let second = tokio::macros::support::thread_rng_n(100);

            (first, second)
        }

        let seed = b"bytes used to generate seed";
        let seeded_runtime = || {
            Builder::new_current_thread()
                .rng_seed(RngSeed::from_bytes(seed))
                .build()
                .unwrap()
        };

        let rt1 = seeded_runtime();
        let rt1_first_entry = rt1.block_on(async { draw_pair() });
        let rt1_second_entry = rt1.block_on(async { draw_pair() });

        let rt2 = seeded_runtime();
        let rt2_first_entry = rt2.block_on(async { draw_pair() });
        let rt2_second_entry = rt2.block_on(async { draw_pair() });

        assert_eq!(rt1_first_entry, rt2_first_entry);
        assert_eq!(rt1_second_entry, rt2_second_entry);
    }
}
442 | |
443 | fn rt() -> Runtime { |
444 | tokio::runtime::Builder::new_current_thread() |
445 | .enable_all() |
446 | .build() |
447 | .unwrap() |
448 | } |
449 | |