1 | use std::fmt; |
2 | use std::future::Future; |
3 | use std::pin::Pin; |
4 | use std::task::{Context, Poll}; |
5 | |
6 | #[cfg (feature = "gzip" )] |
7 | use async_compression::tokio::bufread::GzipDecoder; |
8 | |
9 | #[cfg (feature = "brotli" )] |
10 | use async_compression::tokio::bufread::BrotliDecoder; |
11 | |
12 | #[cfg (feature = "deflate" )] |
13 | use async_compression::tokio::bufread::ZlibDecoder; |
14 | |
15 | use bytes::Bytes; |
16 | use futures_core::Stream; |
17 | use futures_util::stream::Peekable; |
18 | use http::HeaderMap; |
19 | use hyper::body::HttpBody; |
20 | |
21 | #[cfg (any(feature = "gzip" , feature = "brotli" , feature = "deflate" ))] |
22 | use tokio_util::codec::{BytesCodec, FramedRead}; |
23 | #[cfg (any(feature = "gzip" , feature = "brotli" , feature = "deflate" ))] |
24 | use tokio_util::io::StreamReader; |
25 | |
26 | use super::super::Body; |
27 | use crate::error; |
28 | |
29 | #[derive (Clone, Copy, Debug)] |
30 | pub(super) struct Accepts { |
31 | #[cfg (feature = "gzip" )] |
32 | pub(super) gzip: bool, |
33 | #[cfg (feature = "brotli" )] |
34 | pub(super) brotli: bool, |
35 | #[cfg (feature = "deflate" )] |
36 | pub(super) deflate: bool, |
37 | } |
38 | |
39 | /// A response decompressor over a non-blocking stream of chunks. |
40 | /// |
41 | /// The inner decoder may be constructed asynchronously. |
42 | pub(crate) struct Decoder { |
43 | inner: Inner, |
44 | } |
45 | |
46 | type PeekableIoStream = Peekable<IoStream>; |
47 | |
48 | #[cfg (any(feature = "gzip" , feature = "brotli" , feature = "deflate" ))] |
49 | type PeekableIoStreamReader = StreamReader<PeekableIoStream, Bytes>; |
50 | |
51 | enum Inner { |
52 | /// A `PlainText` decoder just returns the response content as is. |
53 | PlainText(super::body::ImplStream), |
54 | |
55 | /// A `Gzip` decoder will uncompress the gzipped response content before returning it. |
56 | #[cfg (feature = "gzip" )] |
57 | Gzip(Pin<Box<FramedRead<GzipDecoder<PeekableIoStreamReader>, BytesCodec>>>), |
58 | |
59 | /// A `Brotli` decoder will uncompress the brotlied response content before returning it. |
60 | #[cfg (feature = "brotli" )] |
61 | Brotli(Pin<Box<FramedRead<BrotliDecoder<PeekableIoStreamReader>, BytesCodec>>>), |
62 | |
63 | /// A `Deflate` decoder will uncompress the deflated response content before returning it. |
64 | #[cfg (feature = "deflate" )] |
65 | Deflate(Pin<Box<FramedRead<ZlibDecoder<PeekableIoStreamReader>, BytesCodec>>>), |
66 | |
67 | /// A decoder that doesn't have a value yet. |
68 | #[cfg (any(feature = "brotli" , feature = "gzip" , feature = "deflate" ))] |
69 | Pending(Pin<Box<Pending>>), |
70 | } |
71 | |
72 | /// A future attempt to poll the response body for EOF so we know whether to use gzip or not. |
73 | struct Pending(PeekableIoStream, DecoderType); |
74 | |
75 | struct IoStream(super::body::ImplStream); |
76 | |
77 | enum DecoderType { |
78 | #[cfg (feature = "gzip" )] |
79 | Gzip, |
80 | #[cfg (feature = "brotli" )] |
81 | Brotli, |
82 | #[cfg (feature = "deflate" )] |
83 | Deflate, |
84 | } |
85 | |
86 | impl fmt::Debug for Decoder { |
87 | fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |
88 | f.debug_struct(name:"Decoder" ).finish() |
89 | } |
90 | } |
91 | |
92 | impl Decoder { |
93 | #[cfg (feature = "blocking" )] |
94 | pub(crate) fn empty() -> Decoder { |
95 | Decoder { |
96 | inner: Inner::PlainText(Body::empty().into_stream()), |
97 | } |
98 | } |
99 | |
100 | /// A plain text decoder. |
101 | /// |
102 | /// This decoder will emit the underlying chunks as-is. |
103 | fn plain_text(body: Body) -> Decoder { |
104 | Decoder { |
105 | inner: Inner::PlainText(body.into_stream()), |
106 | } |
107 | } |
108 | |
109 | /// A gzip decoder. |
110 | /// |
111 | /// This decoder will buffer and decompress chunks that are gzipped. |
112 | #[cfg (feature = "gzip" )] |
113 | fn gzip(body: Body) -> Decoder { |
114 | use futures_util::StreamExt; |
115 | |
116 | Decoder { |
117 | inner: Inner::Pending(Box::pin(Pending( |
118 | IoStream(body.into_stream()).peekable(), |
119 | DecoderType::Gzip, |
120 | ))), |
121 | } |
122 | } |
123 | |
124 | /// A brotli decoder. |
125 | /// |
126 | /// This decoder will buffer and decompress chunks that are brotlied. |
127 | #[cfg (feature = "brotli" )] |
128 | fn brotli(body: Body) -> Decoder { |
129 | use futures_util::StreamExt; |
130 | |
131 | Decoder { |
132 | inner: Inner::Pending(Box::pin(Pending( |
133 | IoStream(body.into_stream()).peekable(), |
134 | DecoderType::Brotli, |
135 | ))), |
136 | } |
137 | } |
138 | |
139 | /// A deflate decoder. |
140 | /// |
141 | /// This decoder will buffer and decompress chunks that are deflated. |
142 | #[cfg (feature = "deflate" )] |
143 | fn deflate(body: Body) -> Decoder { |
144 | use futures_util::StreamExt; |
145 | |
146 | Decoder { |
147 | inner: Inner::Pending(Box::pin(Pending( |
148 | IoStream(body.into_stream()).peekable(), |
149 | DecoderType::Deflate, |
150 | ))), |
151 | } |
152 | } |
153 | |
154 | #[cfg (any(feature = "brotli" , feature = "gzip" , feature = "deflate" ))] |
155 | fn detect_encoding(headers: &mut HeaderMap, encoding_str: &str) -> bool { |
156 | use http::header::{CONTENT_ENCODING, CONTENT_LENGTH, TRANSFER_ENCODING}; |
157 | use log::warn; |
158 | |
159 | let mut is_content_encoded = { |
160 | headers |
161 | .get_all(CONTENT_ENCODING) |
162 | .iter() |
163 | .any(|enc| enc == encoding_str) |
164 | || headers |
165 | .get_all(TRANSFER_ENCODING) |
166 | .iter() |
167 | .any(|enc| enc == encoding_str) |
168 | }; |
169 | if is_content_encoded { |
170 | if let Some(content_length) = headers.get(CONTENT_LENGTH) { |
171 | if content_length == "0" { |
172 | warn!("{encoding_str} response with content-length of 0" ); |
173 | is_content_encoded = false; |
174 | } |
175 | } |
176 | } |
177 | if is_content_encoded { |
178 | headers.remove(CONTENT_ENCODING); |
179 | headers.remove(CONTENT_LENGTH); |
180 | } |
181 | is_content_encoded |
182 | } |
183 | |
184 | /// Constructs a Decoder from a hyper request. |
185 | /// |
186 | /// A decoder is just a wrapper around the hyper request that knows |
187 | /// how to decode the content body of the request. |
188 | /// |
189 | /// Uses the correct variant by inspecting the Content-Encoding header. |
190 | pub(super) fn detect(_headers: &mut HeaderMap, body: Body, _accepts: Accepts) -> Decoder { |
191 | #[cfg (feature = "gzip" )] |
192 | { |
193 | if _accepts.gzip && Decoder::detect_encoding(_headers, "gzip" ) { |
194 | return Decoder::gzip(body); |
195 | } |
196 | } |
197 | |
198 | #[cfg (feature = "brotli" )] |
199 | { |
200 | if _accepts.brotli && Decoder::detect_encoding(_headers, "br" ) { |
201 | return Decoder::brotli(body); |
202 | } |
203 | } |
204 | |
205 | #[cfg (feature = "deflate" )] |
206 | { |
207 | if _accepts.deflate && Decoder::detect_encoding(_headers, "deflate" ) { |
208 | return Decoder::deflate(body); |
209 | } |
210 | } |
211 | |
212 | Decoder::plain_text(body) |
213 | } |
214 | } |
215 | |
216 | impl Stream for Decoder { |
217 | type Item = Result<Bytes, error::Error>; |
218 | |
219 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { |
220 | // Do a read or poll for a pending decoder value. |
221 | match self.inner { |
222 | #[cfg (any(feature = "brotli" , feature = "gzip" , feature = "deflate" ))] |
223 | Inner::Pending(ref mut future) => match Pin::new(future).poll(cx) { |
224 | Poll::Ready(Ok(inner)) => { |
225 | self.inner = inner; |
226 | self.poll_next(cx) |
227 | } |
228 | Poll::Ready(Err(e)) => Poll::Ready(Some(Err(crate::error::decode_io(e)))), |
229 | Poll::Pending => Poll::Pending, |
230 | }, |
231 | Inner::PlainText(ref mut body) => Pin::new(body).poll_next(cx), |
232 | #[cfg (feature = "gzip" )] |
233 | Inner::Gzip(ref mut decoder) => { |
234 | match futures_core::ready!(Pin::new(decoder).poll_next(cx)) { |
235 | Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes.freeze()))), |
236 | Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode_io(err)))), |
237 | None => Poll::Ready(None), |
238 | } |
239 | } |
240 | #[cfg (feature = "brotli" )] |
241 | Inner::Brotli(ref mut decoder) => { |
242 | match futures_core::ready!(Pin::new(decoder).poll_next(cx)) { |
243 | Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes.freeze()))), |
244 | Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode_io(err)))), |
245 | None => Poll::Ready(None), |
246 | } |
247 | } |
248 | #[cfg (feature = "deflate" )] |
249 | Inner::Deflate(ref mut decoder) => { |
250 | match futures_core::ready!(Pin::new(decoder).poll_next(cx)) { |
251 | Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes.freeze()))), |
252 | Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode_io(err)))), |
253 | None => Poll::Ready(None), |
254 | } |
255 | } |
256 | } |
257 | } |
258 | } |
259 | |
260 | impl HttpBody for Decoder { |
261 | type Data = Bytes; |
262 | type Error = crate::Error; |
263 | |
264 | fn poll_data( |
265 | self: Pin<&mut Self>, |
266 | cx: &mut Context, |
267 | ) -> Poll<Option<Result<Self::Data, Self::Error>>> { |
268 | self.poll_next(cx) |
269 | } |
270 | |
271 | fn poll_trailers( |
272 | self: Pin<&mut Self>, |
273 | _cx: &mut Context, |
274 | ) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> { |
275 | Poll::Ready(Ok(None)) |
276 | } |
277 | |
278 | fn size_hint(&self) -> http_body::SizeHint { |
279 | match self.inner { |
280 | Inner::PlainText(ref body) => HttpBody::size_hint(body), |
281 | // the rest are "unknown", so default |
282 | #[cfg (any(feature = "brotli" , feature = "gzip" , feature = "deflate" ))] |
283 | _ => http_body::SizeHint::default(), |
284 | } |
285 | } |
286 | } |
287 | |
288 | impl Future for Pending { |
289 | type Output = Result<Inner, std::io::Error>; |
290 | |
291 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
292 | use futures_util::StreamExt; |
293 | |
294 | match futures_core::ready!(Pin::new(&mut self.0).poll_peek(cx)) { |
295 | Some(Ok(_)) => { |
296 | // fallthrough |
297 | } |
298 | Some(Err(_e)) => { |
299 | // error was just a ref, so we need to really poll to move it |
300 | return Poll::Ready(Err(futures_core::ready!( |
301 | Pin::new(&mut self.0).poll_next(cx) |
302 | ) |
303 | .expect("just peeked Some" ) |
304 | .unwrap_err())); |
305 | } |
306 | None => return Poll::Ready(Ok(Inner::PlainText(Body::empty().into_stream()))), |
307 | }; |
308 | |
309 | let _body = std::mem::replace( |
310 | &mut self.0, |
311 | IoStream(Body::empty().into_stream()).peekable(), |
312 | ); |
313 | |
314 | match self.1 { |
315 | #[cfg (feature = "brotli" )] |
316 | DecoderType::Brotli => Poll::Ready(Ok(Inner::Brotli(Box::pin(FramedRead::new( |
317 | BrotliDecoder::new(StreamReader::new(_body)), |
318 | BytesCodec::new(), |
319 | ))))), |
320 | #[cfg (feature = "gzip" )] |
321 | DecoderType::Gzip => Poll::Ready(Ok(Inner::Gzip(Box::pin(FramedRead::new( |
322 | GzipDecoder::new(StreamReader::new(_body)), |
323 | BytesCodec::new(), |
324 | ))))), |
325 | #[cfg (feature = "deflate" )] |
326 | DecoderType::Deflate => Poll::Ready(Ok(Inner::Deflate(Box::pin(FramedRead::new( |
327 | ZlibDecoder::new(StreamReader::new(_body)), |
328 | BytesCodec::new(), |
329 | ))))), |
330 | } |
331 | } |
332 | } |
333 | |
334 | impl Stream for IoStream { |
335 | type Item = Result<Bytes, std::io::Error>; |
336 | |
337 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { |
338 | match futures_core::ready!(Pin::new(&mut self.0).poll_next(cx)) { |
339 | Some(Ok(chunk: Bytes)) => Poll::Ready(Some(Ok(chunk))), |
340 | Some(Err(err: Error)) => Poll::Ready(Some(Err(err.into_io()))), |
341 | None => Poll::Ready(None), |
342 | } |
343 | } |
344 | } |
345 | |
346 | // ===== impl Accepts ===== |
347 | |
348 | impl Accepts { |
349 | pub(super) fn none() -> Self { |
350 | Accepts { |
351 | #[cfg (feature = "gzip" )] |
352 | gzip: false, |
353 | #[cfg (feature = "brotli" )] |
354 | brotli: false, |
355 | #[cfg (feature = "deflate" )] |
356 | deflate: false, |
357 | } |
358 | } |
359 | |
360 | pub(super) fn as_str(&self) -> Option<&'static str> { |
361 | match (self.is_gzip(), self.is_brotli(), self.is_deflate()) { |
362 | (true, true, true) => Some("gzip, br, deflate" ), |
363 | (true, true, false) => Some("gzip, br" ), |
364 | (true, false, true) => Some("gzip, deflate" ), |
365 | (false, true, true) => Some("br, deflate" ), |
366 | (true, false, false) => Some("gzip" ), |
367 | (false, true, false) => Some("br" ), |
368 | (false, false, true) => Some("deflate" ), |
369 | (false, false, false) => None, |
370 | } |
371 | } |
372 | |
373 | fn is_gzip(&self) -> bool { |
374 | #[cfg (feature = "gzip" )] |
375 | { |
376 | self.gzip |
377 | } |
378 | |
379 | #[cfg (not(feature = "gzip" ))] |
380 | { |
381 | false |
382 | } |
383 | } |
384 | |
385 | fn is_brotli(&self) -> bool { |
386 | #[cfg (feature = "brotli" )] |
387 | { |
388 | self.brotli |
389 | } |
390 | |
391 | #[cfg (not(feature = "brotli" ))] |
392 | { |
393 | false |
394 | } |
395 | } |
396 | |
397 | fn is_deflate(&self) -> bool { |
398 | #[cfg (feature = "deflate" )] |
399 | { |
400 | self.deflate |
401 | } |
402 | |
403 | #[cfg (not(feature = "deflate" ))] |
404 | { |
405 | false |
406 | } |
407 | } |
408 | } |
409 | |
410 | impl Default for Accepts { |
411 | fn default() -> Accepts { |
412 | Accepts { |
413 | #[cfg (feature = "gzip" )] |
414 | gzip: true, |
415 | #[cfg (feature = "brotli" )] |
416 | brotli: true, |
417 | #[cfg (feature = "deflate" )] |
418 | deflate: true, |
419 | } |
420 | } |
421 | } |
422 | |