1 | use crate::stream::{Fuse, IntoStream, StreamExt}; |
2 | |
3 | use alloc::vec::Vec; |
4 | use core::pin::Pin; |
5 | use core::{fmt, mem}; |
6 | use futures_core::ready; |
7 | use futures_core::stream::{FusedStream, Stream, TryStream}; |
8 | use futures_core::task::{Context, Poll}; |
9 | #[cfg (feature = "sink" )] |
10 | use futures_sink::Sink; |
11 | use pin_project_lite::pin_project; |
12 | |
13 | pin_project! { |
14 | /// Stream for the [`try_chunks`](super::TryStreamExt::try_chunks) method. |
15 | #[derive(Debug)] |
16 | #[must_use = "streams do nothing unless polled" ] |
17 | pub struct TryChunks<St: TryStream> { |
18 | #[pin] |
19 | stream: Fuse<IntoStream<St>>, |
20 | items: Vec<St::Ok>, |
21 | cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475 |
22 | } |
23 | } |
24 | |
25 | impl<St: TryStream> TryChunks<St> { |
26 | pub(super) fn new(stream: St, capacity: usize) -> Self { |
27 | assert!(capacity > 0); |
28 | |
29 | Self { |
30 | stream: IntoStream::new(stream).fuse(), |
31 | items: Vec::with_capacity(capacity), |
32 | cap: capacity, |
33 | } |
34 | } |
35 | |
36 | fn take(self: Pin<&mut Self>) -> Vec<St::Ok> { |
37 | let cap = self.cap; |
38 | mem::replace(self.project().items, Vec::with_capacity(cap)) |
39 | } |
40 | |
41 | delegate_access_inner!(stream, St, (. .)); |
42 | } |
43 | |
44 | type TryChunksStreamError<St> = TryChunksError<<St as TryStream>::Ok, <St as TryStream>::Error>; |
45 | |
46 | impl<St: TryStream> Stream for TryChunks<St> { |
47 | type Item = Result<Vec<St::Ok>, TryChunksStreamError<St>>; |
48 | |
49 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
50 | let mut this = self.as_mut().project(); |
51 | loop { |
52 | match ready!(this.stream.as_mut().try_poll_next(cx)) { |
53 | // Push the item into the buffer and check whether it is full. |
54 | // If so, replace our buffer with a new and empty one and return |
55 | // the full one. |
56 | Some(item) => match item { |
57 | Ok(item) => { |
58 | this.items.push(item); |
59 | if this.items.len() >= *this.cap { |
60 | return Poll::Ready(Some(Ok(self.take()))); |
61 | } |
62 | } |
63 | Err(e) => { |
64 | return Poll::Ready(Some(Err(TryChunksError(self.take(), e)))); |
65 | } |
66 | }, |
67 | |
68 | // Since the underlying stream ran out of values, return what we |
69 | // have buffered, if we have anything. |
70 | None => { |
71 | let last = if this.items.is_empty() { |
72 | None |
73 | } else { |
74 | let full_buf = mem::take(this.items); |
75 | Some(full_buf) |
76 | }; |
77 | |
78 | return Poll::Ready(last.map(Ok)); |
79 | } |
80 | } |
81 | } |
82 | } |
83 | |
84 | fn size_hint(&self) -> (usize, Option<usize>) { |
85 | let chunk_len = usize::from(!self.items.is_empty()); |
86 | let (lower, upper) = self.stream.size_hint(); |
87 | let lower = (lower / self.cap).saturating_add(chunk_len); |
88 | let upper = match upper { |
89 | Some(x) => x.checked_add(chunk_len), |
90 | None => None, |
91 | }; |
92 | (lower, upper) |
93 | } |
94 | } |
95 | |
96 | impl<St: TryStream + FusedStream> FusedStream for TryChunks<St> { |
97 | fn is_terminated(&self) -> bool { |
98 | self.stream.is_terminated() && self.items.is_empty() |
99 | } |
100 | } |
101 | |
102 | // Forwarding impl of Sink from the underlying stream |
103 | #[cfg (feature = "sink" )] |
104 | impl<S, Item> Sink<Item> for TryChunks<S> |
105 | where |
106 | S: TryStream + Sink<Item>, |
107 | { |
108 | type Error = <S as Sink<Item>>::Error; |
109 | |
110 | delegate_sink!(stream, Item); |
111 | } |
112 | |
113 | /// Error indicating, that while chunk was collected inner stream produced an error. |
114 | /// |
115 | /// Contains all items that were collected before an error occurred, and the stream error itself. |
116 | #[derive(PartialEq, Eq)] |
117 | pub struct TryChunksError<T, E>(pub Vec<T>, pub E); |
118 | |
119 | impl<T, E: fmt::Debug> fmt::Debug for TryChunksError<T, E> { |
120 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
121 | self.1.fmt(f) |
122 | } |
123 | } |
124 | |
125 | impl<T, E: fmt::Display> fmt::Display for TryChunksError<T, E> { |
126 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
127 | self.1.fmt(f) |
128 | } |
129 | } |
130 | |
131 | #[cfg (feature = "std" )] |
132 | impl<T, E: fmt::Debug + fmt::Display> std::error::Error for TryChunksError<T, E> {} |
133 | |