1//! Connectors used by the `Client`.
2//!
3//! This module contains:
4//!
5//! - A default [`HttpConnector`][] that does DNS resolution and establishes
6//! connections over TCP.
7//! - Types to build custom connectors.
8//!
9//! # Connectors
10//!
11//! A "connector" is a [`Service`][] that takes a [`Uri`][] destination, and
12//! its `Response` is some type implementing [`AsyncRead`][], [`AsyncWrite`][],
13//! and [`Connection`][].
14//!
15//! ## Custom Connectors
16//!
17//! A simple connector that ignores the `Uri` destination and always returns
18//! a TCP connection to the same address could be written like this:
19//!
20//! ```rust,ignore
21//! let connector = tower::service_fn(|_dst| async {
22//! tokio::net::TcpStream::connect("127.0.0.1:1337")
23//! })
24//! ```
25//!
26//! Or, fully written out:
27//!
28//! ```
29//! # #[cfg(feature = "runtime")]
30//! # mod rt {
31//! use std::{future::Future, net::SocketAddr, pin::Pin, task::{self, Poll}};
32//! use hyper::{service::Service, Uri};
33//! use tokio::net::TcpStream;
34//!
35//! #[derive(Clone)]
36//! struct LocalConnector;
37//!
38//! impl Service<Uri> for LocalConnector {
39//! type Response = TcpStream;
40//! type Error = std::io::Error;
41//! // We can't "name" an `async` generated future.
42//! type Future = Pin<Box<
43//! dyn Future<Output = Result<Self::Response, Self::Error>> + Send
44//! >>;
45//!
46//! fn poll_ready(&mut self, _: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
47//! // This connector is always ready, but others might not be.
48//! Poll::Ready(Ok(()))
49//! }
50//!
51//! fn call(&mut self, _: Uri) -> Self::Future {
52//! Box::pin(TcpStream::connect(SocketAddr::from(([127, 0, 0, 1], 1337))))
53//! }
54//! }
55//! # }
56//! ```
57//!
58//! It's worth noting that for `TcpStream`s, the [`HttpConnector`][] is a
59//! better starting place to extend from.
60//!
//! Either of the connector examples above can be used with the `Client`
//! like this:
63//!
64//! ```
65//! # #[cfg(feature = "runtime")]
66//! # fn rt () {
67//! # let connector = hyper::client::HttpConnector::new();
68//! // let connector = ...
69//!
70//! let client = hyper::Client::builder()
71//! .build::<_, hyper::Body>(connector);
72//! # }
73//! ```
74//!
75//!
76//! [`HttpConnector`]: HttpConnector
77//! [`Service`]: crate::service::Service
78//! [`Uri`]: ::http::Uri
79//! [`AsyncRead`]: tokio::io::AsyncRead
80//! [`AsyncWrite`]: tokio::io::AsyncWrite
81//! [`Connection`]: Connection
82use std::fmt;
83use std::fmt::{Debug, Formatter};
84use std::ops::Deref;
85use std::sync::atomic::{AtomicBool, Ordering};
86use std::sync::Arc;
87
88use ::http::Extensions;
89use tokio::sync::watch;
90
91cfg_feature! {
92 #![feature = "tcp"]
93
94 pub use self::http::{HttpConnector, HttpInfo};
95
96 pub mod dns;
97 mod http;
98}
99
100cfg_feature! {
101 #![any(feature = "http1", feature = "http2")]
102
103 pub use self::sealed::Connect;
104}
105
106/// Describes a type returned by a connector.
107pub trait Connection {
108 /// Return metadata describing the connection.
109 fn connected(&self) -> Connected;
110}
111
112/// Extra information about the connected transport.
113///
114/// This can be used to inform recipients about things like if ALPN
115/// was used, or if connected to an HTTP proxy.
116#[derive(Debug)]
117pub struct Connected {
118 pub(super) alpn: Alpn,
119 pub(super) is_proxied: bool,
120 pub(super) extra: Option<Extra>,
121 pub(super) poisoned: PoisonPill,
122}
123
/// A cloneable "poisoned" flag.
///
/// Every clone shares the same `Arc<AtomicBool>`, so poisoning through one
/// handle is observable through all of the others.
#[derive(Clone)]
pub(crate) struct PoisonPill {
    poisoned: Arc<AtomicBool>,
}

