1 | //! An SPSC broadcast channel. |
2 | //! |
3 | //! - The value can only be a `usize`. |
4 | //! - The consumer is only notified if the value is different. |
5 | //! - The value `0` is reserved for closed. |
6 | |
7 | use futures_util::task::AtomicWaker; |
8 | use std::sync::{ |
9 | atomic::{AtomicUsize, Ordering}, |
10 | Arc, |
11 | }; |
12 | use std::task; |
13 | |
/// Payload carried by the channel: a single machine word.
type Value = usize;

/// Sentinel value meaning "the channel is closed".
///
/// It is reserved: `channel` debug-asserts that the initial value differs
/// from it, and dropping the `Sender` stores it as the final value.
pub(crate) const CLOSED: Value = 0;
17 | |
18 | pub(crate) fn channel(initial: Value) -> (Sender, Receiver) { |
19 | debug_assert!( |
20 | initial != CLOSED, |
21 | "watch::channel initial state of 0 is reserved" |
22 | ); |
23 | |
24 | let shared: Arc = Arc::new(data:Shared { |
25 | value: AtomicUsize::new(initial), |
26 | waker: AtomicWaker::new(), |
27 | }); |
28 | |
29 | ( |
30 | Sender { |
31 | shared: shared.clone(), |
32 | }, |
33 | Receiver { shared }, |
34 | ) |
35 | } |
36 | |
37 | pub(crate) struct Sender { |
38 | shared: Arc<Shared>, |
39 | } |
40 | |
41 | pub(crate) struct Receiver { |
42 | shared: Arc<Shared>, |
43 | } |
44 | |
45 | struct Shared { |
46 | value: AtomicUsize, |
47 | waker: AtomicWaker, |
48 | } |
49 | |
50 | impl Sender { |
51 | pub(crate) fn send(&mut self, value: Value) { |
52 | if self.shared.value.swap(val:value, order:Ordering::SeqCst) != value { |
53 | self.shared.waker.wake(); |
54 | } |
55 | } |
56 | } |
57 | |
58 | impl Drop for Sender { |
59 | fn drop(&mut self) { |
60 | self.send(CLOSED); |
61 | } |
62 | } |
63 | |
64 | impl Receiver { |
65 | pub(crate) fn load(&mut self, cx: &mut task::Context<'_>) -> Value { |
66 | self.shared.waker.register(cx.waker()); |
67 | self.shared.value.load(order:Ordering::SeqCst) |
68 | } |
69 | |
70 | pub(crate) fn peek(&self) -> Value { |
71 | self.shared.value.load(order:Ordering::Relaxed) |
72 | } |
73 | } |
74 | |