| 1 | use std::{ |
| 2 | convert::Infallible, |
| 3 | future::Future, |
| 4 | marker::PhantomData, |
| 5 | pin::Pin, |
| 6 | task::{Context, Poll}, |
| 7 | time::Duration, |
| 8 | }; |
| 9 | |
| 10 | use crate::rt::{Read, Write}; |
| 11 | use bytes::Bytes; |
| 12 | use futures_channel::mpsc::{Receiver, Sender}; |
| 13 | use futures_channel::{mpsc, oneshot}; |
| 14 | use futures_util::future::{Either, FusedFuture, FutureExt as _}; |
| 15 | use futures_util::ready; |
| 16 | use futures_util::stream::{StreamExt as _, StreamFuture}; |
| 17 | use h2::client::{Builder, Connection, SendRequest}; |
| 18 | use h2::SendStream; |
| 19 | use http::{Method, StatusCode}; |
| 20 | use pin_project_lite::pin_project; |
| 21 | |
| 22 | use super::ping::{Ponger, Recorder}; |
| 23 | use super::{ping, H2Upgraded, PipeToSendStream, SendBuf}; |
| 24 | use crate::body::{Body, Incoming as IncomingBody}; |
| 25 | use crate::client::dispatch::{Callback, SendWhen, TrySendError}; |
| 26 | use crate::common::io::Compat; |
| 27 | use crate::common::time::Time; |
| 28 | use crate::ext::Protocol; |
| 29 | use crate::headers; |
| 30 | use crate::proto::h2::UpgradedSendStream; |
| 31 | use crate::proto::Dispatched; |
| 32 | use crate::rt::bounds::Http2ClientConnExec; |
| 33 | use crate::upgrade::Upgraded; |
| 34 | use crate::{Request, Response}; |
| 35 | use h2::client::ResponseFuture; |
| 36 | |
| 37 | type ClientRx<B> = crate::client::dispatch::Receiver<Request<B>, Response<IncomingBody>>; |
| 38 | |
| 39 | ///// An mpsc channel is used to help notify the `Connection` task when *all* |
| 40 | ///// other handles to it have been dropped, so that it can shutdown. |
| 41 | type ConnDropRef = mpsc::Sender<Infallible>; |
| 42 | |
| 43 | ///// A oneshot channel watches the `Connection` task, and when it completes, |
| 44 | ///// the "dispatch" task will be notified and can shutdown sooner. |
| 45 | type ConnEof = oneshot::Receiver<Infallible>; |
| 46 | |
// The defaults below target the "majority" case, which usually is not
// resource constrained; for such users the spec default of 64kb can be too
// limiting for performance.
const DEFAULT_CONN_WINDOW: u32 = 5 * 1024 * 1024; // 5mb
const DEFAULT_STREAM_WINDOW: u32 = 2 * 1024 * 1024; // 2mb
const DEFAULT_MAX_FRAME_SIZE: u32 = 16 * 1024; // 16kb
const DEFAULT_MAX_SEND_BUF_SIZE: usize = 1 << 20; // 1mb
const DEFAULT_MAX_HEADER_LIST_SIZE: u32 = 16 * 1024; // 16kb

