1use super::{Pop, Synced};
2
3use crate::loom::sync::atomic::AtomicUsize;
4use crate::runtime::task;
5
6use std::marker::PhantomData;
7use std::sync::atomic::Ordering::{Acquire, Release};
8
9pub(crate) struct Shared<T: 'static> {
10 /// Number of pending tasks in the queue. This helps prevent unnecessary
11 /// locking in the hot path.
12 pub(super) len: AtomicUsize,
13
14 _p: PhantomData<T>,
15}
16
17unsafe impl<T> Send for Shared<T> {}
18unsafe impl<T> Sync for Shared<T> {}
19
20impl<T: 'static> Shared<T> {
21 pub(crate) fn new() -> (Shared<T>, Synced) {
22 let inject = Shared {
23 len: AtomicUsize::new(0),
24 _p: PhantomData,
25 };
26
27 let synced = Synced {
28 is_closed: false,
29 head: None,
30 tail: None,
31 };
32
33 (inject, synced)
34 }
35
36 pub(crate) fn is_empty(&self) -> bool {
37 self.len() == 0
38 }
39
40 // Kind of annoying to have to include the cfg here
41 #[cfg(any(tokio_taskdump, all(feature = "rt-multi-thread", not(tokio_wasi))))]
42 pub(crate) fn is_closed(&self, synced: &Synced) -> bool {
43 synced.is_closed
44 }
45
46 /// Closes the injection queue, returns `true` if the queue is open when the
47 /// transition is made.
48 pub(crate) fn close(&self, synced: &mut Synced) -> bool {
49 if synced.is_closed {
50 return false;
51 }
52
53 synced.is_closed = true;
54 true
55 }
56
57 pub(crate) fn len(&self) -> usize {
58 self.len.load(Acquire)
59 }
60
61 /// Pushes a value into the queue.
62 ///
63 /// This does nothing if the queue is closed.
64 ///
65 /// # Safety
66 ///
67 /// Must be called with the same `Synced` instance returned by `Inject::new`
68 pub(crate) unsafe fn push(&self, synced: &mut Synced, task: task::Notified<T>) {
69 if synced.is_closed {
70 return;
71 }
72
73 // safety: only mutated with the lock held
74 let len = self.len.unsync_load();
75 let task = task.into_raw();
76
77 // The next pointer should already be null
78 debug_assert!(unsafe { task.get_queue_next().is_none() });
79
80 if let Some(tail) = synced.tail {
81 // safety: Holding the Notified for a task guarantees exclusive
82 // access to the `queue_next` field.
83 unsafe { tail.set_queue_next(Some(task)) };
84 } else {
85 synced.head = Some(task);
86 }
87
88 synced.tail = Some(task);
89 self.len.store(len + 1, Release);
90 }
91
92 /// Pop a value from the queue.
93 ///
94 /// # Safety
95 ///
96 /// Must be called with the same `Synced` instance returned by `Inject::new`
97 pub(crate) unsafe fn pop(&self, synced: &mut Synced) -> Option<task::Notified<T>> {
98 self.pop_n(synced, 1).next()
99 }
100
101 /// Pop `n` values from the queue
102 ///
103 /// # Safety
104 ///
105 /// Must be called with the same `Synced` instance returned by `Inject::new`
106 pub(crate) unsafe fn pop_n<'a>(&'a self, synced: &'a mut Synced, n: usize) -> Pop<'a, T> {
107 use std::cmp;
108
109 // safety: All updates to the len atomic are guarded by the mutex. As
110 // such, a non-atomic load followed by a store is safe.
111 let len = self.len.unsync_load();
112 let n = cmp::min(n, len);
113
114 // Decrement the count.
115 self.len.store(len - n, Release);
116
117 Pop::new(n, synced)
118 }
119}
120