1 | //! libstd-based implementation of `event-listener`. |
2 | //! |
3 | //! This implementation crates an intrusive linked list of listeners. |
4 | |
5 | use crate::notify::{GenericNotify, Internal, Notification}; |
6 | use crate::sync::atomic::Ordering; |
7 | use crate::sync::cell::{Cell, UnsafeCell}; |
8 | use crate::sync::{Mutex, MutexGuard}; |
9 | use crate::{RegisterResult, State, TaskRef}; |
10 | |
11 | use core::marker::PhantomPinned; |
12 | use core::mem; |
13 | use core::ops::{Deref, DerefMut}; |
14 | use core::pin::Pin; |
15 | use core::ptr::NonNull; |
16 | |
17 | pub(super) struct List<T>(Mutex<Inner<T>>); |
18 | |
19 | struct Inner<T> { |
20 | /// The head of the linked list. |
21 | head: Option<NonNull<Link<T>>>, |
22 | |
23 | /// The tail of the linked list. |
24 | tail: Option<NonNull<Link<T>>>, |
25 | |
26 | /// The first unnotified listener. |
27 | next: Option<NonNull<Link<T>>>, |
28 | |
29 | /// Total number of listeners. |
30 | len: usize, |
31 | |
32 | /// The number of notified listeners. |
33 | notified: usize, |
34 | } |
35 | |
36 | impl<T> List<T> { |
37 | /// Create a new, empty event listener list. |
38 | pub(super) fn new() -> Self { |
39 | Self(Mutex::new(Inner { |
40 | head: None, |
41 | tail: None, |
42 | next: None, |
43 | len: 0, |
44 | notified: 0, |
45 | })) |
46 | } |
47 | // Accessor method because fields are private, not sure how to go around it |
48 | pub fn total_listeners(&self) -> Result<usize, &str> { |
49 | match self.0.try_lock() { |
50 | Ok(mutex: MutexGuard<'_, Inner>) => { |
51 | let len: usize = mutex.len; |
52 | Ok(len) |
53 | } |
54 | Err(_) => Err("<locked>" ), |
55 | } |
56 | } |
57 | } |
58 | |
59 | impl<T> crate::Inner<T> { |
60 | fn lock(&self) -> ListLock<'_, '_, T> { |
61 | ListLock { |
62 | inner: self, |
63 | lock: self.list.0.lock().unwrap_or_else(|e| e.into_inner()), |
64 | } |
65 | } |
66 | |
67 | /// Whether or not this number of listeners would lead to a notification. |
68 | pub(crate) fn needs_notification(&self, limit: usize) -> bool { |
69 | self.notified.load(Ordering::Acquire) < limit |
70 | } |
71 | |
72 | /// Add a new listener to the list. |
73 | pub(crate) fn insert(&self, mut listener: Pin<&mut Option<Listener<T>>>) { |
74 | let mut inner = self.lock(); |
75 | |
76 | // SAFETY: We are locked, so we can access the inner `link`. |
77 | let entry = unsafe { |
78 | listener.as_mut().set(Some(Listener { |
79 | link: UnsafeCell::new(Link { |
80 | state: Cell::new(State::Created), |
81 | prev: Cell::new(inner.tail), |
82 | next: Cell::new(None), |
83 | }), |
84 | _pin: PhantomPinned, |
85 | })); |
86 | let listener = listener.as_pin_mut().unwrap(); |
87 | |
88 | // Get the inner pointer. |
89 | &*listener.link.get() |
90 | }; |
91 | |
92 | // Replace the tail with the new entry. |
93 | match mem::replace(&mut inner.tail, Some(entry.into())) { |
94 | None => inner.head = Some(entry.into()), |
95 | Some(t) => unsafe { t.as_ref().next.set(Some(entry.into())) }, |
96 | }; |
97 | |
98 | // If there are no unnotified entries, this is the first one. |
99 | if inner.next.is_none() { |
100 | inner.next = inner.tail; |
101 | } |
102 | |
103 | // Bump the entry count. |
104 | inner.len += 1; |
105 | } |
106 | |
107 | /// Remove a listener from the list. |
108 | pub(crate) fn remove( |
109 | &self, |
110 | listener: Pin<&mut Option<Listener<T>>>, |
111 | propagate: bool, |
112 | ) -> Option<State<T>> { |
113 | self.lock().remove(listener, propagate) |
114 | } |
115 | |
116 | /// Notifies a number of entries. |
117 | #[cold ] |
118 | pub(crate) fn notify(&self, notify: impl Notification<Tag = T>) -> usize { |
119 | self.lock().notify(notify) |
120 | } |
121 | |
122 | /// Register a task to be notified when the event is triggered. |
123 | /// |
124 | /// Returns `true` if the listener was already notified, and `false` otherwise. If the listener |
125 | /// isn't inserted, returns `None`. |
126 | pub(crate) fn register( |
127 | &self, |
128 | mut listener: Pin<&mut Option<Listener<T>>>, |
129 | task: TaskRef<'_>, |
130 | ) -> RegisterResult<T> { |
131 | let mut inner = self.lock(); |
132 | |
133 | // SAFETY: We are locked, so we can access the inner `link`. |
134 | let entry = unsafe { |
135 | let listener = match listener.as_mut().as_pin_mut() { |
136 | Some(listener) => listener, |
137 | None => return RegisterResult::NeverInserted, |
138 | }; |
139 | &*listener.link.get() |
140 | }; |
141 | |
142 | // Take out the state and check it. |
143 | match entry.state.replace(State::NotifiedTaken) { |
144 | State::Notified { tag, .. } => { |
145 | // We have been notified, remove the listener. |
146 | inner.remove(listener, false); |
147 | RegisterResult::Notified(tag) |
148 | } |
149 | |
150 | State::Task(other_task) => { |
151 | // Only replace the task if it's different. |
152 | entry.state.set(State::Task({ |
153 | if !task.will_wake(other_task.as_task_ref()) { |
154 | task.into_task() |
155 | } else { |
156 | other_task |
157 | } |
158 | })); |
159 | |
160 | RegisterResult::Registered |
161 | } |
162 | |
163 | _ => { |
164 | // We have not been notified, register the task. |
165 | entry.state.set(State::Task(task.into_task())); |
166 | RegisterResult::Registered |
167 | } |
168 | } |
169 | } |
170 | } |
171 | |
172 | impl<T> Inner<T> { |
173 | fn remove( |
174 | &mut self, |
175 | mut listener: Pin<&mut Option<Listener<T>>>, |
176 | propagate: bool, |
177 | ) -> Option<State<T>> { |
178 | let entry = unsafe { |
179 | let listener = listener.as_mut().as_pin_mut()?; |
180 | |
181 | // Get the inner pointer. |
182 | &*listener.link.get() |
183 | }; |
184 | |
185 | let prev = entry.prev.get(); |
186 | let next = entry.next.get(); |
187 | |
188 | // Unlink from the previous entry. |
189 | match prev { |
190 | None => self.head = next, |
191 | Some(p) => unsafe { |
192 | p.as_ref().next.set(next); |
193 | }, |
194 | } |
195 | |
196 | // Unlink from the next entry. |
197 | match next { |
198 | None => self.tail = prev, |
199 | Some(n) => unsafe { |
200 | n.as_ref().prev.set(prev); |
201 | }, |
202 | } |
203 | |
204 | // If this was the first unnotified entry, update the next pointer. |
205 | if self.next == Some(entry.into()) { |
206 | self.next = next; |
207 | } |
208 | |
209 | // The entry is now fully unlinked, so we can now take it out safely. |
210 | let entry = unsafe { |
211 | listener |
212 | .get_unchecked_mut() |
213 | .take() |
214 | .unwrap() |
215 | .link |
216 | .into_inner() |
217 | }; |
218 | |
219 | let mut state = entry.state.into_inner(); |
220 | |
221 | // Update the notified count. |
222 | if state.is_notified() { |
223 | self.notified -= 1; |
224 | |
225 | if propagate { |
226 | let state = mem::replace(&mut state, State::NotifiedTaken); |
227 | if let State::Notified { additional, tag } = state { |
228 | let tags = { |
229 | let mut tag = Some(tag); |
230 | move || tag.take().expect("tag already taken" ) |
231 | }; |
232 | self.notify(GenericNotify::new(1, additional, tags)); |
233 | } |
234 | } |
235 | } |
236 | self.len -= 1; |
237 | |
238 | Some(state) |
239 | } |
240 | |
241 | #[cold ] |
242 | fn notify(&mut self, mut notify: impl Notification<Tag = T>) -> usize { |
243 | let mut n = notify.count(Internal::new()); |
244 | let is_additional = notify.is_additional(Internal::new()); |
245 | |
246 | if !is_additional { |
247 | if n < self.notified { |
248 | return 0; |
249 | } |
250 | n -= self.notified; |
251 | } |
252 | |
253 | let original_count = n; |
254 | while n > 0 { |
255 | n -= 1; |
256 | |
257 | // Notify the next entry. |
258 | match self.next { |
259 | None => return original_count - n - 1, |
260 | |
261 | Some(e) => { |
262 | // Get the entry and move the pointer forwards. |
263 | let entry = unsafe { e.as_ref() }; |
264 | self.next = entry.next.get(); |
265 | |
266 | // Set the state to `Notified` and notify. |
267 | let tag = notify.next_tag(Internal::new()); |
268 | if let State::Task(task) = entry.state.replace(State::Notified { |
269 | additional: is_additional, |
270 | tag, |
271 | }) { |
272 | task.wake(); |
273 | } |
274 | |
275 | // Bump the notified count. |
276 | self.notified += 1; |
277 | } |
278 | } |
279 | } |
280 | |
281 | original_count - n |
282 | } |
283 | } |
284 | |
285 | struct ListLock<'a, 'b, T> { |
286 | lock: MutexGuard<'a, Inner<T>>, |
287 | inner: &'b crate::Inner<T>, |
288 | } |
289 | |
290 | impl<T> Deref for ListLock<'_, '_, T> { |
291 | type Target = Inner<T>; |
292 | |
293 | fn deref(&self) -> &Self::Target { |
294 | &self.lock |
295 | } |
296 | } |
297 | |
298 | impl<T> DerefMut for ListLock<'_, '_, T> { |
299 | fn deref_mut(&mut self) -> &mut Self::Target { |
300 | &mut self.lock |
301 | } |
302 | } |
303 | |
304 | impl<T> Drop for ListLock<'_, '_, T> { |
305 | fn drop(&mut self) { |
306 | let list: &mut Inner = &mut **self; |
307 | |
308 | // Update the notified count. |
309 | let notified: usize = if list.notified < list.len { |
310 | list.notified |
311 | } else { |
312 | core::usize::MAX |
313 | }; |
314 | |
315 | self.inner.notified.store(val:notified, order:Ordering::Release); |
316 | } |
317 | } |
318 | |
319 | pub(crate) struct Listener<T> { |
320 | /// The inner link in the linked list. |
321 | /// |
322 | /// # Safety |
323 | /// |
324 | /// This can only be accessed while the central mutex is locked. |
325 | link: UnsafeCell<Link<T>>, |
326 | |
327 | /// This listener cannot be moved after being pinned. |
328 | _pin: PhantomPinned, |
329 | } |
330 | |
331 | struct Link<T> { |
332 | /// The current state of the listener. |
333 | state: Cell<State<T>>, |
334 | |
335 | /// The previous link in the linked list. |
336 | prev: Cell<Option<NonNull<Link<T>>>>, |
337 | |
338 | /// The next link in the linked list. |
339 | next: Cell<Option<NonNull<Link<T>>>>, |
340 | } |
341 | |
342 | #[cfg (test)] |
343 | mod tests { |
344 | use super::*; |
345 | use futures_lite::pin; |
346 | |
347 | #[cfg (target_family = "wasm" )] |
348 | use wasm_bindgen_test::wasm_bindgen_test as test; |
349 | |
350 | macro_rules! make_listeners { |
351 | ($($id:ident),*) => { |
352 | $( |
353 | let $id = Option::<Listener<()>>::None; |
354 | pin!($id); |
355 | )* |
356 | }; |
357 | } |
358 | |
359 | #[test ] |
360 | fn insert() { |
361 | let inner = crate::Inner::new(); |
362 | make_listeners!(listen1, listen2, listen3); |
363 | |
364 | // Register the listeners. |
365 | inner.insert(listen1.as_mut()); |
366 | inner.insert(listen2.as_mut()); |
367 | inner.insert(listen3.as_mut()); |
368 | |
369 | assert_eq!(inner.lock().len, 3); |
370 | |
371 | // Remove one. |
372 | assert_eq!(inner.remove(listen2, false), Some(State::Created)); |
373 | assert_eq!(inner.lock().len, 2); |
374 | |
375 | // Remove another. |
376 | assert_eq!(inner.remove(listen1, false), Some(State::Created)); |
377 | assert_eq!(inner.lock().len, 1); |
378 | } |
379 | |
380 | #[test ] |
381 | fn drop_non_notified() { |
382 | let inner = crate::Inner::new(); |
383 | make_listeners!(listen1, listen2, listen3); |
384 | |
385 | // Register the listeners. |
386 | inner.insert(listen1.as_mut()); |
387 | inner.insert(listen2.as_mut()); |
388 | inner.insert(listen3.as_mut()); |
389 | |
390 | // Notify one. |
391 | inner.notify(GenericNotify::new(1, false, || ())); |
392 | |
393 | // Remove one. |
394 | inner.remove(listen3, true); |
395 | |
396 | // Remove the rest. |
397 | inner.remove(listen1, true); |
398 | inner.remove(listen2, true); |
399 | } |
400 | } |
401 | |