1 | use crate::Stream; |
2 | |
3 | use core::fmt; |
4 | use core::pin::Pin; |
5 | use core::task::{Context, Poll}; |
6 | use pin_project_lite::pin_project; |
7 | |
pin_project! {
    /// Stream for the [`take_while`](super::StreamExt::take_while) method.
    ///
    /// Forwards items from the wrapped stream for as long as the predicate
    /// returns `true`. The first item the predicate rejects is dropped rather
    /// than yielded, and from then on (or once the wrapped stream itself
    /// ends) this stream only ever reports that it is finished.
    #[must_use = "streams do nothing unless polled"]
    pub struct TakeWhile<St, F> {
        // The wrapped stream. Structurally pinned so it can be polled through
        // the pin projection.
        #[pin]
        stream: St,
        // Called with a reference to each item; `false` terminates the stream.
        predicate: F,
        // Latched to `true` once the wrapped stream ends or the predicate
        // rejects an item, so the wrapped stream is not polled again.
        done: bool,
    }
}
18 | |
19 | impl<St, F> fmt::Debug for TakeWhile<St, F> |
20 | where |
21 | St: fmt::Debug, |
22 | { |
23 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
24 | f.debug_struct("TakeWhile" ) |
25 | .field("stream" , &self.stream) |
26 | .field("done" , &self.done) |
27 | .finish() |
28 | } |
29 | } |
30 | |
31 | impl<St, F> TakeWhile<St, F> { |
32 | pub(super) fn new(stream: St, predicate: F) -> Self { |
33 | Self { |
34 | stream, |
35 | predicate, |
36 | done: false, |
37 | } |
38 | } |
39 | } |
40 | |
41 | impl<St, F> Stream for TakeWhile<St, F> |
42 | where |
43 | St: Stream, |
44 | F: FnMut(&St::Item) -> bool, |
45 | { |
46 | type Item = St::Item; |
47 | |
48 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
49 | if !*self.as_mut().project().done { |
50 | self.as_mut().project().stream.poll_next(cx).map(|ready| { |
51 | let ready = ready.and_then(|item| { |
52 | if !(self.as_mut().project().predicate)(&item) { |
53 | None |
54 | } else { |
55 | Some(item) |
56 | } |
57 | }); |
58 | |
59 | if ready.is_none() { |
60 | *self.as_mut().project().done = true; |
61 | } |
62 | |
63 | ready |
64 | }) |
65 | } else { |
66 | Poll::Ready(None) |
67 | } |
68 | } |
69 | |
70 | fn size_hint(&self) -> (usize, Option<usize>) { |
71 | if self.done { |
72 | return (0, Some(0)); |
73 | } |
74 | |
75 | let (_, upper) = self.stream.size_hint(); |
76 | |
77 | (0, upper) |
78 | } |
79 | } |
80 | |