1use futures::channel::oneshot;
2use futures::executor::LocalPool;
3use futures::future::{self, lazy, poll_fn, Future};
4use futures::task::{Context, LocalSpawn, LocalSpawnExt, Poll, Spawn, SpawnExt, Waker};
5use std::cell::{Cell, RefCell};
6use std::pin::Pin;
7use std::rc::Rc;
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::sync::Arc;
10use std::thread;
11use std::time::Duration;
12
13struct Pending(Rc<()>);
14
15impl Future for Pending {
16 type Output = ();
17
18 fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
19 Poll::Pending
20 }
21}
22
23fn pending() -> Pending {
24 Pending(Rc::new(()))
25}
26
27#[test]
28fn run_until_single_future() {
29 let mut cnt = 0;
30
31 {
32 let mut pool = LocalPool::new();
33 let fut = lazy(|_| {
34 cnt += 1;
35 });
36 pool.run_until(fut);
37 }
38
39 assert_eq!(cnt, 1);
40}
41
42#[test]
43fn run_until_ignores_spawned() {
44 let mut pool = LocalPool::new();
45 let spawn = pool.spawner();
46 spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap();
47 pool.run_until(lazy(|_| ()));
48}
49
50#[test]
51fn run_until_executes_spawned() {
52 let (tx, rx) = oneshot::channel();
53 let mut pool = LocalPool::new();
54 let spawn = pool.spawner();
55 spawn
56 .spawn_local_obj(
57 Box::pin(lazy(move |_| {
58 tx.send(()).unwrap();
59 }))
60 .into(),
61 )
62 .unwrap();
63 pool.run_until(rx).unwrap();
64}
65
66#[test]
67fn run_returns_if_empty() {
68 let mut pool = LocalPool::new();
69 pool.run();
70 pool.run();
71}
72
73#[test]
74fn run_executes_spawned() {
75 let cnt = Rc::new(Cell::new(0));
76 let cnt2 = cnt.clone();
77
78 let mut pool = LocalPool::new();
79 let spawn = pool.spawner();
80 let spawn2 = pool.spawner();
81
82 spawn
83 .spawn_local_obj(
84 Box::pin(lazy(move |_| {
85 spawn2
86 .spawn_local_obj(
87 Box::pin(lazy(move |_| {
88 cnt2.set(cnt2.get() + 1);
89 }))
90 .into(),
91 )
92 .unwrap();
93 }))
94 .into(),
95 )
96 .unwrap();
97
98 pool.run();
99
100 assert_eq!(cnt.get(), 1);
101}
102
103#[test]
104fn run_spawn_many() {
105 const ITER: usize = 200;
106
107 let cnt = Rc::new(Cell::new(0));
108
109 let mut pool = LocalPool::new();
110 let spawn = pool.spawner();
111
112 for _ in 0..ITER {
113 let cnt = cnt.clone();
114 spawn
115 .spawn_local_obj(
116 Box::pin(lazy(move |_| {
117 cnt.set(cnt.get() + 1);
118 }))
119 .into(),
120 )
121 .unwrap();
122 }
123
124 pool.run();
125
126 assert_eq!(cnt.get(), ITER);
127}
128
129#[test]
130fn try_run_one_returns_if_empty() {
131 let mut pool = LocalPool::new();
132 assert!(!pool.try_run_one());
133}
134
135#[test]
136fn try_run_one_executes_one_ready() {
137 const ITER: usize = 200;
138
139 let cnt = Rc::new(Cell::new(0));
140
141 let mut pool = LocalPool::new();
142 let spawn = pool.spawner();
143
144 for _ in 0..ITER {
145 spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap();
146
147 let cnt = cnt.clone();
148 spawn
149 .spawn_local_obj(
150 Box::pin(lazy(move |_| {
151 cnt.set(cnt.get() + 1);
152 }))
153 .into(),
154 )
155 .unwrap();
156
157 spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap();
158 }
159
160 for i in 0..ITER {
161 assert_eq!(cnt.get(), i);
162 assert!(pool.try_run_one());
163 assert_eq!(cnt.get(), i + 1);
164 }
165 assert!(!pool.try_run_one());
166}
167
168#[test]
169fn try_run_one_returns_on_no_progress() {
170 const ITER: usize = 10;
171
172 let cnt = Rc::new(Cell::new(0));
173
174 let mut pool = LocalPool::new();
175 let spawn = pool.spawner();
176
177 let waker: Rc<Cell<Option<Waker>>> = Rc::new(Cell::new(None));
178 {
179 let cnt = cnt.clone();
180 let waker = waker.clone();
181 spawn
182 .spawn_local_obj(
183 Box::pin(poll_fn(move |ctx| {
184 cnt.set(cnt.get() + 1);
185 waker.set(Some(ctx.waker().clone()));
186 if cnt.get() == ITER {
187 Poll::Ready(())
188 } else {
189 Poll::Pending
190 }
191 }))
192 .into(),
193 )
194 .unwrap();
195 }
196
197 for i in 0..ITER - 1 {
198 assert_eq!(cnt.get(), i);
199 assert!(!pool.try_run_one());
200 assert_eq!(cnt.get(), i + 1);
201 let w = waker.take();
202 assert!(w.is_some());
203 w.unwrap().wake();
204 }
205 assert!(pool.try_run_one());
206 assert_eq!(cnt.get(), ITER);
207}
208
209#[test]
210fn try_run_one_runs_sub_futures() {
211 let mut pool = LocalPool::new();
212 let spawn = pool.spawner();
213 let cnt = Rc::new(Cell::new(0));
214
215 let inner_spawner = spawn.clone();
216 let cnt1 = cnt.clone();
217 spawn
218 .spawn_local_obj(
219 Box::pin(poll_fn(move |_| {
220 cnt1.set(cnt1.get() + 1);
221
222 let cnt2 = cnt1.clone();
223 inner_spawner
224 .spawn_local_obj(Box::pin(lazy(move |_| cnt2.set(cnt2.get() + 1))).into())
225 .unwrap();
226
227 Poll::Pending
228 }))
229 .into(),
230 )
231 .unwrap();
232
233 pool.try_run_one();
234 assert_eq!(cnt.get(), 2);
235}
236
237#[test]
238fn run_until_stalled_returns_if_empty() {
239 let mut pool = LocalPool::new();
240 pool.run_until_stalled();
241 pool.run_until_stalled();
242}
243
244#[test]
245fn run_until_stalled_returns_multiple_times() {
246 let mut pool = LocalPool::new();
247 let spawn = pool.spawner();
248 let cnt = Rc::new(Cell::new(0));
249
250 let cnt1 = cnt.clone();
251 spawn.spawn_local_obj(Box::pin(lazy(move |_| cnt1.set(cnt1.get() + 1))).into()).unwrap();
252 pool.run_until_stalled();
253 assert_eq!(cnt.get(), 1);
254
255 let cnt2 = cnt.clone();
256 spawn.spawn_local_obj(Box::pin(lazy(move |_| cnt2.set(cnt2.get() + 1))).into()).unwrap();
257 pool.run_until_stalled();
258 assert_eq!(cnt.get(), 2);
259}
260
261#[test]
262fn run_until_stalled_runs_spawned_sub_futures() {
263 let mut pool = LocalPool::new();
264 let spawn = pool.spawner();
265 let cnt = Rc::new(Cell::new(0));
266
267 let inner_spawner = spawn.clone();
268 let cnt1 = cnt.clone();
269 spawn
270 .spawn_local_obj(
271 Box::pin(poll_fn(move |_| {
272 cnt1.set(cnt1.get() + 1);
273
274 let cnt2 = cnt1.clone();
275 inner_spawner
276 .spawn_local_obj(Box::pin(lazy(move |_| cnt2.set(cnt2.get() + 1))).into())
277 .unwrap();
278
279 Poll::Pending
280 }))
281 .into(),
282 )
283 .unwrap();
284
285 pool.run_until_stalled();
286 assert_eq!(cnt.get(), 2);
287}
288
289#[test]
290fn run_until_stalled_executes_all_ready() {
291 const ITER: usize = if cfg!(miri) { 50 } else { 200 };
292 const PER_ITER: usize = 3;
293
294 let cnt = Rc::new(Cell::new(0));
295
296 let mut pool = LocalPool::new();
297 let spawn = pool.spawner();
298
299 for i in 0..ITER {
300 for _ in 0..PER_ITER {
301 spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap();
302
303 let cnt = cnt.clone();
304 spawn
305 .spawn_local_obj(
306 Box::pin(lazy(move |_| {
307 cnt.set(cnt.get() + 1);
308 }))
309 .into(),
310 )
311 .unwrap();
312
313 // also add some pending tasks to test if they are ignored
314 spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap();
315 }
316 assert_eq!(cnt.get(), i * PER_ITER);
317 pool.run_until_stalled();
318 assert_eq!(cnt.get(), (i + 1) * PER_ITER);
319 }
320}
321
322#[test]
323#[should_panic]
324fn nesting_run() {
325 let mut pool = LocalPool::new();
326 let spawn = pool.spawner();
327
328 spawn
329 .spawn_obj(
330 Box::pin(lazy(|_| {
331 let mut pool = LocalPool::new();
332 pool.run();
333 }))
334 .into(),
335 )
336 .unwrap();
337
338 pool.run();
339}
340
341#[test]
342#[should_panic]
343fn nesting_run_run_until_stalled() {
344 let mut pool = LocalPool::new();
345 let spawn = pool.spawner();
346
347 spawn
348 .spawn_obj(
349 Box::pin(lazy(|_| {
350 let mut pool = LocalPool::new();
351 pool.run_until_stalled();
352 }))
353 .into(),
354 )
355 .unwrap();
356
357 pool.run();
358}
359
360#[test]
361fn tasks_are_scheduled_fairly() {
362 let state = Rc::new(RefCell::new([0, 0]));
363
364 struct Spin {
365 state: Rc<RefCell<[i32; 2]>>,
366 idx: usize,
367 }
368
369 impl Future for Spin {
370 type Output = ();
371
372 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
373 let mut state = self.state.borrow_mut();
374
375 if self.idx == 0 {
376 let diff = state[0] - state[1];
377
378 assert!(diff.abs() <= 1);
379
380 if state[0] >= 50 {
381 return Poll::Ready(());
382 }
383 }
384
385 state[self.idx] += 1;
386
387 if state[self.idx] >= 100 {
388 return Poll::Ready(());
389 }
390
391 cx.waker().wake_by_ref();
392 Poll::Pending
393 }
394 }
395
396 let mut pool = LocalPool::new();
397 let spawn = pool.spawner();
398
399 spawn.spawn_local_obj(Box::pin(Spin { state: state.clone(), idx: 0 }).into()).unwrap();
400
401 spawn.spawn_local_obj(Box::pin(Spin { state, idx: 1 }).into()).unwrap();
402
403 pool.run();
404}
405
406// Tests that the use of park/unpark in user-code has no
407// effect on the expected behavior of the executor.
408#[test]
409fn park_unpark_independence() {
410 let mut done = false;
411
412 let future = future::poll_fn(move |cx| {
413 if done {
414 return Poll::Ready(());
415 }
416 done = true;
417 cx.waker().clone().wake(); // (*)
418 // some user-code that temporarily parks the thread
419 let test = thread::current();
420 let latch = Arc::new(AtomicBool::new(false));
421 let signal = latch.clone();
422 thread::spawn(move || {
423 thread::sleep(Duration::from_millis(10));
424 signal.store(true, Ordering::SeqCst);
425 test.unpark()
426 });
427 while !latch.load(Ordering::Relaxed) {
428 thread::park();
429 }
430 Poll::Pending // Expect to be called again due to (*).
431 });
432
433 futures::executor::block_on(future)
434}
435
436struct SelfWaking {
437 wakeups_remaining: Rc<RefCell<usize>>,
438}
439
440impl Future for SelfWaking {
441 type Output = ();
442
443 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
444 if *self.wakeups_remaining.borrow() != 0 {
445 *self.wakeups_remaining.borrow_mut() -= 1;
446 cx.waker().wake_by_ref();
447 }
448
449 Poll::Pending
450 }
451}
452
453/// Regression test for https://github.com/rust-lang/futures-rs/pull/2593
454///
455/// The issue was that self-waking futures could cause `run_until_stalled`
456/// to exit early, even when progress could still be made.
457#[test]
458fn self_waking_run_until_stalled() {
459 let wakeups_remaining = Rc::new(RefCell::new(10));
460
461 let mut pool = LocalPool::new();
462 let spawner = pool.spawner();
463 for _ in 0..3 {
464 let wakeups_remaining = Rc::clone(&wakeups_remaining);
465 spawner.spawn_local(SelfWaking { wakeups_remaining }).unwrap();
466 }
467
468 // This should keep polling until there are no more wakeups.
469 pool.run_until_stalled();
470
471 assert_eq!(*wakeups_remaining.borrow(), 0);
472}
473
474/// Regression test for https://github.com/rust-lang/futures-rs/pull/2593
475///
476/// The issue was that self-waking futures could cause `try_run_one`
477/// to exit early, even when progress could still be made.
478#[test]
479fn self_waking_try_run_one() {
480 let wakeups_remaining = Rc::new(RefCell::new(10));
481
482 let mut pool = LocalPool::new();
483 let spawner = pool.spawner();
484 for _ in 0..3 {
485 let wakeups_remaining = Rc::clone(&wakeups_remaining);
486 spawner.spawn_local(SelfWaking { wakeups_remaining }).unwrap();
487 }
488
489 spawner.spawn(future::ready(())).unwrap();
490
491 // The `ready` future should complete.
492 assert!(pool.try_run_one());
493
494 // The self-waking futures are each polled once.
495 assert_eq!(*wakeups_remaining.borrow(), 7);
496}
497