1 | use std::fmt; |
2 | #[cfg (all(feature = "http1" , any(feature = "client" , feature = "server" )))] |
3 | use std::future::Future; |
4 | use std::pin::Pin; |
5 | use std::task::{Context, Poll}; |
6 | |
7 | use bytes::Bytes; |
8 | #[cfg (all(feature = "http1" , any(feature = "client" , feature = "server" )))] |
9 | use futures_channel::{mpsc, oneshot}; |
10 | #[cfg (all( |
11 | any(feature = "http1" , feature = "http2" ), |
12 | any(feature = "client" , feature = "server" ) |
13 | ))] |
14 | use futures_util::ready; |
15 | #[cfg (all(feature = "http1" , any(feature = "client" , feature = "server" )))] |
16 | use futures_util::{stream::FusedStream, Stream}; // for mpsc::Receiver |
17 | #[cfg (all(feature = "http1" , any(feature = "client" , feature = "server" )))] |
18 | use http::HeaderMap; |
19 | use http_body::{Body, Frame, SizeHint}; |
20 | |
21 | #[cfg (all( |
22 | any(feature = "http1" , feature = "http2" ), |
23 | any(feature = "client" , feature = "server" ) |
24 | ))] |
25 | use super::DecodedLength; |
26 | #[cfg (all(feature = "http1" , any(feature = "client" , feature = "server" )))] |
27 | use crate::common::watch; |
28 | #[cfg (all(feature = "http2" , any(feature = "client" , feature = "server" )))] |
29 | use crate::proto::h2::ping; |
30 | |
/// Sending half of the mpsc channel that carries body chunks, or an error
/// that terminates the body, to a channel-backed `Incoming`.
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
type BodySender = mpsc::Sender<Result<Bytes, crate::Error>>;
/// Sending half of the one-shot channel that delivers the trailers to a
/// channel-backed `Incoming`.
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
type TrailersSender = oneshot::Sender<HeaderMap>;
35 | |
36 | /// A stream of `Bytes`, used when receiving bodies from the network. |
37 | /// |
38 | /// Note that Users should not instantiate this struct directly. When working with the hyper client, |
39 | /// `Incoming` is returned to you in responses. Similarly, when operating with the hyper server, |
40 | /// it is provided within requests. |
41 | /// |
42 | /// # Examples |
43 | /// |
44 | /// ```rust,ignore |
45 | /// async fn echo( |
46 | /// req: Request<hyper::body::Incoming>, |
47 | /// ) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> { |
48 | /// //Here, you can process `Incoming` |
49 | /// } |
50 | /// ``` |
51 | #[must_use = "streams do nothing unless polled" ] |
52 | pub struct Incoming { |
53 | kind: Kind, |
54 | } |
55 | |
/// The possible sources an [`Incoming`] reads its frames from.
enum Kind {
    /// A body without any frames; polling it reports end of stream right away.
    Empty,
    /// A body fed by a `Sender` through in-process channels.
    #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
    Chan {
        /// Length used for the size hint and the end-of-stream check; it is
        /// adjusted through `sub_if` each time a data chunk is yielded.
        content_length: DecodedLength,
        /// Set to `WANT_READY` whenever the body is polled, so the sender
        /// learns that data is wanted.
        want_tx: watch::Sender,
        /// Receives the data chunks, or the error that ends the body.
        data_rx: mpsc::Receiver<Result<Bytes, crate::Error>>,
        /// Receives the trailers, polled once the data channel has terminated.
        trailers_rx: oneshot::Receiver<HeaderMap>,
    },
    /// A body read from an HTTP/2 receive stream.
    #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
    H2 {
        /// Length used for the size hint; adjusted through `sub_if` each time
        /// a data frame is yielded.
        content_length: DecodedLength,
        /// Becomes `true` once the stream reported the end of its data, after
        /// which only trailers are polled.
        data_done: bool,
        /// Recorder that is told about every received data and trailers frame.
        ping: ping::Recorder,
        /// The underlying `h2` stream.
        recv: h2::RecvStream,
    },
    /// A body backed by `crate::ffi::UserBody`.
    #[cfg(feature = "ffi")]
    Ffi(crate::ffi::UserBody),
}
75 | |
/// The sending half of a channel-backed [`Incoming`], created together with
/// it by `Incoming::new_channel()`.
///
/// Useful when wanting to stream chunks from another thread.
///
/// ## Body Closing
///
/// Dropping the sender closes the body normally: once the buffered chunks
/// have been read, the receiving `Incoming` reports the end of the stream.
/// To end the body abnormally instead (e.g. because asynchronous processing
/// failed), push an error with `Sender::send_error()`; tests can use the
/// `Sender::abort()` shorthand.
#[must_use = "Sender does nothing unless sent on"]
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
pub(crate) struct Sender {
    /// Reports whether the receiving side wants data, or has gone away.
    want_rx: watch::Receiver,
    /// Channel on which data chunks and a terminating error are sent.
    data_tx: BodySender,
    /// One-shot channel for the trailers; `None` once it has been used.
    trailers_tx: Option<TrailersSender>,
}
96 | |
/// Watch value meaning the body has not been polled for data yet; while it is
/// observed, `Sender::poll_want()` stays pending.
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
const WANT_PENDING: usize = 1;
/// Watch value meaning the body wants data; `Sender::poll_want()` reports
/// readiness when it observes this value.
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
const WANT_READY: usize = 2;
101 | |
102 | impl Incoming { |
103 | /// Create a `Body` stream with an associated sender half. |
104 | /// |
105 | /// Useful when wanting to stream chunks from another thread. |
106 | #[inline ] |
107 | #[cfg (test)] |
108 | pub(crate) fn channel() -> (Sender, Incoming) { |
109 | Self::new_channel(DecodedLength::CHUNKED, /*wanter =*/ false) |
110 | } |
111 | |
112 | #[cfg (all(feature = "http1" , any(feature = "client" , feature = "server" )))] |
113 | pub(crate) fn new_channel(content_length: DecodedLength, wanter: bool) -> (Sender, Incoming) { |
114 | let (data_tx, data_rx) = mpsc::channel(0); |
115 | let (trailers_tx, trailers_rx) = oneshot::channel(); |
116 | |
117 | // If wanter is true, `Sender::poll_ready()` won't becoming ready |
118 | // until the `Body` has been polled for data once. |
119 | let want = if wanter { WANT_PENDING } else { WANT_READY }; |
120 | |
121 | let (want_tx, want_rx) = watch::channel(want); |
122 | |
123 | let tx = Sender { |
124 | want_rx, |
125 | data_tx, |
126 | trailers_tx: Some(trailers_tx), |
127 | }; |
128 | let rx = Incoming::new(Kind::Chan { |
129 | content_length, |
130 | want_tx, |
131 | data_rx, |
132 | trailers_rx, |
133 | }); |
134 | |
135 | (tx, rx) |
136 | } |
137 | |
138 | fn new(kind: Kind) -> Incoming { |
139 | Incoming { kind } |
140 | } |
141 | |
142 | #[allow (dead_code)] |
143 | pub(crate) fn empty() -> Incoming { |
144 | Incoming::new(Kind::Empty) |
145 | } |
146 | |
147 | #[cfg (feature = "ffi" )] |
148 | pub(crate) fn ffi() -> Incoming { |
149 | Incoming::new(Kind::Ffi(crate::ffi::UserBody::new())) |
150 | } |
151 | |
152 | #[cfg (all(feature = "http2" , any(feature = "client" , feature = "server" )))] |
153 | pub(crate) fn h2( |
154 | recv: h2::RecvStream, |
155 | mut content_length: DecodedLength, |
156 | ping: ping::Recorder, |
157 | ) -> Self { |
158 | // If the stream is already EOS, then the "unknown length" is clearly |
159 | // actually ZERO. |
160 | if !content_length.is_exact() && recv.is_end_stream() { |
161 | content_length = DecodedLength::ZERO; |
162 | } |
163 | |
164 | Incoming::new(Kind::H2 { |
165 | data_done: false, |
166 | ping, |
167 | content_length, |
168 | recv, |
169 | }) |
170 | } |
171 | |
172 | #[cfg (feature = "ffi" )] |
173 | pub(crate) fn as_ffi_mut(&mut self) -> &mut crate::ffi::UserBody { |
174 | match self.kind { |
175 | Kind::Ffi(ref mut body) => return body, |
176 | _ => { |
177 | self.kind = Kind::Ffi(crate::ffi::UserBody::new()); |
178 | } |
179 | } |
180 | |
181 | match self.kind { |
182 | Kind::Ffi(ref mut body) => body, |
183 | _ => unreachable!(), |
184 | } |
185 | } |
186 | } |
187 | |
188 | impl Body for Incoming { |
189 | type Data = Bytes; |
190 | type Error = crate::Error; |
191 | |
192 | fn poll_frame( |
193 | #[cfg_attr ( |
194 | not(all( |
195 | any(feature = "http1" , feature = "http2" ), |
196 | any(feature = "client" , feature = "server" ) |
197 | )), |
198 | allow(unused_mut) |
199 | )] |
200 | mut self: Pin<&mut Self>, |
201 | #[cfg_attr ( |
202 | not(all( |
203 | any(feature = "http1" , feature = "http2" ), |
204 | any(feature = "client" , feature = "server" ) |
205 | )), |
206 | allow(unused_variables) |
207 | )] |
208 | cx: &mut Context<'_>, |
209 | ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> { |
210 | match self.kind { |
211 | Kind::Empty => Poll::Ready(None), |
212 | #[cfg (all(feature = "http1" , any(feature = "client" , feature = "server" )))] |
213 | Kind::Chan { |
214 | content_length: ref mut len, |
215 | ref mut data_rx, |
216 | ref mut want_tx, |
217 | ref mut trailers_rx, |
218 | } => { |
219 | want_tx.send(WANT_READY); |
220 | |
221 | if !data_rx.is_terminated() { |
222 | if let Some(chunk) = ready!(Pin::new(data_rx).poll_next(cx)?) { |
223 | len.sub_if(chunk.len() as u64); |
224 | return Poll::Ready(Some(Ok(Frame::data(chunk)))); |
225 | } |
226 | } |
227 | |
228 | // check trailers after data is terminated |
229 | match ready!(Pin::new(trailers_rx).poll(cx)) { |
230 | Ok(t) => Poll::Ready(Some(Ok(Frame::trailers(t)))), |
231 | Err(_) => Poll::Ready(None), |
232 | } |
233 | } |
234 | #[cfg (all(feature = "http2" , any(feature = "client" , feature = "server" )))] |
235 | Kind::H2 { |
236 | ref mut data_done, |
237 | ref ping, |
238 | recv: ref mut h2, |
239 | content_length: ref mut len, |
240 | } => { |
241 | if !*data_done { |
242 | match ready!(h2.poll_data(cx)) { |
243 | Some(Ok(bytes)) => { |
244 | let _ = h2.flow_control().release_capacity(bytes.len()); |
245 | len.sub_if(bytes.len() as u64); |
246 | ping.record_data(bytes.len()); |
247 | return Poll::Ready(Some(Ok(Frame::data(bytes)))); |
248 | } |
249 | Some(Err(e)) => { |
250 | return match e.reason() { |
251 | // These reasons should cause the body reading to stop, but not fail it. |
252 | // The same logic as for `Read for H2Upgraded` is applied here. |
253 | Some(h2::Reason::NO_ERROR) | Some(h2::Reason::CANCEL) => { |
254 | Poll::Ready(None) |
255 | } |
256 | _ => Poll::Ready(Some(Err(crate::Error::new_body(e)))), |
257 | }; |
258 | } |
259 | None => { |
260 | *data_done = true; |
261 | // fall through to trailers |
262 | } |
263 | } |
264 | } |
265 | |
266 | // after data, check trailers |
267 | match ready!(h2.poll_trailers(cx)) { |
268 | Ok(t) => { |
269 | ping.record_non_data(); |
270 | Poll::Ready(Ok(t.map(Frame::trailers)).transpose()) |
271 | } |
272 | Err(e) => Poll::Ready(Some(Err(crate::Error::new_h2(e)))), |
273 | } |
274 | } |
275 | |
276 | #[cfg (feature = "ffi" )] |
277 | Kind::Ffi(ref mut body) => body.poll_data(cx), |
278 | } |
279 | } |
280 | |
281 | fn is_end_stream(&self) -> bool { |
282 | match self.kind { |
283 | Kind::Empty => true, |
284 | #[cfg (all(feature = "http1" , any(feature = "client" , feature = "server" )))] |
285 | Kind::Chan { content_length, .. } => content_length == DecodedLength::ZERO, |
286 | #[cfg (all(feature = "http2" , any(feature = "client" , feature = "server" )))] |
287 | Kind::H2 { recv: ref h2, .. } => h2.is_end_stream(), |
288 | #[cfg (feature = "ffi" )] |
289 | Kind::Ffi(..) => false, |
290 | } |
291 | } |
292 | |
293 | fn size_hint(&self) -> SizeHint { |
294 | #[cfg (all( |
295 | any(feature = "http1" , feature = "http2" ), |
296 | any(feature = "client" , feature = "server" ) |
297 | ))] |
298 | fn opt_len(decoded_length: DecodedLength) -> SizeHint { |
299 | if let Some(content_length) = decoded_length.into_opt() { |
300 | SizeHint::with_exact(content_length) |
301 | } else { |
302 | SizeHint::default() |
303 | } |
304 | } |
305 | |
306 | match self.kind { |
307 | Kind::Empty => SizeHint::with_exact(0), |
308 | #[cfg (all(feature = "http1" , any(feature = "client" , feature = "server" )))] |
309 | Kind::Chan { content_length, .. } => opt_len(content_length), |
310 | #[cfg (all(feature = "http2" , any(feature = "client" , feature = "server" )))] |
311 | Kind::H2 { content_length, .. } => opt_len(content_length), |
312 | #[cfg (feature = "ffi" )] |
313 | Kind::Ffi(..) => SizeHint::default(), |
314 | } |
315 | } |
316 | } |
317 | |
318 | impl fmt::Debug for Incoming { |
319 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
320 | #[cfg (any( |
321 | all( |
322 | any(feature = "http1" , feature = "http2" ), |
323 | any(feature = "client" , feature = "server" ) |
324 | ), |
325 | feature = "ffi" |
326 | ))] |
327 | #[derive (Debug)] |
328 | struct Streaming; |
329 | #[derive (Debug)] |
330 | struct Empty; |
331 | |
332 | let mut builder = f.debug_tuple("Body" ); |
333 | match self.kind { |
334 | Kind::Empty => builder.field(&Empty), |
335 | #[cfg (any( |
336 | all( |
337 | any(feature = "http1" , feature = "http2" ), |
338 | any(feature = "client" , feature = "server" ) |
339 | ), |
340 | feature = "ffi" |
341 | ))] |
342 | _ => builder.field(&Streaming), |
343 | }; |
344 | |
345 | builder.finish() |
346 | } |
347 | } |
348 | |
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
impl Sender {
    /// Checks whether this `Sender` can send more data.
    ///
    /// Stays pending until the receiver wants data and the data channel has
    /// room. Fails with a "closed" error when either side reports closure.
    pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
        // First make sure the receiving end has tried polling for the body.
        if let Err(closed) = ready!(self.poll_want(cx)) {
            return Poll::Ready(Err(closed));
        }

