1 | //! Slow down a stream by enforcing a delay between items. |
2 | |
3 | use crate::Stream; |
4 | use tokio::time::{Duration, Instant, Sleep}; |
5 | |
6 | use std::future::Future; |
7 | use std::pin::Pin; |
8 | use std::task::{self, Poll}; |
9 | |
10 | use pin_project_lite::pin_project; |
11 | |
12 | pub(super) fn throttle<T>(duration: Duration, stream: T) -> Throttle<T> |
13 | where |
14 | T: Stream, |
15 | { |
16 | Throttle { |
17 | delay: tokio::time::sleep_until(Instant::now() + duration), |
18 | duration, |
19 | has_delayed: true, |
20 | stream, |
21 | } |
22 | } |
23 | |
24 | pin_project! { |
25 | /// Stream for the [`throttle`](throttle) function. This object is `!Unpin`. If you need it to |
26 | /// implement `Unpin` you can pin your throttle like this: `Box::pin(your_throttle)`. |
27 | #[derive(Debug)] |
28 | #[must_use = "streams do nothing unless polled" ] |
29 | pub struct Throttle<T> { |
30 | #[pin] |
31 | delay: Sleep, |
32 | duration: Duration, |
33 | |
34 | // Set to true when `delay` has returned ready, but `stream` hasn't. |
35 | has_delayed: bool, |
36 | |
37 | // The stream to throttle |
38 | #[pin] |
39 | stream: T, |
40 | } |
41 | } |
42 | |
43 | impl<T> Throttle<T> { |
44 | /// Acquires a reference to the underlying stream that this combinator is |
45 | /// pulling from. |
46 | pub fn get_ref(&self) -> &T { |
47 | &self.stream |
48 | } |
49 | |
50 | /// Acquires a mutable reference to the underlying stream that this combinator |
51 | /// is pulling from. |
52 | /// |
53 | /// Note that care must be taken to avoid tampering with the state of the stream |
54 | /// which may otherwise confuse this combinator. |
55 | pub fn get_mut(&mut self) -> &mut T { |
56 | &mut self.stream |
57 | } |
58 | |
59 | /// Consumes this combinator, returning the underlying stream. |
60 | /// |
61 | /// Note that this may discard intermediate state of this combinator, so care |
62 | /// should be taken to avoid losing resources when this is called. |
63 | pub fn into_inner(self) -> T { |
64 | self.stream |
65 | } |
66 | } |
67 | |
68 | impl<T: Stream> Stream for Throttle<T> { |
69 | type Item = T::Item; |
70 | |
71 | fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> { |
72 | let mut me = self.project(); |
73 | let dur = *me.duration; |
74 | |
75 | if !*me.has_delayed && !is_zero(dur) { |
76 | ready!(me.delay.as_mut().poll(cx)); |
77 | *me.has_delayed = true; |
78 | } |
79 | |
80 | let value = ready!(me.stream.poll_next(cx)); |
81 | |
82 | if value.is_some() { |
83 | if !is_zero(dur) { |
84 | me.delay.reset(Instant::now() + dur); |
85 | } |
86 | |
87 | *me.has_delayed = false; |
88 | } |
89 | |
90 | Poll::Ready(value) |
91 | } |
92 | } |
93 | |
94 | fn is_zero(dur: Duration) -> bool { |
95 | dur == Duration::from_millis(0) |
96 | } |
97 | |