1 | use std::convert::Infallible; |
2 | use std::error::Error as StdError; |
3 | use std::future::Future; |
4 | use std::marker::Unpin; |
5 | use std::pin::Pin; |
6 | use std::task::{Context, Poll}; |
7 | #[cfg (feature = "runtime" )] |
8 | use std::time::Duration; |
9 | |
10 | use bytes::Bytes; |
11 | use futures_channel::{mpsc, oneshot}; |
12 | use futures_util::future::{self, Either, FutureExt as _, TryFutureExt as _}; |
13 | use futures_util::stream::StreamExt as _; |
14 | use h2::client::{Builder, SendRequest}; |
15 | use h2::SendStream; |
16 | use http::{Method, StatusCode}; |
17 | use tokio::io::{AsyncRead, AsyncWrite}; |
18 | use tracing::{debug, trace, warn}; |
19 | |
20 | use super::{ping, H2Upgraded, PipeToSendStream, SendBuf}; |
21 | use crate::body::HttpBody; |
22 | use crate::client::dispatch::Callback; |
23 | use crate::common::exec::Exec; |
24 | use crate::ext::Protocol; |
25 | use crate::headers; |
26 | use crate::proto::h2::UpgradedSendStream; |
27 | use crate::proto::Dispatched; |
28 | use crate::upgrade::Upgraded; |
29 | use crate::{Body, Request, Response}; |
30 | use h2::client::ResponseFuture; |
31 | |
32 | type ClientRx<B> = crate::client::dispatch::Receiver<Request<B>, Response<Body>>; |
33 | |
34 | ///// An mpsc channel is used to help notify the `Connection` task when *all* |
35 | ///// other handles to it have been dropped, so that it can shutdown. |
36 | type ConnDropRef = mpsc::Sender<Infallible>; |
37 | |
38 | ///// A oneshot channel watches the `Connection` task, and when it completes, |
39 | ///// the "dispatch" task will be notified and can shutdown sooner. |
40 | type ConnEof = oneshot::Receiver<Infallible>; |
41 | |
// These defaults target the "majority" case, which is usually not resource
// constrained; for such clients the spec default of 64 KiB can be too
// limiting for performance.