impl Debug for PoisonPill {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        // Print the address of the shared flag as well as its value: two
        // pills with the same address are clones of each other, which makes
        // debugging issues much easier.
        write!(
            f,
            "PoisonPill@{:p} {{ poisoned: {} }}",
            self.poisoned,
            self.poisoned.load(Ordering::Relaxed)
        )
    }
}

impl PoisonPill {
    /// Create a fresh pill that is not poisoned.
    pub(crate) fn healthy() -> Self {
        Self {
            poisoned: Arc::new(AtomicBool::new(false)),
        }
    }

    /// Mark the pill (and therefore every clone of it) as poisoned.
    pub(crate) fn poison(&self) {
        // Relaxed ordering: only the boolean itself is read and written
        // through this atomic in this type.
        self.poisoned.store(true, Ordering::Relaxed)
    }

    /// Returns `true` once `poison` has been called on this pill or on any
    /// of its clones.
    pub(crate) fn poisoned(&self) -> bool {
        self.poisoned.load(Ordering::Relaxed)
    }
}
155
156/// [`CaptureConnection`] allows callers to capture [`Connected`] information
157///
158/// To capture a connection for a request, use [`capture_connection`].
159#[derive(Debug, Clone)]
160pub struct CaptureConnection {
161 rx: watch::Receiver<Option<Connected>>,
162}
163
164/// Capture the connection for a given request
165///
166/// When making a request with Hyper, the underlying connection must implement the [`Connection`] trait.
167/// [`capture_connection`] allows a caller to capture the returned [`Connected`] structure as soon
168/// as the connection is established.
169///
170/// *Note*: If establishing a connection fails, [`CaptureConnection::connection_metadata`] will always return none.
171///
172/// # Examples
173///
174/// **Synchronous access**:
175/// The [`CaptureConnection::connection_metadata`] method allows callers to check if a connection has been
176/// established. This is ideal for situations where you are certain the connection has already
177/// been established (e.g. after the response future has already completed).
178/// ```rust
179/// use hyper::client::connect::{capture_connection, CaptureConnection};
180/// let mut request = http::Request::builder()
181/// .uri("http://foo.com")
182/// .body(())
183/// .unwrap();
184///
185/// let captured_connection = capture_connection(&mut request);
186/// // some time later after the request has been sent...
187/// let connection_info = captured_connection.connection_metadata();
188/// println!("we are connected! {:?}", connection_info.as_ref());
189/// ```
190///
191/// **Asynchronous access**:
192/// The [`CaptureConnection::wait_for_connection_metadata`] method returns a future resolves as soon as the
193/// connection is available.
194///
195/// ```rust
196/// # #[cfg(feature = "runtime")]
197/// # async fn example() {
198/// use hyper::client::connect::{capture_connection, CaptureConnection};
199/// let mut request = http::Request::builder()
200/// .uri("http://foo.com")
201/// .body(hyper::Body::empty())
202/// .unwrap();
203///
204/// let mut captured = capture_connection(&mut request);
205/// tokio::task::spawn(async move {
206/// let connection_info = captured.wait_for_connection_metadata().await;
207/// println!("we are connected! {:?}", connection_info.as_ref());
208/// });
209///
210/// let client = hyper::Client::new();
211/// client.request(request).await.expect("request failed");
212/// # }
213/// ```
214pub fn capture_connection<B>(request: &mut crate::http::Request<B>) -> CaptureConnection {
215 let (tx: CaptureConnectionExtension, rx: CaptureConnection) = CaptureConnection::new();
216 request.extensions_mut().insert(val:tx);
217 rx
218}
219
220/// TxSide for [`CaptureConnection`]
221///
222/// This is inserted into `Extensions` to allow Hyper to back channel connection info
223#[derive(Clone)]
224pub(crate) struct CaptureConnectionExtension {
225 tx: Arc<watch::Sender<Option<Connected>>>,
226}
227
228impl CaptureConnectionExtension {
229 pub(crate) fn set(&self, connected: &Connected) {
230 self.tx.send_replace(Some(connected.clone()));
231 }
232}
233
234impl CaptureConnection {
235 /// Internal API to create the tx and rx half of [`CaptureConnection`]
236 pub(crate) fn new() -> (CaptureConnectionExtension, Self) {
237 let (tx, rx) = watch::channel(None);
238 (
239 CaptureConnectionExtension { tx: Arc::new(tx) },
240 CaptureConnection { rx },
241 )
242 }
243
244 /// Retrieve the connection metadata, if available
245 pub fn connection_metadata(&self) -> impl Deref<Target = Option<Connected>> + '_ {
246 self.rx.borrow()
247 }
248
249 /// Wait for the connection to be established
250 ///
251 /// If a connection was established, this will always return `Some(...)`. If the request never
252 /// successfully connected (e.g. DNS resolution failure), this method will never return.
253 pub async fn wait_for_connection_metadata(
254 &mut self,
255 ) -> impl Deref<Target = Option<Connected>> + '_ {
256 if self.rx.borrow().is_some() {
257 return self.rx.borrow();
258 }
259 let _ = self.rx.changed().await;
260 self.rx.borrow()
261 }
262}
263
264pub(super) struct Extra(Box<dyn ExtraInner>);
265
266#[derive(Clone, Copy, Debug, PartialEq)]
267pub(super) enum Alpn {
268 H2,
269 None,
270}
271
272impl Connected {
273 /// Create new `Connected` type with empty metadata.
274 pub fn new() -> Connected {
275 Connected {
276 alpn: Alpn::None,
277 is_proxied: false,
278 extra: None,
279 poisoned: PoisonPill::healthy(),
280 }
281 }
282
283 /// Set whether the connected transport is to an HTTP proxy.
284 ///
285 /// This setting will affect if HTTP/1 requests written on the transport
286 /// will have the request-target in absolute-form or origin-form:
287 ///
288 /// - When `proxy(false)`:
289 ///
290 /// ```http
291 /// GET /guide HTTP/1.1
292 /// ```
293 ///
294 /// - When `proxy(true)`:
295 ///
296 /// ```http
297 /// GET http://hyper.rs/guide HTTP/1.1
298 /// ```
299 ///
300 /// Default is `false`.
301 pub fn proxy(mut self, is_proxied: bool) -> Connected {
302 self.is_proxied = is_proxied;
303 self
304 }
305
306 /// Determines if the connected transport is to an HTTP proxy.
307 pub fn is_proxied(&self) -> bool {
308 self.is_proxied
309 }
310
311 /// Set extra connection information to be set in the extensions of every `Response`.
312 pub fn extra<T: Clone + Send + Sync + 'static>(mut self, extra: T) -> Connected {
313 if let Some(prev) = self.extra {
314 self.extra = Some(Extra(Box::new(ExtraChain(prev.0, extra))));
315 } else {
316 self.extra = Some(Extra(Box::new(ExtraEnvelope(extra))));
317 }
318 self
319 }
320
321 /// Copies the extra connection information into an `Extensions` map.
322 pub fn get_extras(&self, extensions: &mut Extensions) {
323 if let Some(extra) = &self.extra {
324 extra.set(extensions);
325 }
326 }
327
328 /// Set that the connected transport negotiated HTTP/2 as its next protocol.
329 pub fn negotiated_h2(mut self) -> Connected {
330 self.alpn = Alpn::H2;
331 self
332 }
333
334 /// Determines if the connected transport negotiated HTTP/2 as its next protocol.
335 pub fn is_negotiated_h2(&self) -> bool {
336 self.alpn == Alpn::H2
337 }
338
339 /// Poison this connection
340 ///
341 /// A poisoned connection will not be reused for subsequent requests by the pool
342 pub fn poison(&self) {
343 self.poisoned.poison();
344 tracing::debug!(
345 poison_pill = ?self.poisoned, "connection was poisoned"
346 );
347 }
348
349 // Don't public expose that `Connected` is `Clone`, unsure if we want to
350 // keep that contract...
351 pub(super) fn clone(&self) -> Connected {
352 Connected {
353 alpn: self.alpn.clone(),
354 is_proxied: self.is_proxied,
355 extra: self.extra.clone(),
356 poisoned: self.poisoned.clone(),
357 }
358 }
359}
360
361// ===== impl Extra =====
362
363impl Extra {
364 pub(super) fn set(&self, res: &mut Extensions) {
365 self.0.set(res);
366 }
367}
368
369impl Clone for Extra {
370 fn clone(&self) -> Extra {
371 Extra(self.0.clone_box())
372 }
373}
374
375impl fmt::Debug for Extra {
376 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
377 f.debug_struct(name:"Extra").finish()
378 }
379}
380
381trait ExtraInner: Send + Sync {
382 fn clone_box(&self) -> Box<dyn ExtraInner>;
383 fn set(&self, res: &mut Extensions);
384}
385
386// This indirection allows the `Connected` to have a type-erased "extra" value,
387// while that type still knows its inner extra type. This allows the correct
388// TypeId to be used when inserting into `res.extensions_mut()`.
389#[derive(Clone)]
390struct ExtraEnvelope<T>(T);
391
392impl<T> ExtraInner for ExtraEnvelope<T>
393where
394 T: Clone + Send + Sync + 'static,
395{
396 fn clone_box(&self) -> Box<dyn ExtraInner> {
397 Box::new(self.clone())
398 }
399
400 fn set(&self, res: &mut Extensions) {
401 res.insert(self.0.clone());
402 }
403}
404
405struct ExtraChain<T>(Box<dyn ExtraInner>, T);
406
407impl<T: Clone> Clone for ExtraChain<T> {
408 fn clone(&self) -> Self {
409 ExtraChain(self.0.clone_box(), self.1.clone())
410 }
411}
412
413impl<T> ExtraInner for ExtraChain<T>
414where
415 T: Clone + Send + Sync + 'static,
416{
417 fn clone_box(&self) -> Box<dyn ExtraInner> {
418 Box::new(self.clone())
419 }
420
421 fn set(&self, res: &mut Extensions) {
422 self.0.set(res);
423 res.insert(self.1.clone());
424 }
425}
426
#[cfg(any(feature = "http1", feature = "http2"))]
pub(super) mod sealed {
    use std::{error::Error as StdError, future::Future, marker::Unpin};

