1 | use std::fmt; |
2 | use std::io; |
3 | use std::marker::PhantomData; |
4 | #[cfg (all(feature = "server" , feature = "runtime" ))] |
5 | use std::time::Duration; |
6 | |
7 | use bytes::{Buf, Bytes}; |
8 | use http::header::{HeaderValue, CONNECTION}; |
9 | use http::{HeaderMap, Method, Version}; |
10 | use httparse::ParserConfig; |
11 | use tokio::io::{AsyncRead, AsyncWrite}; |
12 | #[cfg (all(feature = "server" , feature = "runtime" ))] |
13 | use tokio::time::Sleep; |
14 | use tracing::{debug, error, trace}; |
15 | |
16 | use super::io::Buffered; |
17 | use super::{Decoder, Encode, EncodedBuf, Encoder, Http1Transaction, ParseContext, Wants}; |
18 | use crate::body::DecodedLength; |
19 | use crate::common::{task, Pin, Poll, Unpin}; |
20 | use crate::headers::connection_keep_alive; |
21 | use crate::proto::{BodyLength, MessageHead}; |
22 | |
23 | const H2_PREFACE: &[u8] = b"PRI * HTTP/2.0 \r\n\r\nSM \r\n\r\n" ; |
24 | |
25 | /// This handles a connection, which will have been established over an |
26 | /// `AsyncRead + AsyncWrite` (like a socket), and will likely include multiple |
27 | /// `Transaction`s over HTTP. |
28 | /// |
29 | /// The connection will determine when a message begins and ends as well as |
30 | /// determine if this connection can be kept alive after the message, |
31 | /// or if it is complete. |
32 | pub(crate) struct Conn<I, B, T> { |
33 | io: Buffered<I, EncodedBuf<B>>, |
34 | state: State, |
35 | _marker: PhantomData<fn(T)>, |
36 | } |
37 | |
38 | impl<I, B, T> Conn<I, B, T> |
39 | where |
40 | I: AsyncRead + AsyncWrite + Unpin, |
41 | B: Buf, |
42 | T: Http1Transaction, |
43 | { |
44 | pub(crate) fn new(io: I) -> Conn<I, B, T> { |
45 | Conn { |
46 | io: Buffered::new(io), |
47 | state: State { |
48 | allow_half_close: false, |
49 | cached_headers: None, |
50 | error: None, |
51 | keep_alive: KA::Busy, |
52 | method: None, |
53 | h1_parser_config: ParserConfig::default(), |
54 | #[cfg (all(feature = "server" , feature = "runtime" ))] |
55 | h1_header_read_timeout: None, |
56 | #[cfg (all(feature = "server" , feature = "runtime" ))] |
57 | h1_header_read_timeout_fut: None, |
58 | #[cfg (all(feature = "server" , feature = "runtime" ))] |
59 | h1_header_read_timeout_running: false, |
60 | preserve_header_case: false, |
61 | #[cfg (feature = "ffi" )] |
62 | preserve_header_order: false, |
63 | title_case_headers: false, |
64 | h09_responses: false, |
65 | #[cfg (feature = "ffi" )] |
66 | on_informational: None, |
67 | #[cfg (feature = "ffi" )] |
68 | raw_headers: false, |
69 | notify_read: false, |
70 | reading: Reading::Init, |
71 | writing: Writing::Init, |
72 | upgrade: None, |
73 | // We assume a modern world where the remote speaks HTTP/1.1. |
74 | // If they tell us otherwise, we'll downgrade in `read_head`. |
75 | version: Version::HTTP_11, |
76 | }, |
77 | _marker: PhantomData, |
78 | } |
79 | } |
80 | |
81 | #[cfg (feature = "server" )] |
82 | pub(crate) fn set_flush_pipeline(&mut self, enabled: bool) { |
83 | self.io.set_flush_pipeline(enabled); |
84 | } |
85 | |
86 | pub(crate) fn set_write_strategy_queue(&mut self) { |
87 | self.io.set_write_strategy_queue(); |
88 | } |
89 | |
90 | pub(crate) fn set_max_buf_size(&mut self, max: usize) { |
91 | self.io.set_max_buf_size(max); |
92 | } |
93 | |
94 | #[cfg (feature = "client" )] |
95 | pub(crate) fn set_read_buf_exact_size(&mut self, sz: usize) { |
96 | self.io.set_read_buf_exact_size(sz); |
97 | } |
98 | |
99 | pub(crate) fn set_write_strategy_flatten(&mut self) { |
100 | self.io.set_write_strategy_flatten(); |
101 | } |
102 | |
103 | #[cfg (feature = "client" )] |
104 | pub(crate) fn set_h1_parser_config(&mut self, parser_config: ParserConfig) { |
105 | self.state.h1_parser_config = parser_config; |
106 | } |
107 | |
108 | pub(crate) fn set_title_case_headers(&mut self) { |
109 | self.state.title_case_headers = true; |
110 | } |
111 | |
112 | pub(crate) fn set_preserve_header_case(&mut self) { |
113 | self.state.preserve_header_case = true; |
114 | } |
115 | |
116 | #[cfg (feature = "ffi" )] |
117 | pub(crate) fn set_preserve_header_order(&mut self) { |
118 | self.state.preserve_header_order = true; |
119 | } |
120 | |
121 | #[cfg (feature = "client" )] |
122 | pub(crate) fn set_h09_responses(&mut self) { |
123 | self.state.h09_responses = true; |
124 | } |
125 | |
126 | #[cfg (all(feature = "server" , feature = "runtime" ))] |
127 | pub(crate) fn set_http1_header_read_timeout(&mut self, val: Duration) { |
128 | self.state.h1_header_read_timeout = Some(val); |
129 | } |
130 | |
131 | #[cfg (feature = "server" )] |
132 | pub(crate) fn set_allow_half_close(&mut self) { |
133 | self.state.allow_half_close = true; |
134 | } |
135 | |
136 | #[cfg (feature = "ffi" )] |
137 | pub(crate) fn set_raw_headers(&mut self, enabled: bool) { |
138 | self.state.raw_headers = enabled; |
139 | } |
140 | |
141 | pub(crate) fn into_inner(self) -> (I, Bytes) { |
142 | self.io.into_inner() |
143 | } |
144 | |
145 | pub(crate) fn pending_upgrade(&mut self) -> Option<crate::upgrade::Pending> { |
146 | self.state.upgrade.take() |
147 | } |
148 | |
149 | pub(crate) fn is_read_closed(&self) -> bool { |
150 | self.state.is_read_closed() |
151 | } |
152 | |
153 | pub(crate) fn is_write_closed(&self) -> bool { |
154 | self.state.is_write_closed() |
155 | } |
156 | |
157 | pub(crate) fn can_read_head(&self) -> bool { |
158 | if !matches!(self.state.reading, Reading::Init) { |
159 | return false; |
160 | } |
161 | |
162 | if T::should_read_first() { |
163 | return true; |
164 | } |
165 | |
166 | !matches!(self.state.writing, Writing::Init) |
167 | } |
168 | |
169 | pub(crate) fn can_read_body(&self) -> bool { |
170 | match self.state.reading { |
171 | Reading::Body(..) | Reading::Continue(..) => true, |
172 | _ => false, |
173 | } |
174 | } |
175 | |
176 | fn should_error_on_eof(&self) -> bool { |
177 | // If we're idle, it's probably just the connection closing gracefully. |
178 | T::should_error_on_parse_eof() && !self.state.is_idle() |
179 | } |
180 | |
181 | fn has_h2_prefix(&self) -> bool { |
182 | let read_buf = self.io.read_buf(); |
183 | read_buf.len() >= 24 && read_buf[..24] == *H2_PREFACE |
184 | } |
185 | |
186 | pub(super) fn poll_read_head( |
187 | &mut self, |
188 | cx: &mut task::Context<'_>, |
189 | ) -> Poll<Option<crate::Result<(MessageHead<T::Incoming>, DecodedLength, Wants)>>> { |
190 | debug_assert!(self.can_read_head()); |
191 | trace!("Conn::read_head" ); |
192 | |
193 | let msg = match ready!(self.io.parse::<T>( |
194 | cx, |
195 | ParseContext { |
196 | cached_headers: &mut self.state.cached_headers, |
197 | req_method: &mut self.state.method, |
198 | h1_parser_config: self.state.h1_parser_config.clone(), |
199 | #[cfg (all(feature = "server" , feature = "runtime" ))] |
200 | h1_header_read_timeout: self.state.h1_header_read_timeout, |
201 | #[cfg (all(feature = "server" , feature = "runtime" ))] |
202 | h1_header_read_timeout_fut: &mut self.state.h1_header_read_timeout_fut, |
203 | #[cfg (all(feature = "server" , feature = "runtime" ))] |
204 | h1_header_read_timeout_running: &mut self.state.h1_header_read_timeout_running, |
205 | preserve_header_case: self.state.preserve_header_case, |
206 | #[cfg (feature = "ffi" )] |
207 | preserve_header_order: self.state.preserve_header_order, |
208 | h09_responses: self.state.h09_responses, |
209 | #[cfg (feature = "ffi" )] |
210 | on_informational: &mut self.state.on_informational, |
211 | #[cfg (feature = "ffi" )] |
212 | raw_headers: self.state.raw_headers, |
213 | } |
214 | )) { |
215 | Ok(msg) => msg, |
216 | Err(e) => return self.on_read_head_error(e), |
217 | }; |
218 | |
219 | // Note: don't deconstruct `msg` into local variables, it appears |
220 | // the optimizer doesn't remove the extra copies. |
221 | |
222 | debug!("incoming body is {}" , msg.decode); |
223 | |
224 | // Prevent accepting HTTP/0.9 responses after the initial one, if any. |
225 | self.state.h09_responses = false; |
226 | |
227 | // Drop any OnInformational callbacks, we're done there! |
228 | #[cfg (feature = "ffi" )] |
229 | { |
230 | self.state.on_informational = None; |
231 | } |
232 | |
233 | self.state.busy(); |
234 | self.state.keep_alive &= msg.keep_alive; |
235 | self.state.version = msg.head.version; |
236 | |
237 | let mut wants = if msg.wants_upgrade { |
238 | Wants::UPGRADE |
239 | } else { |
240 | Wants::EMPTY |
241 | }; |
242 | |
243 | if msg.decode == DecodedLength::ZERO { |
244 | if msg.expect_continue { |
245 | debug!("ignoring expect-continue since body is empty" ); |
246 | } |
247 | self.state.reading = Reading::KeepAlive; |
248 | if !T::should_read_first() { |
249 | self.try_keep_alive(cx); |
250 | } |
251 | } else if msg.expect_continue { |
252 | self.state.reading = Reading::Continue(Decoder::new(msg.decode)); |
253 | wants = wants.add(Wants::EXPECT); |
254 | } else { |
255 | self.state.reading = Reading::Body(Decoder::new(msg.decode)); |
256 | } |
257 | |
258 | Poll::Ready(Some(Ok((msg.head, msg.decode, wants)))) |
259 | } |
260 | |
261 | fn on_read_head_error<Z>(&mut self, e: crate::Error) -> Poll<Option<crate::Result<Z>>> { |
262 | // If we are currently waiting on a message, then an empty |
263 | // message should be reported as an error. If not, it is just |
264 | // the connection closing gracefully. |
265 | let must_error = self.should_error_on_eof(); |
266 | self.close_read(); |
267 | self.io.consume_leading_lines(); |
268 | let was_mid_parse = e.is_parse() || !self.io.read_buf().is_empty(); |
269 | if was_mid_parse || must_error { |
270 | // We check if the buf contains the h2 Preface |
271 | debug!( |
272 | "parse error ( {}) with {} bytes" , |
273 | e, |
274 | self.io.read_buf().len() |
275 | ); |
276 | match self.on_parse_error(e) { |
277 | Ok(()) => Poll::Pending, // XXX: wat? |
278 | Err(e) => Poll::Ready(Some(Err(e))), |
279 | } |
280 | } else { |
281 | debug!("read eof" ); |
282 | self.close_write(); |
283 | Poll::Ready(None) |
284 | } |
285 | } |
286 | |
287 | pub(crate) fn poll_read_body( |
288 | &mut self, |
289 | cx: &mut task::Context<'_>, |
290 | ) -> Poll<Option<io::Result<Bytes>>> { |
291 | debug_assert!(self.can_read_body()); |
292 | |
293 | let (reading, ret) = match self.state.reading { |
294 | Reading::Body(ref mut decoder) => { |
295 | match ready!(decoder.decode(cx, &mut self.io)) { |
296 | Ok(slice) => { |
297 | let (reading, chunk) = if decoder.is_eof() { |
298 | debug!("incoming body completed" ); |
299 | ( |
300 | Reading::KeepAlive, |
301 | if !slice.is_empty() { |
302 | Some(Ok(slice)) |
303 | } else { |
304 | None |
305 | }, |
306 | ) |
307 | } else if slice.is_empty() { |
308 | error!("incoming body unexpectedly ended" ); |
309 | // This should be unreachable, since all 3 decoders |
310 | // either set eof=true or return an Err when reading |
311 | // an empty slice... |
312 | (Reading::Closed, None) |
313 | } else { |
314 | return Poll::Ready(Some(Ok(slice))); |
315 | }; |
316 | (reading, Poll::Ready(chunk)) |
317 | } |
318 | Err(e) => { |
319 | debug!("incoming body decode error: {}" , e); |
320 | (Reading::Closed, Poll::Ready(Some(Err(e)))) |
321 | } |
322 | } |
323 | } |
324 | Reading::Continue(ref decoder) => { |
325 | // Write the 100 Continue if not already responded... |
326 | if let Writing::Init = self.state.writing { |
327 | trace!("automatically sending 100 Continue" ); |
328 | let cont = b"HTTP/1.1 100 Continue \r\n\r\n" ; |
329 | self.io.headers_buf().extend_from_slice(cont); |
330 | } |
331 | |
332 | // And now recurse once in the Reading::Body state... |
333 | self.state.reading = Reading::Body(decoder.clone()); |
334 | return self.poll_read_body(cx); |
335 | } |
336 | _ => unreachable!("poll_read_body invalid state: {:?}" , self.state.reading), |
337 | }; |
338 | |
339 | self.state.reading = reading; |
340 | self.try_keep_alive(cx); |
341 | ret |
342 | } |
343 | |
344 | pub(crate) fn wants_read_again(&mut self) -> bool { |
345 | let ret = self.state.notify_read; |
346 | self.state.notify_read = false; |
347 | ret |
348 | } |
349 | |
350 | pub(crate) fn poll_read_keep_alive( |
351 | &mut self, |
352 | cx: &mut task::Context<'_>, |
353 | ) -> Poll<crate::Result<()>> { |
354 | debug_assert!(!self.can_read_head() && !self.can_read_body()); |
355 | |
356 | if self.is_read_closed() { |
357 | Poll::Pending |
358 | } else if self.is_mid_message() { |
359 | self.mid_message_detect_eof(cx) |
360 | } else { |
361 | self.require_empty_read(cx) |
362 | } |
363 | } |
364 | |
365 | fn is_mid_message(&self) -> bool { |
366 | !matches!( |
367 | (&self.state.reading, &self.state.writing), |
368 | (&Reading::Init, &Writing::Init) |
369 | ) |
370 | } |
371 | |
372 | // This will check to make sure the io object read is empty. |
373 | // |
374 | // This should only be called for Clients wanting to enter the idle |
375 | // state. |
376 | fn require_empty_read(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> { |
377 | debug_assert!(!self.can_read_head() && !self.can_read_body() && !self.is_read_closed()); |
378 | debug_assert!(!self.is_mid_message()); |
379 | debug_assert!(T::is_client()); |
380 | |
381 | if !self.io.read_buf().is_empty() { |
382 | debug!("received an unexpected {} bytes" , self.io.read_buf().len()); |
383 | return Poll::Ready(Err(crate::Error::new_unexpected_message())); |
384 | } |
385 | |
386 | let num_read = ready!(self.force_io_read(cx)).map_err(crate::Error::new_io)?; |
387 | |
388 | if num_read == 0 { |
389 | let ret = if self.should_error_on_eof() { |
390 | trace!("found unexpected EOF on busy connection: {:?}" , self.state); |
391 | Poll::Ready(Err(crate::Error::new_incomplete())) |
392 | } else { |
393 | trace!("found EOF on idle connection, closing" ); |
394 | Poll::Ready(Ok(())) |
395 | }; |
396 | |
397 | // order is important: should_error needs state BEFORE close_read |
398 | self.state.close_read(); |
399 | return ret; |
400 | } |
401 | |
402 | debug!( |
403 | "received unexpected {} bytes on an idle connection" , |
404 | num_read |
405 | ); |
406 | Poll::Ready(Err(crate::Error::new_unexpected_message())) |
407 | } |
408 | |
409 | fn mid_message_detect_eof(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> { |
410 | debug_assert!(!self.can_read_head() && !self.can_read_body() && !self.is_read_closed()); |
411 | debug_assert!(self.is_mid_message()); |
412 | |
413 | if self.state.allow_half_close || !self.io.read_buf().is_empty() { |
414 | return Poll::Pending; |
415 | } |
416 | |
417 | let num_read = ready!(self.force_io_read(cx)).map_err(crate::Error::new_io)?; |
418 | |
419 | if num_read == 0 { |
420 | trace!("found unexpected EOF on busy connection: {:?}" , self.state); |
421 | self.state.close_read(); |
422 | Poll::Ready(Err(crate::Error::new_incomplete())) |
423 | } else { |
424 | Poll::Ready(Ok(())) |
425 | } |
426 | } |
427 | |
428 | fn force_io_read(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<usize>> { |
429 | debug_assert!(!self.state.is_read_closed()); |
430 | |
431 | let result = ready!(self.io.poll_read_from_io(cx)); |
432 | Poll::Ready(result.map_err(|e| { |
433 | trace!("force_io_read; io error = {:?}" , e); |
434 | self.state.close(); |
435 | e |
436 | })) |
437 | } |
438 | |
439 | fn maybe_notify(&mut self, cx: &mut task::Context<'_>) { |
440 | // its possible that we returned NotReady from poll() without having |
441 | // exhausted the underlying Io. We would have done this when we |
442 | // determined we couldn't keep reading until we knew how writing |
443 | // would finish. |
444 | |
445 | match self.state.reading { |
446 | Reading::Continue(..) | Reading::Body(..) | Reading::KeepAlive | Reading::Closed => { |
447 | return |
448 | } |
449 | Reading::Init => (), |
450 | }; |
451 | |
452 | match self.state.writing { |
453 | Writing::Body(..) => return, |
454 | Writing::Init | Writing::KeepAlive | Writing::Closed => (), |
455 | } |
456 | |
457 | if !self.io.is_read_blocked() { |
458 | if self.io.read_buf().is_empty() { |
459 | match self.io.poll_read_from_io(cx) { |
460 | Poll::Ready(Ok(n)) => { |
461 | if n == 0 { |
462 | trace!("maybe_notify; read eof" ); |
463 | if self.state.is_idle() { |
464 | self.state.close(); |
465 | } else { |
466 | self.close_read() |
467 | } |
468 | return; |
469 | } |
470 | } |
471 | Poll::Pending => { |
472 | trace!("maybe_notify; read_from_io blocked" ); |
473 | return; |
474 | } |
475 | Poll::Ready(Err(e)) => { |
476 | trace!("maybe_notify; read_from_io error: {}" , e); |
477 | self.state.close(); |
478 | self.state.error = Some(crate::Error::new_io(e)); |
479 | } |
480 | } |
481 | } |
482 | self.state.notify_read = true; |
483 | } |
484 | } |
485 | |
486 | fn try_keep_alive(&mut self, cx: &mut task::Context<'_>) { |
487 | self.state.try_keep_alive::<T>(); |
488 | self.maybe_notify(cx); |
489 | } |
490 | |
491 | pub(crate) fn can_write_head(&self) -> bool { |
492 | if !T::should_read_first() && matches!(self.state.reading, Reading::Closed) { |
493 | return false; |
494 | } |
495 | |
496 | match self.state.writing { |
497 | Writing::Init => self.io.can_headers_buf(), |
498 | _ => false, |
499 | } |
500 | } |
501 | |
502 | pub(crate) fn can_write_body(&self) -> bool { |
503 | match self.state.writing { |
504 | Writing::Body(..) => true, |
505 | Writing::Init | Writing::KeepAlive | Writing::Closed => false, |
506 | } |
507 | } |
508 | |
509 | pub(crate) fn can_buffer_body(&self) -> bool { |
510 | self.io.can_buffer() |
511 | } |
512 | |
513 | pub(crate) fn write_head(&mut self, head: MessageHead<T::Outgoing>, body: Option<BodyLength>) { |
514 | if let Some(encoder) = self.encode_head(head, body) { |
515 | self.state.writing = if !encoder.is_eof() { |
516 | Writing::Body(encoder) |
517 | } else if encoder.is_last() { |
518 | Writing::Closed |
519 | } else { |
520 | Writing::KeepAlive |
521 | }; |
522 | } |
523 | } |
524 | |
525 | pub(crate) fn write_full_msg(&mut self, head: MessageHead<T::Outgoing>, body: B) { |
526 | if let Some(encoder) = |
527 | self.encode_head(head, Some(BodyLength::Known(body.remaining() as u64))) |
528 | { |
529 | let is_last = encoder.is_last(); |
530 | // Make sure we don't write a body if we weren't actually allowed |
531 | // to do so, like because its a HEAD request. |
532 | if !encoder.is_eof() { |
533 | encoder.danger_full_buf(body, self.io.write_buf()); |
534 | } |
535 | self.state.writing = if is_last { |
536 | Writing::Closed |
537 | } else { |
538 | Writing::KeepAlive |
539 | } |
540 | } |
541 | } |
542 | |
543 | fn encode_head( |
544 | &mut self, |
545 | mut head: MessageHead<T::Outgoing>, |
546 | body: Option<BodyLength>, |
547 | ) -> Option<Encoder> { |
548 | debug_assert!(self.can_write_head()); |
549 | |
550 | if !T::should_read_first() { |
551 | self.state.busy(); |
552 | } |
553 | |
554 | self.enforce_version(&mut head); |
555 | |
556 | let buf = self.io.headers_buf(); |
557 | match super::role::encode_headers::<T>( |
558 | Encode { |
559 | head: &mut head, |
560 | body, |
561 | #[cfg (feature = "server" )] |
562 | keep_alive: self.state.wants_keep_alive(), |
563 | req_method: &mut self.state.method, |
564 | title_case_headers: self.state.title_case_headers, |
565 | }, |
566 | buf, |
567 | ) { |
568 | Ok(encoder) => { |
569 | debug_assert!(self.state.cached_headers.is_none()); |
570 | debug_assert!(head.headers.is_empty()); |
571 | self.state.cached_headers = Some(head.headers); |
572 | |
573 | #[cfg (feature = "ffi" )] |
574 | { |
575 | self.state.on_informational = |
576 | head.extensions.remove::<crate::ffi::OnInformational>(); |
577 | } |
578 | |
579 | Some(encoder) |
580 | } |
581 | Err(err) => { |
582 | self.state.error = Some(err); |
583 | self.state.writing = Writing::Closed; |
584 | None |
585 | } |
586 | } |
587 | } |
588 | |
589 | // Fix keep-alive when Connection: keep-alive header is not present |
590 | fn fix_keep_alive(&mut self, head: &mut MessageHead<T::Outgoing>) { |
591 | let outgoing_is_keep_alive = head |
592 | .headers |
593 | .get(CONNECTION) |
594 | .map(connection_keep_alive) |
595 | .unwrap_or(false); |
596 | |
597 | if !outgoing_is_keep_alive { |
598 | match head.version { |
599 | // If response is version 1.0 and keep-alive is not present in the response, |
600 | // disable keep-alive so the server closes the connection |
601 | Version::HTTP_10 => self.state.disable_keep_alive(), |
602 | // If response is version 1.1 and keep-alive is wanted, add |
603 | // Connection: keep-alive header when not present |
604 | Version::HTTP_11 => { |
605 | if self.state.wants_keep_alive() { |
606 | head.headers |
607 | .insert(CONNECTION, HeaderValue::from_static("keep-alive" )); |
608 | } |
609 | } |
610 | _ => (), |
611 | } |
612 | } |
613 | } |
614 | |
615 | // If we know the remote speaks an older version, we try to fix up any messages |
616 | // to work with our older peer. |
617 | fn enforce_version(&mut self, head: &mut MessageHead<T::Outgoing>) { |
618 | if let Version::HTTP_10 = self.state.version { |
619 | // Fixes response or connection when keep-alive header is not present |
620 | self.fix_keep_alive(head); |
621 | // If the remote only knows HTTP/1.0, we should force ourselves |
622 | // to do only speak HTTP/1.0 as well. |
623 | head.version = Version::HTTP_10; |
624 | } |
625 | // If the remote speaks HTTP/1.1, then it *should* be fine with |
626 | // both HTTP/1.0 and HTTP/1.1 from us. So again, we just let |
627 | // the user's headers be. |
628 | } |
629 | |
630 | pub(crate) fn write_body(&mut self, chunk: B) { |
631 | debug_assert!(self.can_write_body() && self.can_buffer_body()); |
632 | // empty chunks should be discarded at Dispatcher level |
633 | debug_assert!(chunk.remaining() != 0); |
634 | |
635 | let state = match self.state.writing { |
636 | Writing::Body(ref mut encoder) => { |
637 | self.io.buffer(encoder.encode(chunk)); |
638 | |
639 | if !encoder.is_eof() { |
640 | return; |
641 | } |
642 | |
643 | if encoder.is_last() { |
644 | Writing::Closed |
645 | } else { |
646 | Writing::KeepAlive |
647 | } |
648 | } |
649 | _ => unreachable!("write_body invalid state: {:?}" , self.state.writing), |
650 | }; |
651 | |
652 | self.state.writing = state; |
653 | } |
654 | |
655 | pub(crate) fn write_body_and_end(&mut self, chunk: B) { |
656 | debug_assert!(self.can_write_body() && self.can_buffer_body()); |
657 | // empty chunks should be discarded at Dispatcher level |
658 | debug_assert!(chunk.remaining() != 0); |
659 | |
660 | let state = match self.state.writing { |
661 | Writing::Body(ref encoder) => { |
662 | let can_keep_alive = encoder.encode_and_end(chunk, self.io.write_buf()); |
663 | if can_keep_alive { |
664 | Writing::KeepAlive |
665 | } else { |
666 | Writing::Closed |
667 | } |
668 | } |
669 | _ => unreachable!("write_body invalid state: {:?}" , self.state.writing), |
670 | }; |
671 | |
672 | self.state.writing = state; |
673 | } |
674 | |
675 | pub(crate) fn end_body(&mut self) -> crate::Result<()> { |
676 | debug_assert!(self.can_write_body()); |
677 | |
678 | let encoder = match self.state.writing { |
679 | Writing::Body(ref mut enc) => enc, |
680 | _ => return Ok(()), |
681 | }; |
682 | |
683 | // end of stream, that means we should try to eof |
684 | match encoder.end() { |
685 | Ok(end) => { |
686 | if let Some(end) = end { |
687 | self.io.buffer(end); |
688 | } |
689 | |
690 | self.state.writing = if encoder.is_last() || encoder.is_close_delimited() { |
691 | Writing::Closed |
692 | } else { |
693 | Writing::KeepAlive |
694 | }; |
695 | |
696 | Ok(()) |
697 | } |
698 | Err(not_eof) => { |
699 | self.state.writing = Writing::Closed; |
700 | Err(crate::Error::new_body_write_aborted().with(not_eof)) |
701 | } |
702 | } |
703 | } |
704 | |
705 | // When we get a parse error, depending on what side we are, we might be able |
706 | // to write a response before closing the connection. |
707 | // |
708 | // - Client: there is nothing we can do |
709 | // - Server: if Response hasn't been written yet, we can send a 4xx response |
710 | fn on_parse_error(&mut self, err: crate::Error) -> crate::Result<()> { |
711 | if let Writing::Init = self.state.writing { |
712 | if self.has_h2_prefix() { |
713 | return Err(crate::Error::new_version_h2()); |
714 | } |
715 | if let Some(msg) = T::on_error(&err) { |
716 | // Drop the cached headers so as to not trigger a debug |
717 | // assert in `write_head`... |
718 | self.state.cached_headers.take(); |
719 | self.write_head(msg, None); |
720 | self.state.error = Some(err); |
721 | return Ok(()); |
722 | } |
723 | } |
724 | |
725 | // fallback is pass the error back up |
726 | Err(err) |
727 | } |
728 | |
729 | pub(crate) fn poll_flush(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> { |
730 | ready!(Pin::new(&mut self.io).poll_flush(cx))?; |
731 | self.try_keep_alive(cx); |
732 | trace!("flushed( {}): {:?}" , T::LOG, self.state); |
733 | Poll::Ready(Ok(())) |
734 | } |
735 | |
736 | pub(crate) fn poll_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> { |
737 | match ready!(Pin::new(self.io.io_mut()).poll_shutdown(cx)) { |
738 | Ok(()) => { |
739 | trace!("shut down IO complete" ); |
740 | Poll::Ready(Ok(())) |
741 | } |
742 | Err(e) => { |
743 | debug!("error shutting down IO: {}" , e); |
744 | Poll::Ready(Err(e)) |
745 | } |
746 | } |
747 | } |
748 | |
749 | /// If the read side can be cheaply drained, do so. Otherwise, close. |
750 | pub(super) fn poll_drain_or_close_read(&mut self, cx: &mut task::Context<'_>) { |
751 | let _ = self.poll_read_body(cx); |
752 | |
753 | // If still in Reading::Body, just give up |
754 | match self.state.reading { |
755 | Reading::Init | Reading::KeepAlive => trace!("body drained" ), |
756 | _ => self.close_read(), |
757 | } |
758 | } |
759 | |
760 | pub(crate) fn close_read(&mut self) { |
761 | self.state.close_read(); |
762 | } |
763 | |
764 | pub(crate) fn close_write(&mut self) { |
765 | self.state.close_write(); |
766 | } |
767 | |
768 | #[cfg (feature = "server" )] |
769 | pub(crate) fn disable_keep_alive(&mut self) { |
770 | if self.state.is_idle() { |
771 | trace!("disable_keep_alive; closing idle connection" ); |
772 | self.state.close(); |
773 | } else { |
774 | trace!("disable_keep_alive; in-progress connection" ); |
775 | self.state.disable_keep_alive(); |
776 | } |
777 | } |
778 | |
779 | pub(crate) fn take_error(&mut self) -> crate::Result<()> { |
780 | if let Some(err) = self.state.error.take() { |
781 | Err(err) |
782 | } else { |
783 | Ok(()) |
784 | } |
785 | } |
786 | |
787 | pub(super) fn on_upgrade(&mut self) -> crate::upgrade::OnUpgrade { |
788 | trace!(" {}: prepare possible HTTP upgrade" , T::LOG); |
789 | self.state.prepare_upgrade() |
790 | } |
791 | } |
792 | |
793 | impl<I, B: Buf, T> fmt::Debug for Conn<I, B, T> { |
794 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
795 | f&mut DebugStruct<'_, '_>.debug_struct("Conn" ) |
796 | .field("state" , &self.state) |
797 | .field(name:"io" , &self.io) |
798 | .finish() |
799 | } |
800 | } |
801 | |
802 | // B and T are never pinned |
803 | impl<I: Unpin, B, T> Unpin for Conn<I, B, T> {} |
804 | |
805 | struct State { |
806 | allow_half_close: bool, |
807 | /// Re-usable HeaderMap to reduce allocating new ones. |
808 | cached_headers: Option<HeaderMap>, |
809 | /// If an error occurs when there wasn't a direct way to return it |
810 | /// back to the user, this is set. |
811 | error: Option<crate::Error>, |
812 | /// Current keep-alive status. |
813 | keep_alive: KA, |
814 | /// If mid-message, the HTTP Method that started it. |
815 | /// |
816 | /// This is used to know things such as if the message can include |
817 | /// a body or not. |
818 | method: Option<Method>, |
819 | h1_parser_config: ParserConfig, |
820 | #[cfg (all(feature = "server" , feature = "runtime" ))] |
821 | h1_header_read_timeout: Option<Duration>, |
822 | #[cfg (all(feature = "server" , feature = "runtime" ))] |
823 | h1_header_read_timeout_fut: Option<Pin<Box<Sleep>>>, |
824 | #[cfg (all(feature = "server" , feature = "runtime" ))] |
825 | h1_header_read_timeout_running: bool, |
826 | preserve_header_case: bool, |
827 | #[cfg (feature = "ffi" )] |
828 | preserve_header_order: bool, |
829 | title_case_headers: bool, |
830 | h09_responses: bool, |
831 | /// If set, called with each 1xx informational response received for |
832 | /// the current request. MUST be unset after a non-1xx response is |
833 | /// received. |
834 | #[cfg (feature = "ffi" )] |
835 | on_informational: Option<crate::ffi::OnInformational>, |
836 | #[cfg (feature = "ffi" )] |
837 | raw_headers: bool, |
838 | /// Set to true when the Dispatcher should poll read operations |
839 | /// again. See the `maybe_notify` method for more. |
840 | notify_read: bool, |
841 | /// State of allowed reads |
842 | reading: Reading, |
843 | /// State of allowed writes |
844 | writing: Writing, |
845 | /// An expected pending HTTP upgrade. |
846 | upgrade: Option<crate::upgrade::Pending>, |
847 | /// Either HTTP/1.0 or 1.1 connection |
848 | version: Version, |
849 | } |
850 | |
851 | #[derive (Debug)] |
852 | enum Reading { |
853 | Init, |
854 | Continue(Decoder), |
855 | Body(Decoder), |
856 | KeepAlive, |
857 | Closed, |
858 | } |
859 | |
860 | enum Writing { |
861 | Init, |
862 | Body(Encoder), |
863 | KeepAlive, |
864 | Closed, |
865 | } |
866 | |
867 | impl fmt::Debug for State { |
868 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
869 | let mut builder: DebugStruct<'_, '_> = f.debug_struct(name:"State" ); |
870 | builder |
871 | .field("reading" , &self.reading) |
872 | .field("writing" , &self.writing) |
873 | .field(name:"keep_alive" , &self.keep_alive); |
874 | |
875 | // Only show error field if it's interesting... |
876 | if let Some(ref error: &Error) = self.error { |
877 | builder.field(name:"error" , value:error); |
878 | } |
879 | |
880 | if self.allow_half_close { |
881 | builder.field(name:"allow_half_close" , &true); |
882 | } |
883 | |
884 | // Purposefully leaving off other fields.. |
885 | |
886 | builder.finish() |
887 | } |
888 | } |
889 | |
890 | impl fmt::Debug for Writing { |
891 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
892 | match *self { |
893 | Writing::Init => f.write_str(data:"Init" ), |
894 | Writing::Body(ref enc: &Encoder) => f.debug_tuple(name:"Body" ).field(enc).finish(), |
895 | Writing::KeepAlive => f.write_str(data:"KeepAlive" ), |
896 | Writing::Closed => f.write_str(data:"Closed" ), |
897 | } |
898 | } |
899 | } |
900 | |
901 | impl std::ops::BitAndAssign<bool> for KA { |
902 | fn bitand_assign(&mut self, enabled: bool) { |
903 | if !enabled { |
904 | trace!("remote disabling keep-alive" ); |
905 | *self = KA::Disabled; |
906 | } |
907 | } |
908 | } |
909 | |
910 | #[derive (Clone, Copy, Debug)] |
911 | enum KA { |
912 | Idle, |
913 | Busy, |
914 | Disabled, |
915 | } |
916 | |
917 | impl Default for KA { |
918 | fn default() -> KA { |
919 | KA::Busy |
920 | } |
921 | } |
922 | |
923 | impl KA { |
924 | fn idle(&mut self) { |
925 | *self = KA::Idle; |
926 | } |
927 | |
928 | fn busy(&mut self) { |
929 | *self = KA::Busy; |
930 | } |
931 | |
932 | fn disable(&mut self) { |
933 | *self = KA::Disabled; |
934 | } |
935 | |
936 | fn status(&self) -> KA { |
937 | *self |
938 | } |
939 | } |
940 | |
941 | impl State { |
942 | fn close(&mut self) { |
943 | trace!("State::close()" ); |
944 | self.reading = Reading::Closed; |
945 | self.writing = Writing::Closed; |
946 | self.keep_alive.disable(); |
947 | } |
948 | |
949 | fn close_read(&mut self) { |
950 | trace!("State::close_read()" ); |
951 | self.reading = Reading::Closed; |
952 | self.keep_alive.disable(); |
953 | } |
954 | |
955 | fn close_write(&mut self) { |
956 | trace!("State::close_write()" ); |
957 | self.writing = Writing::Closed; |
958 | self.keep_alive.disable(); |
959 | } |
960 | |
961 | fn wants_keep_alive(&self) -> bool { |
962 | if let KA::Disabled = self.keep_alive.status() { |
963 | false |
964 | } else { |
965 | true |
966 | } |
967 | } |
968 | |
969 | fn try_keep_alive<T: Http1Transaction>(&mut self) { |
970 | match (&self.reading, &self.writing) { |
971 | (&Reading::KeepAlive, &Writing::KeepAlive) => { |
972 | if let KA::Busy = self.keep_alive.status() { |
973 | self.idle::<T>(); |
974 | } else { |
975 | trace!( |
976 | "try_keep_alive( {}): could keep-alive, but status = {:?}" , |
977 | T::LOG, |
978 | self.keep_alive |
979 | ); |
980 | self.close(); |
981 | } |
982 | } |
983 | (&Reading::Closed, &Writing::KeepAlive) | (&Reading::KeepAlive, &Writing::Closed) => { |
984 | self.close() |
985 | } |
986 | _ => (), |
987 | } |
988 | } |
989 | |
990 | fn disable_keep_alive(&mut self) { |
991 | self.keep_alive.disable() |
992 | } |
993 | |
994 | fn busy(&mut self) { |
995 | if let KA::Disabled = self.keep_alive.status() { |
996 | return; |
997 | } |
998 | self.keep_alive.busy(); |
999 | } |
1000 | |
1001 | fn idle<T: Http1Transaction>(&mut self) { |
1002 | debug_assert!(!self.is_idle(), "State::idle() called while idle" ); |
1003 | |
1004 | self.method = None; |
1005 | self.keep_alive.idle(); |
1006 | |
1007 | if !self.is_idle() { |
1008 | self.close(); |
1009 | return; |
1010 | } |
1011 | |
1012 | self.reading = Reading::Init; |
1013 | self.writing = Writing::Init; |
1014 | |
1015 | // !T::should_read_first() means Client. |
1016 | // |
1017 | // If Client connection has just gone idle, the Dispatcher |
1018 | // should try the poll loop one more time, so as to poll the |
1019 | // pending requests stream. |
1020 | if !T::should_read_first() { |
1021 | self.notify_read = true; |
1022 | } |
1023 | } |
1024 | |
1025 | fn is_idle(&self) -> bool { |
1026 | matches!(self.keep_alive.status(), KA::Idle) |
1027 | } |
1028 | |
1029 | fn is_read_closed(&self) -> bool { |
1030 | matches!(self.reading, Reading::Closed) |
1031 | } |
1032 | |
1033 | fn is_write_closed(&self) -> bool { |
1034 | matches!(self.writing, Writing::Closed) |
1035 | } |
1036 | |
1037 | fn prepare_upgrade(&mut self) -> crate::upgrade::OnUpgrade { |
1038 | let (tx, rx) = crate::upgrade::pending(); |
1039 | self.upgrade = Some(tx); |
1040 | rx |
1041 | } |
1042 | } |
1043 | |
1044 | #[cfg (test)] |
1045 | mod tests { |
1046 | #[cfg (feature = "nightly" )] |
1047 | #[bench ] |
1048 | fn bench_read_head_short(b: &mut ::test::Bencher) { |
1049 | use super::*; |
1050 | let s = b"GET / HTTP/1.1 \r\nHost: localhost:8080 \r\n\r\n" ; |
1051 | let len = s.len(); |
1052 | b.bytes = len as u64; |
1053 | |
1054 | // an empty IO, we'll be skipping and using the read buffer anyways |
1055 | let io = tokio_test::io::Builder::new().build(); |
1056 | let mut conn = Conn::<_, bytes::Bytes, crate::proto::h1::ServerTransaction>::new(io); |
1057 | *conn.io.read_buf_mut() = ::bytes::BytesMut::from(&s[..]); |
1058 | conn.state.cached_headers = Some(HeaderMap::with_capacity(2)); |
1059 | |
1060 | let rt = tokio::runtime::Builder::new_current_thread() |
1061 | .enable_all() |
1062 | .build() |
1063 | .unwrap(); |
1064 | |
1065 | b.iter(|| { |
1066 | rt.block_on(futures_util::future::poll_fn(|cx| { |
1067 | match conn.poll_read_head(cx) { |
1068 | Poll::Ready(Some(Ok(x))) => { |
1069 | ::test::black_box(&x); |
1070 | let mut headers = x.0.headers; |
1071 | headers.clear(); |
1072 | conn.state.cached_headers = Some(headers); |
1073 | } |
1074 | f => panic!("expected Ready(Some(Ok(..))): {:?}" , f), |
1075 | } |
1076 | |
1077 | conn.io.read_buf_mut().reserve(1); |
1078 | unsafe { |
1079 | conn.io.read_buf_mut().set_len(len); |
1080 | } |
1081 | conn.state.reading = Reading::Init; |
1082 | Poll::Ready(()) |
1083 | })); |
1084 | }); |
1085 | } |
1086 | |
1087 | /* |
1088 | //TODO: rewrite these using dispatch... someday... |
1089 | use futures::{Async, Future, Stream, Sink}; |
1090 | use futures::future; |
1091 | |
1092 | use proto::{self, ClientTransaction, MessageHead, ServerTransaction}; |
1093 | use super::super::Encoder; |
1094 | use mock::AsyncIo; |
1095 | |
1096 | use super::{Conn, Decoder, Reading, Writing}; |
1097 | use ::uri::Uri; |
1098 | |
1099 | use std::str::FromStr; |
1100 | |
1101 | #[test] |
1102 | fn test_conn_init_read() { |
1103 | let good_message = b"GET / HTTP/1.1\r\n\r\n".to_vec(); |
1104 | let len = good_message.len(); |
1105 | let io = AsyncIo::new_buf(good_message, len); |
1106 | let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); |
1107 | |
1108 | match conn.poll().unwrap() { |
1109 | Async::Ready(Some(Frame::Message { message, body: false })) => { |
1110 | assert_eq!(message, MessageHead { |
1111 | subject: ::proto::RequestLine(::Get, Uri::from_str("/").unwrap()), |
1112 | .. MessageHead::default() |
1113 | }) |
1114 | }, |
1115 | f => panic!("frame is not Frame::Message: {:?}", f) |
1116 | } |
1117 | } |
1118 | |
1119 | #[test] |
1120 | fn test_conn_parse_partial() { |
1121 | let _: Result<(), ()> = future::lazy(|| { |
1122 | let good_message = b"GET / HTTP/1.1\r\nHost: foo.bar\r\n\r\n".to_vec(); |
1123 | let io = AsyncIo::new_buf(good_message, 10); |
1124 | let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); |
1125 | assert!(conn.poll().unwrap().is_not_ready()); |
1126 | conn.io.io_mut().block_in(50); |
1127 | let async = conn.poll().unwrap(); |
1128 | assert!(async.is_ready()); |
1129 | match async { |
1130 | Async::Ready(Some(Frame::Message { .. })) => (), |
1131 | f => panic!("frame is not Message: {:?}", f), |
1132 | } |
1133 | Ok(()) |
1134 | }).wait(); |
1135 | } |
1136 | |
1137 | #[test] |
1138 | fn test_conn_init_read_eof_idle() { |
1139 | let io = AsyncIo::new_buf(vec![], 1); |
1140 | let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); |
1141 | conn.state.idle(); |
1142 | |
1143 | match conn.poll().unwrap() { |
1144 | Async::Ready(None) => {}, |
1145 | other => panic!("frame is not None: {:?}", other) |
1146 | } |
1147 | } |
1148 | |
1149 | #[test] |
1150 | fn test_conn_init_read_eof_idle_partial_parse() { |
1151 | let io = AsyncIo::new_buf(b"GET / HTTP/1.1".to_vec(), 100); |
1152 | let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); |
1153 | conn.state.idle(); |
1154 | |
1155 | match conn.poll() { |
1156 | Err(ref err) if err.kind() == std::io::ErrorKind::UnexpectedEof => {}, |
1157 | other => panic!("unexpected frame: {:?}", other) |
1158 | } |
1159 | } |
1160 | |
1161 | #[test] |
1162 | fn test_conn_init_read_eof_busy() { |
1163 | let _: Result<(), ()> = future::lazy(|| { |
1164 | // server ignores |
1165 | let io = AsyncIo::new_eof(); |
1166 | let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); |
1167 | conn.state.busy(); |
1168 | |
1169 | match conn.poll().unwrap() { |
1170 | Async::Ready(None) => {}, |
1171 | other => panic!("unexpected frame: {:?}", other) |
1172 | } |
1173 | |
1174 | // client |
1175 | let io = AsyncIo::new_eof(); |
1176 | let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io); |
1177 | conn.state.busy(); |
1178 | |
1179 | match conn.poll() { |
1180 | Err(ref err) if err.kind() == std::io::ErrorKind::UnexpectedEof => {}, |
1181 | other => panic!("unexpected frame: {:?}", other) |
1182 | } |
1183 | Ok(()) |
1184 | }).wait(); |
1185 | } |
1186 | |
1187 | #[test] |
1188 | fn test_conn_body_finish_read_eof() { |
1189 | let _: Result<(), ()> = future::lazy(|| { |
1190 | let io = AsyncIo::new_eof(); |
1191 | let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io); |
1192 | conn.state.busy(); |
1193 | conn.state.writing = Writing::KeepAlive; |
1194 | conn.state.reading = Reading::Body(Decoder::length(0)); |
1195 | |
1196 | match conn.poll() { |
1197 | Ok(Async::Ready(Some(Frame::Body { chunk: None }))) => (), |
1198 | other => panic!("unexpected frame: {:?}", other) |
1199 | } |
1200 | |
1201 | // conn eofs, but tokio-proto will call poll() again, before calling flush() |
1202 | // the conn eof in this case is perfectly fine |
1203 | |
1204 | match conn.poll() { |
1205 | Ok(Async::Ready(None)) => (), |
1206 | other => panic!("unexpected frame: {:?}", other) |
1207 | } |
1208 | Ok(()) |
1209 | }).wait(); |
1210 | } |
1211 | |
1212 | #[test] |
1213 | fn test_conn_message_empty_body_read_eof() { |
1214 | let _: Result<(), ()> = future::lazy(|| { |
1215 | let io = AsyncIo::new_buf(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n".to_vec(), 1024); |
1216 | let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io); |
1217 | conn.state.busy(); |
1218 | conn.state.writing = Writing::KeepAlive; |
1219 | |
1220 | match conn.poll() { |
1221 | Ok(Async::Ready(Some(Frame::Message { body: false, .. }))) => (), |
1222 | other => panic!("unexpected frame: {:?}", other) |
1223 | } |
1224 | |
1225 | // conn eofs, but tokio-proto will call poll() again, before calling flush() |
1226 | // the conn eof in this case is perfectly fine |
1227 | |
1228 | match conn.poll() { |
1229 | Ok(Async::Ready(None)) => (), |
1230 | other => panic!("unexpected frame: {:?}", other) |
1231 | } |
1232 | Ok(()) |
1233 | }).wait(); |
1234 | } |
1235 | |
1236 | #[test] |
1237 | fn test_conn_read_body_end() { |
1238 | let _: Result<(), ()> = future::lazy(|| { |
1239 | let io = AsyncIo::new_buf(b"POST / HTTP/1.1\r\nContent-Length: 5\r\n\r\n12345".to_vec(), 1024); |
1240 | let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); |
1241 | conn.state.busy(); |
1242 | |
1243 | match conn.poll() { |
1244 | Ok(Async::Ready(Some(Frame::Message { body: true, .. }))) => (), |
1245 | other => panic!("unexpected frame: {:?}", other) |
1246 | } |
1247 | |
1248 | match conn.poll() { |
1249 | Ok(Async::Ready(Some(Frame::Body { chunk: Some(_) }))) => (), |
1250 | other => panic!("unexpected frame: {:?}", other) |
1251 | } |
1252 | |
1253 | // When the body is done, `poll` MUST return a `Body` frame with chunk set to `None` |
1254 | match conn.poll() { |
1255 | Ok(Async::Ready(Some(Frame::Body { chunk: None }))) => (), |
1256 | other => panic!("unexpected frame: {:?}", other) |
1257 | } |
1258 | |
1259 | match conn.poll() { |
1260 | Ok(Async::NotReady) => (), |
1261 | other => panic!("unexpected frame: {:?}", other) |
1262 | } |
1263 | Ok(()) |
1264 | }).wait(); |
1265 | } |
1266 | |
1267 | #[test] |
1268 | fn test_conn_closed_read() { |
1269 | let io = AsyncIo::new_buf(vec![], 0); |
1270 | let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); |
1271 | conn.state.close(); |
1272 | |
1273 | match conn.poll().unwrap() { |
1274 | Async::Ready(None) => {}, |
1275 | other => panic!("frame is not None: {:?}", other) |
1276 | } |
1277 | } |
1278 | |
1279 | #[test] |
1280 | fn test_conn_body_write_length() { |
1281 | let _ = pretty_env_logger::try_init(); |
1282 | let _: Result<(), ()> = future::lazy(|| { |
1283 | let io = AsyncIo::new_buf(vec![], 0); |
1284 | let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); |
1285 | let max = super::super::io::DEFAULT_MAX_BUFFER_SIZE + 4096; |
1286 | conn.state.writing = Writing::Body(Encoder::length((max * 2) as u64)); |
1287 | |
1288 | assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'a'; max].into()) }).unwrap().is_ready()); |
1289 | assert!(!conn.can_buffer_body()); |
1290 | |
1291 | assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'b'; 1024 * 8].into()) }).unwrap().is_not_ready()); |
1292 | |
1293 | conn.io.io_mut().block_in(1024 * 3); |
1294 | assert!(conn.poll_complete().unwrap().is_not_ready()); |
1295 | conn.io.io_mut().block_in(1024 * 3); |
1296 | assert!(conn.poll_complete().unwrap().is_not_ready()); |
1297 | conn.io.io_mut().block_in(max * 2); |
1298 | assert!(conn.poll_complete().unwrap().is_ready()); |
1299 | |
1300 | assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'c'; 1024 * 8].into()) }).unwrap().is_ready()); |
1301 | Ok(()) |
1302 | }).wait(); |
1303 | } |
1304 | |
1305 | #[test] |
1306 | fn test_conn_body_write_chunked() { |
1307 | let _: Result<(), ()> = future::lazy(|| { |
1308 | let io = AsyncIo::new_buf(vec![], 4096); |
1309 | let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); |
1310 | conn.state.writing = Writing::Body(Encoder::chunked()); |
1311 | |
1312 | assert!(conn.start_send(Frame::Body { chunk: Some("headers".into()) }).unwrap().is_ready()); |
1313 | assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'x'; 8192].into()) }).unwrap().is_ready()); |
1314 | Ok(()) |
1315 | }).wait(); |
1316 | } |
1317 | |
1318 | #[test] |
1319 | fn test_conn_body_flush() { |
1320 | let _: Result<(), ()> = future::lazy(|| { |
1321 | let io = AsyncIo::new_buf(vec![], 1024 * 1024 * 5); |
1322 | let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); |
1323 | conn.state.writing = Writing::Body(Encoder::length(1024 * 1024)); |
1324 | assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'a'; 1024 * 1024].into()) }).unwrap().is_ready()); |
1325 | assert!(!conn.can_buffer_body()); |
1326 | conn.io.io_mut().block_in(1024 * 1024 * 5); |
1327 | assert!(conn.poll_complete().unwrap().is_ready()); |
1328 | assert!(conn.can_buffer_body()); |
1329 | assert!(conn.io.io_mut().flushed()); |
1330 | |
1331 | Ok(()) |
1332 | }).wait(); |
1333 | } |
1334 | |
1335 | #[test] |
1336 | fn test_conn_parking() { |
1337 | use std::sync::Arc; |
1338 | use futures::executor::Notify; |
1339 | use futures::executor::NotifyHandle; |
1340 | |
1341 | struct Car { |
1342 | permit: bool, |
1343 | } |
1344 | impl Notify for Car { |
1345 | fn notify(&self, _id: usize) { |
1346 | assert!(self.permit, "unparked without permit"); |
1347 | } |
1348 | } |
1349 | |
1350 | fn car(permit: bool) -> NotifyHandle { |
1351 | Arc::new(Car { |
1352 | permit: permit, |
1353 | }).into() |
1354 | } |
1355 | |
1356 | // test that once writing is done, unparks |
1357 | let f = future::lazy(|| { |
1358 | let io = AsyncIo::new_buf(vec![], 4096); |
1359 | let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); |
1360 | conn.state.reading = Reading::KeepAlive; |
1361 | assert!(conn.poll().unwrap().is_not_ready()); |
1362 | |
1363 | conn.state.writing = Writing::KeepAlive; |
1364 | assert!(conn.poll_complete().unwrap().is_ready()); |
1365 | Ok::<(), ()>(()) |
1366 | }); |
1367 | ::futures::executor::spawn(f).poll_future_notify(&car(true), 0).unwrap(); |
1368 | |
1369 | |
1370 | // test that flushing when not waiting on read doesn't unpark |
1371 | let f = future::lazy(|| { |
1372 | let io = AsyncIo::new_buf(vec![], 4096); |
1373 | let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); |
1374 | conn.state.writing = Writing::KeepAlive; |
1375 | assert!(conn.poll_complete().unwrap().is_ready()); |
1376 | Ok::<(), ()>(()) |
1377 | }); |
1378 | ::futures::executor::spawn(f).poll_future_notify(&car(false), 0).unwrap(); |
1379 | |
1380 | |
1381 | // test that flushing and writing isn't done doesn't unpark |
1382 | let f = future::lazy(|| { |
1383 | let io = AsyncIo::new_buf(vec![], 4096); |
1384 | let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); |
1385 | conn.state.reading = Reading::KeepAlive; |
1386 | assert!(conn.poll().unwrap().is_not_ready()); |
1387 | conn.state.writing = Writing::Body(Encoder::length(5_000)); |
1388 | assert!(conn.poll_complete().unwrap().is_ready()); |
1389 | Ok::<(), ()>(()) |
1390 | }); |
1391 | ::futures::executor::spawn(f).poll_future_notify(&car(false), 0).unwrap(); |
1392 | } |
1393 | |
1394 | #[test] |
1395 | fn test_conn_closed_write() { |
1396 | let io = AsyncIo::new_buf(vec![], 0); |
1397 | let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); |
1398 | conn.state.close(); |
1399 | |
1400 | match conn.start_send(Frame::Body { chunk: Some(b"foobar".to_vec().into()) }) { |
1401 | Err(_e) => {}, |
1402 | other => panic!("did not return Err: {:?}", other) |
1403 | } |
1404 | |
1405 | assert!(conn.state.is_write_closed()); |
1406 | } |
1407 | |
1408 | #[test] |
1409 | fn test_conn_write_empty_chunk() { |
1410 | let io = AsyncIo::new_buf(vec![], 0); |
1411 | let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); |
1412 | conn.state.writing = Writing::KeepAlive; |
1413 | |
1414 | assert!(conn.start_send(Frame::Body { chunk: None }).unwrap().is_ready()); |
1415 | assert!(conn.start_send(Frame::Body { chunk: Some(Vec::new().into()) }).unwrap().is_ready()); |
1416 | conn.start_send(Frame::Body { chunk: Some(vec![b'a'].into()) }).unwrap_err(); |
1417 | } |
1418 | */ |
1419 | } |
1420 | |