1 | //! Reference counter for channels. |
2 | |
3 | use std::isize; |
4 | use std::ops; |
5 | use std::process; |
6 | use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; |
7 | |
8 | /// Reference counter internals. |
9 | struct Counter<C> { |
10 | /// The number of senders associated with the channel. |
11 | senders: AtomicUsize, |
12 | |
13 | /// The number of receivers associated with the channel. |
14 | receivers: AtomicUsize, |
15 | |
16 | /// Set to `true` if the last sender or the last receiver reference deallocates the channel. |
17 | destroy: AtomicBool, |
18 | |
19 | /// The internal channel. |
20 | chan: C, |
21 | } |
22 | |
23 | /// Wraps a channel into the reference counter. |
24 | pub(crate) fn new<C>(chan: C) -> (Sender<C>, Receiver<C>) { |
25 | let counter: *mut Counter = Box::into_raw(Box::new(Counter { |
26 | senders: AtomicUsize::new(1), |
27 | receivers: AtomicUsize::new(1), |
28 | destroy: AtomicBool::new(false), |
29 | chan, |
30 | })); |
31 | let s: Sender = Sender { counter }; |
32 | let r: Receiver = Receiver { counter }; |
33 | (s, r) |
34 | } |
35 | |
36 | /// The sending side. |
37 | pub(crate) struct Sender<C> { |
38 | counter: *mut Counter<C>, |
39 | } |
40 | |
41 | impl<C> Sender<C> { |
42 | /// Returns the internal `Counter`. |
43 | fn counter(&self) -> &Counter<C> { |
44 | unsafe { &*self.counter } |
45 | } |
46 | |
47 | /// Acquires another sender reference. |
48 | pub(crate) fn acquire(&self) -> Sender<C> { |
49 | let count = self.counter().senders.fetch_add(1, Ordering::Relaxed); |
50 | |
51 | // Cloning senders and calling `mem::forget` on the clones could potentially overflow the |
52 | // counter. It's very difficult to recover sensibly from such degenerate scenarios so we |
53 | // just abort when the count becomes very large. |
54 | if count > isize::MAX as usize { |
55 | process::abort(); |
56 | } |
57 | |
58 | Sender { |
59 | counter: self.counter, |
60 | } |
61 | } |
62 | |
63 | /// Releases the sender reference. |
64 | /// |
65 | /// Function `disconnect` will be called if this is the last sender reference. |
66 | pub(crate) unsafe fn release<F: FnOnce(&C) -> bool>(&self, disconnect: F) { |
67 | if self.counter().senders.fetch_sub(1, Ordering::AcqRel) == 1 { |
68 | disconnect(&self.counter().chan); |
69 | |
70 | if self.counter().destroy.swap(true, Ordering::AcqRel) { |
71 | drop(Box::from_raw(self.counter)); |
72 | } |
73 | } |
74 | } |
75 | } |
76 | |
77 | impl<C> ops::Deref for Sender<C> { |
78 | type Target = C; |
79 | |
80 | fn deref(&self) -> &C { |
81 | &self.counter().chan |
82 | } |
83 | } |
84 | |
85 | impl<C> PartialEq for Sender<C> { |
86 | fn eq(&self, other: &Sender<C>) -> bool { |
87 | self.counter == other.counter |
88 | } |
89 | } |
90 | |
91 | /// The receiving side. |
92 | pub(crate) struct Receiver<C> { |
93 | counter: *mut Counter<C>, |
94 | } |
95 | |
96 | impl<C> Receiver<C> { |
97 | /// Returns the internal `Counter`. |
98 | fn counter(&self) -> &Counter<C> { |
99 | unsafe { &*self.counter } |
100 | } |
101 | |
102 | /// Acquires another receiver reference. |
103 | pub(crate) fn acquire(&self) -> Receiver<C> { |
104 | let count = self.counter().receivers.fetch_add(1, Ordering::Relaxed); |
105 | |
106 | // Cloning receivers and calling `mem::forget` on the clones could potentially overflow the |
107 | // counter. It's very difficult to recover sensibly from such degenerate scenarios so we |
108 | // just abort when the count becomes very large. |
109 | if count > isize::MAX as usize { |
110 | process::abort(); |
111 | } |
112 | |
113 | Receiver { |
114 | counter: self.counter, |
115 | } |
116 | } |
117 | |
118 | /// Releases the receiver reference. |
119 | /// |
120 | /// Function `disconnect` will be called if this is the last receiver reference. |
121 | pub(crate) unsafe fn release<F: FnOnce(&C) -> bool>(&self, disconnect: F) { |
122 | if self.counter().receivers.fetch_sub(1, Ordering::AcqRel) == 1 { |
123 | disconnect(&self.counter().chan); |
124 | |
125 | if self.counter().destroy.swap(true, Ordering::AcqRel) { |
126 | drop(Box::from_raw(self.counter)); |
127 | } |
128 | } |
129 | } |
130 | } |
131 | |
132 | impl<C> ops::Deref for Receiver<C> { |
133 | type Target = C; |
134 | |
135 | fn deref(&self) -> &C { |
136 | &self.counter().chan |
137 | } |
138 | } |
139 | |
140 | impl<C> PartialEq for Receiver<C> { |
141 | fn eq(&self, other: &Receiver<C>) -> bool { |
142 | self.counter == other.counter |
143 | } |
144 | } |
145 | |