1use crate::enter;
2use futures_core::future::Future;
3use futures_core::stream::Stream;
4use futures_core::task::{Context, Poll};
5use futures_task::{waker_ref, ArcWake};
6use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError};
7use futures_util::pin_mut;
8use futures_util::stream::FuturesUnordered;
9use futures_util::stream::StreamExt;
10use std::cell::RefCell;
11use std::ops::{Deref, DerefMut};
12use std::rc::{Rc, Weak};
13use std::sync::{
14 atomic::{AtomicBool, Ordering},
15 Arc,
16};
17use std::thread::{self, Thread};
18
19/// A single-threaded task pool for polling futures to completion.
20///
21/// This executor allows you to multiplex any number of tasks onto a single
22/// thread. It's appropriate to poll strictly I/O-bound futures that do very
23/// little work in between I/O actions.
24///
25/// To get a handle to the pool that implements
26/// [`Spawn`](futures_task::Spawn), use the
27/// [`spawner()`](LocalPool::spawner) method. Because the executor is
28/// single-threaded, it supports a special form of task spawning for non-`Send`
29/// futures, via [`spawn_local_obj`](futures_task::LocalSpawn::spawn_local_obj).
30#[derive(Debug)]
31pub struct LocalPool {
32 pool: FuturesUnordered<LocalFutureObj<'static, ()>>,
33 incoming: Rc<Incoming>,
34}
35
36/// A handle to a [`LocalPool`](LocalPool) that implements
37/// [`Spawn`](futures_task::Spawn).
38#[derive(Clone, Debug)]
39pub struct LocalSpawner {
40 incoming: Weak<Incoming>,
41}
42
43type Incoming = RefCell<Vec<LocalFutureObj<'static, ()>>>;
44
/// Wakeup state for one executor thread: the thread handle to unpark and a
/// flag recording that a wakeup has been requested.
pub(crate) struct ThreadNotify {
    /// Handle of the (single) thread that runs the executor.
    thread: Thread,
    /// Remembers a wakeup (i.e. an `unpark()`) until the executor gets to its
    /// next `park()`. Code executed while polling futures may itself call
    /// park / unpark on this thread, so the park token alone cannot be
    /// trusted to still be there; this flag keeps the wakeup from being
    /// "forgotten".
    unparked: AtomicBool,
}
55
56thread_local! {
57 static CURRENT_THREAD_NOTIFY: Arc<ThreadNotify> = Arc::new(ThreadNotify {
58 thread: thread::current(),
59 unparked: AtomicBool::new(false),
60 });
61}
62
63impl ArcWake for ThreadNotify {
64 fn wake_by_ref(arc_self: &Arc<Self>) {
65 // Make sure the wakeup is remembered until the next `park()`.
66 let unparked = arc_self.unparked.swap(true, Ordering::Release);
67 if !unparked {
68 // If the thread has not been unparked yet, it must be done
69 // now. If it was actually parked, it will run again,
70 // otherwise the token made available by `unpark`
71 // may be consumed before reaching `park()`, but `unparked`
72 // ensures it is not forgotten.
73 arc_self.thread.unpark();
74 }
75 }
76}
77
78// Set up and run a basic single-threaded spawner loop, invoking `f` on each
79// turn.
80fn run_executor<T, F: FnMut(&mut Context<'_>) -> Poll<T>>(mut f: F) -> T {
81 let _enter = enter().expect(
82 "cannot execute `LocalPool` executor from within \
83 another executor",
84 );
85
86 CURRENT_THREAD_NOTIFY.with(|thread_notify| {
87 let waker = waker_ref(thread_notify);
88 let mut cx = Context::from_waker(&waker);
89 loop {
90 if let Poll::Ready(t) = f(&mut cx) {
91 return t;
92 }
93
94 // Wait for a wakeup.
95 while !thread_notify.unparked.swap(false, Ordering::Acquire) {
96 // No wakeup occurred. It may occur now, right before parking,
97 // but in that case the token made available by `unpark()`
98 // is guaranteed to still be available and `park()` is a no-op.
99 thread::park();
100 }
101 }
102 })
103}
104
105/// Check for a wakeup, but don't consume it.
106fn woken() -> bool {
107 CURRENT_THREAD_NOTIFY.with(|thread_notify| thread_notify.unparked.load(Ordering::Acquire))
108}
109
110impl LocalPool {
111 /// Create a new, empty pool of tasks.
112 pub fn new() -> Self {
113 Self { pool: FuturesUnordered::new(), incoming: Default::default() }
114 }
115
116 /// Get a clonable handle to the pool as a [`Spawn`].
117 pub fn spawner(&self) -> LocalSpawner {
118 LocalSpawner { incoming: Rc::downgrade(&self.incoming) }
119 }
120
121 /// Run all tasks in the pool to completion.
122 ///
123 /// ```
124 /// use futures::executor::LocalPool;
125 ///
126 /// let mut pool = LocalPool::new();
127 ///
128 /// // ... spawn some initial tasks using `spawn.spawn()` or `spawn.spawn_local()`
129 ///
130 /// // run *all* tasks in the pool to completion, including any newly-spawned ones.
131 /// pool.run();
132 /// ```
133 ///
134 /// The function will block the calling thread until *all* tasks in the pool
135 /// are complete, including any spawned while running existing tasks.
136 pub fn run(&mut self) {
137 run_executor(|cx| self.poll_pool(cx))
138 }
139
140 /// Runs all the tasks in the pool until the given future completes.
141 ///
142 /// ```
143 /// use futures::executor::LocalPool;
144 ///
145 /// let mut pool = LocalPool::new();
146 /// # let my_app = async {};
147 ///
148 /// // run tasks in the pool until `my_app` completes
149 /// pool.run_until(my_app);
150 /// ```
151 ///
152 /// The function will block the calling thread *only* until the future `f`
153 /// completes; there may still be incomplete tasks in the pool, which will
154 /// be inert after the call completes, but can continue with further use of
155 /// one of the pool's run or poll methods. While the function is running,
156 /// however, all tasks in the pool will try to make progress.
157 pub fn run_until<F: Future>(&mut self, future: F) -> F::Output {
158 pin_mut!(future);
159
160 run_executor(|cx| {
161 {
162 // if our main task is done, so are we
163 let result = future.as_mut().poll(cx);
164 if let Poll::Ready(output) = result {
165 return Poll::Ready(output);
166 }
167 }
168
169 let _ = self.poll_pool(cx);
170 Poll::Pending
171 })
172 }
173
174 /// Runs all tasks and returns after completing one future or until no more progress
175 /// can be made. Returns `true` if one future was completed, `false` otherwise.
176 ///
177 /// ```
178 /// use futures::executor::LocalPool;
179 /// use futures::task::LocalSpawnExt;
180 /// use futures::future::{ready, pending};
181 ///
182 /// let mut pool = LocalPool::new();
183 /// let spawner = pool.spawner();
184 ///
185 /// spawner.spawn_local(ready(())).unwrap();
186 /// spawner.spawn_local(ready(())).unwrap();
187 /// spawner.spawn_local(pending()).unwrap();
188 ///
189 /// // Run the two ready tasks and return true for them.
190 /// pool.try_run_one(); // returns true after completing one of the ready futures
191 /// pool.try_run_one(); // returns true after completing the other ready future
192 ///
193 /// // the remaining task can not be completed
194 /// assert!(!pool.try_run_one()); // returns false
195 /// ```
196 ///
197 /// This function will not block the calling thread and will return the moment
198 /// that there are no tasks left for which progress can be made or after exactly one
199 /// task was completed; Remaining incomplete tasks in the pool can continue with
200 /// further use of one of the pool's run or poll methods.
201 /// Though only one task will be completed, progress may be made on multiple tasks.
202 pub fn try_run_one(&mut self) -> bool {
203 run_executor(|cx| {
204 loop {
205 self.drain_incoming();
206
207 match self.pool.poll_next_unpin(cx) {
208 // Success!
209 Poll::Ready(Some(())) => return Poll::Ready(true),
210 // The pool was empty.
211 Poll::Ready(None) => return Poll::Ready(false),
212 Poll::Pending => (),
213 }
214
215 if !self.incoming.borrow().is_empty() {
216 // New tasks were spawned; try again.
217 continue;
218 } else if woken() {
219 // The pool yielded to us, but there's more progress to be made.
220 return Poll::Pending;
221 } else {
222 return Poll::Ready(false);
223 }
224 }
225 })
226 }
227
228 /// Runs all tasks in the pool and returns if no more progress can be made
229 /// on any task.
230 ///
231 /// ```
232 /// use futures::executor::LocalPool;
233 /// use futures::task::LocalSpawnExt;
234 /// use futures::future::{ready, pending};
235 ///
236 /// let mut pool = LocalPool::new();
237 /// let spawner = pool.spawner();
238 ///
239 /// spawner.spawn_local(ready(())).unwrap();
240 /// spawner.spawn_local(ready(())).unwrap();
241 /// spawner.spawn_local(pending()).unwrap();
242 ///
243 /// // Runs the two ready task and returns.
244 /// // The empty task remains in the pool.
245 /// pool.run_until_stalled();
246 /// ```
247 ///
248 /// This function will not block the calling thread and will return the moment
249 /// that there are no tasks left for which progress can be made;
250 /// remaining incomplete tasks in the pool can continue with further use of one
251 /// of the pool's run or poll methods. While the function is running, all tasks
252 /// in the pool will try to make progress.
253 pub fn run_until_stalled(&mut self) {
254 run_executor(|cx| match self.poll_pool(cx) {
255 // The pool is empty.
256 Poll::Ready(()) => Poll::Ready(()),
257 Poll::Pending => {
258 if woken() {
259 Poll::Pending
260 } else {
261 // We're stalled for now.
262 Poll::Ready(())
263 }
264 }
265 });
266 }
267
268 /// Poll `self.pool`, re-filling it with any newly-spawned tasks.
269 /// Repeat until either the pool is empty, or it returns `Pending`.
270 ///
271 /// Returns `Ready` if the pool was empty, and `Pending` otherwise.
272 ///
273 /// NOTE: the pool may call `wake`, so `Pending` doesn't necessarily
274 /// mean that the pool can't make progress.
275 fn poll_pool(&mut self, cx: &mut Context<'_>) -> Poll<()> {
276 loop {
277 self.drain_incoming();
278
279 let pool_ret = self.pool.poll_next_unpin(cx);
280
281 // We queued up some new tasks; add them and poll again.
282 if !self.incoming.borrow().is_empty() {
283 continue;
284 }
285
286 match pool_ret {
287 Poll::Ready(Some(())) => continue,
288 Poll::Ready(None) => return Poll::Ready(()),
289 Poll::Pending => return Poll::Pending,
290 }
291 }
292 }
293
294 /// Empty the incoming queue of newly-spawned tasks.
295 fn drain_incoming(&mut self) {
296 let mut incoming = self.incoming.borrow_mut();
297 for task in incoming.drain(..) {
298 self.pool.push(task)
299 }
300 }
301}
302
303impl Default for LocalPool {
304 fn default() -> Self {
305 Self::new()
306 }
307}
308
309/// Run a future to completion on the current thread.
310///
311/// This function will block the caller until the given future has completed.
312///
313/// Use a [`LocalPool`](LocalPool) if you need finer-grained control over
314/// spawned tasks.
315pub fn block_on<F: Future>(f: F) -> F::Output {
316 pin_mut!(f);
317 run_executor(|cx| f.as_mut().poll(cx))
318}
319
320/// Turn a stream into a blocking iterator.
321///
322/// When `next` is called on the resulting `BlockingStream`, the caller
323/// will be blocked until the next element of the `Stream` becomes available.
324pub fn block_on_stream<S: Stream + Unpin>(stream: S) -> BlockingStream<S> {
325 BlockingStream { stream }
326}
327
328/// An iterator which blocks on values from a stream until they become available.
329#[derive(Debug)]
330pub struct BlockingStream<S: Stream + Unpin> {
331 stream: S,
332}
333
334impl<S: Stream + Unpin> Deref for BlockingStream<S> {
335 type Target = S;
336 fn deref(&self) -> &Self::Target {
337 &self.stream
338 }
339}
340
341impl<S: Stream + Unpin> DerefMut for BlockingStream<S> {
342 fn deref_mut(&mut self) -> &mut Self::Target {
343 &mut self.stream
344 }
345}
346
347impl<S: Stream + Unpin> BlockingStream<S> {
348 /// Convert this `BlockingStream` into the inner `Stream` type.
349 pub fn into_inner(self) -> S {
350 self.stream
351 }
352}
353
354impl<S: Stream + Unpin> Iterator for BlockingStream<S> {
355 type Item = S::Item;
356
357 fn next(&mut self) -> Option<Self::Item> {
358 LocalPool::new().run_until(self.stream.next())
359 }
360
361 fn size_hint(&self) -> (usize, Option<usize>) {
362 self.stream.size_hint()
363 }
364}
365
366impl Spawn for LocalSpawner {
367 fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
368 if let Some(incoming) = self.incoming.upgrade() {
369 incoming.borrow_mut().push(future.into());
370 Ok(())
371 } else {
372 Err(SpawnError::shutdown())
373 }
374 }
375
376 fn status(&self) -> Result<(), SpawnError> {
377 if self.incoming.upgrade().is_some() {
378 Ok(())
379 } else {
380 Err(SpawnError::shutdown())
381 }
382 }
383}
384
385impl LocalSpawn for LocalSpawner {
386 fn spawn_local_obj(&self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> {
387 if let Some(incoming) = self.incoming.upgrade() {
388 incoming.borrow_mut().push(future);
389 Ok(())
390 } else {
391 Err(SpawnError::shutdown())
392 }
393 }
394
395 fn status_local(&self) -> Result<(), SpawnError> {
396 if self.incoming.upgrade().is_some() {
397 Ok(())
398 } else {
399 Err(SpawnError::shutdown())
400 }
401 }
402}
403