1//! Notify async tasks or threads.
2//!
3//! This is a synchronization primitive similar to [eventcounts] invented by Dmitry Vyukov.
4//!
5//! You can use this crate to turn non-blocking data structures into async or blocking data
6//! structures. See a [simple mutex] implementation that exposes an async and a blocking interface
7//! for acquiring locks.
8//!
9//! [eventcounts]: https://www.1024cores.net/home/lock-free-algorithms/eventcounts
10//! [simple mutex]: https://github.com/smol-rs/event-listener/blob/master/examples/mutex.rs
11//!
12//! # Examples
13//!
14//! Wait until another thread sets a boolean flag:
15//!
16//! ```
17//! use std::sync::atomic::{AtomicBool, Ordering};
18//! use std::sync::Arc;
19//! use std::thread;
20//! use std::time::Duration;
21//! use std::usize;
22//! use event_listener::{Event, Listener};
23//!
24//! let flag = Arc::new(AtomicBool::new(false));
25//! let event = Arc::new(Event::new());
26//!
27//! // Spawn a thread that will set the flag after 1 second.
28//! thread::spawn({
29//! let flag = flag.clone();
30//! let event = event.clone();
31//! move || {
32//! // Wait for a second.
33//! thread::sleep(Duration::from_secs(1));
34//!
35//! // Set the flag.
36//! flag.store(true, Ordering::SeqCst);
37//!
38//! // Notify all listeners that the flag has been set.
39//! event.notify(usize::MAX);
40//! }
41//! });
42//!
43//! // Wait until the flag is set.
44//! loop {
45//! // Check the flag.
46//! if flag.load(Ordering::SeqCst) {
47//! break;
48//! }
49//!
50//! // Start listening for events.
51//! let mut listener = event.listen();
52//!
53//! // Check the flag again after creating the listener.
54//! if flag.load(Ordering::SeqCst) {
55//! break;
56//! }
57//!
58//! // Wait for a notification and continue the loop.
59//! listener.wait();
60//! }
61//! ```
62//!
63//! # Features
64//!
65//! - The `portable-atomic` feature enables the use of the [`portable-atomic`] crate to provide
66//! atomic operations on platforms that don't support them.
67//!
68//! [`portable-atomic`]: https://crates.io/crates/portable-atomic
69
70#![cfg_attr(not(feature = "std"), no_std)]
71#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
72#![doc(
73 html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
74)]
75#![doc(
76 html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
77)]
78
79#[cfg(not(feature = "std"))]
80extern crate alloc;
81#[cfg(feature = "std")]
82extern crate std as alloc;
83
84#[cfg_attr(feature = "std", path = "std.rs")]
85#[cfg_attr(not(feature = "std"), path = "no_std.rs")]
86mod sys;
87
88mod notify;
89
90#[cfg(not(feature = "std"))]
91use alloc::boxed::Box;
92
93use core::borrow::Borrow;
94use core::fmt;
95use core::future::Future;
96use core::mem::ManuallyDrop;
97use core::pin::Pin;
98use core::ptr;
99use core::task::{Context, Poll, Waker};
100
101#[cfg(all(feature = "std", not(target_family = "wasm")))]
102use {
103 parking::{Parker, Unparker},
104 std::time::{Duration, Instant},
105};
106
107use sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
108use sync::{Arc, WithMut};
109
110use notify::{Internal, NotificationPrivate};
111pub use notify::{IntoNotification, Notification};
112
/// Inner state of [`Event`].
///
/// The type parameter `T` is the tag type of the owning [`Event`]. An `Event` points at this
/// state through a lazily initialized pointer, and every listener created by
/// [`Event::listen()`] holds its own `Arc` clone of it.
struct Inner<T> {
    /// The number of notified entries, or `usize::MAX` if all of them have been notified.
    ///
    /// If there are no entries, this value is set to `usize::MAX`.
    notified: AtomicUsize,

    /// Inner queue of event listeners.
    ///
    /// On `std` platforms, this is an intrusive linked list. On `no_std` platforms, this is a
    /// more traditional `Vec` of listeners, with an atomic queue used as a backup for high
    /// contention.
    list: sys::List<T>,
}
127
128impl<T> Inner<T> {
129 fn new() -> Self {
130 Self {
131 notified: AtomicUsize::new(core::usize::MAX),
132 list: sys::List::new(),
133 }
134 }
135}
136
/// A synchronization primitive for notifying async tasks and threads.
///
/// Listeners can be registered using [`Event::listen()`]. There are two ways to notify listeners:
///
/// 1. [`Event::notify()`] notifies a number of listeners.
/// 2. [`Event::notify_additional()`] notifies a number of previously unnotified listeners.
///
/// If there are no active listeners at the time a notification is sent, it simply gets lost.
///
/// There are two ways for a listener to wait for a notification:
///
/// 1. In an asynchronous manner using `.await`.
/// 2. In a blocking manner by calling [`EventListener::wait()`] on it.
///
/// If a notified listener is dropped without receiving a notification, dropping will notify
/// another active listener. Whether one *additional* listener will be notified depends on what
/// kind of notification was delivered.
///
/// Listeners are registered and notified in the first-in first-out fashion, ensuring fairness.
///
/// The type parameter `T` is the tag type that a listener resolves to when it is notified. It
/// defaults to `()`, which is the untagged event created by [`Event::new()`].
pub struct Event<T = ()> {
    /// A pointer to heap-allocated inner state.
    ///
    /// This pointer is initially null and gets lazily initialized on first use. Semantically, it
    /// is an `Arc<Inner>` so it's important to keep in mind that it contributes to the [`Arc`]'s
    /// reference count.
    inner: AtomicPtr<Inner<T>>,
}
164
// `Event<T>` is `Send` and `Sync` only when the tag type `T` is `Send`.
// NOTE(review): the soundness argument depends on how `Inner<T>` / `sys::List<T>` synchronize
// access to tags, which is not visible from here — confirm against those definitions.
unsafe impl<T: Send> Send for Event<T> {}
unsafe impl<T: Send> Sync for Event<T> {}

