1use std::error::Error as StdError;
2
3use bytes::{Buf, Bytes};
4use http::Request;
5use tokio::io::{AsyncRead, AsyncWrite};
6use tracing::{debug, trace};
7
8use super::{Http1Transaction, Wants};
9use crate::body::{Body, DecodedLength, HttpBody};
10use crate::common::{task, Future, Pin, Poll, Unpin};
11use crate::proto::{
12 BodyLength, Conn, Dispatched, MessageHead, RequestHead,
13};
14use crate::upgrade::OnUpgrade;
15
/// Drives a single HTTP/1 connection: reads incoming messages from `conn`
/// and hands them to `dispatch`, and writes the messages `dispatch` produces
/// back out through `conn`.
pub(crate) struct Dispatcher<D, Bs: HttpBody, I, T> {
    /// The connection that heads and bodies are read from and written to.
    conn: Conn<I, Bs::Data, T>,
    /// The message source/sink (see the `Dispatch` trait).
    dispatch: D,
    /// Sender half of the incoming body channel; set while an incoming
    /// message with a non-zero body length is still being read.
    body_tx: Option<crate::body::Sender>,
    /// The outgoing body currently being written, if any.
    body_rx: Pin<Box<Option<Bs>>>,
    /// Set by `close()`; once true, reading and writing stop and the
    /// dispatcher reports itself as done.
    is_closing: bool,
}
23
/// The message-level half of a connection, as seen by `Dispatcher`.
///
/// `Dispatcher` pulls outgoing messages from it with `poll_msg` and pushes
/// incoming messages (or errors) into it with `recv_msg`.
pub(crate) trait Dispatch {
    /// Head of an outgoing message.
    type PollItem;
    /// Body of an outgoing message.
    type PollBody;
    /// Error produced while obtaining an outgoing message.
    type PollError;
    /// Head of an incoming message.
    type RecvItem;
    /// Polls for the next outgoing message (head and body).
    ///
    /// The dispatcher closes the connection when this yields `None`.
    fn poll_msg(
        self: Pin<&mut Self>,
        cx: &mut task::Context<'_>,
    ) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Self::PollError>>>;
    /// Delivers an incoming message, or the error that ended the connection.
    ///
    /// Returning `Err` makes the dispatcher surface that error itself.
    fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()>;
    /// Polls whether an incoming message can be accepted right now.
    ///
    /// `Err(())` tells the dispatcher that no more messages are wanted,
    /// which makes it close the connection.
    fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), ()>>;
    /// Whether the dispatcher should call `poll_msg` at this point.
    fn should_poll(&self) -> bool;
}
37
cfg_server! {
    use crate::service::HttpService;

    /// `Dispatch` implementation for the server side: incoming requests are
    /// passed to `service`, and the resulting future is polled for the
    /// response to write back.
    pub(crate) struct Server<S: HttpService<B>, B> {
        /// The future returned by `service.call` for the request currently
        /// being handled, or `None` when no request is in flight.
        in_flight: Pin<Box<Option<S::Future>>>,
        /// The user's service.
        pub(crate) service: S,
    }
}
46
cfg_client! {
    pin_project_lite::pin_project! {
        // `Dispatch` implementation for the client side: requests arrive on
        // `rx`, and each response (or error) is delivered through the
        // callback that accompanied its request.
        pub(crate) struct Client<B> {
            // Callback of the request that is currently awaiting a response.
            callback: Option<crate::client::dispatch::Callback<Request<B>, http::Response<Body>>>,
            // Receiver of queued requests and their callbacks.
            #[pin]
            rx: ClientRx<B>,
            // Set once `rx` has reported that the sending side is gone.
            rx_closed: bool,
        }
    }

    // Receiver of `(request, callback)` pairs sent by the client handle.
    type ClientRx<B> = crate::client::dispatch::Receiver<Request<B>, http::Response<Body>>;
}
59
impl<D, Bs, I, T> Dispatcher<D, Bs, I, T>
where
    D: Dispatch<
            PollItem = MessageHead<T::Outgoing>,
            PollBody = Bs,
            RecvItem = MessageHead<T::Incoming>,
        > + Unpin,
    D::PollError: Into<Box<dyn StdError + Send + Sync>>,
    I: AsyncRead + AsyncWrite + Unpin,
    T: Http1Transaction + Unpin,
    Bs: HttpBody + 'static,
    Bs::Error: Into<Box<dyn StdError + Send + Sync>>,
{
    /// Creates a dispatcher that moves messages between `conn` and `dispatch`.
    ///
    /// It starts with no incoming body sender, no outgoing body, and not
    /// closing.
    pub(crate) fn new(dispatch: D, conn: Conn<I, Bs::Data, T>) -> Self {
        Dispatcher {
            conn,
            dispatch,
            body_tx: None,
            body_rx: Box::pin(None),
            is_closing: false,
        }
    }

    /// Disables keep-alive on the connection, and closes the dispatcher
    /// immediately if the connection's write side is already closed.
    #[cfg(feature = "server")]
    pub(crate) fn disable_keep_alive(&mut self) {
        self.conn.disable_keep_alive();
        if self.conn.is_write_closed() {
            self.close();
        }
    }

    /// Consumes the dispatcher, returning the IO object, the bytes `Conn`
    /// hands back alongside it, and the `Dispatch` implementation.
    pub(crate) fn into_inner(self) -> (I, Bytes, D) {
        let (io, buf) = self.conn.into_inner();
        (io, buf, self.dispatch)
    }

    /// Run this dispatcher until HTTP says this connection is done,
    /// but don't call `AsyncWrite::shutdown` on the underlying IO.
    ///
    /// This is useful for old-style HTTP upgrades, but ignores
    /// newer-style upgrade API.
    pub(crate) fn poll_without_shutdown(
        &mut self,
        cx: &mut task::Context<'_>,
    ) -> Poll<crate::Result<()>>
    where
        Self: Unpin,
    {
        Pin::new(self).poll_catch(cx, false).map_ok(|ds| {
            // A pending upgrade is marked as manually handled, since the
            // caller of this method only receives `()`.
            if let Dispatched::Upgrade(pending) = ds {
                pending.manual();
            }
        })
    }

    /// Polls `poll_inner`, routing any error to the `Dispatch` implementation.
    ///
    /// If `recv_msg` accepts the error, the result is `Ok(Dispatched::Shutdown)`;
    /// otherwise the error returned by `recv_msg` is propagated.
    fn poll_catch(
        &mut self,
        cx: &mut task::Context<'_>,
        should_shutdown: bool,
    ) -> Poll<crate::Result<Dispatched>> {
        Poll::Ready(ready!(self.poll_inner(cx, should_shutdown)).or_else(|e| {
            // An error means we're shutting down either way.
            // We just try to give the error to the user,
            // and close the connection with an Ok. If we
            // cannot give it to the user, then return the Err.
            self.dispatch.recv_msg(Err(e))?;
            Ok(Dispatched::Shutdown)
        }))
    }

    /// Runs one round of `poll_loop` and then decides whether the connection
    /// is finished.
    ///
    /// When `is_done()` holds, a pending upgrade takes precedence and is
    /// returned as `Dispatched::Upgrade`; otherwise the connection is shut
    /// down first if `should_shutdown` is set, and `Dispatched::Shutdown` is
    /// returned. In both cases an error stored on the connection is surfaced
    /// via `take_error`. Returns `Poll::Pending` while not done.
    fn poll_inner(
        &mut self,
        cx: &mut task::Context<'_>,
        should_shutdown: bool,
    ) -> Poll<crate::Result<Dispatched>> {
        T::update_date();

        ready!(self.poll_loop(cx))?;

        if self.is_done() {
            if let Some(pending) = self.conn.pending_upgrade() {
                self.conn.take_error()?;
                return Poll::Ready(Ok(Dispatched::Upgrade(pending)));
            } else if should_shutdown {
                ready!(self.conn.poll_shutdown(cx)).map_err(crate::Error::new_shutdown)?;
            }
            self.conn.take_error()?;
            Poll::Ready(Ok(Dispatched::Shutdown))
        } else {
            Poll::Pending
        }
    }

    /// Repeatedly polls read, write and flush, up to 16 rounds.
    ///
    /// Returns `Ready(Ok(()))` as soon as the connection no longer wants to
    /// read again after a round; errors from any of the three steps are
    /// propagated. The `Pending`/`Ready` state of the individual steps is
    /// deliberately discarded (`let _ =`).
    fn poll_loop(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
        // Limit the looping on this connection, in case it is ready far too
        // often, so that other futures don't starve.
        //
        // 16 was chosen arbitrarily, as that is number of pipelined requests
        // benchmarks often use. Perhaps it should be a config option instead.
        for _ in 0..16 {
            let _ = self.poll_read(cx)?;
            let _ = self.poll_write(cx)?;
            let _ = self.poll_flush(cx)?;

            // This could happen if reading paused before blocking on IO,
            // such as getting to the end of a framed message, but then
            // writing/flushing set the state back to Init. In that case,
            // if the read buffer still had bytes, we'd want to try poll_read
            // again, or else we wouldn't ever be woken up again.
            //
            // Using this instead of task::current() and notify() inside
            // the Conn is noticeably faster in pipelined benchmarks.
            if !self.conn.wants_read_again() {
                return Poll::Ready(Ok(()));
            }
        }

        trace!("poll_loop yielding (self = {:p})", self);

        // The ready value of `yield_now` is uninhabited (the empty `match`
        // only type-checks for such a type), so this can only be `Pending`.
        // NOTE(review): presumably `yield_now` also re-schedules the task — confirm.
        task::yield_now(cx).map(|never| match never {})
    }

    /// Reads as much of the incoming side as is currently possible.
    ///
    /// Each loop iteration picks one of: stop (closing), read a message
    /// head, forward a body chunk to `body_tx`, or fall back to the
    /// connection's keep-alive read.
    fn poll_read(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
        loop {
            if self.is_closing {
                return Poll::Ready(Ok(()));
            } else if self.conn.can_read_head() {
                ready!(self.poll_read_head(cx))?;
            } else if let Some(mut body) = self.body_tx.take() {
                // `body_tx` was taken out of `self`; every path below that
                // wants to keep the sender must put it back explicitly.
                if self.conn.can_read_body() {
                    match body.poll_ready(cx) {
                        Poll::Ready(Ok(())) => (),
                        Poll::Pending => {
                            self.body_tx = Some(body);
                            return Poll::Pending;
                        }
                        Poll::Ready(Err(_canceled)) => {
                            // user doesn't care about the body
                            // so we should stop reading
                            trace!("body receiver dropped before eof, draining or closing");
                            self.conn.poll_drain_or_close_read(cx);
                            continue;
                        }
                    }
                    match self.conn.poll_read_body(cx) {
                        Poll::Ready(Some(Ok(chunk))) => match body.try_send_data(chunk) {
                            Ok(()) => {
                                self.body_tx = Some(body);
                            }
                            Err(_canceled) => {
                                if self.conn.can_read_body() {
                                    trace!("body receiver dropped before eof, closing");
                                    self.conn.close_read();
                                }
                            }
                        },
                        Poll::Ready(None) => {
                            // just drop, the body will close automatically
                        }
                        Poll::Pending => {
                            self.body_tx = Some(body);
                            return Poll::Pending;
                        }
                        Poll::Ready(Some(Err(e))) => {
                            // The sender is consumed here, so the error ends
                            // the body stream.
                            body.send_error(crate::Error::new_body(e));
                        }
                    }
                } else {
                    // just drop, the body will close automatically
                }
            } else {
                return self.conn.poll_read_keep_alive(cx);
            }
        }
    }

    /// Reads one message head and passes the message to the `Dispatch`
    /// implementation.
    ///
    /// Closes the dispatcher when `dispatch` no longer accepts messages,
    /// when a read error was accepted by `dispatch`, or on EOF with the
    /// write side already closed.
    fn poll_read_head(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
        // can dispatch receive, or does it still care about, an incoming message?
        match ready!(self.dispatch.poll_ready(cx)) {
            Ok(()) => (),
            Err(()) => {
                trace!("dispatch no longer receiving messages");
                self.close();
                return Poll::Ready(Ok(()));
            }
        }
        // dispatch is ready for a message, try to read one
        match ready!(self.conn.poll_read_head(cx)) {
            Some(Ok((mut head, body_len, wants))) => {
                // Only a message with a non-zero body length gets a channel;
                // its sender half is kept in `body_tx` for `poll_read`.
                let body = match body_len {
                    DecodedLength::ZERO => Body::empty(),
                    other => {
                        let (tx, rx) = Body::new_channel(other, wants.contains(Wants::EXPECT));
                        self.body_tx = Some(tx);
                        rx
                    }
                };
                if wants.contains(Wants::UPGRADE) {
                    // Attach the upgrade handle to the message's extensions.
                    let upgrade = self.conn.on_upgrade();
                    debug_assert!(!upgrade.is_none(), "empty upgrade");
                    debug_assert!(head.extensions.get::<OnUpgrade>().is_none(), "OnUpgrade already set");
                    head.extensions.insert(upgrade);
                }
                self.dispatch.recv_msg(Ok((head, body)))?;
                Poll::Ready(Ok(()))
            }
            Some(Err(err)) => {
                debug!("read_head error: {}", err);
                self.dispatch.recv_msg(Err(err))?;
                // if here, the dispatcher gave the user the error
                // somewhere else. we still need to shutdown, but
                // not as a second error.
                self.close();
                Poll::Ready(Ok(()))
            }
            None => {
                // read eof, the write side will have been closed too unless
                // allow_read_close was set to true, in which case just do
                // nothing...
                debug_assert!(self.conn.is_read_closed());
                if self.conn.is_write_closed() {
                    self.close();
                }
                Poll::Ready(Ok(()))
            }
        }
    }

    /// Writes as much of the outgoing side as is currently possible.
    ///
    /// Each loop iteration picks one of: stop (closing), fetch the next
    /// message from `dispatch` and write its head, flush when the
    /// connection cannot buffer more body, or write the next chunk of the
    /// current outgoing body.
    fn poll_write(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
        loop {
            if self.is_closing {
                return Poll::Ready(Ok(()));
            } else if self.body_rx.is_none()
                && self.conn.can_write_head()
                && self.dispatch.should_poll()
            {
                if let Some(msg) = ready!(Pin::new(&mut self.dispatch).poll_msg(cx)) {
                    let (head, mut body) = msg.map_err(crate::Error::new_user_service)?;

                    // Check if the body knows its full data immediately.
                    //
                    // If so, we can skip a bit of bookkeeping that streaming
                    // bodies need to do.
                    if let Some(full) = crate::body::take_full_data(&mut body) {
                        self.conn.write_full_msg(head, full);
                        return Poll::Ready(Ok(()));
                    }

                    // `None` means the message has no body to stream; otherwise
                    // the body is stored in `body_rx` and its length is passed
                    // on as known (exact size hint) or unknown.
                    let body_type = if body.is_end_stream() {
                        self.body_rx.set(None);
                        None
                    } else {
                        let btype = body
                            .size_hint()
                            .exact()
                            .map(BodyLength::Known)
                            .or_else(|| Some(BodyLength::Unknown));
                        self.body_rx.set(Some(body));
                        btype
                    };
                    self.conn.write_head(head, body_type);
                } else {
                    // `dispatch` has no more messages to send.
                    self.close();
                    return Poll::Ready(Ok(()));
                }
            } else if !self.conn.can_buffer_body() {
                ready!(self.poll_flush(cx))?;
            } else {
                // A new scope is needed :(
                //
                // The `OptGuard` temporary lives until the end of this
                // `if let`; when `*clear_body` is true at that point (or on
                // an early exit), `body_rx` is reset to `None`.
                if let (Some(mut body), clear_body) =
                    OptGuard::new(self.body_rx.as_mut()).guard_mut()
                {
                    debug_assert!(!*clear_body, "opt guard defaults to keeping body");
                    if !self.conn.can_write_body() {
                        trace!(
                            "no more write body allowed, user body is_end_stream = {}",
                            body.is_end_stream(),
                        );
                        *clear_body = true;
                        continue;
                    }

                    let item = ready!(body.as_mut().poll_data(cx));
                    if let Some(item) = item {
                        let chunk = item.map_err(|e| {
                            *clear_body = true;
                            crate::Error::new_user_body(e)
                        })?;
                        let eos = body.is_end_stream();
                        if eos {
                            // Last chunk: drop the body and end it on the
                            // connection, with or without trailing data.
                            *clear_body = true;
                            if chunk.remaining() == 0 {
                                trace!("discarding empty chunk");
                                self.conn.end_body()?;
                            } else {
                                self.conn.write_body_and_end(chunk);
                            }
                        } else {
                            // Empty chunks are never handed to `write_body`.
                            if chunk.remaining() == 0 {
                                trace!("discarding empty chunk");
                                continue;
                            }
                            self.conn.write_body(chunk);
                        }
                    } else {
                        // The body stream finished without a final chunk.
                        *clear_body = true;
                        self.conn.end_body()?;
                    }
                } else {
                    // No outgoing body and no new message to start.
                    return Poll::Pending;
                }
            }
        }
    }

    /// Flushes the connection, mapping a failure to a body-write error.
    fn poll_flush(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
        self.conn.poll_flush(cx).map_err(|err| {
            debug!("error writing: {}", err);
            crate::Error::new_body_write(err)
        })
    }

    /// Marks the dispatcher as closing and closes both halves of the
    /// connection.
    fn close(&mut self) {
        self.is_closing = true;
        self.conn.close_read();
        self.conn.close_write();
    }

    /// Whether the dispatcher has nothing left to do on this connection.
    ///
    /// True when closing. Otherwise, for a transaction type that does not
    /// read first, a closed read side is enough; for the others the read
    /// side must be closed and the write side must be closed or idle
    /// (`dispatch` has nothing to poll and no body is being written).
    fn is_done(&self) -> bool {
        if self.is_closing {
            return true;
        }

        let read_done = self.conn.is_read_closed();

        if !T::should_read_first() && read_done {
            // a client that cannot read may as well be done.
            true
        } else {
            let write_done = self.conn.is_write_closed()
                || (!self.dispatch.should_poll() && self.body_rx.is_none());
            read_done && write_done
        }
    }
}
406
407impl<D, Bs, I, T> Future for Dispatcher<D, Bs, I, T>
408where
409 D: Dispatch<
410 PollItem = MessageHead<T::Outgoing>,
411 PollBody = Bs,
412 RecvItem = MessageHead<T::Incoming>,
413 > + Unpin,
414 D::PollError: Into<Box<dyn StdError + Send + Sync>>,
415 I: AsyncRead + AsyncWrite + Unpin,
416 T: Http1Transaction + Unpin,
417 Bs: HttpBody + 'static,
418 Bs::Error: Into<Box<dyn StdError + Send + Sync>>,
419{
420 type Output = crate::Result<Dispatched>;
421
422 #[inline]
423 fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
424 self.poll_catch(cx, should_shutdown:true)
425 }
426}
427
428// ===== impl OptGuard =====
429
/// Drop guard over a pinned `Option`.
///
/// It lends out the option's contents together with a flag; if the flag is
/// `true` when the guard is dropped, the option is reset to `None`.
struct OptGuard<'a, T> {
    /// The guarded slot.
    slot: Pin<&'a mut Option<T>>,
    /// Whether `slot` is cleared when the guard goes away.
    clear_on_drop: bool,
}

