1use crate::io_source::IoSource;
2use crate::{event, sys, Interest, Registry, Token};
3
4use std::fmt;
5use std::io::{self, IoSlice, IoSliceMut, Read, Write};
6use std::net::Shutdown;
7use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
8use std::os::unix::net;
9use std::path::Path;
10
11/// A non-blocking Unix stream socket.
12pub struct UnixStream {
13 inner: IoSource<net::UnixStream>,
14}
15
16impl UnixStream {
17 /// Connects to the socket named by `path`.
18 ///
19 /// This may return a `WouldBlock` in which case the socket connection
20 /// cannot be completed immediately. Usually it means the backlog is full.
21 pub fn connect<P: AsRef<Path>>(path: P) -> io::Result<UnixStream> {
22 sys::uds::stream::connect(path.as_ref()).map(UnixStream::from_std)
23 }
24
25 /// Creates a new `UnixStream` from a standard `net::UnixStream`.
26 ///
27 /// This function is intended to be used to wrap a Unix stream from the
28 /// standard library in the Mio equivalent. The conversion assumes nothing
29 /// about the underlying stream; it is left up to the user to set it in
30 /// non-blocking mode.
31 ///
32 /// # Note
33 ///
34 /// The Unix stream here will not have `connect` called on it, so it
35 /// should already be connected via some other means (be it manually, or
36 /// the standard library).
37 pub fn from_std(stream: net::UnixStream) -> UnixStream {
38 UnixStream {
39 inner: IoSource::new(stream),
40 }
41 }
42
43 /// Creates an unnamed pair of connected sockets.
44 ///
45 /// Returns two `UnixStream`s which are connected to each other.
46 pub fn pair() -> io::Result<(UnixStream, UnixStream)> {
47 sys::uds::stream::pair().map(|(stream1, stream2)| {
48 (UnixStream::from_std(stream1), UnixStream::from_std(stream2))
49 })
50 }
51
52 /// Returns the socket address of the local half of this connection.
53 pub fn local_addr(&self) -> io::Result<sys::SocketAddr> {
54 sys::uds::stream::local_addr(&self.inner)
55 }
56
57 /// Returns the socket address of the remote half of this connection.
58 pub fn peer_addr(&self) -> io::Result<sys::SocketAddr> {
59 sys::uds::stream::peer_addr(&self.inner)
60 }
61
62 /// Returns the value of the `SO_ERROR` option.
63 pub fn take_error(&self) -> io::Result<Option<io::Error>> {
64 self.inner.take_error()
65 }
66
67 /// Shuts down the read, write, or both halves of this connection.
68 ///
69 /// This function will cause all pending and future I/O calls on the
70 /// specified portions to immediately return with an appropriate value
71 /// (see the documentation of `Shutdown`).
72 pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
73 self.inner.shutdown(how)
74 }
75
76 /// Execute an I/O operation ensuring that the socket receives more events
77 /// if it hits a [`WouldBlock`] error.
78 ///
79 /// # Notes
80 ///
81 /// This method is required to be called for **all** I/O operations to
82 /// ensure the user will receive events once the socket is ready again after
83 /// returning a [`WouldBlock`] error.
84 ///
85 /// [`WouldBlock`]: io::ErrorKind::WouldBlock
86 ///
87 /// # Examples
88 ///
89 /// ```
90 /// # use std::error::Error;
91 /// #
92 /// # fn main() -> Result<(), Box<dyn Error>> {
93 /// use std::io;
94 /// use std::os::unix::io::AsRawFd;
95 /// use mio::net::UnixStream;
96 ///
97 /// let (stream1, stream2) = UnixStream::pair()?;
98 ///
99 /// // Wait until the stream is writable...
100 ///
101 /// // Write to the stream using a direct libc call, of course the
102 /// // `io::Write` implementation would be easier to use.
103 /// let buf = b"hello";
104 /// let n = stream1.try_io(|| {
105 /// let buf_ptr = &buf as *const _ as *const _;
106 /// let res = unsafe { libc::send(stream1.as_raw_fd(), buf_ptr, buf.len(), 0) };
107 /// if res != -1 {
108 /// Ok(res as usize)
109 /// } else {
110 /// // If EAGAIN or EWOULDBLOCK is set by libc::send, the closure
111 /// // should return `WouldBlock` error.
112 /// Err(io::Error::last_os_error())
113 /// }
114 /// })?;
115 /// eprintln!("write {} bytes", n);
116 ///
117 /// // Wait until the stream is readable...
118 ///
119 /// // Read from the stream using a direct libc call, of course the
120 /// // `io::Read` implementation would be easier to use.
121 /// let mut buf = [0; 512];
122 /// let n = stream2.try_io(|| {
123 /// let buf_ptr = &mut buf as *mut _ as *mut _;
124 /// let res = unsafe { libc::recv(stream2.as_raw_fd(), buf_ptr, buf.len(), 0) };
125 /// if res != -1 {
126 /// Ok(res as usize)
127 /// } else {
128 /// // If EAGAIN or EWOULDBLOCK is set by libc::recv, the closure
129 /// // should return `WouldBlock` error.
130 /// Err(io::Error::last_os_error())
131 /// }
132 /// })?;
133 /// eprintln!("read {} bytes", n);
134 /// # Ok(())
135 /// # }
136 /// ```
137 pub fn try_io<F, T>(&self, f: F) -> io::Result<T>
138 where
139 F: FnOnce() -> io::Result<T>,
140 {
141 self.inner.do_io(|_| f())
142 }
143}
144
145impl Read for UnixStream {
146 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
147 self.inner.do_io(|mut inner: &UnixStream| inner.read(buf))
148 }
149
150 fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
151 self.inner.do_io(|mut inner: &UnixStream| inner.read_vectored(bufs))
152 }
153}
154
155impl<'a> Read for &'a UnixStream {
156 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
157 self.inner.do_io(|mut inner: &UnixStream| inner.read(buf))
158 }
159
160 fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
161 self.inner.do_io(|mut inner: &UnixStream| inner.read_vectored(bufs))
162 }
163}
164
165impl Write for UnixStream {
166 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
167 self.inner.do_io(|mut inner: &UnixStream| inner.write(buf))
168 }
169
170 fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
171 self.inner.do_io(|mut inner: &UnixStream| inner.write_vectored(bufs))
172 }
173
174 fn flush(&mut self) -> io::Result<()> {
175 self.inner.do_io(|mut inner: &UnixStream| inner.flush())
176 }
177}
178
179impl<'a> Write for &'a UnixStream {
180 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
181 self.inner.do_io(|mut inner: &UnixStream| inner.write(buf))
182 }
183
184 fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
185 self.inner.do_io(|mut inner: &UnixStream| inner.write_vectored(bufs))
186 }
187
188 fn flush(&mut self) -> io::Result<()> {
189 self.inner.do_io(|mut inner: &UnixStream| inner.flush())
190 }
191}
192
193impl event::Source for UnixStream {
194 fn register(
195 &mut self,
196 registry: &Registry,
197 token: Token,
198 interests: Interest,
199 ) -> io::Result<()> {
200 self.inner.register(registry, token, interests)
201 }
202
203 fn reregister(
204 &mut self,
205 registry: &Registry,
206 token: Token,
207 interests: Interest,
208 ) -> io::Result<()> {
209 self.inner.reregister(registry, token, interests)
210 }
211
212 fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
213 self.inner.deregister(registry)
214 }
215}
216
217impl fmt::Debug for UnixStream {
218 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
219 self.inner.fmt(f)
220 }
221}
222
223impl IntoRawFd for UnixStream {
224 fn into_raw_fd(self) -> RawFd {
225 self.inner.into_inner().into_raw_fd()
226 }
227}
228
229impl AsRawFd for UnixStream {
230 fn as_raw_fd(&self) -> RawFd {
231 self.inner.as_raw_fd()
232 }
233}
234
235impl FromRawFd for UnixStream {
236 /// Converts a `RawFd` to a `UnixStream`.
237 ///
238 /// # Notes
239 ///
240 /// The caller is responsible for ensuring that the socket is in
241 /// non-blocking mode.
242 unsafe fn from_raw_fd(fd: RawFd) -> UnixStream {
243 UnixStream::from_std(stream:FromRawFd::from_raw_fd(fd))
244 }
245}
246