        match self.data_tx.poll_ready(cx) {
            Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
            Poll::Ready(Err(_)) => Poll::Ready(Err(crate::Error::new_closed())),
            Poll::Pending => Poll::Pending,
        }
    }

    /// Translates the current watch value into readiness: ready for
    /// `WANT_READY`, pending for `WANT_PENDING`, and a "closed" error for
    /// `watch::CLOSED`.
    fn poll_want(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
        let state = self.want_rx.load(cx);
        match state {
            WANT_READY => Poll::Ready(Ok(())),
            WANT_PENDING => Poll::Pending,
            watch::CLOSED => Poll::Ready(Err(crate::Error::new_closed())),
            // No other value is ever stored in the watch channel by this file.
            unexpected => unreachable!("want_rx value: {}", unexpected),
        }
    }

    /// Waits until `poll_ready()` reports readiness (or an error).
    #[cfg(test)]
    async fn ready(&mut self) -> crate::Result<()> {
        let wait = futures_util::future::poll_fn(|cx| self.poll_ready(cx));
        wait.await
    }

    /// Sends a data chunk once the channel is ready for it.
    ///
    /// # Errors
    ///
    /// Returns a "closed" error if waiting for readiness fails or if the
    /// chunk cannot be handed to the data channel.
    #[cfg(test)]
    #[allow(unused)]
    pub(crate) async fn send_data(&mut self, chunk: Bytes) -> crate::Result<()> {
        self.ready().await?;

        match self.data_tx.try_send(Ok(chunk)) {
            Ok(()) => Ok(()),
            Err(_) => Err(crate::Error::new_closed()),
        }
    }

    /// Sends the trailers on the trailers channel.
    ///
    /// # Errors
    ///
    /// Returns a "closed" error if trailers were already sent through this
    /// sender or if the receiving side is gone.
    #[allow(unused)]
    pub(crate) async fn send_trailers(&mut self, trailers: HeaderMap) -> crate::Result<()> {
        // The one-shot sender can only be used once, so take it out.
        match self.trailers_tx.take() {
            Some(tx) => match tx.send(trailers) {
                Ok(()) => Ok(()),
                Err(_) => Err(crate::Error::new_closed()),
            },
            None => Err(crate::Error::new_closed()),
        }
    }

    /// Tries to send data on this channel without waiting.
    ///
    /// # Errors
    ///
    /// Returns `Err(Bytes)`, handing the chunk back, if the channel could not
    /// (currently) accept another `Bytes`.
    ///
    /// # Note
    ///
    /// This is mostly useful when sending from some other thread that has no
    /// async context. Inside an async context, prefer `send_data()`.
    #[cfg(feature = "http1")]
    pub(crate) fn try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes> {
        match self.data_tx.try_send(Ok(chunk)) {
            Ok(()) => Ok(()),
            // The rejected message is the `Ok(chunk)` that was just passed in.
            Err(rejected) => Err(rejected.into_inner().expect("just sent Ok")),
        }
    }

    /// Tries to send the trailers without waiting.
    ///
    /// # Errors
    ///
    /// Returns `Err(None)` if the trailers channel was already used, and
    /// `Err(Some(trailers))`, handing the trailers back, if the send fails.
    #[cfg(feature = "http1")]
    pub(crate) fn try_send_trailers(
        &mut self,
        trailers: HeaderMap,
    ) -> Result<(), Option<HeaderMap>> {
        match self.trailers_tx.take() {
            Some(tx) => tx.send(trailers).map_err(Some),
            None => Err(None),
        }
    }

    /// Ends the body abnormally with a "body write aborted" error.
    #[cfg(test)]
    pub(crate) fn abort(mut self) {
        let aborted = crate::Error::new_body_write_aborted();
        self.send_error(aborted);
    }

    /// Pushes `err` to the receiver as the next item of the body.
    ///
    /// The send is best effort; a failure to deliver the error is ignored.
    pub(crate) fn send_error(&mut self, err: crate::Error) {
        // Send through a clone so that it works even if the buffer is full.
        let mut extra_tx = self.data_tx.clone();
        let _ = extra_tx.try_send(Err(err));
    }
}
439 | |
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
impl fmt::Debug for Sender {
    /// Formats the sender as `Sender(Closed)` when the watch channel currently
    /// holds `watch::CLOSED`, and as `Sender(Open)` for any other value.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        #[derive(Debug)]
        struct Open;
        #[derive(Debug)]
        struct Closed;