// `Event<T>` is declared unwind-safe for every `T`, with no bound on the tag type.
impl<T> core::panic::UnwindSafe for Event<T> {}
impl<T> core::panic::RefUnwindSafe for Event<T> {}
170
171impl<T> fmt::Debug for Event<T> {
172 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
173 match self.try_inner() {
174 Some(inner) => {
175 let notified_count = inner.notified.load(Ordering::Relaxed);
176 let total_count = match inner.list.try_total_listeners() {
177 Some(total_count) => total_count,
178 None => {
179 return f
180 .debug_tuple("Event")
181 .field(&format_args!("<locked>"))
182 .finish()
183 }
184 };
185
186 f.debug_struct("Event")
187 .field("listeners_notified", &notified_count)
188 .field("listeners_total", &total_count)
189 .finish()
190 }
191 None => f
192 .debug_tuple("Event")
193 .field(&format_args!("<uninitialized>"))
194 .finish(),
195 }
196 }
197}
198
199impl Default for Event {
200 #[inline]
201 fn default() -> Self {
202 Self::new()
203 }
204}
205
206impl<T> Event<T> {
207 /// Creates a new `Event` with a tag type.
208 ///
209 /// Tagging cannot be implemented efficiently on `no_std`, so this is only available when the
210 /// `std` feature is enabled.
211 ///
212 /// # Examples
213 ///
214 /// ```
215 /// use event_listener::Event;
216 ///
217 /// let event = Event::<usize>::with_tag();
218 /// ```
219 #[cfg(feature = "std")]
220 #[inline]
221 pub const fn with_tag() -> Self {
222 Self {
223 inner: AtomicPtr::new(ptr::null_mut()),
224 }
225 }
226
227 /// Tell whether any listeners are currently notified.
228 ///
229 /// # Examples
230 ///
231 /// ```
232 /// use event_listener::{Event, Listener};
233 ///
234 /// let event = Event::new();
235 /// let listener = event.listen();
236 /// assert!(!event.is_notified());
237 ///
238 /// event.notify(1);
239 /// assert!(event.is_notified());
240 /// ```
241 #[inline]
242 pub fn is_notified(&self) -> bool {
243 self.try_inner()
244 .map_or(false, |inner| inner.notified.load(Ordering::Acquire) > 0)
245 }
246
247 /// Returns a guard listening for a notification.
248 ///
249 /// This method emits a `SeqCst` fence after registering a listener. For now, this method
250 /// is an alias for calling [`EventListener::new()`], pinning it to the heap, and then
251 /// inserting it into a list.
252 ///
253 /// # Examples
254 ///
255 /// ```
256 /// use event_listener::Event;
257 ///
258 /// let event = Event::new();
259 /// let listener = event.listen();
260 /// ```
261 ///
262 /// # Caveats
263 ///
264 /// The above example is equivalent to this code:
265 ///
266 /// ```no_compile
267 /// use event_listener::{Event, EventListener};
268 ///
269 /// let event = Event::new();
270 /// let mut listener = Box::pin(EventListener::new());
271 /// listener.listen(&event);
272 /// ```
273 ///
274 /// It creates a new listener, pins it to the heap, and inserts it into the linked list
275 /// of listeners. While this type of usage is simple, it may be desired to eliminate this
276 /// heap allocation. In this case, consider using the [`EventListener::new`] constructor
277 /// directly, which allows for greater control over where the [`EventListener`] is
278 /// allocated. However, users of this `new` method must be careful to ensure that the
279 /// [`EventListener`] is `listen`ing before waiting on it; panics may occur otherwise.
280 #[cold]
281 pub fn listen(&self) -> EventListener<T> {
282 let inner = ManuallyDrop::new(unsafe { Arc::from_raw(self.inner()) });
283
284 // Allocate the listener on the heap and insert it.
285 let mut listener = Box::pin(InnerListener {
286 event: Arc::clone(&inner),
287 listener: None,
288 });
289 listener.as_mut().listen();
290
291 // Return the listener.
292 EventListener { listener }
293 }
294
295 /// Notifies a number of active listeners.
296 ///
297 /// The number is allowed to be zero or exceed the current number of listeners.
298 ///
299 /// The [`Notification`] trait is used to define what kind of notification is delivered.
300 /// The default implementation (implemented on `usize`) is a notification that only notifies
301 /// *at least* the specified number of listeners.
302 ///
303 /// In certain cases, this function emits a `SeqCst` fence before notifying listeners.
304 ///
305 /// This function returns the number of [`EventListener`]s that were notified by this call.
306 ///
307 /// # Caveats
308 ///
309 /// If the `std` feature is disabled, the notification will be delayed under high contention,
310 /// such as when another thread is taking a while to `notify` the event. In this circumstance,
311 /// this function will return `0` instead of the number of listeners actually notified. Therefore
312 /// if the `std` feature is disabled the return value of this function should not be relied upon
313 /// for soundness and should be used only as a hint.
314 ///
315 /// If the `std` feature is enabled, no spurious returns are possible, since the `std`
316 /// implementation uses system locking primitives to ensure there is no unavoidable
317 /// contention.
318 ///
319 /// # Examples
320 ///
321 /// Use the default notification strategy:
322 ///
323 /// ```
324 /// use event_listener::Event;
325 ///
326 /// let event = Event::new();
327 ///
328 /// // This notification gets lost because there are no listeners.
329 /// event.notify(1);
330 ///
331 /// let listener1 = event.listen();
332 /// let listener2 = event.listen();
333 /// let listener3 = event.listen();
334 ///
335 /// // Notifies two listeners.
336 /// //
337 /// // Listener queueing is fair, which means `listener1` and `listener2`
338 /// // get notified here since they start listening before `listener3`.
339 /// event.notify(2);
340 /// ```
341 ///
342 /// Notify without emitting a `SeqCst` fence. This uses the [`relaxed`] notification strategy.
343 /// This is equivalent to calling [`Event::notify_relaxed()`].
344 ///
345 /// [`relaxed`]: IntoNotification::relaxed
346 ///
347 /// ```
348 /// use event_listener::{IntoNotification, Event};
349 /// use std::sync::atomic::{self, Ordering};
350 ///
351 /// let event = Event::new();
352 ///
353 /// // This notification gets lost because there are no listeners.
354 /// event.notify(1.relaxed());
355 ///
356 /// let listener1 = event.listen();
357 /// let listener2 = event.listen();
358 /// let listener3 = event.listen();
359 ///
360 /// // We should emit a fence manually when using relaxed notifications.
361 /// atomic::fence(Ordering::SeqCst);
362 ///
363 /// // Notifies two listeners.
364 /// //
365 /// // Listener queueing is fair, which means `listener1` and `listener2`
366 /// // get notified here since they start listening before `listener3`.
367 /// event.notify(2.relaxed());
368 /// ```
369 ///
370 /// Notify additional listeners. In contrast to [`Event::notify()`], this method will notify `n`
371 /// *additional* listeners that were previously unnotified. This uses the [`additional`]
372 /// notification strategy. This is equivalent to calling [`Event::notify_additional()`].
373 ///
374 /// [`additional`]: IntoNotification::additional
375 ///
376 /// ```
377 /// use event_listener::{IntoNotification, Event};
378 ///
379 /// let event = Event::new();
380 ///
381 /// // This notification gets lost because there are no listeners.
382 /// event.notify(1.additional());
383 ///
384 /// let listener1 = event.listen();
385 /// let listener2 = event.listen();
386 /// let listener3 = event.listen();
387 ///
388 /// // Notifies two listeners.
389 /// //
390 /// // Listener queueing is fair, which means `listener1` and `listener2`
391 /// // get notified here since they start listening before `listener3`.
392 /// event.notify(1.additional());
393 /// event.notify(1.additional());
394 /// ```
395 ///
396 /// Notifies with the [`additional`] and [`relaxed`] strategies at the same time. This is
397 /// equivalent to calling [`Event::notify_additional_relaxed()`].
398 ///
399 /// ```
400 /// use event_listener::{IntoNotification, Event};
401 /// use std::sync::atomic::{self, Ordering};
402 ///
403 /// let event = Event::new();
404 ///
405 /// // This notification gets lost because there are no listeners.
406 /// event.notify(1.additional().relaxed());
407 ///
408 /// let listener1 = event.listen();
409 /// let listener2 = event.listen();
410 /// let listener3 = event.listen();
411 ///
412 /// // We should emit a fence manually when using relaxed notifications.
413 /// atomic::fence(Ordering::SeqCst);
414 ///
415 /// // Notifies two listeners.
416 /// //
417 /// // Listener queueing is fair, which means `listener1` and `listener2`
418 /// // get notified here since they start listening before `listener3`.
419 /// event.notify(1.additional().relaxed());
420 /// event.notify(1.additional().relaxed());
421 /// ```
422 #[inline]
423 pub fn notify(&self, notify: impl IntoNotification<Tag = T>) -> usize {
424 let notify = notify.into_notification();
425
426 // Make sure the notification comes after whatever triggered it.
427 notify.fence(notify::Internal::new());
428
429 if let Some(inner) = self.try_inner() {
430 let limit = if notify.is_additional(Internal::new()) {
431 core::usize::MAX
432 } else {
433 notify.count(Internal::new())
434 };
435
436 // Notify if there is at least one unnotified listener and the number of notified
437 // listeners is less than `limit`.
438 if inner.needs_notification(limit) {
439 return inner.notify(notify);
440 }
441 }
442
443 0
444 }
445
446 /// Return a reference to the inner state if it has been initialized.
447 #[inline]
448 fn try_inner(&self) -> Option<&Inner<T>> {
449 let inner = self.inner.load(Ordering::Acquire);
450 unsafe { inner.as_ref() }
451 }
452
453 /// Returns a raw, initialized pointer to the inner state.
454 ///
455 /// This returns a raw pointer instead of reference because `from_raw`
456 /// requires raw/mut provenance: <https://github.com/rust-lang/rust/pull/67339>.
457 fn inner(&self) -> *const Inner<T> {
458 let mut inner = self.inner.load(Ordering::Acquire);
459
460 // If this is the first use, initialize the state.
461 if inner.is_null() {
462 // Allocate the state on the heap.
463 let new = Arc::new(Inner::<T>::new());
464
465 // Convert the state to a raw pointer.
466 let new = Arc::into_raw(new) as *mut Inner<T>;
467
468 // Replace the null pointer with the new state pointer.
469 inner = self
470 .inner
471 .compare_exchange(inner, new, Ordering::AcqRel, Ordering::Acquire)
472 .unwrap_or_else(|x| x);
473
474 // Check if the old pointer value was indeed null.
475 if inner.is_null() {
476 // If yes, then use the new state pointer.
477 inner = new;
478 } else {
479 // If not, that means a concurrent operation has initialized the state.
480 // In that case, use the old pointer and deallocate the new one.
481 unsafe {
482 drop(Arc::from_raw(new));
483 }
484 }
485 }
486
487 inner
488 }
489
490 /// Get the number of listeners currently listening to this [`Event`].
491 ///
492 /// This call returns the number of [`EventListener`]s that are currently listening to
493 /// this event. It does this by acquiring the internal event lock and reading the listener
494 /// count. Therefore it is only available for `std`-enabled platforms.
495 ///
496 /// # Caveats
497 ///
498 /// This function returns just a snapshot of the number of listeners at this point in time.
499 /// Due to the nature of multi-threaded CPUs, it is possible that this number will be
500 /// inaccurate by the time that this function returns.
501 ///
502 /// It is possible for the actual number to change at any point. Therefore, the number should
503 /// only ever be used as a hint.
504 ///
505 /// # Examples
506 ///
507 /// ```
508 /// use event_listener::Event;
509 ///
510 /// let event = Event::new();
511 ///
512 /// assert_eq!(event.total_listeners(), 0);
513 ///
514 /// let listener1 = event.listen();
515 /// assert_eq!(event.total_listeners(), 1);
516 ///
517 /// let listener2 = event.listen();
518 /// assert_eq!(event.total_listeners(), 2);
519 ///
520 /// drop(listener1);
521 /// drop(listener2);
522 /// assert_eq!(event.total_listeners(), 0);
523 /// ```
524 #[cfg(feature = "std")]
525 #[inline]
526 pub fn total_listeners(&self) -> usize {
527 if let Some(inner) = self.try_inner() {
528 inner.list.total_listeners()
529 } else {
530 0
531 }
532 }
533}
534
535impl Event<()> {
536 /// Creates a new [`Event`].
537 ///
538 /// # Examples
539 ///
540 /// ```
541 /// use event_listener::Event;
542 ///
543 /// let event = Event::new();
544 /// ```
545 #[inline]
546 pub const fn new() -> Self {
547 Self {
548 inner: AtomicPtr::new(ptr::null_mut()),
549 }
550 }
551
552 /// Notifies a number of active listeners without emitting a `SeqCst` fence.
553 ///
554 /// The number is allowed to be zero or exceed the current number of listeners.
555 ///
556 /// In contrast to [`Event::notify_additional()`], this method only makes sure *at least* `n`
557 /// listeners among the active ones are notified.
558 ///
559 /// Unlike [`Event::notify()`], this method does not emit a `SeqCst` fence.
560 ///
561 /// This method only works for untagged events. In other cases, it is recommended to instead
562 /// use [`Event::notify()`] like so:
563 ///
564 /// ```
565 /// use event_listener::{IntoNotification, Event};
566 /// let event = Event::new();
567 ///
568 /// // Old way:
569 /// event.notify_relaxed(1);
570 ///
571 /// // New way:
572 /// event.notify(1.relaxed());
573 /// ```
574 ///
575 /// # Examples
576 ///
577 /// ```
578 /// use event_listener::{Event, IntoNotification};
579 /// use std::sync::atomic::{self, Ordering};
580 ///
581 /// let event = Event::new();
582 ///
583 /// // This notification gets lost because there are no listeners.
584 /// event.notify_relaxed(1);
585 ///
586 /// let listener1 = event.listen();
587 /// let listener2 = event.listen();
588 /// let listener3 = event.listen();
589 ///
590 /// // We should emit a fence manually when using relaxed notifications.
591 /// atomic::fence(Ordering::SeqCst);
592 ///
593 /// // Notifies two listeners.
594 /// //
595 /// // Listener queueing is fair, which means `listener1` and `listener2`
596 /// // get notified here since they start listening before `listener3`.
597 /// event.notify_relaxed(2);
598 /// ```
599 #[inline]
600 pub fn notify_relaxed(&self, n: usize) -> usize {
601 self.notify(n.relaxed())
602 }
603
604 /// Notifies a number of active and still unnotified listeners.
605 ///
606 /// The number is allowed to be zero or exceed the current number of listeners.
607 ///
608 /// In contrast to [`Event::notify()`], this method will notify `n` *additional* listeners that
609 /// were previously unnotified.
610 ///
611 /// This method emits a `SeqCst` fence before notifying listeners.
612 ///
613 /// This method only works for untagged events. In other cases, it is recommended to instead
614 /// use [`Event::notify()`] like so:
615 ///
616 /// ```
617 /// use event_listener::{IntoNotification, Event};
618 /// let event = Event::new();
619 ///
620 /// // Old way:
621 /// event.notify_additional(1);
622 ///
623 /// // New way:
624 /// event.notify(1.additional());
625 /// ```
626 ///
627 /// # Examples
628 ///
629 /// ```
630 /// use event_listener::Event;
631 ///
632 /// let event = Event::new();
633 ///
634 /// // This notification gets lost because there are no listeners.
635 /// event.notify_additional(1);
636 ///
637 /// let listener1 = event.listen();
638 /// let listener2 = event.listen();
639 /// let listener3 = event.listen();
640 ///
641 /// // Notifies two listeners.
642 /// //
643 /// // Listener queueing is fair, which means `listener1` and `listener2`
644 /// // get notified here since they start listening before `listener3`.
645 /// event.notify_additional(1);
646 /// event.notify_additional(1);
647 /// ```
648 #[inline]
649 pub fn notify_additional(&self, n: usize) -> usize {
650 self.notify(n.additional())
651 }
652
653 /// Notifies a number of active and still unnotified listeners without emitting a `SeqCst`
654 /// fence.
655 ///
656 /// The number is allowed to be zero or exceed the current number of listeners.
657 ///
658 /// In contrast to [`Event::notify()`], this method will notify `n` *additional* listeners that
659 /// were previously unnotified.
660 ///
661 /// Unlike [`Event::notify_additional()`], this method does not emit a `SeqCst` fence.
662 ///
663 /// This method only works for untagged events. In other cases, it is recommended to instead
664 /// use [`Event::notify()`] like so:
665 ///
666 /// ```
667 /// use event_listener::{IntoNotification, Event};
668 /// let event = Event::new();
669 ///
670 /// // Old way:
671 /// event.notify_additional_relaxed(1);
672 ///
673 /// // New way:
674 /// event.notify(1.additional().relaxed());
675 /// ```
676 ///
677 /// # Examples
678 ///
679 /// ```
680 /// use event_listener::Event;
681 /// use std::sync::atomic::{self, Ordering};
682 ///
683 /// let event = Event::new();
684 ///
685 /// // This notification gets lost because there are no listeners.
686 /// event.notify(1);
687 ///
688 /// let listener1 = event.listen();
689 /// let listener2 = event.listen();
690 /// let listener3 = event.listen();
691 ///
692 /// // We should emit a fence manually when using relaxed notifications.
693 /// atomic::fence(Ordering::SeqCst);
694 ///
695 /// // Notifies two listeners.
696 /// //
697 /// // Listener queueing is fair, which means `listener1` and `listener2`
698 /// // get notified here since they start listening before `listener3`.
699 /// event.notify_additional_relaxed(1);
700 /// event.notify_additional_relaxed(1);
701 /// ```
702 #[inline]
703 pub fn notify_additional_relaxed(&self, n: usize) -> usize {
704 self.notify(n.additional().relaxed())
705 }
706}
707
708impl<T> Drop for Event<T> {
709 #[inline]
710 fn drop(&mut self) {
711 self.inner.with_mut(|&mut inner: *mut Inner| {
712 // If the state pointer has been initialized, drop it.
713 if !inner.is_null() {
714 unsafe {
715 drop(Arc::from_raw(ptr:inner));
716 }
717 }
718 })
719 }
720}
721
/// A handle that is listening to an [`Event`].
///
/// This trait represents a type waiting for a notification from an [`Event`]. See the
/// [`EventListener`] type for more documentation on this trait's usage.
///
/// The trait is sealed (it requires `__sealed::Sealed`). As a [`Future`], a listener resolves
/// to a value of type `T`, the tag type of the event.
pub trait Listener<T = ()>: Future<Output = T> + __sealed::Sealed {
    /// Blocks until a notification is received.
    ///
    /// Returns the value of type `T` delivered with the notification.
    ///
    /// # Examples
    ///
    /// ```
    /// use event_listener::{Event, Listener};
    ///
    /// let event = Event::new();
    /// let mut listener = event.listen();
    ///
    /// // Notify `listener`.
    /// event.notify(1);
    ///
    /// // Receive the notification.
    /// listener.wait();
    /// ```
    #[cfg(all(feature = "std", not(target_family = "wasm")))]
    fn wait(self) -> T;

