1 | use std::cell::{Cell, RefCell}; |
2 | use std::future::Future; |
3 | use std::mem::ManuallyDrop; |
4 | use std::pin::Pin; |
5 | use std::rc::Rc; |
6 | use std::task::{Context, RawWaker, RawWakerVTable, Waker}; |
7 | |
8 | struct Inner { |
9 | future: Pin<Box<dyn Future<Output = ()> + 'static>>, |
10 | waker: Waker, |
11 | } |
12 | |
13 | pub(crate) struct Task { |
14 | // The actual Future that we're executing as part of this task. |
15 | // |
16 | // This is an Option so that the Future can be immediately dropped when it's |
17 | // finished |
18 | inner: RefCell<Option<Inner>>, |
19 | |
20 | // This is used to ensure that the Task will only be queued once |
21 | is_queued: Cell<bool>, |
22 | } |
23 | |
24 | impl Task { |
25 | pub(crate) fn spawn(future: Pin<Box<dyn Future<Output = ()> + 'static>>) { |
26 | let this = Rc::new(Self { |
27 | inner: RefCell::new(None), |
28 | is_queued: Cell::new(true), |
29 | }); |
30 | |
31 | let waker = unsafe { Waker::from_raw(Task::into_raw_waker(Rc::clone(&this))) }; |
32 | |
33 | *this.inner.borrow_mut() = Some(Inner { future, waker }); |
34 | |
35 | crate::queue::QUEUE.with(|queue| queue.schedule_task(this)); |
36 | } |
37 | |
38 | fn force_wake(this: Rc<Self>) { |
39 | crate::queue::QUEUE.with(|queue| { |
40 | queue.push_task(this); |
41 | }); |
42 | } |
43 | |
44 | fn wake(this: Rc<Self>) { |
45 | // If we've already been placed on the run queue then there's no need to |
46 | // requeue ourselves since we're going to run at some point in the |
47 | // future anyway. |
48 | if this.is_queued.replace(true) { |
49 | return; |
50 | } |
51 | |
52 | Self::force_wake(this); |
53 | } |
54 | |
55 | fn wake_by_ref(this: &Rc<Self>) { |
56 | // If we've already been placed on the run queue then there's no need to |
57 | // requeue ourselves since we're going to run at some point in the |
58 | // future anyway. |
59 | if this.is_queued.replace(true) { |
60 | return; |
61 | } |
62 | |
63 | Self::force_wake(Rc::clone(this)); |
64 | } |
65 | |
66 | /// Creates a standard library `RawWaker` from an `Rc` of ourselves. |
67 | /// |
68 | /// Note that in general this is wildly unsafe because everything with |
69 | /// Futures requires `Sync` + `Send` with regard to Wakers. For wasm, |
70 | /// however, everything is guaranteed to be singlethreaded (since we're |
71 | /// compiled without the `atomics` feature) so we "safely lie" and say our |
72 | /// `Rc` pointer is good enough. |
73 | /// |
74 | /// The implementation is based off of futures::task::ArcWake |
75 | unsafe fn into_raw_waker(this: Rc<Self>) -> RawWaker { |
76 | unsafe fn raw_clone(ptr: *const ()) -> RawWaker { |
77 | let ptr = ManuallyDrop::new(Rc::from_raw(ptr as *const Task)); |
78 | Task::into_raw_waker(Rc::clone(&ptr)) |
79 | } |
80 | |
81 | unsafe fn raw_wake(ptr: *const ()) { |
82 | let ptr = Rc::from_raw(ptr as *const Task); |
83 | Task::wake(ptr); |
84 | } |
85 | |
86 | unsafe fn raw_wake_by_ref(ptr: *const ()) { |
87 | let ptr = ManuallyDrop::new(Rc::from_raw(ptr as *const Task)); |
88 | Task::wake_by_ref(&ptr); |
89 | } |
90 | |
91 | unsafe fn raw_drop(ptr: *const ()) { |
92 | drop(Rc::from_raw(ptr as *const Task)); |
93 | } |
94 | |
95 | static VTABLE: RawWakerVTable = |
96 | RawWakerVTable::new(raw_clone, raw_wake, raw_wake_by_ref, raw_drop); |
97 | |
98 | RawWaker::new(Rc::into_raw(this) as *const (), &VTABLE) |
99 | } |
100 | |
101 | pub(crate) fn run(&self) { |
102 | let mut borrow = self.inner.borrow_mut(); |
103 | |
104 | // Wakeups can come in after a Future has finished and been destroyed, |
105 | // so handle this gracefully by just ignoring the request to run. |
106 | let inner = match borrow.as_mut() { |
107 | Some(inner) => inner, |
108 | None => return, |
109 | }; |
110 | |
111 | // Ensure that if poll calls `waker.wake()` we can get enqueued back on |
112 | // the run queue. |
113 | self.is_queued.set(false); |
114 | |
115 | let poll = { |
116 | let mut cx = Context::from_waker(&inner.waker); |
117 | inner.future.as_mut().poll(&mut cx) |
118 | }; |
119 | |
120 | // If a future has finished (`Ready`) then clean up resources associated |
121 | // with the future ASAP. This ensures that we don't keep anything extra |
122 | // alive in-memory by accident. Our own struct, `Rc<Task>` won't |
123 | // actually go away until all wakers referencing us go away, which may |
124 | // take quite some time, so ensure that the heaviest of resources are |
125 | // released early. |
126 | if poll.is_ready() { |
127 | *borrow = None; |
128 | } |
129 | } |
130 | } |
131 | |