1use std::error::Error as StdError;
2use std::future::Future;
3use std::marker::Unpin;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7use bytes::{Buf, Bytes};
8use http::Request;
9use tokio::io::{AsyncRead, AsyncWrite};
10use tracing::{debug, trace};
11
12use super::{Http1Transaction, Wants};
13use crate::body::{Body, DecodedLength, HttpBody};
14use crate::common;
15use crate::proto::{BodyLength, Conn, Dispatched, MessageHead, RequestHead};
16use crate::upgrade::OnUpgrade;
17
18pub(crate) struct Dispatcher<D, Bs: HttpBody, I, T> {
19 conn: Conn<I, Bs::Data, T>,
20 dispatch: D,
21 body_tx: Option<crate::body::Sender>,
22 body_rx: Pin<Box<Option<Bs>>>,
23 is_closing: bool,
24}
25
26pub(crate) trait Dispatch {
27 type PollItem;
28 type PollBody;
29 type PollError;
30 type RecvItem;
31 fn poll_msg(
32 self: Pin<&mut Self>,
33 cx: &mut Context<'_>,
34 ) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Self::PollError>>>;
35 fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()>;
36 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ()>>;
37 fn should_poll(&self) -> bool;
38}
39
40cfg_server! {
41 use crate::service::HttpService;
42
43 pub(crate) struct Server<S: HttpService<B>, B> {
44 in_flight: Pin<Box<Option<S::Future>>>,
45 pub(crate) service: S,
46 }
47}
48
49cfg_client! {
50 pin_project_lite::pin_project! {
51 pub(crate) struct Client<B> {
52 callback: Option<crate::client::dispatch::Callback<Request<B>, http::Response<Body>>>,
53 #[pin]
54 rx: ClientRx<B>,
55 rx_closed: bool,
56 }
57 }
58
59 type ClientRx<B> = crate::client::dispatch::Receiver<Request<B>, http::Response<Body>>;
60}
61
62impl<D, Bs, I, T> Dispatcher<D, Bs, I, T>
63where
64 D: Dispatch<
65 PollItem = MessageHead<T::Outgoing>,
66 PollBody = Bs,
67 RecvItem = MessageHead<T::Incoming>,
68 > + Unpin,
69 D::PollError: Into<Box<dyn StdError + Send + Sync>>,
70 I: AsyncRead + AsyncWrite + Unpin,
71 T: Http1Transaction + Unpin,
72 Bs: HttpBody + 'static,
73 Bs::Error: Into<Box<dyn StdError + Send + Sync>>,
74{
75 pub(crate) fn new(dispatch: D, conn: Conn<I, Bs::Data, T>) -> Self {
76 Dispatcher {
77 conn,
78 dispatch,
79 body_tx: None,
80 body_rx: Box::pin(None),
81 is_closing: false,
82 }
83 }
84
85 #[cfg(feature = "server")]
86 pub(crate) fn disable_keep_alive(&mut self) {
87 self.conn.disable_keep_alive();
88 if self.conn.is_write_closed() {
89 self.close();
90 }
91 }
92
93 pub(crate) fn into_inner(self) -> (I, Bytes, D) {
94 let (io, buf) = self.conn.into_inner();
95 (io, buf, self.dispatch)
96 }
97
98 /// Run this dispatcher until HTTP says this connection is done,
99 /// but don't call `AsyncWrite::shutdown` on the underlying IO.
100 ///
101 /// This is useful for old-style HTTP upgrades, but ignores
102 /// newer-style upgrade API.
103 pub(crate) fn poll_without_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>>
104 where
105 Self: Unpin,
106 {
107 Pin::new(self).poll_catch(cx, false).map_ok(|ds| {
108 if let Dispatched::Upgrade(pending) = ds {
109 pending.manual();
110 }
111 })
112 }
113
114 fn poll_catch(
115 &mut self,
116 cx: &mut Context<'_>,
117 should_shutdown: bool,
118 ) -> Poll<crate::Result<Dispatched>> {
119 Poll::Ready(ready!(self.poll_inner(cx, should_shutdown)).or_else(|e| {
120 // Be sure to alert a streaming body of the failure.
121 if let Some(mut body) = self.body_tx.take() {
122 body.send_error(crate::Error::new_body("connection error"));
123 }
124 // An error means we're shutting down either way.
125 // We just try to give the error to the user,
126 // and close the connection with an Ok. If we
127 // cannot give it to the user, then return the Err.
128 self.dispatch.recv_msg(Err(e))?;
129 Ok(Dispatched::Shutdown)
130 }))
131 }
132
133 fn poll_inner(
134 &mut self,
135 cx: &mut Context<'_>,
136 should_shutdown: bool,
137 ) -> Poll<crate::Result<Dispatched>> {
138 T::update_date();
139
140 ready!(self.poll_loop(cx))?;
141
142 if self.is_done() {
143 if let Some(pending) = self.conn.pending_upgrade() {
144 self.conn.take_error()?;
145 return Poll::Ready(Ok(Dispatched::Upgrade(pending)));
146 } else if should_shutdown {
147 ready!(self.conn.poll_shutdown(cx)).map_err(crate::Error::new_shutdown)?;
148 }
149 self.conn.take_error()?;
150 Poll::Ready(Ok(Dispatched::Shutdown))
151 } else {
152 Poll::Pending
153 }
154 }
155
156 fn poll_loop(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
157 // Limit the looping on this connection, in case it is ready far too
158 // often, so that other futures don't starve.
159 //
160 // 16 was chosen arbitrarily, as that is number of pipelined requests
161 // benchmarks often use. Perhaps it should be a config option instead.
162 for _ in 0..16 {
163 let _ = self.poll_read(cx)?;
164 let _ = self.poll_write(cx)?;
165 let _ = self.poll_flush(cx)?;
166
167 // This could happen if reading paused before blocking on IO,
168 // such as getting to the end of a framed message, but then
169 // writing/flushing set the state back to Init. In that case,
170 // if the read buffer still had bytes, we'd want to try poll_read
171 // again, or else we wouldn't ever be woken up again.
172 //
173 // Using this instead of task::current() and notify() inside
174 // the Conn is noticeably faster in pipelined benchmarks.
175 if !self.conn.wants_read_again() {
176 //break;
177 return Poll::Ready(Ok(()));
178 }
179 }
180
181 trace!("poll_loop yielding (self = {:p})", self);
182
183 common::task::yield_now(cx).map(|never| match never {})
184 }
185
186 fn poll_read(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
187 loop {
188 if self.is_closing {
189 return Poll::Ready(Ok(()));
190 } else if self.conn.can_read_head() {
191 ready!(self.poll_read_head(cx))?;
192 } else if let Some(mut body) = self.body_tx.take() {
193 if self.conn.can_read_body() {
194 match body.poll_ready(cx) {
195 Poll::Ready(Ok(())) => (),
196 Poll::Pending => {
197 self.body_tx = Some(body);
198 return Poll::Pending;
199 }
200 Poll::Ready(Err(_canceled)) => {
201 // user doesn't care about the body
202 // so we should stop reading
203 trace!("body receiver dropped before eof, draining or closing");
204 self.conn.poll_drain_or_close_read(cx);
205 continue;
206 }
207 }
208 match self.conn.poll_read_body(cx) {
209 Poll::Ready(Some(Ok(chunk))) => match body.try_send_data(chunk) {
210 Ok(()) => {
211 self.body_tx = Some(body);
212 }
213 Err(_canceled) => {
214 if self.conn.can_read_body() {
215 trace!("body receiver dropped before eof, closing");
216 self.conn.close_read();
217 }
218 }
219 },
220 Poll::Ready(None) => {
221 // just drop, the body will close automatically
222 }
223 Poll::Pending => {
224 self.body_tx = Some(body);
225 return Poll::Pending;
226 }
227 Poll::Ready(Some(Err(e))) => {
228 body.send_error(crate::Error::new_body(e));
229 }
230 }
231 } else {
232 // just drop, the body will close automatically
233 }
234 } else {
235 return self.conn.poll_read_keep_alive(cx);
236 }
237 }
238 }
239
240 fn poll_read_head(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
241 // can dispatch receive, or does it still care about, an incoming message?
242 match ready!(self.dispatch.poll_ready(cx)) {
243 Ok(()) => (),
244 Err(()) => {
245 trace!("dispatch no longer receiving messages");
246 self.close();
247 return Poll::Ready(Ok(()));
248 }
249 }
250 // dispatch is ready for a message, try to read one
251 match ready!(self.conn.poll_read_head(cx)) {
252 Some(Ok((mut head, body_len, wants))) => {
253 let body = match body_len {
254 DecodedLength::ZERO => Body::empty(),
255 other => {
256 let (tx, rx) = Body::new_channel(other, wants.contains(Wants::EXPECT));
257 self.body_tx = Some(tx);
258 rx
259 }
260 };
261 if wants.contains(Wants::UPGRADE) {
262 let upgrade = self.conn.on_upgrade();
263 debug_assert!(!upgrade.is_none(), "empty upgrade");
264 debug_assert!(
265 head.extensions.get::<OnUpgrade>().is_none(),
266 "OnUpgrade already set"
267 );
268 head.extensions.insert(upgrade);
269 }
270 self.dispatch.recv_msg(Ok((head, body)))?;
271 Poll::Ready(Ok(()))
272 }
273 Some(Err(err)) => {
274 debug!("read_head error: {}", err);
275 self.dispatch.recv_msg(Err(err))?;
276 // if here, the dispatcher gave the user the error
277 // somewhere else. we still need to shutdown, but
278 // not as a second error.
279 self.close();
280 Poll::Ready(Ok(()))
281 }
282 None => {
283 // read eof, the write side will have been closed too unless
284 // allow_read_close was set to true, in which case just do
285 // nothing...
286 debug_assert!(self.conn.is_read_closed());
287 if self.conn.is_write_closed() {
288 self.close();
289 }
290 Poll::Ready(Ok(()))
291 }
292 }
293 }
294
295 fn poll_write(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
296 loop {
297 if self.is_closing {
298 return Poll::Ready(Ok(()));
299 } else if self.body_rx.is_none()
300 && self.conn.can_write_head()
301 && self.dispatch.should_poll()
302 {
303 if let Some(msg) = ready!(Pin::new(&mut self.dispatch).poll_msg(cx)) {
304 let (head, mut body) = msg.map_err(crate::Error::new_user_service)?;
305
306 // Check if the body knows its full data immediately.
307 //
308 // If so, we can skip a bit of bookkeeping that streaming
309 // bodies need to do.
310 if let Some(full) = crate::body::take_full_data(&mut body) {
311 self.conn.write_full_msg(head, full);
312 return Poll::Ready(Ok(()));
313 }
314
315 let body_type = if body.is_end_stream() {
316 self.body_rx.set(None);
317 None
318 } else {
319 let btype = body
320 .size_hint()
321 .exact()
322 .map(BodyLength::Known)
323 .or_else(|| Some(BodyLength::Unknown));
324 self.body_rx.set(Some(body));
325 btype
326 };
327 self.conn.write_head(head, body_type);
328 } else {
329 self.close();
330 return Poll::Ready(Ok(()));
331 }
332 } else if !self.conn.can_buffer_body() {
333 ready!(self.poll_flush(cx))?;
334 } else {
335 // A new scope is needed :(
336 if let (Some(mut body), clear_body) =
337 OptGuard::new(self.body_rx.as_mut()).guard_mut()
338 {
339 debug_assert!(!*clear_body, "opt guard defaults to keeping body");
340 if !self.conn.can_write_body() {
341 trace!(
342 "no more write body allowed, user body is_end_stream = {}",
343 body.is_end_stream(),
344 );
345 *clear_body = true;
346 continue;
347 }
348
349 let item = ready!(body.as_mut().poll_data(cx));
350 if let Some(item) = item {
351 let chunk = item.map_err(|e| {
352 *clear_body = true;
353 crate::Error::new_user_body(e)
354 })?;
355 let eos = body.is_end_stream();
356 if eos {
357 *clear_body = true;
358 if chunk.remaining() == 0 {
359 trace!("discarding empty chunk");
360 self.conn.end_body()?;
361 } else {
362 self.conn.write_body_and_end(chunk);
363 }
364 } else {
365 if chunk.remaining() == 0 {
366 trace!("discarding empty chunk");
367 continue;
368 }
369 self.conn.write_body(chunk);
370 }
371 } else {
372 *clear_body = true;
373 self.conn.end_body()?;
374 }
375 } else {
376 // If there's no body_rx, end the body
377 if self.conn.can_write_body() {
378 self.conn.end_body()?;
379 } else {
380 return Poll::Pending;
381 }
382 }
383 }
384 }
385 }
386
387 fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
388 self.conn.poll_flush(cx).map_err(|err| {
389 debug!("error writing: {}", err);
390 crate::Error::new_body_write(err)
391 })
392 }
393
394 fn close(&mut self) {
395 self.is_closing = true;
396 self.conn.close_read();
397 self.conn.close_write();
398 }
399
400 fn is_done(&self) -> bool {
401 if self.is_closing {
402 return true;
403 }
404
405 let read_done = self.conn.is_read_closed();
406
407 if !T::should_read_first() && read_done {
408 // a client that cannot read may was well be done.
409 true
410 } else {
411 let write_done = self.conn.is_write_closed()
412 || (!self.dispatch.should_poll() && self.body_rx.is_none());
413 read_done && write_done
414 }
415 }
416}
417
418impl<D, Bs, I, T> Future for Dispatcher<D, Bs, I, T>
419where
420 D: Dispatch<
421 PollItem = MessageHead<T::Outgoing>,
422 PollBody = Bs,
423 RecvItem = MessageHead<T::Incoming>,
424 > + Unpin,
425 D::PollError: Into<Box<dyn StdError + Send + Sync>>,
426 I: AsyncRead + AsyncWrite + Unpin,
427 T: Http1Transaction + Unpin,
428 Bs: HttpBody + 'static,
429 Bs::Error: Into<Box<dyn StdError + Send + Sync>>,
430{
431 type Output = crate::Result<Dispatched>;
432
433 #[inline]
434 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
435 self.poll_catch(cx, should_shutdown:true)
436 }
437}
438
439// ===== impl OptGuard =====
440
/// Drop guard around a pinned `Option`.
///
/// It lends out mutable access to the contained value together with a flag;
/// when the guard is dropped with the flag set, the `Option` is reset to
/// `None`. The flag starts out unset, so by default the value is kept.
struct OptGuard<'a, T> {
    /// The guarded slot.
    slot: Pin<&'a mut Option<T>>,
    /// Whether `slot` is reset to `None` on drop.
    clear_on_drop: bool,
}

