| 1 | use std::fmt; |
| 2 | use std::io::{self, IoSlice, IoSliceMut, Read, Write}; |
| 3 | use std::net::Shutdown; |
| 4 | use std::os::fd::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd, RawFd}; |
| 5 | use std::os::unix::net::{self, SocketAddr}; |
| 6 | use std::path::Path; |
| 7 | |
| 8 | use crate::io_source::IoSource; |
| 9 | use crate::{event, sys, Interest, Registry, Token}; |
| 10 | |
| 11 | /// A non-blocking Unix stream socket. |
| 12 | pub struct UnixStream { |
| 13 | inner: IoSource<net::UnixStream>, |
| 14 | } |
| 15 | |
| 16 | impl UnixStream { |
| 17 | /// Connects to the socket named by `path`. |
| 18 | /// |
| 19 | /// This may return a `WouldBlock` in which case the socket connection |
| 20 | /// cannot be completed immediately. Usually it means the backlog is full. |
| 21 | pub fn connect<P: AsRef<Path>>(path: P) -> io::Result<UnixStream> { |
| 22 | let addr = SocketAddr::from_pathname(path)?; |
| 23 | UnixStream::connect_addr(&addr) |
| 24 | } |
| 25 | |
| 26 | /// Connects to the socket named by `address`. |
| 27 | /// |
| 28 | /// This may return a `WouldBlock` in which case the socket connection |
| 29 | /// cannot be completed immediately. Usually it means the backlog is full. |
| 30 | pub fn connect_addr(address: &SocketAddr) -> io::Result<UnixStream> { |
| 31 | sys::uds::stream::connect_addr(address).map(UnixStream::from_std) |
| 32 | } |
| 33 | |
| 34 | /// Creates a new `UnixStream` from a standard `net::UnixStream`. |
| 35 | /// |
| 36 | /// This function is intended to be used to wrap a Unix stream from the |
| 37 | /// standard library in the Mio equivalent. The conversion assumes nothing |
| 38 | /// about the underlying stream; it is left up to the user to set it in |
| 39 | /// non-blocking mode. |
| 40 | /// |
| 41 | /// # Note |
| 42 | /// |
| 43 | /// The Unix stream here will not have `connect` called on it, so it |
| 44 | /// should already be connected via some other means (be it manually, or |
| 45 | /// the standard library). |
| 46 | pub fn from_std(stream: net::UnixStream) -> UnixStream { |
| 47 | UnixStream { |
| 48 | inner: IoSource::new(stream), |
| 49 | } |
| 50 | } |
| 51 | |
| 52 | /// Creates an unnamed pair of connected sockets. |
| 53 | /// |
| 54 | /// Returns two `UnixStream`s which are connected to each other. |
| 55 | pub fn pair() -> io::Result<(UnixStream, UnixStream)> { |
| 56 | sys::uds::stream::pair().map(|(stream1, stream2)| { |
| 57 | (UnixStream::from_std(stream1), UnixStream::from_std(stream2)) |
| 58 | }) |
| 59 | } |
| 60 | |
| 61 | /// Returns the socket address of the local half of this connection. |
| 62 | pub fn local_addr(&self) -> io::Result<SocketAddr> { |
| 63 | self.inner.local_addr() |
| 64 | } |
| 65 | |
| 66 | /// Returns the socket address of the remote half of this connection. |
| 67 | pub fn peer_addr(&self) -> io::Result<SocketAddr> { |
| 68 | self.inner.peer_addr() |
| 69 | } |
| 70 | |
| 71 | /// Returns the value of the `SO_ERROR` option. |
| 72 | pub fn take_error(&self) -> io::Result<Option<io::Error>> { |
| 73 | self.inner.take_error() |
| 74 | } |
| 75 | |
| 76 | /// Shuts down the read, write, or both halves of this connection. |
| 77 | /// |
| 78 | /// This function will cause all pending and future I/O calls on the |
| 79 | /// specified portions to immediately return with an appropriate value |
| 80 | /// (see the documentation of `Shutdown`). |
| 81 | pub fn shutdown(&self, how: Shutdown) -> io::Result<()> { |
| 82 | self.inner.shutdown(how) |
| 83 | } |
| 84 | |
| 85 | /// Execute an I/O operation ensuring that the socket receives more events |
| 86 | /// if it hits a [`WouldBlock`] error. |
| 87 | /// |
| 88 | /// # Notes |
| 89 | /// |
| 90 | /// This method is required to be called for **all** I/O operations to |
| 91 | /// ensure the user will receive events once the socket is ready again after |
| 92 | /// returning a [`WouldBlock`] error. |
| 93 | /// |
| 94 | /// [`WouldBlock`]: io::ErrorKind::WouldBlock |
| 95 | /// |
| 96 | /// # Examples |
| 97 | /// |
| 98 | /// ``` |
| 99 | /// # use std::error::Error; |
| 100 | /// # |
| 101 | /// # fn main() -> Result<(), Box<dyn Error>> { |
| 102 | /// use std::io; |
| 103 | /// use std::os::fd::AsRawFd; |
| 104 | /// use mio::net::UnixStream; |
| 105 | /// |
| 106 | /// let (stream1, stream2) = UnixStream::pair()?; |
| 107 | /// |
| 108 | /// // Wait until the stream is writable... |
| 109 | /// |
| 110 | /// // Write to the stream using a direct libc call, of course the |
| 111 | /// // `io::Write` implementation would be easier to use. |
| 112 | /// let buf = b"hello" ; |
| 113 | /// let n = stream1.try_io(|| { |
| 114 | /// let buf_ptr = &buf as *const _ as *const _; |
| 115 | /// let res = unsafe { libc::send(stream1.as_raw_fd(), buf_ptr, buf.len(), 0) }; |
| 116 | /// if res != -1 { |
| 117 | /// Ok(res as usize) |
| 118 | /// } else { |
| 119 | /// // If EAGAIN or EWOULDBLOCK is set by libc::send, the closure |
| 120 | /// // should return `WouldBlock` error. |
| 121 | /// Err(io::Error::last_os_error()) |
| 122 | /// } |
| 123 | /// })?; |
| 124 | /// eprintln!("write {} bytes" , n); |
| 125 | /// |
| 126 | /// // Wait until the stream is readable... |
| 127 | /// |
| 128 | /// // Read from the stream using a direct libc call, of course the |
| 129 | /// // `io::Read` implementation would be easier to use. |
| 130 | /// let mut buf = [0; 512]; |
| 131 | /// let n = stream2.try_io(|| { |
| 132 | /// let buf_ptr = &mut buf as *mut _ as *mut _; |
| 133 | /// let res = unsafe { libc::recv(stream2.as_raw_fd(), buf_ptr, buf.len(), 0) }; |
| 134 | /// if res != -1 { |
| 135 | /// Ok(res as usize) |
| 136 | /// } else { |
| 137 | /// // If EAGAIN or EWOULDBLOCK is set by libc::recv, the closure |
| 138 | /// // should return `WouldBlock` error. |
| 139 | /// Err(io::Error::last_os_error()) |
| 140 | /// } |
| 141 | /// })?; |
| 142 | /// eprintln!("read {} bytes" , n); |
| 143 | /// # Ok(()) |
| 144 | /// # } |
| 145 | /// ``` |
| 146 | pub fn try_io<F, T>(&self, f: F) -> io::Result<T> |
| 147 | where |
| 148 | F: FnOnce() -> io::Result<T>, |
| 149 | { |
| 150 | self.inner.do_io(|_| f()) |
| 151 | } |
| 152 | } |
| 153 | |
| 154 | impl Read for UnixStream { |
| 155 | fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { |
| 156 | self.inner.do_io(|mut inner: &UnixStream| inner.read(buf)) |
| 157 | } |
| 158 | |
| 159 | fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> { |
| 160 | self.inner.do_io(|mut inner: &UnixStream| inner.read_vectored(bufs)) |
| 161 | } |
| 162 | } |
| 163 | |
| 164 | impl<'a> Read for &'a UnixStream { |
| 165 | fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { |
| 166 | self.inner.do_io(|mut inner: &UnixStream| inner.read(buf)) |
| 167 | } |
| 168 | |
| 169 | fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> { |
| 170 | self.inner.do_io(|mut inner: &UnixStream| inner.read_vectored(bufs)) |
| 171 | } |
| 172 | } |
| 173 | |
| 174 | impl Write for UnixStream { |
| 175 | fn write(&mut self, buf: &[u8]) -> io::Result<usize> { |
| 176 | self.inner.do_io(|mut inner: &UnixStream| inner.write(buf)) |
| 177 | } |
| 178 | |
| 179 | fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> { |
| 180 | self.inner.do_io(|mut inner: &UnixStream| inner.write_vectored(bufs)) |
| 181 | } |
| 182 | |
| 183 | fn flush(&mut self) -> io::Result<()> { |
| 184 | self.inner.do_io(|mut inner: &UnixStream| inner.flush()) |
| 185 | } |
| 186 | } |
| 187 | |
| 188 | impl<'a> Write for &'a UnixStream { |
| 189 | fn write(&mut self, buf: &[u8]) -> io::Result<usize> { |
| 190 | self.inner.do_io(|mut inner: &UnixStream| inner.write(buf)) |
| 191 | } |
| 192 | |
| 193 | fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> { |
| 194 | self.inner.do_io(|mut inner: &UnixStream| inner.write_vectored(bufs)) |
| 195 | } |
| 196 | |
| 197 | fn flush(&mut self) -> io::Result<()> { |
| 198 | self.inner.do_io(|mut inner: &UnixStream| inner.flush()) |
| 199 | } |
| 200 | } |
| 201 | |
| 202 | impl event::Source for UnixStream { |
| 203 | fn register( |
| 204 | &mut self, |
| 205 | registry: &Registry, |
| 206 | token: Token, |
| 207 | interests: Interest, |
| 208 | ) -> io::Result<()> { |
| 209 | self.inner.register(registry, token, interests) |
| 210 | } |
| 211 | |
| 212 | fn reregister( |
| 213 | &mut self, |
| 214 | registry: &Registry, |
| 215 | token: Token, |
| 216 | interests: Interest, |
| 217 | ) -> io::Result<()> { |
| 218 | self.inner.reregister(registry, token, interests) |
| 219 | } |
| 220 | |
| 221 | fn deregister(&mut self, registry: &Registry) -> io::Result<()> { |
| 222 | self.inner.deregister(registry) |
| 223 | } |
| 224 | } |
| 225 | |
| 226 | impl fmt::Debug for UnixStream { |
| 227 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 228 | self.inner.fmt(f) |
| 229 | } |
| 230 | } |
| 231 | |
| 232 | impl IntoRawFd for UnixStream { |
| 233 | fn into_raw_fd(self) -> RawFd { |
| 234 | self.inner.into_inner().into_raw_fd() |
| 235 | } |
| 236 | } |
| 237 | |
| 238 | impl AsRawFd for UnixStream { |
| 239 | fn as_raw_fd(&self) -> RawFd { |
| 240 | self.inner.as_raw_fd() |
| 241 | } |
| 242 | } |
| 243 | |
| 244 | impl FromRawFd for UnixStream { |
| 245 | /// Converts a `RawFd` to a `UnixStream`. |
| 246 | /// |
| 247 | /// # Notes |
| 248 | /// |
| 249 | /// The caller is responsible for ensuring that the socket is in |
| 250 | /// non-blocking mode. |
| 251 | unsafe fn from_raw_fd(fd: RawFd) -> UnixStream { |
| 252 | UnixStream::from_std(stream:FromRawFd::from_raw_fd(fd)) |
| 253 | } |
| 254 | } |
| 255 | |
| 256 | impl From<UnixStream> for net::UnixStream { |
| 257 | fn from(stream: UnixStream) -> Self { |
| 258 | // Safety: This is safe since we are extracting the raw fd from a well-constructed |
| 259 | // mio::net::uds::UnixStream which ensures that we actually pass in a valid file |
| 260 | // descriptor/socket |
| 261 | unsafe { net::UnixStream::from_raw_fd(stream.into_raw_fd()) } |
| 262 | } |
| 263 | } |
| 264 | |
| 265 | impl From<UnixStream> for OwnedFd { |
| 266 | fn from(unix_stream: UnixStream) -> Self { |
| 267 | unix_stream.inner.into_inner().into() |
| 268 | } |
| 269 | } |
| 270 | |
| 271 | impl AsFd for UnixStream { |
| 272 | fn as_fd(&self) -> BorrowedFd<'_> { |
| 273 | self.inner.as_fd() |
| 274 | } |
| 275 | } |
| 276 | |
| 277 | impl From<OwnedFd> for UnixStream { |
| 278 | fn from(fd: OwnedFd) -> Self { |
| 279 | UnixStream::from_std(stream:From::from(fd)) |
| 280 | } |
| 281 | } |
| 282 | |