1use crate::io_source::IoSource;
2use crate::net::SocketAddr;
3use crate::{event, sys, Interest, Registry, Token};
4
5use std::fmt;
6use std::io::{self, IoSlice, IoSliceMut, Read, Write};
7use std::net::Shutdown;
8use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
9use std::os::unix::net;
10use std::path::Path;
11
12/// A non-blocking Unix stream socket.
13pub struct UnixStream {
14 inner: IoSource<net::UnixStream>,
15}
16
17impl UnixStream {
18 /// Connects to the socket named by `path`.
19 ///
20 /// This may return a `WouldBlock` in which case the socket connection
21 /// cannot be completed immediately. Usually it means the backlog is full.
22 pub fn connect<P: AsRef<Path>>(path: P) -> io::Result<UnixStream> {
23 sys::uds::stream::connect(path.as_ref()).map(UnixStream::from_std)
24 }
25
26 /// Connects to the socket named by `address`.
27 ///
28 /// This may return a `WouldBlock` in which case the socket connection
29 /// cannot be completed immediately. Usually it means the backlog is full.
30 pub fn connect_addr(address: &SocketAddr) -> io::Result<UnixStream> {
31 sys::uds::stream::connect_addr(address).map(UnixStream::from_std)
32 }
33
34 /// Creates a new `UnixStream` from a standard `net::UnixStream`.
35 ///
36 /// This function is intended to be used to wrap a Unix stream from the
37 /// standard library in the Mio equivalent. The conversion assumes nothing
38 /// about the underlying stream; it is left up to the user to set it in
39 /// non-blocking mode.
40 ///
41 /// # Note
42 ///
43 /// The Unix stream here will not have `connect` called on it, so it
44 /// should already be connected via some other means (be it manually, or
45 /// the standard library).
46 pub fn from_std(stream: net::UnixStream) -> UnixStream {
47 UnixStream {
48 inner: IoSource::new(stream),
49 }
50 }
51
52 /// Creates an unnamed pair of connected sockets.
53 ///
54 /// Returns two `UnixStream`s which are connected to each other.
55 pub fn pair() -> io::Result<(UnixStream, UnixStream)> {
56 sys::uds::stream::pair().map(|(stream1, stream2)| {
57 (UnixStream::from_std(stream1), UnixStream::from_std(stream2))
58 })
59 }
60
61 /// Returns the socket address of the local half of this connection.
62 pub fn local_addr(&self) -> io::Result<sys::SocketAddr> {
63 sys::uds::stream::local_addr(&self.inner)
64 }
65
66 /// Returns the socket address of the remote half of this connection.
67 pub fn peer_addr(&self) -> io::Result<sys::SocketAddr> {
68 sys::uds::stream::peer_addr(&self.inner)
69 }
70
71 /// Returns the value of the `SO_ERROR` option.
72 pub fn take_error(&self) -> io::Result<Option<io::Error>> {
73 self.inner.take_error()
74 }
75
76 /// Shuts down the read, write, or both halves of this connection.
77 ///
78 /// This function will cause all pending and future I/O calls on the
79 /// specified portions to immediately return with an appropriate value
80 /// (see the documentation of `Shutdown`).
81 pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
82 self.inner.shutdown(how)
83 }
84
85 /// Execute an I/O operation ensuring that the socket receives more events
86 /// if it hits a [`WouldBlock`] error.
87 ///
88 /// # Notes
89 ///
90 /// This method is required to be called for **all** I/O operations to
91 /// ensure the user will receive events once the socket is ready again after
92 /// returning a [`WouldBlock`] error.
93 ///
94 /// [`WouldBlock`]: io::ErrorKind::WouldBlock
95 ///
96 /// # Examples
97 ///
98 /// ```
99 /// # use std::error::Error;
100 /// #
101 /// # fn main() -> Result<(), Box<dyn Error>> {
102 /// use std::io;
103 /// use std::os::unix::io::AsRawFd;
104 /// use mio::net::UnixStream;
105 ///
106 /// let (stream1, stream2) = UnixStream::pair()?;
107 ///
108 /// // Wait until the stream is writable...
109 ///
110 /// // Write to the stream using a direct libc call, of course the
111 /// // `io::Write` implementation would be easier to use.
112 /// let buf = b"hello";
113 /// let n = stream1.try_io(|| {
114 /// let buf_ptr = &buf as *const _ as *const _;
115 /// let res = unsafe { libc::send(stream1.as_raw_fd(), buf_ptr, buf.len(), 0) };
116 /// if res != -1 {
117 /// Ok(res as usize)
118 /// } else {
119 /// // If EAGAIN or EWOULDBLOCK is set by libc::send, the closure
120 /// // should return `WouldBlock` error.
121 /// Err(io::Error::last_os_error())
122 /// }
123 /// })?;
124 /// eprintln!("write {} bytes", n);
125 ///
126 /// // Wait until the stream is readable...
127 ///
128 /// // Read from the stream using a direct libc call, of course the
129 /// // `io::Read` implementation would be easier to use.
130 /// let mut buf = [0; 512];
131 /// let n = stream2.try_io(|| {
132 /// let buf_ptr = &mut buf as *mut _ as *mut _;
133 /// let res = unsafe { libc::recv(stream2.as_raw_fd(), buf_ptr, buf.len(), 0) };
134 /// if res != -1 {
135 /// Ok(res as usize)
136 /// } else {
137 /// // If EAGAIN or EWOULDBLOCK is set by libc::recv, the closure
138 /// // should return `WouldBlock` error.
139 /// Err(io::Error::last_os_error())
140 /// }
141 /// })?;
142 /// eprintln!("read {} bytes", n);
143 /// # Ok(())
144 /// # }
145 /// ```
146 pub fn try_io<F, T>(&self, f: F) -> io::Result<T>
147 where
148 F: FnOnce() -> io::Result<T>,
149 {
150 self.inner.do_io(|_| f())
151 }
152}
153
154impl Read for UnixStream {
155 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
156 self.inner.do_io(|mut inner: &UnixStream| inner.read(buf))
157 }
158
159 fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
160 self.inner.do_io(|mut inner: &UnixStream| inner.read_vectored(bufs))
161 }
162}
163
164impl<'a> Read for &'a UnixStream {
165 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
166 self.inner.do_io(|mut inner: &UnixStream| inner.read(buf))
167 }
168
169 fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
170 self.inner.do_io(|mut inner: &UnixStream| inner.read_vectored(bufs))
171 }
172}
173
174impl Write for UnixStream {
175 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
176 self.inner.do_io(|mut inner: &UnixStream| inner.write(buf))
177 }
178
179 fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
180 self.inner.do_io(|mut inner: &UnixStream| inner.write_vectored(bufs))
181 }
182
183 fn flush(&mut self) -> io::Result<()> {
184 self.inner.do_io(|mut inner: &UnixStream| inner.flush())
185 }
186}
187
188impl<'a> Write for &'a UnixStream {
189 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
190 self.inner.do_io(|mut inner: &UnixStream| inner.write(buf))
191 }
192
193 fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
194 self.inner.do_io(|mut inner: &UnixStream| inner.write_vectored(bufs))
195 }
196
197 fn flush(&mut self) -> io::Result<()> {
198 self.inner.do_io(|mut inner: &UnixStream| inner.flush())
199 }
200}
201
202impl event::Source for UnixStream {
203 fn register(
204 &mut self,
205 registry: &Registry,
206 token: Token,
207 interests: Interest,
208 ) -> io::Result<()> {
209 self.inner.register(registry, token, interests)
210 }
211
212 fn reregister(
213 &mut self,
214 registry: &Registry,
215 token: Token,
216 interests: Interest,
217 ) -> io::Result<()> {
218 self.inner.reregister(registry, token, interests)
219 }
220
221 fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
222 self.inner.deregister(registry)
223 }
224}
225
226impl fmt::Debug for UnixStream {
227 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
228 self.inner.fmt(f)
229 }
230}
231
232impl IntoRawFd for UnixStream {
233 fn into_raw_fd(self) -> RawFd {
234 self.inner.into_inner().into_raw_fd()
235 }
236}
237
238impl AsRawFd for UnixStream {
239 fn as_raw_fd(&self) -> RawFd {
240 self.inner.as_raw_fd()
241 }
242}
243
244impl FromRawFd for UnixStream {
245 /// Converts a `RawFd` to a `UnixStream`.
246 ///
247 /// # Notes
248 ///
249 /// The caller is responsible for ensuring that the socket is in
250 /// non-blocking mode.
251 unsafe fn from_raw_fd(fd: RawFd) -> UnixStream {
252 UnixStream::from_std(stream:FromRawFd::from_raw_fd(fd))
253 }
254}
255