1 | use core::pin::Pin; |
2 | use futures_core::ready; |
3 | use futures_core::stream::{FusedStream, Stream}; |
4 | use futures_core::task::{Context, Poll}; |
5 | #[cfg (feature = "sink" )] |
6 | use futures_sink::Sink; |
7 | use pin_project_lite::pin_project; |
8 | |
9 | pin_project! { |
10 | /// Stream for the [`flatten`](super::StreamExt::flatten) method. |
11 | #[derive(Debug)] |
12 | #[must_use = "streams do nothing unless polled" ] |
13 | pub struct Flatten<St, U> { |
14 | #[pin] |
15 | stream: St, |
16 | #[pin] |
17 | next: Option<U>, |
18 | } |
19 | } |
20 | |
21 | impl<St, U> Flatten<St, U> { |
22 | pub(super) fn new(stream: St) -> Self { |
23 | Self { stream, next: None } |
24 | } |
25 | |
26 | delegate_access_inner!(stream, St, ()); |
27 | } |
28 | |
29 | impl<St> FusedStream for Flatten<St, St::Item> |
30 | where |
31 | St: FusedStream, |
32 | St::Item: Stream, |
33 | { |
34 | fn is_terminated(&self) -> bool { |
35 | self.next.is_none() && self.stream.is_terminated() |
36 | } |
37 | } |
38 | |
39 | impl<St> Stream for Flatten<St, St::Item> |
40 | where |
41 | St: Stream, |
42 | St::Item: Stream, |
43 | { |
44 | type Item = <St::Item as Stream>::Item; |
45 | |
46 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
47 | let mut this = self.project(); |
48 | Poll::Ready(loop { |
49 | if let Some(s) = this.next.as_mut().as_pin_mut() { |
50 | if let Some(item) = ready!(s.poll_next(cx)) { |
51 | break Some(item); |
52 | } else { |
53 | this.next.set(None); |
54 | } |
55 | } else if let Some(s) = ready!(this.stream.as_mut().poll_next(cx)) { |
56 | this.next.set(Some(s)); |
57 | } else { |
58 | break None; |
59 | } |
60 | }) |
61 | } |
62 | } |
63 | |
64 | // Forwarding impl of Sink from the underlying stream |
65 | #[cfg (feature = "sink" )] |
66 | impl<S, Item> Sink<Item> for Flatten<S, S::Item> |
67 | where |
68 | S: Stream + Sink<Item>, |
69 | { |
70 | type Error = S::Error; |
71 | |
72 | delegate_sink!(stream, Item); |
73 | } |
74 | |