1use super::DEFAULT_BUF_SIZE;
2use futures_core::future::Future;
3use futures_core::ready;
4use futures_core::task::{Context, Poll};
5use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSliceMut, SeekFrom};
6use pin_project_lite::pin_project;
7use std::io::{self, Read};
8use std::pin::Pin;
9use std::{cmp, fmt};
10
11pin_project! {
12 /// The `BufReader` struct adds buffering to any reader.
13 ///
14 /// It can be excessively inefficient to work directly with a [`AsyncRead`]
15 /// instance. A `BufReader` performs large, infrequent reads on the underlying
16 /// [`AsyncRead`] and maintains an in-memory buffer of the results.
17 ///
18 /// `BufReader` can improve the speed of programs that make *small* and
19 /// *repeated* read calls to the same file or network socket. It does not
20 /// help when reading very large amounts at once, or reading just one or a few
21 /// times. It also provides no advantage when reading from a source that is
22 /// already in memory, like a `Vec<u8>`.
23 ///
24 /// When the `BufReader` is dropped, the contents of its buffer will be
25 /// discarded. Creating multiple instances of a `BufReader` on the same
26 /// stream can cause data loss.
27 ///
28 /// [`AsyncRead`]: futures_io::AsyncRead
29 ///
30 // TODO: Examples
31 pub struct BufReader<R> {
32 #[pin]
33 inner: R,
34 buffer: Box<[u8]>,
35 pos: usize,
36 cap: usize,
37 }
38}
39
40impl<R: AsyncRead> BufReader<R> {
41 /// Creates a new `BufReader` with a default buffer capacity. The default is currently 8 KB,
42 /// but may change in the future.
43 pub fn new(inner: R) -> Self {
44 Self::with_capacity(DEFAULT_BUF_SIZE, inner)
45 }
46
47 /// Creates a new `BufReader` with the specified buffer capacity.
48 pub fn with_capacity(capacity: usize, inner: R) -> Self {
49 unsafe {
50 let mut buffer = Vec::with_capacity(capacity);
51 buffer.set_len(capacity);
52 super::initialize(&inner, &mut buffer);
53 Self { inner, buffer: buffer.into_boxed_slice(), pos: 0, cap: 0 }
54 }
55 }
56
57 delegate_access_inner!(inner, R, ());
58
59 /// Returns a reference to the internally buffered data.
60 ///
61 /// Unlike `fill_buf`, this will not attempt to fill the buffer if it is empty.
62 pub fn buffer(&self) -> &[u8] {
63 &self.buffer[self.pos..self.cap]
64 }
65
66 /// Invalidates all data in the internal buffer.
67 #[inline]
68 fn discard_buffer(self: Pin<&mut Self>) {
69 let this = self.project();
70 *this.pos = 0;
71 *this.cap = 0;
72 }
73}
74
75impl<R: AsyncRead + AsyncSeek> BufReader<R> {
76 /// Seeks relative to the current position. If the new position lies within the buffer,
77 /// the buffer will not be flushed, allowing for more efficient seeks.
78 /// This method does not return the location of the underlying reader, so the caller
79 /// must track this information themselves if it is required.
80 pub fn seek_relative(self: Pin<&mut Self>, offset: i64) -> SeeKRelative<'_, R> {
81 SeeKRelative { inner: self, offset, first: true }
82 }
83
84 /// Attempts to seek relative to the current position. If the new position lies within the buffer,
85 /// the buffer will not be flushed, allowing for more efficient seeks.
86 /// This method does not return the location of the underlying reader, so the caller
87 /// must track this information themselves if it is required.
88 pub fn poll_seek_relative(
89 self: Pin<&mut Self>,
90 cx: &mut Context<'_>,
91 offset: i64,
92 ) -> Poll<io::Result<()>> {
93 let pos = self.pos as u64;
94 if offset < 0 {
95 if let Some(new_pos) = pos.checked_sub((-offset) as u64) {
96 *self.project().pos = new_pos as usize;
97 return Poll::Ready(Ok(()));
98 }
99 } else if let Some(new_pos) = pos.checked_add(offset as u64) {
100 if new_pos <= self.cap as u64 {
101 *self.project().pos = new_pos as usize;
102 return Poll::Ready(Ok(()));
103 }
104 }
105 self.poll_seek(cx, SeekFrom::Current(offset)).map(|res| res.map(|_| ()))
106 }
107}
108
109impl<R: AsyncRead> AsyncRead for BufReader<R> {
110 fn poll_read(
111 mut self: Pin<&mut Self>,
112 cx: &mut Context<'_>,
113 buf: &mut [u8],
114 ) -> Poll<io::Result<usize>> {
115 // If we don't have any buffered data and we're doing a massive read
116 // (larger than our internal buffer), bypass our internal buffer
117 // entirely.
118 if self.pos == self.cap && buf.len() >= self.buffer.len() {
119 let res = ready!(self.as_mut().project().inner.poll_read(cx, buf));
120 self.discard_buffer();
121 return Poll::Ready(res);
122 }
123 let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
124 let nread = rem.read(buf)?;
125 self.consume(nread);
126 Poll::Ready(Ok(nread))
127 }
128
129 fn poll_read_vectored(
130 mut self: Pin<&mut Self>,
131 cx: &mut Context<'_>,
132 bufs: &mut [IoSliceMut<'_>],
133 ) -> Poll<io::Result<usize>> {
134 let total_len = bufs.iter().map(|b| b.len()).sum::<usize>();
135 if self.pos == self.cap && total_len >= self.buffer.len() {
136 let res = ready!(self.as_mut().project().inner.poll_read_vectored(cx, bufs));
137 self.discard_buffer();
138 return Poll::Ready(res);
139 }
140 let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
141 let nread = rem.read_vectored(bufs)?;
142 self.consume(nread);
143 Poll::Ready(Ok(nread))
144 }
145}
146
147impl<R: AsyncRead> AsyncBufRead for BufReader<R> {
148 fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
149 let this: Projection<'_, R> = self.project();
150
151 // If we've reached the end of our internal buffer then we need to fetch
152 // some more data from the underlying reader.
153 // Branch using `>=` instead of the more correct `==`
154 // to tell the compiler that the pos..cap slice is always valid.
155 if *this.pos >= *this.cap {
156 debug_assert!(*this.pos == *this.cap);
157 *this.cap = ready!(this.inner.poll_read(cx, this.buffer))?;
158 *this.pos = 0;
159 }
160 Poll::Ready(Ok(&this.buffer[*this.pos..*this.cap]))
161 }
162
163 fn consume(self: Pin<&mut Self>, amt: usize) {
164 *self.project().pos = cmp::min(self.pos + amt, self.cap);
165 }
166}
167
168impl<R: AsyncWrite> AsyncWrite for BufReader<R> {
169 delegate_async_write!(inner);
170}
171
172impl<R: fmt::Debug> fmt::Debug for BufReader<R> {
173 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
174 f&mut DebugStruct<'_, '_>.debug_struct("BufReader")
175 .field("reader", &self.inner)
176 .field(name:"buffer", &format_args!("{}/{}", self.cap - self.pos, self.buffer.len()))
177 .finish()
178 }
179}
180
181impl<R: AsyncRead + AsyncSeek> AsyncSeek for BufReader<R> {
182 /// Seek to an offset, in bytes, in the underlying reader.
183 ///
184 /// The position used for seeking with `SeekFrom::Current(_)` is the
185 /// position the underlying reader would be at if the `BufReader` had no
186 /// internal buffer.
187 ///
188 /// Seeking always discards the internal buffer, even if the seek position
189 /// would otherwise fall within it. This guarantees that calling
190 /// `.into_inner()` immediately after a seek yields the underlying reader
191 /// at the same position.
192 ///
193 /// To seek without discarding the internal buffer, use
194 /// [`BufReader::seek_relative`](BufReader::seek_relative) or
195 /// [`BufReader::poll_seek_relative`](BufReader::poll_seek_relative).
196 ///
197 /// See [`AsyncSeek`](futures_io::AsyncSeek) for more details.
198 ///
199 /// Note: In the edge case where you're seeking with `SeekFrom::Current(n)`
200 /// where `n` minus the internal buffer length overflows an `i64`, two
201 /// seeks will be performed instead of one. If the second seek returns
202 /// `Err`, the underlying reader will be left at the same position it would
203 /// have if you called `seek` with `SeekFrom::Current(0)`.
204 fn poll_seek(
205 mut self: Pin<&mut Self>,
206 cx: &mut Context<'_>,
207 pos: SeekFrom,
208 ) -> Poll<io::Result<u64>> {
209 let result: u64;
210 if let SeekFrom::Current(n) = pos {
211 let remainder = (self.cap - self.pos) as i64;
212 // it should be safe to assume that remainder fits within an i64 as the alternative
213 // means we managed to allocate 8 exbibytes and that's absurd.
214 // But it's not out of the realm of possibility for some weird underlying reader to
215 // support seeking by i64::min_value() so we need to handle underflow when subtracting
216 // remainder.
217 if let Some(offset) = n.checked_sub(remainder) {
218 result =
219 ready!(self.as_mut().project().inner.poll_seek(cx, SeekFrom::Current(offset)))?;
220 } else {
221 // seek backwards by our remainder, and then by the offset
222 ready!(self.as_mut().project().inner.poll_seek(cx, SeekFrom::Current(-remainder)))?;
223 self.as_mut().discard_buffer();
224 result = ready!(self.as_mut().project().inner.poll_seek(cx, SeekFrom::Current(n)))?;
225 }
226 } else {
227 // Seeking with Start/End doesn't care about our buffer length.
228 result = ready!(self.as_mut().project().inner.poll_seek(cx, pos))?;
229 }
230 self.discard_buffer();
231 Poll::Ready(Ok(result))
232 }
233}
234
235/// Future for the [`BufReader::seek_relative`](self::BufReader::seek_relative) method.
236#[derive(Debug)]
237#[must_use = "futures do nothing unless polled"]
238pub struct SeeKRelative<'a, R> {
239 inner: Pin<&'a mut BufReader<R>>,
240 offset: i64,
241 first: bool,
242}
243
244impl<R> Future for SeeKRelative<'_, R>
245where
246 R: AsyncRead + AsyncSeek,
247{
248 type Output = io::Result<()>;
249
250 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
251 let offset: i64 = self.offset;
252 if self.first {
253 self.first = false;
254 self.inner.as_mut().poll_seek_relative(cx, offset)
255 } else {
256 self.inner
257 .as_mut()
258 .as_mut()
259 .poll_seek(cx, pos:SeekFrom::Current(offset))
260 .map(|res: Result| res.map(|_| ()))
261 }
262 }
263}
264