        // Plain call with an inferred binding type: the previous line carried
        // an out-of-scope `DebugTuple` annotation and a `name:` argument
        // label, neither of which is valid Rust.
        let mut builder = f.debug_tuple("Sender");
        match self.want_rx.peek() {
            watch::CLOSED => builder.field(&Closed),
            _ => builder.field(&Open),
        };

        builder.finish()
    }
}
457 | |
#[cfg(test)]
mod tests {
    use std::mem;
    use std::task::Poll;

    use super::{Body, DecodedLength, Incoming, Sender, SizeHint};
    use http_body_util::BodyExt;

    /// Mostly here to help catch *accidentally* increasing the size of the
    /// body types by too much.
    #[test]
    fn test_size_of() {
        let actual = mem::size_of::<Incoming>();
        let limit = mem::size_of::<u64>() * 5;
        assert!(actual <= limit, "Body size = {} <= {}", actual, limit,);

        //assert_eq!(body_size, mem::size_of::<Option<Incoming>>(), "Option<Incoming>");

        let sender_size = mem::size_of::<Sender>();
        assert_eq!(sender_size, mem::size_of::<usize>() * 5, "Sender");
        assert_eq!(
            sender_size,
            mem::size_of::<Option<Sender>>(),
            "Option<Sender>"
        );
    }

