1 | use crate::Stream; |
2 | |
3 | use core::fmt; |
4 | use core::future::Future; |
5 | use core::pin::Pin; |
6 | use core::task::{Context, Poll}; |
7 | use pin_project_lite::pin_project; |
8 | |
pin_project! {
    /// Stream for the [`then`](super::StreamExt::then) method.
    ///
    /// Wraps an inner stream together with a closure that turns each item
    /// into a future; the outputs of those futures are what this stream
    /// yields. At most one such future is stored at a time.
    #[must_use = "streams do nothing unless polled"]
    pub struct Then<St, Fut, F> {
        // The wrapped source stream; structurally pinned so it can be polled in place.
        #[pin]
        stream: St,
        // The future created by `f` for the most recently pulled item.
        // `None` when no future is in flight (initially, and again after each
        // future completes).
        #[pin]
        future: Option<Fut>,
        // Closure invoked with each item of `stream` to build the next future.
        f: F,
    }
}
20 | |
21 | impl<St, Fut, F> fmt::Debug for Then<St, Fut, F> |
22 | where |
23 | St: fmt::Debug, |
24 | { |
25 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
26 | f.debug_struct("Then" ) |
27 | .field("stream" , &self.stream) |
28 | .finish() |
29 | } |
30 | } |
31 | |
32 | impl<St, Fut, F> Then<St, Fut, F> { |
33 | pub(super) fn new(stream: St, f: F) -> Self { |
34 | Then { |
35 | stream, |
36 | future: None, |
37 | f, |
38 | } |
39 | } |
40 | } |
41 | |
42 | impl<St, F, Fut> Stream for Then<St, Fut, F> |
43 | where |
44 | St: Stream, |
45 | Fut: Future, |
46 | F: FnMut(St::Item) -> Fut, |
47 | { |
48 | type Item = Fut::Output; |
49 | |
50 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Fut::Output>> { |
51 | let mut me = self.project(); |
52 | |
53 | loop { |
54 | if let Some(future) = me.future.as_mut().as_pin_mut() { |
55 | match future.poll(cx) { |
56 | Poll::Ready(item) => { |
57 | me.future.set(None); |
58 | return Poll::Ready(Some(item)); |
59 | } |
60 | Poll::Pending => return Poll::Pending, |
61 | } |
62 | } |
63 | |
64 | match me.stream.as_mut().poll_next(cx) { |
65 | Poll::Ready(Some(item)) => { |
66 | me.future.set(Some((me.f)(item))); |
67 | } |
68 | Poll::Ready(None) => return Poll::Ready(None), |
69 | Poll::Pending => return Poll::Pending, |
70 | } |
71 | } |
72 | } |
73 | |
74 | fn size_hint(&self) -> (usize, Option<usize>) { |
75 | let future_len = usize::from(self.future.is_some()); |
76 | let (lower, upper) = self.stream.size_hint(); |
77 | |
78 | let lower = lower.saturating_add(future_len); |
79 | let upper = upper.and_then(|upper| upper.checked_add(future_len)); |
80 | |
81 | (lower, upper) |
82 | } |
83 | } |
84 | |