1 | use core::marker::PhantomData; |
2 | use core::pin::Pin; |
3 | |
4 | use futures_core::ready; |
5 | use futures_core::stream::{FusedStream, Stream, TryStream}; |
6 | use futures_core::task::{Context, Poll}; |
7 | #[cfg (feature = "sink" )] |
8 | use futures_sink::Sink; |
9 | |
10 | use pin_project_lite::pin_project; |
11 | |
12 | use crate::future::Either; |
13 | use crate::stream::stream::flatten_unordered::{ |
14 | FlattenUnorderedWithFlowController, FlowController, FlowStep, |
15 | }; |
16 | use crate::stream::IntoStream; |
17 | use crate::TryStreamExt; |
18 | |
19 | delegate_all!( |
20 | /// Stream for the [`try_flatten_unordered`](super::TryStreamExt::try_flatten_unordered) method. |
21 | TryFlattenUnordered<St>( |
22 | FlattenUnorderedWithFlowController<NestedTryStreamIntoEitherTryStream<St>, PropagateBaseStreamError<St>> |
23 | ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] |
24 | + New[ |
25 | |stream: St, limit: impl Into<Option<usize>>| |
26 | FlattenUnorderedWithFlowController::new( |
27 | NestedTryStreamIntoEitherTryStream::new(stream), |
28 | limit.into() |
29 | ) |
30 | ] |
31 | where |
32 | St: TryStream, |
33 | St::Ok: TryStream, |
34 | St::Ok: Unpin, |
35 | <St::Ok as TryStream>::Error: From<St::Error> |
36 | ); |
37 | |
38 | pin_project! { |
39 | /// Emits either successful streams or single-item streams containing the underlying errors. |
40 | /// This's a wrapper for `FlattenUnordered` to reuse its logic over `TryStream`. |
41 | #[derive(Debug)] |
42 | #[must_use = "streams do nothing unless polled" ] |
43 | pub struct NestedTryStreamIntoEitherTryStream<St> |
44 | where |
45 | St: TryStream, |
46 | St::Ok: TryStream, |
47 | St::Ok: Unpin, |
48 | <St::Ok as TryStream>::Error: From<St::Error> |
49 | { |
50 | #[pin] |
51 | stream: St |
52 | } |
53 | } |
54 | |
55 | impl<St> NestedTryStreamIntoEitherTryStream<St> |
56 | where |
57 | St: TryStream, |
58 | St::Ok: TryStream + Unpin, |
59 | <St::Ok as TryStream>::Error: From<St::Error>, |
60 | { |
61 | fn new(stream: St) -> Self { |
62 | Self { stream } |
63 | } |
64 | |
65 | delegate_access_inner!(stream, St, ()); |
66 | } |
67 | |
/// A stream holding at most one item: it emits that item immediately and is
/// terminated afterwards.
#[derive(Debug, Clone)]
pub struct Single<T>(Option<T>);

impl<T> Single<T> {
    /// Creates a `Single` that holds `item`.
    fn new(item: T) -> Self {
        Single(Some(item))
    }

    /// Takes the held item out without going through `poll_next`.
    ///
    /// Returns `Some` as long as the item has not been taken yet, `None` afterwards.
    fn next_immediate(&mut self) -> Option<T> {
        self.0.take()
    }
}

