1 | //! Async executors. |
2 | //! |
3 | //! This crate provides two reference executors that trade performance for |
4 | //! functionality. They should be considered reference executors that are "good |
5 | //! enough" for most use cases. For more specialized use cases, consider writing |
6 | //! your own executor on top of [`async-task`]. |
7 | //! |
8 | //! [`async-task`]: https://crates.io/crates/async-task |
9 | //! |
10 | //! # Examples |
11 | //! |
12 | //! ``` |
13 | //! use async_executor::Executor; |
14 | //! use futures_lite::future; |
15 | //! |
16 | //! // Create a new executor. |
17 | //! let ex = Executor::new(); |
18 | //! |
19 | //! // Spawn a task. |
20 | //! let task = ex.spawn(async { |
21 | //! println!("Hello world" ); |
22 | //! }); |
23 | //! |
24 | //! // Run the executor until the task completes. |
25 | //! future::block_on(ex.run(task)); |
26 | //! ``` |
27 | |
28 | #![warn (missing_docs, missing_debug_implementations, rust_2018_idioms)] |
29 | #![doc ( |
30 | html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png" |
31 | )] |
32 | #![doc ( |
33 | html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png" |
34 | )] |
35 | |
36 | use std::fmt; |
37 | use std::future::Future; |
38 | use std::marker::PhantomData; |
39 | use std::panic::{RefUnwindSafe, UnwindSafe}; |
40 | use std::rc::Rc; |
41 | use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; |
42 | use std::sync::{Arc, Mutex, RwLock, TryLockError}; |
43 | use std::task::{Poll, Waker}; |
44 | |
45 | use async_lock::OnceCell; |
46 | use async_task::{Builder, Runnable}; |
47 | use concurrent_queue::ConcurrentQueue; |
48 | use futures_lite::{future, prelude::*}; |
49 | use slab::Slab; |
50 | |
51 | #[doc (no_inline)] |
52 | pub use async_task::Task; |
53 | |
54 | /// An async executor. |
55 | /// |
56 | /// # Examples |
57 | /// |
58 | /// A multi-threaded executor: |
59 | /// |
60 | /// ``` |
61 | /// use async_channel::unbounded; |
62 | /// use async_executor::Executor; |
63 | /// use easy_parallel::Parallel; |
64 | /// use futures_lite::future; |
65 | /// |
66 | /// let ex = Executor::new(); |
67 | /// let (signal, shutdown) = unbounded::<()>(); |
68 | /// |
69 | /// Parallel::new() |
70 | /// // Run four executor threads. |
71 | /// .each(0..4, |_| future::block_on(ex.run(shutdown.recv()))) |
72 | /// // Run the main future on the current thread. |
73 | /// .finish(|| future::block_on(async { |
74 | /// println!("Hello world!" ); |
75 | /// drop(signal); |
76 | /// })); |
77 | /// ``` |
78 | pub struct Executor<'a> { |
79 | /// The executor state. |
80 | state: OnceCell<Arc<State>>, |
81 | |
82 | /// Makes the `'a` lifetime invariant. |
83 | _marker: PhantomData<std::cell::UnsafeCell<&'a ()>>, |
84 | } |
85 | |
86 | unsafe impl Send for Executor<'_> {} |
87 | unsafe impl Sync for Executor<'_> {} |
88 | |
89 | impl UnwindSafe for Executor<'_> {} |
90 | impl RefUnwindSafe for Executor<'_> {} |
91 | |
92 | impl fmt::Debug for Executor<'_> { |
93 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
94 | debug_executor(self, name:"Executor" , f) |
95 | } |
96 | } |
97 | |
98 | impl<'a> Executor<'a> { |
99 | /// Creates a new executor. |
100 | /// |
101 | /// # Examples |
102 | /// |
103 | /// ``` |
104 | /// use async_executor::Executor; |
105 | /// |
106 | /// let ex = Executor::new(); |
107 | /// ``` |
108 | pub const fn new() -> Executor<'a> { |
109 | Executor { |
110 | state: OnceCell::new(), |
111 | _marker: PhantomData, |
112 | } |
113 | } |
114 | |
115 | /// Returns `true` if there are no unfinished tasks. |
116 | /// |
117 | /// # Examples |
118 | /// |
119 | /// ``` |
120 | /// use async_executor::Executor; |
121 | /// |
122 | /// let ex = Executor::new(); |
123 | /// assert!(ex.is_empty()); |
124 | /// |
125 | /// let task = ex.spawn(async { |
126 | /// println!("Hello world" ); |
127 | /// }); |
128 | /// assert!(!ex.is_empty()); |
129 | /// |
130 | /// assert!(ex.try_tick()); |
131 | /// assert!(ex.is_empty()); |
132 | /// ``` |
133 | pub fn is_empty(&self) -> bool { |
134 | self.state().active.lock().unwrap().is_empty() |
135 | } |
136 | |
137 | /// Spawns a task onto the executor. |
138 | /// |
139 | /// # Examples |
140 | /// |
141 | /// ``` |
142 | /// use async_executor::Executor; |
143 | /// |
144 | /// let ex = Executor::new(); |
145 | /// |
146 | /// let task = ex.spawn(async { |
147 | /// println!("Hello world" ); |
148 | /// }); |
149 | /// ``` |
150 | pub fn spawn<T: Send + 'a>(&self, future: impl Future<Output = T> + Send + 'a) -> Task<T> { |
151 | let mut active = self.state().active.lock().unwrap(); |
152 | |
153 | // Remove the task from the set of active tasks when the future finishes. |
154 | let index = active.vacant_entry().key(); |
155 | let state = self.state().clone(); |
156 | let future = async move { |
157 | let _guard = CallOnDrop(move || drop(state.active.lock().unwrap().try_remove(index))); |
158 | future.await |
159 | }; |
160 | |
161 | // Create the task and register it in the set of active tasks. |
162 | let (runnable, task) = unsafe { |
163 | Builder::new() |
164 | .propagate_panic(true) |
165 | .spawn_unchecked(|()| future, self.schedule()) |
166 | }; |
167 | active.insert(runnable.waker()); |
168 | |
169 | runnable.schedule(); |
170 | task |
171 | } |
172 | |
173 | /// Attempts to run a task if at least one is scheduled. |
174 | /// |
175 | /// Running a scheduled task means simply polling its future once. |
176 | /// |
177 | /// # Examples |
178 | /// |
179 | /// ``` |
180 | /// use async_executor::Executor; |
181 | /// |
182 | /// let ex = Executor::new(); |
183 | /// assert!(!ex.try_tick()); // no tasks to run |
184 | /// |
185 | /// let task = ex.spawn(async { |
186 | /// println!("Hello world" ); |
187 | /// }); |
188 | /// assert!(ex.try_tick()); // a task was found |
189 | /// ``` |
190 | pub fn try_tick(&self) -> bool { |
191 | match self.state().queue.pop() { |
192 | Err(_) => false, |
193 | Ok(runnable) => { |
194 | // Notify another ticker now to pick up where this ticker left off, just in case |
195 | // running the task takes a long time. |
196 | self.state().notify(); |
197 | |
198 | // Run the task. |
199 | runnable.run(); |
200 | true |
201 | } |
202 | } |
203 | } |
204 | |
205 | /// Runs a single task. |
206 | /// |
207 | /// Running a task means simply polling its future once. |
208 | /// |
209 | /// If no tasks are scheduled when this method is called, it will wait until one is scheduled. |
210 | /// |
211 | /// # Examples |
212 | /// |
213 | /// ``` |
214 | /// use async_executor::Executor; |
215 | /// use futures_lite::future; |
216 | /// |
217 | /// let ex = Executor::new(); |
218 | /// |
219 | /// let task = ex.spawn(async { |
220 | /// println!("Hello world" ); |
221 | /// }); |
222 | /// future::block_on(ex.tick()); // runs the task |
223 | /// ``` |
224 | pub async fn tick(&self) { |
225 | let state = self.state(); |
226 | let runnable = Ticker::new(state).runnable().await; |
227 | runnable.run(); |
228 | } |
229 | |
230 | /// Runs the executor until the given future completes. |
231 | /// |
232 | /// # Examples |
233 | /// |
234 | /// ``` |
235 | /// use async_executor::Executor; |
236 | /// use futures_lite::future; |
237 | /// |
238 | /// let ex = Executor::new(); |
239 | /// |
240 | /// let task = ex.spawn(async { 1 + 2 }); |
241 | /// let res = future::block_on(ex.run(async { task.await * 2 })); |
242 | /// |
243 | /// assert_eq!(res, 6); |
244 | /// ``` |
245 | pub async fn run<T>(&self, future: impl Future<Output = T>) -> T { |
246 | let runner = Runner::new(self.state()); |
247 | let mut rng = fastrand::Rng::new(); |
248 | |
249 | // A future that runs tasks forever. |
250 | let run_forever = async { |
251 | loop { |
252 | for _ in 0..200 { |
253 | let runnable = runner.runnable(&mut rng).await; |
254 | runnable.run(); |
255 | } |
256 | future::yield_now().await; |
257 | } |
258 | }; |
259 | |
260 | // Run `future` and `run_forever` concurrently until `future` completes. |
261 | future.or(run_forever).await |
262 | } |
263 | |
264 | /// Returns a function that schedules a runnable task when it gets woken up. |
265 | fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static { |
266 | let state = self.state().clone(); |
267 | |
268 | // TODO: If possible, push into the current local queue and notify the ticker. |
269 | move |runnable| { |
270 | state.queue.push(runnable).unwrap(); |
271 | state.notify(); |
272 | } |
273 | } |
274 | |
275 | /// Returns a reference to the inner state. |
276 | fn state(&self) -> &Arc<State> { |
277 | #[cfg (not(target_family = "wasm" ))] |
278 | { |
279 | return self.state.get_or_init_blocking(|| Arc::new(State::new())); |
280 | } |
281 | |
282 | // Some projects use this on WASM for some reason. In this case get_or_init_blocking |
283 | // doesn't work. Just poll the future once and panic if there is contention. |
284 | #[cfg (target_family = "wasm" )] |
285 | future::block_on(future::poll_once( |
286 | self.state.get_or_init(|| async { Arc::new(State::new()) }), |
287 | )) |
288 | .expect("encountered contention on WASM" ) |
289 | } |
290 | } |
291 | |
292 | impl Drop for Executor<'_> { |
293 | fn drop(&mut self) { |
294 | if let Some(state: &Arc) = self.state.get() { |
295 | let mut active: MutexGuard<'_, Slab> = state.active.lock().unwrap(); |
296 | for w: Waker in active.drain() { |
297 | w.wake(); |
298 | } |
299 | drop(active); |
300 | |
301 | while state.queue.pop().is_ok() {} |
302 | } |
303 | } |
304 | } |
305 | |
306 | impl<'a> Default for Executor<'a> { |
307 | fn default() -> Executor<'a> { |
308 | Executor::new() |
309 | } |
310 | } |
311 | |
312 | /// A thread-local executor. |
313 | /// |
314 | /// The executor can only be run on the thread that created it. |
315 | /// |
316 | /// # Examples |
317 | /// |
318 | /// ``` |
319 | /// use async_executor::LocalExecutor; |
320 | /// use futures_lite::future; |
321 | /// |
322 | /// let local_ex = LocalExecutor::new(); |
323 | /// |
324 | /// future::block_on(local_ex.run(async { |
325 | /// println!("Hello world!" ); |
326 | /// })); |
327 | /// ``` |
328 | pub struct LocalExecutor<'a> { |
329 | /// The inner executor. |
330 | inner: Executor<'a>, |
331 | |
332 | /// Makes the type `!Send` and `!Sync`. |
333 | _marker: PhantomData<Rc<()>>, |
334 | } |
335 | |
336 | impl UnwindSafe for LocalExecutor<'_> {} |
337 | impl RefUnwindSafe for LocalExecutor<'_> {} |
338 | |
339 | impl fmt::Debug for LocalExecutor<'_> { |
340 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
341 | debug_executor(&self.inner, name:"LocalExecutor" , f) |
342 | } |
343 | } |
344 | |
345 | impl<'a> LocalExecutor<'a> { |
346 | /// Creates a single-threaded executor. |
347 | /// |
348 | /// # Examples |
349 | /// |
350 | /// ``` |
351 | /// use async_executor::LocalExecutor; |
352 | /// |
353 | /// let local_ex = LocalExecutor::new(); |
354 | /// ``` |
355 | pub const fn new() -> LocalExecutor<'a> { |
356 | LocalExecutor { |
357 | inner: Executor::new(), |
358 | _marker: PhantomData, |
359 | } |
360 | } |
361 | |
362 | /// Returns `true` if there are no unfinished tasks. |
363 | /// |
364 | /// # Examples |
365 | /// |
366 | /// ``` |
367 | /// use async_executor::LocalExecutor; |
368 | /// |
369 | /// let local_ex = LocalExecutor::new(); |
370 | /// assert!(local_ex.is_empty()); |
371 | /// |
372 | /// let task = local_ex.spawn(async { |
373 | /// println!("Hello world" ); |
374 | /// }); |
375 | /// assert!(!local_ex.is_empty()); |
376 | /// |
377 | /// assert!(local_ex.try_tick()); |
378 | /// assert!(local_ex.is_empty()); |
379 | /// ``` |
380 | pub fn is_empty(&self) -> bool { |
381 | self.inner().is_empty() |
382 | } |
383 | |
384 | /// Spawns a task onto the executor. |
385 | /// |
386 | /// # Examples |
387 | /// |
388 | /// ``` |
389 | /// use async_executor::LocalExecutor; |
390 | /// |
391 | /// let local_ex = LocalExecutor::new(); |
392 | /// |
393 | /// let task = local_ex.spawn(async { |
394 | /// println!("Hello world" ); |
395 | /// }); |
396 | /// ``` |
397 | pub fn spawn<T: 'a>(&self, future: impl Future<Output = T> + 'a) -> Task<T> { |
398 | let mut active = self.inner().state().active.lock().unwrap(); |
399 | |
400 | // Remove the task from the set of active tasks when the future finishes. |
401 | let index = active.vacant_entry().key(); |
402 | let state = self.inner().state().clone(); |
403 | let future = async move { |
404 | let _guard = CallOnDrop(move || drop(state.active.lock().unwrap().try_remove(index))); |
405 | future.await |
406 | }; |
407 | |
408 | // Create the task and register it in the set of active tasks. |
409 | let (runnable, task) = unsafe { |
410 | Builder::new() |
411 | .propagate_panic(true) |
412 | .spawn_unchecked(|()| future, self.schedule()) |
413 | }; |
414 | active.insert(runnable.waker()); |
415 | |
416 | runnable.schedule(); |
417 | task |
418 | } |
419 | |
420 | /// Attempts to run a task if at least one is scheduled. |
421 | /// |
422 | /// Running a scheduled task means simply polling its future once. |
423 | /// |
424 | /// # Examples |
425 | /// |
426 | /// ``` |
427 | /// use async_executor::LocalExecutor; |
428 | /// |
429 | /// let ex = LocalExecutor::new(); |
430 | /// assert!(!ex.try_tick()); // no tasks to run |
431 | /// |
432 | /// let task = ex.spawn(async { |
433 | /// println!("Hello world" ); |
434 | /// }); |
435 | /// assert!(ex.try_tick()); // a task was found |
436 | /// ``` |
437 | pub fn try_tick(&self) -> bool { |
438 | self.inner().try_tick() |
439 | } |
440 | |
441 | /// Runs a single task. |
442 | /// |
443 | /// Running a task means simply polling its future once. |
444 | /// |
445 | /// If no tasks are scheduled when this method is called, it will wait until one is scheduled. |
446 | /// |
447 | /// # Examples |
448 | /// |
449 | /// ``` |
450 | /// use async_executor::LocalExecutor; |
451 | /// use futures_lite::future; |
452 | /// |
453 | /// let ex = LocalExecutor::new(); |
454 | /// |
455 | /// let task = ex.spawn(async { |
456 | /// println!("Hello world" ); |
457 | /// }); |
458 | /// future::block_on(ex.tick()); // runs the task |
459 | /// ``` |
460 | pub async fn tick(&self) { |
461 | self.inner().tick().await |
462 | } |
463 | |
464 | /// Runs the executor until the given future completes. |
465 | /// |
466 | /// # Examples |
467 | /// |
468 | /// ``` |
469 | /// use async_executor::LocalExecutor; |
470 | /// use futures_lite::future; |
471 | /// |
472 | /// let local_ex = LocalExecutor::new(); |
473 | /// |
474 | /// let task = local_ex.spawn(async { 1 + 2 }); |
475 | /// let res = future::block_on(local_ex.run(async { task.await * 2 })); |
476 | /// |
477 | /// assert_eq!(res, 6); |
478 | /// ``` |
479 | pub async fn run<T>(&self, future: impl Future<Output = T>) -> T { |
480 | self.inner().run(future).await |
481 | } |
482 | |
483 | /// Returns a function that schedules a runnable task when it gets woken up. |
484 | fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static { |
485 | let state = self.inner().state().clone(); |
486 | |
487 | move |runnable| { |
488 | state.queue.push(runnable).unwrap(); |
489 | state.notify(); |
490 | } |
491 | } |
492 | |
493 | /// Returns a reference to the inner executor. |
494 | fn inner(&self) -> &Executor<'a> { |
495 | &self.inner |
496 | } |
497 | } |
498 | |
499 | impl<'a> Default for LocalExecutor<'a> { |
500 | fn default() -> LocalExecutor<'a> { |
501 | LocalExecutor::new() |
502 | } |
503 | } |
504 | |
505 | /// The state of a executor. |
506 | struct State { |
507 | /// The global queue. |
508 | queue: ConcurrentQueue<Runnable>, |
509 | |
510 | /// Local queues created by runners. |
511 | local_queues: RwLock<Vec<Arc<ConcurrentQueue<Runnable>>>>, |
512 | |
513 | /// Set to `true` when a sleeping ticker is notified or no tickers are sleeping. |
514 | notified: AtomicBool, |
515 | |
516 | /// A list of sleeping tickers. |
517 | sleepers: Mutex<Sleepers>, |
518 | |
519 | /// Currently active tasks. |
520 | active: Mutex<Slab<Waker>>, |
521 | } |
522 | |
523 | impl State { |
524 | /// Creates state for a new executor. |
525 | fn new() -> State { |
526 | State { |
527 | queue: ConcurrentQueue::unbounded(), |
528 | local_queues: RwLock::new(Vec::new()), |
529 | notified: AtomicBool::new(true), |
530 | sleepers: Mutex::new(Sleepers { |
531 | count: 0, |
532 | wakers: Vec::new(), |
533 | free_ids: Vec::new(), |
534 | }), |
535 | active: Mutex::new(Slab::new()), |
536 | } |
537 | } |
538 | |
539 | /// Notifies a sleeping ticker. |
540 | #[inline ] |
541 | fn notify(&self) { |
542 | if self |
543 | .notified |
544 | .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) |
545 | .is_ok() |
546 | { |
547 | let waker = self.sleepers.lock().unwrap().notify(); |
548 | if let Some(w) = waker { |
549 | w.wake(); |
550 | } |
551 | } |
552 | } |
553 | } |
554 | |
/// Bookkeeping for tickers that are currently asleep.
struct Sleepers {
    /// Total number of sleeping tickers, whether notified or not.
    count: usize,

