1use alloc::alloc::Layout as StdLayout;
2use core::cell::UnsafeCell;
3use core::future::Future;
4use core::mem::{self, ManuallyDrop};
5use core::pin::Pin;
6use core::ptr::NonNull;
7use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
8
9#[cfg(not(feature = "portable-atomic"))]
10use core::sync::atomic::AtomicUsize;
11use core::sync::atomic::Ordering;
12#[cfg(feature = "portable-atomic")]
13use portable_atomic::AtomicUsize;
14
15use crate::header::Header;
16use crate::runnable::{Schedule, ScheduleInfo};
17use crate::state::*;
18use crate::utils::{abort, abort_on_panic, max, Layout};
19use crate::Runnable;
20
21#[cfg(feature = "std")]
22pub(crate) type Panic = alloc::boxed::Box<dyn core::any::Any + Send + 'static>;
23
24#[cfg(not(feature = "std"))]
25pub(crate) type Panic = core::convert::Infallible;
26
27/// The vtable for a task.
28pub(crate) struct TaskVTable {
29 /// Schedules the task.
30 pub(crate) schedule: unsafe fn(*const (), ScheduleInfo),
31
32 /// Drops the future inside the task.
33 pub(crate) drop_future: unsafe fn(*const ()),
34
35 /// Returns a pointer to the output stored after completion.
36 pub(crate) get_output: unsafe fn(*const ()) -> *const (),
37
38 /// Drops the task reference (`Runnable` or `Waker`).
39 pub(crate) drop_ref: unsafe fn(ptr: *const ()),
40
41 /// Destroys the task.
42 pub(crate) destroy: unsafe fn(*const ()),
43
44 /// Runs the task.
45 pub(crate) run: unsafe fn(*const ()) -> bool,
46
47 /// Creates a new waker associated with the task.
48 pub(crate) clone_waker: unsafe fn(ptr: *const ()) -> RawWaker,
49
50 /// The memory layout of the task. This information enables
51 /// debuggers to decode raw task memory blobs. Do not remove
52 /// the field, even if it appears to be unused.
53 #[allow(unused)]
54 pub(crate) layout_info: &'static TaskLayout,
55}
56
57/// Memory layout of a task.
58///
59/// This struct contains the following information:
60///
61/// 1. How to allocate and deallocate the task.
62/// 2. How to access the fields inside the task.
63#[derive(Clone, Copy)]
64pub(crate) struct TaskLayout {
65 /// Memory layout of the whole task.
66 pub(crate) layout: StdLayout,
67
68 /// Offset into the task at which the schedule function is stored.
69 pub(crate) offset_s: usize,
70
71 /// Offset into the task at which the future is stored.
72 pub(crate) offset_f: usize,
73
74 /// Offset into the task at which the output is stored.
75 pub(crate) offset_r: usize,
76}
77
78/// Raw pointers to the fields inside a task.
79pub(crate) struct RawTask<F, T, S, M> {
80 /// The task header.
81 pub(crate) header: *const Header<M>,
82
83 /// The schedule function.
84 pub(crate) schedule: *const S,
85
86 /// The future.
87 pub(crate) future: *mut F,
88
89 /// The output of the future.
90 pub(crate) output: *mut Result<T, Panic>,
91}
92
93impl<F, T, S, M> Copy for RawTask<F, T, S, M> {}
94
95impl<F, T, S, M> Clone for RawTask<F, T, S, M> {
96 fn clone(&self) -> Self {
97 *self
98 }
99}
100
101impl<F, T, S, M> RawTask<F, T, S, M> {
102 const TASK_LAYOUT: TaskLayout = Self::eval_task_layout();
103
104 /// Computes the memory layout for a task.
105 #[inline]
106 const fn eval_task_layout() -> TaskLayout {
107 // Compute the layouts for `Header`, `S`, `F`, and `T`.
108 let layout_header = Layout::new::<Header<M>>();
109 let layout_s = Layout::new::<S>();
110 let layout_f = Layout::new::<F>();
111 let layout_r = Layout::new::<Result<T, Panic>>();
112
113 // Compute the layout for `union { F, T }`.
114 let size_union = max(layout_f.size(), layout_r.size());
115 let align_union = max(layout_f.align(), layout_r.align());
116 let layout_union = Layout::from_size_align(size_union, align_union);
117
118 // Compute the layout for `Header` followed `S` and `union { F, T }`.
119 let layout = layout_header;
120 let (layout, offset_s) = leap_unwrap!(layout.extend(layout_s));
121 let (layout, offset_union) = leap_unwrap!(layout.extend(layout_union));
122 let offset_f = offset_union;
123 let offset_r = offset_union;
124
125 TaskLayout {
126 layout: unsafe { layout.into_std() },
127 offset_s,
128 offset_f,
129 offset_r,
130 }
131 }
132}
133
134impl<F, T, S, M> RawTask<F, T, S, M>
135where
136 F: Future<Output = T>,
137 S: Schedule<M>,
138{
139 const RAW_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
140 Self::clone_waker,
141 Self::wake,
142 Self::wake_by_ref,
143 Self::drop_waker,
144 );
145
146 /// Allocates a task with the given `future` and `schedule` function.
147 ///
148 /// It is assumed that initially only the `Runnable` and the `Task` exist.
149 pub(crate) fn allocate<'a, Gen: FnOnce(&'a M) -> F>(
150 future: Gen,
151 schedule: S,
152 builder: crate::Builder<M>,
153 ) -> NonNull<()>
154 where
155 F: 'a,
156 M: 'a,
157 {
158 // Compute the layout of the task for allocation. Abort if the computation fails.
159 //
160 // n.b. notgull: task_layout now automatically aborts instead of panicking
161 let task_layout = Self::task_layout();
162
163 unsafe {
164 // Allocate enough space for the entire task.
165 let ptr = match NonNull::new(alloc::alloc::alloc(task_layout.layout) as *mut ()) {
166 None => abort(),
167 Some(p) => p,
168 };
169
170 let raw = Self::from_ptr(ptr.as_ptr());
171
172 let crate::Builder {
173 metadata,
174 #[cfg(feature = "std")]
175 propagate_panic,
176 } = builder;
177
178 // Write the header as the first field of the task.
179 (raw.header as *mut Header<M>).write(Header {
180 state: AtomicUsize::new(SCHEDULED | TASK | REFERENCE),
181 awaiter: UnsafeCell::new(None),
182 vtable: &TaskVTable {
183 schedule: Self::schedule,
184 drop_future: Self::drop_future,
185 get_output: Self::get_output,
186 drop_ref: Self::drop_ref,
187 destroy: Self::destroy,
188 run: Self::run,
189 clone_waker: Self::clone_waker,
190 layout_info: &Self::TASK_LAYOUT,
191 },
192 metadata,
193 #[cfg(feature = "std")]
194 propagate_panic,
195 });
196
197 // Write the schedule function as the third field of the task.
198 (raw.schedule as *mut S).write(schedule);
199
200 // Generate the future, now that the metadata has been pinned in place.
201 let future = abort_on_panic(|| future(&(*raw.header).metadata));
202
203 // Write the future as the fourth field of the task.
204 raw.future.write(future);
205
206 ptr
207 }
208 }
209
210 /// Creates a `RawTask` from a raw task pointer.
211 #[inline]
212 pub(crate) fn from_ptr(ptr: *const ()) -> Self {
213 let task_layout = Self::task_layout();
214 let p = ptr as *const u8;
215
216 unsafe {
217 Self {
218 header: p as *const Header<M>,
219 schedule: p.add(task_layout.offset_s) as *const S,
220 future: p.add(task_layout.offset_f) as *mut F,
221 output: p.add(task_layout.offset_r) as *mut Result<T, Panic>,
222 }
223 }
224 }
225
226 /// Returns the layout of the task.
227 #[inline]
228 fn task_layout() -> TaskLayout {
229 Self::TASK_LAYOUT
230 }
231 /// Wakes a waker.
232 unsafe fn wake(ptr: *const ()) {
233 // This is just an optimization. If the schedule function has captured variables, then
234 // we'll do less reference counting if we wake the waker by reference and then drop it.
235 if mem::size_of::<S>() > 0 {
236 Self::wake_by_ref(ptr);
237 Self::drop_waker(ptr);
238 return;
239 }
240
241 let raw = Self::from_ptr(ptr);
242
243 let mut state = (*raw.header).state.load(Ordering::Acquire);
244
245 loop {
246 // If the task is completed or closed, it can't be woken up.
247 if state & (COMPLETED | CLOSED) != 0 {
248 // Drop the waker.
249 Self::drop_waker(ptr);
250 break;
251 }
252
253 // If the task is already scheduled, we just need to synchronize with the thread that
254 // will run the task by "publishing" our current view of the memory.
255 if state & SCHEDULED != 0 {
256 // Update the state without actually modifying it.
257 match (*raw.header).state.compare_exchange_weak(
258 state,
259 state,
260 Ordering::AcqRel,
261 Ordering::Acquire,
262 ) {
263 Ok(_) => {
264 // Drop the waker.
265 Self::drop_waker(ptr);
266 break;
267 }
268 Err(s) => state = s,
269 }
270 } else {
271 // Mark the task as scheduled.
272 match (*raw.header).state.compare_exchange_weak(
273 state,
274 state | SCHEDULED,
275 Ordering::AcqRel,
276 Ordering::Acquire,
277 ) {
278 Ok(_) => {
279 // If the task is not yet scheduled and isn't currently running, now is the
280 // time to schedule it.
281 if state & RUNNING == 0 {
282 // Schedule the task.
283 Self::schedule(ptr, ScheduleInfo::new(false));
284 } else {
285 // Drop the waker.
286 Self::drop_waker(ptr);
287 }
288
289 break;
290 }
291 Err(s) => state = s,
292 }
293 }
294 }
295 }
296
297 /// Wakes a waker by reference.
298 unsafe fn wake_by_ref(ptr: *const ()) {
299 let raw = Self::from_ptr(ptr);
300
301 let mut state = (*raw.header).state.load(Ordering::Acquire);
302
303 loop {
304 // If the task is completed or closed, it can't be woken up.
305 if state & (COMPLETED | CLOSED) != 0 {
306 break;
307 }
308
309 // If the task is already scheduled, we just need to synchronize with the thread that
310 // will run the task by "publishing" our current view of the memory.
311 if state & SCHEDULED != 0 {
312 // Update the state without actually modifying it.
313 match (*raw.header).state.compare_exchange_weak(
314 state,
315 state,
316 Ordering::AcqRel,
317 Ordering::Acquire,
318 ) {
319 Ok(_) => break,
320 Err(s) => state = s,
321 }
322 } else {
323 // If the task is not running, we can schedule right away.
324 let new = if state & RUNNING == 0 {
325 (state | SCHEDULED) + REFERENCE
326 } else {
327 state | SCHEDULED
328 };
329
330 // Mark the task as scheduled.
331 match (*raw.header).state.compare_exchange_weak(
332 state,
333 new,
334 Ordering::AcqRel,
335 Ordering::Acquire,
336 ) {
337 Ok(_) => {
338 // If the task is not running, now is the time to schedule.
339 if state & RUNNING == 0 {
340 // If the reference count overflowed, abort.
341 if state > isize::MAX as usize {
342 abort();
343 }
344
345 // Schedule the task. There is no need to call `Self::schedule(ptr)`
346 // because the schedule function cannot be destroyed while the waker is
347 // still alive.
348 let task = Runnable::from_raw(NonNull::new_unchecked(ptr as *mut ()));
349 (*raw.schedule).schedule(task, ScheduleInfo::new(false));
350 }
351
352 break;
353 }
354 Err(s) => state = s,
355 }
356 }
357 }
358 }
359
360 /// Clones a waker.
361 unsafe fn clone_waker(ptr: *const ()) -> RawWaker {
362 let raw = Self::from_ptr(ptr);
363
364 // Increment the reference count. With any kind of reference-counted data structure,
365 // relaxed ordering is appropriate when incrementing the counter.
366 let state = (*raw.header).state.fetch_add(REFERENCE, Ordering::Relaxed);
367
368 // If the reference count overflowed, abort.
369 if state > isize::MAX as usize {
370 abort();
371 }
372
373 RawWaker::new(ptr, &Self::RAW_WAKER_VTABLE)
374 }
375
376 /// Drops a waker.
377 ///
378 /// This function will decrement the reference count. If it drops down to zero, the associated
379 /// `Task` has been dropped too, and the task has not been completed, then it will get
380 /// scheduled one more time so that its future gets dropped by the executor.
381 #[inline]
382 unsafe fn drop_waker(ptr: *const ()) {
383 let raw = Self::from_ptr(ptr);
384
385 // Decrement the reference count.
386 let new = (*raw.header).state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE;
387
388 // If this was the last reference to the task and the `Task` has been dropped too,
389 // then we need to decide how to destroy the task.
390 if new & !(REFERENCE - 1) == 0 && new & TASK == 0 {
391 if new & (COMPLETED | CLOSED) == 0 {
392 // If the task was not completed nor closed, close it and schedule one more time so
393 // that its future gets dropped by the executor.
394 (*raw.header)
395 .state
396 .store(SCHEDULED | CLOSED | REFERENCE, Ordering::Release);
397 Self::schedule(ptr, ScheduleInfo::new(false));
398 } else {
399 // Otherwise, destroy the task right away.
400 Self::destroy(ptr);
401 }
402 }
403 }
404
405 /// Drops a task reference (`Runnable` or `Waker`).
406 ///
407 /// This function will decrement the reference count. If it drops down to zero and the
408 /// associated `Task` handle has been dropped too, then the task gets destroyed.
409 #[inline]
410 unsafe fn drop_ref(ptr: *const ()) {
411 let raw = Self::from_ptr(ptr);
412
413 // Decrement the reference count.
414 let new = (*raw.header).state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE;
415
416 // If this was the last reference to the task and the `Task` has been dropped too,
417 // then destroy the task.
418 if new & !(REFERENCE - 1) == 0 && new & TASK == 0 {
419 Self::destroy(ptr);
420 }
421 }
422
423 /// Schedules a task for running.
424 ///
425 /// This function doesn't modify the state of the task. It only passes the task reference to
426 /// its schedule function.
427 unsafe fn schedule(ptr: *const (), info: ScheduleInfo) {
428 let raw = Self::from_ptr(ptr);
429
430 // If the schedule function has captured variables, create a temporary waker that prevents
431 // the task from getting deallocated while the function is being invoked.
432 let _waker;
433 if mem::size_of::<S>() > 0 {
434 _waker = Waker::from_raw(Self::clone_waker(ptr));
435 }
436
437 let task = Runnable::from_raw(NonNull::new_unchecked(ptr as *mut ()));
438 (*raw.schedule).schedule(task, info);
439 }
440
441 /// Drops the future inside a task.
442 #[inline]
443 unsafe fn drop_future(ptr: *const ()) {
444 let raw = Self::from_ptr(ptr);
445
446 // We need a safeguard against panics because the destructor can panic.
447 abort_on_panic(|| {
448 raw.future.drop_in_place();
449 })
450 }
451
452 /// Returns a pointer to the output inside a task.
453 unsafe fn get_output(ptr: *const ()) -> *const () {
454 let raw = Self::from_ptr(ptr);
455 raw.output as *const ()
456 }
457
458 /// Cleans up task's resources and deallocates it.
459 ///
460 /// The schedule function will be dropped, and the task will then get deallocated.
461 /// The task must be closed before this function is called.
462 #[inline]
463 unsafe fn destroy(ptr: *const ()) {
464 let raw = Self::from_ptr(ptr);
465 let task_layout = Self::task_layout();
466
467 // We need a safeguard against panics because destructors can panic.
468 abort_on_panic(|| {
469 // Drop the header along with the metadata.
470 (raw.header as *mut Header<M>).drop_in_place();
471
472 // Drop the schedule function.
473 (raw.schedule as *mut S).drop_in_place();
474 });
475
476 // Finally, deallocate the memory reserved by the task.
477 alloc::alloc::dealloc(ptr as *mut u8, task_layout.layout);
478 }
479
480 /// Runs a task.
481 ///
482 /// If polling its future panics, the task will be closed and the panic will be propagated into
483 /// the caller.
484 unsafe fn run(ptr: *const ()) -> bool {
485 let raw = Self::from_ptr(ptr);
486
487 // Create a context from the raw task pointer and the vtable inside the its header.
488 let waker = ManuallyDrop::new(Waker::from_raw(RawWaker::new(ptr, &Self::RAW_WAKER_VTABLE)));
489 let cx = &mut Context::from_waker(&waker);
490
491 let mut state = (*raw.header).state.load(Ordering::Acquire);
492
493 // Update the task's state before polling its future.
494 loop {
495 // If the task has already been closed, drop the task reference and return.
496 if state & CLOSED != 0 {
497 // Drop the future.
498 Self::drop_future(ptr);
499
500 // Mark the task as unscheduled.
501 let state = (*raw.header).state.fetch_and(!SCHEDULED, Ordering::AcqRel);
502
503 // Take the awaiter out.
504 let mut awaiter = None;
505 if state & AWAITER != 0 {
506 awaiter = (*raw.header).take(None);
507 }
508
509 // Drop the task reference.
510 Self::drop_ref(ptr);
511
512 // Notify the awaiter that the future has been dropped.
513 if let Some(w) = awaiter {
514 abort_on_panic(|| w.wake());
515 }
516 return false;
517 }
518
519 // Mark the task as unscheduled and running.
520 match (*raw.header).state.compare_exchange_weak(
521 state,
522 (state & !SCHEDULED) | RUNNING,
523 Ordering::AcqRel,
524 Ordering::Acquire,
525 ) {
526 Ok(_) => {
527 // Update the state because we're continuing with polling the future.
528 state = (state & !SCHEDULED) | RUNNING;
529 break;
530 }
531 Err(s) => state = s,
532 }
533 }
534
535 // Poll the inner future, but surround it with a guard that closes the task in case polling
536 // panics.
537 // If available, we should also try to catch the panic so that it is propagated correctly.
538 let guard = Guard(raw);
539
540 // Panic propagation is not available for no_std.
541 #[cfg(not(feature = "std"))]
542 let poll = <F as Future>::poll(Pin::new_unchecked(&mut *raw.future), cx).map(Ok);
543
544 #[cfg(feature = "std")]
545 let poll = {
546 // Check if we should propagate panics.
547 if (*raw.header).propagate_panic {
548 // Use catch_unwind to catch the panic.
549 match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
550 <F as Future>::poll(Pin::new_unchecked(&mut *raw.future), cx)
551 })) {
552 Ok(Poll::Ready(v)) => Poll::Ready(Ok(v)),
553 Ok(Poll::Pending) => Poll::Pending,
554 Err(e) => Poll::Ready(Err(e)),
555 }
556 } else {
557 <F as Future>::poll(Pin::new_unchecked(&mut *raw.future), cx).map(Ok)
558 }
559 };
560
561 mem::forget(guard);
562
563 match poll {
564 Poll::Ready(out) => {
565 // Replace the future with its output.
566 Self::drop_future(ptr);
567 raw.output.write(out);
568
569 // The task is now completed.
570 loop {
571 // If the `Task` is dropped, we'll need to close it and drop the output.
572 let new = if state & TASK == 0 {
573 (state & !RUNNING & !SCHEDULED) | COMPLETED | CLOSED
574 } else {
575 (state & !RUNNING & !SCHEDULED) | COMPLETED
576 };
577
578 // Mark the task as not running and completed.
579 match (*raw.header).state.compare_exchange_weak(
580 state,
581 new,
582 Ordering::AcqRel,
583 Ordering::Acquire,
584 ) {
585 Ok(_) => {
586 // If the `Task` is dropped or if the task was closed while running,
587 // now it's time to drop the output.
588 if state & TASK == 0 || state & CLOSED != 0 {
589 // Drop the output.
590 abort_on_panic(|| raw.output.drop_in_place());
591 }
592
593 // Take the awaiter out.
594 let mut awaiter = None;
595 if state & AWAITER != 0 {
596 awaiter = (*raw.header).take(None);
597 }
598
599 // Drop the task reference.
600 Self::drop_ref(ptr);
601
602 // Notify the awaiter that the future has been dropped.
603 if let Some(w) = awaiter {
604 abort_on_panic(|| w.wake());
605 }
606 break;
607 }
608 Err(s) => state = s,
609 }
610 }
611 }
612 Poll::Pending => {
613 let mut future_dropped = false;
614
615 // The task is still not completed.
616 loop {
617 // If the task was closed while running, we'll need to unschedule in case it
618 // was woken up and then destroy it.
619 let new = if state & CLOSED != 0 {
620 state & !RUNNING & !SCHEDULED
621 } else {
622 state & !RUNNING
623 };
624
625 if state & CLOSED != 0 && !future_dropped {
626 // The thread that closed the task didn't drop the future because it was
627 // running so now it's our responsibility to do so.
628 Self::drop_future(ptr);
629 future_dropped = true;
630 }
631
632 // Mark the task as not running.
633 match (*raw.header).state.compare_exchange_weak(
634 state,
635 new,
636 Ordering::AcqRel,
637 Ordering::Acquire,
638 ) {
639 Ok(state) => {
640 // If the task was closed while running, we need to notify the awaiter.
641 // If the task was woken up while running, we need to schedule it.
642 // Otherwise, we just drop the task reference.
643 if state & CLOSED != 0 {
644 // Take the awaiter out.
645 let mut awaiter = None;
646 if state & AWAITER != 0 {
647 awaiter = (*raw.header).take(None);
648 }
649
650 // Drop the task reference.
651 Self::drop_ref(ptr);
652
653 // Notify the awaiter that the future has been dropped.
654 if let Some(w) = awaiter {
655 abort_on_panic(|| w.wake());
656 }
657 } else if state & SCHEDULED != 0 {
658 // The thread that woke the task up didn't reschedule it because
659 // it was running so now it's our responsibility to do so.
660 Self::schedule(ptr, ScheduleInfo::new(true));
661 return true;
662 } else {
663 // Drop the task reference.
664 Self::drop_ref(ptr);
665 }
666 break;
667 }
668 Err(s) => state = s,
669 }
670 }
671 }
672 }
673
674 return false;
675
676 /// A guard that closes the task if polling its future panics.
677 struct Guard<F, T, S, M>(RawTask<F, T, S, M>)
678 where
679 F: Future<Output = T>,
680 S: Schedule<M>;
681
682 impl<F, T, S, M> Drop for Guard<F, T, S, M>
683 where
684 F: Future<Output = T>,
685 S: Schedule<M>,
686 {
687 fn drop(&mut self) {
688 let raw = self.0;
689 let ptr = raw.header as *const ();
690
691 unsafe {
692 let mut state = (*raw.header).state.load(Ordering::Acquire);
693
694 loop {
695 // If the task was closed while running, then unschedule it, drop its
696 // future, and drop the task reference.
697 if state & CLOSED != 0 {
698 // The thread that closed the task didn't drop the future because it
699 // was running so now it's our responsibility to do so.
700 RawTask::<F, T, S, M>::drop_future(ptr);
701
702 // Mark the task as not running and not scheduled.
703 (*raw.header)
704 .state
705 .fetch_and(!RUNNING & !SCHEDULED, Ordering::AcqRel);
706
707 // Take the awaiter out.
708 let mut awaiter = None;
709 if state & AWAITER != 0 {
710 awaiter = (*raw.header).take(None);
711 }
712
713 // Drop the task reference.
714 RawTask::<F, T, S, M>::drop_ref(ptr);
715
716 // Notify the awaiter that the future has been dropped.
717 if let Some(w) = awaiter {
718 abort_on_panic(|| w.wake());
719 }
720 break;
721 }
722
723 // Mark the task as not running, not scheduled, and closed.
724 match (*raw.header).state.compare_exchange_weak(
725 state,
726 (state & !RUNNING & !SCHEDULED) | CLOSED,
727 Ordering::AcqRel,
728 Ordering::Acquire,
729 ) {
730 Ok(state) => {
731 // Drop the future because the task is now closed.
732 RawTask::<F, T, S, M>::drop_future(ptr);
733
734 // Take the awaiter out.
735 let mut awaiter = None;
736 if state & AWAITER != 0 {
737 awaiter = (*raw.header).take(None);
738 }
739
740 // Drop the task reference.
741 RawTask::<F, T, S, M>::drop_ref(ptr);
742
743 // Notify the awaiter that the future has been dropped.
744 if let Some(w) = awaiter {
745 abort_on_panic(|| w.wake());
746 }
747 break;
748 }
749 Err(s) => state = s,
750 }
751 }
752 }
753 }
754 }
755 }
756}
757