1//! Inject queue used to send wakeups to a work-stealing scheduler
2
3use crate::loom::sync::Mutex;
4use crate::runtime::task;
5
6mod pop;
7pub(crate) use pop::Pop;
8
9mod shared;
10pub(crate) use shared::Shared;
11
12mod synced;
13pub(crate) use synced::Synced;
14
15cfg_rt_multi_thread! {
16 mod rt_multi_thread;
17}
18
19cfg_metrics! {
20 mod metrics;
21}
22
23/// Growable, MPMC queue used to inject new tasks into the scheduler and as an
24/// overflow queue when the local, fixed-size, array queue overflows.
25pub(crate) struct Inject<T: 'static> {
26 shared: Shared<T>,
27 synced: Mutex<Synced>,
28}
29
30impl<T: 'static> Inject<T> {
31 pub(crate) fn new() -> Inject<T> {
32 let (shared, synced) = Shared::new();
33
34 Inject {
35 shared,
36 synced: Mutex::new(synced),
37 }
38 }
39
40 // Kind of annoying to have to include the cfg here
41 #[cfg(tokio_taskdump)]
42 pub(crate) fn is_closed(&self) -> bool {
43 let synced = self.synced.lock();
44 self.shared.is_closed(&synced)
45 }
46
47 /// Closes the injection queue, returns `true` if the queue is open when the
48 /// transition is made.
49 pub(crate) fn close(&self) -> bool {
50 let mut synced = self.synced.lock();
51 self.shared.close(&mut synced)
52 }
53
54 /// Pushes a value into the queue.
55 ///
56 /// This does nothing if the queue is closed.
57 pub(crate) fn push(&self, task: task::Notified<T>) {
58 let mut synced = self.synced.lock();
59 // safety: passing correct `Synced`
60 unsafe { self.shared.push(&mut synced, task) }
61 }
62
63 pub(crate) fn pop(&self) -> Option<task::Notified<T>> {
64 if self.shared.is_empty() {
65 return None;
66 }
67
68 let mut synced = self.synced.lock();
69 // safety: passing correct `Synced`
70 unsafe { self.shared.pop(&mut synced) }
71 }
72}
73