1 | //! Intrusive linked list-based implementation of `event-listener`. |
2 | //! |
//! This implementation creates an intrusive linked list of listeners. This list
4 | //! is secured using either a libstd mutex or a critical section. |
5 | |
6 | use crate::notify::{GenericNotify, Internal, Notification}; |
7 | use crate::sync::atomic::Ordering; |
8 | use crate::sync::cell::{Cell, UnsafeCell}; |
9 | use crate::{RegisterResult, State, TaskRef}; |
10 | |
11 | #[cfg (feature = "critical-section" )] |
12 | use core::cell::RefCell; |
13 | #[cfg (all(feature = "std" , not(feature = "critical-section" )))] |
14 | use core::ops::{Deref, DerefMut}; |
15 | |
16 | use core::marker::PhantomPinned; |
17 | use core::mem; |
18 | use core::pin::Pin; |
19 | use core::ptr::NonNull; |
20 | |
/// The listener list together with the lock that guards it.
///
/// Which single field exists is decided at compile time by the `std` and
/// `critical-section` features.
pub(super) struct List<T>(
    /// libstd-based implementation uses a normal Mutex to secure the data.
    #[cfg(all(feature = "std", not(feature = "critical-section")))]
    crate::sync::Mutex<Inner<T>>,
    /// Critical-section-based implementation uses a CS cell that wraps a RefCell.
    #[cfg(feature = "critical-section")]
    critical_section::Mutex<RefCell<Inner<T>>>,
);
29 | |
/// Lock-protected bookkeeping for the intrusive list of listeners.
///
/// The pointer fields refer to `Link`s that live inside pinned `Listener`s;
/// the list does not own that memory.
struct Inner<T> {
    /// The head of the linked list.
    head: Option<NonNull<Link<T>>>,

    /// The tail of the linked list.
    tail: Option<NonNull<Link<T>>>,

    /// The first unnotified listener.
    ///
    /// `None` when no unnotified listener is linked; `insert` then points it at
    /// the newly added entry.
    next: Option<NonNull<Link<T>>>,

    /// Total number of listeners.
    len: usize,

