1 | use bytes::{Bytes, BytesMut}; |
2 | use futures_core::stream::Stream; |
3 | use pin_project_lite::pin_project; |
4 | use std::pin::Pin; |
5 | use std::task::{Context, Poll}; |
6 | use tokio::io::AsyncRead; |
7 | |
8 | const DEFAULT_CAPACITY: usize = 4096; |
9 | |
10 | pin_project! { |
11 | /// Convert an [`AsyncRead`] into a [`Stream`] of byte chunks. |
12 | /// |
13 | /// This stream is fused. It performs the inverse operation of |
14 | /// [`StreamReader`]. |
15 | /// |
16 | /// # Example |
17 | /// |
18 | /// ``` |
19 | /// # #[tokio::main] |
20 | /// # async fn main() -> std::io::Result<()> { |
21 | /// use tokio_stream::StreamExt; |
22 | /// use tokio_util::io::ReaderStream; |
23 | /// |
24 | /// // Create a stream of data. |
25 | /// let data = b"hello, world!"; |
26 | /// let mut stream = ReaderStream::new(&data[..]); |
27 | /// |
28 | /// // Read all of the chunks into a vector. |
29 | /// let mut stream_contents = Vec::new(); |
30 | /// while let Some(chunk) = stream.next().await { |
31 | /// stream_contents.extend_from_slice(&chunk?); |
32 | /// } |
33 | /// |
34 | /// // Once the chunks are concatenated, we should have the |
35 | /// // original data. |
36 | /// assert_eq!(stream_contents, data); |
37 | /// # Ok(()) |
38 | /// # } |
39 | /// ``` |
40 | /// |
41 | /// [`AsyncRead`]: tokio::io::AsyncRead |
42 | /// [`StreamReader`]: crate::io::StreamReader |
43 | /// [`Stream`]: futures_core::Stream |
44 | #[derive(Debug)] |
45 | pub struct ReaderStream<R> { |
46 | // Reader itself. |
47 | // |
48 | // This value is `None` if the stream has terminated. |
49 | #[pin] |
50 | reader: Option<R>, |
51 | // Working buffer, used to optimize allocations. |
52 | buf: BytesMut, |
53 | capacity: usize, |
54 | } |
55 | } |
56 | |
57 | impl<R: AsyncRead> ReaderStream<R> { |
58 | /// Convert an [`AsyncRead`] into a [`Stream`] with item type |
59 | /// `Result<Bytes, std::io::Error>`. |
60 | /// |
61 | /// [`AsyncRead`]: tokio::io::AsyncRead |
62 | /// [`Stream`]: futures_core::Stream |
63 | pub fn new(reader: R) -> Self { |
64 | ReaderStream { |
65 | reader: Some(reader), |
66 | buf: BytesMut::new(), |
67 | capacity: DEFAULT_CAPACITY, |
68 | } |
69 | } |
70 | |
71 | /// Convert an [`AsyncRead`] into a [`Stream`] with item type |
72 | /// `Result<Bytes, std::io::Error>`, |
73 | /// with a specific read buffer initial capacity. |
74 | /// |
75 | /// [`AsyncRead`]: tokio::io::AsyncRead |
76 | /// [`Stream`]: futures_core::Stream |
77 | pub fn with_capacity(reader: R, capacity: usize) -> Self { |
78 | ReaderStream { |
79 | reader: Some(reader), |
80 | buf: BytesMut::with_capacity(capacity), |
81 | capacity, |
82 | } |
83 | } |
84 | } |
85 | |
86 | impl<R: AsyncRead> Stream for ReaderStream<R> { |
87 | type Item = std::io::Result<Bytes>; |
88 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
89 | use crate::util::poll_read_buf; |
90 | |
91 | let mut this = self.as_mut().project(); |
92 | |
93 | let reader = match this.reader.as_pin_mut() { |
94 | Some(r) => r, |
95 | None => return Poll::Ready(None), |
96 | }; |
97 | |
98 | if this.buf.capacity() == 0 { |
99 | this.buf.reserve(*this.capacity); |
100 | } |
101 | |
102 | match poll_read_buf(reader, cx, &mut this.buf) { |
103 | Poll::Pending => Poll::Pending, |
104 | Poll::Ready(Err(err)) => { |
105 | self.project().reader.set(None); |
106 | Poll::Ready(Some(Err(err))) |
107 | } |
108 | Poll::Ready(Ok(0)) => { |
109 | self.project().reader.set(None); |
110 | Poll::Ready(None) |
111 | } |
112 | Poll::Ready(Ok(_)) => { |
113 | let chunk = this.buf.split(); |
114 | Poll::Ready(Some(Ok(chunk.freeze()))) |
115 | } |
116 | } |
117 | } |
118 | } |
119 | |