| 1 | //! Async executors. |
| 2 | //! |
| 3 | //! This crate provides two reference executors that trade performance for |
| 4 | //! functionality. They should be considered reference executors that are "good |
| 5 | //! enough" for most use cases. For more specialized use cases, consider writing |
| 6 | //! your own executor on top of [`async-task`]. |
| 7 | //! |
| 8 | //! [`async-task`]: https://crates.io/crates/async-task |
| 9 | //! |
| 10 | //! # Examples |
| 11 | //! |
| 12 | //! ``` |
| 13 | //! use async_executor::Executor; |
| 14 | //! use futures_lite::future; |
| 15 | //! |
| 16 | //! // Create a new executor. |
| 17 | //! let ex = Executor::new(); |
| 18 | //! |
| 19 | //! // Spawn a task. |
| 20 | //! let task = ex.spawn(async { |
| 21 | //! println!("Hello world" ); |
| 22 | //! }); |
| 23 | //! |
| 24 | //! // Run the executor until the task completes. |
| 25 | //! future::block_on(ex.run(task)); |
| 26 | //! ``` |
| 27 | |
| 28 | #![warn ( |
| 29 | missing_docs, |
| 30 | missing_debug_implementations, |
| 31 | rust_2018_idioms, |
| 32 | clippy::undocumented_unsafe_blocks |
| 33 | )] |
| 34 | #![doc ( |
| 35 | html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png" |
| 36 | )] |
| 37 | #![doc ( |
| 38 | html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png" |
| 39 | )] |
| 40 | #![cfg_attr (docsrs, feature(doc_cfg, doc_auto_cfg))] |
| 41 | |
| 42 | use std::fmt; |
| 43 | use std::marker::PhantomData; |
| 44 | use std::panic::{RefUnwindSafe, UnwindSafe}; |
| 45 | use std::rc::Rc; |
| 46 | use std::sync::atomic::{AtomicBool, AtomicPtr, Ordering}; |
| 47 | use std::sync::{Arc, Mutex, RwLock, TryLockError}; |
| 48 | use std::task::{Poll, Waker}; |
| 49 | |
| 50 | use async_task::{Builder, Runnable}; |
| 51 | use concurrent_queue::ConcurrentQueue; |
| 52 | use futures_lite::{future, prelude::*}; |
| 53 | use slab::Slab; |
| 54 | |
| 55 | #[cfg (feature = "static" )] |
| 56 | mod static_executors; |
| 57 | |
| 58 | #[doc (no_inline)] |
| 59 | pub use async_task::{FallibleTask, Task}; |
| 60 | #[cfg (feature = "static" )] |
| 61 | #[cfg_attr (docsrs, doc(cfg(any(feature = "static" ))))] |
| 62 | pub use static_executors::*; |
| 63 | |
| 64 | /// An async executor. |
| 65 | /// |
| 66 | /// # Examples |
| 67 | /// |
| 68 | /// A multi-threaded executor: |
| 69 | /// |
| 70 | /// ``` |
| 71 | /// use async_channel::unbounded; |
| 72 | /// use async_executor::Executor; |
| 73 | /// use easy_parallel::Parallel; |
| 74 | /// use futures_lite::future; |
| 75 | /// |
| 76 | /// let ex = Executor::new(); |
| 77 | /// let (signal, shutdown) = unbounded::<()>(); |
| 78 | /// |
| 79 | /// Parallel::new() |
| 80 | /// // Run four executor threads. |
| 81 | /// .each(0..4, |_| future::block_on(ex.run(shutdown.recv()))) |
| 82 | /// // Run the main future on the current thread. |
| 83 | /// .finish(|| future::block_on(async { |
| 84 | /// println!("Hello world!" ); |
| 85 | /// drop(signal); |
| 86 | /// })); |
| 87 | /// ``` |
| 88 | pub struct Executor<'a> { |
| 89 | /// The executor state. |
| 90 | state: AtomicPtr<State>, |
| 91 | |
| 92 | /// Makes the `'a` lifetime invariant. |
| 93 | _marker: PhantomData<std::cell::UnsafeCell<&'a ()>>, |
| 94 | } |
| 95 | |
| 96 | // SAFETY: Executor stores no thread local state that can be accessed via other thread. |
| 97 | unsafe impl Send for Executor<'_> {} |
| 98 | // SAFETY: Executor internally synchronizes all of it's operations internally. |
| 99 | unsafe impl Sync for Executor<'_> {} |
| 100 | |
| 101 | impl UnwindSafe for Executor<'_> {} |
| 102 | impl RefUnwindSafe for Executor<'_> {} |
| 103 | |
| 104 | impl fmt::Debug for Executor<'_> { |
| 105 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 106 | debug_executor(self, name:"Executor" , f) |
| 107 | } |
| 108 | } |
| 109 | |
| 110 | impl<'a> Executor<'a> { |
| 111 | /// Creates a new executor. |
| 112 | /// |
| 113 | /// # Examples |
| 114 | /// |
| 115 | /// ``` |
| 116 | /// use async_executor::Executor; |
| 117 | /// |
| 118 | /// let ex = Executor::new(); |
| 119 | /// ``` |
| 120 | pub const fn new() -> Executor<'a> { |
| 121 | Executor { |
| 122 | state: AtomicPtr::new(std::ptr::null_mut()), |
| 123 | _marker: PhantomData, |
| 124 | } |
| 125 | } |
| 126 | |
| 127 | /// Returns `true` if there are no unfinished tasks. |
| 128 | /// |
| 129 | /// # Examples |
| 130 | /// |
| 131 | /// ``` |
| 132 | /// use async_executor::Executor; |
| 133 | /// |
| 134 | /// let ex = Executor::new(); |
| 135 | /// assert!(ex.is_empty()); |
| 136 | /// |
| 137 | /// let task = ex.spawn(async { |
| 138 | /// println!("Hello world" ); |
| 139 | /// }); |
| 140 | /// assert!(!ex.is_empty()); |
| 141 | /// |
| 142 | /// assert!(ex.try_tick()); |
| 143 | /// assert!(ex.is_empty()); |
| 144 | /// ``` |
| 145 | pub fn is_empty(&self) -> bool { |
| 146 | self.state().active.lock().unwrap().is_empty() |
| 147 | } |
| 148 | |
| 149 | /// Spawns a task onto the executor. |
| 150 | /// |
| 151 | /// # Examples |
| 152 | /// |
| 153 | /// ``` |
| 154 | /// use async_executor::Executor; |
| 155 | /// |
| 156 | /// let ex = Executor::new(); |
| 157 | /// |
| 158 | /// let task = ex.spawn(async { |
| 159 | /// println!("Hello world" ); |
| 160 | /// }); |
| 161 | /// ``` |
| 162 | pub fn spawn<T: Send + 'a>(&self, future: impl Future<Output = T> + Send + 'a) -> Task<T> { |
| 163 | let mut active = self.state().active.lock().unwrap(); |
| 164 | |
| 165 | // SAFETY: `T` and the future are `Send`. |
| 166 | unsafe { self.spawn_inner(future, &mut active) } |
| 167 | } |
| 168 | |
| 169 | /// Spawns many tasks onto the executor. |
| 170 | /// |
| 171 | /// As opposed to the [`spawn`] method, this locks the executor's inner task lock once and |
| 172 | /// spawns all of the tasks in one go. With large amounts of tasks this can improve |
| 173 | /// contention. |
| 174 | /// |
| 175 | /// For very large numbers of tasks the lock is occasionally dropped and re-acquired to |
| 176 | /// prevent runner thread starvation. It is assumed that the iterator provided does not |
| 177 | /// block; blocking iterators can lock up the internal mutex and therefore the entire |
| 178 | /// executor. |
| 179 | /// |
| 180 | /// ## Example |
| 181 | /// |
| 182 | /// ``` |
| 183 | /// use async_executor::Executor; |
| 184 | /// use futures_lite::{stream, prelude::*}; |
| 185 | /// use std::future::ready; |
| 186 | /// |
| 187 | /// # futures_lite::future::block_on(async { |
| 188 | /// let mut ex = Executor::new(); |
| 189 | /// |
| 190 | /// let futures = [ |
| 191 | /// ready(1), |
| 192 | /// ready(2), |
| 193 | /// ready(3) |
| 194 | /// ]; |
| 195 | /// |
| 196 | /// // Spawn all of the futures onto the executor at once. |
| 197 | /// let mut tasks = vec![]; |
| 198 | /// ex.spawn_many(futures, &mut tasks); |
| 199 | /// |
| 200 | /// // Await all of them. |
| 201 | /// let results = ex.run(async move { |
| 202 | /// stream::iter(tasks).then(|x| x).collect::<Vec<_>>().await |
| 203 | /// }).await; |
| 204 | /// assert_eq!(results, [1, 2, 3]); |
| 205 | /// # }); |
| 206 | /// ``` |
| 207 | /// |
| 208 | /// [`spawn`]: Executor::spawn |
| 209 | pub fn spawn_many<T: Send + 'a, F: Future<Output = T> + Send + 'a>( |
| 210 | &self, |
| 211 | futures: impl IntoIterator<Item = F>, |
| 212 | handles: &mut impl Extend<Task<F::Output>>, |
| 213 | ) { |
| 214 | let mut active = Some(self.state().active.lock().unwrap()); |
| 215 | |
| 216 | // Convert the futures into tasks. |
| 217 | let tasks = futures.into_iter().enumerate().map(move |(i, future)| { |
| 218 | // SAFETY: `T` and the future are `Send`. |
| 219 | let task = unsafe { self.spawn_inner(future, active.as_mut().unwrap()) }; |
| 220 | |
| 221 | // Yield the lock every once in a while to ease contention. |
| 222 | if i.wrapping_sub(1) % 500 == 0 { |
| 223 | drop(active.take()); |
| 224 | active = Some(self.state().active.lock().unwrap()); |
| 225 | } |
| 226 | |
| 227 | task |
| 228 | }); |
| 229 | |
| 230 | // Push the tasks to the user's collection. |
| 231 | handles.extend(tasks); |
| 232 | } |
| 233 | |
| 234 | /// Spawn a future while holding the inner lock. |
| 235 | /// |
| 236 | /// # Safety |
| 237 | /// |
| 238 | /// If this is an `Executor`, `F` and `T` must be `Send`. |
| 239 | unsafe fn spawn_inner<T: 'a>( |
| 240 | &self, |
| 241 | future: impl Future<Output = T> + 'a, |
| 242 | active: &mut Slab<Waker>, |
| 243 | ) -> Task<T> { |
| 244 | // Remove the task from the set of active tasks when the future finishes. |
| 245 | let entry = active.vacant_entry(); |
| 246 | let index = entry.key(); |
| 247 | let state = self.state_as_arc(); |
| 248 | let future = async move { |
| 249 | let _guard = CallOnDrop(move || drop(state.active.lock().unwrap().try_remove(index))); |
| 250 | future.await |
| 251 | }; |
| 252 | |
| 253 | // Create the task and register it in the set of active tasks. |
| 254 | // |
| 255 | // SAFETY: |
| 256 | // |
| 257 | // If `future` is not `Send`, this must be a `LocalExecutor` as per this |
| 258 | // function's unsafe precondition. Since `LocalExecutor` is `!Sync`, |
| 259 | // `try_tick`, `tick` and `run` can only be called from the origin |
| 260 | // thread of the `LocalExecutor`. Similarly, `spawn` can only be called |
| 261 | // from the origin thread, ensuring that `future` and the executor share |
| 262 | // the same origin thread. The `Runnable` can be scheduled from other |
| 263 | // threads, but because of the above `Runnable` can only be called or |
| 264 | // dropped on the origin thread. |
| 265 | // |
| 266 | // `future` is not `'static`, but we make sure that the `Runnable` does |
| 267 | // not outlive `'a`. When the executor is dropped, the `active` field is |
| 268 | // drained and all of the `Waker`s are woken. Then, the queue inside of |
| 269 | // the `Executor` is drained of all of its runnables. This ensures that |
| 270 | // runnables are dropped and this precondition is satisfied. |
| 271 | // |
| 272 | // `self.schedule()` is `Send`, `Sync` and `'static`, as checked below. |
| 273 | // Therefore we do not need to worry about what is done with the |
| 274 | // `Waker`. |
| 275 | let (runnable, task) = Builder::new() |
| 276 | .propagate_panic(true) |
| 277 | .spawn_unchecked(|()| future, self.schedule()); |
| 278 | entry.insert(runnable.waker()); |
| 279 | |
| 280 | runnable.schedule(); |
| 281 | task |
| 282 | } |
| 283 | |
| 284 | /// Attempts to run a task if at least one is scheduled. |
| 285 | /// |
| 286 | /// Running a scheduled task means simply polling its future once. |
| 287 | /// |
| 288 | /// # Examples |
| 289 | /// |
| 290 | /// ``` |
| 291 | /// use async_executor::Executor; |
| 292 | /// |
| 293 | /// let ex = Executor::new(); |
| 294 | /// assert!(!ex.try_tick()); // no tasks to run |
| 295 | /// |
| 296 | /// let task = ex.spawn(async { |
| 297 | /// println!("Hello world" ); |
| 298 | /// }); |
| 299 | /// assert!(ex.try_tick()); // a task was found |
| 300 | /// ``` |
| 301 | pub fn try_tick(&self) -> bool { |
| 302 | self.state().try_tick() |
| 303 | } |
| 304 | |
| 305 | /// Runs a single task. |
| 306 | /// |
| 307 | /// Running a task means simply polling its future once. |
| 308 | /// |
| 309 | /// If no tasks are scheduled when this method is called, it will wait until one is scheduled. |
| 310 | /// |
| 311 | /// # Examples |
| 312 | /// |
| 313 | /// ``` |
| 314 | /// use async_executor::Executor; |
| 315 | /// use futures_lite::future; |
| 316 | /// |
| 317 | /// let ex = Executor::new(); |
| 318 | /// |
| 319 | /// let task = ex.spawn(async { |
| 320 | /// println!("Hello world" ); |
| 321 | /// }); |
| 322 | /// future::block_on(ex.tick()); // runs the task |
| 323 | /// ``` |
| 324 | pub async fn tick(&self) { |
| 325 | self.state().tick().await; |
| 326 | } |
| 327 | |
| 328 | /// Runs the executor until the given future completes. |
| 329 | /// |
| 330 | /// # Examples |
| 331 | /// |
| 332 | /// ``` |
| 333 | /// use async_executor::Executor; |
| 334 | /// use futures_lite::future; |
| 335 | /// |
| 336 | /// let ex = Executor::new(); |
| 337 | /// |
| 338 | /// let task = ex.spawn(async { 1 + 2 }); |
| 339 | /// let res = future::block_on(ex.run(async { task.await * 2 })); |
| 340 | /// |
| 341 | /// assert_eq!(res, 6); |
| 342 | /// ``` |
| 343 | pub async fn run<T>(&self, future: impl Future<Output = T>) -> T { |
| 344 | self.state().run(future).await |
| 345 | } |
| 346 | |
| 347 | /// Returns a function that schedules a runnable task when it gets woken up. |
| 348 | fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static { |
| 349 | let state = self.state_as_arc(); |
| 350 | |
| 351 | // TODO: If possible, push into the current local queue and notify the ticker. |
| 352 | move |runnable| { |
| 353 | state.queue.push(runnable).unwrap(); |
| 354 | state.notify(); |
| 355 | } |
| 356 | } |
| 357 | |
| 358 | /// Returns a pointer to the inner state. |
| 359 | #[inline ] |
| 360 | fn state_ptr(&self) -> *const State { |
| 361 | #[cold ] |
| 362 | fn alloc_state(atomic_ptr: &AtomicPtr<State>) -> *mut State { |
| 363 | let state = Arc::new(State::new()); |
| 364 | // TODO: Switch this to use cast_mut once the MSRV can be bumped past 1.65 |
| 365 | let ptr = Arc::into_raw(state) as *mut State; |
| 366 | if let Err(actual) = atomic_ptr.compare_exchange( |
| 367 | std::ptr::null_mut(), |
| 368 | ptr, |
| 369 | Ordering::AcqRel, |
| 370 | Ordering::Acquire, |
| 371 | ) { |
| 372 | // SAFETY: This was just created from Arc::into_raw. |
| 373 | drop(unsafe { Arc::from_raw(ptr) }); |
| 374 | actual |
| 375 | } else { |
| 376 | ptr |
| 377 | } |
| 378 | } |
| 379 | |
| 380 | let mut ptr = self.state.load(Ordering::Acquire); |
| 381 | if ptr.is_null() { |
| 382 | ptr = alloc_state(&self.state); |
| 383 | } |
| 384 | ptr |
| 385 | } |
| 386 | |
| 387 | /// Returns a reference to the inner state. |
| 388 | #[inline ] |
| 389 | fn state(&self) -> &State { |
| 390 | // SAFETY: So long as an Executor lives, it's state pointer will always be valid |
| 391 | // when accessed through state_ptr. |
| 392 | unsafe { &*self.state_ptr() } |
| 393 | } |
| 394 | |
| 395 | // Clones the inner state Arc |
| 396 | #[inline ] |
| 397 | fn state_as_arc(&self) -> Arc<State> { |
| 398 | // SAFETY: So long as an Executor lives, it's state pointer will always be a valid |
| 399 | // Arc when accessed through state_ptr. |
| 400 | let arc = unsafe { Arc::from_raw(self.state_ptr()) }; |
| 401 | let clone = arc.clone(); |
| 402 | std::mem::forget(arc); |
| 403 | clone |
| 404 | } |
| 405 | } |
| 406 | |
| 407 | impl Drop for Executor<'_> { |
| 408 | fn drop(&mut self) { |
| 409 | let ptr: *mut State = *self.state.get_mut(); |
| 410 | if ptr.is_null() { |
| 411 | return; |
| 412 | } |
| 413 | |
| 414 | // SAFETY: As ptr is not null, it was allocated via Arc::new and converted |
| 415 | // via Arc::into_raw in state_ptr. |
| 416 | let state: Arc = unsafe { Arc::from_raw(ptr) }; |
| 417 | |
| 418 | let mut active: MutexGuard<'_, Slab> = state.active.lock().unwrap_or_else(|e: PoisonError>| e.into_inner()); |
| 419 | for w: Waker in active.drain() { |
| 420 | w.wake(); |
| 421 | } |
| 422 | drop(active); |
| 423 | |
| 424 | while state.queue.pop().is_ok() {} |
| 425 | } |
| 426 | } |
| 427 | |
| 428 | impl<'a> Default for Executor<'a> { |
| 429 | fn default() -> Executor<'a> { |
| 430 | Executor::new() |
| 431 | } |
| 432 | } |
| 433 | |
| 434 | /// A thread-local executor. |
| 435 | /// |
| 436 | /// The executor can only be run on the thread that created it. |
| 437 | /// |
| 438 | /// # Examples |
| 439 | /// |
| 440 | /// ``` |
| 441 | /// use async_executor::LocalExecutor; |
| 442 | /// use futures_lite::future; |
| 443 | /// |
| 444 | /// let local_ex = LocalExecutor::new(); |
| 445 | /// |
| 446 | /// future::block_on(local_ex.run(async { |
| 447 | /// println!("Hello world!" ); |
| 448 | /// })); |
| 449 | /// ``` |
| 450 | pub struct LocalExecutor<'a> { |
| 451 | /// The inner executor. |
| 452 | inner: Executor<'a>, |
| 453 | |
| 454 | /// Makes the type `!Send` and `!Sync`. |
| 455 | _marker: PhantomData<Rc<()>>, |
| 456 | } |
| 457 | |
| 458 | impl UnwindSafe for LocalExecutor<'_> {} |
| 459 | impl RefUnwindSafe for LocalExecutor<'_> {} |
| 460 | |
| 461 | impl fmt::Debug for LocalExecutor<'_> { |
| 462 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 463 | debug_executor(&self.inner, name:"LocalExecutor" , f) |
| 464 | } |
| 465 | } |
| 466 | |
| 467 | impl<'a> LocalExecutor<'a> { |
| 468 | /// Creates a single-threaded executor. |
| 469 | /// |
| 470 | /// # Examples |
| 471 | /// |
| 472 | /// ``` |
| 473 | /// use async_executor::LocalExecutor; |
| 474 | /// |
| 475 | /// let local_ex = LocalExecutor::new(); |
| 476 | /// ``` |
| 477 | pub const fn new() -> LocalExecutor<'a> { |
| 478 | LocalExecutor { |
| 479 | inner: Executor::new(), |
| 480 | _marker: PhantomData, |
| 481 | } |
| 482 | } |
| 483 | |
| 484 | /// Returns `true` if there are no unfinished tasks. |
| 485 | /// |
| 486 | /// # Examples |
| 487 | /// |
| 488 | /// ``` |
| 489 | /// use async_executor::LocalExecutor; |
| 490 | /// |
| 491 | /// let local_ex = LocalExecutor::new(); |
| 492 | /// assert!(local_ex.is_empty()); |
| 493 | /// |
| 494 | /// let task = local_ex.spawn(async { |
| 495 | /// println!("Hello world" ); |
| 496 | /// }); |
| 497 | /// assert!(!local_ex.is_empty()); |
| 498 | /// |
| 499 | /// assert!(local_ex.try_tick()); |
| 500 | /// assert!(local_ex.is_empty()); |
| 501 | /// ``` |
| 502 | pub fn is_empty(&self) -> bool { |
| 503 | self.inner().is_empty() |
| 504 | } |
| 505 | |
| 506 | /// Spawns a task onto the executor. |
| 507 | /// |
| 508 | /// # Examples |
| 509 | /// |
| 510 | /// ``` |
| 511 | /// use async_executor::LocalExecutor; |
| 512 | /// |
| 513 | /// let local_ex = LocalExecutor::new(); |
| 514 | /// |
| 515 | /// let task = local_ex.spawn(async { |
| 516 | /// println!("Hello world" ); |
| 517 | /// }); |
| 518 | /// ``` |
| 519 | pub fn spawn<T: 'a>(&self, future: impl Future<Output = T> + 'a) -> Task<T> { |
| 520 | let mut active = self.inner().state().active.lock().unwrap(); |
| 521 | |
| 522 | // SAFETY: This executor is not thread safe, so the future and its result |
| 523 | // cannot be sent to another thread. |
| 524 | unsafe { self.inner().spawn_inner(future, &mut active) } |
| 525 | } |
| 526 | |
| 527 | /// Spawns many tasks onto the executor. |
| 528 | /// |
| 529 | /// As opposed to the [`spawn`] method, this locks the executor's inner task lock once and |
| 530 | /// spawns all of the tasks in one go. With large amounts of tasks this can improve |
| 531 | /// contention. |
| 532 | /// |
| 533 | /// It is assumed that the iterator provided does not block; blocking iterators can lock up |
| 534 | /// the internal mutex and therefore the entire executor. Unlike [`Executor::spawn`], the |
| 535 | /// mutex is not released, as there are no other threads that can poll this executor. |
| 536 | /// |
| 537 | /// ## Example |
| 538 | /// |
| 539 | /// ``` |
| 540 | /// use async_executor::LocalExecutor; |
| 541 | /// use futures_lite::{stream, prelude::*}; |
| 542 | /// use std::future::ready; |
| 543 | /// |
| 544 | /// # futures_lite::future::block_on(async { |
| 545 | /// let mut ex = LocalExecutor::new(); |
| 546 | /// |
| 547 | /// let futures = [ |
| 548 | /// ready(1), |
| 549 | /// ready(2), |
| 550 | /// ready(3) |
| 551 | /// ]; |
| 552 | /// |
| 553 | /// // Spawn all of the futures onto the executor at once. |
| 554 | /// let mut tasks = vec![]; |
| 555 | /// ex.spawn_many(futures, &mut tasks); |
| 556 | /// |
| 557 | /// // Await all of them. |
| 558 | /// let results = ex.run(async move { |
| 559 | /// stream::iter(tasks).then(|x| x).collect::<Vec<_>>().await |
| 560 | /// }).await; |
| 561 | /// assert_eq!(results, [1, 2, 3]); |
| 562 | /// # }); |
| 563 | /// ``` |
| 564 | /// |
| 565 | /// [`spawn`]: LocalExecutor::spawn |
| 566 | /// [`Executor::spawn_many`]: Executor::spawn_many |
| 567 | pub fn spawn_many<T: 'a, F: Future<Output = T> + 'a>( |
| 568 | &self, |
| 569 | futures: impl IntoIterator<Item = F>, |
| 570 | handles: &mut impl Extend<Task<F::Output>>, |
| 571 | ) { |
| 572 | let mut active = self.inner().state().active.lock().unwrap(); |
| 573 | |
| 574 | // Convert all of the futures to tasks. |
| 575 | let tasks = futures.into_iter().map(|future| { |
| 576 | // SAFETY: This executor is not thread safe, so the future and its result |
| 577 | // cannot be sent to another thread. |
| 578 | unsafe { self.inner().spawn_inner(future, &mut active) } |
| 579 | |
| 580 | // As only one thread can spawn or poll tasks at a time, there is no need |
| 581 | // to release lock contention here. |
| 582 | }); |
| 583 | |
| 584 | // Push them to the user's collection. |
| 585 | handles.extend(tasks); |
| 586 | } |
| 587 | |
| 588 | /// Attempts to run a task if at least one is scheduled. |
| 589 | /// |
| 590 | /// Running a scheduled task means simply polling its future once. |
| 591 | /// |
| 592 | /// # Examples |
| 593 | /// |
| 594 | /// ``` |
| 595 | /// use async_executor::LocalExecutor; |
| 596 | /// |
| 597 | /// let ex = LocalExecutor::new(); |
| 598 | /// assert!(!ex.try_tick()); // no tasks to run |
| 599 | /// |
| 600 | /// let task = ex.spawn(async { |
| 601 | /// println!("Hello world" ); |
| 602 | /// }); |
| 603 | /// assert!(ex.try_tick()); // a task was found |
| 604 | /// ``` |
| 605 | pub fn try_tick(&self) -> bool { |
| 606 | self.inner().try_tick() |
| 607 | } |
| 608 | |
| 609 | /// Runs a single task. |
| 610 | /// |
| 611 | /// Running a task means simply polling its future once. |
| 612 | /// |
| 613 | /// If no tasks are scheduled when this method is called, it will wait until one is scheduled. |
| 614 | /// |
| 615 | /// # Examples |
| 616 | /// |
| 617 | /// ``` |
| 618 | /// use async_executor::LocalExecutor; |
| 619 | /// use futures_lite::future; |
| 620 | /// |
| 621 | /// let ex = LocalExecutor::new(); |
| 622 | /// |
| 623 | /// let task = ex.spawn(async { |
| 624 | /// println!("Hello world" ); |
| 625 | /// }); |
| 626 | /// future::block_on(ex.tick()); // runs the task |
| 627 | /// ``` |
| 628 | pub async fn tick(&self) { |
| 629 | self.inner().tick().await |
| 630 | } |
| 631 | |
| 632 | /// Runs the executor until the given future completes. |
| 633 | /// |
| 634 | /// # Examples |
| 635 | /// |
| 636 | /// ``` |
| 637 | /// use async_executor::LocalExecutor; |
| 638 | /// use futures_lite::future; |
| 639 | /// |
| 640 | /// let local_ex = LocalExecutor::new(); |
| 641 | /// |
| 642 | /// let task = local_ex.spawn(async { 1 + 2 }); |
| 643 | /// let res = future::block_on(local_ex.run(async { task.await * 2 })); |
| 644 | /// |
| 645 | /// assert_eq!(res, 6); |
| 646 | /// ``` |
| 647 | pub async fn run<T>(&self, future: impl Future<Output = T>) -> T { |
| 648 | self.inner().run(future).await |
| 649 | } |
| 650 | |
| 651 | /// Returns a reference to the inner executor. |
| 652 | fn inner(&self) -> &Executor<'a> { |
| 653 | &self.inner |
| 654 | } |
| 655 | } |
| 656 | |
| 657 | impl<'a> Default for LocalExecutor<'a> { |
| 658 | fn default() -> LocalExecutor<'a> { |
| 659 | LocalExecutor::new() |
| 660 | } |
| 661 | } |
| 662 | |
| 663 | /// The state of a executor. |
| 664 | struct State { |
| 665 | /// The global queue. |
| 666 | queue: ConcurrentQueue<Runnable>, |
| 667 | |
| 668 | /// Local queues created by runners. |
| 669 | local_queues: RwLock<Vec<Arc<ConcurrentQueue<Runnable>>>>, |
| 670 | |
| 671 | /// Set to `true` when a sleeping ticker is notified or no tickers are sleeping. |
| 672 | notified: AtomicBool, |
| 673 | |
| 674 | /// A list of sleeping tickers. |
| 675 | sleepers: Mutex<Sleepers>, |
| 676 | |
| 677 | /// Currently active tasks. |
| 678 | active: Mutex<Slab<Waker>>, |
| 679 | } |
| 680 | |
| 681 | impl State { |
| 682 | /// Creates state for a new executor. |
| 683 | const fn new() -> State { |
| 684 | State { |
| 685 | queue: ConcurrentQueue::unbounded(), |
| 686 | local_queues: RwLock::new(Vec::new()), |
| 687 | notified: AtomicBool::new(true), |
| 688 | sleepers: Mutex::new(Sleepers { |
| 689 | count: 0, |
| 690 | wakers: Vec::new(), |
| 691 | free_ids: Vec::new(), |
| 692 | }), |
| 693 | active: Mutex::new(Slab::new()), |
| 694 | } |
| 695 | } |
| 696 | |
| 697 | /// Notifies a sleeping ticker. |
| 698 | #[inline ] |
| 699 | fn notify(&self) { |
| 700 | if self |
| 701 | .notified |
| 702 | .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) |
| 703 | .is_ok() |
| 704 | { |
| 705 | let waker = self.sleepers.lock().unwrap().notify(); |
| 706 | if let Some(w) = waker { |
| 707 | w.wake(); |
| 708 | } |
| 709 | } |
| 710 | } |
| 711 | |
| 712 | pub(crate) fn try_tick(&self) -> bool { |
| 713 | match self.queue.pop() { |
| 714 | Err(_) => false, |
| 715 | Ok(runnable) => { |
| 716 | // Notify another ticker now to pick up where this ticker left off, just in case |
| 717 | // running the task takes a long time. |
| 718 | self.notify(); |
| 719 | |
| 720 | // Run the task. |
| 721 | runnable.run(); |
| 722 | true |
| 723 | } |
| 724 | } |
| 725 | } |
| 726 | |
| 727 | pub(crate) async fn tick(&self) { |
| 728 | let runnable = Ticker::new(self).runnable().await; |
| 729 | runnable.run(); |
| 730 | } |
| 731 | |
| 732 | pub async fn run<T>(&self, future: impl Future<Output = T>) -> T { |
| 733 | let mut runner = Runner::new(self); |
| 734 | let mut rng = fastrand::Rng::new(); |
| 735 | |
| 736 | // A future that runs tasks forever. |
| 737 | let run_forever = async { |
| 738 | loop { |
| 739 | for _ in 0..200 { |
| 740 | let runnable = runner.runnable(&mut rng).await; |
| 741 | runnable.run(); |
| 742 | } |
| 743 | future::yield_now().await; |
| 744 | } |
| 745 | }; |
| 746 | |
| 747 | // Run `future` and `run_forever` concurrently until `future` completes. |
| 748 | future.or(run_forever).await |
| 749 | } |
| 750 | } |
| 751 | |
/// Bookkeeping for sleeping tickers.
struct Sleepers {
    /// Number of sleeping tickers (both notified and unnotified).
    count: usize,

