| 1 | use std::fmt; |
| 2 | use std::future::Future; |
| 3 | use std::pin::Pin; |
| 4 | use std::task::{ready, Context, Poll}; |
| 5 | use std::time::Duration; |
| 6 | |
| 7 | use bytes::Bytes; |
| 8 | use http_body::Body as HttpBody; |
| 9 | use http_body_util::combinators::BoxBody; |
| 10 | //use sync_wrapper::SyncWrapper; |
| 11 | use pin_project_lite::pin_project; |
| 12 | #[cfg (feature = "stream" )] |
| 13 | use tokio::fs::File; |
| 14 | use tokio::time::Sleep; |
| 15 | #[cfg (feature = "stream" )] |
| 16 | use tokio_util::io::ReaderStream; |
| 17 | |
| 18 | /// An asynchronous request body. |
| 19 | pub struct Body { |
| 20 | inner: Inner, |
| 21 | } |
| 22 | |
| 23 | enum Inner { |
| 24 | Reusable(Bytes), |
| 25 | Streaming(BoxBody<Bytes, Box<dyn std::error::Error + Send + Sync>>), |
| 26 | } |
| 27 | |
| 28 | pin_project! { |
| 29 | /// A body with a total timeout. |
| 30 | /// |
| 31 | /// The timeout does not reset upon each chunk, but rather requires the whole |
| 32 | /// body be streamed before the deadline is reached. |
| 33 | pub(crate) struct TotalTimeoutBody<B> { |
| 34 | #[pin] |
| 35 | inner: B, |
| 36 | timeout: Pin<Box<Sleep>>, |
| 37 | } |
| 38 | } |
| 39 | |
| 40 | pin_project! { |
| 41 | pub(crate) struct ReadTimeoutBody<B> { |
| 42 | #[pin] |
| 43 | inner: B, |
| 44 | #[pin] |
| 45 | sleep: Option<Sleep>, |
| 46 | timeout: Duration, |
| 47 | } |
| 48 | } |
| 49 | |
| 50 | /// Converts any `impl Body` into a `impl Stream` of just its DATA frames. |
| 51 | #[cfg (any(feature = "stream" , feature = "multipart" ,))] |
| 52 | pub(crate) struct DataStream<B>(pub(crate) B); |
| 53 | |
| 54 | impl Body { |
| 55 | /// Returns a reference to the internal data of the `Body`. |
| 56 | /// |
| 57 | /// `None` is returned, if the underlying data is a stream. |
| 58 | pub fn as_bytes(&self) -> Option<&[u8]> { |
| 59 | match &self.inner { |
| 60 | Inner::Reusable(bytes) => Some(bytes.as_ref()), |
| 61 | Inner::Streaming(..) => None, |
| 62 | } |
| 63 | } |
| 64 | |
| 65 | /// Wrap a futures `Stream` in a box inside `Body`. |
| 66 | /// |
| 67 | /// # Example |
| 68 | /// |
| 69 | /// ``` |
| 70 | /// # use reqwest::Body; |
| 71 | /// # use futures_util; |
| 72 | /// # fn main() { |
| 73 | /// let chunks: Vec<Result<_, ::std::io::Error>> = vec![ |
| 74 | /// Ok("hello" ), |
| 75 | /// Ok(" " ), |
| 76 | /// Ok("world" ), |
| 77 | /// ]; |
| 78 | /// |
| 79 | /// let stream = futures_util::stream::iter(chunks); |
| 80 | /// |
| 81 | /// let body = Body::wrap_stream(stream); |
| 82 | /// # } |
| 83 | /// ``` |
| 84 | /// |
| 85 | /// # Optional |
| 86 | /// |
| 87 | /// This requires the `stream` feature to be enabled. |
| 88 | #[cfg (feature = "stream" )] |
| 89 | #[cfg_attr (docsrs, doc(cfg(feature = "stream" )))] |
| 90 | pub fn wrap_stream<S>(stream: S) -> Body |
| 91 | where |
| 92 | S: futures_core::stream::TryStream + Send + 'static, |
| 93 | S::Error: Into<Box<dyn std::error::Error + Send + Sync>>, |
| 94 | Bytes: From<S::Ok>, |
| 95 | { |
| 96 | Body::stream(stream) |
| 97 | } |
| 98 | |
| 99 | #[cfg (any(feature = "stream" , feature = "multipart" , feature = "blocking" ))] |
| 100 | pub(crate) fn stream<S>(stream: S) -> Body |
| 101 | where |
| 102 | S: futures_core::stream::TryStream + Send + 'static, |
| 103 | S::Error: Into<Box<dyn std::error::Error + Send + Sync>>, |
| 104 | Bytes: From<S::Ok>, |
| 105 | { |
| 106 | use futures_util::TryStreamExt; |
| 107 | use http_body::Frame; |
| 108 | use http_body_util::StreamBody; |
| 109 | |
| 110 | let body = http_body_util::BodyExt::boxed(StreamBody::new(sync_wrapper::SyncStream::new( |
| 111 | stream |
| 112 | .map_ok(|d| Frame::data(Bytes::from(d))) |
| 113 | .map_err(Into::into), |
| 114 | ))); |
| 115 | Body { |
| 116 | inner: Inner::Streaming(body), |
| 117 | } |
| 118 | } |
| 119 | |
| 120 | pub(crate) fn empty() -> Body { |
| 121 | Body::reusable(Bytes::new()) |
| 122 | } |
| 123 | |
| 124 | pub(crate) fn reusable(chunk: Bytes) -> Body { |
| 125 | Body { |
| 126 | inner: Inner::Reusable(chunk), |
| 127 | } |
| 128 | } |
| 129 | |
| 130 | /// Wrap a [`HttpBody`] in a box inside `Body`. |
| 131 | /// |
| 132 | /// # Example |
| 133 | /// |
| 134 | /// ``` |
| 135 | /// # use reqwest::Body; |
| 136 | /// # use futures_util; |
| 137 | /// # fn main() { |
| 138 | /// let content = "hello,world!" .to_string(); |
| 139 | /// |
| 140 | /// let body = Body::wrap(content); |
| 141 | /// # } |
| 142 | /// ``` |
| 143 | pub fn wrap<B>(inner: B) -> Body |
| 144 | where |
| 145 | B: HttpBody + Send + Sync + 'static, |
| 146 | B::Data: Into<Bytes>, |
| 147 | B::Error: Into<Box<dyn std::error::Error + Send + Sync>>, |
| 148 | { |
| 149 | use http_body_util::BodyExt; |
| 150 | |
| 151 | let boxed = IntoBytesBody { inner }.map_err(Into::into).boxed(); |
| 152 | |
| 153 | Body { |
| 154 | inner: Inner::Streaming(boxed), |
| 155 | } |
| 156 | } |
| 157 | |
| 158 | pub(crate) fn try_reuse(self) -> (Option<Bytes>, Self) { |
| 159 | let reuse = match self.inner { |
| 160 | Inner::Reusable(ref chunk) => Some(chunk.clone()), |
| 161 | Inner::Streaming { .. } => None, |
| 162 | }; |
| 163 | |
| 164 | (reuse, self) |
| 165 | } |
| 166 | |
| 167 | pub(crate) fn try_clone(&self) -> Option<Body> { |
| 168 | match self.inner { |
| 169 | Inner::Reusable(ref chunk) => Some(Body::reusable(chunk.clone())), |
| 170 | Inner::Streaming { .. } => None, |
| 171 | } |
| 172 | } |
| 173 | |
| 174 | #[cfg (feature = "multipart" )] |
| 175 | pub(crate) fn into_stream(self) -> DataStream<Body> { |
| 176 | DataStream(self) |
| 177 | } |
| 178 | |
| 179 | #[cfg (feature = "multipart" )] |
| 180 | pub(crate) fn content_length(&self) -> Option<u64> { |
| 181 | match self.inner { |
| 182 | Inner::Reusable(ref bytes) => Some(bytes.len() as u64), |
| 183 | Inner::Streaming(ref body) => body.size_hint().exact(), |
| 184 | } |
| 185 | } |
| 186 | } |
| 187 | |
| 188 | impl Default for Body { |
| 189 | #[inline ] |
| 190 | fn default() -> Body { |
| 191 | Body::empty() |
| 192 | } |
| 193 | } |
| 194 | |
| 195 | /* |
| 196 | impl From<hyper::Body> for Body { |
| 197 | #[inline] |
| 198 | fn from(body: hyper::Body) -> Body { |
| 199 | Self { |
| 200 | inner: Inner::Streaming { |
| 201 | body: Box::pin(WrapHyper(body)), |
| 202 | }, |
| 203 | } |
| 204 | } |
| 205 | } |
| 206 | */ |
| 207 | |
| 208 | impl From<Bytes> for Body { |
| 209 | #[inline ] |
| 210 | fn from(bytes: Bytes) -> Body { |
| 211 | Body::reusable(chunk:bytes) |
| 212 | } |
| 213 | } |
| 214 | |
| 215 | impl From<Vec<u8>> for Body { |
| 216 | #[inline ] |
| 217 | fn from(vec: Vec<u8>) -> Body { |
| 218 | Body::reusable(chunk:vec.into()) |
| 219 | } |
| 220 | } |
| 221 | |
| 222 | impl From<&'static [u8]> for Body { |
| 223 | #[inline ] |
| 224 | fn from(s: &'static [u8]) -> Body { |
| 225 | Body::reusable(chunk:Bytes::from_static(bytes:s)) |
| 226 | } |
| 227 | } |
| 228 | |
| 229 | impl From<String> for Body { |
| 230 | #[inline ] |
| 231 | fn from(s: String) -> Body { |
| 232 | Body::reusable(chunk:s.into()) |
| 233 | } |
| 234 | } |
| 235 | |
| 236 | impl From<&'static str> for Body { |
| 237 | #[inline ] |
| 238 | fn from(s: &'static str) -> Body { |
| 239 | s.as_bytes().into() |
| 240 | } |
| 241 | } |
| 242 | |
| 243 | #[cfg (feature = "stream" )] |
| 244 | #[cfg_attr (docsrs, doc(cfg(feature = "stream" )))] |
| 245 | impl From<File> for Body { |
| 246 | #[inline ] |
| 247 | fn from(file: File) -> Body { |
| 248 | Body::wrap_stream(ReaderStream::new(reader:file)) |
| 249 | } |
| 250 | } |
| 251 | |
| 252 | impl fmt::Debug for Body { |
| 253 | fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |
| 254 | f.debug_struct(name:"Body" ).finish() |
| 255 | } |
| 256 | } |
| 257 | |
| 258 | impl HttpBody for Body { |
| 259 | type Data = Bytes; |
| 260 | type Error = crate::Error; |
| 261 | |
| 262 | fn poll_frame( |
| 263 | mut self: Pin<&mut Self>, |
| 264 | cx: &mut Context, |
| 265 | ) -> Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> { |
| 266 | match self.inner { |
| 267 | Inner::Reusable(ref mut bytes) => { |
| 268 | let out = bytes.split_off(0); |
| 269 | if out.is_empty() { |
| 270 | Poll::Ready(None) |
| 271 | } else { |
| 272 | Poll::Ready(Some(Ok(hyper::body::Frame::data(out)))) |
| 273 | } |
| 274 | } |
| 275 | Inner::Streaming(ref mut body) => Poll::Ready( |
| 276 | ready!(Pin::new(body).poll_frame(cx)) |
| 277 | .map(|opt_chunk| opt_chunk.map_err(crate::error::body)), |
| 278 | ), |
| 279 | } |
| 280 | } |
| 281 | |
| 282 | fn size_hint(&self) -> http_body::SizeHint { |
| 283 | match self.inner { |
| 284 | Inner::Reusable(ref bytes) => http_body::SizeHint::with_exact(bytes.len() as u64), |
| 285 | Inner::Streaming(ref body) => body.size_hint(), |
| 286 | } |
| 287 | } |
| 288 | |
| 289 | fn is_end_stream(&self) -> bool { |
| 290 | match self.inner { |
| 291 | Inner::Reusable(ref bytes) => bytes.is_empty(), |
| 292 | Inner::Streaming(ref body) => body.is_end_stream(), |
| 293 | } |
| 294 | } |
| 295 | } |
| 296 | |
| 297 | // ===== impl TotalTimeoutBody ===== |
| 298 | |
| 299 | pub(crate) fn total_timeout<B>(body: B, timeout: Pin<Box<Sleep>>) -> TotalTimeoutBody<B> { |
| 300 | TotalTimeoutBody { |
| 301 | inner: body, |
| 302 | timeout, |
| 303 | } |
| 304 | } |
| 305 | |
| 306 | pub(crate) fn with_read_timeout<B>(body: B, timeout: Duration) -> ReadTimeoutBody<B> { |
| 307 | ReadTimeoutBody { |
| 308 | inner: body, |
| 309 | sleep: None, |
| 310 | timeout, |
| 311 | } |
| 312 | } |
| 313 | |
| 314 | impl<B> hyper::body::Body for TotalTimeoutBody<B> |
| 315 | where |
| 316 | B: hyper::body::Body, |
| 317 | B::Error: Into<Box<dyn std::error::Error + Send + Sync>>, |
| 318 | { |
| 319 | type Data = B::Data; |
| 320 | type Error = crate::Error; |
| 321 | |
| 322 | fn poll_frame( |
| 323 | self: Pin<&mut Self>, |
| 324 | cx: &mut Context, |
| 325 | ) -> Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> { |
| 326 | let this = self.project(); |
| 327 | if let Poll::Ready(()) = this.timeout.as_mut().poll(cx) { |
| 328 | return Poll::Ready(Some(Err(crate::error::body(crate::error::TimedOut)))); |
| 329 | } |
| 330 | Poll::Ready( |
| 331 | ready!(this.inner.poll_frame(cx)) |
| 332 | .map(|opt_chunk| opt_chunk.map_err(crate::error::body)), |
| 333 | ) |
| 334 | } |
| 335 | |
| 336 | #[inline ] |
| 337 | fn size_hint(&self) -> http_body::SizeHint { |
| 338 | self.inner.size_hint() |
| 339 | } |
| 340 | |
| 341 | #[inline ] |
| 342 | fn is_end_stream(&self) -> bool { |
| 343 | self.inner.is_end_stream() |
| 344 | } |
| 345 | } |
| 346 | |
| 347 | impl<B> hyper::body::Body for ReadTimeoutBody<B> |
| 348 | where |
| 349 | B: hyper::body::Body, |
| 350 | B::Error: Into<Box<dyn std::error::Error + Send + Sync>>, |
| 351 | { |
| 352 | type Data = B::Data; |
| 353 | type Error = crate::Error; |
| 354 | |
| 355 | fn poll_frame( |
| 356 | self: Pin<&mut Self>, |
| 357 | cx: &mut Context, |
| 358 | ) -> Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> { |
| 359 | let mut this = self.project(); |
| 360 | |
| 361 | // Start the `Sleep` if not active. |
| 362 | let sleep_pinned = if let Some(some) = this.sleep.as_mut().as_pin_mut() { |
| 363 | some |
| 364 | } else { |
| 365 | this.sleep.set(Some(tokio::time::sleep(*this.timeout))); |
| 366 | this.sleep.as_mut().as_pin_mut().unwrap() |
| 367 | }; |
| 368 | |
| 369 | // Error if the timeout has expired. |
| 370 | if let Poll::Ready(()) = sleep_pinned.poll(cx) { |
| 371 | return Poll::Ready(Some(Err(crate::error::body(crate::error::TimedOut)))); |
| 372 | } |
| 373 | |
| 374 | let item = ready!(this.inner.poll_frame(cx)) |
| 375 | .map(|opt_chunk| opt_chunk.map_err(crate::error::body)); |
| 376 | // a ready frame means timeout is reset |
| 377 | this.sleep.set(None); |
| 378 | Poll::Ready(item) |
| 379 | } |
| 380 | |
| 381 | #[inline ] |
| 382 | fn size_hint(&self) -> http_body::SizeHint { |
| 383 | self.inner.size_hint() |
| 384 | } |
| 385 | |
| 386 | #[inline ] |
| 387 | fn is_end_stream(&self) -> bool { |
| 388 | self.inner.is_end_stream() |
| 389 | } |
| 390 | } |
| 391 | |
| 392 | pub(crate) type ResponseBody = |
| 393 | http_body_util::combinators::BoxBody<Bytes, Box<dyn std::error::Error + Send + Sync>>; |
| 394 | |
| 395 | pub(crate) fn boxed<B>(body: B) -> ResponseBody |
| 396 | where |
| 397 | B: hyper::body::Body<Data = Bytes> + Send + Sync + 'static, |
| 398 | B::Error: Into<Box<dyn std::error::Error + Send + Sync>>, |
| 399 | { |
| 400 | use http_body_util::BodyExt; |
| 401 | |
| 402 | body.map_err(box_err).boxed() |
| 403 | } |
| 404 | |
| 405 | pub(crate) fn response<B>( |
| 406 | body: B, |
| 407 | deadline: Option<Pin<Box<Sleep>>>, |
| 408 | read_timeout: Option<Duration>, |
| 409 | ) -> ResponseBody |
| 410 | where |
| 411 | B: hyper::body::Body<Data = Bytes> + Send + Sync + 'static, |
| 412 | B::Error: Into<Box<dyn std::error::Error + Send + Sync>>, |
| 413 | { |
| 414 | use http_body_util::BodyExt; |
| 415 | |
| 416 | match (deadline, read_timeout) { |
| 417 | (Some(total: Pin>), Some(read: Duration)) => { |
| 418 | let body: MapErr, …> = with_read_timeout(body, timeout:read).map_err(box_err); |
| 419 | total_timeout(body, timeout:total).map_err(box_err).boxed() |
| 420 | } |
| 421 | (Some(total: Pin>), None) => total_timeout(body, timeout:total).map_err(box_err).boxed(), |
| 422 | (None, Some(read: Duration)) => with_read_timeout(body, timeout:read).map_err(box_err).boxed(), |
| 423 | (None, None) => body.map_err(box_err).boxed(), |
| 424 | } |
| 425 | } |
| 426 | |
| 427 | fn box_err<E>(err: E) -> Box<dyn std::error::Error + Send + Sync> |
| 428 | where |
| 429 | E: Into<Box<dyn std::error::Error + Send + Sync>>, |
| 430 | { |
| 431 | err.into() |
| 432 | } |
| 433 | |
| 434 | // ===== impl DataStream ===== |
| 435 | |
| 436 | #[cfg (any(feature = "stream" , feature = "multipart" ,))] |
| 437 | impl<B> futures_core::Stream for DataStream<B> |
| 438 | where |
| 439 | B: HttpBody<Data = Bytes> + Unpin, |
| 440 | { |
| 441 | type Item = Result<Bytes, B::Error>; |
| 442 | |
| 443 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { |
| 444 | loop { |
| 445 | return match ready!(Pin::new(&mut self.0).poll_frame(cx)) { |
| 446 | Some(Ok(frame: Frame)) => { |
| 447 | // skip non-data frames |
| 448 | if let Ok(buf: Bytes) = frame.into_data() { |
| 449 | Poll::Ready(Some(Ok(buf))) |
| 450 | } else { |
| 451 | continue; |
| 452 | } |
| 453 | } |
| 454 | Some(Err(err: ::Error)) => Poll::Ready(Some(Err(err))), |
| 455 | None => Poll::Ready(None), |
| 456 | }; |
| 457 | } |
| 458 | } |
| 459 | } |
| 460 | |
| 461 | // ===== impl IntoBytesBody ===== |
| 462 | |
| 463 | pin_project! { |
| 464 | struct IntoBytesBody<B> { |
| 465 | #[pin] |
| 466 | inner: B, |
| 467 | } |
| 468 | } |
| 469 | |
| 470 | // We can't use `map_frame()` because that loses the hint data (for good reason). |
| 471 | // But we aren't transforming the data. |
| 472 | impl<B> hyper::body::Body for IntoBytesBody<B> |
| 473 | where |
| 474 | B: hyper::body::Body, |
| 475 | B::Data: Into<Bytes>, |
| 476 | { |
| 477 | type Data = Bytes; |
| 478 | type Error = B::Error; |
| 479 | |
| 480 | fn poll_frame( |
| 481 | self: Pin<&mut Self>, |
| 482 | cx: &mut Context, |
| 483 | ) -> Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> { |
| 484 | match ready!(self.project().inner.poll_frame(cx)) { |
| 485 | Some(Ok(f)) => Poll::Ready(Some(Ok(f.map_data(Into::into)))), |
| 486 | Some(Err(e)) => Poll::Ready(Some(Err(e))), |
| 487 | None => Poll::Ready(None), |
| 488 | } |
| 489 | } |
| 490 | |
| 491 | #[inline ] |
| 492 | fn size_hint(&self) -> http_body::SizeHint { |
| 493 | self.inner.size_hint() |
| 494 | } |
| 495 | |
| 496 | #[inline ] |
| 497 | fn is_end_stream(&self) -> bool { |
| 498 | self.inner.is_end_stream() |
| 499 | } |
| 500 | } |
| 501 | |
| 502 | #[cfg (test)] |
| 503 | mod tests { |
| 504 | use http_body::Body as _; |
| 505 | |
| 506 | use super::Body; |
| 507 | |
| 508 | #[test ] |
| 509 | fn test_as_bytes() { |
| 510 | let test_data = b"Test body" ; |
| 511 | let body = Body::from(&test_data[..]); |
| 512 | assert_eq!(body.as_bytes(), Some(&test_data[..])); |
| 513 | } |
| 514 | |
| 515 | #[test ] |
| 516 | fn body_exact_length() { |
| 517 | let empty_body = Body::empty(); |
| 518 | assert!(empty_body.is_end_stream()); |
| 519 | assert_eq!(empty_body.size_hint().exact(), Some(0)); |
| 520 | |
| 521 | let bytes_body = Body::reusable("abc" .into()); |
| 522 | assert!(!bytes_body.is_end_stream()); |
| 523 | assert_eq!(bytes_body.size_hint().exact(), Some(3)); |
| 524 | |
| 525 | // can delegate even when wrapped |
| 526 | let stream_body = Body::wrap(empty_body); |
| 527 | assert!(stream_body.is_end_stream()); |
| 528 | assert_eq!(stream_body.size_hint().exact(), Some(0)); |
| 529 | } |
| 530 | } |
| 531 | |