//! Adapters from AsyncRead/AsyncWrite to Stream/Sink
//!
//! Raw I/O objects work with byte sequences, but higher-level code usually
//! wants to batch these into meaningful chunks, called "frames".
//!
//! This module contains adapters to go from streams of bytes, [`AsyncRead`] and
//! [`AsyncWrite`], to framed streams implementing [`Sink`] and [`Stream`].
//! Framed streams are also known as transports.
//!
//! # The Decoder trait
//!
//! A [`Decoder`] is used together with [`FramedRead`] or [`Framed`] to turn an
//! [`AsyncRead`] into a [`Stream`]. The job of the decoder trait is to specify
//! how sequences of bytes are turned into a sequence of frames, and to
//! determine where the boundaries between frames are. The job of the
//! `FramedRead` is to repeatedly switch between reading more data from the IO
//! resource, and asking the decoder whether we have received enough data to
//! decode another frame of data.
//!
//! The main method on the `Decoder` trait is the [`decode`] method. This method
//! takes as argument the data that has been read so far, and when it is called,
//! it will be in one of the following situations:
//!
//! 1. The buffer contains less than a full frame.
//! 2. The buffer contains exactly a full frame.
//! 3. The buffer contains more than a full frame.
//!
//! In the first situation, the decoder should return `Ok(None)`.
//!
//! In the second situation, the decoder should clear the provided buffer and
//! return `Ok(Some(the_decoded_frame))`.
//!
//! In the third situation, the decoder should use a method such as [`split_to`]
//! or [`advance`] to modify the buffer such that the frame is removed from the
//! buffer, but any data in the buffer after that frame should still remain in
//! the buffer. The decoder should also return `Ok(Some(the_decoded_frame))` in
//! this case.
//!
//! Finally, the decoder may return an error if the data is invalid in some way.
//! The decoder should _not_ return an error just because it has yet to receive
//! a full frame.
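//!
//! The three situations can be sketched with the standard library alone. The
//! snippet below is only an illustration under stated assumptions: it uses a
//! made-up newline-delimited protocol, a plain `Vec<u8>` standing in for the
//! buffer, and a hypothetical `try_decode` helper that is not part of this
//! crate.
//!
//! ```rust
//! // Hypothetical helper, not part of this crate: takes one newline-terminated
//! // frame off the front of `buf`, using a plain `Vec<u8>` as the buffer.
//! fn try_decode(buf: &mut Vec<u8>) -> Result<Option<String>, std::string::FromUtf8Error> {
//!     let pos = match buf.iter().position(|&b| b == b'\n') {
//!         Some(pos) => pos,
//!         // Situation 1: less than a full frame, so ask for more data.
//!         None => return Ok(None),
//!     };
//!     // Situations 2 and 3: remove the frame and its newline from the front.
//!     // Any bytes after the newline stay in the buffer for the next call.
//!     let mut frame: Vec<u8> = buf.drain(..=pos).collect();
//!     frame.pop(); // Drop the trailing newline.
//!     // Invalid data is an error; an incomplete frame is not.
//!     String::from_utf8(frame).map(Some)
//! }
//!
//! fn main() {
//!     let mut buf = b"hel".to_vec();
//!     assert_eq!(try_decode(&mut buf).unwrap(), None);
//!
//!     // More data arrives and is appended to what was already there.
//!     buf.extend_from_slice(b"lo\nwor");
//!     assert_eq!(try_decode(&mut buf).unwrap(), Some("hello".to_string()));
//!     assert_eq!(buf, b"wor".to_vec());
//!
//!     buf.extend_from_slice(b"ld\n");
//!     assert_eq!(try_decode(&mut buf).unwrap(), Some("world".to_string()));
//!     assert!(buf.is_empty());
//! }
//! ```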
//!
//! It is guaranteed that, from one call to `decode` to another, the provided
//! buffer will contain the exact same data as before, except that if more data
//! has arrived through the IO resource, that data will have been appended to
//! the buffer. This means that reading frames from a `FramedRead` is
//! essentially equivalent to the following loop:
//!
//! ```no_run
//! use tokio::io::AsyncReadExt;
//! # // This uses async_stream to create an example that compiles.
//! # fn foo() -> impl futures_core::Stream<Item = std::io::Result<bytes::BytesMut>> { async_stream::try_stream! {
//! # use tokio_util::codec::Decoder;
//! # let mut decoder = tokio_util::codec::BytesCodec::new();
//! # let io_resource = &mut &[0u8, 1, 2, 3][..];
//!
//! let mut buf = bytes::BytesMut::new();
//! loop {
//!     // The read_buf call will append to buf rather than overwrite existing data.
//!     let len = io_resource.read_buf(&mut buf).await?;
//!
//!     if len == 0 {
//!         while let Some(frame) = decoder.decode_eof(&mut buf)? {
//!             yield frame;
//!         }
//!         break;
//!     }
//!
//!     while let Some(frame) = decoder.decode(&mut buf)? {
//!         yield frame;
//!     }
//! }
//! # }}
//! ```
//! The example above uses `yield` whenever the `Stream` produces an item.
//!
//! ## Example decoder
//!
//! As an example, consider a protocol that can be used to send strings where
//! each frame is a four-byte little-endian integer that contains the length of
//! the string, followed by that many bytes of string data. The decoder fails
//! with an error if the string data is not valid UTF-8 or is too long.
//!
//! Such a decoder can be written like this:
//! ```
//! use tokio_util::codec::Decoder;
//! use bytes::{BytesMut, Buf};
//!
//! struct MyStringDecoder {}
//!
//! const MAX: usize = 8 * 1024 * 1024;
//!
//! impl Decoder for MyStringDecoder {
//!     type Item = String;
//!     type Error = std::io::Error;
//!
//!     fn decode(
//!         &mut self,
//!         src: &mut BytesMut
//!     ) -> Result<Option<Self::Item>, Self::Error> {
//!         if src.len() < 4 {
//!             // Not enough data to read length marker.
//!             return Ok(None);
//!         }
//!
//!         // Read length marker.
//!         let mut length_bytes = [0u8; 4];
//!         length_bytes.copy_from_slice(&src[..4]);
//!         let length = u32::from_le_bytes(length_bytes) as usize;
//!
//!         // Check that the length is not too large to avoid a denial of
//!         // service attack where the server runs out of memory.
//!         if length > MAX {
//!             return Err(std::io::Error::new(
//!                 std::io::ErrorKind::InvalidData,
//!                 format!("Frame of length {} is too large.", length)
//!             ));
//!         }
//!
//!         if src.len() < 4 + length {
//!             // The full string has not yet arrived.
//!             //
//!             // We reserve more space in the buffer. This is not strictly
//!             // necessary, but is a good idea performance-wise.
//!             src.reserve(4 + length - src.len());
//!
//!             // We inform the Framed that we need more bytes to form the next
//!             // frame.
//!             return Ok(None);
//!         }
//!
//!         // Use advance to modify src such that it no longer contains
//!         // this frame.
//!         let data = src[4..4 + length].to_vec();
//!         src.advance(4 + length);
//!
//!         // Convert the data to a string, or fail if it is not valid utf-8.
//!         match String::from_utf8(data) {
//!             Ok(string) => Ok(Some(string)),
//!             Err(utf8_error) => {
//!                 Err(std::io::Error::new(
//!                     std::io::ErrorKind::InvalidData,
//!                     utf8_error.utf8_error(),
//!                 ))
//!             },
//!         }
//!     }
//! }
//! ```
//!
//! # The Encoder trait
//!
//! An [`Encoder`] is used together with [`FramedWrite`] or [`Framed`] to turn
//! an [`AsyncWrite`] into a [`Sink`]. The job of the encoder trait is to
//! specify how frames are turned into a sequence of bytes. The job of the
//! `FramedWrite` is to take the resulting sequence of bytes and write it to the
//! IO resource.
//!
//! The main method on the `Encoder` trait is the [`encode`] method. This method
//! takes an item that is being written, and a buffer to write the item to. The
//! buffer may already contain data, and in this case, the encoder should append
//! the new frame to the buffer rather than overwrite the existing data.
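//!
//! The append-only rule can be sketched with the standard library alone. This
//! is only an illustration under stated assumptions: it uses a made-up
//! newline-delimited protocol, a plain `Vec<u8>` standing in for the buffer,
//! and a hypothetical `encode_line` helper that is not part of this crate.
//!
//! ```rust
//! // Hypothetical helper, not part of this crate: appends one
//! // newline-terminated frame to `dst`, using a plain `Vec<u8>` as the buffer.
//! fn encode_line(item: &str, dst: &mut Vec<u8>) {
//!     dst.reserve(item.len() + 1);
//!     dst.extend_from_slice(item.as_bytes());
//!     dst.push(b'\n');
//! }
//!
//! fn main() {
//!     // The buffer still holds a frame that has not been written out yet.
//!     let mut dst = b"first\n".to_vec();
//!     encode_line("second", &mut dst);
//!
//!     // The earlier frame is untouched and the new one follows it.
//!     assert_eq!(dst, b"first\nsecond\n".to_vec());
//! }
//! ```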
//!
//! It is guaranteed that, from one call to `encode` to another, the provided
//! buffer will contain the exact same data as before, except that some of the
//! data may have been removed from the front of the buffer. Writing to a
//! `FramedWrite` is essentially equivalent to the following loop:
//!
//! ```no_run
//! use tokio::io::AsyncWriteExt;
//! use bytes::Buf; // for advance
//! # use tokio_util::codec::Encoder;
//! # async fn next_frame() -> bytes::Bytes { bytes::Bytes::new() }
//! # async fn no_more_frames() { }
//! # #[tokio::main] async fn main() -> std::io::Result<()> {
//! # let mut io_resource = tokio::io::sink();
//! # let mut encoder = tokio_util::codec::BytesCodec::new();
//!
//! const MAX: usize = 8192;
//!
//! let mut buf = bytes::BytesMut::new();
//! loop {
//!     tokio::select! {
//!         num_written = io_resource.write(&buf), if !buf.is_empty() => {
//!             buf.advance(num_written?);
//!         },
//!         frame = next_frame(), if buf.len() < MAX => {
//!             encoder.encode(frame, &mut buf)?;
//!         },
//!         _ = no_more_frames() => {
//!             io_resource.write_all(&buf).await?;
//!             io_resource.shutdown().await?;
//!             return Ok(());
//!         },
//!     }
//! }
//! # }
//! ```
//! Here the `next_frame` function corresponds to any frames you write to the
//! `FramedWrite`. The `no_more_frames` function corresponds to closing the
//! `FramedWrite` with [`SinkExt::close`].
//!
//! ## Example encoder
//!
//! As an example, consider a protocol that can be used to send strings where
//! each frame is a four-byte little-endian integer that contains the length of
//! the string, followed by that many bytes of string data. The encoder will
//! fail if the string is too long.
//!
//! Such an encoder can be written like this:
//! ```
//! use tokio_util::codec::Encoder;
//! use bytes::BytesMut;
//!
//! struct MyStringEncoder {}
//!
//! const MAX: usize = 8 * 1024 * 1024;
//!
//! impl Encoder<String> for MyStringEncoder {
//!     type Error = std::io::Error;
//!
//!     fn encode(&mut self, item: String, dst: &mut BytesMut) -> Result<(), Self::Error> {
//!         // Don't send a string if it is longer than the other end will
//!         // accept.
//!         if item.len() > MAX {
//!             return Err(std::io::Error::new(
//!                 std::io::ErrorKind::InvalidData,
//!                 format!("Frame of length {} is too large.", item.len())
//!             ));
//!         }
//!
//!         // Convert the length into a byte array.
//!         // The cast to u32 cannot overflow due to the length check above.
//!         let len_slice = u32::to_le_bytes(item.len() as u32);
//!
//!         // Reserve space in the buffer.
//!         dst.reserve(4 + item.len());
//!
//!         // Write the length and string to the buffer.
//!         dst.extend_from_slice(&len_slice);
//!         dst.extend_from_slice(item.as_bytes());
//!         Ok(())
//!     }
//! }
//! ```
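//!
//! To make the wire format of the two example codecs concrete, the following
//! standard-library sketch builds one frame by hand. The `frame` helper is
//! hypothetical and not part of this crate; it assumes the same layout as the
//! examples above, a little-endian `u32` length followed by the string bytes,
//! and omits the maximum-length check for brevity.
//!
//! ```rust
//! // Hypothetical helper, not part of this crate: lays out a single frame as
//! // a 4-byte little-endian length followed by the UTF-8 bytes of `s`.
//! fn frame(s: &str) -> Vec<u8> {
//!     let mut out = Vec::with_capacity(4 + s.len());
//!     out.extend_from_slice(&(s.len() as u32).to_le_bytes());
//!     out.extend_from_slice(s.as_bytes());
//!     out
//! }
//!
//! fn main() {
//!     // The least significant byte of the length comes first.
//!     assert_eq!(frame("hi"), vec![2, 0, 0, 0, b'h', b'i']);
//!
//!     // An empty string is still a valid frame: just the length marker.
//!     assert_eq!(frame(""), vec![0, 0, 0, 0]);
//! }
//! ```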
//!
//! [`AsyncRead`]: tokio::io::AsyncRead
//! [`AsyncWrite`]: tokio::io::AsyncWrite
//! [`Stream`]: futures_core::Stream
//! [`Sink`]: futures_sink::Sink
//! [`SinkExt::close`]: https://docs.rs/futures/0.3/futures/sink/trait.SinkExt.html#method.close
//! [`FramedRead`]: struct@crate::codec::FramedRead
//! [`FramedWrite`]: struct@crate::codec::FramedWrite
//! [`Framed`]: struct@crate::codec::Framed
//! [`Decoder`]: trait@crate::codec::Decoder
//! [`decode`]: fn@crate::codec::Decoder::decode
//! [`encode`]: fn@crate::codec::Encoder::encode
//! [`split_to`]: fn@bytes::BytesMut::split_to
//! [`advance`]: fn@bytes::Buf::advance

mod bytes_codec;
pub use self::bytes_codec::BytesCodec;

mod decoder;
pub use self::decoder::Decoder;

mod encoder;
pub use self::encoder::Encoder;

mod framed_impl;
#[allow(unused_imports)]
pub(crate) use self::framed_impl::{FramedImpl, RWFrames, ReadFrame, WriteFrame};

mod framed;
pub use self::framed::{Framed, FramedParts};

mod framed_read;
pub use self::framed_read::FramedRead;

mod framed_write;
pub use self::framed_write::FramedWrite;

pub mod length_delimited;
pub use self::length_delimited::{LengthDelimitedCodec, LengthDelimitedCodecError};

mod lines_codec;
pub use self::lines_codec::{LinesCodec, LinesCodecError};

mod any_delimiter_codec;
pub use self::any_delimiter_codec::{AnyDelimiterCodec, AnyDelimiterCodecError};