1 | //! Adaptors from AsyncRead/AsyncWrite to Stream/Sink |
2 | //! |
3 | //! Raw I/O objects work with byte sequences, but higher-level code usually |
4 | //! wants to batch these into meaningful chunks, called "frames". |
5 | //! |
6 | //! This module contains adapters to go from streams of bytes, [`AsyncRead`] and |
7 | //! [`AsyncWrite`], to framed streams implementing [`Sink`] and [`Stream`]. |
8 | //! Framed streams are also known as transports. |
9 | //! |
10 | //! # The Decoder trait |
11 | //! |
12 | //! A [`Decoder`] is used together with [`FramedRead`] or [`Framed`] to turn an |
13 | //! [`AsyncRead`] into a [`Stream`]. The job of the decoder trait is to specify |
14 | //! how sequences of bytes are turned into a sequence of frames, and to |
15 | //! determine where the boundaries between frames are. The job of the |
16 | //! `FramedRead` is to repeatedly switch between reading more data from the IO |
17 | //! resource, and asking the decoder whether we have received enough data to |
18 | //! decode another frame of data. |
19 | //! |
20 | //! The main method on the `Decoder` trait is the [`decode`] method. This method |
21 | //! takes as argument the data that has been read so far, and when it is called, |
22 | //! it will be in one of the following situations: |
23 | //! |
24 | //! 1. The buffer contains less than a full frame. |
25 | //! 2. The buffer contains exactly a full frame. |
26 | //! 3. The buffer contains more than a full frame. |
27 | //! |
28 | //! In the first situation, the decoder should return `Ok(None)`. |
29 | //! |
30 | //! In the second situation, the decoder should clear the provided buffer and |
31 | //! return `Ok(Some(the_decoded_frame))`. |
32 | //! |
33 | //! In the third situation, the decoder should use a method such as [`split_to`] |
34 | //! or [`advance`] to modify the buffer such that the frame is removed from the |
35 | //! buffer, but any data in the buffer after that frame should still remain in |
36 | //! the buffer. The decoder should also return `Ok(Some(the_decoded_frame))` in |
37 | //! this case. |
38 | //! |
39 | //! Finally, the decoder may return an error if the data is invalid in some way. |
40 | //! The decoder should _not_ return an error just because it has yet to receive |
41 | //! a full frame. |
42 | //! |
43 | //! It is guaranteed that, from one call to `decode` to another, the provided |
44 | //! buffer will contain the exact same data as before, except that if more data |
45 | //! has arrived through the IO resource, that data will have been appended to |
46 | //! the buffer. This means that reading frames from a `FramedRead` is |
47 | //! essentially equivalent to the following loop: |
48 | //! |
49 | //! ```no_run |
50 | //! use tokio::io::AsyncReadExt; |
51 | //! # // This uses async_stream to create an example that compiles. |
52 | //! # fn foo() -> impl futures_core::Stream<Item = std::io::Result<bytes::BytesMut>> { async_stream::try_stream! { |
53 | //! # use tokio_util::codec::Decoder; |
54 | //! # let mut decoder = tokio_util::codec::BytesCodec::new(); |
55 | //! # let io_resource = &mut &[0u8, 1, 2, 3][..]; |
56 | //! |
57 | //! let mut buf = bytes::BytesMut::new(); |
58 | //! loop { |
59 | //! // The read_buf call will append to buf rather than overwrite existing data. |
60 | //! let len = io_resource.read_buf(&mut buf).await?; |
61 | //! |
62 | //! if len == 0 { |
63 | //! while let Some(frame) = decoder.decode_eof(&mut buf)? { |
64 | //! yield frame; |
65 | //! } |
66 | //! break; |
67 | //! } |
68 | //! |
69 | //! while let Some(frame) = decoder.decode(&mut buf)? { |
70 | //! yield frame; |
71 | //! } |
72 | //! } |
73 | //! # }} |
74 | //! ``` |
75 | //! The example above uses `yield` whenever the `Stream` produces an item. |
76 | //! |
77 | //! ## Example decoder |
78 | //! |
79 | //! As an example, consider a protocol that can be used to send strings where |
80 | //! each frame is a four byte integer that contains the length of the frame, |
81 | //! followed by that many bytes of string data. The decoder fails with an error |
82 | //! if the string data is not valid utf-8 or too long. |
83 | //! |
84 | //! Such a decoder can be written like this: |
85 | //! ``` |
86 | //! use tokio_util::codec::Decoder; |
87 | //! use bytes::{BytesMut, Buf}; |
88 | //! |
89 | //! struct MyStringDecoder {} |
90 | //! |
91 | //! const MAX: usize = 8 * 1024 * 1024; |
92 | //! |
93 | //! impl Decoder for MyStringDecoder { |
94 | //! type Item = String; |
95 | //! type Error = std::io::Error; |
96 | //! |
97 | //! fn decode( |
98 | //! &mut self, |
99 | //! src: &mut BytesMut |
100 | //! ) -> Result<Option<Self::Item>, Self::Error> { |
101 | //! if src.len() < 4 { |
102 | //! // Not enough data to read length marker. |
103 | //! return Ok(None); |
104 | //! } |
105 | //! |
106 | //! // Read length marker. |
107 | //! let mut length_bytes = [0u8; 4]; |
108 | //! length_bytes.copy_from_slice(&src[..4]); |
109 | //! let length = u32::from_le_bytes(length_bytes) as usize; |
110 | //! |
111 | //! // Check that the length is not too large to avoid a denial of |
112 | //! // service attack where the server runs out of memory. |
113 | //! if length > MAX { |
114 | //! return Err(std::io::Error::new( |
115 | //! std::io::ErrorKind::InvalidData, |
116 | //! format!("Frame of length {} is too large." , length) |
117 | //! )); |
118 | //! } |
119 | //! |
120 | //! if src.len() < 4 + length { |
121 | //! // The full string has not yet arrived. |
122 | //! // |
123 | //! // We reserve more space in the buffer. This is not strictly |
124 | //! // necessary, but is a good idea performance-wise. |
125 | //! src.reserve(4 + length - src.len()); |
126 | //! |
127 | //! // We inform the Framed that we need more bytes to form the next |
128 | //! // frame. |
129 | //! return Ok(None); |
130 | //! } |
131 | //! |
132 | //! // Use advance to modify src such that it no longer contains |
133 | //! // this frame. |
134 | //! let data = src[4..4 + length].to_vec(); |
135 | //! src.advance(4 + length); |
136 | //! |
137 | //! // Convert the data to a string, or fail if it is not valid utf-8. |
138 | //! match String::from_utf8(data) { |
139 | //! Ok(string) => Ok(Some(string)), |
140 | //! Err(utf8_error) => { |
141 | //! Err(std::io::Error::new( |
142 | //! std::io::ErrorKind::InvalidData, |
143 | //! utf8_error.utf8_error(), |
144 | //! )) |
145 | //! }, |
146 | //! } |
147 | //! } |
148 | //! } |
149 | //! ``` |
150 | //! |
151 | //! # The Encoder trait |
152 | //! |
153 | //! An [`Encoder`] is used together with [`FramedWrite`] or [`Framed`] to turn |
154 | //! an [`AsyncWrite`] into a [`Sink`]. The job of the encoder trait is to |
155 | //! specify how frames are turned into a sequence of bytes. The job of the |
156 | //! `FramedWrite` is to take the resulting sequence of bytes and write it to the |
157 | //! IO resource. |
158 | //! |
159 | //! The main method on the `Encoder` trait is the [`encode`] method. This method |
160 | //! takes an item that is being written, and a buffer to write the item to. The |
161 | //! buffer may already contain data, and in this case, the encoder should append |
162 | //! the new frame to the buffer rather than overwrite the existing data. |
163 | //! |
164 | //! It is guaranteed that, from one call to `encode` to another, the provided |
165 | //! buffer will contain the exact same data as before, except that some of the |
166 | //! data may have been removed from the front of the buffer. Writing to a |
167 | //! `FramedWrite` is essentially equivalent to the following loop: |
168 | //! |
169 | //! ```no_run |
170 | //! use tokio::io::AsyncWriteExt; |
171 | //! use bytes::Buf; // for advance |
172 | //! # use tokio_util::codec::Encoder; |
173 | //! # async fn next_frame() -> bytes::Bytes { bytes::Bytes::new() } |
174 | //! # async fn no_more_frames() { } |
175 | //! # #[tokio::main] async fn main() -> std::io::Result<()> { |
176 | //! # let mut io_resource = tokio::io::sink(); |
177 | //! # let mut encoder = tokio_util::codec::BytesCodec::new(); |
178 | //! |
179 | //! const MAX: usize = 8192; |
180 | //! |
181 | //! let mut buf = bytes::BytesMut::new(); |
182 | //! loop { |
183 | //! tokio::select! { |
184 | //! num_written = io_resource.write(&buf), if !buf.is_empty() => { |
185 | //! buf.advance(num_written?); |
186 | //! }, |
187 | //! frame = next_frame(), if buf.len() < MAX => { |
188 | //! encoder.encode(frame, &mut buf)?; |
189 | //! }, |
190 | //! _ = no_more_frames() => { |
191 | //! io_resource.write_all(&buf).await?; |
192 | //! io_resource.shutdown().await?; |
193 | //! return Ok(()); |
194 | //! }, |
195 | //! } |
196 | //! } |
197 | //! # } |
198 | //! ``` |
199 | //! Here the `next_frame` method corresponds to any frames you write to the |
200 | //! `FramedWrite`. The `no_more_frames` method corresponds to closing the |
201 | //! `FramedWrite` with [`SinkExt::close`]. |
202 | //! |
203 | //! ## Example encoder |
204 | //! |
205 | //! As an example, consider a protocol that can be used to send strings where |
206 | //! each frame is a four byte integer that contains the length of the frame, |
207 | //! followed by that many bytes of string data. The encoder will fail if the |
208 | //! string is too long. |
209 | //! |
210 | //! Such an encoder can be written like this: |
211 | //! ``` |
212 | //! use tokio_util::codec::Encoder; |
213 | //! use bytes::BytesMut; |
214 | //! |
215 | //! struct MyStringEncoder {} |
216 | //! |
217 | //! const MAX: usize = 8 * 1024 * 1024; |
218 | //! |
219 | //! impl Encoder<String> for MyStringEncoder { |
220 | //! type Error = std::io::Error; |
221 | //! |
222 | //! fn encode(&mut self, item: String, dst: &mut BytesMut) -> Result<(), Self::Error> { |
223 | //! // Don't send a string if it is longer than the other end will |
224 | //! // accept. |
225 | //! if item.len() > MAX { |
226 | //! return Err(std::io::Error::new( |
227 | //! std::io::ErrorKind::InvalidData, |
228 | //! format!("Frame of length {} is too large." , item.len()) |
229 | //! )); |
230 | //! } |
231 | //! |
232 | //! // Convert the length into a byte array. |
233 | //! // The cast to u32 cannot overflow due to the length check above. |
234 | //! let len_slice = u32::to_le_bytes(item.len() as u32); |
235 | //! |
236 | //! // Reserve space in the buffer. |
237 | //! dst.reserve(4 + item.len()); |
238 | //! |
239 | //! // Write the length and string to the buffer. |
240 | //! dst.extend_from_slice(&len_slice); |
241 | //! dst.extend_from_slice(item.as_bytes()); |
242 | //! Ok(()) |
243 | //! } |
244 | //! } |
245 | //! ``` |
246 | //! |
247 | //! [`AsyncRead`]: tokio::io::AsyncRead |
248 | //! [`AsyncWrite`]: tokio::io::AsyncWrite |
249 | //! [`Stream`]: futures_core::Stream |
250 | //! [`Sink`]: futures_sink::Sink |
251 | //! [`SinkExt::close`]: https://docs.rs/futures/0.3/futures/sink/trait.SinkExt.html#method.close |
252 | //! [`FramedRead`]: struct@crate::codec::FramedRead |
253 | //! [`FramedWrite`]: struct@crate::codec::FramedWrite |
254 | //! [`Framed`]: struct@crate::codec::Framed |
255 | //! [`Decoder`]: trait@crate::codec::Decoder |
256 | //! [`decode`]: fn@crate::codec::Decoder::decode |
257 | //! [`encode`]: fn@crate::codec::Encoder::encode |
258 | //! [`split_to`]: fn@bytes::BytesMut::split_to |
259 | //! [`advance`]: fn@bytes::Buf::advance |
260 | |
261 | mod bytes_codec; |
262 | pub use self::bytes_codec::BytesCodec; |
263 | |
264 | mod decoder; |
265 | pub use self::decoder::Decoder; |
266 | |
267 | mod encoder; |
268 | pub use self::encoder::Encoder; |
269 | |
270 | mod framed_impl; |
271 | #[allow (unused_imports)] |
272 | pub(crate) use self::framed_impl::{FramedImpl, RWFrames, ReadFrame, WriteFrame}; |
273 | |
274 | mod framed; |
275 | pub use self::framed::{Framed, FramedParts}; |
276 | |
277 | mod framed_read; |
278 | pub use self::framed_read::FramedRead; |
279 | |
280 | mod framed_write; |
281 | pub use self::framed_write::FramedWrite; |
282 | |
283 | pub mod length_delimited; |
284 | pub use self::length_delimited::{LengthDelimitedCodec, LengthDelimitedCodecError}; |
285 | |
286 | mod lines_codec; |
287 | pub use self::lines_codec::{LinesCodec, LinesCodecError}; |
288 | |
289 | mod any_delimiter_codec; |
290 | pub use self::any_delimiter_codec::{AnyDelimiterCodec, AnyDelimiterCodecError}; |
291 | |