1use std::cell::{Cell, RefCell};
2use std::future::Future;
3use std::mem::ManuallyDrop;
4use std::pin::Pin;
5use std::rc::Rc;
6use std::task::{Context, RawWaker, RawWakerVTable, Waker};
7
/// The live state of a task whose future has not finished yet.
///
/// Field order is significant: Rust drops fields in declaration order, so the
/// future is released before the waker.
struct Inner {
    /// The boxed, pinned future that this task drives to completion.
    future: Pin<Box<dyn Future<Output = ()> + 'static>>,
    /// The waker handed to `future` (through a `Context`) whenever it is polled.
    waker: Waker,
}
12
13pub(crate) struct Task {
14 // The actual Future that we're executing as part of this task.
15 //
16 // This is an Option so that the Future can be immediately dropped when it's
17 // finished
18 inner: RefCell<Option<Inner>>,
19
20 // This is used to ensure that the Task will only be queued once
21 is_queued: Cell<bool>,
22}
23
24impl Task {
25 pub(crate) fn spawn(future: Pin<Box<dyn Future<Output = ()> + 'static>>) {
26 let this = Rc::new(Self {
27 inner: RefCell::new(None),
28 is_queued: Cell::new(true),
29 });
30
31 let waker = unsafe { Waker::from_raw(Task::into_raw_waker(Rc::clone(&this))) };
32
33 *this.inner.borrow_mut() = Some(Inner { future, waker });
34
35 crate::queue::QUEUE.with(|queue| queue.schedule_task(this));
36 }
37
38 fn force_wake(this: Rc<Self>) {
39 crate::queue::QUEUE.with(|queue| {
40 queue.push_task(this);
41 });
42 }
43
44 fn wake(this: Rc<Self>) {
45 // If we've already been placed on the run queue then there's no need to
46 // requeue ourselves since we're going to run at some point in the
47 // future anyway.
48 if this.is_queued.replace(true) {
49 return;
50 }
51
52 Self::force_wake(this);
53 }
54
55 fn wake_by_ref(this: &Rc<Self>) {
56 // If we've already been placed on the run queue then there's no need to
57 // requeue ourselves since we're going to run at some point in the
58 // future anyway.
59 if this.is_queued.replace(true) {
60 return;
61 }
62
63 Self::force_wake(Rc::clone(this));
64 }
65
66 /// Creates a standard library `RawWaker` from an `Rc` of ourselves.
67 ///
68 /// Note that in general this is wildly unsafe because everything with
69 /// Futures requires `Sync` + `Send` with regard to Wakers. For wasm,
70 /// however, everything is guaranteed to be singlethreaded (since we're
71 /// compiled without the `atomics` feature) so we "safely lie" and say our
72 /// `Rc` pointer is good enough.
73 ///
74 /// The implementation is based off of futures::task::ArcWake
75 unsafe fn into_raw_waker(this: Rc<Self>) -> RawWaker {
76 unsafe fn raw_clone(ptr: *const ()) -> RawWaker {
77 let ptr = ManuallyDrop::new(Rc::from_raw(ptr as *const Task));
78 Task::into_raw_waker(Rc::clone(&ptr))
79 }
80
81 unsafe fn raw_wake(ptr: *const ()) {
82 let ptr = Rc::from_raw(ptr as *const Task);
83 Task::wake(ptr);
84 }
85
86 unsafe fn raw_wake_by_ref(ptr: *const ()) {
87 let ptr = ManuallyDrop::new(Rc::from_raw(ptr as *const Task));
88 Task::wake_by_ref(&ptr);
89 }
90
91 unsafe fn raw_drop(ptr: *const ()) {
92 drop(Rc::from_raw(ptr as *const Task));
93 }
94
95 static VTABLE: RawWakerVTable =
96 RawWakerVTable::new(raw_clone, raw_wake, raw_wake_by_ref, raw_drop);
97
98 RawWaker::new(Rc::into_raw(this) as *const (), &VTABLE)
99 }
100
101 pub(crate) fn run(&self) {
102 let mut borrow = self.inner.borrow_mut();
103
104 // Wakeups can come in after a Future has finished and been destroyed,
105 // so handle this gracefully by just ignoring the request to run.
106 let inner = match borrow.as_mut() {
107 Some(inner) => inner,
108 None => return,
109 };
110
111 // Ensure that if poll calls `waker.wake()` we can get enqueued back on
112 // the run queue.
113 self.is_queued.set(false);
114
115 let poll = {
116 let mut cx = Context::from_waker(&inner.waker);
117 inner.future.as_mut().poll(&mut cx)
118 };
119
120 // If a future has finished (`Ready`) then clean up resources associated
121 // with the future ASAP. This ensures that we don't keep anything extra
122 // alive in-memory by accident. Our own struct, `Rc<Task>` won't
123 // actually go away until all wakers referencing us go away, which may
124 // take quite some time, so ensure that the heaviest of resources are
125 // released early.
126 if poll.is_ready() {
127 *borrow = None;
128 }
129 }
130}
131