1 | use crate::loom::sync::{Mutex, MutexGuard}; |
2 | use crate::runtime::signal::Handle as SignalHandle; |
3 | use crate::signal::unix::{signal_with_handle, SignalKind}; |
4 | use crate::sync::watch; |
5 | use std::io; |
6 | use std::process::ExitStatus; |
7 | |
8 | /// An interface for waiting on a process to exit. |
9 | pub(crate) trait Wait { |
10 | /// Get the identifier for this process or diagnostics. |
11 | fn id(&self) -> u32; |
12 | /// Try waiting for a process to exit in a non-blocking manner. |
13 | fn try_wait(&mut self) -> io::Result<Option<ExitStatus>>; |
14 | } |
15 | |
16 | impl<T: Wait> Wait for &mut T { |
17 | fn id(&self) -> u32 { |
18 | (**self).id() |
19 | } |
20 | |
21 | fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> { |
22 | (**self).try_wait() |
23 | } |
24 | } |
25 | |
26 | /// An interface for queueing up an orphaned process so that it can be reaped. |
27 | pub(crate) trait OrphanQueue<T> { |
28 | /// Adds an orphan to the queue. |
29 | fn push_orphan(&self, orphan: T); |
30 | } |
31 | |
32 | impl<T, O: OrphanQueue<T>> OrphanQueue<T> for &O { |
33 | fn push_orphan(&self, orphan: T) { |
34 | (**self).push_orphan(orphan); |
35 | } |
36 | } |
37 | |
38 | /// An implementation of `OrphanQueue`. |
39 | #[derive (Debug)] |
40 | pub(crate) struct OrphanQueueImpl<T> { |
41 | sigchild: Mutex<Option<watch::Receiver<()>>>, |
42 | queue: Mutex<Vec<T>>, |
43 | } |
44 | |
45 | impl<T> OrphanQueueImpl<T> { |
46 | cfg_not_has_const_mutex_new! { |
47 | pub(crate) fn new() -> Self { |
48 | Self { |
49 | sigchild: Mutex::new(None), |
50 | queue: Mutex::new(Vec::new()), |
51 | } |
52 | } |
53 | } |
54 | |
55 | cfg_has_const_mutex_new! { |
56 | pub(crate) const fn new() -> Self { |
57 | Self { |
58 | sigchild: Mutex::const_new(None), |
59 | queue: Mutex::const_new(Vec::new()), |
60 | } |
61 | } |
62 | } |
63 | |
64 | #[cfg (test)] |
65 | fn len(&self) -> usize { |
66 | self.queue.lock().len() |
67 | } |
68 | |
69 | pub(crate) fn push_orphan(&self, orphan: T) |
70 | where |
71 | T: Wait, |
72 | { |
73 | self.queue.lock().push(orphan); |
74 | } |
75 | |
76 | /// Attempts to reap every process in the queue, ignoring any errors and |
77 | /// enqueueing any orphans which have not yet exited. |
78 | pub(crate) fn reap_orphans(&self, handle: &SignalHandle) |
79 | where |
80 | T: Wait, |
81 | { |
82 | // If someone else is holding the lock, they will be responsible for draining |
83 | // the queue as necessary, so we can safely bail if that happens |
84 | if let Some(mut sigchild_guard) = self.sigchild.try_lock() { |
85 | match &mut *sigchild_guard { |
86 | Some(sigchild) => { |
87 | if sigchild.try_has_changed().and_then(Result::ok).is_some() { |
88 | drain_orphan_queue(self.queue.lock()); |
89 | } |
90 | } |
91 | None => { |
92 | let queue = self.queue.lock(); |
93 | |
94 | // Be lazy and only initialize the SIGCHLD listener if there |
95 | // are any orphaned processes in the queue. |
96 | if !queue.is_empty() { |
97 | // An errors shouldn't really happen here, but if it does it |
98 | // means that the signal driver isn't running, in |
99 | // which case there isn't anything we can |
100 | // register/initialize here, so we can try again later |
101 | if let Ok(sigchild) = signal_with_handle(SignalKind::child(), handle) { |
102 | *sigchild_guard = Some(sigchild); |
103 | drain_orphan_queue(queue); |
104 | } |
105 | } |
106 | } |
107 | } |
108 | } |
109 | } |
110 | } |
111 | |
112 | fn drain_orphan_queue<T>(mut queue: MutexGuard<'_, Vec<T>>) |
113 | where |
114 | T: Wait, |
115 | { |
116 | for i: usize in (0..queue.len()).rev() { |
117 | match queue[i].try_wait() { |
118 | Ok(None) => {} |
119 | Ok(Some(_)) | Err(_) => { |
120 | // The stdlib handles interruption errors (EINTR) when polling a child process. |
121 | // All other errors represent invalid inputs or pids that have already been |
122 | // reaped, so we can drop the orphan in case an error is raised. |
123 | queue.swap_remove(index:i); |
124 | } |
125 | } |
126 | } |
127 | |
128 | drop(queue); |
129 | } |
130 | |
131 | #[cfg (all(test, not(loom)))] |
132 | pub(crate) mod test { |
133 | use super::*; |
134 | use crate::runtime::io::Driver as IoDriver; |
135 | use crate::runtime::signal::{Driver as SignalDriver, Handle as SignalHandle}; |
136 | use crate::sync::watch; |
137 | use std::cell::{Cell, RefCell}; |
138 | use std::io; |
139 | use std::os::unix::process::ExitStatusExt; |
140 | use std::process::ExitStatus; |
141 | use std::rc::Rc; |
142 | |
143 | pub(crate) struct MockQueue<W> { |
144 | pub(crate) all_enqueued: RefCell<Vec<W>>, |
145 | } |
146 | |
147 | impl<W> MockQueue<W> { |
148 | pub(crate) fn new() -> Self { |
149 | Self { |
150 | all_enqueued: RefCell::new(Vec::new()), |
151 | } |
152 | } |
153 | } |
154 | |
155 | impl<W> OrphanQueue<W> for MockQueue<W> { |
156 | fn push_orphan(&self, orphan: W) { |
157 | self.all_enqueued.borrow_mut().push(orphan); |
158 | } |
159 | } |
160 | |
161 | struct MockWait { |
162 | total_waits: Rc<Cell<usize>>, |
163 | num_wait_until_status: usize, |
164 | return_err: bool, |
165 | } |
166 | |
167 | impl MockWait { |
168 | fn new(num_wait_until_status: usize) -> Self { |
169 | Self { |
170 | total_waits: Rc::new(Cell::new(0)), |
171 | num_wait_until_status, |
172 | return_err: false, |
173 | } |
174 | } |
175 | |
176 | fn with_err() -> Self { |
177 | Self { |
178 | total_waits: Rc::new(Cell::new(0)), |
179 | num_wait_until_status: 0, |
180 | return_err: true, |
181 | } |
182 | } |
183 | } |
184 | |
185 | impl Wait for MockWait { |
186 | fn id(&self) -> u32 { |
187 | 42 |
188 | } |
189 | |
190 | fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> { |
191 | let waits = self.total_waits.get(); |
192 | |
193 | let ret = if self.num_wait_until_status == waits { |
194 | if self.return_err { |
195 | Ok(Some(ExitStatus::from_raw(0))) |
196 | } else { |
197 | Err(io::Error::new(io::ErrorKind::Other, "mock err" )) |
198 | } |
199 | } else { |
200 | Ok(None) |
201 | }; |
202 | |
203 | self.total_waits.set(waits + 1); |
204 | ret |
205 | } |
206 | } |
207 | |
208 | #[test ] |
209 | fn drain_attempts_a_single_reap_of_all_queued_orphans() { |
210 | let first_orphan = MockWait::new(0); |
211 | let second_orphan = MockWait::new(1); |
212 | let third_orphan = MockWait::new(2); |
213 | let fourth_orphan = MockWait::with_err(); |
214 | |
215 | let first_waits = first_orphan.total_waits.clone(); |
216 | let second_waits = second_orphan.total_waits.clone(); |
217 | let third_waits = third_orphan.total_waits.clone(); |
218 | let fourth_waits = fourth_orphan.total_waits.clone(); |
219 | |
220 | let orphanage = OrphanQueueImpl::new(); |
221 | orphanage.push_orphan(first_orphan); |
222 | orphanage.push_orphan(third_orphan); |
223 | orphanage.push_orphan(second_orphan); |
224 | orphanage.push_orphan(fourth_orphan); |
225 | |
226 | assert_eq!(orphanage.len(), 4); |
227 | |
228 | drain_orphan_queue(orphanage.queue.lock()); |
229 | assert_eq!(orphanage.len(), 2); |
230 | assert_eq!(first_waits.get(), 1); |
231 | assert_eq!(second_waits.get(), 1); |
232 | assert_eq!(third_waits.get(), 1); |
233 | assert_eq!(fourth_waits.get(), 1); |
234 | |
235 | drain_orphan_queue(orphanage.queue.lock()); |
236 | assert_eq!(orphanage.len(), 1); |
237 | assert_eq!(first_waits.get(), 1); |
238 | assert_eq!(second_waits.get(), 2); |
239 | assert_eq!(third_waits.get(), 2); |
240 | assert_eq!(fourth_waits.get(), 1); |
241 | |
242 | drain_orphan_queue(orphanage.queue.lock()); |
243 | assert_eq!(orphanage.len(), 0); |
244 | assert_eq!(first_waits.get(), 1); |
245 | assert_eq!(second_waits.get(), 2); |
246 | assert_eq!(third_waits.get(), 3); |
247 | assert_eq!(fourth_waits.get(), 1); |
248 | |
249 | // Safe to reap when empty |
250 | drain_orphan_queue(orphanage.queue.lock()); |
251 | } |
252 | |
253 | #[test ] |
254 | fn no_reap_if_no_signal_received() { |
255 | let (tx, rx) = watch::channel(()); |
256 | |
257 | let handle = SignalHandle::default(); |
258 | |
259 | let orphanage = OrphanQueueImpl::new(); |
260 | *orphanage.sigchild.lock() = Some(rx); |
261 | |
262 | let orphan = MockWait::new(2); |
263 | let waits = orphan.total_waits.clone(); |
264 | orphanage.push_orphan(orphan); |
265 | |
266 | orphanage.reap_orphans(&handle); |
267 | assert_eq!(waits.get(), 0); |
268 | |
269 | orphanage.reap_orphans(&handle); |
270 | assert_eq!(waits.get(), 0); |
271 | |
272 | tx.send(()).unwrap(); |
273 | orphanage.reap_orphans(&handle); |
274 | assert_eq!(waits.get(), 1); |
275 | } |
276 | |
277 | #[test ] |
278 | fn no_reap_if_signal_lock_held() { |
279 | let handle = SignalHandle::default(); |
280 | |
281 | let orphanage = OrphanQueueImpl::new(); |
282 | let signal_guard = orphanage.sigchild.lock(); |
283 | |
284 | let orphan = MockWait::new(2); |
285 | let waits = orphan.total_waits.clone(); |
286 | orphanage.push_orphan(orphan); |
287 | |
288 | orphanage.reap_orphans(&handle); |
289 | assert_eq!(waits.get(), 0); |
290 | |
291 | drop(signal_guard); |
292 | } |
293 | |
294 | #[cfg_attr (miri, ignore)] // Miri does not support epoll. |
295 | #[test ] |
296 | fn does_not_register_signal_if_queue_empty() { |
297 | let (io_driver, io_handle) = IoDriver::new(1024).unwrap(); |
298 | let signal_driver = SignalDriver::new(io_driver, &io_handle).unwrap(); |
299 | let handle = signal_driver.handle(); |
300 | |
301 | let orphanage = OrphanQueueImpl::new(); |
302 | assert!(orphanage.sigchild.lock().is_none()); // Sanity |
303 | |
304 | // No register when queue empty |
305 | orphanage.reap_orphans(&handle); |
306 | assert!(orphanage.sigchild.lock().is_none()); |
307 | |
308 | let orphan = MockWait::new(2); |
309 | let waits = orphan.total_waits.clone(); |
310 | orphanage.push_orphan(orphan); |
311 | |
312 | orphanage.reap_orphans(&handle); |
313 | assert!(orphanage.sigchild.lock().is_some()); |
314 | assert_eq!(waits.get(), 1); // Eager reap when registering listener |
315 | } |
316 | |
317 | #[test ] |
318 | fn does_nothing_if_signal_could_not_be_registered() { |
319 | let handle = SignalHandle::default(); |
320 | |
321 | let orphanage = OrphanQueueImpl::new(); |
322 | assert!(orphanage.sigchild.lock().is_none()); |
323 | |
324 | let orphan = MockWait::new(2); |
325 | let waits = orphan.total_waits.clone(); |
326 | orphanage.push_orphan(orphan); |
327 | |
328 | // Signal handler has "gone away", nothing to register or reap |
329 | orphanage.reap_orphans(&handle); |
330 | assert!(orphanage.sigchild.lock().is_none()); |
331 | assert_eq!(waits.get(), 0); |
332 | } |
333 | } |
334 | |