1 | use std::fmt; |
2 | use std::io; |
3 | use std::marker::PhantomData; |
4 | use std::marker::Unpin; |
5 | use std::pin::Pin; |
6 | use std::task::{Context, Poll}; |
7 | #[cfg (all(feature = "server" , feature = "runtime" ))] |
8 | use std::time::Duration; |
9 | |
10 | use bytes::{Buf, Bytes}; |
11 | use http::header::{HeaderValue, CONNECTION}; |
12 | use http::{HeaderMap, Method, Version}; |
13 | use httparse::ParserConfig; |
14 | use tokio::io::{AsyncRead, AsyncWrite}; |
15 | #[cfg (all(feature = "server" , feature = "runtime" ))] |
16 | use tokio::time::Sleep; |
17 | use tracing::{debug, error, trace}; |
18 | |
19 | use super::io::Buffered; |
20 | use super::{Decoder, Encode, EncodedBuf, Encoder, Http1Transaction, ParseContext, Wants}; |
21 | use crate::body::DecodedLength; |
22 | use crate::headers::connection_keep_alive; |
23 | use crate::proto::{BodyLength, MessageHead}; |
24 | |
/// The HTTP/2 client connection preface (RFC 7540, section 3.5): exactly 24 octets.
///
/// It is compared against the start of the read buffer so that a parse
/// failure caused by an HTTP/2 client can be reported as a version error.
const H2_PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
26 | |
/// This handles a connection, which will have been established over an
/// `AsyncRead + AsyncWrite` (like a socket), and will likely include multiple
/// `Transaction`s over HTTP.
///
/// The connection will determine when a message begins and ends as well as
/// determine if this connection can be kept alive after the message,
/// or if it is complete.
pub(crate) struct Conn<I, B, T> {
    /// Buffered transport: message heads are parsed from it and encoded
    /// output is queued on it.
    io: Buffered<I, EncodedBuf<B>>,
    /// Read/write state machine together with the per-connection options.
    state: State,
    /// Binds the transaction type `T` to the connection without storing a `T`.
    _marker: PhantomData<fn(T)>,
}
39 | |
40 | impl<I, B, T> Conn<I, B, T> |
41 | where |
42 | I: AsyncRead + AsyncWrite + Unpin, |
43 | B: Buf, |
44 | T: Http1Transaction, |
45 | { |
46 | pub(crate) fn new(io: I) -> Conn<I, B, T> { |
47 | Conn { |
48 | io: Buffered::new(io), |
49 | state: State { |
50 | allow_half_close: false, |
51 | cached_headers: None, |
52 | error: None, |
53 | keep_alive: KA::Busy, |
54 | method: None, |
55 | h1_parser_config: ParserConfig::default(), |
56 | #[cfg (all(feature = "server" , feature = "runtime" ))] |
57 | h1_header_read_timeout: None, |
58 | #[cfg (all(feature = "server" , feature = "runtime" ))] |
59 | h1_header_read_timeout_fut: None, |
60 | #[cfg (all(feature = "server" , feature = "runtime" ))] |
61 | h1_header_read_timeout_running: false, |
62 | preserve_header_case: false, |
63 | #[cfg (feature = "ffi" )] |
64 | preserve_header_order: false, |
65 | title_case_headers: false, |
66 | h09_responses: false, |
67 | #[cfg (feature = "ffi" )] |
68 | on_informational: None, |
69 | #[cfg (feature = "ffi" )] |
70 | raw_headers: false, |
71 | notify_read: false, |
72 | reading: Reading::Init, |
73 | writing: Writing::Init, |
74 | upgrade: None, |
75 | // We assume a modern world where the remote speaks HTTP/1.1. |
76 | // If they tell us otherwise, we'll downgrade in `read_head`. |
77 | version: Version::HTTP_11, |
78 | }, |
79 | _marker: PhantomData, |
80 | } |
81 | } |
82 | |
/// Forwards the flush-pipeline setting to the buffered I/O layer.
#[cfg(feature = "server")]
pub(crate) fn set_flush_pipeline(&mut self, enabled: bool) {
    self.io.set_flush_pipeline(enabled);
}
87 | |
/// Selects the "queue" write strategy on the buffered I/O layer.
pub(crate) fn set_write_strategy_queue(&mut self) {
    self.io.set_write_strategy_queue();
}
91 | |
/// Forwards the maximum buffer size `max` to the buffered I/O layer.
pub(crate) fn set_max_buf_size(&mut self, max: usize) {
    self.io.set_max_buf_size(max);
}
95 | |
/// Forwards an exact read-buffer size `sz` to the buffered I/O layer.
#[cfg(feature = "client")]
pub(crate) fn set_read_buf_exact_size(&mut self, sz: usize) {
    self.io.set_read_buf_exact_size(sz);
}
100 | |
/// Selects the "flatten" write strategy on the buffered I/O layer.
pub(crate) fn set_write_strategy_flatten(&mut self) {
    self.io.set_write_strategy_flatten();
}
104 | |
/// Replaces the parser configuration that is handed to every head parse.
#[cfg(feature = "client")]
pub(crate) fn set_h1_parser_config(&mut self, parser_config: ParserConfig) {
    self.state.h1_parser_config = parser_config;
}
109 | |
/// Turns on the `title_case_headers` flag that is passed to the head encoder.
pub(crate) fn set_title_case_headers(&mut self) {
    self.state.title_case_headers = true;
}
113 | |
/// Turns on the `preserve_header_case` flag that is passed to the head parser.
pub(crate) fn set_preserve_header_case(&mut self) {
    self.state.preserve_header_case = true;
}
117 | |
/// Turns on the `preserve_header_order` flag that is passed to the head parser.
#[cfg(feature = "ffi")]
pub(crate) fn set_preserve_header_order(&mut self) {
    self.state.preserve_header_order = true;
}
122 | |
/// Turns on the `h09_responses` flag that is passed to the head parser.
///
/// The flag is cleared again once the first message head has been read.
#[cfg(feature = "client")]
pub(crate) fn set_h09_responses(&mut self) {
    self.state.h09_responses = true;
}
127 | |
/// Stores `val` as the header read timeout that is passed to the head parser.
#[cfg(all(feature = "server", feature = "runtime"))]
pub(crate) fn set_http1_header_read_timeout(&mut self, val: Duration) {
    self.state.h1_header_read_timeout = Some(val);
}
132 | |
/// Turns on `allow_half_close`; with it set, the mid-message EOF probe is skipped.
#[cfg(feature = "server")]
pub(crate) fn set_allow_half_close(&mut self) {
    self.state.allow_half_close = true;
}
137 | |
/// Sets the `raw_headers` flag that is passed to the head parser.
#[cfg(feature = "ffi")]
pub(crate) fn set_raw_headers(&mut self, enabled: bool) {
    self.state.raw_headers = enabled;
}
142 | |
/// Consumes the connection and returns whatever the buffered I/O layer
/// yields from its own `into_inner`: the transport and a `Bytes` value
/// (presumably the unread buffered bytes -- confirm against `Buffered`).
pub(crate) fn into_inner(self) -> (I, Bytes) {
    self.io.into_inner()
}
146 | |
/// Takes the pending upgrade out of the state, leaving `None` behind.
pub(crate) fn pending_upgrade(&mut self) -> Option<crate::upgrade::Pending> {
    self.state.upgrade.take()
}
150 | |
/// Returns `true` when the read side of the state machine is closed.
pub(crate) fn is_read_closed(&self) -> bool {
    self.state.is_read_closed()
}
154 | |
/// Returns `true` when the write side of the state machine is closed.
pub(crate) fn is_write_closed(&self) -> bool {
    self.state.is_write_closed()
}
158 | |
159 | pub(crate) fn can_read_head(&self) -> bool { |
160 | if !matches!(self.state.reading, Reading::Init) { |
161 | return false; |
162 | } |
163 | |
164 | if T::should_read_first() { |
165 | return true; |
166 | } |
167 | |
168 | !matches!(self.state.writing, Writing::Init) |
169 | } |
170 | |
171 | pub(crate) fn can_read_body(&self) -> bool { |
172 | match self.state.reading { |
173 | Reading::Body(..) | Reading::Continue(..) => true, |
174 | _ => false, |
175 | } |
176 | } |
177 | |
178 | fn should_error_on_eof(&self) -> bool { |
179 | // If we're idle, it's probably just the connection closing gracefully. |
180 | T::should_error_on_parse_eof() && !self.state.is_idle() |
181 | } |
182 | |
183 | fn has_h2_prefix(&self) -> bool { |
184 | let read_buf = self.io.read_buf(); |
185 | read_buf.len() >= 24 && read_buf[..24] == *H2_PREFACE |
186 | } |
187 | |
/// Parses the next message head from the buffered transport.
///
/// On success the read state moves to `KeepAlive` (empty body), `Continue`
/// (body expected and the peer asked for `100 Continue`) or `Body`, and the
/// head, its decoded body length and the resulting `Wants` flags are
/// returned. A parse error is handed to `on_read_head_error`, whose result
/// is returned as-is.
pub(super) fn poll_read_head(
    &mut self,
    cx: &mut Context<'_>,
) -> Poll<Option<crate::Result<(MessageHead<T::Incoming>, DecodedLength, Wants)>>> {
    debug_assert!(self.can_read_head());
    trace!("Conn::read_head");

    // The parser borrows the pieces of `state` it needs; everything else is
    // passed by value.
    let msg = match ready!(self.io.parse::<T>(
        cx,
        ParseContext {
            cached_headers: &mut self.state.cached_headers,
            req_method: &mut self.state.method,
            h1_parser_config: self.state.h1_parser_config.clone(),
            #[cfg(all(feature = "server", feature = "runtime"))]
            h1_header_read_timeout: self.state.h1_header_read_timeout,
            #[cfg(all(feature = "server", feature = "runtime"))]
            h1_header_read_timeout_fut: &mut self.state.h1_header_read_timeout_fut,
            #[cfg(all(feature = "server", feature = "runtime"))]
            h1_header_read_timeout_running: &mut self.state.h1_header_read_timeout_running,
            preserve_header_case: self.state.preserve_header_case,
            #[cfg(feature = "ffi")]
            preserve_header_order: self.state.preserve_header_order,
            h09_responses: self.state.h09_responses,
            #[cfg(feature = "ffi")]
            on_informational: &mut self.state.on_informational,
            #[cfg(feature = "ffi")]
            raw_headers: self.state.raw_headers,
        }
    )) {
        Ok(msg) => msg,
        Err(e) => return self.on_read_head_error(e),
    };

    // Note: don't deconstruct `msg` into local variables, it appears
    // the optimizer doesn't remove the extra copies.

    debug!("incoming body is {}", msg.decode);

    // Prevent accepting HTTP/0.9 responses after the initial one, if any.
    self.state.h09_responses = false;

    // Drop any OnInformational callbacks, we're done there!
    #[cfg(feature = "ffi")]
    {
        self.state.on_informational = None;
    }

    // Mark the connection busy, then fold in what the peer said about
    // keep-alive and remember the version it spoke.
    self.state.busy();
    self.state.keep_alive &= msg.keep_alive;
    self.state.version = msg.head.version;

    let mut wants = if msg.wants_upgrade {
        Wants::UPGRADE
    } else {
        Wants::EMPTY
    };

    if msg.decode == DecodedLength::ZERO {
        if msg.expect_continue {
            debug!("ignoring expect-continue since body is empty");
        }
        // Nothing to read for this message: the read side is already done.
        self.state.reading = Reading::KeepAlive;
        if !T::should_read_first() {
            self.try_keep_alive(cx);
        }
    } else if msg.expect_continue {
        self.state.reading = Reading::Continue(Decoder::new(msg.decode));
        wants = wants.add(Wants::EXPECT);
    } else {
        self.state.reading = Reading::Body(Decoder::new(msg.decode));
    }

    Poll::Ready(Some(Ok((msg.head, msg.decode, wants))))
}
262 | |
/// Handles a failure returned by the head parser.
///
/// The read side is always closed. If the failure is a real parse error,
/// if unparsed bytes remain, or if an EOF must be treated as an error, the
/// error goes to `on_parse_error`; otherwise this is a clean EOF, the write
/// side is closed too and `Ready(None)` is returned.
fn on_read_head_error<Z>(&mut self, e: crate::Error) -> Poll<Option<crate::Result<Z>>> {
    // If we are currently waiting on a message, then an empty
    // message should be reported as an error. If not, it is just
    // the connection closing gracefully.
    //
    // This has to be evaluated before `close_read()`, which changes the
    // keep-alive status that the check looks at.
    let must_error = self.should_error_on_eof();
    self.close_read();
    self.io.consume_leading_lines();
    let was_mid_parse = e.is_parse() || !self.io.read_buf().is_empty();
    if was_mid_parse || must_error {
        // `on_parse_error` checks whether the buffer contains the h2 preface.
        debug!(
            "parse error ( {}) with {} bytes",
            e,
            self.io.read_buf().len()
        );
        match self.on_parse_error(e) {
            // An error response head was queued and the error stored in
            // `state.error`. NOTE(review): no waker is registered on this
            // path; presumably the caller flushes and polls again -- confirm.
            Ok(()) => Poll::Pending, // XXX: wat?
            Err(e) => Poll::Ready(Some(Err(e))),
        }
    } else {
        debug!("read eof");
        self.close_write();
        Poll::Ready(None)
    }
}
288 | |
/// Polls the next chunk of the incoming message body.
///
/// Returns `Ready(Some(Ok(chunk)))` for data, `Ready(None)` when the body
/// has ended and `Ready(Some(Err(_)))` on a decode error. When the body
/// ends or fails, the read state is updated and `try_keep_alive` is run.
pub(crate) fn poll_read_body(
    &mut self,
    cx: &mut Context<'_>,
) -> Poll<Option<io::Result<Bytes>>> {
    debug_assert!(self.can_read_body());

    let (reading, ret) = match self.state.reading {
        Reading::Body(ref mut decoder) => {
            match ready!(decoder.decode(cx, &mut self.io)) {
                Ok(slice) => {
                    let (reading, chunk) = if decoder.is_eof() {
                        debug!("incoming body completed");
                        (
                            Reading::KeepAlive,
                            // The final decode may still carry data.
                            if !slice.is_empty() {
                                Some(Ok(slice))
                            } else {
                                None
                            },
                        )
                    } else if slice.is_empty() {
                        error!("incoming body unexpectedly ended");
                        // This should be unreachable, since all 3 decoders
                        // either set eof=true or return an Err when reading
                        // an empty slice...
                        (Reading::Closed, None)
                    } else {
                        // Mid-body chunk: the state does not change, so
                        // skip the bookkeeping below.
                        return Poll::Ready(Some(Ok(slice)));
                    };
                    (reading, Poll::Ready(chunk))
                }
                Err(e) => {
                    debug!("incoming body decode error: {}", e);
                    (Reading::Closed, Poll::Ready(Some(Err(e))))
                }
            }
        }
        Reading::Continue(ref decoder) => {
            // Write the 100 Continue if not already responded...
            if let Writing::Init = self.state.writing {
                trace!("automatically sending 100 Continue");
                // NOTE(review): this literal has a space between the reason
                // phrase and the CRLF -- confirm that is intended.
                let cont = b"HTTP/1.1 100 Continue \r\n\r\n";
                self.io.headers_buf().extend_from_slice(cont);
            }

            // And now recurse once in the Reading::Body state...
            self.state.reading = Reading::Body(decoder.clone());
            return self.poll_read_body(cx);
        }
        _ => unreachable!("poll_read_body invalid state: {:?}", self.state.reading),
    };

    self.state.reading = reading;
    self.try_keep_alive(cx);
    ret
}
345 | |
346 | pub(crate) fn wants_read_again(&mut self) -> bool { |
347 | let ret = self.state.notify_read; |
348 | self.state.notify_read = false; |
349 | ret |
350 | } |
351 | |
352 | pub(crate) fn poll_read_keep_alive(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> { |
353 | debug_assert!(!self.can_read_head() && !self.can_read_body()); |
354 | |
355 | if self.is_read_closed() { |
356 | Poll::Pending |
357 | } else if self.is_mid_message() { |
358 | self.mid_message_detect_eof(cx) |
359 | } else { |
360 | self.require_empty_read(cx) |
361 | } |
362 | } |
363 | |
364 | fn is_mid_message(&self) -> bool { |
365 | !matches!( |
366 | (&self.state.reading, &self.state.writing), |
367 | (&Reading::Init, &Writing::Init) |
368 | ) |
369 | } |
370 | |
/// Checks that nothing is readable from the io object.
///
/// This should only be called for Clients wanting to enter the idle
/// state.
///
/// Buffered or newly read bytes yield an "unexpected message" error. An
/// EOF closes the read side and yields either `Ok(())` or an "incomplete"
/// error, depending on `should_error_on_eof`.
fn require_empty_read(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
    debug_assert!(!self.can_read_head() && !self.can_read_body() && !self.is_read_closed());
    debug_assert!(!self.is_mid_message());
    debug_assert!(T::is_client());

    if !self.io.read_buf().is_empty() {
        debug!("received an unexpected {} bytes", self.io.read_buf().len());
        return Poll::Ready(Err(crate::Error::new_unexpected_message()));
    }

    let num_read = ready!(self.force_io_read(cx)).map_err(crate::Error::new_io)?;

    if num_read == 0 {
        let ret = if self.should_error_on_eof() {
            trace!("found unexpected EOF on busy connection: {:?}", self.state);
            Poll::Ready(Err(crate::Error::new_incomplete()))
        } else {
            trace!("found EOF on idle connection, closing");
            Poll::Ready(Ok(()))
        };

        // order is important: should_error needs state BEFORE close_read
        self.state.close_read();
        return ret;
    }

    debug!(
        "received unexpected {} bytes on an idle connection",
        num_read
    );
    Poll::Ready(Err(crate::Error::new_unexpected_message()))
}
407 | |
408 | fn mid_message_detect_eof(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> { |
409 | debug_assert!(!self.can_read_head() && !self.can_read_body() && !self.is_read_closed()); |
410 | debug_assert!(self.is_mid_message()); |
411 | |
412 | if self.state.allow_half_close || !self.io.read_buf().is_empty() { |
413 | return Poll::Pending; |
414 | } |
415 | |
416 | let num_read = ready!(self.force_io_read(cx)).map_err(crate::Error::new_io)?; |
417 | |
418 | if num_read == 0 { |
419 | trace!("found unexpected EOF on busy connection: {:?}" , self.state); |
420 | self.state.close_read(); |
421 | Poll::Ready(Err(crate::Error::new_incomplete())) |
422 | } else { |
423 | Poll::Ready(Ok(())) |
424 | } |
425 | } |
426 | |
427 | fn force_io_read(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<usize>> { |
428 | debug_assert!(!self.state.is_read_closed()); |
429 | |
430 | let result = ready!(self.io.poll_read_from_io(cx)); |
431 | Poll::Ready(result.map_err(|e| { |
432 | trace!("force_io_read; io error = {:?}" , e); |
433 | self.state.close(); |
434 | e |
435 | })) |
436 | } |
437 | |
/// Sets `notify_read` when the caller should poll the read side again.
///
/// Only acts when the read side is in `Init`, no body is being written and
/// the transport is not read-blocked. If the read buffer is empty, one read
/// is attempted first: EOF or a pending read return without setting the
/// flag, while data or an I/O error fall through and set it.
fn maybe_notify(&mut self, cx: &mut Context<'_>) {
    // its possible that we returned NotReady from poll() without having
    // exhausted the underlying Io. We would have done this when we
    // determined we couldn't keep reading until we knew how writing
    // would finish.

    match self.state.reading {
        Reading::Continue(..) | Reading::Body(..) | Reading::KeepAlive | Reading::Closed => {
            return
        }
        Reading::Init => (),
    };

    match self.state.writing {
        Writing::Body(..) => return,
        Writing::Init | Writing::KeepAlive | Writing::Closed => (),
    }

    if !self.io.is_read_blocked() {
        if self.io.read_buf().is_empty() {
            match self.io.poll_read_from_io(cx) {
                Poll::Ready(Ok(n)) => {
                    if n == 0 {
                        trace!("maybe_notify; read eof");
                        // An idle connection is closed entirely; otherwise
                        // only the read side is closed.
                        if self.state.is_idle() {
                            self.state.close();
                        } else {
                            self.close_read()
                        }
                        return;
                    }
                }
                Poll::Pending => {
                    trace!("maybe_notify; read_from_io blocked");
                    return;
                }
                Poll::Ready(Err(e)) => {
                    trace!("maybe_notify; read_from_io error: {}", e);
                    // The error is stored for `take_error`; the flag below
                    // is still set.
                    self.state.close();
                    self.state.error = Some(crate::Error::new_io(e));
                }
            }
        }
        self.state.notify_read = true;
    }
}
484 | |
/// Lets the state try to return to idle, then checks whether the caller
/// should be told to read again.
fn try_keep_alive(&mut self, cx: &mut Context<'_>) {
    self.state.try_keep_alive::<T>();
    self.maybe_notify(cx);
}
489 | |
490 | pub(crate) fn can_write_head(&self) -> bool { |
491 | if !T::should_read_first() && matches!(self.state.reading, Reading::Closed) { |
492 | return false; |
493 | } |
494 | |
495 | match self.state.writing { |
496 | Writing::Init => self.io.can_headers_buf(), |
497 | _ => false, |
498 | } |
499 | } |
500 | |
501 | pub(crate) fn can_write_body(&self) -> bool { |
502 | match self.state.writing { |
503 | Writing::Body(..) => true, |
504 | Writing::Init | Writing::KeepAlive | Writing::Closed => false, |
505 | } |
506 | } |
507 | |
/// Returns whether the buffered I/O layer reports that it can buffer more.
pub(crate) fn can_buffer_body(&self) -> bool {
    self.io.can_buffer()
}
511 | |
512 | pub(crate) fn write_head(&mut self, head: MessageHead<T::Outgoing>, body: Option<BodyLength>) { |
513 | if let Some(encoder) = self.encode_head(head, body) { |
514 | self.state.writing = if !encoder.is_eof() { |
515 | Writing::Body(encoder) |
516 | } else if encoder.is_last() { |
517 | Writing::Closed |
518 | } else { |
519 | Writing::KeepAlive |
520 | }; |
521 | } |
522 | } |
523 | |
524 | pub(crate) fn write_full_msg(&mut self, head: MessageHead<T::Outgoing>, body: B) { |
525 | if let Some(encoder) = |
526 | self.encode_head(head, Some(BodyLength::Known(body.remaining() as u64))) |
527 | { |
528 | let is_last = encoder.is_last(); |
529 | // Make sure we don't write a body if we weren't actually allowed |
530 | // to do so, like because its a HEAD request. |
531 | if !encoder.is_eof() { |
532 | encoder.danger_full_buf(body, self.io.write_buf()); |
533 | } |
534 | self.state.writing = if is_last { |
535 | Writing::Closed |
536 | } else { |
537 | Writing::KeepAlive |
538 | } |
539 | } |
540 | } |
541 | |
/// Encodes `head` into the headers buffer of the I/O layer.
///
/// Returns the body encoder on success. On failure the error is stored in
/// `state.error`, the write side is closed and `None` is returned.
fn encode_head(
    &mut self,
    mut head: MessageHead<T::Outgoing>,
    body: Option<BodyLength>,
) -> Option<Encoder> {
    debug_assert!(self.can_write_head());

    // A role that writes first becomes busy as soon as it starts a message.
    if !T::should_read_first() {
        self.state.busy();
    }

    self.enforce_version(&mut head);

    let buf = self.io.headers_buf();
    match super::role::encode_headers::<T>(
        Encode {
            head: &mut head,
            body,
            #[cfg(feature = "server")]
            keep_alive: self.state.wants_keep_alive(),
            req_method: &mut self.state.method,
            title_case_headers: self.state.title_case_headers,
        },
        buf,
    ) {
        Ok(encoder) => {
            debug_assert!(self.state.cached_headers.is_none());
            debug_assert!(head.headers.is_empty());
            // Keep the header map (asserted empty above) so its allocation
            // can be reused by the next parse.
            self.state.cached_headers = Some(head.headers);

            #[cfg(feature = "ffi")]
            {
                self.state.on_informational =
                    head.extensions.remove::<crate::ffi::OnInformational>();
            }

            Some(encoder)
        }
        Err(err) => {
            self.state.error = Some(err);
            self.state.writing = Writing::Closed;
            None
        }
    }
}
587 | |
588 | // Fix keep-alive when Connection: keep-alive header is not present |
589 | fn fix_keep_alive(&mut self, head: &mut MessageHead<T::Outgoing>) { |
590 | let outgoing_is_keep_alive = head |
591 | .headers |
592 | .get(CONNECTION) |
593 | .map(connection_keep_alive) |
594 | .unwrap_or(false); |
595 | |
596 | if !outgoing_is_keep_alive { |
597 | match head.version { |
598 | // If response is version 1.0 and keep-alive is not present in the response, |
599 | // disable keep-alive so the server closes the connection |
600 | Version::HTTP_10 => self.state.disable_keep_alive(), |
601 | // If response is version 1.1 and keep-alive is wanted, add |
602 | // Connection: keep-alive header when not present |
603 | Version::HTTP_11 => { |
604 | if self.state.wants_keep_alive() { |
605 | head.headers |
606 | .insert(CONNECTION, HeaderValue::from_static("keep-alive" )); |
607 | } |
608 | } |
609 | _ => (), |
610 | } |
611 | } |
612 | } |
613 | |
614 | // If we know the remote speaks an older version, we try to fix up any messages |
615 | // to work with our older peer. |
616 | fn enforce_version(&mut self, head: &mut MessageHead<T::Outgoing>) { |
617 | if let Version::HTTP_10 = self.state.version { |
618 | // Fixes response or connection when keep-alive header is not present |
619 | self.fix_keep_alive(head); |
620 | // If the remote only knows HTTP/1.0, we should force ourselves |
621 | // to do only speak HTTP/1.0 as well. |
622 | head.version = Version::HTTP_10; |
623 | } |
624 | // If the remote speaks HTTP/1.1, then it *should* be fine with |
625 | // both HTTP/1.0 and HTTP/1.1 from us. So again, we just let |
626 | // the user's headers be. |
627 | } |
628 | |
/// Encodes one body chunk and queues it on the I/O layer.
///
/// The write state only changes once the encoder reports EOF: to `Closed`
/// if this is the last message, to `KeepAlive` otherwise.
pub(crate) fn write_body(&mut self, chunk: B) {
    debug_assert!(self.can_write_body() && self.can_buffer_body());
    // empty chunks should be discarded at Dispatcher level
    debug_assert!(chunk.remaining() != 0);

    let state = match self.state.writing {
        Writing::Body(ref mut encoder) => {
            self.io.buffer(encoder.encode(chunk));

            // More body is expected: stay in `Writing::Body`.
            if !encoder.is_eof() {
                return;
            }

            if encoder.is_last() {
                Writing::Closed
            } else {
                Writing::KeepAlive
            }
        }
        _ => unreachable!("write_body invalid state: {:?}", self.state.writing),
    };

    self.state.writing = state;
}
653 | |
654 | pub(crate) fn write_body_and_end(&mut self, chunk: B) { |
655 | debug_assert!(self.can_write_body() && self.can_buffer_body()); |
656 | // empty chunks should be discarded at Dispatcher level |
657 | debug_assert!(chunk.remaining() != 0); |
658 | |
659 | let state = match self.state.writing { |
660 | Writing::Body(ref encoder) => { |
661 | let can_keep_alive = encoder.encode_and_end(chunk, self.io.write_buf()); |
662 | if can_keep_alive { |
663 | Writing::KeepAlive |
664 | } else { |
665 | Writing::Closed |
666 | } |
667 | } |
668 | _ => unreachable!("write_body invalid state: {:?}" , self.state.writing), |
669 | }; |
670 | |
671 | self.state.writing = state; |
672 | } |
673 | |
/// Finishes the outgoing body.
///
/// Returns `Ok(())` without doing anything if the write side is not in
/// `Body`. If the encoder refuses to end, the write side is closed and a
/// "body write aborted" error is returned.
pub(crate) fn end_body(&mut self) -> crate::Result<()> {
    debug_assert!(self.can_write_body());

    let encoder = match self.state.writing {
        Writing::Body(ref mut enc) => enc,
        _ => return Ok(()),
    };

    // end of stream, that means we should try to eof
    match encoder.end() {
        Ok(end) => {
            // The encoder may hand back trailing bytes that still have to
            // be queued.
            if let Some(end) = end {
                self.io.buffer(end);
            }

            self.state.writing = if encoder.is_last() || encoder.is_close_delimited() {
                Writing::Closed
            } else {
                Writing::KeepAlive
            };

            Ok(())
        }
        Err(not_eof) => {
            self.state.writing = Writing::Closed;
            Err(crate::Error::new_body_write_aborted().with(not_eof))
        }
    }
}
703 | |
704 | // When we get a parse error, depending on what side we are, we might be able |
705 | // to write a response before closing the connection. |
706 | // |
707 | // - Client: there is nothing we can do |
708 | // - Server: if Response hasn't been written yet, we can send a 4xx response |
709 | fn on_parse_error(&mut self, err: crate::Error) -> crate::Result<()> { |
710 | if let Writing::Init = self.state.writing { |
711 | if self.has_h2_prefix() { |
712 | return Err(crate::Error::new_version_h2()); |
713 | } |
714 | if let Some(msg) = T::on_error(&err) { |
715 | // Drop the cached headers so as to not trigger a debug |
716 | // assert in `write_head`... |
717 | self.state.cached_headers.take(); |
718 | self.write_head(msg, None); |
719 | self.state.error = Some(err); |
720 | return Ok(()); |
721 | } |
722 | } |
723 | |
724 | // fallback is pass the error back up |
725 | Err(err) |
726 | } |
727 | |
/// Flushes the buffered I/O layer, then re-evaluates keep-alive.
///
/// A flush error is returned as-is, before any state is touched.
pub(crate) fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
    ready!(Pin::new(&mut self.io).poll_flush(cx))?;
    self.try_keep_alive(cx);
    trace!("flushed( {}): {:?}", T::LOG, self.state);
    Poll::Ready(Ok(()))
}
734 | |
735 | pub(crate) fn poll_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
736 | match ready!(Pin::new(self.io.io_mut()).poll_shutdown(cx)) { |
737 | Ok(()) => { |
738 | trace!("shut down IO complete" ); |
739 | Poll::Ready(Ok(())) |
740 | } |
741 | Err(e) => { |
742 | debug!("error shutting down IO: {}" , e); |
743 | Poll::Ready(Err(e)) |
744 | } |
745 | } |
746 | } |
747 | |
/// If the read side can be cheaply drained, do so. Otherwise, close.
///
/// A single body poll is attempted and its result is discarded; the read
/// side is closed unless that poll left it in `Init` or `KeepAlive`.
pub(super) fn poll_drain_or_close_read(&mut self, cx: &mut Context<'_>) {
    if let Reading::Continue(ref decoder) = self.state.reading {
        // skip sending the 100-continue
        // just move forward to a read, in case a tiny body was included
        self.state.reading = Reading::Body(decoder.clone());
    }

    let _ = self.poll_read_body(cx);

    // If still in Reading::Body, just give up
    match self.state.reading {
        Reading::Init | Reading::KeepAlive => trace!("body drained"),
        _ => self.close_read(),
    }
}
764 | |
/// Closes the read side of the state machine (this also disables keep-alive).
pub(crate) fn close_read(&mut self) {
    self.state.close_read();
}
768 | |
/// Closes the write side of the state machine (this also disables keep-alive).
pub(crate) fn close_write(&mut self) {
    self.state.close_write();
}
772 | |
/// Disables keep-alive for this connection.
///
/// An idle connection is closed right away; one that is in progress only
/// has its keep-alive status switched off.
#[cfg(feature = "server")]
pub(crate) fn disable_keep_alive(&mut self) {
    if !self.state.is_idle() {
        trace!("disable_keep_alive; in-progress connection");
        self.state.disable_keep_alive();
        return;
    }

    trace!("disable_keep_alive; closing idle connection");
    self.state.close();
}
783 | |
784 | pub(crate) fn take_error(&mut self) -> crate::Result<()> { |
785 | if let Some(err) = self.state.error.take() { |
786 | Err(err) |
787 | } else { |
788 | Ok(()) |
789 | } |
790 | } |
791 | |
/// Asks the state to prepare for a possible HTTP upgrade and returns the
/// resulting `OnUpgrade` handle.
pub(super) fn on_upgrade(&mut self) -> crate::upgrade::OnUpgrade {
    trace!(" {}: prepare possible HTTP upgrade", T::LOG);
    self.state.prepare_upgrade()
}
796 | } |
797 | |
798 | impl<I, B: Buf, T> fmt::Debug for Conn<I, B, T> { |
799 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
800 | f&mut DebugStruct<'_, '_>.debug_struct("Conn" ) |
801 | .field("state" , &self.state) |
802 | .field(name:"io" , &self.io) |
803 | .finish() |
804 | } |
805 | } |
806 | |
// B and T are never pinned, so `Conn` is `Unpin` whenever the transport `I` is.
impl<I: Unpin, B, T> Unpin for Conn<I, B, T> {}
809 | |
/// Per-connection state: the read/write state machine, the keep-alive
/// status and the options that are passed on to the parser and encoder.
struct State {
    /// When set, the mid-message EOF probe is skipped.
    allow_half_close: bool,
    /// Re-usable HeaderMap to reduce allocating new ones.
    cached_headers: Option<HeaderMap>,
    /// If an error occurs when there wasn't a direct way to return it
    /// back to the user, this is set.
    error: Option<crate::Error>,
    /// Current keep-alive status.
    keep_alive: KA,
    /// If mid-message, the HTTP Method that started it.
    ///
    /// This is used to know things such as if the message can include
    /// a body or not.
    method: Option<Method>,
    /// Parser configuration; a clone is handed to every head parse.
    h1_parser_config: ParserConfig,
    /// Optional header read timeout, passed to the head parser.
    #[cfg(all(feature = "server", feature = "runtime"))]
    h1_header_read_timeout: Option<Duration>,
    /// Timer slot lent mutably to the head parser.
    #[cfg(all(feature = "server", feature = "runtime"))]
    h1_header_read_timeout_fut: Option<Pin<Box<Sleep>>>,
    /// Flag lent mutably to the head parser together with the timer slot.
    #[cfg(all(feature = "server", feature = "runtime"))]
    h1_header_read_timeout_running: bool,
    /// Passed to the head parser.
    preserve_header_case: bool,
    /// Passed to the head parser.
    #[cfg(feature = "ffi")]
    preserve_header_order: bool,
    /// Passed to the head encoder.
    title_case_headers: bool,
    /// Passed to the head parser; cleared after the first head is read.
    h09_responses: bool,
    /// If set, called with each 1xx informational response received for
    /// the current request. MUST be unset after a non-1xx response is
    /// received.
    #[cfg(feature = "ffi")]
    on_informational: Option<crate::ffi::OnInformational>,
    /// Passed to the head parser.
    #[cfg(feature = "ffi")]
    raw_headers: bool,
    /// Set to true when the Dispatcher should poll read operations
    /// again. See the `maybe_notify` method for more.
    notify_read: bool,
    /// State of allowed reads
    reading: Reading,
    /// State of allowed writes
    writing: Writing,
    /// An expected pending HTTP upgrade.
    upgrade: Option<crate::upgrade::Pending>,
    /// Either HTTP/1.0 or 1.1 connection
    version: Version,
}
855 | |
/// Progress of the read side of the connection.
#[derive(Debug)]
enum Reading {
    /// Ready to read a new message head.
    Init,
    /// A body is expected, and a `100 Continue` may still have to be sent
    /// before it is read.
    Continue(Decoder),
    /// A body is being read with the given decoder.
    Body(Decoder),
    /// The current message has been read completely.
    KeepAlive,
    /// No further reads will happen.
    Closed,
}
864 | |
/// Progress of the write side of the connection.
enum Writing {
    /// Ready to write a new message head.
    Init,
    /// A body is being written with the given encoder.
    Body(Encoder),
    /// The current message has been written completely.
    KeepAlive,
    /// No further writes will happen.
    Closed,
}
871 | |
872 | impl fmt::Debug for State { |
873 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
874 | let mut builder: DebugStruct<'_, '_> = f.debug_struct(name:"State" ); |
875 | builder |
876 | .field("reading" , &self.reading) |
877 | .field("writing" , &self.writing) |
878 | .field(name:"keep_alive" , &self.keep_alive); |
879 | |
880 | // Only show error field if it's interesting... |
881 | if let Some(ref error: &Error) = self.error { |
882 | builder.field(name:"error" , value:error); |
883 | } |
884 | |
885 | if self.allow_half_close { |
886 | builder.field(name:"allow_half_close" , &true); |
887 | } |
888 | |
889 | // Purposefully leaving off other fields.. |
890 | |
891 | builder.finish() |
892 | } |
893 | } |
894 | |
895 | impl fmt::Debug for Writing { |
896 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
897 | match *self { |
898 | Writing::Init => f.write_str(data:"Init" ), |
899 | Writing::Body(ref enc: &Encoder) => f.debug_tuple(name:"Body" ).field(enc).finish(), |
900 | Writing::KeepAlive => f.write_str(data:"KeepAlive" ), |
901 | Writing::Closed => f.write_str(data:"Closed" ), |
902 | } |
903 | } |
904 | } |
905 | |
906 | impl std::ops::BitAndAssign<bool> for KA { |
907 | fn bitand_assign(&mut self, enabled: bool) { |
908 | if !enabled { |
909 | trace!("remote disabling keep-alive" ); |
910 | *self = KA::Disabled; |
911 | } |
912 | } |
913 | } |
914 | |
/// Keep-alive status of the connection.
#[derive(Clone, Copy, Debug)]
enum KA {
    /// No message is in flight.
    Idle,
    /// A message is in flight; this is the default status.
    Busy,
    /// Keep-alive has been turned off.
    Disabled,
}
921 | |
922 | impl Default for KA { |
923 | fn default() -> KA { |
924 | KA::Busy |
925 | } |
926 | } |
927 | |
928 | impl KA { |
929 | fn idle(&mut self) { |
930 | *self = KA::Idle; |
931 | } |
932 | |
933 | fn busy(&mut self) { |
934 | *self = KA::Busy; |
935 | } |
936 | |
937 | fn disable(&mut self) { |
938 | *self = KA::Disabled; |
939 | } |
940 | |
941 | fn status(&self) -> KA { |
942 | *self |
943 | } |
944 | } |
945 | |
946 | impl State { |
/// Closes both the read and the write side and disables keep-alive.
fn close(&mut self) {
    trace!("State::close()");
    self.reading = Reading::Closed;
    self.writing = Writing::Closed;
    self.keep_alive.disable();
}
953 | |
/// Closes the read side only and disables keep-alive.
fn close_read(&mut self) {
    trace!("State::close_read()");
    self.reading = Reading::Closed;
    self.keep_alive.disable();
}
959 | |
/// Closes the write side only and disables keep-alive.
fn close_write(&mut self) {
    trace!("State::close_write()");
    self.writing = Writing::Closed;
    self.keep_alive.disable();
}
965 | |
966 | fn wants_keep_alive(&self) -> bool { |
967 | if let KA::Disabled = self.keep_alive.status() { |
968 | false |
969 | } else { |
970 | true |
971 | } |
972 | } |
973 | |
974 | fn try_keep_alive<T: Http1Transaction>(&mut self) { |
975 | match (&self.reading, &self.writing) { |
976 | (&Reading::KeepAlive, &Writing::KeepAlive) => { |
977 | if let KA::Busy = self.keep_alive.status() { |
978 | self.idle::<T>(); |
979 | } else { |
980 | trace!( |
981 | "try_keep_alive( {}): could keep-alive, but status = {:?}" , |
982 | T::LOG, |
983 | self.keep_alive |
984 | ); |
985 | self.close(); |
986 | } |
987 | } |
988 | (&Reading::Closed, &Writing::KeepAlive) | (&Reading::KeepAlive, &Writing::Closed) => { |
989 | self.close() |
990 | } |
991 | _ => (), |
992 | } |
993 | } |
994 | |
995 | fn disable_keep_alive(&mut self) { |
996 | self.keep_alive.disable() |
997 | } |
998 | |
999 | fn busy(&mut self) { |
1000 | if let KA::Disabled = self.keep_alive.status() { |
1001 | return; |
1002 | } |
1003 | self.keep_alive.busy(); |
1004 | } |
1005 | |
1006 | fn idle<T: Http1Transaction>(&mut self) { |
1007 | debug_assert!(!self.is_idle(), "State::idle() called while idle" ); |
1008 | |
1009 | self.method = None; |
1010 | self.keep_alive.idle(); |
1011 | |
1012 | if !self.is_idle() { |
1013 | self.close(); |
1014 | return; |
1015 | } |
1016 | |
1017 | self.reading = Reading::Init; |
1018 | self.writing = Writing::Init; |
1019 | |
1020 | // !T::should_read_first() means Client. |
1021 | // |
1022 | // If Client connection has just gone idle, the Dispatcher |
1023 | // should try the poll loop one more time, so as to poll the |
1024 | // pending requests stream. |
1025 | if !T::should_read_first() { |
1026 | self.notify_read = true; |
1027 | } |
1028 | } |
1029 | |
1030 | fn is_idle(&self) -> bool { |
1031 | matches!(self.keep_alive.status(), KA::Idle) |
1032 | } |
1033 | |
1034 | fn is_read_closed(&self) -> bool { |
1035 | matches!(self.reading, Reading::Closed) |
1036 | } |
1037 | |
1038 | fn is_write_closed(&self) -> bool { |
1039 | matches!(self.writing, Writing::Closed) |
1040 | } |
1041 | |
1042 | fn prepare_upgrade(&mut self) -> crate::upgrade::OnUpgrade { |
1043 | let (tx, rx) = crate::upgrade::pending(); |
1044 | self.upgrade = Some(tx); |
1045 | rx |
1046 | } |
1047 | } |
1048 | |
1049 | #[cfg (test)] |
1050 | mod tests { |
/// Benchmarks `poll_read_head` on a short request head that is already
/// sitting in the connection's read buffer (nightly-only).
#[cfg(feature = "nightly")]
#[bench]
fn bench_read_head_short(b: &mut ::test::Bencher) {
    use super::*;
    // The request line must end in CRLF directly after the HTTP version;
    // a stray space there is not a valid request line and would send every
    // iteration into the `panic!` arm below.
    let s = b"GET / HTTP/1.1\r\nHost: localhost:8080\r\n\r\n";
    let len = s.len();
    b.bytes = len as u64;

    // an empty IO, we'll be skipping and using the read buffer anyways
    let io = tokio_test::io::Builder::new().build();
    let mut conn = Conn::<_, bytes::Bytes, crate::proto::h1::ServerTransaction>::new(io);
    *conn.io.read_buf_mut() = ::bytes::BytesMut::from(&s[..]);
    conn.state.cached_headers = Some(HeaderMap::with_capacity(2));

    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();

    b.iter(|| {
        rt.block_on(futures_util::future::poll_fn(|cx| {
            match conn.poll_read_head(cx) {
                Poll::Ready(Some(Ok(parsed))) => {
                    ::test::black_box(&parsed);
                    // Hand the (cleared) header map back so the next
                    // iteration starts with cached headers again.
                    let mut headers = parsed.0.headers;
                    headers.clear();
                    conn.state.cached_headers = Some(headers);
                }
                other => panic!("expected Ready(Some(Ok(..))): {:?}", other),
            }

            // Rewind the read buffer so the next iteration parses the same
            // request again.
            conn.io.read_buf_mut().reserve(1);
            // SAFETY: NOTE(review) — this assumes that after `reserve` the
            // buffer has capacity for `len` bytes and that those bytes
            // still hold the request consumed by the parse above; confirm
            // against `BytesMut::reserve` / `set_len`.
            unsafe {
                conn.io.read_buf_mut().set_len(len);
            }
            conn.state.reading = Reading::Init;
            Poll::Ready(())
        }));
    });
}
1091 | |
1092 | /* |
1093 | //TODO: rewrite these using dispatch... someday... |
1094 | use futures::{Async, Future, Stream, Sink}; |
1095 | use futures::future; |
1096 | |
1097 | use proto::{self, ClientTransaction, MessageHead, ServerTransaction}; |
1098 | use super::super::Encoder; |
1099 | use mock::AsyncIo; |
1100 | |
1101 | use super::{Conn, Decoder, Reading, Writing}; |
1102 | use ::uri::Uri; |
1103 | |
1104 | use std::str::FromStr; |
1105 | |
1106 | #[test] |
1107 | fn test_conn_init_read() { |
1108 | let good_message = b"GET / HTTP/1.1\r\n\r\n".to_vec(); |
1109 | let len = good_message.len(); |
1110 | let io = AsyncIo::new_buf(good_message, len); |
1111 | let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); |
1112 | |
1113 | match conn.poll().unwrap() { |
1114 | Async::Ready(Some(Frame::Message { message, body: false })) => { |
1115 | assert_eq!(message, MessageHead { |
1116 | subject: ::proto::RequestLine(::Get, Uri::from_str("/").unwrap()), |
1117 | .. MessageHead::default() |
1118 | }) |
1119 | }, |
1120 | f => panic!("frame is not Frame::Message: {:?}", f) |
1121 | } |
1122 | } |
1123 | |
1124 | #[test] |
1125 | fn test_conn_parse_partial() { |
1126 | let _: Result<(), ()> = future::lazy(|| { |
1127 | let good_message = b"GET / HTTP/1.1\r\nHost: foo.bar\r\n\r\n".to_vec(); |
1128 | let io = AsyncIo::new_buf(good_message, 10); |
1129 | let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); |
1130 | assert!(conn.poll().unwrap().is_not_ready()); |
1131 | conn.io.io_mut().block_in(50); |
1132 | let async = conn.poll().unwrap(); |
1133 | assert!(async.is_ready()); |
1134 | match async { |
1135 | Async::Ready(Some(Frame::Message { .. })) => (), |
1136 | f => panic!("frame is not Message: {:?}", f), |
1137 | } |
1138 | Ok(()) |
1139 | }).wait(); |
1140 | } |
1141 | |
1142 | #[test] |
1143 | fn test_conn_init_read_eof_idle() { |
1144 | let io = AsyncIo::new_buf(vec![], 1); |
1145 | let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); |
1146 | conn.state.idle(); |
1147 | |
1148 | match conn.poll().unwrap() { |
1149 | Async::Ready(None) => {}, |
1150 | other => panic!("frame is not None: {:?}", other) |
1151 | } |
1152 | } |
1153 | |
1154 | #[test] |
1155 | fn test_conn_init_read_eof_idle_partial_parse() { |
1156 | let io = AsyncIo::new_buf(b"GET / HTTP/1.1".to_vec(), 100); |
1157 | let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); |
1158 | conn.state.idle(); |
1159 | |
1160 | match conn.poll() { |
1161 | Err(ref err) if err.kind() == std::io::ErrorKind::UnexpectedEof => {}, |
1162 | other => panic!("unexpected frame: {:?}", other) |
1163 | } |
1164 | } |
1165 | |
1166 | #[test] |
1167 | fn test_conn_init_read_eof_busy() { |
1168 | let _: Result<(), ()> = future::lazy(|| { |
1169 | // server ignores |
1170 | let io = AsyncIo::new_eof(); |
1171 | let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); |
1172 | conn.state.busy(); |
1173 | |
1174 | match conn.poll().unwrap() { |
1175 | Async::Ready(None) => {}, |
1176 | other => panic!("unexpected frame: {:?}", other) |
1177 | } |
1178 | |
1179 | // client |
1180 | let io = AsyncIo::new_eof(); |
1181 | let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io); |
1182 | conn.state.busy(); |
1183 | |
1184 | match conn.poll() { |
1185 | Err(ref err) if err.kind() == std::io::ErrorKind::UnexpectedEof => {}, |
1186 | other => panic!("unexpected frame: {:?}", other) |
1187 | } |
1188 | Ok(()) |
1189 | }).wait(); |
1190 | } |
1191 | |
1192 | #[test] |
1193 | fn test_conn_body_finish_read_eof() { |
1194 | let _: Result<(), ()> = future::lazy(|| { |
1195 | let io = AsyncIo::new_eof(); |
1196 | let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io); |
1197 | conn.state.busy(); |
1198 | conn.state.writing = Writing::KeepAlive; |
1199 | conn.state.reading = Reading::Body(Decoder::length(0)); |
1200 | |
1201 | match conn.poll() { |
1202 | Ok(Async::Ready(Some(Frame::Body { chunk: None }))) => (), |
1203 | other => panic!("unexpected frame: {:?}", other) |
1204 | } |
1205 | |
1206 | // conn eofs, but tokio-proto will call poll() again, before calling flush() |
1207 | // the conn eof in this case is perfectly fine |
1208 | |
1209 | match conn.poll() { |
1210 | Ok(Async::Ready(None)) => (), |
1211 | other => panic!("unexpected frame: {:?}", other) |
1212 | } |
1213 | Ok(()) |
1214 | }).wait(); |
1215 | } |
1216 | |
1217 | #[test] |
1218 | fn test_conn_message_empty_body_read_eof() { |
1219 | let _: Result<(), ()> = future::lazy(|| { |
1220 | let io = AsyncIo::new_buf(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n".to_vec(), 1024); |
1221 | let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io); |
1222 | conn.state.busy(); |
1223 | conn.state.writing = Writing::KeepAlive; |
1224 | |
1225 | match conn.poll() { |
1226 | Ok(Async::Ready(Some(Frame::Message { body: false, .. }))) => (), |
1227 | other => panic!("unexpected frame: {:?}", other) |
1228 | } |
1229 | |
1230 | // conn eofs, but tokio-proto will call poll() again, before calling flush() |
1231 | // the conn eof in this case is perfectly fine |
1232 | |
1233 | match conn.poll() { |
1234 | Ok(Async::Ready(None)) => (), |
1235 | other => panic!("unexpected frame: {:?}", other) |
1236 | } |
1237 | Ok(()) |
1238 | }).wait(); |
1239 | } |
1240 | |
1241 | #[test] |
1242 | fn test_conn_read_body_end() { |
1243 | let _: Result<(), ()> = future::lazy(|| { |
1244 | let io = AsyncIo::new_buf(b"POST / HTTP/1.1\r\nContent-Length: 5\r\n\r\n12345".to_vec(), 1024); |
1245 | let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); |
1246 | conn.state.busy(); |
1247 | |
1248 | match conn.poll() { |
1249 | Ok(Async::Ready(Some(Frame::Message { body: true, .. }))) => (), |
1250 | other => panic!("unexpected frame: {:?}", other) |
1251 | } |
1252 | |
1253 | match conn.poll() { |
1254 | Ok(Async::Ready(Some(Frame::Body { chunk: Some(_) }))) => (), |
1255 | other => panic!("unexpected frame: {:?}", other) |
1256 | } |
1257 | |
1258 | // When the body is done, `poll` MUST return a `Body` frame with chunk set to `None` |
1259 | match conn.poll() { |
1260 | Ok(Async::Ready(Some(Frame::Body { chunk: None }))) => (), |
1261 | other => panic!("unexpected frame: {:?}", other) |
1262 | } |
1263 | |
1264 | match conn.poll() { |
1265 | Ok(Async::NotReady) => (), |
1266 | other => panic!("unexpected frame: {:?}", other) |
1267 | } |
1268 | Ok(()) |
1269 | }).wait(); |
1270 | } |
1271 | |
1272 | #[test] |
1273 | fn test_conn_closed_read() { |
1274 | let io = AsyncIo::new_buf(vec![], 0); |
1275 | let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); |
1276 | conn.state.close(); |
1277 | |
1278 | match conn.poll().unwrap() { |
1279 | Async::Ready(None) => {}, |
1280 | other => panic!("frame is not None: {:?}", other) |
1281 | } |
1282 | } |
1283 | |
1284 | #[test] |
1285 | fn test_conn_body_write_length() { |
1286 | let _ = pretty_env_logger::try_init(); |
1287 | let _: Result<(), ()> = future::lazy(|| { |
1288 | let io = AsyncIo::new_buf(vec![], 0); |
1289 | let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); |
1290 | let max = super::super::io::DEFAULT_MAX_BUFFER_SIZE + 4096; |
1291 | conn.state.writing = Writing::Body(Encoder::length((max * 2) as u64)); |
1292 | |
1293 | assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'a'; max].into()) }).unwrap().is_ready()); |
1294 | assert!(!conn.can_buffer_body()); |
1295 | |
1296 | assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'b'; 1024 * 8].into()) }).unwrap().is_not_ready()); |
1297 | |
1298 | conn.io.io_mut().block_in(1024 * 3); |
1299 | assert!(conn.poll_complete().unwrap().is_not_ready()); |
1300 | conn.io.io_mut().block_in(1024 * 3); |
1301 | assert!(conn.poll_complete().unwrap().is_not_ready()); |
1302 | conn.io.io_mut().block_in(max * 2); |
1303 | assert!(conn.poll_complete().unwrap().is_ready()); |
1304 | |
1305 | assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'c'; 1024 * 8].into()) }).unwrap().is_ready()); |
1306 | Ok(()) |
1307 | }).wait(); |
1308 | } |
1309 | |
1310 | #[test] |
1311 | fn test_conn_body_write_chunked() { |
1312 | let _: Result<(), ()> = future::lazy(|| { |
1313 | let io = AsyncIo::new_buf(vec![], 4096); |
1314 | let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); |
1315 | conn.state.writing = Writing::Body(Encoder::chunked()); |
1316 | |
1317 | assert!(conn.start_send(Frame::Body { chunk: Some("headers".into()) }).unwrap().is_ready()); |
1318 | assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'x'; 8192].into()) }).unwrap().is_ready()); |
1319 | Ok(()) |
1320 | }).wait(); |
1321 | } |
1322 | |
1323 | #[test] |
1324 | fn test_conn_body_flush() { |
1325 | let _: Result<(), ()> = future::lazy(|| { |
1326 | let io = AsyncIo::new_buf(vec![], 1024 * 1024 * 5); |
1327 | let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); |
1328 | conn.state.writing = Writing::Body(Encoder::length(1024 * 1024)); |
1329 | assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'a'; 1024 * 1024].into()) }).unwrap().is_ready()); |
1330 | assert!(!conn.can_buffer_body()); |
1331 | conn.io.io_mut().block_in(1024 * 1024 * 5); |
1332 | assert!(conn.poll_complete().unwrap().is_ready()); |
1333 | assert!(conn.can_buffer_body()); |
1334 | assert!(conn.io.io_mut().flushed()); |
1335 | |
1336 | Ok(()) |
1337 | }).wait(); |
1338 | } |
1339 | |
1340 | #[test] |
1341 | fn test_conn_parking() { |
1342 | use std::sync::Arc; |
1343 | use futures::executor::Notify; |
1344 | use futures::executor::NotifyHandle; |
1345 | |
1346 | struct Car { |
1347 | permit: bool, |
1348 | } |
1349 | impl Notify for Car { |
1350 | fn notify(&self, _id: usize) { |
1351 | assert!(self.permit, "unparked without permit"); |
1352 | } |
1353 | } |
1354 | |
1355 | fn car(permit: bool) -> NotifyHandle { |
1356 | Arc::new(Car { |
1357 | permit: permit, |
1358 | }).into() |
1359 | } |
1360 | |
1361 | // test that once writing is done, unparks |
1362 | let f = future::lazy(|| { |
1363 | let io = AsyncIo::new_buf(vec![], 4096); |
1364 | let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); |
1365 | conn.state.reading = Reading::KeepAlive; |
1366 | assert!(conn.poll().unwrap().is_not_ready()); |
1367 | |
1368 | conn.state.writing = Writing::KeepAlive; |
1369 | assert!(conn.poll_complete().unwrap().is_ready()); |
1370 | Ok::<(), ()>(()) |
1371 | }); |
1372 | ::futures::executor::spawn(f).poll_future_notify(&car(true), 0).unwrap(); |
1373 | |
1374 | |
1375 | // test that flushing when not waiting on read doesn't unpark |
1376 | let f = future::lazy(|| { |
1377 | let io = AsyncIo::new_buf(vec![], 4096); |
1378 | let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); |
1379 | conn.state.writing = Writing::KeepAlive; |
1380 | assert!(conn.poll_complete().unwrap().is_ready()); |
1381 | Ok::<(), ()>(()) |
1382 | }); |
1383 | ::futures::executor::spawn(f).poll_future_notify(&car(false), 0).unwrap(); |
1384 | |
1385 | |
1386 | // test that flushing and writing isn't done doesn't unpark |
1387 | let f = future::lazy(|| { |
1388 | let io = AsyncIo::new_buf(vec![], 4096); |
1389 | let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); |
1390 | conn.state.reading = Reading::KeepAlive; |
1391 | assert!(conn.poll().unwrap().is_not_ready()); |
1392 | conn.state.writing = Writing::Body(Encoder::length(5_000)); |
1393 | assert!(conn.poll_complete().unwrap().is_ready()); |
1394 | Ok::<(), ()>(()) |
1395 | }); |
1396 | ::futures::executor::spawn(f).poll_future_notify(&car(false), 0).unwrap(); |
1397 | } |
1398 | |
1399 | #[test] |
1400 | fn test_conn_closed_write() { |
1401 | let io = AsyncIo::new_buf(vec![], 0); |
1402 | let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); |
1403 | conn.state.close(); |
1404 | |
1405 | match conn.start_send(Frame::Body { chunk: Some(b"foobar".to_vec().into()) }) { |
1406 | Err(_e) => {}, |
1407 | other => panic!("did not return Err: {:?}", other) |
1408 | } |
1409 | |
1410 | assert!(conn.state.is_write_closed()); |
1411 | } |
1412 | |
1413 | #[test] |
1414 | fn test_conn_write_empty_chunk() { |
1415 | let io = AsyncIo::new_buf(vec![], 0); |
1416 | let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); |
1417 | conn.state.writing = Writing::KeepAlive; |
1418 | |
1419 | assert!(conn.start_send(Frame::Body { chunk: None }).unwrap().is_ready()); |
1420 | assert!(conn.start_send(Frame::Body { chunk: Some(Vec::new().into()) }).unwrap().is_ready()); |
1421 | conn.start_send(Frame::Body { chunk: Some(vec![b'a'].into()) }).unwrap_err(); |
1422 | } |
1423 | */ |
1424 | } |
1425 | |