1 | use std::fmt; |
2 | use std::net::SocketAddr; |
3 | use std::net::TcpStream as StdTcpStream; |
4 | use std::pin::Pin; |
5 | |
6 | use async_io::Async; |
7 | |
8 | use crate::io; |
9 | use crate::net::{TcpStream, ToSocketAddrs}; |
10 | use crate::stream::Stream; |
11 | use crate::sync::Arc; |
12 | use crate::task::{ready, Context, Poll}; |
13 | |
14 | /// A TCP socket server, listening for connections. |
15 | /// |
16 | /// After creating a `TcpListener` by [`bind`]ing it to a socket address, it listens for incoming |
17 | /// TCP connections. These can be accepted by awaiting elements from the async stream of |
18 | /// [`incoming`] connections. |
19 | /// |
20 | /// The socket will be closed when the value is dropped. |
21 | /// |
22 | /// The Transmission Control Protocol is specified in [IETF RFC 793]. |
23 | /// |
24 | /// This type is an async version of [`std::net::TcpListener`]. |
25 | /// |
26 | /// [`bind`]: #method.bind |
27 | /// [`incoming`]: #method.incoming |
28 | /// [IETF RFC 793]: https://tools.ietf.org/html/rfc793 |
29 | /// [`std::net::TcpListener`]: https://doc.rust-lang.org/std/net/struct.TcpListener.html |
30 | /// |
31 | /// # Examples |
32 | /// |
33 | /// ```no_run |
34 | /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { |
35 | /// # |
36 | /// use async_std::io; |
37 | /// use async_std::net::TcpListener; |
38 | /// use async_std::prelude::*; |
39 | /// |
40 | /// let listener = TcpListener::bind("127.0.0.1:8080" ).await?; |
41 | /// let mut incoming = listener.incoming(); |
42 | /// |
43 | /// while let Some(stream) = incoming.next().await { |
44 | /// let stream = stream?; |
45 | /// let (reader, writer) = &mut (&stream, &stream); |
46 | /// io::copy(reader, writer).await?; |
47 | /// } |
48 | /// # |
49 | /// # Ok(()) }) } |
50 | /// ``` |
51 | #[derive (Debug)] |
52 | pub struct TcpListener { |
53 | watcher: Async<std::net::TcpListener>, |
54 | } |
55 | |
56 | impl TcpListener { |
57 | /// Creates a new `TcpListener` which will be bound to the specified address. |
58 | /// |
59 | /// The returned listener is ready for accepting connections. |
60 | /// |
61 | /// Binding with a port number of 0 will request that the OS assigns a port to this listener. |
62 | /// The port allocated can be queried via the [`local_addr`] method. |
63 | /// |
64 | /// # Examples |
65 | /// Create a TCP listener bound to 127.0.0.1:0: |
66 | /// |
67 | /// ```no_run |
68 | /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { |
69 | /// # |
70 | /// use async_std::net::TcpListener; |
71 | /// |
72 | /// let listener = TcpListener::bind("127.0.0.1:0" ).await?; |
73 | /// # |
74 | /// # Ok(()) }) } |
75 | /// ``` |
76 | /// |
77 | /// [`local_addr`]: #method.local_addr |
78 | pub async fn bind<A: ToSocketAddrs>(addrs: A) -> io::Result<TcpListener> { |
79 | let mut last_err = None; |
80 | let addrs = addrs.to_socket_addrs().await?; |
81 | |
82 | for addr in addrs { |
83 | match Async::<std::net::TcpListener>::bind(addr) { |
84 | Ok(listener) => { |
85 | return Ok(TcpListener { watcher: listener }); |
86 | } |
87 | Err(err) => last_err = Some(err), |
88 | } |
89 | } |
90 | |
91 | Err(last_err.unwrap_or_else(|| { |
92 | io::Error::new( |
93 | io::ErrorKind::InvalidInput, |
94 | "could not resolve to any addresses" , |
95 | ) |
96 | })) |
97 | } |
98 | |
99 | /// Accepts a new incoming connection to this listener. |
100 | /// |
101 | /// When a connection is established, the corresponding stream and address will be returned. |
102 | /// |
103 | /// ## Examples |
104 | /// |
105 | /// ```no_run |
106 | /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { |
107 | /// # |
108 | /// use async_std::net::TcpListener; |
109 | /// |
110 | /// let listener = TcpListener::bind("127.0.0.1:0" ).await?; |
111 | /// let (stream, addr) = listener.accept().await?; |
112 | /// # |
113 | /// # Ok(()) }) } |
114 | /// ``` |
115 | pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> { |
116 | let (stream, addr) = self.watcher.accept().await?; |
117 | let stream = TcpStream { |
118 | watcher: Arc::new(stream), |
119 | }; |
120 | Ok((stream, addr)) |
121 | } |
122 | |
123 | /// Returns a stream of incoming connections. |
124 | /// |
125 | /// Iterating over this stream is equivalent to calling [`accept`] in a loop. The stream of |
126 | /// connections is infinite, i.e awaiting the next connection will never result in [`None`]. |
127 | /// |
128 | /// [`accept`]: #method.accept |
129 | /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None |
130 | /// |
131 | /// ## Examples |
132 | /// |
133 | /// ```no_run |
134 | /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { |
135 | /// # |
136 | /// use async_std::net::TcpListener; |
137 | /// use async_std::prelude::*; |
138 | /// |
139 | /// let listener = TcpListener::bind("127.0.0.1:0" ).await?; |
140 | /// let mut incoming = listener.incoming(); |
141 | /// |
142 | /// while let Some(stream) = incoming.next().await { |
143 | /// let mut stream = stream?; |
144 | /// stream.write_all(b"hello world" ).await?; |
145 | /// } |
146 | /// # |
147 | /// # Ok(()) }) } |
148 | /// ``` |
149 | pub fn incoming(&self) -> Incoming<'_> { |
150 | Incoming { |
151 | incoming: Box::pin(self.watcher.incoming()), |
152 | } |
153 | } |
154 | |
155 | /// Turn this into a stream over the connections being received on this |
156 | /// listener. |
157 | /// |
158 | /// The returned stream is infinite and will also not yield |
159 | /// the peer's [`SocketAddr`] structure. Iterating over it is equivalent to |
160 | /// calling [`TcpListener::accept`] in a loop. |
161 | /// |
162 | /// ## Examples |
163 | /// |
164 | /// Merge the incoming connections of multiple sockets into one [`Stream`]: |
165 | /// |
166 | /// ```no_run |
167 | /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { |
168 | /// # |
169 | /// use async_std::net::TcpListener; |
170 | /// |
171 | /// // Our server listens on multiple ports for some reason |
172 | /// let listeners = vec![ |
173 | /// TcpListener::bind("[::0]:8080").await?, |
174 | /// TcpListener::bind("[::0]:12345").await?, |
175 | /// TcpListener::bind("[::0]:5678").await?, |
176 | /// ]; |
177 | /// // Iterate over all incoming connections |
178 | /// let incoming = futures::stream::select_all( |
179 | /// listeners.into_iter() |
180 | /// .map(TcpListener::into_incoming) |
181 | /// .map(Box::pin) |
182 | /// ); |
183 | /// # |
184 | /// # Ok(()) }) } |
185 | /// ``` |
186 | #[cfg (feature = "unstable" )] |
187 | pub fn into_incoming(self) -> impl Stream<Item = io::Result<TcpStream>> + Send { |
188 | futures_lite::stream::unfold(self, |listener| async move { |
189 | let res = listener.accept().await.map(|(stream, _)| stream); |
190 | Some((res, listener)) |
191 | }) |
192 | } |
193 | |
194 | /// Returns the local address that this listener is bound to. |
195 | /// |
196 | /// This can be useful, for example, to identify when binding to port 0 which port was assigned |
197 | /// by the OS. |
198 | /// |
199 | /// # Examples |
200 | /// |
201 | /// ```no_run |
202 | /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { |
203 | /// # |
204 | /// use async_std::net::TcpListener; |
205 | /// |
206 | /// let listener = TcpListener::bind("127.0.0.1:8080" ).await?; |
207 | /// let addr = listener.local_addr()?; |
208 | /// # |
209 | /// # Ok(()) }) } |
210 | /// ``` |
211 | pub fn local_addr(&self) -> io::Result<SocketAddr> { |
212 | self.watcher.get_ref().local_addr() |
213 | } |
214 | } |
215 | |
216 | /// A stream of incoming TCP connections. |
217 | /// |
218 | /// This stream is infinite, i.e awaiting the next connection will never result in [`None`]. It is |
219 | /// created by the [`incoming`] method on [`TcpListener`]. |
220 | /// |
221 | /// This type is an async version of [`std::net::Incoming`]. |
222 | /// |
223 | /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None |
224 | /// [`incoming`]: struct.TcpListener.html#method.incoming |
225 | /// [`TcpListener`]: struct.TcpListener.html |
226 | /// [`std::net::Incoming`]: https://doc.rust-lang.org/std/net/struct.Incoming.html |
227 | pub struct Incoming<'a> { |
228 | incoming: Pin<Box<dyn Stream<Item = io::Result<Async<StdTcpStream>>> + Send + Sync + 'a>>, |
229 | } |
230 | |
231 | impl Stream for Incoming<'_> { |
232 | type Item = io::Result<TcpStream>; |
233 | |
234 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
235 | let res: Option, …>> = ready!(Pin::new(&mut self.incoming).poll_next(cx)); |
236 | Poll::Ready(res.map(|res: Result, …>| res.map(|stream: Async| TcpStream { watcher: Arc::new(data:stream) }))) |
237 | } |
238 | } |
239 | |
240 | impl fmt::Debug for Incoming<'_> { |
241 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
242 | write!(f, "Incoming {{ ... }}" ) |
243 | } |
244 | } |
245 | |
246 | impl From<std::net::TcpListener> for TcpListener { |
247 | /// Converts a `std::net::TcpListener` into its asynchronous equivalent. |
248 | fn from(listener: std::net::TcpListener) -> TcpListener { |
249 | TcpListener { |
250 | watcher: Async::new(listener).expect(msg:"TcpListener is known to be good" ), |
251 | } |
252 | } |
253 | } |
254 | |
255 | impl std::convert::TryFrom<TcpListener> for std::net::TcpListener { |
256 | type Error = io::Error; |
257 | /// Converts a `TcpListener` into its synchronous equivalent. |
258 | fn try_from(listener: TcpListener) -> io::Result<std::net::TcpListener> { |
259 | let inner: TcpListener = listener.watcher.into_inner()?; |
260 | inner.set_nonblocking(false)?; |
261 | Ok(inner) |
262 | } |
263 | } |
264 | |
265 | cfg_unix! { |
266 | use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; |
267 | |
268 | impl AsRawFd for TcpListener { |
269 | fn as_raw_fd(&self) -> RawFd { |
270 | self.watcher.get_ref().as_raw_fd() |
271 | } |
272 | } |
273 | |
274 | impl FromRawFd for TcpListener { |
275 | unsafe fn from_raw_fd(fd: RawFd) -> TcpListener { |
276 | std::net::TcpListener::from_raw_fd(fd).into() |
277 | } |
278 | } |
279 | |
280 | impl IntoRawFd for TcpListener { |
281 | fn into_raw_fd(self) -> RawFd { |
282 | self.watcher.into_inner().unwrap().into_raw_fd() |
283 | } |
284 | } |
285 | |
286 | cfg_io_safety! { |
287 | use crate::os::unix::io::{AsFd, BorrowedFd, OwnedFd}; |
288 | |
289 | impl AsFd for TcpListener { |
290 | fn as_fd(&self) -> BorrowedFd<'_> { |
291 | self.watcher.get_ref().as_fd() |
292 | } |
293 | } |
294 | |
295 | impl From<OwnedFd> for TcpListener { |
296 | fn from(fd: OwnedFd) -> TcpListener { |
297 | std::net::TcpListener::from(fd).into() |
298 | } |
299 | } |
300 | |
301 | impl From<TcpListener> for OwnedFd { |
302 | fn from(listener: TcpListener) -> OwnedFd { |
303 | listener.watcher.into_inner().unwrap().into() |
304 | } |
305 | } |
306 | } |
307 | } |
308 | |
309 | cfg_windows! { |
310 | use crate::os::windows::io::{ |
311 | AsRawSocket, FromRawSocket, IntoRawSocket, RawSocket, |
312 | }; |
313 | |
314 | impl AsRawSocket for TcpListener { |
315 | fn as_raw_socket(&self) -> RawSocket { |
316 | self.watcher.as_raw_socket() |
317 | } |
318 | } |
319 | |
320 | impl FromRawSocket for TcpListener { |
321 | unsafe fn from_raw_socket(handle: RawSocket) -> TcpListener { |
322 | std::net::TcpListener::from_raw_socket(handle).into() |
323 | } |
324 | } |
325 | |
326 | impl IntoRawSocket for TcpListener { |
327 | fn into_raw_socket(self) -> RawSocket { |
328 | self.watcher.into_inner().unwrap().into_raw_socket() |
329 | } |
330 | } |
331 | |
332 | cfg_io_safety! { |
333 | use crate::os::windows::io::{AsSocket, BorrowedSocket, OwnedSocket}; |
334 | |
335 | impl AsSocket for TcpListener { |
336 | fn as_socket(&self) -> BorrowedSocket<'_> { |
337 | self.watcher.get_ref().as_socket() |
338 | } |
339 | } |
340 | |
341 | impl From<OwnedSocket> for TcpListener { |
342 | fn from(fd: OwnedSocket) -> TcpListener { |
343 | std::net::TcpListener::from(fd).into() |
344 | } |
345 | } |
346 | |
347 | impl From<TcpListener> for OwnedSocket { |
348 | fn from(listener: TcpListener) -> OwnedSocket { |
349 | listener.watcher.into_inner().unwrap().into() |
350 | } |
351 | } |
352 | } |
353 | } |
354 | |