    /// The number of notified listeners.
    notified: usize,
}
46 | |
47 | impl<T> List<T> { |
48 | /// Create a new, empty event listener list. |
49 | pub(super) fn new() -> Self { |
50 | let inner = Inner { |
51 | head: None, |
52 | tail: None, |
53 | next: None, |
54 | len: 0, |
55 | notified: 0, |
56 | }; |
57 | |
58 | #[cfg (feature = "critical-section" )] |
59 | { |
60 | Self(critical_section::Mutex::new(RefCell::new(inner))) |
61 | } |
62 | |
63 | #[cfg (not(feature = "critical-section" ))] |
64 | Self(crate::sync::Mutex::new(inner)) |
65 | } |
66 | |
/// Read the listener count only if the lock can be taken right now.
///
/// Yields `None` whenever `try_lock` returns an error.
#[cfg(all(feature = "std", not(feature = "critical-section")))]
pub(crate) fn try_total_listeners(&self) -> Option<usize> {
    match self.0.try_lock() {
        Ok(guard) => Some(guard.len),
        Err(_) => None,
    }
}
72 | |
/// Read the listener count; the critical-section variant always yields `Some`.
#[cfg(feature = "critical-section")]
pub(crate) fn try_total_listeners(&self) -> Option<usize> {
    let total = self.total_listeners();
    Some(total)
}
78 | |
/// Read the listener count, waiting for the mutex if it is contended.
///
/// A poisoned mutex is recovered from instead of being propagated as a panic.
#[cfg(all(feature = "std", not(feature = "critical-section")))]
pub(crate) fn total_listeners(&self) -> usize {
    let guard = match self.0.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    };
    guard.len
}
84 | |
/// Read the listener count from inside a critical section.
#[cfg(feature = "critical-section")]
// NOTE(review): presumably not every feature combination calls this — confirm.
#[allow(unused)]
pub(crate) fn total_listeners(&self) -> usize {
    critical_section::with(|cs| {
        let cell = self.0.borrow(cs);
        let count = cell.borrow().len;
        count
    })
}
91 | } |
92 | |
93 | impl<T> crate::Inner<T> { |
/// Run `f` with exclusive access to the list while holding the std mutex.
///
/// The list is handed to `f` through a guard that calls `update_notified`
/// when it is dropped, so the shared `notified` counter is refreshed after
/// every locked operation. A poisoned mutex is recovered from.
#[cfg(all(feature = "std", not(feature = "critical-section")))]
fn with_inner<R>(&self, f: impl FnOnce(&mut Inner<T>) -> R) -> R {
    /// Mutex guard paired with its owning `crate::Inner`.
    struct SyncOnDrop<'a, 'b, T> {
        guard: crate::sync::MutexGuard<'a, Inner<T>>,
        owner: &'b crate::Inner<T>,
    }

    impl<T> Drop for SyncOnDrop<'_, '_, T> {
        fn drop(&mut self) {
            // Publish the counter while the mutex is still held.
            update_notified(&self.owner.notified, &self.guard);
        }
    }

    impl<T> Deref for SyncOnDrop<'_, '_, T> {
        type Target = Inner<T>;

        fn deref(&self) -> &Self::Target {
            &self.guard
        }
    }

    impl<T> DerefMut for SyncOnDrop<'_, '_, T> {
        fn deref_mut(&mut self) -> &mut Self::Target {
            &mut self.guard
        }
    }

    let guard = match self.list.0.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    };
    let mut locked = SyncOnDrop { guard, owner: self };
    f(&mut locked)
}
127 | |
/// Run `f` with exclusive access to the list from inside a critical section.
///
/// The list is wrapped in a helper that calls `update_notified` when it is
/// dropped, so the shared `notified` counter is refreshed before the
/// `RefCell` borrow is released.
#[cfg(feature = "critical-section")]
fn with_inner<R>(&self, f: impl FnOnce(&mut Inner<T>) -> R) -> R {
    /// Mutable list borrow paired with its owning `crate::Inner`.
    struct SyncOnDrop<'a, T> {
        owner: &'a crate::Inner<T>,
        list: &'a mut Inner<T>,
    }

    impl<T> Drop for SyncOnDrop<'_, T> {
        fn drop(&mut self) {
            update_notified(&self.owner.notified, self.list);
        }
    }

    critical_section::with(move |cs| {
        let mut borrowed = self.list.0.borrow_ref_mut(cs);
        let scoped = SyncOnDrop {
            owner: self,
            list: &mut *borrowed,
        };

        f(scoped.list)
    })
}
151 | |
152 | /// Add a new listener to the list. |
153 | pub(crate) fn insert(&self, mut listener: Pin<&mut Option<Listener<T>>>) { |
154 | self.with_inner(|inner| { |
155 | listener.as_mut().set(Some(Listener { |
156 | link: UnsafeCell::new(Link { |
157 | state: Cell::new(State::Created), |
158 | prev: Cell::new(inner.tail), |
159 | next: Cell::new(None), |
160 | }), |
161 | _pin: PhantomPinned, |
162 | })); |
163 | let listener = listener.as_pin_mut().unwrap(); |
164 | |
165 | { |
166 | let entry_guard = listener.link.get(); |
167 | // SAFETY: We are locked, so we can access the inner `link`. |
168 | let entry = unsafe { entry_guard.deref() }; |
169 | |
170 | // Replace the tail with the new entry. |
171 | match mem::replace(&mut inner.tail, Some(entry.into())) { |
172 | None => inner.head = Some(entry.into()), |
173 | Some(t) => unsafe { t.as_ref().next.set(Some(entry.into())) }, |
174 | }; |
175 | } |
176 | |
177 | // If there are no unnotified entries, this is the first one. |
178 | if inner.next.is_none() { |
179 | inner.next = inner.tail; |
180 | } |
181 | |
182 | // Bump the entry count. |
183 | inner.len += 1; |
184 | }); |
185 | } |
186 | |
187 | /// Remove a listener from the list. |
188 | pub(crate) fn remove( |
189 | &self, |
190 | listener: Pin<&mut Option<Listener<T>>>, |
191 | propagate: bool, |
192 | ) -> Option<State<T>> { |
193 | self.with_inner(|inner| inner.remove(listener, propagate)) |
194 | } |
195 | |
196 | /// Notifies a number of entries. |
197 | #[cold ] |
198 | pub(crate) fn notify(&self, notify: impl Notification<Tag = T>) -> usize { |
199 | self.with_inner(|inner| inner.notify(notify)) |
200 | } |
201 | |
202 | /// Register a task to be notified when the event is triggered. |
203 | /// |
204 | /// Returns `true` if the listener was already notified, and `false` otherwise. If the listener |
205 | /// isn't inserted, returns `None`. |
206 | pub(crate) fn register( |
207 | &self, |
208 | mut listener: Pin<&mut Option<Listener<T>>>, |
209 | task: TaskRef<'_>, |
210 | ) -> RegisterResult<T> { |
211 | self.with_inner(|inner| { |
212 | let entry_guard = match listener.as_mut().as_pin_mut() { |
213 | Some(listener) => listener.link.get(), |
214 | None => return RegisterResult::NeverInserted, |
215 | }; |
216 | // SAFETY: We are locked, so we can access the inner `link`. |
217 | let entry = unsafe { entry_guard.deref() }; |
218 | |
219 | // Take out the state and check it. |
220 | match entry.state.replace(State::NotifiedTaken) { |
221 | State::Notified { tag, .. } => { |
222 | // We have been notified, remove the listener. |
223 | inner.remove(listener, false); |
224 | RegisterResult::Notified(tag) |
225 | } |
226 | |
227 | State::Task(other_task) => { |
228 | // Only replace the task if it's different. |
229 | entry.state.set(State::Task({ |
230 | if !task.will_wake(other_task.as_task_ref()) { |
231 | task.into_task() |
232 | } else { |
233 | other_task |
234 | } |
235 | })); |
236 | |
237 | RegisterResult::Registered |
238 | } |
239 | |
240 | _ => { |
241 | // We have not been notified, register the task. |
242 | entry.state.set(State::Task(task.into_task())); |
243 | RegisterResult::Registered |
244 | } |
245 | } |
246 | }) |
247 | } |
248 | } |
249 | |
250 | impl<T> Inner<T> { |
251 | fn remove( |
252 | &mut self, |
253 | mut listener: Pin<&mut Option<Listener<T>>>, |
254 | propagate: bool, |
255 | ) -> Option<State<T>> { |
256 | let entry_guard = listener.as_mut().as_pin_mut()?.link.get(); |
257 | let entry = unsafe { entry_guard.deref() }; |
258 | |
259 | let prev = entry.prev.get(); |
260 | let next = entry.next.get(); |
261 | |
262 | // Unlink from the previous entry. |
263 | match prev { |
264 | None => self.head = next, |
265 | Some(p) => unsafe { |
266 | p.as_ref().next.set(next); |
267 | }, |
268 | } |
269 | |
270 | // Unlink from the next entry. |
271 | match next { |
272 | None => self.tail = prev, |
273 | Some(n) => unsafe { |
274 | n.as_ref().prev.set(prev); |
275 | }, |
276 | } |
277 | |
278 | // If this was the first unnotified entry, update the next pointer. |
279 | if self.next == Some(entry.into()) { |
280 | self.next = next; |
281 | } |
282 | |
283 | // The entry is now fully unlinked, so we can now take it out safely. |
284 | let entry = unsafe { |
285 | listener |
286 | .get_unchecked_mut() |
287 | .take() |
288 | .unwrap() |
289 | .link |
290 | .into_inner() |
291 | }; |
292 | |
293 | // This State::Created is immediately dropped and exists as a workaround for the absence of |
294 | // loom::cell::Cell::into_inner. The intent is `let mut state = entry.state.into_inner();` |
295 | // |
296 | // refs: https://github.com/tokio-rs/loom/pull/341 |
297 | let mut state = entry.state.replace(State::Created); |
298 | |
299 | // Update the notified count. |
300 | if state.is_notified() { |
301 | self.notified -= 1; |
302 | |
303 | if propagate { |
304 | let state = mem::replace(&mut state, State::NotifiedTaken); |
305 | if let State::Notified { additional, tag } = state { |
306 | let tags = { |
307 | let mut tag = Some(tag); |
308 | move || tag.take().expect("tag already taken" ) |
309 | }; |
310 | self.notify(GenericNotify::new(1, additional, tags)); |
311 | } |
312 | } |
313 | } |
314 | self.len -= 1; |
315 | |
316 | Some(state) |
317 | } |
318 | |
319 | #[cold ] |
320 | fn notify(&mut self, mut notify: impl Notification<Tag = T>) -> usize { |
321 | let mut n = notify.count(Internal::new()); |
322 | let is_additional = notify.is_additional(Internal::new()); |
323 | |
324 | if !is_additional { |
325 | if n < self.notified { |
326 | return 0; |
327 | } |
328 | n -= self.notified; |
329 | } |
330 | |
331 | let original_count = n; |
332 | while n > 0 { |
333 | n -= 1; |
334 | |
335 | // Notify the next entry. |
336 | match self.next { |
337 | None => return original_count - n - 1, |
338 | |
339 | Some(e) => { |
340 | // Get the entry and move the pointer forwards. |
341 | let entry = unsafe { e.as_ref() }; |
342 | self.next = entry.next.get(); |
343 | |
344 | // Set the state to `Notified` and notify. |
345 | let tag = notify.next_tag(Internal::new()); |
346 | if let State::Task(task) = entry.state.replace(State::Notified { |
347 | additional: is_additional, |
348 | tag, |
349 | }) { |
350 | task.wake(); |
351 | } |
352 | |
353 | // Bump the notified count. |
354 | self.notified += 1; |
355 | } |
356 | } |
357 | } |
358 | |
359 | original_count - n |
360 | } |
361 | } |
362 | |
363 | fn update_notified<T>(slot: &crate::sync::atomic::AtomicUsize, list: &Inner<T>) { |
364 | // Update the notified count. |
365 | let notified: usize = if list.notified < list.len { |
366 | list.notified |
367 | } else { |
368 | usize::MAX |
369 | }; |
370 | |
371 | slot.store(val:notified, order:Ordering::Release); |
372 | } |
373 | |
/// One entry of the intrusive list, stored in a caller-provided pinned slot.
pub(crate) struct Listener<T> {
    /// The inner link in the linked list.
    ///
    /// # Safety
    ///
    /// This can only be accessed while the central mutex is locked.
    link: UnsafeCell<Link<T>>,

