1 | //! Connectors used by the `Client`. |
2 | //! |
3 | //! This module contains: |
4 | //! |
5 | //! - A default [`HttpConnector`][] that does DNS resolution and establishes |
6 | //! connections over TCP. |
7 | //! - Types to build custom connectors. |
8 | //! |
9 | //! # Connectors |
10 | //! |
11 | //! A "connector" is a [`Service`][] that takes a [`Uri`][] destination, and |
12 | //! its `Response` is some type implementing [`Read`][], [`Write`][], |
13 | //! and [`Connection`][]. |
14 | //! |
15 | //! ## Custom Connectors |
16 | //! |
17 | //! A simple connector that ignores the `Uri` destination and always returns |
18 | //! a TCP connection to the same address could be written like this: |
19 | //! |
20 | //! ```rust,ignore |
21 | //! let connector = tower::service_fn(|_dst| async { |
22 | //! tokio::net::TcpStream::connect("127.0.0.1:1337" ) |
23 | //! }) |
24 | //! ``` |
25 | //! |
26 | //! Or, fully written out: |
27 | //! |
28 | //! ``` |
29 | //! use std::{future::Future, net::SocketAddr, pin::Pin, task::{self, Poll}}; |
30 | //! use http::Uri; |
31 | //! use tokio::net::TcpStream; |
32 | //! use tower_service::Service; |
33 | //! |
34 | //! #[derive(Clone)] |
35 | //! struct LocalConnector; |
36 | //! |
37 | //! impl Service<Uri> for LocalConnector { |
38 | //! type Response = TcpStream; |
39 | //! type Error = std::io::Error; |
40 | //! // We can't "name" an `async` generated future. |
41 | //! type Future = Pin<Box< |
42 | //! dyn Future<Output = Result<Self::Response, Self::Error>> + Send |
43 | //! >>; |
44 | //! |
45 | //! fn poll_ready(&mut self, _: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> { |
46 | //! // This connector is always ready, but others might not be. |
47 | //! Poll::Ready(Ok(())) |
48 | //! } |
49 | //! |
50 | //! fn call(&mut self, _: Uri) -> Self::Future { |
51 | //! Box::pin(TcpStream::connect(SocketAddr::from(([127, 0, 0, 1], 1337)))) |
52 | //! } |
53 | //! } |
54 | //! ``` |
55 | //! |
56 | //! It's worth noting that for `TcpStream`s, the [`HttpConnector`][] is a |
57 | //! better starting place to extend from. |
58 | //! |
59 | //! [`HttpConnector`]: HttpConnector |
60 | //! [`Service`]: tower_service::Service |
61 | //! [`Uri`]: ::http::Uri |
62 | //! [`Read`]: hyper::rt::Read |
63 | //! [`Write`]: hyper::rt::Write |
64 | //! [`Connection`]: Connection |
65 | use std::{ |
66 | fmt::{self, Formatter}, |
67 | sync::{ |
68 | atomic::{AtomicBool, Ordering}, |
69 | Arc, |
70 | }, |
71 | }; |
72 | |
73 | use ::http::Extensions; |
74 | |
75 | #[cfg (feature = "tokio" )] |
76 | pub use self::http::{HttpConnector, HttpInfo}; |
77 | |
78 | #[cfg (feature = "tokio" )] |
79 | pub mod dns; |
80 | #[cfg (feature = "tokio" )] |
81 | mod http; |
82 | |
83 | pub(crate) mod capture; |
84 | pub use capture::{capture_connection, CaptureConnection}; |
85 | |
86 | pub use self::sealed::Connect; |
87 | |
88 | /// Describes a type returned by a connector. |
89 | pub trait Connection { |
90 | /// Return metadata describing the connection. |
91 | fn connected(&self) -> Connected; |
92 | } |
93 | |
94 | /// Extra information about the connected transport. |
95 | /// |
96 | /// This can be used to inform recipients about things like if ALPN |
97 | /// was used, or if connected to an HTTP proxy. |
98 | #[derive (Debug)] |
99 | pub struct Connected { |
100 | pub(super) alpn: Alpn, |
101 | pub(super) is_proxied: bool, |
102 | pub(super) extra: Option<Extra>, |
103 | pub(super) poisoned: PoisonPill, |
104 | } |
105 | |
106 | #[derive (Clone)] |
107 | pub(crate) struct PoisonPill { |
108 | poisoned: Arc<AtomicBool>, |
109 | } |
110 | |
111 | impl fmt::Debug for PoisonPill { |
112 | fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { |
113 | // print the address of the pill—this makes debugging issues much easier |
114 | write!( |
115 | f, |
116 | "PoisonPill@ {:p} {{ poisoned: {} }}" , |
117 | self.poisoned, |
118 | self.poisoned.load(Ordering::Relaxed) |
119 | ) |
120 | } |
121 | } |
122 | |
123 | impl PoisonPill { |
124 | pub(crate) fn healthy() -> Self { |
125 | Self { |
126 | poisoned: Arc::new(data:AtomicBool::new(false)), |
127 | } |
128 | } |
129 | pub(crate) fn poison(&self) { |
130 | self.poisoned.store(val:true, order:Ordering::Relaxed) |
131 | } |
132 | |
133 | pub(crate) fn poisoned(&self) -> bool { |
134 | self.poisoned.load(order:Ordering::Relaxed) |
135 | } |
136 | } |
137 | |
138 | pub(super) struct Extra(Box<dyn ExtraInner>); |
139 | |
140 | #[derive (Clone, Copy, Debug, PartialEq)] |
141 | pub(super) enum Alpn { |
142 | H2, |
143 | None, |
144 | } |
145 | |
146 | impl Connected { |
147 | /// Create new `Connected` type with empty metadata. |
148 | pub fn new() -> Connected { |
149 | Connected { |
150 | alpn: Alpn::None, |
151 | is_proxied: false, |
152 | extra: None, |
153 | poisoned: PoisonPill::healthy(), |
154 | } |
155 | } |
156 | |
157 | /// Set whether the connected transport is to an HTTP proxy. |
158 | /// |
159 | /// This setting will affect if HTTP/1 requests written on the transport |
160 | /// will have the request-target in absolute-form or origin-form: |
161 | /// |
162 | /// - When `proxy(false)`: |
163 | /// |
164 | /// ```http |
165 | /// GET /guide HTTP/1.1 |
166 | /// ``` |
167 | /// |
168 | /// - When `proxy(true)`: |
169 | /// |
170 | /// ```http |
171 | /// GET http://hyper.rs/guide HTTP/1.1 |
172 | /// ``` |
173 | /// |
174 | /// Default is `false`. |
175 | pub fn proxy(mut self, is_proxied: bool) -> Connected { |
176 | self.is_proxied = is_proxied; |
177 | self |
178 | } |
179 | |
180 | /// Determines if the connected transport is to an HTTP proxy. |
181 | pub fn is_proxied(&self) -> bool { |
182 | self.is_proxied |
183 | } |
184 | |
185 | /// Set extra connection information to be set in the extensions of every `Response`. |
186 | pub fn extra<T: Clone + Send + Sync + 'static>(mut self, extra: T) -> Connected { |
187 | if let Some(prev) = self.extra { |
188 | self.extra = Some(Extra(Box::new(ExtraChain(prev.0, extra)))); |
189 | } else { |
190 | self.extra = Some(Extra(Box::new(ExtraEnvelope(extra)))); |
191 | } |
192 | self |
193 | } |
194 | |
195 | /// Copies the extra connection information into an `Extensions` map. |
196 | pub fn get_extras(&self, extensions: &mut Extensions) { |
197 | if let Some(extra) = &self.extra { |
198 | extra.set(extensions); |
199 | } |
200 | } |
201 | |
202 | /// Set that the connected transport negotiated HTTP/2 as its next protocol. |
203 | pub fn negotiated_h2(mut self) -> Connected { |
204 | self.alpn = Alpn::H2; |
205 | self |
206 | } |
207 | |
208 | /// Determines if the connected transport negotiated HTTP/2 as its next protocol. |
209 | pub fn is_negotiated_h2(&self) -> bool { |
210 | self.alpn == Alpn::H2 |
211 | } |
212 | |
213 | /// Poison this connection |
214 | /// |
215 | /// A poisoned connection will not be reused for subsequent requests by the pool |
216 | pub fn poison(&self) { |
217 | self.poisoned.poison(); |
218 | tracing::debug!( |
219 | poison_pill = ?self.poisoned, "connection was poisoned. this connection will not be reused for subsequent requests" |
220 | ); |
221 | } |
222 | |
223 | // Don't public expose that `Connected` is `Clone`, unsure if we want to |
224 | // keep that contract... |
225 | pub(super) fn clone(&self) -> Connected { |
226 | Connected { |
227 | alpn: self.alpn, |
228 | is_proxied: self.is_proxied, |
229 | extra: self.extra.clone(), |
230 | poisoned: self.poisoned.clone(), |
231 | } |
232 | } |
233 | } |
234 | |
235 | // ===== impl Extra ===== |
236 | |
237 | impl Extra { |
238 | pub(super) fn set(&self, res: &mut Extensions) { |
239 | self.0.set(res); |
240 | } |
241 | } |
242 | |
243 | impl Clone for Extra { |
244 | fn clone(&self) -> Extra { |
245 | Extra(self.0.clone_box()) |
246 | } |
247 | } |
248 | |
249 | impl fmt::Debug for Extra { |
250 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
251 | f.debug_struct(name:"Extra" ).finish() |
252 | } |
253 | } |
254 | |
255 | trait ExtraInner: Send + Sync { |
256 | fn clone_box(&self) -> Box<dyn ExtraInner>; |
257 | fn set(&self, res: &mut Extensions); |
258 | } |
259 | |
260 | // This indirection allows the `Connected` to have a type-erased "extra" value, |
261 | // while that type still knows its inner extra type. This allows the correct |
262 | // TypeId to be used when inserting into `res.extensions_mut()`. |
263 | #[derive (Clone)] |
264 | struct ExtraEnvelope<T>(T); |
265 | |
266 | impl<T> ExtraInner for ExtraEnvelope<T> |
267 | where |
268 | T: Clone + Send + Sync + 'static, |
269 | { |
270 | fn clone_box(&self) -> Box<dyn ExtraInner> { |
271 | Box::new(self.clone()) |
272 | } |
273 | |
274 | fn set(&self, res: &mut Extensions) { |
275 | res.insert(self.0.clone()); |
276 | } |
277 | } |
278 | |
279 | struct ExtraChain<T>(Box<dyn ExtraInner>, T); |
280 | |
281 | impl<T: Clone> Clone for ExtraChain<T> { |
282 | fn clone(&self) -> Self { |
283 | ExtraChain(self.0.clone_box(), self.1.clone()) |
284 | } |
285 | } |
286 | |
287 | impl<T> ExtraInner for ExtraChain<T> |
288 | where |
289 | T: Clone + Send + Sync + 'static, |
290 | { |
291 | fn clone_box(&self) -> Box<dyn ExtraInner> { |
292 | Box::new(self.clone()) |
293 | } |
294 | |
295 | fn set(&self, res: &mut Extensions) { |
296 | self.0.set(res); |
297 | res.insert(self.1.clone()); |
298 | } |
299 | } |
300 | |
301 | pub(super) mod sealed { |
302 | use std::error::Error as StdError; |
303 | use std::future::Future; |
304 | |
305 | use ::http::Uri; |
306 | use hyper::rt::{Read, Write}; |
307 | |
308 | use super::Connection; |
309 | |
310 | /// Connect to a destination, returning an IO transport. |
311 | /// |
312 | /// A connector receives a [`Uri`](::http::Uri) and returns a `Future` of the |
313 | /// ready connection. |
314 | /// |
315 | /// # Trait Alias |
316 | /// |
317 | /// This is really just an *alias* for the `tower::Service` trait, with |
318 | /// additional bounds set for convenience *inside* hyper. You don't actually |
319 | /// implement this trait, but `tower::Service<Uri>` instead. |
320 | // The `Sized` bound is to prevent creating `dyn Connect`, since they cannot |
321 | // fit the `Connect` bounds because of the blanket impl for `Service`. |
322 | pub trait Connect: Sealed + Sized { |
323 | #[doc (hidden)] |
324 | type _Svc: ConnectSvc; |
325 | #[doc (hidden)] |
326 | fn connect(self, internal_only: Internal, dst: Uri) -> <Self::_Svc as ConnectSvc>::Future; |
327 | } |
328 | |
329 | pub trait ConnectSvc { |
330 | type Connection: Read + Write + Connection + Unpin + Send + 'static; |
331 | type Error: Into<Box<dyn StdError + Send + Sync>>; |
332 | type Future: Future<Output = Result<Self::Connection, Self::Error>> + Unpin + Send + 'static; |
333 | |
334 | fn connect(self, internal_only: Internal, dst: Uri) -> Self::Future; |
335 | } |
336 | |
337 | impl<S, T> Connect for S |
338 | where |
339 | S: tower_service::Service<Uri, Response = T> + Send + 'static, |
340 | S::Error: Into<Box<dyn StdError + Send + Sync>>, |
341 | S::Future: Unpin + Send, |
342 | T: Read + Write + Connection + Unpin + Send + 'static, |
343 | { |
344 | type _Svc = S; |
345 | |
346 | fn connect(self, _: Internal, dst: Uri) -> crate::service::Oneshot<S, Uri> { |
347 | crate::service::Oneshot::new(self, dst) |
348 | } |
349 | } |
350 | |
351 | impl<S, T> ConnectSvc for S |
352 | where |
353 | S: tower_service::Service<Uri, Response = T> + Send + 'static, |
354 | S::Error: Into<Box<dyn StdError + Send + Sync>>, |
355 | S::Future: Unpin + Send, |
356 | T: Read + Write + Connection + Unpin + Send + 'static, |
357 | { |
358 | type Connection = T; |
359 | type Error = S::Error; |
360 | type Future = crate::service::Oneshot<S, Uri>; |
361 | |
362 | fn connect(self, _: Internal, dst: Uri) -> Self::Future { |
363 | crate::service::Oneshot::new(self, dst) |
364 | } |
365 | } |
366 | |
367 | impl<S, T> Sealed for S |
368 | where |
369 | S: tower_service::Service<Uri, Response = T> + Send, |
370 | S::Error: Into<Box<dyn StdError + Send + Sync>>, |
371 | S::Future: Unpin + Send, |
372 | T: Read + Write + Connection + Unpin + Send + 'static, |
373 | { |
374 | } |
375 | |
376 | pub trait Sealed {} |
377 | #[allow (missing_debug_implementations)] |
378 | pub struct Internal; |
379 | } |
380 | |
381 | #[cfg (test)] |
382 | mod tests { |
383 | use super::Connected; |
384 | |
385 | #[derive (Clone, Debug, PartialEq)] |
386 | struct Ex1(usize); |
387 | |
388 | #[derive (Clone, Debug, PartialEq)] |
389 | struct Ex2(&'static str); |
390 | |
391 | #[derive (Clone, Debug, PartialEq)] |
392 | struct Ex3(&'static str); |
393 | |
394 | #[test ] |
395 | fn test_connected_extra() { |
396 | let c1 = Connected::new().extra(Ex1(41)); |
397 | |
398 | let mut ex = ::http::Extensions::new(); |
399 | |
400 | assert_eq!(ex.get::<Ex1>(), None); |
401 | |
402 | c1.extra.as_ref().expect("c1 extra" ).set(&mut ex); |
403 | |
404 | assert_eq!(ex.get::<Ex1>(), Some(&Ex1(41))); |
405 | } |
406 | |
407 | #[test ] |
408 | fn test_connected_extra_chain() { |
409 | // If a user composes connectors and at each stage, there's "extra" |
410 | // info to attach, it shouldn't override the previous extras. |
411 | |
412 | let c1 = Connected::new() |
413 | .extra(Ex1(45)) |
414 | .extra(Ex2("zoom" )) |
415 | .extra(Ex3("pew pew" )); |
416 | |
417 | let mut ex1 = ::http::Extensions::new(); |
418 | |
419 | assert_eq!(ex1.get::<Ex1>(), None); |
420 | assert_eq!(ex1.get::<Ex2>(), None); |
421 | assert_eq!(ex1.get::<Ex3>(), None); |
422 | |
423 | c1.extra.as_ref().expect("c1 extra" ).set(&mut ex1); |
424 | |
425 | assert_eq!(ex1.get::<Ex1>(), Some(&Ex1(45))); |
426 | assert_eq!(ex1.get::<Ex2>(), Some(&Ex2("zoom" ))); |
427 | assert_eq!(ex1.get::<Ex3>(), Some(&Ex3("pew pew" ))); |
428 | |
429 | // Just like extensions, inserting the same type overrides previous type. |
430 | let c2 = Connected::new() |
431 | .extra(Ex1(33)) |
432 | .extra(Ex2("hiccup" )) |
433 | .extra(Ex1(99)); |
434 | |
435 | let mut ex2 = ::http::Extensions::new(); |
436 | |
437 | c2.extra.as_ref().expect("c2 extra" ).set(&mut ex2); |
438 | |
439 | assert_eq!(ex2.get::<Ex1>(), Some(&Ex1(99))); |
440 | assert_eq!(ex2.get::<Ex2>(), Some(&Ex2("hiccup" ))); |
441 | } |
442 | } |
443 | |