1use core::pin::Pin;
2use futures_core::future::{FusedFuture, Future};
3use futures_core::ready;
4use futures_core::stream::{FusedStream, Stream};
5use futures_core::task::{Context, Poll};
6#[cfg(feature = "sink")]
7use futures_sink::Sink;
8use pin_project_lite::pin_project;
9
10pin_project! {
11 #[project = FlattenProj]
12 #[derive(Debug)]
13 pub enum Flatten<Fut1, Fut2> {
14 First { #[pin] f: Fut1 },
15 Second { #[pin] f: Fut2 },
16 Empty,
17 }
18}
19
20impl<Fut1, Fut2> Flatten<Fut1, Fut2> {
21 pub(crate) fn new(future: Fut1) -> Self {
22 Self::First { f: future }
23 }
24}
25
26impl<Fut> FusedFuture for Flatten<Fut, Fut::Output>
27where
28 Fut: Future,
29 Fut::Output: Future,
30{
31 fn is_terminated(&self) -> bool {
32 match self {
33 Self::Empty => true,
34 _ => false,
35 }
36 }
37}
38
39impl<Fut> Future for Flatten<Fut, Fut::Output>
40where
41 Fut: Future,
42 Fut::Output: Future,
43{
44 type Output = <Fut::Output as Future>::Output;
45
46 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
47 Poll::Ready(loop {
48 match self.as_mut().project() {
49 FlattenProj::First { f: Pin<&mut Fut> } => {
50 let f: ::Output = ready!(f.poll(cx));
51 self.set(Self::Second { f });
52 }
53 FlattenProj::Second { f: Pin<&mut ::Output> } => {
54 let output: <::Output as Future>::Output = ready!(f.poll(cx));
55 self.set(Self::Empty);
56 break output;
57 }
58 FlattenProj::Empty => panic!("Flatten polled after completion"),
59 }
60 })
61 }
62}
63
64impl<Fut> FusedStream for Flatten<Fut, Fut::Output>
65where
66 Fut: Future,
67 Fut::Output: Stream,
68{
69 fn is_terminated(&self) -> bool {
70 match self {
71 Self::Empty => true,
72 _ => false,
73 }
74 }
75}
76
77impl<Fut> Stream for Flatten<Fut, Fut::Output>
78where
79 Fut: Future,
80 Fut::Output: Stream,
81{
82 type Item = <Fut::Output as Stream>::Item;
83
84 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
85 Poll::Ready(loop {
86 match self.as_mut().project() {
87 FlattenProj::First { f: Pin<&mut Fut> } => {
88 let f: ::Output = ready!(f.poll(cx));
89 self.set(Self::Second { f });
90 }
91 FlattenProj::Second { f: Pin<&mut ::Output> } => {
92 let output: Option<<::Output as Stream>::Item> = ready!(f.poll_next(cx));
93 if output.is_none() {
94 self.set(Self::Empty);
95 }
96 break output;
97 }
98 FlattenProj::Empty => break None,
99 }
100 })
101 }
102}
103
#[cfg(feature = "sink")]
impl<Fut, Item> Sink<Item> for Flatten<Fut, Fut::Output>
where
    Fut: Future,
    Fut::Output: Sink<Item>,
{
    /// Errors are those of the sink that the outer future resolves to.
    type Error = <Fut::Output as Sink<Item>>::Error;

    /// Drives the outer future to completion if necessary, then asks the
    /// resulting sink whether it is ready to accept an item.
    ///
    /// # Panics
    ///
    /// Panics if called in the `Empty` state (after `poll_close` completed).
    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        loop {
            match self.as_mut().project() {
                FlattenProj::First { f: outer } => {
                    // Returns `Poll::Pending` early while the outer future
                    // has not produced the sink yet.
                    let sink = ready!(outer.poll(cx));
                    self.set(Self::Second { f: sink });
                }
                FlattenProj::Second { f: sink } => return sink.poll_ready(cx),
                FlattenProj::Empty => panic!("poll_ready called after eof"),
            }
        }
    }

    /// Hands `item` to the inner sink.
    ///
    /// # Panics
    ///
    /// Panics if the outer future has not resolved yet (state `First`) or if
    /// the sink has already been closed (state `Empty`).
    fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
        let sink = match self.project() {
            FlattenProj::Second { f } => f,
            FlattenProj::First { .. } => panic!("poll_ready not called first"),
            FlattenProj::Empty => panic!("start_send called after eof"),
        };
        sink.start_send(item)
    }

    /// Flushes the inner sink; while still in the `First` state there is no
    /// sink yet, so this reports success immediately.
    ///
    /// # Panics
    ///
    /// Panics if called in the `Empty` state.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        match self.project() {
            FlattenProj::Second { f: sink } => sink.poll_flush(cx),
            FlattenProj::First { .. } => Poll::Ready(Ok(())),
            FlattenProj::Empty => panic!("poll_flush called after eof"),
        }
    }

    /// Closes the inner sink if one exists, otherwise reports success.
    ///
    /// Whenever the result is ready (success or error) the state becomes
    /// `Empty`, dropping whatever was held before.
    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let outcome = if let FlattenProj::Second { f: sink } = self.as_mut().project() {
            sink.poll_close(cx)
        } else {
            Poll::Ready(Ok(()))
        };
        // Still closing: keep the sink around for the next poll.
        if outcome.is_pending() {
            return outcome;
        }
        self.set(Self::Empty);
        outcome
    }
}
154