1 | //! Unix handling of child processes. |
2 | //! |
//! Right now the only "fancy" thing about this is how we implement `Future`
//! for `Child` to get the exit status. Unix offers
5 | //! no way to register a child with epoll, and the only real way to get a |
6 | //! notification when a process exits is the SIGCHLD signal. |
7 | //! |
8 | //! Signal handling in general is *super* hairy and complicated, and it's even |
9 | //! more complicated here with the fact that signals are coalesced, so we may |
10 | //! not get a SIGCHLD-per-child. |
11 | //! |
//! Our best approximation here is to check *all spawned processes* for all
//! SIGCHLD signals received. To do that we create a `Signal`, implemented in
//! the `crate::signal::unix` module, which is a stream over signals being received.
15 | //! |
16 | //! Later when we poll the process's exit status we simply check to see if a |
17 | //! SIGCHLD has happened since we last checked, and while that returns "yes" we |
18 | //! keep trying. |
19 | //! |
20 | //! Note that this means that this isn't really scalable, but then again |
21 | //! processes in general aren't scalable (e.g. millions) so it shouldn't be that |
22 | //! bad in theory... |
23 | |
24 | pub(crate) mod orphan; |
25 | use orphan::{OrphanQueue, OrphanQueueImpl, Wait}; |
26 | |
27 | mod reap; |
28 | use reap::Reaper; |
29 | |
30 | #[cfg (all(target_os = "linux" , feature = "rt" ))] |
31 | mod pidfd_reaper; |
32 | |
33 | use crate::io::{AsyncRead, AsyncWrite, PollEvented, ReadBuf}; |
34 | use crate::process::kill::Kill; |
35 | use crate::process::SpawnedChild; |
36 | use crate::runtime::signal::Handle as SignalHandle; |
37 | use crate::signal::unix::{signal, Signal, SignalKind}; |
38 | |
39 | use mio::event::Source; |
40 | use mio::unix::SourceFd; |
41 | use std::fmt; |
42 | use std::fs::File; |
43 | use std::future::Future; |
44 | use std::io; |
45 | use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd, RawFd}; |
46 | use std::pin::Pin; |
47 | use std::process::{Child as StdChild, ExitStatus, Stdio}; |
48 | use std::task::Context; |
49 | use std::task::Poll; |
50 | |
51 | impl Wait for StdChild { |
52 | fn id(&self) -> u32 { |
53 | self.id() |
54 | } |
55 | |
56 | fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> { |
57 | self.try_wait() |
58 | } |
59 | } |
60 | |
61 | impl Kill for StdChild { |
62 | fn kill(&mut self) -> io::Result<()> { |
63 | self.kill() |
64 | } |
65 | } |
66 | |
67 | cfg_not_has_const_mutex_new! { |
68 | fn get_orphan_queue() -> &'static OrphanQueueImpl<StdChild> { |
69 | use crate::util::once_cell::OnceCell; |
70 | |
71 | static ORPHAN_QUEUE: OnceCell<OrphanQueueImpl<StdChild>> = OnceCell::new(); |
72 | |
73 | ORPHAN_QUEUE.get(OrphanQueueImpl::new) |
74 | } |
75 | } |
76 | |
77 | cfg_has_const_mutex_new! { |
78 | fn get_orphan_queue() -> &'static OrphanQueueImpl<StdChild> { |
79 | static ORPHAN_QUEUE: OrphanQueueImpl<StdChild> = OrphanQueueImpl::new(); |
80 | |
81 | &ORPHAN_QUEUE |
82 | } |
83 | } |
84 | |
85 | pub(crate) struct GlobalOrphanQueue; |
86 | |
87 | impl fmt::Debug for GlobalOrphanQueue { |
88 | fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { |
89 | get_orphan_queue().fmt(fmt) |
90 | } |
91 | } |
92 | |
93 | impl GlobalOrphanQueue { |
94 | pub(crate) fn reap_orphans(handle: &SignalHandle) { |
95 | get_orphan_queue().reap_orphans(handle); |
96 | } |
97 | } |
98 | |
99 | impl OrphanQueue<StdChild> for GlobalOrphanQueue { |
100 | fn push_orphan(&self, orphan: StdChild) { |
101 | get_orphan_queue().push_orphan(orphan); |
102 | } |
103 | } |
104 | |
105 | #[must_use = "futures do nothing unless polled" ] |
106 | pub(crate) enum Child { |
107 | SignalReaper(Reaper<StdChild, GlobalOrphanQueue, Signal>), |
108 | #[cfg (all(target_os = "linux" , feature = "rt" ))] |
109 | PidfdReaper(pidfd_reaper::PidfdReaper<StdChild, GlobalOrphanQueue>), |
110 | } |
111 | |
112 | impl fmt::Debug for Child { |
113 | fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { |
114 | fmt.debug_struct("Child" ).field(name:"pid" , &self.id()).finish() |
115 | } |
116 | } |
117 | |
118 | pub(crate) fn spawn_child(cmd: &mut std::process::Command) -> io::Result<SpawnedChild> { |
119 | let mut child = cmd.spawn()?; |
120 | let stdin = child.stdin.take().map(stdio).transpose()?; |
121 | let stdout = child.stdout.take().map(stdio).transpose()?; |
122 | let stderr = child.stderr.take().map(stdio).transpose()?; |
123 | |
124 | #[cfg (all(target_os = "linux" , feature = "rt" ))] |
125 | match pidfd_reaper::PidfdReaper::new(child, GlobalOrphanQueue) { |
126 | Ok(pidfd_reaper) => { |
127 | return Ok(SpawnedChild { |
128 | child: Child::PidfdReaper(pidfd_reaper), |
129 | stdin, |
130 | stdout, |
131 | stderr, |
132 | }) |
133 | } |
134 | Err((Some(err), _child)) => return Err(err), |
135 | Err((None, child_returned)) => child = child_returned, |
136 | } |
137 | |
138 | let signal = signal(SignalKind::child())?; |
139 | |
140 | Ok(SpawnedChild { |
141 | child: Child::SignalReaper(Reaper::new(child, GlobalOrphanQueue, signal)), |
142 | stdin, |
143 | stdout, |
144 | stderr, |
145 | }) |
146 | } |
147 | |
148 | impl Child { |
149 | pub(crate) fn id(&self) -> u32 { |
150 | match self { |
151 | Self::SignalReaper(signal_reaper: &Reaper) => signal_reaper.id(), |
152 | #[cfg (all(target_os = "linux" , feature = "rt" ))] |
153 | Self::PidfdReaper(pidfd_reaper: &PidfdReaper) => pidfd_reaper.id(), |
154 | } |
155 | } |
156 | |
157 | fn std_child(&mut self) -> &mut StdChild { |
158 | match self { |
159 | Self::SignalReaper(signal_reaper: &mut Reaper) => signal_reaper.inner_mut(), |
160 | #[cfg (all(target_os = "linux" , feature = "rt" ))] |
161 | Self::PidfdReaper(pidfd_reaper: &mut PidfdReaper) => pidfd_reaper.inner_mut(), |
162 | } |
163 | } |
164 | |
165 | pub(crate) fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> { |
166 | self.std_child().try_wait() |
167 | } |
168 | } |
169 | |
170 | impl Kill for Child { |
171 | fn kill(&mut self) -> io::Result<()> { |
172 | self.std_child().kill() |
173 | } |
174 | } |
175 | |
176 | impl Future for Child { |
177 | type Output = io::Result<ExitStatus>; |
178 | |
179 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
180 | match Pin::into_inner(self) { |
181 | Self::SignalReaper(signal_reaper: &mut Reaper) => Pin::new(pointer:signal_reaper).poll(cx), |
182 | #[cfg (all(target_os = "linux" , feature = "rt" ))] |
183 | Self::PidfdReaper(pidfd_reaper: &mut PidfdReaper) => Pin::new(pointer:pidfd_reaper).poll(cx), |
184 | } |
185 | } |
186 | } |
187 | |
/// Owning wrapper around a raw file descriptor used for child stdio.
#[derive(Debug)]
pub(crate) struct Pipe {
    // A pipe is not actually a `File`. `File` is reused here only so the
    // descriptor is closed on drop, a trick similar to the one `mio` uses.
    fd: File,
}

