1use core::fmt;
2use core::pin::Pin;
3use futures_core::future::TryFuture;
4use futures_core::ready;
5use futures_core::stream::{FusedStream, Stream, TryStream};
6use futures_core::task::{Context, Poll};
7#[cfg(feature = "sink")]
8use futures_sink::Sink;
9use pin_project_lite::pin_project;
10
11pin_project! {
12 /// Stream for the [`and_then`](super::TryStreamExt::and_then) method.
13 #[must_use = "streams do nothing unless polled"]
14 pub struct AndThen<St, Fut, F> {
15 #[pin]
16 stream: St,
17 #[pin]
18 future: Option<Fut>,
19 f: F,
20 }
21}
22
23impl<St, Fut, F> fmt::Debug for AndThen<St, Fut, F>
24where
25 St: fmt::Debug,
26 Fut: fmt::Debug,
27{
28 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
29 f&mut DebugStruct<'_, '_>.debug_struct("AndThen")
30 .field("stream", &self.stream)
31 .field(name:"future", &self.future)
32 .finish()
33 }
34}
35
36impl<St, Fut, F> AndThen<St, Fut, F>
37where
38 St: TryStream,
39 F: FnMut(St::Ok) -> Fut,
40 Fut: TryFuture<Error = St::Error>,
41{
42 pub(super) fn new(stream: St, f: F) -> Self {
43 Self { stream, future: None, f }
44 }
45
46 delegate_access_inner!(stream, St, ());
47}
48
49impl<St, Fut, F> Stream for AndThen<St, Fut, F>
50where
51 St: TryStream,
52 F: FnMut(St::Ok) -> Fut,
53 Fut: TryFuture<Error = St::Error>,
54{
55 type Item = Result<Fut::Ok, St::Error>;
56
57 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
58 let mut this = self.project();
59
60 Poll::Ready(loop {
61 if let Some(fut) = this.future.as_mut().as_pin_mut() {
62 let item = ready!(fut.try_poll(cx));
63 this.future.set(None);
64 break Some(item);
65 } else if let Some(item) = ready!(this.stream.as_mut().try_poll_next(cx)?) {
66 this.future.set(Some((this.f)(item)));
67 } else {
68 break None;
69 }
70 })
71 }
72
73 fn size_hint(&self) -> (usize, Option<usize>) {
74 let future_len = usize::from(self.future.is_some());
75 let (lower, upper) = self.stream.size_hint();
76 let lower = lower.saturating_add(future_len);
77 let upper = match upper {
78 Some(x) => x.checked_add(future_len),
79 None => None,
80 };
81 (lower, upper)
82 }
83}
84
85impl<St, Fut, F> FusedStream for AndThen<St, Fut, F>
86where
87 St: TryStream + FusedStream,
88 F: FnMut(St::Ok) -> Fut,
89 Fut: TryFuture<Error = St::Error>,
90{
91 fn is_terminated(&self) -> bool {
92 self.future.is_none() && self.stream.is_terminated()
93 }
94}
95
// Forwarding impl of `Sink` from the underlying stream: when the wrapped
// stream is also a sink, this adapter exposes the same sink interface.
#[cfg(feature = "sink")]
impl<St, Fut, F, Item> Sink<Item> for AndThen<St, Fut, F>
where
    St: Sink<Item>,
{
    type Error = St::Error;

    // `delegate_sink!` is defined elsewhere in the crate; it is expected to
    // generate the `Sink` methods by forwarding to the `stream` field.
    delegate_sink!(stream, Item);
}
106