impl<'a, T> OptGuard<'a, T> {
    /// Wraps `pin`; by default the option is left untouched on drop.
    fn new(pin: Pin<&'a mut Option<T>>) -> Self {
        Self {
            slot: pin,
            clear_on_drop: false,
        }
    }

    /// Borrows the pinned contents (if any) and the clear-on-drop flag at
    /// the same time.
    fn guard_mut(&mut self) -> (Option<Pin<&mut T>>, &mut bool) {
        // Split the borrow so both halves can be handed out together.
        let Self {
            slot,
            clear_on_drop,
        } = self;
        (slot.as_mut().as_pin_mut(), clear_on_drop)
    }
}

impl<'a, T> Drop for OptGuard<'a, T> {
    fn drop(&mut self) {
        if !self.clear_on_drop {
            return;
        }
        self.slot.set(None);
    }
}
451
452// ===== impl Server =====
453
454cfg_server! {
455 impl<S, B> Server<S, B>
456 where
457 S: HttpService<B>,
458 {
459 pub(crate) fn new(service: S) -> Server<S, B> {
460 Server {
461 in_flight: Box::pin(None),
462 service,
463 }
464 }
465
466 pub(crate) fn into_service(self) -> S {
467 self.service
468 }
469 }
470
471 // Service is never pinned
472 impl<S: HttpService<B>, B> Unpin for Server<S, B> {}
473
474 impl<S, Bs> Dispatch for Server<S, Body>
475 where
476 S: HttpService<Body, ResBody = Bs>,
477 S::Error: Into<Box<dyn StdError + Send + Sync>>,
478 Bs: HttpBody,
479 {
480 type PollItem = MessageHead<http::StatusCode>;
481 type PollBody = Bs;
482 type PollError = S::Error;
483 type RecvItem = RequestHead;
484
485 fn poll_msg(
486 mut self: Pin<&mut Self>,
487 cx: &mut task::Context<'_>,
488 ) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Self::PollError>>> {
489 let mut this = self.as_mut();
490 let ret = if let Some(ref mut fut) = this.in_flight.as_mut().as_pin_mut() {
491 let resp = ready!(fut.as_mut().poll(cx)?);
492 let (parts, body) = resp.into_parts();
493 let head = MessageHead {
494 version: parts.version,
495 subject: parts.status,
496 headers: parts.headers,
497 extensions: parts.extensions,
498 };
499 Poll::Ready(Some(Ok((head, body))))
500 } else {
501 unreachable!("poll_msg shouldn't be called if no inflight");
502 };
503
504 // Since in_flight finished, remove it
505 this.in_flight.set(None);
506 ret
507 }
508
509 fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()> {
510 let (msg, body) = msg?;
511 let mut req = Request::new(body);
512 *req.method_mut() = msg.subject.0;
513 *req.uri_mut() = msg.subject.1;
514 *req.headers_mut() = msg.headers;
515 *req.version_mut() = msg.version;
516 *req.extensions_mut() = msg.extensions;
517 let fut = self.service.call(req);
518 self.in_flight.set(Some(fut));
519 Ok(())
520 }
521
522 fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), ()>> {
523 if self.in_flight.is_some() {
524 Poll::Pending
525 } else {
526 self.service.poll_ready(cx).map_err(|_e| {
527 // FIXME: return error value.
528 trace!("service closed");
529 })
530 }
531 }
532
533 fn should_poll(&self) -> bool {
534 self.in_flight.is_some()
535 }
536 }
537}
538
539// ===== impl Client =====
540
541cfg_client! {
542 impl<B> Client<B> {
543 pub(crate) fn new(rx: ClientRx<B>) -> Client<B> {
544 Client {
545 callback: None,
546 rx,
547 rx_closed: false,
548 }
549 }
550 }
551
552 impl<B> Dispatch for Client<B>
553 where
554 B: HttpBody,
555 {
556 type PollItem = RequestHead;
557 type PollBody = B;
558 type PollError = crate::common::Never;
559 type RecvItem = crate::proto::ResponseHead;
560
561 fn poll_msg(
562 mut self: Pin<&mut Self>,
563 cx: &mut task::Context<'_>,
564 ) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), crate::common::Never>>> {
565 let mut this = self.as_mut();
566 debug_assert!(!this.rx_closed);
567 match this.rx.poll_recv(cx) {
568 Poll::Ready(Some((req, mut cb))) => {
569 // check that future hasn't been canceled already
570 match cb.poll_canceled(cx) {
571 Poll::Ready(()) => {
572 trace!("request canceled");
573 Poll::Ready(None)
574 }
575 Poll::Pending => {
576 let (parts, body) = req.into_parts();
577 let head = RequestHead {
578 version: parts.version,
579 subject: crate::proto::RequestLine(parts.method, parts.uri),
580 headers: parts.headers,
581 extensions: parts.extensions,
582 };
583 this.callback = Some(cb);
584 Poll::Ready(Some(Ok((head, body))))
585 }
586 }
587 }
588 Poll::Ready(None) => {
589 // user has dropped sender handle
590 trace!("client tx closed");
591 this.rx_closed = true;
592 Poll::Ready(None)
593 }
594 Poll::Pending => Poll::Pending,
595 }
596 }
597
598 fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()> {
599 match msg {
600 Ok((msg, body)) => {
601 if let Some(cb) = self.callback.take() {
602 let res = msg.into_response(body);
603 cb.send(Ok(res));
604 Ok(())
605 } else {
606 // Getting here is likely a bug! An error should have happened
607 // in Conn::require_empty_read() before ever parsing a
608 // full message!
609 Err(crate::Error::new_unexpected_message())
610 }
611 }
612 Err(err) => {
613 if let Some(cb) = self.callback.take() {
614 cb.send(Err((err, None)));
615 Ok(())
616 } else if !self.rx_closed {
617 self.rx.close();
618 if let Some((req, cb)) = self.rx.try_recv() {
619 trace!("canceling queued request with connection error: {}", err);
620 // in this case, the message was never even started, so it's safe to tell
621 // the user that the request was completely canceled
622 cb.send(Err((crate::Error::new_canceled().with(err), Some(req))));
623 Ok(())
624 } else {
625 Err(err)
626 }
627 } else {
628 Err(err)
629 }
630 }
631 }
632 }
633
634 fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), ()>> {
635 match self.callback {
636 Some(ref mut cb) => match cb.poll_canceled(cx) {
637 Poll::Ready(()) => {
638 trace!("callback receiver has dropped");
639 Poll::Ready(Err(()))
640 }
641 Poll::Pending => Poll::Ready(Ok(())),
642 },
643 None => Poll::Ready(Err(())),
644 }
645 }
646
647 fn should_poll(&self) -> bool {
648 self.callback.is_none()
649 }
650 }
651}
652
#[cfg(test)]
mod tests {
    use super::*;
    use crate::proto::h1::ClientTransaction;
    use std::time::Duration;

