| 1 | use crate::process::imp::orphan::{OrphanQueue, Wait}; |
| 2 | use crate::process::kill::Kill; |
| 3 | use crate::signal::unix::InternalStream; |
| 4 | |
| 5 | use std::future::Future; |
| 6 | use std::io; |
| 7 | use std::ops::Deref; |
| 8 | use std::pin::Pin; |
| 9 | use std::process::ExitStatus; |
| 10 | use std::task::Context; |
| 11 | use std::task::Poll; |
| 12 | |
| 13 | /// Orchestrates between registering interest for receiving signals when a |
| 14 | /// child process has exited, and attempting to poll for process completion. |
| 15 | #[derive (Debug)] |
| 16 | pub(crate) struct Reaper<W, Q, S> |
| 17 | where |
| 18 | W: Wait, |
| 19 | Q: OrphanQueue<W>, |
| 20 | { |
| 21 | inner: Option<W>, |
| 22 | orphan_queue: Q, |
| 23 | signal: S, |
| 24 | } |
| 25 | |
| 26 | impl<W, Q, S> Deref for Reaper<W, Q, S> |
| 27 | where |
| 28 | W: Wait, |
| 29 | Q: OrphanQueue<W>, |
| 30 | { |
| 31 | type Target = W; |
| 32 | |
| 33 | fn deref(&self) -> &Self::Target { |
| 34 | self.inner() |
| 35 | } |
| 36 | } |
| 37 | |
| 38 | impl<W, Q, S> Reaper<W, Q, S> |
| 39 | where |
| 40 | W: Wait, |
| 41 | Q: OrphanQueue<W>, |
| 42 | { |
| 43 | pub(crate) fn new(inner: W, orphan_queue: Q, signal: S) -> Self { |
| 44 | Self { |
| 45 | inner: Some(inner), |
| 46 | orphan_queue, |
| 47 | signal, |
| 48 | } |
| 49 | } |
| 50 | |
| 51 | fn inner(&self) -> &W { |
| 52 | self.inner.as_ref().expect(msg:"inner has gone away" ) |
| 53 | } |
| 54 | |
| 55 | pub(crate) fn inner_mut(&mut self) -> &mut W { |
| 56 | self.inner.as_mut().expect(msg:"inner has gone away" ) |
| 57 | } |
| 58 | } |
| 59 | |
| 60 | impl<W, Q, S> Future for Reaper<W, Q, S> |
| 61 | where |
| 62 | W: Wait + Unpin, |
| 63 | Q: OrphanQueue<W> + Unpin, |
| 64 | S: InternalStream + Unpin, |
| 65 | { |
| 66 | type Output = io::Result<ExitStatus>; |
| 67 | |
| 68 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| 69 | loop { |
| 70 | // If the child hasn't exited yet, then it's our responsibility to |
| 71 | // ensure the current task gets notified when it might be able to |
| 72 | // make progress. We can use the delivery of a SIGCHLD signal as a |
| 73 | // sign that we can potentially make progress. |
| 74 | // |
| 75 | // However, we will register for a notification on the next signal |
| 76 | // BEFORE we poll the child. Otherwise it is possible that the child |
| 77 | // can exit and the signal can arrive after we last polled the child, |
| 78 | // but before we've registered for a notification on the next signal |
| 79 | // (this can cause a deadlock if there are no more spawned children |
| 80 | // which can generate a different signal for us). A side effect of |
| 81 | // pre-registering for signal notifications is that when the child |
| 82 | // exits, we will have already registered for an additional |
| 83 | // notification we don't need to consume. If another signal arrives, |
| 84 | // this future's task will be notified/woken up again. Since the |
| 85 | // futures model allows for spurious wake ups this extra wakeup |
| 86 | // should not cause significant issues with parent futures. |
| 87 | let registered_interest = self.signal.poll_recv(cx).is_pending(); |
| 88 | |
| 89 | if let Some(status) = self.inner_mut().try_wait()? { |
| 90 | return Poll::Ready(Ok(status)); |
| 91 | } |
| 92 | |
| 93 | // If our attempt to poll for the next signal was not ready, then |
| 94 | // we've arranged for our task to get notified and we can bail out. |
| 95 | if registered_interest { |
| 96 | return Poll::Pending; |
| 97 | } else { |
| 98 | // Otherwise, if the signal stream delivered a signal to us, we |
| 99 | // won't get notified at the next signal, so we'll loop and try |
| 100 | // again. |
| 101 | continue; |
| 102 | } |
| 103 | } |
| 104 | } |
| 105 | } |
| 106 | |
| 107 | impl<W, Q, S> Kill for Reaper<W, Q, S> |
| 108 | where |
| 109 | W: Kill + Wait, |
| 110 | Q: OrphanQueue<W>, |
| 111 | { |
| 112 | fn kill(&mut self) -> io::Result<()> { |
| 113 | self.inner_mut().kill() |
| 114 | } |
| 115 | } |
| 116 | |
| 117 | impl<W, Q, S> Drop for Reaper<W, Q, S> |
| 118 | where |
| 119 | W: Wait, |
| 120 | Q: OrphanQueue<W>, |
| 121 | { |
| 122 | fn drop(&mut self) { |
| 123 | if let Ok(Some(_)) = self.inner_mut().try_wait() { |
| 124 | return; |
| 125 | } |
| 126 | |
| 127 | let orphan: W = self.inner.take().unwrap(); |
| 128 | self.orphan_queue.push_orphan(orphan); |
| 129 | } |
| 130 | } |
| 131 | |
#[cfg(all(test, not(loom)))]
mod test {
    use super::*;

