1 | use core::mem; |
2 | use core::pin::Pin; |
3 | use futures_core::future::{FusedFuture, Future}; |
4 | use futures_core::ready; |
5 | use futures_core::stream::{FusedStream, TryStream}; |
6 | use futures_core::task::{Context, Poll}; |
7 | use pin_project_lite::pin_project; |
8 | |
9 | pin_project! { |
10 | /// Future for the [`try_collect`](super::TryStreamExt::try_collect) method. |
11 | #[derive(Debug)] |
12 | #[must_use = "futures do nothing unless you `.await` or poll them" ] |
13 | pub struct TryCollect<St, C> { |
14 | #[pin] |
15 | stream: St, |
16 | items: C, |
17 | } |
18 | } |
19 | |
20 | impl<St: TryStream, C: Default> TryCollect<St, C> { |
21 | pub(super) fn new(s: St) -> Self { |
22 | Self { stream: s, items: Default::default() } |
23 | } |
24 | } |
25 | |
26 | impl<St, C> FusedFuture for TryCollect<St, C> |
27 | where |
28 | St: TryStream + FusedStream, |
29 | C: Default + Extend<St::Ok>, |
30 | { |
31 | fn is_terminated(&self) -> bool { |
32 | self.stream.is_terminated() |
33 | } |
34 | } |
35 | |
36 | impl<St, C> Future for TryCollect<St, C> |
37 | where |
38 | St: TryStream, |
39 | C: Default + Extend<St::Ok>, |
40 | { |
41 | type Output = Result<C, St::Error>; |
42 | |
43 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
44 | let mut this = self.project(); |
45 | Poll::Ready(Ok(loop { |
46 | match ready!(this.stream.as_mut().try_poll_next(cx)?) { |
47 | Some(x) => this.items.extend(Some(x)), |
48 | None => break mem::take(this.items), |
49 | } |
50 | })) |
51 | } |
52 | } |
53 | |