1//! A version of the reaper that waits for a signal to check for process progress.
2
3use async_lock::{Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard};
4use async_signal::{Signal, Signals};
5use event_listener::Event;
6use futures_lite::{future, prelude::*};
7
8use std::io;
9use std::mem;
10use std::sync::Mutex;
11
12pub(crate) type Lock = AsyncMutexGuard<'static, ()>;
13
14/// The zombie process reaper.
15pub(crate) struct Reaper {
16 /// An event delivered every time the SIGCHLD signal occurs.
17 sigchld: Event,
18
19 /// The list of zombie processes.
20 zombies: Mutex<Vec<std::process::Child>>,
21
22 /// The pipe that delivers signal notifications.
23 pipe: Pipe,
24
25 /// Locking this mutex indicates that we are polling the SIGCHLD event.
26 driver_guard: AsyncMutex<()>,
27}
28
29impl Reaper {
30 /// Create a new reaper.
31 pub(crate) fn new() -> Self {
32 Reaper {
33 sigchld: Event::new(),
34 zombies: Mutex::new(Vec::new()),
35 pipe: Pipe::new().expect("cannot create SIGCHLD pipe"),
36 driver_guard: AsyncMutex::new(()),
37 }
38 }
39
40 /// Lock the driver thread.
41 pub(crate) async fn lock(&self) -> AsyncMutexGuard<'_, ()> {
42 self.driver_guard.lock().await
43 }
44
45 /// Reap zombie processes forever.
46 pub(crate) async fn reap(&'static self, _driver_guard: async_lock::MutexGuard<'_, ()>) -> ! {
47 loop {
48 // Wait for the next SIGCHLD signal.
49 self.pipe.wait().await;
50
51 // Notify all listeners waiting on the SIGCHLD event.
52 self.sigchld.notify(usize::MAX);
53
54 // Reap zombie processes, but make sure we don't hold onto the lock for too long!
55 let mut zombies = mem::take(&mut *self.zombies.lock().unwrap());
56 let mut i = 0;
57 'reap_zombies: loop {
58 for _ in 0..50 {
59 if i >= zombies.len() {
60 break 'reap_zombies;
61 }
62
63 if let Ok(None) = zombies[i].try_wait() {
64 i += 1;
65 } else {
66 zombies.swap_remove(i);
67 }
68 }
69
70 // Be a good citizen; yield if there are a lot of processes.
71 //
72 // After we yield, check if there are more zombie processes.
73 future::yield_now().await;
74 zombies.append(&mut self.zombies.lock().unwrap());
75 }
76
77 // Put zombie processes back.
78 self.zombies.lock().unwrap().append(&mut zombies);
79 }
80 }
81
82 /// Register a process with this reaper.
83 pub(crate) fn register(&'static self, child: std::process::Child) -> io::Result<ChildGuard> {
84 self.pipe.register(&child)?;
85 Ok(ChildGuard { inner: Some(child) })
86 }
87
88 /// Wait for an event to occur for a child process.
89 pub(crate) async fn status(
90 &'static self,
91 child: &Mutex<crate::ChildGuard>,
92 ) -> io::Result<std::process::ExitStatus> {
93 loop {
94 // Wait on the child process.
95 if let Some(status) = child.lock().unwrap().get_mut().try_wait()? {
96 return Ok(status);
97 }
98
99 // Start listening.
100 event_listener::listener!(self.sigchld => listener);
101
102 // Try again.
103 if let Some(status) = child.lock().unwrap().get_mut().try_wait()? {
104 return Ok(status);
105 }
106
107 // Wait on the listener.
108 listener.await;
109 }
110 }
111
112 /// Do we have any registered zombie processes?
113 pub(crate) fn has_zombies(&'static self) -> bool {
114 !self
115 .zombies
116 .lock()
117 .unwrap_or_else(|x| x.into_inner())
118 .is_empty()
119 }
120}
121
122/// The wrapper around the child.
123pub(crate) struct ChildGuard {
124 inner: Option<std::process::Child>,
125}
126
127impl ChildGuard {
128 /// Get a mutable reference to the inner child.
129 pub(crate) fn get_mut(&mut self) -> &mut std::process::Child {
130 self.inner.as_mut().unwrap()
131 }
132
133 /// Begin the reaping process for this child.
134 pub(crate) fn reap(&mut self, reaper: &'static Reaper) {
135 if let Ok(None) = self.get_mut().try_wait() {
136 reaperMutexGuard<'_, Vec>
137 .zombies
138 .lock()
139 .unwrap()
140 .push(self.inner.take().unwrap());
141 }
142 }
143}
144
145/// Waits for the next SIGCHLD signal.
146struct Pipe {
147 /// The iterator over SIGCHLD signals.
148 signals: Signals,
149}
150
151impl Pipe {
152 /// Creates a new pipe.
153 fn new() -> io::Result<Pipe> {
154 Ok(Pipe {
155 signals: Signals::new(signals:Some(Signal::Child))?,
156 })
157 }
158
159 /// Waits for the next SIGCHLD signal.
160 async fn wait(&self) {
161 (&self.signals).next().await;
162 }
163
164 /// Register a process object into this pipe.
165 fn register(&self, _child: &std::process::Child) -> io::Result<()> {
166 Ok(())
167 }
168}
169