1 | use crate::Stream; |
2 | |
3 | use core::fmt; |
4 | use core::pin::Pin; |
5 | use core::task::{Context, Poll}; |
6 | use pin_project_lite::pin_project; |
7 | |
pin_project! {
    /// Stream returned by the [`filter`](super::StreamExt::filter) method.
    ///
    /// Pairs an inner stream with a predicate. When polled, items are pulled
    /// from the inner stream and only those for which the predicate returns
    /// `true` are yielded; the rest are dropped.
    #[must_use = "streams do nothing unless polled"]
    pub struct Filter<St, F> {
        // Inner stream supplying the candidate items. Marked `#[pin]` so that
        // `poll_next` can reach it as a pinned reference through `project()`.
        #[pin]
        stream: St,
        // Predicate invoked with a shared reference to each item produced by
        // `stream`; it decides whether the item is yielded.
        f: F,
    }
}
17 | |
18 | impl<St, F> fmt::Debug for Filter<St, F> |
19 | where |
20 | St: fmt::Debug, |
21 | { |
22 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
23 | f.debug_struct("Filter" ) |
24 | .field("stream" , &self.stream) |
25 | .finish() |
26 | } |
27 | } |
28 | |
29 | impl<St, F> Filter<St, F> { |
30 | pub(super) fn new(stream: St, f: F) -> Self { |
31 | Self { stream, f } |
32 | } |
33 | } |
34 | |
35 | impl<St, F> Stream for Filter<St, F> |
36 | where |
37 | St: Stream, |
38 | F: FnMut(&St::Item) -> bool, |
39 | { |
40 | type Item = St::Item; |
41 | |
42 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St::Item>> { |
43 | loop { |
44 | match ready!(self.as_mut().project().stream.poll_next(cx)) { |
45 | Some(e) => { |
46 | if (self.as_mut().project().f)(&e) { |
47 | return Poll::Ready(Some(e)); |
48 | } |
49 | } |
50 | None => return Poll::Ready(None), |
51 | } |
52 | } |
53 | } |
54 | |
55 | fn size_hint(&self) -> (usize, Option<usize>) { |
56 | (0, self.stream.size_hint().1) // can't know a lower bound, due to the predicate |
57 | } |
58 | } |
59 | |