| 1 | use core::slice::memchr; |
| 2 | |
| 3 | use crate::io::{self, BufWriter, IoSlice, Write}; |
| 4 | |
/// Internal adapter that gives a borrowed [`BufWriter`] line-buffered behavior.
///
/// Instead of re-implementing buffering, the shim drives the wrapped
/// `BufWriter` through its internal helpers (such as `write_to_buf` and
/// `flush_buf`). That lets it be more efficient than an adapter limited to the
/// public `write`/`flush` API, without duplicating `BufWriter`'s internals.
///
/// Because the `BufWriter` is only borrowed, any existing `BufWriter` can be
/// given line-buffering logic for a limited time. This is what enables Stdout
/// to be alternately in line-buffered or block-buffered mode.
#[derive(Debug)]
pub struct LineWriterShim<'a, W>
where
    W: ?Sized + Write,
{
    /// The borrowed writer whose buffer and inner writer the shim manages.
    buffer: &'a mut BufWriter<W>,
}
| 19 | |
| 20 | impl<'a, W: ?Sized + Write> LineWriterShim<'a, W> { |
| 21 | pub fn new(buffer: &'a mut BufWriter<W>) -> Self { |
| 22 | Self { buffer } |
| 23 | } |
| 24 | |
| 25 | /// Gets a reference to the inner writer (that is, the writer |
| 26 | /// wrapped by the BufWriter). |
| 27 | fn inner(&self) -> &W { |
| 28 | self.buffer.get_ref() |
| 29 | } |
| 30 | |
| 31 | /// Gets a mutable reference to the inner writer (that is, the writer |
| 32 | /// wrapped by the BufWriter). Be careful with this writer, as writes to |
| 33 | /// it will bypass the buffer. |
| 34 | fn inner_mut(&mut self) -> &mut W { |
| 35 | self.buffer.get_mut() |
| 36 | } |
| 37 | |
| 38 | /// Gets the content currently buffered in self.buffer |
| 39 | fn buffered(&self) -> &[u8] { |
| 40 | self.buffer.buffer() |
| 41 | } |
| 42 | |
| 43 | /// Flushes the buffer iff the last byte is a newline (indicating that an |
| 44 | /// earlier write only succeeded partially, and we want to retry flushing |
| 45 | /// the buffered line before continuing with a subsequent write). |
| 46 | fn flush_if_completed_line(&mut self) -> io::Result<()> { |
| 47 | match self.buffered().last().copied() { |
| 48 | Some(b' \n' ) => self.buffer.flush_buf(), |
| 49 | _ => Ok(()), |
| 50 | } |
| 51 | } |
| 52 | } |
| 53 | |
| 54 | impl<'a, W: ?Sized + Write> Write for LineWriterShim<'a, W> { |
| 55 | /// Writes some data into this BufReader with line buffering. |
| 56 | /// |
| 57 | /// This means that, if any newlines are present in the data, the data up to |
| 58 | /// the last newline is sent directly to the underlying writer, and data |
| 59 | /// after it is buffered. Returns the number of bytes written. |
| 60 | /// |
| 61 | /// This function operates on a "best effort basis"; in keeping with the |
| 62 | /// convention of `Write::write`, it makes at most one attempt to write |
| 63 | /// new data to the underlying writer. If that write only reports a partial |
| 64 | /// success, the remaining data will be buffered. |
| 65 | /// |
| 66 | /// Because this function attempts to send completed lines to the underlying |
| 67 | /// writer, it will also flush the existing buffer if it ends with a |
| 68 | /// newline, even if the incoming data does not contain any newlines. |
| 69 | fn write(&mut self, buf: &[u8]) -> io::Result<usize> { |
| 70 | let newline_idx = match memchr::memrchr(b' \n' , buf) { |
| 71 | // If there are no new newlines (that is, if this write is less than |
| 72 | // one line), just do a regular buffered write (which may flush if |
| 73 | // we exceed the inner buffer's size) |
| 74 | None => { |
| 75 | self.flush_if_completed_line()?; |
| 76 | return self.buffer.write(buf); |
| 77 | } |
| 78 | // Otherwise, arrange for the lines to be written directly to the |
| 79 | // inner writer. |
| 80 | Some(newline_idx) => newline_idx + 1, |
| 81 | }; |
| 82 | |
| 83 | // Flush existing content to prepare for our write. We have to do this |
| 84 | // before attempting to write `buf` in order to maintain consistency; |
| 85 | // if we add `buf` to the buffer then try to flush it all at once, |
| 86 | // we're obligated to return Ok(), which would mean suppressing any |
| 87 | // errors that occur during flush. |
| 88 | self.buffer.flush_buf()?; |
| 89 | |
| 90 | // This is what we're going to try to write directly to the inner |
| 91 | // writer. The rest will be buffered, if nothing goes wrong. |
| 92 | let lines = &buf[..newline_idx]; |
| 93 | |
| 94 | // Write `lines` directly to the inner writer. In keeping with the |
| 95 | // `write` convention, make at most one attempt to add new (unbuffered) |
| 96 | // data. Because this write doesn't touch the BufWriter state directly, |
| 97 | // and the buffer is known to be empty, we don't need to worry about |
| 98 | // self.buffer.panicked here. |
| 99 | let flushed = self.inner_mut().write(lines)?; |
| 100 | |
| 101 | // If buffer returns Ok(0), propagate that to the caller without |
| 102 | // doing additional buffering; otherwise we're just guaranteeing |
| 103 | // an "ErrorKind::WriteZero" later. |
| 104 | if flushed == 0 { |
| 105 | return Ok(0); |
| 106 | } |
| 107 | |
| 108 | // Now that the write has succeeded, buffer the rest (or as much of |
| 109 | // the rest as possible). If there were any unwritten newlines, we |
| 110 | // only buffer out to the last unwritten newline that fits in the |
| 111 | // buffer; this helps prevent flushing partial lines on subsequent |
| 112 | // calls to LineWriterShim::write. |
| 113 | |
| 114 | // Handle the cases in order of most-common to least-common, under |
| 115 | // the presumption that most writes succeed in totality, and that most |
| 116 | // writes are smaller than the buffer. |
| 117 | // - Is this a partial line (ie, no newlines left in the unwritten tail) |
| 118 | // - If not, does the data out to the last unwritten newline fit in |
| 119 | // the buffer? |
| 120 | // - If not, scan for the last newline that *does* fit in the buffer |
| 121 | let tail = if flushed >= newline_idx { |
| 122 | let tail = &buf[flushed..]; |
| 123 | // Avoid unnecessary short writes by not splitting the remaining |
| 124 | // bytes if they're larger than the buffer. |
| 125 | // They can be written in full by the next call to write. |
| 126 | if tail.len() >= self.buffer.capacity() { |
| 127 | return Ok(flushed); |
| 128 | } |
| 129 | tail |
| 130 | } else if newline_idx - flushed <= self.buffer.capacity() { |
| 131 | &buf[flushed..newline_idx] |
| 132 | } else { |
| 133 | let scan_area = &buf[flushed..]; |
| 134 | let scan_area = &scan_area[..self.buffer.capacity()]; |
| 135 | match memchr::memrchr(b' \n' , scan_area) { |
| 136 | Some(newline_idx) => &scan_area[..newline_idx + 1], |
| 137 | None => scan_area, |
| 138 | } |
| 139 | }; |
| 140 | |
| 141 | let buffered = self.buffer.write_to_buf(tail); |
| 142 | Ok(flushed + buffered) |
| 143 | } |
| 144 | |
| 145 | fn flush(&mut self) -> io::Result<()> { |
| 146 | self.buffer.flush() |
| 147 | } |
| 148 | |
| 149 | /// Writes some vectored data into this BufReader with line buffering. |
| 150 | /// |
| 151 | /// This means that, if any newlines are present in the data, the data up to |
| 152 | /// and including the buffer containing the last newline is sent directly to |
| 153 | /// the inner writer, and the data after it is buffered. Returns the number |
| 154 | /// of bytes written. |
| 155 | /// |
| 156 | /// This function operates on a "best effort basis"; in keeping with the |
| 157 | /// convention of `Write::write`, it makes at most one attempt to write |
| 158 | /// new data to the underlying writer. |
| 159 | /// |
| 160 | /// Because this function attempts to send completed lines to the underlying |
| 161 | /// writer, it will also flush the existing buffer if it contains any |
| 162 | /// newlines. |
| 163 | /// |
| 164 | /// Because sorting through an array of `IoSlice` can be a bit convoluted, |
| 165 | /// This method differs from write in the following ways: |
| 166 | /// |
| 167 | /// - It attempts to write the full content of all the buffers up to and |
| 168 | /// including the one containing the last newline. This means that it |
| 169 | /// may attempt to write a partial line, that buffer has data past the |
| 170 | /// newline. |
| 171 | /// - If the write only reports partial success, it does not attempt to |
| 172 | /// find the precise location of the written bytes and buffer the rest. |
| 173 | /// |
| 174 | /// If the underlying vector doesn't support vectored writing, we instead |
| 175 | /// simply write the first non-empty buffer with `write`. This way, we |
| 176 | /// get the benefits of more granular partial-line handling without losing |
| 177 | /// anything in efficiency |
| 178 | fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> { |
| 179 | // If there's no specialized behavior for write_vectored, just use |
| 180 | // write. This has the benefit of more granular partial-line handling. |
| 181 | if !self.is_write_vectored() { |
| 182 | return match bufs.iter().find(|buf| !buf.is_empty()) { |
| 183 | Some(buf) => self.write(buf), |
| 184 | None => Ok(0), |
| 185 | }; |
| 186 | } |
| 187 | |
| 188 | // Find the buffer containing the last newline |
| 189 | // FIXME: This is overly slow if there are very many bufs and none contain |
| 190 | // newlines. e.g. writev() on Linux only writes up to 1024 slices, so |
| 191 | // scanning the rest is wasted effort. This makes write_all_vectored() |
| 192 | // quadratic. |
| 193 | let last_newline_buf_idx = bufs |
| 194 | .iter() |
| 195 | .enumerate() |
| 196 | .rev() |
| 197 | .find_map(|(i, buf)| memchr::memchr(b' \n' , buf).map(|_| i)); |
| 198 | |
| 199 | // If there are no new newlines (that is, if this write is less than |
| 200 | // one line), just do a regular buffered write |
| 201 | let last_newline_buf_idx = match last_newline_buf_idx { |
| 202 | // No newlines; just do a normal buffered write |
| 203 | None => { |
| 204 | self.flush_if_completed_line()?; |
| 205 | return self.buffer.write_vectored(bufs); |
| 206 | } |
| 207 | Some(i) => i, |
| 208 | }; |
| 209 | |
| 210 | // Flush existing content to prepare for our write |
| 211 | self.buffer.flush_buf()?; |
| 212 | |
| 213 | // This is what we're going to try to write directly to the inner |
| 214 | // writer. The rest will be buffered, if nothing goes wrong. |
| 215 | let (lines, tail) = bufs.split_at(last_newline_buf_idx + 1); |
| 216 | |
| 217 | // Write `lines` directly to the inner writer. In keeping with the |
| 218 | // `write` convention, make at most one attempt to add new (unbuffered) |
| 219 | // data. Because this write doesn't touch the BufWriter state directly, |
| 220 | // and the buffer is known to be empty, we don't need to worry about |
| 221 | // self.panicked here. |
| 222 | let flushed = self.inner_mut().write_vectored(lines)?; |
| 223 | |
| 224 | // If inner returns Ok(0), propagate that to the caller without |
| 225 | // doing additional buffering; otherwise we're just guaranteeing |
| 226 | // an "ErrorKind::WriteZero" later. |
| 227 | if flushed == 0 { |
| 228 | return Ok(0); |
| 229 | } |
| 230 | |
| 231 | // Don't try to reconstruct the exact amount written; just bail |
| 232 | // in the event of a partial write |
| 233 | let mut lines_len: usize = 0; |
| 234 | for buf in lines { |
| 235 | // With overlapping/duplicate slices the total length may in theory |
| 236 | // exceed usize::MAX |
| 237 | lines_len = lines_len.saturating_add(buf.len()); |
| 238 | if flushed < lines_len { |
| 239 | return Ok(flushed); |
| 240 | } |
| 241 | } |
| 242 | |
| 243 | // Now that the write has succeeded, buffer the rest (or as much of the |
| 244 | // rest as possible) |
| 245 | let buffered: usize = tail |
| 246 | .iter() |
| 247 | .filter(|buf| !buf.is_empty()) |
| 248 | .map(|buf| self.buffer.write_to_buf(buf)) |
| 249 | .take_while(|&n| n > 0) |
| 250 | .sum(); |
| 251 | |
| 252 | Ok(flushed + buffered) |
| 253 | } |
| 254 | |
| 255 | fn is_write_vectored(&self) -> bool { |
| 256 | self.inner().is_write_vectored() |
| 257 | } |
| 258 | |
| 259 | /// Writes some data into this BufReader with line buffering. |
| 260 | /// |
| 261 | /// This means that, if any newlines are present in the data, the data up to |
| 262 | /// the last newline is sent directly to the underlying writer, and data |
| 263 | /// after it is buffered. |
| 264 | /// |
| 265 | /// Because this function attempts to send completed lines to the underlying |
| 266 | /// writer, it will also flush the existing buffer if it contains any |
| 267 | /// newlines, even if the incoming data does not contain any newlines. |
| 268 | fn write_all(&mut self, buf: &[u8]) -> io::Result<()> { |
| 269 | match memchr::memrchr(b' \n' , buf) { |
| 270 | // If there are no new newlines (that is, if this write is less than |
| 271 | // one line), just do a regular buffered write (which may flush if |
| 272 | // we exceed the inner buffer's size) |
| 273 | None => { |
| 274 | self.flush_if_completed_line()?; |
| 275 | self.buffer.write_all(buf) |
| 276 | } |
| 277 | Some(newline_idx) => { |
| 278 | let (lines, tail) = buf.split_at(newline_idx + 1); |
| 279 | |
| 280 | if self.buffered().is_empty() { |
| 281 | self.inner_mut().write_all(lines)?; |
| 282 | } else { |
| 283 | // If there is any buffered data, we add the incoming lines |
| 284 | // to that buffer before flushing, which saves us at least |
| 285 | // one write call. We can't really do this with `write`, |
| 286 | // since we can't do this *and* not suppress errors *and* |
| 287 | // report a consistent state to the caller in a return |
| 288 | // value, but here in write_all it's fine. |
| 289 | self.buffer.write_all(lines)?; |
| 290 | self.buffer.flush_buf()?; |
| 291 | } |
| 292 | |
| 293 | self.buffer.write_all(tail) |
| 294 | } |
| 295 | } |
| 296 | } |
| 297 | } |
| 298 | |