1 | use crate::enter; |
2 | use futures_core::future::Future; |
3 | use futures_core::stream::Stream; |
4 | use futures_core::task::{Context, Poll}; |
5 | use futures_task::{waker_ref, ArcWake}; |
6 | use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError}; |
7 | use futures_util::pin_mut; |
8 | use futures_util::stream::FuturesUnordered; |
9 | use futures_util::stream::StreamExt; |
10 | use std::cell::RefCell; |
11 | use std::ops::{Deref, DerefMut}; |
12 | use std::rc::{Rc, Weak}; |
13 | use std::sync::{ |
14 | atomic::{AtomicBool, Ordering}, |
15 | Arc, |
16 | }; |
17 | use std::thread::{self, Thread}; |
18 | |
/// A single-threaded task pool for polling futures to completion.
///
/// This executor allows you to multiplex any number of tasks onto a single
/// thread. It's appropriate to poll strictly I/O-bound futures that do very
/// little work in between I/O actions.
///
/// To get a handle to the pool that implements
/// [`Spawn`](futures_task::Spawn), use the
/// [`spawner()`](LocalPool::spawner) method. Because the executor is
/// single-threaded, it supports a special form of task spawning for non-`Send`
/// futures, via [`spawn_local_obj`](futures_task::LocalSpawn::spawn_local_obj).
#[derive(Debug)]
pub struct LocalPool {
    // Tasks the pool has adopted; these are what the run/poll methods poll.
    pool: FuturesUnordered<LocalFutureObj<'static, ()>>,
    // Queue that spawners push new tasks onto. The pool holds the only
    // long-lived strong reference; spawners hold `Weak` ones. Its contents
    // are moved into `pool` by `drain_incoming`.
    incoming: Rc<Incoming>,
}
35 | |
/// A handle to a [`LocalPool`](LocalPool) that implements
/// [`Spawn`](futures_task::Spawn).
///
/// Handles are created with [`LocalPool::spawner`] and are cheap to clone.
/// A handle does not keep the pool alive: once the pool's task queue can no
/// longer be reached, spawning through the handle fails with a shutdown
/// error.
#[derive(Clone, Debug)]
pub struct LocalSpawner {
    // Weak reference to the owning pool's queue of newly spawned tasks.
    incoming: Weak<Incoming>,
}
42 | |
// Queue of tasks that have been spawned but not yet moved into the pool's
// `FuturesUnordered`. Shared between a `LocalPool` and its `LocalSpawner`s.
type Incoming = RefCell<Vec<LocalFutureObj<'static, ()>>>;
44 | |
/// Wakeup state for an executor thread.
///
/// Waking (see the `ArcWake` impl) sets `unparked` and, if it was not already
/// set, unparks `thread`.
pub(crate) struct ThreadNotify {
    /// The (single) executor thread.
    thread: Thread,
    /// A flag to ensure a wakeup (i.e. `unpark()`) is not "forgotten"
    /// before the next `park()`, which may otherwise happen if the code
    /// being executed as part of the future(s) being polled makes use of
    /// park / unpark calls of its own, i.e. we cannot assume that no other
    /// code uses park / unpark on the executing `thread`.
    unparked: AtomicBool,
}
55 | |
thread_local! {
    // One `ThreadNotify` per thread, bound to the thread that initializes it
    // (`thread::current()`), with no wakeup pending initially.
    // `run_executor` builds its waker from this value.
    static CURRENT_THREAD_NOTIFY: Arc<ThreadNotify> = Arc::new(ThreadNotify {
        thread: thread::current(),
        unparked: AtomicBool::new(false),
    });
}
62 | |
63 | impl ArcWake for ThreadNotify { |
64 | fn wake_by_ref(arc_self: &Arc<Self>) { |
65 | // Make sure the wakeup is remembered until the next `park()`. |
66 | let unparked = arc_self.unparked.swap(true, Ordering::Release); |
67 | if !unparked { |
68 | // If the thread has not been unparked yet, it must be done |
69 | // now. If it was actually parked, it will run again, |
70 | // otherwise the token made available by `unpark` |
71 | // may be consumed before reaching `park()`, but `unparked` |
72 | // ensures it is not forgotten. |
73 | arc_self.thread.unpark(); |
74 | } |
75 | } |
76 | } |
77 | |
78 | // Set up and run a basic single-threaded spawner loop, invoking `f` on each |
79 | // turn. |
80 | fn run_executor<T, F: FnMut(&mut Context<'_>) -> Poll<T>>(mut f: F) -> T { |
81 | let _enter = enter().expect( |
82 | "cannot execute `LocalPool` executor from within \ |
83 | another executor" , |
84 | ); |
85 | |
86 | CURRENT_THREAD_NOTIFY.with(|thread_notify| { |
87 | let waker = waker_ref(thread_notify); |
88 | let mut cx = Context::from_waker(&waker); |
89 | loop { |
90 | if let Poll::Ready(t) = f(&mut cx) { |
91 | return t; |
92 | } |
93 | |
94 | // Wait for a wakeup. |
95 | while !thread_notify.unparked.swap(false, Ordering::Acquire) { |
96 | // No wakeup occurred. It may occur now, right before parking, |
97 | // but in that case the token made available by `unpark()` |
98 | // is guaranteed to still be available and `park()` is a no-op. |
99 | thread::park(); |
100 | } |
101 | } |
102 | }) |
103 | } |
104 | |
105 | /// Check for a wakeup, but don't consume it. |
106 | fn woken() -> bool { |
107 | CURRENT_THREAD_NOTIFY.with(|thread_notify| thread_notify.unparked.load(Ordering::Acquire)) |
108 | } |
109 | |
110 | impl LocalPool { |
111 | /// Create a new, empty pool of tasks. |
112 | pub fn new() -> Self { |
113 | Self { pool: FuturesUnordered::new(), incoming: Default::default() } |
114 | } |
115 | |
116 | /// Get a clonable handle to the pool as a [`Spawn`]. |
117 | pub fn spawner(&self) -> LocalSpawner { |
118 | LocalSpawner { incoming: Rc::downgrade(&self.incoming) } |
119 | } |
120 | |
121 | /// Run all tasks in the pool to completion. |
122 | /// |
123 | /// ``` |
124 | /// use futures::executor::LocalPool; |
125 | /// |
126 | /// let mut pool = LocalPool::new(); |
127 | /// |
128 | /// // ... spawn some initial tasks using `spawn.spawn()` or `spawn.spawn_local()` |
129 | /// |
130 | /// // run *all* tasks in the pool to completion, including any newly-spawned ones. |
131 | /// pool.run(); |
132 | /// ``` |
133 | /// |
134 | /// The function will block the calling thread until *all* tasks in the pool |
135 | /// are complete, including any spawned while running existing tasks. |
136 | pub fn run(&mut self) { |
137 | run_executor(|cx| self.poll_pool(cx)) |
138 | } |
139 | |
140 | /// Runs all the tasks in the pool until the given future completes. |
141 | /// |
142 | /// ``` |
143 | /// use futures::executor::LocalPool; |
144 | /// |
145 | /// let mut pool = LocalPool::new(); |
146 | /// # let my_app = async {}; |
147 | /// |
148 | /// // run tasks in the pool until `my_app` completes |
149 | /// pool.run_until(my_app); |
150 | /// ``` |
151 | /// |
152 | /// The function will block the calling thread *only* until the future `f` |
153 | /// completes; there may still be incomplete tasks in the pool, which will |
154 | /// be inert after the call completes, but can continue with further use of |
155 | /// one of the pool's run or poll methods. While the function is running, |
156 | /// however, all tasks in the pool will try to make progress. |
157 | pub fn run_until<F: Future>(&mut self, future: F) -> F::Output { |
158 | pin_mut!(future); |
159 | |
160 | run_executor(|cx| { |
161 | { |
162 | // if our main task is done, so are we |
163 | let result = future.as_mut().poll(cx); |
164 | if let Poll::Ready(output) = result { |
165 | return Poll::Ready(output); |
166 | } |
167 | } |
168 | |
169 | let _ = self.poll_pool(cx); |
170 | Poll::Pending |
171 | }) |
172 | } |
173 | |
174 | /// Runs all tasks and returns after completing one future or until no more progress |
175 | /// can be made. Returns `true` if one future was completed, `false` otherwise. |
176 | /// |
177 | /// ``` |
178 | /// use futures::executor::LocalPool; |
179 | /// use futures::task::LocalSpawnExt; |
180 | /// use futures::future::{ready, pending}; |
181 | /// |
182 | /// let mut pool = LocalPool::new(); |
183 | /// let spawner = pool.spawner(); |
184 | /// |
185 | /// spawner.spawn_local(ready(())).unwrap(); |
186 | /// spawner.spawn_local(ready(())).unwrap(); |
187 | /// spawner.spawn_local(pending()).unwrap(); |
188 | /// |
189 | /// // Run the two ready tasks and return true for them. |
190 | /// pool.try_run_one(); // returns true after completing one of the ready futures |
191 | /// pool.try_run_one(); // returns true after completing the other ready future |
192 | /// |
193 | /// // the remaining task can not be completed |
194 | /// assert!(!pool.try_run_one()); // returns false |
195 | /// ``` |
196 | /// |
197 | /// This function will not block the calling thread and will return the moment |
198 | /// that there are no tasks left for which progress can be made or after exactly one |
199 | /// task was completed; Remaining incomplete tasks in the pool can continue with |
200 | /// further use of one of the pool's run or poll methods. |
201 | /// Though only one task will be completed, progress may be made on multiple tasks. |
202 | pub fn try_run_one(&mut self) -> bool { |
203 | run_executor(|cx| { |
204 | loop { |
205 | self.drain_incoming(); |
206 | |
207 | match self.pool.poll_next_unpin(cx) { |
208 | // Success! |
209 | Poll::Ready(Some(())) => return Poll::Ready(true), |
210 | // The pool was empty. |
211 | Poll::Ready(None) => return Poll::Ready(false), |
212 | Poll::Pending => (), |
213 | } |
214 | |
215 | if !self.incoming.borrow().is_empty() { |
216 | // New tasks were spawned; try again. |
217 | continue; |
218 | } else if woken() { |
219 | // The pool yielded to us, but there's more progress to be made. |
220 | return Poll::Pending; |
221 | } else { |
222 | return Poll::Ready(false); |
223 | } |
224 | } |
225 | }) |
226 | } |
227 | |
228 | /// Runs all tasks in the pool and returns if no more progress can be made |
229 | /// on any task. |
230 | /// |
231 | /// ``` |
232 | /// use futures::executor::LocalPool; |
233 | /// use futures::task::LocalSpawnExt; |
234 | /// use futures::future::{ready, pending}; |
235 | /// |
236 | /// let mut pool = LocalPool::new(); |
237 | /// let spawner = pool.spawner(); |
238 | /// |
239 | /// spawner.spawn_local(ready(())).unwrap(); |
240 | /// spawner.spawn_local(ready(())).unwrap(); |
241 | /// spawner.spawn_local(pending()).unwrap(); |
242 | /// |
243 | /// // Runs the two ready task and returns. |
244 | /// // The empty task remains in the pool. |
245 | /// pool.run_until_stalled(); |
246 | /// ``` |
247 | /// |
248 | /// This function will not block the calling thread and will return the moment |
249 | /// that there are no tasks left for which progress can be made; |
250 | /// remaining incomplete tasks in the pool can continue with further use of one |
251 | /// of the pool's run or poll methods. While the function is running, all tasks |
252 | /// in the pool will try to make progress. |
253 | pub fn run_until_stalled(&mut self) { |
254 | run_executor(|cx| match self.poll_pool(cx) { |
255 | // The pool is empty. |
256 | Poll::Ready(()) => Poll::Ready(()), |
257 | Poll::Pending => { |
258 | if woken() { |
259 | Poll::Pending |
260 | } else { |
261 | // We're stalled for now. |
262 | Poll::Ready(()) |
263 | } |
264 | } |
265 | }); |
266 | } |
267 | |
268 | /// Poll `self.pool`, re-filling it with any newly-spawned tasks. |
269 | /// Repeat until either the pool is empty, or it returns `Pending`. |
270 | /// |
271 | /// Returns `Ready` if the pool was empty, and `Pending` otherwise. |
272 | /// |
273 | /// NOTE: the pool may call `wake`, so `Pending` doesn't necessarily |
274 | /// mean that the pool can't make progress. |
275 | fn poll_pool(&mut self, cx: &mut Context<'_>) -> Poll<()> { |
276 | loop { |
277 | self.drain_incoming(); |
278 | |
279 | let pool_ret = self.pool.poll_next_unpin(cx); |
280 | |
281 | // We queued up some new tasks; add them and poll again. |
282 | if !self.incoming.borrow().is_empty() { |
283 | continue; |
284 | } |
285 | |
286 | match pool_ret { |
287 | Poll::Ready(Some(())) => continue, |
288 | Poll::Ready(None) => return Poll::Ready(()), |
289 | Poll::Pending => return Poll::Pending, |
290 | } |
291 | } |
292 | } |
293 | |
294 | /// Empty the incoming queue of newly-spawned tasks. |
295 | fn drain_incoming(&mut self) { |
296 | let mut incoming = self.incoming.borrow_mut(); |
297 | for task in incoming.drain(..) { |
298 | self.pool.push(task) |
299 | } |
300 | } |
301 | } |
302 | |
303 | impl Default for LocalPool { |
304 | fn default() -> Self { |
305 | Self::new() |
306 | } |
307 | } |
308 | |
309 | /// Run a future to completion on the current thread. |
310 | /// |
311 | /// This function will block the caller until the given future has completed. |
312 | /// |
313 | /// Use a [`LocalPool`](LocalPool) if you need finer-grained control over |
314 | /// spawned tasks. |
315 | pub fn block_on<F: Future>(f: F) -> F::Output { |
316 | pin_mut!(f); |
317 | run_executor(|cx| f.as_mut().poll(cx)) |
318 | } |
319 | |
320 | /// Turn a stream into a blocking iterator. |
321 | /// |
322 | /// When `next` is called on the resulting `BlockingStream`, the caller |
323 | /// will be blocked until the next element of the `Stream` becomes available. |
324 | pub fn block_on_stream<S: Stream + Unpin>(stream: S) -> BlockingStream<S> { |
325 | BlockingStream { stream } |
326 | } |
327 | |
/// An iterator which blocks on values from a stream until they become available.
///
/// Created by [`block_on_stream`]. The wrapped stream stays reachable through
/// `Deref`/`DerefMut` and can be recovered with
/// [`into_inner`](BlockingStream::into_inner).
#[derive(Debug)]
pub struct BlockingStream<S: Stream + Unpin> {
    // The wrapped stream that `Iterator::next` blocks on.
    stream: S,
}
333 | |
334 | impl<S: Stream + Unpin> Deref for BlockingStream<S> { |
335 | type Target = S; |
336 | fn deref(&self) -> &Self::Target { |
337 | &self.stream |
338 | } |
339 | } |
340 | |
341 | impl<S: Stream + Unpin> DerefMut for BlockingStream<S> { |
342 | fn deref_mut(&mut self) -> &mut Self::Target { |
343 | &mut self.stream |
344 | } |
345 | } |
346 | |
347 | impl<S: Stream + Unpin> BlockingStream<S> { |
348 | /// Convert this `BlockingStream` into the inner `Stream` type. |
349 | pub fn into_inner(self) -> S { |
350 | self.stream |
351 | } |
352 | } |
353 | |
354 | impl<S: Stream + Unpin> Iterator for BlockingStream<S> { |
355 | type Item = S::Item; |
356 | |
357 | fn next(&mut self) -> Option<Self::Item> { |
358 | LocalPool::new().run_until(self.stream.next()) |
359 | } |
360 | |
361 | fn size_hint(&self) -> (usize, Option<usize>) { |
362 | self.stream.size_hint() |
363 | } |
364 | } |
365 | |
366 | impl Spawn for LocalSpawner { |
367 | fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> { |
368 | if let Some(incoming) = self.incoming.upgrade() { |
369 | incoming.borrow_mut().push(future.into()); |
370 | Ok(()) |
371 | } else { |
372 | Err(SpawnError::shutdown()) |
373 | } |
374 | } |
375 | |
376 | fn status(&self) -> Result<(), SpawnError> { |
377 | if self.incoming.upgrade().is_some() { |
378 | Ok(()) |
379 | } else { |
380 | Err(SpawnError::shutdown()) |
381 | } |
382 | } |
383 | } |
384 | |
385 | impl LocalSpawn for LocalSpawner { |
386 | fn spawn_local_obj(&self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> { |
387 | if let Some(incoming) = self.incoming.upgrade() { |
388 | incoming.borrow_mut().push(future); |
389 | Ok(()) |
390 | } else { |
391 | Err(SpawnError::shutdown()) |
392 | } |
393 | } |
394 | |
395 | fn status_local(&self) -> Result<(), SpawnError> { |
396 | if self.incoming.upgrade().is_some() { |
397 | Ok(()) |
398 | } else { |
399 | Err(SpawnError::shutdown()) |
400 | } |
401 | } |
402 | } |
403 | |