1use std::fmt;
2use std::future::Future;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5
6use bytes::Bytes;
7use futures_core::Stream;
8use http_body::Body as HttpBody;
9use pin_project_lite::pin_project;
10#[cfg(feature = "stream")]
11use tokio::fs::File;
12use tokio::time::Sleep;
13#[cfg(feature = "stream")]
14use tokio_util::io::ReaderStream;
15
16/// An asynchronous request body.
17pub struct Body {
18 inner: Inner,
19}
20
21// The `Stream` trait isn't stable, so the impl isn't public.
22pub(crate) struct ImplStream(Body);
23
24enum Inner {
25 Reusable(Bytes),
26 Streaming {
27 body: Pin<
28 Box<
29 dyn HttpBody<Data = Bytes, Error = Box<dyn std::error::Error + Send + Sync>>
30 + Send
31 + Sync,
32 >,
33 >,
34 timeout: Option<Pin<Box<Sleep>>>,
35 },
36}
37
38pin_project! {
39 struct WrapStream<S> {
40 #[pin]
41 inner: S,
42 }
43}
44
45struct WrapHyper(hyper::Body);
46
47impl Body {
48 /// Returns a reference to the internal data of the `Body`.
49 ///
50 /// `None` is returned, if the underlying data is a stream.
51 pub fn as_bytes(&self) -> Option<&[u8]> {
52 match &self.inner {
53 Inner::Reusable(bytes) => Some(bytes.as_ref()),
54 Inner::Streaming { .. } => None,
55 }
56 }
57
58 /// Wrap a futures `Stream` in a box inside `Body`.
59 ///
60 /// # Example
61 ///
62 /// ```
63 /// # use reqwest::Body;
64 /// # use futures_util;
65 /// # fn main() {
66 /// let chunks: Vec<Result<_, ::std::io::Error>> = vec![
67 /// Ok("hello"),
68 /// Ok(" "),
69 /// Ok("world"),
70 /// ];
71 ///
72 /// let stream = futures_util::stream::iter(chunks);
73 ///
74 /// let body = Body::wrap_stream(stream);
75 /// # }
76 /// ```
77 ///
78 /// # Optional
79 ///
80 /// This requires the `stream` feature to be enabled.
81 #[cfg(feature = "stream")]
82 #[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
83 pub fn wrap_stream<S>(stream: S) -> Body
84 where
85 S: futures_core::stream::TryStream + Send + Sync + 'static,
86 S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
87 Bytes: From<S::Ok>,
88 {
89 Body::stream(stream)
90 }
91
92 pub(crate) fn stream<S>(stream: S) -> Body
93 where
94 S: futures_core::stream::TryStream + Send + Sync + 'static,
95 S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
96 Bytes: From<S::Ok>,
97 {
98 use futures_util::TryStreamExt;
99
100 let body = Box::pin(WrapStream {
101 inner: stream.map_ok(Bytes::from).map_err(Into::into),
102 });
103 Body {
104 inner: Inner::Streaming {
105 body,
106 timeout: None,
107 },
108 }
109 }
110
111 pub(crate) fn response(body: hyper::Body, timeout: Option<Pin<Box<Sleep>>>) -> Body {
112 Body {
113 inner: Inner::Streaming {
114 body: Box::pin(WrapHyper(body)),
115 timeout,
116 },
117 }
118 }
119
120 #[cfg(feature = "blocking")]
121 pub(crate) fn wrap(body: hyper::Body) -> Body {
122 Body {
123 inner: Inner::Streaming {
124 body: Box::pin(WrapHyper(body)),
125 timeout: None,
126 },
127 }
128 }
129
130 pub(crate) fn empty() -> Body {
131 Body::reusable(Bytes::new())
132 }
133
134 pub(crate) fn reusable(chunk: Bytes) -> Body {
135 Body {
136 inner: Inner::Reusable(chunk),
137 }
138 }
139
140 pub(crate) fn try_reuse(self) -> (Option<Bytes>, Self) {
141 let reuse = match self.inner {
142 Inner::Reusable(ref chunk) => Some(chunk.clone()),
143 Inner::Streaming { .. } => None,
144 };
145
146 (reuse, self)
147 }
148
149 pub(crate) fn try_clone(&self) -> Option<Body> {
150 match self.inner {
151 Inner::Reusable(ref chunk) => Some(Body::reusable(chunk.clone())),
152 Inner::Streaming { .. } => None,
153 }
154 }
155
156 pub(crate) fn into_stream(self) -> ImplStream {
157 ImplStream(self)
158 }
159
160 #[cfg(feature = "multipart")]
161 pub(crate) fn content_length(&self) -> Option<u64> {
162 match self.inner {
163 Inner::Reusable(ref bytes) => Some(bytes.len() as u64),
164 Inner::Streaming { ref body, .. } => body.size_hint().exact(),
165 }
166 }
167}
168
169impl From<hyper::Body> for Body {
170 #[inline]
171 fn from(body: hyper::Body) -> Body {
172 Self {
173 inner: Inner::Streaming {
174 body: Box::pin(WrapHyper(body)),
175 timeout: None,
176 },
177 }
178 }
179}
180
181impl From<Bytes> for Body {
182 #[inline]
183 fn from(bytes: Bytes) -> Body {
184 Body::reusable(chunk:bytes)
185 }
186}
187
188impl From<Vec<u8>> for Body {
189 #[inline]
190 fn from(vec: Vec<u8>) -> Body {
191 Body::reusable(chunk:vec.into())
192 }
193}
194
195impl From<&'static [u8]> for Body {
196 #[inline]
197 fn from(s: &'static [u8]) -> Body {
198 Body::reusable(chunk:Bytes::from_static(bytes:s))
199 }
200}
201
202impl From<String> for Body {
203 #[inline]
204 fn from(s: String) -> Body {
205 Body::reusable(chunk:s.into())
206 }
207}
208
209impl From<&'static str> for Body {
210 #[inline]
211 fn from(s: &'static str) -> Body {
212 s.as_bytes().into()
213 }
214}
215
#[cfg(feature = "stream")]
#[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
impl From<File> for Body {
    /// Builds a streaming body that reads from an open `tokio::fs::File`
    /// through a `ReaderStream`.
    #[inline]
    fn from(file: File) -> Body {
        let reader = ReaderStream::new(file);

        Body::wrap_stream(reader)
    }
}
224
225impl fmt::Debug for Body {
226 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
227 f.debug_struct(name:"Body").finish()
228 }
229}
230
231// ===== impl ImplStream =====
232
233impl HttpBody for ImplStream {
234 type Data = Bytes;
235 type Error = crate::Error;
236
237 fn poll_data(
238 mut self: Pin<&mut Self>,
239 cx: &mut Context,
240 ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
241 let opt_try_chunk = match self.0.inner {
242 Inner::Streaming {
243 ref mut body,
244 ref mut timeout,
245 } => {
246 if let Some(ref mut timeout) = timeout {
247 if let Poll::Ready(()) = timeout.as_mut().poll(cx) {
248 return Poll::Ready(Some(Err(crate::error::body(crate::error::TimedOut))));
249 }
250 }
251 futures_core::ready!(Pin::new(body).poll_data(cx))
252 .map(|opt_chunk| opt_chunk.map(Into::into).map_err(crate::error::body))
253 }
254 Inner::Reusable(ref mut bytes) => {
255 if bytes.is_empty() {
256 None
257 } else {
258 Some(Ok(std::mem::replace(bytes, Bytes::new())))
259 }
260 }
261 };
262
263 Poll::Ready(opt_try_chunk)
264 }
265
266 fn poll_trailers(
267 self: Pin<&mut Self>,
268 _cx: &mut Context,
269 ) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
270 Poll::Ready(Ok(None))
271 }
272
273 fn is_end_stream(&self) -> bool {
274 match self.0.inner {
275 Inner::Streaming { ref body, .. } => body.is_end_stream(),
276 Inner::Reusable(ref bytes) => bytes.is_empty(),
277 }
278 }
279
280 fn size_hint(&self) -> http_body::SizeHint {
281 match self.0.inner {
282 Inner::Streaming { ref body, .. } => body.size_hint(),
283 Inner::Reusable(ref bytes) => {
284 let mut hint = http_body::SizeHint::default();
285 hint.set_exact(bytes.len() as u64);
286 hint
287 }
288 }
289 }
290}
291
292impl Stream for ImplStream {
293 type Item = Result<Bytes, crate::Error>;
294
295 fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
296 self.poll_data(cx)
297 }
298}
299
300// ===== impl WrapStream =====
301
302impl<S, D, E> HttpBody for WrapStream<S>
303where
304 S: Stream<Item = Result<D, E>>,
305 D: Into<Bytes>,
306 E: Into<Box<dyn std::error::Error + Send + Sync>>,
307{
308 type Data = Bytes;
309 type Error = E;
310
311 fn poll_data(
312 self: Pin<&mut Self>,
313 cx: &mut Context,
314 ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
315 let item: Option = futures_core::ready!(self.project().inner.poll_next(cx)?);
316
317 Poll::Ready(item.map(|val: D| Ok(val.into())))
318 }
319
320 fn poll_trailers(
321 self: Pin<&mut Self>,
322 _cx: &mut Context,
323 ) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
324 Poll::Ready(Ok(None))
325 }
326}
327
328// ===== impl WrapHyper =====
329
330impl HttpBody for WrapHyper {
331 type Data = Bytes;
332 type Error = Box<dyn std::error::Error + Send + Sync>;
333
334 fn poll_data(
335 mut self: Pin<&mut Self>,
336 cx: &mut Context,
337 ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
338 // safe pin projection
339 Pin::new(&mut self.0)
340 .poll_data(cx)
341 .map(|opt| opt.map(|res| res.map_err(Into::into)))
342 }
343
344 fn poll_trailers(
345 self: Pin<&mut Self>,
346 _cx: &mut Context,
347 ) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
348 Poll::Ready(Ok(None))
349 }
350
351 fn is_end_stream(&self) -> bool {
352 self.0.is_end_stream()
353 }
354
355 fn size_hint(&self) -> http_body::SizeHint {
356 HttpBody::size_hint(&self.0)
357 }
358}
359
#[cfg(test)]
mod tests {
    use super::Body;

    // A body built from a static byte slice exposes those bytes through
    // `as_bytes`.
    #[test]
    fn test_as_bytes() {
        const PAYLOAD: &[u8] = b"Test body";

        let body = Body::from(PAYLOAD);

        assert_eq!(body.as_bytes(), Some(PAYLOAD));
    }
}
371