impl<'a, T> OptGuard<'a, T> {
    /// Wraps `pin` in a guard that keeps the value unless told otherwise.
    fn new(pin: Pin<&'a mut Option<T>>) -> Self {
        Self {
            slot: pin,
            clear_on_drop: false,
        }
    }

    /// Returns the pinned value (if any) and the clear-on-drop flag.
    fn guard_mut(&mut self) -> (Option<Pin<&mut T>>, &mut bool) {
        let value = self.slot.as_mut().as_pin_mut();
        (value, &mut self.clear_on_drop)
    }
}

impl<'a, T> Drop for OptGuard<'a, T> {
    fn drop(&mut self) {
        if !self.clear_on_drop {
            return;
        }
        self.slot.set(None);
    }
}
462
463// ===== impl Server =====
464
465cfg_server! {
466 impl<S, B> Server<S, B>
467 where
468 S: HttpService<B>,
469 {
470 pub(crate) fn new(service: S) -> Server<S, B> {
471 Server {
472 in_flight: Box::pin(None),
473 service,
474 }
475 }
476
477 pub(crate) fn into_service(self) -> S {
478 self.service
479 }
480 }
481
482 // Service is never pinned
483 impl<S: HttpService<B>, B> Unpin for Server<S, B> {}
484
485 impl<S, Bs> Dispatch for Server<S, Body>
486 where
487 S: HttpService<Body, ResBody = Bs>,
488 S::Error: Into<Box<dyn StdError + Send + Sync>>,
489 Bs: HttpBody,
490 {
491 type PollItem = MessageHead<http::StatusCode>;
492 type PollBody = Bs;
493 type PollError = S::Error;
494 type RecvItem = RequestHead;
495
496 fn poll_msg(
497 mut self: Pin<&mut Self>,
498 cx: &mut Context<'_>,
499 ) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Self::PollError>>> {
500 let mut this = self.as_mut();
501 let ret = if let Some(ref mut fut) = this.in_flight.as_mut().as_pin_mut() {
502 let resp = ready!(fut.as_mut().poll(cx)?);
503 let (parts, body) = resp.into_parts();
504 let head = MessageHead {
505 version: parts.version,
506 subject: parts.status,
507 headers: parts.headers,
508 extensions: parts.extensions,
509 };
510 Poll::Ready(Some(Ok((head, body))))
511 } else {
512 unreachable!("poll_msg shouldn't be called if no inflight");
513 };
514
515 // Since in_flight finished, remove it
516 this.in_flight.set(None);
517 ret
518 }
519
520 fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()> {
521 let (msg, body) = msg?;
522 let mut req = Request::new(body);
523 *req.method_mut() = msg.subject.0;
524 *req.uri_mut() = msg.subject.1;
525 *req.headers_mut() = msg.headers;
526 *req.version_mut() = msg.version;
527 *req.extensions_mut() = msg.extensions;
528 let fut = self.service.call(req);
529 self.in_flight.set(Some(fut));
530 Ok(())
531 }
532
533 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ()>> {
534 if self.in_flight.is_some() {
535 Poll::Pending
536 } else {
537 self.service.poll_ready(cx).map_err(|_e| {
538 // FIXME: return error value.
539 trace!("service closed");
540 })
541 }
542 }
543
544 fn should_poll(&self) -> bool {
545 self.in_flight.is_some()
546 }
547 }
548}
549
550// ===== impl Client =====
551
552cfg_client! {
553 impl<B> Client<B> {
554 pub(crate) fn new(rx: ClientRx<B>) -> Client<B> {
555 Client {
556 callback: None,
557 rx,
558 rx_closed: false,
559 }
560 }
561 }
562
563 impl<B> Dispatch for Client<B>
564 where
565 B: HttpBody,
566 {
567 type PollItem = RequestHead;
568 type PollBody = B;
569 type PollError = std::convert::Infallible;
570 type RecvItem = crate::proto::ResponseHead;
571
572 fn poll_msg(
573 mut self: Pin<&mut Self>,
574 cx: &mut Context<'_>,
575 ) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Self::PollError>>> {
576 let mut this = self.as_mut();
577 debug_assert!(!this.rx_closed);
578 match this.rx.poll_recv(cx) {
579 Poll::Ready(Some((req, mut cb))) => {
580 // check that future hasn't been canceled already
581 match cb.poll_canceled(cx) {
582 Poll::Ready(()) => {
583 trace!("request canceled");
584 Poll::Ready(None)
585 }
586 Poll::Pending => {
587 let (parts, body) = req.into_parts();
588 let head = RequestHead {
589 version: parts.version,
590 subject: crate::proto::RequestLine(parts.method, parts.uri),
591 headers: parts.headers,
592 extensions: parts.extensions,
593 };
594 this.callback = Some(cb);
595 Poll::Ready(Some(Ok((head, body))))
596 }
597 }
598 }
599 Poll::Ready(None) => {
600 // user has dropped sender handle
601 trace!("client tx closed");
602 this.rx_closed = true;
603 Poll::Ready(None)
604 }
605 Poll::Pending => Poll::Pending,
606 }
607 }
608
609 fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()> {
610 match msg {
611 Ok((msg, body)) => {
612 if let Some(cb) = self.callback.take() {
613 let res = msg.into_response(body);
614 cb.send(Ok(res));
615 Ok(())
616 } else {
617 // Getting here is likely a bug! An error should have happened
618 // in Conn::require_empty_read() before ever parsing a
619 // full message!
620 Err(crate::Error::new_unexpected_message())
621 }
622 }
623 Err(err) => {
624 if let Some(cb) = self.callback.take() {
625 cb.send(Err((err, None)));
626 Ok(())
627 } else if !self.rx_closed {
628 self.rx.close();
629 if let Some((req, cb)) = self.rx.try_recv() {
630 trace!("canceling queued request with connection error: {}", err);
631 // in this case, the message was never even started, so it's safe to tell
632 // the user that the request was completely canceled
633 cb.send(Err((crate::Error::new_canceled().with(err), Some(req))));
634 Ok(())
635 } else {
636 Err(err)
637 }
638 } else {
639 Err(err)
640 }
641 }
642 }
643 }
644
645 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ()>> {
646 match self.callback {
647 Some(ref mut cb) => match cb.poll_canceled(cx) {
648 Poll::Ready(()) => {
649 trace!("callback receiver has dropped");
650 Poll::Ready(Err(()))
651 }
652 Poll::Pending => Poll::Ready(Ok(())),
653 },
654 None => Poll::Ready(Err(())),
655 }
656 }
657
658 fn should_poll(&self) -> bool {
659 self.callback.is_none()
660 }
661 }
662}
663
#[cfg(test)]
mod tests {
    use super::*;
    use crate::proto::h1::ClientTransaction;
    use std::time::Duration;

