1 | use std::error::Error as StdError; |
2 | use std::fmt; |
3 | use std::future::Future; |
4 | use std::io; |
5 | use std::marker::PhantomData; |
6 | use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; |
7 | use std::pin::Pin; |
8 | use std::sync::Arc; |
9 | use std::task::{self, Poll}; |
10 | use std::time::Duration; |
11 | |
12 | use futures_util::future::Either; |
13 | use http::uri::{Scheme, Uri}; |
14 | use pin_project_lite::pin_project; |
15 | use socket2::TcpKeepalive; |
16 | use tokio::net::{TcpSocket, TcpStream}; |
17 | use tokio::time::Sleep; |
18 | use tracing::{debug, trace, warn}; |
19 | |
20 | use super::dns::{self, resolve, GaiResolver, Resolve}; |
21 | use super::{Connected, Connection}; |
22 | use crate::rt::TokioIo; |
23 | |
24 | /// A connector for the `http` scheme. |
25 | /// |
26 | /// Performs DNS resolution in a thread pool, and then connects over TCP. |
27 | /// |
28 | /// # Note |
29 | /// |
30 | /// Sets the [`HttpInfo`](HttpInfo) value on responses, which includes |
31 | /// transport information such as the remote socket address used. |
32 | #[derive (Clone)] |
33 | pub struct HttpConnector<R = GaiResolver> { |
34 | config: Arc<Config>, |
35 | resolver: R, |
36 | } |
37 | |
38 | /// Extra information about the transport when an HttpConnector is used. |
39 | /// |
40 | /// # Example |
41 | /// |
42 | /// ``` |
43 | /// # fn doc(res: http::Response<()>) { |
44 | /// use hyper_util::client::legacy::connect::HttpInfo; |
45 | /// |
46 | /// // res = http::Response |
47 | /// res |
48 | /// .extensions() |
49 | /// .get::<HttpInfo>() |
50 | /// .map(|info| { |
51 | /// println!("remote addr = {}" , info.remote_addr()); |
52 | /// }); |
53 | /// # } |
54 | /// ``` |
55 | /// |
56 | /// # Note |
57 | /// |
58 | /// If a different connector is used besides [`HttpConnector`](HttpConnector), |
59 | /// this value will not exist in the extensions. Consult that specific |
60 | /// connector to see what "extra" information it might provide to responses. |
61 | #[derive (Clone, Debug)] |
62 | pub struct HttpInfo { |
63 | remote_addr: SocketAddr, |
64 | local_addr: SocketAddr, |
65 | } |
66 | |
67 | #[derive (Clone)] |
68 | struct Config { |
69 | connect_timeout: Option<Duration>, |
70 | enforce_http: bool, |
71 | happy_eyeballs_timeout: Option<Duration>, |
72 | tcp_keepalive_config: TcpKeepaliveConfig, |
73 | local_address_ipv4: Option<Ipv4Addr>, |
74 | local_address_ipv6: Option<Ipv6Addr>, |
75 | nodelay: bool, |
76 | reuse_address: bool, |
77 | send_buffer_size: Option<usize>, |
78 | recv_buffer_size: Option<usize>, |
79 | #[cfg (any(target_os = "android" , target_os = "fuchsia" , target_os = "linux" ))] |
80 | interface: Option<String>, |
81 | #[cfg (any( |
82 | target_os = "illumos" , |
83 | target_os = "ios" , |
84 | target_os = "macos" , |
85 | target_os = "solaris" , |
86 | target_os = "tvos" , |
87 | target_os = "visionos" , |
88 | target_os = "watchos" , |
89 | ))] |
90 | interface: Option<std::ffi::CString>, |
91 | #[cfg (any(target_os = "android" , target_os = "fuchsia" , target_os = "linux" ))] |
92 | tcp_user_timeout: Option<Duration>, |
93 | } |
94 | |
95 | #[derive (Default, Debug, Clone, Copy)] |
96 | struct TcpKeepaliveConfig { |
97 | time: Option<Duration>, |
98 | interval: Option<Duration>, |
99 | retries: Option<u32>, |
100 | } |
101 | |
102 | impl TcpKeepaliveConfig { |
103 | /// Converts into a `socket2::TcpKeealive` if there is any keep alive configuration. |
104 | fn into_tcpkeepalive(self) -> Option<TcpKeepalive> { |
105 | let mut dirty = false; |
106 | let mut ka = TcpKeepalive::new(); |
107 | if let Some(time) = self.time { |
108 | ka = ka.with_time(time); |
109 | dirty = true |
110 | } |
111 | if let Some(interval) = self.interval { |
112 | ka = Self::ka_with_interval(ka, interval, &mut dirty) |
113 | }; |
114 | if let Some(retries) = self.retries { |
115 | ka = Self::ka_with_retries(ka, retries, &mut dirty) |
116 | }; |
117 | if dirty { |
118 | Some(ka) |
119 | } else { |
120 | None |
121 | } |
122 | } |
123 | |
124 | #[cfg ( |
125 | // See https://docs.rs/socket2/0.5.8/src/socket2/lib.rs.html#511-525 |
126 | any( |
127 | target_os = "android" , |
128 | target_os = "dragonfly" , |
129 | target_os = "freebsd" , |
130 | target_os = "fuchsia" , |
131 | target_os = "illumos" , |
132 | target_os = "ios" , |
133 | target_os = "visionos" , |
134 | target_os = "linux" , |
135 | target_os = "macos" , |
136 | target_os = "netbsd" , |
137 | target_os = "tvos" , |
138 | target_os = "watchos" , |
139 | target_os = "windows" , |
140 | ) |
141 | )] |
142 | fn ka_with_interval(ka: TcpKeepalive, interval: Duration, dirty: &mut bool) -> TcpKeepalive { |
143 | *dirty = true; |
144 | ka.with_interval(interval) |
145 | } |
146 | |
147 | #[cfg (not( |
148 | // See https://docs.rs/socket2/0.5.8/src/socket2/lib.rs.html#511-525 |
149 | any( |
150 | target_os = "android" , |
151 | target_os = "dragonfly" , |
152 | target_os = "freebsd" , |
153 | target_os = "fuchsia" , |
154 | target_os = "illumos" , |
155 | target_os = "ios" , |
156 | target_os = "visionos" , |
157 | target_os = "linux" , |
158 | target_os = "macos" , |
159 | target_os = "netbsd" , |
160 | target_os = "tvos" , |
161 | target_os = "watchos" , |
162 | target_os = "windows" , |
163 | ) |
164 | ))] |
165 | fn ka_with_interval(ka: TcpKeepalive, _: Duration, _: &mut bool) -> TcpKeepalive { |
166 | ka // no-op as keepalive interval is not supported on this platform |
167 | } |
168 | |
169 | #[cfg ( |
170 | // See https://docs.rs/socket2/0.5.8/src/socket2/lib.rs.html#557-570 |
171 | any( |
172 | target_os = "android" , |
173 | target_os = "dragonfly" , |
174 | target_os = "freebsd" , |
175 | target_os = "fuchsia" , |
176 | target_os = "illumos" , |
177 | target_os = "ios" , |
178 | target_os = "visionos" , |
179 | target_os = "linux" , |
180 | target_os = "macos" , |
181 | target_os = "netbsd" , |
182 | target_os = "tvos" , |
183 | target_os = "watchos" , |
184 | ) |
185 | )] |
186 | fn ka_with_retries(ka: TcpKeepalive, retries: u32, dirty: &mut bool) -> TcpKeepalive { |
187 | *dirty = true; |
188 | ka.with_retries(retries) |
189 | } |
190 | |
191 | #[cfg (not( |
192 | // See https://docs.rs/socket2/0.5.8/src/socket2/lib.rs.html#557-570 |
193 | any( |
194 | target_os = "android" , |
195 | target_os = "dragonfly" , |
196 | target_os = "freebsd" , |
197 | target_os = "fuchsia" , |
198 | target_os = "illumos" , |
199 | target_os = "ios" , |
200 | target_os = "visionos" , |
201 | target_os = "linux" , |
202 | target_os = "macos" , |
203 | target_os = "netbsd" , |
204 | target_os = "tvos" , |
205 | target_os = "watchos" , |
206 | ) |
207 | ))] |
208 | fn ka_with_retries(ka: TcpKeepalive, _: u32, _: &mut bool) -> TcpKeepalive { |
209 | ka // no-op as keepalive retries is not supported on this platform |
210 | } |
211 | } |
212 | |
213 | // ===== impl HttpConnector ===== |
214 | |
215 | impl HttpConnector { |
216 | /// Construct a new HttpConnector. |
217 | pub fn new() -> HttpConnector { |
218 | HttpConnector::new_with_resolver(GaiResolver::new()) |
219 | } |
220 | } |
221 | |
222 | impl<R> HttpConnector<R> { |
223 | /// Construct a new HttpConnector. |
224 | /// |
225 | /// Takes a [`Resolver`](crate::client::legacy::connect::dns#resolvers-are-services) to handle DNS lookups. |
226 | pub fn new_with_resolver(resolver: R) -> HttpConnector<R> { |
227 | HttpConnector { |
228 | config: Arc::new(Config { |
229 | connect_timeout: None, |
230 | enforce_http: true, |
231 | happy_eyeballs_timeout: Some(Duration::from_millis(300)), |
232 | tcp_keepalive_config: TcpKeepaliveConfig::default(), |
233 | local_address_ipv4: None, |
234 | local_address_ipv6: None, |
235 | nodelay: false, |
236 | reuse_address: false, |
237 | send_buffer_size: None, |
238 | recv_buffer_size: None, |
239 | #[cfg (any( |
240 | target_os = "android" , |
241 | target_os = "fuchsia" , |
242 | target_os = "illumos" , |
243 | target_os = "ios" , |
244 | target_os = "linux" , |
245 | target_os = "macos" , |
246 | target_os = "solaris" , |
247 | target_os = "tvos" , |
248 | target_os = "visionos" , |
249 | target_os = "watchos" , |
250 | ))] |
251 | interface: None, |
252 | #[cfg (any(target_os = "android" , target_os = "fuchsia" , target_os = "linux" ))] |
253 | tcp_user_timeout: None, |
254 | }), |
255 | resolver, |
256 | } |
257 | } |
258 | |
259 | /// Option to enforce all `Uri`s have the `http` scheme. |
260 | /// |
261 | /// Enabled by default. |
262 | #[inline ] |
263 | pub fn enforce_http(&mut self, is_enforced: bool) { |
264 | self.config_mut().enforce_http = is_enforced; |
265 | } |
266 | |
267 | /// Set that all sockets have `SO_KEEPALIVE` set with the supplied duration |
268 | /// to remain idle before sending TCP keepalive probes. |
269 | /// |
270 | /// If `None`, keepalive is disabled. |
271 | /// |
272 | /// Default is `None`. |
273 | #[inline ] |
274 | pub fn set_keepalive(&mut self, time: Option<Duration>) { |
275 | self.config_mut().tcp_keepalive_config.time = time; |
276 | } |
277 | |
278 | /// Set the duration between two successive TCP keepalive retransmissions, |
279 | /// if acknowledgement to the previous keepalive transmission is not received. |
280 | #[inline ] |
281 | pub fn set_keepalive_interval(&mut self, interval: Option<Duration>) { |
282 | self.config_mut().tcp_keepalive_config.interval = interval; |
283 | } |
284 | |
285 | /// Set the number of retransmissions to be carried out before declaring that remote end is not available. |
286 | #[inline ] |
287 | pub fn set_keepalive_retries(&mut self, retries: Option<u32>) { |
288 | self.config_mut().tcp_keepalive_config.retries = retries; |
289 | } |
290 | |
291 | /// Set that all sockets have `SO_NODELAY` set to the supplied value `nodelay`. |
292 | /// |
293 | /// Default is `false`. |
294 | #[inline ] |
295 | pub fn set_nodelay(&mut self, nodelay: bool) { |
296 | self.config_mut().nodelay = nodelay; |
297 | } |
298 | |
299 | /// Sets the value of the SO_SNDBUF option on the socket. |
300 | #[inline ] |
301 | pub fn set_send_buffer_size(&mut self, size: Option<usize>) { |
302 | self.config_mut().send_buffer_size = size; |
303 | } |
304 | |
305 | /// Sets the value of the SO_RCVBUF option on the socket. |
306 | #[inline ] |
307 | pub fn set_recv_buffer_size(&mut self, size: Option<usize>) { |
308 | self.config_mut().recv_buffer_size = size; |
309 | } |
310 | |
311 | /// Set that all sockets are bound to the configured address before connection. |
312 | /// |
313 | /// If `None`, the sockets will not be bound. |
314 | /// |
315 | /// Default is `None`. |
316 | #[inline ] |
317 | pub fn set_local_address(&mut self, addr: Option<IpAddr>) { |
318 | let (v4, v6) = match addr { |
319 | Some(IpAddr::V4(a)) => (Some(a), None), |
320 | Some(IpAddr::V6(a)) => (None, Some(a)), |
321 | _ => (None, None), |
322 | }; |
323 | |
324 | let cfg = self.config_mut(); |
325 | |
326 | cfg.local_address_ipv4 = v4; |
327 | cfg.local_address_ipv6 = v6; |
328 | } |
329 | |
330 | /// Set that all sockets are bound to the configured IPv4 or IPv6 address (depending on host's |
331 | /// preferences) before connection. |
332 | #[inline ] |
333 | pub fn set_local_addresses(&mut self, addr_ipv4: Ipv4Addr, addr_ipv6: Ipv6Addr) { |
334 | let cfg = self.config_mut(); |
335 | |
336 | cfg.local_address_ipv4 = Some(addr_ipv4); |
337 | cfg.local_address_ipv6 = Some(addr_ipv6); |
338 | } |
339 | |
340 | /// Set the connect timeout. |
341 | /// |
342 | /// If a domain resolves to multiple IP addresses, the timeout will be |
343 | /// evenly divided across them. |
344 | /// |
345 | /// Default is `None`. |
346 | #[inline ] |
347 | pub fn set_connect_timeout(&mut self, dur: Option<Duration>) { |
348 | self.config_mut().connect_timeout = dur; |
349 | } |
350 | |
351 | /// Set timeout for [RFC 6555 (Happy Eyeballs)][RFC 6555] algorithm. |
352 | /// |
353 | /// If hostname resolves to both IPv4 and IPv6 addresses and connection |
354 | /// cannot be established using preferred address family before timeout |
355 | /// elapses, then connector will in parallel attempt connection using other |
356 | /// address family. |
357 | /// |
358 | /// If `None`, parallel connection attempts are disabled. |
359 | /// |
360 | /// Default is 300 milliseconds. |
361 | /// |
362 | /// [RFC 6555]: https://tools.ietf.org/html/rfc6555 |
363 | #[inline ] |
364 | pub fn set_happy_eyeballs_timeout(&mut self, dur: Option<Duration>) { |
365 | self.config_mut().happy_eyeballs_timeout = dur; |
366 | } |
367 | |
368 | /// Set that all socket have `SO_REUSEADDR` set to the supplied value `reuse_address`. |
369 | /// |
370 | /// Default is `false`. |
371 | #[inline ] |
372 | pub fn set_reuse_address(&mut self, reuse_address: bool) -> &mut Self { |
373 | self.config_mut().reuse_address = reuse_address; |
374 | self |
375 | } |
376 | |
377 | /// Sets the name of the interface to bind sockets produced by this |
378 | /// connector. |
379 | /// |
380 | /// On Linux, this sets the `SO_BINDTODEVICE` option on this socket (see |
381 | /// [`man 7 socket`] for details). On macOS (and macOS-derived systems like |
382 | /// iOS), illumos, and Solaris, this will instead use the `IP_BOUND_IF` |
383 | /// socket option (see [`man 7p ip`]). |
384 | /// |
385 | /// If a socket is bound to an interface, only packets received from that particular |
386 | /// interface are processed by the socket. Note that this only works for some socket |
387 | /// types, particularly `AF_INET`` sockets. |
388 | /// |
389 | /// On Linux it can be used to specify a [VRF], but the binary needs |
390 | /// to either have `CAP_NET_RAW` or to be run as root. |
391 | /// |
392 | /// This function is only available on the following operating systems: |
393 | /// - Linux, including Android |
394 | /// - Fuchsia |
395 | /// - illumos and Solaris |
396 | /// - macOS, iOS, visionOS, watchOS, and tvOS |
397 | /// |
398 | /// [VRF]: https://www.kernel.org/doc/Documentation/networking/vrf.txt |
399 | /// [`man 7 socket`] https://man7.org/linux/man-pages/man7/socket.7.html |
400 | /// [`man 7p ip`]: https://docs.oracle.com/cd/E86824_01/html/E54777/ip-7p.html |
401 | #[cfg (any( |
402 | target_os = "android" , |
403 | target_os = "fuchsia" , |
404 | target_os = "illumos" , |
405 | target_os = "ios" , |
406 | target_os = "linux" , |
407 | target_os = "macos" , |
408 | target_os = "solaris" , |
409 | target_os = "tvos" , |
410 | target_os = "visionos" , |
411 | target_os = "watchos" , |
412 | ))] |
413 | #[inline ] |
414 | pub fn set_interface<S: Into<String>>(&mut self, interface: S) -> &mut Self { |
415 | let interface = interface.into(); |
416 | #[cfg (any(target_os = "android" , target_os = "fuchsia" , target_os = "linux" ))] |
417 | { |
418 | self.config_mut().interface = Some(interface); |
419 | } |
420 | #[cfg (not(any(target_os = "android" , target_os = "fuchsia" , target_os = "linux" )))] |
421 | { |
422 | let interface = std::ffi::CString::new(interface) |
423 | .expect("interface name should not have nulls in it" ); |
424 | self.config_mut().interface = Some(interface); |
425 | } |
426 | self |
427 | } |
428 | |
429 | /// Sets the value of the TCP_USER_TIMEOUT option on the socket. |
430 | #[cfg (any(target_os = "android" , target_os = "fuchsia" , target_os = "linux" ))] |
431 | #[inline ] |
432 | pub fn set_tcp_user_timeout(&mut self, time: Option<Duration>) { |
433 | self.config_mut().tcp_user_timeout = time; |
434 | } |
435 | |
436 | // private |
437 | |
438 | fn config_mut(&mut self) -> &mut Config { |
439 | // If the are HttpConnector clones, this will clone the inner |
440 | // config. So mutating the config won't ever affect previous |
441 | // clones. |
442 | Arc::make_mut(&mut self.config) |
443 | } |
444 | } |
445 | |
446 | static INVALID_NOT_HTTP: &str = "invalid URL, scheme is not http" ; |
447 | static INVALID_MISSING_SCHEME: &str = "invalid URL, scheme is missing" ; |
448 | static INVALID_MISSING_HOST: &str = "invalid URL, host is missing" ; |
449 | |
450 | // R: Debug required for now to allow adding it to debug output later... |
451 | impl<R: fmt::Debug> fmt::Debug for HttpConnector<R> { |
452 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
453 | f.debug_struct(name:"HttpConnector" ).finish() |
454 | } |
455 | } |
456 | |
457 | impl<R> tower_service::Service<Uri> for HttpConnector<R> |
458 | where |
459 | R: Resolve + Clone + Send + Sync + 'static, |
460 | R::Future: Send, |
461 | { |
462 | type Response = TokioIo<TcpStream>; |
463 | type Error = ConnectError; |
464 | type Future = HttpConnecting<R>; |
465 | |
466 | fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> { |
467 | futures_util::ready!(self.resolver.poll_ready(cx)).map_err(op:ConnectError::dns)?; |
468 | Poll::Ready(Ok(())) |
469 | } |
470 | |
471 | fn call(&mut self, dst: Uri) -> Self::Future { |
472 | let mut self_: HttpConnector = self.clone(); |
473 | HttpConnecting { |
474 | fut: Box::pin(async move { self_.call_async(dst).await }), |
475 | _marker: PhantomData, |
476 | } |
477 | } |
478 | } |
479 | |
480 | fn get_host_port<'u>(config: &Config, dst: &'u Uri) -> Result<(&'u str, u16), ConnectError> { |
481 | trace!( |
482 | "Http::connect; scheme= {:?}, host= {:?}, port= {:?}" , |
483 | dst.scheme(), |
484 | dst.host(), |
485 | dst.port(), |
486 | ); |
487 | |
488 | if config.enforce_http { |
489 | if dst.scheme() != Some(&Scheme::HTTP) { |
490 | return Err(ConnectError { |
491 | msg: INVALID_NOT_HTTP.into(), |
492 | cause: None, |
493 | }); |
494 | } |
495 | } else if dst.scheme().is_none() { |
496 | return Err(ConnectError { |
497 | msg: INVALID_MISSING_SCHEME.into(), |
498 | cause: None, |
499 | }); |
500 | } |
501 | |
502 | let host = match dst.host() { |
503 | Some(s) => s, |
504 | None => { |
505 | return Err(ConnectError { |
506 | msg: INVALID_MISSING_HOST.into(), |
507 | cause: None, |
508 | }) |
509 | } |
510 | }; |
511 | let port = match dst.port() { |
512 | Some(port) => port.as_u16(), |
513 | None => { |
514 | if dst.scheme() == Some(&Scheme::HTTPS) { |
515 | 443 |
516 | } else { |
517 | 80 |
518 | } |
519 | } |
520 | }; |
521 | |
522 | Ok((host, port)) |
523 | } |
524 | |
525 | impl<R> HttpConnector<R> |
526 | where |
527 | R: Resolve, |
528 | { |
529 | async fn call_async(&mut self, dst: Uri) -> Result<TokioIo<TcpStream>, ConnectError> { |
530 | let config = &self.config; |
531 | |
532 | let (host, port) = get_host_port(config, &dst)?; |
533 | let host = host.trim_start_matches('[' ).trim_end_matches(']' ); |
534 | |
535 | // If the host is already an IP addr (v4 or v6), |
536 | // skip resolving the dns and start connecting right away. |
537 | let addrs = if let Some(addrs) = dns::SocketAddrs::try_parse(host, port) { |
538 | addrs |
539 | } else { |
540 | let addrs = resolve(&mut self.resolver, dns::Name::new(host.into())) |
541 | .await |
542 | .map_err(ConnectError::dns)?; |
543 | let addrs = addrs |
544 | .map(|mut addr| { |
545 | set_port(&mut addr, port, dst.port().is_some()); |
546 | |
547 | addr |
548 | }) |
549 | .collect(); |
550 | dns::SocketAddrs::new(addrs) |
551 | }; |
552 | |
553 | let c = ConnectingTcp::new(addrs, config); |
554 | |
555 | let sock = c.connect().await?; |
556 | |
557 | if let Err(e) = sock.set_nodelay(config.nodelay) { |
558 | warn!("tcp set_nodelay error: {}" , e); |
559 | } |
560 | |
561 | Ok(TokioIo::new(sock)) |
562 | } |
563 | } |
564 | |
565 | impl Connection for TcpStream { |
566 | fn connected(&self) -> Connected { |
567 | let connected: Connected = Connected::new(); |
568 | if let (Ok(remote_addr: SocketAddr), Ok(local_addr: SocketAddr)) = (self.peer_addr(), self.local_addr()) { |
569 | connected.extra(HttpInfo { |
570 | remote_addr, |
571 | local_addr, |
572 | }) |
573 | } else { |
574 | connected |
575 | } |
576 | } |
577 | } |
578 | |
579 | #[cfg (unix)] |
580 | impl Connection for tokio::net::UnixStream { |
581 | fn connected(&self) -> Connected { |
582 | Connected::new() |
583 | } |
584 | } |
585 | |
586 | #[cfg (windows)] |
587 | impl Connection for tokio::net::windows::named_pipe::NamedPipeClient { |
588 | fn connected(&self) -> Connected { |
589 | Connected::new() |
590 | } |
591 | } |
592 | |
593 | // Implement `Connection` for generic `TokioIo<T>` so that external crates can |
594 | // implement their own `HttpConnector` with `TokioIo<CustomTcpStream>`. |
595 | impl<T> Connection for TokioIo<T> |
596 | where |
597 | T: Connection, |
598 | { |
599 | fn connected(&self) -> Connected { |
600 | self.inner().connected() |
601 | } |
602 | } |
603 | |
604 | impl HttpInfo { |
605 | /// Get the remote address of the transport used. |
606 | pub fn remote_addr(&self) -> SocketAddr { |
607 | self.remote_addr |
608 | } |
609 | |
610 | /// Get the local address of the transport used. |
611 | pub fn local_addr(&self) -> SocketAddr { |
612 | self.local_addr |
613 | } |
614 | } |
615 | |
616 | pin_project! { |
617 | // Not publicly exported (so missing_docs doesn't trigger). |
618 | // |
619 | // We return this `Future` instead of the `Pin<Box<dyn Future>>` directly |
620 | // so that users don't rely on it fitting in a `Pin<Box<dyn Future>>` slot |
621 | // (and thus we can change the type in the future). |
622 | #[must_use = "futures do nothing unless polled" ] |
623 | #[allow (missing_debug_implementations)] |
624 | pub struct HttpConnecting<R> { |
625 | #[pin] |
626 | fut: BoxConnecting, |
627 | _marker: PhantomData<R>, |
628 | } |
629 | } |
630 | |
631 | type ConnectResult = Result<TokioIo<TcpStream>, ConnectError>; |
632 | type BoxConnecting = Pin<Box<dyn Future<Output = ConnectResult> + Send>>; |
633 | |
634 | impl<R: Resolve> Future for HttpConnecting<R> { |
635 | type Output = ConnectResult; |
636 | |
637 | fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { |
638 | self.project().fut.poll(cx) |
639 | } |
640 | } |
641 | |
642 | // Not publicly exported (so missing_docs doesn't trigger). |
643 | pub struct ConnectError { |
644 | msg: Box<str>, |
645 | cause: Option<Box<dyn StdError + Send + Sync>>, |
646 | } |
647 | |
648 | impl ConnectError { |
649 | fn new<S, E>(msg: S, cause: E) -> ConnectError |
650 | where |
651 | S: Into<Box<str>>, |
652 | E: Into<Box<dyn StdError + Send + Sync>>, |
653 | { |
654 | ConnectError { |
655 | msg: msg.into(), |
656 | cause: Some(cause.into()), |
657 | } |
658 | } |
659 | |
660 | fn dns<E>(cause: E) -> ConnectError |
661 | where |
662 | E: Into<Box<dyn StdError + Send + Sync>>, |
663 | { |
664 | ConnectError::new("dns error" , cause) |
665 | } |
666 | |
667 | fn m<S, E>(msg: S) -> impl FnOnce(E) -> ConnectError |
668 | where |
669 | S: Into<Box<str>>, |
670 | E: Into<Box<dyn StdError + Send + Sync>>, |
671 | { |
672 | move |cause| ConnectError::new(msg, cause) |
673 | } |
674 | } |
675 | |
676 | impl fmt::Debug for ConnectError { |
677 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
678 | if let Some(ref cause: &Box) = self.cause { |
679 | f&mut DebugTuple<'_, '_>.debug_tuple(name:"ConnectError" ) |
680 | .field(&self.msg) |
681 | .field(cause) |
682 | .finish() |
683 | } else { |
684 | self.msg.fmt(f) |
685 | } |
686 | } |
687 | } |
688 | |
689 | impl fmt::Display for ConnectError { |
690 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
691 | f.write_str(&self.msg)?; |
692 | |
693 | if let Some(ref cause: &Box) = self.cause { |
694 | write!(f, ": {}" , cause)?; |
695 | } |
696 | |
697 | Ok(()) |
698 | } |
699 | } |
700 | |
701 | impl StdError for ConnectError { |
702 | fn source(&self) -> Option<&(dyn StdError + 'static)> { |
703 | self.cause.as_ref().map(|e: &Box| &**e as _) |
704 | } |
705 | } |
706 | |
707 | struct ConnectingTcp<'a> { |
708 | preferred: ConnectingTcpRemote, |
709 | fallback: Option<ConnectingTcpFallback>, |
710 | config: &'a Config, |
711 | } |
712 | |
713 | impl<'a> ConnectingTcp<'a> { |
714 | fn new(remote_addrs: dns::SocketAddrs, config: &'a Config) -> Self { |
715 | if let Some(fallback_timeout) = config.happy_eyeballs_timeout { |
716 | let (preferred_addrs, fallback_addrs) = remote_addrs |
717 | .split_by_preference(config.local_address_ipv4, config.local_address_ipv6); |
718 | if fallback_addrs.is_empty() { |
719 | return ConnectingTcp { |
720 | preferred: ConnectingTcpRemote::new(preferred_addrs, config.connect_timeout), |
721 | fallback: None, |
722 | config, |
723 | }; |
724 | } |
725 | |
726 | ConnectingTcp { |
727 | preferred: ConnectingTcpRemote::new(preferred_addrs, config.connect_timeout), |
728 | fallback: Some(ConnectingTcpFallback { |
729 | delay: tokio::time::sleep(fallback_timeout), |
730 | remote: ConnectingTcpRemote::new(fallback_addrs, config.connect_timeout), |
731 | }), |
732 | config, |
733 | } |
734 | } else { |
735 | ConnectingTcp { |
736 | preferred: ConnectingTcpRemote::new(remote_addrs, config.connect_timeout), |
737 | fallback: None, |
738 | config, |
739 | } |
740 | } |
741 | } |
742 | } |
743 | |
744 | struct ConnectingTcpFallback { |
745 | delay: Sleep, |
746 | remote: ConnectingTcpRemote, |
747 | } |
748 | |
749 | struct ConnectingTcpRemote { |
750 | addrs: dns::SocketAddrs, |
751 | connect_timeout: Option<Duration>, |
752 | } |
753 | |
754 | impl ConnectingTcpRemote { |
755 | fn new(addrs: dns::SocketAddrs, connect_timeout: Option<Duration>) -> Self { |
756 | let connect_timeout: Option = connect_timeout.and_then(|t: Duration| t.checked_div(addrs.len() as u32)); |
757 | |
758 | Self { |
759 | addrs, |
760 | connect_timeout, |
761 | } |
762 | } |
763 | } |
764 | |
765 | impl ConnectingTcpRemote { |
766 | async fn connect(&mut self, config: &Config) -> Result<TcpStream, ConnectError> { |
767 | let mut err = None; |
768 | for addr in &mut self.addrs { |
769 | debug!("connecting to {}" , addr); |
770 | match connect(&addr, config, self.connect_timeout)?.await { |
771 | Ok(tcp) => { |
772 | debug!("connected to {}" , addr); |
773 | return Ok(tcp); |
774 | } |
775 | Err(e) => { |
776 | trace!("connect error for {}: {:?}" , addr, e); |
777 | err = Some(e); |
778 | } |
779 | } |
780 | } |
781 | |
782 | match err { |
783 | Some(e) => Err(e), |
784 | None => Err(ConnectError::new( |
785 | "tcp connect error" , |
786 | std::io::Error::new(std::io::ErrorKind::NotConnected, "Network unreachable" ), |
787 | )), |
788 | } |
789 | } |
790 | } |
791 | |
792 | fn bind_local_address( |
793 | socket: &socket2::Socket, |
794 | dst_addr: &SocketAddr, |
795 | local_addr_ipv4: &Option<Ipv4Addr>, |
796 | local_addr_ipv6: &Option<Ipv6Addr>, |
797 | ) -> io::Result<()> { |
798 | match (*dst_addr, local_addr_ipv4, local_addr_ipv6) { |
799 | (SocketAddr::V4(_), Some(addr: &Ipv4Addr), _) => { |
800 | socket.bind(&SocketAddr::new((*addr).into(), port:0).into())?; |
801 | } |
802 | (SocketAddr::V6(_), _, Some(addr: &Ipv6Addr)) => { |
803 | socket.bind(&SocketAddr::new((*addr).into(), port:0).into())?; |
804 | } |
805 | _ => { |
806 | if cfg!(windows) { |
807 | // Windows requires a socket be bound before calling connect |
808 | let any: SocketAddr = match *dst_addr { |
809 | SocketAddr::V4(_) => ([0, 0, 0, 0], 0).into(), |
810 | SocketAddr::V6(_) => ([0, 0, 0, 0, 0, 0, 0, 0], 0).into(), |
811 | }; |
812 | socket.bind(&any.into())?; |
813 | } |
814 | } |
815 | } |
816 | |
817 | Ok(()) |
818 | } |
819 | |
820 | fn connect( |
821 | addr: &SocketAddr, |
822 | config: &Config, |
823 | connect_timeout: Option<Duration>, |
824 | ) -> Result<impl Future<Output = Result<TcpStream, ConnectError>>, ConnectError> { |
825 | // TODO(eliza): if Tokio's `TcpSocket` gains support for setting the |
826 | // keepalive timeout, it would be nice to use that instead of socket2, |
827 | // and avoid the unsafe `into_raw_fd`/`from_raw_fd` dance... |
828 | use socket2::{Domain, Protocol, Socket, Type}; |
829 | |
830 | let domain = Domain::for_address(*addr); |
831 | let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP)) |
832 | .map_err(ConnectError::m("tcp open error" ))?; |
833 | |
834 | // When constructing a Tokio `TcpSocket` from a raw fd/socket, the user is |
835 | // responsible for ensuring O_NONBLOCK is set. |
836 | socket |
837 | .set_nonblocking(true) |
838 | .map_err(ConnectError::m("tcp set_nonblocking error" ))?; |
839 | |
840 | if let Some(tcp_keepalive) = &config.tcp_keepalive_config.into_tcpkeepalive() { |
841 | if let Err(e) = socket.set_tcp_keepalive(tcp_keepalive) { |
842 | warn!("tcp set_keepalive error: {}" , e); |
843 | } |
844 | } |
845 | |
846 | // That this only works for some socket types, particularly AF_INET sockets. |
847 | #[cfg (any( |
848 | target_os = "android" , |
849 | target_os = "fuchsia" , |
850 | target_os = "illumos" , |
851 | target_os = "ios" , |
852 | target_os = "linux" , |
853 | target_os = "macos" , |
854 | target_os = "solaris" , |
855 | target_os = "tvos" , |
856 | target_os = "visionos" , |
857 | target_os = "watchos" , |
858 | ))] |
859 | if let Some(interface) = &config.interface { |
860 | // On Linux-like systems, set the interface to bind using |
861 | // `SO_BINDTODEVICE`. |
862 | #[cfg (any(target_os = "android" , target_os = "fuchsia" , target_os = "linux" ))] |
863 | socket |
864 | .bind_device(Some(interface.as_bytes())) |
865 | .map_err(ConnectError::m("tcp bind interface error" ))?; |
866 | |
867 | // On macOS-like and Solaris-like systems, we instead use `IP_BOUND_IF`. |
868 | // This socket option desires an integer index for the interface, so we |
869 | // must first determine the index of the requested interface name using |
870 | // `if_nametoindex`. |
871 | #[cfg (any( |
872 | target_os = "illumos" , |
873 | target_os = "ios" , |
874 | target_os = "macos" , |
875 | target_os = "solaris" , |
876 | target_os = "tvos" , |
877 | target_os = "visionos" , |
878 | target_os = "watchos" , |
879 | ))] |
880 | { |
881 | let idx = unsafe { libc::if_nametoindex(interface.as_ptr()) }; |
882 | let idx = std::num::NonZeroU32::new(idx).ok_or_else(|| { |
883 | // If the index is 0, check errno and return an I/O error. |
884 | ConnectError::new( |
885 | "error converting interface name to index" , |
886 | io::Error::last_os_error(), |
887 | ) |
888 | })?; |
889 | // Different setsockopt calls are necessary depending on whether the |
890 | // address is IPv4 or IPv6. |
891 | match addr { |
892 | SocketAddr::V4(_) => socket.bind_device_by_index_v4(Some(idx)), |
893 | SocketAddr::V6(_) => socket.bind_device_by_index_v6(Some(idx)), |
894 | } |
895 | .map_err(ConnectError::m("tcp bind interface error" ))?; |
896 | } |
897 | } |
898 | |
899 | #[cfg (any(target_os = "android" , target_os = "fuchsia" , target_os = "linux" ))] |
900 | if let Some(tcp_user_timeout) = &config.tcp_user_timeout { |
901 | if let Err(e) = socket.set_tcp_user_timeout(Some(*tcp_user_timeout)) { |
902 | warn!("tcp set_tcp_user_timeout error: {}" , e); |
903 | } |
904 | } |
905 | |
906 | bind_local_address( |
907 | &socket, |
908 | addr, |
909 | &config.local_address_ipv4, |
910 | &config.local_address_ipv6, |
911 | ) |
912 | .map_err(ConnectError::m("tcp bind local error" ))?; |
913 | |
914 | #[cfg (unix)] |
915 | let socket = unsafe { |
916 | // Safety: `from_raw_fd` is only safe to call if ownership of the raw |
917 | // file descriptor is transferred. Since we call `into_raw_fd` on the |
918 | // socket2 socket, it gives up ownership of the fd and will not close |
919 | // it, so this is safe. |
920 | use std::os::unix::io::{FromRawFd, IntoRawFd}; |
921 | TcpSocket::from_raw_fd(socket.into_raw_fd()) |
922 | }; |
923 | #[cfg (windows)] |
924 | let socket = unsafe { |
925 | // Safety: `from_raw_socket` is only safe to call if ownership of the raw |
926 | // Windows SOCKET is transferred. Since we call `into_raw_socket` on the |
927 | // socket2 socket, it gives up ownership of the SOCKET and will not close |
928 | // it, so this is safe. |
929 | use std::os::windows::io::{FromRawSocket, IntoRawSocket}; |
930 | TcpSocket::from_raw_socket(socket.into_raw_socket()) |
931 | }; |
932 | |
933 | if config.reuse_address { |
934 | if let Err(e) = socket.set_reuseaddr(true) { |
935 | warn!("tcp set_reuse_address error: {}" , e); |
936 | } |
937 | } |
938 | |
939 | if let Some(size) = config.send_buffer_size { |
940 | if let Err(e) = socket.set_send_buffer_size(size.try_into().unwrap_or(u32::MAX)) { |
941 | warn!("tcp set_buffer_size error: {}" , e); |
942 | } |
943 | } |
944 | |
945 | if let Some(size) = config.recv_buffer_size { |
946 | if let Err(e) = socket.set_recv_buffer_size(size.try_into().unwrap_or(u32::MAX)) { |
947 | warn!("tcp set_recv_buffer_size error: {}" , e); |
948 | } |
949 | } |
950 | |
951 | let connect = socket.connect(*addr); |
952 | Ok(async move { |
953 | match connect_timeout { |
954 | Some(dur) => match tokio::time::timeout(dur, connect).await { |
955 | Ok(Ok(s)) => Ok(s), |
956 | Ok(Err(e)) => Err(e), |
957 | Err(e) => Err(io::Error::new(io::ErrorKind::TimedOut, e)), |
958 | }, |
959 | None => connect.await, |
960 | } |
961 | .map_err(ConnectError::m("tcp connect error" )) |
962 | }) |
963 | } |
964 | |
965 | impl ConnectingTcp<'_> { |
966 | async fn connect(mut self) -> Result<TcpStream, ConnectError> { |
967 | match self.fallback { |
968 | None => self.preferred.connect(self.config).await, |
969 | Some(mut fallback) => { |
970 | let preferred_fut = self.preferred.connect(self.config); |
971 | futures_util::pin_mut!(preferred_fut); |
972 | |
973 | let fallback_fut = fallback.remote.connect(self.config); |
974 | futures_util::pin_mut!(fallback_fut); |
975 | |
976 | let fallback_delay = fallback.delay; |
977 | futures_util::pin_mut!(fallback_delay); |
978 | |
979 | let (result, future) = |
980 | match futures_util::future::select(preferred_fut, fallback_delay).await { |
981 | Either::Left((result, _fallback_delay)) => { |
982 | (result, Either::Right(fallback_fut)) |
983 | } |
984 | Either::Right(((), preferred_fut)) => { |
985 | // Delay is done, start polling both the preferred and the fallback |
986 | futures_util::future::select(preferred_fut, fallback_fut) |
987 | .await |
988 | .factor_first() |
989 | } |
990 | }; |
991 | |
992 | if result.is_err() { |
993 | // Fallback to the remaining future (could be preferred or fallback) |
994 | // if we get an error |
995 | future.await |
996 | } else { |
997 | result |
998 | } |
999 | } |
1000 | } |
1001 | } |
1002 | } |
1003 | |
1004 | /// Respect explicit ports in the URI, if none, either |
1005 | /// keep non `0` ports resolved from a custom dns resolver, |
1006 | /// or use the default port for the scheme. |
1007 | fn set_port(addr: &mut SocketAddr, host_port: u16, explicit: bool) { |
1008 | if explicit || addr.port() == 0 { |
1009 | addr.set_port(new_port:host_port) |
1010 | }; |
1011 | } |
1012 | |
1013 | #[cfg (test)] |
1014 | mod tests { |
1015 | use std::io; |
1016 | use std::net::SocketAddr; |
1017 | |
1018 | use ::http::Uri; |
1019 | |
1020 | use crate::client::legacy::connect::http::TcpKeepaliveConfig; |
1021 | |
1022 | use super::super::sealed::{Connect, ConnectSvc}; |
1023 | use super::{Config, ConnectError, HttpConnector}; |
1024 | |
1025 | use super::set_port; |
1026 | |
1027 | async fn connect<C>( |
1028 | connector: C, |
1029 | dst: Uri, |
1030 | ) -> Result<<C::_Svc as ConnectSvc>::Connection, <C::_Svc as ConnectSvc>::Error> |
1031 | where |
1032 | C: Connect, |
1033 | { |
1034 | connector.connect(super::super::sealed::Internal, dst).await |
1035 | } |
1036 | |
1037 | #[tokio::test ] |
1038 | #[cfg_attr (miri, ignore)] |
1039 | async fn test_errors_enforce_http() { |
1040 | let dst = "https://example.domain/foo/bar?baz" .parse().unwrap(); |
1041 | let connector = HttpConnector::new(); |
1042 | |
1043 | let err = connect(connector, dst).await.unwrap_err(); |
1044 | assert_eq!(&*err.msg, super::INVALID_NOT_HTTP); |
1045 | } |
1046 | |
1047 | #[cfg (any(target_os = "linux" , target_os = "macos" ))] |
1048 | fn get_local_ips() -> (Option<std::net::Ipv4Addr>, Option<std::net::Ipv6Addr>) { |
1049 | use std::net::{IpAddr, TcpListener}; |
1050 | |
1051 | let mut ip_v4 = None; |
1052 | let mut ip_v6 = None; |
1053 | |
1054 | let ips = pnet_datalink::interfaces() |
1055 | .into_iter() |
1056 | .flat_map(|i| i.ips.into_iter().map(|n| n.ip())); |
1057 | |
1058 | for ip in ips { |
1059 | match ip { |
1060 | IpAddr::V4(ip) if TcpListener::bind((ip, 0)).is_ok() => ip_v4 = Some(ip), |
1061 | IpAddr::V6(ip) if TcpListener::bind((ip, 0)).is_ok() => ip_v6 = Some(ip), |
1062 | _ => (), |
1063 | } |
1064 | |
1065 | if ip_v4.is_some() && ip_v6.is_some() { |
1066 | break; |
1067 | } |
1068 | } |
1069 | |
1070 | (ip_v4, ip_v6) |
1071 | } |
1072 | |
1073 | #[cfg (any(target_os = "android" , target_os = "fuchsia" , target_os = "linux" ))] |
1074 | fn default_interface() -> Option<String> { |
1075 | pnet_datalink::interfaces() |
1076 | .iter() |
1077 | .find(|e| e.is_up() && !e.is_loopback() && !e.ips.is_empty()) |
1078 | .map(|e| e.name.clone()) |
1079 | } |
1080 | |
1081 | #[tokio::test ] |
1082 | #[cfg_attr (miri, ignore)] |
1083 | async fn test_errors_missing_scheme() { |
1084 | let dst = "example.domain" .parse().unwrap(); |
1085 | let mut connector = HttpConnector::new(); |
1086 | connector.enforce_http(false); |
1087 | |
1088 | let err = connect(connector, dst).await.unwrap_err(); |
1089 | assert_eq!(&*err.msg, super::INVALID_MISSING_SCHEME); |
1090 | } |
1091 | |
1092 | // NOTE: pnet crate that we use in this test doesn't compile on Windows |
1093 | #[cfg (any(target_os = "linux" , target_os = "macos" ))] |
1094 | #[cfg_attr (miri, ignore)] |
1095 | #[tokio::test ] |
1096 | async fn local_address() { |
1097 | use std::net::{IpAddr, TcpListener}; |
1098 | |
1099 | let (bind_ip_v4, bind_ip_v6) = get_local_ips(); |
1100 | let server4 = TcpListener::bind("127.0.0.1:0" ).unwrap(); |
1101 | let port = server4.local_addr().unwrap().port(); |
1102 | let server6 = TcpListener::bind(&format!("[::1]:{}" , port)).unwrap(); |
1103 | |
1104 | let assert_client_ip = |dst: String, server: TcpListener, expected_ip: IpAddr| async move { |
1105 | let mut connector = HttpConnector::new(); |
1106 | |
1107 | match (bind_ip_v4, bind_ip_v6) { |
1108 | (Some(v4), Some(v6)) => connector.set_local_addresses(v4, v6), |
1109 | (Some(v4), None) => connector.set_local_address(Some(v4.into())), |
1110 | (None, Some(v6)) => connector.set_local_address(Some(v6.into())), |
1111 | _ => unreachable!(), |
1112 | } |
1113 | |
1114 | connect(connector, dst.parse().unwrap()).await.unwrap(); |
1115 | |
1116 | let (_, client_addr) = server.accept().unwrap(); |
1117 | |
1118 | assert_eq!(client_addr.ip(), expected_ip); |
1119 | }; |
1120 | |
1121 | if let Some(ip) = bind_ip_v4 { |
1122 | assert_client_ip(format!("http://127.0.0.1:{}" , port), server4, ip.into()).await; |
1123 | } |
1124 | |
1125 | if let Some(ip) = bind_ip_v6 { |
1126 | assert_client_ip(format!("http://[::1]:{}" , port), server6, ip.into()).await; |
1127 | } |
1128 | } |
1129 | |
1130 | // NOTE: pnet crate that we use in this test doesn't compile on Windows |
1131 | #[cfg (any(target_os = "android" , target_os = "fuchsia" , target_os = "linux" ))] |
1132 | #[tokio::test ] |
1133 | #[ignore = "setting `SO_BINDTODEVICE` requires the `CAP_NET_RAW` capability (works when running as root)" ] |
1134 | async fn interface() { |
1135 | use socket2::{Domain, Protocol, Socket, Type}; |
1136 | use std::net::TcpListener; |
1137 | |
1138 | let interface: Option<String> = default_interface(); |
1139 | |
1140 | let server4 = TcpListener::bind("127.0.0.1:0" ).unwrap(); |
1141 | let port = server4.local_addr().unwrap().port(); |
1142 | |
1143 | let server6 = TcpListener::bind(&format!("[::1]:{}" , port)).unwrap(); |
1144 | |
1145 | let assert_interface_name = |
1146 | |dst: String, |
1147 | server: TcpListener, |
1148 | bind_iface: Option<String>, |
1149 | expected_interface: Option<String>| async move { |
1150 | let mut connector = HttpConnector::new(); |
1151 | if let Some(iface) = bind_iface { |
1152 | connector.set_interface(iface); |
1153 | } |
1154 | |
1155 | connect(connector, dst.parse().unwrap()).await.unwrap(); |
1156 | let domain = Domain::for_address(server.local_addr().unwrap()); |
1157 | let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP)).unwrap(); |
1158 | |
1159 | assert_eq!( |
1160 | socket.device().unwrap().as_deref(), |
1161 | expected_interface.as_deref().map(|val| val.as_bytes()) |
1162 | ); |
1163 | }; |
1164 | |
1165 | assert_interface_name( |
1166 | format!("http://127.0.0.1:{}" , port), |
1167 | server4, |
1168 | interface.clone(), |
1169 | interface.clone(), |
1170 | ) |
1171 | .await; |
1172 | assert_interface_name( |
1173 | format!("http://[::1]:{}" , port), |
1174 | server6, |
1175 | interface.clone(), |
1176 | interface.clone(), |
1177 | ) |
1178 | .await; |
1179 | } |
1180 | |
1181 | #[test ] |
1182 | #[ignore ] // TODO |
1183 | #[cfg_attr (not(feature = "__internal_happy_eyeballs_tests" ), ignore)] |
1184 | fn client_happy_eyeballs() { |
1185 | use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, TcpListener}; |
1186 | use std::time::{Duration, Instant}; |
1187 | |
1188 | use super::dns; |
1189 | use super::ConnectingTcp; |
1190 | |
1191 | let server4 = TcpListener::bind("127.0.0.1:0" ).unwrap(); |
1192 | let addr = server4.local_addr().unwrap(); |
1193 | let _server6 = TcpListener::bind(&format!("[::1]:{}" , addr.port())).unwrap(); |
1194 | let rt = tokio::runtime::Builder::new_current_thread() |
1195 | .enable_all() |
1196 | .build() |
1197 | .unwrap(); |
1198 | |
1199 | let local_timeout = Duration::default(); |
1200 | let unreachable_v4_timeout = measure_connect(unreachable_ipv4_addr()).1; |
1201 | let unreachable_v6_timeout = measure_connect(unreachable_ipv6_addr()).1; |
1202 | let fallback_timeout = std::cmp::max(unreachable_v4_timeout, unreachable_v6_timeout) |
1203 | + Duration::from_millis(250); |
1204 | |
1205 | let scenarios = &[ |
1206 | // Fast primary, without fallback. |
1207 | (&[local_ipv4_addr()][..], 4, local_timeout, false), |
1208 | (&[local_ipv6_addr()][..], 6, local_timeout, false), |
1209 | // Fast primary, with (unused) fallback. |
1210 | ( |
1211 | &[local_ipv4_addr(), local_ipv6_addr()][..], |
1212 | 4, |
1213 | local_timeout, |
1214 | false, |
1215 | ), |
1216 | ( |
1217 | &[local_ipv6_addr(), local_ipv4_addr()][..], |
1218 | 6, |
1219 | local_timeout, |
1220 | false, |
1221 | ), |
1222 | // Unreachable + fast primary, without fallback. |
1223 | ( |
1224 | &[unreachable_ipv4_addr(), local_ipv4_addr()][..], |
1225 | 4, |
1226 | unreachable_v4_timeout, |
1227 | false, |
1228 | ), |
1229 | ( |
1230 | &[unreachable_ipv6_addr(), local_ipv6_addr()][..], |
1231 | 6, |
1232 | unreachable_v6_timeout, |
1233 | false, |
1234 | ), |
1235 | // Unreachable + fast primary, with (unused) fallback. |
1236 | ( |
1237 | &[ |
1238 | unreachable_ipv4_addr(), |
1239 | local_ipv4_addr(), |
1240 | local_ipv6_addr(), |
1241 | ][..], |
1242 | 4, |
1243 | unreachable_v4_timeout, |
1244 | false, |
1245 | ), |
1246 | ( |
1247 | &[ |
1248 | unreachable_ipv6_addr(), |
1249 | local_ipv6_addr(), |
1250 | local_ipv4_addr(), |
1251 | ][..], |
1252 | 6, |
1253 | unreachable_v6_timeout, |
1254 | true, |
1255 | ), |
1256 | // Slow primary, with (used) fallback. |
1257 | ( |
1258 | &[slow_ipv4_addr(), local_ipv4_addr(), local_ipv6_addr()][..], |
1259 | 6, |
1260 | fallback_timeout, |
1261 | false, |
1262 | ), |
1263 | ( |
1264 | &[slow_ipv6_addr(), local_ipv6_addr(), local_ipv4_addr()][..], |
1265 | 4, |
1266 | fallback_timeout, |
1267 | true, |
1268 | ), |
1269 | // Slow primary, with (used) unreachable + fast fallback. |
1270 | ( |
1271 | &[slow_ipv4_addr(), unreachable_ipv6_addr(), local_ipv6_addr()][..], |
1272 | 6, |
1273 | fallback_timeout + unreachable_v6_timeout, |
1274 | false, |
1275 | ), |
1276 | ( |
1277 | &[slow_ipv6_addr(), unreachable_ipv4_addr(), local_ipv4_addr()][..], |
1278 | 4, |
1279 | fallback_timeout + unreachable_v4_timeout, |
1280 | true, |
1281 | ), |
1282 | ]; |
1283 | |
1284 | // Scenarios for IPv6 -> IPv4 fallback require that host can access IPv6 network. |
1285 | // Otherwise, connection to "slow" IPv6 address will error-out immediately. |
1286 | let ipv6_accessible = measure_connect(slow_ipv6_addr()).0; |
1287 | |
1288 | for &(hosts, family, timeout, needs_ipv6_access) in scenarios { |
1289 | if needs_ipv6_access && !ipv6_accessible { |
1290 | continue; |
1291 | } |
1292 | |
1293 | let (start, stream) = rt |
1294 | .block_on(async move { |
1295 | let addrs = hosts |
1296 | .iter() |
1297 | .map(|host| (host.clone(), addr.port()).into()) |
1298 | .collect(); |
1299 | let cfg = Config { |
1300 | local_address_ipv4: None, |
1301 | local_address_ipv6: None, |
1302 | connect_timeout: None, |
1303 | tcp_keepalive_config: TcpKeepaliveConfig::default(), |
1304 | happy_eyeballs_timeout: Some(fallback_timeout), |
1305 | nodelay: false, |
1306 | reuse_address: false, |
1307 | enforce_http: false, |
1308 | send_buffer_size: None, |
1309 | recv_buffer_size: None, |
1310 | #[cfg (any( |
1311 | target_os = "android" , |
1312 | target_os = "fuchsia" , |
1313 | target_os = "linux" |
1314 | ))] |
1315 | interface: None, |
1316 | #[cfg (any( |
1317 | target_os = "illumos" , |
1318 | target_os = "ios" , |
1319 | target_os = "macos" , |
1320 | target_os = "solaris" , |
1321 | target_os = "tvos" , |
1322 | target_os = "visionos" , |
1323 | target_os = "watchos" , |
1324 | ))] |
1325 | interface: None, |
1326 | #[cfg (any( |
1327 | target_os = "android" , |
1328 | target_os = "fuchsia" , |
1329 | target_os = "linux" |
1330 | ))] |
1331 | tcp_user_timeout: None, |
1332 | }; |
1333 | let connecting_tcp = ConnectingTcp::new(dns::SocketAddrs::new(addrs), &cfg); |
1334 | let start = Instant::now(); |
1335 | Ok::<_, ConnectError>((start, ConnectingTcp::connect(connecting_tcp).await?)) |
1336 | }) |
1337 | .unwrap(); |
1338 | let res = if stream.peer_addr().unwrap().is_ipv4() { |
1339 | 4 |
1340 | } else { |
1341 | 6 |
1342 | }; |
1343 | let duration = start.elapsed(); |
1344 | |
1345 | // Allow actual duration to be +/- 150ms off. |
1346 | let min_duration = if timeout >= Duration::from_millis(150) { |
1347 | timeout - Duration::from_millis(150) |
1348 | } else { |
1349 | Duration::default() |
1350 | }; |
1351 | let max_duration = timeout + Duration::from_millis(150); |
1352 | |
1353 | assert_eq!(res, family); |
1354 | assert!(duration >= min_duration); |
1355 | assert!(duration <= max_duration); |
1356 | } |
1357 | |
1358 | fn local_ipv4_addr() -> IpAddr { |
1359 | Ipv4Addr::new(127, 0, 0, 1).into() |
1360 | } |
1361 | |
1362 | fn local_ipv6_addr() -> IpAddr { |
1363 | Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1).into() |
1364 | } |
1365 | |
1366 | fn unreachable_ipv4_addr() -> IpAddr { |
1367 | Ipv4Addr::new(127, 0, 0, 2).into() |
1368 | } |
1369 | |
1370 | fn unreachable_ipv6_addr() -> IpAddr { |
1371 | Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 2).into() |
1372 | } |
1373 | |
1374 | fn slow_ipv4_addr() -> IpAddr { |
1375 | // RFC 6890 reserved IPv4 address. |
1376 | Ipv4Addr::new(198, 18, 0, 25).into() |
1377 | } |
1378 | |
1379 | fn slow_ipv6_addr() -> IpAddr { |
1380 | // RFC 6890 reserved IPv6 address. |
1381 | Ipv6Addr::new(2001, 2, 0, 0, 0, 0, 0, 254).into() |
1382 | } |
1383 | |
1384 | fn measure_connect(addr: IpAddr) -> (bool, Duration) { |
1385 | let start = Instant::now(); |
1386 | let result = |
1387 | std::net::TcpStream::connect_timeout(&(addr, 80).into(), Duration::from_secs(1)); |
1388 | |
1389 | let reachable = result.is_ok() || result.unwrap_err().kind() == io::ErrorKind::TimedOut; |
1390 | let duration = start.elapsed(); |
1391 | (reachable, duration) |
1392 | } |
1393 | } |
1394 | |
1395 | use std::time::Duration; |
1396 | |
1397 | #[test ] |
1398 | fn no_tcp_keepalive_config() { |
1399 | assert!(TcpKeepaliveConfig::default().into_tcpkeepalive().is_none()); |
1400 | } |
1401 | |
1402 | #[test ] |
1403 | fn tcp_keepalive_time_config() { |
1404 | let mut kac = TcpKeepaliveConfig::default(); |
1405 | kac.time = Some(Duration::from_secs(60)); |
1406 | if let Some(tcp_keepalive) = kac.into_tcpkeepalive() { |
1407 | assert!(format!("{tcp_keepalive:?}" ).contains("time: Some(60s)" )); |
1408 | } else { |
1409 | panic!("test failed" ); |
1410 | } |
1411 | } |
1412 | |
1413 | #[cfg (not(any(target_os = "openbsd" , target_os = "redox" , target_os = "solaris" )))] |
1414 | #[test ] |
1415 | fn tcp_keepalive_interval_config() { |
1416 | let mut kac = TcpKeepaliveConfig::default(); |
1417 | kac.interval = Some(Duration::from_secs(1)); |
1418 | if let Some(tcp_keepalive) = kac.into_tcpkeepalive() { |
1419 | assert!(format!("{tcp_keepalive:?}" ).contains("interval: Some(1s)" )); |
1420 | } else { |
1421 | panic!("test failed" ); |
1422 | } |
1423 | } |
1424 | |
1425 | #[cfg (not(any( |
1426 | target_os = "openbsd" , |
1427 | target_os = "redox" , |
1428 | target_os = "solaris" , |
1429 | target_os = "windows" |
1430 | )))] |
1431 | #[test ] |
1432 | fn tcp_keepalive_retries_config() { |
1433 | let mut kac = TcpKeepaliveConfig::default(); |
1434 | kac.retries = Some(3); |
1435 | if let Some(tcp_keepalive) = kac.into_tcpkeepalive() { |
1436 | assert!(format!("{tcp_keepalive:?}" ).contains("retries: Some(3)" )); |
1437 | } else { |
1438 | panic!("test failed" ); |
1439 | } |
1440 | } |
1441 | |
1442 | #[test ] |
1443 | fn test_set_port() { |
1444 | // Respect explicit ports no matter what the resolved port is. |
1445 | let mut addr = SocketAddr::from(([0, 0, 0, 0], 6881)); |
1446 | set_port(&mut addr, 42, true); |
1447 | assert_eq!(addr.port(), 42); |
1448 | |
1449 | // Ignore default host port, and use the socket port instead. |
1450 | let mut addr = SocketAddr::from(([0, 0, 0, 0], 6881)); |
1451 | set_port(&mut addr, 443, false); |
1452 | assert_eq!(addr.port(), 6881); |
1453 | |
1454 | // Use the default port if the resolved port is `0`. |
1455 | let mut addr = SocketAddr::from(([0, 0, 0, 0], 0)); |
1456 | set_port(&mut addr, 443, false); |
1457 | assert_eq!(addr.port(), 443); |
1458 | } |
1459 | } |
1460 | |