    /// The size hint is exact for empty and sized bodies and open-ended for
    /// chunked channel bodies.
    #[test]
    fn size_hint() {
        fn assert_hint(body: Incoming, expected: SizeHint, note: &str) {
            let actual = body.size_hint();
            assert_eq!(actual.lower(), expected.lower(), "lower for {:?}", note);
            assert_eq!(actual.upper(), expected.upper(), "upper for {:?}", note);
        }

        assert_hint(Incoming::empty(), SizeHint::with_exact(0), "empty");

        assert_hint(Incoming::channel().1, SizeHint::new(), "channel");

        assert_hint(
            Incoming::new_channel(DecodedLength::new(4), /*wanter =*/ false).1,
            SizeHint::with_exact(4),
            "channel with length",
        );
    }

    /// Aborting the sender surfaces a "body write aborted" error on the body.
    #[cfg(not(miri))]
    #[tokio::test]
    async fn channel_abort() {
        let (sender, mut body) = Incoming::channel();

        sender.abort();

        let failure = body.frame().await.unwrap().unwrap_err();
        assert!(failure.is_body_write_aborted(), "{:?}", failure);
    }

    /// An abort still gets through when the data buffer already holds a chunk.
    #[cfg(all(not(miri), feature = "http1"))]
    #[tokio::test]
    async fn channel_abort_when_buffer_is_full() {
        let (mut sender, mut body) = Incoming::channel();

        sender.try_send_data("chunk 1".into()).expect("send 1");
        // buffer is full, but can still send abort
        sender.abort();

        let first_frame = body.frame().await.expect("item 1").expect("chunk 1");
        let first_chunk = first_frame.into_data().unwrap();
        assert_eq!(first_chunk, "chunk 1");

        let failure = body.frame().await.unwrap().unwrap_err();
        assert!(failure.is_body_write_aborted(), "{:?}", failure);
    }