    /// IDs and wakers of sleeping unnotified tickers.
    ///
    /// A sleeping ticker counts as notified when its waker is missing from this list.
    wakers: Vec<(usize, Waker)>,

    /// IDs that were handed back by `remove` and can be reused by `insert`.
    free_ids: Vec<usize>,
}

impl Sleepers {
    /// Registers a new sleeping ticker and returns the ID assigned to it.
    ///
    /// A reclaimed ID is reused when one is available; otherwise the ID is `count + 1`.
    fn insert(&mut self, waker: &Waker) -> usize {
        let id = self.free_ids.pop().unwrap_or_else(|| self.count + 1);
        self.count += 1;
        self.wakers.push((id, waker.clone()));
        id
    }

    /// Refreshes the waker of a sleeping ticker, re-inserting it if it had been notified.
    ///
    /// Returns `true` if the ticker was notified (its waker was missing from the list).
    fn update(&mut self, id: usize, waker: &Waker) -> bool {
        match self.wakers.iter_mut().find(|entry| entry.0 == id) {
            Some(entry) => {
                // Still unnotified: just refresh the stored waker.
                entry.1.clone_from(waker);
                false
            }
            None => {
                self.wakers.push((id, waker.clone()));
                true
            }
        }
    }

    /// Unregisters a previously inserted sleeping ticker and reclaims its ID.
    ///
    /// Returns `true` if the ticker was notified (its waker was missing from the list).
    fn remove(&mut self, id: usize) -> bool {
        self.count -= 1;
        self.free_ids.push(id);

        // Search from the back of the list.
        match self.wakers.iter().rposition(|entry| entry.0 == id) {
            Some(position) => {
                self.wakers.remove(position);
                false
            }
            None => true,
        }
    }

