1use crate::codec::UserError;
2use crate::frame::{Reason, StreamId};
3use crate::{client, frame, server};
4
5use crate::frame::DEFAULT_INITIAL_WINDOW_SIZE;
6use crate::proto::*;
7
8use bytes::{Buf, Bytes};
9use futures_core::Stream;
10use std::io;
11use std::marker::PhantomData;
12use std::pin::Pin;
13use std::task::{Context, Poll};
14use std::time::Duration;
15use tokio::io::{AsyncRead, AsyncWrite};
16
17/// An H2 connection
18#[derive(Debug)]
19pub(crate) struct Connection<T, P, B: Buf = Bytes>
20where
21 P: Peer,
22{
23 /// Read / write frame values
24 codec: Codec<T, Prioritized<B>>,
25
26 inner: ConnectionInner<P, B>,
27}
28
29// Extracted part of `Connection` which does not depend on `T`. Reduces the amount of duplicated
30// method instantiations.
31#[derive(Debug)]
32struct ConnectionInner<P, B: Buf = Bytes>
33where
34 P: Peer,
35{
36 /// Tracks the connection level state transitions.
37 state: State,
38
39 /// An error to report back once complete.
40 ///
41 /// This exists separately from State in order to support
42 /// graceful shutdown.
43 error: Option<frame::GoAway>,
44
45 /// Pending GOAWAY frames to write.
46 go_away: GoAway,
47
48 /// Ping/pong handler
49 ping_pong: PingPong,
50
51 /// Connection settings
52 settings: Settings,
53
54 /// Stream state handler
55 streams: Streams<B, P>,
56
57 /// A `tracing` span tracking the lifetime of the connection.
58 span: tracing::Span,
59
60 /// Client or server
61 _phantom: PhantomData<P>,
62}
63
64struct DynConnection<'a, B: Buf = Bytes> {
65 state: &'a mut State,
66
67 go_away: &'a mut GoAway,
68
69 streams: DynStreams<'a, B>,
70
71 error: &'a mut Option<frame::GoAway>,
72
73 ping_pong: &'a mut PingPong,
74}
75
76#[derive(Debug, Clone)]
77pub(crate) struct Config {
78 pub next_stream_id: StreamId,
79 pub initial_max_send_streams: usize,
80 pub max_send_buffer_size: usize,
81 pub reset_stream_duration: Duration,
82 pub reset_stream_max: usize,
83 pub remote_reset_stream_max: usize,
84 pub settings: frame::Settings,
85}
86
87#[derive(Debug)]
88enum State {
89 /// Currently open in a sane state
90 Open,
91
92 /// The codec must be flushed
93 Closing(Reason, Initiator),
94
95 /// In a closed state
96 Closed(Reason, Initiator),
97}
98
99impl<T, P, B> Connection<T, P, B>
100where
101 T: AsyncRead + AsyncWrite + Unpin,
102 P: Peer,
103 B: Buf,
104{
105 pub fn new(codec: Codec<T, Prioritized<B>>, config: Config) -> Connection<T, P, B> {
106 fn streams_config(config: &Config) -> streams::Config {
107 streams::Config {
108 local_init_window_sz: config
109 .settings
110 .initial_window_size()
111 .unwrap_or(DEFAULT_INITIAL_WINDOW_SIZE),
112 initial_max_send_streams: config.initial_max_send_streams,
113 local_max_buffer_size: config.max_send_buffer_size,
114 local_next_stream_id: config.next_stream_id,
115 local_push_enabled: config.settings.is_push_enabled().unwrap_or(true),
116 extended_connect_protocol_enabled: config
117 .settings
118 .is_extended_connect_protocol_enabled()
119 .unwrap_or(false),
120 local_reset_duration: config.reset_stream_duration,
121 local_reset_max: config.reset_stream_max,
122 remote_reset_max: config.remote_reset_stream_max,
123 remote_init_window_sz: DEFAULT_INITIAL_WINDOW_SIZE,
124 remote_max_initiated: config
125 .settings
126 .max_concurrent_streams()
127 .map(|max| max as usize),
128 }
129 }
130 let streams = Streams::new(streams_config(&config));
131 Connection {
132 codec,
133 inner: ConnectionInner {
134 state: State::Open,
135 error: None,
136 go_away: GoAway::new(),
137 ping_pong: PingPong::new(),
138 settings: Settings::new(config.settings),
139 streams,
140 span: tracing::debug_span!("Connection", peer = %P::NAME),
141 _phantom: PhantomData,
142 },
143 }
144 }
145
146 /// connection flow control
147 pub(crate) fn set_target_window_size(&mut self, size: WindowSize) {
148 let _res = self.inner.streams.set_target_connection_window_size(size);
149 // TODO: proper error handling
150 debug_assert!(_res.is_ok());
151 }
152
153 /// Send a new SETTINGS frame with an updated initial window size.
154 pub(crate) fn set_initial_window_size(&mut self, size: WindowSize) -> Result<(), UserError> {
155 let mut settings = frame::Settings::default();
156 settings.set_initial_window_size(Some(size));
157 self.inner.settings.send_settings(settings)
158 }
159
160 /// Send a new SETTINGS frame with extended CONNECT protocol enabled.
161 pub(crate) fn set_enable_connect_protocol(&mut self) -> Result<(), UserError> {
162 let mut settings = frame::Settings::default();
163 settings.set_enable_connect_protocol(Some(1));
164 self.inner.settings.send_settings(settings)
165 }
166
167 /// Returns the maximum number of concurrent streams that may be initiated
168 /// by this peer.
169 pub(crate) fn max_send_streams(&self) -> usize {
170 self.inner.streams.max_send_streams()
171 }
172
173 /// Returns the maximum number of concurrent streams that may be initiated
174 /// by the remote peer.
175 pub(crate) fn max_recv_streams(&self) -> usize {
176 self.inner.streams.max_recv_streams()
177 }
178
179 #[cfg(feature = "unstable")]
180 pub fn num_wired_streams(&self) -> usize {
181 self.inner.streams.num_wired_streams()
182 }
183
184 /// Returns `Ready` when the connection is ready to receive a frame.
185 ///
186 /// Returns `Error` as this may raise errors that are caused by delayed
187 /// processing of received frames.
188 fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), Error>> {
189 let _e = self.inner.span.enter();
190 let span = tracing::trace_span!("poll_ready");
191 let _e = span.enter();
192 // The order of these calls don't really matter too much
193 ready!(self.inner.ping_pong.send_pending_pong(cx, &mut self.codec))?;
194 ready!(self.inner.ping_pong.send_pending_ping(cx, &mut self.codec))?;
195 ready!(self
196 .inner
197 .settings
198 .poll_send(cx, &mut self.codec, &mut self.inner.streams))?;
199 ready!(self.inner.streams.send_pending_refusal(cx, &mut self.codec))?;
200
201 Poll::Ready(Ok(()))
202 }
203
204 /// Send any pending GOAWAY frames.
205 ///
206 /// This will return `Some(reason)` if the connection should be closed
207 /// afterwards. If this is a graceful shutdown, this returns `None`.
208 fn poll_go_away(&mut self, cx: &mut Context) -> Poll<Option<io::Result<Reason>>> {
209 self.inner.go_away.send_pending_go_away(cx, &mut self.codec)
210 }
211
212 pub fn go_away_from_user(&mut self, e: Reason) {
213 self.inner.as_dyn().go_away_from_user(e)
214 }
215
216 fn take_error(&mut self, ours: Reason, initiator: Initiator) -> Result<(), Error> {
217 let (debug_data, theirs) = self
218 .inner
219 .error
220 .take()
221 .as_ref()
222 .map_or((Bytes::new(), Reason::NO_ERROR), |frame| {
223 (frame.debug_data().clone(), frame.reason())
224 });
225
226 match (ours, theirs) {
227 (Reason::NO_ERROR, Reason::NO_ERROR) => Ok(()),
228 (ours, Reason::NO_ERROR) => Err(Error::GoAway(Bytes::new(), ours, initiator)),
229 // If both sides reported an error, give their
230 // error back to th user. We assume our error
231 // was a consequence of their error, and less
232 // important.
233 (_, theirs) => Err(Error::remote_go_away(debug_data, theirs)),
234 }
235 }
236
237 /// Closes the connection by transitioning to a GOAWAY state
238 /// iff there are no streams or references
239 pub fn maybe_close_connection_if_no_streams(&mut self) {
240 // If we poll() and realize that there are no streams or references
241 // then we can close the connection by transitioning to GOAWAY
242 if !self.inner.streams.has_streams_or_other_references() {
243 self.inner.as_dyn().go_away_now(Reason::NO_ERROR);
244 }
245 }
246
247 pub(crate) fn take_user_pings(&mut self) -> Option<UserPings> {
248 self.inner.ping_pong.take_user_pings()
249 }
250
251 /// Advances the internal state of the connection.
252 pub fn poll(&mut self, cx: &mut Context) -> Poll<Result<(), Error>> {
253 // XXX(eliza): cloning the span is unfortunately necessary here in
254 // order to placate the borrow checker — `self` is mutably borrowed by
255 // `poll2`, which means that we can't borrow `self.span` to enter it.
256 // The clone is just an atomic ref bump.
257 let span = self.inner.span.clone();
258 let _e = span.enter();
259 let span = tracing::trace_span!("poll");
260 let _e = span.enter();
261
262 loop {
263 tracing::trace!(connection.state = ?self.inner.state);
264 // TODO: probably clean up this glob of code
265 match self.inner.state {
266 // When open, continue to poll a frame
267 State::Open => {
268 let result = match self.poll2(cx) {
269 Poll::Ready(result) => result,
270 // The connection is not ready to make progress
271 Poll::Pending => {
272 // Ensure all window updates have been sent.
273 //
274 // This will also handle flushing `self.codec`
275 ready!(self.inner.streams.poll_complete(cx, &mut self.codec))?;
276
277 if (self.inner.error.is_some()
278 || self.inner.go_away.should_close_on_idle())
279 && !self.inner.streams.has_streams()
280 {
281 self.inner.as_dyn().go_away_now(Reason::NO_ERROR);
282 continue;
283 }
284
285 return Poll::Pending;
286 }
287 };
288
289 self.inner.as_dyn().handle_poll2_result(result)?
290 }
291 State::Closing(reason, initiator) => {
292 tracing::trace!("connection closing after flush");
293 // Flush/shutdown the codec
294 ready!(self.codec.shutdown(cx))?;
295
296 // Transition the state to error
297 self.inner.state = State::Closed(reason, initiator);
298 }
299 State::Closed(reason, initiator) => {
300 return Poll::Ready(self.take_error(reason, initiator));
301 }
302 }
303 }
304 }
305
306 fn poll2(&mut self, cx: &mut Context) -> Poll<Result<(), Error>> {
307 // This happens outside of the loop to prevent needing to do a clock
308 // check and then comparison of the queue possibly multiple times a
309 // second (and thus, the clock wouldn't have changed enough to matter).
310 self.clear_expired_reset_streams();
311
312 loop {
313 // First, ensure that the `Connection` is able to receive a frame
314 //
315 // The order here matters:
316 // - poll_go_away may buffer a graceful shutdown GOAWAY frame
317 // - If it has, we've also added a PING to be sent in poll_ready
318 if let Some(reason) = ready!(self.poll_go_away(cx)?) {
319 if self.inner.go_away.should_close_now() {
320 if self.inner.go_away.is_user_initiated() {
321 // A user initiated abrupt shutdown shouldn't return
322 // the same error back to the user.
323 return Poll::Ready(Ok(()));
324 } else {
325 return Poll::Ready(Err(Error::library_go_away(reason)));
326 }
327 }
328 // Only NO_ERROR should be waiting for idle
329 debug_assert_eq!(
330 reason,
331 Reason::NO_ERROR,
332 "graceful GOAWAY should be NO_ERROR"
333 );
334 }
335 ready!(self.poll_ready(cx))?;
336
337 match self
338 .inner
339 .as_dyn()
340 .recv_frame(ready!(Pin::new(&mut self.codec).poll_next(cx)?))?
341 {
342 ReceivedFrame::Settings(frame) => {
343 self.inner.settings.recv_settings(
344 frame,
345 &mut self.codec,
346 &mut self.inner.streams,
347 )?;
348 }
349 ReceivedFrame::Continue => (),
350 ReceivedFrame::Done => {
351 return Poll::Ready(Ok(()));
352 }
353 }
354 }
355 }
356
357 fn clear_expired_reset_streams(&mut self) {
358 self.inner.streams.clear_expired_reset_streams();
359 }
360}
361
362impl<P, B> ConnectionInner<P, B>
363where
364 P: Peer,
365 B: Buf,
366{
367 fn as_dyn(&mut self) -> DynConnection<'_, B> {
368 let ConnectionInner {
369 state: &mut State,
370 go_away: &mut GoAway,
371 streams: &mut Streams,
372 error: &mut Option,
373 ping_pong: &mut PingPong,
374 ..
375 } = self;
376 let streams: DynStreams<'_, B> = streams.as_dyn();
377 DynConnection {
378 state,
379 go_away,
380 streams,
381 error,
382 ping_pong,
383 }
384 }
385}
386
387impl<B> DynConnection<'_, B>
388where
389 B: Buf,
390{
391 fn go_away(&mut self, id: StreamId, e: Reason) {
392 let frame = frame::GoAway::new(id, e);
393 self.streams.send_go_away(id);
394 self.go_away.go_away(frame);
395 }
396
397 fn go_away_now(&mut self, e: Reason) {
398 let last_processed_id = self.streams.last_processed_id();
399 let frame = frame::GoAway::new(last_processed_id, e);
400 self.go_away.go_away_now(frame);
401 }
402
403 fn go_away_now_data(&mut self, e: Reason, data: Bytes) {
404 let last_processed_id = self.streams.last_processed_id();
405 let frame = frame::GoAway::with_debug_data(last_processed_id, e, data);
406 self.go_away.go_away_now(frame);
407 }
408
409 fn go_away_from_user(&mut self, e: Reason) {
410 let last_processed_id = self.streams.last_processed_id();
411 let frame = frame::GoAway::new(last_processed_id, e);
412 self.go_away.go_away_from_user(frame);
413
414 // Notify all streams of reason we're abruptly closing.
415 self.streams.handle_error(Error::user_go_away(e));
416 }
417
418 fn handle_poll2_result(&mut self, result: Result<(), Error>) -> Result<(), Error> {
419 match result {
420 // The connection has shutdown normally
421 Ok(()) => {
422 *self.state = State::Closing(Reason::NO_ERROR, Initiator::Library);
423 Ok(())
424 }
425 // Attempting to read a frame resulted in a connection level
426 // error. This is handled by setting a GOAWAY frame followed by
427 // terminating the connection.
428 Err(Error::GoAway(debug_data, reason, initiator)) => {
429 let e = Error::GoAway(debug_data.clone(), reason, initiator);
430 tracing::debug!(error = ?e, "Connection::poll; connection error");
431
432 // We may have already sent a GOAWAY for this error,
433 // if so, don't send another, just flush and close up.
434 if self
435 .go_away
436 .going_away()
437 .map_or(false, |frame| frame.reason() == reason)
438 {
439 tracing::trace!(" -> already going away");
440 *self.state = State::Closing(reason, initiator);
441 return Ok(());
442 }
443
444 // Reset all active streams
445 self.streams.handle_error(e);
446 self.go_away_now_data(reason, debug_data);
447 Ok(())
448 }
449 // Attempting to read a frame resulted in a stream level error.
450 // This is handled by resetting the frame then trying to read
451 // another frame.
452 Err(Error::Reset(id, reason, initiator)) => {
453 debug_assert_eq!(initiator, Initiator::Library);
454 tracing::trace!(?id, ?reason, "stream error");
455 self.streams.send_reset(id, reason);
456 Ok(())
457 }
458 // Attempting to read a frame resulted in an I/O error. All
459 // active streams must be reset.
460 //
461 // TODO: Are I/O errors recoverable?
462 Err(Error::Io(e, inner)) => {
463 tracing::debug!(error = ?e, "Connection::poll; IO error");
464 let e = Error::Io(e, inner);
465
466 // Reset all active streams
467 self.streams.handle_error(e.clone());
468
469 // Return the error
470 Err(e)
471 }
472 }
473 }
474
475 fn recv_frame(&mut self, frame: Option<Frame>) -> Result<ReceivedFrame, Error> {
476 use crate::frame::Frame::*;
477 match frame {
478 Some(Headers(frame)) => {
479 tracing::trace!(?frame, "recv HEADERS");
480 self.streams.recv_headers(frame)?;
481 }
482 Some(Data(frame)) => {
483 tracing::trace!(?frame, "recv DATA");
484 self.streams.recv_data(frame)?;
485 }
486 Some(Reset(frame)) => {
487 tracing::trace!(?frame, "recv RST_STREAM");
488 self.streams.recv_reset(frame)?;
489 }
490 Some(PushPromise(frame)) => {
491 tracing::trace!(?frame, "recv PUSH_PROMISE");
492 self.streams.recv_push_promise(frame)?;
493 }
494 Some(Settings(frame)) => {
495 tracing::trace!(?frame, "recv SETTINGS");
496 return Ok(ReceivedFrame::Settings(frame));
497 }
498 Some(GoAway(frame)) => {
499 tracing::trace!(?frame, "recv GOAWAY");
500 // This should prevent starting new streams,
501 // but should allow continuing to process current streams
502 // until they are all EOS. Once they are, State should
503 // transition to GoAway.
504 self.streams.recv_go_away(&frame)?;
505 *self.error = Some(frame);
506 }
507 Some(Ping(frame)) => {
508 tracing::trace!(?frame, "recv PING");
509 let status = self.ping_pong.recv_ping(frame);
510 if status.is_shutdown() {
511 assert!(
512 self.go_away.is_going_away(),
513 "received unexpected shutdown ping"
514 );
515
516 let last_processed_id = self.streams.last_processed_id();
517 self.go_away(last_processed_id, Reason::NO_ERROR);
518 }
519 }
520 Some(WindowUpdate(frame)) => {
521 tracing::trace!(?frame, "recv WINDOW_UPDATE");
522 self.streams.recv_window_update(frame)?;
523 }
524 Some(Priority(frame)) => {
525 tracing::trace!(?frame, "recv PRIORITY");
526 // TODO: handle
527 }
528 None => {
529 tracing::trace!("codec closed");
530 self.streams.recv_eof(false).expect("mutex poisoned");
531 return Ok(ReceivedFrame::Done);
532 }
533 }
534 Ok(ReceivedFrame::Continue)
535 }
536}
537
538enum ReceivedFrame {
539 Settings(frame::Settings),
540 Continue,
541 Done,
542}
543
544impl<T, B> Connection<T, client::Peer, B>
545where
546 T: AsyncRead + AsyncWrite,
547 B: Buf,
548{
549 pub(crate) fn streams(&self) -> &Streams<B, client::Peer> {
550 &self.inner.streams
551 }
552}
553
554impl<T, B> Connection<T, server::Peer, B>
555where
556 T: AsyncRead + AsyncWrite + Unpin,
557 B: Buf,
558{
559 pub fn next_incoming(&mut self) -> Option<StreamRef<B>> {
560 self.inner.streams.next_incoming()
561 }
562
563 // Graceful shutdown only makes sense for server peers.
564 pub fn go_away_gracefully(&mut self) {
565 if self.inner.go_away.is_going_away() {
566 // No reason to start a new one.
567 return;
568 }
569
570 // According to http://httpwg.org/specs/rfc7540.html#GOAWAY:
571 //
572 // > A server that is attempting to gracefully shut down a connection
573 // > SHOULD send an initial GOAWAY frame with the last stream
574 // > identifier set to 2^31-1 and a NO_ERROR code. This signals to the
575 // > client that a shutdown is imminent and that initiating further
576 // > requests is prohibited. After allowing time for any in-flight
577 // > stream creation (at least one round-trip time), the server can
578 // > send another GOAWAY frame with an updated last stream identifier.
579 // > This ensures that a connection can be cleanly shut down without
580 // > losing requests.
581 self.inner.as_dyn().go_away(StreamId::MAX, Reason::NO_ERROR);
582
583 // We take the advice of waiting 1 RTT literally, and wait
584 // for a pong before proceeding.
585 self.inner.ping_pong.ping_shutdown();
586 }
587}
588
589impl<T, P, B> Drop for Connection<T, P, B>
590where
591 P: Peer,
592 B: Buf,
593{
594 fn drop(&mut self) {
595 // Ignore errors as this indicates that the mutex is poisoned.
596 let _ = self.inner.streams.recv_eof(clear_pending_accept:true);
597 }
598}
599