1use super::{Pop, Synced};
2
3use crate::loom::sync::atomic::AtomicUsize;
4use crate::runtime::task;
5
6use std::marker::PhantomData;
7use std::sync::atomic::Ordering::{Acquire, Release};
8
/// The lock-free half of the inject queue.
///
/// `Shared` holds only the atomic task counter; the actual linked-list state
/// (`head`/`tail`/`is_closed`) lives in `Synced` and — per the comments in
/// `push`/`pop_n` — is only mutated with a lock held. Splitting the state this
/// way lets `len()`/`is_empty()` be answered without taking that lock.
pub(crate) struct Shared<T: 'static> {
    /// Number of pending tasks in the queue. This helps prevent unnecessary
    /// locking in the hot path.
    pub(super) len: AtomicUsize,

    // `Shared` is generic over the task's scheduler type but never stores a
    // `T` directly; `PhantomData` records the logical association.
    _p: PhantomData<T>,
}
16
// SAFETY: `Shared<T>` stores only an `AtomicUsize` and `PhantomData<T>` — it
// never owns a `T` — and all list mutation goes through `&mut Synced`, which
// the caller is responsible for synchronizing. It is therefore sound to send
// and share `Shared<T>` across threads regardless of `T`.
unsafe impl<T> Send for Shared<T> {}
unsafe impl<T> Sync for Shared<T> {}
19
20impl<T: 'static> Shared<T> {
21 pub(crate) fn new() -> (Shared<T>, Synced) {
22 let inject = Shared {
23 len: AtomicUsize::new(0),
24 _p: PhantomData,
25 };
26
27 let synced = Synced {
28 is_closed: false,
29 head: None,
30 tail: None,
31 };
32
33 (inject, synced)
34 }
35
36 pub(crate) fn is_empty(&self) -> bool {
37 self.len() == 0
38 }
39
    /// Returns `true` if the queue has been closed (see [`Shared::close`]).
    ///
    /// The flag is stored in `Synced`, so the caller must pass the same
    /// `Synced` instance used with the other queue operations.
    // Kind of annoying to have to include the cfg here
    #[cfg(any(
        tokio_taskdump,
        all(feature = "rt-multi-thread", not(target_os = "wasi"))
    ))]
    pub(crate) fn is_closed(&self, synced: &Synced) -> bool {
        synced.is_closed
    }
48
49 /// Closes the injection queue, returns `true` if the queue is open when the
50 /// transition is made.
51 pub(crate) fn close(&self, synced: &mut Synced) -> bool {
52 if synced.is_closed {
53 return false;
54 }
55
56 synced.is_closed = true;
57 true
58 }
59
    /// Returns the number of tasks currently pending in the queue.
    ///
    /// The `Acquire` load pairs with the `Release` stores in `push` and
    /// `pop_n`, allowing this to be read without holding the lock that guards
    /// `Synced`.
    pub(crate) fn len(&self) -> usize {
        self.len.load(Acquire)
    }
63
    /// Pushes a value into the queue.
    ///
    /// This does nothing if the queue is closed.
    ///
    /// # Safety
    ///
    /// Must be called with the same `Synced` instance returned by `Inject::new`
    pub(crate) unsafe fn push(&self, synced: &mut Synced, task: task::Notified<T>) {
        // Closed queue: `task` is simply dropped when it goes out of scope.
        if synced.is_closed {
            return;
        }

        // safety: only mutated with the lock held
        let len = self.len.unsync_load();
        let task = task.into_raw();

        // The next pointer should already be null
        debug_assert!(unsafe { task.get_queue_next().is_none() });

        // Append to the tail of the intrusive list threaded through the
        // tasks' `queue_next` pointers.
        if let Some(tail) = synced.tail {
            // safety: Holding the Notified for a task guarantees exclusive
            // access to the `queue_next` field.
            unsafe { tail.set_queue_next(Some(task)) };
        } else {
            // The queue was empty; the new task is also the head.
            synced.head = Some(task);
        }

        synced.tail = Some(task);
        // Publish the new count last: the `Release` store pairs with the
        // `Acquire` load in `len()`.
        self.len.store(len + 1, Release);
    }
94
    /// Pop a value from the queue.
    ///
    /// # Safety
    ///
    /// Must be called with the same `Synced` instance returned by `Inject::new`
    pub(crate) unsafe fn pop(&self, synced: &mut Synced) -> Option<task::Notified<T>> {
        // Delegate to the batch version with a batch size of one and take the
        // single (possibly absent) task from the returned iterator.
        self.pop_n(synced, 1).next()
    }
103
    /// Pop `n` values from the queue
    ///
    /// The batch size is clamped to the current queue length, so the returned
    /// [`Pop`] is constructed with `min(n, len)`.
    ///
    /// # Safety
    ///
    /// Must be called with the same `Synced` instance returned by `Inject::new`
    pub(crate) unsafe fn pop_n<'a>(&'a self, synced: &'a mut Synced, n: usize) -> Pop<'a, T> {
        use std::cmp;

        // Callers are expected to request at least one task.
        debug_assert!(n > 0);

        // safety: All updates to the len atomic are guarded by the mutex. As
        // such, a non-atomic load followed by a store is safe.
        let len = self.len.unsync_load();
        let n = cmp::min(n, len);

        // Decrement the count.
        // NOTE(review): the count is decremented eagerly here while the list
        // unlinking is performed by the returned `Pop` — presumably `Pop`
        // guarantees (on iteration or drop) that exactly `n` tasks are
        // removed; confirm against `Pop`'s implementation.
        self.len.store(len - n, Release);

        Pop::new(n, synced)
    }
124}
125