1use std::borrow::Cow;
2#[cfg(feature = "stream")]
3use std::error::Error as StdError;
4use std::fmt;
5
6use bytes::Bytes;
7use futures_channel::mpsc;
8use futures_channel::oneshot;
9use futures_core::Stream; // for mpsc::Receiver
10#[cfg(feature = "stream")]
11use futures_util::TryStreamExt;
12use http::HeaderMap;
13use http_body::{Body as HttpBody, SizeHint};
14
15use super::DecodedLength;
16#[cfg(feature = "stream")]
17use crate::common::sync_wrapper::SyncWrapper;
18use crate::common::Future;
19#[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))]
20use crate::common::Never;
21use crate::common::{task, watch, Pin, Poll};
22#[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
23use crate::proto::h2::ping;
24
25type BodySender = mpsc::Sender<Result<Bytes, crate::Error>>;
26type TrailersSender = oneshot::Sender<HeaderMap>;
27
28/// A stream of `Bytes`, used when receiving bodies.
29///
30/// A good default [`HttpBody`](crate::body::HttpBody) to use in many
31/// applications.
32///
33/// Note: To read the full body, use [`body::to_bytes`](crate::body::to_bytes)
34/// or [`body::aggregate`](crate::body::aggregate).
35#[must_use = "streams do nothing unless polled"]
36pub struct Body {
37 kind: Kind,
38 /// Keep the extra bits in an `Option<Box<Extra>>`, so that
39 /// Body stays small in the common case (no extras needed).
40 extra: Option<Box<Extra>>,
41}
42
43enum Kind {
44 Once(Option<Bytes>),
45 Chan {
46 content_length: DecodedLength,
47 want_tx: watch::Sender,
48 data_rx: mpsc::Receiver<Result<Bytes, crate::Error>>,
49 trailers_rx: oneshot::Receiver<HeaderMap>,
50 },
51 #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
52 H2 {
53 ping: ping::Recorder,
54 content_length: DecodedLength,
55 recv: h2::RecvStream,
56 },
57 #[cfg(feature = "ffi")]
58 Ffi(crate::ffi::UserBody),
59 #[cfg(feature = "stream")]
60 Wrapped(
61 SyncWrapper<
62 Pin<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>>,
63 >,
64 ),
65}
66
67struct Extra {
68 /// Allow the client to pass a future to delay the `Body` from returning
69 /// EOF. This allows the `Client` to try to put the idle connection
70 /// back into the pool before the body is "finished".
71 ///
72 /// The reason for this is so that creating a new request after finishing
73 /// streaming the body of a response could sometimes result in creating
74 /// a brand new connection, since the pool didn't know about the idle
75 /// connection yet.
76 delayed_eof: Option<DelayEof>,
77}
78
/// The future a `Body` waits on before it reports EOF. Since the channel
/// carries `Never`, no value can ever be received through it.
#[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))]
type DelayEofUntil = oneshot::Receiver<Never>;
81
/// State of a delayed EOF. Without the client feature (and an HTTP version)
/// this enum has no variants.
enum DelayEof {
    /// Starting state: the stream has not reached EOF so far.
    #[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))]
    NotEof(DelayEofUntil),
    /// Entered once `poll` has tried to return EOF (`None`). From then on
    /// the future is polled, and only when it completes does the Body
    /// finally return EOF (`None`).
    #[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))]
    Eof(DelayEofUntil),
}
94
95/// A sender half created through [`Body::channel()`].
96///
97/// Useful when wanting to stream chunks from another thread.
98///
99/// ## Body Closing
100///
101/// Note that the request body will always be closed normally when the sender is dropped (meaning
102/// that the empty terminating chunk will be sent to the remote). If you desire to close the
103/// connection with an incomplete response (e.g. in the case of an error during asynchronous
104/// processing), call the [`Sender::abort()`] method to abort the body in an abnormal fashion.
105///
106/// [`Body::channel()`]: struct.Body.html#method.channel
107/// [`Sender::abort()`]: struct.Sender.html#method.abort
108#[must_use = "Sender does nothing unless sent on"]
109pub struct Sender {
110 want_rx: watch::Receiver,
111 data_tx: BodySender,
112 trailers_tx: Option<TrailersSender>,
113}
114
/// Watch value meaning the `Body` has not asked for data yet; the `Sender`
/// stays pending while it observes this value.
const WANT_PENDING: usize = 1;
/// Watch value meaning the `Sender` may go ahead and send data.
const WANT_READY: usize = 2;
117
118impl Body {
119 /// Create an empty `Body` stream.
120 ///
121 /// # Example
122 ///
123 /// ```
124 /// use hyper::{Body, Request};
125 ///
126 /// // create a `GET /` request
127 /// let get = Request::new(Body::empty());
128 /// ```
129 #[inline]
130 pub fn empty() -> Body {
131 Body::new(Kind::Once(None))
132 }
133
134 /// Create a `Body` stream with an associated sender half.
135 ///
136 /// Useful when wanting to stream chunks from another thread.
137 #[inline]
138 pub fn channel() -> (Sender, Body) {
139 Self::new_channel(DecodedLength::CHUNKED, /*wanter =*/ false)
140 }
141
142 pub(crate) fn new_channel(content_length: DecodedLength, wanter: bool) -> (Sender, Body) {
143 let (data_tx, data_rx) = mpsc::channel(0);
144 let (trailers_tx, trailers_rx) = oneshot::channel();
145
146 // If wanter is true, `Sender::poll_ready()` won't becoming ready
147 // until the `Body` has been polled for data once.
148 let want = if wanter { WANT_PENDING } else { WANT_READY };
149
150 let (want_tx, want_rx) = watch::channel(want);
151
152 let tx = Sender {
153 want_rx,
154 data_tx,
155 trailers_tx: Some(trailers_tx),
156 };
157 let rx = Body::new(Kind::Chan {
158 content_length,
159 want_tx,
160 data_rx,
161 trailers_rx,
162 });
163
164 (tx, rx)
165 }
166
167 /// Wrap a futures `Stream` in a box inside `Body`.
168 ///
169 /// # Example
170 ///
171 /// ```
172 /// # use hyper::Body;
173 /// let chunks: Vec<Result<_, std::io::Error>> = vec![
174 /// Ok("hello"),
175 /// Ok(" "),
176 /// Ok("world"),
177 /// ];
178 ///
179 /// let stream = futures_util::stream::iter(chunks);
180 ///
181 /// let body = Body::wrap_stream(stream);
182 /// ```
183 ///
184 /// # Optional
185 ///
186 /// This function requires enabling the `stream` feature in your
187 /// `Cargo.toml`.
188 #[cfg(feature = "stream")]
189 #[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
190 pub fn wrap_stream<S, O, E>(stream: S) -> Body
191 where
192 S: Stream<Item = Result<O, E>> + Send + 'static,
193 O: Into<Bytes> + 'static,
194 E: Into<Box<dyn StdError + Send + Sync>> + 'static,
195 {
196 let mapped = stream.map_ok(Into::into).map_err(Into::into);
197 Body::new(Kind::Wrapped(SyncWrapper::new(Box::pin(mapped))))
198 }
199
200 fn new(kind: Kind) -> Body {
201 Body { kind, extra: None }
202 }
203
204 #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
205 pub(crate) fn h2(
206 recv: h2::RecvStream,
207 mut content_length: DecodedLength,
208 ping: ping::Recorder,
209 ) -> Self {
210 // If the stream is already EOS, then the "unknown length" is clearly
211 // actually ZERO.
212 if !content_length.is_exact() && recv.is_end_stream() {
213 content_length = DecodedLength::ZERO;
214 }
215 let body = Body::new(Kind::H2 {
216 ping,
217 content_length,
218 recv,
219 });
220
221 body
222 }
223
224 #[cfg(any(feature = "http1", feature = "http2"))]
225 #[cfg(feature = "client")]
226 pub(crate) fn delayed_eof(&mut self, fut: DelayEofUntil) {
227 self.extra_mut().delayed_eof = Some(DelayEof::NotEof(fut));
228 }
229
230 fn take_delayed_eof(&mut self) -> Option<DelayEof> {
231 self.extra
232 .as_mut()
233 .and_then(|extra| extra.delayed_eof.take())
234 }
235
236 #[cfg(any(feature = "http1", feature = "http2"))]
237 fn extra_mut(&mut self) -> &mut Extra {
238 self.extra
239 .get_or_insert_with(|| Box::new(Extra { delayed_eof: None }))
240 }
241
242 fn poll_eof(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<crate::Result<Bytes>>> {
243 match self.take_delayed_eof() {
244 #[cfg(any(feature = "http1", feature = "http2"))]
245 #[cfg(feature = "client")]
246 Some(DelayEof::NotEof(mut delay)) => match self.poll_inner(cx) {
247 ok @ Poll::Ready(Some(Ok(..))) | ok @ Poll::Pending => {
248 self.extra_mut().delayed_eof = Some(DelayEof::NotEof(delay));
249 ok
250 }
251 Poll::Ready(None) => match Pin::new(&mut delay).poll(cx) {
252 Poll::Ready(Ok(never)) => match never {},
253 Poll::Pending => {
254 self.extra_mut().delayed_eof = Some(DelayEof::Eof(delay));
255 Poll::Pending
256 }
257 Poll::Ready(Err(_done)) => Poll::Ready(None),
258 },
259 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
260 },
261 #[cfg(any(feature = "http1", feature = "http2"))]
262 #[cfg(feature = "client")]
263 Some(DelayEof::Eof(mut delay)) => match Pin::new(&mut delay).poll(cx) {
264 Poll::Ready(Ok(never)) => match never {},
265 Poll::Pending => {
266 self.extra_mut().delayed_eof = Some(DelayEof::Eof(delay));
267 Poll::Pending
268 }
269 Poll::Ready(Err(_done)) => Poll::Ready(None),
270 },
271 #[cfg(any(
272 not(any(feature = "http1", feature = "http2")),
273 not(feature = "client")
274 ))]
275 Some(delay_eof) => match delay_eof {},
276 None => self.poll_inner(cx),
277 }
278 }
279
280 #[cfg(feature = "ffi")]
281 pub(crate) fn as_ffi_mut(&mut self) -> &mut crate::ffi::UserBody {
282 match self.kind {
283 Kind::Ffi(ref mut body) => return body,
284 _ => {
285 self.kind = Kind::Ffi(crate::ffi::UserBody::new());
286 }
287 }
288
289 match self.kind {
290 Kind::Ffi(ref mut body) => body,
291 _ => unreachable!(),
292 }
293 }
294
295 fn poll_inner(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<crate::Result<Bytes>>> {
296 match self.kind {
297 Kind::Once(ref mut val) => Poll::Ready(val.take().map(Ok)),
298 Kind::Chan {
299 content_length: ref mut len,
300 ref mut data_rx,
301 ref mut want_tx,
302 ..
303 } => {
304 want_tx.send(WANT_READY);
305
306 match ready!(Pin::new(data_rx).poll_next(cx)?) {
307 Some(chunk) => {
308 len.sub_if(chunk.len() as u64);
309 Poll::Ready(Some(Ok(chunk)))
310 }
311 None => Poll::Ready(None),
312 }
313 }
314 #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
315 Kind::H2 {
316 ref ping,
317 recv: ref mut h2,
318 content_length: ref mut len,
319 } => match ready!(h2.poll_data(cx)) {
320 Some(Ok(bytes)) => {
321 let _ = h2.flow_control().release_capacity(bytes.len());
322 len.sub_if(bytes.len() as u64);
323 ping.record_data(bytes.len());
324 Poll::Ready(Some(Ok(bytes)))
325 }
326 Some(Err(e)) => Poll::Ready(Some(Err(crate::Error::new_body(e)))),
327 None => Poll::Ready(None),
328 },
329
330 #[cfg(feature = "ffi")]
331 Kind::Ffi(ref mut body) => body.poll_data(cx),
332
333 #[cfg(feature = "stream")]
334 Kind::Wrapped(ref mut s) => match ready!(s.get_mut().as_mut().poll_next(cx)) {
335 Some(res) => Poll::Ready(Some(res.map_err(crate::Error::new_body))),
336 None => Poll::Ready(None),
337 },
338 }
339 }
340
341 #[cfg(feature = "http1")]
342 pub(super) fn take_full_data(&mut self) -> Option<Bytes> {
343 if let Kind::Once(ref mut chunk) = self.kind {
344 chunk.take()
345 } else {
346 None
347 }
348 }
349}
350
351impl Default for Body {
352 /// Returns `Body::empty()`.
353 #[inline]
354 fn default() -> Body {
355 Body::empty()
356 }
357}
358
359impl HttpBody for Body {
360 type Data = Bytes;
361 type Error = crate::Error;
362
363 fn poll_data(
364 mut self: Pin<&mut Self>,
365 cx: &mut task::Context<'_>,
366 ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
367 self.poll_eof(cx)
368 }
369
370 fn poll_trailers(
371 #[cfg_attr(not(feature = "http2"), allow(unused_mut))] mut self: Pin<&mut Self>,
372 #[cfg_attr(not(feature = "http2"), allow(unused))] cx: &mut task::Context<'_>,
373 ) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
374 match self.kind {
375 #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
376 Kind::H2 {
377 recv: ref mut h2,
378 ref ping,
379 ..
380 } => match ready!(h2.poll_trailers(cx)) {
381 Ok(t) => {
382 ping.record_non_data();
383 Poll::Ready(Ok(t))
384 }
385 Err(e) => Poll::Ready(Err(crate::Error::new_h2(e))),
386 },
387 Kind::Chan {
388 ref mut trailers_rx,
389 ..
390 } => match ready!(Pin::new(trailers_rx).poll(cx)) {
391 Ok(t) => Poll::Ready(Ok(Some(t))),
392 Err(_) => Poll::Ready(Ok(None)),
393 },
394 #[cfg(feature = "ffi")]
395 Kind::Ffi(ref mut body) => body.poll_trailers(cx),
396 _ => Poll::Ready(Ok(None)),
397 }
398 }
399
400 fn is_end_stream(&self) -> bool {
401 match self.kind {
402 Kind::Once(ref val) => val.is_none(),
403 Kind::Chan { content_length, .. } => content_length == DecodedLength::ZERO,
404 #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
405 Kind::H2 { recv: ref h2, .. } => h2.is_end_stream(),
406 #[cfg(feature = "ffi")]
407 Kind::Ffi(..) => false,
408 #[cfg(feature = "stream")]
409 Kind::Wrapped(..) => false,
410 }
411 }
412
413 fn size_hint(&self) -> SizeHint {
414 macro_rules! opt_len {
415 ($content_length:expr) => {{
416 let mut hint = SizeHint::default();
417
418 if let Some(content_length) = $content_length.into_opt() {
419 hint.set_exact(content_length);
420 }
421
422 hint
423 }};
424 }
425
426 match self.kind {
427 Kind::Once(Some(ref val)) => SizeHint::with_exact(val.len() as u64),
428 Kind::Once(None) => SizeHint::with_exact(0),
429 #[cfg(feature = "stream")]
430 Kind::Wrapped(..) => SizeHint::default(),
431 Kind::Chan { content_length, .. } => opt_len!(content_length),
432 #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
433 Kind::H2 { content_length, .. } => opt_len!(content_length),
434 #[cfg(feature = "ffi")]
435 Kind::Ffi(..) => SizeHint::default(),
436 }
437 }
438}
439
440impl fmt::Debug for Body {
441 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
442 #[derive(Debug)]
443 struct Streaming;
444 #[derive(Debug)]
445 struct Empty;
446 #[derive(Debug)]
447 struct Full<'a>(&'a Bytes);
448
449 let mut builder: DebugTuple<'_, '_> = f.debug_tuple(name:"Body");
450 match self.kind {
451 Kind::Once(None) => builder.field(&Empty),
452 Kind::Once(Some(ref chunk: &Bytes)) => builder.field(&Full(chunk)),
453 _ => builder.field(&Streaming),
454 };
455
456 builder.finish()
457 }
458}
459
/// # Optional
///
/// This implementation requires enabling the `stream` feature in your
/// `Cargo.toml`.
#[cfg(feature = "stream")]
impl Stream for Body {
    type Item = crate::Result<Bytes>;

