1 | use crate::ops; |
2 | use crate::process; |
3 | use crate::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; |
4 | |
5 | /// Reference counter internals. |
6 | struct Counter<C> { |
7 | /// The number of senders associated with the channel. |
8 | senders: AtomicUsize, |
9 | |
10 | /// The number of receivers associated with the channel. |
11 | receivers: AtomicUsize, |
12 | |
13 | /// Set to `true` if the last sender or the last receiver reference deallocates the channel. |
14 | destroy: AtomicBool, |
15 | |
16 | /// The internal channel. |
17 | chan: C, |
18 | } |
19 | |
20 | /// Wraps a channel into the reference counter. |
21 | pub(crate) fn new<C>(chan: C) -> (Sender<C>, Receiver<C>) { |
22 | let counter: *mut Counter = Box::into_raw(Box::new(Counter { |
23 | senders: AtomicUsize::new(1), |
24 | receivers: AtomicUsize::new(1), |
25 | destroy: AtomicBool::new(false), |
26 | chan, |
27 | })); |
28 | let s: Sender = Sender { counter }; |
29 | let r: Receiver = Receiver { counter }; |
30 | (s, r) |
31 | } |
32 | |
33 | /// The sending side. |
34 | pub(crate) struct Sender<C> { |
35 | counter: *mut Counter<C>, |
36 | } |
37 | |
38 | impl<C> Sender<C> { |
39 | /// Returns the internal `Counter`. |
40 | fn counter(&self) -> &Counter<C> { |
41 | unsafe { &*self.counter } |
42 | } |
43 | |
44 | /// Acquires another sender reference. |
45 | pub(crate) fn acquire(&self) -> Sender<C> { |
46 | let count = self.counter().senders.fetch_add(1, Ordering::Relaxed); |
47 | |
48 | // Cloning senders and calling `mem::forget` on the clones could potentially overflow the |
49 | // counter. It's very difficult to recover sensibly from such degenerate scenarios so we |
50 | // just abort when the count becomes very large. |
51 | if count > isize::MAX as usize { |
52 | process::abort(); |
53 | } |
54 | |
55 | Sender { counter: self.counter } |
56 | } |
57 | |
58 | /// Releases the sender reference. |
59 | /// |
60 | /// Function `disconnect` will be called if this is the last sender reference. |
61 | pub(crate) unsafe fn release<F: FnOnce(&C) -> bool>(&self, disconnect: F) { |
62 | if self.counter().senders.fetch_sub(1, Ordering::AcqRel) == 1 { |
63 | disconnect(&self.counter().chan); |
64 | |
65 | if self.counter().destroy.swap(true, Ordering::AcqRel) { |
66 | drop(Box::from_raw(self.counter)); |
67 | } |
68 | } |
69 | } |
70 | } |
71 | |
72 | impl<C> ops::Deref for Sender<C> { |
73 | type Target = C; |
74 | |
75 | fn deref(&self) -> &C { |
76 | &self.counter().chan |
77 | } |
78 | } |
79 | |
80 | impl<C> PartialEq for Sender<C> { |
81 | fn eq(&self, other: &Sender<C>) -> bool { |
82 | self.counter == other.counter |
83 | } |
84 | } |
85 | |
86 | /// The receiving side. |
87 | pub(crate) struct Receiver<C> { |
88 | counter: *mut Counter<C>, |
89 | } |
90 | |
91 | impl<C> Receiver<C> { |
92 | /// Returns the internal `Counter`. |
93 | fn counter(&self) -> &Counter<C> { |
94 | unsafe { &*self.counter } |
95 | } |
96 | |
97 | /// Acquires another receiver reference. |
98 | pub(crate) fn acquire(&self) -> Receiver<C> { |
99 | let count = self.counter().receivers.fetch_add(1, Ordering::Relaxed); |
100 | |
101 | // Cloning receivers and calling `mem::forget` on the clones could potentially overflow the |
102 | // counter. It's very difficult to recover sensibly from such degenerate scenarios so we |
103 | // just abort when the count becomes very large. |
104 | if count > isize::MAX as usize { |
105 | process::abort(); |
106 | } |
107 | |
108 | Receiver { counter: self.counter } |
109 | } |
110 | |
111 | /// Releases the receiver reference. |
112 | /// |
113 | /// Function `disconnect` will be called if this is the last receiver reference. |
114 | pub(crate) unsafe fn release<F: FnOnce(&C) -> bool>(&self, disconnect: F) { |
115 | if self.counter().receivers.fetch_sub(1, Ordering::AcqRel) == 1 { |
116 | disconnect(&self.counter().chan); |
117 | |
118 | if self.counter().destroy.swap(true, Ordering::AcqRel) { |
119 | drop(Box::from_raw(self.counter)); |
120 | } |
121 | } |
122 | } |
123 | } |
124 | |
125 | impl<C> ops::Deref for Receiver<C> { |
126 | type Target = C; |
127 | |
128 | fn deref(&self) -> &C { |
129 | &self.counter().chan |
130 | } |
131 | } |
132 | |
133 | impl<C> PartialEq for Receiver<C> { |
134 | fn eq(&self, other: &Receiver<C>) -> bool { |
135 | self.counter == other.counter |
136 | } |
137 | } |
138 | |