1use std::fmt;
2use std::fs::File;
3use std::future::Future;
4#[cfg(feature = "multipart")]
5use std::io::Cursor;
6use std::io::{self, Read};
7use std::mem;
8use std::ptr;
9
10use bytes::buf::UninitSlice;
11use bytes::Bytes;
12
13use crate::async_impl;
14
15/// The body of a `Request`.
16///
17/// In most cases, this is not needed directly, as the
18/// [`RequestBuilder.body`][builder] method uses `Into<Body>`, which allows
19/// passing many things (like a string or vector of bytes).
20///
21/// [builder]: ./struct.RequestBuilder.html#method.body
22#[derive(Debug)]
23pub struct Body {
24 kind: Kind,
25}
26
27impl Body {
28 /// Instantiate a `Body` from a reader.
29 ///
30 /// # Note
31 ///
32 /// While allowing for many types to be used, these bodies do not have
33 /// a way to reset to the beginning and be reused. This means that when
34 /// encountering a 307 or 308 status code, instead of repeating the
35 /// request at the new location, the `Response` will be returned with
36 /// the redirect status code set.
37 ///
38 /// ```rust
39 /// # use std::fs::File;
40 /// # use reqwest::blocking::Body;
41 /// # fn run() -> Result<(), Box<std::error::Error>> {
42 /// let file = File::open("national_secrets.txt")?;
43 /// let body = Body::new(file);
44 /// # Ok(())
45 /// # }
46 /// ```
47 ///
48 /// If you have a set of bytes, like `String` or `Vec<u8>`, using the
49 /// `From` implementations for `Body` will store the data in a manner
50 /// it can be reused.
51 ///
52 /// ```rust
53 /// # use reqwest::blocking::Body;
54 /// # fn run() -> Result<(), Box<std::error::Error>> {
55 /// let s = "A stringy body";
56 /// let body = Body::from(s);
57 /// # Ok(())
58 /// # }
59 /// ```
60 pub fn new<R: Read + Send + 'static>(reader: R) -> Body {
61 Body {
62 kind: Kind::Reader(Box::from(reader), None),
63 }
64 }
65
66 /// Create a `Body` from a `Read` where the size is known in advance
67 /// but the data should not be fully loaded into memory. This will
68 /// set the `Content-Length` header and stream from the `Read`.
69 ///
70 /// ```rust
71 /// # use std::fs::File;
72 /// # use reqwest::blocking::Body;
73 /// # fn run() -> Result<(), Box<std::error::Error>> {
74 /// let file = File::open("a_large_file.txt")?;
75 /// let file_size = file.metadata()?.len();
76 /// let body = Body::sized(file, file_size);
77 /// # Ok(())
78 /// # }
79 /// ```
80 pub fn sized<R: Read + Send + 'static>(reader: R, len: u64) -> Body {
81 Body {
82 kind: Kind::Reader(Box::from(reader), Some(len)),
83 }
84 }
85
86 /// Returns the body as a byte slice if the body is already buffered in
87 /// memory. For streamed requests this method returns `None`.
88 pub fn as_bytes(&self) -> Option<&[u8]> {
89 match self.kind {
90 Kind::Reader(_, _) => None,
91 Kind::Bytes(ref bytes) => Some(bytes.as_ref()),
92 }
93 }
94
95 /// Converts streamed requests to their buffered equivalent and
96 /// returns a reference to the buffer. If the request is already
97 /// buffered, this has no effect.
98 ///
99 /// Be aware that for large requests this method is expensive
100 /// and may cause your program to run out of memory.
101 pub fn buffer(&mut self) -> Result<&[u8], crate::Error> {
102 match self.kind {
103 Kind::Reader(ref mut reader, maybe_len) => {
104 let mut bytes = if let Some(len) = maybe_len {
105 Vec::with_capacity(len as usize)
106 } else {
107 Vec::new()
108 };
109 io::copy(reader, &mut bytes).map_err(crate::error::builder)?;
110 self.kind = Kind::Bytes(bytes.into());
111 self.buffer()
112 }
113 Kind::Bytes(ref bytes) => Ok(bytes.as_ref()),
114 }
115 }
116
117 #[cfg(feature = "multipart")]
118 pub(crate) fn len(&self) -> Option<u64> {
119 match self.kind {
120 Kind::Reader(_, len) => len,
121 Kind::Bytes(ref bytes) => Some(bytes.len() as u64),
122 }
123 }
124
125 #[cfg(feature = "multipart")]
126 pub(crate) fn into_reader(self) -> Reader {
127 match self.kind {
128 Kind::Reader(r, _) => Reader::Reader(r),
129 Kind::Bytes(b) => Reader::Bytes(Cursor::new(b)),
130 }
131 }
132
133 pub(crate) fn into_async(self) -> (Option<Sender>, async_impl::Body, Option<u64>) {
134 match self.kind {
135 Kind::Reader(read, len) => {
136 let (tx, rx) = hyper::Body::channel();
137 let tx = Sender {
138 body: (read, len),
139 tx,
140 };
141 (Some(tx), async_impl::Body::wrap(rx), len)
142 }
143 Kind::Bytes(chunk) => {
144 let len = chunk.len() as u64;
145 (None, async_impl::Body::reusable(chunk), Some(len))
146 }
147 }
148 }
149
150 pub(crate) fn try_clone(&self) -> Option<Body> {
151 self.kind.try_clone().map(|kind| Body { kind })
152 }
153}
154
155enum Kind {
156 Reader(Box<dyn Read + Send>, Option<u64>),
157 Bytes(Bytes),
158}
159
160impl Kind {
161 fn try_clone(&self) -> Option<Kind> {
162 match self {
163 Kind::Reader(..) => None,
164 Kind::Bytes(v: &Bytes) => Some(Kind::Bytes(v.clone())),
165 }
166 }
167}
168
169impl From<Vec<u8>> for Body {
170 #[inline]
171 fn from(v: Vec<u8>) -> Body {
172 Body {
173 kind: Kind::Bytes(v.into()),
174 }
175 }
176}
177
178impl From<String> for Body {
179 #[inline]
180 fn from(s: String) -> Body {
181 s.into_bytes().into()
182 }
183}
184
185impl From<&'static [u8]> for Body {
186 #[inline]
187 fn from(s: &'static [u8]) -> Body {
188 Body {
189 kind: Kind::Bytes(Bytes::from_static(bytes:s)),
190 }
191 }
192}
193
194impl From<&'static str> for Body {
195 #[inline]
196 fn from(s: &'static str) -> Body {
197 s.as_bytes().into()
198 }
199}
200
201impl From<File> for Body {
202 #[inline]
203 fn from(f: File) -> Body {
204 let len: Option = f.metadata().map(|m: Metadata| m.len()).ok();
205 Body {
206 kind: Kind::Reader(Box::new(f), len),
207 }
208 }
209}
210impl From<Bytes> for Body {
211 #[inline]
212 fn from(b: Bytes) -> Body {
213 Body {
214 kind: Kind::Bytes(b),
215 }
216 }
217}
218
219impl fmt::Debug for Kind {
220 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
221 match *self {
222 Kind::Reader(_, ref v: &Option) => f&mut DebugStruct<'_, '_>
223 .debug_struct("Reader")
224 .field(name:"length", &DebugLength(v))
225 .finish(),
226 Kind::Bytes(ref v: &Bytes) => fmt::Debug::fmt(self:v, f),
227 }
228 }
229}
230
231struct DebugLength<'a>(&'a Option<u64>);
232
233impl<'a> fmt::Debug for DebugLength<'a> {
234 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
235 match *self.0 {
236 Some(ref len: &u64) => fmt::Debug::fmt(self:len, f),
237 None => f.write_str(data:"Unknown"),
238 }
239 }
240}
241
242#[cfg(feature = "multipart")]
243pub(crate) enum Reader {
244 Reader(Box<dyn Read + Send>),
245 Bytes(Cursor<Bytes>),
246}
247
248#[cfg(feature = "multipart")]
249impl Read for Reader {
250 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
251 match *self {
252 Reader::Reader(ref mut rdr) => rdr.read(buf),
253 Reader::Bytes(ref mut rdr) => rdr.read(buf),
254 }
255 }
256}
257
258pub(crate) struct Sender {
259 body: (Box<dyn Read + Send>, Option<u64>),
260 tx: hyper::body::Sender,
261}
262
263async fn send_future(sender: Sender) -> Result<(), crate::Error> {
264 use bytes::{BufMut, BytesMut};
265 use std::cmp;
266
267 let con_len = sender.body.1;
268 let cap = cmp::min(sender.body.1.unwrap_or(8192), 8192);
269 let mut written = 0;
270 let mut buf = BytesMut::with_capacity(cap as usize);
271 let mut body = sender.body.0;
272 // Put in an option so that it can be consumed on error to call abort()
273 let mut tx = Some(sender.tx);
274
275 loop {
276 if Some(written) == con_len {
277 // Written up to content-length, so stop.
278 return Ok(());
279 }
280
281 // The input stream is read only if the buffer is empty so
282 // that there is only one read in the buffer at any time.
283 //
284 // We need to know whether there is any data to send before
285 // we check the transmission channel (with poll_ready below)
286 // because somestimes the receiver disappears as soon as is
287 // considers the data is completely transmitted, which may
288 // be true.
289 //
290 // The use case is a web server that closes its
291 // input stream as soon as the data received is valid JSON.
292 // This behaviour is questionable, but it exists and the
293 // fact is that there is actually no remaining data to read.
294 if buf.is_empty() {
295 if buf.remaining_mut() == 0 {
296 buf.reserve(8192);
297 // zero out the reserved memory
298 let uninit = buf.chunk_mut();
299 unsafe {
300 ptr::write_bytes(uninit.as_mut_ptr(), 0, uninit.len());
301 }
302 }
303
304 let bytes = unsafe { mem::transmute::<&mut UninitSlice, &mut [u8]>(buf.chunk_mut()) };
305 match body.read(bytes) {
306 Ok(0) => {
307 // The buffer was empty and nothing's left to
308 // read. Return.
309 return Ok(());
310 }
311 Ok(n) => unsafe {
312 buf.advance_mut(n);
313 },
314 Err(e) => {
315 tx.take().expect("tx only taken on error").abort();
316 return Err(crate::error::body(e));
317 }
318 }
319 }
320
321 // The only way to get here is when the buffer is not empty.
322 // We can check the transmission channel
323
324 let buf_len = buf.len() as u64;
325 tx.as_mut()
326 .expect("tx only taken on error")
327 .send_data(buf.split().freeze())
328 .await
329 .map_err(crate::error::body)?;
330
331 written += buf_len;
332 }
333}
334
335impl Sender {
336 // A `Future` that may do blocking read calls.
337 // As a `Future`, this integrates easily with `wait::timeout`.
338 pub(crate) fn send(self) -> impl Future<Output = Result<(), crate::Error>> {
339 send_future(self)
340 }
341}
342
343// useful for tests, but not publicly exposed
344#[cfg(test)]
345pub(crate) fn read_to_string(mut body: Body) -> io::Result<String> {
346 let mut s = String::new();
347 match body.kind {
348 Kind::Reader(ref mut reader, _) => reader.read_to_string(&mut s),
349 Kind::Bytes(ref mut bytes) => (&**bytes).read_to_string(&mut s),
350 }
351 .map(|_| s)
352}
353