| 1 | #![warn (rust_2018_idioms)] |
| 2 | #![cfg (all(feature = "full" , not(target_os = "wasi" )))] |
| 3 | |
| 4 | use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader}; |
| 5 | use tokio::join; |
| 6 | use tokio::process::{Child, Command}; |
| 7 | use tokio_test::assert_ok; |
| 8 | |
| 9 | use futures::future::{self, FutureExt}; |
| 10 | use std::env; |
| 11 | use std::io; |
| 12 | use std::process::{ExitStatus, Stdio}; |
| 13 | |
| 14 | fn cat() -> Command { |
| 15 | let mut cmd = Command::new(env!("CARGO_BIN_EXE_test-cat" )); |
| 16 | cmd.stdin(Stdio::piped()).stdout(Stdio::piped()); |
| 17 | cmd |
| 18 | } |
| 19 | |
| 20 | async fn feed_cat(mut cat: Child, n: usize) -> io::Result<ExitStatus> { |
| 21 | let mut stdin = cat.stdin.take().unwrap(); |
| 22 | let stdout = cat.stdout.take().unwrap(); |
| 23 | |
| 24 | // Produce n lines on the child's stdout. |
| 25 | let write = async { |
| 26 | for i in 0..n { |
| 27 | let bytes = format!("line {} \n" , i).into_bytes(); |
| 28 | stdin.write_all(&bytes).await.unwrap(); |
| 29 | } |
| 30 | |
| 31 | drop(stdin); |
| 32 | }; |
| 33 | |
| 34 | let read = async { |
| 35 | let mut reader = BufReader::new(stdout).lines(); |
| 36 | let mut num_lines = 0; |
| 37 | |
| 38 | // Try to read `n + 1` lines, ensuring the last one is empty |
| 39 | // (i.e. EOF is reached after `n` lines. |
| 40 | loop { |
| 41 | let data = reader |
| 42 | .next_line() |
| 43 | .await |
| 44 | .unwrap_or_else(|_| Some(String::new())) |
| 45 | .expect("failed to read line" ); |
| 46 | |
| 47 | let num_read = data.len(); |
| 48 | let done = num_lines >= n; |
| 49 | |
| 50 | match (done, num_read) { |
| 51 | (false, 0) => panic!("broken pipe" ), |
| 52 | (true, n) if n != 0 => panic!("extraneous data" ), |
| 53 | _ => { |
| 54 | let expected = format!("line {}" , num_lines); |
| 55 | assert_eq!(expected, data); |
| 56 | } |
| 57 | }; |
| 58 | |
| 59 | num_lines += 1; |
| 60 | if num_lines >= n { |
| 61 | break; |
| 62 | } |
| 63 | } |
| 64 | }; |
| 65 | |
| 66 | // Compose reading and writing concurrently. |
| 67 | future::join3(write, read, cat.wait()) |
| 68 | .map(|(_, _, status)| status) |
| 69 | .await |
| 70 | } |
| 71 | |
| 72 | /// Check for the following properties when feeding stdin and |
| 73 | /// consuming stdout of a cat-like process: |
| 74 | /// |
| 75 | /// - A number of lines that amounts to a number of bytes exceeding a |
| 76 | /// typical OS buffer size can be fed to the child without |
| 77 | /// deadlock. This tests that we also consume the stdout |
| 78 | /// concurrently; otherwise this would deadlock. |
| 79 | /// |
| 80 | /// - We read the same lines from the child that we fed it. |
| 81 | /// |
| 82 | /// - The child does produce EOF on stdout after the last line. |
| 83 | #[tokio::test ] |
| 84 | async fn feed_a_lot() { |
| 85 | let child = cat().spawn().unwrap(); |
| 86 | let status = feed_cat(child, 10000).await.unwrap(); |
| 87 | assert_eq!(status.code(), Some(0)); |
| 88 | } |
| 89 | |
| 90 | #[tokio::test ] |
| 91 | async fn wait_with_output_captures() { |
| 92 | let mut child = cat().spawn().unwrap(); |
| 93 | let mut stdin = child.stdin.take().unwrap(); |
| 94 | |
| 95 | let write_bytes = b"1234" ; |
| 96 | |
| 97 | let future = async { |
| 98 | stdin.write_all(write_bytes).await?; |
| 99 | drop(stdin); |
| 100 | let out = child.wait_with_output(); |
| 101 | out.await |
| 102 | }; |
| 103 | |
| 104 | let output = future.await.unwrap(); |
| 105 | |
| 106 | assert!(output.status.success()); |
| 107 | assert_eq!(output.stdout, write_bytes); |
| 108 | assert_eq!(output.stderr.len(), 0); |
| 109 | } |
| 110 | |
| 111 | #[tokio::test ] |
| 112 | async fn status_closes_any_pipes() { |
| 113 | // Cat will open a pipe between the parent and child. |
| 114 | // If `status_async` doesn't ensure the handles are closed, |
| 115 | // we would end up blocking forever (and time out). |
| 116 | let child = cat().status(); |
| 117 | |
| 118 | assert_ok!(child.await); |
| 119 | } |
| 120 | |
| 121 | #[tokio::test ] |
| 122 | async fn try_wait() { |
| 123 | let mut child = cat().spawn().unwrap(); |
| 124 | |
| 125 | let id = child.id().expect("missing id" ); |
| 126 | assert!(id > 0); |
| 127 | |
| 128 | assert_eq!(None, assert_ok!(child.try_wait())); |
| 129 | |
| 130 | // Drop the child's stdio handles so it can terminate |
| 131 | drop(child.stdin.take()); |
| 132 | drop(child.stderr.take()); |
| 133 | drop(child.stdout.take()); |
| 134 | |
| 135 | assert_ok!(child.wait().await); |
| 136 | |
| 137 | // test that the `.try_wait()` method is fused just like the stdlib |
| 138 | assert!(assert_ok!(child.try_wait()).unwrap().success()); |
| 139 | |
| 140 | // Can't get id after process has exited |
| 141 | assert_eq!(child.id(), None); |
| 142 | } |
| 143 | |
| 144 | #[tokio::test ] |
| 145 | async fn pipe_from_one_command_to_another() { |
| 146 | let mut first = cat().spawn().expect("first cmd" ); |
| 147 | let mut third = cat().spawn().expect("third cmd" ); |
| 148 | |
| 149 | // Convert ChildStdout to Stdio |
| 150 | let second_stdin: Stdio = first |
| 151 | .stdout |
| 152 | .take() |
| 153 | .expect("first.stdout" ) |
| 154 | .try_into() |
| 155 | .expect("first.stdout into Stdio" ); |
| 156 | |
| 157 | // Convert ChildStdin to Stdio |
| 158 | let second_stdout: Stdio = third |
| 159 | .stdin |
| 160 | .take() |
| 161 | .expect("third.stdin" ) |
| 162 | .try_into() |
| 163 | .expect("third.stdin into Stdio" ); |
| 164 | |
| 165 | let mut second = cat() |
| 166 | .stdin(second_stdin) |
| 167 | .stdout(second_stdout) |
| 168 | .spawn() |
| 169 | .expect("first cmd" ); |
| 170 | |
| 171 | let msg = "hello world! please pipe this message through" ; |
| 172 | |
| 173 | let mut stdin = first.stdin.take().expect("first.stdin" ); |
| 174 | let write = async move { stdin.write_all(msg.as_bytes()).await }; |
| 175 | |
| 176 | let mut stdout = third.stdout.take().expect("third.stdout" ); |
| 177 | let read = async move { |
| 178 | let mut data = String::new(); |
| 179 | stdout.read_to_string(&mut data).await.map(|_| data) |
| 180 | }; |
| 181 | |
| 182 | let (read, write, first_status, second_status, third_status) = |
| 183 | join!(read, write, first.wait(), second.wait(), third.wait()); |
| 184 | |
| 185 | assert_eq!(msg, read.expect("read result" )); |
| 186 | write.expect("write result" ); |
| 187 | |
| 188 | assert!(first_status.expect("first status" ).success()); |
| 189 | assert!(second_status.expect("second status" ).success()); |
| 190 | assert!(third_status.expect("third status" ).success()); |
| 191 | } |
| 192 | |
| 193 | #[tokio::test ] |
| 194 | async fn vectored_writes() { |
| 195 | use bytes::{Buf, Bytes}; |
| 196 | use std::{io::IoSlice, pin::Pin}; |
| 197 | use tokio::io::AsyncWrite; |
| 198 | |
| 199 | let mut cat = cat().spawn().unwrap(); |
| 200 | let mut stdin = cat.stdin.take().unwrap(); |
| 201 | let are_writes_vectored = stdin.is_write_vectored(); |
| 202 | let mut stdout = cat.stdout.take().unwrap(); |
| 203 | |
| 204 | let write = async { |
| 205 | let mut input = Bytes::from_static(b"hello \n" ).chain(Bytes::from_static(b"world! \n" )); |
| 206 | let mut writes_completed = 0; |
| 207 | |
| 208 | futures::future::poll_fn(|cx| loop { |
| 209 | let mut slices = [IoSlice::new(&[]); 2]; |
| 210 | let vectored = input.chunks_vectored(&mut slices); |
| 211 | if vectored == 0 { |
| 212 | return std::task::Poll::Ready(std::io::Result::Ok(())); |
| 213 | } |
| 214 | let n = futures::ready!(Pin::new(&mut stdin).poll_write_vectored(cx, &slices))?; |
| 215 | writes_completed += 1; |
| 216 | input.advance(n); |
| 217 | }) |
| 218 | .await?; |
| 219 | |
| 220 | drop(stdin); |
| 221 | |
| 222 | std::io::Result::Ok(writes_completed) |
| 223 | }; |
| 224 | |
| 225 | let read = async { |
| 226 | let mut buffer = Vec::with_capacity(6 + 7); |
| 227 | stdout.read_to_end(&mut buffer).await?; |
| 228 | std::io::Result::Ok(buffer) |
| 229 | }; |
| 230 | |
| 231 | let (write, read, status) = future::join3(write, read, cat.wait()).await; |
| 232 | |
| 233 | assert!(status.unwrap().success()); |
| 234 | |
| 235 | let writes_completed = write.unwrap(); |
| 236 | // on unix our small payload should always fit in whatever default sized pipe with a single |
| 237 | // syscall. if multiple are used, then the forwarding does not work, or we are on a platform |
| 238 | // for which the `std` does not support vectored writes. |
| 239 | assert_eq!(writes_completed == 1, are_writes_vectored); |
| 240 | |
| 241 | assert_eq!(&read.unwrap(), b"hello \nworld! \n" ); |
| 242 | } |
| 243 | |