1#![warn(rust_2018_idioms)]
2#![cfg(feature = "full")]
3
4use tokio::runtime::Runtime;
5use tokio::sync::oneshot;
6use tokio::time::{timeout, Duration};
7use tokio_test::{assert_err, assert_ok};
8
9use std::future::Future;
10use std::pin::Pin;
11use std::sync::atomic::{AtomicBool, Ordering};
12use std::task::{Context, Poll};
13use std::thread;
14
// Shared test helpers. `mpsc_stream` supplies `unbounded_channel_stream`,
// which `no_extra_poll` uses to obtain the sender/stream pair it instruments.
mod support {
    pub(crate) mod mpsc_stream;
}
18
// Wraps the given statements in a block that is only compiled when the
// `tokio_unstable` cfg is set; without it the statements are dropped entirely.
macro_rules! cfg_metrics {
    ($($body:tt)*) => {
        #[cfg(tokio_unstable)]
        {
            $($body)*
        }
    };
}
27
/// A task spawned onto the runtime returned by `rt()` is expected to stay idle
/// until the runtime is driven through `block_on`.
#[test]
fn spawned_task_does_not_progress_without_block_on() {
    let runtime = rt();
    let (sender, mut receiver) = oneshot::channel();

    runtime.spawn(async move {
        assert_ok!(sender.send("hello"));
    });

    // Give the task ample wall-clock time; it must still not have sent anything.
    thread::sleep(Duration::from_millis(50));
    assert_err!(receiver.try_recv());

    // Driving the runtime lets the queued task run and deliver its message.
    let received = runtime.block_on(async { assert_ok!(receiver.await) });
    assert_eq!(received, "hello");
}
46
/// Counts how often a spawned task polls its stream and checks that the
/// runtime performs no polls beyond the ones each event requires.
#[test]
fn no_extra_poll() {
    use pin_project_lite::pin_project;
    use std::pin::Pin;
    use std::sync::{
        atomic::{AtomicUsize, Ordering::SeqCst},
        Arc,
    };
    use std::task::{Context, Poll};
    use tokio_stream::{Stream, StreamExt};

    pin_project! {
        // Stream adapter that bumps a shared counter on every `poll_next` call
        // before forwarding to the wrapped stream.
        struct CountingStream<S> {
            polls: Arc<AtomicUsize>,
            #[pin]
            inner: S,
        }
    }

    impl<S: Stream> Stream for CountingStream<S> {
        type Item = S::Item;

        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            let projected = self.project();
            projected.polls.fetch_add(1, SeqCst);
            projected.inner.poll_next(cx)
        }
    }

    let npolls = Arc::new(AtomicUsize::new(0));
    let (tx, rx) = support::mpsc_stream::unbounded_channel_stream::<()>();

    let rt = rt();

    // TODO: the heap pin could probably be avoided, but it is harmless here.
    let mut stream = Box::pin(CountingStream {
        polls: Arc::clone(&npolls),
        inner: rx,
    });

    // Enter the runtime and yield once, as the other tests do to "tick the loop".
    let tick = || {
        rt.block_on(async {
            tokio::task::yield_now().await;
        })
    };

    rt.spawn(async move { while stream.next().await.is_some() {} });
    tick();

    // Exactly one poll so far: the initial one.
    assert_eq!(npolls.load(SeqCst), 1);

    tx.send(()).unwrap();
    tick();

    // Two more polls: one yielding `Some(())`, then one returning `Pending`.
    assert_eq!(npolls.load(SeqCst), 1 + 2);

    drop(tx);
    tick();

    // One final poll: the one yielding `None`.
    assert_eq!(npolls.load(SeqCst), 1 + 2 + 1);
}
114
/// Drops a runtime that still owns three tasks chained together through
/// oneshot channels. Neither `unreachable!()` may be hit, i.e. the waiting
/// tasks must never resume past their `.await`.
#[test]
fn acquire_mutex_in_drop() {
    let (first_tx, first_rx) = oneshot::channel();
    let (second_tx, second_rx) = oneshot::channel();

    let runtime = rt();

    // Waits on the second channel.
    runtime.spawn(async move {
        let _ = second_rx.await;
        unreachable!();
    });

    // Waits on the first channel and would then signal the second one.
    runtime.spawn(async move {
        let _ = first_rx.await;
        second_tx.send(()).unwrap();
        unreachable!();
    });

    // This task never completes, so it never notifies the first channel.
    runtime.spawn(async move {
        futures::future::pending::<()>().await;
        first_tx.send(()).unwrap();
    });

    // Tick the loop once.
    runtime.block_on(async {
        tokio::task::yield_now().await;
    });

    // Tear the runtime down while all three tasks are still alive.
    drop(runtime);
}
150
/// Checks that a task dropped as part of dropping the runtime can still
/// obtain a runtime handle via `Handle::try_current()` inside its `Drop` impl.
#[test]
fn drop_tasks_in_context() {
    // Set by the destructor below when a runtime handle was available.
    static HANDLE_SEEN: AtomicBool = AtomicBool::new(false);

    // Future that never completes; only its destructor matters.
    struct ProbeOnDrop;

    impl Future for ProbeOnDrop {
        type Output = ();

        fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
            Poll::Pending
        }
    }

    impl Drop for ProbeOnDrop {
        fn drop(&mut self) {
            if tokio::runtime::Handle::try_current().is_err() {
                return;
            }
            HANDLE_SEEN.store(true, Ordering::SeqCst);
        }
    }

    let runtime = rt();
    runtime.spawn(ProbeOnDrop);
    drop(runtime);

    assert!(HANDLE_SEEN.load(Ordering::SeqCst));
}
179
/// Panics inside `block_on` while the runtime still owns a task whose
/// destructor sends on a oneshot channel that a second task is awaiting.
/// The test passes when the original "boom" panic is what propagates.
#[test]
#[cfg_attr(target_os = "wasi", ignore = "Wasi does not support panic recovery")]
#[should_panic(expected = "boom")]
fn wake_in_drop_after_panic() {
    let (tx, rx) = oneshot::channel::<()>();

    // Sends on the wrapped channel when dropped.
    struct SendOnDrop(Option<oneshot::Sender<()>>);

    impl Drop for SendOnDrop {
        fn drop(&mut self) {
            let sender = self.0.take().unwrap();
            sender.send(()).unwrap();
        }
    }

    let rt = rt();

    rt.spawn(async move {
        let _send_on_drop = SendOnDrop(Some(tx));
        // Never completes, so the guard is only dropped together with the task.
        futures::future::pending::<()>().await;
    });

    // Keep the join handle alive until the end of the test body.
    let _join = rt.spawn(async move { rx.await });

    rt.block_on(async {
        tokio::task::yield_now().await;
        panic!("boom");
    });
}
209
/// Spawns a task that spawns a second task; the inner task sends a value back
/// to the `block_on` future. With `tokio_unstable`, also checks that both
/// spawns were counted as local schedules and none as remote.
#[test]
fn spawn_two() {
    let rt = rt();

    let received = rt.block_on(async {
        let (tx, rx) = oneshot::channel();

        let inner = async move {
            tx.send("ZOMG").unwrap();
        };
        tokio::spawn(async move {
            tokio::spawn(inner);
        });

        assert_ok!(rx.await)
    });

    assert_eq!(received, "ZOMG");

    cfg_metrics! {
        let metrics = rt.metrics();
        drop(rt);
        assert_eq!(0, metrics.remote_schedule_count());

        // Total local schedules across every worker.
        let local = (0..metrics.num_workers())
            .fold(0, |acc, worker| acc + metrics.worker_local_schedule_count(worker));

        assert_eq!(2, local);
    }
}
241
/// Spawns a task that is completed by a send from a separate OS thread. With
/// `tokio_unstable`, also checks that exactly one remote and one local
/// schedule were recorded.
#[cfg_attr(target_os = "wasi", ignore = "WASI: std::thread::spawn not supported")]
#[test]
fn spawn_remote() {
    let rt = rt();

    let received = rt.block_on(async {
        let (tx, rx) = oneshot::channel();

        let task = tokio::spawn(async move {
            // The send happens on another thread after a short delay.
            thread::spawn(move || {
                thread::sleep(Duration::from_millis(10));
                tx.send("ZOMG").unwrap();
            });

            rx.await.unwrap()
        });

        task.await.unwrap()
    });

    assert_eq!(received, "ZOMG");

    cfg_metrics! {
        let metrics = rt.metrics();
        drop(rt);
        assert_eq!(1, metrics.remote_schedule_count());

        // Total local schedules across every worker.
        let local = (0..metrics.num_workers())
            .fold(0, |acc, worker| acc + metrics.worker_local_schedule_count(worker));

        assert_eq!(1, local);
    }
}
277
/// Using `timeout` on a runtime built without `enable_time` must panic with
/// the message given in `should_panic` below.
#[test]
#[cfg_attr(target_os = "wasi", ignore = "Wasi does not support panic recovery")]
#[should_panic(
    expected = "A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers."
)]
fn timeout_panics_when_no_time_handle() {
    // Deliberately no `enable_time` / `enable_all` on this builder.
    let mut builder = tokio::runtime::Builder::new_current_thread();
    let runtime = builder.build().unwrap();

    runtime.block_on(async {
        let (_sender, receiver) = oneshot::channel::<()>();
        let _ = timeout(Duration::from_millis(20), receiver).await;
    });
}
293
/// Tests for runtime builder options that are only compiled when the
/// `tokio_unstable` cfg is set.
#[cfg(tokio_unstable)]
mod unstable {
    use tokio::runtime::{Builder, RngSeed, Runtime, UnhandledPanic};

