1#![cfg(feature = "full")]
2#![cfg(unix)]
3
4use tokio::io::{AsyncReadExt, AsyncWriteExt, Interest};
5use tokio::net::unix::pipe;
6use tokio_test::task;
7use tokio_test::{assert_err, assert_ok, assert_pending, assert_ready_ok};
8
9use std::fs::File;
10use std::io;
11use std::os::unix::fs::OpenOptionsExt;
12use std::os::unix::io::AsRawFd;
13use std::path::{Path, PathBuf};
14
15/// Helper struct which will clean up temporary files once dropped.
16struct TempFifo {
17 path: PathBuf,
18 _dir: tempfile::TempDir,
19}
20
21impl TempFifo {
22 fn new(name: &str) -> io::Result<TempFifo> {
23 let dir = tempfile::Builder::new()
24 .prefix("tokio-fifo-tests")
25 .tempdir()?;
26 let path = dir.path().join(name);
27 nix::unistd::mkfifo(&path, nix::sys::stat::Mode::S_IRWXU)?;
28
29 Ok(TempFifo { path, _dir: dir })
30 }
31}
32
33impl AsRef<Path> for TempFifo {
34 fn as_ref(&self) -> &Path {
35 self.path.as_ref()
36 }
37}
38
39#[tokio::test]
40async fn fifo_simple_send() -> io::Result<()> {
41 const DATA: &[u8] = b"this is some data to write to the fifo";
42
43 let fifo = TempFifo::new("simple_send")?;
44
45 // Create a reading task which should wait for data from the pipe.
46 let mut reader = pipe::OpenOptions::new().open_receiver(&fifo)?;
47 let mut read_fut = task::spawn(async move {
48 let mut buf = vec![0; DATA.len()];
49 reader.read_exact(&mut buf).await?;
50 Ok::<_, io::Error>(buf)
51 });
52 assert_pending!(read_fut.poll());
53
54 let mut writer = pipe::OpenOptions::new().open_sender(&fifo)?;
55 writer.write_all(DATA).await?;
56
57 // Let the IO driver poll events for the reader.
58 while !read_fut.is_woken() {
59 tokio::task::yield_now().await;
60 }
61
62 // Reading task should be ready now.
63 let read_data = assert_ready_ok!(read_fut.poll());
64 assert_eq!(&read_data, DATA);
65
66 Ok(())
67}
68
69#[tokio::test]
70#[cfg(target_os = "linux")]
71async fn fifo_simple_send_sender_first() -> io::Result<()> {
72 const DATA: &[u8] = b"this is some data to write to the fifo";
73
74 // Create a new fifo file with *no reading ends open*.
75 let fifo = TempFifo::new("simple_send_sender_first")?;
76
77 // Simple `open_sender` should fail with ENXIO (no such device or address).
78 let err = assert_err!(pipe::OpenOptions::new().open_sender(&fifo));
79 assert_eq!(err.raw_os_error(), Some(libc::ENXIO));
80
81 // `open_sender` in read-write mode should succeed and the pipe should be ready to write.
82 let mut writer = pipe::OpenOptions::new()
83 .read_write(true)
84 .open_sender(&fifo)?;
85 writer.write_all(DATA).await?;
86
87 // Read the written data and validate.
88 let mut reader = pipe::OpenOptions::new().open_receiver(&fifo)?;
89 let mut read_data = vec![0; DATA.len()];
90 reader.read_exact(&mut read_data).await?;
91 assert_eq!(&read_data, DATA);
92
93 Ok(())
94}
95
96// Opens a FIFO file, write and *close the writer*.
97async fn write_and_close(path: impl AsRef<Path>, msg: &[u8]) -> io::Result<()> {
98 let mut writer = pipe::OpenOptions::new().open_sender(path)?;
99 writer.write_all(msg).await?;
100 drop(writer); // Explicit drop.
101 Ok(())
102}
103
104/// Checks EOF behavior with single reader and writers sequentially opening
105/// and closing a FIFO.
106#[tokio::test]
107async fn fifo_multiple_writes() -> io::Result<()> {
108 const DATA: &[u8] = b"this is some data to write to the fifo";
109
110 let fifo = TempFifo::new("fifo_multiple_writes")?;
111
112 let mut reader = pipe::OpenOptions::new().open_receiver(&fifo)?;
113
114 write_and_close(&fifo, DATA).await?;
115 let ev = reader.ready(Interest::READABLE).await?;
116 assert!(ev.is_readable());
117 let mut read_data = vec![0; DATA.len()];
118 assert_ok!(reader.read_exact(&mut read_data).await);
119
120 // Check that reader hits EOF.
121 let err = assert_err!(reader.read_exact(&mut read_data).await);
122 assert_eq!(err.kind(), io::ErrorKind::UnexpectedEof);
123
124 // Write more data and read again.
125 write_and_close(&fifo, DATA).await?;
126 assert_ok!(reader.read_exact(&mut read_data).await);
127
128 Ok(())
129}
130
131/// Checks behavior of a resilient reader (Receiver in O_RDWR access mode)
132/// with writers sequentially opening and closing a FIFO.
133#[tokio::test]
134#[cfg(target_os = "linux")]
135async fn fifo_resilient_reader() -> io::Result<()> {
136 const DATA: &[u8] = b"this is some data to write to the fifo";
137
138 let fifo = TempFifo::new("fifo_resilient_reader")?;
139
140 // Open reader in read-write access mode.
141 let mut reader = pipe::OpenOptions::new()
142 .read_write(true)
143 .open_receiver(&fifo)?;
144
145 write_and_close(&fifo, DATA).await?;
146 let ev = reader.ready(Interest::READABLE).await?;
147 let mut read_data = vec![0; DATA.len()];
148 reader.read_exact(&mut read_data).await?;
149
150 // Check that reader didn't hit EOF.
151 assert!(!ev.is_read_closed());
152
153 // Resilient reader can asynchronously wait for the next writer.
154 let mut second_read_fut = task::spawn(reader.read_exact(&mut read_data));
155 assert_pending!(second_read_fut.poll());
156
157 // Write more data and read again.
158 write_and_close(&fifo, DATA).await?;
159 assert_ok!(second_read_fut.await);
160
161 Ok(())
162}
163
164#[tokio::test]
165async fn open_detects_not_a_fifo() -> io::Result<()> {
166 let dir = tempfile::Builder::new()
167 .prefix("tokio-fifo-tests")
168 .tempdir()
169 .unwrap();
170 let path = dir.path().join("not_a_fifo");
171
172 // Create an ordinary file.
173 File::create(&path)?;
174
175 // Check if Sender detects invalid file type.
176 let err = assert_err!(pipe::OpenOptions::new().open_sender(&path));
177 assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
178
179 // Check if Receiver detects invalid file type.
180 let err = assert_err!(pipe::OpenOptions::new().open_sender(&path));
181 assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
182
183 Ok(())
184}
185
186#[tokio::test]
187async fn from_file() -> io::Result<()> {
188 const DATA: &[u8] = b"this is some data to write to the fifo";
189
190 let fifo = TempFifo::new("from_file")?;
191
192 // Construct a Receiver from a File.
193 let file = std::fs::OpenOptions::new()
194 .read(true)
195 .custom_flags(libc::O_NONBLOCK)
196 .open(&fifo)?;
197 let mut reader = pipe::Receiver::from_file(file)?;
198
199 // Construct a Sender from a File.
200 let file = std::fs::OpenOptions::new()
201 .write(true)
202 .custom_flags(libc::O_NONBLOCK)
203 .open(&fifo)?;
204 let mut writer = pipe::Sender::from_file(file)?;
205
206 // Write and read some data to test async.
207 let mut read_fut = task::spawn(async move {
208 let mut buf = vec![0; DATA.len()];
209 reader.read_exact(&mut buf).await?;
210 Ok::<_, io::Error>(buf)
211 });
212 assert_pending!(read_fut.poll());
213
214 writer.write_all(DATA).await?;
215
216 let read_data = assert_ok!(read_fut.await);
217 assert_eq!(&read_data, DATA);
218
219 Ok(())
220}
221
222#[tokio::test]
223async fn from_file_detects_not_a_fifo() -> io::Result<()> {
224 let dir = tempfile::Builder::new()
225 .prefix("tokio-fifo-tests")
226 .tempdir()
227 .unwrap();
228 let path = dir.path().join("not_a_fifo");
229
230 // Create an ordinary file.
231 File::create(&path)?;
232
233 // Check if Sender detects invalid file type.
234 let file = std::fs::OpenOptions::new().write(true).open(&path)?;
235 let err = assert_err!(pipe::Sender::from_file(file));
236 assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
237
238 // Check if Receiver detects invalid file type.
239 let file = std::fs::OpenOptions::new().read(true).open(&path)?;
240 let err = assert_err!(pipe::Receiver::from_file(file));
241 assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
242
243 Ok(())
244}
245
246#[tokio::test]
247async fn from_file_detects_wrong_access_mode() -> io::Result<()> {
248 let fifo = TempFifo::new("wrong_access_mode")?;
249
250 // Open a read end to open the fifo for writing.
251 let _reader = pipe::OpenOptions::new().open_receiver(&fifo)?;
252
253 // Check if Receiver detects write-only access mode.
254 let wronly = std::fs::OpenOptions::new()
255 .write(true)
256 .custom_flags(libc::O_NONBLOCK)
257 .open(&fifo)?;
258 let err = assert_err!(pipe::Receiver::from_file(wronly));
259 assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
260
261 // Check if Sender detects read-only access mode.
262 let rdonly = std::fs::OpenOptions::new()
263 .read(true)
264 .custom_flags(libc::O_NONBLOCK)
265 .open(&fifo)?;
266 let err = assert_err!(pipe::Sender::from_file(rdonly));
267 assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
268
269 Ok(())
270}
271
272fn is_nonblocking<T: AsRawFd>(fd: &T) -> io::Result<bool> {
273 let flags = nix::fcntl::fcntl(fd.as_raw_fd(), nix::fcntl::F_GETFL)?;
274 Ok((flags & libc::O_NONBLOCK) != 0)
275}
276
277#[tokio::test]
278async fn from_file_sets_nonblock() -> io::Result<()> {
279 let fifo = TempFifo::new("sets_nonblock")?;
280
281 // Open read and write ends to let blocking files open.
282 let _reader = pipe::OpenOptions::new().open_receiver(&fifo)?;
283 let _writer = pipe::OpenOptions::new().open_sender(&fifo)?;
284
285 // Check if Receiver sets the pipe in non-blocking mode.
286 let rdonly = std::fs::OpenOptions::new().read(true).open(&fifo)?;
287 assert!(!is_nonblocking(&rdonly)?);
288 let reader = pipe::Receiver::from_file(rdonly)?;
289 assert!(is_nonblocking(&reader)?);
290
291 // Check if Sender sets the pipe in non-blocking mode.
292 let wronly = std::fs::OpenOptions::new().write(true).open(&fifo)?;
293 assert!(!is_nonblocking(&wronly)?);
294 let writer = pipe::Sender::from_file(wronly)?;
295 assert!(is_nonblocking(&writer)?);
296
297 Ok(())
298}
299
300fn writable_by_poll(writer: &pipe::Sender) -> bool {
301 task::spawn(writer.writable()).poll().is_ready()
302}
303
304#[tokio::test]
305async fn try_read_write() -> io::Result<()> {
306 const DATA: &[u8] = b"this is some data to write to the fifo";
307
308 // Create a pipe pair over a fifo file.
309 let fifo = TempFifo::new("try_read_write")?;
310 let reader = pipe::OpenOptions::new().open_receiver(&fifo)?;
311 let writer = pipe::OpenOptions::new().open_sender(&fifo)?;
312
313 // Fill the pipe buffer with `try_write`.
314 let mut write_data = Vec::new();
315 while writable_by_poll(&writer) {
316 match writer.try_write(DATA) {
317 Ok(n) => write_data.extend(&DATA[..n]),
318 Err(e) => {
319 assert_eq!(e.kind(), io::ErrorKind::WouldBlock);
320 break;
321 }
322 }
323 }
324
325 // Drain the pipe buffer with `try_read`.
326 let mut read_data = vec![0; write_data.len()];
327 let mut i = 0;
328 while i < write_data.len() {
329 reader.readable().await?;
330 match reader.try_read(&mut read_data[i..]) {
331 Ok(n) => i += n,
332 Err(e) => {
333 assert_eq!(e.kind(), io::ErrorKind::WouldBlock);
334 continue;
335 }
336 }
337 }
338
339 assert_eq!(read_data, write_data);
340
341 Ok(())
342}
343
344#[tokio::test]
345async fn try_read_write_vectored() -> io::Result<()> {
346 const DATA: &[u8] = b"this is some data to write to the fifo";
347
348 // Create a pipe pair over a fifo file.
349 let fifo = TempFifo::new("try_read_write_vectored")?;
350 let reader = pipe::OpenOptions::new().open_receiver(&fifo)?;
351 let writer = pipe::OpenOptions::new().open_sender(&fifo)?;
352
353 let write_bufs: Vec<_> = DATA.chunks(3).map(io::IoSlice::new).collect();
354
355 // Fill the pipe buffer with `try_write_vectored`.
356 let mut write_data = Vec::new();
357 while writable_by_poll(&writer) {
358 match writer.try_write_vectored(&write_bufs) {
359 Ok(n) => write_data.extend(&DATA[..n]),
360 Err(e) => {
361 assert_eq!(e.kind(), io::ErrorKind::WouldBlock);
362 break;
363 }
364 }
365 }
366
367 // Drain the pipe buffer with `try_read_vectored`.
368 let mut read_data = vec![0; write_data.len()];
369 let mut i = 0;
370 while i < write_data.len() {
371 reader.readable().await?;
372
373 let mut read_bufs: Vec<_> = read_data[i..]
374 .chunks_mut(0x10000)
375 .map(io::IoSliceMut::new)
376 .collect();
377 match reader.try_read_vectored(&mut read_bufs) {
378 Ok(n) => i += n,
379 Err(e) => {
380 assert_eq!(e.kind(), io::ErrorKind::WouldBlock);
381 continue;
382 }
383 }
384 }
385
386 assert_eq!(read_data, write_data);
387
388 Ok(())
389}
390
391#[tokio::test]
392async fn try_read_buf() -> std::io::Result<()> {
393 const DATA: &[u8] = b"this is some data to write to the fifo";
394
395 // Create a pipe pair over a fifo file.
396 let fifo = TempFifo::new("try_read_write_vectored")?;
397 let reader = pipe::OpenOptions::new().open_receiver(&fifo)?;
398 let writer = pipe::OpenOptions::new().open_sender(&fifo)?;
399
400 // Fill the pipe buffer with `try_write`.
401 let mut write_data = Vec::new();
402 while writable_by_poll(&writer) {
403 match writer.try_write(DATA) {
404 Ok(n) => write_data.extend(&DATA[..n]),
405 Err(e) => {
406 assert_eq!(e.kind(), io::ErrorKind::WouldBlock);
407 break;
408 }
409 }
410 }
411
412 // Drain the pipe buffer with `try_read_buf`.
413 let mut read_data = vec![0; write_data.len()];
414 let mut i = 0;
415 while i < write_data.len() {
416 reader.readable().await?;
417 match reader.try_read_buf(&mut read_data) {
418 Ok(n) => i += n,
419 Err(e) => {
420 assert_eq!(e.kind(), io::ErrorKind::WouldBlock);
421 continue;
422 }
423 }
424 }
425
426 assert_eq!(read_data, write_data);
427
428 Ok(())
429}
430