1 | use std::borrow::Cow; |
2 | #[cfg (all(feature = "client" , any(feature = "http1" , feature = "http2" )))] |
3 | use std::convert::Infallible; |
4 | #[cfg (feature = "stream" )] |
5 | use std::error::Error as StdError; |
6 | use std::fmt; |
7 | use std::future::Future; |
8 | use std::pin::Pin; |
9 | use std::task::{Context, Poll}; |
10 | |
11 | use bytes::Bytes; |
12 | use futures_channel::mpsc; |
13 | use futures_channel::oneshot; |
14 | use futures_core::Stream; // for mpsc::Receiver |
15 | #[cfg (feature = "stream" )] |
16 | use futures_util::TryStreamExt; |
17 | use http::HeaderMap; |
18 | use http_body::{Body as HttpBody, SizeHint}; |
19 | |
20 | use super::DecodedLength; |
21 | #[cfg (feature = "stream" )] |
22 | use crate::common::sync_wrapper::SyncWrapper; |
23 | use crate::common::watch; |
24 | #[cfg (all(feature = "http2" , any(feature = "client" , feature = "server" )))] |
25 | use crate::proto::h2::ping; |
26 | |
/// Sending half of the channel that carries body chunks (or an error) from a
/// `Sender` to a channel-backed `Body`.
type BodySender = mpsc::Sender<Result<Bytes, crate::Error>>;
/// One-shot sender used by a `Sender` to deliver the trailers `HeaderMap`.
type TrailersSender = oneshot::Sender<HeaderMap>;
29 | |
/// A stream of `Bytes`, used when receiving bodies.
///
/// A good default [`HttpBody`](crate::body::HttpBody) to use in many
/// applications.
///
/// Note: To read the full body, use [`body::to_bytes`](crate::body::to_bytes())
/// or [`body::aggregate`](crate::body::aggregate()).
#[must_use = "streams do nothing unless polled" ]
pub struct Body {
    /// The concrete source this body reads its data from.
    kind: Kind,
    /// Keep the extra bits in an `Option<Box<Extra>>`, so that
    /// Body stays small in the common case (no extras needed).
    extra: Option<Box<Extra>>,
}
44 | |
/// The different sources a `Body` can read its data from.
enum Kind {
    /// At most one chunk; `None` means the body is empty or the chunk has
    /// already been taken.
    Once(Option<Bytes>),
    /// Data and trailers are received over channels fed by a `Sender`.
    Chan {
        // Remaining length; reduced by the size of every chunk that is yielded.
        content_length: DecodedLength,
        // Set to `WANT_READY` whenever the body is polled for data.
        want_tx: watch::Sender,
        // Receives the data chunks, or an error that ends the body.
        data_rx: mpsc::Receiver<Result<Bytes, crate::Error>>,
        // Receives the trailers, if the sender provides any.
        trailers_rx: oneshot::Receiver<HeaderMap>,
    },
    /// Data is read from an HTTP/2 receive stream.
    #[cfg (all(feature = "http2" , any(feature = "client" , feature = "server" )))]
    H2 {
        // Notified about received data and trailers.
        ping: ping::Recorder,
        // Remaining length; reduced by the size of every chunk that is yielded.
        content_length: DecodedLength,
        recv: h2::RecvStream,
    },
    /// Data is provided by a user body from the FFI layer.
    #[cfg (feature = "ffi" )]
    Ffi(crate::ffi::UserBody),
    /// Data is produced by a boxed, user-supplied stream.
    #[cfg (feature = "stream" )]
    Wrapped(
        SyncWrapper<
            Pin<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>>,
        >,
    ),
}
68 | |
/// Rarely needed state of a `Body`, stored boxed in `Body::extra`.
struct Extra {
    /// Allow the client to pass a future to delay the `Body` from returning
    /// EOF. This allows the `Client` to try to put the idle connection
    /// back into the pool before the body is "finished".
    ///
    /// The reason for this is so that creating a new request after finishing
    /// streaming the body of a response could sometimes result in creating
    /// a brand new connection, since the pool didn't know about the idle
    /// connection yet.
    delayed_eof: Option<DelayEof>,
}
80 | |
/// The future a `Body` waits on before it reports EOF.
///
/// It can never yield a value (`Infallible`); `Body::poll_eof` treats the
/// receiver resolving with an error as the signal to finally return EOF.
#[cfg (all(feature = "client" , any(feature = "http1" , feature = "http2" )))]
type DelayEofUntil = oneshot::Receiver<Infallible>;
83 | |
/// State machine used to hold back EOF of a `Body`.
///
/// Both variants only exist with the `client` feature plus `http1` or
/// `http2`; otherwise the enum has no variants.
enum DelayEof {
    /// Initial state, stream hasn't seen EOF yet.
    #[cfg (any(feature = "http1" , feature = "http2" ))]
    #[cfg (feature = "client" )]
    NotEof(DelayEofUntil),
    /// Transitions to this state once we've seen `poll` try to
    /// return EOF (`None`). This future is then polled, and
    /// when it completes, the Body finally returns EOF (`None`).
    #[cfg (any(feature = "http1" , feature = "http2" ))]
    #[cfg (feature = "client" )]
    Eof(DelayEofUntil),
}
96 | |
/// A sender half created through [`Body::channel()`].
///
/// Useful when wanting to stream chunks from another thread.
///
/// ## Body Closing
///
/// Note that the request body will always be closed normally when the sender is dropped (meaning
/// that the empty terminating chunk will be sent to the remote). If you desire to close the
/// connection with an incomplete response (e.g. in the case of an error during asynchronous
/// processing), call the [`Sender::abort()`] method to abort the body in an abnormal fashion.
///
/// [`Body::channel()`]: struct.Body.html#method.channel
/// [`Sender::abort()`]: struct.Sender.html#method.abort
#[must_use = "Sender does nothing unless sent on" ]
pub struct Sender {
    /// Observes the want state published by the `Body` (`WANT_PENDING`,
    /// `WANT_READY` or closed).
    want_rx: watch::Receiver,
    /// Channel on which data chunks and errors are sent to the `Body`.
    data_tx: BodySender,
    /// Sender for the trailers; taken (set to `None`) by `send_trailers`.
    trailers_tx: Option<TrailersSender>,
}
116 | |
/// Want state in which `Sender::poll_want` reports `Poll::Pending`; the
/// initial state of a channel created with `wanter = true`.
const WANT_PENDING: usize = 1;
/// Want state in which `Sender::poll_want` reports readiness; set whenever
/// the `Body` is polled for data, and the initial state when `wanter = false`.
const WANT_READY: usize = 2;
119 | |
120 | impl Body { |
121 | /// Create an empty `Body` stream. |
122 | /// |
123 | /// # Example |
124 | /// |
125 | /// ``` |
126 | /// use hyper::{Body, Request}; |
127 | /// |
128 | /// // create a `GET /` request |
129 | /// let get = Request::new(Body::empty()); |
130 | /// ``` |
131 | #[inline ] |
132 | pub fn empty() -> Body { |
133 | Body::new(Kind::Once(None)) |
134 | } |
135 | |
136 | /// Create a `Body` stream with an associated sender half. |
137 | /// |
138 | /// Useful when wanting to stream chunks from another thread. |
139 | #[inline ] |
140 | pub fn channel() -> (Sender, Body) { |
141 | Self::new_channel(DecodedLength::CHUNKED, /*wanter =*/ false) |
142 | } |
143 | |
144 | pub(crate) fn new_channel(content_length: DecodedLength, wanter: bool) -> (Sender, Body) { |
145 | let (data_tx, data_rx) = mpsc::channel(0); |
146 | let (trailers_tx, trailers_rx) = oneshot::channel(); |
147 | |
148 | // If wanter is true, `Sender::poll_ready()` won't becoming ready |
149 | // until the `Body` has been polled for data once. |
150 | let want = if wanter { WANT_PENDING } else { WANT_READY }; |
151 | |
152 | let (want_tx, want_rx) = watch::channel(want); |
153 | |
154 | let tx = Sender { |
155 | want_rx, |
156 | data_tx, |
157 | trailers_tx: Some(trailers_tx), |
158 | }; |
159 | let rx = Body::new(Kind::Chan { |
160 | content_length, |
161 | want_tx, |
162 | data_rx, |
163 | trailers_rx, |
164 | }); |
165 | |
166 | (tx, rx) |
167 | } |
168 | |
169 | /// Wrap a futures `Stream` in a box inside `Body`. |
170 | /// |
171 | /// # Example |
172 | /// |
173 | /// ``` |
174 | /// # use hyper::Body; |
175 | /// let chunks: Vec<Result<_, std::io::Error>> = vec![ |
176 | /// Ok("hello"), |
177 | /// Ok(" "), |
178 | /// Ok("world"), |
179 | /// ]; |
180 | /// |
181 | /// let stream = futures_util::stream::iter(chunks); |
182 | /// |
183 | /// let body = Body::wrap_stream(stream); |
184 | /// ``` |
185 | /// |
186 | /// # Optional |
187 | /// |
188 | /// This function requires enabling the `stream` feature in your |
189 | /// `Cargo.toml`. |
190 | #[cfg (feature = "stream" )] |
191 | #[cfg_attr (docsrs, doc(cfg(feature = "stream" )))] |
192 | pub fn wrap_stream<S, O, E>(stream: S) -> Body |
193 | where |
194 | S: Stream<Item = Result<O, E>> + Send + 'static, |
195 | O: Into<Bytes> + 'static, |
196 | E: Into<Box<dyn StdError + Send + Sync>> + 'static, |
197 | { |
198 | let mapped = stream.map_ok(Into::into).map_err(Into::into); |
199 | Body::new(Kind::Wrapped(SyncWrapper::new(Box::pin(mapped)))) |
200 | } |
201 | |
202 | fn new(kind: Kind) -> Body { |
203 | Body { kind, extra: None } |
204 | } |
205 | |
206 | #[cfg (all(feature = "http2" , any(feature = "client" , feature = "server" )))] |
207 | pub(crate) fn h2( |
208 | recv: h2::RecvStream, |
209 | mut content_length: DecodedLength, |
210 | ping: ping::Recorder, |
211 | ) -> Self { |
212 | // If the stream is already EOS, then the "unknown length" is clearly |
213 | // actually ZERO. |
214 | if !content_length.is_exact() && recv.is_end_stream() { |
215 | content_length = DecodedLength::ZERO; |
216 | } |
217 | let body = Body::new(Kind::H2 { |
218 | ping, |
219 | content_length, |
220 | recv, |
221 | }); |
222 | |
223 | body |
224 | } |
225 | |
226 | #[cfg (any(feature = "http1" , feature = "http2" ))] |
227 | #[cfg (feature = "client" )] |
228 | pub(crate) fn delayed_eof(&mut self, fut: DelayEofUntil) { |
229 | self.extra_mut().delayed_eof = Some(DelayEof::NotEof(fut)); |
230 | } |
231 | |
232 | fn take_delayed_eof(&mut self) -> Option<DelayEof> { |
233 | self.extra |
234 | .as_mut() |
235 | .and_then(|extra| extra.delayed_eof.take()) |
236 | } |
237 | |
238 | #[cfg (any(feature = "http1" , feature = "http2" ))] |
239 | fn extra_mut(&mut self) -> &mut Extra { |
240 | self.extra |
241 | .get_or_insert_with(|| Box::new(Extra { delayed_eof: None })) |
242 | } |
243 | |
244 | fn poll_eof(&mut self, cx: &mut Context<'_>) -> Poll<Option<crate::Result<Bytes>>> { |
245 | match self.take_delayed_eof() { |
246 | #[cfg (any(feature = "http1" , feature = "http2" ))] |
247 | #[cfg (feature = "client" )] |
248 | Some(DelayEof::NotEof(mut delay)) => match self.poll_inner(cx) { |
249 | ok @ Poll::Ready(Some(Ok(..))) | ok @ Poll::Pending => { |
250 | self.extra_mut().delayed_eof = Some(DelayEof::NotEof(delay)); |
251 | ok |
252 | } |
253 | Poll::Ready(None) => match Pin::new(&mut delay).poll(cx) { |
254 | Poll::Ready(Ok(never)) => match never {}, |
255 | Poll::Pending => { |
256 | self.extra_mut().delayed_eof = Some(DelayEof::Eof(delay)); |
257 | Poll::Pending |
258 | } |
259 | Poll::Ready(Err(_done)) => Poll::Ready(None), |
260 | }, |
261 | Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))), |
262 | }, |
263 | #[cfg (any(feature = "http1" , feature = "http2" ))] |
264 | #[cfg (feature = "client" )] |
265 | Some(DelayEof::Eof(mut delay)) => match Pin::new(&mut delay).poll(cx) { |
266 | Poll::Ready(Ok(never)) => match never {}, |
267 | Poll::Pending => { |
268 | self.extra_mut().delayed_eof = Some(DelayEof::Eof(delay)); |
269 | Poll::Pending |
270 | } |
271 | Poll::Ready(Err(_done)) => Poll::Ready(None), |
272 | }, |
273 | #[cfg (any( |
274 | not(any(feature = "http1" , feature = "http2" )), |
275 | not(feature = "client" ) |
276 | ))] |
277 | Some(delay_eof) => match delay_eof {}, |
278 | None => self.poll_inner(cx), |
279 | } |
280 | } |
281 | |
282 | #[cfg (feature = "ffi" )] |
283 | pub(crate) fn as_ffi_mut(&mut self) -> &mut crate::ffi::UserBody { |
284 | match self.kind { |
285 | Kind::Ffi(ref mut body) => return body, |
286 | _ => { |
287 | self.kind = Kind::Ffi(crate::ffi::UserBody::new()); |
288 | } |
289 | } |
290 | |
291 | match self.kind { |
292 | Kind::Ffi(ref mut body) => body, |
293 | _ => unreachable!(), |
294 | } |
295 | } |
296 | |
297 | fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<Option<crate::Result<Bytes>>> { |
298 | match self.kind { |
299 | Kind::Once(ref mut val) => Poll::Ready(val.take().map(Ok)), |
300 | Kind::Chan { |
301 | content_length: ref mut len, |
302 | ref mut data_rx, |
303 | ref mut want_tx, |
304 | .. |
305 | } => { |
306 | want_tx.send(WANT_READY); |
307 | |
308 | match ready!(Pin::new(data_rx).poll_next(cx)?) { |
309 | Some(chunk) => { |
310 | len.sub_if(chunk.len() as u64); |
311 | Poll::Ready(Some(Ok(chunk))) |
312 | } |
313 | None => Poll::Ready(None), |
314 | } |
315 | } |
316 | #[cfg (all(feature = "http2" , any(feature = "client" , feature = "server" )))] |
317 | Kind::H2 { |
318 | ref ping, |
319 | recv: ref mut h2, |
320 | content_length: ref mut len, |
321 | } => match ready!(h2.poll_data(cx)) { |
322 | Some(Ok(bytes)) => { |
323 | let _ = h2.flow_control().release_capacity(bytes.len()); |
324 | len.sub_if(bytes.len() as u64); |
325 | ping.record_data(bytes.len()); |
326 | Poll::Ready(Some(Ok(bytes))) |
327 | } |
328 | Some(Err(e)) => match e.reason() { |
329 | // These reasons should cause stop of body reading, but nor fail it. |
330 | // The same logic as for `AsyncRead for H2Upgraded` is applied here. |
331 | Some(h2::Reason::NO_ERROR) | Some(h2::Reason::CANCEL) => Poll::Ready(None), |
332 | _ => Poll::Ready(Some(Err(crate::Error::new_body(e)))), |
333 | }, |
334 | None => Poll::Ready(None), |
335 | }, |
336 | |
337 | #[cfg (feature = "ffi" )] |
338 | Kind::Ffi(ref mut body) => body.poll_data(cx), |
339 | |
340 | #[cfg (feature = "stream" )] |
341 | Kind::Wrapped(ref mut s) => match ready!(s.get_mut().as_mut().poll_next(cx)) { |
342 | Some(res) => Poll::Ready(Some(res.map_err(crate::Error::new_body))), |
343 | None => Poll::Ready(None), |
344 | }, |
345 | } |
346 | } |
347 | |
348 | #[cfg (feature = "http1" )] |
349 | pub(super) fn take_full_data(&mut self) -> Option<Bytes> { |
350 | if let Kind::Once(ref mut chunk) = self.kind { |
351 | chunk.take() |
352 | } else { |
353 | None |
354 | } |
355 | } |
356 | } |
357 | |
358 | impl Default for Body { |
359 | /// Returns `Body::empty()`. |
360 | #[inline ] |
361 | fn default() -> Body { |
362 | Body::empty() |
363 | } |
364 | } |
365 | |
366 | impl HttpBody for Body { |
367 | type Data = Bytes; |
368 | type Error = crate::Error; |
369 | |
370 | fn poll_data( |
371 | mut self: Pin<&mut Self>, |
372 | cx: &mut Context<'_>, |
373 | ) -> Poll<Option<Result<Self::Data, Self::Error>>> { |
374 | self.poll_eof(cx) |
375 | } |
376 | |
377 | fn poll_trailers( |
378 | #[cfg_attr (not(feature = "http2" ), allow(unused_mut))] mut self: Pin<&mut Self>, |
379 | #[cfg_attr (not(feature = "http2" ), allow(unused))] cx: &mut Context<'_>, |
380 | ) -> Poll<Result<Option<HeaderMap>, Self::Error>> { |
381 | match self.kind { |
382 | #[cfg (all(feature = "http2" , any(feature = "client" , feature = "server" )))] |
383 | Kind::H2 { |
384 | recv: ref mut h2, |
385 | ref ping, |
386 | .. |
387 | } => match ready!(h2.poll_trailers(cx)) { |
388 | Ok(t) => { |
389 | ping.record_non_data(); |
390 | Poll::Ready(Ok(t)) |
391 | } |
392 | Err(e) => Poll::Ready(Err(crate::Error::new_h2(e))), |
393 | }, |
394 | Kind::Chan { |
395 | ref mut trailers_rx, |
396 | .. |
397 | } => match ready!(Pin::new(trailers_rx).poll(cx)) { |
398 | Ok(t) => Poll::Ready(Ok(Some(t))), |
399 | Err(_) => Poll::Ready(Ok(None)), |
400 | }, |
401 | #[cfg (feature = "ffi" )] |
402 | Kind::Ffi(ref mut body) => body.poll_trailers(cx), |
403 | _ => Poll::Ready(Ok(None)), |
404 | } |
405 | } |
406 | |
407 | fn is_end_stream(&self) -> bool { |
408 | match self.kind { |
409 | Kind::Once(ref val) => val.is_none(), |
410 | Kind::Chan { content_length, .. } => content_length == DecodedLength::ZERO, |
411 | #[cfg (all(feature = "http2" , any(feature = "client" , feature = "server" )))] |
412 | Kind::H2 { recv: ref h2, .. } => h2.is_end_stream(), |
413 | #[cfg (feature = "ffi" )] |
414 | Kind::Ffi(..) => false, |
415 | #[cfg (feature = "stream" )] |
416 | Kind::Wrapped(..) => false, |
417 | } |
418 | } |
419 | |
420 | fn size_hint(&self) -> SizeHint { |
421 | macro_rules! opt_len { |
422 | ($content_length:expr) => {{ |
423 | let mut hint = SizeHint::default(); |
424 | |
425 | if let Some(content_length) = $content_length.into_opt() { |
426 | hint.set_exact(content_length); |
427 | } |
428 | |
429 | hint |
430 | }}; |
431 | } |
432 | |
433 | match self.kind { |
434 | Kind::Once(Some(ref val)) => SizeHint::with_exact(val.len() as u64), |
435 | Kind::Once(None) => SizeHint::with_exact(0), |
436 | #[cfg (feature = "stream" )] |
437 | Kind::Wrapped(..) => SizeHint::default(), |
438 | Kind::Chan { content_length, .. } => opt_len!(content_length), |
439 | #[cfg (all(feature = "http2" , any(feature = "client" , feature = "server" )))] |
440 | Kind::H2 { content_length, .. } => opt_len!(content_length), |
441 | #[cfg (feature = "ffi" )] |
442 | Kind::Ffi(..) => SizeHint::default(), |
443 | } |
444 | } |
445 | } |
446 | |
447 | impl fmt::Debug for Body { |
448 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
449 | #[derive (Debug)] |
450 | struct Streaming; |
451 | #[derive (Debug)] |
452 | struct Empty; |
453 | #[derive (Debug)] |
454 | struct Full<'a>(&'a Bytes); |
455 | |
456 | let mut builder: DebugTuple<'_, '_> = f.debug_tuple(name:"Body" ); |
457 | match self.kind { |
458 | Kind::Once(None) => builder.field(&Empty), |
459 | Kind::Once(Some(ref chunk: &Bytes)) => builder.field(&Full(chunk)), |
460 | _ => builder.field(&Streaming), |
461 | }; |
462 | |
463 | builder.finish() |
464 | } |
465 | } |
466 | |
/// # Optional
///
/// This implementation requires enabling the `stream` feature in your
/// `Cargo.toml`.
#[cfg(feature = "stream")]
impl Stream for Body {
    type Item = crate::Result<Bytes>;

