1 | //! Unix pipe. |
2 | //! |
3 | //! See the [`new`] function for documentation. |
4 | |
5 | use std::io; |
6 | use std::os::unix::io::RawFd; |
7 | |
8 | pub(crate) fn new_raw() -> io::Result<[RawFd; 2]> { |
9 | let mut fds: [RawFd; 2] = [-1, -1]; |
10 | |
11 | #[cfg (any( |
12 | target_os = "android" , |
13 | target_os = "dragonfly" , |
14 | target_os = "freebsd" , |
15 | target_os = "linux" , |
16 | target_os = "netbsd" , |
17 | target_os = "openbsd" , |
18 | target_os = "illumos" , |
19 | target_os = "redox" , |
20 | target_os = "solaris" , |
21 | target_os = "vita" , |
22 | ))] |
23 | unsafe { |
24 | if libc::pipe2(fds.as_mut_ptr(), libc::O_CLOEXEC | libc::O_NONBLOCK) != 0 { |
25 | return Err(io::Error::last_os_error()); |
26 | } |
27 | } |
28 | |
29 | #[cfg (any( |
30 | target_os = "aix" , |
31 | target_os = "ios" , |
32 | target_os = "macos" , |
33 | target_os = "tvos" , |
34 | target_os = "watchos" , |
35 | target_os = "espidf" , |
36 | ))] |
37 | unsafe { |
38 | // For platforms that don't have `pipe2(2)` we need to manually set the |
39 | // correct flags on the file descriptor. |
40 | if libc::pipe(fds.as_mut_ptr()) != 0 { |
41 | return Err(io::Error::last_os_error()); |
42 | } |
43 | |
44 | for fd in &fds { |
45 | if libc::fcntl(*fd, libc::F_SETFL, libc::O_NONBLOCK) != 0 |
46 | || libc::fcntl(*fd, libc::F_SETFD, libc::FD_CLOEXEC) != 0 |
47 | { |
48 | let err = io::Error::last_os_error(); |
49 | // Don't leak file descriptors. Can't handle closing error though. |
50 | let _ = libc::close(fds[0]); |
51 | let _ = libc::close(fds[1]); |
52 | return Err(err); |
53 | } |
54 | } |
55 | } |
56 | |
57 | #[cfg (not(any( |
58 | target_os = "aix" , |
59 | target_os = "android" , |
60 | target_os = "dragonfly" , |
61 | target_os = "freebsd" , |
62 | target_os = "illumos" , |
63 | target_os = "ios" , |
64 | target_os = "linux" , |
65 | target_os = "macos" , |
66 | target_os = "netbsd" , |
67 | target_os = "openbsd" , |
68 | target_os = "redox" , |
69 | target_os = "tvos" , |
70 | target_os = "watchos" , |
71 | target_os = "espidf" , |
72 | target_os = "solaris" , |
73 | target_os = "vita" , |
74 | )))] |
75 | compile_error!("unsupported target for `mio::unix::pipe`" ); |
76 | |
77 | Ok(fds) |
78 | } |
79 | |
80 | cfg_os_ext! { |
81 | use std::fs::File; |
82 | use std::io::{IoSlice, IoSliceMut, Read, Write}; |
83 | use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd}; |
84 | use std::process::{ChildStderr, ChildStdin, ChildStdout}; |
85 | |
86 | use crate::io_source::IoSource; |
87 | use crate::{event, Interest, Registry, Token}; |
88 | |
89 | /// Create a new non-blocking Unix pipe. |
90 | /// |
91 | /// This is a wrapper around Unix's [`pipe(2)`] system call and can be used as |
92 | /// inter-process or thread communication channel. |
93 | /// |
94 | /// This channel may be created before forking the process and then one end used |
95 | /// in each process, e.g. the parent process has the sending end to send command |
96 | /// to the child process. |
97 | /// |
98 | /// [`pipe(2)`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/pipe.html |
99 | /// |
100 | /// # Events |
101 | /// |
102 | /// The [`Sender`] can be registered with [`WRITABLE`] interest to receive |
103 | /// [writable events], the [`Receiver`] with [`READABLE`] interest. Once data is |
104 | /// written to the `Sender` the `Receiver` will receive an [readable event]. |
105 | /// |
106 | /// In addition to those events, events will also be generated if the other side |
107 | /// is dropped. To check if the `Sender` is dropped you'll need to check |
108 | /// [`is_read_closed`] on events for the `Receiver`, if it returns true the |
109 | /// `Sender` is dropped. On the `Sender` end check [`is_write_closed`], if it |
110 | /// returns true the `Receiver` was dropped. Also see the second example below. |
111 | /// |
112 | /// [`WRITABLE`]: Interest::WRITABLE |
113 | /// [writable events]: event::Event::is_writable |
114 | /// [`READABLE`]: Interest::READABLE |
115 | /// [readable event]: event::Event::is_readable |
116 | /// [`is_read_closed`]: event::Event::is_read_closed |
117 | /// [`is_write_closed`]: event::Event::is_write_closed |
118 | /// |
119 | /// # Deregistering |
120 | /// |
121 | /// Both `Sender` and `Receiver` will deregister themselves when dropped, |
122 | /// **iff** the file descriptors are not duplicated (via [`dup(2)`]). |
123 | /// |
124 | /// [`dup(2)`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/dup.html |
125 | /// |
126 | /// # Examples |
127 | /// |
128 | /// Simple example that writes data into the sending end and read it from the |
129 | /// receiving end. |
130 | /// |
131 | /// ``` |
132 | /// use std::io::{self, Read, Write}; |
133 | /// |
134 | /// use mio::{Poll, Events, Interest, Token}; |
135 | /// use mio::unix::pipe; |
136 | /// |
137 | /// // Unique tokens for the two ends of the channel. |
138 | /// const PIPE_RECV: Token = Token(0); |
139 | /// const PIPE_SEND: Token = Token(1); |
140 | /// |
141 | /// # fn main() -> io::Result<()> { |
142 | /// // Create our `Poll` instance and the `Events` container. |
143 | /// let mut poll = Poll::new()?; |
144 | /// let mut events = Events::with_capacity(8); |
145 | /// |
146 | /// // Create a new pipe. |
147 | /// let (mut sender, mut receiver) = pipe::new()?; |
148 | /// |
149 | /// // Register both ends of the channel. |
150 | /// poll.registry().register(&mut receiver, PIPE_RECV, Interest::READABLE)?; |
151 | /// poll.registry().register(&mut sender, PIPE_SEND, Interest::WRITABLE)?; |
152 | /// |
153 | /// const MSG: &[u8; 11] = b"Hello world"; |
154 | /// |
155 | /// loop { |
156 | /// poll.poll(&mut events, None)?; |
157 | /// |
158 | /// for event in events.iter() { |
159 | /// match event.token() { |
160 | /// PIPE_SEND => sender.write(MSG) |
161 | /// .and_then(|n| if n != MSG.len() { |
162 | /// // We'll consider a short write an error in this |
163 | /// // example. NOTE: we can't use `write_all` with |
164 | /// // non-blocking I/O. |
165 | /// Err(io::ErrorKind::WriteZero.into()) |
166 | /// } else { |
167 | /// Ok(()) |
168 | /// })?, |
169 | /// PIPE_RECV => { |
170 | /// let mut buf = [0; 11]; |
171 | /// let n = receiver.read(&mut buf)?; |
172 | /// println!("received: {:?}", &buf[0..n]); |
173 | /// assert_eq!(n, MSG.len()); |
174 | /// assert_eq!(&buf, &*MSG); |
175 | /// return Ok(()); |
176 | /// }, |
177 | /// _ => unreachable!(), |
178 | /// } |
179 | /// } |
180 | /// } |
181 | /// # } |
182 | /// ``` |
183 | /// |
184 | /// Example that receives an event once the `Sender` is dropped. |
185 | /// |
186 | /// ``` |
187 | /// # use std::io; |
188 | /// # |
189 | /// # use mio::{Poll, Events, Interest, Token}; |
190 | /// # use mio::unix::pipe; |
191 | /// # |
192 | /// # const PIPE_RECV: Token = Token(0); |
193 | /// # const PIPE_SEND: Token = Token(1); |
194 | /// # |
195 | /// # fn main() -> io::Result<()> { |
196 | /// // Same setup as in the example above. |
197 | /// let mut poll = Poll::new()?; |
198 | /// let mut events = Events::with_capacity(8); |
199 | /// |
200 | /// let (mut sender, mut receiver) = pipe::new()?; |
201 | /// |
202 | /// poll.registry().register(&mut receiver, PIPE_RECV, Interest::READABLE)?; |
203 | /// poll.registry().register(&mut sender, PIPE_SEND, Interest::WRITABLE)?; |
204 | /// |
205 | /// // Drop the sender. |
206 | /// drop(sender); |
207 | /// |
208 | /// poll.poll(&mut events, None)?; |
209 | /// |
210 | /// for event in events.iter() { |
211 | /// match event.token() { |
212 | /// PIPE_RECV if event.is_read_closed() => { |
213 | /// // Detected that the sender was dropped. |
214 | /// println!("Sender dropped!"); |
215 | /// return Ok(()); |
216 | /// }, |
217 | /// _ => unreachable!(), |
218 | /// } |
219 | /// } |
220 | /// # unreachable!(); |
221 | /// # } |
222 | /// ``` |
223 | pub fn new() -> io::Result<(Sender, Receiver)> { |
224 | let fds = new_raw()?; |
225 | // SAFETY: `new_raw` initialised the `fds` above. |
226 | let r = unsafe { Receiver::from_raw_fd(fds[0]) }; |
227 | let w = unsafe { Sender::from_raw_fd(fds[1]) }; |
228 | Ok((w, r)) |
229 | } |
230 | |
231 | /// Sending end of an Unix pipe. |
232 | /// |
233 | /// See [`new`] for documentation, including examples. |
234 | #[derive (Debug)] |
235 | pub struct Sender { |
236 | inner: IoSource<File>, |
237 | } |
238 | |
239 | impl Sender { |
240 | /// Set the `Sender` into or out of non-blocking mode. |
241 | pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> { |
242 | set_nonblocking(self.inner.as_raw_fd(), nonblocking) |
243 | } |
244 | |
245 | /// Execute an I/O operation ensuring that the socket receives more events |
246 | /// if it hits a [`WouldBlock`] error. |
247 | /// |
248 | /// # Notes |
249 | /// |
250 | /// This method is required to be called for **all** I/O operations to |
251 | /// ensure the user will receive events once the socket is ready again after |
252 | /// returning a [`WouldBlock`] error. |
253 | /// |
254 | /// [`WouldBlock`]: io::ErrorKind::WouldBlock |
255 | /// |
256 | /// # Examples |
257 | /// |
258 | /// ``` |
259 | /// # use std::error::Error; |
260 | /// # |
261 | /// # fn main() -> Result<(), Box<dyn Error>> { |
262 | /// use std::io; |
263 | /// use std::os::unix::io::AsRawFd; |
264 | /// use mio::unix::pipe; |
265 | /// |
266 | /// let (sender, receiver) = pipe::new()?; |
267 | /// |
268 | /// // Wait until the sender is writable... |
269 | /// |
270 | /// // Write to the sender using a direct libc call, of course the |
271 | /// // `io::Write` implementation would be easier to use. |
272 | /// let buf = b"hello"; |
273 | /// let n = sender.try_io(|| { |
274 | /// let buf_ptr = &buf as *const _ as *const _; |
275 | /// let res = unsafe { libc::write(sender.as_raw_fd(), buf_ptr, buf.len()) }; |
276 | /// if res != -1 { |
277 | /// Ok(res as usize) |
278 | /// } else { |
279 | /// // If EAGAIN or EWOULDBLOCK is set by libc::write, the closure |
280 | /// // should return `WouldBlock` error. |
281 | /// Err(io::Error::last_os_error()) |
282 | /// } |
283 | /// })?; |
284 | /// eprintln!("write {} bytes", n); |
285 | /// |
286 | /// // Wait until the receiver is readable... |
287 | /// |
288 | /// // Read from the receiver using a direct libc call, of course the |
289 | /// // `io::Read` implementation would be easier to use. |
290 | /// let mut buf = [0; 512]; |
291 | /// let n = receiver.try_io(|| { |
292 | /// let buf_ptr = &mut buf as *mut _ as *mut _; |
293 | /// let res = unsafe { libc::read(receiver.as_raw_fd(), buf_ptr, buf.len()) }; |
294 | /// if res != -1 { |
295 | /// Ok(res as usize) |
296 | /// } else { |
297 | /// // If EAGAIN or EWOULDBLOCK is set by libc::read, the closure |
298 | /// // should return `WouldBlock` error. |
299 | /// Err(io::Error::last_os_error()) |
300 | /// } |
301 | /// })?; |
302 | /// eprintln!("read {} bytes", n); |
303 | /// # Ok(()) |
304 | /// # } |
305 | /// ``` |
306 | pub fn try_io<F, T>(&self, f: F) -> io::Result<T> |
307 | where |
308 | F: FnOnce() -> io::Result<T>, |
309 | { |
310 | self.inner.do_io(|_| f()) |
311 | } |
312 | } |
313 | |
314 | impl event::Source for Sender { |
315 | fn register( |
316 | &mut self, |
317 | registry: &Registry, |
318 | token: Token, |
319 | interests: Interest, |
320 | ) -> io::Result<()> { |
321 | self.inner.register(registry, token, interests) |
322 | } |
323 | |
324 | fn reregister( |
325 | &mut self, |
326 | registry: &Registry, |
327 | token: Token, |
328 | interests: Interest, |
329 | ) -> io::Result<()> { |
330 | self.inner.reregister(registry, token, interests) |
331 | } |
332 | |
333 | fn deregister(&mut self, registry: &Registry) -> io::Result<()> { |
334 | self.inner.deregister(registry) |
335 | } |
336 | } |
337 | |
338 | impl Write for Sender { |
339 | fn write(&mut self, buf: &[u8]) -> io::Result<usize> { |
340 | self.inner.do_io(|mut sender| sender.write(buf)) |
341 | } |
342 | |
343 | fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> { |
344 | self.inner.do_io(|mut sender| sender.write_vectored(bufs)) |
345 | } |
346 | |
347 | fn flush(&mut self) -> io::Result<()> { |
348 | self.inner.do_io(|mut sender| sender.flush()) |
349 | } |
350 | } |
351 | |
352 | impl Write for &Sender { |
353 | fn write(&mut self, buf: &[u8]) -> io::Result<usize> { |
354 | self.inner.do_io(|mut sender| sender.write(buf)) |
355 | } |
356 | |
357 | fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> { |
358 | self.inner.do_io(|mut sender| sender.write_vectored(bufs)) |
359 | } |
360 | |
361 | fn flush(&mut self) -> io::Result<()> { |
362 | self.inner.do_io(|mut sender| sender.flush()) |
363 | } |
364 | } |
365 | |
366 | /// # Notes |
367 | /// |
368 | /// The underlying pipe is **not** set to non-blocking. |
369 | impl From<ChildStdin> for Sender { |
370 | fn from(stdin: ChildStdin) -> Sender { |
371 | // Safety: `ChildStdin` is guaranteed to be a valid file descriptor. |
372 | unsafe { Sender::from_raw_fd(stdin.into_raw_fd()) } |
373 | } |
374 | } |
375 | |
376 | impl FromRawFd for Sender { |
377 | unsafe fn from_raw_fd(fd: RawFd) -> Sender { |
378 | Sender { |
379 | inner: IoSource::new(File::from_raw_fd(fd)), |
380 | } |
381 | } |
382 | } |
383 | |
384 | impl AsRawFd for Sender { |
385 | fn as_raw_fd(&self) -> RawFd { |
386 | self.inner.as_raw_fd() |
387 | } |
388 | } |
389 | |
390 | impl IntoRawFd for Sender { |
391 | fn into_raw_fd(self) -> RawFd { |
392 | self.inner.into_inner().into_raw_fd() |
393 | } |
394 | } |
395 | |
396 | /// Receiving end of an Unix pipe. |
397 | /// |
398 | /// See [`new`] for documentation, including examples. |
399 | #[derive (Debug)] |
400 | pub struct Receiver { |
401 | inner: IoSource<File>, |
402 | } |
403 | |
404 | impl Receiver { |
405 | /// Set the `Receiver` into or out of non-blocking mode. |
406 | pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> { |
407 | set_nonblocking(self.inner.as_raw_fd(), nonblocking) |
408 | } |
409 | |
410 | /// Execute an I/O operation ensuring that the socket receives more events |
411 | /// if it hits a [`WouldBlock`] error. |
412 | /// |
413 | /// # Notes |
414 | /// |
415 | /// This method is required to be called for **all** I/O operations to |
416 | /// ensure the user will receive events once the socket is ready again after |
417 | /// returning a [`WouldBlock`] error. |
418 | /// |
419 | /// [`WouldBlock`]: io::ErrorKind::WouldBlock |
420 | /// |
421 | /// # Examples |
422 | /// |
423 | /// ``` |
424 | /// # use std::error::Error; |
425 | /// # |
426 | /// # fn main() -> Result<(), Box<dyn Error>> { |
427 | /// use std::io; |
428 | /// use std::os::unix::io::AsRawFd; |
429 | /// use mio::unix::pipe; |
430 | /// |
431 | /// let (sender, receiver) = pipe::new()?; |
432 | /// |
433 | /// // Wait until the sender is writable... |
434 | /// |
435 | /// // Write to the sender using a direct libc call, of course the |
436 | /// // `io::Write` implementation would be easier to use. |
437 | /// let buf = b"hello"; |
438 | /// let n = sender.try_io(|| { |
439 | /// let buf_ptr = &buf as *const _ as *const _; |
440 | /// let res = unsafe { libc::write(sender.as_raw_fd(), buf_ptr, buf.len()) }; |
441 | /// if res != -1 { |
442 | /// Ok(res as usize) |
443 | /// } else { |
444 | /// // If EAGAIN or EWOULDBLOCK is set by libc::write, the closure |
445 | /// // should return `WouldBlock` error. |
446 | /// Err(io::Error::last_os_error()) |
447 | /// } |
448 | /// })?; |
449 | /// eprintln!("write {} bytes", n); |
450 | /// |
451 | /// // Wait until the receiver is readable... |
452 | /// |
453 | /// // Read from the receiver using a direct libc call, of course the |
454 | /// // `io::Read` implementation would be easier to use. |
455 | /// let mut buf = [0; 512]; |
456 | /// let n = receiver.try_io(|| { |
457 | /// let buf_ptr = &mut buf as *mut _ as *mut _; |
458 | /// let res = unsafe { libc::read(receiver.as_raw_fd(), buf_ptr, buf.len()) }; |
459 | /// if res != -1 { |
460 | /// Ok(res as usize) |
461 | /// } else { |
462 | /// // If EAGAIN or EWOULDBLOCK is set by libc::read, the closure |
463 | /// // should return `WouldBlock` error. |
464 | /// Err(io::Error::last_os_error()) |
465 | /// } |
466 | /// })?; |
467 | /// eprintln!("read {} bytes", n); |
468 | /// # Ok(()) |
469 | /// # } |
470 | /// ``` |
471 | pub fn try_io<F, T>(&self, f: F) -> io::Result<T> |
472 | where |
473 | F: FnOnce() -> io::Result<T>, |
474 | { |
475 | self.inner.do_io(|_| f()) |
476 | } |
477 | } |
478 | |
479 | impl event::Source for Receiver { |
480 | fn register( |
481 | &mut self, |
482 | registry: &Registry, |
483 | token: Token, |
484 | interests: Interest, |
485 | ) -> io::Result<()> { |
486 | self.inner.register(registry, token, interests) |
487 | } |
488 | |
489 | fn reregister( |
490 | &mut self, |
491 | registry: &Registry, |
492 | token: Token, |
493 | interests: Interest, |
494 | ) -> io::Result<()> { |
495 | self.inner.reregister(registry, token, interests) |
496 | } |
497 | |
498 | fn deregister(&mut self, registry: &Registry) -> io::Result<()> { |
499 | self.inner.deregister(registry) |
500 | } |
501 | } |
502 | |
503 | impl Read for Receiver { |
504 | fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { |
505 | self.inner.do_io(|mut sender| sender.read(buf)) |
506 | } |
507 | |
508 | fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> { |
509 | self.inner.do_io(|mut sender| sender.read_vectored(bufs)) |
510 | } |
511 | } |
512 | |
513 | impl Read for &Receiver { |
514 | fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { |
515 | self.inner.do_io(|mut sender| sender.read(buf)) |
516 | } |
517 | |
518 | fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> { |
519 | self.inner.do_io(|mut sender| sender.read_vectored(bufs)) |
520 | } |
521 | } |
522 | |
523 | /// # Notes |
524 | /// |
525 | /// The underlying pipe is **not** set to non-blocking. |
526 | impl From<ChildStdout> for Receiver { |
527 | fn from(stdout: ChildStdout) -> Receiver { |
528 | // Safety: `ChildStdout` is guaranteed to be a valid file descriptor. |
529 | unsafe { Receiver::from_raw_fd(stdout.into_raw_fd()) } |
530 | } |
531 | } |
532 | |
533 | /// # Notes |
534 | /// |
535 | /// The underlying pipe is **not** set to non-blocking. |
536 | impl From<ChildStderr> for Receiver { |
537 | fn from(stderr: ChildStderr) -> Receiver { |
538 | // Safety: `ChildStderr` is guaranteed to be a valid file descriptor. |
539 | unsafe { Receiver::from_raw_fd(stderr.into_raw_fd()) } |
540 | } |
541 | } |
542 | |
543 | impl FromRawFd for Receiver { |
544 | unsafe fn from_raw_fd(fd: RawFd) -> Receiver { |
545 | Receiver { |
546 | inner: IoSource::new(File::from_raw_fd(fd)), |
547 | } |
548 | } |
549 | } |
550 | |
551 | impl AsRawFd for Receiver { |
552 | fn as_raw_fd(&self) -> RawFd { |
553 | self.inner.as_raw_fd() |
554 | } |
555 | } |
556 | |
557 | impl IntoRawFd for Receiver { |
558 | fn into_raw_fd(self) -> RawFd { |
559 | self.inner.into_inner().into_raw_fd() |
560 | } |
561 | } |
562 | |
563 | #[cfg (not(any(target_os = "illumos" , target_os = "solaris" , target_os = "vita" )))] |
564 | fn set_nonblocking(fd: RawFd, nonblocking: bool) -> io::Result<()> { |
565 | let value = nonblocking as libc::c_int; |
566 | if unsafe { libc::ioctl(fd, libc::FIONBIO, &value) } == -1 { |
567 | Err(io::Error::last_os_error()) |
568 | } else { |
569 | Ok(()) |
570 | } |
571 | } |
572 | |
573 | #[cfg (any(target_os = "illumos" , target_os = "solaris" , target_os = "vita" ))] |
574 | fn set_nonblocking(fd: RawFd, nonblocking: bool) -> io::Result<()> { |
575 | let flags = unsafe { libc::fcntl(fd, libc::F_GETFL) }; |
576 | if flags < 0 { |
577 | return Err(io::Error::last_os_error()); |
578 | } |
579 | |
580 | let nflags = if nonblocking { |
581 | flags | libc::O_NONBLOCK |
582 | } else { |
583 | flags & !libc::O_NONBLOCK |
584 | }; |
585 | |
586 | if flags != nflags { |
587 | if unsafe { libc::fcntl(fd, libc::F_SETFL, nflags) } < 0 { |
588 | return Err(io::Error::last_os_error()); |
589 | } |
590 | } |
591 | |
592 | Ok(()) |
593 | } |
594 | } // `cfg_os_ext!`. |
595 | |