use crate::task::AtomicWaker;
use alloc::sync::Arc;
use core::cell::UnsafeCell;
use core::ptr;
use core::sync::atomic::AtomicPtr;
use core::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};

use super::abort::abort;
use super::task::Task;

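/// The result of a single `dequeue` attempt: a dequeued task, an empty queue,
/// or a transient inconsistency observed while a producer is mid-enqueue.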
pub(super) enum Dequeue<Fut> {
    Data(*const Task<Fut>),
    Empty,
    Inconsistent,
}

/// An MPSC queue into which the tasks containing the futures are inserted
/// whenever the future inside is scheduled for polling.
pub(super) struct ReadyToRunQueue<Fut> {
    // The waker of the task using `FuturesUnordered`.
    pub(super) waker: AtomicWaker,

    // Head/tail of the readiness queue
    pub(super) head: AtomicPtr<Task<Fut>>,
    pub(super) tail: UnsafeCell<*const Task<Fut>>,
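    // Stub node from the 1024cores algorithm; it keeps the list non-empty so
    // producers always have a predecessor node to link their entry onto.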
    pub(super) stub: Arc<Task<Fut>>,
}

impl<Fut> ReadyToRunQueue<Fut> {
    // FIXME: this takes raw pointer without safety conditions.

    /// The enqueue function from the 1024cores intrusive MPSC queue algorithm.
    pub(super) fn enqueue(&self, task: *const Task<Fut>) {
        unsafe {
            debug_assert!((*task).queued.load(Relaxed));

            // This action does not require any coordination
            (*task).next_ready_to_run.store(ptr::null_mut(), Relaxed);

            // Note that these atomic orderings come from 1024cores
            let task = task as *mut _;
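            // Publishing happens in two steps: swap the node in as the new
            // head, then link the previous head to it. Between these two
            // stores a consumer can observe the `Inconsistent` state.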
            let prev = self.head.swap(task, AcqRel);
            (*prev).next_ready_to_run.store(task, Release);
        }
    }

    /// The dequeue function from the 1024cores intrusive MPSC queue algorithm.
    ///
    /// Note that this is unsafe as it requires mutual exclusion (only one
    /// thread can call this) to be guaranteed elsewhere.
    pub(super) unsafe fn dequeue(&self) -> Dequeue<Fut> {
        unsafe {
            let mut tail = *self.tail.get();
            let mut next = (*tail).next_ready_to_run.load(Acquire);

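            // The consumer-side tail may be parked on the stub node; skip past
            // it to the first real task, or report the queue as empty.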
            if tail == self.stub() {
                if next.is_null() {
                    return Dequeue::Empty;
                }

                *self.tail.get() = next;
                tail = next;
                next = (*next).next_ready_to_run.load(Acquire);
            }

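            // The tail node has a successor, so hand the tail out and advance
            // the tail to that successor.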
            if !next.is_null() {
                *self.tail.get() = next;
                debug_assert!(tail != self.stub());
                return Dequeue::Data(tail);
            }

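            // `tail` has no successor. If it is not also the head, a producer
            // is mid-enqueue (head swapped in, link not yet stored), so report
            // the inconsistency to the caller rather than spinning here.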
            if self.head.load(Acquire) as *const _ != tail {
                return Dequeue::Inconsistent;
            }

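            // The queue has drained down to its last real node. Re-enqueue the
            // stub so that node gains a successor and can be handed out.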
            self.enqueue(self.stub());

            next = (*tail).next_ready_to_run.load(Acquire);

            if !next.is_null() {
                *self.tail.get() = next;
                return Dequeue::Data(tail);
            }

            Dequeue::Inconsistent
        }
    }

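    /// Returns the raw pointer to the stub node shared by this queue.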
    pub(super) fn stub(&self) -> *const Task<Fut> {
        Arc::as_ptr(&self.stub)
    }
}

impl<Fut> Drop for ReadyToRunQueue<Fut> {
    fn drop(&mut self) {
        // Once we're in the destructor for `ReadyToRunQueue<Fut>` we need to
        // clear out the ready to run queue of tasks if there's anything left
        // in there.
        //
        // Note that each task has a strong reference count associated with it
        // which is owned by the ready to run queue. All tasks should have had
        // their futures dropped already by the `FuturesUnordered` destructor
        // above, so we're just pulling out tasks and dropping their refcounts.
        unsafe {
            loop {
                match self.dequeue() {
                    Dequeue::Empty => break,
                    Dequeue::Inconsistent => abort("inconsistent in drop"),
                    Dequeue::Data(ptr) => drop(Arc::from_raw(ptr)),
                }
            }
        }
    }
}