1 | //! Client implementation of the HTTP/2 protocol. |
2 | //! |
3 | //! # Getting started |
4 | //! |
5 | //! Running an HTTP/2 client requires the caller to establish the underlying |
6 | //! connection as well as get the connection to a state that is ready to begin |
7 | //! the HTTP/2 handshake. See [here](../index.html#handshake) for more |
8 | //! details. |
9 | //! |
10 | //! This could be as basic as using Tokio's [`TcpStream`] to connect to a remote |
11 | //! host, but usually it means using either ALPN or HTTP/1.1 protocol upgrades. |
12 | //! |
13 | //! Once a connection is obtained, it is passed to [`handshake`], which will |
14 | //! begin the [HTTP/2 handshake]. This returns a future that completes once |
15 | //! the handshake process is performed and HTTP/2 streams may be initialized. |
16 | //! |
17 | //! [`handshake`] uses default configuration values. There are a number of |
18 | //! settings that can be changed by using [`Builder`] instead. |
19 | //! |
20 | //! Once the handshake future completes, the caller is provided with a |
21 | //! [`Connection`] instance and a [`SendRequest`] instance. The [`Connection`] |
22 | //! instance is used to drive the connection (see [Managing the connection]). |
23 | //! The [`SendRequest`] instance is used to initialize new streams (see [Making |
24 | //! requests]). |
25 | //! |
26 | //! # Making requests |
27 | //! |
28 | //! Requests are made using the [`SendRequest`] handle provided by the handshake |
29 | //! future. Once a request is submitted, an HTTP/2 stream is initialized and |
30 | //! the request is sent to the server. |
31 | //! |
//! A request body and request trailers are sent using [`SendStream`], and the
//! server's response is returned once the [`ResponseFuture`] future completes.
34 | //! Both the [`SendStream`] and [`ResponseFuture`] instances are returned by |
35 | //! [`SendRequest::send_request`] and are tied to the HTTP/2 stream |
36 | //! initialized by the sent request. |
37 | //! |
38 | //! The [`SendRequest::poll_ready`] function returns `Ready` when a new HTTP/2 |
39 | //! stream can be created, i.e. as long as the current number of active streams |
40 | //! is below [`MAX_CONCURRENT_STREAMS`]. If a new stream cannot be created, the |
//! caller is notified once an existing stream closes, freeing capacity. The
//! caller should use [`SendRequest::poll_ready`] to check for capacity before
//! sending a request to the server.
44 | //! |
45 | //! [`SendRequest`] enforces the [`MAX_CONCURRENT_STREAMS`] setting. The user |
46 | //! must not send a request if `poll_ready` does not return `Ready`. Attempting |
47 | //! to do so will result in an [`Error`] being returned. |
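//!
//! For example, a minimal sketch of sending several requests while checking
//! readiness before each one (assuming `send_request` was obtained from a
//! completed handshake; the URIs are illustrative):
//!
//! ```rust
//! # use h2::client::SendRequest;
//! # use http::Request;
//! # async fn doc(mut send_request: SendRequest<&'static [u8]>) -> Result<(), h2::Error> {
//! for uri in ["https://www.example.com/a", "https://www.example.com/b"] {
//!     // Wait until a new HTTP/2 stream may be opened.
//!     send_request = send_request.ready().await?;
//!     let request = Request::get(uri).body(()).unwrap();
//!     let (response, _) = send_request.send_request(request, true)?;
//!     // Await the response (requires the connection to be driven elsewhere).
//!     let _ = response.await?;
//! }
//! # Ok(())
//! # }
//! ```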
48 | //! |
49 | //! # Managing the connection |
50 | //! |
51 | //! The [`Connection`] instance is used to manage connection state. The caller |
52 | //! is required to call [`Connection::poll`] in order to advance state. |
53 | //! [`SendRequest::send_request`] and other functions have no effect unless |
54 | //! [`Connection::poll`] is called. |
55 | //! |
56 | //! The [`Connection`] instance should only be dropped once [`Connection::poll`] |
57 | //! returns `Ready`. At this point, the underlying socket has been closed and no |
58 | //! further work needs to be done. |
59 | //! |
60 | //! The easiest way to ensure that the [`Connection`] instance gets polled is to |
61 | //! submit the [`Connection`] instance to an [executor]. The executor will then |
62 | //! manage polling the connection until the connection is complete. |
63 | //! Alternatively, the caller can call `poll` manually. |
64 | //! |
65 | //! # Example |
66 | //! |
67 | //! ```rust, no_run |
68 | //! |
69 | //! use h2::client; |
70 | //! |
71 | //! use http::{Request, Method}; |
72 | //! use std::error::Error; |
73 | //! use tokio::net::TcpStream; |
74 | //! |
75 | //! #[tokio::main] |
76 | //! pub async fn main() -> Result<(), Box<dyn Error>> { |
77 | //! // Establish TCP connection to the server. |
//! let tcp = TcpStream::connect("127.0.0.1:5928").await?;
79 | //! let (h2, connection) = client::handshake(tcp).await?; |
80 | //! tokio::spawn(async move { |
81 | //! connection.await.unwrap(); |
82 | //! }); |
83 | //! |
84 | //! let mut h2 = h2.ready().await?; |
85 | //! // Prepare the HTTP request to send to the server. |
86 | //! let request = Request::builder() |
87 | //! .method(Method::GET) |
//! .uri("https://www.example.com/")
89 | //! .body(()) |
90 | //! .unwrap(); |
91 | //! |
92 | //! // Send the request. The second tuple item allows the caller |
93 | //! // to stream a request body. |
94 | //! let (response, _) = h2.send_request(request, true).unwrap(); |
95 | //! |
96 | //! let (head, mut body) = response.await?.into_parts(); |
97 | //! |
//! println!("Received response: {:?}", head);
99 | //! |
100 | //! // The `flow_control` handle allows the caller to manage |
101 | //! // flow control. |
102 | //! // |
103 | //! // Whenever data is received, the caller is responsible for |
104 | //! // releasing capacity back to the server once it has freed |
105 | //! // the data from memory. |
106 | //! let mut flow_control = body.flow_control().clone(); |
107 | //! |
108 | //! while let Some(chunk) = body.data().await { |
109 | //! let chunk = chunk?; |
//! println!("RX: {:?}", chunk);
111 | //! |
112 | //! // Let the server send more data. |
113 | //! let _ = flow_control.release_capacity(chunk.len()); |
114 | //! } |
115 | //! |
116 | //! Ok(()) |
117 | //! } |
118 | //! ``` |
119 | //! |
120 | //! [`TcpStream`]: https://docs.rs/tokio-core/0.1/tokio_core/net/struct.TcpStream.html |
121 | //! [`handshake`]: fn.handshake.html |
122 | //! [executor]: https://docs.rs/futures/0.1/futures/future/trait.Executor.html |
123 | //! [`SendRequest`]: struct.SendRequest.html |
124 | //! [`SendStream`]: ../struct.SendStream.html |
125 | //! [Making requests]: #making-requests |
126 | //! [Managing the connection]: #managing-the-connection |
127 | //! [`Connection`]: struct.Connection.html |
128 | //! [`Connection::poll`]: struct.Connection.html#method.poll |
129 | //! [`SendRequest::send_request`]: struct.SendRequest.html#method.send_request |
130 | //! [`MAX_CONCURRENT_STREAMS`]: http://httpwg.org/specs/rfc7540.html#SettingValues |
131 | //! [`SendRequest`]: struct.SendRequest.html |
132 | //! [`ResponseFuture`]: struct.ResponseFuture.html |
133 | //! [`SendRequest::poll_ready`]: struct.SendRequest.html#method.poll_ready |
134 | //! [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader |
135 | //! [`Builder`]: struct.Builder.html |
136 | //! [`Error`]: ../struct.Error.html |
137 | |
138 | use crate::codec::{Codec, SendError, UserError}; |
139 | use crate::ext::Protocol; |
140 | use crate::frame::{Headers, Pseudo, Reason, Settings, StreamId}; |
141 | use crate::proto::{self, Error}; |
142 | use crate::{FlowControl, PingPong, RecvStream, SendStream}; |
143 | |
144 | use bytes::{Buf, Bytes}; |
145 | use http::{uri, HeaderMap, Method, Request, Response, Version}; |
146 | use std::fmt; |
147 | use std::future::Future; |
148 | use std::pin::Pin; |
149 | use std::task::{Context, Poll}; |
150 | use std::time::Duration; |
151 | use std::usize; |
152 | use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; |
153 | use tracing::Instrument; |
154 | |
155 | /// Initializes new HTTP/2 streams on a connection by sending a request. |
156 | /// |
157 | /// This type does no work itself. Instead, it is a handle to the inner |
158 | /// connection state held by [`Connection`]. If the associated connection |
159 | /// instance is dropped, all `SendRequest` functions will return [`Error`]. |
160 | /// |
/// [`SendRequest`] instances can be moved to, and used from, tasks or threads
/// other than the one driving their associated [`Connection`]. Internally, there
163 | /// is a buffer used to stage requests before they get written to the |
164 | /// connection. There is no guarantee that requests get written to the |
165 | /// connection in FIFO order as HTTP/2 prioritization logic can play a role. |
166 | /// |
167 | /// [`SendRequest`] implements [`Clone`], enabling the creation of many |
168 | /// instances that are backed by a single connection. |
169 | /// |
170 | /// See [module] level documentation for more details. |
171 | /// |
172 | /// [module]: index.html |
173 | /// [`Connection`]: struct.Connection.html |
174 | /// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html |
175 | /// [`Error`]: ../struct.Error.html |
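///
/// # Examples
///
/// A sketch of sharing one connection between tasks by cloning the handle
/// (error handling is elided; the URI is illustrative):
///
/// ```rust
/// # use h2::client::SendRequest;
/// # use http::Request;
/// # use bytes::Bytes;
/// # async fn doc(send_request: SendRequest<Bytes>) {
/// let cloned = send_request.clone();
/// tokio::spawn(async move {
///     let mut handle = cloned.ready().await.unwrap();
///     let request = Request::get("https://www.example.com/").body(()).unwrap();
///     let (response, _) = handle.send_request(request, true).unwrap();
///     let _response = response.await.unwrap();
/// });
/// # }
/// ```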
176 | pub struct SendRequest<B: Buf> { |
177 | inner: proto::Streams<B, Peer>, |
178 | pending: Option<proto::OpaqueStreamRef>, |
179 | } |
180 | |
181 | /// Returns a `SendRequest` instance once it is ready to send at least one |
182 | /// request. |
#[derive(Debug)]
184 | pub struct ReadySendRequest<B: Buf> { |
185 | inner: Option<SendRequest<B>>, |
186 | } |
187 | |
188 | /// Manages all state associated with an HTTP/2 client connection. |
189 | /// |
190 | /// A `Connection` is backed by an I/O resource (usually a TCP socket) and |
191 | /// implements the HTTP/2 client logic for that connection. It is responsible |
192 | /// for driving the internal state forward, performing the work requested of the |
193 | /// associated handles ([`SendRequest`], [`ResponseFuture`], [`SendStream`], |
194 | /// [`RecvStream`]). |
195 | /// |
196 | /// `Connection` values are created by calling [`handshake`]. Once a |
197 | /// `Connection` value is obtained, the caller must repeatedly call [`poll`] |
198 | /// until `Ready` is returned. The easiest way to do this is to submit the |
199 | /// `Connection` instance to an [executor]. |
200 | /// |
201 | /// [module]: index.html |
202 | /// [`handshake`]: fn.handshake.html |
203 | /// [`SendRequest`]: struct.SendRequest.html |
204 | /// [`ResponseFuture`]: struct.ResponseFuture.html |
205 | /// [`SendStream`]: ../struct.SendStream.html |
206 | /// [`RecvStream`]: ../struct.RecvStream.html |
207 | /// [`poll`]: #method.poll |
208 | /// [executor]: https://docs.rs/futures/0.1/futures/future/trait.Executor.html |
209 | /// |
210 | /// # Examples |
211 | /// |
212 | /// ``` |
213 | /// # use tokio::io::{AsyncRead, AsyncWrite}; |
214 | /// # use h2::client; |
215 | /// # use h2::client::*; |
216 | /// # |
217 | /// # async fn doc<T>(my_io: T) -> Result<(), h2::Error> |
218 | /// # where T: AsyncRead + AsyncWrite + Send + Unpin + 'static, |
219 | /// # { |
220 | /// let (send_request, connection) = client::handshake(my_io).await?; |
221 | /// // Submit the connection handle to an executor. |
/// tokio::spawn(async { connection.await.expect("connection failed"); });
223 | /// |
224 | /// // Now, use `send_request` to initialize HTTP/2 streams. |
225 | /// // ... |
226 | /// # Ok(()) |
227 | /// # } |
228 | /// # |
229 | /// # pub fn main() {} |
230 | /// ``` |
#[must_use = "futures do nothing unless polled"]
232 | pub struct Connection<T, B: Buf = Bytes> { |
233 | inner: proto::Connection<T, Peer, B>, |
234 | } |
235 | |
236 | /// A future of an HTTP response. |
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
239 | pub struct ResponseFuture { |
240 | inner: proto::OpaqueStreamRef, |
241 | push_promise_consumed: bool, |
242 | } |
243 | |
244 | /// A future of a pushed HTTP response. |
245 | /// |
/// We have to differentiate between pushed and non-pushed responses because of the spec
247 | /// <https://httpwg.org/specs/rfc7540.html#PUSH_PROMISE> |
248 | /// > PUSH_PROMISE frames MUST only be sent on a peer-initiated stream |
249 | /// > that is in either the "open" or "half-closed (remote)" state. |
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
252 | pub struct PushedResponseFuture { |
253 | inner: ResponseFuture, |
254 | } |
255 | |
256 | /// A pushed response and corresponding request headers |
#[derive(Debug)]
258 | pub struct PushPromise { |
259 | /// The request headers |
260 | request: Request<()>, |
261 | |
262 | /// The pushed response |
263 | response: PushedResponseFuture, |
264 | } |
265 | |
266 | /// A stream of pushed responses and corresponding promised requests |
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
269 | pub struct PushPromises { |
270 | inner: proto::OpaqueStreamRef, |
271 | } |
272 | |
273 | /// Builds client connections with custom configuration values. |
274 | /// |
275 | /// Methods can be chained in order to set the configuration values. |
276 | /// |
277 | /// The client is constructed by calling [`handshake`] and passing the I/O |
/// handle that will back the HTTP/2 connection.
279 | /// |
280 | /// New instances of `Builder` are obtained via [`Builder::new`]. |
281 | /// |
282 | /// See function level documentation for details on the various client |
283 | /// configuration settings. |
284 | /// |
285 | /// [`Builder::new`]: struct.Builder.html#method.new |
286 | /// [`handshake`]: struct.Builder.html#method.handshake |
287 | /// |
288 | /// # Examples |
289 | /// |
290 | /// ``` |
291 | /// # use tokio::io::{AsyncRead, AsyncWrite}; |
292 | /// # use h2::client::*; |
293 | /// # use bytes::Bytes; |
294 | /// # |
295 | /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) |
/// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
297 | /// # { |
298 | /// // `client_fut` is a future representing the completion of the HTTP/2 |
299 | /// // handshake. |
300 | /// let client_fut = Builder::new() |
301 | /// .initial_window_size(1_000_000) |
302 | /// .max_concurrent_streams(1000) |
303 | /// .handshake(my_io); |
304 | /// # client_fut.await |
305 | /// # } |
306 | /// # |
307 | /// # pub fn main() {} |
308 | /// ``` |
#[derive(Clone, Debug)]
310 | pub struct Builder { |
311 | /// Time to keep locally reset streams around before reaping. |
312 | reset_stream_duration: Duration, |
313 | |
314 | /// Initial maximum number of locally initiated (send) streams. |
315 | /// After receiving a Settings frame from the remote peer, |
316 | /// the connection will overwrite this value with the |
317 | /// MAX_CONCURRENT_STREAMS specified in the frame. |
318 | initial_max_send_streams: usize, |
319 | |
320 | /// Initial target window size for new connections. |
321 | initial_target_connection_window_size: Option<u32>, |
322 | |
323 | /// Maximum amount of bytes to "buffer" for writing per stream. |
324 | max_send_buffer_size: usize, |
325 | |
326 | /// Maximum number of locally reset streams to keep at a time. |
327 | reset_stream_max: usize, |
328 | |
329 | /// Maximum number of remotely reset streams to allow in the pending |
330 | /// accept queue. |
331 | pending_accept_reset_stream_max: usize, |
332 | |
333 | /// Initial `Settings` frame to send as part of the handshake. |
334 | settings: Settings, |
335 | |
336 | /// The stream ID of the first (lowest) stream. Subsequent streams will use |
337 | /// monotonically increasing stream IDs. |
338 | stream_id: StreamId, |
339 | } |
340 | |
#[derive(Debug)]
342 | pub(crate) struct Peer; |
343 | |
344 | // ===== impl SendRequest ===== |
345 | |
346 | impl<B> SendRequest<B> |
347 | where |
348 | B: Buf, |
349 | { |
350 | /// Returns `Ready` when the connection can initialize a new HTTP/2 |
351 | /// stream. |
352 | /// |
353 | /// This function must return `Ready` before `send_request` is called. When |
354 | /// `Poll::Pending` is returned, the task will be notified once the readiness |
355 | /// state changes. |
356 | /// |
357 | /// See [module] level docs for more details. |
358 | /// |
359 | /// [module]: index.html |
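///
/// # Examples
///
/// A minimal sketch of driving `poll_ready` directly with
/// `std::future::poll_fn`; most callers can use [`ready`] instead.
///
/// ```rust
/// # use h2::client::SendRequest;
/// # async fn doc(mut send_request: SendRequest<&'static [u8]>) -> Result<(), h2::Error> {
/// // Wait until a new HTTP/2 stream may be opened.
/// std::future::poll_fn(|cx| send_request.poll_ready(cx)).await?;
/// # Ok(())
/// # }
/// ```
///
/// [`ready`]: #method.ready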
360 | pub fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>> { |
361 | ready!(self.inner.poll_pending_open(cx, self.pending.as_ref()))?; |
362 | self.pending = None; |
363 | Poll::Ready(Ok(())) |
364 | } |
365 | |
366 | /// Consumes `self`, returning a future that returns `self` back once it is |
367 | /// ready to send a request. |
368 | /// |
369 | /// This function should be called before calling `send_request`. |
370 | /// |
371 | /// This is a functional combinator for [`poll_ready`]. The returned future |
/// will call [`poll_ready`] until it returns `Ready`, then returns `self` to
/// the caller.
374 | /// |
375 | /// # Examples |
376 | /// |
377 | /// ```rust |
378 | /// # use h2::client::*; |
379 | /// # use http::*; |
380 | /// # async fn doc(send_request: SendRequest<&'static [u8]>) |
381 | /// # { |
382 | /// // First, wait until the `send_request` handle is ready to send a new |
383 | /// // request |
384 | /// let mut send_request = send_request.ready().await.unwrap(); |
385 | /// // Use `send_request` here. |
386 | /// # } |
387 | /// # pub fn main() {} |
388 | /// ``` |
389 | /// |
390 | /// See [module] level docs for more details. |
391 | /// |
392 | /// [`poll_ready`]: #method.poll_ready |
393 | /// [module]: index.html |
394 | pub fn ready(self) -> ReadySendRequest<B> { |
395 | ReadySendRequest { inner: Some(self) } |
396 | } |
397 | |
/// Sends an HTTP/2 request to the server.
399 | /// |
400 | /// `send_request` initializes a new HTTP/2 stream on the associated |
401 | /// connection, then sends the given request using this new stream. Only the |
402 | /// request head is sent. |
403 | /// |
404 | /// On success, a [`ResponseFuture`] instance and [`SendStream`] instance |
405 | /// are returned. The [`ResponseFuture`] instance is used to get the |
406 | /// server's response and the [`SendStream`] instance is used to send a |
407 | /// request body or trailers to the server over the same HTTP/2 stream. |
408 | /// |
409 | /// To send a request body or trailers, set `end_of_stream` to `false`. |
410 | /// Then, use the returned [`SendStream`] instance to stream request body |
/// chunks or send trailers. If `end_of_stream` is set to `true`, then
/// attempting to call [`SendStream::send_data`] or
/// [`SendStream::send_trailers`] will result in an error.
414 | /// |
415 | /// If no request body or trailers are to be sent, set `end_of_stream` to |
416 | /// `true` and drop the returned [`SendStream`] instance. |
417 | /// |
418 | /// # A note on HTTP versions |
419 | /// |
420 | /// The provided `Request` will be encoded differently depending on the |
/// value of its version field. If the version is set to 2.0, then the
/// request is encoded as the specification recommends.
423 | /// |
424 | /// If the version is set to a lower value, then the request is encoded to |
425 | /// preserve the characteristics of HTTP 1.1 and lower. Specifically, host |
426 | /// headers are permitted and the `:authority` pseudo header is not |
427 | /// included. |
428 | /// |
429 | /// The caller should always set the request's version field to 2.0 unless |
430 | /// specifically transmitting an HTTP 1.1 request over 2.0. |
431 | /// |
432 | /// # Examples |
433 | /// |
434 | /// Sending a request with no body |
435 | /// |
436 | /// ```rust |
437 | /// # use h2::client::*; |
438 | /// # use http::*; |
439 | /// # async fn doc(send_request: SendRequest<&'static [u8]>) |
440 | /// # { |
441 | /// // First, wait until the `send_request` handle is ready to send a new |
442 | /// // request |
443 | /// let mut send_request = send_request.ready().await.unwrap(); |
444 | /// // Prepare the HTTP request to send to the server. |
/// let request = Request::get("https://www.example.com/")
446 | /// .body(()) |
447 | /// .unwrap(); |
448 | /// |
449 | /// // Send the request to the server. Since we are not sending a |
450 | /// // body or trailers, we can drop the `SendStream` instance. |
451 | /// let (response, _) = send_request.send_request(request, true).unwrap(); |
452 | /// let response = response.await.unwrap(); |
453 | /// // Process the response |
454 | /// # } |
455 | /// # pub fn main() {} |
456 | /// ``` |
457 | /// |
458 | /// Sending a request with a body and trailers |
459 | /// |
460 | /// ```rust |
461 | /// # use h2::client::*; |
462 | /// # use http::*; |
463 | /// # async fn doc(send_request: SendRequest<&'static [u8]>) |
464 | /// # { |
465 | /// // First, wait until the `send_request` handle is ready to send a new |
466 | /// // request |
467 | /// let mut send_request = send_request.ready().await.unwrap(); |
468 | /// |
469 | /// // Prepare the HTTP request to send to the server. |
/// let request = Request::get("https://www.example.com/")
471 | /// .body(()) |
472 | /// .unwrap(); |
473 | /// |
/// // Send the request to the server. Since we are sending a body
/// // and trailers, we keep the `SendStream` instance.
476 | /// let (response, mut send_stream) = send_request |
477 | /// .send_request(request, false).unwrap(); |
478 | /// |
479 | /// // At this point, one option would be to wait for send capacity. |
480 | /// // Doing so would allow us to not hold data in memory that |
481 | /// // cannot be sent. However, this is not a requirement, so this |
482 | /// // example will skip that step. See `SendStream` documentation |
483 | /// // for more details. |
/// send_stream.send_data(b"hello", false).unwrap();
/// send_stream.send_data(b"world", false).unwrap();
486 | /// |
487 | /// // Send the trailers. |
488 | /// let mut trailers = HeaderMap::new(); |
489 | /// trailers.insert( |
/// header::HeaderName::from_bytes(b"my-trailer").unwrap(),
/// header::HeaderValue::from_bytes(b"hello").unwrap());
492 | /// |
493 | /// send_stream.send_trailers(trailers).unwrap(); |
494 | /// |
495 | /// let response = response.await.unwrap(); |
496 | /// // Process the response |
497 | /// # } |
498 | /// # pub fn main() {} |
499 | /// ``` |
500 | /// |
501 | /// [`ResponseFuture`]: struct.ResponseFuture.html |
502 | /// [`SendStream`]: ../struct.SendStream.html |
503 | /// [`SendStream::send_data`]: ../struct.SendStream.html#method.send_data |
504 | /// [`SendStream::send_trailers`]: ../struct.SendStream.html#method.send_trailers |
505 | pub fn send_request( |
506 | &mut self, |
507 | request: Request<()>, |
508 | end_of_stream: bool, |
509 | ) -> Result<(ResponseFuture, SendStream<B>), crate::Error> { |
510 | self.inner |
511 | .send_request(request, end_of_stream, self.pending.as_ref()) |
512 | .map_err(Into::into) |
513 | .map(|(stream, is_full)| { |
514 | if stream.is_pending_open() && is_full { |
// Only prevent sending another request when the request queue
// is full.
517 | self.pending = Some(stream.clone_to_opaque()); |
518 | } |
519 | |
520 | let response = ResponseFuture { |
521 | inner: stream.clone_to_opaque(), |
522 | push_promise_consumed: false, |
523 | }; |
524 | |
525 | let stream = SendStream::new(stream); |
526 | |
527 | (response, stream) |
528 | }) |
529 | } |
530 | |
531 | /// Returns whether the [extended CONNECT protocol][1] is enabled or not. |
532 | /// |
533 | /// This setting is configured by the server peer by sending the |
534 | /// [`SETTINGS_ENABLE_CONNECT_PROTOCOL` parameter][2] in a `SETTINGS` frame. |
535 | /// This method returns the currently acknowledged value received from the |
536 | /// remote. |
537 | /// |
538 | /// [1]: https://datatracker.ietf.org/doc/html/rfc8441#section-4 |
539 | /// [2]: https://datatracker.ietf.org/doc/html/rfc8441#section-3 |
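///
/// # Examples
///
/// A sketch of checking the flag before attempting an extended CONNECT
/// request:
///
/// ```rust
/// # use h2::client::SendRequest;
/// # fn doc(send_request: SendRequest<&'static [u8]>) {
/// if send_request.is_extended_connect_protocol_enabled() {
///     // The server supports RFC 8441 extended CONNECT, so requests
///     // using the `:protocol` pseudo-header may be sent.
/// }
/// # }
/// ```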
540 | pub fn is_extended_connect_protocol_enabled(&self) -> bool { |
541 | self.inner.is_extended_connect_protocol_enabled() |
542 | } |
543 | } |
544 | |
545 | impl<B> fmt::Debug for SendRequest<B> |
546 | where |
547 | B: Buf, |
548 | { |
549 | fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { |
fmt.debug_struct("SendRequest").finish()
551 | } |
552 | } |
553 | |
554 | impl<B> Clone for SendRequest<B> |
555 | where |
556 | B: Buf, |
557 | { |
558 | fn clone(&self) -> Self { |
559 | SendRequest { |
560 | inner: self.inner.clone(), |
561 | pending: None, |
562 | } |
563 | } |
564 | } |
565 | |
#[cfg(feature = "unstable")]
567 | impl<B> SendRequest<B> |
568 | where |
569 | B: Buf, |
570 | { |
571 | /// Returns the number of active streams. |
572 | /// |
573 | /// An active stream is a stream that has not yet transitioned to a closed |
574 | /// state. |
575 | pub fn num_active_streams(&self) -> usize { |
576 | self.inner.num_active_streams() |
577 | } |
578 | |
579 | /// Returns the number of streams that are held in memory. |
580 | /// |
581 | /// A wired stream is a stream that is either active or is closed but must |
582 | /// stay in memory for some reason. For example, there are still outstanding |
583 | /// userspace handles pointing to the slot. |
584 | pub fn num_wired_streams(&self) -> usize { |
585 | self.inner.num_wired_streams() |
586 | } |
587 | } |
588 | |
589 | // ===== impl ReadySendRequest ===== |
590 | |
591 | impl<B> Future for ReadySendRequest<B> |
592 | where |
593 | B: Buf, |
594 | { |
595 | type Output = Result<SendRequest<B>, crate::Error>; |
596 | |
597 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
598 | match &mut self.inner { |
Some(send_request) => {
600 | ready!(send_request.poll_ready(cx))?; |
601 | } |
None => panic!("called `poll` after future completed"),
603 | } |
604 | |
605 | Poll::Ready(Ok(self.inner.take().unwrap())) |
606 | } |
607 | } |
608 | |
609 | // ===== impl Builder ===== |
610 | |
611 | impl Builder { |
612 | /// Returns a new client builder instance initialized with default |
613 | /// configuration values. |
614 | /// |
615 | /// Configuration methods can be chained on the return value. |
616 | /// |
617 | /// # Examples |
618 | /// |
619 | /// ``` |
620 | /// # use tokio::io::{AsyncRead, AsyncWrite}; |
621 | /// # use h2::client::*; |
622 | /// # use bytes::Bytes; |
623 | /// # |
624 | /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) |
625 | /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error> |
626 | /// # { |
627 | /// // `client_fut` is a future representing the completion of the HTTP/2 |
628 | /// // handshake. |
629 | /// let client_fut = Builder::new() |
630 | /// .initial_window_size(1_000_000) |
631 | /// .max_concurrent_streams(1000) |
632 | /// .handshake(my_io); |
633 | /// # client_fut.await |
634 | /// # } |
635 | /// # |
636 | /// # pub fn main() {} |
637 | /// ``` |
638 | pub fn new() -> Builder { |
639 | Builder { |
640 | max_send_buffer_size: proto::DEFAULT_MAX_SEND_BUFFER_SIZE, |
641 | reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS), |
642 | reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX, |
643 | pending_accept_reset_stream_max: proto::DEFAULT_REMOTE_RESET_STREAM_MAX, |
644 | initial_target_connection_window_size: None, |
645 | initial_max_send_streams: usize::MAX, |
646 | settings: Default::default(), |
647 | stream_id: 1.into(), |
648 | } |
649 | } |
650 | |
651 | /// Indicates the initial window size (in octets) for stream-level |
652 | /// flow control for received data. |
653 | /// |
654 | /// The initial window of a stream is used as part of flow control. For more |
655 | /// details, see [`FlowControl`]. |
656 | /// |
657 | /// The default value is 65,535. |
658 | /// |
659 | /// [`FlowControl`]: ../struct.FlowControl.html |
660 | /// |
661 | /// # Examples |
662 | /// |
663 | /// ``` |
664 | /// # use tokio::io::{AsyncRead, AsyncWrite}; |
665 | /// # use h2::client::*; |
666 | /// # use bytes::Bytes; |
667 | /// # |
668 | /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) |
669 | /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error> |
670 | /// # { |
671 | /// // `client_fut` is a future representing the completion of the HTTP/2 |
672 | /// // handshake. |
673 | /// let client_fut = Builder::new() |
674 | /// .initial_window_size(1_000_000) |
675 | /// .handshake(my_io); |
676 | /// # client_fut.await |
677 | /// # } |
678 | /// # |
679 | /// # pub fn main() {} |
680 | /// ``` |
681 | pub fn initial_window_size(&mut self, size: u32) -> &mut Self { |
682 | self.settings.set_initial_window_size(Some(size)); |
683 | self |
684 | } |
685 | |
686 | /// Indicates the initial window size (in octets) for connection-level flow control |
687 | /// for received data. |
688 | /// |
689 | /// The initial window of a connection is used as part of flow control. For more details, |
690 | /// see [`FlowControl`]. |
691 | /// |
692 | /// The default value is 65,535. |
693 | /// |
694 | /// [`FlowControl`]: ../struct.FlowControl.html |
695 | /// |
696 | /// # Examples |
697 | /// |
698 | /// ``` |
699 | /// # use tokio::io::{AsyncRead, AsyncWrite}; |
700 | /// # use h2::client::*; |
701 | /// # use bytes::Bytes; |
702 | /// # |
703 | /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) |
704 | /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error> |
705 | /// # { |
706 | /// // `client_fut` is a future representing the completion of the HTTP/2 |
707 | /// // handshake. |
708 | /// let client_fut = Builder::new() |
709 | /// .initial_connection_window_size(1_000_000) |
710 | /// .handshake(my_io); |
711 | /// # client_fut.await |
712 | /// # } |
713 | /// # |
714 | /// # pub fn main() {} |
715 | /// ``` |
716 | pub fn initial_connection_window_size(&mut self, size: u32) -> &mut Self { |
717 | self.initial_target_connection_window_size = Some(size); |
718 | self |
719 | } |
720 | |
721 | /// Indicates the size (in octets) of the largest HTTP/2 frame payload that the |
722 | /// configured client is able to accept. |
723 | /// |
724 | /// The sender may send data frames that are **smaller** than this value, |
725 | /// but any data larger than `max` will be broken up into multiple `DATA` |
726 | /// frames. |
727 | /// |
728 | /// The value **must** be between 16,384 and 16,777,215. The default value is 16,384. |
729 | /// |
730 | /// # Examples |
731 | /// |
732 | /// ``` |
733 | /// # use tokio::io::{AsyncRead, AsyncWrite}; |
734 | /// # use h2::client::*; |
735 | /// # use bytes::Bytes; |
736 | /// # |
737 | /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) |
738 | /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error> |
739 | /// # { |
740 | /// // `client_fut` is a future representing the completion of the HTTP/2 |
741 | /// // handshake. |
742 | /// let client_fut = Builder::new() |
743 | /// .max_frame_size(1_000_000) |
744 | /// .handshake(my_io); |
745 | /// # client_fut.await |
746 | /// # } |
747 | /// # |
748 | /// # pub fn main() {} |
749 | /// ``` |
750 | /// |
751 | /// # Panics |
752 | /// |
753 | /// This function panics if `max` is not within the legal range specified |
754 | /// above. |
755 | pub fn max_frame_size(&mut self, max: u32) -> &mut Self { |
756 | self.settings.set_max_frame_size(Some(max)); |
757 | self |
758 | } |
759 | |
760 | /// Sets the max size of received header frames. |
761 | /// |
762 | /// This advisory setting informs a peer of the maximum size of header list |
763 | /// that the sender is prepared to accept, in octets. The value is based on |
764 | /// the uncompressed size of header fields, including the length of the name |
765 | /// and value in octets plus an overhead of 32 octets for each header field. |
766 | /// |
767 | /// This setting is also used to limit the maximum amount of data that is |
768 | /// buffered to decode HEADERS frames. |
769 | /// |
770 | /// # Examples |
771 | /// |
772 | /// ``` |
773 | /// # use tokio::io::{AsyncRead, AsyncWrite}; |
774 | /// # use h2::client::*; |
775 | /// # use bytes::Bytes; |
776 | /// # |
777 | /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) |
778 | /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error> |
779 | /// # { |
780 | /// // `client_fut` is a future representing the completion of the HTTP/2 |
781 | /// // handshake. |
782 | /// let client_fut = Builder::new() |
783 | /// .max_header_list_size(16 * 1024) |
784 | /// .handshake(my_io); |
785 | /// # client_fut.await |
786 | /// # } |
787 | /// # |
788 | /// # pub fn main() {} |
789 | /// ``` |
790 | pub fn max_header_list_size(&mut self, max: u32) -> &mut Self { |
791 | self.settings.set_max_header_list_size(Some(max)); |
792 | self |
793 | } |
794 | |
795 | /// Sets the maximum number of concurrent streams. |
796 | /// |
797 | /// The maximum concurrent streams setting only controls the maximum number |
798 | /// of streams that can be initiated by the remote peer. In other words, |
799 | /// when this setting is set to 100, this does not limit the number of |
800 | /// concurrent streams that can be created by the caller. |
801 | /// |
802 | /// It is recommended that this value be no smaller than 100, so as to not |
803 | /// unnecessarily limit parallelism. However, any value is legal, including |
804 | /// 0. If `max` is set to 0, then the remote will not be permitted to |
805 | /// initiate streams. |
806 | /// |
807 | /// Note that streams in the reserved state, i.e., push promises that have |
808 | /// been reserved but the stream has not started, do not count against this |
809 | /// setting. |
810 | /// |
811 | /// Also note that if the remote *does* exceed the value set here, it is not |
812 | /// a protocol level error. Instead, the `h2` library will immediately reset |
813 | /// the stream. |
814 | /// |
815 | /// See [Section 5.1.2] in the HTTP/2 spec for more details. |
816 | /// |
817 | /// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2 |
818 | /// |
819 | /// # Examples |
820 | /// |
821 | /// ``` |
822 | /// # use tokio::io::{AsyncRead, AsyncWrite}; |
823 | /// # use h2::client::*; |
824 | /// # use bytes::Bytes; |
825 | /// # |
826 | /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) |
827 | /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error> |
828 | /// # { |
829 | /// // `client_fut` is a future representing the completion of the HTTP/2 |
830 | /// // handshake. |
831 | /// let client_fut = Builder::new() |
832 | /// .max_concurrent_streams(1000) |
833 | /// .handshake(my_io); |
834 | /// # client_fut.await |
835 | /// # } |
836 | /// # |
837 | /// # pub fn main() {} |
838 | /// ``` |
839 | pub fn max_concurrent_streams(&mut self, max: u32) -> &mut Self { |
840 | self.settings.set_max_concurrent_streams(Some(max)); |
841 | self |
842 | } |
843 | |
/// Sets the initial maximum number of locally initiated (send) streams.
///
/// This value is overwritten by the remote peer when its `SETTINGS` frame is
/// received: the limit then becomes the `MAX_CONCURRENT_STREAMS` value from
/// that frame.
///
/// Until then, this setting caps the number of locally initiated streams
/// that count towards the concurrency limit.
///
/// Opening streams beyond the limit advertised by the peer will be treated
/// as a stream error of type `PROTOCOL_ERROR` or `REFUSED_STREAM`.
855 | /// |
856 | /// See [Section 5.1.2] in the HTTP/2 spec for more details. |
857 | /// |
858 | /// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2 |
859 | /// |
860 | /// # Examples |
861 | /// |
862 | /// ``` |
863 | /// # use tokio::io::{AsyncRead, AsyncWrite}; |
864 | /// # use h2::client::*; |
865 | /// # use bytes::Bytes; |
866 | /// # |
867 | /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) |
868 | /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error> |
869 | /// # { |
870 | /// // `client_fut` is a future representing the completion of the HTTP/2 |
871 | /// // handshake. |
872 | /// let client_fut = Builder::new() |
873 | /// .initial_max_send_streams(1000) |
874 | /// .handshake(my_io); |
875 | /// # client_fut.await |
876 | /// # } |
877 | /// # |
878 | /// # pub fn main() {} |
879 | /// ``` |
880 | pub fn initial_max_send_streams(&mut self, initial: usize) -> &mut Self { |
881 | self.initial_max_send_streams = initial; |
882 | self |
883 | } |
884 | |
885 | /// Sets the maximum number of concurrent locally reset streams. |
886 | /// |
887 | /// When a stream is explicitly reset, the HTTP/2 specification requires |
888 | /// that any further frames received for that stream must be ignored for |
889 | /// "some time". |
890 | /// |
891 | /// In order to satisfy the specification, internal state must be maintained |
892 | /// to implement the behavior. This state grows linearly with the number of |
893 | /// streams that are locally reset. |
894 | /// |
/// The `max_concurrent_reset_streams` setting sets an upper
896 | /// bound on the amount of state that is maintained. When this max value is |
897 | /// reached, the oldest reset stream is purged from memory. |
898 | /// |
899 | /// Once the stream has been fully purged from memory, any additional frames |
900 | /// received for that stream will result in a connection level protocol |
901 | /// error, forcing the connection to terminate. |
902 | /// |
903 | /// The default value is 10. |
904 | /// |
905 | /// # Examples |
906 | /// |
907 | /// ``` |
908 | /// # use tokio::io::{AsyncRead, AsyncWrite}; |
909 | /// # use h2::client::*; |
910 | /// # use bytes::Bytes; |
911 | /// # |
912 | /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) |
913 | /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error> |
914 | /// # { |
915 | /// // `client_fut` is a future representing the completion of the HTTP/2 |
916 | /// // handshake. |
917 | /// let client_fut = Builder::new() |
918 | /// .max_concurrent_reset_streams(1000) |
919 | /// .handshake(my_io); |
920 | /// # client_fut.await |
921 | /// # } |
922 | /// # |
923 | /// # pub fn main() {} |
924 | /// ``` |
925 | pub fn max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self { |
926 | self.reset_stream_max = max; |
927 | self |
928 | } |
929 | |
930 | /// Sets the duration to remember locally reset streams. |
931 | /// |
932 | /// When a stream is explicitly reset, the HTTP/2 specification requires |
933 | /// that any further frames received for that stream must be ignored for |
934 | /// "some time". |
935 | /// |
936 | /// In order to satisfy the specification, internal state must be maintained |
937 | /// to implement the behavior. This state grows linearly with the number of |
938 | /// streams that are locally reset. |
939 | /// |
940 | /// The `reset_stream_duration` setting configures the max amount of time |
941 | /// this state will be maintained in memory. Once the duration elapses, the |
942 | /// stream state is purged from memory. |
943 | /// |
944 | /// Once the stream has been fully purged from memory, any additional frames |
945 | /// received for that stream will result in a connection level protocol |
946 | /// error, forcing the connection to terminate. |
947 | /// |
948 | /// The default value is 30 seconds. |
949 | /// |
950 | /// # Examples |
951 | /// |
952 | /// ``` |
953 | /// # use tokio::io::{AsyncRead, AsyncWrite}; |
954 | /// # use h2::client::*; |
955 | /// # use std::time::Duration; |
956 | /// # use bytes::Bytes; |
957 | /// # |
958 | /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) |
959 | /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error> |
960 | /// # { |
961 | /// // `client_fut` is a future representing the completion of the HTTP/2 |
962 | /// // handshake. |
963 | /// let client_fut = Builder::new() |
964 | /// .reset_stream_duration(Duration::from_secs(10)) |
965 | /// .handshake(my_io); |
966 | /// # client_fut.await |
967 | /// # } |
968 | /// # |
969 | /// # pub fn main() {} |
970 | /// ``` |
971 | pub fn reset_stream_duration(&mut self, dur: Duration) -> &mut Self { |
972 | self.reset_stream_duration = dur; |
973 | self |
974 | } |
975 | |
976 | /// Sets the maximum number of pending-accept remotely-reset streams. |
977 | /// |
/// Streams that have been received from the peer, but not accepted by the
979 | /// user, can also receive a RST_STREAM. This is a legitimate pattern: one |
980 | /// could send a request and then shortly after, realize it is not needed, |
981 | /// sending a CANCEL. |
982 | /// |
983 | /// However, since those streams are now "closed", they don't count towards |
984 | /// the max concurrent streams. So, they will sit in the accept queue, |
985 | /// using memory. |
986 | /// |
987 | /// When the number of remotely-reset streams sitting in the pending-accept |
988 | /// queue reaches this maximum value, a connection error with the code of |
989 | /// `ENHANCE_YOUR_CALM` will be sent to the peer, and returned by the |
990 | /// `Future`. |
991 | /// |
992 | /// The default value is currently 20, but could change. |
993 | /// |
994 | /// # Examples |
995 | /// |
996 | /// ``` |
997 | /// # use tokio::io::{AsyncRead, AsyncWrite}; |
998 | /// # use h2::client::*; |
999 | /// # use bytes::Bytes; |
1000 | /// # |
1001 | /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) |
1002 | /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error> |
1003 | /// # { |
1004 | /// // `client_fut` is a future representing the completion of the HTTP/2 |
1005 | /// // handshake. |
1006 | /// let client_fut = Builder::new() |
1007 | /// .max_pending_accept_reset_streams(100) |
1008 | /// .handshake(my_io); |
1009 | /// # client_fut.await |
1010 | /// # } |
1011 | /// # |
1012 | /// # pub fn main() {} |
1013 | /// ``` |
1014 | pub fn max_pending_accept_reset_streams(&mut self, max: usize) -> &mut Self { |
1015 | self.pending_accept_reset_stream_max = max; |
1016 | self |
1017 | } |
1018 | |
1019 | /// Sets the maximum send buffer size per stream. |
1020 | /// |
1021 | /// Once a stream has buffered up to (or over) the maximum, the stream's |
1022 | /// flow control will not "poll" additional capacity. Once bytes for the |
1023 | /// stream have been written to the connection, the send buffer capacity |
1024 | /// will be freed up again. |
1025 | /// |
1026 | /// The default is currently ~400KB, but may change. |
1027 | /// |
1028 | /// # Panics |
1029 | /// |
1030 | /// This function panics if `max` is larger than `u32::MAX`. |
1031 | pub fn max_send_buffer_size(&mut self, max: usize) -> &mut Self { |
1032 | assert!(max <= std::u32::MAX as usize); |
1033 | self.max_send_buffer_size = max; |
1034 | self |
1035 | } |
1036 | |
1037 | /// Enables or disables server push promises. |
1038 | /// |
1039 | /// This value is included in the initial SETTINGS handshake. |
/// Setting this value to `false` in the initial SETTINGS handshake guarantees
/// that the remote server will never send a push promise.
1043 | /// |
1044 | /// This setting can be changed during the life of a single HTTP/2 |
1045 | /// connection by sending another settings frame updating the value. |
1046 | /// |
1047 | /// Default value: `true`. |
1048 | /// |
1049 | /// # Examples |
1050 | /// |
1051 | /// ``` |
1052 | /// # use tokio::io::{AsyncRead, AsyncWrite}; |
1053 | /// # use h2::client::*; |
1054 | /// # use std::time::Duration; |
1055 | /// # use bytes::Bytes; |
1056 | /// # |
1057 | /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) |
1058 | /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error> |
1059 | /// # { |
1060 | /// // `client_fut` is a future representing the completion of the HTTP/2 |
1061 | /// // handshake. |
1062 | /// let client_fut = Builder::new() |
1063 | /// .enable_push(false) |
1064 | /// .handshake(my_io); |
1065 | /// # client_fut.await |
1066 | /// # } |
1067 | /// # |
1068 | /// # pub fn main() {} |
1069 | /// ``` |
1070 | pub fn enable_push(&mut self, enabled: bool) -> &mut Self { |
1071 | self.settings.set_enable_push(enabled); |
1072 | self |
1073 | } |
1074 | |
1075 | /// Sets the header table size. |
1076 | /// |
1077 | /// This setting informs the peer of the maximum size of the header compression |
1078 | /// table used to encode header blocks, in octets. The encoder may select any value |
1079 | /// equal to or less than the header table size specified by the sender. |
1080 | /// |
1081 | /// The default value is 4,096. |
1082 | /// |
1083 | /// # Examples |
1084 | /// |
1085 | /// ``` |
1086 | /// # use tokio::io::{AsyncRead, AsyncWrite}; |
1087 | /// # use h2::client::*; |
1088 | /// # use bytes::Bytes; |
1089 | /// # |
1090 | /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) |
1091 | /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error> |
1092 | /// # { |
1093 | /// // `client_fut` is a future representing the completion of the HTTP/2 |
1094 | /// // handshake. |
1095 | /// let client_fut = Builder::new() |
1096 | /// .header_table_size(1_000_000) |
1097 | /// .handshake(my_io); |
1098 | /// # client_fut.await |
1099 | /// # } |
1100 | /// # |
1101 | /// # pub fn main() {} |
1102 | /// ``` |
1103 | pub fn header_table_size(&mut self, size: u32) -> &mut Self { |
1104 | self.settings.set_header_table_size(Some(size)); |
1105 | self |
1106 | } |
1107 | |
1108 | /// Sets the first stream ID to something other than 1. |
#[cfg(feature = "unstable")]
1110 | pub fn initial_stream_id(&mut self, stream_id: u32) -> &mut Self { |
1111 | self.stream_id = stream_id.into(); |
1112 | assert!( |
1113 | self.stream_id.is_client_initiated(), |
1114 | "stream id must be odd" |
1115 | ); |
1116 | self |
1117 | } |
1118 | |
1119 | /// Creates a new configured HTTP/2 client backed by `io`. |
1120 | /// |
1121 | /// It is expected that `io` already be in an appropriate state to commence |
/// the [HTTP/2 handshake]. The handshake is completed once both the connection
/// preface and the initial settings frame have been sent by the client.
1124 | /// |
1125 | /// The handshake future does not wait for the initial settings frame from the |
1126 | /// server. |
1127 | /// |
1128 | /// Returns a future which resolves to the [`Connection`] / [`SendRequest`] |
1129 | /// tuple once the HTTP/2 handshake has been completed. |
1130 | /// |
1131 | /// This function also allows the caller to configure the send payload data |
1132 | /// type. See [Outbound data type] for more details. |
1133 | /// |
1134 | /// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader |
1135 | /// [`Connection`]: struct.Connection.html |
1136 | /// [`SendRequest`]: struct.SendRequest.html |
/// [Outbound data type]: ../index.html#outbound-data-type
1138 | /// |
1139 | /// # Examples |
1140 | /// |
1141 | /// Basic usage: |
1142 | /// |
1143 | /// ``` |
1144 | /// # use tokio::io::{AsyncRead, AsyncWrite}; |
1145 | /// # use h2::client::*; |
1146 | /// # use bytes::Bytes; |
1147 | /// # |
1148 | /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) |
/// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
1150 | /// # { |
1151 | /// // `client_fut` is a future representing the completion of the HTTP/2 |
1152 | /// // handshake. |
1153 | /// let client_fut = Builder::new() |
1154 | /// .handshake(my_io); |
1155 | /// # client_fut.await |
1156 | /// # } |
1157 | /// # |
1158 | /// # pub fn main() {} |
1159 | /// ``` |
1160 | /// |
1161 | /// Configures the send-payload data type. In this case, the outbound data |
1162 | /// type will be `&'static [u8]`. |
1163 | /// |
1164 | /// ``` |
1165 | /// # use tokio::io::{AsyncRead, AsyncWrite}; |
1166 | /// # use h2::client::*; |
1167 | /// # |
1168 | /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) |
1169 | /// # -> Result<((SendRequest<&'static [u8]>, Connection<T, &'static [u8]>)), h2::Error> |
1170 | /// # { |
1171 | /// // `client_fut` is a future representing the completion of the HTTP/2 |
1172 | /// // handshake. |
1173 | /// let client_fut = Builder::new() |
1174 | /// .handshake::<_, &'static [u8]>(my_io); |
1175 | /// # client_fut.await |
1176 | /// # } |
1177 | /// # |
1178 | /// # pub fn main() {} |
1179 | /// ``` |
1180 | pub fn handshake<T, B>( |
1181 | &self, |
1182 | io: T, |
1183 | ) -> impl Future<Output = Result<(SendRequest<B>, Connection<T, B>), crate::Error>> |
1184 | where |
1185 | T: AsyncRead + AsyncWrite + Unpin, |
1186 | B: Buf, |
1187 | { |
1188 | Connection::handshake2(io, self.clone()) |
1189 | } |
1190 | } |
1191 | |
1192 | impl Default for Builder { |
1193 | fn default() -> Builder { |
1194 | Builder::new() |
1195 | } |
1196 | } |
1197 | |
1198 | /// Creates a new configured HTTP/2 client with default configuration |
1199 | /// values backed by `io`. |
1200 | /// |
1201 | /// It is expected that `io` already be in an appropriate state to commence |
1202 | /// the [HTTP/2 handshake]. See [Handshake] for more details. |
1203 | /// |
1204 | /// Returns a future which resolves to the [`Connection`] / [`SendRequest`] |
1205 | /// tuple once the HTTP/2 handshake has been completed. The returned |
1206 | /// [`Connection`] instance will be using default configuration values. Use |
1207 | /// [`Builder`] to customize the configuration values used by a [`Connection`] |
1208 | /// instance. |
1209 | /// |
1210 | /// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader |
1211 | /// [Handshake]: ../index.html#handshake |
1212 | /// [`Connection`]: struct.Connection.html |
1213 | /// [`SendRequest`]: struct.SendRequest.html |
1214 | /// |
1215 | /// # Examples |
1216 | /// |
1217 | /// ``` |
1218 | /// # use tokio::io::{AsyncRead, AsyncWrite}; |
1219 | /// # use h2::client; |
1220 | /// # use h2::client::*; |
1221 | /// # |
1222 | /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) -> Result<(), h2::Error> |
1223 | /// # { |
1224 | /// let (send_request, connection) = client::handshake(my_io).await?; |
1225 | /// // The HTTP/2 handshake has completed, now start polling |
1226 | /// // `connection` and use `send_request` to send requests to the |
1227 | /// // server. |
1228 | /// # Ok(()) |
1229 | /// # } |
1230 | /// # |
1231 | /// # pub fn main() {} |
1232 | /// ``` |
1233 | pub async fn handshake<T>(io: T) -> Result<(SendRequest<Bytes>, Connection<T, Bytes>), crate::Error> |
1234 | where |
1235 | T: AsyncRead + AsyncWrite + Unpin, |
1236 | { |
let builder = Builder::new();
builder
    .handshake(io)
    .instrument(tracing::trace_span!("client_handshake"))
    .await
1242 | } |
1243 | |
1244 | // ===== impl Connection ===== |
1245 | |
1246 | async fn bind_connection<T>(io: &mut T) -> Result<(), crate::Error> |
1247 | where |
1248 | T: AsyncRead + AsyncWrite + Unpin, |
1249 | { |
tracing::debug!("binding client connection");
1251 | |
let msg: &'static [u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
io.write_all(msg).await.map_err(crate::Error::from_io)?;
1254 | |
tracing::debug!("client connection bound");
1256 | |
1257 | Ok(()) |
1258 | } |
1259 | |
1260 | impl<T, B> Connection<T, B> |
1261 | where |
1262 | T: AsyncRead + AsyncWrite + Unpin, |
1263 | B: Buf, |
1264 | { |
1265 | async fn handshake2( |
1266 | mut io: T, |
1267 | builder: Builder, |
1268 | ) -> Result<(SendRequest<B>, Connection<T, B>), crate::Error> { |
1269 | bind_connection(&mut io).await?; |
1270 | |
1271 | // Create the codec |
1272 | let mut codec = Codec::new(io); |
1273 | |
1274 | if let Some(max) = builder.settings.max_frame_size() { |
1275 | codec.set_max_recv_frame_size(max as usize); |
1276 | } |
1277 | |
1278 | if let Some(max) = builder.settings.max_header_list_size() { |
1279 | codec.set_max_recv_header_list_size(max as usize); |
1280 | } |
1281 | |
1282 | // Send initial settings frame |
1283 | codec |
1284 | .buffer(builder.settings.clone().into()) |
.expect("invalid SETTINGS frame");
1286 | |
1287 | let inner = proto::Connection::new( |
1288 | codec, |
1289 | proto::Config { |
1290 | next_stream_id: builder.stream_id, |
1291 | initial_max_send_streams: builder.initial_max_send_streams, |
1292 | max_send_buffer_size: builder.max_send_buffer_size, |
1293 | reset_stream_duration: builder.reset_stream_duration, |
1294 | reset_stream_max: builder.reset_stream_max, |
1295 | remote_reset_stream_max: builder.pending_accept_reset_stream_max, |
1296 | settings: builder.settings.clone(), |
1297 | }, |
1298 | ); |
1299 | let send_request = SendRequest { |
1300 | inner: inner.streams().clone(), |
1301 | pending: None, |
1302 | }; |
1303 | |
1304 | let mut connection = Connection { inner }; |
1305 | if let Some(sz) = builder.initial_target_connection_window_size { |
1306 | connection.set_target_window_size(sz); |
1307 | } |
1308 | |
1309 | Ok((send_request, connection)) |
1310 | } |
1311 | |
1312 | /// Sets the target window size for the whole connection. |
1313 | /// |
1314 | /// If `size` is greater than the current value, then a `WINDOW_UPDATE` |
1315 | /// frame will be immediately sent to the remote, increasing the connection |
1316 | /// level window by `size - current_value`. |
1317 | /// |
1318 | /// If `size` is less than the current value, nothing will happen |
1319 | /// immediately. However, as window capacity is released by |
1320 | /// [`FlowControl`] instances, no `WINDOW_UPDATE` frames will be sent |
1321 | /// out until the number of "in flight" bytes drops below `size`. |
1322 | /// |
1323 | /// The default value is 65,535. |
1324 | /// |
1325 | /// See [`FlowControl`] documentation for more details. |
1326 | /// |
1327 | /// [`FlowControl`]: ../struct.FlowControl.html |
1328 | /// [library level]: ../index.html#flow-control |
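///
/// # Examples
///
/// A sketch (the window size shown is illustrative):
///
/// ```rust
/// # use tokio::io::{AsyncRead, AsyncWrite};
/// # use h2::client::Connection;
/// # use bytes::Bytes;
/// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(mut connection: Connection<T, Bytes>) {
/// // Allow up to ~1 MiB of in-flight data across the whole connection.
/// connection.set_target_window_size(1024 * 1024);
/// # }
/// ```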
1329 | pub fn set_target_window_size(&mut self, size: u32) { |
1330 | assert!(size <= proto::MAX_WINDOW_SIZE); |
1331 | self.inner.set_target_window_size(size); |
1332 | } |
1333 | |
1334 | /// Set a new `INITIAL_WINDOW_SIZE` setting (in octets) for stream-level |
1335 | /// flow control for received data. |
1336 | /// |
1337 | /// The `SETTINGS` will be sent to the remote, and only applied once the |
1338 | /// remote acknowledges the change. |
1339 | /// |
1340 | /// This can be used to increase or decrease the window size for existing |
1341 | /// streams. |
1342 | /// |
1343 | /// # Errors |
1344 | /// |
1345 | /// Returns an error if a previous call is still pending acknowledgement |
1346 | /// from the remote endpoint. |
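///
/// # Examples
///
/// A sketch (the window size shown is illustrative):
///
/// ```rust
/// # use tokio::io::{AsyncRead, AsyncWrite};
/// # use h2::client::Connection;
/// # use bytes::Bytes;
/// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(mut connection: Connection<T, Bytes>) -> Result<(), h2::Error> {
/// // Ask the peer to use a larger per-stream window for received data.
/// connection.set_initial_window_size(1_000_000)?;
/// # Ok(())
/// # }
/// ```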
1347 | pub fn set_initial_window_size(&mut self, size: u32) -> Result<(), crate::Error> { |
1348 | assert!(size <= proto::MAX_WINDOW_SIZE); |
1349 | self.inner.set_initial_window_size(size)?; |
1350 | Ok(()) |
1351 | } |
1352 | |
1353 | /// Takes a `PingPong` instance from the connection. |
1354 | /// |
1355 | /// # Note |
1356 | /// |
1357 | /// This may only be called once. Calling multiple times will return `None`. |
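///
/// # Examples
///
/// A sketch of taking the handle and sending a PING, assuming the connection
/// itself is being driven on another task:
///
/// ```rust
/// # use tokio::io::{AsyncRead, AsyncWrite};
/// # use h2::client::Connection;
/// # use h2::Ping;
/// # use bytes::Bytes;
/// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(mut connection: Connection<T, Bytes>) -> Result<(), h2::Error> {
/// // The handle can only be taken once; later calls return `None`.
/// let mut ping_pong = connection.ping_pong().expect("ping_pong already taken");
/// let _pong = ping_pong.ping(Ping::opaque()).await?;
/// # Ok(())
/// # }
/// ```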
1358 | pub fn ping_pong(&mut self) -> Option<PingPong> { |
1359 | self.inner.take_user_pings().map(PingPong::new) |
1360 | } |
1361 | |
1362 | /// Returns the maximum number of concurrent streams that may be initiated |
1363 | /// by this client. |
1364 | /// |
1365 | /// This limit is configured by the server peer by sending the |
1366 | /// [`SETTINGS_MAX_CONCURRENT_STREAMS` parameter][1] in a `SETTINGS` frame. |
1367 | /// This method returns the currently acknowledged value received from the |
1368 | /// remote. |
1369 | /// |
1370 | /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2 |
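///
/// # Examples
///
/// A sketch of inspecting the advertised limit (the exact value depends on
/// the server's settings):
///
/// ```rust
/// # use tokio::io::{AsyncRead, AsyncWrite};
/// # use h2::client::Connection;
/// # use bytes::Bytes;
/// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(connection: Connection<T, Bytes>) {
/// let limit = connection.max_concurrent_send_streams();
/// println!("the peer currently allows {} concurrent streams", limit);
/// # }
/// ```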
    pub fn max_concurrent_send_streams(&self) -> usize {
        self.inner.max_send_streams()
    }

    /// Returns the maximum number of concurrent streams that may be initiated
    /// by the server on this connection.
    ///
    /// This returns the value of the [`SETTINGS_MAX_CONCURRENT_STREAMS`
    /// parameter][1] sent in a `SETTINGS` frame that has been
    /// acknowledged by the remote peer. The value to be sent is configured by
    /// the [`Builder::max_concurrent_streams`][2] method before handshaking
    /// with the remote peer.
    ///
    /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
    /// [2]: ../struct.Builder.html#method.max_concurrent_streams
    pub fn max_concurrent_recv_streams(&self) -> usize {
        self.inner.max_recv_streams()
    }
}

impl<T, B> Future for Connection<T, B>
where
    T: AsyncRead + AsyncWrite + Unpin,
    B: Buf,
{
    type Output = Result<(), crate::Error>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.inner.maybe_close_connection_if_no_streams();
        self.inner.poll(cx).map_err(Into::into)
    }
}

impl<T, B> fmt::Debug for Connection<T, B>
where
    T: AsyncRead + AsyncWrite,
    T: fmt::Debug,
    B: fmt::Debug + Buf,
{
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt::Debug::fmt(&self.inner, fmt)
    }
}

// ===== impl ResponseFuture =====

impl Future for ResponseFuture {
    type Output = Result<Response<RecvStream>, crate::Error>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let (parts, _) = ready!(self.inner.poll_response(cx))?.into_parts();
        let body = RecvStream::new(FlowControl::new(self.inner.clone()));

        Poll::Ready(Ok(Response::from_parts(parts, body)))
    }
}

impl ResponseFuture {
    /// Returns the stream ID of the response stream.
    ///
    /// # Panics
    ///
    /// If the lock on the stream store has been poisoned.
    pub fn stream_id(&self) -> crate::StreamId {
        crate::StreamId::from_internal(self.inner.stream_id())
    }

    /// Returns a stream of `PushPromise`s.
    ///
    /// # Panics
    ///
    /// If this method has been called before,
    /// or if the stream was itself pushed.
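    ///
    /// # Examples
    ///
    /// A minimal sketch of consuming the promised requests; errors are simply
    /// unwrapped here.
    ///
    /// ```
    /// # async fn doc(mut response_future: h2::client::ResponseFuture) {
    /// let mut push_promises = response_future.push_promises();
    /// while let Some(push) = push_promises.push_promise().await {
    ///     let (request, _pushed_response) = push.unwrap().into_parts();
    ///     println!("server promised: {:?}", request);
    /// }
    /// # }
    /// ```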
    pub fn push_promises(&mut self) -> PushPromises {
        if self.push_promise_consumed {
            panic!("Reference to push promises stream taken!");
        }
        self.push_promise_consumed = true;
        PushPromises {
            inner: self.inner.clone(),
        }
    }
}

// ===== impl PushPromises =====

impl PushPromises {
    /// Get the next `PushPromise`.
    pub async fn push_promise(&mut self) -> Option<Result<PushPromise, crate::Error>> {
        futures_util::future::poll_fn(move |cx| self.poll_push_promise(cx)).await
    }

    #[doc(hidden)]
    pub fn poll_push_promise(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<PushPromise, crate::Error>>> {
        match self.inner.poll_pushed(cx) {
            Poll::Ready(Some(Ok((request, response)))) => {
                let response = PushedResponseFuture {
                    inner: ResponseFuture {
                        inner: response,
                        push_promise_consumed: false,
                    },
                };
                Poll::Ready(Some(Ok(PushPromise { request, response })))
            }
            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e.into()))),
            Poll::Ready(None) => Poll::Ready(None),
            Poll::Pending => Poll::Pending,
        }
    }
}

#[cfg(feature = "stream")]
impl futures_core::Stream for PushPromises {
    type Item = Result<PushPromise, crate::Error>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.poll_push_promise(cx)
    }
}

// ===== impl PushPromise =====

impl PushPromise {
    /// Returns a reference to the push promise's request headers.
    pub fn request(&self) -> &Request<()> {
        &self.request
    }

    /// Returns a mutable reference to the push promise's request headers.
    pub fn request_mut(&mut self) -> &mut Request<()> {
        &mut self.request
    }

    /// Consumes `self`, returning the push promise's request headers and
    /// response future.
    pub fn into_parts(self) -> (Request<()>, PushedResponseFuture) {
        (self.request, self.response)
    }
}

// ===== impl PushedResponseFuture =====

impl Future for PushedResponseFuture {
    type Output = Result<Response<RecvStream>, crate::Error>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        Pin::new(&mut self.inner).poll(cx)
    }
}

impl PushedResponseFuture {
    /// Returns the stream ID of the response stream.
    ///
    /// # Panics
    ///
    /// If the lock on the stream store has been poisoned.
    pub fn stream_id(&self) -> crate::StreamId {
        self.inner.stream_id()
    }
}

// ===== impl Peer =====

impl Peer {
    pub fn convert_send_message(
        id: StreamId,
        request: Request<()>,
        protocol: Option<Protocol>,
        end_of_stream: bool,
    ) -> Result<Headers, SendError> {
        use http::request::Parts;

        let (
            Parts {
                method,
                uri,
                headers,
                version,
                ..
            },
            _,
        ) = request.into_parts();

        let is_connect = method == Method::CONNECT;

        // Build the pseudo header set. All requests will include `method`
        // and `path`.
        let mut pseudo = Pseudo::request(method, uri, protocol);

        if pseudo.scheme.is_none() {
            // If the scheme is not set, then there are two options.
            //
            // 1) Authority is not set. In this case, a request was issued with
            //    a relative URI. This is permitted **only** when forwarding
            //    HTTP 1.x requests. If the HTTP version is set to 2.0, then
            //    this is an error.
            //
            // 2) Authority is set, in which case the HTTP method *must* be
            //    CONNECT.
            //
            // It is not possible to have a scheme but not an authority set (the
            // `http` crate does not allow it).
            //
            if pseudo.authority.is_none() {
                if version == Version::HTTP_2 {
                    return Err(UserError::MissingUriSchemeAndAuthority.into());
                } else {
                    // This is acceptable as per the above comment. However,
                    // HTTP/2 requires that a scheme is set. Since we are
                    // forwarding an HTTP 1.1 request, the scheme is set to
                    // "http".
                    pseudo.set_scheme(uri::Scheme::HTTP);
                }
            } else if !is_connect {
                // TODO: Error
            }
        }

        // Create the HEADERS frame
        let mut frame = Headers::new(id, pseudo, headers);

        if end_of_stream {
            frame.set_end_stream()
        }

        Ok(frame)
    }
}

impl proto::Peer for Peer {
    type Poll = Response<()>;

    const NAME: &'static str = "Client";

    fn r#dyn() -> proto::DynPeer {
        proto::DynPeer::Client
    }

    fn is_server() -> bool {
        false
    }

    fn convert_poll_message(
        pseudo: Pseudo,
        fields: HeaderMap,
        stream_id: StreamId,
    ) -> Result<Self::Poll, Error> {
        let mut b = Response::builder();

        b = b.version(Version::HTTP_2);

        if let Some(status) = pseudo.status {
            b = b.status(status);
        }

        let mut response = match b.body(()) {
            Ok(response) => response,
            Err(_) => {
                // TODO: Should there be more specialized handling for different
                // kinds of errors?
                return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR));
            }
        };

        *response.headers_mut() = fields;

        Ok(response)
    }
}