1 | use std::fmt; |
2 | use std::fs::File; |
3 | use std::future::Future; |
4 | #[cfg (feature = "multipart" )] |
5 | use std::io::Cursor; |
6 | use std::io::{self, Read}; |
7 | use std::mem; |
8 | use std::ptr; |
9 | |
10 | use bytes::buf::UninitSlice; |
11 | use bytes::Bytes; |
12 | |
13 | use crate::async_impl; |
14 | |
15 | /// The body of a `Request`. |
16 | /// |
17 | /// In most cases, this is not needed directly, as the |
18 | /// [`RequestBuilder.body`][builder] method uses `Into<Body>`, which allows |
19 | /// passing many things (like a string or vector of bytes). |
20 | /// |
21 | /// [builder]: ./struct.RequestBuilder.html#method.body |
22 | #[derive (Debug)] |
23 | pub struct Body { |
24 | kind: Kind, |
25 | } |
26 | |
27 | impl Body { |
28 | /// Instantiate a `Body` from a reader. |
29 | /// |
30 | /// # Note |
31 | /// |
32 | /// While allowing for many types to be used, these bodies do not have |
33 | /// a way to reset to the beginning and be reused. This means that when |
34 | /// encountering a 307 or 308 status code, instead of repeating the |
35 | /// request at the new location, the `Response` will be returned with |
36 | /// the redirect status code set. |
37 | /// |
38 | /// ```rust |
39 | /// # use std::fs::File; |
40 | /// # use reqwest::blocking::Body; |
41 | /// # fn run() -> Result<(), Box<std::error::Error>> { |
42 | /// let file = File::open("national_secrets.txt" )?; |
43 | /// let body = Body::new(file); |
44 | /// # Ok(()) |
45 | /// # } |
46 | /// ``` |
47 | /// |
48 | /// If you have a set of bytes, like `String` or `Vec<u8>`, using the |
49 | /// `From` implementations for `Body` will store the data in a manner |
50 | /// it can be reused. |
51 | /// |
52 | /// ```rust |
53 | /// # use reqwest::blocking::Body; |
54 | /// # fn run() -> Result<(), Box<std::error::Error>> { |
55 | /// let s = "A stringy body" ; |
56 | /// let body = Body::from(s); |
57 | /// # Ok(()) |
58 | /// # } |
59 | /// ``` |
60 | pub fn new<R: Read + Send + 'static>(reader: R) -> Body { |
61 | Body { |
62 | kind: Kind::Reader(Box::from(reader), None), |
63 | } |
64 | } |
65 | |
66 | /// Create a `Body` from a `Read` where the size is known in advance |
67 | /// but the data should not be fully loaded into memory. This will |
68 | /// set the `Content-Length` header and stream from the `Read`. |
69 | /// |
70 | /// ```rust |
71 | /// # use std::fs::File; |
72 | /// # use reqwest::blocking::Body; |
73 | /// # fn run() -> Result<(), Box<std::error::Error>> { |
74 | /// let file = File::open("a_large_file.txt" )?; |
75 | /// let file_size = file.metadata()?.len(); |
76 | /// let body = Body::sized(file, file_size); |
77 | /// # Ok(()) |
78 | /// # } |
79 | /// ``` |
80 | pub fn sized<R: Read + Send + 'static>(reader: R, len: u64) -> Body { |
81 | Body { |
82 | kind: Kind::Reader(Box::from(reader), Some(len)), |
83 | } |
84 | } |
85 | |
86 | /// Returns the body as a byte slice if the body is already buffered in |
87 | /// memory. For streamed requests this method returns `None`. |
88 | pub fn as_bytes(&self) -> Option<&[u8]> { |
89 | match self.kind { |
90 | Kind::Reader(_, _) => None, |
91 | Kind::Bytes(ref bytes) => Some(bytes.as_ref()), |
92 | } |
93 | } |
94 | |
95 | /// Converts streamed requests to their buffered equivalent and |
96 | /// returns a reference to the buffer. If the request is already |
97 | /// buffered, this has no effect. |
98 | /// |
99 | /// Be aware that for large requests this method is expensive |
100 | /// and may cause your program to run out of memory. |
101 | pub fn buffer(&mut self) -> Result<&[u8], crate::Error> { |
102 | match self.kind { |
103 | Kind::Reader(ref mut reader, maybe_len) => { |
104 | let mut bytes = if let Some(len) = maybe_len { |
105 | Vec::with_capacity(len as usize) |
106 | } else { |
107 | Vec::new() |
108 | }; |
109 | io::copy(reader, &mut bytes).map_err(crate::error::builder)?; |
110 | self.kind = Kind::Bytes(bytes.into()); |
111 | self.buffer() |
112 | } |
113 | Kind::Bytes(ref bytes) => Ok(bytes.as_ref()), |
114 | } |
115 | } |
116 | |
117 | #[cfg (feature = "multipart" )] |
118 | pub(crate) fn len(&self) -> Option<u64> { |
119 | match self.kind { |
120 | Kind::Reader(_, len) => len, |
121 | Kind::Bytes(ref bytes) => Some(bytes.len() as u64), |
122 | } |
123 | } |
124 | |
125 | #[cfg (feature = "multipart" )] |
126 | pub(crate) fn into_reader(self) -> Reader { |
127 | match self.kind { |
128 | Kind::Reader(r, _) => Reader::Reader(r), |
129 | Kind::Bytes(b) => Reader::Bytes(Cursor::new(b)), |
130 | } |
131 | } |
132 | |
133 | pub(crate) fn into_async(self) -> (Option<Sender>, async_impl::Body, Option<u64>) { |
134 | match self.kind { |
135 | Kind::Reader(read, len) => { |
136 | let (tx, rx) = hyper::Body::channel(); |
137 | let tx = Sender { |
138 | body: (read, len), |
139 | tx, |
140 | }; |
141 | (Some(tx), async_impl::Body::wrap(rx), len) |
142 | } |
143 | Kind::Bytes(chunk) => { |
144 | let len = chunk.len() as u64; |
145 | (None, async_impl::Body::reusable(chunk), Some(len)) |
146 | } |
147 | } |
148 | } |
149 | |
150 | pub(crate) fn try_clone(&self) -> Option<Body> { |
151 | self.kind.try_clone().map(|kind| Body { kind }) |
152 | } |
153 | } |
154 | |
155 | enum Kind { |
156 | Reader(Box<dyn Read + Send>, Option<u64>), |
157 | Bytes(Bytes), |
158 | } |
159 | |
160 | impl Kind { |
161 | fn try_clone(&self) -> Option<Kind> { |
162 | match self { |
163 | Kind::Reader(..) => None, |
164 | Kind::Bytes(v: &Bytes) => Some(Kind::Bytes(v.clone())), |
165 | } |
166 | } |
167 | } |
168 | |
169 | impl From<Vec<u8>> for Body { |
170 | #[inline ] |
171 | fn from(v: Vec<u8>) -> Body { |
172 | Body { |
173 | kind: Kind::Bytes(v.into()), |
174 | } |
175 | } |
176 | } |
177 | |
178 | impl From<String> for Body { |
179 | #[inline ] |
180 | fn from(s: String) -> Body { |
181 | s.into_bytes().into() |
182 | } |
183 | } |
184 | |
185 | impl From<&'static [u8]> for Body { |
186 | #[inline ] |
187 | fn from(s: &'static [u8]) -> Body { |
188 | Body { |
189 | kind: Kind::Bytes(Bytes::from_static(bytes:s)), |
190 | } |
191 | } |
192 | } |
193 | |
194 | impl From<&'static str> for Body { |
195 | #[inline ] |
196 | fn from(s: &'static str) -> Body { |
197 | s.as_bytes().into() |
198 | } |
199 | } |
200 | |
201 | impl From<File> for Body { |
202 | #[inline ] |
203 | fn from(f: File) -> Body { |
204 | let len: Option = f.metadata().map(|m: Metadata| m.len()).ok(); |
205 | Body { |
206 | kind: Kind::Reader(Box::new(f), len), |
207 | } |
208 | } |
209 | } |
210 | impl From<Bytes> for Body { |
211 | #[inline ] |
212 | fn from(b: Bytes) -> Body { |
213 | Body { |
214 | kind: Kind::Bytes(b), |
215 | } |
216 | } |
217 | } |
218 | |
219 | impl fmt::Debug for Kind { |
220 | fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |
221 | match *self { |
222 | Kind::Reader(_, ref v: &Option) => f&mut DebugStruct<'_, '_> |
223 | .debug_struct("Reader" ) |
224 | .field(name:"length" , &DebugLength(v)) |
225 | .finish(), |
226 | Kind::Bytes(ref v: &Bytes) => fmt::Debug::fmt(self:v, f), |
227 | } |
228 | } |
229 | } |
230 | |
/// Debug adapter for an optional body length.
///
/// Prints the number itself when the length is present and the literal
/// `Unknown` when it is not.
struct DebugLength<'a>(&'a Option<u64>);

