//! Reference counter for channels.
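//!
//! The counter tracks the number of `Sender` and `Receiver` handles separately. Releasing the
//! last handle on one side invokes the caller-supplied `disconnect` closure; releasing the last
//! handle overall also deallocates the counter together with the channel it wraps.
//!
//! A minimal usage sketch (illustrative only; `Chan` is a hypothetical channel type whose
//! `disconnect` method reports whether it actually disconnected the channel):
//!
//! ```ignore
//! let (s, r) = counter::new(Chan::new());
//! let s2 = s.acquire(); // a second sender handle
//! unsafe {
//!     s.release(|c| c.disconnect());  // `s2` still exists, so nothing happens yet
//!     s2.release(|c| c.disconnect()); // last sender: `disconnect` is called on the channel
//!     r.release(|c| c.disconnect());  // last handle overall: the counter is deallocated
//! }
//! ```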

use std::isize;
use std::ops;
use std::process;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

/// Reference counter internals.
struct Counter<C> {
    /// The number of senders associated with the channel.
    senders: AtomicUsize,

    /// The number of receivers associated with the channel.
    receivers: AtomicUsize,

    /// Set to `true` once all senders or all receivers have been released. Whichever side is
    /// released last then deallocates the channel.
    destroy: AtomicBool,

    /// The internal channel.
    chan: C,
}

/// Wraps a channel into the reference counter.
pub(crate) fn new<C>(chan: C) -> (Sender<C>, Receiver<C>) {
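    // Allocate the counter on the heap. Every `Sender` and `Receiver` handle shares this raw
    // pointer, and it is freed in `release` once both counts have dropped to zero. Both counts
    // start at 1 because one handle of each side is returned below.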
    let counter = Box::into_raw(Box::new(Counter {
        senders: AtomicUsize::new(1),
        receivers: AtomicUsize::new(1),
        destroy: AtomicBool::new(false),
        chan,
    }));
    let s = Sender { counter };
    let r = Receiver { counter };
    (s, r)
}

/// The sending side.
pub(crate) struct Sender<C> {
    counter: *mut Counter<C>,
}

impl<C> Sender<C> {
    /// Returns the internal `Counter`.
    fn counter(&self) -> &Counter<C> {
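        // SAFETY: the `Counter` is heap-allocated in `new` and only freed by the final `release`
        // call, after both the sender and receiver counts have dropped to zero, so the pointer
        // stays valid for as long as this handle exists.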
        unsafe { &*self.counter }
    }

    /// Acquires another sender reference.
    pub(crate) fn acquire(&self) -> Sender<C> {
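        // Bump the sender count. A `Relaxed` increment is sufficient because a new reference can
        // only be created through an already existing one, as with `Arc::clone`.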
        let count = self.counter().senders.fetch_add(1, Ordering::Relaxed);

        // Cloning senders and calling `mem::forget` on the clones could potentially overflow the
        // counter. It's very difficult to recover sensibly from such degenerate scenarios so we
        // just abort when the count becomes very large.
        if count > isize::MAX as usize {
            process::abort();
        }

        Sender {
            counter: self.counter,
        }
    }

    /// Releases the sender reference.
    ///
    /// Function `disconnect` will be called if this is the last sender reference.
    pub(crate) unsafe fn release<F: FnOnce(&C) -> bool>(&self, disconnect: F) {
        if self.counter().senders.fetch_sub(1, Ordering::AcqRel) == 1 {
            disconnect(&self.counter().chan);

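            // `destroy` is set by whichever side (senders or receivers) reaches zero first. The
            // side that swaps it and reads back `true` is therefore the last one overall and
            // frees the allocation.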
            if self.counter().destroy.swap(true, Ordering::AcqRel) {
                drop(Box::from_raw(self.counter));
            }
        }
    }
}

impl<C> ops::Deref for Sender<C> {
    type Target = C;

    fn deref(&self) -> &C {
        &self.counter().chan
    }
}

impl<C> PartialEq for Sender<C> {
    fn eq(&self, other: &Sender<C>) -> bool {
        self.counter == other.counter
    }
}

/// The receiving side.
pub(crate) struct Receiver<C> {
    counter: *mut Counter<C>,
}

impl<C> Receiver<C> {
    /// Returns the internal `Counter`.
    fn counter(&self) -> &Counter<C> {
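        // SAFETY: same argument as in `Sender::counter`: the allocation is not freed until the
        // final `release`.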
        unsafe { &*self.counter }
    }

    /// Acquires another receiver reference.
    pub(crate) fn acquire(&self) -> Receiver<C> {
        let count = self.counter().receivers.fetch_add(1, Ordering::Relaxed);

        // Cloning receivers and calling `mem::forget` on the clones could potentially overflow the
        // counter. It's very difficult to recover sensibly from such degenerate scenarios so we
        // just abort when the count becomes very large.
        if count > isize::MAX as usize {
            process::abort();
        }

        Receiver {
            counter: self.counter,
        }
    }

    /// Releases the receiver reference.
    ///
    /// Function `disconnect` will be called if this is the last receiver reference.
    pub(crate) unsafe fn release<F: FnOnce(&C) -> bool>(&self, disconnect: F) {
        if self.counter().receivers.fetch_sub(1, Ordering::AcqRel) == 1 {
            disconnect(&self.counter().chan);

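            // Same handshake as in `Sender::release`: the second side to swap `destroy` frees
            // the counter.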
            if self.counter().destroy.swap(true, Ordering::AcqRel) {
                drop(Box::from_raw(self.counter));
            }
        }
    }
}

impl<C> ops::Deref for Receiver<C> {
    type Target = C;

    fn deref(&self) -> &C {
        &self.counter().chan
    }
}

impl<C> PartialEq for Receiver<C> {
    fn eq(&self, other: &Receiver<C>) -> bool {
        self.counter == other.counter
    }
}