1use super::assert_sink;
2use crate::unfold_state::UnfoldState;
3use core::{future::Future, pin::Pin};
4use futures_core::ready;
5use futures_core::task::{Context, Poll};
6use futures_sink::Sink;
7use pin_project_lite::pin_project;
8
9pin_project! {
10 /// Sink for the [`unfold`] function.
11 #[derive(Debug)]
12 #[must_use = "sinks do nothing unless polled"]
13 pub struct Unfold<T, F, R> {
14 function: F,
15 #[pin]
16 state: UnfoldState<T, R>,
17 }
18}
19
20/// Create a sink from a function which processes one item at a time.
21///
22/// # Examples
23///
24/// ```
25/// # futures::executor::block_on(async {
26/// use futures::sink::{self, SinkExt};
27///
28/// let unfold = sink::unfold(0, |mut sum, i: i32| {
29/// async move {
30/// sum += i;
31/// eprintln!("{}", i);
32/// Ok::<_, futures::never::Never>(sum)
33/// }
34/// });
35/// futures::pin_mut!(unfold);
36/// unfold.send(5).await?;
37/// # Ok::<(), futures::never::Never>(()) }).unwrap();
38/// ```
39pub fn unfold<T, F, R, Item, E>(init: T, function: F) -> Unfold<T, F, R>
40where
41 F: FnMut(T, Item) -> R,
42 R: Future<Output = Result<T, E>>,
43{
44 assert_sink::<Item, E, _>(Unfold { function, state: UnfoldState::Value { value: init } })
45}
46
47impl<T, F, R, Item, E> Sink<Item> for Unfold<T, F, R>
48where
49 F: FnMut(T, Item) -> R,
50 R: Future<Output = Result<T, E>>,
51{
52 type Error = E;
53
54 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
55 self.poll_flush(cx)
56 }
57
58 fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
59 let mut this = self.project();
60 let future = match this.state.as_mut().take_value() {
61 Some(value) => (this.function)(value, item),
62 None => panic!("start_send called without poll_ready being called first"),
63 };
64 this.state.set(UnfoldState::Future { future });
65 Ok(())
66 }
67
68 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
69 let mut this = self.project();
70 Poll::Ready(if let Some(future) = this.state.as_mut().project_future() {
71 match ready!(future.poll(cx)) {
72 Ok(state) => {
73 this.state.set(UnfoldState::Value { value: state });
74 Ok(())
75 }
76 Err(err) => {
77 this.state.set(UnfoldState::Empty);
78 Err(err)
79 }
80 }
81 } else {
82 Ok(())
83 })
84 }
85
86 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
87 self.poll_flush(cx)
88 }
89}
90