    /// Forwards to the `HttpBody::poll_data` implementation.
    fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
        <Self as HttpBody>::poll_data(self, cx)
    }
}
472
/// # Optional
///
/// This implementation requires enabling the `stream` feature in your
/// `Cargo.toml`.
#[cfg(feature = "stream")]
impl From<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>> for Body {
    /// Pins the already boxed stream and wraps it as a `Body`.
    #[inline]
    fn from(
        stream: Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>,
    ) -> Body {
        let pinned = Pin::from(stream);
        Body::new(Kind::Wrapped(SyncWrapper::new(pinned)))
    }
}
486
487impl From<Bytes> for Body {
488 #[inline]
489 fn from(chunk: Bytes) -> Body {
490 if chunk.is_empty() {
491 Body::empty()
492 } else {
493 Body::new(Kind::Once(Some(chunk)))
494 }
495 }
496}
497
498impl From<Vec<u8>> for Body {
499 #[inline]
500 fn from(vec: Vec<u8>) -> Body {
501 Body::from(Bytes::from(vec))
502 }
503}
504
505impl From<&'static [u8]> for Body {
506 #[inline]
507 fn from(slice: &'static [u8]) -> Body {
508 Body::from(Bytes::from(slice))
509 }
510}
511
512impl From<Cow<'static, [u8]>> for Body {
513 #[inline]
514 fn from(cow: Cow<'static, [u8]>) -> Body {
515 match cow {
516 Cow::Borrowed(b: &[u8]) => Body::from(b),
517 Cow::Owned(o: Vec) => Body::from(o),
518 }
519 }
520}
521
522impl From<String> for Body {
523 #[inline]
524 fn from(s: String) -> Body {
525 Body::from(Bytes::from(s.into_bytes()))
526 }
527}
528
529impl From<&'static str> for Body {
530 #[inline]
531 fn from(slice: &'static str) -> Body {
532 Body::from(Bytes::from(slice.as_bytes()))
533 }
534}
535
536impl From<Cow<'static, str>> for Body {
537 #[inline]
538 fn from(cow: Cow<'static, str>) -> Body {
539 match cow {
540 Cow::Borrowed(b: &str) => Body::from(b),
541 Cow::Owned(o: String) => Body::from(o),
542 }
543 }
544}
545
546impl Sender {
547 /// Check to see if this `Sender` can send more data.
548 pub fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
549 // Check if the receiver end has tried polling for the body yet
550 ready!(self.poll_want(cx)?);
551 self.data_tx
552 .poll_ready(cx)
553 .map_err(|_| crate::Error::new_closed())
554 }
555
556 fn poll_want(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
557 match self.want_rx.load(cx) {
558 WANT_READY => Poll::Ready(Ok(())),
559 WANT_PENDING => Poll::Pending,
560 watch::CLOSED => Poll::Ready(Err(crate::Error::new_closed())),
561 unexpected => unreachable!("want_rx value: {}", unexpected),
562 }
563 }
564
565 async fn ready(&mut self) -> crate::Result<()> {
566 futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await
567 }
568
569 /// Send data on data channel when it is ready.
570 pub async fn send_data(&mut self, chunk: Bytes) -> crate::Result<()> {
571 self.ready().await?;
572 self.data_tx
573 .try_send(Ok(chunk))
574 .map_err(|_| crate::Error::new_closed())
575 }
576
577 /// Send trailers on trailers channel.
578 pub async fn send_trailers(&mut self, trailers: HeaderMap) -> crate::Result<()> {
579 let tx = match self.trailers_tx.take() {
580 Some(tx) => tx,
581 None => return Err(crate::Error::new_closed()),
582 };
583 tx.send(trailers).map_err(|_| crate::Error::new_closed())
584 }
585
586 /// Try to send data on this channel.
587 ///
588 /// # Errors
589 ///
590 /// Returns `Err(Bytes)` if the channel could not (currently) accept
591 /// another `Bytes`.
592 ///
593 /// # Note
594 ///
595 /// This is mostly useful for when trying to send from some other thread
596 /// that doesn't have an async context. If in an async context, prefer
597 /// `send_data()` instead.
598 pub fn try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes> {
599 self.data_tx
600 .try_send(Ok(chunk))
601 .map_err(|err| err.into_inner().expect("just sent Ok"))
602 }
603
604 /// Aborts the body in an abnormal fashion.
605 pub fn abort(self) {
606 let _ = self
607 .data_tx
608 // clone so the send works even if buffer is full
609 .clone()
610 .try_send(Err(crate::Error::new_body_write_aborted()));
611 }
612
613 #[cfg(feature = "http1")]
614 pub(crate) fn send_error(&mut self, err: crate::Error) {
615 let _ = self.data_tx.try_send(Err(err));
616 }
617}
618
619impl fmt::Debug for Sender {
620 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
621 #[derive(Debug)]
622 struct Open;
623 #[derive(Debug)]
624 struct Closed;
625
626 let mut builder: DebugTuple<'_, '_> = f.debug_tuple(name:"Sender");
627 match self.want_rx.peek() {
628 watch::CLOSED => builder.field(&Closed),
629 _ => builder.field(&Open),
630 };
631
632 builder.finish()
633 }
634}
635
#[cfg(test)]
mod tests {
    use std::mem;
    use std::task::Poll;

