1 | use crate::signal::os::{OsExtraData, OsStorage}; |
2 | use crate::sync::watch; |
3 | use crate::util::once_cell::OnceCell; |
4 | |
5 | use std::ops; |
6 | use std::sync::atomic::{AtomicBool, Ordering}; |
7 | |
8 | pub(crate) type EventId = usize; |
9 | |
10 | /// State for a specific event, whether a notification is pending delivery, |
11 | /// and what listeners are registered. |
12 | #[derive (Debug)] |
13 | pub(crate) struct EventInfo { |
14 | pending: AtomicBool, |
15 | tx: watch::Sender<()>, |
16 | } |
17 | |
18 | impl Default for EventInfo { |
19 | fn default() -> Self { |
20 | let (tx: Sender<()>, _rx: Receiver<()>) = watch::channel(()); |
21 | |
22 | Self { |
23 | pending: AtomicBool::new(false), |
24 | tx, |
25 | } |
26 | } |
27 | } |
28 | |
29 | /// An interface for retrieving the `EventInfo` for a particular `eventId`. |
30 | pub(crate) trait Storage { |
31 | /// Gets the `EventInfo` for `id` if it exists. |
32 | fn event_info(&self, id: EventId) -> Option<&EventInfo>; |
33 | |
34 | /// Invokes `f` once for each defined `EventInfo` in this storage. |
35 | fn for_each<'a, F>(&'a self, f: F) |
36 | where |
37 | F: FnMut(&'a EventInfo); |
38 | } |
39 | |
40 | impl Storage for Vec<EventInfo> { |
41 | fn event_info(&self, id: EventId) -> Option<&EventInfo> { |
42 | self.get(index:id) |
43 | } |
44 | |
45 | fn for_each<'a, F>(&'a self, f: F) |
46 | where |
47 | F: FnMut(&'a EventInfo), |
48 | { |
49 | self.iter().for_each(f); |
50 | } |
51 | } |
52 | |
53 | /// An interface for initializing a type. Useful for situations where we cannot |
54 | /// inject a configured instance in the constructor of another type. |
55 | pub(crate) trait Init { |
56 | fn init() -> Self; |
57 | } |
58 | |
59 | /// Manages and distributes event notifications to any registered listeners. |
60 | /// |
61 | /// Generic over the underlying storage to allow for domain specific |
62 | /// optimizations (e.g. `eventIds` may or may not be contiguous). |
63 | #[derive (Debug)] |
64 | pub(crate) struct Registry<S> { |
65 | storage: S, |
66 | } |
67 | |
68 | impl<S> Registry<S> { |
69 | fn new(storage: S) -> Self { |
70 | Self { storage } |
71 | } |
72 | } |
73 | |
74 | impl<S: Storage> Registry<S> { |
75 | /// Registers a new listener for `event_id`. |
76 | fn register_listener(&self, event_id: EventId) -> watch::Receiver<()> { |
77 | self.storage |
78 | .event_info(event_id) |
79 | .unwrap_or_else(|| panic!("invalid event_id: {}" , event_id)) |
80 | .tx |
81 | .subscribe() |
82 | } |
83 | |
84 | /// Marks `event_id` as having been delivered, without broadcasting it to |
85 | /// any listeners. |
86 | fn record_event(&self, event_id: EventId) { |
87 | if let Some(event_info) = self.storage.event_info(event_id) { |
88 | event_info.pending.store(true, Ordering::SeqCst); |
89 | } |
90 | } |
91 | |
92 | /// Broadcasts all previously recorded events to their respective listeners. |
93 | /// |
94 | /// Returns `true` if an event was delivered to at least one listener. |
95 | fn broadcast(&self) -> bool { |
96 | let mut did_notify = false; |
97 | self.storage.for_each(|event_info| { |
98 | // Any signal of this kind arrived since we checked last? |
99 | if !event_info.pending.swap(false, Ordering::SeqCst) { |
100 | return; |
101 | } |
102 | |
103 | // Ignore errors if there are no listeners |
104 | if event_info.tx.send(()).is_ok() { |
105 | did_notify = true; |
106 | } |
107 | }); |
108 | |
109 | did_notify |
110 | } |
111 | } |
112 | |
113 | pub(crate) struct Globals { |
114 | extra: OsExtraData, |
115 | registry: Registry<OsStorage>, |
116 | } |
117 | |
118 | impl ops::Deref for Globals { |
119 | type Target = OsExtraData; |
120 | |
121 | fn deref(&self) -> &Self::Target { |
122 | &self.extra |
123 | } |
124 | } |
125 | |
126 | impl Globals { |
127 | /// Registers a new listener for `event_id`. |
128 | pub(crate) fn register_listener(&self, event_id: EventId) -> watch::Receiver<()> { |
129 | self.registry.register_listener(event_id) |
130 | } |
131 | |
132 | /// Marks `event_id` as having been delivered, without broadcasting it to |
133 | /// any listeners. |
134 | pub(crate) fn record_event(&self, event_id: EventId) { |
135 | self.registry.record_event(event_id); |
136 | } |
137 | |
138 | /// Broadcasts all previously recorded events to their respective listeners. |
139 | /// |
140 | /// Returns `true` if an event was delivered to at least one listener. |
141 | pub(crate) fn broadcast(&self) -> bool { |
142 | self.registry.broadcast() |
143 | } |
144 | |
145 | #[cfg (unix)] |
146 | pub(crate) fn storage(&self) -> &OsStorage { |
147 | &self.registry.storage |
148 | } |
149 | } |
150 | |
151 | fn globals_init() -> Globals |
152 | where |
153 | OsExtraData: 'static + Send + Sync + Init, |
154 | OsStorage: 'static + Send + Sync + Init, |
155 | { |
156 | Globals { |
157 | extra: OsExtraData::init(), |
158 | registry: Registry::new(storage:OsStorage::init()), |
159 | } |
160 | } |
161 | |
162 | pub(crate) fn globals() -> &'static Globals |
163 | where |
164 | OsExtraData: 'static + Send + Sync + Init, |
165 | OsStorage: 'static + Send + Sync + Init, |
166 | { |
167 | static GLOBALS: OnceCell<Globals> = OnceCell::new(); |
168 | |
169 | GLOBALS.get(globals_init) |
170 | } |
171 | |
172 | #[cfg (all(test, not(loom)))] |
173 | mod tests { |
174 | use super::*; |
175 | use crate::runtime::{self, Runtime}; |
176 | use crate::sync::{oneshot, watch}; |
177 | |
178 | use futures::future; |
179 | |
180 | #[test ] |
181 | fn smoke() { |
182 | let rt = rt(); |
183 | rt.block_on(async move { |
184 | let registry = Registry::new(vec![ |
185 | EventInfo::default(), |
186 | EventInfo::default(), |
187 | EventInfo::default(), |
188 | ]); |
189 | |
190 | let first = registry.register_listener(0); |
191 | let second = registry.register_listener(1); |
192 | let third = registry.register_listener(2); |
193 | |
194 | let (fire, wait) = oneshot::channel(); |
195 | |
196 | crate::spawn(async { |
197 | wait.await.expect("wait failed" ); |
198 | |
199 | // Record some events which should get coalesced |
200 | registry.record_event(0); |
201 | registry.record_event(0); |
202 | registry.record_event(1); |
203 | registry.record_event(1); |
204 | registry.broadcast(); |
205 | |
206 | // Yield so the previous broadcast can get received |
207 | // |
208 | // This yields many times since the block_on task is only polled every 61 |
209 | // ticks. |
210 | for _ in 0..100 { |
211 | crate::task::yield_now().await; |
212 | } |
213 | |
214 | // Send subsequent signal |
215 | registry.record_event(0); |
216 | registry.broadcast(); |
217 | |
218 | drop(registry); |
219 | }); |
220 | |
221 | let _ = fire.send(()); |
222 | let all = future::join3(collect(first), collect(second), collect(third)); |
223 | |
224 | let (first_results, second_results, third_results) = all.await; |
225 | assert_eq!(2, first_results.len()); |
226 | assert_eq!(1, second_results.len()); |
227 | assert_eq!(0, third_results.len()); |
228 | }); |
229 | } |
230 | |
231 | #[test ] |
232 | #[should_panic = "invalid event_id: 1" ] |
233 | fn register_panics_on_invalid_input() { |
234 | let registry = Registry::new(vec![EventInfo::default()]); |
235 | |
236 | registry.register_listener(1); |
237 | } |
238 | |
239 | #[test ] |
240 | fn record_invalid_event_does_nothing() { |
241 | let registry = Registry::new(vec![EventInfo::default()]); |
242 | registry.record_event(1302); |
243 | } |
244 | |
245 | #[test ] |
246 | fn broadcast_returns_if_at_least_one_event_fired() { |
247 | let registry = Registry::new(vec![EventInfo::default(), EventInfo::default()]); |
248 | |
249 | registry.record_event(0); |
250 | assert!(!registry.broadcast()); |
251 | |
252 | let first = registry.register_listener(0); |
253 | let second = registry.register_listener(1); |
254 | |
255 | registry.record_event(0); |
256 | assert!(registry.broadcast()); |
257 | |
258 | drop(first); |
259 | registry.record_event(0); |
260 | assert!(!registry.broadcast()); |
261 | |
262 | drop(second); |
263 | } |
264 | |
265 | fn rt() -> Runtime { |
266 | runtime::Builder::new_current_thread() |
267 | .enable_time() |
268 | .build() |
269 | .unwrap() |
270 | } |
271 | |
272 | async fn collect(mut rx: watch::Receiver<()>) -> Vec<()> { |
273 | let mut ret = vec![]; |
274 | |
275 | while let Ok(v) = rx.changed().await { |
276 | ret.push(v); |
277 | } |
278 | |
279 | ret |
280 | } |
281 | } |
282 | |