| 1 | use std::fmt; |
| 2 | use std::net::SocketAddr; |
| 3 | use std::net::TcpStream as StdTcpStream; |
| 4 | use std::pin::Pin; |
| 5 | |
| 6 | use async_io::Async; |
| 7 | |
| 8 | use crate::io; |
| 9 | use crate::net::{TcpStream, ToSocketAddrs}; |
| 10 | use crate::stream::Stream; |
| 11 | use crate::sync::Arc; |
| 12 | use crate::task::{ready, Context, Poll}; |
| 13 | |
| 14 | /// A TCP socket server, listening for connections. |
| 15 | /// |
| 16 | /// After creating a `TcpListener` by [`bind`]ing it to a socket address, it listens for incoming |
| 17 | /// TCP connections. These can be accepted by awaiting elements from the async stream of |
| 18 | /// [`incoming`] connections. |
| 19 | /// |
| 20 | /// The socket will be closed when the value is dropped. |
| 21 | /// |
| 22 | /// The Transmission Control Protocol is specified in [IETF RFC 793]. |
| 23 | /// |
| 24 | /// This type is an async version of [`std::net::TcpListener`]. |
| 25 | /// |
| 26 | /// [`bind`]: #method.bind |
| 27 | /// [`incoming`]: #method.incoming |
| 28 | /// [IETF RFC 793]: https://tools.ietf.org/html/rfc793 |
| 29 | /// [`std::net::TcpListener`]: https://doc.rust-lang.org/std/net/struct.TcpListener.html |
| 30 | /// |
| 31 | /// # Examples |
| 32 | /// |
| 33 | /// ```no_run |
| 34 | /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { |
| 35 | /// # |
| 36 | /// use async_std::io; |
| 37 | /// use async_std::net::TcpListener; |
| 38 | /// use async_std::prelude::*; |
| 39 | /// |
| 40 | /// let listener = TcpListener::bind("127.0.0.1:8080" ).await?; |
| 41 | /// let mut incoming = listener.incoming(); |
| 42 | /// |
| 43 | /// while let Some(stream) = incoming.next().await { |
| 44 | /// let stream = stream?; |
| 45 | /// let (reader, writer) = &mut (&stream, &stream); |
| 46 | /// io::copy(reader, writer).await?; |
| 47 | /// } |
| 48 | /// # |
| 49 | /// # Ok(()) }) } |
| 50 | /// ``` |
| 51 | #[derive (Debug)] |
| 52 | pub struct TcpListener { |
| 53 | watcher: Async<std::net::TcpListener>, |
| 54 | } |
| 55 | |
| 56 | impl TcpListener { |
| 57 | /// Creates a new `TcpListener` which will be bound to the specified address. |
| 58 | /// |
| 59 | /// The returned listener is ready for accepting connections. |
| 60 | /// |
| 61 | /// Binding with a port number of 0 will request that the OS assigns a port to this listener. |
| 62 | /// The port allocated can be queried via the [`local_addr`] method. |
| 63 | /// |
| 64 | /// # Examples |
| 65 | /// Create a TCP listener bound to 127.0.0.1:0: |
| 66 | /// |
| 67 | /// ```no_run |
| 68 | /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { |
| 69 | /// # |
| 70 | /// use async_std::net::TcpListener; |
| 71 | /// |
| 72 | /// let listener = TcpListener::bind("127.0.0.1:0" ).await?; |
| 73 | /// # |
| 74 | /// # Ok(()) }) } |
| 75 | /// ``` |
| 76 | /// |
| 77 | /// [`local_addr`]: #method.local_addr |
| 78 | pub async fn bind<A: ToSocketAddrs>(addrs: A) -> io::Result<TcpListener> { |
| 79 | let mut last_err = None; |
| 80 | let addrs = addrs.to_socket_addrs().await?; |
| 81 | |
| 82 | for addr in addrs { |
| 83 | match Async::<std::net::TcpListener>::bind(addr) { |
| 84 | Ok(listener) => { |
| 85 | return Ok(TcpListener { watcher: listener }); |
| 86 | } |
| 87 | Err(err) => last_err = Some(err), |
| 88 | } |
| 89 | } |
| 90 | |
| 91 | Err(last_err.unwrap_or_else(|| { |
| 92 | io::Error::new( |
| 93 | io::ErrorKind::InvalidInput, |
| 94 | "could not resolve to any addresses" , |
| 95 | ) |
| 96 | })) |
| 97 | } |
| 98 | |
| 99 | /// Accepts a new incoming connection to this listener. |
| 100 | /// |
| 101 | /// When a connection is established, the corresponding stream and address will be returned. |
| 102 | /// |
| 103 | /// ## Examples |
| 104 | /// |
| 105 | /// ```no_run |
| 106 | /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { |
| 107 | /// # |
| 108 | /// use async_std::net::TcpListener; |
| 109 | /// |
| 110 | /// let listener = TcpListener::bind("127.0.0.1:0" ).await?; |
| 111 | /// let (stream, addr) = listener.accept().await?; |
| 112 | /// # |
| 113 | /// # Ok(()) }) } |
| 114 | /// ``` |
| 115 | pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> { |
| 116 | let (stream, addr) = self.watcher.accept().await?; |
| 117 | let stream = TcpStream { |
| 118 | watcher: Arc::new(stream), |
| 119 | }; |
| 120 | Ok((stream, addr)) |
| 121 | } |
| 122 | |
| 123 | /// Returns a stream of incoming connections. |
| 124 | /// |
| 125 | /// Iterating over this stream is equivalent to calling [`accept`] in a loop. The stream of |
| 126 | /// connections is infinite, i.e awaiting the next connection will never result in [`None`]. |
| 127 | /// |
| 128 | /// [`accept`]: #method.accept |
| 129 | /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None |
| 130 | /// |
| 131 | /// ## Examples |
| 132 | /// |
| 133 | /// ```no_run |
| 134 | /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { |
| 135 | /// # |
| 136 | /// use async_std::net::TcpListener; |
| 137 | /// use async_std::prelude::*; |
| 138 | /// |
| 139 | /// let listener = TcpListener::bind("127.0.0.1:0" ).await?; |
| 140 | /// let mut incoming = listener.incoming(); |
| 141 | /// |
| 142 | /// while let Some(stream) = incoming.next().await { |
| 143 | /// let mut stream = stream?; |
| 144 | /// stream.write_all(b"hello world" ).await?; |
| 145 | /// } |
| 146 | /// # |
| 147 | /// # Ok(()) }) } |
| 148 | /// ``` |
| 149 | pub fn incoming(&self) -> Incoming<'_> { |
| 150 | Incoming { |
| 151 | incoming: Box::pin(self.watcher.incoming()), |
| 152 | } |
| 153 | } |
| 154 | |
| 155 | /// Turn this into a stream over the connections being received on this |
| 156 | /// listener. |
| 157 | /// |
| 158 | /// The returned stream is infinite and will also not yield |
| 159 | /// the peer's [`SocketAddr`] structure. Iterating over it is equivalent to |
| 160 | /// calling [`TcpListener::accept`] in a loop. |
| 161 | /// |
| 162 | /// ## Examples |
| 163 | /// |
| 164 | /// Merge the incoming connections of multiple sockets into one [`Stream`]: |
| 165 | /// |
| 166 | /// ```no_run |
| 167 | /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { |
| 168 | /// # |
| 169 | /// use async_std::net::TcpListener; |
| 170 | /// |
| 171 | /// // Our server listens on multiple ports for some reason |
| 172 | /// let listeners = vec![ |
| 173 | /// TcpListener::bind("[::0]:8080").await?, |
| 174 | /// TcpListener::bind("[::0]:12345").await?, |
| 175 | /// TcpListener::bind("[::0]:5678").await?, |
| 176 | /// ]; |
| 177 | /// // Iterate over all incoming connections |
| 178 | /// let incoming = futures::stream::select_all( |
| 179 | /// listeners.into_iter() |
| 180 | /// .map(TcpListener::into_incoming) |
| 181 | /// .map(Box::pin) |
| 182 | /// ); |
| 183 | /// # |
| 184 | /// # Ok(()) }) } |
| 185 | /// ``` |
| 186 | #[cfg (feature = "unstable" )] |
| 187 | pub fn into_incoming(self) -> impl Stream<Item = io::Result<TcpStream>> + Send { |
| 188 | futures_lite::stream::unfold(self, |listener| async move { |
| 189 | let res = listener.accept().await.map(|(stream, _)| stream); |
| 190 | Some((res, listener)) |
| 191 | }) |
| 192 | } |
| 193 | |
| 194 | /// Returns the local address that this listener is bound to. |
| 195 | /// |
| 196 | /// This can be useful, for example, to identify when binding to port 0 which port was assigned |
| 197 | /// by the OS. |
| 198 | /// |
| 199 | /// # Examples |
| 200 | /// |
| 201 | /// ```no_run |
| 202 | /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { |
| 203 | /// # |
| 204 | /// use async_std::net::TcpListener; |
| 205 | /// |
| 206 | /// let listener = TcpListener::bind("127.0.0.1:8080" ).await?; |
| 207 | /// let addr = listener.local_addr()?; |
| 208 | /// # |
| 209 | /// # Ok(()) }) } |
| 210 | /// ``` |
| 211 | pub fn local_addr(&self) -> io::Result<SocketAddr> { |
| 212 | self.watcher.get_ref().local_addr() |
| 213 | } |
| 214 | } |
| 215 | |
| 216 | /// A stream of incoming TCP connections. |
| 217 | /// |
| 218 | /// This stream is infinite, i.e awaiting the next connection will never result in [`None`]. It is |
| 219 | /// created by the [`incoming`] method on [`TcpListener`]. |
| 220 | /// |
| 221 | /// This type is an async version of [`std::net::Incoming`]. |
| 222 | /// |
| 223 | /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None |
| 224 | /// [`incoming`]: struct.TcpListener.html#method.incoming |
| 225 | /// [`TcpListener`]: struct.TcpListener.html |
| 226 | /// [`std::net::Incoming`]: https://doc.rust-lang.org/std/net/struct.Incoming.html |
| 227 | pub struct Incoming<'a> { |
| 228 | incoming: Pin<Box<dyn Stream<Item = io::Result<Async<StdTcpStream>>> + Send + Sync + 'a>>, |
| 229 | } |
| 230 | |
| 231 | impl Stream for Incoming<'_> { |
| 232 | type Item = io::Result<TcpStream>; |
| 233 | |
| 234 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
| 235 | let res: Option, …>> = ready!(Pin::new(&mut self.incoming).poll_next(cx)); |
| 236 | Poll::Ready(res.map(|res: Result, …>| res.map(|stream: Async| TcpStream { watcher: Arc::new(data:stream) }))) |
| 237 | } |
| 238 | } |
| 239 | |
| 240 | impl fmt::Debug for Incoming<'_> { |
| 241 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 242 | write!(f, "Incoming {{ ... }}" ) |
| 243 | } |
| 244 | } |
| 245 | |
| 246 | impl From<std::net::TcpListener> for TcpListener { |
| 247 | /// Converts a `std::net::TcpListener` into its asynchronous equivalent. |
| 248 | fn from(listener: std::net::TcpListener) -> TcpListener { |
| 249 | TcpListener { |
| 250 | watcher: Async::new(listener).expect(msg:"TcpListener is known to be good" ), |
| 251 | } |
| 252 | } |
| 253 | } |
| 254 | |
| 255 | impl std::convert::TryFrom<TcpListener> for std::net::TcpListener { |
| 256 | type Error = io::Error; |
| 257 | /// Converts a `TcpListener` into its synchronous equivalent. |
| 258 | fn try_from(listener: TcpListener) -> io::Result<std::net::TcpListener> { |
| 259 | let inner: TcpListener = listener.watcher.into_inner()?; |
| 260 | inner.set_nonblocking(false)?; |
| 261 | Ok(inner) |
| 262 | } |
| 263 | } |
| 264 | |
| 265 | cfg_unix! { |
| 266 | use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; |
| 267 | |
| 268 | impl AsRawFd for TcpListener { |
| 269 | fn as_raw_fd(&self) -> RawFd { |
| 270 | self.watcher.get_ref().as_raw_fd() |
| 271 | } |
| 272 | } |
| 273 | |
| 274 | impl FromRawFd for TcpListener { |
| 275 | unsafe fn from_raw_fd(fd: RawFd) -> TcpListener { |
| 276 | std::net::TcpListener::from_raw_fd(fd).into() |
| 277 | } |
| 278 | } |
| 279 | |
| 280 | impl IntoRawFd for TcpListener { |
| 281 | fn into_raw_fd(self) -> RawFd { |
| 282 | self.watcher.into_inner().unwrap().into_raw_fd() |
| 283 | } |
| 284 | } |
| 285 | |
| 286 | cfg_io_safety! { |
| 287 | use crate::os::unix::io::{AsFd, BorrowedFd, OwnedFd}; |
| 288 | |
| 289 | impl AsFd for TcpListener { |
| 290 | fn as_fd(&self) -> BorrowedFd<'_> { |
| 291 | self.watcher.get_ref().as_fd() |
| 292 | } |
| 293 | } |
| 294 | |
| 295 | impl From<OwnedFd> for TcpListener { |
| 296 | fn from(fd: OwnedFd) -> TcpListener { |
| 297 | std::net::TcpListener::from(fd).into() |
| 298 | } |
| 299 | } |
| 300 | |
| 301 | impl From<TcpListener> for OwnedFd { |
| 302 | fn from(listener: TcpListener) -> OwnedFd { |
| 303 | listener.watcher.into_inner().unwrap().into() |
| 304 | } |
| 305 | } |
| 306 | } |
| 307 | } |
| 308 | |
| 309 | cfg_windows! { |
| 310 | use crate::os::windows::io::{ |
| 311 | AsRawSocket, FromRawSocket, IntoRawSocket, RawSocket, |
| 312 | }; |
| 313 | |
| 314 | impl AsRawSocket for TcpListener { |
| 315 | fn as_raw_socket(&self) -> RawSocket { |
| 316 | self.watcher.as_raw_socket() |
| 317 | } |
| 318 | } |
| 319 | |
| 320 | impl FromRawSocket for TcpListener { |
| 321 | unsafe fn from_raw_socket(handle: RawSocket) -> TcpListener { |
| 322 | std::net::TcpListener::from_raw_socket(handle).into() |
| 323 | } |
| 324 | } |
| 325 | |
| 326 | impl IntoRawSocket for TcpListener { |
| 327 | fn into_raw_socket(self) -> RawSocket { |
| 328 | self.watcher.into_inner().unwrap().into_raw_socket() |
| 329 | } |
| 330 | } |
| 331 | |
| 332 | cfg_io_safety! { |
| 333 | use crate::os::windows::io::{AsSocket, BorrowedSocket, OwnedSocket}; |
| 334 | |
| 335 | impl AsSocket for TcpListener { |
| 336 | fn as_socket(&self) -> BorrowedSocket<'_> { |
| 337 | self.watcher.get_ref().as_socket() |
| 338 | } |
| 339 | } |
| 340 | |
| 341 | impl From<OwnedSocket> for TcpListener { |
| 342 | fn from(fd: OwnedSocket) -> TcpListener { |
| 343 | std::net::TcpListener::from(fd).into() |
| 344 | } |
| 345 | } |
| 346 | |
| 347 | impl From<TcpListener> for OwnedSocket { |
| 348 | fn from(listener: TcpListener) -> OwnedSocket { |
| 349 | listener.watcher.into_inner().unwrap().into() |
| 350 | } |
| 351 | } |
| 352 | } |
| 353 | } |
| 354 | |