1use std::pin::Pin;
2use std::task::{Context, Poll};
3
4use futures_core::Stream;
5use pin_project_lite::pin_project;
6
7use crate::stream_ext::Fuse;
8use crate::StreamExt;
9
pin_project! {
    /// Stream returned by the [`peekable`](super::StreamExt::peekable) method.
    ///
    /// Wraps a stream and keeps at most one item buffered so that it can be
    /// inspected by reference before it is yielded.
    pub struct Peekable<T: Stream> {
        // Item stored by `peek()` and not yet handed out by `poll_next`;
        // `None` when nothing is currently buffered.
        peek: Option<T::Item>,
        // The wrapped stream, fused by `new` via `StreamExt::fuse`.
        #[pin]
        stream: Fuse<T>,
    }
}
18
19impl<T: Stream> Peekable<T> {
20 pub(crate) fn new(stream: T) -> Self {
21 let stream = stream.fuse();
22 Self { peek: None, stream }
23 }
24
25 /// Peek at the next item in the stream.
26 pub async fn peek(&mut self) -> Option<&T::Item>
27 where
28 T: Unpin,
29 {
30 if let Some(ref it) = self.peek {
31 Some(it)
32 } else {
33 self.peek = self.next().await;
34 self.peek.as_ref()
35 }
36 }
37}
38
39impl<T: Stream> Stream for Peekable<T> {
40 type Item = T::Item;
41
42 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
43 let this = self.project();
44 if let Some(it) = this.peek.take() {
45 Poll::Ready(Some(it))
46 } else {
47 this.stream.poll_next(cx)
48 }
49 }
50}
51