| 1 | use super::assert_stream; |
| 2 | use core::fmt; |
| 3 | use core::pin::Pin; |
| 4 | use futures_core::future::TryFuture; |
| 5 | use futures_core::ready; |
| 6 | use futures_core::stream::Stream; |
| 7 | use futures_core::task::{Context, Poll}; |
| 8 | use pin_project_lite::pin_project; |
| 9 | |
| 10 | /// Creates a `TryStream` from a seed and a closure returning a `TryFuture`. |
| 11 | /// |
| 12 | /// This function is the dual for the `TryStream::try_fold()` adapter: while |
| 13 | /// `TryStream::try_fold()` reduces a `TryStream` to one single value, |
| 14 | /// `try_unfold()` creates a `TryStream` from a seed value. |
| 15 | /// |
| 16 | /// `try_unfold()` will call the provided closure with the provided seed, then |
| 17 | /// wait for the returned `TryFuture` to complete with `(a, b)`. It will then |
| 18 | /// yield the value `a`, and use `b` as the next internal state. |
| 19 | /// |
| 20 | /// If the closure returns `None` instead of `Some(TryFuture)`, then the |
| 21 | /// `try_unfold()` will stop producing items and return `Poll::Ready(None)` in |
| 22 | /// future calls to `poll()`. |
| 23 | /// |
| 24 | /// In case of error generated by the returned `TryFuture`, the error will be |
| 25 | /// returned by the `TryStream`. The `TryStream` will then yield |
| 26 | /// `Poll::Ready(None)` in future calls to `poll()`. |
| 27 | /// |
| 28 | /// This function can typically be used when wanting to go from the "world of |
| 29 | /// futures" to the "world of streams": the provided closure can build a |
| 30 | /// `TryFuture` using other library functions working on futures, and |
| 31 | /// `try_unfold()` will turn it into a `TryStream` by repeating the operation. |
| 32 | /// |
| 33 | /// # Example |
| 34 | /// |
| 35 | /// ``` |
| 36 | /// # #[derive(Debug, PartialEq)] |
| 37 | /// # struct SomeError; |
| 38 | /// # futures::executor::block_on(async { |
| 39 | /// use futures::stream::{self, TryStreamExt}; |
| 40 | /// |
| 41 | /// let stream = stream::try_unfold(0, |state| async move { |
| 42 | /// if state < 0 { |
| 43 | /// return Err(SomeError); |
| 44 | /// } |
| 45 | /// |
| 46 | /// if state <= 2 { |
| 47 | /// let next_state = state + 1; |
| 48 | /// let yielded = state * 2; |
| 49 | /// Ok(Some((yielded, next_state))) |
| 50 | /// } else { |
| 51 | /// Ok(None) |
| 52 | /// } |
| 53 | /// }); |
| 54 | /// |
| 55 | /// let result: Result<Vec<i32>, _> = stream.try_collect().await; |
| 56 | /// assert_eq!(result, Ok(vec![0, 2, 4])); |
| 57 | /// # }); |
| 58 | /// ``` |
| 59 | pub fn try_unfold<T, F, Fut, Item>(init: T, f: F) -> TryUnfold<T, F, Fut> |
| 60 | where |
| 61 | F: FnMut(T) -> Fut, |
| 62 | Fut: TryFuture<Ok = Option<(Item, T)>>, |
| 63 | { |
| 64 | assert_stream::<Result<Item, Fut::Error>, _>(TryUnfold { f, state: Some(init), fut: None }) |
| 65 | } |
| 66 | |
| 67 | pin_project! { |
| 68 | /// Stream for the [`try_unfold`] function. |
| 69 | #[must_use = "streams do nothing unless polled" ] |
| 70 | pub struct TryUnfold<T, F, Fut> { |
| 71 | f: F, |
| 72 | state: Option<T>, |
| 73 | #[pin] |
| 74 | fut: Option<Fut>, |
| 75 | } |
| 76 | } |
| 77 | |
| 78 | impl<T, F, Fut> fmt::Debug for TryUnfold<T, F, Fut> |
| 79 | where |
| 80 | T: fmt::Debug, |
| 81 | Fut: fmt::Debug, |
| 82 | { |
| 83 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 84 | f.debug_struct("TryUnfold" ).field("state" , &self.state).field(name:"fut" , &self.fut).finish() |
| 85 | } |
| 86 | } |
| 87 | |
| 88 | impl<T, F, Fut, Item> Stream for TryUnfold<T, F, Fut> |
| 89 | where |
| 90 | F: FnMut(T) -> Fut, |
| 91 | Fut: TryFuture<Ok = Option<(Item, T)>>, |
| 92 | { |
| 93 | type Item = Result<Item, Fut::Error>; |
| 94 | |
| 95 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
| 96 | let mut this = self.project(); |
| 97 | |
| 98 | if let Some(state) = this.state.take() { |
| 99 | this.fut.set(Some((this.f)(state))); |
| 100 | } |
| 101 | |
| 102 | match this.fut.as_mut().as_pin_mut() { |
| 103 | None => { |
| 104 | // The future previously errored |
| 105 | Poll::Ready(None) |
| 106 | } |
| 107 | Some(future) => { |
| 108 | let step = ready!(future.try_poll(cx)); |
| 109 | this.fut.set(None); |
| 110 | |
| 111 | match step { |
| 112 | Ok(Some((item, next_state))) => { |
| 113 | *this.state = Some(next_state); |
| 114 | Poll::Ready(Some(Ok(item))) |
| 115 | } |
| 116 | Ok(None) => Poll::Ready(None), |
| 117 | Err(e) => Poll::Ready(Some(Err(e))), |
| 118 | } |
| 119 | } |
| 120 | } |
| 121 | } |
| 122 | } |
| 123 | |