1use crate::future::Future;
2use crate::runtime::task::core::{Cell, Core, Header, Trailer};
3use crate::runtime::task::state::{Snapshot, State};
4use crate::runtime::task::waker::waker_ref;
5use crate::runtime::task::{Id, JoinError, Notified, RawTask, Schedule, Task};
6
7use std::any::Any;
8use std::mem;
9use std::mem::ManuallyDrop;
10use std::panic;
11use std::ptr::NonNull;
12use std::task::{Context, Poll, Waker};
13
14/// Typed raw task handle.
15pub(super) struct Harness<T: Future, S: 'static> {
16 cell: NonNull<Cell<T, S>>,
17}
18
19impl<T, S> Harness<T, S>
20where
21 T: Future,
22 S: 'static,
23{
24 pub(super) unsafe fn from_raw(ptr: NonNull<Header>) -> Harness<T, S> {
25 Harness {
26 cell: ptr.cast::<Cell<T, S>>(),
27 }
28 }
29
30 fn header_ptr(&self) -> NonNull<Header> {
31 self.cell.cast()
32 }
33
34 fn header(&self) -> &Header {
35 unsafe { &*self.header_ptr().as_ptr() }
36 }
37
38 fn state(&self) -> &State {
39 &self.header().state
40 }
41
42 fn trailer(&self) -> &Trailer {
43 unsafe { &self.cell.as_ref().trailer }
44 }
45
46 fn core(&self) -> &Core<T, S> {
47 unsafe { &self.cell.as_ref().core }
48 }
49}
50
51/// Task operations that can be implemented without being generic over the
52/// scheduler or task. Only one version of these methods should exist in the
53/// final binary.
54impl RawTask {
55 pub(super) fn drop_reference(self) {
56 if self.state().ref_dec() {
57 self.dealloc();
58 }
59 }
60
61 /// This call consumes a ref-count and notifies the task. This will create a
62 /// new Notified and submit it if necessary.
63 ///
64 /// The caller does not need to hold a ref-count besides the one that was
65 /// passed to this call.
66 pub(super) fn wake_by_val(&self) {
67 use super::state::TransitionToNotifiedByVal;
68
69 match self.state().transition_to_notified_by_val() {
70 TransitionToNotifiedByVal::Submit => {
71 // The caller has given us a ref-count, and the transition has
72 // created a new ref-count, so we now hold two. We turn the new
73 // ref-count Notified and pass it to the call to `schedule`.
74 //
75 // The old ref-count is retained for now to ensure that the task
76 // is not dropped during the call to `schedule` if the call
77 // drops the task it was given.
78 self.schedule();
79
80 // Now that we have completed the call to schedule, we can
81 // release our ref-count.
82 self.drop_reference();
83 }
84 TransitionToNotifiedByVal::Dealloc => {
85 self.dealloc();
86 }
87 TransitionToNotifiedByVal::DoNothing => {}
88 }
89 }
90
91 /// This call notifies the task. It will not consume any ref-counts, but the
92 /// caller should hold a ref-count. This will create a new Notified and
93 /// submit it if necessary.
94 pub(super) fn wake_by_ref(&self) {
95 use super::state::TransitionToNotifiedByRef;
96
97 match self.state().transition_to_notified_by_ref() {
98 TransitionToNotifiedByRef::Submit => {
99 // The transition above incremented the ref-count for a new task
100 // and the caller also holds a ref-count. The caller's ref-count
101 // ensures that the task is not destroyed even if the new task
102 // is dropped before `schedule` returns.
103 self.schedule();
104 }
105 TransitionToNotifiedByRef::DoNothing => {}
106 }
107 }
108
109 /// Remotely aborts the task.
110 ///
111 /// The caller should hold a ref-count, but we do not consume it.
112 ///
113 /// This is similar to `shutdown` except that it asks the runtime to perform
114 /// the shutdown. This is necessary to avoid the shutdown happening in the
115 /// wrong thread for non-Send tasks.
116 pub(super) fn remote_abort(&self) {
117 if self.state().transition_to_notified_and_cancel() {
118 // The transition has created a new ref-count, which we turn into
119 // a Notified and pass to the task.
120 //
121 // Since the caller holds a ref-count, the task cannot be destroyed
122 // before the call to `schedule` returns even if the call drops the
123 // `Notified` internally.
124 self.schedule();
125 }
126 }
127
128 /// Try to set the waker notified when the task is complete. Returns true if
129 /// the task has already completed. If this call returns false, then the
130 /// waker will not be notified.
131 pub(super) fn try_set_join_waker(&self, waker: &Waker) -> bool {
132 can_read_output(self.header(), self.trailer(), waker)
133 }
134}
135
136impl<T, S> Harness<T, S>
137where
138 T: Future,
139 S: Schedule,
140{
141 pub(super) fn drop_reference(self) {
142 if self.state().ref_dec() {
143 self.dealloc();
144 }
145 }
146
147 /// Polls the inner future. A ref-count is consumed.
148 ///
149 /// All necessary state checks and transitions are performed.
150 /// Panics raised while polling the future are handled.
151 pub(super) fn poll(self) {
152 // We pass our ref-count to `poll_inner`.
153 match self.poll_inner() {
154 PollFuture::Notified => {
155 // The `poll_inner` call has given us two ref-counts back.
156 // We give one of them to a new task and call `yield_now`.
157 self.core()
158 .scheduler
159 .yield_now(Notified(self.get_new_task()));
160
161 // The remaining ref-count is now dropped. We kept the extra
162 // ref-count until now to ensure that even if the `yield_now`
163 // call drops the provided task, the task isn't deallocated
164 // before after `yield_now` returns.
165 self.drop_reference();
166 }
167 PollFuture::Complete => {
168 self.complete();
169 }
170 PollFuture::Dealloc => {
171 self.dealloc();
172 }
173 PollFuture::Done => (),
174 }
175 }
176
177 /// Polls the task and cancel it if necessary. This takes ownership of a
178 /// ref-count.
179 ///
180 /// If the return value is Notified, the caller is given ownership of two
181 /// ref-counts.
182 ///
183 /// If the return value is Complete, the caller is given ownership of a
184 /// single ref-count, which should be passed on to `complete`.
185 ///
186 /// If the return value is `Dealloc`, then this call consumed the last
187 /// ref-count and the caller should call `dealloc`.
188 ///
189 /// Otherwise the ref-count is consumed and the caller should not access
190 /// `self` again.
191 fn poll_inner(&self) -> PollFuture {
192 use super::state::{TransitionToIdle, TransitionToRunning};
193
194 match self.state().transition_to_running() {
195 TransitionToRunning::Success => {
196 // Separated to reduce LLVM codegen
197 fn transition_result_to_poll_future(result: TransitionToIdle) -> PollFuture {
198 match result {
199 TransitionToIdle::Ok => PollFuture::Done,
200 TransitionToIdle::OkNotified => PollFuture::Notified,
201 TransitionToIdle::OkDealloc => PollFuture::Dealloc,
202 TransitionToIdle::Cancelled => PollFuture::Complete,
203 }
204 }
205 let header_ptr = self.header_ptr();
206 let waker_ref = waker_ref::<S>(&header_ptr);
207 let cx = Context::from_waker(&waker_ref);
208 let res = poll_future(self.core(), cx);
209
210 if res == Poll::Ready(()) {
211 // The future completed. Move on to complete the task.
212 return PollFuture::Complete;
213 }
214
215 let transition_res = self.state().transition_to_idle();
216 if let TransitionToIdle::Cancelled = transition_res {
217 // The transition to idle failed because the task was
218 // cancelled during the poll.
219 cancel_task(self.core());
220 }
221 transition_result_to_poll_future(transition_res)
222 }
223 TransitionToRunning::Cancelled => {
224 cancel_task(self.core());
225 PollFuture::Complete
226 }
227 TransitionToRunning::Failed => PollFuture::Done,
228 TransitionToRunning::Dealloc => PollFuture::Dealloc,
229 }
230 }
231
232 /// Forcibly shuts down the task.
233 ///
234 /// Attempt to transition to `Running` in order to forcibly shutdown the
235 /// task. If the task is currently running or in a state of completion, then
236 /// there is nothing further to do. When the task completes running, it will
237 /// notice the `CANCELLED` bit and finalize the task.
238 pub(super) fn shutdown(self) {
239 if !self.state().transition_to_shutdown() {
240 // The task is concurrently running. No further work needed.
241 self.drop_reference();
242 return;
243 }
244
245 // By transitioning the lifecycle to `Running`, we have permission to
246 // drop the future.
247 cancel_task(self.core());
248 self.complete();
249 }
250
251 pub(super) fn dealloc(self) {
252 // Release the join waker, if there is one.
253 self.trailer().waker.with_mut(drop);
254
255 // Check causality
256 self.core().stage.with_mut(drop);
257
258 // Safety: The caller of this method just transitioned our ref-count to
259 // zero, so it is our responsibility to release the allocation.
260 //
261 // We don't hold any references into the allocation at this point, but
262 // it is possible for another thread to still hold a `&State` into the
263 // allocation if that other thread has decremented its last ref-count,
264 // but has not yet returned from the relevant method on `State`.
265 //
266 // However, the `State` type consists of just an `AtomicUsize`, and an
267 // `AtomicUsize` wraps the entirety of its contents in an `UnsafeCell`.
268 // As explained in the documentation for `UnsafeCell`, such references
269 // are allowed to be dangling after their last use, even if the
270 // reference has not yet gone out of scope.
271 unsafe {
272 drop(Box::from_raw(self.cell.as_ptr()));
273 }
274 }
275
276 // ===== join handle =====
277
278 /// Read the task output into `dst`.
279 pub(super) fn try_read_output(self, dst: &mut Poll<super::Result<T::Output>>, waker: &Waker) {
280 if can_read_output(self.header(), self.trailer(), waker) {
281 *dst = Poll::Ready(self.core().take_output());
282 }
283 }
284
285 pub(super) fn drop_join_handle_slow(self) {
286 // Try to unset `JOIN_INTEREST`. This must be done as a first step in
287 // case the task concurrently completed.
288 if self.state().unset_join_interested().is_err() {
289 // It is our responsibility to drop the output. This is critical as
290 // the task output may not be `Send` and as such must remain with
291 // the scheduler or `JoinHandle`. i.e. if the output remains in the
292 // task structure until the task is deallocated, it may be dropped
293 // by a Waker on any arbitrary thread.
294 //
295 // Panics are delivered to the user via the `JoinHandle`. Given that
296 // they are dropping the `JoinHandle`, we assume they are not
297 // interested in the panic and swallow it.
298 let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| {
299 self.core().drop_future_or_output();
300 }));
301 }
302
303 // Drop the `JoinHandle` reference, possibly deallocating the task
304 self.drop_reference();
305 }
306
307 // ====== internal ======
308
309 /// Completes the task. This method assumes that the state is RUNNING.
310 fn complete(self) {
311 // The future has completed and its output has been written to the task
312 // stage. We transition from running to complete.
313
314 let snapshot = self.state().transition_to_complete();
315
316 // We catch panics here in case dropping the future or waking the
317 // JoinHandle panics.
318 let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| {
319 if !snapshot.is_join_interested() {
320 // The `JoinHandle` is not interested in the output of
321 // this task. It is our responsibility to drop the
322 // output.
323 self.core().drop_future_or_output();
324 } else if snapshot.is_join_waker_set() {
325 // Notify the waker. Reading the waker field is safe per rule 4
326 // in task/mod.rs, since the JOIN_WAKER bit is set and the call
327 // to transition_to_complete() above set the COMPLETE bit.
328 self.trailer().wake_join();
329 }
330 }));
331
332 // The task has completed execution and will no longer be scheduled.
333 let num_release = self.release();
334
335 if self.state().transition_to_terminal(num_release) {
336 self.dealloc();
337 }
338 }
339
340 /// Releases the task from the scheduler. Returns the number of ref-counts
341 /// that should be decremented.
342 fn release(&self) -> usize {
343 // We don't actually increment the ref-count here, but the new task is
344 // never destroyed, so that's ok.
345 let me = ManuallyDrop::new(self.get_new_task());
346
347 if let Some(task) = self.core().scheduler.release(&me) {
348 mem::forget(task);
349 2
350 } else {
351 1
352 }
353 }
354
355 /// Creates a new task that holds its own ref-count.
356 ///
357 /// # Safety
358 ///
359 /// Any use of `self` after this call must ensure that a ref-count to the
360 /// task holds the task alive until after the use of `self`. Passing the
361 /// returned Task to any method on `self` is unsound if dropping the Task
362 /// could drop `self` before the call on `self` returned.
363 fn get_new_task(&self) -> Task<S> {
364 // safety: The header is at the beginning of the cell, so this cast is
365 // safe.
366 unsafe { Task::from_raw(self.cell.cast()) }
367 }
368}
369
370fn can_read_output(header: &Header, trailer: &Trailer, waker: &Waker) -> bool {
371 // Load a snapshot of the current task state
372 let snapshot = header.state.load();
373
374 debug_assert!(snapshot.is_join_interested());
375
376 if !snapshot.is_complete() {
377 // If the task is not complete, try storing the provided waker in the
378 // task's waker field.
379
380 let res = if snapshot.is_join_waker_set() {
381 // If JOIN_WAKER is set, then JoinHandle has previously stored a
382 // waker in the waker field per step (iii) of rule 5 in task/mod.rs.
383
384 // Optimization: if the stored waker and the provided waker wake the
385 // same task, then return without touching the waker field. (Reading
386 // the waker field below is safe per rule 3 in task/mod.rs.)
387 if unsafe { trailer.will_wake(waker) } {
388 return false;
389 }
390
391 // Otherwise swap the stored waker with the provided waker by
392 // following the rule 5 in task/mod.rs.
393 header
394 .state
395 .unset_waker()
396 .and_then(|snapshot| set_join_waker(header, trailer, waker.clone(), snapshot))
397 } else {
398 // If JOIN_WAKER is unset, then JoinHandle has mutable access to the
399 // waker field per rule 2 in task/mod.rs; therefore, skip step (i)
400 // of rule 5 and try to store the provided waker in the waker field.
401 set_join_waker(header, trailer, waker.clone(), snapshot)
402 };
403
404 match res {
405 Ok(_) => return false,
406 Err(snapshot) => {
407 assert!(snapshot.is_complete());
408 }
409 }
410 }
411 true
412}
413
414fn set_join_waker(
415 header: &Header,
416 trailer: &Trailer,
417 waker: Waker,
418 snapshot: Snapshot,
419) -> Result<Snapshot, Snapshot> {
420 assert!(snapshot.is_join_interested());
421 assert!(!snapshot.is_join_waker_set());
422
423 // Safety: Only the `JoinHandle` may set the `waker` field. When
424 // `JOIN_INTEREST` is **not** set, nothing else will touch the field.
425 unsafe {
426 trailer.set_waker(Some(waker));
427 }
428
429 // Update the `JoinWaker` state accordingly
430 let res: Result = header.state.set_join_waker();
431
432 // If the state could not be updated, then clear the join waker
433 if res.is_err() {
434 unsafe {
435 trailer.set_waker(None);
436 }
437 }
438
439 res
440}
441
/// Outcome of `Harness::poll_inner`, telling `Harness::poll` what to do next.
enum PollFuture {
    /// The task must be completed; the caller owns one ref-count to pass on
    /// to `complete`.
    Complete,
    /// The task was notified while being polled; the caller owns two
    /// ref-counts and must reschedule the task.
    Notified,
    /// Nothing further to do; the ref-count was consumed.
    Done,
    /// The last ref-count was consumed; the caller should call `dealloc`.
    Dealloc,
}
448
449/// Cancels the task and store the appropriate error in the stage field.
450fn cancel_task<T: Future, S: Schedule>(core: &Core<T, S>) {
451 // Drop the future from a panic guard.
452 let res: Result<(), Box> = panic::catch_unwind(panic::AssertUnwindSafe(|| {
453 core.drop_future_or_output();
454 }));
455
456 core.store_output(Err(panic_result_to_join_error(core.task_id, res)));
457}
458
459fn panic_result_to_join_error(
460 task_id: Id,
461 res: Result<(), Box<dyn Any + Send + 'static>>,
462) -> JoinError {
463 match res {
464 Ok(()) => JoinError::cancelled(task_id),
465 Err(panic: Box) => JoinError::panic(task_id, err:panic),
466 }
467}
468
469/// Polls the future. If the future completes, the output is written to the
470/// stage field.
471fn poll_future<T: Future, S: Schedule>(core: &Core<T, S>, cx: Context<'_>) -> Poll<()> {
472 // Poll the future.
473 let output = panic::catch_unwind(panic::AssertUnwindSafe(|| {
474 struct Guard<'a, T: Future, S: Schedule> {
475 core: &'a Core<T, S>,
476 }
477 impl<'a, T: Future, S: Schedule> Drop for Guard<'a, T, S> {
478 fn drop(&mut self) {
479 // If the future panics on poll, we drop it inside the panic
480 // guard.
481 self.core.drop_future_or_output();
482 }
483 }
484 let guard = Guard { core };
485 let res = guard.core.poll(cx);
486 mem::forget(guard);
487 res
488 }));
489
490 // Prepare output for being placed in the core stage.
491 let output = match output {
492 Ok(Poll::Pending) => return Poll::Pending,
493 Ok(Poll::Ready(output)) => Ok(output),
494 Err(panic) => Err(panic_to_error(&core.scheduler, core.task_id, panic)),
495 };
496
497 // Catch and ignore panics if the future panics on drop.
498 let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {
499 core.store_output(output);
500 }));
501
502 if res.is_err() {
503 core.scheduler.unhandled_panic();
504 }
505
506 Poll::Ready(())
507}
508
509#[cold]
510fn panic_to_error<S: Schedule>(
511 scheduler: &S,
512 task_id: Id,
513 panic: Box<dyn Any + Send + 'static>,
514) -> JoinError {
515 scheduler.unhandled_panic();
516 JoinError::panic(task_id, err:panic)
517}
518