    use ::http::Uri;
    use tokio::io::{AsyncRead, AsyncWrite};

    use super::Connection;

    // Supertrait of `Connect`; blanket-implemented below for qualifying services.
    pub trait Sealed {}

    // Token type taken by the `connect` methods.
    // NOTE(review): presumably this keeps `connect` uncallable outside hyper,
    // judging by the `internal_only` parameter name - confirm.
    #[allow(missing_debug_implementations)]
    pub struct Internal;

    /// Connect to a destination, returning an IO transport.
    ///
    /// A connector is given a [`Uri`](::http::Uri) and returns a `Future` of
    /// the ready connection.
    ///
    /// # Trait Alias
    ///
    /// This is really just an *alias* for the `tower::Service` trait, with
    /// additional bounds set for convenience *inside* hyper. You don't actually
    /// implement this trait, but `tower::Service<Uri>` instead.
    // The `Sized` bound prevents creating `dyn Connect`: such an object could
    // not satisfy the `Connect` bounds because of the blanket impl for `Service`.
    pub trait Connect: Sealed + Sized {
        #[doc(hidden)]
        type _Svc: ConnectSvc;
        #[doc(hidden)]
        fn connect(self, internal_only: Internal, dst: Uri) -> <Self::_Svc as ConnectSvc>::Future;
    }

