1 | use bytes::Buf; |
2 | use futures_core::stream::Stream; |
3 | use pin_project_lite::pin_project; |
4 | use std::io; |
5 | use std::pin::Pin; |
6 | use std::task::{Context, Poll}; |
7 | use tokio::io::{AsyncBufRead, AsyncRead, ReadBuf}; |
8 | |
9 | pin_project! { |
10 | /// Convert a [`Stream`] of byte chunks into an [`AsyncRead`]. |
11 | /// |
12 | /// This type performs the inverse operation of [`ReaderStream`]. |
13 | /// |
14 | /// # Example |
15 | /// |
16 | /// ``` |
17 | /// use bytes::Bytes; |
18 | /// use tokio::io::{AsyncReadExt, Result}; |
19 | /// use tokio_util::io::StreamReader; |
20 | /// # #[tokio::main] |
21 | /// # async fn main() -> std::io::Result<()> { |
22 | /// |
23 | /// // Create a stream from an iterator. |
24 | /// let stream = tokio_stream::iter(vec![ |
25 | /// Result::Ok(Bytes::from_static(&[0, 1, 2, 3])), |
26 | /// Result::Ok(Bytes::from_static(&[4, 5, 6, 7])), |
27 | /// Result::Ok(Bytes::from_static(&[8, 9, 10, 11])), |
28 | /// ]); |
29 | /// |
30 | /// // Convert it to an AsyncRead. |
31 | /// let mut read = StreamReader::new(stream); |
32 | /// |
33 | /// // Read five bytes from the stream. |
34 | /// let mut buf = [0; 5]; |
35 | /// read.read_exact(&mut buf).await?; |
36 | /// assert_eq!(buf, [0, 1, 2, 3, 4]); |
37 | /// |
38 | /// // Read the rest of the current chunk. |
39 | /// assert_eq!(read.read(&mut buf).await?, 3); |
40 | /// assert_eq!(&buf[..3], [5, 6, 7]); |
41 | /// |
42 | /// // Read the next chunk. |
43 | /// assert_eq!(read.read(&mut buf).await?, 4); |
44 | /// assert_eq!(&buf[..4], [8, 9, 10, 11]); |
45 | /// |
46 | /// // We have now reached the end. |
47 | /// assert_eq!(read.read(&mut buf).await?, 0); |
48 | /// |
49 | /// # Ok(()) |
50 | /// # } |
51 | /// ``` |
52 | /// |
53 | /// [`AsyncRead`]: tokio::io::AsyncRead |
54 | /// [`Stream`]: futures_core::Stream |
55 | /// [`ReaderStream`]: crate::io::ReaderStream |
56 | #[derive(Debug)] |
57 | pub struct StreamReader<S, B> { |
58 | #[pin] |
59 | inner: S, |
60 | chunk: Option<B>, |
61 | } |
62 | } |
63 | |
64 | impl<S, B, E> StreamReader<S, B> |
65 | where |
66 | S: Stream<Item = Result<B, E>>, |
67 | B: Buf, |
68 | E: Into<std::io::Error>, |
69 | { |
70 | /// Convert a stream of byte chunks into an [`AsyncRead`](tokio::io::AsyncRead). |
71 | /// |
72 | /// The item should be a [`Result`] with the ok variant being something that |
73 | /// implements the [`Buf`] trait (e.g. `Vec<u8>` or `Bytes`). The error |
74 | /// should be convertible into an [io error]. |
75 | /// |
76 | /// [`Result`]: std::result::Result |
77 | /// [`Buf`]: bytes::Buf |
78 | /// [io error]: std::io::Error |
79 | pub fn new(stream: S) -> Self { |
80 | Self { |
81 | inner: stream, |
82 | chunk: None, |
83 | } |
84 | } |
85 | |
86 | /// Do we have a chunk and is it non-empty? |
87 | fn has_chunk(&self) -> bool { |
88 | if let Some(ref chunk) = self.chunk { |
89 | chunk.remaining() > 0 |
90 | } else { |
91 | false |
92 | } |
93 | } |
94 | |
95 | /// Consumes this `StreamReader`, returning a Tuple consisting |
96 | /// of the underlying stream and an Option of the interal buffer, |
97 | /// which is Some in case the buffer contains elements. |
98 | pub fn into_inner_with_chunk(self) -> (S, Option<B>) { |
99 | if self.has_chunk() { |
100 | (self.inner, self.chunk) |
101 | } else { |
102 | (self.inner, None) |
103 | } |
104 | } |
105 | } |
106 | |
107 | impl<S, B> StreamReader<S, B> { |
108 | /// Gets a reference to the underlying stream. |
109 | /// |
110 | /// It is inadvisable to directly read from the underlying stream. |
111 | pub fn get_ref(&self) -> &S { |
112 | &self.inner |
113 | } |
114 | |
115 | /// Gets a mutable reference to the underlying stream. |
116 | /// |
117 | /// It is inadvisable to directly read from the underlying stream. |
118 | pub fn get_mut(&mut self) -> &mut S { |
119 | &mut self.inner |
120 | } |
121 | |
122 | /// Gets a pinned mutable reference to the underlying stream. |
123 | /// |
124 | /// It is inadvisable to directly read from the underlying stream. |
125 | pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut S> { |
126 | self.project().inner |
127 | } |
128 | |
129 | /// Consumes this `BufWriter`, returning the underlying stream. |
130 | /// |
131 | /// Note that any leftover data in the internal buffer is lost. |
132 | /// If you additionally want access to the internal buffer use |
133 | /// [`into_inner_with_chunk`]. |
134 | /// |
135 | /// [`into_inner_with_chunk`]: crate::io::StreamReader::into_inner_with_chunk |
136 | pub fn into_inner(self) -> S { |
137 | self.inner |
138 | } |
139 | } |
140 | |
141 | impl<S, B, E> AsyncRead for StreamReader<S, B> |
142 | where |
143 | S: Stream<Item = Result<B, E>>, |
144 | B: Buf, |
145 | E: Into<std::io::Error>, |
146 | { |
147 | fn poll_read( |
148 | mut self: Pin<&mut Self>, |
149 | cx: &mut Context<'_>, |
150 | buf: &mut ReadBuf<'_>, |
151 | ) -> Poll<io::Result<()>> { |
152 | if buf.remaining() == 0 { |
153 | return Poll::Ready(Ok(())); |
154 | } |
155 | |
156 | let inner_buf: &[u8] = match self.as_mut().poll_fill_buf(cx) { |
157 | Poll::Ready(Ok(buf: &[u8])) => buf, |
158 | Poll::Ready(Err(err: Error)) => return Poll::Ready(Err(err)), |
159 | Poll::Pending => return Poll::Pending, |
160 | }; |
161 | let len: usize = std::cmp::min(v1:inner_buf.len(), v2:buf.remaining()); |
162 | buf.put_slice(&inner_buf[..len]); |
163 | |
164 | self.consume(amt:len); |
165 | Poll::Ready(Ok(())) |
166 | } |
167 | } |
168 | |
169 | impl<S, B, E> AsyncBufRead for StreamReader<S, B> |
170 | where |
171 | S: Stream<Item = Result<B, E>>, |
172 | B: Buf, |
173 | E: Into<std::io::Error>, |
174 | { |
175 | fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> { |
176 | loop { |
177 | if self.as_mut().has_chunk() { |
178 | // This unwrap is very sad, but it can't be avoided. |
179 | let buf = self.project().chunk.as_ref().unwrap().chunk(); |
180 | return Poll::Ready(Ok(buf)); |
181 | } else { |
182 | match self.as_mut().project().inner.poll_next(cx) { |
183 | Poll::Ready(Some(Ok(chunk))) => { |
184 | // Go around the loop in case the chunk is empty. |
185 | *self.as_mut().project().chunk = Some(chunk); |
186 | } |
187 | Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(err.into())), |
188 | Poll::Ready(None) => return Poll::Ready(Ok(&[])), |
189 | Poll::Pending => return Poll::Pending, |
190 | } |
191 | } |
192 | } |
193 | } |
194 | fn consume(self: Pin<&mut Self>, amt: usize) { |
195 | if amt > 0 { |
196 | self.project() |
197 | .chunk |
198 | .as_mut() |
199 | .expect("No chunk present" ) |
200 | .advance(amt); |
201 | } |
202 | } |
203 | } |
204 | |