1 | use std::fmt; |
2 | use std::future::Future; |
3 | use std::pin::Pin; |
4 | use std::task::{ready, Context, Poll}; |
5 | use std::time::Duration; |
6 | |
7 | use bytes::Bytes; |
8 | use http_body::Body as HttpBody; |
9 | use http_body_util::combinators::BoxBody; |
10 | //use sync_wrapper::SyncWrapper; |
11 | use pin_project_lite::pin_project; |
12 | #[cfg (feature = "stream" )] |
13 | use tokio::fs::File; |
14 | use tokio::time::Sleep; |
15 | #[cfg (feature = "stream" )] |
16 | use tokio_util::io::ReaderStream; |
17 | |
18 | /// An asynchronous request body. |
19 | pub struct Body { |
20 | inner: Inner, |
21 | } |
22 | |
23 | enum Inner { |
24 | Reusable(Bytes), |
25 | Streaming(BoxBody<Bytes, Box<dyn std::error::Error + Send + Sync>>), |
26 | } |
27 | |
28 | pin_project! { |
29 | /// A body with a total timeout. |
30 | /// |
31 | /// The timeout does not reset upon each chunk, but rather requires the whole |
32 | /// body be streamed before the deadline is reached. |
33 | pub(crate) struct TotalTimeoutBody<B> { |
34 | #[pin] |
35 | inner: B, |
36 | timeout: Pin<Box<Sleep>>, |
37 | } |
38 | } |
39 | |
40 | pin_project! { |
41 | pub(crate) struct ReadTimeoutBody<B> { |
42 | #[pin] |
43 | inner: B, |
44 | #[pin] |
45 | sleep: Option<Sleep>, |
46 | timeout: Duration, |
47 | } |
48 | } |
49 | |
/// Adapts any `impl Body` into an `impl Stream` that yields only its DATA frames.
#[cfg(any(feature = "stream", feature = "multipart"))]
pub(crate) struct DataStream<B>(pub(crate) B);
53 | |
54 | impl Body { |
55 | /// Returns a reference to the internal data of the `Body`. |
56 | /// |
57 | /// `None` is returned, if the underlying data is a stream. |
58 | pub fn as_bytes(&self) -> Option<&[u8]> { |
59 | match &self.inner { |
60 | Inner::Reusable(bytes) => Some(bytes.as_ref()), |
61 | Inner::Streaming(..) => None, |
62 | } |
63 | } |
64 | |
65 | /// Wrap a futures `Stream` in a box inside `Body`. |
66 | /// |
67 | /// # Example |
68 | /// |
69 | /// ``` |
70 | /// # use reqwest::Body; |
71 | /// # use futures_util; |
72 | /// # fn main() { |
73 | /// let chunks: Vec<Result<_, ::std::io::Error>> = vec![ |
74 | /// Ok("hello" ), |
75 | /// Ok(" " ), |
76 | /// Ok("world" ), |
77 | /// ]; |
78 | /// |
79 | /// let stream = futures_util::stream::iter(chunks); |
80 | /// |
81 | /// let body = Body::wrap_stream(stream); |
82 | /// # } |
83 | /// ``` |
84 | /// |
85 | /// # Optional |
86 | /// |
87 | /// This requires the `stream` feature to be enabled. |
88 | #[cfg (feature = "stream" )] |
89 | #[cfg_attr (docsrs, doc(cfg(feature = "stream" )))] |
90 | pub fn wrap_stream<S>(stream: S) -> Body |
91 | where |
92 | S: futures_core::stream::TryStream + Send + 'static, |
93 | S::Error: Into<Box<dyn std::error::Error + Send + Sync>>, |
94 | Bytes: From<S::Ok>, |
95 | { |
96 | Body::stream(stream) |
97 | } |
98 | |
99 | #[cfg (any(feature = "stream" , feature = "multipart" , feature = "blocking" ))] |
100 | pub(crate) fn stream<S>(stream: S) -> Body |
101 | where |
102 | S: futures_core::stream::TryStream + Send + 'static, |
103 | S::Error: Into<Box<dyn std::error::Error + Send + Sync>>, |
104 | Bytes: From<S::Ok>, |
105 | { |
106 | use futures_util::TryStreamExt; |
107 | use http_body::Frame; |
108 | use http_body_util::StreamBody; |
109 | |
110 | let body = http_body_util::BodyExt::boxed(StreamBody::new(sync_wrapper::SyncStream::new( |
111 | stream |
112 | .map_ok(|d| Frame::data(Bytes::from(d))) |
113 | .map_err(Into::into), |
114 | ))); |
115 | Body { |
116 | inner: Inner::Streaming(body), |
117 | } |
118 | } |
119 | |
120 | pub(crate) fn empty() -> Body { |
121 | Body::reusable(Bytes::new()) |
122 | } |
123 | |
124 | pub(crate) fn reusable(chunk: Bytes) -> Body { |
125 | Body { |
126 | inner: Inner::Reusable(chunk), |
127 | } |
128 | } |
129 | |
130 | /// Wrap a [`HttpBody`] in a box inside `Body`. |
131 | /// |
132 | /// # Example |
133 | /// |
134 | /// ``` |
135 | /// # use reqwest::Body; |
136 | /// # use futures_util; |
137 | /// # fn main() { |
138 | /// let content = "hello,world!" .to_string(); |
139 | /// |
140 | /// let body = Body::wrap(content); |
141 | /// # } |
142 | /// ``` |
143 | pub fn wrap<B>(inner: B) -> Body |
144 | where |
145 | B: HttpBody + Send + Sync + 'static, |
146 | B::Data: Into<Bytes>, |
147 | B::Error: Into<Box<dyn std::error::Error + Send + Sync>>, |
148 | { |
149 | use http_body_util::BodyExt; |
150 | |
151 | let boxed = IntoBytesBody { inner }.map_err(Into::into).boxed(); |
152 | |
153 | Body { |
154 | inner: Inner::Streaming(boxed), |
155 | } |
156 | } |
157 | |
158 | pub(crate) fn try_reuse(self) -> (Option<Bytes>, Self) { |
159 | let reuse = match self.inner { |
160 | Inner::Reusable(ref chunk) => Some(chunk.clone()), |
161 | Inner::Streaming { .. } => None, |
162 | }; |
163 | |
164 | (reuse, self) |
165 | } |
166 | |
167 | pub(crate) fn try_clone(&self) -> Option<Body> { |
168 | match self.inner { |
169 | Inner::Reusable(ref chunk) => Some(Body::reusable(chunk.clone())), |
170 | Inner::Streaming { .. } => None, |
171 | } |
172 | } |
173 | |
174 | #[cfg (feature = "multipart" )] |
175 | pub(crate) fn into_stream(self) -> DataStream<Body> { |
176 | DataStream(self) |
177 | } |
178 | |
179 | #[cfg (feature = "multipart" )] |
180 | pub(crate) fn content_length(&self) -> Option<u64> { |
181 | match self.inner { |
182 | Inner::Reusable(ref bytes) => Some(bytes.len() as u64), |
183 | Inner::Streaming(ref body) => body.size_hint().exact(), |
184 | } |
185 | } |
186 | } |
187 | |
188 | impl Default for Body { |
189 | #[inline ] |
190 | fn default() -> Body { |
191 | Body::empty() |
192 | } |
193 | } |
194 | |
195 | /* |
196 | impl From<hyper::Body> for Body { |
197 | #[inline] |
198 | fn from(body: hyper::Body) -> Body { |
199 | Self { |
200 | inner: Inner::Streaming { |
201 | body: Box::pin(WrapHyper(body)), |
202 | }, |
203 | } |
204 | } |
205 | } |
206 | */ |
207 | |
208 | impl From<Bytes> for Body { |
209 | #[inline ] |
210 | fn from(bytes: Bytes) -> Body { |
211 | Body::reusable(chunk:bytes) |
212 | } |
213 | } |
214 | |
215 | impl From<Vec<u8>> for Body { |
216 | #[inline ] |
217 | fn from(vec: Vec<u8>) -> Body { |
218 | Body::reusable(chunk:vec.into()) |
219 | } |
220 | } |
221 | |
222 | impl From<&'static [u8]> for Body { |
223 | #[inline ] |
224 | fn from(s: &'static [u8]) -> Body { |
225 | Body::reusable(chunk:Bytes::from_static(bytes:s)) |
226 | } |
227 | } |
228 | |
229 | impl From<String> for Body { |
230 | #[inline ] |
231 | fn from(s: String) -> Body { |
232 | Body::reusable(chunk:s.into()) |
233 | } |
234 | } |
235 | |
236 | impl From<&'static str> for Body { |
237 | #[inline ] |
238 | fn from(s: &'static str) -> Body { |
239 | s.as_bytes().into() |
240 | } |
241 | } |
242 | |
#[cfg(feature = "stream")]
#[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
impl From<File> for Body {
    /// Streams the file as the body by reading it through a `ReaderStream`.
    #[inline]
    fn from(file: File) -> Body {
        Body::wrap_stream(ReaderStream::new(file))
    }
}
251 | |
252 | impl fmt::Debug for Body { |
253 | fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |
254 | f.debug_struct(name:"Body" ).finish() |
255 | } |
256 | } |
257 | |
258 | impl HttpBody for Body { |
259 | type Data = Bytes; |
260 | type Error = crate::Error; |
261 | |
262 | fn poll_frame( |
263 | mut self: Pin<&mut Self>, |
264 | cx: &mut Context, |
265 | ) -> Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> { |
266 | match self.inner { |
267 | Inner::Reusable(ref mut bytes) => { |
268 | let out = bytes.split_off(0); |
269 | if out.is_empty() { |
270 | Poll::Ready(None) |
271 | } else { |
272 | Poll::Ready(Some(Ok(hyper::body::Frame::data(out)))) |
273 | } |
274 | } |
275 | Inner::Streaming(ref mut body) => Poll::Ready( |
276 | ready!(Pin::new(body).poll_frame(cx)) |
277 | .map(|opt_chunk| opt_chunk.map_err(crate::error::body)), |
278 | ), |
279 | } |
280 | } |
281 | |
282 | fn size_hint(&self) -> http_body::SizeHint { |
283 | match self.inner { |
284 | Inner::Reusable(ref bytes) => http_body::SizeHint::with_exact(bytes.len() as u64), |
285 | Inner::Streaming(ref body) => body.size_hint(), |
286 | } |
287 | } |
288 | |
289 | fn is_end_stream(&self) -> bool { |
290 | match self.inner { |
291 | Inner::Reusable(ref bytes) => bytes.is_empty(), |
292 | Inner::Streaming(ref body) => body.is_end_stream(), |
293 | } |
294 | } |
295 | } |
296 | |
297 | // ===== impl TotalTimeoutBody ===== |
298 | |
299 | pub(crate) fn total_timeout<B>(body: B, timeout: Pin<Box<Sleep>>) -> TotalTimeoutBody<B> { |
300 | TotalTimeoutBody { |
301 | inner: body, |
302 | timeout, |
303 | } |
304 | } |
305 | |
306 | pub(crate) fn with_read_timeout<B>(body: B, timeout: Duration) -> ReadTimeoutBody<B> { |
307 | ReadTimeoutBody { |
308 | inner: body, |
309 | sleep: None, |
310 | timeout, |
311 | } |
312 | } |
313 | |
314 | impl<B> hyper::body::Body for TotalTimeoutBody<B> |
315 | where |
316 | B: hyper::body::Body, |
317 | B::Error: Into<Box<dyn std::error::Error + Send + Sync>>, |
318 | { |
319 | type Data = B::Data; |
320 | type Error = crate::Error; |
321 | |
322 | fn poll_frame( |
323 | self: Pin<&mut Self>, |
324 | cx: &mut Context, |
325 | ) -> Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> { |
326 | let this = self.project(); |
327 | if let Poll::Ready(()) = this.timeout.as_mut().poll(cx) { |
328 | return Poll::Ready(Some(Err(crate::error::body(crate::error::TimedOut)))); |
329 | } |
330 | Poll::Ready( |
331 | ready!(this.inner.poll_frame(cx)) |
332 | .map(|opt_chunk| opt_chunk.map_err(crate::error::body)), |
333 | ) |
334 | } |
335 | |
336 | #[inline ] |
337 | fn size_hint(&self) -> http_body::SizeHint { |
338 | self.inner.size_hint() |
339 | } |
340 | |
341 | #[inline ] |
342 | fn is_end_stream(&self) -> bool { |
343 | self.inner.is_end_stream() |
344 | } |
345 | } |
346 | |
347 | impl<B> hyper::body::Body for ReadTimeoutBody<B> |
348 | where |
349 | B: hyper::body::Body, |
350 | B::Error: Into<Box<dyn std::error::Error + Send + Sync>>, |
351 | { |
352 | type Data = B::Data; |
353 | type Error = crate::Error; |
354 | |
355 | fn poll_frame( |
356 | self: Pin<&mut Self>, |
357 | cx: &mut Context, |
358 | ) -> Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> { |
359 | let mut this = self.project(); |
360 | |
361 | // Start the `Sleep` if not active. |
362 | let sleep_pinned = if let Some(some) = this.sleep.as_mut().as_pin_mut() { |
363 | some |
364 | } else { |
365 | this.sleep.set(Some(tokio::time::sleep(*this.timeout))); |
366 | this.sleep.as_mut().as_pin_mut().unwrap() |
367 | }; |
368 | |
369 | // Error if the timeout has expired. |
370 | if let Poll::Ready(()) = sleep_pinned.poll(cx) { |
371 | return Poll::Ready(Some(Err(crate::error::body(crate::error::TimedOut)))); |
372 | } |
373 | |
374 | let item = ready!(this.inner.poll_frame(cx)) |
375 | .map(|opt_chunk| opt_chunk.map_err(crate::error::body)); |
376 | // a ready frame means timeout is reset |
377 | this.sleep.set(None); |
378 | Poll::Ready(item) |
379 | } |
380 | |
381 | #[inline ] |
382 | fn size_hint(&self) -> http_body::SizeHint { |
383 | self.inner.size_hint() |
384 | } |
385 | |
386 | #[inline ] |
387 | fn is_end_stream(&self) -> bool { |
388 | self.inner.is_end_stream() |
389 | } |
390 | } |
391 | |
392 | pub(crate) type ResponseBody = |
393 | http_body_util::combinators::BoxBody<Bytes, Box<dyn std::error::Error + Send + Sync>>; |
394 | |
395 | pub(crate) fn boxed<B>(body: B) -> ResponseBody |
396 | where |
397 | B: hyper::body::Body<Data = Bytes> + Send + Sync + 'static, |
398 | B::Error: Into<Box<dyn std::error::Error + Send + Sync>>, |
399 | { |
400 | use http_body_util::BodyExt; |
401 | |
402 | body.map_err(box_err).boxed() |
403 | } |
404 | |
405 | pub(crate) fn response<B>( |
406 | body: B, |
407 | deadline: Option<Pin<Box<Sleep>>>, |
408 | read_timeout: Option<Duration>, |
409 | ) -> ResponseBody |
410 | where |
411 | B: hyper::body::Body<Data = Bytes> + Send + Sync + 'static, |
412 | B::Error: Into<Box<dyn std::error::Error + Send + Sync>>, |
413 | { |
414 | use http_body_util::BodyExt; |
415 | |
416 | match (deadline, read_timeout) { |
417 | (Some(total: Pin>), Some(read: Duration)) => { |
418 | let body: MapErr, …> = with_read_timeout(body, timeout:read).map_err(box_err); |
419 | total_timeout(body, timeout:total).map_err(box_err).boxed() |
420 | } |
421 | (Some(total: Pin>), None) => total_timeout(body, timeout:total).map_err(box_err).boxed(), |
422 | (None, Some(read: Duration)) => with_read_timeout(body, timeout:read).map_err(box_err).boxed(), |
423 | (None, None) => body.map_err(box_err).boxed(), |
424 | } |
425 | } |
426 | |
/// Converts any compatible error into a boxed `Error + Send + Sync` trait object.
///
/// A named function (rather than a closure) so it can be passed directly to
/// `map_err`.
fn box_err<E>(err: E) -> Box<dyn std::error::Error + Send + Sync>
where
    E: Into<Box<dyn std::error::Error + Send + Sync>>,
{
    Into::into(err)
}
433 | |
434 | // ===== impl DataStream ===== |
435 | |
#[cfg(any(feature = "stream", feature = "multipart"))]
impl<B> futures_core::Stream for DataStream<B>
where
    B: HttpBody<Data = Bytes> + Unpin,
{
    type Item = Result<Bytes, B::Error>;

    /// Yields the payload of the next DATA frame.
    ///
    /// Frames that are not data are skipped by polling the body again; errors
    /// and the end of the body are passed through unchanged.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        loop {
            match ready!(Pin::new(&mut self.0).poll_frame(cx)) {
                Some(Ok(frame)) => {
                    if let Ok(buf) = frame.into_data() {
                        return Poll::Ready(Some(Ok(buf)));
                    }
                    // Not a data frame: skip it and poll for the next one.
                }
                Some(Err(err)) => return Poll::Ready(Some(Err(err))),
                None => return Poll::Ready(None),
            }
        }
    }
}
460 | |
461 | // ===== impl IntoBytesBody ===== |
462 | |
463 | pin_project! { |
464 | struct IntoBytesBody<B> { |
465 | #[pin] |
466 | inner: B, |
467 | } |
468 | } |
469 | |
470 | // We can't use `map_frame()` because that loses the hint data (for good reason). |
471 | // But we aren't transforming the data. |
472 | impl<B> hyper::body::Body for IntoBytesBody<B> |
473 | where |
474 | B: hyper::body::Body, |
475 | B::Data: Into<Bytes>, |
476 | { |
477 | type Data = Bytes; |
478 | type Error = B::Error; |
479 | |
480 | fn poll_frame( |
481 | self: Pin<&mut Self>, |
482 | cx: &mut Context, |
483 | ) -> Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> { |
484 | match ready!(self.project().inner.poll_frame(cx)) { |
485 | Some(Ok(f)) => Poll::Ready(Some(Ok(f.map_data(Into::into)))), |
486 | Some(Err(e)) => Poll::Ready(Some(Err(e))), |
487 | None => Poll::Ready(None), |
488 | } |
489 | } |
490 | |
491 | #[inline ] |
492 | fn size_hint(&self) -> http_body::SizeHint { |
493 | self.inner.size_hint() |
494 | } |
495 | |
496 | #[inline ] |
497 | fn is_end_stream(&self) -> bool { |
498 | self.inner.is_end_stream() |
499 | } |
500 | } |
501 | |
#[cfg(test)]
mod tests {
    use http_body::Body as _;

    use super::Body;

    /// A body built from a static slice exposes exactly those bytes.
    #[test]
    fn test_as_bytes() {
        let expected: &'static [u8] = b"Test body";
        let body = Body::from(expected);
        assert_eq!(body.as_bytes(), Some(expected));
    }

    /// Buffered bodies report an exact size, including when wrapped.
    #[test]
    fn body_exact_length() {
        let empty = Body::empty();
        assert!(empty.is_end_stream());
        assert_eq!(empty.size_hint().exact(), Some(0));

        let three_bytes = Body::reusable("abc".into());
        assert!(!three_bytes.is_end_stream());
        assert_eq!(three_bytes.size_hint().exact(), Some(3));

        // The hint is still delegated after wrapping into a streaming body.
        let wrapped = Body::wrap(empty);
        assert!(wrapped.is_end_stream());
        assert_eq!(wrapped.size_hint().exact(), Some(0));
    }
}
531 | |