1//! Lower-level client connection API.
2//!
3//! The types in this module are to provide a lower-level API based around a
4//! single connection. Connecting to a host, pooling connections, and the like
5//! are not handled at this level. This module provides the building blocks to
6//! customize those things externally.
7//!
//! If you don't need to manage connections yourself, consider using the
//! higher-level [Client](super) API.
10//!
11//! ## Example
12//! A simple example that uses the `SendRequest` struct to talk HTTP over a Tokio TCP stream
13//! ```no_run
14//! # #[cfg(all(feature = "client", feature = "http1", feature = "runtime"))]
15//! # mod rt {
16//! use tower::ServiceExt;
17//! use http::{Request, StatusCode};
18//! use hyper::{client::conn, Body};
19//! use tokio::net::TcpStream;
20//!
21//! #[tokio::main]
22//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
23//! let target_stream = TcpStream::connect("example.com:80").await?;
24//!
25//! let (mut request_sender, connection) = conn::handshake(target_stream).await?;
26//!
27//! // spawn a task to poll the connection and drive the HTTP state
28//! tokio::spawn(async move {
29//! if let Err(e) = connection.await {
30//! eprintln!("Error in connection: {}", e);
31//! }
32//! });
33//!
34//! let request = Request::builder()
//! // We need to manually add the Host header because SendRequest does not add it for us.
36//! .header("Host", "example.com")
37//! .method("GET")
38//! .body(Body::from(""))?;
39//! let response = request_sender.send_request(request).await?;
40//! assert!(response.status() == StatusCode::OK);
41//!
//! // Sending again on the same connection may fail if the sender is not ready yet,
//! // so wait until request_sender becomes ready before sending the next request.
44//! request_sender.ready().await?;
45//! let request = Request::builder()
46//! .header("Host", "example.com")
47//! .method("GET")
48//! .body(Body::from(""))?;
49//! let response = request_sender.send_request(request).await?;
50//! assert!(response.status() == StatusCode::OK);
51//! Ok(())
52//! }
53//!
54//! # }
55//! ```
56
57use std::error::Error as StdError;
58use std::fmt;
59#[cfg(not(all(feature = "http1", feature = "http2")))]
60use std::marker::PhantomData;
61use std::sync::Arc;
62#[cfg(all(feature = "runtime", feature = "http2"))]
63use std::time::Duration;
64
65use bytes::Bytes;
66use futures_util::future::{self, Either, FutureExt as _};
67use httparse::ParserConfig;
68use pin_project_lite::pin_project;
69use tokio::io::{AsyncRead, AsyncWrite};
70use tower_service::Service;
71use tracing::{debug, trace};
72
73use super::dispatch;
74use crate::body::HttpBody;
75#[cfg(not(all(feature = "http1", feature = "http2")))]
76use crate::common::Never;
77use crate::common::{
78 exec::{BoxSendFuture, Exec},
79 task, Future, Pin, Poll,
80};
81use crate::proto;
82use crate::rt::Executor;
83#[cfg(feature = "http1")]
84use crate::upgrade::Upgraded;
85use crate::{Body, Request, Response};
86
87#[cfg(feature = "http1")]
88type Http1Dispatcher<T, B> =
89 proto::dispatch::Dispatcher<proto::dispatch::Client<B>, B, T, proto::h1::ClientTransaction>;
90
91#[cfg(not(feature = "http1"))]
92type Http1Dispatcher<T, B> = (Never, PhantomData<(T, Pin<Box<B>>)>);
93
94#[cfg(feature = "http2")]
95type Http2ClientTask<B> = proto::h2::ClientTask<B>;
96
97#[cfg(not(feature = "http2"))]
98type Http2ClientTask<B> = (Never, PhantomData<Pin<Box<B>>>);
99
100pin_project! {
101 #[project = ProtoClientProj]
102 enum ProtoClient<T, B>
103 where
104 B: HttpBody,
105 {
106 H1 {
107 #[pin]
108 h1: Http1Dispatcher<T, B>,
109 },
110 H2 {
111 #[pin]
112 h2: Http2ClientTask<B>,
113 },
114 }
115}
116
117/// Returns a handshake future over some IO.
118///
119/// This is a shortcut for `Builder::new().handshake(io)`.
120/// See [`client::conn`](crate::client::conn) for more.
121pub async fn handshake<T>(
122 io: T,
123) -> crate::Result<(SendRequest<crate::Body>, Connection<T, crate::Body>)>
124where
125 T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
126{
127 Builder::new().handshake(io).await
128}
129
130/// The sender side of an established connection.
131pub struct SendRequest<B> {
132 dispatch: dispatch::Sender<Request<B>, Response<Body>>,
133}
134
135/// A future that processes all HTTP state for the IO object.
136///
137/// In most cases, this should just be spawned into an executor, so that it
138/// can process incoming and outgoing messages, notice hangups, and the like.
139#[must_use = "futures do nothing unless polled"]
140pub struct Connection<T, B>
141where
142 T: AsyncRead + AsyncWrite + Send + 'static,
143 B: HttpBody + 'static,
144{
145 inner: Option<ProtoClient<T, B>>,
146}
147
148/// A builder to configure an HTTP connection.
149///
150/// After setting options, the builder is used to create a handshake future.
151#[derive(Clone, Debug)]
152pub struct Builder {
153 pub(super) exec: Exec,
154 h09_responses: bool,
155 h1_parser_config: ParserConfig,
156 h1_writev: Option<bool>,
157 h1_title_case_headers: bool,
158 h1_preserve_header_case: bool,
159 #[cfg(feature = "ffi")]
160 h1_preserve_header_order: bool,
161 h1_read_buf_exact_size: Option<usize>,
162 h1_max_buf_size: Option<usize>,
163 #[cfg(feature = "ffi")]
164 h1_headers_raw: bool,
165 #[cfg(feature = "http2")]
166 h2_builder: proto::h2::client::Config,
167 version: Proto,
168}
169
// Protocol version selected on a `Builder`. Each variant only exists when the
// corresponding cargo feature is enabled.
#[derive(Clone, Debug)]
enum Proto {
    #[cfg(feature = "http1")]
    Http1,
    #[cfg(feature = "http2")]
    Http2,
}
177
178/// A future returned by `SendRequest::send_request`.
179///
180/// Yields a `Response` if successful.
181#[must_use = "futures do nothing unless polled"]
182pub struct ResponseFuture {
183 inner: ResponseFutureState,
184}
185
186enum ResponseFutureState {
187 Waiting(dispatch::Promise<Response<Body>>),
188 // Option is to be able to `take()` it in `poll`
189 Error(Option<crate::Error>),
190}
191
192/// Deconstructed parts of a `Connection`.
193///
194/// This allows taking apart a `Connection` at a later time, in order to
195/// reclaim the IO object, and additional related pieces.
196#[derive(Debug)]
197pub struct Parts<T> {
198 /// The original IO object used in the handshake.
199 pub io: T,
200 /// A buffer of bytes that have been read but not processed as HTTP.
201 ///
202 /// For instance, if the `Connection` is used for an HTTP upgrade request,
203 /// it is possible the server sent back the first bytes of the new protocol
204 /// along with the response upgrade.
205 ///
206 /// You will want to check for any existing bytes if you plan to continue
207 /// communicating on the IO object.
208 pub read_buf: Bytes,
209 _inner: (),
210}
211
212// ========== internal client api
213
// A cloneable counterpart of `SendRequest`, used to send HTTP2 requests.
// Kept private for now; it is probably not a great idea of a type...
#[must_use = "futures do nothing unless polled"]
#[cfg(feature = "http2")]
pub(super) struct Http2SendRequest<B> {
    // Unbounded variant of the dispatch channel (see `SendRequest::into_http2`).
    dispatch: dispatch::UnboundedSender<Request<B>, Response<Body>>,
}
221
222// ===== impl SendRequest
223
224impl<B> SendRequest<B> {
225 /// Polls to determine whether this sender can be used yet for a request.
226 ///
227 /// If the associated connection is closed, this returns an Error.
228 pub fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
229 self.dispatch.poll_ready(cx)
230 }
231
232 pub(super) async fn when_ready(self) -> crate::Result<Self> {
233 let mut me = Some(self);
234 future::poll_fn(move |cx| {
235 ready!(me.as_mut().unwrap().poll_ready(cx))?;
236 Poll::Ready(Ok(me.take().unwrap()))
237 })
238 .await
239 }
240
241 pub(super) fn is_ready(&self) -> bool {
242 self.dispatch.is_ready()
243 }
244
245 pub(super) fn is_closed(&self) -> bool {
246 self.dispatch.is_closed()
247 }
248
249 #[cfg(feature = "http2")]
250 pub(super) fn into_http2(self) -> Http2SendRequest<B> {
251 Http2SendRequest {
252 dispatch: self.dispatch.unbound(),
253 }
254 }
255}
256
257impl<B> SendRequest<B>
258where
259 B: HttpBody + 'static,
260{
261 /// Sends a `Request` on the associated connection.
262 ///
263 /// Returns a future that if successful, yields the `Response`.
264 ///
265 /// # Note
266 ///
267 /// There are some key differences in what automatic things the `Client`
268 /// does for you that will not be done here:
269 ///
270 /// - `Client` requires absolute-form `Uri`s, since the scheme and
271 /// authority are needed to connect. They aren't required here.
272 /// - Since the `Client` requires absolute-form `Uri`s, it can add
273 /// the `Host` header based on it. You must add a `Host` header yourself
274 /// before calling this method.
275 /// - Since absolute-form `Uri`s are not required, if received, they will
276 /// be serialized as-is.
277 ///
278 /// # Example
279 ///
280 /// ```
281 /// # use http::header::HOST;
282 /// # use hyper::client::conn::SendRequest;
283 /// # use hyper::Body;
284 /// use hyper::Request;
285 ///
286 /// # async fn doc(mut tx: SendRequest<Body>) -> hyper::Result<()> {
287 /// // build a Request
288 /// let req = Request::builder()
289 /// .uri("/foo/bar")
290 /// .header(HOST, "hyper.rs")
291 /// .body(Body::empty())
292 /// .unwrap();
293 ///
294 /// // send it and await a Response
295 /// let res = tx.send_request(req).await?;
296 /// // assert the Response
297 /// assert!(res.status().is_success());
298 /// # Ok(())
299 /// # }
300 /// # fn main() {}
301 /// ```
302 pub fn send_request(&mut self, req: Request<B>) -> ResponseFuture {
303 let inner = match self.dispatch.send(req) {
304 Ok(rx) => ResponseFutureState::Waiting(rx),
305 Err(_req) => {
306 debug!("connection was not ready");
307 let err = crate::Error::new_canceled().with("connection was not ready");
308 ResponseFutureState::Error(Some(err))
309 }
310 };
311
312 ResponseFuture { inner }
313 }
314
315 pub(super) fn send_request_retryable(
316 &mut self,
317 req: Request<B>,
318 ) -> impl Future<Output = Result<Response<Body>, (crate::Error, Option<Request<B>>)>> + Unpin
319 where
320 B: Send,
321 {
322 match self.dispatch.try_send(req) {
323 Ok(rx) => {
324 Either::Left(rx.then(move |res| {
325 match res {
326 Ok(Ok(res)) => future::ok(res),
327 Ok(Err(err)) => future::err(err),
328 // this is definite bug if it happens, but it shouldn't happen!
329 Err(_) => panic!("dispatch dropped without returning error"),
330 }
331 }))
332 }
333 Err(req) => {
334 debug!("connection was not ready");
335 let err = crate::Error::new_canceled().with("connection was not ready");
336 Either::Right(future::err((err, Some(req))))
337 }
338 }
339 }
340}
341
342impl<B> Service<Request<B>> for SendRequest<B>
343where
344 B: HttpBody + 'static,
345{
346 type Response = Response<Body>;
347 type Error = crate::Error;
348 type Future = ResponseFuture;
349
350 fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
351 self.poll_ready(cx)
352 }
353
354 fn call(&mut self, req: Request<B>) -> Self::Future {
355 self.send_request(req)
356 }
357}
358
359impl<B> fmt::Debug for SendRequest<B> {
360 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
361 f.debug_struct(name:"SendRequest").finish()
362 }
363}
364
365// ===== impl Http2SendRequest
366
#[cfg(feature = "http2")]
impl<B> Http2SendRequest<B> {
    // Reports the dispatch channel's current readiness.
    pub(super) fn is_ready(&self) -> bool {
        let dispatch = &self.dispatch;
        dispatch.is_ready()
    }

