1 | #![allow (clippy::needless_doctest_main)] |
2 | #![warn ( |
3 | missing_debug_implementations, |
4 | missing_docs, |
5 | rust_2018_idioms, |
6 | unreachable_pub |
7 | )] |
8 | #![doc (test( |
9 | no_crate_inject, |
10 | attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables)) |
11 | ))] |
12 | #![cfg_attr (docsrs, feature(doc_cfg))] |
13 | |
14 | //! Utilities for working with Tokio. |
15 | //! |
16 | //! This crate is not versioned in lockstep with the core |
17 | //! [`tokio`] crate. However, `tokio-util` _will_ respect Rust's |
18 | //! semantic versioning policy, especially with regard to breaking changes. |
19 | //! |
20 | //! [`tokio`]: https://docs.rs/tokio |
21 | |
22 | #[macro_use ] |
23 | mod cfg; |
24 | |
25 | mod loom; |
26 | |
27 | cfg_codec! { |
28 | pub mod codec; |
29 | } |
30 | |
31 | cfg_net! { |
32 | pub mod udp; |
33 | pub mod net; |
34 | } |
35 | |
36 | cfg_compat! { |
37 | pub mod compat; |
38 | } |
39 | |
40 | cfg_io! { |
41 | pub mod io; |
42 | } |
43 | |
44 | cfg_rt! { |
45 | pub mod context; |
46 | pub mod task; |
47 | } |
48 | |
49 | cfg_time! { |
50 | pub mod time; |
51 | } |
52 | |
53 | pub mod sync; |
54 | |
55 | pub mod either; |
56 | |
57 | #[cfg (any(feature = "io" , feature = "codec" ))] |
58 | mod util { |
59 | use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; |
60 | |
61 | use bytes::{Buf, BufMut}; |
62 | use futures_core::ready; |
63 | use std::io::{self, IoSlice}; |
64 | use std::mem::MaybeUninit; |
65 | use std::pin::Pin; |
66 | use std::task::{Context, Poll}; |
67 | |
68 | /// Try to read data from an `AsyncRead` into an implementer of the [`BufMut`] trait. |
69 | /// |
70 | /// [`BufMut`]: bytes::Buf |
71 | /// |
72 | /// # Example |
73 | /// |
74 | /// ``` |
75 | /// use bytes::{Bytes, BytesMut}; |
76 | /// use tokio_stream as stream; |
77 | /// use tokio::io::Result; |
78 | /// use tokio_util::io::{StreamReader, poll_read_buf}; |
79 | /// use futures::future::poll_fn; |
80 | /// use std::pin::Pin; |
81 | /// # #[tokio::main] |
82 | /// # async fn main() -> std::io::Result<()> { |
83 | /// |
84 | /// // Create a reader from an iterator. This particular reader will always be |
85 | /// // ready. |
86 | /// let mut read = StreamReader::new(stream::iter(vec![Result::Ok(Bytes::from_static(&[0, 1, 2, 3]))])); |
87 | /// |
88 | /// let mut buf = BytesMut::new(); |
89 | /// let mut reads = 0; |
90 | /// |
91 | /// loop { |
92 | /// reads += 1; |
93 | /// let n = poll_fn(|cx| poll_read_buf(Pin::new(&mut read), cx, &mut buf)).await?; |
94 | /// |
95 | /// if n == 0 { |
96 | /// break; |
97 | /// } |
98 | /// } |
99 | /// |
100 | /// // one or more reads might be necessary. |
101 | /// assert!(reads >= 1); |
102 | /// assert_eq!(&buf[..], &[0, 1, 2, 3]); |
103 | /// # Ok(()) |
104 | /// # } |
105 | /// ``` |
106 | #[cfg_attr (not(feature = "io" ), allow(unreachable_pub))] |
107 | pub fn poll_read_buf<T: AsyncRead, B: BufMut>( |
108 | io: Pin<&mut T>, |
109 | cx: &mut Context<'_>, |
110 | buf: &mut B, |
111 | ) -> Poll<io::Result<usize>> { |
112 | if !buf.has_remaining_mut() { |
113 | return Poll::Ready(Ok(0)); |
114 | } |
115 | |
116 | let n = { |
117 | let dst = buf.chunk_mut(); |
118 | let dst = unsafe { &mut *(dst as *mut _ as *mut [MaybeUninit<u8>]) }; |
119 | let mut buf = ReadBuf::uninit(dst); |
120 | let ptr = buf.filled().as_ptr(); |
121 | ready!(io.poll_read(cx, &mut buf)?); |
122 | |
123 | // Ensure the pointer does not change from under us |
124 | assert_eq!(ptr, buf.filled().as_ptr()); |
125 | buf.filled().len() |
126 | }; |
127 | |
128 | // Safety: This is guaranteed to be the number of initialized (and read) |
129 | // bytes due to the invariants provided by `ReadBuf::filled`. |
130 | unsafe { |
131 | buf.advance_mut(n); |
132 | } |
133 | |
134 | Poll::Ready(Ok(n)) |
135 | } |
136 | |
137 | /// Try to write data from an implementer of the [`Buf`] trait to an |
138 | /// [`AsyncWrite`], advancing the buffer's internal cursor. |
139 | /// |
140 | /// This function will use [vectored writes] when the [`AsyncWrite`] supports |
141 | /// vectored writes. |
142 | /// |
143 | /// # Examples |
144 | /// |
145 | /// [`File`] implements [`AsyncWrite`] and [`Cursor<&[u8]>`] implements |
146 | /// [`Buf`]: |
147 | /// |
148 | /// ```no_run |
149 | /// use tokio_util::io::poll_write_buf; |
150 | /// use tokio::io; |
151 | /// use tokio::fs::File; |
152 | /// |
153 | /// use bytes::Buf; |
154 | /// use std::io::Cursor; |
155 | /// use std::pin::Pin; |
156 | /// use futures::future::poll_fn; |
157 | /// |
158 | /// #[tokio::main] |
159 | /// async fn main() -> io::Result<()> { |
160 | /// let mut file = File::create("foo.txt" ).await?; |
161 | /// let mut buf = Cursor::new(b"data to write" ); |
162 | /// |
163 | /// // Loop until the entire contents of the buffer are written to |
164 | /// // the file. |
165 | /// while buf.has_remaining() { |
166 | /// poll_fn(|cx| poll_write_buf(Pin::new(&mut file), cx, &mut buf)).await?; |
167 | /// } |
168 | /// |
169 | /// Ok(()) |
170 | /// } |
171 | /// ``` |
172 | /// |
173 | /// [`Buf`]: bytes::Buf |
174 | /// [`AsyncWrite`]: tokio::io::AsyncWrite |
175 | /// [`File`]: tokio::fs::File |
176 | /// [vectored writes]: tokio::io::AsyncWrite::poll_write_vectored |
177 | #[cfg_attr (not(feature = "io" ), allow(unreachable_pub))] |
178 | pub fn poll_write_buf<T: AsyncWrite, B: Buf>( |
179 | io: Pin<&mut T>, |
180 | cx: &mut Context<'_>, |
181 | buf: &mut B, |
182 | ) -> Poll<io::Result<usize>> { |
183 | const MAX_BUFS: usize = 64; |
184 | |
185 | if !buf.has_remaining() { |
186 | return Poll::Ready(Ok(0)); |
187 | } |
188 | |
189 | let n = if io.is_write_vectored() { |
190 | let mut slices = [IoSlice::new(&[]); MAX_BUFS]; |
191 | let cnt = buf.chunks_vectored(&mut slices); |
192 | ready!(io.poll_write_vectored(cx, &slices[..cnt]))? |
193 | } else { |
194 | ready!(io.poll_write(cx, buf.chunk()))? |
195 | }; |
196 | |
197 | buf.advance(n); |
198 | |
199 | Poll::Ready(Ok(n)) |
200 | } |
201 | } |
202 | |