    /// Bytes that arrive before any request was written must cancel the
    /// queued request and hand it back to the caller.
    #[test]
    fn client_read_bytes_before_writing_request() {
        let _ = pretty_env_logger::try_init();

        tokio_test::task::spawn(()).enter(|cx, _| {
            let (io, mut handle) = tokio_test::io::Builder::new().build_with_handle();

            // The IO starts out blocked; the response is released below,
            // before the request is ready to be written.
            let (mut tx, rx) = crate::client::dispatch::channel();
            let conn = Conn::<_, bytes::Bytes, ClientTransaction>::new(io);
            let mut dispatcher = Dispatcher::new(Client::new(rx), conn);

            // An initial poll is required before tx is allowed to send.
            assert!(Pin::new(&mut dispatcher).poll(cx).is_pending());

            // Unblock the IO with a response, although no request was sent.
            handle.read(b"HTTP/1.1 200 OK\r\n\r\n");

            let request = crate::Request::new(crate::Body::empty());
            let mut res_rx = tx.try_send(request).unwrap();

            tokio_test::assert_ready_ok!(Pin::new(&mut dispatcher).poll(cx));
            let err = tokio_test::assert_ready_ok!(Pin::new(&mut res_rx).poll(cx))
                .expect_err("callback should send error");

            match (err.0.kind(), err.1) {
                (&crate::error::Kind::Canceled, Some(_)) => (),
                other => panic!("expected Canceled, got {:?}", other),
            }
        });
    }

