1//! Server implementation of the HTTP/2 protocol.
2//!
3//! # Getting started
4//!
5//! Running an HTTP/2 server requires the caller to manage accepting the
6//! connections as well as getting the connections to a state that is ready to
7//! begin the HTTP/2 handshake. See [here](../index.html#handshake) for more
8//! details.
9//!
10//! This could be as basic as using Tokio's [`TcpListener`] to accept
11//! connections, but usually it means using either ALPN or HTTP/1.1 protocol
12//! upgrades.
13//!
14//! Once a connection is obtained, it is passed to [`handshake`],
15//! which will begin the [HTTP/2 handshake]. This returns a future that
16//! completes once the handshake process is performed and HTTP/2 streams may
17//! be received.
18//!
19//! [`handshake`] uses default configuration values. There are a number of
20//! settings that can be changed by using [`Builder`] instead.
21//!
22//! # Inbound streams
23//!
//! The [`Connection`] instance is used to accept inbound HTTP/2 streams, either
//! by calling [`Connection::accept`] or, when the `stream` feature is enabled,
//! through its [`futures::Stream`] implementation. When a new stream is
//! received, a call to [`Connection::accept`] will return `(request, response)`.
//! The `request` handle (of type [`http::Request<RecvStream>`]) contains the
//! HTTP request head and provides a way to receive the inbound data stream and
//! the trailers. The `response` handle (of type [`SendResponse`]) is used to
//! respond to the request, stream the response payload, send trailers, and
//! send push promises.
32//!
33//! The send ([`SendStream`]) and receive ([`RecvStream`]) halves of the stream
34//! can be operated independently.
35//!
36//! # Managing the connection
37//!
//! The [`Connection`] instance is used to manage connection state. The caller
//! is required to call either [`Connection::accept`] or
//! [`Connection::poll_closed`] in order to advance the connection state. Simply
//! operating on [`SendStream`] or [`RecvStream`] will have no effect unless the
//! connection state is advanced.
//!
//! It is not required to call **both** [`Connection::accept`] and
//! [`Connection::poll_closed`]. If the caller is ready to accept a new stream,
//! then only [`Connection::accept`] should be called. When the caller **does
//! not** want to accept a new stream, [`Connection::poll_closed`] should be
//! called.
//!
//! The [`Connection`] instance should only be dropped once
//! [`Connection::poll_closed`] returns `Ready`. Once [`Connection::accept`]
//! returns `None`, there will be no more inbound streams. At this point, only
//! [`Connection::poll_closed`] should be called.
54//!
55//! # Shutting down the server
56//!
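//! A graceful shutdown is started by calling
//! [`Connection::graceful_shutdown`]. The connection must continue to be
//! polled until it closes. To close the connection without waiting for
//! outstanding streams, use [`Connection::abrupt_shutdown`].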
59//!
60//! # Example
61//!
62//! A basic HTTP/2 server example that runs over TCP and assumes [prior
63//! knowledge], i.e. both the client and the server assume that the TCP socket
64//! will use the HTTP/2 protocol without prior negotiation.
65//!
//! ```no_run
//! use h2::server;
//! use http::{Response, StatusCode};
//! use tokio::net::TcpListener;
//!
//! #[tokio::main]
//! pub async fn main() {
//!     let listener = TcpListener::bind("127.0.0.1:5928").await.unwrap();
//!
//!     // Accept all incoming TCP connections.
//!     loop {
//!         if let Ok((socket, _peer_addr)) = listener.accept().await {
//!             // Spawn a new task to process each connection.
//!             tokio::spawn(async {
//!                 // Start the HTTP/2 connection handshake
//!                 let mut h2 = server::handshake(socket).await.unwrap();
//!                 // Accept all inbound HTTP/2 streams sent over the
//!                 // connection.
//!                 while let Some(request) = h2.accept().await {
//!                     let (request, mut respond) = request.unwrap();
//!                     println!("Received request: {:?}", request);
//!
//!                     // Build a response with no body
//!                     let response = Response::builder()
//!                         .status(StatusCode::OK)
//!                         .body(())
//!                         .unwrap();
//!
//!                     // Send the response back to the client
//!                     respond.send_response(response, true)
//!                         .unwrap();
//!                 }
//!
//!             });
//!         }
//!     }
//! }
//! ```
104//!
105//! [prior knowledge]: http://httpwg.org/specs/rfc7540.html#known-http
106//! [`handshake`]: fn.handshake.html
107//! [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
108//! [`Builder`]: struct.Builder.html
109//! [`Connection`]: struct.Connection.html
//! [`Connection::accept`]: struct.Connection.html#method.accept
//! [`Connection::poll_closed`]: struct.Connection.html#method.poll_closed
//! [`futures::Stream`]: https://docs.rs/futures/0.3/futures/stream/trait.Stream.html
113//! [`http::Request<RecvStream>`]: ../struct.RecvStream.html
114//! [`RecvStream`]: ../struct.RecvStream.html
115//! [`SendStream`]: ../struct.SendStream.html
//! [`TcpListener`]: https://docs.rs/tokio/latest/tokio/net/struct.TcpListener.html
117
118use crate::codec::{Codec, UserError};
119use crate::frame::{self, Pseudo, PushPromiseHeaderError, Reason, Settings, StreamId};
120use crate::proto::{self, Config, Error, Prioritized};
121use crate::{FlowControl, PingPong, RecvStream, SendStream};
122
123use bytes::{Buf, Bytes};
124use http::{HeaderMap, Method, Request, Response};
125use std::future::Future;
126use std::pin::Pin;
127use std::task::{Context, Poll};
128use std::time::Duration;
129use std::{fmt, io};
130use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
131use tracing::instrument::{Instrument, Instrumented};
132
133/// In progress HTTP/2 connection handshake future.
134///
135/// This type implements `Future`, yielding a `Connection` instance once the
136/// handshake has completed.
137///
138/// The handshake is completed once the connection preface is fully received
139/// from the client **and** the initial settings frame is sent to the client.
140///
141/// The handshake future does not wait for the initial settings frame from the
142/// client.
143///
144/// See [module] level docs for more details.
145///
146/// [module]: index.html
147#[must_use = "futures do nothing unless polled"]
148pub struct Handshake<T, B: Buf = Bytes> {
149 /// The config to pass to Connection::new after handshake succeeds.
150 builder: Builder,
151 /// The current state of the handshake.
152 state: Handshaking<T, B>,
153 /// Span tracking the handshake
154 span: tracing::Span,
155}
156
157/// Accepts inbound HTTP/2 streams on a connection.
158///
159/// A `Connection` is backed by an I/O resource (usually a TCP socket) and
160/// implements the HTTP/2 server logic for that connection. It is responsible
161/// for receiving inbound streams initiated by the client as well as driving the
162/// internal state forward.
163///
/// `Connection` values are created by calling [`handshake`]. Once a
/// `Connection` value is obtained, the caller must call [`accept`] or
/// [`poll_closed`] in order to drive the internal connection state forward.
///
/// See [module level] documentation for more details.
///
/// [module level]: index.html
/// [`handshake`]: fn.handshake.html
/// [`accept`]: struct.Connection.html#method.accept
/// [`poll_closed`]: struct.Connection.html#method.poll_closed
174///
175/// # Examples
176///
177/// ```
178/// # use tokio::io::{AsyncRead, AsyncWrite};
179/// # use h2::server;
180/// # use h2::server::*;
181/// #
182/// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) {
183/// let mut server = server::handshake(my_io).await.unwrap();
184/// while let Some(request) = server.accept().await {
185/// tokio::spawn(async move {
186/// let (request, respond) = request.unwrap();
187/// // Process the request and send the response back to the client
188/// // using `respond`.
189/// });
190/// }
191/// # }
192/// #
193/// # pub fn main() {}
194/// ```
195#[must_use = "streams do nothing unless polled"]
196pub struct Connection<T, B: Buf> {
197 connection: proto::Connection<T, Peer, B>,
198}
199
200/// Builds server connections with custom configuration values.
201///
202/// Methods can be chained in order to set the configuration values.
203///
204/// The server is constructed by calling [`handshake`] and passing the I/O
205/// handle that will back the HTTP/2 server.
206///
207/// New instances of `Builder` are obtained via [`Builder::new`].
208///
209/// See function level documentation for details on the various server
210/// configuration settings.
211///
212/// [`Builder::new`]: struct.Builder.html#method.new
213/// [`handshake`]: struct.Builder.html#method.handshake
214///
215/// # Examples
216///
217/// ```
218/// # use tokio::io::{AsyncRead, AsyncWrite};
219/// # use h2::server::*;
220/// #
221/// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
222/// # -> Handshake<T>
223/// # {
224/// // `server_fut` is a future representing the completion of the HTTP/2
225/// // handshake.
226/// let server_fut = Builder::new()
227/// .initial_window_size(1_000_000)
228/// .max_concurrent_streams(1000)
229/// .handshake(my_io);
230/// # server_fut
231/// # }
232/// #
233/// # pub fn main() {}
234/// ```
235#[derive(Clone, Debug)]
236pub struct Builder {
237 /// Time to keep locally reset streams around before reaping.
238 reset_stream_duration: Duration,
239
240 /// Maximum number of locally reset streams to keep at a time.
241 reset_stream_max: usize,
242
243 /// Maximum number of remotely reset streams to allow in the pending
244 /// accept queue.
245 pending_accept_reset_stream_max: usize,
246
247 /// Initial `Settings` frame to send as part of the handshake.
248 settings: Settings,
249
250 /// Initial target window size for new connections.
251 initial_target_connection_window_size: Option<u32>,
252
253 /// Maximum amount of bytes to "buffer" for writing per stream.
254 max_send_buffer_size: usize,
255}
256
257/// Send a response back to the client
258///
259/// A `SendResponse` instance is provided when receiving a request and is used
260/// to send the associated response back to the client. It is also used to
261/// explicitly reset the stream with a custom reason.
262///
263/// It will also be used to initiate push promises linked with the associated
264/// stream.
265///
266/// If the `SendResponse` instance is dropped without sending a response, then
267/// the HTTP/2 stream will be reset.
268///
269/// See [module] level docs for more details.
270///
271/// [module]: index.html
272#[derive(Debug)]
273pub struct SendResponse<B: Buf> {
274 inner: proto::StreamRef<B>,
275}
276
277/// Send a response to a promised request
278///
279/// A `SendPushedResponse` instance is provided when promising a request and is used
280/// to send the associated response to the client. It is also used to
281/// explicitly reset the stream with a custom reason.
282///
/// It cannot be used to initiate push promises.
284///
285/// If the `SendPushedResponse` instance is dropped without sending a response, then
286/// the HTTP/2 stream will be reset.
287///
288/// See [module] level docs for more details.
289///
290/// [module]: index.html
291pub struct SendPushedResponse<B: Buf> {
292 inner: SendResponse<B>,
293}
294
295// Manual implementation necessary because of rust-lang/rust#26925
296impl<B: Buf + fmt::Debug> fmt::Debug for SendPushedResponse<B> {
297 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
298 write!(f, "SendPushedResponse {{ {:?} }}", self.inner)
299 }
300}
301
302/// Stages of an in-progress handshake.
303enum Handshaking<T, B: Buf> {
304 /// State 1. Connection is flushing pending SETTINGS frame.
305 Flushing(Instrumented<Flush<T, Prioritized<B>>>),
306 /// State 2. Connection is waiting for the client preface.
307 ReadingPreface(Instrumented<ReadPreface<T, Prioritized<B>>>),
308 /// State 3. Handshake is done, polling again would panic.
309 Done,
310}
311
312/// Flush a Sink
313struct Flush<T, B> {
314 codec: Option<Codec<T, B>>,
315}
316
317/// Read the client connection preface
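///
/// The preface may arrive split across several reads, so it has to be
/// compared incrementally. The following is a minimal, standalone sketch of
/// that idea under stated assumptions: `preface_prefix_matches` is a
/// hypothetical helper written for illustration, it assumes
/// `pos <= PREFACE.len()`, and it is not necessarily how this type is
/// implemented.
///
/// ```rust
/// // The 24-byte client connection preface.
/// const PREFACE: [u8; 24] = *b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
///
/// // Compares `chunk` against the part of the preface that starts at `pos`.
/// // Returns the new position on a match and `None` on a mismatch. Bytes in
/// // `chunk` past the end of the preface are ignored.
/// fn preface_prefix_matches(pos: usize, chunk: &[u8]) -> Option<usize> {
///     let remaining = &PREFACE[pos..];
///     let n = remaining.len().min(chunk.len());
///     if chunk[..n] == remaining[..n] {
///         Some(pos + n)
///     } else {
///         None
///     }
/// }
/// ```
///
/// Feeding each received chunk through such a helper until the position
/// reaches 24 would confirm the full preface.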
318struct ReadPreface<T, B> {
319 codec: Option<Codec<T, B>>,
320 pos: usize,
321}
322
323#[derive(Debug)]
324pub(crate) struct Peer;
325
326const PREFACE: [u8; 24] = *b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
327
328/// Creates a new configured HTTP/2 server with default configuration
329/// values backed by `io`.
330///
331/// It is expected that `io` already be in an appropriate state to commence
332/// the [HTTP/2 handshake]. See [Handshake] for more details.
333///
334/// Returns a future which resolves to the [`Connection`] instance once the
335/// HTTP/2 handshake has been completed. The returned [`Connection`]
336/// instance will be using default configuration values. Use [`Builder`] to
337/// customize the configuration values used by a [`Connection`] instance.
338///
339/// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
340/// [Handshake]: ../index.html#handshake
341/// [`Connection`]: struct.Connection.html
342///
343/// # Examples
344///
345/// ```
346/// # use tokio::io::{AsyncRead, AsyncWrite};
347/// # use h2::server;
348/// # use h2::server::*;
349/// #
350/// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
351/// # {
352/// let connection = server::handshake(my_io).await.unwrap();
353/// // The HTTP/2 handshake has completed, now use `connection` to
354/// // accept inbound HTTP/2 streams.
355/// # }
356/// #
357/// # pub fn main() {}
358/// ```
359pub fn handshake<T>(io: T) -> Handshake<T, Bytes>
360where
361 T: AsyncRead + AsyncWrite + Unpin,
362{
363 Builder::new().handshake(io)
364}
365
366// ===== impl Connection =====
367
368impl<T, B> Connection<T, B>
369where
370 T: AsyncRead + AsyncWrite + Unpin,
371 B: Buf,
372{
373 fn handshake2(io: T, builder: Builder) -> Handshake<T, B> {
374 let span = tracing::trace_span!("server_handshake");
375 let entered = span.enter();
376
377 // Create the codec.
378 let mut codec = Codec::new(io);
379
380 if let Some(max) = builder.settings.max_frame_size() {
381 codec.set_max_recv_frame_size(max as usize);
382 }
383
384 if let Some(max) = builder.settings.max_header_list_size() {
385 codec.set_max_recv_header_list_size(max as usize);
386 }
387
388 // Send initial settings frame.
389 codec
390 .buffer(builder.settings.clone().into())
391 .expect("invalid SETTINGS frame");
392
393 // Create the handshake future.
394 let state =
395 Handshaking::Flushing(Flush::new(codec).instrument(tracing::trace_span!("flush")));
396
397 drop(entered);
398
399 Handshake {
400 builder,
401 state,
402 span,
403 }
404 }
405
406 /// Accept the next incoming request on this connection.
407 pub async fn accept(
408 &mut self,
409 ) -> Option<Result<(Request<RecvStream>, SendResponse<B>), crate::Error>> {
410 futures_util::future::poll_fn(move |cx| self.poll_accept(cx)).await
411 }
412
413 #[doc(hidden)]
414 pub fn poll_accept(
415 &mut self,
416 cx: &mut Context<'_>,
417 ) -> Poll<Option<Result<(Request<RecvStream>, SendResponse<B>), crate::Error>>> {
        // Always try to advance the internal state first. Polling here also
        // registers the task's waker, which is needed before this function
        // is allowed to return `Pending`.
420 if self.poll_closed(cx)?.is_ready() {
421 // If the socket is closed, don't return anything
422 // TODO: drop any pending streams
423 return Poll::Ready(None);
424 }
425
426 if let Some(inner) = self.connection.next_incoming() {
427 tracing::trace!("received incoming");
428 let (head, _) = inner.take_request().into_parts();
429 let body = RecvStream::new(FlowControl::new(inner.clone_to_opaque()));
430
431 let request = Request::from_parts(head, body);
432 let respond = SendResponse { inner };
433
434 return Poll::Ready(Some(Ok((request, respond))));
435 }
436
437 Poll::Pending
438 }
439
440 /// Sets the target window size for the whole connection.
441 ///
442 /// If `size` is greater than the current value, then a `WINDOW_UPDATE`
443 /// frame will be immediately sent to the remote, increasing the connection
444 /// level window by `size - current_value`.
445 ///
446 /// If `size` is less than the current value, nothing will happen
447 /// immediately. However, as window capacity is released by
448 /// [`FlowControl`] instances, no `WINDOW_UPDATE` frames will be sent
449 /// out until the number of "in flight" bytes drops below `size`.
450 ///
451 /// The default value is 65,535.
452 ///
453 /// See [`FlowControl`] documentation for more details.
454 ///
455 /// [`FlowControl`]: ../struct.FlowControl.html
456 /// [library level]: ../index.html#flow-control
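    ///
    /// # Examples
    ///
    /// The increment described above is plain arithmetic. As a standalone
    /// sketch (the `window_update_increment` helper is hypothetical and not
    /// part of `h2`), the size of the immediate `WINDOW_UPDATE` could be
    /// computed like this:
    ///
    /// ```rust
    /// // Returns the increment to advertise right away, or `None` when the
    /// // target is not larger than the current window and nothing is sent.
    /// fn window_update_increment(current: u32, target: u32) -> Option<u32> {
    ///     if target > current {
    ///         Some(target - current)
    ///     } else {
    ///         None
    ///     }
    /// }
    /// ```
    ///
    /// Starting from the default of 65,535, a target of 1,000,000 would
    /// yield an increment of 934,465 under this sketch.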
457 pub fn set_target_window_size(&mut self, size: u32) {
458 assert!(size <= proto::MAX_WINDOW_SIZE);
459 self.connection.set_target_window_size(size);
460 }
461
462 /// Set a new `INITIAL_WINDOW_SIZE` setting (in octets) for stream-level
463 /// flow control for received data.
464 ///
465 /// The `SETTINGS` will be sent to the remote, and only applied once the
466 /// remote acknowledges the change.
467 ///
468 /// This can be used to increase or decrease the window size for existing
469 /// streams.
470 ///
471 /// # Errors
472 ///
473 /// Returns an error if a previous call is still pending acknowledgement
474 /// from the remote endpoint.
475 pub fn set_initial_window_size(&mut self, size: u32) -> Result<(), crate::Error> {
476 assert!(size <= proto::MAX_WINDOW_SIZE);
477 self.connection.set_initial_window_size(size)?;
478 Ok(())
479 }
480
481 /// Enables the [extended CONNECT protocol].
482 ///
483 /// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
484 ///
485 /// # Errors
486 ///
487 /// Returns an error if a previous call is still pending acknowledgement
488 /// from the remote endpoint.
489 pub fn enable_connect_protocol(&mut self) -> Result<(), crate::Error> {
490 self.connection.set_enable_connect_protocol()?;
491 Ok(())
492 }
493
494 /// Returns `Ready` when the underlying connection has closed.
495 ///
496 /// If any new inbound streams are received during a call to `poll_closed`,
497 /// they will be queued and returned on the next call to [`poll_accept`].
498 ///
499 /// This function will advance the internal connection state, driving
500 /// progress on all the other handles (e.g. [`RecvStream`] and [`SendStream`]).
501 ///
502 /// See [here](index.html#managing-the-connection) for more details.
503 ///
504 /// [`poll_accept`]: struct.Connection.html#method.poll_accept
505 /// [`RecvStream`]: ../struct.RecvStream.html
506 /// [`SendStream`]: ../struct.SendStream.html
507 pub fn poll_closed(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>> {
508 self.connection.poll(cx).map_err(Into::into)
509 }
510
511 #[doc(hidden)]
512 #[deprecated(note = "renamed to poll_closed")]
513 pub fn poll_close(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>> {
514 self.poll_closed(cx)
515 }
516
517 /// Sets the connection to a GOAWAY state.
518 ///
519 /// Does not terminate the connection. Must continue being polled to close
520 /// connection.
521 ///
522 /// After flushing the GOAWAY frame, the connection is closed. Any
523 /// outstanding streams do not prevent the connection from closing. This
524 /// should usually be reserved for shutting down when something bad
525 /// external to `h2` has happened, and open streams cannot be properly
526 /// handled.
527 ///
528 /// For graceful shutdowns, see [`graceful_shutdown`](Connection::graceful_shutdown).
529 pub fn abrupt_shutdown(&mut self, reason: Reason) {
530 self.connection.go_away_from_user(reason);
531 }
532
533 /// Starts a [graceful shutdown][1] process.
534 ///
535 /// Must continue being polled to close connection.
536 ///
537 /// It's possible to receive more requests after calling this method, since
538 /// they might have been in-flight from the client already. After about
539 /// 1 RTT, no new requests should be accepted. Once all active streams
540 /// have completed, the connection is closed.
541 ///
542 /// [1]: http://httpwg.org/specs/rfc7540.html#GOAWAY
543 pub fn graceful_shutdown(&mut self) {
544 self.connection.go_away_gracefully();
545 }
546
547 /// Takes a `PingPong` instance from the connection.
548 ///
549 /// # Note
550 ///
551 /// This may only be called once. Calling multiple times will return `None`.
552 pub fn ping_pong(&mut self) -> Option<PingPong> {
553 self.connection.take_user_pings().map(PingPong::new)
554 }
555
556 /// Returns the maximum number of concurrent streams that may be initiated
557 /// by the server on this connection.
558 ///
559 /// This limit is configured by the client peer by sending the
560 /// [`SETTINGS_MAX_CONCURRENT_STREAMS` parameter][1] in a `SETTINGS` frame.
561 /// This method returns the currently acknowledged value received from the
562 /// remote.
563 ///
564 /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
565 pub fn max_concurrent_send_streams(&self) -> usize {
566 self.connection.max_send_streams()
567 }
568
569 /// Returns the maximum number of concurrent streams that may be initiated
570 /// by the client on this connection.
571 ///
572 /// This returns the value of the [`SETTINGS_MAX_CONCURRENT_STREAMS`
573 /// parameter][1] sent in a `SETTINGS` frame that has been
574 /// acknowledged by the remote peer. The value to be sent is configured by
575 /// the [`Builder::max_concurrent_streams`][2] method before handshaking
576 /// with the remote peer.
577 ///
578 /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
    /// [2]: struct.Builder.html#method.max_concurrent_streams
580 pub fn max_concurrent_recv_streams(&self) -> usize {
581 self.connection.max_recv_streams()
582 }
583
584 // Could disappear at anytime.
585 #[doc(hidden)]
586 #[cfg(feature = "unstable")]
587 pub fn num_wired_streams(&self) -> usize {
588 self.connection.num_wired_streams()
589 }
590}
591
592#[cfg(feature = "stream")]
593impl<T, B> futures_core::Stream for Connection<T, B>
594where
595 T: AsyncRead + AsyncWrite + Unpin,
596 B: Buf,
597{
598 type Item = Result<(Request<RecvStream>, SendResponse<B>), crate::Error>;
599
600 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
601 self.poll_accept(cx)
602 }
603}
604
605impl<T, B> fmt::Debug for Connection<T, B>
606where
607 T: fmt::Debug,
608 B: fmt::Debug + Buf,
609{
610 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt.debug_struct("Connection")
            .field("connection", &self.connection)
613 .finish()
614 }
615}
616
617// ===== impl Builder =====
618
619impl Builder {
620 /// Returns a new server builder instance initialized with default
621 /// configuration values.
622 ///
623 /// Configuration methods can be chained on the return value.
624 ///
625 /// # Examples
626 ///
627 /// ```
628 /// # use tokio::io::{AsyncRead, AsyncWrite};
629 /// # use h2::server::*;
630 /// #
631 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
632 /// # -> Handshake<T>
633 /// # {
634 /// // `server_fut` is a future representing the completion of the HTTP/2
635 /// // handshake.
636 /// let server_fut = Builder::new()
637 /// .initial_window_size(1_000_000)
638 /// .max_concurrent_streams(1000)
639 /// .handshake(my_io);
640 /// # server_fut
641 /// # }
642 /// #
643 /// # pub fn main() {}
644 /// ```
645 pub fn new() -> Builder {
646 Builder {
647 reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS),
648 reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX,
649 pending_accept_reset_stream_max: proto::DEFAULT_REMOTE_RESET_STREAM_MAX,
650 settings: Settings::default(),
651 initial_target_connection_window_size: None,
652 max_send_buffer_size: proto::DEFAULT_MAX_SEND_BUFFER_SIZE,
653 }
654 }
655
656 /// Indicates the initial window size (in octets) for stream-level
657 /// flow control for received data.
658 ///
659 /// The initial window of a stream is used as part of flow control. For more
660 /// details, see [`FlowControl`].
661 ///
662 /// The default value is 65,535.
663 ///
664 /// [`FlowControl`]: ../struct.FlowControl.html
665 ///
666 /// # Examples
667 ///
668 /// ```
669 /// # use tokio::io::{AsyncRead, AsyncWrite};
670 /// # use h2::server::*;
671 /// #
672 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
673 /// # -> Handshake<T>
674 /// # {
675 /// // `server_fut` is a future representing the completion of the HTTP/2
676 /// // handshake.
677 /// let server_fut = Builder::new()
678 /// .initial_window_size(1_000_000)
679 /// .handshake(my_io);
680 /// # server_fut
681 /// # }
682 /// #
683 /// # pub fn main() {}
684 /// ```
685 pub fn initial_window_size(&mut self, size: u32) -> &mut Self {
686 self.settings.set_initial_window_size(Some(size));
687 self
688 }
689
690 /// Indicates the initial window size (in octets) for connection-level flow control
691 /// for received data.
692 ///
693 /// The initial window of a connection is used as part of flow control. For more details,
694 /// see [`FlowControl`].
695 ///
696 /// The default value is 65,535.
697 ///
698 /// [`FlowControl`]: ../struct.FlowControl.html
699 ///
700 /// # Examples
701 ///
702 /// ```
703 /// # use tokio::io::{AsyncRead, AsyncWrite};
704 /// # use h2::server::*;
705 /// #
706 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
707 /// # -> Handshake<T>
708 /// # {
709 /// // `server_fut` is a future representing the completion of the HTTP/2
710 /// // handshake.
711 /// let server_fut = Builder::new()
712 /// .initial_connection_window_size(1_000_000)
713 /// .handshake(my_io);
714 /// # server_fut
715 /// # }
716 /// #
717 /// # pub fn main() {}
718 /// ```
719 pub fn initial_connection_window_size(&mut self, size: u32) -> &mut Self {
720 self.initial_target_connection_window_size = Some(size);
721 self
722 }
723
724 /// Indicates the size (in octets) of the largest HTTP/2 frame payload that the
725 /// configured server is able to accept.
726 ///
727 /// The sender may send data frames that are **smaller** than this value,
728 /// but any data larger than `max` will be broken up into multiple `DATA`
729 /// frames.
730 ///
731 /// The value **must** be between 16,384 and 16,777,215. The default value is 16,384.
732 ///
733 /// # Examples
734 ///
735 /// ```
736 /// # use tokio::io::{AsyncRead, AsyncWrite};
737 /// # use h2::server::*;
738 /// #
739 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
740 /// # -> Handshake<T>
741 /// # {
742 /// // `server_fut` is a future representing the completion of the HTTP/2
743 /// // handshake.
744 /// let server_fut = Builder::new()
745 /// .max_frame_size(1_000_000)
746 /// .handshake(my_io);
747 /// # server_fut
748 /// # }
749 /// #
750 /// # pub fn main() {}
751 /// ```
752 ///
753 /// # Panics
754 ///
755 /// This function panics if `max` is not within the legal range specified
756 /// above.
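    ///
    /// To avoid the panic, the value can be checked before it is passed in.
    /// A minimal standalone sketch, assuming the bounds stated above (the
    /// constant and function names are hypothetical, not `h2` items):
    ///
    /// ```rust
    /// const MIN_FRAME_SIZE: u32 = 16_384; // 2^14
    /// const MAX_FRAME_SIZE: u32 = 16_777_215; // 2^24 - 1
    ///
    /// // Returns `true` if `max` lies within the inclusive legal range.
    /// fn is_legal_max_frame_size(max: u32) -> bool {
    ///     (MIN_FRAME_SIZE..=MAX_FRAME_SIZE).contains(&max)
    /// }
    /// ```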
757 pub fn max_frame_size(&mut self, max: u32) -> &mut Self {
758 self.settings.set_max_frame_size(Some(max));
759 self
760 }
761
762 /// Sets the max size of received header frames.
763 ///
764 /// This advisory setting informs a peer of the maximum size of header list
765 /// that the sender is prepared to accept, in octets. The value is based on
766 /// the uncompressed size of header fields, including the length of the name
767 /// and value in octets plus an overhead of 32 octets for each header field.
768 ///
769 /// This setting is also used to limit the maximum amount of data that is
770 /// buffered to decode HEADERS frames.
771 ///
772 /// # Examples
773 ///
774 /// ```
775 /// # use tokio::io::{AsyncRead, AsyncWrite};
776 /// # use h2::server::*;
777 /// #
778 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
779 /// # -> Handshake<T>
780 /// # {
781 /// // `server_fut` is a future representing the completion of the HTTP/2
782 /// // handshake.
783 /// let server_fut = Builder::new()
784 /// .max_header_list_size(16 * 1024)
785 /// .handshake(my_io);
786 /// # server_fut
787 /// # }
788 /// #
789 /// # pub fn main() {}
790 /// ```
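    ///
    /// The size calculation described above can be sketched as follows. This
    /// is a standalone illustration only; `header_list_size` is a
    /// hypothetical helper, not an `h2` function.
    ///
    /// ```rust
    /// // Per-field overhead, in octets, as stated in the description above.
    /// const FIELD_OVERHEAD: usize = 32;
    ///
    /// // Sums name length, value length, and the fixed overhead per field.
    /// fn header_list_size(fields: &[(&str, &str)]) -> usize {
    ///     fields
    ///         .iter()
    ///         .map(|(name, value)| name.len() + value.len() + FIELD_OVERHEAD)
    ///         .sum()
    /// }
    /// ```
    ///
    /// Under this sketch, a single `:method: GET` field counts as
    /// 7 + 3 + 32 = 42 octets.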
791 pub fn max_header_list_size(&mut self, max: u32) -> &mut Self {
792 self.settings.set_max_header_list_size(Some(max));
793 self
794 }
795
796 /// Sets the maximum number of concurrent streams.
797 ///
798 /// The maximum concurrent streams setting only controls the maximum number
799 /// of streams that can be initiated by the remote peer. In other words,
800 /// when this setting is set to 100, this does not limit the number of
801 /// concurrent streams that can be created by the caller.
802 ///
803 /// It is recommended that this value be no smaller than 100, so as to not
804 /// unnecessarily limit parallelism. However, any value is legal, including
805 /// 0. If `max` is set to 0, then the remote will not be permitted to
806 /// initiate streams.
807 ///
808 /// Note that streams in the reserved state, i.e., push promises that have
809 /// been reserved but the stream has not started, do not count against this
810 /// setting.
811 ///
812 /// Also note that if the remote *does* exceed the value set here, it is not
813 /// a protocol level error. Instead, the `h2` library will immediately reset
814 /// the stream.
815 ///
816 /// See [Section 5.1.2] in the HTTP/2 spec for more details.
817 ///
818 /// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2
819 ///
820 /// # Examples
821 ///
822 /// ```
823 /// # use tokio::io::{AsyncRead, AsyncWrite};
824 /// # use h2::server::*;
825 /// #
826 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
827 /// # -> Handshake<T>
828 /// # {
829 /// // `server_fut` is a future representing the completion of the HTTP/2
830 /// // handshake.
831 /// let server_fut = Builder::new()
832 /// .max_concurrent_streams(1000)
833 /// .handshake(my_io);
834 /// # server_fut
835 /// # }
836 /// #
837 /// # pub fn main() {}
838 /// ```
839 pub fn max_concurrent_streams(&mut self, max: u32) -> &mut Self {
840 self.settings.set_max_concurrent_streams(Some(max));
841 self
842 }
843
844 /// Sets the maximum number of concurrent locally reset streams.
845 ///
846 /// When a stream is explicitly reset by either calling
847 /// [`SendResponse::send_reset`] or by dropping a [`SendResponse`] instance
848 /// before completing the stream, the HTTP/2 specification requires that
849 /// any further frames received for that stream must be ignored for "some
850 /// time".
851 ///
852 /// In order to satisfy the specification, internal state must be maintained
853 /// to implement the behavior. This state grows linearly with the number of
854 /// streams that are locally reset.
855 ///
    /// The `max_concurrent_reset_streams` setting configures an upper
    /// bound on the amount of state that is maintained. When this max value is
    /// reached, the oldest reset stream is purged from memory.
859 ///
860 /// Once the stream has been fully purged from memory, any additional frames
861 /// received for that stream will result in a connection level protocol
862 /// error, forcing the connection to terminate.
863 ///
864 /// The default value is 10.
865 ///
866 /// # Examples
867 ///
868 /// ```
869 /// # use tokio::io::{AsyncRead, AsyncWrite};
870 /// # use h2::server::*;
871 /// #
872 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
873 /// # -> Handshake<T>
874 /// # {
875 /// // `server_fut` is a future representing the completion of the HTTP/2
876 /// // handshake.
877 /// let server_fut = Builder::new()
878 /// .max_concurrent_reset_streams(1000)
879 /// .handshake(my_io);
880 /// # server_fut
881 /// # }
882 /// #
883 /// # pub fn main() {}
884 /// ```
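    ///
    /// The eviction policy described above can be pictured as a bounded
    /// queue. This standalone sketch only illustrates the idea; `ResetQueue`
    /// is a hypothetical type and does not mirror the internal `h2`
    /// bookkeeping.
    ///
    /// ```rust
    /// use std::collections::VecDeque;
    ///
    /// // Remembers at most `max` locally reset stream IDs.
    /// struct ResetQueue {
    ///     max: usize,
    ///     ids: VecDeque<u32>,
    /// }
    ///
    /// impl ResetQueue {
    ///     fn new(max: usize) -> Self {
    ///         ResetQueue { max, ids: VecDeque::new() }
    ///     }
    ///
    ///     // Records a reset stream and returns the oldest ID if it had to
    ///     // be purged to stay within the limit.
    ///     fn push(&mut self, id: u32) -> Option<u32> {
    ///         self.ids.push_back(id);
    ///         if self.ids.len() > self.max {
    ///             self.ids.pop_front()
    ///         } else {
    ///             None
    ///         }
    ///     }
    /// }
    /// ```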
885 pub fn max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
886 self.reset_stream_max = max;
887 self
888 }
889
890 /// Sets the maximum number of pending-accept remotely-reset streams.
891 ///
    /// Streams that have been received from the peer, but not yet accepted by
    /// the user, can also receive a RST_STREAM. This is a legitimate pattern:
    /// a client could send a request and, shortly after, realize it is not
    /// needed and send a CANCEL.
896 ///
897 /// However, since those streams are now "closed", they don't count towards
898 /// the max concurrent streams. So, they will sit in the accept queue,
899 /// using memory.
900 ///
901 /// When the number of remotely-reset streams sitting in the pending-accept
902 /// queue reaches this maximum value, a connection error with the code of
903 /// `ENHANCE_YOUR_CALM` will be sent to the peer, and returned by the
904 /// `Future`.
905 ///
906 /// The default value is currently 20, but could change.
907 ///
908 /// # Examples
909 ///
911 /// ```
912 /// # use tokio::io::{AsyncRead, AsyncWrite};
913 /// # use h2::server::*;
914 /// #
915 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
916 /// # -> Handshake<T>
917 /// # {
918 /// // `server_fut` is a future representing the completion of the HTTP/2
919 /// // handshake.
920 /// let server_fut = Builder::new()
921 /// .max_pending_accept_reset_streams(100)
922 /// .handshake(my_io);
923 /// # server_fut
924 /// # }
925 /// #
926 /// # pub fn main() {}
927 /// ```
928 pub fn max_pending_accept_reset_streams(&mut self, max: usize) -> &mut Self {
929 self.pending_accept_reset_stream_max = max;
930 self
931 }
932
933 /// Sets the maximum send buffer size per stream.
934 ///
935 /// Once a stream has buffered up to (or over) the maximum, the stream's
936 /// flow control will not "poll" additional capacity. Once bytes for the
937 /// stream have been written to the connection, the send buffer capacity
938 /// will be freed up again.
939 ///
940 /// The default is currently ~400KB, but may change.
941 ///
942 /// # Panics
943 ///
944 /// This function panics if `max` is larger than `u32::MAX`.
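    ///
    /// # Examples
    ///
    /// A minimal sketch, mirroring the other `Builder` examples. The 16 KiB
    /// limit is an arbitrary illustrative value, not a recommendation.
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use h2::server::*;
    /// #
    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Handshake<T>
    /// # {
    /// // `server_fut` is a future representing the completion of the HTTP/2
    /// // handshake.
    /// let server_fut = Builder::new()
    ///     .max_send_buffer_size(16 * 1024)
    ///     .handshake(my_io);
    /// # server_fut
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```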
945 pub fn max_send_buffer_size(&mut self, max: usize) -> &mut Self {
946 assert!(max <= std::u32::MAX as usize);
947 self.max_send_buffer_size = max;
948 self
949 }
950
    /// Sets the duration to remember locally reset streams.
952 ///
953 /// When a stream is explicitly reset by either calling
954 /// [`SendResponse::send_reset`] or by dropping a [`SendResponse`] instance
955 /// before completing the stream, the HTTP/2 specification requires that
956 /// any further frames received for that stream must be ignored for "some
957 /// time".
958 ///
959 /// In order to satisfy the specification, internal state must be maintained
960 /// to implement the behavior. This state grows linearly with the number of
961 /// streams that are locally reset.
962 ///
963 /// The `reset_stream_duration` setting configures the max amount of time
964 /// this state will be maintained in memory. Once the duration elapses, the
965 /// stream state is purged from memory.
966 ///
967 /// Once the stream has been fully purged from memory, any additional frames
968 /// received for that stream will result in a connection level protocol
969 /// error, forcing the connection to terminate.
970 ///
971 /// The default value is 30 seconds.
972 ///
973 /// # Examples
974 ///
975 /// ```
976 /// # use tokio::io::{AsyncRead, AsyncWrite};
977 /// # use h2::server::*;
978 /// # use std::time::Duration;
979 /// #
980 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
981 /// # -> Handshake<T>
982 /// # {
983 /// // `server_fut` is a future representing the completion of the HTTP/2
984 /// // handshake.
985 /// let server_fut = Builder::new()
986 /// .reset_stream_duration(Duration::from_secs(10))
987 /// .handshake(my_io);
988 /// # server_fut
989 /// # }
990 /// #
991 /// # pub fn main() {}
992 /// ```
993 pub fn reset_stream_duration(&mut self, dur: Duration) -> &mut Self {
994 self.reset_stream_duration = dur;
995 self
996 }
997
998 /// Enables the [extended CONNECT protocol].
999 ///
1000 /// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
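    ///
    /// # Examples
    ///
    /// A minimal sketch, mirroring the other `Builder` examples. It only shows
    /// turning the setting on; handling the resulting extended CONNECT
    /// requests is not covered here.
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use h2::server::*;
    /// #
    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Handshake<T>
    /// # {
    /// // `server_fut` is a future representing the completion of the HTTP/2
    /// // handshake.
    /// let server_fut = Builder::new()
    ///     .enable_connect_protocol()
    ///     .handshake(my_io);
    /// # server_fut
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```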
1001 pub fn enable_connect_protocol(&mut self) -> &mut Self {
1002 self.settings.set_enable_connect_protocol(Some(1));
1003 self
1004 }
1005
1006 /// Creates a new configured HTTP/2 server backed by `io`.
1007 ///
1008 /// It is expected that `io` already be in an appropriate state to commence
1009 /// the [HTTP/2 handshake]. See [Handshake] for more details.
1010 ///
1011 /// Returns a future which resolves to the [`Connection`] instance once the
1012 /// HTTP/2 handshake has been completed.
1013 ///
1014 /// This function also allows the caller to configure the send payload data
1015 /// type. See [Outbound data type] for more details.
1016 ///
1017 /// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
1018 /// [Handshake]: ../index.html#handshake
1019 /// [`Connection`]: struct.Connection.html
    /// [Outbound data type]: ../index.html#outbound-data-type
1021 ///
1022 /// # Examples
1023 ///
1024 /// Basic usage:
1025 ///
1026 /// ```
1027 /// # use tokio::io::{AsyncRead, AsyncWrite};
1028 /// # use h2::server::*;
1029 /// #
1030 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1031 /// # -> Handshake<T>
1032 /// # {
1033 /// // `server_fut` is a future representing the completion of the HTTP/2
1034 /// // handshake.
1035 /// let server_fut = Builder::new()
1036 /// .handshake(my_io);
1037 /// # server_fut
1038 /// # }
1039 /// #
1040 /// # pub fn main() {}
1041 /// ```
1042 ///
1043 /// Configures the send-payload data type. In this case, the outbound data
1044 /// type will be `&'static [u8]`.
1045 ///
1046 /// ```
1047 /// # use tokio::io::{AsyncRead, AsyncWrite};
1048 /// # use h2::server::*;
1049 /// #
1050 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1051 /// # -> Handshake<T, &'static [u8]>
1052 /// # {
1053 /// // `server_fut` is a future representing the completion of the HTTP/2
1054 /// // handshake.
1055 /// let server_fut: Handshake<_, &'static [u8]> = Builder::new()
1056 /// .handshake(my_io);
1057 /// # server_fut
1058 /// # }
1059 /// #
1060 /// # pub fn main() {}
1061 /// ```
1062 pub fn handshake<T, B>(&self, io: T) -> Handshake<T, B>
1063 where
1064 T: AsyncRead + AsyncWrite + Unpin,
1065 B: Buf,
1066 {
1067 Connection::handshake2(io, self.clone())
1068 }
1069}
1070
1071impl Default for Builder {
1072 fn default() -> Builder {
1073 Builder::new()
1074 }
1075}
1076
1077// ===== impl SendResponse =====
1078
1079impl<B: Buf> SendResponse<B> {
1080 /// Send a response to a client request.
1081 ///
1082 /// On success, a [`SendStream`] instance is returned. This instance can be
1083 /// used to stream the response body and send trailers.
1084 ///
1085 /// If a body or trailers will be sent on the returned [`SendStream`]
1086 /// instance, then `end_of_stream` must be set to `false` when calling this
1087 /// function.
1088 ///
1089 /// The [`SendResponse`] instance is already associated with a received
1090 /// request. This function may only be called once per instance and only if
1091 /// [`send_reset`] has not been previously called.
1092 ///
1093 /// [`SendResponse`]: #
1094 /// [`SendStream`]: ../struct.SendStream.html
1095 /// [`send_reset`]: #method.send_reset
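    ///
    /// # Examples
    ///
    /// A minimal sketch of sending a response head followed by a body. It
    /// assumes `Bytes` as the outbound data type and that `respond` was
    /// obtained from an accepted stream; the hidden `doc` wrapper is purely
    /// illustrative.
    ///
    /// ```
    /// # use h2::server::SendResponse;
    /// # use bytes::Bytes;
    /// # use http::{Response, StatusCode};
    /// #
    /// # fn doc(mut respond: SendResponse<Bytes>) -> Result<(), h2::Error> {
    /// let response = Response::builder()
    ///     .status(StatusCode::OK)
    ///     .body(())
    ///     .unwrap();
    ///
    /// // A body follows, so `end_of_stream` must be `false` here.
    /// let mut send = respond.send_response(response, false)?;
    ///
    /// // The final data frame closes the stream.
    /// send.send_data(Bytes::from_static(b"hello"), true)?;
    /// # Ok(())
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```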
1096 pub fn send_response(
1097 &mut self,
1098 response: Response<()>,
1099 end_of_stream: bool,
1100 ) -> Result<SendStream<B>, crate::Error> {
1101 self.inner
1102 .send_response(response, end_of_stream)
1103 .map(|_| SendStream::new(self.inner.clone()))
1104 .map_err(Into::into)
1105 }
1106
    /// Push a request and response to the client.
    ///
    /// On success, a [`SendPushedResponse`] instance is returned.
    ///
    /// [`SendPushedResponse`]: struct.SendPushedResponse.html
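    ///
    /// # Examples
    ///
    /// A minimal sketch of promising a request and then answering it. It
    /// assumes `Bytes` as the outbound data type; the URI and the hidden
    /// `doc` wrapper are illustrative only.
    ///
    /// ```
    /// # use h2::server::SendResponse;
    /// # use bytes::Bytes;
    /// # use http::{Request, Response, StatusCode};
    /// #
    /// # fn doc(mut respond: SendResponse<Bytes>) -> Result<(), h2::Error> {
    /// // Promise a resource the client is expected to request next.
    /// let promised = Request::builder()
    ///     .method("GET")
    ///     .uri("https://example.com/style.css")
    ///     .body(())
    ///     .unwrap();
    /// let mut pushed = respond.push_request(promised)?;
    ///
    /// // Answer the promised request on its own stream.
    /// let response = Response::builder()
    ///     .status(StatusCode::OK)
    ///     .body(())
    ///     .unwrap();
    /// pushed.send_response(response, true)?;
    /// # Ok(())
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```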
1112 pub fn push_request(
1113 &mut self,
1114 request: Request<()>,
1115 ) -> Result<SendPushedResponse<B>, crate::Error> {
1116 self.inner
1117 .send_push_promise(request)
1118 .map(|inner| SendPushedResponse {
1119 inner: SendResponse { inner },
1120 })
1121 .map_err(Into::into)
1122 }
1123
1124 /// Send a stream reset to the peer.
1125 ///
1126 /// This essentially cancels the stream, including any inbound or outbound
1127 /// data streams.
1128 ///
1129 /// If this function is called before [`send_response`], a call to
1130 /// [`send_response`] will result in an error.
1131 ///
1132 /// If this function is called while a [`SendStream`] instance is active,
1133 /// any further use of the instance will result in an error.
1134 ///
1135 /// This function should only be called once.
1136 ///
1137 /// [`send_response`]: #method.send_response
1138 /// [`SendStream`]: ../struct.SendStream.html
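    ///
    /// # Examples
    ///
    /// A minimal sketch of cancelling a stream instead of responding. It
    /// assumes `Bytes` as the outbound data type; the choice of
    /// `Reason::CANCEL` and the hidden `doc` wrapper are illustrative.
    ///
    /// ```
    /// # use h2::server::SendResponse;
    /// # use h2::Reason;
    /// # use bytes::Bytes;
    /// #
    /// # fn doc(mut respond: SendResponse<Bytes>) {
    /// // After this call, `send_response` on `respond` returns an error.
    /// respond.send_reset(Reason::CANCEL);
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```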
1139 pub fn send_reset(&mut self, reason: Reason) {
1140 self.inner.send_reset(reason)
1141 }
1142
1143 /// Polls to be notified when the client resets this stream.
1144 ///
    /// If the stream is still open, this returns `Poll::Pending`, and
1146 /// registers the task to be notified if a `RST_STREAM` is received.
1147 ///
1148 /// If a `RST_STREAM` frame is received for this stream, calling this
1149 /// method will yield the `Reason` for the reset.
1150 ///
    /// # Errors
1152 ///
1153 /// Calling this method after having called `send_response` will return
1154 /// a user error.
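    ///
    /// # Examples
    ///
    /// A minimal sketch of awaiting a client reset before any response has
    /// been sent. It assumes `Bytes` as the outbound data type and a
    /// toolchain where `std::future::poll_fn` is available (Rust 1.64 or
    /// later); the hidden `doc` wrapper is illustrative.
    ///
    /// ```
    /// # use h2::server::SendResponse;
    /// # use bytes::Bytes;
    /// #
    /// # async fn doc(mut respond: SendResponse<Bytes>) -> Result<(), h2::Error> {
    /// // Resolves once a `RST_STREAM` frame arrives for this stream.
    /// let reason = std::future::poll_fn(|cx| respond.poll_reset(cx)).await?;
    /// println!("stream reset by client: {:?}", reason);
    /// # Ok(())
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```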
1155 pub fn poll_reset(&mut self, cx: &mut Context) -> Poll<Result<Reason, crate::Error>> {
1156 self.inner.poll_reset(cx, proto::PollReset::AwaitingHeaders)
1157 }
1158
1159 /// Returns the stream ID of the response stream.
1160 ///
1161 /// # Panics
1162 ///
1163 /// If the lock on the stream store has been poisoned.
1164 pub fn stream_id(&self) -> crate::StreamId {
1165 crate::StreamId::from_internal(self.inner.stream_id())
1166 }
1167}
1168
1169// ===== impl SendPushedResponse =====
1170
1171impl<B: Buf> SendPushedResponse<B> {
1172 /// Send a response to a promised request.
1173 ///
1174 /// On success, a [`SendStream`] instance is returned. This instance can be
1175 /// used to stream the response body and send trailers.
1176 ///
1177 /// If a body or trailers will be sent on the returned [`SendStream`]
1178 /// instance, then `end_of_stream` must be set to `false` when calling this
1179 /// function.
1180 ///
1181 /// The [`SendPushedResponse`] instance is associated with a promised
1182 /// request. This function may only be called once per instance and only if
1183 /// [`send_reset`] has not been previously called.
1184 ///
1185 /// [`SendPushedResponse`]: #
1186 /// [`SendStream`]: ../struct.SendStream.html
1187 /// [`send_reset`]: #method.send_reset
1188 pub fn send_response(
1189 &mut self,
1190 response: Response<()>,
1191 end_of_stream: bool,
1192 ) -> Result<SendStream<B>, crate::Error> {
1193 self.inner.send_response(response, end_of_stream)
1194 }
1195
1196 /// Send a stream reset to the peer.
1197 ///
1198 /// This essentially cancels the stream, including any inbound or outbound
1199 /// data streams.
1200 ///
1201 /// If this function is called before [`send_response`], a call to
1202 /// [`send_response`] will result in an error.
1203 ///
1204 /// If this function is called while a [`SendStream`] instance is active,
1205 /// any further use of the instance will result in an error.
1206 ///
1207 /// This function should only be called once.
1208 ///
1209 /// [`send_response`]: #method.send_response
1210 /// [`SendStream`]: ../struct.SendStream.html
1211 pub fn send_reset(&mut self, reason: Reason) {
1212 self.inner.send_reset(reason)
1213 }
1214
1215 /// Polls to be notified when the client resets this stream.
1216 ///
    /// If the stream is still open, this returns `Poll::Pending`, and
1218 /// registers the task to be notified if a `RST_STREAM` is received.
1219 ///
1220 /// If a `RST_STREAM` frame is received for this stream, calling this
1221 /// method will yield the `Reason` for the reset.
1222 ///
    /// # Errors
1224 ///
1225 /// Calling this method after having called `send_response` will return
1226 /// a user error.
1227 pub fn poll_reset(&mut self, cx: &mut Context) -> Poll<Result<Reason, crate::Error>> {
1228 self.inner.poll_reset(cx)
1229 }
1230
1231 /// Returns the stream ID of the response stream.
1232 ///
1233 /// # Panics
1234 ///
1235 /// If the lock on the stream store has been poisoned.
1236 pub fn stream_id(&self) -> crate::StreamId {
1237 self.inner.stream_id()
1238 }
1239}
1240
1241// ===== impl Flush =====
1242
1243impl<T, B: Buf> Flush<T, B> {
1244 fn new(codec: Codec<T, B>) -> Self {
1245 Flush { codec: Some(codec) }
1246 }
1247}
1248
1249impl<T, B> Future for Flush<T, B>
1250where
1251 T: AsyncWrite + Unpin,
1252 B: Buf,
1253{
1254 type Output = Result<Codec<T, B>, crate::Error>;
1255
1256 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1257 // Flush the codec
        ready!(self.codec.as_mut().unwrap().flush(cx)).map_err(crate::Error::from_io)?;
1259
1260 // Return the codec
1261 Poll::Ready(Ok(self.codec.take().unwrap()))
1262 }
1263}
1264
1265impl<T, B: Buf> ReadPreface<T, B> {
1266 fn new(codec: Codec<T, B>) -> Self {
1267 ReadPreface {
1268 codec: Some(codec),
1269 pos: 0,
1270 }
1271 }
1272
1273 fn inner_mut(&mut self) -> &mut T {
1274 self.codec.as_mut().unwrap().get_mut()
1275 }
1276}
1277
1278impl<T, B> Future for ReadPreface<T, B>
1279where
1280 T: AsyncRead + Unpin,
1281 B: Buf,
1282{
1283 type Output = Result<Codec<T, B>, crate::Error>;
1284
1285 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1286 let mut buf = [0; 24];
1287 let mut rem = PREFACE.len() - self.pos;
1288
1289 while rem > 0 {
1290 let mut buf = ReadBuf::new(&mut buf[..rem]);
1291 ready!(Pin::new(self.inner_mut()).poll_read(cx, &mut buf))
1292 .map_err(crate::Error::from_io)?;
1293 let n = buf.filled().len();
1294 if n == 0 {
1295 return Poll::Ready(Err(crate::Error::from_io(io::Error::new(
1296 io::ErrorKind::UnexpectedEof,
1297 "connection closed before reading preface",
1298 ))));
1299 }
1300
1301 if &PREFACE[self.pos..self.pos + n] != buf.filled() {
1302 proto_err!(conn: "read_preface: invalid preface");
1303 // TODO: Should this just write the GO_AWAY frame directly?
1304 return Poll::Ready(Err(Error::library_go_away(Reason::PROTOCOL_ERROR).into()));
1305 }
1306
1307 self.pos += n;
1308 rem -= n; // TODO test
1309 }
1310
1311 Poll::Ready(Ok(self.codec.take().unwrap()))
1312 }
1313}
1314
1315// ===== impl Handshake =====
1316
1317impl<T, B: Buf> Future for Handshake<T, B>
1318where
1319 T: AsyncRead + AsyncWrite + Unpin,
1320 B: Buf,
1321{
1322 type Output = Result<Connection<T, B>, crate::Error>;
1323
1324 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1325 let span = self.span.clone(); // XXX(eliza): T_T
1326 let _e = span.enter();
1327 tracing::trace!(state = ?self.state);
1328
1329 loop {
1330 match &mut self.state {
1331 Handshaking::Flushing(flush) => {
1332 // We're currently flushing a pending SETTINGS frame. Poll the
1333 // flush future, and, if it's completed, advance our state to wait
1334 // for the client preface.
1335 let codec = match Pin::new(flush).poll(cx)? {
1336 Poll::Pending => {
1337 tracing::trace!(flush.poll = %"Pending");
1338 return Poll::Pending;
1339 }
1340 Poll::Ready(flushed) => {
1341 tracing::trace!(flush.poll = %"Ready");
1342 flushed
1343 }
1344 };
1345 self.state = Handshaking::ReadingPreface(
1346 ReadPreface::new(codec).instrument(tracing::trace_span!("read_preface")),
1347 );
1348 }
1349 Handshaking::ReadingPreface(read) => {
1350 let codec = ready!(Pin::new(read).poll(cx)?);
1351
1352 self.state = Handshaking::Done;
1353
1354 let connection = proto::Connection::new(
1355 codec,
1356 Config {
1357 next_stream_id: 2.into(),
1358 // Server does not need to locally initiate any streams
1359 initial_max_send_streams: 0,
1360 max_send_buffer_size: self.builder.max_send_buffer_size,
1361 reset_stream_duration: self.builder.reset_stream_duration,
1362 reset_stream_max: self.builder.reset_stream_max,
1363 remote_reset_stream_max: self.builder.pending_accept_reset_stream_max,
1364 settings: self.builder.settings.clone(),
1365 },
1366 );
1367
1368 tracing::trace!("connection established!");
1369 let mut c = Connection { connection };
1370 if let Some(sz) = self.builder.initial_target_connection_window_size {
1371 c.set_target_window_size(sz);
1372 }
1373
1374 return Poll::Ready(Ok(c));
1375 }
1376 Handshaking::Done => {
1377 panic!("Handshaking::poll() called again after handshaking was complete")
1378 }
1379 }
1380 }
1381 }
1382}
1383
1384impl<T, B> fmt::Debug for Handshake<T, B>
1385where
1386 T: AsyncRead + AsyncWrite + fmt::Debug,
1387 B: fmt::Debug + Buf,
1388{
1389 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
1390 write!(fmt, "server::Handshake")
1391 }
1392}
1393
1394impl Peer {
1395 pub fn convert_send_message(
1396 id: StreamId,
1397 response: Response<()>,
1398 end_of_stream: bool,
1399 ) -> frame::Headers {
1400 use http::response::Parts;
1401
        // Extract the components of the HTTP response
1403 let (
1404 Parts {
1405 status, headers, ..
1406 },
1407 _,
1408 ) = response.into_parts();
1409
        // Build the pseudo header set. All responses include `status`.
1412 let pseudo = Pseudo::response(status);
1413
1414 // Create the HEADERS frame
1415 let mut frame = frame::Headers::new(id, pseudo, headers);
1416
1417 if end_of_stream {
1418 frame.set_end_stream()
1419 }
1420
1421 frame
1422 }
1423
1424 pub fn convert_push_message(
1425 stream_id: StreamId,
1426 promised_id: StreamId,
1427 request: Request<()>,
1428 ) -> Result<frame::PushPromise, UserError> {
1429 use http::request::Parts;
1430
1431 if let Err(e) = frame::PushPromise::validate_request(&request) {
1432 use PushPromiseHeaderError::*;
1433 match e {
1434 NotSafeAndCacheable => tracing::debug!(
1435 ?promised_id,
1436 "convert_push_message: method {} is not safe and cacheable",
1437 request.method(),
1438 ),
1439 InvalidContentLength(e) => tracing::debug!(
1440 ?promised_id,
1441 "convert_push_message; promised request has invalid content-length {:?}",
1442 e,
1443 ),
1444 }
1445 return Err(UserError::MalformedHeaders);
1446 }
1447
1448 // Extract the components of the HTTP request
1449 let (
1450 Parts {
1451 method,
1452 uri,
1453 headers,
1454 ..
1455 },
1456 _,
1457 ) = request.into_parts();
1458
1459 let pseudo = Pseudo::request(method, uri, None);
1460
1461 Ok(frame::PushPromise::new(
1462 stream_id,
1463 promised_id,
1464 pseudo,
1465 headers,
1466 ))
1467 }
1468}
1469
1470impl proto::Peer for Peer {
1471 type Poll = Request<()>;
1472
1473 const NAME: &'static str = "Server";
1474
1475 fn is_server() -> bool {
1476 true
1477 }
1478
1479 fn r#dyn() -> proto::DynPeer {
1480 proto::DynPeer::Server
1481 }
1482
1483 fn convert_poll_message(
1484 pseudo: Pseudo,
1485 fields: HeaderMap,
1486 stream_id: StreamId,
1487 ) -> Result<Self::Poll, Error> {
1488 use http::{uri, Version};
1489
1490 let mut b = Request::builder();
1491
1492 macro_rules! malformed {
1493 ($($arg:tt)*) => {{
1494 tracing::debug!($($arg)*);
1495 return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR));
1496 }}
1497 }
1498
1499 b = b.version(Version::HTTP_2);
1500
1501 let is_connect;
1502 if let Some(method) = pseudo.method {
1503 is_connect = method == Method::CONNECT;
1504 b = b.method(method);
1505 } else {
1506 malformed!("malformed headers: missing method");
1507 }
1508
1509 let has_protocol = pseudo.protocol.is_some();
1510 if has_protocol {
1511 if is_connect {
1512 // Assert that we have the right type.
1513 b = b.extension::<crate::ext::Protocol>(pseudo.protocol.unwrap());
1514 } else {
1515 malformed!("malformed headers: :protocol on non-CONNECT request");
1516 }
1517 }
1518
1519 if pseudo.status.is_some() {
1520 malformed!("malformed headers: :status field on request");
1521 }
1522
1523 // Convert the URI
1524 let mut parts = uri::Parts::default();
1525
1526 // A request translated from HTTP/1 must not include the :authority
1527 // header
1528 if let Some(authority) = pseudo.authority {
1529 let maybe_authority = uri::Authority::from_maybe_shared(authority.clone().into_inner());
1530 parts.authority = Some(maybe_authority.or_else(|why| {
1531 malformed!(
1532 "malformed headers: malformed authority ({:?}): {}",
1533 authority,
1534 why,
1535 )
1536 })?);
1537 }
1538
        // A :scheme is required, except for plain (non-extended) CONNECT.
1540 if let Some(scheme) = pseudo.scheme {
1541 if is_connect && !has_protocol {
1542 malformed!("malformed headers: :scheme in CONNECT");
1543 }
1544 let maybe_scheme = scheme.parse();
1545 let scheme = maybe_scheme.or_else(|why| {
1546 malformed!(
1547 "malformed headers: malformed scheme ({:?}): {}",
1548 scheme,
1549 why,
1550 )
1551 })?;
1552
            // It's not possible to build a `Uri` from a scheme and path. So,
            // after validating that it was a valid scheme, we just have to
            // drop it if there isn't an :authority.
1556 if parts.authority.is_some() {
1557 parts.scheme = Some(scheme);
1558 }
1559 } else if !is_connect || has_protocol {
1560 malformed!("malformed headers: missing scheme");
1561 }
1562
1563 if let Some(path) = pseudo.path {
1564 if is_connect && !has_protocol {
1565 malformed!("malformed headers: :path in CONNECT");
1566 }
1567
1568 // This cannot be empty
1569 if path.is_empty() {
1570 malformed!("malformed headers: missing path");
1571 }
1572
1573 let maybe_path = uri::PathAndQuery::from_maybe_shared(path.clone().into_inner());
1574 parts.path_and_query = Some(maybe_path.or_else(|why| {
1575 malformed!("malformed headers: malformed path ({:?}): {}", path, why,)
1576 })?);
1577 } else if is_connect && has_protocol {
1578 malformed!("malformed headers: missing path in extended CONNECT");
1579 }
1580
1581 b = b.uri(parts);
1582
1583 let mut request = match b.body(()) {
1584 Ok(request) => request,
1585 Err(e) => {
1586 // TODO: Should there be more specialized handling for different
1587 // kinds of errors
1588 proto_err!(stream: "error building request: {}; stream={:?}", e, stream_id);
1589 return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR));
1590 }
1591 };
1592
1593 *request.headers_mut() = fields;
1594
1595 Ok(request)
1596 }
1597}
1598
1599// ===== impl Handshaking =====
1600
1601impl<T, B> fmt::Debug for Handshaking<T, B>
1602where
1603 B: Buf,
1604{
1605 #[inline]
1606 fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
1607 match *self {
            Handshaking::Flushing(_) => f.write_str("Flushing(_)"),
            Handshaking::ReadingPreface(_) => f.write_str("ReadingPreface(_)"),
            Handshaking::Done => f.write_str("Done"),
1611 }
1612 }
1613}
1614