1//! Server implementation of the HTTP/2 protocol.
2//!
3//! # Getting started
4//!
5//! Running an HTTP/2 server requires the caller to manage accepting the
6//! connections as well as getting the connections to a state that is ready to
7//! begin the HTTP/2 handshake. See [here](../index.html#handshake) for more
8//! details.
9//!
10//! This could be as basic as using Tokio's [`TcpListener`] to accept
11//! connections, but usually it means using either ALPN or HTTP/1.1 protocol
12//! upgrades.
13//!
14//! Once a connection is obtained, it is passed to [`handshake`],
15//! which will begin the [HTTP/2 handshake]. This returns a future that
16//! completes once the handshake process is performed and HTTP/2 streams may
17//! be received.
18//!
19//! [`handshake`] uses default configuration values. There are a number of
20//! settings that can be changed by using [`Builder`] instead.
21//!
22//! # Inbound streams
23//!
//! The [`Connection`] instance is used to accept inbound HTTP/2 streams through
//! [`Connection::accept`] (it also implements [`futures::Stream`] when the
//! `stream` feature is enabled). When a new stream is received,
//! [`Connection::accept`] returns `(request, response)`. The `request` handle
//! (of type [`http::Request<RecvStream>`]) contains the HTTP request head and
//! provides a way to receive the inbound data stream and the trailers. The
//! `response` handle (of type [`SendResponse`]) allows responding to the
//! request, streaming the response payload, sending trailers, and sending push
//! promises.
32//!
33//! The send ([`SendStream`]) and receive ([`RecvStream`]) halves of the stream
34//! can be operated independently.
35//!
36//! # Managing the connection
37//!
//! The [`Connection`] instance is used to manage connection state. The caller
//! is required to call either [`Connection::accept`] or
//! [`Connection::poll_closed`] in order to advance the connection state. Simply
//! operating on [`SendStream`] or [`RecvStream`] will have no effect unless the
//! connection state is advanced.
//!
//! It is not required to call **both** [`Connection::accept`] and
//! [`Connection::poll_closed`]. If the caller is ready to accept a new stream,
//! then only [`Connection::accept`] should be called. When the caller **does
//! not** want to accept a new stream, [`Connection::poll_closed`] should be
//! called.
//!
//! The [`Connection`] instance should only be dropped once
//! [`Connection::poll_closed`] returns `Ready`. Once [`Connection::accept`]
//! returns `None`, there will be no more inbound streams. At this point, only
//! [`Connection::poll_closed`] should be called.
54//!
55//! # Shutting down the server
56//!
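//! Call [`Connection::graceful_shutdown`] to begin a graceful shutdown, or
//! [`Connection::abrupt_shutdown`] to close the connection without waiting for
//! open streams. In both cases the connection must continue to be polled until
//! it closes.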
59//!
60//! # Example
61//!
62//! A basic HTTP/2 server example that runs over TCP and assumes [prior
63//! knowledge], i.e. both the client and the server assume that the TCP socket
64//! will use the HTTP/2 protocol without prior negotiation.
65//!
//! ```no_run
//! use h2::server;
//! use http::{Response, StatusCode};
//! use tokio::net::TcpListener;
//!
//! #[tokio::main]
//! pub async fn main() {
//!     let listener = TcpListener::bind("127.0.0.1:5928").await.unwrap();
//!
//!     // Accept all incoming TCP connections.
//!     loop {
//!         if let Ok((socket, _peer_addr)) = listener.accept().await {
//!             // Spawn a new task to process each connection.
//!             tokio::spawn(async {
//!                 // Start the HTTP/2 connection handshake
//!                 let mut h2 = server::handshake(socket).await.unwrap();
//!                 // Accept all inbound HTTP/2 streams sent over the
//!                 // connection.
//!                 while let Some(request) = h2.accept().await {
//!                     let (request, mut respond) = request.unwrap();
//!                     println!("Received request: {:?}", request);
//!
//!                     // Build a response with no body
//!                     let response = Response::builder()
//!                         .status(StatusCode::OK)
//!                         .body(())
//!                         .unwrap();
//!
//!                     // Send the response back to the client
//!                     respond.send_response(response, true)
//!                         .unwrap();
//!                 }
//!             });
//!         }
//!     }
//! }
//! ```
104//!
105//! [prior knowledge]: http://httpwg.org/specs/rfc7540.html#known-http
106//! [`handshake`]: fn.handshake.html
107//! [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
108//! [`Builder`]: struct.Builder.html
109//! [`Connection`]: struct.Connection.html
//! [`Connection::accept`]: struct.Connection.html#method.accept
//! [`Connection::poll_closed`]: struct.Connection.html#method.poll_closed
111//! [`Connection::poll_close`]: struct.Connection.html#method.poll_close
//! [`futures::Stream`]: https://docs.rs/futures-core/0.3/futures_core/stream/trait.Stream.html
113//! [`http::Request<RecvStream>`]: ../struct.RecvStream.html
114//! [`RecvStream`]: ../struct.RecvStream.html
115//! [`SendStream`]: ../struct.SendStream.html
//! [`TcpListener`]: https://docs.rs/tokio/1/tokio/net/struct.TcpListener.html
117
118use crate::codec::{Codec, UserError};
119use crate::frame::{self, Pseudo, PushPromiseHeaderError, Reason, Settings, StreamId};
120use crate::proto::{self, Config, Error, Prioritized};
121use crate::{FlowControl, PingPong, RecvStream, SendStream};
122
123use bytes::{Buf, Bytes};
124use http::{HeaderMap, Method, Request, Response};
125use std::future::Future;
126use std::pin::Pin;
127use std::task::{Context, Poll};
128use std::time::Duration;
129use std::{fmt, io};
130use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
131use tracing::instrument::{Instrument, Instrumented};
132
133/// In progress HTTP/2 connection handshake future.
134///
135/// This type implements `Future`, yielding a `Connection` instance once the
136/// handshake has completed.
137///
138/// The handshake is completed once the connection preface is fully received
139/// from the client **and** the initial settings frame is sent to the client.
140///
141/// The handshake future does not wait for the initial settings frame from the
142/// client.
143///
144/// See [module] level docs for more details.
145///
146/// [module]: index.html
147#[must_use = "futures do nothing unless polled"]
148pub struct Handshake<T, B: Buf = Bytes> {
149 /// The config to pass to Connection::new after handshake succeeds.
150 builder: Builder,
151 /// The current state of the handshake.
152 state: Handshaking<T, B>,
153 /// Span tracking the handshake
154 span: tracing::Span,
155}
156
157/// Accepts inbound HTTP/2 streams on a connection.
158///
159/// A `Connection` is backed by an I/O resource (usually a TCP socket) and
160/// implements the HTTP/2 server logic for that connection. It is responsible
161/// for receiving inbound streams initiated by the client as well as driving the
162/// internal state forward.
163///
/// `Connection` values are created by calling [`handshake`]. Once a
/// `Connection` value is obtained, the caller must call [`accept`] or
/// [`poll_closed`] in order to drive the internal connection state forward.
///
/// See [module level] documentation for more details.
///
/// [module level]: index.html
/// [`handshake`]: fn.handshake.html
/// [`accept`]: struct.Connection.html#method.accept
/// [`poll_closed`]: struct.Connection.html#method.poll_closed
174///
175/// # Examples
176///
177/// ```
178/// # use tokio::io::{AsyncRead, AsyncWrite};
179/// # use h2::server;
180/// # use h2::server::*;
181/// #
182/// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) {
183/// let mut server = server::handshake(my_io).await.unwrap();
184/// while let Some(request) = server.accept().await {
185/// tokio::spawn(async move {
186/// let (request, respond) = request.unwrap();
187/// // Process the request and send the response back to the client
188/// // using `respond`.
189/// });
190/// }
191/// # }
192/// #
193/// # pub fn main() {}
194/// ```
195#[must_use = "streams do nothing unless polled"]
196pub struct Connection<T, B: Buf> {
197 connection: proto::Connection<T, Peer, B>,
198}
199
200/// Builds server connections with custom configuration values.
201///
202/// Methods can be chained in order to set the configuration values.
203///
204/// The server is constructed by calling [`handshake`] and passing the I/O
205/// handle that will back the HTTP/2 server.
206///
207/// New instances of `Builder` are obtained via [`Builder::new`].
208///
209/// See function level documentation for details on the various server
210/// configuration settings.
211///
212/// [`Builder::new`]: struct.Builder.html#method.new
213/// [`handshake`]: struct.Builder.html#method.handshake
214///
215/// # Examples
216///
217/// ```
218/// # use tokio::io::{AsyncRead, AsyncWrite};
219/// # use h2::server::*;
220/// #
221/// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
222/// # -> Handshake<T>
223/// # {
224/// // `server_fut` is a future representing the completion of the HTTP/2
225/// // handshake.
226/// let server_fut = Builder::new()
227/// .initial_window_size(1_000_000)
228/// .max_concurrent_streams(1000)
229/// .handshake(my_io);
230/// # server_fut
231/// # }
232/// #
233/// # pub fn main() {}
234/// ```
235#[derive(Clone, Debug)]
236pub struct Builder {
237 /// Time to keep locally reset streams around before reaping.
238 reset_stream_duration: Duration,
239
240 /// Maximum number of locally reset streams to keep at a time.
241 reset_stream_max: usize,
242
243 /// Maximum number of remotely reset streams to allow in the pending
244 /// accept queue.
245 pending_accept_reset_stream_max: usize,
246
247 /// Initial `Settings` frame to send as part of the handshake.
248 settings: Settings,
249
250 /// Initial target window size for new connections.
251 initial_target_connection_window_size: Option<u32>,
252
253 /// Maximum amount of bytes to "buffer" for writing per stream.
254 max_send_buffer_size: usize,
255
256 /// Maximum number of locally reset streams due to protocol error across
257 /// the lifetime of the connection.
258 ///
259 /// When this gets exceeded, we issue GOAWAYs.
260 local_max_error_reset_streams: Option<usize>,
261}
262
263/// Send a response back to the client
264///
265/// A `SendResponse` instance is provided when receiving a request and is used
266/// to send the associated response back to the client. It is also used to
267/// explicitly reset the stream with a custom reason.
268///
269/// It will also be used to initiate push promises linked with the associated
270/// stream.
271///
272/// If the `SendResponse` instance is dropped without sending a response, then
273/// the HTTP/2 stream will be reset.
274///
275/// See [module] level docs for more details.
276///
277/// [module]: index.html
278#[derive(Debug)]
279pub struct SendResponse<B: Buf> {
280 inner: proto::StreamRef<B>,
281}
282
283/// Send a response to a promised request
284///
285/// A `SendPushedResponse` instance is provided when promising a request and is used
286/// to send the associated response to the client. It is also used to
287/// explicitly reset the stream with a custom reason.
288///
/// It cannot be used to initiate push promises.
290///
291/// If the `SendPushedResponse` instance is dropped without sending a response, then
292/// the HTTP/2 stream will be reset.
293///
294/// See [module] level docs for more details.
295///
296/// [module]: index.html
297pub struct SendPushedResponse<B: Buf> {
298 inner: SendResponse<B>,
299}
300
301// Manual implementation necessary because of rust-lang/rust#26925
302impl<B: Buf + fmt::Debug> fmt::Debug for SendPushedResponse<B> {
303 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
304 write!(f, "SendPushedResponse {{ {:?} }}", self.inner)
305 }
306}
307
308/// Stages of an in-progress handshake.
309enum Handshaking<T, B: Buf> {
310 /// State 1. Connection is flushing pending SETTINGS frame.
311 Flushing(Instrumented<Flush<T, Prioritized<B>>>),
312 /// State 2. Connection is waiting for the client preface.
313 ReadingPreface(Instrumented<ReadPreface<T, Prioritized<B>>>),
314 /// State 3. Handshake is done, polling again would panic.
315 Done,
316}
317
318/// Flush a Sink
319struct Flush<T, B> {
320 codec: Option<Codec<T, B>>,
321}
322
323/// Read the client connection preface
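///
/// As a rough illustration only (this is not the code used by this type), the
/// preface can be matched incrementally by tracking how many bytes have been
/// accepted so far, much like the `pos` field does. The helper name
/// `match_preface` is hypothetical, and the sketch treats bytes beyond the
/// 24-byte preface as a mismatch, which is an assumption.
///
/// ```rust
/// // The 24-byte client connection preface defined by the HTTP/2 specification.
/// const PREFACE: [u8; 24] = *b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
///
/// // Hypothetical helper: compares `chunk` against the preface starting at
/// // `pos`. Returns the new position on success, or `None` on a mismatch
/// // (including a chunk that would run past the end of the preface).
/// fn match_preface(pos: usize, chunk: &[u8]) -> Option<usize> {
///     let expected = PREFACE.get(pos..pos + chunk.len())?;
///     if expected == chunk {
///         Some(pos + chunk.len())
///     } else {
///         None
///     }
/// }
/// ```
///
/// The preface has been fully received once the returned position reaches
/// `PREFACE.len()`.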
324struct ReadPreface<T, B> {
325 codec: Option<Codec<T, B>>,
326 pos: usize,
327}
328
329#[derive(Debug)]
330pub(crate) struct Peer;
331
332const PREFACE: [u8; 24] = *b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
333
334/// Creates a new configured HTTP/2 server with default configuration
335/// values backed by `io`.
336///
337/// It is expected that `io` already be in an appropriate state to commence
338/// the [HTTP/2 handshake]. See [Handshake] for more details.
339///
340/// Returns a future which resolves to the [`Connection`] instance once the
341/// HTTP/2 handshake has been completed. The returned [`Connection`]
342/// instance will be using default configuration values. Use [`Builder`] to
343/// customize the configuration values used by a [`Connection`] instance.
344///
345/// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
346/// [Handshake]: ../index.html#handshake
347/// [`Connection`]: struct.Connection.html
348///
349/// # Examples
350///
351/// ```
352/// # use tokio::io::{AsyncRead, AsyncWrite};
353/// # use h2::server;
354/// # use h2::server::*;
355/// #
356/// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
357/// # {
358/// let connection = server::handshake(my_io).await.unwrap();
359/// // The HTTP/2 handshake has completed, now use `connection` to
360/// // accept inbound HTTP/2 streams.
361/// # }
362/// #
363/// # pub fn main() {}
364/// ```
365pub fn handshake<T>(io: T) -> Handshake<T, Bytes>
366where
367 T: AsyncRead + AsyncWrite + Unpin,
368{
369 Builder::new().handshake(io)
370}
371
372// ===== impl Connection =====
373
374impl<T, B> Connection<T, B>
375where
376 T: AsyncRead + AsyncWrite + Unpin,
377 B: Buf,
378{
379 fn handshake2(io: T, builder: Builder) -> Handshake<T, B> {
380 let span = tracing::trace_span!("server_handshake");
381 let entered = span.enter();
382
383 // Create the codec.
384 let mut codec = Codec::new(io);
385
386 if let Some(max) = builder.settings.max_frame_size() {
387 codec.set_max_recv_frame_size(max as usize);
388 }
389
390 if let Some(max) = builder.settings.max_header_list_size() {
391 codec.set_max_recv_header_list_size(max as usize);
392 }
393
394 // Send initial settings frame.
395 codec
396 .buffer(builder.settings.clone().into())
397 .expect("invalid SETTINGS frame");
398
399 // Create the handshake future.
400 let state =
401 Handshaking::Flushing(Flush::new(codec).instrument(tracing::trace_span!("flush")));
402
403 drop(entered);
404
405 Handshake {
406 builder,
407 state,
408 span,
409 }
410 }
411
412 /// Accept the next incoming request on this connection.
413 pub async fn accept(
414 &mut self,
415 ) -> Option<Result<(Request<RecvStream>, SendResponse<B>), crate::Error>> {
416 futures_util::future::poll_fn(move |cx| self.poll_accept(cx)).await
417 }
418
419 #[doc(hidden)]
420 pub fn poll_accept(
421 &mut self,
422 cx: &mut Context<'_>,
423 ) -> Poll<Option<Result<(Request<RecvStream>, SendResponse<B>), crate::Error>>> {
424 // Always try to advance the internal state. Getting Pending also is
425 // needed to allow this function to return Pending.
426 if self.poll_closed(cx)?.is_ready() {
427 // If the socket is closed, don't return anything
428 // TODO: drop any pending streams
429 return Poll::Ready(None);
430 }
431
432 if let Some(inner) = self.connection.next_incoming() {
433 tracing::trace!("received incoming");
434 let (head, _) = inner.take_request().into_parts();
435 let body = RecvStream::new(FlowControl::new(inner.clone_to_opaque()));
436
437 let request = Request::from_parts(head, body);
438 let respond = SendResponse { inner };
439
440 return Poll::Ready(Some(Ok((request, respond))));
441 }
442
443 Poll::Pending
444 }
445
446 /// Sets the target window size for the whole connection.
447 ///
448 /// If `size` is greater than the current value, then a `WINDOW_UPDATE`
449 /// frame will be immediately sent to the remote, increasing the connection
450 /// level window by `size - current_value`.
451 ///
452 /// If `size` is less than the current value, nothing will happen
453 /// immediately. However, as window capacity is released by
454 /// [`FlowControl`] instances, no `WINDOW_UPDATE` frames will be sent
455 /// out until the number of "in flight" bytes drops below `size`.
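    ///
    /// The two cases above can be sketched with plain arithmetic. This is a
    /// simplified illustration, not the crate's internal logic, and
    /// `window_update_increment` is a hypothetical helper name.
    ///
    /// ```rust
    /// // Hypothetical helper, not part of the `h2` API: returns the increment
    /// // a `WINDOW_UPDATE` frame would carry when raising the window from
    /// // `current` to `target`, or `None` when nothing is sent immediately.
    /// fn window_update_increment(current: u32, target: u32) -> Option<u32> {
    ///     if target > current {
    ///         Some(target - current)
    ///     } else {
    ///         None
    ///     }
    /// }
    /// ```
    ///
    /// For instance, raising the target from the default 65,535 to 1,000,000
    /// corresponds to an increment of 934,465 octets.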
456 ///
457 /// The default value is 65,535.
458 ///
459 /// See [`FlowControl`] documentation for more details.
460 ///
461 /// [`FlowControl`]: ../struct.FlowControl.html
462 /// [library level]: ../index.html#flow-control
463 pub fn set_target_window_size(&mut self, size: u32) {
464 assert!(size <= proto::MAX_WINDOW_SIZE);
465 self.connection.set_target_window_size(size);
466 }
467
468 /// Set a new `INITIAL_WINDOW_SIZE` setting (in octets) for stream-level
469 /// flow control for received data.
470 ///
471 /// The `SETTINGS` will be sent to the remote, and only applied once the
472 /// remote acknowledges the change.
473 ///
474 /// This can be used to increase or decrease the window size for existing
475 /// streams.
476 ///
477 /// # Errors
478 ///
479 /// Returns an error if a previous call is still pending acknowledgement
480 /// from the remote endpoint.
481 pub fn set_initial_window_size(&mut self, size: u32) -> Result<(), crate::Error> {
482 assert!(size <= proto::MAX_WINDOW_SIZE);
483 self.connection.set_initial_window_size(size)?;
484 Ok(())
485 }
486
487 /// Enables the [extended CONNECT protocol].
488 ///
489 /// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
490 ///
491 /// # Errors
492 ///
493 /// Returns an error if a previous call is still pending acknowledgement
494 /// from the remote endpoint.
495 pub fn enable_connect_protocol(&mut self) -> Result<(), crate::Error> {
496 self.connection.set_enable_connect_protocol()?;
497 Ok(())
498 }
499
500 /// Returns `Ready` when the underlying connection has closed.
501 ///
502 /// If any new inbound streams are received during a call to `poll_closed`,
503 /// they will be queued and returned on the next call to [`poll_accept`].
504 ///
505 /// This function will advance the internal connection state, driving
506 /// progress on all the other handles (e.g. [`RecvStream`] and [`SendStream`]).
507 ///
508 /// See [here](index.html#managing-the-connection) for more details.
509 ///
510 /// [`poll_accept`]: struct.Connection.html#method.poll_accept
511 /// [`RecvStream`]: ../struct.RecvStream.html
512 /// [`SendStream`]: ../struct.SendStream.html
513 pub fn poll_closed(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>> {
514 self.connection.poll(cx).map_err(Into::into)
515 }
516
517 #[doc(hidden)]
518 #[deprecated(note = "renamed to poll_closed")]
519 pub fn poll_close(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>> {
520 self.poll_closed(cx)
521 }
522
523 /// Sets the connection to a GOAWAY state.
524 ///
525 /// Does not terminate the connection. Must continue being polled to close
526 /// connection.
527 ///
528 /// After flushing the GOAWAY frame, the connection is closed. Any
529 /// outstanding streams do not prevent the connection from closing. This
530 /// should usually be reserved for shutting down when something bad
531 /// external to `h2` has happened, and open streams cannot be properly
532 /// handled.
533 ///
534 /// For graceful shutdowns, see [`graceful_shutdown`](Connection::graceful_shutdown).
535 pub fn abrupt_shutdown(&mut self, reason: Reason) {
536 self.connection.go_away_from_user(reason);
537 }
538
539 /// Starts a [graceful shutdown][1] process.
540 ///
541 /// Must continue being polled to close connection.
542 ///
543 /// It's possible to receive more requests after calling this method, since
544 /// they might have been in-flight from the client already. After about
545 /// 1 RTT, no new requests should be accepted. Once all active streams
546 /// have completed, the connection is closed.
547 ///
548 /// [1]: http://httpwg.org/specs/rfc7540.html#GOAWAY
549 pub fn graceful_shutdown(&mut self) {
550 self.connection.go_away_gracefully();
551 }
552
553 /// Takes a `PingPong` instance from the connection.
554 ///
555 /// # Note
556 ///
557 /// This may only be called once. Calling multiple times will return `None`.
558 pub fn ping_pong(&mut self) -> Option<PingPong> {
559 self.connection.take_user_pings().map(PingPong::new)
560 }
561
562 /// Returns the maximum number of concurrent streams that may be initiated
563 /// by the server on this connection.
564 ///
565 /// This limit is configured by the client peer by sending the
566 /// [`SETTINGS_MAX_CONCURRENT_STREAMS` parameter][1] in a `SETTINGS` frame.
567 /// This method returns the currently acknowledged value received from the
568 /// remote.
569 ///
570 /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
571 pub fn max_concurrent_send_streams(&self) -> usize {
572 self.connection.max_send_streams()
573 }
574
575 /// Returns the maximum number of concurrent streams that may be initiated
576 /// by the client on this connection.
577 ///
578 /// This returns the value of the [`SETTINGS_MAX_CONCURRENT_STREAMS`
579 /// parameter][1] sent in a `SETTINGS` frame that has been
580 /// acknowledged by the remote peer. The value to be sent is configured by
581 /// the [`Builder::max_concurrent_streams`][2] method before handshaking
582 /// with the remote peer.
583 ///
584 /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
    /// [2]: struct.Builder.html#method.max_concurrent_streams
586 pub fn max_concurrent_recv_streams(&self) -> usize {
587 self.connection.max_recv_streams()
588 }
589
590 // Could disappear at anytime.
591 #[doc(hidden)]
592 #[cfg(feature = "unstable")]
593 pub fn num_wired_streams(&self) -> usize {
594 self.connection.num_wired_streams()
595 }
596}
597
598#[cfg(feature = "stream")]
599impl<T, B> futures_core::Stream for Connection<T, B>
600where
601 T: AsyncRead + AsyncWrite + Unpin,
602 B: Buf,
603{
604 type Item = Result<(Request<RecvStream>, SendResponse<B>), crate::Error>;
605
606 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
607 self.poll_accept(cx)
608 }
609}
610
611impl<T, B> fmt::Debug for Connection<T, B>
612where
613 T: fmt::Debug,
614 B: fmt::Debug + Buf,
615{
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt.debug_struct("Connection")
            .field("connection", &self.connection)
            .finish()
    }
621}
622
623// ===== impl Builder =====
624
625impl Builder {
626 /// Returns a new server builder instance initialized with default
627 /// configuration values.
628 ///
629 /// Configuration methods can be chained on the return value.
630 ///
631 /// # Examples
632 ///
633 /// ```
634 /// # use tokio::io::{AsyncRead, AsyncWrite};
635 /// # use h2::server::*;
636 /// #
637 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
638 /// # -> Handshake<T>
639 /// # {
640 /// // `server_fut` is a future representing the completion of the HTTP/2
641 /// // handshake.
642 /// let server_fut = Builder::new()
643 /// .initial_window_size(1_000_000)
644 /// .max_concurrent_streams(1000)
645 /// .handshake(my_io);
646 /// # server_fut
647 /// # }
648 /// #
649 /// # pub fn main() {}
650 /// ```
651 pub fn new() -> Builder {
652 Builder {
653 reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS),
654 reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX,
655 pending_accept_reset_stream_max: proto::DEFAULT_REMOTE_RESET_STREAM_MAX,
656 settings: Settings::default(),
657 initial_target_connection_window_size: None,
658 max_send_buffer_size: proto::DEFAULT_MAX_SEND_BUFFER_SIZE,
659
660 local_max_error_reset_streams: Some(proto::DEFAULT_LOCAL_RESET_COUNT_MAX),
661 }
662 }
663
664 /// Indicates the initial window size (in octets) for stream-level
665 /// flow control for received data.
666 ///
667 /// The initial window of a stream is used as part of flow control. For more
668 /// details, see [`FlowControl`].
669 ///
670 /// The default value is 65,535.
671 ///
672 /// [`FlowControl`]: ../struct.FlowControl.html
673 ///
674 /// # Examples
675 ///
676 /// ```
677 /// # use tokio::io::{AsyncRead, AsyncWrite};
678 /// # use h2::server::*;
679 /// #
680 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
681 /// # -> Handshake<T>
682 /// # {
683 /// // `server_fut` is a future representing the completion of the HTTP/2
684 /// // handshake.
685 /// let server_fut = Builder::new()
686 /// .initial_window_size(1_000_000)
687 /// .handshake(my_io);
688 /// # server_fut
689 /// # }
690 /// #
691 /// # pub fn main() {}
692 /// ```
693 pub fn initial_window_size(&mut self, size: u32) -> &mut Self {
694 self.settings.set_initial_window_size(Some(size));
695 self
696 }
697
698 /// Indicates the initial window size (in octets) for connection-level flow control
699 /// for received data.
700 ///
701 /// The initial window of a connection is used as part of flow control. For more details,
702 /// see [`FlowControl`].
703 ///
704 /// The default value is 65,535.
705 ///
706 /// [`FlowControl`]: ../struct.FlowControl.html
707 ///
708 /// # Examples
709 ///
710 /// ```
711 /// # use tokio::io::{AsyncRead, AsyncWrite};
712 /// # use h2::server::*;
713 /// #
714 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
715 /// # -> Handshake<T>
716 /// # {
717 /// // `server_fut` is a future representing the completion of the HTTP/2
718 /// // handshake.
719 /// let server_fut = Builder::new()
720 /// .initial_connection_window_size(1_000_000)
721 /// .handshake(my_io);
722 /// # server_fut
723 /// # }
724 /// #
725 /// # pub fn main() {}
726 /// ```
727 pub fn initial_connection_window_size(&mut self, size: u32) -> &mut Self {
728 self.initial_target_connection_window_size = Some(size);
729 self
730 }
731
732 /// Indicates the size (in octets) of the largest HTTP/2 frame payload that the
733 /// configured server is able to accept.
734 ///
735 /// The sender may send data frames that are **smaller** than this value,
736 /// but any data larger than `max` will be broken up into multiple `DATA`
737 /// frames.
738 ///
739 /// The value **must** be between 16,384 and 16,777,215. The default value is 16,384.
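    ///
    /// To illustrate how a payload larger than the limit ends up in several
    /// `DATA` frames, here is a small std-only sketch. It only models the
    /// payload split, not real frame encoding, and `split_into_data_frames`
    /// is a hypothetical helper that assumes a non-zero `max_frame_size`.
    ///
    /// ```rust
    /// // Hypothetical helper, not part of the `h2` API: splits `payload` into
    /// // chunks of at most `max_frame_size` octets, one per `DATA` frame.
    /// // Panics if `max_frame_size` is zero.
    /// fn split_into_data_frames(payload: &[u8], max_frame_size: usize) -> Vec<&[u8]> {
    ///     payload.chunks(max_frame_size).collect()
    /// }
    /// ```
    ///
    /// With the default limit of 16,384, a 40,000-octet payload is split into
    /// three chunks, the last one holding the remaining 7,232 octets.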
740 ///
741 /// # Examples
742 ///
743 /// ```
744 /// # use tokio::io::{AsyncRead, AsyncWrite};
745 /// # use h2::server::*;
746 /// #
747 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
748 /// # -> Handshake<T>
749 /// # {
750 /// // `server_fut` is a future representing the completion of the HTTP/2
751 /// // handshake.
752 /// let server_fut = Builder::new()
753 /// .max_frame_size(1_000_000)
754 /// .handshake(my_io);
755 /// # server_fut
756 /// # }
757 /// #
758 /// # pub fn main() {}
759 /// ```
760 ///
761 /// # Panics
762 ///
763 /// This function panics if `max` is not within the legal range specified
764 /// above.
765 pub fn max_frame_size(&mut self, max: u32) -> &mut Self {
766 self.settings.set_max_frame_size(Some(max));
767 self
768 }
769
770 /// Sets the max size of received header frames.
771 ///
772 /// This advisory setting informs a peer of the maximum size of header list
773 /// that the sender is prepared to accept, in octets. The value is based on
774 /// the uncompressed size of header fields, including the length of the name
775 /// and value in octets plus an overhead of 32 octets for each header field.
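    ///
    /// The size calculation described above can be sketched as follows. This
    /// is an illustration of the rule rather than the crate's implementation;
    /// `header_list_size` and `HEADER_FIELD_OVERHEAD` are hypothetical names.
    ///
    /// ```rust
    /// // Per-field overhead, in octets, added to each header field.
    /// const HEADER_FIELD_OVERHEAD: usize = 32;
    ///
    /// // Hypothetical helper, not part of the `h2` API: sums the name length,
    /// // the value length and the fixed overhead over all header fields.
    /// fn header_list_size(fields: &[(&str, &str)]) -> usize {
    ///     fields
    ///         .iter()
    ///         .map(|(name, value)| name.len() + value.len() + HEADER_FIELD_OVERHEAD)
    ///         .sum()
    /// }
    /// ```
    ///
    /// For example, `:method: GET` counts as 7 + 3 + 32 = 42 octets.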
776 ///
777 /// This setting is also used to limit the maximum amount of data that is
778 /// buffered to decode HEADERS frames.
779 ///
780 /// # Examples
781 ///
782 /// ```
783 /// # use tokio::io::{AsyncRead, AsyncWrite};
784 /// # use h2::server::*;
785 /// #
786 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
787 /// # -> Handshake<T>
788 /// # {
789 /// // `server_fut` is a future representing the completion of the HTTP/2
790 /// // handshake.
791 /// let server_fut = Builder::new()
792 /// .max_header_list_size(16 * 1024)
793 /// .handshake(my_io);
794 /// # server_fut
795 /// # }
796 /// #
797 /// # pub fn main() {}
798 /// ```
799 pub fn max_header_list_size(&mut self, max: u32) -> &mut Self {
800 self.settings.set_max_header_list_size(Some(max));
801 self
802 }
803
804 /// Sets the maximum number of concurrent streams.
805 ///
806 /// The maximum concurrent streams setting only controls the maximum number
807 /// of streams that can be initiated by the remote peer. In other words,
808 /// when this setting is set to 100, this does not limit the number of
809 /// concurrent streams that can be created by the caller.
810 ///
811 /// It is recommended that this value be no smaller than 100, so as to not
812 /// unnecessarily limit parallelism. However, any value is legal, including
813 /// 0. If `max` is set to 0, then the remote will not be permitted to
814 /// initiate streams.
815 ///
816 /// Note that streams in the reserved state, i.e., push promises that have
817 /// been reserved but the stream has not started, do not count against this
818 /// setting.
819 ///
820 /// Also note that if the remote *does* exceed the value set here, it is not
821 /// a protocol level error. Instead, the `h2` library will immediately reset
822 /// the stream.
823 ///
824 /// See [Section 5.1.2] in the HTTP/2 spec for more details.
825 ///
826 /// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2
827 ///
828 /// # Examples
829 ///
830 /// ```
831 /// # use tokio::io::{AsyncRead, AsyncWrite};
832 /// # use h2::server::*;
833 /// #
834 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
835 /// # -> Handshake<T>
836 /// # {
837 /// // `server_fut` is a future representing the completion of the HTTP/2
838 /// // handshake.
839 /// let server_fut = Builder::new()
840 /// .max_concurrent_streams(1000)
841 /// .handshake(my_io);
842 /// # server_fut
843 /// # }
844 /// #
845 /// # pub fn main() {}
846 /// ```
847 pub fn max_concurrent_streams(&mut self, max: u32) -> &mut Self {
848 self.settings.set_max_concurrent_streams(Some(max));
849 self
850 }
851
852 /// Sets the maximum number of concurrent locally reset streams.
853 ///
854 /// When a stream is explicitly reset by either calling
855 /// [`SendResponse::send_reset`] or by dropping a [`SendResponse`] instance
856 /// before completing the stream, the HTTP/2 specification requires that
857 /// any further frames received for that stream must be ignored for "some
858 /// time".
859 ///
860 /// In order to satisfy the specification, internal state must be maintained
861 /// to implement the behavior. This state grows linearly with the number of
862 /// streams that are locally reset.
863 ///
    /// The `max_concurrent_reset_streams` setting sets an upper bound on the
    /// amount of state that is maintained. When this max value is reached,
    /// the oldest reset stream is purged from memory.
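    ///
    /// The eviction behavior can be pictured as a bounded queue. The sketch
    /// below is a simplified model, not the crate's internal data structure;
    /// `ResetStreams` and its methods are hypothetical names.
    ///
    /// ```rust
    /// use std::collections::VecDeque;
    ///
    /// // Hypothetical tracker for locally reset stream IDs, oldest first.
    /// struct ResetStreams {
    ///     max: usize,
    ///     ids: VecDeque<u32>,
    /// }
    ///
    /// impl ResetStreams {
    ///     fn new(max: usize) -> Self {
    ///         ResetStreams {
    ///             max,
    ///             ids: VecDeque::new(),
    ///         }
    ///     }
    ///
    ///     // Records a reset stream and returns the ID that was purged to
    ///     // stay within the bound, if any.
    ///     fn push(&mut self, id: u32) -> Option<u32> {
    ///         self.ids.push_back(id);
    ///         if self.ids.len() > self.max {
    ///             self.ids.pop_front()
    ///         } else {
    ///             None
    ///         }
    ///     }
    /// }
    /// ```
    ///
    /// In this model, once `max` entries are tracked, each newly reset stream
    /// pushes out the oldest one.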
867 ///
868 /// Once the stream has been fully purged from memory, any additional frames
869 /// received for that stream will result in a connection level protocol
870 /// error, forcing the connection to terminate.
871 ///
872 /// The default value is 10.
873 ///
874 /// # Examples
875 ///
876 /// ```
877 /// # use tokio::io::{AsyncRead, AsyncWrite};
878 /// # use h2::server::*;
879 /// #
880 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
881 /// # -> Handshake<T>
882 /// # {
883 /// // `server_fut` is a future representing the completion of the HTTP/2
884 /// // handshake.
885 /// let server_fut = Builder::new()
886 /// .max_concurrent_reset_streams(1000)
887 /// .handshake(my_io);
888 /// # server_fut
889 /// # }
890 /// #
891 /// # pub fn main() {}
892 /// ```
    pub fn max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
        self.reset_stream_max = max;
        self
    }

    /// Sets the maximum number of local resets due to protocol errors made by
    /// the remote end.
    ///
    /// Invalid frames and many other protocol errors will lead to resets being
    /// generated for those streams. Too many of these often indicate a
    /// malicious client, and there are attacks which can abuse this to DoS
    /// servers. This limit protects against these DoS attacks by limiting the
    /// number of resets we can be forced to generate.
    ///
    /// When the number of local resets exceeds this threshold, the server will
    /// issue GOAWAYs with an error code of `ENHANCE_YOUR_CALM` to the client.
    ///
    /// If you really want to disable this, supply [`Option::None`] here.
    /// Disabling this is not recommended and may expose you to DoS attacks.
    ///
    /// The default value is currently 1024, but could change.
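    ///
    /// # Examples
    ///
    /// A minimal sketch that mirrors the other `Builder` examples. The limit
    /// of 256 is an arbitrary value chosen for illustration, not a
    /// recommendation.
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use h2::server::*;
    /// #
    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Handshake<T>
    /// # {
    /// // `server_fut` is a future representing the completion of the HTTP/2
    /// // handshake.
    /// let server_fut = Builder::new()
    ///     // Illustrative limit only; tune it for your deployment.
    ///     .max_local_error_reset_streams(Some(256))
    ///     .handshake(my_io);
    /// # server_fut
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```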
    pub fn max_local_error_reset_streams(&mut self, max: Option<usize>) -> &mut Self {
        self.local_max_error_reset_streams = max;
        self
    }

    /// Sets the maximum number of pending-accept remotely-reset streams.
    ///
    /// Streams that have been received from the peer, but not accepted by the
    /// user, can also receive a RST_STREAM. This is a legitimate pattern: one
    /// could send a request and then shortly after, realize it is not needed,
    /// sending a CANCEL.
    ///
    /// However, since those streams are now "closed", they don't count towards
    /// the max concurrent streams. So, they will sit in the accept queue,
    /// using memory.
    ///
    /// When the number of remotely-reset streams sitting in the pending-accept
    /// queue reaches this maximum value, a connection error with the code of
    /// `ENHANCE_YOUR_CALM` will be sent to the peer, and returned by the
    /// `Future`.
    ///
    /// The default value is currently 20, but could change.
    ///
    /// # Examples
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use h2::server::*;
    /// #
    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Handshake<T>
    /// # {
    /// // `server_fut` is a future representing the completion of the HTTP/2
    /// // handshake.
    /// let server_fut = Builder::new()
    ///     .max_pending_accept_reset_streams(100)
    ///     .handshake(my_io);
    /// # server_fut
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```
    pub fn max_pending_accept_reset_streams(&mut self, max: usize) -> &mut Self {
        self.pending_accept_reset_stream_max = max;
        self
    }

    /// Sets the maximum send buffer size per stream.
    ///
    /// Once a stream has buffered up to (or over) the maximum, the stream's
    /// flow control will not "poll" additional capacity. Once bytes for the
    /// stream have been written to the connection, the send buffer capacity
    /// will be freed up again.
    ///
    /// The default is currently ~400KB, but may change.
    ///
    /// # Panics
    ///
    /// This function panics if `max` is larger than `u32::MAX`.
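    ///
    /// # Examples
    ///
    /// A minimal sketch that mirrors the other `Builder` examples. The 64 KiB
    /// limit is an arbitrary value chosen for illustration, not a
    /// recommendation.
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use h2::server::*;
    /// #
    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Handshake<T>
    /// # {
    /// // `server_fut` is a future representing the completion of the HTTP/2
    /// // handshake.
    /// let server_fut = Builder::new()
    ///     // Illustrative limit only: buffer at most 64 KiB per stream.
    ///     .max_send_buffer_size(64 * 1024)
    ///     .handshake(my_io);
    /// # server_fut
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```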
    pub fn max_send_buffer_size(&mut self, max: usize) -> &mut Self {
        assert!(max <= u32::MAX as usize);
        self.max_send_buffer_size = max;
        self
    }

    /// Sets the maximum duration to remember locally reset streams.
    ///
    /// When a stream is explicitly reset by either calling
    /// [`SendResponse::send_reset`] or by dropping a [`SendResponse`] instance
    /// before completing the stream, the HTTP/2 specification requires that
    /// any further frames received for that stream must be ignored for "some
    /// time".
    ///
    /// In order to satisfy the specification, internal state must be maintained
    /// to implement the behavior. This state grows linearly with the number of
    /// streams that are locally reset.
    ///
    /// The `reset_stream_duration` setting configures the max amount of time
    /// this state will be maintained in memory. Once the duration elapses, the
    /// stream state is purged from memory.
    ///
    /// Once the stream has been fully purged from memory, any additional frames
    /// received for that stream will result in a connection level protocol
    /// error, forcing the connection to terminate.
    ///
    /// The default value is 30 seconds.
    ///
    /// # Examples
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use h2::server::*;
    /// # use std::time::Duration;
    /// #
    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Handshake<T>
    /// # {
    /// // `server_fut` is a future representing the completion of the HTTP/2
    /// // handshake.
    /// let server_fut = Builder::new()
    ///     .reset_stream_duration(Duration::from_secs(10))
    ///     .handshake(my_io);
    /// # server_fut
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```
    pub fn reset_stream_duration(&mut self, dur: Duration) -> &mut Self {
        self.reset_stream_duration = dur;
        self
    }

    /// Enables the [extended CONNECT protocol].
    ///
    /// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
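    ///
    /// # Examples
    ///
    /// A minimal sketch that mirrors the other `Builder` examples; it only
    /// shows where the call fits in the builder chain.
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use h2::server::*;
    /// #
    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Handshake<T>
    /// # {
    /// // `server_fut` is a future representing the completion of the HTTP/2
    /// // handshake.
    /// let server_fut = Builder::new()
    ///     .enable_connect_protocol()
    ///     .handshake(my_io);
    /// # server_fut
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```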
    pub fn enable_connect_protocol(&mut self) -> &mut Self {
        self.settings.set_enable_connect_protocol(Some(1));
        self
    }

    /// Creates a new configured HTTP/2 server backed by `io`.
    ///
    /// It is expected that `io` already be in an appropriate state to commence
    /// the [HTTP/2 handshake]. See [Handshake] for more details.
    ///
    /// Returns a future which resolves to the [`Connection`] instance once the
    /// HTTP/2 handshake has been completed.
    ///
    /// This function also allows the caller to configure the send payload data
    /// type. See [Outbound data type] for more details.
    ///
    /// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
    /// [Handshake]: ../index.html#handshake
    /// [`Connection`]: struct.Connection.html
    /// [Outbound data type]: ../index.html#outbound-data-type
    ///
    /// # Examples
    ///
    /// Basic usage:
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use h2::server::*;
    /// #
    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Handshake<T>
    /// # {
    /// // `server_fut` is a future representing the completion of the HTTP/2
    /// // handshake.
    /// let server_fut = Builder::new()
    ///     .handshake(my_io);
    /// # server_fut
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```
    ///
    /// Configures the send-payload data type. In this case, the outbound data
    /// type will be `&'static [u8]`.
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use h2::server::*;
    /// #
    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Handshake<T, &'static [u8]>
    /// # {
    /// // `server_fut` is a future representing the completion of the HTTP/2
    /// // handshake.
    /// let server_fut: Handshake<_, &'static [u8]> = Builder::new()
    ///     .handshake(my_io);
    /// # server_fut
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```
    pub fn handshake<T, B>(&self, io: T) -> Handshake<T, B>
    where
        T: AsyncRead + AsyncWrite + Unpin,
        B: Buf,
    {
        Connection::handshake2(io, self.clone())
    }
}

impl Default for Builder {
    fn default() -> Builder {
        Builder::new()
    }
}

// ===== impl SendResponse =====

impl<B: Buf> SendResponse<B> {
    /// Send a response to a client request.
    ///
    /// On success, a [`SendStream`] instance is returned. This instance can be
    /// used to stream the response body and send trailers.
    ///
    /// If a body or trailers will be sent on the returned [`SendStream`]
    /// instance, then `end_of_stream` must be set to `false` when calling this
    /// function.
    ///
    /// The [`SendResponse`] instance is already associated with a received
    /// request. This function may only be called once per instance and only if
    /// [`send_reset`] has not been previously called.
    ///
    /// [`SendResponse`]: #
    /// [`SendStream`]: ../struct.SendStream.html
    /// [`send_reset`]: #method.send_reset
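    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming `respond` is the `SendResponse` handle
    /// obtained when the stream was accepted and that the outbound data type
    /// is `bytes::Bytes`. The status code and body are illustrative only.
    ///
    /// ```
    /// # use bytes::Bytes;
    /// # use h2::server::SendResponse;
    /// # use http::{Response, StatusCode};
    /// #
    /// # fn doc(mut respond: SendResponse<Bytes>) -> Result<(), h2::Error> {
    /// let response = Response::builder()
    ///     .status(StatusCode::OK)
    ///     .body(())
    ///     .unwrap();
    ///
    /// // A body follows the headers, so `end_of_stream` is `false` here.
    /// let mut send = respond.send_response(response, false)?;
    ///
    /// // The final data frame ends the stream.
    /// send.send_data(Bytes::from_static(b"hello world"), true)?;
    /// # Ok(())
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```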
    pub fn send_response(
        &mut self,
        response: Response<()>,
        end_of_stream: bool,
    ) -> Result<SendStream<B>, crate::Error> {
        self.inner
            .send_response(response, end_of_stream)
            .map(|_| SendStream::new(self.inner.clone()))
            .map_err(Into::into)
    }

    /// Push a request and response to the client.
    ///
    /// On success, a [`SendPushedResponse`] instance is returned.
    ///
    /// [`SendPushedResponse`]: struct.SendPushedResponse.html
    pub fn push_request(
        &mut self,
        request: Request<()>,
    ) -> Result<SendPushedResponse<B>, crate::Error> {
        self.inner
            .send_push_promise(request)
            .map(|inner| SendPushedResponse {
                inner: SendResponse { inner },
            })
            .map_err(Into::into)
    }

    /// Send a stream reset to the peer.
    ///
    /// This essentially cancels the stream, including any inbound or outbound
    /// data streams.
    ///
    /// If this function is called before [`send_response`], a call to
    /// [`send_response`] will result in an error.
    ///
    /// If this function is called while a [`SendStream`] instance is active,
    /// any further use of the instance will result in an error.
    ///
    /// This function should only be called once.
    ///
    /// [`send_response`]: #method.send_response
    /// [`SendStream`]: ../struct.SendStream.html
    pub fn send_reset(&mut self, reason: Reason) {
        self.inner.send_reset(reason)
    }

    /// Polls to be notified when the client resets this stream.
    ///
    /// If the stream is still open, this returns `Poll::Pending`, and
    /// registers the task to be notified if a `RST_STREAM` is received.
    ///
    /// If a `RST_STREAM` frame is received for this stream, calling this
    /// method will yield the `Reason` for the reset.
    ///
    /// # Errors
    ///
    /// Calling this method after having called `send_response` will return
    /// a user error.
    pub fn poll_reset(&mut self, cx: &mut Context) -> Poll<Result<Reason, crate::Error>> {
        self.inner.poll_reset(cx, proto::PollReset::AwaitingHeaders)
    }

    /// Returns the stream ID of the response stream.
    ///
    /// # Panics
    ///
    /// If the lock on the stream store has been poisoned.
    pub fn stream_id(&self) -> crate::StreamId {
        crate::StreamId::from_internal(self.inner.stream_id())
    }
}

// ===== impl SendPushedResponse =====

impl<B: Buf> SendPushedResponse<B> {
    /// Send a response to a promised request.
    ///
    /// On success, a [`SendStream`] instance is returned. This instance can be
    /// used to stream the response body and send trailers.
    ///
    /// If a body or trailers will be sent on the returned [`SendStream`]
    /// instance, then `end_of_stream` must be set to `false` when calling this
    /// function.
    ///
    /// The [`SendPushedResponse`] instance is associated with a promised
    /// request. This function may only be called once per instance and only if
    /// [`send_reset`] has not been previously called.
    ///
    /// [`SendPushedResponse`]: #
    /// [`SendStream`]: ../struct.SendStream.html
    /// [`send_reset`]: #method.send_reset
    pub fn send_response(
        &mut self,
        response: Response<()>,
        end_of_stream: bool,
    ) -> Result<SendStream<B>, crate::Error> {
        self.inner.send_response(response, end_of_stream)
    }

    /// Send a stream reset to the peer.
    ///
    /// This essentially cancels the stream, including any inbound or outbound
    /// data streams.
    ///
    /// If this function is called before [`send_response`], a call to
    /// [`send_response`] will result in an error.
    ///
    /// If this function is called while a [`SendStream`] instance is active,
    /// any further use of the instance will result in an error.
    ///
    /// This function should only be called once.
    ///
    /// [`send_response`]: #method.send_response
    /// [`SendStream`]: ../struct.SendStream.html
    pub fn send_reset(&mut self, reason: Reason) {
        self.inner.send_reset(reason)
    }

    /// Polls to be notified when the client resets this stream.
    ///
    /// If the stream is still open, this returns `Poll::Pending`, and
    /// registers the task to be notified if a `RST_STREAM` is received.
    ///
    /// If a `RST_STREAM` frame is received for this stream, calling this
    /// method will yield the `Reason` for the reset.
    ///
    /// # Errors
    ///
    /// Calling this method after having called `send_response` will return
    /// a user error.
    pub fn poll_reset(&mut self, cx: &mut Context) -> Poll<Result<Reason, crate::Error>> {
        self.inner.poll_reset(cx)
    }

    /// Returns the stream ID of the response stream.
    ///
    /// # Panics
    ///
    /// If the lock on the stream store has been poisoned.
    pub fn stream_id(&self) -> crate::StreamId {
        self.inner.stream_id()
    }
}

// ===== impl Flush =====

impl<T, B: Buf> Flush<T, B> {
    fn new(codec: Codec<T, B>) -> Self {
        Flush { codec: Some(codec) }
    }
}

impl<T, B> Future for Flush<T, B>
where
    T: AsyncWrite + Unpin,
    B: Buf,
{
    type Output = Result<Codec<T, B>, crate::Error>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Flush the codec
        ready!(self.codec.as_mut().unwrap().flush(cx)).map_err(crate::Error::from_io)?;

        // Return the codec
        Poll::Ready(Ok(self.codec.take().unwrap()))
    }
}

impl<T, B: Buf> ReadPreface<T, B> {
    fn new(codec: Codec<T, B>) -> Self {
        ReadPreface {
            codec: Some(codec),
            pos: 0,
        }
    }

    fn inner_mut(&mut self) -> &mut T {
        self.codec.as_mut().unwrap().get_mut()
    }
}

impl<T, B> Future for ReadPreface<T, B>
where
    T: AsyncRead + Unpin,
    B: Buf,
{
    type Output = Result<Codec<T, B>, crate::Error>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut buf = [0; 24];
        let mut rem = PREFACE.len() - self.pos;

        while rem > 0 {
            let mut buf = ReadBuf::new(&mut buf[..rem]);
            ready!(Pin::new(self.inner_mut()).poll_read(cx, &mut buf))
                .map_err(crate::Error::from_io)?;
            let n = buf.filled().len();
            if n == 0 {
                return Poll::Ready(Err(crate::Error::from_io(io::Error::new(
                    io::ErrorKind::UnexpectedEof,
                    "connection closed before reading preface",
                ))));
            }

            if &PREFACE[self.pos..self.pos + n] != buf.filled() {
                proto_err!(conn: "read_preface: invalid preface");
                // TODO: Should this just write the GO_AWAY frame directly?
                return Poll::Ready(Err(Error::library_go_away(Reason::PROTOCOL_ERROR).into()));
            }

            self.pos += n;
            rem -= n; // TODO test
        }

        Poll::Ready(Ok(self.codec.take().unwrap()))
    }
}

// ===== impl Handshake =====

impl<T, B: Buf> Future for Handshake<T, B>
where
    T: AsyncRead + AsyncWrite + Unpin,
    B: Buf,
{
    type Output = Result<Connection<T, B>, crate::Error>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let span = self.span.clone(); // XXX(eliza): T_T
        let _e = span.enter();
        tracing::trace!(state = ?self.state);

        loop {
            match &mut self.state {
                Handshaking::Flushing(flush) => {
                    // We're currently flushing a pending SETTINGS frame. Poll the
                    // flush future, and, if it's completed, advance our state to wait
                    // for the client preface.
                    let codec = match Pin::new(flush).poll(cx)? {
                        Poll::Pending => {
                            tracing::trace!(flush.poll = %"Pending");
                            return Poll::Pending;
                        }
                        Poll::Ready(flushed) => {
                            tracing::trace!(flush.poll = %"Ready");
                            flushed
                        }
                    };
                    self.state = Handshaking::ReadingPreface(
                        ReadPreface::new(codec).instrument(tracing::trace_span!("read_preface")),
                    );
                }
                Handshaking::ReadingPreface(read) => {
                    let codec = ready!(Pin::new(read).poll(cx)?);

                    self.state = Handshaking::Done;

                    let connection = proto::Connection::new(
                        codec,
                        Config {
                            next_stream_id: 2.into(),
                            // Server does not need to locally initiate any streams
                            initial_max_send_streams: 0,
                            max_send_buffer_size: self.builder.max_send_buffer_size,
                            reset_stream_duration: self.builder.reset_stream_duration,
                            reset_stream_max: self.builder.reset_stream_max,
                            remote_reset_stream_max: self.builder.pending_accept_reset_stream_max,
                            local_error_reset_streams_max: self
                                .builder
                                .local_max_error_reset_streams,
                            settings: self.builder.settings.clone(),
                        },
                    );

                    tracing::trace!("connection established!");
                    let mut c = Connection { connection };
                    if let Some(sz) = self.builder.initial_target_connection_window_size {
                        c.set_target_window_size(sz);
                    }

                    return Poll::Ready(Ok(c));
                }
                Handshaking::Done => {
                    panic!("Handshaking::poll() called again after handshaking was complete")
                }
            }
        }
    }
}

impl<T, B> fmt::Debug for Handshake<T, B>
where
    T: AsyncRead + AsyncWrite + fmt::Debug,
    B: fmt::Debug + Buf,
{
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        write!(fmt, "server::Handshake")
    }
}

impl Peer {
    pub fn convert_send_message(
        id: StreamId,
        response: Response<()>,
        end_of_stream: bool,
    ) -> frame::Headers {
        use http::response::Parts;

        // Extract the components of the HTTP response
        let (
            Parts {
                status, headers, ..
            },
            _,
        ) = response.into_parts();

        // Build the pseudo header set. All responses include `status`.
        let pseudo = Pseudo::response(status);

        // Create the HEADERS frame
        let mut frame = frame::Headers::new(id, pseudo, headers);

        if end_of_stream {
            frame.set_end_stream()
        }

        frame
    }

    pub fn convert_push_message(
        stream_id: StreamId,
        promised_id: StreamId,
        request: Request<()>,
    ) -> Result<frame::PushPromise, UserError> {
        use http::request::Parts;

        if let Err(e) = frame::PushPromise::validate_request(&request) {
            use PushPromiseHeaderError::*;
            match e {
                NotSafeAndCacheable => tracing::debug!(
                    ?promised_id,
                    "convert_push_message: method {} is not safe and cacheable",
                    request.method(),
                ),
                InvalidContentLength(e) => tracing::debug!(
                    ?promised_id,
                    "convert_push_message; promised request has invalid content-length {:?}",
                    e,
                ),
            }
            return Err(UserError::MalformedHeaders);
        }

        // Extract the components of the HTTP request
        let (
            Parts {
                method,
                uri,
                headers,
                ..
            },
            _,
        ) = request.into_parts();

        let pseudo = Pseudo::request(method, uri, None);

        Ok(frame::PushPromise::new(
            stream_id,
            promised_id,
            pseudo,
            headers,
        ))
    }
}

impl proto::Peer for Peer {
    type Poll = Request<()>;

    const NAME: &'static str = "Server";

    fn is_server() -> bool {
        true
    }

    fn r#dyn() -> proto::DynPeer {
        proto::DynPeer::Server
    }

    fn convert_poll_message(
        pseudo: Pseudo,
        fields: HeaderMap,
        stream_id: StreamId,
    ) -> Result<Self::Poll, Error> {
        use http::{uri, Version};

        let mut b = Request::builder();

        macro_rules! malformed {
            ($($arg:tt)*) => {{
                tracing::debug!($($arg)*);
                return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR));
            }}
        }

        b = b.version(Version::HTTP_2);

        let is_connect;
        if let Some(method) = pseudo.method {
            is_connect = method == Method::CONNECT;
            b = b.method(method);
        } else {
            malformed!("malformed headers: missing method");
        }

        let has_protocol = pseudo.protocol.is_some();
        if has_protocol {
            if is_connect {
                // Assert that we have the right type.
                b = b.extension::<crate::ext::Protocol>(pseudo.protocol.unwrap());
            } else {
                malformed!("malformed headers: :protocol on non-CONNECT request");
            }
        }

        if pseudo.status.is_some() {
            malformed!("malformed headers: :status field on request");
        }

        // Convert the URI
        let mut parts = uri::Parts::default();

        // A request translated from HTTP/1 must not include the :authority
        // header
        if let Some(authority) = pseudo.authority {
            let maybe_authority = uri::Authority::from_maybe_shared(authority.clone().into_inner());
            parts.authority = Some(maybe_authority.or_else(|why| {
                malformed!(
                    "malformed headers: malformed authority ({:?}): {}",
                    authority,
                    why,
                )
            })?);
        }

        // A :scheme is required, except CONNECT.
        if let Some(scheme) = pseudo.scheme {
            if is_connect && !has_protocol {
                malformed!("malformed headers: :scheme in CONNECT");
            }
            let maybe_scheme = scheme.parse();
            let scheme = maybe_scheme.or_else(|why| {
                malformed!(
                    "malformed headers: malformed scheme ({:?}): {}",
                    scheme,
                    why,
                )
            })?;

            // It's not possible to build an `Uri` from a scheme and path. So,
            // after validating that it was a valid scheme, we just have to drop
            // it if there isn't an :authority.
            if parts.authority.is_some() {
                parts.scheme = Some(scheme);
            }
        } else if !is_connect || has_protocol {
            malformed!("malformed headers: missing scheme");
        }

        if let Some(path) = pseudo.path {
            if is_connect && !has_protocol {
                malformed!("malformed headers: :path in CONNECT");
            }

            // This cannot be empty
            if path.is_empty() {
                malformed!("malformed headers: missing path");
            }

            let maybe_path = uri::PathAndQuery::from_maybe_shared(path.clone().into_inner());
            parts.path_and_query = Some(maybe_path.or_else(|why| {
                malformed!("malformed headers: malformed path ({:?}): {}", path, why,)
            })?);
        } else if is_connect && has_protocol {
            malformed!("malformed headers: missing path in extended CONNECT");
        }

        b = b.uri(parts);

        let mut request = match b.body(()) {
            Ok(request) => request,
            Err(e) => {
                // TODO: Should there be more specialized handling for different
                // kinds of errors
                proto_err!(stream: "error building request: {}; stream={:?}", e, stream_id);
                return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR));
            }
        };

        *request.headers_mut() = fields;

        Ok(request)
    }
}

// ===== impl Handshaking =====

impl<T, B> fmt::Debug for Handshaking<T, B>
where
    B: Buf,
{
    #[inline]
    fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
        match *self {
            Handshaking::Flushing(_) => f.write_str("Flushing(_)"),
            Handshaking::ReadingPreface(_) => f.write_str("ReadingPreface(_)"),
            Handshaking::Done => f.write_str("Done"),
        }
    }
}