1 | use std::borrow::Cow; |
2 | #[cfg (feature = "stream" )] |
3 | use std::error::Error as StdError; |
4 | use std::fmt; |
5 | |
6 | use bytes::Bytes; |
7 | use futures_channel::mpsc; |
8 | use futures_channel::oneshot; |
9 | use futures_core::Stream; // for mpsc::Receiver |
10 | #[cfg (feature = "stream" )] |
11 | use futures_util::TryStreamExt; |
12 | use http::HeaderMap; |
13 | use http_body::{Body as HttpBody, SizeHint}; |
14 | |
15 | use super::DecodedLength; |
16 | #[cfg (feature = "stream" )] |
17 | use crate::common::sync_wrapper::SyncWrapper; |
18 | use crate::common::Future; |
19 | #[cfg (all(feature = "client" , any(feature = "http1" , feature = "http2" )))] |
20 | use crate::common::Never; |
21 | use crate::common::{task, watch, Pin, Poll}; |
22 | #[cfg (all(feature = "http2" , any(feature = "client" , feature = "server" )))] |
23 | use crate::proto::h2::ping; |
24 | |
25 | type BodySender = mpsc::Sender<Result<Bytes, crate::Error>>; |
26 | type TrailersSender = oneshot::Sender<HeaderMap>; |
27 | |
28 | /// A stream of `Bytes`, used when receiving bodies. |
29 | /// |
30 | /// A good default [`HttpBody`](crate::body::HttpBody) to use in many |
31 | /// applications. |
32 | /// |
33 | /// Note: To read the full body, use [`body::to_bytes`](crate::body::to_bytes) |
34 | /// or [`body::aggregate`](crate::body::aggregate). |
35 | #[must_use = "streams do nothing unless polled" ] |
36 | pub struct Body { |
37 | kind: Kind, |
38 | /// Keep the extra bits in an `Option<Box<Extra>>`, so that |
39 | /// Body stays small in the common case (no extras needed). |
40 | extra: Option<Box<Extra>>, |
41 | } |
42 | |
43 | enum Kind { |
44 | Once(Option<Bytes>), |
45 | Chan { |
46 | content_length: DecodedLength, |
47 | want_tx: watch::Sender, |
48 | data_rx: mpsc::Receiver<Result<Bytes, crate::Error>>, |
49 | trailers_rx: oneshot::Receiver<HeaderMap>, |
50 | }, |
51 | #[cfg (all(feature = "http2" , any(feature = "client" , feature = "server" )))] |
52 | H2 { |
53 | ping: ping::Recorder, |
54 | content_length: DecodedLength, |
55 | recv: h2::RecvStream, |
56 | }, |
57 | #[cfg (feature = "ffi" )] |
58 | Ffi(crate::ffi::UserBody), |
59 | #[cfg (feature = "stream" )] |
60 | Wrapped( |
61 | SyncWrapper< |
62 | Pin<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>>, |
63 | >, |
64 | ), |
65 | } |
66 | |
67 | struct Extra { |
68 | /// Allow the client to pass a future to delay the `Body` from returning |
69 | /// EOF. This allows the `Client` to try to put the idle connection |
70 | /// back into the pool before the body is "finished". |
71 | /// |
72 | /// The reason for this is so that creating a new request after finishing |
73 | /// streaming the body of a response could sometimes result in creating |
74 | /// a brand new connection, since the pool didn't know about the idle |
75 | /// connection yet. |
76 | delayed_eof: Option<DelayEof>, |
77 | } |
78 | |
79 | #[cfg (all(feature = "client" , any(feature = "http1" , feature = "http2" )))] |
80 | type DelayEofUntil = oneshot::Receiver<Never>; |
81 | |
82 | enum DelayEof { |
83 | /// Initial state, stream hasn't seen EOF yet. |
84 | #[cfg (any(feature = "http1" , feature = "http2" ))] |
85 | #[cfg (feature = "client" )] |
86 | NotEof(DelayEofUntil), |
87 | /// Transitions to this state once we've seen `poll` try to |
88 | /// return EOF (`None`). This future is then polled, and |
89 | /// when it completes, the Body finally returns EOF (`None`). |
90 | #[cfg (any(feature = "http1" , feature = "http2" ))] |
91 | #[cfg (feature = "client" )] |
92 | Eof(DelayEofUntil), |
93 | } |
94 | |
95 | /// A sender half created through [`Body::channel()`]. |
96 | /// |
97 | /// Useful when wanting to stream chunks from another thread. |
98 | /// |
99 | /// ## Body Closing |
100 | /// |
101 | /// Note that the request body will always be closed normally when the sender is dropped (meaning |
102 | /// that the empty terminating chunk will be sent to the remote). If you desire to close the |
103 | /// connection with an incomplete response (e.g. in the case of an error during asynchronous |
104 | /// processing), call the [`Sender::abort()`] method to abort the body in an abnormal fashion. |
105 | /// |
106 | /// [`Body::channel()`]: struct.Body.html#method.channel |
107 | /// [`Sender::abort()`]: struct.Sender.html#method.abort |
108 | #[must_use = "Sender does nothing unless sent on" ] |
109 | pub struct Sender { |
110 | want_rx: watch::Receiver, |
111 | data_tx: BodySender, |
112 | trailers_tx: Option<TrailersSender>, |
113 | } |
114 | |
115 | const WANT_PENDING: usize = 1; |
116 | const WANT_READY: usize = 2; |
117 | |
118 | impl Body { |
119 | /// Create an empty `Body` stream. |
120 | /// |
121 | /// # Example |
122 | /// |
123 | /// ``` |
124 | /// use hyper::{Body, Request}; |
125 | /// |
126 | /// // create a `GET /` request |
127 | /// let get = Request::new(Body::empty()); |
128 | /// ``` |
129 | #[inline ] |
130 | pub fn empty() -> Body { |
131 | Body::new(Kind::Once(None)) |
132 | } |
133 | |
134 | /// Create a `Body` stream with an associated sender half. |
135 | /// |
136 | /// Useful when wanting to stream chunks from another thread. |
137 | #[inline ] |
138 | pub fn channel() -> (Sender, Body) { |
139 | Self::new_channel(DecodedLength::CHUNKED, /*wanter =*/ false) |
140 | } |
141 | |
142 | pub(crate) fn new_channel(content_length: DecodedLength, wanter: bool) -> (Sender, Body) { |
143 | let (data_tx, data_rx) = mpsc::channel(0); |
144 | let (trailers_tx, trailers_rx) = oneshot::channel(); |
145 | |
146 | // If wanter is true, `Sender::poll_ready()` won't becoming ready |
147 | // until the `Body` has been polled for data once. |
148 | let want = if wanter { WANT_PENDING } else { WANT_READY }; |
149 | |
150 | let (want_tx, want_rx) = watch::channel(want); |
151 | |
152 | let tx = Sender { |
153 | want_rx, |
154 | data_tx, |
155 | trailers_tx: Some(trailers_tx), |
156 | }; |
157 | let rx = Body::new(Kind::Chan { |
158 | content_length, |
159 | want_tx, |
160 | data_rx, |
161 | trailers_rx, |
162 | }); |
163 | |
164 | (tx, rx) |
165 | } |
166 | |
167 | /// Wrap a futures `Stream` in a box inside `Body`. |
168 | /// |
169 | /// # Example |
170 | /// |
171 | /// ``` |
172 | /// # use hyper::Body; |
173 | /// let chunks: Vec<Result<_, std::io::Error>> = vec![ |
174 | /// Ok("hello"), |
175 | /// Ok(" "), |
176 | /// Ok("world"), |
177 | /// ]; |
178 | /// |
179 | /// let stream = futures_util::stream::iter(chunks); |
180 | /// |
181 | /// let body = Body::wrap_stream(stream); |
182 | /// ``` |
183 | /// |
184 | /// # Optional |
185 | /// |
186 | /// This function requires enabling the `stream` feature in your |
187 | /// `Cargo.toml`. |
188 | #[cfg (feature = "stream" )] |
189 | #[cfg_attr (docsrs, doc(cfg(feature = "stream" )))] |
190 | pub fn wrap_stream<S, O, E>(stream: S) -> Body |
191 | where |
192 | S: Stream<Item = Result<O, E>> + Send + 'static, |
193 | O: Into<Bytes> + 'static, |
194 | E: Into<Box<dyn StdError + Send + Sync>> + 'static, |
195 | { |
196 | let mapped = stream.map_ok(Into::into).map_err(Into::into); |
197 | Body::new(Kind::Wrapped(SyncWrapper::new(Box::pin(mapped)))) |
198 | } |
199 | |
200 | fn new(kind: Kind) -> Body { |
201 | Body { kind, extra: None } |
202 | } |
203 | |
204 | #[cfg (all(feature = "http2" , any(feature = "client" , feature = "server" )))] |
205 | pub(crate) fn h2( |
206 | recv: h2::RecvStream, |
207 | mut content_length: DecodedLength, |
208 | ping: ping::Recorder, |
209 | ) -> Self { |
210 | // If the stream is already EOS, then the "unknown length" is clearly |
211 | // actually ZERO. |
212 | if !content_length.is_exact() && recv.is_end_stream() { |
213 | content_length = DecodedLength::ZERO; |
214 | } |
215 | let body = Body::new(Kind::H2 { |
216 | ping, |
217 | content_length, |
218 | recv, |
219 | }); |
220 | |
221 | body |
222 | } |
223 | |
224 | #[cfg (any(feature = "http1" , feature = "http2" ))] |
225 | #[cfg (feature = "client" )] |
226 | pub(crate) fn delayed_eof(&mut self, fut: DelayEofUntil) { |
227 | self.extra_mut().delayed_eof = Some(DelayEof::NotEof(fut)); |
228 | } |
229 | |
230 | fn take_delayed_eof(&mut self) -> Option<DelayEof> { |
231 | self.extra |
232 | .as_mut() |
233 | .and_then(|extra| extra.delayed_eof.take()) |
234 | } |
235 | |
236 | #[cfg (any(feature = "http1" , feature = "http2" ))] |
237 | fn extra_mut(&mut self) -> &mut Extra { |
238 | self.extra |
239 | .get_or_insert_with(|| Box::new(Extra { delayed_eof: None })) |
240 | } |
241 | |
242 | fn poll_eof(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<crate::Result<Bytes>>> { |
243 | match self.take_delayed_eof() { |
244 | #[cfg (any(feature = "http1" , feature = "http2" ))] |
245 | #[cfg (feature = "client" )] |
246 | Some(DelayEof::NotEof(mut delay)) => match self.poll_inner(cx) { |
247 | ok @ Poll::Ready(Some(Ok(..))) | ok @ Poll::Pending => { |
248 | self.extra_mut().delayed_eof = Some(DelayEof::NotEof(delay)); |
249 | ok |
250 | } |
251 | Poll::Ready(None) => match Pin::new(&mut delay).poll(cx) { |
252 | Poll::Ready(Ok(never)) => match never {}, |
253 | Poll::Pending => { |
254 | self.extra_mut().delayed_eof = Some(DelayEof::Eof(delay)); |
255 | Poll::Pending |
256 | } |
257 | Poll::Ready(Err(_done)) => Poll::Ready(None), |
258 | }, |
259 | Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))), |
260 | }, |
261 | #[cfg (any(feature = "http1" , feature = "http2" ))] |
262 | #[cfg (feature = "client" )] |
263 | Some(DelayEof::Eof(mut delay)) => match Pin::new(&mut delay).poll(cx) { |
264 | Poll::Ready(Ok(never)) => match never {}, |
265 | Poll::Pending => { |
266 | self.extra_mut().delayed_eof = Some(DelayEof::Eof(delay)); |
267 | Poll::Pending |
268 | } |
269 | Poll::Ready(Err(_done)) => Poll::Ready(None), |
270 | }, |
271 | #[cfg (any( |
272 | not(any(feature = "http1" , feature = "http2" )), |
273 | not(feature = "client" ) |
274 | ))] |
275 | Some(delay_eof) => match delay_eof {}, |
276 | None => self.poll_inner(cx), |
277 | } |
278 | } |
279 | |
280 | #[cfg (feature = "ffi" )] |
281 | pub(crate) fn as_ffi_mut(&mut self) -> &mut crate::ffi::UserBody { |
282 | match self.kind { |
283 | Kind::Ffi(ref mut body) => return body, |
284 | _ => { |
285 | self.kind = Kind::Ffi(crate::ffi::UserBody::new()); |
286 | } |
287 | } |
288 | |
289 | match self.kind { |
290 | Kind::Ffi(ref mut body) => body, |
291 | _ => unreachable!(), |
292 | } |
293 | } |
294 | |
295 | fn poll_inner(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<crate::Result<Bytes>>> { |
296 | match self.kind { |
297 | Kind::Once(ref mut val) => Poll::Ready(val.take().map(Ok)), |
298 | Kind::Chan { |
299 | content_length: ref mut len, |
300 | ref mut data_rx, |
301 | ref mut want_tx, |
302 | .. |
303 | } => { |
304 | want_tx.send(WANT_READY); |
305 | |
306 | match ready!(Pin::new(data_rx).poll_next(cx)?) { |
307 | Some(chunk) => { |
308 | len.sub_if(chunk.len() as u64); |
309 | Poll::Ready(Some(Ok(chunk))) |
310 | } |
311 | None => Poll::Ready(None), |
312 | } |
313 | } |
314 | #[cfg (all(feature = "http2" , any(feature = "client" , feature = "server" )))] |
315 | Kind::H2 { |
316 | ref ping, |
317 | recv: ref mut h2, |
318 | content_length: ref mut len, |
319 | } => match ready!(h2.poll_data(cx)) { |
320 | Some(Ok(bytes)) => { |
321 | let _ = h2.flow_control().release_capacity(bytes.len()); |
322 | len.sub_if(bytes.len() as u64); |
323 | ping.record_data(bytes.len()); |
324 | Poll::Ready(Some(Ok(bytes))) |
325 | } |
326 | Some(Err(e)) => Poll::Ready(Some(Err(crate::Error::new_body(e)))), |
327 | None => Poll::Ready(None), |
328 | }, |
329 | |
330 | #[cfg (feature = "ffi" )] |
331 | Kind::Ffi(ref mut body) => body.poll_data(cx), |
332 | |
333 | #[cfg (feature = "stream" )] |
334 | Kind::Wrapped(ref mut s) => match ready!(s.get_mut().as_mut().poll_next(cx)) { |
335 | Some(res) => Poll::Ready(Some(res.map_err(crate::Error::new_body))), |
336 | None => Poll::Ready(None), |
337 | }, |
338 | } |
339 | } |
340 | |
341 | #[cfg (feature = "http1" )] |
342 | pub(super) fn take_full_data(&mut self) -> Option<Bytes> { |
343 | if let Kind::Once(ref mut chunk) = self.kind { |
344 | chunk.take() |
345 | } else { |
346 | None |
347 | } |
348 | } |
349 | } |
350 | |
351 | impl Default for Body { |
352 | /// Returns `Body::empty()`. |
353 | #[inline ] |
354 | fn default() -> Body { |
355 | Body::empty() |
356 | } |
357 | } |
358 | |
359 | impl HttpBody for Body { |
360 | type Data = Bytes; |
361 | type Error = crate::Error; |
362 | |
363 | fn poll_data( |
364 | mut self: Pin<&mut Self>, |
365 | cx: &mut task::Context<'_>, |
366 | ) -> Poll<Option<Result<Self::Data, Self::Error>>> { |
367 | self.poll_eof(cx) |
368 | } |
369 | |
370 | fn poll_trailers( |
371 | #[cfg_attr (not(feature = "http2" ), allow(unused_mut))] mut self: Pin<&mut Self>, |
372 | #[cfg_attr (not(feature = "http2" ), allow(unused))] cx: &mut task::Context<'_>, |
373 | ) -> Poll<Result<Option<HeaderMap>, Self::Error>> { |
374 | match self.kind { |
375 | #[cfg (all(feature = "http2" , any(feature = "client" , feature = "server" )))] |
376 | Kind::H2 { |
377 | recv: ref mut h2, |
378 | ref ping, |
379 | .. |
380 | } => match ready!(h2.poll_trailers(cx)) { |
381 | Ok(t) => { |
382 | ping.record_non_data(); |
383 | Poll::Ready(Ok(t)) |
384 | } |
385 | Err(e) => Poll::Ready(Err(crate::Error::new_h2(e))), |
386 | }, |
387 | Kind::Chan { |
388 | ref mut trailers_rx, |
389 | .. |
390 | } => match ready!(Pin::new(trailers_rx).poll(cx)) { |
391 | Ok(t) => Poll::Ready(Ok(Some(t))), |
392 | Err(_) => Poll::Ready(Ok(None)), |
393 | }, |
394 | #[cfg (feature = "ffi" )] |
395 | Kind::Ffi(ref mut body) => body.poll_trailers(cx), |
396 | _ => Poll::Ready(Ok(None)), |
397 | } |
398 | } |
399 | |
400 | fn is_end_stream(&self) -> bool { |
401 | match self.kind { |
402 | Kind::Once(ref val) => val.is_none(), |
403 | Kind::Chan { content_length, .. } => content_length == DecodedLength::ZERO, |
404 | #[cfg (all(feature = "http2" , any(feature = "client" , feature = "server" )))] |
405 | Kind::H2 { recv: ref h2, .. } => h2.is_end_stream(), |
406 | #[cfg (feature = "ffi" )] |
407 | Kind::Ffi(..) => false, |
408 | #[cfg (feature = "stream" )] |
409 | Kind::Wrapped(..) => false, |
410 | } |
411 | } |
412 | |
413 | fn size_hint(&self) -> SizeHint { |
414 | macro_rules! opt_len { |
415 | ($content_length:expr) => {{ |
416 | let mut hint = SizeHint::default(); |
417 | |
418 | if let Some(content_length) = $content_length.into_opt() { |
419 | hint.set_exact(content_length); |
420 | } |
421 | |
422 | hint |
423 | }}; |
424 | } |
425 | |
426 | match self.kind { |
427 | Kind::Once(Some(ref val)) => SizeHint::with_exact(val.len() as u64), |
428 | Kind::Once(None) => SizeHint::with_exact(0), |
429 | #[cfg (feature = "stream" )] |
430 | Kind::Wrapped(..) => SizeHint::default(), |
431 | Kind::Chan { content_length, .. } => opt_len!(content_length), |
432 | #[cfg (all(feature = "http2" , any(feature = "client" , feature = "server" )))] |
433 | Kind::H2 { content_length, .. } => opt_len!(content_length), |
434 | #[cfg (feature = "ffi" )] |
435 | Kind::Ffi(..) => SizeHint::default(), |
436 | } |
437 | } |
438 | } |
439 | |
440 | impl fmt::Debug for Body { |
441 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
442 | #[derive (Debug)] |
443 | struct Streaming; |
444 | #[derive (Debug)] |
445 | struct Empty; |
446 | #[derive (Debug)] |
447 | struct Full<'a>(&'a Bytes); |
448 | |
449 | let mut builder: DebugTuple<'_, '_> = f.debug_tuple(name:"Body" ); |
450 | match self.kind { |
451 | Kind::Once(None) => builder.field(&Empty), |
452 | Kind::Once(Some(ref chunk: &Bytes)) => builder.field(&Full(chunk)), |
453 | _ => builder.field(&Streaming), |
454 | }; |
455 | |
456 | builder.finish() |
457 | } |
458 | } |
459 | |
460 | /// # Optional |
461 | /// |
462 | /// This function requires enabling the `stream` feature in your |
463 | /// `Cargo.toml`. |
464 | #[cfg (feature = "stream" )] |
465 | impl Stream for Body { |
466 | type Item = crate::Result<Bytes>; |
467 | |
468 | fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> { |
469 | HttpBody::poll_data(self, cx) |
470 | } |
471 | } |
472 | |
473 | /// # Optional |
474 | /// |
475 | /// This function requires enabling the `stream` feature in your |
476 | /// `Cargo.toml`. |
477 | #[cfg (feature = "stream" )] |
478 | impl From<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>> for Body { |
479 | #[inline ] |
480 | fn from( |
481 | stream: Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>, |
482 | ) -> Body { |
483 | Body::new(Kind::Wrapped(SyncWrapper::new(stream.into()))) |
484 | } |
485 | } |
486 | |
487 | impl From<Bytes> for Body { |
488 | #[inline ] |
489 | fn from(chunk: Bytes) -> Body { |
490 | if chunk.is_empty() { |
491 | Body::empty() |
492 | } else { |
493 | Body::new(Kind::Once(Some(chunk))) |
494 | } |
495 | } |
496 | } |
497 | |
498 | impl From<Vec<u8>> for Body { |
499 | #[inline ] |
500 | fn from(vec: Vec<u8>) -> Body { |
501 | Body::from(Bytes::from(vec)) |
502 | } |
503 | } |
504 | |
505 | impl From<&'static [u8]> for Body { |
506 | #[inline ] |
507 | fn from(slice: &'static [u8]) -> Body { |
508 | Body::from(Bytes::from(slice)) |
509 | } |
510 | } |
511 | |
512 | impl From<Cow<'static, [u8]>> for Body { |
513 | #[inline ] |
514 | fn from(cow: Cow<'static, [u8]>) -> Body { |
515 | match cow { |
516 | Cow::Borrowed(b: &[u8]) => Body::from(b), |
517 | Cow::Owned(o: Vec) => Body::from(o), |
518 | } |
519 | } |
520 | } |
521 | |
522 | impl From<String> for Body { |
523 | #[inline ] |
524 | fn from(s: String) -> Body { |
525 | Body::from(Bytes::from(s.into_bytes())) |
526 | } |
527 | } |
528 | |
529 | impl From<&'static str> for Body { |
530 | #[inline ] |
531 | fn from(slice: &'static str) -> Body { |
532 | Body::from(Bytes::from(slice.as_bytes())) |
533 | } |
534 | } |
535 | |
536 | impl From<Cow<'static, str>> for Body { |
537 | #[inline ] |
538 | fn from(cow: Cow<'static, str>) -> Body { |
539 | match cow { |
540 | Cow::Borrowed(b: &str) => Body::from(b), |
541 | Cow::Owned(o: String) => Body::from(o), |
542 | } |
543 | } |
544 | } |
545 | |
546 | impl Sender { |
547 | /// Check to see if this `Sender` can send more data. |
548 | pub fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> { |
549 | // Check if the receiver end has tried polling for the body yet |
550 | ready!(self.poll_want(cx)?); |
551 | self.data_tx |
552 | .poll_ready(cx) |
553 | .map_err(|_| crate::Error::new_closed()) |
554 | } |
555 | |
556 | fn poll_want(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> { |
557 | match self.want_rx.load(cx) { |
558 | WANT_READY => Poll::Ready(Ok(())), |
559 | WANT_PENDING => Poll::Pending, |
560 | watch::CLOSED => Poll::Ready(Err(crate::Error::new_closed())), |
561 | unexpected => unreachable!("want_rx value: {}" , unexpected), |
562 | } |
563 | } |
564 | |
565 | async fn ready(&mut self) -> crate::Result<()> { |
566 | futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await |
567 | } |
568 | |
569 | /// Send data on data channel when it is ready. |
570 | pub async fn send_data(&mut self, chunk: Bytes) -> crate::Result<()> { |
571 | self.ready().await?; |
572 | self.data_tx |
573 | .try_send(Ok(chunk)) |
574 | .map_err(|_| crate::Error::new_closed()) |
575 | } |
576 | |
577 | /// Send trailers on trailers channel. |
578 | pub async fn send_trailers(&mut self, trailers: HeaderMap) -> crate::Result<()> { |
579 | let tx = match self.trailers_tx.take() { |
580 | Some(tx) => tx, |
581 | None => return Err(crate::Error::new_closed()), |
582 | }; |
583 | tx.send(trailers).map_err(|_| crate::Error::new_closed()) |
584 | } |
585 | |
586 | /// Try to send data on this channel. |
587 | /// |
588 | /// # Errors |
589 | /// |
590 | /// Returns `Err(Bytes)` if the channel could not (currently) accept |
591 | /// another `Bytes`. |
592 | /// |
593 | /// # Note |
594 | /// |
595 | /// This is mostly useful for when trying to send from some other thread |
596 | /// that doesn't have an async context. If in an async context, prefer |
597 | /// `send_data()` instead. |
598 | pub fn try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes> { |
599 | self.data_tx |
600 | .try_send(Ok(chunk)) |
601 | .map_err(|err| err.into_inner().expect("just sent Ok" )) |
602 | } |
603 | |
604 | /// Aborts the body in an abnormal fashion. |
605 | pub fn abort(self) { |
606 | let _ = self |
607 | .data_tx |
608 | // clone so the send works even if buffer is full |
609 | .clone() |
610 | .try_send(Err(crate::Error::new_body_write_aborted())); |
611 | } |
612 | |
613 | #[cfg (feature = "http1" )] |
614 | pub(crate) fn send_error(&mut self, err: crate::Error) { |
615 | let _ = self.data_tx.try_send(Err(err)); |
616 | } |
617 | } |
618 | |
619 | impl fmt::Debug for Sender { |
620 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
621 | #[derive (Debug)] |
622 | struct Open; |
623 | #[derive (Debug)] |
624 | struct Closed; |
625 | |
626 | let mut builder: DebugTuple<'_, '_> = f.debug_tuple(name:"Sender" ); |
627 | match self.want_rx.peek() { |
628 | watch::CLOSED => builder.field(&Closed), |
629 | _ => builder.field(&Open), |
630 | }; |
631 | |
632 | builder.finish() |
633 | } |
634 | } |
635 | |
636 | #[cfg (test)] |
637 | mod tests { |
638 | use std::mem; |
639 | use std::task::Poll; |
640 | |
641 | use super::{Body, DecodedLength, HttpBody, Sender, SizeHint}; |
642 | |
643 | #[test ] |
644 | fn test_size_of() { |
645 | // These are mostly to help catch *accidentally* increasing |
646 | // the size by too much. |
647 | |
648 | let body_size = mem::size_of::<Body>(); |
649 | let body_expected_size = mem::size_of::<u64>() * 6; |
650 | assert!( |
651 | body_size <= body_expected_size, |
652 | "Body size = {} <= {}" , |
653 | body_size, |
654 | body_expected_size, |
655 | ); |
656 | |
657 | assert_eq!(body_size, mem::size_of::<Option<Body>>(), "Option<Body>" ); |
658 | |
659 | assert_eq!( |
660 | mem::size_of::<Sender>(), |
661 | mem::size_of::<usize>() * 5, |
662 | "Sender" |
663 | ); |
664 | |
665 | assert_eq!( |
666 | mem::size_of::<Sender>(), |
667 | mem::size_of::<Option<Sender>>(), |
668 | "Option<Sender>" |
669 | ); |
670 | } |
671 | |
672 | #[test ] |
673 | fn size_hint() { |
674 | fn eq(body: Body, b: SizeHint, note: &str) { |
675 | let a = body.size_hint(); |
676 | assert_eq!(a.lower(), b.lower(), "lower for {:?}" , note); |
677 | assert_eq!(a.upper(), b.upper(), "upper for {:?}" , note); |
678 | } |
679 | |
680 | eq(Body::from("Hello" ), SizeHint::with_exact(5), "from str" ); |
681 | |
682 | eq(Body::empty(), SizeHint::with_exact(0), "empty" ); |
683 | |
684 | eq(Body::channel().1, SizeHint::new(), "channel" ); |
685 | |
686 | eq( |
687 | Body::new_channel(DecodedLength::new(4), /*wanter =*/ false).1, |
688 | SizeHint::with_exact(4), |
689 | "channel with length" , |
690 | ); |
691 | } |
692 | |
693 | #[tokio::test] |
694 | async fn channel_abort() { |
695 | let (tx, mut rx) = Body::channel(); |
696 | |
697 | tx.abort(); |
698 | |
699 | let err = rx.data().await.unwrap().unwrap_err(); |
700 | assert!(err.is_body_write_aborted(), " {:?}" , err); |
701 | } |
702 | |
703 | #[tokio::test] |
704 | async fn channel_abort_when_buffer_is_full() { |
705 | let (mut tx, mut rx) = Body::channel(); |
706 | |
707 | tx.try_send_data("chunk 1" .into()).expect("send 1" ); |
708 | // buffer is full, but can still send abort |
709 | tx.abort(); |
710 | |
711 | let chunk1 = rx.data().await.expect("item 1" ).expect("chunk 1" ); |
712 | assert_eq!(chunk1, "chunk 1" ); |
713 | |
714 | let err = rx.data().await.unwrap().unwrap_err(); |
715 | assert!(err.is_body_write_aborted(), " {:?}" , err); |
716 | } |
717 | |
718 | #[test ] |
719 | fn channel_buffers_one() { |
720 | let (mut tx, _rx) = Body::channel(); |
721 | |
722 | tx.try_send_data("chunk 1" .into()).expect("send 1" ); |
723 | |
724 | // buffer is now full |
725 | let chunk2 = tx.try_send_data("chunk 2" .into()).expect_err("send 2" ); |
726 | assert_eq!(chunk2, "chunk 2" ); |
727 | } |
728 | |
729 | #[tokio::test] |
730 | async fn channel_empty() { |
731 | let (_, mut rx) = Body::channel(); |
732 | |
733 | assert!(rx.data().await.is_none()); |
734 | } |
735 | |
736 | #[test ] |
737 | fn channel_ready() { |
738 | let (mut tx, _rx) = Body::new_channel(DecodedLength::CHUNKED, /*wanter = */ false); |
739 | |
740 | let mut tx_ready = tokio_test::task::spawn(tx.ready()); |
741 | |
742 | assert!(tx_ready.poll().is_ready(), "tx is ready immediately" ); |
743 | } |
744 | |
745 | #[test ] |
746 | fn channel_wanter() { |
747 | let (mut tx, mut rx) = Body::new_channel(DecodedLength::CHUNKED, /*wanter = */ true); |
748 | |
749 | let mut tx_ready = tokio_test::task::spawn(tx.ready()); |
750 | let mut rx_data = tokio_test::task::spawn(rx.data()); |
751 | |
752 | assert!( |
753 | tx_ready.poll().is_pending(), |
754 | "tx isn't ready before rx has been polled" |
755 | ); |
756 | |
757 | assert!(rx_data.poll().is_pending(), "poll rx.data" ); |
758 | assert!(tx_ready.is_woken(), "rx poll wakes tx" ); |
759 | |
760 | assert!( |
761 | tx_ready.poll().is_ready(), |
762 | "tx is ready after rx has been polled" |
763 | ); |
764 | } |
765 | |
766 | #[test ] |
767 | fn channel_notices_closure() { |
768 | let (mut tx, rx) = Body::new_channel(DecodedLength::CHUNKED, /*wanter = */ true); |
769 | |
770 | let mut tx_ready = tokio_test::task::spawn(tx.ready()); |
771 | |
772 | assert!( |
773 | tx_ready.poll().is_pending(), |
774 | "tx isn't ready before rx has been polled" |
775 | ); |
776 | |
777 | drop(rx); |
778 | assert!(tx_ready.is_woken(), "dropping rx wakes tx" ); |
779 | |
780 | match tx_ready.poll() { |
781 | Poll::Ready(Err(ref e)) if e.is_closed() => (), |
782 | unexpected => panic!("tx poll ready unexpected: {:?}" , unexpected), |
783 | } |
784 | } |
785 | } |
786 | |