use crate::ops;
use crate::process;
use crate::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

/// Reference counter internals.
struct Counter<C> {
    /// The number of senders associated with the channel.
    senders: AtomicUsize,

    /// The number of receivers associated with the channel.
    receivers: AtomicUsize,

    /// Set to `true` once one side (all senders or all receivers) has been released; the side
    /// that then observes `true` on its own last release deallocates the channel.
    destroy: AtomicBool,

    /// The internal channel.
    chan: C,
}

/// Wraps a channel into the reference counter.
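///
/// A minimal usage sketch, assuming a hypothetical `Channel` type whose `disconnect_*`
/// methods return `bool` (neither is part of this module):
///
/// ```ignore
/// let (s, r) = new(Channel::new());
/// let s2 = s.acquire(); // sender count: 1 -> 2
/// unsafe {
///     s.release(|c| c.disconnect_senders()); // 2 -> 1: channel stays alive
///     s2.release(|c| c.disconnect_senders()); // 1 -> 0: sender side disconnects
///     r.release(|c| c.disconnect_receivers()); // last side frees the `Counter`
/// }
/// ```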
pub(crate) fn new<C>(chan: C) -> (Sender<C>, Receiver<C>) {
    let counter: *mut Counter<C> = Box::into_raw(Box::new(Counter {
        senders: AtomicUsize::new(1),
        receivers: AtomicUsize::new(1),
        destroy: AtomicBool::new(false),
        chan,
    }));
    let s = Sender { counter };
    let r = Receiver { counter };
    (s, r)
}

/// The sending side.
pub(crate) struct Sender<C> {
    counter: *mut Counter<C>,
}

impl<C> Sender<C> {
    /// Returns the internal `Counter`.
    fn counter(&self) -> &Counter<C> {
        unsafe { &*self.counter }
    }

    /// Acquires another sender reference.
    pub(crate) fn acquire(&self) -> Sender<C> {
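        // `Relaxed` is enough for the increment: a new reference can only be created from an
        // existing one, so the necessary ordering is already established (the same argument as
        // for `Arc::clone`).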
        let count = self.counter().senders.fetch_add(1, Ordering::Relaxed);

        // Cloning senders and calling `mem::forget` on the clones could potentially overflow the
        // counter. It's very difficult to recover sensibly from such degenerate scenarios so we
        // just abort when the count becomes very large.
        if count > isize::MAX as usize {
            process::abort();
        }

        Sender { counter: self.counter }
    }

    /// Releases the sender reference.
    ///
    /// Function `disconnect` will be called if this is the last sender reference.
    ///
    /// # Safety
    ///
    /// Each sender reference must be released at most once, and the reference must not be used
    /// afterwards: the final release may deallocate the shared `Counter`.
    pub(crate) unsafe fn release<F: FnOnce(&C) -> bool>(&self, disconnect: F) {
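        // `AcqRel` on the decrement ensures that all prior uses of the channel happen before
        // whichever thread observes the count reaching zero (the same pattern as `Arc::drop`).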
        if self.counter().senders.fetch_sub(1, Ordering::AcqRel) == 1 {
            disconnect(&self.counter().chan);

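            // `destroy` starts out `false`. The first side to fully release swaps it to `true`;
            // if the swap already returns `true`, the other side has disconnected too, so this
            // was the very last reference and the allocation can be freed.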
            if self.counter().destroy.swap(true, Ordering::AcqRel) {
                drop(Box::from_raw(self.counter));
            }
        }
    }
}

impl<C> ops::Deref for Sender<C> {
    type Target = C;

    fn deref(&self) -> &C {
        &self.counter().chan
    }
}

impl<C> PartialEq for Sender<C> {
    fn eq(&self, other: &Sender<C>) -> bool {
        self.counter == other.counter
    }
}

/// The receiving side.
pub(crate) struct Receiver<C> {
    counter: *mut Counter<C>,
}

impl<C> Receiver<C> {
    /// Returns the internal `Counter`.
    fn counter(&self) -> &Counter<C> {
        unsafe { &*self.counter }
    }

    /// Acquires another receiver reference.
    pub(crate) fn acquire(&self) -> Receiver<C> {
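        // `Relaxed` suffices here for the same reason as in `Sender::acquire`.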
        let count = self.counter().receivers.fetch_add(1, Ordering::Relaxed);

        // Cloning receivers and calling `mem::forget` on the clones could potentially overflow the
        // counter. It's very difficult to recover sensibly from such degenerate scenarios so we
        // just abort when the count becomes very large.
        if count > isize::MAX as usize {
            process::abort();
        }

        Receiver { counter: self.counter }
    }

    /// Releases the receiver reference.
    ///
    /// Function `disconnect` will be called if this is the last receiver reference.
    ///
    /// # Safety
    ///
    /// Each receiver reference must be released at most once, and the reference must not be used
    /// afterwards: the final release may deallocate the shared `Counter`.
    pub(crate) unsafe fn release<F: FnOnce(&C) -> bool>(&self, disconnect: F) {
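        // Mirrors `Sender::release`; see the comments there for the ordering and the `destroy`
        // handshake.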
        if self.counter().receivers.fetch_sub(1, Ordering::AcqRel) == 1 {
            disconnect(&self.counter().chan);

            if self.counter().destroy.swap(true, Ordering::AcqRel) {
                drop(Box::from_raw(self.counter));
            }
        }
    }
}

impl<C> ops::Deref for Receiver<C> {
    type Target = C;

    fn deref(&self) -> &C {
        &self.counter().chan
    }
}

impl<C> PartialEq for Receiver<C> {
    fn eq(&self, other: &Receiver<C>) -> bool {
        self.counter == other.counter
    }
}
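
// A minimal illustrative sketch, not part of the original module: exercise the acquire/release
// handshake with a unit `()` payload and no-op disconnect closures.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn destroy_handshake() {
        let (s, r) = new(());
        let s2 = s.acquire(); // sender count: 1 -> 2
        unsafe {
            s.release(|_| true); // 2 -> 1: counter stays alive
            s2.release(|_| true); // 1 -> 0: sets `destroy`
            r.release(|_| true); // `destroy` was set, so this frees the `Counter`
        }
    }
}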