    use super::{Body, DecodedLength, HttpBody, Sender, SizeHint};

    /// Mostly here to help catch *accidentally* increasing the size of
    /// `Body` or `Sender` by too much.
    #[test]
    fn test_size_of() {
        let body_size = mem::size_of::<Body>();
        let body_expected_size = 6 * mem::size_of::<u64>();
        assert!(
            body_size <= body_expected_size,
            "Body size = {} <= {}",
            body_size,
            body_expected_size,
        );

        assert_eq!(body_size, mem::size_of::<Option<Body>>(), "Option<Body>");

        let sender_size = mem::size_of::<Sender>();
        assert_eq!(sender_size, mem::size_of::<usize>() * 5, "Sender");
        assert_eq!(
            sender_size,
            mem::size_of::<Option<Sender>>(),
            "Option<Sender>"
        );
    }

    /// Each way of building a body must report the expected size hint.
    #[test]
    fn size_hint() {
        fn eq(body: Body, expected: SizeHint, note: &str) {
            let actual = body.size_hint();
            assert_eq!(actual.lower(), expected.lower(), "lower for {:?}", note);
            assert_eq!(actual.upper(), expected.upper(), "upper for {:?}", note);
        }

        eq(Body::from("Hello"), SizeHint::with_exact(5), "from str");

        eq(Body::empty(), SizeHint::with_exact(0), "empty");

        let (_tx, chunked) = Body::channel();
        eq(chunked, SizeHint::new(), "channel");

        let (_tx, sized) = Body::new_channel(DecodedLength::new(4), /*wanter =*/ false);
        eq(sized, SizeHint::with_exact(4), "channel with length");
    }

