1use crate::codec::UserError;
2use crate::frame::{Reason, StreamId};
3use crate::{client, frame, server};
4
5use crate::frame::DEFAULT_INITIAL_WINDOW_SIZE;
6use crate::proto::*;
7
8use bytes::{Buf, Bytes};
9use futures_core::Stream;
10use std::io;
11use std::marker::PhantomData;
12use std::pin::Pin;
13use std::task::{Context, Poll};
14use std::time::Duration;
15use tokio::io::{AsyncRead, AsyncWrite};
16
17/// An H2 connection
18#[derive(Debug)]
19pub(crate) struct Connection<T, P, B: Buf = Bytes>
20where
21 P: Peer,
22{
23 /// Read / write frame values
24 codec: Codec<T, Prioritized<B>>,
25
26 inner: ConnectionInner<P, B>,
27}
28
29// Extracted part of `Connection` which does not depend on `T`. Reduces the amount of duplicated
30// method instantiations.
31#[derive(Debug)]
32struct ConnectionInner<P, B: Buf = Bytes>
33where
34 P: Peer,
35{
36 /// Tracks the connection level state transitions.
37 state: State,
38
39 /// An error to report back once complete.
40 ///
41 /// This exists separately from State in order to support
42 /// graceful shutdown.
43 error: Option<frame::GoAway>,
44
45 /// Pending GOAWAY frames to write.
46 go_away: GoAway,
47
48 /// Ping/pong handler
49 ping_pong: PingPong,
50
51 /// Connection settings
52 settings: Settings,
53
54 /// Stream state handler
55 streams: Streams<B, P>,
56
57 /// A `tracing` span tracking the lifetime of the connection.
58 span: tracing::Span,
59
60 /// Client or server
61 _phantom: PhantomData<P>,
62}
63
64struct DynConnection<'a, B: Buf = Bytes> {
65 state: &'a mut State,
66
67 go_away: &'a mut GoAway,
68
69 streams: DynStreams<'a, B>,
70
71 error: &'a mut Option<frame::GoAway>,
72
73 ping_pong: &'a mut PingPong,
74}
75
76#[derive(Debug, Clone)]
77pub(crate) struct Config {
78 pub next_stream_id: StreamId,
79 pub initial_max_send_streams: usize,
80 pub max_send_buffer_size: usize,
81 pub reset_stream_duration: Duration,
82 pub reset_stream_max: usize,
83 pub remote_reset_stream_max: usize,
84 pub local_error_reset_streams_max: Option<usize>,
85 pub settings: frame::Settings,
86}
87
88#[derive(Debug)]
89enum State {
90 /// Currently open in a sane state
91 Open,
92
93 /// The codec must be flushed
94 Closing(Reason, Initiator),
95
96 /// In a closed state
97 Closed(Reason, Initiator),
98}
99
100impl<T, P, B> Connection<T, P, B>
101where
102 T: AsyncRead + AsyncWrite + Unpin,
103 P: Peer,
104 B: Buf,
105{
106 pub fn new(codec: Codec<T, Prioritized<B>>, config: Config) -> Connection<T, P, B> {
107 fn streams_config(config: &Config) -> streams::Config {
108 streams::Config {
109 local_init_window_sz: config
110 .settings
111 .initial_window_size()
112 .unwrap_or(DEFAULT_INITIAL_WINDOW_SIZE),
113 initial_max_send_streams: config.initial_max_send_streams,
114 local_max_buffer_size: config.max_send_buffer_size,
115 local_next_stream_id: config.next_stream_id,
116 local_push_enabled: config.settings.is_push_enabled().unwrap_or(true),
117 extended_connect_protocol_enabled: config
118 .settings
119 .is_extended_connect_protocol_enabled()
120 .unwrap_or(false),
121 local_reset_duration: config.reset_stream_duration,
122 local_reset_max: config.reset_stream_max,
123 remote_reset_max: config.remote_reset_stream_max,
124 remote_init_window_sz: DEFAULT_INITIAL_WINDOW_SIZE,
125 remote_max_initiated: config
126 .settings
127 .max_concurrent_streams()
128 .map(|max| max as usize),
129 local_max_error_reset_streams: config.local_error_reset_streams_max,
130 }
131 }
132 let streams = Streams::new(streams_config(&config));
133 Connection {
134 codec,
135 inner: ConnectionInner {
136 state: State::Open,
137 error: None,
138 go_away: GoAway::new(),
139 ping_pong: PingPong::new(),
140 settings: Settings::new(config.settings),
141 streams,
142 span: tracing::debug_span!("Connection", peer = %P::NAME),
143 _phantom: PhantomData,
144 },
145 }
146 }
147
148 /// connection flow control
149 pub(crate) fn set_target_window_size(&mut self, size: WindowSize) {
150 let _res = self.inner.streams.set_target_connection_window_size(size);
151 // TODO: proper error handling
152 debug_assert!(_res.is_ok());
153 }
154
155 /// Send a new SETTINGS frame with an updated initial window size.
156 pub(crate) fn set_initial_window_size(&mut self, size: WindowSize) -> Result<(), UserError> {
157 let mut settings = frame::Settings::default();
158 settings.set_initial_window_size(Some(size));
159 self.inner.settings.send_settings(settings)
160 }
161
162 /// Send a new SETTINGS frame with extended CONNECT protocol enabled.
163 pub(crate) fn set_enable_connect_protocol(&mut self) -> Result<(), UserError> {
164 let mut settings = frame::Settings::default();
165 settings.set_enable_connect_protocol(Some(1));
166 self.inner.settings.send_settings(settings)
167 }
168
169 /// Returns the maximum number of concurrent streams that may be initiated
170 /// by this peer.
171 pub(crate) fn max_send_streams(&self) -> usize {
172 self.inner.streams.max_send_streams()
173 }
174
175 /// Returns the maximum number of concurrent streams that may be initiated
176 /// by the remote peer.
177 pub(crate) fn max_recv_streams(&self) -> usize {
178 self.inner.streams.max_recv_streams()
179 }
180
181 #[cfg(feature = "unstable")]
182 pub fn num_wired_streams(&self) -> usize {
183 self.inner.streams.num_wired_streams()
184 }
185
186 /// Returns `Ready` when the connection is ready to receive a frame.
187 ///
188 /// Returns `Error` as this may raise errors that are caused by delayed
189 /// processing of received frames.
190 fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), Error>> {
191 let _e = self.inner.span.enter();
192 let span = tracing::trace_span!("poll_ready");
193 let _e = span.enter();
194 // The order of these calls don't really matter too much
195 ready!(self.inner.ping_pong.send_pending_pong(cx, &mut self.codec))?;
196 ready!(self.inner.ping_pong.send_pending_ping(cx, &mut self.codec))?;
197 ready!(self
198 .inner
199 .settings
200 .poll_send(cx, &mut self.codec, &mut self.inner.streams))?;
201 ready!(self.inner.streams.send_pending_refusal(cx, &mut self.codec))?;
202
203 Poll::Ready(Ok(()))
204 }
205
206 /// Send any pending GOAWAY frames.
207 ///
208 /// This will return `Some(reason)` if the connection should be closed
209 /// afterwards. If this is a graceful shutdown, this returns `None`.
210 fn poll_go_away(&mut self, cx: &mut Context) -> Poll<Option<io::Result<Reason>>> {
211 self.inner.go_away.send_pending_go_away(cx, &mut self.codec)
212 }
213
214 pub fn go_away_from_user(&mut self, e: Reason) {
215 self.inner.as_dyn().go_away_from_user(e)
216 }
217
218 fn take_error(&mut self, ours: Reason, initiator: Initiator) -> Result<(), Error> {
219 let (debug_data, theirs) = self
220 .inner
221 .error
222 .take()
223 .as_ref()
224 .map_or((Bytes::new(), Reason::NO_ERROR), |frame| {
225 (frame.debug_data().clone(), frame.reason())
226 });
227
228 match (ours, theirs) {
229 (Reason::NO_ERROR, Reason::NO_ERROR) => Ok(()),
230 (ours, Reason::NO_ERROR) => Err(Error::GoAway(Bytes::new(), ours, initiator)),
231 // If both sides reported an error, give their
232 // error back to th user. We assume our error
233 // was a consequence of their error, and less
234 // important.
235 (_, theirs) => Err(Error::remote_go_away(debug_data, theirs)),
236 }
237 }
238
239 /// Closes the connection by transitioning to a GOAWAY state
240 /// iff there are no streams or references
241 pub fn maybe_close_connection_if_no_streams(&mut self) {
242 // If we poll() and realize that there are no streams or references
243 // then we can close the connection by transitioning to GOAWAY
244 if !self.inner.streams.has_streams_or_other_references() {
245 self.inner.as_dyn().go_away_now(Reason::NO_ERROR);
246 }
247 }
248
249 pub(crate) fn take_user_pings(&mut self) -> Option<UserPings> {
250 self.inner.ping_pong.take_user_pings()
251 }
252
253 /// Advances the internal state of the connection.
254 pub fn poll(&mut self, cx: &mut Context) -> Poll<Result<(), Error>> {
255 // XXX(eliza): cloning the span is unfortunately necessary here in
256 // order to placate the borrow checker — `self` is mutably borrowed by
257 // `poll2`, which means that we can't borrow `self.span` to enter it.
258 // The clone is just an atomic ref bump.
259 let span = self.inner.span.clone();
260 let _e = span.enter();
261 let span = tracing::trace_span!("poll");
262 let _e = span.enter();
263
264 loop {
265 tracing::trace!(connection.state = ?self.inner.state);
266 // TODO: probably clean up this glob of code
267 match self.inner.state {
268 // When open, continue to poll a frame
269 State::Open => {
270 let result = match self.poll2(cx) {
271 Poll::Ready(result) => result,
272 // The connection is not ready to make progress
273 Poll::Pending => {
274 // Ensure all window updates have been sent.
275 //
276 // This will also handle flushing `self.codec`
277 ready!(self.inner.streams.poll_complete(cx, &mut self.codec))?;
278
279 if (self.inner.error.is_some()
280 || self.inner.go_away.should_close_on_idle())
281 && !self.inner.streams.has_streams()
282 {
283 self.inner.as_dyn().go_away_now(Reason::NO_ERROR);
284 continue;
285 }
286
287 return Poll::Pending;
288 }
289 };
290
291 self.inner.as_dyn().handle_poll2_result(result)?
292 }
293 State::Closing(reason, initiator) => {
294 tracing::trace!("connection closing after flush");
295 // Flush/shutdown the codec
296 ready!(self.codec.shutdown(cx))?;
297
298 // Transition the state to error
299 self.inner.state = State::Closed(reason, initiator);
300 }
301 State::Closed(reason, initiator) => {
302 return Poll::Ready(self.take_error(reason, initiator));
303 }
304 }
305 }
306 }
307
308 fn poll2(&mut self, cx: &mut Context) -> Poll<Result<(), Error>> {
309 // This happens outside of the loop to prevent needing to do a clock
310 // check and then comparison of the queue possibly multiple times a
311 // second (and thus, the clock wouldn't have changed enough to matter).
312 self.clear_expired_reset_streams();
313
314 loop {
315 // First, ensure that the `Connection` is able to receive a frame
316 //
317 // The order here matters:
318 // - poll_go_away may buffer a graceful shutdown GOAWAY frame
319 // - If it has, we've also added a PING to be sent in poll_ready
320 if let Some(reason) = ready!(self.poll_go_away(cx)?) {
321 if self.inner.go_away.should_close_now() {
322 if self.inner.go_away.is_user_initiated() {
323 // A user initiated abrupt shutdown shouldn't return
324 // the same error back to the user.
325 return Poll::Ready(Ok(()));
326 } else {
327 return Poll::Ready(Err(Error::library_go_away(reason)));
328 }
329 }
330 // Only NO_ERROR should be waiting for idle
331 debug_assert_eq!(
332 reason,
333 Reason::NO_ERROR,
334 "graceful GOAWAY should be NO_ERROR"
335 );
336 }
337 ready!(self.poll_ready(cx))?;
338
339 match self
340 .inner
341 .as_dyn()
342 .recv_frame(ready!(Pin::new(&mut self.codec).poll_next(cx)?))?
343 {
344 ReceivedFrame::Settings(frame) => {
345 self.inner.settings.recv_settings(
346 frame,
347 &mut self.codec,
348 &mut self.inner.streams,
349 )?;
350 }
351 ReceivedFrame::Continue => (),
352 ReceivedFrame::Done => {
353 return Poll::Ready(Ok(()));
354 }
355 }
356 }
357 }
358
359 fn clear_expired_reset_streams(&mut self) {
360 self.inner.streams.clear_expired_reset_streams();
361 }
362}
363
364impl<P, B> ConnectionInner<P, B>
365where
366 P: Peer,
367 B: Buf,
368{
369 fn as_dyn(&mut self) -> DynConnection<'_, B> {
370 let ConnectionInner {
371 state: &mut State,
372 go_away: &mut GoAway,
373 streams: &mut Streams,
374 error: &mut Option,
375 ping_pong: &mut PingPong,
376 ..
377 } = self;
378 let streams: DynStreams<'_, B> = streams.as_dyn();
379 DynConnection {
380 state,
381 go_away,
382 streams,
383 error,
384 ping_pong,
385 }
386 }
387}
388
389impl<B> DynConnection<'_, B>
390where
391 B: Buf,
392{
393 fn go_away(&mut self, id: StreamId, e: Reason) {
394 let frame = frame::GoAway::new(id, e);
395 self.streams.send_go_away(id);
396 self.go_away.go_away(frame);
397 }
398
399 fn go_away_now(&mut self, e: Reason) {
400 let last_processed_id = self.streams.last_processed_id();
401 let frame = frame::GoAway::new(last_processed_id, e);
402 self.go_away.go_away_now(frame);
403 }
404
405 fn go_away_now_data(&mut self, e: Reason, data: Bytes) {
406 let last_processed_id = self.streams.last_processed_id();
407 let frame = frame::GoAway::with_debug_data(last_processed_id, e, data);
408 self.go_away.go_away_now(frame);
409 }
410
411 fn go_away_from_user(&mut self, e: Reason) {
412 let last_processed_id = self.streams.last_processed_id();
413 let frame = frame::GoAway::new(last_processed_id, e);
414 self.go_away.go_away_from_user(frame);
415
416 // Notify all streams of reason we're abruptly closing.
417 self.streams.handle_error(Error::user_go_away(e));
418 }
419
420 fn handle_poll2_result(&mut self, result: Result<(), Error>) -> Result<(), Error> {
421 match result {
422 // The connection has shutdown normally
423 Ok(()) => {
424 *self.state = State::Closing(Reason::NO_ERROR, Initiator::Library);
425 Ok(())
426 }
427 // Attempting to read a frame resulted in a connection level
428 // error. This is handled by setting a GOAWAY frame followed by
429 // terminating the connection.
430 Err(Error::GoAway(debug_data, reason, initiator)) => {
431 let e = Error::GoAway(debug_data.clone(), reason, initiator);
432 tracing::debug!(error = ?e, "Connection::poll; connection error");
433
434 // We may have already sent a GOAWAY for this error,
435 // if so, don't send another, just flush and close up.
436 if self
437 .go_away
438 .going_away()
439 .map_or(false, |frame| frame.reason() == reason)
440 {
441 tracing::trace!(" -> already going away");
442 *self.state = State::Closing(reason, initiator);
443 return Ok(());
444 }
445
446 // Reset all active streams
447 self.streams.handle_error(e);
448 self.go_away_now_data(reason, debug_data);
449 Ok(())
450 }
451 // Attempting to read a frame resulted in a stream level error.
452 // This is handled by resetting the frame then trying to read
453 // another frame.
454 Err(Error::Reset(id, reason, initiator)) => {
455 debug_assert_eq!(initiator, Initiator::Library);
456 tracing::trace!(?id, ?reason, "stream error");
457 self.streams.send_reset(id, reason);
458 Ok(())
459 }
460 // Attempting to read a frame resulted in an I/O error. All
461 // active streams must be reset.
462 //
463 // TODO: Are I/O errors recoverable?
464 Err(Error::Io(e, inner)) => {
465 tracing::debug!(error = ?e, "Connection::poll; IO error");
466 let e = Error::Io(e, inner);
467
468 // Reset all active streams
469 self.streams.handle_error(e.clone());
470
471 // Return the error
472 Err(e)
473 }
474 }
475 }
476
477 fn recv_frame(&mut self, frame: Option<Frame>) -> Result<ReceivedFrame, Error> {
478 use crate::frame::Frame::*;
479 match frame {
480 Some(Headers(frame)) => {
481 tracing::trace!(?frame, "recv HEADERS");
482 self.streams.recv_headers(frame)?;
483 }
484 Some(Data(frame)) => {
485 tracing::trace!(?frame, "recv DATA");
486 self.streams.recv_data(frame)?;
487 }
488 Some(Reset(frame)) => {
489 tracing::trace!(?frame, "recv RST_STREAM");
490 self.streams.recv_reset(frame)?;
491 }
492 Some(PushPromise(frame)) => {
493 tracing::trace!(?frame, "recv PUSH_PROMISE");
494 self.streams.recv_push_promise(frame)?;
495 }
496 Some(Settings(frame)) => {
497 tracing::trace!(?frame, "recv SETTINGS");
498 return Ok(ReceivedFrame::Settings(frame));
499 }
500 Some(GoAway(frame)) => {
501 tracing::trace!(?frame, "recv GOAWAY");
502 // This should prevent starting new streams,
503 // but should allow continuing to process current streams
504 // until they are all EOS. Once they are, State should
505 // transition to GoAway.
506 self.streams.recv_go_away(&frame)?;
507 *self.error = Some(frame);
508 }
509 Some(Ping(frame)) => {
510 tracing::trace!(?frame, "recv PING");
511 let status = self.ping_pong.recv_ping(frame);
512 if status.is_shutdown() {
513 assert!(
514 self.go_away.is_going_away(),
515 "received unexpected shutdown ping"
516 );
517
518 let last_processed_id = self.streams.last_processed_id();
519 self.go_away(last_processed_id, Reason::NO_ERROR);
520 }
521 }
522 Some(WindowUpdate(frame)) => {
523 tracing::trace!(?frame, "recv WINDOW_UPDATE");
524 self.streams.recv_window_update(frame)?;
525 }
526 Some(Priority(frame)) => {
527 tracing::trace!(?frame, "recv PRIORITY");
528 // TODO: handle
529 }
530 None => {
531 tracing::trace!("codec closed");
532 self.streams.recv_eof(false).expect("mutex poisoned");
533 return Ok(ReceivedFrame::Done);
534 }
535 }
536 Ok(ReceivedFrame::Continue)
537 }
538}
539
540enum ReceivedFrame {
541 Settings(frame::Settings),
542 Continue,
543 Done,
544}
545
546impl<T, B> Connection<T, client::Peer, B>
547where
548 T: AsyncRead + AsyncWrite,
549 B: Buf,
550{
551 pub(crate) fn streams(&self) -> &Streams<B, client::Peer> {
552 &self.inner.streams
553 }
554}
555
556impl<T, B> Connection<T, server::Peer, B>
557where
558 T: AsyncRead + AsyncWrite + Unpin,
559 B: Buf,
560{
561 pub fn next_incoming(&mut self) -> Option<StreamRef<B>> {
562 self.inner.streams.next_incoming()
563 }
564
565 // Graceful shutdown only makes sense for server peers.
566 pub fn go_away_gracefully(&mut self) {
567 if self.inner.go_away.is_going_away() {
568 // No reason to start a new one.
569 return;
570 }
571
572 // According to http://httpwg.org/specs/rfc7540.html#GOAWAY:
573 //
574 // > A server that is attempting to gracefully shut down a connection
575 // > SHOULD send an initial GOAWAY frame with the last stream
576 // > identifier set to 2^31-1 and a NO_ERROR code. This signals to the
577 // > client that a shutdown is imminent and that initiating further
578 // > requests is prohibited. After allowing time for any in-flight
579 // > stream creation (at least one round-trip time), the server can
580 // > send another GOAWAY frame with an updated last stream identifier.
581 // > This ensures that a connection can be cleanly shut down without
582 // > losing requests.
583 self.inner.as_dyn().go_away(StreamId::MAX, Reason::NO_ERROR);
584
585 // We take the advice of waiting 1 RTT literally, and wait
586 // for a pong before proceeding.
587 self.inner.ping_pong.ping_shutdown();
588 }
589}
590
591impl<T, P, B> Drop for Connection<T, P, B>
592where
593 P: Peer,
594 B: Buf,
595{
596 fn drop(&mut self) {
597 // Ignore errors as this indicates that the mutex is poisoned.
598 let _ = self.inner.streams.recv_eof(clear_pending_accept:true);
599 }
600}
601