1 | //! Notify async tasks or threads. |
2 | //! |
3 | //! This is a synchronization primitive similar to [eventcounts] invented by Dmitry Vyukov. |
4 | //! |
5 | //! You can use this crate to turn non-blocking data structures into async or blocking data |
6 | //! structures. See a [simple mutex] implementation that exposes an async and a blocking interface |
7 | //! for acquiring locks. |
8 | //! |
9 | //! [eventcounts]: http://www.1024cores.net/home/lock-free-algorithms/eventcounts |
10 | //! [simple mutex]: https://github.com/smol-rs/event-listener/blob/master/examples/mutex.rs |
11 | //! |
12 | //! # Examples |
13 | //! |
14 | //! Wait until another thread sets a boolean flag: |
15 | //! |
16 | //! ``` |
17 | //! use std::sync::atomic::{AtomicBool, Ordering}; |
18 | //! use std::sync::Arc; |
19 | //! use std::thread; |
20 | //! use std::time::Duration; |
21 | //! use std::usize; |
22 | //! use event_listener::Event; |
23 | //! |
24 | //! let flag = Arc::new(AtomicBool::new(false)); |
25 | //! let event = Arc::new(Event::new()); |
26 | //! |
27 | //! // Spawn a thread that will set the flag after 1 second. |
28 | //! thread::spawn({ |
29 | //! let flag = flag.clone(); |
30 | //! let event = event.clone(); |
31 | //! move || { |
32 | //! // Wait for a second. |
33 | //! thread::sleep(Duration::from_secs(1)); |
34 | //! |
35 | //! // Set the flag. |
36 | //! flag.store(true, Ordering::SeqCst); |
37 | //! |
38 | //! // Notify all listeners that the flag has been set. |
39 | //! event.notify(usize::MAX); |
40 | //! } |
41 | //! }); |
42 | //! |
43 | //! // Wait until the flag is set. |
44 | //! loop { |
45 | //! // Check the flag. |
46 | //! if flag.load(Ordering::SeqCst) { |
47 | //! break; |
48 | //! } |
49 | //! |
50 | //! // Start listening for events. |
51 | //! let listener = event.listen(); |
52 | //! |
53 | //! // Check the flag again after creating the listener. |
54 | //! if flag.load(Ordering::SeqCst) { |
55 | //! break; |
56 | //! } |
57 | //! |
58 | //! // Wait for a notification and continue the loop. |
59 | //! listener.wait(); |
60 | //! } |
61 | //! ``` |
62 | |
63 | #![warn (missing_docs, missing_debug_implementations, rust_2018_idioms)] |
64 | |
65 | use std::cell::{Cell, UnsafeCell}; |
66 | use std::fmt; |
67 | use std::future::Future; |
68 | use std::mem::{self, ManuallyDrop}; |
69 | use std::ops::{Deref, DerefMut}; |
70 | use std::panic::{RefUnwindSafe, UnwindSafe}; |
71 | use std::pin::Pin; |
72 | use std::ptr::{self, NonNull}; |
73 | use std::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering}; |
74 | use std::sync::{Arc, Mutex, MutexGuard}; |
75 | use std::task::{Context, Poll, Waker}; |
76 | use std::thread::{self, Thread}; |
77 | use std::time::{Duration, Instant}; |
78 | use std::usize; |
79 | |
80 | /// Inner state of [`Event`]. |
81 | struct Inner { |
82 | /// The number of notified entries, or `usize::MAX` if all of them have been notified. |
83 | /// |
84 | /// If there are no entries, this value is set to `usize::MAX`. |
85 | notified: AtomicUsize, |
86 | |
87 | /// A linked list holding registered listeners. |
88 | list: Mutex<List>, |
89 | |
90 | /// A single cached list entry to avoid allocations on the fast path of the insertion. |
91 | cache: UnsafeCell<Entry>, |
92 | } |
93 | |
94 | impl Inner { |
95 | /// Locks the list. |
96 | fn lock(&self) -> ListGuard<'_> { |
97 | ListGuard { |
98 | inner: self, |
99 | guard: self.list.lock().unwrap(), |
100 | } |
101 | } |
102 | |
103 | /// Returns the pointer to the single cached list entry. |
104 | #[inline (always)] |
105 | fn cache_ptr(&self) -> NonNull<Entry> { |
106 | unsafe { NonNull::new_unchecked(self.cache.get()) } |
107 | } |
108 | } |
109 | |
110 | /// A synchronization primitive for notifying async tasks and threads. |
111 | /// |
112 | /// Listeners can be registered using [`Event::listen()`]. There are two ways to notify listeners: |
113 | /// |
114 | /// 1. [`Event::notify()`] notifies a number of listeners. |
115 | /// 2. [`Event::notify_additional()`] notifies a number of previously unnotified listeners. |
116 | /// |
117 | /// If there are no active listeners at the time a notification is sent, it simply gets lost. |
118 | /// |
119 | /// There are two ways for a listener to wait for a notification: |
120 | /// |
121 | /// 1. In an asynchronous manner using `.await`. |
122 | /// 2. In a blocking manner by calling [`EventListener::wait()`] on it. |
123 | /// |
124 | /// If a notified listener is dropped without receiving a notification, dropping will notify |
125 | /// another active listener. Whether one *additional* listener will be notified depends on what |
126 | /// kind of notification was delivered. |
127 | /// |
128 | /// Listeners are registered and notified in the first-in first-out fashion, ensuring fairness. |
129 | pub struct Event { |
130 | /// A pointer to heap-allocated inner state. |
131 | /// |
132 | /// This pointer is initially null and gets lazily initialized on first use. Semantically, it |
133 | /// is an `Arc<Inner>` so it's important to keep in mind that it contributes to the [`Arc`]'s |
134 | /// reference count. |
135 | inner: AtomicPtr<Inner>, |
136 | } |
137 | |
138 | unsafe impl Send for Event {} |
139 | unsafe impl Sync for Event {} |
140 | |
141 | impl UnwindSafe for Event {} |
142 | impl RefUnwindSafe for Event {} |
143 | |
144 | impl Event { |
145 | /// Creates a new [`Event`]. |
146 | /// |
147 | /// # Examples |
148 | /// |
149 | /// ``` |
150 | /// use event_listener::Event; |
151 | /// |
152 | /// let event = Event::new(); |
153 | /// ``` |
154 | #[inline ] |
155 | pub const fn new() -> Event { |
156 | Event { |
157 | inner: AtomicPtr::new(ptr::null_mut()), |
158 | } |
159 | } |
160 | |
161 | /// Returns a guard listening for a notification. |
162 | /// |
163 | /// This method emits a `SeqCst` fence after registering a listener. |
164 | /// |
165 | /// # Examples |
166 | /// |
167 | /// ``` |
168 | /// use event_listener::Event; |
169 | /// |
170 | /// let event = Event::new(); |
171 | /// let listener = event.listen(); |
172 | /// ``` |
173 | #[cold ] |
174 | pub fn listen(&self) -> EventListener { |
175 | let inner = self.inner(); |
176 | let listener = EventListener { |
177 | inner: unsafe { Arc::clone(&ManuallyDrop::new(Arc::from_raw(inner))) }, |
178 | entry: unsafe { Some((*inner).lock().insert((*inner).cache_ptr())) }, |
179 | }; |
180 | |
181 | // Make sure the listener is registered before whatever happens next. |
182 | full_fence(); |
183 | listener |
184 | } |
185 | |
186 | /// Notifies a number of active listeners. |
187 | /// |
188 | /// The number is allowed to be zero or exceed the current number of listeners. |
189 | /// |
190 | /// In contrast to [`Event::notify_additional()`], this method only makes sure *at least* `n` |
191 | /// listeners among the active ones are notified. |
192 | /// |
193 | /// This method emits a `SeqCst` fence before notifying listeners. |
194 | /// |
195 | /// # Examples |
196 | /// |
197 | /// ``` |
198 | /// use event_listener::Event; |
199 | /// |
200 | /// let event = Event::new(); |
201 | /// |
202 | /// // This notification gets lost because there are no listeners. |
203 | /// event.notify(1); |
204 | /// |
205 | /// let listener1 = event.listen(); |
206 | /// let listener2 = event.listen(); |
207 | /// let listener3 = event.listen(); |
208 | /// |
209 | /// // Notifies two listeners. |
210 | /// // |
211 | /// // Listener queueing is fair, which means `listener1` and `listener2` |
212 | /// // get notified here since they start listening before `listener3`. |
213 | /// event.notify(2); |
214 | /// ``` |
215 | #[inline ] |
216 | pub fn notify(&self, n: usize) { |
217 | // Make sure the notification comes after whatever triggered it. |
218 | full_fence(); |
219 | |
220 | if let Some(inner) = self.try_inner() { |
221 | // Notify if there is at least one unnotified listener and the number of notified |
222 | // listeners is less than `n`. |
223 | if inner.notified.load(Ordering::Acquire) < n { |
224 | inner.lock().notify(n); |
225 | } |
226 | } |
227 | } |
228 | |
229 | /// Notifies a number of active listeners without emitting a `SeqCst` fence. |
230 | /// |
231 | /// The number is allowed to be zero or exceed the current number of listeners. |
232 | /// |
233 | /// In contrast to [`Event::notify_additional()`], this method only makes sure *at least* `n` |
234 | /// listeners among the active ones are notified. |
235 | /// |
236 | /// Unlike [`Event::notify()`], this method does not emit a `SeqCst` fence. |
237 | /// |
238 | /// # Examples |
239 | /// |
240 | /// ``` |
241 | /// use event_listener::Event; |
242 | /// use std::sync::atomic::{self, Ordering}; |
243 | /// |
244 | /// let event = Event::new(); |
245 | /// |
246 | /// // This notification gets lost because there are no listeners. |
247 | /// event.notify(1); |
248 | /// |
249 | /// let listener1 = event.listen(); |
250 | /// let listener2 = event.listen(); |
251 | /// let listener3 = event.listen(); |
252 | /// |
253 | /// // We should emit a fence manually when using relaxed notifications. |
254 | /// atomic::fence(Ordering::SeqCst); |
255 | /// |
256 | /// // Notifies two listeners. |
257 | /// // |
258 | /// // Listener queueing is fair, which means `listener1` and `listener2` |
259 | /// // get notified here since they start listening before `listener3`. |
260 | /// event.notify(2); |
261 | /// ``` |
262 | #[inline ] |
263 | pub fn notify_relaxed(&self, n: usize) { |
264 | if let Some(inner) = self.try_inner() { |
265 | // Notify if there is at least one unnotified listener and the number of notified |
266 | // listeners is less than `n`. |
267 | if inner.notified.load(Ordering::Acquire) < n { |
268 | inner.lock().notify(n); |
269 | } |
270 | } |
271 | } |
272 | |
273 | /// Notifies a number of active and still unnotified listeners. |
274 | /// |
275 | /// The number is allowed to be zero or exceed the current number of listeners. |
276 | /// |
277 | /// In contrast to [`Event::notify()`], this method will notify `n` *additional* listeners that |
278 | /// were previously unnotified. |
279 | /// |
280 | /// This method emits a `SeqCst` fence before notifying listeners. |
281 | /// |
282 | /// # Examples |
283 | /// |
284 | /// ``` |
285 | /// use event_listener::Event; |
286 | /// |
287 | /// let event = Event::new(); |
288 | /// |
289 | /// // This notification gets lost because there are no listeners. |
290 | /// event.notify(1); |
291 | /// |
292 | /// let listener1 = event.listen(); |
293 | /// let listener2 = event.listen(); |
294 | /// let listener3 = event.listen(); |
295 | /// |
296 | /// // Notifies two listeners. |
297 | /// // |
298 | /// // Listener queueing is fair, which means `listener1` and `listener2` |
299 | /// // get notified here since they start listening before `listener3`. |
300 | /// event.notify_additional(1); |
301 | /// event.notify_additional(1); |
302 | /// ``` |
303 | #[inline ] |
304 | pub fn notify_additional(&self, n: usize) { |
305 | // Make sure the notification comes after whatever triggered it. |
306 | full_fence(); |
307 | |
308 | if let Some(inner) = self.try_inner() { |
309 | // Notify if there is at least one unnotified listener. |
310 | if inner.notified.load(Ordering::Acquire) < usize::MAX { |
311 | inner.lock().notify_additional(n); |
312 | } |
313 | } |
314 | } |
315 | |
316 | /// Notifies a number of active and still unnotified listeners without emitting a `SeqCst` |
317 | /// fence. |
318 | /// |
319 | /// The number is allowed to be zero or exceed the current number of listeners. |
320 | /// |
321 | /// In contrast to [`Event::notify()`], this method will notify `n` *additional* listeners that |
322 | /// were previously unnotified. |
323 | /// |
324 | /// Unlike [`Event::notify_additional()`], this method does not emit a `SeqCst` fence. |
325 | /// |
326 | /// # Examples |
327 | /// |
328 | /// ``` |
329 | /// use event_listener::Event; |
330 | /// use std::sync::atomic::{self, Ordering}; |
331 | /// |
332 | /// let event = Event::new(); |
333 | /// |
334 | /// // This notification gets lost because there are no listeners. |
335 | /// event.notify(1); |
336 | /// |
337 | /// let listener1 = event.listen(); |
338 | /// let listener2 = event.listen(); |
339 | /// let listener3 = event.listen(); |
340 | /// |
341 | /// // We should emit a fence manually when using relaxed notifications. |
342 | /// atomic::fence(Ordering::SeqCst); |
343 | /// |
344 | /// // Notifies two listeners. |
345 | /// // |
346 | /// // Listener queueing is fair, which means `listener1` and `listener2` |
347 | /// // get notified here since they start listening before `listener3`. |
348 | /// event.notify_additional_relaxed(1); |
349 | /// event.notify_additional_relaxed(1); |
350 | /// ``` |
351 | #[inline ] |
352 | pub fn notify_additional_relaxed(&self, n: usize) { |
353 | if let Some(inner) = self.try_inner() { |
354 | // Notify if there is at least one unnotified listener. |
355 | if inner.notified.load(Ordering::Acquire) < usize::MAX { |
356 | inner.lock().notify_additional(n); |
357 | } |
358 | } |
359 | } |
360 | |
361 | /// Returns a reference to the inner state if it was initialized. |
362 | #[inline ] |
363 | fn try_inner(&self) -> Option<&Inner> { |
364 | let inner = self.inner.load(Ordering::Acquire); |
365 | unsafe { inner.as_ref() } |
366 | } |
367 | |
368 | /// Returns a raw pointer to the inner state, initializing it if necessary. |
369 | /// |
370 | /// This returns a raw pointer instead of reference because `from_raw` |
371 | /// requires raw/mut provenance: <https://github.com/rust-lang/rust/pull/67339> |
372 | fn inner(&self) -> *const Inner { |
373 | let mut inner = self.inner.load(Ordering::Acquire); |
374 | |
375 | // Initialize the state if this is its first use. |
376 | if inner.is_null() { |
377 | // Allocate on the heap. |
378 | let new = Arc::new(Inner { |
379 | notified: AtomicUsize::new(usize::MAX), |
380 | list: std::sync::Mutex::new(List { |
381 | head: None, |
382 | tail: None, |
383 | start: None, |
384 | len: 0, |
385 | notified: 0, |
386 | cache_used: false, |
387 | }), |
388 | cache: UnsafeCell::new(Entry { |
389 | state: Cell::new(State::Created), |
390 | prev: Cell::new(None), |
391 | next: Cell::new(None), |
392 | }), |
393 | }); |
394 | // Convert the heap-allocated state into a raw pointer. |
395 | let new = Arc::into_raw(new) as *mut Inner; |
396 | |
397 | // Attempt to replace the null-pointer with the new state pointer. |
398 | inner = self |
399 | .inner |
400 | .compare_exchange(inner, new, Ordering::AcqRel, Ordering::Acquire) |
401 | .unwrap_or_else(|x| x); |
402 | |
403 | // Check if the old pointer value was indeed null. |
404 | if inner.is_null() { |
405 | // If yes, then use the new state pointer. |
406 | inner = new; |
407 | } else { |
408 | // If not, that means a concurrent operation has initialized the state. |
409 | // In that case, use the old pointer and deallocate the new one. |
410 | unsafe { |
411 | drop(Arc::from_raw(new)); |
412 | } |
413 | } |
414 | } |
415 | |
416 | inner |
417 | } |
418 | } |
419 | |
420 | impl Drop for Event { |
421 | #[inline ] |
422 | fn drop(&mut self) { |
423 | let inner: *mut Inner = *self.inner.get_mut(); |
424 | |
425 | // If the state pointer has been initialized, deallocate it. |
426 | if !inner.is_null() { |
427 | unsafe { |
428 | drop(Arc::from_raw(ptr:inner)); |
429 | } |
430 | } |
431 | } |
432 | } |
433 | |
434 | impl fmt::Debug for Event { |
435 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
436 | f.pad("Event { .. }" ) |
437 | } |
438 | } |
439 | |
440 | impl Default for Event { |
441 | fn default() -> Event { |
442 | Event::new() |
443 | } |
444 | } |
445 | |
446 | /// A guard waiting for a notification from an [`Event`]. |
447 | /// |
448 | /// There are two ways for a listener to wait for a notification: |
449 | /// |
450 | /// 1. In an asynchronous manner using `.await`. |
451 | /// 2. In a blocking manner by calling [`EventListener::wait()`] on it. |
452 | /// |
453 | /// If a notified listener is dropped without receiving a notification, dropping will notify |
454 | /// another active listener. Whether one *additional* listener will be notified depends on what |
455 | /// kind of notification was delivered. |
456 | pub struct EventListener { |
457 | /// A reference to [`Event`]'s inner state. |
458 | inner: Arc<Inner>, |
459 | |
460 | /// A pointer to this listener's entry in the linked list. |
461 | entry: Option<NonNull<Entry>>, |
462 | } |
463 | |
464 | unsafe impl Send for EventListener {} |
465 | unsafe impl Sync for EventListener {} |
466 | |
467 | impl UnwindSafe for EventListener {} |
468 | impl RefUnwindSafe for EventListener {} |
469 | |
470 | impl EventListener { |
471 | /// Blocks until a notification is received. |
472 | /// |
473 | /// # Examples |
474 | /// |
475 | /// ``` |
476 | /// use event_listener::Event; |
477 | /// |
478 | /// let event = Event::new(); |
479 | /// let listener = event.listen(); |
480 | /// |
481 | /// // Notify `listener`. |
482 | /// event.notify(1); |
483 | /// |
484 | /// // Receive the notification. |
485 | /// listener.wait(); |
486 | /// ``` |
487 | pub fn wait(self) { |
488 | self.wait_internal(None); |
489 | } |
490 | |
491 | /// Blocks until a notification is received or a timeout is reached. |
492 | /// |
493 | /// Returns `true` if a notification was received. |
494 | /// |
495 | /// # Examples |
496 | /// |
497 | /// ``` |
498 | /// use std::time::Duration; |
499 | /// use event_listener::Event; |
500 | /// |
501 | /// let event = Event::new(); |
502 | /// let listener = event.listen(); |
503 | /// |
504 | /// // There are no notification so this times out. |
505 | /// assert!(!listener.wait_timeout(Duration::from_secs(1))); |
506 | /// ``` |
507 | pub fn wait_timeout(self, timeout: Duration) -> bool { |
508 | self.wait_internal(Some(Instant::now() + timeout)) |
509 | } |
510 | |
511 | /// Blocks until a notification is received or a deadline is reached. |
512 | /// |
513 | /// Returns `true` if a notification was received. |
514 | /// |
515 | /// # Examples |
516 | /// |
517 | /// ``` |
518 | /// use std::time::{Duration, Instant}; |
519 | /// use event_listener::Event; |
520 | /// |
521 | /// let event = Event::new(); |
522 | /// let listener = event.listen(); |
523 | /// |
524 | /// // There are no notification so this times out. |
525 | /// assert!(!listener.wait_deadline(Instant::now() + Duration::from_secs(1))); |
526 | /// ``` |
527 | pub fn wait_deadline(self, deadline: Instant) -> bool { |
528 | self.wait_internal(Some(deadline)) |
529 | } |
530 | |
531 | /// Drops this listener and discards its notification (if any) without notifying another |
532 | /// active listener. |
533 | /// |
534 | /// Returns `true` if a notification was discarded. |
535 | /// |
536 | /// # Examples |
537 | /// ``` |
538 | /// use event_listener::Event; |
539 | /// |
540 | /// let event = Event::new(); |
541 | /// let listener1 = event.listen(); |
542 | /// let listener2 = event.listen(); |
543 | /// |
544 | /// event.notify(1); |
545 | /// |
546 | /// assert!(listener1.discard()); |
547 | /// assert!(!listener2.discard()); |
548 | /// ``` |
549 | pub fn discard(mut self) -> bool { |
550 | // If this listener has never picked up a notification... |
551 | if let Some(entry) = self.entry.take() { |
552 | let mut list = self.inner.lock(); |
553 | // Remove the listener from the list and return `true` if it was notified. |
554 | if let State::Notified(_) = list.remove(entry, self.inner.cache_ptr()) { |
555 | return true; |
556 | } |
557 | } |
558 | false |
559 | } |
560 | |
561 | /// Returns `true` if this listener listens to the given `Event`. |
562 | /// |
563 | /// # Examples |
564 | /// |
565 | /// ``` |
566 | /// use event_listener::Event; |
567 | /// |
568 | /// let event = Event::new(); |
569 | /// let listener = event.listen(); |
570 | /// |
571 | /// assert!(listener.listens_to(&event)); |
572 | /// ``` |
573 | #[inline ] |
574 | pub fn listens_to(&self, event: &Event) -> bool { |
575 | ptr::eq::<Inner>(&*self.inner, event.inner.load(Ordering::Acquire)) |
576 | } |
577 | |
578 | /// Returns `true` if both listeners listen to the same `Event`. |
579 | /// |
580 | /// # Examples |
581 | /// |
582 | /// ``` |
583 | /// use event_listener::Event; |
584 | /// |
585 | /// let event = Event::new(); |
586 | /// let listener1 = event.listen(); |
587 | /// let listener2 = event.listen(); |
588 | /// |
589 | /// assert!(listener1.same_event(&listener2)); |
590 | /// ``` |
591 | pub fn same_event(&self, other: &EventListener) -> bool { |
592 | ptr::eq::<Inner>(&*self.inner, &*other.inner) |
593 | } |
594 | |
595 | fn wait_internal(mut self, deadline: Option<Instant>) -> bool { |
596 | // Take out the entry pointer and set it to `None`. |
597 | let entry = match self.entry.take() { |
598 | None => unreachable!("cannot wait twice on an `EventListener`" ), |
599 | Some(entry) => entry, |
600 | }; |
601 | |
602 | // Set this listener's state to `Waiting`. |
603 | { |
604 | let mut list = self.inner.lock(); |
605 | let e = unsafe { entry.as_ref() }; |
606 | |
607 | // Do a dummy replace operation in order to take out the state. |
608 | match e.state.replace(State::Notified(false)) { |
609 | State::Notified(_) => { |
610 | // If this listener has been notified, remove it from the list and return. |
611 | list.remove(entry, self.inner.cache_ptr()); |
612 | return true; |
613 | } |
614 | // Otherwise, set the state to `Waiting`. |
615 | _ => e.state.set(State::Waiting(thread::current())), |
616 | } |
617 | } |
618 | |
619 | // Wait until a notification is received or the timeout is reached. |
620 | loop { |
621 | match deadline { |
622 | None => thread::park(), |
623 | |
624 | Some(deadline) => { |
625 | // Check for timeout. |
626 | let now = Instant::now(); |
627 | if now >= deadline { |
628 | // Remove the entry and check if notified. |
629 | return self |
630 | .inner |
631 | .lock() |
632 | .remove(entry, self.inner.cache_ptr()) |
633 | .is_notified(); |
634 | } |
635 | |
636 | // Park until the deadline. |
637 | thread::park_timeout(deadline - now); |
638 | } |
639 | } |
640 | |
641 | let mut list = self.inner.lock(); |
642 | let e = unsafe { entry.as_ref() }; |
643 | |
644 | // Do a dummy replace operation in order to take out the state. |
645 | match e.state.replace(State::Notified(false)) { |
646 | State::Notified(_) => { |
647 | // If this listener has been notified, remove it from the list and return. |
648 | list.remove(entry, self.inner.cache_ptr()); |
649 | return true; |
650 | } |
651 | // Otherwise, set the state back to `Waiting`. |
652 | state => e.state.set(state), |
653 | } |
654 | } |
655 | } |
656 | } |
657 | |
658 | impl fmt::Debug for EventListener { |
659 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
660 | f.pad("EventListener { .. }" ) |
661 | } |
662 | } |
663 | |
664 | impl Future for EventListener { |
665 | type Output = (); |
666 | |
667 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
668 | let mut list = self.inner.lock(); |
669 | |
670 | let entry = match self.entry { |
671 | None => unreachable!("cannot poll a completed `EventListener` future" ), |
672 | Some(entry) => entry, |
673 | }; |
674 | let state = unsafe { &entry.as_ref().state }; |
675 | |
676 | // Do a dummy replace operation in order to take out the state. |
677 | match state.replace(State::Notified(false)) { |
678 | State::Notified(_) => { |
679 | // If this listener has been notified, remove it from the list and return. |
680 | list.remove(entry, self.inner.cache_ptr()); |
681 | drop(list); |
682 | self.entry = None; |
683 | return Poll::Ready(()); |
684 | } |
685 | State::Created => { |
686 | // If the listener was just created, put it in the `Polling` state. |
687 | state.set(State::Polling(cx.waker().clone())); |
688 | } |
689 | State::Polling(w) => { |
690 | // If the listener was in the `Polling` state, update the waker. |
691 | if w.will_wake(cx.waker()) { |
692 | state.set(State::Polling(w)); |
693 | } else { |
694 | state.set(State::Polling(cx.waker().clone())); |
695 | } |
696 | } |
697 | State::Waiting(_) => { |
698 | unreachable!("cannot poll and wait on `EventListener` at the same time" ) |
699 | } |
700 | } |
701 | |
702 | Poll::Pending |
703 | } |
704 | } |
705 | |
706 | impl Drop for EventListener { |
707 | fn drop(&mut self) { |
708 | // If this listener has never picked up a notification... |
709 | if let Some(entry: NonNull) = self.entry.take() { |
710 | let mut list: ListGuard<'_> = self.inner.lock(); |
711 | |
712 | // But if a notification was delivered to it... |
713 | if let State::Notified(additional: bool) = list.remove(entry, self.inner.cache_ptr()) { |
714 | // Then pass it on to another active listener. |
715 | if additional { |
716 | list.notify_additional(1); |
717 | } else { |
718 | list.notify(1); |
719 | } |
720 | } |
721 | } |
722 | } |
723 | } |
724 | |
725 | /// A guard holding the linked list locked. |
726 | struct ListGuard<'a> { |
727 | /// A reference to [`Event`]'s inner state. |
728 | inner: &'a Inner, |
729 | |
730 | /// The actual guard that acquired the linked list. |
731 | guard: MutexGuard<'a, List>, |
732 | } |
733 | |
734 | impl Drop for ListGuard<'_> { |
735 | #[inline ] |
736 | fn drop(&mut self) { |
737 | let list: &mut List = &mut **self; |
738 | |
739 | // Update the atomic `notified` counter. |
740 | let notified: usize = if list.notified < list.len { |
741 | list.notified |
742 | } else { |
743 | usize::MAX |
744 | }; |
745 | self.inner.notified.store(val:notified, order:Ordering::Release); |
746 | } |
747 | } |
748 | |
749 | impl Deref for ListGuard<'_> { |
750 | type Target = List; |
751 | |
752 | #[inline ] |
753 | fn deref(&self) -> &List { |
754 | &*self.guard |
755 | } |
756 | } |
757 | |
758 | impl DerefMut for ListGuard<'_> { |
759 | #[inline ] |
760 | fn deref_mut(&mut self) -> &mut List { |
761 | &mut *self.guard |
762 | } |
763 | } |
764 | |
765 | /// The state of a listener. |
766 | enum State { |
767 | /// It has just been created. |
768 | Created, |
769 | |
770 | /// It has received a notification. |
771 | /// |
772 | /// The `bool` is `true` if this was an "additional" notification. |
773 | Notified(bool), |
774 | |
775 | /// An async task is polling it. |
776 | Polling(Waker), |
777 | |
778 | /// A thread is blocked on it. |
779 | Waiting(Thread), |
780 | } |
781 | |
782 | impl State { |
783 | /// Returns `true` if this is the `Notified` state. |
784 | #[inline ] |
785 | fn is_notified(&self) -> bool { |
786 | match self { |
787 | State::Notified(_) => true, |
788 | State::Created | State::Polling(_) | State::Waiting(_) => false, |
789 | } |
790 | } |
791 | } |
792 | |
793 | /// An entry representing a registered listener. |
794 | struct Entry { |
795 | /// THe state of this listener. |
796 | state: Cell<State>, |
797 | |
798 | /// Previous entry in the linked list. |
799 | prev: Cell<Option<NonNull<Entry>>>, |
800 | |
801 | /// Next entry in the linked list. |
802 | next: Cell<Option<NonNull<Entry>>>, |
803 | } |
804 | |
805 | /// A linked list of entries. |
806 | struct List { |
807 | /// First entry in the list. |
808 | head: Option<NonNull<Entry>>, |
809 | |
810 | /// Last entry in the list. |
811 | tail: Option<NonNull<Entry>>, |
812 | |
813 | /// The first unnotified entry in the list. |
814 | start: Option<NonNull<Entry>>, |
815 | |
816 | /// Total number of entries in the list. |
817 | len: usize, |
818 | |
819 | /// The number of notified entries in the list. |
820 | notified: usize, |
821 | |
822 | /// Whether the cached entry is used. |
823 | cache_used: bool, |
824 | } |
825 | |
826 | impl List { |
827 | /// Inserts a new entry into the list. |
828 | fn insert(&mut self, cache: NonNull<Entry>) -> NonNull<Entry> { |
829 | unsafe { |
830 | let entry = Entry { |
831 | state: Cell::new(State::Created), |
832 | prev: Cell::new(self.tail), |
833 | next: Cell::new(None), |
834 | }; |
835 | |
836 | let entry = if self.cache_used { |
837 | // Allocate an entry that is going to become the new tail. |
838 | NonNull::new_unchecked(Box::into_raw(Box::new(entry))) |
839 | } else { |
840 | // No need to allocate - we can use the cached entry. |
841 | self.cache_used = true; |
842 | cache.as_ptr().write(entry); |
843 | cache |
844 | }; |
845 | |
846 | // Replace the tail with the new entry. |
847 | match mem::replace(&mut self.tail, Some(entry)) { |
848 | None => self.head = Some(entry), |
849 | Some(t) => t.as_ref().next.set(Some(entry)), |
850 | } |
851 | |
852 | // If there were no unnotified entries, this one is the first now. |
853 | if self.start.is_none() { |
854 | self.start = self.tail; |
855 | } |
856 | |
857 | // Bump the entry count. |
858 | self.len += 1; |
859 | |
860 | entry |
861 | } |
862 | } |
863 | |
864 | /// Removes an entry from the list and returns its state. |
865 | fn remove(&mut self, entry: NonNull<Entry>, cache: NonNull<Entry>) -> State { |
866 | unsafe { |
867 | let prev = entry.as_ref().prev.get(); |
868 | let next = entry.as_ref().next.get(); |
869 | |
870 | // Unlink from the previous entry. |
871 | match prev { |
872 | None => self.head = next, |
873 | Some(p) => p.as_ref().next.set(next), |
874 | } |
875 | |
876 | // Unlink from the next entry. |
877 | match next { |
878 | None => self.tail = prev, |
879 | Some(n) => n.as_ref().prev.set(prev), |
880 | } |
881 | |
882 | // If this was the first unnotified entry, move the pointer to the next one. |
883 | if self.start == Some(entry) { |
884 | self.start = next; |
885 | } |
886 | |
887 | // Extract the state. |
888 | let state = if ptr::eq(entry.as_ptr(), cache.as_ptr()) { |
889 | // Free the cached entry. |
890 | self.cache_used = false; |
891 | entry.as_ref().state.replace(State::Created) |
892 | } else { |
893 | // Deallocate the entry. |
894 | Box::from_raw(entry.as_ptr()).state.into_inner() |
895 | }; |
896 | |
897 | // Update the counters. |
898 | if state.is_notified() { |
899 | self.notified -= 1; |
900 | } |
901 | self.len -= 1; |
902 | |
903 | state |
904 | } |
905 | } |
906 | |
907 | /// Notifies a number of entries. |
908 | #[cold ] |
909 | fn notify(&mut self, mut n: usize) { |
910 | if n <= self.notified { |
911 | return; |
912 | } |
913 | n -= self.notified; |
914 | |
915 | while n > 0 { |
916 | n -= 1; |
917 | |
918 | // Notify the first unnotified entry. |
919 | match self.start { |
920 | None => break, |
921 | Some(e) => { |
922 | // Get the entry and move the pointer forward. |
923 | let e = unsafe { e.as_ref() }; |
924 | self.start = e.next.get(); |
925 | |
926 | // Set the state of this entry to `Notified` and notify. |
927 | match e.state.replace(State::Notified(false)) { |
928 | State::Notified(_) => {} |
929 | State::Created => {} |
930 | State::Polling(w) => w.wake(), |
931 | State::Waiting(t) => t.unpark(), |
932 | } |
933 | |
934 | // Update the counter. |
935 | self.notified += 1; |
936 | } |
937 | } |
938 | } |
939 | } |
940 | |
941 | /// Notifies a number of additional entries. |
942 | #[cold ] |
943 | fn notify_additional(&mut self, mut n: usize) { |
944 | while n > 0 { |
945 | n -= 1; |
946 | |
947 | // Notify the first unnotified entry. |
948 | match self.start { |
949 | None => break, |
950 | Some(e) => { |
951 | // Get the entry and move the pointer forward. |
952 | let e = unsafe { e.as_ref() }; |
953 | self.start = e.next.get(); |
954 | |
955 | // Set the state of this entry to `Notified` and notify. |
956 | match e.state.replace(State::Notified(true)) { |
957 | State::Notified(_) => {} |
958 | State::Created => {} |
959 | State::Polling(w) => w.wake(), |
960 | State::Waiting(t) => t.unpark(), |
961 | } |
962 | |
963 | // Update the counter. |
964 | self.notified += 1; |
965 | } |
966 | } |
967 | } |
968 | } |
969 | } |
970 | |
971 | /// Equivalent to `atomic::fence(Ordering::SeqCst)`, but in some cases faster. |
972 | #[inline ] |
973 | fn full_fence() { |
974 | if cfg!(all( |
975 | any(target_arch = "x86" , target_arch = "x86_64" ), |
976 | not(miri) |
977 | )) { |
978 | // HACK(stjepang): On x86 architectures there are two different ways of executing |
979 | // a `SeqCst` fence. |
980 | // |
981 | // 1. `atomic::fence(SeqCst)`, which compiles into a `mfence` instruction. |
982 | // 2. `_.compare_exchange(_, _, SeqCst, SeqCst)`, which compiles into a `lock cmpxchg` instruction. |
983 | // |
984 | // Both instructions have the effect of a full barrier, but empirical benchmarks have shown |
985 | // that the second one is sometimes a bit faster. |
986 | // |
987 | // The ideal solution here would be to use inline assembly, but we're instead creating a |
988 | // temporary atomic variable and compare-and-exchanging its value. No sane compiler to |
989 | // x86 platforms is going to optimize this away. |
990 | atomic::compiler_fence(Ordering::SeqCst); |
991 | let a = AtomicUsize::new(0); |
992 | let _ = a.compare_exchange(0, 1, Ordering::SeqCst, Ordering::SeqCst); |
993 | atomic::compiler_fence(Ordering::SeqCst); |
994 | } else { |
995 | atomic::fence(Ordering::SeqCst); |
996 | } |
997 | } |
998 | |