    #[tokio::test]
    async fn channel_abort() {
        let (sender, mut body) = Body::channel();

        sender.abort();

        let err = body.data().await.unwrap().unwrap_err();
        assert!(err.is_body_write_aborted(), "{:?}", err);
    }

    #[tokio::test]
    async fn channel_abort_when_buffer_is_full() {
        let (mut sender, mut body) = Body::channel();

        sender.try_send_data("chunk 1".into()).expect("send 1");
        // the buffer is full now, yet the abort still gets through
        sender.abort();

        let first = body.data().await.expect("item 1").expect("chunk 1");
        assert_eq!(first, "chunk 1");

        let err = body.data().await.unwrap().unwrap_err();
        assert!(err.is_body_write_aborted(), "{:?}", err);
    }

    #[test]
    fn channel_buffers_one() {
        let (mut sender, _body) = Body::channel();

        sender.try_send_data("chunk 1".into()).expect("send 1");

        // with the buffer full, the second chunk is handed back
        let rejected = sender.try_send_data("chunk 2".into()).expect_err("send 2");
        assert_eq!(rejected, "chunk 2");
    }

    #[tokio::test]
    async fn channel_empty() {
        let (_, mut body) = Body::channel();

        assert!(body.data().await.is_none());
    }

    #[test]
    fn channel_ready() {
        let (mut sender, _body) = Body::new_channel(DecodedLength::CHUNKED, /*wanter = */ false);

        let mut sender_ready = tokio_test::task::spawn(sender.ready());

        assert!(sender_ready.poll().is_ready(), "tx is ready immediately");
    }

