1use crate::future::Future;
2use crate::runtime::task::core::{Cell, Core, Header, Trailer};
3use crate::runtime::task::state::{Snapshot, State};
4use crate::runtime::task::waker::waker_ref;
5use crate::runtime::task::{JoinError, Notified, RawTask, Schedule, Task};
6
7use std::mem;
8use std::mem::ManuallyDrop;
9use std::panic;
10use std::ptr::NonNull;
11use std::task::{Context, Poll, Waker};
12
13/// Typed raw task handle.
14pub(super) struct Harness<T: Future, S: 'static> {
15 cell: NonNull<Cell<T, S>>,
16}
17
18impl<T, S> Harness<T, S>
19where
20 T: Future,
21 S: 'static,
22{
23 pub(super) unsafe fn from_raw(ptr: NonNull<Header>) -> Harness<T, S> {
24 Harness {
25 cell: ptr.cast::<Cell<T, S>>(),
26 }
27 }
28
29 fn header_ptr(&self) -> NonNull<Header> {
30 self.cell.cast()
31 }
32
33 fn header(&self) -> &Header {
34 unsafe { &*self.header_ptr().as_ptr() }
35 }
36
37 fn state(&self) -> &State {
38 &self.header().state
39 }
40
41 fn trailer(&self) -> &Trailer {
42 unsafe { &self.cell.as_ref().trailer }
43 }
44
45 fn core(&self) -> &Core<T, S> {
46 unsafe { &self.cell.as_ref().core }
47 }
48}
49
50/// Task operations that can be implemented without being generic over the
51/// scheduler or task. Only one version of these methods should exist in the
52/// final binary.
53impl RawTask {
54 pub(super) fn drop_reference(self) {
55 if self.state().ref_dec() {
56 self.dealloc();
57 }
58 }
59
60 /// This call consumes a ref-count and notifies the task. This will create a
61 /// new Notified and submit it if necessary.
62 ///
63 /// The caller does not need to hold a ref-count besides the one that was
64 /// passed to this call.
65 pub(super) fn wake_by_val(&self) {
66 use super::state::TransitionToNotifiedByVal;
67
68 match self.state().transition_to_notified_by_val() {
69 TransitionToNotifiedByVal::Submit => {
70 // The caller has given us a ref-count, and the transition has
71 // created a new ref-count, so we now hold two. We turn the new
72 // ref-count Notified and pass it to the call to `schedule`.
73 //
74 // The old ref-count is retained for now to ensure that the task
75 // is not dropped during the call to `schedule` if the call
76 // drops the task it was given.
77 self.schedule();
78
79 // Now that we have completed the call to schedule, we can
80 // release our ref-count.
81 self.drop_reference();
82 }
83 TransitionToNotifiedByVal::Dealloc => {
84 self.dealloc();
85 }
86 TransitionToNotifiedByVal::DoNothing => {}
87 }
88 }
89
90 /// This call notifies the task. It will not consume any ref-counts, but the
91 /// caller should hold a ref-count. This will create a new Notified and
92 /// submit it if necessary.
93 pub(super) fn wake_by_ref(&self) {
94 use super::state::TransitionToNotifiedByRef;
95
96 match self.state().transition_to_notified_by_ref() {
97 TransitionToNotifiedByRef::Submit => {
98 // The transition above incremented the ref-count for a new task
99 // and the caller also holds a ref-count. The caller's ref-count
100 // ensures that the task is not destroyed even if the new task
101 // is dropped before `schedule` returns.
102 self.schedule();
103 }
104 TransitionToNotifiedByRef::DoNothing => {}
105 }
106 }
107
108 /// Remotely aborts the task.
109 ///
110 /// The caller should hold a ref-count, but we do not consume it.
111 ///
112 /// This is similar to `shutdown` except that it asks the runtime to perform
113 /// the shutdown. This is necessary to avoid the shutdown happening in the
114 /// wrong thread for non-Send tasks.
115 pub(super) fn remote_abort(&self) {
116 if self.state().transition_to_notified_and_cancel() {
117 // The transition has created a new ref-count, which we turn into
118 // a Notified and pass to the task.
119 //
120 // Since the caller holds a ref-count, the task cannot be destroyed
121 // before the call to `schedule` returns even if the call drops the
122 // `Notified` internally.
123 self.schedule();
124 }
125 }
126
127 /// Try to set the waker notified when the task is complete. Returns true if
128 /// the task has already completed. If this call returns false, then the
129 /// waker will not be notified.
130 pub(super) fn try_set_join_waker(&self, waker: &Waker) -> bool {
131 can_read_output(self.header(), self.trailer(), waker)
132 }
133}
134
135impl<T, S> Harness<T, S>
136where
137 T: Future,
138 S: Schedule,
139{
140 pub(super) fn drop_reference(self) {
141 if self.state().ref_dec() {
142 self.dealloc();
143 }
144 }
145
146 /// Polls the inner future. A ref-count is consumed.
147 ///
148 /// All necessary state checks and transitions are performed.
149 /// Panics raised while polling the future are handled.
150 pub(super) fn poll(self) {
151 // We pass our ref-count to `poll_inner`.
152 match self.poll_inner() {
153 PollFuture::Notified => {
154 // The `poll_inner` call has given us two ref-counts back.
155 // We give one of them to a new task and call `yield_now`.
156 self.core()
157 .scheduler
158 .yield_now(Notified(self.get_new_task()));
159
160 // The remaining ref-count is now dropped. We kept the extra
161 // ref-count until now to ensure that even if the `yield_now`
162 // call drops the provided task, the task isn't deallocated
163 // before after `yield_now` returns.
164 self.drop_reference();
165 }
166 PollFuture::Complete => {
167 self.complete();
168 }
169 PollFuture::Dealloc => {
170 self.dealloc();
171 }
172 PollFuture::Done => (),
173 }
174 }
175
176 /// Polls the task and cancel it if necessary. This takes ownership of a
177 /// ref-count.
178 ///
179 /// If the return value is Notified, the caller is given ownership of two
180 /// ref-counts.
181 ///
182 /// If the return value is Complete, the caller is given ownership of a
183 /// single ref-count, which should be passed on to `complete`.
184 ///
185 /// If the return value is Dealloc, then this call consumed the last
186 /// ref-count and the caller should call `dealloc`.
187 ///
188 /// Otherwise the ref-count is consumed and the caller should not access
189 /// `self` again.
190 fn poll_inner(&self) -> PollFuture {
191 use super::state::{TransitionToIdle, TransitionToRunning};
192
193 match self.state().transition_to_running() {
194 TransitionToRunning::Success => {
195 let header_ptr = self.header_ptr();
196 let waker_ref = waker_ref::<T, S>(&header_ptr);
197 let cx = Context::from_waker(&waker_ref);
198 let res = poll_future(self.core(), cx);
199
200 if res == Poll::Ready(()) {
201 // The future completed. Move on to complete the task.
202 return PollFuture::Complete;
203 }
204
205 match self.state().transition_to_idle() {
206 TransitionToIdle::Ok => PollFuture::Done,
207 TransitionToIdle::OkNotified => PollFuture::Notified,
208 TransitionToIdle::OkDealloc => PollFuture::Dealloc,
209 TransitionToIdle::Cancelled => {
210 // The transition to idle failed because the task was
211 // cancelled during the poll.
212 cancel_task(self.core());
213 PollFuture::Complete
214 }
215 }
216 }
217 TransitionToRunning::Cancelled => {
218 cancel_task(self.core());
219 PollFuture::Complete
220 }
221 TransitionToRunning::Failed => PollFuture::Done,
222 TransitionToRunning::Dealloc => PollFuture::Dealloc,
223 }
224 }
225
226 /// Forcibly shuts down the task.
227 ///
228 /// Attempt to transition to `Running` in order to forcibly shutdown the
229 /// task. If the task is currently running or in a state of completion, then
230 /// there is nothing further to do. When the task completes running, it will
231 /// notice the `CANCELLED` bit and finalize the task.
232 pub(super) fn shutdown(self) {
233 if !self.state().transition_to_shutdown() {
234 // The task is concurrently running. No further work needed.
235 self.drop_reference();
236 return;
237 }
238
239 // By transitioning the lifecycle to `Running`, we have permission to
240 // drop the future.
241 cancel_task(self.core());
242 self.complete();
243 }
244
245 pub(super) fn dealloc(self) {
246 // Release the join waker, if there is one.
247 self.trailer().waker.with_mut(drop);
248
249 // Check causality
250 self.core().stage.with_mut(drop);
251
252 // Safety: The caller of this method just transitioned our ref-count to
253 // zero, so it is our responsibility to release the allocation.
254 //
255 // We don't hold any references into the allocation at this point, but
256 // it is possible for another thread to still hold a `&State` into the
257 // allocation if that other thread has decremented its last ref-count,
258 // but has not yet returned from the relevant method on `State`.
259 //
260 // However, the `State` type consists of just an `AtomicUsize`, and an
261 // `AtomicUsize` wraps the entirety of its contents in an `UnsafeCell`.
262 // As explained in the documentation for `UnsafeCell`, such references
263 // are allowed to be dangling after their last use, even if the
264 // reference has not yet gone out of scope.
265 unsafe {
266 drop(Box::from_raw(self.cell.as_ptr()));
267 }
268 }
269
270 // ===== join handle =====
271
272 /// Read the task output into `dst`.
273 pub(super) fn try_read_output(self, dst: &mut Poll<super::Result<T::Output>>, waker: &Waker) {
274 if can_read_output(self.header(), self.trailer(), waker) {
275 *dst = Poll::Ready(self.core().take_output());
276 }
277 }
278
279 pub(super) fn drop_join_handle_slow(self) {
280 // Try to unset `JOIN_INTEREST`. This must be done as a first step in
281 // case the task concurrently completed.
282 if self.state().unset_join_interested().is_err() {
283 // It is our responsibility to drop the output. This is critical as
284 // the task output may not be `Send` and as such must remain with
285 // the scheduler or `JoinHandle`. i.e. if the output remains in the
286 // task structure until the task is deallocated, it may be dropped
287 // by a Waker on any arbitrary thread.
288 //
289 // Panics are delivered to the user via the `JoinHandle`. Given that
290 // they are dropping the `JoinHandle`, we assume they are not
291 // interested in the panic and swallow it.
292 let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| {
293 self.core().drop_future_or_output();
294 }));
295 }
296
297 // Drop the `JoinHandle` reference, possibly deallocating the task
298 self.drop_reference();
299 }
300
301 // ====== internal ======
302
303 /// Completes the task. This method assumes that the state is RUNNING.
304 fn complete(self) {
305 // The future has completed and its output has been written to the task
306 // stage. We transition from running to complete.
307
308 let snapshot = self.state().transition_to_complete();
309
310 // We catch panics here in case dropping the future or waking the
311 // JoinHandle panics.
312 let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| {
313 if !snapshot.is_join_interested() {
314 // The `JoinHandle` is not interested in the output of
315 // this task. It is our responsibility to drop the
316 // output.
317 self.core().drop_future_or_output();
318 } else if snapshot.is_join_waker_set() {
319 // Notify the waker. Reading the waker field is safe per rule 4
320 // in task/mod.rs, since the JOIN_WAKER bit is set and the call
321 // to transition_to_complete() above set the COMPLETE bit.
322 self.trailer().wake_join();
323 }
324 }));
325
326 // The task has completed execution and will no longer be scheduled.
327 let num_release = self.release();
328
329 if self.state().transition_to_terminal(num_release) {
330 self.dealloc();
331 }
332 }
333
334 /// Releases the task from the scheduler. Returns the number of ref-counts
335 /// that should be decremented.
336 fn release(&self) -> usize {
337 // We don't actually increment the ref-count here, but the new task is
338 // never destroyed, so that's ok.
339 let me = ManuallyDrop::new(self.get_new_task());
340
341 if let Some(task) = self.core().scheduler.release(&me) {
342 mem::forget(task);
343 2
344 } else {
345 1
346 }
347 }
348
349 /// Creates a new task that holds its own ref-count.
350 ///
351 /// # Safety
352 ///
353 /// Any use of `self` after this call must ensure that a ref-count to the
354 /// task holds the task alive until after the use of `self`. Passing the
355 /// returned Task to any method on `self` is unsound if dropping the Task
356 /// could drop `self` before the call on `self` returned.
357 fn get_new_task(&self) -> Task<S> {
358 // safety: The header is at the beginning of the cell, so this cast is
359 // safe.
360 unsafe { Task::from_raw(self.cell.cast()) }
361 }
362}
363
364fn can_read_output(header: &Header, trailer: &Trailer, waker: &Waker) -> bool {
365 // Load a snapshot of the current task state
366 let snapshot = header.state.load();
367
368 debug_assert!(snapshot.is_join_interested());
369
370 if !snapshot.is_complete() {
371 // If the task is not complete, try storing the provided waker in the
372 // task's waker field.
373
374 let res = if snapshot.is_join_waker_set() {
375 // If JOIN_WAKER is set, then JoinHandle has previously stored a
376 // waker in the waker field per step (iii) of rule 5 in task/mod.rs.
377
378 // Optimization: if the stored waker and the provided waker wake the
379 // same task, then return without touching the waker field. (Reading
380 // the waker field below is safe per rule 3 in task/mod.rs.)
381 if unsafe { trailer.will_wake(waker) } {
382 return false;
383 }
384
385 // Otherwise swap the stored waker with the provided waker by
386 // following the rule 5 in task/mod.rs.
387 header
388 .state
389 .unset_waker()
390 .and_then(|snapshot| set_join_waker(header, trailer, waker.clone(), snapshot))
391 } else {
392 // If JOIN_WAKER is unset, then JoinHandle has mutable access to the
393 // waker field per rule 2 in task/mod.rs; therefore, skip step (i)
394 // of rule 5 and try to store the provided waker in the waker field.
395 set_join_waker(header, trailer, waker.clone(), snapshot)
396 };
397
398 match res {
399 Ok(_) => return false,
400 Err(snapshot) => {
401 assert!(snapshot.is_complete());
402 }
403 }
404 }
405 true
406}
407
408fn set_join_waker(
409 header: &Header,
410 trailer: &Trailer,
411 waker: Waker,
412 snapshot: Snapshot,
413) -> Result<Snapshot, Snapshot> {
414 assert!(snapshot.is_join_interested());
415 assert!(!snapshot.is_join_waker_set());
416
417 // Safety: Only the `JoinHandle` may set the `waker` field. When
418 // `JOIN_INTEREST` is **not** set, nothing else will touch the field.
419 unsafe {
420 trailer.set_waker(Some(waker));
421 }
422
423 // Update the `JoinWaker` state accordingly
424 let res: Result = header.state.set_join_waker();
425
426 // If the state could not be updated, then clear the join waker
427 if res.is_err() {
428 unsafe {
429 trailer.set_waker(None);
430 }
431 }
432
433 res
434}
435
436enum PollFuture {
437 Complete,
438 Notified,
439 Done,
440 Dealloc,
441}
442
443/// Cancels the task and store the appropriate error in the stage field.
444fn cancel_task<T: Future, S: Schedule>(core: &Core<T, S>) {
445 // Drop the future from a panic guard.
446 let res: Result<(), Box> = panic::catch_unwind(panic::AssertUnwindSafe(|| {
447 core.drop_future_or_output();
448 }));
449
450 match res {
451 Ok(()) => {
452 core.store_output(Err(JoinError::cancelled(core.task_id)));
453 }
454 Err(panic: Box) => {
455 core.store_output(Err(JoinError::panic(core.task_id, err:panic)));
456 }
457 }
458}
459
460/// Polls the future. If the future completes, the output is written to the
461/// stage field.
462fn poll_future<T: Future, S: Schedule>(core: &Core<T, S>, cx: Context<'_>) -> Poll<()> {
463 // Poll the future.
464 let output = panic::catch_unwind(panic::AssertUnwindSafe(|| {
465 struct Guard<'a, T: Future, S: Schedule> {
466 core: &'a Core<T, S>,
467 }
468 impl<'a, T: Future, S: Schedule> Drop for Guard<'a, T, S> {
469 fn drop(&mut self) {
470 // If the future panics on poll, we drop it inside the panic
471 // guard.
472 self.core.drop_future_or_output();
473 }
474 }
475 let guard = Guard { core };
476 let res = guard.core.poll(cx);
477 mem::forget(guard);
478 res
479 }));
480
481 // Prepare output for being placed in the core stage.
482 let output = match output {
483 Ok(Poll::Pending) => return Poll::Pending,
484 Ok(Poll::Ready(output)) => Ok(output),
485 Err(panic) => {
486 core.scheduler.unhandled_panic();
487 Err(JoinError::panic(core.task_id, panic))
488 }
489 };
490
491 // Catch and ignore panics if the future panics on drop.
492 let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {
493 core.store_output(output);
494 }));
495
496 if res.is_err() {
497 core.scheduler.unhandled_panic();
498 }
499
500 Poll::Ready(())
501}
502