1 | //! Reference counter for channels. |
2 | |
3 | use std::boxed::Box; |
4 | use std::isize; |
5 | use std::ops; |
6 | use std::process; |
7 | use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; |
8 | |
9 | /// Reference counter internals. |
10 | struct Counter<C> { |
11 | /// The number of senders associated with the channel. |
12 | senders: AtomicUsize, |
13 | |
14 | /// The number of receivers associated with the channel. |
15 | receivers: AtomicUsize, |
16 | |
17 | /// Set to `true` if the last sender or the last receiver reference deallocates the channel. |
18 | destroy: AtomicBool, |
19 | |
20 | /// The internal channel. |
21 | chan: C, |
22 | } |
23 | |
24 | /// Wraps a channel into the reference counter. |
25 | pub(crate) fn new<C>(chan: C) -> (Sender<C>, Receiver<C>) { |
26 | let counter: *mut Counter = Box::into_raw(Box::new(Counter { |
27 | senders: AtomicUsize::new(1), |
28 | receivers: AtomicUsize::new(1), |
29 | destroy: AtomicBool::new(false), |
30 | chan, |
31 | })); |
32 | let s: Sender = Sender { counter }; |
33 | let r: Receiver = Receiver { counter }; |
34 | (s, r) |
35 | } |
36 | |
37 | /// The sending side. |
38 | pub(crate) struct Sender<C> { |
39 | counter: *mut Counter<C>, |
40 | } |
41 | |
42 | impl<C> Sender<C> { |
43 | /// Returns the internal `Counter`. |
44 | fn counter(&self) -> &Counter<C> { |
45 | unsafe { &*self.counter } |
46 | } |
47 | |
48 | /// Acquires another sender reference. |
49 | pub(crate) fn acquire(&self) -> Sender<C> { |
50 | let count = self.counter().senders.fetch_add(1, Ordering::Relaxed); |
51 | |
52 | // Cloning senders and calling `mem::forget` on the clones could potentially overflow the |
53 | // counter. It's very difficult to recover sensibly from such degenerate scenarios so we |
54 | // just abort when the count becomes very large. |
55 | if count > isize::MAX as usize { |
56 | process::abort(); |
57 | } |
58 | |
59 | Sender { |
60 | counter: self.counter, |
61 | } |
62 | } |
63 | |
64 | /// Releases the sender reference. |
65 | /// |
66 | /// Function `disconnect` will be called if this is the last sender reference. |
67 | pub(crate) unsafe fn release<F: FnOnce(&C) -> bool>(&self, disconnect: F) { |
68 | if self.counter().senders.fetch_sub(1, Ordering::AcqRel) == 1 { |
69 | disconnect(&self.counter().chan); |
70 | |
71 | if self.counter().destroy.swap(true, Ordering::AcqRel) { |
72 | drop(Box::from_raw(self.counter)); |
73 | } |
74 | } |
75 | } |
76 | } |
77 | |
78 | impl<C> ops::Deref for Sender<C> { |
79 | type Target = C; |
80 | |
81 | fn deref(&self) -> &C { |
82 | &self.counter().chan |
83 | } |
84 | } |
85 | |
86 | impl<C> PartialEq for Sender<C> { |
87 | fn eq(&self, other: &Sender<C>) -> bool { |
88 | self.counter == other.counter |
89 | } |
90 | } |
91 | |
92 | /// The receiving side. |
93 | pub(crate) struct Receiver<C> { |
94 | counter: *mut Counter<C>, |
95 | } |
96 | |
97 | impl<C> Receiver<C> { |
98 | /// Returns the internal `Counter`. |
99 | fn counter(&self) -> &Counter<C> { |
100 | unsafe { &*self.counter } |
101 | } |
102 | |
103 | /// Acquires another receiver reference. |
104 | pub(crate) fn acquire(&self) -> Receiver<C> { |
105 | let count = self.counter().receivers.fetch_add(1, Ordering::Relaxed); |
106 | |
107 | // Cloning receivers and calling `mem::forget` on the clones could potentially overflow the |
108 | // counter. It's very difficult to recover sensibly from such degenerate scenarios so we |
109 | // just abort when the count becomes very large. |
110 | if count > isize::MAX as usize { |
111 | process::abort(); |
112 | } |
113 | |
114 | Receiver { |
115 | counter: self.counter, |
116 | } |
117 | } |
118 | |
119 | /// Releases the receiver reference. |
120 | /// |
121 | /// Function `disconnect` will be called if this is the last receiver reference. |
122 | pub(crate) unsafe fn release<F: FnOnce(&C) -> bool>(&self, disconnect: F) { |
123 | if self.counter().receivers.fetch_sub(1, Ordering::AcqRel) == 1 { |
124 | disconnect(&self.counter().chan); |
125 | |
126 | if self.counter().destroy.swap(true, Ordering::AcqRel) { |
127 | drop(Box::from_raw(self.counter)); |
128 | } |
129 | } |
130 | } |
131 | } |
132 | |
133 | impl<C> ops::Deref for Receiver<C> { |
134 | type Target = C; |
135 | |
136 | fn deref(&self) -> &C { |
137 | &self.counter().chan |
138 | } |
139 | } |
140 | |
141 | impl<C> PartialEq for Receiver<C> { |
142 | fn eq(&self, other: &Receiver<C>) -> bool { |
143 | self.counter == other.counter |
144 | } |
145 | } |
146 | |