    #[test]
    fn channel_wanter() {
        let (mut sender, mut body) =
            Body::new_channel(DecodedLength::CHUNKED, /*wanter = */ true);

        let mut sender_ready = tokio_test::task::spawn(sender.ready());
        let mut body_data = tokio_test::task::spawn(body.data());

        assert!(
            sender_ready.poll().is_pending(),
            "tx isn't ready before rx has been polled"
        );

        assert!(body_data.poll().is_pending(), "poll rx.data");
        assert!(sender_ready.is_woken(), "rx poll wakes tx");

        assert!(
            sender_ready.poll().is_ready(),
            "tx is ready after rx has been polled"
        );
    }

    #[test]
    fn channel_notices_closure() {
        let (mut sender, body) = Body::new_channel(DecodedLength::CHUNKED, /*wanter = */ true);

        let mut sender_ready = tokio_test::task::spawn(sender.ready());

        assert!(
            sender_ready.poll().is_pending(),
            "tx isn't ready before rx has been polled"
        );

        drop(body);
        assert!(sender_ready.is_woken(), "dropping rx wakes tx");

        match sender_ready.poll() {
            Poll::Ready(Err(ref e)) if e.is_closed() => (),
            unexpected => panic!("tx poll ready unexpected: {:?}", unexpected),
        }
    }
}
786