    /// Returns `true` if a sleeping ticker is notified or no tickers are sleeping.
    fn is_notified(&self) -> bool {
        self.count == 0 || self.wakers.len() < self.count
    }

    /// Takes the waker of one sleeping ticker so that it can be notified.
    ///
    /// Returns `None` if a ticker was already notified or there are no tickers.
    fn notify(&mut self) -> Option<Waker> {
        if self.wakers.len() != self.count {
            // Some sleeper has already been notified.
            return None;
        }
        self.wakers.pop().map(|(_, waker)| waker)
    }
}
| 825 | |
| 826 | /// Runs task one by one. |
| 827 | struct Ticker<'a> { |
| 828 | /// The executor state. |
| 829 | state: &'a State, |
| 830 | |
| 831 | /// Set to a non-zero sleeper ID when in sleeping state. |
| 832 | /// |
| 833 | /// States a ticker can be in: |
| 834 | /// 1) Woken. |
| 835 | /// 2a) Sleeping and unnotified. |
| 836 | /// 2b) Sleeping and notified. |
| 837 | sleeping: usize, |
| 838 | } |
| 839 | |
| 840 | impl Ticker<'_> { |
| 841 | /// Creates a ticker. |
| 842 | fn new(state: &State) -> Ticker<'_> { |
| 843 | Ticker { state, sleeping: 0 } |
| 844 | } |
| 845 | |
| 846 | /// Moves the ticker into sleeping and unnotified state. |
| 847 | /// |
| 848 | /// Returns `false` if the ticker was already sleeping and unnotified. |
| 849 | fn sleep(&mut self, waker: &Waker) -> bool { |
| 850 | let mut sleepers = self.state.sleepers.lock().unwrap(); |
| 851 | |
| 852 | match self.sleeping { |
| 853 | // Move to sleeping state. |
| 854 | 0 => { |
| 855 | self.sleeping = sleepers.insert(waker); |
| 856 | } |
| 857 | |
| 858 | // Already sleeping, check if notified. |
| 859 | id => { |
| 860 | if !sleepers.update(id, waker) { |
| 861 | return false; |
| 862 | } |
| 863 | } |
| 864 | } |
| 865 | |
| 866 | self.state |
| 867 | .notified |
| 868 | .store(sleepers.is_notified(), Ordering::Release); |
| 869 | |
| 870 | true |
| 871 | } |
| 872 | |
| 873 | /// Moves the ticker into woken state. |
| 874 | fn wake(&mut self) { |
| 875 | if self.sleeping != 0 { |
| 876 | let mut sleepers = self.state.sleepers.lock().unwrap(); |
| 877 | sleepers.remove(self.sleeping); |
| 878 | |
| 879 | self.state |
| 880 | .notified |
| 881 | .store(sleepers.is_notified(), Ordering::Release); |
| 882 | } |
| 883 | self.sleeping = 0; |
| 884 | } |
| 885 | |
| 886 | /// Waits for the next runnable task to run. |
| 887 | async fn runnable(&mut self) -> Runnable { |
| 888 | self.runnable_with(|| self.state.queue.pop().ok()).await |
| 889 | } |
| 890 | |
| 891 | /// Waits for the next runnable task to run, given a function that searches for a task. |
| 892 | async fn runnable_with(&mut self, mut search: impl FnMut() -> Option<Runnable>) -> Runnable { |
| 893 | future::poll_fn(|cx| { |
| 894 | loop { |
| 895 | match search() { |
| 896 | None => { |
| 897 | // Move to sleeping and unnotified state. |
| 898 | if !self.sleep(cx.waker()) { |
| 899 | // If already sleeping and unnotified, return. |
| 900 | return Poll::Pending; |
| 901 | } |
| 902 | } |
| 903 | Some(r) => { |
| 904 | // Wake up. |
| 905 | self.wake(); |
| 906 | |
| 907 | // Notify another ticker now to pick up where this ticker left off, just in |
| 908 | // case running the task takes a long time. |
| 909 | self.state.notify(); |
| 910 | |
| 911 | return Poll::Ready(r); |
| 912 | } |
| 913 | } |
| 914 | } |
| 915 | }) |
| 916 | .await |
| 917 | } |
| 918 | } |
| 919 | |
| 920 | impl Drop for Ticker<'_> { |
| 921 | fn drop(&mut self) { |
| 922 | // If this ticker is in sleeping state, it must be removed from the sleepers list. |
| 923 | if self.sleeping != 0 { |
| 924 | let mut sleepers: MutexGuard<'_, Sleepers> = self.state.sleepers.lock().unwrap(); |
| 925 | let notified: bool = sleepers.remove(self.sleeping); |
| 926 | |
| 927 | self.state |
| 928 | .notified |
| 929 | .store(val:sleepers.is_notified(), order:Ordering::Release); |
| 930 | |
| 931 | // If this ticker was notified, then notify another ticker. |
| 932 | if notified { |
| 933 | drop(sleepers); |
| 934 | self.state.notify(); |
| 935 | } |
| 936 | } |
| 937 | } |
| 938 | } |
| 939 | |
| 940 | /// A worker in a work-stealing executor. |
| 941 | /// |
| 942 | /// This is just a ticker that also has an associated local queue for improved cache locality. |
| 943 | struct Runner<'a> { |
| 944 | /// The executor state. |
| 945 | state: &'a State, |
| 946 | |
| 947 | /// Inner ticker. |
| 948 | ticker: Ticker<'a>, |
| 949 | |
| 950 | /// The local queue. |
| 951 | local: Arc<ConcurrentQueue<Runnable>>, |
| 952 | |
| 953 | /// Bumped every time a runnable task is found. |
| 954 | ticks: usize, |
| 955 | } |
| 956 | |
| 957 | impl Runner<'_> { |
| 958 | /// Creates a runner and registers it in the executor state. |
| 959 | fn new(state: &State) -> Runner<'_> { |
| 960 | let runner = Runner { |
| 961 | state, |
| 962 | ticker: Ticker::new(state), |
| 963 | local: Arc::new(ConcurrentQueue::bounded(512)), |
| 964 | ticks: 0, |
| 965 | }; |
| 966 | state |
| 967 | .local_queues |
| 968 | .write() |
| 969 | .unwrap() |
| 970 | .push(runner.local.clone()); |
| 971 | runner |
| 972 | } |
| 973 | |
| 974 | /// Waits for the next runnable task to run. |
| 975 | async fn runnable(&mut self, rng: &mut fastrand::Rng) -> Runnable { |
| 976 | let runnable = self |
| 977 | .ticker |
| 978 | .runnable_with(|| { |
| 979 | // Try the local queue. |
| 980 | if let Ok(r) = self.local.pop() { |
| 981 | return Some(r); |
| 982 | } |
| 983 | |
| 984 | // Try stealing from the global queue. |
| 985 | if let Ok(r) = self.state.queue.pop() { |
| 986 | steal(&self.state.queue, &self.local); |
| 987 | return Some(r); |
| 988 | } |
| 989 | |
| 990 | // Try stealing from other runners. |
| 991 | let local_queues = self.state.local_queues.read().unwrap(); |
| 992 | |
| 993 | // Pick a random starting point in the iterator list and rotate the list. |
| 994 | let n = local_queues.len(); |
| 995 | let start = rng.usize(..n); |
| 996 | let iter = local_queues |
| 997 | .iter() |
| 998 | .chain(local_queues.iter()) |
| 999 | .skip(start) |
| 1000 | .take(n); |
| 1001 | |
| 1002 | // Remove this runner's local queue. |
| 1003 | let iter = iter.filter(|local| !Arc::ptr_eq(local, &self.local)); |
| 1004 | |
| 1005 | // Try stealing from each local queue in the list. |
| 1006 | for local in iter { |
| 1007 | steal(local, &self.local); |
| 1008 | if let Ok(r) = self.local.pop() { |
| 1009 | return Some(r); |
| 1010 | } |
| 1011 | } |
| 1012 | |
| 1013 | None |
| 1014 | }) |
| 1015 | .await; |
| 1016 | |
| 1017 | // Bump the tick counter. |
| 1018 | self.ticks = self.ticks.wrapping_add(1); |
| 1019 | |
| 1020 | if self.ticks % 64 == 0 { |
| 1021 | // Steal tasks from the global queue to ensure fair task scheduling. |
| 1022 | steal(&self.state.queue, &self.local); |
| 1023 | } |
| 1024 | |
| 1025 | runnable |
| 1026 | } |
| 1027 | } |
| 1028 | |
| 1029 | impl Drop for Runner<'_> { |
| 1030 | fn drop(&mut self) { |
| 1031 | // Remove the local queue. |
| 1032 | self.state |
| 1033 | .local_queues |
| 1034 | .write() |
| 1035 | .unwrap() |
| 1036 | .retain(|local: &Arc>| !Arc::ptr_eq(this:local, &self.local)); |
| 1037 | |
| 1038 | // Re-schedule remaining tasks in the local queue. |
| 1039 | while let Ok(r: Runnable) = self.local.pop() { |
| 1040 | r.schedule(); |
| 1041 | } |
| 1042 | } |
| 1043 | } |
| 1044 | |
| 1045 | /// Steals some items from one queue into another. |
| 1046 | fn steal<T>(src: &ConcurrentQueue<T>, dest: &ConcurrentQueue<T>) { |
| 1047 | // Half of `src`'s length rounded up. |
| 1048 | let mut count: usize = (src.len() + 1) / 2; |
| 1049 | |
| 1050 | if count > 0 { |
| 1051 | // Don't steal more than fits into the queue. |
| 1052 | if let Some(cap: usize) = dest.capacity() { |
| 1053 | count = count.min(cap - dest.len()); |
| 1054 | } |
| 1055 | |
| 1056 | // Steal tasks. |
| 1057 | for _ in 0..count { |
| 1058 | if let Ok(t: T) = src.pop() { |
| 1059 | assert!(dest.push(t).is_ok()); |
| 1060 | } else { |
| 1061 | break; |
| 1062 | } |
| 1063 | } |
| 1064 | } |
| 1065 | } |
| 1066 | |
| 1067 | /// Debug implementation for `Executor` and `LocalExecutor`. |
| 1068 | fn debug_executor(executor: &Executor<'_>, name: &str, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 1069 | // Get a reference to the state. |
| 1070 | let ptr: *mut State = executor.state.load(order:Ordering::Acquire); |
| 1071 | if ptr.is_null() { |
| 1072 | // The executor has not been initialized. |
| 1073 | struct Uninitialized; |
| 1074 | |
| 1075 | impl fmt::Debug for Uninitialized { |
| 1076 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 1077 | f.write_str(data:"<uninitialized>" ) |
| 1078 | } |
| 1079 | } |
| 1080 | |
| 1081 | return f.debug_tuple(name).field(&Uninitialized).finish(); |
| 1082 | } |
| 1083 | |
| 1084 | // SAFETY: If the state pointer is not null, it must have been |
| 1085 | // allocated properly by Arc::new and converted via Arc::into_raw |
| 1086 | // in state_ptr. |
| 1087 | let state: &State = unsafe { &*ptr }; |
| 1088 | |
| 1089 | debug_state(state, name, f) |
| 1090 | } |
| 1091 | |
| 1092 | /// Debug implementation for `Executor` and `LocalExecutor`. |
| 1093 | fn debug_state(state: &State, name: &str, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 1094 | /// Debug wrapper for the number of active tasks. |
| 1095 | struct ActiveTasks<'a>(&'a Mutex<Slab<Waker>>); |
| 1096 | |
| 1097 | impl fmt::Debug for ActiveTasks<'_> { |
| 1098 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 1099 | match self.0.try_lock() { |
| 1100 | Ok(lock) => fmt::Debug::fmt(&lock.len(), f), |
| 1101 | Err(TryLockError::WouldBlock) => f.write_str("<locked>" ), |
| 1102 | Err(TryLockError::Poisoned(_)) => f.write_str("<poisoned>" ), |
| 1103 | } |
| 1104 | } |
| 1105 | } |
| 1106 | |
| 1107 | /// Debug wrapper for the local runners. |
| 1108 | struct LocalRunners<'a>(&'a RwLock<Vec<Arc<ConcurrentQueue<Runnable>>>>); |
| 1109 | |
| 1110 | impl fmt::Debug for LocalRunners<'_> { |
| 1111 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 1112 | match self.0.try_read() { |
| 1113 | Ok(lock) => f |
| 1114 | .debug_list() |
| 1115 | .entries(lock.iter().map(|queue| queue.len())) |
| 1116 | .finish(), |
| 1117 | Err(TryLockError::WouldBlock) => f.write_str("<locked>" ), |
| 1118 | Err(TryLockError::Poisoned(_)) => f.write_str("<poisoned>" ), |
| 1119 | } |
| 1120 | } |
| 1121 | } |
| 1122 | |
| 1123 | /// Debug wrapper for the sleepers. |
| 1124 | struct SleepCount<'a>(&'a Mutex<Sleepers>); |
| 1125 | |
| 1126 | impl fmt::Debug for SleepCount<'_> { |
| 1127 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 1128 | match self.0.try_lock() { |
| 1129 | Ok(lock) => fmt::Debug::fmt(&lock.count, f), |
| 1130 | Err(TryLockError::WouldBlock) => f.write_str("<locked>" ), |
| 1131 | Err(TryLockError::Poisoned(_)) => f.write_str("<poisoned>" ), |
| 1132 | } |
| 1133 | } |
| 1134 | } |
| 1135 | |
| 1136 | f.debug_struct(name) |
| 1137 | .field("active" , &ActiveTasks(&state.active)) |
| 1138 | .field("global_tasks" , &state.queue.len()) |
| 1139 | .field("local_runners" , &LocalRunners(&state.local_queues)) |
| 1140 | .field("sleepers" , &SleepCount(&state.sleepers)) |
| 1141 | .finish() |
| 1142 | } |
| 1143 | |
/// Guard that invokes the wrapped closure when the guard is dropped.
struct CallOnDrop<F>(F)
where
    F: FnMut();

