1use core::pin::Pin;
2use futures_core::ready;
3use futures_core::stream::{FusedStream, Stream};
4use futures_core::task::{Context, Poll};
5#[cfg(feature = "sink")]
6use futures_sink::Sink;
7use pin_project_lite::pin_project;
8
9pin_project! {
10 /// Stream for the [`enumerate`](super::StreamExt::enumerate) method.
11 #[derive(Debug)]
12 #[must_use = "streams do nothing unless polled"]
13 pub struct Enumerate<St> {
14 #[pin]
15 stream: St,
16 count: usize,
17 }
18}
19
20impl<St: Stream> Enumerate<St> {
21 pub(super) fn new(stream: St) -> Self {
22 Self { stream, count: 0 }
23 }
24
25 delegate_access_inner!(stream, St, ());
26}
27
28impl<St: Stream + FusedStream> FusedStream for Enumerate<St> {
29 fn is_terminated(&self) -> bool {
30 self.stream.is_terminated()
31 }
32}
33
34impl<St: Stream> Stream for Enumerate<St> {
35 type Item = (usize, St::Item);
36
37 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
38 let this = self.project();
39
40 match ready!(this.stream.poll_next(cx)) {
41 Some(item) => {
42 let prev_count = *this.count;
43 *this.count += 1;
44 Poll::Ready(Some((prev_count, item)))
45 }
46 None => Poll::Ready(None),
47 }
48 }
49
50 fn size_hint(&self) -> (usize, Option<usize>) {
51 self.stream.size_hint()
52 }
53}
54
55// Forwarding impl of Sink from the underlying stream
56#[cfg(feature = "sink")]
57impl<S, Item> Sink<Item> for Enumerate<S>
58where
59 S: Stream + Sink<Item>,
60{
61 type Error = S::Error;
62
63 delegate_sink!(stream, Item);
64}
65