    /// A response that arrives before any request was written must cancel
    /// the queued request and hand it back to the caller.
    #[test]
    fn client_read_bytes_before_writing_request() {
        let _ = pretty_env_logger::try_init();

        tokio_test::task::spawn(()).enter(|cx, _| {
            let (io, mut handle) = tokio_test::io::Builder::new().build_with_handle();

            // Block at 0 for now, but we will release this response before
            // the request is ready to write later...
            let (mut tx, rx) = crate::client::dispatch::channel();
            let conn = Conn::<_, bytes::Bytes, ClientTransaction>::new(io);
            let mut dispatcher = Dispatcher::new(Client::new(rx), conn);

            // First poll is needed to allow tx to send...
            assert!(Pin::new(&mut dispatcher).poll(cx).is_pending());

            // Unblock our IO, which has a response before we've sent request!
            handle.read(b"HTTP/1.1 200 OK\r\n\r\n");

            let mut res_rx = tx
                .try_send(crate::Request::new(crate::Body::empty()))
                .unwrap();

            tokio_test::assert_ready_ok!(Pin::new(&mut dispatcher).poll(cx));
            let err = tokio_test::assert_ready_ok!(Pin::new(&mut res_rx).poll(cx))
                .expect_err("callback should send error");

            // The error must be a cancellation that carries the request back.
            match (err.0.kind(), err.1) {
                (&crate::error::Kind::Canceled, Some(_)) => (),
                other => panic!("expected Canceled, got {:?}", other),
            }
        });
    }