/// Default connection-level window size: 5 MiB.
const DEFAULT_CONN_WINDOW: u32 = 5 * 1024 * 1024;
/// Default per-stream window size: 2 MiB.
const DEFAULT_STREAM_WINDOW: u32 = 2 * 1024 * 1024;
/// Default maximum frame size: 16 KiB.
const DEFAULT_MAX_FRAME_SIZE: u32 = 16 * 1024;
/// Default maximum send buffer size: 1 MiB.
const DEFAULT_MAX_SEND_BUF_SIZE: usize = 1024 * 1024;
49 | |
50 | #[derive (Clone, Debug)] |
51 | pub(crate) struct Config { |
52 | pub(crate) adaptive_window: bool, |
53 | pub(crate) initial_conn_window_size: u32, |
54 | pub(crate) initial_stream_window_size: u32, |
55 | pub(crate) max_frame_size: u32, |
56 | #[cfg (feature = "runtime" )] |
57 | pub(crate) keep_alive_interval: Option<Duration>, |
58 | #[cfg (feature = "runtime" )] |
59 | pub(crate) keep_alive_timeout: Duration, |
60 | #[cfg (feature = "runtime" )] |
61 | pub(crate) keep_alive_while_idle: bool, |
62 | pub(crate) max_concurrent_reset_streams: Option<usize>, |
63 | pub(crate) max_send_buffer_size: usize, |
64 | } |
65 | |
66 | impl Default for Config { |
67 | fn default() -> Config { |
68 | Config { |
69 | adaptive_window: false, |
70 | initial_conn_window_size: DEFAULT_CONN_WINDOW, |
71 | initial_stream_window_size: DEFAULT_STREAM_WINDOW, |
72 | max_frame_size: DEFAULT_MAX_FRAME_SIZE, |
73 | #[cfg (feature = "runtime" )] |
74 | keep_alive_interval: None, |
75 | #[cfg (feature = "runtime" )] |
76 | keep_alive_timeout: Duration::from_secs(20), |
77 | #[cfg (feature = "runtime" )] |
78 | keep_alive_while_idle: false, |
79 | max_concurrent_reset_streams: None, |
80 | max_send_buffer_size: DEFAULT_MAX_SEND_BUF_SIZE, |
81 | } |
82 | } |
83 | } |
84 | |
85 | fn new_builder(config: &Config) -> Builder { |
86 | let mut builder: Builder = Builder::default(); |
87 | builder |
88 | .initial_window_size(config.initial_stream_window_size) |
89 | .initial_connection_window_size(config.initial_conn_window_size) |
90 | .max_frame_size(config.max_frame_size) |
91 | .max_send_buffer_size(config.max_send_buffer_size) |
92 | .enable_push(enabled:false); |
93 | if let Some(max: usize) = config.max_concurrent_reset_streams { |
94 | builder.max_concurrent_reset_streams(max); |
95 | } |
96 | builder |
97 | } |
98 | |
99 | fn new_ping_config(config: &Config) -> ping::Config { |
100 | ping::Config { |
101 | bdp_initial_window: if config.adaptive_window { |
102 | Some(config.initial_stream_window_size) |
103 | } else { |
104 | None |
105 | }, |
106 | #[cfg (feature = "runtime" )] |
107 | keep_alive_interval: config.keep_alive_interval, |
108 | #[cfg (feature = "runtime" )] |
109 | keep_alive_timeout: config.keep_alive_timeout, |
110 | #[cfg (feature = "runtime" )] |
111 | keep_alive_while_idle: config.keep_alive_while_idle, |
112 | } |
113 | } |
114 | |
115 | pub(crate) async fn handshake<T, B>( |
116 | io: T, |
117 | req_rx: ClientRx<B>, |
118 | config: &Config, |
119 | exec: Exec, |
120 | ) -> crate::Result<ClientTask<B>> |
121 | where |
122 | T: AsyncRead + AsyncWrite + Send + Unpin + 'static, |
123 | B: HttpBody, |
124 | B::Data: Send + 'static, |
125 | { |
126 | let (h2_tx, mut conn) = new_builder(config) |
127 | .handshake::<_, SendBuf<B::Data>>(io) |
128 | .await |
129 | .map_err(crate::Error::new_h2)?; |
130 | |
131 | // An mpsc channel is used entirely to detect when the |
132 | // 'Client' has been dropped. This is to get around a bug |
133 | // in h2 where dropping all SendRequests won't notify a |
134 | // parked Connection. |
135 | let (conn_drop_ref, rx) = mpsc::channel(1); |
136 | let (cancel_tx, conn_eof) = oneshot::channel(); |
137 | |
138 | let conn_drop_rx = rx.into_future().map(|(item, _rx)| { |
139 | if let Some(never) = item { |
140 | match never {} |
141 | } |
142 | }); |
143 | |
144 | let ping_config = new_ping_config(&config); |
145 | |
146 | let (conn, ping) = if ping_config.is_enabled() { |
147 | let pp = conn.ping_pong().expect("conn.ping_pong" ); |
148 | let (recorder, mut ponger) = ping::channel(pp, ping_config); |
149 | |
150 | let conn = future::poll_fn(move |cx| { |
151 | match ponger.poll(cx) { |
152 | Poll::Ready(ping::Ponged::SizeUpdate(wnd)) => { |
153 | conn.set_target_window_size(wnd); |
154 | conn.set_initial_window_size(wnd)?; |
155 | } |
156 | #[cfg (feature = "runtime" )] |
157 | Poll::Ready(ping::Ponged::KeepAliveTimedOut) => { |
158 | debug!("connection keep-alive timed out" ); |
159 | return Poll::Ready(Ok(())); |
160 | } |
161 | Poll::Pending => {} |
162 | } |
163 | |
164 | Pin::new(&mut conn).poll(cx) |
165 | }); |
166 | (Either::Left(conn), recorder) |
167 | } else { |
168 | (Either::Right(conn), ping::disabled()) |
169 | }; |
170 | let conn = conn.map_err(|e| debug!("connection error: {}" , e)); |
171 | |
172 | exec.execute(conn_task(conn, conn_drop_rx, cancel_tx)); |
173 | |
174 | Ok(ClientTask { |
175 | ping, |
176 | conn_drop_ref, |
177 | conn_eof, |
178 | executor: exec, |
179 | h2_tx, |
180 | req_rx, |
181 | fut_ctx: None, |
182 | }) |
183 | } |
184 | |
185 | async fn conn_task<C, D>(conn: C, drop_rx: D, cancel_tx: oneshot::Sender<Infallible>) |
186 | where |
187 | C: Future + Unpin, |
188 | D: Future<Output = ()> + Unpin, |
189 | { |
190 | match future::select(future1:conn, future2:drop_rx).await { |
191 | Either::Left(_) => { |
192 | // ok or err, the `conn` has finished |
193 | } |
194 | Either::Right(((), conn: C)) => { |
195 | // mpsc has been dropped, hopefully polling |
196 | // the connection some more should start shutdown |
197 | // and then close |
198 | trace!("send_request dropped, starting conn shutdown" ); |
199 | drop(cancel_tx); |
200 | let _ = conn.await; |
201 | } |
202 | } |
203 | } |
204 | |
205 | struct FutCtx<B> |
206 | where |
207 | B: HttpBody, |
208 | { |
209 | is_connect: bool, |
210 | eos: bool, |
211 | fut: ResponseFuture, |
212 | body_tx: SendStream<SendBuf<B::Data>>, |
213 | body: B, |
214 | cb: Callback<Request<B>, Response<Body>>, |
215 | } |
216 | |
217 | impl<B: HttpBody> Unpin for FutCtx<B> {} |
218 | |
219 | pub(crate) struct ClientTask<B> |
220 | where |
221 | B: HttpBody, |
222 | { |
223 | ping: ping::Recorder, |
224 | conn_drop_ref: ConnDropRef, |
225 | conn_eof: ConnEof, |
226 | executor: Exec, |
227 | h2_tx: SendRequest<SendBuf<B::Data>>, |
228 | req_rx: ClientRx<B>, |
229 | fut_ctx: Option<FutCtx<B>>, |
230 | } |
231 | |
232 | impl<B> ClientTask<B> |
233 | where |
234 | B: HttpBody + 'static, |
235 | { |
236 | pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool { |
237 | self.h2_tx.is_extended_connect_protocol_enabled() |
238 | } |
239 | } |
240 | |
241 | impl<B> ClientTask<B> |
242 | where |
243 | B: HttpBody + Send + 'static, |
244 | B::Data: Send, |
245 | B::Error: Into<Box<dyn StdError + Send + Sync>>, |
246 | { |
247 | fn poll_pipe(&mut self, f: FutCtx<B>, cx: &mut Context<'_>) { |
248 | let ping = self.ping.clone(); |
249 | let send_stream = if !f.is_connect { |
250 | if !f.eos { |
251 | let mut pipe = Box::pin(PipeToSendStream::new(f.body, f.body_tx)).map(|res| { |
252 | if let Err(e) = res { |
253 | debug!("client request body error: {}" , e); |
254 | } |
255 | }); |
256 | |
257 | // eagerly see if the body pipe is ready and |
258 | // can thus skip allocating in the executor |
259 | match Pin::new(&mut pipe).poll(cx) { |
260 | Poll::Ready(_) => (), |
261 | Poll::Pending => { |
262 | let conn_drop_ref = self.conn_drop_ref.clone(); |
263 | // keep the ping recorder's knowledge of an |
264 | // "open stream" alive while this body is |
265 | // still sending... |
266 | let ping = ping.clone(); |
267 | let pipe = pipe.map(move |x| { |
268 | drop(conn_drop_ref); |
269 | drop(ping); |
270 | x |
271 | }); |
272 | // Clear send task |
273 | self.executor.execute(pipe); |
274 | } |
275 | } |
276 | } |
277 | |
278 | None |
279 | } else { |
280 | Some(f.body_tx) |
281 | }; |
282 | |
283 | let fut = f.fut.map(move |result| match result { |
284 | Ok(res) => { |
285 | // record that we got the response headers |
286 | ping.record_non_data(); |
287 | |
288 | let content_length = headers::content_length_parse_all(res.headers()); |
289 | if let (Some(mut send_stream), StatusCode::OK) = (send_stream, res.status()) { |
290 | if content_length.map_or(false, |len| len != 0) { |
291 | warn!("h2 connect response with non-zero body not supported" ); |
292 | |
293 | send_stream.send_reset(h2::Reason::INTERNAL_ERROR); |
294 | return Err(( |
295 | crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()), |
296 | None, |
297 | )); |
298 | } |
299 | let (parts, recv_stream) = res.into_parts(); |
300 | let mut res = Response::from_parts(parts, Body::empty()); |
301 | |
302 | let (pending, on_upgrade) = crate::upgrade::pending(); |
303 | let io = H2Upgraded { |
304 | ping, |
305 | send_stream: unsafe { UpgradedSendStream::new(send_stream) }, |
306 | recv_stream, |
307 | buf: Bytes::new(), |
308 | }; |
309 | let upgraded = Upgraded::new(io, Bytes::new()); |
310 | |
311 | pending.fulfill(upgraded); |
312 | res.extensions_mut().insert(on_upgrade); |
313 | |
314 | Ok(res) |
315 | } else { |
316 | let res = res.map(|stream| { |
317 | let ping = ping.for_stream(&stream); |
318 | crate::Body::h2(stream, content_length.into(), ping) |
319 | }); |
320 | Ok(res) |
321 | } |
322 | } |
323 | Err(err) => { |
324 | ping.ensure_not_timed_out().map_err(|e| (e, None))?; |
325 | |
326 | debug!("client response error: {}" , err); |
327 | Err((crate::Error::new_h2(err), None)) |
328 | } |
329 | }); |
330 | self.executor.execute(f.cb.send_when(fut)); |
331 | } |
332 | } |
333 | |
334 | impl<B> Future for ClientTask<B> |
335 | where |
336 | B: HttpBody + Send + 'static, |
337 | B::Data: Send, |
338 | B::Error: Into<Box<dyn StdError + Send + Sync>>, |
339 | { |
340 | type Output = crate::Result<Dispatched>; |
341 | |
342 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
343 | loop { |
344 | match ready!(self.h2_tx.poll_ready(cx)) { |
345 | Ok(()) => (), |
346 | Err(err) => { |
347 | self.ping.ensure_not_timed_out()?; |
348 | return if err.reason() == Some(::h2::Reason::NO_ERROR) { |
349 | trace!("connection gracefully shutdown" ); |
350 | Poll::Ready(Ok(Dispatched::Shutdown)) |
351 | } else { |
352 | Poll::Ready(Err(crate::Error::new_h2(err))) |
353 | }; |
354 | } |
355 | }; |
356 | |
357 | match self.fut_ctx.take() { |
358 | // If we were waiting on pending open |
359 | // continue where we left off. |
360 | Some(f) => { |
361 | self.poll_pipe(f, cx); |
362 | continue; |
363 | } |
364 | None => (), |
365 | } |
366 | |
367 | match self.req_rx.poll_recv(cx) { |
368 | Poll::Ready(Some((req, cb))) => { |
369 | // check that future hasn't been canceled already |
370 | if cb.is_canceled() { |
371 | trace!("request callback is canceled" ); |
372 | continue; |
373 | } |
374 | let (head, body) = req.into_parts(); |
375 | let mut req = ::http::Request::from_parts(head, ()); |
376 | super::strip_connection_headers(req.headers_mut(), true); |
377 | if let Some(len) = body.size_hint().exact() { |
378 | if len != 0 || headers::method_has_defined_payload_semantics(req.method()) { |
379 | headers::set_content_length_if_missing(req.headers_mut(), len); |
380 | } |
381 | } |
382 | |
383 | let is_connect = req.method() == Method::CONNECT; |
384 | let eos = body.is_end_stream(); |
385 | |
386 | if is_connect { |
387 | if headers::content_length_parse_all(req.headers()) |
388 | .map_or(false, |len| len != 0) |
389 | { |
390 | warn!("h2 connect request with non-zero body not supported" ); |
391 | cb.send(Err(( |
392 | crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()), |
393 | None, |
394 | ))); |
395 | continue; |
396 | } |
397 | } |
398 | |
399 | if let Some(protocol) = req.extensions_mut().remove::<Protocol>() { |
400 | req.extensions_mut().insert(protocol.into_inner()); |
401 | } |
402 | |
403 | let (fut, body_tx) = match self.h2_tx.send_request(req, !is_connect && eos) { |
404 | Ok(ok) => ok, |
405 | Err(err) => { |
406 | debug!("client send request error: {}" , err); |
407 | cb.send(Err((crate::Error::new_h2(err), None))); |
408 | continue; |
409 | } |
410 | }; |
411 | |
412 | let f = FutCtx { |
413 | is_connect, |
414 | eos, |
415 | fut, |
416 | body_tx, |
417 | body, |
418 | cb, |
419 | }; |
420 | |
421 | // Check poll_ready() again. |
422 | // If the call to send_request() resulted in the new stream being pending open |
423 | // we have to wait for the open to complete before accepting new requests. |
424 | match self.h2_tx.poll_ready(cx) { |
425 | Poll::Pending => { |
426 | // Save Context |
427 | self.fut_ctx = Some(f); |
428 | return Poll::Pending; |
429 | } |
430 | Poll::Ready(Ok(())) => (), |
431 | Poll::Ready(Err(err)) => { |
432 | f.cb.send(Err((crate::Error::new_h2(err), None))); |
433 | continue; |
434 | } |
435 | } |
436 | self.poll_pipe(f, cx); |
437 | continue; |
438 | } |
439 | |
440 | Poll::Ready(None) => { |
441 | trace!("client::dispatch::Sender dropped" ); |
442 | return Poll::Ready(Ok(Dispatched::Shutdown)); |
443 | } |
444 | |
445 | Poll::Pending => match ready!(Pin::new(&mut self.conn_eof).poll(cx)) { |
446 | Ok(never) => match never {}, |
447 | Err(_conn_is_eof) => { |
448 | trace!("connection task is closed, closing dispatch task" ); |
449 | return Poll::Ready(Ok(Dispatched::Shutdown)); |
450 | } |
451 | }, |
452 | } |
453 | } |
454 | } |
455 | } |
456 | |