1use std::borrow::Cow;
2#[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))]
3use std::convert::Infallible;
4#[cfg(feature = "stream")]
5use std::error::Error as StdError;
6use std::fmt;
7use std::future::Future;
8use std::pin::Pin;
9use std::task::{Context, Poll};
10
11use bytes::Bytes;
12use futures_channel::mpsc;
13use futures_channel::oneshot;
14use futures_core::Stream; // for mpsc::Receiver
15#[cfg(feature = "stream")]
16use futures_util::TryStreamExt;
17use http::HeaderMap;
18use http_body::{Body as HttpBody, SizeHint};
19
20use super::DecodedLength;
21#[cfg(feature = "stream")]
22use crate::common::sync_wrapper::SyncWrapper;
23use crate::common::watch;
24#[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
25use crate::proto::h2::ping;
26
/// Sending half of the data channel: carries body chunks (or an error) from
/// a `Sender` to the channel-backed `Body`.
type BodySender = mpsc::Sender<Result<Bytes, crate::Error>>;
/// Sending half of the one-shot channel used to hand trailers to the
/// channel-backed `Body`.
type TrailersSender = oneshot::Sender<HeaderMap>;
29
/// A stream of `Bytes`, used when receiving bodies.
///
/// A good default [`HttpBody`](crate::body::HttpBody) to use in many
/// applications.
///
/// Note: To read the full body, use [`body::to_bytes`](crate::body::to_bytes())
/// or [`body::aggregate`](crate::body::aggregate()).
#[must_use = "streams do nothing unless polled"]
pub struct Body {
    /// The source the body's data is read from (a single in-memory chunk, a
    /// channel, ...); every poll dispatches on this.
    kind: Kind,
    /// Keep the extra bits in an `Option<Box<Extra>>`, so that
    /// Body stays small in the common case (no extras needed).
    extra: Option<Box<Extra>>,
}
44
/// The concrete source a `Body` pulls its data from.
enum Kind {
    /// At most one in-memory chunk. The chunk is taken out when polled, so
    /// `None` means either "empty body" or "already yielded".
    Once(Option<Bytes>),
    /// Body fed by a `Sender` through channels.
    Chan {
        /// Length reported by `size_hint()`; adjusted with `sub_if` for each
        /// chunk received.
        content_length: DecodedLength,
        /// Tells the `Sender` that the body has been polled for data.
        want_tx: watch::Sender,
        /// Receives the data chunks, or an error, from the `Sender`.
        data_rx: mpsc::Receiver<Result<Bytes, crate::Error>>,
        /// Receives the trailers, if the `Sender` provides any.
        trailers_rx: oneshot::Receiver<HeaderMap>,
    },
    /// Body read from an HTTP/2 receive stream.
    #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
    H2 {
        /// Notified about every received data chunk and about trailers.
        ping: ping::Recorder,
        /// Length reported by `size_hint()`; adjusted with `sub_if` for each
        /// chunk received.
        content_length: DecodedLength,
        /// The underlying `h2` stream that data and trailers are polled from.
        recv: h2::RecvStream,
    },
    /// Body whose data and trailers are provided by the FFI `UserBody`.
    #[cfg(feature = "ffi")]
    Ffi(crate::ffi::UserBody),
    /// A boxed, user-supplied stream of chunks (see `Body::wrap_stream`).
    #[cfg(feature = "stream")]
    Wrapped(
        SyncWrapper<
            Pin<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>>,
        >,
    ),
}
68
/// Rarely-needed state that a `Body` allocates lazily (boxed) on first use.
struct Extra {
    /// Allow the client to pass a future to delay the `Body` from returning
    /// EOF. This allows the `Client` to try to put the idle connection
    /// back into the pool before the body is "finished".
    ///
    /// The reason for this is so that creating a new request after finishing
    /// streaming the body of a response could sometimes result in creating
    /// a brand new connection, since the pool didn't know about the idle
    /// connection yet.
    delayed_eof: Option<DelayEof>,
}
80
/// Future a `Body` waits on before it reports EOF.
///
/// The item type is `Infallible`, so a value can never arrive; `poll_eof`
/// treats the receiver resolving with an error as the signal to let EOF
/// through.
#[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))]
type DelayEofUntil = oneshot::Receiver<Infallible>;
83
/// Progress of a delayed EOF.
///
/// Both variants are compiled only with the `client` feature plus `http1` or
/// `http2`; without them this enum has no variants.
enum DelayEof {
    /// Initial state, stream hasn't seen EOF yet.
    #[cfg(any(feature = "http1", feature = "http2"))]
    #[cfg(feature = "client")]
    NotEof(DelayEofUntil),
    /// Transitions to this state once we've seen `poll` try to
    /// return EOF (`None`). This future is then polled, and
    /// when it completes, the Body finally returns EOF (`None`).
    #[cfg(any(feature = "http1", feature = "http2"))]
    #[cfg(feature = "client")]
    Eof(DelayEofUntil),
}
96
/// A sender half created through [`Body::channel()`].
///
/// Useful when wanting to stream chunks from another thread.
///
/// ## Body Closing
///
/// Note that the request body will always be closed normally when the sender is dropped (meaning
/// that the empty terminating chunk will be sent to the remote). If you desire to close the
/// connection with an incomplete response (e.g. in the case of an error during asynchronous
/// processing), call the [`Sender::abort()`] method to abort the body in an abnormal fashion.
///
/// [`Body::channel()`]: struct.Body.html#method.channel
/// [`Sender::abort()`]: struct.Sender.html#method.abort
#[must_use = "Sender does nothing unless sent on"]
pub struct Sender {
    /// Reports whether the `Body` has been polled for data yet, or has been
    /// closed.
    want_rx: watch::Receiver,
    /// Carries data chunks, or an error, to the `Body`.
    data_tx: BodySender,
    /// One-shot channel for the trailers; taken (left as `None`) by the first
    /// call to `send_trailers`.
    trailers_tx: Option<TrailersSender>,
}
116
/// "Want" state in which `Sender::poll_want` stays pending: the `Body` has not
/// been polled for data yet.
const WANT_PENDING: usize = 1;
/// "Want" state in which `Sender::poll_want` is ready: set on every data poll
/// of a channel `Body`, and used as the initial state when no wanter is
/// requested.
const WANT_READY: usize = 2;
119
120impl Body {
121 /// Create an empty `Body` stream.
122 ///
123 /// # Example
124 ///
125 /// ```
126 /// use hyper::{Body, Request};
127 ///
128 /// // create a `GET /` request
129 /// let get = Request::new(Body::empty());
130 /// ```
131 #[inline]
132 pub fn empty() -> Body {
133 Body::new(Kind::Once(None))
134 }
135
136 /// Create a `Body` stream with an associated sender half.
137 ///
138 /// Useful when wanting to stream chunks from another thread.
139 #[inline]
140 pub fn channel() -> (Sender, Body) {
141 Self::new_channel(DecodedLength::CHUNKED, /*wanter =*/ false)
142 }
143
144 pub(crate) fn new_channel(content_length: DecodedLength, wanter: bool) -> (Sender, Body) {
145 let (data_tx, data_rx) = mpsc::channel(0);
146 let (trailers_tx, trailers_rx) = oneshot::channel();
147
148 // If wanter is true, `Sender::poll_ready()` won't becoming ready
149 // until the `Body` has been polled for data once.
150 let want = if wanter { WANT_PENDING } else { WANT_READY };
151
152 let (want_tx, want_rx) = watch::channel(want);
153
154 let tx = Sender {
155 want_rx,
156 data_tx,
157 trailers_tx: Some(trailers_tx),
158 };
159 let rx = Body::new(Kind::Chan {
160 content_length,
161 want_tx,
162 data_rx,
163 trailers_rx,
164 });
165
166 (tx, rx)
167 }
168
169 /// Wrap a futures `Stream` in a box inside `Body`.
170 ///
171 /// # Example
172 ///
173 /// ```
174 /// # use hyper::Body;
175 /// let chunks: Vec<Result<_, std::io::Error>> = vec![
176 /// Ok("hello"),
177 /// Ok(" "),
178 /// Ok("world"),
179 /// ];
180 ///
181 /// let stream = futures_util::stream::iter(chunks);
182 ///
183 /// let body = Body::wrap_stream(stream);
184 /// ```
185 ///
186 /// # Optional
187 ///
188 /// This function requires enabling the `stream` feature in your
189 /// `Cargo.toml`.
190 #[cfg(feature = "stream")]
191 #[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
192 pub fn wrap_stream<S, O, E>(stream: S) -> Body
193 where
194 S: Stream<Item = Result<O, E>> + Send + 'static,
195 O: Into<Bytes> + 'static,
196 E: Into<Box<dyn StdError + Send + Sync>> + 'static,
197 {
198 let mapped = stream.map_ok(Into::into).map_err(Into::into);
199 Body::new(Kind::Wrapped(SyncWrapper::new(Box::pin(mapped))))
200 }
201
202 fn new(kind: Kind) -> Body {
203 Body { kind, extra: None }
204 }
205
206 #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
207 pub(crate) fn h2(
208 recv: h2::RecvStream,
209 mut content_length: DecodedLength,
210 ping: ping::Recorder,
211 ) -> Self {
212 // If the stream is already EOS, then the "unknown length" is clearly
213 // actually ZERO.
214 if !content_length.is_exact() && recv.is_end_stream() {
215 content_length = DecodedLength::ZERO;
216 }
217 let body = Body::new(Kind::H2 {
218 ping,
219 content_length,
220 recv,
221 });
222
223 body
224 }
225
226 #[cfg(any(feature = "http1", feature = "http2"))]
227 #[cfg(feature = "client")]
228 pub(crate) fn delayed_eof(&mut self, fut: DelayEofUntil) {
229 self.extra_mut().delayed_eof = Some(DelayEof::NotEof(fut));
230 }
231
232 fn take_delayed_eof(&mut self) -> Option<DelayEof> {
233 self.extra
234 .as_mut()
235 .and_then(|extra| extra.delayed_eof.take())
236 }
237
238 #[cfg(any(feature = "http1", feature = "http2"))]
239 fn extra_mut(&mut self) -> &mut Extra {
240 self.extra
241 .get_or_insert_with(|| Box::new(Extra { delayed_eof: None }))
242 }
243
244 fn poll_eof(&mut self, cx: &mut Context<'_>) -> Poll<Option<crate::Result<Bytes>>> {
245 match self.take_delayed_eof() {
246 #[cfg(any(feature = "http1", feature = "http2"))]
247 #[cfg(feature = "client")]
248 Some(DelayEof::NotEof(mut delay)) => match self.poll_inner(cx) {
249 ok @ Poll::Ready(Some(Ok(..))) | ok @ Poll::Pending => {
250 self.extra_mut().delayed_eof = Some(DelayEof::NotEof(delay));
251 ok
252 }
253 Poll::Ready(None) => match Pin::new(&mut delay).poll(cx) {
254 Poll::Ready(Ok(never)) => match never {},
255 Poll::Pending => {
256 self.extra_mut().delayed_eof = Some(DelayEof::Eof(delay));
257 Poll::Pending
258 }
259 Poll::Ready(Err(_done)) => Poll::Ready(None),
260 },
261 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
262 },
263 #[cfg(any(feature = "http1", feature = "http2"))]
264 #[cfg(feature = "client")]
265 Some(DelayEof::Eof(mut delay)) => match Pin::new(&mut delay).poll(cx) {
266 Poll::Ready(Ok(never)) => match never {},
267 Poll::Pending => {
268 self.extra_mut().delayed_eof = Some(DelayEof::Eof(delay));
269 Poll::Pending
270 }
271 Poll::Ready(Err(_done)) => Poll::Ready(None),
272 },
273 #[cfg(any(
274 not(any(feature = "http1", feature = "http2")),
275 not(feature = "client")
276 ))]
277 Some(delay_eof) => match delay_eof {},
278 None => self.poll_inner(cx),
279 }
280 }
281
282 #[cfg(feature = "ffi")]
283 pub(crate) fn as_ffi_mut(&mut self) -> &mut crate::ffi::UserBody {
284 match self.kind {
285 Kind::Ffi(ref mut body) => return body,
286 _ => {
287 self.kind = Kind::Ffi(crate::ffi::UserBody::new());
288 }
289 }
290
291 match self.kind {
292 Kind::Ffi(ref mut body) => body,
293 _ => unreachable!(),
294 }
295 }
296
297 fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<Option<crate::Result<Bytes>>> {
298 match self.kind {
299 Kind::Once(ref mut val) => Poll::Ready(val.take().map(Ok)),
300 Kind::Chan {
301 content_length: ref mut len,
302 ref mut data_rx,
303 ref mut want_tx,
304 ..
305 } => {
306 want_tx.send(WANT_READY);
307
308 match ready!(Pin::new(data_rx).poll_next(cx)?) {
309 Some(chunk) => {
310 len.sub_if(chunk.len() as u64);
311 Poll::Ready(Some(Ok(chunk)))
312 }
313 None => Poll::Ready(None),
314 }
315 }
316 #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
317 Kind::H2 {
318 ref ping,
319 recv: ref mut h2,
320 content_length: ref mut len,
321 } => match ready!(h2.poll_data(cx)) {
322 Some(Ok(bytes)) => {
323 let _ = h2.flow_control().release_capacity(bytes.len());
324 len.sub_if(bytes.len() as u64);
325 ping.record_data(bytes.len());
326 Poll::Ready(Some(Ok(bytes)))
327 }
328 Some(Err(e)) => match e.reason() {
329 // These reasons should cause stop of body reading, but nor fail it.
330 // The same logic as for `AsyncRead for H2Upgraded` is applied here.
331 Some(h2::Reason::NO_ERROR) | Some(h2::Reason::CANCEL) => Poll::Ready(None),
332 _ => Poll::Ready(Some(Err(crate::Error::new_body(e)))),
333 },
334 None => Poll::Ready(None),
335 },
336
337 #[cfg(feature = "ffi")]
338 Kind::Ffi(ref mut body) => body.poll_data(cx),
339
340 #[cfg(feature = "stream")]
341 Kind::Wrapped(ref mut s) => match ready!(s.get_mut().as_mut().poll_next(cx)) {
342 Some(res) => Poll::Ready(Some(res.map_err(crate::Error::new_body))),
343 None => Poll::Ready(None),
344 },
345 }
346 }
347
348 #[cfg(feature = "http1")]
349 pub(super) fn take_full_data(&mut self) -> Option<Bytes> {
350 if let Kind::Once(ref mut chunk) = self.kind {
351 chunk.take()
352 } else {
353 None
354 }
355 }
356}
357
358impl Default for Body {
359 /// Returns `Body::empty()`.
360 #[inline]
361 fn default() -> Body {
362 Body::empty()
363 }
364}
365
366impl HttpBody for Body {
367 type Data = Bytes;
368 type Error = crate::Error;
369
370 fn poll_data(
371 mut self: Pin<&mut Self>,
372 cx: &mut Context<'_>,
373 ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
374 self.poll_eof(cx)
375 }
376
377 fn poll_trailers(
378 #[cfg_attr(not(feature = "http2"), allow(unused_mut))] mut self: Pin<&mut Self>,
379 #[cfg_attr(not(feature = "http2"), allow(unused))] cx: &mut Context<'_>,
380 ) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
381 match self.kind {
382 #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
383 Kind::H2 {
384 recv: ref mut h2,
385 ref ping,
386 ..
387 } => match ready!(h2.poll_trailers(cx)) {
388 Ok(t) => {
389 ping.record_non_data();
390 Poll::Ready(Ok(t))
391 }
392 Err(e) => Poll::Ready(Err(crate::Error::new_h2(e))),
393 },
394 Kind::Chan {
395 ref mut trailers_rx,
396 ..
397 } => match ready!(Pin::new(trailers_rx).poll(cx)) {
398 Ok(t) => Poll::Ready(Ok(Some(t))),
399 Err(_) => Poll::Ready(Ok(None)),
400 },
401 #[cfg(feature = "ffi")]
402 Kind::Ffi(ref mut body) => body.poll_trailers(cx),
403 _ => Poll::Ready(Ok(None)),
404 }
405 }
406
407 fn is_end_stream(&self) -> bool {
408 match self.kind {
409 Kind::Once(ref val) => val.is_none(),
410 Kind::Chan { content_length, .. } => content_length == DecodedLength::ZERO,
411 #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
412 Kind::H2 { recv: ref h2, .. } => h2.is_end_stream(),
413 #[cfg(feature = "ffi")]
414 Kind::Ffi(..) => false,
415 #[cfg(feature = "stream")]
416 Kind::Wrapped(..) => false,
417 }
418 }
419
420 fn size_hint(&self) -> SizeHint {
421 macro_rules! opt_len {
422 ($content_length:expr) => {{
423 let mut hint = SizeHint::default();
424
425 if let Some(content_length) = $content_length.into_opt() {
426 hint.set_exact(content_length);
427 }
428
429 hint
430 }};
431 }
432
433 match self.kind {
434 Kind::Once(Some(ref val)) => SizeHint::with_exact(val.len() as u64),
435 Kind::Once(None) => SizeHint::with_exact(0),
436 #[cfg(feature = "stream")]
437 Kind::Wrapped(..) => SizeHint::default(),
438 Kind::Chan { content_length, .. } => opt_len!(content_length),
439 #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
440 Kind::H2 { content_length, .. } => opt_len!(content_length),
441 #[cfg(feature = "ffi")]
442 Kind::Ffi(..) => SizeHint::default(),
443 }
444 }
445}
446
447impl fmt::Debug for Body {
448 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
449 #[derive(Debug)]
450 struct Streaming;
451 #[derive(Debug)]
452 struct Empty;
453 #[derive(Debug)]
454 struct Full<'a>(&'a Bytes);
455
456 let mut builder: DebugTuple<'_, '_> = f.debug_tuple(name:"Body");
457 match self.kind {
458 Kind::Once(None) => builder.field(&Empty),
459 Kind::Once(Some(ref chunk: &Bytes)) => builder.field(&Full(chunk)),
460 _ => builder.field(&Streaming),
461 };
462
463 builder.finish()
464 }
465}
466
/// # Optional
///
/// This implementation requires enabling the `stream` feature in your
/// `Cargo.toml`.
#[cfg(feature = "stream")]
impl Stream for Body {
    type Item = crate::Result<Bytes>;

