1use core::fmt;
2use core::pin::Pin;
3use futures_core::future::TryFuture;
4use futures_core::ready;
5use futures_core::stream::{FusedStream, Stream, TryStream};
6use futures_core::task::{Context, Poll};
7#[cfg(feature = "sink")]
8use futures_sink::Sink;
9use pin_project_lite::pin_project;
10
pin_project! {
    /// Stream for the [`try_take_while`](super::TryStreamExt::try_take_while)
    /// method.
    ///
    /// Yields the successful items of the wrapped stream for as long as the
    /// asynchronous predicate resolves to `true`. Once the predicate resolves
    /// to `false`, the item it was called with is dropped and the stream ends.
    #[must_use = "streams do nothing unless polled"]
    pub struct TryTakeWhile<St, Fut, F>
    where
        St: TryStream,
    {
        // The wrapped stream that items are pulled from.
        #[pin]
        stream: St,
        // Predicate: called with a reference to each item, returns a future.
        f: F,
        // Predicate future for the item held in `pending_item`, kept here
        // until it resolves.
        #[pin]
        pending_fut: Option<Fut>,
        // Item held back until its predicate future has resolved.
        pending_item: Option<St::Ok>,
        // Set once the predicate has resolved to `false`; from then on
        // `poll_next` only returns `None`.
        done_taking: bool,
    }
}
28
29impl<St, Fut, F> fmt::Debug for TryTakeWhile<St, Fut, F>
30where
31 St: TryStream + fmt::Debug,
32 St::Ok: fmt::Debug,
33 Fut: fmt::Debug,
34{
35 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
36 f.debug_struct("TryTakeWhile")
37 .field("stream", &self.stream)
38 .field("pending_fut", &self.pending_fut)
39 .field("pending_item", &self.pending_item)
40 .field("done_taking", &self.done_taking)
41 .finish()
42 }
43}
44
45impl<St, Fut, F> TryTakeWhile<St, Fut, F>
46where
47 St: TryStream,
48 F: FnMut(&St::Ok) -> Fut,
49 Fut: TryFuture<Ok = bool, Error = St::Error>,
50{
51 pub(super) fn new(stream: St, f: F) -> Self {
52 Self { stream, f, pending_fut: None, pending_item: None, done_taking: false }
53 }
54
55 delegate_access_inner!(stream, St, ());
56}
57
58impl<St, Fut, F> Stream for TryTakeWhile<St, Fut, F>
59where
60 St: TryStream,
61 F: FnMut(&St::Ok) -> Fut,
62 Fut: TryFuture<Ok = bool, Error = St::Error>,
63{
64 type Item = Result<St::Ok, St::Error>;
65
66 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
67 let mut this = self.project();
68
69 if *this.done_taking {
70 return Poll::Ready(None);
71 }
72
73 Poll::Ready(loop {
74 if let Some(fut) = this.pending_fut.as_mut().as_pin_mut() {
75 let res = ready!(fut.try_poll(cx));
76 this.pending_fut.set(None);
77 let take = res?;
78 let item = this.pending_item.take();
79 if take {
80 break item.map(Ok);
81 } else {
82 *this.done_taking = true;
83 break None;
84 }
85 } else if let Some(item) = ready!(this.stream.as_mut().try_poll_next(cx)?) {
86 this.pending_fut.set(Some((this.f)(&item)));
87 *this.pending_item = Some(item);
88 } else {
89 break None;
90 }
91 })
92 }
93
94 fn size_hint(&self) -> (usize, Option<usize>) {
95 if self.done_taking {
96 return (0, Some(0));
97 }
98
99 let pending_len = usize::from(self.pending_item.is_some());
100 let (_, upper) = self.stream.size_hint();
101 let upper = match upper {
102 Some(x) => x.checked_add(pending_len),
103 None => None,
104 };
105 (0, upper) // can't know a lower bound, due to the predicate
106 }
107}
108
109impl<St, Fut, F> FusedStream for TryTakeWhile<St, Fut, F>
110where
111 St: TryStream + FusedStream,
112 F: FnMut(&St::Ok) -> Fut,
113 Fut: TryFuture<Ok = bool, Error = St::Error>,
114{
115 fn is_terminated(&self) -> bool {
116 self.done_taking || self.pending_item.is_none() && self.stream.is_terminated()
117 }
118}
119
// Forwarding impl of `Sink` from the underlying stream.
//
// Only compiled with the `sink` feature. The sink methods are generated by
// `delegate_sink!` from the `stream` field. The separate `E` parameter names
// the sink's error type, since `S` is bounded by both `TryStream` and `Sink`.
#[cfg(feature = "sink")]
impl<S, Fut, F, Item, E> Sink<Item> for TryTakeWhile<S, Fut, F>
where
    S: TryStream + Sink<Item, Error = E>,
{
    type Error = E;

    delegate_sink!(stream, Item);
}
130