1//! Notify async tasks or threads.
2//!
3//! This is a synchronization primitive similar to [eventcounts] invented by Dmitry Vyukov.
4//!
5//! You can use this crate to turn non-blocking data structures into async or blocking data
6//! structures. See a [simple mutex] implementation that exposes an async and a blocking interface
7//! for acquiring locks.
8//!
9//! [eventcounts]: http://www.1024cores.net/home/lock-free-algorithms/eventcounts
10//! [simple mutex]: https://github.com/smol-rs/event-listener/blob/master/examples/mutex.rs
11//!
12//! # Examples
13//!
14//! Wait until another thread sets a boolean flag:
15//!
16//! ```
17//! use std::sync::atomic::{AtomicBool, Ordering};
18//! use std::sync::Arc;
19//! use std::thread;
20//! use std::time::Duration;
21//! use std::usize;
22//! use event_listener::Event;
23//!
24//! let flag = Arc::new(AtomicBool::new(false));
25//! let event = Arc::new(Event::new());
26//!
27//! // Spawn a thread that will set the flag after 1 second.
28//! thread::spawn({
29//! let flag = flag.clone();
30//! let event = event.clone();
31//! move || {
32//! // Wait for a second.
33//! thread::sleep(Duration::from_secs(1));
34//!
35//! // Set the flag.
36//! flag.store(true, Ordering::SeqCst);
37//!
38//! // Notify all listeners that the flag has been set.
39//! event.notify(usize::MAX);
40//! }
41//! });
42//!
43//! // Wait until the flag is set.
44//! loop {
45//! // Check the flag.
46//! if flag.load(Ordering::SeqCst) {
47//! break;
48//! }
49//!
50//! // Start listening for events.
51//! let listener = event.listen();
52//!
53//! // Check the flag again after creating the listener.
54//! if flag.load(Ordering::SeqCst) {
55//! break;
56//! }
57//!
58//! // Wait for a notification and continue the loop.
59//! listener.wait();
60//! }
61//! ```
62
63#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
64
65use std::cell::{Cell, UnsafeCell};
66use std::fmt;
67use std::future::Future;
68use std::mem::{self, ManuallyDrop};
69use std::ops::{Deref, DerefMut};
70use std::panic::{RefUnwindSafe, UnwindSafe};
71use std::pin::Pin;
72use std::ptr::{self, NonNull};
73use std::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};
74use std::sync::{Arc, Mutex, MutexGuard};
75use std::task::{Context, Poll, Waker};
76use std::thread::{self, Thread};
77use std::time::{Duration, Instant};
78use std::usize;
79
80/// Inner state of [`Event`].
81struct Inner {
82 /// The number of notified entries, or `usize::MAX` if all of them have been notified.
83 ///
84 /// If there are no entries, this value is set to `usize::MAX`.
85 notified: AtomicUsize,
86
87 /// A linked list holding registered listeners.
88 list: Mutex<List>,
89
90 /// A single cached list entry to avoid allocations on the fast path of the insertion.
91 cache: UnsafeCell<Entry>,
92}
93
94impl Inner {
95 /// Locks the list.
96 fn lock(&self) -> ListGuard<'_> {
97 ListGuard {
98 inner: self,
99 guard: self.list.lock().unwrap(),
100 }
101 }
102
103 /// Returns the pointer to the single cached list entry.
104 #[inline(always)]
105 fn cache_ptr(&self) -> NonNull<Entry> {
106 unsafe { NonNull::new_unchecked(self.cache.get()) }
107 }
108}
109
110/// A synchronization primitive for notifying async tasks and threads.
111///
112/// Listeners can be registered using [`Event::listen()`]. There are two ways to notify listeners:
113///
114/// 1. [`Event::notify()`] notifies a number of listeners.
115/// 2. [`Event::notify_additional()`] notifies a number of previously unnotified listeners.
116///
117/// If there are no active listeners at the time a notification is sent, it simply gets lost.
118///
119/// There are two ways for a listener to wait for a notification:
120///
121/// 1. In an asynchronous manner using `.await`.
122/// 2. In a blocking manner by calling [`EventListener::wait()`] on it.
123///
124/// If a notified listener is dropped without receiving a notification, dropping will notify
125/// another active listener. Whether one *additional* listener will be notified depends on what
126/// kind of notification was delivered.
127///
128/// Listeners are registered and notified in the first-in first-out fashion, ensuring fairness.
129pub struct Event {
130 /// A pointer to heap-allocated inner state.
131 ///
132 /// This pointer is initially null and gets lazily initialized on first use. Semantically, it
133 /// is an `Arc<Inner>` so it's important to keep in mind that it contributes to the [`Arc`]'s
134 /// reference count.
135 inner: AtomicPtr<Inner>,
136}
137
138unsafe impl Send for Event {}
139unsafe impl Sync for Event {}
140
141impl UnwindSafe for Event {}
142impl RefUnwindSafe for Event {}
143
144impl Event {
145 /// Creates a new [`Event`].
146 ///
147 /// # Examples
148 ///
149 /// ```
150 /// use event_listener::Event;
151 ///
152 /// let event = Event::new();
153 /// ```
154 #[inline]
155 pub const fn new() -> Event {
156 Event {
157 inner: AtomicPtr::new(ptr::null_mut()),
158 }
159 }
160
161 /// Returns a guard listening for a notification.
162 ///
163 /// This method emits a `SeqCst` fence after registering a listener.
164 ///
165 /// # Examples
166 ///
167 /// ```
168 /// use event_listener::Event;
169 ///
170 /// let event = Event::new();
171 /// let listener = event.listen();
172 /// ```
173 #[cold]
174 pub fn listen(&self) -> EventListener {
175 let inner = self.inner();
176 let listener = EventListener {
177 inner: unsafe { Arc::clone(&ManuallyDrop::new(Arc::from_raw(inner))) },
178 entry: unsafe { Some((*inner).lock().insert((*inner).cache_ptr())) },
179 };
180
181 // Make sure the listener is registered before whatever happens next.
182 full_fence();
183 listener
184 }
185
186 /// Notifies a number of active listeners.
187 ///
188 /// The number is allowed to be zero or exceed the current number of listeners.
189 ///
190 /// In contrast to [`Event::notify_additional()`], this method only makes sure *at least* `n`
191 /// listeners among the active ones are notified.
192 ///
193 /// This method emits a `SeqCst` fence before notifying listeners.
194 ///
195 /// # Examples
196 ///
197 /// ```
198 /// use event_listener::Event;
199 ///
200 /// let event = Event::new();
201 ///
202 /// // This notification gets lost because there are no listeners.
203 /// event.notify(1);
204 ///
205 /// let listener1 = event.listen();
206 /// let listener2 = event.listen();
207 /// let listener3 = event.listen();
208 ///
209 /// // Notifies two listeners.
210 /// //
211 /// // Listener queueing is fair, which means `listener1` and `listener2`
212 /// // get notified here since they start listening before `listener3`.
213 /// event.notify(2);
214 /// ```
215 #[inline]
216 pub fn notify(&self, n: usize) {
217 // Make sure the notification comes after whatever triggered it.
218 full_fence();
219
220 if let Some(inner) = self.try_inner() {
221 // Notify if there is at least one unnotified listener and the number of notified
222 // listeners is less than `n`.
223 if inner.notified.load(Ordering::Acquire) < n {
224 inner.lock().notify(n);
225 }
226 }
227 }
228
229 /// Notifies a number of active listeners without emitting a `SeqCst` fence.
230 ///
231 /// The number is allowed to be zero or exceed the current number of listeners.
232 ///
233 /// In contrast to [`Event::notify_additional()`], this method only makes sure *at least* `n`
234 /// listeners among the active ones are notified.
235 ///
236 /// Unlike [`Event::notify()`], this method does not emit a `SeqCst` fence.
237 ///
238 /// # Examples
239 ///
240 /// ```
241 /// use event_listener::Event;
242 /// use std::sync::atomic::{self, Ordering};
243 ///
244 /// let event = Event::new();
245 ///
246 /// // This notification gets lost because there are no listeners.
247 /// event.notify(1);
248 ///
249 /// let listener1 = event.listen();
250 /// let listener2 = event.listen();
251 /// let listener3 = event.listen();
252 ///
253 /// // We should emit a fence manually when using relaxed notifications.
254 /// atomic::fence(Ordering::SeqCst);
255 ///
256 /// // Notifies two listeners.
257 /// //
258 /// // Listener queueing is fair, which means `listener1` and `listener2`
259 /// // get notified here since they start listening before `listener3`.
260 /// event.notify(2);
261 /// ```
262 #[inline]
263 pub fn notify_relaxed(&self, n: usize) {
264 if let Some(inner) = self.try_inner() {
265 // Notify if there is at least one unnotified listener and the number of notified
266 // listeners is less than `n`.
267 if inner.notified.load(Ordering::Acquire) < n {
268 inner.lock().notify(n);
269 }
270 }
271 }
272
273 /// Notifies a number of active and still unnotified listeners.
274 ///
275 /// The number is allowed to be zero or exceed the current number of listeners.
276 ///
277 /// In contrast to [`Event::notify()`], this method will notify `n` *additional* listeners that
278 /// were previously unnotified.
279 ///
280 /// This method emits a `SeqCst` fence before notifying listeners.
281 ///
282 /// # Examples
283 ///
284 /// ```
285 /// use event_listener::Event;
286 ///
287 /// let event = Event::new();
288 ///
289 /// // This notification gets lost because there are no listeners.
290 /// event.notify(1);
291 ///
292 /// let listener1 = event.listen();
293 /// let listener2 = event.listen();
294 /// let listener3 = event.listen();
295 ///
296 /// // Notifies two listeners.
297 /// //
298 /// // Listener queueing is fair, which means `listener1` and `listener2`
299 /// // get notified here since they start listening before `listener3`.
300 /// event.notify_additional(1);
301 /// event.notify_additional(1);
302 /// ```
303 #[inline]
304 pub fn notify_additional(&self, n: usize) {
305 // Make sure the notification comes after whatever triggered it.
306 full_fence();
307
308 if let Some(inner) = self.try_inner() {
309 // Notify if there is at least one unnotified listener.
310 if inner.notified.load(Ordering::Acquire) < usize::MAX {
311 inner.lock().notify_additional(n);
312 }
313 }
314 }
315
316 /// Notifies a number of active and still unnotified listeners without emitting a `SeqCst`
317 /// fence.
318 ///
319 /// The number is allowed to be zero or exceed the current number of listeners.
320 ///
321 /// In contrast to [`Event::notify()`], this method will notify `n` *additional* listeners that
322 /// were previously unnotified.
323 ///
324 /// Unlike [`Event::notify_additional()`], this method does not emit a `SeqCst` fence.
325 ///
326 /// # Examples
327 ///
328 /// ```
329 /// use event_listener::Event;
330 /// use std::sync::atomic::{self, Ordering};
331 ///
332 /// let event = Event::new();
333 ///
334 /// // This notification gets lost because there are no listeners.
335 /// event.notify(1);
336 ///
337 /// let listener1 = event.listen();
338 /// let listener2 = event.listen();
339 /// let listener3 = event.listen();
340 ///
341 /// // We should emit a fence manually when using relaxed notifications.
342 /// atomic::fence(Ordering::SeqCst);
343 ///
344 /// // Notifies two listeners.
345 /// //
346 /// // Listener queueing is fair, which means `listener1` and `listener2`
347 /// // get notified here since they start listening before `listener3`.
348 /// event.notify_additional_relaxed(1);
349 /// event.notify_additional_relaxed(1);
350 /// ```
351 #[inline]
352 pub fn notify_additional_relaxed(&self, n: usize) {
353 if let Some(inner) = self.try_inner() {
354 // Notify if there is at least one unnotified listener.
355 if inner.notified.load(Ordering::Acquire) < usize::MAX {
356 inner.lock().notify_additional(n);
357 }
358 }
359 }
360
361 /// Returns a reference to the inner state if it was initialized.
362 #[inline]
363 fn try_inner(&self) -> Option<&Inner> {
364 let inner = self.inner.load(Ordering::Acquire);
365 unsafe { inner.as_ref() }
366 }
367
368 /// Returns a raw pointer to the inner state, initializing it if necessary.
369 ///
370 /// This returns a raw pointer instead of reference because `from_raw`
371 /// requires raw/mut provenance: <https://github.com/rust-lang/rust/pull/67339>
372 fn inner(&self) -> *const Inner {
373 let mut inner = self.inner.load(Ordering::Acquire);
374
375 // Initialize the state if this is its first use.
376 if inner.is_null() {
377 // Allocate on the heap.
378 let new = Arc::new(Inner {
379 notified: AtomicUsize::new(usize::MAX),
380 list: std::sync::Mutex::new(List {
381 head: None,
382 tail: None,
383 start: None,
384 len: 0,
385 notified: 0,
386 cache_used: false,
387 }),
388 cache: UnsafeCell::new(Entry {
389 state: Cell::new(State::Created),
390 prev: Cell::new(None),
391 next: Cell::new(None),
392 }),
393 });
394 // Convert the heap-allocated state into a raw pointer.
395 let new = Arc::into_raw(new) as *mut Inner;
396
397 // Attempt to replace the null-pointer with the new state pointer.
398 inner = self
399 .inner
400 .compare_exchange(inner, new, Ordering::AcqRel, Ordering::Acquire)
401 .unwrap_or_else(|x| x);
402
403 // Check if the old pointer value was indeed null.
404 if inner.is_null() {
405 // If yes, then use the new state pointer.
406 inner = new;
407 } else {
408 // If not, that means a concurrent operation has initialized the state.
409 // In that case, use the old pointer and deallocate the new one.
410 unsafe {
411 drop(Arc::from_raw(new));
412 }
413 }
414 }
415
416 inner
417 }
418}
419
420impl Drop for Event {
421 #[inline]
422 fn drop(&mut self) {
423 let inner: *mut Inner = *self.inner.get_mut();
424
425 // If the state pointer has been initialized, deallocate it.
426 if !inner.is_null() {
427 unsafe {
428 drop(Arc::from_raw(ptr:inner));
429 }
430 }
431 }
432}
433
434impl fmt::Debug for Event {
435 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
436 f.pad("Event { .. }")
437 }
438}
439
440impl Default for Event {
441 fn default() -> Event {
442 Event::new()
443 }
444}
445
446/// A guard waiting for a notification from an [`Event`].
447///
448/// There are two ways for a listener to wait for a notification:
449///
450/// 1. In an asynchronous manner using `.await`.
451/// 2. In a blocking manner by calling [`EventListener::wait()`] on it.
452///
453/// If a notified listener is dropped without receiving a notification, dropping will notify
454/// another active listener. Whether one *additional* listener will be notified depends on what
455/// kind of notification was delivered.
456pub struct EventListener {
457 /// A reference to [`Event`]'s inner state.
458 inner: Arc<Inner>,
459
460 /// A pointer to this listener's entry in the linked list.
461 entry: Option<NonNull<Entry>>,
462}
463
464unsafe impl Send for EventListener {}
465unsafe impl Sync for EventListener {}
466
467impl UnwindSafe for EventListener {}
468impl RefUnwindSafe for EventListener {}
469
470impl EventListener {
471 /// Blocks until a notification is received.
472 ///
473 /// # Examples
474 ///
475 /// ```
476 /// use event_listener::Event;
477 ///
478 /// let event = Event::new();
479 /// let listener = event.listen();
480 ///
481 /// // Notify `listener`.
482 /// event.notify(1);
483 ///
484 /// // Receive the notification.
485 /// listener.wait();
486 /// ```
487 pub fn wait(self) {
488 self.wait_internal(None);
489 }
490
491 /// Blocks until a notification is received or a timeout is reached.
492 ///
493 /// Returns `true` if a notification was received.
494 ///
495 /// # Examples
496 ///
497 /// ```
498 /// use std::time::Duration;
499 /// use event_listener::Event;
500 ///
501 /// let event = Event::new();
502 /// let listener = event.listen();
503 ///
504 /// // There are no notification so this times out.
505 /// assert!(!listener.wait_timeout(Duration::from_secs(1)));
506 /// ```
507 pub fn wait_timeout(self, timeout: Duration) -> bool {
508 self.wait_internal(Some(Instant::now() + timeout))
509 }
510
511 /// Blocks until a notification is received or a deadline is reached.
512 ///
513 /// Returns `true` if a notification was received.
514 ///
515 /// # Examples
516 ///
517 /// ```
518 /// use std::time::{Duration, Instant};
519 /// use event_listener::Event;
520 ///
521 /// let event = Event::new();
522 /// let listener = event.listen();
523 ///
524 /// // There are no notification so this times out.
525 /// assert!(!listener.wait_deadline(Instant::now() + Duration::from_secs(1)));
526 /// ```
527 pub fn wait_deadline(self, deadline: Instant) -> bool {
528 self.wait_internal(Some(deadline))
529 }
530
531 /// Drops this listener and discards its notification (if any) without notifying another
532 /// active listener.
533 ///
534 /// Returns `true` if a notification was discarded.
535 ///
536 /// # Examples
537 /// ```
538 /// use event_listener::Event;
539 ///
540 /// let event = Event::new();
541 /// let listener1 = event.listen();
542 /// let listener2 = event.listen();
543 ///
544 /// event.notify(1);
545 ///
546 /// assert!(listener1.discard());
547 /// assert!(!listener2.discard());
548 /// ```
549 pub fn discard(mut self) -> bool {
550 // If this listener has never picked up a notification...
551 if let Some(entry) = self.entry.take() {
552 let mut list = self.inner.lock();
553 // Remove the listener from the list and return `true` if it was notified.
554 if let State::Notified(_) = list.remove(entry, self.inner.cache_ptr()) {
555 return true;
556 }
557 }
558 false
559 }
560
561 /// Returns `true` if this listener listens to the given `Event`.
562 ///
563 /// # Examples
564 ///
565 /// ```
566 /// use event_listener::Event;
567 ///
568 /// let event = Event::new();
569 /// let listener = event.listen();
570 ///
571 /// assert!(listener.listens_to(&event));
572 /// ```
573 #[inline]
574 pub fn listens_to(&self, event: &Event) -> bool {
575 ptr::eq::<Inner>(&*self.inner, event.inner.load(Ordering::Acquire))
576 }
577
578 /// Returns `true` if both listeners listen to the same `Event`.
579 ///
580 /// # Examples
581 ///
582 /// ```
583 /// use event_listener::Event;
584 ///
585 /// let event = Event::new();
586 /// let listener1 = event.listen();
587 /// let listener2 = event.listen();
588 ///
589 /// assert!(listener1.same_event(&listener2));
590 /// ```
591 pub fn same_event(&self, other: &EventListener) -> bool {
592 ptr::eq::<Inner>(&*self.inner, &*other.inner)
593 }
594
595 fn wait_internal(mut self, deadline: Option<Instant>) -> bool {
596 // Take out the entry pointer and set it to `None`.
597 let entry = match self.entry.take() {
598 None => unreachable!("cannot wait twice on an `EventListener`"),
599 Some(entry) => entry,
600 };
601
602 // Set this listener's state to `Waiting`.
603 {
604 let mut list = self.inner.lock();
605 let e = unsafe { entry.as_ref() };
606
607 // Do a dummy replace operation in order to take out the state.
608 match e.state.replace(State::Notified(false)) {
609 State::Notified(_) => {
610 // If this listener has been notified, remove it from the list and return.
611 list.remove(entry, self.inner.cache_ptr());
612 return true;
613 }
614 // Otherwise, set the state to `Waiting`.
615 _ => e.state.set(State::Waiting(thread::current())),
616 }
617 }
618
619 // Wait until a notification is received or the timeout is reached.
620 loop {
621 match deadline {
622 None => thread::park(),
623
624 Some(deadline) => {
625 // Check for timeout.
626 let now = Instant::now();
627 if now >= deadline {
628 // Remove the entry and check if notified.
629 return self
630 .inner
631 .lock()
632 .remove(entry, self.inner.cache_ptr())
633 .is_notified();
634 }
635
636 // Park until the deadline.
637 thread::park_timeout(deadline - now);
638 }
639 }
640
641 let mut list = self.inner.lock();
642 let e = unsafe { entry.as_ref() };
643
644 // Do a dummy replace operation in order to take out the state.
645 match e.state.replace(State::Notified(false)) {
646 State::Notified(_) => {
647 // If this listener has been notified, remove it from the list and return.
648 list.remove(entry, self.inner.cache_ptr());
649 return true;
650 }
651 // Otherwise, set the state back to `Waiting`.
652 state => e.state.set(state),
653 }
654 }
655 }
656}
657
658impl fmt::Debug for EventListener {
659 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
660 f.pad("EventListener { .. }")
661 }
662}
663
664impl Future for EventListener {
665 type Output = ();
666
667 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
668 let mut list = self.inner.lock();
669
670 let entry = match self.entry {
671 None => unreachable!("cannot poll a completed `EventListener` future"),
672 Some(entry) => entry,
673 };
674 let state = unsafe { &entry.as_ref().state };
675
676 // Do a dummy replace operation in order to take out the state.
677 match state.replace(State::Notified(false)) {
678 State::Notified(_) => {
679 // If this listener has been notified, remove it from the list and return.
680 list.remove(entry, self.inner.cache_ptr());
681 drop(list);
682 self.entry = None;
683 return Poll::Ready(());
684 }
685 State::Created => {
686 // If the listener was just created, put it in the `Polling` state.
687 state.set(State::Polling(cx.waker().clone()));
688 }
689 State::Polling(w) => {
690 // If the listener was in the `Polling` state, update the waker.
691 if w.will_wake(cx.waker()) {
692 state.set(State::Polling(w));
693 } else {
694 state.set(State::Polling(cx.waker().clone()));
695 }
696 }
697 State::Waiting(_) => {
698 unreachable!("cannot poll and wait on `EventListener` at the same time")
699 }
700 }
701
702 Poll::Pending
703 }
704}
705
706impl Drop for EventListener {
707 fn drop(&mut self) {
708 // If this listener has never picked up a notification...
709 if let Some(entry: NonNull) = self.entry.take() {
710 let mut list: ListGuard<'_> = self.inner.lock();
711
712 // But if a notification was delivered to it...
713 if let State::Notified(additional: bool) = list.remove(entry, self.inner.cache_ptr()) {
714 // Then pass it on to another active listener.
715 if additional {
716 list.notify_additional(1);
717 } else {
718 list.notify(1);
719 }
720 }
721 }
722 }
723}
724
725/// A guard holding the linked list locked.
726struct ListGuard<'a> {
727 /// A reference to [`Event`]'s inner state.
728 inner: &'a Inner,
729
730 /// The actual guard that acquired the linked list.
731 guard: MutexGuard<'a, List>,
732}
733
734impl Drop for ListGuard<'_> {
735 #[inline]
736 fn drop(&mut self) {
737 let list: &mut List = &mut **self;
738
739 // Update the atomic `notified` counter.
740 let notified: usize = if list.notified < list.len {
741 list.notified
742 } else {
743 usize::MAX
744 };
745 self.inner.notified.store(val:notified, order:Ordering::Release);
746 }
747}
748
749impl Deref for ListGuard<'_> {
750 type Target = List;
751
752 #[inline]
753 fn deref(&self) -> &List {
754 &*self.guard
755 }
756}
757
758impl DerefMut for ListGuard<'_> {
759 #[inline]
760 fn deref_mut(&mut self) -> &mut List {
761 &mut *self.guard
762 }
763}
764
765/// The state of a listener.
766enum State {
767 /// It has just been created.
768 Created,
769
770 /// It has received a notification.
771 ///
772 /// The `bool` is `true` if this was an "additional" notification.
773 Notified(bool),
774
775 /// An async task is polling it.
776 Polling(Waker),
777
778 /// A thread is blocked on it.
779 Waiting(Thread),
780}
781
782impl State {
783 /// Returns `true` if this is the `Notified` state.
784 #[inline]
785 fn is_notified(&self) -> bool {
786 match self {
787 State::Notified(_) => true,
788 State::Created | State::Polling(_) | State::Waiting(_) => false,
789 }
790 }
791}
792
793/// An entry representing a registered listener.
794struct Entry {
795 /// THe state of this listener.
796 state: Cell<State>,
797
798 /// Previous entry in the linked list.
799 prev: Cell<Option<NonNull<Entry>>>,
800
801 /// Next entry in the linked list.
802 next: Cell<Option<NonNull<Entry>>>,
803}
804
805/// A linked list of entries.
806struct List {
807 /// First entry in the list.
808 head: Option<NonNull<Entry>>,
809
810 /// Last entry in the list.
811 tail: Option<NonNull<Entry>>,
812
813 /// The first unnotified entry in the list.
814 start: Option<NonNull<Entry>>,
815
816 /// Total number of entries in the list.
817 len: usize,
818
819 /// The number of notified entries in the list.
820 notified: usize,
821
822 /// Whether the cached entry is used.
823 cache_used: bool,
824}
825
826impl List {
827 /// Inserts a new entry into the list.
828 fn insert(&mut self, cache: NonNull<Entry>) -> NonNull<Entry> {
829 unsafe {
830 let entry = Entry {
831 state: Cell::new(State::Created),
832 prev: Cell::new(self.tail),
833 next: Cell::new(None),
834 };
835
836 let entry = if self.cache_used {
837 // Allocate an entry that is going to become the new tail.
838 NonNull::new_unchecked(Box::into_raw(Box::new(entry)))
839 } else {
840 // No need to allocate - we can use the cached entry.
841 self.cache_used = true;
842 cache.as_ptr().write(entry);
843 cache
844 };
845
846 // Replace the tail with the new entry.
847 match mem::replace(&mut self.tail, Some(entry)) {
848 None => self.head = Some(entry),
849 Some(t) => t.as_ref().next.set(Some(entry)),
850 }
851
852 // If there were no unnotified entries, this one is the first now.
853 if self.start.is_none() {
854 self.start = self.tail;
855 }
856
857 // Bump the entry count.
858 self.len += 1;
859
860 entry
861 }
862 }
863
864 /// Removes an entry from the list and returns its state.
865 fn remove(&mut self, entry: NonNull<Entry>, cache: NonNull<Entry>) -> State {
866 unsafe {
867 let prev = entry.as_ref().prev.get();
868 let next = entry.as_ref().next.get();
869
870 // Unlink from the previous entry.
871 match prev {
872 None => self.head = next,
873 Some(p) => p.as_ref().next.set(next),
874 }
875
876 // Unlink from the next entry.
877 match next {
878 None => self.tail = prev,
879 Some(n) => n.as_ref().prev.set(prev),
880 }
881
882 // If this was the first unnotified entry, move the pointer to the next one.
883 if self.start == Some(entry) {
884 self.start = next;
885 }
886
887 // Extract the state.
888 let state = if ptr::eq(entry.as_ptr(), cache.as_ptr()) {
889 // Free the cached entry.
890 self.cache_used = false;
891 entry.as_ref().state.replace(State::Created)
892 } else {
893 // Deallocate the entry.
894 Box::from_raw(entry.as_ptr()).state.into_inner()
895 };
896
897 // Update the counters.
898 if state.is_notified() {
899 self.notified -= 1;
900 }
901 self.len -= 1;
902
903 state
904 }
905 }
906
907 /// Notifies a number of entries.
908 #[cold]
909 fn notify(&mut self, mut n: usize) {
910 if n <= self.notified {
911 return;
912 }
913 n -= self.notified;
914
915 while n > 0 {
916 n -= 1;
917
918 // Notify the first unnotified entry.
919 match self.start {
920 None => break,
921 Some(e) => {
922 // Get the entry and move the pointer forward.
923 let e = unsafe { e.as_ref() };
924 self.start = e.next.get();
925
926 // Set the state of this entry to `Notified` and notify.
927 match e.state.replace(State::Notified(false)) {
928 State::Notified(_) => {}
929 State::Created => {}
930 State::Polling(w) => w.wake(),
931 State::Waiting(t) => t.unpark(),
932 }
933
934 // Update the counter.
935 self.notified += 1;
936 }
937 }
938 }
939 }
940
941 /// Notifies a number of additional entries.
942 #[cold]
943 fn notify_additional(&mut self, mut n: usize) {
944 while n > 0 {
945 n -= 1;
946
947 // Notify the first unnotified entry.
948 match self.start {
949 None => break,
950 Some(e) => {
951 // Get the entry and move the pointer forward.
952 let e = unsafe { e.as_ref() };
953 self.start = e.next.get();
954
955 // Set the state of this entry to `Notified` and notify.
956 match e.state.replace(State::Notified(true)) {
957 State::Notified(_) => {}
958 State::Created => {}
959 State::Polling(w) => w.wake(),
960 State::Waiting(t) => t.unpark(),
961 }
962
963 // Update the counter.
964 self.notified += 1;
965 }
966 }
967 }
968 }
969}
970
971/// Equivalent to `atomic::fence(Ordering::SeqCst)`, but in some cases faster.
972#[inline]
973fn full_fence() {
974 if cfg!(all(
975 any(target_arch = "x86", target_arch = "x86_64"),
976 not(miri)
977 )) {
978 // HACK(stjepang): On x86 architectures there are two different ways of executing
979 // a `SeqCst` fence.
980 //
981 // 1. `atomic::fence(SeqCst)`, which compiles into a `mfence` instruction.
982 // 2. `_.compare_exchange(_, _, SeqCst, SeqCst)`, which compiles into a `lock cmpxchg` instruction.
983 //
984 // Both instructions have the effect of a full barrier, but empirical benchmarks have shown
985 // that the second one is sometimes a bit faster.
986 //
987 // The ideal solution here would be to use inline assembly, but we're instead creating a
988 // temporary atomic variable and compare-and-exchanging its value. No sane compiler to
989 // x86 platforms is going to optimize this away.
990 atomic::compiler_fence(Ordering::SeqCst);
991 let a = AtomicUsize::new(0);
992 let _ = a.compare_exchange(0, 1, Ordering::SeqCst, Ordering::SeqCst);
993 atomic::compiler_fence(Ordering::SeqCst);
994 } else {
995 atomic::fence(Ordering::SeqCst);
996 }
997}
998