1use crate::loom::sync::{Mutex, MutexGuard};
2use crate::runtime::signal::Handle as SignalHandle;
3use crate::signal::unix::{signal_with_handle, SignalKind};
4use crate::sync::watch;
5use std::io;
6use std::process::ExitStatus;
7
/// Abstraction over a process whose exit can be polled for.
pub(crate) trait Wait {
    /// Returns the identifier of this process (e.g. for diagnostics).
    fn id(&self) -> u32;

    /// Polls for the process's exit without blocking.
    ///
    /// Callers in this file treat `Ok(None)` as "not exited yet" and keep the
    /// process around; any other result means there is nothing left to wait for.
    fn try_wait(&mut self) -> io::Result<Option<ExitStatus>>;
}
15
16impl<T: Wait> Wait for &mut T {
17 fn id(&self) -> u32 {
18 (**self).id()
19 }
20
21 fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> {
22 (**self).try_wait()
23 }
24}
25
/// A sink for orphaned processes, which are queued up so that they can be
/// reaped at a later point.
pub(crate) trait OrphanQueue<T> {
    /// Hands ownership of `orphan` over to the queue.
    fn push_orphan(&self, orphan: T);
}
31
32impl<T, O: OrphanQueue<T>> OrphanQueue<T> for &O {
33 fn push_orphan(&self, orphan: T) {
34 (**self).push_orphan(orphan);
35 }
36}
37
38/// An implementation of `OrphanQueue`.
39#[derive(Debug)]
40pub(crate) struct OrphanQueueImpl<T> {
41 sigchild: Mutex<Option<watch::Receiver<()>>>,
42 queue: Mutex<Vec<T>>,
43}
44
45impl<T> OrphanQueueImpl<T> {
46 cfg_not_has_const_mutex_new! {
47 pub(crate) fn new() -> Self {
48 Self {
49 sigchild: Mutex::new(None),
50 queue: Mutex::new(Vec::new()),
51 }
52 }
53 }
54
55 cfg_has_const_mutex_new! {
56 pub(crate) const fn new() -> Self {
57 Self {
58 sigchild: Mutex::const_new(None),
59 queue: Mutex::const_new(Vec::new()),
60 }
61 }
62 }
63
64 #[cfg(test)]
65 fn len(&self) -> usize {
66 self.queue.lock().len()
67 }
68
69 pub(crate) fn push_orphan(&self, orphan: T)
70 where
71 T: Wait,
72 {
73 self.queue.lock().push(orphan);
74 }
75
76 /// Attempts to reap every process in the queue, ignoring any errors and
77 /// enqueueing any orphans which have not yet exited.
78 pub(crate) fn reap_orphans(&self, handle: &SignalHandle)
79 where
80 T: Wait,
81 {
82 // If someone else is holding the lock, they will be responsible for draining
83 // the queue as necessary, so we can safely bail if that happens
84 if let Some(mut sigchild_guard) = self.sigchild.try_lock() {
85 match &mut *sigchild_guard {
86 Some(sigchild) => {
87 if sigchild.try_has_changed().and_then(Result::ok).is_some() {
88 drain_orphan_queue(self.queue.lock());
89 }
90 }
91 None => {
92 let queue = self.queue.lock();
93
94 // Be lazy and only initialize the SIGCHLD listener if there
95 // are any orphaned processes in the queue.
96 if !queue.is_empty() {
97 // An errors shouldn't really happen here, but if it does it
98 // means that the signal driver isn't running, in
99 // which case there isn't anything we can
100 // register/initialize here, so we can try again later
101 if let Ok(sigchild) = signal_with_handle(SignalKind::child(), handle) {
102 *sigchild_guard = Some(sigchild);
103 drain_orphan_queue(queue);
104 }
105 }
106 }
107 }
108 }
109 }
110}
111
112fn drain_orphan_queue<T>(mut queue: MutexGuard<'_, Vec<T>>)
113where
114 T: Wait,
115{
116 for i in (0..queue.len()).rev() {
117 match queue[i].try_wait() {
118 Ok(None) => {}
119 Ok(Some(_)) | Err(_) => {
120 // The stdlib handles interruption errors (EINTR) when polling a child process.
121 // All other errors represent invalid inputs or pids that have already been
122 // reaped, so we can drop the orphan in case an error is raised.
123 queue.swap_remove(i);
124 }
125 }
126 }
127
128 drop(queue);
129}
130
/// Unit tests for the orphan queue, plus mocks shared with other test modules.
#[cfg(all(test, not(loom)))]
pub(crate) mod test {
    use super::*;
    use crate::runtime::io::Driver as IoDriver;
    use crate::runtime::signal::{Driver as SignalDriver, Handle as SignalHandle};
    use crate::sync::watch;
    use std::cell::{Cell, RefCell};
    use std::io;
    use std::os::unix::process::ExitStatusExt;
    use std::process::ExitStatus;
    use std::rc::Rc;

    /// An `OrphanQueue` which simply records every orphan pushed into it.
    pub(crate) struct MockQueue<W> {
        /// Every orphan pushed so far, in push order.
        pub(crate) all_enqueued: RefCell<Vec<W>>,
    }

    impl<W> MockQueue<W> {
        /// Creates a mock queue with nothing enqueued.
        pub(crate) fn new() -> Self {
            Self {
                all_enqueued: RefCell::new(Vec::new()),
            }
        }
    }

    impl<W> OrphanQueue<W> for MockQueue<W> {
        fn push_orphan(&self, orphan: W) {
            self.all_enqueued.borrow_mut().push(orphan);
        }
    }

    /// A `Wait` mock which reports "still running" until it has been polled
    /// `num_wait_until_status` times, and then yields its final result.
    struct MockWait {
        /// Number of `try_wait` calls observed; shared so tests can inspect it
        /// after the mock has been moved into a queue.
        total_waits: Rc<Cell<usize>>,
        /// How many polls return `Ok(None)` before the final result is produced.
        num_wait_until_status: usize,
        /// Whether the final result is an error rather than an exit status.
        return_err: bool,
    }

    impl MockWait {
        /// A mock which exits successfully on poll number `num_wait_until_status`
        /// (zero-based).
        fn new(num_wait_until_status: usize) -> Self {
            Self {
                total_waits: Rc::new(Cell::new(0)),
                num_wait_until_status,
                return_err: false,
            }
        }

        /// A mock whose very first poll fails with an error.
        fn with_err() -> Self {
            Self {
                total_waits: Rc::new(Cell::new(0)),
                num_wait_until_status: 0,
                return_err: true,
            }
        }
    }

    impl Wait for MockWait {
        fn id(&self) -> u32 {
            42
        }

        fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> {
            let waits = self.total_waits.get();

            let ret = if self.num_wait_until_status == waits {
                // `return_err` selects the error outcome; previously the two
                // branches were swapped, so `with_err()` never produced an error.
                if self.return_err {
                    Err(io::Error::new(io::ErrorKind::Other, "mock err"))
                } else {
                    Ok(Some(ExitStatus::from_raw(0)))
                }
            } else {
                Ok(None)
            };

            self.total_waits.set(waits + 1);
            ret
        }
    }

    /// Each drain polls every queued orphan once and removes the ones which
    /// exited or errored.
    #[test]
    fn drain_attempts_a_single_reap_of_all_queued_orphans() {
        let first_orphan = MockWait::new(0);
        let second_orphan = MockWait::new(1);
        let third_orphan = MockWait::new(2);
        let fourth_orphan = MockWait::with_err();

        let first_waits = first_orphan.total_waits.clone();
        let second_waits = second_orphan.total_waits.clone();
        let third_waits = third_orphan.total_waits.clone();
        let fourth_waits = fourth_orphan.total_waits.clone();

        let orphanage = OrphanQueueImpl::new();
        orphanage.push_orphan(first_orphan);
        orphanage.push_orphan(third_orphan);
        orphanage.push_orphan(second_orphan);
        orphanage.push_orphan(fourth_orphan);

        assert_eq!(orphanage.len(), 4);

        drain_orphan_queue(orphanage.queue.lock());
        assert_eq!(orphanage.len(), 2);
        assert_eq!(first_waits.get(), 1);
        assert_eq!(second_waits.get(), 1);
        assert_eq!(third_waits.get(), 1);
        assert_eq!(fourth_waits.get(), 1);

        drain_orphan_queue(orphanage.queue.lock());
        assert_eq!(orphanage.len(), 1);
        assert_eq!(first_waits.get(), 1);
        assert_eq!(second_waits.get(), 2);
        assert_eq!(third_waits.get(), 2);
        assert_eq!(fourth_waits.get(), 1);

        drain_orphan_queue(orphanage.queue.lock());
        assert_eq!(orphanage.len(), 0);
        assert_eq!(first_waits.get(), 1);
        assert_eq!(second_waits.get(), 2);
        assert_eq!(third_waits.get(), 3);
        assert_eq!(fourth_waits.get(), 1);

        // Safe to reap when empty
        drain_orphan_queue(orphanage.queue.lock());
    }

    /// With a listener installed, orphans are only polled after the listener
    /// has observed a new notification.
    #[test]
    fn no_reap_if_no_signal_received() {
        let (tx, rx) = watch::channel(());

        let handle = SignalHandle::default();

        let orphanage = OrphanQueueImpl::new();
        *orphanage.sigchild.lock() = Some(rx);

        let orphan = MockWait::new(2);
        let waits = orphan.total_waits.clone();
        orphanage.push_orphan(orphan);

        orphanage.reap_orphans(&handle);
        assert_eq!(waits.get(), 0);

        orphanage.reap_orphans(&handle);
        assert_eq!(waits.get(), 0);

        tx.send(()).unwrap();
        orphanage.reap_orphans(&handle);
        assert_eq!(waits.get(), 1);
    }

    /// If the `sigchild` lock is already held, `reap_orphans` bails out without
    /// polling anything.
    #[test]
    fn no_reap_if_signal_lock_held() {
        let handle = SignalHandle::default();

        let orphanage = OrphanQueueImpl::new();
        let signal_guard = orphanage.sigchild.lock();

        let orphan = MockWait::new(2);
        let waits = orphan.total_waits.clone();
        orphanage.push_orphan(orphan);

        orphanage.reap_orphans(&handle);
        assert_eq!(waits.get(), 0);

        drop(signal_guard);
    }

    /// The listener is registered lazily: only once the queue is non-empty.
    #[cfg_attr(miri, ignore)] // Miri does not support epoll.
    #[test]
    fn does_not_register_signal_if_queue_empty() {
        let (io_driver, io_handle) = IoDriver::new(1024).unwrap();
        let signal_driver = SignalDriver::new(io_driver, &io_handle).unwrap();
        let handle = signal_driver.handle();

        let orphanage = OrphanQueueImpl::new();
        assert!(orphanage.sigchild.lock().is_none()); // Sanity

        // No register when queue empty
        orphanage.reap_orphans(&handle);
        assert!(orphanage.sigchild.lock().is_none());

        let orphan = MockWait::new(2);
        let waits = orphan.total_waits.clone();
        orphanage.push_orphan(orphan);

        orphanage.reap_orphans(&handle);
        assert!(orphanage.sigchild.lock().is_some());
        assert_eq!(waits.get(), 1); // Eager reap when registering listener
    }

    /// When the listener cannot be registered, nothing is stored and nothing
    /// is polled.
    #[test]
    fn does_nothing_if_signal_could_not_be_registered() {
        let handle = SignalHandle::default();

        let orphanage = OrphanQueueImpl::new();
        assert!(orphanage.sigchild.lock().is_none());

        let orphan = MockWait::new(2);
        let waits = orphan.total_waits.clone();
        orphanage.push_orphan(orphan);

        // Signal handler has "gone away", nothing to register or reap
        orphanage.reap_orphans(&handle);
        assert!(orphanage.sigchild.lock().is_none());
        assert_eq!(waits.get(), 0);
    }
}
334