| 1 | use crate::io::util::{BufReader, BufWriter}; |
| 2 | use crate::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, ReadBuf}; |
| 3 | |
| 4 | use pin_project_lite::pin_project; |
| 5 | use std::io::{self, IoSlice, SeekFrom}; |
| 6 | use std::pin::Pin; |
| 7 | use std::task::{Context, Poll}; |
| 8 | |
| 9 | pin_project! { |
| 10 | /// Wraps a type that is [`AsyncWrite`] and [`AsyncRead`], and buffers its input and output. |
| 11 | /// |
| 12 | /// It can be excessively inefficient to work directly with something that implements [`AsyncWrite`] |
| 13 | /// and [`AsyncRead`]. For example, every `write`, however small, has to traverse the syscall |
| 14 | /// interface, and similarly, every read has to do the same. The [`BufWriter`] and [`BufReader`] |
| 15 | /// types aid with these problems respectively, but do so in only one direction. `BufStream` wraps |
| 16 | /// one in the other so that both directions are buffered. See their documentation for details. |
| 17 | #[derive (Debug)] |
| 18 | #[cfg_attr (docsrs, doc(cfg(feature = "io-util" )))] |
| 19 | pub struct BufStream<RW> { |
| 20 | #[pin] |
| 21 | inner: BufReader<BufWriter<RW>>, |
| 22 | } |
| 23 | } |
| 24 | |
| 25 | impl<RW: AsyncRead + AsyncWrite> BufStream<RW> { |
| 26 | /// Wraps a type in both [`BufWriter`] and [`BufReader`]. |
| 27 | /// |
| 28 | /// See the documentation for those types and [`BufStream`] for details. |
| 29 | pub fn new(stream: RW) -> BufStream<RW> { |
| 30 | BufStream { |
| 31 | inner: BufReader::new(BufWriter::new(stream)), |
| 32 | } |
| 33 | } |
| 34 | |
| 35 | /// Creates a `BufStream` with the specified [`BufReader`] capacity and [`BufWriter`] |
| 36 | /// capacity. |
| 37 | /// |
| 38 | /// See the documentation for those types and [`BufStream`] for details. |
| 39 | pub fn with_capacity( |
| 40 | reader_capacity: usize, |
| 41 | writer_capacity: usize, |
| 42 | stream: RW, |
| 43 | ) -> BufStream<RW> { |
| 44 | BufStream { |
| 45 | inner: BufReader::with_capacity( |
| 46 | reader_capacity, |
| 47 | BufWriter::with_capacity(writer_capacity, stream), |
| 48 | ), |
| 49 | } |
| 50 | } |
| 51 | |
| 52 | /// Gets a reference to the underlying I/O object. |
| 53 | /// |
| 54 | /// It is inadvisable to directly read from the underlying I/O object. |
| 55 | pub fn get_ref(&self) -> &RW { |
| 56 | self.inner.get_ref().get_ref() |
| 57 | } |
| 58 | |
| 59 | /// Gets a mutable reference to the underlying I/O object. |
| 60 | /// |
| 61 | /// It is inadvisable to directly read from the underlying I/O object. |
| 62 | pub fn get_mut(&mut self) -> &mut RW { |
| 63 | self.inner.get_mut().get_mut() |
| 64 | } |
| 65 | |
| 66 | /// Gets a pinned mutable reference to the underlying I/O object. |
| 67 | /// |
| 68 | /// It is inadvisable to directly read from the underlying I/O object. |
| 69 | pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut RW> { |
| 70 | self.project().inner.get_pin_mut().get_pin_mut() |
| 71 | } |
| 72 | |
| 73 | /// Consumes this `BufStream`, returning the underlying I/O object. |
| 74 | /// |
| 75 | /// Note that any leftover data in the internal buffer is lost. |
| 76 | pub fn into_inner(self) -> RW { |
| 77 | self.inner.into_inner().into_inner() |
| 78 | } |
| 79 | } |
| 80 | |
| 81 | impl<RW> From<BufReader<BufWriter<RW>>> for BufStream<RW> { |
| 82 | fn from(b: BufReader<BufWriter<RW>>) -> Self { |
| 83 | BufStream { inner: b } |
| 84 | } |
| 85 | } |
| 86 | |
| 87 | impl<RW> From<BufWriter<BufReader<RW>>> for BufStream<RW> { |
| 88 | fn from(b: BufWriter<BufReader<RW>>) -> Self { |
| 89 | // we need to "invert" the reader and writer |
| 90 | let BufWriter { |
| 91 | inner: |
| 92 | BufReader { |
| 93 | inner, |
| 94 | buf: rbuf, |
| 95 | pos, |
| 96 | cap, |
| 97 | seek_state: rseek_state, |
| 98 | }, |
| 99 | buf: wbuf, |
| 100 | written, |
| 101 | seek_state: wseek_state, |
| 102 | } = b; |
| 103 | |
| 104 | BufStream { |
| 105 | inner: BufReader { |
| 106 | inner: BufWriter { |
| 107 | inner, |
| 108 | buf: wbuf, |
| 109 | written, |
| 110 | seek_state: wseek_state, |
| 111 | }, |
| 112 | buf: rbuf, |
| 113 | pos, |
| 114 | cap, |
| 115 | seek_state: rseek_state, |
| 116 | }, |
| 117 | } |
| 118 | } |
| 119 | } |
| 120 | |
| 121 | impl<RW: AsyncRead + AsyncWrite> AsyncWrite for BufStream<RW> { |
| 122 | fn poll_write( |
| 123 | self: Pin<&mut Self>, |
| 124 | cx: &mut Context<'_>, |
| 125 | buf: &[u8], |
| 126 | ) -> Poll<io::Result<usize>> { |
| 127 | self.project().inner.poll_write(cx, buf) |
| 128 | } |
| 129 | |
| 130 | fn poll_write_vectored( |
| 131 | self: Pin<&mut Self>, |
| 132 | cx: &mut Context<'_>, |
| 133 | bufs: &[IoSlice<'_>], |
| 134 | ) -> Poll<io::Result<usize>> { |
| 135 | self.project().inner.poll_write_vectored(cx, bufs) |
| 136 | } |
| 137 | |
| 138 | fn is_write_vectored(&self) -> bool { |
| 139 | self.inner.is_write_vectored() |
| 140 | } |
| 141 | |
| 142 | fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
| 143 | self.project().inner.poll_flush(cx) |
| 144 | } |
| 145 | |
| 146 | fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
| 147 | self.project().inner.poll_shutdown(cx) |
| 148 | } |
| 149 | } |
| 150 | |
| 151 | impl<RW: AsyncRead + AsyncWrite> AsyncRead for BufStream<RW> { |
| 152 | fn poll_read( |
| 153 | self: Pin<&mut Self>, |
| 154 | cx: &mut Context<'_>, |
| 155 | buf: &mut ReadBuf<'_>, |
| 156 | ) -> Poll<io::Result<()>> { |
| 157 | self.project().inner.poll_read(cx, buf) |
| 158 | } |
| 159 | } |
| 160 | |
| 161 | /// Seek to an offset, in bytes, in the underlying stream. |
| 162 | /// |
| 163 | /// The position used for seeking with `SeekFrom::Current(_)` is the |
| 164 | /// position the underlying stream would be at if the `BufStream` had no |
| 165 | /// internal buffer. |
| 166 | /// |
| 167 | /// Seeking always discards the internal buffer, even if the seek position |
| 168 | /// would otherwise fall within it. This guarantees that calling |
| 169 | /// `.into_inner()` immediately after a seek yields the underlying reader |
| 170 | /// at the same position. |
| 171 | /// |
| 172 | /// See [`AsyncSeek`] for more details. |
| 173 | /// |
| 174 | /// Note: In the edge case where you're seeking with `SeekFrom::Current(n)` |
| 175 | /// where `n` minus the internal buffer length overflows an `i64`, two |
| 176 | /// seeks will be performed instead of one. If the second seek returns |
| 177 | /// `Err`, the underlying reader will be left at the same position it would |
| 178 | /// have if you called `seek` with `SeekFrom::Current(0)`. |
| 179 | impl<RW: AsyncRead + AsyncWrite + AsyncSeek> AsyncSeek for BufStream<RW> { |
| 180 | fn start_seek(self: Pin<&mut Self>, position: SeekFrom) -> io::Result<()> { |
| 181 | self.project().inner.start_seek(position) |
| 182 | } |
| 183 | |
| 184 | fn poll_complete(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> { |
| 185 | self.project().inner.poll_complete(cx) |
| 186 | } |
| 187 | } |
| 188 | |
| 189 | impl<RW: AsyncRead + AsyncWrite> AsyncBufRead for BufStream<RW> { |
| 190 | fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> { |
| 191 | self.project().inner.poll_fill_buf(cx) |
| 192 | } |
| 193 | |
| 194 | fn consume(self: Pin<&mut Self>, amt: usize) { |
| 195 | self.project().inner.consume(amt); |
| 196 | } |
| 197 | } |
| 198 | |
| 199 | #[cfg (test)] |
| 200 | mod tests { |
| 201 | use super::*; |
| 202 | |
| 203 | #[test ] |
| 204 | fn assert_unpin() { |
| 205 | crate::is_unpin::<BufStream<()>>(); |
| 206 | } |
| 207 | } |
| 208 | |