1 | use std::error::Error as StdError; |
2 | #[cfg (feature = "runtime" )] |
3 | use std::time::Duration; |
4 | |
5 | use bytes::Bytes; |
6 | use futures_channel::{mpsc, oneshot}; |
7 | use futures_util::future::{self, Either, FutureExt as _, TryFutureExt as _}; |
8 | use futures_util::stream::StreamExt as _; |
9 | use h2::client::{Builder, SendRequest}; |
10 | use http::{Method, StatusCode}; |
11 | use tokio::io::{AsyncRead, AsyncWrite}; |
12 | use tracing::{debug, trace, warn}; |
13 | |
14 | use super::{ping, H2Upgraded, PipeToSendStream, SendBuf}; |
15 | use crate::body::HttpBody; |
16 | use crate::common::{exec::Exec, task, Future, Never, Pin, Poll}; |
17 | use crate::ext::Protocol; |
18 | use crate::headers; |
19 | use crate::proto::h2::UpgradedSendStream; |
20 | use crate::proto::Dispatched; |
21 | use crate::upgrade::Upgraded; |
22 | use crate::{Body, Request, Response}; |
23 | |
24 | type ClientRx<B> = crate::client::dispatch::Receiver<Request<B>, Response<Body>>; |
25 | |
26 | ///// An mpsc channel is used to help notify the `Connection` task when *all* |
27 | ///// other handles to it have been dropped, so that it can shutdown. |
28 | type ConnDropRef = mpsc::Sender<Never>; |
29 | |
30 | ///// A oneshot channel watches the `Connection` task, and when it completes, |
31 | ///// the "dispatch" task will be notified and can shutdown sooner. |
32 | type ConnEof = oneshot::Receiver<Never>; |
33 | |
// Our defaults are chosen for the "majority" case, which is usually not
// resource constrained, and so the spec default of 64 KiB can be too limiting
// for performance.

