use std::{
    convert::Infallible,
    future::Future,
    marker::PhantomData,
    pin::Pin,
    task::{Context, Poll},
    time::Duration,
};

use crate::rt::{Read, Write};
use bytes::Bytes;
use futures_channel::mpsc::{Receiver, Sender};
use futures_channel::{mpsc, oneshot};
use futures_util::future::{Either, FusedFuture, FutureExt as _};
use futures_util::ready;
use futures_util::stream::{StreamExt as _, StreamFuture};
use h2::client::{Builder, Connection, SendRequest};
use h2::SendStream;
use http::{Method, StatusCode};
use pin_project_lite::pin_project;

use super::ping::{Ponger, Recorder};
use super::{ping, H2Upgraded, PipeToSendStream, SendBuf};
use crate::body::{Body, Incoming as IncomingBody};
use crate::client::dispatch::{Callback, SendWhen, TrySendError};
use crate::common::io::Compat;
use crate::common::time::Time;
use crate::ext::Protocol;
use crate::headers;
use crate::proto::h2::UpgradedSendStream;
use crate::proto::Dispatched;
use crate::rt::bounds::Http2ClientConnExec;
use crate::upgrade::Upgraded;
use crate::{Request, Response};
use h2::client::ResponseFuture;

type ClientRx<B> = crate::client::dispatch::Receiver<Request<B>, Response<IncomingBody>>;

// An mpsc channel is used to help notify the `Connection` task when *all*
// other handles to it have been dropped, so that it can shutdown.
type ConnDropRef = mpsc::Sender<Infallible>;

// A oneshot channel watches the `Connection` task, and when it completes,
// the "dispatch" task will be notified and can shutdown sooner.
type ConnEof = oneshot::Receiver<Infallible>;

// Our defaults are chosen for the "majority" case, which usually are not
// resource constrained, and so the spec default of 64kb can be too limiting
// for performance.
const DEFAULT_CONN_WINDOW: u32 = 1024 * 1024 * 5; // 5mb
const DEFAULT_STREAM_WINDOW: u32 = 1024 * 1024 * 2; // 2mb
const DEFAULT_MAX_FRAME_SIZE: u32 = 1024 * 16; // 16kb
const DEFAULT_MAX_SEND_BUF_SIZE: usize = 1024 * 1024; // 1mb
const DEFAULT_MAX_HEADER_LIST_SIZE: u32 = 1024 * 16; // 16kb

// The maximum number of concurrent streams that the client is allowed to open
// before it receives the initial SETTINGS frame from the server.
// This default value is derived from what the HTTP/2 spec recommends as the
// minimum value that endpoints advertise to their peers. It means that using
// this value will minimize the chance of the failure where the local endpoint
// attempts to open too many streams and gets rejected by the remote peer with
// the `REFUSED_STREAM` error.
const DEFAULT_INITIAL_MAX_SEND_STREAMS: usize = 100;

#[derive(Clone, Debug)]
pub(crate) struct Config {
    pub(crate) adaptive_window: bool,
    pub(crate) initial_conn_window_size: u32,
    pub(crate) initial_stream_window_size: u32,
    pub(crate) initial_max_send_streams: usize,
    pub(crate) max_frame_size: Option<u32>,
    pub(crate) max_header_list_size: u32,
    pub(crate) keep_alive_interval: Option<Duration>,
    pub(crate) keep_alive_timeout: Duration,
    pub(crate) keep_alive_while_idle: bool,
    pub(crate) max_concurrent_reset_streams: Option<usize>,
    pub(crate) max_send_buffer_size: usize,
    pub(crate) max_pending_accept_reset_streams: Option<usize>,
    pub(crate) header_table_size: Option<u32>,
    pub(crate) max_concurrent_streams: Option<u32>,
}

impl Default for Config {
    fn default() -> Config {
        Config {
            adaptive_window: false,
            initial_conn_window_size: DEFAULT_CONN_WINDOW,
            initial_stream_window_size: DEFAULT_STREAM_WINDOW,
            initial_max_send_streams: DEFAULT_INITIAL_MAX_SEND_STREAMS,
            max_frame_size: Some(DEFAULT_MAX_FRAME_SIZE),
            max_header_list_size: DEFAULT_MAX_HEADER_LIST_SIZE,
            keep_alive_interval: None,
            keep_alive_timeout: Duration::from_secs(20),
            keep_alive_while_idle: false,
            max_concurrent_reset_streams: None,
            max_send_buffer_size: DEFAULT_MAX_SEND_BUF_SIZE,
            max_pending_accept_reset_streams: None,
            header_table_size: None,
            max_concurrent_streams: None,
        }
    }
}

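// Translate our `Config` into an `h2::client::Builder`, applying the
// optional settings only when they were explicitly configured.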
fn new_builder(config: &Config) -> Builder {
    let mut builder = Builder::default();
    builder
        .initial_max_send_streams(config.initial_max_send_streams)
        .initial_window_size(config.initial_stream_window_size)
        .initial_connection_window_size(config.initial_conn_window_size)
        .max_header_list_size(config.max_header_list_size)
        .max_send_buffer_size(config.max_send_buffer_size)
        .enable_push(false);
    if let Some(max) = config.max_frame_size {
        builder.max_frame_size(max);
    }
    if let Some(max) = config.max_concurrent_reset_streams {
        builder.max_concurrent_reset_streams(max);
    }
    if let Some(max) = config.max_pending_accept_reset_streams {
        builder.max_pending_accept_reset_streams(max);
    }
    if let Some(size) = config.header_table_size {
        builder.header_table_size(size);
    }
    if let Some(max) = config.max_concurrent_streams {
        builder.max_concurrent_streams(max);
    }
    builder
}

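// Build the ping/keep-alive configuration. BDP-based adaptive windowing is
// only enabled when `adaptive_window` is set, seeded with the initial
// stream window size.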
fn new_ping_config(config: &Config) -> ping::Config {
    ping::Config {
        bdp_initial_window: if config.adaptive_window {
            Some(config.initial_stream_window_size)
        } else {
            None
        },
        keep_alive_interval: config.keep_alive_interval,
        keep_alive_timeout: config.keep_alive_timeout,
        keep_alive_while_idle: config.keep_alive_while_idle,
    }
}

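// Perform the HTTP/2 handshake over `io`, spawn the connection-driving task
// onto `exec`, and return the `ClientTask` that will dispatch requests
// received on `req_rx`.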
pub(crate) async fn handshake<T, B, E>(
    io: T,
    req_rx: ClientRx<B>,
    config: &Config,
    mut exec: E,
    timer: Time,
) -> crate::Result<ClientTask<B, E, T>>
where
    T: Read + Write + Unpin,
    B: Body + 'static,
    B::Data: Send + 'static,
    E: Http2ClientConnExec<B, T> + Unpin,
    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
{
    let (h2_tx, mut conn) = new_builder(config)
        .handshake::<_, SendBuf<B::Data>>(Compat::new(io))
        .await
        .map_err(crate::Error::new_h2)?;

    // An mpsc channel is used entirely to detect when the
    // 'Client' has been dropped. This is to get around a bug
    // in h2 where dropping all SendRequests won't notify a
    // parked Connection.
    let (conn_drop_ref, rx) = mpsc::channel(1);
    let (cancel_tx, conn_eof) = oneshot::channel();

    let conn_drop_rx = rx.into_future();

    let ping_config = new_ping_config(config);

    let (conn, ping) = if ping_config.is_enabled() {
        let pp = conn.ping_pong().expect("conn.ping_pong");
        let (recorder, ponger) = ping::channel(pp, ping_config, timer);

        let conn: Conn<_, B> = Conn::new(ponger, conn);
        (Either::Left(conn), recorder)
    } else {
        (Either::Right(conn), ping::disabled())
    };
    let conn: ConnMapErr<T, B> = ConnMapErr {
        conn,
        is_terminated: false,
    };

    exec.execute_h2_future(H2ClientFuture::Task {
        task: ConnTask::new(conn, conn_drop_rx, cancel_tx),
    });

    Ok(ClientTask {
        ping,
        conn_drop_ref,
        conn_eof,
        executor: exec,
        h2_tx,
        req_rx,
        fut_ctx: None,
        marker: PhantomData,
    })
}

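// A future that drives both the h2 `Connection` and the keep-alive `Ponger`,
// used when ping/keep-alive is enabled.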
pin_project! {
    struct Conn<T, B>
    where
        B: Body,
    {
        #[pin]
        ponger: Ponger,
        #[pin]
        conn: Connection<Compat<T>, SendBuf<<B as Body>::Data>>,
    }
}

impl<T, B> Conn<T, B>
where
    B: Body,
    T: Read + Write + Unpin,
{
    fn new(ponger: Ponger, conn: Connection<Compat<T>, SendBuf<<B as Body>::Data>>) -> Self {
        Conn { ponger, conn }
    }
}

impl<T, B> Future for Conn<T, B>
where
    B: Body,
    T: Read + Write + Unpin,
{
    type Output = Result<(), h2::Error>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();
        match this.ponger.poll(cx) {
            Poll::Ready(ping::Ponged::SizeUpdate(wnd)) => {
                this.conn.set_target_window_size(wnd);
                this.conn.set_initial_window_size(wnd)?;
            }
            Poll::Ready(ping::Ponged::KeepAliveTimedOut) => {
                debug!("connection keep-alive timed out");
                return Poll::Ready(Ok(()));
            }
            Poll::Pending => {}
        }

        Pin::new(&mut this.conn).poll(cx)
    }
}

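// Wraps the connection future (with or without the ponger), logging any
// connection error and mapping it to `()`, and tracking termination so the
// future is never polled again after completing.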
pin_project! {
    struct ConnMapErr<T, B>
    where
        B: Body,
        T: Read,
        T: Write,
        T: Unpin,
    {
        #[pin]
        conn: Either<Conn<T, B>, Connection<Compat<T>, SendBuf<<B as Body>::Data>>>,
        #[pin]
        is_terminated: bool,
    }
}

impl<T, B> Future for ConnMapErr<T, B>
where
    B: Body,
    T: Read + Write + Unpin,
{
    type Output = Result<(), ()>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();

        if *this.is_terminated {
            return Poll::Pending;
        }
        let polled = this.conn.poll(cx);
        if polled.is_ready() {
            *this.is_terminated = true;
        }
        polled.map_err(|_e| {
            debug!(error = %_e, "connection error");
        })
    }
}

impl<T, B> FusedFuture for ConnMapErr<T, B>
where
    B: Body,
    T: Read + Write + Unpin,
{
    fn is_terminated(&self) -> bool {
        self.is_terminated
    }
}

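// The task spawned onto the executor: drives the connection future and
// watches `drop_rx` so shutdown can begin once every `ConnDropRef` handle
// is gone. Completing (and thereby dropping `cancel_tx`) notifies the
// dispatch task through `conn_eof`.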
pin_project! {
    pub struct ConnTask<T, B>
    where
        B: Body,
        T: Read,
        T: Write,
        T: Unpin,
    {
        #[pin]
        drop_rx: StreamFuture<Receiver<Infallible>>,
        #[pin]
        cancel_tx: Option<oneshot::Sender<Infallible>>,
        #[pin]
        conn: ConnMapErr<T, B>,
    }
}

impl<T, B> ConnTask<T, B>
where
    B: Body,
    T: Read + Write + Unpin,
{
    fn new(
        conn: ConnMapErr<T, B>,
        drop_rx: StreamFuture<Receiver<Infallible>>,
        cancel_tx: oneshot::Sender<Infallible>,
    ) -> Self {
        Self {
            drop_rx,
            cancel_tx: Some(cancel_tx),
            conn,
        }
    }
}

impl<T, B> Future for ConnTask<T, B>
where
    B: Body,
    T: Read + Write + Unpin,
{
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();

        if !this.conn.is_terminated() && this.conn.poll_unpin(cx).is_ready() {
            // ok or err, the `conn` has finished.
            return Poll::Ready(());
        }

        if !this.drop_rx.is_terminated() && this.drop_rx.poll_unpin(cx).is_ready() {
            // mpsc has been dropped, hopefully polling
            // the connection some more should start shutdown
            // and then close.
            trace!("send_request dropped, starting conn shutdown");
            drop(this.cancel_tx.take().expect("ConnTask Future polled twice"));
        }

        Poll::Pending
    }
}

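// The single future type given to `Http2ClientConnExec`: a request body
// pipe, a response-send future, or the connection task itself.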
pin_project! {
    #[project = H2ClientFutureProject]
    pub enum H2ClientFuture<B, T>
    where
        B: http_body::Body,
        B: 'static,
        B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
        T: Read,
        T: Write,
        T: Unpin,
    {
        Pipe {
            #[pin]
            pipe: PipeMap<B>,
        },
        Send {
            #[pin]
            send_when: SendWhen<B>,
        },
        Task {
            #[pin]
            task: ConnTask<T, B>,
        },
    }
}

impl<B, T> Future for H2ClientFuture<B, T>
where
    B: http_body::Body + 'static,
    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
    T: Read + Write + Unpin,
{
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
        let this = self.project();

        match this {
            H2ClientFutureProject::Pipe { pipe } => pipe.poll(cx),
            H2ClientFutureProject::Send { send_when } => send_when.poll(cx),
            H2ClientFutureProject::Task { task } => task.poll(cx),
        }
    }
}

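// Everything needed to resume dispatching a request whose stream was left
// "pending open" by `send_request`; stored in `ClientTask::fut_ctx`.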
struct FutCtx<B>
where
    B: Body,
{
    is_connect: bool,
    eos: bool,
    fut: ResponseFuture,
    body_tx: SendStream<SendBuf<B::Data>>,
    body: B,
    cb: Callback<Request<B>, Response<IncomingBody>>,
}

impl<B: Body> Unpin for FutCtx<B> {}

pub(crate) struct ClientTask<B, E, T>
where
    B: Body,
    E: Unpin,
{
    ping: ping::Recorder,
    conn_drop_ref: ConnDropRef,
    conn_eof: ConnEof,
    executor: E,
    h2_tx: SendRequest<SendBuf<B::Data>>,
    req_rx: ClientRx<B>,
    fut_ctx: Option<FutCtx<B>>,
    marker: PhantomData<T>,
}

impl<B, E, T> ClientTask<B, E, T>
where
    B: Body + 'static,
    E: Http2ClientConnExec<B, T> + Unpin,
    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
    T: Read + Write + Unpin,
{
    pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool {
        self.h2_tx.is_extended_connect_protocol_enabled()
    }
}

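// A spawned body-pipe future: streams a request body into the h2
// `SendStream`, keeping a `ConnDropRef` and the ping `Recorder` alive
// until the body finishes.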
pin_project! {
    pub struct PipeMap<S>
    where
        S: Body,
    {
        #[pin]
        pipe: PipeToSendStream<S>,
        #[pin]
        conn_drop_ref: Option<Sender<Infallible>>,
        #[pin]
        ping: Option<Recorder>,
    }
}

impl<B> Future for PipeMap<B>
where
    B: http_body::Body,
    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
{
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
        let mut this = self.project();

        match this.pipe.poll_unpin(cx) {
            Poll::Ready(result) => {
                if let Err(_e) = result {
                    debug!("client request body error: {}", _e);
                }
                drop(this.conn_drop_ref.take().expect("Future polled twice"));
                drop(this.ping.take().expect("Future polled twice"));
                return Poll::Ready(());
            }
            Poll::Pending => (),
        };
        Poll::Pending
    }
}

impl<B, E, T> ClientTask<B, E, T>
where
    B: Body + 'static + Unpin,
    B::Data: Send,
    E: Http2ClientConnExec<B, T> + Unpin,
    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
    T: Read + Write + Unpin,
{
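    // Spawn the request body pipe (unless the body is already at
    // end-of-stream), then spawn the future awaiting the response. For
    // CONNECT requests, the `SendStream` is instead forwarded to the
    // response future for the upgrade.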
    fn poll_pipe(&mut self, f: FutCtx<B>, cx: &mut Context<'_>) {
        let ping = self.ping.clone();

        let send_stream = if !f.is_connect {
            if !f.eos {
                let mut pipe = PipeToSendStream::new(f.body, f.body_tx);

                // eagerly see if the body pipe is ready and
                // can thus skip allocating in the executor
                match Pin::new(&mut pipe).poll(cx) {
                    Poll::Ready(_) => (),
                    Poll::Pending => {
                        let conn_drop_ref = self.conn_drop_ref.clone();
                        // keep the ping recorder's knowledge of an
                        // "open stream" alive while this body is
                        // still sending...
                        let ping = ping.clone();

                        let pipe = PipeMap {
                            pipe,
                            conn_drop_ref: Some(conn_drop_ref),
                            ping: Some(ping),
                        };
                        // Not ready yet: spawn the body pipe onto the executor.
                        self.executor
                            .execute_h2_future(H2ClientFuture::Pipe { pipe });
                    }
                }
            }

            None
        } else {
            Some(f.body_tx)
        };

        self.executor.execute_h2_future(H2ClientFuture::Send {
            send_when: SendWhen {
                when: ResponseFutMap {
                    fut: f.fut,
                    ping: Some(ping),
                    send_stream: Some(send_stream),
                },
                call_back: Some(f.cb),
            },
        });
    }
}

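// Wraps h2's `ResponseFuture`: records ping activity and converts the raw
// h2 response into hyper's `Response`, handling the CONNECT upgrade path.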
pin_project! {
    pub(crate) struct ResponseFutMap<B>
    where
        B: Body,
        B: 'static,
    {
        #[pin]
        fut: ResponseFuture,
        #[pin]
        ping: Option<Recorder>,
        #[pin]
        send_stream: Option<Option<SendStream<SendBuf<<B as Body>::Data>>>>,
    }
}

impl<B> Future for ResponseFutMap<B>
where
    B: Body + 'static,
{
    type Output = Result<Response<crate::body::Incoming>, (crate::Error, Option<Request<B>>)>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();

        let result = ready!(this.fut.poll(cx));

        let ping = this.ping.take().expect("Future polled twice");
        let send_stream = this.send_stream.take().expect("Future polled twice");

        match result {
            Ok(res) => {
                // record that we got the response headers
                ping.record_non_data();

                let content_length = headers::content_length_parse_all(res.headers());
                if let (Some(mut send_stream), StatusCode::OK) = (send_stream, res.status()) {
                    if content_length.map_or(false, |len| len != 0) {
                        warn!("h2 connect response with non-zero body not supported");

                        send_stream.send_reset(h2::Reason::INTERNAL_ERROR);
                        return Poll::Ready(Err((
                            crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()),
                            None::<Request<B>>,
                        )));
                    }
                    let (parts, recv_stream) = res.into_parts();
                    let mut res = Response::from_parts(parts, IncomingBody::empty());

                    let (pending, on_upgrade) = crate::upgrade::pending();
                    let io = H2Upgraded {
                        ping,
                        send_stream: unsafe { UpgradedSendStream::new(send_stream) },
                        recv_stream,
                        buf: Bytes::new(),
                    };
                    let upgraded = Upgraded::new(io, Bytes::new());

                    pending.fulfill(upgraded);
                    res.extensions_mut().insert(on_upgrade);

                    Poll::Ready(Ok(res))
                } else {
                    let res = res.map(|stream| {
                        let ping = ping.for_stream(&stream);
                        IncomingBody::h2(stream, content_length.into(), ping)
                    });
                    Poll::Ready(Ok(res))
                }
            }
            Err(err) => {
                ping.ensure_not_timed_out().map_err(|e| (e, None))?;

                debug!("client response error: {}", err);
                Poll::Ready(Err((crate::Error::new_h2(err), None::<Request<B>>)))
            }
        }
    }
}

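// The dispatch task: pulls requests from `req_rx` and sends them over the
// connection, parking a request in `fut_ctx` whenever its new stream is
// pending open.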
impl<B, E, T> Future for ClientTask<B, E, T>
where
    B: Body + 'static + Unpin,
    B::Data: Send,
    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
    E: Http2ClientConnExec<B, T> + Unpin,
    T: Read + Write + Unpin,
{
    type Output = crate::Result<Dispatched>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        loop {
            match ready!(self.h2_tx.poll_ready(cx)) {
                Ok(()) => (),
                Err(err) => {
                    self.ping.ensure_not_timed_out()?;
                    return if err.reason() == Some(::h2::Reason::NO_ERROR) {
                        trace!("connection gracefully shutdown");
                        Poll::Ready(Ok(Dispatched::Shutdown))
                    } else {
                        Poll::Ready(Err(crate::Error::new_h2(err)))
                    };
                }
            };

            // If we were waiting on a pending open,
            // continue where we left off.
            if let Some(f) = self.fut_ctx.take() {
                self.poll_pipe(f, cx);
                continue;
            }

            match self.req_rx.poll_recv(cx) {
                Poll::Ready(Some((req, cb))) => {
                    // check that future hasn't been canceled already
                    if cb.is_canceled() {
                        trace!("request callback is canceled");
                        continue;
                    }
                    let (head, body) = req.into_parts();
                    let mut req = ::http::Request::from_parts(head, ());
                    super::strip_connection_headers(req.headers_mut(), true);
                    if let Some(len) = body.size_hint().exact() {
                        if len != 0 || headers::method_has_defined_payload_semantics(req.method()) {
                            headers::set_content_length_if_missing(req.headers_mut(), len);
                        }
                    }

                    let is_connect = req.method() == Method::CONNECT;
                    let eos = body.is_end_stream();

                    if is_connect
                        && headers::content_length_parse_all(req.headers())
                            .map_or(false, |len| len != 0)
                    {
                        warn!("h2 connect request with non-zero body not supported");
                        cb.send(Err(TrySendError {
                            error: crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()),
                            message: None,
                        }));
                        continue;
                    }

                    if let Some(protocol) = req.extensions_mut().remove::<Protocol>() {
                        req.extensions_mut().insert(protocol.into_inner());
                    }

                    let (fut, body_tx) = match self.h2_tx.send_request(req, !is_connect && eos) {
                        Ok(ok) => ok,
                        Err(err) => {
                            debug!("client send request error: {}", err);
                            cb.send(Err(TrySendError {
                                error: crate::Error::new_h2(err),
                                message: None,
                            }));
                            continue;
                        }
                    };

                    let f = FutCtx {
                        is_connect,
                        eos,
                        fut,
                        body_tx,
                        body,
                        cb,
                    };

                    // Check poll_ready() again.
                    // If the call to send_request() resulted in the new stream being pending open,
                    // we have to wait for the open to complete before accepting new requests.
                    match self.h2_tx.poll_ready(cx) {
                        Poll::Pending => {
                            // Save the request context to resume once the stream opens.
                            self.fut_ctx = Some(f);
                            return Poll::Pending;
                        }
                        Poll::Ready(Ok(())) => (),
                        Poll::Ready(Err(err)) => {
                            f.cb.send(Err(TrySendError {
                                error: crate::Error::new_h2(err),
                                message: None,
                            }));
                            continue;
                        }
                    }
                    self.poll_pipe(f, cx);
                    continue;
                }

                Poll::Ready(None) => {
                    trace!("client::dispatch::Sender dropped");
                    return Poll::Ready(Ok(Dispatched::Shutdown));
                }

                Poll::Pending => match ready!(Pin::new(&mut self.conn_eof).poll(cx)) {
                    // As of Rust 1.82, this pattern is no longer needed, and emits a warning.
                    // But we cannot remove it as long as MSRV is less than that.
                    #[allow(unused)]
                    Ok(never) => match never {},
                    Err(_conn_is_eof) => {
                        trace!("connection task is closed, closing dispatch task");
                        return Poll::Ready(Ok(Dispatched::Shutdown));
                    }
                },
            }
        }
    }
}