1//! Unix pipe.
2//!
3//! See the [`new`] function for documentation.
4
5use std::io;
6use std::os::unix::io::RawFd;
7
8pub(crate) fn new_raw() -> io::Result<[RawFd; 2]> {
9 let mut fds: [RawFd; 2] = [-1, -1];
10
11 #[cfg(any(
12 target_os = "android",
13 target_os = "dragonfly",
14 target_os = "freebsd",
15 target_os = "linux",
16 target_os = "netbsd",
17 target_os = "openbsd",
18 target_os = "illumos",
19 target_os = "redox",
20 target_os = "solaris",
21 target_os = "vita",
22 ))]
23 unsafe {
24 if libc::pipe2(fds.as_mut_ptr(), libc::O_CLOEXEC | libc::O_NONBLOCK) != 0 {
25 return Err(io::Error::last_os_error());
26 }
27 }
28
29 #[cfg(any(
30 target_os = "aix",
31 target_os = "ios",
32 target_os = "macos",
33 target_os = "tvos",
34 target_os = "watchos",
35 target_os = "espidf",
36 ))]
37 unsafe {
38 // For platforms that don't have `pipe2(2)` we need to manually set the
39 // correct flags on the file descriptor.
40 if libc::pipe(fds.as_mut_ptr()) != 0 {
41 return Err(io::Error::last_os_error());
42 }
43
44 for fd in &fds {
45 if libc::fcntl(*fd, libc::F_SETFL, libc::O_NONBLOCK) != 0
46 || libc::fcntl(*fd, libc::F_SETFD, libc::FD_CLOEXEC) != 0
47 {
48 let err = io::Error::last_os_error();
49 // Don't leak file descriptors. Can't handle closing error though.
50 let _ = libc::close(fds[0]);
51 let _ = libc::close(fds[1]);
52 return Err(err);
53 }
54 }
55 }
56
57 #[cfg(not(any(
58 target_os = "aix",
59 target_os = "android",
60 target_os = "dragonfly",
61 target_os = "freebsd",
62 target_os = "illumos",
63 target_os = "ios",
64 target_os = "linux",
65 target_os = "macos",
66 target_os = "netbsd",
67 target_os = "openbsd",
68 target_os = "redox",
69 target_os = "tvos",
70 target_os = "watchos",
71 target_os = "espidf",
72 target_os = "solaris",
73 target_os = "vita",
74 )))]
75 compile_error!("unsupported target for `mio::unix::pipe`");
76
77 Ok(fds)
78}
79
80cfg_os_ext! {
81use std::fs::File;
82use std::io::{IoSlice, IoSliceMut, Read, Write};
83use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd};
84use std::process::{ChildStderr, ChildStdin, ChildStdout};
85
86use crate::io_source::IoSource;
87use crate::{event, Interest, Registry, Token};
88
89/// Create a new non-blocking Unix pipe.
90///
/// This is a wrapper around Unix's [`pipe(2)`] system call and can be used as
/// an inter-process or inter-thread communication channel.
///
/// This channel may be created before forking the process, with one end then
/// used in each process, e.g. the parent process keeps the sending end to send
/// commands to the child process.
97///
98/// [`pipe(2)`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/pipe.html
99///
100/// # Events
101///
/// The [`Sender`] can be registered with [`WRITABLE`] interest to receive
/// [writable events], the [`Receiver`] with [`READABLE`] interest. Once data is
/// written to the `Sender` the `Receiver` will receive a [readable event].
105///
106/// In addition to those events, events will also be generated if the other side
107/// is dropped. To check if the `Sender` is dropped you'll need to check
108/// [`is_read_closed`] on events for the `Receiver`, if it returns true the
109/// `Sender` is dropped. On the `Sender` end check [`is_write_closed`], if it
110/// returns true the `Receiver` was dropped. Also see the second example below.
111///
112/// [`WRITABLE`]: Interest::WRITABLE
113/// [writable events]: event::Event::is_writable
114/// [`READABLE`]: Interest::READABLE
115/// [readable event]: event::Event::is_readable
116/// [`is_read_closed`]: event::Event::is_read_closed
117/// [`is_write_closed`]: event::Event::is_write_closed
118///
119/// # Deregistering
120///
121/// Both `Sender` and `Receiver` will deregister themselves when dropped,
122/// **iff** the file descriptors are not duplicated (via [`dup(2)`]).
123///
124/// [`dup(2)`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/dup.html
125///
126/// # Examples
127///
/// Simple example that writes data into the sending end and reads it from the
/// receiving end.
130///
131/// ```
132/// use std::io::{self, Read, Write};
133///
134/// use mio::{Poll, Events, Interest, Token};
135/// use mio::unix::pipe;
136///
137/// // Unique tokens for the two ends of the channel.
138/// const PIPE_RECV: Token = Token(0);
139/// const PIPE_SEND: Token = Token(1);
140///
141/// # fn main() -> io::Result<()> {
142/// // Create our `Poll` instance and the `Events` container.
143/// let mut poll = Poll::new()?;
144/// let mut events = Events::with_capacity(8);
145///
146/// // Create a new pipe.
147/// let (mut sender, mut receiver) = pipe::new()?;
148///
149/// // Register both ends of the channel.
150/// poll.registry().register(&mut receiver, PIPE_RECV, Interest::READABLE)?;
151/// poll.registry().register(&mut sender, PIPE_SEND, Interest::WRITABLE)?;
152///
153/// const MSG: &[u8; 11] = b"Hello world";
154///
155/// loop {
156/// poll.poll(&mut events, None)?;
157///
158/// for event in events.iter() {
159/// match event.token() {
160/// PIPE_SEND => sender.write(MSG)
161/// .and_then(|n| if n != MSG.len() {
162/// // We'll consider a short write an error in this
163/// // example. NOTE: we can't use `write_all` with
164/// // non-blocking I/O.
165/// Err(io::ErrorKind::WriteZero.into())
166/// } else {
167/// Ok(())
168/// })?,
169/// PIPE_RECV => {
170/// let mut buf = [0; 11];
171/// let n = receiver.read(&mut buf)?;
172/// println!("received: {:?}", &buf[0..n]);
173/// assert_eq!(n, MSG.len());
174/// assert_eq!(&buf, &*MSG);
175/// return Ok(());
176/// },
177/// _ => unreachable!(),
178/// }
179/// }
180/// }
181/// # }
182/// ```
183///
184/// Example that receives an event once the `Sender` is dropped.
185///
186/// ```
187/// # use std::io;
188/// #
189/// # use mio::{Poll, Events, Interest, Token};
190/// # use mio::unix::pipe;
191/// #
192/// # const PIPE_RECV: Token = Token(0);
193/// # const PIPE_SEND: Token = Token(1);
194/// #
195/// # fn main() -> io::Result<()> {
196/// // Same setup as in the example above.
197/// let mut poll = Poll::new()?;
198/// let mut events = Events::with_capacity(8);
199///
200/// let (mut sender, mut receiver) = pipe::new()?;
201///
202/// poll.registry().register(&mut receiver, PIPE_RECV, Interest::READABLE)?;
203/// poll.registry().register(&mut sender, PIPE_SEND, Interest::WRITABLE)?;
204///
205/// // Drop the sender.
206/// drop(sender);
207///
208/// poll.poll(&mut events, None)?;
209///
210/// for event in events.iter() {
211/// match event.token() {
212/// PIPE_RECV if event.is_read_closed() => {
213/// // Detected that the sender was dropped.
214/// println!("Sender dropped!");
215/// return Ok(());
216/// },
217/// _ => unreachable!(),
218/// }
219/// }
220/// # unreachable!();
221/// # }
222/// ```
223pub fn new() -> io::Result<(Sender, Receiver)> {
224 let fds = new_raw()?;
225 // SAFETY: `new_raw` initialised the `fds` above.
226 let r = unsafe { Receiver::from_raw_fd(fds[0]) };
227 let w = unsafe { Sender::from_raw_fd(fds[1]) };
228 Ok((w, r))
229}
230
/// Sending end of a Unix pipe.
///
/// See [`new`] for documentation, including examples.
#[derive(Debug)]
pub struct Sender {
    // The underlying file descriptor, owned as a `File` and wrapped in an
    // `IoSource`; all I/O and registration calls are delegated to it.
    inner: IoSource<File>,
}
238
239impl Sender {
240 /// Set the `Sender` into or out of non-blocking mode.
241 pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
242 set_nonblocking(self.inner.as_raw_fd(), nonblocking)
243 }
244
245 /// Execute an I/O operation ensuring that the socket receives more events
246 /// if it hits a [`WouldBlock`] error.
247 ///
248 /// # Notes
249 ///
250 /// This method is required to be called for **all** I/O operations to
251 /// ensure the user will receive events once the socket is ready again after
252 /// returning a [`WouldBlock`] error.
253 ///
254 /// [`WouldBlock`]: io::ErrorKind::WouldBlock
255 ///
256 /// # Examples
257 ///
258 /// ```
259 /// # use std::error::Error;
260 /// #
261 /// # fn main() -> Result<(), Box<dyn Error>> {
262 /// use std::io;
263 /// use std::os::unix::io::AsRawFd;
264 /// use mio::unix::pipe;
265 ///
266 /// let (sender, receiver) = pipe::new()?;
267 ///
268 /// // Wait until the sender is writable...
269 ///
270 /// // Write to the sender using a direct libc call, of course the
271 /// // `io::Write` implementation would be easier to use.
272 /// let buf = b"hello";
273 /// let n = sender.try_io(|| {
274 /// let buf_ptr = &buf as *const _ as *const _;
275 /// let res = unsafe { libc::write(sender.as_raw_fd(), buf_ptr, buf.len()) };
276 /// if res != -1 {
277 /// Ok(res as usize)
278 /// } else {
279 /// // If EAGAIN or EWOULDBLOCK is set by libc::write, the closure
280 /// // should return `WouldBlock` error.
281 /// Err(io::Error::last_os_error())
282 /// }
283 /// })?;
284 /// eprintln!("write {} bytes", n);
285 ///
286 /// // Wait until the receiver is readable...
287 ///
288 /// // Read from the receiver using a direct libc call, of course the
289 /// // `io::Read` implementation would be easier to use.
290 /// let mut buf = [0; 512];
291 /// let n = receiver.try_io(|| {
292 /// let buf_ptr = &mut buf as *mut _ as *mut _;
293 /// let res = unsafe { libc::read(receiver.as_raw_fd(), buf_ptr, buf.len()) };
294 /// if res != -1 {
295 /// Ok(res as usize)
296 /// } else {
297 /// // If EAGAIN or EWOULDBLOCK is set by libc::read, the closure
298 /// // should return `WouldBlock` error.
299 /// Err(io::Error::last_os_error())
300 /// }
301 /// })?;
302 /// eprintln!("read {} bytes", n);
303 /// # Ok(())
304 /// # }
305 /// ```
306 pub fn try_io<F, T>(&self, f: F) -> io::Result<T>
307 where
308 F: FnOnce() -> io::Result<T>,
309 {
310 self.inner.do_io(|_| f())
311 }
312}
313
314impl event::Source for Sender {
315 fn register(
316 &mut self,
317 registry: &Registry,
318 token: Token,
319 interests: Interest,
320 ) -> io::Result<()> {
321 self.inner.register(registry, token, interests)
322 }
323
324 fn reregister(
325 &mut self,
326 registry: &Registry,
327 token: Token,
328 interests: Interest,
329 ) -> io::Result<()> {
330 self.inner.reregister(registry, token, interests)
331 }
332
333 fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
334 self.inner.deregister(registry)
335 }
336}
337
338impl Write for Sender {
339 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
340 self.inner.do_io(|mut sender| sender.write(buf))
341 }
342
343 fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
344 self.inner.do_io(|mut sender| sender.write_vectored(bufs))
345 }
346
347 fn flush(&mut self) -> io::Result<()> {
348 self.inner.do_io(|mut sender| sender.flush())
349 }
350}
351
352impl Write for &Sender {
353 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
354 self.inner.do_io(|mut sender| sender.write(buf))
355 }
356
357 fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
358 self.inner.do_io(|mut sender| sender.write_vectored(bufs))
359 }
360
361 fn flush(&mut self) -> io::Result<()> {
362 self.inner.do_io(|mut sender| sender.flush())
363 }
364}
365
366/// # Notes
367///
368/// The underlying pipe is **not** set to non-blocking.
369impl From<ChildStdin> for Sender {
370 fn from(stdin: ChildStdin) -> Sender {
371 // Safety: `ChildStdin` is guaranteed to be a valid file descriptor.
372 unsafe { Sender::from_raw_fd(stdin.into_raw_fd()) }
373 }
374}
375
376impl FromRawFd for Sender {
377 unsafe fn from_raw_fd(fd: RawFd) -> Sender {
378 Sender {
379 inner: IoSource::new(File::from_raw_fd(fd)),
380 }
381 }
382}
383
384impl AsRawFd for Sender {
385 fn as_raw_fd(&self) -> RawFd {
386 self.inner.as_raw_fd()
387 }
388}
389
390impl IntoRawFd for Sender {
391 fn into_raw_fd(self) -> RawFd {
392 self.inner.into_inner().into_raw_fd()
393 }
394}
395
/// Receiving end of a Unix pipe.
///
/// See [`new`] for documentation, including examples.
#[derive(Debug)]
pub struct Receiver {
    // The underlying file descriptor, owned as a `File` and wrapped in an
    // `IoSource`; all I/O and registration calls are delegated to it.
    inner: IoSource<File>,
}
403
404impl Receiver {
405 /// Set the `Receiver` into or out of non-blocking mode.
406 pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
407 set_nonblocking(self.inner.as_raw_fd(), nonblocking)
408 }
409
410 /// Execute an I/O operation ensuring that the socket receives more events
411 /// if it hits a [`WouldBlock`] error.
412 ///
413 /// # Notes
414 ///
415 /// This method is required to be called for **all** I/O operations to
416 /// ensure the user will receive events once the socket is ready again after
417 /// returning a [`WouldBlock`] error.
418 ///
419 /// [`WouldBlock`]: io::ErrorKind::WouldBlock
420 ///
421 /// # Examples
422 ///
423 /// ```
424 /// # use std::error::Error;
425 /// #
426 /// # fn main() -> Result<(), Box<dyn Error>> {
427 /// use std::io;
428 /// use std::os::unix::io::AsRawFd;
429 /// use mio::unix::pipe;
430 ///
431 /// let (sender, receiver) = pipe::new()?;
432 ///
433 /// // Wait until the sender is writable...
434 ///
435 /// // Write to the sender using a direct libc call, of course the
436 /// // `io::Write` implementation would be easier to use.
437 /// let buf = b"hello";
438 /// let n = sender.try_io(|| {
439 /// let buf_ptr = &buf as *const _ as *const _;
440 /// let res = unsafe { libc::write(sender.as_raw_fd(), buf_ptr, buf.len()) };
441 /// if res != -1 {
442 /// Ok(res as usize)
443 /// } else {
444 /// // If EAGAIN or EWOULDBLOCK is set by libc::write, the closure
445 /// // should return `WouldBlock` error.
446 /// Err(io::Error::last_os_error())
447 /// }
448 /// })?;
449 /// eprintln!("write {} bytes", n);
450 ///
451 /// // Wait until the receiver is readable...
452 ///
453 /// // Read from the receiver using a direct libc call, of course the
454 /// // `io::Read` implementation would be easier to use.
455 /// let mut buf = [0; 512];
456 /// let n = receiver.try_io(|| {
457 /// let buf_ptr = &mut buf as *mut _ as *mut _;
458 /// let res = unsafe { libc::read(receiver.as_raw_fd(), buf_ptr, buf.len()) };
459 /// if res != -1 {
460 /// Ok(res as usize)
461 /// } else {
462 /// // If EAGAIN or EWOULDBLOCK is set by libc::read, the closure
463 /// // should return `WouldBlock` error.
464 /// Err(io::Error::last_os_error())
465 /// }
466 /// })?;
467 /// eprintln!("read {} bytes", n);
468 /// # Ok(())
469 /// # }
470 /// ```
471 pub fn try_io<F, T>(&self, f: F) -> io::Result<T>
472 where
473 F: FnOnce() -> io::Result<T>,
474 {
475 self.inner.do_io(|_| f())
476 }
477}
478
479impl event::Source for Receiver {
480 fn register(
481 &mut self,
482 registry: &Registry,
483 token: Token,
484 interests: Interest,
485 ) -> io::Result<()> {
486 self.inner.register(registry, token, interests)
487 }
488
489 fn reregister(
490 &mut self,
491 registry: &Registry,
492 token: Token,
493 interests: Interest,
494 ) -> io::Result<()> {
495 self.inner.reregister(registry, token, interests)
496 }
497
498 fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
499 self.inner.deregister(registry)
500 }
501}
502
503impl Read for Receiver {
504 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
505 self.inner.do_io(|mut sender| sender.read(buf))
506 }
507
508 fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
509 self.inner.do_io(|mut sender| sender.read_vectored(bufs))
510 }
511}
512
513impl Read for &Receiver {
514 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
515 self.inner.do_io(|mut sender| sender.read(buf))
516 }
517
518 fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
519 self.inner.do_io(|mut sender| sender.read_vectored(bufs))
520 }
521}
522
523/// # Notes
524///
525/// The underlying pipe is **not** set to non-blocking.
526impl From<ChildStdout> for Receiver {
527 fn from(stdout: ChildStdout) -> Receiver {
528 // Safety: `ChildStdout` is guaranteed to be a valid file descriptor.
529 unsafe { Receiver::from_raw_fd(stdout.into_raw_fd()) }
530 }
531}
532
533/// # Notes
534///
535/// The underlying pipe is **not** set to non-blocking.
536impl From<ChildStderr> for Receiver {
537 fn from(stderr: ChildStderr) -> Receiver {
538 // Safety: `ChildStderr` is guaranteed to be a valid file descriptor.
539 unsafe { Receiver::from_raw_fd(stderr.into_raw_fd()) }
540 }
541}
542
543impl FromRawFd for Receiver {
544 unsafe fn from_raw_fd(fd: RawFd) -> Receiver {
545 Receiver {
546 inner: IoSource::new(File::from_raw_fd(fd)),
547 }
548 }
549}
550
551impl AsRawFd for Receiver {
552 fn as_raw_fd(&self) -> RawFd {
553 self.inner.as_raw_fd()
554 }
555}
556
557impl IntoRawFd for Receiver {
558 fn into_raw_fd(self) -> RawFd {
559 self.inner.into_inner().into_raw_fd()
560 }
561}
562
563#[cfg(not(any(target_os = "illumos", target_os = "solaris", target_os = "vita")))]
564fn set_nonblocking(fd: RawFd, nonblocking: bool) -> io::Result<()> {
565 let value = nonblocking as libc::c_int;
566 if unsafe { libc::ioctl(fd, libc::FIONBIO, &value) } == -1 {
567 Err(io::Error::last_os_error())
568 } else {
569 Ok(())
570 }
571}
572
/// Switches `fd` into (`true`) or out of (`false`) non-blocking mode by
/// toggling `O_NONBLOCK` in its status flags with `fcntl`.
///
/// The flags are only written back when they actually change.
///
/// # Errors
///
/// Returns the last OS error if reading or writing the flags fails.
#[cfg(any(target_os = "illumos", target_os = "solaris", target_os = "vita"))]
fn set_nonblocking(fd: RawFd, nonblocking: bool) -> io::Result<()> {
    let current = unsafe { libc::fcntl(fd, libc::F_GETFL) };
    if current < 0 {
        return Err(io::Error::last_os_error());
    }

    let wanted = if nonblocking {
        current | libc::O_NONBLOCK
    } else {
        current & !libc::O_NONBLOCK
    };

    // Skip the second `fcntl` call when the flag already has the wanted value.
    if wanted != current && unsafe { libc::fcntl(fd, libc::F_SETFL, wanted) } < 0 {
        return Err(io::Error::last_os_error());
    }

    Ok(())
}
594} // `cfg_os_ext!`.
595