1 | //! A restricted channel to pass data from signal handler. |
2 | //! |
3 | //! When trying to communicate data from signal handler to the outside world, one can use an atomic |
4 | //! variable (as it doesn't lock, so it can be made async-signal-safe). But this won't work for |
5 | //! larger data. |
6 | //! |
7 | //! This module provides a channel that can be used for that purpose. It is used by certain |
8 | //! [exfiltrators][crate::iterator::exfiltrator], but can be used as building block for custom |
9 | //! actions. In general, this is not a ready-made end-user API. |
10 | //! |
11 | //! # How does it work |
12 | //! |
//! Each channel has a fixed number of slots and two queues (one for empty slots, one for full
//! slots). A signal handler takes a slot out of the empty queue, fills it and passes it into the
//! full one. Outside of the signal handler, the consumer can take the value out of the full queue
//! and return the slot to the empty queue.
17 | //! |
18 | //! The queues are implemented as bit-encoded indexes of the slots in the storage. The bits are |
19 | //! stored in an atomic variable. |
20 | //! |
21 | //! Note that the algorithm allows for a slot to be in neither queue (when it is being emptied or |
22 | //! filled). |
23 | //! |
24 | //! # Fallible allocation of a slot |
25 | //! |
//! It is apparent that allocation of a new slot can fail (when there's nothing in the empty
//! queue). In such a case, there's no way to send the new value out of the handler (there's no way
//! to safely wait for a slot to appear, because the handler can be blocking the thread that is
//! responsible for emptying them). But that's considered acceptable ‒ even the kernel collates the
//! same kinds of signals together if they are not consumed by the application fast enough.
//! Furthermore, there are no free slots exactly because some are being filled, emptied or are
//! full ‒ in particular, the system as a whole will still yield a signal.
33 | //! |
34 | //! This assumes that separate signals don't share the same buffer and that there's only one reader |
35 | //! (using multiple readers is still safe, but it is possible that all slots would be inside the |
36 | //! readers, but already empty, so the above argument would not hold). |
37 | |
38 | // TODO: Other sizes? Does anyone need more than 5 slots? |
39 | |
40 | use std::cell::UnsafeCell; |
41 | use std::sync::atomic::{AtomicU16, Ordering}; |
42 | |
/// Number of storage slots in a channel; each packed queue has this many positions.
const SLOTS: usize = 5;
/// Width, in bits, of a single queue position inside the packed `u16`.
const BITS: u16 = 3;
/// Mask selecting one queue position, i.e. the lowest `BITS` bits.
const MASK: u16 = (1 << BITS) - 1;
46 | |
47 | fn get(n: u16, idx: u16) -> u16 { |
48 | (n >> (BITS * idx)) & MASK |
49 | } |
50 | |
51 | fn set(n: u16, idx: u16, v: u16) -> u16 { |
52 | let v: u16 = v << (BITS * idx); |
53 | let mask: u16 = MASK << (BITS * idx); |
54 | (n & !mask) | v |
55 | } |
56 | |
57 | fn enqueue(q: &AtomicU16, val: u16) { |
58 | let mut current: u16 = q.load(order:Ordering::Relaxed); |
59 | loop { |
60 | let empty: u16 = (0..SLOTS as u16) |
61 | .find(|i| get(current, *i) == 0) |
62 | .expect(msg:"No empty slot available" ); |
63 | let modified: u16 = set(n:current, idx:empty, v:val); |
64 | match q.compare_exchange_weak(current, new:modified, success:Ordering::Release, failure:Ordering::Relaxed) { |
65 | Ok(_) => break, |
66 | Err(changed: u16) => current = changed, // And retry with the changed value |
67 | } |
68 | } |
69 | } |
70 | |
71 | fn dequeue(q: &AtomicU16) -> Option<u16> { |
72 | let mut current: u16 = q.load(order:Ordering::Relaxed); |
73 | loop { |
74 | let val: u16 = current & MASK; |
75 | // It's completely empty |
76 | if val == 0 { |
77 | break None; |
78 | } |
79 | let modified: u16 = current >> BITS; |
80 | match q.compare_exchange_weak(current, new:modified, success:Ordering::Acquire, failure:Ordering::Relaxed) { |
81 | Ok(_) => break Some(val), |
82 | Err(changed: u16) => current = changed, |
83 | } |
84 | } |
85 | } |
86 | |
87 | /// A restricted async-signal-safe channel |
88 | /// |
89 | /// This is a bit like the usual channel used for inter-thread communication, but with several |
90 | /// restrictions: |
91 | /// |
92 | /// * There's a limited number of slots (currently 5). |
93 | /// * There's no way to wait for a place in it or for a value. If value is not available, `None` is |
94 | /// returned. If there's no space for a value, the value is silently dropped. |
95 | /// |
96 | /// In exchange for that, all the operations on that channel are async-signal-safe. That means it |
97 | /// is possible to use it to communicate between a signal handler and the rest of the world with it |
98 | /// (specifically, it's designed to send information from the handler to the rest of the |
99 | /// application). The throwing out of values when full is in line with collating of the same type |
100 | /// in kernel (you should not use the same channel for multiple different signals). |
101 | /// |
102 | /// Technically, this is a MPMC queue which preserves order, but it is expected to be used in MPSC |
103 | /// mode mostly (in theory, multiple threads can be executing a signal handler for the same signal |
104 | /// at the same time). The channel is not responsible for wakeups. |
105 | /// |
106 | /// While the channel is async-signal-safe, you still need to make sure *creating* of the values is |
107 | /// too (it should not contain anything that allocates, for example ‒ so no `String`s inside, etc). |
108 | /// |
109 | /// The code was *not* tuned for performance (signals are not expected to happen often). |
110 | pub struct Channel<T> { |
111 | storage: [UnsafeCell<Option<T>>; SLOTS], |
112 | empty: AtomicU16, |
113 | full: AtomicU16, |
114 | } |
115 | |
116 | impl<T> Channel<T> { |
117 | /// Creates a new channel with nothing in it. |
118 | pub fn new() -> Self { |
119 | let storage = Default::default(); |
120 | let me = Self { |
121 | storage, |
122 | empty: AtomicU16::new(0), |
123 | full: AtomicU16::new(0), |
124 | }; |
125 | |
126 | for i in 1..SLOTS + 1 { |
127 | enqueue(&me.empty, i as u16); |
128 | } |
129 | |
130 | me |
131 | } |
132 | |
133 | /// Inserts a value into the channel. |
134 | /// |
135 | /// If the value doesn't fit, it is silently dropped. Never blocks. |
136 | pub fn send(&self, val: T) { |
137 | if let Some(empty_idx) = dequeue(&self.empty) { |
138 | unsafe { *self.storage[empty_idx as usize - 1].get() = Some(val) }; |
139 | enqueue(&self.full, empty_idx); |
140 | } |
141 | } |
142 | |
143 | /// Takes a value from the channel. |
144 | /// |
145 | /// Or returns `None` if the channel is empty. Never blocks. |
146 | pub fn recv(&self) -> Option<T> { |
147 | dequeue(&self.full).map(|idx| { |
148 | let result = unsafe { &mut *self.storage[idx as usize - 1].get() } |
149 | .take() |
150 | .expect("Full slot with nothing in it" ); |
151 | enqueue(&self.empty, idx); |
152 | result |
153 | }) |
154 | } |
155 | } |
156 | |
157 | impl<T> Default for Channel<T> { |
158 | fn default() -> Self { |
159 | Self::new() |
160 | } |
161 | } |
162 | |
163 | unsafe impl<T: Send> Send for Channel<T> {} |
164 | |
165 | // Yes, really Send -> Sync. Having a reference to Channel allows Sending Ts, but not having refs |
166 | // on them. |
167 | unsafe impl<T: Send> Sync for Channel<T> {} |
168 | |
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::thread;

    use super::*;

    /// A fresh channel yields nothing, no matter how many times it is asked.
    #[test]
    fn new_empty() {
        let channel: Channel<usize> = Channel::new();
        for _ in 0..2 {
            assert!(channel.recv().is_none());
        }
    }

    /// A single value goes through and leaves the channel empty again.
    #[test]
    fn pass_value() {
        let channel = Channel::new();
        channel.send(42);
        assert_eq!(Some(42), channel.recv());
        assert_eq!(None, channel.recv());
    }

    /// The slots get recycled, so far more values than slots can pass through over time.
    #[test]
    fn multiple() {
        let channel = Channel::new();
        (0..1000).for_each(|i| {
            channel.send(i);
            assert_eq!(Some(i), channel.recv());
            assert_eq!(None, channel.recv());
        });
    }

    /// Values that don't fit are thrown away; the ones that did fit come out in order.
    #[test]
    fn overflow() {
        let channel = Channel::new();
        (0..10).for_each(|i| channel.send(i));
        for expected in 0..5 {
            assert_eq!(Some(expected), channel.recv());
        }
        assert_eq!(None, channel.recv());
    }

    /// Values sent from another thread arrive complete and in order.
    #[test]
    fn multi_thread() {
        let channel = Arc::new(Channel::<usize>::new());

        let producer = {
            let channel = Arc::clone(&channel);
            thread::spawn(move || (0..4).for_each(|i| channel.send(i)))
        };

        // The channel can't be waited on, so poll it until everything arrived.
        let mut received = Vec::new();
        while received.len() < 4 {
            if let Some(value) = channel.recv() {
                received.push(value);
            }
        }

        assert_eq!(vec![0, 1, 2, 3], received);

        producer.join().unwrap();
    }
}
236 | |