1 | use crate::process::imp::orphan::{OrphanQueue, Wait}; |
2 | use crate::process::kill::Kill; |
3 | use crate::signal::unix::InternalStream; |
4 | |
5 | use std::future::Future; |
6 | use std::io; |
7 | use std::ops::Deref; |
8 | use std::pin::Pin; |
9 | use std::process::ExitStatus; |
10 | use std::task::Context; |
11 | use std::task::Poll; |
12 | |
13 | /// Orchestrates between registering interest for receiving signals when a |
14 | /// child process has exited, and attempting to poll for process completion. |
15 | #[derive(Debug)] |
16 | pub(crate) struct Reaper<W, Q, S> |
17 | where |
18 | W: Wait, |
19 | Q: OrphanQueue<W>, |
20 | { |
21 | inner: Option<W>, |
22 | orphan_queue: Q, |
23 | signal: S, |
24 | } |
25 | |
26 | impl<W, Q, S> Deref for Reaper<W, Q, S> |
27 | where |
28 | W: Wait, |
29 | Q: OrphanQueue<W>, |
30 | { |
31 | type Target = W; |
32 | |
33 | fn deref(&self) -> &Self::Target { |
34 | self.inner() |
35 | } |
36 | } |
37 | |
38 | impl<W, Q, S> Reaper<W, Q, S> |
39 | where |
40 | W: Wait, |
41 | Q: OrphanQueue<W>, |
42 | { |
43 | pub(crate) fn new(inner: W, orphan_queue: Q, signal: S) -> Self { |
44 | Self { |
45 | inner: Some(inner), |
46 | orphan_queue, |
47 | signal, |
48 | } |
49 | } |
50 | |
51 | fn inner(&self) -> &W { |
52 | self.inner.as_ref().expect("inner has gone away" ) |
53 | } |
54 | |
55 | pub(crate) fn inner_mut(&mut self) -> &mut W { |
56 | self.inner.as_mut().expect("inner has gone away" ) |
57 | } |
58 | } |
59 | |
60 | impl<W, Q, S> Future for Reaper<W, Q, S> |
61 | where |
62 | W: Wait + Unpin, |
63 | Q: OrphanQueue<W> + Unpin, |
64 | S: InternalStream + Unpin, |
65 | { |
66 | type Output = io::Result<ExitStatus>; |
67 | |
68 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
69 | loop { |
70 | // If the child hasn't exited yet, then it's our responsibility to |
71 | // ensure the current task gets notified when it might be able to |
72 | // make progress. We can use the delivery of a SIGCHLD signal as a |
73 | // sign that we can potentially make progress. |
74 | // |
75 | // However, we will register for a notification on the next signal |
76 | // BEFORE we poll the child. Otherwise it is possible that the child |
77 | // can exit and the signal can arrive after we last polled the child, |
78 | // but before we've registered for a notification on the next signal |
79 | // (this can cause a deadlock if there are no more spawned children |
80 | // which can generate a different signal for us). A side effect of |
81 | // pre-registering for signal notifications is that when the child |
82 | // exits, we will have already registered for an additional |
83 | // notification we don't need to consume. If another signal arrives, |
84 | // this future's task will be notified/woken up again. Since the |
85 | // futures model allows for spurious wake ups this extra wakeup |
86 | // should not cause significant issues with parent futures. |
87 | let registered_interest = self.signal.poll_recv(cx).is_pending(); |
88 | |
89 | if let Some(status) = self.inner_mut().try_wait()? { |
90 | return Poll::Ready(Ok(status)); |
91 | } |
92 | |
93 | // If our attempt to poll for the next signal was not ready, then |
94 | // we've arranged for our task to get notified and we can bail out. |
95 | if registered_interest { |
96 | return Poll::Pending; |
97 | } else { |
98 | // Otherwise, if the signal stream delivered a signal to us, we |
99 | // won't get notified at the next signal, so we'll loop and try |
100 | // again. |
101 | continue; |
102 | } |
103 | } |
104 | } |
105 | } |
106 | |
107 | impl<W, Q, S> Kill for Reaper<W, Q, S> |
108 | where |
109 | W: Kill + Wait, |
110 | Q: OrphanQueue<W>, |
111 | { |
112 | fn kill(&mut self) -> io::Result<()> { |
113 | self.inner_mut().kill() |
114 | } |
115 | } |
116 | |
117 | impl<W, Q, S> Drop for Reaper<W, Q, S> |
118 | where |
119 | W: Wait, |
120 | Q: OrphanQueue<W>, |
121 | { |
122 | fn drop(&mut self) { |
123 | if let Ok(Some(_)) = self.inner_mut().try_wait() { |
124 | return; |
125 | } |
126 | |
127 | let orphan = self.inner.take().unwrap(); |
128 | self.orphan_queue.push_orphan(orphan); |
129 | } |
130 | } |
131 | |
#[cfg(all(test, not(loom)))]
mod test {
    use super::*;

    use crate::process::unix::orphan::test::MockQueue;
    use futures::future::FutureExt;
    use std::os::unix::process::ExitStatusExt;
    use std::process::ExitStatus;
    use std::task::{Context, Poll};

