1use core::pin::Pin;
2use futures_core::ready;
3use futures_core::stream::TryStream;
4use futures_core::task::{Context, Poll};
5use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite};
6use pin_project_lite::pin_project;
7use std::cmp;
8use std::io::{Error, Result};
9
10pin_project! {
11 /// Reader for the [`into_async_read`](super::TryStreamExt::into_async_read) method.
12 #[derive(Debug)]
13 #[must_use = "readers do nothing unless polled"]
14 #[cfg_attr(docsrs, doc(cfg(feature = "io")))]
15 pub struct IntoAsyncRead<St>
16 where
17 St: TryStream<Error = Error>,
18 St::Ok: AsRef<[u8]>,
19 {
20 #[pin]
21 stream: St,
22 state: ReadState<St::Ok>,
23 }
24}
25
26#[derive(Debug)]
27enum ReadState<T: AsRef<[u8]>> {
28 Ready { chunk: T, chunk_start: usize },
29 PendingChunk,
30 Eof,
31}
32
33impl<St> IntoAsyncRead<St>
34where
35 St: TryStream<Error = Error>,
36 St::Ok: AsRef<[u8]>,
37{
38 pub(super) fn new(stream: St) -> Self {
39 Self { stream, state: ReadState::PendingChunk }
40 }
41}
42
43impl<St> AsyncRead for IntoAsyncRead<St>
44where
45 St: TryStream<Error = Error>,
46 St::Ok: AsRef<[u8]>,
47{
48 fn poll_read(
49 self: Pin<&mut Self>,
50 cx: &mut Context<'_>,
51 buf: &mut [u8],
52 ) -> Poll<Result<usize>> {
53 let mut this = self.project();
54
55 loop {
56 match this.state {
57 ReadState::Ready { chunk, chunk_start } => {
58 let chunk = chunk.as_ref();
59 let len = cmp::min(buf.len(), chunk.len() - *chunk_start);
60
61 buf[..len].copy_from_slice(&chunk[*chunk_start..*chunk_start + len]);
62 *chunk_start += len;
63
64 if chunk.len() == *chunk_start {
65 *this.state = ReadState::PendingChunk;
66 }
67
68 return Poll::Ready(Ok(len));
69 }
70 ReadState::PendingChunk => match ready!(this.stream.as_mut().try_poll_next(cx)) {
71 Some(Ok(chunk)) => {
72 if !chunk.as_ref().is_empty() {
73 *this.state = ReadState::Ready { chunk, chunk_start: 0 };
74 }
75 }
76 Some(Err(err)) => {
77 *this.state = ReadState::Eof;
78 return Poll::Ready(Err(err));
79 }
80 None => {
81 *this.state = ReadState::Eof;
82 return Poll::Ready(Ok(0));
83 }
84 },
85 ReadState::Eof => {
86 return Poll::Ready(Ok(0));
87 }
88 }
89 }
90 }
91}
92
93impl<St> AsyncWrite for IntoAsyncRead<St>
94where
95 St: TryStream<Error = Error> + AsyncWrite,
96 St::Ok: AsRef<[u8]>,
97{
98 fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
99 let this: Projection<'_, St> = self.project();
100 this.stream.poll_write(cx, buf)
101 }
102
103 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
104 let this: Projection<'_, St> = self.project();
105 this.stream.poll_flush(cx)
106 }
107
108 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
109 let this: Projection<'_, St> = self.project();
110 this.stream.poll_close(cx)
111 }
112}
113
114impl<St> AsyncBufRead for IntoAsyncRead<St>
115where
116 St: TryStream<Error = Error>,
117 St::Ok: AsRef<[u8]>,
118{
119 fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
120 let mut this = self.project();
121
122 while let ReadState::PendingChunk = this.state {
123 match ready!(this.stream.as_mut().try_poll_next(cx)) {
124 Some(Ok(chunk)) => {
125 if !chunk.as_ref().is_empty() {
126 *this.state = ReadState::Ready { chunk, chunk_start: 0 };
127 }
128 }
129 Some(Err(err)) => {
130 *this.state = ReadState::Eof;
131 return Poll::Ready(Err(err));
132 }
133 None => {
134 *this.state = ReadState::Eof;
135 return Poll::Ready(Ok(&[]));
136 }
137 }
138 }
139
140 if let &mut ReadState::Ready { ref chunk, chunk_start } = this.state {
141 let chunk = chunk.as_ref();
142 return Poll::Ready(Ok(&chunk[chunk_start..]));
143 }
144
145 // To get to this point we must be in ReadState::Eof
146 Poll::Ready(Ok(&[]))
147 }
148
149 fn consume(self: Pin<&mut Self>, amount: usize) {
150 let this = self.project();
151
152 // https://github.com/rust-lang/futures-rs/pull/1556#discussion_r281644295
153 if amount == 0 {
154 return;
155 }
156 if let ReadState::Ready { chunk, chunk_start } = this.state {
157 *chunk_start += amount;
158 debug_assert!(*chunk_start <= chunk.as_ref().len());
159 if *chunk_start >= chunk.as_ref().len() {
160 *this.state = ReadState::PendingChunk;
161 }
162 } else {
163 debug_assert!(false, "Attempted to consume from IntoAsyncRead without chunk");
164 }
165 }
166}
167