//! Inject queue used to send wakeups to a work-stealing scheduler.
2 | |
3 | use crate::loom::sync::Mutex; |
4 | use crate::runtime::task; |
5 | |
6 | mod pop; |
7 | pub(crate) use pop::Pop; |
8 | |
9 | mod shared; |
10 | pub(crate) use shared::Shared; |
11 | |
12 | mod synced; |
13 | pub(crate) use synced::Synced; |
14 | |
15 | cfg_rt_multi_thread! { |
16 | mod rt_multi_thread; |
17 | } |
18 | |
19 | mod metrics; |
20 | |
21 | /// Growable, MPMC queue used to inject new tasks into the scheduler and as an |
22 | /// overflow queue when the local, fixed-size, array queue overflows. |
23 | pub(crate) struct Inject<T: 'static> { |
24 | shared: Shared<T>, |
25 | synced: Mutex<Synced>, |
26 | } |
27 | |
28 | impl<T: 'static> Inject<T> { |
29 | pub(crate) fn new() -> Inject<T> { |
30 | let (shared, synced) = Shared::new(); |
31 | |
32 | Inject { |
33 | shared, |
34 | synced: Mutex::new(synced), |
35 | } |
36 | } |
37 | |
38 | // Kind of annoying to have to include the cfg here |
39 | #[cfg (tokio_taskdump)] |
40 | pub(crate) fn is_closed(&self) -> bool { |
41 | let synced = self.synced.lock(); |
42 | self.shared.is_closed(&synced) |
43 | } |
44 | |
45 | /// Closes the injection queue, returns `true` if the queue is open when the |
46 | /// transition is made. |
47 | pub(crate) fn close(&self) -> bool { |
48 | let mut synced = self.synced.lock(); |
49 | self.shared.close(&mut synced) |
50 | } |
51 | |
52 | /// Pushes a value into the queue. |
53 | /// |
54 | /// This does nothing if the queue is closed. |
55 | pub(crate) fn push(&self, task: task::Notified<T>) { |
56 | let mut synced = self.synced.lock(); |
57 | // safety: passing correct `Synced` |
58 | unsafe { self.shared.push(&mut synced, task) } |
59 | } |
60 | |
61 | pub(crate) fn pop(&self) -> Option<task::Notified<T>> { |
62 | if self.shared.is_empty() { |
63 | return None; |
64 | } |
65 | |
66 | let mut synced = self.synced.lock(); |
67 | // safety: passing correct `Synced` |
68 | unsafe { self.shared.pop(&mut synced) } |
69 | } |
70 | } |
71 | |