1 | use crate::stream_ext::Fuse; |
2 | use crate::Stream; |
3 | use tokio::time::{sleep, Sleep}; |
4 | |
5 | use core::future::Future; |
6 | use core::pin::Pin; |
7 | use core::task::{Context, Poll}; |
8 | use pin_project_lite::pin_project; |
9 | use std::time::Duration; |
10 | |
11 | pin_project! { |
12 | /// Stream returned by the [`chunks_timeout`](super::StreamExt::chunks_timeout) method. |
13 | #[must_use = "streams do nothing unless polled" ] |
14 | #[derive(Debug)] |
15 | pub struct ChunksTimeout<S: Stream> { |
16 | #[pin] |
17 | stream: Fuse<S>, |
18 | #[pin] |
19 | deadline: Option<Sleep>, |
20 | duration: Duration, |
21 | items: Vec<S::Item>, |
22 | cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475 |
23 | } |
24 | } |
25 | |
26 | impl<S: Stream> ChunksTimeout<S> { |
27 | pub(super) fn new(stream: S, max_size: usize, duration: Duration) -> Self { |
28 | ChunksTimeout { |
29 | stream: Fuse::new(stream), |
30 | deadline: None, |
31 | duration, |
32 | items: Vec::with_capacity(max_size), |
33 | cap: max_size, |
34 | } |
35 | } |
36 | } |
37 | |
38 | impl<S: Stream> Stream for ChunksTimeout<S> { |
39 | type Item = Vec<S::Item>; |
40 | |
41 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
42 | let mut me = self.as_mut().project(); |
43 | loop { |
44 | match me.stream.as_mut().poll_next(cx) { |
45 | Poll::Pending => break, |
46 | Poll::Ready(Some(item)) => { |
47 | if me.items.is_empty() { |
48 | me.deadline.set(Some(sleep(*me.duration))); |
49 | me.items.reserve_exact(*me.cap); |
50 | } |
51 | me.items.push(item); |
52 | if me.items.len() >= *me.cap { |
53 | return Poll::Ready(Some(std::mem::take(me.items))); |
54 | } |
55 | } |
56 | Poll::Ready(None) => { |
57 | // Returning Some here is only correct because we fuse the inner stream. |
58 | let last = if me.items.is_empty() { |
59 | None |
60 | } else { |
61 | Some(std::mem::take(me.items)) |
62 | }; |
63 | |
64 | return Poll::Ready(last); |
65 | } |
66 | } |
67 | } |
68 | |
69 | if !me.items.is_empty() { |
70 | if let Some(deadline) = me.deadline.as_pin_mut() { |
71 | ready!(deadline.poll(cx)); |
72 | } |
73 | return Poll::Ready(Some(std::mem::take(me.items))); |
74 | } |
75 | |
76 | Poll::Pending |
77 | } |
78 | |
79 | fn size_hint(&self) -> (usize, Option<usize>) { |
80 | let chunk_len = if self.items.is_empty() { 0 } else { 1 }; |
81 | let (lower, upper) = self.stream.size_hint(); |
82 | let lower = (lower / self.cap).saturating_add(chunk_len); |
83 | let upper = upper.and_then(|x| x.checked_add(chunk_len)); |
84 | (lower, upper) |
85 | } |
86 | } |
87 | |