    /// Blocks until a notification is received or a timeout is reached.
    ///
    /// Returns `Some` with the value delivered by the notification if one was received, or
    /// `None` if the timeout was reached first.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::time::Duration;
    /// use event_listener::{Event, Listener};
    ///
    /// let event = Event::new();
    /// let mut listener = event.listen();
    ///
    /// // There are no notifications, so this times out.
    /// assert!(listener.wait_timeout(Duration::from_secs(1)).is_none());
    /// ```
    #[cfg(all(feature = "std", not(target_family = "wasm")))]
    fn wait_timeout(self, timeout: Duration) -> Option<T>;

    /// Blocks until a notification is received or a deadline is reached.
    ///
    /// Returns `Some` with the value delivered by the notification if one was received, or
    /// `None` if the deadline was reached first.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::time::{Duration, Instant};
    /// use event_listener::{Event, Listener};
    ///
    /// let event = Event::new();
    /// let mut listener = event.listen();
    ///
    /// // There are no notifications, so this times out.
    /// assert!(listener.wait_deadline(Instant::now() + Duration::from_secs(1)).is_none());
    /// ```
    #[cfg(all(feature = "std", not(target_family = "wasm")))]
    fn wait_deadline(self, deadline: Instant) -> Option<T>;

    /// Drops this listener and discards its notification (if any) without notifying another
    /// active listener.
    ///
    /// Returns `true` if a notification was discarded.
    ///
    /// # Examples
    ///
    /// ```
    /// use event_listener::{Event, Listener};
    ///
    /// let event = Event::new();
    /// let mut listener1 = event.listen();
    /// let mut listener2 = event.listen();
    ///
    /// event.notify(1);
    ///
    /// assert!(listener1.discard());
    /// assert!(!listener2.discard());
    /// ```
    fn discard(self) -> bool;

