1use std::fmt;
2use std::future::Future;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5
6#[cfg(feature = "gzip")]
7use async_compression::tokio::bufread::GzipDecoder;
8
9#[cfg(feature = "brotli")]
10use async_compression::tokio::bufread::BrotliDecoder;
11
12#[cfg(feature = "deflate")]
13use async_compression::tokio::bufread::ZlibDecoder;
14
15use bytes::Bytes;
16use futures_core::Stream;
17use futures_util::stream::Peekable;
18use http::HeaderMap;
19use hyper::body::HttpBody;
20
21#[cfg(any(feature = "gzip", feature = "brotli", feature = "deflate"))]
22use tokio_util::codec::{BytesCodec, FramedRead};
23#[cfg(any(feature = "gzip", feature = "brotli", feature = "deflate"))]
24use tokio_util::io::StreamReader;
25
26use super::super::Body;
27use crate::error;
28
29#[derive(Clone, Copy, Debug)]
30pub(super) struct Accepts {
31 #[cfg(feature = "gzip")]
32 pub(super) gzip: bool,
33 #[cfg(feature = "brotli")]
34 pub(super) brotli: bool,
35 #[cfg(feature = "deflate")]
36 pub(super) deflate: bool,
37}
38
39/// A response decompressor over a non-blocking stream of chunks.
40///
41/// The inner decoder may be constructed asynchronously.
42pub(crate) struct Decoder {
43 inner: Inner,
44}
45
46type PeekableIoStream = Peekable<IoStream>;
47
48#[cfg(any(feature = "gzip", feature = "brotli", feature = "deflate"))]
49type PeekableIoStreamReader = StreamReader<PeekableIoStream, Bytes>;
50
51enum Inner {
52 /// A `PlainText` decoder just returns the response content as is.
53 PlainText(super::body::ImplStream),
54
55 /// A `Gzip` decoder will uncompress the gzipped response content before returning it.
56 #[cfg(feature = "gzip")]
57 Gzip(Pin<Box<FramedRead<GzipDecoder<PeekableIoStreamReader>, BytesCodec>>>),
58
59 /// A `Brotli` decoder will uncompress the brotlied response content before returning it.
60 #[cfg(feature = "brotli")]
61 Brotli(Pin<Box<FramedRead<BrotliDecoder<PeekableIoStreamReader>, BytesCodec>>>),
62
63 /// A `Deflate` decoder will uncompress the deflated response content before returning it.
64 #[cfg(feature = "deflate")]
65 Deflate(Pin<Box<FramedRead<ZlibDecoder<PeekableIoStreamReader>, BytesCodec>>>),
66
67 /// A decoder that doesn't have a value yet.
68 #[cfg(any(feature = "brotli", feature = "gzip", feature = "deflate"))]
69 Pending(Pin<Box<Pending>>),
70}
71
72/// A future attempt to poll the response body for EOF so we know whether to use gzip or not.
73struct Pending(PeekableIoStream, DecoderType);
74
75struct IoStream(super::body::ImplStream);
76
77enum DecoderType {
78 #[cfg(feature = "gzip")]
79 Gzip,
80 #[cfg(feature = "brotli")]
81 Brotli,
82 #[cfg(feature = "deflate")]
83 Deflate,
84}
85
86impl fmt::Debug for Decoder {
87 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
88 f.debug_struct(name:"Decoder").finish()
89 }
90}
91
92impl Decoder {
93 #[cfg(feature = "blocking")]
94 pub(crate) fn empty() -> Decoder {
95 Decoder {
96 inner: Inner::PlainText(Body::empty().into_stream()),
97 }
98 }
99
100 /// A plain text decoder.
101 ///
102 /// This decoder will emit the underlying chunks as-is.
103 fn plain_text(body: Body) -> Decoder {
104 Decoder {
105 inner: Inner::PlainText(body.into_stream()),
106 }
107 }
108
109 /// A gzip decoder.
110 ///
111 /// This decoder will buffer and decompress chunks that are gzipped.
112 #[cfg(feature = "gzip")]
113 fn gzip(body: Body) -> Decoder {
114 use futures_util::StreamExt;
115
116 Decoder {
117 inner: Inner::Pending(Box::pin(Pending(
118 IoStream(body.into_stream()).peekable(),
119 DecoderType::Gzip,
120 ))),
121 }
122 }
123
124 /// A brotli decoder.
125 ///
126 /// This decoder will buffer and decompress chunks that are brotlied.
127 #[cfg(feature = "brotli")]
128 fn brotli(body: Body) -> Decoder {
129 use futures_util::StreamExt;
130
131 Decoder {
132 inner: Inner::Pending(Box::pin(Pending(
133 IoStream(body.into_stream()).peekable(),
134 DecoderType::Brotli,
135 ))),
136 }
137 }
138
139 /// A deflate decoder.
140 ///
141 /// This decoder will buffer and decompress chunks that are deflated.
142 #[cfg(feature = "deflate")]
143 fn deflate(body: Body) -> Decoder {
144 use futures_util::StreamExt;
145
146 Decoder {
147 inner: Inner::Pending(Box::pin(Pending(
148 IoStream(body.into_stream()).peekable(),
149 DecoderType::Deflate,
150 ))),
151 }
152 }
153
154 #[cfg(any(feature = "brotli", feature = "gzip", feature = "deflate"))]
155 fn detect_encoding(headers: &mut HeaderMap, encoding_str: &str) -> bool {
156 use http::header::{CONTENT_ENCODING, CONTENT_LENGTH, TRANSFER_ENCODING};
157 use log::warn;
158
159 let mut is_content_encoded = {
160 headers
161 .get_all(CONTENT_ENCODING)
162 .iter()
163 .any(|enc| enc == encoding_str)
164 || headers
165 .get_all(TRANSFER_ENCODING)
166 .iter()
167 .any(|enc| enc == encoding_str)
168 };
169 if is_content_encoded {
170 if let Some(content_length) = headers.get(CONTENT_LENGTH) {
171 if content_length == "0" {
172 warn!("{encoding_str} response with content-length of 0");
173 is_content_encoded = false;
174 }
175 }
176 }
177 if is_content_encoded {
178 headers.remove(CONTENT_ENCODING);
179 headers.remove(CONTENT_LENGTH);
180 }
181 is_content_encoded
182 }
183
184 /// Constructs a Decoder from a hyper request.
185 ///
186 /// A decoder is just a wrapper around the hyper request that knows
187 /// how to decode the content body of the request.
188 ///
189 /// Uses the correct variant by inspecting the Content-Encoding header.
190 pub(super) fn detect(_headers: &mut HeaderMap, body: Body, _accepts: Accepts) -> Decoder {
191 #[cfg(feature = "gzip")]
192 {
193 if _accepts.gzip && Decoder::detect_encoding(_headers, "gzip") {
194 return Decoder::gzip(body);
195 }
196 }
197
198 #[cfg(feature = "brotli")]
199 {
200 if _accepts.brotli && Decoder::detect_encoding(_headers, "br") {
201 return Decoder::brotli(body);
202 }
203 }
204
205 #[cfg(feature = "deflate")]
206 {
207 if _accepts.deflate && Decoder::detect_encoding(_headers, "deflate") {
208 return Decoder::deflate(body);
209 }
210 }
211
212 Decoder::plain_text(body)
213 }
214}
215
216impl Stream for Decoder {
217 type Item = Result<Bytes, error::Error>;
218
219 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
220 // Do a read or poll for a pending decoder value.
221 match self.inner {
222 #[cfg(any(feature = "brotli", feature = "gzip", feature = "deflate"))]
223 Inner::Pending(ref mut future) => match Pin::new(future).poll(cx) {
224 Poll::Ready(Ok(inner)) => {
225 self.inner = inner;
226 self.poll_next(cx)
227 }
228 Poll::Ready(Err(e)) => Poll::Ready(Some(Err(crate::error::decode_io(e)))),
229 Poll::Pending => Poll::Pending,
230 },
231 Inner::PlainText(ref mut body) => Pin::new(body).poll_next(cx),
232 #[cfg(feature = "gzip")]
233 Inner::Gzip(ref mut decoder) => {
234 match futures_core::ready!(Pin::new(decoder).poll_next(cx)) {
235 Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes.freeze()))),
236 Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode_io(err)))),
237 None => Poll::Ready(None),
238 }
239 }
240 #[cfg(feature = "brotli")]
241 Inner::Brotli(ref mut decoder) => {
242 match futures_core::ready!(Pin::new(decoder).poll_next(cx)) {
243 Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes.freeze()))),
244 Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode_io(err)))),
245 None => Poll::Ready(None),
246 }
247 }
248 #[cfg(feature = "deflate")]
249 Inner::Deflate(ref mut decoder) => {
250 match futures_core::ready!(Pin::new(decoder).poll_next(cx)) {
251 Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes.freeze()))),
252 Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode_io(err)))),
253 None => Poll::Ready(None),
254 }
255 }
256 }
257 }
258}
259
260impl HttpBody for Decoder {
261 type Data = Bytes;
262 type Error = crate::Error;
263
264 fn poll_data(
265 self: Pin<&mut Self>,
266 cx: &mut Context,
267 ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
268 self.poll_next(cx)
269 }
270
271 fn poll_trailers(
272 self: Pin<&mut Self>,
273 _cx: &mut Context,
274 ) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
275 Poll::Ready(Ok(None))
276 }
277
278 fn size_hint(&self) -> http_body::SizeHint {
279 match self.inner {
280 Inner::PlainText(ref body) => HttpBody::size_hint(body),
281 // the rest are "unknown", so default
282 #[cfg(any(feature = "brotli", feature = "gzip", feature = "deflate"))]
283 _ => http_body::SizeHint::default(),
284 }
285 }
286}
287
288impl Future for Pending {
289 type Output = Result<Inner, std::io::Error>;
290
291 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
292 use futures_util::StreamExt;
293
294 match futures_core::ready!(Pin::new(&mut self.0).poll_peek(cx)) {
295 Some(Ok(_)) => {
296 // fallthrough
297 }
298 Some(Err(_e)) => {
299 // error was just a ref, so we need to really poll to move it
300 return Poll::Ready(Err(futures_core::ready!(
301 Pin::new(&mut self.0).poll_next(cx)
302 )
303 .expect("just peeked Some")
304 .unwrap_err()));
305 }
306 None => return Poll::Ready(Ok(Inner::PlainText(Body::empty().into_stream()))),
307 };
308
309 let _body = std::mem::replace(
310 &mut self.0,
311 IoStream(Body::empty().into_stream()).peekable(),
312 );
313
314 match self.1 {
315 #[cfg(feature = "brotli")]
316 DecoderType::Brotli => Poll::Ready(Ok(Inner::Brotli(Box::pin(FramedRead::new(
317 BrotliDecoder::new(StreamReader::new(_body)),
318 BytesCodec::new(),
319 ))))),
320 #[cfg(feature = "gzip")]
321 DecoderType::Gzip => Poll::Ready(Ok(Inner::Gzip(Box::pin(FramedRead::new(
322 GzipDecoder::new(StreamReader::new(_body)),
323 BytesCodec::new(),
324 ))))),
325 #[cfg(feature = "deflate")]
326 DecoderType::Deflate => Poll::Ready(Ok(Inner::Deflate(Box::pin(FramedRead::new(
327 ZlibDecoder::new(StreamReader::new(_body)),
328 BytesCodec::new(),
329 ))))),
330 }
331 }
332}
333
334impl Stream for IoStream {
335 type Item = Result<Bytes, std::io::Error>;
336
337 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
338 match futures_core::ready!(Pin::new(&mut self.0).poll_next(cx)) {
339 Some(Ok(chunk: Bytes)) => Poll::Ready(Some(Ok(chunk))),
340 Some(Err(err: Error)) => Poll::Ready(Some(Err(err.into_io()))),
341 None => Poll::Ready(None),
342 }
343 }
344}
345
346// ===== impl Accepts =====
347
348impl Accepts {
349 pub(super) fn none() -> Self {
350 Accepts {
351 #[cfg(feature = "gzip")]
352 gzip: false,
353 #[cfg(feature = "brotli")]
354 brotli: false,
355 #[cfg(feature = "deflate")]
356 deflate: false,
357 }
358 }
359
360 pub(super) fn as_str(&self) -> Option<&'static str> {
361 match (self.is_gzip(), self.is_brotli(), self.is_deflate()) {
362 (true, true, true) => Some("gzip, br, deflate"),
363 (true, true, false) => Some("gzip, br"),
364 (true, false, true) => Some("gzip, deflate"),
365 (false, true, true) => Some("br, deflate"),
366 (true, false, false) => Some("gzip"),
367 (false, true, false) => Some("br"),
368 (false, false, true) => Some("deflate"),
369 (false, false, false) => None,
370 }
371 }
372
373 fn is_gzip(&self) -> bool {
374 #[cfg(feature = "gzip")]
375 {
376 self.gzip
377 }
378
379 #[cfg(not(feature = "gzip"))]
380 {
381 false
382 }
383 }
384
385 fn is_brotli(&self) -> bool {
386 #[cfg(feature = "brotli")]
387 {
388 self.brotli
389 }
390
391 #[cfg(not(feature = "brotli"))]
392 {
393 false
394 }
395 }
396
397 fn is_deflate(&self) -> bool {
398 #[cfg(feature = "deflate")]
399 {
400 self.deflate
401 }
402
403 #[cfg(not(feature = "deflate"))]
404 {
405 false
406 }
407 }
408}
409
410impl Default for Accepts {
411 fn default() -> Accepts {
412 Accepts {
413 #[cfg(feature = "gzip")]
414 gzip: true,
415 #[cfg(feature = "brotli")]
416 brotli: true,
417 #[cfg(feature = "deflate")]
418 deflate: true,
419 }
420 }
421}
422