1use crate::process::imp::orphan::{OrphanQueue, Wait};
2use crate::process::kill::Kill;
3use crate::signal::unix::InternalStream;
4
5use std::future::Future;
6use std::io;
7use std::ops::Deref;
8use std::pin::Pin;
9use std::process::ExitStatus;
10use std::task::Context;
11use std::task::Poll;
12
13/// Orchestrates between registering interest for receiving signals when a
14/// child process has exited, and attempting to poll for process completion.
15#[derive(Debug)]
16pub(crate) struct Reaper<W, Q, S>
17where
18 W: Wait,
19 Q: OrphanQueue<W>,
20{
21 inner: Option<W>,
22 orphan_queue: Q,
23 signal: S,
24}
25
26impl<W, Q, S> Deref for Reaper<W, Q, S>
27where
28 W: Wait,
29 Q: OrphanQueue<W>,
30{
31 type Target = W;
32
33 fn deref(&self) -> &Self::Target {
34 self.inner()
35 }
36}
37
38impl<W, Q, S> Reaper<W, Q, S>
39where
40 W: Wait,
41 Q: OrphanQueue<W>,
42{
43 pub(crate) fn new(inner: W, orphan_queue: Q, signal: S) -> Self {
44 Self {
45 inner: Some(inner),
46 orphan_queue,
47 signal,
48 }
49 }
50
51 fn inner(&self) -> &W {
52 self.inner.as_ref().expect("inner has gone away")
53 }
54
55 pub(crate) fn inner_mut(&mut self) -> &mut W {
56 self.inner.as_mut().expect("inner has gone away")
57 }
58}
59
60impl<W, Q, S> Future for Reaper<W, Q, S>
61where
62 W: Wait + Unpin,
63 Q: OrphanQueue<W> + Unpin,
64 S: InternalStream + Unpin,
65{
66 type Output = io::Result<ExitStatus>;
67
68 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
69 loop {
70 // If the child hasn't exited yet, then it's our responsibility to
71 // ensure the current task gets notified when it might be able to
72 // make progress. We can use the delivery of a SIGCHLD signal as a
73 // sign that we can potentially make progress.
74 //
75 // However, we will register for a notification on the next signal
76 // BEFORE we poll the child. Otherwise it is possible that the child
77 // can exit and the signal can arrive after we last polled the child,
78 // but before we've registered for a notification on the next signal
79 // (this can cause a deadlock if there are no more spawned children
80 // which can generate a different signal for us). A side effect of
81 // pre-registering for signal notifications is that when the child
82 // exits, we will have already registered for an additional
83 // notification we don't need to consume. If another signal arrives,
84 // this future's task will be notified/woken up again. Since the
85 // futures model allows for spurious wake ups this extra wakeup
86 // should not cause significant issues with parent futures.
87 let registered_interest = self.signal.poll_recv(cx).is_pending();
88
89 if let Some(status) = self.inner_mut().try_wait()? {
90 return Poll::Ready(Ok(status));
91 }
92
93 // If our attempt to poll for the next signal was not ready, then
94 // we've arranged for our task to get notified and we can bail out.
95 if registered_interest {
96 return Poll::Pending;
97 } else {
98 // Otherwise, if the signal stream delivered a signal to us, we
99 // won't get notified at the next signal, so we'll loop and try
100 // again.
101 continue;
102 }
103 }
104 }
105}
106
107impl<W, Q, S> Kill for Reaper<W, Q, S>
108where
109 W: Kill + Wait,
110 Q: OrphanQueue<W>,
111{
112 fn kill(&mut self) -> io::Result<()> {
113 self.inner_mut().kill()
114 }
115}
116
117impl<W, Q, S> Drop for Reaper<W, Q, S>
118where
119 W: Wait,
120 Q: OrphanQueue<W>,
121{
122 fn drop(&mut self) {
123 if let Ok(Some(_)) = self.inner_mut().try_wait() {
124 return;
125 }
126
127 let orphan = self.inner.take().unwrap();
128 self.orphan_queue.push_orphan(orphan);
129 }
130}
131
132#[cfg(all(test, not(loom)))]
133mod test {
134 use super::*;
135
136 use crate::process::unix::orphan::test::MockQueue;
137 use futures::future::FutureExt;
138 use std::os::unix::process::ExitStatusExt;
139 use std::process::ExitStatus;
140 use std::task::Context;
141 use std::task::Poll;
142
143 #[derive(Debug)]
144 struct MockWait {
145 total_kills: usize,
146 total_waits: usize,
147 num_wait_until_status: usize,
148 status: ExitStatus,
149 }
150
151 impl MockWait {
152 fn new(status: ExitStatus, num_wait_until_status: usize) -> Self {
153 Self {
154 total_kills: 0,
155 total_waits: 0,
156 num_wait_until_status,
157 status,
158 }
159 }
160 }
161
162 impl Wait for MockWait {
163 fn id(&self) -> u32 {
164 0
165 }
166
167 fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> {
168 let ret = if self.num_wait_until_status == self.total_waits {
169 Some(self.status)
170 } else {
171 None
172 };
173
174 self.total_waits += 1;
175 Ok(ret)
176 }
177 }
178
179 impl Kill for MockWait {
180 fn kill(&mut self) -> io::Result<()> {
181 self.total_kills += 1;
182 Ok(())
183 }
184 }
185
186 struct MockStream {
187 total_polls: usize,
188 values: Vec<Option<()>>,
189 }
190
191 impl MockStream {
192 fn new(values: Vec<Option<()>>) -> Self {
193 Self {
194 total_polls: 0,
195 values,
196 }
197 }
198 }
199
200 impl InternalStream for MockStream {
201 fn poll_recv(&mut self, _cx: &mut Context<'_>) -> Poll<Option<()>> {
202 self.total_polls += 1;
203 match self.values.remove(0) {
204 Some(()) => Poll::Ready(Some(())),
205 None => Poll::Pending,
206 }
207 }
208 }
209
210 #[test]
211 fn reaper() {
212 let exit = ExitStatus::from_raw(0);
213 let mock = MockWait::new(exit, 3);
214 let mut grim = Reaper::new(
215 mock,
216 MockQueue::new(),
217 MockStream::new(vec![None, Some(()), None, None, None]),
218 );
219
220 let waker = futures::task::noop_waker();
221 let mut context = Context::from_waker(&waker);
222
223 // Not yet exited, interest registered
224 assert!(grim.poll_unpin(&mut context).is_pending());
225 assert_eq!(1, grim.signal.total_polls);
226 assert_eq!(1, grim.total_waits);
227 assert!(grim.orphan_queue.all_enqueued.borrow().is_empty());
228
229 // Not yet exited, couldn't register interest the first time
230 // but managed to register interest the second time around
231 assert!(grim.poll_unpin(&mut context).is_pending());
232 assert_eq!(3, grim.signal.total_polls);
233 assert_eq!(3, grim.total_waits);
234 assert!(grim.orphan_queue.all_enqueued.borrow().is_empty());
235
236 // Exited
237 if let Poll::Ready(r) = grim.poll_unpin(&mut context) {
238 assert!(r.is_ok());
239 let exit_code = r.unwrap();
240 assert_eq!(exit_code, exit);
241 } else {
242 unreachable!();
243 }
244 assert_eq!(4, grim.signal.total_polls);
245 assert_eq!(4, grim.total_waits);
246 assert!(grim.orphan_queue.all_enqueued.borrow().is_empty());
247 }
248
249 #[test]
250 fn kill() {
251 let exit = ExitStatus::from_raw(0);
252 let mut grim = Reaper::new(
253 MockWait::new(exit, 0),
254 MockQueue::new(),
255 MockStream::new(vec![None]),
256 );
257
258 grim.kill().unwrap();
259 assert_eq!(1, grim.total_kills);
260 assert!(grim.orphan_queue.all_enqueued.borrow().is_empty());
261 }
262
263 #[test]
264 fn drop_reaps_if_possible() {
265 let exit = ExitStatus::from_raw(0);
266 let mut mock = MockWait::new(exit, 0);
267
268 {
269 let queue = MockQueue::new();
270
271 let grim = Reaper::new(&mut mock, &queue, MockStream::new(vec![]));
272
273 drop(grim);
274
275 assert!(queue.all_enqueued.borrow().is_empty());
276 }
277
278 assert_eq!(1, mock.total_waits);
279 assert_eq!(0, mock.total_kills);
280 }
281
282 #[test]
283 fn drop_enqueues_orphan_if_wait_fails() {
284 let exit = ExitStatus::from_raw(0);
285 let mut mock = MockWait::new(exit, 2);
286
287 {
288 let queue = MockQueue::<&mut MockWait>::new();
289 let grim = Reaper::new(&mut mock, &queue, MockStream::new(vec![]));
290 drop(grim);
291
292 assert_eq!(1, queue.all_enqueued.borrow().len());
293 }
294
295 assert_eq!(1, mock.total_waits);
296 assert_eq!(0, mock.total_kills);
297 }
298}
299