1//! Async executors.
2//!
3//! This crate provides two reference executors that trade performance for
4//! functionality. They should be considered reference executors that are "good
5//! enough" for most use cases. For more specialized use cases, consider writing
6//! your own executor on top of [`async-task`].
7//!
8//! [`async-task`]: https://crates.io/crates/async-task
9//!
10//! # Examples
11//!
12//! ```
13//! use async_executor::Executor;
14//! use futures_lite::future;
15//!
16//! // Create a new executor.
17//! let ex = Executor::new();
18//!
19//! // Spawn a task.
20//! let task = ex.spawn(async {
21//! println!("Hello world");
22//! });
23//!
24//! // Run the executor until the task completes.
25//! future::block_on(ex.run(task));
26//! ```
27
28#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
29#![doc(
30 html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
31)]
32#![doc(
33 html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
34)]
35
36use std::fmt;
37use std::future::Future;
38use std::marker::PhantomData;
39use std::panic::{RefUnwindSafe, UnwindSafe};
40use std::rc::Rc;
41use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
42use std::sync::{Arc, Mutex, RwLock, TryLockError};
43use std::task::{Poll, Waker};
44
45use async_lock::OnceCell;
46use async_task::{Builder, Runnable};
47use concurrent_queue::ConcurrentQueue;
48use futures_lite::{future, prelude::*};
49use slab::Slab;
50
51#[doc(no_inline)]
52pub use async_task::Task;
53
54/// An async executor.
55///
56/// # Examples
57///
58/// A multi-threaded executor:
59///
60/// ```
61/// use async_channel::unbounded;
62/// use async_executor::Executor;
63/// use easy_parallel::Parallel;
64/// use futures_lite::future;
65///
66/// let ex = Executor::new();
67/// let (signal, shutdown) = unbounded::<()>();
68///
69/// Parallel::new()
70/// // Run four executor threads.
71/// .each(0..4, |_| future::block_on(ex.run(shutdown.recv())))
72/// // Run the main future on the current thread.
73/// .finish(|| future::block_on(async {
74/// println!("Hello world!");
75/// drop(signal);
76/// }));
77/// ```
78pub struct Executor<'a> {
79 /// The executor state.
80 state: OnceCell<Arc<State>>,
81
82 /// Makes the `'a` lifetime invariant.
83 _marker: PhantomData<std::cell::UnsafeCell<&'a ()>>,
84}
85
86unsafe impl Send for Executor<'_> {}
87unsafe impl Sync for Executor<'_> {}
88
89impl UnwindSafe for Executor<'_> {}
90impl RefUnwindSafe for Executor<'_> {}
91
92impl fmt::Debug for Executor<'_> {
93 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
94 debug_executor(self, name:"Executor", f)
95 }
96}
97
98impl<'a> Executor<'a> {
99 /// Creates a new executor.
100 ///
101 /// # Examples
102 ///
103 /// ```
104 /// use async_executor::Executor;
105 ///
106 /// let ex = Executor::new();
107 /// ```
108 pub const fn new() -> Executor<'a> {
109 Executor {
110 state: OnceCell::new(),
111 _marker: PhantomData,
112 }
113 }
114
115 /// Returns `true` if there are no unfinished tasks.
116 ///
117 /// # Examples
118 ///
119 /// ```
120 /// use async_executor::Executor;
121 ///
122 /// let ex = Executor::new();
123 /// assert!(ex.is_empty());
124 ///
125 /// let task = ex.spawn(async {
126 /// println!("Hello world");
127 /// });
128 /// assert!(!ex.is_empty());
129 ///
130 /// assert!(ex.try_tick());
131 /// assert!(ex.is_empty());
132 /// ```
133 pub fn is_empty(&self) -> bool {
134 self.state().active.lock().unwrap().is_empty()
135 }
136
137 /// Spawns a task onto the executor.
138 ///
139 /// # Examples
140 ///
141 /// ```
142 /// use async_executor::Executor;
143 ///
144 /// let ex = Executor::new();
145 ///
146 /// let task = ex.spawn(async {
147 /// println!("Hello world");
148 /// });
149 /// ```
150 pub fn spawn<T: Send + 'a>(&self, future: impl Future<Output = T> + Send + 'a) -> Task<T> {
151 let mut active = self.state().active.lock().unwrap();
152
153 // Remove the task from the set of active tasks when the future finishes.
154 let index = active.vacant_entry().key();
155 let state = self.state().clone();
156 let future = async move {
157 let _guard = CallOnDrop(move || drop(state.active.lock().unwrap().try_remove(index)));
158 future.await
159 };
160
161 // Create the task and register it in the set of active tasks.
162 let (runnable, task) = unsafe {
163 Builder::new()
164 .propagate_panic(true)
165 .spawn_unchecked(|()| future, self.schedule())
166 };
167 active.insert(runnable.waker());
168
169 runnable.schedule();
170 task
171 }
172
173 /// Attempts to run a task if at least one is scheduled.
174 ///
175 /// Running a scheduled task means simply polling its future once.
176 ///
177 /// # Examples
178 ///
179 /// ```
180 /// use async_executor::Executor;
181 ///
182 /// let ex = Executor::new();
183 /// assert!(!ex.try_tick()); // no tasks to run
184 ///
185 /// let task = ex.spawn(async {
186 /// println!("Hello world");
187 /// });
188 /// assert!(ex.try_tick()); // a task was found
189 /// ```
190 pub fn try_tick(&self) -> bool {
191 match self.state().queue.pop() {
192 Err(_) => false,
193 Ok(runnable) => {
194 // Notify another ticker now to pick up where this ticker left off, just in case
195 // running the task takes a long time.
196 self.state().notify();
197
198 // Run the task.
199 runnable.run();
200 true
201 }
202 }
203 }
204
205 /// Runs a single task.
206 ///
207 /// Running a task means simply polling its future once.
208 ///
209 /// If no tasks are scheduled when this method is called, it will wait until one is scheduled.
210 ///
211 /// # Examples
212 ///
213 /// ```
214 /// use async_executor::Executor;
215 /// use futures_lite::future;
216 ///
217 /// let ex = Executor::new();
218 ///
219 /// let task = ex.spawn(async {
220 /// println!("Hello world");
221 /// });
222 /// future::block_on(ex.tick()); // runs the task
223 /// ```
224 pub async fn tick(&self) {
225 let state = self.state();
226 let runnable = Ticker::new(state).runnable().await;
227 runnable.run();
228 }
229
230 /// Runs the executor until the given future completes.
231 ///
232 /// # Examples
233 ///
234 /// ```
235 /// use async_executor::Executor;
236 /// use futures_lite::future;
237 ///
238 /// let ex = Executor::new();
239 ///
240 /// let task = ex.spawn(async { 1 + 2 });
241 /// let res = future::block_on(ex.run(async { task.await * 2 }));
242 ///
243 /// assert_eq!(res, 6);
244 /// ```
245 pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
246 let runner = Runner::new(self.state());
247 let mut rng = fastrand::Rng::new();
248
249 // A future that runs tasks forever.
250 let run_forever = async {
251 loop {
252 for _ in 0..200 {
253 let runnable = runner.runnable(&mut rng).await;
254 runnable.run();
255 }
256 future::yield_now().await;
257 }
258 };
259
260 // Run `future` and `run_forever` concurrently until `future` completes.
261 future.or(run_forever).await
262 }
263
264 /// Returns a function that schedules a runnable task when it gets woken up.
265 fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static {
266 let state = self.state().clone();
267
268 // TODO: If possible, push into the current local queue and notify the ticker.
269 move |runnable| {
270 state.queue.push(runnable).unwrap();
271 state.notify();
272 }
273 }
274
275 /// Returns a reference to the inner state.
276 fn state(&self) -> &Arc<State> {
277 #[cfg(not(target_family = "wasm"))]
278 {
279 return self.state.get_or_init_blocking(|| Arc::new(State::new()));
280 }
281
282 // Some projects use this on WASM for some reason. In this case get_or_init_blocking
283 // doesn't work. Just poll the future once and panic if there is contention.
284 #[cfg(target_family = "wasm")]
285 future::block_on(future::poll_once(
286 self.state.get_or_init(|| async { Arc::new(State::new()) }),
287 ))
288 .expect("encountered contention on WASM")
289 }
290}
291
292impl Drop for Executor<'_> {
293 fn drop(&mut self) {
294 if let Some(state: &Arc) = self.state.get() {
295 let mut active: MutexGuard<'_, Slab> = state.active.lock().unwrap();
296 for w: Waker in active.drain() {
297 w.wake();
298 }
299 drop(active);
300
301 while state.queue.pop().is_ok() {}
302 }
303 }
304}
305
306impl<'a> Default for Executor<'a> {
307 fn default() -> Executor<'a> {
308 Executor::new()
309 }
310}
311
312/// A thread-local executor.
313///
314/// The executor can only be run on the thread that created it.
315///
316/// # Examples
317///
318/// ```
319/// use async_executor::LocalExecutor;
320/// use futures_lite::future;
321///
322/// let local_ex = LocalExecutor::new();
323///
324/// future::block_on(local_ex.run(async {
325/// println!("Hello world!");
326/// }));
327/// ```
328pub struct LocalExecutor<'a> {
329 /// The inner executor.
330 inner: Executor<'a>,
331
332 /// Makes the type `!Send` and `!Sync`.
333 _marker: PhantomData<Rc<()>>,
334}
335
336impl UnwindSafe for LocalExecutor<'_> {}
337impl RefUnwindSafe for LocalExecutor<'_> {}
338
339impl fmt::Debug for LocalExecutor<'_> {
340 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
341 debug_executor(&self.inner, name:"LocalExecutor", f)
342 }
343}
344
345impl<'a> LocalExecutor<'a> {
346 /// Creates a single-threaded executor.
347 ///
348 /// # Examples
349 ///
350 /// ```
351 /// use async_executor::LocalExecutor;
352 ///
353 /// let local_ex = LocalExecutor::new();
354 /// ```
355 pub const fn new() -> LocalExecutor<'a> {
356 LocalExecutor {
357 inner: Executor::new(),
358 _marker: PhantomData,
359 }
360 }
361
362 /// Returns `true` if there are no unfinished tasks.
363 ///
364 /// # Examples
365 ///
366 /// ```
367 /// use async_executor::LocalExecutor;
368 ///
369 /// let local_ex = LocalExecutor::new();
370 /// assert!(local_ex.is_empty());
371 ///
372 /// let task = local_ex.spawn(async {
373 /// println!("Hello world");
374 /// });
375 /// assert!(!local_ex.is_empty());
376 ///
377 /// assert!(local_ex.try_tick());
378 /// assert!(local_ex.is_empty());
379 /// ```
380 pub fn is_empty(&self) -> bool {
381 self.inner().is_empty()
382 }
383
384 /// Spawns a task onto the executor.
385 ///
386 /// # Examples
387 ///
388 /// ```
389 /// use async_executor::LocalExecutor;
390 ///
391 /// let local_ex = LocalExecutor::new();
392 ///
393 /// let task = local_ex.spawn(async {
394 /// println!("Hello world");
395 /// });
396 /// ```
397 pub fn spawn<T: 'a>(&self, future: impl Future<Output = T> + 'a) -> Task<T> {
398 let mut active = self.inner().state().active.lock().unwrap();
399
400 // Remove the task from the set of active tasks when the future finishes.
401 let index = active.vacant_entry().key();
402 let state = self.inner().state().clone();
403 let future = async move {
404 let _guard = CallOnDrop(move || drop(state.active.lock().unwrap().try_remove(index)));
405 future.await
406 };
407
408 // Create the task and register it in the set of active tasks.
409 let (runnable, task) = unsafe {
410 Builder::new()
411 .propagate_panic(true)
412 .spawn_unchecked(|()| future, self.schedule())
413 };
414 active.insert(runnable.waker());
415
416 runnable.schedule();
417 task
418 }
419
420 /// Attempts to run a task if at least one is scheduled.
421 ///
422 /// Running a scheduled task means simply polling its future once.
423 ///
424 /// # Examples
425 ///
426 /// ```
427 /// use async_executor::LocalExecutor;
428 ///
429 /// let ex = LocalExecutor::new();
430 /// assert!(!ex.try_tick()); // no tasks to run
431 ///
432 /// let task = ex.spawn(async {
433 /// println!("Hello world");
434 /// });
435 /// assert!(ex.try_tick()); // a task was found
436 /// ```
437 pub fn try_tick(&self) -> bool {
438 self.inner().try_tick()
439 }
440
441 /// Runs a single task.
442 ///
443 /// Running a task means simply polling its future once.
444 ///
445 /// If no tasks are scheduled when this method is called, it will wait until one is scheduled.
446 ///
447 /// # Examples
448 ///
449 /// ```
450 /// use async_executor::LocalExecutor;
451 /// use futures_lite::future;
452 ///
453 /// let ex = LocalExecutor::new();
454 ///
455 /// let task = ex.spawn(async {
456 /// println!("Hello world");
457 /// });
458 /// future::block_on(ex.tick()); // runs the task
459 /// ```
460 pub async fn tick(&self) {
461 self.inner().tick().await
462 }
463
464 /// Runs the executor until the given future completes.
465 ///
466 /// # Examples
467 ///
468 /// ```
469 /// use async_executor::LocalExecutor;
470 /// use futures_lite::future;
471 ///
472 /// let local_ex = LocalExecutor::new();
473 ///
474 /// let task = local_ex.spawn(async { 1 + 2 });
475 /// let res = future::block_on(local_ex.run(async { task.await * 2 }));
476 ///
477 /// assert_eq!(res, 6);
478 /// ```
479 pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
480 self.inner().run(future).await
481 }
482
483 /// Returns a function that schedules a runnable task when it gets woken up.
484 fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static {
485 let state = self.inner().state().clone();
486
487 move |runnable| {
488 state.queue.push(runnable).unwrap();
489 state.notify();
490 }
491 }
492
493 /// Returns a reference to the inner executor.
494 fn inner(&self) -> &Executor<'a> {
495 &self.inner
496 }
497}
498
499impl<'a> Default for LocalExecutor<'a> {
500 fn default() -> LocalExecutor<'a> {
501 LocalExecutor::new()
502 }
503}
504
505/// The state of a executor.
506struct State {
507 /// The global queue.
508 queue: ConcurrentQueue<Runnable>,
509
510 /// Local queues created by runners.
511 local_queues: RwLock<Vec<Arc<ConcurrentQueue<Runnable>>>>,
512
513 /// Set to `true` when a sleeping ticker is notified or no tickers are sleeping.
514 notified: AtomicBool,
515
516 /// A list of sleeping tickers.
517 sleepers: Mutex<Sleepers>,
518
519 /// Currently active tasks.
520 active: Mutex<Slab<Waker>>,
521}
522
523impl State {
524 /// Creates state for a new executor.
525 fn new() -> State {
526 State {
527 queue: ConcurrentQueue::unbounded(),
528 local_queues: RwLock::new(Vec::new()),
529 notified: AtomicBool::new(true),
530 sleepers: Mutex::new(Sleepers {
531 count: 0,
532 wakers: Vec::new(),
533 free_ids: Vec::new(),
534 }),
535 active: Mutex::new(Slab::new()),
536 }
537 }
538
539 /// Notifies a sleeping ticker.
540 #[inline]
541 fn notify(&self) {
542 if self
543 .notified
544 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
545 .is_ok()
546 {
547 let waker = self.sleepers.lock().unwrap().notify();
548 if let Some(w) = waker {
549 w.wake();
550 }
551 }
552 }
553}
554
/// A list of sleeping tickers.
struct Sleepers {
    /// Number of sleeping tickers (both notified and unnotified).
    count: usize,