impl fmt::Debug for DebugLength<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self.0 {
            // Delegate to `u64`'s Debug so formatter flags are honoured.
            Some(len) => fmt::Debug::fmt(len, f),
            None => f.write_str("Unknown"),
        }
    }
}
241 | |
/// A `Read` source obtained from a `Body`: either the original boxed reader
/// or a cursor over in-memory bytes.
#[cfg(feature = "multipart")]
pub(crate) enum Reader {
    /// The boxed reader the body was created from.
    Reader(Box<dyn Read + Send>),
    /// A cursor over the body's in-memory bytes.
    Bytes(Cursor<Bytes>),
}
247 | |
/// Forwards `read` to whichever source this `Reader` wraps.
#[cfg(feature = "multipart")]
impl Read for Reader {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        match self {
            Reader::Reader(inner) => inner.read(buf),
            Reader::Bytes(cursor) => cursor.read(buf),
        }
    }
}
257 | |
258 | pub(crate) struct Sender { |
259 | body: (Box<dyn Read + Send>, Option<u64>), |
260 | tx: hyper::body::Sender, |
261 | } |
262 | |
263 | async fn send_future(sender: Sender) -> Result<(), crate::Error> { |
264 | use bytes::{BufMut, BytesMut}; |
265 | use std::cmp; |
266 | |
267 | let con_len = sender.body.1; |
268 | let cap = cmp::min(sender.body.1.unwrap_or(8192), 8192); |
269 | let mut written = 0; |
270 | let mut buf = BytesMut::with_capacity(cap as usize); |
271 | let mut body = sender.body.0; |
272 | // Put in an option so that it can be consumed on error to call abort() |
273 | let mut tx = Some(sender.tx); |
274 | |
275 | loop { |
276 | if Some(written) == con_len { |
277 | // Written up to content-length, so stop. |
278 | return Ok(()); |
279 | } |
280 | |
281 | // The input stream is read only if the buffer is empty so |
282 | // that there is only one read in the buffer at any time. |
283 | // |
284 | // We need to know whether there is any data to send before |
285 | // we check the transmission channel (with poll_ready below) |
286 | // because somestimes the receiver disappears as soon as is |
287 | // considers the data is completely transmitted, which may |
288 | // be true. |
289 | // |
290 | // The use case is a web server that closes its |
291 | // input stream as soon as the data received is valid JSON. |
292 | // This behaviour is questionable, but it exists and the |
293 | // fact is that there is actually no remaining data to read. |
294 | if buf.is_empty() { |
295 | if buf.remaining_mut() == 0 { |
296 | buf.reserve(8192); |
297 | // zero out the reserved memory |
298 | let uninit = buf.chunk_mut(); |
299 | unsafe { |
300 | ptr::write_bytes(uninit.as_mut_ptr(), 0, uninit.len()); |
301 | } |
302 | } |
303 | |
304 | let bytes = unsafe { mem::transmute::<&mut UninitSlice, &mut [u8]>(buf.chunk_mut()) }; |
305 | match body.read(bytes) { |
306 | Ok(0) => { |
307 | // The buffer was empty and nothing's left to |
308 | // read. Return. |
309 | return Ok(()); |
310 | } |
311 | Ok(n) => unsafe { |
312 | buf.advance_mut(n); |
313 | }, |
314 | Err(e) => { |
315 | tx.take().expect("tx only taken on error" ).abort(); |
316 | return Err(crate::error::body(e)); |
317 | } |
318 | } |
319 | } |
320 | |
321 | // The only way to get here is when the buffer is not empty. |
322 | // We can check the transmission channel |
323 | |
324 | let buf_len = buf.len() as u64; |
325 | tx.as_mut() |
326 | .expect("tx only taken on error" ) |
327 | .send_data(buf.split().freeze()) |
328 | .await |
329 | .map_err(crate::error::body)?; |
330 | |
331 | written += buf_len; |
332 | } |
333 | } |
334 | |
335 | impl Sender { |
336 | // A `Future` that may do blocking read calls. |
337 | // As a `Future`, this integrates easily with `wait::timeout`. |
338 | pub(crate) fn send(self) -> impl Future<Output = Result<(), crate::Error>> { |
339 | send_future(self) |
340 | } |
341 | } |
342 | |
/// Test-only helper (not publicly exposed) that drains a `Body` into a
/// `String`, whichever way the body is stored.
#[cfg(test)]
pub(crate) fn read_to_string(body: Body) -> io::Result<String> {
    let mut text = String::new();
    match body.kind {
        Kind::Reader(mut reader, _) => reader.read_to_string(&mut text)?,
        // `&[u8]` implements `Read`, so the bytes can be read directly.
        Kind::Bytes(bytes) => (&bytes[..]).read_to_string(&mut text)?,
    };
    Ok(text)
}
353 | |