    /// Returns `true` if this listener listens to the given `Event`.
    ///
    /// # Examples
    ///
    /// ```
    /// use event_listener::{Event, Listener};
    ///
    /// let event = Event::new();
    /// let listener = event.listen();
    ///
    /// assert!(listener.listens_to(&event));
    /// ```
    fn listens_to(&self, event: &Event<T>) -> bool;

    /// Returns `true` if both listeners listen to the same `Event`.
    ///
    /// # Examples
    ///
    /// ```
    /// use event_listener::{Event, Listener};
    ///
    /// let event = Event::new();
    /// let listener1 = event.listen();
    /// let listener2 = event.listen();
    ///
    /// assert!(listener1.same_event(&listener2));
    /// ```
    fn same_event(&self, other: &Self) -> bool;
}
834
/// Generates the [`Listener`] and [`Future`] implementations for a listener wrapper type.
///
/// `$gen` names the tag type parameter and `$ty` is the wrapper type. The wrapper must provide
/// `listener()` and `listener_mut()` accessors that expose the underlying `InnerListener`; every
/// generated method delegates to it.
macro_rules! forward_impl_to_listener {
    ($gen:ident => $ty:ty) => {
        impl<$gen> crate::Listener<$gen> for $ty {
            #[cfg(all(feature = "std", not(target_family = "wasm")))]
            fn wait(mut self) -> $gen {
                // No deadline is passed, so the internal wait is expected to produce a value.
                let received = self.listener_mut().wait_internal(None);
                received.unwrap()
            }

            #[cfg(all(feature = "std", not(target_family = "wasm")))]
            fn wait_timeout(mut self, timeout: std::time::Duration) -> Option<$gen> {
                // `checked_add` yields `None` when the deadline is not representable, which is
                // forwarded as "no deadline".
                let deadline = std::time::Instant::now().checked_add(timeout);
                self.listener_mut().wait_internal(deadline)
            }

            #[cfg(all(feature = "std", not(target_family = "wasm")))]
            fn wait_deadline(mut self, deadline: std::time::Instant) -> Option<$gen> {
                let deadline = Some(deadline);
                self.listener_mut().wait_internal(deadline)
            }

            fn discard(mut self) -> bool {
                let listener = self.listener_mut();
                listener.discard()
            }

            #[inline]
            fn listens_to(&self, event: &Event<$gen>) -> bool {
                // Identity comparison: the listener's state versus the event's state pointer.
                let ours: *const Inner<$gen> = &*self.listener().event;
                let theirs: *const Inner<$gen> =
                    event.inner.load(core::sync::atomic::Ordering::Acquire);
                core::ptr::eq(ours, theirs)
            }

            #[inline]
            fn same_event(&self, other: &$ty) -> bool {
                // Two listeners share an event exactly when they point at the same state.
                let ours: *const Inner<$gen> = &*self.listener().event;
                let theirs: *const Inner<$gen> = &*other.listener().event;
                core::ptr::eq(ours, theirs)
            }
        }

        impl<$gen> Future for $ty {
            type Output = $gen;

            #[inline]
            fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<$gen> {
                let listener = self.listener_mut();
                listener.poll_internal(cx)
            }
        }
    };
}
883
/// A guard waiting for a notification from an [`Event`].
///
/// There are two ways for a listener to wait for a notification:
///
/// 1. In an asynchronous manner using `.await`.
/// 2. In a blocking manner by calling [`EventListener::wait()`] on it.
///
/// If a notified listener is dropped without receiving a notification, dropping will notify
/// another active listener. Whether one *additional* listener will be notified depends on what
/// kind of notification was delivered.
///
/// See the [`Listener`] trait for the functionality exposed by this type.
///
/// This structure allocates the listener on the heap.
pub struct EventListener<T = ()> {
    /// The heap-pinned listener state. It holds its own `Arc` to the event's inner state.
    listener: Pin<Box<InnerListener<T, Arc<Inner<T>>>>>,
}
901
// `EventListener<T>` is `Send` and `Sync` only when the tag type `T` is `Send`.
// NOTE(review): soundness depends on how `sys::Listener<T>` is synchronized, which is not
// visible from here — confirm against that definition.
unsafe impl<T: Send> Send for EventListener<T> {}
unsafe impl<T: Send> Sync for EventListener<T> {}

