| 1 | //! A restricted channel to pass data from signal handler. |
| 2 | //! |
| 3 | //! When trying to communicate data from signal handler to the outside world, one can use an atomic |
| 4 | //! variable (as it doesn't lock, so it can be made async-signal-safe). But this won't work for |
| 5 | //! larger data. |
| 6 | //! |
| 7 | //! This module provides a channel that can be used for that purpose. It is used by certain |
| 8 | //! [exfiltrators][crate::iterator::exfiltrator], but can be used as building block for custom |
| 9 | //! actions. In general, this is not a ready-made end-user API. |
| 10 | //! |
| 11 | //! # How does it work |
| 12 | //! |
| 13 | //! Each channel has a fixed number of slots and two queues (one for empty slots, one for full |
| 14 | //! slots). A signal handler takes a slot out of the empty one, fills it and passes it into the |
| 15 | //! full one. Outside of signal handler, it can take the value out of the full queue and return the |
| 16 | //! slot to the empty queue. |
| 17 | //! |
| 18 | //! The queues are implemented as bit-encoded indexes of the slots in the storage. The bits are |
| 19 | //! stored in an atomic variable. |
| 20 | //! |
| 21 | //! Note that the algorithm allows for a slot to be in neither queue (when it is being emptied or |
| 22 | //! filled). |
| 23 | //! |
| 24 | //! # Fallible allocation of a slot |
| 25 | //! |
| 26 | //! It is apparent that allocation of a new slot can fail (there's nothing in the empty slot). In |
| 27 | //! such case, there's no way to send the new value out of the handler (there's no way to safely |
| 28 | //! wait for a slot to appear, because the handler can be blocking the thread that is responsible |
| 29 | //! for emptying them). But that's considered acceptable ‒ even the kernel collates the same kinds |
| 30 | //! of signals together if they are not consumed by application fast enough and there are no free |
| 31 | //! slots exactly because some are being filled, emptied or are full ‒ in particular, the whole |
| 32 | //! system will yield a signal. |
| 33 | //! |
| 34 | //! This assumes that separate signals don't share the same buffer and that there's only one reader |
| 35 | //! (using multiple readers is still safe, but it is possible that all slots would be inside the |
| 36 | //! readers, but already empty, so the above argument would not hold). |
| 37 | |
| 38 | // TODO: Other sizes? Does anyone need more than 5 slots? |
| 39 | |
| 40 | use std::cell::UnsafeCell; |
| 41 | use std::sync::atomic::{AtomicU16, Ordering}; |
| 42 | |
| 43 | const SLOTS: usize = 5; |
| 44 | const BITS: u16 = 3; |
| 45 | const MASK: u16 = 0b111; |
| 46 | |
| 47 | fn get(n: u16, idx: u16) -> u16 { |
| 48 | (n >> (BITS * idx)) & MASK |
| 49 | } |
| 50 | |
| 51 | fn set(n: u16, idx: u16, v: u16) -> u16 { |
| 52 | let v: u16 = v << (BITS * idx); |
| 53 | let mask: u16 = MASK << (BITS * idx); |
| 54 | (n & !mask) | v |
| 55 | } |
| 56 | |
| 57 | fn enqueue(q: &AtomicU16, val: u16) { |
| 58 | let mut current: u16 = q.load(order:Ordering::Relaxed); |
| 59 | loop { |
| 60 | let empty: u16 = (0..SLOTS as u16) |
| 61 | .find(|i| get(current, *i) == 0) |
| 62 | .expect(msg:"No empty slot available" ); |
| 63 | let modified: u16 = set(n:current, idx:empty, v:val); |
| 64 | match q.compare_exchange_weak(current, new:modified, success:Ordering::Release, failure:Ordering::Relaxed) { |
| 65 | Ok(_) => break, |
| 66 | Err(changed: u16) => current = changed, // And retry with the changed value |
| 67 | } |
| 68 | } |
| 69 | } |
| 70 | |
| 71 | fn dequeue(q: &AtomicU16) -> Option<u16> { |
| 72 | let mut current: u16 = q.load(order:Ordering::Relaxed); |
| 73 | loop { |
| 74 | let val: u16 = current & MASK; |
| 75 | // It's completely empty |
| 76 | if val == 0 { |
| 77 | break None; |
| 78 | } |
| 79 | let modified: u16 = current >> BITS; |
| 80 | match q.compare_exchange_weak(current, new:modified, success:Ordering::Acquire, failure:Ordering::Relaxed) { |
| 81 | Ok(_) => break Some(val), |
| 82 | Err(changed: u16) => current = changed, |
| 83 | } |
| 84 | } |
| 85 | } |
| 86 | |
| 87 | /// A restricted async-signal-safe channel |
| 88 | /// |
| 89 | /// This is a bit like the usual channel used for inter-thread communication, but with several |
| 90 | /// restrictions: |
| 91 | /// |
| 92 | /// * There's a limited number of slots (currently 5). |
| 93 | /// * There's no way to wait for a place in it or for a value. If value is not available, `None` is |
| 94 | /// returned. If there's no space for a value, the value is silently dropped. |
| 95 | /// |
| 96 | /// In exchange for that, all the operations on that channel are async-signal-safe. That means it |
| 97 | /// is possible to use it to communicate between a signal handler and the rest of the world with it |
| 98 | /// (specifically, it's designed to send information from the handler to the rest of the |
| 99 | /// application). The throwing out of values when full is in line with collating of the same type |
| 100 | /// in kernel (you should not use the same channel for multiple different signals). |
| 101 | /// |
| 102 | /// Technically, this is a MPMC queue which preserves order, but it is expected to be used in MPSC |
| 103 | /// mode mostly (in theory, multiple threads can be executing a signal handler for the same signal |
| 104 | /// at the same time). The channel is not responsible for wakeups. |
| 105 | /// |
| 106 | /// While the channel is async-signal-safe, you still need to make sure *creating* of the values is |
| 107 | /// too (it should not contain anything that allocates, for example ‒ so no `String`s inside, etc). |
| 108 | /// |
| 109 | /// The code was *not* tuned for performance (signals are not expected to happen often). |
| 110 | pub struct Channel<T> { |
| 111 | storage: [UnsafeCell<Option<T>>; SLOTS], |
| 112 | empty: AtomicU16, |
| 113 | full: AtomicU16, |
| 114 | } |
| 115 | |
| 116 | impl<T> Channel<T> { |
| 117 | /// Creates a new channel with nothing in it. |
| 118 | pub fn new() -> Self { |
| 119 | let storage = Default::default(); |
| 120 | let me = Self { |
| 121 | storage, |
| 122 | empty: AtomicU16::new(0), |
| 123 | full: AtomicU16::new(0), |
| 124 | }; |
| 125 | |
| 126 | for i in 1..SLOTS + 1 { |
| 127 | enqueue(&me.empty, i as u16); |
| 128 | } |
| 129 | |
| 130 | me |
| 131 | } |
| 132 | |
| 133 | /// Inserts a value into the channel. |
| 134 | /// |
| 135 | /// If the value doesn't fit, it is silently dropped. Never blocks. |
| 136 | pub fn send(&self, val: T) { |
| 137 | if let Some(empty_idx) = dequeue(&self.empty) { |
| 138 | unsafe { *self.storage[empty_idx as usize - 1].get() = Some(val) }; |
| 139 | enqueue(&self.full, empty_idx); |
| 140 | } |
| 141 | } |
| 142 | |
| 143 | /// Takes a value from the channel. |
| 144 | /// |
| 145 | /// Or returns `None` if the channel is empty. Never blocks. |
| 146 | pub fn recv(&self) -> Option<T> { |
| 147 | dequeue(&self.full).map(|idx| { |
| 148 | let result = unsafe { &mut *self.storage[idx as usize - 1].get() } |
| 149 | .take() |
| 150 | .expect("Full slot with nothing in it" ); |
| 151 | enqueue(&self.empty, idx); |
| 152 | result |
| 153 | }) |
| 154 | } |
| 155 | } |
| 156 | |
| 157 | impl<T> Default for Channel<T> { |
| 158 | fn default() -> Self { |
| 159 | Self::new() |
| 160 | } |
| 161 | } |
| 162 | |
| 163 | unsafe impl<T: Send> Send for Channel<T> {} |
| 164 | |
| 165 | // Yes, really Send -> Sync. Having a reference to Channel allows Sending Ts, but not having refs |
| 166 | // on them. |
| 167 | unsafe impl<T: Send> Sync for Channel<T> {} |
| 168 | |
| 169 | #[cfg (test)] |
| 170 | mod tests { |
| 171 | use std::sync::Arc; |
| 172 | use std::thread; |
| 173 | |
| 174 | use super::*; |
| 175 | |
| 176 | #[test ] |
| 177 | fn new_empty() { |
| 178 | let channel = Channel::<usize>::new(); |
| 179 | assert!(channel.recv().is_none()); |
| 180 | assert!(channel.recv().is_none()); |
| 181 | } |
| 182 | |
| 183 | #[test ] |
| 184 | fn pass_value() { |
| 185 | let channel = Channel::new(); |
| 186 | channel.send(42); |
| 187 | assert_eq!(42, channel.recv().unwrap()); |
| 188 | assert!(channel.recv().is_none()); |
| 189 | } |
| 190 | |
| 191 | #[test ] |
| 192 | fn multiple() { |
| 193 | let channel = Channel::new(); |
| 194 | for i in 0..1000 { |
| 195 | channel.send(i); |
| 196 | assert_eq!(i, channel.recv().unwrap()); |
| 197 | assert!(channel.recv().is_none()); |
| 198 | } |
| 199 | } |
| 200 | |
| 201 | #[test ] |
| 202 | fn overflow() { |
| 203 | let channel = Channel::new(); |
| 204 | for i in 0..10 { |
| 205 | channel.send(i); |
| 206 | } |
| 207 | for i in 0..5 { |
| 208 | assert_eq!(i, channel.recv().unwrap()); |
| 209 | } |
| 210 | assert!(channel.recv().is_none()); |
| 211 | } |
| 212 | |
| 213 | #[test ] |
| 214 | fn multi_thread() { |
| 215 | let channel = Arc::new(Channel::<usize>::new()); |
| 216 | |
| 217 | let sender = thread::spawn({ |
| 218 | let channel = Arc::clone(&channel); |
| 219 | move || { |
| 220 | for i in 0..4 { |
| 221 | channel.send(i); |
| 222 | } |
| 223 | } |
| 224 | }); |
| 225 | |
| 226 | let mut results = Vec::new(); |
| 227 | while results.len() < 4 { |
| 228 | results.extend(channel.recv()); |
| 229 | } |
| 230 | |
| 231 | assert_eq!(vec![0, 1, 2, 3], results); |
| 232 | |
| 233 | sender.join().unwrap(); |
| 234 | } |
| 235 | } |
| 236 | |