1 | //! Async interface for working with processes. |
2 | //! |
3 | //! This crate is an async version of [`std::process`]. |
4 | //! |
5 | //! # Implementation |
6 | //! |
7 | //! A background thread named "async-process" is lazily created on first use, which waits for |
8 | //! spawned child processes to exit and then calls the `wait()` syscall to clean up the "zombie" |
9 | //! processes. This is unlike the `process` API in the standard library, where dropping a running |
10 | //! `Child` leaks its resources. |
11 | //! |
12 | //! This crate uses [`async-io`] for async I/O on Unix-like systems and [`blocking`] for async I/O |
13 | //! on Windows. |
14 | //! |
15 | //! [`async-io`]: https://docs.rs/async-io |
16 | //! [`blocking`]: https://docs.rs/blocking |
17 | //! |
18 | //! # Examples |
19 | //! |
20 | //! Spawn a process and collect its output: |
21 | //! |
22 | //! ```no_run |
23 | //! # futures_lite::future::block_on(async { |
24 | //! use async_process::Command; |
25 | //! |
26 | //! let out = Command::new("echo" ).arg("hello" ).arg("world" ).output().await?; |
27 | //! assert_eq!(out.stdout, b"hello world \n" ); |
28 | //! # std::io::Result::Ok(()) }); |
29 | //! ``` |
30 | //! |
31 | //! Read the output line-by-line as it gets produced: |
32 | //! |
33 | //! ```no_run |
34 | //! # futures_lite::future::block_on(async { |
35 | //! use async_process::{Command, Stdio}; |
36 | //! use futures_lite::{io::BufReader, prelude::*}; |
37 | //! |
38 | //! let mut child = Command::new("find" ) |
39 | //! .arg("." ) |
40 | //! .stdout(Stdio::piped()) |
41 | //! .spawn()?; |
42 | //! |
43 | //! let mut lines = BufReader::new(child.stdout.take().unwrap()).lines(); |
44 | //! |
45 | //! while let Some(line) = lines.next().await { |
46 | //! println!("{}" , line?); |
47 | //! } |
48 | //! # std::io::Result::Ok(()) }); |
49 | //! ``` |
50 | |
51 | #![warn (missing_docs, missing_debug_implementations, rust_2018_idioms)] |
52 | #![doc ( |
53 | html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png" |
54 | )] |
55 | #![doc ( |
56 | html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png" |
57 | )] |
58 | |
59 | use std::convert::Infallible; |
60 | use std::ffi::OsStr; |
61 | use std::fmt; |
62 | use std::mem; |
63 | use std::path::Path; |
64 | use std::pin::Pin; |
65 | use std::sync::atomic::{AtomicUsize, Ordering}; |
66 | use std::sync::{Arc, Mutex}; |
67 | use std::task::{Context, Poll}; |
68 | use std::thread; |
69 | |
70 | #[cfg (unix)] |
71 | use async_io::Async; |
72 | #[cfg (unix)] |
73 | use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd}; |
74 | |
75 | #[cfg (windows)] |
76 | use blocking::Unblock; |
77 | |
78 | use async_lock::{Mutex as AsyncMutex, OnceCell}; |
79 | use event_listener::Event; |
80 | use futures_lite::{future, io, prelude::*}; |
81 | |
82 | #[doc (no_inline)] |
83 | pub use std::process::{ExitStatus, Output, Stdio}; |
84 | |
85 | #[cfg (unix)] |
86 | pub mod unix; |
87 | #[cfg (windows)] |
88 | pub mod windows; |
89 | |
90 | mod sealed { |
91 | pub trait Sealed {} |
92 | } |
93 | |
94 | #[cfg (test)] |
95 | static DRIVER_THREAD_SPAWNED: std::sync::atomic::AtomicBool = |
96 | std::sync::atomic::AtomicBool::new(false); |
97 | |
98 | /// The zombie process reaper. |
99 | /// |
100 | /// This structure reaps zombie processes and emits the `SIGCHLD` signal. |
101 | struct Reaper { |
102 | /// An event delivered every time the SIGCHLD signal occurs. |
103 | sigchld: Event, |
104 | |
105 | /// The list of zombie processes. |
106 | zombies: Mutex<Vec<std::process::Child>>, |
107 | |
108 | /// The pipe that delivers signal notifications. |
109 | pipe: Pipe, |
110 | |
111 | /// Locking this mutex indicates that we are polling the SIGCHLD event. |
112 | driver_guard: AsyncMutex<()>, |
113 | |
114 | /// The number of tasks polling the SIGCHLD event. |
115 | /// |
116 | /// If this is zero, the `async-process` thread must be spawned. |
117 | drivers: AtomicUsize, |
118 | |
119 | /// Number of live `Child` instances currently running. |
120 | /// |
121 | /// This is used to prevent the reaper thread from being spawned right as the program closes, |
122 | /// when the reaper thread isn't needed. This represents the number of active processes. |
123 | child_count: AtomicUsize, |
124 | } |
125 | |
126 | impl Reaper { |
127 | /// Get the singleton instance of the reaper. |
128 | fn get() -> &'static Self { |
129 | static REAPER: OnceCell<Reaper> = OnceCell::new(); |
130 | |
131 | REAPER.get_or_init_blocking(|| Reaper { |
132 | sigchld: Event::new(), |
133 | zombies: Mutex::new(Vec::new()), |
134 | pipe: Pipe::new().expect("cannot create SIGCHLD pipe" ), |
135 | driver_guard: AsyncMutex::new(()), |
136 | drivers: AtomicUsize::new(0), |
137 | child_count: AtomicUsize::new(0), |
138 | }) |
139 | } |
140 | |
141 | /// Ensure that the reaper is driven. |
142 | /// |
143 | /// If there are no active `driver()` callers, this will spawn the `async-process` thread. |
144 | #[inline ] |
145 | fn ensure_driven(&'static self) { |
146 | if self |
147 | .drivers |
148 | .compare_exchange(0, 1, Ordering::SeqCst, Ordering::Acquire) |
149 | .is_ok() |
150 | { |
151 | self.start_driver_thread(); |
152 | } |
153 | } |
154 | |
155 | /// Start the `async-process` thread. |
156 | #[cold ] |
157 | fn start_driver_thread(&'static self) { |
158 | #[cfg (test)] |
159 | DRIVER_THREAD_SPAWNED |
160 | .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) |
161 | .unwrap_or_else(|_| unreachable!("Driver thread already spawned" )); |
162 | |
163 | thread::Builder::new() |
164 | .name("async-process" .to_string()) |
165 | .spawn(move || { |
166 | let driver = async move { |
167 | // No need to bump self.drivers, it was already bumped in ensure_driven. |
168 | let guard = self.driver_guard.lock().await; |
169 | self.reap(guard).await |
170 | }; |
171 | |
172 | #[cfg (unix)] |
173 | async_io::block_on(driver); |
174 | |
175 | #[cfg (not(unix))] |
176 | future::block_on(driver); |
177 | }) |
178 | .expect("cannot spawn async-process thread" ); |
179 | } |
180 | |
181 | /// Reap zombie processes forever. |
182 | async fn reap(&'static self, _driver_guard: async_lock::MutexGuard<'_, ()>) -> ! { |
183 | loop { |
184 | // Wait for the next SIGCHLD signal. |
185 | self.pipe.wait().await; |
186 | |
187 | // Notify all listeners waiting on the SIGCHLD event. |
188 | self.sigchld.notify(std::usize::MAX); |
189 | |
190 | // Reap zombie processes, but make sure we don't hold onto the lock for too long! |
191 | let mut zombies = mem::take(&mut *self.zombies.lock().unwrap()); |
192 | let mut i = 0; |
193 | 'reap_zombies: loop { |
194 | for _ in 0..50 { |
195 | if i >= zombies.len() { |
196 | break 'reap_zombies; |
197 | } |
198 | |
199 | if let Ok(None) = zombies[i].try_wait() { |
200 | i += 1; |
201 | } else { |
202 | zombies.swap_remove(i); |
203 | } |
204 | } |
205 | |
206 | // Be a good citizen; yield if there are a lot of processes. |
207 | // |
208 | // After we yield, check if there are more zombie processes. |
209 | future::yield_now().await; |
210 | zombies.append(&mut self.zombies.lock().unwrap()); |
211 | } |
212 | |
213 | // Put zombie processes back. |
214 | self.zombies.lock().unwrap().append(&mut zombies); |
215 | } |
216 | } |
217 | |
218 | /// Register a process with this reaper. |
219 | fn register(&'static self, child: &std::process::Child) -> io::Result<()> { |
220 | self.ensure_driven(); |
221 | self.pipe.register(child) |
222 | } |
223 | } |
224 | |
225 | cfg_if::cfg_if! { |
226 | if #[cfg(windows)] { |
227 | use async_channel::{Sender, Receiver, bounded}; |
228 | use std::ffi::c_void; |
229 | use std::os::windows::io::AsRawHandle; |
230 | |
231 | use windows_sys::Win32::{ |
232 | Foundation::{BOOLEAN, HANDLE}, |
233 | System::Threading::{ |
234 | RegisterWaitForSingleObject, INFINITE, WT_EXECUTEINWAITTHREAD, WT_EXECUTEONLYONCE, |
235 | }, |
236 | }; |
237 | |
238 | /// Waits for the next SIGCHLD signal. |
239 | struct Pipe { |
240 | /// The sender channel for the SIGCHLD signal. |
241 | sender: Sender<()>, |
242 | |
243 | /// The receiver channel for the SIGCHLD signal. |
244 | receiver: Receiver<()>, |
245 | } |
246 | |
247 | impl Pipe { |
248 | /// Creates a new pipe. |
249 | fn new() -> io::Result<Pipe> { |
250 | let (sender, receiver) = bounded(1); |
251 | Ok(Pipe { |
252 | sender, |
253 | receiver |
254 | }) |
255 | } |
256 | |
257 | /// Waits for the next SIGCHLD signal. |
258 | async fn wait(&self) { |
259 | self.receiver.recv().await.ok(); |
260 | } |
261 | |
262 | /// Register a process object into this pipe. |
263 | fn register(&self, child: &std::process::Child) -> io::Result<()> { |
264 | // Called when a child exits. |
265 | unsafe extern "system" fn callback(_: *mut c_void, _: BOOLEAN) { |
266 | Reaper::get().pipe.sender.try_send(()).ok(); |
267 | } |
268 | |
269 | // Register this child process to invoke `callback` on exit. |
270 | let mut wait_object = 0; |
271 | let ret = unsafe { |
272 | RegisterWaitForSingleObject( |
273 | &mut wait_object, |
274 | child.as_raw_handle() as HANDLE, |
275 | Some(callback), |
276 | std::ptr::null_mut(), |
277 | INFINITE, |
278 | WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE, |
279 | ) |
280 | }; |
281 | |
282 | if ret == 0 { |
283 | Err(io::Error::last_os_error()) |
284 | } else { |
285 | Ok(()) |
286 | } |
287 | } |
288 | } |
289 | |
290 | // Wraps a sync I/O type into an async I/O type. |
291 | fn wrap<T>(io: T) -> io::Result<Unblock<T>> { |
292 | Ok(Unblock::new(io)) |
293 | } |
294 | } else if #[cfg(unix)] { |
295 | use async_signal::{Signal, Signals}; |
296 | |
297 | /// Waits for the next SIGCHLD signal. |
298 | struct Pipe { |
299 | /// The iterator over SIGCHLD signals. |
300 | signals: Signals, |
301 | } |
302 | |
303 | impl Pipe { |
304 | /// Creates a new pipe. |
305 | fn new() -> io::Result<Pipe> { |
306 | Ok(Pipe { |
307 | signals: Signals::new(Some(Signal::Child))?, |
308 | }) |
309 | } |
310 | |
311 | /// Waits for the next SIGCHLD signal. |
312 | async fn wait(&self) { |
313 | (&self.signals).next().await; |
314 | } |
315 | |
316 | /// Register a process object into this pipe. |
317 | fn register(&self, _child: &std::process::Child) -> io::Result<()> { |
318 | Ok(()) |
319 | } |
320 | } |
321 | |
322 | /// Wrap a file descriptor into a non-blocking I/O type. |
323 | fn wrap<T: std::os::unix::io::AsFd>(io: T) -> io::Result<Async<T>> { |
324 | Async::new(io) |
325 | } |
326 | } |
327 | } |
328 | |
329 | /// A guard that can kill child processes, or push them into the zombie list. |
330 | struct ChildGuard { |
331 | inner: Option<std::process::Child>, |
332 | reap_on_drop: bool, |
333 | kill_on_drop: bool, |
334 | reaper: &'static Reaper, |
335 | } |
336 | |
337 | impl ChildGuard { |
338 | fn get_mut(&mut self) -> &mut std::process::Child { |
339 | self.inner.as_mut().unwrap() |
340 | } |
341 | } |
342 | |
343 | // When the last reference to the child process is dropped, push it into the zombie list. |
344 | impl Drop for ChildGuard { |
345 | fn drop(&mut self) { |
346 | if self.kill_on_drop { |
347 | self.get_mut().kill().ok(); |
348 | } |
349 | if self.reap_on_drop { |
350 | let mut zombies: MutexGuard<'_, Vec> = self.reaper.zombies.lock().unwrap(); |
351 | if let Ok(None) = self.get_mut().try_wait() { |
352 | zombies.push(self.inner.take().unwrap()); |
353 | } |
354 | } |
355 | |
356 | // Decrement number of children. |
357 | self.reaper.child_count.fetch_sub(val:1, order:Ordering::Acquire); |
358 | } |
359 | } |
360 | |
361 | /// A spawned child process. |
362 | /// |
363 | /// The process can be in running or exited state. Use [`status()`][`Child::status()`] or |
364 | /// [`output()`][`Child::output()`] to wait for it to exit. |
365 | /// |
366 | /// If the [`Child`] is dropped, the process keeps running in the background. |
367 | /// |
368 | /// # Examples |
369 | /// |
370 | /// Spawn a process and wait for it to complete: |
371 | /// |
372 | /// ```no_run |
373 | /// # futures_lite::future::block_on(async { |
374 | /// use async_process::Command; |
375 | /// |
376 | /// Command::new("cp" ).arg("a.txt" ).arg("b.txt" ).status().await?; |
377 | /// # std::io::Result::Ok(()) }); |
378 | /// ``` |
379 | pub struct Child { |
380 | /// The handle for writing to the child's standard input (stdin), if it has been captured. |
381 | pub stdin: Option<ChildStdin>, |
382 | |
383 | /// The handle for reading from the child's standard output (stdout), if it has been captured. |
384 | pub stdout: Option<ChildStdout>, |
385 | |
386 | /// The handle for reading from the child's standard error (stderr), if it has been captured. |
387 | pub stderr: Option<ChildStderr>, |
388 | |
389 | /// The inner child process handle. |
390 | child: Arc<Mutex<ChildGuard>>, |
391 | } |
392 | |
393 | impl Child { |
394 | /// Wraps the inner child process handle and registers it in the global process list. |
395 | /// |
396 | /// The "async-process" thread waits for processes in the global list and cleans up the |
397 | /// resources when they exit. |
398 | fn new(cmd: &mut Command) -> io::Result<Child> { |
399 | // Make sure the reaper exists before we spawn the child process. |
400 | let reaper = Reaper::get(); |
401 | let mut child = cmd.inner.spawn()?; |
402 | |
403 | // Convert sync I/O types into async I/O types. |
404 | let stdin = child.stdin.take().map(wrap).transpose()?.map(ChildStdin); |
405 | let stdout = child.stdout.take().map(wrap).transpose()?.map(ChildStdout); |
406 | let stderr = child.stderr.take().map(wrap).transpose()?.map(ChildStderr); |
407 | |
408 | // Bump the child count. |
409 | reaper.child_count.fetch_add(1, Ordering::Relaxed); |
410 | |
411 | // Register the child process in the global list. |
412 | reaper.register(&child)?; |
413 | |
414 | Ok(Child { |
415 | stdin, |
416 | stdout, |
417 | stderr, |
418 | child: Arc::new(Mutex::new(ChildGuard { |
419 | inner: Some(child), |
420 | reap_on_drop: cmd.reap_on_drop, |
421 | kill_on_drop: cmd.kill_on_drop, |
422 | reaper, |
423 | })), |
424 | }) |
425 | } |
426 | |
427 | /// Returns the OS-assigned process identifier associated with this child. |
428 | /// |
429 | /// # Examples |
430 | /// |
431 | /// ```no_run |
432 | /// # futures_lite::future::block_on(async { |
433 | /// use async_process::Command; |
434 | /// |
435 | /// let mut child = Command::new("ls" ).spawn()?; |
436 | /// println!("id: {}" , child.id()); |
437 | /// # std::io::Result::Ok(()) }); |
438 | /// ``` |
439 | pub fn id(&self) -> u32 { |
440 | self.child.lock().unwrap().get_mut().id() |
441 | } |
442 | |
443 | /// Forces the child process to exit. |
444 | /// |
445 | /// If the child has already exited, an [`InvalidInput`] error is returned. |
446 | /// |
447 | /// This is equivalent to sending a SIGKILL on Unix platforms. |
448 | /// |
449 | /// [`InvalidInput`]: `std::io::ErrorKind::InvalidInput` |
450 | /// |
451 | /// # Examples |
452 | /// |
453 | /// ```no_run |
454 | /// # futures_lite::future::block_on(async { |
455 | /// use async_process::Command; |
456 | /// |
457 | /// let mut child = Command::new("yes" ).spawn()?; |
458 | /// child.kill()?; |
459 | /// println!("exit status: {}" , child.status().await?); |
460 | /// # std::io::Result::Ok(()) }); |
461 | /// ``` |
462 | pub fn kill(&mut self) -> io::Result<()> { |
463 | self.child.lock().unwrap().get_mut().kill() |
464 | } |
465 | |
466 | /// Returns the exit status if the process has exited. |
467 | /// |
468 | /// Unlike [`status()`][`Child::status()`], this method will not drop the stdin handle. |
469 | /// |
470 | /// # Examples |
471 | /// |
472 | /// ```no_run |
473 | /// # futures_lite::future::block_on(async { |
474 | /// use async_process::Command; |
475 | /// |
476 | /// let mut child = Command::new("ls" ).spawn()?; |
477 | /// |
478 | /// match child.try_status()? { |
479 | /// None => println!("still running" ), |
480 | /// Some(status) => println!("exited with: {}" , status), |
481 | /// } |
482 | /// # std::io::Result::Ok(()) }); |
483 | /// ``` |
484 | pub fn try_status(&mut self) -> io::Result<Option<ExitStatus>> { |
485 | self.child.lock().unwrap().get_mut().try_wait() |
486 | } |
487 | |
488 | /// Drops the stdin handle and waits for the process to exit. |
489 | /// |
490 | /// Closing the stdin of the process helps avoid deadlocks. It ensures that the process does |
491 | /// not block waiting for input from the parent process while the parent waits for the child to |
492 | /// exit. |
493 | /// |
494 | /// # Examples |
495 | /// |
496 | /// ```no_run |
497 | /// # futures_lite::future::block_on(async { |
498 | /// use async_process::{Command, Stdio}; |
499 | /// |
500 | /// let mut child = Command::new("cp" ) |
501 | /// .arg("a.txt" ) |
502 | /// .arg("b.txt" ) |
503 | /// .spawn()?; |
504 | /// |
505 | /// println!("exit status: {}" , child.status().await?); |
506 | /// # std::io::Result::Ok(()) }); |
507 | /// ``` |
508 | pub fn status(&mut self) -> impl Future<Output = io::Result<ExitStatus>> { |
509 | self.stdin.take(); |
510 | let child = self.child.clone(); |
511 | |
512 | async move { |
513 | loop { |
514 | // Wait on the child process. |
515 | if let Some(status) = child.lock().unwrap().get_mut().try_wait()? { |
516 | return Ok(status); |
517 | } |
518 | |
519 | // Start listening. |
520 | event_listener::listener!(Reaper::get().sigchld => listener); |
521 | |
522 | // Try again. |
523 | if let Some(status) = child.lock().unwrap().get_mut().try_wait()? { |
524 | return Ok(status); |
525 | } |
526 | |
527 | // Wait on the listener. |
528 | listener.await; |
529 | } |
530 | } |
531 | } |
532 | |
533 | /// Drops the stdin handle and collects the output of the process. |
534 | /// |
535 | /// Closing the stdin of the process helps avoid deadlocks. It ensures that the process does |
536 | /// not block waiting for input from the parent process while the parent waits for the child to |
537 | /// exit. |
538 | /// |
539 | /// In order to capture the output of the process, [`Command::stdout()`] and |
540 | /// [`Command::stderr()`] must be configured with [`Stdio::piped()`]. |
541 | /// |
542 | /// # Examples |
543 | /// |
544 | /// ```no_run |
545 | /// # futures_lite::future::block_on(async { |
546 | /// use async_process::{Command, Stdio}; |
547 | /// |
548 | /// let child = Command::new("ls" ) |
549 | /// .stdout(Stdio::piped()) |
550 | /// .stderr(Stdio::piped()) |
551 | /// .spawn()?; |
552 | /// |
553 | /// let out = child.output().await?; |
554 | /// # std::io::Result::Ok(()) }); |
555 | /// ``` |
556 | pub fn output(mut self) -> impl Future<Output = io::Result<Output>> { |
557 | // A future that waits for the exit status. |
558 | let status = self.status(); |
559 | |
560 | // A future that collects stdout. |
561 | let stdout = self.stdout.take(); |
562 | let stdout = async move { |
563 | let mut v = Vec::new(); |
564 | if let Some(mut s) = stdout { |
565 | s.read_to_end(&mut v).await?; |
566 | } |
567 | io::Result::Ok(v) |
568 | }; |
569 | |
570 | // A future that collects stderr. |
571 | let stderr = self.stderr.take(); |
572 | let stderr = async move { |
573 | let mut v = Vec::new(); |
574 | if let Some(mut s) = stderr { |
575 | s.read_to_end(&mut v).await?; |
576 | } |
577 | io::Result::Ok(v) |
578 | }; |
579 | |
580 | async move { |
581 | let (stdout, stderr) = future::try_zip(stdout, stderr).await?; |
582 | let status = status.await?; |
583 | Ok(Output { |
584 | status, |
585 | stdout, |
586 | stderr, |
587 | }) |
588 | } |
589 | } |
590 | } |
591 | |
592 | impl fmt::Debug for Child { |
593 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
594 | f&mut DebugStruct<'_, '_>.debug_struct("Child" ) |
595 | .field("stdin" , &self.stdin) |
596 | .field("stdout" , &self.stdout) |
597 | .field(name:"stderr" , &self.stderr) |
598 | .finish() |
599 | } |
600 | } |
601 | |
602 | /// A handle to a child process's standard input (stdin). |
603 | /// |
604 | /// When a [`ChildStdin`] is dropped, the underlying handle gets clossed. If the child process was |
605 | /// previously blocked on input, it becomes unblocked after dropping. |
606 | #[derive (Debug)] |
607 | pub struct ChildStdin( |
608 | #[cfg (windows)] Unblock<std::process::ChildStdin>, |
609 | #[cfg (unix)] Async<std::process::ChildStdin>, |
610 | ); |
611 | |
612 | impl ChildStdin { |
613 | /// Convert async_process::ChildStdin into std::process::Stdio. |
614 | /// |
615 | /// You can use it to associate to the next process. |
616 | /// |
617 | /// # Examples |
618 | /// |
619 | /// ```no_run |
620 | /// # futures_lite::future::block_on(async { |
621 | /// use async_process::Command; |
622 | /// use std::process::Stdio; |
623 | /// |
624 | /// let mut ls_child = Command::new("ls" ).stdin(Stdio::piped()).spawn()?; |
625 | /// let stdio:Stdio = ls_child.stdin.take().unwrap().into_stdio().await?; |
626 | /// |
627 | /// let mut echo_child = Command::new("echo" ).arg("./" ).stdout(stdio).spawn()?; |
628 | /// |
629 | /// # std::io::Result::Ok(()) }); |
630 | /// ``` |
631 | pub async fn into_stdio(self) -> io::Result<std::process::Stdio> { |
632 | cfg_if::cfg_if! { |
633 | if #[cfg(windows)] { |
634 | Ok(self.0.into_inner().await.into()) |
635 | } else if #[cfg(unix)] { |
636 | let child_stdin = self.0.into_inner()?; |
637 | blocking_fd(rustix::fd::AsFd::as_fd(&child_stdin))?; |
638 | Ok(child_stdin.into()) |
639 | } |
640 | } |
641 | } |
642 | } |
643 | |
644 | impl io::AsyncWrite for ChildStdin { |
645 | fn poll_write( |
646 | mut self: Pin<&mut Self>, |
647 | cx: &mut Context<'_>, |
648 | buf: &[u8], |
649 | ) -> Poll<io::Result<usize>> { |
650 | Pin::new(&mut self.0).poll_write(cx, buf) |
651 | } |
652 | |
653 | fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
654 | Pin::new(&mut self.0).poll_flush(cx) |
655 | } |
656 | |
657 | fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
658 | Pin::new(&mut self.0).poll_close(cx) |
659 | } |
660 | } |
661 | |
662 | #[cfg (unix)] |
663 | impl AsRawFd for ChildStdin { |
664 | fn as_raw_fd(&self) -> RawFd { |
665 | self.0.as_raw_fd() |
666 | } |
667 | } |
668 | |
669 | #[cfg (unix)] |
670 | impl AsFd for ChildStdin { |
671 | fn as_fd(&self) -> BorrowedFd<'_> { |
672 | self.0.as_fd() |
673 | } |
674 | } |
675 | |
676 | #[cfg (unix)] |
677 | impl TryFrom<ChildStdin> for OwnedFd { |
678 | type Error = io::Error; |
679 | |
680 | fn try_from(value: ChildStdin) -> Result<Self, Self::Error> { |
681 | value.0.try_into() |
682 | } |
683 | } |
684 | |
685 | // TODO(notgull): Add mirroring AsRawHandle impls for all of the child handles |
686 | // |
687 | // at the moment this is pretty hard to do because of how they're wrapped in |
688 | // Unblock, meaning that we can't always access the underlying handle. async-fs |
689 | // gets around this by putting the handle in an Arc, but there's still some decision |
690 | // to be made about how to handle this (no pun intended) |
691 | |
692 | /// A handle to a child process's standard output (stdout). |
693 | /// |
694 | /// When a [`ChildStdout`] is dropped, the underlying handle gets closed. |
695 | #[derive (Debug)] |
696 | pub struct ChildStdout( |
697 | #[cfg (windows)] Unblock<std::process::ChildStdout>, |
698 | #[cfg (unix)] Async<std::process::ChildStdout>, |
699 | ); |
700 | |
701 | impl ChildStdout { |
702 | /// Convert async_process::ChildStdout into std::process::Stdio. |
703 | /// |
704 | /// You can use it to associate to the next process. |
705 | /// |
706 | /// # Examples |
707 | /// |
708 | /// ```no_run |
709 | /// # futures_lite::future::block_on(async { |
710 | /// use async_process::Command; |
711 | /// use std::process::Stdio; |
712 | /// use std::io::Read; |
713 | /// use futures_lite::AsyncReadExt; |
714 | /// |
715 | /// let mut ls_child = Command::new("ls" ).stdout(Stdio::piped()).spawn()?; |
716 | /// let stdio:Stdio = ls_child.stdout.take().unwrap().into_stdio().await?; |
717 | /// |
718 | /// let mut echo_child = Command::new("echo" ).stdin(stdio).stdout(Stdio::piped()).spawn()?; |
719 | /// let mut buf = vec![]; |
720 | /// echo_child.stdout.take().unwrap().read(&mut buf).await; |
721 | /// # std::io::Result::Ok(()) }); |
722 | /// ``` |
723 | pub async fn into_stdio(self) -> io::Result<std::process::Stdio> { |
724 | cfg_if::cfg_if! { |
725 | if #[cfg(windows)] { |
726 | Ok(self.0.into_inner().await.into()) |
727 | } else if #[cfg(unix)] { |
728 | let child_stdout = self.0.into_inner()?; |
729 | blocking_fd(rustix::fd::AsFd::as_fd(&child_stdout))?; |
730 | Ok(child_stdout.into()) |
731 | } |
732 | } |
733 | } |
734 | } |
735 | |
736 | impl io::AsyncRead for ChildStdout { |
737 | fn poll_read( |
738 | mut self: Pin<&mut Self>, |
739 | cx: &mut Context<'_>, |
740 | buf: &mut [u8], |
741 | ) -> Poll<io::Result<usize>> { |
742 | Pin::new(&mut self.0).poll_read(cx, buf) |
743 | } |
744 | } |
745 | |
746 | #[cfg (unix)] |
747 | impl AsRawFd for ChildStdout { |
748 | fn as_raw_fd(&self) -> RawFd { |
749 | self.0.as_raw_fd() |
750 | } |
751 | } |
752 | |
753 | #[cfg (unix)] |
754 | impl AsFd for ChildStdout { |
755 | fn as_fd(&self) -> BorrowedFd<'_> { |
756 | self.0.as_fd() |
757 | } |
758 | } |
759 | |
760 | #[cfg (unix)] |
761 | impl TryFrom<ChildStdout> for OwnedFd { |
762 | type Error = io::Error; |
763 | |
764 | fn try_from(value: ChildStdout) -> Result<Self, Self::Error> { |
765 | value.0.try_into() |
766 | } |
767 | } |
768 | |
769 | /// A handle to a child process's standard error (stderr). |
770 | /// |
771 | /// When a [`ChildStderr`] is dropped, the underlying handle gets closed. |
772 | #[derive (Debug)] |
773 | pub struct ChildStderr( |
774 | #[cfg (windows)] Unblock<std::process::ChildStderr>, |
775 | #[cfg (unix)] Async<std::process::ChildStderr>, |
776 | ); |
777 | |
778 | impl ChildStderr { |
779 | /// Convert async_process::ChildStderr into std::process::Stdio. |
780 | /// |
781 | /// You can use it to associate to the next process. |
782 | /// |
783 | /// # Examples |
784 | /// |
785 | /// ```no_run |
786 | /// # futures_lite::future::block_on(async { |
787 | /// use async_process::Command; |
788 | /// use std::process::Stdio; |
789 | /// |
790 | /// let mut ls_child = Command::new("ls" ).arg("x" ).stderr(Stdio::piped()).spawn()?; |
791 | /// let stdio:Stdio = ls_child.stderr.take().unwrap().into_stdio().await?; |
792 | /// |
793 | /// let mut echo_child = Command::new("echo" ).stdin(stdio).spawn()?; |
794 | /// # std::io::Result::Ok(()) }); |
795 | /// ``` |
796 | pub async fn into_stdio(self) -> io::Result<std::process::Stdio> { |
797 | cfg_if::cfg_if! { |
798 | if #[cfg(windows)] { |
799 | Ok(self.0.into_inner().await.into()) |
800 | } else if #[cfg(unix)] { |
801 | let child_stderr = self.0.into_inner()?; |
802 | blocking_fd(rustix::fd::AsFd::as_fd(&child_stderr))?; |
803 | Ok(child_stderr.into()) |
804 | } |
805 | } |
806 | } |
807 | } |
808 | |
809 | impl io::AsyncRead for ChildStderr { |
810 | fn poll_read( |
811 | mut self: Pin<&mut Self>, |
812 | cx: &mut Context<'_>, |
813 | buf: &mut [u8], |
814 | ) -> Poll<io::Result<usize>> { |
815 | Pin::new(&mut self.0).poll_read(cx, buf) |
816 | } |
817 | } |
818 | |
819 | #[cfg (unix)] |
820 | impl AsRawFd for ChildStderr { |
821 | fn as_raw_fd(&self) -> RawFd { |
822 | self.0.as_raw_fd() |
823 | } |
824 | } |
825 | |
826 | #[cfg (unix)] |
827 | impl AsFd for ChildStderr { |
828 | fn as_fd(&self) -> BorrowedFd<'_> { |
829 | self.0.as_fd() |
830 | } |
831 | } |
832 | |
833 | #[cfg (unix)] |
834 | impl TryFrom<ChildStderr> for OwnedFd { |
835 | type Error = io::Error; |
836 | |
837 | fn try_from(value: ChildStderr) -> Result<Self, Self::Error> { |
838 | value.0.try_into() |
839 | } |
840 | } |
841 | |
842 | /// Runs the driver for the asynchronous processes. |
843 | /// |
844 | /// This future takes control of global structures related to driving [`Child`]ren and reaping |
845 | /// zombie processes. These responsibilities include listening for the `SIGCHLD` signal and |
846 | /// making sure zombie processes are successfully waited on. |
847 | /// |
848 | /// If multiple tasks run `driver()` at once, only one will actually drive the reaper; the other |
849 | /// ones will just sleep. If a task that is driving the reaper is dropped, a previously sleeping |
850 | /// task will take over. If all tasks driving the reaper are dropped, the "async-process" thread |
851 | /// will be spawned. The "async-process" thread just blocks on this future and will automatically |
852 | /// be spawned if no tasks are driving the reaper once a [`Child`] is created. |
853 | /// |
854 | /// This future will never complete. It is intended to be ran on a background task in your |
855 | /// executor of choice. |
856 | /// |
857 | /// # Examples |
858 | /// |
859 | /// ```no_run |
860 | /// use async_executor::Executor; |
861 | /// use async_process::{driver, Command}; |
862 | /// |
863 | /// # futures_lite::future::block_on(async { |
864 | /// // Create an executor and run on it. |
865 | /// let ex = Executor::new(); |
866 | /// ex.run(async { |
867 | /// // Run the driver future in the background. |
868 | /// ex.spawn(driver()).detach(); |
869 | /// |
870 | /// // Run a command. |
871 | /// Command::new("ls" ).output().await.ok(); |
872 | /// }).await; |
873 | /// # }); |
874 | /// ``` |
875 | #[inline ] |
876 | pub fn driver() -> impl Future<Output = Infallible> + Send + 'static { |
877 | struct CallOnDrop<F: FnMut()>(F); |
878 | |
879 | impl<F: FnMut()> Drop for CallOnDrop<F> { |
880 | fn drop(&mut self) { |
881 | (self.0)(); |
882 | } |
883 | } |
884 | |
885 | async { |
886 | // Get the reaper. |
887 | let reaper = Reaper::get(); |
888 | |
889 | // Make sure the reaper knows we're driving it. |
890 | reaper.drivers.fetch_add(1, Ordering::SeqCst); |
891 | |
892 | // Decrement the driver count when this future is dropped. |
893 | let _guard = CallOnDrop(|| { |
894 | let prev_count = reaper.drivers.fetch_sub(1, Ordering::SeqCst); |
895 | |
896 | // If this was the last driver, and there are still resources actively using the |
897 | // reaper, make sure that there is a thread driving the reaper. |
898 | if prev_count == 1 |
899 | && reaper.child_count.load(Ordering::SeqCst) > 0 |
900 | && !reaper |
901 | .zombies |
902 | .lock() |
903 | .unwrap_or_else(|x| x.into_inner()) |
904 | .is_empty() |
905 | { |
906 | reaper.ensure_driven(); |
907 | } |
908 | }); |
909 | |
910 | // Acquire the reaper lock and start polling the SIGCHLD event. |
911 | let guard = reaper.driver_guard.lock().await; |
912 | reaper.reap(guard).await |
913 | } |
914 | } |
915 | |
916 | /// A builder for spawning processes. |
917 | /// |
918 | /// # Examples |
919 | /// |
920 | /// ```no_run |
921 | /// # futures_lite::future::block_on(async { |
922 | /// use async_process::Command; |
923 | /// |
924 | /// let output = if cfg!(target_os = "windows" ) { |
925 | /// Command::new("cmd" ).args(&["/C" , "echo hello" ]).output().await? |
926 | /// } else { |
927 | /// Command::new("sh" ).arg("-c" ).arg("echo hello" ).output().await? |
928 | /// }; |
929 | /// # std::io::Result::Ok(()) }); |
930 | /// ``` |
931 | pub struct Command { |
932 | inner: std::process::Command, |
933 | stdin: bool, |
934 | stdout: bool, |
935 | stderr: bool, |
936 | reap_on_drop: bool, |
937 | kill_on_drop: bool, |
938 | } |
939 | |
940 | impl Command { |
941 | /// Constructs a new [`Command`] for launching `program`. |
942 | /// |
943 | /// The initial configuration (the working directory and environment variables) is inherited |
944 | /// from the current process. |
945 | /// |
946 | /// # Examples |
947 | /// |
948 | /// ``` |
949 | /// use async_process::Command; |
950 | /// |
951 | /// let mut cmd = Command::new("ls" ); |
952 | /// ``` |
953 | pub fn new<S: AsRef<OsStr>>(program: S) -> Command { |
954 | Self::from(std::process::Command::new(program)) |
955 | } |
956 | |
957 | /// Adds a single argument to pass to the program. |
958 | /// |
959 | /// # Examples |
960 | /// |
961 | /// ``` |
962 | /// use async_process::Command; |
963 | /// |
964 | /// let mut cmd = Command::new("echo" ); |
965 | /// cmd.arg("hello" ); |
966 | /// cmd.arg("world" ); |
967 | /// ``` |
968 | pub fn arg<S: AsRef<OsStr>>(&mut self, arg: S) -> &mut Command { |
969 | self.inner.arg(arg); |
970 | self |
971 | } |
972 | |
973 | /// Adds multiple arguments to pass to the program. |
974 | /// |
975 | /// # Examples |
976 | /// |
977 | /// ``` |
978 | /// use async_process::Command; |
979 | /// |
980 | /// let mut cmd = Command::new("echo" ); |
981 | /// cmd.args(&["hello" , "world" ]); |
982 | /// ``` |
983 | pub fn args<I, S>(&mut self, args: I) -> &mut Command |
984 | where |
985 | I: IntoIterator<Item = S>, |
986 | S: AsRef<OsStr>, |
987 | { |
988 | self.inner.args(args); |
989 | self |
990 | } |
991 | |
992 | /// Configures an environment variable for the new process. |
993 | /// |
994 | /// Note that environment variable names are case-insensitive (but case-preserving) on Windows, |
995 | /// and case-sensitive on all other platforms. |
996 | /// |
997 | /// # Examples |
998 | /// |
999 | /// ``` |
1000 | /// use async_process::Command; |
1001 | /// |
1002 | /// let mut cmd = Command::new("ls" ); |
1003 | /// cmd.env("PATH" , "/bin" ); |
1004 | /// ``` |
1005 | pub fn env<K, V>(&mut self, key: K, val: V) -> &mut Command |
1006 | where |
1007 | K: AsRef<OsStr>, |
1008 | V: AsRef<OsStr>, |
1009 | { |
1010 | self.inner.env(key, val); |
1011 | self |
1012 | } |
1013 | |
1014 | /// Configures multiple environment variables for the new process. |
1015 | /// |
1016 | /// Note that environment variable names are case-insensitive (but case-preserving) on Windows, |
1017 | /// and case-sensitive on all other platforms. |
1018 | /// |
1019 | /// # Examples |
1020 | /// |
1021 | /// ``` |
1022 | /// use async_process::Command; |
1023 | /// |
1024 | /// let mut cmd = Command::new("ls" ); |
1025 | /// cmd.envs(vec![("PATH" , "/bin" ), ("TERM" , "xterm-256color" )]); |
1026 | /// ``` |
1027 | pub fn envs<I, K, V>(&mut self, vars: I) -> &mut Command |
1028 | where |
1029 | I: IntoIterator<Item = (K, V)>, |
1030 | K: AsRef<OsStr>, |
1031 | V: AsRef<OsStr>, |
1032 | { |
1033 | self.inner.envs(vars); |
1034 | self |
1035 | } |
1036 | |
1037 | /// Removes an environment variable mapping. |
1038 | /// |
1039 | /// # Examples |
1040 | /// |
1041 | /// ``` |
1042 | /// use async_process::Command; |
1043 | /// |
1044 | /// let mut cmd = Command::new("ls" ); |
1045 | /// cmd.env_remove("PATH" ); |
1046 | /// ``` |
1047 | pub fn env_remove<K: AsRef<OsStr>>(&mut self, key: K) -> &mut Command { |
1048 | self.inner.env_remove(key); |
1049 | self |
1050 | } |
1051 | |
1052 | /// Removes all environment variable mappings. |
1053 | /// |
1054 | /// # Examples |
1055 | /// |
1056 | /// ``` |
1057 | /// use async_process::Command; |
1058 | /// |
1059 | /// let mut cmd = Command::new("ls" ); |
1060 | /// cmd.env_clear(); |
1061 | /// ``` |
1062 | pub fn env_clear(&mut self) -> &mut Command { |
1063 | self.inner.env_clear(); |
1064 | self |
1065 | } |
1066 | |
1067 | /// Configures the working directory for the new process. |
1068 | /// |
1069 | /// # Examples |
1070 | /// |
1071 | /// ``` |
1072 | /// use async_process::Command; |
1073 | /// |
1074 | /// let mut cmd = Command::new("ls" ); |
1075 | /// cmd.current_dir("/" ); |
1076 | /// ``` |
1077 | pub fn current_dir<P: AsRef<Path>>(&mut self, dir: P) -> &mut Command { |
1078 | self.inner.current_dir(dir); |
1079 | self |
1080 | } |
1081 | |
1082 | /// Configures the standard input (stdin) for the new process. |
1083 | /// |
1084 | /// # Examples |
1085 | /// |
1086 | /// ``` |
1087 | /// use async_process::{Command, Stdio}; |
1088 | /// |
1089 | /// let mut cmd = Command::new("cat" ); |
1090 | /// cmd.stdin(Stdio::null()); |
1091 | /// ``` |
1092 | pub fn stdin<T: Into<Stdio>>(&mut self, cfg: T) -> &mut Command { |
1093 | self.stdin = true; |
1094 | self.inner.stdin(cfg); |
1095 | self |
1096 | } |
1097 | |
1098 | /// Configures the standard output (stdout) for the new process. |
1099 | /// |
1100 | /// # Examples |
1101 | /// |
1102 | /// ``` |
1103 | /// use async_process::{Command, Stdio}; |
1104 | /// |
1105 | /// let mut cmd = Command::new("ls" ); |
1106 | /// cmd.stdout(Stdio::piped()); |
1107 | /// ``` |
1108 | pub fn stdout<T: Into<Stdio>>(&mut self, cfg: T) -> &mut Command { |
1109 | self.stdout = true; |
1110 | self.inner.stdout(cfg); |
1111 | self |
1112 | } |
1113 | |
1114 | /// Configures the standard error (stderr) for the new process. |
1115 | /// |
1116 | /// # Examples |
1117 | /// |
1118 | /// ``` |
1119 | /// use async_process::{Command, Stdio}; |
1120 | /// |
1121 | /// let mut cmd = Command::new("ls" ); |
1122 | /// cmd.stderr(Stdio::piped()); |
1123 | /// ``` |
1124 | pub fn stderr<T: Into<Stdio>>(&mut self, cfg: T) -> &mut Command { |
1125 | self.stderr = true; |
1126 | self.inner.stderr(cfg); |
1127 | self |
1128 | } |
1129 | |
1130 | /// Configures whether to reap the zombie process when [`Child`] is dropped. |
1131 | /// |
1132 | /// When the process finishes, it becomes a "zombie" and some resources associated with it |
1133 | /// remain until [`Child::try_status()`], [`Child::status()`], or [`Child::output()`] collects |
1134 | /// its exit code. |
1135 | /// |
1136 | /// If its exit code is never collected, the resources may leak forever. This crate has a |
1137 | /// background thread named "async-process" that collects such "zombie" processes and then |
1138 | /// "reaps" them, thus preventing the resource leaks. |
1139 | /// |
1140 | /// The default value of this option is `true`. |
1141 | /// |
1142 | /// # Examples |
1143 | /// |
1144 | /// ``` |
1145 | /// use async_process::{Command, Stdio}; |
1146 | /// |
1147 | /// let mut cmd = Command::new("cat" ); |
1148 | /// cmd.reap_on_drop(false); |
1149 | /// ``` |
1150 | pub fn reap_on_drop(&mut self, reap_on_drop: bool) -> &mut Command { |
1151 | self.reap_on_drop = reap_on_drop; |
1152 | self |
1153 | } |
1154 | |
1155 | /// Configures whether to kill the process when [`Child`] is dropped. |
1156 | /// |
1157 | /// The default value of this option is `false`. |
1158 | /// |
1159 | /// # Examples |
1160 | /// |
1161 | /// ``` |
1162 | /// use async_process::{Command, Stdio}; |
1163 | /// |
1164 | /// let mut cmd = Command::new("cat" ); |
1165 | /// cmd.kill_on_drop(true); |
1166 | /// ``` |
1167 | pub fn kill_on_drop(&mut self, kill_on_drop: bool) -> &mut Command { |
1168 | self.kill_on_drop = kill_on_drop; |
1169 | self |
1170 | } |
1171 | |
1172 | /// Executes the command and returns the [`Child`] handle to it. |
1173 | /// |
1174 | /// If not configured, stdin, stdout and stderr will be set to [`Stdio::inherit()`]. |
1175 | /// |
1176 | /// # Examples |
1177 | /// |
1178 | /// ```no_run |
1179 | /// # futures_lite::future::block_on(async { |
1180 | /// use async_process::Command; |
1181 | /// |
1182 | /// let child = Command::new("ls" ).spawn()?; |
1183 | /// # std::io::Result::Ok(()) }); |
1184 | /// ``` |
1185 | pub fn spawn(&mut self) -> io::Result<Child> { |
1186 | if !self.stdin { |
1187 | self.inner.stdin(Stdio::inherit()); |
1188 | } |
1189 | if !self.stdout { |
1190 | self.inner.stdout(Stdio::inherit()); |
1191 | } |
1192 | if !self.stderr { |
1193 | self.inner.stderr(Stdio::inherit()); |
1194 | } |
1195 | |
1196 | Child::new(self) |
1197 | } |
1198 | |
1199 | /// Executes the command, waits for it to exit, and returns the exit status. |
1200 | /// |
1201 | /// If not configured, stdin, stdout and stderr will be set to [`Stdio::inherit()`]. |
1202 | /// |
1203 | /// # Examples |
1204 | /// |
1205 | /// ```no_run |
1206 | /// # futures_lite::future::block_on(async { |
1207 | /// use async_process::Command; |
1208 | /// |
1209 | /// let status = Command::new("cp" ) |
1210 | /// .arg("a.txt" ) |
1211 | /// .arg("b.txt" ) |
1212 | /// .status() |
1213 | /// .await?; |
1214 | /// # std::io::Result::Ok(()) }); |
1215 | /// ``` |
1216 | pub fn status(&mut self) -> impl Future<Output = io::Result<ExitStatus>> { |
1217 | let child = self.spawn(); |
1218 | async { child?.status().await } |
1219 | } |
1220 | |
1221 | /// Executes the command and collects its output. |
1222 | /// |
1223 | /// If not configured, stdin will be set to [`Stdio::null()`], and stdout and stderr will be |
1224 | /// set to [`Stdio::piped()`]. |
1225 | /// |
1226 | /// # Examples |
1227 | /// |
1228 | /// ```no_run |
1229 | /// # futures_lite::future::block_on(async { |
1230 | /// use async_process::Command; |
1231 | /// |
1232 | /// let output = Command::new("cat" ) |
1233 | /// .arg("a.txt" ) |
1234 | /// .output() |
1235 | /// .await?; |
1236 | /// # std::io::Result::Ok(()) }); |
1237 | /// ``` |
1238 | pub fn output(&mut self) -> impl Future<Output = io::Result<Output>> { |
1239 | if !self.stdin { |
1240 | self.inner.stdin(Stdio::null()); |
1241 | } |
1242 | if !self.stdout { |
1243 | self.inner.stdout(Stdio::piped()); |
1244 | } |
1245 | if !self.stderr { |
1246 | self.inner.stderr(Stdio::piped()); |
1247 | } |
1248 | |
1249 | let child = Child::new(self); |
1250 | async { child?.output().await } |
1251 | } |
1252 | } |
1253 | |
1254 | impl From<std::process::Command> for Command { |
1255 | fn from(inner: std::process::Command) -> Self { |
1256 | Self { |
1257 | inner, |
1258 | stdin: false, |
1259 | stdout: false, |
1260 | stderr: false, |
1261 | reap_on_drop: true, |
1262 | kill_on_drop: false, |
1263 | } |
1264 | } |
1265 | } |
1266 | |
1267 | impl fmt::Debug for Command { |
1268 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
1269 | if f.alternate() { |
1270 | f&mut DebugStruct<'_, '_>.debug_struct("Command" ) |
1271 | .field("inner" , &self.inner) |
1272 | .field("stdin" , &self.stdin) |
1273 | .field("stdout" , &self.stdout) |
1274 | .field("stderr" , &self.stderr) |
1275 | .field("reap_on_drop" , &self.reap_on_drop) |
1276 | .field(name:"kill_on_drop" , &self.kill_on_drop) |
1277 | .finish() |
1278 | } else { |
1279 | // Stdlib outputs command-line in Debug for Command. This does the |
1280 | // same, if not in "alternate" (long pretty-printed) mode. |
1281 | // This is useful for logs, for example. |
1282 | fmt::Debug::fmt(&self.inner, f) |
1283 | } |
1284 | } |
1285 | } |
1286 | |
1287 | /// Moves `Fd` out of non-blocking mode. |
1288 | #[cfg (unix)] |
1289 | fn blocking_fd(fd: rustix::fd::BorrowedFd<'_>) -> io::Result<()> { |
1290 | cfg_if::cfg_if! { |
1291 | // ioctl(FIONBIO) sets the flag atomically, but we use this only on Linux |
1292 | // for now, as with the standard library, because it seems to behave |
1293 | // differently depending on the platform. |
1294 | // https://github.com/rust-lang/rust/commit/efeb42be2837842d1beb47b51bb693c7474aba3d |
1295 | // https://github.com/libuv/libuv/blob/e9d91fccfc3e5ff772d5da90e1c4a24061198ca0/src/unix/poll.c#L78-L80 |
1296 | // https://github.com/tokio-rs/mio/commit/0db49f6d5caf54b12176821363d154384357e70a |
1297 | if #[cfg(target_os = "linux" )] { |
1298 | rustix::io::ioctl_fionbio(fd, false)?; |
1299 | } else { |
1300 | let previous = rustix::fs::fcntl_getfl(fd)?; |
1301 | let new = previous & !rustix::fs::OFlags::NONBLOCK; |
1302 | if new != previous { |
1303 | rustix::fs::fcntl_setfl(fd, new)?; |
1304 | } |
1305 | } |
1306 | } |
1307 | Ok(()) |
1308 | } |
1309 | |
1310 | #[cfg (test)] |
1311 | mod test { |
1312 | #[test ] |
1313 | fn polled_driver() { |
1314 | use super::{driver, Command}; |
1315 | use futures_lite::future; |
1316 | use futures_lite::prelude::*; |
1317 | |
1318 | let is_thread_spawned = |
1319 | || super::DRIVER_THREAD_SPAWNED.load(std::sync::atomic::Ordering::SeqCst); |
1320 | |
1321 | #[cfg (unix)] |
1322 | fn command() -> Command { |
1323 | let mut cmd = Command::new("sh" ); |
1324 | cmd.arg("-c" ).arg("echo hello" ); |
1325 | cmd |
1326 | } |
1327 | |
1328 | #[cfg (windows)] |
1329 | fn command() -> Command { |
1330 | let mut cmd = Command::new("cmd" ); |
1331 | cmd.arg("/C" ).arg("echo hello" ); |
1332 | cmd |
1333 | } |
1334 | |
1335 | #[cfg (unix)] |
1336 | const OUTPUT: &[u8] = b"hello \n" ; |
1337 | #[cfg (windows)] |
1338 | const OUTPUT: &[u8] = b"hello \r\n" ; |
1339 | |
1340 | future::block_on(async { |
1341 | // Thread should not be spawned off the bat. |
1342 | assert!(!is_thread_spawned()); |
1343 | |
1344 | // Spawn a driver. |
1345 | let mut driver1 = Box::pin(driver()); |
1346 | future::poll_once(&mut driver1).await; |
1347 | assert!(!is_thread_spawned()); |
1348 | |
1349 | // We should be able to run the driver in parallel with a process future. |
1350 | async { |
1351 | (&mut driver1).await; |
1352 | } |
1353 | .or(async { |
1354 | let output = command().output().await.unwrap(); |
1355 | assert_eq!(output.stdout, OUTPUT); |
1356 | }) |
1357 | .await; |
1358 | assert!(!is_thread_spawned()); |
1359 | |
1360 | // Spawn a second driver. |
1361 | let mut driver2 = Box::pin(driver()); |
1362 | future::poll_once(&mut driver2).await; |
1363 | assert!(!is_thread_spawned()); |
1364 | |
1365 | // Poll both drivers in parallel. |
1366 | async { |
1367 | (&mut driver1).await; |
1368 | } |
1369 | .or(async { |
1370 | (&mut driver2).await; |
1371 | }) |
1372 | .or(async { |
1373 | let output = command().output().await.unwrap(); |
1374 | assert_eq!(output.stdout, OUTPUT); |
1375 | }) |
1376 | .await; |
1377 | assert!(!is_thread_spawned()); |
1378 | |
1379 | // Once one is dropped, the other should take over. |
1380 | drop(driver1); |
1381 | assert!(!is_thread_spawned()); |
1382 | |
1383 | // Poll driver2 in parallel with a process future. |
1384 | async { |
1385 | (&mut driver2).await; |
1386 | } |
1387 | .or(async { |
1388 | let output = command().output().await.unwrap(); |
1389 | assert_eq!(output.stdout, OUTPUT); |
1390 | }) |
1391 | .await; |
1392 | assert!(!is_thread_spawned()); |
1393 | |
1394 | // Once driver2 is dropped, the thread should not be spawned, as there are no active |
1395 | // child processes.. |
1396 | drop(driver2); |
1397 | assert!(!is_thread_spawned()); |
1398 | |
1399 | // We should now be able to poll the process future independently, it will spawn the |
1400 | // thread. |
1401 | let output = command().output().await.unwrap(); |
1402 | assert_eq!(output.stdout, OUTPUT); |
1403 | assert!(is_thread_spawned()); |
1404 | }); |
1405 | } |
1406 | } |
1407 | |