1 | #![allow (clippy::unit_arg)] |
2 | |
3 | use crate::signal::os::{OsExtraData, OsStorage}; |
4 | use crate::sync::watch; |
5 | use crate::util::once_cell::OnceCell; |
6 | |
7 | use std::ops; |
8 | use std::sync::atomic::{AtomicBool, Ordering}; |
9 | |
/// Identifier used to look up the [`EventInfo`] for an event in a [`Storage`].
pub(crate) type EventId = usize;
11 | |
12 | /// State for a specific event, whether a notification is pending delivery, |
13 | /// and what listeners are registered. |
14 | #[derive(Debug)] |
15 | pub(crate) struct EventInfo { |
16 | pending: AtomicBool, |
17 | tx: watch::Sender<()>, |
18 | } |
19 | |
20 | impl Default for EventInfo { |
21 | fn default() -> Self { |
22 | let (tx, _rx) = watch::channel(()); |
23 | |
24 | Self { |
25 | pending: AtomicBool::new(false), |
26 | tx, |
27 | } |
28 | } |
29 | } |
30 | |
31 | /// An interface for retrieving the `EventInfo` for a particular eventId. |
32 | pub(crate) trait Storage { |
33 | /// Gets the `EventInfo` for `id` if it exists. |
34 | fn event_info(&self, id: EventId) -> Option<&EventInfo>; |
35 | |
36 | /// Invokes `f` once for each defined `EventInfo` in this storage. |
37 | fn for_each<'a, F>(&'a self, f: F) |
38 | where |
39 | F: FnMut(&'a EventInfo); |
40 | } |
41 | |
42 | impl Storage for Vec<EventInfo> { |
43 | fn event_info(&self, id: EventId) -> Option<&EventInfo> { |
44 | self.get(id) |
45 | } |
46 | |
47 | fn for_each<'a, F>(&'a self, f: F) |
48 | where |
49 | F: FnMut(&'a EventInfo), |
50 | { |
51 | self.iter().for_each(f); |
52 | } |
53 | } |
54 | |
/// Constructor-style trait for types that must be able to build themselves
/// without arguments.
///
/// Handy when a pre-configured instance cannot be passed into the constructor
/// of the type that owns it.
pub(crate) trait Init {
    /// Creates a new instance of the implementing type.
    fn init() -> Self;
}
60 | |
/// Records events and fans notifications out to the listeners registered for
/// them.
///
/// The storage type is a generic parameter so that each domain can pick a
/// layout that suits it (for instance, event ids need not be contiguous).
#[derive(Debug)]
pub(crate) struct Registry<S> {
    /// Backing collection of per-event state.
    storage: S,
}
69 | |
70 | impl<S> Registry<S> { |
71 | fn new(storage: S) -> Self { |
72 | Self { storage } |
73 | } |
74 | } |
75 | |
76 | impl<S: Storage> Registry<S> { |
77 | /// Registers a new listener for `event_id`. |
78 | fn register_listener(&self, event_id: EventId) -> watch::Receiver<()> { |
79 | self.storage |
80 | .event_info(event_id) |
81 | .unwrap_or_else(|| panic!("invalid event_id: {}" , event_id)) |
82 | .tx |
83 | .subscribe() |
84 | } |
85 | |
86 | /// Marks `event_id` as having been delivered, without broadcasting it to |
87 | /// any listeners. |
88 | fn record_event(&self, event_id: EventId) { |
89 | if let Some(event_info) = self.storage.event_info(event_id) { |
90 | event_info.pending.store(true, Ordering::SeqCst); |
91 | } |
92 | } |
93 | |
94 | /// Broadcasts all previously recorded events to their respective listeners. |
95 | /// |
96 | /// Returns `true` if an event was delivered to at least one listener. |
97 | fn broadcast(&self) -> bool { |
98 | let mut did_notify = false; |
99 | self.storage.for_each(|event_info| { |
100 | // Any signal of this kind arrived since we checked last? |
101 | if !event_info.pending.swap(false, Ordering::SeqCst) { |
102 | return; |
103 | } |
104 | |
105 | // Ignore errors if there are no listeners |
106 | if event_info.tx.send(()).is_ok() { |
107 | did_notify = true; |
108 | } |
109 | }); |
110 | |
111 | did_notify |
112 | } |
113 | } |
114 | |
115 | pub(crate) struct Globals { |
116 | extra: OsExtraData, |
117 | registry: Registry<OsStorage>, |
118 | } |
119 | |
120 | impl ops::Deref for Globals { |
121 | type Target = OsExtraData; |
122 | |
123 | fn deref(&self) -> &Self::Target { |
124 | &self.extra |
125 | } |
126 | } |
127 | |
128 | impl Globals { |
129 | /// Registers a new listener for `event_id`. |
130 | pub(crate) fn register_listener(&self, event_id: EventId) -> watch::Receiver<()> { |
131 | self.registry.register_listener(event_id) |
132 | } |
133 | |
134 | /// Marks `event_id` as having been delivered, without broadcasting it to |
135 | /// any listeners. |
136 | pub(crate) fn record_event(&self, event_id: EventId) { |
137 | self.registry.record_event(event_id); |
138 | } |
139 | |
140 | /// Broadcasts all previously recorded events to their respective listeners. |
141 | /// |
142 | /// Returns `true` if an event was delivered to at least one listener. |
143 | pub(crate) fn broadcast(&self) -> bool { |
144 | self.registry.broadcast() |
145 | } |
146 | |
147 | #[cfg (unix)] |
148 | pub(crate) fn storage(&self) -> &OsStorage { |
149 | &self.registry.storage |
150 | } |
151 | } |
152 | |
153 | fn globals_init() -> Globals |
154 | where |
155 | OsExtraData: 'static + Send + Sync + Init, |
156 | OsStorage: 'static + Send + Sync + Init, |
157 | { |
158 | Globals { |
159 | extra: OsExtraData::init(), |
160 | registry: Registry::new(OsStorage::init()), |
161 | } |
162 | } |
163 | |
164 | pub(crate) fn globals() -> &'static Globals |
165 | where |
166 | OsExtraData: 'static + Send + Sync + Init, |
167 | OsStorage: 'static + Send + Sync + Init, |
168 | { |
169 | static GLOBALS: OnceCell<Globals> = OnceCell::new(); |
170 | |
171 | GLOBALS.get(globals_init) |
172 | } |
173 | |
#[cfg(all(test, not(loom)))]
mod tests {
    use super::*;
    use crate::runtime::{self, Runtime};
    use crate::sync::{oneshot, watch};

