1 | //! Frame a stream of bytes based on a length prefix |
2 | //! |
3 | //! Many protocols delimit their frames by prefacing frame data with a |
4 | //! frame head that specifies the length of the frame. The |
5 | //! `length_delimited` module provides utilities for handling the length |
6 | //! based framing. This allows the consumer to work with entire frames |
7 | //! without having to worry about buffering or other framing logic. |
8 | //! |
9 | //! # Getting started |
10 | //! |
11 | //! If implementing a protocol from scratch, using length delimited framing |
12 | //! is an easy way to get started. [`LengthDelimitedCodec::new()`] will |
13 | //! return a length delimited codec using default configuration values. |
14 | //! This can then be used to construct a framer to adapt a full-duplex |
15 | //! byte stream into a stream of frames. |
16 | //! |
17 | //! ``` |
18 | //! use tokio::io::{AsyncRead, AsyncWrite}; |
19 | //! use tokio_util::codec::{Framed, LengthDelimitedCodec}; |
20 | //! |
21 | //! fn bind_transport<T: AsyncRead + AsyncWrite>(io: T) |
22 | //! -> Framed<T, LengthDelimitedCodec> |
23 | //! { |
24 | //! Framed::new(io, LengthDelimitedCodec::new()) |
25 | //! } |
26 | //! # pub fn main() {} |
27 | //! ``` |
28 | //! |
29 | //! The returned transport implements `Sink + Stream` for `BytesMut`. It |
30 | //! encodes the frame with a big-endian `u32` header denoting the frame |
31 | //! payload length: |
32 | //! |
33 | //! ```text |
34 | //! +----------+--------------------------------+ |
35 | //! | len: u32 | frame payload | |
36 | //! +----------+--------------------------------+ |
37 | //! ``` |
38 | //! |
39 | //! Specifically, given the following: |
40 | //! |
41 | //! ``` |
42 | //! use tokio::io::{AsyncRead, AsyncWrite}; |
43 | //! use tokio_util::codec::{Framed, LengthDelimitedCodec}; |
44 | //! |
45 | //! use futures::SinkExt; |
46 | //! use bytes::Bytes; |
47 | //! |
48 | //! async fn write_frame<T>(io: T) -> Result<(), Box<dyn std::error::Error>> |
49 | //! where |
50 | //! T: AsyncRead + AsyncWrite + Unpin, |
51 | //! { |
52 | //! let mut transport = Framed::new(io, LengthDelimitedCodec::new()); |
53 | //! let frame = Bytes::from("hello world" ); |
54 | //! |
55 | //! transport.send(frame).await?; |
56 | //! Ok(()) |
57 | //! } |
58 | //! ``` |
59 | //! |
60 | //! The encoded frame will look like this: |
61 | //! |
62 | //! ```text |
63 | //! +---- len: u32 ----+---- data ----+ |
64 | //! | \x00\x00\x00\x0b | hello world | |
65 | //! +------------------+--------------+ |
66 | //! ``` |
67 | //! |
68 | //! # Decoding |
69 | //! |
70 | //! [`FramedRead`] adapts an [`AsyncRead`] into a `Stream` of [`BytesMut`], |
71 | //! such that each yielded [`BytesMut`] value contains the contents of an |
72 | //! entire frame. There are many configuration parameters enabling |
73 | //! [`FramedRead`] to handle a wide range of protocols. Here are some |
74 | //! examples that will cover the various options at a high level. |
75 | //! |
76 | //! ## Example 1 |
77 | //! |
78 | //! The following will parse a `u16` length field at offset 0, including the |
79 | //! frame head in the yielded `BytesMut`. |
80 | //! |
81 | //! ``` |
82 | //! # use tokio::io::AsyncRead; |
83 | //! # use tokio_util::codec::LengthDelimitedCodec; |
84 | //! # fn bind_read<T: AsyncRead>(io: T) { |
85 | //! LengthDelimitedCodec::builder() |
86 | //! .length_field_offset(0) // default value |
87 | //! .length_field_type::<u16>() |
88 | //! .length_adjustment(0) // default value |
89 | //! .num_skip(0) // Do not strip frame header |
90 | //! .new_read(io); |
91 | //! # } |
92 | //! # pub fn main() {} |
93 | //! ``` |
94 | //! |
95 | //! The following frame will be decoded as such: |
96 | //! |
97 | //! ```text |
98 | //! INPUT DECODED |
99 | //! +-- len ---+--- Payload ---+ +-- len ---+--- Payload ---+ |
100 | //! | \x00\x0B | Hello world | --> | \x00\x0B | Hello world | |
101 | //! +----------+---------------+ +----------+---------------+ |
102 | //! ``` |
103 | //! |
104 | //! The value of the length field is 11 (`\x0B`) which represents the length |
105 | //! of the payload, `hello world`. By default, [`FramedRead`] assumes that |
106 | //! the length field represents the number of bytes that **follows** the |
107 | //! length field. Thus, the entire frame has a length of 13: 2 bytes for the |
108 | //! frame head + 11 bytes for the payload. |
109 | //! |
110 | //! ## Example 2 |
111 | //! |
112 | //! The following will parse a `u16` length field at offset 0, omitting the |
113 | //! frame head in the yielded `BytesMut`. |
114 | //! |
115 | //! ``` |
116 | //! # use tokio::io::AsyncRead; |
117 | //! # use tokio_util::codec::LengthDelimitedCodec; |
118 | //! # fn bind_read<T: AsyncRead>(io: T) { |
119 | //! LengthDelimitedCodec::builder() |
120 | //! .length_field_offset(0) // default value |
121 | //! .length_field_type::<u16>() |
122 | //! .length_adjustment(0) // default value |
123 | //! // `num_skip` is not needed, the default is to skip |
124 | //! .new_read(io); |
125 | //! # } |
126 | //! # pub fn main() {} |
127 | //! ``` |
128 | //! |
129 | //! The following frame will be decoded as such: |
130 | //! |
131 | //! ```text |
132 | //! INPUT DECODED |
133 | //! +-- len ---+--- Payload ---+ +--- Payload ---+ |
134 | //! | \x00\x0B | Hello world | --> | Hello world | |
135 | //! +----------+---------------+ +---------------+ |
136 | //! ``` |
137 | //! |
138 | //! This is similar to the first example, the only difference is that the |
139 | //! frame head is **not** included in the yielded `BytesMut` value. |
140 | //! |
141 | //! ## Example 3 |
142 | //! |
143 | //! The following will parse a `u16` length field at offset 0, including the |
144 | //! frame head in the yielded `BytesMut`. In this case, the length field |
145 | //! **includes** the frame head length. |
146 | //! |
147 | //! ``` |
148 | //! # use tokio::io::AsyncRead; |
149 | //! # use tokio_util::codec::LengthDelimitedCodec; |
150 | //! # fn bind_read<T: AsyncRead>(io: T) { |
151 | //! LengthDelimitedCodec::builder() |
152 | //! .length_field_offset(0) // default value |
153 | //! .length_field_type::<u16>() |
154 | //! .length_adjustment(-2) // size of head |
155 | //! .num_skip(0) |
156 | //! .new_read(io); |
157 | //! # } |
158 | //! # pub fn main() {} |
159 | //! ``` |
160 | //! |
161 | //! The following frame will be decoded as such: |
162 | //! |
163 | //! ```text |
164 | //! INPUT DECODED |
165 | //! +-- len ---+--- Payload ---+ +-- len ---+--- Payload ---+ |
166 | //! | \x00\x0D | Hello world | --> | \x00\x0D | Hello world | |
167 | //! +----------+---------------+ +----------+---------------+ |
168 | //! ``` |
169 | //! |
170 | //! In most cases, the length field represents the length of the payload |
171 | //! only, as shown in the previous examples. However, in some protocols the |
172 | //! length field represents the length of the whole frame, including the |
173 | //! head. In such cases, we specify a negative `length_adjustment` to adjust |
174 | //! the value provided in the frame head to represent the payload length. |
175 | //! |
176 | //! ## Example 4 |
177 | //! |
178 | //! The following will parse a 3 byte length field at offset 0 in a 5 byte |
179 | //! frame head, including the frame head in the yielded `BytesMut`. |
180 | //! |
181 | //! ``` |
182 | //! # use tokio::io::AsyncRead; |
183 | //! # use tokio_util::codec::LengthDelimitedCodec; |
184 | //! # fn bind_read<T: AsyncRead>(io: T) { |
185 | //! LengthDelimitedCodec::builder() |
186 | //! .length_field_offset(0) // default value |
187 | //! .length_field_length(3) |
188 | //! .length_adjustment(2) // remaining head |
189 | //! .num_skip(0) |
190 | //! .new_read(io); |
191 | //! # } |
192 | //! # pub fn main() {} |
193 | //! ``` |
194 | //! |
195 | //! The following frame will be decoded as such: |
196 | //! |
197 | //! ```text |
198 | //! INPUT |
199 | //! +---- len -----+- head -+--- Payload ---+ |
200 | //! | \x00\x00\x0B | \xCAFE | Hello world | |
201 | //! +--------------+--------+---------------+ |
202 | //! |
203 | //! DECODED |
204 | //! +---- len -----+- head -+--- Payload ---+ |
205 | //! | \x00\x00\x0B | \xCAFE | Hello world | |
206 | //! +--------------+--------+---------------+ |
207 | //! ``` |
208 | //! |
209 | //! A more advanced example that shows a case where there is extra frame |
210 | //! head data between the length field and the payload. In such cases, it is |
211 | //! usually desirable to include the frame head as part of the yielded |
212 | //! `BytesMut`. This lets consumers of the length delimited framer to |
213 | //! process the frame head as needed. |
214 | //! |
215 | //! The positive `length_adjustment` value lets `FramedRead` factor in the |
216 | //! additional head into the frame length calculation. |
217 | //! |
218 | //! ## Example 5 |
219 | //! |
220 | //! The following will parse a `u16` length field at offset 1 of a 4 byte |
221 | //! frame head. The first byte and the length field will be omitted from the |
222 | //! yielded `BytesMut`, but the trailing 2 bytes of the frame head will be |
223 | //! included. |
224 | //! |
225 | //! ``` |
226 | //! # use tokio::io::AsyncRead; |
227 | //! # use tokio_util::codec::LengthDelimitedCodec; |
228 | //! # fn bind_read<T: AsyncRead>(io: T) { |
229 | //! LengthDelimitedCodec::builder() |
230 | //! .length_field_offset(1) // length of hdr1 |
231 | //! .length_field_type::<u16>() |
232 | //! .length_adjustment(1) // length of hdr2 |
233 | //! .num_skip(3) // length of hdr1 + LEN |
234 | //! .new_read(io); |
235 | //! # } |
236 | //! # pub fn main() {} |
237 | //! ``` |
238 | //! |
239 | //! The following frame will be decoded as such: |
240 | //! |
241 | //! ```text |
242 | //! INPUT |
243 | //! +- hdr1 -+-- len ---+- hdr2 -+--- Payload ---+ |
244 | //! | \xCA | \x00\x0B | \xFE | Hello world | |
245 | //! +--------+----------+--------+---------------+ |
246 | //! |
247 | //! DECODED |
248 | //! +- hdr2 -+--- Payload ---+ |
249 | //! | \xFE | Hello world | |
250 | //! +--------+---------------+ |
251 | //! ``` |
252 | //! |
253 | //! The length field is situated in the middle of the frame head. In this |
254 | //! case, the first byte in the frame head could be a version or some other |
255 | //! identifier that is not needed for processing. On the other hand, the |
256 | //! second half of the head is needed. |
257 | //! |
258 | //! `length_field_offset` indicates how many bytes to skip before starting |
259 | //! to read the length field. `length_adjustment` is the number of bytes to |
260 | //! skip starting at the end of the length field. In this case, it is the |
261 | //! second half of the head. |
262 | //! |
263 | //! ## Example 6 |
264 | //! |
265 | //! The following will parse a `u16` length field at offset 1 of a 4 byte |
266 | //! frame head. The first byte and the length field will be omitted from the |
267 | //! yielded `BytesMut`, but the trailing 2 bytes of the frame head will be |
268 | //! included. In this case, the length field **includes** the frame head |
269 | //! length. |
270 | //! |
271 | //! ``` |
272 | //! # use tokio::io::AsyncRead; |
273 | //! # use tokio_util::codec::LengthDelimitedCodec; |
274 | //! # fn bind_read<T: AsyncRead>(io: T) { |
275 | //! LengthDelimitedCodec::builder() |
276 | //! .length_field_offset(1) // length of hdr1 |
277 | //! .length_field_type::<u16>() |
278 | //! .length_adjustment(-3) // length of hdr1 + LEN, negative |
279 | //! .num_skip(3) |
280 | //! .new_read(io); |
281 | //! # } |
282 | //! ``` |
283 | //! |
284 | //! The following frame will be decoded as such: |
285 | //! |
286 | //! ```text |
287 | //! INPUT |
288 | //! +- hdr1 -+-- len ---+- hdr2 -+--- Payload ---+ |
289 | //! | \xCA | \x00\x0F | \xFE | Hello world | |
290 | //! +--------+----------+--------+---------------+ |
291 | //! |
292 | //! DECODED |
293 | //! +- hdr2 -+--- Payload ---+ |
294 | //! | \xFE | Hello world | |
295 | //! +--------+---------------+ |
296 | //! ``` |
297 | //! |
298 | //! Similar to the example above, the difference is that the length field |
299 | //! represents the length of the entire frame instead of just the payload. |
300 | //! The length of `hdr1` and `len` must be counted in `length_adjustment`. |
301 | //! Note that the length of `hdr2` does **not** need to be explicitly set |
302 | //! anywhere because it already is factored into the total frame length that |
303 | //! is read from the byte stream. |
304 | //! |
305 | //! ## Example 7 |
306 | //! |
307 | //! The following will parse a 3 byte length field at offset 0 in a 4 byte |
308 | //! frame head, excluding the 4th byte from the yielded `BytesMut`. |
309 | //! |
310 | //! ``` |
311 | //! # use tokio::io::AsyncRead; |
312 | //! # use tokio_util::codec::LengthDelimitedCodec; |
313 | //! # fn bind_read<T: AsyncRead>(io: T) { |
314 | //! LengthDelimitedCodec::builder() |
315 | //! .length_field_offset(0) // default value |
316 | //! .length_field_length(3) |
317 | //! .length_adjustment(0) // default value |
318 | //! .num_skip(4) // skip the first 4 bytes |
319 | //! .new_read(io); |
320 | //! # } |
321 | //! # pub fn main() {} |
322 | //! ``` |
323 | //! |
324 | //! The following frame will be decoded as such: |
325 | //! |
326 | //! ```text |
327 | //! INPUT DECODED |
328 | //! +------- len ------+--- Payload ---+ +--- Payload ---+ |
329 | //! | \x00\x00\x0B\xFF | Hello world | => | Hello world | |
330 | //! +------------------+---------------+ +---------------+ |
331 | //! ``` |
332 | //! |
333 | //! A simple example where there are unused bytes between the length field |
334 | //! and the payload. |
335 | //! |
336 | //! # Encoding |
337 | //! |
338 | //! [`FramedWrite`] adapts an [`AsyncWrite`] into a `Sink` of [`BytesMut`], |
339 | //! such that each submitted [`BytesMut`] is prefaced by a length field. |
340 | //! There are fewer configuration options than [`FramedRead`]. Given |
341 | //! protocols that have more complex frame heads, an encoder should probably |
342 | //! be written by hand using [`Encoder`]. |
343 | //! |
344 | //! Here is a simple example, given a `FramedWrite` with the following |
345 | //! configuration: |
346 | //! |
347 | //! ``` |
348 | //! # use tokio::io::AsyncWrite; |
349 | //! # use tokio_util::codec::LengthDelimitedCodec; |
350 | //! # fn write_frame<T: AsyncWrite>(io: T) { |
351 | //! # let _ = |
352 | //! LengthDelimitedCodec::builder() |
353 | //! .length_field_type::<u16>() |
354 | //! .new_write(io); |
355 | //! # } |
356 | //! # pub fn main() {} |
357 | //! ``` |
358 | //! |
359 | //! A payload of `hello world` will be encoded as: |
360 | //! |
361 | //! ```text |
362 | //! +- len: u16 -+---- data ----+ |
363 | //! | \x00\x0b | hello world | |
364 | //! +------------+--------------+ |
365 | //! ``` |
366 | //! |
367 | //! [`LengthDelimitedCodec::new()`]: method@LengthDelimitedCodec::new |
368 | //! [`FramedRead`]: struct@FramedRead |
369 | //! [`FramedWrite`]: struct@FramedWrite |
370 | //! [`AsyncRead`]: trait@tokio::io::AsyncRead |
371 | //! [`AsyncWrite`]: trait@tokio::io::AsyncWrite |
372 | //! [`Encoder`]: trait@Encoder |
373 | //! [`BytesMut`]: bytes::BytesMut |
374 | |
375 | use crate::codec::{Decoder, Encoder, Framed, FramedRead, FramedWrite}; |
376 | |
377 | use tokio::io::{AsyncRead, AsyncWrite}; |
378 | |
379 | use bytes::{Buf, BufMut, Bytes, BytesMut}; |
380 | use std::error::Error as StdError; |
381 | use std::io::{self, Cursor}; |
382 | use std::{cmp, fmt, mem}; |
383 | |
384 | /// Configure length delimited `LengthDelimitedCodec`s. |
385 | /// |
386 | /// `Builder` enables constructing configured length delimited codecs. Note |
387 | /// that not all configuration settings apply to both encoding and decoding. See |
388 | /// the documentation for specific methods for more detail. |
389 | #[derive (Debug, Clone, Copy)] |
390 | pub struct Builder { |
391 | // Maximum frame length |
392 | max_frame_len: usize, |
393 | |
394 | // Number of bytes representing the field length |
395 | length_field_len: usize, |
396 | |
397 | // Number of bytes in the header before the length field |
398 | length_field_offset: usize, |
399 | |
400 | // Adjust the length specified in the header field by this amount |
401 | length_adjustment: isize, |
402 | |
403 | // Total number of bytes to skip before reading the payload, if not set, |
404 | // `length_field_len + length_field_offset` |
405 | num_skip: Option<usize>, |
406 | |
407 | // Length field byte order (little or big endian) |
408 | length_field_is_big_endian: bool, |
409 | } |
410 | |
411 | /// An error when the number of bytes read is more than max frame length. |
412 | pub struct LengthDelimitedCodecError { |
413 | _priv: (), |
414 | } |
415 | |
416 | /// A codec for frames delimited by a frame head specifying their lengths. |
417 | /// |
418 | /// This allows the consumer to work with entire frames without having to worry |
419 | /// about buffering or other framing logic. |
420 | /// |
421 | /// See [module level] documentation for more detail. |
422 | /// |
423 | /// [module level]: index.html |
424 | #[derive (Debug, Clone)] |
425 | pub struct LengthDelimitedCodec { |
426 | // Configuration values |
427 | builder: Builder, |
428 | |
429 | // Read state |
430 | state: DecodeState, |
431 | } |
432 | |
433 | #[derive (Debug, Clone, Copy)] |
434 | enum DecodeState { |
435 | Head, |
436 | Data(usize), |
437 | } |
438 | |
439 | // ===== impl LengthDelimitedCodec ====== |
440 | |
441 | impl LengthDelimitedCodec { |
442 | /// Creates a new `LengthDelimitedCodec` with the default configuration values. |
443 | pub fn new() -> Self { |
444 | Self { |
445 | builder: Builder::new(), |
446 | state: DecodeState::Head, |
447 | } |
448 | } |
449 | |
450 | /// Creates a new length delimited codec builder with default configuration |
451 | /// values. |
452 | pub fn builder() -> Builder { |
453 | Builder::new() |
454 | } |
455 | |
456 | /// Returns the current max frame setting |
457 | /// |
458 | /// This is the largest size this codec will accept from the wire. Larger |
459 | /// frames will be rejected. |
460 | pub fn max_frame_length(&self) -> usize { |
461 | self.builder.max_frame_len |
462 | } |
463 | |
464 | /// Updates the max frame setting. |
465 | /// |
466 | /// The change takes effect the next time a frame is decoded. In other |
467 | /// words, if a frame is currently in process of being decoded with a frame |
468 | /// size greater than `val` but less than the max frame length in effect |
469 | /// before calling this function, then the frame will be allowed. |
470 | pub fn set_max_frame_length(&mut self, val: usize) { |
471 | self.builder.max_frame_length(val); |
472 | } |
473 | |
474 | fn decode_head(&mut self, src: &mut BytesMut) -> io::Result<Option<usize>> { |
475 | let head_len = self.builder.num_head_bytes(); |
476 | let field_len = self.builder.length_field_len; |
477 | |
478 | if src.len() < head_len { |
479 | // Not enough data |
480 | return Ok(None); |
481 | } |
482 | |
483 | let n = { |
484 | let mut src = Cursor::new(&mut *src); |
485 | |
486 | // Skip the required bytes |
487 | src.advance(self.builder.length_field_offset); |
488 | |
489 | // match endianness |
490 | let n = if self.builder.length_field_is_big_endian { |
491 | src.get_uint(field_len) |
492 | } else { |
493 | src.get_uint_le(field_len) |
494 | }; |
495 | |
496 | if n > self.builder.max_frame_len as u64 { |
497 | return Err(io::Error::new( |
498 | io::ErrorKind::InvalidData, |
499 | LengthDelimitedCodecError { _priv: () }, |
500 | )); |
501 | } |
502 | |
503 | // The check above ensures there is no overflow |
504 | let n = n as usize; |
505 | |
506 | // Adjust `n` with bounds checking |
507 | let n = if self.builder.length_adjustment < 0 { |
508 | n.checked_sub(-self.builder.length_adjustment as usize) |
509 | } else { |
510 | n.checked_add(self.builder.length_adjustment as usize) |
511 | }; |
512 | |
513 | // Error handling |
514 | match n { |
515 | Some(n) => n, |
516 | None => { |
517 | return Err(io::Error::new( |
518 | io::ErrorKind::InvalidInput, |
519 | "provided length would overflow after adjustment" , |
520 | )); |
521 | } |
522 | } |
523 | }; |
524 | |
525 | let num_skip = self.builder.get_num_skip(); |
526 | |
527 | if num_skip > 0 { |
528 | src.advance(num_skip); |
529 | } |
530 | |
531 | // Ensure that the buffer has enough space to read the incoming |
532 | // payload |
533 | src.reserve(n); |
534 | |
535 | Ok(Some(n)) |
536 | } |
537 | |
538 | fn decode_data(&self, n: usize, src: &mut BytesMut) -> Option<BytesMut> { |
539 | // At this point, the buffer has already had the required capacity |
540 | // reserved. All there is to do is read. |
541 | if src.len() < n { |
542 | return None; |
543 | } |
544 | |
545 | Some(src.split_to(n)) |
546 | } |
547 | } |
548 | |
549 | impl Decoder for LengthDelimitedCodec { |
550 | type Item = BytesMut; |
551 | type Error = io::Error; |
552 | |
553 | fn decode(&mut self, src: &mut BytesMut) -> io::Result<Option<BytesMut>> { |
554 | let n = match self.state { |
555 | DecodeState::Head => match self.decode_head(src)? { |
556 | Some(n) => { |
557 | self.state = DecodeState::Data(n); |
558 | n |
559 | } |
560 | None => return Ok(None), |
561 | }, |
562 | DecodeState::Data(n) => n, |
563 | }; |
564 | |
565 | match self.decode_data(n, src) { |
566 | Some(data) => { |
567 | // Update the decode state |
568 | self.state = DecodeState::Head; |
569 | |
570 | // Make sure the buffer has enough space to read the next head |
571 | src.reserve(self.builder.num_head_bytes()); |
572 | |
573 | Ok(Some(data)) |
574 | } |
575 | None => Ok(None), |
576 | } |
577 | } |
578 | } |
579 | |
580 | impl Encoder<Bytes> for LengthDelimitedCodec { |
581 | type Error = io::Error; |
582 | |
583 | fn encode(&mut self, data: Bytes, dst: &mut BytesMut) -> Result<(), io::Error> { |
584 | let n = data.len(); |
585 | |
586 | if n > self.builder.max_frame_len { |
587 | return Err(io::Error::new( |
588 | io::ErrorKind::InvalidInput, |
589 | LengthDelimitedCodecError { _priv: () }, |
590 | )); |
591 | } |
592 | |
593 | // Adjust `n` with bounds checking |
594 | let n = if self.builder.length_adjustment < 0 { |
595 | n.checked_add(-self.builder.length_adjustment as usize) |
596 | } else { |
597 | n.checked_sub(self.builder.length_adjustment as usize) |
598 | }; |
599 | |
600 | let n = n.ok_or_else(|| { |
601 | io::Error::new( |
602 | io::ErrorKind::InvalidInput, |
603 | "provided length would overflow after adjustment" , |
604 | ) |
605 | })?; |
606 | |
607 | // Reserve capacity in the destination buffer to fit the frame and |
608 | // length field (plus adjustment). |
609 | dst.reserve(self.builder.length_field_len + n); |
610 | |
611 | if self.builder.length_field_is_big_endian { |
612 | dst.put_uint(n as u64, self.builder.length_field_len); |
613 | } else { |
614 | dst.put_uint_le(n as u64, self.builder.length_field_len); |
615 | } |
616 | |
617 | // Write the frame to the buffer |
618 | dst.extend_from_slice(&data[..]); |
619 | |
620 | Ok(()) |
621 | } |
622 | } |
623 | |
624 | impl Default for LengthDelimitedCodec { |
625 | fn default() -> Self { |
626 | Self::new() |
627 | } |
628 | } |
629 | |
630 | // ===== impl Builder ===== |
631 | |
632 | mod builder { |
633 | /// Types that can be used with `Builder::length_field_type`. |
634 | pub trait LengthFieldType {} |
635 | |
636 | impl LengthFieldType for u8 {} |
637 | impl LengthFieldType for u16 {} |
638 | impl LengthFieldType for u32 {} |
639 | impl LengthFieldType for u64 {} |
640 | |
641 | #[cfg (any( |
642 | target_pointer_width = "8" , |
643 | target_pointer_width = "16" , |
644 | target_pointer_width = "32" , |
645 | target_pointer_width = "64" , |
646 | ))] |
647 | impl LengthFieldType for usize {} |
648 | } |
649 | |
650 | impl Builder { |
651 | /// Creates a new length delimited codec builder with default configuration |
652 | /// values. |
653 | /// |
654 | /// # Examples |
655 | /// |
656 | /// ``` |
657 | /// # use tokio::io::AsyncRead; |
658 | /// use tokio_util::codec::LengthDelimitedCodec; |
659 | /// |
660 | /// # fn bind_read<T: AsyncRead>(io: T) { |
661 | /// LengthDelimitedCodec::builder() |
662 | /// .length_field_offset(0) |
663 | /// .length_field_type::<u16>() |
664 | /// .length_adjustment(0) |
665 | /// .num_skip(0) |
666 | /// .new_read(io); |
667 | /// # } |
668 | /// # pub fn main() {} |
669 | /// ``` |
670 | pub fn new() -> Builder { |
671 | Builder { |
672 | // Default max frame length of 8MB |
673 | max_frame_len: 8 * 1_024 * 1_024, |
674 | |
675 | // Default byte length of 4 |
676 | length_field_len: 4, |
677 | |
678 | // Default to the header field being at the start of the header. |
679 | length_field_offset: 0, |
680 | |
681 | length_adjustment: 0, |
682 | |
683 | // Total number of bytes to skip before reading the payload, if not set, |
684 | // `length_field_len + length_field_offset` |
685 | num_skip: None, |
686 | |
687 | // Default to reading the length field in network (big) endian. |
688 | length_field_is_big_endian: true, |
689 | } |
690 | } |
691 | |
692 | /// Read the length field as a big endian integer |
693 | /// |
694 | /// This is the default setting. |
695 | /// |
696 | /// This configuration option applies to both encoding and decoding. |
697 | /// |
698 | /// # Examples |
699 | /// |
700 | /// ``` |
701 | /// # use tokio::io::AsyncRead; |
702 | /// use tokio_util::codec::LengthDelimitedCodec; |
703 | /// |
704 | /// # fn bind_read<T: AsyncRead>(io: T) { |
705 | /// LengthDelimitedCodec::builder() |
706 | /// .big_endian() |
707 | /// .new_read(io); |
708 | /// # } |
709 | /// # pub fn main() {} |
710 | /// ``` |
711 | pub fn big_endian(&mut self) -> &mut Self { |
712 | self.length_field_is_big_endian = true; |
713 | self |
714 | } |
715 | |
716 | /// Read the length field as a little endian integer |
717 | /// |
718 | /// The default setting is big endian. |
719 | /// |
720 | /// This configuration option applies to both encoding and decoding. |
721 | /// |
722 | /// # Examples |
723 | /// |
724 | /// ``` |
725 | /// # use tokio::io::AsyncRead; |
726 | /// use tokio_util::codec::LengthDelimitedCodec; |
727 | /// |
728 | /// # fn bind_read<T: AsyncRead>(io: T) { |
729 | /// LengthDelimitedCodec::builder() |
730 | /// .little_endian() |
731 | /// .new_read(io); |
732 | /// # } |
733 | /// # pub fn main() {} |
734 | /// ``` |
735 | pub fn little_endian(&mut self) -> &mut Self { |
736 | self.length_field_is_big_endian = false; |
737 | self |
738 | } |
739 | |
740 | /// Read the length field as a native endian integer |
741 | /// |
742 | /// The default setting is big endian. |
743 | /// |
744 | /// This configuration option applies to both encoding and decoding. |
745 | /// |
746 | /// # Examples |
747 | /// |
748 | /// ``` |
749 | /// # use tokio::io::AsyncRead; |
750 | /// use tokio_util::codec::LengthDelimitedCodec; |
751 | /// |
752 | /// # fn bind_read<T: AsyncRead>(io: T) { |
753 | /// LengthDelimitedCodec::builder() |
754 | /// .native_endian() |
755 | /// .new_read(io); |
756 | /// # } |
757 | /// # pub fn main() {} |
758 | /// ``` |
759 | pub fn native_endian(&mut self) -> &mut Self { |
760 | if cfg!(target_endian = "big" ) { |
761 | self.big_endian() |
762 | } else { |
763 | self.little_endian() |
764 | } |
765 | } |
766 | |
767 | /// Sets the max frame length in bytes |
768 | /// |
769 | /// This configuration option applies to both encoding and decoding. The |
770 | /// default value is 8MB. |
771 | /// |
772 | /// When decoding, the length field read from the byte stream is checked |
773 | /// against this setting **before** any adjustments are applied. When |
774 | /// encoding, the length of the submitted payload is checked against this |
775 | /// setting. |
776 | /// |
777 | /// When frames exceed the max length, an `io::Error` with the custom value |
778 | /// of the `LengthDelimitedCodecError` type will be returned. |
779 | /// |
780 | /// # Examples |
781 | /// |
782 | /// ``` |
783 | /// # use tokio::io::AsyncRead; |
784 | /// use tokio_util::codec::LengthDelimitedCodec; |
785 | /// |
786 | /// # fn bind_read<T: AsyncRead>(io: T) { |
787 | /// LengthDelimitedCodec::builder() |
788 | /// .max_frame_length(8 * 1024 * 1024) |
789 | /// .new_read(io); |
790 | /// # } |
791 | /// # pub fn main() {} |
792 | /// ``` |
793 | pub fn max_frame_length(&mut self, val: usize) -> &mut Self { |
794 | self.max_frame_len = val; |
795 | self |
796 | } |
797 | |
798 | /// Sets the unsigned integer type used to represent the length field. |
799 | /// |
800 | /// The default type is [`u32`]. The max type is [`u64`] (or [`usize`] on |
801 | /// 64-bit targets). |
802 | /// |
803 | /// # Examples |
804 | /// |
805 | /// ``` |
806 | /// # use tokio::io::AsyncRead; |
807 | /// use tokio_util::codec::LengthDelimitedCodec; |
808 | /// |
809 | /// # fn bind_read<T: AsyncRead>(io: T) { |
810 | /// LengthDelimitedCodec::builder() |
811 | /// .length_field_type::<u32>() |
812 | /// .new_read(io); |
813 | /// # } |
814 | /// # pub fn main() {} |
815 | /// ``` |
816 | /// |
817 | /// Unlike [`Builder::length_field_length`], this does not fail at runtime |
818 | /// and instead produces a compile error: |
819 | /// |
820 | /// ```compile_fail |
821 | /// # use tokio::io::AsyncRead; |
822 | /// # use tokio_util::codec::LengthDelimitedCodec; |
823 | /// # fn bind_read<T: AsyncRead>(io: T) { |
824 | /// LengthDelimitedCodec::builder() |
825 | /// .length_field_type::<u128>() |
826 | /// .new_read(io); |
827 | /// # } |
828 | /// # pub fn main() {} |
829 | /// ``` |
830 | pub fn length_field_type<T: builder::LengthFieldType>(&mut self) -> &mut Self { |
831 | self.length_field_length(mem::size_of::<T>()) |
832 | } |
833 | |
834 | /// Sets the number of bytes used to represent the length field |
835 | /// |
836 | /// The default value is `4`. The max value is `8`. |
837 | /// |
838 | /// This configuration option applies to both encoding and decoding. |
839 | /// |
840 | /// # Examples |
841 | /// |
842 | /// ``` |
843 | /// # use tokio::io::AsyncRead; |
844 | /// use tokio_util::codec::LengthDelimitedCodec; |
845 | /// |
846 | /// # fn bind_read<T: AsyncRead>(io: T) { |
847 | /// LengthDelimitedCodec::builder() |
848 | /// .length_field_length(4) |
849 | /// .new_read(io); |
850 | /// # } |
851 | /// # pub fn main() {} |
852 | /// ``` |
853 | pub fn length_field_length(&mut self, val: usize) -> &mut Self { |
854 | assert!(val > 0 && val <= 8, "invalid length field length" ); |
855 | self.length_field_len = val; |
856 | self |
857 | } |
858 | |
859 | /// Sets the number of bytes in the header before the length field |
860 | /// |
861 | /// This configuration option only applies to decoding. |
862 | /// |
863 | /// # Examples |
864 | /// |
865 | /// ``` |
866 | /// # use tokio::io::AsyncRead; |
867 | /// use tokio_util::codec::LengthDelimitedCodec; |
868 | /// |
869 | /// # fn bind_read<T: AsyncRead>(io: T) { |
870 | /// LengthDelimitedCodec::builder() |
871 | /// .length_field_offset(1) |
872 | /// .new_read(io); |
873 | /// # } |
874 | /// # pub fn main() {} |
875 | /// ``` |
876 | pub fn length_field_offset(&mut self, val: usize) -> &mut Self { |
877 | self.length_field_offset = val; |
878 | self |
879 | } |
880 | |
881 | /// Delta between the payload length specified in the header and the real |
882 | /// payload length |
883 | /// |
884 | /// # Examples |
885 | /// |
886 | /// ``` |
887 | /// # use tokio::io::AsyncRead; |
888 | /// use tokio_util::codec::LengthDelimitedCodec; |
889 | /// |
890 | /// # fn bind_read<T: AsyncRead>(io: T) { |
891 | /// LengthDelimitedCodec::builder() |
892 | /// .length_adjustment(-2) |
893 | /// .new_read(io); |
894 | /// # } |
895 | /// # pub fn main() {} |
896 | /// ``` |
897 | pub fn length_adjustment(&mut self, val: isize) -> &mut Self { |
898 | self.length_adjustment = val; |
899 | self |
900 | } |
901 | |
902 | /// Sets the number of bytes to skip before reading the payload |
903 | /// |
904 | /// Default value is `length_field_len + length_field_offset` |
905 | /// |
906 | /// This configuration option only applies to decoding |
907 | /// |
908 | /// # Examples |
909 | /// |
910 | /// ``` |
911 | /// # use tokio::io::AsyncRead; |
912 | /// use tokio_util::codec::LengthDelimitedCodec; |
913 | /// |
914 | /// # fn bind_read<T: AsyncRead>(io: T) { |
915 | /// LengthDelimitedCodec::builder() |
916 | /// .num_skip(4) |
917 | /// .new_read(io); |
918 | /// # } |
919 | /// # pub fn main() {} |
920 | /// ``` |
921 | pub fn num_skip(&mut self, val: usize) -> &mut Self { |
922 | self.num_skip = Some(val); |
923 | self |
924 | } |
925 | |
926 | /// Create a configured length delimited `LengthDelimitedCodec` |
927 | /// |
928 | /// # Examples |
929 | /// |
930 | /// ``` |
931 | /// use tokio_util::codec::LengthDelimitedCodec; |
932 | /// # pub fn main() { |
933 | /// LengthDelimitedCodec::builder() |
934 | /// .length_field_offset(0) |
935 | /// .length_field_type::<u16>() |
936 | /// .length_adjustment(0) |
937 | /// .num_skip(0) |
938 | /// .new_codec(); |
939 | /// # } |
940 | /// ``` |
941 | pub fn new_codec(&self) -> LengthDelimitedCodec { |
942 | LengthDelimitedCodec { |
943 | builder: *self, |
944 | state: DecodeState::Head, |
945 | } |
946 | } |
947 | |
948 | /// Create a configured length delimited `FramedRead` |
949 | /// |
950 | /// # Examples |
951 | /// |
952 | /// ``` |
953 | /// # use tokio::io::AsyncRead; |
954 | /// use tokio_util::codec::LengthDelimitedCodec; |
955 | /// |
956 | /// # fn bind_read<T: AsyncRead>(io: T) { |
957 | /// LengthDelimitedCodec::builder() |
958 | /// .length_field_offset(0) |
959 | /// .length_field_type::<u16>() |
960 | /// .length_adjustment(0) |
961 | /// .num_skip(0) |
962 | /// .new_read(io); |
963 | /// # } |
964 | /// # pub fn main() {} |
965 | /// ``` |
966 | pub fn new_read<T>(&self, upstream: T) -> FramedRead<T, LengthDelimitedCodec> |
967 | where |
968 | T: AsyncRead, |
969 | { |
970 | FramedRead::new(upstream, self.new_codec()) |
971 | } |
972 | |
973 | /// Create a configured length delimited `FramedWrite` |
974 | /// |
975 | /// # Examples |
976 | /// |
977 | /// ``` |
978 | /// # use tokio::io::AsyncWrite; |
979 | /// # use tokio_util::codec::LengthDelimitedCodec; |
980 | /// # fn write_frame<T: AsyncWrite>(io: T) { |
981 | /// LengthDelimitedCodec::builder() |
982 | /// .length_field_type::<u16>() |
983 | /// .new_write(io); |
984 | /// # } |
985 | /// # pub fn main() {} |
986 | /// ``` |
987 | pub fn new_write<T>(&self, inner: T) -> FramedWrite<T, LengthDelimitedCodec> |
988 | where |
989 | T: AsyncWrite, |
990 | { |
991 | FramedWrite::new(inner, self.new_codec()) |
992 | } |
993 | |
994 | /// Create a configured length delimited `Framed` |
995 | /// |
996 | /// # Examples |
997 | /// |
998 | /// ``` |
999 | /// # use tokio::io::{AsyncRead, AsyncWrite}; |
1000 | /// # use tokio_util::codec::LengthDelimitedCodec; |
1001 | /// # fn write_frame<T: AsyncRead + AsyncWrite>(io: T) { |
1002 | /// # let _ = |
1003 | /// LengthDelimitedCodec::builder() |
1004 | /// .length_field_type::<u16>() |
1005 | /// .new_framed(io); |
1006 | /// # } |
1007 | /// # pub fn main() {} |
1008 | /// ``` |
1009 | pub fn new_framed<T>(&self, inner: T) -> Framed<T, LengthDelimitedCodec> |
1010 | where |
1011 | T: AsyncRead + AsyncWrite, |
1012 | { |
1013 | Framed::new(inner, self.new_codec()) |
1014 | } |
1015 | |
1016 | fn num_head_bytes(&self) -> usize { |
1017 | let num = self.length_field_offset + self.length_field_len; |
1018 | cmp::max(num, self.num_skip.unwrap_or(0)) |
1019 | } |
1020 | |
1021 | fn get_num_skip(&self) -> usize { |
1022 | self.num_skip |
1023 | .unwrap_or(self.length_field_offset + self.length_field_len) |
1024 | } |
1025 | } |
1026 | |
1027 | impl Default for Builder { |
1028 | fn default() -> Self { |
1029 | Self::new() |
1030 | } |
1031 | } |
1032 | |
1033 | // ===== impl LengthDelimitedCodecError ===== |
1034 | |
1035 | impl fmt::Debug for LengthDelimitedCodecError { |
1036 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
1037 | f.debug_struct(name:"LengthDelimitedCodecError" ).finish() |
1038 | } |
1039 | } |
1040 | |
1041 | impl fmt::Display for LengthDelimitedCodecError { |
1042 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
1043 | f.write_str(data:"frame size too big" ) |
1044 | } |
1045 | } |
1046 | |
1047 | impl StdError for LengthDelimitedCodecError {} |
1048 | |