1use core::pin::Pin;
2use futures_core::future::{FusedFuture, Future, TryFuture};
3use futures_core::ready;
4use futures_core::stream::{FusedStream, Stream, TryStream};
5use futures_core::task::{Context, Poll};
6#[cfg(feature = "sink")]
7use futures_sink::Sink;
8use pin_project_lite::pin_project;
9
10pin_project! {
11 #[project = TryFlattenProj]
12 #[derive(Debug)]
13 pub enum TryFlatten<Fut1, Fut2> {
14 First { #[pin] f: Fut1 },
15 Second { #[pin] f: Fut2 },
16 Empty,
17 }
18}
19
20impl<Fut1, Fut2> TryFlatten<Fut1, Fut2> {
21 pub(crate) fn new(future: Fut1) -> Self {
22 Self::First { f: future }
23 }
24}
25
26impl<Fut> FusedFuture for TryFlatten<Fut, Fut::Ok>
27where
28 Fut: TryFuture,
29 Fut::Ok: TryFuture<Error = Fut::Error>,
30{
31 fn is_terminated(&self) -> bool {
32 match self {
33 Self::Empty => true,
34 _ => false,
35 }
36 }
37}
38
39impl<Fut> Future for TryFlatten<Fut, Fut::Ok>
40where
41 Fut: TryFuture,
42 Fut::Ok: TryFuture<Error = Fut::Error>,
43{
44 type Output = Result<<Fut::Ok as TryFuture>::Ok, Fut::Error>;
45
46 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
47 Poll::Ready(loop {
48 match self.as_mut().project() {
49 TryFlattenProj::First { f } => match ready!(f.try_poll(cx)) {
50 Ok(f) => self.set(Self::Second { f }),
51 Err(e) => {
52 self.set(Self::Empty);
53 break Err(e);
54 }
55 },
56 TryFlattenProj::Second { f } => {
57 let output = ready!(f.try_poll(cx));
58 self.set(Self::Empty);
59 break output;
60 }
61 TryFlattenProj::Empty => panic!("TryFlatten polled after completion"),
62 }
63 })
64 }
65}
66
67impl<Fut> FusedStream for TryFlatten<Fut, Fut::Ok>
68where
69 Fut: TryFuture,
70 Fut::Ok: TryStream<Error = Fut::Error>,
71{
72 fn is_terminated(&self) -> bool {
73 match self {
74 Self::Empty => true,
75 _ => false,
76 }
77 }
78}
79
80impl<Fut> Stream for TryFlatten<Fut, Fut::Ok>
81where
82 Fut: TryFuture,
83 Fut::Ok: TryStream<Error = Fut::Error>,
84{
85 type Item = Result<<Fut::Ok as TryStream>::Ok, Fut::Error>;
86
87 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
88 Poll::Ready(loop {
89 match self.as_mut().project() {
90 TryFlattenProj::First { f } => match ready!(f.try_poll(cx)) {
91 Ok(f) => self.set(Self::Second { f }),
92 Err(e) => {
93 self.set(Self::Empty);
94 break Some(Err(e));
95 }
96 },
97 TryFlattenProj::Second { f } => {
98 let output = ready!(f.try_poll_next(cx));
99 if output.is_none() {
100 self.set(Self::Empty);
101 }
102 break output;
103 }
104 TryFlattenProj::Empty => break None,
105 }
106 })
107 }
108}
109
110#[cfg(feature = "sink")]
111impl<Fut, Item> Sink<Item> for TryFlatten<Fut, Fut::Ok>
112where
113 Fut: TryFuture,
114 Fut::Ok: Sink<Item, Error = Fut::Error>,
115{
116 type Error = Fut::Error;
117
118 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
119 Poll::Ready(loop {
120 match self.as_mut().project() {
121 TryFlattenProj::First { f } => match ready!(f.try_poll(cx)) {
122 Ok(f) => self.set(Self::Second { f }),
123 Err(e) => {
124 self.set(Self::Empty);
125 break Err(e);
126 }
127 },
128 TryFlattenProj::Second { f } => {
129 break ready!(f.poll_ready(cx));
130 }
131 TryFlattenProj::Empty => panic!("poll_ready called after eof"),
132 }
133 })
134 }
135
136 fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
137 match self.project() {
138 TryFlattenProj::First { .. } => panic!("poll_ready not called first"),
139 TryFlattenProj::Second { f } => f.start_send(item),
140 TryFlattenProj::Empty => panic!("start_send called after eof"),
141 }
142 }
143
144 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
145 match self.project() {
146 TryFlattenProj::First { .. } => Poll::Ready(Ok(())),
147 TryFlattenProj::Second { f } => f.poll_flush(cx),
148 TryFlattenProj::Empty => panic!("poll_flush called after eof"),
149 }
150 }
151
152 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
153 let res = match self.as_mut().project() {
154 TryFlattenProj::Second { f } => f.poll_close(cx),
155 _ => Poll::Ready(Ok(())),
156 };
157 if res.is_ready() {
158 self.set(Self::Empty);
159 }
160 res
161 }
162}
163