1 | use crate::io_source::IoSource; |
2 | use crate::net::SocketAddr; |
3 | use crate::{event, sys, Interest, Registry, Token}; |
4 | |
5 | use std::fmt; |
6 | use std::io::{self, IoSlice, IoSliceMut, Read, Write}; |
7 | use std::net::Shutdown; |
8 | use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; |
9 | use std::os::unix::net; |
10 | use std::path::Path; |
11 | |
12 | /// A non-blocking Unix stream socket. |
13 | pub struct UnixStream { |
14 | inner: IoSource<net::UnixStream>, |
15 | } |
16 | |
17 | impl UnixStream { |
18 | /// Connects to the socket named by `path`. |
19 | /// |
20 | /// This may return a `WouldBlock` in which case the socket connection |
21 | /// cannot be completed immediately. Usually it means the backlog is full. |
22 | pub fn connect<P: AsRef<Path>>(path: P) -> io::Result<UnixStream> { |
23 | sys::uds::stream::connect(path.as_ref()).map(UnixStream::from_std) |
24 | } |
25 | |
26 | /// Connects to the socket named by `address`. |
27 | /// |
28 | /// This may return a `WouldBlock` in which case the socket connection |
29 | /// cannot be completed immediately. Usually it means the backlog is full. |
30 | pub fn connect_addr(address: &SocketAddr) -> io::Result<UnixStream> { |
31 | sys::uds::stream::connect_addr(address).map(UnixStream::from_std) |
32 | } |
33 | |
34 | /// Creates a new `UnixStream` from a standard `net::UnixStream`. |
35 | /// |
36 | /// This function is intended to be used to wrap a Unix stream from the |
37 | /// standard library in the Mio equivalent. The conversion assumes nothing |
38 | /// about the underlying stream; it is left up to the user to set it in |
39 | /// non-blocking mode. |
40 | /// |
41 | /// # Note |
42 | /// |
43 | /// The Unix stream here will not have `connect` called on it, so it |
44 | /// should already be connected via some other means (be it manually, or |
45 | /// the standard library). |
46 | pub fn from_std(stream: net::UnixStream) -> UnixStream { |
47 | UnixStream { |
48 | inner: IoSource::new(stream), |
49 | } |
50 | } |
51 | |
52 | /// Creates an unnamed pair of connected sockets. |
53 | /// |
54 | /// Returns two `UnixStream`s which are connected to each other. |
55 | pub fn pair() -> io::Result<(UnixStream, UnixStream)> { |
56 | sys::uds::stream::pair().map(|(stream1, stream2)| { |
57 | (UnixStream::from_std(stream1), UnixStream::from_std(stream2)) |
58 | }) |
59 | } |
60 | |
61 | /// Returns the socket address of the local half of this connection. |
62 | pub fn local_addr(&self) -> io::Result<sys::SocketAddr> { |
63 | sys::uds::stream::local_addr(&self.inner) |
64 | } |
65 | |
66 | /// Returns the socket address of the remote half of this connection. |
67 | pub fn peer_addr(&self) -> io::Result<sys::SocketAddr> { |
68 | sys::uds::stream::peer_addr(&self.inner) |
69 | } |
70 | |
71 | /// Returns the value of the `SO_ERROR` option. |
72 | pub fn take_error(&self) -> io::Result<Option<io::Error>> { |
73 | self.inner.take_error() |
74 | } |
75 | |
76 | /// Shuts down the read, write, or both halves of this connection. |
77 | /// |
78 | /// This function will cause all pending and future I/O calls on the |
79 | /// specified portions to immediately return with an appropriate value |
80 | /// (see the documentation of `Shutdown`). |
81 | pub fn shutdown(&self, how: Shutdown) -> io::Result<()> { |
82 | self.inner.shutdown(how) |
83 | } |
84 | |
85 | /// Execute an I/O operation ensuring that the socket receives more events |
86 | /// if it hits a [`WouldBlock`] error. |
87 | /// |
88 | /// # Notes |
89 | /// |
90 | /// This method is required to be called for **all** I/O operations to |
91 | /// ensure the user will receive events once the socket is ready again after |
92 | /// returning a [`WouldBlock`] error. |
93 | /// |
94 | /// [`WouldBlock`]: io::ErrorKind::WouldBlock |
95 | /// |
96 | /// # Examples |
97 | /// |
98 | /// ``` |
99 | /// # use std::error::Error; |
100 | /// # |
101 | /// # fn main() -> Result<(), Box<dyn Error>> { |
102 | /// use std::io; |
103 | /// use std::os::unix::io::AsRawFd; |
104 | /// use mio::net::UnixStream; |
105 | /// |
106 | /// let (stream1, stream2) = UnixStream::pair()?; |
107 | /// |
108 | /// // Wait until the stream is writable... |
109 | /// |
110 | /// // Write to the stream using a direct libc call, of course the |
111 | /// // `io::Write` implementation would be easier to use. |
112 | /// let buf = b"hello" ; |
113 | /// let n = stream1.try_io(|| { |
114 | /// let buf_ptr = &buf as *const _ as *const _; |
115 | /// let res = unsafe { libc::send(stream1.as_raw_fd(), buf_ptr, buf.len(), 0) }; |
116 | /// if res != -1 { |
117 | /// Ok(res as usize) |
118 | /// } else { |
119 | /// // If EAGAIN or EWOULDBLOCK is set by libc::send, the closure |
120 | /// // should return `WouldBlock` error. |
121 | /// Err(io::Error::last_os_error()) |
122 | /// } |
123 | /// })?; |
124 | /// eprintln!("write {} bytes" , n); |
125 | /// |
126 | /// // Wait until the stream is readable... |
127 | /// |
128 | /// // Read from the stream using a direct libc call, of course the |
129 | /// // `io::Read` implementation would be easier to use. |
130 | /// let mut buf = [0; 512]; |
131 | /// let n = stream2.try_io(|| { |
132 | /// let buf_ptr = &mut buf as *mut _ as *mut _; |
133 | /// let res = unsafe { libc::recv(stream2.as_raw_fd(), buf_ptr, buf.len(), 0) }; |
134 | /// if res != -1 { |
135 | /// Ok(res as usize) |
136 | /// } else { |
137 | /// // If EAGAIN or EWOULDBLOCK is set by libc::recv, the closure |
138 | /// // should return `WouldBlock` error. |
139 | /// Err(io::Error::last_os_error()) |
140 | /// } |
141 | /// })?; |
142 | /// eprintln!("read {} bytes" , n); |
143 | /// # Ok(()) |
144 | /// # } |
145 | /// ``` |
146 | pub fn try_io<F, T>(&self, f: F) -> io::Result<T> |
147 | where |
148 | F: FnOnce() -> io::Result<T>, |
149 | { |
150 | self.inner.do_io(|_| f()) |
151 | } |
152 | } |
153 | |
154 | impl Read for UnixStream { |
155 | fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { |
156 | self.inner.do_io(|mut inner: &UnixStream| inner.read(buf)) |
157 | } |
158 | |
159 | fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> { |
160 | self.inner.do_io(|mut inner: &UnixStream| inner.read_vectored(bufs)) |
161 | } |
162 | } |
163 | |
164 | impl<'a> Read for &'a UnixStream { |
165 | fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { |
166 | self.inner.do_io(|mut inner: &UnixStream| inner.read(buf)) |
167 | } |
168 | |
169 | fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> { |
170 | self.inner.do_io(|mut inner: &UnixStream| inner.read_vectored(bufs)) |
171 | } |
172 | } |
173 | |
174 | impl Write for UnixStream { |
175 | fn write(&mut self, buf: &[u8]) -> io::Result<usize> { |
176 | self.inner.do_io(|mut inner: &UnixStream| inner.write(buf)) |
177 | } |
178 | |
179 | fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> { |
180 | self.inner.do_io(|mut inner: &UnixStream| inner.write_vectored(bufs)) |
181 | } |
182 | |
183 | fn flush(&mut self) -> io::Result<()> { |
184 | self.inner.do_io(|mut inner: &UnixStream| inner.flush()) |
185 | } |
186 | } |
187 | |
188 | impl<'a> Write for &'a UnixStream { |
189 | fn write(&mut self, buf: &[u8]) -> io::Result<usize> { |
190 | self.inner.do_io(|mut inner: &UnixStream| inner.write(buf)) |
191 | } |
192 | |
193 | fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> { |
194 | self.inner.do_io(|mut inner: &UnixStream| inner.write_vectored(bufs)) |
195 | } |
196 | |
197 | fn flush(&mut self) -> io::Result<()> { |
198 | self.inner.do_io(|mut inner: &UnixStream| inner.flush()) |
199 | } |
200 | } |
201 | |
202 | impl event::Source for UnixStream { |
203 | fn register( |
204 | &mut self, |
205 | registry: &Registry, |
206 | token: Token, |
207 | interests: Interest, |
208 | ) -> io::Result<()> { |
209 | self.inner.register(registry, token, interests) |
210 | } |
211 | |
212 | fn reregister( |
213 | &mut self, |
214 | registry: &Registry, |
215 | token: Token, |
216 | interests: Interest, |
217 | ) -> io::Result<()> { |
218 | self.inner.reregister(registry, token, interests) |
219 | } |
220 | |
221 | fn deregister(&mut self, registry: &Registry) -> io::Result<()> { |
222 | self.inner.deregister(registry) |
223 | } |
224 | } |
225 | |
226 | impl fmt::Debug for UnixStream { |
227 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
228 | self.inner.fmt(f) |
229 | } |
230 | } |
231 | |
232 | impl IntoRawFd for UnixStream { |
233 | fn into_raw_fd(self) -> RawFd { |
234 | self.inner.into_inner().into_raw_fd() |
235 | } |
236 | } |
237 | |
238 | impl AsRawFd for UnixStream { |
239 | fn as_raw_fd(&self) -> RawFd { |
240 | self.inner.as_raw_fd() |
241 | } |
242 | } |
243 | |
244 | impl FromRawFd for UnixStream { |
245 | /// Converts a `RawFd` to a `UnixStream`. |
246 | /// |
247 | /// # Notes |
248 | /// |
249 | /// The caller is responsible for ensuring that the socket is in |
250 | /// non-blocking mode. |
251 | unsafe fn from_raw_fd(fd: RawFd) -> UnixStream { |
252 | UnixStream::from_std(stream:FromRawFd::from_raw_fd(fd)) |
253 | } |
254 | } |
255 | |