use std::error::Error as StdError;
use std::fmt;
use std::io;
use std::task::{Context, Poll};
use std::usize;

use bytes::Bytes;
use tracing::{debug, trace};

use super::io::MemRead;
use super::DecodedLength;

use self::Kind::{Chunked, Eof, Length};

/// Maximum amount of bytes allowed in chunked extensions.
///
/// This limit is currently applied to the entire body, not per chunk.
const CHUNKED_EXTENSIONS_LIMIT: u64 = 1024 * 16;

/// Decoders to handle different Transfer-Encodings.
///
/// If a message body does not include a Transfer-Encoding, it *should*
/// include a Content-Length header.
#[derive(Clone, PartialEq)]
pub(crate) struct Decoder {
    kind: Kind,
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum Kind {
    /// A Reader used when a Content-Length header is passed with a positive integer.
    Length(u64),
    /// A Reader used when Transfer-Encoding is `chunked`.
    Chunked {
        state: ChunkedState,
        chunk_len: u64,
        extensions_cnt: u64,
    },
    /// A Reader used for responses that don't indicate a length or chunked.
    ///
    /// The bool tracks when EOF is seen on the transport.
    ///
    /// Note: This should only be used for `Response`s. It is illegal for a
    /// `Request` to be made with both `Content-Length` and
    /// `Transfer-Encoding: chunked` missing, as explained in the spec:
    ///
    /// > If a Transfer-Encoding header field is present in a response and
    /// > the chunked transfer coding is not the final encoding, the
    /// > message body length is determined by reading the connection until
    /// > it is closed by the server. If a Transfer-Encoding header field
    /// > is present in a request and the chunked transfer coding is not
    /// > the final encoding, the message body length cannot be determined
    /// > reliably; the server MUST respond with the 400 (Bad Request)
    /// > status code and then close the connection.
    Eof(bool),
}

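// The chunked framing is decoded one byte at a time (only the chunk data
// itself is read in bulk by `read_body`). `Start` expects the first hex digit
// of the first chunk's size line; `Size`, `SizeLws`, and `Extension` consume
// the rest of a size line; `SizeLf` dispatches to `Body` (or to `EndCr` for
// the terminating zero-size chunk); `BodyCr`/`BodyLf` eat the CRLF after the
// chunk data and return to `Size`; `Trailer`/`TrailerLf` skip trailer lines
// until the final CRLF moves through `EndCr`/`EndLf` into `End`.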
#[derive(Debug, PartialEq, Clone, Copy)]
enum ChunkedState {
    Start,
    Size,
    SizeLws,
    Extension,
    SizeLf,
    Body,
    BodyCr,
    BodyLf,
    Trailer,
    TrailerLf,
    EndCr,
    EndLf,
    End,
}

impl Decoder {
    // constructors

    pub(crate) fn length(x: u64) -> Decoder {
        Decoder {
            kind: Kind::Length(x),
        }
    }

    pub(crate) fn chunked() -> Decoder {
        Decoder {
            kind: Kind::Chunked {
                state: ChunkedState::new(),
                chunk_len: 0,
                extensions_cnt: 0,
            },
        }
    }

    pub(crate) fn eof() -> Decoder {
        Decoder {
            kind: Kind::Eof(false),
        }
    }

    pub(super) fn new(len: DecodedLength) -> Self {
        match len {
            DecodedLength::CHUNKED => Decoder::chunked(),
            DecodedLength::CLOSE_DELIMITED => Decoder::eof(),
            length => Decoder::length(length.danger_len()),
        }
    }

    // methods

    pub(crate) fn is_eof(&self) -> bool {
        matches!(
            self.kind,
            Length(0)
                | Chunked {
                    state: ChunkedState::End,
                    ..
                }
                | Eof(true)
        )
    }

    pub(crate) fn decode<R: MemRead>(
        &mut self,
        cx: &mut Context<'_>,
        body: &mut R,
    ) -> Poll<Result<Bytes, io::Error>> {
        trace!("decode; state={:?}", self.kind);
        match self.kind {
            Length(ref mut remaining) => {
                if *remaining == 0 {
                    Poll::Ready(Ok(Bytes::new()))
                } else {
                    let to_read = *remaining as usize;
                    let buf = ready!(body.read_mem(cx, to_read))?;
                    let num = buf.as_ref().len() as u64;
                    if num > *remaining {
                        *remaining = 0;
                    } else if num == 0 {
                        return Poll::Ready(Err(io::Error::new(
                            io::ErrorKind::UnexpectedEof,
                            IncompleteBody,
                        )));
                    } else {
                        *remaining -= num;
                    }
                    Poll::Ready(Ok(buf))
                }
            }
            Chunked {
                ref mut state,
                ref mut chunk_len,
                ref mut extensions_cnt,
            } => {
                loop {
                    let mut buf = None;
                    // advances the chunked state
                    *state = ready!(state.step(cx, body, chunk_len, extensions_cnt, &mut buf))?;
                    if *state == ChunkedState::End {
                        trace!("end of chunked");
                        return Poll::Ready(Ok(Bytes::new()));
                    }
                    if let Some(buf) = buf {
                        return Poll::Ready(Ok(buf));
                    }
                }
            }
            Eof(ref mut is_eof) => {
                if *is_eof {
                    Poll::Ready(Ok(Bytes::new()))
                } else {
                    // 8192 chosen because it's about 2 packets, there probably
                    // won't be that much available, so don't have MemReaders
                    // allocate buffers too big
                    body.read_mem(cx, 8192).map_ok(|slice| {
                        *is_eof = slice.is_empty();
                        slice
                    })
                }
            }
        }
    }

    #[cfg(test)]
    async fn decode_fut<R: MemRead>(&mut self, body: &mut R) -> Result<Bytes, io::Error> {
        futures_util::future::poll_fn(move |cx| self.decode(cx, body)).await
    }
}

impl fmt::Debug for Decoder {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Debug::fmt(&self.kind, f)
    }
}

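// Reads a single byte from the `MemRead`, propagating `Poll::Pending`. A
// zero-length read here means the transport reached EOF in the middle of the
// chunked framing, which is surfaced as an `UnexpectedEof` error.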
macro_rules! byte (
    ($rdr:ident, $cx:expr) => ({
        let buf = ready!($rdr.read_mem($cx, 1))?;
        if !buf.is_empty() {
            buf[0]
        } else {
            return Poll::Ready(Err(io::Error::new(io::ErrorKind::UnexpectedEof,
                "unexpected EOF during chunk size line")));
        }
    })
);

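// Unwraps the result of checked arithmetic on the chunk size, turning an
// overflow (`None`) into an `InvalidData` error.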
macro_rules! or_overflow {
    ($e:expr) => (
        match $e {
            Some(val) => val,
            None => return Poll::Ready(Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "invalid chunk size: overflow",
            ))),
        }
    )
}

impl ChunkedState {
    fn new() -> ChunkedState {
        ChunkedState::Start
    }
    fn step<R: MemRead>(
        &self,
        cx: &mut Context<'_>,
        body: &mut R,
        size: &mut u64,
        extensions_cnt: &mut u64,
        buf: &mut Option<Bytes>,
    ) -> Poll<Result<ChunkedState, io::Error>> {
        use self::ChunkedState::*;
        match *self {
            Start => ChunkedState::read_start(cx, body, size),
            Size => ChunkedState::read_size(cx, body, size),
            SizeLws => ChunkedState::read_size_lws(cx, body),
            Extension => ChunkedState::read_extension(cx, body, extensions_cnt),
            SizeLf => ChunkedState::read_size_lf(cx, body, *size),
            Body => ChunkedState::read_body(cx, body, size, buf),
            BodyCr => ChunkedState::read_body_cr(cx, body),
            BodyLf => ChunkedState::read_body_lf(cx, body),
            Trailer => ChunkedState::read_trailer(cx, body),
            TrailerLf => ChunkedState::read_trailer_lf(cx, body),
            EndCr => ChunkedState::read_end_cr(cx, body),
            EndLf => ChunkedState::read_end_lf(cx, body),
            End => Poll::Ready(Ok(ChunkedState::End)),
        }
    }

    fn read_start<R: MemRead>(
        cx: &mut Context<'_>,
        rdr: &mut R,
        size: &mut u64,
    ) -> Poll<Result<ChunkedState, io::Error>> {
        trace!("Read chunk start");

        let radix = 16;
        match byte!(rdr, cx) {
            b @ b'0'..=b'9' => {
                *size = or_overflow!(size.checked_mul(radix));
                *size = or_overflow!(size.checked_add((b - b'0') as u64));
            }
            b @ b'a'..=b'f' => {
                *size = or_overflow!(size.checked_mul(radix));
                *size = or_overflow!(size.checked_add((b + 10 - b'a') as u64));
            }
            b @ b'A'..=b'F' => {
                *size = or_overflow!(size.checked_mul(radix));
                *size = or_overflow!(size.checked_add((b + 10 - b'A') as u64));
            }
            _ => {
                return Poll::Ready(Err(io::Error::new(
                    io::ErrorKind::InvalidInput,
                    "Invalid chunk size line: missing size digit",
                )));
            }
        }

        Poll::Ready(Ok(ChunkedState::Size))
    }

    fn read_size<R: MemRead>(
        cx: &mut Context<'_>,
        rdr: &mut R,
        size: &mut u64,
    ) -> Poll<Result<ChunkedState, io::Error>> {
        trace!("Read chunk hex size");

        let radix = 16;
        match byte!(rdr, cx) {
            b @ b'0'..=b'9' => {
                *size = or_overflow!(size.checked_mul(radix));
                *size = or_overflow!(size.checked_add((b - b'0') as u64));
            }
            b @ b'a'..=b'f' => {
                *size = or_overflow!(size.checked_mul(radix));
                *size = or_overflow!(size.checked_add((b + 10 - b'a') as u64));
            }
            b @ b'A'..=b'F' => {
                *size = or_overflow!(size.checked_mul(radix));
                *size = or_overflow!(size.checked_add((b + 10 - b'A') as u64));
            }
            b'\t' | b' ' => return Poll::Ready(Ok(ChunkedState::SizeLws)),
            b';' => return Poll::Ready(Ok(ChunkedState::Extension)),
            b'\r' => return Poll::Ready(Ok(ChunkedState::SizeLf)),
            _ => {
                return Poll::Ready(Err(io::Error::new(
                    io::ErrorKind::InvalidInput,
                    "Invalid chunk size line: Invalid Size",
                )));
            }
        }
        Poll::Ready(Ok(ChunkedState::Size))
    }
    fn read_size_lws<R: MemRead>(
        cx: &mut Context<'_>,
        rdr: &mut R,
    ) -> Poll<Result<ChunkedState, io::Error>> {
        trace!("read_size_lws");
        match byte!(rdr, cx) {
            // LWS can follow the chunk size, but no more digits can come
            b'\t' | b' ' => Poll::Ready(Ok(ChunkedState::SizeLws)),
            b';' => Poll::Ready(Ok(ChunkedState::Extension)),
            b'\r' => Poll::Ready(Ok(ChunkedState::SizeLf)),
            _ => Poll::Ready(Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "Invalid chunk size linear white space",
            ))),
        }
    }
    fn read_extension<R: MemRead>(
        cx: &mut Context<'_>,
        rdr: &mut R,
        extensions_cnt: &mut u64,
    ) -> Poll<Result<ChunkedState, io::Error>> {
        trace!("read_extension");
        // We don't care about extensions really at all. Just ignore them.
        // They "end" at the next CRLF.
        //
        // However, some implementations may not check for the CR, so to save
        // them from themselves, we reject extensions containing plain LF as
        // well.
        match byte!(rdr, cx) {
            b'\r' => Poll::Ready(Ok(ChunkedState::SizeLf)),
            b'\n' => Poll::Ready(Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "invalid chunk extension contains newline",
            ))),
            _ => {
                *extensions_cnt += 1;
                if *extensions_cnt >= CHUNKED_EXTENSIONS_LIMIT {
                    Poll::Ready(Err(io::Error::new(
                        io::ErrorKind::InvalidData,
                        "chunk extensions over limit",
                    )))
                } else {
                    Poll::Ready(Ok(ChunkedState::Extension))
                }
            } // no supported extensions
        }
    }
    fn read_size_lf<R: MemRead>(
        cx: &mut Context<'_>,
        rdr: &mut R,
        size: u64,
    ) -> Poll<Result<ChunkedState, io::Error>> {
        trace!("Chunk size is {:?}", size);
        match byte!(rdr, cx) {
            b'\n' => {
                if size == 0 {
                    Poll::Ready(Ok(ChunkedState::EndCr))
                } else {
                    debug!("incoming chunked header: {0:#X} ({0} bytes)", size);
                    Poll::Ready(Ok(ChunkedState::Body))
                }
            }
            _ => Poll::Ready(Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "Invalid chunk size LF",
            ))),
        }
    }

    fn read_body<R: MemRead>(
        cx: &mut Context<'_>,
        rdr: &mut R,
        rem: &mut u64,
        buf: &mut Option<Bytes>,
    ) -> Poll<Result<ChunkedState, io::Error>> {
        trace!("Chunked read, remaining={:?}", rem);

        // cap remaining bytes at the max capacity of usize
        let rem_cap = match *rem {
            r if r > usize::MAX as u64 => usize::MAX,
            r => r as usize,
        };

        let to_read = rem_cap;
        let slice = ready!(rdr.read_mem(cx, to_read))?;
        let count = slice.len();

        if count == 0 {
            *rem = 0;
            return Poll::Ready(Err(io::Error::new(
                io::ErrorKind::UnexpectedEof,
                IncompleteBody,
            )));
        }
        *buf = Some(slice);
        *rem -= count as u64;

        if *rem > 0 {
            Poll::Ready(Ok(ChunkedState::Body))
        } else {
            Poll::Ready(Ok(ChunkedState::BodyCr))
        }
    }
    fn read_body_cr<R: MemRead>(
        cx: &mut Context<'_>,
        rdr: &mut R,
    ) -> Poll<Result<ChunkedState, io::Error>> {
        match byte!(rdr, cx) {
            b'\r' => Poll::Ready(Ok(ChunkedState::BodyLf)),
            _ => Poll::Ready(Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "Invalid chunk body CR",
            ))),
        }
    }
    fn read_body_lf<R: MemRead>(
        cx: &mut Context<'_>,
        rdr: &mut R,
    ) -> Poll<Result<ChunkedState, io::Error>> {
        match byte!(rdr, cx) {
            b'\n' => Poll::Ready(Ok(ChunkedState::Size)),
            _ => Poll::Ready(Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "Invalid chunk body LF",
            ))),
        }
    }

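    // Trailer fields are not parsed or surfaced to the caller here; each
    // trailer line is simply consumed until the blank line that terminates
    // the chunked body.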
    fn read_trailer<R: MemRead>(
        cx: &mut Context<'_>,
        rdr: &mut R,
    ) -> Poll<Result<ChunkedState, io::Error>> {
        trace!("read_trailer");
        match byte!(rdr, cx) {
            b'\r' => Poll::Ready(Ok(ChunkedState::TrailerLf)),
            _ => Poll::Ready(Ok(ChunkedState::Trailer)),
        }
    }
    fn read_trailer_lf<R: MemRead>(
        cx: &mut Context<'_>,
        rdr: &mut R,
    ) -> Poll<Result<ChunkedState, io::Error>> {
        match byte!(rdr, cx) {
            b'\n' => Poll::Ready(Ok(ChunkedState::EndCr)),
            _ => Poll::Ready(Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "Invalid trailer end LF",
            ))),
        }
    }

    fn read_end_cr<R: MemRead>(
        cx: &mut Context<'_>,
        rdr: &mut R,
    ) -> Poll<Result<ChunkedState, io::Error>> {
        match byte!(rdr, cx) {
            b'\r' => Poll::Ready(Ok(ChunkedState::EndLf)),
            _ => Poll::Ready(Ok(ChunkedState::Trailer)),
        }
    }
    fn read_end_lf<R: MemRead>(
        cx: &mut Context<'_>,
        rdr: &mut R,
    ) -> Poll<Result<ChunkedState, io::Error>> {
        match byte!(rdr, cx) {
            b'\n' => Poll::Ready(Ok(ChunkedState::End)),
            _ => Poll::Ready(Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "Invalid chunk end LF",
            ))),
        }
    }
}

#[derive(Debug)]
struct IncompleteBody;

impl fmt::Display for IncompleteBody {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "end of file before message length reached")
    }
}

impl StdError for IncompleteBody {}

#[cfg(test)]
mod tests {
    use super::*;
    use std::pin::Pin;
    use std::time::Duration;
    use tokio::io::{AsyncRead, ReadBuf};

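    // Test-only `MemRead` impls so that plain byte slices, `AsyncRead`s, and
    // `Bytes` values can stand in for the connection's read buffer.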
    impl<'a> MemRead for &'a [u8] {
        fn read_mem(&mut self, _: &mut Context<'_>, len: usize) -> Poll<io::Result<Bytes>> {
            let n = std::cmp::min(len, self.len());
            if n > 0 {
                let (a, b) = self.split_at(n);
                let buf = Bytes::copy_from_slice(a);
                *self = b;
                Poll::Ready(Ok(buf))
            } else {
                Poll::Ready(Ok(Bytes::new()))
            }
        }
    }

    impl<'a> MemRead for &'a mut (dyn AsyncRead + Unpin) {
        fn read_mem(&mut self, cx: &mut Context<'_>, len: usize) -> Poll<io::Result<Bytes>> {
            let mut v = vec![0; len];
            let mut buf = ReadBuf::new(&mut v);
            ready!(Pin::new(self).poll_read(cx, &mut buf)?);
            Poll::Ready(Ok(Bytes::copy_from_slice(&buf.filled())))
        }
    }

    impl MemRead for Bytes {
        fn read_mem(&mut self, _: &mut Context<'_>, len: usize) -> Poll<io::Result<Bytes>> {
            let n = std::cmp::min(len, self.len());
            let ret = self.split_to(n);
            Poll::Ready(Ok(ret))
        }
    }

    /*
    use std::io;
    use std::io::Write;
    use super::Decoder;
    use super::ChunkedState;
    use futures::{Async, Poll};
    use bytes::{BytesMut, Bytes};
    use crate::mock::AsyncIo;
    */

    #[tokio::test]
    async fn test_read_chunk_size() {
        use std::io::ErrorKind::{InvalidData, InvalidInput, UnexpectedEof};

        async fn read(s: &str) -> u64 {
            let mut state = ChunkedState::new();
            let rdr = &mut s.as_bytes();
            let mut size = 0;
            let mut ext_cnt = 0;
            loop {
                let result = futures_util::future::poll_fn(|cx| {
                    state.step(cx, rdr, &mut size, &mut ext_cnt, &mut None)
                })
                .await;
                let desc = format!("read_size failed for {:?}", s);
                state = result.expect(desc.as_str());
                if state == ChunkedState::Body || state == ChunkedState::EndCr {
                    break;
                }
            }
            size
        }

        async fn read_err(s: &str, expected_err: io::ErrorKind) {
            let mut state = ChunkedState::new();
            let rdr = &mut s.as_bytes();
            let mut size = 0;
            let mut ext_cnt = 0;
            loop {
                let result = futures_util::future::poll_fn(|cx| {
                    state.step(cx, rdr, &mut size, &mut ext_cnt, &mut None)
                })
                .await;
                state = match result {
                    Ok(s) => s,
                    Err(e) => {
                        assert!(
                            expected_err == e.kind(),
                            "Reading {:?}, expected {:?}, but got {:?}",
                            s,
                            expected_err,
                            e.kind()
                        );
                        return;
                    }
                };
                if state == ChunkedState::Body || state == ChunkedState::End {
                    panic!("Was Ok. Expected Err for {:?}", s);
                }
            }
        }

        assert_eq!(1, read("1\r\n").await);
        assert_eq!(1, read("01\r\n").await);
        assert_eq!(0, read("0\r\n").await);
        assert_eq!(0, read("00\r\n").await);
        assert_eq!(10, read("A\r\n").await);
        assert_eq!(10, read("a\r\n").await);
605 | assert_eq!(255, read("Ff \r\n" ).await); |
606 | assert_eq!(255, read("Ff \r\n" ).await); |
        // Missing LF or CRLF
        read_err("F\rF", InvalidInput).await;
        read_err("F", UnexpectedEof).await;
        // Missing digit
        read_err("\r\n\r\n", InvalidInput).await;
        read_err("\r\n", InvalidInput).await;
        // Invalid hex digit
        read_err("X\r\n", InvalidInput).await;
        read_err("1X\r\n", InvalidInput).await;
        read_err("-\r\n", InvalidInput).await;
        read_err("-1\r\n", InvalidInput).await;
        // Acceptable (if not fully valid) extensions do not influence the size
        assert_eq!(1, read("1;extension\r\n").await);
        assert_eq!(10, read("a;ext name=value\r\n").await);
        assert_eq!(1, read("1;extension;extension2\r\n").await);
        assert_eq!(1, read("1;;;  ;\r\n").await);
        assert_eq!(2, read("2; extension...\r\n").await);
        assert_eq!(3, read("3   ; extension=123\r\n").await);
        assert_eq!(3, read("3   ;\r\n").await);
        assert_eq!(3, read("3\t;\r\n").await);
        // Invalid extensions cause an error
        read_err("1 invalid extension\r\n", InvalidInput).await;
        read_err("1 A\r\n", InvalidInput).await;
        read_err("1;no CRLF", UnexpectedEof).await;
        read_err("1;reject\nnewlines\r\n", InvalidData).await;
        // Overflow
        read_err("f0000000000000003\r\n", InvalidData).await;
    }

    #[tokio::test]
    async fn test_read_sized_early_eof() {
        let mut bytes = &b"foo bar"[..];
        let mut decoder = Decoder::length(10);
        assert_eq!(decoder.decode_fut(&mut bytes).await.unwrap().len(), 7);
        let e = decoder.decode_fut(&mut bytes).await.unwrap_err();
        assert_eq!(e.kind(), io::ErrorKind::UnexpectedEof);
    }
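
    // Illustrative companion to the early-EOF case above: a length decoder fed
    // exactly its Content-Length worth of bytes drains them, then reports EOF
    // via empty reads and `is_eof`.
    #[tokio::test]
    async fn test_read_sized_exact() {
        let mut bytes = &b"foobar"[..];
        let mut decoder = Decoder::length(6);
        assert_eq!(decoder.decode_fut(&mut bytes).await.unwrap().len(), 6);
        assert_eq!(decoder.decode_fut(&mut bytes).await.unwrap().len(), 0);
        assert!(decoder.is_eof());
    }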

    #[tokio::test]
    async fn test_read_chunked_early_eof() {
        let mut bytes = &b"\
            9\r\n\
            foo bar\
        "[..];
        let mut decoder = Decoder::chunked();
        assert_eq!(decoder.decode_fut(&mut bytes).await.unwrap().len(), 7);
        let e = decoder.decode_fut(&mut bytes).await.unwrap_err();
        assert_eq!(e.kind(), io::ErrorKind::UnexpectedEof);
    }

    #[tokio::test]
    async fn test_read_chunked_single_read() {
        let mut mock_buf = &b"10\r\n1234567890abcdef\r\n0\r\n"[..];
        let buf = Decoder::chunked()
            .decode_fut(&mut mock_buf)
            .await
            .expect("decode");
        assert_eq!(16, buf.len());
        let result = String::from_utf8(buf.as_ref().to_vec()).expect("decode String");
        assert_eq!("1234567890abcdef", &result);
    }

    #[tokio::test]
    async fn test_read_chunked_extensions_over_limit() {
        // construct a chunked body where each individual chunked extension
        // is totally fine, but combined is over the limit.
        let per_chunk = super::CHUNKED_EXTENSIONS_LIMIT * 2 / 3;
        let mut scratch = vec![];
        for _ in 0..2 {
            scratch.extend(b"1;");
            scratch.extend(b"x".repeat(per_chunk as usize));
            scratch.extend(b"\r\nA\r\n");
        }
        scratch.extend(b"0\r\n\r\n");
        let mut mock_buf = Bytes::from(scratch);

        let mut decoder = Decoder::chunked();
        let buf1 = decoder.decode_fut(&mut mock_buf).await.expect("decode1");
        assert_eq!(&buf1[..], b"A");

        let err = decoder
            .decode_fut(&mut mock_buf)
            .await
            .expect_err("decode2");
        assert_eq!(err.kind(), io::ErrorKind::InvalidData);
        assert_eq!(err.to_string(), "chunk extensions over limit");
    }

    #[cfg(not(miri))]
    #[tokio::test]
    async fn test_read_chunked_trailer_with_missing_lf() {
        let mut mock_buf = &b"10\r\n1234567890abcdef\r\n0\r\nbad\r\r\n"[..];
        let mut decoder = Decoder::chunked();
        decoder.decode_fut(&mut mock_buf).await.expect("decode");
        let e = decoder.decode_fut(&mut mock_buf).await.unwrap_err();
        assert_eq!(e.kind(), io::ErrorKind::InvalidInput);
    }

    #[tokio::test]
    async fn test_read_chunked_after_eof() {
        let mut mock_buf = &b"10\r\n1234567890abcdef\r\n0\r\n\r\n"[..];
        let mut decoder = Decoder::chunked();

        // normal read
        let buf = decoder.decode_fut(&mut mock_buf).await.unwrap();
        assert_eq!(16, buf.len());
        let result = String::from_utf8(buf.as_ref().to_vec()).expect("decode String");
        assert_eq!("1234567890abcdef", &result);

        // eof read
        let buf = decoder.decode_fut(&mut mock_buf).await.expect("decode");
        assert_eq!(0, buf.len());

        // ensure read after eof also returns eof
        let buf = decoder.decode_fut(&mut mock_buf).await.expect("decode");
        assert_eq!(0, buf.len());
    }
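
    // Illustrative check of the close-delimited (`Decoder::eof`) path: data is
    // returned until the transport reports EOF, after which reads are empty
    // and `is_eof` returns true.
    #[tokio::test]
    async fn test_read_eof_marks_is_eof() {
        let mut bytes = &b"foo bar"[..];
        let mut decoder = Decoder::eof();
        assert_eq!(decoder.decode_fut(&mut bytes).await.unwrap().len(), 7);
        assert_eq!(decoder.decode_fut(&mut bytes).await.unwrap().len(), 0);
        assert!(decoder.is_eof());
    }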

    // perform an async read using a custom buffer size and causing a blocking
    // read at the specified byte
    async fn read_async(mut decoder: Decoder, content: &[u8], block_at: usize) -> String {
        let mut outs = Vec::new();

        let mut ins = if block_at == 0 {
            tokio_test::io::Builder::new()
                .wait(Duration::from_millis(10))
                .read(content)
                .build()
        } else {
            tokio_test::io::Builder::new()
                .read(&content[..block_at])
                .wait(Duration::from_millis(10))
                .read(&content[block_at..])
                .build()
        };

        let mut ins = &mut ins as &mut (dyn AsyncRead + Unpin);

        loop {
            let buf = decoder
                .decode_fut(&mut ins)
                .await
                .expect("unexpected decode error");
            if buf.is_empty() {
                break; // eof
            }
            outs.extend(buf.as_ref());
        }

        String::from_utf8(outs).expect("decode String")
    }

    // iterate over the different ways that this async read could go.
    // tests blocking a read at each byte along the content - The shotgun approach
    async fn all_async_cases(content: &str, expected: &str, decoder: Decoder) {
        let content_len = content.len();
        for block_at in 0..content_len {
            let actual = read_async(decoder.clone(), content.as_bytes(), block_at).await;
            assert_eq!(expected, &actual) //, "Failed async. Blocking at {}", block_at);
        }
    }

    #[tokio::test]
    async fn test_read_length_async() {
        let content = "foobar";
        all_async_cases(content, content, Decoder::length(content.len() as u64)).await;
    }

    #[tokio::test]
    async fn test_read_chunked_async() {
        let content = "3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n";
        let expected = "foobar";
        all_async_cases(content, expected, Decoder::chunked()).await;
    }

    #[tokio::test]
    async fn test_read_eof_async() {
        let content = "foobar";
        all_async_cases(content, content, Decoder::eof()).await;
    }

    #[cfg(feature = "nightly")]
    #[bench]
    fn bench_decode_chunked_1kb(b: &mut test::Bencher) {
        let rt = new_runtime();

        const LEN: usize = 1024;
        let mut vec = Vec::new();
        vec.extend(format!("{:x}\r\n", LEN).as_bytes());
        vec.extend(&[0; LEN][..]);
        vec.extend(b"\r\n");
        let content = Bytes::from(vec);

        b.bytes = LEN as u64;

        b.iter(|| {
            let mut decoder = Decoder::chunked();
            rt.block_on(async {
                let mut raw = content.clone();
                let chunk = decoder.decode_fut(&mut raw).await.unwrap();
                assert_eq!(chunk.len(), LEN);
            });
        });
    }

    #[cfg(feature = "nightly")]
    #[bench]
    fn bench_decode_length_1kb(b: &mut test::Bencher) {
        let rt = new_runtime();

        const LEN: usize = 1024;
        let content = Bytes::from(&[0; LEN][..]);
        b.bytes = LEN as u64;

        b.iter(|| {
            let mut decoder = Decoder::length(LEN as u64);
            rt.block_on(async {
                let mut raw = content.clone();
                let chunk = decoder.decode_fut(&mut raw).await.unwrap();
                assert_eq!(chunk.len(), LEN);
            });
        });
    }

    #[cfg(feature = "nightly")]
    fn new_runtime() -> tokio::runtime::Runtime {
        tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .expect("rt build")
    }
}