1 | use std::fmt; |
2 | use std::future::Future; |
3 | use std::pin::Pin; |
4 | use std::task::{Context, Poll}; |
5 | |
6 | use bytes::Bytes; |
7 | use futures_core::Stream; |
8 | use http_body::Body as HttpBody; |
9 | use pin_project_lite::pin_project; |
10 | use sync_wrapper::SyncWrapper; |
11 | #[cfg (feature = "stream" )] |
12 | use tokio::fs::File; |
13 | use tokio::time::Sleep; |
14 | #[cfg (feature = "stream" )] |
15 | use tokio_util::io::ReaderStream; |
16 | |
/// An asynchronous request body.
///
/// The actual storage lives in the private `Inner` enum: either buffered
/// bytes or a boxed stream.
pub struct Body {
    // Buffered (`Inner::Reusable`) or streaming (`Inner::Streaming`) storage.
    inner: Inner,
}
21 | |
// The `Stream` trait isn't stable, so the impl isn't public.
//
// Newtype around `Body` that carries the `HttpBody` and `Stream`
// implementations; obtained through `Body::into_stream`.
pub(crate) struct ImplStream(Body);
24 | |
// Backing storage of a `Body`.
enum Inner {
    // In-memory bytes. `Body::try_clone` and `Body::try_reuse` hand out
    // clones of this chunk.
    Reusable(Bytes),
    // A boxed `HttpBody` producing `Bytes` chunks, with an optional timer.
    Streaming {
        // The type-erased body; errors are boxed `std::error::Error`s.
        body: Pin<
            Box<
                dyn HttpBody<Data = Bytes, Error = Box<dyn std::error::Error + Send + Sync>>
                    + Send
                    + Sync,
            >,
        >,
        // When present, `ImplStream::poll_data` polls this timer before the
        // body and yields a `TimedOut` body error once it completes.
        timeout: Option<Pin<Box<Sleep>>>,
    },
}
38 | |
// Adapter that exposes a `Stream` of `Result<D, E>` items as an `HttpBody`
// (see the `HttpBody for WrapStream<S>` impl).
pin_project! {
    struct WrapStream<S> {
        // NOTE(review): presumably wrapped in `SyncWrapper` so that a stream
        // that is only `Send` can satisfy the `Sync` bound on
        // `Inner::Streaming::body` — confirm against `sync_wrapper` docs.
        #[pin]
        inner: SyncWrapper<S>,
    }
}
45 | |
// Newtype around `hyper::Body`; its `HttpBody` impl forwards to the inner
// body and converts the error into a boxed `std::error::Error`.
struct WrapHyper(hyper::Body);
47 | |
48 | impl Body { |
49 | /// Returns a reference to the internal data of the `Body`. |
50 | /// |
51 | /// `None` is returned, if the underlying data is a stream. |
52 | pub fn as_bytes(&self) -> Option<&[u8]> { |
53 | match &self.inner { |
54 | Inner::Reusable(bytes) => Some(bytes.as_ref()), |
55 | Inner::Streaming { .. } => None, |
56 | } |
57 | } |
58 | |
59 | /// Wrap a futures `Stream` in a box inside `Body`. |
60 | /// |
61 | /// # Example |
62 | /// |
63 | /// ``` |
64 | /// # use reqwest::Body; |
65 | /// # use futures_util; |
66 | /// # fn main() { |
67 | /// let chunks: Vec<Result<_, ::std::io::Error>> = vec![ |
68 | /// Ok("hello" ), |
69 | /// Ok(" " ), |
70 | /// Ok("world" ), |
71 | /// ]; |
72 | /// |
73 | /// let stream = futures_util::stream::iter(chunks); |
74 | /// |
75 | /// let body = Body::wrap_stream(stream); |
76 | /// # } |
77 | /// ``` |
78 | /// |
79 | /// # Optional |
80 | /// |
81 | /// This requires the `stream` feature to be enabled. |
82 | #[cfg (feature = "stream" )] |
83 | #[cfg_attr (docsrs, doc(cfg(feature = "stream" )))] |
84 | pub fn wrap_stream<S>(stream: S) -> Body |
85 | where |
86 | S: futures_core::stream::TryStream + Send + 'static, |
87 | S::Error: Into<Box<dyn std::error::Error + Send + Sync>>, |
88 | Bytes: From<S::Ok>, |
89 | { |
90 | Body::stream(stream) |
91 | } |
92 | |
93 | pub(crate) fn stream<S>(stream: S) -> Body |
94 | where |
95 | S: futures_core::stream::TryStream + Send + 'static, |
96 | S::Error: Into<Box<dyn std::error::Error + Send + Sync>>, |
97 | Bytes: From<S::Ok>, |
98 | { |
99 | use futures_util::TryStreamExt; |
100 | |
101 | let body = Box::pin(WrapStream { |
102 | inner: SyncWrapper::new(stream.map_ok(Bytes::from).map_err(Into::into)), |
103 | }); |
104 | Body { |
105 | inner: Inner::Streaming { |
106 | body, |
107 | timeout: None, |
108 | }, |
109 | } |
110 | } |
111 | |
112 | pub(crate) fn response(body: hyper::Body, timeout: Option<Pin<Box<Sleep>>>) -> Body { |
113 | Body { |
114 | inner: Inner::Streaming { |
115 | body: Box::pin(WrapHyper(body)), |
116 | timeout, |
117 | }, |
118 | } |
119 | } |
120 | |
121 | #[cfg (feature = "blocking" )] |
122 | pub(crate) fn wrap(body: hyper::Body) -> Body { |
123 | Body { |
124 | inner: Inner::Streaming { |
125 | body: Box::pin(WrapHyper(body)), |
126 | timeout: None, |
127 | }, |
128 | } |
129 | } |
130 | |
131 | pub(crate) fn empty() -> Body { |
132 | Body::reusable(Bytes::new()) |
133 | } |
134 | |
135 | pub(crate) fn reusable(chunk: Bytes) -> Body { |
136 | Body { |
137 | inner: Inner::Reusable(chunk), |
138 | } |
139 | } |
140 | |
141 | pub(crate) fn try_reuse(self) -> (Option<Bytes>, Self) { |
142 | let reuse = match self.inner { |
143 | Inner::Reusable(ref chunk) => Some(chunk.clone()), |
144 | Inner::Streaming { .. } => None, |
145 | }; |
146 | |
147 | (reuse, self) |
148 | } |
149 | |
150 | pub(crate) fn try_clone(&self) -> Option<Body> { |
151 | match self.inner { |
152 | Inner::Reusable(ref chunk) => Some(Body::reusable(chunk.clone())), |
153 | Inner::Streaming { .. } => None, |
154 | } |
155 | } |
156 | |
157 | pub(crate) fn into_stream(self) -> ImplStream { |
158 | ImplStream(self) |
159 | } |
160 | |
161 | #[cfg (feature = "multipart" )] |
162 | pub(crate) fn content_length(&self) -> Option<u64> { |
163 | match self.inner { |
164 | Inner::Reusable(ref bytes) => Some(bytes.len() as u64), |
165 | Inner::Streaming { ref body, .. } => body.size_hint().exact(), |
166 | } |
167 | } |
168 | } |
169 | |
170 | impl From<hyper::Body> for Body { |
171 | #[inline ] |
172 | fn from(body: hyper::Body) -> Body { |
173 | Self { |
174 | inner: Inner::Streaming { |
175 | body: Box::pin(WrapHyper(body)), |
176 | timeout: None, |
177 | }, |
178 | } |
179 | } |
180 | } |
181 | |
182 | impl From<Bytes> for Body { |
183 | #[inline ] |
184 | fn from(bytes: Bytes) -> Body { |
185 | Body::reusable(chunk:bytes) |
186 | } |
187 | } |
188 | |
189 | impl From<Vec<u8>> for Body { |
190 | #[inline ] |
191 | fn from(vec: Vec<u8>) -> Body { |
192 | Body::reusable(chunk:vec.into()) |
193 | } |
194 | } |
195 | |
196 | impl From<&'static [u8]> for Body { |
197 | #[inline ] |
198 | fn from(s: &'static [u8]) -> Body { |
199 | Body::reusable(chunk:Bytes::from_static(bytes:s)) |
200 | } |
201 | } |
202 | |
203 | impl From<String> for Body { |
204 | #[inline ] |
205 | fn from(s: String) -> Body { |
206 | Body::reusable(chunk:s.into()) |
207 | } |
208 | } |
209 | |
210 | impl From<&'static str> for Body { |
211 | #[inline ] |
212 | fn from(s: &'static str) -> Body { |
213 | s.as_bytes().into() |
214 | } |
215 | } |
216 | |
#[cfg(feature = "stream")]
#[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
impl From<File> for Body {
    /// Builds a streaming body that reads `file` through a `ReaderStream`.
    ///
    /// Only available with the `stream` feature.
    #[inline]
    fn from(file: File) -> Body {
        Body::wrap_stream(ReaderStream::new(file))
    }
}
225 | |
226 | impl fmt::Debug for Body { |
227 | fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |
228 | f.debug_struct(name:"Body" ).finish() |
229 | } |
230 | } |
231 | |
232 | // ===== impl ImplStream ===== |
233 | |
234 | impl HttpBody for ImplStream { |
235 | type Data = Bytes; |
236 | type Error = crate::Error; |
237 | |
238 | fn poll_data( |
239 | mut self: Pin<&mut Self>, |
240 | cx: &mut Context, |
241 | ) -> Poll<Option<Result<Self::Data, Self::Error>>> { |
242 | let opt_try_chunk = match self.0.inner { |
243 | Inner::Streaming { |
244 | ref mut body, |
245 | ref mut timeout, |
246 | } => { |
247 | if let Some(ref mut timeout) = timeout { |
248 | if let Poll::Ready(()) = timeout.as_mut().poll(cx) { |
249 | return Poll::Ready(Some(Err(crate::error::body(crate::error::TimedOut)))); |
250 | } |
251 | } |
252 | futures_core::ready!(Pin::new(body).poll_data(cx)) |
253 | .map(|opt_chunk| opt_chunk.map(Into::into).map_err(crate::error::body)) |
254 | } |
255 | Inner::Reusable(ref mut bytes) => { |
256 | if bytes.is_empty() { |
257 | None |
258 | } else { |
259 | Some(Ok(std::mem::replace(bytes, Bytes::new()))) |
260 | } |
261 | } |
262 | }; |
263 | |
264 | Poll::Ready(opt_try_chunk) |
265 | } |
266 | |
267 | fn poll_trailers( |
268 | self: Pin<&mut Self>, |
269 | _cx: &mut Context, |
270 | ) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> { |
271 | Poll::Ready(Ok(None)) |
272 | } |
273 | |
274 | fn is_end_stream(&self) -> bool { |
275 | match self.0.inner { |
276 | Inner::Streaming { ref body, .. } => body.is_end_stream(), |
277 | Inner::Reusable(ref bytes) => bytes.is_empty(), |
278 | } |
279 | } |
280 | |
281 | fn size_hint(&self) -> http_body::SizeHint { |
282 | match self.0.inner { |
283 | Inner::Streaming { ref body, .. } => body.size_hint(), |
284 | Inner::Reusable(ref bytes) => { |
285 | let mut hint = http_body::SizeHint::default(); |
286 | hint.set_exact(bytes.len() as u64); |
287 | hint |
288 | } |
289 | } |
290 | } |
291 | } |
292 | |
293 | impl Stream for ImplStream { |
294 | type Item = Result<Bytes, crate::Error>; |
295 | |
296 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { |
297 | self.poll_data(cx) |
298 | } |
299 | } |
300 | |
301 | // ===== impl WrapStream ===== |
302 | |
303 | impl<S, D, E> HttpBody for WrapStream<S> |
304 | where |
305 | S: Stream<Item = Result<D, E>>, |
306 | D: Into<Bytes>, |
307 | E: Into<Box<dyn std::error::Error + Send + Sync>>, |
308 | { |
309 | type Data = Bytes; |
310 | type Error = E; |
311 | |
312 | fn poll_data( |
313 | self: Pin<&mut Self>, |
314 | cx: &mut Context, |
315 | ) -> Poll<Option<Result<Self::Data, Self::Error>>> { |
316 | let item: Option = futures_core::ready!(self.project().inner.get_pin_mut().poll_next(cx)?); |
317 | |
318 | Poll::Ready(item.map(|val: D| Ok(val.into()))) |
319 | } |
320 | |
321 | fn poll_trailers( |
322 | self: Pin<&mut Self>, |
323 | _cx: &mut Context, |
324 | ) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> { |
325 | Poll::Ready(Ok(None)) |
326 | } |
327 | } |
328 | |
329 | // ===== impl WrapHyper ===== |
330 | |
331 | impl HttpBody for WrapHyper { |
332 | type Data = Bytes; |
333 | type Error = Box<dyn std::error::Error + Send + Sync>; |
334 | |
335 | fn poll_data( |
336 | mut self: Pin<&mut Self>, |
337 | cx: &mut Context, |
338 | ) -> Poll<Option<Result<Self::Data, Self::Error>>> { |
339 | // safe pin projection |
340 | Pin::new(&mut self.0) |
341 | .poll_data(cx) |
342 | .map(|opt| opt.map(|res| res.map_err(Into::into))) |
343 | } |
344 | |
345 | fn poll_trailers( |
346 | self: Pin<&mut Self>, |
347 | _cx: &mut Context, |
348 | ) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> { |
349 | Poll::Ready(Ok(None)) |
350 | } |
351 | |
352 | fn is_end_stream(&self) -> bool { |
353 | self.0.is_end_stream() |
354 | } |
355 | |
356 | fn size_hint(&self) -> http_body::SizeHint { |
357 | HttpBody::size_hint(&self.0) |
358 | } |
359 | } |
360 | |
#[cfg(test)]
mod tests {
    use super::Body;

    /// A body built from a static byte slice exposes those same bytes.
    #[test]
    fn test_as_bytes() {
        let payload: &'static [u8] = b"Test body";
        let body = Body::from(payload);

        assert_eq!(body.as_bytes(), Some(payload));
    }
}
372 | |