    /// Delegates to the `HttpBody` data polling of this body.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        <Self as HttpBody>::poll_data(self, cx)
    }
}
479 | |
/// # Optional
///
/// This conversion requires enabling the `stream` feature in your
/// `Cargo.toml`.
#[cfg(feature = "stream")]
impl From<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>> for Body {
    /// Pins the boxed stream and uses it as the source of the body.
    #[inline]
    fn from(
        stream: Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>,
    ) -> Body {
        let pinned = Pin::from(stream);
        Body::new(Kind::Wrapped(SyncWrapper::new(pinned)))
    }
}
493 | |
494 | impl From<Bytes> for Body { |
495 | #[inline ] |
496 | fn from(chunk: Bytes) -> Body { |
497 | if chunk.is_empty() { |
498 | Body::empty() |
499 | } else { |
500 | Body::new(Kind::Once(Some(chunk))) |
501 | } |
502 | } |
503 | } |
504 | |
505 | impl From<Vec<u8>> for Body { |
506 | #[inline ] |
507 | fn from(vec: Vec<u8>) -> Body { |
508 | Body::from(Bytes::from(vec)) |
509 | } |
510 | } |
511 | |
512 | impl From<&'static [u8]> for Body { |
513 | #[inline ] |
514 | fn from(slice: &'static [u8]) -> Body { |
515 | Body::from(Bytes::from(slice)) |
516 | } |
517 | } |
518 | |
519 | impl From<Cow<'static, [u8]>> for Body { |
520 | #[inline ] |
521 | fn from(cow: Cow<'static, [u8]>) -> Body { |
522 | match cow { |
523 | Cow::Borrowed(b: &[u8]) => Body::from(b), |
524 | Cow::Owned(o: Vec) => Body::from(o), |
525 | } |
526 | } |
527 | } |
528 | |
529 | impl From<String> for Body { |
530 | #[inline ] |
531 | fn from(s: String) -> Body { |
532 | Body::from(Bytes::from(s.into_bytes())) |
533 | } |
534 | } |
535 | |
536 | impl From<&'static str> for Body { |
537 | #[inline ] |
538 | fn from(slice: &'static str) -> Body { |
539 | Body::from(Bytes::from(slice.as_bytes())) |
540 | } |
541 | } |
542 | |
543 | impl From<Cow<'static, str>> for Body { |
544 | #[inline ] |
545 | fn from(cow: Cow<'static, str>) -> Body { |
546 | match cow { |
547 | Cow::Borrowed(b: &str) => Body::from(b), |
548 | Cow::Owned(o: String) => Body::from(o), |
549 | } |
550 | } |
551 | } |
552 | |
553 | impl Sender { |
554 | /// Check to see if this `Sender` can send more data. |
555 | pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> { |
556 | // Check if the receiver end has tried polling for the body yet |
557 | ready!(self.poll_want(cx)?); |
558 | self.data_tx |
559 | .poll_ready(cx) |
560 | .map_err(|_| crate::Error::new_closed()) |
561 | } |
562 | |
563 | fn poll_want(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> { |
564 | match self.want_rx.load(cx) { |
565 | WANT_READY => Poll::Ready(Ok(())), |
566 | WANT_PENDING => Poll::Pending, |
567 | watch::CLOSED => Poll::Ready(Err(crate::Error::new_closed())), |
568 | unexpected => unreachable!("want_rx value: {}" , unexpected), |
569 | } |
570 | } |
571 | |
572 | async fn ready(&mut self) -> crate::Result<()> { |
573 | futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await |
574 | } |
575 | |
576 | /// Send data on data channel when it is ready. |
577 | pub async fn send_data(&mut self, chunk: Bytes) -> crate::Result<()> { |
578 | self.ready().await?; |
579 | self.data_tx |
580 | .try_send(Ok(chunk)) |
581 | .map_err(|_| crate::Error::new_closed()) |
582 | } |
583 | |
584 | /// Send trailers on trailers channel. |
585 | pub async fn send_trailers(&mut self, trailers: HeaderMap) -> crate::Result<()> { |
586 | let tx = match self.trailers_tx.take() { |
587 | Some(tx) => tx, |
588 | None => return Err(crate::Error::new_closed()), |
589 | }; |
590 | tx.send(trailers).map_err(|_| crate::Error::new_closed()) |
591 | } |
592 | |
593 | /// Try to send data on this channel. |
594 | /// |
595 | /// # Errors |
596 | /// |
597 | /// Returns `Err(Bytes)` if the channel could not (currently) accept |
598 | /// another `Bytes`. |
599 | /// |
600 | /// # Note |
601 | /// |
602 | /// This is mostly useful for when trying to send from some other thread |
603 | /// that doesn't have an async context. If in an async context, prefer |
604 | /// `send_data()` instead. |
605 | pub fn try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes> { |
606 | self.data_tx |
607 | .try_send(Ok(chunk)) |
608 | .map_err(|err| err.into_inner().expect("just sent Ok" )) |
609 | } |
610 | |
611 | /// Aborts the body in an abnormal fashion. |
612 | pub fn abort(mut self) { |
613 | self.send_error(crate::Error::new_body_write_aborted()); |
614 | } |
615 | |
616 | pub(crate) fn send_error(&mut self, err: crate::Error) { |
617 | let _ = self |
618 | .data_tx |
619 | // clone so the send works even if buffer is full |
620 | .clone() |
621 | .try_send(Err(err)); |
622 | } |
623 | } |
624 | |
625 | impl fmt::Debug for Sender { |
626 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
627 | #[derive (Debug)] |
628 | struct Open; |
629 | #[derive (Debug)] |
630 | struct Closed; |
631 | |
632 | let mut builder: DebugTuple<'_, '_> = f.debug_tuple(name:"Sender" ); |
633 | match self.want_rx.peek() { |
634 | watch::CLOSED => builder.field(&Closed), |
635 | _ => builder.field(&Open), |
636 | }; |
637 | |
638 | builder.finish() |
639 | } |
640 | } |
641 | |
// Unit tests for `Body` and `Sender`: type sizes, size hints and the
// behavior of the channel-backed body.
#[cfg (test)]
mod tests {
    use std::mem;
    use std::task::Poll;

