| 1 | use crate::stream_ext::Fuse; |
| 2 | use crate::Stream; |
| 3 | use tokio::time::{sleep, Sleep}; |
| 4 | |
| 5 | use core::future::Future; |
| 6 | use core::pin::Pin; |
| 7 | use core::task::{Context, Poll}; |
| 8 | use pin_project_lite::pin_project; |
| 9 | use std::time::Duration; |
| 10 | |
| 11 | pin_project! { |
| 12 | /// Stream returned by the [`chunks_timeout`](super::StreamExt::chunks_timeout) method. |
| 13 | #[must_use = "streams do nothing unless polled" ] |
| 14 | #[derive(Debug)] |
| 15 | pub struct ChunksTimeout<S: Stream> { |
| 16 | #[pin] |
| 17 | stream: Fuse<S>, |
| 18 | #[pin] |
| 19 | deadline: Option<Sleep>, |
| 20 | duration: Duration, |
| 21 | items: Vec<S::Item>, |
| 22 | cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475 |
| 23 | } |
| 24 | } |
| 25 | |
| 26 | impl<S: Stream> ChunksTimeout<S> { |
| 27 | pub(super) fn new(stream: S, max_size: usize, duration: Duration) -> Self { |
| 28 | ChunksTimeout { |
| 29 | stream: Fuse::new(stream), |
| 30 | deadline: None, |
| 31 | duration, |
| 32 | items: Vec::with_capacity(max_size), |
| 33 | cap: max_size, |
| 34 | } |
| 35 | } |
| 36 | } |
| 37 | |
| 38 | impl<S: Stream> Stream for ChunksTimeout<S> { |
| 39 | type Item = Vec<S::Item>; |
| 40 | |
| 41 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
| 42 | let mut me = self.as_mut().project(); |
| 43 | loop { |
| 44 | match me.stream.as_mut().poll_next(cx) { |
| 45 | Poll::Pending => break, |
| 46 | Poll::Ready(Some(item)) => { |
| 47 | if me.items.is_empty() { |
| 48 | me.deadline.set(Some(sleep(*me.duration))); |
| 49 | me.items.reserve_exact(*me.cap); |
| 50 | } |
| 51 | me.items.push(item); |
| 52 | if me.items.len() >= *me.cap { |
| 53 | return Poll::Ready(Some(std::mem::take(me.items))); |
| 54 | } |
| 55 | } |
| 56 | Poll::Ready(None) => { |
| 57 | // Returning Some here is only correct because we fuse the inner stream. |
| 58 | let last = if me.items.is_empty() { |
| 59 | None |
| 60 | } else { |
| 61 | Some(std::mem::take(me.items)) |
| 62 | }; |
| 63 | |
| 64 | return Poll::Ready(last); |
| 65 | } |
| 66 | } |
| 67 | } |
| 68 | |
| 69 | if !me.items.is_empty() { |
| 70 | if let Some(deadline) = me.deadline.as_pin_mut() { |
| 71 | ready!(deadline.poll(cx)); |
| 72 | } |
| 73 | return Poll::Ready(Some(std::mem::take(me.items))); |
| 74 | } |
| 75 | |
| 76 | Poll::Pending |
| 77 | } |
| 78 | |
| 79 | fn size_hint(&self) -> (usize, Option<usize>) { |
| 80 | let chunk_len = if self.items.is_empty() { 0 } else { 1 }; |
| 81 | let (lower, upper) = self.stream.size_hint(); |
| 82 | let lower = (lower / self.cap).saturating_add(chunk_len); |
| 83 | let upper = upper.and_then(|x| x.checked_add(chunk_len)); |
| 84 | (lower, upper) |
| 85 | } |
| 86 | } |
| 87 | |