1//! Unix pipe types.
2
3use crate::io::interest::Interest;
4use crate::io::{AsyncRead, AsyncWrite, PollEvented, ReadBuf, Ready};
5
6use mio::unix::pipe as mio_pipe;
7use std::fs::File;
8use std::io::{self, Read, Write};
9use std::os::unix::fs::{FileTypeExt, OpenOptionsExt};
10#[cfg(not(tokio_no_as_fd))]
11use std::os::unix::io::{AsFd, BorrowedFd};
12use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
13use std::path::Path;
14use std::pin::Pin;
15use std::task::{Context, Poll};
16
17cfg_io_util! {
18 use bytes::BufMut;
19}
20
21/// Options and flags which can be used to configure how a FIFO file is opened.
22///
23/// This builder allows configuring how to create a pipe end from a FIFO file.
24/// Generally speaking, when using `OpenOptions`, you'll first call [`new`],
25/// then chain calls to methods to set each option, then call either
26/// [`open_receiver`] or [`open_sender`], passing the path of the FIFO file you
/// are trying to open. This will give you an [`io::Result`][result] with a pipe
28/// end inside that you can further operate on.
29///
30/// [`new`]: OpenOptions::new
31/// [`open_receiver`]: OpenOptions::open_receiver
32/// [`open_sender`]: OpenOptions::open_sender
33/// [result]: std::io::Result
34///
35/// # Examples
36///
37/// Opening a pair of pipe ends from a FIFO file:
38///
39/// ```no_run
40/// use tokio::net::unix::pipe;
41/// # use std::error::Error;
42///
43/// const FIFO_NAME: &str = "path/to/a/fifo";
44///
45/// # async fn dox() -> Result<(), Box<dyn Error>> {
46/// let rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME)?;
47/// let tx = pipe::OpenOptions::new().open_sender(FIFO_NAME)?;
48/// # Ok(())
49/// # }
50/// ```
51///
52/// Opening a [`Sender`] on Linux when you are sure the file is a FIFO:
53///
54/// ```ignore
55/// use tokio::net::unix::pipe;
56/// use nix::{unistd::mkfifo, sys::stat::Mode};
57/// # use std::error::Error;
58///
59/// // Our program has exclusive access to this path.
60/// const FIFO_NAME: &str = "path/to/a/new/fifo";
61///
62/// # async fn dox() -> Result<(), Box<dyn Error>> {
63/// mkfifo(FIFO_NAME, Mode::S_IRWXU)?;
64/// let tx = pipe::OpenOptions::new()
65/// .read_write(true)
66/// .unchecked(true)
67/// .open_sender(FIFO_NAME)?;
68/// # Ok(())
69/// # }
70/// ```
#[derive(Debug, Clone)]
pub struct OpenOptions {
    /// Open the FIFO in read-write access mode. The field only exists on
    /// Linux builds.
    #[cfg(target_os = "linux")]
    read_write: bool,
    /// Skip the FIFO file type check that is otherwise run after opening.
    unchecked: bool,
}
77
78impl OpenOptions {
79 /// Creates a blank new set of options ready for configuration.
80 ///
81 /// All options are initially set to `false`.
82 pub fn new() -> OpenOptions {
83 OpenOptions {
84 #[cfg(target_os = "linux")]
85 read_write: false,
86 unchecked: false,
87 }
88 }
89
90 /// Sets the option for read-write access.
91 ///
92 /// This option, when true, will indicate that a FIFO file will be opened
93 /// in read-write access mode. This operation is not defined by the POSIX
94 /// standard and is only guaranteed to work on Linux.
95 ///
96 /// # Examples
97 ///
98 /// Opening a [`Sender`] even if there are no open reading ends:
99 ///
100 /// ```ignore
101 /// use tokio::net::unix::pipe;
102 ///
103 /// let tx = pipe::OpenOptions::new()
104 /// .read_write(true)
105 /// .open_sender("path/to/a/fifo");
106 /// ```
107 ///
108 /// Opening a resilient [`Receiver`] i.e. a reading pipe end which will not
109 /// fail with [`UnexpectedEof`] during reading if all writing ends of the
110 /// pipe close the FIFO file.
111 ///
112 /// [`UnexpectedEof`]: std::io::ErrorKind::UnexpectedEof
113 ///
114 /// ```ignore
115 /// use tokio::net::unix::pipe;
116 ///
/// let rx = pipe::OpenOptions::new()
118 /// .read_write(true)
119 /// .open_receiver("path/to/a/fifo");
120 /// ```
121 #[cfg(target_os = "linux")]
122 #[cfg_attr(docsrs, doc(cfg(target_os = "linux")))]
123 pub fn read_write(&mut self, value: bool) -> &mut Self {
124 self.read_write = value;
125 self
126 }
127
128 /// Sets the option to skip the check for FIFO file type.
129 ///
130 /// By default, [`open_receiver`] and [`open_sender`] functions will check
131 /// if the opened file is a FIFO file. Set this option to `true` if you are
132 /// sure the file is a FIFO file.
133 ///
134 /// [`open_receiver`]: OpenOptions::open_receiver
135 /// [`open_sender`]: OpenOptions::open_sender
136 ///
137 /// # Examples
138 ///
139 /// ```no_run
140 /// use tokio::net::unix::pipe;
141 /// use nix::{unistd::mkfifo, sys::stat::Mode};
142 /// # use std::error::Error;
143 ///
144 /// // Our program has exclusive access to this path.
145 /// const FIFO_NAME: &str = "path/to/a/new/fifo";
146 ///
147 /// # async fn dox() -> Result<(), Box<dyn Error>> {
148 /// mkfifo(FIFO_NAME, Mode::S_IRWXU)?;
149 /// let rx = pipe::OpenOptions::new()
150 /// .unchecked(true)
151 /// .open_receiver(FIFO_NAME)?;
152 /// # Ok(())
153 /// # }
154 /// ```
155 pub fn unchecked(&mut self, value: bool) -> &mut Self {
156 self.unchecked = value;
157 self
158 }
159
160 /// Creates a [`Receiver`] from a FIFO file with the options specified by `self`.
161 ///
162 /// This function will open the FIFO file at the specified path, possibly
163 /// check if it is a pipe, and associate the pipe with the default event
164 /// loop for reading.
165 ///
166 /// # Errors
167 ///
168 /// If the file type check fails, this function will fail with `io::ErrorKind::InvalidInput`.
169 /// This function may also fail with other standard OS errors.
170 ///
171 /// # Panics
172 ///
173 /// This function panics if it is not called from within a runtime with
174 /// IO enabled.
175 ///
176 /// The runtime is usually set implicitly when this function is called
177 /// from a future driven by a tokio runtime, otherwise runtime can be set
178 /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
179 pub fn open_receiver<P: AsRef<Path>>(&self, path: P) -> io::Result<Receiver> {
180 let file = self.open(path.as_ref(), PipeEnd::Receiver)?;
181 Receiver::from_file_unchecked(file)
182 }
183
184 /// Creates a [`Sender`] from a FIFO file with the options specified by `self`.
185 ///
186 /// This function will open the FIFO file at the specified path, possibly
187 /// check if it is a pipe, and associate the pipe with the default event
188 /// loop for writing.
189 ///
190 /// # Errors
191 ///
192 /// If the file type check fails, this function will fail with `io::ErrorKind::InvalidInput`.
193 /// If the file is not opened in read-write access mode and the file is not
194 /// currently open for reading, this function will fail with `ENXIO`.
195 /// This function may also fail with other standard OS errors.
196 ///
197 /// # Panics
198 ///
199 /// This function panics if it is not called from within a runtime with
200 /// IO enabled.
201 ///
202 /// The runtime is usually set implicitly when this function is called
203 /// from a future driven by a tokio runtime, otherwise runtime can be set
204 /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
205 pub fn open_sender<P: AsRef<Path>>(&self, path: P) -> io::Result<Sender> {
206 let file = self.open(path.as_ref(), PipeEnd::Sender)?;
207 Sender::from_file_unchecked(file)
208 }
209
210 fn open(&self, path: &Path, pipe_end: PipeEnd) -> io::Result<File> {
211 let mut options = std::fs::OpenOptions::new();
212 options
213 .read(pipe_end == PipeEnd::Receiver)
214 .write(pipe_end == PipeEnd::Sender)
215 .custom_flags(libc::O_NONBLOCK);
216
217 #[cfg(target_os = "linux")]
218 if self.read_write {
219 options.read(true).write(true);
220 }
221
222 let file = options.open(path)?;
223
224 if !self.unchecked && !is_fifo(&file)? {
225 return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe"));
226 }
227
228 Ok(file)
229 }
230}
231
232impl Default for OpenOptions {
233 fn default() -> OpenOptions {
234 OpenOptions::new()
235 }
236}
237
/// Selects which end of a pipe an open operation is for.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
enum PipeEnd {
    /// The writing end.
    Sender,
    /// The reading end.
    Receiver,
}
243
244/// Writing end of a Unix pipe.
245///
246/// It can be constructed from a FIFO file with [`OpenOptions::open_sender`].
247///
248/// Opening a named pipe for writing involves a few steps.
249/// Call to [`OpenOptions::open_sender`] might fail with an error indicating
250/// different things:
251///
252/// * [`io::ErrorKind::NotFound`] - There is no file at the specified path.
253/// * [`io::ErrorKind::InvalidInput`] - The file exists, but it is not a FIFO.
254/// * [`ENXIO`] - The file is a FIFO, but no process has it open for reading.
255/// Sleep for a while and try again.
256/// * Other OS errors not specific to opening FIFO files.
257///
258/// Opening a `Sender` from a FIFO file should look like this:
259///
260/// ```no_run
261/// use tokio::net::unix::pipe;
262/// use tokio::time::{self, Duration};
263///
264/// const FIFO_NAME: &str = "path/to/a/fifo";
265///
266/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
267/// // Wait for a reader to open the file.
268/// let tx = loop {
269/// match pipe::OpenOptions::new().open_sender(FIFO_NAME) {
270/// Ok(tx) => break tx,
271/// Err(e) if e.raw_os_error() == Some(libc::ENXIO) => {},
272/// Err(e) => return Err(e.into()),
273/// }
274///
275/// time::sleep(Duration::from_millis(50)).await;
276/// };
277/// # Ok(())
278/// # }
279/// ```
280///
281/// On Linux, it is possible to create a `Sender` without waiting in a sleeping
282/// loop. This is done by opening a named pipe in read-write access mode with
283/// `OpenOptions::read_write`. This way, a `Sender` can at the same time hold
/// both a writing end and a reading end, and the latter allows opening a FIFO
/// without an [`ENXIO`] error since the pipe is open for reading as well.
286///
287/// `Sender` cannot be used to read from a pipe, so in practice the read access
288/// is only used when a FIFO is opened. However, using a `Sender` in read-write
289/// mode **may lead to lost data**, because written data will be dropped by the
290/// system as soon as all pipe ends are closed. To avoid lost data you have to
291/// make sure that a reading end has been opened before dropping a `Sender`.
292///
293/// Note that using read-write access mode with FIFO files is not defined by
294/// the POSIX standard and it is only guaranteed to work on Linux.
295///
296/// ```ignore
297/// use tokio::io::AsyncWriteExt;
298/// use tokio::net::unix::pipe;
299///
300/// const FIFO_NAME: &str = "path/to/a/fifo";
301///
302/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
303/// let mut tx = pipe::OpenOptions::new()
304/// .read_write(true)
305/// .open_sender(FIFO_NAME)?;
306///
307/// // Asynchronously write to the pipe before a reader.
308/// tx.write_all(b"hello world").await?;
309/// # Ok(())
310/// # }
311/// ```
312///
313/// [`ENXIO`]: https://docs.rs/libc/latest/libc/constant.ENXIO.html
314#[derive(Debug)]
315pub struct Sender {
316 io: PollEvented<mio_pipe::Sender>,
317}
318
319impl Sender {
320 fn from_mio(mio_tx: mio_pipe::Sender) -> io::Result<Sender> {
321 let io = PollEvented::new_with_interest(mio_tx, Interest::WRITABLE)?;
322 Ok(Sender { io })
323 }
324
325 /// Creates a new `Sender` from a [`File`].
326 ///
327 /// This function is intended to construct a pipe from a [`File`] representing
328 /// a special FIFO file. It will check if the file is a pipe and has write access,
329 /// set it in non-blocking mode and perform the conversion.
330 ///
331 /// # Errors
332 ///
333 /// Fails with `io::ErrorKind::InvalidInput` if the file is not a pipe or it
334 /// does not have write access. Also fails with any standard OS error if it occurs.
335 ///
336 /// # Panics
337 ///
338 /// This function panics if it is not called from within a runtime with
339 /// IO enabled.
340 ///
341 /// The runtime is usually set implicitly when this function is called
342 /// from a future driven by a tokio runtime, otherwise runtime can be set
343 /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
344 pub fn from_file(mut file: File) -> io::Result<Sender> {
345 if !is_fifo(&file)? {
346 return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe"));
347 }
348
349 let flags = get_file_flags(&file)?;
350 if has_write_access(flags) {
351 set_nonblocking(&mut file, flags)?;
352 Sender::from_file_unchecked(file)
353 } else {
354 Err(io::Error::new(
355 io::ErrorKind::InvalidInput,
356 "not in O_WRONLY or O_RDWR access mode",
357 ))
358 }
359 }
360
361 /// Creates a new `Sender` from a [`File`] without checking pipe properties.
362 ///
363 /// This function is intended to construct a pipe from a File representing
364 /// a special FIFO file. The conversion assumes nothing about the underlying
365 /// file; it is left up to the user to make sure it is opened with write access,
366 /// represents a pipe and is set in non-blocking mode.
367 ///
368 /// # Examples
369 ///
370 /// ```no_run
371 /// use tokio::net::unix::pipe;
372 /// use std::fs::OpenOptions;
373 /// use std::os::unix::fs::{FileTypeExt, OpenOptionsExt};
374 /// # use std::error::Error;
375 ///
376 /// const FIFO_NAME: &str = "path/to/a/fifo";
377 ///
378 /// # async fn dox() -> Result<(), Box<dyn Error>> {
379 /// let file = OpenOptions::new()
380 /// .write(true)
381 /// .custom_flags(libc::O_NONBLOCK)
382 /// .open(FIFO_NAME)?;
383 /// if file.metadata()?.file_type().is_fifo() {
384 /// let tx = pipe::Sender::from_file_unchecked(file)?;
385 /// /* use the Sender */
386 /// }
387 /// # Ok(())
388 /// # }
389 /// ```
390 ///
391 /// # Panics
392 ///
393 /// This function panics if it is not called from within a runtime with
394 /// IO enabled.
395 ///
396 /// The runtime is usually set implicitly when this function is called
397 /// from a future driven by a tokio runtime, otherwise runtime can be set
398 /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
399 pub fn from_file_unchecked(file: File) -> io::Result<Sender> {
400 let raw_fd = file.into_raw_fd();
401 let mio_tx = unsafe { mio_pipe::Sender::from_raw_fd(raw_fd) };
402 Sender::from_mio(mio_tx)
403 }
404
405 /// Waits for any of the requested ready states.
406 ///
407 /// This function can be used instead of [`writable()`] to check the returned
408 /// ready set for [`Ready::WRITABLE`] and [`Ready::WRITE_CLOSED`] events.
409 ///
410 /// The function may complete without the pipe being ready. This is a
411 /// false-positive and attempting an operation will return with
412 /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
413 /// [`Ready`] set, so you should always check the returned value and possibly
414 /// wait again if the requested states are not set.
415 ///
416 /// [`writable()`]: Self::writable
417 ///
418 /// # Cancel safety
419 ///
420 /// This method is cancel safe. Once a readiness event occurs, the method
421 /// will continue to return immediately until the readiness event is
422 /// consumed by an attempt to write that fails with `WouldBlock` or
423 /// `Poll::Pending`.
424 pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
425 let event = self.io.registration().readiness(interest).await?;
426 Ok(event.ready)
427 }
428
429 /// Waits for the pipe to become writable.
430 ///
431 /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
432 /// paired with [`try_write()`].
433 ///
434 /// [`try_write()`]: Self::try_write
435 ///
436 /// # Examples
437 ///
438 /// ```no_run
439 /// use tokio::net::unix::pipe;
440 /// use std::io;
441 ///
442 /// #[tokio::main]
443 /// async fn main() -> io::Result<()> {
444 /// // Open a writing end of a fifo
445 /// let tx = pipe::OpenOptions::new().open_sender("path/to/a/fifo")?;
446 ///
447 /// loop {
448 /// // Wait for the pipe to be writable
449 /// tx.writable().await?;
450 ///
451 /// // Try to write data, this may still fail with `WouldBlock`
452 /// // if the readiness event is a false positive.
453 /// match tx.try_write(b"hello world") {
454 /// Ok(n) => {
455 /// break;
456 /// }
457 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
458 /// continue;
459 /// }
460 /// Err(e) => {
461 /// return Err(e.into());
462 /// }
463 /// }
464 /// }
465 ///
466 /// Ok(())
467 /// }
468 /// ```
469 pub async fn writable(&self) -> io::Result<()> {
470 self.ready(Interest::WRITABLE).await?;
471 Ok(())
472 }
473
474 /// Polls for write readiness.
475 ///
476 /// If the pipe is not currently ready for writing, this method will
477 /// store a clone of the `Waker` from the provided `Context`. When the pipe
478 /// becomes ready for writing, `Waker::wake` will be called on the waker.
479 ///
480 /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only
481 /// the `Waker` from the `Context` passed to the most recent call is
482 /// scheduled to receive a wakeup.
483 ///
484 /// This function is intended for cases where creating and pinning a future
485 /// via [`writable`] is not feasible. Where possible, using [`writable`] is
486 /// preferred, as this supports polling from multiple tasks at once.
487 ///
488 /// [`writable`]: Self::writable
489 ///
490 /// # Return value
491 ///
492 /// The function returns:
493 ///
494 /// * `Poll::Pending` if the pipe is not ready for writing.
495 /// * `Poll::Ready(Ok(()))` if the pipe is ready for writing.
496 /// * `Poll::Ready(Err(e))` if an error is encountered.
497 ///
498 /// # Errors
499 ///
500 /// This function may encounter any standard I/O error except `WouldBlock`.
501 pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
502 self.io.registration().poll_write_ready(cx).map_ok(|_| ())
503 }
504
505 /// Tries to write a buffer to the pipe, returning how many bytes were
506 /// written.
507 ///
508 /// The function will attempt to write the entire contents of `buf`, but
509 /// only part of the buffer may be written. If the length of `buf` is not
510 /// greater than `PIPE_BUF` (an OS constant, 4096 under Linux), then the
511 /// write is guaranteed to be atomic, i.e. either the entire content of
512 /// `buf` will be written or this method will fail with `WouldBlock`. There
513 /// is no such guarantee if `buf` is larger than `PIPE_BUF`.
514 ///
515 /// This function is usually paired with [`writable`].
516 ///
517 /// [`writable`]: Self::writable
518 ///
519 /// # Return
520 ///
521 /// If data is successfully written, `Ok(n)` is returned, where `n` is the
522 /// number of bytes written. If the pipe is not ready to write data,
523 /// `Err(io::ErrorKind::WouldBlock)` is returned.
524 ///
525 /// # Examples
526 ///
527 /// ```no_run
528 /// use tokio::net::unix::pipe;
529 /// use std::io;
530 ///
531 /// #[tokio::main]
532 /// async fn main() -> io::Result<()> {
533 /// // Open a writing end of a fifo
534 /// let tx = pipe::OpenOptions::new().open_sender("path/to/a/fifo")?;
535 ///
536 /// loop {
537 /// // Wait for the pipe to be writable
538 /// tx.writable().await?;
539 ///
540 /// // Try to write data, this may still fail with `WouldBlock`
541 /// // if the readiness event is a false positive.
542 /// match tx.try_write(b"hello world") {
543 /// Ok(n) => {
544 /// break;
545 /// }
546 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
547 /// continue;
548 /// }
549 /// Err(e) => {
550 /// return Err(e.into());
551 /// }
552 /// }
553 /// }
554 ///
555 /// Ok(())
556 /// }
557 /// ```
558 pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
559 self.io
560 .registration()
561 .try_io(Interest::WRITABLE, || (&*self.io).write(buf))
562 }
563
564 /// Tries to write several buffers to the pipe, returning how many bytes
565 /// were written.
566 ///
/// Data is written from each buffer in order, with the final buffer read
/// from possibly being only partially consumed. This method behaves
569 /// equivalently to a single call to [`try_write()`] with concatenated
570 /// buffers.
571 ///
572 /// If the total length of buffers is not greater than `PIPE_BUF` (an OS
573 /// constant, 4096 under Linux), then the write is guaranteed to be atomic,
574 /// i.e. either the entire contents of buffers will be written or this
575 /// method will fail with `WouldBlock`. There is no such guarantee if the
576 /// total length of buffers is greater than `PIPE_BUF`.
577 ///
578 /// This function is usually paired with [`writable`].
579 ///
580 /// [`try_write()`]: Self::try_write()
581 /// [`writable`]: Self::writable
582 ///
583 /// # Return
584 ///
585 /// If data is successfully written, `Ok(n)` is returned, where `n` is the
586 /// number of bytes written. If the pipe is not ready to write data,
587 /// `Err(io::ErrorKind::WouldBlock)` is returned.
588 ///
589 /// # Examples
590 ///
591 /// ```no_run
592 /// use tokio::net::unix::pipe;
593 /// use std::io;
594 ///
595 /// #[tokio::main]
596 /// async fn main() -> io::Result<()> {
597 /// // Open a writing end of a fifo
598 /// let tx = pipe::OpenOptions::new().open_sender("path/to/a/fifo")?;
599 ///
600 /// let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")];
601 ///
602 /// loop {
603 /// // Wait for the pipe to be writable
604 /// tx.writable().await?;
605 ///
606 /// // Try to write data, this may still fail with `WouldBlock`
607 /// // if the readiness event is a false positive.
608 /// match tx.try_write_vectored(&bufs) {
609 /// Ok(n) => {
610 /// break;
611 /// }
612 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
613 /// continue;
614 /// }
615 /// Err(e) => {
616 /// return Err(e.into());
617 /// }
618 /// }
619 /// }
620 ///
621 /// Ok(())
622 /// }
623 /// ```
624 pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> {
625 self.io
626 .registration()
627 .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf))
628 }
629}
630
631impl AsyncWrite for Sender {
632 fn poll_write(
633 self: Pin<&mut Self>,
634 cx: &mut Context<'_>,
635 buf: &[u8],
636 ) -> Poll<io::Result<usize>> {
637 self.io.poll_write(cx, buf)
638 }
639
640 fn poll_write_vectored(
641 self: Pin<&mut Self>,
642 cx: &mut Context<'_>,
643 bufs: &[io::IoSlice<'_>],
644 ) -> Poll<io::Result<usize>> {
645 self.io.poll_write_vectored(cx, bufs)
646 }
647
648 fn is_write_vectored(&self) -> bool {
649 true
650 }
651
652 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
653 Poll::Ready(Ok(()))
654 }
655
656 fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
657 Poll::Ready(Ok(()))
658 }
659}
660
661impl AsRawFd for Sender {
662 fn as_raw_fd(&self) -> RawFd {
663 self.io.as_raw_fd()
664 }
665}
666
667#[cfg(not(tokio_no_as_fd))]
668impl AsFd for Sender {
669 fn as_fd(&self) -> BorrowedFd<'_> {
670 unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
671 }
672}
673
674/// Reading end of a Unix pipe.
675///
676/// It can be constructed from a FIFO file with [`OpenOptions::open_receiver`].
677///
678/// # Examples
679///
680/// Receiving messages from a named pipe in a loop:
681///
682/// ```no_run
683/// use tokio::net::unix::pipe;
684/// use tokio::io::{self, AsyncReadExt};
685///
686/// const FIFO_NAME: &str = "path/to/a/fifo";
687///
688/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
689/// let mut rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME)?;
690/// loop {
691/// let mut msg = vec![0; 256];
692/// match rx.read_exact(&mut msg).await {
693/// Ok(_) => {
694/// /* handle the message */
695/// }
696/// Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => {
697/// // Writing end has been closed, we should reopen the pipe.
698/// rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME)?;
699/// }
700/// Err(e) => return Err(e.into()),
701/// }
702/// }
703/// # }
704/// ```
705///
706/// On Linux, you can use a `Receiver` in read-write access mode to implement
707/// resilient reading from a named pipe. Unlike `Receiver` opened in read-only
708/// mode, read from a pipe in read-write mode will not fail with `UnexpectedEof`
709/// when the writing end is closed. This way, a `Receiver` can asynchronously
710/// wait for the next writer to open the pipe.
711///
712/// You should not use functions waiting for EOF such as [`read_to_end`] with
713/// a `Receiver` in read-write access mode, since it **may wait forever**.
714/// `Receiver` in this mode also holds an open writing end, which prevents
715/// receiving EOF.
716///
717/// To set the read-write access mode you can use `OpenOptions::read_write`.
718/// Note that using read-write access mode with FIFO files is not defined by
719/// the POSIX standard and it is only guaranteed to work on Linux.
720///
721/// ```ignore
722/// use tokio::net::unix::pipe;
723/// use tokio::io::AsyncReadExt;
724/// # use std::error::Error;
725///
726/// const FIFO_NAME: &str = "path/to/a/fifo";
727///
728/// # async fn dox() -> Result<(), Box<dyn Error>> {
729/// let mut rx = pipe::OpenOptions::new()
730/// .read_write(true)
731/// .open_receiver(FIFO_NAME)?;
732/// loop {
733/// let mut msg = vec![0; 256];
734/// rx.read_exact(&mut msg).await?;
735/// /* handle the message */
736/// }
737/// # }
738/// ```
739///
740/// [`read_to_end`]: crate::io::AsyncReadExt::read_to_end
741#[derive(Debug)]
742pub struct Receiver {
743 io: PollEvented<mio_pipe::Receiver>,
744}
745
746impl Receiver {
747 fn from_mio(mio_rx: mio_pipe::Receiver) -> io::Result<Receiver> {
748 let io = PollEvented::new_with_interest(mio_rx, Interest::READABLE)?;
749 Ok(Receiver { io })
750 }
751
752 /// Creates a new `Receiver` from a [`File`].
753 ///
754 /// This function is intended to construct a pipe from a [`File`] representing
755 /// a special FIFO file. It will check if the file is a pipe and has read access,
756 /// set it in non-blocking mode and perform the conversion.
757 ///
758 /// # Errors
759 ///
760 /// Fails with `io::ErrorKind::InvalidInput` if the file is not a pipe or it
761 /// does not have read access. Also fails with any standard OS error if it occurs.
762 ///
763 /// # Panics
764 ///
765 /// This function panics if it is not called from within a runtime with
766 /// IO enabled.
767 ///
768 /// The runtime is usually set implicitly when this function is called
769 /// from a future driven by a tokio runtime, otherwise runtime can be set
770 /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
771 pub fn from_file(mut file: File) -> io::Result<Receiver> {
772 if !is_fifo(&file)? {
773 return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe"));
774 }
775
776 let flags = get_file_flags(&file)?;
777 if has_read_access(flags) {
778 set_nonblocking(&mut file, flags)?;
779 Receiver::from_file_unchecked(file)
780 } else {
781 Err(io::Error::new(
782 io::ErrorKind::InvalidInput,
783 "not in O_RDONLY or O_RDWR access mode",
784 ))
785 }
786 }
787
788 /// Creates a new `Receiver` from a [`File`] without checking pipe properties.
789 ///
790 /// This function is intended to construct a pipe from a File representing
791 /// a special FIFO file. The conversion assumes nothing about the underlying
792 /// file; it is left up to the user to make sure it is opened with read access,
793 /// represents a pipe and is set in non-blocking mode.
794 ///
795 /// # Examples
796 ///
797 /// ```no_run
798 /// use tokio::net::unix::pipe;
799 /// use std::fs::OpenOptions;
800 /// use std::os::unix::fs::{FileTypeExt, OpenOptionsExt};
801 /// # use std::error::Error;
802 ///
803 /// const FIFO_NAME: &str = "path/to/a/fifo";
804 ///
805 /// # async fn dox() -> Result<(), Box<dyn Error>> {
806 /// let file = OpenOptions::new()
807 /// .read(true)
808 /// .custom_flags(libc::O_NONBLOCK)
809 /// .open(FIFO_NAME)?;
810 /// if file.metadata()?.file_type().is_fifo() {
811 /// let rx = pipe::Receiver::from_file_unchecked(file)?;
812 /// /* use the Receiver */
813 /// }
814 /// # Ok(())
815 /// # }
816 /// ```
817 ///
818 /// # Panics
819 ///
820 /// This function panics if it is not called from within a runtime with
821 /// IO enabled.
822 ///
823 /// The runtime is usually set implicitly when this function is called
824 /// from a future driven by a tokio runtime, otherwise runtime can be set
825 /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
826 pub fn from_file_unchecked(file: File) -> io::Result<Receiver> {
827 let raw_fd = file.into_raw_fd();
828 let mio_rx = unsafe { mio_pipe::Receiver::from_raw_fd(raw_fd) };
829 Receiver::from_mio(mio_rx)
830 }
831
832 /// Waits for any of the requested ready states.
833 ///
834 /// This function can be used instead of [`readable()`] to check the returned
835 /// ready set for [`Ready::READABLE`] and [`Ready::READ_CLOSED`] events.
836 ///
837 /// The function may complete without the pipe being ready. This is a
838 /// false-positive and attempting an operation will return with
839 /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
840 /// [`Ready`] set, so you should always check the returned value and possibly
841 /// wait again if the requested states are not set.
842 ///
843 /// [`readable()`]: Self::readable
844 ///
845 /// # Cancel safety
846 ///
847 /// This method is cancel safe. Once a readiness event occurs, the method
848 /// will continue to return immediately until the readiness event is
849 /// consumed by an attempt to read that fails with `WouldBlock` or
850 /// `Poll::Pending`.
851 pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
852 let event = self.io.registration().readiness(interest).await?;
853 Ok(event.ready)
854 }
855
856 /// Waits for the pipe to become readable.
857 ///
858 /// This function is equivalent to `ready(Interest::READABLE)` and is usually
859 /// paired with [`try_read()`].
860 ///
861 /// [`try_read()`]: Self::try_read()
862 ///
863 /// # Examples
864 ///
865 /// ```no_run
866 /// use tokio::net::unix::pipe;
867 /// use std::io;
868 ///
869 /// #[tokio::main]
870 /// async fn main() -> io::Result<()> {
871 /// // Open a reading end of a fifo
872 /// let rx = pipe::OpenOptions::new().open_receiver("path/to/a/fifo")?;
873 ///
874 /// let mut msg = vec![0; 1024];
875 ///
876 /// loop {
877 /// // Wait for the pipe to be readable
878 /// rx.readable().await?;
879 ///
880 /// // Try to read data, this may still fail with `WouldBlock`
881 /// // if the readiness event is a false positive.
882 /// match rx.try_read(&mut msg) {
883 /// Ok(n) => {
884 /// msg.truncate(n);
885 /// break;
886 /// }
887 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
888 /// continue;
889 /// }
890 /// Err(e) => {
891 /// return Err(e.into());
892 /// }
893 /// }
894 /// }
895 ///
896 /// println!("GOT = {:?}", msg);
897 /// Ok(())
898 /// }
899 /// ```
900 pub async fn readable(&self) -> io::Result<()> {
901 self.ready(Interest::READABLE).await?;
902 Ok(())
903 }
904
905 /// Polls for read readiness.
906 ///
907 /// If the pipe is not currently ready for reading, this method will
908 /// store a clone of the `Waker` from the provided `Context`. When the pipe
909 /// becomes ready for reading, `Waker::wake` will be called on the waker.
910 ///
911 /// Note that on multiple calls to `poll_read_ready` or `poll_read`, only
912 /// the `Waker` from the `Context` passed to the most recent call is
913 /// scheduled to receive a wakeup.
914 ///
915 /// This function is intended for cases where creating and pinning a future
916 /// via [`readable`] is not feasible. Where possible, using [`readable`] is
917 /// preferred, as this supports polling from multiple tasks at once.
918 ///
919 /// [`readable`]: Self::readable
920 ///
921 /// # Return value
922 ///
923 /// The function returns:
924 ///
925 /// * `Poll::Pending` if the pipe is not ready for reading.
926 /// * `Poll::Ready(Ok(()))` if the pipe is ready for reading.
927 /// * `Poll::Ready(Err(e))` if an error is encountered.
928 ///
929 /// # Errors
930 ///
931 /// This function may encounter any standard I/O error except `WouldBlock`.
932 pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
933 self.io.registration().poll_read_ready(cx).map_ok(|_| ())
934 }
935
936 /// Tries to read data from the pipe into the provided buffer, returning how
937 /// many bytes were read.
938 ///
939 /// Reads any pending data from the pipe but does not wait for new data
940 /// to arrive. On success, returns the number of bytes read. Because
941 /// `try_read()` is non-blocking, the buffer does not have to be stored by
942 /// the async task and can exist entirely on the stack.
943 ///
944 /// Usually [`readable()`] is used with this function.
945 ///
946 /// [`readable()`]: Self::readable()
947 ///
948 /// # Return
949 ///
950 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
951 /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios:
952 ///
953 /// 1. The pipe's writing end is closed and will no longer write data.
954 /// 2. The specified buffer was 0 bytes in length.
955 ///
956 /// If the pipe is not ready to read data,
957 /// `Err(io::ErrorKind::WouldBlock)` is returned.
958 ///
959 /// # Examples
960 ///
961 /// ```no_run
962 /// use tokio::net::unix::pipe;
963 /// use std::io;
964 ///
965 /// #[tokio::main]
966 /// async fn main() -> io::Result<()> {
967 /// // Open a reading end of a fifo
968 /// let rx = pipe::OpenOptions::new().open_receiver("path/to/a/fifo")?;
969 ///
970 /// let mut msg = vec![0; 1024];
971 ///
972 /// loop {
973 /// // Wait for the pipe to be readable
974 /// rx.readable().await?;
975 ///
976 /// // Try to read data, this may still fail with `WouldBlock`
977 /// // if the readiness event is a false positive.
978 /// match rx.try_read(&mut msg) {
979 /// Ok(n) => {
980 /// msg.truncate(n);
981 /// break;
982 /// }
983 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
984 /// continue;
985 /// }
986 /// Err(e) => {
987 /// return Err(e.into());
988 /// }
989 /// }
990 /// }
991 ///
992 /// println!("GOT = {:?}", msg);
993 /// Ok(())
994 /// }
995 /// ```
996 pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
997 self.io
998 .registration()
999 .try_io(Interest::READABLE, || (&*self.io).read(buf))
1000 }
1001
1002 /// Tries to read data from the pipe into the provided buffers, returning
1003 /// how many bytes were read.
1004 ///
1005 /// Data is copied to fill each buffer in order, with the final buffer
1006 /// written to possibly being only partially filled. This method behaves
1007 /// equivalently to a single call to [`try_read()`] with concatenated
1008 /// buffers.
1009 ///
1010 /// Reads any pending data from the pipe but does not wait for new data
1011 /// to arrive. On success, returns the number of bytes read. Because
1012 /// `try_read_vectored()` is non-blocking, the buffer does not have to be
1013 /// stored by the async task and can exist entirely on the stack.
1014 ///
1015 /// Usually, [`readable()`] is used with this function.
1016 ///
1017 /// [`try_read()`]: Self::try_read()
1018 /// [`readable()`]: Self::readable()
1019 ///
1020 /// # Return
1021 ///
1022 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
1023 /// number of bytes read. `Ok(0)` indicates the pipe's writing end is
1024 /// closed and will no longer write data. If the pipe is not ready to read
    /// data, `Err(io::ErrorKind::WouldBlock)` is returned.