    /// `(id, waker)` pairs of sleeping tickers that have not been notified yet.
    ///
    /// A sleeping ticker whose entry is missing from this list has been notified.
    wakers: Vec<(usize, Waker)>,

    /// IDs released by `remove`, handed out again by `insert`.
    free_ids: Vec<usize>,
}

impl Sleepers {
    /// Registers a new sleeping ticker and returns the ID assigned to it.
    ///
    /// A reclaimed ID is preferred; otherwise the ID is the current count plus one.
    fn insert(&mut self, waker: &Waker) -> usize {
        let id = self.free_ids.pop().unwrap_or_else(|| self.count + 1);
        self.count += 1;
        self.wakers.push((id, waker.clone()));
        id
    }

    /// Refreshes the waker of a sleeping ticker, re-inserting it if it was notified.
    ///
    /// Returns `true` if the ticker was notified (its entry was missing).
    fn update(&mut self, id: usize, waker: &Waker) -> bool {
        if let Some(entry) = self.wakers.iter_mut().find(|entry| entry.0 == id) {
            // Still unnotified: only replace the stored waker when it would wake a different task.
            if !entry.1.will_wake(waker) {
                entry.1 = waker.clone();
            }
            return false;
        }

        self.wakers.push((id, waker.clone()));
        true
    }

