1use crate::io::{self, BufWriter, IoSlice, Write};
2use core::slice::memchr;
3
4/// Private helper struct for implementing the line-buffered writing logic.
5/// This shim temporarily wraps a BufWriter, and uses its internals to
6/// implement a line-buffered writer (specifically by using the internal
7/// methods like write_to_buf and flush_buf). In this way, a more
8/// efficient abstraction can be created than one that only had access to
9/// `write` and `flush`, without needlessly duplicating a lot of the
10/// implementation details of BufWriter. This also allows existing
11/// `BufWriters` to be temporarily given line-buffering logic; this is what
12/// enables Stdout to be alternately in line-buffered or block-buffered mode.
13#[derive(Debug)]
14pub struct LineWriterShim<'a, W: ?Sized + Write> {
15 buffer: &'a mut BufWriter<W>,
16}
17
18impl<'a, W: ?Sized + Write> LineWriterShim<'a, W> {
19 pub fn new(buffer: &'a mut BufWriter<W>) -> Self {
20 Self { buffer }
21 }
22
23 /// Get a reference to the inner writer (that is, the writer
24 /// wrapped by the BufWriter).
25 fn inner(&self) -> &W {
26 self.buffer.get_ref()
27 }
28
29 /// Get a mutable reference to the inner writer (that is, the writer
30 /// wrapped by the BufWriter). Be careful with this writer, as writes to
31 /// it will bypass the buffer.
32 fn inner_mut(&mut self) -> &mut W {
33 self.buffer.get_mut()
34 }
35
36 /// Get the content currently buffered in self.buffer
37 fn buffered(&self) -> &[u8] {
38 self.buffer.buffer()
39 }
40
41 /// Flush the buffer iff the last byte is a newline (indicating that an
42 /// earlier write only succeeded partially, and we want to retry flushing
43 /// the buffered line before continuing with a subsequent write)
44 fn flush_if_completed_line(&mut self) -> io::Result<()> {
45 match self.buffered().last().copied() {
46 Some(b'\n') => self.buffer.flush_buf(),
47 _ => Ok(()),
48 }
49 }
50}
51
52impl<'a, W: ?Sized + Write> Write for LineWriterShim<'a, W> {
53 /// Write some data into this BufReader with line buffering. This means
54 /// that, if any newlines are present in the data, the data up to the last
55 /// newline is sent directly to the underlying writer, and data after it
56 /// is buffered. Returns the number of bytes written.
57 ///
58 /// This function operates on a "best effort basis"; in keeping with the
59 /// convention of `Write::write`, it makes at most one attempt to write
60 /// new data to the underlying writer. If that write only reports a partial
61 /// success, the remaining data will be buffered.
62 ///
63 /// Because this function attempts to send completed lines to the underlying
64 /// writer, it will also flush the existing buffer if it ends with a
65 /// newline, even if the incoming data does not contain any newlines.
66 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
67 let newline_idx = match memchr::memrchr(b'\n', buf) {
68 // If there are no new newlines (that is, if this write is less than
69 // one line), just do a regular buffered write (which may flush if
70 // we exceed the inner buffer's size)
71 None => {
72 self.flush_if_completed_line()?;
73 return self.buffer.write(buf);
74 }
75 // Otherwise, arrange for the lines to be written directly to the
76 // inner writer.
77 Some(newline_idx) => newline_idx + 1,
78 };
79
80 // Flush existing content to prepare for our write. We have to do this
81 // before attempting to write `buf` in order to maintain consistency;
82 // if we add `buf` to the buffer then try to flush it all at once,
83 // we're obligated to return Ok(), which would mean suppressing any
84 // errors that occur during flush.
85 self.buffer.flush_buf()?;
86
87 // This is what we're going to try to write directly to the inner
88 // writer. The rest will be buffered, if nothing goes wrong.
89 let lines = &buf[..newline_idx];
90
91 // Write `lines` directly to the inner writer. In keeping with the
92 // `write` convention, make at most one attempt to add new (unbuffered)
93 // data. Because this write doesn't touch the BufWriter state directly,
94 // and the buffer is known to be empty, we don't need to worry about
95 // self.buffer.panicked here.
96 let flushed = self.inner_mut().write(lines)?;
97
98 // If buffer returns Ok(0), propagate that to the caller without
99 // doing additional buffering; otherwise we're just guaranteeing
100 // an "ErrorKind::WriteZero" later.
101 if flushed == 0 {
102 return Ok(0);
103 }
104
105 // Now that the write has succeeded, buffer the rest (or as much of
106 // the rest as possible). If there were any unwritten newlines, we
107 // only buffer out to the last unwritten newline that fits in the
108 // buffer; this helps prevent flushing partial lines on subsequent
109 // calls to LineWriterShim::write.
110
111 // Handle the cases in order of most-common to least-common, under
112 // the presumption that most writes succeed in totality, and that most
113 // writes are smaller than the buffer.
114 // - Is this a partial line (ie, no newlines left in the unwritten tail)
115 // - If not, does the data out to the last unwritten newline fit in
116 // the buffer?
117 // - If not, scan for the last newline that *does* fit in the buffer
118 let tail = if flushed >= newline_idx {
119 &buf[flushed..]
120 } else if newline_idx - flushed <= self.buffer.capacity() {
121 &buf[flushed..newline_idx]
122 } else {
123 let scan_area = &buf[flushed..];
124 let scan_area = &scan_area[..self.buffer.capacity()];
125 match memchr::memrchr(b'\n', scan_area) {
126 Some(newline_idx) => &scan_area[..newline_idx + 1],
127 None => scan_area,
128 }
129 };
130
131 let buffered = self.buffer.write_to_buf(tail);
132 Ok(flushed + buffered)
133 }
134
135 fn flush(&mut self) -> io::Result<()> {
136 self.buffer.flush()
137 }
138
139 /// Write some vectored data into this BufReader with line buffering. This
140 /// means that, if any newlines are present in the data, the data up to
141 /// and including the buffer containing the last newline is sent directly
142 /// to the inner writer, and the data after it is buffered. Returns the
143 /// number of bytes written.
144 ///
145 /// This function operates on a "best effort basis"; in keeping with the
146 /// convention of `Write::write`, it makes at most one attempt to write
147 /// new data to the underlying writer.
148 ///
149 /// Because this function attempts to send completed lines to the underlying
150 /// writer, it will also flush the existing buffer if it contains any
151 /// newlines.
152 ///
153 /// Because sorting through an array of `IoSlice` can be a bit convoluted,
154 /// This method differs from write in the following ways:
155 ///
156 /// - It attempts to write the full content of all the buffers up to and
157 /// including the one containing the last newline. This means that it
158 /// may attempt to write a partial line, that buffer has data past the
159 /// newline.
160 /// - If the write only reports partial success, it does not attempt to
161 /// find the precise location of the written bytes and buffer the rest.
162 ///
163 /// If the underlying vector doesn't support vectored writing, we instead
164 /// simply write the first non-empty buffer with `write`. This way, we
165 /// get the benefits of more granular partial-line handling without losing
166 /// anything in efficiency
167 fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
168 // If there's no specialized behavior for write_vectored, just use
169 // write. This has the benefit of more granular partial-line handling.
170 if !self.is_write_vectored() {
171 return match bufs.iter().find(|buf| !buf.is_empty()) {
172 Some(buf) => self.write(buf),
173 None => Ok(0),
174 };
175 }
176
177 // Find the buffer containing the last newline
178 // FIXME: This is overly slow if there are very many bufs and none contain
179 // newlines. e.g. writev() on Linux only writes up to 1024 slices, so
180 // scanning the rest is wasted effort. This makes write_all_vectored()
181 // quadratic.
182 let last_newline_buf_idx = bufs
183 .iter()
184 .enumerate()
185 .rev()
186 .find_map(|(i, buf)| memchr::memchr(b'\n', buf).map(|_| i));
187
188 // If there are no new newlines (that is, if this write is less than
189 // one line), just do a regular buffered write
190 let last_newline_buf_idx = match last_newline_buf_idx {
191 // No newlines; just do a normal buffered write
192 None => {
193 self.flush_if_completed_line()?;
194 return self.buffer.write_vectored(bufs);
195 }
196 Some(i) => i,
197 };
198
199 // Flush existing content to prepare for our write
200 self.buffer.flush_buf()?;
201
202 // This is what we're going to try to write directly to the inner
203 // writer. The rest will be buffered, if nothing goes wrong.
204 let (lines, tail) = bufs.split_at(last_newline_buf_idx + 1);
205
206 // Write `lines` directly to the inner writer. In keeping with the
207 // `write` convention, make at most one attempt to add new (unbuffered)
208 // data. Because this write doesn't touch the BufWriter state directly,
209 // and the buffer is known to be empty, we don't need to worry about
210 // self.panicked here.
211 let flushed = self.inner_mut().write_vectored(lines)?;
212
213 // If inner returns Ok(0), propagate that to the caller without
214 // doing additional buffering; otherwise we're just guaranteeing
215 // an "ErrorKind::WriteZero" later.
216 if flushed == 0 {
217 return Ok(0);
218 }
219
220 // Don't try to reconstruct the exact amount written; just bail
221 // in the event of a partial write
222 let mut lines_len: usize = 0;
223 for buf in lines {
224 // With overlapping/duplicate slices the total length may in theory
225 // exceed usize::MAX
226 lines_len = lines_len.saturating_add(buf.len());
227 if flushed < lines_len {
228 return Ok(flushed);
229 }
230 }
231
232 // Now that the write has succeeded, buffer the rest (or as much of the
233 // rest as possible)
234 let buffered: usize = tail
235 .iter()
236 .filter(|buf| !buf.is_empty())
237 .map(|buf| self.buffer.write_to_buf(buf))
238 .take_while(|&n| n > 0)
239 .sum();
240
241 Ok(flushed + buffered)
242 }
243
244 fn is_write_vectored(&self) -> bool {
245 self.inner().is_write_vectored()
246 }
247
248 /// Write some data into this BufReader with line buffering. This means
249 /// that, if any newlines are present in the data, the data up to the last
250 /// newline is sent directly to the underlying writer, and data after it
251 /// is buffered.
252 ///
253 /// Because this function attempts to send completed lines to the underlying
254 /// writer, it will also flush the existing buffer if it contains any
255 /// newlines, even if the incoming data does not contain any newlines.
256 fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
257 match memchr::memrchr(b'\n', buf) {
258 // If there are no new newlines (that is, if this write is less than
259 // one line), just do a regular buffered write (which may flush if
260 // we exceed the inner buffer's size)
261 None => {
262 self.flush_if_completed_line()?;
263 self.buffer.write_all(buf)
264 }
265 Some(newline_idx) => {
266 let (lines, tail) = buf.split_at(newline_idx + 1);
267
268 if self.buffered().is_empty() {
269 self.inner_mut().write_all(lines)?;
270 } else {
271 // If there is any buffered data, we add the incoming lines
272 // to that buffer before flushing, which saves us at least
273 // one write call. We can't really do this with `write`,
274 // since we can't do this *and* not suppress errors *and*
275 // report a consistent state to the caller in a return
276 // value, but here in write_all it's fine.
277 self.buffer.write_all(lines)?;
278 self.buffer.flush_buf()?;
279 }
280
281 self.buffer.write_all(tail)
282 }
283 }
284 }
285}
286