1//! Async interface for working with processes.
2//!
3//! This crate is an async version of [`std::process`].
4//!
5//! # Implementation
6//!
7//! A background thread named "async-process" is lazily created on first use, which waits for
8//! spawned child processes to exit and then calls the `wait()` syscall to clean up the "zombie"
9//! processes. This is unlike the `process` API in the standard library, where dropping a running
10//! `Child` leaks its resources.
11//!
12//! This crate uses [`async-io`] for async I/O on Unix-like systems and [`blocking`] for async I/O
13//! on Windows.
14//!
15//! [`async-io`]: https://docs.rs/async-io
16//! [`blocking`]: https://docs.rs/blocking
17//!
18//! # Examples
19//!
20//! Spawn a process and collect its output:
21//!
22//! ```no_run
23//! # futures_lite::future::block_on(async {
24//! use async_process::Command;
25//!
26//! let out = Command::new("echo").arg("hello").arg("world").output().await?;
27//! assert_eq!(out.stdout, b"hello world\n");
28//! # std::io::Result::Ok(()) });
29//! ```
30//!
31//! Read the output line-by-line as it gets produced:
32//!
33//! ```no_run
34//! # futures_lite::future::block_on(async {
35//! use async_process::{Command, Stdio};
36//! use futures_lite::{io::BufReader, prelude::*};
37//!
38//! let mut child = Command::new("find")
39//! .arg(".")
40//! .stdout(Stdio::piped())
41//! .spawn()?;
42//!
43//! let mut lines = BufReader::new(child.stdout.take().unwrap()).lines();
44//!
45//! while let Some(line) = lines.next().await {
46//! println!("{}", line?);
47//! }
48//! # std::io::Result::Ok(()) });
49//! ```
50
51#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
52#![doc(
53 html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
54)]
55#![doc(
56 html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
57)]
58
59use std::convert::Infallible;
60use std::ffi::OsStr;
61use std::fmt;
62use std::mem;
63use std::path::Path;
64use std::pin::Pin;
65use std::sync::atomic::{AtomicUsize, Ordering};
66use std::sync::{Arc, Mutex};
67use std::task::{Context, Poll};
68use std::thread;
69
70#[cfg(unix)]
71use async_io::Async;
72#[cfg(unix)]
73use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd};
74
75#[cfg(windows)]
76use blocking::Unblock;
77
78use async_lock::{Mutex as AsyncMutex, OnceCell};
79use event_listener::Event;
80use futures_lite::{future, io, prelude::*};
81
82#[doc(no_inline)]
83pub use std::process::{ExitStatus, Output, Stdio};
84
85#[cfg(unix)]
86pub mod unix;
87#[cfg(windows)]
88pub mod windows;
89
90mod sealed {
91 pub trait Sealed {}
92}
93
94#[cfg(test)]
95static DRIVER_THREAD_SPAWNED: std::sync::atomic::AtomicBool =
96 std::sync::atomic::AtomicBool::new(false);
97
98/// The zombie process reaper.
99///
100/// This structure reaps zombie processes and emits the `SIGCHLD` signal.
101struct Reaper {
102 /// An event delivered every time the SIGCHLD signal occurs.
103 sigchld: Event,
104
105 /// The list of zombie processes.
106 zombies: Mutex<Vec<std::process::Child>>,
107
108 /// The pipe that delivers signal notifications.
109 pipe: Pipe,
110
111 /// Locking this mutex indicates that we are polling the SIGCHLD event.
112 driver_guard: AsyncMutex<()>,
113
114 /// The number of tasks polling the SIGCHLD event.
115 ///
116 /// If this is zero, the `async-process` thread must be spawned.
117 drivers: AtomicUsize,
118
119 /// Number of live `Child` instances currently running.
120 ///
121 /// This is used to prevent the reaper thread from being spawned right as the program closes,
122 /// when the reaper thread isn't needed. This represents the number of active processes.
123 child_count: AtomicUsize,
124}
125
126impl Reaper {
127 /// Get the singleton instance of the reaper.
128 fn get() -> &'static Self {
129 static REAPER: OnceCell<Reaper> = OnceCell::new();
130
131 REAPER.get_or_init_blocking(|| Reaper {
132 sigchld: Event::new(),
133 zombies: Mutex::new(Vec::new()),
134 pipe: Pipe::new().expect("cannot create SIGCHLD pipe"),
135 driver_guard: AsyncMutex::new(()),
136 drivers: AtomicUsize::new(0),
137 child_count: AtomicUsize::new(0),
138 })
139 }
140
141 /// Ensure that the reaper is driven.
142 ///
143 /// If there are no active `driver()` callers, this will spawn the `async-process` thread.
144 #[inline]
145 fn ensure_driven(&'static self) {
146 if self
147 .drivers
148 .compare_exchange(0, 1, Ordering::SeqCst, Ordering::Acquire)
149 .is_ok()
150 {
151 self.start_driver_thread();
152 }
153 }
154
155 /// Start the `async-process` thread.
156 #[cold]
157 fn start_driver_thread(&'static self) {
158 #[cfg(test)]
159 DRIVER_THREAD_SPAWNED
160 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
161 .unwrap_or_else(|_| unreachable!("Driver thread already spawned"));
162
163 thread::Builder::new()
164 .name("async-process".to_string())
165 .spawn(move || {
166 let driver = async move {
167 // No need to bump self.drivers, it was already bumped in ensure_driven.
168 let guard = self.driver_guard.lock().await;
169 self.reap(guard).await
170 };
171
172 #[cfg(unix)]
173 async_io::block_on(driver);
174
175 #[cfg(not(unix))]
176 future::block_on(driver);
177 })
178 .expect("cannot spawn async-process thread");
179 }
180
181 /// Reap zombie processes forever.
182 async fn reap(&'static self, _driver_guard: async_lock::MutexGuard<'_, ()>) -> ! {
183 loop {
184 // Wait for the next SIGCHLD signal.
185 self.pipe.wait().await;
186
187 // Notify all listeners waiting on the SIGCHLD event.
188 self.sigchld.notify(std::usize::MAX);
189
190 // Reap zombie processes, but make sure we don't hold onto the lock for too long!
191 let mut zombies = mem::take(&mut *self.zombies.lock().unwrap());
192 let mut i = 0;
193 'reap_zombies: loop {
194 for _ in 0..50 {
195 if i >= zombies.len() {
196 break 'reap_zombies;
197 }
198
199 if let Ok(None) = zombies[i].try_wait() {
200 i += 1;
201 } else {
202 zombies.swap_remove(i);
203 }
204 }
205
206 // Be a good citizen; yield if there are a lot of processes.
207 //
208 // After we yield, check if there are more zombie processes.
209 future::yield_now().await;
210 zombies.append(&mut self.zombies.lock().unwrap());
211 }
212
213 // Put zombie processes back.
214 self.zombies.lock().unwrap().append(&mut zombies);
215 }
216 }
217
218 /// Register a process with this reaper.
219 fn register(&'static self, child: &std::process::Child) -> io::Result<()> {
220 self.ensure_driven();
221 self.pipe.register(child)
222 }
223}
224
225cfg_if::cfg_if! {
226 if #[cfg(windows)] {
227 use async_channel::{Sender, Receiver, bounded};
228 use std::ffi::c_void;
229 use std::os::windows::io::AsRawHandle;
230
231 use windows_sys::Win32::{
232 Foundation::{BOOLEAN, HANDLE},
233 System::Threading::{
234 RegisterWaitForSingleObject, INFINITE, WT_EXECUTEINWAITTHREAD, WT_EXECUTEONLYONCE,
235 },
236 };
237
238 /// Waits for the next SIGCHLD signal.
239 struct Pipe {
240 /// The sender channel for the SIGCHLD signal.
241 sender: Sender<()>,
242
243 /// The receiver channel for the SIGCHLD signal.
244 receiver: Receiver<()>,
245 }
246
247 impl Pipe {
248 /// Creates a new pipe.
249 fn new() -> io::Result<Pipe> {
250 let (sender, receiver) = bounded(1);
251 Ok(Pipe {
252 sender,
253 receiver
254 })
255 }
256
257 /// Waits for the next SIGCHLD signal.
258 async fn wait(&self) {
259 self.receiver.recv().await.ok();
260 }
261
262 /// Register a process object into this pipe.
263 fn register(&self, child: &std::process::Child) -> io::Result<()> {
264 // Called when a child exits.
265 unsafe extern "system" fn callback(_: *mut c_void, _: BOOLEAN) {
266 Reaper::get().pipe.sender.try_send(()).ok();
267 }
268
269 // Register this child process to invoke `callback` on exit.
270 let mut wait_object = 0;
271 let ret = unsafe {
272 RegisterWaitForSingleObject(
273 &mut wait_object,
274 child.as_raw_handle() as HANDLE,
275 Some(callback),
276 std::ptr::null_mut(),
277 INFINITE,
278 WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE,
279 )
280 };
281
282 if ret == 0 {
283 Err(io::Error::last_os_error())
284 } else {
285 Ok(())
286 }
287 }
288 }
289
290 // Wraps a sync I/O type into an async I/O type.
291 fn wrap<T>(io: T) -> io::Result<Unblock<T>> {
292 Ok(Unblock::new(io))
293 }
294 } else if #[cfg(unix)] {
295 use async_signal::{Signal, Signals};
296
297 /// Waits for the next SIGCHLD signal.
298 struct Pipe {
299 /// The iterator over SIGCHLD signals.
300 signals: Signals,
301 }
302
303 impl Pipe {
304 /// Creates a new pipe.
305 fn new() -> io::Result<Pipe> {
306 Ok(Pipe {
307 signals: Signals::new(Some(Signal::Child))?,
308 })
309 }
310
311 /// Waits for the next SIGCHLD signal.
312 async fn wait(&self) {
313 (&self.signals).next().await;
314 }
315
316 /// Register a process object into this pipe.
317 fn register(&self, _child: &std::process::Child) -> io::Result<()> {
318 Ok(())
319 }
320 }
321
322 /// Wrap a file descriptor into a non-blocking I/O type.
323 fn wrap<T: std::os::unix::io::AsFd>(io: T) -> io::Result<Async<T>> {
324 Async::new(io)
325 }
326 }
327}
328
329/// A guard that can kill child processes, or push them into the zombie list.
330struct ChildGuard {
331 inner: Option<std::process::Child>,
332 reap_on_drop: bool,
333 kill_on_drop: bool,
334 reaper: &'static Reaper,
335}
336
337impl ChildGuard {
338 fn get_mut(&mut self) -> &mut std::process::Child {
339 self.inner.as_mut().unwrap()
340 }
341}
342
343// When the last reference to the child process is dropped, push it into the zombie list.
344impl Drop for ChildGuard {
345 fn drop(&mut self) {
346 if self.kill_on_drop {
347 self.get_mut().kill().ok();
348 }
349 if self.reap_on_drop {
350 let mut zombies: MutexGuard<'_, Vec> = self.reaper.zombies.lock().unwrap();
351 if let Ok(None) = self.get_mut().try_wait() {
352 zombies.push(self.inner.take().unwrap());
353 }
354 }
355
356 // Decrement number of children.
357 self.reaper.child_count.fetch_sub(val:1, order:Ordering::Acquire);
358 }
359}
360
361/// A spawned child process.
362///
363/// The process can be in running or exited state. Use [`status()`][`Child::status()`] or
364/// [`output()`][`Child::output()`] to wait for it to exit.
365///
366/// If the [`Child`] is dropped, the process keeps running in the background.
367///
368/// # Examples
369///
370/// Spawn a process and wait for it to complete:
371///
372/// ```no_run
373/// # futures_lite::future::block_on(async {
374/// use async_process::Command;
375///
376/// Command::new("cp").arg("a.txt").arg("b.txt").status().await?;
377/// # std::io::Result::Ok(()) });
378/// ```
379pub struct Child {
380 /// The handle for writing to the child's standard input (stdin), if it has been captured.
381 pub stdin: Option<ChildStdin>,
382
383 /// The handle for reading from the child's standard output (stdout), if it has been captured.
384 pub stdout: Option<ChildStdout>,
385
386 /// The handle for reading from the child's standard error (stderr), if it has been captured.
387 pub stderr: Option<ChildStderr>,
388
389 /// The inner child process handle.
390 child: Arc<Mutex<ChildGuard>>,
391}
392
393impl Child {
394 /// Wraps the inner child process handle and registers it in the global process list.
395 ///
396 /// The "async-process" thread waits for processes in the global list and cleans up the
397 /// resources when they exit.
398 fn new(cmd: &mut Command) -> io::Result<Child> {
399 // Make sure the reaper exists before we spawn the child process.
400 let reaper = Reaper::get();
401 let mut child = cmd.inner.spawn()?;
402
403 // Convert sync I/O types into async I/O types.
404 let stdin = child.stdin.take().map(wrap).transpose()?.map(ChildStdin);
405 let stdout = child.stdout.take().map(wrap).transpose()?.map(ChildStdout);
406 let stderr = child.stderr.take().map(wrap).transpose()?.map(ChildStderr);
407
408 // Bump the child count.
409 reaper.child_count.fetch_add(1, Ordering::Relaxed);
410
411 // Register the child process in the global list.
412 reaper.register(&child)?;
413
414 Ok(Child {
415 stdin,
416 stdout,
417 stderr,
418 child: Arc::new(Mutex::new(ChildGuard {
419 inner: Some(child),
420 reap_on_drop: cmd.reap_on_drop,
421 kill_on_drop: cmd.kill_on_drop,
422 reaper,
423 })),
424 })
425 }
426
427 /// Returns the OS-assigned process identifier associated with this child.
428 ///
429 /// # Examples
430 ///
431 /// ```no_run
432 /// # futures_lite::future::block_on(async {
433 /// use async_process::Command;
434 ///
435 /// let mut child = Command::new("ls").spawn()?;
436 /// println!("id: {}", child.id());
437 /// # std::io::Result::Ok(()) });
438 /// ```
439 pub fn id(&self) -> u32 {
440 self.child.lock().unwrap().get_mut().id()
441 }
442
443 /// Forces the child process to exit.
444 ///
445 /// If the child has already exited, an [`InvalidInput`] error is returned.
446 ///
447 /// This is equivalent to sending a SIGKILL on Unix platforms.
448 ///
449 /// [`InvalidInput`]: `std::io::ErrorKind::InvalidInput`
450 ///
451 /// # Examples
452 ///
453 /// ```no_run
454 /// # futures_lite::future::block_on(async {
455 /// use async_process::Command;
456 ///
457 /// let mut child = Command::new("yes").spawn()?;
458 /// child.kill()?;
459 /// println!("exit status: {}", child.status().await?);
460 /// # std::io::Result::Ok(()) });
461 /// ```
462 pub fn kill(&mut self) -> io::Result<()> {
463 self.child.lock().unwrap().get_mut().kill()
464 }
465
466 /// Returns the exit status if the process has exited.
467 ///
468 /// Unlike [`status()`][`Child::status()`], this method will not drop the stdin handle.
469 ///
470 /// # Examples
471 ///
472 /// ```no_run
473 /// # futures_lite::future::block_on(async {
474 /// use async_process::Command;
475 ///
476 /// let mut child = Command::new("ls").spawn()?;
477 ///
478 /// match child.try_status()? {
479 /// None => println!("still running"),
480 /// Some(status) => println!("exited with: {}", status),
481 /// }
482 /// # std::io::Result::Ok(()) });
483 /// ```
484 pub fn try_status(&mut self) -> io::Result<Option<ExitStatus>> {
485 self.child.lock().unwrap().get_mut().try_wait()
486 }
487
488 /// Drops the stdin handle and waits for the process to exit.
489 ///
490 /// Closing the stdin of the process helps avoid deadlocks. It ensures that the process does
491 /// not block waiting for input from the parent process while the parent waits for the child to
492 /// exit.
493 ///
494 /// # Examples
495 ///
496 /// ```no_run
497 /// # futures_lite::future::block_on(async {
498 /// use async_process::{Command, Stdio};
499 ///
500 /// let mut child = Command::new("cp")
501 /// .arg("a.txt")
502 /// .arg("b.txt")
503 /// .spawn()?;
504 ///
505 /// println!("exit status: {}", child.status().await?);
506 /// # std::io::Result::Ok(()) });
507 /// ```
508 pub fn status(&mut self) -> impl Future<Output = io::Result<ExitStatus>> {
509 self.stdin.take();
510 let child = self.child.clone();
511
512 async move {
513 loop {
514 // Wait on the child process.
515 if let Some(status) = child.lock().unwrap().get_mut().try_wait()? {
516 return Ok(status);
517 }
518
519 // Start listening.
520 event_listener::listener!(Reaper::get().sigchld => listener);
521
522 // Try again.
523 if let Some(status) = child.lock().unwrap().get_mut().try_wait()? {
524 return Ok(status);
525 }
526
527 // Wait on the listener.
528 listener.await;
529 }
530 }
531 }
532
533 /// Drops the stdin handle and collects the output of the process.
534 ///
535 /// Closing the stdin of the process helps avoid deadlocks. It ensures that the process does
536 /// not block waiting for input from the parent process while the parent waits for the child to
537 /// exit.
538 ///
539 /// In order to capture the output of the process, [`Command::stdout()`] and
540 /// [`Command::stderr()`] must be configured with [`Stdio::piped()`].
541 ///
542 /// # Examples
543 ///
544 /// ```no_run
545 /// # futures_lite::future::block_on(async {
546 /// use async_process::{Command, Stdio};
547 ///
548 /// let child = Command::new("ls")
549 /// .stdout(Stdio::piped())
550 /// .stderr(Stdio::piped())
551 /// .spawn()?;
552 ///
553 /// let out = child.output().await?;
554 /// # std::io::Result::Ok(()) });
555 /// ```
556 pub fn output(mut self) -> impl Future<Output = io::Result<Output>> {
557 // A future that waits for the exit status.
558 let status = self.status();
559
560 // A future that collects stdout.
561 let stdout = self.stdout.take();
562 let stdout = async move {
563 let mut v = Vec::new();
564 if let Some(mut s) = stdout {
565 s.read_to_end(&mut v).await?;
566 }
567 io::Result::Ok(v)
568 };
569
570 // A future that collects stderr.
571 let stderr = self.stderr.take();
572 let stderr = async move {
573 let mut v = Vec::new();
574 if let Some(mut s) = stderr {
575 s.read_to_end(&mut v).await?;
576 }
577 io::Result::Ok(v)
578 };
579
580 async move {
581 let (stdout, stderr) = future::try_zip(stdout, stderr).await?;
582 let status = status.await?;
583 Ok(Output {
584 status,
585 stdout,
586 stderr,
587 })
588 }
589 }
590}
591
592impl fmt::Debug for Child {
593 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
594 f&mut DebugStruct<'_, '_>.debug_struct("Child")
595 .field("stdin", &self.stdin)
596 .field("stdout", &self.stdout)
597 .field(name:"stderr", &self.stderr)
598 .finish()
599 }
600}
601
602/// A handle to a child process's standard input (stdin).
603///
604/// When a [`ChildStdin`] is dropped, the underlying handle gets clossed. If the child process was
605/// previously blocked on input, it becomes unblocked after dropping.
606#[derive(Debug)]
607pub struct ChildStdin(
608 #[cfg(windows)] Unblock<std::process::ChildStdin>,
609 #[cfg(unix)] Async<std::process::ChildStdin>,
610);
611
612impl ChildStdin {
613 /// Convert async_process::ChildStdin into std::process::Stdio.
614 ///
615 /// You can use it to associate to the next process.
616 ///
617 /// # Examples
618 ///
619 /// ```no_run
620 /// # futures_lite::future::block_on(async {
621 /// use async_process::Command;
622 /// use std::process::Stdio;
623 ///
624 /// let mut ls_child = Command::new("ls").stdin(Stdio::piped()).spawn()?;
625 /// let stdio:Stdio = ls_child.stdin.take().unwrap().into_stdio().await?;
626 ///
627 /// let mut echo_child = Command::new("echo").arg("./").stdout(stdio).spawn()?;
628 ///
629 /// # std::io::Result::Ok(()) });
630 /// ```
631 pub async fn into_stdio(self) -> io::Result<std::process::Stdio> {
632 cfg_if::cfg_if! {
633 if #[cfg(windows)] {
634 Ok(self.0.into_inner().await.into())
635 } else if #[cfg(unix)] {
636 let child_stdin = self.0.into_inner()?;
637 blocking_fd(rustix::fd::AsFd::as_fd(&child_stdin))?;
638 Ok(child_stdin.into())
639 }
640 }
641 }
642}
643
644impl io::AsyncWrite for ChildStdin {
645 fn poll_write(
646 mut self: Pin<&mut Self>,
647 cx: &mut Context<'_>,
648 buf: &[u8],
649 ) -> Poll<io::Result<usize>> {
650 Pin::new(&mut self.0).poll_write(cx, buf)
651 }
652
653 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
654 Pin::new(&mut self.0).poll_flush(cx)
655 }
656
657 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
658 Pin::new(&mut self.0).poll_close(cx)
659 }
660}
661
662#[cfg(unix)]
663impl AsRawFd for ChildStdin {
664 fn as_raw_fd(&self) -> RawFd {
665 self.0.as_raw_fd()
666 }
667}
668
669#[cfg(unix)]
670impl AsFd for ChildStdin {
671 fn as_fd(&self) -> BorrowedFd<'_> {
672 self.0.as_fd()
673 }
674}
675
676#[cfg(unix)]
677impl TryFrom<ChildStdin> for OwnedFd {
678 type Error = io::Error;
679
680 fn try_from(value: ChildStdin) -> Result<Self, Self::Error> {
681 value.0.try_into()
682 }
683}
684
685// TODO(notgull): Add mirroring AsRawHandle impls for all of the child handles
686//
687// at the moment this is pretty hard to do because of how they're wrapped in
688// Unblock, meaning that we can't always access the underlying handle. async-fs
689// gets around this by putting the handle in an Arc, but there's still some decision
690// to be made about how to handle this (no pun intended)
691
692/// A handle to a child process's standard output (stdout).
693///
694/// When a [`ChildStdout`] is dropped, the underlying handle gets closed.
695#[derive(Debug)]
696pub struct ChildStdout(
697 #[cfg(windows)] Unblock<std::process::ChildStdout>,
698 #[cfg(unix)] Async<std::process::ChildStdout>,
699);
700
701impl ChildStdout {
702 /// Convert async_process::ChildStdout into std::process::Stdio.
703 ///
704 /// You can use it to associate to the next process.
705 ///
706 /// # Examples
707 ///
708 /// ```no_run
709 /// # futures_lite::future::block_on(async {
710 /// use async_process::Command;
711 /// use std::process::Stdio;
712 /// use std::io::Read;
713 /// use futures_lite::AsyncReadExt;
714 ///
715 /// let mut ls_child = Command::new("ls").stdout(Stdio::piped()).spawn()?;
716 /// let stdio:Stdio = ls_child.stdout.take().unwrap().into_stdio().await?;
717 ///
718 /// let mut echo_child = Command::new("echo").stdin(stdio).stdout(Stdio::piped()).spawn()?;
719 /// let mut buf = vec![];
720 /// echo_child.stdout.take().unwrap().read(&mut buf).await;
721 /// # std::io::Result::Ok(()) });
722 /// ```
723 pub async fn into_stdio(self) -> io::Result<std::process::Stdio> {
724 cfg_if::cfg_if! {
725 if #[cfg(windows)] {
726 Ok(self.0.into_inner().await.into())
727 } else if #[cfg(unix)] {
728 let child_stdout = self.0.into_inner()?;
729 blocking_fd(rustix::fd::AsFd::as_fd(&child_stdout))?;
730 Ok(child_stdout.into())
731 }
732 }
733 }
734}
735
736impl io::AsyncRead for ChildStdout {
737 fn poll_read(
738 mut self: Pin<&mut Self>,
739 cx: &mut Context<'_>,
740 buf: &mut [u8],
741 ) -> Poll<io::Result<usize>> {
742 Pin::new(&mut self.0).poll_read(cx, buf)
743 }
744}
745
746#[cfg(unix)]
747impl AsRawFd for ChildStdout {
748 fn as_raw_fd(&self) -> RawFd {
749 self.0.as_raw_fd()
750 }
751}
752
753#[cfg(unix)]
754impl AsFd for ChildStdout {
755 fn as_fd(&self) -> BorrowedFd<'_> {
756 self.0.as_fd()
757 }
758}
759
760#[cfg(unix)]
761impl TryFrom<ChildStdout> for OwnedFd {
762 type Error = io::Error;
763
764 fn try_from(value: ChildStdout) -> Result<Self, Self::Error> {
765 value.0.try_into()
766 }
767}
768
769/// A handle to a child process's standard error (stderr).
770///
771/// When a [`ChildStderr`] is dropped, the underlying handle gets closed.
772#[derive(Debug)]
773pub struct ChildStderr(
774 #[cfg(windows)] Unblock<std::process::ChildStderr>,
775 #[cfg(unix)] Async<std::process::ChildStderr>,
776);
777
778impl ChildStderr {
779 /// Convert async_process::ChildStderr into std::process::Stdio.
780 ///
781 /// You can use it to associate to the next process.
782 ///
783 /// # Examples
784 ///
785 /// ```no_run
786 /// # futures_lite::future::block_on(async {
787 /// use async_process::Command;
788 /// use std::process::Stdio;
789 ///
790 /// let mut ls_child = Command::new("ls").arg("x").stderr(Stdio::piped()).spawn()?;
791 /// let stdio:Stdio = ls_child.stderr.take().unwrap().into_stdio().await?;
792 ///
793 /// let mut echo_child = Command::new("echo").stdin(stdio).spawn()?;
794 /// # std::io::Result::Ok(()) });
795 /// ```
796 pub async fn into_stdio(self) -> io::Result<std::process::Stdio> {
797 cfg_if::cfg_if! {
798 if #[cfg(windows)] {
799 Ok(self.0.into_inner().await.into())
800 } else if #[cfg(unix)] {
801 let child_stderr = self.0.into_inner()?;
802 blocking_fd(rustix::fd::AsFd::as_fd(&child_stderr))?;
803 Ok(child_stderr.into())
804 }
805 }
806 }
807}
808
809impl io::AsyncRead for ChildStderr {
810 fn poll_read(
811 mut self: Pin<&mut Self>,
812 cx: &mut Context<'_>,
813 buf: &mut [u8],
814 ) -> Poll<io::Result<usize>> {
815 Pin::new(&mut self.0).poll_read(cx, buf)
816 }
817}
818
819#[cfg(unix)]
820impl AsRawFd for ChildStderr {
821 fn as_raw_fd(&self) -> RawFd {
822 self.0.as_raw_fd()
823 }
824}
825
826#[cfg(unix)]
827impl AsFd for ChildStderr {
828 fn as_fd(&self) -> BorrowedFd<'_> {
829 self.0.as_fd()
830 }
831}
832
833#[cfg(unix)]
834impl TryFrom<ChildStderr> for OwnedFd {
835 type Error = io::Error;
836
837 fn try_from(value: ChildStderr) -> Result<Self, Self::Error> {
838 value.0.try_into()
839 }
840}
841
842/// Runs the driver for the asynchronous processes.
843///
844/// This future takes control of global structures related to driving [`Child`]ren and reaping
845/// zombie processes. These responsibilities include listening for the `SIGCHLD` signal and
846/// making sure zombie processes are successfully waited on.
847///
848/// If multiple tasks run `driver()` at once, only one will actually drive the reaper; the other
849/// ones will just sleep. If a task that is driving the reaper is dropped, a previously sleeping
850/// task will take over. If all tasks driving the reaper are dropped, the "async-process" thread
851/// will be spawned. The "async-process" thread just blocks on this future and will automatically
852/// be spawned if no tasks are driving the reaper once a [`Child`] is created.
853///
854/// This future will never complete. It is intended to be ran on a background task in your
855/// executor of choice.
856///
857/// # Examples
858///
859/// ```no_run
860/// use async_executor::Executor;
861/// use async_process::{driver, Command};
862///
863/// # futures_lite::future::block_on(async {
864/// // Create an executor and run on it.
865/// let ex = Executor::new();
866/// ex.run(async {
867/// // Run the driver future in the background.
868/// ex.spawn(driver()).detach();
869///
870/// // Run a command.
871/// Command::new("ls").output().await.ok();
872/// }).await;
873/// # });
874/// ```
875#[inline]
876pub fn driver() -> impl Future<Output = Infallible> + Send + 'static {
877 struct CallOnDrop<F: FnMut()>(F);
878
879 impl<F: FnMut()> Drop for CallOnDrop<F> {
880 fn drop(&mut self) {
881 (self.0)();
882 }
883 }
884
885 async {
886 // Get the reaper.
887 let reaper = Reaper::get();
888
889 // Make sure the reaper knows we're driving it.
890 reaper.drivers.fetch_add(1, Ordering::SeqCst);
891
892 // Decrement the driver count when this future is dropped.
893 let _guard = CallOnDrop(|| {
894 let prev_count = reaper.drivers.fetch_sub(1, Ordering::SeqCst);
895
896 // If this was the last driver, and there are still resources actively using the
897 // reaper, make sure that there is a thread driving the reaper.
898 if prev_count == 1
899 && reaper.child_count.load(Ordering::SeqCst) > 0
900 && !reaper
901 .zombies
902 .lock()
903 .unwrap_or_else(|x| x.into_inner())
904 .is_empty()
905 {
906 reaper.ensure_driven();
907 }
908 });
909
910 // Acquire the reaper lock and start polling the SIGCHLD event.
911 let guard = reaper.driver_guard.lock().await;
912 reaper.reap(guard).await
913 }
914}
915
916/// A builder for spawning processes.
917///
918/// # Examples
919///
920/// ```no_run
921/// # futures_lite::future::block_on(async {
922/// use async_process::Command;
923///
924/// let output = if cfg!(target_os = "windows") {
925/// Command::new("cmd").args(&["/C", "echo hello"]).output().await?
926/// } else {
927/// Command::new("sh").arg("-c").arg("echo hello").output().await?
928/// };
929/// # std::io::Result::Ok(()) });
930/// ```
931pub struct Command {
932 inner: std::process::Command,
933 stdin: bool,
934 stdout: bool,
935 stderr: bool,
936 reap_on_drop: bool,
937 kill_on_drop: bool,
938}
939
940impl Command {
941 /// Constructs a new [`Command`] for launching `program`.
942 ///
943 /// The initial configuration (the working directory and environment variables) is inherited
944 /// from the current process.
945 ///
946 /// # Examples
947 ///
948 /// ```
949 /// use async_process::Command;
950 ///
951 /// let mut cmd = Command::new("ls");
952 /// ```
953 pub fn new<S: AsRef<OsStr>>(program: S) -> Command {
954 Self::from(std::process::Command::new(program))
955 }
956
957 /// Adds a single argument to pass to the program.
958 ///
959 /// # Examples
960 ///
961 /// ```
962 /// use async_process::Command;
963 ///
964 /// let mut cmd = Command::new("echo");
965 /// cmd.arg("hello");
966 /// cmd.arg("world");
967 /// ```
968 pub fn arg<S: AsRef<OsStr>>(&mut self, arg: S) -> &mut Command {
969 self.inner.arg(arg);
970 self
971 }
972
973 /// Adds multiple arguments to pass to the program.
974 ///
975 /// # Examples
976 ///
977 /// ```
978 /// use async_process::Command;
979 ///
980 /// let mut cmd = Command::new("echo");
981 /// cmd.args(&["hello", "world"]);
982 /// ```
983 pub fn args<I, S>(&mut self, args: I) -> &mut Command
984 where
985 I: IntoIterator<Item = S>,
986 S: AsRef<OsStr>,
987 {
988 self.inner.args(args);
989 self
990 }
991
992 /// Configures an environment variable for the new process.
993 ///
994 /// Note that environment variable names are case-insensitive (but case-preserving) on Windows,
995 /// and case-sensitive on all other platforms.
996 ///
997 /// # Examples
998 ///
999 /// ```
1000 /// use async_process::Command;
1001 ///
1002 /// let mut cmd = Command::new("ls");
1003 /// cmd.env("PATH", "/bin");
1004 /// ```
1005 pub fn env<K, V>(&mut self, key: K, val: V) -> &mut Command
1006 where
1007 K: AsRef<OsStr>,
1008 V: AsRef<OsStr>,
1009 {
1010 self.inner.env(key, val);
1011 self
1012 }
1013
1014 /// Configures multiple environment variables for the new process.
1015 ///
1016 /// Note that environment variable names are case-insensitive (but case-preserving) on Windows,
1017 /// and case-sensitive on all other platforms.
1018 ///
1019 /// # Examples
1020 ///
1021 /// ```
1022 /// use async_process::Command;
1023 ///
1024 /// let mut cmd = Command::new("ls");
1025 /// cmd.envs(vec![("PATH", "/bin"), ("TERM", "xterm-256color")]);
1026 /// ```
1027 pub fn envs<I, K, V>(&mut self, vars: I) -> &mut Command
1028 where
1029 I: IntoIterator<Item = (K, V)>,
1030 K: AsRef<OsStr>,
1031 V: AsRef<OsStr>,
1032 {
1033 self.inner.envs(vars);
1034 self
1035 }
1036
1037 /// Removes an environment variable mapping.
1038 ///
1039 /// # Examples
1040 ///
1041 /// ```
1042 /// use async_process::Command;
1043 ///
1044 /// let mut cmd = Command::new("ls");
1045 /// cmd.env_remove("PATH");
1046 /// ```
1047 pub fn env_remove<K: AsRef<OsStr>>(&mut self, key: K) -> &mut Command {
1048 self.inner.env_remove(key);
1049 self
1050 }
1051
1052 /// Removes all environment variable mappings.
1053 ///
1054 /// # Examples
1055 ///
1056 /// ```
1057 /// use async_process::Command;
1058 ///
1059 /// let mut cmd = Command::new("ls");
1060 /// cmd.env_clear();
1061 /// ```
1062 pub fn env_clear(&mut self) -> &mut Command {
1063 self.inner.env_clear();
1064 self
1065 }
1066
1067 /// Configures the working directory for the new process.
1068 ///
1069 /// # Examples
1070 ///
1071 /// ```
1072 /// use async_process::Command;
1073 ///
1074 /// let mut cmd = Command::new("ls");
1075 /// cmd.current_dir("/");
1076 /// ```
1077 pub fn current_dir<P: AsRef<Path>>(&mut self, dir: P) -> &mut Command {
1078 self.inner.current_dir(dir);
1079 self
1080 }
1081
1082 /// Configures the standard input (stdin) for the new process.
1083 ///
1084 /// # Examples
1085 ///
1086 /// ```
1087 /// use async_process::{Command, Stdio};
1088 ///
1089 /// let mut cmd = Command::new("cat");
1090 /// cmd.stdin(Stdio::null());
1091 /// ```
1092 pub fn stdin<T: Into<Stdio>>(&mut self, cfg: T) -> &mut Command {
1093 self.stdin = true;
1094 self.inner.stdin(cfg);
1095 self
1096 }
1097
1098 /// Configures the standard output (stdout) for the new process.
1099 ///
1100 /// # Examples
1101 ///
1102 /// ```
1103 /// use async_process::{Command, Stdio};
1104 ///
1105 /// let mut cmd = Command::new("ls");
1106 /// cmd.stdout(Stdio::piped());
1107 /// ```
1108 pub fn stdout<T: Into<Stdio>>(&mut self, cfg: T) -> &mut Command {
1109 self.stdout = true;
1110 self.inner.stdout(cfg);
1111 self
1112 }
1113
1114 /// Configures the standard error (stderr) for the new process.
1115 ///
1116 /// # Examples
1117 ///
1118 /// ```
1119 /// use async_process::{Command, Stdio};
1120 ///
1121 /// let mut cmd = Command::new("ls");
1122 /// cmd.stderr(Stdio::piped());
1123 /// ```
1124 pub fn stderr<T: Into<Stdio>>(&mut self, cfg: T) -> &mut Command {
1125 self.stderr = true;
1126 self.inner.stderr(cfg);
1127 self
1128 }
1129
1130 /// Configures whether to reap the zombie process when [`Child`] is dropped.
1131 ///
1132 /// When the process finishes, it becomes a "zombie" and some resources associated with it
1133 /// remain until [`Child::try_status()`], [`Child::status()`], or [`Child::output()`] collects
1134 /// its exit code.
1135 ///
1136 /// If its exit code is never collected, the resources may leak forever. This crate has a
1137 /// background thread named "async-process" that collects such "zombie" processes and then
1138 /// "reaps" them, thus preventing the resource leaks.
1139 ///
1140 /// The default value of this option is `true`.
1141 ///
1142 /// # Examples
1143 ///
1144 /// ```
1145 /// use async_process::{Command, Stdio};
1146 ///
1147 /// let mut cmd = Command::new("cat");
1148 /// cmd.reap_on_drop(false);
1149 /// ```
1150 pub fn reap_on_drop(&mut self, reap_on_drop: bool) -> &mut Command {
1151 self.reap_on_drop = reap_on_drop;
1152 self
1153 }
1154
1155 /// Configures whether to kill the process when [`Child`] is dropped.
1156 ///
1157 /// The default value of this option is `false`.
1158 ///
1159 /// # Examples
1160 ///
1161 /// ```
1162 /// use async_process::{Command, Stdio};
1163 ///
1164 /// let mut cmd = Command::new("cat");
1165 /// cmd.kill_on_drop(true);
1166 /// ```
1167 pub fn kill_on_drop(&mut self, kill_on_drop: bool) -> &mut Command {
1168 self.kill_on_drop = kill_on_drop;
1169 self
1170 }
1171
1172 /// Executes the command and returns the [`Child`] handle to it.
1173 ///
1174 /// If not configured, stdin, stdout and stderr will be set to [`Stdio::inherit()`].
1175 ///
1176 /// # Examples
1177 ///
1178 /// ```no_run
1179 /// # futures_lite::future::block_on(async {
1180 /// use async_process::Command;
1181 ///
1182 /// let child = Command::new("ls").spawn()?;
1183 /// # std::io::Result::Ok(()) });
1184 /// ```
1185 pub fn spawn(&mut self) -> io::Result<Child> {
1186 if !self.stdin {
1187 self.inner.stdin(Stdio::inherit());
1188 }
1189 if !self.stdout {
1190 self.inner.stdout(Stdio::inherit());
1191 }
1192 if !self.stderr {
1193 self.inner.stderr(Stdio::inherit());
1194 }
1195
1196 Child::new(self)
1197 }
1198
1199 /// Executes the command, waits for it to exit, and returns the exit status.
1200 ///
1201 /// If not configured, stdin, stdout and stderr will be set to [`Stdio::inherit()`].
1202 ///
1203 /// # Examples
1204 ///
1205 /// ```no_run
1206 /// # futures_lite::future::block_on(async {
1207 /// use async_process::Command;
1208 ///
1209 /// let status = Command::new("cp")
1210 /// .arg("a.txt")
1211 /// .arg("b.txt")
1212 /// .status()
1213 /// .await?;
1214 /// # std::io::Result::Ok(()) });
1215 /// ```
1216 pub fn status(&mut self) -> impl Future<Output = io::Result<ExitStatus>> {
1217 let child = self.spawn();
1218 async { child?.status().await }
1219 }
1220
1221 /// Executes the command and collects its output.
1222 ///
1223 /// If not configured, stdin will be set to [`Stdio::null()`], and stdout and stderr will be
1224 /// set to [`Stdio::piped()`].
1225 ///
1226 /// # Examples
1227 ///
1228 /// ```no_run
1229 /// # futures_lite::future::block_on(async {
1230 /// use async_process::Command;
1231 ///
1232 /// let output = Command::new("cat")
1233 /// .arg("a.txt")
1234 /// .output()
1235 /// .await?;
1236 /// # std::io::Result::Ok(()) });
1237 /// ```
1238 pub fn output(&mut self) -> impl Future<Output = io::Result<Output>> {
1239 if !self.stdin {
1240 self.inner.stdin(Stdio::null());
1241 }
1242 if !self.stdout {
1243 self.inner.stdout(Stdio::piped());
1244 }
1245 if !self.stderr {
1246 self.inner.stderr(Stdio::piped());
1247 }
1248
1249 let child = Child::new(self);
1250 async { child?.output().await }
1251 }
1252}
1253
1254impl From<std::process::Command> for Command {
1255 fn from(inner: std::process::Command) -> Self {
1256 Self {
1257 inner,
1258 stdin: false,
1259 stdout: false,
1260 stderr: false,
1261 reap_on_drop: true,
1262 kill_on_drop: false,
1263 }
1264 }
1265}
1266
1267impl fmt::Debug for Command {
1268 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1269 if f.alternate() {
1270 f&mut DebugStruct<'_, '_>.debug_struct("Command")
1271 .field("inner", &self.inner)
1272 .field("stdin", &self.stdin)
1273 .field("stdout", &self.stdout)
1274 .field("stderr", &self.stderr)
1275 .field("reap_on_drop", &self.reap_on_drop)
1276 .field(name:"kill_on_drop", &self.kill_on_drop)
1277 .finish()
1278 } else {
1279 // Stdlib outputs command-line in Debug for Command. This does the
1280 // same, if not in "alternate" (long pretty-printed) mode.
1281 // This is useful for logs, for example.
1282 fmt::Debug::fmt(&self.inner, f)
1283 }
1284 }
1285}
1286
1287/// Moves `Fd` out of non-blocking mode.
1288#[cfg(unix)]
1289fn blocking_fd(fd: rustix::fd::BorrowedFd<'_>) -> io::Result<()> {
1290 cfg_if::cfg_if! {
1291 // ioctl(FIONBIO) sets the flag atomically, but we use this only on Linux
1292 // for now, as with the standard library, because it seems to behave
1293 // differently depending on the platform.
1294 // https://github.com/rust-lang/rust/commit/efeb42be2837842d1beb47b51bb693c7474aba3d
1295 // https://github.com/libuv/libuv/blob/e9d91fccfc3e5ff772d5da90e1c4a24061198ca0/src/unix/poll.c#L78-L80
1296 // https://github.com/tokio-rs/mio/commit/0db49f6d5caf54b12176821363d154384357e70a
1297 if #[cfg(target_os = "linux")] {
1298 rustix::io::ioctl_fionbio(fd, false)?;
1299 } else {
1300 let previous = rustix::fs::fcntl_getfl(fd)?;
1301 let new = previous & !rustix::fs::OFlags::NONBLOCK;
1302 if new != previous {
1303 rustix::fs::fcntl_setfl(fd, new)?;
1304 }
1305 }
1306 }
1307 Ok(())
1308}
1309
1310#[cfg(test)]
1311mod test {
1312 #[test]
1313 fn polled_driver() {
1314 use super::{driver, Command};
1315 use futures_lite::future;
1316 use futures_lite::prelude::*;
1317
1318 let is_thread_spawned =
1319 || super::DRIVER_THREAD_SPAWNED.load(std::sync::atomic::Ordering::SeqCst);
1320
1321 #[cfg(unix)]
1322 fn command() -> Command {
1323 let mut cmd = Command::new("sh");
1324 cmd.arg("-c").arg("echo hello");
1325 cmd
1326 }
1327
1328 #[cfg(windows)]
1329 fn command() -> Command {
1330 let mut cmd = Command::new("cmd");
1331 cmd.arg("/C").arg("echo hello");
1332 cmd
1333 }
1334
1335 #[cfg(unix)]
1336 const OUTPUT: &[u8] = b"hello\n";
1337 #[cfg(windows)]
1338 const OUTPUT: &[u8] = b"hello\r\n";
1339
1340 future::block_on(async {
1341 // Thread should not be spawned off the bat.
1342 assert!(!is_thread_spawned());
1343
1344 // Spawn a driver.
1345 let mut driver1 = Box::pin(driver());
1346 future::poll_once(&mut driver1).await;
1347 assert!(!is_thread_spawned());
1348
1349 // We should be able to run the driver in parallel with a process future.
1350 async {
1351 (&mut driver1).await;
1352 }
1353 .or(async {
1354 let output = command().output().await.unwrap();
1355 assert_eq!(output.stdout, OUTPUT);
1356 })
1357 .await;
1358 assert!(!is_thread_spawned());
1359
1360 // Spawn a second driver.
1361 let mut driver2 = Box::pin(driver());
1362 future::poll_once(&mut driver2).await;
1363 assert!(!is_thread_spawned());
1364
1365 // Poll both drivers in parallel.
1366 async {
1367 (&mut driver1).await;
1368 }
1369 .or(async {
1370 (&mut driver2).await;
1371 })
1372 .or(async {
1373 let output = command().output().await.unwrap();
1374 assert_eq!(output.stdout, OUTPUT);
1375 })
1376 .await;
1377 assert!(!is_thread_spawned());
1378
1379 // Once one is dropped, the other should take over.
1380 drop(driver1);
1381 assert!(!is_thread_spawned());
1382
1383 // Poll driver2 in parallel with a process future.
1384 async {
1385 (&mut driver2).await;
1386 }
1387 .or(async {
1388 let output = command().output().await.unwrap();
1389 assert_eq!(output.stdout, OUTPUT);
1390 })
1391 .await;
1392 assert!(!is_thread_spawned());
1393
1394 // Once driver2 is dropped, the thread should not be spawned, as there are no active
1395 // child processes..
1396 drop(driver2);
1397 assert!(!is_thread_spawned());
1398
1399 // We should now be able to poll the process future independently, it will spawn the
1400 // thread.
1401 let output = command().output().await.unwrap();
1402 assert_eq!(output.stdout, OUTPUT);
1403 assert!(is_thread_spawned());
1404 });
1405 }
1406}
1407