1 | use std::error::Error as StdError; |
2 | use std::fmt; |
3 | use std::io; |
4 | use std::usize; |
5 | |
6 | use bytes::Bytes; |
7 | use tracing::{debug, trace}; |
8 | |
9 | use crate::common::{task, Poll}; |
10 | |
11 | use super::io::MemRead; |
12 | use super::DecodedLength; |
13 | |
14 | use self::Kind::{Chunked, Eof, Length}; |
15 | |
16 | /// Decoders to handle different Transfer-Encodings. |
17 | /// |
18 | /// If a message body does not include a Transfer-Encoding, it *should* |
19 | /// include a Content-Length header. |
20 | #[derive (Clone, PartialEq)] |
21 | pub(crate) struct Decoder { |
22 | kind: Kind, |
23 | } |
24 | |
25 | #[derive (Debug, Clone, Copy, PartialEq)] |
26 | enum Kind { |
27 | /// A Reader used when a Content-Length header is passed with a positive integer. |
28 | Length(u64), |
29 | /// A Reader used when Transfer-Encoding is `chunked`. |
30 | Chunked(ChunkedState, u64), |
31 | /// A Reader used for responses that don't indicate a length or chunked. |
32 | /// |
33 | /// The bool tracks when EOF is seen on the transport. |
34 | /// |
35 | /// Note: This should only used for `Response`s. It is illegal for a |
36 | /// `Request` to be made with both `Content-Length` and |
37 | /// `Transfer-Encoding: chunked` missing, as explained from the spec: |
38 | /// |
39 | /// > If a Transfer-Encoding header field is present in a response and |
40 | /// > the chunked transfer coding is not the final encoding, the |
41 | /// > message body length is determined by reading the connection until |
42 | /// > it is closed by the server. If a Transfer-Encoding header field |
43 | /// > is present in a request and the chunked transfer coding is not |
44 | /// > the final encoding, the message body length cannot be determined |
45 | /// > reliably; the server MUST respond with the 400 (Bad Request) |
46 | /// > status code and then close the connection. |
47 | Eof(bool), |
48 | } |
49 | |
50 | #[derive (Debug, PartialEq, Clone, Copy)] |
51 | enum ChunkedState { |
52 | Size, |
53 | SizeLws, |
54 | Extension, |
55 | SizeLf, |
56 | Body, |
57 | BodyCr, |
58 | BodyLf, |
59 | Trailer, |
60 | TrailerLf, |
61 | EndCr, |
62 | EndLf, |
63 | End, |
64 | } |
65 | |
66 | impl Decoder { |
67 | // constructors |
68 | |
69 | pub(crate) fn length(x: u64) -> Decoder { |
70 | Decoder { |
71 | kind: Kind::Length(x), |
72 | } |
73 | } |
74 | |
75 | pub(crate) fn chunked() -> Decoder { |
76 | Decoder { |
77 | kind: Kind::Chunked(ChunkedState::Size, 0), |
78 | } |
79 | } |
80 | |
81 | pub(crate) fn eof() -> Decoder { |
82 | Decoder { |
83 | kind: Kind::Eof(false), |
84 | } |
85 | } |
86 | |
87 | pub(super) fn new(len: DecodedLength) -> Self { |
88 | match len { |
89 | DecodedLength::CHUNKED => Decoder::chunked(), |
90 | DecodedLength::CLOSE_DELIMITED => Decoder::eof(), |
91 | length => Decoder::length(length.danger_len()), |
92 | } |
93 | } |
94 | |
95 | // methods |
96 | |
97 | pub(crate) fn is_eof(&self) -> bool { |
98 | matches!(self.kind, Length(0) | Chunked(ChunkedState::End, _) | Eof(true)) |
99 | } |
100 | |
101 | pub(crate) fn decode<R: MemRead>( |
102 | &mut self, |
103 | cx: &mut task::Context<'_>, |
104 | body: &mut R, |
105 | ) -> Poll<Result<Bytes, io::Error>> { |
106 | trace!("decode; state= {:?}" , self.kind); |
107 | match self.kind { |
108 | Length(ref mut remaining) => { |
109 | if *remaining == 0 { |
110 | Poll::Ready(Ok(Bytes::new())) |
111 | } else { |
112 | let to_read = *remaining as usize; |
113 | let buf = ready!(body.read_mem(cx, to_read))?; |
114 | let num = buf.as_ref().len() as u64; |
115 | if num > *remaining { |
116 | *remaining = 0; |
117 | } else if num == 0 { |
118 | return Poll::Ready(Err(io::Error::new( |
119 | io::ErrorKind::UnexpectedEof, |
120 | IncompleteBody, |
121 | ))); |
122 | } else { |
123 | *remaining -= num; |
124 | } |
125 | Poll::Ready(Ok(buf)) |
126 | } |
127 | } |
128 | Chunked(ref mut state, ref mut size) => { |
129 | loop { |
130 | let mut buf = None; |
131 | // advances the chunked state |
132 | *state = ready!(state.step(cx, body, size, &mut buf))?; |
133 | if *state == ChunkedState::End { |
134 | trace!("end of chunked" ); |
135 | return Poll::Ready(Ok(Bytes::new())); |
136 | } |
137 | if let Some(buf) = buf { |
138 | return Poll::Ready(Ok(buf)); |
139 | } |
140 | } |
141 | } |
142 | Eof(ref mut is_eof) => { |
143 | if *is_eof { |
144 | Poll::Ready(Ok(Bytes::new())) |
145 | } else { |
146 | // 8192 chosen because its about 2 packets, there probably |
147 | // won't be that much available, so don't have MemReaders |
148 | // allocate buffers to big |
149 | body.read_mem(cx, 8192).map_ok(|slice| { |
150 | *is_eof = slice.is_empty(); |
151 | slice |
152 | }) |
153 | } |
154 | } |
155 | } |
156 | } |
157 | |
158 | #[cfg (test)] |
159 | async fn decode_fut<R: MemRead>(&mut self, body: &mut R) -> Result<Bytes, io::Error> { |
160 | futures_util::future::poll_fn(move |cx| self.decode(cx, body)).await |
161 | } |
162 | } |
163 | |
164 | impl fmt::Debug for Decoder { |
165 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
166 | fmt::Debug::fmt(&self.kind, f) |
167 | } |
168 | } |
169 | |
170 | macro_rules! byte ( |
171 | ($rdr:ident, $cx:expr) => ({ |
172 | let buf = ready!($rdr.read_mem($cx, 1))?; |
173 | if !buf.is_empty() { |
174 | buf[0] |
175 | } else { |
176 | return Poll::Ready(Err(io::Error::new(io::ErrorKind::UnexpectedEof, |
177 | "unexpected EOF during chunk size line" ))); |
178 | } |
179 | }) |
180 | ); |
181 | |
182 | impl ChunkedState { |
183 | fn step<R: MemRead>( |
184 | &self, |
185 | cx: &mut task::Context<'_>, |
186 | body: &mut R, |
187 | size: &mut u64, |
188 | buf: &mut Option<Bytes>, |
189 | ) -> Poll<Result<ChunkedState, io::Error>> { |
190 | use self::ChunkedState::*; |
191 | match *self { |
192 | Size => ChunkedState::read_size(cx, body, size), |
193 | SizeLws => ChunkedState::read_size_lws(cx, body), |
194 | Extension => ChunkedState::read_extension(cx, body), |
195 | SizeLf => ChunkedState::read_size_lf(cx, body, *size), |
196 | Body => ChunkedState::read_body(cx, body, size, buf), |
197 | BodyCr => ChunkedState::read_body_cr(cx, body), |
198 | BodyLf => ChunkedState::read_body_lf(cx, body), |
199 | Trailer => ChunkedState::read_trailer(cx, body), |
200 | TrailerLf => ChunkedState::read_trailer_lf(cx, body), |
201 | EndCr => ChunkedState::read_end_cr(cx, body), |
202 | EndLf => ChunkedState::read_end_lf(cx, body), |
203 | End => Poll::Ready(Ok(ChunkedState::End)), |
204 | } |
205 | } |
206 | fn read_size<R: MemRead>( |
207 | cx: &mut task::Context<'_>, |
208 | rdr: &mut R, |
209 | size: &mut u64, |
210 | ) -> Poll<Result<ChunkedState, io::Error>> { |
211 | trace!("Read chunk hex size" ); |
212 | |
213 | macro_rules! or_overflow { |
214 | ($e:expr) => ( |
215 | match $e { |
216 | Some(val) => val, |
217 | None => return Poll::Ready(Err(io::Error::new( |
218 | io::ErrorKind::InvalidData, |
219 | "invalid chunk size: overflow" , |
220 | ))), |
221 | } |
222 | ) |
223 | } |
224 | |
225 | let radix = 16; |
226 | match byte!(rdr, cx) { |
227 | b @ b'0' ..=b'9' => { |
228 | *size = or_overflow!(size.checked_mul(radix)); |
229 | *size = or_overflow!(size.checked_add((b - b'0' ) as u64)); |
230 | } |
231 | b @ b'a' ..=b'f' => { |
232 | *size = or_overflow!(size.checked_mul(radix)); |
233 | *size = or_overflow!(size.checked_add((b + 10 - b'a' ) as u64)); |
234 | } |
235 | b @ b'A' ..=b'F' => { |
236 | *size = or_overflow!(size.checked_mul(radix)); |
237 | *size = or_overflow!(size.checked_add((b + 10 - b'A' ) as u64)); |
238 | } |
239 | b' \t' | b' ' => return Poll::Ready(Ok(ChunkedState::SizeLws)), |
240 | b';' => return Poll::Ready(Ok(ChunkedState::Extension)), |
241 | b' \r' => return Poll::Ready(Ok(ChunkedState::SizeLf)), |
242 | _ => { |
243 | return Poll::Ready(Err(io::Error::new( |
244 | io::ErrorKind::InvalidInput, |
245 | "Invalid chunk size line: Invalid Size" , |
246 | ))); |
247 | } |
248 | } |
249 | Poll::Ready(Ok(ChunkedState::Size)) |
250 | } |
251 | fn read_size_lws<R: MemRead>( |
252 | cx: &mut task::Context<'_>, |
253 | rdr: &mut R, |
254 | ) -> Poll<Result<ChunkedState, io::Error>> { |
255 | trace!("read_size_lws" ); |
256 | match byte!(rdr, cx) { |
257 | // LWS can follow the chunk size, but no more digits can come |
258 | b' \t' | b' ' => Poll::Ready(Ok(ChunkedState::SizeLws)), |
259 | b';' => Poll::Ready(Ok(ChunkedState::Extension)), |
260 | b' \r' => Poll::Ready(Ok(ChunkedState::SizeLf)), |
261 | _ => Poll::Ready(Err(io::Error::new( |
262 | io::ErrorKind::InvalidInput, |
263 | "Invalid chunk size linear white space" , |
264 | ))), |
265 | } |
266 | } |
267 | fn read_extension<R: MemRead>( |
268 | cx: &mut task::Context<'_>, |
269 | rdr: &mut R, |
270 | ) -> Poll<Result<ChunkedState, io::Error>> { |
271 | trace!("read_extension" ); |
272 | // We don't care about extensions really at all. Just ignore them. |
273 | // They "end" at the next CRLF. |
274 | // |
275 | // However, some implementations may not check for the CR, so to save |
276 | // them from themselves, we reject extensions containing plain LF as |
277 | // well. |
278 | match byte!(rdr, cx) { |
279 | b' \r' => Poll::Ready(Ok(ChunkedState::SizeLf)), |
280 | b' \n' => Poll::Ready(Err(io::Error::new( |
281 | io::ErrorKind::InvalidData, |
282 | "invalid chunk extension contains newline" , |
283 | ))), |
284 | _ => Poll::Ready(Ok(ChunkedState::Extension)), // no supported extensions |
285 | } |
286 | } |
287 | fn read_size_lf<R: MemRead>( |
288 | cx: &mut task::Context<'_>, |
289 | rdr: &mut R, |
290 | size: u64, |
291 | ) -> Poll<Result<ChunkedState, io::Error>> { |
292 | trace!("Chunk size is {:?}" , size); |
293 | match byte!(rdr, cx) { |
294 | b' \n' => { |
295 | if size == 0 { |
296 | Poll::Ready(Ok(ChunkedState::EndCr)) |
297 | } else { |
298 | debug!("incoming chunked header: {0:#X} ( {0} bytes)" , size); |
299 | Poll::Ready(Ok(ChunkedState::Body)) |
300 | } |
301 | } |
302 | _ => Poll::Ready(Err(io::Error::new( |
303 | io::ErrorKind::InvalidInput, |
304 | "Invalid chunk size LF" , |
305 | ))), |
306 | } |
307 | } |
308 | |
309 | fn read_body<R: MemRead>( |
310 | cx: &mut task::Context<'_>, |
311 | rdr: &mut R, |
312 | rem: &mut u64, |
313 | buf: &mut Option<Bytes>, |
314 | ) -> Poll<Result<ChunkedState, io::Error>> { |
315 | trace!("Chunked read, remaining= {:?}" , rem); |
316 | |
317 | // cap remaining bytes at the max capacity of usize |
318 | let rem_cap = match *rem { |
319 | r if r > usize::MAX as u64 => usize::MAX, |
320 | r => r as usize, |
321 | }; |
322 | |
323 | let to_read = rem_cap; |
324 | let slice = ready!(rdr.read_mem(cx, to_read))?; |
325 | let count = slice.len(); |
326 | |
327 | if count == 0 { |
328 | *rem = 0; |
329 | return Poll::Ready(Err(io::Error::new( |
330 | io::ErrorKind::UnexpectedEof, |
331 | IncompleteBody, |
332 | ))); |
333 | } |
334 | *buf = Some(slice); |
335 | *rem -= count as u64; |
336 | |
337 | if *rem > 0 { |
338 | Poll::Ready(Ok(ChunkedState::Body)) |
339 | } else { |
340 | Poll::Ready(Ok(ChunkedState::BodyCr)) |
341 | } |
342 | } |
343 | fn read_body_cr<R: MemRead>( |
344 | cx: &mut task::Context<'_>, |
345 | rdr: &mut R, |
346 | ) -> Poll<Result<ChunkedState, io::Error>> { |
347 | match byte!(rdr, cx) { |
348 | b' \r' => Poll::Ready(Ok(ChunkedState::BodyLf)), |
349 | _ => Poll::Ready(Err(io::Error::new( |
350 | io::ErrorKind::InvalidInput, |
351 | "Invalid chunk body CR" , |
352 | ))), |
353 | } |
354 | } |
355 | fn read_body_lf<R: MemRead>( |
356 | cx: &mut task::Context<'_>, |
357 | rdr: &mut R, |
358 | ) -> Poll<Result<ChunkedState, io::Error>> { |
359 | match byte!(rdr, cx) { |
360 | b' \n' => Poll::Ready(Ok(ChunkedState::Size)), |
361 | _ => Poll::Ready(Err(io::Error::new( |
362 | io::ErrorKind::InvalidInput, |
363 | "Invalid chunk body LF" , |
364 | ))), |
365 | } |
366 | } |
367 | |
368 | fn read_trailer<R: MemRead>( |
369 | cx: &mut task::Context<'_>, |
370 | rdr: &mut R, |
371 | ) -> Poll<Result<ChunkedState, io::Error>> { |
372 | trace!("read_trailer" ); |
373 | match byte!(rdr, cx) { |
374 | b' \r' => Poll::Ready(Ok(ChunkedState::TrailerLf)), |
375 | _ => Poll::Ready(Ok(ChunkedState::Trailer)), |
376 | } |
377 | } |
378 | fn read_trailer_lf<R: MemRead>( |
379 | cx: &mut task::Context<'_>, |
380 | rdr: &mut R, |
381 | ) -> Poll<Result<ChunkedState, io::Error>> { |
382 | match byte!(rdr, cx) { |
383 | b' \n' => Poll::Ready(Ok(ChunkedState::EndCr)), |
384 | _ => Poll::Ready(Err(io::Error::new( |
385 | io::ErrorKind::InvalidInput, |
386 | "Invalid trailer end LF" , |
387 | ))), |
388 | } |
389 | } |
390 | |
391 | fn read_end_cr<R: MemRead>( |
392 | cx: &mut task::Context<'_>, |
393 | rdr: &mut R, |
394 | ) -> Poll<Result<ChunkedState, io::Error>> { |
395 | match byte!(rdr, cx) { |
396 | b' \r' => Poll::Ready(Ok(ChunkedState::EndLf)), |
397 | _ => Poll::Ready(Ok(ChunkedState::Trailer)), |
398 | } |
399 | } |
400 | fn read_end_lf<R: MemRead>( |
401 | cx: &mut task::Context<'_>, |
402 | rdr: &mut R, |
403 | ) -> Poll<Result<ChunkedState, io::Error>> { |
404 | match byte!(rdr, cx) { |
405 | b' \n' => Poll::Ready(Ok(ChunkedState::End)), |
406 | _ => Poll::Ready(Err(io::Error::new( |
407 | io::ErrorKind::InvalidInput, |
408 | "Invalid chunk end LF" , |
409 | ))), |
410 | } |
411 | } |
412 | } |
413 | |
414 | #[derive (Debug)] |
415 | struct IncompleteBody; |
416 | |
417 | impl fmt::Display for IncompleteBody { |
418 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
419 | write!(f, "end of file before message length reached" ) |
420 | } |
421 | } |
422 | |
423 | impl StdError for IncompleteBody {} |
424 | |
425 | #[cfg (test)] |
426 | mod tests { |
427 | use super::*; |
428 | use std::pin::Pin; |
429 | use std::time::Duration; |
430 | use tokio::io::{AsyncRead, ReadBuf}; |
431 | |
432 | impl<'a> MemRead for &'a [u8] { |
433 | fn read_mem(&mut self, _: &mut task::Context<'_>, len: usize) -> Poll<io::Result<Bytes>> { |
434 | let n = std::cmp::min(len, self.len()); |
435 | if n > 0 { |
436 | let (a, b) = self.split_at(n); |
437 | let buf = Bytes::copy_from_slice(a); |
438 | *self = b; |
439 | Poll::Ready(Ok(buf)) |
440 | } else { |
441 | Poll::Ready(Ok(Bytes::new())) |
442 | } |
443 | } |
444 | } |
445 | |
446 | impl<'a> MemRead for &'a mut (dyn AsyncRead + Unpin) { |
447 | fn read_mem(&mut self, cx: &mut task::Context<'_>, len: usize) -> Poll<io::Result<Bytes>> { |
448 | let mut v = vec![0; len]; |
449 | let mut buf = ReadBuf::new(&mut v); |
450 | ready!(Pin::new(self).poll_read(cx, &mut buf)?); |
451 | Poll::Ready(Ok(Bytes::copy_from_slice(&buf.filled()))) |
452 | } |
453 | } |
454 | |
455 | #[cfg (feature = "nightly" )] |
456 | impl MemRead for Bytes { |
457 | fn read_mem(&mut self, _: &mut task::Context<'_>, len: usize) -> Poll<io::Result<Bytes>> { |
458 | let n = std::cmp::min(len, self.len()); |
459 | let ret = self.split_to(n); |
460 | Poll::Ready(Ok(ret)) |
461 | } |
462 | } |
463 | |
464 | /* |
465 | use std::io; |
466 | use std::io::Write; |
467 | use super::Decoder; |
468 | use super::ChunkedState; |
469 | use futures::{Async, Poll}; |
470 | use bytes::{BytesMut, Bytes}; |
471 | use crate::mock::AsyncIo; |
472 | */ |
473 | |
474 | #[tokio::test] |
475 | async fn test_read_chunk_size() { |
476 | use std::io::ErrorKind::{InvalidData, InvalidInput, UnexpectedEof}; |
477 | |
478 | async fn read(s: &str) -> u64 { |
479 | let mut state = ChunkedState::Size; |
480 | let rdr = &mut s.as_bytes(); |
481 | let mut size = 0; |
482 | loop { |
483 | let result = |
484 | futures_util::future::poll_fn(|cx| state.step(cx, rdr, &mut size, &mut None)) |
485 | .await; |
486 | let desc = format!("read_size failed for {:?}" , s); |
487 | state = result.expect(desc.as_str()); |
488 | if state == ChunkedState::Body || state == ChunkedState::EndCr { |
489 | break; |
490 | } |
491 | } |
492 | size |
493 | } |
494 | |
495 | async fn read_err(s: &str, expected_err: io::ErrorKind) { |
496 | let mut state = ChunkedState::Size; |
497 | let rdr = &mut s.as_bytes(); |
498 | let mut size = 0; |
499 | loop { |
500 | let result = |
501 | futures_util::future::poll_fn(|cx| state.step(cx, rdr, &mut size, &mut None)) |
502 | .await; |
503 | state = match result { |
504 | Ok(s) => s, |
505 | Err(e) => { |
506 | assert!( |
507 | expected_err == e.kind(), |
508 | "Reading {:?}, expected {:?}, but got {:?}" , |
509 | s, |
510 | expected_err, |
511 | e.kind() |
512 | ); |
513 | return; |
514 | } |
515 | }; |
516 | if state == ChunkedState::Body || state == ChunkedState::End { |
517 | panic!("Was Ok. Expected Err for {:?}" , s); |
518 | } |
519 | } |
520 | } |
521 | |
522 | assert_eq!(1, read("1 \r\n" ).await); |
523 | assert_eq!(1, read("01 \r\n" ).await); |
524 | assert_eq!(0, read("0 \r\n" ).await); |
525 | assert_eq!(0, read("00 \r\n" ).await); |
526 | assert_eq!(10, read("A \r\n" ).await); |
527 | assert_eq!(10, read("a \r\n" ).await); |
528 | assert_eq!(255, read("Ff \r\n" ).await); |
529 | assert_eq!(255, read("Ff \r\n" ).await); |
530 | // Missing LF or CRLF |
531 | read_err("F \rF" , InvalidInput).await; |
532 | read_err("F" , UnexpectedEof).await; |
533 | // Invalid hex digit |
534 | read_err("X \r\n" , InvalidInput).await; |
535 | read_err("1X \r\n" , InvalidInput).await; |
536 | read_err("- \r\n" , InvalidInput).await; |
537 | read_err("-1 \r\n" , InvalidInput).await; |
538 | // Acceptable (if not fully valid) extensions do not influence the size |
539 | assert_eq!(1, read("1;extension \r\n" ).await); |
540 | assert_eq!(10, read("a;ext name=value \r\n" ).await); |
541 | assert_eq!(1, read("1;extension;extension2 \r\n" ).await); |
542 | assert_eq!(1, read("1;;; ; \r\n" ).await); |
543 | assert_eq!(2, read("2; extension... \r\n" ).await); |
544 | assert_eq!(3, read("3 ; extension=123 \r\n" ).await); |
545 | assert_eq!(3, read("3 ; \r\n" ).await); |
546 | assert_eq!(3, read("3 ; \r\n" ).await); |
547 | // Invalid extensions cause an error |
548 | read_err("1 invalid extension \r\n" , InvalidInput).await; |
549 | read_err("1 A \r\n" , InvalidInput).await; |
550 | read_err("1;no CRLF" , UnexpectedEof).await; |
551 | read_err("1;reject \nnewlines \r\n" , InvalidData).await; |
552 | // Overflow |
553 | read_err("f0000000000000003 \r\n" , InvalidData).await; |
554 | } |
555 | |
556 | #[tokio::test] |
557 | async fn test_read_sized_early_eof() { |
558 | let mut bytes = &b"foo bar" [..]; |
559 | let mut decoder = Decoder::length(10); |
560 | assert_eq!(decoder.decode_fut(&mut bytes).await.unwrap().len(), 7); |
561 | let e = decoder.decode_fut(&mut bytes).await.unwrap_err(); |
562 | assert_eq!(e.kind(), io::ErrorKind::UnexpectedEof); |
563 | } |
564 | |
565 | #[tokio::test] |
566 | async fn test_read_chunked_early_eof() { |
567 | let mut bytes = &b"\ |
568 | 9 \r\n\ |
569 | foo bar\ |
570 | " [..]; |
571 | let mut decoder = Decoder::chunked(); |
572 | assert_eq!(decoder.decode_fut(&mut bytes).await.unwrap().len(), 7); |
573 | let e = decoder.decode_fut(&mut bytes).await.unwrap_err(); |
574 | assert_eq!(e.kind(), io::ErrorKind::UnexpectedEof); |
575 | } |
576 | |
577 | #[tokio::test] |
578 | async fn test_read_chunked_single_read() { |
579 | let mut mock_buf = &b"10 \r\n1234567890abcdef \r\n0 \r\n" [..]; |
580 | let buf = Decoder::chunked() |
581 | .decode_fut(&mut mock_buf) |
582 | .await |
583 | .expect("decode" ); |
584 | assert_eq!(16, buf.len()); |
585 | let result = String::from_utf8(buf.as_ref().to_vec()).expect("decode String" ); |
586 | assert_eq!("1234567890abcdef" , &result); |
587 | } |
588 | |
589 | #[tokio::test] |
590 | async fn test_read_chunked_trailer_with_missing_lf() { |
591 | let mut mock_buf = &b"10 \r\n1234567890abcdef \r\n0 \r\nbad \r\r\n" [..]; |
592 | let mut decoder = Decoder::chunked(); |
593 | decoder.decode_fut(&mut mock_buf).await.expect("decode" ); |
594 | let e = decoder.decode_fut(&mut mock_buf).await.unwrap_err(); |
595 | assert_eq!(e.kind(), io::ErrorKind::InvalidInput); |
596 | } |
597 | |
598 | #[tokio::test] |
599 | async fn test_read_chunked_after_eof() { |
600 | let mut mock_buf = &b"10 \r\n1234567890abcdef \r\n0 \r\n\r\n" [..]; |
601 | let mut decoder = Decoder::chunked(); |
602 | |
603 | // normal read |
604 | let buf = decoder.decode_fut(&mut mock_buf).await.unwrap(); |
605 | assert_eq!(16, buf.len()); |
606 | let result = String::from_utf8(buf.as_ref().to_vec()).expect("decode String" ); |
607 | assert_eq!("1234567890abcdef" , &result); |
608 | |
609 | // eof read |
610 | let buf = decoder.decode_fut(&mut mock_buf).await.expect("decode" ); |
611 | assert_eq!(0, buf.len()); |
612 | |
613 | // ensure read after eof also returns eof |
614 | let buf = decoder.decode_fut(&mut mock_buf).await.expect("decode" ); |
615 | assert_eq!(0, buf.len()); |
616 | } |
617 | |
618 | // perform an async read using a custom buffer size and causing a blocking |
619 | // read at the specified byte |
620 | async fn read_async(mut decoder: Decoder, content: &[u8], block_at: usize) -> String { |
621 | let mut outs = Vec::new(); |
622 | |
623 | let mut ins = if block_at == 0 { |
624 | tokio_test::io::Builder::new() |
625 | .wait(Duration::from_millis(10)) |
626 | .read(content) |
627 | .build() |
628 | } else { |
629 | tokio_test::io::Builder::new() |
630 | .read(&content[..block_at]) |
631 | .wait(Duration::from_millis(10)) |
632 | .read(&content[block_at..]) |
633 | .build() |
634 | }; |
635 | |
636 | let mut ins = &mut ins as &mut (dyn AsyncRead + Unpin); |
637 | |
638 | loop { |
639 | let buf = decoder |
640 | .decode_fut(&mut ins) |
641 | .await |
642 | .expect("unexpected decode error" ); |
643 | if buf.is_empty() { |
644 | break; // eof |
645 | } |
646 | outs.extend(buf.as_ref()); |
647 | } |
648 | |
649 | String::from_utf8(outs).expect("decode String" ) |
650 | } |
651 | |
652 | // iterate over the different ways that this async read could go. |
653 | // tests blocking a read at each byte along the content - The shotgun approach |
654 | async fn all_async_cases(content: &str, expected: &str, decoder: Decoder) { |
655 | let content_len = content.len(); |
656 | for block_at in 0..content_len { |
657 | let actual = read_async(decoder.clone(), content.as_bytes(), block_at).await; |
658 | assert_eq!(expected, &actual) //, "Failed async. Blocking at {}", block_at); |
659 | } |
660 | } |
661 | |
662 | #[tokio::test] |
663 | async fn test_read_length_async() { |
664 | let content = "foobar" ; |
665 | all_async_cases(content, content, Decoder::length(content.len() as u64)).await; |
666 | } |
667 | |
668 | #[tokio::test] |
669 | async fn test_read_chunked_async() { |
670 | let content = "3 \r\nfoo \r\n3 \r\nbar \r\n0 \r\n\r\n" ; |
671 | let expected = "foobar" ; |
672 | all_async_cases(content, expected, Decoder::chunked()).await; |
673 | } |
674 | |
675 | #[tokio::test] |
676 | async fn test_read_eof_async() { |
677 | let content = "foobar" ; |
678 | all_async_cases(content, content, Decoder::eof()).await; |
679 | } |
680 | |
681 | #[cfg (feature = "nightly" )] |
682 | #[bench ] |
683 | fn bench_decode_chunked_1kb(b: &mut test::Bencher) { |
684 | let rt = new_runtime(); |
685 | |
686 | const LEN: usize = 1024; |
687 | let mut vec = Vec::new(); |
688 | vec.extend(format!(" {:x}\r\n" , LEN).as_bytes()); |
689 | vec.extend(&[0; LEN][..]); |
690 | vec.extend(b" \r\n" ); |
691 | let content = Bytes::from(vec); |
692 | |
693 | b.bytes = LEN as u64; |
694 | |
695 | b.iter(|| { |
696 | let mut decoder = Decoder::chunked(); |
697 | rt.block_on(async { |
698 | let mut raw = content.clone(); |
699 | let chunk = decoder.decode_fut(&mut raw).await.unwrap(); |
700 | assert_eq!(chunk.len(), LEN); |
701 | }); |
702 | }); |
703 | } |
704 | |
705 | #[cfg (feature = "nightly" )] |
706 | #[bench ] |
707 | fn bench_decode_length_1kb(b: &mut test::Bencher) { |
708 | let rt = new_runtime(); |
709 | |
710 | const LEN: usize = 1024; |
711 | let content = Bytes::from(&[0; LEN][..]); |
712 | b.bytes = LEN as u64; |
713 | |
714 | b.iter(|| { |
715 | let mut decoder = Decoder::length(LEN as u64); |
716 | rt.block_on(async { |
717 | let mut raw = content.clone(); |
718 | let chunk = decoder.decode_fut(&mut raw).await.unwrap(); |
719 | assert_eq!(chunk.len(), LEN); |
720 | }); |
721 | }); |
722 | } |
723 | |
724 | #[cfg (feature = "nightly" )] |
725 | fn new_runtime() -> tokio::runtime::Runtime { |
726 | tokio::runtime::Builder::new_current_thread() |
727 | .enable_all() |
728 | .build() |
729 | .expect("rt build" ) |
730 | } |
731 | } |
732 | |