1use std::fmt;
2#[cfg(feature = "server")]
3use std::future::Future;
4use std::io;
5use std::marker::{PhantomData, Unpin};
6use std::pin::Pin;
7use std::task::{Context, Poll};
8#[cfg(feature = "server")]
9use std::time::{Duration, Instant};
10
11use crate::rt::{Read, Write};
12use bytes::{Buf, Bytes};
13use futures_util::ready;
14use http::header::{HeaderValue, CONNECTION, TE};
15use http::{HeaderMap, Method, Version};
16use http_body::Frame;
17use httparse::ParserConfig;
18
19use super::io::Buffered;
20use super::{Decoder, Encode, EncodedBuf, Encoder, Http1Transaction, ParseContext, Wants};
21use crate::body::DecodedLength;
22#[cfg(feature = "server")]
23use crate::common::time::Time;
24use crate::headers;
25use crate::proto::{BodyLength, MessageHead};
26#[cfg(feature = "server")]
27use crate::rt::Sleep;
28
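// The bytes a client sends at the start of an HTTP/2 connection (the
// "connection preface"). If we see these while trying to parse an HTTP/1
// head, the peer is speaking h2, and `on_parse_error` reports a version error.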
29const H2_PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
30
/// This handles a connection, which will have been established over a
/// `Read + Write` (like a socket), and will likely include multiple
/// `Transaction`s over HTTP.
///
/// The connection will determine when a message begins and ends as well as
/// determine if this connection can be kept alive after the message,
/// or if it is complete.
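///
/// A rough, non-compiling sketch of how a dispatcher might drive a `Conn`
/// (the real loop lives in the h1 dispatcher; names below are illustrative):
///
/// ```ignore
/// let mut conn = Conn::<_, bytes::Bytes, ServerTransaction>::new(io);
/// // Read the next message head, learning the body length and upgrade wants.
/// let (head, body_len, _wants) = match ready!(conn.poll_read_head(cx)) {
///     Some(Ok(parts)) => parts,
///     _ => return, // closed or errored
/// };
/// // ... dispatch to a service, then write the response head and flush ...
/// conn.write_head(response_head, None); // no body in this sketch
/// ready!(conn.poll_flush(cx))?;
/// ```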
38pub(crate) struct Conn<I, B, T> {
39 io: Buffered<I, EncodedBuf<B>>,
40 state: State,
41 _marker: PhantomData<fn(T)>,
42}
43
44impl<I, B, T> Conn<I, B, T>
45where
46 I: Read + Write + Unpin,
47 B: Buf,
48 T: Http1Transaction,
49{
50 pub(crate) fn new(io: I) -> Conn<I, B, T> {
51 Conn {
52 io: Buffered::new(io),
53 state: State {
54 allow_half_close: false,
55 cached_headers: None,
56 error: None,
57 keep_alive: KA::Busy,
58 method: None,
59 h1_parser_config: ParserConfig::default(),
60 h1_max_headers: None,
61 #[cfg(feature = "server")]
62 h1_header_read_timeout: None,
63 #[cfg(feature = "server")]
64 h1_header_read_timeout_fut: None,
65 #[cfg(feature = "server")]
66 h1_header_read_timeout_running: false,
67 #[cfg(feature = "server")]
68 date_header: true,
69 #[cfg(feature = "server")]
70 timer: Time::Empty,
71 preserve_header_case: false,
72 #[cfg(feature = "ffi")]
73 preserve_header_order: false,
74 title_case_headers: false,
75 h09_responses: false,
76 #[cfg(feature = "client")]
77 on_informational: None,
78 notify_read: false,
79 reading: Reading::Init,
80 writing: Writing::Init,
81 upgrade: None,
82 // We assume a modern world where the remote speaks HTTP/1.1.
83 // If they tell us otherwise, we'll downgrade in `read_head`.
84 version: Version::HTTP_11,
85 allow_trailer_fields: false,
86 },
87 _marker: PhantomData,
88 }
89 }
90
91 #[cfg(feature = "server")]
92 pub(crate) fn set_timer(&mut self, timer: Time) {
93 self.state.timer = timer;
94 }
95
96 #[cfg(feature = "server")]
97 pub(crate) fn set_flush_pipeline(&mut self, enabled: bool) {
98 self.io.set_flush_pipeline(enabled);
99 }
100
101 pub(crate) fn set_write_strategy_queue(&mut self) {
102 self.io.set_write_strategy_queue();
103 }
104
105 pub(crate) fn set_max_buf_size(&mut self, max: usize) {
106 self.io.set_max_buf_size(max);
107 }
108
109 #[cfg(feature = "client")]
110 pub(crate) fn set_read_buf_exact_size(&mut self, sz: usize) {
111 self.io.set_read_buf_exact_size(sz);
112 }
113
114 pub(crate) fn set_write_strategy_flatten(&mut self) {
115 self.io.set_write_strategy_flatten();
116 }
117
118 pub(crate) fn set_h1_parser_config(&mut self, parser_config: ParserConfig) {
119 self.state.h1_parser_config = parser_config;
120 }
121
122 pub(crate) fn set_title_case_headers(&mut self) {
123 self.state.title_case_headers = true;
124 }
125
126 pub(crate) fn set_preserve_header_case(&mut self) {
127 self.state.preserve_header_case = true;
128 }
129
130 #[cfg(feature = "ffi")]
131 pub(crate) fn set_preserve_header_order(&mut self) {
132 self.state.preserve_header_order = true;
133 }
134
135 #[cfg(feature = "client")]
136 pub(crate) fn set_h09_responses(&mut self) {
137 self.state.h09_responses = true;
138 }
139
140 pub(crate) fn set_http1_max_headers(&mut self, val: usize) {
141 self.state.h1_max_headers = Some(val);
142 }
143
144 #[cfg(feature = "server")]
145 pub(crate) fn set_http1_header_read_timeout(&mut self, val: Duration) {
146 self.state.h1_header_read_timeout = Some(val);
147 }
148
149 #[cfg(feature = "server")]
150 pub(crate) fn set_allow_half_close(&mut self) {
151 self.state.allow_half_close = true;
152 }
153
154 #[cfg(feature = "server")]
155 pub(crate) fn disable_date_header(&mut self) {
156 self.state.date_header = false;
157 }
158
159 pub(crate) fn into_inner(self) -> (I, Bytes) {
160 self.io.into_inner()
161 }
162
163 pub(crate) fn pending_upgrade(&mut self) -> Option<crate::upgrade::Pending> {
164 self.state.upgrade.take()
165 }
166
167 pub(crate) fn is_read_closed(&self) -> bool {
168 self.state.is_read_closed()
169 }
170
171 pub(crate) fn is_write_closed(&self) -> bool {
172 self.state.is_write_closed()
173 }
174
175 pub(crate) fn can_read_head(&self) -> bool {
176 if !matches!(self.state.reading, Reading::Init) {
177 return false;
178 }
179
180 if T::should_read_first() {
181 return true;
182 }
183
184 !matches!(self.state.writing, Writing::Init)
185 }
186
187 pub(crate) fn can_read_body(&self) -> bool {
188 matches!(
189 self.state.reading,
190 Reading::Body(..) | Reading::Continue(..)
191 )
192 }
193
194 #[cfg(feature = "server")]
195 pub(crate) fn has_initial_read_write_state(&self) -> bool {
196 matches!(self.state.reading, Reading::Init)
197 && matches!(self.state.writing, Writing::Init)
198 && self.io.read_buf().is_empty()
199 }
200
201 fn should_error_on_eof(&self) -> bool {
202 // If we're idle, it's probably just the connection closing gracefully.
203 T::should_error_on_parse_eof() && !self.state.is_idle()
204 }
205
206 fn has_h2_prefix(&self) -> bool {
207 let read_buf = self.io.read_buf();
208 read_buf.len() >= 24 && read_buf[..24] == *H2_PREFACE
209 }
210
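    // Reads and parses the next message head from the buffered IO.
    //
    // On the server, this also arms (and polls) the optional header read
    // timeout. Once a head is parsed, the read state moves to `Body`,
    // `Continue` (when an `Expect: 100-continue` body is pending), or
    // `KeepAlive` (when the body is empty), and the returned `Wants` flags
    // tell the caller whether an upgrade and/or a 100 Continue is expected.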
211 pub(super) fn poll_read_head(
212 &mut self,
213 cx: &mut Context<'_>,
214 ) -> Poll<Option<crate::Result<(MessageHead<T::Incoming>, DecodedLength, Wants)>>> {
215 debug_assert!(self.can_read_head());
216 trace!("Conn::read_head");
217
218 #[cfg(feature = "server")]
219 if !self.state.h1_header_read_timeout_running {
220 if let Some(h1_header_read_timeout) = self.state.h1_header_read_timeout {
221 let deadline = Instant::now() + h1_header_read_timeout;
222 self.state.h1_header_read_timeout_running = true;
223 match self.state.h1_header_read_timeout_fut {
224 Some(ref mut h1_header_read_timeout_fut) => {
225 trace!("resetting h1 header read timeout timer");
226 self.state.timer.reset(h1_header_read_timeout_fut, deadline);
227 }
228 None => {
229 trace!("setting h1 header read timeout timer");
230 self.state.h1_header_read_timeout_fut =
231 Some(self.state.timer.sleep_until(deadline));
232 }
233 }
234 }
235 }
236
237 let msg = match self.io.parse::<T>(
238 cx,
239 ParseContext {
240 cached_headers: &mut self.state.cached_headers,
241 req_method: &mut self.state.method,
242 h1_parser_config: self.state.h1_parser_config.clone(),
243 h1_max_headers: self.state.h1_max_headers,
244 preserve_header_case: self.state.preserve_header_case,
245 #[cfg(feature = "ffi")]
246 preserve_header_order: self.state.preserve_header_order,
247 h09_responses: self.state.h09_responses,
248 #[cfg(feature = "client")]
249 on_informational: &mut self.state.on_informational,
250 },
251 ) {
252 Poll::Ready(Ok(msg)) => msg,
253 Poll::Ready(Err(e)) => return self.on_read_head_error(e),
254 Poll::Pending => {
255 #[cfg(feature = "server")]
256 if self.state.h1_header_read_timeout_running {
257 if let Some(ref mut h1_header_read_timeout_fut) =
258 self.state.h1_header_read_timeout_fut
259 {
260 if Pin::new(h1_header_read_timeout_fut).poll(cx).is_ready() {
261 self.state.h1_header_read_timeout_running = false;
262
263 warn!("read header from client timeout");
264 return Poll::Ready(Some(Err(crate::Error::new_header_timeout())));
265 }
266 }
267 }
268
269 return Poll::Pending;
270 }
271 };
272
273 #[cfg(feature = "server")]
274 {
275 self.state.h1_header_read_timeout_running = false;
276 self.state.h1_header_read_timeout_fut = None;
277 }
278
279 // Note: don't deconstruct `msg` into local variables, it appears
280 // the optimizer doesn't remove the extra copies.
281
282 debug!("incoming body is {}", msg.decode);
283
284 // Prevent accepting HTTP/0.9 responses after the initial one, if any.
285 self.state.h09_responses = false;
286
287 // Drop any OnInformational callbacks, we're done there!
288 #[cfg(feature = "client")]
289 {
290 self.state.on_informational = None;
291 }
292
293 self.state.busy();
294 self.state.keep_alive &= msg.keep_alive;
295 self.state.version = msg.head.version;
296
297 let mut wants = if msg.wants_upgrade {
298 Wants::UPGRADE
299 } else {
300 Wants::EMPTY
301 };
302
303 if msg.decode == DecodedLength::ZERO {
304 if msg.expect_continue {
305 debug!("ignoring expect-continue since body is empty");
306 }
307 self.state.reading = Reading::KeepAlive;
308 if !T::should_read_first() {
309 self.try_keep_alive(cx);
310 }
311 } else if msg.expect_continue && msg.head.version.gt(&Version::HTTP_10) {
312 let h1_max_header_size = None; // TODO: remove this when we land h1_max_header_size support
313 self.state.reading = Reading::Continue(Decoder::new(
314 msg.decode,
315 self.state.h1_max_headers,
316 h1_max_header_size,
317 ));
318 wants = wants.add(Wants::EXPECT);
319 } else {
320 let h1_max_header_size = None; // TODO: remove this when we land h1_max_header_size support
321 self.state.reading = Reading::Body(Decoder::new(
322 msg.decode,
323 self.state.h1_max_headers,
324 h1_max_header_size,
325 ));
326 }
327
328 self.state.allow_trailer_fields = msg
329 .head
330 .headers
331 .get(TE)
332 .map_or(false, |te_header| te_header == "trailers");
333
334 Poll::Ready(Some(Ok((msg.head, msg.decode, wants))))
335 }
336
337 fn on_read_head_error<Z>(&mut self, e: crate::Error) -> Poll<Option<crate::Result<Z>>> {
338 // If we are currently waiting on a message, then an empty
339 // message should be reported as an error. If not, it is just
340 // the connection closing gracefully.
341 let must_error = self.should_error_on_eof();
342 self.close_read();
343 self.io.consume_leading_lines();
344 let was_mid_parse = e.is_parse() || !self.io.read_buf().is_empty();
345 if was_mid_parse || must_error {
346 // We check if the buf contains the h2 Preface
347 debug!(
348 "parse error ({}) with {} bytes",
349 e,
350 self.io.read_buf().len()
351 );
352 match self.on_parse_error(e) {
353 Ok(()) => Poll::Pending, // XXX: wat?
354 Err(e) => Poll::Ready(Some(Err(e))),
355 }
356 } else {
357 debug!("read eof");
358 self.close_write();
359 Poll::Ready(None)
360 }
361 }
362
363 pub(crate) fn poll_read_body(
364 &mut self,
365 cx: &mut Context<'_>,
366 ) -> Poll<Option<io::Result<Frame<Bytes>>>> {
367 debug_assert!(self.can_read_body());
368
369 let (reading, ret) = match self.state.reading {
370 Reading::Body(ref mut decoder) => {
371 match ready!(decoder.decode(cx, &mut self.io)) {
372 Ok(frame) => {
373 if frame.is_data() {
374 let slice = frame.data_ref().unwrap_or_else(|| unreachable!());
375 let (reading, maybe_frame) = if decoder.is_eof() {
376 debug!("incoming body completed");
377 (
378 Reading::KeepAlive,
379 if !slice.is_empty() {
380 Some(Ok(frame))
381 } else {
382 None
383 },
384 )
385 } else if slice.is_empty() {
386 error!("incoming body unexpectedly ended");
387 // This should be unreachable, since all 3 decoders
388 // either set eof=true or return an Err when reading
389 // an empty slice...
390 (Reading::Closed, None)
391 } else {
392 return Poll::Ready(Some(Ok(frame)));
393 };
394 (reading, Poll::Ready(maybe_frame))
395 } else if frame.is_trailers() {
396 (Reading::Closed, Poll::Ready(Some(Ok(frame))))
397 } else {
398 trace!("discarding unknown frame");
399 (Reading::Closed, Poll::Ready(None))
400 }
401 }
402 Err(e) => {
403 debug!("incoming body decode error: {}", e);
404 (Reading::Closed, Poll::Ready(Some(Err(e))))
405 }
406 }
407 }
408 Reading::Continue(ref decoder) => {
409 // Write the 100 Continue if not already responded...
410 if let Writing::Init = self.state.writing {
411 trace!("automatically sending 100 Continue");
412 let cont = b"HTTP/1.1 100 Continue\r\n\r\n";
413 self.io.headers_buf().extend_from_slice(cont);
414 }
415
416 // And now recurse once in the Reading::Body state...
417 self.state.reading = Reading::Body(decoder.clone());
418 return self.poll_read_body(cx);
419 }
420 _ => unreachable!("poll_read_body invalid state: {:?}", self.state.reading),
421 };
422
423 self.state.reading = reading;
424 self.try_keep_alive(cx);
425 ret
426 }
427
428 pub(crate) fn wants_read_again(&mut self) -> bool {
429 let ret = self.state.notify_read;
430 self.state.notify_read = false;
431 ret
432 }
433
434 pub(crate) fn poll_read_keep_alive(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
435 debug_assert!(!self.can_read_head() && !self.can_read_body());
436
437 if self.is_read_closed() {
438 Poll::Pending
439 } else if self.is_mid_message() {
440 self.mid_message_detect_eof(cx)
441 } else {
442 self.require_empty_read(cx)
443 }
444 }
445
446 fn is_mid_message(&self) -> bool {
447 !matches!(
448 (&self.state.reading, &self.state.writing),
449 (&Reading::Init, &Writing::Init)
450 )
451 }
452
    // This will check to make sure there is nothing left to read from the
    // io object.
    //
    // This should only be called for Clients wanting to enter the idle
    // state.
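    //
    // Any unexpected bytes here are reported as an `unexpected_message`
    // error, and an unexpected EOF on a busy connection as `incomplete`.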
457 fn require_empty_read(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
458 debug_assert!(!self.can_read_head() && !self.can_read_body() && !self.is_read_closed());
459 debug_assert!(!self.is_mid_message());
460 debug_assert!(T::is_client());
461
462 if !self.io.read_buf().is_empty() {
463 debug!("received an unexpected {} bytes", self.io.read_buf().len());
464 return Poll::Ready(Err(crate::Error::new_unexpected_message()));
465 }
466
467 let num_read = ready!(self.force_io_read(cx)).map_err(crate::Error::new_io)?;
468
469 if num_read == 0 {
470 let ret = if self.should_error_on_eof() {
471 trace!("found unexpected EOF on busy connection: {:?}", self.state);
472 Poll::Ready(Err(crate::Error::new_incomplete()))
473 } else {
474 trace!("found EOF on idle connection, closing");
475 Poll::Ready(Ok(()))
476 };
477
478 // order is important: should_error needs state BEFORE close_read
479 self.state.close_read();
480 return ret;
481 }
482
483 debug!(
484 "received unexpected {} bytes on an idle connection",
485 num_read
486 );
487 Poll::Ready(Err(crate::Error::new_unexpected_message()))
488 }
489
490 fn mid_message_detect_eof(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
491 debug_assert!(!self.can_read_head() && !self.can_read_body() && !self.is_read_closed());
492 debug_assert!(self.is_mid_message());
493
494 if self.state.allow_half_close || !self.io.read_buf().is_empty() {
495 return Poll::Pending;
496 }
497
498 let num_read = ready!(self.force_io_read(cx)).map_err(crate::Error::new_io)?;
499
500 if num_read == 0 {
501 trace!("found unexpected EOF on busy connection: {:?}", self.state);
502 self.state.close_read();
503 Poll::Ready(Err(crate::Error::new_incomplete()))
504 } else {
505 Poll::Ready(Ok(()))
506 }
507 }
508
509 fn force_io_read(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
510 debug_assert!(!self.state.is_read_closed());
511
512 let result = ready!(self.io.poll_read_from_io(cx));
513 Poll::Ready(result.map_err(|e| {
514 trace!(error = %e, "force_io_read; io error");
515 self.state.close();
516 e
517 }))
518 }
519
520 fn maybe_notify(&mut self, cx: &mut Context<'_>) {
        // It's possible that we returned `Poll::Pending` from poll() without
        // having exhausted the underlying IO. We would have done this when we
        // determined we couldn't keep reading until we knew how writing
        // would finish.
525
526 match self.state.reading {
527 Reading::Continue(..) | Reading::Body(..) | Reading::KeepAlive | Reading::Closed => {
528 return
529 }
530 Reading::Init => (),
531 };
532
533 match self.state.writing {
534 Writing::Body(..) => return,
535 Writing::Init | Writing::KeepAlive | Writing::Closed => (),
536 }
537
538 if !self.io.is_read_blocked() {
539 if self.io.read_buf().is_empty() {
540 match self.io.poll_read_from_io(cx) {
541 Poll::Ready(Ok(n)) => {
542 if n == 0 {
543 trace!("maybe_notify; read eof");
544 if self.state.is_idle() {
545 self.state.close();
546 } else {
547 self.close_read()
548 }
549 return;
550 }
551 }
552 Poll::Pending => {
553 trace!("maybe_notify; read_from_io blocked");
554 return;
555 }
556 Poll::Ready(Err(e)) => {
557 trace!("maybe_notify; read_from_io error: {}", e);
558 self.state.close();
559 self.state.error = Some(crate::Error::new_io(e));
560 }
561 }
562 }
563 self.state.notify_read = true;
564 }
565 }
566
567 fn try_keep_alive(&mut self, cx: &mut Context<'_>) {
568 self.state.try_keep_alive::<T>();
569 self.maybe_notify(cx);
570 }
571
572 pub(crate) fn can_write_head(&self) -> bool {
573 if !T::should_read_first() && matches!(self.state.reading, Reading::Closed) {
574 return false;
575 }
576
577 match self.state.writing {
578 Writing::Init => self.io.can_headers_buf(),
579 _ => false,
580 }
581 }
582
583 pub(crate) fn can_write_body(&self) -> bool {
584 match self.state.writing {
585 Writing::Body(..) => true,
586 Writing::Init | Writing::KeepAlive | Writing::Closed => false,
587 }
588 }
589
590 pub(crate) fn can_buffer_body(&self) -> bool {
591 self.io.can_buffer()
592 }
593
594 pub(crate) fn write_head(&mut self, head: MessageHead<T::Outgoing>, body: Option<BodyLength>) {
595 if let Some(encoder) = self.encode_head(head, body) {
596 self.state.writing = if !encoder.is_eof() {
597 Writing::Body(encoder)
598 } else if encoder.is_last() {
599 Writing::Closed
600 } else {
601 Writing::KeepAlive
602 };
603 }
604 }
605
606 fn encode_head(
607 &mut self,
608 mut head: MessageHead<T::Outgoing>,
609 body: Option<BodyLength>,
610 ) -> Option<Encoder> {
611 debug_assert!(self.can_write_head());
612
613 if !T::should_read_first() {
614 self.state.busy();
615 }
616
617 self.enforce_version(&mut head);
618
619 let buf = self.io.headers_buf();
620 match super::role::encode_headers::<T>(
621 Encode {
622 head: &mut head,
623 body,
624 #[cfg(feature = "server")]
625 keep_alive: self.state.wants_keep_alive(),
626 req_method: &mut self.state.method,
627 title_case_headers: self.state.title_case_headers,
628 #[cfg(feature = "server")]
629 date_header: self.state.date_header,
630 },
631 buf,
632 ) {
633 Ok(encoder) => {
634 debug_assert!(self.state.cached_headers.is_none());
635 debug_assert!(head.headers.is_empty());
636 self.state.cached_headers = Some(head.headers);
637
638 #[cfg(feature = "client")]
639 {
640 self.state.on_informational =
641 head.extensions.remove::<crate::ext::OnInformational>();
642 }
643
644 Some(encoder)
645 }
646 Err(err) => {
647 self.state.error = Some(err);
648 self.state.writing = Writing::Closed;
649 None
650 }
651 }
652 }
653
654 // Fix keep-alive when Connection: keep-alive header is not present
655 fn fix_keep_alive(&mut self, head: &mut MessageHead<T::Outgoing>) {
656 let outgoing_is_keep_alive = head
657 .headers
658 .get(CONNECTION)
659 .map_or(false, headers::connection_keep_alive);
660
661 if !outgoing_is_keep_alive {
662 match head.version {
663 // If response is version 1.0 and keep-alive is not present in the response,
664 // disable keep-alive so the server closes the connection
665 Version::HTTP_10 => self.state.disable_keep_alive(),
666 // If response is version 1.1 and keep-alive is wanted, add
667 // Connection: keep-alive header when not present
668 Version::HTTP_11 => {
669 if self.state.wants_keep_alive() {
670 head.headers
671 .insert(CONNECTION, HeaderValue::from_static("keep-alive"));
672 }
673 }
674 _ => (),
675 }
676 }
677 }
678
679 // If we know the remote speaks an older version, we try to fix up any messages
680 // to work with our older peer.
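    //
    // For example, when the peer only speaks HTTP/1.0, the outgoing head is
    // downgraded to version 1.0 and its keep-alive headers are fixed up,
    // while an HTTP/1.1 head gets `Connection: close` added once keep-alive
    // has been disabled.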
681 fn enforce_version(&mut self, head: &mut MessageHead<T::Outgoing>) {
682 match self.state.version {
683 Version::HTTP_10 => {
684 // Fixes response or connection when keep-alive header is not present
685 self.fix_keep_alive(head);
                // If the remote only knows HTTP/1.0, we should force ourselves
                // to only speak HTTP/1.0 as well.
688 head.version = Version::HTTP_10;
689 }
690 Version::HTTP_11 => {
691 if let KA::Disabled = self.state.keep_alive.status() {
692 head.headers
693 .insert(CONNECTION, HeaderValue::from_static("close"));
694 }
695 }
696 _ => (),
697 }
698 // If the remote speaks HTTP/1.1, then it *should* be fine with
699 // both HTTP/1.0 and HTTP/1.1 from us. So again, we just let
700 // the user's headers be.
701 }
702
703 pub(crate) fn write_body(&mut self, chunk: B) {
704 debug_assert!(self.can_write_body() && self.can_buffer_body());
705 // empty chunks should be discarded at Dispatcher level
706 debug_assert!(chunk.remaining() != 0);
707
708 let state = match self.state.writing {
709 Writing::Body(ref mut encoder) => {
710 self.io.buffer(encoder.encode(chunk));
711
712 if !encoder.is_eof() {
713 return;
714 }
715
716 if encoder.is_last() {
717 Writing::Closed
718 } else {
719 Writing::KeepAlive
720 }
721 }
722 _ => unreachable!("write_body invalid state: {:?}", self.state.writing),
723 };
724
725 self.state.writing = state;
726 }
727
728 pub(crate) fn write_trailers(&mut self, trailers: HeaderMap) {
729 if T::is_server() && !self.state.allow_trailer_fields {
730 debug!("trailers not allowed to be sent");
731 return;
732 }
733 debug_assert!(self.can_write_body() && self.can_buffer_body());
734
735 match self.state.writing {
736 Writing::Body(ref encoder) => {
737 if let Some(enc_buf) =
738 encoder.encode_trailers(trailers, self.state.title_case_headers)
739 {
740 self.io.buffer(enc_buf);
741
742 self.state.writing = if encoder.is_last() || encoder.is_close_delimited() {
743 Writing::Closed
744 } else {
745 Writing::KeepAlive
746 };
747 }
748 }
749 _ => unreachable!("write_trailers invalid state: {:?}", self.state.writing),
750 }
751 }
752
753 pub(crate) fn write_body_and_end(&mut self, chunk: B) {
754 debug_assert!(self.can_write_body() && self.can_buffer_body());
755 // empty chunks should be discarded at Dispatcher level
756 debug_assert!(chunk.remaining() != 0);
757
758 let state = match self.state.writing {
759 Writing::Body(ref encoder) => {
760 let can_keep_alive = encoder.encode_and_end(chunk, self.io.write_buf());
761 if can_keep_alive {
762 Writing::KeepAlive
763 } else {
764 Writing::Closed
765 }
766 }
767 _ => unreachable!("write_body invalid state: {:?}", self.state.writing),
768 };
769
770 self.state.writing = state;
771 }
772
773 pub(crate) fn end_body(&mut self) -> crate::Result<()> {
774 debug_assert!(self.can_write_body());
775
776 let encoder = match self.state.writing {
777 Writing::Body(ref mut enc) => enc,
778 _ => return Ok(()),
779 };
780
        // The body stream has ended, so ask the encoder to finish (EOF).
782 match encoder.end() {
783 Ok(end) => {
784 if let Some(end) = end {
785 self.io.buffer(end);
786 }
787
788 self.state.writing = if encoder.is_last() || encoder.is_close_delimited() {
789 Writing::Closed
790 } else {
791 Writing::KeepAlive
792 };
793
794 Ok(())
795 }
796 Err(not_eof) => {
797 self.state.writing = Writing::Closed;
798 Err(crate::Error::new_body_write_aborted().with(not_eof))
799 }
800 }
801 }
802
803 // When we get a parse error, depending on what side we are, we might be able
804 // to write a response before closing the connection.
805 //
806 // - Client: there is nothing we can do
807 // - Server: if Response hasn't been written yet, we can send a 4xx response
808 fn on_parse_error(&mut self, err: crate::Error) -> crate::Result<()> {
809 if let Writing::Init = self.state.writing {
810 if self.has_h2_prefix() {
811 return Err(crate::Error::new_version_h2());
812 }
813 if let Some(msg) = T::on_error(&err) {
814 // Drop the cached headers so as to not trigger a debug
815 // assert in `write_head`...
816 self.state.cached_headers.take();
817 self.write_head(msg, None);
818 self.state.error = Some(err);
819 return Ok(());
820 }
821 }
822
        // Otherwise, fall back to passing the error back up.
824 Err(err)
825 }
826
827 pub(crate) fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
828 ready!(Pin::new(&mut self.io).poll_flush(cx))?;
829 self.try_keep_alive(cx);
830 trace!("flushed({}): {:?}", T::LOG, self.state);
831 Poll::Ready(Ok(()))
832 }
833
834 pub(crate) fn poll_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
835 match ready!(Pin::new(self.io.io_mut()).poll_shutdown(cx)) {
836 Ok(()) => {
837 trace!("shut down IO complete");
838 Poll::Ready(Ok(()))
839 }
840 Err(e) => {
841 debug!("error shutting down IO: {}", e);
842 Poll::Ready(Err(e))
843 }
844 }
845 }
846
847 /// If the read side can be cheaply drained, do so. Otherwise, close.
848 pub(super) fn poll_drain_or_close_read(&mut self, cx: &mut Context<'_>) {
849 if let Reading::Continue(ref decoder) = self.state.reading {
850 // skip sending the 100-continue
851 // just move forward to a read, in case a tiny body was included
852 self.state.reading = Reading::Body(decoder.clone());
853 }
854
855 let _ = self.poll_read_body(cx);
856
857 // If still in Reading::Body, just give up
858 match self.state.reading {
859 Reading::Init | Reading::KeepAlive => {
860 trace!("body drained")
861 }
862 _ => self.close_read(),
863 }
864 }
865
866 pub(crate) fn close_read(&mut self) {
867 self.state.close_read();
868 }
869
870 pub(crate) fn close_write(&mut self) {
871 self.state.close_write();
872 }
873
874 #[cfg(feature = "server")]
875 pub(crate) fn disable_keep_alive(&mut self) {
876 if self.state.is_idle() {
877 trace!("disable_keep_alive; closing idle connection");
878 self.state.close();
879 } else {
880 trace!("disable_keep_alive; in-progress connection");
881 self.state.disable_keep_alive();
882 }
883 }
884
885 pub(crate) fn take_error(&mut self) -> crate::Result<()> {
886 if let Some(err) = self.state.error.take() {
887 Err(err)
888 } else {
889 Ok(())
890 }
891 }
892
893 pub(super) fn on_upgrade(&mut self) -> crate::upgrade::OnUpgrade {
894 trace!("{}: prepare possible HTTP upgrade", T::LOG);
895 self.state.prepare_upgrade()
896 }
897}
898
impl<I, B: Buf, T> fmt::Debug for Conn<I, B, T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Conn")
            .field("state", &self.state)
            .field("io", &self.io)
            .finish()
    }
}
907
908// B and T are never pinned
909impl<I: Unpin, B, T> Unpin for Conn<I, B, T> {}
910
911struct State {
912 allow_half_close: bool,
913 /// Re-usable HeaderMap to reduce allocating new ones.
914 cached_headers: Option<HeaderMap>,
915 /// If an error occurs when there wasn't a direct way to return it
916 /// back to the user, this is set.
917 error: Option<crate::Error>,
918 /// Current keep-alive status.
919 keep_alive: KA,
920 /// If mid-message, the HTTP Method that started it.
921 ///
922 /// This is used to know things such as if the message can include
923 /// a body or not.
924 method: Option<Method>,
925 h1_parser_config: ParserConfig,
926 h1_max_headers: Option<usize>,
927 #[cfg(feature = "server")]
928 h1_header_read_timeout: Option<Duration>,
929 #[cfg(feature = "server")]
930 h1_header_read_timeout_fut: Option<Pin<Box<dyn Sleep>>>,
931 #[cfg(feature = "server")]
932 h1_header_read_timeout_running: bool,
933 #[cfg(feature = "server")]
934 date_header: bool,
935 #[cfg(feature = "server")]
936 timer: Time,
937 preserve_header_case: bool,
938 #[cfg(feature = "ffi")]
939 preserve_header_order: bool,
940 title_case_headers: bool,
941 h09_responses: bool,
942 /// If set, called with each 1xx informational response received for
943 /// the current request. MUST be unset after a non-1xx response is
944 /// received.
945 #[cfg(feature = "client")]
946 on_informational: Option<crate::ext::OnInformational>,
947 /// Set to true when the Dispatcher should poll read operations
948 /// again. See the `maybe_notify` method for more.
949 notify_read: bool,
950 /// State of allowed reads
951 reading: Reading,
952 /// State of allowed writes
953 writing: Writing,
954 /// An expected pending HTTP upgrade.
955 upgrade: Option<crate::upgrade::Pending>,
956 /// Either HTTP/1.0 or 1.1 connection
957 version: Version,
958 /// Flag to track if trailer fields are allowed to be sent
959 allow_trailer_fields: bool,
960}
961
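// The per-message read state machine: `Init` until a head is parsed, then
// `Continue`/`Body` while a body is expected, `KeepAlive` once the body has
// finished, and `Closed` when the read side is done for good.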
962#[derive(Debug)]
963enum Reading {
964 Init,
965 Continue(Decoder),
966 Body(Decoder),
967 KeepAlive,
968 Closed,
969}
970
971enum Writing {
972 Init,
973 Body(Encoder),
974 KeepAlive,
975 Closed,
976}
977
impl fmt::Debug for State {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("State");
        builder
            .field("reading", &self.reading)
            .field("writing", &self.writing)
            .field("keep_alive", &self.keep_alive);

        // Only show error field if it's interesting...
        if let Some(ref error) = self.error {
            builder.field("error", error);
        }

        if self.allow_half_close {
            builder.field("allow_half_close", &true);
        }

        // Purposefully leaving off other fields..

        builder.finish()
    }
}
1000
impl fmt::Debug for Writing {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match *self {
            Writing::Init => f.write_str("Init"),
            Writing::Body(ref enc) => f.debug_tuple("Body").field(enc).finish(),
            Writing::KeepAlive => f.write_str("KeepAlive"),
            Writing::Closed => f.write_str("Closed"),
        }
    }
}
1011
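// `conn.state.keep_alive &= msg.keep_alive` uses this impl: the remote can
// only ever turn keep-alive off (e.g. via `Connection: close`), never back on.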
1012impl std::ops::BitAndAssign<bool> for KA {
1013 fn bitand_assign(&mut self, enabled: bool) {
1014 if !enabled {
1015 trace!("remote disabling keep-alive");
1016 *self = KA::Disabled;
1017 }
1018 }
1019}
1020
1021#[derive(Clone, Copy, Debug, Default)]
1022enum KA {
1023 Idle,
1024 #[default]
1025 Busy,
1026 Disabled,
1027}
1028
1029impl KA {
1030 fn idle(&mut self) {
1031 *self = KA::Idle;
1032 }
1033
1034 fn busy(&mut self) {
1035 *self = KA::Busy;
1036 }
1037
1038 fn disable(&mut self) {
1039 *self = KA::Disabled;
1040 }
1041
1042 fn status(&self) -> KA {
1043 *self
1044 }
1045}
1046
1047impl State {
1048 fn close(&mut self) {
1049 trace!("State::close()");
1050 self.reading = Reading::Closed;
1051 self.writing = Writing::Closed;
1052 self.keep_alive.disable();
1053 }
1054
1055 fn close_read(&mut self) {
1056 trace!("State::close_read()");
1057 self.reading = Reading::Closed;
1058 self.keep_alive.disable();
1059 }
1060
1061 fn close_write(&mut self) {
1062 trace!("State::close_write()");
1063 self.writing = Writing::Closed;
1064 self.keep_alive.disable();
1065 }
1066
1067 fn wants_keep_alive(&self) -> bool {
1068 !matches!(self.keep_alive.status(), KA::Disabled)
1069 }
1070
1071 fn try_keep_alive<T: Http1Transaction>(&mut self) {
1072 match (&self.reading, &self.writing) {
1073 (&Reading::KeepAlive, &Writing::KeepAlive) => {
1074 if let KA::Busy = self.keep_alive.status() {
1075 self.idle::<T>();
1076 } else {
1077 trace!(
1078 "try_keep_alive({}): could keep-alive, but status = {:?}",
1079 T::LOG,
1080 self.keep_alive
1081 );
1082 self.close();
1083 }
1084 }
1085 (&Reading::Closed, &Writing::KeepAlive) | (&Reading::KeepAlive, &Writing::Closed) => {
1086 self.close()
1087 }
1088 _ => (),
1089 }
1090 }
1091
1092 fn disable_keep_alive(&mut self) {
1093 self.keep_alive.disable()
1094 }
1095
1096 fn busy(&mut self) {
1097 if let KA::Disabled = self.keep_alive.status() {
1098 return;
1099 }
1100 self.keep_alive.busy();
1101 }
1102
1103 fn idle<T: Http1Transaction>(&mut self) {
1104 debug_assert!(!self.is_idle(), "State::idle() called while idle");
1105
1106 self.method = None;
1107 self.keep_alive.idle();
1108
1109 if !self.is_idle() {
1110 self.close();
1111 return;
1112 }
1113
1114 self.reading = Reading::Init;
1115 self.writing = Writing::Init;
1116
1117 // !T::should_read_first() means Client.
1118 //
1119 // If Client connection has just gone idle, the Dispatcher
1120 // should try the poll loop one more time, so as to poll the
1121 // pending requests stream.
1122 if !T::should_read_first() {
1123 self.notify_read = true;
1124 }
1125
1126 #[cfg(feature = "server")]
1127 if self.h1_header_read_timeout.is_some() {
1128 // Next read will start and poll the header read timeout,
1129 // so we can close the connection if another header isn't
1130 // received in a timely manner.
1131 self.notify_read = true;
1132 }
1133 }
1134
1135 fn is_idle(&self) -> bool {
1136 matches!(self.keep_alive.status(), KA::Idle)
1137 }
1138
1139 fn is_read_closed(&self) -> bool {
1140 matches!(self.reading, Reading::Closed)
1141 }
1142
1143 fn is_write_closed(&self) -> bool {
1144 matches!(self.writing, Writing::Closed)
1145 }
1146
1147 fn prepare_upgrade(&mut self) -> crate::upgrade::OnUpgrade {
1148 let (tx, rx) = crate::upgrade::pending();
1149 self.upgrade = Some(tx);
1150 rx
1151 }
1152}
1153
1154#[cfg(test)]
1155mod tests {
1156 #[cfg(all(feature = "nightly", not(miri)))]
1157 #[bench]
1158 fn bench_read_head_short(b: &mut ::test::Bencher) {
1159 use super::*;
1160 use crate::common::io::Compat;
1161 let s = b"GET / HTTP/1.1\r\nHost: localhost:8080\r\n\r\n";
1162 let len = s.len();
1163 b.bytes = len as u64;
1164
1165 // an empty IO, we'll be skipping and using the read buffer anyways
1166 let io = Compat(tokio_test::io::Builder::new().build());
1167 let mut conn = Conn::<_, bytes::Bytes, crate::proto::h1::ServerTransaction>::new(io);
1168 *conn.io.read_buf_mut() = ::bytes::BytesMut::from(&s[..]);
1169 conn.state.cached_headers = Some(HeaderMap::with_capacity(2));
1170
1171 let rt = tokio::runtime::Builder::new_current_thread()
1172 .enable_all()
1173 .build()
1174 .unwrap();
1175
1176 b.iter(|| {
1177 rt.block_on(futures_util::future::poll_fn(|cx| {
1178 match conn.poll_read_head(cx) {
1179 Poll::Ready(Some(Ok(x))) => {
1180 ::test::black_box(&x);
1181 let mut headers = x.0.headers;
1182 headers.clear();
1183 conn.state.cached_headers = Some(headers);
1184 }
1185 f => panic!("expected Ready(Some(Ok(..))): {:?}", f),
1186 }
1187
1188 conn.io.read_buf_mut().reserve(1);
1189 unsafe {
1190 conn.io.read_buf_mut().set_len(len);
1191 }
1192 conn.state.reading = Reading::Init;
1193 Poll::Ready(())
1194 }));
1195 });
1196 }
1197
1198 /*
1199 //TODO: rewrite these using dispatch... someday...
1200 use futures::{Async, Future, Stream, Sink};
1201 use futures::future;
1202
1203 use proto::{self, ClientTransaction, MessageHead, ServerTransaction};
1204 use super::super::Encoder;
1205 use mock::AsyncIo;
1206
1207 use super::{Conn, Decoder, Reading, Writing};
1208 use ::uri::Uri;
1209
1210 use std::str::FromStr;
1211
1212 #[test]
1213 fn test_conn_init_read() {
1214 let good_message = b"GET / HTTP/1.1\r\n\r\n".to_vec();
1215 let len = good_message.len();
1216 let io = AsyncIo::new_buf(good_message, len);
1217 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1218
1219 match conn.poll().unwrap() {
1220 Async::Ready(Some(Frame::Message { message, body: false })) => {
1221 assert_eq!(message, MessageHead {
1222 subject: ::proto::RequestLine(::Get, Uri::from_str("/").unwrap()),
1223 .. MessageHead::default()
1224 })
1225 },
1226 f => panic!("frame is not Frame::Message: {:?}", f)
1227 }
1228 }
1229
1230 #[test]
1231 fn test_conn_parse_partial() {
1232 let _: Result<(), ()> = future::lazy(|| {
1233 let good_message = b"GET / HTTP/1.1\r\nHost: foo.bar\r\n\r\n".to_vec();
1234 let io = AsyncIo::new_buf(good_message, 10);
1235 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1236 assert!(conn.poll().unwrap().is_not_ready());
1237 conn.io.io_mut().block_in(50);
1238 let async = conn.poll().unwrap();
1239 assert!(async.is_ready());
1240 match async {
1241 Async::Ready(Some(Frame::Message { .. })) => (),
1242 f => panic!("frame is not Message: {:?}", f),
1243 }
1244 Ok(())
1245 }).wait();
1246 }
1247
1248 #[test]
1249 fn test_conn_init_read_eof_idle() {
1250 let io = AsyncIo::new_buf(vec![], 1);
1251 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1252 conn.state.idle();
1253
1254 match conn.poll().unwrap() {
1255 Async::Ready(None) => {},
1256 other => panic!("frame is not None: {:?}", other)
1257 }
1258 }
1259
1260 #[test]
1261 fn test_conn_init_read_eof_idle_partial_parse() {
1262 let io = AsyncIo::new_buf(b"GET / HTTP/1.1".to_vec(), 100);
1263 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1264 conn.state.idle();
1265
1266 match conn.poll() {
1267 Err(ref err) if err.kind() == std::io::ErrorKind::UnexpectedEof => {},
1268 other => panic!("unexpected frame: {:?}", other)
1269 }
1270 }
1271
1272 #[test]
1273 fn test_conn_init_read_eof_busy() {
1274 let _: Result<(), ()> = future::lazy(|| {
1275 // server ignores
1276 let io = AsyncIo::new_eof();
1277 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1278 conn.state.busy();
1279
1280 match conn.poll().unwrap() {
1281 Async::Ready(None) => {},
1282 other => panic!("unexpected frame: {:?}", other)
1283 }
1284
1285 // client
1286 let io = AsyncIo::new_eof();
1287 let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io);
1288 conn.state.busy();
1289
1290 match conn.poll() {
1291 Err(ref err) if err.kind() == std::io::ErrorKind::UnexpectedEof => {},
1292 other => panic!("unexpected frame: {:?}", other)
1293 }
1294 Ok(())
1295 }).wait();
1296 }
1297
1298 #[test]
1299 fn test_conn_body_finish_read_eof() {
1300 let _: Result<(), ()> = future::lazy(|| {
1301 let io = AsyncIo::new_eof();
1302 let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io);
1303 conn.state.busy();
1304 conn.state.writing = Writing::KeepAlive;
1305 conn.state.reading = Reading::Body(Decoder::length(0));
1306
1307 match conn.poll() {
1308 Ok(Async::Ready(Some(Frame::Body { chunk: None }))) => (),
1309 other => panic!("unexpected frame: {:?}", other)
1310 }
1311
1312 // conn eofs, but tokio-proto will call poll() again, before calling flush()
1313 // the conn eof in this case is perfectly fine
1314
1315 match conn.poll() {
1316 Ok(Async::Ready(None)) => (),
1317 other => panic!("unexpected frame: {:?}", other)
1318 }
1319 Ok(())
1320 }).wait();
1321 }
1322
1323 #[test]
1324 fn test_conn_message_empty_body_read_eof() {
1325 let _: Result<(), ()> = future::lazy(|| {
1326 let io = AsyncIo::new_buf(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n".to_vec(), 1024);
1327 let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io);
1328 conn.state.busy();
1329 conn.state.writing = Writing::KeepAlive;
1330
1331 match conn.poll() {
1332 Ok(Async::Ready(Some(Frame::Message { body: false, .. }))) => (),
1333 other => panic!("unexpected frame: {:?}", other)
1334 }
1335
1336 // conn eofs, but tokio-proto will call poll() again, before calling flush()
1337 // the conn eof in this case is perfectly fine
1338
1339 match conn.poll() {
1340 Ok(Async::Ready(None)) => (),
1341 other => panic!("unexpected frame: {:?}", other)
1342 }
1343 Ok(())
1344 }).wait();
1345 }
1346
1347 #[test]
1348 fn test_conn_read_body_end() {
1349 let _: Result<(), ()> = future::lazy(|| {
1350 let io = AsyncIo::new_buf(b"POST / HTTP/1.1\r\nContent-Length: 5\r\n\r\n12345".to_vec(), 1024);
1351 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1352 conn.state.busy();
1353
1354 match conn.poll() {
1355 Ok(Async::Ready(Some(Frame::Message { body: true, .. }))) => (),
1356 other => panic!("unexpected frame: {:?}", other)
1357 }
1358
1359 match conn.poll() {
1360 Ok(Async::Ready(Some(Frame::Body { chunk: Some(_) }))) => (),
1361 other => panic!("unexpected frame: {:?}", other)
1362 }
1363
1364 // When the body is done, `poll` MUST return a `Body` frame with chunk set to `None`
1365 match conn.poll() {
1366 Ok(Async::Ready(Some(Frame::Body { chunk: None }))) => (),
1367 other => panic!("unexpected frame: {:?}", other)
1368 }
1369
1370 match conn.poll() {
1371 Ok(Async::NotReady) => (),
1372 other => panic!("unexpected frame: {:?}", other)
1373 }
1374 Ok(())
1375 }).wait();
1376 }
1377
1378 #[test]
1379 fn test_conn_closed_read() {
1380 let io = AsyncIo::new_buf(vec![], 0);
1381 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1382 conn.state.close();
1383
1384 match conn.poll().unwrap() {
1385 Async::Ready(None) => {},
1386 other => panic!("frame is not None: {:?}", other)
1387 }
1388 }
1389
1390 #[test]
1391 fn test_conn_body_write_length() {
1392 let _ = pretty_env_logger::try_init();
1393 let _: Result<(), ()> = future::lazy(|| {
1394 let io = AsyncIo::new_buf(vec![], 0);
1395 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1396 let max = super::super::io::DEFAULT_MAX_BUFFER_SIZE + 4096;
1397 conn.state.writing = Writing::Body(Encoder::length((max * 2) as u64));
1398
1399 assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'a'; max].into()) }).unwrap().is_ready());
1400 assert!(!conn.can_buffer_body());
1401
1402 assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'b'; 1024 * 8].into()) }).unwrap().is_not_ready());
1403
1404 conn.io.io_mut().block_in(1024 * 3);
1405 assert!(conn.poll_complete().unwrap().is_not_ready());
1406 conn.io.io_mut().block_in(1024 * 3);
1407 assert!(conn.poll_complete().unwrap().is_not_ready());
1408 conn.io.io_mut().block_in(max * 2);
1409 assert!(conn.poll_complete().unwrap().is_ready());
1410
1411 assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'c'; 1024 * 8].into()) }).unwrap().is_ready());
1412 Ok(())
1413 }).wait();
1414 }
1415
1416 #[test]
1417 fn test_conn_body_write_chunked() {
1418 let _: Result<(), ()> = future::lazy(|| {
1419 let io = AsyncIo::new_buf(vec![], 4096);
1420 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1421 conn.state.writing = Writing::Body(Encoder::chunked());
1422
1423 assert!(conn.start_send(Frame::Body { chunk: Some("headers".into()) }).unwrap().is_ready());
1424 assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'x'; 8192].into()) }).unwrap().is_ready());
1425 Ok(())
1426 }).wait();
1427 }
1428
1429 #[test]
1430 fn test_conn_body_flush() {
1431 let _: Result<(), ()> = future::lazy(|| {
1432 let io = AsyncIo::new_buf(vec![], 1024 * 1024 * 5);
1433 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1434 conn.state.writing = Writing::Body(Encoder::length(1024 * 1024));
1435 assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'a'; 1024 * 1024].into()) }).unwrap().is_ready());
1436 assert!(!conn.can_buffer_body());
1437 conn.io.io_mut().block_in(1024 * 1024 * 5);
1438 assert!(conn.poll_complete().unwrap().is_ready());
1439 assert!(conn.can_buffer_body());
1440 assert!(conn.io.io_mut().flushed());
1441
1442 Ok(())
1443 }).wait();
1444 }
1445
1446 #[test]
1447 fn test_conn_parking() {
1448 use std::sync::Arc;
1449 use futures::executor::Notify;
1450 use futures::executor::NotifyHandle;
1451
1452 struct Car {
1453 permit: bool,
1454 }
1455 impl Notify for Car {
1456 fn notify(&self, _id: usize) {
1457 assert!(self.permit, "unparked without permit");
1458 }
1459 }
1460
1461 fn car(permit: bool) -> NotifyHandle {
1462 Arc::new(Car {
1463 permit: permit,
1464 }).into()
1465 }
1466
1467 // test that once writing is done, unparks
1468 let f = future::lazy(|| {
1469 let io = AsyncIo::new_buf(vec![], 4096);
1470 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1471 conn.state.reading = Reading::KeepAlive;
1472 assert!(conn.poll().unwrap().is_not_ready());
1473
1474 conn.state.writing = Writing::KeepAlive;
1475 assert!(conn.poll_complete().unwrap().is_ready());
1476 Ok::<(), ()>(())
1477 });
1478 ::futures::executor::spawn(f).poll_future_notify(&car(true), 0).unwrap();
1479
1480
1481 // test that flushing when not waiting on read doesn't unpark
1482 let f = future::lazy(|| {
1483 let io = AsyncIo::new_buf(vec![], 4096);
1484 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1485 conn.state.writing = Writing::KeepAlive;
1486 assert!(conn.poll_complete().unwrap().is_ready());
1487 Ok::<(), ()>(())
1488 });
1489 ::futures::executor::spawn(f).poll_future_notify(&car(false), 0).unwrap();
1490
1491
1492 // test that flushing and writing isn't done doesn't unpark
1493 let f = future::lazy(|| {
1494 let io = AsyncIo::new_buf(vec![], 4096);
1495 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1496 conn.state.reading = Reading::KeepAlive;
1497 assert!(conn.poll().unwrap().is_not_ready());
1498 conn.state.writing = Writing::Body(Encoder::length(5_000));
1499 assert!(conn.poll_complete().unwrap().is_ready());
1500 Ok::<(), ()>(())
1501 });
1502 ::futures::executor::spawn(f).poll_future_notify(&car(false), 0).unwrap();
1503 }
1504
1505 #[test]
1506 fn test_conn_closed_write() {
1507 let io = AsyncIo::new_buf(vec![], 0);
1508 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1509 conn.state.close();
1510
1511 match conn.start_send(Frame::Body { chunk: Some(b"foobar".to_vec().into()) }) {
1512 Err(_e) => {},
1513 other => panic!("did not return Err: {:?}", other)
1514 }
1515
1516 assert!(conn.state.is_write_closed());
1517 }
1518
1519 #[test]
1520 fn test_conn_write_empty_chunk() {
1521 let io = AsyncIo::new_buf(vec![], 0);
1522 let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1523 conn.state.writing = Writing::KeepAlive;
1524
1525 assert!(conn.start_send(Frame::Body { chunk: None }).unwrap().is_ready());
1526 assert!(conn.start_send(Frame::Body { chunk: Some(Vec::new().into()) }).unwrap().is_ready());
1527 conn.start_send(Frame::Body { chunk: Some(vec![b'a'].into()) }).unwrap_err();
1528 }
1529 */
1530}
1531