    /// Yields the same items as `HttpBody::poll_data`.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        <Self as HttpBody>::poll_data(self, cx)
    }
}
479
/// # Optional
///
/// This implementation requires enabling the `stream` feature in your
/// `Cargo.toml`.
#[cfg(feature = "stream")]
impl From<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>> for Body {
    /// Pins an already boxed stream and uses it as the body's source.
    #[inline]
    fn from(
        stream: Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>,
    ) -> Body {
        let wrapped = Kind::Wrapped(SyncWrapper::new(Pin::from(stream)));
        Body::new(wrapped)
    }
}
493
494impl From<Bytes> for Body {
495 #[inline]
496 fn from(chunk: Bytes) -> Body {
497 if chunk.is_empty() {
498 Body::empty()
499 } else {
500 Body::new(Kind::Once(Some(chunk)))
501 }
502 }
503}
504
505impl From<Vec<u8>> for Body {
506 #[inline]
507 fn from(vec: Vec<u8>) -> Body {
508 Body::from(Bytes::from(vec))
509 }
510}
511
512impl From<&'static [u8]> for Body {
513 #[inline]
514 fn from(slice: &'static [u8]) -> Body {
515 Body::from(Bytes::from(slice))
516 }
517}
518
519impl From<Cow<'static, [u8]>> for Body {
520 #[inline]
521 fn from(cow: Cow<'static, [u8]>) -> Body {
522 match cow {
523 Cow::Borrowed(b: &[u8]) => Body::from(b),
524 Cow::Owned(o: Vec) => Body::from(o),
525 }
526 }
527}
528
529impl From<String> for Body {
530 #[inline]
531 fn from(s: String) -> Body {
532 Body::from(Bytes::from(s.into_bytes()))
533 }
534}
535
536impl From<&'static str> for Body {
537 #[inline]
538 fn from(slice: &'static str) -> Body {
539 Body::from(Bytes::from(slice.as_bytes()))
540 }
541}
542
543impl From<Cow<'static, str>> for Body {
544 #[inline]
545 fn from(cow: Cow<'static, str>) -> Body {
546 match cow {
547 Cow::Borrowed(b: &str) => Body::from(b),
548 Cow::Owned(o: String) => Body::from(o),
549 }
550 }
551}
552
553impl Sender {
554 /// Check to see if this `Sender` can send more data.
555 pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
556 // Check if the receiver end has tried polling for the body yet
557 ready!(self.poll_want(cx)?);
558 self.data_tx
559 .poll_ready(cx)
560 .map_err(|_| crate::Error::new_closed())
561 }
562
563 fn poll_want(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
564 match self.want_rx.load(cx) {
565 WANT_READY => Poll::Ready(Ok(())),
566 WANT_PENDING => Poll::Pending,
567 watch::CLOSED => Poll::Ready(Err(crate::Error::new_closed())),
568 unexpected => unreachable!("want_rx value: {}", unexpected),
569 }
570 }
571
572 async fn ready(&mut self) -> crate::Result<()> {
573 futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await
574 }
575
576 /// Send data on data channel when it is ready.
577 pub async fn send_data(&mut self, chunk: Bytes) -> crate::Result<()> {
578 self.ready().await?;
579 self.data_tx
580 .try_send(Ok(chunk))
581 .map_err(|_| crate::Error::new_closed())
582 }
583
584 /// Send trailers on trailers channel.
585 pub async fn send_trailers(&mut self, trailers: HeaderMap) -> crate::Result<()> {
586 let tx = match self.trailers_tx.take() {
587 Some(tx) => tx,
588 None => return Err(crate::Error::new_closed()),
589 };
590 tx.send(trailers).map_err(|_| crate::Error::new_closed())
591 }
592
593 /// Try to send data on this channel.
594 ///
595 /// # Errors
596 ///
597 /// Returns `Err(Bytes)` if the channel could not (currently) accept
598 /// another `Bytes`.
599 ///
600 /// # Note
601 ///
602 /// This is mostly useful for when trying to send from some other thread
603 /// that doesn't have an async context. If in an async context, prefer
604 /// `send_data()` instead.
605 pub fn try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes> {
606 self.data_tx
607 .try_send(Ok(chunk))
608 .map_err(|err| err.into_inner().expect("just sent Ok"))
609 }
610
611 /// Aborts the body in an abnormal fashion.
612 pub fn abort(mut self) {
613 self.send_error(crate::Error::new_body_write_aborted());
614 }
615
616 pub(crate) fn send_error(&mut self, err: crate::Error) {
617 let _ = self
618 .data_tx
619 // clone so the send works even if buffer is full
620 .clone()
621 .try_send(Err(err));
622 }
623}
624
625impl fmt::Debug for Sender {
626 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
627 #[derive(Debug)]
628 struct Open;
629 #[derive(Debug)]
630 struct Closed;
631
632 let mut builder: DebugTuple<'_, '_> = f.debug_tuple(name:"Sender");
633 match self.want_rx.peek() {
634 watch::CLOSED => builder.field(&Closed),
635 _ => builder.field(&Open),
636 };
637
638 builder.finish()
639 }
640}
641
#[cfg(test)]
mod tests {
    use std::mem;
    use std::task::Poll;

