1use std::fmt;
2use std::io;
3use std::marker::PhantomData;
4use std::marker::Unpin;
5use std::pin::Pin;
6use std::task::{Context, Poll};
7#[cfg(all(feature = "server", feature = "runtime"))]
8use std::time::Duration;
9
10use bytes::{Buf, Bytes};
11use http::header::{HeaderValue, CONNECTION};
12use http::{HeaderMap, Method, Version};
13use httparse::ParserConfig;
14use tokio::io::{AsyncRead, AsyncWrite};
15#[cfg(all(feature = "server", feature = "runtime"))]
16use tokio::time::Sleep;
17use tracing::{debug, error, trace};
18
19use super::io::Buffered;
20use super::{Decoder, Encode, EncodedBuf, Encoder, Http1Transaction, ParseContext, Wants};
21use crate::body::DecodedLength;
22use crate::headers::connection_keep_alive;
23use crate::proto::{BodyLength, MessageHead};
24
/// The 24-byte client connection preface that opens an HTTP/2 connection.
///
/// `has_h2_prefix` compares the start of the read buffer against this so a
/// parse failure caused by an HTTP/2 peer can be reported as a version error.
const H2_PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
26
/// This handles a connection, which will have been established over an
/// `AsyncRead + AsyncWrite` (like a socket), and will likely include multiple
/// `Transaction`s over HTTP.
///
/// The connection will determine when a message begins and ends as well as
/// determine if this connection can be kept alive after the message,
/// or if it is complete.
pub(crate) struct Conn<I, B, T> {
    /// The transport, wrapped with read and write buffering.
    io: Buffered<I, EncodedBuf<B>>,
    /// Read/write progress, keep-alive status and per-connection options.
    state: State,
    /// Ties the connection to its `Http1Transaction` role without storing a `T`.
    _marker: PhantomData<fn(T)>,
}
39
40impl<I, B, T> Conn<I, B, T>
41where
42 I: AsyncRead + AsyncWrite + Unpin,
43 B: Buf,
44 T: Http1Transaction,
45{
    /// Creates a connection over `io` with every option at its default.
    ///
    /// The connection starts with `reading` and `writing` both in `Init`,
    /// keep-alive marked `Busy`, and the peer assumed to speak HTTP/1.1.
    pub(crate) fn new(io: I) -> Conn<I, B, T> {
        Conn {
            io: Buffered::new(io),
            state: State {
                allow_half_close: false,
                cached_headers: None,
                error: None,
                keep_alive: KA::Busy,
                method: None,
                h1_parser_config: ParserConfig::default(),
                #[cfg(all(feature = "server", feature = "runtime"))]
                h1_header_read_timeout: None,
                #[cfg(all(feature = "server", feature = "runtime"))]
                h1_header_read_timeout_fut: None,
                #[cfg(all(feature = "server", feature = "runtime"))]
                h1_header_read_timeout_running: false,
                preserve_header_case: false,
                #[cfg(feature = "ffi")]
                preserve_header_order: false,
                title_case_headers: false,
                h09_responses: false,
                #[cfg(feature = "ffi")]
                on_informational: None,
                #[cfg(feature = "ffi")]
                raw_headers: false,
                notify_read: false,
                reading: Reading::Init,
                writing: Writing::Init,
                upgrade: None,
                // We assume a modern world where the remote speaks HTTP/1.1.
                // If they tell us otherwise, we'll downgrade in `read_head`.
                version: Version::HTTP_11,
            },
            _marker: PhantomData,
        }
    }
82
    /// Forwards the pipeline-flush setting to the buffered IO.
    #[cfg(feature = "server")]
    pub(crate) fn set_flush_pipeline(&mut self, enabled: bool) {
        self.io.set_flush_pipeline(enabled);
    }
87
    /// Switches the buffered IO to its "queue" write strategy.
    pub(crate) fn set_write_strategy_queue(&mut self) {
        self.io.set_write_strategy_queue();
    }
91
    /// Forwards the maximum buffer size, `max`, to the buffered IO.
    pub(crate) fn set_max_buf_size(&mut self, max: usize) {
        self.io.set_max_buf_size(max);
    }
95
    /// Forwards an exact read buffer size, `sz`, to the buffered IO.
    #[cfg(feature = "client")]
    pub(crate) fn set_read_buf_exact_size(&mut self, sz: usize) {
        self.io.set_read_buf_exact_size(sz);
    }
100
    /// Switches the buffered IO to its "flatten" write strategy.
    pub(crate) fn set_write_strategy_flatten(&mut self) {
        self.io.set_write_strategy_flatten();
    }
104
    /// Replaces the `httparse` configuration that is cloned into the parse
    /// context each time a message head is read.
    #[cfg(feature = "client")]
    pub(crate) fn set_h1_parser_config(&mut self, parser_config: ParserConfig) {
        self.state.h1_parser_config = parser_config;
    }
109
    /// Turns on the `title_case_headers` option handed to the head encoder.
    pub(crate) fn set_title_case_headers(&mut self) {
        self.state.title_case_headers = true;
    }
113
    /// Turns on the `preserve_header_case` option handed to the head parser.
    pub(crate) fn set_preserve_header_case(&mut self) {
        self.state.preserve_header_case = true;
    }
117
    /// Turns on the `preserve_header_order` option handed to the head parser.
    #[cfg(feature = "ffi")]
    pub(crate) fn set_preserve_header_order(&mut self) {
        self.state.preserve_header_order = true;
    }
122
    /// Turns on the `h09_responses` parser option.
    ///
    /// `poll_read_head` switches it off again once the first message head
    /// has been read.
    #[cfg(feature = "client")]
    pub(crate) fn set_h09_responses(&mut self) {
        self.state.h09_responses = true;
    }
127
    /// Sets the header read timeout that is handed to the head parser.
    #[cfg(all(feature = "server", feature = "runtime"))]
    pub(crate) fn set_http1_header_read_timeout(&mut self, val: Duration) {
        self.state.h1_header_read_timeout = Some(val);
    }
132
    /// Allows half-closed connections.
    ///
    /// With this set, `mid_message_detect_eof` no longer probes the read
    /// side for EOF while a message is in flight.
    #[cfg(feature = "server")]
    pub(crate) fn set_allow_half_close(&mut self) {
        self.state.allow_half_close = true;
    }
137
    /// Sets the `raw_headers` option handed to the head parser.
    #[cfg(feature = "ffi")]
    pub(crate) fn set_raw_headers(&mut self, enabled: bool) {
        self.state.raw_headers = enabled;
    }
142
    /// Consumes the connection and returns the IO object together with the
    /// `Bytes` handed back by the buffered IO.
    // NOTE(review): the `Bytes` are presumably the bytes already read but not
    // yet consumed — confirm against `Buffered::into_inner`.
    pub(crate) fn into_inner(self) -> (I, Bytes) {
        self.io.into_inner()
    }
146
    /// Takes the pending upgrade stored by `on_upgrade`, if any, leaving
    /// `None` in its place.
    pub(crate) fn pending_upgrade(&mut self) -> Option<crate::upgrade::Pending> {
        self.state.upgrade.take()
    }
150
    /// Returns `true` once the read side is in the `Closed` state.
    pub(crate) fn is_read_closed(&self) -> bool {
        self.state.is_read_closed()
    }
154
    /// Returns `true` once the write side is in the `Closed` state.
    pub(crate) fn is_write_closed(&self) -> bool {
        self.state.is_write_closed()
    }
158
159 pub(crate) fn can_read_head(&self) -> bool {
160 if !matches!(self.state.reading, Reading::Init) {
161 return false;
162 }
163
164 if T::should_read_first() {
165 return true;
166 }
167
168 !matches!(self.state.writing, Writing::Init)
169 }
170
171 pub(crate) fn can_read_body(&self) -> bool {
172 match self.state.reading {
173 Reading::Body(..) | Reading::Continue(..) => true,
174 _ => false,
175 }
176 }
177
    /// Decides whether an EOF seen right now must be surfaced as an error.
    ///
    /// It must when the role reports errors on a parse EOF and the
    /// connection is not idle.
    fn should_error_on_eof(&self) -> bool {
        // If we're idle, it's probably just the connection closing gracefully.
        T::should_error_on_parse_eof() && !self.state.is_idle()
    }
182
183 fn has_h2_prefix(&self) -> bool {
184 let read_buf = self.io.read_buf();
185 read_buf.len() >= 24 && read_buf[..24] == *H2_PREFACE
186 }
187
    /// Polls for the next message head.
    ///
    /// A parse failure is delegated to `on_read_head_error`. On success the
    /// keep-alive status and peer version are recorded, the `reading` state
    /// is advanced according to the decoded body length, and the head is
    /// returned together with that length and the `Wants` flags.
    pub(super) fn poll_read_head(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<Option<crate::Result<(MessageHead<T::Incoming>, DecodedLength, Wants)>>> {
        debug_assert!(self.can_read_head());
        trace!("Conn::read_head");

        let msg = match ready!(self.io.parse::<T>(
            cx,
            ParseContext {
                cached_headers: &mut self.state.cached_headers,
                req_method: &mut self.state.method,
                h1_parser_config: self.state.h1_parser_config.clone(),
                #[cfg(all(feature = "server", feature = "runtime"))]
                h1_header_read_timeout: self.state.h1_header_read_timeout,
                #[cfg(all(feature = "server", feature = "runtime"))]
                h1_header_read_timeout_fut: &mut self.state.h1_header_read_timeout_fut,
                #[cfg(all(feature = "server", feature = "runtime"))]
                h1_header_read_timeout_running: &mut self.state.h1_header_read_timeout_running,
                preserve_header_case: self.state.preserve_header_case,
                #[cfg(feature = "ffi")]
                preserve_header_order: self.state.preserve_header_order,
                h09_responses: self.state.h09_responses,
                #[cfg(feature = "ffi")]
                on_informational: &mut self.state.on_informational,
                #[cfg(feature = "ffi")]
                raw_headers: self.state.raw_headers,
            }
        )) {
            Ok(msg) => msg,
            Err(e) => return self.on_read_head_error(e),
        };

        // Note: don't deconstruct `msg` into local variables, it appears
        // the optimizer doesn't remove the extra copies.

        debug!("incoming body is {}", msg.decode);

        // Prevent accepting HTTP/0.9 responses after the initial one, if any.
        self.state.h09_responses = false;

        // Drop any OnInformational callbacks, we're done there!
        #[cfg(feature = "ffi")]
        {
            self.state.on_informational = None;
        }

        self.state.busy();
        // `&=` only ever disables keep-alive (see `BitAndAssign for KA`).
        self.state.keep_alive &= msg.keep_alive;
        self.state.version = msg.head.version;

        let mut wants = if msg.wants_upgrade {
            Wants::UPGRADE
        } else {
            Wants::EMPTY
        };

        if msg.decode == DecodedLength::ZERO {
            if msg.expect_continue {
                debug!("ignoring expect-continue since body is empty");
            }
            // No body to read: the read half of this message is already done.
            self.state.reading = Reading::KeepAlive;
            if !T::should_read_first() {
                self.try_keep_alive(cx);
            }
        } else if msg.expect_continue {
            self.state.reading = Reading::Continue(Decoder::new(msg.decode));
            wants = wants.add(Wants::EXPECT);
        } else {
            self.state.reading = Reading::Body(Decoder::new(msg.decode));
        }

        Poll::Ready(Some(Ok((msg.head, msg.decode, wants))))
    }
262
    /// Handles a failure to read a message head.
    ///
    /// The read side is always closed. If the failure was a parse error,
    /// bytes remain in the buffer, or an EOF must be treated as an error,
    /// the error goes through `on_parse_error`; otherwise this is a clean EOF,
    /// the write side is closed too and `Ready(None)` is returned.
    fn on_read_head_error<Z>(&mut self, e: crate::Error) -> Poll<Option<crate::Result<Z>>> {
        // If we are currently waiting on a message, then an empty
        // message should be reported as an error. If not, it is just
        // the connection closing gracefully.
        //
        // This must be computed before `close_read`, which changes the
        // keep-alive status that `should_error_on_eof` inspects.
        let must_error = self.should_error_on_eof();
        self.close_read();
        self.io.consume_leading_lines();
        let was_mid_parse = e.is_parse() || !self.io.read_buf().is_empty();
        if was_mid_parse || must_error {
            // `on_parse_error` also checks whether the buffer holds the
            // HTTP/2 preface.
            debug!(
                "parse error ({}) with {} bytes",
                e,
                self.io.read_buf().len()
            );
            match self.on_parse_error(e) {
                Ok(()) => Poll::Pending, // XXX: wat?
                Err(e) => Poll::Ready(Some(Err(e))),
            }
        } else {
            debug!("read eof");
            self.close_write();
            Poll::Ready(None)
        }
    }
288
    /// Polls for the next chunk of the incoming message body.
    ///
    /// Returns `Ready(Some(Ok(chunk)))` for data, `Ready(None)` when the body
    /// ends without a final chunk, and `Ready(Some(Err(_)))` on a decode
    /// error. When the body finishes or fails, the `reading` state is
    /// updated and `try_keep_alive` runs.
    pub(crate) fn poll_read_body(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<Option<io::Result<Bytes>>> {
        debug_assert!(self.can_read_body());

        let (reading, ret) = match self.state.reading {
            Reading::Body(ref mut decoder) => {
                match ready!(decoder.decode(cx, &mut self.io)) {
                    Ok(slice) => {
                        let (reading, chunk) = if decoder.is_eof() {
                            debug!("incoming body completed");
                            (
                                Reading::KeepAlive,
                                if !slice.is_empty() {
                                    Some(Ok(slice))
                                } else {
                                    None
                                },
                            )
                        } else if slice.is_empty() {
                            error!("incoming body unexpectedly ended");
                            // This should be unreachable, since all 3 decoders
                            // either set eof=true or return an Err when reading
                            // an empty slice...
                            (Reading::Closed, None)
                        } else {
                            // Mid-body chunk: the state stays `Reading::Body`.
                            return Poll::Ready(Some(Ok(slice)));
                        };
                        (reading, Poll::Ready(chunk))
                    }
                    Err(e) => {
                        debug!("incoming body decode error: {}", e);
                        (Reading::Closed, Poll::Ready(Some(Err(e))))
                    }
                }
            }
            Reading::Continue(ref decoder) => {
                // Write the 100 Continue if not already responded...
                if let Writing::Init = self.state.writing {
                    trace!("automatically sending 100 Continue");
                    let cont = b"HTTP/1.1 100 Continue\r\n\r\n";
                    self.io.headers_buf().extend_from_slice(cont);
                }

                // And now recurse once in the Reading::Body state...
                self.state.reading = Reading::Body(decoder.clone());
                return self.poll_read_body(cx);
            }
            _ => unreachable!("poll_read_body invalid state: {:?}", self.state.reading),
        };

        self.state.reading = reading;
        self.try_keep_alive(cx);
        ret
    }
345
346 pub(crate) fn wants_read_again(&mut self) -> bool {
347 let ret = self.state.notify_read;
348 self.state.notify_read = false;
349 ret
350 }
351
352 pub(crate) fn poll_read_keep_alive(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
353 debug_assert!(!self.can_read_head() && !self.can_read_body());
354
355 if self.is_read_closed() {
356 Poll::Pending
357 } else if self.is_mid_message() {
358 self.mid_message_detect_eof(cx)
359 } else {
360 self.require_empty_read(cx)
361 }
362 }
363
364 fn is_mid_message(&self) -> bool {
365 !matches!(
366 (&self.state.reading, &self.state.writing),
367 (&Reading::Init, &Writing::Init)
368 )
369 }
370
    // This will check to make sure the io object read is empty.
    //
    // This should only be called for Clients wanting to enter the idle
    // state.
    //
    // Any bytes, whether already buffered or newly read, are reported as an
    // unexpected message. An EOF is an error only if `should_error_on_eof`
    // says so; either way the read side is closed afterwards.
    fn require_empty_read(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
        debug_assert!(!self.can_read_head() && !self.can_read_body() && !self.is_read_closed());
        debug_assert!(!self.is_mid_message());
        debug_assert!(T::is_client());

        if !self.io.read_buf().is_empty() {
            debug!("received an unexpected {} bytes", self.io.read_buf().len());
            return Poll::Ready(Err(crate::Error::new_unexpected_message()));
        }

        let num_read = ready!(self.force_io_read(cx)).map_err(crate::Error::new_io)?;

        if num_read == 0 {
            let ret = if self.should_error_on_eof() {
                trace!("found unexpected EOF on busy connection: {:?}", self.state);
                Poll::Ready(Err(crate::Error::new_incomplete()))
            } else {
                trace!("found EOF on idle connection, closing");
                Poll::Ready(Ok(()))
            };

            // order is important: should_error needs state BEFORE close_read
            self.state.close_read();
            return ret;
        }

        debug!(
            "received unexpected {} bytes on an idle connection",
            num_read
        );
        Poll::Ready(Err(crate::Error::new_unexpected_message()))
    }
407
408 fn mid_message_detect_eof(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
409 debug_assert!(!self.can_read_head() && !self.can_read_body() && !self.is_read_closed());
410 debug_assert!(self.is_mid_message());
411
412 if self.state.allow_half_close || !self.io.read_buf().is_empty() {
413 return Poll::Pending;
414 }
415
416 let num_read = ready!(self.force_io_read(cx)).map_err(crate::Error::new_io)?;
417
418 if num_read == 0 {
419 trace!("found unexpected EOF on busy connection: {:?}", self.state);
420 self.state.close_read();
421 Poll::Ready(Err(crate::Error::new_incomplete()))
422 } else {
423 Poll::Ready(Ok(()))
424 }
425 }
426
427 fn force_io_read(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
428 debug_assert!(!self.state.is_read_closed());
429
430 let result = ready!(self.io.poll_read_from_io(cx));
431 Poll::Ready(result.map_err(|e| {
432 trace!("force_io_read; io error = {:?}", e);
433 self.state.close();
434 e
435 }))
436 }
437
    /// Sets `notify_read` when the read side should be polled again.
    ///
    /// It only acts when reading is in `Init` and no body is being written.
    /// If the read buffer is empty, the transport is polled once first: an
    /// EOF closes the connection (or just the read side when not idle), and
    /// an IO error closes the state and is stored in `state.error`.
    fn maybe_notify(&mut self, cx: &mut Context<'_>) {
        // It's possible that we returned NotReady from poll() without having
        // exhausted the underlying Io. We would have done this when we
        // determined we couldn't keep reading until we knew how writing
        // would finish.

        match self.state.reading {
            Reading::Continue(..) | Reading::Body(..) | Reading::KeepAlive | Reading::Closed => {
                return
            }
            Reading::Init => (),
        };

        match self.state.writing {
            Writing::Body(..) => return,
            Writing::Init | Writing::KeepAlive | Writing::Closed => (),
        }

        if !self.io.is_read_blocked() {
            if self.io.read_buf().is_empty() {
                match self.io.poll_read_from_io(cx) {
                    Poll::Ready(Ok(n)) => {
                        if n == 0 {
                            trace!("maybe_notify; read eof");
                            if self.state.is_idle() {
                                self.state.close();
                            } else {
                                self.close_read()
                            }
                            return;
                        }
                    }
                    Poll::Pending => {
                        trace!("maybe_notify; read_from_io blocked");
                        return;
                    }
                    Poll::Ready(Err(e)) => {
                        trace!("maybe_notify; read_from_io error: {}", e);
                        self.state.close();
                        self.state.error = Some(crate::Error::new_io(e));
                        // Falls through: `notify_read` is still set below.
                    }
                }
            }
            self.state.notify_read = true;
        }
    }
484
    /// Lets the state move to idle (or close) if both halves of the message
    /// are finished, then checks whether the read side needs another poll.
    fn try_keep_alive(&mut self, cx: &mut Context<'_>) {
        self.state.try_keep_alive::<T>();
        self.maybe_notify(cx);
    }
489
490 pub(crate) fn can_write_head(&self) -> bool {
491 if !T::should_read_first() && matches!(self.state.reading, Reading::Closed) {
492 return false;
493 }
494
495 match self.state.writing {
496 Writing::Init => self.io.can_headers_buf(),
497 _ => false,
498 }
499 }
500
    /// Reports whether a message body is currently being written.
    pub(crate) fn can_write_body(&self) -> bool {
        // Every variant is listed so that adding a new `Writing` state
        // forces this decision to be revisited.
        match self.state.writing {
            Writing::Body(..) => true,
            Writing::Init | Writing::KeepAlive | Writing::Closed => false,
        }
    }
507
    /// Reports whether the buffered IO can accept more data to buffer.
    pub(crate) fn can_buffer_body(&self) -> bool {
        self.io.can_buffer()
    }
511
512 pub(crate) fn write_head(&mut self, head: MessageHead<T::Outgoing>, body: Option<BodyLength>) {
513 if let Some(encoder) = self.encode_head(head, body) {
514 self.state.writing = if !encoder.is_eof() {
515 Writing::Body(encoder)
516 } else if encoder.is_last() {
517 Writing::Closed
518 } else {
519 Writing::KeepAlive
520 };
521 }
522 }
523
524 pub(crate) fn write_full_msg(&mut self, head: MessageHead<T::Outgoing>, body: B) {
525 if let Some(encoder) =
526 self.encode_head(head, Some(BodyLength::Known(body.remaining() as u64)))
527 {
528 let is_last = encoder.is_last();
529 // Make sure we don't write a body if we weren't actually allowed
530 // to do so, like because its a HEAD request.
531 if !encoder.is_eof() {
532 encoder.danger_full_buf(body, self.io.write_buf());
533 }
534 self.state.writing = if is_last {
535 Writing::Closed
536 } else {
537 Writing::KeepAlive
538 }
539 }
540 }
541
    /// Encodes `head` into the headers buffer.
    ///
    /// Returns the body `Encoder` on success. On failure the error is stored
    /// in `state.error`, the write side is closed and `None` is returned.
    fn encode_head(
        &mut self,
        mut head: MessageHead<T::Outgoing>,
        body: Option<BodyLength>,
    ) -> Option<Encoder> {
        debug_assert!(self.can_write_head());

        // A role that writes first starts a new message here.
        if !T::should_read_first() {
            self.state.busy();
        }

        self.enforce_version(&mut head);

        let buf = self.io.headers_buf();
        match super::role::encode_headers::<T>(
            Encode {
                head: &mut head,
                body,
                #[cfg(feature = "server")]
                keep_alive: self.state.wants_keep_alive(),
                req_method: &mut self.state.method,
                title_case_headers: self.state.title_case_headers,
            },
            buf,
        ) {
            Ok(encoder) => {
                debug_assert!(self.state.cached_headers.is_none());
                debug_assert!(head.headers.is_empty());
                // Keep the (now empty) map so the next parse can reuse it.
                self.state.cached_headers = Some(head.headers);

                #[cfg(feature = "ffi")]
                {
                    self.state.on_informational =
                        head.extensions.remove::<crate::ffi::OnInformational>();
                }

                Some(encoder)
            }
            Err(err) => {
                self.state.error = Some(err);
                self.state.writing = Writing::Closed;
                None
            }
        }
    }
587
588 // Fix keep-alive when Connection: keep-alive header is not present
589 fn fix_keep_alive(&mut self, head: &mut MessageHead<T::Outgoing>) {
590 let outgoing_is_keep_alive = head
591 .headers
592 .get(CONNECTION)
593 .map(connection_keep_alive)
594 .unwrap_or(false);
595
596 if !outgoing_is_keep_alive {
597 match head.version {
598 // If response is version 1.0 and keep-alive is not present in the response,
599 // disable keep-alive so the server closes the connection
600 Version::HTTP_10 => self.state.disable_keep_alive(),
601 // If response is version 1.1 and keep-alive is wanted, add
602 // Connection: keep-alive header when not present
603 Version::HTTP_11 => {
604 if self.state.wants_keep_alive() {
605 head.headers
606 .insert(CONNECTION, HeaderValue::from_static("keep-alive"));
607 }
608 }
609 _ => (),
610 }
611 }
612 }
613
    // If we know the remote speaks an older version, we try to fix up any messages
    // to work with our older peer.
    //
    // Only an HTTP/1.0 peer triggers any change: keep-alive is reconciled
    // first (using the head's original version), then the head is downgraded.
    fn enforce_version(&mut self, head: &mut MessageHead<T::Outgoing>) {
        if let Version::HTTP_10 = self.state.version {
            // Fixes response or connection when keep-alive header is not present
            self.fix_keep_alive(head);
            // If the remote only knows HTTP/1.0, we should force ourselves
            // to only speak HTTP/1.0 as well.
            head.version = Version::HTTP_10;
        }
        // If the remote speaks HTTP/1.1, then it *should* be fine with
        // both HTTP/1.0 and HTTP/1.1 from us. So again, we just let
        // the user's headers be.
    }
628
    /// Encodes one non-empty body chunk and queues it on the buffered IO.
    ///
    /// The write state only changes when the encoder reports that the body
    /// is complete after this chunk.
    pub(crate) fn write_body(&mut self, chunk: B) {
        debug_assert!(self.can_write_body() && self.can_buffer_body());
        // empty chunks should be discarded at Dispatcher level
        debug_assert!(chunk.remaining() != 0);

        let state = match self.state.writing {
            Writing::Body(ref mut encoder) => {
                self.io.buffer(encoder.encode(chunk));

                // More body is expected: stay in `Writing::Body`.
                if !encoder.is_eof() {
                    return;
                }

                if encoder.is_last() {
                    Writing::Closed
                } else {
                    Writing::KeepAlive
                }
            }
            _ => unreachable!("write_body invalid state: {:?}", self.state.writing),
        };

        self.state.writing = state;
    }
653
654 pub(crate) fn write_body_and_end(&mut self, chunk: B) {
655 debug_assert!(self.can_write_body() && self.can_buffer_body());
656 // empty chunks should be discarded at Dispatcher level
657 debug_assert!(chunk.remaining() != 0);
658
659 let state = match self.state.writing {
660 Writing::Body(ref encoder) => {
661 let can_keep_alive = encoder.encode_and_end(chunk, self.io.write_buf());
662 if can_keep_alive {
663 Writing::KeepAlive
664 } else {
665 Writing::Closed
666 }
667 }
668 _ => unreachable!("write_body invalid state: {:?}", self.state.writing),
669 };
670
671 self.state.writing = state;
672 }
673
    /// Signals the end of the outgoing body.
    ///
    /// Does nothing if no body is being written. Otherwise any terminating
    /// bytes from the encoder are queued and the write state moves on.
    ///
    /// # Errors
    ///
    /// Returns a "body write aborted" error, and closes the write side, if
    /// the encoder reports that the body ended too early.
    pub(crate) fn end_body(&mut self) -> crate::Result<()> {
        debug_assert!(self.can_write_body());

        let encoder = match self.state.writing {
            Writing::Body(ref mut enc) => enc,
            _ => return Ok(()),
        };

        // end of stream, that means we should try to eof
        match encoder.end() {
            Ok(end) => {
                if let Some(end) = end {
                    self.io.buffer(end);
                }

                self.state.writing = if encoder.is_last() || encoder.is_close_delimited() {
                    Writing::Closed
                } else {
                    Writing::KeepAlive
                };

                Ok(())
            }
            Err(not_eof) => {
                self.state.writing = Writing::Closed;
                Err(crate::Error::new_body_write_aborted().with(not_eof))
            }
        }
    }
703
    // When we get a parse error, depending on what side we are, we might be able
    // to write a response before closing the connection.
    //
    // - Client: there is nothing we can do
    // - Server: if Response hasn't been written yet, we can send a 4xx response
    //
    // Returns `Ok(())` when a response head was queued (the error is then kept
    // in `state.error`), and `Err` when the error must be passed up directly.
    fn on_parse_error(&mut self, err: crate::Error) -> crate::Result<()> {
        if let Writing::Init = self.state.writing {
            // A peer that opened with the HTTP/2 preface gets a dedicated error.
            if self.has_h2_prefix() {
                return Err(crate::Error::new_version_h2());
            }
            if let Some(msg) = T::on_error(&err) {
                // Drop the cached headers so as to not trigger the debug
                // assert in `encode_head` (reached through `write_head`)...
                self.state.cached_headers.take();
                self.write_head(msg, None);
                self.state.error = Some(err);
                return Ok(());
            }
        }

        // fallback is pass the error back up
        Err(err)
    }
727
    /// Flushes the buffered IO, then re-evaluates keep-alive.
    ///
    /// A flush error is returned before the keep-alive check runs.
    pub(crate) fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        ready!(Pin::new(&mut self.io).poll_flush(cx))?;
        self.try_keep_alive(cx);
        trace!("flushed({}): {:?}", T::LOG, self.state);
        Poll::Ready(Ok(()))
    }
734
735 pub(crate) fn poll_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
736 match ready!(Pin::new(self.io.io_mut()).poll_shutdown(cx)) {
737 Ok(()) => {
738 trace!("shut down IO complete");
739 Poll::Ready(Ok(()))
740 }
741 Err(e) => {
742 debug!("error shutting down IO: {}", e);
743 Poll::Ready(Err(e))
744 }
745 }
746 }
747
    /// If the read side can be cheaply drained, do so. Otherwise, close.
    ///
    /// The body is polled exactly once and its result is discarded. Unless
    /// that leaves reading in `Init` or `KeepAlive`, the read side is closed.
    pub(super) fn poll_drain_or_close_read(&mut self, cx: &mut Context<'_>) {
        if let Reading::Continue(ref decoder) = self.state.reading {
            // skip sending the 100-continue
            // just move forward to a read, in case a tiny body was included
            self.state.reading = Reading::Body(decoder.clone());
        }

        // Only the state change matters here, not the chunk or error.
        let _ = self.poll_read_body(cx);

        // If still in Reading::Body, just give up
        match self.state.reading {
            Reading::Init | Reading::KeepAlive => trace!("body drained"),
            _ => self.close_read(),
        }
    }
764
    /// Closes the read side, which also disables keep-alive.
    pub(crate) fn close_read(&mut self) {
        self.state.close_read();
    }
768
    /// Closes the write side, which also disables keep-alive.
    pub(crate) fn close_write(&mut self) {
        self.state.close_write();
    }
772
773 #[cfg(feature = "server")]
774 pub(crate) fn disable_keep_alive(&mut self) {
775 if self.state.is_idle() {
776 trace!("disable_keep_alive; closing idle connection");
777 self.state.close();
778 } else {
779 trace!("disable_keep_alive; in-progress connection");
780 self.state.disable_keep_alive();
781 }
782 }
783
784 pub(crate) fn take_error(&mut self) -> crate::Result<()> {
785 if let Some(err) = self.state.error.take() {
786 Err(err)
787 } else {
788 Ok(())
789 }
790 }
791
    /// Prepares a possible HTTP upgrade.
    ///
    /// The sending half is stored in the state (see `pending_upgrade`) and
    /// the receiving `OnUpgrade` half is returned.
    pub(super) fn on_upgrade(&mut self) -> crate::upgrade::OnUpgrade {
        trace!("{}: prepare possible HTTP upgrade", T::LOG);
        self.state.prepare_upgrade()
    }
796}
797
798impl<I, B: Buf, T> fmt::Debug for Conn<I, B, T> {
799 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
800 f&mut DebugStruct<'_, '_>.debug_struct("Conn")
801 .field("state", &self.state)
802 .field(name:"io", &self.io)
803 .finish()
804 }
805}
806
// B and T are never pinned, so `Conn` is `Unpin` whenever the IO type `I` is.
impl<I: Unpin, B, T> Unpin for Conn<I, B, T> {}
809
/// Everything `Conn` tracks besides the IO object: read/write progress,
/// keep-alive status and the options set through `Conn`'s setters.
struct State {
    /// When set, `mid_message_detect_eof` does not probe the read side
    /// for EOF while a message is in flight.
    allow_half_close: bool,
    /// Re-usable HeaderMap to reduce allocating new ones.
    cached_headers: Option<HeaderMap>,
    /// If an error occurs when there wasn't a direct way to return it
    /// back to the user, this is set.
    error: Option<crate::Error>,
    /// Current keep-alive status.
    keep_alive: KA,
    /// If mid-message, the HTTP Method that started it.
    ///
    /// This is used to know things such as if the message can include
    /// a body or not.
    method: Option<Method>,
    /// Parser configuration, cloned into the parse context for every head.
    h1_parser_config: ParserConfig,
    /// Optional header read timeout, handed to the head parser.
    #[cfg(all(feature = "server", feature = "runtime"))]
    h1_header_read_timeout: Option<Duration>,
    /// Timer slot lent to the head parser by mutable reference.
    #[cfg(all(feature = "server", feature = "runtime"))]
    h1_header_read_timeout_fut: Option<Pin<Box<Sleep>>>,
    /// Timer flag lent to the head parser by mutable reference.
    #[cfg(all(feature = "server", feature = "runtime"))]
    h1_header_read_timeout_running: bool,
    /// Parser option, handed to the head parser.
    preserve_header_case: bool,
    /// Parser option, handed to the head parser.
    #[cfg(feature = "ffi")]
    preserve_header_order: bool,
    /// Encoder option, handed to the head encoder.
    title_case_headers: bool,
    /// Parser option; reset to `false` after the first head is read.
    h09_responses: bool,
    /// If set, called with each 1xx informational response received for
    /// the current request. MUST be unset after a non-1xx response is
    /// received.
    #[cfg(feature = "ffi")]
    on_informational: Option<crate::ffi::OnInformational>,
    /// Parser option, handed to the head parser.
    #[cfg(feature = "ffi")]
    raw_headers: bool,
    /// Set to true when the Dispatcher should poll read operations
    /// again. See the `maybe_notify` method for more.
    notify_read: bool,
    /// State of allowed reads
    reading: Reading,
    /// State of allowed writes
    writing: Writing,
    /// An expected pending HTTP upgrade.
    upgrade: Option<crate::upgrade::Pending>,
    /// Either HTTP/1.0 or 1.1 connection
    version: Version,
}
855
/// Progress of the read half of the current message.
#[derive(Debug)]
enum Reading {
    /// No message is being read; a new head may be parsed.
    Init,
    /// A body is expected and the head asked for `100 Continue`; the first
    /// body poll moves this to `Body`.
    Continue(Decoder),
    /// A body is being decoded.
    Body(Decoder),
    /// The message has been fully read.
    KeepAlive,
    /// The read side is closed.
    Closed,
}
864
/// Progress of the write half of the current message.
enum Writing {
    /// No message is being written; a new head may be encoded.
    Init,
    /// A body is being encoded.
    Body(Encoder),
    /// The message has been fully written.
    KeepAlive,
    /// The write side is closed.
    Closed,
}
871
872impl fmt::Debug for State {
873 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
874 let mut builder: DebugStruct<'_, '_> = f.debug_struct(name:"State");
875 builder
876 .field("reading", &self.reading)
877 .field("writing", &self.writing)
878 .field(name:"keep_alive", &self.keep_alive);
879
880 // Only show error field if it's interesting...
881 if let Some(ref error: &Error) = self.error {
882 builder.field(name:"error", value:error);
883 }
884
885 if self.allow_half_close {
886 builder.field(name:"allow_half_close", &true);
887 }
888
889 // Purposefully leaving off other fields..
890
891 builder.finish()
892 }
893}
894
895impl fmt::Debug for Writing {
896 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
897 match *self {
898 Writing::Init => f.write_str(data:"Init"),
899 Writing::Body(ref enc: &Encoder) => f.debug_tuple(name:"Body").field(enc).finish(),
900 Writing::KeepAlive => f.write_str(data:"KeepAlive"),
901 Writing::Closed => f.write_str(data:"Closed"),
902 }
903 }
904}
905
906impl std::ops::BitAndAssign<bool> for KA {
907 fn bitand_assign(&mut self, enabled: bool) {
908 if !enabled {
909 trace!("remote disabling keep-alive");
910 *self = KA::Disabled;
911 }
912 }
913}
914
/// Keep-alive status of a connection.
#[derive(Clone, Copy, Debug)]
enum KA {
    /// No message is in flight.
    Idle,
    /// A message is in flight (or the connection has just been created).
    Busy,
    /// The connection must not be reused.
    Disabled,
}

