1 | use crate::sync::atomic::{Atomic, AtomicBool, AtomicUsize, Ordering}; |
2 | use crate::{ops, process}; |
3 | |
4 | /// Reference counter internals. |
5 | struct Counter<C> { |
6 | /// The number of senders associated with the channel. |
7 | senders: Atomic<usize>, |
8 | |
9 | /// The number of receivers associated with the channel. |
10 | receivers: Atomic<usize>, |
11 | |
12 | /// Set to `true` if the last sender or the last receiver reference deallocates the channel. |
13 | destroy: Atomic<bool>, |
14 | |
15 | /// The internal channel. |
16 | chan: C, |
17 | } |
18 | |
19 | /// Wraps a channel into the reference counter. |
20 | pub(crate) fn new<C>(chan: C) -> (Sender<C>, Receiver<C>) { |
21 | let counter: *mut Counter = Box::into_raw(Box::new(Counter { |
22 | senders: AtomicUsize::new(1), |
23 | receivers: AtomicUsize::new(1), |
24 | destroy: AtomicBool::new(false), |
25 | chan, |
26 | })); |
27 | let s: Sender = Sender { counter }; |
28 | let r: Receiver = Receiver { counter }; |
29 | (s, r) |
30 | } |
31 | |
32 | /// The sending side. |
33 | pub(crate) struct Sender<C> { |
34 | counter: *mut Counter<C>, |
35 | } |
36 | |
37 | impl<C> Sender<C> { |
38 | /// Returns the internal `Counter`. |
39 | fn counter(&self) -> &Counter<C> { |
40 | unsafe { &*self.counter } |
41 | } |
42 | |
43 | /// Acquires another sender reference. |
44 | pub(crate) fn acquire(&self) -> Sender<C> { |
45 | let count = self.counter().senders.fetch_add(1, Ordering::Relaxed); |
46 | |
47 | // Cloning senders and calling `mem::forget` on the clones could potentially overflow the |
48 | // counter. It's very difficult to recover sensibly from such degenerate scenarios so we |
49 | // just abort when the count becomes very large. |
50 | if count > isize::MAX as usize { |
51 | process::abort(); |
52 | } |
53 | |
54 | Sender { counter: self.counter } |
55 | } |
56 | |
57 | /// Releases the sender reference. |
58 | /// |
59 | /// Function `disconnect` will be called if this is the last sender reference. |
60 | pub(crate) unsafe fn release<F: FnOnce(&C) -> bool>(&self, disconnect: F) { |
61 | if self.counter().senders.fetch_sub(1, Ordering::AcqRel) == 1 { |
62 | disconnect(&self.counter().chan); |
63 | |
64 | if self.counter().destroy.swap(true, Ordering::AcqRel) { |
65 | drop(unsafe { Box::from_raw(self.counter) }); |
66 | } |
67 | } |
68 | } |
69 | } |
70 | |
71 | impl<C> ops::Deref for Sender<C> { |
72 | type Target = C; |
73 | |
74 | fn deref(&self) -> &C { |
75 | &self.counter().chan |
76 | } |
77 | } |
78 | |
79 | impl<C> PartialEq for Sender<C> { |
80 | fn eq(&self, other: &Sender<C>) -> bool { |
81 | self.counter == other.counter |
82 | } |
83 | } |
84 | |
85 | /// The receiving side. |
86 | pub(crate) struct Receiver<C> { |
87 | counter: *mut Counter<C>, |
88 | } |
89 | |
90 | impl<C> Receiver<C> { |
91 | /// Returns the internal `Counter`. |
92 | fn counter(&self) -> &Counter<C> { |
93 | unsafe { &*self.counter } |
94 | } |
95 | |
96 | /// Acquires another receiver reference. |
97 | pub(crate) fn acquire(&self) -> Receiver<C> { |
98 | let count = self.counter().receivers.fetch_add(1, Ordering::Relaxed); |
99 | |
100 | // Cloning receivers and calling `mem::forget` on the clones could potentially overflow the |
101 | // counter. It's very difficult to recover sensibly from such degenerate scenarios so we |
102 | // just abort when the count becomes very large. |
103 | if count > isize::MAX as usize { |
104 | process::abort(); |
105 | } |
106 | |
107 | Receiver { counter: self.counter } |
108 | } |
109 | |
110 | /// Releases the receiver reference. |
111 | /// |
112 | /// Function `disconnect` will be called if this is the last receiver reference. |
113 | pub(crate) unsafe fn release<F: FnOnce(&C) -> bool>(&self, disconnect: F) { |
114 | if self.counter().receivers.fetch_sub(1, Ordering::AcqRel) == 1 { |
115 | disconnect(&self.counter().chan); |
116 | |
117 | if self.counter().destroy.swap(true, Ordering::AcqRel) { |
118 | drop(unsafe { Box::from_raw(self.counter) }); |
119 | } |
120 | } |
121 | } |
122 | } |
123 | |
124 | impl<C> ops::Deref for Receiver<C> { |
125 | type Target = C; |
126 | |
127 | fn deref(&self) -> &C { |
128 | &self.counter().chan |
129 | } |
130 | } |
131 | |
132 | impl<C> PartialEq for Receiver<C> { |
133 | fn eq(&self, other: &Receiver<C>) -> bool { |
134 | self.counter == other.counter |
135 | } |
136 | } |
137 | |