1//! Notify async tasks or threads.
2//!
3//! This is a synchronization primitive similar to [eventcounts] invented by Dmitry Vyukov.
4//!
5//! You can use this crate to turn non-blocking data structures into async or blocking data
6//! structures. See a [simple mutex] implementation that exposes an async and a blocking interface
7//! for acquiring locks.
8//!
9//! [eventcounts]: https://www.1024cores.net/home/lock-free-algorithms/eventcounts
10//! [simple mutex]: https://github.com/smol-rs/event-listener/blob/master/examples/mutex.rs
11//!
12//! # Examples
13//!
14//! Wait until another thread sets a boolean flag:
15//!
16//! ```
17//! use std::sync::atomic::{AtomicBool, Ordering};
18//! use std::sync::Arc;
19//! use std::thread;
20//! use std::time::Duration;
21//! use std::usize;
22//! use event_listener::Event;
23//!
24//! let flag = Arc::new(AtomicBool::new(false));
25//! let event = Arc::new(Event::new());
26//!
27//! // Spawn a thread that will set the flag after 1 second.
28//! thread::spawn({
29//! let flag = flag.clone();
30//! let event = event.clone();
31//! move || {
32//! // Wait for a second.
33//! thread::sleep(Duration::from_secs(1));
34//!
35//! // Set the flag.
36//! flag.store(true, Ordering::SeqCst);
37//!
38//! // Notify all listeners that the flag has been set.
39//! event.notify(usize::MAX);
40//! }
41//! });
42//!
43//! // Wait until the flag is set.
44//! loop {
45//! // Check the flag.
46//! if flag.load(Ordering::SeqCst) {
47//! break;
48//! }
49//!
50//! // Start listening for events.
51//! let mut listener = event.listen();
52//!
53//! // Check the flag again after creating the listener.
54//! if flag.load(Ordering::SeqCst) {
55//! break;
56//! }
57//!
58//! // Wait for a notification and continue the loop.
59//! listener.as_mut().wait();
60//! }
61//! ```
62//!
63//! # Features
64//!
65//! - The `portable-atomic` feature enables the use of the [`portable-atomic`] crate to provide
66//! atomic operations on platforms that don't support them.
67//!
68//! [`portable-atomic`]: https://crates.io/crates/portable-atomic
69
70#![cfg_attr(all(not(feature = "std"), not(test)), no_std)]
71#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
72#![doc(
73 html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
74)]
75#![doc(
76 html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
77)]
78
79extern crate alloc;
80
81#[cfg_attr(feature = "std", path = "std.rs")]
82#[cfg_attr(not(feature = "std"), path = "no_std.rs")]
83mod sys;
84
85mod notify;
86
87use alloc::boxed::Box;
88
89use core::borrow::Borrow;
90use core::fmt;
91use core::future::Future;
92use core::mem::ManuallyDrop;
93use core::pin::Pin;
94use core::ptr;
95use core::task::{Context, Poll, Waker};
96
97#[cfg(all(feature = "std", not(target_family = "wasm")))]
98use {
99 parking::{Parker, Unparker},
100 std::time::{Duration, Instant},
101};
102
103use sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
104use sync::{Arc, WithMut};
105
106use notify::{Internal, NotificationPrivate};
107pub use notify::{IntoNotification, Notification};
108
109/// Useful traits for notifications.
110pub mod prelude {
111 pub use crate::{IntoNotification, Notification};
112}
113
114/// Inner state of [`Event`].
115struct Inner<T> {
116 /// The number of notified entries, or `usize::MAX` if all of them have been notified.
117 ///
118 /// If there are no entries, this value is set to `usize::MAX`.
119 notified: AtomicUsize,
120
121 /// Inner queue of event listeners.
122 ///
123 /// On `std` platforms, this is an intrusive linked list. On `no_std` platforms, this is a
124 /// more traditional `Vec` of listeners, with an atomic queue used as a backup for high
125 /// contention.
126 list: sys::List<T>,
127}
128
129impl<T> Inner<T> {
130 fn new() -> Self {
131 Self {
132 notified: AtomicUsize::new(core::usize::MAX),
133 list: sys::List::new(),
134 }
135 }
136}
137
138/// A synchronization primitive for notifying async tasks and threads.
139///
140/// Listeners can be registered using [`Event::listen()`]. There are two ways to notify listeners:
141///
142/// 1. [`Event::notify()`] notifies a number of listeners.
143/// 2. [`Event::notify_additional()`] notifies a number of previously unnotified listeners.
144///
145/// If there are no active listeners at the time a notification is sent, it simply gets lost.
146///
147/// There are two ways for a listener to wait for a notification:
148///
149/// 1. In an asynchronous manner using `.await`.
150/// 2. In a blocking manner by calling [`EventListener::wait()`] on it.
151///
152/// If a notified listener is dropped without receiving a notification, dropping will notify
153/// another active listener. Whether one *additional* listener will be notified depends on what
154/// kind of notification was delivered.
155///
156/// Listeners are registered and notified in the first-in first-out fashion, ensuring fairness.
157pub struct Event<T = ()> {
158 /// A pointer to heap-allocated inner state.
159 ///
160 /// This pointer is initially null and gets lazily initialized on first use. Semantically, it
161 /// is an `Arc<Inner>` so it's important to keep in mind that it contributes to the [`Arc`]'s
162 /// reference count.
163 inner: AtomicPtr<Inner<T>>,
164}
165
166unsafe impl<T: Send> Send for Event<T> {}
167unsafe impl<T: Send> Sync for Event<T> {}
168
169impl<T> core::panic::UnwindSafe for Event<T> {}
170impl<T> core::panic::RefUnwindSafe for Event<T> {}
171
172impl<T> fmt::Debug for Event<T> {
173 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
174 match self.try_inner() {
175 Some(inner) => {
176 let notified_count = inner.notified.load(Ordering::Relaxed);
177 let total_count = match inner.list.total_listeners() {
178 Ok(total_count) => total_count,
179 Err(_) => {
180 return f
181 .debug_tuple("Event")
182 .field(&format_args!("<locked>"))
183 .finish()
184 }
185 };
186
187 f.debug_struct("Event")
188 .field("listeners_notified", &notified_count)
189 .field("listeners_total", &total_count)
190 .finish()
191 }
192 None => f
193 .debug_tuple("Event")
194 .field(&format_args!("<uninitialized>"))
195 .finish(),
196 }
197 }
198}
199
200impl Default for Event {
201 #[inline]
202 fn default() -> Self {
203 Self::new()
204 }
205}
206
207impl<T> Event<T> {
208 /// Creates a new `Event` with a tag type.
209 ///
210 /// Tagging cannot be implemented efficiently on `no_std`, so this is only available when the
211 /// `std` feature is enabled.
212 ///
213 /// # Examples
214 ///
215 /// ```
216 /// use event_listener::Event;
217 ///
218 /// let event = Event::<usize>::with_tag();
219 /// ```
220 #[cfg(feature = "std")]
221 #[inline]
222 pub const fn with_tag() -> Self {
223 Self {
224 inner: AtomicPtr::new(ptr::null_mut()),
225 }
226 }
227
228 /// Tell whether any listeners are currently notified.
229 ///
230 /// # Examples
231 ///
232 /// ```
233 /// use event_listener::Event;
234 ///
235 /// let event = Event::new();
236 /// let listener = event.listen();
237 /// assert!(!event.is_notified());
238 ///
239 /// event.notify(1);
240 /// assert!(event.is_notified());
241 /// ```
242 #[inline]
243 pub fn is_notified(&self) -> bool {
244 self.try_inner()
245 .map_or(false, |inner| inner.notified.load(Ordering::Acquire) > 0)
246 }
247
248 /// Returns a guard listening for a notification.
249 ///
250 /// This method emits a `SeqCst` fence after registering a listener. For now, this method
251 /// is an alias for calling [`EventListener::new()`], pinning it to the heap, and then
252 /// inserting it into a list.
253 ///
254 /// # Examples
255 ///
256 /// ```
257 /// use event_listener::Event;
258 ///
259 /// let event = Event::new();
260 /// let listener = event.listen();
261 /// ```
262 ///
263 /// # Caveats
264 ///
265 /// The above example is equivalent to this code:
266 ///
267 /// ```
268 /// use event_listener::{Event, EventListener};
269 ///
270 /// let event = Event::new();
271 /// let mut listener = Box::pin(EventListener::new());
272 /// listener.as_mut().listen(&event);
273 /// ```
274 ///
275 /// It creates a new listener, pins it to the heap, and inserts it into the linked list
276 /// of listeners. While this type of usage is simple, it may be desired to eliminate this
277 /// heap allocation. In this case, consider using the [`EventListener::new`] constructor
278 /// directly, which allows for greater control over where the [`EventListener`] is
279 /// allocated. However, users of this `new` method must be careful to ensure that the
280 /// [`EventListener`] is `listen`ing before waiting on it; panics may occur otherwise.
281 #[cold]
282 pub fn listen(&self) -> Pin<Box<EventListener<T>>> {
283 let mut listener = Box::pin(EventListener::new());
284 listener.as_mut().listen(self);
285 listener
286 }
287
288 /// Notifies a number of active listeners.
289 ///
290 /// The number is allowed to be zero or exceed the current number of listeners.
291 ///
292 /// The [`Notification`] trait is used to define what kind of notification is delivered.
293 /// The default implementation (implemented on `usize`) is a notification that only notifies
294 /// *at least* the specified number of listeners.
295 ///
296 /// In certain cases, this function emits a `SeqCst` fence before notifying listeners.
297 ///
298 /// This function returns the number of [`EventListener`]s that were notified by this call.
299 ///
300 /// # Caveats
301 ///
302 /// If the `std` feature is disabled, the notification will be delayed under high contention,
303 /// such as when another thread is taking a while to `notify` the event. In this circumstance,
304 /// this function will return `0` instead of the number of listeners actually notified. Therefore
305 /// if the `std` feature is disabled the return value of this function should not be relied upon
306 /// for soundness and should be used only as a hint.
307 ///
308 /// If the `std` feature is enabled, no spurious returns are possible, since the `std`
309 /// implementation uses system locking primitives to ensure there is no unavoidable
310 /// contention.
311 ///
312 /// # Examples
313 ///
314 /// Use the default notification strategy:
315 ///
316 /// ```
317 /// use event_listener::Event;
318 ///
319 /// let event = Event::new();
320 ///
321 /// // This notification gets lost because there are no listeners.
322 /// event.notify(1);
323 ///
324 /// let listener1 = event.listen();
325 /// let listener2 = event.listen();
326 /// let listener3 = event.listen();
327 ///
328 /// // Notifies two listeners.
329 /// //
330 /// // Listener queueing is fair, which means `listener1` and `listener2`
331 /// // get notified here since they start listening before `listener3`.
332 /// event.notify(2);
333 /// ```
334 ///
335 /// Notify without emitting a `SeqCst` fence. This uses the [`relaxed`] notification strategy.
336 /// This is equivalent to calling [`Event::notify_relaxed()`].
337 ///
338 /// [`relaxed`]: IntoNotification::relaxed
339 ///
340 /// ```
341 /// use event_listener::{prelude::*, Event};
342 /// use std::sync::atomic::{self, Ordering};
343 ///
344 /// let event = Event::new();
345 ///
346 /// // This notification gets lost because there are no listeners.
347 /// event.notify(1.relaxed());
348 ///
349 /// let listener1 = event.listen();
350 /// let listener2 = event.listen();
351 /// let listener3 = event.listen();
352 ///
353 /// // We should emit a fence manually when using relaxed notifications.
354 /// atomic::fence(Ordering::SeqCst);
355 ///
356 /// // Notifies two listeners.
357 /// //
358 /// // Listener queueing is fair, which means `listener1` and `listener2`
359 /// // get notified here since they start listening before `listener3`.
360 /// event.notify(2.relaxed());
361 /// ```
362 ///
363 /// Notify additional listeners. In contrast to [`Event::notify()`], this method will notify `n`
364 /// *additional* listeners that were previously unnotified. This uses the [`additional`]
365 /// notification strategy. This is equivalent to calling [`Event::notify_additional()`].
366 ///
367 /// [`additional`]: IntoNotification::additional
368 ///
369 /// ```
370 /// use event_listener::{prelude::*, Event};
371 ///
372 /// let event = Event::new();
373 ///
374 /// // This notification gets lost because there are no listeners.
375 /// event.notify(1.additional());
376 ///
377 /// let listener1 = event.listen();
378 /// let listener2 = event.listen();
379 /// let listener3 = event.listen();
380 ///
381 /// // Notifies two listeners.
382 /// //
383 /// // Listener queueing is fair, which means `listener1` and `listener2`
384 /// // get notified here since they start listening before `listener3`.
385 /// event.notify(1.additional());
386 /// event.notify(1.additional());
387 /// ```
388 ///
389 /// Notifies with the [`additional`] and [`relaxed`] strategies at the same time. This is
390 /// equivalent to calling [`Event::notify_additional_relaxed()`].
391 ///
392 /// ```
393 /// use event_listener::{prelude::*, Event};
394 /// use std::sync::atomic::{self, Ordering};
395 ///
396 /// let event = Event::new();
397 ///
398 /// // This notification gets lost because there are no listeners.
399 /// event.notify(1.additional().relaxed());
400 ///
401 /// let listener1 = event.listen();
402 /// let listener2 = event.listen();
403 /// let listener3 = event.listen();
404 ///
405 /// // We should emit a fence manually when using relaxed notifications.
406 /// atomic::fence(Ordering::SeqCst);
407 ///
408 /// // Notifies two listeners.
409 /// //
410 /// // Listener queueing is fair, which means `listener1` and `listener2`
411 /// // get notified here since they start listening before `listener3`.
412 /// event.notify(1.additional().relaxed());
413 /// event.notify(1.additional().relaxed());
414 /// ```
415 #[inline]
416 pub fn notify(&self, notify: impl IntoNotification<Tag = T>) -> usize {
417 let notify = notify.into_notification();
418
419 // Make sure the notification comes after whatever triggered it.
420 notify.fence(notify::Internal::new());
421
422 if let Some(inner) = self.try_inner() {
423 let limit = if notify.is_additional(Internal::new()) {
424 core::usize::MAX
425 } else {
426 notify.count(Internal::new())
427 };
428
429 // Notify if there is at least one unnotified listener and the number of notified
430 // listeners is less than `limit`.
431 if inner.needs_notification(limit) {
432 return inner.notify(notify);
433 }
434 }
435
436 0
437 }
438
439 /// Return a reference to the inner state if it has been initialized.
440 #[inline]
441 fn try_inner(&self) -> Option<&Inner<T>> {
442 let inner = self.inner.load(Ordering::Acquire);
443 unsafe { inner.as_ref() }
444 }
445
446 /// Returns a raw, initialized pointer to the inner state.
447 ///
448 /// This returns a raw pointer instead of reference because `from_raw`
449 /// requires raw/mut provenance: <https://github.com/rust-lang/rust/pull/67339>.
450 fn inner(&self) -> *const Inner<T> {
451 let mut inner = self.inner.load(Ordering::Acquire);
452
453 // If this is the first use, initialize the state.
454 if inner.is_null() {
455 // Allocate the state on the heap.
456 let new = Arc::new(Inner::<T>::new());
457
458 // Convert the state to a raw pointer.
459 let new = Arc::into_raw(new) as *mut Inner<T>;
460
461 // Replace the null pointer with the new state pointer.
462 inner = self
463 .inner
464 .compare_exchange(inner, new, Ordering::AcqRel, Ordering::Acquire)
465 .unwrap_or_else(|x| x);
466
467 // Check if the old pointer value was indeed null.
468 if inner.is_null() {
469 // If yes, then use the new state pointer.
470 inner = new;
471 } else {
472 // If not, that means a concurrent operation has initialized the state.
473 // In that case, use the old pointer and deallocate the new one.
474 unsafe {
475 drop(Arc::from_raw(new));
476 }
477 }
478 }
479
480 inner
481 }
482}
483
484impl Event<()> {
485 /// Creates a new [`Event`].
486 ///
487 /// # Examples
488 ///
489 /// ```
490 /// use event_listener::Event;
491 ///
492 /// let event = Event::new();
493 /// ```
494 #[inline]
495 pub const fn new() -> Self {
496 Self {
497 inner: AtomicPtr::new(ptr::null_mut()),
498 }
499 }
500
501 /// Notifies a number of active listeners without emitting a `SeqCst` fence.
502 ///
503 /// The number is allowed to be zero or exceed the current number of listeners.
504 ///
505 /// In contrast to [`Event::notify_additional()`], this method only makes sure *at least* `n`
506 /// listeners among the active ones are notified.
507 ///
508 /// Unlike [`Event::notify()`], this method does not emit a `SeqCst` fence.
509 ///
510 /// This method only works for untagged events. In other cases, it is recommended to instead
511 /// use [`Event::notify()`] like so:
512 ///
513 /// ```
514 /// use event_listener::{prelude::*, Event};
515 /// let event = Event::new();
516 ///
517 /// // Old way:
518 /// event.notify_relaxed(1);
519 ///
520 /// // New way:
521 /// event.notify(1.relaxed());
522 /// ```
523 ///
524 /// # Examples
525 ///
526 /// ```
527 /// use event_listener::Event;
528 /// use std::sync::atomic::{self, Ordering};
529 ///
530 /// let event = Event::new();
531 ///
532 /// // This notification gets lost because there are no listeners.
533 /// event.notify_relaxed(1);
534 ///
535 /// let listener1 = event.listen();
536 /// let listener2 = event.listen();
537 /// let listener3 = event.listen();
538 ///
539 /// // We should emit a fence manually when using relaxed notifications.
540 /// atomic::fence(Ordering::SeqCst);
541 ///
542 /// // Notifies two listeners.
543 /// //
544 /// // Listener queueing is fair, which means `listener1` and `listener2`
545 /// // get notified here since they start listening before `listener3`.
546 /// event.notify_relaxed(2);
547 /// ```
548 #[inline]
549 pub fn notify_relaxed(&self, n: usize) -> usize {
550 self.notify(n.relaxed())
551 }
552
553 /// Notifies a number of active and still unnotified listeners.
554 ///
555 /// The number is allowed to be zero or exceed the current number of listeners.
556 ///
557 /// In contrast to [`Event::notify()`], this method will notify `n` *additional* listeners that
558 /// were previously unnotified.
559 ///
560 /// This method emits a `SeqCst` fence before notifying listeners.
561 ///
562 /// This method only works for untagged events. In other cases, it is recommended to instead
563 /// use [`Event::notify()`] like so:
564 ///
565 /// ```
566 /// use event_listener::{prelude::*, Event};
567 /// let event = Event::new();
568 ///
569 /// // Old way:
570 /// event.notify_additional(1);
571 ///
572 /// // New way:
573 /// event.notify(1.additional());
574 /// ```
575 ///
576 /// # Examples
577 ///
578 /// ```
579 /// use event_listener::Event;
580 ///
581 /// let event = Event::new();
582 ///
583 /// // This notification gets lost because there are no listeners.
584 /// event.notify_additional(1);
585 ///
586 /// let listener1 = event.listen();
587 /// let listener2 = event.listen();
588 /// let listener3 = event.listen();
589 ///
590 /// // Notifies two listeners.
591 /// //
592 /// // Listener queueing is fair, which means `listener1` and `listener2`
593 /// // get notified here since they start listening before `listener3`.
594 /// event.notify_additional(1);
595 /// event.notify_additional(1);
596 /// ```
597 #[inline]
598 pub fn notify_additional(&self, n: usize) -> usize {
599 self.notify(n.additional())
600 }
601
602 /// Notifies a number of active and still unnotified listeners without emitting a `SeqCst`
603 /// fence.
604 ///
605 /// The number is allowed to be zero or exceed the current number of listeners.
606 ///
607 /// In contrast to [`Event::notify()`], this method will notify `n` *additional* listeners that
608 /// were previously unnotified.
609 ///
610 /// Unlike [`Event::notify_additional()`], this method does not emit a `SeqCst` fence.
611 ///
612 /// This method only works for untagged events. In other cases, it is recommended to instead
613 /// use [`Event::notify()`] like so:
614 ///
615 /// ```
616 /// use event_listener::{prelude::*, Event};
617 /// let event = Event::new();
618 ///
619 /// // Old way:
620 /// event.notify_additional_relaxed(1);
621 ///
622 /// // New way:
623 /// event.notify(1.additional().relaxed());
624 /// ```
625 ///
626 /// # Examples
627 ///
628 /// ```
629 /// use event_listener::Event;
630 /// use std::sync::atomic::{self, Ordering};
631 ///
632 /// let event = Event::new();
633 ///
634 /// // This notification gets lost because there are no listeners.
635 /// event.notify(1);
636 ///
637 /// let listener1 = event.listen();
638 /// let listener2 = event.listen();
639 /// let listener3 = event.listen();
640 ///
641 /// // We should emit a fence manually when using relaxed notifications.
642 /// atomic::fence(Ordering::SeqCst);
643 ///
644 /// // Notifies two listeners.
645 /// //
646 /// // Listener queueing is fair, which means `listener1` and `listener2`
647 /// // get notified here since they start listening before `listener3`.
648 /// event.notify_additional_relaxed(1);
649 /// event.notify_additional_relaxed(1);
650 /// ```
651 #[inline]
652 pub fn notify_additional_relaxed(&self, n: usize) -> usize {
653 self.notify(n.additional().relaxed())
654 }
655}
656
657impl<T> Drop for Event<T> {
658 #[inline]
659 fn drop(&mut self) {
660 self.inner.with_mut(|&mut inner: *mut Inner| {
661 // If the state pointer has been initialized, drop it.
662 if !inner.is_null() {
663 unsafe {
664 drop(Arc::from_raw(ptr:inner));
665 }
666 }
667 })
668 }
669}
670
671pin_project_lite::pin_project! {
672 /// A guard waiting for a notification from an [`Event`].
673 ///
674 /// There are two ways for a listener to wait for a notification:
675 ///
676 /// 1. In an asynchronous manner using `.await`.
677 /// 2. In a blocking manner by calling [`EventListener::wait()`] on it.
678 ///
679 /// If a notified listener is dropped without receiving a notification, dropping will notify
680 /// another active listener. Whether one *additional* listener will be notified depends on what
681 /// kind of notification was delivered.
682 ///
683 /// The listener is not registered into the linked list inside of the [`Event`] by default if
684 /// it is created via the `new()` method. It needs to be pinned first before being inserted
685 /// using the `listen()` method. After the listener has begun `listen`ing, the user can
686 /// `await` it like a future or call `wait()` to block the current thread until it is notified.
687 ///
688 /// ## Examples
689 ///
690 /// ```
691 /// use event_listener::{Event, EventListener};
692 /// use std::sync::{Arc, atomic::{AtomicBool, Ordering}};
693 /// use std::thread;
694 /// use std::time::Duration;
695 ///
696 /// // Some flag to wait on.
697 /// let flag = Arc::new(AtomicBool::new(false));
698 ///
699 /// // Create an event to wait on.
700 /// let event = Arc::new(Event::new());
701 ///
702 /// thread::spawn({
703 /// let flag = flag.clone();
704 /// let event = event.clone();
705 /// move || {
706 /// thread::sleep(Duration::from_secs(2));
707 /// flag.store(true, Ordering::SeqCst);
708 ///
709 /// // Wake up the listener.
710 /// event.notify_additional(std::usize::MAX);
711 /// }
712 /// });
713 ///
714 /// let listener = EventListener::new();
715 ///
716 /// // Make sure that the event listener is pinned before doing anything else.
717 /// //
718 /// // We pin the listener to the stack here, as it lets us avoid a heap allocation.
719 /// futures_lite::pin!(listener);
720 ///
721 /// // Wait for the flag to become ready.
722 /// loop {
723 /// if flag.load(Ordering::Acquire) {
724 /// // We are done.
725 /// break;
726 /// }
727 ///
728 /// if listener.is_listening() {
729 /// // We are inserted into the linked list and we can now wait.
730 /// listener.as_mut().wait();
731 /// } else {
732 /// // We need to insert ourselves into the list. Since this insertion is an atomic
733 /// // operation, we should check the flag again before waiting.
734 /// listener.as_mut().listen(&event);
735 /// }
736 /// }
737 /// ```
738 ///
739 /// The above example is equivalent to the one provided in the crate level example. However,
740 /// it has some advantages. By directly creating the listener with `EventListener::new()`,
741 /// we have control over how the listener is handled in memory. We take advantage of this by
742 /// pinning the `listener` variable to the stack using the [`futures_lite::pin`] macro. In
743 /// contrast, `Event::listen` binds the listener to the heap.
744 ///
745 /// However, this additional power comes with additional responsibility. By default, the
746 /// event listener is created in an "uninserted" state. This property means that any
747 /// notifications delivered to the [`Event`] by default will not wake up this listener.
748 /// Before any notifications can be received, the `listen()` method must be called on
749 /// `EventListener` to insert it into the list of listeners. After a `.await` or a `wait()`
750 /// call has completed, `listen()` must be called again if the user is still interested in
751 /// any events.
752 ///
753 /// [`futures_lite::pin`]: https://docs.rs/futures-lite/latest/futures_lite/macro.pin.html
754 #[project(!Unpin)] // implied by Listener, but can generate better docs
755 pub struct EventListener<T = ()> {
756 #[pin]
757 listener: Listener<T, Arc<Inner<T>>>,
758 }
759}
760
761unsafe impl<T: Send> Send for EventListener<T> {}
762unsafe impl<T: Send> Sync for EventListener<T> {}
763
764impl<T> core::panic::UnwindSafe for EventListener<T> {}
765impl<T> core::panic::RefUnwindSafe for EventListener<T> {}
766
767impl<T> Default for EventListener<T> {
768 fn default() -> Self {
769 Self::new()
770 }
771}
772
773impl<T> fmt::Debug for EventListener<T> {
774 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
775 f&mut DebugStruct<'_, '_>.debug_struct("EventListener")
776 .field(name:"listening", &self.is_listening())
777 .finish()
778 }
779}
780
781impl<T> EventListener<T> {
782 /// Create a new `EventListener` that will wait for a notification from the given [`Event`].
783 ///
784 /// This function does not register the `EventListener` into the linked list of listeners
785 /// contained within the [`Event`]. Make sure to call `listen` before `await`ing on
786 /// this future or calling `wait()`.
787 ///
788 /// ## Examples
789 ///
790 /// ```
791 /// use event_listener::{Event, EventListener};
792 ///
793 /// let event = Event::new();
794 /// let listener = EventListener::new();
795 ///
796 /// // Make sure that the listener is pinned and listening before doing anything else.
797 /// let mut listener = Box::pin(listener);
798 /// listener.as_mut().listen(&event);
799 /// ```
800 pub fn new() -> Self {
801 Self {
802 listener: Listener {
803 event: None,
804 listener: None,
805 },
806 }
807 }
808
809 /// Register this listener into the given [`Event`].
810 ///
811 /// This method can only be called after the listener has been pinned, and must be called before
812 /// the listener is polled.
813 ///
814 /// Notifications that exist when this function is called will be discarded.
815 pub fn listen(mut self: Pin<&mut Self>, event: &Event<T>) {
816 let inner = {
817 let inner = event.inner();
818 unsafe { Arc::clone(&ManuallyDrop::new(Arc::from_raw(inner))) }
819 };
820
821 let ListenerProject {
822 event,
823 mut listener,
824 } = self.as_mut().project().listener.project();
825
826 // If an event is already registered, make sure to remove it.
827 if let Some(current_event) = event.as_ref() {
828 current_event.remove(listener.as_mut(), false);
829 }
830
831 let inner = event.insert(inner);
832 inner.insert(listener);
833
834 // Make sure the listener is registered before whatever happens next.
835 notify::full_fence();
836 }
837
838 /// Tell if this [`EventListener`] is currently listening for a notification.
839 ///
840 /// # Examples
841 ///
842 /// ```
843 /// use event_listener::{Event, EventListener};
844 ///
845 /// let event = Event::new();
846 /// let mut listener = Box::pin(EventListener::new());
847 ///
848 /// // The listener starts off not listening.
849 /// assert!(!listener.is_listening());
850 ///
851 /// // After listen() is called, the listener is listening.
852 /// listener.as_mut().listen(&event);
853 /// assert!(listener.is_listening());
854 ///
855 /// // Once the future is notified, the listener is no longer listening.
856 /// event.notify(1);
857 /// listener.as_mut().wait();
858 /// assert!(!listener.is_listening());
859 /// ```
860 pub fn is_listening(&self) -> bool {
861 self.listener.listener.is_some()
862 }
863
864 /// Blocks until a notification is received.
865 ///
866 /// # Examples
867 ///
868 /// ```
869 /// use event_listener::Event;
870 ///
871 /// let event = Event::new();
872 /// let mut listener = event.listen();
873 ///
874 /// // Notify `listener`.
875 /// event.notify(1);
876 ///
877 /// // Receive the notification.
878 /// listener.as_mut().wait();
879 /// ```
880 #[cfg(all(feature = "std", not(target_family = "wasm")))]
881 pub fn wait(self: Pin<&mut Self>) -> T {
882 self.listener().wait_internal(None).unwrap()
883 }
884
885 /// Blocks until a notification is received or a timeout is reached.
886 ///
887 /// Returns `true` if a notification was received.
888 ///
889 /// # Examples
890 ///
891 /// ```
892 /// use std::time::Duration;
893 /// use event_listener::Event;
894 ///
895 /// let event = Event::new();
896 /// let mut listener = event.listen();
897 ///
898 /// // There are no notification so this times out.
899 /// assert!(listener.as_mut().wait_timeout(Duration::from_secs(1)).is_none());
900 /// ```
901 #[cfg(all(feature = "std", not(target_family = "wasm")))]
902 pub fn wait_timeout(self: Pin<&mut Self>, timeout: Duration) -> Option<T> {
903 self.listener()
904 .wait_internal(Instant::now().checked_add(timeout))
905 }
906
907 /// Blocks until a notification is received or a deadline is reached.
908 ///
909 /// Returns `true` if a notification was received.
910 ///
911 /// # Examples
912 ///
913 /// ```
914 /// use std::time::{Duration, Instant};
915 /// use event_listener::Event;
916 ///
917 /// let event = Event::new();
918 /// let mut listener = event.listen();
919 ///
920 /// // There are no notification so this times out.
921 /// assert!(listener.as_mut().wait_deadline(Instant::now() + Duration::from_secs(1)).is_none());
922 /// ```
923 #[cfg(all(feature = "std", not(target_family = "wasm")))]
924 pub fn wait_deadline(self: Pin<&mut Self>, deadline: Instant) -> Option<T> {
925 self.listener().wait_internal(Some(deadline))
926 }
927
928 /// Drops this listener and discards its notification (if any) without notifying another
929 /// active listener.
930 ///
931 /// Returns `true` if a notification was discarded.
932 ///
933 /// # Examples
934 /// ```
935 /// use event_listener::Event;
936 ///
937 /// let event = Event::new();
938 /// let mut listener1 = event.listen();
939 /// let mut listener2 = event.listen();
940 ///
941 /// event.notify(1);
942 ///
943 /// assert!(listener1.as_mut().discard());
944 /// assert!(!listener2.as_mut().discard());
945 /// ```
946 pub fn discard(self: Pin<&mut Self>) -> bool {
947 self.project().listener.discard()
948 }
949
950 /// Returns `true` if this listener listens to the given `Event`.
951 ///
952 /// # Examples
953 ///
954 /// ```
955 /// use event_listener::Event;
956 ///
957 /// let event = Event::new();
958 /// let listener = event.listen();
959 ///
960 /// assert!(listener.listens_to(&event));
961 /// ```
962 #[inline]
963 pub fn listens_to(&self, event: &Event<T>) -> bool {
964 if let Some(inner) = &self.listener.event {
965 return ptr::eq::<Inner<T>>(&**inner, event.inner.load(Ordering::Acquire));
966 }
967
968 false
969 }
970
971 /// Returns `true` if both listeners listen to the same `Event`.
972 ///
973 /// # Examples
974 ///
975 /// ```
976 /// use event_listener::Event;
977 ///
978 /// let event = Event::new();
979 /// let listener1 = event.listen();
980 /// let listener2 = event.listen();
981 ///
982 /// assert!(listener1.same_event(&listener2));
983 /// ```
984 pub fn same_event(&self, other: &EventListener<T>) -> bool {
985 if let (Some(inner1), Some(inner2)) = (self.inner(), other.inner()) {
986 return ptr::eq::<Inner<T>>(&**inner1, &**inner2);
987 }
988
989 false
990 }
991
992 fn listener(self: Pin<&mut Self>) -> Pin<&mut Listener<T, Arc<Inner<T>>>> {
993 self.project().listener
994 }
995
996 fn inner(&self) -> Option<&Arc<Inner<T>>> {
997 self.listener.event.as_ref()
998 }
999}
1000
1001impl<T> Future for EventListener<T> {
1002 type Output = T;
1003
1004 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1005 self.listener().poll_internal(cx)
1006 }
1007}
1008
1009pin_project_lite::pin_project! {
1010 #[project(!Unpin)]
1011 #[project = ListenerProject]
1012 struct Listener<T, B: Borrow<Inner<T>>>
1013 where
1014 B: Unpin,
1015 {
1016 // The reference to the original event.
1017 event: Option<B>,
1018
1019 // The inner state of the listener.
1020 //
1021 // This is only ever `None` during initialization. After `listen()` has completed, this
1022 // should be `Some`.
1023 #[pin]
1024 listener: Option<sys::Listener<T>>,
1025 }
1026
1027 impl<T, B: Borrow<Inner<T>>> PinnedDrop for Listener<T, B>
1028 where
1029 B: Unpin,
1030 {
1031 fn drop(mut this: Pin<&mut Self>) {
1032 // If we're being dropped, we need to remove ourself from the list.
1033 let this = this.project();
1034 if let Some(inner) = this.event {
1035 (*inner).borrow().remove(this.listener, true);
1036 }
1037 }
1038 }
1039}
1040
1041unsafe impl<T: Send, B: Borrow<Inner<T>> + Unpin + Send> Send for Listener<T, B> {}
1042unsafe impl<T: Send, B: Borrow<Inner<T>> + Unpin + Sync> Sync for Listener<T, B> {}
1043
1044impl<T, B: Borrow<Inner<T>> + Unpin> Listener<T, B> {
1045 /// Wait until the provided deadline.
1046 #[cfg(all(feature = "std", not(target_family = "wasm")))]
1047 fn wait_internal(mut self: Pin<&mut Self>, deadline: Option<Instant>) -> Option<T> {
1048 use std::cell::RefCell;
1049
1050 std::thread_local! {
1051 /// Cached thread-local parker/unparker pair.
1052 static PARKER: RefCell<Option<(Parker, Task)>> = RefCell::new(None);
1053 }
1054
1055 // Try to borrow the thread-local parker/unparker pair.
1056 PARKER
1057 .try_with({
1058 let this = self.as_mut();
1059 |parker| {
1060 let mut pair = parker
1061 .try_borrow_mut()
1062 .expect("Shouldn't be able to borrow parker reentrantly");
1063 let (parker, unparker) = pair.get_or_insert_with(|| {
1064 let (parker, unparker) = parking::pair();
1065 (parker, Task::Unparker(unparker))
1066 });
1067
1068 this.wait_with_parker(deadline, parker, unparker.as_task_ref())
1069 }
1070 })
1071 .unwrap_or_else(|_| {
1072 // If the pair isn't accessible, we may be being called in a destructor.
1073 // Just create a new pair.
1074 let (parker, unparker) = parking::pair();
1075 self.wait_with_parker(deadline, &parker, TaskRef::Unparker(&unparker))
1076 })
1077 }
1078
1079 /// Wait until the provided deadline using the specified parker/unparker pair.
1080 #[cfg(all(feature = "std", not(target_family = "wasm")))]
1081 fn wait_with_parker(
1082 self: Pin<&mut Self>,
1083 deadline: Option<Instant>,
1084 parker: &Parker,
1085 unparker: TaskRef<'_>,
1086 ) -> Option<T> {
1087 let mut this = self.project();
1088 let inner = (*this
1089 .event
1090 .as_ref()
1091 .expect("must listen() on event listener before waiting"))
1092 .borrow();
1093
1094 // Set the listener's state to `Task`.
1095 if let Some(tag) = inner.register(this.listener.as_mut(), unparker).notified() {
1096 // We were already notified, so we don't need to park.
1097 return Some(tag);
1098 }
1099
1100 // Wait until a notification is received or the timeout is reached.
1101 loop {
1102 match deadline {
1103 None => parker.park(),
1104
1105 Some(deadline) => {
1106 // Make sure we're not timed out already.
1107 let now = Instant::now();
1108 if now >= deadline {
1109 // Remove our entry and check if we were notified.
1110 return inner
1111 .remove(this.listener, false)
1112 .expect("We never removed ourself from the list")
1113 .notified();
1114 }
1115 parker.park_deadline(deadline);
1116 }
1117 }
1118
1119 // See if we were notified.
1120 if let Some(tag) = inner.register(this.listener.as_mut(), unparker).notified() {
1121 return Some(tag);
1122 }
1123 }
1124 }
1125
1126 /// Drops this listener and discards its notification (if any) without notifying another
1127 /// active listener.
1128 fn discard(self: Pin<&mut Self>) -> bool {
1129 let this = self.project();
1130
1131 if let Some(inner) = this.event.as_ref() {
1132 (*inner)
1133 .borrow()
1134 .remove(this.listener, false)
1135 .map_or(false, |state| state.is_notified())
1136 } else {
1137 false
1138 }
1139 }
1140
1141 /// Poll this listener for a notification.
1142 fn poll_internal(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
1143 let mut this = self.project();
1144 let inner = match &this.event {
1145 Some(inner) => (*inner).borrow(),
1146 None => panic!(""),
1147 };
1148
1149 // Try to register the listener.
1150 match inner
1151 .register(this.listener.as_mut(), TaskRef::Waker(cx.waker()))
1152 .notified()
1153 {
1154 Some(tag) => {
1155 // We were already notified, so we don't need to park.
1156 Poll::Ready(tag)
1157 }
1158
1159 None => {
1160 // We're now waiting for a notification.
1161 Poll::Pending
1162 }
1163 }
1164 }
1165}
1166
1167/// The state of a listener.
1168#[derive(PartialEq)]
1169enum State<T> {
1170 /// The listener was just created.
1171 Created,
1172
1173 /// The listener has received a notification.
1174 ///
1175 /// The `bool` is `true` if this was an "additional" notification.
1176 Notified {
1177 /// Whether or not this is an "additional" notification.
1178 additional: bool,
1179
1180 /// The tag associated with the notification.
1181 tag: T,
1182 },
1183
1184 /// A task is waiting for a notification.
1185 Task(Task),
1186
1187 /// Empty hole used to replace a notified listener.
1188 NotifiedTaken,
1189}
1190
1191impl<T> fmt::Debug for State<T> {
1192 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1193 match self {
1194 Self::Created => f.write_str(data:"Created"),
1195 Self::Notified { additional: &bool, .. } => f&mut DebugStruct<'_, '_>
1196 .debug_struct("Notified")
1197 .field(name:"additional", value:additional)
1198 .finish(),
1199 Self::Task(_) => f.write_str(data:"Task(_)"),
1200 Self::NotifiedTaken => f.write_str(data:"NotifiedTaken"),
1201 }
1202 }
1203}
1204
1205impl<T> State<T> {
1206 fn is_notified(&self) -> bool {
1207 matches!(self, Self::Notified { .. } | Self::NotifiedTaken)
1208 }
1209
1210 /// If this state was notified, return the tag associated with the notification.
1211 #[allow(unused)]
1212 fn notified(self) -> Option<T> {
1213 match self {
1214 Self::Notified { tag: T, .. } => Some(tag),
1215 Self::NotifiedTaken => panic!("listener was already notified but taken"),
1216 _ => None,
1217 }
1218 }
1219}
1220
1221/// The result of registering a listener.
1222#[derive(Debug, PartialEq)]
1223enum RegisterResult<T> {
1224 /// The listener was already notified.
1225 Notified(T),
1226
1227 /// The listener has been registered.
1228 Registered,
1229
1230 /// The listener was never inserted into the list.
1231 NeverInserted,
1232}
1233
1234impl<T> RegisterResult<T> {
1235 /// Whether or not the listener was notified.
1236 ///
1237 /// Panics if the listener was never inserted into the list.
1238 fn notified(self) -> Option<T> {
1239 match self {
1240 Self::Notified(tag: T) => Some(tag),
1241 Self::Registered => None,
1242 Self::NeverInserted => panic!("listener was never inserted into the list"),
1243 }
1244 }
1245}
1246
1247/// A task that can be woken up.
1248#[derive(Debug, Clone)]
1249enum Task {
1250 /// A waker that wakes up a future.
1251 Waker(Waker),
1252
1253 /// An unparker that wakes up a thread.
1254 #[cfg(all(feature = "std", not(target_family = "wasm")))]
1255 Unparker(Unparker),
1256}
1257
1258impl Task {
1259 fn as_task_ref(&self) -> TaskRef<'_> {
1260 match self {
1261 Self::Waker(waker: &Waker) => TaskRef::Waker(waker),
1262 #[cfg(all(feature = "std", not(target_family = "wasm")))]
1263 Self::Unparker(unparker: &Unparker) => TaskRef::Unparker(unparker),
1264 }
1265 }
1266
1267 fn wake(self) {
1268 match self {
1269 Self::Waker(waker: Waker) => waker.wake(),
1270 #[cfg(all(feature = "std", not(target_family = "wasm")))]
1271 Self::Unparker(unparker: Unparker) => {
1272 unparker.unpark();
1273 }
1274 }
1275 }
1276}
1277
1278impl PartialEq for Task {
1279 fn eq(&self, other: &Self) -> bool {
1280 self.as_task_ref().will_wake(other.as_task_ref())
1281 }
1282}
1283
1284/// A reference to a task.
1285#[derive(Clone, Copy)]
1286enum TaskRef<'a> {
1287 /// A waker that wakes up a future.
1288 Waker(&'a Waker),
1289
1290 /// An unparker that wakes up a thread.
1291 #[cfg(all(feature = "std", not(target_family = "wasm")))]
1292 Unparker(&'a Unparker),
1293}
1294
1295impl TaskRef<'_> {
1296 /// Tells if this task will wake up the other task.
1297 #[allow(unreachable_patterns)]
1298 fn will_wake(self, other: Self) -> bool {
1299 match (self, other) {
1300 (Self::Waker(a: &Waker), Self::Waker(b: &Waker)) => a.will_wake(b),
1301 #[cfg(all(feature = "std", not(target_family = "wasm")))]
1302 (Self::Unparker(_), Self::Unparker(_)) => {
1303 // TODO: Use unreleased will_unpark API.
1304 false
1305 }
1306 _ => false,
1307 }
1308 }
1309
1310 /// Converts this task reference to a task by cloning.
1311 fn into_task(self) -> Task {
1312 match self {
1313 Self::Waker(waker: &Waker) => Task::Waker(waker.clone()),
1314 #[cfg(all(feature = "std", not(target_family = "wasm")))]
1315 Self::Unparker(unparker: &Unparker) => Task::Unparker(unparker.clone()),
1316 }
1317 }
1318}
1319
1320/// Synchronization primitive implementation.
1321mod sync {
1322 pub(super) use core::cell;
1323
1324 #[cfg(not(feature = "portable-atomic"))]
1325 pub(super) use alloc::sync::Arc;
1326 #[cfg(not(feature = "portable-atomic"))]
1327 pub(super) use core::sync::atomic;
1328
1329 #[cfg(feature = "portable-atomic")]
1330 pub(super) use portable_atomic_crate as atomic;
1331 #[cfg(feature = "portable-atomic")]
1332 pub(super) use portable_atomic_util::Arc;
1333
1334 #[cfg(feature = "std")]
1335 pub(super) use std::sync::{Mutex, MutexGuard};
1336
1337 pub(super) trait WithMut {
1338 type Output;
1339
1340 fn with_mut<F, R>(&mut self, f: F) -> R
1341 where
1342 F: FnOnce(&mut Self::Output) -> R;
1343 }
1344
1345 impl<T> WithMut for atomic::AtomicPtr<T> {
1346 type Output = *mut T;
1347
1348 #[inline]
1349 fn with_mut<F, R>(&mut self, f: F) -> R
1350 where
1351 F: FnOnce(&mut Self::Output) -> R,
1352 {
1353 f(self.get_mut())
1354 }
1355 }
1356}
1357
1358fn __test_send_and_sync() {
1359 fn _assert_send<T: Send>() {}
1360 fn _assert_sync<T: Sync>() {}
1361
1362 _assert_send::<Event<()>>();
1363 _assert_sync::<Event<()>>();
1364 _assert_send::<EventListener<()>>();
1365 _assert_sync::<EventListener<()>>();
1366}
1367