1use std::fmt;
2use std::future::Future;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5
6use bytes::Bytes;
7use futures_core::Stream;
8use http_body::Body as HttpBody;
9use pin_project_lite::pin_project;
10use sync_wrapper::SyncWrapper;
11#[cfg(feature = "stream")]
12use tokio::fs::File;
13use tokio::time::Sleep;
14#[cfg(feature = "stream")]
15use tokio_util::io::ReaderStream;
16
17/// An asynchronous request body.
18pub struct Body {
19 inner: Inner,
20}
21
22// The `Stream` trait isn't stable, so the impl isn't public.
23pub(crate) struct ImplStream(Body);
24
25enum Inner {
26 Reusable(Bytes),
27 Streaming {
28 body: Pin<
29 Box<
30 dyn HttpBody<Data = Bytes, Error = Box<dyn std::error::Error + Send + Sync>>
31 + Send
32 + Sync,
33 >,
34 >,
35 timeout: Option<Pin<Box<Sleep>>>,
36 },
37}
38
39pin_project! {
40 struct WrapStream<S> {
41 #[pin]
42 inner: SyncWrapper<S>,
43 }
44}
45
46struct WrapHyper(hyper::Body);
47
48impl Body {
49 /// Returns a reference to the internal data of the `Body`.
50 ///
51 /// `None` is returned, if the underlying data is a stream.
52 pub fn as_bytes(&self) -> Option<&[u8]> {
53 match &self.inner {
54 Inner::Reusable(bytes) => Some(bytes.as_ref()),
55 Inner::Streaming { .. } => None,
56 }
57 }
58
59 /// Wrap a futures `Stream` in a box inside `Body`.
60 ///
61 /// # Example
62 ///
63 /// ```
64 /// # use reqwest::Body;
65 /// # use futures_util;
66 /// # fn main() {
67 /// let chunks: Vec<Result<_, ::std::io::Error>> = vec![
68 /// Ok("hello"),
69 /// Ok(" "),
70 /// Ok("world"),
71 /// ];
72 ///
73 /// let stream = futures_util::stream::iter(chunks);
74 ///
75 /// let body = Body::wrap_stream(stream);
76 /// # }
77 /// ```
78 ///
79 /// # Optional
80 ///
81 /// This requires the `stream` feature to be enabled.
82 #[cfg(feature = "stream")]
83 #[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
84 pub fn wrap_stream<S>(stream: S) -> Body
85 where
86 S: futures_core::stream::TryStream + Send + 'static,
87 S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
88 Bytes: From<S::Ok>,
89 {
90 Body::stream(stream)
91 }
92
93 pub(crate) fn stream<S>(stream: S) -> Body
94 where
95 S: futures_core::stream::TryStream + Send + 'static,
96 S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
97 Bytes: From<S::Ok>,
98 {
99 use futures_util::TryStreamExt;
100
101 let body = Box::pin(WrapStream {
102 inner: SyncWrapper::new(stream.map_ok(Bytes::from).map_err(Into::into)),
103 });
104 Body {
105 inner: Inner::Streaming {
106 body,
107 timeout: None,
108 },
109 }
110 }
111
112 pub(crate) fn response(body: hyper::Body, timeout: Option<Pin<Box<Sleep>>>) -> Body {
113 Body {
114 inner: Inner::Streaming {
115 body: Box::pin(WrapHyper(body)),
116 timeout,
117 },
118 }
119 }
120
121 #[cfg(feature = "blocking")]
122 pub(crate) fn wrap(body: hyper::Body) -> Body {
123 Body {
124 inner: Inner::Streaming {
125 body: Box::pin(WrapHyper(body)),
126 timeout: None,
127 },
128 }
129 }
130
131 pub(crate) fn empty() -> Body {
132 Body::reusable(Bytes::new())
133 }
134
135 pub(crate) fn reusable(chunk: Bytes) -> Body {
136 Body {
137 inner: Inner::Reusable(chunk),
138 }
139 }
140
141 pub(crate) fn try_reuse(self) -> (Option<Bytes>, Self) {
142 let reuse = match self.inner {
143 Inner::Reusable(ref chunk) => Some(chunk.clone()),
144 Inner::Streaming { .. } => None,
145 };
146
147 (reuse, self)
148 }
149
150 pub(crate) fn try_clone(&self) -> Option<Body> {
151 match self.inner {
152 Inner::Reusable(ref chunk) => Some(Body::reusable(chunk.clone())),
153 Inner::Streaming { .. } => None,
154 }
155 }
156
157 pub(crate) fn into_stream(self) -> ImplStream {
158 ImplStream(self)
159 }
160
161 #[cfg(feature = "multipart")]
162 pub(crate) fn content_length(&self) -> Option<u64> {
163 match self.inner {
164 Inner::Reusable(ref bytes) => Some(bytes.len() as u64),
165 Inner::Streaming { ref body, .. } => body.size_hint().exact(),
166 }
167 }
168}
169
170impl From<hyper::Body> for Body {
171 #[inline]
172 fn from(body: hyper::Body) -> Body {
173 Self {
174 inner: Inner::Streaming {
175 body: Box::pin(WrapHyper(body)),
176 timeout: None,
177 },
178 }
179 }
180}
181
182impl From<Bytes> for Body {
183 #[inline]
184 fn from(bytes: Bytes) -> Body {
185 Body::reusable(chunk:bytes)
186 }
187}
188
189impl From<Vec<u8>> for Body {
190 #[inline]
191 fn from(vec: Vec<u8>) -> Body {
192 Body::reusable(chunk:vec.into())
193 }
194}
195
196impl From<&'static [u8]> for Body {
197 #[inline]
198 fn from(s: &'static [u8]) -> Body {
199 Body::reusable(chunk:Bytes::from_static(bytes:s))
200 }
201}
202
203impl From<String> for Body {
204 #[inline]
205 fn from(s: String) -> Body {
206 Body::reusable(chunk:s.into())
207 }
208}
209
210impl From<&'static str> for Body {
211 #[inline]
212 fn from(s: &'static str) -> Body {
213 s.as_bytes().into()
214 }
215}
216
217#[cfg(feature = "stream")]
218#[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
219impl From<File> for Body {
220 #[inline]
221 fn from(file: File) -> Body {
222 Body::wrap_stream(ReaderStream::new(reader:file))
223 }
224}
225
226impl fmt::Debug for Body {
227 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
228 f.debug_struct(name:"Body").finish()
229 }
230}
231
232// ===== impl ImplStream =====
233
234impl HttpBody for ImplStream {
235 type Data = Bytes;
236 type Error = crate::Error;
237
238 fn poll_data(
239 mut self: Pin<&mut Self>,
240 cx: &mut Context,
241 ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
242 let opt_try_chunk = match self.0.inner {
243 Inner::Streaming {
244 ref mut body,
245 ref mut timeout,
246 } => {
247 if let Some(ref mut timeout) = timeout {
248 if let Poll::Ready(()) = timeout.as_mut().poll(cx) {
249 return Poll::Ready(Some(Err(crate::error::body(crate::error::TimedOut))));
250 }
251 }
252 futures_core::ready!(Pin::new(body).poll_data(cx))
253 .map(|opt_chunk| opt_chunk.map(Into::into).map_err(crate::error::body))
254 }
255 Inner::Reusable(ref mut bytes) => {
256 if bytes.is_empty() {
257 None
258 } else {
259 Some(Ok(std::mem::replace(bytes, Bytes::new())))
260 }
261 }
262 };
263
264 Poll::Ready(opt_try_chunk)
265 }
266
267 fn poll_trailers(
268 self: Pin<&mut Self>,
269 _cx: &mut Context,
270 ) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
271 Poll::Ready(Ok(None))
272 }
273
274 fn is_end_stream(&self) -> bool {
275 match self.0.inner {
276 Inner::Streaming { ref body, .. } => body.is_end_stream(),
277 Inner::Reusable(ref bytes) => bytes.is_empty(),
278 }
279 }
280
281 fn size_hint(&self) -> http_body::SizeHint {
282 match self.0.inner {
283 Inner::Streaming { ref body, .. } => body.size_hint(),
284 Inner::Reusable(ref bytes) => {
285 let mut hint = http_body::SizeHint::default();
286 hint.set_exact(bytes.len() as u64);
287 hint
288 }
289 }
290 }
291}
292
293impl Stream for ImplStream {
294 type Item = Result<Bytes, crate::Error>;
295
296 fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
297 self.poll_data(cx)
298 }
299}
300
301// ===== impl WrapStream =====
302
303impl<S, D, E> HttpBody for WrapStream<S>
304where
305 S: Stream<Item = Result<D, E>>,
306 D: Into<Bytes>,
307 E: Into<Box<dyn std::error::Error + Send + Sync>>,
308{
309 type Data = Bytes;
310 type Error = E;
311
312 fn poll_data(
313 self: Pin<&mut Self>,
314 cx: &mut Context,
315 ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
316 let item: Option = futures_core::ready!(self.project().inner.get_pin_mut().poll_next(cx)?);
317
318 Poll::Ready(item.map(|val: D| Ok(val.into())))
319 }
320
321 fn poll_trailers(
322 self: Pin<&mut Self>,
323 _cx: &mut Context,
324 ) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
325 Poll::Ready(Ok(None))
326 }
327}
328
329// ===== impl WrapHyper =====
330
331impl HttpBody for WrapHyper {
332 type Data = Bytes;
333 type Error = Box<dyn std::error::Error + Send + Sync>;
334
335 fn poll_data(
336 mut self: Pin<&mut Self>,
337 cx: &mut Context,
338 ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
339 // safe pin projection
340 Pin::new(&mut self.0)
341 .poll_data(cx)
342 .map(|opt| opt.map(|res| res.map_err(Into::into)))
343 }
344
345 fn poll_trailers(
346 self: Pin<&mut Self>,
347 _cx: &mut Context,
348 ) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
349 Poll::Ready(Ok(None))
350 }
351
352 fn is_end_stream(&self) -> bool {
353 self.0.is_end_stream()
354 }
355
356 fn size_hint(&self) -> http_body::SizeHint {
357 HttpBody::size_hint(&self.0)
358 }
359}
360
361#[cfg(test)]
362mod tests {
363 use super::Body;
364
365 #[test]
366 fn test_as_bytes() {
367 let test_data = b"Test body";
368 let body = Body::from(&test_data[..]);
369 assert_eq!(body.as_bytes(), Some(&test_data[..]));
370 }
371}
372