1 | //! Frame a stream of bytes based on a length prefix |
2 | //! |
3 | //! Many protocols delimit their frames by prefacing frame data with a |
4 | //! frame head that specifies the length of the frame. The |
5 | //! `length_delimited` module provides utilities for handling the length |
6 | //! based framing. This allows the consumer to work with entire frames |
7 | //! without having to worry about buffering or other framing logic. |
8 | //! |
9 | //! # Getting started |
10 | //! |
11 | //! If implementing a protocol from scratch, using length delimited framing |
12 | //! is an easy way to get started. [`LengthDelimitedCodec::new()`] will |
13 | //! return a length delimited codec using default configuration values. |
14 | //! This can then be used to construct a framer to adapt a full-duplex |
15 | //! byte stream into a stream of frames. |
16 | //! |
17 | //! ``` |
18 | //! use tokio::io::{AsyncRead, AsyncWrite}; |
19 | //! use tokio_util::codec::{Framed, LengthDelimitedCodec}; |
20 | //! |
21 | //! fn bind_transport<T: AsyncRead + AsyncWrite>(io: T) |
22 | //! -> Framed<T, LengthDelimitedCodec> |
23 | //! { |
24 | //! Framed::new(io, LengthDelimitedCodec::new()) |
25 | //! } |
26 | //! # pub fn main() {} |
27 | //! ``` |
28 | //! |
29 | //! The returned transport implements `Sink + Stream` for `BytesMut`. It |
30 | //! encodes the frame with a big-endian `u32` header denoting the frame |
31 | //! payload length: |
32 | //! |
33 | //! ```text |
34 | //! +----------+--------------------------------+ |
35 | //! | len: u32 | frame payload | |
36 | //! +----------+--------------------------------+ |
37 | //! ``` |
38 | //! |
39 | //! Specifically, given the following: |
40 | //! |
41 | //! ``` |
42 | //! use tokio::io::{AsyncRead, AsyncWrite}; |
43 | //! use tokio_util::codec::{Framed, LengthDelimitedCodec}; |
44 | //! |
45 | //! use futures::SinkExt; |
46 | //! use bytes::Bytes; |
47 | //! |
48 | //! async fn write_frame<T>(io: T) -> Result<(), Box<dyn std::error::Error>> |
49 | //! where |
50 | //! T: AsyncRead + AsyncWrite + Unpin, |
51 | //! { |
52 | //! let mut transport = Framed::new(io, LengthDelimitedCodec::new()); |
53 | //! let frame = Bytes::from("hello world" ); |
54 | //! |
55 | //! transport.send(frame).await?; |
56 | //! Ok(()) |
57 | //! } |
58 | //! ``` |
59 | //! |
60 | //! The encoded frame will look like this: |
61 | //! |
62 | //! ```text |
63 | //! +---- len: u32 ----+---- data ----+ |
64 | //! | \x00\x00\x00\x0b | hello world | |
65 | //! +------------------+--------------+ |
66 | //! ``` |
67 | //! |
68 | //! # Decoding |
69 | //! |
70 | //! [`FramedRead`] adapts an [`AsyncRead`] into a `Stream` of [`BytesMut`], |
71 | //! such that each yielded [`BytesMut`] value contains the contents of an |
72 | //! entire frame. There are many configuration parameters enabling |
73 | //! [`FramedRead`] to handle a wide range of protocols. Here are some |
74 | //! examples that will cover the various options at a high level. |
75 | //! |
76 | //! ## Example 1 |
77 | //! |
78 | //! The following will parse a `u16` length field at offset 0, including the |
79 | //! frame head in the yielded `BytesMut`. |
80 | //! |
81 | //! ``` |
82 | //! # use tokio::io::AsyncRead; |
83 | //! # use tokio_util::codec::LengthDelimitedCodec; |
84 | //! # fn bind_read<T: AsyncRead>(io: T) { |
85 | //! LengthDelimitedCodec::builder() |
86 | //! .length_field_offset(0) // default value |
87 | //! .length_field_type::<u16>() |
88 | //! .length_adjustment(0) // default value |
89 | //! .num_skip(0) // Do not strip frame header |
90 | //! .new_read(io); |
91 | //! # } |
92 | //! # pub fn main() {} |
93 | //! ``` |
94 | //! |
95 | //! The following frame will be decoded as such: |
96 | //! |
97 | //! ```text |
98 | //! INPUT DECODED |
99 | //! +-- len ---+--- Payload ---+ +-- len ---+--- Payload ---+ |
100 | //! | \x00\x0B | Hello world | --> | \x00\x0B | Hello world | |
101 | //! +----------+---------------+ +----------+---------------+ |
102 | //! ``` |
103 | //! |
104 | //! The value of the length field is 11 (`\x0B`) which represents the length |
105 | //! of the payload, `hello world`. By default, [`FramedRead`] assumes that |
106 | //! the length field represents the number of bytes that **follows** the |
107 | //! length field. Thus, the entire frame has a length of 13: 2 bytes for the |
108 | //! frame head + 11 bytes for the payload. |
109 | //! |
110 | //! ## Example 2 |
111 | //! |
112 | //! The following will parse a `u16` length field at offset 0, omitting the |
113 | //! frame head in the yielded `BytesMut`. |
114 | //! |
115 | //! ``` |
116 | //! # use tokio::io::AsyncRead; |
117 | //! # use tokio_util::codec::LengthDelimitedCodec; |
118 | //! # fn bind_read<T: AsyncRead>(io: T) { |
119 | //! LengthDelimitedCodec::builder() |
120 | //! .length_field_offset(0) // default value |
121 | //! .length_field_type::<u16>() |
122 | //! .length_adjustment(0) // default value |
123 | //! // `num_skip` is not needed, the default is to skip |
124 | //! .new_read(io); |
125 | //! # } |
126 | //! # pub fn main() {} |
127 | //! ``` |
128 | //! |
129 | //! The following frame will be decoded as such: |
130 | //! |
131 | //! ```text |
132 | //! INPUT DECODED |
133 | //! +-- len ---+--- Payload ---+ +--- Payload ---+ |
134 | //! | \x00\x0B | Hello world | --> | Hello world | |
135 | //! +----------+---------------+ +---------------+ |
136 | //! ``` |
137 | //! |
138 | //! This is similar to the first example, the only difference is that the |
139 | //! frame head is **not** included in the yielded `BytesMut` value. |
140 | //! |
141 | //! ## Example 3 |
142 | //! |
143 | //! The following will parse a `u16` length field at offset 0, including the |
144 | //! frame head in the yielded `BytesMut`. In this case, the length field |
145 | //! **includes** the frame head length. |
146 | //! |
147 | //! ``` |
148 | //! # use tokio::io::AsyncRead; |
149 | //! # use tokio_util::codec::LengthDelimitedCodec; |
150 | //! # fn bind_read<T: AsyncRead>(io: T) { |
151 | //! LengthDelimitedCodec::builder() |
152 | //! .length_field_offset(0) // default value |
153 | //! .length_field_type::<u16>() |
154 | //! .length_adjustment(-2) // size of head |
155 | //! .num_skip(0) |
156 | //! .new_read(io); |
157 | //! # } |
158 | //! # pub fn main() {} |
159 | //! ``` |
160 | //! |
161 | //! The following frame will be decoded as such: |
162 | //! |
163 | //! ```text |
164 | //! INPUT DECODED |
165 | //! +-- len ---+--- Payload ---+ +-- len ---+--- Payload ---+ |
166 | //! | \x00\x0D | Hello world | --> | \x00\x0D | Hello world | |
167 | //! +----------+---------------+ +----------+---------------+ |
168 | //! ``` |
169 | //! |
170 | //! In most cases, the length field represents the length of the payload |
171 | //! only, as shown in the previous examples. However, in some protocols the |
172 | //! length field represents the length of the whole frame, including the |
173 | //! head. In such cases, we specify a negative `length_adjustment` to adjust |
174 | //! the value provided in the frame head to represent the payload length. |
175 | //! |
176 | //! ## Example 4 |
177 | //! |
178 | //! The following will parse a 3 byte length field at offset 0 in a 5 byte |
179 | //! frame head, including the frame head in the yielded `BytesMut`. |
180 | //! |
181 | //! ``` |
182 | //! # use tokio::io::AsyncRead; |
183 | //! # use tokio_util::codec::LengthDelimitedCodec; |
184 | //! # fn bind_read<T: AsyncRead>(io: T) { |
185 | //! LengthDelimitedCodec::builder() |
186 | //! .length_field_offset(0) // default value |
187 | //! .length_field_length(3) |
188 | //! .length_adjustment(2) // remaining head |
189 | //! .num_skip(0) |
190 | //! .new_read(io); |
191 | //! # } |
192 | //! # pub fn main() {} |
193 | //! ``` |
194 | //! |
195 | //! The following frame will be decoded as such: |
196 | //! |
197 | //! ```text |
198 | //! INPUT |
199 | //! +---- len -----+- head -+--- Payload ---+ |
200 | //! | \x00\x00\x0B | \xCAFE | Hello world | |
201 | //! +--------------+--------+---------------+ |
202 | //! |
203 | //! DECODED |
204 | //! +---- len -----+- head -+--- Payload ---+ |
205 | //! | \x00\x00\x0B | \xCAFE | Hello world | |
206 | //! +--------------+--------+---------------+ |
207 | //! ``` |
208 | //! |
209 | //! A more advanced example that shows a case where there is extra frame |
210 | //! head data between the length field and the payload. In such cases, it is |
211 | //! usually desirable to include the frame head as part of the yielded |
212 | //! `BytesMut`. This lets consumers of the length delimited framer to |
213 | //! process the frame head as needed. |
214 | //! |
215 | //! The positive `length_adjustment` value lets `FramedRead` factor in the |
216 | //! additional head into the frame length calculation. |
217 | //! |
218 | //! ## Example 5 |
219 | //! |
220 | //! The following will parse a `u16` length field at offset 1 of a 4 byte |
221 | //! frame head. The first byte and the length field will be omitted from the |
222 | //! yielded `BytesMut`, but the trailing 2 bytes of the frame head will be |
223 | //! included. |
224 | //! |
225 | //! ``` |
226 | //! # use tokio::io::AsyncRead; |
227 | //! # use tokio_util::codec::LengthDelimitedCodec; |
228 | //! # fn bind_read<T: AsyncRead>(io: T) { |
229 | //! LengthDelimitedCodec::builder() |
230 | //! .length_field_offset(1) // length of hdr1 |
231 | //! .length_field_type::<u16>() |
232 | //! .length_adjustment(1) // length of hdr2 |
233 | //! .num_skip(3) // length of hdr1 + LEN |
234 | //! .new_read(io); |
235 | //! # } |
236 | //! # pub fn main() {} |
237 | //! ``` |
238 | //! |
239 | //! The following frame will be decoded as such: |
240 | //! |
241 | //! ```text |
242 | //! INPUT |
243 | //! +- hdr1 -+-- len ---+- hdr2 -+--- Payload ---+ |
244 | //! | \xCA | \x00\x0B | \xFE | Hello world | |
245 | //! +--------+----------+--------+---------------+ |
246 | //! |
247 | //! DECODED |
248 | //! +- hdr2 -+--- Payload ---+ |
249 | //! | \xFE | Hello world | |
250 | //! +--------+---------------+ |
251 | //! ``` |
252 | //! |
253 | //! The length field is situated in the middle of the frame head. In this |
254 | //! case, the first byte in the frame head could be a version or some other |
255 | //! identifier that is not needed for processing. On the other hand, the |
256 | //! second half of the head is needed. |
257 | //! |
258 | //! `length_field_offset` indicates how many bytes to skip before starting |
259 | //! to read the length field. `length_adjustment` is the number of bytes to |
260 | //! skip starting at the end of the length field. In this case, it is the |
261 | //! second half of the head. |
262 | //! |
263 | //! ## Example 6 |
264 | //! |
265 | //! The following will parse a `u16` length field at offset 1 of a 4 byte |
266 | //! frame head. The first byte and the length field will be omitted from the |
267 | //! yielded `BytesMut`, but the trailing 2 bytes of the frame head will be |
268 | //! included. In this case, the length field **includes** the frame head |
269 | //! length. |
270 | //! |
271 | //! ``` |
272 | //! # use tokio::io::AsyncRead; |
273 | //! # use tokio_util::codec::LengthDelimitedCodec; |
274 | //! # fn bind_read<T: AsyncRead>(io: T) { |
275 | //! LengthDelimitedCodec::builder() |
276 | //! .length_field_offset(1) // length of hdr1 |
277 | //! .length_field_type::<u16>() |
278 | //! .length_adjustment(-3) // length of hdr1 + LEN, negative |
279 | //! .num_skip(3) |
280 | //! .new_read(io); |
281 | //! # } |
282 | //! ``` |
283 | //! |
284 | //! The following frame will be decoded as such: |
285 | //! |
286 | //! ```text |
287 | //! INPUT |
288 | //! +- hdr1 -+-- len ---+- hdr2 -+--- Payload ---+ |
289 | //! | \xCA | \x00\x0F | \xFE | Hello world | |
290 | //! +--------+----------+--------+---------------+ |
291 | //! |
292 | //! DECODED |
293 | //! +- hdr2 -+--- Payload ---+ |
294 | //! | \xFE | Hello world | |
295 | //! +--------+---------------+ |
296 | //! ``` |
297 | //! |
298 | //! Similar to the example above, the difference is that the length field |
299 | //! represents the length of the entire frame instead of just the payload. |
300 | //! The length of `hdr1` and `len` must be counted in `length_adjustment`. |
301 | //! Note that the length of `hdr2` does **not** need to be explicitly set |
302 | //! anywhere because it already is factored into the total frame length that |
303 | //! is read from the byte stream. |
304 | //! |
305 | //! ## Example 7 |
306 | //! |
307 | //! The following will parse a 3 byte length field at offset 0 in a 4 byte |
308 | //! frame head, excluding the 4th byte from the yielded `BytesMut`. |
309 | //! |
310 | //! ``` |
311 | //! # use tokio::io::AsyncRead; |
312 | //! # use tokio_util::codec::LengthDelimitedCodec; |
313 | //! # fn bind_read<T: AsyncRead>(io: T) { |
314 | //! LengthDelimitedCodec::builder() |
315 | //! .length_field_offset(0) // default value |
316 | //! .length_field_length(3) |
317 | //! .length_adjustment(0) // default value |
318 | //! .num_skip(4) // skip the first 4 bytes |
319 | //! .new_read(io); |
320 | //! # } |
321 | //! # pub fn main() {} |
322 | //! ``` |
323 | //! |
324 | //! The following frame will be decoded as such: |
325 | //! |
326 | //! ```text |
327 | //! INPUT DECODED |
328 | //! +------- len ------+--- Payload ---+ +--- Payload ---+ |
329 | //! | \x00\x00\x0B\xFF | Hello world | => | Hello world | |
330 | //! +------------------+---------------+ +---------------+ |
331 | //! ``` |
332 | //! |
333 | //! A simple example where there are unused bytes between the length field |
334 | //! and the payload. |
335 | //! |
336 | //! # Encoding |
337 | //! |
338 | //! [`FramedWrite`] adapts an [`AsyncWrite`] into a `Sink` of [`BytesMut`], |
339 | //! such that each submitted [`BytesMut`] is prefaced by a length field. |
340 | //! There are fewer configuration options than [`FramedRead`]. Given |
341 | //! protocols that have more complex frame heads, an encoder should probably |
342 | //! be written by hand using [`Encoder`]. |
343 | //! |
344 | //! Here is a simple example, given a `FramedWrite` with the following |
345 | //! configuration: |
346 | //! |
347 | //! ``` |
348 | //! # use tokio::io::AsyncWrite; |
349 | //! # use tokio_util::codec::LengthDelimitedCodec; |
350 | //! # fn write_frame<T: AsyncWrite>(io: T) { |
351 | //! # let _ = |
352 | //! LengthDelimitedCodec::builder() |
353 | //! .length_field_type::<u16>() |
354 | //! .new_write(io); |
355 | //! # } |
356 | //! # pub fn main() {} |
357 | //! ``` |
358 | //! |
359 | //! A payload of `hello world` will be encoded as: |
360 | //! |
361 | //! ```text |
362 | //! +- len: u16 -+---- data ----+ |
363 | //! | \x00\x0b | hello world | |
364 | //! +------------+--------------+ |
365 | //! ``` |
366 | //! |
367 | //! [`LengthDelimitedCodec::new()`]: method@LengthDelimitedCodec::new |
368 | //! [`FramedRead`]: struct@FramedRead |
369 | //! [`FramedWrite`]: struct@FramedWrite |
370 | //! [`AsyncRead`]: trait@tokio::io::AsyncRead |
371 | //! [`AsyncWrite`]: trait@tokio::io::AsyncWrite |
372 | //! [`Encoder`]: trait@Encoder |
373 | //! [`BytesMut`]: bytes::BytesMut |
374 | |
375 | use crate::codec::{Decoder, Encoder, Framed, FramedRead, FramedWrite}; |
376 | |
377 | use tokio::io::{AsyncRead, AsyncWrite}; |
378 | |
379 | use bytes::{Buf, BufMut, Bytes, BytesMut}; |
380 | use std::error::Error as StdError; |
381 | use std::io::{self, Cursor}; |
382 | use std::{cmp, fmt, mem}; |
383 | |
384 | /// Configure length delimited `LengthDelimitedCodec`s. |
385 | /// |
386 | /// `Builder` enables constructing configured length delimited codecs. Note |
387 | /// that not all configuration settings apply to both encoding and decoding. See |
388 | /// the documentation for specific methods for more detail. |
389 | #[derive (Debug, Clone, Copy)] |
390 | pub struct Builder { |
391 | // Maximum frame length |
392 | max_frame_len: usize, |
393 | |
394 | // Number of bytes representing the field length |
395 | length_field_len: usize, |
396 | |
397 | // Number of bytes in the header before the length field |
398 | length_field_offset: usize, |
399 | |
400 | // Adjust the length specified in the header field by this amount |
401 | length_adjustment: isize, |
402 | |
403 | // Total number of bytes to skip before reading the payload, if not set, |
404 | // `length_field_len + length_field_offset` |
405 | num_skip: Option<usize>, |
406 | |
407 | // Length field byte order (little or big endian) |
408 | length_field_is_big_endian: bool, |
409 | } |
410 | |
411 | /// An error when the number of bytes read is more than max frame length. |
412 | pub struct LengthDelimitedCodecError { |
413 | _priv: (), |
414 | } |
415 | |
416 | /// A codec for frames delimited by a frame head specifying their lengths. |
417 | /// |
418 | /// This allows the consumer to work with entire frames without having to worry |
419 | /// about buffering or other framing logic. |
420 | /// |
421 | /// See [module level] documentation for more detail. |
422 | /// |
423 | /// [module level]: index.html |
424 | #[derive (Debug, Clone)] |
425 | pub struct LengthDelimitedCodec { |
426 | // Configuration values |
427 | builder: Builder, |
428 | |
429 | // Read state |
430 | state: DecodeState, |
431 | } |
432 | |
433 | #[derive (Debug, Clone, Copy)] |
434 | enum DecodeState { |
435 | Head, |
436 | Data(usize), |
437 | } |
438 | |
439 | // ===== impl LengthDelimitedCodec ====== |
440 | |
441 | impl LengthDelimitedCodec { |
442 | /// Creates a new `LengthDelimitedCodec` with the default configuration values. |
443 | pub fn new() -> Self { |
444 | Self { |
445 | builder: Builder::new(), |
446 | state: DecodeState::Head, |
447 | } |
448 | } |
449 | |
450 | /// Creates a new length delimited codec builder with default configuration |
451 | /// values. |
452 | pub fn builder() -> Builder { |
453 | Builder::new() |
454 | } |
455 | |
456 | /// Returns the current max frame setting |
457 | /// |
458 | /// This is the largest size this codec will accept from the wire. Larger |
459 | /// frames will be rejected. |
460 | pub fn max_frame_length(&self) -> usize { |
461 | self.builder.max_frame_len |
462 | } |
463 | |
464 | /// Updates the max frame setting. |
465 | /// |
466 | /// The change takes effect the next time a frame is decoded. In other |
467 | /// words, if a frame is currently in process of being decoded with a frame |
468 | /// size greater than `val` but less than the max frame length in effect |
469 | /// before calling this function, then the frame will be allowed. |
470 | pub fn set_max_frame_length(&mut self, val: usize) { |
471 | self.builder.max_frame_length(val); |
472 | } |
473 | |
474 | fn decode_head(&mut self, src: &mut BytesMut) -> io::Result<Option<usize>> { |
475 | let head_len = self.builder.num_head_bytes(); |
476 | let field_len = self.builder.length_field_len; |
477 | |
478 | if src.len() < head_len { |
479 | // Not enough data |
480 | return Ok(None); |
481 | } |
482 | |
483 | let n = { |
484 | let mut src = Cursor::new(&mut *src); |
485 | |
486 | // Skip the required bytes |
487 | src.advance(self.builder.length_field_offset); |
488 | |
489 | // match endianness |
490 | let n = if self.builder.length_field_is_big_endian { |
491 | src.get_uint(field_len) |
492 | } else { |
493 | src.get_uint_le(field_len) |
494 | }; |
495 | |
496 | if n > self.builder.max_frame_len as u64 { |
497 | return Err(io::Error::new( |
498 | io::ErrorKind::InvalidData, |
499 | LengthDelimitedCodecError { _priv: () }, |
500 | )); |
501 | } |
502 | |
503 | // The check above ensures there is no overflow |
504 | let n = n as usize; |
505 | |
506 | // Adjust `n` with bounds checking |
507 | let n = if self.builder.length_adjustment < 0 { |
508 | n.checked_sub(-self.builder.length_adjustment as usize) |
509 | } else { |
510 | n.checked_add(self.builder.length_adjustment as usize) |
511 | }; |
512 | |
513 | // Error handling |
514 | match n { |
515 | Some(n) => n, |
516 | None => { |
517 | return Err(io::Error::new( |
518 | io::ErrorKind::InvalidInput, |
519 | "provided length would overflow after adjustment" , |
520 | )); |
521 | } |
522 | } |
523 | }; |
524 | |
525 | src.advance(self.builder.get_num_skip()); |
526 | |
527 | // Ensure that the buffer has enough space to read the incoming |
528 | // payload |
529 | src.reserve(n.saturating_sub(src.len())); |
530 | |
531 | Ok(Some(n)) |
532 | } |
533 | |
534 | fn decode_data(&self, n: usize, src: &mut BytesMut) -> Option<BytesMut> { |
535 | // At this point, the buffer has already had the required capacity |
536 | // reserved. All there is to do is read. |
537 | if src.len() < n { |
538 | return None; |
539 | } |
540 | |
541 | Some(src.split_to(n)) |
542 | } |
543 | } |
544 | |
545 | impl Decoder for LengthDelimitedCodec { |
546 | type Item = BytesMut; |
547 | type Error = io::Error; |
548 | |
549 | fn decode(&mut self, src: &mut BytesMut) -> io::Result<Option<BytesMut>> { |
550 | let n = match self.state { |
551 | DecodeState::Head => match self.decode_head(src)? { |
552 | Some(n) => { |
553 | self.state = DecodeState::Data(n); |
554 | n |
555 | } |
556 | None => return Ok(None), |
557 | }, |
558 | DecodeState::Data(n) => n, |
559 | }; |
560 | |
561 | match self.decode_data(n, src) { |
562 | Some(data) => { |
563 | // Update the decode state |
564 | self.state = DecodeState::Head; |
565 | |
566 | // Make sure the buffer has enough space to read the next head |
567 | src.reserve(self.builder.num_head_bytes().saturating_sub(src.len())); |
568 | |
569 | Ok(Some(data)) |
570 | } |
571 | None => Ok(None), |
572 | } |
573 | } |
574 | } |
575 | |
576 | impl Encoder<Bytes> for LengthDelimitedCodec { |
577 | type Error = io::Error; |
578 | |
579 | fn encode(&mut self, data: Bytes, dst: &mut BytesMut) -> Result<(), io::Error> { |
580 | let n = data.len(); |
581 | |
582 | if n > self.builder.max_frame_len { |
583 | return Err(io::Error::new( |
584 | io::ErrorKind::InvalidInput, |
585 | LengthDelimitedCodecError { _priv: () }, |
586 | )); |
587 | } |
588 | |
589 | // Adjust `n` with bounds checking |
590 | let n = if self.builder.length_adjustment < 0 { |
591 | n.checked_add(-self.builder.length_adjustment as usize) |
592 | } else { |
593 | n.checked_sub(self.builder.length_adjustment as usize) |
594 | }; |
595 | |
596 | let n = n.ok_or_else(|| { |
597 | io::Error::new( |
598 | io::ErrorKind::InvalidInput, |
599 | "provided length would overflow after adjustment" , |
600 | ) |
601 | })?; |
602 | |
603 | // Reserve capacity in the destination buffer to fit the frame and |
604 | // length field (plus adjustment). |
605 | dst.reserve(self.builder.length_field_len + n); |
606 | |
607 | if self.builder.length_field_is_big_endian { |
608 | dst.put_uint(n as u64, self.builder.length_field_len); |
609 | } else { |
610 | dst.put_uint_le(n as u64, self.builder.length_field_len); |
611 | } |
612 | |
613 | // Write the frame to the buffer |
614 | dst.extend_from_slice(&data[..]); |
615 | |
616 | Ok(()) |
617 | } |
618 | } |
619 | |
620 | impl Default for LengthDelimitedCodec { |
621 | fn default() -> Self { |
622 | Self::new() |
623 | } |
624 | } |
625 | |
626 | // ===== impl Builder ===== |
627 | |
628 | mod builder { |
629 | /// Types that can be used with `Builder::length_field_type`. |
630 | pub trait LengthFieldType {} |
631 | |
632 | impl LengthFieldType for u8 {} |
633 | impl LengthFieldType for u16 {} |
634 | impl LengthFieldType for u32 {} |
635 | impl LengthFieldType for u64 {} |
636 | |
637 | #[cfg (any( |
638 | target_pointer_width = "8" , |
639 | target_pointer_width = "16" , |
640 | target_pointer_width = "32" , |
641 | target_pointer_width = "64" , |
642 | ))] |
643 | impl LengthFieldType for usize {} |
644 | } |
645 | |
646 | impl Builder { |
647 | /// Creates a new length delimited codec builder with default configuration |
648 | /// values. |
649 | /// |
650 | /// # Examples |
651 | /// |
652 | /// ``` |
653 | /// # use tokio::io::AsyncRead; |
654 | /// use tokio_util::codec::LengthDelimitedCodec; |
655 | /// |
656 | /// # fn bind_read<T: AsyncRead>(io: T) { |
657 | /// LengthDelimitedCodec::builder() |
658 | /// .length_field_offset(0) |
659 | /// .length_field_type::<u16>() |
660 | /// .length_adjustment(0) |
661 | /// .num_skip(0) |
662 | /// .new_read(io); |
663 | /// # } |
664 | /// # pub fn main() {} |
665 | /// ``` |
666 | pub fn new() -> Builder { |
667 | Builder { |
668 | // Default max frame length of 8MB |
669 | max_frame_len: 8 * 1_024 * 1_024, |
670 | |
671 | // Default byte length of 4 |
672 | length_field_len: 4, |
673 | |
674 | // Default to the header field being at the start of the header. |
675 | length_field_offset: 0, |
676 | |
677 | length_adjustment: 0, |
678 | |
679 | // Total number of bytes to skip before reading the payload, if not set, |
680 | // `length_field_len + length_field_offset` |
681 | num_skip: None, |
682 | |
683 | // Default to reading the length field in network (big) endian. |
684 | length_field_is_big_endian: true, |
685 | } |
686 | } |
687 | |
688 | /// Read the length field as a big endian integer |
689 | /// |
690 | /// This is the default setting. |
691 | /// |
692 | /// This configuration option applies to both encoding and decoding. |
693 | /// |
694 | /// # Examples |
695 | /// |
696 | /// ``` |
697 | /// # use tokio::io::AsyncRead; |
698 | /// use tokio_util::codec::LengthDelimitedCodec; |
699 | /// |
700 | /// # fn bind_read<T: AsyncRead>(io: T) { |
701 | /// LengthDelimitedCodec::builder() |
702 | /// .big_endian() |
703 | /// .new_read(io); |
704 | /// # } |
705 | /// # pub fn main() {} |
706 | /// ``` |
707 | pub fn big_endian(&mut self) -> &mut Self { |
708 | self.length_field_is_big_endian = true; |
709 | self |
710 | } |
711 | |
712 | /// Read the length field as a little endian integer |
713 | /// |
714 | /// The default setting is big endian. |
715 | /// |
716 | /// This configuration option applies to both encoding and decoding. |
717 | /// |
718 | /// # Examples |
719 | /// |
720 | /// ``` |
721 | /// # use tokio::io::AsyncRead; |
722 | /// use tokio_util::codec::LengthDelimitedCodec; |
723 | /// |
724 | /// # fn bind_read<T: AsyncRead>(io: T) { |
725 | /// LengthDelimitedCodec::builder() |
726 | /// .little_endian() |
727 | /// .new_read(io); |
728 | /// # } |
729 | /// # pub fn main() {} |
730 | /// ``` |
731 | pub fn little_endian(&mut self) -> &mut Self { |
732 | self.length_field_is_big_endian = false; |
733 | self |
734 | } |
735 | |
736 | /// Read the length field as a native endian integer |
737 | /// |
738 | /// The default setting is big endian. |
739 | /// |
740 | /// This configuration option applies to both encoding and decoding. |
741 | /// |
742 | /// # Examples |
743 | /// |
744 | /// ``` |
745 | /// # use tokio::io::AsyncRead; |
746 | /// use tokio_util::codec::LengthDelimitedCodec; |
747 | /// |
748 | /// # fn bind_read<T: AsyncRead>(io: T) { |
749 | /// LengthDelimitedCodec::builder() |
750 | /// .native_endian() |
751 | /// .new_read(io); |
752 | /// # } |
753 | /// # pub fn main() {} |
754 | /// ``` |
755 | pub fn native_endian(&mut self) -> &mut Self { |
756 | if cfg!(target_endian = "big" ) { |
757 | self.big_endian() |
758 | } else { |
759 | self.little_endian() |
760 | } |
761 | } |
762 | |
763 | /// Sets the max frame length in bytes |
764 | /// |
765 | /// This configuration option applies to both encoding and decoding. The |
766 | /// default value is 8MB. |
767 | /// |
768 | /// When decoding, the length field read from the byte stream is checked |
769 | /// against this setting **before** any adjustments are applied. When |
770 | /// encoding, the length of the submitted payload is checked against this |
771 | /// setting. |
772 | /// |
773 | /// When frames exceed the max length, an `io::Error` with the custom value |
774 | /// of the `LengthDelimitedCodecError` type will be returned. |
775 | /// |
776 | /// # Examples |
777 | /// |
778 | /// ``` |
779 | /// # use tokio::io::AsyncRead; |
780 | /// use tokio_util::codec::LengthDelimitedCodec; |
781 | /// |
782 | /// # fn bind_read<T: AsyncRead>(io: T) { |
783 | /// LengthDelimitedCodec::builder() |
784 | /// .max_frame_length(8 * 1024 * 1024) |
785 | /// .new_read(io); |
786 | /// # } |
787 | /// # pub fn main() {} |
788 | /// ``` |
789 | pub fn max_frame_length(&mut self, val: usize) -> &mut Self { |
790 | self.max_frame_len = val; |
791 | self |
792 | } |
793 | |
794 | /// Sets the unsigned integer type used to represent the length field. |
795 | /// |
796 | /// The default type is [`u32`]. The max type is [`u64`] (or [`usize`] on |
797 | /// 64-bit targets). |
798 | /// |
799 | /// # Examples |
800 | /// |
801 | /// ``` |
802 | /// # use tokio::io::AsyncRead; |
803 | /// use tokio_util::codec::LengthDelimitedCodec; |
804 | /// |
805 | /// # fn bind_read<T: AsyncRead>(io: T) { |
806 | /// LengthDelimitedCodec::builder() |
807 | /// .length_field_type::<u32>() |
808 | /// .new_read(io); |
809 | /// # } |
810 | /// # pub fn main() {} |
811 | /// ``` |
812 | /// |
813 | /// Unlike [`Builder::length_field_length`], this does not fail at runtime |
814 | /// and instead produces a compile error: |
815 | /// |
816 | /// ```compile_fail |
817 | /// # use tokio::io::AsyncRead; |
818 | /// # use tokio_util::codec::LengthDelimitedCodec; |
819 | /// # fn bind_read<T: AsyncRead>(io: T) { |
820 | /// LengthDelimitedCodec::builder() |
821 | /// .length_field_type::<u128>() |
822 | /// .new_read(io); |
823 | /// # } |
824 | /// # pub fn main() {} |
825 | /// ``` |
826 | pub fn length_field_type<T: builder::LengthFieldType>(&mut self) -> &mut Self { |
827 | self.length_field_length(mem::size_of::<T>()) |
828 | } |
829 | |
830 | /// Sets the number of bytes used to represent the length field |
831 | /// |
832 | /// The default value is `4`. The max value is `8`. |
833 | /// |
834 | /// This configuration option applies to both encoding and decoding. |
835 | /// |
836 | /// # Examples |
837 | /// |
838 | /// ``` |
839 | /// # use tokio::io::AsyncRead; |
840 | /// use tokio_util::codec::LengthDelimitedCodec; |
841 | /// |
842 | /// # fn bind_read<T: AsyncRead>(io: T) { |
843 | /// LengthDelimitedCodec::builder() |
844 | /// .length_field_length(4) |
845 | /// .new_read(io); |
846 | /// # } |
847 | /// # pub fn main() {} |
848 | /// ``` |
849 | pub fn length_field_length(&mut self, val: usize) -> &mut Self { |
850 | assert!(val > 0 && val <= 8, "invalid length field length" ); |
851 | self.length_field_len = val; |
852 | self |
853 | } |
854 | |
855 | /// Sets the number of bytes in the header before the length field |
856 | /// |
857 | /// This configuration option only applies to decoding. |
858 | /// |
859 | /// # Examples |
860 | /// |
861 | /// ``` |
862 | /// # use tokio::io::AsyncRead; |
863 | /// use tokio_util::codec::LengthDelimitedCodec; |
864 | /// |
865 | /// # fn bind_read<T: AsyncRead>(io: T) { |
866 | /// LengthDelimitedCodec::builder() |
867 | /// .length_field_offset(1) |
868 | /// .new_read(io); |
869 | /// # } |
870 | /// # pub fn main() {} |
871 | /// ``` |
872 | pub fn length_field_offset(&mut self, val: usize) -> &mut Self { |
873 | self.length_field_offset = val; |
874 | self |
875 | } |
876 | |
877 | /// Delta between the payload length specified in the header and the real |
878 | /// payload length |
879 | /// |
880 | /// # Examples |
881 | /// |
882 | /// ``` |
883 | /// # use tokio::io::AsyncRead; |
884 | /// use tokio_util::codec::LengthDelimitedCodec; |
885 | /// |
886 | /// # fn bind_read<T: AsyncRead>(io: T) { |
887 | /// LengthDelimitedCodec::builder() |
888 | /// .length_adjustment(-2) |
889 | /// .new_read(io); |
890 | /// # } |
891 | /// # pub fn main() {} |
892 | /// ``` |
893 | pub fn length_adjustment(&mut self, val: isize) -> &mut Self { |
894 | self.length_adjustment = val; |
895 | self |
896 | } |
897 | |
898 | /// Sets the number of bytes to skip before reading the payload |
899 | /// |
900 | /// Default value is `length_field_len + length_field_offset` |
901 | /// |
902 | /// This configuration option only applies to decoding |
903 | /// |
904 | /// # Examples |
905 | /// |
906 | /// ``` |
907 | /// # use tokio::io::AsyncRead; |
908 | /// use tokio_util::codec::LengthDelimitedCodec; |
909 | /// |
910 | /// # fn bind_read<T: AsyncRead>(io: T) { |
911 | /// LengthDelimitedCodec::builder() |
912 | /// .num_skip(4) |
913 | /// .new_read(io); |
914 | /// # } |
915 | /// # pub fn main() {} |
916 | /// ``` |
917 | pub fn num_skip(&mut self, val: usize) -> &mut Self { |
918 | self.num_skip = Some(val); |
919 | self |
920 | } |
921 | |
922 | /// Create a configured length delimited `LengthDelimitedCodec` |
923 | /// |
924 | /// # Examples |
925 | /// |
926 | /// ``` |
927 | /// use tokio_util::codec::LengthDelimitedCodec; |
928 | /// # pub fn main() { |
929 | /// LengthDelimitedCodec::builder() |
930 | /// .length_field_offset(0) |
931 | /// .length_field_type::<u16>() |
932 | /// .length_adjustment(0) |
933 | /// .num_skip(0) |
934 | /// .new_codec(); |
935 | /// # } |
936 | /// ``` |
937 | pub fn new_codec(&self) -> LengthDelimitedCodec { |
938 | LengthDelimitedCodec { |
939 | builder: *self, |
940 | state: DecodeState::Head, |
941 | } |
942 | } |
943 | |
944 | /// Create a configured length delimited `FramedRead` |
945 | /// |
946 | /// # Examples |
947 | /// |
948 | /// ``` |
949 | /// # use tokio::io::AsyncRead; |
950 | /// use tokio_util::codec::LengthDelimitedCodec; |
951 | /// |
952 | /// # fn bind_read<T: AsyncRead>(io: T) { |
953 | /// LengthDelimitedCodec::builder() |
954 | /// .length_field_offset(0) |
955 | /// .length_field_type::<u16>() |
956 | /// .length_adjustment(0) |
957 | /// .num_skip(0) |
958 | /// .new_read(io); |
959 | /// # } |
960 | /// # pub fn main() {} |
961 | /// ``` |
962 | pub fn new_read<T>(&self, upstream: T) -> FramedRead<T, LengthDelimitedCodec> |
963 | where |
964 | T: AsyncRead, |
965 | { |
966 | FramedRead::new(upstream, self.new_codec()) |
967 | } |
968 | |
969 | /// Create a configured length delimited `FramedWrite` |
970 | /// |
971 | /// # Examples |
972 | /// |
973 | /// ``` |
974 | /// # use tokio::io::AsyncWrite; |
975 | /// # use tokio_util::codec::LengthDelimitedCodec; |
976 | /// # fn write_frame<T: AsyncWrite>(io: T) { |
977 | /// LengthDelimitedCodec::builder() |
978 | /// .length_field_type::<u16>() |
979 | /// .new_write(io); |
980 | /// # } |
981 | /// # pub fn main() {} |
982 | /// ``` |
983 | pub fn new_write<T>(&self, inner: T) -> FramedWrite<T, LengthDelimitedCodec> |
984 | where |
985 | T: AsyncWrite, |
986 | { |
987 | FramedWrite::new(inner, self.new_codec()) |
988 | } |
989 | |
990 | /// Create a configured length delimited `Framed` |
991 | /// |
992 | /// # Examples |
993 | /// |
994 | /// ``` |
995 | /// # use tokio::io::{AsyncRead, AsyncWrite}; |
996 | /// # use tokio_util::codec::LengthDelimitedCodec; |
997 | /// # fn write_frame<T: AsyncRead + AsyncWrite>(io: T) { |
998 | /// # let _ = |
999 | /// LengthDelimitedCodec::builder() |
1000 | /// .length_field_type::<u16>() |
1001 | /// .new_framed(io); |
1002 | /// # } |
1003 | /// # pub fn main() {} |
1004 | /// ``` |
1005 | pub fn new_framed<T>(&self, inner: T) -> Framed<T, LengthDelimitedCodec> |
1006 | where |
1007 | T: AsyncRead + AsyncWrite, |
1008 | { |
1009 | Framed::new(inner, self.new_codec()) |
1010 | } |
1011 | |
1012 | fn num_head_bytes(&self) -> usize { |
1013 | let num = self.length_field_offset + self.length_field_len; |
1014 | cmp::max(num, self.num_skip.unwrap_or(0)) |
1015 | } |
1016 | |
1017 | fn get_num_skip(&self) -> usize { |
1018 | self.num_skip |
1019 | .unwrap_or(self.length_field_offset + self.length_field_len) |
1020 | } |
1021 | } |
1022 | |
1023 | impl Default for Builder { |
1024 | fn default() -> Self { |
1025 | Self::new() |
1026 | } |
1027 | } |
1028 | |
1029 | // ===== impl LengthDelimitedCodecError ===== |
1030 | |
1031 | impl fmt::Debug for LengthDelimitedCodecError { |
1032 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
1033 | f.debug_struct(name:"LengthDelimitedCodecError" ).finish() |
1034 | } |
1035 | } |
1036 | |
1037 | impl fmt::Display for LengthDelimitedCodecError { |
1038 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
1039 | f.write_str(data:"frame size too big" ) |
1040 | } |
1041 | } |
1042 | |
1043 | impl StdError for LengthDelimitedCodecError {} |
1044 | |