use std::error::Error as StdError;
#[cfg(feature = "runtime")]
use std::time::Duration;

use bytes::Bytes;
use futures_channel::{mpsc, oneshot};
use futures_util::future::{self, Either, FutureExt as _, TryFutureExt as _};
use futures_util::stream::StreamExt as _;
use h2::client::{Builder, SendRequest};
use http::{Method, StatusCode};
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{debug, trace, warn};

use super::{ping, H2Upgraded, PipeToSendStream, SendBuf};
use crate::body::HttpBody;
use crate::common::{exec::Exec, task, Future, Never, Pin, Poll};
use crate::ext::Protocol;
use crate::headers;
use crate::proto::h2::UpgradedSendStream;
use crate::proto::Dispatched;
use crate::upgrade::Upgraded;
use crate::{Body, Request, Response};
23
24type ClientRx<B> = crate::client::dispatch::Receiver<Request<B>, Response<Body>>;
25
26///// An mpsc channel is used to help notify the `Connection` task when *all*
27///// other handles to it have been dropped, so that it can shutdown.
28type ConnDropRef = mpsc::Sender<Never>;
29
30///// A oneshot channel watches the `Connection` task, and when it completes,
31///// the "dispatch" task will be notified and can shutdown sooner.
32type ConnEof = oneshot::Receiver<Never>;
33
// Our defaults are chosen for the "majority" case, which usually are not
// resource constrained, and so the spec default of 64kb can be too limiting
// for performance.
const DEFAULT_CONN_WINDOW: u32 = 1024 * 1024 * 5; // 5mb
const DEFAULT_STREAM_WINDOW: u32 = 1024 * 1024 * 2; // 2mb
const DEFAULT_MAX_FRAME_SIZE: u32 = 1024 * 16; // 16kb
const DEFAULT_MAX_SEND_BUF_SIZE: usize = 1024 * 1024; // 1mb
41
/// Tunable HTTP/2 client connection settings, populated by the client
/// builder and consumed by `new_builder` / `new_ping_config`.
#[derive(Clone, Debug)]
pub(crate) struct Config {
    // When true, BDP-based adaptive flow-control windows are enabled
    // (see `new_ping_config`), seeded from `initial_stream_window_size`.
    pub(crate) adaptive_window: bool,
    pub(crate) initial_conn_window_size: u32,
    pub(crate) initial_stream_window_size: u32,
    pub(crate) max_frame_size: u32,
    #[cfg(feature = "runtime")]
    pub(crate) keep_alive_interval: Option<Duration>,
    #[cfg(feature = "runtime")]
    pub(crate) keep_alive_timeout: Duration,
    #[cfg(feature = "runtime")]
    pub(crate) keep_alive_while_idle: bool,
    pub(crate) max_concurrent_reset_streams: Option<usize>,
    pub(crate) max_send_buffer_size: usize,
}
57
58impl Default for Config {
59 fn default() -> Config {
60 Config {
61 adaptive_window: false,
62 initial_conn_window_size: DEFAULT_CONN_WINDOW,
63 initial_stream_window_size: DEFAULT_STREAM_WINDOW,
64 max_frame_size: DEFAULT_MAX_FRAME_SIZE,
65 #[cfg(feature = "runtime")]
66 keep_alive_interval: None,
67 #[cfg(feature = "runtime")]
68 keep_alive_timeout: Duration::from_secs(20),
69 #[cfg(feature = "runtime")]
70 keep_alive_while_idle: false,
71 max_concurrent_reset_streams: None,
72 max_send_buffer_size: DEFAULT_MAX_SEND_BUF_SIZE,
73 }
74 }
75}
76
77fn new_builder(config: &Config) -> Builder {
78 let mut builder: Builder = Builder::default();
79 builder
80 .initial_window_size(config.initial_stream_window_size)
81 .initial_connection_window_size(config.initial_conn_window_size)
82 .max_frame_size(config.max_frame_size)
83 .max_send_buffer_size(config.max_send_buffer_size)
84 .enable_push(enabled:false);
85 if let Some(max: usize) = config.max_concurrent_reset_streams {
86 builder.max_concurrent_reset_streams(max);
87 }
88 builder
89}
90
91fn new_ping_config(config: &Config) -> ping::Config {
92 ping::Config {
93 bdp_initial_window: if config.adaptive_window {
94 Some(config.initial_stream_window_size)
95 } else {
96 None
97 },
98 #[cfg(feature = "runtime")]
99 keep_alive_interval: config.keep_alive_interval,
100 #[cfg(feature = "runtime")]
101 keep_alive_timeout: config.keep_alive_timeout,
102 #[cfg(feature = "runtime")]
103 keep_alive_while_idle: config.keep_alive_while_idle,
104 }
105}
106
107pub(crate) async fn handshake<T, B>(
108 io: T,
109 req_rx: ClientRx<B>,
110 config: &Config,
111 exec: Exec,
112) -> crate::Result<ClientTask<B>>
113where
114 T: AsyncRead + AsyncWrite + Send + Unpin + 'static,
115 B: HttpBody,
116 B::Data: Send + 'static,
117{
118 let (h2_tx, mut conn) = new_builder(config)
119 .handshake::<_, SendBuf<B::Data>>(io)
120 .await
121 .map_err(crate::Error::new_h2)?;
122
123 // An mpsc channel is used entirely to detect when the
124 // 'Client' has been dropped. This is to get around a bug
125 // in h2 where dropping all SendRequests won't notify a
126 // parked Connection.
127 let (conn_drop_ref, rx) = mpsc::channel(1);
128 let (cancel_tx, conn_eof) = oneshot::channel();
129
130 let conn_drop_rx = rx.into_future().map(|(item, _rx)| {
131 if let Some(never) = item {
132 match never {}
133 }
134 });
135
136 let ping_config = new_ping_config(&config);
137
138 let (conn, ping) = if ping_config.is_enabled() {
139 let pp = conn.ping_pong().expect("conn.ping_pong");
140 let (recorder, mut ponger) = ping::channel(pp, ping_config);
141
142 let conn = future::poll_fn(move |cx| {
143 match ponger.poll(cx) {
144 Poll::Ready(ping::Ponged::SizeUpdate(wnd)) => {
145 conn.set_target_window_size(wnd);
146 conn.set_initial_window_size(wnd)?;
147 }
148 #[cfg(feature = "runtime")]
149 Poll::Ready(ping::Ponged::KeepAliveTimedOut) => {
150 debug!("connection keep-alive timed out");
151 return Poll::Ready(Ok(()));
152 }
153 Poll::Pending => {}
154 }
155
156 Pin::new(&mut conn).poll(cx)
157 });
158 (Either::Left(conn), recorder)
159 } else {
160 (Either::Right(conn), ping::disabled())
161 };
162 let conn = conn.map_err(|e| debug!("connection error: {}", e));
163
164 exec.execute(conn_task(conn, conn_drop_rx, cancel_tx));
165
166 Ok(ClientTask {
167 ping,
168 conn_drop_ref,
169 conn_eof,
170 executor: exec,
171 h2_tx,
172 req_rx,
173 })
174}
175
176async fn conn_task<C, D>(conn: C, drop_rx: D, cancel_tx: oneshot::Sender<Never>)
177where
178 C: Future + Unpin,
179 D: Future<Output = ()> + Unpin,
180{
181 match future::select(future1:conn, future2:drop_rx).await {
182 Either::Left(_) => {
183 // ok or err, the `conn` has finished
184 }
185 Either::Right(((), conn: C)) => {
186 // mpsc has been dropped, hopefully polling
187 // the connection some more should start shutdown
188 // and then close
189 trace!("send_request dropped, starting conn shutdown");
190 drop(cancel_tx);
191 let _ = conn.await;
192 }
193 }
194}
195
196pub(crate) struct ClientTask<B>
197where
198 B: HttpBody,
199{
200 ping: ping::Recorder,
201 conn_drop_ref: ConnDropRef,
202 conn_eof: ConnEof,
203 executor: Exec,
204 h2_tx: SendRequest<SendBuf<B::Data>>,
205 req_rx: ClientRx<B>,
206}
207
208impl<B> ClientTask<B>
209where
210 B: HttpBody + 'static,
211{
212 pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool {
213 self.h2_tx.is_extended_connect_protocol_enabled()
214 }
215}
216
217impl<B> Future for ClientTask<B>
218where
219 B: HttpBody + Send + 'static,
220 B::Data: Send,
221 B::Error: Into<Box<dyn StdError + Send + Sync>>,
222{
223 type Output = crate::Result<Dispatched>;
224
225 fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
226 loop {
227 match ready!(self.h2_tx.poll_ready(cx)) {
228 Ok(()) => (),
229 Err(err) => {
230 self.ping.ensure_not_timed_out()?;
231 return if err.reason() == Some(::h2::Reason::NO_ERROR) {
232 trace!("connection gracefully shutdown");
233 Poll::Ready(Ok(Dispatched::Shutdown))
234 } else {
235 Poll::Ready(Err(crate::Error::new_h2(err)))
236 };
237 }
238 };
239
240 match self.req_rx.poll_recv(cx) {
241 Poll::Ready(Some((req, cb))) => {
242 // check that future hasn't been canceled already
243 if cb.is_canceled() {
244 trace!("request callback is canceled");
245 continue;
246 }
247 let (head, body) = req.into_parts();
248 let mut req = ::http::Request::from_parts(head, ());
249 super::strip_connection_headers(req.headers_mut(), true);
250 if let Some(len) = body.size_hint().exact() {
251 if len != 0 || headers::method_has_defined_payload_semantics(req.method()) {
252 headers::set_content_length_if_missing(req.headers_mut(), len);
253 }
254 }
255
256 let is_connect = req.method() == Method::CONNECT;
257 let eos = body.is_end_stream();
258 let ping = self.ping.clone();
259
260 if is_connect {
261 if headers::content_length_parse_all(req.headers())
262 .map_or(false, |len| len != 0)
263 {
264 warn!("h2 connect request with non-zero body not supported");
265 cb.send(Err((
266 crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()),
267 None,
268 )));
269 continue;
270 }
271 }
272
273 if let Some(protocol) = req.extensions_mut().remove::<Protocol>() {
274 req.extensions_mut().insert(protocol.into_inner());
275 }
276
277 let (fut, body_tx) = match self.h2_tx.send_request(req, !is_connect && eos) {
278 Ok(ok) => ok,
279 Err(err) => {
280 debug!("client send request error: {}", err);
281 cb.send(Err((crate::Error::new_h2(err), None)));
282 continue;
283 }
284 };
285
286 let send_stream = if !is_connect {
287 if !eos {
288 let mut pipe =
289 Box::pin(PipeToSendStream::new(body, body_tx)).map(|res| {
290 if let Err(e) = res {
291 debug!("client request body error: {}", e);
292 }
293 });
294
295 // eagerly see if the body pipe is ready and
296 // can thus skip allocating in the executor
297 match Pin::new(&mut pipe).poll(cx) {
298 Poll::Ready(_) => (),
299 Poll::Pending => {
300 let conn_drop_ref = self.conn_drop_ref.clone();
301 // keep the ping recorder's knowledge of an
302 // "open stream" alive while this body is
303 // still sending...
304 let ping = ping.clone();
305 let pipe = pipe.map(move |x| {
306 drop(conn_drop_ref);
307 drop(ping);
308 x
309 });
310 self.executor.execute(pipe);
311 }
312 }
313 }
314
315 None
316 } else {
317 Some(body_tx)
318 };
319
320 let fut = fut.map(move |result| match result {
321 Ok(res) => {
322 // record that we got the response headers
323 ping.record_non_data();
324
325 let content_length = headers::content_length_parse_all(res.headers());
326 if let (Some(mut send_stream), StatusCode::OK) =
327 (send_stream, res.status())
328 {
329 if content_length.map_or(false, |len| len != 0) {
330 warn!("h2 connect response with non-zero body not supported");
331
332 send_stream.send_reset(h2::Reason::INTERNAL_ERROR);
333 return Err((
334 crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()),
335 None,
336 ));
337 }
338 let (parts, recv_stream) = res.into_parts();
339 let mut res = Response::from_parts(parts, Body::empty());
340
341 let (pending, on_upgrade) = crate::upgrade::pending();
342 let io = H2Upgraded {
343 ping,
344 send_stream: unsafe { UpgradedSendStream::new(send_stream) },
345 recv_stream,
346 buf: Bytes::new(),
347 };
348 let upgraded = Upgraded::new(io, Bytes::new());
349
350 pending.fulfill(upgraded);
351 res.extensions_mut().insert(on_upgrade);
352
353 Ok(res)
354 } else {
355 let res = res.map(|stream| {
356 let ping = ping.for_stream(&stream);
357 crate::Body::h2(stream, content_length.into(), ping)
358 });
359 Ok(res)
360 }
361 }
362 Err(err) => {
363 ping.ensure_not_timed_out().map_err(|e| (e, None))?;
364
365 debug!("client response error: {}", err);
366 Err((crate::Error::new_h2(err), None))
367 }
368 });
369 self.executor.execute(cb.send_when(fut));
370 continue;
371 }
372
373 Poll::Ready(None) => {
374 trace!("client::dispatch::Sender dropped");
375 return Poll::Ready(Ok(Dispatched::Shutdown));
376 }
377
378 Poll::Pending => match ready!(Pin::new(&mut self.conn_eof).poll(cx)) {
379 Ok(never) => match never {},
380 Err(_conn_is_eof) => {
381 trace!("connection task is closed, closing dispatch task");
382 return Poll::Ready(Ok(Dispatched::Shutdown));
383 }
384 },
385 }
386 }
387 }
388}
389