// Upper bound on the number of concurrent streams the client may open
// before it has received the server's initial SETTINGS frame.
//
// The value follows what the HTTP/2 spec recommends as the minimum that
// endpoints advertise to their peers, so using it minimizes the chance that
// the local endpoint opens too many streams and has them rejected by the
// remote peer with a `REFUSED_STREAM` error.
const DEFAULT_INITIAL_MAX_SEND_STREAMS: usize = 100;
| 64 | |
| 65 | #[derive (Clone, Debug)] |
| 66 | pub(crate) struct Config { |
| 67 | pub(crate) adaptive_window: bool, |
| 68 | pub(crate) initial_conn_window_size: u32, |
| 69 | pub(crate) initial_stream_window_size: u32, |
| 70 | pub(crate) initial_max_send_streams: usize, |
| 71 | pub(crate) max_frame_size: Option<u32>, |
| 72 | pub(crate) max_header_list_size: u32, |
| 73 | pub(crate) keep_alive_interval: Option<Duration>, |
| 74 | pub(crate) keep_alive_timeout: Duration, |
| 75 | pub(crate) keep_alive_while_idle: bool, |
| 76 | pub(crate) max_concurrent_reset_streams: Option<usize>, |
| 77 | pub(crate) max_send_buffer_size: usize, |
| 78 | pub(crate) max_pending_accept_reset_streams: Option<usize>, |
| 79 | pub(crate) header_table_size: Option<u32>, |
| 80 | pub(crate) max_concurrent_streams: Option<u32>, |
| 81 | } |
| 82 | |
| 83 | impl Default for Config { |
| 84 | fn default() -> Config { |
| 85 | Config { |
| 86 | adaptive_window: false, |
| 87 | initial_conn_window_size: DEFAULT_CONN_WINDOW, |
| 88 | initial_stream_window_size: DEFAULT_STREAM_WINDOW, |
| 89 | initial_max_send_streams: DEFAULT_INITIAL_MAX_SEND_STREAMS, |
| 90 | max_frame_size: Some(DEFAULT_MAX_FRAME_SIZE), |
| 91 | max_header_list_size: DEFAULT_MAX_HEADER_LIST_SIZE, |
| 92 | keep_alive_interval: None, |
| 93 | keep_alive_timeout: Duration::from_secs(20), |
| 94 | keep_alive_while_idle: false, |
| 95 | max_concurrent_reset_streams: None, |
| 96 | max_send_buffer_size: DEFAULT_MAX_SEND_BUF_SIZE, |
| 97 | max_pending_accept_reset_streams: None, |
| 98 | header_table_size: None, |
| 99 | max_concurrent_streams: None, |
| 100 | } |
| 101 | } |
| 102 | } |
| 103 | |
| 104 | fn new_builder(config: &Config) -> Builder { |
| 105 | let mut builder = Builder::default(); |
| 106 | builder |
| 107 | .initial_max_send_streams(config.initial_max_send_streams) |
| 108 | .initial_window_size(config.initial_stream_window_size) |
| 109 | .initial_connection_window_size(config.initial_conn_window_size) |
| 110 | .max_header_list_size(config.max_header_list_size) |
| 111 | .max_send_buffer_size(config.max_send_buffer_size) |
| 112 | .enable_push(false); |
| 113 | if let Some(max) = config.max_frame_size { |
| 114 | builder.max_frame_size(max); |
| 115 | } |
| 116 | if let Some(max) = config.max_concurrent_reset_streams { |
| 117 | builder.max_concurrent_reset_streams(max); |
| 118 | } |
| 119 | if let Some(max) = config.max_pending_accept_reset_streams { |
| 120 | builder.max_pending_accept_reset_streams(max); |
| 121 | } |
| 122 | if let Some(size) = config.header_table_size { |
| 123 | builder.header_table_size(size); |
| 124 | } |
| 125 | if let Some(max) = config.max_concurrent_streams { |
| 126 | builder.max_concurrent_streams(max); |
| 127 | } |
| 128 | builder |
| 129 | } |
| 130 | |
| 131 | fn new_ping_config(config: &Config) -> ping::Config { |
| 132 | ping::Config { |
| 133 | bdp_initial_window: if config.adaptive_window { |
| 134 | Some(config.initial_stream_window_size) |
| 135 | } else { |
| 136 | None |
| 137 | }, |
| 138 | keep_alive_interval: config.keep_alive_interval, |
| 139 | keep_alive_timeout: config.keep_alive_timeout, |
| 140 | keep_alive_while_idle: config.keep_alive_while_idle, |
| 141 | } |
| 142 | } |
| 143 | |
| 144 | pub(crate) async fn handshake<T, B, E>( |
| 145 | io: T, |
| 146 | req_rx: ClientRx<B>, |
| 147 | config: &Config, |
| 148 | mut exec: E, |
| 149 | timer: Time, |
| 150 | ) -> crate::Result<ClientTask<B, E, T>> |
| 151 | where |
| 152 | T: Read + Write + Unpin, |
| 153 | B: Body + 'static, |
| 154 | B::Data: Send + 'static, |
| 155 | E: Http2ClientConnExec<B, T> + Unpin, |
| 156 | B::Error: Into<Box<dyn std::error::Error + Send + Sync>>, |
| 157 | { |
| 158 | let (h2_tx, mut conn) = new_builder(config) |
| 159 | .handshake::<_, SendBuf<B::Data>>(Compat::new(io)) |
| 160 | .await |
| 161 | .map_err(crate::Error::new_h2)?; |
| 162 | |
| 163 | // An mpsc channel is used entirely to detect when the |
| 164 | // 'Client' has been dropped. This is to get around a bug |
| 165 | // in h2 where dropping all SendRequests won't notify a |
| 166 | // parked Connection. |
| 167 | let (conn_drop_ref, rx) = mpsc::channel(1); |
| 168 | let (cancel_tx, conn_eof) = oneshot::channel(); |
| 169 | |
| 170 | let conn_drop_rx = rx.into_future(); |
| 171 | |
| 172 | let ping_config = new_ping_config(config); |
| 173 | |
| 174 | let (conn, ping) = if ping_config.is_enabled() { |
| 175 | let pp = conn.ping_pong().expect("conn.ping_pong" ); |
| 176 | let (recorder, ponger) = ping::channel(pp, ping_config, timer); |
| 177 | |
| 178 | let conn: Conn<_, B> = Conn::new(ponger, conn); |
| 179 | (Either::Left(conn), recorder) |
| 180 | } else { |
| 181 | (Either::Right(conn), ping::disabled()) |
| 182 | }; |
| 183 | let conn: ConnMapErr<T, B> = ConnMapErr { |
| 184 | conn, |
| 185 | is_terminated: false, |
| 186 | }; |
| 187 | |
| 188 | exec.execute_h2_future(H2ClientFuture::Task { |
| 189 | task: ConnTask::new(conn, conn_drop_rx, cancel_tx), |
| 190 | }); |
| 191 | |
| 192 | Ok(ClientTask { |
| 193 | ping, |
| 194 | conn_drop_ref, |
| 195 | conn_eof, |
| 196 | executor: exec, |
| 197 | h2_tx, |
| 198 | req_rx, |
| 199 | fut_ctx: None, |
| 200 | marker: PhantomData, |
| 201 | }) |
| 202 | } |
| 203 | |
| 204 | pin_project! { |
| 205 | struct Conn<T, B> |
| 206 | where |
| 207 | B: Body, |
| 208 | { |
| 209 | #[pin] |
| 210 | ponger: Ponger, |
| 211 | #[pin] |
| 212 | conn: Connection<Compat<T>, SendBuf<<B as Body>::Data>>, |
| 213 | } |
| 214 | } |
| 215 | |
| 216 | impl<T, B> Conn<T, B> |
| 217 | where |
| 218 | B: Body, |
| 219 | T: Read + Write + Unpin, |
| 220 | { |
| 221 | fn new(ponger: Ponger, conn: Connection<Compat<T>, SendBuf<<B as Body>::Data>>) -> Self { |
| 222 | Conn { ponger, conn } |
| 223 | } |
| 224 | } |
| 225 | |
| 226 | impl<T, B> Future for Conn<T, B> |
| 227 | where |
| 228 | B: Body, |
| 229 | T: Read + Write + Unpin, |
| 230 | { |
| 231 | type Output = Result<(), h2::Error>; |
| 232 | |
| 233 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| 234 | let mut this: Projection<'_, T, B> = self.project(); |
| 235 | match this.ponger.poll(cx) { |
| 236 | Poll::Ready(ping::Ponged::SizeUpdate(wnd: u32)) => { |
| 237 | this.conn.set_target_window_size(wnd); |
| 238 | this.conn.set_initial_window_size(wnd)?; |
| 239 | } |
| 240 | Poll::Ready(ping::Ponged::KeepAliveTimedOut) => { |
| 241 | debug!("connection keep-alive timed out" ); |
| 242 | return Poll::Ready(Ok(())); |
| 243 | } |
| 244 | Poll::Pending => {} |
| 245 | } |
| 246 | |
| 247 | Pin::new(&mut this.conn).poll(cx) |
| 248 | } |
| 249 | } |
| 250 | |
| 251 | pin_project! { |
| 252 | struct ConnMapErr<T, B> |
| 253 | where |
| 254 | B: Body, |
| 255 | T: Read, |
| 256 | T: Write, |
| 257 | T: Unpin, |
| 258 | { |
| 259 | #[pin] |
| 260 | conn: Either<Conn<T, B>, Connection<Compat<T>, SendBuf<<B as Body>::Data>>>, |
| 261 | #[pin] |
| 262 | is_terminated: bool, |
| 263 | } |
| 264 | } |
| 265 | |
| 266 | impl<T, B> Future for ConnMapErr<T, B> |
| 267 | where |
| 268 | B: Body, |
| 269 | T: Read + Write + Unpin, |
| 270 | { |
| 271 | type Output = Result<(), ()>; |
| 272 | |
| 273 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| 274 | let mut this: Projection<'_, T, B> = self.project(); |
| 275 | |
| 276 | if *this.is_terminated { |
| 277 | return Poll::Pending; |
| 278 | } |
| 279 | let polled: Poll> = this.conn.poll(cx); |
| 280 | if polled.is_ready() { |
| 281 | *this.is_terminated = true; |
| 282 | } |
| 283 | polled.map_err(|_e: Error| { |
| 284 | debug!(error = %_e, "connection error" ); |
| 285 | }) |
| 286 | } |
| 287 | } |
| 288 | |
| 289 | impl<T, B> FusedFuture for ConnMapErr<T, B> |
| 290 | where |
| 291 | B: Body, |
| 292 | T: Read + Write + Unpin, |
| 293 | { |
| 294 | fn is_terminated(&self) -> bool { |
| 295 | self.is_terminated |
| 296 | } |
| 297 | } |
| 298 | |
| 299 | pin_project! { |
| 300 | pub struct ConnTask<T, B> |
| 301 | where |
| 302 | B: Body, |
| 303 | T: Read, |
| 304 | T: Write, |
| 305 | T: Unpin, |
| 306 | { |
| 307 | #[pin] |
| 308 | drop_rx: StreamFuture<Receiver<Infallible>>, |
| 309 | #[pin] |
| 310 | cancel_tx: Option<oneshot::Sender<Infallible>>, |
| 311 | #[pin] |
| 312 | conn: ConnMapErr<T, B>, |
| 313 | } |
| 314 | } |
| 315 | |
| 316 | impl<T, B> ConnTask<T, B> |
| 317 | where |
| 318 | B: Body, |
| 319 | T: Read + Write + Unpin, |
| 320 | { |
| 321 | fn new( |
| 322 | conn: ConnMapErr<T, B>, |
| 323 | drop_rx: StreamFuture<Receiver<Infallible>>, |
| 324 | cancel_tx: oneshot::Sender<Infallible>, |
| 325 | ) -> Self { |
| 326 | Self { |
| 327 | drop_rx, |
| 328 | cancel_tx: Some(cancel_tx), |
| 329 | conn, |
| 330 | } |
| 331 | } |
| 332 | } |
| 333 | |
| 334 | impl<T, B> Future for ConnTask<T, B> |
| 335 | where |
| 336 | B: Body, |
| 337 | T: Read + Write + Unpin, |
| 338 | { |
| 339 | type Output = (); |
| 340 | |
| 341 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| 342 | let mut this: Projection<'_, T, B> = self.project(); |
| 343 | |
| 344 | if !this.conn.is_terminated() && this.conn.poll_unpin(cx).is_ready() { |
| 345 | // ok or err, the `conn` has finished. |
| 346 | return Poll::Ready(()); |
| 347 | } |
| 348 | |
| 349 | if !this.drop_rx.is_terminated() && this.drop_rx.poll_unpin(cx).is_ready() { |
| 350 | // mpsc has been dropped, hopefully polling |
| 351 | // the connection some more should start shutdown |
| 352 | // and then close. |
| 353 | trace!("send_request dropped, starting conn shutdown" ); |
| 354 | drop(this.cancel_tx.take().expect(msg:"ConnTask Future polled twice" )); |
| 355 | } |
| 356 | |
| 357 | Poll::Pending |
| 358 | } |
| 359 | } |
| 360 | |
| 361 | pin_project! { |
| 362 | #[project = H2ClientFutureProject] |
| 363 | pub enum H2ClientFuture<B, T> |
| 364 | where |
| 365 | B: http_body::Body, |
| 366 | B: 'static, |
| 367 | B::Error: Into<Box<dyn std::error::Error + Send + Sync>>, |
| 368 | T: Read, |
| 369 | T: Write, |
| 370 | T: Unpin, |
| 371 | { |
| 372 | Pipe { |
| 373 | #[pin] |
| 374 | pipe: PipeMap<B>, |
| 375 | }, |
| 376 | Send { |
| 377 | #[pin] |
| 378 | send_when: SendWhen<B>, |
| 379 | }, |
| 380 | Task { |
| 381 | #[pin] |
| 382 | task: ConnTask<T, B>, |
| 383 | }, |
| 384 | } |
| 385 | } |
| 386 | |
| 387 | impl<B, T> Future for H2ClientFuture<B, T> |
| 388 | where |
| 389 | B: http_body::Body + 'static, |
| 390 | B::Error: Into<Box<dyn std::error::Error + Send + Sync>>, |
| 391 | T: Read + Write + Unpin, |
| 392 | { |
| 393 | type Output = (); |
| 394 | |
| 395 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> { |
| 396 | let this: H2ClientFutureProject<'_, …, …> = self.project(); |
| 397 | |
| 398 | match this { |
| 399 | H2ClientFutureProject::Pipe { pipe: Pin<&mut PipeMap> } => pipe.poll(cx), |
| 400 | H2ClientFutureProject::Send { send_when: Pin<&mut SendWhen> } => send_when.poll(cx), |
| 401 | H2ClientFutureProject::Task { task: Pin<&mut ConnTask> } => task.poll(cx), |
| 402 | } |
| 403 | } |
| 404 | } |
| 405 | |
| 406 | struct FutCtx<B> |
| 407 | where |
| 408 | B: Body, |
| 409 | { |
| 410 | is_connect: bool, |
| 411 | eos: bool, |
| 412 | fut: ResponseFuture, |
| 413 | body_tx: SendStream<SendBuf<B::Data>>, |
| 414 | body: B, |
| 415 | cb: Callback<Request<B>, Response<IncomingBody>>, |
| 416 | } |
| 417 | |
| 418 | impl<B: Body> Unpin for FutCtx<B> {} |
| 419 | |
| 420 | pub(crate) struct ClientTask<B, E, T> |
| 421 | where |
| 422 | B: Body, |
| 423 | E: Unpin, |
| 424 | { |
| 425 | ping: ping::Recorder, |
| 426 | conn_drop_ref: ConnDropRef, |
| 427 | conn_eof: ConnEof, |
| 428 | executor: E, |
| 429 | h2_tx: SendRequest<SendBuf<B::Data>>, |
| 430 | req_rx: ClientRx<B>, |
| 431 | fut_ctx: Option<FutCtx<B>>, |
| 432 | marker: PhantomData<T>, |
| 433 | } |
| 434 | |
| 435 | impl<B, E, T> ClientTask<B, E, T> |
| 436 | where |
| 437 | B: Body + 'static, |
| 438 | E: Http2ClientConnExec<B, T> + Unpin, |
| 439 | B::Error: Into<Box<dyn std::error::Error + Send + Sync>>, |
| 440 | T: Read + Write + Unpin, |
| 441 | { |
| 442 | pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool { |
| 443 | self.h2_tx.is_extended_connect_protocol_enabled() |
| 444 | } |
| 445 | } |
| 446 | |
| 447 | pin_project! { |
| 448 | pub struct PipeMap<S> |
| 449 | where |
| 450 | S: Body, |
| 451 | { |
| 452 | #[pin] |
| 453 | pipe: PipeToSendStream<S>, |
| 454 | #[pin] |
| 455 | conn_drop_ref: Option<Sender<Infallible>>, |
| 456 | #[pin] |
| 457 | ping: Option<Recorder>, |
| 458 | } |
| 459 | } |
| 460 | |
| 461 | impl<B> Future for PipeMap<B> |
| 462 | where |
| 463 | B: http_body::Body, |
| 464 | B::Error: Into<Box<dyn std::error::Error + Send + Sync>>, |
| 465 | { |
| 466 | type Output = (); |
| 467 | |
| 468 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> { |
| 469 | let mut this: Projection<'_, B> = self.project(); |
| 470 | |
| 471 | match this.pipe.poll_unpin(cx) { |
| 472 | Poll::Ready(result: Result<(), Error>) => { |
| 473 | if let Err(_e: Error) = result { |
| 474 | debug!("client request body error: {}" , _e); |
| 475 | } |
| 476 | drop(this.conn_drop_ref.take().expect(msg:"Future polled twice" )); |
| 477 | drop(this.ping.take().expect(msg:"Future polled twice" )); |
| 478 | return Poll::Ready(()); |
| 479 | } |
| 480 | Poll::Pending => (), |
| 481 | }; |
| 482 | Poll::Pending |
| 483 | } |
| 484 | } |
| 485 | |
| 486 | impl<B, E, T> ClientTask<B, E, T> |
| 487 | where |
| 488 | B: Body + 'static + Unpin, |
| 489 | B::Data: Send, |
| 490 | E: Http2ClientConnExec<B, T> + Unpin, |
| 491 | B::Error: Into<Box<dyn std::error::Error + Send + Sync>>, |
| 492 | T: Read + Write + Unpin, |
| 493 | { |
| 494 | fn poll_pipe(&mut self, f: FutCtx<B>, cx: &mut Context<'_>) { |
| 495 | let ping = self.ping.clone(); |
| 496 | |
| 497 | let send_stream = if !f.is_connect { |
| 498 | if !f.eos { |
| 499 | let mut pipe = PipeToSendStream::new(f.body, f.body_tx); |
| 500 | |
| 501 | // eagerly see if the body pipe is ready and |
| 502 | // can thus skip allocating in the executor |
| 503 | match Pin::new(&mut pipe).poll(cx) { |
| 504 | Poll::Ready(_) => (), |
| 505 | Poll::Pending => { |
| 506 | let conn_drop_ref = self.conn_drop_ref.clone(); |
| 507 | // keep the ping recorder's knowledge of an |
| 508 | // "open stream" alive while this body is |
| 509 | // still sending... |
| 510 | let ping = ping.clone(); |
| 511 | |
| 512 | let pipe = PipeMap { |
| 513 | pipe, |
| 514 | conn_drop_ref: Some(conn_drop_ref), |
| 515 | ping: Some(ping), |
| 516 | }; |
| 517 | // Clear send task |
| 518 | self.executor |
| 519 | .execute_h2_future(H2ClientFuture::Pipe { pipe }); |
| 520 | } |
| 521 | } |
| 522 | } |
| 523 | |
| 524 | None |
| 525 | } else { |
| 526 | Some(f.body_tx) |
| 527 | }; |
| 528 | |
| 529 | self.executor.execute_h2_future(H2ClientFuture::Send { |
| 530 | send_when: SendWhen { |
| 531 | when: ResponseFutMap { |
| 532 | fut: f.fut, |
| 533 | ping: Some(ping), |
| 534 | send_stream: Some(send_stream), |
| 535 | }, |
| 536 | call_back: Some(f.cb), |
| 537 | }, |
| 538 | }); |
| 539 | } |
| 540 | } |
| 541 | |
| 542 | pin_project! { |
| 543 | pub(crate) struct ResponseFutMap<B> |
| 544 | where |
| 545 | B: Body, |
| 546 | B: 'static, |
| 547 | { |
| 548 | #[pin] |
| 549 | fut: ResponseFuture, |
| 550 | #[pin] |
| 551 | ping: Option<Recorder>, |
| 552 | #[pin] |
| 553 | send_stream: Option<Option<SendStream<SendBuf<<B as Body>::Data>>>>, |
| 554 | } |
| 555 | } |
| 556 | |
| 557 | impl<B> Future for ResponseFutMap<B> |
| 558 | where |
| 559 | B: Body + 'static, |
| 560 | { |
| 561 | type Output = Result<Response<crate::body::Incoming>, (crate::Error, Option<Request<B>>)>; |
| 562 | |
| 563 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| 564 | let mut this = self.project(); |
| 565 | |
| 566 | let result = ready!(this.fut.poll(cx)); |
| 567 | |
| 568 | let ping = this.ping.take().expect("Future polled twice" ); |
| 569 | let send_stream = this.send_stream.take().expect("Future polled twice" ); |
| 570 | |
| 571 | match result { |
| 572 | Ok(res) => { |
| 573 | // record that we got the response headers |
| 574 | ping.record_non_data(); |
| 575 | |
| 576 | let content_length = headers::content_length_parse_all(res.headers()); |
| 577 | if let (Some(mut send_stream), StatusCode::OK) = (send_stream, res.status()) { |
| 578 | if content_length.map_or(false, |len| len != 0) { |
| 579 | warn!("h2 connect response with non-zero body not supported" ); |
| 580 | |
| 581 | send_stream.send_reset(h2::Reason::INTERNAL_ERROR); |
| 582 | return Poll::Ready(Err(( |
| 583 | crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()), |
| 584 | None::<Request<B>>, |
| 585 | ))); |
| 586 | } |
| 587 | let (parts, recv_stream) = res.into_parts(); |
| 588 | let mut res = Response::from_parts(parts, IncomingBody::empty()); |
| 589 | |
| 590 | let (pending, on_upgrade) = crate::upgrade::pending(); |
| 591 | let io = H2Upgraded { |
| 592 | ping, |
| 593 | send_stream: unsafe { UpgradedSendStream::new(send_stream) }, |
| 594 | recv_stream, |
| 595 | buf: Bytes::new(), |
| 596 | }; |
| 597 | let upgraded = Upgraded::new(io, Bytes::new()); |
| 598 | |
| 599 | pending.fulfill(upgraded); |
| 600 | res.extensions_mut().insert(on_upgrade); |
| 601 | |
| 602 | Poll::Ready(Ok(res)) |
| 603 | } else { |
| 604 | let res = res.map(|stream| { |
| 605 | let ping = ping.for_stream(&stream); |
| 606 | IncomingBody::h2(stream, content_length.into(), ping) |
| 607 | }); |
| 608 | Poll::Ready(Ok(res)) |
| 609 | } |
| 610 | } |
| 611 | Err(err) => { |
| 612 | ping.ensure_not_timed_out().map_err(|e| (e, None))?; |
| 613 | |
| 614 | debug!("client response error: {}" , err); |
| 615 | Poll::Ready(Err((crate::Error::new_h2(err), None::<Request<B>>))) |
| 616 | } |
| 617 | } |
| 618 | } |
| 619 | } |
| 620 | |
| 621 | impl<B, E, T> Future for ClientTask<B, E, T> |
| 622 | where |
| 623 | B: Body + 'static + Unpin, |
| 624 | B::Data: Send, |
| 625 | B::Error: Into<Box<dyn std::error::Error + Send + Sync>>, |
| 626 | E: Http2ClientConnExec<B, T> + Unpin, |
| 627 | T: Read + Write + Unpin, |
| 628 | { |
| 629 | type Output = crate::Result<Dispatched>; |
| 630 | |
| 631 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| 632 | loop { |
| 633 | match ready!(self.h2_tx.poll_ready(cx)) { |
| 634 | Ok(()) => (), |
| 635 | Err(err) => { |
| 636 | self.ping.ensure_not_timed_out()?; |
| 637 | return if err.reason() == Some(::h2::Reason::NO_ERROR) { |
| 638 | trace!("connection gracefully shutdown" ); |
| 639 | Poll::Ready(Ok(Dispatched::Shutdown)) |
| 640 | } else { |
| 641 | Poll::Ready(Err(crate::Error::new_h2(err))) |
| 642 | }; |
| 643 | } |
| 644 | }; |
| 645 | |
| 646 | // If we were waiting on pending open |
| 647 | // continue where we left off. |
| 648 | if let Some(f) = self.fut_ctx.take() { |
| 649 | self.poll_pipe(f, cx); |
| 650 | continue; |
| 651 | } |
| 652 | |
| 653 | match self.req_rx.poll_recv(cx) { |
| 654 | Poll::Ready(Some((req, cb))) => { |
| 655 | // check that future hasn't been canceled already |
| 656 | if cb.is_canceled() { |
| 657 | trace!("request callback is canceled" ); |
| 658 | continue; |
| 659 | } |
| 660 | let (head, body) = req.into_parts(); |
| 661 | let mut req = ::http::Request::from_parts(head, ()); |
| 662 | super::strip_connection_headers(req.headers_mut(), true); |
| 663 | if let Some(len) = body.size_hint().exact() { |
| 664 | if len != 0 || headers::method_has_defined_payload_semantics(req.method()) { |
| 665 | headers::set_content_length_if_missing(req.headers_mut(), len); |
| 666 | } |
| 667 | } |
| 668 | |
| 669 | let is_connect = req.method() == Method::CONNECT; |
| 670 | let eos = body.is_end_stream(); |
| 671 | |
| 672 | if is_connect |
| 673 | && headers::content_length_parse_all(req.headers()) |
| 674 | .map_or(false, |len| len != 0) |
| 675 | { |
| 676 | warn!("h2 connect request with non-zero body not supported" ); |
| 677 | cb.send(Err(TrySendError { |
| 678 | error: crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()), |
| 679 | message: None, |
| 680 | })); |
| 681 | continue; |
| 682 | } |
| 683 | |
| 684 | if let Some(protocol) = req.extensions_mut().remove::<Protocol>() { |
| 685 | req.extensions_mut().insert(protocol.into_inner()); |
| 686 | } |
| 687 | |
| 688 | let (fut, body_tx) = match self.h2_tx.send_request(req, !is_connect && eos) { |
| 689 | Ok(ok) => ok, |
| 690 | Err(err) => { |
| 691 | debug!("client send request error: {}" , err); |
| 692 | cb.send(Err(TrySendError { |
| 693 | error: crate::Error::new_h2(err), |
| 694 | message: None, |
| 695 | })); |
| 696 | continue; |
| 697 | } |
| 698 | }; |
| 699 | |
| 700 | let f = FutCtx { |
| 701 | is_connect, |
| 702 | eos, |
| 703 | fut, |
| 704 | body_tx, |
| 705 | body, |
| 706 | cb, |
| 707 | }; |
| 708 | |
| 709 | // Check poll_ready() again. |
| 710 | // If the call to send_request() resulted in the new stream being pending open |
| 711 | // we have to wait for the open to complete before accepting new requests. |
| 712 | match self.h2_tx.poll_ready(cx) { |
| 713 | Poll::Pending => { |
| 714 | // Save Context |
| 715 | self.fut_ctx = Some(f); |
| 716 | return Poll::Pending; |
| 717 | } |
| 718 | Poll::Ready(Ok(())) => (), |
| 719 | Poll::Ready(Err(err)) => { |
| 720 | f.cb.send(Err(TrySendError { |
| 721 | error: crate::Error::new_h2(err), |
| 722 | message: None, |
| 723 | })); |
| 724 | continue; |
| 725 | } |
| 726 | } |
| 727 | self.poll_pipe(f, cx); |
| 728 | continue; |
| 729 | } |
| 730 | |
| 731 | Poll::Ready(None) => { |
| 732 | trace!("client::dispatch::Sender dropped" ); |
| 733 | return Poll::Ready(Ok(Dispatched::Shutdown)); |
| 734 | } |
| 735 | |
| 736 | Poll::Pending => match ready!(Pin::new(&mut self.conn_eof).poll(cx)) { |
| 737 | // As of Rust 1.82, this pattern is no longer needed, and emits a warning. |
| 738 | // But we cannot remove it as long as MSRV is less than that. |
| 739 | #[allow (unused)] |
| 740 | Ok(never) => match never {}, |
| 741 | Err(_conn_is_eof) => { |
| 742 | trace!("connection task is closed, closing dispatch task" ); |
| 743 | return Poll::Ready(Ok(Dispatched::Shutdown)); |
| 744 | } |
| 745 | }, |
| 746 | } |
| 747 | } |
| 748 | } |
| 749 | } |
| 750 | |