1//! Unix pipe types.
2
3use crate::io::interest::Interest;
4use crate::io::{AsyncRead, AsyncWrite, PollEvented, ReadBuf, Ready};
5
6use mio::unix::pipe as mio_pipe;
7use std::fs::File;
8use std::io::{self, Read, Write};
9use std::os::unix::fs::{FileTypeExt, OpenOptionsExt};
10use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, RawFd};
11use std::path::Path;
12use std::pin::Pin;
13use std::task::{Context, Poll};
14
15cfg_io_util! {
16 use bytes::BufMut;
17}
18
19/// Options and flags which can be used to configure how a FIFO file is opened.
20///
21/// This builder allows configuring how to create a pipe end from a FIFO file.
22/// Generally speaking, when using `OpenOptions`, you'll first call [`new`],
23/// then chain calls to methods to set each option, then call either
24/// [`open_receiver`] or [`open_sender`], passing the path of the FIFO file you
25/// are trying to open. This will give you a [`io::Result`] with a pipe end
26/// inside that you can further operate on.
27///
28/// [`new`]: OpenOptions::new
29/// [`open_receiver`]: OpenOptions::open_receiver
30/// [`open_sender`]: OpenOptions::open_sender
31///
32/// # Examples
33///
34/// Opening a pair of pipe ends from a FIFO file:
35///
36/// ```no_run
37/// use tokio::net::unix::pipe;
38/// # use std::error::Error;
39///
40/// const FIFO_NAME: &str = "path/to/a/fifo";
41///
42/// # async fn dox() -> Result<(), Box<dyn Error>> {
43/// let rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME)?;
44/// let tx = pipe::OpenOptions::new().open_sender(FIFO_NAME)?;
45/// # Ok(())
46/// # }
47/// ```
48///
49/// Opening a [`Sender`] on Linux when you are sure the file is a FIFO:
50///
51/// ```ignore
52/// use tokio::net::unix::pipe;
53/// use nix::{unistd::mkfifo, sys::stat::Mode};
54/// # use std::error::Error;
55///
56/// // Our program has exclusive access to this path.
57/// const FIFO_NAME: &str = "path/to/a/new/fifo";
58///
59/// # async fn dox() -> Result<(), Box<dyn Error>> {
60/// mkfifo(FIFO_NAME, Mode::S_IRWXU)?;
61/// let tx = pipe::OpenOptions::new()
62/// .read_write(true)
63/// .unchecked(true)
64/// .open_sender(FIFO_NAME)?;
65/// # Ok(())
66/// # }
67/// ```
68#[derive(Clone, Debug)]
69pub struct OpenOptions {
70 #[cfg(target_os = "linux")]
71 read_write: bool,
72 unchecked: bool,
73}
74
75impl OpenOptions {
76 /// Creates a blank new set of options ready for configuration.
77 ///
78 /// All options are initially set to `false`.
79 pub fn new() -> OpenOptions {
80 OpenOptions {
81 #[cfg(target_os = "linux")]
82 read_write: false,
83 unchecked: false,
84 }
85 }
86
87 /// Sets the option for read-write access.
88 ///
89 /// This option, when true, will indicate that a FIFO file will be opened
90 /// in read-write access mode. This operation is not defined by the POSIX
91 /// standard and is only guaranteed to work on Linux.
92 ///
93 /// # Examples
94 ///
95 /// Opening a [`Sender`] even if there are no open reading ends:
96 ///
97 /// ```ignore
98 /// use tokio::net::unix::pipe;
99 ///
100 /// let tx = pipe::OpenOptions::new()
101 /// .read_write(true)
102 /// .open_sender("path/to/a/fifo");
103 /// ```
104 ///
105 /// Opening a resilient [`Receiver`] i.e. a reading pipe end which will not
106 /// fail with [`UnexpectedEof`] during reading if all writing ends of the
107 /// pipe close the FIFO file.
108 ///
109 /// [`UnexpectedEof`]: std::io::ErrorKind::UnexpectedEof
110 ///
111 /// ```ignore
112 /// use tokio::net::unix::pipe;
113 ///
114 /// let tx = pipe::OpenOptions::new()
115 /// .read_write(true)
116 /// .open_receiver("path/to/a/fifo");
117 /// ```
118 #[cfg(target_os = "linux")]
119 #[cfg_attr(docsrs, doc(cfg(target_os = "linux")))]
120 pub fn read_write(&mut self, value: bool) -> &mut Self {
121 self.read_write = value;
122 self
123 }
124
125 /// Sets the option to skip the check for FIFO file type.
126 ///
127 /// By default, [`open_receiver`] and [`open_sender`] functions will check
128 /// if the opened file is a FIFO file. Set this option to `true` if you are
129 /// sure the file is a FIFO file.
130 ///
131 /// [`open_receiver`]: OpenOptions::open_receiver
132 /// [`open_sender`]: OpenOptions::open_sender
133 ///
134 /// # Examples
135 ///
136 /// ```no_run
137 /// use tokio::net::unix::pipe;
138 /// use nix::{unistd::mkfifo, sys::stat::Mode};
139 /// # use std::error::Error;
140 ///
141 /// // Our program has exclusive access to this path.
142 /// const FIFO_NAME: &str = "path/to/a/new/fifo";
143 ///
144 /// # async fn dox() -> Result<(), Box<dyn Error>> {
145 /// mkfifo(FIFO_NAME, Mode::S_IRWXU)?;
146 /// let rx = pipe::OpenOptions::new()
147 /// .unchecked(true)
148 /// .open_receiver(FIFO_NAME)?;
149 /// # Ok(())
150 /// # }
151 /// ```
152 pub fn unchecked(&mut self, value: bool) -> &mut Self {
153 self.unchecked = value;
154 self
155 }
156
157 /// Creates a [`Receiver`] from a FIFO file with the options specified by `self`.
158 ///
159 /// This function will open the FIFO file at the specified path, possibly
160 /// check if it is a pipe, and associate the pipe with the default event
161 /// loop for reading.
162 ///
163 /// # Errors
164 ///
165 /// If the file type check fails, this function will fail with `io::ErrorKind::InvalidInput`.
166 /// This function may also fail with other standard OS errors.
167 ///
168 /// # Panics
169 ///
170 /// This function panics if it is not called from within a runtime with
171 /// IO enabled.
172 ///
173 /// The runtime is usually set implicitly when this function is called
174 /// from a future driven by a tokio runtime, otherwise runtime can be set
175 /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
176 pub fn open_receiver<P: AsRef<Path>>(&self, path: P) -> io::Result<Receiver> {
177 let file = self.open(path.as_ref(), PipeEnd::Receiver)?;
178 Receiver::from_file_unchecked(file)
179 }
180
181 /// Creates a [`Sender`] from a FIFO file with the options specified by `self`.
182 ///
183 /// This function will open the FIFO file at the specified path, possibly
184 /// check if it is a pipe, and associate the pipe with the default event
185 /// loop for writing.
186 ///
187 /// # Errors
188 ///
189 /// If the file type check fails, this function will fail with `io::ErrorKind::InvalidInput`.
190 /// If the file is not opened in read-write access mode and the file is not
191 /// currently open for reading, this function will fail with `ENXIO`.
192 /// This function may also fail with other standard OS errors.
193 ///
194 /// # Panics
195 ///
196 /// This function panics if it is not called from within a runtime with
197 /// IO enabled.
198 ///
199 /// The runtime is usually set implicitly when this function is called
200 /// from a future driven by a tokio runtime, otherwise runtime can be set
201 /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
202 pub fn open_sender<P: AsRef<Path>>(&self, path: P) -> io::Result<Sender> {
203 let file = self.open(path.as_ref(), PipeEnd::Sender)?;
204 Sender::from_file_unchecked(file)
205 }
206
207 fn open(&self, path: &Path, pipe_end: PipeEnd) -> io::Result<File> {
208 let mut options = std::fs::OpenOptions::new();
209 options
210 .read(pipe_end == PipeEnd::Receiver)
211 .write(pipe_end == PipeEnd::Sender)
212 .custom_flags(libc::O_NONBLOCK);
213
214 #[cfg(target_os = "linux")]
215 if self.read_write {
216 options.read(true).write(true);
217 }
218
219 let file = options.open(path)?;
220
221 if !self.unchecked && !is_fifo(&file)? {
222 return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe"));
223 }
224
225 Ok(file)
226 }
227}
228
229impl Default for OpenOptions {
230 fn default() -> OpenOptions {
231 OpenOptions::new()
232 }
233}
234
235#[derive(Clone, Copy, PartialEq, Eq, Debug)]
236enum PipeEnd {
237 Sender,
238 Receiver,
239}
240
241/// Writing end of a Unix pipe.
242///
243/// It can be constructed from a FIFO file with [`OpenOptions::open_sender`].
244///
245/// Opening a named pipe for writing involves a few steps.
246/// Call to [`OpenOptions::open_sender`] might fail with an error indicating
247/// different things:
248///
249/// * [`io::ErrorKind::NotFound`] - There is no file at the specified path.
250/// * [`io::ErrorKind::InvalidInput`] - The file exists, but it is not a FIFO.
251/// * [`ENXIO`] - The file is a FIFO, but no process has it open for reading.
252/// Sleep for a while and try again.
253/// * Other OS errors not specific to opening FIFO files.
254///
255/// Opening a `Sender` from a FIFO file should look like this:
256///
257/// ```no_run
258/// use tokio::net::unix::pipe;
259/// use tokio::time::{self, Duration};
260///
261/// const FIFO_NAME: &str = "path/to/a/fifo";
262///
263/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
264/// // Wait for a reader to open the file.
265/// let tx = loop {
266/// match pipe::OpenOptions::new().open_sender(FIFO_NAME) {
267/// Ok(tx) => break tx,
268/// Err(e) if e.raw_os_error() == Some(libc::ENXIO) => {},
269/// Err(e) => return Err(e.into()),
270/// }
271///
272/// time::sleep(Duration::from_millis(50)).await;
273/// };
274/// # Ok(())
275/// # }
276/// ```
277///
278/// On Linux, it is possible to create a `Sender` without waiting in a sleeping
279/// loop. This is done by opening a named pipe in read-write access mode with
280/// `OpenOptions::read_write`. This way, a `Sender` can at the same time hold
281/// both a writing end and a reading end, and the latter allows to open a FIFO
282/// without [`ENXIO`] error since the pipe is open for reading as well.
283///
284/// `Sender` cannot be used to read from a pipe, so in practice the read access
285/// is only used when a FIFO is opened. However, using a `Sender` in read-write
286/// mode **may lead to lost data**, because written data will be dropped by the
287/// system as soon as all pipe ends are closed. To avoid lost data you have to
288/// make sure that a reading end has been opened before dropping a `Sender`.
289///
290/// Note that using read-write access mode with FIFO files is not defined by
291/// the POSIX standard and it is only guaranteed to work on Linux.
292///
293/// ```ignore
294/// use tokio::io::AsyncWriteExt;
295/// use tokio::net::unix::pipe;
296///
297/// const FIFO_NAME: &str = "path/to/a/fifo";
298///
299/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
300/// let mut tx = pipe::OpenOptions::new()
301/// .read_write(true)
302/// .open_sender(FIFO_NAME)?;
303///
304/// // Asynchronously write to the pipe before a reader.
305/// tx.write_all(b"hello world").await?;
306/// # Ok(())
307/// # }
308/// ```
309///
310/// [`ENXIO`]: https://docs.rs/libc/latest/libc/constant.ENXIO.html
311#[derive(Debug)]
312pub struct Sender {
313 io: PollEvented<mio_pipe::Sender>,
314}
315
316impl Sender {
317 fn from_mio(mio_tx: mio_pipe::Sender) -> io::Result<Sender> {
318 let io = PollEvented::new_with_interest(mio_tx, Interest::WRITABLE)?;
319 Ok(Sender { io })
320 }
321
322 /// Creates a new `Sender` from a [`File`].
323 ///
324 /// This function is intended to construct a pipe from a [`File`] representing
325 /// a special FIFO file. It will check if the file is a pipe and has write access,
326 /// set it in non-blocking mode and perform the conversion.
327 ///
328 /// # Errors
329 ///
330 /// Fails with `io::ErrorKind::InvalidInput` if the file is not a pipe or it
331 /// does not have write access. Also fails with any standard OS error if it occurs.
332 ///
333 /// # Panics
334 ///
335 /// This function panics if it is not called from within a runtime with
336 /// IO enabled.
337 ///
338 /// The runtime is usually set implicitly when this function is called
339 /// from a future driven by a tokio runtime, otherwise runtime can be set
340 /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
341 pub fn from_file(mut file: File) -> io::Result<Sender> {
342 if !is_fifo(&file)? {
343 return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe"));
344 }
345
346 let flags = get_file_flags(&file)?;
347 if has_write_access(flags) {
348 set_nonblocking(&mut file, flags)?;
349 Sender::from_file_unchecked(file)
350 } else {
351 Err(io::Error::new(
352 io::ErrorKind::InvalidInput,
353 "not in O_WRONLY or O_RDWR access mode",
354 ))
355 }
356 }
357
358 /// Creates a new `Sender` from a [`File`] without checking pipe properties.
359 ///
360 /// This function is intended to construct a pipe from a File representing
361 /// a special FIFO file. The conversion assumes nothing about the underlying
362 /// file; it is left up to the user to make sure it is opened with write access,
363 /// represents a pipe and is set in non-blocking mode.
364 ///
365 /// # Examples
366 ///
367 /// ```no_run
368 /// use tokio::net::unix::pipe;
369 /// use std::fs::OpenOptions;
370 /// use std::os::unix::fs::{FileTypeExt, OpenOptionsExt};
371 /// # use std::error::Error;
372 ///
373 /// const FIFO_NAME: &str = "path/to/a/fifo";
374 ///
375 /// # async fn dox() -> Result<(), Box<dyn Error>> {
376 /// let file = OpenOptions::new()
377 /// .write(true)
378 /// .custom_flags(libc::O_NONBLOCK)
379 /// .open(FIFO_NAME)?;
380 /// if file.metadata()?.file_type().is_fifo() {
381 /// let tx = pipe::Sender::from_file_unchecked(file)?;
382 /// /* use the Sender */
383 /// }
384 /// # Ok(())
385 /// # }
386 /// ```
387 ///
388 /// # Panics
389 ///
390 /// This function panics if it is not called from within a runtime with
391 /// IO enabled.
392 ///
393 /// The runtime is usually set implicitly when this function is called
394 /// from a future driven by a tokio runtime, otherwise runtime can be set
395 /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
396 pub fn from_file_unchecked(file: File) -> io::Result<Sender> {
397 let raw_fd = file.into_raw_fd();
398 let mio_tx = unsafe { mio_pipe::Sender::from_raw_fd(raw_fd) };
399 Sender::from_mio(mio_tx)
400 }
401
402 /// Waits for any of the requested ready states.
403 ///
404 /// This function can be used instead of [`writable()`] to check the returned
405 /// ready set for [`Ready::WRITABLE`] and [`Ready::WRITE_CLOSED`] events.
406 ///
407 /// The function may complete without the pipe being ready. This is a
408 /// false-positive and attempting an operation will return with
409 /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
410 /// [`Ready`] set, so you should always check the returned value and possibly
411 /// wait again if the requested states are not set.
412 ///
413 /// [`writable()`]: Self::writable
414 ///
415 /// # Cancel safety
416 ///
417 /// This method is cancel safe. Once a readiness event occurs, the method
418 /// will continue to return immediately until the readiness event is
419 /// consumed by an attempt to write that fails with `WouldBlock` or
420 /// `Poll::Pending`.
421 pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
422 let event = self.io.registration().readiness(interest).await?;
423 Ok(event.ready)
424 }
425
426 /// Waits for the pipe to become writable.
427 ///
428 /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
429 /// paired with [`try_write()`].
430 ///
431 /// [`try_write()`]: Self::try_write
432 ///
433 /// # Examples
434 ///
435 /// ```no_run
436 /// use tokio::net::unix::pipe;
437 /// use std::io;
438 ///
439 /// #[tokio::main]
440 /// async fn main() -> io::Result<()> {
441 /// // Open a writing end of a fifo
442 /// let tx = pipe::OpenOptions::new().open_sender("path/to/a/fifo")?;
443 ///
444 /// loop {
445 /// // Wait for the pipe to be writable
446 /// tx.writable().await?;
447 ///
448 /// // Try to write data, this may still fail with `WouldBlock`
449 /// // if the readiness event is a false positive.
450 /// match tx.try_write(b"hello world") {
451 /// Ok(n) => {
452 /// break;
453 /// }
454 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
455 /// continue;
456 /// }
457 /// Err(e) => {
458 /// return Err(e.into());
459 /// }
460 /// }
461 /// }
462 ///
463 /// Ok(())
464 /// }
465 /// ```
466 pub async fn writable(&self) -> io::Result<()> {
467 self.ready(Interest::WRITABLE).await?;
468 Ok(())
469 }
470
471 /// Polls for write readiness.
472 ///
473 /// If the pipe is not currently ready for writing, this method will
474 /// store a clone of the `Waker` from the provided `Context`. When the pipe
475 /// becomes ready for writing, `Waker::wake` will be called on the waker.
476 ///
477 /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only
478 /// the `Waker` from the `Context` passed to the most recent call is
479 /// scheduled to receive a wakeup.
480 ///
481 /// This function is intended for cases where creating and pinning a future
482 /// via [`writable`] is not feasible. Where possible, using [`writable`] is
483 /// preferred, as this supports polling from multiple tasks at once.
484 ///
485 /// [`writable`]: Self::writable
486 ///
487 /// # Return value
488 ///
489 /// The function returns:
490 ///
491 /// * `Poll::Pending` if the pipe is not ready for writing.
492 /// * `Poll::Ready(Ok(()))` if the pipe is ready for writing.
493 /// * `Poll::Ready(Err(e))` if an error is encountered.
494 ///
495 /// # Errors
496 ///
497 /// This function may encounter any standard I/O error except `WouldBlock`.
498 pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
499 self.io.registration().poll_write_ready(cx).map_ok(|_| ())
500 }
501
502 /// Tries to write a buffer to the pipe, returning how many bytes were
503 /// written.
504 ///
505 /// The function will attempt to write the entire contents of `buf`, but
506 /// only part of the buffer may be written. If the length of `buf` is not
507 /// greater than `PIPE_BUF` (an OS constant, 4096 under Linux), then the
508 /// write is guaranteed to be atomic, i.e. either the entire content of
509 /// `buf` will be written or this method will fail with `WouldBlock`. There
510 /// is no such guarantee if `buf` is larger than `PIPE_BUF`.
511 ///
512 /// This function is usually paired with [`writable`].
513 ///
514 /// [`writable`]: Self::writable
515 ///
516 /// # Return
517 ///
518 /// If data is successfully written, `Ok(n)` is returned, where `n` is the
519 /// number of bytes written. If the pipe is not ready to write data,
520 /// `Err(io::ErrorKind::WouldBlock)` is returned.
521 ///
522 /// # Examples
523 ///
524 /// ```no_run
525 /// use tokio::net::unix::pipe;
526 /// use std::io;
527 ///
528 /// #[tokio::main]
529 /// async fn main() -> io::Result<()> {
530 /// // Open a writing end of a fifo
531 /// let tx = pipe::OpenOptions::new().open_sender("path/to/a/fifo")?;
532 ///
533 /// loop {
534 /// // Wait for the pipe to be writable
535 /// tx.writable().await?;
536 ///
537 /// // Try to write data, this may still fail with `WouldBlock`
538 /// // if the readiness event is a false positive.
539 /// match tx.try_write(b"hello world") {
540 /// Ok(n) => {
541 /// break;
542 /// }
543 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
544 /// continue;
545 /// }
546 /// Err(e) => {
547 /// return Err(e.into());
548 /// }
549 /// }
550 /// }
551 ///
552 /// Ok(())
553 /// }
554 /// ```
555 pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
556 self.io
557 .registration()
558 .try_io(Interest::WRITABLE, || (&*self.io).write(buf))
559 }
560
561 /// Tries to write several buffers to the pipe, returning how many bytes
562 /// were written.
563 ///
564 /// Data is written from each buffer in order, with the final buffer read
565 /// from possible being only partially consumed. This method behaves
566 /// equivalently to a single call to [`try_write()`] with concatenated
567 /// buffers.
568 ///
569 /// If the total length of buffers is not greater than `PIPE_BUF` (an OS
570 /// constant, 4096 under Linux), then the write is guaranteed to be atomic,
571 /// i.e. either the entire contents of buffers will be written or this
572 /// method will fail with `WouldBlock`. There is no such guarantee if the
573 /// total length of buffers is greater than `PIPE_BUF`.
574 ///
575 /// This function is usually paired with [`writable`].
576 ///
577 /// [`try_write()`]: Self::try_write()
578 /// [`writable`]: Self::writable
579 ///
580 /// # Return
581 ///
582 /// If data is successfully written, `Ok(n)` is returned, where `n` is the
583 /// number of bytes written. If the pipe is not ready to write data,
584 /// `Err(io::ErrorKind::WouldBlock)` is returned.
585 ///
586 /// # Examples
587 ///
588 /// ```no_run
589 /// use tokio::net::unix::pipe;
590 /// use std::io;
591 ///
592 /// #[tokio::main]
593 /// async fn main() -> io::Result<()> {
594 /// // Open a writing end of a fifo
595 /// let tx = pipe::OpenOptions::new().open_sender("path/to/a/fifo")?;
596 ///
597 /// let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")];
598 ///
599 /// loop {
600 /// // Wait for the pipe to be writable
601 /// tx.writable().await?;
602 ///
603 /// // Try to write data, this may still fail with `WouldBlock`
604 /// // if the readiness event is a false positive.
605 /// match tx.try_write_vectored(&bufs) {
606 /// Ok(n) => {
607 /// break;
608 /// }
609 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
610 /// continue;
611 /// }
612 /// Err(e) => {
613 /// return Err(e.into());
614 /// }
615 /// }
616 /// }
617 ///
618 /// Ok(())
619 /// }
620 /// ```
621 pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> {
622 self.io
623 .registration()
624 .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf))
625 }
626}
627
628impl AsyncWrite for Sender {
629 fn poll_write(
630 self: Pin<&mut Self>,
631 cx: &mut Context<'_>,
632 buf: &[u8],
633 ) -> Poll<io::Result<usize>> {
634 self.io.poll_write(cx, buf)
635 }
636
637 fn poll_write_vectored(
638 self: Pin<&mut Self>,
639 cx: &mut Context<'_>,
640 bufs: &[io::IoSlice<'_>],
641 ) -> Poll<io::Result<usize>> {
642 self.io.poll_write_vectored(cx, bufs)
643 }
644
645 fn is_write_vectored(&self) -> bool {
646 true
647 }
648
649 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
650 Poll::Ready(Ok(()))
651 }
652
653 fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
654 Poll::Ready(Ok(()))
655 }
656}
657
658impl AsRawFd for Sender {
659 fn as_raw_fd(&self) -> RawFd {
660 self.io.as_raw_fd()
661 }
662}
663
664impl AsFd for Sender {
665 fn as_fd(&self) -> BorrowedFd<'_> {
666 unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
667 }
668}
669
670/// Reading end of a Unix pipe.
671///
672/// It can be constructed from a FIFO file with [`OpenOptions::open_receiver`].
673///
674/// # Examples
675///
676/// Receiving messages from a named pipe in a loop:
677///
678/// ```no_run
679/// use tokio::net::unix::pipe;
680/// use tokio::io::{self, AsyncReadExt};
681///
682/// const FIFO_NAME: &str = "path/to/a/fifo";
683///
684/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
685/// let mut rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME)?;
686/// loop {
687/// let mut msg = vec![0; 256];
688/// match rx.read_exact(&mut msg).await {
689/// Ok(_) => {
690/// /* handle the message */
691/// }
692/// Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => {
693/// // Writing end has been closed, we should reopen the pipe.
694/// rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME)?;
695/// }
696/// Err(e) => return Err(e.into()),
697/// }
698/// }
699/// # }
700/// ```
701///
702/// On Linux, you can use a `Receiver` in read-write access mode to implement
703/// resilient reading from a named pipe. Unlike `Receiver` opened in read-only
704/// mode, read from a pipe in read-write mode will not fail with `UnexpectedEof`
705/// when the writing end is closed. This way, a `Receiver` can asynchronously
706/// wait for the next writer to open the pipe.
707///
708/// You should not use functions waiting for EOF such as [`read_to_end`] with
709/// a `Receiver` in read-write access mode, since it **may wait forever**.
710/// `Receiver` in this mode also holds an open writing end, which prevents
711/// receiving EOF.
712///
713/// To set the read-write access mode you can use `OpenOptions::read_write`.
714/// Note that using read-write access mode with FIFO files is not defined by
715/// the POSIX standard and it is only guaranteed to work on Linux.
716///
717/// ```ignore
718/// use tokio::net::unix::pipe;
719/// use tokio::io::AsyncReadExt;
720/// # use std::error::Error;
721///
722/// const FIFO_NAME: &str = "path/to/a/fifo";
723///
724/// # async fn dox() -> Result<(), Box<dyn Error>> {
725/// let mut rx = pipe::OpenOptions::new()
726/// .read_write(true)
727/// .open_receiver(FIFO_NAME)?;
728/// loop {
729/// let mut msg = vec![0; 256];
730/// rx.read_exact(&mut msg).await?;
731/// /* handle the message */
732/// }
733/// # }
734/// ```
735///
736/// [`read_to_end`]: crate::io::AsyncReadExt::read_to_end
737#[derive(Debug)]
738pub struct Receiver {
739 io: PollEvented<mio_pipe::Receiver>,
740}
741
742impl Receiver {
743 fn from_mio(mio_rx: mio_pipe::Receiver) -> io::Result<Receiver> {
744 let io = PollEvented::new_with_interest(mio_rx, Interest::READABLE)?;
745 Ok(Receiver { io })
746 }
747
748 /// Creates a new `Receiver` from a [`File`].
749 ///
750 /// This function is intended to construct a pipe from a [`File`] representing
751 /// a special FIFO file. It will check if the file is a pipe and has read access,
752 /// set it in non-blocking mode and perform the conversion.
753 ///
754 /// # Errors
755 ///
756 /// Fails with `io::ErrorKind::InvalidInput` if the file is not a pipe or it
757 /// does not have read access. Also fails with any standard OS error if it occurs.
758 ///
759 /// # Panics
760 ///
761 /// This function panics if it is not called from within a runtime with
762 /// IO enabled.
763 ///
764 /// The runtime is usually set implicitly when this function is called
765 /// from a future driven by a tokio runtime, otherwise runtime can be set
766 /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
767 pub fn from_file(mut file: File) -> io::Result<Receiver> {
768 if !is_fifo(&file)? {
769 return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe"));
770 }
771
772 let flags = get_file_flags(&file)?;
773 if has_read_access(flags) {
774 set_nonblocking(&mut file, flags)?;
775 Receiver::from_file_unchecked(file)
776 } else {
777 Err(io::Error::new(
778 io::ErrorKind::InvalidInput,
779 "not in O_RDONLY or O_RDWR access mode",
780 ))
781 }
782 }
783
784 /// Creates a new `Receiver` from a [`File`] without checking pipe properties.
785 ///
786 /// This function is intended to construct a pipe from a File representing
787 /// a special FIFO file. The conversion assumes nothing about the underlying
788 /// file; it is left up to the user to make sure it is opened with read access,
789 /// represents a pipe and is set in non-blocking mode.
790 ///
791 /// # Examples
792 ///
793 /// ```no_run
794 /// use tokio::net::unix::pipe;
795 /// use std::fs::OpenOptions;
796 /// use std::os::unix::fs::{FileTypeExt, OpenOptionsExt};
797 /// # use std::error::Error;
798 ///
799 /// const FIFO_NAME: &str = "path/to/a/fifo";
800 ///
801 /// # async fn dox() -> Result<(), Box<dyn Error>> {
802 /// let file = OpenOptions::new()
803 /// .read(true)
804 /// .custom_flags(libc::O_NONBLOCK)
805 /// .open(FIFO_NAME)?;
806 /// if file.metadata()?.file_type().is_fifo() {
807 /// let rx = pipe::Receiver::from_file_unchecked(file)?;
808 /// /* use the Receiver */
809 /// }
810 /// # Ok(())
811 /// # }
812 /// ```
813 ///
814 /// # Panics
815 ///
816 /// This function panics if it is not called from within a runtime with
817 /// IO enabled.
818 ///
819 /// The runtime is usually set implicitly when this function is called
820 /// from a future driven by a tokio runtime, otherwise runtime can be set
821 /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
822 pub fn from_file_unchecked(file: File) -> io::Result<Receiver> {
823 let raw_fd = file.into_raw_fd();
824 let mio_rx = unsafe { mio_pipe::Receiver::from_raw_fd(raw_fd) };
825 Receiver::from_mio(mio_rx)
826 }
827
828 /// Waits for any of the requested ready states.
829 ///
830 /// This function can be used instead of [`readable()`] to check the returned
831 /// ready set for [`Ready::READABLE`] and [`Ready::READ_CLOSED`] events.
832 ///
833 /// The function may complete without the pipe being ready. This is a
834 /// false-positive and attempting an operation will return with
835 /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
836 /// [`Ready`] set, so you should always check the returned value and possibly
837 /// wait again if the requested states are not set.
838 ///
839 /// [`readable()`]: Self::readable
840 ///
841 /// # Cancel safety
842 ///
843 /// This method is cancel safe. Once a readiness event occurs, the method
844 /// will continue to return immediately until the readiness event is
845 /// consumed by an attempt to read that fails with `WouldBlock` or
846 /// `Poll::Pending`.
847 pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
848 let event = self.io.registration().readiness(interest).await?;
849 Ok(event.ready)
850 }
851
852 /// Waits for the pipe to become readable.
853 ///
854 /// This function is equivalent to `ready(Interest::READABLE)` and is usually
855 /// paired with [`try_read()`].
856 ///
857 /// [`try_read()`]: Self::try_read()
858 ///
859 /// # Examples
860 ///
861 /// ```no_run
862 /// use tokio::net::unix::pipe;
863 /// use std::io;
864 ///
865 /// #[tokio::main]
866 /// async fn main() -> io::Result<()> {
867 /// // Open a reading end of a fifo
868 /// let rx = pipe::OpenOptions::new().open_receiver("path/to/a/fifo")?;
869 ///
870 /// let mut msg = vec![0; 1024];
871 ///
872 /// loop {
873 /// // Wait for the pipe to be readable
874 /// rx.readable().await?;
875 ///
876 /// // Try to read data, this may still fail with `WouldBlock`
877 /// // if the readiness event is a false positive.
878 /// match rx.try_read(&mut msg) {
879 /// Ok(n) => {
880 /// msg.truncate(n);
881 /// break;
882 /// }
883 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
884 /// continue;
885 /// }
886 /// Err(e) => {
887 /// return Err(e.into());
888 /// }
889 /// }
890 /// }
891 ///
892 /// println!("GOT = {:?}", msg);
893 /// Ok(())
894 /// }
895 /// ```
896 pub async fn readable(&self) -> io::Result<()> {
897 self.ready(Interest::READABLE).await?;
898 Ok(())
899 }
900
901 /// Polls for read readiness.
902 ///
903 /// If the pipe is not currently ready for reading, this method will
904 /// store a clone of the `Waker` from the provided `Context`. When the pipe
905 /// becomes ready for reading, `Waker::wake` will be called on the waker.
906 ///
907 /// Note that on multiple calls to `poll_read_ready` or `poll_read`, only
908 /// the `Waker` from the `Context` passed to the most recent call is
909 /// scheduled to receive a wakeup.
910 ///
911 /// This function is intended for cases where creating and pinning a future
912 /// via [`readable`] is not feasible. Where possible, using [`readable`] is
913 /// preferred, as this supports polling from multiple tasks at once.
914 ///
915 /// [`readable`]: Self::readable
916 ///
917 /// # Return value
918 ///
919 /// The function returns:
920 ///
921 /// * `Poll::Pending` if the pipe is not ready for reading.
922 /// * `Poll::Ready(Ok(()))` if the pipe is ready for reading.
923 /// * `Poll::Ready(Err(e))` if an error is encountered.
924 ///
925 /// # Errors
926 ///
927 /// This function may encounter any standard I/O error except `WouldBlock`.
928 pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
929 self.io.registration().poll_read_ready(cx).map_ok(|_| ())
930 }
931
932 /// Tries to read data from the pipe into the provided buffer, returning how
933 /// many bytes were read.
934 ///
935 /// Reads any pending data from the pipe but does not wait for new data
936 /// to arrive. On success, returns the number of bytes read. Because
937 /// `try_read()` is non-blocking, the buffer does not have to be stored by
938 /// the async task and can exist entirely on the stack.
939 ///
940 /// Usually [`readable()`] is used with this function.
941 ///
942 /// [`readable()`]: Self::readable()
943 ///
944 /// # Return
945 ///
946 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
947 /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios:
948 ///
949 /// 1. The pipe's writing end is closed and will no longer write data.
950 /// 2. The specified buffer was 0 bytes in length.
951 ///
952 /// If the pipe is not ready to read data,
953 /// `Err(io::ErrorKind::WouldBlock)` is returned.
954 ///
955 /// # Examples
956 ///
957 /// ```no_run
958 /// use tokio::net::unix::pipe;
959 /// use std::io;
960 ///
961 /// #[tokio::main]
962 /// async fn main() -> io::Result<()> {
963 /// // Open a reading end of a fifo
964 /// let rx = pipe::OpenOptions::new().open_receiver("path/to/a/fifo")?;
965 ///
966 /// let mut msg = vec![0; 1024];
967 ///
968 /// loop {
969 /// // Wait for the pipe to be readable
970 /// rx.readable().await?;
971 ///
972 /// // Try to read data, this may still fail with `WouldBlock`
973 /// // if the readiness event is a false positive.
974 /// match rx.try_read(&mut msg) {
975 /// Ok(n) => {
976 /// msg.truncate(n);
977 /// break;
978 /// }
979 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
980 /// continue;
981 /// }
982 /// Err(e) => {
983 /// return Err(e.into());
984 /// }
985 /// }
986 /// }
987 ///
988 /// println!("GOT = {:?}", msg);
989 /// Ok(())
990 /// }
991 /// ```
992 pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
993 self.io
994 .registration()
995 .try_io(Interest::READABLE, || (&*self.io).read(buf))
996 }
997
998 /// Tries to read data from the pipe into the provided buffers, returning
999 /// how many bytes were read.
1000 ///
1001 /// Data is copied to fill each buffer in order, with the final buffer
1002 /// written to possibly being only partially filled. This method behaves
1003 /// equivalently to a single call to [`try_read()`] with concatenated
1004 /// buffers.
1005 ///
1006 /// Reads any pending data from the pipe but does not wait for new data
1007 /// to arrive. On success, returns the number of bytes read. Because
1008 /// `try_read_vectored()` is non-blocking, the buffer does not have to be
1009 /// stored by the async task and can exist entirely on the stack.
1010 ///
1011 /// Usually, [`readable()`] is used with this function.
1012 ///
1013 /// [`try_read()`]: Self::try_read()
1014 /// [`readable()`]: Self::readable()
1015 ///
1016 /// # Return
1017 ///
1018 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
1019 /// number of bytes read. `Ok(0)` indicates the pipe's writing end is
1020 /// closed and will no longer write data. If the pipe is not ready to read
1021 /// data `Err(io::ErrorKind::WouldBlock)` is returned.
1022 ///
1023 /// # Examples
1024 ///
1025 /// ```no_run
1026 /// use tokio::net::unix::pipe;
1027 /// use std::io;
1028 ///
1029 /// #[tokio::main]
1030 /// async fn main() -> io::Result<()> {
1031 /// // Open a reading end of a fifo
1032 /// let rx = pipe::OpenOptions::new().open_receiver("path/to/a/fifo")?;
1033 ///
1034 /// loop {
1035 /// // Wait for the pipe to be readable
1036 /// rx.readable().await?;
1037 ///
1038 /// // Creating the buffer **after** the `await` prevents it from
1039 /// // being stored in the async task.
1040 /// let mut buf_a = [0; 512];
1041 /// let mut buf_b = [0; 1024];
1042 /// let mut bufs = [
1043 /// io::IoSliceMut::new(&mut buf_a),
1044 /// io::IoSliceMut::new(&mut buf_b),
1045 /// ];
1046 ///
1047 /// // Try to read data, this may still fail with `WouldBlock`
1048 /// // if the readiness event is a false positive.
1049 /// match rx.try_read_vectored(&mut bufs) {
1050 /// Ok(0) => break,
1051 /// Ok(n) => {
1052 /// println!("read {} bytes", n);
1053 /// }
1054 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1055 /// continue;
1056 /// }
1057 /// Err(e) => {
1058 /// return Err(e.into());
1059 /// }
1060 /// }
1061 /// }
1062 ///
1063 /// Ok(())
1064 /// }
1065 /// ```
1066 pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
1067 self.io
1068 .registration()
1069 .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
1070 }
1071
1072 cfg_io_util! {
1073 /// Tries to read data from the pipe into the provided buffer, advancing the
1074 /// buffer's internal cursor, returning how many bytes were read.
1075 ///
1076 /// Reads any pending data from the pipe but does not wait for new data
1077 /// to arrive. On success, returns the number of bytes read. Because
1078 /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
1079 /// the async task and can exist entirely on the stack.
1080 ///
1081 /// Usually, [`readable()`] or [`ready()`] is used with this function.
1082 ///
1083 /// [`readable()`]: Self::readable
1084 /// [`ready()`]: Self::ready
1085 ///
1086 /// # Return
1087 ///
1088 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
1089 /// number of bytes read. `Ok(0)` indicates the pipe's writing end is
1090 /// closed and will no longer write data. If the pipe is not ready to read
1091 /// data `Err(io::ErrorKind::WouldBlock)` is returned.
1092 ///
1093 /// # Examples
1094 ///
1095 /// ```no_run
1096 /// use tokio::net::unix::pipe;
1097 /// use std::io;
1098 ///
1099 /// #[tokio::main]
1100 /// async fn main() -> io::Result<()> {
1101 /// // Open a reading end of a fifo
1102 /// let rx = pipe::OpenOptions::new().open_receiver("path/to/a/fifo")?;
1103 ///
1104 /// loop {
1105 /// // Wait for the pipe to be readable
1106 /// rx.readable().await?;
1107 ///
1108 /// let mut buf = Vec::with_capacity(4096);
1109 ///
1110 /// // Try to read data, this may still fail with `WouldBlock`
1111 /// // if the readiness event is a false positive.
1112 /// match rx.try_read_buf(&mut buf) {
1113 /// Ok(0) => break,
1114 /// Ok(n) => {
1115 /// println!("read {} bytes", n);
1116 /// }
1117 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
1118 /// continue;
1119 /// }
1120 /// Err(e) => {
1121 /// return Err(e.into());
1122 /// }
1123 /// }
1124 /// }
1125 ///
1126 /// Ok(())
1127 /// }
1128 /// ```
1129 pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
1130 self.io.registration().try_io(Interest::READABLE, || {
1131 use std::io::Read;
1132
1133 let dst = buf.chunk_mut();
1134 let dst =
1135 unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
1136
1137 // Safety: `mio_pipe::Receiver` uses a `std::fs::File` underneath,
1138 // which correctly handles reads into uninitialized memory.
1139 let n = (&*self.io).read(dst)?;
1140
1141 unsafe {
1142 buf.advance_mut(n);
1143 }
1144
1145 Ok(n)
1146 })
1147 }
1148 }
1149}
1150
1151impl AsyncRead for Receiver {
1152 fn poll_read(
1153 self: Pin<&mut Self>,
1154 cx: &mut Context<'_>,
1155 buf: &mut ReadBuf<'_>,
1156 ) -> Poll<io::Result<()>> {
1157 // Safety: `mio_pipe::Receiver` uses a `std::fs::File` underneath,
1158 // which correctly handles reads into uninitialized memory.
1159 unsafe { self.io.poll_read(cx, buf) }
1160 }
1161}
1162
1163impl AsRawFd for Receiver {
1164 fn as_raw_fd(&self) -> RawFd {
1165 self.io.as_raw_fd()
1166 }
1167}
1168
1169impl AsFd for Receiver {
1170 fn as_fd(&self) -> BorrowedFd<'_> {
1171 unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
1172 }
1173}
1174
1175/// Checks if file is a FIFO
1176fn is_fifo(file: &File) -> io::Result<bool> {
1177 Ok(file.metadata()?.file_type().is_fifo())
1178}
1179
1180/// Gets file descriptor's flags by fcntl.
1181fn get_file_flags(file: &File) -> io::Result<libc::c_int> {
1182 let fd = file.as_raw_fd();
1183 let flags = unsafe { libc::fcntl(fd, libc::F_GETFL) };
1184 if flags < 0 {
1185 Err(io::Error::last_os_error())
1186 } else {
1187 Ok(flags)
1188 }
1189}
1190
1191/// Checks for `O_RDONLY` or `O_RDWR` access mode.
1192fn has_read_access(flags: libc::c_int) -> bool {
1193 let mode = flags & libc::O_ACCMODE;
1194 mode == libc::O_RDONLY || mode == libc::O_RDWR
1195}
1196
1197/// Checks for `O_WRONLY` or `O_RDWR` access mode.
1198fn has_write_access(flags: libc::c_int) -> bool {
1199 let mode = flags & libc::O_ACCMODE;
1200 mode == libc::O_WRONLY || mode == libc::O_RDWR
1201}
1202
1203/// Sets file's flags with `O_NONBLOCK` by fcntl.
1204fn set_nonblocking(file: &mut File, current_flags: libc::c_int) -> io::Result<()> {
1205 let fd = file.as_raw_fd();
1206
1207 let flags = current_flags | libc::O_NONBLOCK;
1208
1209 if flags != current_flags {
1210 let ret = unsafe { libc::fcntl(fd, libc::F_SETFL, flags) };
1211 if ret < 0 {
1212 return Err(io::Error::last_os_error());
1213 }
1214 }
1215
1216 Ok(())
1217}
1218