1 | use std::fmt; |
2 | use std::future::Future; |
3 | use std::pin::Pin; |
4 | use std::sync::Arc; |
5 | |
6 | #[cfg (all(feature = "server" , any(feature = "http1" , feature = "http2" )))] |
7 | use crate::body::Body; |
8 | #[cfg (feature = "server" )] |
9 | use crate::body::HttpBody; |
10 | #[cfg (all(feature = "http2" , feature = "server" ))] |
11 | use crate::proto::h2::server::H2Stream; |
12 | use crate::rt::Executor; |
13 | #[cfg (all(feature = "server" , any(feature = "http1" , feature = "http2" )))] |
14 | use crate::server::server::{new_svc::NewSvcTask, Watcher}; |
15 | #[cfg (all(feature = "server" , any(feature = "http1" , feature = "http2" )))] |
16 | use crate::service::HttpService; |
17 | |
/// Something that can be handed an [`H2Stream`] future for execution.
///
/// Implemented in this file for [`Exec`] and for any cloneable
/// `Executor<H2Stream<F, B>>`.
#[cfg(feature = "server")]
pub trait ConnStreamExec<F, B>: Clone
where
    B: HttpBody,
{
    /// Passes the stream future `fut` to this executor.
    fn execute_h2stream(&mut self, fut: H2Stream<F, B>);
}
22 | |
/// Something that can be handed a [`NewSvcTask`] future for execution.
///
/// Implemented in this file for [`Exec`] and for any cloneable
/// `Executor<NewSvcTask<I, N, S, E, W>>`.
#[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))]
pub trait NewSvcExec<I, N, S, E, W>: Clone
where
    S: HttpService<Body>,
    W: Watcher<I, S, E>,
{
    /// Passes the new-service task `fut` to this executor.
    fn execute_new_svc(&mut self, fut: NewSvcTask<I, N, S, E, W>);
}
27 | |
28 | pub(crate) type BoxSendFuture = Pin<Box<dyn Future<Output = ()> + Send>>; |
29 | |
30 | // Either the user provides an executor for background tasks, or we use |
31 | // `tokio::spawn`. |
32 | #[derive (Clone)] |
33 | pub enum Exec { |
34 | Default, |
35 | Executor(Arc<dyn Executor<BoxSendFuture> + Send + Sync>), |
36 | } |
37 | |
38 | // ===== impl Exec ===== |
39 | |
40 | impl Exec { |
41 | pub(crate) fn execute<F>(&self, fut: F) |
42 | where |
43 | F: Future<Output = ()> + Send + 'static, |
44 | { |
45 | match *self { |
46 | Exec::Default => { |
47 | #[cfg (feature = "tcp" )] |
48 | { |
49 | tokio::task::spawn(future:fut); |
50 | } |
51 | #[cfg (not(feature = "tcp" ))] |
52 | { |
53 | // If no runtime, we need an executor! |
54 | panic!("executor must be set" ) |
55 | } |
56 | } |
57 | Exec::Executor(ref e: &Arc>> + Sync + Send>) => { |
58 | e.execute(fut:Box::pin(fut)); |
59 | } |
60 | } |
61 | } |
62 | } |
63 | |
64 | impl fmt::Debug for Exec { |
65 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
66 | f.debug_struct(name:"Exec" ).finish() |
67 | } |
68 | } |
69 | |
/// `Exec` runs an `H2Stream` by forwarding it to its own `execute` method.
#[cfg(feature = "server")]
impl<F, B> ConnStreamExec<F, B> for Exec
where
    B: HttpBody,
    H2Stream<F, B>: Future<Output = ()> + Send + 'static,
{
    fn execute_h2stream(&mut self, fut: H2Stream<F, B>) {
        // Call the inherent method by path; only shared access is needed.
        Exec::execute(self, fut);
    }
}
80 | |
/// `Exec` runs a `NewSvcTask` by forwarding it to its own `execute` method.
#[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))]
impl<I, N, S, E, W> NewSvcExec<I, N, S, E, W> for Exec
where
    S: HttpService<Body>,
    W: Watcher<I, S, E>,
    NewSvcTask<I, N, S, E, W>: Future<Output = ()> + Send + 'static,
{
    fn execute_new_svc(&mut self, fut: NewSvcTask<I, N, S, E, W>) {
        // Call the inherent method by path; only shared access is needed.
        Exec::execute(self, fut);
    }
}
92 | |
93 | // ==== impl Executor ===== |
94 | |
/// Any cloneable `Executor` of `H2Stream<F, B>` can serve as a
/// `ConnStreamExec` by delegating to `Executor::execute`.
#[cfg(feature = "server")]
impl<E, F, B> ConnStreamExec<F, B> for E
where
    B: HttpBody,
    H2Stream<F, B>: Future<Output = ()>,
    E: Executor<H2Stream<F, B>> + Clone,
{
    fn execute_h2stream(&mut self, fut: H2Stream<F, B>) {
        // Name the trait explicitly so it is clear which `execute` runs.
        <E as Executor<H2Stream<F, B>>>::execute(self, fut);
    }
}
106 | |
/// Any cloneable `Executor` of `NewSvcTask<I, N, S, E, W>` can serve as a
/// `NewSvcExec` by delegating to `Executor::execute`.
#[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))]
impl<I, N, S, E, W> NewSvcExec<I, N, S, E, W> for E
where
    S: HttpService<Body>,
    W: Watcher<I, S, E>,
    NewSvcTask<I, N, S, E, W>: Future<Output = ()>,
    E: Executor<NewSvcTask<I, N, S, E, W>> + Clone,
{
    fn execute_new_svc(&mut self, fut: NewSvcTask<I, N, S, E, W>) {
        // Name the trait explicitly so it is clear which `execute` runs.
        <E as Executor<NewSvcTask<I, N, S, E, W>>>::execute(self, fut);
    }
}
119 | |
// If `http2` is not enabled, we just have a stub here, so that the trait
// bounds that *would* have been needed are still checked. Why?
//
// Because enabling `http2` shouldn't suddenly add new trait bounds that cause
// a compilation error.
125 | #[cfg (not(feature = "http2" ))] |
126 | #[allow (missing_debug_implementations)] |
127 | pub struct H2Stream<F, B>(std::marker::PhantomData<(F, B)>); |
128 | |
129 | #[cfg (not(feature = "http2" ))] |
130 | impl<F, B, E> Future for H2Stream<F, B> |
131 | where |
132 | F: Future<Output = Result<http::Response<B>, E>>, |
133 | B: crate::body::HttpBody, |
134 | B::Error: Into<Box<dyn std::error::Error + Send + Sync>>, |
135 | E: Into<Box<dyn std::error::Error + Send + Sync>>, |
136 | { |
137 | type Output = (); |
138 | |
139 | fn poll( |
140 | self: Pin<&mut Self>, |
141 | _cx: &mut std::task::Context<'_>, |
142 | ) -> std::task::Poll<Self::Output> { |
143 | unreachable!() |
144 | } |
145 | } |
146 | |