    /// IDs and wakers of sleeping unnotified tickers.
    ///
    /// A sleeping ticker is notified when its waker is missing from this list.
    wakers: Vec<(usize, Waker)>,

    /// Reclaimed IDs.
    free_ids: Vec<usize>,
}

impl Sleepers {
    /// Inserts a new sleeping ticker and returns its ID.
    ///
    /// A reclaimed ID is reused when one is available; otherwise the ID is `count + 1`, so
    /// IDs are never zero.
    fn insert(&mut self, waker: &Waker) -> usize {
        let id = self.free_ids.pop().unwrap_or_else(|| self.count + 1);
        self.count += 1;
        self.wakers.push((id, waker.clone()));
        id
    }

    /// Re-inserts a sleeping ticker's waker if it was notified.
    ///
    /// Returns `true` if the ticker was notified.
    fn update(&mut self, id: usize, waker: &Waker) -> bool {
        // Still listed, so not notified: refresh the stored waker only if it differs.
        if let Some(entry) = self.wakers.iter_mut().find(|entry| entry.0 == id) {
            if !entry.1.will_wake(waker) {
                entry.1 = waker.clone();
            }
            return false;
        }

        // Not listed, so the ticker had been notified: put it back.
        self.wakers.push((id, waker.clone()));
        true
    }

