1#![allow(clippy::unit_arg)]
2
3use crate::signal::os::{OsExtraData, OsStorage};
4use crate::sync::watch;
5use crate::util::once_cell::OnceCell;
6
7use std::ops;
8use std::sync::atomic::{AtomicBool, Ordering};
9
10pub(crate) type EventId = usize;
11
12/// State for a specific event, whether a notification is pending delivery,
13/// and what listeners are registered.
14#[derive(Debug)]
15pub(crate) struct EventInfo {
16 pending: AtomicBool,
17 tx: watch::Sender<()>,
18}
19
20impl Default for EventInfo {
21 fn default() -> Self {
22 let (tx, _rx) = watch::channel(());
23
24 Self {
25 pending: AtomicBool::new(false),
26 tx,
27 }
28 }
29}
30
31/// An interface for retrieving the `EventInfo` for a particular eventId.
32pub(crate) trait Storage {
33 /// Gets the `EventInfo` for `id` if it exists.
34 fn event_info(&self, id: EventId) -> Option<&EventInfo>;
35
36 /// Invokes `f` once for each defined `EventInfo` in this storage.
37 fn for_each<'a, F>(&'a self, f: F)
38 where
39 F: FnMut(&'a EventInfo);
40}
41
42impl Storage for Vec<EventInfo> {
43 fn event_info(&self, id: EventId) -> Option<&EventInfo> {
44 self.get(id)
45 }
46
47 fn for_each<'a, F>(&'a self, f: F)
48 where
49 F: FnMut(&'a EventInfo),
50 {
51 self.iter().for_each(f);
52 }
53}
54
/// Argument-free construction of a type.
///
/// Useful where a pre-configured instance cannot be injected through the
/// constructor of another type, so the value has to be created on demand.
pub(crate) trait Init {
    /// Creates a new instance of the implementing type.
    fn init() -> Self;
}
60
/// Tracks recorded events and fans them out to registered listeners.
///
/// The backing storage is a type parameter so that each platform can choose
/// a representation suited to its event ids (e.g. ids may or may not be
/// contiguous).
#[derive(Debug)]
pub(crate) struct Registry<S> {
    /// Backing store holding the per-event state.
    storage: S,
}
69
70impl<S> Registry<S> {
71 fn new(storage: S) -> Self {
72 Self { storage }
73 }
74}
75
76impl<S: Storage> Registry<S> {
77 /// Registers a new listener for `event_id`.
78 fn register_listener(&self, event_id: EventId) -> watch::Receiver<()> {
79 self.storage
80 .event_info(event_id)
81 .unwrap_or_else(|| panic!("invalid event_id: {}", event_id))
82 .tx
83 .subscribe()
84 }
85
86 /// Marks `event_id` as having been delivered, without broadcasting it to
87 /// any listeners.
88 fn record_event(&self, event_id: EventId) {
89 if let Some(event_info) = self.storage.event_info(event_id) {
90 event_info.pending.store(true, Ordering::SeqCst);
91 }
92 }
93
94 /// Broadcasts all previously recorded events to their respective listeners.
95 ///
96 /// Returns `true` if an event was delivered to at least one listener.
97 fn broadcast(&self) -> bool {
98 let mut did_notify = false;
99 self.storage.for_each(|event_info| {
100 // Any signal of this kind arrived since we checked last?
101 if !event_info.pending.swap(false, Ordering::SeqCst) {
102 return;
103 }
104
105 // Ignore errors if there are no listeners
106 if event_info.tx.send(()).is_ok() {
107 did_notify = true;
108 }
109 });
110
111 did_notify
112 }
113}
114
115pub(crate) struct Globals {
116 extra: OsExtraData,
117 registry: Registry<OsStorage>,
118}
119
120impl ops::Deref for Globals {
121 type Target = OsExtraData;
122
123 fn deref(&self) -> &Self::Target {
124 &self.extra
125 }
126}
127
128impl Globals {
129 /// Registers a new listener for `event_id`.
130 pub(crate) fn register_listener(&self, event_id: EventId) -> watch::Receiver<()> {
131 self.registry.register_listener(event_id)
132 }
133
134 /// Marks `event_id` as having been delivered, without broadcasting it to
135 /// any listeners.
136 pub(crate) fn record_event(&self, event_id: EventId) {
137 self.registry.record_event(event_id);
138 }
139
140 /// Broadcasts all previously recorded events to their respective listeners.
141 ///
142 /// Returns `true` if an event was delivered to at least one listener.
143 pub(crate) fn broadcast(&self) -> bool {
144 self.registry.broadcast()
145 }
146
147 #[cfg(unix)]
148 pub(crate) fn storage(&self) -> &OsStorage {
149 &self.registry.storage
150 }
151}
152
153fn globals_init() -> Globals
154where
155 OsExtraData: 'static + Send + Sync + Init,
156 OsStorage: 'static + Send + Sync + Init,
157{
158 Globals {
159 extra: OsExtraData::init(),
160 registry: Registry::new(OsStorage::init()),
161 }
162}
163
164pub(crate) fn globals() -> &'static Globals
165where
166 OsExtraData: 'static + Send + Sync + Init,
167 OsStorage: 'static + Send + Sync + Init,
168{
169 static GLOBALS: OnceCell<Globals> = OnceCell::new();
170
171 GLOBALS.get(globals_init)
172}
173
#[cfg(all(test, not(loom)))]
mod tests {
    use super::*;
    use crate::runtime::{self, Runtime};
    use crate::sync::{oneshot, watch};

