1 | use crate::loom::sync::{Mutex, MutexGuard}; |
2 | use crate::runtime::signal::Handle as SignalHandle; |
3 | use crate::signal::unix::{signal_with_handle, SignalKind}; |
4 | use crate::sync::watch; |
5 | use std::io; |
6 | use std::process::ExitStatus; |
7 | |
/// Abstraction over a process whose termination can be polled.
pub(crate) trait Wait {
    /// Returns the identifier of this process; intended for diagnostics.
    #[allow(dead_code)]
    fn id(&self) -> u32;

    /// Checks, without blocking, whether the process has exited.
    ///
    /// Callers in this file keep a process queued while this returns
    /// `Ok(None)` and discard it on `Ok(Some(_))` or `Err(_)`.
    fn try_wait(&mut self) -> io::Result<Option<ExitStatus>>;
}
16 | |
17 | impl<T: Wait> Wait for &mut T { |
18 | fn id(&self) -> u32 { |
19 | (**self).id() |
20 | } |
21 | |
22 | fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> { |
23 | (**self).try_wait() |
24 | } |
25 | } |
26 | |
/// A sink that takes ownership of orphaned processes so that they can be
/// reaped later.
pub(crate) trait OrphanQueue<T> {
    /// Hands `orphan` over to the queue.
    fn push_orphan(&self, orphan: T);
}
32 | |
33 | impl<T, O: OrphanQueue<T>> OrphanQueue<T> for &O { |
34 | fn push_orphan(&self, orphan: T) { |
35 | (**self).push_orphan(orphan); |
36 | } |
37 | } |
38 | |
39 | /// An implementation of `OrphanQueue`. |
40 | #[derive (Debug)] |
41 | pub(crate) struct OrphanQueueImpl<T> { |
42 | sigchild: Mutex<Option<watch::Receiver<()>>>, |
43 | queue: Mutex<Vec<T>>, |
44 | } |
45 | |
46 | impl<T> OrphanQueueImpl<T> { |
47 | cfg_not_has_const_mutex_new! { |
48 | pub(crate) fn new() -> Self { |
49 | Self { |
50 | sigchild: Mutex::new(None), |
51 | queue: Mutex::new(Vec::new()), |
52 | } |
53 | } |
54 | } |
55 | |
56 | cfg_has_const_mutex_new! { |
57 | pub(crate) const fn new() -> Self { |
58 | Self { |
59 | sigchild: Mutex::const_new(None), |
60 | queue: Mutex::const_new(Vec::new()), |
61 | } |
62 | } |
63 | } |
64 | |
65 | #[cfg (test)] |
66 | fn len(&self) -> usize { |
67 | self.queue.lock().len() |
68 | } |
69 | |
70 | pub(crate) fn push_orphan(&self, orphan: T) |
71 | where |
72 | T: Wait, |
73 | { |
74 | self.queue.lock().push(orphan); |
75 | } |
76 | |
77 | /// Attempts to reap every process in the queue, ignoring any errors and |
78 | /// enqueueing any orphans which have not yet exited. |
79 | pub(crate) fn reap_orphans(&self, handle: &SignalHandle) |
80 | where |
81 | T: Wait, |
82 | { |
83 | // If someone else is holding the lock, they will be responsible for draining |
84 | // the queue as necessary, so we can safely bail if that happens |
85 | if let Some(mut sigchild_guard) = self.sigchild.try_lock() { |
86 | match &mut *sigchild_guard { |
87 | Some(sigchild) => { |
88 | if sigchild.try_has_changed().and_then(Result::ok).is_some() { |
89 | drain_orphan_queue(self.queue.lock()); |
90 | } |
91 | } |
92 | None => { |
93 | let queue = self.queue.lock(); |
94 | |
95 | // Be lazy and only initialize the SIGCHLD listener if there |
96 | // are any orphaned processes in the queue. |
97 | if !queue.is_empty() { |
98 | // An errors shouldn't really happen here, but if it does it |
99 | // means that the signal driver isn't running, in |
100 | // which case there isn't anything we can |
101 | // register/initialize here, so we can try again later |
102 | if let Ok(sigchild) = signal_with_handle(SignalKind::child(), handle) { |
103 | *sigchild_guard = Some(sigchild); |
104 | drain_orphan_queue(queue); |
105 | } |
106 | } |
107 | } |
108 | } |
109 | } |
110 | } |
111 | } |
112 | |
113 | fn drain_orphan_queue<T>(mut queue: MutexGuard<'_, Vec<T>>) |
114 | where |
115 | T: Wait, |
116 | { |
117 | for i: usize in (0..queue.len()).rev() { |
118 | match queue[i].try_wait() { |
119 | Ok(None) => {} |
120 | Ok(Some(_)) | Err(_) => { |
121 | // The stdlib handles interruption errors (EINTR) when polling a child process. |
122 | // All other errors represent invalid inputs or pids that have already been |
123 | // reaped, so we can drop the orphan in case an error is raised. |
124 | queue.swap_remove(index:i); |
125 | } |
126 | } |
127 | } |
128 | |
129 | drop(queue); |
130 | } |
131 | |
#[cfg(all(test, not(loom)))]
pub(crate) mod test {
    use super::*;
    use crate::runtime::io::Driver as IoDriver;
    use crate::runtime::signal::{Driver as SignalDriver, Handle as SignalHandle};
    use crate::sync::watch;
    use std::cell::{Cell, RefCell};
    use std::io;
    use std::os::unix::process::ExitStatusExt;
    use std::process::ExitStatus;
    use std::rc::Rc;

    /// An `OrphanQueue` that simply records every orphan pushed into it.
    pub(crate) struct MockQueue<W> {
        pub(crate) all_enqueued: RefCell<Vec<W>>,
    }

    impl<W> MockQueue<W> {
        /// Creates a mock queue with nothing recorded.
        pub(crate) fn new() -> Self {
            Self {
                all_enqueued: RefCell::new(Vec::new()),
            }
        }
    }

    impl<W> OrphanQueue<W> for MockQueue<W> {
        fn push_orphan(&self, orphan: W) {
            self.all_enqueued.borrow_mut().push(orphan);
        }
    }

    /// A scripted `Wait` implementation that counts how often it is polled.
    struct MockWait {
        /// Number of `try_wait` calls made so far; shared so tests can inspect it
        /// after the mock has been moved into a queue.
        total_waits: Rc<Cell<usize>>,
        /// Zero-based index of the `try_wait` call that yields the final outcome.
        num_wait_until_status: usize,
        /// When `true` the final outcome is an error instead of an exit status.
        return_err: bool,
    }

    impl MockWait {
        /// A mock that reports a successful exit on call number
        /// `num_wait_until_status` (zero-based) and `Ok(None)` otherwise.
        fn new(num_wait_until_status: usize) -> Self {
            Self {
                total_waits: Rc::new(Cell::new(0)),
                num_wait_until_status,
                return_err: false,
            }
        }

        /// A mock whose very first `try_wait` call fails with an error.
        fn with_err() -> Self {
            Self {
                total_waits: Rc::new(Cell::new(0)),
                num_wait_until_status: 0,
                return_err: true,
            }
        }
    }

    impl Wait for MockWait {
        fn id(&self) -> u32 {
            42
        }

        fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> {
            let waits = self.total_waits.get();

            let ret = if self.num_wait_until_status == waits {
                // The final outcome: an error only when the mock was built to fail.
                if self.return_err {
                    Err(io::Error::new(io::ErrorKind::Other, "mock err"))
                } else {
                    Ok(Some(ExitStatus::from_raw(0)))
                }
            } else {
                Ok(None)
            };

            self.total_waits.set(waits + 1);
            ret
        }
    }

    #[test]
    fn drain_attempts_a_single_reap_of_all_queued_orphans() {
        let first_orphan = MockWait::new(0);
        let second_orphan = MockWait::new(1);
        let third_orphan = MockWait::new(2);
        let fourth_orphan = MockWait::with_err();

        let first_waits = first_orphan.total_waits.clone();
        let second_waits = second_orphan.total_waits.clone();
        let third_waits = third_orphan.total_waits.clone();
        let fourth_waits = fourth_orphan.total_waits.clone();

        let orphanage = OrphanQueueImpl::new();
        orphanage.push_orphan(first_orphan);
        orphanage.push_orphan(third_orphan);
        orphanage.push_orphan(second_orphan);
        orphanage.push_orphan(fourth_orphan);

        assert_eq!(orphanage.len(), 4);

        // First pass: the first (exited) and fourth (errored) orphans are dropped.
        drain_orphan_queue(orphanage.queue.lock());
        assert_eq!(orphanage.len(), 2);
        assert_eq!(first_waits.get(), 1);
        assert_eq!(second_waits.get(), 1);
        assert_eq!(third_waits.get(), 1);
        assert_eq!(fourth_waits.get(), 1);

        // Second pass: the second orphan exits; removed orphans are not polled again.
        drain_orphan_queue(orphanage.queue.lock());
        assert_eq!(orphanage.len(), 1);
        assert_eq!(first_waits.get(), 1);
        assert_eq!(second_waits.get(), 2);
        assert_eq!(third_waits.get(), 2);
        assert_eq!(fourth_waits.get(), 1);

        // Third pass: the last remaining orphan exits.
        drain_orphan_queue(orphanage.queue.lock());
        assert_eq!(orphanage.len(), 0);
        assert_eq!(first_waits.get(), 1);
        assert_eq!(second_waits.get(), 2);
        assert_eq!(third_waits.get(), 3);
        assert_eq!(fourth_waits.get(), 1);

        // Safe to reap when empty
        drain_orphan_queue(orphanage.queue.lock());
    }

    #[test]
    fn no_reap_if_no_signal_received() {
        let (tx, rx) = watch::channel(());

        let handle = SignalHandle::default();

        let orphanage = OrphanQueueImpl::new();
        *orphanage.sigchild.lock() = Some(rx);

        let orphan = MockWait::new(2);
        let waits = orphan.total_waits.clone();
        orphanage.push_orphan(orphan);

        orphanage.reap_orphans(&handle);
        assert_eq!(waits.get(), 0);

        orphanage.reap_orphans(&handle);
        assert_eq!(waits.get(), 0);

        // Only after a notification is sent does a reap pass poll the orphan.
        tx.send(()).unwrap();
        orphanage.reap_orphans(&handle);
        assert_eq!(waits.get(), 1);
    }

    #[test]
    fn no_reap_if_signal_lock_held() {
        let handle = SignalHandle::default();

        let orphanage = OrphanQueueImpl::new();
        let signal_guard = orphanage.sigchild.lock();

        let orphan = MockWait::new(2);
        let waits = orphan.total_waits.clone();
        orphanage.push_orphan(orphan);

        orphanage.reap_orphans(&handle);
        assert_eq!(waits.get(), 0);

        drop(signal_guard);
    }

    #[cfg_attr(miri, ignore)] // Miri does not support epoll.
    #[test]
    fn does_not_register_signal_if_queue_empty() {
        let (io_driver, io_handle) = IoDriver::new(1024).unwrap();
        let signal_driver = SignalDriver::new(io_driver, &io_handle).unwrap();
        let handle = signal_driver.handle();

        let orphanage = OrphanQueueImpl::new();
        assert!(orphanage.sigchild.lock().is_none()); // Sanity

        // No register when queue empty
        orphanage.reap_orphans(&handle);
        assert!(orphanage.sigchild.lock().is_none());

        let orphan = MockWait::new(2);
        let waits = orphan.total_waits.clone();
        orphanage.push_orphan(orphan);

        orphanage.reap_orphans(&handle);
        assert!(orphanage.sigchild.lock().is_some());
        assert_eq!(waits.get(), 1); // Eager reap when registering listener
    }

    #[test]
    fn does_nothing_if_signal_could_not_be_registered() {
        let handle = SignalHandle::default();

        let orphanage = OrphanQueueImpl::new();
        assert!(orphanage.sigchild.lock().is_none());

        let orphan = MockWait::new(2);
        let waits = orphan.total_waits.clone();
        orphanage.push_orphan(orphan);

        // Signal handler has "gone away", nothing to register or reap
        orphanage.reap_orphans(&handle);
        assert!(orphanage.sigchild.lock().is_none());
        assert_eq!(waits.get(), 0);
    }
}
335 | |