    /// Removes a previously inserted sleeping ticker.
    ///
    /// Returns `true` if the ticker was notified.
    fn remove(&mut self, id: usize) -> bool {
        self.count -= 1;
        self.free_ids.push(id);

        // Search from the back of the list, as the most recent entries live there.
        match self.wakers.iter().rposition(|entry| entry.0 == id) {
            Some(position) => {
                self.wakers.remove(position);
                false
            }
            None => true,
        }
    }

    /// Returns `true` if a sleeping ticker is notified or no tickers are sleeping.
    fn is_notified(&self) -> bool {
        let unnotified = self.wakers.len();
        self.count == 0 || unnotified < self.count
    }

    /// Returns notification waker for a sleeping ticker.
    ///
    /// If a ticker was notified already or there are no tickers, `None` will be returned.
    fn notify(&mut self) -> Option<Waker> {
        // Fewer listed wakers than sleepers means somebody has already been notified.
        if self.wakers.len() != self.count {
            return None;
        }
        self.wakers.pop().map(|(_, waker)| waker)
    }
}
630
631/// Runs task one by one.
632struct Ticker<'a> {
633 /// The executor state.
634 state: &'a State,
635
636 /// Set to a non-zero sleeper ID when in sleeping state.
637 ///
638 /// States a ticker can be in:
639 /// 1) Woken.
640 /// 2a) Sleeping and unnotified.
641 /// 2b) Sleeping and notified.
642 sleeping: AtomicUsize,
643}
644
645impl Ticker<'_> {
646 /// Creates a ticker.
647 fn new(state: &State) -> Ticker<'_> {
648 Ticker {
649 state,
650 sleeping: AtomicUsize::new(0),
651 }
652 }
653
654 /// Moves the ticker into sleeping and unnotified state.
655 ///
656 /// Returns `false` if the ticker was already sleeping and unnotified.
657 fn sleep(&self, waker: &Waker) -> bool {
658 let mut sleepers = self.state.sleepers.lock().unwrap();
659
660 match self.sleeping.load(Ordering::SeqCst) {
661 // Move to sleeping state.
662 0 => self
663 .sleeping
664 .store(sleepers.insert(waker), Ordering::SeqCst),
665
666 // Already sleeping, check if notified.
667 id => {
668 if !sleepers.update(id, waker) {
669 return false;
670 }
671 }
672 }
673
674 self.state
675 .notified
676 .swap(sleepers.is_notified(), Ordering::SeqCst);
677
678 true
679 }
680
681 /// Moves the ticker into woken state.
682 fn wake(&self) {
683 let id = self.sleeping.swap(0, Ordering::SeqCst);
684 if id != 0 {
685 let mut sleepers = self.state.sleepers.lock().unwrap();
686 sleepers.remove(id);
687
688 self.state
689 .notified
690 .swap(sleepers.is_notified(), Ordering::SeqCst);
691 }
692 }
693
694 /// Waits for the next runnable task to run.
695 async fn runnable(&self) -> Runnable {
696 self.runnable_with(|| self.state.queue.pop().ok()).await
697 }
698
699 /// Waits for the next runnable task to run, given a function that searches for a task.
700 async fn runnable_with(&self, mut search: impl FnMut() -> Option<Runnable>) -> Runnable {
701 future::poll_fn(|cx| {
702 loop {
703 match search() {
704 None => {
705 // Move to sleeping and unnotified state.
706 if !self.sleep(cx.waker()) {
707 // If already sleeping and unnotified, return.
708 return Poll::Pending;
709 }
710 }
711 Some(r) => {
712 // Wake up.
713 self.wake();
714
715 // Notify another ticker now to pick up where this ticker left off, just in
716 // case running the task takes a long time.
717 self.state.notify();
718
719 return Poll::Ready(r);
720 }
721 }
722 }
723 })
724 .await
725 }
726}
727
728impl Drop for Ticker<'_> {
729 fn drop(&mut self) {
730 // If this ticker is in sleeping state, it must be removed from the sleepers list.
731 let id: usize = self.sleeping.swap(val:0, order:Ordering::SeqCst);
732 if id != 0 {
733 let mut sleepers: MutexGuard<'_, Sleepers> = self.state.sleepers.lock().unwrap();
734 let notified: bool = sleepers.remove(id);
735
736 self.state
737 .notified
738 .swap(val:sleepers.is_notified(), order:Ordering::SeqCst);
739
740 // If this ticker was notified, then notify another ticker.
741 if notified {
742 drop(sleepers);
743 self.state.notify();
744 }
745 }
746 }
747}
748
749/// A worker in a work-stealing executor.
750///
751/// This is just a ticker that also has an associated local queue for improved cache locality.
752struct Runner<'a> {
753 /// The executor state.
754 state: &'a State,
755
756 /// Inner ticker.
757 ticker: Ticker<'a>,
758
759 /// The local queue.
760 local: Arc<ConcurrentQueue<Runnable>>,
761
762 /// Bumped every time a runnable task is found.
763 ticks: AtomicUsize,
764}
765
766impl Runner<'_> {
767 /// Creates a runner and registers it in the executor state.
768 fn new(state: &State) -> Runner<'_> {
769 let runner = Runner {
770 state,
771 ticker: Ticker::new(state),
772 local: Arc::new(ConcurrentQueue::bounded(512)),
773 ticks: AtomicUsize::new(0),
774 };
775 state
776 .local_queues
777 .write()
778 .unwrap()
779 .push(runner.local.clone());
780 runner
781 }
782
783 /// Waits for the next runnable task to run.
784 async fn runnable(&self, rng: &mut fastrand::Rng) -> Runnable {
785 let runnable = self
786 .ticker
787 .runnable_with(|| {
788 // Try the local queue.
789 if let Ok(r) = self.local.pop() {
790 return Some(r);
791 }
792
793 // Try stealing from the global queue.
794 if let Ok(r) = self.state.queue.pop() {
795 steal(&self.state.queue, &self.local);
796 return Some(r);
797 }
798
799 // Try stealing from other runners.
800 let local_queues = self.state.local_queues.read().unwrap();
801
802 // Pick a random starting point in the iterator list and rotate the list.
803 let n = local_queues.len();
804 let start = rng.usize(..n);
805 let iter = local_queues
806 .iter()
807 .chain(local_queues.iter())
808 .skip(start)
809 .take(n);
810
811 // Remove this runner's local queue.
812 let iter = iter.filter(|local| !Arc::ptr_eq(local, &self.local));
813
814 // Try stealing from each local queue in the list.
815 for local in iter {
816 steal(local, &self.local);
817 if let Ok(r) = self.local.pop() {
818 return Some(r);
819 }
820 }
821
822 None
823 })
824 .await;
825
826 // Bump the tick counter.
827 let ticks = self.ticks.fetch_add(1, Ordering::SeqCst);
828
829 if ticks % 64 == 0 {
830 // Steal tasks from the global queue to ensure fair task scheduling.
831 steal(&self.state.queue, &self.local);
832 }
833
834 runnable
835 }
836}
837
838impl Drop for Runner<'_> {
839 fn drop(&mut self) {
840 // Remove the local queue.
841 self.state
842 .local_queues
843 .write()
844 .unwrap()
845 .retain(|local: &Arc>| !Arc::ptr_eq(this:local, &self.local));
846
847 // Re-schedule remaining tasks in the local queue.
848 while let Ok(r: Runnable) = self.local.pop() {
849 r.schedule();
850 }
851 }
852}
853
854/// Steals some items from one queue into another.
855fn steal<T>(src: &ConcurrentQueue<T>, dest: &ConcurrentQueue<T>) {
856 // Half of `src`'s length rounded up.
857 let mut count: usize = (src.len() + 1) / 2;
858
859 if count > 0 {
860 // Don't steal more than fits into the queue.
861 if let Some(cap: usize) = dest.capacity() {
862 count = count.min(cap - dest.len());
863 }
864
865 // Steal tasks.
866 for _ in 0..count {
867 if let Ok(t: T) = src.pop() {
868 assert!(dest.push(t).is_ok());
869 } else {
870 break;
871 }
872 }
873 }
874}
875
876/// Debug implementation for `Executor` and `LocalExecutor`.
877fn debug_executor(executor: &Executor<'_>, name: &str, f: &mut fmt::Formatter<'_>) -> fmt::Result {
878 // Get a reference to the state.
879 let state = match executor.state.get() {
880 Some(state) => state,
881 None => {
882 // The executor has not been initialized.
883 struct Uninitialized;
884
885 impl fmt::Debug for Uninitialized {
886 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
887 f.write_str("<uninitialized>")
888 }
889 }
890
891 return f.debug_tuple(name).field(&Uninitialized).finish();
892 }
893 };
894
895 /// Debug wrapper for the number of active tasks.
896 struct ActiveTasks<'a>(&'a Mutex<Slab<Waker>>);
897
898 impl fmt::Debug for ActiveTasks<'_> {
899 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
900 match self.0.try_lock() {
901 Ok(lock) => fmt::Debug::fmt(&lock.len(), f),
902 Err(TryLockError::WouldBlock) => f.write_str("<locked>"),
903 Err(TryLockError::Poisoned(_)) => f.write_str("<poisoned>"),
904 }
905 }
906 }
907
908 /// Debug wrapper for the local runners.
909 struct LocalRunners<'a>(&'a RwLock<Vec<Arc<ConcurrentQueue<Runnable>>>>);
910
911 impl fmt::Debug for LocalRunners<'_> {
912 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
913 match self.0.try_read() {
914 Ok(lock) => f
915 .debug_list()
916 .entries(lock.iter().map(|queue| queue.len()))
917 .finish(),
918 Err(TryLockError::WouldBlock) => f.write_str("<locked>"),
919 Err(TryLockError::Poisoned(_)) => f.write_str("<poisoned>"),
920 }
921 }
922 }
923
924 /// Debug wrapper for the sleepers.
925 struct SleepCount<'a>(&'a Mutex<Sleepers>);
926
927 impl fmt::Debug for SleepCount<'_> {
928 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
929 match self.0.try_lock() {
930 Ok(lock) => fmt::Debug::fmt(&lock.count, f),
931 Err(TryLockError::WouldBlock) => f.write_str("<locked>"),
932 Err(TryLockError::Poisoned(_)) => f.write_str("<poisoned>"),
933 }
934 }
935 }
936
937 f.debug_struct(name)
938 .field("active", &ActiveTasks(&state.active))
939 .field("global_tasks", &state.queue.len())
940 .field("local_runners", &LocalRunners(&state.local_queues))
941 .field("sleepers", &SleepCount(&state.sleepers))
942 .finish()
943}
944
/// Runs a closure when dropped.
///
/// Used as a scope guard: the wrapped closure is called exactly once, when the guard goes
/// out of scope.
struct CallOnDrop<F: FnMut()>(F);