    /// The channel accepts exactly one chunk before `try_send_data` hands
    /// further chunks back.
    #[cfg(feature = "http1")]
    #[test]
    fn channel_buffers_one() {
        let (mut sender, _body) = Incoming::channel();

        sender.try_send_data("chunk 1".into()).expect("send 1");

        // buffer is now full
        let rejected = sender
            .try_send_data("chunk 2".into())
            .expect_err("send 2");
        assert_eq!(rejected, "chunk 2");
    }

    /// A body whose sender is dropped right away ends without any frame.
    #[cfg(not(miri))]
    #[tokio::test]
    async fn channel_empty() {
        // The `_` pattern drops the sender immediately, which closes the body.
        let (_, mut body) = Incoming::channel();

        let next = body.frame().await;
        assert!(next.is_none());
    }

    /// Without a wanter, the sender is ready before the body is ever polled.
    #[test]
    fn channel_ready() {
        let (mut sender, _body) =
            Incoming::new_channel(DecodedLength::CHUNKED, /*wanter = */ false);

        let mut sender_ready = tokio_test::task::spawn(sender.ready());

        assert!(sender_ready.poll().is_ready(), "tx is ready immediately");
    }

    /// With a wanter, the sender only becomes ready after the body was polled.
    #[test]
    fn channel_wanter() {
        let (mut sender, mut body) =
            Incoming::new_channel(DecodedLength::CHUNKED, /*wanter = */ true);

        let mut sender_ready = tokio_test::task::spawn(sender.ready());
        let mut body_frame = tokio_test::task::spawn(body.frame());

        assert!(
            sender_ready.poll().is_pending(),
            "tx isn't ready before rx has been polled"
        );

        assert!(body_frame.poll().is_pending(), "poll rx.data");
        assert!(sender_ready.is_woken(), "rx poll wakes tx");

        assert!(
            sender_ready.poll().is_ready(),
            "tx is ready after rx has been polled"
        );
    }

    /// Dropping the body wakes a waiting sender and makes it fail as closed.
    #[test]
    fn channel_notices_closure() {
        let (mut sender, body) =
            Incoming::new_channel(DecodedLength::CHUNKED, /*wanter = */ true);

        let mut sender_ready = tokio_test::task::spawn(sender.ready());

        assert!(
            sender_ready.poll().is_pending(),
            "tx isn't ready before rx has been polled"
        );

        drop(body);
        assert!(sender_ready.is_woken(), "dropping rx wakes tx");

        match sender_ready.poll() {
            Poll::Ready(Err(ref e)) if e.is_closed() => (),
            unexpected => panic!("tx poll ready unexpected: {:?}", unexpected),
        }
    }
}
618 | |