1 | use crate::task::AtomicWaker; |
2 | use alloc::sync::Arc; |
3 | use core::cell::UnsafeCell; |
4 | use core::ptr; |
5 | use core::sync::atomic::AtomicPtr; |
6 | use core::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release}; |
7 | |
8 | use super::abort::abort; |
9 | use super::task::Task; |
10 | |
/// Outcome of a single `ReadyToRunQueue::dequeue` attempt.
pub(super) enum Dequeue<Fut> {
    /// A task was unlinked from the queue; the payload is the raw pointer to
    /// that task.
    Data(*const Task<Fut>),
    /// The consumer cursor sits on the stub node and the stub has no
    /// successor, i.e. there is nothing to dequeue.
    Empty,
    /// No successor link is visible for the current node even though the
    /// queue is not known to be empty (e.g. `head` has moved on but the
    /// matching `next_ready_to_run` store in `enqueue` is not yet visible).
    /// The caller cannot make progress on this attempt.
    Inconsistent,
}
16 | |
/// State of the intrusive queue of tasks that are ready to be polled.
///
/// Tasks are linked through their `next_ready_to_run` field; `enqueue` pushes
/// at `head` and `dequeue` pops from `tail`.
pub(super) struct ReadyToRunQueue<Fut> {
    // The waker of the task using `FuturesUnordered`.
    pub(super) waker: AtomicWaker,

    // Head/tail of the readiness queue.
    //
    // `head` is the producer end: `enqueue` atomically swaps each new task
    // into it.
    pub(super) head: AtomicPtr<Task<Fut>>,
    // `tail` is the consumer cursor. It is read and written by `dequeue`
    // through the `UnsafeCell` without any atomic operation, which is why
    // `dequeue` requires mutual exclusion from its caller.
    pub(super) tail: UnsafeCell<*const Task<Fut>>,
    // Sentinel node. `dequeue` steps over it when the cursor lands on it and
    // re-enqueues it before handing out the last remaining task.
    pub(super) stub: Arc<Task<Fut>>,
}
26 | |
/// An MPSC queue into which the tasks containing the futures are inserted
/// whenever the future inside is scheduled for polling.
impl<Fut> ReadyToRunQueue<Fut> {
    // FIXME: this takes raw pointer without safety conditions.
    //
    // NOTE(review): although `enqueue` is not marked `unsafe`, it
    // dereferences `task` unconditionally, so every caller must pass a
    // pointer to a live `Task<Fut>`.

    /// The enqueue function from the 1024cores intrusive MPSC queue algorithm.
    ///
    /// Links `task` in as the new `head` of the queue. In debug builds this
    /// asserts that the task's `queued` flag is already set.
    pub(super) fn enqueue(&self, task: *const Task<Fut>) {
        unsafe {
            debug_assert!((*task).queued.load(Relaxed));

            // This action does not require any coordination: the task is not
            // reachable through the queue until the `swap` below.
            (*task).next_ready_to_run.store(ptr::null_mut(), Relaxed);

            // Note that these atomic orderings come from 1024cores
            let task = task as *mut _;
            // Step 1: become the new head and learn who the previous head was.
            let prev = self.head.swap(task, AcqRel);
            // Step 2: publish the link from the previous head to this task.
            // Between step 1 and step 2 the consumer can see a `head` that is
            // not yet reachable from `tail`; `dequeue` reports that window as
            // `Dequeue::Inconsistent`.
            (*prev).next_ready_to_run.store(task, Release);
        }
    }

    /// The dequeue function from the 1024cores intrusive MPSC queue algorithm
    ///
    /// Returns `Dequeue::Data` with the unlinked task, `Dequeue::Empty` when
    /// only the stub is present, or `Dequeue::Inconsistent` when a successor
    /// link is not yet visible.
    ///
    /// # Safety
    ///
    /// Note that this is unsafe as it requires mutual exclusion (only one
    /// thread can call this) to be guaranteed elsewhere: `tail` is accessed
    /// through an `UnsafeCell` with no synchronization of its own.
    pub(super) unsafe fn dequeue(&self) -> Dequeue<Fut> {
        unsafe {
            // Start at the consumer cursor and peek at its successor.
            let mut tail = *self.tail.get();
            let mut next = (*tail).next_ready_to_run.load(Acquire);

            // The cursor is on the stub sentinel, which is never handed out.
            if tail == self.stub() {
                // Stub with no successor: nothing has been enqueued.
                if next.is_null() {
                    return Dequeue::Empty;
                }

                // Step past the stub to the first real task.
                *self.tail.get() = next;
                tail = next;
                next = (*next).next_ready_to_run.load(Acquire);
            }

            // `tail` has a visible successor, so it can be unlinked: advance
            // the cursor and hand `tail` to the caller.
            if !next.is_null() {
                *self.tail.get() = next;
                debug_assert!(tail != self.stub());
                return Dequeue::Data(tail);
            }

            // No successor is visible. If `head` is not `tail`, a producer
            // has swapped `head` but its link store is not visible yet.
            if self.head.load(Acquire) as *const _ != tail {
                return Dequeue::Inconsistent;
            }

            // `tail` looked like the last node. Push the stub behind it so
            // that `tail` gains a successor and can be unlinked.
            self.enqueue(self.stub());

            next = (*tail).next_ready_to_run.load(Acquire);

            if !next.is_null() {
                *self.tail.get() = next;
                return Dequeue::Data(tail);
            }

            // A producer slipped in between the `head` check and the stub
            // enqueue and has not published its link from `tail` yet.
            Dequeue::Inconsistent
        }
    }

    /// Returns a raw pointer to the stub (sentinel) task.
    ///
    /// Uses `Arc::as_ptr`, so the stub's reference count is left untouched.
    pub(super) fn stub(&self) -> *const Task<Fut> {
        Arc::as_ptr(&self.stub)
    }
}
93 | |
94 | impl<Fut> Drop for ReadyToRunQueue<Fut> { |
95 | fn drop(&mut self) { |
96 | // Once we're in the destructor for `Inner<Fut>` we need to clear out |
97 | // the ready to run queue of tasks if there's anything left in there. |
98 | // |
99 | // Note that each task has a strong reference count associated with it |
100 | // which is owned by the ready to run queue. All tasks should have had |
101 | // their futures dropped already by the `FuturesUnordered` destructor |
102 | // above, so we're just pulling out tasks and dropping their refcounts. |
103 | unsafe { |
104 | loop { |
105 | match self.dequeue() { |
106 | Dequeue::Empty => break, |
107 | Dequeue::Inconsistent => abort("inconsistent in drop" ), |
108 | Dequeue::Data(ptr: *const Task) => drop(Arc::from_raw(ptr)), |
109 | } |
110 | } |
111 | } |
112 | } |
113 | } |
114 | |