1 | use std::io::{IoSliceMut, Read as _}; |
2 | use std::pin::Pin; |
3 | use std::{cmp, fmt}; |
4 | |
5 | use pin_project_lite::pin_project; |
6 | |
7 | use crate::io::{self, BufRead, Read, Seek, SeekFrom, DEFAULT_BUF_SIZE}; |
8 | use crate::task::{Context, Poll}; |
9 | |
pin_project! {
    /// Adds buffering to any reader.
    ///
    /// It can be excessively inefficient to work directly with a [`Read`] instance. A `BufReader`
    /// performs large, infrequent reads on the underlying [`Read`] and maintains an in-memory buffer
    /// of the incoming byte stream.
    ///
    /// `BufReader` can improve the speed of programs that make *small* and *repeated* read calls to
    /// the same file or network socket. It does not help when reading very large amounts at once, or
    /// reading just one or a few times. It also provides no advantage when reading from a source that
    /// is already in memory, like a `Vec<u8>`.
    ///
    /// When the `BufReader` is dropped, the contents of its buffer will be discarded. Creating
    /// multiple instances of a `BufReader` on the same stream can cause data loss.
    ///
    /// This type is an async version of [`std::io::BufReader`].
    ///
    /// [`Read`]: trait.Read.html
    /// [`std::io::BufReader`]: https://doc.rust-lang.org/std/io/struct.BufReader.html
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
    /// #
    /// use async_std::fs::File;
    /// use async_std::io::BufReader;
    /// use async_std::prelude::*;
    ///
    /// let mut file = BufReader::new(File::open("a.txt").await?);
    ///
    /// let mut line = String::new();
    /// file.read_line(&mut line).await?;
    /// #
    /// # Ok(()) }) }
    /// ```
    pub struct BufReader<R> {
        // The wrapped reader. It is structurally pinned so that it can be polled
        // through `Pin<&mut Self>` via the generated `project()`.
        #[pin]
        inner: R,
        // Backing storage. Its length is the capacity chosen at construction.
        buf: Box<[u8]>,
        // Index of the next unread byte in `buf`. Kept `<= cap`.
        pos: usize,
        // End (exclusive) of the valid bytes in `buf`. The range `pos..cap`
        // holds data that was read from `inner` but not yet returned to the caller.
        cap: usize,
    }
}
54 | |
55 | impl<R: io::Read> BufReader<R> { |
56 | /// Creates a buffered reader with default buffer capacity. |
57 | /// |
58 | /// The default capacity is currently 8 KB, but may change in the future. |
59 | /// |
60 | /// # Examples |
61 | /// |
62 | /// ```no_run |
63 | /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { |
64 | /// # |
65 | /// use async_std::fs::File; |
66 | /// use async_std::io::BufReader; |
67 | /// |
68 | /// let f = BufReader::new(File::open("a.txt" ).await?); |
69 | /// # |
70 | /// # Ok(()) }) } |
71 | /// ``` |
72 | pub fn new(inner: R) -> BufReader<R> { |
73 | BufReader::with_capacity(DEFAULT_BUF_SIZE, inner) |
74 | } |
75 | |
76 | /// Creates a new buffered reader with the specified capacity. |
77 | /// |
78 | /// # Examples |
79 | /// |
80 | /// ```no_run |
81 | /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { |
82 | /// # |
83 | /// use async_std::fs::File; |
84 | /// use async_std::io::BufReader; |
85 | /// |
86 | /// let f = BufReader::with_capacity(1024, File::open("a.txt" ).await?); |
87 | /// # |
88 | /// # Ok(()) }) } |
89 | /// ``` |
90 | pub fn with_capacity(capacity: usize, inner: R) -> BufReader<R> { |
91 | BufReader { |
92 | inner, |
93 | buf: vec![0; capacity].into_boxed_slice(), |
94 | pos: 0, |
95 | cap: 0, |
96 | } |
97 | } |
98 | } |
99 | |
100 | impl<R> BufReader<R> { |
101 | /// Gets a reference to the underlying reader. |
102 | /// |
103 | /// It is inadvisable to directly read from the underlying reader. |
104 | /// |
105 | /// # Examples |
106 | /// |
107 | /// ```no_run |
108 | /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { |
109 | /// # |
110 | /// use async_std::fs::File; |
111 | /// use async_std::io::BufReader; |
112 | /// |
113 | /// let f = BufReader::new(File::open("a.txt" ).await?); |
114 | /// let inner = f.get_ref(); |
115 | /// # |
116 | /// # Ok(()) }) } |
117 | /// ``` |
118 | pub fn get_ref(&self) -> &R { |
119 | &self.inner |
120 | } |
121 | |
122 | /// Gets a mutable reference to the underlying reader. |
123 | /// |
124 | /// It is inadvisable to directly read from the underlying reader. |
125 | /// |
126 | /// # Examples |
127 | /// |
128 | /// ```no_run |
129 | /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { |
130 | /// # |
131 | /// use async_std::fs::File; |
132 | /// use async_std::io::BufReader; |
133 | /// |
134 | /// let mut file = BufReader::new(File::open("a.txt" ).await?); |
135 | /// let inner = file.get_mut(); |
136 | /// # |
137 | /// # Ok(()) }) } |
138 | /// ``` |
139 | pub fn get_mut(&mut self) -> &mut R { |
140 | &mut self.inner |
141 | } |
142 | |
143 | /// Gets a pinned mutable reference to the underlying reader. |
144 | /// |
145 | /// It is inadvisable to directly read from the underlying reader. |
146 | fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> { |
147 | self.project().inner |
148 | } |
149 | |
150 | /// Returns a reference to the internal buffer. |
151 | /// |
152 | /// This function will not attempt to fill the buffer if it is empty. |
153 | /// |
154 | /// # Examples |
155 | /// |
156 | /// ```no_run |
157 | /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { |
158 | /// # |
159 | /// use async_std::fs::File; |
160 | /// use async_std::io::BufReader; |
161 | /// |
162 | /// let f = BufReader::new(File::open("a.txt" ).await?); |
163 | /// let buffer = f.buffer(); |
164 | /// # |
165 | /// # Ok(()) }) } |
166 | /// ``` |
167 | pub fn buffer(&self) -> &[u8] { |
168 | &self.buf[self.pos..self.cap] |
169 | } |
170 | |
171 | /// Unwraps the buffered reader, returning the underlying reader. |
172 | /// |
173 | /// Note that any leftover data in the internal buffer is lost. |
174 | /// |
175 | /// # Examples |
176 | /// |
177 | /// ```no_run |
178 | /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { |
179 | /// # |
180 | /// use async_std::fs::File; |
181 | /// use async_std::io::BufReader; |
182 | /// |
183 | /// let f = BufReader::new(File::open("a.txt" ).await?); |
184 | /// let inner = f.into_inner(); |
185 | /// # |
186 | /// # Ok(()) }) } |
187 | /// ``` |
188 | pub fn into_inner(self) -> R { |
189 | self.inner |
190 | } |
191 | |
192 | /// Invalidates all data in the internal buffer. |
193 | #[inline ] |
194 | fn discard_buffer(self: Pin<&mut Self>) { |
195 | let this = self.project(); |
196 | *this.pos = 0; |
197 | *this.cap = 0; |
198 | } |
199 | } |
200 | |
201 | impl<R: Read> Read for BufReader<R> { |
202 | fn poll_read( |
203 | mut self: Pin<&mut Self>, |
204 | cx: &mut Context<'_>, |
205 | buf: &mut [u8], |
206 | ) -> Poll<io::Result<usize>> { |
207 | // If we don't have any buffered data and we're doing a massive read |
208 | // (larger than our internal buffer), bypass our internal buffer |
209 | // entirely. |
210 | if self.pos == self.cap && buf.len() >= self.buf.len() { |
211 | let res = futures_core::ready!(self.as_mut().get_pin_mut().poll_read(cx, buf)); |
212 | self.discard_buffer(); |
213 | return Poll::Ready(res); |
214 | } |
215 | let mut rem = futures_core::ready!(self.as_mut().poll_fill_buf(cx))?; |
216 | let nread = rem.read(buf)?; |
217 | self.consume(nread); |
218 | Poll::Ready(Ok(nread)) |
219 | } |
220 | |
221 | fn poll_read_vectored( |
222 | mut self: Pin<&mut Self>, |
223 | cx: &mut Context<'_>, |
224 | bufs: &mut [IoSliceMut<'_>], |
225 | ) -> Poll<io::Result<usize>> { |
226 | let total_len = bufs.iter().map(|b| b.len()).sum::<usize>(); |
227 | if self.pos == self.cap && total_len >= self.buf.len() { |
228 | let res = |
229 | futures_core::ready!(self.as_mut().get_pin_mut().poll_read_vectored(cx, bufs)); |
230 | self.discard_buffer(); |
231 | return Poll::Ready(res); |
232 | } |
233 | let mut rem = futures_core::ready!(self.as_mut().poll_fill_buf(cx))?; |
234 | let nread = rem.read_vectored(bufs)?; |
235 | self.consume(nread); |
236 | Poll::Ready(Ok(nread)) |
237 | } |
238 | } |
239 | |
240 | impl<R: Read> BufRead for BufReader<R> { |
241 | fn poll_fill_buf<'a>( |
242 | self: Pin<&'a mut Self>, |
243 | cx: &mut Context<'_>, |
244 | ) -> Poll<io::Result<&'a [u8]>> { |
245 | let mut this: Projection<'_, R> = self.project(); |
246 | |
247 | // If we've reached the end of our internal buffer then we need to fetch |
248 | // some more data from the underlying reader. |
249 | // Branch using `>=` instead of the more correct `==` |
250 | // to tell the compiler that the pos..cap slice is always valid. |
251 | if *this.pos >= *this.cap { |
252 | debug_assert!(*this.pos == *this.cap); |
253 | *this.cap = futures_core::ready!(this.inner.as_mut().poll_read(cx, this.buf))?; |
254 | *this.pos = 0; |
255 | } |
256 | Poll::Ready(Ok(&this.buf[*this.pos..*this.cap])) |
257 | } |
258 | |
259 | fn consume(self: Pin<&mut Self>, amt: usize) { |
260 | let this: Projection<'_, R> = self.project(); |
261 | *this.pos = cmp::min(*this.pos + amt, *this.cap); |
262 | } |
263 | } |
264 | |
265 | impl<R: io::Read + fmt::Debug> fmt::Debug for BufReader<R> { |
266 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
267 | f&mut DebugStruct<'_, '_>.debug_struct("BufReader" ) |
268 | .field("reader" , &self.inner) |
269 | .field( |
270 | name:"buffer" , |
271 | &format_args!(" {}/ {}" , self.cap - self.pos, self.buf.len()), |
272 | ) |
273 | .finish() |
274 | } |
275 | } |
276 | |
impl<R: Seek> Seek for BufReader<R> {
    /// Seeks to an offset, in bytes, in the underlying reader.
    ///
    /// The position used for seeking with `SeekFrom::Current(_)` is the position the underlying
    /// reader would be at if the `BufReader` had no internal buffer.
    ///
    /// Seeking always discards the internal buffer, even if the seek position would otherwise fall
    /// within it. This guarantees that calling `.into_inner()` immediately after a seek yields the
    /// underlying reader at the same position.
    ///
    /// See [`Seek`] for more details.
    ///
    /// Note: In the edge case where you're seeking with `SeekFrom::Current(n)` where `n` minus the
    /// internal buffer length overflows an `i64`, two seeks will be performed instead of one. If
    /// the second seek returns `Err`, the underlying reader will be left at the same position it
    /// would have if you called `seek` with `SeekFrom::Current(0)`.
    ///
    /// [`Seek`]: trait.Seek.html
    fn poll_seek(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        pos: SeekFrom,
    ) -> Poll<io::Result<u64>> {
        // Absolute position reported by the underlying reader after the final seek.
        let result: u64;
        if let SeekFrom::Current(n) = pos {
            // Bytes that are buffered but not yet consumed. The underlying reader is
            // already this many bytes past the logical position the caller sees.
            let remainder = (self.cap - self.pos) as i64;
            // it should be safe to assume that remainder fits within an i64 as the alternative
            // means we managed to allocate 8 exbibytes and that's absurd.
            // But it's not out of the realm of possibility for some weird underlying reader to
            // support seeking by i64::min_value() so we need to handle underflow when subtracting
            // remainder.
            if let Some(offset) = n.checked_sub(remainder) {
                // Common case: one relative seek, adjusted for the bytes we read ahead.
                result = futures_core::ready!(
                    self.as_mut()
                        .get_pin_mut()
                        .poll_seek(cx, SeekFrom::Current(offset))
                )?;
            } else {
                // seek backwards by our remainder, and then by the offset
                futures_core::ready!(
                    self.as_mut()
                        .get_pin_mut()
                        .poll_seek(cx, SeekFrom::Current(-remainder))
                )?;
                // Discarding between the two seeks is what makes re-polling correct.
                // If the second seek returns `Pending`, the next poll sees an empty buffer
                // (`remainder == 0`), so `checked_sub` succeeds and only
                // `SeekFrom::Current(n)` is issued. The first seek is not repeated.
                self.as_mut().discard_buffer();
                result = futures_core::ready!(
                    self.as_mut()
                        .get_pin_mut()
                        .poll_seek(cx, SeekFrom::Current(n))
                )?;
            }
        } else {
            // Seeking with Start/End doesn't care about our buffer length.
            result = futures_core::ready!(self.as_mut().get_pin_mut().poll_seek(cx, pos))?;
        }
        // After any successful seek the buffered bytes no longer match the new
        // position, so they are always dropped.
        self.discard_buffer();
        Poll::Ready(Ok(result))
    }
}
336 | |