1 | use std::fmt; |
2 | use std::io::{self, IoSlice, Read as _, Write as _}; |
3 | use std::net::{Shutdown, SocketAddr}; |
4 | #[cfg (unix)] |
5 | use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd}; |
6 | #[cfg (windows)] |
7 | use std::os::windows::io::{AsRawSocket, AsSocket, BorrowedSocket, OwnedSocket, RawSocket}; |
8 | use std::panic::{RefUnwindSafe, UnwindSafe}; |
9 | use std::pin::Pin; |
10 | use std::sync::Arc; |
11 | use std::task::{Context, Poll}; |
12 | |
13 | use async_io::Async; |
14 | use futures_lite::{prelude::*, ready}; |
15 | |
16 | use crate::addr::AsyncToSocketAddrs; |
17 | |
18 | /// A TCP server, listening for connections. |
19 | /// |
20 | /// After creating a [`TcpListener`] by [`bind`][`TcpListener::bind()`]ing it to an address, it |
21 | /// listens for incoming TCP connections. These can be accepted by calling |
22 | /// [`accept()`][`TcpListener::accept()`] or by awaiting items from the stream of |
23 | /// [`incoming`][`TcpListener::incoming()`] connections. |
24 | /// |
25 | /// Cloning a [`TcpListener`] creates another handle to the same socket. The socket will be closed |
26 | /// when all handles to it are dropped. |
27 | /// |
28 | /// The Transmission Control Protocol is specified in [IETF RFC 793]. |
29 | /// |
30 | /// [IETF RFC 793]: https://tools.ietf.org/html/rfc793 |
31 | /// |
32 | /// # Examples |
33 | /// |
34 | /// ```no_run |
35 | /// use async_net::TcpListener; |
36 | /// use futures_lite::prelude::*; |
37 | /// |
38 | /// # futures_lite::future::block_on(async { |
39 | /// let listener = TcpListener::bind("127.0.0.1:8080" ).await?; |
40 | /// let mut incoming = listener.incoming(); |
41 | /// |
42 | /// while let Some(stream) = incoming.next().await { |
43 | /// let mut stream = stream?; |
44 | /// stream.write_all(b"hello" ).await?; |
45 | /// } |
46 | /// # std::io::Result::Ok(()) }); |
47 | /// ``` |
48 | #[derive (Clone, Debug)] |
49 | pub struct TcpListener { |
50 | inner: Arc<Async<std::net::TcpListener>>, |
51 | } |
52 | |
53 | impl TcpListener { |
54 | fn new(inner: Arc<Async<std::net::TcpListener>>) -> TcpListener { |
55 | TcpListener { inner } |
56 | } |
57 | |
58 | /// Creates a new [`TcpListener`] bound to the given address. |
59 | /// |
60 | /// Binding with a port number of 0 will request that the operating system assigns an available |
61 | /// port to this listener. The assigned port can be queried via the |
62 | /// [`local_addr()`][`TcpListener::local_addr()`] method. |
63 | /// |
64 | /// If `addr` yields multiple addresses, binding will be attempted with each of the addresses |
65 | /// until one succeeds and returns the listener. If none of the addresses succeed in creating a |
66 | /// listener, the error from the last attempt is returned. |
67 | /// |
68 | /// # Examples |
69 | /// |
70 | /// Create a TCP listener bound to `127.0.0.1:80`: |
71 | /// |
72 | /// ```no_run |
73 | /// use async_net::TcpListener; |
74 | /// |
75 | /// # futures_lite::future::block_on(async { |
76 | /// let listener = TcpListener::bind("127.0.0.1:80" ).await?; |
77 | /// # std::io::Result::Ok(()) }); |
78 | /// ``` |
79 | /// |
80 | /// Create a TCP listener bound to `127.0.0.1:80`. If that address is unavailable, then try |
81 | /// binding to `127.0.0.1:443`: |
82 | /// |
83 | /// ```no_run |
84 | /// use async_net::{SocketAddr, TcpListener}; |
85 | /// |
86 | /// # futures_lite::future::block_on(async { |
87 | /// let addrs = [ |
88 | /// SocketAddr::from(([127, 0, 0, 1], 80)), |
89 | /// SocketAddr::from(([127, 0, 0, 1], 443)), |
90 | /// ]; |
91 | /// let listener = TcpListener::bind(&addrs[..]).await.unwrap(); |
92 | /// # std::io::Result::Ok(()) }); |
93 | pub async fn bind<A: AsyncToSocketAddrs>(addr: A) -> io::Result<TcpListener> { |
94 | let mut last_err = None; |
95 | |
96 | for addr in addr.to_socket_addrs().await? { |
97 | match Async::<std::net::TcpListener>::bind(addr) { |
98 | Ok(listener) => return Ok(TcpListener::new(Arc::new(listener))), |
99 | Err(err) => last_err = Some(err), |
100 | } |
101 | } |
102 | |
103 | Err(last_err.unwrap_or_else(|| { |
104 | io::Error::new( |
105 | io::ErrorKind::InvalidInput, |
106 | "could not resolve to any of the addresses" , |
107 | ) |
108 | })) |
109 | } |
110 | |
111 | /// Returns the local address this listener is bound to. |
112 | /// |
113 | /// # Examples |
114 | /// |
115 | /// Bind to port 0 and then see which port was assigned by the operating system: |
116 | /// |
117 | /// ```no_run |
118 | /// use async_net::{SocketAddr, TcpListener}; |
119 | /// |
120 | /// # futures_lite::future::block_on(async { |
121 | /// let listener = TcpListener::bind("127.0.0.1:0" ).await?; |
122 | /// println!("Listening on {}" , listener.local_addr()?); |
123 | /// # std::io::Result::Ok(()) }); |
124 | pub fn local_addr(&self) -> io::Result<SocketAddr> { |
125 | self.inner.get_ref().local_addr() |
126 | } |
127 | |
128 | /// Accepts a new incoming connection. |
129 | /// |
130 | /// Returns a TCP stream and the address it is connected to. |
131 | /// |
132 | /// # Examples |
133 | /// |
134 | /// ```no_run |
135 | /// use async_net::TcpListener; |
136 | /// |
137 | /// # futures_lite::future::block_on(async { |
138 | /// let listener = TcpListener::bind("127.0.0.1:8080" ).await?; |
139 | /// let (stream, addr) = listener.accept().await?; |
140 | /// # std::io::Result::Ok(()) }); |
141 | /// ``` |
142 | pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> { |
143 | let (stream, addr) = self.inner.accept().await?; |
144 | Ok((TcpStream::new(Arc::new(stream)), addr)) |
145 | } |
146 | |
147 | /// Returns a stream of incoming connections. |
148 | /// |
149 | /// Iterating over this stream is equivalent to calling [`accept()`][`TcpListener::accept()`] |
150 | /// in a loop. The stream of connections is infinite, i.e awaiting the next connection will |
151 | /// never result in [`None`]. |
152 | /// |
153 | /// # Examples |
154 | /// |
155 | /// ```no_run |
156 | /// use async_net::TcpListener; |
157 | /// use futures_lite::prelude::*; |
158 | /// |
159 | /// # futures_lite::future::block_on(async { |
160 | /// let listener = TcpListener::bind("127.0.0.1:0" ).await?; |
161 | /// let mut incoming = listener.incoming(); |
162 | /// |
163 | /// while let Some(stream) = incoming.next().await { |
164 | /// let mut stream = stream?; |
165 | /// stream.write_all(b"hello" ).await?; |
166 | /// } |
167 | /// # std::io::Result::Ok(()) }); |
168 | /// ``` |
169 | pub fn incoming(&self) -> Incoming<'_> { |
170 | Incoming { |
171 | incoming: Box::pin(self.inner.incoming()), |
172 | } |
173 | } |
174 | |
175 | /// Gets the value of the `IP_TTL` option for this socket. |
176 | /// |
177 | /// This option configures the time-to-live field that is used in every packet sent from this |
178 | /// socket. |
179 | /// |
180 | /// # Examples |
181 | /// |
182 | /// ```no_run |
183 | /// use async_net::TcpListener; |
184 | /// |
185 | /// # futures_lite::future::block_on(async { |
186 | /// let listener = TcpListener::bind("127.0.0.1:80" ).await?; |
187 | /// listener.set_ttl(100)?; |
188 | /// assert_eq!(listener.ttl()?, 100); |
189 | /// # std::io::Result::Ok(()) }); |
190 | /// ``` |
191 | pub fn ttl(&self) -> io::Result<u32> { |
192 | self.inner.get_ref().ttl() |
193 | } |
194 | |
195 | /// Sets the value of the `IP_TTL` option for this socket. |
196 | /// |
197 | /// This option configures the time-to-live field that is used in every packet sent from this |
198 | /// socket. |
199 | /// |
200 | /// # Examples |
201 | /// |
202 | /// ```no_run |
203 | /// use async_net::TcpListener; |
204 | /// |
205 | /// # futures_lite::future::block_on(async { |
206 | /// let listener = TcpListener::bind("127.0.0.1:80" ).await?; |
207 | /// listener.set_ttl(100)?; |
208 | /// # std::io::Result::Ok(()) }); |
209 | /// ``` |
210 | pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { |
211 | self.inner.get_ref().set_ttl(ttl) |
212 | } |
213 | } |
214 | |
215 | impl From<Async<std::net::TcpListener>> for TcpListener { |
216 | fn from(listener: Async<std::net::TcpListener>) -> TcpListener { |
217 | TcpListener::new(inner:Arc::new(data:listener)) |
218 | } |
219 | } |
220 | |
221 | impl TryFrom<std::net::TcpListener> for TcpListener { |
222 | type Error = io::Error; |
223 | |
224 | fn try_from(listener: std::net::TcpListener) -> io::Result<TcpListener> { |
225 | Ok(TcpListener::new(inner:Arc::new(data:Async::new(io:listener)?))) |
226 | } |
227 | } |
228 | |
229 | impl From<TcpListener> for Arc<Async<std::net::TcpListener>> { |
230 | fn from(val: TcpListener) -> Self { |
231 | val.inner |
232 | } |
233 | } |
234 | |
235 | #[cfg (unix)] |
236 | impl AsRawFd for TcpListener { |
237 | fn as_raw_fd(&self) -> RawFd { |
238 | self.inner.as_raw_fd() |
239 | } |
240 | } |
241 | |
242 | #[cfg (unix)] |
243 | impl AsFd for TcpListener { |
244 | fn as_fd(&self) -> BorrowedFd<'_> { |
245 | self.inner.get_ref().as_fd() |
246 | } |
247 | } |
248 | |
249 | #[cfg (unix)] |
250 | impl TryFrom<OwnedFd> for TcpListener { |
251 | type Error = io::Error; |
252 | |
253 | fn try_from(value: OwnedFd) -> Result<Self, Self::Error> { |
254 | Self::try_from(std::net::TcpListener::from(value)) |
255 | } |
256 | } |
257 | |
258 | #[cfg (windows)] |
259 | impl AsRawSocket for TcpListener { |
260 | fn as_raw_socket(&self) -> RawSocket { |
261 | self.inner.as_raw_socket() |
262 | } |
263 | } |
264 | |
265 | #[cfg (windows)] |
266 | impl AsSocket for TcpListener { |
267 | fn as_socket(&self) -> BorrowedSocket<'_> { |
268 | self.inner.get_ref().as_socket() |
269 | } |
270 | } |
271 | |
272 | #[cfg (windows)] |
273 | impl TryFrom<OwnedSocket> for TcpListener { |
274 | type Error = io::Error; |
275 | |
276 | fn try_from(value: OwnedSocket) -> Result<Self, Self::Error> { |
277 | Self::try_from(std::net::TcpListener::from(value)) |
278 | } |
279 | } |
280 | |
281 | /// A stream of incoming TCP connections. |
282 | /// |
283 | /// This stream is infinite, i.e awaiting the next connection will never result in [`None`]. It is |
284 | /// created by the [`TcpListener::incoming()`] method. |
285 | pub struct Incoming<'a> { |
286 | incoming: |
287 | Pin<Box<dyn Stream<Item = io::Result<Async<std::net::TcpStream>>> + Send + Sync + 'a>>, |
288 | } |
289 | |
290 | impl Stream for Incoming<'_> { |
291 | type Item = io::Result<TcpStream>; |
292 | |
293 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
294 | let res: Option, …>> = ready!(Pin::new(&mut self.incoming).poll_next(cx)); |
295 | Poll::Ready(res.map(|res: Result, …>| res.map(|stream: Async| TcpStream::new(inner:Arc::new(data:stream))))) |
296 | } |
297 | } |
298 | |
299 | impl fmt::Debug for Incoming<'_> { |
300 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
301 | write!(f, "Incoming {{ ... }}" ) |
302 | } |
303 | } |
304 | |
305 | /// A TCP connection. |
306 | /// |
307 | /// A [`TcpStream`] can be created by [`connect`][`TcpStream::connect()`]ing to an endpoint or by |
308 | /// [`accept`][`TcpListener::accept()`]ing an incoming connection. |
309 | /// |
310 | /// [`TcpStream`] is a bidirectional stream that implements traits [`AsyncRead`] and |
311 | /// [`AsyncWrite`]. |
312 | /// |
313 | /// Cloning a [`TcpStream`] creates another handle to the same socket. The socket will be closed |
314 | /// when all handles to it are dropped. The reading and writing portions of the connection can also |
315 | /// be shut down individually with the [`shutdown()`][`TcpStream::shutdown()`] method. |
316 | /// |
317 | /// The Transmission Control Protocol is specified in [IETF RFC 793]. |
318 | /// |
319 | /// [IETF RFC 793]: https://tools.ietf.org/html/rfc793 |
320 | /// |
321 | /// # Examples |
322 | /// |
323 | /// ```no_run |
324 | /// use async_net::TcpStream; |
325 | /// use futures_lite::prelude::*; |
326 | /// |
327 | /// # futures_lite::future::block_on(async { |
328 | /// let mut stream = TcpStream::connect("127.0.0.1:8080" ).await?; |
329 | /// stream.write_all(b"hello" ).await?; |
330 | /// |
331 | /// let mut buf = vec![0u8; 1024]; |
332 | /// let n = stream.read(&mut buf).await?; |
333 | /// # std::io::Result::Ok(()) }); |
334 | /// ``` |
335 | pub struct TcpStream { |
336 | inner: Arc<Async<std::net::TcpStream>>, |
337 | readable: Option<async_io::ReadableOwned<std::net::TcpStream>>, |
338 | writable: Option<async_io::WritableOwned<std::net::TcpStream>>, |
339 | } |
340 | |
341 | impl UnwindSafe for TcpStream {} |
342 | impl RefUnwindSafe for TcpStream {} |
343 | |
344 | impl TcpStream { |
345 | fn new(inner: Arc<Async<std::net::TcpStream>>) -> TcpStream { |
346 | TcpStream { |
347 | inner, |
348 | readable: None, |
349 | writable: None, |
350 | } |
351 | } |
352 | |
353 | /// Creates a TCP connection to the specified address. |
354 | /// |
355 | /// This method will create a new TCP socket and attempt to connect it to the provided `addr`, |
356 | /// |
357 | /// If `addr` yields multiple addresses, connecting will be attempted with each of the |
358 | /// addresses until connecting to one succeeds. If none of the addresses result in a successful |
359 | /// connection, the error from the last connect attempt is returned. |
360 | /// |
361 | /// # Examples |
362 | /// |
363 | /// Connect to `example.com:80`: |
364 | /// |
365 | /// ``` |
366 | /// use async_net::TcpStream; |
367 | /// |
368 | /// # futures_lite::future::block_on(async { |
369 | /// let stream = TcpStream::connect("example.com:80" ).await?; |
370 | /// # std::io::Result::Ok(()) }); |
371 | /// ``` |
372 | /// |
373 | /// Connect to `127.0.0.1:8080`. If that fails, then try connecting to `127.0.0.1:8081`: |
374 | /// |
375 | /// ```no_run |
376 | /// use async_net::{SocketAddr, TcpStream}; |
377 | /// |
378 | /// # futures_lite::future::block_on(async { |
379 | /// let addrs = [ |
380 | /// SocketAddr::from(([127, 0, 0, 1], 8080)), |
381 | /// SocketAddr::from(([127, 0, 0, 1], 8081)), |
382 | /// ]; |
383 | /// let stream = TcpStream::connect(&addrs[..]).await?; |
384 | /// # std::io::Result::Ok(()) }); |
385 | /// ``` |
386 | pub async fn connect<A: AsyncToSocketAddrs>(addr: A) -> io::Result<TcpStream> { |
387 | let mut last_err = None; |
388 | |
389 | for addr in addr.to_socket_addrs().await? { |
390 | match Async::<std::net::TcpStream>::connect(addr).await { |
391 | Ok(stream) => return Ok(TcpStream::new(Arc::new(stream))), |
392 | Err(e) => last_err = Some(e), |
393 | } |
394 | } |
395 | |
396 | Err(last_err.unwrap_or_else(|| { |
397 | io::Error::new( |
398 | io::ErrorKind::InvalidInput, |
399 | "could not connect to any of the addresses" , |
400 | ) |
401 | })) |
402 | } |
403 | |
404 | /// Returns the local address this stream is bound to. |
405 | /// |
406 | /// # Examples |
407 | /// |
408 | /// ``` |
409 | /// use async_net::TcpStream; |
410 | /// |
411 | /// # futures_lite::future::block_on(async { |
412 | /// let stream = TcpStream::connect("example.com:80" ).await?; |
413 | /// println!("Local address is {}" , stream.local_addr()?); |
414 | /// # std::io::Result::Ok(()) }); |
415 | /// ``` |
416 | pub fn local_addr(&self) -> io::Result<SocketAddr> { |
417 | self.inner.get_ref().local_addr() |
418 | } |
419 | |
420 | /// Returns the remote address this stream is connected to. |
421 | /// |
422 | /// # Examples |
423 | /// |
424 | /// ``` |
425 | /// use async_net::TcpStream; |
426 | /// |
427 | /// # futures_lite::future::block_on(async { |
428 | /// let stream = TcpStream::connect("example.com:80" ).await?; |
429 | /// println!("Connected to {}" , stream.peer_addr()?); |
430 | /// # std::io::Result::Ok(()) }); |
431 | /// ``` |
432 | pub fn peer_addr(&self) -> io::Result<SocketAddr> { |
433 | self.inner.get_ref().peer_addr() |
434 | } |
435 | |
436 | /// Shuts down the read half, write half, or both halves of this connection. |
437 | /// |
438 | /// This method will cause all pending and future I/O in the given directions to return |
439 | /// immediately with an appropriate value (see the documentation of [`Shutdown`]). |
440 | /// |
441 | /// [`Shutdown`]: https://doc.rust-lang.org/std/net/enum.Shutdown.html |
442 | /// |
443 | /// # Examples |
444 | /// |
445 | /// ```no_run |
446 | /// use async_net::{Shutdown, TcpStream}; |
447 | /// |
448 | /// # futures_lite::future::block_on(async { |
449 | /// let stream = TcpStream::connect("127.0.0.1:8080" ).await?; |
450 | /// stream.shutdown(Shutdown::Both)?; |
451 | /// # std::io::Result::Ok(()) }); |
452 | /// ``` |
453 | pub fn shutdown(&self, how: std::net::Shutdown) -> std::io::Result<()> { |
454 | self.inner.get_ref().shutdown(how) |
455 | } |
456 | |
457 | /// Receives data without removing it from the queue. |
458 | /// |
459 | /// On success, returns the number of bytes peeked. |
460 | /// |
461 | /// Successive calls return the same data. This is accomplished by passing `MSG_PEEK` as a flag |
462 | /// to the underlying `recv` system call. |
463 | /// |
464 | /// # Examples |
465 | /// |
466 | /// ```no_run |
467 | /// use async_net::TcpStream; |
468 | /// |
469 | /// # futures_lite::future::block_on(async { |
470 | /// let stream = TcpStream::connect("127.0.0.1:8080" ).await?; |
471 | /// |
472 | /// let mut buf = vec![0; 1024]; |
473 | /// let n = stream.peek(&mut buf).await?; |
474 | /// # std::io::Result::Ok(()) }); |
475 | /// ``` |
476 | pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> { |
477 | self.inner.peek(buf).await |
478 | } |
479 | |
480 | /// Gets the value of the `TCP_NODELAY` option for this socket. |
481 | /// |
482 | /// If set to `true`, this option disables the [Nagle algorithm][nagle-wiki]. This means that |
483 | /// written data is always sent as soon as possible, even if there is only a small amount of |
484 | /// it. |
485 | /// |
486 | /// When set to `false`, written data is buffered until there is a certain amount to send out, |
487 | /// thereby avoiding the frequent sending of small packets. |
488 | /// |
489 | /// [nagle-wiki]: https://en.wikipedia.org/wiki/Nagle%27s_algorithm |
490 | /// |
491 | /// # Examples |
492 | /// |
493 | /// ```no_run |
494 | /// use async_net::TcpStream; |
495 | /// |
496 | /// # futures_lite::future::block_on(async { |
497 | /// let stream = TcpStream::connect("127.0.0.1:8080" ).await?; |
498 | /// println!("TCP_NODELAY is set to {}" , stream.nodelay()?); |
499 | /// # std::io::Result::Ok(()) }); |
500 | /// ``` |
501 | pub fn nodelay(&self) -> io::Result<bool> { |
502 | self.inner.get_ref().nodelay() |
503 | } |
504 | |
505 | /// Sets the value of the `TCP_NODELAY` option for this socket. |
506 | /// |
507 | /// If set to `true`, this option disables the [Nagle algorithm][nagle-wiki]. This means that |
508 | /// written data is always sent as soon as possible, even if there is only a small amount of |
509 | /// it. |
510 | /// |
511 | /// When set to `false`, written data is buffered until there is a certain amount to send out, |
512 | /// thereby avoiding the frequent sending of small packets. |
513 | /// |
514 | /// [nagle-wiki]: https://en.wikipedia.org/wiki/Nagle%27s_algorithm |
515 | /// |
516 | /// # Examples |
517 | /// |
518 | /// ```no_run |
519 | /// use async_net::TcpStream; |
520 | /// |
521 | /// # futures_lite::future::block_on(async { |
522 | /// let stream = TcpStream::connect("127.0.0.1:8080" ).await?; |
523 | /// stream.set_nodelay(false)?; |
524 | /// # std::io::Result::Ok(()) }); |
525 | /// ``` |
526 | pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> { |
527 | self.inner.get_ref().set_nodelay(nodelay) |
528 | } |
529 | |
530 | /// Gets the value of the `IP_TTL` option for this socket. |
531 | /// |
532 | /// This option configures the time-to-live field that is used in every packet sent from this |
533 | /// socket. |
534 | /// |
535 | /// # Examples |
536 | /// |
537 | /// ```no_run |
538 | /// use async_net::TcpStream; |
539 | /// |
540 | /// # futures_lite::future::block_on(async { |
541 | /// let stream = TcpStream::connect("127.0.0.1:8080" ).await?; |
542 | /// println!("IP_TTL is set to {}" , stream.ttl()?); |
543 | /// # std::io::Result::Ok(()) }); |
544 | /// ``` |
545 | pub fn ttl(&self) -> io::Result<u32> { |
546 | self.inner.get_ref().ttl() |
547 | } |
548 | |
549 | /// Sets the value of the `IP_TTL` option for this socket. |
550 | /// |
551 | /// This option configures the time-to-live field that is used in every packet sent from this |
552 | /// socket. |
553 | /// |
554 | /// # Examples |
555 | /// |
556 | /// ```no_run |
557 | /// use async_net::TcpStream; |
558 | /// |
559 | /// # futures_lite::future::block_on(async { |
560 | /// let stream = TcpStream::connect("127.0.0.1:8080" ).await?; |
561 | /// stream.set_ttl(100)?; |
562 | /// # std::io::Result::Ok(()) }); |
563 | /// ``` |
564 | pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { |
565 | self.inner.get_ref().set_ttl(ttl) |
566 | } |
567 | } |
568 | |
569 | impl fmt::Debug for TcpStream { |
570 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
571 | self.inner.fmt(f) |
572 | } |
573 | } |
574 | |
575 | impl Clone for TcpStream { |
576 | fn clone(&self) -> TcpStream { |
577 | TcpStream::new(self.inner.clone()) |
578 | } |
579 | } |
580 | |
581 | impl From<Async<std::net::TcpStream>> for TcpStream { |
582 | fn from(stream: Async<std::net::TcpStream>) -> TcpStream { |
583 | TcpStream::new(inner:Arc::new(data:stream)) |
584 | } |
585 | } |
586 | |
587 | impl From<TcpStream> for Arc<Async<std::net::TcpStream>> { |
588 | fn from(val: TcpStream) -> Self { |
589 | val.inner |
590 | } |
591 | } |
592 | |
593 | impl TryFrom<std::net::TcpStream> for TcpStream { |
594 | type Error = io::Error; |
595 | |
596 | fn try_from(stream: std::net::TcpStream) -> io::Result<TcpStream> { |
597 | Ok(TcpStream::new(inner:Arc::new(data:Async::new(io:stream)?))) |
598 | } |
599 | } |
600 | |
601 | #[cfg (unix)] |
602 | impl AsRawFd for TcpStream { |
603 | fn as_raw_fd(&self) -> RawFd { |
604 | self.inner.as_raw_fd() |
605 | } |
606 | } |
607 | |
608 | #[cfg (unix)] |
609 | impl AsFd for TcpStream { |
610 | fn as_fd(&self) -> BorrowedFd<'_> { |
611 | self.inner.get_ref().as_fd() |
612 | } |
613 | } |
614 | |
615 | #[cfg (unix)] |
616 | impl TryFrom<OwnedFd> for TcpStream { |
617 | type Error = io::Error; |
618 | |
619 | fn try_from(value: OwnedFd) -> Result<Self, Self::Error> { |
620 | Self::try_from(std::net::TcpStream::from(value)) |
621 | } |
622 | } |
623 | |
624 | #[cfg (windows)] |
625 | impl AsRawSocket for TcpStream { |
626 | fn as_raw_socket(&self) -> RawSocket { |
627 | self.inner.as_raw_socket() |
628 | } |
629 | } |
630 | |
631 | #[cfg (windows)] |
632 | impl AsSocket for TcpStream { |
633 | fn as_socket(&self) -> BorrowedSocket<'_> { |
634 | self.inner.get_ref().as_socket() |
635 | } |
636 | } |
637 | |
638 | #[cfg (windows)] |
639 | impl TryFrom<OwnedSocket> for TcpStream { |
640 | type Error = io::Error; |
641 | |
642 | fn try_from(value: OwnedSocket) -> Result<Self, Self::Error> { |
643 | Self::try_from(std::net::TcpStream::from(value)) |
644 | } |
645 | } |
646 | |
647 | impl AsyncRead for TcpStream { |
648 | fn poll_read( |
649 | mut self: Pin<&mut Self>, |
650 | cx: &mut Context<'_>, |
651 | buf: &mut [u8], |
652 | ) -> Poll<io::Result<usize>> { |
653 | loop { |
654 | // Attempt the non-blocking operation. |
655 | match self.inner.get_ref().read(buf) { |
656 | Err(err) if err.kind() == io::ErrorKind::WouldBlock => {} |
657 | res => { |
658 | self.readable = None; |
659 | return Poll::Ready(res); |
660 | } |
661 | } |
662 | |
663 | // Initialize the future to wait for readiness. |
664 | if self.readable.is_none() { |
665 | self.readable = Some(self.inner.clone().readable_owned()); |
666 | } |
667 | |
668 | // Poll the future for readiness. |
669 | if let Some(f) = &mut self.readable { |
670 | let res = ready!(Pin::new(f).poll(cx)); |
671 | self.readable = None; |
672 | res?; |
673 | } |
674 | } |
675 | } |
676 | } |
677 | |
678 | impl AsyncWrite for TcpStream { |
679 | fn poll_write( |
680 | mut self: Pin<&mut Self>, |
681 | cx: &mut Context<'_>, |
682 | buf: &[u8], |
683 | ) -> Poll<io::Result<usize>> { |
684 | loop { |
685 | // Attempt the non-blocking operation. |
686 | match self.inner.get_ref().write(buf) { |
687 | Err(err) if err.kind() == io::ErrorKind::WouldBlock => {} |
688 | res => { |
689 | self.writable = None; |
690 | return Poll::Ready(res); |
691 | } |
692 | } |
693 | |
694 | // Initialize the future to wait for readiness. |
695 | if self.writable.is_none() { |
696 | self.writable = Some(self.inner.clone().writable_owned()); |
697 | } |
698 | |
699 | // Poll the future for readiness. |
700 | if let Some(f) = &mut self.writable { |
701 | let res = ready!(Pin::new(f).poll(cx)); |
702 | self.writable = None; |
703 | res?; |
704 | } |
705 | } |
706 | } |
707 | |
708 | fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
709 | loop { |
710 | // Attempt the non-blocking operation. |
711 | match self.inner.get_ref().flush() { |
712 | Err(err) if err.kind() == io::ErrorKind::WouldBlock => {} |
713 | res => { |
714 | self.writable = None; |
715 | return Poll::Ready(res); |
716 | } |
717 | } |
718 | |
719 | // Initialize the future to wait for readiness. |
720 | if self.writable.is_none() { |
721 | self.writable = Some(self.inner.clone().writable_owned()); |
722 | } |
723 | |
724 | // Poll the future for readiness. |
725 | if let Some(f) = &mut self.writable { |
726 | let res = ready!(Pin::new(f).poll(cx)); |
727 | self.writable = None; |
728 | res?; |
729 | } |
730 | } |
731 | } |
732 | |
733 | fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { |
734 | Poll::Ready(self.inner.get_ref().shutdown(Shutdown::Write)) |
735 | } |
736 | |
737 | fn poll_write_vectored( |
738 | mut self: Pin<&mut Self>, |
739 | cx: &mut Context<'_>, |
740 | bufs: &[IoSlice<'_>], |
741 | ) -> Poll<io::Result<usize>> { |
742 | loop { |
743 | // Attempt the non-blocking operation. |
744 | match self.inner.get_ref().write_vectored(bufs) { |
745 | Err(err) if err.kind() == io::ErrorKind::WouldBlock => {} |
746 | res => { |
747 | self.writable = None; |
748 | return Poll::Ready(res); |
749 | } |
750 | } |
751 | |
752 | // Initialize the future to wait for readiness. |
753 | if self.writable.is_none() { |
754 | self.writable = Some(self.inner.clone().writable_owned()); |
755 | } |
756 | |
757 | // Poll the future for readiness. |
758 | if let Some(f) = &mut self.writable { |
759 | let res = ready!(Pin::new(f).poll(cx)); |
760 | self.writable = None; |
761 | res?; |
762 | } |
763 | } |
764 | } |
765 | } |
766 | |