/// Default connection-level flow-control window: 5 MiB.
const DEFAULT_CONN_WINDOW: u32 = 5 * 1024 * 1024;
/// Default per-stream flow-control window: 2 MiB.
const DEFAULT_STREAM_WINDOW: u32 = 2 * 1024 * 1024;
/// Default maximum HTTP/2 frame size: 16 KiB.
const DEFAULT_MAX_FRAME_SIZE: u32 = 16 * 1024;
/// Default maximum send buffer size: 1 MiB.
const DEFAULT_MAX_SEND_BUF_SIZE: usize = 1024 * 1024;
41 | |
/// Tunables for an HTTP/2 client connection.
///
/// The window, frame and buffer settings are handed to the `h2` client
/// builder; the adaptive-window and keep-alive settings are handed to the
/// ping subsystem.
#[derive(Clone, Debug)]
pub(crate) struct Config {
    /// When `true`, the ping configuration is given
    /// `initial_stream_window_size` as its initial BDP window; when `false`
    /// it is given `None`.
    pub(crate) adaptive_window: bool,
    /// Passed to the builder as the initial connection window size.
    pub(crate) initial_conn_window_size: u32,
    /// Passed to the builder as the initial (per-stream) window size.
    pub(crate) initial_stream_window_size: u32,
    /// Passed to the builder as the maximum frame size.
    pub(crate) max_frame_size: u32,
    /// Forwarded unchanged to the ping configuration.
    #[cfg(feature = "runtime")]
    pub(crate) keep_alive_interval: Option<Duration>,
    /// Forwarded unchanged to the ping configuration.
    #[cfg(feature = "runtime")]
    pub(crate) keep_alive_timeout: Duration,
    /// Forwarded unchanged to the ping configuration.
    #[cfg(feature = "runtime")]
    pub(crate) keep_alive_while_idle: bool,
    /// Only applied to the builder when `Some`; with `None` the builder's own
    /// setting is left untouched.
    pub(crate) max_concurrent_reset_streams: Option<usize>,
    /// Passed to the builder as the maximum send buffer size.
    pub(crate) max_send_buffer_size: usize,
}
57 | |
58 | impl Default for Config { |
59 | fn default() -> Config { |
60 | Config { |
61 | adaptive_window: false, |
62 | initial_conn_window_size: DEFAULT_CONN_WINDOW, |
63 | initial_stream_window_size: DEFAULT_STREAM_WINDOW, |
64 | max_frame_size: DEFAULT_MAX_FRAME_SIZE, |
65 | #[cfg (feature = "runtime" )] |
66 | keep_alive_interval: None, |
67 | #[cfg (feature = "runtime" )] |
68 | keep_alive_timeout: Duration::from_secs(20), |
69 | #[cfg (feature = "runtime" )] |
70 | keep_alive_while_idle: false, |
71 | max_concurrent_reset_streams: None, |
72 | max_send_buffer_size: DEFAULT_MAX_SEND_BUF_SIZE, |
73 | } |
74 | } |
75 | } |
76 | |
77 | fn new_builder(config: &Config) -> Builder { |
78 | let mut builder: Builder = Builder::default(); |
79 | builder |
80 | .initial_window_size(config.initial_stream_window_size) |
81 | .initial_connection_window_size(config.initial_conn_window_size) |
82 | .max_frame_size(config.max_frame_size) |
83 | .max_send_buffer_size(config.max_send_buffer_size) |
84 | .enable_push(enabled:false); |
85 | if let Some(max: usize) = config.max_concurrent_reset_streams { |
86 | builder.max_concurrent_reset_streams(max); |
87 | } |
88 | builder |
89 | } |
90 | |
91 | fn new_ping_config(config: &Config) -> ping::Config { |
92 | ping::Config { |
93 | bdp_initial_window: if config.adaptive_window { |
94 | Some(config.initial_stream_window_size) |
95 | } else { |
96 | None |
97 | }, |
98 | #[cfg (feature = "runtime" )] |
99 | keep_alive_interval: config.keep_alive_interval, |
100 | #[cfg (feature = "runtime" )] |
101 | keep_alive_timeout: config.keep_alive_timeout, |
102 | #[cfg (feature = "runtime" )] |
103 | keep_alive_while_idle: config.keep_alive_while_idle, |
104 | } |
105 | } |
106 | |
107 | pub(crate) async fn handshake<T, B>( |
108 | io: T, |
109 | req_rx: ClientRx<B>, |
110 | config: &Config, |
111 | exec: Exec, |
112 | ) -> crate::Result<ClientTask<B>> |
113 | where |
114 | T: AsyncRead + AsyncWrite + Send + Unpin + 'static, |
115 | B: HttpBody, |
116 | B::Data: Send + 'static, |
117 | { |
118 | let (h2_tx, mut conn) = new_builder(config) |
119 | .handshake::<_, SendBuf<B::Data>>(io) |
120 | .await |
121 | .map_err(crate::Error::new_h2)?; |
122 | |
123 | // An mpsc channel is used entirely to detect when the |
124 | // 'Client' has been dropped. This is to get around a bug |
125 | // in h2 where dropping all SendRequests won't notify a |
126 | // parked Connection. |
127 | let (conn_drop_ref, rx) = mpsc::channel(1); |
128 | let (cancel_tx, conn_eof) = oneshot::channel(); |
129 | |
130 | let conn_drop_rx = rx.into_future().map(|(item, _rx)| { |
131 | if let Some(never) = item { |
132 | match never {} |
133 | } |
134 | }); |
135 | |
136 | let ping_config = new_ping_config(&config); |
137 | |
138 | let (conn, ping) = if ping_config.is_enabled() { |
139 | let pp = conn.ping_pong().expect("conn.ping_pong" ); |
140 | let (recorder, mut ponger) = ping::channel(pp, ping_config); |
141 | |
142 | let conn = future::poll_fn(move |cx| { |
143 | match ponger.poll(cx) { |
144 | Poll::Ready(ping::Ponged::SizeUpdate(wnd)) => { |
145 | conn.set_target_window_size(wnd); |
146 | conn.set_initial_window_size(wnd)?; |
147 | } |
148 | #[cfg (feature = "runtime" )] |
149 | Poll::Ready(ping::Ponged::KeepAliveTimedOut) => { |
150 | debug!("connection keep-alive timed out" ); |
151 | return Poll::Ready(Ok(())); |
152 | } |
153 | Poll::Pending => {} |
154 | } |
155 | |
156 | Pin::new(&mut conn).poll(cx) |
157 | }); |
158 | (Either::Left(conn), recorder) |
159 | } else { |
160 | (Either::Right(conn), ping::disabled()) |
161 | }; |
162 | let conn = conn.map_err(|e| debug!("connection error: {}" , e)); |
163 | |
164 | exec.execute(conn_task(conn, conn_drop_rx, cancel_tx)); |
165 | |
166 | Ok(ClientTask { |
167 | ping, |
168 | conn_drop_ref, |
169 | conn_eof, |
170 | executor: exec, |
171 | h2_tx, |
172 | req_rx, |
173 | }) |
174 | } |
175 | |
176 | async fn conn_task<C, D>(conn: C, drop_rx: D, cancel_tx: oneshot::Sender<Never>) |
177 | where |
178 | C: Future + Unpin, |
179 | D: Future<Output = ()> + Unpin, |
180 | { |
181 | match future::select(future1:conn, future2:drop_rx).await { |
182 | Either::Left(_) => { |
183 | // ok or err, the `conn` has finished |
184 | } |
185 | Either::Right(((), conn: C)) => { |
186 | // mpsc has been dropped, hopefully polling |
187 | // the connection some more should start shutdown |
188 | // and then close |
189 | trace!("send_request dropped, starting conn shutdown" ); |
190 | drop(cancel_tx); |
191 | let _ = conn.await; |
192 | } |
193 | } |
194 | } |
195 | |
196 | pub(crate) struct ClientTask<B> |
197 | where |
198 | B: HttpBody, |
199 | { |
200 | ping: ping::Recorder, |
201 | conn_drop_ref: ConnDropRef, |
202 | conn_eof: ConnEof, |
203 | executor: Exec, |
204 | h2_tx: SendRequest<SendBuf<B::Data>>, |
205 | req_rx: ClientRx<B>, |
206 | } |
207 | |
208 | impl<B> ClientTask<B> |
209 | where |
210 | B: HttpBody + 'static, |
211 | { |
212 | pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool { |
213 | self.h2_tx.is_extended_connect_protocol_enabled() |
214 | } |
215 | } |
216 | |
217 | impl<B> Future for ClientTask<B> |
218 | where |
219 | B: HttpBody + Send + 'static, |
220 | B::Data: Send, |
221 | B::Error: Into<Box<dyn StdError + Send + Sync>>, |
222 | { |
223 | type Output = crate::Result<Dispatched>; |
224 | |
225 | fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { |
226 | loop { |
227 | match ready!(self.h2_tx.poll_ready(cx)) { |
228 | Ok(()) => (), |
229 | Err(err) => { |
230 | self.ping.ensure_not_timed_out()?; |
231 | return if err.reason() == Some(::h2::Reason::NO_ERROR) { |
232 | trace!("connection gracefully shutdown" ); |
233 | Poll::Ready(Ok(Dispatched::Shutdown)) |
234 | } else { |
235 | Poll::Ready(Err(crate::Error::new_h2(err))) |
236 | }; |
237 | } |
238 | }; |
239 | |
240 | match self.req_rx.poll_recv(cx) { |
241 | Poll::Ready(Some((req, cb))) => { |
242 | // check that future hasn't been canceled already |
243 | if cb.is_canceled() { |
244 | trace!("request callback is canceled" ); |
245 | continue; |
246 | } |
247 | let (head, body) = req.into_parts(); |
248 | let mut req = ::http::Request::from_parts(head, ()); |
249 | super::strip_connection_headers(req.headers_mut(), true); |
250 | if let Some(len) = body.size_hint().exact() { |
251 | if len != 0 || headers::method_has_defined_payload_semantics(req.method()) { |
252 | headers::set_content_length_if_missing(req.headers_mut(), len); |
253 | } |
254 | } |
255 | |
256 | let is_connect = req.method() == Method::CONNECT; |
257 | let eos = body.is_end_stream(); |
258 | let ping = self.ping.clone(); |
259 | |
260 | if is_connect { |
261 | if headers::content_length_parse_all(req.headers()) |
262 | .map_or(false, |len| len != 0) |
263 | { |
264 | warn!("h2 connect request with non-zero body not supported" ); |
265 | cb.send(Err(( |
266 | crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()), |
267 | None, |
268 | ))); |
269 | continue; |
270 | } |
271 | } |
272 | |
273 | if let Some(protocol) = req.extensions_mut().remove::<Protocol>() { |
274 | req.extensions_mut().insert(protocol.into_inner()); |
275 | } |
276 | |
277 | let (fut, body_tx) = match self.h2_tx.send_request(req, !is_connect && eos) { |
278 | Ok(ok) => ok, |
279 | Err(err) => { |
280 | debug!("client send request error: {}" , err); |
281 | cb.send(Err((crate::Error::new_h2(err), None))); |
282 | continue; |
283 | } |
284 | }; |
285 | |
286 | let send_stream = if !is_connect { |
287 | if !eos { |
288 | let mut pipe = |
289 | Box::pin(PipeToSendStream::new(body, body_tx)).map(|res| { |
290 | if let Err(e) = res { |
291 | debug!("client request body error: {}" , e); |
292 | } |
293 | }); |
294 | |
295 | // eagerly see if the body pipe is ready and |
296 | // can thus skip allocating in the executor |
297 | match Pin::new(&mut pipe).poll(cx) { |
298 | Poll::Ready(_) => (), |
299 | Poll::Pending => { |
300 | let conn_drop_ref = self.conn_drop_ref.clone(); |
301 | // keep the ping recorder's knowledge of an |
302 | // "open stream" alive while this body is |
303 | // still sending... |
304 | let ping = ping.clone(); |
305 | let pipe = pipe.map(move |x| { |
306 | drop(conn_drop_ref); |
307 | drop(ping); |
308 | x |
309 | }); |
310 | self.executor.execute(pipe); |
311 | } |
312 | } |
313 | } |
314 | |
315 | None |
316 | } else { |
317 | Some(body_tx) |
318 | }; |
319 | |
320 | let fut = fut.map(move |result| match result { |
321 | Ok(res) => { |
322 | // record that we got the response headers |
323 | ping.record_non_data(); |
324 | |
325 | let content_length = headers::content_length_parse_all(res.headers()); |
326 | if let (Some(mut send_stream), StatusCode::OK) = |
327 | (send_stream, res.status()) |
328 | { |
329 | if content_length.map_or(false, |len| len != 0) { |
330 | warn!("h2 connect response with non-zero body not supported" ); |
331 | |
332 | send_stream.send_reset(h2::Reason::INTERNAL_ERROR); |
333 | return Err(( |
334 | crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()), |
335 | None, |
336 | )); |
337 | } |
338 | let (parts, recv_stream) = res.into_parts(); |
339 | let mut res = Response::from_parts(parts, Body::empty()); |
340 | |
341 | let (pending, on_upgrade) = crate::upgrade::pending(); |
342 | let io = H2Upgraded { |
343 | ping, |
344 | send_stream: unsafe { UpgradedSendStream::new(send_stream) }, |
345 | recv_stream, |
346 | buf: Bytes::new(), |
347 | }; |
348 | let upgraded = Upgraded::new(io, Bytes::new()); |
349 | |
350 | pending.fulfill(upgraded); |
351 | res.extensions_mut().insert(on_upgrade); |
352 | |
353 | Ok(res) |
354 | } else { |
355 | let res = res.map(|stream| { |
356 | let ping = ping.for_stream(&stream); |
357 | crate::Body::h2(stream, content_length.into(), ping) |
358 | }); |
359 | Ok(res) |
360 | } |
361 | } |
362 | Err(err) => { |
363 | ping.ensure_not_timed_out().map_err(|e| (e, None))?; |
364 | |
365 | debug!("client response error: {}" , err); |
366 | Err((crate::Error::new_h2(err), None)) |
367 | } |
368 | }); |
369 | self.executor.execute(cb.send_when(fut)); |
370 | continue; |
371 | } |
372 | |
373 | Poll::Ready(None) => { |
374 | trace!("client::dispatch::Sender dropped" ); |
375 | return Poll::Ready(Ok(Dispatched::Shutdown)); |
376 | } |
377 | |
378 | Poll::Pending => match ready!(Pin::new(&mut self.conn_eof).poll(cx)) { |
379 | Ok(never) => match never {}, |
380 | Err(_conn_is_eof) => { |
381 | trace!("connection task is closed, closing dispatch task" ); |
382 | return Poll::Ready(Ok(Dispatched::Shutdown)); |
383 | } |
384 | }, |
385 | } |
386 | } |
387 | } |
388 | } |
389 | |