1 | use crate::stream::{Fuse, StreamExt}; |
2 | use alloc::vec::Vec; |
3 | use core::pin::Pin; |
4 | use futures_core::stream::{FusedStream, Stream}; |
5 | use futures_core::task::{Context, Poll}; |
6 | #[cfg (feature = "sink" )] |
7 | use futures_sink::Sink; |
8 | use pin_project_lite::pin_project; |
9 | |
10 | pin_project! { |
11 | /// Stream for the [`ready_chunks`](super::StreamExt::ready_chunks) method. |
12 | #[derive(Debug)] |
13 | #[must_use = "streams do nothing unless polled" ] |
14 | pub struct ReadyChunks<St: Stream> { |
15 | #[pin] |
16 | stream: Fuse<St>, |
17 | cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475 |
18 | } |
19 | } |
20 | |
21 | impl<St: Stream> ReadyChunks<St> { |
22 | pub(super) fn new(stream: St, capacity: usize) -> Self { |
23 | assert!(capacity > 0); |
24 | |
25 | Self { stream: stream.fuse(), cap: capacity } |
26 | } |
27 | |
28 | delegate_access_inner!(stream, St, (.)); |
29 | } |
30 | |
31 | impl<St: Stream> Stream for ReadyChunks<St> { |
32 | type Item = Vec<St::Item>; |
33 | |
34 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
35 | let mut this = self.project(); |
36 | |
37 | let mut items: Vec<St::Item> = Vec::new(); |
38 | |
39 | loop { |
40 | match this.stream.as_mut().poll_next(cx) { |
41 | // Flush all collected data if underlying stream doesn't contain |
42 | // more ready values |
43 | Poll::Pending => { |
44 | return if items.is_empty() { Poll::Pending } else { Poll::Ready(Some(items)) } |
45 | } |
46 | |
47 | // Push the ready item into the buffer and check whether it is full. |
48 | // If so, replace our buffer with a new and empty one and return |
49 | // the full one. |
50 | Poll::Ready(Some(item)) => { |
51 | if items.is_empty() { |
52 | items.reserve(*this.cap); |
53 | } |
54 | items.push(item); |
55 | if items.len() >= *this.cap { |
56 | return Poll::Ready(Some(items)); |
57 | } |
58 | } |
59 | |
60 | // Since the underlying stream ran out of values, return what we |
61 | // have buffered, if we have anything. |
62 | Poll::Ready(None) => { |
63 | let last = if items.is_empty() { None } else { Some(items) }; |
64 | |
65 | return Poll::Ready(last); |
66 | } |
67 | } |
68 | } |
69 | } |
70 | |
71 | fn size_hint(&self) -> (usize, Option<usize>) { |
72 | let (lower, upper) = self.stream.size_hint(); |
73 | let lower = lower / self.cap; |
74 | (lower, upper) |
75 | } |
76 | } |
77 | |
78 | impl<St: Stream> FusedStream for ReadyChunks<St> { |
79 | fn is_terminated(&self) -> bool { |
80 | self.stream.is_terminated() |
81 | } |
82 | } |
83 | |
84 | // Forwarding impl of Sink from the underlying stream |
85 | #[cfg (feature = "sink" )] |
86 | impl<S, Item> Sink<Item> for ReadyChunks<S> |
87 | where |
88 | S: Stream + Sink<Item>, |
89 | { |
90 | type Error = S::Error; |
91 | |
92 | delegate_sink!(stream, Item); |
93 | } |
94 | |