    use crate::process::unix::orphan::test::MockQueue;
    use futures::future::FutureExt;
    use std::os::unix::process::ExitStatusExt;
    use std::process::ExitStatus;
    use std::task::{Context, Poll};

    /// Fake child that counts `try_wait`/`kill` calls and reports its exit
    /// status only on the wait call whose index equals
    /// `num_wait_until_status`.
    #[derive(Debug)]
    struct MockWait {
        total_kills: usize,
        total_waits: usize,
        num_wait_until_status: usize,
        status: ExitStatus,
    }

    impl MockWait {
        fn new(status: ExitStatus, num_wait_until_status: usize) -> Self {
            Self {
                total_kills: 0,
                total_waits: 0,
                num_wait_until_status,
                status,
            }
        }
    }

    impl Wait for MockWait {
        fn id(&self) -> u32 {
            0
        }

        fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> {
            // Compare against the number of waits seen before this call.
            let waits_before = self.total_waits;
            self.total_waits += 1;

            if waits_before == self.num_wait_until_status {
                Ok(Some(self.status))
            } else {
                Ok(None)
            }
        }
    }

    impl Kill for MockWait {
        fn kill(&mut self) -> io::Result<()> {
            self.total_kills += 1;
            Ok(())
        }
    }

    /// Fake signal stream that replays a script: `Some(())` means a signal is
    /// ready, `None` means the poll is pending.
    struct MockStream {
        total_polls: usize,
        values: Vec<Option<()>>,
    }

    impl MockStream {
        fn new(values: Vec<Option<()>>) -> Self {
            Self {
                total_polls: 0,
                values,
            }
        }
    }

    impl InternalStream for MockStream {
        fn poll_recv(&mut self, _cx: &mut Context<'_>) -> Poll<Option<()>> {
            self.total_polls += 1;

            // Consume the next scripted value from the front.
            let scripted = self.values.remove(0);
            if scripted.is_some() {
                Poll::Ready(Some(()))
            } else {
                Poll::Pending
            }
        }
    }

    #[test]
    fn reaper() {
        let expected = ExitStatus::from_raw(0);
        let child = MockWait::new(expected, 3);
        let script = vec![None, Some(()), None, None, None];
        let mut grim = Reaper::new(child, MockQueue::new(), MockStream::new(script));

        let waker = futures::task::noop_waker();
        let mut cx = Context::from_waker(&waker);

        // Not yet exited, interest registered
        assert!(grim.poll_unpin(&mut cx).is_pending());
        assert_eq!(1, grim.signal.total_polls);
        assert_eq!(1, grim.total_waits);
        assert!(grim.orphan_queue.all_enqueued.borrow().is_empty());

        // Not yet exited, couldn't register interest the first time
        // but managed to register interest the second time around
        assert!(grim.poll_unpin(&mut cx).is_pending());
        assert_eq!(3, grim.signal.total_polls);
        assert_eq!(3, grim.total_waits);
        assert!(grim.orphan_queue.all_enqueued.borrow().is_empty());

        // Exited
        match grim.poll_unpin(&mut cx) {
            Poll::Ready(result) => {
                assert!(result.is_ok());
                let exit_code = result.unwrap();
                assert_eq!(exit_code, expected);
            }
            Poll::Pending => unreachable!(),
        }
        assert_eq!(4, grim.signal.total_polls);
        assert_eq!(4, grim.total_waits);
        assert!(grim.orphan_queue.all_enqueued.borrow().is_empty());
    }

    #[test]
    fn kill() {
        let status = ExitStatus::from_raw(0);
        let child = MockWait::new(status, 0);
        let mut grim = Reaper::new(child, MockQueue::new(), MockStream::new(vec![None]));

        grim.kill().unwrap();
        assert_eq!(1, grim.total_kills);
        assert!(grim.orphan_queue.all_enqueued.borrow().is_empty());
    }

    #[test]
    fn drop_reaps_if_possible() {
        let status = ExitStatus::from_raw(0);
        // Reports its status on the very first wait, so drop can reap it.
        let mut mock = MockWait::new(status, 0);

        {
            let queue = MockQueue::new();

            let grim = Reaper::new(&mut mock, &queue, MockStream::new(vec![]));

            drop(grim);

            assert!(queue.all_enqueued.borrow().is_empty());
        }

        assert_eq!(1, mock.total_waits);
        assert_eq!(0, mock.total_kills);
    }

    #[test]
    fn drop_enqueues_orphan_if_wait_fails() {
        let status = ExitStatus::from_raw(0);
        // Does not report a status on the first wait, so drop must enqueue it.
        let mut mock = MockWait::new(status, 2);

        {
            let queue = MockQueue::<&mut MockWait>::new();
            let grim = Reaper::new(&mut mock, &queue, MockStream::new(vec![]));
            drop(grim);

            assert_eq!(1, queue.all_enqueued.borrow().len());
        }

        assert_eq!(1, mock.total_waits);
        assert_eq!(0, mock.total_kills);
    }
}
| 299 | |