1 | use core::pin::Pin; |
2 | use futures_core::future::{FusedFuture, Future, TryFuture}; |
3 | use futures_core::ready; |
4 | use futures_core::stream::{FusedStream, Stream, TryStream}; |
5 | use futures_core::task::{Context, Poll}; |
6 | #[cfg (feature = "sink" )] |
7 | use futures_sink::Sink; |
8 | use pin_project_lite::pin_project; |
9 | |
10 | pin_project! { |
11 | #[project = TryFlattenProj] |
12 | #[derive(Debug)] |
13 | pub enum TryFlatten<Fut1, Fut2> { |
14 | First { #[pin] f: Fut1 }, |
15 | Second { #[pin] f: Fut2 }, |
16 | Empty, |
17 | } |
18 | } |
19 | |
20 | impl<Fut1, Fut2> TryFlatten<Fut1, Fut2> { |
21 | pub(crate) fn new(future: Fut1) -> Self { |
22 | Self::First { f: future } |
23 | } |
24 | } |
25 | |
26 | impl<Fut> FusedFuture for TryFlatten<Fut, Fut::Ok> |
27 | where |
28 | Fut: TryFuture, |
29 | Fut::Ok: TryFuture<Error = Fut::Error>, |
30 | { |
31 | fn is_terminated(&self) -> bool { |
32 | match self { |
33 | Self::Empty => true, |
34 | _ => false, |
35 | } |
36 | } |
37 | } |
38 | |
39 | impl<Fut> Future for TryFlatten<Fut, Fut::Ok> |
40 | where |
41 | Fut: TryFuture, |
42 | Fut::Ok: TryFuture<Error = Fut::Error>, |
43 | { |
44 | type Output = Result<<Fut::Ok as TryFuture>::Ok, Fut::Error>; |
45 | |
46 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
47 | Poll::Ready(loop { |
48 | match self.as_mut().project() { |
49 | TryFlattenProj::First { f: Pin<&mut Fut> } => match ready!(f.try_poll(cx)) { |
50 | Ok(f: ::Ok) => self.set(Self::Second { f }), |
51 | Err(e: ::Error) => { |
52 | self.set(Self::Empty); |
53 | break Err(e); |
54 | } |
55 | }, |
56 | TryFlattenProj::Second { f: Pin<&mut ::Ok> } => { |
57 | let output: Result<<::Ok as TryFuture>::Ok, …> = ready!(f.try_poll(cx)); |
58 | self.set(Self::Empty); |
59 | break output; |
60 | } |
61 | TryFlattenProj::Empty => panic!("TryFlatten polled after completion" ), |
62 | } |
63 | }) |
64 | } |
65 | } |
66 | |
67 | impl<Fut> FusedStream for TryFlatten<Fut, Fut::Ok> |
68 | where |
69 | Fut: TryFuture, |
70 | Fut::Ok: TryStream<Error = Fut::Error>, |
71 | { |
72 | fn is_terminated(&self) -> bool { |
73 | match self { |
74 | Self::Empty => true, |
75 | _ => false, |
76 | } |
77 | } |
78 | } |
79 | |
80 | impl<Fut> Stream for TryFlatten<Fut, Fut::Ok> |
81 | where |
82 | Fut: TryFuture, |
83 | Fut::Ok: TryStream<Error = Fut::Error>, |
84 | { |
85 | type Item = Result<<Fut::Ok as TryStream>::Ok, Fut::Error>; |
86 | |
87 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
88 | Poll::Ready(loop { |
89 | match self.as_mut().project() { |
90 | TryFlattenProj::First { f } => match ready!(f.try_poll(cx)) { |
91 | Ok(f) => self.set(Self::Second { f }), |
92 | Err(e) => { |
93 | self.set(Self::Empty); |
94 | break Some(Err(e)); |
95 | } |
96 | }, |
97 | TryFlattenProj::Second { f } => { |
98 | let output = ready!(f.try_poll_next(cx)); |
99 | if output.is_none() { |
100 | self.set(Self::Empty); |
101 | } |
102 | break output; |
103 | } |
104 | TryFlattenProj::Empty => break None, |
105 | } |
106 | }) |
107 | } |
108 | } |
109 | |
/// Sink flavour of the combinator: waits for the outer future, then forwards
/// all sink operations to the sink it resolved to.
#[cfg(feature = "sink")]
impl<Fut, Item> Sink<Item> for TryFlatten<Fut, Fut::Ok>
where
    Fut: TryFuture,
    Fut::Ok: Sink<Item, Error = Fut::Error>,
{
    type Error = Fut::Error;

    /// Drives the outer future to completion if needed, then asks the inner
    /// sink whether it is ready.
    ///
    /// # Panics
    ///
    /// Panics if the combinator is already in the `Empty` state.
    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        loop {
            match self.as_mut().project() {
                TryFlattenProj::First { f: pending } => match ready!(pending.try_poll(cx)) {
                    // Store the sink and go around again so it is polled in this call.
                    Ok(sink) => self.set(Self::Second { f: sink }),
                    Err(err) => {
                        self.set(Self::Empty);
                        return Poll::Ready(Err(err));
                    }
                },
                TryFlattenProj::Second { f: sink } => return sink.poll_ready(cx),
                TryFlattenProj::Empty => panic!("poll_ready called after eof"),
            }
        }
    }

    /// Hands `item` to the inner sink.
    ///
    /// # Panics
    ///
    /// Panics if the inner sink is not available yet (`First`) or if the
    /// combinator is already `Empty`.
    fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
        match self.project() {
            TryFlattenProj::Second { f: sink } => sink.start_send(item),
            TryFlattenProj::First { .. } => panic!("poll_ready not called first"),
            TryFlattenProj::Empty => panic!("start_send called after eof"),
        }
    }

    /// Flushes the inner sink; reports success immediately while still in `First`.
    ///
    /// # Panics
    ///
    /// Panics if the combinator is already in the `Empty` state.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        match self.project() {
            TryFlattenProj::Second { f: sink } => sink.poll_flush(cx),
            TryFlattenProj::First { .. } => Poll::Ready(Ok(())),
            TryFlattenProj::Empty => panic!("poll_flush called after eof"),
        }
    }

    /// Closes the inner sink if there is one, otherwise reports success.
    ///
    /// Whenever the result is ready, including with an error, the combinator
    /// moves to `Empty`.
    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let outcome = if let TryFlattenProj::Second { f: sink } = self.as_mut().project() {
            sink.poll_close(cx)
        } else {
            // `First` and `Empty` have no inner sink to close.
            Poll::Ready(Ok(()))
        };
        if outcome.is_ready() {
            self.set(Self::Empty);
        }
        outcome
    }
}
163 | |