    use super::{Body, DecodedLength, HttpBody, Sender, SizeHint};

    /// Mostly here to help catch *accidentally* growing the types by too
    /// much.
    #[test]
    fn test_size_of() {
        let actual = mem::size_of::<Body>();
        let limit = mem::size_of::<u64>() * 6;
        assert!(actual <= limit, "Body size = {} <= {}", actual, limit,);

        assert_eq!(actual, mem::size_of::<Option<Body>>(), "Option<Body>");

        let sender_size = mem::size_of::<Sender>();
        assert_eq!(sender_size, mem::size_of::<usize>() * 5, "Sender");
        assert_eq!(
            sender_size,
            mem::size_of::<Option<Sender>>(),
            "Option<Sender>"
        );
    }

    /// Each way of building a body reports the expected size hint.
    #[test]
    fn size_hint() {
        fn check(body: Body, expected: SizeHint, label: &str) {
            let actual = body.size_hint();
            assert_eq!(actual.lower(), expected.lower(), "lower for {:?}", label);
            assert_eq!(actual.upper(), expected.upper(), "upper for {:?}", label);
        }

        check(Body::from("Hello"), SizeHint::with_exact(5), "from str");
        check(Body::empty(), SizeHint::with_exact(0), "empty");
        check(Body::channel().1, SizeHint::new(), "channel");

        let sized = Body::new_channel(DecodedLength::new(4), /*wanter =*/ false).1;
        check(sized, SizeHint::with_exact(4), "channel with length");
    }

