1cfg_not_wasi! {
2 use crate::future::poll_fn;
3 use crate::net::{to_socket_addrs, ToSocketAddrs};
4 use std::time::Duration;
5}
6
7use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf, Ready};
8use crate::net::tcp::split::{split, ReadHalf, WriteHalf};
9use crate::net::tcp::split_owned::{split_owned, OwnedReadHalf, OwnedWriteHalf};
10
11use std::fmt;
12use std::io;
13use std::net::{Shutdown, SocketAddr};
14use std::pin::Pin;
15use std::task::{Context, Poll};
16
17cfg_io_util! {
18 use bytes::BufMut;
19}
20
21cfg_net! {
22 /// A TCP stream between a local and a remote socket.
23 ///
24 /// A TCP stream can either be created by connecting to an endpoint, via the
25 /// [`connect`] method, or by [accepting] a connection from a [listener]. A
26 /// TCP stream can also be created via the [`TcpSocket`] type.
27 ///
28 /// Reading and writing to a `TcpStream` is usually done using the
29 /// convenience methods found on the [`AsyncReadExt`] and [`AsyncWriteExt`]
30 /// traits.
31 ///
32 /// [`connect`]: method@TcpStream::connect
33 /// [accepting]: method@crate::net::TcpListener::accept
34 /// [listener]: struct@crate::net::TcpListener
35 /// [`TcpSocket`]: struct@crate::net::TcpSocket
36 /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
37 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
38 ///
39 /// # Examples
40 ///
41 /// ```no_run
42 /// use tokio::net::TcpStream;
43 /// use tokio::io::AsyncWriteExt;
44 /// use std::error::Error;
45 ///
46 /// #[tokio::main]
47 /// async fn main() -> Result<(), Box<dyn Error>> {
48 /// // Connect to a peer
49 /// let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
50 ///
51 /// // Write some data.
52 /// stream.write_all(b"hello world!").await?;
53 ///
54 /// Ok(())
55 /// }
56 /// ```
57 ///
58 /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
59 ///
60 /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
61 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
62 ///
63 /// To shut down the stream in the write direction, you can call the
64 /// [`shutdown()`] method. This will cause the other peer to receive a read of
65 /// length 0, indicating that no more data will be sent. This only closes
66 /// the stream in one direction.
67 ///
68 /// [`shutdown()`]: fn@crate::io::AsyncWriteExt::shutdown
69 pub struct TcpStream {
70 io: PollEvented<mio::net::TcpStream>,
71 }
72}
73
74impl TcpStream {
75 cfg_not_wasi! {
76 /// Opens a TCP connection to a remote host.
77 ///
78 /// `addr` is an address of the remote host. Anything which implements the
79 /// [`ToSocketAddrs`] trait can be supplied as the address. If `addr`
80 /// yields multiple addresses, connect will be attempted with each of the
81 /// addresses until a connection is successful. If none of the addresses
82 /// result in a successful connection, the error returned from the last
83 /// connection attempt (the last address) is returned.
84 ///
85 /// To configure the socket before connecting, you can use the [`TcpSocket`]
86 /// type.
87 ///
88 /// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs
89 /// [`TcpSocket`]: struct@crate::net::TcpSocket
90 ///
91 /// # Examples
92 ///
93 /// ```no_run
94 /// use tokio::net::TcpStream;
95 /// use tokio::io::AsyncWriteExt;
96 /// use std::error::Error;
97 ///
98 /// #[tokio::main]
99 /// async fn main() -> Result<(), Box<dyn Error>> {
100 /// // Connect to a peer
101 /// let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
102 ///
103 /// // Write some data.
104 /// stream.write_all(b"hello world!").await?;
105 ///
106 /// Ok(())
107 /// }
108 /// ```
109 ///
110 /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
111 ///
112 /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
113 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
114 pub async fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<TcpStream> {
115 let addrs = to_socket_addrs(addr).await?;
116
117 let mut last_err = None;
118
119 for addr in addrs {
120 match TcpStream::connect_addr(addr).await {
121 Ok(stream) => return Ok(stream),
122 Err(e) => last_err = Some(e),
123 }
124 }
125
126 Err(last_err.unwrap_or_else(|| {
127 io::Error::new(
128 io::ErrorKind::InvalidInput,
129 "could not resolve to any address",
130 )
131 }))
132 }
133
134 /// Establishes a connection to the specified `addr`.
135 async fn connect_addr(addr: SocketAddr) -> io::Result<TcpStream> {
136 let sys = mio::net::TcpStream::connect(addr)?;
137 TcpStream::connect_mio(sys).await
138 }
139
140 pub(crate) async fn connect_mio(sys: mio::net::TcpStream) -> io::Result<TcpStream> {
141 let stream = TcpStream::new(sys)?;
142
143 // Once we've connected, wait for the stream to be writable as
144 // that's when the actual connection has been initiated. Once we're
145 // writable we check for `take_socket_error` to see if the connect
146 // actually hit an error or not.
147 //
148 // If all that succeeded then we ship everything on up.
149 poll_fn(|cx| stream.io.registration().poll_write_ready(cx)).await?;
150
151 if let Some(e) = stream.io.take_error()? {
152 return Err(e);
153 }
154
155 Ok(stream)
156 }
157 }
158
159 pub(crate) fn new(connected: mio::net::TcpStream) -> io::Result<TcpStream> {
160 let io = PollEvented::new(connected)?;
161 Ok(TcpStream { io })
162 }
163
164 /// Creates new `TcpStream` from a `std::net::TcpStream`.
165 ///
166 /// This function is intended to be used to wrap a TCP stream from the
167 /// standard library in the Tokio equivalent.
168 ///
169 /// # Notes
170 ///
171 /// The caller is responsible for ensuring that the stream is in
172 /// non-blocking mode. Otherwise all I/O operations on the stream
173 /// will block the thread, which will cause unexpected behavior.
174 /// Non-blocking mode can be set using [`set_nonblocking`].
175 ///
176 /// [`set_nonblocking`]: std::net::TcpStream::set_nonblocking
177 ///
178 /// # Examples
179 ///
180 /// ```rust,no_run
181 /// use std::error::Error;
182 /// use tokio::net::TcpStream;
183 ///
184 /// #[tokio::main]
185 /// async fn main() -> Result<(), Box<dyn Error>> {
186 /// let std_stream = std::net::TcpStream::connect("127.0.0.1:34254")?;
187 /// std_stream.set_nonblocking(true)?;
188 /// let stream = TcpStream::from_std(std_stream)?;
189 /// Ok(())
190 /// }
191 /// ```
192 ///
193 /// # Panics
194 ///
195 /// This function panics if it is not called from within a runtime with
196 /// IO enabled.
197 ///
198 /// The runtime is usually set implicitly when this function is called
199 /// from a future driven by a tokio runtime, otherwise runtime can be set
200 /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
201 #[track_caller]
202 pub fn from_std(stream: std::net::TcpStream) -> io::Result<TcpStream> {
203 let io = mio::net::TcpStream::from_std(stream);
204 let io = PollEvented::new(io)?;
205 Ok(TcpStream { io })
206 }
207
208 /// Turns a [`tokio::net::TcpStream`] into a [`std::net::TcpStream`].
209 ///
210 /// The returned [`std::net::TcpStream`] will have nonblocking mode set as `true`.
211 /// Use [`set_nonblocking`] to change the blocking mode if needed.
212 ///
213 /// # Examples
214 ///
215 /// ```
216 /// use std::error::Error;
217 /// use std::io::Read;
218 /// use tokio::net::TcpListener;
219 /// # use tokio::net::TcpStream;
220 /// # use tokio::io::AsyncWriteExt;
221 ///
222 /// #[tokio::main]
223 /// async fn main() -> Result<(), Box<dyn Error>> {
224 /// let mut data = [0u8; 12];
225 /// # if false {
226 /// let listener = TcpListener::bind("127.0.0.1:34254").await?;
227 /// # }
228 /// # let listener = TcpListener::bind("127.0.0.1:0").await?;
229 /// # let addr = listener.local_addr().unwrap();
230 /// # let handle = tokio::spawn(async move {
231 /// # let mut stream: TcpStream = TcpStream::connect(addr).await.unwrap();
232 /// # stream.write_all(b"Hello world!").await.unwrap();
233 /// # });
234 /// let (tokio_tcp_stream, _) = listener.accept().await?;
235 /// let mut std_tcp_stream = tokio_tcp_stream.into_std()?;
236 /// # handle.await.expect("The task being joined has panicked");
237 /// std_tcp_stream.set_nonblocking(false)?;
238 /// std_tcp_stream.read_exact(&mut data)?;
239 /// # assert_eq!(b"Hello world!", &data);
240 /// Ok(())
241 /// }
242 /// ```
243 /// [`tokio::net::TcpStream`]: TcpStream
244 /// [`std::net::TcpStream`]: std::net::TcpStream
245 /// [`set_nonblocking`]: fn@std::net::TcpStream::set_nonblocking
246 pub fn into_std(self) -> io::Result<std::net::TcpStream> {
247 #[cfg(unix)]
248 {
249 use std::os::unix::io::{FromRawFd, IntoRawFd};
250 self.io
251 .into_inner()
252 .map(IntoRawFd::into_raw_fd)
253 .map(|raw_fd| unsafe { std::net::TcpStream::from_raw_fd(raw_fd) })
254 }
255
256 #[cfg(windows)]
257 {
258 use std::os::windows::io::{FromRawSocket, IntoRawSocket};
259 self.io
260 .into_inner()
261 .map(|io| io.into_raw_socket())
262 .map(|raw_socket| unsafe { std::net::TcpStream::from_raw_socket(raw_socket) })
263 }
264
265 #[cfg(target_os = "wasi")]
266 {
267 use std::os::wasi::io::{FromRawFd, IntoRawFd};
268 self.io
269 .into_inner()
270 .map(|io| io.into_raw_fd())
271 .map(|raw_fd| unsafe { std::net::TcpStream::from_raw_fd(raw_fd) })
272 }
273 }
274
275 /// Returns the local address that this stream is bound to.
276 ///
277 /// # Examples
278 ///
279 /// ```no_run
280 /// use tokio::net::TcpStream;
281 ///
282 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
283 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
284 ///
285 /// println!("{:?}", stream.local_addr()?);
286 /// # Ok(())
287 /// # }
288 /// ```
289 pub fn local_addr(&self) -> io::Result<SocketAddr> {
290 self.io.local_addr()
291 }
292
293 /// Returns the value of the `SO_ERROR` option.
294 pub fn take_error(&self) -> io::Result<Option<io::Error>> {
295 self.io.take_error()
296 }
297
298 /// Returns the remote address that this stream is connected to.
299 ///
300 /// # Examples
301 ///
302 /// ```no_run
303 /// use tokio::net::TcpStream;
304 ///
305 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
306 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
307 ///
308 /// println!("{:?}", stream.peer_addr()?);
309 /// # Ok(())
310 /// # }
311 /// ```
312 pub fn peer_addr(&self) -> io::Result<SocketAddr> {
313 self.io.peer_addr()
314 }
315
316 /// Attempts to receive data on the socket, without removing that data from
317 /// the queue, registering the current task for wakeup if data is not yet
318 /// available.
319 ///
320 /// Note that on multiple calls to `poll_peek`, `poll_read` or
321 /// `poll_read_ready`, only the `Waker` from the `Context` passed to the
322 /// most recent call is scheduled to receive a wakeup. (However,
323 /// `poll_write` retains a second, independent waker.)
324 ///
325 /// # Return value
326 ///
327 /// The function returns:
328 ///
329 /// * `Poll::Pending` if data is not yet available.
330 /// * `Poll::Ready(Ok(n))` if data is available. `n` is the number of bytes peeked.
331 /// * `Poll::Ready(Err(e))` if an error is encountered.
332 ///
333 /// # Errors
334 ///
335 /// This function may encounter any standard I/O error except `WouldBlock`.
336 ///
337 /// # Examples
338 ///
339 /// ```no_run
340 /// use tokio::io::{self, ReadBuf};
341 /// use tokio::net::TcpStream;
342 ///
343 /// use futures::future::poll_fn;
344 ///
345 /// #[tokio::main]
346 /// async fn main() -> io::Result<()> {
347 /// let stream = TcpStream::connect("127.0.0.1:8000").await?;
348 /// let mut buf = [0; 10];
349 /// let mut buf = ReadBuf::new(&mut buf);
350 ///
351 /// poll_fn(|cx| {
352 /// stream.poll_peek(cx, &mut buf)
353 /// }).await?;
354 ///
355 /// Ok(())
356 /// }
357 /// ```
358 pub fn poll_peek(
359 &self,
360 cx: &mut Context<'_>,
361 buf: &mut ReadBuf<'_>,
362 ) -> Poll<io::Result<usize>> {
363 loop {
364 let ev = ready!(self.io.registration().poll_read_ready(cx))?;
365
366 let b = unsafe {
367 &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
368 };
369
370 match self.io.peek(b) {
371 Ok(ret) => {
372 unsafe { buf.assume_init(ret) };
373 buf.advance(ret);
374 return Poll::Ready(Ok(ret));
375 }
376 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
377 self.io.registration().clear_readiness(ev);
378 }
379 Err(e) => return Poll::Ready(Err(e)),
380 }
381 }
382 }
383
384 /// Waits for any of the requested ready states.
385 ///
386 /// This function is usually paired with `try_read()` or `try_write()`. It
387 /// can be used to concurrently read / write to the same socket on a single
388 /// task without splitting the socket.
389 ///
390 /// The function may complete without the socket being ready. This is a
391 /// false-positive and attempting an operation will return with
392 /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
393 /// [`Ready`] set, so you should always check the returned value and possibly
394 /// wait again if the requested states are not set.
395 ///
396 /// # Cancel safety
397 ///
398 /// This method is cancel safe. Once a readiness event occurs, the method
399 /// will continue to return immediately until the readiness event is
400 /// consumed by an attempt to read or write that fails with `WouldBlock` or
401 /// `Poll::Pending`.
402 ///
403 /// # Examples
404 ///
405 /// Concurrently read and write to the stream on the same task without
406 /// splitting.
407 ///
408 /// ```no_run
409 /// use tokio::io::Interest;
410 /// use tokio::net::TcpStream;
411 /// use std::error::Error;
412 /// use std::io;
413 ///
414 /// #[tokio::main]
415 /// async fn main() -> Result<(), Box<dyn Error>> {
416 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
417 ///
418 /// loop {
419 /// let ready = stream.ready(Interest::READABLE | Interest::WRITABLE).await?;
420 ///
421 /// if ready.is_readable() {
422 /// let mut data = vec![0; 1024];
423 /// // Try to read data, this may still fail with `WouldBlock`
424 /// // if the readiness event is a false positive.
425 /// match stream.try_read(&mut data) {
426 /// Ok(n) => {
427 /// println!("read {} bytes", n);
428 /// }
429 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
430 /// continue;
431 /// }
432 /// Err(e) => {
433 /// return Err(e.into());
434 /// }
435 /// }
436 ///
437 /// }
438 ///
439 /// if ready.is_writable() {
440 /// // Try to write data, this may still fail with `WouldBlock`
441 /// // if the readiness event is a false positive.
442 /// match stream.try_write(b"hello world") {
443 /// Ok(n) => {
444 /// println!("write {} bytes", n);
445 /// }
446 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
447 /// continue
448 /// }
449 /// Err(e) => {
450 /// return Err(e.into());
451 /// }
452 /// }
453 /// }
454 /// }
455 /// }
456 /// ```
457 pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
458 let event = self.io.registration().readiness(interest).await?;
459 Ok(event.ready)
460 }
461
462 /// Waits for the socket to become readable.
463 ///
464 /// This function is equivalent to `ready(Interest::READABLE)` and is usually
465 /// paired with `try_read()`.
466 ///
467 /// # Cancel safety
468 ///
469 /// This method is cancel safe. Once a readiness event occurs, the method
470 /// will continue to return immediately until the readiness event is
471 /// consumed by an attempt to read that fails with `WouldBlock` or
472 /// `Poll::Pending`.
473 ///
474 /// # Examples
475 ///
476 /// ```no_run
477 /// use tokio::net::TcpStream;
478 /// use std::error::Error;
479 /// use std::io;
480 ///
481 /// #[tokio::main]
482 /// async fn main() -> Result<(), Box<dyn Error>> {
483 /// // Connect to a peer
484 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
485 ///
486 /// let mut msg = vec![0; 1024];
487 ///
488 /// loop {
489 /// // Wait for the socket to be readable
490 /// stream.readable().await?;
491 ///
492 /// // Try to read data, this may still fail with `WouldBlock`
493 /// // if the readiness event is a false positive.
494 /// match stream.try_read(&mut msg) {
495 /// Ok(n) => {
496 /// msg.truncate(n);
497 /// break;
498 /// }
499 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
500 /// continue;
501 /// }
502 /// Err(e) => {
503 /// return Err(e.into());
504 /// }
505 /// }
506 /// }
507 ///
508 /// println!("GOT = {:?}", msg);
509 /// Ok(())
510 /// }
511 /// ```
512 pub async fn readable(&self) -> io::Result<()> {
513 self.ready(Interest::READABLE).await?;
514 Ok(())
515 }
516
517 /// Polls for read readiness.
518 ///
519 /// If the tcp stream is not currently ready for reading, this method will
520 /// store a clone of the `Waker` from the provided `Context`. When the tcp
521 /// stream becomes ready for reading, `Waker::wake` will be called on the
522 /// waker.
523 ///
524 /// Note that on multiple calls to `poll_read_ready`, `poll_read` or
525 /// `poll_peek`, only the `Waker` from the `Context` passed to the most
526 /// recent call is scheduled to receive a wakeup. (However,
527 /// `poll_write_ready` retains a second, independent waker.)
528 ///
529 /// This function is intended for cases where creating and pinning a future
530 /// via [`readable`] is not feasible. Where possible, using [`readable`] is
531 /// preferred, as this supports polling from multiple tasks at once.
532 ///
533 /// # Return value
534 ///
535 /// The function returns:
536 ///
537 /// * `Poll::Pending` if the tcp stream is not ready for reading.
538 /// * `Poll::Ready(Ok(()))` if the tcp stream is ready for reading.
539 /// * `Poll::Ready(Err(e))` if an error is encountered.
540 ///
541 /// # Errors
542 ///
543 /// This function may encounter any standard I/O error except `WouldBlock`.
544 ///
545 /// [`readable`]: method@Self::readable
546 pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
547 self.io.registration().poll_read_ready(cx).map_ok(|_| ())
548 }
549
550 /// Tries to read data from the stream into the provided buffer, returning how
551 /// many bytes were read.
552 ///
553 /// Receives any pending data from the socket but does not wait for new data
554 /// to arrive. On success, returns the number of bytes read. Because
555 /// `try_read()` is non-blocking, the buffer does not have to be stored by
556 /// the async task and can exist entirely on the stack.
557 ///
558 /// Usually, [`readable()`] or [`ready()`] is used with this function.
559 ///
560 /// [`readable()`]: TcpStream::readable()
561 /// [`ready()`]: TcpStream::ready()
562 ///
563 /// # Return
564 ///
565 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
566 /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios:
567 ///
568 /// 1. The stream's read half is closed and will no longer yield data.
569 /// 2. The specified buffer was 0 bytes in length.
570 ///
571 /// If the stream is not ready to read data,
572 /// `Err(io::ErrorKind::WouldBlock)` is returned.
573 ///
574 /// # Examples
575 ///
576 /// ```no_run
577 /// use tokio::net::TcpStream;
578 /// use std::error::Error;
579 /// use std::io;
580 ///
581 /// #[tokio::main]
582 /// async fn main() -> Result<(), Box<dyn Error>> {
583 /// // Connect to a peer
584 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
585 ///
586 /// loop {
587 /// // Wait for the socket to be readable
588 /// stream.readable().await?;
589 ///
590 /// // Creating the buffer **after** the `await` prevents it from
591 /// // being stored in the async task.
592 /// let mut buf = [0; 4096];
593 ///
594 /// // Try to read data, this may still fail with `WouldBlock`
595 /// // if the readiness event is a false positive.
596 /// match stream.try_read(&mut buf) {
597 /// Ok(0) => break,
598 /// Ok(n) => {
599 /// println!("read {} bytes", n);
600 /// }
601 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
602 /// continue;
603 /// }
604 /// Err(e) => {
605 /// return Err(e.into());
606 /// }
607 /// }
608 /// }
609 ///
610 /// Ok(())
611 /// }
612 /// ```
613 pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
614 use std::io::Read;
615
616 self.io
617 .registration()
618 .try_io(Interest::READABLE, || (&*self.io).read(buf))
619 }
620
621 /// Tries to read data from the stream into the provided buffers, returning
622 /// how many bytes were read.
623 ///
624 /// Data is copied to fill each buffer in order, with the final buffer
625 /// written to possibly being only partially filled. This method behaves
626 /// equivalently to a single call to [`try_read()`] with concatenated
627 /// buffers.
628 ///
629 /// Receives any pending data from the socket but does not wait for new data
630 /// to arrive. On success, returns the number of bytes read. Because
631 /// `try_read_vectored()` is non-blocking, the buffer does not have to be
632 /// stored by the async task and can exist entirely on the stack.
633 ///
634 /// Usually, [`readable()`] or [`ready()`] is used with this function.
635 ///
636 /// [`try_read()`]: TcpStream::try_read()
637 /// [`readable()`]: TcpStream::readable()
638 /// [`ready()`]: TcpStream::ready()
639 ///
640 /// # Return
641 ///
642 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
643 /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
644 /// and will no longer yield data. If the stream is not ready to read data
645 /// `Err(io::ErrorKind::WouldBlock)` is returned.
646 ///
647 /// # Examples
648 ///
649 /// ```no_run
650 /// use tokio::net::TcpStream;
651 /// use std::error::Error;
652 /// use std::io::{self, IoSliceMut};
653 ///
654 /// #[tokio::main]
655 /// async fn main() -> Result<(), Box<dyn Error>> {
656 /// // Connect to a peer
657 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
658 ///
659 /// loop {
660 /// // Wait for the socket to be readable
661 /// stream.readable().await?;
662 ///
663 /// // Creating the buffer **after** the `await` prevents it from
664 /// // being stored in the async task.
665 /// let mut buf_a = [0; 512];
666 /// let mut buf_b = [0; 1024];
667 /// let mut bufs = [
668 /// IoSliceMut::new(&mut buf_a),
669 /// IoSliceMut::new(&mut buf_b),
670 /// ];
671 ///
672 /// // Try to read data, this may still fail with `WouldBlock`
673 /// // if the readiness event is a false positive.
674 /// match stream.try_read_vectored(&mut bufs) {
675 /// Ok(0) => break,
676 /// Ok(n) => {
677 /// println!("read {} bytes", n);
678 /// }
679 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
680 /// continue;
681 /// }
682 /// Err(e) => {
683 /// return Err(e.into());
684 /// }
685 /// }
686 /// }
687 ///
688 /// Ok(())
689 /// }
690 /// ```
691 pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
692 use std::io::Read;
693
694 self.io
695 .registration()
696 .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
697 }
698
699 cfg_io_util! {
700 /// Tries to read data from the stream into the provided buffer, advancing the
701 /// buffer's internal cursor, returning how many bytes were read.
702 ///
703 /// Receives any pending data from the socket but does not wait for new data
704 /// to arrive. On success, returns the number of bytes read. Because
705 /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
706 /// the async task and can exist entirely on the stack.
707 ///
708 /// Usually, [`readable()`] or [`ready()`] is used with this function.
709 ///
710 /// [`readable()`]: TcpStream::readable()
711 /// [`ready()`]: TcpStream::ready()
712 ///
713 /// # Return
714 ///
715 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
716 /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
717 /// and will no longer yield data. If the stream is not ready to read data
718 /// `Err(io::ErrorKind::WouldBlock)` is returned.
719 ///
720 /// # Examples
721 ///
722 /// ```no_run
723 /// use tokio::net::TcpStream;
724 /// use std::error::Error;
725 /// use std::io;
726 ///
727 /// #[tokio::main]
728 /// async fn main() -> Result<(), Box<dyn Error>> {
729 /// // Connect to a peer
730 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
731 ///
732 /// loop {
733 /// // Wait for the socket to be readable
734 /// stream.readable().await?;
735 ///
736 /// let mut buf = Vec::with_capacity(4096);
737 ///
738 /// // Try to read data, this may still fail with `WouldBlock`
739 /// // if the readiness event is a false positive.
740 /// match stream.try_read_buf(&mut buf) {
741 /// Ok(0) => break,
742 /// Ok(n) => {
743 /// println!("read {} bytes", n);
744 /// }
745 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
746 /// continue;
747 /// }
748 /// Err(e) => {
749 /// return Err(e.into());
750 /// }
751 /// }
752 /// }
753 ///
754 /// Ok(())
755 /// }
756 /// ```
757 pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
758 self.io.registration().try_io(Interest::READABLE, || {
759 use std::io::Read;
760
761 let dst = buf.chunk_mut();
762 let dst =
763 unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
764
765 // Safety: We trust `TcpStream::read` to have filled up `n` bytes in the
766 // buffer.
767 let n = (&*self.io).read(dst)?;
768
769 unsafe {
770 buf.advance_mut(n);
771 }
772
773 Ok(n)
774 })
775 }
776 }
777
778 /// Waits for the socket to become writable.
779 ///
780 /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
781 /// paired with `try_write()`.
782 ///
783 /// # Cancel safety
784 ///
785 /// This method is cancel safe. Once a readiness event occurs, the method
786 /// will continue to return immediately until the readiness event is
787 /// consumed by an attempt to write that fails with `WouldBlock` or
788 /// `Poll::Pending`.
789 ///
790 /// # Examples
791 ///
792 /// ```no_run
793 /// use tokio::net::TcpStream;
794 /// use std::error::Error;
795 /// use std::io;
796 ///
797 /// #[tokio::main]
798 /// async fn main() -> Result<(), Box<dyn Error>> {
799 /// // Connect to a peer
800 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
801 ///
802 /// loop {
803 /// // Wait for the socket to be writable
804 /// stream.writable().await?;
805 ///
806 /// // Try to write data, this may still fail with `WouldBlock`
807 /// // if the readiness event is a false positive.
808 /// match stream.try_write(b"hello world") {
809 /// Ok(n) => {
810 /// break;
811 /// }
812 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
813 /// continue;
814 /// }
815 /// Err(e) => {
816 /// return Err(e.into());
817 /// }
818 /// }
819 /// }
820 ///
821 /// Ok(())
822 /// }
823 /// ```
824 pub async fn writable(&self) -> io::Result<()> {
825 self.ready(Interest::WRITABLE).await?;
826 Ok(())
827 }
828
829 /// Polls for write readiness.
830 ///
831 /// If the tcp stream is not currently ready for writing, this method will
832 /// store a clone of the `Waker` from the provided `Context`. When the tcp
833 /// stream becomes ready for writing, `Waker::wake` will be called on the
834 /// waker.
835 ///
836 /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only
837 /// the `Waker` from the `Context` passed to the most recent call is
838 /// scheduled to receive a wakeup. (However, `poll_read_ready` retains a
839 /// second, independent waker.)
840 ///
841 /// This function is intended for cases where creating and pinning a future
842 /// via [`writable`] is not feasible. Where possible, using [`writable`] is
843 /// preferred, as this supports polling from multiple tasks at once.
844 ///
845 /// # Return value
846 ///
847 /// The function returns:
848 ///
849 /// * `Poll::Pending` if the tcp stream is not ready for writing.
850 /// * `Poll::Ready(Ok(()))` if the tcp stream is ready for writing.
851 /// * `Poll::Ready(Err(e))` if an error is encountered.
852 ///
853 /// # Errors
854 ///
855 /// This function may encounter any standard I/O error except `WouldBlock`.
856 ///
857 /// [`writable`]: method@Self::writable
858 pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
859 self.io.registration().poll_write_ready(cx).map_ok(|_| ())
860 }
861
862 /// Try to write a buffer to the stream, returning how many bytes were
863 /// written.
864 ///
865 /// The function will attempt to write the entire contents of `buf`, but
866 /// only part of the buffer may be written.
867 ///
868 /// This function is usually paired with `writable()`.
869 ///
870 /// # Return
871 ///
872 /// If data is successfully written, `Ok(n)` is returned, where `n` is the
873 /// number of bytes written. If the stream is not ready to write data,
874 /// `Err(io::ErrorKind::WouldBlock)` is returned.
875 ///
876 /// # Examples
877 ///
878 /// ```no_run
879 /// use tokio::net::TcpStream;
880 /// use std::error::Error;
881 /// use std::io;
882 ///
883 /// #[tokio::main]
884 /// async fn main() -> Result<(), Box<dyn Error>> {
885 /// // Connect to a peer
886 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
887 ///
888 /// loop {
889 /// // Wait for the socket to be writable
890 /// stream.writable().await?;
891 ///
892 /// // Try to write data, this may still fail with `WouldBlock`
893 /// // if the readiness event is a false positive.
894 /// match stream.try_write(b"hello world") {
895 /// Ok(n) => {
896 /// break;
897 /// }
898 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
899 /// continue;
900 /// }
901 /// Err(e) => {
902 /// return Err(e.into());
903 /// }
904 /// }
905 /// }
906 ///
907 /// Ok(())
908 /// }
909 /// ```
910 pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
911 use std::io::Write;
912
913 self.io
914 .registration()
915 .try_io(Interest::WRITABLE, || (&*self.io).write(buf))
916 }
917
918 /// Tries to write several buffers to the stream, returning how many bytes
919 /// were written.
920 ///
921 /// Data is written from each buffer in order, with the final buffer read
922 /// from possible being only partially consumed. This method behaves
923 /// equivalently to a single call to [`try_write()`] with concatenated
924 /// buffers.
925 ///
926 /// This function is usually paired with `writable()`.
927 ///
928 /// [`try_write()`]: TcpStream::try_write()
929 ///
930 /// # Return
931 ///
932 /// If data is successfully written, `Ok(n)` is returned, where `n` is the
933 /// number of bytes written. If the stream is not ready to write data,
934 /// `Err(io::ErrorKind::WouldBlock)` is returned.
935 ///
936 /// # Examples
937 ///
938 /// ```no_run
939 /// use tokio::net::TcpStream;
940 /// use std::error::Error;
941 /// use std::io;
942 ///
943 /// #[tokio::main]
944 /// async fn main() -> Result<(), Box<dyn Error>> {
945 /// // Connect to a peer
946 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
947 ///
948 /// let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")];
949 ///
950 /// loop {
951 /// // Wait for the socket to be writable
952 /// stream.writable().await?;
953 ///
954 /// // Try to write data, this may still fail with `WouldBlock`
955 /// // if the readiness event is a false positive.
956 /// match stream.try_write_vectored(&bufs) {
957 /// Ok(n) => {
958 /// break;
959 /// }
960 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
961 /// continue;
962 /// }
963 /// Err(e) => {
964 /// return Err(e.into());
965 /// }
966 /// }
967 /// }
968 ///
969 /// Ok(())
970 /// }
971 /// ```
972 pub fn try_write_vectored(&self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> {
973 use std::io::Write;
974
975 self.io
976 .registration()
977 .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(bufs))
978 }
979
980 /// Tries to read or write from the socket using a user-provided IO operation.
981 ///
982 /// If the socket is ready, the provided closure is called. The closure
983 /// should attempt to perform IO operation on the socket by manually
984 /// calling the appropriate syscall. If the operation fails because the
985 /// socket is not actually ready, then the closure should return a
986 /// `WouldBlock` error and the readiness flag is cleared. The return value
987 /// of the closure is then returned by `try_io`.
988 ///
989 /// If the socket is not ready, then the closure is not called
990 /// and a `WouldBlock` error is returned.
991 ///
992 /// The closure should only return a `WouldBlock` error if it has performed
993 /// an IO operation on the socket that failed due to the socket not being
994 /// ready. Returning a `WouldBlock` error in any other situation will
995 /// incorrectly clear the readiness flag, which can cause the socket to
996 /// behave incorrectly.
997 ///
998 /// The closure should not perform the IO operation using any of the methods
999 /// defined on the Tokio `TcpStream` type, as this will mess with the
1000 /// readiness flag and can cause the socket to behave incorrectly.
1001 ///
1002 /// This method is not intended to be used with combined interests.
1003 /// The closure should perform only one type of IO operation, so it should not
1004 /// require more than one ready state. This method may panic or sleep forever
1005 /// if it is called with a combined interest.
1006 ///
1007 /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
1008 ///
1009 /// [`readable()`]: TcpStream::readable()
1010 /// [`writable()`]: TcpStream::writable()
1011 /// [`ready()`]: TcpStream::ready()
1012 pub fn try_io<R>(
1013 &self,
1014 interest: Interest,
1015 f: impl FnOnce() -> io::Result<R>,
1016 ) -> io::Result<R> {
1017 self.io
1018 .registration()
1019 .try_io(interest, || self.io.try_io(f))
1020 }
1021
1022 /// Reads or writes from the socket using a user-provided IO operation.
1023 ///
1024 /// The readiness of the socket is awaited and when the socket is ready,
1025 /// the provided closure is called. The closure should attempt to perform
1026 /// IO operation on the socket by manually calling the appropriate syscall.
1027 /// If the operation fails because the socket is not actually ready,
1028 /// then the closure should return a `WouldBlock` error. In such case the
1029 /// readiness flag is cleared and the socket readiness is awaited again.
1030 /// This loop is repeated until the closure returns an `Ok` or an error
1031 /// other than `WouldBlock`.
1032 ///
1033 /// The closure should only return a `WouldBlock` error if it has performed
1034 /// an IO operation on the socket that failed due to the socket not being
1035 /// ready. Returning a `WouldBlock` error in any other situation will
1036 /// incorrectly clear the readiness flag, which can cause the socket to
1037 /// behave incorrectly.
1038 ///
1039 /// The closure should not perform the IO operation using any of the methods
1040 /// defined on the Tokio `TcpStream` type, as this will mess with the
1041 /// readiness flag and can cause the socket to behave incorrectly.
1042 ///
1043 /// This method is not intended to be used with combined interests.
1044 /// The closure should perform only one type of IO operation, so it should not
1045 /// require more than one ready state. This method may panic or sleep forever
1046 /// if it is called with a combined interest.
1047 pub async fn async_io<R>(
1048 &self,
1049 interest: Interest,
1050 mut f: impl FnMut() -> io::Result<R>,
1051 ) -> io::Result<R> {
1052 self.io
1053 .registration()
1054 .async_io(interest, || self.io.try_io(&mut f))
1055 .await
1056 }
1057
1058 /// Receives data on the socket from the remote address to which it is
1059 /// connected, without removing that data from the queue. On success,
1060 /// returns the number of bytes peeked.
1061 ///
1062 /// Successive calls return the same data. This is accomplished by passing
1063 /// `MSG_PEEK` as a flag to the underlying `recv` system call.
1064 ///
1065 /// # Examples
1066 ///
1067 /// ```no_run
1068 /// use tokio::net::TcpStream;
1069 /// use tokio::io::AsyncReadExt;
1070 /// use std::error::Error;
1071 ///
1072 /// #[tokio::main]
1073 /// async fn main() -> Result<(), Box<dyn Error>> {
1074 /// // Connect to a peer
1075 /// let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
1076 ///
1077 /// let mut b1 = [0; 10];
1078 /// let mut b2 = [0; 10];
1079 ///
1080 /// // Peek at the data
1081 /// let n = stream.peek(&mut b1).await?;
1082 ///
1083 /// // Read the data
1084 /// assert_eq!(n, stream.read(&mut b2[..n]).await?);
1085 /// assert_eq!(&b1[..n], &b2[..n]);
1086 ///
1087 /// Ok(())
1088 /// }
1089 /// ```
1090 ///
1091 /// The [`read`] method is defined on the [`AsyncReadExt`] trait.
1092 ///
1093 /// [`read`]: fn@crate::io::AsyncReadExt::read
1094 /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
1095 pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
1096 self.io
1097 .registration()
1098 .async_io(Interest::READABLE, || self.io.peek(buf))
1099 .await
1100 }
1101
1102 /// Shuts down the read, write, or both halves of this connection.
1103 ///
1104 /// This function will cause all pending and future I/O on the specified
1105 /// portions to return immediately with an appropriate value (see the
1106 /// documentation of `Shutdown`).
1107 pub(super) fn shutdown_std(&self, how: Shutdown) -> io::Result<()> {
1108 self.io.shutdown(how)
1109 }
1110
1111 /// Gets the value of the `TCP_NODELAY` option on this socket.
1112 ///
1113 /// For more information about this option, see [`set_nodelay`].
1114 ///
1115 /// [`set_nodelay`]: TcpStream::set_nodelay
1116 ///
1117 /// # Examples
1118 ///
1119 /// ```no_run
1120 /// use tokio::net::TcpStream;
1121 ///
1122 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1123 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1124 ///
1125 /// println!("{:?}", stream.nodelay()?);
1126 /// # Ok(())
1127 /// # }
1128 /// ```
1129 pub fn nodelay(&self) -> io::Result<bool> {
1130 self.io.nodelay()
1131 }
1132
1133 /// Sets the value of the `TCP_NODELAY` option on this socket.
1134 ///
1135 /// If set, this option disables the Nagle algorithm. This means that
1136 /// segments are always sent as soon as possible, even if there is only a
1137 /// small amount of data. When not set, data is buffered until there is a
1138 /// sufficient amount to send out, thereby avoiding the frequent sending of
1139 /// small packets.
1140 ///
1141 /// # Examples
1142 ///
1143 /// ```no_run
1144 /// use tokio::net::TcpStream;
1145 ///
1146 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1147 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1148 ///
1149 /// stream.set_nodelay(true)?;
1150 /// # Ok(())
1151 /// # }
1152 /// ```
1153 pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
1154 self.io.set_nodelay(nodelay)
1155 }
1156
1157 cfg_not_wasi! {
1158 /// Reads the linger duration for this socket by getting the `SO_LINGER`
1159 /// option.
1160 ///
1161 /// For more information about this option, see [`set_linger`].
1162 ///
1163 /// [`set_linger`]: TcpStream::set_linger
1164 ///
1165 /// # Examples
1166 ///
1167 /// ```no_run
1168 /// use tokio::net::TcpStream;
1169 ///
1170 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1171 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1172 ///
1173 /// println!("{:?}", stream.linger()?);
1174 /// # Ok(())
1175 /// # }
1176 /// ```
1177 pub fn linger(&self) -> io::Result<Option<Duration>> {
1178 socket2::SockRef::from(self).linger()
1179 }
1180
1181 /// Sets the linger duration of this socket by setting the `SO_LINGER` option.
1182 ///
1183 /// This option controls the action taken when a stream has unsent messages and the stream is
1184 /// closed. If `SO_LINGER` is set, the system shall block the process until it can transmit the
1185 /// data or until the time expires.
1186 ///
1187 /// If `SO_LINGER` is not specified, and the stream is closed, the system handles the call in a
1188 /// way that allows the process to continue as quickly as possible.
1189 ///
1190 /// # Examples
1191 ///
1192 /// ```no_run
1193 /// use tokio::net::TcpStream;
1194 ///
1195 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1196 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1197 ///
1198 /// stream.set_linger(None)?;
1199 /// # Ok(())
1200 /// # }
1201 /// ```
1202 pub fn set_linger(&self, dur: Option<Duration>) -> io::Result<()> {
1203 socket2::SockRef::from(self).set_linger(dur)
1204 }
1205 }
1206
1207 /// Gets the value of the `IP_TTL` option for this socket.
1208 ///
1209 /// For more information about this option, see [`set_ttl`].
1210 ///
1211 /// [`set_ttl`]: TcpStream::set_ttl
1212 ///
1213 /// # Examples
1214 ///
1215 /// ```no_run
1216 /// use tokio::net::TcpStream;
1217 ///
1218 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1219 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1220 ///
1221 /// println!("{:?}", stream.ttl()?);
1222 /// # Ok(())
1223 /// # }
1224 /// ```
1225 pub fn ttl(&self) -> io::Result<u32> {
1226 self.io.ttl()
1227 }
1228
1229 /// Sets the value for the `IP_TTL` option on this socket.
1230 ///
1231 /// This value sets the time-to-live field that is used in every packet sent
1232 /// from this socket.
1233 ///
1234 /// # Examples
1235 ///
1236 /// ```no_run
1237 /// use tokio::net::TcpStream;
1238 ///
1239 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1240 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1241 ///
1242 /// stream.set_ttl(123)?;
1243 /// # Ok(())
1244 /// # }
1245 /// ```
1246 pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
1247 self.io.set_ttl(ttl)
1248 }
1249
1250 // These lifetime markers also appear in the generated documentation, and make
1251 // it more clear that this is a *borrowed* split.
1252 #[allow(clippy::needless_lifetimes)]
1253 /// Splits a `TcpStream` into a read half and a write half, which can be used
1254 /// to read and write the stream concurrently.
1255 ///
1256 /// This method is more efficient than [`into_split`], but the halves cannot be
1257 /// moved into independently spawned tasks.
1258 ///
1259 /// [`into_split`]: TcpStream::into_split()
1260 pub fn split<'a>(&'a mut self) -> (ReadHalf<'a>, WriteHalf<'a>) {
1261 split(self)
1262 }
1263
1264 /// Splits a `TcpStream` into a read half and a write half, which can be used
1265 /// to read and write the stream concurrently.
1266 ///
1267 /// Unlike [`split`], the owned halves can be moved to separate tasks, however
1268 /// this comes at the cost of a heap allocation.
1269 ///
1270 /// **Note:** Dropping the write half will shut down the write half of the TCP
1271 /// stream. This is equivalent to calling [`shutdown()`] on the `TcpStream`.
1272 ///
1273 /// [`split`]: TcpStream::split()
1274 /// [`shutdown()`]: fn@crate::io::AsyncWriteExt::shutdown
1275 pub fn into_split(self) -> (OwnedReadHalf, OwnedWriteHalf) {
1276 split_owned(self)
1277 }
1278
1279 // == Poll IO functions that takes `&self` ==
1280 //
1281 // To read or write without mutable access to the `UnixStream`, combine the
1282 // `poll_read_ready` or `poll_write_ready` methods with the `try_read` or
1283 // `try_write` methods.
1284
1285 pub(crate) fn poll_read_priv(
1286 &self,
1287 cx: &mut Context<'_>,
1288 buf: &mut ReadBuf<'_>,
1289 ) -> Poll<io::Result<()>> {
1290 // Safety: `TcpStream::read` correctly handles reads into uninitialized memory
1291 unsafe { self.io.poll_read(cx, buf) }
1292 }
1293
1294 pub(super) fn poll_write_priv(
1295 &self,
1296 cx: &mut Context<'_>,
1297 buf: &[u8],
1298 ) -> Poll<io::Result<usize>> {
1299 self.io.poll_write(cx, buf)
1300 }
1301
1302 pub(super) fn poll_write_vectored_priv(
1303 &self,
1304 cx: &mut Context<'_>,
1305 bufs: &[io::IoSlice<'_>],
1306 ) -> Poll<io::Result<usize>> {
1307 self.io.poll_write_vectored(cx, bufs)
1308 }
1309}
1310
1311impl TryFrom<std::net::TcpStream> for TcpStream {
1312 type Error = io::Error;
1313
1314 /// Consumes stream, returning the tokio I/O object.
1315 ///
1316 /// This is equivalent to
1317 /// [`TcpStream::from_std(stream)`](TcpStream::from_std).
1318 fn try_from(stream: std::net::TcpStream) -> Result<Self, Self::Error> {
1319 Self::from_std(stream)
1320 }
1321}
1322
1323// ===== impl Read / Write =====
1324
1325impl AsyncRead for TcpStream {
1326 fn poll_read(
1327 self: Pin<&mut Self>,
1328 cx: &mut Context<'_>,
1329 buf: &mut ReadBuf<'_>,
1330 ) -> Poll<io::Result<()>> {
1331 self.poll_read_priv(cx, buf)
1332 }
1333}
1334
1335impl AsyncWrite for TcpStream {
1336 fn poll_write(
1337 self: Pin<&mut Self>,
1338 cx: &mut Context<'_>,
1339 buf: &[u8],
1340 ) -> Poll<io::Result<usize>> {
1341 self.poll_write_priv(cx, buf)
1342 }
1343
1344 fn poll_write_vectored(
1345 self: Pin<&mut Self>,
1346 cx: &mut Context<'_>,
1347 bufs: &[io::IoSlice<'_>],
1348 ) -> Poll<io::Result<usize>> {
1349 self.poll_write_vectored_priv(cx, bufs)
1350 }
1351
1352 fn is_write_vectored(&self) -> bool {
1353 true
1354 }
1355
1356 #[inline]
1357 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
1358 // tcp flush is a no-op
1359 Poll::Ready(Ok(()))
1360 }
1361
1362 fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
1363 self.shutdown_std(std::net::Shutdown::Write)?;
1364 Poll::Ready(Ok(()))
1365 }
1366}
1367
1368impl fmt::Debug for TcpStream {
1369 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1370 self.io.fmt(f)
1371 }
1372}
1373
1374#[cfg(unix)]
1375mod sys {
1376 use super::TcpStream;
1377 use std::os::unix::prelude::*;
1378
1379 impl AsRawFd for TcpStream {
1380 fn as_raw_fd(&self) -> RawFd {
1381 self.io.as_raw_fd()
1382 }
1383 }
1384
1385 impl AsFd for TcpStream {
1386 fn as_fd(&self) -> BorrowedFd<'_> {
1387 unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
1388 }
1389 }
1390}
1391
1392cfg_windows! {
1393 use crate::os::windows::io::{AsRawSocket, RawSocket, AsSocket, BorrowedSocket};
1394
1395 impl AsRawSocket for TcpStream {
1396 fn as_raw_socket(&self) -> RawSocket {
1397 self.io.as_raw_socket()
1398 }
1399 }
1400
1401 impl AsSocket for TcpStream {
1402 fn as_socket(&self) -> BorrowedSocket<'_> {
1403 unsafe { BorrowedSocket::borrow_raw(self.as_raw_socket()) }
1404 }
1405 }
1406}
1407
1408#[cfg(all(tokio_unstable, target_os = "wasi"))]
1409mod sys {
1410 use super::TcpStream;
1411 use std::os::wasi::prelude::*;
1412
1413 impl AsRawFd for TcpStream {
1414 fn as_raw_fd(&self) -> RawFd {
1415 self.io.as_raw_fd()
1416 }
1417 }
1418
1419 impl AsFd for TcpStream {
1420 fn as_fd(&self) -> BorrowedFd<'_> {
1421 unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
1422 }
1423 }
1424}
1425