1use crate::stream_ext::Fuse;
2use crate::Stream;
3use tokio::time::{sleep, Sleep};
4
5use core::future::Future;
6use core::pin::Pin;
7use core::task::{Context, Poll};
8use pin_project_lite::pin_project;
9use std::time::Duration;
10
11pin_project! {
12 /// Stream returned by the [`chunks_timeout`](super::StreamExt::chunks_timeout) method.
13 #[must_use = "streams do nothing unless polled"]
14 #[derive(Debug)]
15 pub struct ChunksTimeout<S: Stream> {
16 #[pin]
17 stream: Fuse<S>,
18 #[pin]
19 deadline: Option<Sleep>,
20 duration: Duration,
21 items: Vec<S::Item>,
22 cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475
23 }
24}
25
26impl<S: Stream> ChunksTimeout<S> {
27 pub(super) fn new(stream: S, max_size: usize, duration: Duration) -> Self {
28 ChunksTimeout {
29 stream: Fuse::new(stream),
30 deadline: None,
31 duration,
32 items: Vec::with_capacity(max_size),
33 cap: max_size,
34 }
35 }
36}
37
38impl<S: Stream> Stream for ChunksTimeout<S> {
39 type Item = Vec<S::Item>;
40
41 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
42 let mut me = self.as_mut().project();
43 loop {
44 match me.stream.as_mut().poll_next(cx) {
45 Poll::Pending => break,
46 Poll::Ready(Some(item)) => {
47 if me.items.is_empty() {
48 me.deadline.set(Some(sleep(*me.duration)));
49 me.items.reserve_exact(*me.cap);
50 }
51 me.items.push(item);
52 if me.items.len() >= *me.cap {
53 return Poll::Ready(Some(std::mem::take(me.items)));
54 }
55 }
56 Poll::Ready(None) => {
57 // Returning Some here is only correct because we fuse the inner stream.
58 let last = if me.items.is_empty() {
59 None
60 } else {
61 Some(std::mem::take(me.items))
62 };
63
64 return Poll::Ready(last);
65 }
66 }
67 }
68
69 if !me.items.is_empty() {
70 if let Some(deadline) = me.deadline.as_pin_mut() {
71 ready!(deadline.poll(cx));
72 }
73 return Poll::Ready(Some(std::mem::take(me.items)));
74 }
75
76 Poll::Pending
77 }
78
79 fn size_hint(&self) -> (usize, Option<usize>) {
80 let chunk_len = if self.items.is_empty() { 0 } else { 1 };
81 let (lower, upper) = self.stream.size_hint();
82 let lower = (lower / self.cap).saturating_add(chunk_len);
83 let upper = upper.and_then(|x| x.checked_add(chunk_len));
84 (lower, upper)
85 }
86}
87