1use crate::Stream;
2
3use core::fmt;
4use core::future::Future;
5use core::pin::Pin;
6use core::task::{Context, Poll};
7use pin_project_lite::pin_project;
8
pin_project! {
    /// Stream for the [`then`](super::StreamExt::then) method.
    ///
    /// Wraps an inner stream and, for every item it produces, calls the
    /// stored closure to obtain a future. The output of that future is what
    /// this stream yields. At most one such future is held at a time.
    #[must_use = "streams do nothing unless polled"]
    pub struct Then<St, Fut, F> {
        // The wrapped stream whose items are handed to `f`.
        #[pin]
        stream: St,
        // The future created by `f` for the most recently pulled item.
        // `None` whenever no future is waiting to complete.
        #[pin]
        future: Option<Fut>,
        // Closure called with each item of `stream` to build the next future.
        f: F,
    }
}
20
21impl<St, Fut, F> fmt::Debug for Then<St, Fut, F>
22where
23 St: fmt::Debug,
24{
25 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
26 f.debug_struct("Then")
27 .field("stream", &self.stream)
28 .finish()
29 }
30}
31
32impl<St, Fut, F> Then<St, Fut, F> {
33 pub(super) fn new(stream: St, f: F) -> Self {
34 Then {
35 stream,
36 future: None,
37 f,
38 }
39 }
40}
41
42impl<St, F, Fut> Stream for Then<St, Fut, F>
43where
44 St: Stream,
45 Fut: Future,
46 F: FnMut(St::Item) -> Fut,
47{
48 type Item = Fut::Output;
49
50 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Fut::Output>> {
51 let mut me = self.project();
52
53 loop {
54 if let Some(future) = me.future.as_mut().as_pin_mut() {
55 match future.poll(cx) {
56 Poll::Ready(item) => {
57 me.future.set(None);
58 return Poll::Ready(Some(item));
59 }
60 Poll::Pending => return Poll::Pending,
61 }
62 }
63
64 match me.stream.as_mut().poll_next(cx) {
65 Poll::Ready(Some(item)) => {
66 me.future.set(Some((me.f)(item)));
67 }
68 Poll::Ready(None) => return Poll::Ready(None),
69 Poll::Pending => return Poll::Pending,
70 }
71 }
72 }
73
74 fn size_hint(&self) -> (usize, Option<usize>) {
75 let future_len = usize::from(self.future.is_some());
76 let (lower, upper) = self.stream.size_hint();
77
78 let lower = lower.saturating_add(future_len);
79 let upper = upper.and_then(|upper| upper.checked_add(future_len));
80
81 (lower, upper)
82 }
83}
84