1use core::fmt;
2use core::pin::Pin;
3use futures_core::future::TryFuture;
4use futures_core::ready;
5use futures_core::stream::{FusedStream, Stream, TryStream};
6use futures_core::task::{Context, Poll};
7#[cfg(feature = "sink")]
8use futures_sink::Sink;
9use pin_project_lite::pin_project;
10
pin_project! {
    /// Stream for the [`or_else`](super::TryStreamExt::or_else) method.
    ///
    /// Items that the wrapped stream yields as `Ok` are passed through
    /// unchanged. When it yields an `Err`, the error is handed to the stored
    /// closure and the future it returns is driven to completion; that
    /// future's result is yielded in place of the original error.
    #[must_use = "streams do nothing unless polled"]
    pub struct OrElse<St, Fut, F> {
        // The wrapped stream whose errors are intercepted.
        #[pin]
        stream: St,
        // Recovery future for the most recent error; `None` while no
        // recovery is in progress.
        #[pin]
        future: Option<Fut>,
        // Closure that turns a stream error into a recovery future.
        f: F,
    }
}
22
23impl<St, Fut, F> fmt::Debug for OrElse<St, Fut, F>
24where
25 St: fmt::Debug,
26 Fut: fmt::Debug,
27{
28 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
29 f.debug_struct("OrElse")
30 .field("stream", &self.stream)
31 .field("future", &self.future)
32 .finish()
33 }
34}
35
36impl<St, Fut, F> OrElse<St, Fut, F>
37where
38 St: TryStream,
39 F: FnMut(St::Error) -> Fut,
40 Fut: TryFuture<Ok = St::Ok>,
41{
42 pub(super) fn new(stream: St, f: F) -> Self {
43 Self { stream, future: None, f }
44 }
45
46 delegate_access_inner!(stream, St, ());
47}
48
49impl<St, Fut, F> Stream for OrElse<St, Fut, F>
50where
51 St: TryStream,
52 F: FnMut(St::Error) -> Fut,
53 Fut: TryFuture<Ok = St::Ok>,
54{
55 type Item = Result<St::Ok, Fut::Error>;
56
57 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
58 let mut this = self.project();
59
60 Poll::Ready(loop {
61 if let Some(fut) = this.future.as_mut().as_pin_mut() {
62 let item = ready!(fut.try_poll(cx));
63 this.future.set(None);
64 break Some(item);
65 } else {
66 match ready!(this.stream.as_mut().try_poll_next(cx)) {
67 Some(Ok(item)) => break Some(Ok(item)),
68 Some(Err(e)) => {
69 this.future.set(Some((this.f)(e)));
70 }
71 None => break None,
72 }
73 }
74 })
75 }
76
77 fn size_hint(&self) -> (usize, Option<usize>) {
78 let future_len = usize::from(self.future.is_some());
79 let (lower, upper) = self.stream.size_hint();
80 let lower = lower.saturating_add(future_len);
81 let upper = match upper {
82 Some(x) => x.checked_add(future_len),
83 None => None,
84 };
85 (lower, upper)
86 }
87}
88
89impl<St, Fut, F> FusedStream for OrElse<St, Fut, F>
90where
91 St: TryStream + FusedStream,
92 F: FnMut(St::Error) -> Fut,
93 Fut: TryFuture<Ok = St::Ok>,
94{
95 fn is_terminated(&self) -> bool {
96 self.future.is_none() && self.stream.is_terminated()
97 }
98}
99
// Forwarding impl of `Sink` from the underlying stream: when the wrapped
// type also implements `Sink<Item>`, so does `OrElse`, with the same error
// type. Only compiled when the `sink` feature is enabled.
#[cfg(feature = "sink")]
impl<S, Fut, F, Item> Sink<Item> for OrElse<S, Fut, F>
where
    S: Sink<Item>,
{
    type Error = S::Error;

    // Presumably expands to the `Sink` methods, each delegating to the
    // `stream` field; the macro is defined outside this file -- confirm there.
    delegate_sink!(stream, Item);
}
110