// Declared unconditionally (even for `T: !Unpin`): the item is only ever moved
// out through `&mut self`, never exposed behind a pinned reference.
impl<T> Unpin for Single<T> {}
85 | |
86 | impl<T> Stream for Single<T> { |
87 | type Item = T; |
88 | |
89 | fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
90 | Poll::Ready(self.0.take()) |
91 | } |
92 | |
93 | fn size_hint(&self) -> (usize, Option<usize>) { |
94 | self.0.as_ref().map_or((0, Some(0)), |_| (1, Some(1))) |
95 | } |
96 | } |
97 | |
98 | /// Immediately propagates errors occurred in the base stream. |
99 | #[derive(Debug, Clone, Copy)] |
100 | pub struct PropagateBaseStreamError<St>(PhantomData<St>); |
101 | |
102 | type BaseStreamItem<St> = <NestedTryStreamIntoEitherTryStream<St> as Stream>::Item; |
103 | type InnerStreamItem<St> = <BaseStreamItem<St> as Stream>::Item; |
104 | |
105 | impl<St> FlowController<BaseStreamItem<St>, InnerStreamItem<St>> for PropagateBaseStreamError<St> |
106 | where |
107 | St: TryStream, |
108 | St::Ok: TryStream + Unpin, |
109 | <St::Ok as TryStream>::Error: From<St::Error>, |
110 | { |
111 | fn next_step(item: BaseStreamItem<St>) -> FlowStep<BaseStreamItem<St>, InnerStreamItem<St>> { |
112 | match item { |
113 | // A new successful inner stream received |
114 | st @ Either::Left(_) => FlowStep::Continue(st), |
115 | // An error encountered |
116 | Either::Right(mut err) => FlowStep::Return(err.next_immediate().unwrap()), |
117 | } |
118 | } |
119 | } |
120 | |
121 | type SingleStreamResult<St> = Single<Result<<St as TryStream>::Ok, <St as TryStream>::Error>>; |
122 | |
123 | impl<St> Stream for NestedTryStreamIntoEitherTryStream<St> |
124 | where |
125 | St: TryStream, |
126 | St::Ok: TryStream + Unpin, |
127 | <St::Ok as TryStream>::Error: From<St::Error>, |
128 | { |
129 | // Item is either an inner stream or a stream containing a single error. |
130 | // This will allow using `Either`'s `Stream` implementation as both branches are actually streams of `Result`'s. |
131 | type Item = Either<IntoStream<St::Ok>, SingleStreamResult<St::Ok>>; |
132 | |
133 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
134 | let item = ready!(self.project().stream.try_poll_next(cx)); |
135 | |
136 | let out = match item { |
137 | Some(res) => match res { |
138 | // Emit successful inner stream as is |
139 | Ok(stream) => Either::Left(stream.into_stream()), |
140 | // Wrap an error into a stream containing a single item |
141 | err @ Err(_) => { |
142 | let res = err.map(|_: St::Ok| unreachable!()).map_err(Into::into); |
143 | |
144 | Either::Right(Single::new(res)) |
145 | } |
146 | }, |
147 | None => return Poll::Ready(None), |
148 | }; |
149 | |
150 | Poll::Ready(Some(out)) |
151 | } |
152 | } |
153 | |
154 | impl<St> FusedStream for NestedTryStreamIntoEitherTryStream<St> |
155 | where |
156 | St: TryStream + FusedStream, |
157 | St::Ok: TryStream + Unpin, |
158 | <St::Ok as TryStream>::Error: From<St::Error>, |
159 | { |
160 | fn is_terminated(&self) -> bool { |
161 | self.stream.is_terminated() |
162 | } |
163 | } |
164 | |
// `Sink` is forwarded to the underlying stream; only compiled with the `sink` feature.
#[cfg(feature = "sink")]
impl<St, Item> Sink<Item> for NestedTryStreamIntoEitherTryStream<St>
where
    St: TryStream + Sink<Item>,
    St::Ok: TryStream + Unpin,
    // `St` has an `Error` associated type from both `TryStream` and `Sink` in this
    // impl, so the stream error has to be named with a fully qualified path.
    <St::Ok as TryStream>::Error: From<<St as TryStream>::Error>,
{
    // The sink error type is taken over from the wrapped stream's `Sink` impl.
    type Error = <St as Sink<Item>>::Error;

    // Sink methods are generated by the crate's `delegate_sink!` macro for the
    // `stream` field (macro defined outside this file).
    delegate_sink!(stream, Item);
}
177 | |