1 | use core::pin::Pin; |
2 | use futures_core::ready; |
3 | use futures_core::stream::TryStream; |
4 | use futures_core::task::{Context, Poll}; |
5 | use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite}; |
6 | use pin_project_lite::pin_project; |
7 | use std::cmp; |
8 | use std::io::{Error, Result}; |
9 | |
10 | pin_project! { |
11 | /// Reader for the [`into_async_read`](super::TryStreamExt::into_async_read) method. |
12 | #[derive(Debug)] |
13 | #[must_use = "readers do nothing unless polled" ] |
14 | #[cfg_attr(docsrs, doc(cfg(feature = "io" )))] |
15 | pub struct IntoAsyncRead<St> |
16 | where |
17 | St: TryStream<Error = Error>, |
18 | St::Ok: AsRef<[u8]>, |
19 | { |
20 | #[pin] |
21 | stream: St, |
22 | state: ReadState<St::Ok>, |
23 | } |
24 | } |
25 | |
26 | #[derive (Debug)] |
27 | enum ReadState<T: AsRef<[u8]>> { |
28 | Ready { chunk: T, chunk_start: usize }, |
29 | PendingChunk, |
30 | Eof, |
31 | } |
32 | |
33 | impl<St> IntoAsyncRead<St> |
34 | where |
35 | St: TryStream<Error = Error>, |
36 | St::Ok: AsRef<[u8]>, |
37 | { |
38 | pub(super) fn new(stream: St) -> Self { |
39 | Self { stream, state: ReadState::PendingChunk } |
40 | } |
41 | } |
42 | |
43 | impl<St> AsyncRead for IntoAsyncRead<St> |
44 | where |
45 | St: TryStream<Error = Error>, |
46 | St::Ok: AsRef<[u8]>, |
47 | { |
48 | fn poll_read( |
49 | self: Pin<&mut Self>, |
50 | cx: &mut Context<'_>, |
51 | buf: &mut [u8], |
52 | ) -> Poll<Result<usize>> { |
53 | let mut this = self.project(); |
54 | |
55 | loop { |
56 | match this.state { |
57 | ReadState::Ready { chunk, chunk_start } => { |
58 | let chunk = chunk.as_ref(); |
59 | let len = cmp::min(buf.len(), chunk.len() - *chunk_start); |
60 | |
61 | buf[..len].copy_from_slice(&chunk[*chunk_start..*chunk_start + len]); |
62 | *chunk_start += len; |
63 | |
64 | if chunk.len() == *chunk_start { |
65 | *this.state = ReadState::PendingChunk; |
66 | } |
67 | |
68 | return Poll::Ready(Ok(len)); |
69 | } |
70 | ReadState::PendingChunk => match ready!(this.stream.as_mut().try_poll_next(cx)) { |
71 | Some(Ok(chunk)) => { |
72 | if !chunk.as_ref().is_empty() { |
73 | *this.state = ReadState::Ready { chunk, chunk_start: 0 }; |
74 | } |
75 | } |
76 | Some(Err(err)) => { |
77 | *this.state = ReadState::Eof; |
78 | return Poll::Ready(Err(err)); |
79 | } |
80 | None => { |
81 | *this.state = ReadState::Eof; |
82 | return Poll::Ready(Ok(0)); |
83 | } |
84 | }, |
85 | ReadState::Eof => { |
86 | return Poll::Ready(Ok(0)); |
87 | } |
88 | } |
89 | } |
90 | } |
91 | } |
92 | |
93 | impl<St> AsyncWrite for IntoAsyncRead<St> |
94 | where |
95 | St: TryStream<Error = Error> + AsyncWrite, |
96 | St::Ok: AsRef<[u8]>, |
97 | { |
98 | fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> { |
99 | let this: Projection<'_, St> = self.project(); |
100 | this.stream.poll_write(cx, buf) |
101 | } |
102 | |
103 | fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { |
104 | let this: Projection<'_, St> = self.project(); |
105 | this.stream.poll_flush(cx) |
106 | } |
107 | |
108 | fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { |
109 | let this: Projection<'_, St> = self.project(); |
110 | this.stream.poll_close(cx) |
111 | } |
112 | } |
113 | |
114 | impl<St> AsyncBufRead for IntoAsyncRead<St> |
115 | where |
116 | St: TryStream<Error = Error>, |
117 | St::Ok: AsRef<[u8]>, |
118 | { |
119 | fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> { |
120 | let mut this = self.project(); |
121 | |
122 | while let ReadState::PendingChunk = this.state { |
123 | match ready!(this.stream.as_mut().try_poll_next(cx)) { |
124 | Some(Ok(chunk)) => { |
125 | if !chunk.as_ref().is_empty() { |
126 | *this.state = ReadState::Ready { chunk, chunk_start: 0 }; |
127 | } |
128 | } |
129 | Some(Err(err)) => { |
130 | *this.state = ReadState::Eof; |
131 | return Poll::Ready(Err(err)); |
132 | } |
133 | None => { |
134 | *this.state = ReadState::Eof; |
135 | return Poll::Ready(Ok(&[])); |
136 | } |
137 | } |
138 | } |
139 | |
140 | if let &mut ReadState::Ready { ref chunk, chunk_start } = this.state { |
141 | let chunk = chunk.as_ref(); |
142 | return Poll::Ready(Ok(&chunk[chunk_start..])); |
143 | } |
144 | |
145 | // To get to this point we must be in ReadState::Eof |
146 | Poll::Ready(Ok(&[])) |
147 | } |
148 | |
149 | fn consume(self: Pin<&mut Self>, amount: usize) { |
150 | let this = self.project(); |
151 | |
152 | // https://github.com/rust-lang/futures-rs/pull/1556#discussion_r281644295 |
153 | if amount == 0 { |
154 | return; |
155 | } |
156 | if let ReadState::Ready { chunk, chunk_start } = this.state { |
157 | *chunk_start += amount; |
158 | debug_assert!(*chunk_start <= chunk.as_ref().len()); |
159 | if *chunk_start >= chunk.as_ref().len() { |
160 | *this.state = ReadState::PendingChunk; |
161 | } |
162 | } else { |
163 | debug_assert!(false, "Attempted to consume from IntoAsyncRead without chunk" ); |
164 | } |
165 | } |
166 | } |
167 | |