impl<F> Drop for CallOnDrop<F>
where
    F: FnMut(),
{
    fn drop(&mut self) {
        // Borrow the stored closure mutably and call it.
        let CallOnDrop(on_drop) = self;
        on_drop();
    }
}
| 1152 | |
| 1153 | fn _ensure_send_and_sync() { |
| 1154 | use futures_lite::future::pending; |
| 1155 | |
| 1156 | fn is_send<T: Send>(_: T) {} |
| 1157 | fn is_sync<T: Sync>(_: T) {} |
| 1158 | fn is_static<T: 'static>(_: T) {} |
| 1159 | |
| 1160 | is_send::<Executor<'_>>(Executor::new()); |
| 1161 | is_sync::<Executor<'_>>(Executor::new()); |
| 1162 | |
| 1163 | let ex = Executor::new(); |
| 1164 | is_send(ex.run(pending::<()>())); |
| 1165 | is_sync(ex.run(pending::<()>())); |
| 1166 | is_send(ex.tick()); |
| 1167 | is_sync(ex.tick()); |
| 1168 | is_send(ex.schedule()); |
| 1169 | is_sync(ex.schedule()); |
| 1170 | is_static(ex.schedule()); |
| 1171 | |
| 1172 | /// ```compile_fail |
| 1173 | /// use async_executor::LocalExecutor; |
| 1174 | /// use futures_lite::future::pending; |
| 1175 | /// |
| 1176 | /// fn is_send<T: Send>(_: T) {} |
| 1177 | /// fn is_sync<T: Sync>(_: T) {} |
| 1178 | /// |
| 1179 | /// is_send::<LocalExecutor<'_>>(LocalExecutor::new()); |
| 1180 | /// is_sync::<LocalExecutor<'_>>(LocalExecutor::new()); |
| 1181 | /// |
| 1182 | /// let ex = LocalExecutor::new(); |
| 1183 | /// is_send(ex.run(pending::<()>())); |
| 1184 | /// is_sync(ex.run(pending::<()>())); |
| 1185 | /// is_send(ex.tick()); |
| 1186 | /// is_sync(ex.tick()); |
| 1187 | /// ``` |
| 1188 | fn _negative_test() {} |
| 1189 | } |
| 1190 | |