    /// After a response was received, the sender must not report ready
    /// while the mock IO is still blocked in its trailing wait.
    #[tokio::test]
    async fn client_flushing_is_not_ready_for_next_request() {
        let _ = pretty_env_logger::try_init();

        let (io, _handle) = tokio_test::io::Builder::new()
            .write(b"POST / HTTP/1.1\r\ncontent-length: 4\r\n\r\n")
            .read(b"HTTP/1.1 200 OK\r\ncontent-length: 0\r\n\r\n")
            .wait(std::time::Duration::from_secs(2))
            .build_with_handle();

        let (mut tx, rx) = crate::client::dispatch::channel();
        let mut conn = Conn::<_, bytes::Bytes, ClientTransaction>::new(io);
        conn.set_write_strategy_queue();

        let dispatcher = Dispatcher::new(Client::new(rx), conn);
        let _dispatcher = tokio::spawn(async move { dispatcher.await });

        let req = crate::Request::builder()
            .method("POST")
            .body(crate::Body::from("reee"))
            .unwrap();

        let res = tx.try_send(req).unwrap().await.expect("response");
        drop(res);

        assert!(!tx.is_ready());
    }

    /// An empty chunk from the request body must not be passed on to the
    /// connection's body writer.
    #[tokio::test]
    async fn body_empty_chunks_ignored() {
        let _ = pretty_env_logger::try_init();

        let io = tokio_test::io::Builder::new()
            // no reading or writing, just be blocked for the test...
            .wait(Duration::from_secs(5))
            .build();

        let (mut tx, rx) = crate::client::dispatch::channel();
        let conn = Conn::<_, bytes::Bytes, ClientTransaction>::new(io);
        let mut dispatcher = tokio_test::task::spawn(Dispatcher::new(Client::new(rx), conn));

        // First poll is needed to allow tx to send...
        assert!(dispatcher.poll().is_pending());

        // A channel body whose only queued chunk is empty.
        let body = {
            let (mut tx, body) = crate::Body::channel();
            tx.try_send_data("".into()).unwrap();
            body
        };

        let _res_rx = tx.try_send(crate::Request::new(body)).unwrap();

        // Ensure conn.write_body wasn't called with the empty chunk.
        // If it is, it will trigger an assertion.
        assert!(dispatcher.poll().is_pending());
    }
}
751