1 | use core::mem; |
2 | use core::pin::Pin; |
3 | use futures_core::future::{FusedFuture, Future}; |
4 | use futures_core::ready; |
5 | use futures_core::stream::{FusedStream, Stream}; |
6 | use futures_core::task::{Context, Poll}; |
7 | use pin_project_lite::pin_project; |
8 | |
9 | pin_project! { |
10 | /// Future for the [`collect`](super::StreamExt::collect) method. |
11 | #[derive(Debug)] |
12 | #[must_use = "futures do nothing unless you `.await` or poll them" ] |
13 | pub struct Collect<St, C> { |
14 | #[pin] |
15 | stream: St, |
16 | collection: C, |
17 | } |
18 | } |
19 | |
20 | impl<St: Stream, C: Default> Collect<St, C> { |
21 | fn finish(self: Pin<&mut Self>) -> C { |
22 | mem::take(self.project().collection) |
23 | } |
24 | |
25 | pub(super) fn new(stream: St) -> Self { |
26 | Self { stream, collection: Default::default() } |
27 | } |
28 | } |
29 | |
30 | impl<St, C> FusedFuture for Collect<St, C> |
31 | where |
32 | St: FusedStream, |
33 | C: Default + Extend<St::Item>, |
34 | { |
35 | fn is_terminated(&self) -> bool { |
36 | self.stream.is_terminated() |
37 | } |
38 | } |
39 | |
40 | impl<St, C> Future for Collect<St, C> |
41 | where |
42 | St: Stream, |
43 | C: Default + Extend<St::Item>, |
44 | { |
45 | type Output = C; |
46 | |
47 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<C> { |
48 | let mut this = self.as_mut().project(); |
49 | loop { |
50 | match ready!(this.stream.as_mut().poll_next(cx)) { |
51 | Some(e) => this.collection.extend(Some(e)), |
52 | None => return Poll::Ready(self.finish()), |
53 | } |
54 | } |
55 | } |
56 | } |
57 | |