1//! An SPSC broadcast channel.
2//!
3//! - The value can only be a `usize`.
4//! - The consumer is only notified if the value is different.
5//! - The value `0` is reserved for closed.
6
7use futures_util::task::AtomicWaker;
8use std::sync::{
9 atomic::{AtomicUsize, Ordering},
10 Arc,
11};
12use std::task;
13
14type Value = usize;
15
16pub(crate) const CLOSED: usize = 0;
17
18pub(crate) fn channel(initial: Value) -> (Sender, Receiver) {
19 debug_assert!(
20 initial != CLOSED,
21 "watch::channel initial state of 0 is reserved"
22 );
23
24 let shared: Arc = Arc::new(data:Shared {
25 value: AtomicUsize::new(initial),
26 waker: AtomicWaker::new(),
27 });
28
29 (
30 Sender {
31 shared: shared.clone(),
32 },
33 Receiver { shared },
34 )
35}
36
37pub(crate) struct Sender {
38 shared: Arc<Shared>,
39}
40
41pub(crate) struct Receiver {
42 shared: Arc<Shared>,
43}
44
45struct Shared {
46 value: AtomicUsize,
47 waker: AtomicWaker,
48}
49
50impl Sender {
51 pub(crate) fn send(&mut self, value: Value) {
52 if self.shared.value.swap(val:value, order:Ordering::SeqCst) != value {
53 self.shared.waker.wake();
54 }
55 }
56}
57
58impl Drop for Sender {
59 fn drop(&mut self) {
60 self.send(CLOSED);
61 }
62}
63
64impl Receiver {
65 pub(crate) fn load(&mut self, cx: &mut task::Context<'_>) -> Value {
66 self.shared.waker.register(cx.waker());
67 self.shared.value.load(order:Ordering::SeqCst)
68 }
69
70 pub(crate) fn peek(&self) -> Value {
71 self.shared.value.load(order:Ordering::Relaxed)
72 }
73}
74