    /// While the request body is still waiting to be flushed, the sender
    /// must not report readiness for another request.
    #[tokio::test]
    async fn client_flushing_is_not_ready_for_next_request() {
        let _ = pretty_env_logger::try_init();

        let (io, _handle) = tokio_test::io::Builder::new()
            .write(b"POST / HTTP/1.1\r\ncontent-length: 4\r\n\r\n")
            .read(b"HTTP/1.1 200 OK\r\ncontent-length: 0\r\n\r\n")
            .wait(Duration::from_secs(2))
            .build_with_handle();

        let (mut tx, rx) = crate::client::dispatch::channel();
        let mut conn = Conn::<_, bytes::Bytes, ClientTransaction>::new(io);
        conn.set_write_strategy_queue();

        let dispatcher = Dispatcher::new(Client::new(rx), conn);
        let _dispatcher = tokio::spawn(async move { dispatcher.await });

        let request = crate::Request::builder()
            .method("POST")
            .body(crate::Body::from("reee"))
            .unwrap();

        let response = tx.try_send(request).unwrap().await.expect("response");
        drop(response);

        assert!(!tx.is_ready());
    }

    /// Empty chunks from a streaming body must never reach
    /// `conn.write_body`.
    #[tokio::test]
    async fn body_empty_chunks_ignored() {
        let _ = pretty_env_logger::try_init();

        // No reads or writes are scripted; the IO just stays blocked.
        let io = tokio_test::io::Builder::new()
            .wait(Duration::from_secs(5))
            .build();

        let (mut tx, rx) = crate::client::dispatch::channel();
        let conn = Conn::<_, bytes::Bytes, ClientTransaction>::new(io);
        let mut dispatcher = tokio_test::task::spawn(Dispatcher::new(Client::new(rx), conn));

        // An initial poll is required before tx is allowed to send.
        assert!(dispatcher.poll().is_pending());

        let body = {
            let (mut body_tx, body) = crate::Body::channel();
            body_tx.try_send_data("".into()).unwrap();
            body
        };

        let _res_rx = tx.try_send(crate::Request::new(body)).unwrap();

        // Had conn.write_body been called with the empty chunk, it would
        // have tripped an assertion.
        assert!(dispatcher.poll().is_pending());
    }
}
762