1use super::assert_stream;
2use core::pin::Pin;
3use futures_core::stream::{FusedStream, Stream};
4use futures_core::task::{Context, Poll};
5
/// A stream that repeats elements of type `A` endlessly by
/// applying the provided closure `F: FnMut() -> A`.
///
/// This `struct` is created by the [`repeat_with()`] function.
/// See its documentation for more.
#[derive(Debug, Clone)]
#[must_use = "streams do nothing unless polled"]
pub struct RepeatWith<F> {
    // Closure invoked on every poll to produce the next item.
    repeater: F,
}
16
17impl<A, F: FnMut() -> A> Unpin for RepeatWith<F> {}
18
19impl<A, F: FnMut() -> A> Stream for RepeatWith<F> {
20 type Item = A;
21
22 fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
23 Poll::Ready(Some((&mut self.repeater)()))
24 }
25
26 fn size_hint(&self) -> (usize, Option<usize>) {
27 (usize::max_value(), None)
28 }
29}
30
31impl<A, F: FnMut() -> A> FusedStream for RepeatWith<F> {
32 fn is_terminated(&self) -> bool {
33 false
34 }
35}
36
37/// Creates a new stream that repeats elements of type `A` endlessly by
38/// applying the provided closure, the repeater, `F: FnMut() -> A`.
39///
40/// The `repeat_with()` function calls the repeater over and over again.
41///
42/// Infinite stream like `repeat_with()` are often used with adapters like
43/// [`stream.take()`], in order to make them finite.
44///
45/// If the element type of the stream you need implements [`Clone`], and
46/// it is OK to keep the source element in memory, you should instead use
47/// the [`stream.repeat()`] function.
48///
49/// # Examples
50///
51/// Basic usage:
52///
53/// ```
54/// # futures::executor::block_on(async {
55/// use futures::stream::{self, StreamExt};
56///
57/// // let's assume we have some value of a type that is not `Clone`
58/// // or which don't want to have in memory just yet because it is expensive:
59/// #[derive(PartialEq, Debug)]
60/// struct Expensive;
61///
62/// // a particular value forever:
63/// let mut things = stream::repeat_with(|| Expensive);
64///
65/// assert_eq!(Some(Expensive), things.next().await);
66/// assert_eq!(Some(Expensive), things.next().await);
67/// assert_eq!(Some(Expensive), things.next().await);
68/// # });
69/// ```
70///
71/// Using mutation and going finite:
72///
73/// ```rust
74/// # futures::executor::block_on(async {
75/// use futures::stream::{self, StreamExt};
76///
77/// // From the zeroth to the third power of two:
78/// let mut curr = 1;
79/// let mut pow2 = stream::repeat_with(|| { let tmp = curr; curr *= 2; tmp })
80/// .take(4);
81///
82/// assert_eq!(Some(1), pow2.next().await);
83/// assert_eq!(Some(2), pow2.next().await);
84/// assert_eq!(Some(4), pow2.next().await);
85/// assert_eq!(Some(8), pow2.next().await);
86///
87/// // ... and now we're done
88/// assert_eq!(None, pow2.next().await);
89/// # });
90/// ```
91pub fn repeat_with<A, F: FnMut() -> A>(repeater: F) -> RepeatWith<F> {
92 assert_stream::<A, _>(RepeatWith { repeater })
93}
94