1 | //! Utilities used to interact with the Tower ecosystem. |
2 | //! |
//! This module provides `Connect`, which hooks into the Tower ecosystem.
4 | |
5 | use std::error::Error as StdError; |
6 | use std::future::Future; |
7 | use std::marker::PhantomData; |
8 | |
9 | use tracing::debug; |
10 | |
11 | use super::conn::{Builder, SendRequest}; |
12 | use crate::{ |
13 | body::HttpBody, |
14 | common::{task, Pin, Poll}, |
15 | service::{MakeConnection, Service}, |
16 | }; |
17 | |
18 | /// Creates a connection via `SendRequest`. |
19 | /// |
20 | /// This accepts a `hyper::client::conn::Builder` and provides |
21 | /// a `MakeService` implementation to create connections from some |
22 | /// target `T`. |
23 | #[derive (Debug)] |
24 | pub struct Connect<C, B, T> { |
25 | inner: C, |
26 | builder: Builder, |
27 | _pd: PhantomData<fn(T, B)>, |
28 | } |
29 | |
30 | impl<C, B, T> Connect<C, B, T> { |
31 | /// Create a new `Connect` with some inner connector `C` and a connection |
32 | /// builder. |
33 | pub fn new(inner: C, builder: Builder) -> Self { |
34 | Self { |
35 | inner, |
36 | builder, |
37 | _pd: PhantomData, |
38 | } |
39 | } |
40 | } |
41 | |
42 | impl<C, B, T> Service<T> for Connect<C, B, T> |
43 | where |
44 | C: MakeConnection<T>, |
45 | C::Connection: Unpin + Send + 'static, |
46 | C::Future: Send + 'static, |
47 | C::Error: Into<Box<dyn StdError + Send + Sync>> + Send, |
48 | B: HttpBody + Unpin + Send + 'static, |
49 | B::Data: Send + Unpin, |
50 | B::Error: Into<Box<dyn StdError + Send + Sync>>, |
51 | { |
52 | type Response = SendRequest<B>; |
53 | type Error = crate::Error; |
54 | type Future = |
55 | Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>; |
56 | |
57 | fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> { |
58 | self.inner |
59 | .poll_ready(cx) |
60 | .map_err(|e| crate::Error::new(crate::error::Kind::Connect).with(e.into())) |
61 | } |
62 | |
63 | fn call(&mut self, req: T) -> Self::Future { |
64 | let builder = self.builder.clone(); |
65 | let io = self.inner.make_connection(req); |
66 | |
67 | let fut = async move { |
68 | match io.await { |
69 | Ok(io) => match builder.handshake(io).await { |
70 | Ok((sr, conn)) => { |
71 | builder.exec.execute(async move { |
72 | if let Err(e) = conn.await { |
73 | debug!("connection error: {:?}" , e); |
74 | } |
75 | }); |
76 | Ok(sr) |
77 | } |
78 | Err(e) => Err(e), |
79 | }, |
80 | Err(e) => { |
81 | let err = crate::Error::new(crate::error::Kind::Connect).with(e.into()); |
82 | Err(err) |
83 | } |
84 | } |
85 | }; |
86 | |
87 | Box::pin(fut) |
88 | } |
89 | } |
90 | |