1 | //! Adaptors from `AsyncRead`/`AsyncWrite` to Stream/Sink |
2 | //! |
3 | //! Raw I/O objects work with byte sequences, but higher-level code usually |
4 | //! wants to batch these into meaningful chunks, called "frames". |
5 | //! |
6 | //! This module contains adapters to go from streams of bytes, [`AsyncRead`] and |
7 | //! [`AsyncWrite`], to framed streams implementing [`Sink`] and [`Stream`]. |
8 | //! Framed streams are also known as transports. |
9 | //! |
10 | //! # Example encoding using `LinesCodec` |
11 | //! |
12 | //! The following example demonstrates how to use a codec such as [`LinesCodec`] to |
13 | //! write framed data. [`FramedWrite`] can be used to achieve this. Data sent to |
14 | //! [`FramedWrite`] are first framed according to a specific codec, and then sent to |
15 | //! an implementor of [`AsyncWrite`]. |
16 | //! |
17 | //! ``` |
18 | //! use futures::sink::SinkExt; |
19 | //! use tokio_util::codec::LinesCodec; |
20 | //! use tokio_util::codec::FramedWrite; |
21 | //! |
22 | //! #[tokio::main] |
23 | //! async fn main() { |
24 | //! let buffer = Vec::new(); |
25 | //! let messages = vec!["Hello" , "World" ]; |
26 | //! let encoder = LinesCodec::new(); |
27 | //! |
28 | //! // FramedWrite is a sink which means you can send values into it |
29 | //! // asynchronously. |
30 | //! let mut writer = FramedWrite::new(buffer, encoder); |
31 | //! |
32 | //! // To be able to send values into a FramedWrite, you need to bring the |
33 | //! // `SinkExt` trait into scope. |
34 | //! writer.send(messages[0]).await.unwrap(); |
35 | //! writer.send(messages[1]).await.unwrap(); |
36 | //! |
37 | //! let buffer = writer.get_ref(); |
38 | //! |
39 | //! assert_eq!(buffer.as_slice(), "Hello\nWorld\n".as_bytes()); |
40 | //! } |
41 | //!``` |
42 | //! |
43 | //! # Example decoding using `LinesCodec` |
44 | //! The following example demonstrates how to use a codec such as [`LinesCodec`] to |
45 | //! read a stream of framed data. [`FramedRead`] can be used to achieve this. [`FramedRead`] |
46 | //! will keep reading from an [`AsyncRead`] implementor until a whole frame, according to a codec, |
47 | //! can be parsed. |
48 | //! |
49 | //!``` |
50 | //! use tokio_stream::StreamExt; |
51 | //! use tokio_util::codec::LinesCodec; |
52 | //! use tokio_util::codec::FramedRead; |
53 | //! |
54 | //! #[tokio::main] |
55 | //! async fn main() { |
56 | //! let message = "Hello\nWorld".as_bytes(); |
57 | //! let decoder = LinesCodec::new(); |
58 | //! |
59 | //! // FramedRead can be used to read a stream of values that are framed according to |
60 | //! // a codec. FramedRead will read from its input (here `message`) until a whole frame |
61 | //! // can be parsed. |
62 | //! let mut reader = FramedRead::new(message, decoder); |
63 | //! |
64 | //! // To read values from a FramedRead, you need to bring the |
65 | //! // `StreamExt` trait into scope. |
66 | //! let frame1 = reader.next().await.unwrap().unwrap(); |
67 | //! let frame2 = reader.next().await.unwrap().unwrap(); |
68 | //! |
69 | //! assert!(reader.next().await.is_none()); |
70 | //! assert_eq!(frame1, "Hello" ); |
71 | //! assert_eq!(frame2, "World" ); |
72 | //! } |
73 | //! ``` |
74 | //! |
75 | //! # The Decoder trait |
76 | //! |
77 | //! A [`Decoder`] is used together with [`FramedRead`] or [`Framed`] to turn an |
78 | //! [`AsyncRead`] into a [`Stream`]. The job of the decoder trait is to specify |
79 | //! how sequences of bytes are turned into a sequence of frames, and to |
80 | //! determine where the boundaries between frames are. The job of the |
81 | //! `FramedRead` is to repeatedly switch between reading more data from the IO |
82 | //! resource, and asking the decoder whether we have received enough data to |
83 | //! decode another frame of data. |
84 | //! |
85 | //! The main method on the `Decoder` trait is the [`decode`] method. This method |
86 | //! takes as argument the data that has been read so far, and when it is called, |
87 | //! it will be in one of the following situations: |
88 | //! |
89 | //! 1. The buffer contains less than a full frame. |
90 | //! 2. The buffer contains exactly a full frame. |
91 | //! 3. The buffer contains more than a full frame. |
92 | //! |
93 | //! In the first situation, the decoder should return `Ok(None)`. |
94 | //! |
95 | //! In the second situation, the decoder should clear the provided buffer and |
96 | //! return `Ok(Some(the_decoded_frame))`. |
97 | //! |
98 | //! In the third situation, the decoder should use a method such as [`split_to`] |
99 | //! or [`advance`] to modify the buffer such that the frame is removed from the |
100 | //! buffer, but any data in the buffer after that frame should still remain in |
101 | //! the buffer. The decoder should also return `Ok(Some(the_decoded_frame))` in |
102 | //! this case. |
103 | //! |
104 | //! Finally the decoder may return an error if the data is invalid in some way. |
105 | //! The decoder should _not_ return an error just because it has yet to receive |
106 | //! a full frame. |
107 | //! |
108 | //! It is guaranteed that, from one call to `decode` to another, the provided |
109 | //! buffer will contain the exact same data as before, except that if more data |
110 | //! has arrived through the IO resource, that data will have been appended to |
111 | //! the buffer. This means that reading frames from a `FramedRead` is |
112 | //! essentially equivalent to the following loop: |
113 | //! |
114 | //! ```no_run |
115 | //! use tokio::io::AsyncReadExt; |
116 | //! # // This uses async_stream to create an example that compiles. |
117 | //! # fn foo() -> impl futures_core::Stream<Item = std::io::Result<bytes::BytesMut>> { async_stream::try_stream! { |
118 | //! # use tokio_util::codec::Decoder; |
119 | //! # let mut decoder = tokio_util::codec::BytesCodec::new(); |
120 | //! # let io_resource = &mut &[0u8, 1, 2, 3][..]; |
121 | //! |
122 | //! let mut buf = bytes::BytesMut::new(); |
123 | //! loop { |
124 | //! // The read_buf call will append to buf rather than overwrite existing data. |
125 | //! let len = io_resource.read_buf(&mut buf).await?; |
126 | //! |
127 | //! if len == 0 { |
128 | //! while let Some(frame) = decoder.decode_eof(&mut buf)? { |
129 | //! yield frame; |
130 | //! } |
131 | //! break; |
132 | //! } |
133 | //! |
134 | //! while let Some(frame) = decoder.decode(&mut buf)? { |
135 | //! yield frame; |
136 | //! } |
137 | //! } |
138 | //! # }} |
139 | //! ``` |
140 | //! The example above uses `yield` whenever the `Stream` produces an item. |
141 | //! |
142 | //! ## Example decoder |
143 | //! |
144 | //! As an example, consider a protocol that can be used to send strings where |
145 | //! each frame is a four-byte little-endian integer giving the length of the string data, |
146 | //! followed by that many bytes of string data. The decoder fails with an error |
147 | //! if the string data is not valid utf-8 or too long. |
148 | //! |
149 | //! Such a decoder can be written like this: |
150 | //! ``` |
151 | //! use tokio_util::codec::Decoder; |
152 | //! use bytes::{BytesMut, Buf}; |
153 | //! |
154 | //! struct MyStringDecoder {} |
155 | //! |
156 | //! const MAX: usize = 8 * 1024 * 1024; |
157 | //! |
158 | //! impl Decoder for MyStringDecoder { |
159 | //! type Item = String; |
160 | //! type Error = std::io::Error; |
161 | //! |
162 | //! fn decode( |
163 | //! &mut self, |
164 | //! src: &mut BytesMut |
165 | //! ) -> Result<Option<Self::Item>, Self::Error> { |
166 | //! if src.len() < 4 { |
167 | //! // Not enough data to read length marker. |
168 | //! return Ok(None); |
169 | //! } |
170 | //! |
171 | //! // Read length marker. |
172 | //! let mut length_bytes = [0u8; 4]; |
173 | //! length_bytes.copy_from_slice(&src[..4]); |
174 | //! let length = u32::from_le_bytes(length_bytes) as usize; |
175 | //! |
176 | //! // Check that the length is not too large to avoid a denial of |
177 | //! // service attack where the server runs out of memory. |
178 | //! if length > MAX { |
179 | //! return Err(std::io::Error::new( |
180 | //! std::io::ErrorKind::InvalidData, |
181 | //! format!("Frame of length {} is too large." , length) |
182 | //! )); |
183 | //! } |
184 | //! |
185 | //! if src.len() < 4 + length { |
186 | //! // The full string has not yet arrived. |
187 | //! // |
188 | //! // We reserve more space in the buffer. This is not strictly |
189 | //! // necessary, but is a good idea performance-wise. |
190 | //! src.reserve(4 + length - src.len()); |
191 | //! |
192 | //! // We inform the Framed that we need more bytes to form the next |
193 | //! // frame. |
194 | //! return Ok(None); |
195 | //! } |
196 | //! |
197 | //! // Use advance to modify src such that it no longer contains |
198 | //! // this frame. |
199 | //! let data = src[4..4 + length].to_vec(); |
200 | //! src.advance(4 + length); |
201 | //! |
202 | //! // Convert the data to a string, or fail if it is not valid utf-8. |
203 | //! match String::from_utf8(data) { |
204 | //! Ok(string) => Ok(Some(string)), |
205 | //! Err(utf8_error) => { |
206 | //! Err(std::io::Error::new( |
207 | //! std::io::ErrorKind::InvalidData, |
208 | //! utf8_error.utf8_error(), |
209 | //! )) |
210 | //! }, |
211 | //! } |
212 | //! } |
213 | //! } |
214 | //! ``` |
215 | //! |
216 | //! # The Encoder trait |
217 | //! |
218 | //! An [`Encoder`] is used together with [`FramedWrite`] or [`Framed`] to turn |
219 | //! an [`AsyncWrite`] into a [`Sink`]. The job of the encoder trait is to |
220 | //! specify how frames are turned into a sequence of bytes. The job of the |
221 | //! `FramedWrite` is to take the resulting sequence of bytes and write it to the |
222 | //! IO resource. |
223 | //! |
224 | //! The main method on the `Encoder` trait is the [`encode`] method. This method |
225 | //! takes an item that is being written, and a buffer to write the item to. The |
226 | //! buffer may already contain data, and in this case, the encoder should append |
227 | //! the new frame to the buffer rather than overwrite the existing data. |
228 | //! |
229 | //! It is guaranteed that, from one call to `encode` to another, the provided |
230 | //! buffer will contain the exact same data as before, except that some of the |
231 | //! data may have been removed from the front of the buffer. Writing to a |
232 | //! `FramedWrite` is essentially equivalent to the following loop: |
233 | //! |
234 | //! ```no_run |
235 | //! use tokio::io::AsyncWriteExt; |
236 | //! use bytes::Buf; // for advance |
237 | //! # use tokio_util::codec::Encoder; |
238 | //! # async fn next_frame() -> bytes::Bytes { bytes::Bytes::new() } |
239 | //! # async fn no_more_frames() { } |
240 | //! # #[tokio::main] async fn main() -> std::io::Result<()> { |
241 | //! # let mut io_resource = tokio::io::sink(); |
242 | //! # let mut encoder = tokio_util::codec::BytesCodec::new(); |
243 | //! |
244 | //! const MAX: usize = 8192; |
245 | //! |
246 | //! let mut buf = bytes::BytesMut::new(); |
247 | //! loop { |
248 | //! tokio::select! { |
249 | //! num_written = io_resource.write(&buf), if !buf.is_empty() => { |
250 | //! buf.advance(num_written?); |
251 | //! }, |
252 | //! frame = next_frame(), if buf.len() < MAX => { |
253 | //! encoder.encode(frame, &mut buf)?; |
254 | //! }, |
255 | //! _ = no_more_frames() => { |
256 | //! io_resource.write_all(&buf).await?; |
257 | //! io_resource.shutdown().await?; |
258 | //! return Ok(()); |
259 | //! }, |
260 | //! } |
261 | //! } |
262 | //! # } |
263 | //! ``` |
264 | //! Here the `next_frame` method corresponds to any frames you write to the |
265 | //! `FramedWrite`. The `no_more_frames` method corresponds to closing the |
266 | //! `FramedWrite` with [`SinkExt::close`]. |
267 | //! |
268 | //! ## Example encoder |
269 | //! |
270 | //! As an example, consider a protocol that can be used to send strings where |
271 | //! each frame is a four-byte little-endian integer giving the length of the string data, |
272 | //! followed by that many bytes of string data. The encoder will fail if the |
273 | //! string is too long. |
274 | //! |
275 | //! Such an encoder can be written like this: |
276 | //! ``` |
277 | //! use tokio_util::codec::Encoder; |
278 | //! use bytes::BytesMut; |
279 | //! |
280 | //! struct MyStringEncoder {} |
281 | //! |
282 | //! const MAX: usize = 8 * 1024 * 1024; |
283 | //! |
284 | //! impl Encoder<String> for MyStringEncoder { |
285 | //! type Error = std::io::Error; |
286 | //! |
287 | //! fn encode(&mut self, item: String, dst: &mut BytesMut) -> Result<(), Self::Error> { |
288 | //! // Don't send a string if it is longer than the other end will |
289 | //! // accept. |
290 | //! if item.len() > MAX { |
291 | //! return Err(std::io::Error::new( |
292 | //! std::io::ErrorKind::InvalidData, |
293 | //! format!("Frame of length {} is too large." , item.len()) |
294 | //! )); |
295 | //! } |
296 | //! |
297 | //! // Convert the length into a byte array. |
298 | //! // The cast to u32 cannot overflow due to the length check above. |
299 | //! let len_slice = u32::to_le_bytes(item.len() as u32); |
300 | //! |
301 | //! // Reserve space in the buffer. |
302 | //! dst.reserve(4 + item.len()); |
303 | //! |
304 | //! // Write the length and string to the buffer. |
305 | //! dst.extend_from_slice(&len_slice); |
306 | //! dst.extend_from_slice(item.as_bytes()); |
307 | //! Ok(()) |
308 | //! } |
309 | //! } |
310 | //! ``` |
311 | //! |
312 | //! [`AsyncRead`]: tokio::io::AsyncRead |
313 | //! [`AsyncWrite`]: tokio::io::AsyncWrite |
314 | //! [`Stream`]: futures_core::Stream |
315 | //! [`Sink`]: futures_sink::Sink |
316 | //! [`SinkExt`]: futures::sink::SinkExt |
317 | //! [`SinkExt::close`]: https://docs.rs/futures/0.3/futures/sink/trait.SinkExt.html#method.close |
318 | //! [`FramedRead`]: struct@crate::codec::FramedRead |
319 | //! [`FramedWrite`]: struct@crate::codec::FramedWrite |
320 | //! [`Framed`]: struct@crate::codec::Framed |
321 | //! [`Decoder`]: trait@crate::codec::Decoder |
322 | //! [`decode`]: fn@crate::codec::Decoder::decode |
323 | //! [`encode`]: fn@crate::codec::Encoder::encode |
324 | //! [`split_to`]: fn@bytes::BytesMut::split_to |
325 | //! [`advance`]: fn@bytes::Buf::advance |
326 | |
327 | mod bytes_codec; |
328 | pub use self::bytes_codec::BytesCodec; |
329 | |
330 | mod decoder; |
331 | pub use self::decoder::Decoder; |
332 | |
333 | mod encoder; |
334 | pub use self::encoder::Encoder; |
335 | |
336 | mod framed_impl; |
337 | #[allow (unused_imports)] |
338 | pub(crate) use self::framed_impl::{FramedImpl, RWFrames, ReadFrame, WriteFrame}; |
339 | |
340 | mod framed; |
341 | pub use self::framed::{Framed, FramedParts}; |
342 | |
343 | mod framed_read; |
344 | pub use self::framed_read::FramedRead; |
345 | |
346 | mod framed_write; |
347 | pub use self::framed_write::FramedWrite; |
348 | |
349 | pub mod length_delimited; |
350 | pub use self::length_delimited::{LengthDelimitedCodec, LengthDelimitedCodecError}; |
351 | |
352 | mod lines_codec; |
353 | pub use self::lines_codec::{LinesCodec, LinesCodecError}; |
354 | |
355 | mod any_delimiter_codec; |
356 | pub use self::any_delimiter_codec::{AnyDelimiterCodec, AnyDelimiterCodecError}; |
357 | |