1 | use core::pin::Pin; |
2 | use futures_core::ready; |
3 | use futures_core::stream::{FusedStream, Stream, TryStream}; |
4 | use futures_core::task::{Context, Poll}; |
5 | #[cfg (feature = "sink" )] |
6 | use futures_sink::Sink; |
7 | use pin_project_lite::pin_project; |
8 | |
9 | pin_project! { |
10 | /// Stream for the [`try_flatten`](super::TryStreamExt::try_flatten) method. |
11 | #[derive(Debug)] |
12 | #[must_use = "streams do nothing unless polled" ] |
13 | pub struct TryFlatten<St> |
14 | where |
15 | St: TryStream, |
16 | { |
17 | #[pin] |
18 | stream: St, |
19 | #[pin] |
20 | next: Option<St::Ok>, |
21 | } |
22 | } |
23 | |
24 | impl<St> TryFlatten<St> |
25 | where |
26 | St: TryStream, |
27 | St::Ok: TryStream, |
28 | <St::Ok as TryStream>::Error: From<St::Error>, |
29 | { |
30 | pub(super) fn new(stream: St) -> Self { |
31 | Self { stream, next: None } |
32 | } |
33 | |
34 | delegate_access_inner!(stream, St, ()); |
35 | } |
36 | |
37 | impl<St> FusedStream for TryFlatten<St> |
38 | where |
39 | St: TryStream + FusedStream, |
40 | St::Ok: TryStream, |
41 | <St::Ok as TryStream>::Error: From<St::Error>, |
42 | { |
43 | fn is_terminated(&self) -> bool { |
44 | self.next.is_none() && self.stream.is_terminated() |
45 | } |
46 | } |
47 | |
48 | impl<St> Stream for TryFlatten<St> |
49 | where |
50 | St: TryStream, |
51 | St::Ok: TryStream, |
52 | <St::Ok as TryStream>::Error: From<St::Error>, |
53 | { |
54 | type Item = Result<<St::Ok as TryStream>::Ok, <St::Ok as TryStream>::Error>; |
55 | |
56 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
57 | let mut this: Projection<'_, St> = self.project(); |
58 | |
59 | Poll::Ready(loop { |
60 | if let Some(s: Pin<&mut ::Ok>) = this.next.as_mut().as_pin_mut() { |
61 | if let Some(item: <::Ok as TryStream>::Ok) = ready!(s.try_poll_next(cx)?) { |
62 | break Some(Ok(item)); |
63 | } else { |
64 | this.next.set(None); |
65 | } |
66 | } else if let Some(s: ::Ok) = ready!(this.stream.as_mut().try_poll_next(cx)?) { |
67 | this.next.set(Some(s)); |
68 | } else { |
69 | break None; |
70 | } |
71 | }) |
72 | } |
73 | } |
74 | |
// When the wrapped stream is also a `Sink`, expose that `Sink` through the
// adapter. The method bodies come from `delegate_sink!`, which is defined
// elsewhere in the crate.
#[cfg(feature = "sink")]
impl<St, Item> Sink<Item> for TryFlatten<St>
where
    St: TryStream + Sink<Item>,
{
    // Fully qualified: `TryStream` also has an associated `Error` type.
    type Error = <St as Sink<Item>>::Error;

    delegate_sink!(stream, Item);
}
85 | |