impl Default for KA {
    /// A connection starts out `Busy`.
    fn default() -> Self {
        Self::Busy
    }
}

impl KA {
    /// Marks the connection as idle.
    fn idle(&mut self) {
        *self = Self::Idle;
    }

    /// Marks the connection as busy.
    fn busy(&mut self) {
        *self = Self::Busy;
    }

    /// Rules out any further reuse of the connection.
    fn disable(&mut self) {
        *self = Self::Disabled;
    }

    /// Returns a copy of the current status.
    fn status(&self) -> Self {
        *self
    }
}
945
946impl State {
947 fn close(&mut self) {
948 trace!("State::close()");
949 self.reading = Reading::Closed;
950 self.writing = Writing::Closed;
951 self.keep_alive.disable();
952 }
953
954 fn close_read(&mut self) {
955 trace!("State::close_read()");
956 self.reading = Reading::Closed;
957 self.keep_alive.disable();
958 }
959
960 fn close_write(&mut self) {
961 trace!("State::close_write()");
962 self.writing = Writing::Closed;
963 self.keep_alive.disable();
964 }
965
966 fn wants_keep_alive(&self) -> bool {
967 if let KA::Disabled = self.keep_alive.status() {
968 false
969 } else {
970 true
971 }
972 }
973
974 fn try_keep_alive<T: Http1Transaction>(&mut self) {
975 match (&self.reading, &self.writing) {
976 (&Reading::KeepAlive, &Writing::KeepAlive) => {
977 if let KA::Busy = self.keep_alive.status() {
978 self.idle::<T>();
979 } else {
980 trace!(
981 "try_keep_alive({}): could keep-alive, but status = {:?}",
982 T::LOG,
983 self.keep_alive
984 );
985 self.close();
986 }
987 }
988 (&Reading::Closed, &Writing::KeepAlive) | (&Reading::KeepAlive, &Writing::Closed) => {
989 self.close()
990 }
991 _ => (),
992 }
993 }
994
995 fn disable_keep_alive(&mut self) {
996 self.keep_alive.disable()
997 }
998
999 fn busy(&mut self) {
1000 if let KA::Disabled = self.keep_alive.status() {
1001 return;
1002 }
1003 self.keep_alive.busy();
1004 }
1005
1006 fn idle<T: Http1Transaction>(&mut self) {
1007 debug_assert!(!self.is_idle(), "State::idle() called while idle");
1008
1009 self.method = None;
1010 self.keep_alive.idle();
1011
1012 if !self.is_idle() {
1013 self.close();
1014 return;
1015 }
1016
1017 self.reading = Reading::Init;
1018 self.writing = Writing::Init;
1019
1020 // !T::should_read_first() means Client.
1021 //
1022 // If Client connection has just gone idle, the Dispatcher
1023 // should try the poll loop one more time, so as to poll the
1024 // pending requests stream.
1025 if !T::should_read_first() {
1026 self.notify_read = true;
1027 }
1028 }
1029
1030 fn is_idle(&self) -> bool {
1031 matches!(self.keep_alive.status(), KA::Idle)
1032 }
1033
1034 fn is_read_closed(&self) -> bool {
1035 matches!(self.reading, Reading::Closed)
1036 }
1037
1038 fn is_write_closed(&self) -> bool {
1039 matches!(self.writing, Writing::Closed)
1040 }
1041
1042 fn prepare_upgrade(&mut self) -> crate::upgrade::OnUpgrade {
1043 let (tx, rx) = crate::upgrade::pending();
1044 self.upgrade = Some(tx);
1045 rx
1046 }
1047}
1048
1049#[cfg(test)]
1050mod tests {
    /// Benchmarks `poll_read_head` on a short GET request that is placed
    /// directly into the connection's read buffer.
    #[cfg(feature = "nightly")]
    #[bench]
    fn bench_read_head_short(b: &mut ::test::Bencher) {
        use super::*;
        let s = b"GET / HTTP/1.1\r\nHost: localhost:8080\r\n\r\n";
        let len = s.len();
        b.bytes = len as u64;

        // an empty IO, we'll be skipping and using the read buffer anyways
        let io = tokio_test::io::Builder::new().build();
        let mut conn = Conn::<_, bytes::Bytes, crate::proto::h1::ServerTransaction>::new(io);
        *conn.io.read_buf_mut() = ::bytes::BytesMut::from(&s[..]);
        conn.state.cached_headers = Some(HeaderMap::with_capacity(2));

        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();

        b.iter(|| {
            rt.block_on(futures_util::future::poll_fn(|cx| {
                match conn.poll_read_head(cx) {
                    Poll::Ready(Some(Ok(x))) => {
                        ::test::black_box(&x);
                        // Hand the emptied header map back for reuse.
                        let mut headers = x.0.headers;
                        headers.clear();
                        conn.state.cached_headers = Some(headers);
                    }
                    f => panic!("expected Ready(Some(Ok(..))): {:?}", f),
                }

                // Restore the buffer length and read state so the next
                // iteration parses a request again.
                conn.io.read_buf_mut().reserve(1);
                // NOTE(review): presumably sound because the buffer still
                // holds the `len` request bytes the parser just consumed —
                // confirm against `BytesMut::reserve` / `set_len`.
                unsafe {
                    conn.io.read_buf_mut().set_len(len);
                }
                conn.state.reading = Reading::Init;
                Poll::Ready(())
            }));
        });
    }
1091
1092 /*
1093 //TODO: rewrite these using dispatch... someday...
1094 use futures::{Async, Future, Stream, Sink};
1095 use futures::future;
1096
1097 use proto::{self, ClientTransaction, MessageHead, ServerTransaction};
1098 use super::super::Encoder;
1099 use mock::AsyncIo;
1100
1101 use super::{Conn, Decoder, Reading, Writing};
1102 use ::uri::Uri;
1103
1104 use std::str::FromStr;
1105
1106 #[test]
1107 fn test_conn_init_read() {
1108 let good_message = b"GET / HTTP/1.1\r\n\r\n".to_vec();
1109 let len = good_message.len();
1110 let io = AsyncIo::new_buf(good_message, len);
1111 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1112
1113 match conn.poll().unwrap() {
1114 Async::Ready(Some(Frame::Message { message, body: false })) => {
1115 assert_eq!(message, MessageHead {
1116 subject: ::proto::RequestLine(::Get, Uri::from_str("/").unwrap()),
1117 .. MessageHead::default()
1118 })
1119 },
1120 f => panic!("frame is not Frame::Message: {:?}", f)
1121 }
1122 }
1123
1124 #[test]
1125 fn test_conn_parse_partial() {
1126 let _: Result<(), ()> = future::lazy(|| {
1127 let good_message = b"GET / HTTP/1.1\r\nHost: foo.bar\r\n\r\n".to_vec();
1128 let io = AsyncIo::new_buf(good_message, 10);
1129 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1130 assert!(conn.poll().unwrap().is_not_ready());
1131 conn.io.io_mut().block_in(50);
1132 let async = conn.poll().unwrap();
1133 assert!(async.is_ready());
1134 match async {
1135 Async::Ready(Some(Frame::Message { .. })) => (),
1136 f => panic!("frame is not Message: {:?}", f),
1137 }
1138 Ok(())
1139 }).wait();
1140 }
1141
1142 #[test]
1143 fn test_conn_init_read_eof_idle() {
1144 let io = AsyncIo::new_buf(vec![], 1);
1145 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1146 conn.state.idle();
1147
1148 match conn.poll().unwrap() {
1149 Async::Ready(None) => {},
1150 other => panic!("frame is not None: {:?}", other)
1151 }
1152 }
1153
1154 #[test]
1155 fn test_conn_init_read_eof_idle_partial_parse() {
1156 let io = AsyncIo::new_buf(b"GET / HTTP/1.1".to_vec(), 100);
1157 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1158 conn.state.idle();
1159
1160 match conn.poll() {
1161 Err(ref err) if err.kind() == std::io::ErrorKind::UnexpectedEof => {},
1162 other => panic!("unexpected frame: {:?}", other)
1163 }
1164 }
1165
1166 #[test]
1167 fn test_conn_init_read_eof_busy() {
1168 let _: Result<(), ()> = future::lazy(|| {
1169 // server ignores
1170 let io = AsyncIo::new_eof();
1171 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1172 conn.state.busy();
1173
1174 match conn.poll().unwrap() {
1175 Async::Ready(None) => {},
1176 other => panic!("unexpected frame: {:?}", other)
1177 }
1178
1179 // client
1180 let io = AsyncIo::new_eof();
1181 let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io);
1182 conn.state.busy();
1183
1184 match conn.poll() {
1185 Err(ref err) if err.kind() == std::io::ErrorKind::UnexpectedEof => {},
1186 other => panic!("unexpected frame: {:?}", other)
1187 }
1188 Ok(())
1189 }).wait();
1190 }
1191
1192 #[test]
1193 fn test_conn_body_finish_read_eof() {
1194 let _: Result<(), ()> = future::lazy(|| {
1195 let io = AsyncIo::new_eof();
1196 let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io);
1197 conn.state.busy();
1198 conn.state.writing = Writing::KeepAlive;
1199 conn.state.reading = Reading::Body(Decoder::length(0));
1200
1201 match conn.poll() {
1202 Ok(Async::Ready(Some(Frame::Body { chunk: None }))) => (),
1203 other => panic!("unexpected frame: {:?}", other)
1204 }
1205
1206 // conn eofs, but tokio-proto will call poll() again, before calling flush()
1207 // the conn eof in this case is perfectly fine
1208
1209 match conn.poll() {
1210 Ok(Async::Ready(None)) => (),
1211 other => panic!("unexpected frame: {:?}", other)
1212 }
1213 Ok(())
1214 }).wait();
1215 }
1216
1217 #[test]
1218 fn test_conn_message_empty_body_read_eof() {
1219 let _: Result<(), ()> = future::lazy(|| {
1220 let io = AsyncIo::new_buf(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n".to_vec(), 1024);
1221 let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io);
1222 conn.state.busy();
1223 conn.state.writing = Writing::KeepAlive;
1224
1225 match conn.poll() {
1226 Ok(Async::Ready(Some(Frame::Message { body: false, .. }))) => (),
1227 other => panic!("unexpected frame: {:?}", other)
1228 }
1229
1230 // conn eofs, but tokio-proto will call poll() again, before calling flush()
1231 // the conn eof in this case is perfectly fine
1232
1233 match conn.poll() {
1234 Ok(Async::Ready(None)) => (),
1235 other => panic!("unexpected frame: {:?}", other)
1236 }
1237 Ok(())
1238 }).wait();
1239 }
1240
1241 #[test]
1242 fn test_conn_read_body_end() {
1243 let _: Result<(), ()> = future::lazy(|| {
1244 let io = AsyncIo::new_buf(b"POST / HTTP/1.1\r\nContent-Length: 5\r\n\r\n12345".to_vec(), 1024);
1245 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1246 conn.state.busy();
1247
1248 match conn.poll() {
1249 Ok(Async::Ready(Some(Frame::Message { body: true, .. }))) => (),
1250 other => panic!("unexpected frame: {:?}", other)
1251 }
1252
1253 match conn.poll() {
1254 Ok(Async::Ready(Some(Frame::Body { chunk: Some(_) }))) => (),
1255 other => panic!("unexpected frame: {:?}", other)
1256 }
1257
1258 // When the body is done, `poll` MUST return a `Body` frame with chunk set to `None`
1259 match conn.poll() {
1260 Ok(Async::Ready(Some(Frame::Body { chunk: None }))) => (),
1261 other => panic!("unexpected frame: {:?}", other)
1262 }
1263
1264 match conn.poll() {
1265 Ok(Async::NotReady) => (),
1266 other => panic!("unexpected frame: {:?}", other)
1267 }
1268 Ok(())
1269 }).wait();
1270 }
1271
1272 #[test]
1273 fn test_conn_closed_read() {
1274 let io = AsyncIo::new_buf(vec![], 0);
1275 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1276 conn.state.close();
1277
1278 match conn.poll().unwrap() {
1279 Async::Ready(None) => {},
1280 other => panic!("frame is not None: {:?}", other)
1281 }
1282 }
1283
1284 #[test]
1285 fn test_conn_body_write_length() {
1286 let _ = pretty_env_logger::try_init();
1287 let _: Result<(), ()> = future::lazy(|| {
1288 let io = AsyncIo::new_buf(vec![], 0);
1289 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1290 let max = super::super::io::DEFAULT_MAX_BUFFER_SIZE + 4096;
1291 conn.state.writing = Writing::Body(Encoder::length((max * 2) as u64));
1292
1293 assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'a'; max].into()) }).unwrap().is_ready());
1294 assert!(!conn.can_buffer_body());
1295
1296 assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'b'; 1024 * 8].into()) }).unwrap().is_not_ready());
1297
1298 conn.io.io_mut().block_in(1024 * 3);
1299 assert!(conn.poll_complete().unwrap().is_not_ready());
1300 conn.io.io_mut().block_in(1024 * 3);
1301 assert!(conn.poll_complete().unwrap().is_not_ready());
1302 conn.io.io_mut().block_in(max * 2);
1303 assert!(conn.poll_complete().unwrap().is_ready());
1304
1305 assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'c'; 1024 * 8].into()) }).unwrap().is_ready());
1306 Ok(())
1307 }).wait();
1308 }
1309
1310 #[test]
1311 fn test_conn_body_write_chunked() {
1312 let _: Result<(), ()> = future::lazy(|| {
1313 let io = AsyncIo::new_buf(vec![], 4096);
1314 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1315 conn.state.writing = Writing::Body(Encoder::chunked());
1316
1317 assert!(conn.start_send(Frame::Body { chunk: Some("headers".into()) }).unwrap().is_ready());
1318 assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'x'; 8192].into()) }).unwrap().is_ready());
1319 Ok(())
1320 }).wait();
1321 }
1322
1323 #[test]
1324 fn test_conn_body_flush() {
1325 let _: Result<(), ()> = future::lazy(|| {
1326 let io = AsyncIo::new_buf(vec![], 1024 * 1024 * 5);
1327 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1328 conn.state.writing = Writing::Body(Encoder::length(1024 * 1024));
1329 assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'a'; 1024 * 1024].into()) }).unwrap().is_ready());
1330 assert!(!conn.can_buffer_body());
1331 conn.io.io_mut().block_in(1024 * 1024 * 5);
1332 assert!(conn.poll_complete().unwrap().is_ready());
1333 assert!(conn.can_buffer_body());
1334 assert!(conn.io.io_mut().flushed());
1335
1336 Ok(())
1337 }).wait();
1338 }
1339
1340 #[test]
1341 fn test_conn_parking() {
1342 use std::sync::Arc;
1343 use futures::executor::Notify;
1344 use futures::executor::NotifyHandle;
1345
1346 struct Car {
1347 permit: bool,
1348 }
1349 impl Notify for Car {
1350 fn notify(&self, _id: usize) {
1351 assert!(self.permit, "unparked without permit");
1352 }
1353 }
1354
1355 fn car(permit: bool) -> NotifyHandle {
1356 Arc::new(Car {
1357 permit: permit,
1358 }).into()
1359 }
1360
1361 // test that once writing is done, unparks
1362 let f = future::lazy(|| {
1363 let io = AsyncIo::new_buf(vec![], 4096);
1364 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1365 conn.state.reading = Reading::KeepAlive;
1366 assert!(conn.poll().unwrap().is_not_ready());
1367
1368 conn.state.writing = Writing::KeepAlive;
1369 assert!(conn.poll_complete().unwrap().is_ready());
1370 Ok::<(), ()>(())
1371 });
1372 ::futures::executor::spawn(f).poll_future_notify(&car(true), 0).unwrap();
1373
1374
1375 // test that flushing when not waiting on read doesn't unpark
1376 let f = future::lazy(|| {
1377 let io = AsyncIo::new_buf(vec![], 4096);
1378 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1379 conn.state.writing = Writing::KeepAlive;
1380 assert!(conn.poll_complete().unwrap().is_ready());
1381 Ok::<(), ()>(())
1382 });
1383 ::futures::executor::spawn(f).poll_future_notify(&car(false), 0).unwrap();
1384
1385
1386 // test that flushing and writing isn't done doesn't unpark
1387 let f = future::lazy(|| {
1388 let io = AsyncIo::new_buf(vec![], 4096);
1389 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1390 conn.state.reading = Reading::KeepAlive;
1391 assert!(conn.poll().unwrap().is_not_ready());
1392 conn.state.writing = Writing::Body(Encoder::length(5_000));
1393 assert!(conn.poll_complete().unwrap().is_ready());
1394 Ok::<(), ()>(())
1395 });
1396 ::futures::executor::spawn(f).poll_future_notify(&car(false), 0).unwrap();
1397 }
1398
1399 #[test]
1400 fn test_conn_closed_write() {
1401 let io = AsyncIo::new_buf(vec![], 0);
1402 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1403 conn.state.close();
1404
1405 match conn.start_send(Frame::Body { chunk: Some(b"foobar".to_vec().into()) }) {
1406 Err(_e) => {},
1407 other => panic!("did not return Err: {:?}", other)
1408 }
1409
1410 assert!(conn.state.is_write_closed());
1411 }
1412
1413 #[test]
1414 fn test_conn_write_empty_chunk() {
1415 let io = AsyncIo::new_buf(vec![], 0);
1416 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1417 conn.state.writing = Writing::KeepAlive;
1418
1419 assert!(conn.start_send(Frame::Body { chunk: None }).unwrap().is_ready());
1420 assert!(conn.start_send(Frame::Body { chunk: Some(Vec::new().into()) }).unwrap().is_ready());
1421 conn.start_send(Frame::Body { chunk: Some(vec![b'a'].into()) }).unwrap_err();
1422 }
1423 */
1424}
1425