1 | use core::pin::Pin; |
2 | use futures_core::future::{FusedFuture, Future}; |
3 | use futures_core::ready; |
4 | use futures_core::stream::{FusedStream, Stream}; |
5 | use futures_core::task::{Context, Poll}; |
6 | use pin_project_lite::pin_project; |
7 | |
8 | pin_project! { |
9 | /// Future for the [`concat`](super::StreamExt::concat) method. |
10 | #[derive(Debug)] |
11 | #[must_use = "futures do nothing unless you `.await` or poll them" ] |
12 | pub struct Concat<St: Stream> { |
13 | #[pin] |
14 | stream: St, |
15 | accum: Option<St::Item>, |
16 | } |
17 | } |
18 | |
19 | impl<St> Concat<St> |
20 | where |
21 | St: Stream, |
22 | St::Item: Extend<<St::Item as IntoIterator>::Item> + IntoIterator + Default, |
23 | { |
24 | pub(super) fn new(stream: St) -> Self { |
25 | Self { stream, accum: None } |
26 | } |
27 | } |
28 | |
29 | impl<St> Future for Concat<St> |
30 | where |
31 | St: Stream, |
32 | St::Item: Extend<<St::Item as IntoIterator>::Item> + IntoIterator + Default, |
33 | { |
34 | type Output = St::Item; |
35 | |
36 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
37 | let mut this: Projection<'_, St> = self.project(); |
38 | |
39 | loop { |
40 | match ready!(this.stream.as_mut().poll_next(cx)) { |
41 | None => return Poll::Ready(this.accum.take().unwrap_or_default()), |
42 | Some(e: ::Item) => { |
43 | if let Some(a: &mut ::Item) = this.accum { |
44 | a.extend(iter:e) |
45 | } else { |
46 | *this.accum = Some(e) |
47 | } |
48 | } |
49 | } |
50 | } |
51 | } |
52 | } |
53 | |
54 | impl<St> FusedFuture for Concat<St> |
55 | where |
56 | St: FusedStream, |
57 | St::Item: Extend<<St::Item as IntoIterator>::Item> + IntoIterator + Default, |
58 | { |
59 | fn is_terminated(&self) -> bool { |
60 | self.accum.is_none() && self.stream.is_terminated() |
61 | } |
62 | } |
63 | |