use std::fmt;
use std::io;
use std::marker::PhantomData;
#[cfg(all(feature = "server", feature = "runtime"))]
use std::time::Duration;

use bytes::{Buf, Bytes};
use http::header::{HeaderValue, CONNECTION};
use http::{HeaderMap, Method, Version};
use httparse::ParserConfig;
use tokio::io::{AsyncRead, AsyncWrite};
#[cfg(all(feature = "server", feature = "runtime"))]
use tokio::time::Sleep;
use tracing::{debug, error, trace};

use super::io::Buffered;
use super::{Decoder, Encode, EncodedBuf, Encoder, Http1Transaction, ParseContext, Wants};
use crate::body::DecodedLength;
use crate::common::{task, Pin, Poll, Unpin};
use crate::headers::connection_keep_alive;
use crate::proto::{BodyLength, MessageHead};

const H2_PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";

/// This handles a connection, which will have been established over an
/// `AsyncRead + AsyncWrite` (like a socket), and will likely include multiple
/// `Transaction`s over HTTP.
///
/// The connection will determine when a message begins and ends as well as
/// determine if this connection can be kept alive after the message,
/// or if it is complete.
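///
/// A rough sketch of how a server-side dispatcher drives a `Conn` (an added
/// illustration based on the methods below, not a compiled example; `cx`,
/// `response_head`, and `body` are assumed to be supplied by the caller):
///
/// ```ignore
/// if let Some(Ok((head, body_len, wants))) = ready!(conn.poll_read_head(cx)) {
///     // Drain the request body, if any.
///     while conn.can_read_body() {
///         let _chunk = ready!(conn.poll_read_body(cx));
///     }
///     // Buffer the response head plus body, then flush the write buffer.
///     conn.write_full_msg(response_head, body);
///     ready!(conn.poll_flush(cx))?;
/// }
/// ```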
pub(crate) struct Conn<I, B, T> {
    io: Buffered<I, EncodedBuf<B>>,
    state: State,
    _marker: PhantomData<fn(T)>,
}

impl<I, B, T> Conn<I, B, T>
where
    I: AsyncRead + AsyncWrite + Unpin,
    B: Buf,
    T: Http1Transaction,
{
    pub(crate) fn new(io: I) -> Conn<I, B, T> {
        Conn {
            io: Buffered::new(io),
            state: State {
                allow_half_close: false,
                cached_headers: None,
                error: None,
                keep_alive: KA::Busy,
                method: None,
                h1_parser_config: ParserConfig::default(),
                #[cfg(all(feature = "server", feature = "runtime"))]
                h1_header_read_timeout: None,
                #[cfg(all(feature = "server", feature = "runtime"))]
                h1_header_read_timeout_fut: None,
                #[cfg(all(feature = "server", feature = "runtime"))]
                h1_header_read_timeout_running: false,
                preserve_header_case: false,
                #[cfg(feature = "ffi")]
                preserve_header_order: false,
                title_case_headers: false,
                h09_responses: false,
                #[cfg(feature = "ffi")]
                on_informational: None,
                #[cfg(feature = "ffi")]
                raw_headers: false,
                notify_read: false,
                reading: Reading::Init,
                writing: Writing::Init,
                upgrade: None,
                // We assume a modern world where the remote speaks HTTP/1.1.
                // If they tell us otherwise, we'll downgrade in `read_head`.
                version: Version::HTTP_11,
            },
            _marker: PhantomData,
        }
    }

    #[cfg(feature = "server")]
    pub(crate) fn set_flush_pipeline(&mut self, enabled: bool) {
        self.io.set_flush_pipeline(enabled);
    }

    pub(crate) fn set_write_strategy_queue(&mut self) {
        self.io.set_write_strategy_queue();
    }

    pub(crate) fn set_max_buf_size(&mut self, max: usize) {
        self.io.set_max_buf_size(max);
    }

    #[cfg(feature = "client")]
    pub(crate) fn set_read_buf_exact_size(&mut self, sz: usize) {
        self.io.set_read_buf_exact_size(sz);
    }

    pub(crate) fn set_write_strategy_flatten(&mut self) {
        self.io.set_write_strategy_flatten();
    }

    #[cfg(feature = "client")]
    pub(crate) fn set_h1_parser_config(&mut self, parser_config: ParserConfig) {
        self.state.h1_parser_config = parser_config;
    }

    pub(crate) fn set_title_case_headers(&mut self) {
        self.state.title_case_headers = true;
    }

    pub(crate) fn set_preserve_header_case(&mut self) {
        self.state.preserve_header_case = true;
    }

    #[cfg(feature = "ffi")]
    pub(crate) fn set_preserve_header_order(&mut self) {
        self.state.preserve_header_order = true;
    }

    #[cfg(feature = "client")]
    pub(crate) fn set_h09_responses(&mut self) {
        self.state.h09_responses = true;
    }

    #[cfg(all(feature = "server", feature = "runtime"))]
    pub(crate) fn set_http1_header_read_timeout(&mut self, val: Duration) {
        self.state.h1_header_read_timeout = Some(val);
    }

    #[cfg(feature = "server")]
    pub(crate) fn set_allow_half_close(&mut self) {
        self.state.allow_half_close = true;
    }

    #[cfg(feature = "ffi")]
    pub(crate) fn set_raw_headers(&mut self, enabled: bool) {
        self.state.raw_headers = enabled;
    }

    pub(crate) fn into_inner(self) -> (I, Bytes) {
        self.io.into_inner()
    }

    pub(crate) fn pending_upgrade(&mut self) -> Option<crate::upgrade::Pending> {
        self.state.upgrade.take()
    }

    pub(crate) fn is_read_closed(&self) -> bool {
        self.state.is_read_closed()
    }

    pub(crate) fn is_write_closed(&self) -> bool {
        self.state.is_write_closed()
    }

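    /// Whether a new message head may be read now.
    ///
    /// Servers read first, so an idle read state is enough; clients must have
    /// at least started writing a request before a response head is expected.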
    pub(crate) fn can_read_head(&self) -> bool {
        if !matches!(self.state.reading, Reading::Init) {
            return false;
        }

        if T::should_read_first() {
            return true;
        }

        !matches!(self.state.writing, Writing::Init)
    }

    pub(crate) fn can_read_body(&self) -> bool {
        match self.state.reading {
            Reading::Body(..) | Reading::Continue(..) => true,
            _ => false,
        }
    }

    fn should_error_on_eof(&self) -> bool {
        // If we're idle, it's probably just the connection closing gracefully.
        T::should_error_on_parse_eof() && !self.state.is_idle()
    }

    fn has_h2_prefix(&self) -> bool {
        let read_buf = self.io.read_buf();
        read_buf.len() >= 24 && read_buf[..24] == *H2_PREFACE
    }

    pub(super) fn poll_read_head(
        &mut self,
        cx: &mut task::Context<'_>,
    ) -> Poll<Option<crate::Result<(MessageHead<T::Incoming>, DecodedLength, Wants)>>> {
        debug_assert!(self.can_read_head());
        trace!("Conn::read_head");

        let msg = match ready!(self.io.parse::<T>(
            cx,
            ParseContext {
                cached_headers: &mut self.state.cached_headers,
                req_method: &mut self.state.method,
                h1_parser_config: self.state.h1_parser_config.clone(),
                #[cfg(all(feature = "server", feature = "runtime"))]
                h1_header_read_timeout: self.state.h1_header_read_timeout,
                #[cfg(all(feature = "server", feature = "runtime"))]
                h1_header_read_timeout_fut: &mut self.state.h1_header_read_timeout_fut,
                #[cfg(all(feature = "server", feature = "runtime"))]
                h1_header_read_timeout_running: &mut self.state.h1_header_read_timeout_running,
                preserve_header_case: self.state.preserve_header_case,
                #[cfg(feature = "ffi")]
                preserve_header_order: self.state.preserve_header_order,
                h09_responses: self.state.h09_responses,
                #[cfg(feature = "ffi")]
                on_informational: &mut self.state.on_informational,
                #[cfg(feature = "ffi")]
                raw_headers: self.state.raw_headers,
            }
        )) {
            Ok(msg) => msg,
            Err(e) => return self.on_read_head_error(e),
        };

        // Note: don't deconstruct `msg` into local variables, it appears
        // the optimizer doesn't remove the extra copies.

        debug!("incoming body is {}", msg.decode);

        // Prevent accepting HTTP/0.9 responses after the initial one, if any.
        self.state.h09_responses = false;

        // Drop any OnInformational callbacks, we're done there!
        #[cfg(feature = "ffi")]
        {
            self.state.on_informational = None;
        }

        self.state.busy();
        self.state.keep_alive &= msg.keep_alive;
        self.state.version = msg.head.version;

        let mut wants = if msg.wants_upgrade {
            Wants::UPGRADE
        } else {
            Wants::EMPTY
        };

        if msg.decode == DecodedLength::ZERO {
            if msg.expect_continue {
                debug!("ignoring expect-continue since body is empty");
            }
            self.state.reading = Reading::KeepAlive;
            if !T::should_read_first() {
                self.try_keep_alive(cx);
            }
        } else if msg.expect_continue {
            self.state.reading = Reading::Continue(Decoder::new(msg.decode));
            wants = wants.add(Wants::EXPECT);
        } else {
            self.state.reading = Reading::Body(Decoder::new(msg.decode));
        }

        Poll::Ready(Some(Ok((msg.head, msg.decode, wants))))
    }

    fn on_read_head_error<Z>(&mut self, e: crate::Error) -> Poll<Option<crate::Result<Z>>> {
        // If we are currently waiting on a message, then an empty
        // message should be reported as an error. If not, it is just
        // the connection closing gracefully.
        let must_error = self.should_error_on_eof();
        self.close_read();
        self.io.consume_leading_lines();
        let was_mid_parse = e.is_parse() || !self.io.read_buf().is_empty();
        if was_mid_parse || must_error {
            // We check if the buf contains the h2 Preface
            debug!(
                "parse error ({}) with {} bytes",
                e,
                self.io.read_buf().len()
            );
            match self.on_parse_error(e) {
                Ok(()) => Poll::Pending, // XXX: wat?
                Err(e) => Poll::Ready(Some(Err(e))),
            }
        } else {
            debug!("read eof");
            self.close_write();
            Poll::Ready(None)
        }
    }

    pub(crate) fn poll_read_body(
        &mut self,
        cx: &mut task::Context<'_>,
    ) -> Poll<Option<io::Result<Bytes>>> {
        debug_assert!(self.can_read_body());

        let (reading, ret) = match self.state.reading {
            Reading::Body(ref mut decoder) => {
                match ready!(decoder.decode(cx, &mut self.io)) {
                    Ok(slice) => {
                        let (reading, chunk) = if decoder.is_eof() {
                            debug!("incoming body completed");
                            (
                                Reading::KeepAlive,
                                if !slice.is_empty() {
                                    Some(Ok(slice))
                                } else {
                                    None
                                },
                            )
                        } else if slice.is_empty() {
                            error!("incoming body unexpectedly ended");
                            // This should be unreachable, since all 3 decoders
                            // either set eof=true or return an Err when reading
                            // an empty slice...
                            (Reading::Closed, None)
                        } else {
                            return Poll::Ready(Some(Ok(slice)));
                        };
                        (reading, Poll::Ready(chunk))
                    }
                    Err(e) => {
                        debug!("incoming body decode error: {}", e);
                        (Reading::Closed, Poll::Ready(Some(Err(e))))
                    }
                }
            }
            Reading::Continue(ref decoder) => {
                // Write the 100 Continue if not already responded...
                if let Writing::Init = self.state.writing {
                    trace!("automatically sending 100 Continue");
                    let cont = b"HTTP/1.1 100 Continue\r\n\r\n";
                    self.io.headers_buf().extend_from_slice(cont);
                }

                // And now recurse once in the Reading::Body state...
                self.state.reading = Reading::Body(decoder.clone());
                return self.poll_read_body(cx);
            }
            _ => unreachable!("poll_read_body invalid state: {:?}", self.state.reading),
        };

        self.state.reading = reading;
        self.try_keep_alive(cx);
        ret
    }

    pub(crate) fn wants_read_again(&mut self) -> bool {
        let ret = self.state.notify_read;
        self.state.notify_read = false;
        ret
    }

    pub(crate) fn poll_read_keep_alive(
        &mut self,
        cx: &mut task::Context<'_>,
    ) -> Poll<crate::Result<()>> {
        debug_assert!(!self.can_read_head() && !self.can_read_body());

        if self.is_read_closed() {
            Poll::Pending
        } else if self.is_mid_message() {
            self.mid_message_detect_eof(cx)
        } else {
            self.require_empty_read(cx)
        }
    }

    fn is_mid_message(&self) -> bool {
        !matches!(
            (&self.state.reading, &self.state.writing),
            (&Reading::Init, &Writing::Init)
        )
    }

    // This checks that the io object's read buffer is empty.
    //
    // This should only be called for Clients wanting to enter the idle
    // state.
    fn require_empty_read(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
        debug_assert!(!self.can_read_head() && !self.can_read_body() && !self.is_read_closed());
        debug_assert!(!self.is_mid_message());
        debug_assert!(T::is_client());

        if !self.io.read_buf().is_empty() {
            debug!("received an unexpected {} bytes", self.io.read_buf().len());
            return Poll::Ready(Err(crate::Error::new_unexpected_message()));
        }

        let num_read = ready!(self.force_io_read(cx)).map_err(crate::Error::new_io)?;

        if num_read == 0 {
            let ret = if self.should_error_on_eof() {
                trace!("found unexpected EOF on busy connection: {:?}", self.state);
                Poll::Ready(Err(crate::Error::new_incomplete()))
            } else {
                trace!("found EOF on idle connection, closing");
                Poll::Ready(Ok(()))
            };

            // order is important: should_error needs state BEFORE close_read
            self.state.close_read();
            return ret;
        }

        debug!(
            "received unexpected {} bytes on an idle connection",
            num_read
        );
        Poll::Ready(Err(crate::Error::new_unexpected_message()))
    }

    fn mid_message_detect_eof(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
        debug_assert!(!self.can_read_head() && !self.can_read_body() && !self.is_read_closed());
        debug_assert!(self.is_mid_message());

        if self.state.allow_half_close || !self.io.read_buf().is_empty() {
            return Poll::Pending;
        }

        let num_read = ready!(self.force_io_read(cx)).map_err(crate::Error::new_io)?;

        if num_read == 0 {
            trace!("found unexpected EOF on busy connection: {:?}", self.state);
            self.state.close_read();
            Poll::Ready(Err(crate::Error::new_incomplete()))
        } else {
            Poll::Ready(Ok(()))
        }
    }

    fn force_io_read(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<usize>> {
        debug_assert!(!self.state.is_read_closed());

        let result = ready!(self.io.poll_read_from_io(cx));
        Poll::Ready(result.map_err(|e| {
            trace!("force_io_read; io error = {:?}", e);
            self.state.close();
            e
        }))
    }

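    /// If the connection is ready to parse a new message and the underlying
    /// IO isn't blocked, record that the Dispatcher should poll reads again
    /// by setting `notify_read` (see `wants_read_again`).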
    fn maybe_notify(&mut self, cx: &mut task::Context<'_>) {
        // It's possible that we returned `Poll::Pending` from poll() without
        // having exhausted the underlying IO. We would have done this when we
        // determined we couldn't keep reading until we knew how writing
        // would finish.

        match self.state.reading {
            Reading::Continue(..) | Reading::Body(..) | Reading::KeepAlive | Reading::Closed => {
                return
            }
            Reading::Init => (),
        };

        match self.state.writing {
            Writing::Body(..) => return,
            Writing::Init | Writing::KeepAlive | Writing::Closed => (),
        }

        if !self.io.is_read_blocked() {
            if self.io.read_buf().is_empty() {
                match self.io.poll_read_from_io(cx) {
                    Poll::Ready(Ok(n)) => {
                        if n == 0 {
                            trace!("maybe_notify; read eof");
                            if self.state.is_idle() {
                                self.state.close();
                            } else {
                                self.close_read()
                            }
                            return;
                        }
                    }
                    Poll::Pending => {
                        trace!("maybe_notify; read_from_io blocked");
                        return;
                    }
                    Poll::Ready(Err(e)) => {
                        trace!("maybe_notify; read_from_io error: {}", e);
                        self.state.close();
                        self.state.error = Some(crate::Error::new_io(e));
                    }
                }
            }
            self.state.notify_read = true;
        }
    }

    fn try_keep_alive(&mut self, cx: &mut task::Context<'_>) {
        self.state.try_keep_alive::<T>();
        self.maybe_notify(cx);
    }

    pub(crate) fn can_write_head(&self) -> bool {
        if !T::should_read_first() && matches!(self.state.reading, Reading::Closed) {
            return false;
        }

        match self.state.writing {
            Writing::Init => self.io.can_headers_buf(),
            _ => false,
        }
    }

    pub(crate) fn can_write_body(&self) -> bool {
        match self.state.writing {
            Writing::Body(..) => true,
            Writing::Init | Writing::KeepAlive | Writing::Closed => false,
        }
    }

    pub(crate) fn can_buffer_body(&self) -> bool {
        self.io.can_buffer()
    }

    pub(crate) fn write_head(&mut self, head: MessageHead<T::Outgoing>, body: Option<BodyLength>) {
        if let Some(encoder) = self.encode_head(head, body) {
            self.state.writing = if !encoder.is_eof() {
                Writing::Body(encoder)
            } else if encoder.is_last() {
                Writing::Closed
            } else {
                Writing::KeepAlive
            };
        }
    }

    pub(crate) fn write_full_msg(&mut self, head: MessageHead<T::Outgoing>, body: B) {
        if let Some(encoder) =
            self.encode_head(head, Some(BodyLength::Known(body.remaining() as u64)))
        {
            let is_last = encoder.is_last();
            // Make sure we don't write a body if we weren't actually allowed
            // to do so, such as when it's a HEAD request.
            if !encoder.is_eof() {
                encoder.danger_full_buf(body, self.io.write_buf());
            }
            self.state.writing = if is_last {
                Writing::Closed
            } else {
                Writing::KeepAlive
            }
        }
    }

    fn encode_head(
        &mut self,
        mut head: MessageHead<T::Outgoing>,
        body: Option<BodyLength>,
    ) -> Option<Encoder> {
        debug_assert!(self.can_write_head());

        if !T::should_read_first() {
            self.state.busy();
        }

        self.enforce_version(&mut head);

        let buf = self.io.headers_buf();
        match super::role::encode_headers::<T>(
            Encode {
                head: &mut head,
                body,
                #[cfg(feature = "server")]
                keep_alive: self.state.wants_keep_alive(),
                req_method: &mut self.state.method,
                title_case_headers: self.state.title_case_headers,
            },
            buf,
        ) {
            Ok(encoder) => {
                debug_assert!(self.state.cached_headers.is_none());
                debug_assert!(head.headers.is_empty());
                self.state.cached_headers = Some(head.headers);

                #[cfg(feature = "ffi")]
                {
                    self.state.on_informational =
                        head.extensions.remove::<crate::ffi::OnInformational>();
                }

                Some(encoder)
            }
            Err(err) => {
                self.state.error = Some(err);
                self.state.writing = Writing::Closed;
                None
            }
        }
    }

    // Fix keep-alive when Connection: keep-alive header is not present
    fn fix_keep_alive(&mut self, head: &mut MessageHead<T::Outgoing>) {
        let outgoing_is_keep_alive = head
            .headers
            .get(CONNECTION)
            .map(connection_keep_alive)
            .unwrap_or(false);

        if !outgoing_is_keep_alive {
            match head.version {
                // If response is version 1.0 and keep-alive is not present in the response,
                // disable keep-alive so the server closes the connection
                Version::HTTP_10 => self.state.disable_keep_alive(),
                // If response is version 1.1 and keep-alive is wanted, add
                // Connection: keep-alive header when not present
                Version::HTTP_11 => {
                    if self.state.wants_keep_alive() {
                        head.headers
                            .insert(CONNECTION, HeaderValue::from_static("keep-alive"));
                    }
                }
                _ => (),
            }
        }
    }

    // If we know the remote speaks an older version, we try to fix up any messages
    // to work with our older peer.
    fn enforce_version(&mut self, head: &mut MessageHead<T::Outgoing>) {
        if let Version::HTTP_10 = self.state.version {
            // Fixes response or connection when keep-alive header is not present
            self.fix_keep_alive(head);
            // If the remote only knows HTTP/1.0, we should force ourselves
            // to only speak HTTP/1.0 as well.
            head.version = Version::HTTP_10;
        }
        // If the remote speaks HTTP/1.1, then it *should* be fine with
        // both HTTP/1.0 and HTTP/1.1 from us. So again, we just let
        // the user's headers be.
    }

    pub(crate) fn write_body(&mut self, chunk: B) {
        debug_assert!(self.can_write_body() && self.can_buffer_body());
        // empty chunks should be discarded at Dispatcher level
        debug_assert!(chunk.remaining() != 0);

        let state = match self.state.writing {
            Writing::Body(ref mut encoder) => {
                self.io.buffer(encoder.encode(chunk));

                if !encoder.is_eof() {
                    return;
                }

                if encoder.is_last() {
                    Writing::Closed
                } else {
                    Writing::KeepAlive
                }
            }
            _ => unreachable!("write_body invalid state: {:?}", self.state.writing),
        };

        self.state.writing = state;
    }

    pub(crate) fn write_body_and_end(&mut self, chunk: B) {
        debug_assert!(self.can_write_body() && self.can_buffer_body());
        // empty chunks should be discarded at Dispatcher level
        debug_assert!(chunk.remaining() != 0);

        let state = match self.state.writing {
            Writing::Body(ref encoder) => {
                let can_keep_alive = encoder.encode_and_end(chunk, self.io.write_buf());
                if can_keep_alive {
                    Writing::KeepAlive
                } else {
                    Writing::Closed
                }
            }
            _ => unreachable!("write_body invalid state: {:?}", self.state.writing),
        };

        self.state.writing = state;
    }

    pub(crate) fn end_body(&mut self) -> crate::Result<()> {
        debug_assert!(self.can_write_body());

        let encoder = match self.state.writing {
            Writing::Body(ref mut enc) => enc,
            _ => return Ok(()),
        };

        // end of stream, that means we should try to eof
        match encoder.end() {
            Ok(end) => {
                if let Some(end) = end {
                    self.io.buffer(end);
                }

                self.state.writing = if encoder.is_last() || encoder.is_close_delimited() {
                    Writing::Closed
                } else {
                    Writing::KeepAlive
                };

                Ok(())
            }
            Err(not_eof) => {
                self.state.writing = Writing::Closed;
                Err(crate::Error::new_body_write_aborted().with(not_eof))
            }
        }
    }

    // When we get a parse error, depending on what side we are, we might be able
    // to write a response before closing the connection.
    //
    // - Client: there is nothing we can do
    // - Server: if Response hasn't been written yet, we can send a 4xx response
    fn on_parse_error(&mut self, err: crate::Error) -> crate::Result<()> {
        if let Writing::Init = self.state.writing {
            if self.has_h2_prefix() {
                return Err(crate::Error::new_version_h2());
            }
            if let Some(msg) = T::on_error(&err) {
                // Drop the cached headers so as to not trigger a debug
                // assert in `write_head`...
                self.state.cached_headers.take();
                self.write_head(msg, None);
                self.state.error = Some(err);
                return Ok(());
            }
        }

        // fallback is pass the error back up
        Err(err)
    }

    pub(crate) fn poll_flush(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
        ready!(Pin::new(&mut self.io).poll_flush(cx))?;
        self.try_keep_alive(cx);
        trace!("flushed({}): {:?}", T::LOG, self.state);
        Poll::Ready(Ok(()))
    }

    pub(crate) fn poll_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
        match ready!(Pin::new(self.io.io_mut()).poll_shutdown(cx)) {
            Ok(()) => {
                trace!("shut down IO complete");
                Poll::Ready(Ok(()))
            }
            Err(e) => {
                debug!("error shutting down IO: {}", e);
                Poll::Ready(Err(e))
            }
        }
    }

    /// If the read side can be cheaply drained, do so. Otherwise, close.
    pub(super) fn poll_drain_or_close_read(&mut self, cx: &mut task::Context<'_>) {
        let _ = self.poll_read_body(cx);

        // If still in Reading::Body, just give up
        match self.state.reading {
            Reading::Init | Reading::KeepAlive => trace!("body drained"),
            _ => self.close_read(),
        }
    }

    pub(crate) fn close_read(&mut self) {
        self.state.close_read();
    }

    pub(crate) fn close_write(&mut self) {
        self.state.close_write();
    }

    #[cfg(feature = "server")]
    pub(crate) fn disable_keep_alive(&mut self) {
        if self.state.is_idle() {
            trace!("disable_keep_alive; closing idle connection");
            self.state.close();
        } else {
            trace!("disable_keep_alive; in-progress connection");
            self.state.disable_keep_alive();
        }
    }

    pub(crate) fn take_error(&mut self) -> crate::Result<()> {
        if let Some(err) = self.state.error.take() {
            Err(err)
        } else {
            Ok(())
        }
    }

    pub(super) fn on_upgrade(&mut self) -> crate::upgrade::OnUpgrade {
        trace!("{}: prepare possible HTTP upgrade", T::LOG);
        self.state.prepare_upgrade()
    }
}

impl<I, B: Buf, T> fmt::Debug for Conn<I, B, T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Conn")
            .field("state", &self.state)
            .field("io", &self.io)
            .finish()
    }
}

// B and T are never pinned
impl<I: Unpin, B, T> Unpin for Conn<I, B, T> {}

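/// Mutable bookkeeping for a single HTTP/1 connection, kept alongside the
/// buffered IO inside `Conn`.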
struct State {
    allow_half_close: bool,
    /// Re-usable HeaderMap to reduce allocating new ones.
    cached_headers: Option<HeaderMap>,
    /// If an error occurs when there wasn't a direct way to return it
    /// back to the user, this is set.
    error: Option<crate::Error>,
    /// Current keep-alive status.
    keep_alive: KA,
    /// If mid-message, the HTTP Method that started it.
    ///
    /// This is used to know things such as if the message can include
    /// a body or not.
    method: Option<Method>,
    h1_parser_config: ParserConfig,
    #[cfg(all(feature = "server", feature = "runtime"))]
    h1_header_read_timeout: Option<Duration>,
    #[cfg(all(feature = "server", feature = "runtime"))]
    h1_header_read_timeout_fut: Option<Pin<Box<Sleep>>>,
    #[cfg(all(feature = "server", feature = "runtime"))]
    h1_header_read_timeout_running: bool,
    preserve_header_case: bool,
    #[cfg(feature = "ffi")]
    preserve_header_order: bool,
    title_case_headers: bool,
    h09_responses: bool,
    /// If set, called with each 1xx informational response received for
    /// the current request. MUST be unset after a non-1xx response is
    /// received.
    #[cfg(feature = "ffi")]
    on_informational: Option<crate::ffi::OnInformational>,
    #[cfg(feature = "ffi")]
    raw_headers: bool,
    /// Set to true when the Dispatcher should poll read operations
    /// again. See the `maybe_notify` method for more.
    notify_read: bool,
    /// State of allowed reads
    reading: Reading,
    /// State of allowed writes
    writing: Writing,
    /// An expected pending HTTP upgrade.
    upgrade: Option<crate::upgrade::Pending>,
    /// Either HTTP/1.0 or 1.1 connection
    version: Version,
}

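/// What the read half of the connection is currently allowed to do.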
#[derive(Debug)]
enum Reading {
    Init,
    Continue(Decoder),
    Body(Decoder),
    KeepAlive,
    Closed,
}

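/// What the write half of the connection is currently allowed to do.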
enum Writing {
    Init,
    Body(Encoder),
    KeepAlive,
    Closed,
}

impl fmt::Debug for State {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("State");
        builder
            .field("reading", &self.reading)
            .field("writing", &self.writing)
            .field("keep_alive", &self.keep_alive);

        // Only show error field if it's interesting...
        if let Some(ref error) = self.error {
            builder.field("error", error);
        }

        if self.allow_half_close {
            builder.field("allow_half_close", &true);
        }

        // Purposefully leaving off other fields..

        builder.finish()
    }
}

impl fmt::Debug for Writing {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match *self {
            Writing::Init => f.write_str("Init"),
            Writing::Body(ref enc) => f.debug_tuple("Body").field(enc).finish(),
            Writing::KeepAlive => f.write_str("KeepAlive"),
            Writing::Closed => f.write_str("Closed"),
        }
    }
}

impl std::ops::BitAndAssign<bool> for KA {
    fn bitand_assign(&mut self, enabled: bool) {
        if !enabled {
            trace!("remote disabling keep-alive");
            *self = KA::Disabled;
        }
    }
}

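/// Keep-alive state: `Busy` while a message is in flight, `Idle` between
/// messages, and `Disabled` once keep-alive has been turned off.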
#[derive(Clone, Copy, Debug)]
enum KA {
    Idle,
    Busy,
    Disabled,
}

impl Default for KA {
    fn default() -> KA {
        KA::Busy
    }
}

impl KA {
    fn idle(&mut self) {
        *self = KA::Idle;
    }

    fn busy(&mut self) {
        *self = KA::Busy;
    }

    fn disable(&mut self) {
        *self = KA::Disabled;
    }

    fn status(&self) -> KA {
        *self
    }
}

impl State {
    fn close(&mut self) {
        trace!("State::close()");
        self.reading = Reading::Closed;
        self.writing = Writing::Closed;
        self.keep_alive.disable();
    }

    fn close_read(&mut self) {
        trace!("State::close_read()");
        self.reading = Reading::Closed;
        self.keep_alive.disable();
    }

    fn close_write(&mut self) {
        trace!("State::close_write()");
        self.writing = Writing::Closed;
        self.keep_alive.disable();
    }

    fn wants_keep_alive(&self) -> bool {
        if let KA::Disabled = self.keep_alive.status() {
            false
        } else {
            true
        }
    }

    fn try_keep_alive<T: Http1Transaction>(&mut self) {
        match (&self.reading, &self.writing) {
            (&Reading::KeepAlive, &Writing::KeepAlive) => {
                if let KA::Busy = self.keep_alive.status() {
                    self.idle::<T>();
                } else {
                    trace!(
                        "try_keep_alive({}): could keep-alive, but status = {:?}",
                        T::LOG,
                        self.keep_alive
                    );
                    self.close();
                }
            }
            (&Reading::Closed, &Writing::KeepAlive) | (&Reading::KeepAlive, &Writing::Closed) => {
                self.close()
            }
            _ => (),
        }
    }

    fn disable_keep_alive(&mut self) {
        self.keep_alive.disable()
    }

    fn busy(&mut self) {
        if let KA::Disabled = self.keep_alive.status() {
            return;
        }
        self.keep_alive.busy();
    }

    fn idle<T: Http1Transaction>(&mut self) {
        debug_assert!(!self.is_idle(), "State::idle() called while idle");

        self.method = None;
        self.keep_alive.idle();

        if !self.is_idle() {
            self.close();
            return;
        }

        self.reading = Reading::Init;
        self.writing = Writing::Init;

        // !T::should_read_first() means Client.
        //
        // If Client connection has just gone idle, the Dispatcher
        // should try the poll loop one more time, so as to poll the
        // pending requests stream.
        if !T::should_read_first() {
            self.notify_read = true;
        }
    }

    fn is_idle(&self) -> bool {
        matches!(self.keep_alive.status(), KA::Idle)
    }

    fn is_read_closed(&self) -> bool {
        matches!(self.reading, Reading::Closed)
    }

    fn is_write_closed(&self) -> bool {
        matches!(self.writing, Writing::Closed)
    }

    fn prepare_upgrade(&mut self) -> crate::upgrade::OnUpgrade {
        let (tx, rx) = crate::upgrade::pending();
        self.upgrade = Some(tx);
        rx
    }
}

#[cfg(test)]
mod tests {
    #[cfg(feature = "nightly")]
    #[bench]
    fn bench_read_head_short(b: &mut ::test::Bencher) {
        use super::*;
        let s = b"GET / HTTP/1.1\r\nHost: localhost:8080\r\n\r\n";
        let len = s.len();
        b.bytes = len as u64;

        // an empty IO, we'll be skipping and using the read buffer anyways
        let io = tokio_test::io::Builder::new().build();
        let mut conn = Conn::<_, bytes::Bytes, crate::proto::h1::ServerTransaction>::new(io);
        *conn.io.read_buf_mut() = ::bytes::BytesMut::from(&s[..]);
        conn.state.cached_headers = Some(HeaderMap::with_capacity(2));

        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();

        b.iter(|| {
            rt.block_on(futures_util::future::poll_fn(|cx| {
                match conn.poll_read_head(cx) {
                    Poll::Ready(Some(Ok(x))) => {
                        ::test::black_box(&x);
                        let mut headers = x.0.headers;
                        headers.clear();
                        conn.state.cached_headers = Some(headers);
                    }
                    f => panic!("expected Ready(Some(Ok(..))): {:?}", f),
                }

                conn.io.read_buf_mut().reserve(1);
                unsafe {
                    conn.io.read_buf_mut().set_len(len);
                }
                conn.state.reading = Reading::Init;
                Poll::Ready(())
            }));
        });
    }
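
    // Added sketch: a small sanity check of the `BitAndAssign<bool>` impl for
    // `KA`, which `poll_read_head` relies on via
    // `state.keep_alive &= msg.keep_alive`. It only uses items defined in
    // this module.
    #[test]
    fn ka_bitand_assign_only_disables() {
        use super::KA;

        let mut ka = KA::Busy;
        // `true` leaves the current state alone...
        ka &= true;
        assert!(matches!(ka, KA::Busy));

        // ...while `false` permanently disables keep-alive.
        ka &= false;
        assert!(matches!(ka, KA::Disabled));
    }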

    /*
    //TODO: rewrite these using dispatch... someday...
    use futures::{Async, Future, Stream, Sink};
    use futures::future;

    use proto::{self, ClientTransaction, MessageHead, ServerTransaction};
    use super::super::Encoder;
    use mock::AsyncIo;

    use super::{Conn, Decoder, Reading, Writing};
    use ::uri::Uri;

    use std::str::FromStr;

    #[test]
    fn test_conn_init_read() {
        let good_message = b"GET / HTTP/1.1\r\n\r\n".to_vec();
        let len = good_message.len();
        let io = AsyncIo::new_buf(good_message, len);
        let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);

        match conn.poll().unwrap() {
            Async::Ready(Some(Frame::Message { message, body: false })) => {
                assert_eq!(message, MessageHead {
                    subject: ::proto::RequestLine(::Get, Uri::from_str("/").unwrap()),
                    .. MessageHead::default()
                })
            },
            f => panic!("frame is not Frame::Message: {:?}", f)
        }
    }

    #[test]
    fn test_conn_parse_partial() {
        let _: Result<(), ()> = future::lazy(|| {
            let good_message = b"GET / HTTP/1.1\r\nHost: foo.bar\r\n\r\n".to_vec();
            let io = AsyncIo::new_buf(good_message, 10);
            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
            assert!(conn.poll().unwrap().is_not_ready());
            conn.io.io_mut().block_in(50);
            let async = conn.poll().unwrap();
            assert!(async.is_ready());
            match async {
                Async::Ready(Some(Frame::Message { .. })) => (),
                f => panic!("frame is not Message: {:?}", f),
            }
            Ok(())
        }).wait();
    }

    #[test]
    fn test_conn_init_read_eof_idle() {
        let io = AsyncIo::new_buf(vec![], 1);
        let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
        conn.state.idle();

        match conn.poll().unwrap() {
            Async::Ready(None) => {},
            other => panic!("frame is not None: {:?}", other)
        }
    }

    #[test]
    fn test_conn_init_read_eof_idle_partial_parse() {
        let io = AsyncIo::new_buf(b"GET / HTTP/1.1".to_vec(), 100);
        let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
        conn.state.idle();

        match conn.poll() {
            Err(ref err) if err.kind() == std::io::ErrorKind::UnexpectedEof => {},
            other => panic!("unexpected frame: {:?}", other)
        }
    }

    #[test]
    fn test_conn_init_read_eof_busy() {
        let _: Result<(), ()> = future::lazy(|| {
            // server ignores
            let io = AsyncIo::new_eof();
            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
            conn.state.busy();

            match conn.poll().unwrap() {
                Async::Ready(None) => {},
                other => panic!("unexpected frame: {:?}", other)
            }

            // client
            let io = AsyncIo::new_eof();
            let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io);
            conn.state.busy();

            match conn.poll() {
                Err(ref err) if err.kind() == std::io::ErrorKind::UnexpectedEof => {},
                other => panic!("unexpected frame: {:?}", other)
            }
            Ok(())
        }).wait();
    }

    #[test]
    fn test_conn_body_finish_read_eof() {
        let _: Result<(), ()> = future::lazy(|| {
            let io = AsyncIo::new_eof();
            let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io);
            conn.state.busy();
            conn.state.writing = Writing::KeepAlive;
            conn.state.reading = Reading::Body(Decoder::length(0));

            match conn.poll() {
                Ok(Async::Ready(Some(Frame::Body { chunk: None }))) => (),
                other => panic!("unexpected frame: {:?}", other)
            }

            // conn eofs, but tokio-proto will call poll() again, before calling flush()
            // the conn eof in this case is perfectly fine

            match conn.poll() {
                Ok(Async::Ready(None)) => (),
                other => panic!("unexpected frame: {:?}", other)
            }
            Ok(())
        }).wait();
    }

    #[test]
    fn test_conn_message_empty_body_read_eof() {
        let _: Result<(), ()> = future::lazy(|| {
            let io = AsyncIo::new_buf(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n".to_vec(), 1024);
            let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io);
            conn.state.busy();
            conn.state.writing = Writing::KeepAlive;

            match conn.poll() {
                Ok(Async::Ready(Some(Frame::Message { body: false, .. }))) => (),
                other => panic!("unexpected frame: {:?}", other)
            }

            // conn eofs, but tokio-proto will call poll() again, before calling flush()
            // the conn eof in this case is perfectly fine

            match conn.poll() {
                Ok(Async::Ready(None)) => (),
                other => panic!("unexpected frame: {:?}", other)
            }
            Ok(())
        }).wait();
    }

    #[test]
    fn test_conn_read_body_end() {
        let _: Result<(), ()> = future::lazy(|| {
            let io = AsyncIo::new_buf(b"POST / HTTP/1.1\r\nContent-Length: 5\r\n\r\n12345".to_vec(), 1024);
            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
            conn.state.busy();

            match conn.poll() {
                Ok(Async::Ready(Some(Frame::Message { body: true, .. }))) => (),
                other => panic!("unexpected frame: {:?}", other)
            }

            match conn.poll() {
                Ok(Async::Ready(Some(Frame::Body { chunk: Some(_) }))) => (),
                other => panic!("unexpected frame: {:?}", other)
            }

            // When the body is done, `poll` MUST return a `Body` frame with chunk set to `None`
            match conn.poll() {
                Ok(Async::Ready(Some(Frame::Body { chunk: None }))) => (),
                other => panic!("unexpected frame: {:?}", other)
            }

            match conn.poll() {
                Ok(Async::NotReady) => (),
                other => panic!("unexpected frame: {:?}", other)
            }
            Ok(())
        }).wait();
    }

    #[test]
    fn test_conn_closed_read() {
        let io = AsyncIo::new_buf(vec![], 0);
        let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
        conn.state.close();

        match conn.poll().unwrap() {
            Async::Ready(None) => {},
            other => panic!("frame is not None: {:?}", other)
        }
    }

    #[test]
    fn test_conn_body_write_length() {
        let _ = pretty_env_logger::try_init();
        let _: Result<(), ()> = future::lazy(|| {
            let io = AsyncIo::new_buf(vec![], 0);
            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
            let max = super::super::io::DEFAULT_MAX_BUFFER_SIZE + 4096;
            conn.state.writing = Writing::Body(Encoder::length((max * 2) as u64));

            assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'a'; max].into()) }).unwrap().is_ready());
            assert!(!conn.can_buffer_body());

            assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'b'; 1024 * 8].into()) }).unwrap().is_not_ready());

            conn.io.io_mut().block_in(1024 * 3);
            assert!(conn.poll_complete().unwrap().is_not_ready());
            conn.io.io_mut().block_in(1024 * 3);
            assert!(conn.poll_complete().unwrap().is_not_ready());
            conn.io.io_mut().block_in(max * 2);
            assert!(conn.poll_complete().unwrap().is_ready());

            assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'c'; 1024 * 8].into()) }).unwrap().is_ready());
            Ok(())
        }).wait();
    }

    #[test]
    fn test_conn_body_write_chunked() {
        let _: Result<(), ()> = future::lazy(|| {
            let io = AsyncIo::new_buf(vec![], 4096);
            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
            conn.state.writing = Writing::Body(Encoder::chunked());

            assert!(conn.start_send(Frame::Body { chunk: Some("headers".into()) }).unwrap().is_ready());
            assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'x'; 8192].into()) }).unwrap().is_ready());
            Ok(())
        }).wait();
    }

    #[test]
    fn test_conn_body_flush() {
        let _: Result<(), ()> = future::lazy(|| {
            let io = AsyncIo::new_buf(vec![], 1024 * 1024 * 5);
            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
            conn.state.writing = Writing::Body(Encoder::length(1024 * 1024));
            assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'a'; 1024 * 1024].into()) }).unwrap().is_ready());
            assert!(!conn.can_buffer_body());
            conn.io.io_mut().block_in(1024 * 1024 * 5);
            assert!(conn.poll_complete().unwrap().is_ready());
            assert!(conn.can_buffer_body());
            assert!(conn.io.io_mut().flushed());

            Ok(())
        }).wait();
    }

    #[test]
    fn test_conn_parking() {
        use std::sync::Arc;
        use futures::executor::Notify;
        use futures::executor::NotifyHandle;

        struct Car {
            permit: bool,
        }
        impl Notify for Car {
            fn notify(&self, _id: usize) {
                assert!(self.permit, "unparked without permit");
            }
        }

        fn car(permit: bool) -> NotifyHandle {
            Arc::new(Car {
                permit: permit,
            }).into()
        }

        // test that once writing is done, unparks
        let f = future::lazy(|| {
            let io = AsyncIo::new_buf(vec![], 4096);
            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
            conn.state.reading = Reading::KeepAlive;
            assert!(conn.poll().unwrap().is_not_ready());

            conn.state.writing = Writing::KeepAlive;
            assert!(conn.poll_complete().unwrap().is_ready());
            Ok::<(), ()>(())
        });
        ::futures::executor::spawn(f).poll_future_notify(&car(true), 0).unwrap();


        // test that flushing when not waiting on read doesn't unpark
        let f = future::lazy(|| {
            let io = AsyncIo::new_buf(vec![], 4096);
            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
            conn.state.writing = Writing::KeepAlive;
            assert!(conn.poll_complete().unwrap().is_ready());
            Ok::<(), ()>(())
        });
        ::futures::executor::spawn(f).poll_future_notify(&car(false), 0).unwrap();


        // test that flushing and writing isn't done doesn't unpark
        let f = future::lazy(|| {
            let io = AsyncIo::new_buf(vec![], 4096);
            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
            conn.state.reading = Reading::KeepAlive;
            assert!(conn.poll().unwrap().is_not_ready());
            conn.state.writing = Writing::Body(Encoder::length(5_000));
            assert!(conn.poll_complete().unwrap().is_ready());
            Ok::<(), ()>(())
        });
        ::futures::executor::spawn(f).poll_future_notify(&car(false), 0).unwrap();
    }

    #[test]
    fn test_conn_closed_write() {
        let io = AsyncIo::new_buf(vec![], 0);
        let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
        conn.state.close();

        match conn.start_send(Frame::Body { chunk: Some(b"foobar".to_vec().into()) }) {
            Err(_e) => {},
            other => panic!("did not return Err: {:?}", other)
        }

        assert!(conn.state.is_write_closed());
    }

    #[test]
    fn test_conn_write_empty_chunk() {
        let io = AsyncIo::new_buf(vec![], 0);
        let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
        conn.state.writing = Writing::KeepAlive;

        assert!(conn.start_send(Frame::Body { chunk: None }).unwrap().is_ready());
        assert!(conn.start_send(Frame::Body { chunk: Some(Vec::new().into()) }).unwrap().is_ready());
        conn.start_send(Frame::Body { chunk: Some(vec![b'a'].into()) }).unwrap_err();
    }
    */
}