impl<T: IntoRawFd> From<T> for Pipe {
    /// Takes ownership of the descriptor held by `fd`.
    fn from(fd: T) -> Self {
        let raw = fd.into_raw_fd();
        // SAFETY: `into_raw_fd` consumed `fd`, so `raw` has no other owner
        // and the new `File` becomes responsible for closing it.
        Self {
            fd: unsafe { File::from_raw_fd(raw) },
        }
    }
}
201 | |
202 | impl<'a> io::Read for &'a Pipe { |
203 | fn read(&mut self, bytes: &mut [u8]) -> io::Result<usize> { |
204 | (&self.fd).read(buf:bytes) |
205 | } |
206 | } |
207 | |
208 | impl<'a> io::Write for &'a Pipe { |
209 | fn write(&mut self, bytes: &[u8]) -> io::Result<usize> { |
210 | (&self.fd).write(buf:bytes) |
211 | } |
212 | |
213 | fn flush(&mut self) -> io::Result<()> { |
214 | (&self.fd).flush() |
215 | } |
216 | |
217 | fn write_vectored(&mut self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> { |
218 | (&self.fd).write_vectored(bufs) |
219 | } |
220 | } |
221 | |
222 | impl AsRawFd for Pipe { |
223 | fn as_raw_fd(&self) -> RawFd { |
224 | self.fd.as_raw_fd() |
225 | } |
226 | } |
227 | |
228 | impl AsFd for Pipe { |
229 | fn as_fd(&self) -> BorrowedFd<'_> { |
230 | unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) } |
231 | } |
232 | } |
233 | |
234 | fn convert_to_blocking_file(io: ChildStdio) -> io::Result<File> { |
235 | let mut fd: File = io.inner.into_inner()?.fd; |
236 | |
237 | // Ensure that the fd to be inherited is set to *blocking* mode, as this |
238 | // is the default that virtually all programs expect to have. Those |
239 | // programs that know how to work with nonblocking stdio will know how to |
240 | // change it to nonblocking mode. |
241 | set_nonblocking(&mut fd, nonblocking:false)?; |
242 | |
243 | Ok(fd) |
244 | } |
245 | |
246 | pub(crate) fn convert_to_stdio(io: ChildStdio) -> io::Result<Stdio> { |
247 | convert_to_blocking_file(io).map(op:Stdio::from) |
248 | } |
249 | |
250 | impl Source for Pipe { |
251 | fn register( |
252 | &mut self, |
253 | registry: &mio::Registry, |
254 | token: mio::Token, |
255 | interest: mio::Interest, |
256 | ) -> io::Result<()> { |
257 | SourceFd(&self.as_raw_fd()).register(registry, token, interests:interest) |
258 | } |
259 | |
260 | fn reregister( |
261 | &mut self, |
262 | registry: &mio::Registry, |
263 | token: mio::Token, |
264 | interest: mio::Interest, |
265 | ) -> io::Result<()> { |
266 | SourceFd(&self.as_raw_fd()).reregister(registry, token, interests:interest) |
267 | } |
268 | |
269 | fn deregister(&mut self, registry: &mio::Registry) -> io::Result<()> { |
270 | SourceFd(&self.as_raw_fd()).deregister(registry) |
271 | } |
272 | } |
273 | |
274 | pub(crate) struct ChildStdio { |
275 | inner: PollEvented<Pipe>, |
276 | } |
277 | |
278 | impl ChildStdio { |
279 | pub(super) fn into_owned_fd(self) -> io::Result<OwnedFd> { |
280 | convert_to_blocking_file(self).map(op:OwnedFd::from) |
281 | } |
282 | } |
283 | |
284 | impl fmt::Debug for ChildStdio { |
285 | fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { |
286 | self.inner.fmt(fmt) |
287 | } |
288 | } |
289 | |
290 | impl AsRawFd for ChildStdio { |
291 | fn as_raw_fd(&self) -> RawFd { |
292 | self.inner.as_raw_fd() |
293 | } |
294 | } |
295 | |
296 | impl AsFd for ChildStdio { |
297 | fn as_fd(&self) -> BorrowedFd<'_> { |
298 | unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) } |
299 | } |
300 | } |
301 | |
302 | impl AsyncWrite for ChildStdio { |
303 | fn poll_write( |
304 | self: Pin<&mut Self>, |
305 | cx: &mut Context<'_>, |
306 | buf: &[u8], |
307 | ) -> Poll<io::Result<usize>> { |
308 | self.inner.poll_write(cx, buf) |
309 | } |
310 | |
311 | fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
312 | Poll::Ready(Ok(())) |
313 | } |
314 | |
315 | fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
316 | Poll::Ready(Ok(())) |
317 | } |
318 | |
319 | fn poll_write_vectored( |
320 | self: Pin<&mut Self>, |
321 | cx: &mut Context<'_>, |
322 | bufs: &[io::IoSlice<'_>], |
323 | ) -> Poll<Result<usize, io::Error>> { |
324 | self.inner.poll_write_vectored(cx, bufs) |
325 | } |
326 | |
327 | fn is_write_vectored(&self) -> bool { |
328 | true |
329 | } |
330 | } |
331 | |
332 | impl AsyncRead for ChildStdio { |
333 | fn poll_read( |
334 | self: Pin<&mut Self>, |
335 | cx: &mut Context<'_>, |
336 | buf: &mut ReadBuf<'_>, |
337 | ) -> Poll<io::Result<()>> { |
338 | // Safety: pipes support reading into uninitialized memory |
339 | unsafe { self.inner.poll_read(cx, buf) } |
340 | } |
341 | } |
342 | |
343 | fn set_nonblocking<T: AsRawFd>(fd: &mut T, nonblocking: bool) -> io::Result<()> { |
344 | unsafe { |
345 | let fd: i32 = fd.as_raw_fd(); |
346 | let previous: i32 = libc::fcntl(fd, cmd:libc::F_GETFL); |
347 | if previous == -1 { |
348 | return Err(io::Error::last_os_error()); |
349 | } |
350 | |
351 | let new: i32 = if nonblocking { |
352 | previous | libc::O_NONBLOCK |
353 | } else { |
354 | previous & !libc::O_NONBLOCK |
355 | }; |
356 | |
357 | let r: i32 = libc::fcntl(fd, cmd:libc::F_SETFL, new); |
358 | if r == -1 { |
359 | return Err(io::Error::last_os_error()); |
360 | } |
361 | } |
362 | |
363 | Ok(()) |
364 | } |
365 | |
366 | pub(super) fn stdio<T>(io: T) -> io::Result<ChildStdio> |
367 | where |
368 | T: IntoRawFd, |
369 | { |
370 | // Set the fd to nonblocking before we pass it to the event loop |
371 | let mut pipe: Pipe = Pipe::from(io); |
372 | set_nonblocking(&mut pipe, nonblocking:true)?; |
373 | |
374 | PollEvented::new(pipe).map(|inner: PollEvented| ChildStdio { inner }) |
375 | } |
376 | |