1use core::fmt;
2use core::pin::Pin;
3use futures_core::ready;
4use futures_core::stream::{FusedStream, Stream};
5use futures_core::task::{Context, Poll};
6#[cfg(feature = "sink")]
7use futures_sink::Sink;
8use pin_project_lite::pin_project;
9
10use crate::fns::FnMut1;
11
pin_project! {
    /// Stream for the [`map`](super::StreamExt::map) method.
    ///
    /// Wraps an inner stream together with a function and yields the result of
    /// applying that function to each item the inner stream produces.
    #[must_use = "streams do nothing unless polled"]
    pub struct Map<St, F> {
        // The wrapped stream; marked `#[pin]` so it is projected as a pinned reference.
        #[pin]
        stream: St,
        // The function applied to every item yielded by `stream`.
        f: F,
    }
}
21
22impl<St, F> fmt::Debug for Map<St, F>
23where
24 St: fmt::Debug,
25{
26 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
27 f.debug_struct("Map").field("stream", &self.stream).finish()
28 }
29}
30
31impl<St, F> Map<St, F> {
32 pub(crate) fn new(stream: St, f: F) -> Self {
33 Self { stream, f }
34 }
35
36 delegate_access_inner!(stream, St, ());
37}
38
39impl<St, F> FusedStream for Map<St, F>
40where
41 St: FusedStream,
42 F: FnMut1<St::Item>,
43{
44 fn is_terminated(&self) -> bool {
45 self.stream.is_terminated()
46 }
47}
48
49impl<St, F> Stream for Map<St, F>
50where
51 St: Stream,
52 F: FnMut1<St::Item>,
53{
54 type Item = F::Output;
55
56 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
57 let mut this = self.project();
58 let res = ready!(this.stream.as_mut().poll_next(cx));
59 Poll::Ready(res.map(|x| this.f.call_mut(x)))
60 }
61
62 fn size_hint(&self) -> (usize, Option<usize>) {
63 self.stream.size_hint()
64 }
65}
66
// Forwarding impl of `Sink` from the underlying stream.
//
// Only compiled when the "sink" feature is enabled, and only available when the wrapped
// stream is itself a `Sink<Item>`.
#[cfg(feature = "sink")]
impl<St, F, Item> Sink<Item> for Map<St, F>
where
    St: Stream + Sink<Item>,
    F: FnMut1<St::Item>,
{
    // Errors are reported with the inner sink's error type.
    type Error = St::Error;

    // `delegate_sink!` is defined elsewhere in the crate; presumably it expands to the
    // `Sink` methods, each forwarding to the `stream` field — confirm against the macro.
    delegate_sink!(stream, Item);
}
78