1 | use std::{ops::Deref, sync::Arc}; |
2 | |
3 | use http::Request; |
4 | use tokio::sync::watch; |
5 | |
6 | use super::Connected; |
7 | |
8 | /// [`CaptureConnection`] allows callers to capture [`Connected`] information |
9 | /// |
10 | /// To capture a connection for a request, use [`capture_connection`]. |
11 | #[derive (Debug, Clone)] |
12 | pub struct CaptureConnection { |
13 | rx: watch::Receiver<Option<Connected>>, |
14 | } |
15 | |
16 | /// Capture the connection for a given request |
17 | /// |
18 | /// When making a request with Hyper, the underlying connection must implement the [`Connection`] trait. |
19 | /// [`capture_connection`] allows a caller to capture the returned [`Connected`] structure as soon |
20 | /// as the connection is established. |
21 | /// |
22 | /// [`Connection`]: crate::client::legacy::connect::Connection |
23 | /// |
24 | /// *Note*: If establishing a connection fails, [`CaptureConnection::connection_metadata`] will always return none. |
25 | /// |
26 | /// # Examples |
27 | /// |
28 | /// **Synchronous access**: |
29 | /// The [`CaptureConnection::connection_metadata`] method allows callers to check if a connection has been |
30 | /// established. This is ideal for situations where you are certain the connection has already |
31 | /// been established (e.g. after the response future has already completed). |
32 | /// ```rust |
33 | /// use hyper_util::client::legacy::connect::capture_connection; |
34 | /// let mut request = http::Request::builder() |
35 | /// .uri("http://foo.com" ) |
36 | /// .body(()) |
37 | /// .unwrap(); |
38 | /// |
39 | /// let captured_connection = capture_connection(&mut request); |
40 | /// // some time later after the request has been sent... |
41 | /// let connection_info = captured_connection.connection_metadata(); |
42 | /// println!("we are connected! {:?}" , connection_info.as_ref()); |
43 | /// ``` |
44 | /// |
45 | /// **Asynchronous access**: |
46 | /// The [`CaptureConnection::wait_for_connection_metadata`] method returns a future resolves as soon as the |
47 | /// connection is available. |
48 | /// |
49 | /// ```rust |
50 | /// # #[cfg (feature = "tokio" )] |
51 | /// # async fn example() { |
52 | /// use hyper_util::client::legacy::connect::capture_connection; |
53 | /// use hyper_util::client::legacy::Client; |
54 | /// use hyper_util::rt::TokioExecutor; |
55 | /// use bytes::Bytes; |
56 | /// use http_body_util::Empty; |
57 | /// let mut request = http::Request::builder() |
58 | /// .uri("http://foo.com" ) |
59 | /// .body(Empty::<Bytes>::new()) |
60 | /// .unwrap(); |
61 | /// |
62 | /// let mut captured = capture_connection(&mut request); |
63 | /// tokio::task::spawn(async move { |
64 | /// let connection_info = captured.wait_for_connection_metadata().await; |
65 | /// println!("we are connected! {:?}" , connection_info.as_ref()); |
66 | /// }); |
67 | /// |
68 | /// let client = Client::builder(TokioExecutor::new()).build_http(); |
69 | /// client.request(request).await.expect("request failed" ); |
70 | /// # } |
71 | /// ``` |
72 | pub fn capture_connection<B>(request: &mut Request<B>) -> CaptureConnection { |
73 | let (tx: CaptureConnectionExtension, rx: CaptureConnection) = CaptureConnection::new(); |
74 | request.extensions_mut().insert(val:tx); |
75 | rx |
76 | } |
77 | |
78 | /// TxSide for [`CaptureConnection`] |
79 | /// |
80 | /// This is inserted into `Extensions` to allow Hyper to back channel connection info |
81 | #[derive (Clone)] |
82 | pub(crate) struct CaptureConnectionExtension { |
83 | tx: Arc<watch::Sender<Option<Connected>>>, |
84 | } |
85 | |
86 | impl CaptureConnectionExtension { |
87 | pub(crate) fn set(&self, connected: &Connected) { |
88 | self.tx.send_replace(Some(connected.clone())); |
89 | } |
90 | } |
91 | |
92 | impl CaptureConnection { |
93 | /// Internal API to create the tx and rx half of [`CaptureConnection`] |
94 | pub(crate) fn new() -> (CaptureConnectionExtension, Self) { |
95 | let (tx, rx) = watch::channel(None); |
96 | ( |
97 | CaptureConnectionExtension { tx: Arc::new(tx) }, |
98 | CaptureConnection { rx }, |
99 | ) |
100 | } |
101 | |
102 | /// Retrieve the connection metadata, if available |
103 | pub fn connection_metadata(&self) -> impl Deref<Target = Option<Connected>> + '_ { |
104 | self.rx.borrow() |
105 | } |
106 | |
107 | /// Wait for the connection to be established |
108 | /// |
109 | /// If a connection was established, this will always return `Some(...)`. If the request never |
110 | /// successfully connected (e.g. DNS resolution failure), this method will never return. |
111 | pub async fn wait_for_connection_metadata( |
112 | &mut self, |
113 | ) -> impl Deref<Target = Option<Connected>> + '_ { |
114 | if self.rx.borrow().is_some() { |
115 | return self.rx.borrow(); |
116 | } |
117 | let _ = self.rx.changed().await; |
118 | self.rx.borrow() |
119 | } |
120 | } |
121 | |
#[cfg(all(test, not(miri)))]
mod test {
    use super::*;

    /// The synchronous getter reports `None` before `set` and the stored value afterwards.
    #[test]
    fn test_sync_capture_connection() {
        let (tx, rx) = CaptureConnection::new();
        assert!(
            rx.connection_metadata().is_none(),
            "connection has not been set"
        );
        tx.set(&Connected::new().proxy(true));
        assert!(rx
            .connection_metadata()
            .as_ref()
            .expect("connected should be set")
            .is_proxied());

        // ensure it can be called multiple times
        assert!(rx
            .connection_metadata()
            .as_ref()
            .expect("connected should be set")
            .is_proxied());
    }

    /// A waiter spawned before `set` completes once the connection is recorded.
    #[tokio::test]
    async fn async_capture_connection() {
        let (tx, mut rx) = CaptureConnection::new();
        assert!(
            rx.connection_metadata().is_none(),
            "connection has not been set"
        );
        let test_task = tokio::spawn(async move {
            assert!(rx
                .wait_for_connection_metadata()
                .await
                .as_ref()
                .expect("connection should be set")
                .is_proxied());
            // can be awaited multiple times
            assert!(
                rx.wait_for_connection_metadata().await.is_some(),
                "should be awaitable multiple times"
            );

            assert!(rx.connection_metadata().is_some());
        });
        // can't be finished, we haven't set the connection yet
        assert!(!test_task.is_finished());
        tx.set(&Connected::new().proxy(true));

        assert!(test_task.await.is_ok());
    }

    /// Dropping the sender without setting a connection makes the wait resolve to `None`.
    #[tokio::test]
    async fn capture_connection_sender_side_dropped() {
        let (tx, mut rx) = CaptureConnection::new();
        assert!(
            rx.connection_metadata().is_none(),
            "connection has not been set"
        );
        drop(tx);
        assert!(rx.wait_for_connection_metadata().await.is_none());
    }
}
194 | |