impl<T> core::panic::UnwindSafe for EventListener<T> {}
impl<T> core::panic::RefUnwindSafe for EventListener<T> {}
// The `InnerListener` lives behind `Pin<Box<..>>`, so moving this handle does not move it.
impl<T> Unpin for EventListener<T> {}
908
909impl<T> fmt::Debug for EventListener<T> {
910 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
911 f.debug_struct(name:"EventListener").finish_non_exhaustive()
912 }
913}
914
915impl<T> EventListener<T> {
916 #[inline]
917 fn listener(&self) -> &InnerListener<T, Arc<Inner<T>>> {
918 &self.listener
919 }
920
921 #[inline]
922 fn listener_mut(&mut self) -> Pin<&mut InnerListener<T, Arc<Inner<T>>>> {
923 self.listener.as_mut()
924 }
925}
926
// Implement `Listener<T>` and `Future` for `EventListener<T>` by delegating to the pinned
// `InnerListener` exposed through `listener()` / `listener_mut()`.
forward_impl_to_listener! { T => EventListener<T> }
928
929/// Create a stack-based event listener for an [`Event`].
930///
931/// [`EventListener`] allocates the listener on the heap. While this works for most use cases, in
932/// practice this heap allocation can be expensive for repeated uses. This method allows for
933/// allocating the listener on the stack instead.
934///
935/// There are limitations to using this macro instead of the [`EventListener`] type, however.
936/// Firstly, it is significantly less flexible. The listener is locked to the current stack
937/// frame, meaning that it can't be returned or put into a place where it would go out of
938/// scope. For instance, this will not work:
939///
940/// ```compile_fail
941/// use event_listener::{Event, Listener, listener};
942///
943/// fn get_listener(event: &Event) -> impl Listener {
944/// listener!(event => cant_return_this);
945/// cant_return_this
946/// }
947/// ```
948///
949/// In addition, the types involved in creating this listener are not able to be named. Therefore
950/// it cannot be used in hand-rolled futures or similar structures.
951///
952/// The type created by this macro implements [`Listener`], allowing it to be used in cases where
953/// [`EventListener`] would normally be used.
954///
955/// ## Example
956///
957/// To use this macro, replace cases where you would normally use this...
958///
959/// ```no_compile
960/// let listener = event.listen();
961/// ```
962///
963/// ...with this:
964///
965/// ```no_compile
966/// listener!(event => listener);
967/// ```
968///
969/// Here is the top level example from this crate's documentation, but using [`listener`] instead
970/// of [`EventListener`].
971///
972/// ```
973/// use std::sync::atomic::{AtomicBool, Ordering};
974/// use std::sync::Arc;
975/// use std::thread;
976/// use std::time::Duration;
977/// use std::usize;
978/// use event_listener::{Event, listener, IntoNotification, Listener};
979///
980/// let flag = Arc::new(AtomicBool::new(false));
981/// let event = Arc::new(Event::new());
982///
983/// // Spawn a thread that will set the flag after 1 second.
984/// thread::spawn({
985/// let flag = flag.clone();
986/// let event = event.clone();
987/// move || {
988/// // Wait for a second.
989/// thread::sleep(Duration::from_secs(1));
990///
991/// // Set the flag.
992/// flag.store(true, Ordering::SeqCst);
993///
994/// // Notify all listeners that the flag has been set.
995/// event.notify(usize::MAX);
996/// }
997/// });
998///
999/// // Wait until the flag is set.
1000/// loop {
1001/// // Check the flag.
1002/// if flag.load(Ordering::SeqCst) {
1003/// break;
1004/// }
1005///
1006/// // Start listening for events.
1007/// // NEW: Changed to a stack-based listener.
1008/// listener!(event => listener);
1009///
1010/// // Check the flag again after creating the listener.
1011/// if flag.load(Ordering::SeqCst) {
1012/// break;
1013/// }
1014///
1015/// // Wait for a notification and continue the loop.
1016/// listener.wait();
1017/// }
1018/// ```
#[macro_export]
macro_rules! listener {
    ($event:expr => $listener:ident) => {
        // Step 1: build the slot that will hold the listener, borrowing the event.
        let mut $listener = $crate::__private::StackSlot::new(&$event);
        // Step 2: pin the slot in place. The binding reuses the same identifier, so the
        // unpinned slot can no longer be named by the caller.
        // SAFETY: We shadow $listener so it can't be moved after.
        let mut $listener = unsafe { $crate::__private::Pin::new_unchecked(&mut $listener) };
        // Step 3: start listening. The value bound here is what the caller ends up using
        // under the identifier they supplied; `mut` is kept so that `&mut self` methods can
        // be called on it, and the lint is silenced for callers that never need that.
        #[allow(unused_mut)]
        let mut $listener = $listener.listen();
    };
}
1029
pin_project_lite::pin_project! {
    // Shared core of a listener: a handle to the event plus this listener's own entry state.
    //
    // `B` is any `Unpin` type that can be borrowed as the event's `Inner<T>`; the stack-based
    // listener in this file instantiates it with `&'ev Inner<T>`.
    //
    // The type is explicitly `!Unpin` and its `listener` field is structurally pinned, so the
    // entry is only ever reached through `Pin<&mut _>`.
    #[project(!Unpin)]
    #[project = ListenerProject]
    struct InnerListener<T, B: Borrow<Inner<T>>>
    where
        B: Unpin,
    {
        // The reference to the original event.
        event: B,

        // The inner state of the listener.
        //
        // This is only ever `None` during initialization. After `listen()` has completed, this
        // should be `Some`.
        #[pin]
        listener: Option<sys::Listener<T>>,
    }

    impl<T, B: Borrow<Inner<T>>> PinnedDrop for InnerListener<T, B>
    where
        B: Unpin,
    {
        fn drop(mut this: Pin<&mut Self>) {
            // If we're being dropped, we need to remove ourself from the list.
            //
            // NOTE(review): the `true` flag is the opposite of what `discard` passes; it
            // presumably asks `remove` to hand a pending notification on to another
            // listener — confirm against `Inner::remove`.
            let this = this.project();
            (*this.event).borrow().remove(this.listener, true);
        }
    }
}
1059
// `InnerListener` is declared `Send` only when both the tag type `T` and the event handle `B`
// are `Send`.
// NOTE(review): no `SAFETY` justification is recorded for these impls; they presumably rely on
// `sys::Listener<T>` being safe to move between threads whenever `T: Send` — confirm.
unsafe impl<T: Send, B: Borrow<Inner<T>> + Unpin + Send> Send for InnerListener<T, B> {}
// `InnerListener` is declared `Sync` when `T: Send` and the event handle `B` is `Sync`; note
// that `T: Sync` is not required.
unsafe impl<T: Send, B: Borrow<Inner<T>> + Unpin + Sync> Sync for InnerListener<T, B> {}
1062
1063impl<T, B: Borrow<Inner<T>> + Unpin> InnerListener<T, B> {
1064 /// Insert this listener into the linked list.
1065 #[inline]
1066 fn listen(self: Pin<&mut Self>) {
1067 let this = self.project();
1068 (*this.event).borrow().insert(this.listener);
1069 }
1070
1071 /// Wait until the provided deadline.
1072 #[cfg(all(feature = "std", not(target_family = "wasm")))]
1073 fn wait_internal(mut self: Pin<&mut Self>, deadline: Option<Instant>) -> Option<T> {
1074 fn parker_and_task() -> (Parker, Task) {
1075 let parker = Parker::new();
1076 let unparker = parker.unparker();
1077 (parker, Task::Unparker(unparker))
1078 }
1079
1080 std::thread_local! {
1081 /// Cached thread-local parker/unparker pair.
1082 static PARKER: (Parker, Task) = parker_and_task();
1083 }
1084
1085 // Try to borrow the thread-local parker/unparker pair.
1086 PARKER
1087 .try_with({
1088 let this = self.as_mut();
1089 |(parker, unparker)| this.wait_with_parker(deadline, parker, unparker.as_task_ref())
1090 })
1091 .unwrap_or_else(|_| {
1092 // If the pair isn't accessible, we may be being called in a destructor.
1093 // Just create a new pair.
1094 let (parker, unparker) = parking::pair();
1095 self.as_mut()
1096 .wait_with_parker(deadline, &parker, TaskRef::Unparker(&unparker))
1097 })
1098 }
1099
1100 /// Wait until the provided deadline using the specified parker/unparker pair.
1101 #[cfg(all(feature = "std", not(target_family = "wasm")))]
1102 fn wait_with_parker(
1103 self: Pin<&mut Self>,
1104 deadline: Option<Instant>,
1105 parker: &Parker,
1106 unparker: TaskRef<'_>,
1107 ) -> Option<T> {
1108 let mut this = self.project();
1109 let inner = (*this.event).borrow();
1110
1111 // Set the listener's state to `Task`.
1112 if let Some(tag) = inner.register(this.listener.as_mut(), unparker).notified() {
1113 // We were already notified, so we don't need to park.
1114 return Some(tag);
1115 }
1116
1117 // Wait until a notification is received or the timeout is reached.
1118 loop {
1119 match deadline {
1120 None => parker.park(),
1121
1122 Some(deadline) => {
1123 // Make sure we're not timed out already.
1124 let now = Instant::now();
1125 if now >= deadline {
1126 // Remove our entry and check if we were notified.
1127 return inner
1128 .remove(this.listener.as_mut(), false)
1129 .expect("We never removed ourself from the list")
1130 .notified();
1131 }
1132 parker.park_deadline(deadline);
1133 }
1134 }
1135
1136 // See if we were notified.
1137 if let Some(tag) = inner.register(this.listener.as_mut(), unparker).notified() {
1138 return Some(tag);
1139 }
1140 }
1141 }
1142
1143 /// Drops this listener and discards its notification (if any) without notifying another
1144 /// active listener.
1145 fn discard(self: Pin<&mut Self>) -> bool {
1146 let this = self.project();
1147 (*this.event)
1148 .borrow()
1149 .remove(this.listener, false)
1150 .map_or(false, |state| state.is_notified())
1151 }
1152
1153 /// Poll this listener for a notification.
1154 fn poll_internal(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
1155 let this = self.project();
1156 let inner = (*this.event).borrow();
1157
1158 // Try to register the listener.
1159 match inner
1160 .register(this.listener, TaskRef::Waker(cx.waker()))
1161 .notified()
1162 {
1163 Some(tag) => {
1164 // We were already notified, so we don't need to park.
1165 Poll::Ready(tag)
1166 }
1167
1168 None => {
1169 // We're now waiting for a notification.
1170 Poll::Pending
1171 }
1172 }
1173 }
1174}
1175
/// The state of a listener.
///
/// `T` is the type of the tag carried by a notification.
#[derive(PartialEq)]
enum State<T> {
    /// The listener was just created.
    Created,

