1 | #![cfg (feature = "full" )] |
2 | #![cfg (unix)] |
3 | |
4 | use tokio::io::{AsyncReadExt, AsyncWriteExt, Interest}; |
5 | use tokio::net::unix::pipe; |
6 | use tokio_test::task; |
7 | use tokio_test::{assert_err, assert_ok, assert_pending, assert_ready_ok}; |
8 | |
9 | use std::fs::File; |
10 | use std::io; |
11 | use std::os::unix::fs::OpenOptionsExt; |
12 | use std::os::unix::io::AsRawFd; |
13 | use std::path::{Path, PathBuf}; |
14 | |
15 | /// Helper struct which will clean up temporary files once dropped. |
16 | struct TempFifo { |
17 | path: PathBuf, |
18 | _dir: tempfile::TempDir, |
19 | } |
20 | |
21 | impl TempFifo { |
22 | fn new(name: &str) -> io::Result<TempFifo> { |
23 | let dir = tempfile::Builder::new() |
24 | .prefix("tokio-fifo-tests" ) |
25 | .tempdir()?; |
26 | let path = dir.path().join(name); |
27 | nix::unistd::mkfifo(&path, nix::sys::stat::Mode::S_IRWXU)?; |
28 | |
29 | Ok(TempFifo { path, _dir: dir }) |
30 | } |
31 | } |
32 | |
33 | impl AsRef<Path> for TempFifo { |
34 | fn as_ref(&self) -> &Path { |
35 | self.path.as_ref() |
36 | } |
37 | } |
38 | |
39 | #[tokio::test ] |
40 | async fn fifo_simple_send() -> io::Result<()> { |
41 | const DATA: &[u8] = b"this is some data to write to the fifo" ; |
42 | |
43 | let fifo = TempFifo::new("simple_send" )?; |
44 | |
45 | // Create a reading task which should wait for data from the pipe. |
46 | let mut reader = pipe::OpenOptions::new().open_receiver(&fifo)?; |
47 | let mut read_fut = task::spawn(async move { |
48 | let mut buf = vec![0; DATA.len()]; |
49 | reader.read_exact(&mut buf).await?; |
50 | Ok::<_, io::Error>(buf) |
51 | }); |
52 | assert_pending!(read_fut.poll()); |
53 | |
54 | let mut writer = pipe::OpenOptions::new().open_sender(&fifo)?; |
55 | writer.write_all(DATA).await?; |
56 | |
57 | // Let the IO driver poll events for the reader. |
58 | while !read_fut.is_woken() { |
59 | tokio::task::yield_now().await; |
60 | } |
61 | |
62 | // Reading task should be ready now. |
63 | let read_data = assert_ready_ok!(read_fut.poll()); |
64 | assert_eq!(&read_data, DATA); |
65 | |
66 | Ok(()) |
67 | } |
68 | |
69 | #[tokio::test ] |
70 | #[cfg (target_os = "linux" )] |
71 | async fn fifo_simple_send_sender_first() -> io::Result<()> { |
72 | const DATA: &[u8] = b"this is some data to write to the fifo" ; |
73 | |
74 | // Create a new fifo file with *no reading ends open*. |
75 | let fifo = TempFifo::new("simple_send_sender_first" )?; |
76 | |
77 | // Simple `open_sender` should fail with ENXIO (no such device or address). |
78 | let err = assert_err!(pipe::OpenOptions::new().open_sender(&fifo)); |
79 | assert_eq!(err.raw_os_error(), Some(libc::ENXIO)); |
80 | |
81 | // `open_sender` in read-write mode should succeed and the pipe should be ready to write. |
82 | let mut writer = pipe::OpenOptions::new() |
83 | .read_write(true) |
84 | .open_sender(&fifo)?; |
85 | writer.write_all(DATA).await?; |
86 | |
87 | // Read the written data and validate. |
88 | let mut reader = pipe::OpenOptions::new().open_receiver(&fifo)?; |
89 | let mut read_data = vec![0; DATA.len()]; |
90 | reader.read_exact(&mut read_data).await?; |
91 | assert_eq!(&read_data, DATA); |
92 | |
93 | Ok(()) |
94 | } |
95 | |
96 | // Opens a FIFO file, write and *close the writer*. |
97 | async fn write_and_close(path: impl AsRef<Path>, msg: &[u8]) -> io::Result<()> { |
98 | let mut writer = pipe::OpenOptions::new().open_sender(path)?; |
99 | writer.write_all(msg).await?; |
100 | drop(writer); // Explicit drop. |
101 | Ok(()) |
102 | } |
103 | |
104 | /// Checks EOF behavior with single reader and writers sequentially opening |
105 | /// and closing a FIFO. |
106 | #[tokio::test ] |
107 | async fn fifo_multiple_writes() -> io::Result<()> { |
108 | const DATA: &[u8] = b"this is some data to write to the fifo" ; |
109 | |
110 | let fifo = TempFifo::new("fifo_multiple_writes" )?; |
111 | |
112 | let mut reader = pipe::OpenOptions::new().open_receiver(&fifo)?; |
113 | |
114 | write_and_close(&fifo, DATA).await?; |
115 | let ev = reader.ready(Interest::READABLE).await?; |
116 | assert!(ev.is_readable()); |
117 | let mut read_data = vec![0; DATA.len()]; |
118 | assert_ok!(reader.read_exact(&mut read_data).await); |
119 | |
120 | // Check that reader hits EOF. |
121 | let err = assert_err!(reader.read_exact(&mut read_data).await); |
122 | assert_eq!(err.kind(), io::ErrorKind::UnexpectedEof); |
123 | |
124 | // Write more data and read again. |
125 | write_and_close(&fifo, DATA).await?; |
126 | assert_ok!(reader.read_exact(&mut read_data).await); |
127 | |
128 | Ok(()) |
129 | } |
130 | |
131 | /// Checks behavior of a resilient reader (Receiver in O_RDWR access mode) |
132 | /// with writers sequentially opening and closing a FIFO. |
133 | #[tokio::test ] |
134 | #[cfg (target_os = "linux" )] |
135 | async fn fifo_resilient_reader() -> io::Result<()> { |
136 | const DATA: &[u8] = b"this is some data to write to the fifo" ; |
137 | |
138 | let fifo = TempFifo::new("fifo_resilient_reader" )?; |
139 | |
140 | // Open reader in read-write access mode. |
141 | let mut reader = pipe::OpenOptions::new() |
142 | .read_write(true) |
143 | .open_receiver(&fifo)?; |
144 | |
145 | write_and_close(&fifo, DATA).await?; |
146 | let ev = reader.ready(Interest::READABLE).await?; |
147 | let mut read_data = vec![0; DATA.len()]; |
148 | reader.read_exact(&mut read_data).await?; |
149 | |
150 | // Check that reader didn't hit EOF. |
151 | assert!(!ev.is_read_closed()); |
152 | |
153 | // Resilient reader can asynchronously wait for the next writer. |
154 | let mut second_read_fut = task::spawn(reader.read_exact(&mut read_data)); |
155 | assert_pending!(second_read_fut.poll()); |
156 | |
157 | // Write more data and read again. |
158 | write_and_close(&fifo, DATA).await?; |
159 | assert_ok!(second_read_fut.await); |
160 | |
161 | Ok(()) |
162 | } |
163 | |
164 | #[tokio::test ] |
165 | async fn open_detects_not_a_fifo() -> io::Result<()> { |
166 | let dir = tempfile::Builder::new() |
167 | .prefix("tokio-fifo-tests" ) |
168 | .tempdir() |
169 | .unwrap(); |
170 | let path = dir.path().join("not_a_fifo" ); |
171 | |
172 | // Create an ordinary file. |
173 | File::create(&path)?; |
174 | |
175 | // Check if Sender detects invalid file type. |
176 | let err = assert_err!(pipe::OpenOptions::new().open_sender(&path)); |
177 | assert_eq!(err.kind(), io::ErrorKind::InvalidInput); |
178 | |
179 | // Check if Receiver detects invalid file type. |
180 | let err = assert_err!(pipe::OpenOptions::new().open_sender(&path)); |
181 | assert_eq!(err.kind(), io::ErrorKind::InvalidInput); |
182 | |
183 | Ok(()) |
184 | } |
185 | |
186 | #[tokio::test ] |
187 | async fn from_file() -> io::Result<()> { |
188 | const DATA: &[u8] = b"this is some data to write to the fifo" ; |
189 | |
190 | let fifo = TempFifo::new("from_file" )?; |
191 | |
192 | // Construct a Receiver from a File. |
193 | let file = std::fs::OpenOptions::new() |
194 | .read(true) |
195 | .custom_flags(libc::O_NONBLOCK) |
196 | .open(&fifo)?; |
197 | let mut reader = pipe::Receiver::from_file(file)?; |
198 | |
199 | // Construct a Sender from a File. |
200 | let file = std::fs::OpenOptions::new() |
201 | .write(true) |
202 | .custom_flags(libc::O_NONBLOCK) |
203 | .open(&fifo)?; |
204 | let mut writer = pipe::Sender::from_file(file)?; |
205 | |
206 | // Write and read some data to test async. |
207 | let mut read_fut = task::spawn(async move { |
208 | let mut buf = vec![0; DATA.len()]; |
209 | reader.read_exact(&mut buf).await?; |
210 | Ok::<_, io::Error>(buf) |
211 | }); |
212 | assert_pending!(read_fut.poll()); |
213 | |
214 | writer.write_all(DATA).await?; |
215 | |
216 | let read_data = assert_ok!(read_fut.await); |
217 | assert_eq!(&read_data, DATA); |
218 | |
219 | Ok(()) |
220 | } |
221 | |
222 | #[tokio::test ] |
223 | async fn from_file_detects_not_a_fifo() -> io::Result<()> { |
224 | let dir = tempfile::Builder::new() |
225 | .prefix("tokio-fifo-tests" ) |
226 | .tempdir() |
227 | .unwrap(); |
228 | let path = dir.path().join("not_a_fifo" ); |
229 | |
230 | // Create an ordinary file. |
231 | File::create(&path)?; |
232 | |
233 | // Check if Sender detects invalid file type. |
234 | let file = std::fs::OpenOptions::new().write(true).open(&path)?; |
235 | let err = assert_err!(pipe::Sender::from_file(file)); |
236 | assert_eq!(err.kind(), io::ErrorKind::InvalidInput); |
237 | |
238 | // Check if Receiver detects invalid file type. |
239 | let file = std::fs::OpenOptions::new().read(true).open(&path)?; |
240 | let err = assert_err!(pipe::Receiver::from_file(file)); |
241 | assert_eq!(err.kind(), io::ErrorKind::InvalidInput); |
242 | |
243 | Ok(()) |
244 | } |
245 | |
246 | #[tokio::test ] |
247 | async fn from_file_detects_wrong_access_mode() -> io::Result<()> { |
248 | let fifo = TempFifo::new("wrong_access_mode" )?; |
249 | |
250 | // Open a read end to open the fifo for writing. |
251 | let _reader = pipe::OpenOptions::new().open_receiver(&fifo)?; |
252 | |
253 | // Check if Receiver detects write-only access mode. |
254 | let wronly = std::fs::OpenOptions::new() |
255 | .write(true) |
256 | .custom_flags(libc::O_NONBLOCK) |
257 | .open(&fifo)?; |
258 | let err = assert_err!(pipe::Receiver::from_file(wronly)); |
259 | assert_eq!(err.kind(), io::ErrorKind::InvalidInput); |
260 | |
261 | // Check if Sender detects read-only access mode. |
262 | let rdonly = std::fs::OpenOptions::new() |
263 | .read(true) |
264 | .custom_flags(libc::O_NONBLOCK) |
265 | .open(&fifo)?; |
266 | let err = assert_err!(pipe::Sender::from_file(rdonly)); |
267 | assert_eq!(err.kind(), io::ErrorKind::InvalidInput); |
268 | |
269 | Ok(()) |
270 | } |
271 | |
272 | fn is_nonblocking<T: AsRawFd>(fd: &T) -> io::Result<bool> { |
273 | let flags = nix::fcntl::fcntl(fd.as_raw_fd(), nix::fcntl::F_GETFL)?; |
274 | Ok((flags & libc::O_NONBLOCK) != 0) |
275 | } |
276 | |
277 | #[tokio::test ] |
278 | async fn from_file_sets_nonblock() -> io::Result<()> { |
279 | let fifo = TempFifo::new("sets_nonblock" )?; |
280 | |
281 | // Open read and write ends to let blocking files open. |
282 | let _reader = pipe::OpenOptions::new().open_receiver(&fifo)?; |
283 | let _writer = pipe::OpenOptions::new().open_sender(&fifo)?; |
284 | |
285 | // Check if Receiver sets the pipe in non-blocking mode. |
286 | let rdonly = std::fs::OpenOptions::new().read(true).open(&fifo)?; |
287 | assert!(!is_nonblocking(&rdonly)?); |
288 | let reader = pipe::Receiver::from_file(rdonly)?; |
289 | assert!(is_nonblocking(&reader)?); |
290 | |
291 | // Check if Sender sets the pipe in non-blocking mode. |
292 | let wronly = std::fs::OpenOptions::new().write(true).open(&fifo)?; |
293 | assert!(!is_nonblocking(&wronly)?); |
294 | let writer = pipe::Sender::from_file(wronly)?; |
295 | assert!(is_nonblocking(&writer)?); |
296 | |
297 | Ok(()) |
298 | } |
299 | |
300 | fn writable_by_poll(writer: &pipe::Sender) -> bool { |
301 | task::spawn(writer.writable()).poll().is_ready() |
302 | } |
303 | |
304 | #[tokio::test ] |
305 | async fn try_read_write() -> io::Result<()> { |
306 | const DATA: &[u8] = b"this is some data to write to the fifo" ; |
307 | |
308 | // Create a pipe pair over a fifo file. |
309 | let fifo = TempFifo::new("try_read_write" )?; |
310 | let reader = pipe::OpenOptions::new().open_receiver(&fifo)?; |
311 | let writer = pipe::OpenOptions::new().open_sender(&fifo)?; |
312 | |
313 | // Fill the pipe buffer with `try_write`. |
314 | let mut write_data = Vec::new(); |
315 | while writable_by_poll(&writer) { |
316 | match writer.try_write(DATA) { |
317 | Ok(n) => write_data.extend(&DATA[..n]), |
318 | Err(e) => { |
319 | assert_eq!(e.kind(), io::ErrorKind::WouldBlock); |
320 | break; |
321 | } |
322 | } |
323 | } |
324 | |
325 | // Drain the pipe buffer with `try_read`. |
326 | let mut read_data = vec![0; write_data.len()]; |
327 | let mut i = 0; |
328 | while i < write_data.len() { |
329 | reader.readable().await?; |
330 | match reader.try_read(&mut read_data[i..]) { |
331 | Ok(n) => i += n, |
332 | Err(e) => { |
333 | assert_eq!(e.kind(), io::ErrorKind::WouldBlock); |
334 | continue; |
335 | } |
336 | } |
337 | } |
338 | |
339 | assert_eq!(read_data, write_data); |
340 | |
341 | Ok(()) |
342 | } |
343 | |
344 | #[tokio::test ] |
345 | async fn try_read_write_vectored() -> io::Result<()> { |
346 | const DATA: &[u8] = b"this is some data to write to the fifo" ; |
347 | |
348 | // Create a pipe pair over a fifo file. |
349 | let fifo = TempFifo::new("try_read_write_vectored" )?; |
350 | let reader = pipe::OpenOptions::new().open_receiver(&fifo)?; |
351 | let writer = pipe::OpenOptions::new().open_sender(&fifo)?; |
352 | |
353 | let write_bufs: Vec<_> = DATA.chunks(3).map(io::IoSlice::new).collect(); |
354 | |
355 | // Fill the pipe buffer with `try_write_vectored`. |
356 | let mut write_data = Vec::new(); |
357 | while writable_by_poll(&writer) { |
358 | match writer.try_write_vectored(&write_bufs) { |
359 | Ok(n) => write_data.extend(&DATA[..n]), |
360 | Err(e) => { |
361 | assert_eq!(e.kind(), io::ErrorKind::WouldBlock); |
362 | break; |
363 | } |
364 | } |
365 | } |
366 | |
367 | // Drain the pipe buffer with `try_read_vectored`. |
368 | let mut read_data = vec![0; write_data.len()]; |
369 | let mut i = 0; |
370 | while i < write_data.len() { |
371 | reader.readable().await?; |
372 | |
373 | let mut read_bufs: Vec<_> = read_data[i..] |
374 | .chunks_mut(0x10000) |
375 | .map(io::IoSliceMut::new) |
376 | .collect(); |
377 | match reader.try_read_vectored(&mut read_bufs) { |
378 | Ok(n) => i += n, |
379 | Err(e) => { |
380 | assert_eq!(e.kind(), io::ErrorKind::WouldBlock); |
381 | continue; |
382 | } |
383 | } |
384 | } |
385 | |
386 | assert_eq!(read_data, write_data); |
387 | |
388 | Ok(()) |
389 | } |
390 | |
391 | #[tokio::test ] |
392 | async fn try_read_buf() -> std::io::Result<()> { |
393 | const DATA: &[u8] = b"this is some data to write to the fifo" ; |
394 | |
395 | // Create a pipe pair over a fifo file. |
396 | let fifo = TempFifo::new("try_read_write_vectored" )?; |
397 | let reader = pipe::OpenOptions::new().open_receiver(&fifo)?; |
398 | let writer = pipe::OpenOptions::new().open_sender(&fifo)?; |
399 | |
400 | // Fill the pipe buffer with `try_write`. |
401 | let mut write_data = Vec::new(); |
402 | while writable_by_poll(&writer) { |
403 | match writer.try_write(DATA) { |
404 | Ok(n) => write_data.extend(&DATA[..n]), |
405 | Err(e) => { |
406 | assert_eq!(e.kind(), io::ErrorKind::WouldBlock); |
407 | break; |
408 | } |
409 | } |
410 | } |
411 | |
412 | // Drain the pipe buffer with `try_read_buf`. |
413 | let mut read_data = vec![0; write_data.len()]; |
414 | let mut i = 0; |
415 | while i < write_data.len() { |
416 | reader.readable().await?; |
417 | match reader.try_read_buf(&mut read_data) { |
418 | Ok(n) => i += n, |
419 | Err(e) => { |
420 | assert_eq!(e.kind(), io::ErrorKind::WouldBlock); |
421 | continue; |
422 | } |
423 | } |
424 | } |
425 | |
426 | assert_eq!(read_data, write_data); |
427 | |
428 | Ok(()) |
429 | } |
430 | |