1 | //! Unix handling of child processes. |
2 | //! |
3 | //! Right now the only "fancy" thing about this is how we implement the |
4 | //! `Future` implementation on `Child` to get the exit status. Unix offers |
5 | //! no way to register a child with epoll, and the only real way to get a |
6 | //! notification when a process exits is the SIGCHLD signal. |
7 | //! |
8 | //! Signal handling in general is *super* hairy and complicated, and it's even |
9 | //! more complicated here with the fact that signals are coalesced, so we may |
10 | //! not get a SIGCHLD-per-child. |
11 | //! |
12 | //! Our best approximation here is to check *all spawned processes* for all |
//! SIGCHLD signals received. To do that we create a `Signal`, implemented in
//! `crate::signal::unix`, which is a stream over signals being received.
15 | //! |
16 | //! Later when we poll the process's exit status we simply check to see if a |
17 | //! SIGCHLD has happened since we last checked, and while that returns "yes" we |
18 | //! keep trying. |
19 | //! |
20 | //! Note that this means that this isn't really scalable, but then again |
21 | //! processes in general aren't scalable (e.g. millions) so it shouldn't be that |
22 | //! bad in theory... |
23 | |
24 | pub(crate) mod orphan; |
25 | use orphan::{OrphanQueue, OrphanQueueImpl, Wait}; |
26 | |
27 | mod reap; |
28 | use reap::Reaper; |
29 | |
30 | use crate::io::{AsyncRead, AsyncWrite, PollEvented, ReadBuf}; |
31 | use crate::process::kill::Kill; |
32 | use crate::process::SpawnedChild; |
33 | use crate::runtime::signal::Handle as SignalHandle; |
34 | use crate::signal::unix::{signal, Signal, SignalKind}; |
35 | |
36 | use mio::event::Source; |
37 | use mio::unix::SourceFd; |
38 | use std::fmt; |
39 | use std::fs::File; |
40 | use std::future::Future; |
41 | use std::io; |
42 | use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd, RawFd}; |
43 | use std::pin::Pin; |
44 | use std::process::{Child as StdChild, ExitStatus, Stdio}; |
45 | use std::task::Context; |
46 | use std::task::Poll; |
47 | |
48 | impl Wait for StdChild { |
49 | fn id(&self) -> u32 { |
50 | self.id() |
51 | } |
52 | |
53 | fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> { |
54 | self.try_wait() |
55 | } |
56 | } |
57 | |
58 | impl Kill for StdChild { |
59 | fn kill(&mut self) -> io::Result<()> { |
60 | self.kill() |
61 | } |
62 | } |
63 | |
64 | cfg_not_has_const_mutex_new! { |
65 | fn get_orphan_queue() -> &'static OrphanQueueImpl<StdChild> { |
66 | use crate::util::once_cell::OnceCell; |
67 | |
68 | static ORPHAN_QUEUE: OnceCell<OrphanQueueImpl<StdChild>> = OnceCell::new(); |
69 | |
70 | ORPHAN_QUEUE.get(OrphanQueueImpl::new) |
71 | } |
72 | } |
73 | |
74 | cfg_has_const_mutex_new! { |
75 | fn get_orphan_queue() -> &'static OrphanQueueImpl<StdChild> { |
76 | static ORPHAN_QUEUE: OrphanQueueImpl<StdChild> = OrphanQueueImpl::new(); |
77 | |
78 | &ORPHAN_QUEUE |
79 | } |
80 | } |
81 | |
82 | pub(crate) struct GlobalOrphanQueue; |
83 | |
84 | impl fmt::Debug for GlobalOrphanQueue { |
85 | fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { |
86 | get_orphan_queue().fmt(fmt) |
87 | } |
88 | } |
89 | |
90 | impl GlobalOrphanQueue { |
91 | pub(crate) fn reap_orphans(handle: &SignalHandle) { |
92 | get_orphan_queue().reap_orphans(handle); |
93 | } |
94 | } |
95 | |
96 | impl OrphanQueue<StdChild> for GlobalOrphanQueue { |
97 | fn push_orphan(&self, orphan: StdChild) { |
98 | get_orphan_queue().push_orphan(orphan); |
99 | } |
100 | } |
101 | |
102 | #[must_use = "futures do nothing unless polled" ] |
103 | pub(crate) struct Child { |
104 | inner: Reaper<StdChild, GlobalOrphanQueue, Signal>, |
105 | } |
106 | |
107 | impl fmt::Debug for Child { |
108 | fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { |
109 | fmt.debug_struct("Child" ) |
110 | .field("pid" , &self.inner.id()) |
111 | .finish() |
112 | } |
113 | } |
114 | |
115 | pub(crate) fn spawn_child(cmd: &mut std::process::Command) -> io::Result<SpawnedChild> { |
116 | let mut child = cmd.spawn()?; |
117 | let stdin = child.stdin.take().map(stdio).transpose()?; |
118 | let stdout = child.stdout.take().map(stdio).transpose()?; |
119 | let stderr = child.stderr.take().map(stdio).transpose()?; |
120 | |
121 | let signal = signal(SignalKind::child())?; |
122 | |
123 | Ok(SpawnedChild { |
124 | child: Child { |
125 | inner: Reaper::new(child, GlobalOrphanQueue, signal), |
126 | }, |
127 | stdin, |
128 | stdout, |
129 | stderr, |
130 | }) |
131 | } |
132 | |
133 | impl Child { |
134 | pub(crate) fn id(&self) -> u32 { |
135 | self.inner.id() |
136 | } |
137 | |
138 | pub(crate) fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> { |
139 | self.inner.inner_mut().try_wait() |
140 | } |
141 | } |
142 | |
/// `Kill` for the async child: the request is forwarded to the inner reaper.
impl Kill for Child {
    /// Calls `kill` on `self.inner` and returns its result unchanged.
    fn kill(&mut self) -> io::Result<()> {
        self.inner.kill()
    }
}
148 | |
149 | impl Future for Child { |
150 | type Output = io::Result<ExitStatus>; |
151 | |
152 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
153 | Pin::new(&mut self.inner).poll(cx) |
154 | } |
155 | } |
156 | |
/// An owned file descriptor for one end of a child's stdio stream.
///
/// The descriptor is closed when the `Pipe` is dropped.
#[derive(Debug)]
pub(crate) struct Pipe {
    // Actually a pipe is not a `File`. However, we are reusing `File` to get
    // close-on-drop. This is a similar trick to the one `mio` uses.
    fd: File,
}