    /// The listener has received a notification.
    ///
    /// Records whether the notification was an "additional" one, together with the tag that
    /// came with it.
    Notified {
        /// Whether or not this is an "additional" notification.
        additional: bool,

        /// The tag associated with the notification.
        tag: T,
    },

    /// A task is waiting for a notification.
    ///
    /// Holds the waker or unparker handle of that task.
    Task(Task),

    /// Empty hole used to replace a notified listener.
    ///
    /// `is_notified` still reports `true` for this state, while `notified` panics on it.
    NotifiedTaken,
}
1199
1200impl<T> fmt::Debug for State<T> {
1201 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1202 match self {
1203 Self::Created => f.write_str(data:"Created"),
1204 Self::Notified { additional: &bool, .. } => f&mut DebugStruct<'_, '_>
1205 .debug_struct("Notified")
1206 .field(name:"additional", value:additional)
1207 .finish(),
1208 Self::Task(_) => f.write_str(data:"Task(_)"),
1209 Self::NotifiedTaken => f.write_str(data:"NotifiedTaken"),
1210 }
1211 }
1212}
1213
1214impl<T> State<T> {
1215 fn is_notified(&self) -> bool {
1216 matches!(self, Self::Notified { .. } | Self::NotifiedTaken)
1217 }
1218
1219 /// If this state was notified, return the tag associated with the notification.
1220 #[allow(unused)]
1221 fn notified(self) -> Option<T> {
1222 match self {
1223 Self::Notified { tag: T, .. } => Some(tag),
1224 Self::NotifiedTaken => panic!("listener was already notified but taken"),
1225 _ => None,
1226 }
1227 }
1228}
1229
/// The result of registering a listener.
///
/// `T` is the type of the tag carried by a notification.
#[derive(Debug, PartialEq)]
enum RegisterResult<T> {
    /// The listener was already notified.
    ///
    /// Carries the tag of that notification.
    Notified(T),

