| 1 | //! Inject queue used to send wakeups to a work-stealing scheduler |
| 2 | |
| 3 | use crate::loom::sync::Mutex; |
| 4 | use crate::runtime::task; |
| 5 | |
| 6 | mod pop; |
| 7 | pub(crate) use pop::Pop; |
| 8 | |
| 9 | mod shared; |
| 10 | pub(crate) use shared::Shared; |
| 11 | |
| 12 | mod synced; |
| 13 | pub(crate) use synced::Synced; |
| 14 | |
| 15 | cfg_rt_multi_thread! { |
| 16 | mod rt_multi_thread; |
| 17 | } |
| 18 | |
| 19 | mod metrics; |
| 20 | |
| 21 | /// Growable, MPMC queue used to inject new tasks into the scheduler and as an |
| 22 | /// overflow queue when the local, fixed-size, array queue overflows. |
| 23 | pub(crate) struct Inject<T: 'static> { |
| 24 | shared: Shared<T>, |
| 25 | synced: Mutex<Synced>, |
| 26 | } |
| 27 | |
| 28 | impl<T: 'static> Inject<T> { |
| 29 | pub(crate) fn new() -> Inject<T> { |
| 30 | let (shared, synced) = Shared::new(); |
| 31 | |
| 32 | Inject { |
| 33 | shared, |
| 34 | synced: Mutex::new(synced), |
| 35 | } |
| 36 | } |
| 37 | |
| 38 | // Kind of annoying to have to include the cfg here |
| 39 | #[cfg (tokio_taskdump)] |
| 40 | pub(crate) fn is_closed(&self) -> bool { |
| 41 | let synced = self.synced.lock(); |
| 42 | self.shared.is_closed(&synced) |
| 43 | } |
| 44 | |
| 45 | /// Closes the injection queue, returns `true` if the queue is open when the |
| 46 | /// transition is made. |
| 47 | pub(crate) fn close(&self) -> bool { |
| 48 | let mut synced = self.synced.lock(); |
| 49 | self.shared.close(&mut synced) |
| 50 | } |
| 51 | |
| 52 | /// Pushes a value into the queue. |
| 53 | /// |
| 54 | /// This does nothing if the queue is closed. |
| 55 | pub(crate) fn push(&self, task: task::Notified<T>) { |
| 56 | let mut synced = self.synced.lock(); |
| 57 | // safety: passing correct `Synced` |
| 58 | unsafe { self.shared.push(&mut synced, task) } |
| 59 | } |
| 60 | |
| 61 | pub(crate) fn pop(&self) -> Option<task::Notified<T>> { |
| 62 | if self.shared.is_empty() { |
| 63 | return None; |
| 64 | } |
| 65 | |
| 66 | let mut synced = self.synced.lock(); |
| 67 | // safety: passing correct `Synced` |
| 68 | unsafe { self.shared.pop(&mut synced) } |
| 69 | } |
| 70 | } |
| 71 | |