| 1 | use std::error::Error as StdError; |
| 2 | use std::fmt; |
| 3 | use std::future::Future; |
| 4 | use std::io; |
| 5 | use std::marker::PhantomData; |
| 6 | use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; |
| 7 | use std::pin::Pin; |
| 8 | use std::sync::Arc; |
| 9 | use std::task::{self, Poll}; |
| 10 | use std::time::Duration; |
| 11 | |
| 12 | use futures_util::future::Either; |
| 13 | use http::uri::{Scheme, Uri}; |
| 14 | use pin_project_lite::pin_project; |
| 15 | use socket2::TcpKeepalive; |
| 16 | use tokio::net::{TcpSocket, TcpStream}; |
| 17 | use tokio::time::Sleep; |
| 18 | use tracing::{debug, trace, warn}; |
| 19 | |
| 20 | use super::dns::{self, resolve, GaiResolver, Resolve}; |
| 21 | use super::{Connected, Connection}; |
| 22 | use crate::rt::TokioIo; |
| 23 | |
| 24 | /// A connector for the `http` scheme. |
| 25 | /// |
| 26 | /// Performs DNS resolution in a thread pool, and then connects over TCP. |
| 27 | /// |
| 28 | /// # Note |
| 29 | /// |
| 30 | /// Sets the [`HttpInfo`](HttpInfo) value on responses, which includes |
| 31 | /// transport information such as the remote socket address used. |
| 32 | #[derive (Clone)] |
| 33 | pub struct HttpConnector<R = GaiResolver> { |
| 34 | config: Arc<Config>, |
| 35 | resolver: R, |
| 36 | } |
| 37 | |
/// Extra information about the transport when an `HttpConnector` is used.
///
/// # Example
///
/// ```
/// # fn doc(res: http::Response<()>) {
/// use hyper_util::client::legacy::connect::HttpInfo;
///
/// // res = http::Response
/// res
///     .extensions()
///     .get::<HttpInfo>()
///     .map(|info| {
///         println!("remote addr = {}", info.remote_addr());
///     });
/// # }
/// ```
///
/// # Note
///
/// This value only exists in the extensions when the connection was made by
/// an `HttpConnector`. For any other connector, consult that connector to
/// see what "extra" information it might provide to responses.
#[derive(Debug, Clone)]
pub struct HttpInfo {
    // Address of the peer the socket is connected to.
    remote_addr: SocketAddr,
    // Address of the local end of the socket.
    local_addr: SocketAddr,
}
| 66 | |
| 67 | #[derive (Clone)] |
| 68 | struct Config { |
| 69 | connect_timeout: Option<Duration>, |
| 70 | enforce_http: bool, |
| 71 | happy_eyeballs_timeout: Option<Duration>, |
| 72 | tcp_keepalive_config: TcpKeepaliveConfig, |
| 73 | local_address_ipv4: Option<Ipv4Addr>, |
| 74 | local_address_ipv6: Option<Ipv6Addr>, |
| 75 | nodelay: bool, |
| 76 | reuse_address: bool, |
| 77 | send_buffer_size: Option<usize>, |
| 78 | recv_buffer_size: Option<usize>, |
| 79 | #[cfg (any(target_os = "android" , target_os = "fuchsia" , target_os = "linux" ))] |
| 80 | interface: Option<String>, |
| 81 | #[cfg (any( |
| 82 | target_os = "illumos" , |
| 83 | target_os = "ios" , |
| 84 | target_os = "macos" , |
| 85 | target_os = "solaris" , |
| 86 | target_os = "tvos" , |
| 87 | target_os = "visionos" , |
| 88 | target_os = "watchos" , |
| 89 | ))] |
| 90 | interface: Option<std::ffi::CString>, |
| 91 | #[cfg (any(target_os = "android" , target_os = "fuchsia" , target_os = "linux" ))] |
| 92 | tcp_user_timeout: Option<Duration>, |
| 93 | } |
| 94 | |
// The three TCP keepalive knobs; all default to `None` (nothing configured).
#[derive(Clone, Copy, Debug, Default)]
struct TcpKeepaliveConfig {
    // Idle time before keepalive probes start.
    time: Option<Duration>,
    // Time between successive keepalive probes.
    interval: Option<Duration>,
    // Number of probes sent before the peer is considered unavailable.
    retries: Option<u32>,
}
| 101 | |
| 102 | impl TcpKeepaliveConfig { |
| 103 | /// Converts into a `socket2::TcpKeealive` if there is any keep alive configuration. |
| 104 | fn into_tcpkeepalive(self) -> Option<TcpKeepalive> { |
| 105 | let mut dirty = false; |
| 106 | let mut ka = TcpKeepalive::new(); |
| 107 | if let Some(time) = self.time { |
| 108 | ka = ka.with_time(time); |
| 109 | dirty = true |
| 110 | } |
| 111 | if let Some(interval) = self.interval { |
| 112 | ka = Self::ka_with_interval(ka, interval, &mut dirty) |
| 113 | }; |
| 114 | if let Some(retries) = self.retries { |
| 115 | ka = Self::ka_with_retries(ka, retries, &mut dirty) |
| 116 | }; |
| 117 | if dirty { |
| 118 | Some(ka) |
| 119 | } else { |
| 120 | None |
| 121 | } |
| 122 | } |
| 123 | |
| 124 | #[cfg ( |
| 125 | // See https://docs.rs/socket2/0.5.8/src/socket2/lib.rs.html#511-525 |
| 126 | any( |
| 127 | target_os = "android" , |
| 128 | target_os = "dragonfly" , |
| 129 | target_os = "freebsd" , |
| 130 | target_os = "fuchsia" , |
| 131 | target_os = "illumos" , |
| 132 | target_os = "ios" , |
| 133 | target_os = "visionos" , |
| 134 | target_os = "linux" , |
| 135 | target_os = "macos" , |
| 136 | target_os = "netbsd" , |
| 137 | target_os = "tvos" , |
| 138 | target_os = "watchos" , |
| 139 | target_os = "windows" , |
| 140 | ) |
| 141 | )] |
| 142 | fn ka_with_interval(ka: TcpKeepalive, interval: Duration, dirty: &mut bool) -> TcpKeepalive { |
| 143 | *dirty = true; |
| 144 | ka.with_interval(interval) |
| 145 | } |
| 146 | |
| 147 | #[cfg (not( |
| 148 | // See https://docs.rs/socket2/0.5.8/src/socket2/lib.rs.html#511-525 |
| 149 | any( |
| 150 | target_os = "android" , |
| 151 | target_os = "dragonfly" , |
| 152 | target_os = "freebsd" , |
| 153 | target_os = "fuchsia" , |
| 154 | target_os = "illumos" , |
| 155 | target_os = "ios" , |
| 156 | target_os = "visionos" , |
| 157 | target_os = "linux" , |
| 158 | target_os = "macos" , |
| 159 | target_os = "netbsd" , |
| 160 | target_os = "tvos" , |
| 161 | target_os = "watchos" , |
| 162 | target_os = "windows" , |
| 163 | ) |
| 164 | ))] |
| 165 | fn ka_with_interval(ka: TcpKeepalive, _: Duration, _: &mut bool) -> TcpKeepalive { |
| 166 | ka // no-op as keepalive interval is not supported on this platform |
| 167 | } |
| 168 | |
| 169 | #[cfg ( |
| 170 | // See https://docs.rs/socket2/0.5.8/src/socket2/lib.rs.html#557-570 |
| 171 | any( |
| 172 | target_os = "android" , |
| 173 | target_os = "dragonfly" , |
| 174 | target_os = "freebsd" , |
| 175 | target_os = "fuchsia" , |
| 176 | target_os = "illumos" , |
| 177 | target_os = "ios" , |
| 178 | target_os = "visionos" , |
| 179 | target_os = "linux" , |
| 180 | target_os = "macos" , |
| 181 | target_os = "netbsd" , |
| 182 | target_os = "tvos" , |
| 183 | target_os = "watchos" , |
| 184 | ) |
| 185 | )] |
| 186 | fn ka_with_retries(ka: TcpKeepalive, retries: u32, dirty: &mut bool) -> TcpKeepalive { |
| 187 | *dirty = true; |
| 188 | ka.with_retries(retries) |
| 189 | } |
| 190 | |
| 191 | #[cfg (not( |
| 192 | // See https://docs.rs/socket2/0.5.8/src/socket2/lib.rs.html#557-570 |
| 193 | any( |
| 194 | target_os = "android" , |
| 195 | target_os = "dragonfly" , |
| 196 | target_os = "freebsd" , |
| 197 | target_os = "fuchsia" , |
| 198 | target_os = "illumos" , |
| 199 | target_os = "ios" , |
| 200 | target_os = "visionos" , |
| 201 | target_os = "linux" , |
| 202 | target_os = "macos" , |
| 203 | target_os = "netbsd" , |
| 204 | target_os = "tvos" , |
| 205 | target_os = "watchos" , |
| 206 | ) |
| 207 | ))] |
| 208 | fn ka_with_retries(ka: TcpKeepalive, _: u32, _: &mut bool) -> TcpKeepalive { |
| 209 | ka // no-op as keepalive retries is not supported on this platform |
| 210 | } |
| 211 | } |
| 212 | |
| 213 | // ===== impl HttpConnector ===== |
| 214 | |
| 215 | impl HttpConnector { |
| 216 | /// Construct a new HttpConnector. |
| 217 | pub fn new() -> HttpConnector { |
| 218 | HttpConnector::new_with_resolver(GaiResolver::new()) |
| 219 | } |
| 220 | } |
| 221 | |
| 222 | impl<R> HttpConnector<R> { |
| 223 | /// Construct a new HttpConnector. |
| 224 | /// |
| 225 | /// Takes a [`Resolver`](crate::client::legacy::connect::dns#resolvers-are-services) to handle DNS lookups. |
| 226 | pub fn new_with_resolver(resolver: R) -> HttpConnector<R> { |
| 227 | HttpConnector { |
| 228 | config: Arc::new(Config { |
| 229 | connect_timeout: None, |
| 230 | enforce_http: true, |
| 231 | happy_eyeballs_timeout: Some(Duration::from_millis(300)), |
| 232 | tcp_keepalive_config: TcpKeepaliveConfig::default(), |
| 233 | local_address_ipv4: None, |
| 234 | local_address_ipv6: None, |
| 235 | nodelay: false, |
| 236 | reuse_address: false, |
| 237 | send_buffer_size: None, |
| 238 | recv_buffer_size: None, |
| 239 | #[cfg (any( |
| 240 | target_os = "android" , |
| 241 | target_os = "fuchsia" , |
| 242 | target_os = "illumos" , |
| 243 | target_os = "ios" , |
| 244 | target_os = "linux" , |
| 245 | target_os = "macos" , |
| 246 | target_os = "solaris" , |
| 247 | target_os = "tvos" , |
| 248 | target_os = "visionos" , |
| 249 | target_os = "watchos" , |
| 250 | ))] |
| 251 | interface: None, |
| 252 | #[cfg (any(target_os = "android" , target_os = "fuchsia" , target_os = "linux" ))] |
| 253 | tcp_user_timeout: None, |
| 254 | }), |
| 255 | resolver, |
| 256 | } |
| 257 | } |
| 258 | |
| 259 | /// Option to enforce all `Uri`s have the `http` scheme. |
| 260 | /// |
| 261 | /// Enabled by default. |
| 262 | #[inline ] |
| 263 | pub fn enforce_http(&mut self, is_enforced: bool) { |
| 264 | self.config_mut().enforce_http = is_enforced; |
| 265 | } |
| 266 | |
| 267 | /// Set that all sockets have `SO_KEEPALIVE` set with the supplied duration |
| 268 | /// to remain idle before sending TCP keepalive probes. |
| 269 | /// |
| 270 | /// If `None`, keepalive is disabled. |
| 271 | /// |
| 272 | /// Default is `None`. |
| 273 | #[inline ] |
| 274 | pub fn set_keepalive(&mut self, time: Option<Duration>) { |
| 275 | self.config_mut().tcp_keepalive_config.time = time; |
| 276 | } |
| 277 | |
| 278 | /// Set the duration between two successive TCP keepalive retransmissions, |
| 279 | /// if acknowledgement to the previous keepalive transmission is not received. |
| 280 | #[inline ] |
| 281 | pub fn set_keepalive_interval(&mut self, interval: Option<Duration>) { |
| 282 | self.config_mut().tcp_keepalive_config.interval = interval; |
| 283 | } |
| 284 | |
| 285 | /// Set the number of retransmissions to be carried out before declaring that remote end is not available. |
| 286 | #[inline ] |
| 287 | pub fn set_keepalive_retries(&mut self, retries: Option<u32>) { |
| 288 | self.config_mut().tcp_keepalive_config.retries = retries; |
| 289 | } |
| 290 | |
| 291 | /// Set that all sockets have `SO_NODELAY` set to the supplied value `nodelay`. |
| 292 | /// |
| 293 | /// Default is `false`. |
| 294 | #[inline ] |
| 295 | pub fn set_nodelay(&mut self, nodelay: bool) { |
| 296 | self.config_mut().nodelay = nodelay; |
| 297 | } |
| 298 | |
| 299 | /// Sets the value of the SO_SNDBUF option on the socket. |
| 300 | #[inline ] |
| 301 | pub fn set_send_buffer_size(&mut self, size: Option<usize>) { |
| 302 | self.config_mut().send_buffer_size = size; |
| 303 | } |
| 304 | |
| 305 | /// Sets the value of the SO_RCVBUF option on the socket. |
| 306 | #[inline ] |
| 307 | pub fn set_recv_buffer_size(&mut self, size: Option<usize>) { |
| 308 | self.config_mut().recv_buffer_size = size; |
| 309 | } |
| 310 | |
| 311 | /// Set that all sockets are bound to the configured address before connection. |
| 312 | /// |
| 313 | /// If `None`, the sockets will not be bound. |
| 314 | /// |
| 315 | /// Default is `None`. |
| 316 | #[inline ] |
| 317 | pub fn set_local_address(&mut self, addr: Option<IpAddr>) { |
| 318 | let (v4, v6) = match addr { |
| 319 | Some(IpAddr::V4(a)) => (Some(a), None), |
| 320 | Some(IpAddr::V6(a)) => (None, Some(a)), |
| 321 | _ => (None, None), |
| 322 | }; |
| 323 | |
| 324 | let cfg = self.config_mut(); |
| 325 | |
| 326 | cfg.local_address_ipv4 = v4; |
| 327 | cfg.local_address_ipv6 = v6; |
| 328 | } |
| 329 | |
| 330 | /// Set that all sockets are bound to the configured IPv4 or IPv6 address (depending on host's |
| 331 | /// preferences) before connection. |
| 332 | #[inline ] |
| 333 | pub fn set_local_addresses(&mut self, addr_ipv4: Ipv4Addr, addr_ipv6: Ipv6Addr) { |
| 334 | let cfg = self.config_mut(); |
| 335 | |
| 336 | cfg.local_address_ipv4 = Some(addr_ipv4); |
| 337 | cfg.local_address_ipv6 = Some(addr_ipv6); |
| 338 | } |
| 339 | |
| 340 | /// Set the connect timeout. |
| 341 | /// |
| 342 | /// If a domain resolves to multiple IP addresses, the timeout will be |
| 343 | /// evenly divided across them. |
| 344 | /// |
| 345 | /// Default is `None`. |
| 346 | #[inline ] |
| 347 | pub fn set_connect_timeout(&mut self, dur: Option<Duration>) { |
| 348 | self.config_mut().connect_timeout = dur; |
| 349 | } |
| 350 | |
| 351 | /// Set timeout for [RFC 6555 (Happy Eyeballs)][RFC 6555] algorithm. |
| 352 | /// |
| 353 | /// If hostname resolves to both IPv4 and IPv6 addresses and connection |
| 354 | /// cannot be established using preferred address family before timeout |
| 355 | /// elapses, then connector will in parallel attempt connection using other |
| 356 | /// address family. |
| 357 | /// |
| 358 | /// If `None`, parallel connection attempts are disabled. |
| 359 | /// |
| 360 | /// Default is 300 milliseconds. |
| 361 | /// |
| 362 | /// [RFC 6555]: https://tools.ietf.org/html/rfc6555 |
| 363 | #[inline ] |
| 364 | pub fn set_happy_eyeballs_timeout(&mut self, dur: Option<Duration>) { |
| 365 | self.config_mut().happy_eyeballs_timeout = dur; |
| 366 | } |
| 367 | |
| 368 | /// Set that all socket have `SO_REUSEADDR` set to the supplied value `reuse_address`. |
| 369 | /// |
| 370 | /// Default is `false`. |
| 371 | #[inline ] |
| 372 | pub fn set_reuse_address(&mut self, reuse_address: bool) -> &mut Self { |
| 373 | self.config_mut().reuse_address = reuse_address; |
| 374 | self |
| 375 | } |
| 376 | |
| 377 | /// Sets the name of the interface to bind sockets produced by this |
| 378 | /// connector. |
| 379 | /// |
| 380 | /// On Linux, this sets the `SO_BINDTODEVICE` option on this socket (see |
| 381 | /// [`man 7 socket`] for details). On macOS (and macOS-derived systems like |
| 382 | /// iOS), illumos, and Solaris, this will instead use the `IP_BOUND_IF` |
| 383 | /// socket option (see [`man 7p ip`]). |
| 384 | /// |
| 385 | /// If a socket is bound to an interface, only packets received from that particular |
| 386 | /// interface are processed by the socket. Note that this only works for some socket |
| 387 | /// types, particularly `AF_INET`` sockets. |
| 388 | /// |
| 389 | /// On Linux it can be used to specify a [VRF], but the binary needs |
| 390 | /// to either have `CAP_NET_RAW` or to be run as root. |
| 391 | /// |
| 392 | /// This function is only available on the following operating systems: |
| 393 | /// - Linux, including Android |
| 394 | /// - Fuchsia |
| 395 | /// - illumos and Solaris |
| 396 | /// - macOS, iOS, visionOS, watchOS, and tvOS |
| 397 | /// |
| 398 | /// [VRF]: https://www.kernel.org/doc/Documentation/networking/vrf.txt |
| 399 | /// [`man 7 socket`] https://man7.org/linux/man-pages/man7/socket.7.html |
| 400 | /// [`man 7p ip`]: https://docs.oracle.com/cd/E86824_01/html/E54777/ip-7p.html |
| 401 | #[cfg (any( |
| 402 | target_os = "android" , |
| 403 | target_os = "fuchsia" , |
| 404 | target_os = "illumos" , |
| 405 | target_os = "ios" , |
| 406 | target_os = "linux" , |
| 407 | target_os = "macos" , |
| 408 | target_os = "solaris" , |
| 409 | target_os = "tvos" , |
| 410 | target_os = "visionos" , |
| 411 | target_os = "watchos" , |
| 412 | ))] |
| 413 | #[inline ] |
| 414 | pub fn set_interface<S: Into<String>>(&mut self, interface: S) -> &mut Self { |
| 415 | let interface = interface.into(); |
| 416 | #[cfg (any(target_os = "android" , target_os = "fuchsia" , target_os = "linux" ))] |
| 417 | { |
| 418 | self.config_mut().interface = Some(interface); |
| 419 | } |
| 420 | #[cfg (not(any(target_os = "android" , target_os = "fuchsia" , target_os = "linux" )))] |
| 421 | { |
| 422 | let interface = std::ffi::CString::new(interface) |
| 423 | .expect("interface name should not have nulls in it" ); |
| 424 | self.config_mut().interface = Some(interface); |
| 425 | } |
| 426 | self |
| 427 | } |
| 428 | |
| 429 | /// Sets the value of the TCP_USER_TIMEOUT option on the socket. |
| 430 | #[cfg (any(target_os = "android" , target_os = "fuchsia" , target_os = "linux" ))] |
| 431 | #[inline ] |
| 432 | pub fn set_tcp_user_timeout(&mut self, time: Option<Duration>) { |
| 433 | self.config_mut().tcp_user_timeout = time; |
| 434 | } |
| 435 | |
| 436 | // private |
| 437 | |
| 438 | fn config_mut(&mut self) -> &mut Config { |
| 439 | // If the are HttpConnector clones, this will clone the inner |
| 440 | // config. So mutating the config won't ever affect previous |
| 441 | // clones. |
| 442 | Arc::make_mut(&mut self.config) |
| 443 | } |
| 444 | } |
| 445 | |
// Messages of the cause-less `ConnectError`s returned when a destination URI
// is rejected.
static INVALID_NOT_HTTP: &str = "invalid URL, scheme is not http";
static INVALID_MISSING_SCHEME: &str = "invalid URL, scheme is missing";
static INVALID_MISSING_HOST: &str = "invalid URL, host is missing";
| 449 | |
| 450 | // R: Debug required for now to allow adding it to debug output later... |
| 451 | impl<R: fmt::Debug> fmt::Debug for HttpConnector<R> { |
| 452 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 453 | f.debug_struct(name:"HttpConnector" ).finish() |
| 454 | } |
| 455 | } |
| 456 | |
| 457 | impl<R> tower_service::Service<Uri> for HttpConnector<R> |
| 458 | where |
| 459 | R: Resolve + Clone + Send + Sync + 'static, |
| 460 | R::Future: Send, |
| 461 | { |
| 462 | type Response = TokioIo<TcpStream>; |
| 463 | type Error = ConnectError; |
| 464 | type Future = HttpConnecting<R>; |
| 465 | |
| 466 | fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> { |
| 467 | futures_util::ready!(self.resolver.poll_ready(cx)).map_err(op:ConnectError::dns)?; |
| 468 | Poll::Ready(Ok(())) |
| 469 | } |
| 470 | |
| 471 | fn call(&mut self, dst: Uri) -> Self::Future { |
| 472 | let mut self_: HttpConnector = self.clone(); |
| 473 | HttpConnecting { |
| 474 | fut: Box::pin(async move { self_.call_async(dst).await }), |
| 475 | _marker: PhantomData, |
| 476 | } |
| 477 | } |
| 478 | } |
| 479 | |
| 480 | fn get_host_port<'u>(config: &Config, dst: &'u Uri) -> Result<(&'u str, u16), ConnectError> { |
| 481 | trace!( |
| 482 | "Http::connect; scheme= {:?}, host= {:?}, port= {:?}" , |
| 483 | dst.scheme(), |
| 484 | dst.host(), |
| 485 | dst.port(), |
| 486 | ); |
| 487 | |
| 488 | if config.enforce_http { |
| 489 | if dst.scheme() != Some(&Scheme::HTTP) { |
| 490 | return Err(ConnectError { |
| 491 | msg: INVALID_NOT_HTTP.into(), |
| 492 | cause: None, |
| 493 | }); |
| 494 | } |
| 495 | } else if dst.scheme().is_none() { |
| 496 | return Err(ConnectError { |
| 497 | msg: INVALID_MISSING_SCHEME.into(), |
| 498 | cause: None, |
| 499 | }); |
| 500 | } |
| 501 | |
| 502 | let host = match dst.host() { |
| 503 | Some(s) => s, |
| 504 | None => { |
| 505 | return Err(ConnectError { |
| 506 | msg: INVALID_MISSING_HOST.into(), |
| 507 | cause: None, |
| 508 | }) |
| 509 | } |
| 510 | }; |
| 511 | let port = match dst.port() { |
| 512 | Some(port) => port.as_u16(), |
| 513 | None => { |
| 514 | if dst.scheme() == Some(&Scheme::HTTPS) { |
| 515 | 443 |
| 516 | } else { |
| 517 | 80 |
| 518 | } |
| 519 | } |
| 520 | }; |
| 521 | |
| 522 | Ok((host, port)) |
| 523 | } |
| 524 | |
| 525 | impl<R> HttpConnector<R> |
| 526 | where |
| 527 | R: Resolve, |
| 528 | { |
| 529 | async fn call_async(&mut self, dst: Uri) -> Result<TokioIo<TcpStream>, ConnectError> { |
| 530 | let config = &self.config; |
| 531 | |
| 532 | let (host, port) = get_host_port(config, &dst)?; |
| 533 | let host = host.trim_start_matches('[' ).trim_end_matches(']' ); |
| 534 | |
| 535 | // If the host is already an IP addr (v4 or v6), |
| 536 | // skip resolving the dns and start connecting right away. |
| 537 | let addrs = if let Some(addrs) = dns::SocketAddrs::try_parse(host, port) { |
| 538 | addrs |
| 539 | } else { |
| 540 | let addrs = resolve(&mut self.resolver, dns::Name::new(host.into())) |
| 541 | .await |
| 542 | .map_err(ConnectError::dns)?; |
| 543 | let addrs = addrs |
| 544 | .map(|mut addr| { |
| 545 | set_port(&mut addr, port, dst.port().is_some()); |
| 546 | |
| 547 | addr |
| 548 | }) |
| 549 | .collect(); |
| 550 | dns::SocketAddrs::new(addrs) |
| 551 | }; |
| 552 | |
| 553 | let c = ConnectingTcp::new(addrs, config); |
| 554 | |
| 555 | let sock = c.connect().await?; |
| 556 | |
| 557 | if let Err(e) = sock.set_nodelay(config.nodelay) { |
| 558 | warn!("tcp set_nodelay error: {}" , e); |
| 559 | } |
| 560 | |
| 561 | Ok(TokioIo::new(sock)) |
| 562 | } |
| 563 | } |
| 564 | |
| 565 | impl Connection for TcpStream { |
| 566 | fn connected(&self) -> Connected { |
| 567 | let connected: Connected = Connected::new(); |
| 568 | if let (Ok(remote_addr: SocketAddr), Ok(local_addr: SocketAddr)) = (self.peer_addr(), self.local_addr()) { |
| 569 | connected.extra(HttpInfo { |
| 570 | remote_addr, |
| 571 | local_addr, |
| 572 | }) |
| 573 | } else { |
| 574 | connected |
| 575 | } |
| 576 | } |
| 577 | } |
| 578 | |
| 579 | #[cfg (unix)] |
| 580 | impl Connection for tokio::net::UnixStream { |
| 581 | fn connected(&self) -> Connected { |
| 582 | Connected::new() |
| 583 | } |
| 584 | } |
| 585 | |
// Windows named pipes report a plain `Connected` with no extra information.
#[cfg(windows)]
impl Connection for tokio::net::windows::named_pipe::NamedPipeClient {
    fn connected(&self) -> Connected {
        Connected::new()
    }
}
| 592 | |
| 593 | // Implement `Connection` for generic `TokioIo<T>` so that external crates can |
| 594 | // implement their own `HttpConnector` with `TokioIo<CustomTcpStream>`. |
| 595 | impl<T> Connection for TokioIo<T> |
| 596 | where |
| 597 | T: Connection, |
| 598 | { |
| 599 | fn connected(&self) -> Connected { |
| 600 | self.inner().connected() |
| 601 | } |
| 602 | } |
| 603 | |
| 604 | impl HttpInfo { |
| 605 | /// Get the remote address of the transport used. |
| 606 | pub fn remote_addr(&self) -> SocketAddr { |
| 607 | self.remote_addr |
| 608 | } |
| 609 | |
| 610 | /// Get the local address of the transport used. |
| 611 | pub fn local_addr(&self) -> SocketAddr { |
| 612 | self.local_addr |
| 613 | } |
| 614 | } |
| 615 | |
| 616 | pin_project! { |
| 617 | // Not publicly exported (so missing_docs doesn't trigger). |
| 618 | // |
| 619 | // We return this `Future` instead of the `Pin<Box<dyn Future>>` directly |
| 620 | // so that users don't rely on it fitting in a `Pin<Box<dyn Future>>` slot |
| 621 | // (and thus we can change the type in the future). |
| 622 | #[must_use = "futures do nothing unless polled" ] |
| 623 | #[allow (missing_debug_implementations)] |
| 624 | pub struct HttpConnecting<R> { |
| 625 | #[pin] |
| 626 | fut: BoxConnecting, |
| 627 | _marker: PhantomData<R>, |
| 628 | } |
| 629 | } |
| 630 | |
| 631 | type ConnectResult = Result<TokioIo<TcpStream>, ConnectError>; |
| 632 | type BoxConnecting = Pin<Box<dyn Future<Output = ConnectResult> + Send>>; |
| 633 | |
| 634 | impl<R: Resolve> Future for HttpConnecting<R> { |
| 635 | type Output = ConnectResult; |
| 636 | |
| 637 | fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { |
| 638 | self.project().fut.poll(cx) |
| 639 | } |
| 640 | } |
| 641 | |
// Not publicly exported (so missing_docs doesn't trigger).
//
// Error produced while validating the destination, resolving it, or
// connecting to it.
pub struct ConnectError {
    // Short description of the step that failed.
    msg: Box<str>,
    // Underlying error, when there is one.
    cause: Option<Box<dyn StdError + Send + Sync>>,
}
| 647 | |
| 648 | impl ConnectError { |
| 649 | fn new<S, E>(msg: S, cause: E) -> ConnectError |
| 650 | where |
| 651 | S: Into<Box<str>>, |
| 652 | E: Into<Box<dyn StdError + Send + Sync>>, |
| 653 | { |
| 654 | ConnectError { |
| 655 | msg: msg.into(), |
| 656 | cause: Some(cause.into()), |
| 657 | } |
| 658 | } |
| 659 | |
| 660 | fn dns<E>(cause: E) -> ConnectError |
| 661 | where |
| 662 | E: Into<Box<dyn StdError + Send + Sync>>, |
| 663 | { |
| 664 | ConnectError::new("dns error" , cause) |
| 665 | } |
| 666 | |
| 667 | fn m<S, E>(msg: S) -> impl FnOnce(E) -> ConnectError |
| 668 | where |
| 669 | S: Into<Box<str>>, |
| 670 | E: Into<Box<dyn StdError + Send + Sync>>, |
| 671 | { |
| 672 | move |cause| ConnectError::new(msg, cause) |
| 673 | } |
| 674 | } |
| 675 | |
| 676 | impl fmt::Debug for ConnectError { |
| 677 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 678 | if let Some(ref cause: &Box) = self.cause { |
| 679 | f&mut DebugTuple<'_, '_>.debug_tuple(name:"ConnectError" ) |
| 680 | .field(&self.msg) |
| 681 | .field(cause) |
| 682 | .finish() |
| 683 | } else { |
| 684 | self.msg.fmt(f) |
| 685 | } |
| 686 | } |
| 687 | } |
| 688 | |
| 689 | impl fmt::Display for ConnectError { |
| 690 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 691 | f.write_str(&self.msg)?; |
| 692 | |
| 693 | if let Some(ref cause: &Box) = self.cause { |
| 694 | write!(f, ": {}" , cause)?; |
| 695 | } |
| 696 | |
| 697 | Ok(()) |
| 698 | } |
| 699 | } |
| 700 | |
| 701 | impl StdError for ConnectError { |
| 702 | fn source(&self) -> Option<&(dyn StdError + 'static)> { |
| 703 | self.cause.as_ref().map(|e: &Box| &**e as _) |
| 704 | } |
| 705 | } |
| 706 | |
| 707 | struct ConnectingTcp<'a> { |
| 708 | preferred: ConnectingTcpRemote, |
| 709 | fallback: Option<ConnectingTcpFallback>, |
| 710 | config: &'a Config, |
| 711 | } |
| 712 | |
| 713 | impl<'a> ConnectingTcp<'a> { |
| 714 | fn new(remote_addrs: dns::SocketAddrs, config: &'a Config) -> Self { |
| 715 | if let Some(fallback_timeout) = config.happy_eyeballs_timeout { |
| 716 | let (preferred_addrs, fallback_addrs) = remote_addrs |
| 717 | .split_by_preference(config.local_address_ipv4, config.local_address_ipv6); |
| 718 | if fallback_addrs.is_empty() { |
| 719 | return ConnectingTcp { |
| 720 | preferred: ConnectingTcpRemote::new(preferred_addrs, config.connect_timeout), |
| 721 | fallback: None, |
| 722 | config, |
| 723 | }; |
| 724 | } |
| 725 | |
| 726 | ConnectingTcp { |
| 727 | preferred: ConnectingTcpRemote::new(preferred_addrs, config.connect_timeout), |
| 728 | fallback: Some(ConnectingTcpFallback { |
| 729 | delay: tokio::time::sleep(fallback_timeout), |
| 730 | remote: ConnectingTcpRemote::new(fallback_addrs, config.connect_timeout), |
| 731 | }), |
| 732 | config, |
| 733 | } |
| 734 | } else { |
| 735 | ConnectingTcp { |
| 736 | preferred: ConnectingTcpRemote::new(remote_addrs, config.connect_timeout), |
| 737 | fallback: None, |
| 738 | config, |
| 739 | } |
| 740 | } |
| 741 | } |
| 742 | } |
| 743 | |
| 744 | struct ConnectingTcpFallback { |
| 745 | delay: Sleep, |
| 746 | remote: ConnectingTcpRemote, |
| 747 | } |
| 748 | |
| 749 | struct ConnectingTcpRemote { |
| 750 | addrs: dns::SocketAddrs, |
| 751 | connect_timeout: Option<Duration>, |
| 752 | } |
| 753 | |
| 754 | impl ConnectingTcpRemote { |
| 755 | fn new(addrs: dns::SocketAddrs, connect_timeout: Option<Duration>) -> Self { |
| 756 | let connect_timeout: Option = connect_timeout.and_then(|t: Duration| t.checked_div(addrs.len() as u32)); |
| 757 | |
| 758 | Self { |
| 759 | addrs, |
| 760 | connect_timeout, |
| 761 | } |
| 762 | } |
| 763 | } |
| 764 | |
| 765 | impl ConnectingTcpRemote { |
| 766 | async fn connect(&mut self, config: &Config) -> Result<TcpStream, ConnectError> { |
| 767 | let mut err = None; |
| 768 | for addr in &mut self.addrs { |
| 769 | debug!("connecting to {}" , addr); |
| 770 | match connect(&addr, config, self.connect_timeout)?.await { |
| 771 | Ok(tcp) => { |
| 772 | debug!("connected to {}" , addr); |
| 773 | return Ok(tcp); |
| 774 | } |
| 775 | Err(e) => { |
| 776 | trace!("connect error for {}: {:?}" , addr, e); |
| 777 | err = Some(e); |
| 778 | } |
| 779 | } |
| 780 | } |
| 781 | |
| 782 | match err { |
| 783 | Some(e) => Err(e), |
| 784 | None => Err(ConnectError::new( |
| 785 | "tcp connect error" , |
| 786 | std::io::Error::new(std::io::ErrorKind::NotConnected, "Network unreachable" ), |
| 787 | )), |
| 788 | } |
| 789 | } |
| 790 | } |
| 791 | |
| 792 | fn bind_local_address( |
| 793 | socket: &socket2::Socket, |
| 794 | dst_addr: &SocketAddr, |
| 795 | local_addr_ipv4: &Option<Ipv4Addr>, |
| 796 | local_addr_ipv6: &Option<Ipv6Addr>, |
| 797 | ) -> io::Result<()> { |
| 798 | match (*dst_addr, local_addr_ipv4, local_addr_ipv6) { |
| 799 | (SocketAddr::V4(_), Some(addr: &Ipv4Addr), _) => { |
| 800 | socket.bind(&SocketAddr::new((*addr).into(), port:0).into())?; |
| 801 | } |
| 802 | (SocketAddr::V6(_), _, Some(addr: &Ipv6Addr)) => { |
| 803 | socket.bind(&SocketAddr::new((*addr).into(), port:0).into())?; |
| 804 | } |
| 805 | _ => { |
| 806 | if cfg!(windows) { |
| 807 | // Windows requires a socket be bound before calling connect |
| 808 | let any: SocketAddr = match *dst_addr { |
| 809 | SocketAddr::V4(_) => ([0, 0, 0, 0], 0).into(), |
| 810 | SocketAddr::V6(_) => ([0, 0, 0, 0, 0, 0, 0, 0], 0).into(), |
| 811 | }; |
| 812 | socket.bind(&any.into())?; |
| 813 | } |
| 814 | } |
| 815 | } |
| 816 | |
| 817 | Ok(()) |
| 818 | } |
| 819 | |
| 820 | fn connect( |
| 821 | addr: &SocketAddr, |
| 822 | config: &Config, |
| 823 | connect_timeout: Option<Duration>, |
| 824 | ) -> Result<impl Future<Output = Result<TcpStream, ConnectError>>, ConnectError> { |
| 825 | // TODO(eliza): if Tokio's `TcpSocket` gains support for setting the |
| 826 | // keepalive timeout, it would be nice to use that instead of socket2, |
| 827 | // and avoid the unsafe `into_raw_fd`/`from_raw_fd` dance... |
| 828 | use socket2::{Domain, Protocol, Socket, Type}; |
| 829 | |
| 830 | let domain = Domain::for_address(*addr); |
| 831 | let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP)) |
| 832 | .map_err(ConnectError::m("tcp open error" ))?; |
| 833 | |
| 834 | // When constructing a Tokio `TcpSocket` from a raw fd/socket, the user is |
| 835 | // responsible for ensuring O_NONBLOCK is set. |
| 836 | socket |
| 837 | .set_nonblocking(true) |
| 838 | .map_err(ConnectError::m("tcp set_nonblocking error" ))?; |
| 839 | |
| 840 | if let Some(tcp_keepalive) = &config.tcp_keepalive_config.into_tcpkeepalive() { |
| 841 | if let Err(e) = socket.set_tcp_keepalive(tcp_keepalive) { |
| 842 | warn!("tcp set_keepalive error: {}" , e); |
| 843 | } |
| 844 | } |
| 845 | |
| 846 | // That this only works for some socket types, particularly AF_INET sockets. |
| 847 | #[cfg (any( |
| 848 | target_os = "android" , |
| 849 | target_os = "fuchsia" , |
| 850 | target_os = "illumos" , |
| 851 | target_os = "ios" , |
| 852 | target_os = "linux" , |
| 853 | target_os = "macos" , |
| 854 | target_os = "solaris" , |
| 855 | target_os = "tvos" , |
| 856 | target_os = "visionos" , |
| 857 | target_os = "watchos" , |
| 858 | ))] |
| 859 | if let Some(interface) = &config.interface { |
| 860 | // On Linux-like systems, set the interface to bind using |
| 861 | // `SO_BINDTODEVICE`. |
| 862 | #[cfg (any(target_os = "android" , target_os = "fuchsia" , target_os = "linux" ))] |
| 863 | socket |
| 864 | .bind_device(Some(interface.as_bytes())) |
| 865 | .map_err(ConnectError::m("tcp bind interface error" ))?; |
| 866 | |
| 867 | // On macOS-like and Solaris-like systems, we instead use `IP_BOUND_IF`. |
| 868 | // This socket option desires an integer index for the interface, so we |
| 869 | // must first determine the index of the requested interface name using |
| 870 | // `if_nametoindex`. |
| 871 | #[cfg (any( |
| 872 | target_os = "illumos" , |
| 873 | target_os = "ios" , |
| 874 | target_os = "macos" , |
| 875 | target_os = "solaris" , |
| 876 | target_os = "tvos" , |
| 877 | target_os = "visionos" , |
| 878 | target_os = "watchos" , |
| 879 | ))] |
| 880 | { |
| 881 | let idx = unsafe { libc::if_nametoindex(interface.as_ptr()) }; |
| 882 | let idx = std::num::NonZeroU32::new(idx).ok_or_else(|| { |
| 883 | // If the index is 0, check errno and return an I/O error. |
| 884 | ConnectError::new( |
| 885 | "error converting interface name to index" , |
| 886 | io::Error::last_os_error(), |
| 887 | ) |
| 888 | })?; |
| 889 | // Different setsockopt calls are necessary depending on whether the |
| 890 | // address is IPv4 or IPv6. |
| 891 | match addr { |
| 892 | SocketAddr::V4(_) => socket.bind_device_by_index_v4(Some(idx)), |
| 893 | SocketAddr::V6(_) => socket.bind_device_by_index_v6(Some(idx)), |
| 894 | } |
| 895 | .map_err(ConnectError::m("tcp bind interface error" ))?; |
| 896 | } |
| 897 | } |
| 898 | |
| 899 | #[cfg (any(target_os = "android" , target_os = "fuchsia" , target_os = "linux" ))] |
| 900 | if let Some(tcp_user_timeout) = &config.tcp_user_timeout { |
| 901 | if let Err(e) = socket.set_tcp_user_timeout(Some(*tcp_user_timeout)) { |
| 902 | warn!("tcp set_tcp_user_timeout error: {}" , e); |
| 903 | } |
| 904 | } |
| 905 | |
| 906 | bind_local_address( |
| 907 | &socket, |
| 908 | addr, |
| 909 | &config.local_address_ipv4, |
| 910 | &config.local_address_ipv6, |
| 911 | ) |
| 912 | .map_err(ConnectError::m("tcp bind local error" ))?; |
| 913 | |
| 914 | #[cfg (unix)] |
| 915 | let socket = unsafe { |
| 916 | // Safety: `from_raw_fd` is only safe to call if ownership of the raw |
| 917 | // file descriptor is transferred. Since we call `into_raw_fd` on the |
| 918 | // socket2 socket, it gives up ownership of the fd and will not close |
| 919 | // it, so this is safe. |
| 920 | use std::os::unix::io::{FromRawFd, IntoRawFd}; |
| 921 | TcpSocket::from_raw_fd(socket.into_raw_fd()) |
| 922 | }; |
| 923 | #[cfg (windows)] |
| 924 | let socket = unsafe { |
| 925 | // Safety: `from_raw_socket` is only safe to call if ownership of the raw |
| 926 | // Windows SOCKET is transferred. Since we call `into_raw_socket` on the |
| 927 | // socket2 socket, it gives up ownership of the SOCKET and will not close |
| 928 | // it, so this is safe. |
| 929 | use std::os::windows::io::{FromRawSocket, IntoRawSocket}; |
| 930 | TcpSocket::from_raw_socket(socket.into_raw_socket()) |
| 931 | }; |
| 932 | |
| 933 | if config.reuse_address { |
| 934 | if let Err(e) = socket.set_reuseaddr(true) { |
| 935 | warn!("tcp set_reuse_address error: {}" , e); |
| 936 | } |
| 937 | } |
| 938 | |
| 939 | if let Some(size) = config.send_buffer_size { |
| 940 | if let Err(e) = socket.set_send_buffer_size(size.try_into().unwrap_or(u32::MAX)) { |
| 941 | warn!("tcp set_buffer_size error: {}" , e); |
| 942 | } |
| 943 | } |
| 944 | |
| 945 | if let Some(size) = config.recv_buffer_size { |
| 946 | if let Err(e) = socket.set_recv_buffer_size(size.try_into().unwrap_or(u32::MAX)) { |
| 947 | warn!("tcp set_recv_buffer_size error: {}" , e); |
| 948 | } |
| 949 | } |
| 950 | |
| 951 | let connect = socket.connect(*addr); |
| 952 | Ok(async move { |
| 953 | match connect_timeout { |
| 954 | Some(dur) => match tokio::time::timeout(dur, connect).await { |
| 955 | Ok(Ok(s)) => Ok(s), |
| 956 | Ok(Err(e)) => Err(e), |
| 957 | Err(e) => Err(io::Error::new(io::ErrorKind::TimedOut, e)), |
| 958 | }, |
| 959 | None => connect.await, |
| 960 | } |
| 961 | .map_err(ConnectError::m("tcp connect error" )) |
| 962 | }) |
| 963 | } |
| 964 | |
| 965 | impl ConnectingTcp<'_> { |
| 966 | async fn connect(mut self) -> Result<TcpStream, ConnectError> { |
| 967 | match self.fallback { |
| 968 | None => self.preferred.connect(self.config).await, |
| 969 | Some(mut fallback) => { |
| 970 | let preferred_fut = self.preferred.connect(self.config); |
| 971 | futures_util::pin_mut!(preferred_fut); |
| 972 | |
| 973 | let fallback_fut = fallback.remote.connect(self.config); |
| 974 | futures_util::pin_mut!(fallback_fut); |
| 975 | |
| 976 | let fallback_delay = fallback.delay; |
| 977 | futures_util::pin_mut!(fallback_delay); |
| 978 | |
| 979 | let (result, future) = |
| 980 | match futures_util::future::select(preferred_fut, fallback_delay).await { |
| 981 | Either::Left((result, _fallback_delay)) => { |
| 982 | (result, Either::Right(fallback_fut)) |
| 983 | } |
| 984 | Either::Right(((), preferred_fut)) => { |
| 985 | // Delay is done, start polling both the preferred and the fallback |
| 986 | futures_util::future::select(preferred_fut, fallback_fut) |
| 987 | .await |
| 988 | .factor_first() |
| 989 | } |
| 990 | }; |
| 991 | |
| 992 | if result.is_err() { |
| 993 | // Fallback to the remaining future (could be preferred or fallback) |
| 994 | // if we get an error |
| 995 | future.await |
| 996 | } else { |
| 997 | result |
| 998 | } |
| 999 | } |
| 1000 | } |
| 1001 | } |
| 1002 | } |
| 1003 | |
/// Chooses the port to connect to for a resolved address.
///
/// An explicit port in the URI always wins. When the URI carries no explicit
/// port, a non-`0` port already present on the resolved address (e.g. from a
/// custom DNS resolver) is kept, and only an unset (`0`) port is replaced
/// with the default port for the scheme.
///
/// * `addr` - resolved socket address; its port is overwritten in place.
/// * `host_port` - the port from the URI, or the scheme's default port.
/// * `explicit` - whether `host_port` was given explicitly in the URI.
fn set_port(addr: &mut SocketAddr, host_port: u16, explicit: bool) {
    if explicit || addr.port() == 0 {
        addr.set_port(host_port);
    }
}
| 1012 | |
| 1013 | #[cfg (test)] |
| 1014 | mod tests { |
| 1015 | use std::io; |
| 1016 | use std::net::SocketAddr; |
| 1017 | |
| 1018 | use ::http::Uri; |
| 1019 | |
| 1020 | use crate::client::legacy::connect::http::TcpKeepaliveConfig; |
| 1021 | |
| 1022 | use super::super::sealed::{Connect, ConnectSvc}; |
| 1023 | use super::{Config, ConnectError, HttpConnector}; |
| 1024 | |
| 1025 | use super::set_port; |
| 1026 | |
| 1027 | async fn connect<C>( |
| 1028 | connector: C, |
| 1029 | dst: Uri, |
| 1030 | ) -> Result<<C::_Svc as ConnectSvc>::Connection, <C::_Svc as ConnectSvc>::Error> |
| 1031 | where |
| 1032 | C: Connect, |
| 1033 | { |
| 1034 | connector.connect(super::super::sealed::Internal, dst).await |
| 1035 | } |
| 1036 | |
| 1037 | #[tokio::test ] |
| 1038 | #[cfg_attr (miri, ignore)] |
| 1039 | async fn test_errors_enforce_http() { |
| 1040 | let dst = "https://example.domain/foo/bar?baz" .parse().unwrap(); |
| 1041 | let connector = HttpConnector::new(); |
| 1042 | |
| 1043 | let err = connect(connector, dst).await.unwrap_err(); |
| 1044 | assert_eq!(&*err.msg, super::INVALID_NOT_HTTP); |
| 1045 | } |
| 1046 | |
| 1047 | #[cfg (any(target_os = "linux" , target_os = "macos" ))] |
| 1048 | fn get_local_ips() -> (Option<std::net::Ipv4Addr>, Option<std::net::Ipv6Addr>) { |
| 1049 | use std::net::{IpAddr, TcpListener}; |
| 1050 | |
| 1051 | let mut ip_v4 = None; |
| 1052 | let mut ip_v6 = None; |
| 1053 | |
| 1054 | let ips = pnet_datalink::interfaces() |
| 1055 | .into_iter() |
| 1056 | .flat_map(|i| i.ips.into_iter().map(|n| n.ip())); |
| 1057 | |
| 1058 | for ip in ips { |
| 1059 | match ip { |
| 1060 | IpAddr::V4(ip) if TcpListener::bind((ip, 0)).is_ok() => ip_v4 = Some(ip), |
| 1061 | IpAddr::V6(ip) if TcpListener::bind((ip, 0)).is_ok() => ip_v6 = Some(ip), |
| 1062 | _ => (), |
| 1063 | } |
| 1064 | |
| 1065 | if ip_v4.is_some() && ip_v6.is_some() { |
| 1066 | break; |
| 1067 | } |
| 1068 | } |
| 1069 | |
| 1070 | (ip_v4, ip_v6) |
| 1071 | } |
| 1072 | |
| 1073 | #[cfg (any(target_os = "android" , target_os = "fuchsia" , target_os = "linux" ))] |
| 1074 | fn default_interface() -> Option<String> { |
| 1075 | pnet_datalink::interfaces() |
| 1076 | .iter() |
| 1077 | .find(|e| e.is_up() && !e.is_loopback() && !e.ips.is_empty()) |
| 1078 | .map(|e| e.name.clone()) |
| 1079 | } |
| 1080 | |
| 1081 | #[tokio::test ] |
| 1082 | #[cfg_attr (miri, ignore)] |
| 1083 | async fn test_errors_missing_scheme() { |
| 1084 | let dst = "example.domain" .parse().unwrap(); |
| 1085 | let mut connector = HttpConnector::new(); |
| 1086 | connector.enforce_http(false); |
| 1087 | |
| 1088 | let err = connect(connector, dst).await.unwrap_err(); |
| 1089 | assert_eq!(&*err.msg, super::INVALID_MISSING_SCHEME); |
| 1090 | } |
| 1091 | |
| 1092 | // NOTE: pnet crate that we use in this test doesn't compile on Windows |
| 1093 | #[cfg (any(target_os = "linux" , target_os = "macos" ))] |
| 1094 | #[cfg_attr (miri, ignore)] |
| 1095 | #[tokio::test ] |
| 1096 | async fn local_address() { |
| 1097 | use std::net::{IpAddr, TcpListener}; |
| 1098 | |
| 1099 | let (bind_ip_v4, bind_ip_v6) = get_local_ips(); |
| 1100 | let server4 = TcpListener::bind("127.0.0.1:0" ).unwrap(); |
| 1101 | let port = server4.local_addr().unwrap().port(); |
| 1102 | let server6 = TcpListener::bind(&format!("[::1]:{}" , port)).unwrap(); |
| 1103 | |
| 1104 | let assert_client_ip = |dst: String, server: TcpListener, expected_ip: IpAddr| async move { |
| 1105 | let mut connector = HttpConnector::new(); |
| 1106 | |
| 1107 | match (bind_ip_v4, bind_ip_v6) { |
| 1108 | (Some(v4), Some(v6)) => connector.set_local_addresses(v4, v6), |
| 1109 | (Some(v4), None) => connector.set_local_address(Some(v4.into())), |
| 1110 | (None, Some(v6)) => connector.set_local_address(Some(v6.into())), |
| 1111 | _ => unreachable!(), |
| 1112 | } |
| 1113 | |
| 1114 | connect(connector, dst.parse().unwrap()).await.unwrap(); |
| 1115 | |
| 1116 | let (_, client_addr) = server.accept().unwrap(); |
| 1117 | |
| 1118 | assert_eq!(client_addr.ip(), expected_ip); |
| 1119 | }; |
| 1120 | |
| 1121 | if let Some(ip) = bind_ip_v4 { |
| 1122 | assert_client_ip(format!("http://127.0.0.1:{}" , port), server4, ip.into()).await; |
| 1123 | } |
| 1124 | |
| 1125 | if let Some(ip) = bind_ip_v6 { |
| 1126 | assert_client_ip(format!("http://[::1]:{}" , port), server6, ip.into()).await; |
| 1127 | } |
| 1128 | } |
| 1129 | |
| 1130 | // NOTE: pnet crate that we use in this test doesn't compile on Windows |
| 1131 | #[cfg (any(target_os = "android" , target_os = "fuchsia" , target_os = "linux" ))] |
| 1132 | #[tokio::test ] |
| 1133 | #[ignore = "setting `SO_BINDTODEVICE` requires the `CAP_NET_RAW` capability (works when running as root)" ] |
| 1134 | async fn interface() { |
| 1135 | use socket2::{Domain, Protocol, Socket, Type}; |
| 1136 | use std::net::TcpListener; |
| 1137 | |
| 1138 | let interface: Option<String> = default_interface(); |
| 1139 | |
| 1140 | let server4 = TcpListener::bind("127.0.0.1:0" ).unwrap(); |
| 1141 | let port = server4.local_addr().unwrap().port(); |
| 1142 | |
| 1143 | let server6 = TcpListener::bind(&format!("[::1]:{}" , port)).unwrap(); |
| 1144 | |
| 1145 | let assert_interface_name = |
| 1146 | |dst: String, |
| 1147 | server: TcpListener, |
| 1148 | bind_iface: Option<String>, |
| 1149 | expected_interface: Option<String>| async move { |
| 1150 | let mut connector = HttpConnector::new(); |
| 1151 | if let Some(iface) = bind_iface { |
| 1152 | connector.set_interface(iface); |
| 1153 | } |
| 1154 | |
| 1155 | connect(connector, dst.parse().unwrap()).await.unwrap(); |
| 1156 | let domain = Domain::for_address(server.local_addr().unwrap()); |
| 1157 | let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP)).unwrap(); |
| 1158 | |
| 1159 | assert_eq!( |
| 1160 | socket.device().unwrap().as_deref(), |
| 1161 | expected_interface.as_deref().map(|val| val.as_bytes()) |
| 1162 | ); |
| 1163 | }; |
| 1164 | |
| 1165 | assert_interface_name( |
| 1166 | format!("http://127.0.0.1:{}" , port), |
| 1167 | server4, |
| 1168 | interface.clone(), |
| 1169 | interface.clone(), |
| 1170 | ) |
| 1171 | .await; |
| 1172 | assert_interface_name( |
| 1173 | format!("http://[::1]:{}" , port), |
| 1174 | server6, |
| 1175 | interface.clone(), |
| 1176 | interface.clone(), |
| 1177 | ) |
| 1178 | .await; |
| 1179 | } |
| 1180 | |
/// Runs `ConnectingTcp` against a table of address lists and checks, for each
/// one, which address family ends up connected and roughly how long it took.
#[test]
#[ignore] // TODO
#[cfg_attr(not(feature = "__internal_happy_eyeballs_tests"), ignore)]
fn client_happy_eyeballs() {
    use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, TcpListener};
    use std::time::{Duration, Instant};

    use super::dns;
    use super::ConnectingTcp;

    // --- Address fixtures -------------------------------------------------

    fn local_ipv4_addr() -> IpAddr {
        IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))
    }

    fn local_ipv6_addr() -> IpAddr {
        IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1))
    }

    fn unreachable_ipv4_addr() -> IpAddr {
        IpAddr::V4(Ipv4Addr::new(127, 0, 0, 2))
    }

    fn unreachable_ipv6_addr() -> IpAddr {
        IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 2))
    }

    fn slow_ipv4_addr() -> IpAddr {
        // RFC 6890 reserved IPv4 address.
        IpAddr::V4(Ipv4Addr::new(198, 18, 0, 25))
    }

    fn slow_ipv6_addr() -> IpAddr {
        // Intended as an RFC 6890 reserved IPv6 address.
        // NOTE(review): the segments are decimal literals (2001, 254), not
        // hex -- confirm this is the intended address.
        IpAddr::V6(Ipv6Addr::new(2001, 2, 0, 0, 0, 0, 0, 254))
    }

    /// Attempts a blocking connect to `addr` on port 80 with a one second
    /// timeout. Returns whether the attempt succeeded or timed out (as
    /// opposed to failing with another error) and how long it took.
    fn measure_connect(addr: IpAddr) -> (bool, Duration) {
        let started = Instant::now();
        let target = SocketAddr::from((addr, 80));
        let outcome = std::net::TcpStream::connect_timeout(&target, Duration::from_secs(1));

        let reachable = match outcome {
            Ok(_) => true,
            Err(err) => err.kind() == io::ErrorKind::TimedOut,
        };
        (reachable, started.elapsed())
    }

    // --- Test setup -------------------------------------------------------

    let listener_v4 = TcpListener::bind("127.0.0.1:0").unwrap();
    let port = listener_v4.local_addr().unwrap().port();
    let _listener_v6 = TcpListener::bind(&format!("[::1]:{}", port)).unwrap();
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();

    let local_timeout = Duration::default();
    let (_, unreachable_v4_timeout) = measure_connect(unreachable_ipv4_addr());
    let (_, unreachable_v6_timeout) = measure_connect(unreachable_ipv6_addr());
    let fallback_timeout =
        unreachable_v4_timeout.max(unreachable_v6_timeout) + Duration::from_millis(250);

    // Each entry: (addresses to try, expected family, expected duration,
    // whether the scenario needs IPv6 network access).
    let scenarios = &[
        // Fast primary, without fallback.
        (&[local_ipv4_addr()][..], 4, local_timeout, false),
        (&[local_ipv6_addr()][..], 6, local_timeout, false),
        // Fast primary, with (unused) fallback.
        (
            &[local_ipv4_addr(), local_ipv6_addr()][..],
            4,
            local_timeout,
            false,
        ),
        (
            &[local_ipv6_addr(), local_ipv4_addr()][..],
            6,
            local_timeout,
            false,
        ),
        // Unreachable + fast primary, without fallback.
        (
            &[unreachable_ipv4_addr(), local_ipv4_addr()][..],
            4,
            unreachable_v4_timeout,
            false,
        ),
        (
            &[unreachable_ipv6_addr(), local_ipv6_addr()][..],
            6,
            unreachable_v6_timeout,
            false,
        ),
        // Unreachable + fast primary, with (unused) fallback.
        (
            &[
                unreachable_ipv4_addr(),
                local_ipv4_addr(),
                local_ipv6_addr(),
            ][..],
            4,
            unreachable_v4_timeout,
            false,
        ),
        (
            &[
                unreachable_ipv6_addr(),
                local_ipv6_addr(),
                local_ipv4_addr(),
            ][..],
            6,
            unreachable_v6_timeout,
            true,
        ),
        // Slow primary, with (used) fallback.
        (
            &[slow_ipv4_addr(), local_ipv4_addr(), local_ipv6_addr()][..],
            6,
            fallback_timeout,
            false,
        ),
        (
            &[slow_ipv6_addr(), local_ipv6_addr(), local_ipv4_addr()][..],
            4,
            fallback_timeout,
            true,
        ),
        // Slow primary, with (used) unreachable + fast fallback.
        (
            &[slow_ipv4_addr(), unreachable_ipv6_addr(), local_ipv6_addr()][..],
            6,
            fallback_timeout + unreachable_v6_timeout,
            false,
        ),
        (
            &[slow_ipv6_addr(), unreachable_ipv4_addr(), local_ipv4_addr()][..],
            4,
            fallback_timeout + unreachable_v4_timeout,
            true,
        ),
    ];

    // Scenarios for IPv6 -> IPv4 fallback require that the host can access an
    // IPv6 network. Otherwise, connecting to the "slow" IPv6 address errors
    // out immediately.
    let (ipv6_accessible, _) = measure_connect(slow_ipv6_addr());

    // Allow the actual duration to be +/- 150ms off.
    let slack = Duration::from_millis(150);

    for &(hosts, family, timeout, needs_ipv6_access) in scenarios {
        if !ipv6_accessible && needs_ipv6_access {
            continue;
        }

        let connected = rt.block_on(async move {
            let addrs = hosts
                .iter()
                .map(|host| SocketAddr::from((*host, port)))
                .collect();
            let cfg = Config {
                local_address_ipv4: None,
                local_address_ipv6: None,
                connect_timeout: None,
                tcp_keepalive_config: TcpKeepaliveConfig::default(),
                happy_eyeballs_timeout: Some(fallback_timeout),
                nodelay: false,
                reuse_address: false,
                enforce_http: false,
                send_buffer_size: None,
                recv_buffer_size: None,
                #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
                interface: None,
                #[cfg(any(
                    target_os = "illumos",
                    target_os = "ios",
                    target_os = "macos",
                    target_os = "solaris",
                    target_os = "tvos",
                    target_os = "visionos",
                    target_os = "watchos",
                ))]
                interface: None,
                #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
                tcp_user_timeout: None,
            };
            let connecting_tcp = ConnectingTcp::new(dns::SocketAddrs::new(addrs), &cfg);
            let start = Instant::now();
            Ok::<_, ConnectError>((start, ConnectingTcp::connect(connecting_tcp).await?))
        });
        let (start, stream) = connected.unwrap();

        let res = match stream.peer_addr().unwrap() {
            SocketAddr::V4(_) => 4,
            SocketAddr::V6(_) => 6,
        };
        let duration = start.elapsed();

        // The lower bound saturates at zero for timeouts shorter than the slack.
        let min_duration = timeout.saturating_sub(slack);
        let max_duration = timeout + slack;

        assert_eq!(res, family);
        assert!(duration >= min_duration);
        assert!(duration <= max_duration);
    }
}
| 1394 | |
| 1395 | use std::time::Duration; |
| 1396 | |
/// A default keepalive config must not produce a `TcpKeepalive`.
#[test]
fn no_tcp_keepalive_config() {
    let keepalive = TcpKeepaliveConfig::default().into_tcpkeepalive();
    assert!(keepalive.is_none());
}
| 1401 | |
/// Setting only `time` must yield a keepalive whose debug output shows it.
#[test]
fn tcp_keepalive_time_config() {
    let keepalive = TcpKeepaliveConfig {
        time: Some(Duration::from_secs(60)),
        ..TcpKeepaliveConfig::default()
    }
    .into_tcpkeepalive();

    match keepalive {
        Some(tcp_keepalive) => {
            assert!(format!("{tcp_keepalive:?}").contains("time: Some(60s)"))
        }
        None => panic!("test failed"),
    }
}
| 1412 | |
/// Setting only `interval` must yield a keepalive whose debug output shows it.
#[cfg(not(any(target_os = "openbsd", target_os = "redox", target_os = "solaris")))]
#[test]
fn tcp_keepalive_interval_config() {
    let keepalive = TcpKeepaliveConfig {
        interval: Some(Duration::from_secs(1)),
        ..TcpKeepaliveConfig::default()
    }
    .into_tcpkeepalive();

    match keepalive {
        Some(tcp_keepalive) => {
            assert!(format!("{tcp_keepalive:?}").contains("interval: Some(1s)"))
        }
        None => panic!("test failed"),
    }
}
| 1424 | |
/// Setting only `retries` must yield a keepalive whose debug output shows it.
#[cfg(not(any(
    target_os = "openbsd",
    target_os = "redox",
    target_os = "solaris",
    target_os = "windows"
)))]
#[test]
fn tcp_keepalive_retries_config() {
    let keepalive = TcpKeepaliveConfig {
        retries: Some(3),
        ..TcpKeepaliveConfig::default()
    }
    .into_tcpkeepalive();

    match keepalive {
        Some(tcp_keepalive) => {
            assert!(format!("{tcp_keepalive:?}").contains("retries: Some(3)"))
        }
        None => panic!("test failed"),
    }
}
| 1441 | |
/// Table-driven check of the port selection rules implemented by `set_port`.
#[test]
fn test_set_port() {
    // (resolved port, host port, explicit?, expected final port)
    let cases: [(u16, u16, bool, u16); 3] = [
        // Respect explicit ports no matter what the resolved port is.
        (6881, 42, true, 42),
        // Ignore default host port, and use the socket port instead.
        (6881, 443, false, 6881),
        // Use the default port if the resolved port is `0`.
        (0, 443, false, 443),
    ];

    for &(resolved, host_port, explicit, expected) in cases.iter() {
        let mut addr = SocketAddr::from(([0, 0, 0, 0], resolved));
        set_port(&mut addr, host_port, explicit);
        assert_eq!(addr.port(), expected);
    }
}
| 1459 | } |
| 1460 | |