    /// Fake child that reports `status` on exactly one `try_wait` call (the
    /// one made when `total_waits == num_wait_until_status`) and counts every
    /// wait and kill it receives.
    #[derive(Debug)]
    struct MockWait {
        total_kills: usize,
        total_waits: usize,
        num_wait_until_status: usize,
        status: ExitStatus,
    }

    impl MockWait {
        /// Creates a mock with zeroed counters.
        fn new(status: ExitStatus, num_wait_until_status: usize) -> Self {
            MockWait {
                status,
                num_wait_until_status,
                total_waits: 0,
                total_kills: 0,
            }
        }
    }

    impl Wait for MockWait {
        fn id(&self) -> u32 {
            0
        }

        fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> {
            // Decide using the count from BEFORE this call, then record it.
            let exited_now = self.total_waits == self.num_wait_until_status;
            self.total_waits += 1;

            if exited_now {
                Ok(Some(self.status))
            } else {
                Ok(None)
            }
        }
    }

    impl Kill for MockWait {
        fn kill(&mut self) -> io::Result<()> {
            self.total_kills += 1;
            Ok(())
        }
    }

    /// Scripted signal stream: each poll consumes the front of `values`,
    /// where `Some(())` means "a signal is ready" and `None` means "pending".
    struct MockStream {
        total_polls: usize,
        values: Vec<Option<()>>,
    }

    impl MockStream {
        /// Creates a stream that replays `values` in order.
        fn new(values: Vec<Option<()>>) -> Self {
            MockStream {
                values,
                total_polls: 0,
            }
        }
    }

    impl InternalStream for MockStream {
        fn poll_recv(&mut self, _cx: &mut Context<'_>) -> Poll<Option<()>> {
            self.total_polls += 1;

            // `remove(0)` panics when the script is exhausted, which flags a
            // test that polls more often than it planned for.
            let scripted = self.values.remove(0);
            if scripted.is_some() {
                Poll::Ready(Some(()))
            } else {
                Poll::Pending
            }
        }
    }

    #[test]
    fn reaper() {
        let exit = ExitStatus::from_raw(0);
        let signals = MockStream::new(vec![None, Some(()), None, None, None]);
        let mut grim = Reaper::new(MockWait::new(exit, 3), MockQueue::new(), signals);

        let waker = futures::task::noop_waker();
        let mut context = Context::from_waker(&waker);

        // First poll: the child is still running and the stream is pending,
        // so interest is registered after a single round.
        assert!(grim.poll_unpin(&mut context).is_pending());
        assert_eq!(1, grim.signal.total_polls);
        assert_eq!(1, grim.total_waits);
        assert!(grim.orphan_queue.all_enqueued.borrow().is_empty());

        // Second poll: the stream yields a signal first (no registration),
        // so the reaper loops once more and registers on the next round.
        assert!(grim.poll_unpin(&mut context).is_pending());
        assert_eq!(3, grim.signal.total_polls);
        assert_eq!(3, grim.total_waits);
        assert!(grim.orphan_queue.all_enqueued.borrow().is_empty());

        // Third poll: the child has exited.
        match grim.poll_unpin(&mut context) {
            Poll::Ready(outcome) => {
                assert!(outcome.is_ok());
                let exit_code = outcome.unwrap();
                assert_eq!(exit_code, exit);
            }
            Poll::Pending => unreachable!(),
        }
        assert_eq!(4, grim.signal.total_polls);
        assert_eq!(4, grim.total_waits);
        assert!(grim.orphan_queue.all_enqueued.borrow().is_empty());
    }

    #[test]
    fn kill() {
        let exit = ExitStatus::from_raw(0);
        let child = MockWait::new(exit, 0);
        let signals = MockStream::new(vec![None]);
        let mut grim = Reaper::new(child, MockQueue::new(), signals);

        grim.kill().unwrap();

        // The kill reached the child and nothing was orphaned by it.
        assert_eq!(1, grim.total_kills);
        assert!(grim.orphan_queue.all_enqueued.borrow().is_empty());
    }

    #[test]
    fn drop_reaps_if_possible() {
        let exit = ExitStatus::from_raw(0);
        // Exits on the very first wait, so drop can reap it directly.
        let mut mock = MockWait::new(exit, 0);

        {
            let queue = MockQueue::new();
            let signals = MockStream::new(vec![]);

            let grim = Reaper::new(&mut mock, &queue, signals);
            drop(grim);

            assert!(queue.all_enqueued.borrow().is_empty());
        }

        assert_eq!(1, mock.total_waits);
        assert_eq!(0, mock.total_kills);
    }

    #[test]
    fn drop_enqueues_orphan_if_wait_fails() {
        let exit = ExitStatus::from_raw(0);
        // Does not exit on the first wait, so drop cannot reap it.
        let mut mock = MockWait::new(exit, 2);

        {
            let queue = MockQueue::<&mut MockWait>::new();
            let signals = MockStream::new(vec![]);

            let grim = Reaper::new(&mut mock, &queue, signals);
            drop(grim);

            assert_eq!(1, queue.all_enqueued.borrow().len());
        }

        assert_eq!(1, mock.total_waits);
        assert_eq!(0, mock.total_kills);
    }
}
299 | |