1//! libstd-based implementation of `event-listener`.
2//!
3//! This implementation crates an intrusive linked list of listeners.
4
5use crate::notify::{GenericNotify, Internal, Notification};
6use crate::sync::atomic::Ordering;
7use crate::sync::cell::{Cell, UnsafeCell};
8use crate::sync::{Mutex, MutexGuard};
9use crate::{RegisterResult, State, TaskRef};
10
11use core::marker::PhantomPinned;
12use core::mem;
13use core::ops::{Deref, DerefMut};
14use core::pin::Pin;
15use core::ptr::NonNull;
16
17pub(super) struct List<T>(Mutex<Inner<T>>);
18
19struct Inner<T> {
20 /// The head of the linked list.
21 head: Option<NonNull<Link<T>>>,
22
23 /// The tail of the linked list.
24 tail: Option<NonNull<Link<T>>>,
25
26 /// The first unnotified listener.
27 next: Option<NonNull<Link<T>>>,
28
29 /// Total number of listeners.
30 len: usize,
31
32 /// The number of notified listeners.
33 notified: usize,
34}
35
36impl<T> List<T> {
37 /// Create a new, empty event listener list.
38 pub(super) fn new() -> Self {
39 Self(Mutex::new(Inner {
40 head: None,
41 tail: None,
42 next: None,
43 len: 0,
44 notified: 0,
45 }))
46 }
47 // Accessor method because fields are private, not sure how to go around it
48 pub fn total_listeners(&self) -> Result<usize, &str> {
49 match self.0.try_lock() {
50 Ok(mutex: MutexGuard<'_, Inner>) => {
51 let len: usize = mutex.len;
52 Ok(len)
53 }
54 Err(_) => Err("<locked>"),
55 }
56 }
57}
58
59impl<T> crate::Inner<T> {
60 fn lock(&self) -> ListLock<'_, '_, T> {
61 ListLock {
62 inner: self,
63 lock: self.list.0.lock().unwrap_or_else(|e| e.into_inner()),
64 }
65 }
66
67 /// Whether or not this number of listeners would lead to a notification.
68 pub(crate) fn needs_notification(&self, limit: usize) -> bool {
69 self.notified.load(Ordering::Acquire) < limit
70 }
71
72 /// Add a new listener to the list.
73 pub(crate) fn insert(&self, mut listener: Pin<&mut Option<Listener<T>>>) {
74 let mut inner = self.lock();
75
76 // SAFETY: We are locked, so we can access the inner `link`.
77 let entry = unsafe {
78 listener.as_mut().set(Some(Listener {
79 link: UnsafeCell::new(Link {
80 state: Cell::new(State::Created),
81 prev: Cell::new(inner.tail),
82 next: Cell::new(None),
83 }),
84 _pin: PhantomPinned,
85 }));
86 let listener = listener.as_pin_mut().unwrap();
87
88 // Get the inner pointer.
89 &*listener.link.get()
90 };
91
92 // Replace the tail with the new entry.
93 match mem::replace(&mut inner.tail, Some(entry.into())) {
94 None => inner.head = Some(entry.into()),
95 Some(t) => unsafe { t.as_ref().next.set(Some(entry.into())) },
96 };
97
98 // If there are no unnotified entries, this is the first one.
99 if inner.next.is_none() {
100 inner.next = inner.tail;
101 }
102
103 // Bump the entry count.
104 inner.len += 1;
105 }
106
107 /// Remove a listener from the list.
108 pub(crate) fn remove(
109 &self,
110 listener: Pin<&mut Option<Listener<T>>>,
111 propagate: bool,
112 ) -> Option<State<T>> {
113 self.lock().remove(listener, propagate)
114 }
115
116 /// Notifies a number of entries.
117 #[cold]
118 pub(crate) fn notify(&self, notify: impl Notification<Tag = T>) -> usize {
119 self.lock().notify(notify)
120 }
121
122 /// Register a task to be notified when the event is triggered.
123 ///
124 /// Returns `true` if the listener was already notified, and `false` otherwise. If the listener
125 /// isn't inserted, returns `None`.
126 pub(crate) fn register(
127 &self,
128 mut listener: Pin<&mut Option<Listener<T>>>,
129 task: TaskRef<'_>,
130 ) -> RegisterResult<T> {
131 let mut inner = self.lock();
132
133 // SAFETY: We are locked, so we can access the inner `link`.
134 let entry = unsafe {
135 let listener = match listener.as_mut().as_pin_mut() {
136 Some(listener) => listener,
137 None => return RegisterResult::NeverInserted,
138 };
139 &*listener.link.get()
140 };
141
142 // Take out the state and check it.
143 match entry.state.replace(State::NotifiedTaken) {
144 State::Notified { tag, .. } => {
145 // We have been notified, remove the listener.
146 inner.remove(listener, false);
147 RegisterResult::Notified(tag)
148 }
149
150 State::Task(other_task) => {
151 // Only replace the task if it's different.
152 entry.state.set(State::Task({
153 if !task.will_wake(other_task.as_task_ref()) {
154 task.into_task()
155 } else {
156 other_task
157 }
158 }));
159
160 RegisterResult::Registered
161 }
162
163 _ => {
164 // We have not been notified, register the task.
165 entry.state.set(State::Task(task.into_task()));
166 RegisterResult::Registered
167 }
168 }
169 }
170}
171
172impl<T> Inner<T> {
173 fn remove(
174 &mut self,
175 mut listener: Pin<&mut Option<Listener<T>>>,
176 propagate: bool,
177 ) -> Option<State<T>> {
178 let entry = unsafe {
179 let listener = listener.as_mut().as_pin_mut()?;
180
181 // Get the inner pointer.
182 &*listener.link.get()
183 };
184
185 let prev = entry.prev.get();
186 let next = entry.next.get();
187
188 // Unlink from the previous entry.
189 match prev {
190 None => self.head = next,
191 Some(p) => unsafe {
192 p.as_ref().next.set(next);
193 },
194 }
195
196 // Unlink from the next entry.
197 match next {
198 None => self.tail = prev,
199 Some(n) => unsafe {
200 n.as_ref().prev.set(prev);
201 },
202 }
203
204 // If this was the first unnotified entry, update the next pointer.
205 if self.next == Some(entry.into()) {
206 self.next = next;
207 }
208
209 // The entry is now fully unlinked, so we can now take it out safely.
210 let entry = unsafe {
211 listener
212 .get_unchecked_mut()
213 .take()
214 .unwrap()
215 .link
216 .into_inner()
217 };
218
219 let mut state = entry.state.into_inner();
220
221 // Update the notified count.
222 if state.is_notified() {
223 self.notified -= 1;
224
225 if propagate {
226 let state = mem::replace(&mut state, State::NotifiedTaken);
227 if let State::Notified { additional, tag } = state {
228 let tags = {
229 let mut tag = Some(tag);
230 move || tag.take().expect("tag already taken")
231 };
232 self.notify(GenericNotify::new(1, additional, tags));
233 }
234 }
235 }
236 self.len -= 1;
237
238 Some(state)
239 }
240
241 #[cold]
242 fn notify(&mut self, mut notify: impl Notification<Tag = T>) -> usize {
243 let mut n = notify.count(Internal::new());
244 let is_additional = notify.is_additional(Internal::new());
245
246 if !is_additional {
247 if n < self.notified {
248 return 0;
249 }
250 n -= self.notified;
251 }
252
253 let original_count = n;
254 while n > 0 {
255 n -= 1;
256
257 // Notify the next entry.
258 match self.next {
259 None => return original_count - n - 1,
260
261 Some(e) => {
262 // Get the entry and move the pointer forwards.
263 let entry = unsafe { e.as_ref() };
264 self.next = entry.next.get();
265
266 // Set the state to `Notified` and notify.
267 let tag = notify.next_tag(Internal::new());
268 if let State::Task(task) = entry.state.replace(State::Notified {
269 additional: is_additional,
270 tag,
271 }) {
272 task.wake();
273 }
274
275 // Bump the notified count.
276 self.notified += 1;
277 }
278 }
279 }
280
281 original_count - n
282 }
283}
284
285struct ListLock<'a, 'b, T> {
286 lock: MutexGuard<'a, Inner<T>>,
287 inner: &'b crate::Inner<T>,
288}
289
290impl<T> Deref for ListLock<'_, '_, T> {
291 type Target = Inner<T>;
292
293 fn deref(&self) -> &Self::Target {
294 &self.lock
295 }
296}
297
298impl<T> DerefMut for ListLock<'_, '_, T> {
299 fn deref_mut(&mut self) -> &mut Self::Target {
300 &mut self.lock
301 }
302}
303
304impl<T> Drop for ListLock<'_, '_, T> {
305 fn drop(&mut self) {
306 let list: &mut Inner = &mut **self;
307
308 // Update the notified count.
309 let notified: usize = if list.notified < list.len {
310 list.notified
311 } else {
312 core::usize::MAX
313 };
314
315 self.inner.notified.store(val:notified, order:Ordering::Release);
316 }
317}
318
319pub(crate) struct Listener<T> {
320 /// The inner link in the linked list.
321 ///
322 /// # Safety
323 ///
324 /// This can only be accessed while the central mutex is locked.
325 link: UnsafeCell<Link<T>>,
326
327 /// This listener cannot be moved after being pinned.
328 _pin: PhantomPinned,
329}
330
331struct Link<T> {
332 /// The current state of the listener.
333 state: Cell<State<T>>,
334
335 /// The previous link in the linked list.
336 prev: Cell<Option<NonNull<Link<T>>>>,
337
338 /// The next link in the linked list.
339 next: Cell<Option<NonNull<Link<T>>>>,
340}
341
342#[cfg(test)]
343mod tests {
344 use super::*;
345 use futures_lite::pin;
346
347 #[cfg(target_family = "wasm")]
348 use wasm_bindgen_test::wasm_bindgen_test as test;
349
350 macro_rules! make_listeners {
351 ($($id:ident),*) => {
352 $(
353 let $id = Option::<Listener<()>>::None;
354 pin!($id);
355 )*
356 };
357 }
358
359 #[test]
360 fn insert() {
361 let inner = crate::Inner::new();
362 make_listeners!(listen1, listen2, listen3);
363
364 // Register the listeners.
365 inner.insert(listen1.as_mut());
366 inner.insert(listen2.as_mut());
367 inner.insert(listen3.as_mut());
368
369 assert_eq!(inner.lock().len, 3);
370
371 // Remove one.
372 assert_eq!(inner.remove(listen2, false), Some(State::Created));
373 assert_eq!(inner.lock().len, 2);
374
375 // Remove another.
376 assert_eq!(inner.remove(listen1, false), Some(State::Created));
377 assert_eq!(inner.lock().len, 1);
378 }
379
380 #[test]
381 fn drop_non_notified() {
382 let inner = crate::Inner::new();
383 make_listeners!(listen1, listen2, listen3);
384
385 // Register the listeners.
386 inner.insert(listen1.as_mut());
387 inner.insert(listen2.as_mut());
388 inner.insert(listen3.as_mut());
389
390 // Notify one.
391 inner.notify(GenericNotify::new(1, false, || ()));
392
393 // Remove one.
394 inner.remove(listen3, true);
395
396 // Remove the rest.
397 inner.remove(listen1, true);
398 inner.remove(listen2, true);
399 }
400}
401