1 | use crate::stream_ext::Fuse; |
2 | use crate::{Elapsed, Stream}; |
3 | use tokio::time::Interval; |
4 | |
5 | use core::pin::Pin; |
6 | use core::task::{Context, Poll}; |
7 | use pin_project_lite::pin_project; |
8 | |
pin_project! {
    /// Stream returned by the [`timeout_repeating`](super::StreamExt::timeout_repeating) method.
    ///
    /// Items produced by the wrapped stream are yielded as `Ok(item)`. While the
    /// wrapped stream is pending, each tick of the interval is yielded as
    /// `Err(Elapsed)` instead.
    #[must_use = "streams do nothing unless polled" ]
    #[derive(Debug)]
    pub struct TimeoutRepeating<S> {
        // The wrapped stream, stored inside a `Fuse` wrapper.
        #[pin]
        stream: Fuse<S>,
        // Timer that is polled whenever `stream` is pending; it is reset each
        // time `stream` yields an item.
        #[pin]
        interval: Interval,
    }
}
20 | |
21 | impl<S: Stream> TimeoutRepeating<S> { |
22 | pub(super) fn new(stream: S, interval: Interval) -> Self { |
23 | TimeoutRepeating { |
24 | stream: Fuse::new(stream), |
25 | interval, |
26 | } |
27 | } |
28 | } |
29 | |
30 | impl<S: Stream> Stream for TimeoutRepeating<S> { |
31 | type Item = Result<S::Item, Elapsed>; |
32 | |
33 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
34 | let mut me = self.project(); |
35 | |
36 | match me.stream.poll_next(cx) { |
37 | Poll::Ready(v) => { |
38 | if v.is_some() { |
39 | me.interval.reset(); |
40 | } |
41 | return Poll::Ready(v.map(Ok)); |
42 | } |
43 | Poll::Pending => {} |
44 | }; |
45 | |
46 | ready!(me.interval.poll_tick(cx)); |
47 | Poll::Ready(Some(Err(Elapsed::new()))) |
48 | } |
49 | |
50 | fn size_hint(&self) -> (usize, Option<usize>) { |
51 | let (lower, _) = self.stream.size_hint(); |
52 | |
53 | // The timeout stream may insert an error an infinite number of times. |
54 | (lower, None) |
55 | } |
56 | } |
57 | |