1 | use core::pin::Pin; |
2 | use futures_core::future::Future; |
3 | use futures_core::ready; |
4 | use futures_core::stream::TryStream; |
5 | use futures_core::task::{Context, Poll}; |
6 | use pin_project_lite::pin_project; |
7 | |
8 | pin_project! { |
9 | /// Future for the [`try_concat`](super::TryStreamExt::try_concat) method. |
10 | #[derive(Debug)] |
11 | #[must_use = "futures do nothing unless you `.await` or poll them" ] |
12 | pub struct TryConcat<St: TryStream> { |
13 | #[pin] |
14 | stream: St, |
15 | accum: Option<St::Ok>, |
16 | } |
17 | } |
18 | |
19 | impl<St> TryConcat<St> |
20 | where |
21 | St: TryStream, |
22 | St::Ok: Extend<<St::Ok as IntoIterator>::Item> + IntoIterator + Default, |
23 | { |
24 | pub(super) fn new(stream: St) -> Self { |
25 | Self { stream, accum: None } |
26 | } |
27 | } |
28 | |
29 | impl<St> Future for TryConcat<St> |
30 | where |
31 | St: TryStream, |
32 | St::Ok: Extend<<St::Ok as IntoIterator>::Item> + IntoIterator + Default, |
33 | { |
34 | type Output = Result<St::Ok, St::Error>; |
35 | |
36 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
37 | let mut this = self.project(); |
38 | |
39 | Poll::Ready(Ok(loop { |
40 | if let Some(x) = ready!(this.stream.as_mut().try_poll_next(cx)?) { |
41 | if let Some(a) = this.accum { |
42 | a.extend(x) |
43 | } else { |
44 | *this.accum = Some(x) |
45 | } |
46 | } else { |
47 | break this.accum.take().unwrap_or_default(); |
48 | } |
49 | })) |
50 | } |
51 | } |
52 | |