1//! Client implementation of the HTTP/2 protocol.
2//!
3//! # Getting started
4//!
5//! Running an HTTP/2 client requires the caller to establish the underlying
6//! connection as well as get the connection to a state that is ready to begin
7//! the HTTP/2 handshake. See [here](../index.html#handshake) for more
8//! details.
9//!
10//! This could be as basic as using Tokio's [`TcpStream`] to connect to a remote
11//! host, but usually it means using either ALPN or HTTP/1.1 protocol upgrades.
12//!
13//! Once a connection is obtained, it is passed to [`handshake`], which will
14//! begin the [HTTP/2 handshake]. This returns a future that completes once
15//! the handshake process is performed and HTTP/2 streams may be initialized.
16//!
17//! [`handshake`] uses default configuration values. There are a number of
18//! settings that can be changed by using [`Builder`] instead.
19//!
20//! Once the handshake future completes, the caller is provided with a
21//! [`Connection`] instance and a [`SendRequest`] instance. The [`Connection`]
22//! instance is used to drive the connection (see [Managing the connection]).
23//! The [`SendRequest`] instance is used to initialize new streams (see [Making
24//! requests]).
25//!
26//! # Making requests
27//!
28//! Requests are made using the [`SendRequest`] handle provided by the handshake
29//! future. Once a request is submitted, an HTTP/2 stream is initialized and
30//! the request is sent to the server.
31//!
//! A request body and request trailers are sent using [`SendStream`], and the
//! server's response is returned once the [`ResponseFuture`] future completes.
//! Both the [`SendStream`] and [`ResponseFuture`] instances are returned by
//! [`SendRequest::send_request`] and are tied to the HTTP/2 stream
//! initialized by the sent request.
37//!
38//! The [`SendRequest::poll_ready`] function returns `Ready` when a new HTTP/2
39//! stream can be created, i.e. as long as the current number of active streams
40//! is below [`MAX_CONCURRENT_STREAMS`]. If a new stream cannot be created, the
41//! caller will be notified once an existing stream closes, freeing capacity for
42//! the caller. The caller should use [`SendRequest::poll_ready`] to check for
43//! capacity before sending a request to the server.
44//!
45//! [`SendRequest`] enforces the [`MAX_CONCURRENT_STREAMS`] setting. The user
46//! must not send a request if `poll_ready` does not return `Ready`. Attempting
47//! to do so will result in an [`Error`] being returned.
48//!
49//! # Managing the connection
50//!
51//! The [`Connection`] instance is used to manage connection state. The caller
52//! is required to call [`Connection::poll`] in order to advance state.
53//! [`SendRequest::send_request`] and other functions have no effect unless
54//! [`Connection::poll`] is called.
55//!
56//! The [`Connection`] instance should only be dropped once [`Connection::poll`]
57//! returns `Ready`. At this point, the underlying socket has been closed and no
58//! further work needs to be done.
59//!
60//! The easiest way to ensure that the [`Connection`] instance gets polled is to
61//! submit the [`Connection`] instance to an [executor]. The executor will then
62//! manage polling the connection until the connection is complete.
63//! Alternatively, the caller can call `poll` manually.
64//!
65//! # Example
66//!
//! ```rust, no_run
//!
//! use h2::client;
//!
//! use http::{Request, Method};
//! use std::error::Error;
//! use tokio::net::TcpStream;
//!
//! #[tokio::main]
//! pub async fn main() -> Result<(), Box<dyn Error>> {
//!     // Establish TCP connection to the server.
//!     let tcp = TcpStream::connect("127.0.0.1:5928").await?;
//!     let (h2, connection) = client::handshake(tcp).await?;
//!     tokio::spawn(async move {
//!         connection.await.unwrap();
//!     });
//!
//!     let mut h2 = h2.ready().await?;
//!     // Prepare the HTTP request to send to the server.
//!     let request = Request::builder()
//!         .method(Method::GET)
//!         .uri("https://www.example.com/")
//!         .body(())
//!         .unwrap();
//!
//!     // Send the request. The second tuple item allows the caller
//!     // to stream a request body.
//!     let (response, _) = h2.send_request(request, true).unwrap();
//!
//!     let (head, mut body) = response.await?.into_parts();
//!
//!     println!("Received response: {:?}", head);
//!
//!     // The `flow_control` handle allows the caller to manage
//!     // flow control.
//!     //
//!     // Whenever data is received, the caller is responsible for
//!     // releasing capacity back to the server once it has freed
//!     // the data from memory.
//!     let mut flow_control = body.flow_control().clone();
//!
//!     while let Some(chunk) = body.data().await {
//!         let chunk = chunk?;
//!         println!("RX: {:?}", chunk);
//!
//!         // Let the server send more data.
//!         let _ = flow_control.release_capacity(chunk.len());
//!     }
//!
//!     Ok(())
//! }
//! ```
119//!
//! [`TcpStream`]: https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html
121//! [`handshake`]: fn.handshake.html
122//! [executor]: https://docs.rs/futures/0.1/futures/future/trait.Executor.html
123//! [`SendRequest`]: struct.SendRequest.html
124//! [`SendStream`]: ../struct.SendStream.html
125//! [Making requests]: #making-requests
126//! [Managing the connection]: #managing-the-connection
127//! [`Connection`]: struct.Connection.html
128//! [`Connection::poll`]: struct.Connection.html#method.poll
129//! [`SendRequest::send_request`]: struct.SendRequest.html#method.send_request
130//! [`MAX_CONCURRENT_STREAMS`]: http://httpwg.org/specs/rfc7540.html#SettingValues
132//! [`ResponseFuture`]: struct.ResponseFuture.html
133//! [`SendRequest::poll_ready`]: struct.SendRequest.html#method.poll_ready
134//! [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
135//! [`Builder`]: struct.Builder.html
136//! [`Error`]: ../struct.Error.html
137
use crate::codec::{Codec, SendError, UserError};
use crate::ext::Protocol;
use crate::frame::{Headers, Pseudo, Reason, Settings, StreamId};
use crate::proto::{self, Error};
use crate::{FlowControl, PingPong, RecvStream, SendStream};

use bytes::{Buf, Bytes};
use http::{uri, HeaderMap, Method, Request, Response, Version};
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use std::usize;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tracing::Instrument;
154
155/// Initializes new HTTP/2 streams on a connection by sending a request.
156///
157/// This type does no work itself. Instead, it is a handle to the inner
158/// connection state held by [`Connection`]. If the associated connection
159/// instance is dropped, all `SendRequest` functions will return [`Error`].
160///
/// [`SendRequest`] instances can be moved to, and operate on, tasks or
/// threads separate from their associated [`Connection`] instance.
/// Internally, there is a buffer used to stage requests before they get
/// written to the connection. There is no guarantee that requests get written
/// to the connection in FIFO order, as HTTP/2 prioritization logic can play a
/// role.
166///
167/// [`SendRequest`] implements [`Clone`], enabling the creation of many
168/// instances that are backed by a single connection.
169///
170/// See [module] level documentation for more details.
171///
172/// [module]: index.html
173/// [`Connection`]: struct.Connection.html
174/// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html
175/// [`Error`]: ../struct.Error.html
176pub struct SendRequest<B: Buf> {
177 inner: proto::Streams<B, Peer>,
178 pending: Option<proto::OpaqueStreamRef>,
179}
180
181/// Returns a `SendRequest` instance once it is ready to send at least one
182/// request.
183#[derive(Debug)]
184pub struct ReadySendRequest<B: Buf> {
185 inner: Option<SendRequest<B>>,
186}
187
188/// Manages all state associated with an HTTP/2 client connection.
189///
190/// A `Connection` is backed by an I/O resource (usually a TCP socket) and
191/// implements the HTTP/2 client logic for that connection. It is responsible
192/// for driving the internal state forward, performing the work requested of the
193/// associated handles ([`SendRequest`], [`ResponseFuture`], [`SendStream`],
194/// [`RecvStream`]).
195///
196/// `Connection` values are created by calling [`handshake`]. Once a
197/// `Connection` value is obtained, the caller must repeatedly call [`poll`]
198/// until `Ready` is returned. The easiest way to do this is to submit the
199/// `Connection` instance to an [executor].
200///
201/// [module]: index.html
202/// [`handshake`]: fn.handshake.html
203/// [`SendRequest`]: struct.SendRequest.html
204/// [`ResponseFuture`]: struct.ResponseFuture.html
205/// [`SendStream`]: ../struct.SendStream.html
206/// [`RecvStream`]: ../struct.RecvStream.html
207/// [`poll`]: #method.poll
208/// [executor]: https://docs.rs/futures/0.1/futures/future/trait.Executor.html
209///
210/// # Examples
211///
212/// ```
213/// # use tokio::io::{AsyncRead, AsyncWrite};
214/// # use h2::client;
215/// # use h2::client::*;
216/// #
217/// # async fn doc<T>(my_io: T) -> Result<(), h2::Error>
218/// # where T: AsyncRead + AsyncWrite + Send + Unpin + 'static,
219/// # {
220/// let (send_request, connection) = client::handshake(my_io).await?;
221/// // Submit the connection handle to an executor.
222/// tokio::spawn(async { connection.await.expect("connection failed"); });
223///
224/// // Now, use `send_request` to initialize HTTP/2 streams.
225/// // ...
226/// # Ok(())
227/// # }
228/// #
229/// # pub fn main() {}
230/// ```
231#[must_use = "futures do nothing unless polled"]
232pub struct Connection<T, B: Buf = Bytes> {
233 inner: proto::Connection<T, Peer, B>,
234}
235
236/// A future of an HTTP response.
237#[derive(Debug)]
238#[must_use = "futures do nothing unless polled"]
239pub struct ResponseFuture {
240 inner: proto::OpaqueStreamRef,
241 push_promise_consumed: bool,
242}
243
244/// A future of a pushed HTTP response.
245///
/// We have to differentiate between pushed and non-pushed responses because
/// of the spec (<https://httpwg.org/specs/rfc7540.html#PUSH_PROMISE>):
///
/// > PUSH_PROMISE frames MUST only be sent on a peer-initiated stream
/// > that is in either the "open" or "half-closed (remote)" state.
250#[derive(Debug)]
251#[must_use = "futures do nothing unless polled"]
252pub struct PushedResponseFuture {
253 inner: ResponseFuture,
254}
255
256/// A pushed response and corresponding request headers
257#[derive(Debug)]
258pub struct PushPromise {
259 /// The request headers
260 request: Request<()>,
261
262 /// The pushed response
263 response: PushedResponseFuture,
264}
265
266/// A stream of pushed responses and corresponding promised requests
267#[derive(Debug)]
268#[must_use = "streams do nothing unless polled"]
269pub struct PushPromises {
270 inner: proto::OpaqueStreamRef,
271}
272
273/// Builds client connections with custom configuration values.
274///
275/// Methods can be chained in order to set the configuration values.
276///
/// The client is constructed by calling [`handshake`] and passing the I/O
/// handle that will back the HTTP/2 connection.
279///
280/// New instances of `Builder` are obtained via [`Builder::new`].
281///
282/// See function level documentation for details on the various client
283/// configuration settings.
284///
285/// [`Builder::new`]: struct.Builder.html#method.new
286/// [`handshake`]: struct.Builder.html#method.handshake
287///
288/// # Examples
289///
290/// ```
291/// # use tokio::io::{AsyncRead, AsyncWrite};
292/// # use h2::client::*;
293/// # use bytes::Bytes;
294/// #
295/// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
/// #    -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
297/// # {
298/// // `client_fut` is a future representing the completion of the HTTP/2
299/// // handshake.
300/// let client_fut = Builder::new()
301/// .initial_window_size(1_000_000)
302/// .max_concurrent_streams(1000)
303/// .handshake(my_io);
304/// # client_fut.await
305/// # }
306/// #
307/// # pub fn main() {}
308/// ```
309#[derive(Clone, Debug)]
310pub struct Builder {
311 /// Time to keep locally reset streams around before reaping.
312 reset_stream_duration: Duration,
313
314 /// Initial maximum number of locally initiated (send) streams.
315 /// After receiving a Settings frame from the remote peer,
316 /// the connection will overwrite this value with the
317 /// MAX_CONCURRENT_STREAMS specified in the frame.
318 initial_max_send_streams: usize,
319
320 /// Initial target window size for new connections.
321 initial_target_connection_window_size: Option<u32>,
322
323 /// Maximum amount of bytes to "buffer" for writing per stream.
324 max_send_buffer_size: usize,
325
326 /// Maximum number of locally reset streams to keep at a time.
327 reset_stream_max: usize,
328
329 /// Maximum number of remotely reset streams to allow in the pending
330 /// accept queue.
331 pending_accept_reset_stream_max: usize,
332
333 /// Initial `Settings` frame to send as part of the handshake.
334 settings: Settings,
335
336 /// The stream ID of the first (lowest) stream. Subsequent streams will use
337 /// monotonically increasing stream IDs.
338 stream_id: StreamId,
339}
340
341#[derive(Debug)]
342pub(crate) struct Peer;
343
344// ===== impl SendRequest =====
345
346impl<B> SendRequest<B>
347where
348 B: Buf,
349{
350 /// Returns `Ready` when the connection can initialize a new HTTP/2
351 /// stream.
352 ///
353 /// This function must return `Ready` before `send_request` is called. When
354 /// `Poll::Pending` is returned, the task will be notified once the readiness
355 /// state changes.
356 ///
357 /// See [module] level docs for more details.
358 ///
359 /// [module]: index.html
360 pub fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>> {
361 ready!(self.inner.poll_pending_open(cx, self.pending.as_ref()))?;
362 self.pending = None;
363 Poll::Ready(Ok(()))
364 }
365
366 /// Consumes `self`, returning a future that returns `self` back once it is
367 /// ready to send a request.
368 ///
369 /// This function should be called before calling `send_request`.
370 ///
    /// This is a functional combinator for [`poll_ready`]. The returned future
    /// will call `SendRequest::poll_ready` until `Ready`, then return `self` to
    /// the caller.
374 ///
375 /// # Examples
376 ///
377 /// ```rust
378 /// # use h2::client::*;
379 /// # use http::*;
380 /// # async fn doc(send_request: SendRequest<&'static [u8]>)
381 /// # {
382 /// // First, wait until the `send_request` handle is ready to send a new
383 /// // request
384 /// let mut send_request = send_request.ready().await.unwrap();
385 /// // Use `send_request` here.
386 /// # }
387 /// # pub fn main() {}
388 /// ```
389 ///
390 /// See [module] level docs for more details.
391 ///
392 /// [`poll_ready`]: #method.poll_ready
393 /// [module]: index.html
394 pub fn ready(self) -> ReadySendRequest<B> {
395 ReadySendRequest { inner: Some(self) }
396 }
397
    /// Sends an HTTP/2 request to the server.
399 ///
400 /// `send_request` initializes a new HTTP/2 stream on the associated
401 /// connection, then sends the given request using this new stream. Only the
402 /// request head is sent.
403 ///
404 /// On success, a [`ResponseFuture`] instance and [`SendStream`] instance
405 /// are returned. The [`ResponseFuture`] instance is used to get the
406 /// server's response and the [`SendStream`] instance is used to send a
407 /// request body or trailers to the server over the same HTTP/2 stream.
408 ///
409 /// To send a request body or trailers, set `end_of_stream` to `false`.
410 /// Then, use the returned [`SendStream`] instance to stream request body
411 /// chunks or send trailers. If `end_of_stream` is **not** set to `false`
412 /// then attempting to call [`SendStream::send_data`] or
413 /// [`SendStream::send_trailers`] will result in an error.
414 ///
415 /// If no request body or trailers are to be sent, set `end_of_stream` to
416 /// `true` and drop the returned [`SendStream`] instance.
417 ///
418 /// # A note on HTTP versions
419 ///
    /// The provided `Request` will be encoded differently depending on the
    /// value of its version field. If the version is set to 2.0, then the
    /// request is encoded as the specification recommends.
423 ///
424 /// If the version is set to a lower value, then the request is encoded to
425 /// preserve the characteristics of HTTP 1.1 and lower. Specifically, host
426 /// headers are permitted and the `:authority` pseudo header is not
427 /// included.
428 ///
429 /// The caller should always set the request's version field to 2.0 unless
430 /// specifically transmitting an HTTP 1.1 request over 2.0.
431 ///
432 /// # Examples
433 ///
434 /// Sending a request with no body
435 ///
436 /// ```rust
437 /// # use h2::client::*;
438 /// # use http::*;
439 /// # async fn doc(send_request: SendRequest<&'static [u8]>)
440 /// # {
441 /// // First, wait until the `send_request` handle is ready to send a new
442 /// // request
443 /// let mut send_request = send_request.ready().await.unwrap();
444 /// // Prepare the HTTP request to send to the server.
445 /// let request = Request::get("https://www.example.com/")
446 /// .body(())
447 /// .unwrap();
448 ///
449 /// // Send the request to the server. Since we are not sending a
450 /// // body or trailers, we can drop the `SendStream` instance.
451 /// let (response, _) = send_request.send_request(request, true).unwrap();
452 /// let response = response.await.unwrap();
453 /// // Process the response
454 /// # }
455 /// # pub fn main() {}
456 /// ```
457 ///
458 /// Sending a request with a body and trailers
459 ///
460 /// ```rust
461 /// # use h2::client::*;
462 /// # use http::*;
463 /// # async fn doc(send_request: SendRequest<&'static [u8]>)
464 /// # {
465 /// // First, wait until the `send_request` handle is ready to send a new
466 /// // request
467 /// let mut send_request = send_request.ready().await.unwrap();
468 ///
469 /// // Prepare the HTTP request to send to the server.
470 /// let request = Request::get("https://www.example.com/")
471 /// .body(())
472 /// .unwrap();
473 ///
    /// // Send the request to the server. Since we are sending a body
    /// // and trailers, we keep the returned `SendStream` instance.
476 /// let (response, mut send_stream) = send_request
477 /// .send_request(request, false).unwrap();
478 ///
479 /// // At this point, one option would be to wait for send capacity.
480 /// // Doing so would allow us to not hold data in memory that
481 /// // cannot be sent. However, this is not a requirement, so this
482 /// // example will skip that step. See `SendStream` documentation
483 /// // for more details.
484 /// send_stream.send_data(b"hello", false).unwrap();
485 /// send_stream.send_data(b"world", false).unwrap();
486 ///
487 /// // Send the trailers.
488 /// let mut trailers = HeaderMap::new();
489 /// trailers.insert(
490 /// header::HeaderName::from_bytes(b"my-trailer").unwrap(),
491 /// header::HeaderValue::from_bytes(b"hello").unwrap());
492 ///
493 /// send_stream.send_trailers(trailers).unwrap();
494 ///
495 /// let response = response.await.unwrap();
496 /// // Process the response
497 /// # }
498 /// # pub fn main() {}
499 /// ```
500 ///
501 /// [`ResponseFuture`]: struct.ResponseFuture.html
502 /// [`SendStream`]: ../struct.SendStream.html
503 /// [`SendStream::send_data`]: ../struct.SendStream.html#method.send_data
504 /// [`SendStream::send_trailers`]: ../struct.SendStream.html#method.send_trailers
    pub fn send_request(
        &mut self,
        request: Request<()>,
        end_of_stream: bool,
    ) -> Result<(ResponseFuture, SendStream<B>), crate::Error> {
        self.inner
            .send_request(request, end_of_stream, self.pending.as_ref())
            .map_err(Into::into)
            .map(|(stream, is_full)| {
                if stream.is_pending_open() && is_full {
                    // Only prevent sending another request when the request queue
                    // is full.
                    self.pending = Some(stream.clone_to_opaque());
                }

                let response = ResponseFuture {
                    inner: stream.clone_to_opaque(),
                    push_promise_consumed: false,
                };

                let stream = SendStream::new(stream);

                (response, stream)
            })
    }
530
531 /// Returns whether the [extended CONNECT protocol][1] is enabled or not.
532 ///
533 /// This setting is configured by the server peer by sending the
534 /// [`SETTINGS_ENABLE_CONNECT_PROTOCOL` parameter][2] in a `SETTINGS` frame.
535 /// This method returns the currently acknowledged value received from the
536 /// remote.
537 ///
538 /// [1]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
539 /// [2]: https://datatracker.ietf.org/doc/html/rfc8441#section-3
540 pub fn is_extended_connect_protocol_enabled(&self) -> bool {
541 self.inner.is_extended_connect_protocol_enabled()
542 }
543}
544
impl<B> fmt::Debug for SendRequest<B>
where
    B: Buf,
{
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt.debug_struct("SendRequest").finish()
    }
}
553
554impl<B> Clone for SendRequest<B>
555where
556 B: Buf,
557{
558 fn clone(&self) -> Self {
559 SendRequest {
560 inner: self.inner.clone(),
561 pending: None,
562 }
563 }
564}
565
566#[cfg(feature = "unstable")]
567impl<B> SendRequest<B>
568where
569 B: Buf,
570{
571 /// Returns the number of active streams.
572 ///
573 /// An active stream is a stream that has not yet transitioned to a closed
574 /// state.
575 pub fn num_active_streams(&self) -> usize {
576 self.inner.num_active_streams()
577 }
578
579 /// Returns the number of streams that are held in memory.
580 ///
581 /// A wired stream is a stream that is either active or is closed but must
582 /// stay in memory for some reason. For example, there are still outstanding
583 /// userspace handles pointing to the slot.
584 pub fn num_wired_streams(&self) -> usize {
585 self.inner.num_wired_streams()
586 }
587}
588
589// ===== impl ReadySendRequest =====
590
impl<B> Future for ReadySendRequest<B>
where
    B: Buf,
{
    type Output = Result<SendRequest<B>, crate::Error>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match &mut self.inner {
            Some(send_request) => {
                ready!(send_request.poll_ready(cx))?;
            }
            None => panic!("called `poll` after future completed"),
        }

        Poll::Ready(Ok(self.inner.take().unwrap()))
    }
}
608
609// ===== impl Builder =====
610
611impl Builder {
612 /// Returns a new client builder instance initialized with default
613 /// configuration values.
614 ///
615 /// Configuration methods can be chained on the return value.
616 ///
617 /// # Examples
618 ///
619 /// ```
620 /// # use tokio::io::{AsyncRead, AsyncWrite};
621 /// # use h2::client::*;
622 /// # use bytes::Bytes;
623 /// #
624 /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
625 /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
626 /// # {
627 /// // `client_fut` is a future representing the completion of the HTTP/2
628 /// // handshake.
629 /// let client_fut = Builder::new()
630 /// .initial_window_size(1_000_000)
631 /// .max_concurrent_streams(1000)
632 /// .handshake(my_io);
633 /// # client_fut.await
634 /// # }
635 /// #
636 /// # pub fn main() {}
637 /// ```
638 pub fn new() -> Builder {
639 Builder {
640 max_send_buffer_size: proto::DEFAULT_MAX_SEND_BUFFER_SIZE,
641 reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS),
642 reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX,
643 pending_accept_reset_stream_max: proto::DEFAULT_REMOTE_RESET_STREAM_MAX,
644 initial_target_connection_window_size: None,
645 initial_max_send_streams: usize::MAX,
646 settings: Default::default(),
647 stream_id: 1.into(),
648 }
649 }
650
651 /// Indicates the initial window size (in octets) for stream-level
652 /// flow control for received data.
653 ///
654 /// The initial window of a stream is used as part of flow control. For more
655 /// details, see [`FlowControl`].
656 ///
657 /// The default value is 65,535.
658 ///
659 /// [`FlowControl`]: ../struct.FlowControl.html
660 ///
661 /// # Examples
662 ///
663 /// ```
664 /// # use tokio::io::{AsyncRead, AsyncWrite};
665 /// # use h2::client::*;
666 /// # use bytes::Bytes;
667 /// #
668 /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
669 /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
670 /// # {
671 /// // `client_fut` is a future representing the completion of the HTTP/2
672 /// // handshake.
673 /// let client_fut = Builder::new()
674 /// .initial_window_size(1_000_000)
675 /// .handshake(my_io);
676 /// # client_fut.await
677 /// # }
678 /// #
679 /// # pub fn main() {}
680 /// ```
681 pub fn initial_window_size(&mut self, size: u32) -> &mut Self {
682 self.settings.set_initial_window_size(Some(size));
683 self
684 }
685
686 /// Indicates the initial window size (in octets) for connection-level flow control
687 /// for received data.
688 ///
689 /// The initial window of a connection is used as part of flow control. For more details,
690 /// see [`FlowControl`].
691 ///
692 /// The default value is 65,535.
693 ///
694 /// [`FlowControl`]: ../struct.FlowControl.html
695 ///
696 /// # Examples
697 ///
698 /// ```
699 /// # use tokio::io::{AsyncRead, AsyncWrite};
700 /// # use h2::client::*;
701 /// # use bytes::Bytes;
702 /// #
703 /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
704 /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
705 /// # {
706 /// // `client_fut` is a future representing the completion of the HTTP/2
707 /// // handshake.
708 /// let client_fut = Builder::new()
709 /// .initial_connection_window_size(1_000_000)
710 /// .handshake(my_io);
711 /// # client_fut.await
712 /// # }
713 /// #
714 /// # pub fn main() {}
715 /// ```
716 pub fn initial_connection_window_size(&mut self, size: u32) -> &mut Self {
717 self.initial_target_connection_window_size = Some(size);
718 self
719 }
720
721 /// Indicates the size (in octets) of the largest HTTP/2 frame payload that the
722 /// configured client is able to accept.
723 ///
724 /// The sender may send data frames that are **smaller** than this value,
725 /// but any data larger than `max` will be broken up into multiple `DATA`
726 /// frames.
727 ///
728 /// The value **must** be between 16,384 and 16,777,215. The default value is 16,384.
729 ///
730 /// # Examples
731 ///
732 /// ```
733 /// # use tokio::io::{AsyncRead, AsyncWrite};
734 /// # use h2::client::*;
735 /// # use bytes::Bytes;
736 /// #
737 /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
738 /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
739 /// # {
740 /// // `client_fut` is a future representing the completion of the HTTP/2
741 /// // handshake.
742 /// let client_fut = Builder::new()
743 /// .max_frame_size(1_000_000)
744 /// .handshake(my_io);
745 /// # client_fut.await
746 /// # }
747 /// #
748 /// # pub fn main() {}
749 /// ```
750 ///
751 /// # Panics
752 ///
753 /// This function panics if `max` is not within the legal range specified
754 /// above.
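    ///
    /// Because an out-of-range value panics, a caller that reads the frame
    /// size from configuration may want to check it first. The following is a
    /// minimal sketch that uses only the bounds stated above;
    /// `is_legal_max_frame_size` is a hypothetical helper for illustration and
    /// is not part of this crate.
    ///
    /// ```rust
    /// // Hypothetical helper (not an `h2` API): returns `true` if `max` lies
    /// // within the legal range of 16,384 to 16,777,215 octets.
    /// fn is_legal_max_frame_size(max: u32) -> bool {
    ///     (16_384..=16_777_215).contains(&max)
    /// }
    /// ```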
755 pub fn max_frame_size(&mut self, max: u32) -> &mut Self {
756 self.settings.set_max_frame_size(Some(max));
757 self
758 }
759
760 /// Sets the max size of received header frames.
761 ///
762 /// This advisory setting informs a peer of the maximum size of header list
763 /// that the sender is prepared to accept, in octets. The value is based on
764 /// the uncompressed size of header fields, including the length of the name
765 /// and value in octets plus an overhead of 32 octets for each header field.
766 ///
767 /// This setting is also used to limit the maximum amount of data that is
768 /// buffered to decode HEADERS frames.
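    ///
    /// As a rough illustration of the size calculation described above, the
    /// sketch below sums the name length, the value length, and the 32-octet
    /// overhead for each field. `header_list_size` is a hypothetical helper,
    /// not an `h2` API, and it takes plain string pairs for simplicity.
    ///
    /// ```rust
    /// // Hypothetical helper (not part of `h2`): computes the uncompressed
    /// // size of a header list, counting 32 octets of overhead per field.
    /// fn header_list_size(fields: &[(&str, &str)]) -> usize {
    ///     fields
    ///         .iter()
    ///         .map(|(name, value)| name.len() + value.len() + 32)
    ///         .sum()
    /// }
    /// ```
    ///
    /// Under this calculation, a single `content-type: text/plain` field
    /// counts as 12 + 10 + 32 = 54 octets.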
769 ///
770 /// # Examples
771 ///
772 /// ```
773 /// # use tokio::io::{AsyncRead, AsyncWrite};
774 /// # use h2::client::*;
775 /// # use bytes::Bytes;
776 /// #
777 /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
778 /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
779 /// # {
780 /// // `client_fut` is a future representing the completion of the HTTP/2
781 /// // handshake.
782 /// let client_fut = Builder::new()
783 /// .max_header_list_size(16 * 1024)
784 /// .handshake(my_io);
785 /// # client_fut.await
786 /// # }
787 /// #
788 /// # pub fn main() {}
789 /// ```
790 pub fn max_header_list_size(&mut self, max: u32) -> &mut Self {
791 self.settings.set_max_header_list_size(Some(max));
792 self
793 }
794
795 /// Sets the maximum number of concurrent streams.
796 ///
797 /// The maximum concurrent streams setting only controls the maximum number
798 /// of streams that can be initiated by the remote peer. In other words,
799 /// when this setting is set to 100, this does not limit the number of
800 /// concurrent streams that can be created by the caller.
801 ///
802 /// It is recommended that this value be no smaller than 100, so as to not
803 /// unnecessarily limit parallelism. However, any value is legal, including
804 /// 0. If `max` is set to 0, then the remote will not be permitted to
805 /// initiate streams.
806 ///
807 /// Note that streams in the reserved state, i.e., push promises that have
808 /// been reserved but the stream has not started, do not count against this
809 /// setting.
810 ///
811 /// Also note that if the remote *does* exceed the value set here, it is not
812 /// a protocol level error. Instead, the `h2` library will immediately reset
813 /// the stream.
814 ///
815 /// See [Section 5.1.2] in the HTTP/2 spec for more details.
816 ///
817 /// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2
818 ///
819 /// # Examples
820 ///
821 /// ```
822 /// # use tokio::io::{AsyncRead, AsyncWrite};
823 /// # use h2::client::*;
824 /// # use bytes::Bytes;
825 /// #
826 /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
827 /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
828 /// # {
829 /// // `client_fut` is a future representing the completion of the HTTP/2
830 /// // handshake.
831 /// let client_fut = Builder::new()
832 /// .max_concurrent_streams(1000)
833 /// .handshake(my_io);
834 /// # client_fut.await
835 /// # }
836 /// #
837 /// # pub fn main() {}
838 /// ```
    pub fn max_concurrent_streams(&mut self, max: u32) -> &mut Self {
        self.settings.set_max_concurrent_streams(Some(max));
        self
    }

    /// Sets the initial maximum of locally initiated (send) streams.
    ///
    /// The initial settings will be overwritten by the remote peer when
    /// the Settings frame is received. The new value will be set to the
    /// `max_concurrent_streams()` from the frame.
    ///
    /// This setting prevents the caller from exceeding this number of
    /// streams that are counted towards the concurrency limit.
    ///
    /// Sending streams past the limit returned by the peer will be treated
    /// as a stream error of type PROTOCOL_ERROR or REFUSED_STREAM.
    ///
    /// See [Section 5.1.2] in the HTTP/2 spec for more details.
    ///
    /// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2
    ///
    /// # Examples
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use h2::client::*;
    /// # use bytes::Bytes;
    /// #
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
    /// # {
    /// // `client_fut` is a future representing the completion of the HTTP/2
    /// // handshake.
    /// let client_fut = Builder::new()
    ///     .initial_max_send_streams(1000)
    ///     .handshake(my_io);
    /// # client_fut.await
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```
    pub fn initial_max_send_streams(&mut self, initial: usize) -> &mut Self {
        self.initial_max_send_streams = initial;
        self
    }

    /// Sets the maximum number of concurrent locally reset streams.
    ///
    /// When a stream is explicitly reset, the HTTP/2 specification requires
    /// that any further frames received for that stream must be ignored for
    /// "some time".
    ///
    /// In order to satisfy the specification, internal state must be maintained
    /// to implement the behavior. This state grows linearly with the number of
    /// streams that are locally reset.
    ///
    /// The `max_concurrent_reset_streams` setting sets an upper
    /// bound on the amount of state that is maintained. When this max value is
    /// reached, the oldest reset stream is purged from memory.
    ///
    /// Once the stream has been fully purged from memory, any additional frames
    /// received for that stream will result in a connection level protocol
    /// error, forcing the connection to terminate.
    ///
    /// The default value is 10.
    ///
    /// # Examples
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use h2::client::*;
    /// # use bytes::Bytes;
    /// #
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
    /// # {
    /// // `client_fut` is a future representing the completion of the HTTP/2
    /// // handshake.
    /// let client_fut = Builder::new()
    ///     .max_concurrent_reset_streams(1000)
    ///     .handshake(my_io);
    /// # client_fut.await
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```
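    ///
    /// The "purge the oldest entry once the bound is reached" rule described
    /// above can be pictured with a small, self-contained sketch. It is not
    /// how this crate stores its state internally: `ResetStreams`, `remember`
    /// and `is_remembered` are hypothetical names used only for illustration,
    /// and a plain FIFO queue is assumed.
    ///
    /// ```rust
    /// use std::collections::VecDeque;
    ///
    /// // Hypothetical bounded record of locally reset stream IDs.
    /// struct ResetStreams {
    ///     max: usize,
    ///     ids: VecDeque<u32>,
    /// }
    ///
    /// impl ResetStreams {
    ///     fn new(max: usize) -> Self {
    ///         ResetStreams { max, ids: VecDeque::new() }
    ///     }
    ///
    ///     // Remembers `id`. If that pushes the record past `max` entries,
    ///     // the oldest entry is purged and returned.
    ///     fn remember(&mut self, id: u32) -> Option<u32> {
    ///         self.ids.push_back(id);
    ///         if self.ids.len() > self.max {
    ///             self.ids.pop_front()
    ///         } else {
    ///             None
    ///         }
    ///     }
    ///
    ///     // Frames for a remembered stream would still be ignored.
    ///     fn is_remembered(&self, id: u32) -> bool {
    ///         self.ids.contains(&id)
    ///     }
    /// }
    /// ```
    ///
    /// In this sketch, with `max` set to 2, remembering streams 1, 3 and 5 in
    /// that order purges stream 1 when stream 5 is added.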
    pub fn max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
        self.reset_stream_max = max;
        self
    }

    /// Sets the duration to remember locally reset streams.
    ///
    /// When a stream is explicitly reset, the HTTP/2 specification requires
    /// that any further frames received for that stream must be ignored for
    /// "some time".
    ///
    /// In order to satisfy the specification, internal state must be maintained
    /// to implement the behavior. This state grows linearly with the number of
    /// streams that are locally reset.
    ///
    /// The `reset_stream_duration` setting configures the max amount of time
    /// this state will be maintained in memory. Once the duration elapses, the
    /// stream state is purged from memory.
    ///
    /// Once the stream has been fully purged from memory, any additional frames
    /// received for that stream will result in a connection level protocol
    /// error, forcing the connection to terminate.
    ///
    /// The default value is 30 seconds.
    ///
    /// # Examples
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use h2::client::*;
    /// # use std::time::Duration;
    /// # use bytes::Bytes;
    /// #
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
    /// # {
    /// // `client_fut` is a future representing the completion of the HTTP/2
    /// // handshake.
    /// let client_fut = Builder::new()
    ///     .reset_stream_duration(Duration::from_secs(10))
    ///     .handshake(my_io);
    /// # client_fut.await
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```
    pub fn reset_stream_duration(&mut self, dur: Duration) -> &mut Self {
        self.reset_stream_duration = dur;
        self
    }

    /// Sets the maximum number of pending-accept remotely-reset streams.
    ///
    /// Streams that have been received by the peer, but not accepted by the
    /// user, can also receive a RST_STREAM. This is a legitimate pattern: one
    /// could send a request and then shortly after, realize it is not needed,
    /// sending a CANCEL.
    ///
    /// However, since those streams are now "closed", they don't count towards
    /// the max concurrent streams. So, they will sit in the accept queue,
    /// using memory.
    ///
    /// When the number of remotely-reset streams sitting in the pending-accept
    /// queue reaches this maximum value, a connection error with the code of
    /// `ENHANCE_YOUR_CALM` will be sent to the peer, and returned by the
    /// `Future`.
    ///
    /// The default value is currently 20, but could change.
    ///
    /// # Examples
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use h2::client::*;
    /// # use bytes::Bytes;
    /// #
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
    /// # {
    /// // `client_fut` is a future representing the completion of the HTTP/2
    /// // handshake.
    /// let client_fut = Builder::new()
    ///     .max_pending_accept_reset_streams(100)
    ///     .handshake(my_io);
    /// # client_fut.await
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```
    pub fn max_pending_accept_reset_streams(&mut self, max: usize) -> &mut Self {
        self.pending_accept_reset_stream_max = max;
        self
    }

    /// Sets the maximum send buffer size per stream.
    ///
    /// Once a stream has buffered up to (or over) the maximum, the stream's
    /// flow control will not "poll" additional capacity. Once bytes for the
    /// stream have been written to the connection, the send buffer capacity
    /// will be freed up again.
    ///
    /// The default is currently ~400KB, but may change.
    ///
    /// # Panics
    ///
    /// This function panics if `max` is larger than `u32::MAX`.
    pub fn max_send_buffer_size(&mut self, max: usize) -> &mut Self {
        assert!(max <= std::u32::MAX as usize);
        self.max_send_buffer_size = max;
        self
    }

    /// Enables or disables server push promises.
    ///
    /// This value is included in the initial SETTINGS handshake.
    /// Setting this value to false in the initial SETTINGS handshake
    /// guarantees that the remote server will never send a push promise.
    ///
    /// This setting can be changed during the life of a single HTTP/2
    /// connection by sending another settings frame updating the value.
    ///
    /// Default value: `true`.
    ///
    /// # Examples
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use h2::client::*;
    /// # use std::time::Duration;
    /// # use bytes::Bytes;
    /// #
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
    /// # {
    /// // `client_fut` is a future representing the completion of the HTTP/2
    /// // handshake.
    /// let client_fut = Builder::new()
    ///     .enable_push(false)
    ///     .handshake(my_io);
    /// # client_fut.await
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```
    pub fn enable_push(&mut self, enabled: bool) -> &mut Self {
        self.settings.set_enable_push(enabled);
        self
    }

    /// Sets the header table size.
    ///
    /// This setting informs the peer of the maximum size of the header compression
    /// table used to encode header blocks, in octets. The encoder may select any value
    /// equal to or less than the header table size specified by the sender.
    ///
    /// The default value is 4,096.
    ///
    /// # Examples
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use h2::client::*;
    /// # use bytes::Bytes;
    /// #
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
    /// # {
    /// // `client_fut` is a future representing the completion of the HTTP/2
    /// // handshake.
    /// let client_fut = Builder::new()
    ///     .header_table_size(1_000_000)
    ///     .handshake(my_io);
    /// # client_fut.await
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```
    pub fn header_table_size(&mut self, size: u32) -> &mut Self {
        self.settings.set_header_table_size(Some(size));
        self
    }

    /// Sets the first stream ID to something other than 1.
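    ///
    /// Client-initiated stream IDs must be odd, and this method panics if
    /// `stream_id` is not. As a rough, self-contained illustration of that
    /// parity rule (the free function `is_client_initiated` below is a
    /// hypothetical helper, not this crate's internal check):
    ///
    /// ```rust
    /// // Hypothetical helper: a client-initiated stream ID is odd.
    /// // Zero is even, so it is rejected as well.
    /// fn is_client_initiated(stream_id: u32) -> bool {
    ///     stream_id % 2 == 1
    /// }
    /// ```
    ///
    /// Under this sketch, IDs such as 1, 3 or 101 would be accepted, while 0
    /// and 2 would not.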
    #[cfg(feature = "unstable")]
    pub fn initial_stream_id(&mut self, stream_id: u32) -> &mut Self {
        self.stream_id = stream_id.into();
        assert!(
            self.stream_id.is_client_initiated(),
            "stream id must be odd"
        );
        self
    }

    /// Creates a new configured HTTP/2 client backed by `io`.
    ///
    /// It is expected that `io` already be in an appropriate state to commence
    /// the [HTTP/2 handshake]. The handshake is completed once both the connection
    /// preface and the initial settings frame are sent by the client.
    ///
    /// The handshake future does not wait for the initial settings frame from the
    /// server.
    ///
    /// Returns a future which resolves to the [`Connection`] / [`SendRequest`]
    /// tuple once the HTTP/2 handshake has been completed.
    ///
    /// This function also allows the caller to configure the send payload data
    /// type. See [Outbound data type] for more details.
    ///
    /// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
    /// [`Connection`]: struct.Connection.html
    /// [`SendRequest`]: struct.SendRequest.html
    /// [Outbound data type]: ../index.html#outbound-data-type
    ///
    /// # Examples
    ///
    /// Basic usage:
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use h2::client::*;
    /// # use bytes::Bytes;
    /// #
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
    /// # {
    /// // `client_fut` is a future representing the completion of the HTTP/2
    /// // handshake.
    /// let client_fut = Builder::new()
    ///     .handshake(my_io);
    /// # client_fut.await
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```
    ///
    /// Configures the send-payload data type. In this case, the outbound data
    /// type will be `&'static [u8]`.
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use h2::client::*;
    /// #
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Result<((SendRequest<&'static [u8]>, Connection<T, &'static [u8]>)), h2::Error>
    /// # {
    /// // `client_fut` is a future representing the completion of the HTTP/2
    /// // handshake.
    /// let client_fut = Builder::new()
    ///     .handshake::<_, &'static [u8]>(my_io);
    /// # client_fut.await
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```
    pub fn handshake<T, B>(
        &self,
        io: T,
    ) -> impl Future<Output = Result<(SendRequest<B>, Connection<T, B>), crate::Error>>
    where
        T: AsyncRead + AsyncWrite + Unpin,
        B: Buf,
    {
        Connection::handshake2(io, self.clone())
    }
}

impl Default for Builder {
    fn default() -> Builder {
        Builder::new()
    }
}

/// Creates a new configured HTTP/2 client with default configuration
/// values backed by `io`.
///
/// It is expected that `io` already be in an appropriate state to commence
/// the [HTTP/2 handshake]. See [Handshake] for more details.
///
/// Returns a future which resolves to the [`Connection`] / [`SendRequest`]
/// tuple once the HTTP/2 handshake has been completed. The returned
/// [`Connection`] instance will be using default configuration values. Use
/// [`Builder`] to customize the configuration values used by a [`Connection`]
/// instance.
///
/// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
/// [Handshake]: ../index.html#handshake
/// [`Connection`]: struct.Connection.html
/// [`SendRequest`]: struct.SendRequest.html
///
/// # Examples
///
/// ```
/// # use tokio::io::{AsyncRead, AsyncWrite};
/// # use h2::client;
/// # use h2::client::*;
/// #
/// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) -> Result<(), h2::Error>
/// # {
/// let (send_request, connection) = client::handshake(my_io).await?;
/// // The HTTP/2 handshake has completed, now start polling
/// // `connection` and use `send_request` to send requests to the
/// // server.
/// # Ok(())
/// # }
/// #
/// # pub fn main() {}
/// ```
pub async fn handshake<T>(io: T) -> Result<(SendRequest<Bytes>, Connection<T, Bytes>), crate::Error>
where
    T: AsyncRead + AsyncWrite + Unpin,
{
    let builder: Builder = Builder::new();
    builder
        .handshake(io)
        .instrument(tracing::trace_span!("client_handshake"))
        .await
}

// ===== impl Connection =====

async fn bind_connection<T>(io: &mut T) -> Result<(), crate::Error>
where
    T: AsyncRead + AsyncWrite + Unpin,
{
    tracing::debug!("binding client connection");

    // Send the HTTP/2 client connection preface.
    let msg: &'static [u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
    io.write_all(msg).await.map_err(crate::Error::from_io)?;

    tracing::debug!("client connection bound");

    Ok(())
}

impl<T, B> Connection<T, B>
where
    T: AsyncRead + AsyncWrite + Unpin,
    B: Buf,
{
    async fn handshake2(
        mut io: T,
        builder: Builder,
    ) -> Result<(SendRequest<B>, Connection<T, B>), crate::Error> {
        bind_connection(&mut io).await?;

        // Create the codec
        let mut codec = Codec::new(io);

        if let Some(max) = builder.settings.max_frame_size() {
            codec.set_max_recv_frame_size(max as usize);
        }

        if let Some(max) = builder.settings.max_header_list_size() {
            codec.set_max_recv_header_list_size(max as usize);
        }

        // Send initial settings frame
        codec
            .buffer(builder.settings.clone().into())
            .expect("invalid SETTINGS frame");

        let inner = proto::Connection::new(
            codec,
            proto::Config {
                next_stream_id: builder.stream_id,
                initial_max_send_streams: builder.initial_max_send_streams,
                max_send_buffer_size: builder.max_send_buffer_size,
                reset_stream_duration: builder.reset_stream_duration,
                reset_stream_max: builder.reset_stream_max,
                remote_reset_stream_max: builder.pending_accept_reset_stream_max,
                settings: builder.settings.clone(),
            },
        );
        let send_request = SendRequest {
            inner: inner.streams().clone(),
            pending: None,
        };

        let mut connection = Connection { inner };
        if let Some(sz) = builder.initial_target_connection_window_size {
            connection.set_target_window_size(sz);
        }

        Ok((send_request, connection))
    }

    /// Sets the target window size for the whole connection.
    ///
    /// If `size` is greater than the current value, then a `WINDOW_UPDATE`
    /// frame will be immediately sent to the remote, increasing the connection
    /// level window by `size - current_value`.
    ///
    /// If `size` is less than the current value, nothing will happen
    /// immediately. However, as window capacity is released by
    /// [`FlowControl`] instances, no `WINDOW_UPDATE` frames will be sent
    /// out until the number of "in flight" bytes drops below `size`.
    ///
    /// The default value is 65,535.
    ///
    /// See [`FlowControl`] documentation for more details.
    ///
    /// [`FlowControl`]: ../struct.FlowControl.html
    /// [library level]: ../index.html#flow-control
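    ///
    /// The arithmetic in the two cases above can be sketched in isolation.
    /// This is only an illustration of the documented rule, not the
    /// connection's actual flow-control code, and `window_update_increment`
    /// is a hypothetical helper name.
    ///
    /// ```rust
    /// // Hypothetical helper: returns the `WINDOW_UPDATE` increment that
    /// // would be sent immediately, or `None` when the target is not
    /// // larger than the current window and nothing is sent right away.
    /// fn window_update_increment(current: u32, target: u32) -> Option<u32> {
    ///     if target > current {
    ///         Some(target - current)
    ///     } else {
    ///         None
    ///     }
    /// }
    /// ```
    ///
    /// Starting from the default of 65,535, a target of 1,048,576 would give
    /// an increment of 983,041 in this sketch, while a target of 1,024 would
    /// give no immediate update.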
    pub fn set_target_window_size(&mut self, size: u32) {
        assert!(size <= proto::MAX_WINDOW_SIZE);
        self.inner.set_target_window_size(size);
    }

    /// Set a new `INITIAL_WINDOW_SIZE` setting (in octets) for stream-level
    /// flow control for received data.
    ///
    /// The `SETTINGS` will be sent to the remote, and only applied once the
    /// remote acknowledges the change.
    ///
    /// This can be used to increase or decrease the window size for existing
    /// streams.
    ///
    /// # Errors
    ///
    /// Returns an error if a previous call is still pending acknowledgement
    /// from the remote endpoint.
    pub fn set_initial_window_size(&mut self, size: u32) -> Result<(), crate::Error> {
        assert!(size <= proto::MAX_WINDOW_SIZE);
        self.inner.set_initial_window_size(size)?;
        Ok(())
    }

    /// Takes a `PingPong` instance from the connection.
    ///
    /// # Note
    ///
    /// This may only be called once. Calling multiple times will return `None`.
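    ///
    /// The "only once" behavior is the usual `Option::take` pattern. A
    /// minimal, self-contained sketch of it, using the hypothetical stand-in
    /// types `Pings` and `Conn` rather than this crate's own types:
    ///
    /// ```rust
    /// // Hypothetical stand-in for the handle that is handed out once.
    /// struct Pings;
    ///
    /// // Hypothetical stand-in for the connection that owns the handle.
    /// struct Conn {
    ///     user_pings: Option<Pings>,
    /// }
    ///
    /// impl Conn {
    ///     // The first call moves the handle out; later calls see `None`.
    ///     fn ping_pong(&mut self) -> Option<Pings> {
    ///         self.user_pings.take()
    ///     }
    /// }
    /// ```
    ///
    /// Callers would therefore keep the returned handle around instead of
    /// asking the connection for it again.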
    pub fn ping_pong(&mut self) -> Option<PingPong> {
        self.inner.take_user_pings().map(PingPong::new)
    }

    /// Returns the maximum number of concurrent streams that may be initiated
    /// by this client.
    ///
    /// This limit is configured by the server peer by sending the
    /// [`SETTINGS_MAX_CONCURRENT_STREAMS` parameter][1] in a `SETTINGS` frame.
    /// This method returns the currently acknowledged value received from the
    /// remote.
    ///
    /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
    pub fn max_concurrent_send_streams(&self) -> usize {
        self.inner.max_send_streams()
    }

    /// Returns the maximum number of concurrent streams that may be initiated
    /// by the server on this connection.
    ///
    /// This returns the value of the [`SETTINGS_MAX_CONCURRENT_STREAMS`
    /// parameter][1] sent in a `SETTINGS` frame that has been
    /// acknowledged by the remote peer. The value to be sent is configured by
    /// the [`Builder::max_concurrent_streams`][2] method before handshaking
    /// with the remote peer.
    ///
    /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
    /// [2]: ../struct.Builder.html#method.max_concurrent_streams
    pub fn max_concurrent_recv_streams(&self) -> usize {
        self.inner.max_recv_streams()
    }
}

impl<T, B> Future for Connection<T, B>
where
    T: AsyncRead + AsyncWrite + Unpin,
    B: Buf,
{
    type Output = Result<(), crate::Error>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.inner.maybe_close_connection_if_no_streams();
        self.inner.poll(cx).map_err(Into::into)
    }
}

impl<T, B> fmt::Debug for Connection<T, B>
where
    T: AsyncRead + AsyncWrite,
    T: fmt::Debug,
    B: fmt::Debug + Buf,
{
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt::Debug::fmt(&self.inner, fmt)
    }
}

// ===== impl ResponseFuture =====

impl Future for ResponseFuture {
    type Output = Result<Response<RecvStream>, crate::Error>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let (parts, _) = ready!(self.inner.poll_response(cx))?.into_parts();
        let body: RecvStream = RecvStream::new(FlowControl::new(self.inner.clone()));

        Poll::Ready(Ok(Response::from_parts(parts, body)))
    }
}

impl ResponseFuture {
    /// Returns the stream ID of the response stream.
    ///
    /// # Panics
    ///
    /// If the lock on the stream store has been poisoned.
    pub fn stream_id(&self) -> crate::StreamId {
        crate::StreamId::from_internal(self.inner.stream_id())
    }

    /// Returns a stream of PushPromises.
    ///
    /// # Panics
    ///
    /// If this method has been called before
    /// or the stream was itself pushed.
    pub fn push_promises(&mut self) -> PushPromises {
        if self.push_promise_consumed {
            panic!("Reference to push promises stream taken!");
        }
        self.push_promise_consumed = true;
        PushPromises {
            inner: self.inner.clone(),
        }
    }
}

// ===== impl PushPromises =====

impl PushPromises {
    /// Get the next `PushPromise`.
    pub async fn push_promise(&mut self) -> Option<Result<PushPromise, crate::Error>> {
        futures_util::future::poll_fn(move |cx| self.poll_push_promise(cx)).await
    }

    #[doc(hidden)]
    pub fn poll_push_promise(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<PushPromise, crate::Error>>> {
        match self.inner.poll_pushed(cx) {
            Poll::Ready(Some(Ok((request, response)))) => {
                let response = PushedResponseFuture {
                    inner: ResponseFuture {
                        inner: response,
                        push_promise_consumed: false,
                    },
                };
                Poll::Ready(Some(Ok(PushPromise { request, response })))
            }
            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e.into()))),
            Poll::Ready(None) => Poll::Ready(None),
            Poll::Pending => Poll::Pending,
        }
    }
}

#[cfg(feature = "stream")]
impl futures_core::Stream for PushPromises {
    type Item = Result<PushPromise, crate::Error>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.poll_push_promise(cx)
    }
}

// ===== impl PushPromise =====

impl PushPromise {
    /// Returns a reference to the push promise's request headers.
    pub fn request(&self) -> &Request<()> {
        &self.request
    }

    /// Returns a mutable reference to the push promise's request headers.
    pub fn request_mut(&mut self) -> &mut Request<()> {
        &mut self.request
    }

    /// Consumes `self`, returning the push promise's request headers and
    /// response future.
    pub fn into_parts(self) -> (Request<()>, PushedResponseFuture) {
        (self.request, self.response)
    }
}

// ===== impl PushedResponseFuture =====

impl Future for PushedResponseFuture {
    type Output = Result<Response<RecvStream>, crate::Error>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        Pin::new(&mut self.inner).poll(cx)
    }
}

impl PushedResponseFuture {
    /// Returns the stream ID of the response stream.
    ///
    /// # Panics
    ///
    /// If the lock on the stream store has been poisoned.
    pub fn stream_id(&self) -> crate::StreamId {
        self.inner.stream_id()
    }
}

// ===== impl Peer =====

impl Peer {
    pub fn convert_send_message(
        id: StreamId,
        request: Request<()>,
        protocol: Option<Protocol>,
        end_of_stream: bool,
    ) -> Result<Headers, SendError> {
        use http::request::Parts;

        let (
            Parts {
                method,
                uri,
                headers,
                version,
                ..
            },
            _,
        ) = request.into_parts();

        let is_connect = method == Method::CONNECT;

        // Build the pseudo header set. All requests will include `method`
        // and `path`.
        let mut pseudo = Pseudo::request(method, uri, protocol);

        if pseudo.scheme.is_none() {
            // If the scheme is not set, then there are two options.
            //
            // 1) Authority is not set. In this case, a request was issued with
            //    a relative URI. This is permitted **only** when forwarding
            //    HTTP 1.x requests. If the HTTP version is set to 2.0, then
            //    this is an error.
            //
            // 2) Authority is set, then the HTTP method *must* be CONNECT.
            //
            // It is not possible to have a scheme but not an authority set (the
            // `http` crate does not allow it).
            //
            if pseudo.authority.is_none() {
                if version == Version::HTTP_2 {
                    return Err(UserError::MissingUriSchemeAndAuthority.into());
                } else {
                    // This is acceptable as per the above comment. However,
                    // HTTP/2 requires that a scheme is set. Since we are
                    // forwarding an HTTP 1.1 request, the scheme is set to
                    // "http".
                    pseudo.set_scheme(uri::Scheme::HTTP);
                }
            } else if !is_connect {
                // TODO: Error
            }
        }

        // Create the HEADERS frame
        let mut frame = Headers::new(id, pseudo, headers);

        if end_of_stream {
            frame.set_end_stream()
        }

        Ok(frame)
    }
}

impl proto::Peer for Peer {
    type Poll = Response<()>;

    const NAME: &'static str = "Client";

    fn r#dyn() -> proto::DynPeer {
        proto::DynPeer::Client
    }

    fn is_server() -> bool {
        false
    }

    fn convert_poll_message(
        pseudo: Pseudo,
        fields: HeaderMap,
        stream_id: StreamId,
    ) -> Result<Self::Poll, Error> {
        let mut b = Response::builder();

        b = b.version(Version::HTTP_2);

        if let Some(status) = pseudo.status {
            b = b.status(status);
        }

        let mut response = match b.body(()) {
            Ok(response) => response,
            Err(_) => {
                // TODO: Should there be more specialized handling for different
                // kinds of errors
                return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR));
            }
        };

        *response.headers_mut() = fields;

        Ok(response)
    }
}