1 | use super::{Pop, Synced}; |
2 | |
3 | use crate::loom::sync::atomic::AtomicUsize; |
4 | use crate::runtime::task; |
5 | |
6 | use std::marker::PhantomData; |
7 | use std::sync::atomic::Ordering::{Acquire, Release}; |
8 | |
9 | pub(crate) struct Shared<T: 'static> { |
10 | /// Number of pending tasks in the queue. This helps prevent unnecessary |
11 | /// locking in the hot path. |
12 | pub(super) len: AtomicUsize, |
13 | |
14 | _p: PhantomData<T>, |
15 | } |
16 | |
// SAFETY: `Shared<T>` contains only an atomic counter and a `PhantomData<T>`
// marker; no value of type `T` is stored in it, so moving a `Shared<T>` to
// another thread never moves a `T`.
unsafe impl<T> Send for Shared<T> {}
// SAFETY: the only data reachable through `&Shared<T>` is the atomic `len`
// counter. The task list itself is kept in `Synced`, which callers must pass
// in explicitly.
// NOTE(review): this also relies on callers upholding the `# Safety`
// contracts of `push`, `pop` and `pop_n` — confirm against call sites.
unsafe impl<T> Sync for Shared<T> {}
19 | |
20 | impl<T: 'static> Shared<T> { |
21 | pub(crate) fn new() -> (Shared<T>, Synced) { |
22 | let inject = Shared { |
23 | len: AtomicUsize::new(0), |
24 | _p: PhantomData, |
25 | }; |
26 | |
27 | let synced = Synced { |
28 | is_closed: false, |
29 | head: None, |
30 | tail: None, |
31 | }; |
32 | |
33 | (inject, synced) |
34 | } |
35 | |
36 | pub(crate) fn is_empty(&self) -> bool { |
37 | self.len() == 0 |
38 | } |
39 | |
40 | // Kind of annoying to have to include the cfg here |
41 | #[cfg (any( |
42 | tokio_taskdump, |
43 | all(feature = "rt-multi-thread" , not(target_os = "wasi" )) |
44 | ))] |
45 | pub(crate) fn is_closed(&self, synced: &Synced) -> bool { |
46 | synced.is_closed |
47 | } |
48 | |
49 | /// Closes the injection queue, returns `true` if the queue is open when the |
50 | /// transition is made. |
51 | pub(crate) fn close(&self, synced: &mut Synced) -> bool { |
52 | if synced.is_closed { |
53 | return false; |
54 | } |
55 | |
56 | synced.is_closed = true; |
57 | true |
58 | } |
59 | |
60 | pub(crate) fn len(&self) -> usize { |
61 | self.len.load(Acquire) |
62 | } |
63 | |
64 | /// Pushes a value into the queue. |
65 | /// |
66 | /// This does nothing if the queue is closed. |
67 | /// |
68 | /// # Safety |
69 | /// |
70 | /// Must be called with the same `Synced` instance returned by `Inject::new` |
71 | pub(crate) unsafe fn push(&self, synced: &mut Synced, task: task::Notified<T>) { |
72 | if synced.is_closed { |
73 | return; |
74 | } |
75 | |
76 | // safety: only mutated with the lock held |
77 | let len = self.len.unsync_load(); |
78 | let task = task.into_raw(); |
79 | |
80 | // The next pointer should already be null |
81 | debug_assert!(unsafe { task.get_queue_next().is_none() }); |
82 | |
83 | if let Some(tail) = synced.tail { |
84 | // safety: Holding the Notified for a task guarantees exclusive |
85 | // access to the `queue_next` field. |
86 | unsafe { tail.set_queue_next(Some(task)) }; |
87 | } else { |
88 | synced.head = Some(task); |
89 | } |
90 | |
91 | synced.tail = Some(task); |
92 | self.len.store(len + 1, Release); |
93 | } |
94 | |
95 | /// Pop a value from the queue. |
96 | /// |
97 | /// # Safety |
98 | /// |
99 | /// Must be called with the same `Synced` instance returned by `Inject::new` |
100 | pub(crate) unsafe fn pop(&self, synced: &mut Synced) -> Option<task::Notified<T>> { |
101 | self.pop_n(synced, 1).next() |
102 | } |
103 | |
104 | /// Pop `n` values from the queue |
105 | /// |
106 | /// # Safety |
107 | /// |
108 | /// Must be called with the same `Synced` instance returned by `Inject::new` |
109 | pub(crate) unsafe fn pop_n<'a>(&'a self, synced: &'a mut Synced, n: usize) -> Pop<'a, T> { |
110 | use std::cmp; |
111 | |
112 | debug_assert!(n > 0); |
113 | |
114 | // safety: All updates to the len atomic are guarded by the mutex. As |
115 | // such, a non-atomic load followed by a store is safe. |
116 | let len = self.len.unsync_load(); |
117 | let n = cmp::min(n, len); |
118 | |
119 | // Decrement the count. |
120 | self.len.store(len - n, Release); |
121 | |
122 | Pop::new(n, synced) |
123 | } |
124 | } |
125 | |