    /// The listener has been registered.
    Registered,

    /// The listener was never inserted into the list.
    ///
    /// Calling `notified()` on this variant panics.
    NeverInserted,
}
1242
1243impl<T> RegisterResult<T> {
1244 /// Whether or not the listener was notified.
1245 ///
1246 /// Panics if the listener was never inserted into the list.
1247 fn notified(self) -> Option<T> {
1248 match self {
1249 Self::Notified(tag: T) => Some(tag),
1250 Self::Registered => None,
1251 Self::NeverInserted => panic!("listener was never inserted into the list"),
1252 }
1253 }
1254}
1255
/// A task that can be woken up.
///
/// This is the owned form; `TaskRef` is the borrowed counterpart. Equality is implemented by
/// hand (via `TaskRef::will_wake`) rather than derived.
#[derive(Debug, Clone)]
enum Task {
    /// A waker that wakes up a future.
    Waker(Waker),

    /// An unparker that wakes up a thread.
    ///
    /// Only available with the `std` feature on non-wasm targets.
    #[cfg(all(feature = "std", not(target_family = "wasm")))]
    Unparker(Unparker),
}
1266
1267impl Task {
1268 fn as_task_ref(&self) -> TaskRef<'_> {
1269 match self {
1270 Self::Waker(waker: &Waker) => TaskRef::Waker(waker),
1271 #[cfg(all(feature = "std", not(target_family = "wasm")))]
1272 Self::Unparker(unparker: &Unparker) => TaskRef::Unparker(unparker),
1273 }
1274 }
1275
1276 fn wake(self) {
1277 match self {
1278 Self::Waker(waker: Waker) => waker.wake(),
1279 #[cfg(all(feature = "std", not(target_family = "wasm")))]
1280 Self::Unparker(unparker: Unparker) => {
1281 unparker.unpark();
1282 }
1283 }
1284 }
1285}
1286
1287impl PartialEq for Task {
1288 fn eq(&self, other: &Self) -> bool {
1289 self.as_task_ref().will_wake(other.as_task_ref())
1290 }
1291}
1292
/// A reference to a task.
///
/// This is the borrowed, `Copy` counterpart of `Task`; `into_task` turns it into an owned
/// `Task` by cloning the referenced handle.
#[derive(Clone, Copy)]
enum TaskRef<'a> {
    /// A waker that wakes up a future.
    Waker(&'a Waker),

    /// An unparker that wakes up a thread.
    ///
    /// Only available with the `std` feature on non-wasm targets.
    #[cfg(all(feature = "std", not(target_family = "wasm")))]
    Unparker(&'a Unparker),
}
1303
1304impl TaskRef<'_> {
1305 /// Tells if this task will wake up the other task.
1306 #[allow(unreachable_patterns)]
1307 fn will_wake(self, other: Self) -> bool {
1308 match (self, other) {
1309 (Self::Waker(a: &Waker), Self::Waker(b: &Waker)) => a.will_wake(b),
1310 #[cfg(all(feature = "std", not(target_family = "wasm")))]
1311 (Self::Unparker(_), Self::Unparker(_)) => {
1312 // TODO: Use unreleased will_unpark API.
1313 false
1314 }
1315 _ => false,
1316 }
1317 }
1318
1319 /// Converts this task reference to a task by cloning.
1320 fn into_task(self) -> Task {
1321 match self {
1322 Self::Waker(waker: &Waker) => Task::Waker(waker.clone()),
1323 #[cfg(all(feature = "std", not(target_family = "wasm")))]
1324 Self::Unparker(unparker: &Unparker) => Task::Unparker(unparker.clone()),
1325 }
1326 }
1327}
1328
/// Synchronization primitive implementation.
///
/// Exposes the cell, atomic, `Arc` and (with `std`) mutex items to the parent module under
/// one path. Which `Arc` and `atomic` are exported depends on the `portable-atomic` feature.
mod sync {
    pub(super) use core::cell;

