| 1 | //! Unix handling of child processes. |
| 2 | //! |
| 3 | //! Right now the only "fancy" thing about this is how we implement the |
| 4 | //! `Future` implementation on `Child` to get the exit status. Unix offers |
| 5 | //! no way to register a child with epoll, and the only real way to get a |
| 6 | //! notification when a process exits is the SIGCHLD signal. |
| 7 | //! |
| 8 | //! Signal handling in general is *super* hairy and complicated, and it's even |
| 9 | //! more complicated here with the fact that signals are coalesced, so we may |
| 10 | //! not get a SIGCHLD-per-child. |
| 11 | //! |
| 12 | //! Our best approximation here is to check *all spawned processes* for all |
| 13 | //! SIGCHLD signals received. To do that we create a `Signal`, implemented in |
| 14 | //! the `tokio-net` crate, which is a stream over signals being received. |
| 15 | //! |
| 16 | //! Later when we poll the process's exit status we simply check to see if a |
| 17 | //! SIGCHLD has happened since we last checked, and while that returns "yes" we |
| 18 | //! keep trying. |
| 19 | //! |
| 20 | //! Note that this means that this isn't really scalable, but then again |
| 21 | //! processes in general aren't scalable (e.g. millions) so it shouldn't be that |
| 22 | //! bad in theory... |
| 23 | |
| 24 | pub(crate) mod orphan; |
| 25 | use orphan::{OrphanQueue, OrphanQueueImpl, Wait}; |
| 26 | |
| 27 | mod reap; |
| 28 | use reap::Reaper; |
| 29 | |
| 30 | #[cfg (all(target_os = "linux" , feature = "rt" ))] |
| 31 | mod pidfd_reaper; |
| 32 | |
| 33 | use crate::io::{AsyncRead, AsyncWrite, PollEvented, ReadBuf}; |
| 34 | use crate::process::kill::Kill; |
| 35 | use crate::process::SpawnedChild; |
| 36 | use crate::runtime::signal::Handle as SignalHandle; |
| 37 | use crate::signal::unix::{signal, Signal, SignalKind}; |
| 38 | |
| 39 | use mio::event::Source; |
| 40 | use mio::unix::SourceFd; |
| 41 | use std::fmt; |
| 42 | use std::fs::File; |
| 43 | use std::future::Future; |
| 44 | use std::io; |
| 45 | use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd, RawFd}; |
| 46 | use std::pin::Pin; |
| 47 | use std::process::{Child as StdChild, ExitStatus, Stdio}; |
| 48 | use std::task::Context; |
| 49 | use std::task::Poll; |
| 50 | |
| 51 | impl Wait for StdChild { |
| 52 | fn id(&self) -> u32 { |
| 53 | self.id() |
| 54 | } |
| 55 | |
| 56 | fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> { |
| 57 | self.try_wait() |
| 58 | } |
| 59 | } |
| 60 | |
| 61 | impl Kill for StdChild { |
| 62 | fn kill(&mut self) -> io::Result<()> { |
| 63 | self.kill() |
| 64 | } |
| 65 | } |
| 66 | |
| 67 | cfg_not_has_const_mutex_new! { |
| 68 | fn get_orphan_queue() -> &'static OrphanQueueImpl<StdChild> { |
| 69 | use crate::util::once_cell::OnceCell; |
| 70 | |
| 71 | static ORPHAN_QUEUE: OnceCell<OrphanQueueImpl<StdChild>> = OnceCell::new(); |
| 72 | |
| 73 | ORPHAN_QUEUE.get(OrphanQueueImpl::new) |
| 74 | } |
| 75 | } |
| 76 | |
| 77 | cfg_has_const_mutex_new! { |
| 78 | fn get_orphan_queue() -> &'static OrphanQueueImpl<StdChild> { |
| 79 | static ORPHAN_QUEUE: OrphanQueueImpl<StdChild> = OrphanQueueImpl::new(); |
| 80 | |
| 81 | &ORPHAN_QUEUE |
| 82 | } |
| 83 | } |
| 84 | |
| 85 | pub(crate) struct GlobalOrphanQueue; |
| 86 | |
| 87 | impl fmt::Debug for GlobalOrphanQueue { |
| 88 | fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 89 | get_orphan_queue().fmt(fmt) |
| 90 | } |
| 91 | } |
| 92 | |
| 93 | impl GlobalOrphanQueue { |
| 94 | pub(crate) fn reap_orphans(handle: &SignalHandle) { |
| 95 | get_orphan_queue().reap_orphans(handle); |
| 96 | } |
| 97 | } |
| 98 | |
| 99 | impl OrphanQueue<StdChild> for GlobalOrphanQueue { |
| 100 | fn push_orphan(&self, orphan: StdChild) { |
| 101 | get_orphan_queue().push_orphan(orphan); |
| 102 | } |
| 103 | } |
| 104 | |
| 105 | #[must_use = "futures do nothing unless polled" ] |
| 106 | pub(crate) enum Child { |
| 107 | SignalReaper(Reaper<StdChild, GlobalOrphanQueue, Signal>), |
| 108 | #[cfg (all(target_os = "linux" , feature = "rt" ))] |
| 109 | PidfdReaper(pidfd_reaper::PidfdReaper<StdChild, GlobalOrphanQueue>), |
| 110 | } |
| 111 | |
| 112 | impl fmt::Debug for Child { |
| 113 | fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 114 | fmt.debug_struct("Child" ).field(name:"pid" , &self.id()).finish() |
| 115 | } |
| 116 | } |
| 117 | |
| 118 | pub(crate) fn spawn_child(cmd: &mut std::process::Command) -> io::Result<SpawnedChild> { |
| 119 | let mut child = cmd.spawn()?; |
| 120 | let stdin = child.stdin.take().map(stdio).transpose()?; |
| 121 | let stdout = child.stdout.take().map(stdio).transpose()?; |
| 122 | let stderr = child.stderr.take().map(stdio).transpose()?; |
| 123 | |
| 124 | #[cfg (all(target_os = "linux" , feature = "rt" ))] |
| 125 | match pidfd_reaper::PidfdReaper::new(child, GlobalOrphanQueue) { |
| 126 | Ok(pidfd_reaper) => { |
| 127 | return Ok(SpawnedChild { |
| 128 | child: Child::PidfdReaper(pidfd_reaper), |
| 129 | stdin, |
| 130 | stdout, |
| 131 | stderr, |
| 132 | }) |
| 133 | } |
| 134 | Err((Some(err), _child)) => return Err(err), |
| 135 | Err((None, child_returned)) => child = child_returned, |
| 136 | } |
| 137 | |
| 138 | let signal = signal(SignalKind::child())?; |
| 139 | |
| 140 | Ok(SpawnedChild { |
| 141 | child: Child::SignalReaper(Reaper::new(child, GlobalOrphanQueue, signal)), |
| 142 | stdin, |
| 143 | stdout, |
| 144 | stderr, |
| 145 | }) |
| 146 | } |
| 147 | |
| 148 | impl Child { |
| 149 | pub(crate) fn id(&self) -> u32 { |
| 150 | match self { |
| 151 | Self::SignalReaper(signal_reaper: &Reaper) => signal_reaper.id(), |
| 152 | #[cfg (all(target_os = "linux" , feature = "rt" ))] |
| 153 | Self::PidfdReaper(pidfd_reaper: &PidfdReaper) => pidfd_reaper.id(), |
| 154 | } |
| 155 | } |
| 156 | |
| 157 | fn std_child(&mut self) -> &mut StdChild { |
| 158 | match self { |
| 159 | Self::SignalReaper(signal_reaper: &mut Reaper) => signal_reaper.inner_mut(), |
| 160 | #[cfg (all(target_os = "linux" , feature = "rt" ))] |
| 161 | Self::PidfdReaper(pidfd_reaper: &mut PidfdReaper) => pidfd_reaper.inner_mut(), |
| 162 | } |
| 163 | } |
| 164 | |
| 165 | pub(crate) fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> { |
| 166 | self.std_child().try_wait() |
| 167 | } |
| 168 | } |
| 169 | |
| 170 | impl Kill for Child { |
| 171 | fn kill(&mut self) -> io::Result<()> { |
| 172 | self.std_child().kill() |
| 173 | } |
| 174 | } |
| 175 | |
| 176 | impl Future for Child { |
| 177 | type Output = io::Result<ExitStatus>; |
| 178 | |
| 179 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| 180 | match Pin::into_inner(self) { |
| 181 | Self::SignalReaper(signal_reaper: &mut Reaper) => Pin::new(pointer:signal_reaper).poll(cx), |
| 182 | #[cfg (all(target_os = "linux" , feature = "rt" ))] |
| 183 | Self::PidfdReaper(pidfd_reaper: &mut PidfdReaper) => Pin::new(pointer:pidfd_reaper).poll(cx), |
| 184 | } |
| 185 | } |
| 186 | } |
| 187 | |
| 188 | #[derive (Debug)] |
| 189 | pub(crate) struct Pipe { |
| 190 | // Actually a pipe is not a File. However, we are reusing `File` to get |
| 191 | // close on drop. This is a similar trick as `mio`. |
| 192 | fd: File, |
| 193 | } |
| 194 | |
| 195 | impl<T: IntoRawFd> From<T> for Pipe { |
| 196 | fn from(fd: T) -> Self { |
| 197 | let fd: File = unsafe { File::from_raw_fd(fd.into_raw_fd()) }; |
| 198 | Self { fd } |
| 199 | } |
| 200 | } |
| 201 | |
| 202 | impl<'a> io::Read for &'a Pipe { |
| 203 | fn read(&mut self, bytes: &mut [u8]) -> io::Result<usize> { |
| 204 | (&self.fd).read(buf:bytes) |
| 205 | } |
| 206 | } |
| 207 | |
| 208 | impl<'a> io::Write for &'a Pipe { |
| 209 | fn write(&mut self, bytes: &[u8]) -> io::Result<usize> { |
| 210 | (&self.fd).write(buf:bytes) |
| 211 | } |
| 212 | |
| 213 | fn flush(&mut self) -> io::Result<()> { |
| 214 | (&self.fd).flush() |
| 215 | } |
| 216 | |
| 217 | fn write_vectored(&mut self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> { |
| 218 | (&self.fd).write_vectored(bufs) |
| 219 | } |
| 220 | } |
| 221 | |
| 222 | impl AsRawFd for Pipe { |
| 223 | fn as_raw_fd(&self) -> RawFd { |
| 224 | self.fd.as_raw_fd() |
| 225 | } |
| 226 | } |
| 227 | |
| 228 | impl AsFd for Pipe { |
| 229 | fn as_fd(&self) -> BorrowedFd<'_> { |
| 230 | unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) } |
| 231 | } |
| 232 | } |
| 233 | |
| 234 | fn convert_to_blocking_file(io: ChildStdio) -> io::Result<File> { |
| 235 | let mut fd: File = io.inner.into_inner()?.fd; |
| 236 | |
| 237 | // Ensure that the fd to be inherited is set to *blocking* mode, as this |
| 238 | // is the default that virtually all programs expect to have. Those |
| 239 | // programs that know how to work with nonblocking stdio will know how to |
| 240 | // change it to nonblocking mode. |
| 241 | set_nonblocking(&mut fd, nonblocking:false)?; |
| 242 | |
| 243 | Ok(fd) |
| 244 | } |
| 245 | |
| 246 | pub(crate) fn convert_to_stdio(io: ChildStdio) -> io::Result<Stdio> { |
| 247 | convert_to_blocking_file(io).map(op:Stdio::from) |
| 248 | } |
| 249 | |
| 250 | impl Source for Pipe { |
| 251 | fn register( |
| 252 | &mut self, |
| 253 | registry: &mio::Registry, |
| 254 | token: mio::Token, |
| 255 | interest: mio::Interest, |
| 256 | ) -> io::Result<()> { |
| 257 | SourceFd(&self.as_raw_fd()).register(registry, token, interests:interest) |
| 258 | } |
| 259 | |
| 260 | fn reregister( |
| 261 | &mut self, |
| 262 | registry: &mio::Registry, |
| 263 | token: mio::Token, |
| 264 | interest: mio::Interest, |
| 265 | ) -> io::Result<()> { |
| 266 | SourceFd(&self.as_raw_fd()).reregister(registry, token, interests:interest) |
| 267 | } |
| 268 | |
| 269 | fn deregister(&mut self, registry: &mio::Registry) -> io::Result<()> { |
| 270 | SourceFd(&self.as_raw_fd()).deregister(registry) |
| 271 | } |
| 272 | } |
| 273 | |
| 274 | pub(crate) struct ChildStdio { |
| 275 | inner: PollEvented<Pipe>, |
| 276 | } |
| 277 | |
| 278 | impl ChildStdio { |
| 279 | pub(super) fn into_owned_fd(self) -> io::Result<OwnedFd> { |
| 280 | convert_to_blocking_file(self).map(op:OwnedFd::from) |
| 281 | } |
| 282 | } |
| 283 | |
| 284 | impl fmt::Debug for ChildStdio { |
| 285 | fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 286 | self.inner.fmt(fmt) |
| 287 | } |
| 288 | } |
| 289 | |
| 290 | impl AsRawFd for ChildStdio { |
| 291 | fn as_raw_fd(&self) -> RawFd { |
| 292 | self.inner.as_raw_fd() |
| 293 | } |
| 294 | } |
| 295 | |
| 296 | impl AsFd for ChildStdio { |
| 297 | fn as_fd(&self) -> BorrowedFd<'_> { |
| 298 | unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) } |
| 299 | } |
| 300 | } |
| 301 | |
| 302 | impl AsyncWrite for ChildStdio { |
| 303 | fn poll_write( |
| 304 | self: Pin<&mut Self>, |
| 305 | cx: &mut Context<'_>, |
| 306 | buf: &[u8], |
| 307 | ) -> Poll<io::Result<usize>> { |
| 308 | self.inner.poll_write(cx, buf) |
| 309 | } |
| 310 | |
| 311 | fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
| 312 | Poll::Ready(Ok(())) |
| 313 | } |
| 314 | |
| 315 | fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
| 316 | Poll::Ready(Ok(())) |
| 317 | } |
| 318 | |
| 319 | fn poll_write_vectored( |
| 320 | self: Pin<&mut Self>, |
| 321 | cx: &mut Context<'_>, |
| 322 | bufs: &[io::IoSlice<'_>], |
| 323 | ) -> Poll<Result<usize, io::Error>> { |
| 324 | self.inner.poll_write_vectored(cx, bufs) |
| 325 | } |
| 326 | |
| 327 | fn is_write_vectored(&self) -> bool { |
| 328 | true |
| 329 | } |
| 330 | } |
| 331 | |
| 332 | impl AsyncRead for ChildStdio { |
| 333 | fn poll_read( |
| 334 | self: Pin<&mut Self>, |
| 335 | cx: &mut Context<'_>, |
| 336 | buf: &mut ReadBuf<'_>, |
| 337 | ) -> Poll<io::Result<()>> { |
| 338 | // Safety: pipes support reading into uninitialized memory |
| 339 | unsafe { self.inner.poll_read(cx, buf) } |
| 340 | } |
| 341 | } |
| 342 | |
| 343 | fn set_nonblocking<T: AsRawFd>(fd: &mut T, nonblocking: bool) -> io::Result<()> { |
| 344 | unsafe { |
| 345 | let fd: i32 = fd.as_raw_fd(); |
| 346 | let previous: i32 = libc::fcntl(fd, cmd:libc::F_GETFL); |
| 347 | if previous == -1 { |
| 348 | return Err(io::Error::last_os_error()); |
| 349 | } |
| 350 | |
| 351 | let new: i32 = if nonblocking { |
| 352 | previous | libc::O_NONBLOCK |
| 353 | } else { |
| 354 | previous & !libc::O_NONBLOCK |
| 355 | }; |
| 356 | |
| 357 | let r: i32 = libc::fcntl(fd, cmd:libc::F_SETFL, new); |
| 358 | if r == -1 { |
| 359 | return Err(io::Error::last_os_error()); |
| 360 | } |
| 361 | } |
| 362 | |
| 363 | Ok(()) |
| 364 | } |
| 365 | |
| 366 | pub(super) fn stdio<T>(io: T) -> io::Result<ChildStdio> |
| 367 | where |
| 368 | T: IntoRawFd, |
| 369 | { |
| 370 | // Set the fd to nonblocking before we pass it to the event loop |
| 371 | let mut pipe: Pipe = Pipe::from(io); |
| 372 | set_nonblocking(&mut pipe, nonblocking:true)?; |
| 373 | |
| 374 | PollEvented::new(pipe).map(|inner: PollEvented| ChildStdio { inner }) |
| 375 | } |
| 376 | |