    /// Aborting the sender surfaces a "write aborted" error on the body.
    #[tokio::test]
    async fn channel_abort() {
        let (sender, mut body) = Body::channel();

        sender.abort();

        let failure = body.data().await.unwrap().unwrap_err();
        assert!(failure.is_body_write_aborted(), "{:?}", failure);
    }

    /// The abort error still gets through when a chunk is already buffered.
    #[tokio::test]
    async fn channel_abort_when_buffer_is_full() {
        let (mut sender, mut body) = Body::channel();

        sender.try_send_data("chunk 1".into()).expect("send 1");
        // buffer is full, but can still send abort
        sender.abort();

        let first = body.data().await.expect("item 1").expect("chunk 1");
        assert_eq!(first, "chunk 1");

        let failure = body.data().await.unwrap().unwrap_err();
        assert!(failure.is_body_write_aborted(), "{:?}", failure);
    }

    /// Exactly one chunk can be buffered without the body being polled.
    #[test]
    fn channel_buffers_one() {
        let (mut sender, _body) = Body::channel();

        sender.try_send_data("chunk 1".into()).expect("send 1");

        // buffer is now full
        let rejected = sender
            .try_send_data("chunk 2".into())
            .expect_err("send 2");
        assert_eq!(rejected, "chunk 2");
    }

