1 | //! Async executors. |
2 | //! |
3 | //! This crate provides two reference executors that trade performance for |
//! functionality. They should be considered "good enough" for most use cases.
//! For more specialized use cases, consider writing
6 | //! your own executor on top of [`async-task`]. |
7 | //! |
8 | //! [`async-task`]: https://crates.io/crates/async-task |
9 | //! |
10 | //! # Examples |
11 | //! |
12 | //! ``` |
13 | //! use async_executor::Executor; |
14 | //! use futures_lite::future; |
15 | //! |
16 | //! // Create a new executor. |
17 | //! let ex = Executor::new(); |
18 | //! |
19 | //! // Spawn a task. |
20 | //! let task = ex.spawn(async { |
//! println!("Hello world");
22 | //! }); |
23 | //! |
24 | //! // Run the executor until the task completes. |
25 | //! future::block_on(ex.run(task)); |
26 | //! ``` |
27 | |
#![warn(
29 | missing_docs, |
30 | missing_debug_implementations, |
31 | rust_2018_idioms, |
32 | clippy::undocumented_unsafe_blocks |
33 | )] |
#![doc(
35 | html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png" |
36 | )] |
#![doc(
38 | html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png" |
39 | )] |
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
41 | |
42 | use std::fmt; |
43 | use std::marker::PhantomData; |
44 | use std::panic::{RefUnwindSafe, UnwindSafe}; |
45 | use std::rc::Rc; |
46 | use std::sync::atomic::{AtomicBool, AtomicPtr, Ordering}; |
47 | use std::sync::{Arc, Mutex, RwLock, TryLockError}; |
48 | use std::task::{Poll, Waker}; |
49 | |
50 | use async_task::{Builder, Runnable}; |
51 | use concurrent_queue::ConcurrentQueue; |
52 | use futures_lite::{future, prelude::*}; |
53 | use slab::Slab; |
54 | |
#[cfg(feature = "static")]
56 | mod static_executors; |
57 | |
#[doc(no_inline)]
59 | pub use async_task::{FallibleTask, Task}; |
#[cfg(feature = "static")]
#[cfg_attr(docsrs, doc(cfg(any(feature = "static"))))]
62 | pub use static_executors::*; |
63 | |
64 | /// An async executor. |
65 | /// |
66 | /// # Examples |
67 | /// |
68 | /// A multi-threaded executor: |
69 | /// |
70 | /// ``` |
71 | /// use async_channel::unbounded; |
72 | /// use async_executor::Executor; |
73 | /// use easy_parallel::Parallel; |
74 | /// use futures_lite::future; |
75 | /// |
76 | /// let ex = Executor::new(); |
77 | /// let (signal, shutdown) = unbounded::<()>(); |
78 | /// |
79 | /// Parallel::new() |
80 | /// // Run four executor threads. |
81 | /// .each(0..4, |_| future::block_on(ex.run(shutdown.recv()))) |
82 | /// // Run the main future on the current thread. |
83 | /// .finish(|| future::block_on(async { |
84 | /// println!("Hello world!" ); |
85 | /// drop(signal); |
86 | /// })); |
87 | /// ``` |
88 | pub struct Executor<'a> { |
89 | /// The executor state. |
90 | state: AtomicPtr<State>, |
91 | |
92 | /// Makes the `'a` lifetime invariant. |
93 | _marker: PhantomData<std::cell::UnsafeCell<&'a ()>>, |
94 | } |
95 | |
// SAFETY: Executor stores no thread-local state that can be accessed from another thread.
unsafe impl Send for Executor<'_> {}
// SAFETY: Executor synchronizes all of its operations internally.
unsafe impl Sync for Executor<'_> {}
100 | |
101 | impl UnwindSafe for Executor<'_> {} |
102 | impl RefUnwindSafe for Executor<'_> {} |
103 | |
104 | impl fmt::Debug for Executor<'_> { |
105 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
106 | debug_executor(self, name:"Executor" , f) |
107 | } |
108 | } |
109 | |
110 | impl<'a> Executor<'a> { |
111 | /// Creates a new executor. |
112 | /// |
113 | /// # Examples |
114 | /// |
115 | /// ``` |
116 | /// use async_executor::Executor; |
117 | /// |
118 | /// let ex = Executor::new(); |
119 | /// ``` |
120 | pub const fn new() -> Executor<'a> { |
121 | Executor { |
122 | state: AtomicPtr::new(std::ptr::null_mut()), |
123 | _marker: PhantomData, |
124 | } |
125 | } |
126 | |
127 | /// Returns `true` if there are no unfinished tasks. |
128 | /// |
129 | /// # Examples |
130 | /// |
131 | /// ``` |
132 | /// use async_executor::Executor; |
133 | /// |
134 | /// let ex = Executor::new(); |
135 | /// assert!(ex.is_empty()); |
136 | /// |
137 | /// let task = ex.spawn(async { |
/// println!("Hello world");
139 | /// }); |
140 | /// assert!(!ex.is_empty()); |
141 | /// |
142 | /// assert!(ex.try_tick()); |
143 | /// assert!(ex.is_empty()); |
144 | /// ``` |
145 | pub fn is_empty(&self) -> bool { |
146 | self.state().active.lock().unwrap().is_empty() |
147 | } |
148 | |
149 | /// Spawns a task onto the executor. |
150 | /// |
151 | /// # Examples |
152 | /// |
153 | /// ``` |
154 | /// use async_executor::Executor; |
155 | /// |
156 | /// let ex = Executor::new(); |
157 | /// |
158 | /// let task = ex.spawn(async { |
/// println!("Hello world");
160 | /// }); |
161 | /// ``` |
162 | pub fn spawn<T: Send + 'a>(&self, future: impl Future<Output = T> + Send + 'a) -> Task<T> { |
163 | let mut active = self.state().active.lock().unwrap(); |
164 | |
165 | // SAFETY: `T` and the future are `Send`. |
166 | unsafe { self.spawn_inner(future, &mut active) } |
167 | } |
168 | |
169 | /// Spawns many tasks onto the executor. |
170 | /// |
171 | /// As opposed to the [`spawn`] method, this locks the executor's inner task lock once and |
/// spawns all of the tasks in one go. With large numbers of tasks this can reduce
/// lock contention.
174 | /// |
175 | /// For very large numbers of tasks the lock is occasionally dropped and re-acquired to |
176 | /// prevent runner thread starvation. It is assumed that the iterator provided does not |
177 | /// block; blocking iterators can lock up the internal mutex and therefore the entire |
178 | /// executor. |
179 | /// |
180 | /// ## Example |
181 | /// |
182 | /// ``` |
183 | /// use async_executor::Executor; |
184 | /// use futures_lite::{stream, prelude::*}; |
185 | /// use std::future::ready; |
186 | /// |
187 | /// # futures_lite::future::block_on(async { |
188 | /// let mut ex = Executor::new(); |
189 | /// |
190 | /// let futures = [ |
191 | /// ready(1), |
192 | /// ready(2), |
193 | /// ready(3) |
194 | /// ]; |
195 | /// |
196 | /// // Spawn all of the futures onto the executor at once. |
197 | /// let mut tasks = vec![]; |
198 | /// ex.spawn_many(futures, &mut tasks); |
199 | /// |
200 | /// // Await all of them. |
201 | /// let results = ex.run(async move { |
202 | /// stream::iter(tasks).then(|x| x).collect::<Vec<_>>().await |
203 | /// }).await; |
204 | /// assert_eq!(results, [1, 2, 3]); |
205 | /// # }); |
206 | /// ``` |
207 | /// |
208 | /// [`spawn`]: Executor::spawn |
209 | pub fn spawn_many<T: Send + 'a, F: Future<Output = T> + Send + 'a>( |
210 | &self, |
211 | futures: impl IntoIterator<Item = F>, |
212 | handles: &mut impl Extend<Task<F::Output>>, |
213 | ) { |
214 | let mut active = Some(self.state().active.lock().unwrap()); |
215 | |
216 | // Convert the futures into tasks. |
217 | let tasks = futures.into_iter().enumerate().map(move |(i, future)| { |
218 | // SAFETY: `T` and the future are `Send`. |
219 | let task = unsafe { self.spawn_inner(future, active.as_mut().unwrap()) }; |
220 | |
221 | // Yield the lock every once in a while to ease contention. |
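// (The lock is re-acquired after tasks 1, 501, 1001, ...; the `wrapping_sub`
// keeps `i == 0` from triggering an immediate re-lock.)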
222 | if i.wrapping_sub(1) % 500 == 0 { |
223 | drop(active.take()); |
224 | active = Some(self.state().active.lock().unwrap()); |
225 | } |
226 | |
227 | task |
228 | }); |
229 | |
230 | // Push the tasks to the user's collection. |
231 | handles.extend(tasks); |
232 | } |
233 | |
234 | /// Spawn a future while holding the inner lock. |
235 | /// |
236 | /// # Safety |
237 | /// |
/// If this is an `Executor`, the future and `T` must be `Send`.
239 | unsafe fn spawn_inner<T: 'a>( |
240 | &self, |
241 | future: impl Future<Output = T> + 'a, |
242 | active: &mut Slab<Waker>, |
243 | ) -> Task<T> { |
244 | // Remove the task from the set of active tasks when the future finishes. |
245 | let entry = active.vacant_entry(); |
246 | let index = entry.key(); |
247 | let state = self.state_as_arc(); |
248 | let future = async move { |
249 | let _guard = CallOnDrop(move || drop(state.active.lock().unwrap().try_remove(index))); |
250 | future.await |
251 | }; |
252 | |
253 | // Create the task and register it in the set of active tasks. |
254 | // |
255 | // SAFETY: |
256 | // |
257 | // If `future` is not `Send`, this must be a `LocalExecutor` as per this |
258 | // function's unsafe precondition. Since `LocalExecutor` is `!Sync`, |
259 | // `try_tick`, `tick` and `run` can only be called from the origin |
260 | // thread of the `LocalExecutor`. Similarly, `spawn` can only be called |
261 | // from the origin thread, ensuring that `future` and the executor share |
262 | // the same origin thread. The `Runnable` can be scheduled from other |
263 | // threads, but because of the above `Runnable` can only be called or |
264 | // dropped on the origin thread. |
265 | // |
266 | // `future` is not `'static`, but we make sure that the `Runnable` does |
267 | // not outlive `'a`. When the executor is dropped, the `active` field is |
268 | // drained and all of the `Waker`s are woken. Then, the queue inside of |
269 | // the `Executor` is drained of all of its runnables. This ensures that |
270 | // runnables are dropped and this precondition is satisfied. |
271 | // |
272 | // `self.schedule()` is `Send`, `Sync` and `'static`, as checked below. |
273 | // Therefore we do not need to worry about what is done with the |
274 | // `Waker`. |
275 | let (runnable, task) = Builder::new() |
276 | .propagate_panic(true) |
277 | .spawn_unchecked(|()| future, self.schedule()); |
278 | entry.insert(runnable.waker()); |
279 | |
280 | runnable.schedule(); |
281 | task |
282 | } |
283 | |
284 | /// Attempts to run a task if at least one is scheduled. |
285 | /// |
286 | /// Running a scheduled task means simply polling its future once. |
287 | /// |
288 | /// # Examples |
289 | /// |
290 | /// ``` |
291 | /// use async_executor::Executor; |
292 | /// |
293 | /// let ex = Executor::new(); |
294 | /// assert!(!ex.try_tick()); // no tasks to run |
295 | /// |
296 | /// let task = ex.spawn(async { |
/// println!("Hello world");
298 | /// }); |
299 | /// assert!(ex.try_tick()); // a task was found |
300 | /// ``` |
301 | pub fn try_tick(&self) -> bool { |
302 | self.state().try_tick() |
303 | } |
304 | |
305 | /// Runs a single task. |
306 | /// |
307 | /// Running a task means simply polling its future once. |
308 | /// |
309 | /// If no tasks are scheduled when this method is called, it will wait until one is scheduled. |
310 | /// |
311 | /// # Examples |
312 | /// |
313 | /// ``` |
314 | /// use async_executor::Executor; |
315 | /// use futures_lite::future; |
316 | /// |
317 | /// let ex = Executor::new(); |
318 | /// |
319 | /// let task = ex.spawn(async { |
/// println!("Hello world");
321 | /// }); |
322 | /// future::block_on(ex.tick()); // runs the task |
323 | /// ``` |
324 | pub async fn tick(&self) { |
325 | self.state().tick().await; |
326 | } |
327 | |
328 | /// Runs the executor until the given future completes. |
329 | /// |
330 | /// # Examples |
331 | /// |
332 | /// ``` |
333 | /// use async_executor::Executor; |
334 | /// use futures_lite::future; |
335 | /// |
336 | /// let ex = Executor::new(); |
337 | /// |
338 | /// let task = ex.spawn(async { 1 + 2 }); |
339 | /// let res = future::block_on(ex.run(async { task.await * 2 })); |
340 | /// |
341 | /// assert_eq!(res, 6); |
342 | /// ``` |
343 | pub async fn run<T>(&self, future: impl Future<Output = T>) -> T { |
344 | self.state().run(future).await |
345 | } |
346 | |
347 | /// Returns a function that schedules a runnable task when it gets woken up. |
348 | fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static { |
349 | let state = self.state_as_arc(); |
350 | |
351 | // TODO: If possible, push into the current local queue and notify the ticker. |
352 | move |runnable| { |
353 | state.queue.push(runnable).unwrap(); |
354 | state.notify(); |
355 | } |
356 | } |
357 | |
358 | /// Returns a pointer to the inner state. |
#[inline]
fn state_ptr(&self) -> *const State {
#[cold]
362 | fn alloc_state(atomic_ptr: &AtomicPtr<State>) -> *mut State { |
363 | let state = Arc::new(State::new()); |
364 | // TODO: Switch this to use cast_mut once the MSRV can be bumped past 1.65 |
365 | let ptr = Arc::into_raw(state) as *mut State; |
366 | if let Err(actual) = atomic_ptr.compare_exchange( |
367 | std::ptr::null_mut(), |
368 | ptr, |
369 | Ordering::AcqRel, |
370 | Ordering::Acquire, |
371 | ) { |
372 | // SAFETY: This was just created from Arc::into_raw. |
373 | drop(unsafe { Arc::from_raw(ptr) }); |
374 | actual |
375 | } else { |
376 | ptr |
377 | } |
378 | } |
379 | |
380 | let mut ptr = self.state.load(Ordering::Acquire); |
381 | if ptr.is_null() { |
382 | ptr = alloc_state(&self.state); |
383 | } |
384 | ptr |
385 | } |
386 | |
387 | /// Returns a reference to the inner state. |
#[inline]
389 | fn state(&self) -> &State { |
// SAFETY: So long as an Executor lives, its state pointer will always be valid
// when accessed through state_ptr.
392 | unsafe { &*self.state_ptr() } |
393 | } |
394 | |
/// Clones the inner state Arc.
#[inline]
fn state_as_arc(&self) -> Arc<State> {
// SAFETY: So long as an Executor lives, its state pointer will always be a valid
// Arc when accessed through state_ptr.
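// Temporarily reconstruct the Arc, clone it to bump the reference count and
// then forget the original so the count held by `self.state` is preserved.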
400 | let arc = unsafe { Arc::from_raw(self.state_ptr()) }; |
401 | let clone = arc.clone(); |
402 | std::mem::forget(arc); |
403 | clone |
404 | } |
405 | } |
406 | |
407 | impl Drop for Executor<'_> { |
408 | fn drop(&mut self) { |
409 | let ptr: *mut State = *self.state.get_mut(); |
410 | if ptr.is_null() { |
411 | return; |
412 | } |
413 | |
414 | // SAFETY: As ptr is not null, it was allocated via Arc::new and converted |
415 | // via Arc::into_raw in state_ptr. |
let state = unsafe { Arc::from_raw(ptr) };
417 | |
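// Wake all still-active tasks so that their runnables get scheduled, then
// drain the queue so those runnables (and the futures inside them) are dropped.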
let mut active = state.active.lock().unwrap_or_else(|e| e.into_inner());
for w in active.drain() {
420 | w.wake(); |
421 | } |
422 | drop(active); |
423 | |
424 | while state.queue.pop().is_ok() {} |
425 | } |
426 | } |
427 | |
428 | impl<'a> Default for Executor<'a> { |
429 | fn default() -> Executor<'a> { |
430 | Executor::new() |
431 | } |
432 | } |
433 | |
434 | /// A thread-local executor. |
435 | /// |
436 | /// The executor can only be run on the thread that created it. |
437 | /// |
438 | /// # Examples |
439 | /// |
440 | /// ``` |
441 | /// use async_executor::LocalExecutor; |
442 | /// use futures_lite::future; |
443 | /// |
444 | /// let local_ex = LocalExecutor::new(); |
445 | /// |
446 | /// future::block_on(local_ex.run(async { |
447 | /// println!("Hello world!" ); |
448 | /// })); |
449 | /// ``` |
450 | pub struct LocalExecutor<'a> { |
451 | /// The inner executor. |
452 | inner: Executor<'a>, |
453 | |
454 | /// Makes the type `!Send` and `!Sync`. |
455 | _marker: PhantomData<Rc<()>>, |
456 | } |
457 | |
458 | impl UnwindSafe for LocalExecutor<'_> {} |
459 | impl RefUnwindSafe for LocalExecutor<'_> {} |
460 | |
461 | impl fmt::Debug for LocalExecutor<'_> { |
462 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
463 | debug_executor(&self.inner, name:"LocalExecutor" , f) |
464 | } |
465 | } |
466 | |
467 | impl<'a> LocalExecutor<'a> { |
468 | /// Creates a single-threaded executor. |
469 | /// |
470 | /// # Examples |
471 | /// |
472 | /// ``` |
473 | /// use async_executor::LocalExecutor; |
474 | /// |
475 | /// let local_ex = LocalExecutor::new(); |
476 | /// ``` |
477 | pub const fn new() -> LocalExecutor<'a> { |
478 | LocalExecutor { |
479 | inner: Executor::new(), |
480 | _marker: PhantomData, |
481 | } |
482 | } |
483 | |
484 | /// Returns `true` if there are no unfinished tasks. |
485 | /// |
486 | /// # Examples |
487 | /// |
488 | /// ``` |
489 | /// use async_executor::LocalExecutor; |
490 | /// |
491 | /// let local_ex = LocalExecutor::new(); |
492 | /// assert!(local_ex.is_empty()); |
493 | /// |
494 | /// let task = local_ex.spawn(async { |
/// println!("Hello world");
496 | /// }); |
497 | /// assert!(!local_ex.is_empty()); |
498 | /// |
499 | /// assert!(local_ex.try_tick()); |
500 | /// assert!(local_ex.is_empty()); |
501 | /// ``` |
502 | pub fn is_empty(&self) -> bool { |
503 | self.inner().is_empty() |
504 | } |
505 | |
506 | /// Spawns a task onto the executor. |
507 | /// |
508 | /// # Examples |
509 | /// |
510 | /// ``` |
511 | /// use async_executor::LocalExecutor; |
512 | /// |
513 | /// let local_ex = LocalExecutor::new(); |
514 | /// |
515 | /// let task = local_ex.spawn(async { |
/// println!("Hello world");
517 | /// }); |
518 | /// ``` |
519 | pub fn spawn<T: 'a>(&self, future: impl Future<Output = T> + 'a) -> Task<T> { |
520 | let mut active = self.inner().state().active.lock().unwrap(); |
521 | |
522 | // SAFETY: This executor is not thread safe, so the future and its result |
523 | // cannot be sent to another thread. |
524 | unsafe { self.inner().spawn_inner(future, &mut active) } |
525 | } |
526 | |
527 | /// Spawns many tasks onto the executor. |
528 | /// |
529 | /// As opposed to the [`spawn`] method, this locks the executor's inner task lock once and |
/// spawns all of the tasks in one go. With large numbers of tasks this can reduce
/// lock contention.
532 | /// |
533 | /// It is assumed that the iterator provided does not block; blocking iterators can lock up |
/// the internal mutex and therefore the entire executor. Unlike [`Executor::spawn_many`], the
535 | /// mutex is not released, as there are no other threads that can poll this executor. |
536 | /// |
537 | /// ## Example |
538 | /// |
539 | /// ``` |
540 | /// use async_executor::LocalExecutor; |
541 | /// use futures_lite::{stream, prelude::*}; |
542 | /// use std::future::ready; |
543 | /// |
544 | /// # futures_lite::future::block_on(async { |
545 | /// let mut ex = LocalExecutor::new(); |
546 | /// |
547 | /// let futures = [ |
548 | /// ready(1), |
549 | /// ready(2), |
550 | /// ready(3) |
551 | /// ]; |
552 | /// |
553 | /// // Spawn all of the futures onto the executor at once. |
554 | /// let mut tasks = vec![]; |
555 | /// ex.spawn_many(futures, &mut tasks); |
556 | /// |
557 | /// // Await all of them. |
558 | /// let results = ex.run(async move { |
559 | /// stream::iter(tasks).then(|x| x).collect::<Vec<_>>().await |
560 | /// }).await; |
561 | /// assert_eq!(results, [1, 2, 3]); |
562 | /// # }); |
563 | /// ``` |
564 | /// |
565 | /// [`spawn`]: LocalExecutor::spawn |
566 | /// [`Executor::spawn_many`]: Executor::spawn_many |
567 | pub fn spawn_many<T: 'a, F: Future<Output = T> + 'a>( |
568 | &self, |
569 | futures: impl IntoIterator<Item = F>, |
570 | handles: &mut impl Extend<Task<F::Output>>, |
571 | ) { |
572 | let mut active = self.inner().state().active.lock().unwrap(); |
573 | |
574 | // Convert all of the futures to tasks. |
575 | let tasks = futures.into_iter().map(|future| { |
576 | // SAFETY: This executor is not thread safe, so the future and its result |
577 | // cannot be sent to another thread. |
578 | unsafe { self.inner().spawn_inner(future, &mut active) } |
579 | |
// As only one thread can spawn or poll tasks at a time, there is no need
// to release the lock to ease contention here.
582 | }); |
583 | |
584 | // Push them to the user's collection. |
585 | handles.extend(tasks); |
586 | } |
587 | |
588 | /// Attempts to run a task if at least one is scheduled. |
589 | /// |
590 | /// Running a scheduled task means simply polling its future once. |
591 | /// |
592 | /// # Examples |
593 | /// |
594 | /// ``` |
595 | /// use async_executor::LocalExecutor; |
596 | /// |
597 | /// let ex = LocalExecutor::new(); |
598 | /// assert!(!ex.try_tick()); // no tasks to run |
599 | /// |
600 | /// let task = ex.spawn(async { |
/// println!("Hello world");
602 | /// }); |
603 | /// assert!(ex.try_tick()); // a task was found |
604 | /// ``` |
605 | pub fn try_tick(&self) -> bool { |
606 | self.inner().try_tick() |
607 | } |
608 | |
609 | /// Runs a single task. |
610 | /// |
611 | /// Running a task means simply polling its future once. |
612 | /// |
613 | /// If no tasks are scheduled when this method is called, it will wait until one is scheduled. |
614 | /// |
615 | /// # Examples |
616 | /// |
617 | /// ``` |
618 | /// use async_executor::LocalExecutor; |
619 | /// use futures_lite::future; |
620 | /// |
621 | /// let ex = LocalExecutor::new(); |
622 | /// |
623 | /// let task = ex.spawn(async { |
/// println!("Hello world");
625 | /// }); |
626 | /// future::block_on(ex.tick()); // runs the task |
627 | /// ``` |
628 | pub async fn tick(&self) { |
629 | self.inner().tick().await |
630 | } |
631 | |
632 | /// Runs the executor until the given future completes. |
633 | /// |
634 | /// # Examples |
635 | /// |
636 | /// ``` |
637 | /// use async_executor::LocalExecutor; |
638 | /// use futures_lite::future; |
639 | /// |
640 | /// let local_ex = LocalExecutor::new(); |
641 | /// |
642 | /// let task = local_ex.spawn(async { 1 + 2 }); |
643 | /// let res = future::block_on(local_ex.run(async { task.await * 2 })); |
644 | /// |
645 | /// assert_eq!(res, 6); |
646 | /// ``` |
647 | pub async fn run<T>(&self, future: impl Future<Output = T>) -> T { |
648 | self.inner().run(future).await |
649 | } |
650 | |
651 | /// Returns a reference to the inner executor. |
652 | fn inner(&self) -> &Executor<'a> { |
653 | &self.inner |
654 | } |
655 | } |
656 | |
657 | impl<'a> Default for LocalExecutor<'a> { |
658 | fn default() -> LocalExecutor<'a> { |
659 | LocalExecutor::new() |
660 | } |
661 | } |
662 | |
/// The state of an executor.
664 | struct State { |
665 | /// The global queue. |
666 | queue: ConcurrentQueue<Runnable>, |
667 | |
668 | /// Local queues created by runners. |
669 | local_queues: RwLock<Vec<Arc<ConcurrentQueue<Runnable>>>>, |
670 | |
671 | /// Set to `true` when a sleeping ticker is notified or no tickers are sleeping. |
672 | notified: AtomicBool, |
673 | |
674 | /// A list of sleeping tickers. |
675 | sleepers: Mutex<Sleepers>, |
676 | |
677 | /// Currently active tasks. |
678 | active: Mutex<Slab<Waker>>, |
679 | } |
680 | |
681 | impl State { |
682 | /// Creates state for a new executor. |
683 | const fn new() -> State { |
684 | State { |
685 | queue: ConcurrentQueue::unbounded(), |
686 | local_queues: RwLock::new(Vec::new()), |
687 | notified: AtomicBool::new(true), |
688 | sleepers: Mutex::new(Sleepers { |
689 | count: 0, |
690 | wakers: Vec::new(), |
691 | free_ids: Vec::new(), |
692 | }), |
693 | active: Mutex::new(Slab::new()), |
694 | } |
695 | } |
696 | |
697 | /// Notifies a sleeping ticker. |
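///
/// The `notified` flag is flipped with a `compare_exchange` first so that the
/// sleepers mutex is only locked when there may actually be a waker to hand out.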
#[inline]
699 | fn notify(&self) { |
700 | if self |
701 | .notified |
702 | .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) |
703 | .is_ok() |
704 | { |
705 | let waker = self.sleepers.lock().unwrap().notify(); |
706 | if let Some(w) = waker { |
707 | w.wake(); |
708 | } |
709 | } |
710 | } |
711 | |
712 | pub(crate) fn try_tick(&self) -> bool { |
713 | match self.queue.pop() { |
714 | Err(_) => false, |
715 | Ok(runnable) => { |
716 | // Notify another ticker now to pick up where this ticker left off, just in case |
717 | // running the task takes a long time. |
718 | self.notify(); |
719 | |
720 | // Run the task. |
721 | runnable.run(); |
722 | true |
723 | } |
724 | } |
725 | } |
726 | |
727 | pub(crate) async fn tick(&self) { |
728 | let runnable = Ticker::new(self).runnable().await; |
729 | runnable.run(); |
730 | } |
731 | |
732 | pub async fn run<T>(&self, future: impl Future<Output = T>) -> T { |
733 | let mut runner = Runner::new(self); |
734 | let mut rng = fastrand::Rng::new(); |
735 | |
736 | // A future that runs tasks forever. |
737 | let run_forever = async { |
738 | loop { |
739 | for _ in 0..200 { |
740 | let runnable = runner.runnable(&mut rng).await; |
741 | runnable.run(); |
742 | } |
743 | future::yield_now().await; |
744 | } |
745 | }; |
746 | |
747 | // Run `future` and `run_forever` concurrently until `future` completes. |
748 | future.or(run_forever).await |
749 | } |
750 | } |
751 | |
752 | /// A list of sleeping tickers. |
753 | struct Sleepers { |
754 | /// Number of sleeping tickers (both notified and unnotified). |
755 | count: usize, |
756 | |
757 | /// IDs and wakers of sleeping unnotified tickers. |
758 | /// |
759 | /// A sleeping ticker is notified when its waker is missing from this list. |
760 | wakers: Vec<(usize, Waker)>, |
761 | |
762 | /// Reclaimed IDs. |
763 | free_ids: Vec<usize>, |
764 | } |
765 | |
766 | impl Sleepers { |
767 | /// Inserts a new sleeping ticker. |
768 | fn insert(&mut self, waker: &Waker) -> usize { |
769 | let id = match self.free_ids.pop() { |
770 | Some(id) => id, |
771 | None => self.count + 1, |
772 | }; |
773 | self.count += 1; |
774 | self.wakers.push((id, waker.clone())); |
775 | id |
776 | } |
777 | |
778 | /// Re-inserts a sleeping ticker's waker if it was notified. |
779 | /// |
780 | /// Returns `true` if the ticker was notified. |
781 | fn update(&mut self, id: usize, waker: &Waker) -> bool { |
782 | for item in &mut self.wakers { |
783 | if item.0 == id { |
784 | item.1.clone_from(waker); |
785 | return false; |
786 | } |
787 | } |
788 | |
789 | self.wakers.push((id, waker.clone())); |
790 | true |
791 | } |
792 | |
793 | /// Removes a previously inserted sleeping ticker. |
794 | /// |
795 | /// Returns `true` if the ticker was notified. |
796 | fn remove(&mut self, id: usize) -> bool { |
797 | self.count -= 1; |
798 | self.free_ids.push(id); |
799 | |
800 | for i in (0..self.wakers.len()).rev() { |
801 | if self.wakers[i].0 == id { |
802 | self.wakers.remove(i); |
803 | return false; |
804 | } |
805 | } |
806 | true |
807 | } |
808 | |
809 | /// Returns `true` if a sleeping ticker is notified or no tickers are sleeping. |
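///
/// For example, with `count == 3` but only two entries in `wakers`, one sleeper
/// has already been notified (its waker was removed), so this returns `true`.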
810 | fn is_notified(&self) -> bool { |
811 | self.count == 0 || self.count > self.wakers.len() |
812 | } |
813 | |
814 | /// Returns notification waker for a sleeping ticker. |
815 | /// |
816 | /// If a ticker was notified already or there are no tickers, `None` will be returned. |
817 | fn notify(&mut self) -> Option<Waker> { |
818 | if self.wakers.len() == self.count { |
819 | self.wakers.pop().map(|item| item.1) |
820 | } else { |
821 | None |
822 | } |
823 | } |
824 | } |
825 | |
/// Runs tasks one by one.
827 | struct Ticker<'a> { |
828 | /// The executor state. |
829 | state: &'a State, |
830 | |
831 | /// Set to a non-zero sleeper ID when in sleeping state. |
832 | /// |
833 | /// States a ticker can be in: |
834 | /// 1) Woken. |
835 | /// 2a) Sleeping and unnotified. |
836 | /// 2b) Sleeping and notified. |
837 | sleeping: usize, |
838 | } |
839 | |
840 | impl Ticker<'_> { |
841 | /// Creates a ticker. |
842 | fn new(state: &State) -> Ticker<'_> { |
843 | Ticker { state, sleeping: 0 } |
844 | } |
845 | |
846 | /// Moves the ticker into sleeping and unnotified state. |
847 | /// |
848 | /// Returns `false` if the ticker was already sleeping and unnotified. |
849 | fn sleep(&mut self, waker: &Waker) -> bool { |
850 | let mut sleepers = self.state.sleepers.lock().unwrap(); |
851 | |
852 | match self.sleeping { |
853 | // Move to sleeping state. |
854 | 0 => { |
855 | self.sleeping = sleepers.insert(waker); |
856 | } |
857 | |
858 | // Already sleeping, check if notified. |
859 | id => { |
860 | if !sleepers.update(id, waker) { |
861 | return false; |
862 | } |
863 | } |
864 | } |
865 | |
866 | self.state |
867 | .notified |
868 | .store(sleepers.is_notified(), Ordering::Release); |
869 | |
870 | true |
871 | } |
872 | |
873 | /// Moves the ticker into woken state. |
874 | fn wake(&mut self) { |
875 | if self.sleeping != 0 { |
876 | let mut sleepers = self.state.sleepers.lock().unwrap(); |
877 | sleepers.remove(self.sleeping); |
878 | |
879 | self.state |
880 | .notified |
881 | .store(sleepers.is_notified(), Ordering::Release); |
882 | } |
883 | self.sleeping = 0; |
884 | } |
885 | |
886 | /// Waits for the next runnable task to run. |
887 | async fn runnable(&mut self) -> Runnable { |
888 | self.runnable_with(|| self.state.queue.pop().ok()).await |
889 | } |
890 | |
891 | /// Waits for the next runnable task to run, given a function that searches for a task. |
892 | async fn runnable_with(&mut self, mut search: impl FnMut() -> Option<Runnable>) -> Runnable { |
893 | future::poll_fn(|cx| { |
894 | loop { |
895 | match search() { |
896 | None => { |
897 | // Move to sleeping and unnotified state. |
898 | if !self.sleep(cx.waker()) { |
899 | // If already sleeping and unnotified, return. |
900 | return Poll::Pending; |
901 | } |
902 | } |
903 | Some(r) => { |
904 | // Wake up. |
905 | self.wake(); |
906 | |
907 | // Notify another ticker now to pick up where this ticker left off, just in |
908 | // case running the task takes a long time. |
909 | self.state.notify(); |
910 | |
911 | return Poll::Ready(r); |
912 | } |
913 | } |
914 | } |
915 | }) |
916 | .await |
917 | } |
918 | } |
919 | |
920 | impl Drop for Ticker<'_> { |
921 | fn drop(&mut self) { |
922 | // If this ticker is in sleeping state, it must be removed from the sleepers list. |
923 | if self.sleeping != 0 { |
let mut sleepers = self.state.sleepers.lock().unwrap();
let notified = sleepers.remove(self.sleeping);

self.state
.notified
.store(sleepers.is_notified(), Ordering::Release);
930 | |
931 | // If this ticker was notified, then notify another ticker. |
932 | if notified { |
933 | drop(sleepers); |
934 | self.state.notify(); |
935 | } |
936 | } |
937 | } |
938 | } |
939 | |
940 | /// A worker in a work-stealing executor. |
941 | /// |
942 | /// This is just a ticker that also has an associated local queue for improved cache locality. |
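///
/// When looking for work, a runner checks its local queue first, then the global
/// queue, and finally tries to steal from the other runners' local queues.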
943 | struct Runner<'a> { |
944 | /// The executor state. |
945 | state: &'a State, |
946 | |
947 | /// Inner ticker. |
948 | ticker: Ticker<'a>, |
949 | |
950 | /// The local queue. |
951 | local: Arc<ConcurrentQueue<Runnable>>, |
952 | |
953 | /// Bumped every time a runnable task is found. |
954 | ticks: usize, |
955 | } |
956 | |
957 | impl Runner<'_> { |
958 | /// Creates a runner and registers it in the executor state. |
959 | fn new(state: &State) -> Runner<'_> { |
960 | let runner = Runner { |
961 | state, |
962 | ticker: Ticker::new(state), |
963 | local: Arc::new(ConcurrentQueue::bounded(512)), |
964 | ticks: 0, |
965 | }; |
966 | state |
967 | .local_queues |
968 | .write() |
969 | .unwrap() |
970 | .push(runner.local.clone()); |
971 | runner |
972 | } |
973 | |
974 | /// Waits for the next runnable task to run. |
975 | async fn runnable(&mut self, rng: &mut fastrand::Rng) -> Runnable { |
976 | let runnable = self |
977 | .ticker |
978 | .runnable_with(|| { |
979 | // Try the local queue. |
980 | if let Ok(r) = self.local.pop() { |
981 | return Some(r); |
982 | } |
983 | |
984 | // Try stealing from the global queue. |
985 | if let Ok(r) = self.state.queue.pop() { |
986 | steal(&self.state.queue, &self.local); |
987 | return Some(r); |
988 | } |
989 | |
990 | // Try stealing from other runners. |
991 | let local_queues = self.state.local_queues.read().unwrap(); |
992 | |
993 | // Pick a random starting point in the iterator list and rotate the list. |
994 | let n = local_queues.len(); |
995 | let start = rng.usize(..n); |
996 | let iter = local_queues |
997 | .iter() |
998 | .chain(local_queues.iter()) |
999 | .skip(start) |
1000 | .take(n); |
1001 | |
1002 | // Remove this runner's local queue. |
1003 | let iter = iter.filter(|local| !Arc::ptr_eq(local, &self.local)); |
1004 | |
1005 | // Try stealing from each local queue in the list. |
1006 | for local in iter { |
1007 | steal(local, &self.local); |
1008 | if let Ok(r) = self.local.pop() { |
1009 | return Some(r); |
1010 | } |
1011 | } |
1012 | |
1013 | None |
1014 | }) |
1015 | .await; |
1016 | |
1017 | // Bump the tick counter. |
1018 | self.ticks = self.ticks.wrapping_add(1); |
1019 | |
1020 | if self.ticks % 64 == 0 { |
1021 | // Steal tasks from the global queue to ensure fair task scheduling. |
1022 | steal(&self.state.queue, &self.local); |
1023 | } |
1024 | |
1025 | runnable |
1026 | } |
1027 | } |
1028 | |
1029 | impl Drop for Runner<'_> { |
1030 | fn drop(&mut self) { |
1031 | // Remove the local queue. |
1032 | self.state |
1033 | .local_queues |
1034 | .write() |
1035 | .unwrap() |
.retain(|local| !Arc::ptr_eq(local, &self.local));
1037 | |
1038 | // Re-schedule remaining tasks in the local queue. |
while let Ok(r) = self.local.pop() {
1040 | r.schedule(); |
1041 | } |
1042 | } |
1043 | } |
1044 | |
1045 | /// Steals some items from one queue into another. |
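///
/// At most half of `src` (rounded up) is moved, capped at the free capacity of
/// `dest`; e.g. stealing from a queue of 5 items moves 3 of them.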
1046 | fn steal<T>(src: &ConcurrentQueue<T>, dest: &ConcurrentQueue<T>) { |
1047 | // Half of `src`'s length rounded up. |
let mut count = (src.len() + 1) / 2;
1049 | |
1050 | if count > 0 { |
1051 | // Don't steal more than fits into the queue. |
if let Some(cap) = dest.capacity() {
1053 | count = count.min(cap - dest.len()); |
1054 | } |
1055 | |
1056 | // Steal tasks. |
1057 | for _ in 0..count { |
if let Ok(t) = src.pop() {
1059 | assert!(dest.push(t).is_ok()); |
1060 | } else { |
1061 | break; |
1062 | } |
1063 | } |
1064 | } |
1065 | } |
1066 | |
1067 | /// Debug implementation for `Executor` and `LocalExecutor`. |
1068 | fn debug_executor(executor: &Executor<'_>, name: &str, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
1069 | // Get a reference to the state. |
let ptr = executor.state.load(Ordering::Acquire);
1071 | if ptr.is_null() { |
1072 | // The executor has not been initialized. |
1073 | struct Uninitialized; |
1074 | |
1075 | impl fmt::Debug for Uninitialized { |
1076 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
f.write_str("<uninitialized>")
1078 | } |
1079 | } |
1080 | |
1081 | return f.debug_tuple(name).field(&Uninitialized).finish(); |
1082 | } |
1083 | |
1084 | // SAFETY: If the state pointer is not null, it must have been |
1085 | // allocated properly by Arc::new and converted via Arc::into_raw |
1086 | // in state_ptr. |
1087 | let state: &State = unsafe { &*ptr }; |
1088 | |
1089 | debug_state(state, name, f) |
1090 | } |
1091 | |
1092 | /// Debug implementation for `Executor` and `LocalExecutor`. |
1093 | fn debug_state(state: &State, name: &str, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
1094 | /// Debug wrapper for the number of active tasks. |
1095 | struct ActiveTasks<'a>(&'a Mutex<Slab<Waker>>); |
1096 | |
1097 | impl fmt::Debug for ActiveTasks<'_> { |
1098 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
1099 | match self.0.try_lock() { |
1100 | Ok(lock) => fmt::Debug::fmt(&lock.len(), f), |
Err(TryLockError::WouldBlock) => f.write_str("<locked>"),
Err(TryLockError::Poisoned(_)) => f.write_str("<poisoned>"),
1103 | } |
1104 | } |
1105 | } |
1106 | |
1107 | /// Debug wrapper for the local runners. |
1108 | struct LocalRunners<'a>(&'a RwLock<Vec<Arc<ConcurrentQueue<Runnable>>>>); |
1109 | |
1110 | impl fmt::Debug for LocalRunners<'_> { |
1111 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
1112 | match self.0.try_read() { |
1113 | Ok(lock) => f |
1114 | .debug_list() |
1115 | .entries(lock.iter().map(|queue| queue.len())) |
1116 | .finish(), |
Err(TryLockError::WouldBlock) => f.write_str("<locked>"),
Err(TryLockError::Poisoned(_)) => f.write_str("<poisoned>"),
1119 | } |
1120 | } |
1121 | } |
1122 | |
1123 | /// Debug wrapper for the sleepers. |
1124 | struct SleepCount<'a>(&'a Mutex<Sleepers>); |
1125 | |
1126 | impl fmt::Debug for SleepCount<'_> { |
1127 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
1128 | match self.0.try_lock() { |
1129 | Ok(lock) => fmt::Debug::fmt(&lock.count, f), |
Err(TryLockError::WouldBlock) => f.write_str("<locked>"),
Err(TryLockError::Poisoned(_)) => f.write_str("<poisoned>"),
1132 | } |
1133 | } |
1134 | } |
1135 | |
1136 | f.debug_struct(name) |
1137 | .field("active" , &ActiveTasks(&state.active)) |
1138 | .field("global_tasks" , &state.queue.len()) |
1139 | .field("local_runners" , &LocalRunners(&state.local_queues)) |
1140 | .field("sleepers" , &SleepCount(&state.sleepers)) |
1141 | .finish() |
1142 | } |
1143 | |
1144 | /// Runs a closure when dropped. |
1145 | struct CallOnDrop<F: FnMut()>(F); |
1146 | |
1147 | impl<F: FnMut()> Drop for CallOnDrop<F> { |
1148 | fn drop(&mut self) { |
1149 | (self.0)(); |
1150 | } |
1151 | } |
1152 | |
1153 | fn _ensure_send_and_sync() { |
1154 | use futures_lite::future::pending; |
1155 | |
1156 | fn is_send<T: Send>(_: T) {} |
1157 | fn is_sync<T: Sync>(_: T) {} |
1158 | fn is_static<T: 'static>(_: T) {} |
1159 | |
1160 | is_send::<Executor<'_>>(Executor::new()); |
1161 | is_sync::<Executor<'_>>(Executor::new()); |
1162 | |
1163 | let ex = Executor::new(); |
1164 | is_send(ex.run(pending::<()>())); |
1165 | is_sync(ex.run(pending::<()>())); |
1166 | is_send(ex.tick()); |
1167 | is_sync(ex.tick()); |
1168 | is_send(ex.schedule()); |
1169 | is_sync(ex.schedule()); |
1170 | is_static(ex.schedule()); |
1171 | |
1172 | /// ```compile_fail |
1173 | /// use async_executor::LocalExecutor; |
1174 | /// use futures_lite::future::pending; |
1175 | /// |
1176 | /// fn is_send<T: Send>(_: T) {} |
1177 | /// fn is_sync<T: Sync>(_: T) {} |
1178 | /// |
1179 | /// is_send::<LocalExecutor<'_>>(LocalExecutor::new()); |
1180 | /// is_sync::<LocalExecutor<'_>>(LocalExecutor::new()); |
1181 | /// |
1182 | /// let ex = LocalExecutor::new(); |
1183 | /// is_send(ex.run(pending::<()>())); |
1184 | /// is_sync(ex.run(pending::<()>())); |
1185 | /// is_send(ex.tick()); |
1186 | /// is_sync(ex.tick()); |
1187 | /// ``` |
1188 | fn _negative_test() {} |
1189 | } |
1190 | |