    /// This listener cannot be moved after being pinned.
    ///
    /// The list keeps raw `NonNull` pointers to `link`, so the value has to
    /// stay at a fixed address while it is linked.
    _pin: PhantomPinned,
}
385 | |
/// The node stored inside each `Listener`: its state plus both neighbour pointers.
///
/// All fields are `Cell`s, so they can be updated through the shared
/// references obtained from the list's raw pointers.
struct Link<T> {
    /// The current state of the listener.
    state: Cell<State<T>>,

    /// The previous link in the linked list.
    prev: Cell<Option<NonNull<Link<T>>>>,

    /// The next link in the linked list.
    next: Cell<Option<NonNull<Link<T>>>>,
}
396 | |
#[cfg(test)]
mod tests {
    use super::*;
    use futures_lite::pin;

    #[cfg(target_family = "wasm")]
    use wasm_bindgen_test::wasm_bindgen_test as test;

    /// Declare each identifier as an empty, pinned listener slot.
    macro_rules! make_listeners {
        ($($id:ident),*) => {
            $(
                let $id = Option::<Listener<()>>::None;
                pin!($id);
            )*
        };
    }

    /// Inserting and removing listeners keeps the reported count in step.
    #[test]
    fn insert() {
        let event = crate::Inner::new();
        make_listeners!(first, second, third);

        // Link all three slots into the list.
        event.insert(first.as_mut());
        event.insert(second.as_mut());
        event.insert(third.as_mut());
        assert_eq!(event.list.try_total_listeners(), Some(3));

        // Unlinking the middle entry leaves two.
        assert_eq!(event.remove(second, false), Some(State::Created));
        assert_eq!(event.list.try_total_listeners(), Some(2));

        // Unlinking the head leaves one.
        assert_eq!(event.remove(first, false), Some(State::Created));
        assert_eq!(event.list.try_total_listeners(), Some(1));
    }

    /// Removing listeners with propagation after a single notification must not panic.
    #[test]
    fn drop_non_notified() {
        let event = crate::Inner::new();
        make_listeners!(first, second, third);

        // Link all three slots into the list.
        event.insert(first.as_mut());
        event.insert(second.as_mut());
        event.insert(third.as_mut());

        // Deliver exactly one non-additional notification.
        event.notify(GenericNotify::new(1, false, || ()));

        // Remove the last listener first.
        event.remove(third, true);

        // Then the remaining two, front to back.
        event.remove(first, true);
        event.remove(second, true);
    }
}
456 | |