    /// A channel whose sender is gone yields no data.
    #[tokio::test]
    async fn channel_empty() {
        let (_, mut body) = Body::channel();

        assert!(body.data().await.is_none());
    }

    /// Without a wanter the sender is ready right away.
    #[test]
    fn channel_ready() {
        let (mut sender, _body) = Body::new_channel(DecodedLength::CHUNKED, /*wanter = */ false);

        let mut readiness = tokio_test::task::spawn(sender.ready());

        assert!(readiness.poll().is_ready(), "tx is ready immediately");
    }

    /// With a wanter the sender only becomes ready once the body was polled.
    #[test]
    fn channel_wanter() {
        let (mut sender, mut body) =
            Body::new_channel(DecodedLength::CHUNKED, /*wanter = */ true);

        let mut readiness = tokio_test::task::spawn(sender.ready());
        let mut next_chunk = tokio_test::task::spawn(body.data());

        assert!(
            readiness.poll().is_pending(),
            "tx isn't ready before rx has been polled"
        );

        assert!(next_chunk.poll().is_pending(), "poll rx.data");
        assert!(readiness.is_woken(), "rx poll wakes tx");

        assert!(
            readiness.poll().is_ready(),
            "tx is ready after rx has been polled"
        );
    }

    /// Dropping the body wakes a waiting sender, which then sees "closed".
    #[test]
    fn channel_notices_closure() {
        let (mut sender, body) = Body::new_channel(DecodedLength::CHUNKED, /*wanter = */ true);

        let mut readiness = tokio_test::task::spawn(sender.ready());

        assert!(
            readiness.poll().is_pending(),
            "tx isn't ready before rx has been polled"
        );

        drop(body);
        assert!(readiness.is_woken(), "dropping rx wakes tx");

        match readiness.poll() {
            Poll::Ready(Err(ref e)) if e.is_closed() => (),
            unexpected => panic!("tx poll ready unexpected: {:?}", unexpected),
        }
    }
}
792