use alloc::boxed::Box;
use alloc::rc::Rc;
use core::cell::{Cell, RefCell};
use core::future::Future;
use core::mem::ManuallyDrop;
use core::pin::Pin;
use core::task::{Context, RawWaker, RawWakerVTable, Waker};

/// The live state of a task whose future has not finished yet.
///
/// Field order is deliberate: Rust drops fields in declaration order, so the
/// future is destroyed before the waker stored next to it.
struct Inner {
    /// The type-erased, heap-pinned future driven by the task.
    future: Pin<Box<dyn Future<Output = ()> + 'static>>,
    /// The waker handed to `future` (through a `Context`) on every poll.
    waker: Waker,
}

| 14 | pub(crate) struct Task { |
| 15 | // The actual Future that we're executing as part of this task. |
| 16 | // |
| 17 | // This is an Option so that the Future can be immediately dropped when it's |
| 18 | // finished |
| 19 | inner: RefCell<Option<Inner>>, |
| 20 | |
| 21 | // This is used to ensure that the Task will only be queued once |
| 22 | is_queued: Cell<bool>, |
| 23 | } |
| 24 | |
| 25 | impl Task { |
| 26 | pub(crate) fn spawn(future: Pin<Box<dyn Future<Output = ()> + 'static>>) { |
| 27 | let this = Rc::new(Self { |
| 28 | inner: RefCell::new(None), |
| 29 | is_queued: Cell::new(true), |
| 30 | }); |
| 31 | |
| 32 | let waker = unsafe { Waker::from_raw(Task::into_raw_waker(Rc::clone(&this))) }; |
| 33 | |
| 34 | *this.inner.borrow_mut() = Some(Inner { future, waker }); |
| 35 | |
| 36 | crate::queue::Queue::with(|queue| queue.schedule_task(this)); |
| 37 | } |
| 38 | |
| 39 | fn force_wake(this: Rc<Self>) { |
| 40 | crate::queue::Queue::with(|queue| { |
| 41 | queue.push_task(this); |
| 42 | }); |
| 43 | } |
| 44 | |
| 45 | fn wake(this: Rc<Self>) { |
| 46 | // If we've already been placed on the run queue then there's no need to |
| 47 | // requeue ourselves since we're going to run at some point in the |
| 48 | // future anyway. |
| 49 | if this.is_queued.replace(true) { |
| 50 | return; |
| 51 | } |
| 52 | |
| 53 | Self::force_wake(this); |
| 54 | } |
| 55 | |
| 56 | fn wake_by_ref(this: &Rc<Self>) { |
| 57 | // If we've already been placed on the run queue then there's no need to |
| 58 | // requeue ourselves since we're going to run at some point in the |
| 59 | // future anyway. |
| 60 | if this.is_queued.replace(true) { |
| 61 | return; |
| 62 | } |
| 63 | |
| 64 | Self::force_wake(Rc::clone(this)); |
| 65 | } |
| 66 | |
| 67 | /// Creates a standard library `RawWaker` from an `Rc` of ourselves. |
| 68 | /// |
| 69 | /// Note that in general this is wildly unsafe because everything with |
| 70 | /// Futures requires `Sync` + `Send` with regard to Wakers. For wasm, |
| 71 | /// however, everything is guaranteed to be singlethreaded (since we're |
| 72 | /// compiled without the `atomics` feature) so we "safely lie" and say our |
| 73 | /// `Rc` pointer is good enough. |
| 74 | /// |
| 75 | /// The implementation is based off of futures::task::ArcWake |
| 76 | unsafe fn into_raw_waker(this: Rc<Self>) -> RawWaker { |
| 77 | unsafe fn raw_clone(ptr: *const ()) -> RawWaker { |
| 78 | let ptr = ManuallyDrop::new(Rc::from_raw(ptr as *const Task)); |
| 79 | Task::into_raw_waker(Rc::clone(&ptr)) |
| 80 | } |
| 81 | |
| 82 | unsafe fn raw_wake(ptr: *const ()) { |
| 83 | let ptr = Rc::from_raw(ptr as *const Task); |
| 84 | Task::wake(ptr); |
| 85 | } |
| 86 | |
| 87 | unsafe fn raw_wake_by_ref(ptr: *const ()) { |
| 88 | let ptr = ManuallyDrop::new(Rc::from_raw(ptr as *const Task)); |
| 89 | Task::wake_by_ref(&ptr); |
| 90 | } |
| 91 | |
| 92 | unsafe fn raw_drop(ptr: *const ()) { |
| 93 | drop(Rc::from_raw(ptr as *const Task)); |
| 94 | } |
| 95 | |
| 96 | static VTABLE: RawWakerVTable = |
| 97 | RawWakerVTable::new(raw_clone, raw_wake, raw_wake_by_ref, raw_drop); |
| 98 | |
| 99 | RawWaker::new(Rc::into_raw(this) as *const (), &VTABLE) |
| 100 | } |
| 101 | |
| 102 | pub(crate) fn run(&self) { |
| 103 | let mut borrow = self.inner.borrow_mut(); |
| 104 | |
| 105 | // Wakeups can come in after a Future has finished and been destroyed, |
| 106 | // so handle this gracefully by just ignoring the request to run. |
| 107 | let inner = match borrow.as_mut() { |
| 108 | Some(inner) => inner, |
| 109 | None => return, |
| 110 | }; |
| 111 | |
| 112 | // Ensure that if poll calls `waker.wake()` we can get enqueued back on |
| 113 | // the run queue. |
| 114 | self.is_queued.set(false); |
| 115 | |
| 116 | let poll = { |
| 117 | let mut cx = Context::from_waker(&inner.waker); |
| 118 | inner.future.as_mut().poll(&mut cx) |
| 119 | }; |
| 120 | |
| 121 | // If a future has finished (`Ready`) then clean up resources associated |
| 122 | // with the future ASAP. This ensures that we don't keep anything extra |
| 123 | // alive in-memory by accident. Our own struct, `Rc<Task>` won't |
| 124 | // actually go away until all wakers referencing us go away, which may |
| 125 | // take quite some time, so ensure that the heaviest of resources are |
| 126 | // released early. |
| 127 | if poll.is_ready() { |
| 128 | *borrow = None; |
| 129 | } |
| 130 | } |
| 131 | } |
| 132 | |