1 | use futures_core::{ready, Stream}; |
2 | use pin_project_lite::pin_project; |
3 | use std::{ |
4 | fmt, |
5 | future::Future, |
6 | pin::Pin, |
7 | task::{Context, Poll}, |
8 | }; |
9 | use tower_service::Service; |
10 | |
pin_project! {
    /// Shared driver state used by the [`ServiceExt::call_all`] combinator.
    ///
    /// This type implements [`Stream`] (not [`Future`]): each poll pulls requests from
    /// `stream`, dispatches them to `service`, and yields the responses produced by the
    /// futures stored in `queue`.
    pub(crate) struct CallAll<Svc, S, Q>
    where
        S: Stream,
    {
        // The wrapped service. Held in an `Option` so it can be moved out by
        // `into_inner` / `take_service`; polling after that panics.
        service: Option<Svc>,
        // Source of requests. Pinned because it is polled in place.
        #[pin]
        stream: S,
        // Holds the response futures returned by `service.call`; driven through `Drive`.
        queue: Q,
        // Set once `stream` is exhausted or `poll_ready` has failed; no further
        // requests are dispatched after that.
        eof: bool,
        // A request already taken from `stream` that is waiting for `service` to
        // become ready.
        curr_req: Option<S::Item>
    }
}
25 | |
26 | impl<Svc, S, Q> fmt::Debug for CallAll<Svc, S, Q> |
27 | where |
28 | Svc: fmt::Debug, |
29 | S: Stream + fmt::Debug, |
30 | { |
31 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
32 | f&mut DebugStruct<'_, '_>.debug_struct("CallAll" ) |
33 | .field("service" , &self.service) |
34 | .field("stream" , &self.stream) |
35 | .field(name:"eof" , &self.eof) |
36 | .finish() |
37 | } |
38 | } |
39 | |
/// A container of in-flight futures of type `F` that can be polled for completed outputs.
///
/// `CallAll` pushes every future returned by the service into a `Drive` implementation
/// and polls it to obtain responses.
pub(crate) trait Drive<F>
where
    F: Future,
{
    /// Returns `true` when no futures are currently stored.
    fn is_empty(&self) -> bool;

    /// Adds `future` to the set of futures being driven.
    fn push(&mut self, future: F);

    /// Polls the stored futures for the next output.
    ///
    /// `CallAll` treats `Poll::Ready(Some(_))` as a finished output and falls through
    /// to its own bookkeeping on `Poll::Ready(None)` or `Poll::Pending`.
    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Option<F::Output>>;
}
47 | |
48 | impl<Svc, S, Q> CallAll<Svc, S, Q> |
49 | where |
50 | Svc: Service<S::Item>, |
51 | S: Stream, |
52 | Q: Drive<Svc::Future>, |
53 | { |
54 | pub(crate) const fn new(service: Svc, stream: S, queue: Q) -> CallAll<Svc, S, Q> { |
55 | CallAll { |
56 | service: Some(service), |
57 | stream, |
58 | queue, |
59 | eof: false, |
60 | curr_req: None, |
61 | } |
62 | } |
63 | |
64 | /// Extract the wrapped [`Service`]. |
65 | pub(crate) fn into_inner(mut self) -> Svc { |
66 | self.service.take().expect("Service already taken" ) |
67 | } |
68 | |
69 | /// Extract the wrapped [`Service`]. |
70 | pub(crate) fn take_service(self: Pin<&mut Self>) -> Svc { |
71 | self.project() |
72 | .service |
73 | .take() |
74 | .expect("Service already taken" ) |
75 | } |
76 | |
77 | pub(crate) fn unordered(mut self) -> super::CallAllUnordered<Svc, S> { |
78 | assert!(self.queue.is_empty() && !self.eof); |
79 | |
80 | super::CallAllUnordered::new(self.service.take().unwrap(), self.stream) |
81 | } |
82 | } |
83 | |
84 | impl<Svc, S, Q> Stream for CallAll<Svc, S, Q> |
85 | where |
86 | Svc: Service<S::Item>, |
87 | S: Stream, |
88 | Q: Drive<Svc::Future>, |
89 | { |
90 | type Item = Result<Svc::Response, Svc::Error>; |
91 | |
92 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
93 | let mut this = self.project(); |
94 | |
95 | loop { |
96 | // First, see if we have any responses to yield |
97 | if let Poll::Ready(r) = this.queue.poll(cx) { |
98 | if let Some(rsp) = r.transpose()? { |
99 | return Poll::Ready(Some(Ok(rsp))); |
100 | } |
101 | } |
102 | |
103 | // If there are no more requests coming, check if we're done |
104 | if *this.eof { |
105 | if this.queue.is_empty() { |
106 | return Poll::Ready(None); |
107 | } else { |
108 | return Poll::Pending; |
109 | } |
110 | } |
111 | |
112 | // If not done, and we don't have a stored request, gather the next request from the |
113 | // stream (if there is one), or return `Pending` if the stream is not ready. |
114 | if this.curr_req.is_none() { |
115 | *this.curr_req = match ready!(this.stream.as_mut().poll_next(cx)) { |
116 | Some(next_req) => Some(next_req), |
117 | None => { |
118 | // Mark that there will be no more requests. |
119 | *this.eof = true; |
120 | continue; |
121 | } |
122 | }; |
123 | } |
124 | |
125 | // Then, see that the service is ready for another request |
126 | let svc = this |
127 | .service |
128 | .as_mut() |
129 | .expect("Using CallAll after extracting inner Service" ); |
130 | |
131 | if let Err(e) = ready!(svc.poll_ready(cx)) { |
132 | // Set eof to prevent the service from being called again after a `poll_ready` error |
133 | *this.eof = true; |
134 | return Poll::Ready(Some(Err(e))); |
135 | } |
136 | |
137 | // Unwrap: The check above always sets `this.curr_req` if none. |
138 | this.queue.push(svc.call(this.curr_req.take().unwrap())); |
139 | } |
140 | } |
141 | } |
142 | |