1use core::pin::Pin;
2use core::usize;
3use futures_core::ready;
4use futures_core::stream::{FusedStream, Stream};
5use futures_core::task::{Context, Poll};
6use pin_project_lite::pin_project;
7
8pin_project! {
9 /// Stream for the [`cycle`](super::StreamExt::cycle) method.
10 #[derive(Debug)]
11 #[must_use = "streams do nothing unless polled"]
12 pub struct Cycle<St> {
13 orig: St,
14 #[pin]
15 stream: St,
16 }
17}
18
19impl<St> Cycle<St>
20where
21 St: Clone + Stream,
22{
23 pub(super) fn new(stream: St) -> Self {
24 Self { orig: stream.clone(), stream }
25 }
26}
27
28impl<St> Stream for Cycle<St>
29where
30 St: Clone + Stream,
31{
32 type Item = St::Item;
33
34 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
35 let mut this = self.project();
36
37 match ready!(this.stream.as_mut().poll_next(cx)) {
38 None => {
39 this.stream.set(this.orig.clone());
40 this.stream.poll_next(cx)
41 }
42 item => Poll::Ready(item),
43 }
44 }
45
46 fn size_hint(&self) -> (usize, Option<usize>) {
47 // the cycle stream is either empty or infinite
48 match self.orig.size_hint() {
49 size @ (0, Some(0)) => size,
50 (0, _) => (0, None),
51 _ => (usize::max_value(), None),
52 }
53 }
54}
55
56impl<St> FusedStream for Cycle<St>
57where
58 St: Clone + Stream,
59{
60 fn is_terminated(&self) -> bool {
61 // the cycle stream is either empty or infinite
62 if let (0, Some(0)) = self.size_hint() {
63 true
64 } else {
65 false
66 }
67 }
68}
69