1 | use super::assert_stream; |
2 | use core::fmt; |
3 | use core::pin::Pin; |
4 | use futures_core::future::TryFuture; |
5 | use futures_core::ready; |
6 | use futures_core::stream::Stream; |
7 | use futures_core::task::{Context, Poll}; |
8 | use pin_project_lite::pin_project; |
9 | |
10 | /// Creates a `TryStream` from a seed and a closure returning a `TryFuture`. |
11 | /// |
12 | /// This function is the dual for the `TryStream::try_fold()` adapter: while |
13 | /// `TryStream::try_fold()` reduces a `TryStream` to one single value, |
14 | /// `try_unfold()` creates a `TryStream` from a seed value. |
15 | /// |
16 | /// `try_unfold()` will call the provided closure with the provided seed, then |
17 | /// wait for the returned `TryFuture` to complete with `(a, b)`. It will then |
18 | /// yield the value `a`, and use `b` as the next internal state. |
19 | /// |
20 | /// If the closure returns `None` instead of `Some(TryFuture)`, then the |
21 | /// `try_unfold()` will stop producing items and return `Poll::Ready(None)` in |
22 | /// future calls to `poll()`. |
23 | /// |
24 | /// In case of error generated by the returned `TryFuture`, the error will be |
25 | /// returned by the `TryStream`. The `TryStream` will then yield |
26 | /// `Poll::Ready(None)` in future calls to `poll()`. |
27 | /// |
28 | /// This function can typically be used when wanting to go from the "world of |
29 | /// futures" to the "world of streams": the provided closure can build a |
30 | /// `TryFuture` using other library functions working on futures, and |
31 | /// `try_unfold()` will turn it into a `TryStream` by repeating the operation. |
32 | /// |
33 | /// # Example |
34 | /// |
35 | /// ``` |
36 | /// # #[derive(Debug, PartialEq)] |
37 | /// # struct SomeError; |
38 | /// # futures::executor::block_on(async { |
39 | /// use futures::stream::{self, TryStreamExt}; |
40 | /// |
41 | /// let stream = stream::try_unfold(0, |state| async move { |
42 | /// if state < 0 { |
43 | /// return Err(SomeError); |
44 | /// } |
45 | /// |
46 | /// if state <= 2 { |
47 | /// let next_state = state + 1; |
48 | /// let yielded = state * 2; |
49 | /// Ok(Some((yielded, next_state))) |
50 | /// } else { |
51 | /// Ok(None) |
52 | /// } |
53 | /// }); |
54 | /// |
55 | /// let result: Result<Vec<i32>, _> = stream.try_collect().await; |
56 | /// assert_eq!(result, Ok(vec![0, 2, 4])); |
57 | /// # }); |
58 | /// ``` |
59 | pub fn try_unfold<T, F, Fut, Item>(init: T, f: F) -> TryUnfold<T, F, Fut> |
60 | where |
61 | F: FnMut(T) -> Fut, |
62 | Fut: TryFuture<Ok = Option<(Item, T)>>, |
63 | { |
64 | assert_stream::<Result<Item, Fut::Error>, _>(TryUnfold { f, state: Some(init), fut: None }) |
65 | } |
66 | |
67 | pin_project! { |
68 | /// Stream for the [`try_unfold`] function. |
69 | #[must_use = "streams do nothing unless polled" ] |
70 | pub struct TryUnfold<T, F, Fut> { |
71 | f: F, |
72 | state: Option<T>, |
73 | #[pin] |
74 | fut: Option<Fut>, |
75 | } |
76 | } |
77 | |
78 | impl<T, F, Fut> fmt::Debug for TryUnfold<T, F, Fut> |
79 | where |
80 | T: fmt::Debug, |
81 | Fut: fmt::Debug, |
82 | { |
83 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
84 | f.debug_struct("TryUnfold" ).field("state" , &self.state).field("fut" , &self.fut).finish() |
85 | } |
86 | } |
87 | |
88 | impl<T, F, Fut, Item> Stream for TryUnfold<T, F, Fut> |
89 | where |
90 | F: FnMut(T) -> Fut, |
91 | Fut: TryFuture<Ok = Option<(Item, T)>>, |
92 | { |
93 | type Item = Result<Item, Fut::Error>; |
94 | |
95 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
96 | let mut this = self.project(); |
97 | |
98 | if let Some(state) = this.state.take() { |
99 | this.fut.set(Some((this.f)(state))); |
100 | } |
101 | |
102 | match this.fut.as_mut().as_pin_mut() { |
103 | None => { |
104 | // The future previously errored |
105 | Poll::Ready(None) |
106 | } |
107 | Some(future) => { |
108 | let step = ready!(future.try_poll(cx)); |
109 | this.fut.set(None); |
110 | |
111 | match step { |
112 | Ok(Some((item, next_state))) => { |
113 | *this.state = Some(next_state); |
114 | Poll::Ready(Some(Ok(item))) |
115 | } |
116 | Ok(None) => Poll::Ready(None), |
117 | Err(e) => Poll::Ready(Some(Err(e))), |
118 | } |
119 | } |
120 | } |
121 | } |
122 | } |
123 | |