1use bytes::{Bytes, BytesMut};
2use futures_core::stream::Stream;
3use pin_project_lite::pin_project;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6use tokio::io::AsyncRead;
7
// Read-buffer size, in bytes, used by `ReaderStream::new`. It is stored as
// the stream's `capacity` and passed to `BytesMut::reserve` whenever the
// working buffer is found to have zero capacity.
const DEFAULT_CAPACITY: usize = 4096;
9
pin_project! {
    /// Convert an [`AsyncRead`] into a [`Stream`] of byte chunks.
    ///
    /// This stream is fused: once the reader reports end-of-file or an
    /// error, the reader is dropped and every later poll yields `None`.
    /// It performs the inverse operation of [`StreamReader`].
    ///
    /// # Example
    ///
    /// ```
    /// # #[tokio::main]
    /// # async fn main() -> std::io::Result<()> {
    /// use tokio_stream::StreamExt;
    /// use tokio_util::io::ReaderStream;
    ///
    /// // Create a stream of data.
    /// let data = b"hello, world!";
    /// let mut stream = ReaderStream::new(&data[..]);
    ///
    /// // Read all of the chunks into a vector.
    /// let mut stream_contents = Vec::new();
    /// while let Some(chunk) = stream.next().await {
    ///     stream_contents.extend_from_slice(&chunk?);
    /// }
    ///
    /// // Once the chunks are concatenated, we should have the
    /// // original data.
    /// assert_eq!(stream_contents, data);
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// [`AsyncRead`]: tokio::io::AsyncRead
    /// [`StreamReader`]: crate::io::StreamReader
    /// [`Stream`]: futures_core::Stream
    #[derive(Debug)]
    pub struct ReaderStream<R> {
        // The wrapped reader.
        //
        // This value is `None` once the stream has terminated, i.e. after
        // the reader returned an error or a zero-length read.
        #[pin]
        reader: Option<R>,
        // Working buffer the reader fills. Each chunk handed to the caller
        // is split off the front of this buffer, so one allocation can
        // serve several reads.
        buf: BytesMut,
        // Number of bytes to reserve in `buf` whenever its capacity has
        // dropped to zero.
        capacity: usize,
    }
}
56
57impl<R: AsyncRead> ReaderStream<R> {
58 /// Convert an [`AsyncRead`] into a [`Stream`] with item type
59 /// `Result<Bytes, std::io::Error>`.
60 ///
61 /// [`AsyncRead`]: tokio::io::AsyncRead
62 /// [`Stream`]: futures_core::Stream
63 pub fn new(reader: R) -> Self {
64 ReaderStream {
65 reader: Some(reader),
66 buf: BytesMut::new(),
67 capacity: DEFAULT_CAPACITY,
68 }
69 }
70
71 /// Convert an [`AsyncRead`] into a [`Stream`] with item type
72 /// `Result<Bytes, std::io::Error>`,
73 /// with a specific read buffer initial capacity.
74 ///
75 /// [`AsyncRead`]: tokio::io::AsyncRead
76 /// [`Stream`]: futures_core::Stream
77 pub fn with_capacity(reader: R, capacity: usize) -> Self {
78 ReaderStream {
79 reader: Some(reader),
80 buf: BytesMut::with_capacity(capacity),
81 capacity,
82 }
83 }
84}
85
86impl<R: AsyncRead> Stream for ReaderStream<R> {
87 type Item = std::io::Result<Bytes>;
88 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
89 use crate::util::poll_read_buf;
90
91 let mut this = self.as_mut().project();
92
93 let reader = match this.reader.as_pin_mut() {
94 Some(r) => r,
95 None => return Poll::Ready(None),
96 };
97
98 if this.buf.capacity() == 0 {
99 this.buf.reserve(*this.capacity);
100 }
101
102 match poll_read_buf(reader, cx, &mut this.buf) {
103 Poll::Pending => Poll::Pending,
104 Poll::Ready(Err(err)) => {
105 self.project().reader.set(None);
106 Poll::Ready(Some(Err(err)))
107 }
108 Poll::Ready(Ok(0)) => {
109 self.project().reader.set(None);
110 Poll::Ready(None)
111 }
112 Poll::Ready(Ok(_)) => {
113 let chunk = this.buf.split();
114 Poll::Ready(Some(Ok(chunk.freeze())))
115 }
116 }
117 }
118}
119