    use super::{Body, DecodedLength, HttpBody, Sender, SizeHint};

    // Checks the memory footprint of `Body` and `Sender`, and that wrapping
    // either of them in an `Option` does not make it larger.
    #[test ]
    fn test_size_of() {
        // These are mostly to help catch *accidentally* increasing
        // the size by too much.

        let body_size = mem::size_of::<Body>();
        let body_expected_size = mem::size_of::<u64>() * 6;
        assert!(
            body_size <= body_expected_size,
            "Body size = {} <= {}" ,
            body_size,
            body_expected_size,
        );

        assert_eq!(body_size, mem::size_of::<Option<Body>>(), "Option<Body>" );

        assert_eq!(
            mem::size_of::<Sender>(),
            mem::size_of::<usize>() * 5,
            "Sender"
        );

        assert_eq!(
            mem::size_of::<Sender>(),
            mem::size_of::<Option<Sender>>(),
            "Option<Sender>"
        );
    }

    // Checks the size hint reported for the different ways to build a body.
    #[test ]
    fn size_hint() {
        // Compares lower and upper bound of the body's hint against `b`;
        // `note` names the case in the failure message.
        fn eq(body: Body, b: SizeHint, note: &str) {
            let a = body.size_hint();
            assert_eq!(a.lower(), b.lower(), "lower for {:?}" , note);
            assert_eq!(a.upper(), b.upper(), "upper for {:?}" , note);
        }

        eq(Body::from("Hello" ), SizeHint::with_exact(5), "from str" );

        eq(Body::empty(), SizeHint::with_exact(0), "empty" );

        eq(Body::channel().1, SizeHint::new(), "channel" );

        eq(
            Body::new_channel(DecodedLength::new(4), /*wanter =*/ false).1,
            SizeHint::with_exact(4),
            "channel with length" ,
        );
    }