    use futures::future;

    /// Recorded events are coalesced per broadcast and only reach the
    /// listeners of the matching event id.
    #[test]
    fn smoke() {
        let runtime = rt();
        runtime.block_on(async move {
            let registry = Registry::new(vec![
                EventInfo::default(),
                EventInfo::default(),
                EventInfo::default(),
            ]);

            let rx0 = registry.register_listener(0);
            let rx1 = registry.register_listener(1);
            let rx2 = registry.register_listener(2);

            let (start_tx, start_rx) = oneshot::channel();

            crate::spawn(async move {
                start_rx.await.expect("wait failed");

                // Duplicate records before a broadcast should collapse into a
                // single notification per event id
                registry.record_event(0);
                registry.record_event(0);
                registry.record_event(1);
                registry.record_event(1);
                registry.broadcast();

                // Yield so the previous broadcast can get received
                //
                // This yields many times since the block_on task is only polled every 61
                // ticks.
                for _ in 0..100 {
                    crate::task::yield_now().await;
                }

                // A second, separate notification for event 0 only
                registry.record_event(0);
                registry.broadcast();

                // Dropping the registry lets the collectors finish
                drop(registry);
            });

            let _ = start_tx.send(());

            let (seen0, seen1, seen2) =
                future::join3(collect(rx0), collect(rx1), collect(rx2)).await;

            assert_eq!(2, seen0.len());
            assert_eq!(1, seen1.len());
            assert_eq!(0, seen2.len());
        });
    }

    /// Registering for an id outside the storage panics with a clear message.
    #[test]
    #[should_panic = "invalid event_id: 1"]
    fn register_panics_on_invalid_input() {
        let single = Registry::new(vec![EventInfo::default()]);

        single.register_listener(1);
    }

    /// Recording an id outside the storage is silently ignored.
    #[test]
    fn record_invalid_event_does_nothing() {
        let single = Registry::new(vec![EventInfo::default()]);
        single.record_event(1302);
    }

    /// `broadcast` reports `true` only when a pending event reached a
    /// listener.
    #[test]
    fn broadcast_returns_if_at_least_one_event_fired() {
        let registry = Registry::new(vec![EventInfo::default(), EventInfo::default()]);

        // No listeners yet, so nothing can be delivered
        registry.record_event(0);
        assert!(!registry.broadcast());

        let listener0 = registry.register_listener(0);
        let listener1 = registry.register_listener(1);

        // Event 0 now has a listener
        registry.record_event(0);
        assert!(registry.broadcast());

        // Event 0 lost its only listener again
        drop(listener0);
        registry.record_event(0);
        assert!(!registry.broadcast());

        drop(listener1);
    }

    /// Builds a current-thread runtime with the time driver enabled.
    fn rt() -> Runtime {
        let built = runtime::Builder::new_current_thread()
            .enable_time()
            .build();

        built.unwrap()
    }

    /// Gathers one entry per change notification until `changed` reports an
    /// error.
    async fn collect(mut rx: watch::Receiver<()>) -> Vec<()> {
        let mut seen = vec![];

        loop {
            match rx.changed().await {
                Ok(unit) => seen.push(unit),
                Err(_) => break seen,
            }
        }
    }
}
284