1 | use core::pin::Pin; |
2 | use futures_core::future::{FusedFuture, Future}; |
3 | use futures_core::ready; |
4 | use futures_core::stream::{FusedStream, Stream}; |
5 | use futures_core::task::{Context, Poll}; |
6 | #[cfg (feature = "sink" )] |
7 | use futures_sink::Sink; |
8 | use pin_project_lite::pin_project; |
9 | |
10 | pin_project! { |
11 | #[project = FlattenProj] |
12 | #[derive(Debug)] |
13 | pub enum Flatten<Fut1, Fut2> { |
14 | First { #[pin] f: Fut1 }, |
15 | Second { #[pin] f: Fut2 }, |
16 | Empty, |
17 | } |
18 | } |
19 | |
20 | impl<Fut1, Fut2> Flatten<Fut1, Fut2> { |
21 | pub(crate) fn new(future: Fut1) -> Self { |
22 | Self::First { f: future } |
23 | } |
24 | } |
25 | |
26 | impl<Fut> FusedFuture for Flatten<Fut, Fut::Output> |
27 | where |
28 | Fut: Future, |
29 | Fut::Output: Future, |
30 | { |
31 | fn is_terminated(&self) -> bool { |
32 | match self { |
33 | Self::Empty => true, |
34 | _ => false, |
35 | } |
36 | } |
37 | } |
38 | |
39 | impl<Fut> Future for Flatten<Fut, Fut::Output> |
40 | where |
41 | Fut: Future, |
42 | Fut::Output: Future, |
43 | { |
44 | type Output = <Fut::Output as Future>::Output; |
45 | |
46 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
47 | Poll::Ready(loop { |
48 | match self.as_mut().project() { |
49 | FlattenProj::First { f: Pin<&mut Fut> } => { |
50 | let f: ::Output = ready!(f.poll(cx)); |
51 | self.set(Self::Second { f }); |
52 | } |
53 | FlattenProj::Second { f: Pin<&mut ::Output> } => { |
54 | let output: <::Output as Future>::Output = ready!(f.poll(cx)); |
55 | self.set(Self::Empty); |
56 | break output; |
57 | } |
58 | FlattenProj::Empty => panic!("Flatten polled after completion" ), |
59 | } |
60 | }) |
61 | } |
62 | } |
63 | |
64 | impl<Fut> FusedStream for Flatten<Fut, Fut::Output> |
65 | where |
66 | Fut: Future, |
67 | Fut::Output: Stream, |
68 | { |
69 | fn is_terminated(&self) -> bool { |
70 | match self { |
71 | Self::Empty => true, |
72 | _ => false, |
73 | } |
74 | } |
75 | } |
76 | |
77 | impl<Fut> Stream for Flatten<Fut, Fut::Output> |
78 | where |
79 | Fut: Future, |
80 | Fut::Output: Stream, |
81 | { |
82 | type Item = <Fut::Output as Stream>::Item; |
83 | |
84 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
85 | Poll::Ready(loop { |
86 | match self.as_mut().project() { |
87 | FlattenProj::First { f: Pin<&mut Fut> } => { |
88 | let f: ::Output = ready!(f.poll(cx)); |
89 | self.set(Self::Second { f }); |
90 | } |
91 | FlattenProj::Second { f: Pin<&mut ::Output> } => { |
92 | let output: Option<<::Output as Stream>::Item> = ready!(f.poll_next(cx)); |
93 | if output.is_none() { |
94 | self.set(Self::Empty); |
95 | } |
96 | break output; |
97 | } |
98 | FlattenProj::Empty => break None, |
99 | } |
100 | }) |
101 | } |
102 | } |
103 | |
104 | #[cfg (feature = "sink" )] |
105 | impl<Fut, Item> Sink<Item> for Flatten<Fut, Fut::Output> |
106 | where |
107 | Fut: Future, |
108 | Fut::Output: Sink<Item>, |
109 | { |
110 | type Error = <Fut::Output as Sink<Item>>::Error; |
111 | |
112 | fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
113 | Poll::Ready(loop { |
114 | match self.as_mut().project() { |
115 | FlattenProj::First { f } => { |
116 | let f = ready!(f.poll(cx)); |
117 | self.set(Self::Second { f }); |
118 | } |
119 | FlattenProj::Second { f } => { |
120 | break ready!(f.poll_ready(cx)); |
121 | } |
122 | FlattenProj::Empty => panic!("poll_ready called after eof" ), |
123 | } |
124 | }) |
125 | } |
126 | |
127 | fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { |
128 | match self.project() { |
129 | FlattenProj::First { .. } => panic!("poll_ready not called first" ), |
130 | FlattenProj::Second { f } => f.start_send(item), |
131 | FlattenProj::Empty => panic!("start_send called after eof" ), |
132 | } |
133 | } |
134 | |
135 | fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
136 | match self.project() { |
137 | FlattenProj::First { .. } => Poll::Ready(Ok(())), |
138 | FlattenProj::Second { f } => f.poll_flush(cx), |
139 | FlattenProj::Empty => panic!("poll_flush called after eof" ), |
140 | } |
141 | } |
142 | |
143 | fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
144 | let res = match self.as_mut().project() { |
145 | FlattenProj::Second { f } => f.poll_close(cx), |
146 | _ => Poll::Ready(Ok(())), |
147 | }; |
148 | if res.is_ready() { |
149 | self.set(Self::Empty); |
150 | } |
151 | res |
152 | } |
153 | } |
154 | |