    // Aborting the sender makes the body yield a "body write aborted" error.
    #[tokio::test ]
    async fn channel_abort() {
        let (tx, mut rx) = Body::channel();

        tx.abort();

        let err = rx.data().await.unwrap().unwrap_err();
        assert!(err.is_body_write_aborted(), " {:?}" , err);
    }

    // The abort error is still delivered when a chunk is already queued:
    // the body yields the chunk first and the error afterwards.
    #[tokio::test ]
    async fn channel_abort_when_buffer_is_full() {
        let (mut tx, mut rx) = Body::channel();

        tx.try_send_data("chunk 1" .into()).expect("send 1" );
        // buffer is full, but can still send abort
        tx.abort();

        let chunk1 = rx.data().await.expect("item 1" ).expect("chunk 1" );
        assert_eq!(chunk1, "chunk 1" );

        let err = rx.data().await.unwrap().unwrap_err();
        assert!(err.is_body_write_aborted(), " {:?}" , err);
    }

    // Exactly one chunk can be queued without the body being polled; the
    // second `try_send_data` hands its chunk back as the error.
    #[test ]
    fn channel_buffers_one() {
        let (mut tx, _rx) = Body::channel();

        tx.try_send_data("chunk 1" .into()).expect("send 1" );

        // buffer is now full
        let chunk2 = tx.try_send_data("chunk 2" .into()).expect_err("send 2" );
        assert_eq!(chunk2, "chunk 2" );
    }

