| 1 | use core::pin::Pin; |
| 2 | use futures_core::future::{FusedFuture, Future}; |
| 3 | use futures_core::ready; |
| 4 | use futures_core::stream::{FusedStream, Stream}; |
| 5 | use futures_core::task::{Context, Poll}; |
| 6 | use pin_project_lite::pin_project; |
| 7 | |
| 8 | pin_project! { |
| 9 | /// Future for the [`concat`](super::StreamExt::concat) method. |
| 10 | #[derive(Debug)] |
| 11 | #[must_use = "futures do nothing unless you `.await` or poll them" ] |
| 12 | pub struct Concat<St: Stream> { |
| 13 | #[pin] |
| 14 | stream: St, |
| 15 | accum: Option<St::Item>, |
| 16 | } |
| 17 | } |
| 18 | |
| 19 | impl<St> Concat<St> |
| 20 | where |
| 21 | St: Stream, |
| 22 | St::Item: Extend<<St::Item as IntoIterator>::Item> + IntoIterator + Default, |
| 23 | { |
| 24 | pub(super) fn new(stream: St) -> Self { |
| 25 | Self { stream, accum: None } |
| 26 | } |
| 27 | } |
| 28 | |
| 29 | impl<St> Future for Concat<St> |
| 30 | where |
| 31 | St: Stream, |
| 32 | St::Item: Extend<<St::Item as IntoIterator>::Item> + IntoIterator + Default, |
| 33 | { |
| 34 | type Output = St::Item; |
| 35 | |
| 36 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| 37 | let mut this = self.project(); |
| 38 | |
| 39 | loop { |
| 40 | match ready!(this.stream.as_mut().poll_next(cx)) { |
| 41 | None => return Poll::Ready(this.accum.take().unwrap_or_default()), |
| 42 | Some(e) => { |
| 43 | if let Some(a) = this.accum { |
| 44 | a.extend(e) |
| 45 | } else { |
| 46 | *this.accum = Some(e) |
| 47 | } |
| 48 | } |
| 49 | } |
| 50 | } |
| 51 | } |
| 52 | } |
| 53 | |
| 54 | impl<St> FusedFuture for Concat<St> |
| 55 | where |
| 56 | St: FusedStream, |
| 57 | St::Item: Extend<<St::Item as IntoIterator>::Item> + IntoIterator + Default, |
| 58 | { |
| 59 | fn is_terminated(&self) -> bool { |
| 60 | self.accum.is_none() && self.stream.is_terminated() |
| 61 | } |
| 62 | } |
| 63 | |