| 1 | //! An SPSC broadcast channel. |
| 2 | //! |
| 3 | //! - The value can only be a `usize`. |
| 4 | //! - The consumer is only notified if the value is different. |
| 5 | //! - The value `0` is reserved for closed. |
| 6 | |
| 7 | use futures_util::task::AtomicWaker; |
| 8 | use std::sync::{ |
| 9 | atomic::{AtomicUsize, Ordering}, |
| 10 | Arc, |
| 11 | }; |
| 12 | use std::task; |
| 13 | |
| 14 | type Value = usize; |
| 15 | |
| 16 | pub(crate) const CLOSED: usize = 0; |
| 17 | |
| 18 | pub(crate) fn channel(initial: Value) -> (Sender, Receiver) { |
| 19 | debug_assert!( |
| 20 | initial != CLOSED, |
| 21 | "watch::channel initial state of 0 is reserved" |
| 22 | ); |
| 23 | |
| 24 | let shared: Arc = Arc::new(data:Shared { |
| 25 | value: AtomicUsize::new(initial), |
| 26 | waker: AtomicWaker::new(), |
| 27 | }); |
| 28 | |
| 29 | ( |
| 30 | Sender { |
| 31 | shared: shared.clone(), |
| 32 | }, |
| 33 | Receiver { shared }, |
| 34 | ) |
| 35 | } |
| 36 | |
| 37 | pub(crate) struct Sender { |
| 38 | shared: Arc<Shared>, |
| 39 | } |
| 40 | |
| 41 | pub(crate) struct Receiver { |
| 42 | shared: Arc<Shared>, |
| 43 | } |
| 44 | |
| 45 | struct Shared { |
| 46 | value: AtomicUsize, |
| 47 | waker: AtomicWaker, |
| 48 | } |
| 49 | |
| 50 | impl Sender { |
| 51 | pub(crate) fn send(&mut self, value: Value) { |
| 52 | if self.shared.value.swap(val:value, order:Ordering::SeqCst) != value { |
| 53 | self.shared.waker.wake(); |
| 54 | } |
| 55 | } |
| 56 | } |
| 57 | |
| 58 | impl Drop for Sender { |
| 59 | fn drop(&mut self) { |
| 60 | self.send(CLOSED); |
| 61 | } |
| 62 | } |
| 63 | |
| 64 | impl Receiver { |
| 65 | pub(crate) fn load(&mut self, cx: &mut task::Context<'_>) -> Value { |
| 66 | self.shared.waker.register(cx.waker()); |
| 67 | self.shared.value.load(order:Ordering::SeqCst) |
| 68 | } |
| 69 | |
| 70 | pub(crate) fn peek(&self) -> Value { |
| 71 | self.shared.value.load(order:Ordering::Relaxed) |
| 72 | } |
| 73 | } |
| 74 | |