1use crate::Stream;
2
3use core::fmt;
4use core::pin::Pin;
5use core::task::{Context, Poll};
6use pin_project_lite::pin_project;
7
8pin_project! {
9 /// Stream for the [`take_while`](super::StreamExt::take_while) method.
10 #[must_use = "streams do nothing unless polled"]
11 pub struct TakeWhile<St, F> {
12 #[pin]
13 stream: St,
14 predicate: F,
15 done: bool,
16 }
17}
18
19impl<St, F> fmt::Debug for TakeWhile<St, F>
20where
21 St: fmt::Debug,
22{
23 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
24 f.debug_struct("TakeWhile")
25 .field("stream", &self.stream)
26 .field("done", &self.done)
27 .finish()
28 }
29}
30
31impl<St, F> TakeWhile<St, F> {
32 pub(super) fn new(stream: St, predicate: F) -> Self {
33 Self {
34 stream,
35 predicate,
36 done: false,
37 }
38 }
39}
40
41impl<St, F> Stream for TakeWhile<St, F>
42where
43 St: Stream,
44 F: FnMut(&St::Item) -> bool,
45{
46 type Item = St::Item;
47
48 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
49 if !*self.as_mut().project().done {
50 self.as_mut().project().stream.poll_next(cx).map(|ready| {
51 let ready = ready.and_then(|item| {
52 if !(self.as_mut().project().predicate)(&item) {
53 None
54 } else {
55 Some(item)
56 }
57 });
58
59 if ready.is_none() {
60 *self.as_mut().project().done = true;
61 }
62
63 ready
64 })
65 } else {
66 Poll::Ready(None)
67 }
68 }
69
70 fn size_hint(&self) -> (usize, Option<usize>) {
71 if self.done {
72 return (0, Some(0));
73 }
74
75 let (_, upper) = self.stream.size_hint();
76
77 (0, upper)
78 }
79}
80