    // Without `portable-atomic`: `Arc` from `alloc` and atomics from `core`.
    #[cfg(not(feature = "portable-atomic"))]
    pub(super) use alloc::sync::Arc;
    #[cfg(not(feature = "portable-atomic"))]
    pub(super) use core::sync::atomic;

    // With `portable-atomic`: the same names, taken from the portable-atomic crates instead.
    #[cfg(feature = "portable-atomic")]
    pub(super) use portable_atomic_crate as atomic;
    #[cfg(feature = "portable-atomic")]
    pub(super) use portable_atomic_util::Arc;

    #[cfg(feature = "std")]
    pub(super) use std::sync::{Mutex, MutexGuard};

    /// Runs a closure with mutable access to the value inside `self`.
    pub(super) trait WithMut {
        /// The type handed to the closure by mutable reference.
        type Output;

        /// Calls `f` with a mutable reference to the inner value and returns its result.
        fn with_mut<F, R>(&mut self, f: F) -> R
        where
            F: FnOnce(&mut Self::Output) -> R;
    }

    impl<T> WithMut for atomic::AtomicPtr<T> {
        type Output = *mut T;

        #[inline]
        fn with_mut<F, R>(&mut self, f: F) -> R
        where
            F: FnOnce(&mut Self::Output) -> R,
        {
            // `&mut self` gives exclusive access, so the pointer is reached through
            // `get_mut` rather than an atomic load/store.
            f(self.get_mut())
        }
    }
}
1366
1367fn __test_send_and_sync() {
1368 fn _assert_send<T: Send>() {}
1369 fn _assert_sync<T: Sync>() {}
1370
1371 _assert_send::<crate::__private::StackSlot<'_, ()>>();
1372 _assert_sync::<crate::__private::StackSlot<'_, ()>>();
1373 _assert_send::<crate::__private::StackListener<'_, '_, ()>>();
1374 _assert_sync::<crate::__private::StackListener<'_, '_, ()>>();
1375 _assert_send::<Event<()>>();
1376 _assert_sync::<Event<()>>();
1377 _assert_send::<EventListener<()>>();
1378 _assert_sync::<EventListener<()>>();
1379}
1380
// Home of the `Sealed` marker trait, which is implemented here only for `EventListener` and
// `StackListener`.
// NOTE(review): presumably `Sealed` is a supertrait of `Listener`, used to prevent
// implementations outside this crate — confirm at the `Listener` definition.
#[doc(hidden)]
mod __sealed {
    use super::{EventListener, __private::StackListener};

    // Marker trait with no items.
    pub trait Sealed {}
    impl<T> Sealed for EventListener<T> {}
    impl<T> Sealed for StackListener<'_, '_, T> {}
}
1389
1390/// Semver exempt module.
1391#[doc(hidden)]
1392pub mod __private {
1393 pub use core::pin::Pin;
1394
1395 use super::{Event, Inner, InnerListener};
1396 use core::fmt;
1397 use core::future::Future;
1398 use core::task::{Context, Poll};
1399
1400 pin_project_lite::pin_project! {
1401 /// Space on the stack where a stack-based listener can be allocated.
1402 #[doc(hidden)]
1403 #[project(!Unpin)]
1404 pub struct StackSlot<'ev, T> {
1405 #[pin]
1406 listener: InnerListener<T, &'ev Inner<T>>
1407 }
1408 }
1409
1410 impl<T> fmt::Debug for StackSlot<'_, T> {
1411 #[inline]
1412 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1413 f.debug_struct("StackSlot").finish_non_exhaustive()
1414 }
1415 }
1416
1417 impl<T> core::panic::UnwindSafe for StackSlot<'_, T> {}
1418 impl<T> core::panic::RefUnwindSafe for StackSlot<'_, T> {}
1419 unsafe impl<T> Send for StackSlot<'_, T> {}
1420 unsafe impl<T> Sync for StackSlot<'_, T> {}
1421
1422 impl<'ev, T> StackSlot<'ev, T> {
1423 /// Create a new `StackSlot` on the stack.
1424 #[inline]
1425 #[doc(hidden)]
1426 pub fn new(event: &'ev Event<T>) -> Self {
1427 let inner = unsafe { &*event.inner() };
1428 Self {
1429 listener: InnerListener {
1430 event: inner,
1431 listener: None,
1432 },
1433 }
1434 }
1435
1436 /// Start listening on this `StackSlot`.
1437 #[inline]
1438 #[doc(hidden)]
1439 pub fn listen(mut self: Pin<&mut Self>) -> StackListener<'ev, '_, T> {
1440 // Insert ourselves into the list.
1441 self.as_mut().project().listener.listen();
1442
1443 // We are now listening.
1444 StackListener { slot: self }
1445 }
1446 }
1447
1448 /// A stack-based `EventListener`.
1449 #[doc(hidden)]
1450 pub struct StackListener<'ev, 'stack, T> {
1451 slot: Pin<&'stack mut StackSlot<'ev, T>>,
1452 }
1453
1454 impl<T> core::panic::UnwindSafe for StackListener<'_, '_, T> {}
1455 impl<T> core::panic::RefUnwindSafe for StackListener<'_, '_, T> {}
1456 impl<T> Unpin for StackListener<'_, '_, T> {}
1457
1458 impl<T> fmt::Debug for StackListener<'_, '_, T> {
1459 #[inline]
1460 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1461 f.debug_struct("StackListener").finish_non_exhaustive()
1462 }
1463 }
1464
1465 impl<'ev, T> StackListener<'ev, '_, T> {
1466 #[inline]
1467 fn listener(&self) -> &InnerListener<T, &'ev Inner<T>> {
1468 &self.slot.listener
1469 }
1470
1471 #[inline]
1472 fn listener_mut(&mut self) -> Pin<&mut InnerListener<T, &'ev Inner<T>>> {
1473 self.slot.as_mut().project().listener
1474 }
1475 }
1476
1477 forward_impl_to_listener! { T => StackListener<'_, '_, T> }
1478}
1479