use crate::sync::atomic::{Atomic, AtomicBool, AtomicUsize, Ordering};
use crate::{ops, process};

| 4 | /// Reference counter internals. |
| 5 | struct Counter<C> { |
| 6 | /// The number of senders associated with the channel. |
| 7 | senders: Atomic<usize>, |
| 8 | |
| 9 | /// The number of receivers associated with the channel. |
| 10 | receivers: Atomic<usize>, |
| 11 | |
| 12 | /// Set to `true` if the last sender or the last receiver reference deallocates the channel. |
| 13 | destroy: Atomic<bool>, |
| 14 | |
| 15 | /// The internal channel. |
| 16 | chan: C, |
| 17 | } |
| 18 | |
| 19 | /// Wraps a channel into the reference counter. |
| 20 | pub(crate) fn new<C>(chan: C) -> (Sender<C>, Receiver<C>) { |
| 21 | let counter: *mut Counter = Box::into_raw(Box::new(Counter { |
| 22 | senders: AtomicUsize::new(1), |
| 23 | receivers: AtomicUsize::new(1), |
| 24 | destroy: AtomicBool::new(false), |
| 25 | chan, |
| 26 | })); |
| 27 | let s: Sender = Sender { counter }; |
| 28 | let r: Receiver = Receiver { counter }; |
| 29 | (s, r) |
| 30 | } |
| 31 | |
| 32 | /// The sending side. |
| 33 | pub(crate) struct Sender<C> { |
| 34 | counter: *mut Counter<C>, |
| 35 | } |
| 36 | |
| 37 | impl<C> Sender<C> { |
| 38 | /// Returns the internal `Counter`. |
| 39 | fn counter(&self) -> &Counter<C> { |
| 40 | unsafe { &*self.counter } |
| 41 | } |
| 42 | |
| 43 | /// Acquires another sender reference. |
| 44 | pub(crate) fn acquire(&self) -> Sender<C> { |
| 45 | let count = self.counter().senders.fetch_add(1, Ordering::Relaxed); |
| 46 | |
| 47 | // Cloning senders and calling `mem::forget` on the clones could potentially overflow the |
| 48 | // counter. It's very difficult to recover sensibly from such degenerate scenarios so we |
| 49 | // just abort when the count becomes very large. |
| 50 | if count > isize::MAX as usize { |
| 51 | process::abort(); |
| 52 | } |
| 53 | |
| 54 | Sender { counter: self.counter } |
| 55 | } |
| 56 | |
| 57 | /// Releases the sender reference. |
| 58 | /// |
| 59 | /// Function `disconnect` will be called if this is the last sender reference. |
| 60 | pub(crate) unsafe fn release<F: FnOnce(&C) -> bool>(&self, disconnect: F) { |
| 61 | if self.counter().senders.fetch_sub(1, Ordering::AcqRel) == 1 { |
| 62 | disconnect(&self.counter().chan); |
| 63 | |
| 64 | if self.counter().destroy.swap(true, Ordering::AcqRel) { |
| 65 | drop(unsafe { Box::from_raw(self.counter) }); |
| 66 | } |
| 67 | } |
| 68 | } |
| 69 | } |
| 70 | |
| 71 | impl<C> ops::Deref for Sender<C> { |
| 72 | type Target = C; |
| 73 | |
| 74 | fn deref(&self) -> &C { |
| 75 | &self.counter().chan |
| 76 | } |
| 77 | } |
| 78 | |
| 79 | impl<C> PartialEq for Sender<C> { |
| 80 | fn eq(&self, other: &Sender<C>) -> bool { |
| 81 | self.counter == other.counter |
| 82 | } |
| 83 | } |
| 84 | |
| 85 | /// The receiving side. |
| 86 | pub(crate) struct Receiver<C> { |
| 87 | counter: *mut Counter<C>, |
| 88 | } |
| 89 | |
| 90 | impl<C> Receiver<C> { |
| 91 | /// Returns the internal `Counter`. |
| 92 | fn counter(&self) -> &Counter<C> { |
| 93 | unsafe { &*self.counter } |
| 94 | } |
| 95 | |
| 96 | /// Acquires another receiver reference. |
| 97 | pub(crate) fn acquire(&self) -> Receiver<C> { |
| 98 | let count = self.counter().receivers.fetch_add(1, Ordering::Relaxed); |
| 99 | |
| 100 | // Cloning receivers and calling `mem::forget` on the clones could potentially overflow the |
| 101 | // counter. It's very difficult to recover sensibly from such degenerate scenarios so we |
| 102 | // just abort when the count becomes very large. |
| 103 | if count > isize::MAX as usize { |
| 104 | process::abort(); |
| 105 | } |
| 106 | |
| 107 | Receiver { counter: self.counter } |
| 108 | } |
| 109 | |
| 110 | /// Releases the receiver reference. |
| 111 | /// |
| 112 | /// Function `disconnect` will be called if this is the last receiver reference. |
| 113 | pub(crate) unsafe fn release<F: FnOnce(&C) -> bool>(&self, disconnect: F) { |
| 114 | if self.counter().receivers.fetch_sub(1, Ordering::AcqRel) == 1 { |
| 115 | disconnect(&self.counter().chan); |
| 116 | |
| 117 | if self.counter().destroy.swap(true, Ordering::AcqRel) { |
| 118 | drop(unsafe { Box::from_raw(self.counter) }); |
| 119 | } |
| 120 | } |
| 121 | } |
| 122 | } |
| 123 | |
| 124 | impl<C> ops::Deref for Receiver<C> { |
| 125 | type Target = C; |
| 126 | |
| 127 | fn deref(&self) -> &C { |
| 128 | &self.counter().chan |
| 129 | } |
| 130 | } |
| 131 | |
| 132 | impl<C> PartialEq for Receiver<C> { |
| 133 | fn eq(&self, other: &Receiver<C>) -> bool { |
| 134 | self.counter == other.counter |
| 135 | } |
| 136 | } |
| 137 | |