1 | //! [`Stream<Item = Request>`][stream] + [`Service<Request>`] => [`Stream<Item = Response>`][stream]. |
2 | //! |
3 | //! [`Service<Request>`]: crate::Service |
4 | //! [stream]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html |
5 | |
6 | use super::common; |
7 | use futures_core::Stream; |
8 | use futures_util::stream::FuturesUnordered; |
9 | use pin_project_lite::pin_project; |
10 | use std::{ |
11 | future::Future, |
12 | pin::Pin, |
13 | task::{Context, Poll}, |
14 | }; |
15 | use tower_service::Service; |
16 | |
17 | pin_project! { |
18 | /// A stream of responses received from the inner service in received order. |
19 | /// |
20 | /// Similar to [`CallAll`] except, instead of yielding responses in request order, |
21 | /// responses are returned as they are available. |
22 | /// |
23 | /// [`CallAll`]: crate::util::CallAll |
24 | #[derive (Debug)] |
25 | pub struct CallAllUnordered<Svc, S> |
26 | where |
27 | Svc: Service<S::Item>, |
28 | S: Stream, |
29 | { |
30 | #[pin] |
31 | inner: common::CallAll<Svc, S, FuturesUnordered<Svc::Future>>, |
32 | } |
33 | } |
34 | |
35 | impl<Svc, S> CallAllUnordered<Svc, S> |
36 | where |
37 | Svc: Service<S::Item>, |
38 | S: Stream, |
39 | { |
40 | /// Create new [`CallAllUnordered`] combinator. |
41 | /// |
42 | /// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html |
43 | pub fn new(service: Svc, stream: S) -> CallAllUnordered<Svc, S> { |
44 | CallAllUnordered { |
45 | inner: common::CallAll::new(service, stream, FuturesUnordered::new()), |
46 | } |
47 | } |
48 | |
49 | /// Extract the wrapped [`Service`]. |
50 | /// |
51 | /// # Panics |
52 | /// |
53 | /// Panics if [`take_service`] was already called. |
54 | /// |
55 | /// [`take_service`]: crate::util::CallAllUnordered::take_service |
56 | pub fn into_inner(self) -> Svc { |
57 | self.inner.into_inner() |
58 | } |
59 | |
60 | /// Extract the wrapped `Service`. |
61 | /// |
62 | /// This [`CallAllUnordered`] can no longer be used after this function has been called. |
63 | /// |
64 | /// # Panics |
65 | /// |
66 | /// Panics if [`take_service`] was already called. |
67 | /// |
68 | /// [`take_service`]: crate::util::CallAllUnordered::take_service |
69 | pub fn take_service(self: Pin<&mut Self>) -> Svc { |
70 | self.project().inner.take_service() |
71 | } |
72 | } |
73 | |
74 | impl<Svc, S> Stream for CallAllUnordered<Svc, S> |
75 | where |
76 | Svc: Service<S::Item>, |
77 | S: Stream, |
78 | { |
79 | type Item = Result<Svc::Response, Svc::Error>; |
80 | |
81 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
82 | self.project().inner.poll_next(cx) |
83 | } |
84 | } |
85 | |
86 | impl<F: Future> common::Drive<F> for FuturesUnordered<F> { |
87 | fn is_empty(&self) -> bool { |
88 | FuturesUnordered::is_empty(self) |
89 | } |
90 | |
91 | fn push(&mut self, future: F) { |
92 | FuturesUnordered::push(self, future) |
93 | } |
94 | |
95 | fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Option<F::Output>> { |
96 | Stream::poll_next(self:Pin::new(self), cx) |
97 | } |
98 | } |
99 | |