| 1 | use crate::loom::sync::{Mutex, MutexGuard}; |
| 2 | use crate::runtime::signal::Handle as SignalHandle; |
| 3 | use crate::signal::unix::{signal_with_handle, SignalKind}; |
| 4 | use crate::sync::watch; |
| 5 | use std::io; |
| 6 | use std::process::ExitStatus; |
| 7 | |
| 8 | /// An interface for waiting on a process to exit. |
| 9 | pub(crate) trait Wait { |
| 10 | /// Get the identifier for this process or diagnostics. |
| 11 | #[allow (dead_code)] |
| 12 | fn id(&self) -> u32; |
| 13 | /// Try waiting for a process to exit in a non-blocking manner. |
| 14 | fn try_wait(&mut self) -> io::Result<Option<ExitStatus>>; |
| 15 | } |
| 16 | |
| 17 | impl<T: Wait> Wait for &mut T { |
| 18 | fn id(&self) -> u32 { |
| 19 | (**self).id() |
| 20 | } |
| 21 | |
| 22 | fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> { |
| 23 | (**self).try_wait() |
| 24 | } |
| 25 | } |
| 26 | |
| 27 | /// An interface for queueing up an orphaned process so that it can be reaped. |
| 28 | pub(crate) trait OrphanQueue<T> { |
| 29 | /// Adds an orphan to the queue. |
| 30 | fn push_orphan(&self, orphan: T); |
| 31 | } |
| 32 | |
| 33 | impl<T, O: OrphanQueue<T>> OrphanQueue<T> for &O { |
| 34 | fn push_orphan(&self, orphan: T) { |
| 35 | (**self).push_orphan(orphan); |
| 36 | } |
| 37 | } |
| 38 | |
| 39 | /// An implementation of `OrphanQueue`. |
| 40 | #[derive (Debug)] |
| 41 | pub(crate) struct OrphanQueueImpl<T> { |
| 42 | sigchild: Mutex<Option<watch::Receiver<()>>>, |
| 43 | queue: Mutex<Vec<T>>, |
| 44 | } |
| 45 | |
| 46 | impl<T> OrphanQueueImpl<T> { |
| 47 | cfg_not_has_const_mutex_new! { |
| 48 | pub(crate) fn new() -> Self { |
| 49 | Self { |
| 50 | sigchild: Mutex::new(None), |
| 51 | queue: Mutex::new(Vec::new()), |
| 52 | } |
| 53 | } |
| 54 | } |
| 55 | |
| 56 | cfg_has_const_mutex_new! { |
| 57 | pub(crate) const fn new() -> Self { |
| 58 | Self { |
| 59 | sigchild: Mutex::const_new(None), |
| 60 | queue: Mutex::const_new(Vec::new()), |
| 61 | } |
| 62 | } |
| 63 | } |
| 64 | |
| 65 | #[cfg (test)] |
| 66 | fn len(&self) -> usize { |
| 67 | self.queue.lock().len() |
| 68 | } |
| 69 | |
| 70 | pub(crate) fn push_orphan(&self, orphan: T) |
| 71 | where |
| 72 | T: Wait, |
| 73 | { |
| 74 | self.queue.lock().push(orphan); |
| 75 | } |
| 76 | |
| 77 | /// Attempts to reap every process in the queue, ignoring any errors and |
| 78 | /// enqueueing any orphans which have not yet exited. |
| 79 | pub(crate) fn reap_orphans(&self, handle: &SignalHandle) |
| 80 | where |
| 81 | T: Wait, |
| 82 | { |
| 83 | // If someone else is holding the lock, they will be responsible for draining |
| 84 | // the queue as necessary, so we can safely bail if that happens |
| 85 | if let Some(mut sigchild_guard) = self.sigchild.try_lock() { |
| 86 | match &mut *sigchild_guard { |
| 87 | Some(sigchild) => { |
| 88 | if sigchild.try_has_changed().and_then(Result::ok).is_some() { |
| 89 | drain_orphan_queue(self.queue.lock()); |
| 90 | } |
| 91 | } |
| 92 | None => { |
| 93 | let queue = self.queue.lock(); |
| 94 | |
| 95 | // Be lazy and only initialize the SIGCHLD listener if there |
| 96 | // are any orphaned processes in the queue. |
| 97 | if !queue.is_empty() { |
| 98 | // An errors shouldn't really happen here, but if it does it |
| 99 | // means that the signal driver isn't running, in |
| 100 | // which case there isn't anything we can |
| 101 | // register/initialize here, so we can try again later |
| 102 | if let Ok(sigchild) = signal_with_handle(SignalKind::child(), handle) { |
| 103 | *sigchild_guard = Some(sigchild); |
| 104 | drain_orphan_queue(queue); |
| 105 | } |
| 106 | } |
| 107 | } |
| 108 | } |
| 109 | } |
| 110 | } |
| 111 | } |
| 112 | |
| 113 | fn drain_orphan_queue<T>(mut queue: MutexGuard<'_, Vec<T>>) |
| 114 | where |
| 115 | T: Wait, |
| 116 | { |
| 117 | for i: usize in (0..queue.len()).rev() { |
| 118 | match queue[i].try_wait() { |
| 119 | Ok(None) => {} |
| 120 | Ok(Some(_)) | Err(_) => { |
| 121 | // The stdlib handles interruption errors (EINTR) when polling a child process. |
| 122 | // All other errors represent invalid inputs or pids that have already been |
| 123 | // reaped, so we can drop the orphan in case an error is raised. |
| 124 | queue.swap_remove(index:i); |
| 125 | } |
| 126 | } |
| 127 | } |
| 128 | |
| 129 | drop(queue); |
| 130 | } |
| 131 | |
| 132 | #[cfg (all(test, not(loom)))] |
| 133 | pub(crate) mod test { |
| 134 | use super::*; |
| 135 | use crate::runtime::io::Driver as IoDriver; |
| 136 | use crate::runtime::signal::{Driver as SignalDriver, Handle as SignalHandle}; |
| 137 | use crate::sync::watch; |
| 138 | use std::cell::{Cell, RefCell}; |
| 139 | use std::io; |
| 140 | use std::os::unix::process::ExitStatusExt; |
| 141 | use std::process::ExitStatus; |
| 142 | use std::rc::Rc; |
| 143 | |
| 144 | pub(crate) struct MockQueue<W> { |
| 145 | pub(crate) all_enqueued: RefCell<Vec<W>>, |
| 146 | } |
| 147 | |
| 148 | impl<W> MockQueue<W> { |
| 149 | pub(crate) fn new() -> Self { |
| 150 | Self { |
| 151 | all_enqueued: RefCell::new(Vec::new()), |
| 152 | } |
| 153 | } |
| 154 | } |
| 155 | |
| 156 | impl<W> OrphanQueue<W> for MockQueue<W> { |
| 157 | fn push_orphan(&self, orphan: W) { |
| 158 | self.all_enqueued.borrow_mut().push(orphan); |
| 159 | } |
| 160 | } |
| 161 | |
| 162 | struct MockWait { |
| 163 | total_waits: Rc<Cell<usize>>, |
| 164 | num_wait_until_status: usize, |
| 165 | return_err: bool, |
| 166 | } |
| 167 | |
| 168 | impl MockWait { |
| 169 | fn new(num_wait_until_status: usize) -> Self { |
| 170 | Self { |
| 171 | total_waits: Rc::new(Cell::new(0)), |
| 172 | num_wait_until_status, |
| 173 | return_err: false, |
| 174 | } |
| 175 | } |
| 176 | |
| 177 | fn with_err() -> Self { |
| 178 | Self { |
| 179 | total_waits: Rc::new(Cell::new(0)), |
| 180 | num_wait_until_status: 0, |
| 181 | return_err: true, |
| 182 | } |
| 183 | } |
| 184 | } |
| 185 | |
| 186 | impl Wait for MockWait { |
| 187 | fn id(&self) -> u32 { |
| 188 | 42 |
| 189 | } |
| 190 | |
| 191 | fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> { |
| 192 | let waits = self.total_waits.get(); |
| 193 | |
| 194 | let ret = if self.num_wait_until_status == waits { |
| 195 | if self.return_err { |
| 196 | Ok(Some(ExitStatus::from_raw(0))) |
| 197 | } else { |
| 198 | Err(io::Error::new(io::ErrorKind::Other, "mock err" )) |
| 199 | } |
| 200 | } else { |
| 201 | Ok(None) |
| 202 | }; |
| 203 | |
| 204 | self.total_waits.set(waits + 1); |
| 205 | ret |
| 206 | } |
| 207 | } |
| 208 | |
| 209 | #[test ] |
| 210 | fn drain_attempts_a_single_reap_of_all_queued_orphans() { |
| 211 | let first_orphan = MockWait::new(0); |
| 212 | let second_orphan = MockWait::new(1); |
| 213 | let third_orphan = MockWait::new(2); |
| 214 | let fourth_orphan = MockWait::with_err(); |
| 215 | |
| 216 | let first_waits = first_orphan.total_waits.clone(); |
| 217 | let second_waits = second_orphan.total_waits.clone(); |
| 218 | let third_waits = third_orphan.total_waits.clone(); |
| 219 | let fourth_waits = fourth_orphan.total_waits.clone(); |
| 220 | |
| 221 | let orphanage = OrphanQueueImpl::new(); |
| 222 | orphanage.push_orphan(first_orphan); |
| 223 | orphanage.push_orphan(third_orphan); |
| 224 | orphanage.push_orphan(second_orphan); |
| 225 | orphanage.push_orphan(fourth_orphan); |
| 226 | |
| 227 | assert_eq!(orphanage.len(), 4); |
| 228 | |
| 229 | drain_orphan_queue(orphanage.queue.lock()); |
| 230 | assert_eq!(orphanage.len(), 2); |
| 231 | assert_eq!(first_waits.get(), 1); |
| 232 | assert_eq!(second_waits.get(), 1); |
| 233 | assert_eq!(third_waits.get(), 1); |
| 234 | assert_eq!(fourth_waits.get(), 1); |
| 235 | |
| 236 | drain_orphan_queue(orphanage.queue.lock()); |
| 237 | assert_eq!(orphanage.len(), 1); |
| 238 | assert_eq!(first_waits.get(), 1); |
| 239 | assert_eq!(second_waits.get(), 2); |
| 240 | assert_eq!(third_waits.get(), 2); |
| 241 | assert_eq!(fourth_waits.get(), 1); |
| 242 | |
| 243 | drain_orphan_queue(orphanage.queue.lock()); |
| 244 | assert_eq!(orphanage.len(), 0); |
| 245 | assert_eq!(first_waits.get(), 1); |
| 246 | assert_eq!(second_waits.get(), 2); |
| 247 | assert_eq!(third_waits.get(), 3); |
| 248 | assert_eq!(fourth_waits.get(), 1); |
| 249 | |
| 250 | // Safe to reap when empty |
| 251 | drain_orphan_queue(orphanage.queue.lock()); |
| 252 | } |
| 253 | |
| 254 | #[test ] |
| 255 | fn no_reap_if_no_signal_received() { |
| 256 | let (tx, rx) = watch::channel(()); |
| 257 | |
| 258 | let handle = SignalHandle::default(); |
| 259 | |
| 260 | let orphanage = OrphanQueueImpl::new(); |
| 261 | *orphanage.sigchild.lock() = Some(rx); |
| 262 | |
| 263 | let orphan = MockWait::new(2); |
| 264 | let waits = orphan.total_waits.clone(); |
| 265 | orphanage.push_orphan(orphan); |
| 266 | |
| 267 | orphanage.reap_orphans(&handle); |
| 268 | assert_eq!(waits.get(), 0); |
| 269 | |
| 270 | orphanage.reap_orphans(&handle); |
| 271 | assert_eq!(waits.get(), 0); |
| 272 | |
| 273 | tx.send(()).unwrap(); |
| 274 | orphanage.reap_orphans(&handle); |
| 275 | assert_eq!(waits.get(), 1); |
| 276 | } |
| 277 | |
| 278 | #[test ] |
| 279 | fn no_reap_if_signal_lock_held() { |
| 280 | let handle = SignalHandle::default(); |
| 281 | |
| 282 | let orphanage = OrphanQueueImpl::new(); |
| 283 | let signal_guard = orphanage.sigchild.lock(); |
| 284 | |
| 285 | let orphan = MockWait::new(2); |
| 286 | let waits = orphan.total_waits.clone(); |
| 287 | orphanage.push_orphan(orphan); |
| 288 | |
| 289 | orphanage.reap_orphans(&handle); |
| 290 | assert_eq!(waits.get(), 0); |
| 291 | |
| 292 | drop(signal_guard); |
| 293 | } |
| 294 | |
| 295 | #[cfg_attr (miri, ignore)] // Miri does not support epoll. |
| 296 | #[test ] |
| 297 | fn does_not_register_signal_if_queue_empty() { |
| 298 | let (io_driver, io_handle) = IoDriver::new(1024).unwrap(); |
| 299 | let signal_driver = SignalDriver::new(io_driver, &io_handle).unwrap(); |
| 300 | let handle = signal_driver.handle(); |
| 301 | |
| 302 | let orphanage = OrphanQueueImpl::new(); |
| 303 | assert!(orphanage.sigchild.lock().is_none()); // Sanity |
| 304 | |
| 305 | // No register when queue empty |
| 306 | orphanage.reap_orphans(&handle); |
| 307 | assert!(orphanage.sigchild.lock().is_none()); |
| 308 | |
| 309 | let orphan = MockWait::new(2); |
| 310 | let waits = orphan.total_waits.clone(); |
| 311 | orphanage.push_orphan(orphan); |
| 312 | |
| 313 | orphanage.reap_orphans(&handle); |
| 314 | assert!(orphanage.sigchild.lock().is_some()); |
| 315 | assert_eq!(waits.get(), 1); // Eager reap when registering listener |
| 316 | } |
| 317 | |
| 318 | #[test ] |
| 319 | fn does_nothing_if_signal_could_not_be_registered() { |
| 320 | let handle = SignalHandle::default(); |
| 321 | |
| 322 | let orphanage = OrphanQueueImpl::new(); |
| 323 | assert!(orphanage.sigchild.lock().is_none()); |
| 324 | |
| 325 | let orphan = MockWait::new(2); |
| 326 | let waits = orphan.total_waits.clone(); |
| 327 | orphanage.push_orphan(orphan); |
| 328 | |
| 329 | // Signal handler has "gone away", nothing to register or reap |
| 330 | orphanage.reap_orphans(&handle); |
| 331 | assert!(orphanage.sigchild.lock().is_none()); |
| 332 | assert_eq!(waits.get(), 0); |
| 333 | } |
| 334 | } |
| 335 | |