    // Carries the associated types of a connector service.
    #[allow(unreachable_pub)]
    pub trait ConnectSvc {
        type Connection: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static;
        type Error: Into<Box<dyn StdError + Send + Sync>>;
        type Future: Future<Output = Result<Self::Connection, Self::Error>> + Unpin + Send + 'static;

        fn connect(self, internal_only: Internal, dst: Uri) -> Self::Future;
    }

    impl<S, T> Sealed for S
    where
        S: tower_service::Service<Uri, Response = T> + Send,
        S::Error: Into<Box<dyn StdError + Send + Sync>>,
        S::Future: Unpin + Send,
        T: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static,
    {
    }

    impl<S, T> Connect for S
    where
        S: tower_service::Service<Uri, Response = T> + Send + 'static,
        S::Error: Into<Box<dyn StdError + Send + Sync>>,
        S::Future: Unpin + Send,
        T: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static,
    {
        type _Svc = S;

        // Drives the service once with `dst` as the request.
        fn connect(self, _: Internal, dst: Uri) -> crate::service::Oneshot<S, Uri> {
            crate::service::oneshot(self, dst)
        }
    }

    impl<S, T> ConnectSvc for S
    where
        S: tower_service::Service<Uri, Response = T> + Send + 'static,
        S::Error: Into<Box<dyn StdError + Send + Sync>>,
        S::Future: Unpin + Send,
        T: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static,
    {
        type Connection = T;
        type Error = S::Error;
        type Future = crate::service::Oneshot<S, Uri>;

        // Same behavior as `Connect::connect` above.
        fn connect(self, _: Internal, dst: Uri) -> Self::Future {
            crate::service::oneshot(self, dst)
        }
    }
}
509
#[cfg(test)]
mod tests {
    use super::Connected;
    use crate::client::connect::CaptureConnection;

