1//! A backend module for implementing the iterator like
2//! [`iterator`][crate::iterator] module and the asynchronous
3//! adapter crates.
4//!
5//! This module contains generic types which abstract over the concrete
6//! IO type for the self-pipe. The motivation for having this abstraction
7//! are the adapter crates for different asynchronous runtimes. The runtimes
8//! provide their own wrappers for [`std::os::unix::net::UnixStream`]
9//! which should be used as the internal self pipe. But large parts of the
10//! remaining functionality doesn't depend directly onto the IO type and can
11//! be reused.
12//!
13//! See also the [`SignalDelivery::with_pipe`] method for more information
14//! about requirements the IO types have to fulfill.
15//!
16//! As a regular user you shouldn't need to use the types in this module.
17//! Use the [`Signals`][crate::iterator::Signals] struct or one of the types
18//! contained in the adapter libraries instead.
19
20use std::borrow::{Borrow, BorrowMut};
21use std::fmt::{Debug, Formatter, Result as FmtResult};
22use std::io::Error;
23use std::mem::MaybeUninit;
24use std::os::unix::io::AsRawFd;
25use std::ptr;
26use std::sync::atomic::{AtomicBool, Ordering};
27use std::sync::{Arc, Mutex};
28
29use libc::{self, c_int};
30
31use super::exfiltrator::Exfiltrator;
32use crate::low_level::pipe::{self, WakeMethod};
33use crate::SigId;
34
35/// Maximal signal number we support.
36const MAX_SIGNUM: usize = 128;
37
38trait SelfPipeWrite: Debug + Send + Sync {
39 fn wake_readers(&self);
40}
41
42impl<W: AsRawFd + Debug + Send + Sync> SelfPipeWrite for W {
43 fn wake_readers(&self) {
44 pipe::wake(self.as_raw_fd(), method:WakeMethod::Send);
45 }
46}
47
48#[derive(Debug)]
49struct DeliveryState {
50 closed: AtomicBool,
51 registered_signal_ids: Mutex<Vec<Option<SigId>>>,
52}
53
54impl DeliveryState {
55 fn new() -> Self {
56 let ids: Vec> = (0..MAX_SIGNUM).map(|_| None).collect();
57 Self {
58 closed: AtomicBool::new(false),
59 registered_signal_ids: Mutex::new(ids),
60 }
61 }
62}
63
64impl Drop for DeliveryState {
65 fn drop(&mut self) {
66 let lock: MutexGuard<'_, Vec>> = self.registered_signal_ids.lock().unwrap();
67 for id: SigId in lock.iter().filter_map(|s: &Option| *s) {
68 crate::low_level::unregister(id);
69 }
70 }
71}
72
73struct PendingSignals<E: Exfiltrator> {
74 exfiltrator: E,
75 slots: [E::Storage; MAX_SIGNUM],
76}
77
78impl<E: Exfiltrator> PendingSignals<E> {
79 fn new(exfiltrator: E) -> Self {
80 // Unfortunately, Default is not implemented for long arrays :-(
81 //
82 // Note that if the default impl panics, the already existing instances are leaked.
83 let mut slots: MaybeUninit<[::Storage; 128]> = MaybeUninit::<[E::Storage; MAX_SIGNUM]>::uninit();
84 for i: usize in 0..MAX_SIGNUM {
85 unsafe {
86 let slot: *mut E::Storage = slots.as_mut_ptr() as *mut _;
87 let slot: *mut ::Storage = slot.add(count:i);
88 ptr::write(dst:slot, E::Storage::default());
89 }
90 }
91
92 Self {
93 exfiltrator,
94 slots: unsafe { slots.assume_init() },
95 }
96 }
97}
98
99/// An internal trait to hide adding new signals into a Handle behind a dynamic dispatch.
100trait AddSignal: Debug + Send + Sync {
101 fn add_signal(
102 self: Arc<Self>,
103 write: Arc<dyn SelfPipeWrite>,
104 signal: c_int,
105 ) -> Result<SigId, Error>;
106}
107
108// Implemented manually because 1.36.0 doesn't yet support Debug for [X; BIG_NUMBER].
109impl<E: Exfiltrator> Debug for PendingSignals<E> {
110 fn fmt(&self, fmt: &mut Formatter) -> FmtResult {
111 fmt&mut DebugStruct<'_, '_>.debug_struct("PendingSignals")
112 .field("exfiltrator", &self.exfiltrator)
113 // While the array does not, the slice does implement Debug
114 .field(name:"slots", &&self.slots[..])
115 .finish()
116 }
117}
118
119impl<E: Exfiltrator> AddSignal for PendingSignals<E> {
120 fn add_signal(
121 self: Arc<Self>,
122 write: Arc<dyn SelfPipeWrite>,
123 signal: c_int,
124 ) -> Result<SigId, Error> {
125 assert!(signal >= 0);
126 assert!(
127 (signal as usize) < MAX_SIGNUM,
128 "Signal number {} too large. If your OS really supports such signal, file a bug",
129 signal,
130 );
131 assert!(
132 self.exfiltrator.supports_signal(signal),
133 "Signal {} not supported by exfiltrator {:?}",
134 signal,
135 self.exfiltrator,
136 );
137 self.exfiltrator.init(&self.slots[signal as usize], signal);
138
139 let action = move |act: &_| {
140 let slot = &self.slots[signal as usize];
141 let ex = &self.exfiltrator;
142 ex.store(slot, signal, act);
143 write.wake_readers();
144 };
145 let id = unsafe { signal_hook_registry::register_sigaction(signal, action) }?;
146 Ok(id)
147 }
148}
149
150/// A struct to control an instance of an associated type
151/// (like for example [`Signals`][super::Signals]).
152///
153/// It allows to register more signal handlers and to shutdown the signal
154/// delivery. You can [`clone`][Handle::clone] this type which isn't a
155/// very expensive operation. The cloned instances can be shared between
156/// multiple threads.
157#[derive(Debug, Clone)]
158pub struct Handle {
159 pending: Arc<dyn AddSignal>,
160 write: Arc<dyn SelfPipeWrite>,
161 delivery_state: Arc<DeliveryState>,
162}
163
164impl Handle {
165 fn new<W>(write: W, pending: Arc<dyn AddSignal>) -> Self
166 where
167 W: 'static + SelfPipeWrite,
168 {
169 Self {
170 pending,
171 write: Arc::new(write),
172 delivery_state: Arc::new(DeliveryState::new()),
173 }
174 }
175
176 /// Registers another signal to the set watched by the associated instance.
177 ///
178 /// # Notes
179 ///
180 /// * This is safe to call concurrently from whatever thread.
181 /// * This is *not* safe to call from within a signal handler.
182 /// * If the signal number was already registered previously, this is a no-op.
183 /// * If this errors, the original set of signals is left intact.
184 ///
185 /// # Panics
186 ///
187 /// * If the given signal is [forbidden][crate::FORBIDDEN].
188 /// * If the signal number is negative or larger than internal limit. The limit should be
189 /// larger than any supported signal the OS supports.
190 /// * If the relevant [`Exfiltrator`] does not support this particular signal. The default
191 /// [`SignalOnly`] one supports all signals.
192 pub fn add_signal(&self, signal: c_int) -> Result<(), Error> {
193 let mut lock = self.delivery_state.registered_signal_ids.lock().unwrap();
194 // Already registered, ignoring
195 if lock[signal as usize].is_some() {
196 return Ok(());
197 }
198
199 let id = Arc::clone(&self.pending).add_signal(Arc::clone(&self.write), signal)?;
200
201 lock[signal as usize] = Some(id);
202
203 Ok(())
204 }
205
206 /// Closes the associated instance.
207 ///
208 /// This is meant to signalize termination of the signal delivery process.
209 /// After calling close:
210 ///
211 /// * [`is_closed`][Handle::is_closed] will return true.
212 /// * All currently blocking operations of associated instances
213 /// are interrupted and terminate.
214 /// * Any further operations will not block.
215 /// * Further signals may or may not be returned from the iterators. However, if any are
216 /// returned, these are real signals that happened.
217 ///
218 /// The goal is to be able to shut down any background thread that handles only the signals.
219 pub fn close(&self) {
220 self.delivery_state.closed.store(true, Ordering::SeqCst);
221 self.write.wake_readers();
222 }
223
224 /// Is it closed?
225 ///
226 /// See [`close`][Handle::close].
227 pub fn is_closed(&self) -> bool {
228 self.delivery_state.closed.load(Ordering::SeqCst)
229 }
230}
231
232/// A struct for delivering received signals to the main program flow.
233/// The self-pipe IO type is generic. See the
234/// [`with_pipe`][SignalDelivery::with_pipe] method for requirements
235/// for the IO type.
236#[derive(Debug)]
237pub struct SignalDelivery<R, E: Exfiltrator> {
238 read: R,
239 handle: Handle,
240 pending: Arc<PendingSignals<E>>,
241}
242
243impl<R, E: Exfiltrator> SignalDelivery<R, E>
244where
245 R: 'static + AsRawFd + Send + Sync,
246{
247 /// Creates the `SignalDelivery` structure.
248 ///
249 /// The read and write arguments must be the ends of a suitable pipe type. These are used
250 /// for communication between the signal handler and main program flow.
251 ///
252 /// Registers all the signals listed. The same restrictions (panics, errors) apply as with
253 /// [`add_signal`][Handle::add_signal].
254 ///
255 /// # Requirements for the pipe type
256 ///
257 /// * Must support [`send`](https://man7.org/linux/man-pages/man2/send.2.html) for
258 /// asynchronously writing bytes to the write end
259 /// * Must support [`recv`](https://man7.org/linux/man-pages/man2/recv.2.html) for
260 /// reading bytes from the read end
261 ///
262 /// So UnixStream is a good choice for this.
263 pub fn with_pipe<I, S, W>(read: R, write: W, exfiltrator: E, signals: I) -> Result<Self, Error>
264 where
265 I: IntoIterator<Item = S>,
266 S: Borrow<c_int>,
267 W: 'static + AsRawFd + Debug + Send + Sync,
268 {
269 let pending = Arc::new(PendingSignals::new(exfiltrator));
270 let pending_add_signal = Arc::clone(&pending);
271 let handle = Handle::new(write, pending_add_signal);
272 let me = Self {
273 read,
274 handle,
275 pending,
276 };
277 for sig in signals {
278 me.handle.add_signal(*sig.borrow())?;
279 }
280 Ok(me)
281 }
282
283 /// Get a reference to the read end of the self pipe
284 ///
285 /// You may use this method to register the underlying file descriptor
286 /// with an eventing system (e. g. epoll) to get notified if there are
287 /// bytes in the pipe. If the event system reports the file descriptor
288 /// ready for reading you can then call [`pending`][SignalDelivery::pending]
289 /// to get the arrived signals.
290 pub fn get_read(&self) -> &R {
291 &self.read
292 }
293
294 /// Get a mutable reference to the read end of the self pipe
295 ///
296 /// See the [`get_read`][SignalDelivery::get_read] method for some additional
297 /// information.
298 pub fn get_read_mut(&mut self) -> &mut R {
299 &mut self.read
300 }
301
302 /// Drains all data from the internal self-pipe. This method will never block.
303 fn flush(&mut self) {
304 const SIZE: usize = 1024;
305 let mut buff = [0u8; SIZE];
306
307 unsafe {
308 // Draining the data in the self pipe. We ignore all errors on purpose. This
309 // should not be something like closed file descriptor. It could EAGAIN, but
310 // that's OK in case we say MSG_DONTWAIT. If it's EINTR, then it's OK too,
311 // it'll only create a spurious wakeup.
312 #[cfg(target_os = "aix")]
313 let nowait_flag = libc::MSG_NONBLOCK;
314 #[cfg(not(target_os = "aix"))]
315 let nowait_flag = libc::MSG_DONTWAIT;
316 while libc::recv(
317 self.read.as_raw_fd(),
318 buff.as_mut_ptr() as *mut libc::c_void,
319 SIZE,
320 nowait_flag,
321 ) > 0
322 {}
323 }
324 }
325
326 /// Returns an iterator of already received signals.
327 ///
328 /// This returns an iterator over all the signal numbers of the signals received since last
329 /// time they were read (out of the set registered by this `SignalDelivery` instance). Note
330 /// that they are returned in arbitrary order and a signal number is returned only once even
331 /// if it was received multiple times.
332 ///
333 /// This method returns immediately (does not block) and may produce an empty iterator if
334 /// there are no signals ready.
335 pub fn pending(&mut self) -> Pending<E> {
336 self.flush();
337 Pending::new(Arc::clone(&self.pending))
338 }
339
340 /// Checks the reading end of the self pipe for available signals.
341 ///
342 /// If there are no signals available or this instance was already closed it returns
343 /// [`Option::None`]. If there are some signals it returns a [`Pending`]
344 /// instance wrapped inside a [`Option::Some`]. However, due to implementation details,
345 /// this still can produce an empty iterator.
346 ///
347 /// This method doesn't check the reading end by itself but uses the passed in callback.
348 /// This method blocks if and only if the callback blocks trying to read some bytes.
349 pub fn poll_pending<F>(&mut self, has_signals: &mut F) -> Result<Option<Pending<E>>, Error>
350 where
351 F: FnMut(&mut R) -> Result<bool, Error>,
352 {
353 if self.handle.is_closed() {
354 return Ok(None);
355 }
356
357 match has_signals(self.get_read_mut()) {
358 Ok(false) => Ok(None),
359 Ok(true) => Ok(Some(self.pending())),
360 Err(err) => Err(err),
361 }
362 }
363
364 /// Get a [`Handle`] for this `SignalDelivery` instance.
365 ///
366 /// This can be used to add further signals or close the whole
367 /// signal delivery mechanism.
368 pub fn handle(&self) -> Handle {
369 self.handle.clone()
370 }
371}
372
373/// The iterator of one batch of signals.
374///
375/// This is returned by the [`pending`][SignalDelivery::pending] method.
376#[derive(Debug)]
377pub struct Pending<E: Exfiltrator> {
378 pending: Arc<PendingSignals<E>>,
379 position: usize,
380}
381
382impl<E: Exfiltrator> Pending<E> {
383 fn new(pending: Arc<PendingSignals<E>>) -> Self {
384 Self {
385 pending,
386 position: 0,
387 }
388 }
389}
390
391impl<E: Exfiltrator> Iterator for Pending<E> {
392 type Item = E::Output;
393
394 fn next(&mut self) -> Option<E::Output> {
395 while self.position < self.pending.slots.len() {
396 let sig: usize = self.position;
397 let slot: &::Storage = &self.pending.slots[sig];
398 let result: Option<::Output> = self.pending.exfiltrator.load(slot, signal:sig as c_int);
399 if result.is_some() {
400 return result;
401 } else {
402 self.position += 1;
403 }
404 }
405
406 None
407 }
408}
409
410/// Possible results of the [`poll_signal`][SignalIterator::poll_signal] function.
411pub enum PollResult<O> {
412 /// A signal arrived
413 Signal(O),
414 /// There are no signals yet but there may arrive some in the future
415 Pending,
416 /// The iterator was closed. There won't be any signals reported from now on.
417 Closed,
418 /// An error happened during polling for arrived signals.
419 Err(Error),
420}
421
422/// An infinite iterator of received signals.
423pub struct SignalIterator<SD, E: Exfiltrator> {
424 signals: SD,
425 iter: Pending<E>,
426}
427
428impl<SD, E: Exfiltrator> SignalIterator<SD, E> {
429 /// Create a new infinite iterator for signals registered with the passed
430 /// in [`SignalDelivery`] instance.
431 pub fn new<R>(mut signals: SD) -> Self
432 where
433 SD: BorrowMut<SignalDelivery<R, E>>,
434 R: 'static + AsRawFd + Send + Sync,
435 {
436 let iter = signals.borrow_mut().pending();
437 Self { signals, iter }
438 }
439
440 /// Return a signal if there is one or tell the caller that there is none at the moment.
441 ///
442 /// You have to pass in a callback which checks the underlying reading end of the pipe if
443 /// there may be any pending signals. This callback may or may not block. If the callback
444 /// returns [`true`] this method will try to fetch the next signal and return it as a
445 /// [`PollResult::Signal`]. If the callback returns [`false`] the method will return
446 /// [`PollResult::Pending`] and assume it will be called again at a later point in time.
447 /// The callback may be called any number of times by this function.
448 ///
449 /// If the iterator was closed by the [`close`][Handle::close] method of the associated
450 /// [`Handle`] this method will return [`PollResult::Closed`].
451 pub fn poll_signal<R, F>(&mut self, has_signals: &mut F) -> PollResult<E::Output>
452 where
453 SD: BorrowMut<SignalDelivery<R, E>>,
454 R: 'static + AsRawFd + Send + Sync,
455 F: FnMut(&mut R) -> Result<bool, Error>,
456 {
457 // The loop is necessary because it is possible that a signal was already consumed
458 // by a previous pending iterator due to the asynchronous nature of signals and
459 // always moving to the end of the iterator before calling has_more.
460 while !self.signals.borrow_mut().handle.is_closed() {
461 if let Some(result) = self.iter.next() {
462 return PollResult::Signal(result);
463 }
464
465 match self.signals.borrow_mut().poll_pending(has_signals) {
466 Ok(Some(pending)) => self.iter = pending,
467 Ok(None) => return PollResult::Pending,
468 Err(err) => return PollResult::Err(err),
469 }
470 }
471
472 PollResult::Closed
473 }
474
475 /// Get a shareable [`Handle`] for this instance.
476 ///
477 /// This can be used to add further signals or terminate the whole
478 /// signal iteration using the [`close`][Handle::close] method.
479 pub fn handle<R>(&self) -> Handle
480 where
481 SD: Borrow<SignalDelivery<R, E>>,
482 R: 'static + AsRawFd + Send + Sync,
483 {
484 self.signals.borrow().handle()
485 }
486}
487
488/// A signal iterator which consumes a [`SignalDelivery`] instance and takes
489/// ownership of it.
490pub type OwningSignalIterator<R, E> = SignalIterator<SignalDelivery<R, E>, E>;
491
492/// A signal iterator which takes a mutable reference to a [`SignalDelivery`]
493/// instance.
494pub type RefSignalIterator<'a, R, E> = SignalIterator<&'a mut SignalDelivery<R, E>, E>;
495