    use futures::future;

    /// Records events from a spawned task and checks how many notifications
    /// each of three listeners ends up observing.
    #[test]
    fn smoke() {
        let current_thread_rt = rt();
        current_thread_rt.block_on(async move {
            let storage: Vec<EventInfo> =
                std::iter::repeat_with(EventInfo::default).take(3).collect();
            let registry = Registry::new(storage);

            let rx_zero = registry.register_listener(0);
            let rx_one = registry.register_listener(1);
            let rx_two = registry.register_listener(2);

            let (fire, wait) = oneshot::channel();

            crate::spawn(async move {
                wait.await.expect("wait failed");

                // Record some events which should get coalesced
                for &id in &[0, 0, 1, 1] {
                    registry.record_event(id);
                }
                registry.broadcast();

                // Yield so the previous broadcast can get received
                //
                // This yields many times since the block_on task is only polled every 61
                // ticks.
                for _ in 0..100 {
                    crate::task::yield_now().await;
                }

                // Send subsequent signal
                registry.record_event(0);
                registry.broadcast();

                // Dropping the registry drops the senders, which lets the
                // `collect` loops below terminate.
                drop(registry);
            });

            let _ = fire.send(());

            let (first_results, second_results, third_results) =
                future::join3(collect(rx_zero), collect(rx_one), collect(rx_two)).await;

            assert_eq!(2, first_results.len());
            assert_eq!(1, second_results.len());
            assert_eq!(0, third_results.len());
        });
    }

    /// Registering against an id outside the storage must panic.
    #[test]
    #[should_panic = "invalid event_id: 1"]
    fn register_panics_on_invalid_input() {
        let single_event = Registry::new(vec![EventInfo::default()]);

        single_event.register_listener(1);
    }

    /// Recording an id outside the storage must be a silent no-op.
    #[test]
    fn record_invalid_event_does_nothing() {
        let single_event = Registry::new(vec![EventInfo::default()]);
        single_event.record_event(1302);
    }

    /// `broadcast` reports `true` only when a pending event reaches a listener.
    #[test]
    fn broadcast_returns_if_at_least_one_event_fired() {
        let registry = Registry::new(vec![EventInfo::default(), EventInfo::default()]);

        // Pending event, but nobody is listening yet.
        registry.record_event(0);
        let notified_without_listeners = registry.broadcast();
        assert!(!notified_without_listeners);

        let first = registry.register_listener(0);
        let second = registry.register_listener(1);

        // Pending event with a live listener.
        registry.record_event(0);
        let notified_with_listener = registry.broadcast();
        assert!(notified_with_listener);

        // The only listener for event 0 is gone again.
        drop(first);
        registry.record_event(0);
        let notified_after_drop = registry.broadcast();
        assert!(!notified_after_drop);

        drop(second);
    }

    /// Builds the current-thread runtime used by the tests.
    fn rt() -> Runtime {
        runtime::Builder::new_current_thread()
            .enable_time()
            .build()
            .unwrap()
    }

    /// Gathers one entry per change notification until `changed` reports an
    /// error.
    async fn collect(mut rx: watch::Receiver<()>) -> Vec<()> {
        let mut seen = vec![];

        loop {
            match rx.changed().await {
                Ok(unit) => seen.push(unit),
                Err(_) => break,
            }
        }

        seen
    }
}
284 | |