impl<T: IntoRawFd> From<T> for Pipe {
    /// Takes ownership of the descriptor behind `fd`.
    fn from(fd: T) -> Self {
        // SAFETY: `into_raw_fd` consumes `fd` and transfers ownership of its
        // open descriptor to the caller, so the new `File` is the sole owner
        // and is allowed to close it on drop.
        let fd = unsafe { File::from_raw_fd(fd.into_raw_fd()) };
        Self { fd }
    }
}

// `Read`/`Write` are implemented for `&Pipe` (mirroring `&File`) so that I/O
// can be performed through a shared reference.
impl<'a> io::Read for &'a Pipe {
    fn read(&mut self, bytes: &mut [u8]) -> io::Result<usize> {
        (&self.fd).read(bytes)
    }
}

impl<'a> io::Write for &'a Pipe {
    fn write(&mut self, bytes: &[u8]) -> io::Result<usize> {
        (&self.fd).write(bytes)
    }

    fn flush(&mut self) -> io::Result<()> {
        (&self.fd).flush()
    }

    fn write_vectored(&mut self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> {
        (&self.fd).write_vectored(bufs)
    }
}

impl AsRawFd for Pipe {
    fn as_raw_fd(&self) -> RawFd {
        self.fd.as_raw_fd()
    }
}

impl AsFd for Pipe {
    /// Borrows the descriptor for as long as `self` is borrowed.
    fn as_fd(&self) -> BorrowedFd<'_> {
        // `File` already provides a safe `AsFd` impl with the right lifetime,
        // so there is no need to rebuild the borrow from a raw fd.
        self.fd.as_fd()
    }
}
202 | |
203 | fn convert_to_blocking_file(io: ChildStdio) -> io::Result<File> { |
204 | let mut fd = io.inner.into_inner()?.fd; |
205 | |
206 | // Ensure that the fd to be inherited is set to *blocking* mode, as this |
207 | // is the default that virtually all programs expect to have. Those |
208 | // programs that know how to work with nonblocking stdio will know how to |
209 | // change it to nonblocking mode. |
210 | set_nonblocking(&mut fd, false)?; |
211 | |
212 | Ok(fd) |
213 | } |
214 | |
215 | pub(crate) fn convert_to_stdio(io: ChildStdio) -> io::Result<Stdio> { |
216 | convert_to_blocking_file(io).map(Stdio::from) |
217 | } |
218 | |
219 | impl Source for Pipe { |
220 | fn register( |
221 | &mut self, |
222 | registry: &mio::Registry, |
223 | token: mio::Token, |
224 | interest: mio::Interest, |
225 | ) -> io::Result<()> { |
226 | SourceFd(&self.as_raw_fd()).register(registry, token, interest) |
227 | } |
228 | |
229 | fn reregister( |
230 | &mut self, |
231 | registry: &mio::Registry, |
232 | token: mio::Token, |
233 | interest: mio::Interest, |
234 | ) -> io::Result<()> { |
235 | SourceFd(&self.as_raw_fd()).reregister(registry, token, interest) |
236 | } |
237 | |
238 | fn deregister(&mut self, registry: &mio::Registry) -> io::Result<()> { |
239 | SourceFd(&self.as_raw_fd()).deregister(registry) |
240 | } |
241 | } |
242 | |
243 | pub(crate) struct ChildStdio { |
244 | inner: PollEvented<Pipe>, |
245 | } |
246 | |
247 | impl ChildStdio { |
248 | pub(super) fn into_owned_fd(self) -> io::Result<OwnedFd> { |
249 | convert_to_blocking_file(self).map(OwnedFd::from) |
250 | } |
251 | } |
252 | |
impl fmt::Debug for ChildStdio {
    /// Delegates formatting to the `inner` field, so the output is whatever
    /// that field prints and carries no `ChildStdio` wrapper of its own.
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.inner.fmt(fmt)
    }
}
258 | |
impl AsRawFd for ChildStdio {
    /// Returns the raw descriptor reported by `self.inner`. Ownership is not
    /// transferred; the descriptor remains owned by this handle.
    fn as_raw_fd(&self) -> RawFd {
        self.inner.as_raw_fd()
    }
}
264 | |
265 | impl AsFd for ChildStdio { |
266 | fn as_fd(&self) -> BorrowedFd<'_> { |
267 | unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) } |
268 | } |
269 | } |
270 | |
271 | impl AsyncWrite for ChildStdio { |
272 | fn poll_write( |
273 | self: Pin<&mut Self>, |
274 | cx: &mut Context<'_>, |
275 | buf: &[u8], |
276 | ) -> Poll<io::Result<usize>> { |
277 | self.inner.poll_write(cx, buf) |
278 | } |
279 | |
280 | fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
281 | Poll::Ready(Ok(())) |
282 | } |
283 | |
284 | fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
285 | Poll::Ready(Ok(())) |
286 | } |
287 | |
288 | fn poll_write_vectored( |
289 | self: Pin<&mut Self>, |
290 | cx: &mut Context<'_>, |
291 | bufs: &[io::IoSlice<'_>], |
292 | ) -> Poll<Result<usize, io::Error>> { |
293 | self.inner.poll_write_vectored(cx, bufs) |
294 | } |
295 | |
296 | fn is_write_vectored(&self) -> bool { |
297 | true |
298 | } |
299 | } |
300 | |
/// Asynchronous reads are forwarded to the inner `PollEvented<Pipe>`.
impl AsyncRead for ChildStdio {
    /// Polls the inner handle for data, filling `buf` on success.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        // SAFETY: pipes support reading into uninitialized memory
        unsafe { self.inner.poll_read(cx, buf) }
    }
}
311 | |
312 | fn set_nonblocking<T: AsRawFd>(fd: &mut T, nonblocking: bool) -> io::Result<()> { |
313 | unsafe { |
314 | let fd = fd.as_raw_fd(); |
315 | let previous = libc::fcntl(fd, libc::F_GETFL); |
316 | if previous == -1 { |
317 | return Err(io::Error::last_os_error()); |
318 | } |
319 | |
320 | let new = if nonblocking { |
321 | previous | libc::O_NONBLOCK |
322 | } else { |
323 | previous & !libc::O_NONBLOCK |
324 | }; |
325 | |
326 | let r = libc::fcntl(fd, libc::F_SETFL, new); |
327 | if r == -1 { |
328 | return Err(io::Error::last_os_error()); |
329 | } |
330 | } |
331 | |
332 | Ok(()) |
333 | } |
334 | |
335 | pub(super) fn stdio<T>(io: T) -> io::Result<ChildStdio> |
336 | where |
337 | T: IntoRawFd, |
338 | { |
339 | // Set the fd to nonblocking before we pass it to the event loop |
340 | let mut pipe = Pipe::from(io); |
341 | set_nonblocking(&mut pipe, true)?; |
342 | |
343 | PollEvented::new(pipe).map(|inner| ChildStdio { inner }) |
344 | } |
345 | |