1use std::error::Error as StdError;
2use std::fmt;
3use std::io;
4use std::task::{Context, Poll};
5use std::usize;
6
7use bytes::Bytes;
8use tracing::{debug, trace};
9
10use super::io::MemRead;
11use super::DecodedLength;
12
13use self::Kind::{Chunked, Eof, Length};
14
15/// Maximum amount of bytes allowed in chunked extensions.
16///
17/// This limit is currentlty applied for the entire body, not per chunk.
18const CHUNKED_EXTENSIONS_LIMIT: u64 = 1024 * 16;
19
20/// Decoders to handle different Transfer-Encodings.
21///
22/// If a message body does not include a Transfer-Encoding, it *should*
23/// include a Content-Length header.
24#[derive(Clone, PartialEq)]
25pub(crate) struct Decoder {
26 kind: Kind,
27}
28
29#[derive(Debug, Clone, Copy, PartialEq)]
30enum Kind {
31 /// A Reader used when a Content-Length header is passed with a positive integer.
32 Length(u64),
33 /// A Reader used when Transfer-Encoding is `chunked`.
34 Chunked {
35 state: ChunkedState,
36 chunk_len: u64,
37 extensions_cnt: u64,
38 },
39 /// A Reader used for responses that don't indicate a length or chunked.
40 ///
41 /// The bool tracks when EOF is seen on the transport.
42 ///
43 /// Note: This should only used for `Response`s. It is illegal for a
44 /// `Request` to be made with both `Content-Length` and
45 /// `Transfer-Encoding: chunked` missing, as explained from the spec:
46 ///
47 /// > If a Transfer-Encoding header field is present in a response and
48 /// > the chunked transfer coding is not the final encoding, the
49 /// > message body length is determined by reading the connection until
50 /// > it is closed by the server. If a Transfer-Encoding header field
51 /// > is present in a request and the chunked transfer coding is not
52 /// > the final encoding, the message body length cannot be determined
53 /// > reliably; the server MUST respond with the 400 (Bad Request)
54 /// > status code and then close the connection.
55 Eof(bool),
56}
57
58#[derive(Debug, PartialEq, Clone, Copy)]
59enum ChunkedState {
60 Start,
61 Size,
62 SizeLws,
63 Extension,
64 SizeLf,
65 Body,
66 BodyCr,
67 BodyLf,
68 Trailer,
69 TrailerLf,
70 EndCr,
71 EndLf,
72 End,
73}
74
75impl Decoder {
76 // constructors
77
78 pub(crate) fn length(x: u64) -> Decoder {
79 Decoder {
80 kind: Kind::Length(x),
81 }
82 }
83
84 pub(crate) fn chunked() -> Decoder {
85 Decoder {
86 kind: Kind::Chunked {
87 state: ChunkedState::new(),
88 chunk_len: 0,
89 extensions_cnt: 0,
90 },
91 }
92 }
93
94 pub(crate) fn eof() -> Decoder {
95 Decoder {
96 kind: Kind::Eof(false),
97 }
98 }
99
100 pub(super) fn new(len: DecodedLength) -> Self {
101 match len {
102 DecodedLength::CHUNKED => Decoder::chunked(),
103 DecodedLength::CLOSE_DELIMITED => Decoder::eof(),
104 length => Decoder::length(length.danger_len()),
105 }
106 }
107
108 // methods
109
110 pub(crate) fn is_eof(&self) -> bool {
111 matches!(
112 self.kind,
113 Length(0)
114 | Chunked {
115 state: ChunkedState::End,
116 ..
117 }
118 | Eof(true)
119 )
120 }
121
122 pub(crate) fn decode<R: MemRead>(
123 &mut self,
124 cx: &mut Context<'_>,
125 body: &mut R,
126 ) -> Poll<Result<Bytes, io::Error>> {
127 trace!("decode; state={:?}", self.kind);
128 match self.kind {
129 Length(ref mut remaining) => {
130 if *remaining == 0 {
131 Poll::Ready(Ok(Bytes::new()))
132 } else {
133 let to_read = *remaining as usize;
134 let buf = ready!(body.read_mem(cx, to_read))?;
135 let num = buf.as_ref().len() as u64;
136 if num > *remaining {
137 *remaining = 0;
138 } else if num == 0 {
139 return Poll::Ready(Err(io::Error::new(
140 io::ErrorKind::UnexpectedEof,
141 IncompleteBody,
142 )));
143 } else {
144 *remaining -= num;
145 }
146 Poll::Ready(Ok(buf))
147 }
148 }
149 Chunked {
150 ref mut state,
151 ref mut chunk_len,
152 ref mut extensions_cnt,
153 } => {
154 loop {
155 let mut buf = None;
156 // advances the chunked state
157 *state = ready!(state.step(cx, body, chunk_len, extensions_cnt, &mut buf))?;
158 if *state == ChunkedState::End {
159 trace!("end of chunked");
160 return Poll::Ready(Ok(Bytes::new()));
161 }
162 if let Some(buf) = buf {
163 return Poll::Ready(Ok(buf));
164 }
165 }
166 }
167 Eof(ref mut is_eof) => {
168 if *is_eof {
169 Poll::Ready(Ok(Bytes::new()))
170 } else {
171 // 8192 chosen because its about 2 packets, there probably
172 // won't be that much available, so don't have MemReaders
173 // allocate buffers to big
174 body.read_mem(cx, 8192).map_ok(|slice| {
175 *is_eof = slice.is_empty();
176 slice
177 })
178 }
179 }
180 }
181 }
182
183 #[cfg(test)]
184 async fn decode_fut<R: MemRead>(&mut self, body: &mut R) -> Result<Bytes, io::Error> {
185 futures_util::future::poll_fn(move |cx| self.decode(cx, body)).await
186 }
187}
188
189impl fmt::Debug for Decoder {
190 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
191 fmt::Debug::fmt(&self.kind, f)
192 }
193}
194
195macro_rules! byte (
196 ($rdr:ident, $cx:expr) => ({
197 let buf = ready!($rdr.read_mem($cx, 1))?;
198 if !buf.is_empty() {
199 buf[0]
200 } else {
201 return Poll::Ready(Err(io::Error::new(io::ErrorKind::UnexpectedEof,
202 "unexpected EOF during chunk size line")));
203 }
204 })
205);
206
207macro_rules! or_overflow {
208 ($e:expr) => (
209 match $e {
210 Some(val) => val,
211 None => return Poll::Ready(Err(io::Error::new(
212 io::ErrorKind::InvalidData,
213 "invalid chunk size: overflow",
214 ))),
215 }
216 )
217}
218
219impl ChunkedState {
220 fn new() -> ChunkedState {
221 ChunkedState::Start
222 }
223 fn step<R: MemRead>(
224 &self,
225 cx: &mut Context<'_>,
226 body: &mut R,
227 size: &mut u64,
228 extensions_cnt: &mut u64,
229 buf: &mut Option<Bytes>,
230 ) -> Poll<Result<ChunkedState, io::Error>> {
231 use self::ChunkedState::*;
232 match *self {
233 Start => ChunkedState::read_start(cx, body, size),
234 Size => ChunkedState::read_size(cx, body, size),
235 SizeLws => ChunkedState::read_size_lws(cx, body),
236 Extension => ChunkedState::read_extension(cx, body, extensions_cnt),
237 SizeLf => ChunkedState::read_size_lf(cx, body, *size),
238 Body => ChunkedState::read_body(cx, body, size, buf),
239 BodyCr => ChunkedState::read_body_cr(cx, body),
240 BodyLf => ChunkedState::read_body_lf(cx, body),
241 Trailer => ChunkedState::read_trailer(cx, body),
242 TrailerLf => ChunkedState::read_trailer_lf(cx, body),
243 EndCr => ChunkedState::read_end_cr(cx, body),
244 EndLf => ChunkedState::read_end_lf(cx, body),
245 End => Poll::Ready(Ok(ChunkedState::End)),
246 }
247 }
248
249 fn read_start<R: MemRead>(
250 cx: &mut Context<'_>,
251 rdr: &mut R,
252 size: &mut u64,
253 ) -> Poll<Result<ChunkedState, io::Error>> {
254 trace!("Read chunk start");
255
256 let radix = 16;
257 match byte!(rdr, cx) {
258 b @ b'0'..=b'9' => {
259 *size = or_overflow!(size.checked_mul(radix));
260 *size = or_overflow!(size.checked_add((b - b'0') as u64));
261 }
262 b @ b'a'..=b'f' => {
263 *size = or_overflow!(size.checked_mul(radix));
264 *size = or_overflow!(size.checked_add((b + 10 - b'a') as u64));
265 }
266 b @ b'A'..=b'F' => {
267 *size = or_overflow!(size.checked_mul(radix));
268 *size = or_overflow!(size.checked_add((b + 10 - b'A') as u64));
269 }
270 _ => {
271 return Poll::Ready(Err(io::Error::new(
272 io::ErrorKind::InvalidInput,
273 "Invalid chunk size line: missing size digit",
274 )));
275 }
276 }
277
278 Poll::Ready(Ok(ChunkedState::Size))
279 }
280
281 fn read_size<R: MemRead>(
282 cx: &mut Context<'_>,
283 rdr: &mut R,
284 size: &mut u64,
285 ) -> Poll<Result<ChunkedState, io::Error>> {
286 trace!("Read chunk hex size");
287
288 let radix = 16;
289 match byte!(rdr, cx) {
290 b @ b'0'..=b'9' => {
291 *size = or_overflow!(size.checked_mul(radix));
292 *size = or_overflow!(size.checked_add((b - b'0') as u64));
293 }
294 b @ b'a'..=b'f' => {
295 *size = or_overflow!(size.checked_mul(radix));
296 *size = or_overflow!(size.checked_add((b + 10 - b'a') as u64));
297 }
298 b @ b'A'..=b'F' => {
299 *size = or_overflow!(size.checked_mul(radix));
300 *size = or_overflow!(size.checked_add((b + 10 - b'A') as u64));
301 }
302 b'\t' | b' ' => return Poll::Ready(Ok(ChunkedState::SizeLws)),
303 b';' => return Poll::Ready(Ok(ChunkedState::Extension)),
304 b'\r' => return Poll::Ready(Ok(ChunkedState::SizeLf)),
305 _ => {
306 return Poll::Ready(Err(io::Error::new(
307 io::ErrorKind::InvalidInput,
308 "Invalid chunk size line: Invalid Size",
309 )));
310 }
311 }
312 Poll::Ready(Ok(ChunkedState::Size))
313 }
314 fn read_size_lws<R: MemRead>(
315 cx: &mut Context<'_>,
316 rdr: &mut R,
317 ) -> Poll<Result<ChunkedState, io::Error>> {
318 trace!("read_size_lws");
319 match byte!(rdr, cx) {
320 // LWS can follow the chunk size, but no more digits can come
321 b'\t' | b' ' => Poll::Ready(Ok(ChunkedState::SizeLws)),
322 b';' => Poll::Ready(Ok(ChunkedState::Extension)),
323 b'\r' => Poll::Ready(Ok(ChunkedState::SizeLf)),
324 _ => Poll::Ready(Err(io::Error::new(
325 io::ErrorKind::InvalidInput,
326 "Invalid chunk size linear white space",
327 ))),
328 }
329 }
330 fn read_extension<R: MemRead>(
331 cx: &mut Context<'_>,
332 rdr: &mut R,
333 extensions_cnt: &mut u64,
334 ) -> Poll<Result<ChunkedState, io::Error>> {
335 trace!("read_extension");
336 // We don't care about extensions really at all. Just ignore them.
337 // They "end" at the next CRLF.
338 //
339 // However, some implementations may not check for the CR, so to save
340 // them from themselves, we reject extensions containing plain LF as
341 // well.
342 match byte!(rdr, cx) {
343 b'\r' => Poll::Ready(Ok(ChunkedState::SizeLf)),
344 b'\n' => Poll::Ready(Err(io::Error::new(
345 io::ErrorKind::InvalidData,
346 "invalid chunk extension contains newline",
347 ))),
348 _ => {
349 *extensions_cnt += 1;
350 if *extensions_cnt >= CHUNKED_EXTENSIONS_LIMIT {
351 Poll::Ready(Err(io::Error::new(
352 io::ErrorKind::InvalidData,
353 "chunk extensions over limit",
354 )))
355 } else {
356 Poll::Ready(Ok(ChunkedState::Extension))
357 }
358 } // no supported extensions
359 }
360 }
361 fn read_size_lf<R: MemRead>(
362 cx: &mut Context<'_>,
363 rdr: &mut R,
364 size: u64,
365 ) -> Poll<Result<ChunkedState, io::Error>> {
366 trace!("Chunk size is {:?}", size);
367 match byte!(rdr, cx) {
368 b'\n' => {
369 if size == 0 {
370 Poll::Ready(Ok(ChunkedState::EndCr))
371 } else {
372 debug!("incoming chunked header: {0:#X} ({0} bytes)", size);
373 Poll::Ready(Ok(ChunkedState::Body))
374 }
375 }
376 _ => Poll::Ready(Err(io::Error::new(
377 io::ErrorKind::InvalidInput,
378 "Invalid chunk size LF",
379 ))),
380 }
381 }
382
383 fn read_body<R: MemRead>(
384 cx: &mut Context<'_>,
385 rdr: &mut R,
386 rem: &mut u64,
387 buf: &mut Option<Bytes>,
388 ) -> Poll<Result<ChunkedState, io::Error>> {
389 trace!("Chunked read, remaining={:?}", rem);
390
391 // cap remaining bytes at the max capacity of usize
392 let rem_cap = match *rem {
393 r if r > usize::MAX as u64 => usize::MAX,
394 r => r as usize,
395 };
396
397 let to_read = rem_cap;
398 let slice = ready!(rdr.read_mem(cx, to_read))?;
399 let count = slice.len();
400
401 if count == 0 {
402 *rem = 0;
403 return Poll::Ready(Err(io::Error::new(
404 io::ErrorKind::UnexpectedEof,
405 IncompleteBody,
406 )));
407 }
408 *buf = Some(slice);
409 *rem -= count as u64;
410
411 if *rem > 0 {
412 Poll::Ready(Ok(ChunkedState::Body))
413 } else {
414 Poll::Ready(Ok(ChunkedState::BodyCr))
415 }
416 }
417 fn read_body_cr<R: MemRead>(
418 cx: &mut Context<'_>,
419 rdr: &mut R,
420 ) -> Poll<Result<ChunkedState, io::Error>> {
421 match byte!(rdr, cx) {
422 b'\r' => Poll::Ready(Ok(ChunkedState::BodyLf)),
423 _ => Poll::Ready(Err(io::Error::new(
424 io::ErrorKind::InvalidInput,
425 "Invalid chunk body CR",
426 ))),
427 }
428 }
429 fn read_body_lf<R: MemRead>(
430 cx: &mut Context<'_>,
431 rdr: &mut R,
432 ) -> Poll<Result<ChunkedState, io::Error>> {
433 match byte!(rdr, cx) {
434 b'\n' => Poll::Ready(Ok(ChunkedState::Size)),
435 _ => Poll::Ready(Err(io::Error::new(
436 io::ErrorKind::InvalidInput,
437 "Invalid chunk body LF",
438 ))),
439 }
440 }
441
442 fn read_trailer<R: MemRead>(
443 cx: &mut Context<'_>,
444 rdr: &mut R,
445 ) -> Poll<Result<ChunkedState, io::Error>> {
446 trace!("read_trailer");
447 match byte!(rdr, cx) {
448 b'\r' => Poll::Ready(Ok(ChunkedState::TrailerLf)),
449 _ => Poll::Ready(Ok(ChunkedState::Trailer)),
450 }
451 }
452 fn read_trailer_lf<R: MemRead>(
453 cx: &mut Context<'_>,
454 rdr: &mut R,
455 ) -> Poll<Result<ChunkedState, io::Error>> {
456 match byte!(rdr, cx) {
457 b'\n' => Poll::Ready(Ok(ChunkedState::EndCr)),
458 _ => Poll::Ready(Err(io::Error::new(
459 io::ErrorKind::InvalidInput,
460 "Invalid trailer end LF",
461 ))),
462 }
463 }
464
465 fn read_end_cr<R: MemRead>(
466 cx: &mut Context<'_>,
467 rdr: &mut R,
468 ) -> Poll<Result<ChunkedState, io::Error>> {
469 match byte!(rdr, cx) {
470 b'\r' => Poll::Ready(Ok(ChunkedState::EndLf)),
471 _ => Poll::Ready(Ok(ChunkedState::Trailer)),
472 }
473 }
474 fn read_end_lf<R: MemRead>(
475 cx: &mut Context<'_>,
476 rdr: &mut R,
477 ) -> Poll<Result<ChunkedState, io::Error>> {
478 match byte!(rdr, cx) {
479 b'\n' => Poll::Ready(Ok(ChunkedState::End)),
480 _ => Poll::Ready(Err(io::Error::new(
481 io::ErrorKind::InvalidInput,
482 "Invalid chunk end LF",
483 ))),
484 }
485 }
486}
487
488#[derive(Debug)]
489struct IncompleteBody;
490
491impl fmt::Display for IncompleteBody {
492 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
493 write!(f, "end of file before message length reached")
494 }
495}
496
497impl StdError for IncompleteBody {}
498
499#[cfg(test)]
500mod tests {
501 use super::*;
502 use std::pin::Pin;
503 use std::time::Duration;
504 use tokio::io::{AsyncRead, ReadBuf};
505
506 impl<'a> MemRead for &'a [u8] {
507 fn read_mem(&mut self, _: &mut Context<'_>, len: usize) -> Poll<io::Result<Bytes>> {
508 let n = std::cmp::min(len, self.len());
509 if n > 0 {
510 let (a, b) = self.split_at(n);
511 let buf = Bytes::copy_from_slice(a);
512 *self = b;
513 Poll::Ready(Ok(buf))
514 } else {
515 Poll::Ready(Ok(Bytes::new()))
516 }
517 }
518 }
519
520 impl<'a> MemRead for &'a mut (dyn AsyncRead + Unpin) {
521 fn read_mem(&mut self, cx: &mut Context<'_>, len: usize) -> Poll<io::Result<Bytes>> {
522 let mut v = vec![0; len];
523 let mut buf = ReadBuf::new(&mut v);
524 ready!(Pin::new(self).poll_read(cx, &mut buf)?);
525 Poll::Ready(Ok(Bytes::copy_from_slice(&buf.filled())))
526 }
527 }
528
529 impl MemRead for Bytes {
530 fn read_mem(&mut self, _: &mut Context<'_>, len: usize) -> Poll<io::Result<Bytes>> {
531 let n = std::cmp::min(len, self.len());
532 let ret = self.split_to(n);
533 Poll::Ready(Ok(ret))
534 }
535 }
536
537 /*
538 use std::io;
539 use std::io::Write;
540 use super::Decoder;
541 use super::ChunkedState;
542 use futures::{Async, Poll};
543 use bytes::{BytesMut, Bytes};
544 use crate::mock::AsyncIo;
545 */
546
547 #[tokio::test]
548 async fn test_read_chunk_size() {
549 use std::io::ErrorKind::{InvalidData, InvalidInput, UnexpectedEof};
550
551 async fn read(s: &str) -> u64 {
552 let mut state = ChunkedState::new();
553 let rdr = &mut s.as_bytes();
554 let mut size = 0;
555 let mut ext_cnt = 0;
556 loop {
557 let result = futures_util::future::poll_fn(|cx| {
558 state.step(cx, rdr, &mut size, &mut ext_cnt, &mut None)
559 })
560 .await;
561 let desc = format!("read_size failed for {:?}", s);
562 state = result.expect(desc.as_str());
563 if state == ChunkedState::Body || state == ChunkedState::EndCr {
564 break;
565 }
566 }
567 size
568 }
569
570 async fn read_err(s: &str, expected_err: io::ErrorKind) {
571 let mut state = ChunkedState::new();
572 let rdr = &mut s.as_bytes();
573 let mut size = 0;
574 let mut ext_cnt = 0;
575 loop {
576 let result = futures_util::future::poll_fn(|cx| {
577 state.step(cx, rdr, &mut size, &mut ext_cnt, &mut None)
578 })
579 .await;
580 state = match result {
581 Ok(s) => s,
582 Err(e) => {
583 assert!(
584 expected_err == e.kind(),
585 "Reading {:?}, expected {:?}, but got {:?}",
586 s,
587 expected_err,
588 e.kind()
589 );
590 return;
591 }
592 };
593 if state == ChunkedState::Body || state == ChunkedState::End {
594 panic!("Was Ok. Expected Err for {:?}", s);
595 }
596 }
597 }
598
599 assert_eq!(1, read("1\r\n").await);
600 assert_eq!(1, read("01\r\n").await);
601 assert_eq!(0, read("0\r\n").await);
602 assert_eq!(0, read("00\r\n").await);
603 assert_eq!(10, read("A\r\n").await);
604 assert_eq!(10, read("a\r\n").await);
605 assert_eq!(255, read("Ff\r\n").await);
606 assert_eq!(255, read("Ff \r\n").await);
607 // Missing LF or CRLF
608 read_err("F\rF", InvalidInput).await;
609 read_err("F", UnexpectedEof).await;
610 // Missing digit
611 read_err("\r\n\r\n", InvalidInput).await;
612 read_err("\r\n", InvalidInput).await;
613 // Invalid hex digit
614 read_err("X\r\n", InvalidInput).await;
615 read_err("1X\r\n", InvalidInput).await;
616 read_err("-\r\n", InvalidInput).await;
617 read_err("-1\r\n", InvalidInput).await;
618 // Acceptable (if not fully valid) extensions do not influence the size
619 assert_eq!(1, read("1;extension\r\n").await);
620 assert_eq!(10, read("a;ext name=value\r\n").await);
621 assert_eq!(1, read("1;extension;extension2\r\n").await);
622 assert_eq!(1, read("1;;; ;\r\n").await);
623 assert_eq!(2, read("2; extension...\r\n").await);
624 assert_eq!(3, read("3 ; extension=123\r\n").await);
625 assert_eq!(3, read("3 ;\r\n").await);
626 assert_eq!(3, read("3 ; \r\n").await);
627 // Invalid extensions cause an error
628 read_err("1 invalid extension\r\n", InvalidInput).await;
629 read_err("1 A\r\n", InvalidInput).await;
630 read_err("1;no CRLF", UnexpectedEof).await;
631 read_err("1;reject\nnewlines\r\n", InvalidData).await;
632 // Overflow
633 read_err("f0000000000000003\r\n", InvalidData).await;
634 }
635
636 #[tokio::test]
637 async fn test_read_sized_early_eof() {
638 let mut bytes = &b"foo bar"[..];
639 let mut decoder = Decoder::length(10);
640 assert_eq!(decoder.decode_fut(&mut bytes).await.unwrap().len(), 7);
641 let e = decoder.decode_fut(&mut bytes).await.unwrap_err();
642 assert_eq!(e.kind(), io::ErrorKind::UnexpectedEof);
643 }
644
645 #[tokio::test]
646 async fn test_read_chunked_early_eof() {
647 let mut bytes = &b"\
648 9\r\n\
649 foo bar\
650 "[..];
651 let mut decoder = Decoder::chunked();
652 assert_eq!(decoder.decode_fut(&mut bytes).await.unwrap().len(), 7);
653 let e = decoder.decode_fut(&mut bytes).await.unwrap_err();
654 assert_eq!(e.kind(), io::ErrorKind::UnexpectedEof);
655 }
656
657 #[tokio::test]
658 async fn test_read_chunked_single_read() {
659 let mut mock_buf = &b"10\r\n1234567890abcdef\r\n0\r\n"[..];
660 let buf = Decoder::chunked()
661 .decode_fut(&mut mock_buf)
662 .await
663 .expect("decode");
664 assert_eq!(16, buf.len());
665 let result = String::from_utf8(buf.as_ref().to_vec()).expect("decode String");
666 assert_eq!("1234567890abcdef", &result);
667 }
668
669 #[tokio::test]
670 async fn test_read_chunked_extensions_over_limit() {
671 // construct a chunked body where each individual chunked extension
672 // is totally fine, but combined is over the limit.
673 let per_chunk = super::CHUNKED_EXTENSIONS_LIMIT * 2 / 3;
674 let mut scratch = vec![];
675 for _ in 0..2 {
676 scratch.extend(b"1;");
677 scratch.extend(b"x".repeat(per_chunk as usize));
678 scratch.extend(b"\r\nA\r\n");
679 }
680 scratch.extend(b"0\r\n\r\n");
681 let mut mock_buf = Bytes::from(scratch);
682
683 let mut decoder = Decoder::chunked();
684 let buf1 = decoder.decode_fut(&mut mock_buf).await.expect("decode1");
685 assert_eq!(&buf1[..], b"A");
686
687 let err = decoder
688 .decode_fut(&mut mock_buf)
689 .await
690 .expect_err("decode2");
691 assert_eq!(err.kind(), io::ErrorKind::InvalidData);
692 assert_eq!(err.to_string(), "chunk extensions over limit");
693 }
694
695 #[cfg(not(miri))]
696 #[tokio::test]
697 async fn test_read_chunked_trailer_with_missing_lf() {
698 let mut mock_buf = &b"10\r\n1234567890abcdef\r\n0\r\nbad\r\r\n"[..];
699 let mut decoder = Decoder::chunked();
700 decoder.decode_fut(&mut mock_buf).await.expect("decode");
701 let e = decoder.decode_fut(&mut mock_buf).await.unwrap_err();
702 assert_eq!(e.kind(), io::ErrorKind::InvalidInput);
703 }
704
705 #[tokio::test]
706 async fn test_read_chunked_after_eof() {
707 let mut mock_buf = &b"10\r\n1234567890abcdef\r\n0\r\n\r\n"[..];
708 let mut decoder = Decoder::chunked();
709
710 // normal read
711 let buf = decoder.decode_fut(&mut mock_buf).await.unwrap();
712 assert_eq!(16, buf.len());
713 let result = String::from_utf8(buf.as_ref().to_vec()).expect("decode String");
714 assert_eq!("1234567890abcdef", &result);
715
716 // eof read
717 let buf = decoder.decode_fut(&mut mock_buf).await.expect("decode");
718 assert_eq!(0, buf.len());
719
720 // ensure read after eof also returns eof
721 let buf = decoder.decode_fut(&mut mock_buf).await.expect("decode");
722 assert_eq!(0, buf.len());
723 }
724
725 // perform an async read using a custom buffer size and causing a blocking
726 // read at the specified byte
727 async fn read_async(mut decoder: Decoder, content: &[u8], block_at: usize) -> String {
728 let mut outs = Vec::new();
729
730 let mut ins = if block_at == 0 {
731 tokio_test::io::Builder::new()
732 .wait(Duration::from_millis(10))
733 .read(content)
734 .build()
735 } else {
736 tokio_test::io::Builder::new()
737 .read(&content[..block_at])
738 .wait(Duration::from_millis(10))
739 .read(&content[block_at..])
740 .build()
741 };
742
743 let mut ins = &mut ins as &mut (dyn AsyncRead + Unpin);
744
745 loop {
746 let buf = decoder
747 .decode_fut(&mut ins)
748 .await
749 .expect("unexpected decode error");
750 if buf.is_empty() {
751 break; // eof
752 }
753 outs.extend(buf.as_ref());
754 }
755
756 String::from_utf8(outs).expect("decode String")
757 }
758
759 // iterate over the different ways that this async read could go.
760 // tests blocking a read at each byte along the content - The shotgun approach
761 async fn all_async_cases(content: &str, expected: &str, decoder: Decoder) {
762 let content_len = content.len();
763 for block_at in 0..content_len {
764 let actual = read_async(decoder.clone(), content.as_bytes(), block_at).await;
765 assert_eq!(expected, &actual) //, "Failed async. Blocking at {}", block_at);
766 }
767 }
768
769 #[tokio::test]
770 async fn test_read_length_async() {
771 let content = "foobar";
772 all_async_cases(content, content, Decoder::length(content.len() as u64)).await;
773 }
774
775 #[tokio::test]
776 async fn test_read_chunked_async() {
777 let content = "3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n";
778 let expected = "foobar";
779 all_async_cases(content, expected, Decoder::chunked()).await;
780 }
781
782 #[tokio::test]
783 async fn test_read_eof_async() {
784 let content = "foobar";
785 all_async_cases(content, content, Decoder::eof()).await;
786 }
787
788 #[cfg(feature = "nightly")]
789 #[bench]
790 fn bench_decode_chunked_1kb(b: &mut test::Bencher) {
791 let rt = new_runtime();
792
793 const LEN: usize = 1024;
794 let mut vec = Vec::new();
795 vec.extend(format!("{:x}\r\n", LEN).as_bytes());
796 vec.extend(&[0; LEN][..]);
797 vec.extend(b"\r\n");
798 let content = Bytes::from(vec);
799
800 b.bytes = LEN as u64;
801
802 b.iter(|| {
803 let mut decoder = Decoder::chunked();
804 rt.block_on(async {
805 let mut raw = content.clone();
806 let chunk = decoder.decode_fut(&mut raw).await.unwrap();
807 assert_eq!(chunk.len(), LEN);
808 });
809 });
810 }
811
812 #[cfg(feature = "nightly")]
813 #[bench]
814 fn bench_decode_length_1kb(b: &mut test::Bencher) {
815 let rt = new_runtime();
816
817 const LEN: usize = 1024;
818 let content = Bytes::from(&[0; LEN][..]);
819 b.bytes = LEN as u64;
820
821 b.iter(|| {
822 let mut decoder = Decoder::length(LEN as u64);
823 rt.block_on(async {
824 let mut raw = content.clone();
825 let chunk = decoder.decode_fut(&mut raw).await.unwrap();
826 assert_eq!(chunk.len(), LEN);
827 });
828 });
829 }
830
831 #[cfg(feature = "nightly")]
832 fn new_runtime() -> tokio::runtime::Runtime {
833 tokio::runtime::Builder::new_current_thread()
834 .enable_all()
835 .build()
836 .expect("rt build")
837 }
838}
839