1 | use crate::io::util::{BufReader, BufWriter}; |
2 | use crate::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, ReadBuf}; |
3 | |
4 | use pin_project_lite::pin_project; |
5 | use std::io::{self, IoSlice, SeekFrom}; |
6 | use std::pin::Pin; |
7 | use std::task::{Context, Poll}; |
8 | |
9 | pin_project! { |
10 | /// Wraps a type that is [`AsyncWrite`] and [`AsyncRead`], and buffers its input and output. |
11 | /// |
12 | /// It can be excessively inefficient to work directly with something that implements [`AsyncWrite`] |
13 | /// and [`AsyncRead`]. For example, every `write`, however small, has to traverse the syscall |
14 | /// interface, and similarly, every read has to do the same. The [`BufWriter`] and [`BufReader`] |
15 | /// types aid with these problems respectively, but do so in only one direction. `BufStream` wraps |
16 | /// one in the other so that both directions are buffered. See their documentation for details. |
17 | #[derive(Debug)] |
18 | #[cfg_attr(docsrs, doc(cfg(feature = "io-util" )))] |
19 | pub struct BufStream<RW> { |
20 | #[pin] |
21 | inner: BufReader<BufWriter<RW>>, |
22 | } |
23 | } |
24 | |
25 | impl<RW: AsyncRead + AsyncWrite> BufStream<RW> { |
26 | /// Wraps a type in both [`BufWriter`] and [`BufReader`]. |
27 | /// |
28 | /// See the documentation for those types and [`BufStream`] for details. |
29 | pub fn new(stream: RW) -> BufStream<RW> { |
30 | BufStream { |
31 | inner: BufReader::new(BufWriter::new(stream)), |
32 | } |
33 | } |
34 | |
35 | /// Creates a `BufStream` with the specified [`BufReader`] capacity and [`BufWriter`] |
36 | /// capacity. |
37 | /// |
38 | /// See the documentation for those types and [`BufStream`] for details. |
39 | pub fn with_capacity( |
40 | reader_capacity: usize, |
41 | writer_capacity: usize, |
42 | stream: RW, |
43 | ) -> BufStream<RW> { |
44 | BufStream { |
45 | inner: BufReader::with_capacity( |
46 | reader_capacity, |
47 | BufWriter::with_capacity(writer_capacity, stream), |
48 | ), |
49 | } |
50 | } |
51 | |
52 | /// Gets a reference to the underlying I/O object. |
53 | /// |
54 | /// It is inadvisable to directly read from the underlying I/O object. |
55 | pub fn get_ref(&self) -> &RW { |
56 | self.inner.get_ref().get_ref() |
57 | } |
58 | |
59 | /// Gets a mutable reference to the underlying I/O object. |
60 | /// |
61 | /// It is inadvisable to directly read from the underlying I/O object. |
62 | pub fn get_mut(&mut self) -> &mut RW { |
63 | self.inner.get_mut().get_mut() |
64 | } |
65 | |
66 | /// Gets a pinned mutable reference to the underlying I/O object. |
67 | /// |
68 | /// It is inadvisable to directly read from the underlying I/O object. |
69 | pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut RW> { |
70 | self.project().inner.get_pin_mut().get_pin_mut() |
71 | } |
72 | |
73 | /// Consumes this `BufStream`, returning the underlying I/O object. |
74 | /// |
75 | /// Note that any leftover data in the internal buffer is lost. |
76 | pub fn into_inner(self) -> RW { |
77 | self.inner.into_inner().into_inner() |
78 | } |
79 | } |
80 | |
81 | impl<RW> From<BufReader<BufWriter<RW>>> for BufStream<RW> { |
82 | fn from(b: BufReader<BufWriter<RW>>) -> Self { |
83 | BufStream { inner: b } |
84 | } |
85 | } |
86 | |
87 | impl<RW> From<BufWriter<BufReader<RW>>> for BufStream<RW> { |
88 | fn from(b: BufWriter<BufReader<RW>>) -> Self { |
89 | // we need to "invert" the reader and writer |
90 | let BufWriter { |
91 | inner: |
92 | BufReader { |
93 | inner, |
94 | buf: rbuf, |
95 | pos, |
96 | cap, |
97 | seek_state: rseek_state, |
98 | }, |
99 | buf: wbuf, |
100 | written, |
101 | seek_state: wseek_state, |
102 | } = b; |
103 | |
104 | BufStream { |
105 | inner: BufReader { |
106 | inner: BufWriter { |
107 | inner, |
108 | buf: wbuf, |
109 | written, |
110 | seek_state: wseek_state, |
111 | }, |
112 | buf: rbuf, |
113 | pos, |
114 | cap, |
115 | seek_state: rseek_state, |
116 | }, |
117 | } |
118 | } |
119 | } |
120 | |
121 | impl<RW: AsyncRead + AsyncWrite> AsyncWrite for BufStream<RW> { |
122 | fn poll_write( |
123 | self: Pin<&mut Self>, |
124 | cx: &mut Context<'_>, |
125 | buf: &[u8], |
126 | ) -> Poll<io::Result<usize>> { |
127 | self.project().inner.poll_write(cx, buf) |
128 | } |
129 | |
130 | fn poll_write_vectored( |
131 | self: Pin<&mut Self>, |
132 | cx: &mut Context<'_>, |
133 | bufs: &[IoSlice<'_>], |
134 | ) -> Poll<io::Result<usize>> { |
135 | self.project().inner.poll_write_vectored(cx, bufs) |
136 | } |
137 | |
138 | fn is_write_vectored(&self) -> bool { |
139 | self.inner.is_write_vectored() |
140 | } |
141 | |
142 | fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
143 | self.project().inner.poll_flush(cx) |
144 | } |
145 | |
146 | fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
147 | self.project().inner.poll_shutdown(cx) |
148 | } |
149 | } |
150 | |
151 | impl<RW: AsyncRead + AsyncWrite> AsyncRead for BufStream<RW> { |
152 | fn poll_read( |
153 | self: Pin<&mut Self>, |
154 | cx: &mut Context<'_>, |
155 | buf: &mut ReadBuf<'_>, |
156 | ) -> Poll<io::Result<()>> { |
157 | self.project().inner.poll_read(cx, buf) |
158 | } |
159 | } |
160 | |
161 | /// Seek to an offset, in bytes, in the underlying stream. |
162 | /// |
163 | /// The position used for seeking with `SeekFrom::Current(_)` is the |
164 | /// position the underlying stream would be at if the `BufStream` had no |
165 | /// internal buffer. |
166 | /// |
167 | /// Seeking always discards the internal buffer, even if the seek position |
168 | /// would otherwise fall within it. This guarantees that calling |
169 | /// `.into_inner()` immediately after a seek yields the underlying reader |
170 | /// at the same position. |
171 | /// |
172 | /// See [`AsyncSeek`] for more details. |
173 | /// |
174 | /// Note: In the edge case where you're seeking with `SeekFrom::Current(n)` |
175 | /// where `n` minus the internal buffer length overflows an `i64`, two |
176 | /// seeks will be performed instead of one. If the second seek returns |
177 | /// `Err`, the underlying reader will be left at the same position it would |
178 | /// have if you called `seek` with `SeekFrom::Current(0)`. |
179 | impl<RW: AsyncRead + AsyncWrite + AsyncSeek> AsyncSeek for BufStream<RW> { |
180 | fn start_seek(self: Pin<&mut Self>, position: SeekFrom) -> io::Result<()> { |
181 | self.project().inner.start_seek(position) |
182 | } |
183 | |
184 | fn poll_complete(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> { |
185 | self.project().inner.poll_complete(cx) |
186 | } |
187 | } |
188 | |
189 | impl<RW: AsyncRead + AsyncWrite> AsyncBufRead for BufStream<RW> { |
190 | fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> { |
191 | self.project().inner.poll_fill_buf(cx) |
192 | } |
193 | |
194 | fn consume(self: Pin<&mut Self>, amt: usize) { |
195 | self.project().inner.consume(amt); |
196 | } |
197 | } |
198 | |
#[cfg(test)]
mod tests {
    use super::BufStream;

    // Passes as long as `crate::is_unpin` accepts `BufStream<()>` as its type argument;
    // the call only has to compile and run.
    #[test]
    fn assert_unpin() {
        crate::is_unpin::<BufStream<()>>();
    }
}
208 | |