1use crate::io::util::{BufReader, BufWriter};
2use crate::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};
3
4use pin_project_lite::pin_project;
5use std::io::{self, IoSlice, SeekFrom};
6use std::pin::Pin;
7use std::task::{Context, Poll};
8
9pin_project! {
10 /// Wraps a type that is [`AsyncWrite`] and [`AsyncRead`], and buffers its input and output.
11 ///
12 /// It can be excessively inefficient to work directly with something that implements [`AsyncWrite`]
13 /// and [`AsyncRead`]. For example, every `write`, however small, has to traverse the syscall
14 /// interface, and similarly, every read has to do the same. The [`BufWriter`] and [`BufReader`]
15 /// types aid with these problems respectively, but do so in only one direction. `BufStream` wraps
16 /// one in the other so that both directions are buffered. See their documentation for details.
17 #[derive(Debug)]
18 #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
19 pub struct BufStream<RW> {
20 #[pin]
21 inner: BufReader<BufWriter<RW>>,
22 }
23}
24
25impl<RW: AsyncRead + AsyncWrite> BufStream<RW> {
26 /// Wraps a type in both [`BufWriter`] and [`BufReader`].
27 ///
28 /// See the documentation for those types and [`BufStream`] for details.
29 pub fn new(stream: RW) -> BufStream<RW> {
30 BufStream {
31 inner: BufReader::new(BufWriter::new(stream)),
32 }
33 }
34
35 /// Creates a `BufStream` with the specified [`BufReader`] capacity and [`BufWriter`]
36 /// capacity.
37 ///
38 /// See the documentation for those types and [`BufStream`] for details.
39 pub fn with_capacity(
40 reader_capacity: usize,
41 writer_capacity: usize,
42 stream: RW,
43 ) -> BufStream<RW> {
44 BufStream {
45 inner: BufReader::with_capacity(
46 reader_capacity,
47 BufWriter::with_capacity(writer_capacity, stream),
48 ),
49 }
50 }
51
52 /// Gets a reference to the underlying I/O object.
53 ///
54 /// It is inadvisable to directly read from the underlying I/O object.
55 pub fn get_ref(&self) -> &RW {
56 self.inner.get_ref().get_ref()
57 }
58
59 /// Gets a mutable reference to the underlying I/O object.
60 ///
61 /// It is inadvisable to directly read from the underlying I/O object.
62 pub fn get_mut(&mut self) -> &mut RW {
63 self.inner.get_mut().get_mut()
64 }
65
66 /// Gets a pinned mutable reference to the underlying I/O object.
67 ///
68 /// It is inadvisable to directly read from the underlying I/O object.
69 pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut RW> {
70 self.project().inner.get_pin_mut().get_pin_mut()
71 }
72
73 /// Consumes this `BufStream`, returning the underlying I/O object.
74 ///
75 /// Note that any leftover data in the internal buffer is lost.
76 pub fn into_inner(self) -> RW {
77 self.inner.into_inner().into_inner()
78 }
79}
80
81impl<RW> From<BufReader<BufWriter<RW>>> for BufStream<RW> {
82 fn from(b: BufReader<BufWriter<RW>>) -> Self {
83 BufStream { inner: b }
84 }
85}
86
87impl<RW> From<BufWriter<BufReader<RW>>> for BufStream<RW> {
88 fn from(b: BufWriter<BufReader<RW>>) -> Self {
89 // we need to "invert" the reader and writer
90 let BufWriter {
91 inner:
92 BufReader {
93 inner,
94 buf: rbuf,
95 pos,
96 cap,
97 seek_state: rseek_state,
98 },
99 buf: wbuf,
100 written,
101 seek_state: wseek_state,
102 } = b;
103
104 BufStream {
105 inner: BufReader {
106 inner: BufWriter {
107 inner,
108 buf: wbuf,
109 written,
110 seek_state: wseek_state,
111 },
112 buf: rbuf,
113 pos,
114 cap,
115 seek_state: rseek_state,
116 },
117 }
118 }
119}
120
121impl<RW: AsyncRead + AsyncWrite> AsyncWrite for BufStream<RW> {
122 fn poll_write(
123 self: Pin<&mut Self>,
124 cx: &mut Context<'_>,
125 buf: &[u8],
126 ) -> Poll<io::Result<usize>> {
127 self.project().inner.poll_write(cx, buf)
128 }
129
130 fn poll_write_vectored(
131 self: Pin<&mut Self>,
132 cx: &mut Context<'_>,
133 bufs: &[IoSlice<'_>],
134 ) -> Poll<io::Result<usize>> {
135 self.project().inner.poll_write_vectored(cx, bufs)
136 }
137
138 fn is_write_vectored(&self) -> bool {
139 self.inner.is_write_vectored()
140 }
141
142 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
143 self.project().inner.poll_flush(cx)
144 }
145
146 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
147 self.project().inner.poll_shutdown(cx)
148 }
149}
150
151impl<RW: AsyncRead + AsyncWrite> AsyncRead for BufStream<RW> {
152 fn poll_read(
153 self: Pin<&mut Self>,
154 cx: &mut Context<'_>,
155 buf: &mut ReadBuf<'_>,
156 ) -> Poll<io::Result<()>> {
157 self.project().inner.poll_read(cx, buf)
158 }
159}
160
161/// Seek to an offset, in bytes, in the underlying stream.
162///
163/// The position used for seeking with `SeekFrom::Current(_)` is the
164/// position the underlying stream would be at if the `BufStream` had no
165/// internal buffer.
166///
167/// Seeking always discards the internal buffer, even if the seek position
168/// would otherwise fall within it. This guarantees that calling
169/// `.into_inner()` immediately after a seek yields the underlying reader
170/// at the same position.
171///
172/// See [`AsyncSeek`] for more details.
173///
174/// Note: In the edge case where you're seeking with `SeekFrom::Current(n)`
175/// where `n` minus the internal buffer length overflows an `i64`, two
176/// seeks will be performed instead of one. If the second seek returns
177/// `Err`, the underlying reader will be left at the same position it would
178/// have if you called `seek` with `SeekFrom::Current(0)`.
179impl<RW: AsyncRead + AsyncWrite + AsyncSeek> AsyncSeek for BufStream<RW> {
180 fn start_seek(self: Pin<&mut Self>, position: SeekFrom) -> io::Result<()> {
181 self.project().inner.start_seek(position)
182 }
183
184 fn poll_complete(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
185 self.project().inner.poll_complete(cx)
186 }
187}
188
189impl<RW: AsyncRead + AsyncWrite> AsyncBufRead for BufStream<RW> {
190 fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
191 self.project().inner.poll_fill_buf(cx)
192 }
193
194 fn consume(self: Pin<&mut Self>, amt: usize) {
195 self.project().inner.consume(amt);
196 }
197}
198
#[cfg(test)]
mod tests {
    use super::BufStream;

    // Passes `BufStream<()>` through the crate's `is_unpin` helper; presumably this only
    // compiles when the type is `Unpin` — confirm against the helper's definition.
    #[test]
    fn assert_unpin() {
        type UnitStream = BufStream<()>;
        crate::is_unpin::<UnitStream>();
    }
}
208