use crate::sync::atomic::{Atomic, AtomicBool, AtomicUsize, Ordering};
use crate::{ops, process};

/// Reference counter internals.
struct Counter<C> {
    /// The number of senders associated with the channel.
    senders: Atomic<usize>,

    /// The number of receivers associated with the channel.
    receivers: Atomic<usize>,

    /// Set to `true` if the last sender or the last receiver reference deallocates the channel.
    destroy: Atomic<bool>,

    /// The internal channel.
    chan: C,
}

/// Wraps a channel into the reference counter.
pub(crate) fn new<C>(chan: C) -> (Sender<C>, Receiver<C>) {
    let counter: *mut Counter<C> = Box::into_raw(Box::new(Counter {
        senders: AtomicUsize::new(1),
        receivers: AtomicUsize::new(1),
        destroy: AtomicBool::new(false),
        chan,
    }));
    let s = Sender { counter };
    let r = Receiver { counter };
    (s, r)
}
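
// Both handles share a single heap-allocated `Counter`; its lifetime is
// managed manually through `acquire` and `release` below rather than via
// `Drop`, so the channel can run custom disconnect logic before the final
// deallocation.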

/// The sending side.
pub(crate) struct Sender<C> {
    counter: *mut Counter<C>,
}

impl<C> Sender<C> {
    /// Returns the internal `Counter`.
    fn counter(&self) -> &Counter<C> {
        unsafe { &*self.counter }
    }

    /// Acquires another sender reference.
    pub(crate) fn acquire(&self) -> Sender<C> {
        let count = self.counter().senders.fetch_add(1, Ordering::Relaxed);

        // Cloning senders and calling `mem::forget` on the clones could potentially overflow the
        // counter. It's very difficult to recover sensibly from such degenerate scenarios so we
        // just abort when the count becomes very large.
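        // The `isize::MAX` threshold mirrors the overflow guard in `Arc`,
        // leaving enough headroom that concurrent increments cannot wrap the
        // counter before the abort takes effect.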
        if count > isize::MAX as usize {
            process::abort();
        }

        Sender { counter: self.counter }
    }

    /// Releases the sender reference.
    ///
    /// Function `disconnect` will be called if this is the last sender reference.
    pub(crate) unsafe fn release<F: FnOnce(&C) -> bool>(&self, disconnect: F) {
        if self.counter().senders.fetch_sub(1, Ordering::AcqRel) == 1 {
            disconnect(&self.counter().chan);

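            // Both sides race to set `destroy`: the first `swap` returns
            // `false` and leaves deallocation to the other side, while the
            // second returns `true` and frees the shared `Counter`.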
            if self.counter().destroy.swap(true, Ordering::AcqRel) {
                drop(unsafe { Box::from_raw(self.counter) });
            }
        }
    }
}

impl<C> ops::Deref for Sender<C> {
    type Target = C;

    fn deref(&self) -> &C {
        &self.counter().chan
    }
}

impl<C> PartialEq for Sender<C> {
    fn eq(&self, other: &Sender<C>) -> bool {
        self.counter == other.counter
    }
}

/// The receiving side.
pub(crate) struct Receiver<C> {
    counter: *mut Counter<C>,
}

impl<C> Receiver<C> {
    /// Returns the internal `Counter`.
    fn counter(&self) -> &Counter<C> {
        unsafe { &*self.counter }
    }

    /// Acquires another receiver reference.
    pub(crate) fn acquire(&self) -> Receiver<C> {
        let count = self.counter().receivers.fetch_add(1, Ordering::Relaxed);

        // Cloning receivers and calling `mem::forget` on the clones could potentially overflow the
        // counter. It's very difficult to recover sensibly from such degenerate scenarios so we
        // just abort when the count becomes very large.
        if count > isize::MAX as usize {
            process::abort();
        }

        Receiver { counter: self.counter }
    }

    /// Releases the receiver reference.
    ///
    /// Function `disconnect` will be called if this is the last receiver reference.
    pub(crate) unsafe fn release<F: FnOnce(&C) -> bool>(&self, disconnect: F) {
        if self.counter().receivers.fetch_sub(1, Ordering::AcqRel) == 1 {
            disconnect(&self.counter().chan);

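            // As in `Sender::release`: the second side to swap `destroy`
            // frees the shared allocation.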
            if self.counter().destroy.swap(true, Ordering::AcqRel) {
                drop(unsafe { Box::from_raw(self.counter) });
            }
        }
    }
}

impl<C> ops::Deref for Receiver<C> {
    type Target = C;

    fn deref(&self) -> &C {
        &self.counter().chan
    }
}

impl<C> PartialEq for Receiver<C> {
    fn eq(&self, other: &Receiver<C>) -> bool {
        self.counter == other.counter
    }
}
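
// A minimal lifecycle sketch, not part of the original file: the `()` channel
// and the `|_| true` disconnect hooks are placeholder assumptions used purely
// to exercise the acquire/release accounting.
#[cfg(test)]
mod tests {
    #[test]
    fn acquire_release_deallocates_once() {
        let (s, r) = super::new(());
        let s2 = s.acquire();

        unsafe {
            // Two sender references: the first release only decrements.
            s.release(|_| true);
            // Last sender: runs the disconnect hook and swaps `destroy`
            // (false -> true), leaving deallocation to the receiver side.
            s2.release(|_| true);
            // Last receiver: sees `destroy == true` and frees the `Counter`.
            r.release(|_| true);
        }
    }
}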