| 1 | use crate::stream::{Fuse, StreamExt}; |
| 2 | use alloc::vec::Vec; |
| 3 | use core::pin::Pin; |
| 4 | use futures_core::stream::{FusedStream, Stream}; |
| 5 | use futures_core::task::{Context, Poll}; |
| 6 | #[cfg (feature = "sink" )] |
| 7 | use futures_sink::Sink; |
| 8 | use pin_project_lite::pin_project; |
| 9 | |
| 10 | pin_project! { |
| 11 | /// Stream for the [`ready_chunks`](super::StreamExt::ready_chunks) method. |
| 12 | #[derive (Debug)] |
| 13 | #[must_use = "streams do nothing unless polled" ] |
| 14 | pub struct ReadyChunks<St: Stream> { |
| 15 | #[pin] |
| 16 | stream: Fuse<St>, |
| 17 | cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475 |
| 18 | } |
| 19 | } |
| 20 | |
| 21 | impl<St: Stream> ReadyChunks<St> { |
| 22 | pub(super) fn new(stream: St, capacity: usize) -> Self { |
| 23 | assert!(capacity > 0); |
| 24 | |
| 25 | Self { stream: stream.fuse(), cap: capacity } |
| 26 | } |
| 27 | |
| 28 | delegate_access_inner!(stream, St, (.)); |
| 29 | } |
| 30 | |
| 31 | impl<St: Stream> Stream for ReadyChunks<St> { |
| 32 | type Item = Vec<St::Item>; |
| 33 | |
| 34 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
| 35 | let mut this = self.project(); |
| 36 | |
| 37 | let mut items: Vec<St::Item> = Vec::new(); |
| 38 | |
| 39 | loop { |
| 40 | match this.stream.as_mut().poll_next(cx) { |
| 41 | // Flush all collected data if underlying stream doesn't contain |
| 42 | // more ready values |
| 43 | Poll::Pending => { |
| 44 | return if items.is_empty() { Poll::Pending } else { Poll::Ready(Some(items)) } |
| 45 | } |
| 46 | |
| 47 | // Push the ready item into the buffer and check whether it is full. |
| 48 | // If so, replace our buffer with a new and empty one and return |
| 49 | // the full one. |
| 50 | Poll::Ready(Some(item)) => { |
| 51 | if items.is_empty() { |
| 52 | items.reserve(*this.cap); |
| 53 | } |
| 54 | items.push(item); |
| 55 | if items.len() >= *this.cap { |
| 56 | return Poll::Ready(Some(items)); |
| 57 | } |
| 58 | } |
| 59 | |
| 60 | // Since the underlying stream ran out of values, return what we |
| 61 | // have buffered, if we have anything. |
| 62 | Poll::Ready(None) => { |
| 63 | let last = if items.is_empty() { None } else { Some(items) }; |
| 64 | |
| 65 | return Poll::Ready(last); |
| 66 | } |
| 67 | } |
| 68 | } |
| 69 | } |
| 70 | |
| 71 | fn size_hint(&self) -> (usize, Option<usize>) { |
| 72 | let (lower, upper) = self.stream.size_hint(); |
| 73 | let lower = lower / self.cap; |
| 74 | (lower, upper) |
| 75 | } |
| 76 | } |
| 77 | |
| 78 | impl<St: Stream> FusedStream for ReadyChunks<St> { |
| 79 | fn is_terminated(&self) -> bool { |
| 80 | self.stream.is_terminated() |
| 81 | } |
| 82 | } |
| 83 | |
// When the wrapped stream is also a `Sink`, expose that sink through the
// adapter; the method bodies are supplied by `delegate_sink!`.
#[cfg(feature = "sink")]
impl<St, Item> Sink<Item> for ReadyChunks<St>
where
    St: Stream + Sink<Item>,
{
    type Error = St::Error;

    delegate_sink!(stream, Item);
}
| 94 | |