1#![allow(clippy::needless_doctest_main)]
2#![warn(
3 missing_debug_implementations,
4 missing_docs,
5 rust_2018_idioms,
6 unreachable_pub
7)]
8#![doc(test(
9 no_crate_inject,
10 attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables))
11))]
12#![cfg_attr(docsrs, feature(doc_cfg))]
13
14//! Utilities for working with Tokio.
15//!
16//! This crate is not versioned in lockstep with the core
17//! [`tokio`] crate. However, `tokio-util` _will_ respect Rust's
18//! semantic versioning policy, especially with regard to breaking changes.
19//!
20//! [`tokio`]: https://docs.rs/tokio
21
22#[macro_use]
23mod cfg;
24
25mod loom;
26
27cfg_codec! {
28 pub mod codec;
29}
30
31cfg_net! {
32 pub mod udp;
33 pub mod net;
34}
35
36cfg_compat! {
37 pub mod compat;
38}
39
40cfg_io! {
41 pub mod io;
42}
43
44cfg_rt! {
45 pub mod context;
46 pub mod task;
47}
48
49cfg_time! {
50 pub mod time;
51}
52
53pub mod sync;
54
55pub mod either;
56
57#[cfg(any(feature = "io", feature = "codec"))]
58mod util {
59 use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
60
61 use bytes::{Buf, BufMut};
62 use futures_core::ready;
63 use std::io::{self, IoSlice};
64 use std::mem::MaybeUninit;
65 use std::pin::Pin;
66 use std::task::{Context, Poll};
67
68 /// Try to read data from an `AsyncRead` into an implementer of the [`BufMut`] trait.
69 ///
70 /// [`BufMut`]: bytes::Buf
71 ///
72 /// # Example
73 ///
74 /// ```
75 /// use bytes::{Bytes, BytesMut};
76 /// use tokio_stream as stream;
77 /// use tokio::io::Result;
78 /// use tokio_util::io::{StreamReader, poll_read_buf};
79 /// use futures::future::poll_fn;
80 /// use std::pin::Pin;
81 /// # #[tokio::main]
82 /// # async fn main() -> std::io::Result<()> {
83 ///
84 /// // Create a reader from an iterator. This particular reader will always be
85 /// // ready.
86 /// let mut read = StreamReader::new(stream::iter(vec![Result::Ok(Bytes::from_static(&[0, 1, 2, 3]))]));
87 ///
88 /// let mut buf = BytesMut::new();
89 /// let mut reads = 0;
90 ///
91 /// loop {
92 /// reads += 1;
93 /// let n = poll_fn(|cx| poll_read_buf(Pin::new(&mut read), cx, &mut buf)).await?;
94 ///
95 /// if n == 0 {
96 /// break;
97 /// }
98 /// }
99 ///
100 /// // one or more reads might be necessary.
101 /// assert!(reads >= 1);
102 /// assert_eq!(&buf[..], &[0, 1, 2, 3]);
103 /// # Ok(())
104 /// # }
105 /// ```
106 #[cfg_attr(not(feature = "io"), allow(unreachable_pub))]
107 pub fn poll_read_buf<T: AsyncRead, B: BufMut>(
108 io: Pin<&mut T>,
109 cx: &mut Context<'_>,
110 buf: &mut B,
111 ) -> Poll<io::Result<usize>> {
112 if !buf.has_remaining_mut() {
113 return Poll::Ready(Ok(0));
114 }
115
116 let n = {
117 let dst = buf.chunk_mut();
118 let dst = unsafe { &mut *(dst as *mut _ as *mut [MaybeUninit<u8>]) };
119 let mut buf = ReadBuf::uninit(dst);
120 let ptr = buf.filled().as_ptr();
121 ready!(io.poll_read(cx, &mut buf)?);
122
123 // Ensure the pointer does not change from under us
124 assert_eq!(ptr, buf.filled().as_ptr());
125 buf.filled().len()
126 };
127
128 // Safety: This is guaranteed to be the number of initialized (and read)
129 // bytes due to the invariants provided by `ReadBuf::filled`.
130 unsafe {
131 buf.advance_mut(n);
132 }
133
134 Poll::Ready(Ok(n))
135 }
136
137 /// Try to write data from an implementer of the [`Buf`] trait to an
138 /// [`AsyncWrite`], advancing the buffer's internal cursor.
139 ///
140 /// This function will use [vectored writes] when the [`AsyncWrite`] supports
141 /// vectored writes.
142 ///
143 /// # Examples
144 ///
145 /// [`File`] implements [`AsyncWrite`] and [`Cursor<&[u8]>`] implements
146 /// [`Buf`]:
147 ///
148 /// ```no_run
149 /// use tokio_util::io::poll_write_buf;
150 /// use tokio::io;
151 /// use tokio::fs::File;
152 ///
153 /// use bytes::Buf;
154 /// use std::io::Cursor;
155 /// use std::pin::Pin;
156 /// use futures::future::poll_fn;
157 ///
158 /// #[tokio::main]
159 /// async fn main() -> io::Result<()> {
160 /// let mut file = File::create("foo.txt").await?;
161 /// let mut buf = Cursor::new(b"data to write");
162 ///
163 /// // Loop until the entire contents of the buffer are written to
164 /// // the file.
165 /// while buf.has_remaining() {
166 /// poll_fn(|cx| poll_write_buf(Pin::new(&mut file), cx, &mut buf)).await?;
167 /// }
168 ///
169 /// Ok(())
170 /// }
171 /// ```
172 ///
173 /// [`Buf`]: bytes::Buf
174 /// [`AsyncWrite`]: tokio::io::AsyncWrite
175 /// [`File`]: tokio::fs::File
176 /// [vectored writes]: tokio::io::AsyncWrite::poll_write_vectored
177 #[cfg_attr(not(feature = "io"), allow(unreachable_pub))]
178 pub fn poll_write_buf<T: AsyncWrite, B: Buf>(
179 io: Pin<&mut T>,
180 cx: &mut Context<'_>,
181 buf: &mut B,
182 ) -> Poll<io::Result<usize>> {
183 const MAX_BUFS: usize = 64;
184
185 if !buf.has_remaining() {
186 return Poll::Ready(Ok(0));
187 }
188
189 let n = if io.is_write_vectored() {
190 let mut slices = [IoSlice::new(&[]); MAX_BUFS];
191 let cnt = buf.chunks_vectored(&mut slices);
192 ready!(io.poll_write_vectored(cx, &slices[..cnt]))?
193 } else {
194 ready!(io.poll_write(cx, buf.chunk()))?
195 };
196
197 buf.advance(n);
198
199 Poll::Ready(Ok(n))
200 }
201}
202