    // Reports whether the dispatch channel has been closed.
    pub(super) fn is_closed(&self) -> bool {
        let dispatch = &self.dispatch;
        dispatch.is_closed()
    }
}
377
#[cfg(feature = "http2")]
impl<B> Http2SendRequest<B>
where
    B: HttpBody + 'static,
{
    // Sends a request; on failure the error is paired with the request
    // whenever it is still available, so the caller may retry it.
    pub(super) fn send_request_retryable(
        &mut self,
        req: Request<B>,
    ) -> impl Future<Output = Result<Response<Body>, (crate::Error, Option<Request<B>>)>>
    where
        B: Send,
    {
        let rx = match self.dispatch.try_send(req) {
            Ok(rx) => rx,
            Err(req) => {
                debug!("connection was not ready");
                let err = crate::Error::new_canceled().with("connection was not ready");
                return Either::Right(future::err((err, Some(req))));
            }
        };

        Either::Left(rx.then(|res| match res {
            Ok(outcome) => future::ready(outcome),
            // this is definite bug if it happens, but it shouldn't happen!
            Err(_) => panic!("dispatch dropped without returning error"),
        }))
    }
}
409
// Opaque `Debug` output: prints only the type name, no fields.
#[cfg(feature = "http2")]
impl<B> fmt::Debug for Http2SendRequest<B> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Http2SendRequest").finish()
    }
}
416
// Implemented by hand so that cloning does not require `B: Clone`.
#[cfg(feature = "http2")]
impl<B> Clone for Http2SendRequest<B> {
    fn clone(&self) -> Self {
        let dispatch = self.dispatch.clone();
        Http2SendRequest { dispatch }
    }
}
425
426// ===== impl Connection
427
428impl<T, B> Connection<T, B>
429where
430 T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
431 B: HttpBody + Unpin + Send + 'static,
432 B::Data: Send,
433 B::Error: Into<Box<dyn StdError + Send + Sync>>,
434{
435 /// Return the inner IO object, and additional information.
436 ///
437 /// Only works for HTTP/1 connections. HTTP/2 connections will panic.
438 pub fn into_parts(self) -> Parts<T> {
439 match self.inner.expect("already upgraded") {
440 #[cfg(feature = "http1")]
441 ProtoClient::H1 { h1 } => {
442 let (io, read_buf, _) = h1.into_inner();
443 Parts {
444 io,
445 read_buf,
446 _inner: (),
447 }
448 }
449 ProtoClient::H2 { .. } => {
450 panic!("http2 cannot into_inner");
451 }
452
453 #[cfg(not(feature = "http1"))]
454 ProtoClient::H1 { h1 } => match h1.0 {},
455 }
456 }
457
458 /// Poll the connection for completion, but without calling `shutdown`
459 /// on the underlying IO.
460 ///
461 /// This is useful to allow running a connection while doing an HTTP
462 /// upgrade. Once the upgrade is completed, the connection would be "done",
463 /// but it is not desired to actually shutdown the IO object. Instead you
464 /// would take it back using `into_parts`.
465 ///
466 /// Use [`poll_fn`](https://docs.rs/futures/0.1.25/futures/future/fn.poll_fn.html)
467 /// and [`try_ready!`](https://docs.rs/futures/0.1.25/futures/macro.try_ready.html)
468 /// to work with this function; or use the `without_shutdown` wrapper.
469 pub fn poll_without_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
470 match *self.inner.as_mut().expect("already upgraded") {
471 #[cfg(feature = "http1")]
472 ProtoClient::H1 { ref mut h1 } => h1.poll_without_shutdown(cx),
473 #[cfg(feature = "http2")]
474 ProtoClient::H2 { ref mut h2, .. } => Pin::new(h2).poll(cx).map_ok(|_| ()),
475
476 #[cfg(not(feature = "http1"))]
477 ProtoClient::H1 { ref mut h1 } => match h1.0 {},
478 #[cfg(not(feature = "http2"))]
479 ProtoClient::H2 { ref mut h2, .. } => match h2.0 {},
480 }
481 }
482
483 /// Prevent shutdown of the underlying IO object at the end of service the request,
484 /// instead run `into_parts`. This is a convenience wrapper over `poll_without_shutdown`.
485 pub fn without_shutdown(self) -> impl Future<Output = crate::Result<Parts<T>>> {
486 let mut conn = Some(self);
487 future::poll_fn(move |cx| -> Poll<crate::Result<Parts<T>>> {
488 ready!(conn.as_mut().unwrap().poll_without_shutdown(cx))?;
489 Poll::Ready(Ok(conn.take().unwrap().into_parts()))
490 })
491 }
492
493 /// Returns whether the [extended CONNECT protocol][1] is enabled or not.
494 ///
495 /// This setting is configured by the server peer by sending the
496 /// [`SETTINGS_ENABLE_CONNECT_PROTOCOL` parameter][2] in a `SETTINGS` frame.
497 /// This method returns the currently acknowledged value received from the
498 /// remote.
499 ///
500 /// [1]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
501 /// [2]: https://datatracker.ietf.org/doc/html/rfc8441#section-3
502 #[cfg(feature = "http2")]
503 pub fn http2_is_extended_connect_protocol_enabled(&self) -> bool {
504 match self.inner.as_ref().unwrap() {
505 ProtoClient::H1 { .. } => false,
506 ProtoClient::H2 { h2 } => h2.is_extended_connect_protocol_enabled(),
507 }
508 }
509}
510
511impl<T, B> Future for Connection<T, B>
512where
513 T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
514 B: HttpBody + Send + 'static,
515 B::Data: Send,
516 B::Error: Into<Box<dyn StdError + Send + Sync>>,
517{
518 type Output = crate::Result<()>;
519
520 fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
521 match ready!(Pin::new(self.inner.as_mut().unwrap()).poll(cx))? {
522 proto::Dispatched::Shutdown => Poll::Ready(Ok(())),
523 #[cfg(feature = "http1")]
524 proto::Dispatched::Upgrade(pending: Pending) => match self.inner.take() {
525 Some(ProtoClient::H1 { h1: Dispatcher, B, …, …> }) => {
526 let (io: T, buf: Bytes, _) = h1.into_inner();
527 pending.fulfill(Upgraded::new(io, read_buf:buf));
528 Poll::Ready(Ok(()))
529 }
530 _ => {
531 drop(pending);
532 unreachable!("Upgrade expects h1");
533 }
534 },
535 }
536 }
537}
538
539impl<T, B> fmt::Debug for Connection<T, B>
540where
541 T: AsyncRead + AsyncWrite + fmt::Debug + Send + 'static,
542 B: HttpBody + 'static,
543{
544 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
545 f.debug_struct(name:"Connection").finish()
546 }
547}
548
549// ===== impl Builder
550
551impl Builder {
552 /// Creates a new connection builder.
553 #[inline]
554 pub fn new() -> Builder {
555 Builder {
556 exec: Exec::Default,
557 h09_responses: false,
558 h1_writev: None,
559 h1_read_buf_exact_size: None,
560 h1_parser_config: Default::default(),
561 h1_title_case_headers: false,
562 h1_preserve_header_case: false,
563 #[cfg(feature = "ffi")]
564 h1_preserve_header_order: false,
565 h1_max_buf_size: None,
566 #[cfg(feature = "ffi")]
567 h1_headers_raw: false,
568 #[cfg(feature = "http2")]
569 h2_builder: Default::default(),
570 #[cfg(feature = "http1")]
571 version: Proto::Http1,
572 #[cfg(not(feature = "http1"))]
573 version: Proto::Http2,
574 }
575 }
576
577 /// Provide an executor to execute background HTTP2 tasks.
578 pub fn executor<E>(&mut self, exec: E) -> &mut Builder
579 where
580 E: Executor<BoxSendFuture> + Send + Sync + 'static,
581 {
582 self.exec = Exec::Executor(Arc::new(exec));
583 self
584 }
585
586 /// Set whether HTTP/0.9 responses should be tolerated.
587 ///
588 /// Default is false.
589 pub fn http09_responses(&mut self, enabled: bool) -> &mut Builder {
590 self.h09_responses = enabled;
591 self
592 }
593
594 /// Set whether HTTP/1 connections will accept spaces between header names
595 /// and the colon that follow them in responses.
596 ///
597 /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has
598 /// to say about it:
599 ///
600 /// > No whitespace is allowed between the header field-name and colon. In
601 /// > the past, differences in the handling of such whitespace have led to
602 /// > security vulnerabilities in request routing and response handling. A
603 /// > server MUST reject any received request message that contains
604 /// > whitespace between a header field-name and colon with a response code
605 /// > of 400 (Bad Request). A proxy MUST remove any such whitespace from a
606 /// > response message before forwarding the message downstream.
607 ///
608 /// Note that this setting does not affect HTTP/2.
609 ///
610 /// Default is false.
611 ///
612 /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4
613 pub fn http1_allow_spaces_after_header_name_in_responses(
614 &mut self,
615 enabled: bool,
616 ) -> &mut Builder {
617 self.h1_parser_config
618 .allow_spaces_after_header_name_in_responses(enabled);
619 self
620 }
621
622 /// Set whether HTTP/1 connections will accept obsolete line folding for
623 /// header values.
624 ///
625 /// Newline codepoints (`\r` and `\n`) will be transformed to spaces when
626 /// parsing.
627 ///
628 /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has
629 /// to say about it:
630 ///
631 /// > A server that receives an obs-fold in a request message that is not
632 /// > within a message/http container MUST either reject the message by
633 /// > sending a 400 (Bad Request), preferably with a representation
634 /// > explaining that obsolete line folding is unacceptable, or replace
635 /// > each received obs-fold with one or more SP octets prior to
636 /// > interpreting the field value or forwarding the message downstream.
637 ///
638 /// > A proxy or gateway that receives an obs-fold in a response message
639 /// > that is not within a message/http container MUST either discard the
640 /// > message and replace it with a 502 (Bad Gateway) response, preferably
641 /// > with a representation explaining that unacceptable line folding was
642 /// > received, or replace each received obs-fold with one or more SP
643 /// > octets prior to interpreting the field value or forwarding the
644 /// > message downstream.
645 ///
646 /// > A user agent that receives an obs-fold in a response message that is
647 /// > not within a message/http container MUST replace each received
648 /// > obs-fold with one or more SP octets prior to interpreting the field
649 /// > value.
650 ///
651 /// Note that this setting does not affect HTTP/2.
652 ///
653 /// Default is false.
654 ///
655 /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4
656 pub fn http1_allow_obsolete_multiline_headers_in_responses(
657 &mut self,
658 enabled: bool,
659 ) -> &mut Builder {
660 self.h1_parser_config
661 .allow_obsolete_multiline_headers_in_responses(enabled);
662 self
663 }
664
665 /// Set whether HTTP/1 connections will silently ignored malformed header lines.
666 ///
667 /// If this is enabled and and a header line does not start with a valid header
668 /// name, or does not include a colon at all, the line will be silently ignored
669 /// and no error will be reported.
670 ///
671 /// Note that this setting does not affect HTTP/2.
672 ///
673 /// Default is false.
674 pub fn http1_ignore_invalid_headers_in_responses(
675 &mut self,
676 enabled: bool,
677 ) -> &mut Builder {
678 self.h1_parser_config
679 .ignore_invalid_headers_in_responses(enabled);
680 self
681 }
682
683 /// Set whether HTTP/1 connections should try to use vectored writes,
684 /// or always flatten into a single buffer.
685 ///
686 /// Note that setting this to false may mean more copies of body data,
687 /// but may also improve performance when an IO transport doesn't
688 /// support vectored writes well, such as most TLS implementations.
689 ///
690 /// Setting this to true will force hyper to use queued strategy
691 /// which may eliminate unnecessary cloning on some TLS backends
692 ///
693 /// Default is `auto`. In this mode hyper will try to guess which
694 /// mode to use
695 pub fn http1_writev(&mut self, enabled: bool) -> &mut Builder {
696 self.h1_writev = Some(enabled);
697 self
698 }
699
700 /// Set whether HTTP/1 connections will write header names as title case at
701 /// the socket level.
702 ///
703 /// Note that this setting does not affect HTTP/2.
704 ///
705 /// Default is false.
706 pub fn http1_title_case_headers(&mut self, enabled: bool) -> &mut Builder {
707 self.h1_title_case_headers = enabled;
708 self
709 }
710
711 /// Set whether to support preserving original header cases.
712 ///
713 /// Currently, this will record the original cases received, and store them
714 /// in a private extension on the `Response`. It will also look for and use
715 /// such an extension in any provided `Request`.
716 ///
717 /// Since the relevant extension is still private, there is no way to
718 /// interact with the original cases. The only effect this can have now is
719 /// to forward the cases in a proxy-like fashion.
720 ///
721 /// Note that this setting does not affect HTTP/2.
722 ///
723 /// Default is false.
724 pub fn http1_preserve_header_case(&mut self, enabled: bool) -> &mut Builder {
725 self.h1_preserve_header_case = enabled;
726 self
727 }
728
729 /// Set whether to support preserving original header order.
730 ///
731 /// Currently, this will record the order in which headers are received, and store this
732 /// ordering in a private extension on the `Response`. It will also look for and use
733 /// such an extension in any provided `Request`.
734 ///
735 /// Note that this setting does not affect HTTP/2.
736 ///
737 /// Default is false.
738 #[cfg(feature = "ffi")]
739 pub fn http1_preserve_header_order(&mut self, enabled: bool) -> &mut Builder {
740 self.h1_preserve_header_order = enabled;
741 self
742 }
743
744 /// Sets the exact size of the read buffer to *always* use.
745 ///
746 /// Note that setting this option unsets the `http1_max_buf_size` option.
747 ///
748 /// Default is an adaptive read buffer.
749 pub fn http1_read_buf_exact_size(&mut self, sz: Option<usize>) -> &mut Builder {
750 self.h1_read_buf_exact_size = sz;
751 self.h1_max_buf_size = None;
752 self
753 }
754
755 /// Set the maximum buffer size for the connection.
756 ///
757 /// Default is ~400kb.
758 ///
759 /// Note that setting this option unsets the `http1_read_exact_buf_size` option.
760 ///
761 /// # Panics
762 ///
763 /// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum.
764 #[cfg(feature = "http1")]
765 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
766 pub fn http1_max_buf_size(&mut self, max: usize) -> &mut Self {
767 assert!(
768 max >= proto::h1::MINIMUM_MAX_BUFFER_SIZE,
769 "the max_buf_size cannot be smaller than the minimum that h1 specifies."
770 );
771
772 self.h1_max_buf_size = Some(max);
773 self.h1_read_buf_exact_size = None;
774 self
775 }
776
777 #[cfg(feature = "ffi")]
778 pub(crate) fn http1_headers_raw(&mut self, enabled: bool) -> &mut Self {
779 self.h1_headers_raw = enabled;
780 self
781 }
782
783 /// Sets whether HTTP2 is required.
784 ///
785 /// Default is false.
786 #[cfg(feature = "http2")]
787 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
788 pub fn http2_only(&mut self, enabled: bool) -> &mut Builder {
789 if enabled {
790 self.version = Proto::Http2
791 }
792 self
793 }
794
795 /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2
796 /// stream-level flow control.
797 ///
798 /// Passing `None` will do nothing.
799 ///
800 /// If not set, hyper will use a default.
801 ///
802 /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE
803 #[cfg(feature = "http2")]
804 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
805 pub fn http2_initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
806 if let Some(sz) = sz.into() {
807 self.h2_builder.adaptive_window = false;
808 self.h2_builder.initial_stream_window_size = sz;
809 }
810 self
811 }
812
813 /// Sets the max connection-level flow control for HTTP2
814 ///
815 /// Passing `None` will do nothing.
816 ///
817 /// If not set, hyper will use a default.
818 #[cfg(feature = "http2")]
819 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
820 pub fn http2_initial_connection_window_size(
821 &mut self,
822 sz: impl Into<Option<u32>>,
823 ) -> &mut Self {
824 if let Some(sz) = sz.into() {
825 self.h2_builder.adaptive_window = false;
826 self.h2_builder.initial_conn_window_size = sz;
827 }
828 self
829 }
830
831 /// Sets whether to use an adaptive flow control.
832 ///
833 /// Enabling this will override the limits set in
834 /// `http2_initial_stream_window_size` and
835 /// `http2_initial_connection_window_size`.
836 #[cfg(feature = "http2")]
837 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
838 pub fn http2_adaptive_window(&mut self, enabled: bool) -> &mut Self {
839 use proto::h2::SPEC_WINDOW_SIZE;
840
841 self.h2_builder.adaptive_window = enabled;
842 if enabled {
843 self.h2_builder.initial_conn_window_size = SPEC_WINDOW_SIZE;
844 self.h2_builder.initial_stream_window_size = SPEC_WINDOW_SIZE;
845 }
846 self
847 }
848
849 /// Sets the maximum frame size to use for HTTP2.
850 ///
851 /// Passing `None` will do nothing.
852 ///
853 /// If not set, hyper will use a default.
854 #[cfg(feature = "http2")]
855 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
856 pub fn http2_max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
857 if let Some(sz) = sz.into() {
858 self.h2_builder.max_frame_size = sz;
859 }
860 self
861 }
862
863 /// Sets an interval for HTTP2 Ping frames should be sent to keep a
864 /// connection alive.
865 ///
866 /// Pass `None` to disable HTTP2 keep-alive.
867 ///
868 /// Default is currently disabled.
869 ///
870 /// # Cargo Feature
871 ///
872 /// Requires the `runtime` cargo feature to be enabled.
873 #[cfg(feature = "runtime")]
874 #[cfg(feature = "http2")]
875 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
876 pub fn http2_keep_alive_interval(
877 &mut self,
878 interval: impl Into<Option<Duration>>,
879 ) -> &mut Self {
880 self.h2_builder.keep_alive_interval = interval.into();
881 self
882 }
883
884 /// Sets a timeout for receiving an acknowledgement of the keep-alive ping.
885 ///
886 /// If the ping is not acknowledged within the timeout, the connection will
887 /// be closed. Does nothing if `http2_keep_alive_interval` is disabled.
888 ///
889 /// Default is 20 seconds.
890 ///
891 /// # Cargo Feature
892 ///
893 /// Requires the `runtime` cargo feature to be enabled.
894 #[cfg(feature = "runtime")]
895 #[cfg(feature = "http2")]
896 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
897 pub fn http2_keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
898 self.h2_builder.keep_alive_timeout = timeout;
899 self
900 }
901
902 /// Sets whether HTTP2 keep-alive should apply while the connection is idle.
903 ///
904 /// If disabled, keep-alive pings are only sent while there are open
905 /// request/responses streams. If enabled, pings are also sent when no
906 /// streams are active. Does nothing if `http2_keep_alive_interval` is
907 /// disabled.
908 ///
909 /// Default is `false`.
910 ///
911 /// # Cargo Feature
912 ///
913 /// Requires the `runtime` cargo feature to be enabled.
914 #[cfg(feature = "runtime")]
915 #[cfg(feature = "http2")]
916 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
917 pub fn http2_keep_alive_while_idle(&mut self, enabled: bool) -> &mut Self {
918 self.h2_builder.keep_alive_while_idle = enabled;
919 self
920 }
921
922 /// Sets the maximum number of HTTP2 concurrent locally reset streams.
923 ///
924 /// See the documentation of [`h2::client::Builder::max_concurrent_reset_streams`] for more
925 /// details.
926 ///
927 /// The default value is determined by the `h2` crate.
928 ///
929 /// [`h2::client::Builder::max_concurrent_reset_streams`]: https://docs.rs/h2/client/struct.Builder.html#method.max_concurrent_reset_streams
930 #[cfg(feature = "http2")]
931 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
932 pub fn http2_max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
933 self.h2_builder.max_concurrent_reset_streams = Some(max);
934 self
935 }
936
937 /// Set the maximum write buffer size for each HTTP/2 stream.
938 ///
939 /// Default is currently 1MB, but may change.
940 ///
941 /// # Panics
942 ///
943 /// The value must be no larger than `u32::MAX`.
944 #[cfg(feature = "http2")]
945 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
946 pub fn http2_max_send_buf_size(&mut self, max: usize) -> &mut Self {
947 assert!(max <= std::u32::MAX as usize);
948 self.h2_builder.max_send_buffer_size = max;
949 self
950 }
951
952 /// Constructs a connection with the configured options and IO.
953 /// See [`client::conn`](crate::client::conn) for more.
954 ///
955 /// Note, if [`Connection`] is not `await`-ed, [`SendRequest`] will
956 /// do nothing.
957 pub fn handshake<T, B>(
958 &self,
959 io: T,
960 ) -> impl Future<Output = crate::Result<(SendRequest<B>, Connection<T, B>)>>
961 where
962 T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
963 B: HttpBody + 'static,
964 B::Data: Send,
965 B::Error: Into<Box<dyn StdError + Send + Sync>>,
966 {
967 let opts = self.clone();
968
969 async move {
970 trace!("client handshake {:?}", opts.version);
971
972 let (tx, rx) = dispatch::channel();
973 let proto = match opts.version {
974 #[cfg(feature = "http1")]
975 Proto::Http1 => {
976 let mut conn = proto::Conn::new(io);
977 conn.set_h1_parser_config(opts.h1_parser_config);
978 if let Some(writev) = opts.h1_writev {
979 if writev {
980 conn.set_write_strategy_queue();
981 } else {
982 conn.set_write_strategy_flatten();
983 }
984 }
985 if opts.h1_title_case_headers {
986 conn.set_title_case_headers();
987 }
988 if opts.h1_preserve_header_case {
989 conn.set_preserve_header_case();
990 }
991 #[cfg(feature = "ffi")]
992 if opts.h1_preserve_header_order {
993 conn.set_preserve_header_order();
994 }
995 if opts.h09_responses {
996 conn.set_h09_responses();
997 }
998
999 #[cfg(feature = "ffi")]
1000 conn.set_raw_headers(opts.h1_headers_raw);
1001
1002 if let Some(sz) = opts.h1_read_buf_exact_size {
1003 conn.set_read_buf_exact_size(sz);
1004 }
1005 if let Some(max) = opts.h1_max_buf_size {
1006 conn.set_max_buf_size(max);
1007 }
1008 let cd = proto::h1::dispatch::Client::new(rx);
1009 let dispatch = proto::h1::Dispatcher::new(cd, conn);
1010 ProtoClient::H1 { h1: dispatch }
1011 }
1012 #[cfg(feature = "http2")]
1013 Proto::Http2 => {
1014 let h2 =
1015 proto::h2::client::handshake(io, rx, &opts.h2_builder, opts.exec.clone())
1016 .await?;
1017 ProtoClient::H2 { h2 }
1018 }
1019 };
1020
1021 Ok((
1022 SendRequest { dispatch: tx },
1023 Connection { inner: Some(proto) },
1024 ))
1025 }
1026 }
1027}
1028
1029// ===== impl ResponseFuture
1030
1031impl Future for ResponseFuture {
1032 type Output = crate::Result<Response<Body>>;
1033
1034 fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
1035 match self.inner {
1036 ResponseFutureState::Waiting(ref mut rx: &mut Receiver, …>>) => {
1037 Pin::new(pointer:rx).poll(cx).map(|res: Result, …>, …>| match res {
1038 Ok(Ok(resp: Response)) => Ok(resp),
1039 Ok(Err(err: Error)) => Err(err),
1040 // this is definite bug if it happens, but it shouldn't happen!
1041 Err(_canceled: RecvError) => panic!("dispatch dropped without returning error"),
1042 })
1043 }
1044 ResponseFutureState::Error(ref mut err: &mut Option) => {
1045 Poll::Ready(Err(err.take().expect(msg:"polled after ready")))
1046 }
1047 }
1048 }
1049}
1050
1051impl fmt::Debug for ResponseFuture {
1052 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1053 f.debug_struct(name:"ResponseFuture").finish()
1054 }
1055}
1056
1057// ===== impl ProtoClient
1058
1059impl<T, B> Future for ProtoClient<T, B>
1060where
1061 T: AsyncRead + AsyncWrite + Send + Unpin + 'static,
1062 B: HttpBody + Send + 'static,
1063 B::Data: Send,
1064 B::Error: Into<Box<dyn StdError + Send + Sync>>,
1065{
1066 type Output = crate::Result<proto::Dispatched>;
1067
1068 fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
1069 match self.project() {
1070 #[cfg(feature = "http1")]
1071 ProtoClientProj::H1 { h1: Pin<&mut Dispatcher, …, …, …>> } => h1.poll(cx),
1072 #[cfg(feature = "http2")]
1073 ProtoClientProj::H2 { h2: Pin<&mut ClientTask>, .. } => h2.poll(cx),
1074
1075 #[cfg(not(feature = "http1"))]
1076 ProtoClientProj::H1 { h1 } => match h1.0 {},
1077 #[cfg(not(feature = "http2"))]
1078 ProtoClientProj::H2 { h2, .. } => match h2.0 {},
1079 }
1080 }
1081}
1082
1083// assert trait markers
1084
1085trait AssertSend: Send {}
1086trait AssertSendSync: Send + Sync {}
1087
1088#[doc(hidden)]
1089impl<B: Send> AssertSendSync for SendRequest<B> {}
1090
1091#[doc(hidden)]
1092impl<T: Send, B: Send> AssertSend for Connection<T, B>
1093where
1094 T: AsyncRead + AsyncWrite + Send + 'static,
1095 B: HttpBody + 'static,
1096 B::Data: Send,
1097{
1098}
1099
1100#[doc(hidden)]
1101impl<T: Send + Sync, B: Send + Sync> AssertSendSync for Connection<T, B>
1102where
1103 T: AsyncRead + AsyncWrite + Send + 'static,
1104 B: HttpBody + 'static,
1105 B::Data: Send + Sync + 'static,
1106{
1107}
1108
1109#[doc(hidden)]
1110impl AssertSendSync for Builder {}
1111
1112#[doc(hidden)]
1113impl AssertSend for ResponseFuture {}
1114