1cfg_not_wasi! {
2 use crate::future::poll_fn;
3 use crate::net::{to_socket_addrs, ToSocketAddrs};
4 use std::time::Duration;
5}
6
7use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf, Ready};
8use crate::net::tcp::split::{split, ReadHalf, WriteHalf};
9use crate::net::tcp::split_owned::{split_owned, OwnedReadHalf, OwnedWriteHalf};
10
11use std::fmt;
12use std::io;
13use std::net::{Shutdown, SocketAddr};
14use std::pin::Pin;
15use std::task::{Context, Poll};
16
17cfg_io_util! {
18 use bytes::BufMut;
19}
20
21cfg_net! {
22 /// A TCP stream between a local and a remote socket.
23 ///
24 /// A TCP stream can either be created by connecting to an endpoint, via the
25 /// [`connect`] method, or by [accepting] a connection from a [listener]. A
26 /// TCP stream can also be created via the [`TcpSocket`] type.
27 ///
28 /// Reading and writing to a `TcpStream` is usually done using the
29 /// convenience methods found on the [`AsyncReadExt`] and [`AsyncWriteExt`]
30 /// traits.
31 ///
32 /// [`connect`]: method@TcpStream::connect
33 /// [accepting]: method@crate::net::TcpListener::accept
34 /// [listener]: struct@crate::net::TcpListener
35 /// [`TcpSocket`]: struct@crate::net::TcpSocket
36 /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
37 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
38 ///
39 /// # Examples
40 ///
41 /// ```no_run
42 /// use tokio::net::TcpStream;
43 /// use tokio::io::AsyncWriteExt;
44 /// use std::error::Error;
45 ///
46 /// #[tokio::main]
47 /// async fn main() -> Result<(), Box<dyn Error>> {
48 /// // Connect to a peer
49 /// let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
50 ///
51 /// // Write some data.
52 /// stream.write_all(b"hello world!").await?;
53 ///
54 /// Ok(())
55 /// }
56 /// ```
57 ///
58 /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
59 ///
60 /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
61 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
62 ///
63 /// To shut down the stream in the write direction, you can call the
64 /// [`shutdown()`] method. This will cause the other peer to receive a read of
65 /// length 0, indicating that no more data will be sent. This only closes
66 /// the stream in one direction.
67 ///
68 /// [`shutdown()`]: fn@crate::io::AsyncWriteExt::shutdown
pub struct TcpStream {
    // The underlying mio TCP stream wrapped in `PollEvented`. The methods on
    // `TcpStream` reach both the readiness registration
    // (`self.io.registration()`) and the raw stream (`&*self.io`) through
    // this single field.
    io: PollEvented<mio::net::TcpStream>,
}
72}
73
74impl TcpStream {
75 cfg_not_wasi! {
76 /// Opens a TCP connection to a remote host.
77 ///
78 /// `addr` is an address of the remote host. Anything which implements the
79 /// [`ToSocketAddrs`] trait can be supplied as the address. If `addr`
80 /// yields multiple addresses, connect will be attempted with each of the
81 /// addresses until a connection is successful. If none of the addresses
82 /// result in a successful connection, the error returned from the last
83 /// connection attempt (the last address) is returned.
84 ///
85 /// To configure the socket before connecting, you can use the [`TcpSocket`]
86 /// type.
87 ///
88 /// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs
89 /// [`TcpSocket`]: struct@crate::net::TcpSocket
90 ///
91 /// # Examples
92 ///
93 /// ```no_run
94 /// use tokio::net::TcpStream;
95 /// use tokio::io::AsyncWriteExt;
96 /// use std::error::Error;
97 ///
98 /// #[tokio::main]
99 /// async fn main() -> Result<(), Box<dyn Error>> {
100 /// // Connect to a peer
101 /// let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
102 ///
103 /// // Write some data.
104 /// stream.write_all(b"hello world!").await?;
105 ///
106 /// Ok(())
107 /// }
108 /// ```
109 ///
110 /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
111 ///
112 /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
113 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
114 pub async fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<TcpStream> {
115 let addrs = to_socket_addrs(addr).await?;
116
117 let mut last_err = None;
118
119 for addr in addrs {
120 match TcpStream::connect_addr(addr).await {
121 Ok(stream) => return Ok(stream),
122 Err(e) => last_err = Some(e),
123 }
124 }
125
126 Err(last_err.unwrap_or_else(|| {
127 io::Error::new(
128 io::ErrorKind::InvalidInput,
129 "could not resolve to any address",
130 )
131 }))
132 }
133
134 /// Establishes a connection to the specified `addr`.
135 async fn connect_addr(addr: SocketAddr) -> io::Result<TcpStream> {
136 let sys = mio::net::TcpStream::connect(addr)?;
137 TcpStream::connect_mio(sys).await
138 }
139
140 pub(crate) async fn connect_mio(sys: mio::net::TcpStream) -> io::Result<TcpStream> {
141 let stream = TcpStream::new(sys)?;
142
143 // Once we've connected, wait for the stream to be writable as
144 // that's when the actual connection has been initiated. Once we're
145 // writable we check for `take_socket_error` to see if the connect
146 // actually hit an error or not.
147 //
148 // If all that succeeded then we ship everything on up.
149 poll_fn(|cx| stream.io.registration().poll_write_ready(cx)).await?;
150
151 if let Some(e) = stream.io.take_error()? {
152 return Err(e);
153 }
154
155 Ok(stream)
156 }
157 }
158
159 pub(crate) fn new(connected: mio::net::TcpStream) -> io::Result<TcpStream> {
160 let io = PollEvented::new(connected)?;
161 Ok(TcpStream { io })
162 }
163
164 /// Creates new `TcpStream` from a `std::net::TcpStream`.
165 ///
166 /// This function is intended to be used to wrap a TCP stream from the
167 /// standard library in the Tokio equivalent.
168 ///
169 /// # Notes
170 ///
171 /// The caller is responsible for ensuring that the stream is in
172 /// non-blocking mode. Otherwise all I/O operations on the stream
173 /// will block the thread, which will cause unexpected behavior.
174 /// Non-blocking mode can be set using [`set_nonblocking`].
175 ///
176 /// [`set_nonblocking`]: std::net::TcpStream::set_nonblocking
177 ///
178 /// # Examples
179 ///
180 /// ```rust,no_run
181 /// use std::error::Error;
182 /// use tokio::net::TcpStream;
183 ///
184 /// #[tokio::main]
185 /// async fn main() -> Result<(), Box<dyn Error>> {
186 /// let std_stream = std::net::TcpStream::connect("127.0.0.1:34254")?;
187 /// std_stream.set_nonblocking(true)?;
188 /// let stream = TcpStream::from_std(std_stream)?;
189 /// Ok(())
190 /// }
191 /// ```
192 ///
193 /// # Panics
194 ///
195 /// This function panics if it is not called from within a runtime with
196 /// IO enabled.
197 ///
198 /// The runtime is usually set implicitly when this function is called
199 /// from a future driven by a tokio runtime, otherwise runtime can be set
200 /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
201 #[track_caller]
202 pub fn from_std(stream: std::net::TcpStream) -> io::Result<TcpStream> {
203 let io = mio::net::TcpStream::from_std(stream);
204 let io = PollEvented::new(io)?;
205 Ok(TcpStream { io })
206 }
207
208 /// Turns a [`tokio::net::TcpStream`] into a [`std::net::TcpStream`].
209 ///
210 /// The returned [`std::net::TcpStream`] will have nonblocking mode set as `true`.
211 /// Use [`set_nonblocking`] to change the blocking mode if needed.
212 ///
213 /// # Examples
214 ///
215 /// ```
216 /// use std::error::Error;
217 /// use std::io::Read;
218 /// use tokio::net::TcpListener;
219 /// # use tokio::net::TcpStream;
220 /// # use tokio::io::AsyncWriteExt;
221 ///
222 /// #[tokio::main]
223 /// async fn main() -> Result<(), Box<dyn Error>> {
224 /// let mut data = [0u8; 12];
225 /// let listener = TcpListener::bind("127.0.0.1:34254").await?;
226 /// # let handle = tokio::spawn(async {
227 /// # let mut stream: TcpStream = TcpStream::connect("127.0.0.1:34254").await.unwrap();
228 /// # stream.write(b"Hello world!").await.unwrap();
229 /// # });
230 /// let (tokio_tcp_stream, _) = listener.accept().await?;
231 /// let mut std_tcp_stream = tokio_tcp_stream.into_std()?;
232 /// # handle.await.expect("The task being joined has panicked");
233 /// std_tcp_stream.set_nonblocking(false)?;
234 /// std_tcp_stream.read_exact(&mut data)?;
235 /// # assert_eq!(b"Hello world!", &data);
236 /// Ok(())
237 /// }
238 /// ```
239 /// [`tokio::net::TcpStream`]: TcpStream
240 /// [`std::net::TcpStream`]: std::net::TcpStream
241 /// [`set_nonblocking`]: fn@std::net::TcpStream::set_nonblocking
242 pub fn into_std(self) -> io::Result<std::net::TcpStream> {
243 #[cfg(unix)]
244 {
245 use std::os::unix::io::{FromRawFd, IntoRawFd};
246 self.io
247 .into_inner()
248 .map(|io| io.into_raw_fd())
249 .map(|raw_fd| unsafe { std::net::TcpStream::from_raw_fd(raw_fd) })
250 }
251
252 #[cfg(windows)]
253 {
254 use std::os::windows::io::{FromRawSocket, IntoRawSocket};
255 self.io
256 .into_inner()
257 .map(|io| io.into_raw_socket())
258 .map(|raw_socket| unsafe { std::net::TcpStream::from_raw_socket(raw_socket) })
259 }
260
261 #[cfg(tokio_wasi)]
262 {
263 use std::os::wasi::io::{FromRawFd, IntoRawFd};
264 self.io
265 .into_inner()
266 .map(|io| io.into_raw_fd())
267 .map(|raw_fd| unsafe { std::net::TcpStream::from_raw_fd(raw_fd) })
268 }
269 }
270
271 /// Returns the local address that this stream is bound to.
272 ///
273 /// # Examples
274 ///
275 /// ```no_run
276 /// use tokio::net::TcpStream;
277 ///
278 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
279 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
280 ///
281 /// println!("{:?}", stream.local_addr()?);
282 /// # Ok(())
283 /// # }
284 /// ```
285 pub fn local_addr(&self) -> io::Result<SocketAddr> {
286 self.io.local_addr()
287 }
288
289 /// Returns the value of the `SO_ERROR` option.
290 pub fn take_error(&self) -> io::Result<Option<io::Error>> {
291 self.io.take_error()
292 }
293
294 /// Returns the remote address that this stream is connected to.
295 ///
296 /// # Examples
297 ///
298 /// ```no_run
299 /// use tokio::net::TcpStream;
300 ///
301 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
302 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
303 ///
304 /// println!("{:?}", stream.peer_addr()?);
305 /// # Ok(())
306 /// # }
307 /// ```
308 pub fn peer_addr(&self) -> io::Result<SocketAddr> {
309 self.io.peer_addr()
310 }
311
312 /// Attempts to receive data on the socket, without removing that data from
313 /// the queue, registering the current task for wakeup if data is not yet
314 /// available.
315 ///
316 /// Note that on multiple calls to `poll_peek`, `poll_read` or
317 /// `poll_read_ready`, only the `Waker` from the `Context` passed to the
318 /// most recent call is scheduled to receive a wakeup. (However,
319 /// `poll_write` retains a second, independent waker.)
320 ///
321 /// # Return value
322 ///
323 /// The function returns:
324 ///
325 /// * `Poll::Pending` if data is not yet available.
326 /// * `Poll::Ready(Ok(n))` if data is available. `n` is the number of bytes peeked.
327 /// * `Poll::Ready(Err(e))` if an error is encountered.
328 ///
329 /// # Errors
330 ///
331 /// This function may encounter any standard I/O error except `WouldBlock`.
332 ///
333 /// # Examples
334 ///
335 /// ```no_run
336 /// use tokio::io::{self, ReadBuf};
337 /// use tokio::net::TcpStream;
338 ///
339 /// use futures::future::poll_fn;
340 ///
341 /// #[tokio::main]
342 /// async fn main() -> io::Result<()> {
343 /// let stream = TcpStream::connect("127.0.0.1:8000").await?;
344 /// let mut buf = [0; 10];
345 /// let mut buf = ReadBuf::new(&mut buf);
346 ///
347 /// poll_fn(|cx| {
348 /// stream.poll_peek(cx, &mut buf)
349 /// }).await?;
350 ///
351 /// Ok(())
352 /// }
353 /// ```
354 pub fn poll_peek(
355 &self,
356 cx: &mut Context<'_>,
357 buf: &mut ReadBuf<'_>,
358 ) -> Poll<io::Result<usize>> {
359 loop {
360 let ev = ready!(self.io.registration().poll_read_ready(cx))?;
361
362 let b = unsafe {
363 &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
364 };
365
366 match self.io.peek(b) {
367 Ok(ret) => {
368 unsafe { buf.assume_init(ret) };
369 buf.advance(ret);
370 return Poll::Ready(Ok(ret));
371 }
372 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
373 self.io.registration().clear_readiness(ev);
374 }
375 Err(e) => return Poll::Ready(Err(e)),
376 }
377 }
378 }
379
380 /// Waits for any of the requested ready states.
381 ///
382 /// This function is usually paired with `try_read()` or `try_write()`. It
383 /// can be used to concurrently read / write to the same socket on a single
384 /// task without splitting the socket.
385 ///
386 /// The function may complete without the socket being ready. This is a
387 /// false-positive and attempting an operation will return with
388 /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
389 /// [`Ready`] set, so you should always check the returned value and possibly
390 /// wait again if the requested states are not set.
391 ///
392 /// # Cancel safety
393 ///
394 /// This method is cancel safe. Once a readiness event occurs, the method
395 /// will continue to return immediately until the readiness event is
396 /// consumed by an attempt to read or write that fails with `WouldBlock` or
397 /// `Poll::Pending`.
398 ///
399 /// # Examples
400 ///
401 /// Concurrently read and write to the stream on the same task without
402 /// splitting.
403 ///
404 /// ```no_run
405 /// use tokio::io::Interest;
406 /// use tokio::net::TcpStream;
407 /// use std::error::Error;
408 /// use std::io;
409 ///
410 /// #[tokio::main]
411 /// async fn main() -> Result<(), Box<dyn Error>> {
412 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
413 ///
414 /// loop {
415 /// let ready = stream.ready(Interest::READABLE | Interest::WRITABLE).await?;
416 ///
417 /// if ready.is_readable() {
418 /// let mut data = vec![0; 1024];
419 /// // Try to read data, this may still fail with `WouldBlock`
420 /// // if the readiness event is a false positive.
421 /// match stream.try_read(&mut data) {
422 /// Ok(n) => {
423 /// println!("read {} bytes", n);
424 /// }
425 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
426 /// continue;
427 /// }
428 /// Err(e) => {
429 /// return Err(e.into());
430 /// }
431 /// }
432 ///
433 /// }
434 ///
435 /// if ready.is_writable() {
436 /// // Try to write data, this may still fail with `WouldBlock`
437 /// // if the readiness event is a false positive.
438 /// match stream.try_write(b"hello world") {
439 /// Ok(n) => {
440 /// println!("write {} bytes", n);
441 /// }
442 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
443 /// continue
444 /// }
445 /// Err(e) => {
446 /// return Err(e.into());
447 /// }
448 /// }
449 /// }
450 /// }
451 /// }
452 /// ```
453 pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
454 let event = self.io.registration().readiness(interest).await?;
455 Ok(event.ready)
456 }
457
458 /// Waits for the socket to become readable.
459 ///
460 /// This function is equivalent to `ready(Interest::READABLE)` and is usually
461 /// paired with `try_read()`.
462 ///
463 /// # Cancel safety
464 ///
465 /// This method is cancel safe. Once a readiness event occurs, the method
466 /// will continue to return immediately until the readiness event is
467 /// consumed by an attempt to read that fails with `WouldBlock` or
468 /// `Poll::Pending`.
469 ///
470 /// # Examples
471 ///
472 /// ```no_run
473 /// use tokio::net::TcpStream;
474 /// use std::error::Error;
475 /// use std::io;
476 ///
477 /// #[tokio::main]
478 /// async fn main() -> Result<(), Box<dyn Error>> {
479 /// // Connect to a peer
480 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
481 ///
482 /// let mut msg = vec![0; 1024];
483 ///
484 /// loop {
485 /// // Wait for the socket to be readable
486 /// stream.readable().await?;
487 ///
488 /// // Try to read data, this may still fail with `WouldBlock`
489 /// // if the readiness event is a false positive.
490 /// match stream.try_read(&mut msg) {
491 /// Ok(n) => {
492 /// msg.truncate(n);
493 /// break;
494 /// }
495 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
496 /// continue;
497 /// }
498 /// Err(e) => {
499 /// return Err(e.into());
500 /// }
501 /// }
502 /// }
503 ///
504 /// println!("GOT = {:?}", msg);
505 /// Ok(())
506 /// }
507 /// ```
508 pub async fn readable(&self) -> io::Result<()> {
509 self.ready(Interest::READABLE).await?;
510 Ok(())
511 }
512
513 /// Polls for read readiness.
514 ///
515 /// If the tcp stream is not currently ready for reading, this method will
516 /// store a clone of the `Waker` from the provided `Context`. When the tcp
517 /// stream becomes ready for reading, `Waker::wake` will be called on the
518 /// waker.
519 ///
520 /// Note that on multiple calls to `poll_read_ready`, `poll_read` or
521 /// `poll_peek`, only the `Waker` from the `Context` passed to the most
522 /// recent call is scheduled to receive a wakeup. (However,
523 /// `poll_write_ready` retains a second, independent waker.)
524 ///
525 /// This function is intended for cases where creating and pinning a future
526 /// via [`readable`] is not feasible. Where possible, using [`readable`] is
527 /// preferred, as this supports polling from multiple tasks at once.
528 ///
529 /// # Return value
530 ///
531 /// The function returns:
532 ///
533 /// * `Poll::Pending` if the tcp stream is not ready for reading.
534 /// * `Poll::Ready(Ok(()))` if the tcp stream is ready for reading.
535 /// * `Poll::Ready(Err(e))` if an error is encountered.
536 ///
537 /// # Errors
538 ///
539 /// This function may encounter any standard I/O error except `WouldBlock`.
540 ///
541 /// [`readable`]: method@Self::readable
542 pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
543 self.io.registration().poll_read_ready(cx).map_ok(|_| ())
544 }
545
546 /// Tries to read data from the stream into the provided buffer, returning how
547 /// many bytes were read.
548 ///
549 /// Receives any pending data from the socket but does not wait for new data
550 /// to arrive. On success, returns the number of bytes read. Because
551 /// `try_read()` is non-blocking, the buffer does not have to be stored by
552 /// the async task and can exist entirely on the stack.
553 ///
554 /// Usually, [`readable()`] or [`ready()`] is used with this function.
555 ///
556 /// [`readable()`]: TcpStream::readable()
557 /// [`ready()`]: TcpStream::ready()
558 ///
559 /// # Return
560 ///
561 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
562 /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios:
563 ///
564 /// 1. The stream's read half is closed and will no longer yield data.
565 /// 2. The specified buffer was 0 bytes in length.
566 ///
567 /// If the stream is not ready to read data,
568 /// `Err(io::ErrorKind::WouldBlock)` is returned.
569 ///
570 /// # Examples
571 ///
572 /// ```no_run
573 /// use tokio::net::TcpStream;
574 /// use std::error::Error;
575 /// use std::io;
576 ///
577 /// #[tokio::main]
578 /// async fn main() -> Result<(), Box<dyn Error>> {
579 /// // Connect to a peer
580 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
581 ///
582 /// loop {
583 /// // Wait for the socket to be readable
584 /// stream.readable().await?;
585 ///
586 /// // Creating the buffer **after** the `await` prevents it from
587 /// // being stored in the async task.
588 /// let mut buf = [0; 4096];
589 ///
590 /// // Try to read data, this may still fail with `WouldBlock`
591 /// // if the readiness event is a false positive.
592 /// match stream.try_read(&mut buf) {
593 /// Ok(0) => break,
594 /// Ok(n) => {
595 /// println!("read {} bytes", n);
596 /// }
597 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
598 /// continue;
599 /// }
600 /// Err(e) => {
601 /// return Err(e.into());
602 /// }
603 /// }
604 /// }
605 ///
606 /// Ok(())
607 /// }
608 /// ```
609 pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
610 use std::io::Read;
611
612 self.io
613 .registration()
614 .try_io(Interest::READABLE, || (&*self.io).read(buf))
615 }
616
617 /// Tries to read data from the stream into the provided buffers, returning
618 /// how many bytes were read.
619 ///
620 /// Data is copied to fill each buffer in order, with the final buffer
621 /// written to possibly being only partially filled. This method behaves
622 /// equivalently to a single call to [`try_read()`] with concatenated
623 /// buffers.
624 ///
625 /// Receives any pending data from the socket but does not wait for new data
626 /// to arrive. On success, returns the number of bytes read. Because
627 /// `try_read_vectored()` is non-blocking, the buffer does not have to be
628 /// stored by the async task and can exist entirely on the stack.
629 ///
630 /// Usually, [`readable()`] or [`ready()`] is used with this function.
631 ///
632 /// [`try_read()`]: TcpStream::try_read()
633 /// [`readable()`]: TcpStream::readable()
634 /// [`ready()`]: TcpStream::ready()
635 ///
636 /// # Return
637 ///
638 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
639 /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
640 /// and will no longer yield data. If the stream is not ready to read data
641 /// `Err(io::ErrorKind::WouldBlock)` is returned.
642 ///
643 /// # Examples
644 ///
645 /// ```no_run
646 /// use tokio::net::TcpStream;
647 /// use std::error::Error;
648 /// use std::io::{self, IoSliceMut};
649 ///
650 /// #[tokio::main]
651 /// async fn main() -> Result<(), Box<dyn Error>> {
652 /// // Connect to a peer
653 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
654 ///
655 /// loop {
656 /// // Wait for the socket to be readable
657 /// stream.readable().await?;
658 ///
659 /// // Creating the buffer **after** the `await` prevents it from
660 /// // being stored in the async task.
661 /// let mut buf_a = [0; 512];
662 /// let mut buf_b = [0; 1024];
663 /// let mut bufs = [
664 /// IoSliceMut::new(&mut buf_a),
665 /// IoSliceMut::new(&mut buf_b),
666 /// ];
667 ///
668 /// // Try to read data, this may still fail with `WouldBlock`
669 /// // if the readiness event is a false positive.
670 /// match stream.try_read_vectored(&mut bufs) {
671 /// Ok(0) => break,
672 /// Ok(n) => {
673 /// println!("read {} bytes", n);
674 /// }
675 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
676 /// continue;
677 /// }
678 /// Err(e) => {
679 /// return Err(e.into());
680 /// }
681 /// }
682 /// }
683 ///
684 /// Ok(())
685 /// }
686 /// ```
687 pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
688 use std::io::Read;
689
690 self.io
691 .registration()
692 .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
693 }
694
695 cfg_io_util! {
696 /// Tries to read data from the stream into the provided buffer, advancing the
697 /// buffer's internal cursor, returning how many bytes were read.
698 ///
699 /// Receives any pending data from the socket but does not wait for new data
700 /// to arrive. On success, returns the number of bytes read. Because
701 /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
702 /// the async task and can exist entirely on the stack.
703 ///
704 /// Usually, [`readable()`] or [`ready()`] is used with this function.
705 ///
706 /// [`readable()`]: TcpStream::readable()
707 /// [`ready()`]: TcpStream::ready()
708 ///
709 /// # Return
710 ///
711 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
712 /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
713 /// and will no longer yield data. If the stream is not ready to read data
714 /// `Err(io::ErrorKind::WouldBlock)` is returned.
715 ///
716 /// # Examples
717 ///
718 /// ```no_run
719 /// use tokio::net::TcpStream;
720 /// use std::error::Error;
721 /// use std::io;
722 ///
723 /// #[tokio::main]
724 /// async fn main() -> Result<(), Box<dyn Error>> {
725 /// // Connect to a peer
726 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
727 ///
728 /// loop {
729 /// // Wait for the socket to be readable
730 /// stream.readable().await?;
731 ///
732 /// let mut buf = Vec::with_capacity(4096);
733 ///
734 /// // Try to read data, this may still fail with `WouldBlock`
735 /// // if the readiness event is a false positive.
736 /// match stream.try_read_buf(&mut buf) {
737 /// Ok(0) => break,
738 /// Ok(n) => {
739 /// println!("read {} bytes", n);
740 /// }
741 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
742 /// continue;
743 /// }
744 /// Err(e) => {
745 /// return Err(e.into());
746 /// }
747 /// }
748 /// }
749 ///
750 /// Ok(())
751 /// }
752 /// ```
753 pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
754 self.io.registration().try_io(Interest::READABLE, || {
755 use std::io::Read;
756
757 let dst = buf.chunk_mut();
758 let dst =
759 unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
760
761 // Safety: We trust `TcpStream::read` to have filled up `n` bytes in the
762 // buffer.
763 let n = (&*self.io).read(dst)?;
764
765 unsafe {
766 buf.advance_mut(n);
767 }
768
769 Ok(n)
770 })
771 }
772 }
773
774 /// Waits for the socket to become writable.
775 ///
776 /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
777 /// paired with `try_write()`.
778 ///
779 /// # Cancel safety
780 ///
781 /// This method is cancel safe. Once a readiness event occurs, the method
782 /// will continue to return immediately until the readiness event is
783 /// consumed by an attempt to write that fails with `WouldBlock` or
784 /// `Poll::Pending`.
785 ///
786 /// # Examples
787 ///
788 /// ```no_run
789 /// use tokio::net::TcpStream;
790 /// use std::error::Error;
791 /// use std::io;
792 ///
793 /// #[tokio::main]
794 /// async fn main() -> Result<(), Box<dyn Error>> {
795 /// // Connect to a peer
796 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
797 ///
798 /// loop {
799 /// // Wait for the socket to be writable
800 /// stream.writable().await?;
801 ///
802 /// // Try to write data, this may still fail with `WouldBlock`
803 /// // if the readiness event is a false positive.
804 /// match stream.try_write(b"hello world") {
805 /// Ok(n) => {
806 /// break;
807 /// }
808 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
809 /// continue;
810 /// }
811 /// Err(e) => {
812 /// return Err(e.into());
813 /// }
814 /// }
815 /// }
816 ///
817 /// Ok(())
818 /// }
819 /// ```
820 pub async fn writable(&self) -> io::Result<()> {
821 self.ready(Interest::WRITABLE).await?;
822 Ok(())
823 }
824
825 /// Polls for write readiness.
826 ///
827 /// If the tcp stream is not currently ready for writing, this method will
828 /// store a clone of the `Waker` from the provided `Context`. When the tcp
829 /// stream becomes ready for writing, `Waker::wake` will be called on the
830 /// waker.
831 ///
832 /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only
833 /// the `Waker` from the `Context` passed to the most recent call is
834 /// scheduled to receive a wakeup. (However, `poll_read_ready` retains a
835 /// second, independent waker.)
836 ///
837 /// This function is intended for cases where creating and pinning a future
838 /// via [`writable`] is not feasible. Where possible, using [`writable`] is
839 /// preferred, as this supports polling from multiple tasks at once.
840 ///
841 /// # Return value
842 ///
843 /// The function returns:
844 ///
845 /// * `Poll::Pending` if the tcp stream is not ready for writing.
846 /// * `Poll::Ready(Ok(()))` if the tcp stream is ready for writing.
847 /// * `Poll::Ready(Err(e))` if an error is encountered.
848 ///
849 /// # Errors
850 ///
851 /// This function may encounter any standard I/O error except `WouldBlock`.
852 ///
853 /// [`writable`]: method@Self::writable
854 pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
855 self.io.registration().poll_write_ready(cx).map_ok(|_| ())
856 }
857
/// Tries to write a buffer to the stream, returning how many bytes were
859 /// written.
860 ///
861 /// The function will attempt to write the entire contents of `buf`, but
862 /// only part of the buffer may be written.
863 ///
864 /// This function is usually paired with `writable()`.
865 ///
866 /// # Return
867 ///
868 /// If data is successfully written, `Ok(n)` is returned, where `n` is the
869 /// number of bytes written. If the stream is not ready to write data,
870 /// `Err(io::ErrorKind::WouldBlock)` is returned.
871 ///
872 /// # Examples
873 ///
874 /// ```no_run
875 /// use tokio::net::TcpStream;
876 /// use std::error::Error;
877 /// use std::io;
878 ///
879 /// #[tokio::main]
880 /// async fn main() -> Result<(), Box<dyn Error>> {
881 /// // Connect to a peer
882 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
883 ///
884 /// loop {
885 /// // Wait for the socket to be writable
886 /// stream.writable().await?;
887 ///
888 /// // Try to write data, this may still fail with `WouldBlock`
889 /// // if the readiness event is a false positive.
890 /// match stream.try_write(b"hello world") {
891 /// Ok(n) => {
892 /// break;
893 /// }
894 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
895 /// continue;
896 /// }
897 /// Err(e) => {
898 /// return Err(e.into());
899 /// }
900 /// }
901 /// }
902 ///
903 /// Ok(())
904 /// }
905 /// ```
906 pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
907 use std::io::Write;
908
909 self.io
910 .registration()
911 .try_io(Interest::WRITABLE, || (&*self.io).write(buf))
912 }
913
914 /// Tries to write several buffers to the stream, returning how many bytes
915 /// were written.
916 ///
/// Data is written from each buffer in order, with the final buffer read
/// from possibly being only partially consumed. This method behaves
919 /// equivalently to a single call to [`try_write()`] with concatenated
920 /// buffers.
921 ///
922 /// This function is usually paired with `writable()`.
923 ///
924 /// [`try_write()`]: TcpStream::try_write()
925 ///
926 /// # Return
927 ///
928 /// If data is successfully written, `Ok(n)` is returned, where `n` is the
929 /// number of bytes written. If the stream is not ready to write data,
930 /// `Err(io::ErrorKind::WouldBlock)` is returned.
931 ///
932 /// # Examples
933 ///
934 /// ```no_run
935 /// use tokio::net::TcpStream;
936 /// use std::error::Error;
937 /// use std::io;
938 ///
939 /// #[tokio::main]
940 /// async fn main() -> Result<(), Box<dyn Error>> {
941 /// // Connect to a peer
942 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
943 ///
944 /// let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")];
945 ///
946 /// loop {
947 /// // Wait for the socket to be writable
948 /// stream.writable().await?;
949 ///
950 /// // Try to write data, this may still fail with `WouldBlock`
951 /// // if the readiness event is a false positive.
952 /// match stream.try_write_vectored(&bufs) {
953 /// Ok(n) => {
954 /// break;
955 /// }
956 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
957 /// continue;
958 /// }
959 /// Err(e) => {
960 /// return Err(e.into());
961 /// }
962 /// }
963 /// }
964 ///
965 /// Ok(())
966 /// }
967 /// ```
968 pub fn try_write_vectored(&self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> {
969 use std::io::Write;
970
971 self.io
972 .registration()
973 .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(bufs))
974 }
975
976 /// Tries to read or write from the socket using a user-provided IO operation.
977 ///
978 /// If the socket is ready, the provided closure is called. The closure
979 /// should attempt to perform IO operation on the socket by manually
980 /// calling the appropriate syscall. If the operation fails because the
981 /// socket is not actually ready, then the closure should return a
982 /// `WouldBlock` error and the readiness flag is cleared. The return value
983 /// of the closure is then returned by `try_io`.
984 ///
985 /// If the socket is not ready, then the closure is not called
986 /// and a `WouldBlock` error is returned.
987 ///
988 /// The closure should only return a `WouldBlock` error if it has performed
989 /// an IO operation on the socket that failed due to the socket not being
990 /// ready. Returning a `WouldBlock` error in any other situation will
991 /// incorrectly clear the readiness flag, which can cause the socket to
992 /// behave incorrectly.
993 ///
994 /// The closure should not perform the IO operation using any of the methods
995 /// defined on the Tokio `TcpStream` type, as this will mess with the
996 /// readiness flag and can cause the socket to behave incorrectly.
997 ///
998 /// This method is not intended to be used with combined interests.
999 /// The closure should perform only one type of IO operation, so it should not
1000 /// require more than one ready state. This method may panic or sleep forever
1001 /// if it is called with a combined interest.
1002 ///
1003 /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
1004 ///
1005 /// [`readable()`]: TcpStream::readable()
1006 /// [`writable()`]: TcpStream::writable()
1007 /// [`ready()`]: TcpStream::ready()
1008 pub fn try_io<R>(
1009 &self,
1010 interest: Interest,
1011 f: impl FnOnce() -> io::Result<R>,
1012 ) -> io::Result<R> {
1013 self.io
1014 .registration()
1015 .try_io(interest, || self.io.try_io(f))
1016 }
1017
1018 /// Reads or writes from the socket using a user-provided IO operation.
1019 ///
1020 /// The readiness of the socket is awaited and when the socket is ready,
1021 /// the provided closure is called. The closure should attempt to perform
1022 /// IO operation on the socket by manually calling the appropriate syscall.
1023 /// If the operation fails because the socket is not actually ready,
1024 /// then the closure should return a `WouldBlock` error. In such case the
1025 /// readiness flag is cleared and the socket readiness is awaited again.
1026 /// This loop is repeated until the closure returns an `Ok` or an error
1027 /// other than `WouldBlock`.
1028 ///
1029 /// The closure should only return a `WouldBlock` error if it has performed
1030 /// an IO operation on the socket that failed due to the socket not being
1031 /// ready. Returning a `WouldBlock` error in any other situation will
1032 /// incorrectly clear the readiness flag, which can cause the socket to
1033 /// behave incorrectly.
1034 ///
1035 /// The closure should not perform the IO operation using any of the methods
1036 /// defined on the Tokio `TcpStream` type, as this will mess with the
1037 /// readiness flag and can cause the socket to behave incorrectly.
1038 ///
1039 /// This method is not intended to be used with combined interests.
1040 /// The closure should perform only one type of IO operation, so it should not
1041 /// require more than one ready state. This method may panic or sleep forever
1042 /// if it is called with a combined interest.
1043 pub async fn async_io<R>(
1044 &self,
1045 interest: Interest,
1046 mut f: impl FnMut() -> io::Result<R>,
1047 ) -> io::Result<R> {
1048 self.io
1049 .registration()
1050 .async_io(interest, || self.io.try_io(&mut f))
1051 .await
1052 }
1053
1054 /// Receives data on the socket from the remote address to which it is
1055 /// connected, without removing that data from the queue. On success,
1056 /// returns the number of bytes peeked.
1057 ///
1058 /// Successive calls return the same data. This is accomplished by passing
1059 /// `MSG_PEEK` as a flag to the underlying recv system call.
1060 ///
1061 /// # Examples
1062 ///
1063 /// ```no_run
1064 /// use tokio::net::TcpStream;
1065 /// use tokio::io::AsyncReadExt;
1066 /// use std::error::Error;
1067 ///
1068 /// #[tokio::main]
1069 /// async fn main() -> Result<(), Box<dyn Error>> {
1070 /// // Connect to a peer
1071 /// let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
1072 ///
1073 /// let mut b1 = [0; 10];
1074 /// let mut b2 = [0; 10];
1075 ///
1076 /// // Peek at the data
1077 /// let n = stream.peek(&mut b1).await?;
1078 ///
1079 /// // Read the data
1080 /// assert_eq!(n, stream.read(&mut b2[..n]).await?);
1081 /// assert_eq!(&b1[..n], &b2[..n]);
1082 ///
1083 /// Ok(())
1084 /// }
1085 /// ```
1086 ///
1087 /// The [`read`] method is defined on the [`AsyncReadExt`] trait.
1088 ///
1089 /// [`read`]: fn@crate::io::AsyncReadExt::read
1090 /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
1091 pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
1092 self.io
1093 .registration()
1094 .async_io(Interest::READABLE, || self.io.peek(buf))
1095 .await
1096 }
1097
1098 /// Shuts down the read, write, or both halves of this connection.
1099 ///
1100 /// This function will cause all pending and future I/O on the specified
1101 /// portions to return immediately with an appropriate value (see the
1102 /// documentation of `Shutdown`).
1103 pub(super) fn shutdown_std(&self, how: Shutdown) -> io::Result<()> {
1104 self.io.shutdown(how)
1105 }
1106
1107 /// Gets the value of the `TCP_NODELAY` option on this socket.
1108 ///
1109 /// For more information about this option, see [`set_nodelay`].
1110 ///
1111 /// [`set_nodelay`]: TcpStream::set_nodelay
1112 ///
1113 /// # Examples
1114 ///
1115 /// ```no_run
1116 /// use tokio::net::TcpStream;
1117 ///
1118 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1119 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1120 ///
1121 /// println!("{:?}", stream.nodelay()?);
1122 /// # Ok(())
1123 /// # }
1124 /// ```
1125 pub fn nodelay(&self) -> io::Result<bool> {
1126 self.io.nodelay()
1127 }
1128
1129 /// Sets the value of the `TCP_NODELAY` option on this socket.
1130 ///
1131 /// If set, this option disables the Nagle algorithm. This means that
1132 /// segments are always sent as soon as possible, even if there is only a
1133 /// small amount of data. When not set, data is buffered until there is a
1134 /// sufficient amount to send out, thereby avoiding the frequent sending of
1135 /// small packets.
1136 ///
1137 /// # Examples
1138 ///
1139 /// ```no_run
1140 /// use tokio::net::TcpStream;
1141 ///
1142 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1143 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1144 ///
1145 /// stream.set_nodelay(true)?;
1146 /// # Ok(())
1147 /// # }
1148 /// ```
1149 pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
1150 self.io.set_nodelay(nodelay)
1151 }
1152
1153 cfg_not_wasi! {
1154 /// Reads the linger duration for this socket by getting the `SO_LINGER`
1155 /// option.
1156 ///
1157 /// For more information about this option, see [`set_linger`].
1158 ///
1159 /// [`set_linger`]: TcpStream::set_linger
1160 ///
1161 /// # Examples
1162 ///
1163 /// ```no_run
1164 /// use tokio::net::TcpStream;
1165 ///
1166 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1167 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1168 ///
1169 /// println!("{:?}", stream.linger()?);
1170 /// # Ok(())
1171 /// # }
1172 /// ```
1173 pub fn linger(&self) -> io::Result<Option<Duration>> {
1174 socket2::SockRef::from(self).linger()
1175 }
1176
1177 /// Sets the linger duration of this socket by setting the SO_LINGER option.
1178 ///
1179 /// This option controls the action taken when a stream has unsent messages and the stream is
1180 /// closed. If SO_LINGER is set, the system shall block the process until it can transmit the
1181 /// data or until the time expires.
1182 ///
1183 /// If SO_LINGER is not specified, and the stream is closed, the system handles the call in a
1184 /// way that allows the process to continue as quickly as possible.
1185 ///
1186 /// # Examples
1187 ///
1188 /// ```no_run
1189 /// use tokio::net::TcpStream;
1190 ///
1191 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1192 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1193 ///
1194 /// stream.set_linger(None)?;
1195 /// # Ok(())
1196 /// # }
1197 /// ```
1198 pub fn set_linger(&self, dur: Option<Duration>) -> io::Result<()> {
1199 socket2::SockRef::from(self).set_linger(dur)
1200 }
1201 }
1202
1203 /// Gets the value of the `IP_TTL` option for this socket.
1204 ///
1205 /// For more information about this option, see [`set_ttl`].
1206 ///
1207 /// [`set_ttl`]: TcpStream::set_ttl
1208 ///
1209 /// # Examples
1210 ///
1211 /// ```no_run
1212 /// use tokio::net::TcpStream;
1213 ///
1214 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1215 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1216 ///
1217 /// println!("{:?}", stream.ttl()?);
1218 /// # Ok(())
1219 /// # }
1220 /// ```
1221 pub fn ttl(&self) -> io::Result<u32> {
1222 self.io.ttl()
1223 }
1224
1225 /// Sets the value for the `IP_TTL` option on this socket.
1226 ///
1227 /// This value sets the time-to-live field that is used in every packet sent
1228 /// from this socket.
1229 ///
1230 /// # Examples
1231 ///
1232 /// ```no_run
1233 /// use tokio::net::TcpStream;
1234 ///
1235 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1236 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1237 ///
1238 /// stream.set_ttl(123)?;
1239 /// # Ok(())
1240 /// # }
1241 /// ```
1242 pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
1243 self.io.set_ttl(ttl)
1244 }
1245
1246 // These lifetime markers also appear in the generated documentation, and make
1247 // it more clear that this is a *borrowed* split.
1248 #[allow(clippy::needless_lifetimes)]
1249 /// Splits a `TcpStream` into a read half and a write half, which can be used
1250 /// to read and write the stream concurrently.
1251 ///
1252 /// This method is more efficient than [`into_split`], but the halves cannot be
1253 /// moved into independently spawned tasks.
1254 ///
1255 /// [`into_split`]: TcpStream::into_split()
1256 pub fn split<'a>(&'a mut self) -> (ReadHalf<'a>, WriteHalf<'a>) {
1257 split(self)
1258 }
1259
1260 /// Splits a `TcpStream` into a read half and a write half, which can be used
1261 /// to read and write the stream concurrently.
1262 ///
1263 /// Unlike [`split`], the owned halves can be moved to separate tasks, however
1264 /// this comes at the cost of a heap allocation.
1265 ///
1266 /// **Note:** Dropping the write half will shut down the write half of the TCP
1267 /// stream. This is equivalent to calling [`shutdown()`] on the `TcpStream`.
1268 ///
1269 /// [`split`]: TcpStream::split()
1270 /// [`shutdown()`]: fn@crate::io::AsyncWriteExt::shutdown
1271 pub fn into_split(self) -> (OwnedReadHalf, OwnedWriteHalf) {
1272 split_owned(self)
1273 }
1274
// == Poll IO functions that take `&self` ==
//
// To read or write without mutable access to the `TcpStream`, combine the
// `poll_read_ready` or `poll_write_ready` methods with the `try_read` or
// `try_write` methods.
1280
1281 pub(crate) fn poll_read_priv(
1282 &self,
1283 cx: &mut Context<'_>,
1284 buf: &mut ReadBuf<'_>,
1285 ) -> Poll<io::Result<()>> {
1286 // Safety: `TcpStream::read` correctly handles reads into uninitialized memory
1287 unsafe { self.io.poll_read(cx, buf) }
1288 }
1289
1290 pub(super) fn poll_write_priv(
1291 &self,
1292 cx: &mut Context<'_>,
1293 buf: &[u8],
1294 ) -> Poll<io::Result<usize>> {
1295 self.io.poll_write(cx, buf)
1296 }
1297
1298 pub(super) fn poll_write_vectored_priv(
1299 &self,
1300 cx: &mut Context<'_>,
1301 bufs: &[io::IoSlice<'_>],
1302 ) -> Poll<io::Result<usize>> {
1303 self.io.poll_write_vectored(cx, bufs)
1304 }
1305}
1306
1307impl TryFrom<std::net::TcpStream> for TcpStream {
1308 type Error = io::Error;
1309
1310 /// Consumes stream, returning the tokio I/O object.
1311 ///
1312 /// This is equivalent to
1313 /// [`TcpStream::from_std(stream)`](TcpStream::from_std).
1314 fn try_from(stream: std::net::TcpStream) -> Result<Self, Self::Error> {
1315 Self::from_std(stream)
1316 }
1317}
1318
1319// ===== impl Read / Write =====
1320
1321impl AsyncRead for TcpStream {
1322 fn poll_read(
1323 self: Pin<&mut Self>,
1324 cx: &mut Context<'_>,
1325 buf: &mut ReadBuf<'_>,
1326 ) -> Poll<io::Result<()>> {
1327 self.poll_read_priv(cx, buf)
1328 }
1329}
1330
1331impl AsyncWrite for TcpStream {
1332 fn poll_write(
1333 self: Pin<&mut Self>,
1334 cx: &mut Context<'_>,
1335 buf: &[u8],
1336 ) -> Poll<io::Result<usize>> {
1337 self.poll_write_priv(cx, buf)
1338 }
1339
1340 fn poll_write_vectored(
1341 self: Pin<&mut Self>,
1342 cx: &mut Context<'_>,
1343 bufs: &[io::IoSlice<'_>],
1344 ) -> Poll<io::Result<usize>> {
1345 self.poll_write_vectored_priv(cx, bufs)
1346 }
1347
1348 fn is_write_vectored(&self) -> bool {
1349 true
1350 }
1351
1352 #[inline]
1353 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
1354 // tcp flush is a no-op
1355 Poll::Ready(Ok(()))
1356 }
1357
1358 fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
1359 self.shutdown_std(std::net::Shutdown::Write)?;
1360 Poll::Ready(Ok(()))
1361 }
1362}
1363
1364impl fmt::Debug for TcpStream {
1365 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1366 self.io.fmt(f)
1367 }
1368}
1369
1370#[cfg(unix)]
1371mod sys {
1372 use super::TcpStream;
1373 use std::os::unix::prelude::*;
1374
1375 impl AsRawFd for TcpStream {
1376 fn as_raw_fd(&self) -> RawFd {
1377 self.io.as_raw_fd()
1378 }
1379 }
1380
1381 #[cfg(not(tokio_no_as_fd))]
1382 impl AsFd for TcpStream {
1383 fn as_fd(&self) -> BorrowedFd<'_> {
1384 unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
1385 }
1386 }
1387}
1388
1389cfg_windows! {
1390 use crate::os::windows::io::{AsRawSocket, RawSocket};
1391 #[cfg(not(tokio_no_as_fd))]
1392 use crate::os::windows::io::{AsSocket, BorrowedSocket};
1393
1394 impl AsRawSocket for TcpStream {
1395 fn as_raw_socket(&self) -> RawSocket {
1396 self.io.as_raw_socket()
1397 }
1398 }
1399
1400 #[cfg(not(tokio_no_as_fd))]
1401 impl AsSocket for TcpStream {
1402 fn as_socket(&self) -> BorrowedSocket<'_> {
1403 unsafe { BorrowedSocket::borrow_raw(self.as_raw_socket()) }
1404 }
1405 }
1406}
1407
#[cfg(all(tokio_unstable, tokio_wasi))]
mod sys {
    use super::TcpStream;
    use std::os::wasi::prelude::*;

    impl AsRawFd for TcpStream {
        /// Returns the raw file descriptor reported by the inner IO object.
        fn as_raw_fd(&self) -> RawFd {
            self.io.as_raw_fd()
        }
    }

    #[cfg(not(tokio_no_as_fd))]
    impl AsFd for TcpStream {
        /// Borrows the stream's file descriptor for the lifetime of `&self`.
        fn as_fd(&self) -> BorrowedFd<'_> {
            let fd = self.as_raw_fd();
            // SAFETY: `fd` was just obtained from `self`, and the elided
            // lifetime ties the returned handle to the borrow of `self`.
            // This relies on the stream keeping its descriptor open for as
            // long as it is alive.
            unsafe { BorrowedFd::borrow_raw(fd) }
        }
    }
}
1426