    /// Builds a current-thread runtime configured with
    /// `UnhandledPanic::ShutdownRuntime`.
    fn shutdown_on_panic_rt() -> Runtime {
        Builder::new_current_thread()
            .unhandled_panic(UnhandledPanic::ShutdownRuntime)
            .build()
            .unwrap()
    }

    /// Builds a current-thread runtime whose RNG seed is derived from `seed`.
    fn seeded_rt(seed: &[u8]) -> Runtime {
        Builder::new_current_thread()
            .rng_seed(RngSeed::from_bytes(seed))
            .build()
            .unwrap()
    }

    /// Draws two consecutive values from `thread_rng_n(100)`.
    fn two_rand_values() -> (u32, u32) {
        let rand_1 = tokio::macros::support::thread_rng_n(100);
        let rand_2 = tokio::macros::support::thread_rng_n(100);

        (rand_1, rand_2)
    }

    /// A panic in a spawned task must make `block_on` panic with the
    /// shutdown message.
    ///
    /// Ignored on WASI like the other panic-dependent tests in this file,
    /// since the test only passes if the panic can be recovered from.
    #[test]
    #[cfg_attr(target_os = "wasi", ignore = "Wasi does not support panic recovery")]
    #[should_panic(
        expected = "a spawned task panicked and the runtime is configured to shut down on unhandled panic"
    )]
    fn shutdown_on_panic() {
        let rt = shutdown_on_panic_rt();

        rt.block_on(async {
            tokio::spawn(async {
                panic!("boom");
            });

            futures::future::pending::<()>().await;
        })
    }

    /// After a task panic has shut the runtime down, a task spawned
    /// afterwards must resolve to an error.
    #[test]
    #[cfg_attr(target_os = "wasi", ignore = "Wasi does not support panic recovery")]
    fn spawns_do_nothing() {
        use std::sync::Arc;

        let rt1 = Arc::new(shutdown_on_panic_rt());
        let rt2 = rt1.clone();

        // Trigger the panic on a helper thread; its join result is
        // deliberately discarded.
        let _ = std::thread::spawn(move || {
            rt2.block_on(async {
                tokio::spawn(async {
                    panic!("boom");
                });

                futures::future::pending::<()>().await;
            })
        })
        .join();

        let task = rt1.spawn(async {});
        let res = futures::executor::block_on(task);
        assert!(res.is_err());
    }

    /// With several threads inside `block_on` on the same runtime, a task
    /// panic must make every one of those threads panic.
    #[test]
    #[cfg_attr(target_os = "wasi", ignore = "Wasi does not support panic recovery")]
    fn shutdown_all_concurrent_block_on() {
        const N: usize = 2;
        use std::sync::{mpsc, Arc};

        let rt = Arc::new(shutdown_on_panic_rt());
        let mut ths = vec![];
        let (tx, rx) = mpsc::channel();

        for _ in 0..N {
            let rt = rt.clone();
            let tx = tx.clone();
            ths.push(std::thread::spawn(move || {
                rt.block_on(async {
                    // Report that this thread has entered `block_on`.
                    tx.send(()).unwrap();
                    futures::future::pending::<()>().await;
                });
            }));
        }

        // Wait until every thread has reported in before triggering the panic.
        for _ in 0..N {
            rx.recv().unwrap();
        }

        rt.spawn(async {
            panic!("boom");
        });

        for th in ths {
            assert!(th.join().is_err());
        }
    }

    /// Two runtimes built from the same seed must produce the same values.
    #[test]
    fn rng_seed() {
        let seed = b"bytes used to generate seed";

        let rt1 = seeded_rt(seed);
        let rt1_values = rt1.block_on(async { two_rand_values() });

        let rt2 = seeded_rt(seed);
        let rt2_values = rt2.block_on(async { two_rand_values() });

        assert_eq!(rt1_values, rt2_values);
    }

    /// Two runtimes built from the same seed must agree on the values seen
    /// in each successive `block_on` call.
    #[test]
    fn rng_seed_multi_enter() {
        let seed = b"bytes used to generate seed";

        let rt1 = seeded_rt(seed);
        let rt1_values_1 = rt1.block_on(async { two_rand_values() });
        let rt1_values_2 = rt1.block_on(async { two_rand_values() });

        let rt2 = seeded_rt(seed);
        let rt2_values_1 = rt2.block_on(async { two_rand_values() });
        let rt2_values_2 = rt2.block_on(async { two_rand_values() });

        assert_eq!(rt1_values_1, rt2_values_1);
        assert_eq!(rt1_values_2, rt2_values_2);
    }
}
442
443fn rt() -> Runtime {
444 tokio::runtime::Builder::new_current_thread()
445 .enable_all()
446 .build()
447 .unwrap()
448}
449