| 1 | //! Unix-specific networking extensions. |
| 2 | |
| 3 | use std::fmt; |
| 4 | use std::net::Shutdown; |
| 5 | use std::os::unix::net::UnixStream as StdUnixStream; |
| 6 | use std::pin::Pin; |
| 7 | |
| 8 | use async_io::Async; |
| 9 | |
| 10 | use super::SocketAddr; |
| 11 | use crate::io::{self, Read, Write}; |
| 12 | use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; |
| 13 | use crate::path::Path; |
| 14 | use crate::sync::Arc; |
| 15 | use crate::task::{Context, Poll}; |
| 16 | |
| 17 | /// A Unix stream socket. |
| 18 | /// |
| 19 | /// This type is an async version of [`std::os::unix::net::UnixStream`]. |
| 20 | /// |
| 21 | /// [`std::os::unix::net::UnixStream`]: |
| 22 | /// https://doc.rust-lang.org/std/os/unix/net/struct.UnixStream.html |
| 23 | /// |
| 24 | /// # Examples |
| 25 | /// |
| 26 | /// ```no_run |
| 27 | /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { |
| 28 | /// # |
| 29 | /// use async_std::os::unix::net::UnixStream; |
| 30 | /// use async_std::prelude::*; |
| 31 | /// |
| 32 | /// let mut stream = UnixStream::connect("/tmp/socket" ).await?; |
| 33 | /// stream.write_all(b"hello world" ).await?; |
| 34 | /// |
| 35 | /// let mut response = Vec::new(); |
| 36 | /// stream.read_to_end(&mut response).await?; |
| 37 | /// # |
| 38 | /// # Ok(()) }) } |
| 39 | /// ``` |
| 40 | #[derive (Clone)] |
| 41 | pub struct UnixStream { |
| 42 | pub(super) watcher: Arc<Async<StdUnixStream>>, |
| 43 | } |
| 44 | |
| 45 | impl UnixStream { |
| 46 | /// Connects to the socket to the specified address. |
| 47 | /// |
| 48 | /// # Examples |
| 49 | /// |
| 50 | /// ```no_run |
| 51 | /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { |
| 52 | /// # |
| 53 | /// use async_std::os::unix::net::UnixStream; |
| 54 | /// |
| 55 | /// let stream = UnixStream::connect("/tmp/socket" ).await?; |
| 56 | /// # |
| 57 | /// # Ok(()) }) } |
| 58 | /// ``` |
| 59 | pub async fn connect<P: AsRef<Path>>(path: P) -> io::Result<UnixStream> { |
| 60 | let path = path.as_ref().to_owned(); |
| 61 | let stream = Arc::new(Async::<StdUnixStream>::connect(path).await?); |
| 62 | |
| 63 | Ok(UnixStream { watcher: stream }) |
| 64 | } |
| 65 | |
| 66 | /// Creates an unnamed pair of connected sockets. |
| 67 | /// |
| 68 | /// Returns two streams which are connected to each other. |
| 69 | /// |
| 70 | /// # Examples |
| 71 | /// |
| 72 | /// ```no_run |
| 73 | /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { |
| 74 | /// # |
| 75 | /// use async_std::os::unix::net::UnixStream; |
| 76 | /// |
| 77 | /// let stream = UnixStream::pair()?; |
| 78 | /// # |
| 79 | /// # Ok(()) }) } |
| 80 | /// ``` |
| 81 | pub fn pair() -> io::Result<(UnixStream, UnixStream)> { |
| 82 | let (a, b) = Async::<StdUnixStream>::pair()?; |
| 83 | let a = UnixStream { |
| 84 | watcher: Arc::new(a), |
| 85 | }; |
| 86 | let b = UnixStream { |
| 87 | watcher: Arc::new(b), |
| 88 | }; |
| 89 | Ok((a, b)) |
| 90 | } |
| 91 | |
| 92 | /// Returns the socket address of the local half of this connection. |
| 93 | /// |
| 94 | /// # Examples |
| 95 | /// |
| 96 | /// ```no_run |
| 97 | /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { |
| 98 | /// # |
| 99 | /// use async_std::os::unix::net::UnixStream; |
| 100 | /// |
| 101 | /// let stream = UnixStream::connect("/tmp/socket" ).await?; |
| 102 | /// let addr = stream.local_addr()?; |
| 103 | /// # |
| 104 | /// # Ok(()) }) } |
| 105 | /// ``` |
| 106 | pub fn local_addr(&self) -> io::Result<SocketAddr> { |
| 107 | self.watcher.get_ref().local_addr() |
| 108 | } |
| 109 | |
| 110 | /// Returns the socket address of the remote half of this connection. |
| 111 | /// |
| 112 | /// # Examples |
| 113 | /// |
| 114 | /// ```no_run |
| 115 | /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { |
| 116 | /// # |
| 117 | /// use async_std::os::unix::net::UnixStream; |
| 118 | /// |
| 119 | /// let stream = UnixStream::connect("/tmp/socket" ).await?; |
| 120 | /// let peer = stream.peer_addr()?; |
| 121 | /// # |
| 122 | /// # Ok(()) }) } |
| 123 | /// ``` |
| 124 | pub fn peer_addr(&self) -> io::Result<SocketAddr> { |
| 125 | self.watcher.get_ref().peer_addr() |
| 126 | } |
| 127 | |
| 128 | /// Shuts down the read, write, or both halves of this connection. |
| 129 | /// |
| 130 | /// This function will cause all pending and future I/O calls on the specified portions to |
| 131 | /// immediately return with an appropriate value (see the documentation of [`Shutdown`]). |
| 132 | /// |
| 133 | /// [`Shutdown`]: https://doc.rust-lang.org/std/net/enum.Shutdown.html |
| 134 | /// |
| 135 | /// ```no_run |
| 136 | /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { |
| 137 | /// # |
| 138 | /// use async_std::os::unix::net::UnixStream; |
| 139 | /// use std::net::Shutdown; |
| 140 | /// |
| 141 | /// let stream = UnixStream::connect("/tmp/socket" ).await?; |
| 142 | /// stream.shutdown(Shutdown::Both)?; |
| 143 | /// # |
| 144 | /// # Ok(()) }) } |
| 145 | /// ``` |
| 146 | pub fn shutdown(&self, how: Shutdown) -> io::Result<()> { |
| 147 | self.watcher.get_ref().shutdown(how) |
| 148 | } |
| 149 | } |
| 150 | |
| 151 | impl Read for UnixStream { |
| 152 | fn poll_read( |
| 153 | self: Pin<&mut Self>, |
| 154 | cx: &mut Context<'_>, |
| 155 | buf: &mut [u8], |
| 156 | ) -> Poll<io::Result<usize>> { |
| 157 | Pin::new(&mut &*self).poll_read(cx, buf) |
| 158 | } |
| 159 | } |
| 160 | |
| 161 | impl Read for &UnixStream { |
| 162 | fn poll_read( |
| 163 | self: Pin<&mut Self>, |
| 164 | cx: &mut Context<'_>, |
| 165 | buf: &mut [u8], |
| 166 | ) -> Poll<io::Result<usize>> { |
| 167 | Pin::new(&mut &*self.watcher).poll_read(cx, buf) |
| 168 | } |
| 169 | } |
| 170 | |
| 171 | impl Write for UnixStream { |
| 172 | fn poll_write( |
| 173 | self: Pin<&mut Self>, |
| 174 | cx: &mut Context<'_>, |
| 175 | buf: &[u8], |
| 176 | ) -> Poll<io::Result<usize>> { |
| 177 | Pin::new(&mut &*self).poll_write(cx, buf) |
| 178 | } |
| 179 | |
| 180 | fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
| 181 | Pin::new(&mut &*self).poll_flush(cx) |
| 182 | } |
| 183 | |
| 184 | fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
| 185 | Pin::new(&mut &*self).poll_close(cx) |
| 186 | } |
| 187 | } |
| 188 | |
| 189 | impl Write for &UnixStream { |
| 190 | fn poll_write( |
| 191 | self: Pin<&mut Self>, |
| 192 | cx: &mut Context<'_>, |
| 193 | buf: &[u8], |
| 194 | ) -> Poll<io::Result<usize>> { |
| 195 | Pin::new(&mut &*self.watcher).poll_write(cx, buf) |
| 196 | } |
| 197 | |
| 198 | fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
| 199 | Pin::new(&mut &*self.watcher).poll_flush(cx) |
| 200 | } |
| 201 | |
| 202 | fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
| 203 | Pin::new(&mut &*self.watcher).poll_close(cx) |
| 204 | } |
| 205 | } |
| 206 | |
| 207 | impl fmt::Debug for UnixStream { |
| 208 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 209 | let mut builder: DebugStruct<'_, '_> = f.debug_struct(name:"UnixStream" ); |
| 210 | builder.field(name:"fd" , &self.as_raw_fd()); |
| 211 | |
| 212 | if let Ok(addr: SocketAddr) = self.local_addr() { |
| 213 | builder.field(name:"local" , &addr); |
| 214 | } |
| 215 | |
| 216 | if let Ok(addr: SocketAddr) = self.peer_addr() { |
| 217 | builder.field(name:"peer" , &addr); |
| 218 | } |
| 219 | |
| 220 | builder.finish() |
| 221 | } |
| 222 | } |
| 223 | |
| 224 | impl From<StdUnixStream> for UnixStream { |
| 225 | /// Converts a `std::os::unix::net::UnixStream` into its asynchronous equivalent. |
| 226 | fn from(stream: StdUnixStream) -> UnixStream { |
| 227 | let stream: Async = Async::new(stream).expect(msg:"UnixStream is known to be good" ); |
| 228 | UnixStream { |
| 229 | watcher: Arc::new(data:stream), |
| 230 | } |
| 231 | } |
| 232 | } |
| 233 | |
| 234 | impl std::convert::TryFrom<UnixStream> for StdUnixStream { |
| 235 | type Error = io::Error; |
| 236 | /// Converts a `UnixStream` into its synchronous equivalent. |
| 237 | fn try_from(stream: UnixStream) -> io::Result<StdUnixStream> { |
| 238 | let inner: UnixStream = ArcAsync::try_unwrap(stream.watcher) |
| 239 | .map_err(|_| io::Error::new( |
| 240 | kind:io::ErrorKind::Other, |
| 241 | error:"Cannot convert UnixStream to synchronous: multiple references" , |
| 242 | ))? |
| 243 | .into_inner()?; |
| 244 | inner.set_nonblocking(false)?; |
| 245 | Ok(inner) |
| 246 | } |
| 247 | } |
| 248 | |
| 249 | impl AsRawFd for UnixStream { |
| 250 | fn as_raw_fd(&self) -> RawFd { |
| 251 | self.watcher.as_raw_fd() |
| 252 | } |
| 253 | } |
| 254 | |
| 255 | impl FromRawFd for UnixStream { |
| 256 | unsafe fn from_raw_fd(fd: RawFd) -> UnixStream { |
| 257 | let stream: UnixStream = std::os::unix::net::UnixStream::from_raw_fd(fd); |
| 258 | stream.into() |
| 259 | } |
| 260 | } |
| 261 | |
| 262 | impl IntoRawFd for UnixStream { |
| 263 | fn into_raw_fd(self) -> RawFd { |
| 264 | (*self.watcher).get_ref().try_clone().unwrap().into_raw_fd() |
| 265 | } |
| 266 | } |
| 267 | |
| 268 | cfg_io_safety! { |
| 269 | use crate::os::unix::io::{AsFd, BorrowedFd, OwnedFd}; |
| 270 | |
| 271 | impl AsFd for UnixStream { |
| 272 | fn as_fd(&self) -> BorrowedFd<'_> { |
| 273 | self.watcher.get_ref().as_fd() |
| 274 | } |
| 275 | } |
| 276 | |
| 277 | impl From<OwnedFd> for UnixStream { |
| 278 | fn from(fd: OwnedFd) -> UnixStream { |
| 279 | std::os::unix::net::UnixStream::from(fd).into() |
| 280 | } |
| 281 | } |
| 282 | |
| 283 | impl From<UnixStream> for OwnedFd { |
| 284 | fn from(stream: UnixStream) -> OwnedFd { |
| 285 | stream.watcher.get_ref().try_clone().unwrap().into() |
| 286 | } |
| 287 | } |
| 288 | } |
| 289 | |