1 | //! Client implementation of the HTTP/2 protocol. |
2 | //! |
3 | //! # Getting started |
4 | //! |
5 | //! Running an HTTP/2 client requires the caller to establish the underlying |
6 | //! connection as well as get the connection to a state that is ready to begin |
7 | //! the HTTP/2 handshake. See [here](../index.html#handshake) for more |
8 | //! details. |
9 | //! |
10 | //! This could be as basic as using Tokio's [`TcpStream`] to connect to a remote |
11 | //! host, but usually it means using either ALPN or HTTP/1.1 protocol upgrades. |
12 | //! |
13 | //! Once a connection is obtained, it is passed to [`handshake`], which will |
14 | //! begin the [HTTP/2 handshake]. This returns a future that completes once |
15 | //! the handshake process is performed and HTTP/2 streams may be initialized. |
16 | //! |
17 | //! [`handshake`] uses default configuration values. There are a number of |
18 | //! settings that can be changed by using [`Builder`] instead. |
19 | //! |
20 | //! Once the handshake future completes, the caller is provided with a |
21 | //! [`Connection`] instance and a [`SendRequest`] instance. The [`Connection`] |
22 | //! instance is used to drive the connection (see [Managing the connection]). |
23 | //! The [`SendRequest`] instance is used to initialize new streams (see [Making |
24 | //! requests]). |
25 | //! |
26 | //! # Making requests |
27 | //! |
28 | //! Requests are made using the [`SendRequest`] handle provided by the handshake |
29 | //! future. Once a request is submitted, an HTTP/2 stream is initialized and |
30 | //! the request is sent to the server. |
31 | //! |
//! A request body and request trailers are sent using [`SendStream`], and the
33 | //! server's response is returned once the [`ResponseFuture`] future completes. |
34 | //! Both the [`SendStream`] and [`ResponseFuture`] instances are returned by |
35 | //! [`SendRequest::send_request`] and are tied to the HTTP/2 stream |
36 | //! initialized by the sent request. |
37 | //! |
38 | //! The [`SendRequest::poll_ready`] function returns `Ready` when a new HTTP/2 |
39 | //! stream can be created, i.e. as long as the current number of active streams |
40 | //! is below [`MAX_CONCURRENT_STREAMS`]. If a new stream cannot be created, the |
41 | //! caller will be notified once an existing stream closes, freeing capacity for |
42 | //! the caller. The caller should use [`SendRequest::poll_ready`] to check for |
43 | //! capacity before sending a request to the server. |
44 | //! |
45 | //! [`SendRequest`] enforces the [`MAX_CONCURRENT_STREAMS`] setting. The user |
46 | //! must not send a request if `poll_ready` does not return `Ready`. Attempting |
47 | //! to do so will result in an [`Error`] being returned. |
48 | //! |
49 | //! # Managing the connection |
50 | //! |
51 | //! The [`Connection`] instance is used to manage connection state. The caller |
52 | //! is required to call [`Connection::poll`] in order to advance state. |
53 | //! [`SendRequest::send_request`] and other functions have no effect unless |
54 | //! [`Connection::poll`] is called. |
55 | //! |
56 | //! The [`Connection`] instance should only be dropped once [`Connection::poll`] |
57 | //! returns `Ready`. At this point, the underlying socket has been closed and no |
58 | //! further work needs to be done. |
59 | //! |
60 | //! The easiest way to ensure that the [`Connection`] instance gets polled is to |
61 | //! submit the [`Connection`] instance to an [executor]. The executor will then |
62 | //! manage polling the connection until the connection is complete. |
63 | //! Alternatively, the caller can call `poll` manually. |
64 | //! |
65 | //! # Example |
66 | //! |
67 | //! ```rust, no_run |
68 | //! |
69 | //! use h2::client; |
70 | //! |
71 | //! use http::{Request, Method}; |
72 | //! use std::error::Error; |
73 | //! use tokio::net::TcpStream; |
74 | //! |
75 | //! #[tokio::main] |
76 | //! pub async fn main() -> Result<(), Box<dyn Error>> { |
77 | //! // Establish TCP connection to the server. |
//! let tcp = TcpStream::connect("127.0.0.1:5928").await?;
79 | //! let (h2, connection) = client::handshake(tcp).await?; |
80 | //! tokio::spawn(async move { |
81 | //! connection.await.unwrap(); |
82 | //! }); |
83 | //! |
84 | //! let mut h2 = h2.ready().await?; |
85 | //! // Prepare the HTTP request to send to the server. |
86 | //! let request = Request::builder() |
87 | //! .method(Method::GET) |
//! .uri("https://www.example.com/")
89 | //! .body(()) |
90 | //! .unwrap(); |
91 | //! |
//! // Send the request. The second tuple item allows the caller
//! // to stream a request body; it is dropped here since this
//! // request has no body.
94 | //! let (response, _) = h2.send_request(request, true).unwrap(); |
95 | //! |
96 | //! let (head, mut body) = response.await?.into_parts(); |
97 | //! |
//! println!("Received response: {:?}", head);
99 | //! |
100 | //! // The `flow_control` handle allows the caller to manage |
101 | //! // flow control. |
102 | //! // |
103 | //! // Whenever data is received, the caller is responsible for |
104 | //! // releasing capacity back to the server once it has freed |
105 | //! // the data from memory. |
106 | //! let mut flow_control = body.flow_control().clone(); |
107 | //! |
108 | //! while let Some(chunk) = body.data().await { |
109 | //! let chunk = chunk?; |
//! println!("RX: {:?}", chunk);
111 | //! |
112 | //! // Let the server send more data. |
113 | //! let _ = flow_control.release_capacity(chunk.len()); |
114 | //! } |
115 | //! |
116 | //! Ok(()) |
117 | //! } |
118 | //! ``` |
119 | //! |
//! [`TcpStream`]: https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html
121 | //! [`handshake`]: fn.handshake.html |
122 | //! [executor]: https://docs.rs/futures/0.1/futures/future/trait.Executor.html |
123 | //! [`SendRequest`]: struct.SendRequest.html |
124 | //! [`SendStream`]: ../struct.SendStream.html |
125 | //! [Making requests]: #making-requests |
126 | //! [Managing the connection]: #managing-the-connection |
127 | //! [`Connection`]: struct.Connection.html |
128 | //! [`Connection::poll`]: struct.Connection.html#method.poll |
129 | //! [`SendRequest::send_request`]: struct.SendRequest.html#method.send_request |
130 | //! [`MAX_CONCURRENT_STREAMS`]: http://httpwg.org/specs/rfc7540.html#SettingValues |
132 | //! [`ResponseFuture`]: struct.ResponseFuture.html |
133 | //! [`SendRequest::poll_ready`]: struct.SendRequest.html#method.poll_ready |
134 | //! [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader |
135 | //! [`Builder`]: struct.Builder.html |
136 | //! [`Error`]: ../struct.Error.html |
137 | |
138 | use crate::codec::{Codec, SendError, UserError}; |
139 | use crate::ext::Protocol; |
140 | use crate::frame::{Headers, Pseudo, Reason, Settings, StreamId}; |
141 | use crate::proto::{self, Error}; |
142 | use crate::{FlowControl, PingPong, RecvStream, SendStream}; |
143 | |
144 | use bytes::{Buf, Bytes}; |
145 | use http::{uri, HeaderMap, Method, Request, Response, Version}; |
146 | use std::fmt; |
147 | use std::future::Future; |
148 | use std::pin::Pin; |
149 | use std::task::{Context, Poll}; |
150 | use std::time::Duration; |
151 | use std::usize; |
152 | use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; |
153 | use tracing::Instrument; |
154 | |
155 | /// Initializes new HTTP/2 streams on a connection by sending a request. |
156 | /// |
157 | /// This type does no work itself. Instead, it is a handle to the inner |
158 | /// connection state held by [`Connection`]. If the associated connection |
159 | /// instance is dropped, all `SendRequest` functions will return [`Error`]. |
160 | /// |
/// [`SendRequest`] instances can be moved to, and used from, tasks or threads
/// other than the one driving their associated [`Connection`]. Internally, there
163 | /// is a buffer used to stage requests before they get written to the |
164 | /// connection. There is no guarantee that requests get written to the |
165 | /// connection in FIFO order as HTTP/2 prioritization logic can play a role. |
166 | /// |
167 | /// [`SendRequest`] implements [`Clone`], enabling the creation of many |
168 | /// instances that are backed by a single connection. |
169 | /// |
170 | /// See [module] level documentation for more details. |
171 | /// |
172 | /// [module]: index.html |
173 | /// [`Connection`]: struct.Connection.html |
174 | /// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html |
175 | /// [`Error`]: ../struct.Error.html |
176 | pub struct SendRequest<B: Buf> { |
177 | inner: proto::Streams<B, Peer>, |
178 | pending: Option<proto::OpaqueStreamRef>, |
179 | } |
180 | |
/// A future that resolves to a `SendRequest` once it is ready to send at
/// least one request.
#[derive(Debug)]
184 | pub struct ReadySendRequest<B: Buf> { |
185 | inner: Option<SendRequest<B>>, |
186 | } |
187 | |
188 | /// Manages all state associated with an HTTP/2 client connection. |
189 | /// |
190 | /// A `Connection` is backed by an I/O resource (usually a TCP socket) and |
191 | /// implements the HTTP/2 client logic for that connection. It is responsible |
192 | /// for driving the internal state forward, performing the work requested of the |
193 | /// associated handles ([`SendRequest`], [`ResponseFuture`], [`SendStream`], |
194 | /// [`RecvStream`]). |
195 | /// |
196 | /// `Connection` values are created by calling [`handshake`]. Once a |
197 | /// `Connection` value is obtained, the caller must repeatedly call [`poll`] |
198 | /// until `Ready` is returned. The easiest way to do this is to submit the |
199 | /// `Connection` instance to an [executor]. |
200 | /// |
201 | /// [module]: index.html |
202 | /// [`handshake`]: fn.handshake.html |
203 | /// [`SendRequest`]: struct.SendRequest.html |
204 | /// [`ResponseFuture`]: struct.ResponseFuture.html |
205 | /// [`SendStream`]: ../struct.SendStream.html |
206 | /// [`RecvStream`]: ../struct.RecvStream.html |
207 | /// [`poll`]: #method.poll |
208 | /// [executor]: https://docs.rs/futures/0.1/futures/future/trait.Executor.html |
209 | /// |
210 | /// # Examples |
211 | /// |
212 | /// ``` |
213 | /// # use tokio::io::{AsyncRead, AsyncWrite}; |
214 | /// # use h2::client; |
215 | /// # use h2::client::*; |
216 | /// # |
217 | /// # async fn doc<T>(my_io: T) -> Result<(), h2::Error> |
218 | /// # where T: AsyncRead + AsyncWrite + Send + Unpin + 'static, |
219 | /// # { |
220 | /// let (send_request, connection) = client::handshake(my_io).await?; |
221 | /// // Submit the connection handle to an executor. |
/// tokio::spawn(async { connection.await.expect("connection failed"); });
223 | /// |
224 | /// // Now, use `send_request` to initialize HTTP/2 streams. |
225 | /// // ... |
226 | /// # Ok(()) |
227 | /// # } |
228 | /// # |
229 | /// # pub fn main() {} |
230 | /// ``` |
#[must_use = "futures do nothing unless polled"]
232 | pub struct Connection<T, B: Buf = Bytes> { |
233 | inner: proto::Connection<T, Peer, B>, |
234 | } |
235 | |
236 | /// A future of an HTTP response. |
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
239 | pub struct ResponseFuture { |
240 | inner: proto::OpaqueStreamRef, |
241 | push_promise_consumed: bool, |
242 | } |
243 | |
244 | /// A future of a pushed HTTP response. |
245 | /// |
/// We have to differentiate between pushed and non-pushed responses because of the spec
247 | /// <https://httpwg.org/specs/rfc7540.html#PUSH_PROMISE> |
248 | /// > PUSH_PROMISE frames MUST only be sent on a peer-initiated stream |
249 | /// > that is in either the "open" or "half-closed (remote)" state. |
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
252 | pub struct PushedResponseFuture { |
253 | inner: ResponseFuture, |
254 | } |
255 | |
256 | /// A pushed response and corresponding request headers |
#[derive(Debug)]
258 | pub struct PushPromise { |
259 | /// The request headers |
260 | request: Request<()>, |
261 | |
262 | /// The pushed response |
263 | response: PushedResponseFuture, |
264 | } |
265 | |
266 | /// A stream of pushed responses and corresponding promised requests |
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
269 | pub struct PushPromises { |
270 | inner: proto::OpaqueStreamRef, |
271 | } |
272 | |
273 | /// Builds client connections with custom configuration values. |
274 | /// |
275 | /// Methods can be chained in order to set the configuration values. |
276 | /// |
277 | /// The client is constructed by calling [`handshake`] and passing the I/O |
/// handle that will back the HTTP/2 connection.
279 | /// |
280 | /// New instances of `Builder` are obtained via [`Builder::new`]. |
281 | /// |
282 | /// See function level documentation for details on the various client |
283 | /// configuration settings. |
284 | /// |
285 | /// [`Builder::new`]: struct.Builder.html#method.new |
286 | /// [`handshake`]: struct.Builder.html#method.handshake |
287 | /// |
288 | /// # Examples |
289 | /// |
290 | /// ``` |
291 | /// # use tokio::io::{AsyncRead, AsyncWrite}; |
292 | /// # use h2::client::*; |
293 | /// # use bytes::Bytes; |
294 | /// # |
295 | /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) |
296 | /// -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error> |
297 | /// # { |
298 | /// // `client_fut` is a future representing the completion of the HTTP/2 |
299 | /// // handshake. |
300 | /// let client_fut = Builder::new() |
301 | /// .initial_window_size(1_000_000) |
302 | /// .max_concurrent_streams(1000) |
303 | /// .handshake(my_io); |
304 | /// # client_fut.await |
305 | /// # } |
306 | /// # |
307 | /// # pub fn main() {} |
308 | /// ``` |
#[derive(Clone, Debug)]
310 | pub struct Builder { |
311 | /// Time to keep locally reset streams around before reaping. |
312 | reset_stream_duration: Duration, |
313 | |
314 | /// Initial maximum number of locally initiated (send) streams. |
315 | /// After receiving a Settings frame from the remote peer, |
316 | /// the connection will overwrite this value with the |
317 | /// MAX_CONCURRENT_STREAMS specified in the frame. |
318 | initial_max_send_streams: usize, |
319 | |
320 | /// Initial target window size for new connections. |
321 | initial_target_connection_window_size: Option<u32>, |
322 | |
323 | /// Maximum amount of bytes to "buffer" for writing per stream. |
324 | max_send_buffer_size: usize, |
325 | |
326 | /// Maximum number of locally reset streams to keep at a time. |
327 | reset_stream_max: usize, |
328 | |
329 | /// Maximum number of remotely reset streams to allow in the pending |
330 | /// accept queue. |
331 | pending_accept_reset_stream_max: usize, |
332 | |
333 | /// Initial `Settings` frame to send as part of the handshake. |
334 | settings: Settings, |
335 | |
336 | /// The stream ID of the first (lowest) stream. Subsequent streams will use |
337 | /// monotonically increasing stream IDs. |
338 | stream_id: StreamId, |
339 | |
340 | /// Maximum number of locally reset streams due to protocol error across |
341 | /// the lifetime of the connection. |
342 | /// |
343 | /// When this gets exceeded, we issue GOAWAYs. |
344 | local_max_error_reset_streams: Option<usize>, |
345 | } |
346 | |
#[derive(Debug)]
348 | pub(crate) struct Peer; |
349 | |
350 | // ===== impl SendRequest ===== |
351 | |
352 | impl<B> SendRequest<B> |
353 | where |
354 | B: Buf, |
355 | { |
356 | /// Returns `Ready` when the connection can initialize a new HTTP/2 |
357 | /// stream. |
358 | /// |
359 | /// This function must return `Ready` before `send_request` is called. When |
360 | /// `Poll::Pending` is returned, the task will be notified once the readiness |
361 | /// state changes. |
362 | /// |
363 | /// See [module] level docs for more details. |
364 | /// |
365 | /// [module]: index.html |
366 | pub fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>> { |
367 | ready!(self.inner.poll_pending_open(cx, self.pending.as_ref()))?; |
368 | self.pending = None; |
369 | Poll::Ready(Ok(())) |
370 | } |
371 | |
372 | /// Consumes `self`, returning a future that returns `self` back once it is |
373 | /// ready to send a request. |
374 | /// |
375 | /// This function should be called before calling `send_request`. |
376 | /// |
377 | /// This is a functional combinator for [`poll_ready`]. The returned future |
/// will call `SendRequest::poll_ready` until `Ready`, then return `self` to
379 | /// the caller. |
380 | /// |
381 | /// # Examples |
382 | /// |
383 | /// ```rust |
384 | /// # use h2::client::*; |
385 | /// # use http::*; |
386 | /// # async fn doc(send_request: SendRequest<&'static [u8]>) |
387 | /// # { |
388 | /// // First, wait until the `send_request` handle is ready to send a new |
389 | /// // request |
390 | /// let mut send_request = send_request.ready().await.unwrap(); |
391 | /// // Use `send_request` here. |
392 | /// # } |
393 | /// # pub fn main() {} |
394 | /// ``` |
395 | /// |
396 | /// See [module] level docs for more details. |
397 | /// |
398 | /// [`poll_ready`]: #method.poll_ready |
399 | /// [module]: index.html |
400 | pub fn ready(self) -> ReadySendRequest<B> { |
401 | ReadySendRequest { inner: Some(self) } |
402 | } |
403 | |
/// Sends an HTTP/2 request to the server.
405 | /// |
406 | /// `send_request` initializes a new HTTP/2 stream on the associated |
407 | /// connection, then sends the given request using this new stream. Only the |
408 | /// request head is sent. |
409 | /// |
410 | /// On success, a [`ResponseFuture`] instance and [`SendStream`] instance |
411 | /// are returned. The [`ResponseFuture`] instance is used to get the |
412 | /// server's response and the [`SendStream`] instance is used to send a |
413 | /// request body or trailers to the server over the same HTTP/2 stream. |
414 | /// |
415 | /// To send a request body or trailers, set `end_of_stream` to `false`. |
416 | /// Then, use the returned [`SendStream`] instance to stream request body |
/// chunks or send trailers. If `end_of_stream` is set to `true`, then
/// attempting to call [`SendStream::send_data`] or
419 | /// [`SendStream::send_trailers`] will result in an error. |
420 | /// |
421 | /// If no request body or trailers are to be sent, set `end_of_stream` to |
422 | /// `true` and drop the returned [`SendStream`] instance. |
423 | /// |
424 | /// # A note on HTTP versions |
425 | /// |
426 | /// The provided `Request` will be encoded differently depending on the |
427 | /// value of its version field. If the version is set to 2.0, then the |
/// request is encoded as the specification recommends.
429 | /// |
430 | /// If the version is set to a lower value, then the request is encoded to |
431 | /// preserve the characteristics of HTTP 1.1 and lower. Specifically, host |
432 | /// headers are permitted and the `:authority` pseudo header is not |
433 | /// included. |
434 | /// |
435 | /// The caller should always set the request's version field to 2.0 unless |
436 | /// specifically transmitting an HTTP 1.1 request over 2.0. |
437 | /// |
438 | /// # Examples |
439 | /// |
440 | /// Sending a request with no body |
441 | /// |
442 | /// ```rust |
443 | /// # use h2::client::*; |
444 | /// # use http::*; |
445 | /// # async fn doc(send_request: SendRequest<&'static [u8]>) |
446 | /// # { |
447 | /// // First, wait until the `send_request` handle is ready to send a new |
448 | /// // request |
449 | /// let mut send_request = send_request.ready().await.unwrap(); |
450 | /// // Prepare the HTTP request to send to the server. |
/// let request = Request::get("https://www.example.com/")
452 | /// .body(()) |
453 | /// .unwrap(); |
454 | /// |
455 | /// // Send the request to the server. Since we are not sending a |
456 | /// // body or trailers, we can drop the `SendStream` instance. |
457 | /// let (response, _) = send_request.send_request(request, true).unwrap(); |
458 | /// let response = response.await.unwrap(); |
459 | /// // Process the response |
460 | /// # } |
461 | /// # pub fn main() {} |
462 | /// ``` |
463 | /// |
464 | /// Sending a request with a body and trailers |
465 | /// |
466 | /// ```rust |
467 | /// # use h2::client::*; |
468 | /// # use http::*; |
469 | /// # async fn doc(send_request: SendRequest<&'static [u8]>) |
470 | /// # { |
471 | /// // First, wait until the `send_request` handle is ready to send a new |
472 | /// // request |
473 | /// let mut send_request = send_request.ready().await.unwrap(); |
474 | /// |
475 | /// // Prepare the HTTP request to send to the server. |
/// let request = Request::get("https://www.example.com/")
477 | /// .body(()) |
478 | /// .unwrap(); |
479 | /// |
/// // Send the request to the server. Since we will send a body and
/// // trailers, we keep the returned `SendStream` instance.
482 | /// let (response, mut send_stream) = send_request |
483 | /// .send_request(request, false).unwrap(); |
484 | /// |
485 | /// // At this point, one option would be to wait for send capacity. |
486 | /// // Doing so would allow us to not hold data in memory that |
487 | /// // cannot be sent. However, this is not a requirement, so this |
488 | /// // example will skip that step. See `SendStream` documentation |
489 | /// // for more details. |
/// send_stream.send_data(b"hello", false).unwrap();
/// send_stream.send_data(b"world", false).unwrap();
492 | /// |
493 | /// // Send the trailers. |
494 | /// let mut trailers = HeaderMap::new(); |
495 | /// trailers.insert( |
/// header::HeaderName::from_bytes(b"my-trailer").unwrap(),
/// header::HeaderValue::from_bytes(b"hello").unwrap());
498 | /// |
499 | /// send_stream.send_trailers(trailers).unwrap(); |
500 | /// |
501 | /// let response = response.await.unwrap(); |
502 | /// // Process the response |
503 | /// # } |
504 | /// # pub fn main() {} |
505 | /// ``` |
506 | /// |
507 | /// [`ResponseFuture`]: struct.ResponseFuture.html |
508 | /// [`SendStream`]: ../struct.SendStream.html |
509 | /// [`SendStream::send_data`]: ../struct.SendStream.html#method.send_data |
510 | /// [`SendStream::send_trailers`]: ../struct.SendStream.html#method.send_trailers |
511 | pub fn send_request( |
512 | &mut self, |
513 | request: Request<()>, |
514 | end_of_stream: bool, |
515 | ) -> Result<(ResponseFuture, SendStream<B>), crate::Error> { |
516 | self.inner |
517 | .send_request(request, end_of_stream, self.pending.as_ref()) |
518 | .map_err(Into::into) |
519 | .map(|(stream, is_full)| { |
520 | if stream.is_pending_open() && is_full { |
// The request queue is full, so block the caller from sending
// another request until this stream leaves the pending-open state.
523 | self.pending = Some(stream.clone_to_opaque()); |
524 | } |
525 | |
526 | let response = ResponseFuture { |
527 | inner: stream.clone_to_opaque(), |
528 | push_promise_consumed: false, |
529 | }; |
530 | |
531 | let stream = SendStream::new(stream); |
532 | |
533 | (response, stream) |
534 | }) |
535 | } |
536 | |
537 | /// Returns whether the [extended CONNECT protocol][1] is enabled or not. |
538 | /// |
539 | /// This setting is configured by the server peer by sending the |
540 | /// [`SETTINGS_ENABLE_CONNECT_PROTOCOL` parameter][2] in a `SETTINGS` frame. |
541 | /// This method returns the currently acknowledged value received from the |
542 | /// remote. |
543 | /// |
544 | /// [1]: https://datatracker.ietf.org/doc/html/rfc8441#section-4 |
/// [2]: https://datatracker.ietf.org/doc/html/rfc8441#section-3
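///
/// # Examples
///
/// A short sketch of checking the flag; it only becomes `true` after the
/// server's `SETTINGS` frame has been received:
///
/// ```
/// # use h2::client::*;
/// # fn doc(send_request: SendRequest<&'static [u8]>)
/// # {
/// if send_request.is_extended_connect_protocol_enabled() {
///     // The server has advertised SETTINGS_ENABLE_CONNECT_PROTOCOL, so
///     // extended CONNECT requests (RFC 8441) may be sent on this connection.
/// }
/// # }
/// # pub fn main() {}
/// ```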
546 | pub fn is_extended_connect_protocol_enabled(&self) -> bool { |
547 | self.inner.is_extended_connect_protocol_enabled() |
548 | } |
549 | } |
550 | |
551 | impl<B> fmt::Debug for SendRequest<B> |
552 | where |
553 | B: Buf, |
554 | { |
555 | fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { |
fmt.debug_struct("SendRequest").finish()
557 | } |
558 | } |
559 | |
560 | impl<B> Clone for SendRequest<B> |
561 | where |
562 | B: Buf, |
563 | { |
564 | fn clone(&self) -> Self { |
565 | SendRequest { |
566 | inner: self.inner.clone(), |
567 | pending: None, |
568 | } |
569 | } |
570 | } |
571 | |
#[cfg(feature = "unstable")]
573 | impl<B> SendRequest<B> |
574 | where |
575 | B: Buf, |
576 | { |
577 | /// Returns the number of active streams. |
578 | /// |
579 | /// An active stream is a stream that has not yet transitioned to a closed |
580 | /// state. |
581 | pub fn num_active_streams(&self) -> usize { |
582 | self.inner.num_active_streams() |
583 | } |
584 | |
585 | /// Returns the number of streams that are held in memory. |
586 | /// |
587 | /// A wired stream is a stream that is either active or is closed but must |
588 | /// stay in memory for some reason. For example, there are still outstanding |
589 | /// userspace handles pointing to the slot. |
590 | pub fn num_wired_streams(&self) -> usize { |
591 | self.inner.num_wired_streams() |
592 | } |
593 | } |
594 | |
595 | // ===== impl ReadySendRequest ===== |
596 | |
597 | impl<B> Future for ReadySendRequest<B> |
598 | where |
599 | B: Buf, |
600 | { |
601 | type Output = Result<SendRequest<B>, crate::Error>; |
602 | |
603 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
604 | match &mut self.inner { |
Some(send_request) => {
606 | ready!(send_request.poll_ready(cx))?; |
607 | } |
None => panic!("called `poll` after future completed"),
609 | } |
610 | |
611 | Poll::Ready(Ok(self.inner.take().unwrap())) |
612 | } |
613 | } |
614 | |
615 | // ===== impl Builder ===== |
616 | |
617 | impl Builder { |
618 | /// Returns a new client builder instance initialized with default |
619 | /// configuration values. |
620 | /// |
621 | /// Configuration methods can be chained on the return value. |
622 | /// |
623 | /// # Examples |
624 | /// |
625 | /// ``` |
626 | /// # use tokio::io::{AsyncRead, AsyncWrite}; |
627 | /// # use h2::client::*; |
628 | /// # use bytes::Bytes; |
629 | /// # |
630 | /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) |
631 | /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error> |
632 | /// # { |
633 | /// // `client_fut` is a future representing the completion of the HTTP/2 |
634 | /// // handshake. |
635 | /// let client_fut = Builder::new() |
636 | /// .initial_window_size(1_000_000) |
637 | /// .max_concurrent_streams(1000) |
638 | /// .handshake(my_io); |
639 | /// # client_fut.await |
640 | /// # } |
641 | /// # |
642 | /// # pub fn main() {} |
643 | /// ``` |
644 | pub fn new() -> Builder { |
645 | Builder { |
646 | max_send_buffer_size: proto::DEFAULT_MAX_SEND_BUFFER_SIZE, |
647 | reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS), |
648 | reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX, |
649 | pending_accept_reset_stream_max: proto::DEFAULT_REMOTE_RESET_STREAM_MAX, |
650 | initial_target_connection_window_size: None, |
651 | initial_max_send_streams: usize::MAX, |
652 | settings: Default::default(), |
653 | stream_id: 1.into(), |
654 | local_max_error_reset_streams: Some(proto::DEFAULT_LOCAL_RESET_COUNT_MAX), |
655 | } |
656 | } |
657 | |
658 | /// Indicates the initial window size (in octets) for stream-level |
659 | /// flow control for received data. |
660 | /// |
661 | /// The initial window of a stream is used as part of flow control. For more |
662 | /// details, see [`FlowControl`]. |
663 | /// |
664 | /// The default value is 65,535. |
665 | /// |
666 | /// [`FlowControl`]: ../struct.FlowControl.html |
667 | /// |
668 | /// # Examples |
669 | /// |
670 | /// ``` |
671 | /// # use tokio::io::{AsyncRead, AsyncWrite}; |
672 | /// # use h2::client::*; |
673 | /// # use bytes::Bytes; |
674 | /// # |
675 | /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) |
676 | /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error> |
677 | /// # { |
678 | /// // `client_fut` is a future representing the completion of the HTTP/2 |
679 | /// // handshake. |
680 | /// let client_fut = Builder::new() |
681 | /// .initial_window_size(1_000_000) |
682 | /// .handshake(my_io); |
683 | /// # client_fut.await |
684 | /// # } |
685 | /// # |
686 | /// # pub fn main() {} |
687 | /// ``` |
688 | pub fn initial_window_size(&mut self, size: u32) -> &mut Self { |
689 | self.settings.set_initial_window_size(Some(size)); |
690 | self |
691 | } |
692 | |
693 | /// Indicates the initial window size (in octets) for connection-level flow control |
694 | /// for received data. |
695 | /// |
696 | /// The initial window of a connection is used as part of flow control. For more details, |
697 | /// see [`FlowControl`]. |
698 | /// |
699 | /// The default value is 65,535. |
700 | /// |
701 | /// [`FlowControl`]: ../struct.FlowControl.html |
702 | /// |
703 | /// # Examples |
704 | /// |
705 | /// ``` |
706 | /// # use tokio::io::{AsyncRead, AsyncWrite}; |
707 | /// # use h2::client::*; |
708 | /// # use bytes::Bytes; |
709 | /// # |
710 | /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) |
711 | /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error> |
712 | /// # { |
713 | /// // `client_fut` is a future representing the completion of the HTTP/2 |
714 | /// // handshake. |
715 | /// let client_fut = Builder::new() |
716 | /// .initial_connection_window_size(1_000_000) |
717 | /// .handshake(my_io); |
718 | /// # client_fut.await |
719 | /// # } |
720 | /// # |
721 | /// # pub fn main() {} |
722 | /// ``` |
723 | pub fn initial_connection_window_size(&mut self, size: u32) -> &mut Self { |
724 | self.initial_target_connection_window_size = Some(size); |
725 | self |
726 | } |
727 | |
728 | /// Indicates the size (in octets) of the largest HTTP/2 frame payload that the |
729 | /// configured client is able to accept. |
730 | /// |
731 | /// The sender may send data frames that are **smaller** than this value, |
732 | /// but any data larger than `max` will be broken up into multiple `DATA` |
733 | /// frames. |
734 | /// |
735 | /// The value **must** be between 16,384 and 16,777,215. The default value is 16,384. |
736 | /// |
737 | /// # Examples |
738 | /// |
739 | /// ``` |
740 | /// # use tokio::io::{AsyncRead, AsyncWrite}; |
741 | /// # use h2::client::*; |
742 | /// # use bytes::Bytes; |
743 | /// # |
744 | /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) |
745 | /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error> |
746 | /// # { |
747 | /// // `client_fut` is a future representing the completion of the HTTP/2 |
748 | /// // handshake. |
749 | /// let client_fut = Builder::new() |
750 | /// .max_frame_size(1_000_000) |
751 | /// .handshake(my_io); |
752 | /// # client_fut.await |
753 | /// # } |
754 | /// # |
755 | /// # pub fn main() {} |
756 | /// ``` |
757 | /// |
758 | /// # Panics |
759 | /// |
760 | /// This function panics if `max` is not within the legal range specified |
761 | /// above. |
762 | pub fn max_frame_size(&mut self, max: u32) -> &mut Self { |
763 | self.settings.set_max_frame_size(Some(max)); |
764 | self |
765 | } |
766 | |
767 | /// Sets the max size of received header frames. |
768 | /// |
769 | /// This advisory setting informs a peer of the maximum size of header list |
770 | /// that the sender is prepared to accept, in octets. The value is based on |
771 | /// the uncompressed size of header fields, including the length of the name |
772 | /// and value in octets plus an overhead of 32 octets for each header field. |
773 | /// |
774 | /// This setting is also used to limit the maximum amount of data that is |
775 | /// buffered to decode HEADERS frames. |
776 | /// |
777 | /// # Examples |
778 | /// |
779 | /// ``` |
780 | /// # use tokio::io::{AsyncRead, AsyncWrite}; |
781 | /// # use h2::client::*; |
782 | /// # use bytes::Bytes; |
783 | /// # |
784 | /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) |
785 | /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error> |
786 | /// # { |
787 | /// // `client_fut` is a future representing the completion of the HTTP/2 |
788 | /// // handshake. |
789 | /// let client_fut = Builder::new() |
790 | /// .max_header_list_size(16 * 1024) |
791 | /// .handshake(my_io); |
792 | /// # client_fut.await |
793 | /// # } |
794 | /// # |
795 | /// # pub fn main() {} |
796 | /// ``` |
797 | pub fn max_header_list_size(&mut self, max: u32) -> &mut Self { |
798 | self.settings.set_max_header_list_size(Some(max)); |
799 | self |
800 | } |
801 | |
802 | /// Sets the maximum number of concurrent streams. |
803 | /// |
804 | /// The maximum concurrent streams setting only controls the maximum number |
805 | /// of streams that can be initiated by the remote peer. In other words, |
806 | /// when this setting is set to 100, this does not limit the number of |
807 | /// concurrent streams that can be created by the caller. |
808 | /// |
809 | /// It is recommended that this value be no smaller than 100, so as to not |
810 | /// unnecessarily limit parallelism. However, any value is legal, including |
811 | /// 0. If `max` is set to 0, then the remote will not be permitted to |
812 | /// initiate streams. |
813 | /// |
814 | /// Note that streams in the reserved state, i.e., push promises that have |
815 | /// been reserved but the stream has not started, do not count against this |
816 | /// setting. |
817 | /// |
818 | /// Also note that if the remote *does* exceed the value set here, it is not |
819 | /// a protocol level error. Instead, the `h2` library will immediately reset |
820 | /// the stream. |
821 | /// |
822 | /// See [Section 5.1.2] in the HTTP/2 spec for more details. |
823 | /// |
824 | /// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2 |
825 | /// |
826 | /// # Examples |
827 | /// |
828 | /// ``` |
829 | /// # use tokio::io::{AsyncRead, AsyncWrite}; |
830 | /// # use h2::client::*; |
831 | /// # use bytes::Bytes; |
832 | /// # |
833 | /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) |
834 | /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error> |
835 | /// # { |
836 | /// // `client_fut` is a future representing the completion of the HTTP/2 |
837 | /// // handshake. |
838 | /// let client_fut = Builder::new() |
839 | /// .max_concurrent_streams(1000) |
840 | /// .handshake(my_io); |
841 | /// # client_fut.await |
842 | /// # } |
843 | /// # |
844 | /// # pub fn main() {} |
845 | /// ``` |
846 | pub fn max_concurrent_streams(&mut self, max: u32) -> &mut Self { |
847 | self.settings.set_max_concurrent_streams(Some(max)); |
848 | self |
849 | } |
850 | |
851 | /// Sets the initial maximum of locally initiated (send) streams. |
852 | /// |
853 | /// The initial settings will be overwritten by the remote peer when |
854 | /// the Settings frame is received. The new value will be set to the |
855 | /// `max_concurrent_streams()` from the frame. |
856 | /// |
/// Until the peer's `SETTINGS` frame arrives, this setting caps the number
/// of locally initiated streams counted towards the concurrency limit.
859 | /// |
860 | /// Sending streams past the limit returned by the peer will be treated |
861 | /// as a stream error of type PROTOCOL_ERROR or REFUSED_STREAM. |
862 | /// |
863 | /// See [Section 5.1.2] in the HTTP/2 spec for more details. |
864 | /// |
865 | /// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2 |
866 | /// |
867 | /// # Examples |
868 | /// |
869 | /// ``` |
870 | /// # use tokio::io::{AsyncRead, AsyncWrite}; |
871 | /// # use h2::client::*; |
872 | /// # use bytes::Bytes; |
873 | /// # |
874 | /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) |
875 | /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error> |
876 | /// # { |
877 | /// // `client_fut` is a future representing the completion of the HTTP/2 |
878 | /// // handshake. |
879 | /// let client_fut = Builder::new() |
880 | /// .initial_max_send_streams(1000) |
881 | /// .handshake(my_io); |
882 | /// # client_fut.await |
883 | /// # } |
884 | /// # |
885 | /// # pub fn main() {} |
886 | /// ``` |
887 | pub fn initial_max_send_streams(&mut self, initial: usize) -> &mut Self { |
888 | self.initial_max_send_streams = initial; |
889 | self |
890 | } |
891 | |
892 | /// Sets the maximum number of concurrent locally reset streams. |
893 | /// |
894 | /// When a stream is explicitly reset, the HTTP/2 specification requires |
895 | /// that any further frames received for that stream must be ignored for |
896 | /// "some time". |
897 | /// |
898 | /// In order to satisfy the specification, internal state must be maintained |
899 | /// to implement the behavior. This state grows linearly with the number of |
900 | /// streams that are locally reset. |
901 | /// |
/// The `max_concurrent_reset_streams` setting sets an upper
903 | /// bound on the amount of state that is maintained. When this max value is |
904 | /// reached, the oldest reset stream is purged from memory. |
905 | /// |
906 | /// Once the stream has been fully purged from memory, any additional frames |
907 | /// received for that stream will result in a connection level protocol |
908 | /// error, forcing the connection to terminate. |
909 | /// |
910 | /// The default value is 10. |
911 | /// |
912 | /// # Examples |
913 | /// |
914 | /// ``` |
915 | /// # use tokio::io::{AsyncRead, AsyncWrite}; |
916 | /// # use h2::client::*; |
917 | /// # use bytes::Bytes; |
918 | /// # |
919 | /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) |
920 | /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error> |
921 | /// # { |
922 | /// // `client_fut` is a future representing the completion of the HTTP/2 |
923 | /// // handshake. |
924 | /// let client_fut = Builder::new() |
925 | /// .max_concurrent_reset_streams(1000) |
926 | /// .handshake(my_io); |
927 | /// # client_fut.await |
928 | /// # } |
929 | /// # |
930 | /// # pub fn main() {} |
931 | /// ``` |
932 | pub fn max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self { |
933 | self.reset_stream_max = max; |
934 | self |
935 | } |
936 | |
937 | /// Sets the duration to remember locally reset streams. |
938 | /// |
939 | /// When a stream is explicitly reset, the HTTP/2 specification requires |
940 | /// that any further frames received for that stream must be ignored for |
941 | /// "some time". |
942 | /// |
943 | /// In order to satisfy the specification, internal state must be maintained |
944 | /// to implement the behavior. This state grows linearly with the number of |
945 | /// streams that are locally reset. |
946 | /// |
947 | /// The `reset_stream_duration` setting configures the max amount of time |
948 | /// this state will be maintained in memory. Once the duration elapses, the |
949 | /// stream state is purged from memory. |
950 | /// |
951 | /// Once the stream has been fully purged from memory, any additional frames |
952 | /// received for that stream will result in a connection level protocol |
953 | /// error, forcing the connection to terminate. |
954 | /// |
955 | /// The default value is 30 seconds. |
956 | /// |
957 | /// # Examples |
958 | /// |
959 | /// ``` |
960 | /// # use tokio::io::{AsyncRead, AsyncWrite}; |
961 | /// # use h2::client::*; |
962 | /// # use std::time::Duration; |
963 | /// # use bytes::Bytes; |
964 | /// # |
965 | /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) |
966 | /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error> |
967 | /// # { |
968 | /// // `client_fut` is a future representing the completion of the HTTP/2 |
969 | /// // handshake. |
970 | /// let client_fut = Builder::new() |
971 | /// .reset_stream_duration(Duration::from_secs(10)) |
972 | /// .handshake(my_io); |
973 | /// # client_fut.await |
974 | /// # } |
975 | /// # |
976 | /// # pub fn main() {} |
977 | /// ``` |
978 | pub fn reset_stream_duration(&mut self, dur: Duration) -> &mut Self { |
979 | self.reset_stream_duration = dur; |
980 | self |
981 | } |
982 | |
983 | /// Sets the maximum number of local resets due to protocol errors made by the remote end. |
984 | /// |
985 | /// Invalid frames and many other protocol errors will lead to resets being generated for those streams. |
986 | /// Too many of these often indicate a malicious client, and there are attacks which can abuse this to DOS servers. |
987 | /// This limit protects against these DOS attacks by limiting the amount of resets we can be forced to generate. |
988 | /// |
989 | /// When the number of local resets exceeds this threshold, the client will close the connection. |
990 | /// |
991 | /// If you really want to disable this, supply [`Option::None`] here. |
992 | /// Disabling this is not recommended and may expose you to DOS attacks. |
993 | /// |
994 | /// The default value is currently 1024, but could change. |
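///
/// # Examples
///
/// A sketch following the same pattern as the other builder examples:
///
/// ```
/// # use tokio::io::{AsyncRead, AsyncWrite};
/// # use h2::client::*;
/// # use bytes::Bytes;
/// #
/// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
/// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
/// # {
/// // `client_fut` is a future representing the completion of the HTTP/2
/// // handshake.
/// let client_fut = Builder::new()
///     .max_local_error_reset_streams(Some(1024))
///     .handshake(my_io);
/// # client_fut.await
/// # }
/// #
/// # pub fn main() {}
/// ```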
995 | pub fn max_local_error_reset_streams(&mut self, max: Option<usize>) -> &mut Self { |
996 | self.local_max_error_reset_streams = max; |
997 | self |
998 | } |
999 | |
1000 | /// Sets the maximum number of pending-accept remotely-reset streams. |
1001 | /// |
1002 | /// Streams that have been received by the peer, but not accepted by the |
1003 | /// user, can also receive a RST_STREAM. This is a legitimate pattern: one |
1004 | /// could send a request and then shortly after, realize it is not needed, |
1005 | /// sending a CANCEL. |
1006 | /// |
1007 | /// However, since those streams are now "closed", they don't count towards |
1008 | /// the max concurrent streams. So, they will sit in the accept queue, |
1009 | /// using memory. |
1010 | /// |
1011 | /// When the number of remotely-reset streams sitting in the pending-accept |
1012 | /// queue reaches this maximum value, a connection error with the code of |
1013 | /// `ENHANCE_YOUR_CALM` will be sent to the peer, and returned by the |
1014 | /// `Future`. |
1015 | /// |
1016 | /// The default value is currently 20, but could change. |
1017 | /// |
1018 | /// # Examples |
1019 | /// |
1020 | /// ``` |
1021 | /// # use tokio::io::{AsyncRead, AsyncWrite}; |
1022 | /// # use h2::client::*; |
1023 | /// # use bytes::Bytes; |
1024 | /// # |
1025 | /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) |
1026 | /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error> |
1027 | /// # { |
1028 | /// // `client_fut` is a future representing the completion of the HTTP/2 |
1029 | /// // handshake. |
1030 | /// let client_fut = Builder::new() |
1031 | /// .max_pending_accept_reset_streams(100) |
1032 | /// .handshake(my_io); |
1033 | /// # client_fut.await |
1034 | /// # } |
1035 | /// # |
1036 | /// # pub fn main() {} |
1037 | /// ``` |
1038 | pub fn max_pending_accept_reset_streams(&mut self, max: usize) -> &mut Self { |
1039 | self.pending_accept_reset_stream_max = max; |
1040 | self |
1041 | } |
1042 | |
1043 | /// Sets the maximum send buffer size per stream. |
1044 | /// |
1045 | /// Once a stream has buffered up to (or over) the maximum, the stream's |
1046 | /// flow control will not "poll" additional capacity. Once bytes for the |
1047 | /// stream have been written to the connection, the send buffer capacity |
1048 | /// will be freed up again. |
1049 | /// |
1050 | /// The default is currently ~400KB, but may change. |
1051 | /// |
1052 | /// # Panics |
1053 | /// |
1054 | /// This function panics if `max` is larger than `u32::MAX`. |
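///
/// # Examples
///
/// A sketch following the same pattern as the other builder examples:
///
/// ```
/// # use tokio::io::{AsyncRead, AsyncWrite};
/// # use h2::client::*;
/// # use bytes::Bytes;
/// #
/// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
/// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
/// # {
/// // `client_fut` is a future representing the completion of the HTTP/2
/// // handshake.
/// let client_fut = Builder::new()
///     .max_send_buffer_size(1024 * 1024)
///     .handshake(my_io);
/// # client_fut.await
/// # }
/// #
/// # pub fn main() {}
/// ```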
1055 | pub fn max_send_buffer_size(&mut self, max: usize) -> &mut Self { |
1056 | assert!(max <= std::u32::MAX as usize); |
1057 | self.max_send_buffer_size = max; |
1058 | self |
1059 | } |
1060 | |
1061 | /// Enables or disables server push promises. |
1062 | /// |
1063 | /// This value is included in the initial SETTINGS handshake. |
/// Setting this value to `false` in the initial SETTINGS handshake
/// guarantees that the remote server will never send a push promise.
1067 | /// |
1068 | /// This setting can be changed during the life of a single HTTP/2 |
1069 | /// connection by sending another settings frame updating the value. |
1070 | /// |
1071 | /// Default value: `true`. |
1072 | /// |
1073 | /// # Examples |
1074 | /// |
1075 | /// ``` |
1076 | /// # use tokio::io::{AsyncRead, AsyncWrite}; |
1077 | /// # use h2::client::*; |
1078 | /// # use std::time::Duration; |
1079 | /// # use bytes::Bytes; |
1080 | /// # |
1081 | /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) |
1082 | /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error> |
1083 | /// # { |
1084 | /// // `client_fut` is a future representing the completion of the HTTP/2 |
1085 | /// // handshake. |
1086 | /// let client_fut = Builder::new() |
1087 | /// .enable_push(false) |
1088 | /// .handshake(my_io); |
1089 | /// # client_fut.await |
1090 | /// # } |
1091 | /// # |
1092 | /// # pub fn main() {} |
1093 | /// ``` |
1094 | pub fn enable_push(&mut self, enabled: bool) -> &mut Self { |
1095 | self.settings.set_enable_push(enabled); |
1096 | self |
1097 | } |
1098 | |
1099 | /// Sets the header table size. |
1100 | /// |
1101 | /// This setting informs the peer of the maximum size of the header compression |
1102 | /// table used to encode header blocks, in octets. The encoder may select any value |
1103 | /// equal to or less than the header table size specified by the sender. |
1104 | /// |
1105 | /// The default value is 4,096. |
1106 | /// |
1107 | /// # Examples |
1108 | /// |
1109 | /// ``` |
1110 | /// # use tokio::io::{AsyncRead, AsyncWrite}; |
1111 | /// # use h2::client::*; |
1112 | /// # use bytes::Bytes; |
1113 | /// # |
1114 | /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) |
1115 | /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error> |
1116 | /// # { |
1117 | /// // `client_fut` is a future representing the completion of the HTTP/2 |
1118 | /// // handshake. |
1119 | /// let client_fut = Builder::new() |
1120 | /// .header_table_size(1_000_000) |
1121 | /// .handshake(my_io); |
1122 | /// # client_fut.await |
1123 | /// # } |
1124 | /// # |
1125 | /// # pub fn main() {} |
1126 | /// ``` |
1127 | pub fn header_table_size(&mut self, size: u32) -> &mut Self { |
1128 | self.settings.set_header_table_size(Some(size)); |
1129 | self |
1130 | } |
1131 | |
1132 | /// Sets the first stream ID to something other than 1. |
#[cfg(feature = "unstable")]
1134 | pub fn initial_stream_id(&mut self, stream_id: u32) -> &mut Self { |
1135 | self.stream_id = stream_id.into(); |
1136 | assert!( |
1137 | self.stream_id.is_client_initiated(), |
1138 | "stream id must be odd" |
1139 | ); |
1140 | self |
1141 | } |
1142 | |
1143 | /// Creates a new configured HTTP/2 client backed by `io`. |
1144 | /// |
1145 | /// It is expected that `io` already be in an appropriate state to commence |
1146 | /// the [HTTP/2 handshake]. The handshake is completed once both the connection |
/// preface and the initial settings frame are sent by the client.
1148 | /// |
1149 | /// The handshake future does not wait for the initial settings frame from the |
1150 | /// server. |
1151 | /// |
1152 | /// Returns a future which resolves to the [`Connection`] / [`SendRequest`] |
1153 | /// tuple once the HTTP/2 handshake has been completed. |
1154 | /// |
1155 | /// This function also allows the caller to configure the send payload data |
1156 | /// type. See [Outbound data type] for more details. |
1157 | /// |
1158 | /// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader |
1159 | /// [`Connection`]: struct.Connection.html |
1160 | /// [`SendRequest`]: struct.SendRequest.html |
/// [Outbound data type]: ../index.html#outbound-data-type
1162 | /// |
1163 | /// # Examples |
1164 | /// |
1165 | /// Basic usage: |
1166 | /// |
1167 | /// ``` |
1168 | /// # use tokio::io::{AsyncRead, AsyncWrite}; |
1169 | /// # use h2::client::*; |
1170 | /// # use bytes::Bytes; |
1171 | /// # |
1172 | /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) |
1173 | /// -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error> |
1174 | /// # { |
1175 | /// // `client_fut` is a future representing the completion of the HTTP/2 |
1176 | /// // handshake. |
1177 | /// let client_fut = Builder::new() |
1178 | /// .handshake(my_io); |
1179 | /// # client_fut.await |
1180 | /// # } |
1181 | /// # |
1182 | /// # pub fn main() {} |
1183 | /// ``` |
1184 | /// |
1185 | /// Configures the send-payload data type. In this case, the outbound data |
1186 | /// type will be `&'static [u8]`. |
1187 | /// |
1188 | /// ``` |
1189 | /// # use tokio::io::{AsyncRead, AsyncWrite}; |
1190 | /// # use h2::client::*; |
1191 | /// # |
1192 | /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) |
1193 | /// # -> Result<((SendRequest<&'static [u8]>, Connection<T, &'static [u8]>)), h2::Error> |
1194 | /// # { |
1195 | /// // `client_fut` is a future representing the completion of the HTTP/2 |
1196 | /// // handshake. |
1197 | /// let client_fut = Builder::new() |
1198 | /// .handshake::<_, &'static [u8]>(my_io); |
1199 | /// # client_fut.await |
1200 | /// # } |
1201 | /// # |
1202 | /// # pub fn main() {} |
1203 | /// ``` |
1204 | pub fn handshake<T, B>( |
1205 | &self, |
1206 | io: T, |
1207 | ) -> impl Future<Output = Result<(SendRequest<B>, Connection<T, B>), crate::Error>> |
1208 | where |
1209 | T: AsyncRead + AsyncWrite + Unpin, |
1210 | B: Buf, |
1211 | { |
1212 | Connection::handshake2(io, self.clone()) |
1213 | } |
1214 | } |
1215 | |
1216 | impl Default for Builder { |
1217 | fn default() -> Builder { |
1218 | Builder::new() |
1219 | } |
1220 | } |
1221 | |
1222 | /// Creates a new configured HTTP/2 client with default configuration |
1223 | /// values backed by `io`. |
1224 | /// |
1225 | /// It is expected that `io` already be in an appropriate state to commence |
1226 | /// the [HTTP/2 handshake]. See [Handshake] for more details. |
1227 | /// |
1228 | /// Returns a future which resolves to the [`Connection`] / [`SendRequest`] |
1229 | /// tuple once the HTTP/2 handshake has been completed. The returned |
1230 | /// [`Connection`] instance will be using default configuration values. Use |
1231 | /// [`Builder`] to customize the configuration values used by a [`Connection`] |
1232 | /// instance. |
1233 | /// |
1234 | /// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader |
1235 | /// [Handshake]: ../index.html#handshake |
1236 | /// [`Connection`]: struct.Connection.html |
1237 | /// [`SendRequest`]: struct.SendRequest.html |
1238 | /// |
1239 | /// # Examples |
1240 | /// |
1241 | /// ``` |
1242 | /// # use tokio::io::{AsyncRead, AsyncWrite}; |
1243 | /// # use h2::client; |
1244 | /// # use h2::client::*; |
1245 | /// # |
1246 | /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) -> Result<(), h2::Error> |
1247 | /// # { |
1248 | /// let (send_request, connection) = client::handshake(my_io).await?; |
1249 | /// // The HTTP/2 handshake has completed, now start polling |
1250 | /// // `connection` and use `send_request` to send requests to the |
1251 | /// // server. |
1252 | /// # Ok(()) |
1253 | /// # } |
1254 | /// # |
1255 | /// # pub fn main() {} |
1256 | /// ``` |
1257 | pub async fn handshake<T>(io: T) -> Result<(SendRequest<Bytes>, Connection<T, Bytes>), crate::Error> |
1258 | where |
1259 | T: AsyncRead + AsyncWrite + Unpin, |
1260 | { |
let builder = Builder::new();
builder
1263 | .handshake(io) |
1264 | .instrument(tracing::trace_span!("client_handshake" )) |
1265 | .await |
1266 | } |
1267 | |
1268 | // ===== impl Connection ===== |
1269 | |
1270 | async fn bind_connection<T>(io: &mut T) -> Result<(), crate::Error> |
1271 | where |
1272 | T: AsyncRead + AsyncWrite + Unpin, |
1273 | { |
tracing::debug!("binding client connection");

let msg: &'static [u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
io.write_all(msg).await.map_err(crate::Error::from_io)?;

tracing::debug!("client connection bound");
1280 | |
1281 | Ok(()) |
1282 | } |
1283 | |
1284 | impl<T, B> Connection<T, B> |
1285 | where |
1286 | T: AsyncRead + AsyncWrite + Unpin, |
1287 | B: Buf, |
1288 | { |
1289 | async fn handshake2( |
1290 | mut io: T, |
1291 | builder: Builder, |
1292 | ) -> Result<(SendRequest<B>, Connection<T, B>), crate::Error> { |
1293 | bind_connection(&mut io).await?; |
1294 | |
1295 | // Create the codec |
1296 | let mut codec = Codec::new(io); |
1297 | |
1298 | if let Some(max) = builder.settings.max_frame_size() { |
1299 | codec.set_max_recv_frame_size(max as usize); |
1300 | } |
1301 | |
1302 | if let Some(max) = builder.settings.max_header_list_size() { |
1303 | codec.set_max_recv_header_list_size(max as usize); |
1304 | } |
1305 | |
1306 | // Send initial settings frame |
1307 | codec |
1308 | .buffer(builder.settings.clone().into()) |
.expect("invalid SETTINGS frame");
1310 | |
1311 | let inner = proto::Connection::new( |
1312 | codec, |
1313 | proto::Config { |
1314 | next_stream_id: builder.stream_id, |
1315 | initial_max_send_streams: builder.initial_max_send_streams, |
1316 | max_send_buffer_size: builder.max_send_buffer_size, |
1317 | reset_stream_duration: builder.reset_stream_duration, |
1318 | reset_stream_max: builder.reset_stream_max, |
1319 | remote_reset_stream_max: builder.pending_accept_reset_stream_max, |
1320 | local_error_reset_streams_max: builder.local_max_error_reset_streams, |
1321 | settings: builder.settings.clone(), |
1322 | }, |
1323 | ); |
1324 | let send_request = SendRequest { |
1325 | inner: inner.streams().clone(), |
1326 | pending: None, |
1327 | }; |
1328 | |
1329 | let mut connection = Connection { inner }; |
1330 | if let Some(sz) = builder.initial_target_connection_window_size { |
1331 | connection.set_target_window_size(sz); |
1332 | } |
1333 | |
1334 | Ok((send_request, connection)) |
1335 | } |
1336 | |
1337 | /// Sets the target window size for the whole connection. |
1338 | /// |
1339 | /// If `size` is greater than the current value, then a `WINDOW_UPDATE` |
1340 | /// frame will be immediately sent to the remote, increasing the connection |
1341 | /// level window by `size - current_value`. |
1342 | /// |
1343 | /// If `size` is less than the current value, nothing will happen |
1344 | /// immediately. However, as window capacity is released by |
1345 | /// [`FlowControl`] instances, no `WINDOW_UPDATE` frames will be sent |
1346 | /// out until the number of "in flight" bytes drops below `size`. |
1347 | /// |
1348 | /// The default value is 65,535. |
1349 | /// |
1350 | /// See [`FlowControl`] documentation for more details. |
1351 | /// |
1352 | /// [`FlowControl`]: ../struct.FlowControl.html |
1353 | /// [library level]: ../index.html#flow-control |
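///
/// # Examples
///
/// A minimal sketch; the connection is obtained from `client::handshake`
/// as in the other examples:
///
/// ```
/// # use tokio::io::{AsyncRead, AsyncWrite};
/// # use h2::client;
/// #
/// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) -> Result<(), h2::Error>
/// # {
/// let (send_request, mut connection) = client::handshake(my_io).await?;
/// // Grow the connection-level receive window to 1 MiB; a WINDOW_UPDATE
/// // frame is sent for the difference from the current value.
/// connection.set_target_window_size(1024 * 1024);
/// # let _ = send_request;
/// # Ok(())
/// # }
/// #
/// # pub fn main() {}
/// ```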
1354 | pub fn set_target_window_size(&mut self, size: u32) { |
1355 | assert!(size <= proto::MAX_WINDOW_SIZE); |
1356 | self.inner.set_target_window_size(size); |
1357 | } |
1358 | |
1359 | /// Set a new `INITIAL_WINDOW_SIZE` setting (in octets) for stream-level |
1360 | /// flow control for received data. |
1361 | /// |
1362 | /// The `SETTINGS` will be sent to the remote, and only applied once the |
1363 | /// remote acknowledges the change. |
1364 | /// |
1365 | /// This can be used to increase or decrease the window size for existing |
1366 | /// streams. |
1367 | /// |
1368 | /// # Errors |
1369 | /// |
1370 | /// Returns an error if a previous call is still pending acknowledgement |
1371 | /// from the remote endpoint. |
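///
/// # Examples
///
/// A minimal sketch; the connection is obtained from `client::handshake`
/// as in the other examples:
///
/// ```
/// # use tokio::io::{AsyncRead, AsyncWrite};
/// # use h2::client;
/// #
/// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) -> Result<(), h2::Error>
/// # {
/// let (send_request, mut connection) = client::handshake(my_io).await?;
/// // Request a 1 MiB stream-level receive window. The new size only
/// // applies once the server acknowledges the SETTINGS frame.
/// connection.set_initial_window_size(1024 * 1024)?;
/// # let _ = send_request;
/// # Ok(())
/// # }
/// #
/// # pub fn main() {}
/// ```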
1372 | pub fn set_initial_window_size(&mut self, size: u32) -> Result<(), crate::Error> { |
1373 | assert!(size <= proto::MAX_WINDOW_SIZE); |
1374 | self.inner.set_initial_window_size(size)?; |
1375 | Ok(()) |
1376 | } |
1377 | |
1378 | /// Takes a `PingPong` instance from the connection. |
1379 | /// |
1380 | /// # Note |
1381 | /// |
1382 | /// This may only be called once. Calling multiple times will return `None`. |
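///
/// # Examples
///
/// A sketch of taking the handle before spawning the connection task:
///
/// ```
/// # use tokio::io::{AsyncRead, AsyncWrite};
/// # use h2::client;
/// #
/// # async fn doc<T>(my_io: T) -> Result<(), h2::Error>
/// # where T: AsyncRead + AsyncWrite + Send + Unpin + 'static,
/// # {
/// let (send_request, mut connection) = client::handshake(my_io).await?;
/// // The handle can only be taken once; later calls return `None`.
/// let ping_pong = connection.ping_pong().expect("ping_pong already taken");
/// tokio::spawn(async move { connection.await.expect("connection failed"); });
/// // `ping_pong` can now be used to send PING frames while the spawned
/// // task drives the connection; see the `PingPong` documentation.
/// # let _ = (send_request, ping_pong);
/// # Ok(())
/// # }
/// #
/// # pub fn main() {}
/// ```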
1383 | pub fn ping_pong(&mut self) -> Option<PingPong> { |
1384 | self.inner.take_user_pings().map(PingPong::new) |
1385 | } |
1386 | |
1387 | /// Returns the maximum number of concurrent streams that may be initiated |
1388 | /// by this client. |
1389 | /// |
1390 | /// This limit is configured by the server peer by sending the |
1391 | /// [`SETTINGS_MAX_CONCURRENT_STREAMS` parameter][1] in a `SETTINGS` frame. |
1392 | /// This method returns the currently acknowledged value received from the |
1393 | /// remote. |
1394 | /// |
1395 | /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2 |
1396 | pub fn max_concurrent_send_streams(&self) -> usize { |
1397 | self.inner.max_send_streams() |
1398 | } |

/// Returns the maximum number of concurrent streams that may be initiated
1400 | /// by the server on this connection. |
1401 | /// |
1402 | /// This returns the value of the [`SETTINGS_MAX_CONCURRENT_STREAMS` |
1403 | /// parameter][1] sent in a `SETTINGS` frame that has been |
1404 | /// acknowledged by the remote peer. The value to be sent is configured by |
1405 | /// the [`Builder::max_concurrent_streams`][2] method before handshaking |
1406 | /// with the remote peer. |
1407 | /// |
1408 | /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2 |
1409 | /// [2]: ../struct.Builder.html#method.max_concurrent_streams |
1410 | pub fn max_concurrent_recv_streams(&self) -> usize { |
1411 | self.inner.max_recv_streams() |
1412 | } |
1413 | } |
1414 | |
1415 | impl<T, B> Future for Connection<T, B> |
1416 | where |
1417 | T: AsyncRead + AsyncWrite + Unpin, |
1418 | B: Buf, |
1419 | { |
1420 | type Output = Result<(), crate::Error>; |
1421 | |
1422 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
1423 | self.inner.maybe_close_connection_if_no_streams(); |
1424 | self.inner.poll(cx).map_err(Into::into) |
1425 | } |
1426 | } |
1427 | |
1428 | impl<T, B> fmt::Debug for Connection<T, B> |
1429 | where |
1430 | T: AsyncRead + AsyncWrite, |
1431 | T: fmt::Debug, |
1432 | B: fmt::Debug + Buf, |
1433 | { |
1434 | fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { |
fmt::Debug::fmt(&self.inner, fmt)
1436 | } |
1437 | } |
1438 | |
1439 | // ===== impl ResponseFuture ===== |
1440 | |
1441 | impl Future for ResponseFuture { |
1442 | type Output = Result<Response<RecvStream>, crate::Error>; |
1443 | |
1444 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
let (parts, _) = ready!(self.inner.poll_response(cx))?.into_parts();
let body = RecvStream::new(FlowControl::new(self.inner.clone()));
1447 | |
1448 | Poll::Ready(Ok(Response::from_parts(parts, body))) |
1449 | } |
1450 | } |
1451 | |
1452 | impl ResponseFuture { |
1453 | /// Returns the stream ID of the response stream. |
1454 | /// |
1455 | /// # Panics |
1456 | /// |
1457 | /// If the lock on the stream store has been poisoned. |
1458 | pub fn stream_id(&self) -> crate::StreamId { |
1459 | crate::StreamId::from_internal(self.inner.stream_id()) |
1460 | } |

/// Returns a stream of `PushPromise`s.
///
/// # Panics
///
/// If this method has been called before, or if the stream was itself
/// pushed.
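///
/// # Examples
///
/// A sketch that drains the promised requests; `response_future` is assumed
/// to have been returned by `SendRequest::send_request`:
///
/// ```rust
/// # async fn doc(mut response_future: h2::client::ResponseFuture) -> Result<(), h2::Error> {
/// let mut push_promises = response_future.push_promises();
///
/// while let Some(promise) = push_promises.push_promise().await {
///     let (request, pushed_response) = promise?.into_parts();
///     println!("server promised {} {}", request.method(), request.uri());
///     // Await `pushed_response` to receive the pushed resource.
/// #   drop(pushed_response);
/// }
/// # Ok(())
/// # }
/// ```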
1467 | pub fn push_promises(&mut self) -> PushPromises { |
1468 | if self.push_promise_consumed { |
1469 | panic!("Reference to push promises stream taken!" ); |
1470 | } |
1471 | self.push_promise_consumed = true; |
1472 | PushPromises { |
1473 | inner: self.inner.clone(), |
1474 | } |
1475 | } |
1476 | } |
1477 | |
1478 | // ===== impl PushPromises ===== |
1479 | |
1480 | impl PushPromises { |
1481 | /// Get the next `PushPromise`. |
1482 | pub async fn push_promise(&mut self) -> Option<Result<PushPromise, crate::Error>> { |
1483 | futures_util::future::poll_fn(move |cx| self.poll_push_promise(cx)).await |
1484 | } |
1485 | |
#[doc(hidden)]
1487 | pub fn poll_push_promise( |
1488 | &mut self, |
1489 | cx: &mut Context<'_>, |
1490 | ) -> Poll<Option<Result<PushPromise, crate::Error>>> { |
1491 | match self.inner.poll_pushed(cx) { |
1492 | Poll::Ready(Some(Ok((request, response)))) => { |
1493 | let response = PushedResponseFuture { |
1494 | inner: ResponseFuture { |
1495 | inner: response, |
1496 | push_promise_consumed: false, |
1497 | }, |
1498 | }; |
1499 | Poll::Ready(Some(Ok(PushPromise { request, response }))) |
1500 | } |
1501 | Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e.into()))), |
1502 | Poll::Ready(None) => Poll::Ready(None), |
1503 | Poll::Pending => Poll::Pending, |
1504 | } |
1505 | } |
1506 | } |
1507 | |
#[cfg(feature = "stream")]
1509 | impl futures_core::Stream for PushPromises { |
1510 | type Item = Result<PushPromise, crate::Error>; |
1511 | |
1512 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
1513 | self.poll_push_promise(cx) |
1514 | } |
1515 | } |
1516 | |
1517 | // ===== impl PushPromise ===== |
1518 | |
1519 | impl PushPromise { |
1520 | /// Returns a reference to the push promise's request headers. |
1521 | pub fn request(&self) -> &Request<()> { |
1522 | &self.request |
1523 | } |
1524 | |
1525 | /// Returns a mutable reference to the push promise's request headers. |
1526 | pub fn request_mut(&mut self) -> &mut Request<()> { |
1527 | &mut self.request |
1528 | } |
1529 | |
1530 | /// Consumes `self`, returning the push promise's request headers and |
1531 | /// response future. |
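///
/// # Examples
///
/// A sketch of consuming a promise and reading the pushed response body;
/// `promise` is assumed to have been yielded by `PushPromises::push_promise`:
///
/// ```rust
/// # async fn doc(promise: h2::client::PushPromise) -> Result<(), h2::Error> {
/// let (request, pushed_response) = promise.into_parts();
/// println!("promised: {} {}", request.method(), request.uri());
///
/// let response = pushed_response.await?;
/// let mut body = response.into_body();
/// // Read DATA frames as they arrive (flow-control capacity release omitted).
/// while let Some(chunk) = body.data().await {
///     println!("received {} bytes", chunk?.len());
/// }
/// # Ok(())
/// # }
/// ```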
1532 | pub fn into_parts(self) -> (Request<()>, PushedResponseFuture) { |
1533 | (self.request, self.response) |
1534 | } |
1535 | } |
1536 | |
1537 | // ===== impl PushedResponseFuture ===== |
1538 | |
1539 | impl Future for PushedResponseFuture { |
1540 | type Output = Result<Response<RecvStream>, crate::Error>; |
1541 | |
1542 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
1543 | Pin::new(&mut self.inner).poll(cx) |
1544 | } |
1545 | } |
1546 | |
1547 | impl PushedResponseFuture { |
1548 | /// Returns the stream ID of the response stream. |
1549 | /// |
1550 | /// # Panics |
1551 | /// |
1552 | /// If the lock on the stream store has been poisoned. |
1553 | pub fn stream_id(&self) -> crate::StreamId { |
1554 | self.inner.stream_id() |
1555 | } |
1556 | } |
1557 | |
1558 | // ===== impl Peer ===== |
1559 | |
1560 | impl Peer { |
1561 | pub fn convert_send_message( |
1562 | id: StreamId, |
1563 | request: Request<()>, |
1564 | protocol: Option<Protocol>, |
1565 | end_of_stream: bool, |
1566 | ) -> Result<Headers, SendError> { |
1567 | use http::request::Parts; |
1568 | |
1569 | let ( |
1570 | Parts { |
1571 | method, |
1572 | uri, |
1573 | headers, |
1574 | version, |
1575 | .. |
1576 | }, |
1577 | _, |
1578 | ) = request.into_parts(); |
1579 | |
1580 | let is_connect = method == Method::CONNECT; |
1581 | |
// Build the pseudo header set. All requests will include `method`
// and `path`.
1584 | let mut pseudo = Pseudo::request(method, uri, protocol); |
1585 | |
1586 | if pseudo.scheme.is_none() { |
// If the scheme is not set, then there are two options.
1588 | // |
1589 | // 1) Authority is not set. In this case, a request was issued with |
1590 | // a relative URI. This is permitted **only** when forwarding |
1591 | // HTTP 1.x requests. If the HTTP version is set to 2.0, then |
1592 | // this is an error. |
1593 | // |
// 2) Authority is set, in which case the HTTP method *must* be
//    CONNECT.
1595 | // |
1596 | // It is not possible to have a scheme but not an authority set (the |
1597 | // `http` crate does not allow it). |
1598 | // |
1599 | if pseudo.authority.is_none() { |
1600 | if version == Version::HTTP_2 { |
1601 | return Err(UserError::MissingUriSchemeAndAuthority.into()); |
1602 | } else { |
1603 | // This is acceptable as per the above comment. However, |
1604 | // HTTP/2 requires that a scheme is set. Since we are |
1605 | // forwarding an HTTP 1.1 request, the scheme is set to |
1606 | // "http". |
1607 | pseudo.set_scheme(uri::Scheme::HTTP); |
1608 | } |
1609 | } else if !is_connect { |
1610 | // TODO: Error |
1611 | } |
1612 | } |
1613 | |
1614 | // Create the HEADERS frame |
1615 | let mut frame = Headers::new(id, pseudo, headers); |
1616 | |
1617 | if end_of_stream { |
1618 | frame.set_end_stream() |
1619 | } |
1620 | |
1621 | Ok(frame) |
1622 | } |
1623 | } |
1624 | |
1625 | impl proto::Peer for Peer { |
1626 | type Poll = Response<()>; |
1627 | |
const NAME: &'static str = "Client";
1629 | |
1630 | fn r#dyn() -> proto::DynPeer { |
1631 | proto::DynPeer::Client |
1632 | } |
1633 | |
1634 | fn is_server() -> bool { |
1635 | false |
1636 | } |
1637 | |
1638 | fn convert_poll_message( |
1639 | pseudo: Pseudo, |
1640 | fields: HeaderMap, |
1641 | stream_id: StreamId, |
1642 | ) -> Result<Self::Poll, Error> { |
1643 | let mut b = Response::builder(); |
1644 | |
1645 | b = b.version(Version::HTTP_2); |
1646 | |
1647 | if let Some(status) = pseudo.status { |
1648 | b = b.status(status); |
1649 | } |
1650 | |
1651 | let mut response = match b.body(()) { |
1652 | Ok(response) => response, |
1653 | Err(_) => { |
// TODO: Should there be more specialized handling for different
// kinds of errors?
1656 | return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR)); |
1657 | } |
1658 | }; |
1659 | |
1660 | *response.headers_mut() = fields; |
1661 | |
1662 | Ok(response) |
1663 | } |
1664 | } |
1665 | |