1use crate::stream::{Fuse, IntoStream, StreamExt};
2
3use alloc::vec::Vec;
4use core::pin::Pin;
5use core::{fmt, mem};
6use futures_core::ready;
7use futures_core::stream::{FusedStream, Stream, TryStream};
8use futures_core::task::{Context, Poll};
9#[cfg(feature = "sink")]
10use futures_sink::Sink;
11use pin_project_lite::pin_project;
12
13pin_project! {
14 /// Stream for the [`try_chunks`](super::TryStreamExt::try_chunks) method.
15 #[derive(Debug)]
16 #[must_use = "streams do nothing unless polled"]
17 pub struct TryChunks<St: TryStream> {
18 #[pin]
19 stream: Fuse<IntoStream<St>>,
20 items: Vec<St::Ok>,
21 cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475
22 }
23}
24
25impl<St: TryStream> TryChunks<St> {
26 pub(super) fn new(stream: St, capacity: usize) -> Self {
27 assert!(capacity > 0);
28
29 Self {
30 stream: IntoStream::new(stream).fuse(),
31 items: Vec::with_capacity(capacity),
32 cap: capacity,
33 }
34 }
35
36 fn take(self: Pin<&mut Self>) -> Vec<St::Ok> {
37 let cap: usize = self.cap;
38 mem::replace(self.project().items, src:Vec::with_capacity(cap))
39 }
40
41 delegate_access_inner!(stream, St, (. .));
42}
43
44type TryChunksStreamError<St> = TryChunksError<<St as TryStream>::Ok, <St as TryStream>::Error>;
45
46impl<St: TryStream> Stream for TryChunks<St> {
47 type Item = Result<Vec<St::Ok>, TryChunksStreamError<St>>;
48
49 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
50 let mut this = self.as_mut().project();
51 loop {
52 match ready!(this.stream.as_mut().try_poll_next(cx)) {
53 // Push the item into the buffer and check whether it is full.
54 // If so, replace our buffer with a new and empty one and return
55 // the full one.
56 Some(item) => match item {
57 Ok(item) => {
58 this.items.push(item);
59 if this.items.len() >= *this.cap {
60 return Poll::Ready(Some(Ok(self.take())));
61 }
62 }
63 Err(e) => {
64 return Poll::Ready(Some(Err(TryChunksError(self.take(), e))));
65 }
66 },
67
68 // Since the underlying stream ran out of values, return what we
69 // have buffered, if we have anything.
70 None => {
71 let last = if this.items.is_empty() {
72 None
73 } else {
74 let full_buf = mem::take(this.items);
75 Some(full_buf)
76 };
77
78 return Poll::Ready(last.map(Ok));
79 }
80 }
81 }
82 }
83
84 fn size_hint(&self) -> (usize, Option<usize>) {
85 let chunk_len = usize::from(!self.items.is_empty());
86 let (lower, upper) = self.stream.size_hint();
87 let lower = (lower / self.cap).saturating_add(chunk_len);
88 let upper = match upper {
89 Some(x) => x.checked_add(chunk_len),
90 None => None,
91 };
92 (lower, upper)
93 }
94}
95
96impl<St: TryStream + FusedStream> FusedStream for TryChunks<St> {
97 fn is_terminated(&self) -> bool {
98 self.stream.is_terminated() && self.items.is_empty()
99 }
100}
101
102// Forwarding impl of Sink from the underlying stream
103#[cfg(feature = "sink")]
104impl<S, Item> Sink<Item> for TryChunks<S>
105where
106 S: TryStream + Sink<Item>,
107{
108 type Error = <S as Sink<Item>>::Error;
109
110 delegate_sink!(stream, Item);
111}
112
113/// Error indicating, that while chunk was collected inner stream produced an error.
114///
115/// Contains all items that were collected before an error occurred, and the stream error itself.
116#[derive(PartialEq, Eq)]
117pub struct TryChunksError<T, E>(pub Vec<T>, pub E);
118
119impl<T, E: fmt::Debug> fmt::Debug for TryChunksError<T, E> {
120 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
121 self.1.fmt(f)
122 }
123}
124
125impl<T, E: fmt::Display> fmt::Display for TryChunksError<T, E> {
126 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
127 self.1.fmt(f)
128 }
129}
130
131#[cfg(feature = "std")]
132impl<T, E: fmt::Debug + fmt::Display> std::error::Error for TryChunksError<T, E> {}
133