1 | #![warn (rust_2018_idioms)] |
2 | #![cfg (all(feature = "full" , not(target_os = "wasi" )))] |
3 | |
4 | use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader}; |
5 | use tokio::join; |
6 | use tokio::process::{Child, Command}; |
7 | use tokio_test::assert_ok; |
8 | |
9 | use futures::future::{self, FutureExt}; |
10 | use std::env; |
11 | use std::io; |
12 | use std::process::{ExitStatus, Stdio}; |
13 | |
14 | fn cat() -> Command { |
15 | let mut cmd = Command::new(env!("CARGO_BIN_EXE_test-cat" )); |
16 | cmd.stdin(Stdio::piped()).stdout(Stdio::piped()); |
17 | cmd |
18 | } |
19 | |
20 | async fn feed_cat(mut cat: Child, n: usize) -> io::Result<ExitStatus> { |
21 | let mut stdin = cat.stdin.take().unwrap(); |
22 | let stdout = cat.stdout.take().unwrap(); |
23 | |
24 | // Produce n lines on the child's stdout. |
25 | let write = async { |
26 | for i in 0..n { |
27 | let bytes = format!("line {} \n" , i).into_bytes(); |
28 | stdin.write_all(&bytes).await.unwrap(); |
29 | } |
30 | |
31 | drop(stdin); |
32 | }; |
33 | |
34 | let read = async { |
35 | let mut reader = BufReader::new(stdout).lines(); |
36 | let mut num_lines = 0; |
37 | |
38 | // Try to read `n + 1` lines, ensuring the last one is empty |
39 | // (i.e. EOF is reached after `n` lines. |
40 | loop { |
41 | let data = reader |
42 | .next_line() |
43 | .await |
44 | .unwrap_or_else(|_| Some(String::new())) |
45 | .expect("failed to read line" ); |
46 | |
47 | let num_read = data.len(); |
48 | let done = num_lines >= n; |
49 | |
50 | match (done, num_read) { |
51 | (false, 0) => panic!("broken pipe" ), |
52 | (true, n) if n != 0 => panic!("extraneous data" ), |
53 | _ => { |
54 | let expected = format!("line {}" , num_lines); |
55 | assert_eq!(expected, data); |
56 | } |
57 | }; |
58 | |
59 | num_lines += 1; |
60 | if num_lines >= n { |
61 | break; |
62 | } |
63 | } |
64 | }; |
65 | |
66 | // Compose reading and writing concurrently. |
67 | future::join3(write, read, cat.wait()) |
68 | .map(|(_, _, status)| status) |
69 | .await |
70 | } |
71 | |
72 | /// Check for the following properties when feeding stdin and |
73 | /// consuming stdout of a cat-like process: |
74 | /// |
75 | /// - A number of lines that amounts to a number of bytes exceeding a |
76 | /// typical OS buffer size can be fed to the child without |
77 | /// deadlock. This tests that we also consume the stdout |
78 | /// concurrently; otherwise this would deadlock. |
79 | /// |
80 | /// - We read the same lines from the child that we fed it. |
81 | /// |
82 | /// - The child does produce EOF on stdout after the last line. |
83 | #[tokio::test ] |
84 | async fn feed_a_lot() { |
85 | let child = cat().spawn().unwrap(); |
86 | let status = feed_cat(child, 10000).await.unwrap(); |
87 | assert_eq!(status.code(), Some(0)); |
88 | } |
89 | |
90 | #[tokio::test ] |
91 | async fn wait_with_output_captures() { |
92 | let mut child = cat().spawn().unwrap(); |
93 | let mut stdin = child.stdin.take().unwrap(); |
94 | |
95 | let write_bytes = b"1234" ; |
96 | |
97 | let future = async { |
98 | stdin.write_all(write_bytes).await?; |
99 | drop(stdin); |
100 | let out = child.wait_with_output(); |
101 | out.await |
102 | }; |
103 | |
104 | let output = future.await.unwrap(); |
105 | |
106 | assert!(output.status.success()); |
107 | assert_eq!(output.stdout, write_bytes); |
108 | assert_eq!(output.stderr.len(), 0); |
109 | } |
110 | |
111 | #[tokio::test ] |
112 | async fn status_closes_any_pipes() { |
113 | // Cat will open a pipe between the parent and child. |
114 | // If `status_async` doesn't ensure the handles are closed, |
115 | // we would end up blocking forever (and time out). |
116 | let child = cat().status(); |
117 | |
118 | assert_ok!(child.await); |
119 | } |
120 | |
121 | #[tokio::test ] |
122 | async fn try_wait() { |
123 | let mut child = cat().spawn().unwrap(); |
124 | |
125 | let id = child.id().expect("missing id" ); |
126 | assert!(id > 0); |
127 | |
128 | assert_eq!(None, assert_ok!(child.try_wait())); |
129 | |
130 | // Drop the child's stdio handles so it can terminate |
131 | drop(child.stdin.take()); |
132 | drop(child.stderr.take()); |
133 | drop(child.stdout.take()); |
134 | |
135 | assert_ok!(child.wait().await); |
136 | |
137 | // test that the `.try_wait()` method is fused just like the stdlib |
138 | assert!(assert_ok!(child.try_wait()).unwrap().success()); |
139 | |
140 | // Can't get id after process has exited |
141 | assert_eq!(child.id(), None); |
142 | } |
143 | |
144 | #[tokio::test ] |
145 | async fn pipe_from_one_command_to_another() { |
146 | let mut first = cat().spawn().expect("first cmd" ); |
147 | let mut third = cat().spawn().expect("third cmd" ); |
148 | |
149 | // Convert ChildStdout to Stdio |
150 | let second_stdin: Stdio = first |
151 | .stdout |
152 | .take() |
153 | .expect("first.stdout" ) |
154 | .try_into() |
155 | .expect("first.stdout into Stdio" ); |
156 | |
157 | // Convert ChildStdin to Stdio |
158 | let second_stdout: Stdio = third |
159 | .stdin |
160 | .take() |
161 | .expect("third.stdin" ) |
162 | .try_into() |
163 | .expect("third.stdin into Stdio" ); |
164 | |
165 | let mut second = cat() |
166 | .stdin(second_stdin) |
167 | .stdout(second_stdout) |
168 | .spawn() |
169 | .expect("first cmd" ); |
170 | |
171 | let msg = "hello world! please pipe this message through" ; |
172 | |
173 | let mut stdin = first.stdin.take().expect("first.stdin" ); |
174 | let write = async move { stdin.write_all(msg.as_bytes()).await }; |
175 | |
176 | let mut stdout = third.stdout.take().expect("third.stdout" ); |
177 | let read = async move { |
178 | let mut data = String::new(); |
179 | stdout.read_to_string(&mut data).await.map(|_| data) |
180 | }; |
181 | |
182 | let (read, write, first_status, second_status, third_status) = |
183 | join!(read, write, first.wait(), second.wait(), third.wait()); |
184 | |
185 | assert_eq!(msg, read.expect("read result" )); |
186 | write.expect("write result" ); |
187 | |
188 | assert!(first_status.expect("first status" ).success()); |
189 | assert!(second_status.expect("second status" ).success()); |
190 | assert!(third_status.expect("third status" ).success()); |
191 | } |
192 | |
193 | #[tokio::test ] |
194 | async fn vectored_writes() { |
195 | use bytes::{Buf, Bytes}; |
196 | use std::{io::IoSlice, pin::Pin}; |
197 | use tokio::io::AsyncWrite; |
198 | |
199 | let mut cat = cat().spawn().unwrap(); |
200 | let mut stdin = cat.stdin.take().unwrap(); |
201 | let are_writes_vectored = stdin.is_write_vectored(); |
202 | let mut stdout = cat.stdout.take().unwrap(); |
203 | |
204 | let write = async { |
205 | let mut input = Bytes::from_static(b"hello \n" ).chain(Bytes::from_static(b"world! \n" )); |
206 | let mut writes_completed = 0; |
207 | |
208 | futures::future::poll_fn(|cx| loop { |
209 | let mut slices = [IoSlice::new(&[]); 2]; |
210 | let vectored = input.chunks_vectored(&mut slices); |
211 | if vectored == 0 { |
212 | return std::task::Poll::Ready(std::io::Result::Ok(())); |
213 | } |
214 | let n = futures::ready!(Pin::new(&mut stdin).poll_write_vectored(cx, &slices))?; |
215 | writes_completed += 1; |
216 | input.advance(n); |
217 | }) |
218 | .await?; |
219 | |
220 | drop(stdin); |
221 | |
222 | std::io::Result::Ok(writes_completed) |
223 | }; |
224 | |
225 | let read = async { |
226 | let mut buffer = Vec::with_capacity(6 + 7); |
227 | stdout.read_to_end(&mut buffer).await?; |
228 | std::io::Result::Ok(buffer) |
229 | }; |
230 | |
231 | let (write, read, status) = future::join3(write, read, cat.wait()).await; |
232 | |
233 | assert!(status.unwrap().success()); |
234 | |
235 | let writes_completed = write.unwrap(); |
236 | // on unix our small payload should always fit in whatever default sized pipe with a single |
237 | // syscall. if multiple are used, then the forwarding does not work, or we are on a platform |
238 | // for which the `std` does not support vectored writes. |
239 | assert_eq!(writes_completed == 1, are_writes_vectored); |
240 | |
241 | assert_eq!(&read.unwrap(), b"hello \nworld! \n" ); |
242 | } |
243 | |