use super::{Pop, Synced};

use crate::loom::sync::atomic::AtomicUsize;
use crate::runtime::task;

use std::marker::PhantomData;
use std::sync::atomic::Ordering::{Acquire, Release};

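/// Shared state for the injection queue.
///
/// Only the task count lives here; the linked list of queued tasks is stored
/// in `Synced`, which callers access while holding the scheduler lock (see
/// the safety comments on `push` and `pop_n` below).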
pub(crate) struct Shared<T: 'static> {
    /// Number of pending tasks in the queue. This helps prevent unnecessary
    /// locking in the hot path.
    pub(super) len: AtomicUsize,

    _p: PhantomData<T>,
}

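// `Shared` holds no `T` values directly, only the atomic counter and a
// `PhantomData<T>` marker, which is presumably why these manual impls are
// sound for any `T`.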
unsafe impl<T> Send for Shared<T> {}
unsafe impl<T> Sync for Shared<T> {}

impl<T: 'static> Shared<T> {
    pub(crate) fn new() -> (Shared<T>, Synced) {
        let inject = Shared {
            len: AtomicUsize::new(0),
            _p: PhantomData,
        };

        let synced = Synced {
            is_closed: false,
            head: None,
            tail: None,
        };

        (inject, synced)
    }

    pub(crate) fn is_empty(&self) -> bool {
        self.len() == 0
    }

    // Kind of annoying to have to include the cfg here
    #[cfg(any(tokio_taskdump, all(feature = "rt-multi-thread", not(tokio_wasi))))]
    pub(crate) fn is_closed(&self, synced: &Synced) -> bool {
        synced.is_closed
    }

    /// Closes the injection queue. Returns `true` if the queue was open when
    /// the transition was made.
    pub(crate) fn close(&self, synced: &mut Synced) -> bool {
        if synced.is_closed {
            return false;
        }

        synced.is_closed = true;
        true
    }

    pub(crate) fn len(&self) -> usize {
        self.len.load(Acquire)
    }

    /// Pushes a value into the queue.
    ///
    /// This does nothing if the queue is closed.
    ///
    /// # Safety
    ///
    /// Must be called with the same `Synced` instance returned by `Inject::new`
    pub(crate) unsafe fn push(&self, synced: &mut Synced, task: task::Notified<T>) {
        if synced.is_closed {
            return;
        }

        // safety: only mutated with the lock held
        let len = self.len.unsync_load();
        let task = task.into_raw();

        // The next pointer should already be null
        debug_assert!(unsafe { task.get_queue_next().is_none() });

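        // Append the task to the tail of the intrusive list: link it after the
        // current tail, or make it the head if the queue is currently empty.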
        if let Some(tail) = synced.tail {
            // safety: Holding the Notified for a task guarantees exclusive
            // access to the `queue_next` field.
            unsafe { tail.set_queue_next(Some(task)) };
        } else {
            synced.head = Some(task);
        }

        synced.tail = Some(task);
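        // Publish the new length. The `Release` store pairs with the
        // `Acquire` load in `len()`.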
        self.len.store(len + 1, Release);
    }

    /// Pop a value from the queue.
    ///
    /// # Safety
    ///
    /// Must be called with the same `Synced` instance returned by `Inject::new`
    pub(crate) unsafe fn pop(&self, synced: &mut Synced) -> Option<task::Notified<T>> {
        self.pop_n(synced, 1).next()
    }

    /// Pop `n` values from the queue
    ///
    /// # Safety
    ///
    /// Must be called with the same `Synced` instance returned by `Inject::new`
    pub(crate) unsafe fn pop_n<'a>(&'a self, synced: &'a mut Synced, n: usize) -> Pop<'a, T> {
        use std::cmp;

        // safety: All updates to the len atomic are guarded by the mutex. As
        // such, a non-atomic load followed by a store is safe.
        let len = self.len.unsync_load();
        let n = cmp::min(n, len);

        // Decrement the count.
        self.len.store(len - n, Release);

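        // Return an iterator that removes up to `n` tasks from the list; the
        // length has already been updated above to account for them.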
        Pop::new(n, synced)
    }
}