| 1 | //! Adaptors from `AsyncRead`/`AsyncWrite` to Stream/Sink |
| 2 | //! |
| 3 | //! Raw I/O objects work with byte sequences, but higher-level code usually |
| 4 | //! wants to batch these into meaningful chunks, called "frames". |
| 5 | //! |
| 6 | //! This module contains adapters to go from streams of bytes, [`AsyncRead`] and |
| 7 | //! [`AsyncWrite`], to framed streams implementing [`Sink`] and [`Stream`]. |
| 8 | //! Framed streams are also known as transports. |
| 9 | //! |
| 10 | //! # Example encoding using `LinesCodec` |
| 11 | //! |
| 12 | //! The following example demonstrates how to use a codec such as [`LinesCodec`] to |
| 13 | //! write framed data. [`FramedWrite`] can be used to achieve this. Data sent to |
| 14 | //! [`FramedWrite`] are first framed according to a specific codec, and then sent to |
| 15 | //! an implementor of [`AsyncWrite`]. |
| 16 | //! |
| 17 | //! ``` |
| 18 | //! use futures::sink::SinkExt; |
| 19 | //! use tokio_util::codec::LinesCodec; |
| 20 | //! use tokio_util::codec::FramedWrite; |
| 21 | //! |
| 22 | //! #[tokio::main] |
| 23 | //! async fn main() { |
| 24 | //! let buffer = Vec::new(); |
| 25 | //! let messages = vec!["Hello" , "World" ]; |
| 26 | //! let encoder = LinesCodec::new(); |
| 27 | //! |
| 28 | //! // FramedWrite is a sink which means you can send values into it |
| 29 | //! // asynchronously. |
| 30 | //! let mut writer = FramedWrite::new(buffer, encoder); |
| 31 | //! |
| 32 | //! // To be able to send values into a FramedWrite, you need to bring the |
| 33 | //! // `SinkExt` trait into scope. |
| 34 | //! writer.send(messages[0]).await.unwrap(); |
| 35 | //! writer.send(messages[1]).await.unwrap(); |
| 36 | //! |
| 37 | //! let buffer = writer.get_ref(); |
| 38 | //! |
| 39 | //! assert_eq!(buffer.as_slice(), "Hello\nWorld\n".as_bytes()); |
| 40 | //! } |
| 41 | //! ``` |
| 42 | //! |
| 43 | //! # Example decoding using `LinesCodec` |
| 44 | //! The following example demonstrates how to use a codec such as [`LinesCodec`] to |
| 45 | //! read a stream of framed data. [`FramedRead`] can be used to achieve this. [`FramedRead`] |
| 46 | //! will keep reading from an [`AsyncRead`] implementor until a whole frame, according to a codec, |
| 47 | //! can be parsed. |
| 48 | //! |
| 49 | //! ``` |
| 50 | //! use tokio_stream::StreamExt; |
| 51 | //! use tokio_util::codec::LinesCodec; |
| 52 | //! use tokio_util::codec::FramedRead; |
| 53 | //! |
| 54 | //! #[tokio::main] |
| 55 | //! async fn main() { |
| 56 | //! let message = "Hello\nWorld".as_bytes(); |
| 57 | //! let decoder = LinesCodec::new(); |
| 58 | //! |
| 59 | //! // FramedRead can be used to read a stream of values that are framed according to |
| 60 | //! // a codec. FramedRead will read from its input (here `message`) until a whole frame |
| 61 | //! // can be parsed. |
| 62 | //! let mut reader = FramedRead::new(message, decoder); |
| 63 | //! |
| 64 | //! // To read values from a FramedRead, you need to bring the |
| 65 | //! // `StreamExt` trait into scope. |
| 66 | //! let frame1 = reader.next().await.unwrap().unwrap(); |
| 67 | //! let frame2 = reader.next().await.unwrap().unwrap(); |
| 68 | //! |
| 69 | //! assert!(reader.next().await.is_none()); |
| 70 | //! assert_eq!(frame1, "Hello" ); |
| 71 | //! assert_eq!(frame2, "World" ); |
| 72 | //! } |
| 73 | //! ``` |
| 74 | //! |
| 75 | //! # The Decoder trait |
| 76 | //! |
| 77 | //! A [`Decoder`] is used together with [`FramedRead`] or [`Framed`] to turn an |
| 78 | //! [`AsyncRead`] into a [`Stream`]. The job of the decoder trait is to specify |
| 79 | //! how sequences of bytes are turned into a sequence of frames, and to |
| 80 | //! determine where the boundaries between frames are. The job of the |
| 81 | //! `FramedRead` is to repeatedly switch between reading more data from the IO |
| 82 | //! resource, and asking the decoder whether we have received enough data to |
| 83 | //! decode another frame of data. |
| 84 | //! |
| 85 | //! The main method on the `Decoder` trait is the [`decode`] method. This method |
| 86 | //! takes as argument the data that has been read so far, and when it is called, |
| 87 | //! it will be in one of the following situations: |
| 88 | //! |
| 89 | //! 1. The buffer contains less than a full frame. |
| 90 | //! 2. The buffer contains exactly a full frame. |
| 91 | //! 3. The buffer contains more than a full frame. |
| 92 | //! |
| 93 | //! In the first situation, the decoder should return `Ok(None)`. |
| 94 | //! |
| 95 | //! In the second situation, the decoder should clear the provided buffer and |
| 96 | //! return `Ok(Some(the_decoded_frame))`. |
| 97 | //! |
| 98 | //! In the third situation, the decoder should use a method such as [`split_to`] |
| 99 | //! or [`advance`] to modify the buffer such that the frame is removed from the |
| 100 | //! buffer, but any data in the buffer after that frame should still remain in |
| 101 | //! the buffer. The decoder should also return `Ok(Some(the_decoded_frame))` in |
| 102 | //! this case. |
| 103 | //! |
| 104 | //! Finally, the decoder may return an error if the data is invalid in some way. |
| 105 | //! The decoder should _not_ return an error just because it has yet to receive |
| 106 | //! a full frame. |
| 107 | //! |
| 108 | //! It is guaranteed that, from one call to `decode` to another, the provided |
| 109 | //! buffer will contain the exact same data as before, except that if more data |
| 110 | //! has arrived through the IO resource, that data will have been appended to |
| 111 | //! the buffer. This means that reading frames from a `FramedRead` is |
| 112 | //! essentially equivalent to the following loop: |
| 113 | //! |
| 114 | //! ```no_run |
| 115 | //! use tokio::io::AsyncReadExt; |
| 116 | //! # // This uses async_stream to create an example that compiles. |
| 117 | //! # fn foo() -> impl futures_core::Stream<Item = std::io::Result<bytes::BytesMut>> { async_stream::try_stream! { |
| 118 | //! # use tokio_util::codec::Decoder; |
| 119 | //! # let mut decoder = tokio_util::codec::BytesCodec::new(); |
| 120 | //! # let io_resource = &mut &[0u8, 1, 2, 3][..]; |
| 121 | //! |
| 122 | //! let mut buf = bytes::BytesMut::new(); |
| 123 | //! loop { |
| 124 | //! // The read_buf call will append to buf rather than overwrite existing data. |
| 125 | //! let len = io_resource.read_buf(&mut buf).await?; |
| 126 | //! |
| 127 | //! if len == 0 { |
| 128 | //! while let Some(frame) = decoder.decode_eof(&mut buf)? { |
| 129 | //! yield frame; |
| 130 | //! } |
| 131 | //! break; |
| 132 | //! } |
| 133 | //! |
| 134 | //! while let Some(frame) = decoder.decode(&mut buf)? { |
| 135 | //! yield frame; |
| 136 | //! } |
| 137 | //! } |
| 138 | //! # }} |
| 139 | //! ``` |
| 140 | //! The example above uses `yield` whenever the `Stream` produces an item. |
| 141 | //! |
| 142 | //! ## Example decoder |
| 143 | //! |
| 144 | //! As an example, consider a protocol that can be used to send strings where |
| 145 | //! each frame is a four-byte little-endian integer giving the length of the |
| 146 | //! string, followed by that many bytes of string data. The decoder fails with |
| 147 | //! an error if the string data is not valid UTF-8 or is too long. |
| 148 | //! |
| 149 | //! Such a decoder can be written like this: |
| 150 | //! ``` |
| 151 | //! use tokio_util::codec::Decoder; |
| 152 | //! use bytes::{BytesMut, Buf}; |
| 153 | //! |
| 154 | //! struct MyStringDecoder {} |
| 155 | //! |
| 156 | //! const MAX: usize = 8 * 1024 * 1024; |
| 157 | //! |
| 158 | //! impl Decoder for MyStringDecoder { |
| 159 | //! type Item = String; |
| 160 | //! type Error = std::io::Error; |
| 161 | //! |
| 162 | //! fn decode( |
| 163 | //! &mut self, |
| 164 | //! src: &mut BytesMut |
| 165 | //! ) -> Result<Option<Self::Item>, Self::Error> { |
| 166 | //! if src.len() < 4 { |
| 167 | //! // Not enough data to read length marker. |
| 168 | //! return Ok(None); |
| 169 | //! } |
| 170 | //! |
| 171 | //! // Read length marker. |
| 172 | //! let mut length_bytes = [0u8; 4]; |
| 173 | //! length_bytes.copy_from_slice(&src[..4]); |
| 174 | //! let length = u32::from_le_bytes(length_bytes) as usize; |
| 175 | //! |
| 176 | //! // Check that the length is not too large to avoid a denial of |
| 177 | //! // service attack where the server runs out of memory. |
| 178 | //! if length > MAX { |
| 179 | //! return Err(std::io::Error::new( |
| 180 | //! std::io::ErrorKind::InvalidData, |
| 181 | //! format!("Frame of length {} is too large." , length) |
| 182 | //! )); |
| 183 | //! } |
| 184 | //! |
| 185 | //! if src.len() < 4 + length { |
| 186 | //! // The full string has not yet arrived. |
| 187 | //! // |
| 188 | //! // We reserve more space in the buffer. This is not strictly |
| 189 | //! // necessary, but is a good idea performance-wise. |
| 190 | //! src.reserve(4 + length - src.len()); |
| 191 | //! |
| 192 | //! // We inform the Framed that we need more bytes to form the next |
| 193 | //! // frame. |
| 194 | //! return Ok(None); |
| 195 | //! } |
| 196 | //! |
| 197 | //! // Use advance to modify src such that it no longer contains |
| 198 | //! // this frame. |
| 199 | //! let data = src[4..4 + length].to_vec(); |
| 200 | //! src.advance(4 + length); |
| 201 | //! |
| 202 | //! // Convert the data to a string, or fail if it is not valid utf-8. |
| 203 | //! match String::from_utf8(data) { |
| 204 | //! Ok(string) => Ok(Some(string)), |
| 205 | //! Err(utf8_error) => { |
| 206 | //! Err(std::io::Error::new( |
| 207 | //! std::io::ErrorKind::InvalidData, |
| 208 | //! utf8_error.utf8_error(), |
| 209 | //! )) |
| 210 | //! }, |
| 211 | //! } |
| 212 | //! } |
| 213 | //! } |
| 214 | //! ``` |
| 215 | //! |
| 216 | //! # The Encoder trait |
| 217 | //! |
| 218 | //! An [`Encoder`] is used together with [`FramedWrite`] or [`Framed`] to turn |
| 219 | //! an [`AsyncWrite`] into a [`Sink`]. The job of the encoder trait is to |
| 220 | //! specify how frames are turned into a sequence of bytes. The job of the |
| 221 | //! `FramedWrite` is to take the resulting sequence of bytes and write it to the |
| 222 | //! IO resource. |
| 223 | //! |
| 224 | //! The main method on the `Encoder` trait is the [`encode`] method. This method |
| 225 | //! takes an item that is being written, and a buffer to write the item to. The |
| 226 | //! buffer may already contain data, and in this case, the encoder should append |
| 227 | //! the new frame to the buffer rather than overwrite the existing data. |
| 228 | //! |
| 229 | //! It is guaranteed that, from one call to `encode` to another, the provided |
| 230 | //! buffer will contain the exact same data as before, except that some of the |
| 231 | //! data may have been removed from the front of the buffer. Writing to a |
| 232 | //! `FramedWrite` is essentially equivalent to the following loop: |
| 233 | //! |
| 234 | //! ```no_run |
| 235 | //! use tokio::io::AsyncWriteExt; |
| 236 | //! use bytes::Buf; // for advance |
| 237 | //! # use tokio_util::codec::Encoder; |
| 238 | //! # async fn next_frame() -> bytes::Bytes { bytes::Bytes::new() } |
| 239 | //! # async fn no_more_frames() { } |
| 240 | //! # #[tokio::main] async fn main() -> std::io::Result<()> { |
| 241 | //! # let mut io_resource = tokio::io::sink(); |
| 242 | //! # let mut encoder = tokio_util::codec::BytesCodec::new(); |
| 243 | //! |
| 244 | //! const MAX: usize = 8192; |
| 245 | //! |
| 246 | //! let mut buf = bytes::BytesMut::new(); |
| 247 | //! loop { |
| 248 | //! tokio::select! { |
| 249 | //! num_written = io_resource.write(&buf), if !buf.is_empty() => { |
| 250 | //! buf.advance(num_written?); |
| 251 | //! }, |
| 252 | //! frame = next_frame(), if buf.len() < MAX => { |
| 253 | //! encoder.encode(frame, &mut buf)?; |
| 254 | //! }, |
| 255 | //! _ = no_more_frames() => { |
| 256 | //! io_resource.write_all(&buf).await?; |
| 257 | //! io_resource.shutdown().await?; |
| 258 | //! return Ok(()); |
| 259 | //! }, |
| 260 | //! } |
| 261 | //! } |
| 262 | //! # } |
| 263 | //! ``` |
| 264 | //! Here the `next_frame` method corresponds to any frames you write to the |
| 265 | //! `FramedWrite`. The `no_more_frames` method corresponds to closing the |
| 266 | //! `FramedWrite` with [`SinkExt::close`]. |
| 267 | //! |
| 268 | //! ## Example encoder |
| 269 | //! |
| 270 | //! As an example, consider a protocol that can be used to send strings where |
| 271 | //! each frame is a four-byte little-endian integer giving the length of the |
| 272 | //! string, followed by that many bytes of string data. The encoder will fail |
| 273 | //! if the string is too long. |
| 274 | //! |
| 275 | //! Such an encoder can be written like this: |
| 276 | //! ``` |
| 277 | //! use tokio_util::codec::Encoder; |
| 278 | //! use bytes::BytesMut; |
| 279 | //! |
| 280 | //! struct MyStringEncoder {} |
| 281 | //! |
| 282 | //! const MAX: usize = 8 * 1024 * 1024; |
| 283 | //! |
| 284 | //! impl Encoder<String> for MyStringEncoder { |
| 285 | //! type Error = std::io::Error; |
| 286 | //! |
| 287 | //! fn encode(&mut self, item: String, dst: &mut BytesMut) -> Result<(), Self::Error> { |
| 288 | //! // Don't send a string if it is longer than the other end will |
| 289 | //! // accept. |
| 290 | //! if item.len() > MAX { |
| 291 | //! return Err(std::io::Error::new( |
| 292 | //! std::io::ErrorKind::InvalidData, |
| 293 | //! format!("Frame of length {} is too large." , item.len()) |
| 294 | //! )); |
| 295 | //! } |
| 296 | //! |
| 297 | //! // Convert the length into a byte array. |
| 298 | //! // The cast to u32 cannot overflow due to the length check above. |
| 299 | //! let len_slice = u32::to_le_bytes(item.len() as u32); |
| 300 | //! |
| 301 | //! // Reserve space in the buffer. |
| 302 | //! dst.reserve(4 + item.len()); |
| 303 | //! |
| 304 | //! // Write the length and string to the buffer. |
| 305 | //! dst.extend_from_slice(&len_slice); |
| 306 | //! dst.extend_from_slice(item.as_bytes()); |
| 307 | //! Ok(()) |
| 308 | //! } |
| 309 | //! } |
| 310 | //! ``` |
| 311 | //! |
| 312 | //! [`AsyncRead`]: tokio::io::AsyncRead |
| 313 | //! [`AsyncWrite`]: tokio::io::AsyncWrite |
| 314 | //! [`Stream`]: futures_core::Stream |
| 315 | //! [`Sink`]: futures_sink::Sink |
| 316 | //! [`SinkExt`]: futures::sink::SinkExt |
| 317 | //! [`SinkExt::close`]: https://docs.rs/futures/0.3/futures/sink/trait.SinkExt.html#method.close |
| 318 | //! [`FramedRead`]: struct@crate::codec::FramedRead |
| 319 | //! [`FramedWrite`]: struct@crate::codec::FramedWrite |
| 320 | //! [`Framed`]: struct@crate::codec::Framed |
| 321 | //! [`Decoder`]: trait@crate::codec::Decoder |
| 322 | //! [`decode`]: fn@crate::codec::Decoder::decode |
| 323 | //! [`encode`]: fn@crate::codec::Encoder::encode |
| 324 | //! [`split_to`]: fn@bytes::BytesMut::split_to |
| 325 | //! [`advance`]: fn@bytes::Buf::advance |
| 326 | |
| 327 | mod bytes_codec; |
| 328 | pub use self::bytes_codec::BytesCodec; |
| 329 | |
| 330 | mod decoder; |
| 331 | pub use self::decoder::Decoder; |
| 332 | |
| 333 | mod encoder; |
| 334 | pub use self::encoder::Encoder; |
| 335 | |
| 336 | mod framed_impl; |
| 337 | #[allow (unused_imports)] |
| 338 | pub(crate) use self::framed_impl::{FramedImpl, RWFrames, ReadFrame, WriteFrame}; |
| 339 | |
| 340 | mod framed; |
| 341 | pub use self::framed::{Framed, FramedParts}; |
| 342 | |
| 343 | mod framed_read; |
| 344 | pub use self::framed_read::FramedRead; |
| 345 | |
| 346 | mod framed_write; |
| 347 | pub use self::framed_write::FramedWrite; |
| 348 | |
| 349 | pub mod length_delimited; |
| 350 | pub use self::length_delimited::{LengthDelimitedCodec, LengthDelimitedCodecError}; |
| 351 | |
| 352 | mod lines_codec; |
| 353 | pub use self::lines_codec::{LinesCodec, LinesCodecError}; |
| 354 | |
| 355 | mod any_delimiter_codec; |
| 356 | pub use self::any_delimiter_codec::{AnyDelimiterCodec, AnyDelimiterCodecError}; |
| 357 | |