1use bytes::Buf;
2use futures_core::stream::Stream;
3use pin_project_lite::pin_project;
4use std::io;
5use std::pin::Pin;
6use std::task::{Context, Poll};
7use tokio::io::{AsyncBufRead, AsyncRead, ReadBuf};
8
9pin_project! {
10 /// Convert a [`Stream`] of byte chunks into an [`AsyncRead`].
11 ///
12 /// This type performs the inverse operation of [`ReaderStream`].
13 ///
14 /// # Example
15 ///
16 /// ```
17 /// use bytes::Bytes;
18 /// use tokio::io::{AsyncReadExt, Result};
19 /// use tokio_util::io::StreamReader;
20 /// # #[tokio::main]
21 /// # async fn main() -> std::io::Result<()> {
22 ///
23 /// // Create a stream from an iterator.
24 /// let stream = tokio_stream::iter(vec![
25 /// Result::Ok(Bytes::from_static(&[0, 1, 2, 3])),
26 /// Result::Ok(Bytes::from_static(&[4, 5, 6, 7])),
27 /// Result::Ok(Bytes::from_static(&[8, 9, 10, 11])),
28 /// ]);
29 ///
30 /// // Convert it to an AsyncRead.
31 /// let mut read = StreamReader::new(stream);
32 ///
33 /// // Read five bytes from the stream.
34 /// let mut buf = [0; 5];
35 /// read.read_exact(&mut buf).await?;
36 /// assert_eq!(buf, [0, 1, 2, 3, 4]);
37 ///
38 /// // Read the rest of the current chunk.
39 /// assert_eq!(read.read(&mut buf).await?, 3);
40 /// assert_eq!(&buf[..3], [5, 6, 7]);
41 ///
42 /// // Read the next chunk.
43 /// assert_eq!(read.read(&mut buf).await?, 4);
44 /// assert_eq!(&buf[..4], [8, 9, 10, 11]);
45 ///
46 /// // We have now reached the end.
47 /// assert_eq!(read.read(&mut buf).await?, 0);
48 ///
49 /// # Ok(())
50 /// # }
51 /// ```
52 ///
53 /// [`AsyncRead`]: tokio::io::AsyncRead
54 /// [`Stream`]: futures_core::Stream
55 /// [`ReaderStream`]: crate::io::ReaderStream
56 #[derive(Debug)]
57 pub struct StreamReader<S, B> {
58 #[pin]
59 inner: S,
60 chunk: Option<B>,
61 }
62}
63
64impl<S, B, E> StreamReader<S, B>
65where
66 S: Stream<Item = Result<B, E>>,
67 B: Buf,
68 E: Into<std::io::Error>,
69{
70 /// Convert a stream of byte chunks into an [`AsyncRead`](tokio::io::AsyncRead).
71 ///
72 /// The item should be a [`Result`] with the ok variant being something that
73 /// implements the [`Buf`] trait (e.g. `Vec<u8>` or `Bytes`). The error
74 /// should be convertible into an [io error].
75 ///
76 /// [`Result`]: std::result::Result
77 /// [`Buf`]: bytes::Buf
78 /// [io error]: std::io::Error
79 pub fn new(stream: S) -> Self {
80 Self {
81 inner: stream,
82 chunk: None,
83 }
84 }
85
86 /// Do we have a chunk and is it non-empty?
87 fn has_chunk(&self) -> bool {
88 if let Some(ref chunk) = self.chunk {
89 chunk.remaining() > 0
90 } else {
91 false
92 }
93 }
94
95 /// Consumes this `StreamReader`, returning a Tuple consisting
96 /// of the underlying stream and an Option of the interal buffer,
97 /// which is Some in case the buffer contains elements.
98 pub fn into_inner_with_chunk(self) -> (S, Option<B>) {
99 if self.has_chunk() {
100 (self.inner, self.chunk)
101 } else {
102 (self.inner, None)
103 }
104 }
105}
106
107impl<S, B> StreamReader<S, B> {
108 /// Gets a reference to the underlying stream.
109 ///
110 /// It is inadvisable to directly read from the underlying stream.
111 pub fn get_ref(&self) -> &S {
112 &self.inner
113 }
114
115 /// Gets a mutable reference to the underlying stream.
116 ///
117 /// It is inadvisable to directly read from the underlying stream.
118 pub fn get_mut(&mut self) -> &mut S {
119 &mut self.inner
120 }
121
122 /// Gets a pinned mutable reference to the underlying stream.
123 ///
124 /// It is inadvisable to directly read from the underlying stream.
125 pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut S> {
126 self.project().inner
127 }
128
129 /// Consumes this `BufWriter`, returning the underlying stream.
130 ///
131 /// Note that any leftover data in the internal buffer is lost.
132 /// If you additionally want access to the internal buffer use
133 /// [`into_inner_with_chunk`].
134 ///
135 /// [`into_inner_with_chunk`]: crate::io::StreamReader::into_inner_with_chunk
136 pub fn into_inner(self) -> S {
137 self.inner
138 }
139}
140
141impl<S, B, E> AsyncRead for StreamReader<S, B>
142where
143 S: Stream<Item = Result<B, E>>,
144 B: Buf,
145 E: Into<std::io::Error>,
146{
147 fn poll_read(
148 mut self: Pin<&mut Self>,
149 cx: &mut Context<'_>,
150 buf: &mut ReadBuf<'_>,
151 ) -> Poll<io::Result<()>> {
152 if buf.remaining() == 0 {
153 return Poll::Ready(Ok(()));
154 }
155
156 let inner_buf: &[u8] = match self.as_mut().poll_fill_buf(cx) {
157 Poll::Ready(Ok(buf: &[u8])) => buf,
158 Poll::Ready(Err(err: Error)) => return Poll::Ready(Err(err)),
159 Poll::Pending => return Poll::Pending,
160 };
161 let len: usize = std::cmp::min(v1:inner_buf.len(), v2:buf.remaining());
162 buf.put_slice(&inner_buf[..len]);
163
164 self.consume(amt:len);
165 Poll::Ready(Ok(()))
166 }
167}
168
169impl<S, B, E> AsyncBufRead for StreamReader<S, B>
170where
171 S: Stream<Item = Result<B, E>>,
172 B: Buf,
173 E: Into<std::io::Error>,
174{
175 fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
176 loop {
177 if self.as_mut().has_chunk() {
178 // This unwrap is very sad, but it can't be avoided.
179 let buf = self.project().chunk.as_ref().unwrap().chunk();
180 return Poll::Ready(Ok(buf));
181 } else {
182 match self.as_mut().project().inner.poll_next(cx) {
183 Poll::Ready(Some(Ok(chunk))) => {
184 // Go around the loop in case the chunk is empty.
185 *self.as_mut().project().chunk = Some(chunk);
186 }
187 Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(err.into())),
188 Poll::Ready(None) => return Poll::Ready(Ok(&[])),
189 Poll::Pending => return Poll::Pending,
190 }
191 }
192 }
193 }
194 fn consume(self: Pin<&mut Self>, amt: usize) {
195 if amt > 0 {
196 self.project()
197 .chunk
198 .as_mut()
199 .expect("No chunk present")
200 .advance(amt);
201 }
202 }
203}
204