1use crate::stream_ext::Fuse;
2use crate::{Elapsed, Stream};
3use tokio::time::Interval;
4
5use core::pin::Pin;
6use core::task::{Context, Poll};
7use pin_project_lite::pin_project;
8
9pin_project! {
10 /// Stream returned by the [`timeout_repeating`](super::StreamExt::timeout_repeating) method.
11 #[must_use = "streams do nothing unless polled"]
12 #[derive(Debug)]
13 pub struct TimeoutRepeating<S> {
14 #[pin]
15 stream: Fuse<S>,
16 #[pin]
17 interval: Interval,
18 }
19}
20
21impl<S: Stream> TimeoutRepeating<S> {
22 pub(super) fn new(stream: S, interval: Interval) -> Self {
23 TimeoutRepeating {
24 stream: Fuse::new(stream),
25 interval,
26 }
27 }
28}
29
30impl<S: Stream> Stream for TimeoutRepeating<S> {
31 type Item = Result<S::Item, Elapsed>;
32
33 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
34 let mut me = self.project();
35
36 match me.stream.poll_next(cx) {
37 Poll::Ready(v) => {
38 if v.is_some() {
39 me.interval.reset();
40 }
41 return Poll::Ready(v.map(Ok));
42 }
43 Poll::Pending => {}
44 };
45
46 ready!(me.interval.poll_tick(cx));
47 Poll::Ready(Some(Err(Elapsed::new())))
48 }
49
50 fn size_hint(&self) -> (usize, Option<usize>) {
51 let (lower, _) = self.stream.size_hint();
52
53 // The timeout stream may insert an error an infinite number of times.
54 (lower, None)
55 }
56}
57