1 | //! Unix pipe. |
2 | //! |
3 | //! See the [`new`] function for documentation. |
4 | |
5 | use std::fs::File; |
6 | use std::io::{self, IoSlice, IoSliceMut, Read, Write}; |
7 | use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; |
8 | use std::process::{ChildStderr, ChildStdin, ChildStdout}; |
9 | |
10 | use crate::io_source::IoSource; |
11 | use crate::{event, Interest, Registry, Token}; |
12 | |
13 | /// Create a new non-blocking Unix pipe. |
14 | /// |
15 | /// This is a wrapper around Unix's [`pipe(2)`] system call and can be used as |
16 | /// inter-process or thread communication channel. |
17 | /// |
18 | /// This channel may be created before forking the process and then one end used |
19 | /// in each process, e.g. the parent process has the sending end to send command |
20 | /// to the child process. |
21 | /// |
22 | /// [`pipe(2)`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/pipe.html |
23 | /// |
24 | /// # Events |
25 | /// |
26 | /// The [`Sender`] can be registered with [`WRITABLE`] interest to receive |
27 | /// [writable events], the [`Receiver`] with [`READABLE`] interest. Once data is |
28 | /// written to the `Sender` the `Receiver` will receive an [readable event]. |
29 | /// |
30 | /// In addition to those events, events will also be generated if the other side |
31 | /// is dropped. To check if the `Sender` is dropped you'll need to check |
32 | /// [`is_read_closed`] on events for the `Receiver`, if it returns true the |
33 | /// `Sender` is dropped. On the `Sender` end check [`is_write_closed`], if it |
34 | /// returns true the `Receiver` was dropped. Also see the second example below. |
35 | /// |
36 | /// [`WRITABLE`]: Interest::WRITABLE |
37 | /// [writable events]: event::Event::is_writable |
38 | /// [`READABLE`]: Interest::READABLE |
39 | /// [readable event]: event::Event::is_readable |
40 | /// [`is_read_closed`]: event::Event::is_read_closed |
41 | /// [`is_write_closed`]: event::Event::is_write_closed |
42 | /// |
43 | /// # Deregistering |
44 | /// |
45 | /// Both `Sender` and `Receiver` will deregister themselves when dropped, |
46 | /// **iff** the file descriptors are not duplicated (via [`dup(2)`]). |
47 | /// |
48 | /// [`dup(2)`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/dup.html |
49 | /// |
50 | /// # Examples |
51 | /// |
52 | /// Simple example that writes data into the sending end and read it from the |
53 | /// receiving end. |
54 | /// |
55 | /// ``` |
56 | /// use std::io::{self, Read, Write}; |
57 | /// |
58 | /// use mio::{Poll, Events, Interest, Token}; |
59 | /// use mio::unix::pipe; |
60 | /// |
61 | /// // Unique tokens for the two ends of the channel. |
62 | /// const PIPE_RECV: Token = Token(0); |
63 | /// const PIPE_SEND: Token = Token(1); |
64 | /// |
65 | /// # fn main() -> io::Result<()> { |
66 | /// // Create our `Poll` instance and the `Events` container. |
67 | /// let mut poll = Poll::new()?; |
68 | /// let mut events = Events::with_capacity(8); |
69 | /// |
70 | /// // Create a new pipe. |
71 | /// let (mut sender, mut receiver) = pipe::new()?; |
72 | /// |
73 | /// // Register both ends of the channel. |
74 | /// poll.registry().register(&mut receiver, PIPE_RECV, Interest::READABLE)?; |
75 | /// poll.registry().register(&mut sender, PIPE_SEND, Interest::WRITABLE)?; |
76 | /// |
77 | /// const MSG: &[u8; 11] = b"Hello world" ; |
78 | /// |
79 | /// loop { |
80 | /// poll.poll(&mut events, None)?; |
81 | /// |
82 | /// for event in events.iter() { |
83 | /// match event.token() { |
84 | /// PIPE_SEND => sender.write(MSG) |
85 | /// .and_then(|n| if n != MSG.len() { |
86 | /// // We'll consider a short write an error in this |
87 | /// // example. NOTE: we can't use `write_all` with |
88 | /// // non-blocking I/O. |
89 | /// Err(io::ErrorKind::WriteZero.into()) |
90 | /// } else { |
91 | /// Ok(()) |
92 | /// })?, |
93 | /// PIPE_RECV => { |
94 | /// let mut buf = [0; 11]; |
95 | /// let n = receiver.read(&mut buf)?; |
96 | /// println!("received: {:?}" , &buf[0..n]); |
97 | /// assert_eq!(n, MSG.len()); |
98 | /// assert_eq!(&buf, &*MSG); |
99 | /// return Ok(()); |
100 | /// }, |
101 | /// _ => unreachable!(), |
102 | /// } |
103 | /// } |
104 | /// } |
105 | /// # } |
106 | /// ``` |
107 | /// |
108 | /// Example that receives an event once the `Sender` is dropped. |
109 | /// |
110 | /// ``` |
111 | /// # use std::io; |
112 | /// # |
113 | /// # use mio::{Poll, Events, Interest, Token}; |
114 | /// # use mio::unix::pipe; |
115 | /// # |
116 | /// # const PIPE_RECV: Token = Token(0); |
117 | /// # const PIPE_SEND: Token = Token(1); |
118 | /// # |
119 | /// # fn main() -> io::Result<()> { |
120 | /// // Same setup as in the example above. |
121 | /// let mut poll = Poll::new()?; |
122 | /// let mut events = Events::with_capacity(8); |
123 | /// |
124 | /// let (mut sender, mut receiver) = pipe::new()?; |
125 | /// |
126 | /// poll.registry().register(&mut receiver, PIPE_RECV, Interest::READABLE)?; |
127 | /// poll.registry().register(&mut sender, PIPE_SEND, Interest::WRITABLE)?; |
128 | /// |
129 | /// // Drop the sender. |
130 | /// drop(sender); |
131 | /// |
132 | /// poll.poll(&mut events, None)?; |
133 | /// |
134 | /// for event in events.iter() { |
135 | /// match event.token() { |
136 | /// PIPE_RECV if event.is_read_closed() => { |
137 | /// // Detected that the sender was dropped. |
138 | /// println!("Sender dropped!" ); |
139 | /// return Ok(()); |
140 | /// }, |
141 | /// _ => unreachable!(), |
142 | /// } |
143 | /// } |
144 | /// # unreachable!(); |
145 | /// # } |
146 | /// ``` |
147 | pub fn new() -> io::Result<(Sender, Receiver)> { |
148 | let mut fds: [RawFd; 2] = [-1, -1]; |
149 | |
150 | #[cfg (any( |
151 | target_os = "android" , |
152 | target_os = "dragonfly" , |
153 | target_os = "freebsd" , |
154 | target_os = "linux" , |
155 | target_os = "netbsd" , |
156 | target_os = "openbsd" , |
157 | target_os = "illumos" , |
158 | target_os = "redox" , |
159 | ))] |
160 | unsafe { |
161 | if libc::pipe2(fds.as_mut_ptr(), libc::O_CLOEXEC | libc::O_NONBLOCK) != 0 { |
162 | return Err(io::Error::last_os_error()); |
163 | } |
164 | } |
165 | |
166 | #[cfg (any( |
167 | target_os = "ios" , |
168 | target_os = "macos" , |
169 | target_os = "tvos" , |
170 | target_os = "watchos" , |
171 | ))] |
172 | unsafe { |
173 | // For platforms that don't have `pipe2(2)` we need to manually set the |
174 | // correct flags on the file descriptor. |
175 | if libc::pipe(fds.as_mut_ptr()) != 0 { |
176 | return Err(io::Error::last_os_error()); |
177 | } |
178 | |
179 | for fd in &fds { |
180 | if libc::fcntl(*fd, libc::F_SETFL, libc::O_NONBLOCK) != 0 |
181 | || libc::fcntl(*fd, libc::F_SETFD, libc::FD_CLOEXEC) != 0 |
182 | { |
183 | let err = io::Error::last_os_error(); |
184 | // Don't leak file descriptors. Can't handle closing error though. |
185 | let _ = libc::close(fds[0]); |
186 | let _ = libc::close(fds[1]); |
187 | return Err(err); |
188 | } |
189 | } |
190 | } |
191 | |
192 | #[cfg (not(any( |
193 | target_os = "android" , |
194 | target_os = "dragonfly" , |
195 | target_os = "freebsd" , |
196 | target_os = "illumos" , |
197 | target_os = "ios" , |
198 | target_os = "linux" , |
199 | target_os = "macos" , |
200 | target_os = "netbsd" , |
201 | target_os = "openbsd" , |
202 | target_os = "redox" , |
203 | target_os = "tvos" , |
204 | target_os = "watchos" , |
205 | )))] |
206 | compile_error!("unsupported target for `mio::unix::pipe`" ); |
207 | |
208 | // SAFETY: we just initialised the `fds` above. |
209 | let r = unsafe { Receiver::from_raw_fd(fds[0]) }; |
210 | let w = unsafe { Sender::from_raw_fd(fds[1]) }; |
211 | |
212 | Ok((w, r)) |
213 | } |
214 | |
215 | /// Sending end of an Unix pipe. |
216 | /// |
217 | /// See [`new`] for documentation, including examples. |
218 | #[derive (Debug)] |
219 | pub struct Sender { |
220 | inner: IoSource<File>, |
221 | } |
222 | |
223 | impl Sender { |
224 | /// Set the `Sender` into or out of non-blocking mode. |
225 | pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> { |
226 | set_nonblocking(self.inner.as_raw_fd(), nonblocking) |
227 | } |
228 | |
229 | /// Execute an I/O operation ensuring that the socket receives more events |
230 | /// if it hits a [`WouldBlock`] error. |
231 | /// |
232 | /// # Notes |
233 | /// |
234 | /// This method is required to be called for **all** I/O operations to |
235 | /// ensure the user will receive events once the socket is ready again after |
236 | /// returning a [`WouldBlock`] error. |
237 | /// |
238 | /// [`WouldBlock`]: io::ErrorKind::WouldBlock |
239 | /// |
240 | /// # Examples |
241 | /// |
242 | /// ``` |
243 | /// # use std::error::Error; |
244 | /// # |
245 | /// # fn main() -> Result<(), Box<dyn Error>> { |
246 | /// use std::io; |
247 | /// use std::os::unix::io::AsRawFd; |
248 | /// use mio::unix::pipe; |
249 | /// |
250 | /// let (sender, receiver) = pipe::new()?; |
251 | /// |
252 | /// // Wait until the sender is writable... |
253 | /// |
254 | /// // Write to the sender using a direct libc call, of course the |
255 | /// // `io::Write` implementation would be easier to use. |
256 | /// let buf = b"hello" ; |
257 | /// let n = sender.try_io(|| { |
258 | /// let buf_ptr = &buf as *const _ as *const _; |
259 | /// let res = unsafe { libc::write(sender.as_raw_fd(), buf_ptr, buf.len()) }; |
260 | /// if res != -1 { |
261 | /// Ok(res as usize) |
262 | /// } else { |
263 | /// // If EAGAIN or EWOULDBLOCK is set by libc::write, the closure |
264 | /// // should return `WouldBlock` error. |
265 | /// Err(io::Error::last_os_error()) |
266 | /// } |
267 | /// })?; |
268 | /// eprintln!("write {} bytes" , n); |
269 | /// |
270 | /// // Wait until the receiver is readable... |
271 | /// |
272 | /// // Read from the receiver using a direct libc call, of course the |
273 | /// // `io::Read` implementation would be easier to use. |
274 | /// let mut buf = [0; 512]; |
275 | /// let n = receiver.try_io(|| { |
276 | /// let buf_ptr = &mut buf as *mut _ as *mut _; |
277 | /// let res = unsafe { libc::read(receiver.as_raw_fd(), buf_ptr, buf.len()) }; |
278 | /// if res != -1 { |
279 | /// Ok(res as usize) |
280 | /// } else { |
281 | /// // If EAGAIN or EWOULDBLOCK is set by libc::read, the closure |
282 | /// // should return `WouldBlock` error. |
283 | /// Err(io::Error::last_os_error()) |
284 | /// } |
285 | /// })?; |
286 | /// eprintln!("read {} bytes" , n); |
287 | /// # Ok(()) |
288 | /// # } |
289 | /// ``` |
290 | pub fn try_io<F, T>(&self, f: F) -> io::Result<T> |
291 | where |
292 | F: FnOnce() -> io::Result<T>, |
293 | { |
294 | self.inner.do_io(|_| f()) |
295 | } |
296 | } |
297 | |
298 | impl event::Source for Sender { |
299 | fn register( |
300 | &mut self, |
301 | registry: &Registry, |
302 | token: Token, |
303 | interests: Interest, |
304 | ) -> io::Result<()> { |
305 | self.inner.register(registry, token, interests) |
306 | } |
307 | |
308 | fn reregister( |
309 | &mut self, |
310 | registry: &Registry, |
311 | token: Token, |
312 | interests: Interest, |
313 | ) -> io::Result<()> { |
314 | self.inner.reregister(registry, token, interests) |
315 | } |
316 | |
317 | fn deregister(&mut self, registry: &Registry) -> io::Result<()> { |
318 | self.inner.deregister(registry) |
319 | } |
320 | } |
321 | |
322 | impl Write for Sender { |
323 | fn write(&mut self, buf: &[u8]) -> io::Result<usize> { |
324 | self.inner.do_io(|mut sender: &File| sender.write(buf)) |
325 | } |
326 | |
327 | fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> { |
328 | self.inner.do_io(|mut sender: &File| sender.write_vectored(bufs)) |
329 | } |
330 | |
331 | fn flush(&mut self) -> io::Result<()> { |
332 | self.inner.do_io(|mut sender: &File| sender.flush()) |
333 | } |
334 | } |
335 | |
336 | impl Write for &Sender { |
337 | fn write(&mut self, buf: &[u8]) -> io::Result<usize> { |
338 | self.inner.do_io(|mut sender: &File| sender.write(buf)) |
339 | } |
340 | |
341 | fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> { |
342 | self.inner.do_io(|mut sender: &File| sender.write_vectored(bufs)) |
343 | } |
344 | |
345 | fn flush(&mut self) -> io::Result<()> { |
346 | self.inner.do_io(|mut sender: &File| sender.flush()) |
347 | } |
348 | } |
349 | |
350 | /// # Notes |
351 | /// |
352 | /// The underlying pipe is **not** set to non-blocking. |
353 | impl From<ChildStdin> for Sender { |
354 | fn from(stdin: ChildStdin) -> Sender { |
355 | // Safety: `ChildStdin` is guaranteed to be a valid file descriptor. |
356 | unsafe { Sender::from_raw_fd(stdin.into_raw_fd()) } |
357 | } |
358 | } |
359 | |
360 | impl FromRawFd for Sender { |
361 | unsafe fn from_raw_fd(fd: RawFd) -> Sender { |
362 | Sender { |
363 | inner: IoSource::new(io:File::from_raw_fd(fd)), |
364 | } |
365 | } |
366 | } |
367 | |
368 | impl AsRawFd for Sender { |
369 | fn as_raw_fd(&self) -> RawFd { |
370 | self.inner.as_raw_fd() |
371 | } |
372 | } |
373 | |
374 | impl IntoRawFd for Sender { |
375 | fn into_raw_fd(self) -> RawFd { |
376 | self.inner.into_inner().into_raw_fd() |
377 | } |
378 | } |
379 | |
380 | /// Receiving end of an Unix pipe. |
381 | /// |
382 | /// See [`new`] for documentation, including examples. |
383 | #[derive (Debug)] |
384 | pub struct Receiver { |
385 | inner: IoSource<File>, |
386 | } |
387 | |
388 | impl Receiver { |
389 | /// Set the `Receiver` into or out of non-blocking mode. |
390 | pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> { |
391 | set_nonblocking(self.inner.as_raw_fd(), nonblocking) |
392 | } |
393 | |
394 | /// Execute an I/O operation ensuring that the socket receives more events |
395 | /// if it hits a [`WouldBlock`] error. |
396 | /// |
397 | /// # Notes |
398 | /// |
399 | /// This method is required to be called for **all** I/O operations to |
400 | /// ensure the user will receive events once the socket is ready again after |
401 | /// returning a [`WouldBlock`] error. |
402 | /// |
403 | /// [`WouldBlock`]: io::ErrorKind::WouldBlock |
404 | /// |
405 | /// # Examples |
406 | /// |
407 | /// ``` |
408 | /// # use std::error::Error; |
409 | /// # |
410 | /// # fn main() -> Result<(), Box<dyn Error>> { |
411 | /// use std::io; |
412 | /// use std::os::unix::io::AsRawFd; |
413 | /// use mio::unix::pipe; |
414 | /// |
415 | /// let (sender, receiver) = pipe::new()?; |
416 | /// |
417 | /// // Wait until the sender is writable... |
418 | /// |
419 | /// // Write to the sender using a direct libc call, of course the |
420 | /// // `io::Write` implementation would be easier to use. |
421 | /// let buf = b"hello" ; |
422 | /// let n = sender.try_io(|| { |
423 | /// let buf_ptr = &buf as *const _ as *const _; |
424 | /// let res = unsafe { libc::write(sender.as_raw_fd(), buf_ptr, buf.len()) }; |
425 | /// if res != -1 { |
426 | /// Ok(res as usize) |
427 | /// } else { |
428 | /// // If EAGAIN or EWOULDBLOCK is set by libc::write, the closure |
429 | /// // should return `WouldBlock` error. |
430 | /// Err(io::Error::last_os_error()) |
431 | /// } |
432 | /// })?; |
433 | /// eprintln!("write {} bytes" , n); |
434 | /// |
435 | /// // Wait until the receiver is readable... |
436 | /// |
437 | /// // Read from the receiver using a direct libc call, of course the |
438 | /// // `io::Read` implementation would be easier to use. |
439 | /// let mut buf = [0; 512]; |
440 | /// let n = receiver.try_io(|| { |
441 | /// let buf_ptr = &mut buf as *mut _ as *mut _; |
442 | /// let res = unsafe { libc::read(receiver.as_raw_fd(), buf_ptr, buf.len()) }; |
443 | /// if res != -1 { |
444 | /// Ok(res as usize) |
445 | /// } else { |
446 | /// // If EAGAIN or EWOULDBLOCK is set by libc::read, the closure |
447 | /// // should return `WouldBlock` error. |
448 | /// Err(io::Error::last_os_error()) |
449 | /// } |
450 | /// })?; |
451 | /// eprintln!("read {} bytes" , n); |
452 | /// # Ok(()) |
453 | /// # } |
454 | /// ``` |
455 | pub fn try_io<F, T>(&self, f: F) -> io::Result<T> |
456 | where |
457 | F: FnOnce() -> io::Result<T>, |
458 | { |
459 | self.inner.do_io(|_| f()) |
460 | } |
461 | } |
462 | |
463 | impl event::Source for Receiver { |
464 | fn register( |
465 | &mut self, |
466 | registry: &Registry, |
467 | token: Token, |
468 | interests: Interest, |
469 | ) -> io::Result<()> { |
470 | self.inner.register(registry, token, interests) |
471 | } |
472 | |
473 | fn reregister( |
474 | &mut self, |
475 | registry: &Registry, |
476 | token: Token, |
477 | interests: Interest, |
478 | ) -> io::Result<()> { |
479 | self.inner.reregister(registry, token, interests) |
480 | } |
481 | |
482 | fn deregister(&mut self, registry: &Registry) -> io::Result<()> { |
483 | self.inner.deregister(registry) |
484 | } |
485 | } |
486 | |
487 | impl Read for Receiver { |
488 | fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { |
489 | self.inner.do_io(|mut sender: &File| sender.read(buf)) |
490 | } |
491 | |
492 | fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> { |
493 | self.inner.do_io(|mut sender: &File| sender.read_vectored(bufs)) |
494 | } |
495 | } |
496 | |
497 | impl Read for &Receiver { |
498 | fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { |
499 | self.inner.do_io(|mut sender: &File| sender.read(buf)) |
500 | } |
501 | |
502 | fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> { |
503 | self.inner.do_io(|mut sender: &File| sender.read_vectored(bufs)) |
504 | } |
505 | } |
506 | |
507 | /// # Notes |
508 | /// |
509 | /// The underlying pipe is **not** set to non-blocking. |
510 | impl From<ChildStdout> for Receiver { |
511 | fn from(stdout: ChildStdout) -> Receiver { |
512 | // Safety: `ChildStdout` is guaranteed to be a valid file descriptor. |
513 | unsafe { Receiver::from_raw_fd(stdout.into_raw_fd()) } |
514 | } |
515 | } |
516 | |
517 | /// # Notes |
518 | /// |
519 | /// The underlying pipe is **not** set to non-blocking. |
520 | impl From<ChildStderr> for Receiver { |
521 | fn from(stderr: ChildStderr) -> Receiver { |
522 | // Safety: `ChildStderr` is guaranteed to be a valid file descriptor. |
523 | unsafe { Receiver::from_raw_fd(stderr.into_raw_fd()) } |
524 | } |
525 | } |
526 | |
527 | impl FromRawFd for Receiver { |
528 | unsafe fn from_raw_fd(fd: RawFd) -> Receiver { |
529 | Receiver { |
530 | inner: IoSource::new(io:File::from_raw_fd(fd)), |
531 | } |
532 | } |
533 | } |
534 | |
535 | impl AsRawFd for Receiver { |
536 | fn as_raw_fd(&self) -> RawFd { |
537 | self.inner.as_raw_fd() |
538 | } |
539 | } |
540 | |
541 | impl IntoRawFd for Receiver { |
542 | fn into_raw_fd(self) -> RawFd { |
543 | self.inner.into_inner().into_raw_fd() |
544 | } |
545 | } |
546 | |
547 | #[cfg (not(target_os = "illumos" ))] |
548 | fn set_nonblocking(fd: RawFd, nonblocking: bool) -> io::Result<()> { |
549 | let value: i32 = nonblocking as libc::c_int; |
550 | if unsafe { libc::ioctl(fd, request:libc::FIONBIO, &value) } == -1 { |
551 | Err(io::Error::last_os_error()) |
552 | } else { |
553 | Ok(()) |
554 | } |
555 | } |
556 | |
557 | #[cfg (target_os = "illumos" )] |
558 | fn set_nonblocking(fd: RawFd, nonblocking: bool) -> io::Result<()> { |
559 | let flags = unsafe { libc::fcntl(fd, libc::F_GETFL) }; |
560 | if flags < 0 { |
561 | return Err(io::Error::last_os_error()); |
562 | } |
563 | |
564 | let nflags = if nonblocking { |
565 | flags | libc::O_NONBLOCK |
566 | } else { |
567 | flags & !libc::O_NONBLOCK |
568 | }; |
569 | |
570 | if flags != nflags { |
571 | if unsafe { libc::fcntl(fd, libc::F_SETFL, nflags) } < 0 { |
572 | return Err(io::Error::last_os_error()); |
573 | } |
574 | } |
575 | |
576 | Ok(()) |
577 | } |
578 | |