1 | //! HTTP/1 client connections |
2 | |
3 | use std::error::Error as StdError; |
4 | use std::fmt; |
5 | use std::future::Future; |
6 | use std::pin::Pin; |
7 | use std::task::{Context, Poll}; |
8 | |
9 | use crate::rt::{Read, Write}; |
10 | use bytes::Bytes; |
11 | use futures_util::ready; |
12 | use http::{Request, Response}; |
13 | use httparse::ParserConfig; |
14 | |
15 | use super::super::dispatch::{self, TrySendError}; |
16 | use crate::body::{Body, Incoming as IncomingBody}; |
17 | use crate::proto; |
18 | |
// Concrete dispatcher type stored inside `Connection`: the generic protocol
// dispatcher instantiated with the `Client` dispatch type and the
// `ClientTransaction` marker, for a request body `B` over an IO object `T`.
type Dispatcher<T, B> =
    proto::dispatch::Dispatcher<proto::dispatch::Client<B>, B, T, proto::h1::ClientTransaction>;
21 | |
/// The sender side of an established connection.
///
/// A `SendRequest` is returned by the handshake together with the
/// [`Connection`] it is paired with; requests submitted here are handed to
/// that connection through an internal channel.
pub struct SendRequest<B> {
    // Sending half of the dispatch channel: carries each `Request<B>` and
    // yields the matching `Response<IncomingBody>`.
    dispatch: dispatch::Sender<Request<B>, Response<IncomingBody>>,
}
26 | |
/// Deconstructed parts of a `Connection`.
///
/// This allows taking apart a `Connection` at a later time, in order to
/// reclaim the IO object, and additional related pieces.
///
/// Values of this type are produced by `Connection::into_parts` and
/// `Connection::without_shutdown`.
#[derive(Debug)]
#[non_exhaustive]
pub struct Parts<T> {
    /// The original IO object used in the handshake.
    pub io: T,
    /// A buffer of bytes that have been read but not processed as HTTP.
    ///
    /// For instance, if the `Connection` is used for an HTTP upgrade request,
    /// it is possible the server sent back the first bytes of the new protocol
    /// along with the response upgrade.
    ///
    /// You will want to check for any existing bytes if you plan to continue
    /// communicating on the IO object.
    pub read_buf: Bytes,
}
46 | |
/// A future that processes all HTTP state for the IO object.
///
/// In most cases, this should just be spawned into an executor, so that it
/// can process incoming and outgoing messages, notice hangups, and the like.
///
/// Instances of this type are typically created via the [`handshake`] function.
#[must_use = "futures do nothing unless polled"]
pub struct Connection<T, B>
where
    T: Read + Write,
    B: Body + 'static,
{
    // The HTTP/1 dispatcher that owns the IO object; polling the connection
    // polls this dispatcher.
    inner: Dispatcher<T, B>,
}
61 | |
62 | impl<T, B> Connection<T, B> |
63 | where |
64 | T: Read + Write + Unpin, |
65 | B: Body + 'static, |
66 | B::Error: Into<Box<dyn StdError + Send + Sync>>, |
67 | { |
68 | /// Return the inner IO object, and additional information. |
69 | /// |
70 | /// Only works for HTTP/1 connections. HTTP/2 connections will panic. |
71 | pub fn into_parts(self) -> Parts<T> { |
72 | let (io, read_buf, _) = self.inner.into_inner(); |
73 | Parts { io, read_buf } |
74 | } |
75 | |
76 | /// Poll the connection for completion, but without calling `shutdown` |
77 | /// on the underlying IO. |
78 | /// |
79 | /// This is useful to allow running a connection while doing an HTTP |
80 | /// upgrade. Once the upgrade is completed, the connection would be "done", |
81 | /// but it is not desired to actually shutdown the IO object. Instead you |
82 | /// would take it back using `into_parts`. |
83 | /// |
84 | /// Use [`poll_fn`](https://docs.rs/futures/0.1.25/futures/future/fn.poll_fn.html) |
85 | /// and [`try_ready!`](https://docs.rs/futures/0.1.25/futures/macro.try_ready.html) |
86 | /// to work with this function; or use the `without_shutdown` wrapper. |
87 | pub fn poll_without_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> { |
88 | self.inner.poll_without_shutdown(cx) |
89 | } |
90 | |
91 | /// Prevent shutdown of the underlying IO object at the end of service the request, |
92 | /// instead run `into_parts`. This is a convenience wrapper over `poll_without_shutdown`. |
93 | pub async fn without_shutdown(self) -> crate::Result<Parts<T>> { |
94 | let mut conn = Some(self); |
95 | futures_util::future::poll_fn(move |cx| -> Poll<crate::Result<Parts<T>>> { |
96 | ready!(conn.as_mut().unwrap().poll_without_shutdown(cx))?; |
97 | Poll::Ready(Ok(conn.take().unwrap().into_parts())) |
98 | }) |
99 | .await |
100 | } |
101 | } |
102 | |
/// A builder to configure an HTTP connection.
///
/// After setting options, the builder is used to create a handshake future.
///
/// **Note**: The default values of options are *not considered stable*. They
/// are subject to change at any time.
#[derive(Clone, Debug)]
pub struct Builder {
    // Whether HTTP/0.9 responses are tolerated; see `http09_responses`.
    h09_responses: bool,
    // Response-parser leniency switches, handed to the connection at handshake.
    h1_parser_config: ParserConfig,
    // `None` leaves the connection's write strategy untouched; `Some(true)`
    // selects the queue strategy and `Some(false)` the flatten strategy.
    h1_writev: Option<bool>,
    // Write header names in title case; see `title_case_headers`.
    h1_title_case_headers: bool,
    // Preserve the original header casing; see `preserve_header_case`.
    h1_preserve_header_case: bool,
    // Explicit header-count limit; `None` means no limit is set on the connection.
    h1_max_headers: Option<usize>,
    // Preserve the original header order; see `preserve_header_order`.
    #[cfg(feature = "ffi")]
    h1_preserve_header_order: bool,
    // Fixed read-buffer size; cleared whenever `max_buf_size` is called.
    h1_read_buf_exact_size: Option<usize>,
    // Maximum buffer size; cleared whenever `read_buf_exact_size` is called.
    h1_max_buf_size: Option<usize>,
}
122 | |
123 | /// Returns a handshake future over some IO. |
124 | /// |
125 | /// This is a shortcut for `Builder::new().handshake(io)`. |
126 | /// See [`client::conn`](crate::client::conn) for more. |
127 | pub async fn handshake<T, B>(io: T) -> crate::Result<(SendRequest<B>, Connection<T, B>)> |
128 | where |
129 | T: Read + Write + Unpin, |
130 | B: Body + 'static, |
131 | B::Data: Send, |
132 | B::Error: Into<Box<dyn StdError + Send + Sync>>, |
133 | { |
134 | Builder::new().handshake(io).await |
135 | } |
136 | |
137 | // ===== impl SendRequest |
138 | |
139 | impl<B> SendRequest<B> { |
140 | /// Polls to determine whether this sender can be used yet for a request. |
141 | /// |
142 | /// If the associated connection is closed, this returns an Error. |
143 | pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> { |
144 | self.dispatch.poll_ready(cx) |
145 | } |
146 | |
147 | /// Waits until the dispatcher is ready |
148 | /// |
149 | /// If the associated connection is closed, this returns an Error. |
150 | pub async fn ready(&mut self) -> crate::Result<()> { |
151 | futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await |
152 | } |
153 | |
154 | /// Checks if the connection is currently ready to send a request. |
155 | /// |
156 | /// # Note |
157 | /// |
158 | /// This is mostly a hint. Due to inherent latency of networks, it is |
159 | /// possible that even after checking this is ready, sending a request |
160 | /// may still fail because the connection was closed in the meantime. |
161 | pub fn is_ready(&self) -> bool { |
162 | self.dispatch.is_ready() |
163 | } |
164 | |
165 | /// Checks if the connection side has been closed. |
166 | pub fn is_closed(&self) -> bool { |
167 | self.dispatch.is_closed() |
168 | } |
169 | } |
170 | |
171 | impl<B> SendRequest<B> |
172 | where |
173 | B: Body + 'static, |
174 | { |
175 | /// Sends a `Request` on the associated connection. |
176 | /// |
177 | /// Returns a future that if successful, yields the `Response`. |
178 | /// |
179 | /// `req` must have a `Host` header. |
180 | /// |
181 | /// # Uri |
182 | /// |
183 | /// The `Uri` of the request is serialized as-is. |
184 | /// |
185 | /// - Usually you want origin-form (`/path?query`). |
186 | /// - For sending to an HTTP proxy, you want to send in absolute-form |
187 | /// (`https://hyper.rs/guides`). |
188 | /// |
189 | /// This is however not enforced or validated and it is up to the user |
190 | /// of this method to ensure the `Uri` is correct for their intended purpose. |
191 | pub fn send_request( |
192 | &mut self, |
193 | req: Request<B>, |
194 | ) -> impl Future<Output = crate::Result<Response<IncomingBody>>> { |
195 | let sent = self.dispatch.send(req); |
196 | |
197 | async move { |
198 | match sent { |
199 | Ok(rx) => match rx.await { |
200 | Ok(Ok(resp)) => Ok(resp), |
201 | Ok(Err(err)) => Err(err), |
202 | // this is definite bug if it happens, but it shouldn't happen! |
203 | Err(_canceled) => panic!("dispatch dropped without returning error" ), |
204 | }, |
205 | Err(_req) => { |
206 | debug!("connection was not ready" ); |
207 | Err(crate::Error::new_canceled().with("connection was not ready" )) |
208 | } |
209 | } |
210 | } |
211 | } |
212 | |
213 | /// Sends a `Request` on the associated connection. |
214 | /// |
215 | /// Returns a future that if successful, yields the `Response`. |
216 | /// |
217 | /// # Error |
218 | /// |
219 | /// If there was an error before trying to serialize the request to the |
220 | /// connection, the message will be returned as part of this error. |
221 | pub fn try_send_request( |
222 | &mut self, |
223 | req: Request<B>, |
224 | ) -> impl Future<Output = Result<Response<IncomingBody>, TrySendError<Request<B>>>> { |
225 | let sent = self.dispatch.try_send(req); |
226 | async move { |
227 | match sent { |
228 | Ok(rx) => match rx.await { |
229 | Ok(Ok(res)) => Ok(res), |
230 | Ok(Err(err)) => Err(err), |
231 | // this is definite bug if it happens, but it shouldn't happen! |
232 | Err(_) => panic!("dispatch dropped without returning error" ), |
233 | }, |
234 | Err(req) => { |
235 | debug!("connection was not ready" ); |
236 | let error = crate::Error::new_canceled().with("connection was not ready" ); |
237 | Err(TrySendError { |
238 | error, |
239 | message: Some(req), |
240 | }) |
241 | } |
242 | } |
243 | } |
244 | } |
245 | } |
246 | |
247 | impl<B> fmt::Debug for SendRequest<B> { |
248 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
249 | f.debug_struct(name:"SendRequest" ).finish() |
250 | } |
251 | } |
252 | |
253 | // ===== impl Connection |
254 | |
255 | impl<T, B> Connection<T, B> |
256 | where |
257 | T: Read + Write + Unpin + Send, |
258 | B: Body + 'static, |
259 | B::Error: Into<Box<dyn StdError + Send + Sync>>, |
260 | { |
261 | /// Enable this connection to support higher-level HTTP upgrades. |
262 | /// |
263 | /// See [the `upgrade` module](crate::upgrade) for more. |
264 | pub fn with_upgrades(self) -> upgrades::UpgradeableConnection<T, B> { |
265 | upgrades::UpgradeableConnection { inner: Some(self) } |
266 | } |
267 | } |
268 | |
269 | impl<T, B> fmt::Debug for Connection<T, B> |
270 | where |
271 | T: Read + Write + fmt::Debug, |
272 | B: Body + 'static, |
273 | { |
274 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
275 | f.debug_struct(name:"Connection" ).finish() |
276 | } |
277 | } |
278 | |
279 | impl<T, B> Future for Connection<T, B> |
280 | where |
281 | T: Read + Write + Unpin, |
282 | B: Body + 'static, |
283 | B::Data: Send, |
284 | B::Error: Into<Box<dyn StdError + Send + Sync>>, |
285 | { |
286 | type Output = crate::Result<()>; |
287 | |
288 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
289 | match ready!(Pin::new(&mut self.inner).poll(cx))? { |
290 | proto::Dispatched::Shutdown => Poll::Ready(Ok(())), |
291 | proto::Dispatched::Upgrade(pending: Pending) => { |
292 | // With no `Send` bound on `I`, we can't try to do |
293 | // upgrades here. In case a user was trying to use |
294 | // `upgrade` with this API, send a special |
295 | // error letting them know about that. |
296 | pending.manual(); |
297 | Poll::Ready(Ok(())) |
298 | } |
299 | } |
300 | } |
301 | } |
302 | |
303 | // ===== impl Builder |
304 | |
305 | impl Builder { |
306 | /// Creates a new connection builder. |
307 | #[inline ] |
308 | pub fn new() -> Builder { |
309 | Builder { |
310 | h09_responses: false, |
311 | h1_writev: None, |
312 | h1_read_buf_exact_size: None, |
313 | h1_parser_config: Default::default(), |
314 | h1_title_case_headers: false, |
315 | h1_preserve_header_case: false, |
316 | h1_max_headers: None, |
317 | #[cfg (feature = "ffi" )] |
318 | h1_preserve_header_order: false, |
319 | h1_max_buf_size: None, |
320 | } |
321 | } |
322 | |
323 | /// Set whether HTTP/0.9 responses should be tolerated. |
324 | /// |
325 | /// Default is false. |
326 | pub fn http09_responses(&mut self, enabled: bool) -> &mut Builder { |
327 | self.h09_responses = enabled; |
328 | self |
329 | } |
330 | |
331 | /// Set whether HTTP/1 connections will accept spaces between header names |
332 | /// and the colon that follow them in responses. |
333 | /// |
334 | /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has |
335 | /// to say about it: |
336 | /// |
337 | /// > No whitespace is allowed between the header field-name and colon. In |
338 | /// > the past, differences in the handling of such whitespace have led to |
339 | /// > security vulnerabilities in request routing and response handling. A |
340 | /// > server MUST reject any received request message that contains |
341 | /// > whitespace between a header field-name and colon with a response code |
342 | /// > of 400 (Bad Request). A proxy MUST remove any such whitespace from a |
343 | /// > response message before forwarding the message downstream. |
344 | /// |
345 | /// Default is false. |
346 | /// |
347 | /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4 |
348 | pub fn allow_spaces_after_header_name_in_responses(&mut self, enabled: bool) -> &mut Builder { |
349 | self.h1_parser_config |
350 | .allow_spaces_after_header_name_in_responses(enabled); |
351 | self |
352 | } |
353 | |
354 | /// Set whether HTTP/1 connections will accept obsolete line folding for |
355 | /// header values. |
356 | /// |
357 | /// Newline codepoints (`\r` and `\n`) will be transformed to spaces when |
358 | /// parsing. |
359 | /// |
360 | /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has |
361 | /// to say about it: |
362 | /// |
363 | /// > A server that receives an obs-fold in a request message that is not |
364 | /// > within a message/http container MUST either reject the message by |
365 | /// > sending a 400 (Bad Request), preferably with a representation |
366 | /// > explaining that obsolete line folding is unacceptable, or replace |
367 | /// > each received obs-fold with one or more SP octets prior to |
368 | /// > interpreting the field value or forwarding the message downstream. |
369 | /// |
370 | /// > A proxy or gateway that receives an obs-fold in a response message |
371 | /// > that is not within a message/http container MUST either discard the |
372 | /// > message and replace it with a 502 (Bad Gateway) response, preferably |
373 | /// > with a representation explaining that unacceptable line folding was |
374 | /// > received, or replace each received obs-fold with one or more SP |
375 | /// > octets prior to interpreting the field value or forwarding the |
376 | /// > message downstream. |
377 | /// |
378 | /// > A user agent that receives an obs-fold in a response message that is |
379 | /// > not within a message/http container MUST replace each received |
380 | /// > obs-fold with one or more SP octets prior to interpreting the field |
381 | /// > value. |
382 | /// |
383 | /// Default is false. |
384 | /// |
385 | /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4 |
386 | pub fn allow_obsolete_multiline_headers_in_responses(&mut self, enabled: bool) -> &mut Builder { |
387 | self.h1_parser_config |
388 | .allow_obsolete_multiline_headers_in_responses(enabled); |
389 | self |
390 | } |
391 | |
392 | /// Set whether HTTP/1 connections will silently ignored malformed header lines. |
393 | /// |
394 | /// If this is enabled and a header line does not start with a valid header |
395 | /// name, or does not include a colon at all, the line will be silently ignored |
396 | /// and no error will be reported. |
397 | /// |
398 | /// Default is false. |
399 | pub fn ignore_invalid_headers_in_responses(&mut self, enabled: bool) -> &mut Builder { |
400 | self.h1_parser_config |
401 | .ignore_invalid_headers_in_responses(enabled); |
402 | self |
403 | } |
404 | |
405 | /// Set whether HTTP/1 connections should try to use vectored writes, |
406 | /// or always flatten into a single buffer. |
407 | /// |
408 | /// Note that setting this to false may mean more copies of body data, |
409 | /// but may also improve performance when an IO transport doesn't |
410 | /// support vectored writes well, such as most TLS implementations. |
411 | /// |
412 | /// Setting this to true will force hyper to use queued strategy |
413 | /// which may eliminate unnecessary cloning on some TLS backends |
414 | /// |
415 | /// Default is `auto`. In this mode hyper will try to guess which |
416 | /// mode to use |
417 | pub fn writev(&mut self, enabled: bool) -> &mut Builder { |
418 | self.h1_writev = Some(enabled); |
419 | self |
420 | } |
421 | |
422 | /// Set whether HTTP/1 connections will write header names as title case at |
423 | /// the socket level. |
424 | /// |
425 | /// Default is false. |
426 | pub fn title_case_headers(&mut self, enabled: bool) -> &mut Builder { |
427 | self.h1_title_case_headers = enabled; |
428 | self |
429 | } |
430 | |
431 | /// Set whether to support preserving original header cases. |
432 | /// |
433 | /// Currently, this will record the original cases received, and store them |
434 | /// in a private extension on the `Response`. It will also look for and use |
435 | /// such an extension in any provided `Request`. |
436 | /// |
437 | /// Since the relevant extension is still private, there is no way to |
438 | /// interact with the original cases. The only effect this can have now is |
439 | /// to forward the cases in a proxy-like fashion. |
440 | /// |
441 | /// Default is false. |
442 | pub fn preserve_header_case(&mut self, enabled: bool) -> &mut Builder { |
443 | self.h1_preserve_header_case = enabled; |
444 | self |
445 | } |
446 | |
447 | /// Set the maximum number of headers. |
448 | /// |
449 | /// When a response is received, the parser will reserve a buffer to store headers for optimal |
450 | /// performance. |
451 | /// |
452 | /// If client receives more headers than the buffer size, the error "message header too large" |
453 | /// is returned. |
454 | /// |
455 | /// Note that headers is allocated on the stack by default, which has higher performance. After |
456 | /// setting this value, headers will be allocated in heap memory, that is, heap memory |
457 | /// allocation will occur for each response, and there will be a performance drop of about 5%. |
458 | /// |
459 | /// Default is 100. |
460 | pub fn max_headers(&mut self, val: usize) -> &mut Self { |
461 | self.h1_max_headers = Some(val); |
462 | self |
463 | } |
464 | |
465 | /// Set whether to support preserving original header order. |
466 | /// |
467 | /// Currently, this will record the order in which headers are received, and store this |
468 | /// ordering in a private extension on the `Response`. It will also look for and use |
469 | /// such an extension in any provided `Request`. |
470 | /// |
471 | /// Default is false. |
472 | #[cfg (feature = "ffi" )] |
473 | pub fn preserve_header_order(&mut self, enabled: bool) -> &mut Builder { |
474 | self.h1_preserve_header_order = enabled; |
475 | self |
476 | } |
477 | |
478 | /// Sets the exact size of the read buffer to *always* use. |
479 | /// |
480 | /// Note that setting this option unsets the `max_buf_size` option. |
481 | /// |
482 | /// Default is an adaptive read buffer. |
483 | pub fn read_buf_exact_size(&mut self, sz: Option<usize>) -> &mut Builder { |
484 | self.h1_read_buf_exact_size = sz; |
485 | self.h1_max_buf_size = None; |
486 | self |
487 | } |
488 | |
489 | /// Set the maximum buffer size for the connection. |
490 | /// |
491 | /// Default is ~400kb. |
492 | /// |
493 | /// Note that setting this option unsets the `read_exact_buf_size` option. |
494 | /// |
495 | /// # Panics |
496 | /// |
497 | /// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum. |
498 | pub fn max_buf_size(&mut self, max: usize) -> &mut Self { |
499 | assert!( |
500 | max >= proto::h1::MINIMUM_MAX_BUFFER_SIZE, |
501 | "the max_buf_size cannot be smaller than the minimum that h1 specifies." |
502 | ); |
503 | |
504 | self.h1_max_buf_size = Some(max); |
505 | self.h1_read_buf_exact_size = None; |
506 | self |
507 | } |
508 | |
509 | /// Constructs a connection with the configured options and IO. |
510 | /// See [`client::conn`](crate::client::conn) for more. |
511 | /// |
512 | /// Note, if [`Connection`] is not `await`-ed, [`SendRequest`] will |
513 | /// do nothing. |
514 | pub fn handshake<T, B>( |
515 | &self, |
516 | io: T, |
517 | ) -> impl Future<Output = crate::Result<(SendRequest<B>, Connection<T, B>)>> |
518 | where |
519 | T: Read + Write + Unpin, |
520 | B: Body + 'static, |
521 | B::Data: Send, |
522 | B::Error: Into<Box<dyn StdError + Send + Sync>>, |
523 | { |
524 | let opts = self.clone(); |
525 | |
526 | async move { |
527 | trace!("client handshake HTTP/1" ); |
528 | |
529 | let (tx, rx) = dispatch::channel(); |
530 | let mut conn = proto::Conn::new(io); |
531 | conn.set_h1_parser_config(opts.h1_parser_config); |
532 | if let Some(writev) = opts.h1_writev { |
533 | if writev { |
534 | conn.set_write_strategy_queue(); |
535 | } else { |
536 | conn.set_write_strategy_flatten(); |
537 | } |
538 | } |
539 | if opts.h1_title_case_headers { |
540 | conn.set_title_case_headers(); |
541 | } |
542 | if opts.h1_preserve_header_case { |
543 | conn.set_preserve_header_case(); |
544 | } |
545 | if let Some(max_headers) = opts.h1_max_headers { |
546 | conn.set_http1_max_headers(max_headers); |
547 | } |
548 | #[cfg (feature = "ffi" )] |
549 | if opts.h1_preserve_header_order { |
550 | conn.set_preserve_header_order(); |
551 | } |
552 | |
553 | if opts.h09_responses { |
554 | conn.set_h09_responses(); |
555 | } |
556 | |
557 | if let Some(sz) = opts.h1_read_buf_exact_size { |
558 | conn.set_read_buf_exact_size(sz); |
559 | } |
560 | if let Some(max) = opts.h1_max_buf_size { |
561 | conn.set_max_buf_size(max); |
562 | } |
563 | let cd = proto::h1::dispatch::Client::new(rx); |
564 | let proto = proto::h1::Dispatcher::new(cd, conn); |
565 | |
566 | Ok((SendRequest { dispatch: tx }, Connection { inner: proto })) |
567 | } |
568 | } |
569 | } |
570 | |
571 | mod upgrades { |
572 | use crate::upgrade::Upgraded; |
573 | |
574 | use super::*; |
575 | |
576 | // A future binding a connection with a Service with Upgrade support. |
577 | // |
578 | // This type is unnameable outside the crate. |
579 | #[must_use = "futures do nothing unless polled" ] |
580 | #[allow (missing_debug_implementations)] |
581 | pub struct UpgradeableConnection<T, B> |
582 | where |
583 | T: Read + Write + Unpin + Send + 'static, |
584 | B: Body + 'static, |
585 | B::Error: Into<Box<dyn StdError + Send + Sync>>, |
586 | { |
587 | pub(super) inner: Option<Connection<T, B>>, |
588 | } |
589 | |
590 | impl<I, B> Future for UpgradeableConnection<I, B> |
591 | where |
592 | I: Read + Write + Unpin + Send + 'static, |
593 | B: Body + 'static, |
594 | B::Data: Send, |
595 | B::Error: Into<Box<dyn StdError + Send + Sync>>, |
596 | { |
597 | type Output = crate::Result<()>; |
598 | |
599 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
600 | match ready!(Pin::new(&mut self.inner.as_mut().unwrap().inner).poll(cx)) { |
601 | Ok(proto::Dispatched::Shutdown) => Poll::Ready(Ok(())), |
602 | Ok(proto::Dispatched::Upgrade(pending)) => { |
603 | let Parts { io, read_buf } = self.inner.take().unwrap().into_parts(); |
604 | pending.fulfill(Upgraded::new(io, read_buf)); |
605 | Poll::Ready(Ok(())) |
606 | } |
607 | Err(e) => Poll::Ready(Err(e)), |
608 | } |
609 | } |
610 | } |
611 | } |
612 | |