| 1 | use super::assert_stream; |
| 2 | use crate::unfold_state::UnfoldState; |
| 3 | use core::fmt; |
| 4 | use core::pin::Pin; |
| 5 | use futures_core::future::Future; |
| 6 | use futures_core::ready; |
| 7 | use futures_core::stream::{FusedStream, Stream}; |
| 8 | use futures_core::task::{Context, Poll}; |
| 9 | use pin_project_lite::pin_project; |
| 10 | |
| 11 | /// Creates a `Stream` from a seed and a closure returning a `Future`. |
| 12 | /// |
| 13 | /// This function is the dual for the `Stream::fold()` adapter: while |
| 14 | /// `Stream::fold()` reduces a `Stream` to one single value, `unfold()` creates a |
| 15 | /// `Stream` from a seed value. |
| 16 | /// |
| 17 | /// `unfold()` will call the provided closure with the provided seed, then wait |
| 18 | /// for the returned `Future` to complete with `(a, b)`. It will then yield the |
| 19 | /// value `a`, and use `b` as the next internal state. |
| 20 | /// |
| 21 | /// If the closure returns `None` instead of `Some(Future)`, then the `unfold()` |
| 22 | /// will stop producing items and return `Poll::Ready(None)` in future |
| 23 | /// calls to `poll()`. |
| 24 | /// |
| 25 | /// This function can typically be used when wanting to go from the "world of |
| 26 | /// futures" to the "world of streams": the provided closure can build a |
| 27 | /// `Future` using other library functions working on futures, and `unfold()` |
| 28 | /// will turn it into a `Stream` by repeating the operation. |
| 29 | /// |
| 30 | /// # Example |
| 31 | /// |
| 32 | /// ``` |
| 33 | /// # futures::executor::block_on(async { |
| 34 | /// use futures::stream::{self, StreamExt}; |
| 35 | /// |
| 36 | /// let stream = stream::unfold(0, |state| async move { |
| 37 | /// if state <= 2 { |
| 38 | /// let next_state = state + 1; |
| 39 | /// let yielded = state * 2; |
| 40 | /// Some((yielded, next_state)) |
| 41 | /// } else { |
| 42 | /// None |
| 43 | /// } |
| 44 | /// }); |
| 45 | /// |
| 46 | /// let result = stream.collect::<Vec<i32>>().await; |
| 47 | /// assert_eq!(result, vec![0, 2, 4]); |
| 48 | /// # }); |
| 49 | /// ``` |
| 50 | pub fn unfold<T, F, Fut, Item>(init: T, f: F) -> Unfold<T, F, Fut> |
| 51 | where |
| 52 | F: FnMut(T) -> Fut, |
| 53 | Fut: Future<Output = Option<(Item, T)>>, |
| 54 | { |
| 55 | assert_stream::<Item, _>(Unfold { f, state: UnfoldState::Value { value: init } }) |
| 56 | } |
| 57 | |
| 58 | pin_project! { |
| 59 | /// Stream for the [`unfold`] function. |
| 60 | #[must_use = "streams do nothing unless polled" ] |
| 61 | pub struct Unfold<T, F, Fut> { |
| 62 | f: F, |
| 63 | #[pin] |
| 64 | state: UnfoldState<T, Fut>, |
| 65 | } |
| 66 | } |
| 67 | |
| 68 | impl<T, F, Fut> fmt::Debug for Unfold<T, F, Fut> |
| 69 | where |
| 70 | T: fmt::Debug, |
| 71 | Fut: fmt::Debug, |
| 72 | { |
| 73 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 74 | f.debug_struct("Unfold" ).field(name:"state" , &self.state).finish() |
| 75 | } |
| 76 | } |
| 77 | |
| 78 | impl<T, F, Fut, Item> FusedStream for Unfold<T, F, Fut> |
| 79 | where |
| 80 | F: FnMut(T) -> Fut, |
| 81 | Fut: Future<Output = Option<(Item, T)>>, |
| 82 | { |
| 83 | fn is_terminated(&self) -> bool { |
| 84 | if let UnfoldState::Empty = self.state { |
| 85 | true |
| 86 | } else { |
| 87 | false |
| 88 | } |
| 89 | } |
| 90 | } |
| 91 | |
| 92 | impl<T, F, Fut, Item> Stream for Unfold<T, F, Fut> |
| 93 | where |
| 94 | F: FnMut(T) -> Fut, |
| 95 | Fut: Future<Output = Option<(Item, T)>>, |
| 96 | { |
| 97 | type Item = Item; |
| 98 | |
| 99 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
| 100 | let mut this: Projection<'_, T, F, Fut> = self.project(); |
| 101 | |
| 102 | if let Some(state: T) = this.state.as_mut().take_value() { |
| 103 | this.state.set(UnfoldState::Future { future: (this.f)(state) }); |
| 104 | } |
| 105 | |
| 106 | let step: Option<(Item, T)> = match this.state.as_mut().project_future() { |
| 107 | Some(fut: Pin<&mut Fut>) => ready!(fut.poll(cx)), |
| 108 | None => panic!("Unfold must not be polled after it returned `Poll::Ready(None)`" ), |
| 109 | }; |
| 110 | |
| 111 | if let Some((item: Item, next_state: T)) = step { |
| 112 | this.state.set(UnfoldState::Value { value: next_state }); |
| 113 | Poll::Ready(Some(item)) |
| 114 | } else { |
| 115 | this.state.set(UnfoldState::Empty); |
| 116 | Poll::Ready(None) |
| 117 | } |
| 118 | } |
| 119 | } |
| 120 | |