1 | //! Unix-specific networking extensions. |
2 | |
3 | use std::fmt; |
4 | use std::net::Shutdown; |
5 | use std::os::unix::net::UnixStream as StdUnixStream; |
6 | use std::pin::Pin; |
7 | |
8 | use async_io::Async; |
9 | |
10 | use super::SocketAddr; |
11 | use crate::io::{self, Read, Write}; |
12 | use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; |
13 | use crate::path::Path; |
14 | use crate::sync::Arc; |
15 | use crate::task::{Context, Poll}; |
16 | |
17 | /// A Unix stream socket. |
18 | /// |
19 | /// This type is an async version of [`std::os::unix::net::UnixStream`]. |
20 | /// |
21 | /// [`std::os::unix::net::UnixStream`]: |
22 | /// https://doc.rust-lang.org/std/os/unix/net/struct.UnixStream.html |
23 | /// |
24 | /// # Examples |
25 | /// |
26 | /// ```no_run |
27 | /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { |
28 | /// # |
29 | /// use async_std::os::unix::net::UnixStream; |
30 | /// use async_std::prelude::*; |
31 | /// |
32 | /// let mut stream = UnixStream::connect("/tmp/socket" ).await?; |
33 | /// stream.write_all(b"hello world" ).await?; |
34 | /// |
35 | /// let mut response = Vec::new(); |
36 | /// stream.read_to_end(&mut response).await?; |
37 | /// # |
38 | /// # Ok(()) }) } |
39 | /// ``` |
40 | #[derive (Clone)] |
41 | pub struct UnixStream { |
42 | pub(super) watcher: Arc<Async<StdUnixStream>>, |
43 | } |
44 | |
45 | impl UnixStream { |
46 | /// Connects to the socket to the specified address. |
47 | /// |
48 | /// # Examples |
49 | /// |
50 | /// ```no_run |
51 | /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { |
52 | /// # |
53 | /// use async_std::os::unix::net::UnixStream; |
54 | /// |
55 | /// let stream = UnixStream::connect("/tmp/socket" ).await?; |
56 | /// # |
57 | /// # Ok(()) }) } |
58 | /// ``` |
59 | pub async fn connect<P: AsRef<Path>>(path: P) -> io::Result<UnixStream> { |
60 | let path = path.as_ref().to_owned(); |
61 | let stream = Arc::new(Async::<StdUnixStream>::connect(path).await?); |
62 | |
63 | Ok(UnixStream { watcher: stream }) |
64 | } |
65 | |
66 | /// Creates an unnamed pair of connected sockets. |
67 | /// |
68 | /// Returns two streams which are connected to each other. |
69 | /// |
70 | /// # Examples |
71 | /// |
72 | /// ```no_run |
73 | /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { |
74 | /// # |
75 | /// use async_std::os::unix::net::UnixStream; |
76 | /// |
77 | /// let stream = UnixStream::pair()?; |
78 | /// # |
79 | /// # Ok(()) }) } |
80 | /// ``` |
81 | pub fn pair() -> io::Result<(UnixStream, UnixStream)> { |
82 | let (a, b) = Async::<StdUnixStream>::pair()?; |
83 | let a = UnixStream { |
84 | watcher: Arc::new(a), |
85 | }; |
86 | let b = UnixStream { |
87 | watcher: Arc::new(b), |
88 | }; |
89 | Ok((a, b)) |
90 | } |
91 | |
92 | /// Returns the socket address of the local half of this connection. |
93 | /// |
94 | /// # Examples |
95 | /// |
96 | /// ```no_run |
97 | /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { |
98 | /// # |
99 | /// use async_std::os::unix::net::UnixStream; |
100 | /// |
101 | /// let stream = UnixStream::connect("/tmp/socket" ).await?; |
102 | /// let addr = stream.local_addr()?; |
103 | /// # |
104 | /// # Ok(()) }) } |
105 | /// ``` |
106 | pub fn local_addr(&self) -> io::Result<SocketAddr> { |
107 | self.watcher.get_ref().local_addr() |
108 | } |
109 | |
110 | /// Returns the socket address of the remote half of this connection. |
111 | /// |
112 | /// # Examples |
113 | /// |
114 | /// ```no_run |
115 | /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { |
116 | /// # |
117 | /// use async_std::os::unix::net::UnixStream; |
118 | /// |
119 | /// let stream = UnixStream::connect("/tmp/socket" ).await?; |
120 | /// let peer = stream.peer_addr()?; |
121 | /// # |
122 | /// # Ok(()) }) } |
123 | /// ``` |
124 | pub fn peer_addr(&self) -> io::Result<SocketAddr> { |
125 | self.watcher.get_ref().peer_addr() |
126 | } |
127 | |
128 | /// Shuts down the read, write, or both halves of this connection. |
129 | /// |
130 | /// This function will cause all pending and future I/O calls on the specified portions to |
131 | /// immediately return with an appropriate value (see the documentation of [`Shutdown`]). |
132 | /// |
133 | /// [`Shutdown`]: https://doc.rust-lang.org/std/net/enum.Shutdown.html |
134 | /// |
135 | /// ```no_run |
136 | /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { |
137 | /// # |
138 | /// use async_std::os::unix::net::UnixStream; |
139 | /// use std::net::Shutdown; |
140 | /// |
141 | /// let stream = UnixStream::connect("/tmp/socket" ).await?; |
142 | /// stream.shutdown(Shutdown::Both)?; |
143 | /// # |
144 | /// # Ok(()) }) } |
145 | /// ``` |
146 | pub fn shutdown(&self, how: Shutdown) -> io::Result<()> { |
147 | self.watcher.get_ref().shutdown(how) |
148 | } |
149 | } |
150 | |
151 | impl Read for UnixStream { |
152 | fn poll_read( |
153 | self: Pin<&mut Self>, |
154 | cx: &mut Context<'_>, |
155 | buf: &mut [u8], |
156 | ) -> Poll<io::Result<usize>> { |
157 | Pin::new(&mut &*self).poll_read(cx, buf) |
158 | } |
159 | } |
160 | |
161 | impl Read for &UnixStream { |
162 | fn poll_read( |
163 | self: Pin<&mut Self>, |
164 | cx: &mut Context<'_>, |
165 | buf: &mut [u8], |
166 | ) -> Poll<io::Result<usize>> { |
167 | Pin::new(&mut &*self.watcher).poll_read(cx, buf) |
168 | } |
169 | } |
170 | |
171 | impl Write for UnixStream { |
172 | fn poll_write( |
173 | self: Pin<&mut Self>, |
174 | cx: &mut Context<'_>, |
175 | buf: &[u8], |
176 | ) -> Poll<io::Result<usize>> { |
177 | Pin::new(&mut &*self).poll_write(cx, buf) |
178 | } |
179 | |
180 | fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
181 | Pin::new(&mut &*self).poll_flush(cx) |
182 | } |
183 | |
184 | fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
185 | Pin::new(&mut &*self).poll_close(cx) |
186 | } |
187 | } |
188 | |
189 | impl Write for &UnixStream { |
190 | fn poll_write( |
191 | self: Pin<&mut Self>, |
192 | cx: &mut Context<'_>, |
193 | buf: &[u8], |
194 | ) -> Poll<io::Result<usize>> { |
195 | Pin::new(&mut &*self.watcher).poll_write(cx, buf) |
196 | } |
197 | |
198 | fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
199 | Pin::new(&mut &*self.watcher).poll_flush(cx) |
200 | } |
201 | |
202 | fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
203 | Pin::new(&mut &*self.watcher).poll_close(cx) |
204 | } |
205 | } |
206 | |
207 | impl fmt::Debug for UnixStream { |
208 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
209 | let mut builder: DebugStruct<'_, '_> = f.debug_struct(name:"UnixStream" ); |
210 | builder.field(name:"fd" , &self.as_raw_fd()); |
211 | |
212 | if let Ok(addr: SocketAddr) = self.local_addr() { |
213 | builder.field(name:"local" , &addr); |
214 | } |
215 | |
216 | if let Ok(addr: SocketAddr) = self.peer_addr() { |
217 | builder.field(name:"peer" , &addr); |
218 | } |
219 | |
220 | builder.finish() |
221 | } |
222 | } |
223 | |
224 | impl From<StdUnixStream> for UnixStream { |
225 | /// Converts a `std::os::unix::net::UnixStream` into its asynchronous equivalent. |
226 | fn from(stream: StdUnixStream) -> UnixStream { |
227 | let stream: Async = Async::new(stream).expect(msg:"UnixStream is known to be good" ); |
228 | UnixStream { |
229 | watcher: Arc::new(data:stream), |
230 | } |
231 | } |
232 | } |
233 | |
234 | impl std::convert::TryFrom<UnixStream> for StdUnixStream { |
235 | type Error = io::Error; |
236 | /// Converts a `UnixStream` into its synchronous equivalent. |
237 | fn try_from(stream: UnixStream) -> io::Result<StdUnixStream> { |
238 | let inner: UnixStream = ArcAsync::try_unwrap(stream.watcher) |
239 | .map_err(|_| io::Error::new( |
240 | kind:io::ErrorKind::Other, |
241 | error:"Cannot convert UnixStream to synchronous: multiple references" , |
242 | ))? |
243 | .into_inner()?; |
244 | inner.set_nonblocking(false)?; |
245 | Ok(inner) |
246 | } |
247 | } |
248 | |
249 | impl AsRawFd for UnixStream { |
250 | fn as_raw_fd(&self) -> RawFd { |
251 | self.watcher.as_raw_fd() |
252 | } |
253 | } |
254 | |
255 | impl FromRawFd for UnixStream { |
256 | unsafe fn from_raw_fd(fd: RawFd) -> UnixStream { |
257 | let stream: UnixStream = std::os::unix::net::UnixStream::from_raw_fd(fd); |
258 | stream.into() |
259 | } |
260 | } |
261 | |
262 | impl IntoRawFd for UnixStream { |
263 | fn into_raw_fd(self) -> RawFd { |
264 | (*self.watcher).get_ref().try_clone().unwrap().into_raw_fd() |
265 | } |
266 | } |
267 | |
268 | cfg_io_safety! { |
269 | use crate::os::unix::io::{AsFd, BorrowedFd, OwnedFd}; |
270 | |
271 | impl AsFd for UnixStream { |
272 | fn as_fd(&self) -> BorrowedFd<'_> { |
273 | self.watcher.get_ref().as_fd() |
274 | } |
275 | } |
276 | |
277 | impl From<OwnedFd> for UnixStream { |
278 | fn from(fd: OwnedFd) -> UnixStream { |
279 | std::os::unix::net::UnixStream::from(fd).into() |
280 | } |
281 | } |
282 | |
283 | impl From<UnixStream> for OwnedFd { |
284 | fn from(stream: UnixStream) -> OwnedFd { |
285 | stream.watcher.get_ref().try_clone().unwrap().into() |
286 | } |
287 | } |
288 | } |
289 | |