| 1 | //! Reference counter for channels. |
| 2 | |
| 3 | use std::boxed::Box; |
| 4 | use std::isize; |
| 5 | use std::ops; |
| 6 | use std::process; |
| 7 | use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; |
| 8 | |
/// Shared, heap-allocated state that every sender and receiver handle of one channel points at.
struct Counter<C> {
    /// How many sender handles currently refer to this counter.
    senders: AtomicUsize,

    /// How many receiver handles currently refer to this counter.
    receivers: AtomicUsize,

    /// Hand-off flag between the two sides. Whichever side releases its last reference swaps
    /// this to `true`; the side that finds it already `true` frees the allocation.
    destroy: AtomicBool,

    /// The wrapped channel implementation.
    chan: C,
}
| 23 | |
| 24 | /// Wraps a channel into the reference counter. |
| 25 | pub(crate) fn new<C>(chan: C) -> (Sender<C>, Receiver<C>) { |
| 26 | let counter: *mut Counter = Box::into_raw(Box::new(Counter { |
| 27 | senders: AtomicUsize::new(1), |
| 28 | receivers: AtomicUsize::new(1), |
| 29 | destroy: AtomicBool::new(false), |
| 30 | chan, |
| 31 | })); |
| 32 | let s: Sender = Sender { counter }; |
| 33 | let r: Receiver = Receiver { counter }; |
| 34 | (s, r) |
| 35 | } |
| 36 | |
| 37 | /// The sending side. |
| 38 | pub(crate) struct Sender<C> { |
| 39 | counter: *mut Counter<C>, |
| 40 | } |
| 41 | |
| 42 | impl<C> Sender<C> { |
| 43 | /// Returns the internal `Counter`. |
| 44 | fn counter(&self) -> &Counter<C> { |
| 45 | unsafe { &*self.counter } |
| 46 | } |
| 47 | |
| 48 | /// Acquires another sender reference. |
| 49 | pub(crate) fn acquire(&self) -> Sender<C> { |
| 50 | let count = self.counter().senders.fetch_add(1, Ordering::Relaxed); |
| 51 | |
| 52 | // Cloning senders and calling `mem::forget` on the clones could potentially overflow the |
| 53 | // counter. It's very difficult to recover sensibly from such degenerate scenarios so we |
| 54 | // just abort when the count becomes very large. |
| 55 | if count > isize::MAX as usize { |
| 56 | process::abort(); |
| 57 | } |
| 58 | |
| 59 | Sender { |
| 60 | counter: self.counter, |
| 61 | } |
| 62 | } |
| 63 | |
| 64 | /// Releases the sender reference. |
| 65 | /// |
| 66 | /// Function `disconnect` will be called if this is the last sender reference. |
| 67 | pub(crate) unsafe fn release<F: FnOnce(&C) -> bool>(&self, disconnect: F) { |
| 68 | if self.counter().senders.fetch_sub(1, Ordering::AcqRel) == 1 { |
| 69 | disconnect(&self.counter().chan); |
| 70 | |
| 71 | if self.counter().destroy.swap(true, Ordering::AcqRel) { |
| 72 | drop(Box::from_raw(self.counter)); |
| 73 | } |
| 74 | } |
| 75 | } |
| 76 | } |
| 77 | |
| 78 | impl<C> ops::Deref for Sender<C> { |
| 79 | type Target = C; |
| 80 | |
| 81 | fn deref(&self) -> &C { |
| 82 | &self.counter().chan |
| 83 | } |
| 84 | } |
| 85 | |
| 86 | impl<C> PartialEq for Sender<C> { |
| 87 | fn eq(&self, other: &Sender<C>) -> bool { |
| 88 | self.counter == other.counter |
| 89 | } |
| 90 | } |
| 91 | |
| 92 | /// The receiving side. |
| 93 | pub(crate) struct Receiver<C> { |
| 94 | counter: *mut Counter<C>, |
| 95 | } |
| 96 | |
| 97 | impl<C> Receiver<C> { |
| 98 | /// Returns the internal `Counter`. |
| 99 | fn counter(&self) -> &Counter<C> { |
| 100 | unsafe { &*self.counter } |
| 101 | } |
| 102 | |
| 103 | /// Acquires another receiver reference. |
| 104 | pub(crate) fn acquire(&self) -> Receiver<C> { |
| 105 | let count = self.counter().receivers.fetch_add(1, Ordering::Relaxed); |
| 106 | |
| 107 | // Cloning receivers and calling `mem::forget` on the clones could potentially overflow the |
| 108 | // counter. It's very difficult to recover sensibly from such degenerate scenarios so we |
| 109 | // just abort when the count becomes very large. |
| 110 | if count > isize::MAX as usize { |
| 111 | process::abort(); |
| 112 | } |
| 113 | |
| 114 | Receiver { |
| 115 | counter: self.counter, |
| 116 | } |
| 117 | } |
| 118 | |
| 119 | /// Releases the receiver reference. |
| 120 | /// |
| 121 | /// Function `disconnect` will be called if this is the last receiver reference. |
| 122 | pub(crate) unsafe fn release<F: FnOnce(&C) -> bool>(&self, disconnect: F) { |
| 123 | if self.counter().receivers.fetch_sub(1, Ordering::AcqRel) == 1 { |
| 124 | disconnect(&self.counter().chan); |
| 125 | |
| 126 | if self.counter().destroy.swap(true, Ordering::AcqRel) { |
| 127 | drop(Box::from_raw(self.counter)); |
| 128 | } |
| 129 | } |
| 130 | } |
| 131 | } |
| 132 | |
| 133 | impl<C> ops::Deref for Receiver<C> { |
| 134 | type Target = C; |
| 135 | |
| 136 | fn deref(&self) -> &C { |
| 137 | &self.counter().chan |
| 138 | } |
| 139 | } |
| 140 | |
| 141 | impl<C> PartialEq for Receiver<C> { |
| 142 | fn eq(&self, other: &Receiver<C>) -> bool { |
| 143 | self.counter == other.counter |
| 144 | } |
| 145 | } |
| 146 | |