    // A channel body whose sender was dropped right away ends without data.
    #[tokio::test ]
    async fn channel_empty() {
        // `_` drops the sender immediately.
        let (_, mut rx) = Body::channel();

        assert!(rx.data().await.is_none());
    }

    // Without `wanter`, the sender is ready before the body was ever polled.
    #[test ]
    fn channel_ready() {
        let (mut tx, _rx) = Body::new_channel(DecodedLength::CHUNKED, /*wanter = */ false);

        let mut tx_ready = tokio_test::task::spawn(tx.ready());

        assert!(tx_ready.poll().is_ready(), "tx is ready immediately" );
    }

    // With `wanter`, the sender only becomes ready after the body has been
    // polled for data. The order of the polls below is what is being tested.
    #[test ]
    fn channel_wanter() {
        let (mut tx, mut rx) = Body::new_channel(DecodedLength::CHUNKED, /*wanter = */ true);

        let mut tx_ready = tokio_test::task::spawn(tx.ready());
        let mut rx_data = tokio_test::task::spawn(rx.data());

        assert!(
            tx_ready.poll().is_pending(),
            "tx isn't ready before rx has been polled"
        );

        assert!(rx_data.poll().is_pending(), "poll rx.data" );
        assert!(tx_ready.is_woken(), "rx poll wakes tx" );

        assert!(
            tx_ready.poll().is_ready(),
            "tx is ready after rx has been polled"
        );
    }

    // Dropping the body wakes a sender that is waiting to become ready, and
    // its next poll fails with a "closed" error.
    #[test ]
    fn channel_notices_closure() {
        let (mut tx, rx) = Body::new_channel(DecodedLength::CHUNKED, /*wanter = */ true);

        let mut tx_ready = tokio_test::task::spawn(tx.ready());

        assert!(
            tx_ready.poll().is_pending(),
            "tx isn't ready before rx has been polled"
        );

        drop(rx);
        assert!(tx_ready.is_woken(), "dropping rx wakes tx" );

        match tx_ready.poll() {
            Poll::Ready(Err(ref e)) if e.is_closed() => (),
            unexpected => panic!("tx poll ready unexpected: {:?}" , unexpected),
        }
    }
}
792 | |