impl<F: FnMut()> Drop for CallOnDrop<F> {
    fn drop(&mut self) {
        let Self(callback) = self;
        callback();
    }
}
953
954fn _ensure_send_and_sync() {
955 use futures_lite::future::pending;
956
957 fn is_send<T: Send>(_: T) {}
958 fn is_sync<T: Sync>(_: T) {}
959
960 is_send::<Executor<'_>>(Executor::new());
961 is_sync::<Executor<'_>>(Executor::new());
962
963 let ex = Executor::new();
964 is_send(ex.run(pending::<()>()));
965 is_sync(ex.run(pending::<()>()));
966 is_send(ex.tick());
967 is_sync(ex.tick());
968
969 /// ```compile_fail
970 /// use async_executor::LocalExecutor;
971 /// use futures_lite::future::pending;
972 ///
973 /// fn is_send<T: Send>(_: T) {}
974 /// fn is_sync<T: Sync>(_: T) {}
975 ///
976 /// is_send::<LocalExecutor<'_>>(LocalExecutor::new());
977 /// is_sync::<LocalExecutor<'_>>(LocalExecutor::new());
978 ///
979 /// let ex = LocalExecutor::new();
980 /// is_send(ex.run(pending::<()>()));
981 /// is_sync(ex.run(pending::<()>()));
982 /// is_send(ex.tick());
983 /// is_sync(ex.tick());
984 /// ```
985 fn _negative_test() {}
986}
987