1 | //! A cross-platform library for opening OS pipes, like those from |
2 | //! [`pipe`](https://man7.org/linux/man-pages/man2/pipe.2.html) on Linux |
3 | //! or |
4 | //! [`CreatePipe`](https://docs.microsoft.com/en-us/windows/win32/api/namedpipeapi/nf-namedpipeapi-createpipe) |
5 | //! on Windows. The Rust standard library provides |
6 | //! [`Stdio::piped`](https://doc.rust-lang.org/std/process/struct.Stdio.html#method.piped) |
7 | //! for simple use cases involving child processes, but it doesn't |
8 | //! support creating pipes directly. This crate fills that gap. |
9 | //! |
10 | //! - [Docs](https://docs.rs/os_pipe) |
11 | //! - [Crate](https://crates.io/crates/os_pipe) |
12 | //! - [Repo](https://github.com/oconnor663/os_pipe.rs) |
13 | //! |
14 | //! # Common deadlocks related to pipes |
15 | //! |
16 | //! When you work with pipes, you often end up debugging a deadlock at |
17 | //! some point. These can be confusing if you don't know why they |
18 | //! happen. Here are two things you need to know: |
19 | //! |
20 | //! 1. Pipe reads will block waiting for input as long as there's at |
21 | //! least one writer still open. **If you forget to close a writer, |
22 | //! reads will block forever.** This includes writers that you give |
23 | //! to child processes. |
24 | //! 2. Pipes have an internal buffer of some fixed size. On Linux for |
25 | //! example, pipe buffers are 64 KiB by default. When the buffer is |
26 | //! full, writes will block waiting for space. **If the buffer is |
27 | //! full and there aren't any readers, writes will block forever.** |
28 | //! |
29 | //! Deadlocks caused by a forgotten writer usually show up immediately, |
30 | //! which makes them relatively easy to fix once you know what to look |
31 | //! for. (See "Avoid a deadlock!" in the example code below.) However, |
32 | //! deadlocks caused by full pipe buffers are trickier. These might only |
33 | //! show up for larger inputs, and they might be timing-dependent or |
34 | //! platform-dependent. If you find that writing to a pipe deadlocks |
35 | //! sometimes, think about who's supposed to be reading from that pipe, |
36 | //! and whether that thread or process might be blocked on something |
37 | //! else. For more on this, see the [Gotchas |
38 | //! Doc](https://github.com/oconnor663/duct.py/blob/master/gotchas.md#using-io-threads-to-avoid-blocking-children) |
39 | //! from the [`duct`](https://github.com/oconnor663/duct.rs) crate. (And |
40 | //! consider whether [`duct`](https://github.com/oconnor663/duct.rs) |
41 | //! might be a good fit for your use case.) |
42 | //! |
43 | //! # Examples |
44 | //! |
45 | //! Here we write a single byte into a pipe and read it back out: |
46 | //! |
47 | //! ```rust |
48 | //! # fn main() -> Result<(), Box<dyn std::error::Error>> { |
49 | //! use std::io::prelude::*; |
50 | //! |
51 | //! let (mut reader, mut writer) = os_pipe::pipe()?; |
52 | //! // XXX: If this write blocks, we'll never get to the read. |
53 | //! writer.write_all(b"x" )?; |
54 | //! let mut output = [0]; |
55 | //! reader.read_exact(&mut output)?; |
56 | //! assert_eq!(b"x" , &output); |
57 | //! # Ok(()) |
58 | //! # } |
59 | //! ``` |
60 | //! |
61 | //! This is a minimal working example, but as discussed in the section |
62 | //! above, reading and writing on the same thread like this is |
63 | //! deadlock-prone. If we wrote 100 KB instead of just one byte, this |
64 | //! example would block on `write_all`, it would never make it to |
65 | //! `read_exact`, and that would be a deadlock. Doing the read and write |
66 | //! from different threads or different processes would fix the |
67 | //! deadlock. |
68 | //! |
69 | //! For a more complex example, here we join the stdout and stderr of a |
70 | //! child process into a single pipe. To do that we open a pipe, clone |
71 | //! its writer, and set that pair of writers as the child's stdout and |
72 | //! stderr. (This is possible because `PipeWriter` implements |
73 | //! `Into<Stdio>`.) Then we can read interleaved output from the pipe |
74 | //! reader. This example is deadlock-free, but note the comment about |
75 | //! closing the writers. |
76 | //! |
77 | //! ```rust |
78 | //! # use std::io::prelude::*; |
79 | //! # fn main() -> Result<(), Box<dyn std::error::Error>> { |
80 | //! // We're going to spawn a child process that prints "foo" to stdout |
81 | //! // and "bar" to stderr, and we'll combine these into a single pipe. |
82 | //! let mut command = std::process::Command::new("python" ); |
83 | //! command.args(&["-c" , r#" |
84 | //! import sys |
85 | //! sys.stdout.write("foo") |
86 | //! sys.stdout.flush() |
87 | //! sys.stderr.write("bar") |
88 | //! sys.stderr.flush() |
89 | //! "# ]); |
90 | //! |
91 | //! // Here's the interesting part. Open a pipe, clone its writer, and |
92 | //! // set that pair of writers as the child's stdout and stderr. |
93 | //! let (mut reader, writer) = os_pipe::pipe()?; |
94 | //! let writer_clone = writer.try_clone()?; |
95 | //! command.stdout(writer); |
96 | //! command.stderr(writer_clone); |
97 | //! |
98 | //! // Now start the child process running. |
99 | //! let mut handle = command.spawn()?; |
100 | //! |
101 | //! // Avoid a deadlock! This parent process is still holding open pipe |
102 | //! // writers inside the Command object, and we have to close those |
103 | //! // before we read. Here we do this by dropping the Command object. |
104 | //! drop(command); |
105 | //! |
106 | //! // Finally we can read all the output and clean up the child. |
107 | //! let mut output = String::new(); |
108 | //! reader.read_to_string(&mut output)?; |
109 | //! handle.wait()?; |
110 | //! assert_eq!(output, "foobar" ); |
111 | //! # Ok(()) |
112 | //! # } |
113 | //! ``` |
114 | //! |
115 | //! Note that the [`duct`](https://github.com/oconnor663/duct.rs) crate |
116 | //! can reproduce the example above in a single line of code, with no |
117 | //! risk of deadlocks and no risk of leaking [zombie |
118 | //! children](https://en.wikipedia.org/wiki/Zombie_process). |
119 | |
120 | use std::fs::File; |
121 | use std::io; |
122 | use std::process::Stdio; |
123 | |
124 | #[cfg (not(windows))] |
125 | #[path = "unix.rs" ] |
126 | mod sys; |
127 | #[cfg (windows)] |
128 | #[path = "windows.rs" ] |
129 | mod sys; |
130 | |
131 | /// The reading end of a pipe, returned by [`pipe`](fn.pipe.html). |
132 | /// |
133 | /// `PipeReader` implements `Into<Stdio>`, so you can pass it as an argument to |
134 | /// `Command::stdin` to spawn a child process that reads from the pipe. |
135 | #[derive (Debug)] |
136 | pub struct PipeReader( |
137 | // We use std::fs::File here for two reasons: OwnedFd and OwnedHandle are platform-specific, |
138 | // and this gives us read/write/flush for free. |
139 | File, |
140 | ); |
141 | |
142 | impl PipeReader { |
143 | pub fn try_clone(&self) -> io::Result<PipeReader> { |
144 | self.0.try_clone().map(op:PipeReader) |
145 | } |
146 | } |
147 | |
148 | impl io::Read for PipeReader { |
149 | fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { |
150 | self.0.read(buf) |
151 | } |
152 | } |
153 | |
154 | impl<'a> io::Read for &'a PipeReader { |
155 | fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { |
156 | (&self.0).read(buf) |
157 | } |
158 | } |
159 | |
160 | impl From<PipeReader> for Stdio { |
161 | fn from(p: PipeReader) -> Stdio { |
162 | p.0.into() |
163 | } |
164 | } |
165 | |
166 | /// The writing end of a pipe, returned by [`pipe`](fn.pipe.html). |
167 | /// |
168 | /// `PipeWriter` implements `Into<Stdio>`, so you can pass it as an argument to |
169 | /// `Command::stdout` or `Command::stderr` to spawn a child process that writes |
170 | /// to the pipe. |
171 | #[derive (Debug)] |
172 | pub struct PipeWriter(File); |
173 | |
174 | impl PipeWriter { |
175 | pub fn try_clone(&self) -> io::Result<PipeWriter> { |
176 | self.0.try_clone().map(op:PipeWriter) |
177 | } |
178 | } |
179 | |
180 | impl io::Write for PipeWriter { |
181 | fn write(&mut self, buf: &[u8]) -> io::Result<usize> { |
182 | self.0.write(buf) |
183 | } |
184 | |
185 | fn flush(&mut self) -> io::Result<()> { |
186 | self.0.flush() |
187 | } |
188 | } |
189 | |
190 | impl<'a> io::Write for &'a PipeWriter { |
191 | fn write(&mut self, buf: &[u8]) -> io::Result<usize> { |
192 | (&self.0).write(buf) |
193 | } |
194 | |
195 | fn flush(&mut self) -> io::Result<()> { |
196 | (&self.0).flush() |
197 | } |
198 | } |
199 | |
200 | impl From<PipeWriter> for Stdio { |
201 | fn from(p: PipeWriter) -> Stdio { |
202 | p.0.into() |
203 | } |
204 | } |
205 | |
206 | /// Open a new pipe and return a [`PipeReader`] and [`PipeWriter`] pair. |
207 | /// |
208 | /// This corresponds to the `pipe2` library call on Posix and the |
209 | /// `CreatePipe` library call on Windows (though these implementation |
210 | /// details might change). These pipes are non-inheritable, so new child |
211 | /// processes won't receive a copy of them unless they're explicitly |
212 | /// passed as stdin/stdout/stderr. |
213 | /// |
214 | /// [`PipeReader`]: struct.PipeReader.html |
215 | /// [`PipeWriter`]: struct.PipeWriter.html |
216 | pub fn pipe() -> io::Result<(PipeReader, PipeWriter)> { |
217 | sys::pipe() |
218 | } |
219 | |
220 | /// Get a duplicated copy of the current process's standard input, as a |
221 | /// [`PipeReader`]. |
222 | /// |
223 | /// Reading directly from this pipe isn't recommended, because it's not |
224 | /// synchronized with [`std::io::stdin`]. [`PipeReader`] implements |
225 | /// [`Into<Stdio>`], so it can be passed directly to [`Command::stdin`]. This is |
226 | /// equivalent to [`Stdio::inherit`], though, so it's usually not necessary |
227 | /// unless you need a collection of different pipes. |
228 | /// |
229 | /// [`std::io::stdin`]: https://doc.rust-lang.org/std/io/fn.stdin.html |
230 | /// [`PipeReader`]: struct.PipeReader.html |
231 | /// [`Into<Stdio>`]: https://doc.rust-lang.org/std/process/struct.Stdio.html |
232 | /// [`Command::stdin`]: https://doc.rust-lang.org/std/process/struct.Command.html#method.stdin |
233 | /// [`Stdio::inherit`]: https://doc.rust-lang.org/std/process/struct.Stdio.html#method.inherit |
234 | pub fn dup_stdin() -> io::Result<PipeReader> { |
235 | sys::dup(io::stdin()).map(op:PipeReader::from) |
236 | } |
237 | |
238 | /// Get a duplicated copy of the current process's standard output, as a |
239 | /// [`PipeWriter`](struct.PipeWriter.html). |
240 | /// |
241 | /// Writing directly to this pipe isn't recommended, because it's not |
242 | /// synchronized with [`std::io::stdout`]. [`PipeWriter`] implements |
243 | /// [`Into<Stdio>`], so it can be passed directly to [`Command::stdout`] or |
244 | /// [`Command::stderr`]. This can be useful if you want the child's stderr to go |
245 | /// to the parent's stdout. |
246 | /// |
247 | /// [`std::io::stdout`]: https://doc.rust-lang.org/std/io/fn.stdout.html |
248 | /// [`PipeWriter`]: struct.PipeWriter.html |
249 | /// [`Into<Stdio>`]: https://doc.rust-lang.org/std/process/struct.Stdio.html |
250 | /// [`Command::stdout`]: https://doc.rust-lang.org/std/process/struct.Command.html#method.stdout |
251 | /// [`Command::stderr`]: https://doc.rust-lang.org/std/process/struct.Command.html#method.stderr |
252 | /// [`Stdio::inherit`]: https://doc.rust-lang.org/std/process/struct.Stdio.html#method.inherit |
253 | pub fn dup_stdout() -> io::Result<PipeWriter> { |
254 | sys::dup(io::stdout()).map(op:PipeWriter::from) |
255 | } |
256 | |
257 | /// Get a duplicated copy of the current process's standard error, as a |
258 | /// [`PipeWriter`](struct.PipeWriter.html). |
259 | /// |
260 | /// Writing directly to this pipe isn't recommended, because it's not |
261 | /// synchronized with [`std::io::stderr`]. [`PipeWriter`] implements |
262 | /// [`Into<Stdio>`], so it can be passed directly to [`Command::stdout`] or |
263 | /// [`Command::stderr`]. This can be useful if you want the child's stdout to go |
264 | /// to the parent's stderr. |
265 | /// |
266 | /// [`std::io::stderr`]: https://doc.rust-lang.org/std/io/fn.stderr.html |
267 | /// [`PipeWriter`]: struct.PipeWriter.html |
268 | /// [`Into<Stdio>`]: https://doc.rust-lang.org/std/process/struct.Stdio.html |
269 | /// [`Command::stdout`]: https://doc.rust-lang.org/std/process/struct.Command.html#method.stdout |
270 | /// [`Command::stderr`]: https://doc.rust-lang.org/std/process/struct.Command.html#method.stderr |
271 | /// [`Stdio::inherit`]: https://doc.rust-lang.org/std/process/struct.Stdio.html#method.inherit |
272 | pub fn dup_stderr() -> io::Result<PipeWriter> { |
273 | sys::dup(io::stderr()).map(op:PipeWriter::from) |
274 | } |
275 | |
276 | #[cfg (test)] |
277 | mod tests { |
278 | use std::env::consts::EXE_EXTENSION; |
279 | use std::io::prelude::*; |
280 | use std::path::{Path, PathBuf}; |
281 | use std::process::Command; |
282 | use std::sync::Once; |
283 | use std::thread; |
284 | |
285 | fn path_to_exe(name: &str) -> PathBuf { |
286 | // This project defines some associated binaries for testing, and we shell out to them in |
287 | // these tests. `cargo test` doesn't automatically build associated binaries, so this |
288 | // function takes care of building them explicitly, with the right debug/release flavor. |
289 | static CARGO_BUILD_ONCE: Once = Once::new(); |
290 | CARGO_BUILD_ONCE.call_once(|| { |
291 | let mut build_command = Command::new("cargo" ); |
292 | build_command.args(&["build" , "--quiet" ]); |
293 | if !cfg!(debug_assertions) { |
294 | build_command.arg("--release" ); |
295 | } |
296 | let build_status = build_command.status().unwrap(); |
297 | assert!( |
298 | build_status.success(), |
299 | "Cargo failed to build associated binaries." |
300 | ); |
301 | }); |
302 | let flavor = if cfg!(debug_assertions) { |
303 | "debug" |
304 | } else { |
305 | "release" |
306 | }; |
307 | Path::new("target" ) |
308 | .join(flavor) |
309 | .join(name) |
310 | .with_extension(EXE_EXTENSION) |
311 | } |
312 | |
313 | #[test ] |
314 | fn test_pipe_some_data() { |
315 | let (mut reader, mut writer) = crate::pipe().unwrap(); |
316 | // A small write won't fill the pipe buffer, so it won't block this thread. |
317 | writer.write_all(b"some stuff" ).unwrap(); |
318 | drop(writer); |
319 | let mut out = String::new(); |
320 | reader.read_to_string(&mut out).unwrap(); |
321 | assert_eq!(out, "some stuff" ); |
322 | } |
323 | |
324 | #[test ] |
325 | fn test_pipe_some_data_with_refs() { |
326 | // As with `File`, there's a second set of impls for shared |
327 | // refs. Test those. |
328 | let (reader, writer) = crate::pipe().unwrap(); |
329 | let mut reader_ref = &reader; |
330 | { |
331 | let mut writer_ref = &writer; |
332 | // A small write won't fill the pipe buffer, so it won't block this thread. |
333 | writer_ref.write_all(b"some stuff" ).unwrap(); |
334 | } |
335 | drop(writer); |
336 | let mut out = String::new(); |
337 | reader_ref.read_to_string(&mut out).unwrap(); |
338 | assert_eq!(out, "some stuff" ); |
339 | } |
340 | |
341 | #[test ] |
342 | fn test_pipe_no_data() { |
343 | let (mut reader, writer) = crate::pipe().unwrap(); |
344 | drop(writer); |
345 | let mut out = String::new(); |
346 | reader.read_to_string(&mut out).unwrap(); |
347 | assert_eq!(out, "" ); |
348 | } |
349 | |
350 | #[test ] |
351 | fn test_pipe_a_megabyte_of_data_from_another_thread() { |
352 | let data = vec![0xff; 1_000_000]; |
353 | let data_copy = data.clone(); |
354 | let (mut reader, mut writer) = crate::pipe().unwrap(); |
355 | let joiner = thread::spawn(move || { |
356 | writer.write_all(&data_copy).unwrap(); |
357 | // This drop happens automatically, so writing it out here is mostly |
358 | // just for clarity. For what it's worth, it also guards against |
359 | // accidentally forgetting to drop if we switch to scoped threads or |
360 | // something like that and change this to a non-moving closure. The |
361 | // explicit drop forces `writer` to move. |
362 | drop(writer); |
363 | }); |
364 | let mut out = Vec::new(); |
365 | reader.read_to_end(&mut out).unwrap(); |
366 | joiner.join().unwrap(); |
367 | assert_eq!(out, data); |
368 | } |
369 | |
370 | #[test ] |
371 | fn test_pipes_are_not_inheritable() { |
372 | // Create pipes for a child process. |
373 | let (input_reader, mut input_writer) = crate::pipe().unwrap(); |
374 | let (mut output_reader, output_writer) = crate::pipe().unwrap(); |
375 | |
376 | // Create a bunch of duplicated copies, which we'll close later. This |
377 | // tests that duplication preserves non-inheritability. |
378 | let ir_dup = input_reader.try_clone().unwrap(); |
379 | let iw_dup = input_writer.try_clone().unwrap(); |
380 | let or_dup = output_reader.try_clone().unwrap(); |
381 | let ow_dup = output_writer.try_clone().unwrap(); |
382 | |
383 | // Spawn the child. Note that this temporary Command object takes |
384 | // ownership of our copies of the child's stdin and stdout, and then |
385 | // closes them immediately when it drops. That stops us from blocking |
386 | // our own read below. We use our own simple implementation of cat for |
387 | // compatibility with Windows. |
388 | let mut child = Command::new(path_to_exe("cat" )) |
389 | .stdin(input_reader) |
390 | .stdout(output_writer) |
391 | .spawn() |
392 | .unwrap(); |
393 | |
394 | // Drop all the dups now that the child is spawned. |
395 | drop(ir_dup); |
396 | drop(iw_dup); |
397 | drop(or_dup); |
398 | drop(ow_dup); |
399 | |
400 | // Write to the child's stdin. This is a small write, so it shouldn't |
401 | // block. |
402 | input_writer.write_all(b"hello" ).unwrap(); |
403 | drop(input_writer); |
404 | |
405 | // Read from the child's stdout. If this child has accidentally |
406 | // inherited the write end of its own stdin, then it will never exit, |
407 | // and this read will block forever. That's what this test is all |
408 | // about. |
409 | let mut output = Vec::new(); |
410 | output_reader.read_to_end(&mut output).unwrap(); |
411 | child.wait().unwrap(); |
412 | |
413 | // Confirm that we got the right bytes. |
414 | assert_eq!(b"hello" , &*output); |
415 | } |
416 | |
417 | #[test ] |
418 | fn test_parent_handles() { |
419 | // This test invokes the `swap` test program, which uses parent_stdout() and |
420 | // parent_stderr() to swap the outputs for another child that it spawns. |
421 | |
422 | // Create pipes for a child process. |
423 | let (reader, mut writer) = crate::pipe().unwrap(); |
424 | |
425 | // Write input. This shouldn't block because it's small. Then close the write end, or else |
426 | // the child will hang. |
427 | writer.write_all(b"quack" ).unwrap(); |
428 | drop(writer); |
429 | |
430 | // Use `swap` to run `cat_both`. `cat_both will read "quack" from stdin |
431 | // and write it to stdout and stderr with different tags. But because we |
432 | // run it inside `swap`, the tags in the output should be backwards. |
433 | let output = Command::new(path_to_exe("swap" )) |
434 | .arg(path_to_exe("cat_both" )) |
435 | .stdin(reader) |
436 | .output() |
437 | .unwrap(); |
438 | |
439 | // Check for a clean exit. |
440 | assert!( |
441 | output.status.success(), |
442 | "child process returned {:#?}" , |
443 | output |
444 | ); |
445 | |
446 | // Confirm that we got the right bytes. |
447 | assert_eq!(b"stderr: quack" , &*output.stdout); |
448 | assert_eq!(b"stdout: quack" , &*output.stderr); |
449 | } |
450 | |
451 | #[test ] |
452 | fn test_parent_handles_dont_close() { |
453 | // Open and close each parent pipe multiple times. If this closes the |
454 | // original, subsequent opens should fail. |
455 | let stdin = crate::dup_stdin().unwrap(); |
456 | drop(stdin); |
457 | let stdin = crate::dup_stdin().unwrap(); |
458 | drop(stdin); |
459 | |
460 | let stdout = crate::dup_stdout().unwrap(); |
461 | drop(stdout); |
462 | let stdout = crate::dup_stdout().unwrap(); |
463 | drop(stdout); |
464 | |
465 | let stderr = crate::dup_stderr().unwrap(); |
466 | drop(stderr); |
467 | let stderr = crate::dup_stderr().unwrap(); |
468 | drop(stderr); |
469 | } |
470 | |
471 | #[test ] |
472 | fn test_try_clone() { |
473 | let (reader, writer) = crate::pipe().unwrap(); |
474 | let mut reader_clone = reader.try_clone().unwrap(); |
475 | let mut writer_clone = writer.try_clone().unwrap(); |
476 | // A small write won't fill the pipe buffer, so it won't block this thread. |
477 | writer_clone.write_all(b"some stuff" ).unwrap(); |
478 | drop(writer); |
479 | drop(writer_clone); |
480 | let mut out = String::new(); |
481 | reader_clone.read_to_string(&mut out).unwrap(); |
482 | assert_eq!(out, "some stuff" ); |
483 | } |
484 | |
485 | #[test ] |
486 | fn test_debug() { |
487 | let (reader, writer) = crate::pipe().unwrap(); |
488 | _ = format!("{:?} {:?}" , reader, writer); |
489 | } |
490 | } |
491 | |