1use crate::Stream;
2
3use core::fmt;
4use core::pin::Pin;
5use core::task::{Context, Poll};
6use pin_project_lite::pin_project;
7
8pin_project! {
9 /// Stream returned by the [`filter`](super::StreamExt::filter) method.
10 #[must_use = "streams do nothing unless polled"]
11 pub struct Filter<St, F> {
12 #[pin]
13 stream: St,
14 f: F,
15 }
16}
17
18impl<St, F> fmt::Debug for Filter<St, F>
19where
20 St: fmt::Debug,
21{
22 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
23 f.debug_struct("Filter")
24 .field("stream", &self.stream)
25 .finish()
26 }
27}
28
29impl<St, F> Filter<St, F> {
30 pub(super) fn new(stream: St, f: F) -> Self {
31 Self { stream, f }
32 }
33}
34
35impl<St, F> Stream for Filter<St, F>
36where
37 St: Stream,
38 F: FnMut(&St::Item) -> bool,
39{
40 type Item = St::Item;
41
42 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St::Item>> {
43 loop {
44 match ready!(self.as_mut().project().stream.poll_next(cx)) {
45 Some(e) => {
46 if (self.as_mut().project().f)(&e) {
47 return Poll::Ready(Some(e));
48 }
49 }
50 None => return Poll::Ready(None),
51 }
52 }
53 }
54
55 fn size_hint(&self) -> (usize, Option<usize>) {
56 (0, self.stream.size_hint().1) // can't know a lower bound, due to the predicate
57 }
58}
59