1 | use core::fmt; |
2 | use core::pin::Pin; |
3 | use futures_core::future::{FusedFuture, Future}; |
4 | use futures_core::ready; |
5 | use futures_core::stream::{FusedStream, Stream}; |
6 | use futures_core::task::{Context, Poll}; |
7 | use pin_project_lite::pin_project; |
8 | |
9 | pin_project! { |
10 | /// Future for the [`count`](super::StreamExt::count) method. |
11 | #[must_use = "futures do nothing unless you `.await` or poll them" ] |
12 | pub struct Count<St> { |
13 | #[pin] |
14 | stream: St, |
15 | count: usize |
16 | } |
17 | } |
18 | |
19 | impl<St> fmt::Debug for Count<St> |
20 | where |
21 | St: fmt::Debug, |
22 | { |
23 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
24 | f.debug_struct("Count" ).field("stream" , &self.stream).field("count" , &self.count).finish() |
25 | } |
26 | } |
27 | |
28 | impl<St: Stream> Count<St> { |
29 | pub(super) fn new(stream: St) -> Self { |
30 | Self { stream, count: 0 } |
31 | } |
32 | } |
33 | |
34 | impl<St: FusedStream> FusedFuture for Count<St> { |
35 | fn is_terminated(&self) -> bool { |
36 | self.stream.is_terminated() |
37 | } |
38 | } |
39 | |
40 | impl<St: Stream> Future for Count<St> { |
41 | type Output = usize; |
42 | |
43 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
44 | let mut this = self.project(); |
45 | |
46 | Poll::Ready(loop { |
47 | match ready!(this.stream.as_mut().poll_next(cx)) { |
48 | Some(_) => *this.count += 1, |
49 | None => break *this.count, |
50 | } |
51 | }) |
52 | } |
53 | } |
54 | |