1//! Unix pipe.
2//!
3//! See the [`new`] function for documentation.
4
5use std::fs::File;
6use std::io::{self, IoSlice, IoSliceMut, Read, Write};
7use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
8use std::process::{ChildStderr, ChildStdin, ChildStdout};
9
10use crate::io_source::IoSource;
11use crate::{event, Interest, Registry, Token};
12
13/// Create a new non-blocking Unix pipe.
14///
15/// This is a wrapper around Unix's [`pipe(2)`] system call and can be used as
16/// inter-process or thread communication channel.
17///
18/// This channel may be created before forking the process and then one end used
19/// in each process, e.g. the parent process has the sending end to send command
20/// to the child process.
21///
22/// [`pipe(2)`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/pipe.html
23///
24/// # Events
25///
26/// The [`Sender`] can be registered with [`WRITABLE`] interest to receive
27/// [writable events], the [`Receiver`] with [`READABLE`] interest. Once data is
28/// written to the `Sender` the `Receiver` will receive an [readable event].
29///
30/// In addition to those events, events will also be generated if the other side
31/// is dropped. To check if the `Sender` is dropped you'll need to check
32/// [`is_read_closed`] on events for the `Receiver`, if it returns true the
33/// `Sender` is dropped. On the `Sender` end check [`is_write_closed`], if it
34/// returns true the `Receiver` was dropped. Also see the second example below.
35///
36/// [`WRITABLE`]: Interest::WRITABLE
37/// [writable events]: event::Event::is_writable
38/// [`READABLE`]: Interest::READABLE
39/// [readable event]: event::Event::is_readable
40/// [`is_read_closed`]: event::Event::is_read_closed
41/// [`is_write_closed`]: event::Event::is_write_closed
42///
43/// # Deregistering
44///
45/// Both `Sender` and `Receiver` will deregister themselves when dropped,
46/// **iff** the file descriptors are not duplicated (via [`dup(2)`]).
47///
48/// [`dup(2)`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/dup.html
49///
50/// # Examples
51///
52/// Simple example that writes data into the sending end and read it from the
53/// receiving end.
54///
55/// ```
56/// use std::io::{self, Read, Write};
57///
58/// use mio::{Poll, Events, Interest, Token};
59/// use mio::unix::pipe;
60///
61/// // Unique tokens for the two ends of the channel.
62/// const PIPE_RECV: Token = Token(0);
63/// const PIPE_SEND: Token = Token(1);
64///
65/// # fn main() -> io::Result<()> {
66/// // Create our `Poll` instance and the `Events` container.
67/// let mut poll = Poll::new()?;
68/// let mut events = Events::with_capacity(8);
69///
70/// // Create a new pipe.
71/// let (mut sender, mut receiver) = pipe::new()?;
72///
73/// // Register both ends of the channel.
74/// poll.registry().register(&mut receiver, PIPE_RECV, Interest::READABLE)?;
75/// poll.registry().register(&mut sender, PIPE_SEND, Interest::WRITABLE)?;
76///
77/// const MSG: &[u8; 11] = b"Hello world";
78///
79/// loop {
80/// poll.poll(&mut events, None)?;
81///
82/// for event in events.iter() {
83/// match event.token() {
84/// PIPE_SEND => sender.write(MSG)
85/// .and_then(|n| if n != MSG.len() {
86/// // We'll consider a short write an error in this
87/// // example. NOTE: we can't use `write_all` with
88/// // non-blocking I/O.
89/// Err(io::ErrorKind::WriteZero.into())
90/// } else {
91/// Ok(())
92/// })?,
93/// PIPE_RECV => {
94/// let mut buf = [0; 11];
95/// let n = receiver.read(&mut buf)?;
96/// println!("received: {:?}", &buf[0..n]);
97/// assert_eq!(n, MSG.len());
98/// assert_eq!(&buf, &*MSG);
99/// return Ok(());
100/// },
101/// _ => unreachable!(),
102/// }
103/// }
104/// }
105/// # }
106/// ```
107///
108/// Example that receives an event once the `Sender` is dropped.
109///
110/// ```
111/// # use std::io;
112/// #
113/// # use mio::{Poll, Events, Interest, Token};
114/// # use mio::unix::pipe;
115/// #
116/// # const PIPE_RECV: Token = Token(0);
117/// # const PIPE_SEND: Token = Token(1);
118/// #
119/// # fn main() -> io::Result<()> {
120/// // Same setup as in the example above.
121/// let mut poll = Poll::new()?;
122/// let mut events = Events::with_capacity(8);
123///
124/// let (mut sender, mut receiver) = pipe::new()?;
125///
126/// poll.registry().register(&mut receiver, PIPE_RECV, Interest::READABLE)?;
127/// poll.registry().register(&mut sender, PIPE_SEND, Interest::WRITABLE)?;
128///
129/// // Drop the sender.
130/// drop(sender);
131///
132/// poll.poll(&mut events, None)?;
133///
134/// for event in events.iter() {
135/// match event.token() {
136/// PIPE_RECV if event.is_read_closed() => {
137/// // Detected that the sender was dropped.
138/// println!("Sender dropped!");
139/// return Ok(());
140/// },
141/// _ => unreachable!(),
142/// }
143/// }
144/// # unreachable!();
145/// # }
146/// ```
147pub fn new() -> io::Result<(Sender, Receiver)> {
148 let mut fds: [RawFd; 2] = [-1, -1];
149
150 #[cfg(any(
151 target_os = "android",
152 target_os = "dragonfly",
153 target_os = "freebsd",
154 target_os = "linux",
155 target_os = "netbsd",
156 target_os = "openbsd",
157 target_os = "illumos",
158 target_os = "redox",
159 ))]
160 unsafe {
161 if libc::pipe2(fds.as_mut_ptr(), libc::O_CLOEXEC | libc::O_NONBLOCK) != 0 {
162 return Err(io::Error::last_os_error());
163 }
164 }
165
166 #[cfg(any(
167 target_os = "ios",
168 target_os = "macos",
169 target_os = "tvos",
170 target_os = "watchos",
171 ))]
172 unsafe {
173 // For platforms that don't have `pipe2(2)` we need to manually set the
174 // correct flags on the file descriptor.
175 if libc::pipe(fds.as_mut_ptr()) != 0 {
176 return Err(io::Error::last_os_error());
177 }
178
179 for fd in &fds {
180 if libc::fcntl(*fd, libc::F_SETFL, libc::O_NONBLOCK) != 0
181 || libc::fcntl(*fd, libc::F_SETFD, libc::FD_CLOEXEC) != 0
182 {
183 let err = io::Error::last_os_error();
184 // Don't leak file descriptors. Can't handle closing error though.
185 let _ = libc::close(fds[0]);
186 let _ = libc::close(fds[1]);
187 return Err(err);
188 }
189 }
190 }
191
192 #[cfg(not(any(
193 target_os = "android",
194 target_os = "dragonfly",
195 target_os = "freebsd",
196 target_os = "illumos",
197 target_os = "ios",
198 target_os = "linux",
199 target_os = "macos",
200 target_os = "netbsd",
201 target_os = "openbsd",
202 target_os = "redox",
203 target_os = "tvos",
204 target_os = "watchos",
205 )))]
206 compile_error!("unsupported target for `mio::unix::pipe`");
207
208 // SAFETY: we just initialised the `fds` above.
209 let r = unsafe { Receiver::from_raw_fd(fds[0]) };
210 let w = unsafe { Sender::from_raw_fd(fds[1]) };
211
212 Ok((w, r))
213}
214
215/// Sending end of an Unix pipe.
216///
217/// See [`new`] for documentation, including examples.
218#[derive(Debug)]
219pub struct Sender {
220 inner: IoSource<File>,
221}
222
223impl Sender {
224 /// Set the `Sender` into or out of non-blocking mode.
225 pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
226 set_nonblocking(self.inner.as_raw_fd(), nonblocking)
227 }
228
229 /// Execute an I/O operation ensuring that the socket receives more events
230 /// if it hits a [`WouldBlock`] error.
231 ///
232 /// # Notes
233 ///
234 /// This method is required to be called for **all** I/O operations to
235 /// ensure the user will receive events once the socket is ready again after
236 /// returning a [`WouldBlock`] error.
237 ///
238 /// [`WouldBlock`]: io::ErrorKind::WouldBlock
239 ///
240 /// # Examples
241 ///
242 /// ```
243 /// # use std::error::Error;
244 /// #
245 /// # fn main() -> Result<(), Box<dyn Error>> {
246 /// use std::io;
247 /// use std::os::unix::io::AsRawFd;
248 /// use mio::unix::pipe;
249 ///
250 /// let (sender, receiver) = pipe::new()?;
251 ///
252 /// // Wait until the sender is writable...
253 ///
254 /// // Write to the sender using a direct libc call, of course the
255 /// // `io::Write` implementation would be easier to use.
256 /// let buf = b"hello";
257 /// let n = sender.try_io(|| {
258 /// let buf_ptr = &buf as *const _ as *const _;
259 /// let res = unsafe { libc::write(sender.as_raw_fd(), buf_ptr, buf.len()) };
260 /// if res != -1 {
261 /// Ok(res as usize)
262 /// } else {
263 /// // If EAGAIN or EWOULDBLOCK is set by libc::write, the closure
264 /// // should return `WouldBlock` error.
265 /// Err(io::Error::last_os_error())
266 /// }
267 /// })?;
268 /// eprintln!("write {} bytes", n);
269 ///
270 /// // Wait until the receiver is readable...
271 ///
272 /// // Read from the receiver using a direct libc call, of course the
273 /// // `io::Read` implementation would be easier to use.
274 /// let mut buf = [0; 512];
275 /// let n = receiver.try_io(|| {
276 /// let buf_ptr = &mut buf as *mut _ as *mut _;
277 /// let res = unsafe { libc::read(receiver.as_raw_fd(), buf_ptr, buf.len()) };
278 /// if res != -1 {
279 /// Ok(res as usize)
280 /// } else {
281 /// // If EAGAIN or EWOULDBLOCK is set by libc::read, the closure
282 /// // should return `WouldBlock` error.
283 /// Err(io::Error::last_os_error())
284 /// }
285 /// })?;
286 /// eprintln!("read {} bytes", n);
287 /// # Ok(())
288 /// # }
289 /// ```
290 pub fn try_io<F, T>(&self, f: F) -> io::Result<T>
291 where
292 F: FnOnce() -> io::Result<T>,
293 {
294 self.inner.do_io(|_| f())
295 }
296}
297
298impl event::Source for Sender {
299 fn register(
300 &mut self,
301 registry: &Registry,
302 token: Token,
303 interests: Interest,
304 ) -> io::Result<()> {
305 self.inner.register(registry, token, interests)
306 }
307
308 fn reregister(
309 &mut self,
310 registry: &Registry,
311 token: Token,
312 interests: Interest,
313 ) -> io::Result<()> {
314 self.inner.reregister(registry, token, interests)
315 }
316
317 fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
318 self.inner.deregister(registry)
319 }
320}
321
322impl Write for Sender {
323 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
324 self.inner.do_io(|mut sender: &File| sender.write(buf))
325 }
326
327 fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
328 self.inner.do_io(|mut sender: &File| sender.write_vectored(bufs))
329 }
330
331 fn flush(&mut self) -> io::Result<()> {
332 self.inner.do_io(|mut sender: &File| sender.flush())
333 }
334}
335
336impl Write for &Sender {
337 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
338 self.inner.do_io(|mut sender: &File| sender.write(buf))
339 }
340
341 fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
342 self.inner.do_io(|mut sender: &File| sender.write_vectored(bufs))
343 }
344
345 fn flush(&mut self) -> io::Result<()> {
346 self.inner.do_io(|mut sender: &File| sender.flush())
347 }
348}
349
350/// # Notes
351///
352/// The underlying pipe is **not** set to non-blocking.
353impl From<ChildStdin> for Sender {
354 fn from(stdin: ChildStdin) -> Sender {
355 // Safety: `ChildStdin` is guaranteed to be a valid file descriptor.
356 unsafe { Sender::from_raw_fd(stdin.into_raw_fd()) }
357 }
358}
359
360impl FromRawFd for Sender {
361 unsafe fn from_raw_fd(fd: RawFd) -> Sender {
362 Sender {
363 inner: IoSource::new(io:File::from_raw_fd(fd)),
364 }
365 }
366}
367
368impl AsRawFd for Sender {
369 fn as_raw_fd(&self) -> RawFd {
370 self.inner.as_raw_fd()
371 }
372}
373
374impl IntoRawFd for Sender {
375 fn into_raw_fd(self) -> RawFd {
376 self.inner.into_inner().into_raw_fd()
377 }
378}
379
380/// Receiving end of an Unix pipe.
381///
382/// See [`new`] for documentation, including examples.
383#[derive(Debug)]
384pub struct Receiver {
385 inner: IoSource<File>,
386}
387
388impl Receiver {
389 /// Set the `Receiver` into or out of non-blocking mode.
390 pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
391 set_nonblocking(self.inner.as_raw_fd(), nonblocking)
392 }
393
394 /// Execute an I/O operation ensuring that the socket receives more events
395 /// if it hits a [`WouldBlock`] error.
396 ///
397 /// # Notes
398 ///
399 /// This method is required to be called for **all** I/O operations to
400 /// ensure the user will receive events once the socket is ready again after
401 /// returning a [`WouldBlock`] error.
402 ///
403 /// [`WouldBlock`]: io::ErrorKind::WouldBlock
404 ///
405 /// # Examples
406 ///
407 /// ```
408 /// # use std::error::Error;
409 /// #
410 /// # fn main() -> Result<(), Box<dyn Error>> {
411 /// use std::io;
412 /// use std::os::unix::io::AsRawFd;
413 /// use mio::unix::pipe;
414 ///
415 /// let (sender, receiver) = pipe::new()?;
416 ///
417 /// // Wait until the sender is writable...
418 ///
419 /// // Write to the sender using a direct libc call, of course the
420 /// // `io::Write` implementation would be easier to use.
421 /// let buf = b"hello";
422 /// let n = sender.try_io(|| {
423 /// let buf_ptr = &buf as *const _ as *const _;
424 /// let res = unsafe { libc::write(sender.as_raw_fd(), buf_ptr, buf.len()) };
425 /// if res != -1 {
426 /// Ok(res as usize)
427 /// } else {
428 /// // If EAGAIN or EWOULDBLOCK is set by libc::write, the closure
429 /// // should return `WouldBlock` error.
430 /// Err(io::Error::last_os_error())
431 /// }
432 /// })?;
433 /// eprintln!("write {} bytes", n);
434 ///
435 /// // Wait until the receiver is readable...
436 ///
437 /// // Read from the receiver using a direct libc call, of course the
438 /// // `io::Read` implementation would be easier to use.
439 /// let mut buf = [0; 512];
440 /// let n = receiver.try_io(|| {
441 /// let buf_ptr = &mut buf as *mut _ as *mut _;
442 /// let res = unsafe { libc::read(receiver.as_raw_fd(), buf_ptr, buf.len()) };
443 /// if res != -1 {
444 /// Ok(res as usize)
445 /// } else {
446 /// // If EAGAIN or EWOULDBLOCK is set by libc::read, the closure
447 /// // should return `WouldBlock` error.
448 /// Err(io::Error::last_os_error())
449 /// }
450 /// })?;
451 /// eprintln!("read {} bytes", n);
452 /// # Ok(())
453 /// # }
454 /// ```
455 pub fn try_io<F, T>(&self, f: F) -> io::Result<T>
456 where
457 F: FnOnce() -> io::Result<T>,
458 {
459 self.inner.do_io(|_| f())
460 }
461}
462
463impl event::Source for Receiver {
464 fn register(
465 &mut self,
466 registry: &Registry,
467 token: Token,
468 interests: Interest,
469 ) -> io::Result<()> {
470 self.inner.register(registry, token, interests)
471 }
472
473 fn reregister(
474 &mut self,
475 registry: &Registry,
476 token: Token,
477 interests: Interest,
478 ) -> io::Result<()> {
479 self.inner.reregister(registry, token, interests)
480 }
481
482 fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
483 self.inner.deregister(registry)
484 }
485}
486
487impl Read for Receiver {
488 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
489 self.inner.do_io(|mut sender: &File| sender.read(buf))
490 }
491
492 fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
493 self.inner.do_io(|mut sender: &File| sender.read_vectored(bufs))
494 }
495}
496
497impl Read for &Receiver {
498 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
499 self.inner.do_io(|mut sender: &File| sender.read(buf))
500 }
501
502 fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
503 self.inner.do_io(|mut sender: &File| sender.read_vectored(bufs))
504 }
505}
506
507/// # Notes
508///
509/// The underlying pipe is **not** set to non-blocking.
510impl From<ChildStdout> for Receiver {
511 fn from(stdout: ChildStdout) -> Receiver {
512 // Safety: `ChildStdout` is guaranteed to be a valid file descriptor.
513 unsafe { Receiver::from_raw_fd(stdout.into_raw_fd()) }
514 }
515}
516
517/// # Notes
518///
519/// The underlying pipe is **not** set to non-blocking.
520impl From<ChildStderr> for Receiver {
521 fn from(stderr: ChildStderr) -> Receiver {
522 // Safety: `ChildStderr` is guaranteed to be a valid file descriptor.
523 unsafe { Receiver::from_raw_fd(stderr.into_raw_fd()) }
524 }
525}
526
527impl FromRawFd for Receiver {
528 unsafe fn from_raw_fd(fd: RawFd) -> Receiver {
529 Receiver {
530 inner: IoSource::new(io:File::from_raw_fd(fd)),
531 }
532 }
533}
534
535impl AsRawFd for Receiver {
536 fn as_raw_fd(&self) -> RawFd {
537 self.inner.as_raw_fd()
538 }
539}
540
541impl IntoRawFd for Receiver {
542 fn into_raw_fd(self) -> RawFd {
543 self.inner.into_inner().into_raw_fd()
544 }
545}
546
547#[cfg(not(target_os = "illumos"))]
548fn set_nonblocking(fd: RawFd, nonblocking: bool) -> io::Result<()> {
549 let value: i32 = nonblocking as libc::c_int;
550 if unsafe { libc::ioctl(fd, request:libc::FIONBIO, &value) } == -1 {
551 Err(io::Error::last_os_error())
552 } else {
553 Ok(())
554 }
555}
556
557#[cfg(target_os = "illumos")]
558fn set_nonblocking(fd: RawFd, nonblocking: bool) -> io::Result<()> {
559 let flags = unsafe { libc::fcntl(fd, libc::F_GETFL) };
560 if flags < 0 {
561 return Err(io::Error::last_os_error());
562 }
563
564 let nflags = if nonblocking {
565 flags | libc::O_NONBLOCK
566 } else {
567 flags & !libc::O_NONBLOCK
568 };
569
570 if flags != nflags {
571 if unsafe { libc::fcntl(fd, libc::F_SETFL, nflags) } < 0 {
572 return Err(io::Error::last_os_error());
573 }
574 }
575
576 Ok(())
577}
578