1 | use crate::task::AtomicWaker; |
2 | use alloc::sync::Arc; |
3 | use core::cell::UnsafeCell; |
4 | use core::ptr; |
5 | use core::sync::atomic::AtomicPtr; |
6 | use core::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release}; |
7 | |
8 | use super::abort::abort; |
9 | use super::task::Task; |
10 | |
11 | pub(super) enum Dequeue<Fut> { |
12 | Data(*const Task<Fut>), |
13 | Empty, |
14 | Inconsistent, |
15 | } |
16 | |
17 | pub(super) struct ReadyToRunQueue<Fut> { |
18 | // The waker of the task using `FuturesUnordered`. |
19 | pub(super) waker: AtomicWaker, |
20 | |
21 | // Head/tail of the readiness queue |
22 | pub(super) head: AtomicPtr<Task<Fut>>, |
23 | pub(super) tail: UnsafeCell<*const Task<Fut>>, |
24 | pub(super) stub: Arc<Task<Fut>>, |
25 | } |
26 | |
27 | /// An MPSC queue into which the tasks containing the futures are inserted |
28 | /// whenever the future inside is scheduled for polling. |
29 | impl<Fut> ReadyToRunQueue<Fut> { |
30 | /// The enqueue function from the 1024cores intrusive MPSC queue algorithm. |
31 | pub(super) fn enqueue(&self, task: *const Task<Fut>) { |
32 | unsafe { |
33 | debug_assert!((*task).queued.load(Relaxed)); |
34 | |
35 | // This action does not require any coordination |
36 | (*task).next_ready_to_run.store(ptr::null_mut(), Relaxed); |
37 | |
38 | // Note that these atomic orderings come from 1024cores |
39 | let task = task as *mut _; |
40 | let prev = self.head.swap(task, AcqRel); |
41 | (*prev).next_ready_to_run.store(task, Release); |
42 | } |
43 | } |
44 | |
45 | /// The dequeue function from the 1024cores intrusive MPSC queue algorithm |
46 | /// |
47 | /// Note that this is unsafe as it required mutual exclusion (only one |
48 | /// thread can call this) to be guaranteed elsewhere. |
49 | pub(super) unsafe fn dequeue(&self) -> Dequeue<Fut> { |
50 | let mut tail = *self.tail.get(); |
51 | let mut next = (*tail).next_ready_to_run.load(Acquire); |
52 | |
53 | if tail == self.stub() { |
54 | if next.is_null() { |
55 | return Dequeue::Empty; |
56 | } |
57 | |
58 | *self.tail.get() = next; |
59 | tail = next; |
60 | next = (*next).next_ready_to_run.load(Acquire); |
61 | } |
62 | |
63 | if !next.is_null() { |
64 | *self.tail.get() = next; |
65 | debug_assert!(tail != self.stub()); |
66 | return Dequeue::Data(tail); |
67 | } |
68 | |
69 | if self.head.load(Acquire) as *const _ != tail { |
70 | return Dequeue::Inconsistent; |
71 | } |
72 | |
73 | self.enqueue(self.stub()); |
74 | |
75 | next = (*tail).next_ready_to_run.load(Acquire); |
76 | |
77 | if !next.is_null() { |
78 | *self.tail.get() = next; |
79 | return Dequeue::Data(tail); |
80 | } |
81 | |
82 | Dequeue::Inconsistent |
83 | } |
84 | |
85 | pub(super) fn stub(&self) -> *const Task<Fut> { |
86 | Arc::as_ptr(&self.stub) |
87 | } |
88 | } |
89 | |
90 | impl<Fut> Drop for ReadyToRunQueue<Fut> { |
91 | fn drop(&mut self) { |
92 | // Once we're in the destructor for `Inner<Fut>` we need to clear out |
93 | // the ready to run queue of tasks if there's anything left in there. |
94 | // |
95 | // Note that each task has a strong reference count associated with it |
96 | // which is owned by the ready to run queue. All tasks should have had |
97 | // their futures dropped already by the `FuturesUnordered` destructor |
98 | // above, so we're just pulling out tasks and dropping their refcounts. |
99 | unsafe { |
100 | loop { |
101 | match self.dequeue() { |
102 | Dequeue::Empty => break, |
103 | Dequeue::Inconsistent => abort("inconsistent in drop" ), |
104 | Dequeue::Data(ptr: *const Task) => drop(Arc::from_raw(ptr)), |
105 | } |
106 | } |
107 | } |
108 | } |
109 | } |
110 | |