1 | use core::pin::Pin; |
2 | use futures_core::ready; |
3 | use futures_core::stream::{FusedStream, Stream}; |
4 | use futures_core::task::{Context, Poll}; |
5 | #[cfg (feature = "sink" )] |
6 | use futures_sink::Sink; |
7 | use pin_project_lite::pin_project; |
8 | |
9 | pin_project! { |
10 | /// Stream for the [`enumerate`](super::StreamExt::enumerate) method. |
11 | #[derive(Debug)] |
12 | #[must_use = "streams do nothing unless polled" ] |
13 | pub struct Enumerate<St> { |
14 | #[pin] |
15 | stream: St, |
16 | count: usize, |
17 | } |
18 | } |
19 | |
20 | impl<St: Stream> Enumerate<St> { |
21 | pub(super) fn new(stream: St) -> Self { |
22 | Self { stream, count: 0 } |
23 | } |
24 | |
25 | delegate_access_inner!(stream, St, ()); |
26 | } |
27 | |
28 | impl<St: Stream + FusedStream> FusedStream for Enumerate<St> { |
29 | fn is_terminated(&self) -> bool { |
30 | self.stream.is_terminated() |
31 | } |
32 | } |
33 | |
34 | impl<St: Stream> Stream for Enumerate<St> { |
35 | type Item = (usize, St::Item); |
36 | |
37 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
38 | let this = self.project(); |
39 | |
40 | match ready!(this.stream.poll_next(cx)) { |
41 | Some(item) => { |
42 | let prev_count = *this.count; |
43 | *this.count += 1; |
44 | Poll::Ready(Some((prev_count, item))) |
45 | } |
46 | None => Poll::Ready(None), |
47 | } |
48 | } |
49 | |
50 | fn size_hint(&self) -> (usize, Option<usize>) { |
51 | self.stream.size_hint() |
52 | } |
53 | } |
54 | |
55 | // Forwarding impl of Sink from the underlying stream |
56 | #[cfg (feature = "sink" )] |
57 | impl<S, Item> Sink<Item> for Enumerate<S> |
58 | where |
59 | S: Stream + Sink<Item>, |
60 | { |
61 | type Error = S::Error; |
62 | |
63 | delegate_sink!(stream, Item); |
64 | } |
65 | |