1 | use core::fmt; |
2 | use core::pin::Pin; |
3 | use futures_core::future::Future; |
4 | use futures_core::ready; |
5 | use futures_core::stream::{FusedStream, Stream}; |
6 | use futures_core::task::{Context, Poll}; |
7 | #[cfg (feature = "sink" )] |
8 | use futures_sink::Sink; |
9 | use pin_project_lite::pin_project; |
10 | |
11 | struct StateFn<S, F> { |
12 | state: S, |
13 | f: F, |
14 | } |
15 | |
16 | pin_project! { |
17 | /// Stream for the [`scan`](super::StreamExt::scan) method. |
18 | #[must_use = "streams do nothing unless polled" ] |
19 | pub struct Scan<St: Stream, S, Fut, F> { |
20 | #[pin] |
21 | stream: St, |
22 | state_f: Option<StateFn<S, F>>, |
23 | #[pin] |
24 | future: Option<Fut>, |
25 | } |
26 | } |
27 | |
28 | impl<St, S, Fut, F> fmt::Debug for Scan<St, S, Fut, F> |
29 | where |
30 | St: Stream + fmt::Debug, |
31 | St::Item: fmt::Debug, |
32 | S: fmt::Debug, |
33 | Fut: fmt::Debug, |
34 | { |
35 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
36 | f.debug_struct("Scan" ) |
37 | .field("stream" , &self.stream) |
38 | .field("state" , &self.state_f.as_ref().map(|s| &s.state)) |
39 | .field("future" , &self.future) |
40 | .field("done_taking" , &self.is_done_taking()) |
41 | .finish() |
42 | } |
43 | } |
44 | |
45 | impl<St: Stream, S, Fut, F> Scan<St, S, Fut, F> { |
46 | /// Checks if internal state is `None`. |
47 | fn is_done_taking(&self) -> bool { |
48 | self.state_f.is_none() |
49 | } |
50 | } |
51 | |
52 | impl<B, St, S, Fut, F> Scan<St, S, Fut, F> |
53 | where |
54 | St: Stream, |
55 | F: FnMut(&mut S, St::Item) -> Fut, |
56 | Fut: Future<Output = Option<B>>, |
57 | { |
58 | pub(super) fn new(stream: St, initial_state: S, f: F) -> Self { |
59 | Self { stream, state_f: Some(StateFn { state: initial_state, f }), future: None } |
60 | } |
61 | |
62 | delegate_access_inner!(stream, St, ()); |
63 | } |
64 | |
65 | impl<B, St, S, Fut, F> Stream for Scan<St, S, Fut, F> |
66 | where |
67 | St: Stream, |
68 | F: FnMut(&mut S, St::Item) -> Fut, |
69 | Fut: Future<Output = Option<B>>, |
70 | { |
71 | type Item = B; |
72 | |
73 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<B>> { |
74 | if self.is_done_taking() { |
75 | return Poll::Ready(None); |
76 | } |
77 | |
78 | let mut this = self.project(); |
79 | |
80 | Poll::Ready(loop { |
81 | if let Some(fut) = this.future.as_mut().as_pin_mut() { |
82 | let item = ready!(fut.poll(cx)); |
83 | this.future.set(None); |
84 | |
85 | if item.is_none() { |
86 | *this.state_f = None; |
87 | } |
88 | |
89 | break item; |
90 | } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) { |
91 | let state_f = this.state_f.as_mut().unwrap(); |
92 | this.future.set(Some((state_f.f)(&mut state_f.state, item))) |
93 | } else { |
94 | break None; |
95 | } |
96 | }) |
97 | } |
98 | |
99 | fn size_hint(&self) -> (usize, Option<usize>) { |
100 | if self.is_done_taking() { |
101 | (0, Some(0)) |
102 | } else { |
103 | self.stream.size_hint() // can't know a lower bound, due to the predicate |
104 | } |
105 | } |
106 | } |
107 | |
108 | impl<B, St, S, Fut, F> FusedStream for Scan<St, S, Fut, F> |
109 | where |
110 | St: FusedStream, |
111 | F: FnMut(&mut S, St::Item) -> Fut, |
112 | Fut: Future<Output = Option<B>>, |
113 | { |
114 | fn is_terminated(&self) -> bool { |
115 | self.is_done_taking() || self.future.is_none() && self.stream.is_terminated() |
116 | } |
117 | } |
118 | |
119 | // Forwarding impl of Sink from the underlying stream |
120 | #[cfg (feature = "sink" )] |
121 | impl<St, S, Fut, F, Item> Sink<Item> for Scan<St, S, Fut, F> |
122 | where |
123 | St: Stream + Sink<Item>, |
124 | { |
125 | type Error = St::Error; |
126 | |
127 | delegate_sink!(stream, Item); |
128 | } |
129 | |