    /// Unregisters a previously inserted ticker and releases its ID for reuse.
    ///
    /// Returns `true` if the ticker was notified (its entry was missing).
    fn remove(&mut self, id: usize) -> bool {
        self.count -= 1;
        self.free_ids.push(id);

        // Search from the back, matching the order in which `notify` pops entries.
        let found = self.wakers.iter().rposition(|entry| entry.0 == id);
        match found {
            Some(pos) => {
                self.wakers.remove(pos);
                false
            }
            None => true,
        }
    }

    /// Returns `true` if a sleeping ticker is notified or no tickers are sleeping.
    fn is_notified(&self) -> bool {
        let sleeping = self.count;
        sleeping == 0 || self.wakers.len() < sleeping
    }

    /// Takes the waker of one sleeping ticker so that it can be woken.
    ///
    /// Returns `None` if some ticker is already notified or there are no tickers at all.
    fn notify(&mut self) -> Option<Waker> {
        if self.wakers.len() != self.count {
            return None;
        }
        let (_, waker) = self.wakers.pop()?;
        Some(waker)
    }
}
630 | |
631 | /// Runs task one by one. |
632 | struct Ticker<'a> { |
633 | /// The executor state. |
634 | state: &'a State, |
635 | |
636 | /// Set to a non-zero sleeper ID when in sleeping state. |
637 | /// |
638 | /// States a ticker can be in: |
639 | /// 1) Woken. |
640 | /// 2a) Sleeping and unnotified. |
641 | /// 2b) Sleeping and notified. |
642 | sleeping: AtomicUsize, |
643 | } |
644 | |
645 | impl Ticker<'_> { |
646 | /// Creates a ticker. |
647 | fn new(state: &State) -> Ticker<'_> { |
648 | Ticker { |
649 | state, |
650 | sleeping: AtomicUsize::new(0), |
651 | } |
652 | } |
653 | |
654 | /// Moves the ticker into sleeping and unnotified state. |
655 | /// |
656 | /// Returns `false` if the ticker was already sleeping and unnotified. |
657 | fn sleep(&self, waker: &Waker) -> bool { |
658 | let mut sleepers = self.state.sleepers.lock().unwrap(); |
659 | |
660 | match self.sleeping.load(Ordering::SeqCst) { |
661 | // Move to sleeping state. |
662 | 0 => self |
663 | .sleeping |
664 | .store(sleepers.insert(waker), Ordering::SeqCst), |
665 | |
666 | // Already sleeping, check if notified. |
667 | id => { |
668 | if !sleepers.update(id, waker) { |
669 | return false; |
670 | } |
671 | } |
672 | } |
673 | |
674 | self.state |
675 | .notified |
676 | .swap(sleepers.is_notified(), Ordering::SeqCst); |
677 | |
678 | true |
679 | } |
680 | |
681 | /// Moves the ticker into woken state. |
682 | fn wake(&self) { |
683 | let id = self.sleeping.swap(0, Ordering::SeqCst); |
684 | if id != 0 { |
685 | let mut sleepers = self.state.sleepers.lock().unwrap(); |
686 | sleepers.remove(id); |
687 | |
688 | self.state |
689 | .notified |
690 | .swap(sleepers.is_notified(), Ordering::SeqCst); |
691 | } |
692 | } |
693 | |
694 | /// Waits for the next runnable task to run. |
695 | async fn runnable(&self) -> Runnable { |
696 | self.runnable_with(|| self.state.queue.pop().ok()).await |
697 | } |
698 | |
699 | /// Waits for the next runnable task to run, given a function that searches for a task. |
700 | async fn runnable_with(&self, mut search: impl FnMut() -> Option<Runnable>) -> Runnable { |
701 | future::poll_fn(|cx| { |
702 | loop { |
703 | match search() { |
704 | None => { |
705 | // Move to sleeping and unnotified state. |
706 | if !self.sleep(cx.waker()) { |
707 | // If already sleeping and unnotified, return. |
708 | return Poll::Pending; |
709 | } |
710 | } |
711 | Some(r) => { |
712 | // Wake up. |
713 | self.wake(); |
714 | |
715 | // Notify another ticker now to pick up where this ticker left off, just in |
716 | // case running the task takes a long time. |
717 | self.state.notify(); |
718 | |
719 | return Poll::Ready(r); |
720 | } |
721 | } |
722 | } |
723 | }) |
724 | .await |
725 | } |
726 | } |
727 | |
728 | impl Drop for Ticker<'_> { |
729 | fn drop(&mut self) { |
730 | // If this ticker is in sleeping state, it must be removed from the sleepers list. |
731 | let id: usize = self.sleeping.swap(val:0, order:Ordering::SeqCst); |
732 | if id != 0 { |
733 | let mut sleepers: MutexGuard<'_, Sleepers> = self.state.sleepers.lock().unwrap(); |
734 | let notified: bool = sleepers.remove(id); |
735 | |
736 | self.state |
737 | .notified |
738 | .swap(val:sleepers.is_notified(), order:Ordering::SeqCst); |
739 | |
740 | // If this ticker was notified, then notify another ticker. |
741 | if notified { |
742 | drop(sleepers); |
743 | self.state.notify(); |
744 | } |
745 | } |
746 | } |
747 | } |
748 | |
749 | /// A worker in a work-stealing executor. |
750 | /// |
751 | /// This is just a ticker that also has an associated local queue for improved cache locality. |
752 | struct Runner<'a> { |
753 | /// The executor state. |
754 | state: &'a State, |
755 | |
756 | /// Inner ticker. |
757 | ticker: Ticker<'a>, |
758 | |
759 | /// The local queue. |
760 | local: Arc<ConcurrentQueue<Runnable>>, |
761 | |
762 | /// Bumped every time a runnable task is found. |
763 | ticks: AtomicUsize, |
764 | } |
765 | |
766 | impl Runner<'_> { |
767 | /// Creates a runner and registers it in the executor state. |
768 | fn new(state: &State) -> Runner<'_> { |
769 | let runner = Runner { |
770 | state, |
771 | ticker: Ticker::new(state), |
772 | local: Arc::new(ConcurrentQueue::bounded(512)), |
773 | ticks: AtomicUsize::new(0), |
774 | }; |
775 | state |
776 | .local_queues |
777 | .write() |
778 | .unwrap() |
779 | .push(runner.local.clone()); |
780 | runner |
781 | } |
782 | |
783 | /// Waits for the next runnable task to run. |
784 | async fn runnable(&self, rng: &mut fastrand::Rng) -> Runnable { |
785 | let runnable = self |
786 | .ticker |
787 | .runnable_with(|| { |
788 | // Try the local queue. |
789 | if let Ok(r) = self.local.pop() { |
790 | return Some(r); |
791 | } |
792 | |
793 | // Try stealing from the global queue. |
794 | if let Ok(r) = self.state.queue.pop() { |
795 | steal(&self.state.queue, &self.local); |
796 | return Some(r); |
797 | } |
798 | |
799 | // Try stealing from other runners. |
800 | let local_queues = self.state.local_queues.read().unwrap(); |
801 | |
802 | // Pick a random starting point in the iterator list and rotate the list. |
803 | let n = local_queues.len(); |
804 | let start = rng.usize(..n); |
805 | let iter = local_queues |
806 | .iter() |
807 | .chain(local_queues.iter()) |
808 | .skip(start) |
809 | .take(n); |
810 | |
811 | // Remove this runner's local queue. |
812 | let iter = iter.filter(|local| !Arc::ptr_eq(local, &self.local)); |
813 | |
814 | // Try stealing from each local queue in the list. |
815 | for local in iter { |
816 | steal(local, &self.local); |
817 | if let Ok(r) = self.local.pop() { |
818 | return Some(r); |
819 | } |
820 | } |
821 | |
822 | None |
823 | }) |
824 | .await; |
825 | |
826 | // Bump the tick counter. |
827 | let ticks = self.ticks.fetch_add(1, Ordering::SeqCst); |
828 | |
829 | if ticks % 64 == 0 { |
830 | // Steal tasks from the global queue to ensure fair task scheduling. |
831 | steal(&self.state.queue, &self.local); |
832 | } |
833 | |
834 | runnable |
835 | } |
836 | } |
837 | |
838 | impl Drop for Runner<'_> { |
839 | fn drop(&mut self) { |
840 | // Remove the local queue. |
841 | self.state |
842 | .local_queues |
843 | .write() |
844 | .unwrap() |
845 | .retain(|local: &Arc>| !Arc::ptr_eq(this:local, &self.local)); |
846 | |
847 | // Re-schedule remaining tasks in the local queue. |
848 | while let Ok(r: Runnable) = self.local.pop() { |
849 | r.schedule(); |
850 | } |
851 | } |
852 | } |
853 | |
854 | /// Steals some items from one queue into another. |
855 | fn steal<T>(src: &ConcurrentQueue<T>, dest: &ConcurrentQueue<T>) { |
856 | // Half of `src`'s length rounded up. |
857 | let mut count: usize = (src.len() + 1) / 2; |
858 | |
859 | if count > 0 { |
860 | // Don't steal more than fits into the queue. |
861 | if let Some(cap: usize) = dest.capacity() { |
862 | count = count.min(cap - dest.len()); |
863 | } |
864 | |
865 | // Steal tasks. |
866 | for _ in 0..count { |
867 | if let Ok(t: T) = src.pop() { |
868 | assert!(dest.push(t).is_ok()); |
869 | } else { |
870 | break; |
871 | } |
872 | } |
873 | } |
874 | } |
875 | |
876 | /// Debug implementation for `Executor` and `LocalExecutor`. |
877 | fn debug_executor(executor: &Executor<'_>, name: &str, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
878 | // Get a reference to the state. |
879 | let state = match executor.state.get() { |
880 | Some(state) => state, |
881 | None => { |
882 | // The executor has not been initialized. |
883 | struct Uninitialized; |
884 | |
885 | impl fmt::Debug for Uninitialized { |
886 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
887 | f.write_str("<uninitialized>" ) |
888 | } |
889 | } |
890 | |
891 | return f.debug_tuple(name).field(&Uninitialized).finish(); |
892 | } |
893 | }; |
894 | |
895 | /// Debug wrapper for the number of active tasks. |
896 | struct ActiveTasks<'a>(&'a Mutex<Slab<Waker>>); |
897 | |
898 | impl fmt::Debug for ActiveTasks<'_> { |
899 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
900 | match self.0.try_lock() { |
901 | Ok(lock) => fmt::Debug::fmt(&lock.len(), f), |
902 | Err(TryLockError::WouldBlock) => f.write_str("<locked>" ), |
903 | Err(TryLockError::Poisoned(_)) => f.write_str("<poisoned>" ), |
904 | } |
905 | } |
906 | } |
907 | |
908 | /// Debug wrapper for the local runners. |
909 | struct LocalRunners<'a>(&'a RwLock<Vec<Arc<ConcurrentQueue<Runnable>>>>); |
910 | |
911 | impl fmt::Debug for LocalRunners<'_> { |
912 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
913 | match self.0.try_read() { |
914 | Ok(lock) => f |
915 | .debug_list() |
916 | .entries(lock.iter().map(|queue| queue.len())) |
917 | .finish(), |
918 | Err(TryLockError::WouldBlock) => f.write_str("<locked>" ), |
919 | Err(TryLockError::Poisoned(_)) => f.write_str("<poisoned>" ), |
920 | } |
921 | } |
922 | } |
923 | |
924 | /// Debug wrapper for the sleepers. |
925 | struct SleepCount<'a>(&'a Mutex<Sleepers>); |
926 | |
927 | impl fmt::Debug for SleepCount<'_> { |
928 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
929 | match self.0.try_lock() { |
930 | Ok(lock) => fmt::Debug::fmt(&lock.count, f), |
931 | Err(TryLockError::WouldBlock) => f.write_str("<locked>" ), |
932 | Err(TryLockError::Poisoned(_)) => f.write_str("<poisoned>" ), |
933 | } |
934 | } |
935 | } |
936 | |
937 | f.debug_struct(name) |
938 | .field("active" , &ActiveTasks(&state.active)) |
939 | .field("global_tasks" , &state.queue.len()) |
940 | .field("local_runners" , &LocalRunners(&state.local_queues)) |
941 | .field("sleepers" , &SleepCount(&state.sleepers)) |
942 | .finish() |
943 | } |
944 | |
/// Guard that invokes the wrapped closure once, at the moment the guard is dropped.
struct CallOnDrop<F: FnMut()>(F);