    #[derive(Clone, Debug, PartialEq)]
    struct Ex1(usize);

    #[derive(Clone, Debug, PartialEq)]
    struct Ex2(&'static str);

    #[derive(Clone, Debug, PartialEq)]
    struct Ex3(&'static str);

    // A single extra value ends up in the extensions map.
    #[test]
    fn test_connected_extra() {
        let connected = Connected::new().extra(Ex1(41));
        let mut extensions = ::http::Extensions::new();

        assert_eq!(extensions.get::<Ex1>(), None);

        connected
            .extra
            .as_ref()
            .expect("c1 extra")
            .set(&mut extensions);

        assert_eq!(extensions.get::<Ex1>(), Some(&Ex1(41)));
    }

    #[test]
    fn test_connected_extra_chain() {
        // When connectors are composed and each stage attaches its own
        // "extra" info, later stages must not wipe out earlier extras.
        let chained = Connected::new()
            .extra(Ex1(45))
            .extra(Ex2("zoom"))
            .extra(Ex3("pew pew"));

        let mut first = ::http::Extensions::new();

        assert_eq!(first.get::<Ex1>(), None);
        assert_eq!(first.get::<Ex2>(), None);
        assert_eq!(first.get::<Ex3>(), None);

        chained.extra.as_ref().expect("c1 extra").set(&mut first);

        assert_eq!(first.get::<Ex1>(), Some(&Ex1(45)));
        assert_eq!(first.get::<Ex2>(), Some(&Ex2("zoom")));
        assert_eq!(first.get::<Ex3>(), Some(&Ex3("pew pew")));

        // As with extensions themselves, a later value of the same type
        // replaces the earlier one.
        let overridden = Connected::new()
            .extra(Ex1(33))
            .extra(Ex2("hiccup"))
            .extra(Ex1(99));

        let mut second = ::http::Extensions::new();

        overridden
            .extra
            .as_ref()
            .expect("c2 extra")
            .set(&mut second);

        assert_eq!(second.get::<Ex1>(), Some(&Ex1(99)));
        assert_eq!(second.get::<Ex2>(), Some(&Ex2("hiccup")));
    }

    #[test]
    fn test_sync_capture_connection() {
        let (sender, receiver) = CaptureConnection::new();
        assert!(
            receiver.connection_metadata().is_none(),
            "connection has not been set"
        );
        sender.set(&Connected::new().proxy(true));

        // Read twice to make sure the metadata can be fetched repeatedly.
        for _ in 0..2 {
            assert!(receiver
                .connection_metadata()
                .as_ref()
                .expect("connected should be set")
                .is_proxied());
        }
    }

    #[tokio::test]
    async fn async_capture_connection() {
        let (sender, mut receiver) = CaptureConnection::new();
        assert!(
            receiver.connection_metadata().is_none(),
            "connection has not been set"
        );
        let waiter = tokio::spawn(async move {
            assert!(receiver
                .wait_for_connection_metadata()
                .await
                .as_ref()
                .expect("connection should be set")
                .is_proxied());
            // Waiting again must also work.
            assert!(
                receiver.wait_for_connection_metadata().await.is_some(),
                "should be awaitable multiple times"
            );

            assert!(receiver.connection_metadata().is_some());
        });
        // The connection has not been set yet, so the task cannot be done.
        assert!(!waiter.is_finished());
        sender.set(&Connected::new().proxy(true));

        assert!(waiter.await.is_ok());
    }

    #[tokio::test]
    async fn capture_connection_sender_side_dropped() {
        let (sender, mut receiver) = CaptureConnection::new();
        assert!(
            receiver.connection_metadata().is_none(),
            "connection has not been set"
        );
        drop(sender);
        assert!(receiver.wait_for_connection_metadata().await.is_none());
    }
}
641