1//! A restricted channel to pass data from signal handler.
2//!
3//! When trying to communicate data from signal handler to the outside world, one can use an atomic
4//! variable (as it doesn't lock, so it can be made async-signal-safe). But this won't work for
5//! larger data.
6//!
7//! This module provides a channel that can be used for that purpose. It is used by certain
8//! [exfiltrators][crate::iterator::exfiltrator], but can be used as building block for custom
9//! actions. In general, this is not a ready-made end-user API.
10//!
11//! # How does it work
12//!
//! Each channel has a fixed number of slots and two queues (one for empty slots, one for full
//! slots). A signal handler takes a slot out of the empty queue, fills it and passes it into the
//! full queue. Outside of the signal handler, the application takes the value out of a slot from
//! the full queue and returns the slot to the empty queue.
17//!
18//! The queues are implemented as bit-encoded indexes of the slots in the storage. The bits are
19//! stored in an atomic variable.
20//!
21//! Note that the algorithm allows for a slot to be in neither queue (when it is being emptied or
22//! filled).
23//!
24//! # Fallible allocation of a slot
25//!
//! It is apparent that allocation of a new slot can fail (there's nothing in the empty queue). In
//! such case, there's no way to send the new value out of the handler (there's no way to safely
//! wait for a slot to appear, because the handler can be blocking the thread that is responsible
//! for emptying them). But that's considered acceptable ‒ even the kernel collates the same kinds
//! of signals together if they are not consumed by the application fast enough. Besides, there
//! are no free slots exactly because some are being filled, emptied or are full ‒ so, taken as a
//! whole, the system will still yield a signal.
33//!
34//! This assumes that separate signals don't share the same buffer and that there's only one reader
35//! (using multiple readers is still safe, but it is possible that all slots would be inside the
36//! readers, but already empty, so the above argument would not hold).
37
38// TODO: Other sizes? Does anyone need more than 5 slots?
39
40use std::cell::UnsafeCell;
41use std::sync::atomic::{AtomicU16, Ordering};
42
/// Number of value slots in every channel (the length of the storage array).
const SLOTS: usize = 5;
/// Width, in bits, of one queue position inside a bit-packed `u16` queue.
///
/// `SLOTS` positions of `BITS` bits each take 15 bits, which fits into a `u16`.
const BITS: u16 = 3;
/// Mask selecting a single queue position, i.e. the lowest `BITS` bits.
const MASK: u16 = (1 << BITS) - 1;
46
47fn get(n: u16, idx: u16) -> u16 {
48 (n >> (BITS * idx)) & MASK
49}
50
51fn set(n: u16, idx: u16, v: u16) -> u16 {
52 let v: u16 = v << (BITS * idx);
53 let mask: u16 = MASK << (BITS * idx);
54 (n & !mask) | v
55}
56
57fn enqueue(q: &AtomicU16, val: u16) {
58 let mut current: u16 = q.load(order:Ordering::Relaxed);
59 loop {
60 let empty: u16 = (0..SLOTS as u16)
61 .find(|i| get(current, *i) == 0)
62 .expect(msg:"No empty slot available");
63 let modified: u16 = set(n:current, idx:empty, v:val);
64 match q.compare_exchange_weak(current, new:modified, success:Ordering::Release, failure:Ordering::Relaxed) {
65 Ok(_) => break,
66 Err(changed: u16) => current = changed, // And retry with the changed value
67 }
68 }
69}
70
71fn dequeue(q: &AtomicU16) -> Option<u16> {
72 let mut current: u16 = q.load(order:Ordering::Relaxed);
73 loop {
74 let val: u16 = current & MASK;
75 // It's completely empty
76 if val == 0 {
77 break None;
78 }
79 let modified: u16 = current >> BITS;
80 match q.compare_exchange_weak(current, new:modified, success:Ordering::Acquire, failure:Ordering::Relaxed) {
81 Ok(_) => break Some(val),
82 Err(changed: u16) => current = changed,
83 }
84 }
85}
86
87/// A restricted async-signal-safe channel
88///
89/// This is a bit like the usual channel used for inter-thread communication, but with several
90/// restrictions:
91///
92/// * There's a limited number of slots (currently 5).
93/// * There's no way to wait for a place in it or for a value. If value is not available, `None` is
94/// returned. If there's no space for a value, the value is silently dropped.
95///
96/// In exchange for that, all the operations on that channel are async-signal-safe. That means it
97/// is possible to use it to communicate between a signal handler and the rest of the world with it
98/// (specifically, it's designed to send information from the handler to the rest of the
99/// application). The throwing out of values when full is in line with collating of the same type
100/// in kernel (you should not use the same channel for multiple different signals).
101///
102/// Technically, this is a MPMC queue which preserves order, but it is expected to be used in MPSC
103/// mode mostly (in theory, multiple threads can be executing a signal handler for the same signal
104/// at the same time). The channel is not responsible for wakeups.
105///
106/// While the channel is async-signal-safe, you still need to make sure *creating* of the values is
107/// too (it should not contain anything that allocates, for example ‒ so no `String`s inside, etc).
108///
109/// The code was *not* tuned for performance (signals are not expected to happen often).
110pub struct Channel<T> {
111 storage: [UnsafeCell<Option<T>>; SLOTS],
112 empty: AtomicU16,
113 full: AtomicU16,
114}
115
116impl<T> Channel<T> {
117 /// Creates a new channel with nothing in it.
118 pub fn new() -> Self {
119 let storage = Default::default();
120 let me = Self {
121 storage,
122 empty: AtomicU16::new(0),
123 full: AtomicU16::new(0),
124 };
125
126 for i in 1..SLOTS + 1 {
127 enqueue(&me.empty, i as u16);
128 }
129
130 me
131 }
132
133 /// Inserts a value into the channel.
134 ///
135 /// If the value doesn't fit, it is silently dropped. Never blocks.
136 pub fn send(&self, val: T) {
137 if let Some(empty_idx) = dequeue(&self.empty) {
138 unsafe { *self.storage[empty_idx as usize - 1].get() = Some(val) };
139 enqueue(&self.full, empty_idx);
140 }
141 }
142
143 /// Takes a value from the channel.
144 ///
145 /// Or returns `None` if the channel is empty. Never blocks.
146 pub fn recv(&self) -> Option<T> {
147 dequeue(&self.full).map(|idx| {
148 let result = unsafe { &mut *self.storage[idx as usize - 1].get() }
149 .take()
150 .expect("Full slot with nothing in it");
151 enqueue(&self.empty, idx);
152 result
153 })
154 }
155}
156
157impl<T> Default for Channel<T> {
158 fn default() -> Self {
159 Self::new()
160 }
161}
162
163unsafe impl<T: Send> Send for Channel<T> {}
164
165// Yes, really Send -> Sync. Having a reference to Channel allows Sending Ts, but not having refs
166// on them.
167unsafe impl<T: Send> Sync for Channel<T> {}
168
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::thread;

    use super::*;

    /// A freshly created channel yields nothing, no matter how many times it is asked.
    #[test]
    fn new_empty() {
        let chan: Channel<usize> = Channel::new();
        for _ in 0..2 {
            assert!(chan.recv().is_none());
        }
    }

    /// A single value goes through and leaves the channel empty again.
    #[test]
    fn pass_value() {
        let chan = Channel::new();
        chan.send(42);
        assert_eq!(Some(42), chan.recv());
        assert!(chan.recv().is_none());
    }

    /// Slots are recycled, so far more values than there are slots can pass one by one.
    #[test]
    fn multiple() {
        let chan = Channel::new();
        for value in 0..1000 {
            chan.send(value);
            assert_eq!(Some(value), chan.recv());
            assert!(chan.recv().is_none());
        }
    }

    /// Values that don't fit are thrown away; the ones that did fit arrive in order.
    #[test]
    fn overflow() {
        let chan = Channel::new();
        (0..10).for_each(|value| chan.send(value));

        let received: Vec<_> = (0..5).map(|_| chan.recv().unwrap()).collect();
        assert_eq!(vec![0, 1, 2, 3, 4], received);
        assert!(chan.recv().is_none());
    }

    /// Values sent from another thread arrive complete and in order.
    #[test]
    fn multi_thread() {
        let chan = Arc::new(Channel::<usize>::new());

        let producer = {
            let chan = Arc::clone(&chan);
            thread::spawn(move || (0..4).for_each(|value| chan.send(value)))
        };

        // The channel offers no way to wait, so poll until everything has arrived.
        let mut received = Vec::new();
        while received.len() < 4 {
            if let Some(value) = chan.recv() {
                received.push(value);
            }
        }

        assert_eq!(vec![0, 1, 2, 3], received);

        producer.join().unwrap();
    }
}
236