impl<F: FnMut()> Drop for CallOnDrop<F> {
    fn drop(&mut self) {
        let callback = &mut self.0;
        callback();
    }
}
953 | |
954 | fn _ensure_send_and_sync() { |
955 | use futures_lite::future::pending; |
956 | |
957 | fn is_send<T: Send>(_: T) {} |
958 | fn is_sync<T: Sync>(_: T) {} |
959 | |
960 | is_send::<Executor<'_>>(Executor::new()); |
961 | is_sync::<Executor<'_>>(Executor::new()); |
962 | |
963 | let ex = Executor::new(); |
964 | is_send(ex.run(pending::<()>())); |
965 | is_sync(ex.run(pending::<()>())); |
966 | is_send(ex.tick()); |
967 | is_sync(ex.tick()); |
968 | |
969 | /// ```compile_fail |
970 | /// use async_executor::LocalExecutor; |
971 | /// use futures_lite::future::pending; |
972 | /// |
973 | /// fn is_send<T: Send>(_: T) {} |
974 | /// fn is_sync<T: Sync>(_: T) {} |
975 | /// |
976 | /// is_send::<LocalExecutor<'_>>(LocalExecutor::new()); |
977 | /// is_sync::<LocalExecutor<'_>>(LocalExecutor::new()); |
978 | /// |
979 | /// let ex = LocalExecutor::new(); |
980 | /// is_send(ex.run(pending::<()>())); |
981 | /// is_sync(ex.run(pending::<()>())); |
982 | /// is_send(ex.tick()); |
983 | /// is_sync(ex.tick()); |
984 | /// ``` |
985 | fn _negative_test() {} |
986 | } |
987 | |