1026 ///
1027 /// # Examples
1028 ///
1029 /// ```no_run
1030 /// use tokio::net::unix::pipe;
1031 /// use std::io;
1032 ///
1033 /// #[tokio::main]
1034 /// async fn main() -> io::Result<()> {
1035 /// // Open a reading end of a fifo
1036 /// let rx = pipe::OpenOptions::new().open_receiver("path/to/a/fifo")?;
1037 ///
1038 /// loop {
1039 /// // Wait for the pipe to be readable
1040 /// rx.readable().await?;
1041 ///
1042 /// // Creating the buffer **after** the `await` prevents it from
1043 /// // being stored in the async task.
1044 /// let mut buf_a = [0; 512];
1045 /// let mut buf_b = [0; 1024];
1046 /// let mut bufs = [
1047 /// io::IoSliceMut::new(&mut buf_a),
1048 /// io::IoSliceMut::new(&mut buf_b),
1049 /// ];
1050 ///
1051 /// // Try to read data, this may still fail with `WouldBlock`
1052 /// // if the readiness event is a false positive.
1053 /// match rx.try_read_vectored(&mut bufs) {
1054 /// Ok(0) => break,
1055 /// Ok(n) => {
1056 /// println!("read {} bytes", n);
1057 /// }
1058 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1059 /// continue;
1060 /// }
1061 /// Err(e) => {
1062 /// return Err(e.into());
1063 /// }
1064 /// }
1065 /// }
1066 ///
1067 /// Ok(())
1068 /// }
1069 /// ```
1070 pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
1071 self.io
1072 .registration()
1073 .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
1074 }
1075
1076 cfg_io_util! {
1077 /// Tries to read data from the pipe into the provided buffer, advancing the
1078 /// buffer's internal cursor, returning how many bytes were read.
1079 ///
1080 /// Reads any pending data from the pipe but does not wait for new data
1081 /// to arrive. On success, returns the number of bytes read. Because
1082 /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
1083 /// the async task and can exist entirely on the stack.
1084 ///
1085 /// Usually, [`readable()`] or [`ready()`] is used with this function.
1086 ///
1087 /// [`readable()`]: Self::readable
1088 /// [`ready()`]: Self::ready
1089 ///
1090 /// # Return
1091 ///
1092 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
1093 /// number of bytes read. `Ok(0)` indicates the pipe's writing end is
1094 /// closed and will no longer write data. If the pipe is not ready to read
        /// data, `Err(io::ErrorKind::WouldBlock)` is returned.
1096 ///
1097 /// # Examples
1098 ///
1099 /// ```no_run
1100 /// use tokio::net::unix::pipe;
1101 /// use std::io;
1102 ///
1103 /// #[tokio::main]
1104 /// async fn main() -> io::Result<()> {
1105 /// // Open a reading end of a fifo
1106 /// let rx = pipe::OpenOptions::new().open_receiver("path/to/a/fifo")?;
1107 ///
1108 /// loop {
1109 /// // Wait for the pipe to be readable
1110 /// rx.readable().await?;
1111 ///
1112 /// let mut buf = Vec::with_capacity(4096);
1113 ///
1114 /// // Try to read data, this may still fail with `WouldBlock`
1115 /// // if the readiness event is a false positive.
1116 /// match rx.try_read_buf(&mut buf) {
1117 /// Ok(0) => break,
1118 /// Ok(n) => {
1119 /// println!("read {} bytes", n);
1120 /// }
1121 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
1122 /// continue;
1123 /// }
1124 /// Err(e) => {
1125 /// return Err(e.into());
1126 /// }
1127 /// }
1128 /// }
1129 ///
1130 /// Ok(())
1131 /// }
1132 /// ```
1133 pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
1134 self.io.registration().try_io(Interest::READABLE, || {
1135 use std::io::Read;
1136
1137 let dst = buf.chunk_mut();
1138 let dst =
1139 unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
1140
1141 // Safety: `mio_pipe::Receiver` uses a `std::fs::File` underneath,
1142 // which correctly handles reads into uninitialized memory.
1143 let n = (&*self.io).read(dst)?;
1144
1145 unsafe {
1146 buf.advance_mut(n);
1147 }
1148
1149 Ok(n)
1150 })
1151 }
1152 }
1153}
1154
1155impl AsyncRead for Receiver {
1156 fn poll_read(
1157 self: Pin<&mut Self>,
1158 cx: &mut Context<'_>,
1159 buf: &mut ReadBuf<'_>,
1160 ) -> Poll<io::Result<()>> {
1161 // Safety: `mio_pipe::Receiver` uses a `std::fs::File` underneath,
1162 // which correctly handles reads into uninitialized memory.
1163 unsafe { self.io.poll_read(cx, buf) }
1164 }
1165}
1166
1167impl AsRawFd for Receiver {
1168 fn as_raw_fd(&self) -> RawFd {
1169 self.io.as_raw_fd()
1170 }
1171}
1172
1173#[cfg(not(tokio_no_as_fd))]
1174impl AsFd for Receiver {
1175 fn as_fd(&self) -> BorrowedFd<'_> {
1176 unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
1177 }
1178}
1179
/// Reports whether `file` refers to a FIFO (named pipe).
///
/// # Errors
///
/// Propagates any error returned while querying the file's metadata.
fn is_fifo(file: &File) -> io::Result<bool> {
    file.metadata()
        .map(|metadata| metadata.file_type().is_fifo())
}
1184
1185/// Gets file descriptor's flags by fcntl.
1186fn get_file_flags(file: &File) -> io::Result<libc::c_int> {
1187 let fd: i32 = file.as_raw_fd();
1188 let flags: i32 = unsafe { libc::fcntl(fd, cmd:libc::F_GETFL) };
1189 if flags < 0 {
1190 Err(io::Error::last_os_error())
1191 } else {
1192 Ok(flags)
1193 }
1194}
1195
1196/// Checks for O_RDONLY or O_RDWR access mode.
1197fn has_read_access(flags: libc::c_int) -> bool {
1198 let mode: i32 = flags & libc::O_ACCMODE;
1199 mode == libc::O_RDONLY || mode == libc::O_RDWR
1200}
1201
1202/// Checks for O_WRONLY or O_RDWR access mode.
1203fn has_write_access(flags: libc::c_int) -> bool {
1204 let mode: i32 = flags & libc::O_ACCMODE;
1205 mode == libc::O_WRONLY || mode == libc::O_RDWR
1206}
1207
1208/// Sets file's flags with O_NONBLOCK by fcntl.
1209fn set_nonblocking(file: &mut File, current_flags: libc::c_int) -> io::Result<()> {
1210 let fd: i32 = file.as_raw_fd();
1211
1212 let flags: i32 = current_flags | libc::O_NONBLOCK;
1213
1214 if flags != current_flags {
1215 let ret: i32 = unsafe { libc::fcntl(fd, cmd:libc::F_SETFL, flags) };
1216 if ret < 0 {
1217 return Err(io::Error::last_os_error());
1218 }
1219 }
1220
1221 Ok(())
1222}
1223