| 1 | use super::assert_sink; |
| 2 | use crate::unfold_state::UnfoldState; |
| 3 | use core::{future::Future, pin::Pin}; |
| 4 | use futures_core::ready; |
| 5 | use futures_core::task::{Context, Poll}; |
| 6 | use futures_sink::Sink; |
| 7 | use pin_project_lite::pin_project; |
| 8 | |
| 9 | pin_project! { |
| 10 | /// Sink for the [`unfold`] function. |
| 11 | #[derive (Debug)] |
| 12 | #[must_use = "sinks do nothing unless polled" ] |
| 13 | pub struct Unfold<T, F, R> { |
| 14 | function: F, |
| 15 | #[pin] |
| 16 | state: UnfoldState<T, R>, |
| 17 | } |
| 18 | } |
| 19 | |
| 20 | /// Create a sink from a function which processes one item at a time. |
| 21 | /// |
| 22 | /// # Examples |
| 23 | /// |
| 24 | /// ``` |
| 25 | /// # futures::executor::block_on(async { |
| 26 | /// use futures::sink::{self, SinkExt}; |
| 27 | /// |
| 28 | /// let unfold = sink::unfold(0, |mut sum, i: i32| { |
| 29 | /// async move { |
| 30 | /// sum += i; |
| 31 | /// eprintln!("{}" , i); |
| 32 | /// Ok::<_, futures::never::Never>(sum) |
| 33 | /// } |
| 34 | /// }); |
| 35 | /// futures::pin_mut!(unfold); |
| 36 | /// unfold.send(5).await?; |
| 37 | /// # Ok::<(), futures::never::Never>(()) }).unwrap(); |
| 38 | /// ``` |
| 39 | pub fn unfold<T, F, R, Item, E>(init: T, function: F) -> Unfold<T, F, R> |
| 40 | where |
| 41 | F: FnMut(T, Item) -> R, |
| 42 | R: Future<Output = Result<T, E>>, |
| 43 | { |
| 44 | assert_sink::<Item, E, _>(Unfold { function, state: UnfoldState::Value { value: init } }) |
| 45 | } |
| 46 | |
| 47 | impl<T, F, R, Item, E> Sink<Item> for Unfold<T, F, R> |
| 48 | where |
| 49 | F: FnMut(T, Item) -> R, |
| 50 | R: Future<Output = Result<T, E>>, |
| 51 | { |
| 52 | type Error = E; |
| 53 | |
| 54 | fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
| 55 | self.poll_flush(cx) |
| 56 | } |
| 57 | |
| 58 | fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { |
| 59 | let mut this = self.project(); |
| 60 | let future = match this.state.as_mut().take_value() { |
| 61 | Some(value) => (this.function)(value, item), |
| 62 | None => panic!("start_send called without poll_ready being called first" ), |
| 63 | }; |
| 64 | this.state.set(UnfoldState::Future { future }); |
| 65 | Ok(()) |
| 66 | } |
| 67 | |
| 68 | fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
| 69 | let mut this = self.project(); |
| 70 | Poll::Ready(if let Some(future) = this.state.as_mut().project_future() { |
| 71 | match ready!(future.poll(cx)) { |
| 72 | Ok(state) => { |
| 73 | this.state.set(UnfoldState::Value { value: state }); |
| 74 | Ok(()) |
| 75 | } |
| 76 | Err(err) => { |
| 77 | this.state.set(UnfoldState::Empty); |
| 78 | Err(err) |
| 79 | } |
| 80 | } |
| 81 | } else { |
| 82 | Ok(()) |
| 83 | }) |
| 84 | } |
| 85 | |
| 86 | fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
| 87 | self.poll_flush(cx) |
| 88 | } |
| 89 | } |
| 90 | |