1#![warn(rust_2018_idioms)]
2#![cfg(all(feature = "full", not(target_os = "wasi")))]
3
4use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
5use tokio::join;
6use tokio::process::{Child, Command};
7use tokio_test::assert_ok;
8
9use futures::future::{self, FutureExt};
10use std::env;
11use std::io;
12use std::process::{ExitStatus, Stdio};
13
14fn cat() -> Command {
15 let mut cmd = Command::new(env!("CARGO_BIN_EXE_test-cat"));
16 cmd.stdin(Stdio::piped()).stdout(Stdio::piped());
17 cmd
18}
19
20async fn feed_cat(mut cat: Child, n: usize) -> io::Result<ExitStatus> {
21 let mut stdin = cat.stdin.take().unwrap();
22 let stdout = cat.stdout.take().unwrap();
23
24 // Produce n lines on the child's stdout.
25 let write = async {
26 for i in 0..n {
27 let bytes = format!("line {}\n", i).into_bytes();
28 stdin.write_all(&bytes).await.unwrap();
29 }
30
31 drop(stdin);
32 };
33
34 let read = async {
35 let mut reader = BufReader::new(stdout).lines();
36 let mut num_lines = 0;
37
38 // Try to read `n + 1` lines, ensuring the last one is empty
39 // (i.e. EOF is reached after `n` lines.
40 loop {
41 let data = reader
42 .next_line()
43 .await
44 .unwrap_or_else(|_| Some(String::new()))
45 .expect("failed to read line");
46
47 let num_read = data.len();
48 let done = num_lines >= n;
49
50 match (done, num_read) {
51 (false, 0) => panic!("broken pipe"),
52 (true, n) if n != 0 => panic!("extraneous data"),
53 _ => {
54 let expected = format!("line {}", num_lines);
55 assert_eq!(expected, data);
56 }
57 };
58
59 num_lines += 1;
60 if num_lines >= n {
61 break;
62 }
63 }
64 };
65
66 // Compose reading and writing concurrently.
67 future::join3(write, read, cat.wait())
68 .map(|(_, _, status)| status)
69 .await
70}
71
72/// Check for the following properties when feeding stdin and
73/// consuming stdout of a cat-like process:
74///
75/// - A number of lines that amounts to a number of bytes exceeding a
76/// typical OS buffer size can be fed to the child without
77/// deadlock. This tests that we also consume the stdout
78/// concurrently; otherwise this would deadlock.
79///
80/// - We read the same lines from the child that we fed it.
81///
82/// - The child does produce EOF on stdout after the last line.
83#[tokio::test]
84async fn feed_a_lot() {
85 let child = cat().spawn().unwrap();
86 let status = feed_cat(child, 10000).await.unwrap();
87 assert_eq!(status.code(), Some(0));
88}
89
90#[tokio::test]
91async fn wait_with_output_captures() {
92 let mut child = cat().spawn().unwrap();
93 let mut stdin = child.stdin.take().unwrap();
94
95 let write_bytes = b"1234";
96
97 let future = async {
98 stdin.write_all(write_bytes).await?;
99 drop(stdin);
100 let out = child.wait_with_output();
101 out.await
102 };
103
104 let output = future.await.unwrap();
105
106 assert!(output.status.success());
107 assert_eq!(output.stdout, write_bytes);
108 assert_eq!(output.stderr.len(), 0);
109}
110
111#[tokio::test]
112async fn status_closes_any_pipes() {
113 // Cat will open a pipe between the parent and child.
114 // If `status_async` doesn't ensure the handles are closed,
115 // we would end up blocking forever (and time out).
116 let child = cat().status();
117
118 assert_ok!(child.await);
119}
120
121#[tokio::test]
122async fn try_wait() {
123 let mut child = cat().spawn().unwrap();
124
125 let id = child.id().expect("missing id");
126 assert!(id > 0);
127
128 assert_eq!(None, assert_ok!(child.try_wait()));
129
130 // Drop the child's stdio handles so it can terminate
131 drop(child.stdin.take());
132 drop(child.stderr.take());
133 drop(child.stdout.take());
134
135 assert_ok!(child.wait().await);
136
137 // test that the `.try_wait()` method is fused just like the stdlib
138 assert!(assert_ok!(child.try_wait()).unwrap().success());
139
140 // Can't get id after process has exited
141 assert_eq!(child.id(), None);
142}
143
144#[tokio::test]
145async fn pipe_from_one_command_to_another() {
146 let mut first = cat().spawn().expect("first cmd");
147 let mut third = cat().spawn().expect("third cmd");
148
149 // Convert ChildStdout to Stdio
150 let second_stdin: Stdio = first
151 .stdout
152 .take()
153 .expect("first.stdout")
154 .try_into()
155 .expect("first.stdout into Stdio");
156
157 // Convert ChildStdin to Stdio
158 let second_stdout: Stdio = third
159 .stdin
160 .take()
161 .expect("third.stdin")
162 .try_into()
163 .expect("third.stdin into Stdio");
164
165 let mut second = cat()
166 .stdin(second_stdin)
167 .stdout(second_stdout)
168 .spawn()
169 .expect("first cmd");
170
171 let msg = "hello world! please pipe this message through";
172
173 let mut stdin = first.stdin.take().expect("first.stdin");
174 let write = async move { stdin.write_all(msg.as_bytes()).await };
175
176 let mut stdout = third.stdout.take().expect("third.stdout");
177 let read = async move {
178 let mut data = String::new();
179 stdout.read_to_string(&mut data).await.map(|_| data)
180 };
181
182 let (read, write, first_status, second_status, third_status) =
183 join!(read, write, first.wait(), second.wait(), third.wait());
184
185 assert_eq!(msg, read.expect("read result"));
186 write.expect("write result");
187
188 assert!(first_status.expect("first status").success());
189 assert!(second_status.expect("second status").success());
190 assert!(third_status.expect("third status").success());
191}
192
193#[tokio::test]
194async fn vectored_writes() {
195 use bytes::{Buf, Bytes};
196 use std::{io::IoSlice, pin::Pin};
197 use tokio::io::AsyncWrite;
198
199 let mut cat = cat().spawn().unwrap();
200 let mut stdin = cat.stdin.take().unwrap();
201 let are_writes_vectored = stdin.is_write_vectored();
202 let mut stdout = cat.stdout.take().unwrap();
203
204 let write = async {
205 let mut input = Bytes::from_static(b"hello\n").chain(Bytes::from_static(b"world!\n"));
206 let mut writes_completed = 0;
207
208 futures::future::poll_fn(|cx| loop {
209 let mut slices = [IoSlice::new(&[]); 2];
210 let vectored = input.chunks_vectored(&mut slices);
211 if vectored == 0 {
212 return std::task::Poll::Ready(std::io::Result::Ok(()));
213 }
214 let n = futures::ready!(Pin::new(&mut stdin).poll_write_vectored(cx, &slices))?;
215 writes_completed += 1;
216 input.advance(n);
217 })
218 .await?;
219
220 drop(stdin);
221
222 std::io::Result::Ok(writes_completed)
223 };
224
225 let read = async {
226 let mut buffer = Vec::with_capacity(6 + 7);
227 stdout.read_to_end(&mut buffer).await?;
228 std::io::Result::Ok(buffer)
229 };
230
231 let (write, read, status) = future::join3(write, read, cat.wait()).await;
232
233 assert!(status.unwrap().success());
234
235 let writes_completed = write.unwrap();
236 // on unix our small payload should always fit in whatever default sized pipe with a single
237 // syscall. if multiple are used, then the forwarding does not work, or we are on a platform
238 // for which the `std` does not support vectored writes.
239 assert_eq!(writes_completed == 1, are_writes_vectored);
240
241 assert_eq!(&read.unwrap(), b"hello\nworld!\n");
242}
243