1 | use std::fmt; |
2 | use std::future::Future; |
3 | use std::pin::Pin; |
4 | use std::task::{Context, Poll}; |
5 | |
6 | use bytes::Bytes; |
7 | use futures_core::Stream; |
8 | use http_body::Body as HttpBody; |
9 | use pin_project_lite::pin_project; |
10 | #[cfg (feature = "stream" )] |
11 | use tokio::fs::File; |
12 | use tokio::time::Sleep; |
13 | #[cfg (feature = "stream" )] |
14 | use tokio_util::io::ReaderStream; |
15 | |
/// An asynchronous request body.
///
/// The payload is held in the private `inner` field, which is either an
/// in-memory `Bytes` buffer (`Inner::Reusable`) or a boxed streaming body
/// with an optional timeout (`Inner::Streaming`).
pub struct Body {
    // Storage for the payload; see `Inner` for the two possible shapes.
    inner: Inner,
}
20 | |
// The `Stream` trait isn't stable, so the impl isn't public.
//
// Crate-private newtype around `Body`; it is the type that carries the
// `HttpBody` and `Stream` implementations in this file. Obtained through
// `Body::into_stream`.
pub(crate) struct ImplStream(Body);
23 | |
24 | enum Inner { |
25 | Reusable(Bytes), |
26 | Streaming { |
27 | body: Pin< |
28 | Box< |
29 | dyn HttpBody<Data = Bytes, Error = Box<dyn std::error::Error + Send + Sync>> |
30 | + Send |
31 | + Sync, |
32 | >, |
33 | >, |
34 | timeout: Option<Pin<Box<Sleep>>>, |
35 | }, |
36 | } |
37 | |
// Adapter struct holding a stream `S`; the `HttpBody` impl for it lives
// further down in this file. `pin_project!` generates the `project()` method
// used there to reach the `#[pin]` field `inner` through a pinned reference.
pin_project! {
    struct WrapStream<S> {
        #[pin]
        inner: S,
    }
}
44 | |
/// Newtype around `hyper::Body`; its `HttpBody` impl in this file forwards to
/// the wrapped body and boxes the error type.
struct WrapHyper(hyper::Body);
46 | |
47 | impl Body { |
48 | /// Returns a reference to the internal data of the `Body`. |
49 | /// |
50 | /// `None` is returned, if the underlying data is a stream. |
51 | pub fn as_bytes(&self) -> Option<&[u8]> { |
52 | match &self.inner { |
53 | Inner::Reusable(bytes) => Some(bytes.as_ref()), |
54 | Inner::Streaming { .. } => None, |
55 | } |
56 | } |
57 | |
58 | /// Wrap a futures `Stream` in a box inside `Body`. |
59 | /// |
60 | /// # Example |
61 | /// |
62 | /// ``` |
63 | /// # use reqwest::Body; |
64 | /// # use futures_util; |
65 | /// # fn main() { |
66 | /// let chunks: Vec<Result<_, ::std::io::Error>> = vec![ |
67 | /// Ok("hello"), |
68 | /// Ok(" "), |
69 | /// Ok("world"), |
70 | /// ]; |
71 | /// |
72 | /// let stream = futures_util::stream::iter(chunks); |
73 | /// |
74 | /// let body = Body::wrap_stream(stream); |
75 | /// # } |
76 | /// ``` |
77 | /// |
78 | /// # Optional |
79 | /// |
80 | /// This requires the `stream` feature to be enabled. |
81 | #[cfg (feature = "stream" )] |
82 | #[cfg_attr (docsrs, doc(cfg(feature = "stream" )))] |
83 | pub fn wrap_stream<S>(stream: S) -> Body |
84 | where |
85 | S: futures_core::stream::TryStream + Send + Sync + 'static, |
86 | S::Error: Into<Box<dyn std::error::Error + Send + Sync>>, |
87 | Bytes: From<S::Ok>, |
88 | { |
89 | Body::stream(stream) |
90 | } |
91 | |
92 | pub(crate) fn stream<S>(stream: S) -> Body |
93 | where |
94 | S: futures_core::stream::TryStream + Send + Sync + 'static, |
95 | S::Error: Into<Box<dyn std::error::Error + Send + Sync>>, |
96 | Bytes: From<S::Ok>, |
97 | { |
98 | use futures_util::TryStreamExt; |
99 | |
100 | let body = Box::pin(WrapStream { |
101 | inner: stream.map_ok(Bytes::from).map_err(Into::into), |
102 | }); |
103 | Body { |
104 | inner: Inner::Streaming { |
105 | body, |
106 | timeout: None, |
107 | }, |
108 | } |
109 | } |
110 | |
111 | pub(crate) fn response(body: hyper::Body, timeout: Option<Pin<Box<Sleep>>>) -> Body { |
112 | Body { |
113 | inner: Inner::Streaming { |
114 | body: Box::pin(WrapHyper(body)), |
115 | timeout, |
116 | }, |
117 | } |
118 | } |
119 | |
120 | #[cfg (feature = "blocking" )] |
121 | pub(crate) fn wrap(body: hyper::Body) -> Body { |
122 | Body { |
123 | inner: Inner::Streaming { |
124 | body: Box::pin(WrapHyper(body)), |
125 | timeout: None, |
126 | }, |
127 | } |
128 | } |
129 | |
130 | pub(crate) fn empty() -> Body { |
131 | Body::reusable(Bytes::new()) |
132 | } |
133 | |
134 | pub(crate) fn reusable(chunk: Bytes) -> Body { |
135 | Body { |
136 | inner: Inner::Reusable(chunk), |
137 | } |
138 | } |
139 | |
140 | pub(crate) fn try_reuse(self) -> (Option<Bytes>, Self) { |
141 | let reuse = match self.inner { |
142 | Inner::Reusable(ref chunk) => Some(chunk.clone()), |
143 | Inner::Streaming { .. } => None, |
144 | }; |
145 | |
146 | (reuse, self) |
147 | } |
148 | |
149 | pub(crate) fn try_clone(&self) -> Option<Body> { |
150 | match self.inner { |
151 | Inner::Reusable(ref chunk) => Some(Body::reusable(chunk.clone())), |
152 | Inner::Streaming { .. } => None, |
153 | } |
154 | } |
155 | |
156 | pub(crate) fn into_stream(self) -> ImplStream { |
157 | ImplStream(self) |
158 | } |
159 | |
160 | #[cfg (feature = "multipart" )] |
161 | pub(crate) fn content_length(&self) -> Option<u64> { |
162 | match self.inner { |
163 | Inner::Reusable(ref bytes) => Some(bytes.len() as u64), |
164 | Inner::Streaming { ref body, .. } => body.size_hint().exact(), |
165 | } |
166 | } |
167 | } |
168 | |
169 | impl From<hyper::Body> for Body { |
170 | #[inline ] |
171 | fn from(body: hyper::Body) -> Body { |
172 | Self { |
173 | inner: Inner::Streaming { |
174 | body: Box::pin(WrapHyper(body)), |
175 | timeout: None, |
176 | }, |
177 | } |
178 | } |
179 | } |
180 | |
181 | impl From<Bytes> for Body { |
182 | #[inline ] |
183 | fn from(bytes: Bytes) -> Body { |
184 | Body::reusable(chunk:bytes) |
185 | } |
186 | } |
187 | |
188 | impl From<Vec<u8>> for Body { |
189 | #[inline ] |
190 | fn from(vec: Vec<u8>) -> Body { |
191 | Body::reusable(chunk:vec.into()) |
192 | } |
193 | } |
194 | |
195 | impl From<&'static [u8]> for Body { |
196 | #[inline ] |
197 | fn from(s: &'static [u8]) -> Body { |
198 | Body::reusable(chunk:Bytes::from_static(bytes:s)) |
199 | } |
200 | } |
201 | |
202 | impl From<String> for Body { |
203 | #[inline ] |
204 | fn from(s: String) -> Body { |
205 | Body::reusable(chunk:s.into()) |
206 | } |
207 | } |
208 | |
209 | impl From<&'static str> for Body { |
210 | #[inline ] |
211 | fn from(s: &'static str) -> Body { |
212 | s.as_bytes().into() |
213 | } |
214 | } |
215 | |
/// Builds a streaming body from a `tokio::fs::File` by wrapping it in a
/// `ReaderStream`. Only available with the `stream` feature.
#[cfg(feature = "stream")]
#[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
impl From<File> for Body {
    #[inline]
    fn from(file: File) -> Body {
        let chunks = ReaderStream::new(file);
        Body::wrap_stream(chunks)
    }
}
224 | |
225 | impl fmt::Debug for Body { |
226 | fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |
227 | f.debug_struct(name:"Body" ).finish() |
228 | } |
229 | } |
230 | |
231 | // ===== impl ImplStream ===== |
232 | |
233 | impl HttpBody for ImplStream { |
234 | type Data = Bytes; |
235 | type Error = crate::Error; |
236 | |
237 | fn poll_data( |
238 | mut self: Pin<&mut Self>, |
239 | cx: &mut Context, |
240 | ) -> Poll<Option<Result<Self::Data, Self::Error>>> { |
241 | let opt_try_chunk = match self.0.inner { |
242 | Inner::Streaming { |
243 | ref mut body, |
244 | ref mut timeout, |
245 | } => { |
246 | if let Some(ref mut timeout) = timeout { |
247 | if let Poll::Ready(()) = timeout.as_mut().poll(cx) { |
248 | return Poll::Ready(Some(Err(crate::error::body(crate::error::TimedOut)))); |
249 | } |
250 | } |
251 | futures_core::ready!(Pin::new(body).poll_data(cx)) |
252 | .map(|opt_chunk| opt_chunk.map(Into::into).map_err(crate::error::body)) |
253 | } |
254 | Inner::Reusable(ref mut bytes) => { |
255 | if bytes.is_empty() { |
256 | None |
257 | } else { |
258 | Some(Ok(std::mem::replace(bytes, Bytes::new()))) |
259 | } |
260 | } |
261 | }; |
262 | |
263 | Poll::Ready(opt_try_chunk) |
264 | } |
265 | |
266 | fn poll_trailers( |
267 | self: Pin<&mut Self>, |
268 | _cx: &mut Context, |
269 | ) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> { |
270 | Poll::Ready(Ok(None)) |
271 | } |
272 | |
273 | fn is_end_stream(&self) -> bool { |
274 | match self.0.inner { |
275 | Inner::Streaming { ref body, .. } => body.is_end_stream(), |
276 | Inner::Reusable(ref bytes) => bytes.is_empty(), |
277 | } |
278 | } |
279 | |
280 | fn size_hint(&self) -> http_body::SizeHint { |
281 | match self.0.inner { |
282 | Inner::Streaming { ref body, .. } => body.size_hint(), |
283 | Inner::Reusable(ref bytes) => { |
284 | let mut hint = http_body::SizeHint::default(); |
285 | hint.set_exact(bytes.len() as u64); |
286 | hint |
287 | } |
288 | } |
289 | } |
290 | } |
291 | |
292 | impl Stream for ImplStream { |
293 | type Item = Result<Bytes, crate::Error>; |
294 | |
295 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { |
296 | self.poll_data(cx) |
297 | } |
298 | } |
299 | |
300 | // ===== impl WrapStream ===== |
301 | |
302 | impl<S, D, E> HttpBody for WrapStream<S> |
303 | where |
304 | S: Stream<Item = Result<D, E>>, |
305 | D: Into<Bytes>, |
306 | E: Into<Box<dyn std::error::Error + Send + Sync>>, |
307 | { |
308 | type Data = Bytes; |
309 | type Error = E; |
310 | |
311 | fn poll_data( |
312 | self: Pin<&mut Self>, |
313 | cx: &mut Context, |
314 | ) -> Poll<Option<Result<Self::Data, Self::Error>>> { |
315 | let item: Option = futures_core::ready!(self.project().inner.poll_next(cx)?); |
316 | |
317 | Poll::Ready(item.map(|val: D| Ok(val.into()))) |
318 | } |
319 | |
320 | fn poll_trailers( |
321 | self: Pin<&mut Self>, |
322 | _cx: &mut Context, |
323 | ) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> { |
324 | Poll::Ready(Ok(None)) |
325 | } |
326 | } |
327 | |
328 | // ===== impl WrapHyper ===== |
329 | |
330 | impl HttpBody for WrapHyper { |
331 | type Data = Bytes; |
332 | type Error = Box<dyn std::error::Error + Send + Sync>; |
333 | |
334 | fn poll_data( |
335 | mut self: Pin<&mut Self>, |
336 | cx: &mut Context, |
337 | ) -> Poll<Option<Result<Self::Data, Self::Error>>> { |
338 | // safe pin projection |
339 | Pin::new(&mut self.0) |
340 | .poll_data(cx) |
341 | .map(|opt| opt.map(|res| res.map_err(Into::into))) |
342 | } |
343 | |
344 | fn poll_trailers( |
345 | self: Pin<&mut Self>, |
346 | _cx: &mut Context, |
347 | ) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> { |
348 | Poll::Ready(Ok(None)) |
349 | } |
350 | |
351 | fn is_end_stream(&self) -> bool { |
352 | self.0.is_end_stream() |
353 | } |
354 | |
355 | fn size_hint(&self) -> http_body::SizeHint { |
356 | HttpBody::size_hint(&self.0) |
357 | } |
358 | } |
359 | |
#[cfg(test)]
mod tests {
    use super::Body;

    /// A body built from a static byte slice exposes those bytes through
    /// `as_bytes`.
    #[test]
    fn test_as_bytes() {
        let payload: &'static [u8] = b"Test body";

        let body = Body::from(payload);

        assert_eq!(body.as_bytes(), Some(payload));
    }
}
371 | |