1 | use core::slice::memchr; |
2 | |
3 | use crate::io::{self, BufWriter, IoSlice, Write}; |
4 | |
/// Private helper struct for implementing the line-buffered writing logic.
///
/// This shim temporarily wraps a `BufWriter` and uses its internals
/// (specifically internal methods such as `write_to_buf` and `flush_buf`) to
/// implement a line-buffered writer. In this way, a more efficient
/// abstraction can be created than one that only had access to `write` and
/// `flush`, without needlessly duplicating a lot of the implementation
/// details of `BufWriter`. This also allows an existing `BufWriter` to be
/// temporarily given line-buffering logic; this is what enables `Stdout` to
/// be alternately in line-buffered or block-buffered mode.
#[derive(Debug)]
pub struct LineWriterShim<'a, W: ?Sized + Write> {
    /// The borrowed `BufWriter` whose buffer and inner writer this shim drives.
    buffer: &'a mut BufWriter<W>,
}
19 | |
20 | impl<'a, W: ?Sized + Write> LineWriterShim<'a, W> { |
21 | pub fn new(buffer: &'a mut BufWriter<W>) -> Self { |
22 | Self { buffer } |
23 | } |
24 | |
25 | /// Gets a reference to the inner writer (that is, the writer |
26 | /// wrapped by the BufWriter). |
27 | fn inner(&self) -> &W { |
28 | self.buffer.get_ref() |
29 | } |
30 | |
31 | /// Gets a mutable reference to the inner writer (that is, the writer |
32 | /// wrapped by the BufWriter). Be careful with this writer, as writes to |
33 | /// it will bypass the buffer. |
34 | fn inner_mut(&mut self) -> &mut W { |
35 | self.buffer.get_mut() |
36 | } |
37 | |
38 | /// Gets the content currently buffered in self.buffer |
39 | fn buffered(&self) -> &[u8] { |
40 | self.buffer.buffer() |
41 | } |
42 | |
43 | /// Flushes the buffer iff the last byte is a newline (indicating that an |
44 | /// earlier write only succeeded partially, and we want to retry flushing |
45 | /// the buffered line before continuing with a subsequent write). |
46 | fn flush_if_completed_line(&mut self) -> io::Result<()> { |
47 | match self.buffered().last().copied() { |
48 | Some(b' \n' ) => self.buffer.flush_buf(), |
49 | _ => Ok(()), |
50 | } |
51 | } |
52 | } |
53 | |
54 | impl<'a, W: ?Sized + Write> Write for LineWriterShim<'a, W> { |
55 | /// Writes some data into this BufReader with line buffering. |
56 | /// |
57 | /// This means that, if any newlines are present in the data, the data up to |
58 | /// the last newline is sent directly to the underlying writer, and data |
59 | /// after it is buffered. Returns the number of bytes written. |
60 | /// |
61 | /// This function operates on a "best effort basis"; in keeping with the |
62 | /// convention of `Write::write`, it makes at most one attempt to write |
63 | /// new data to the underlying writer. If that write only reports a partial |
64 | /// success, the remaining data will be buffered. |
65 | /// |
66 | /// Because this function attempts to send completed lines to the underlying |
67 | /// writer, it will also flush the existing buffer if it ends with a |
68 | /// newline, even if the incoming data does not contain any newlines. |
69 | fn write(&mut self, buf: &[u8]) -> io::Result<usize> { |
70 | let newline_idx = match memchr::memrchr(b' \n' , buf) { |
71 | // If there are no new newlines (that is, if this write is less than |
72 | // one line), just do a regular buffered write (which may flush if |
73 | // we exceed the inner buffer's size) |
74 | None => { |
75 | self.flush_if_completed_line()?; |
76 | return self.buffer.write(buf); |
77 | } |
78 | // Otherwise, arrange for the lines to be written directly to the |
79 | // inner writer. |
80 | Some(newline_idx) => newline_idx + 1, |
81 | }; |
82 | |
83 | // Flush existing content to prepare for our write. We have to do this |
84 | // before attempting to write `buf` in order to maintain consistency; |
85 | // if we add `buf` to the buffer then try to flush it all at once, |
86 | // we're obligated to return Ok(), which would mean suppressing any |
87 | // errors that occur during flush. |
88 | self.buffer.flush_buf()?; |
89 | |
90 | // This is what we're going to try to write directly to the inner |
91 | // writer. The rest will be buffered, if nothing goes wrong. |
92 | let lines = &buf[..newline_idx]; |
93 | |
94 | // Write `lines` directly to the inner writer. In keeping with the |
95 | // `write` convention, make at most one attempt to add new (unbuffered) |
96 | // data. Because this write doesn't touch the BufWriter state directly, |
97 | // and the buffer is known to be empty, we don't need to worry about |
98 | // self.buffer.panicked here. |
99 | let flushed = self.inner_mut().write(lines)?; |
100 | |
101 | // If buffer returns Ok(0), propagate that to the caller without |
102 | // doing additional buffering; otherwise we're just guaranteeing |
103 | // an "ErrorKind::WriteZero" later. |
104 | if flushed == 0 { |
105 | return Ok(0); |
106 | } |
107 | |
108 | // Now that the write has succeeded, buffer the rest (or as much of |
109 | // the rest as possible). If there were any unwritten newlines, we |
110 | // only buffer out to the last unwritten newline that fits in the |
111 | // buffer; this helps prevent flushing partial lines on subsequent |
112 | // calls to LineWriterShim::write. |
113 | |
114 | // Handle the cases in order of most-common to least-common, under |
115 | // the presumption that most writes succeed in totality, and that most |
116 | // writes are smaller than the buffer. |
117 | // - Is this a partial line (ie, no newlines left in the unwritten tail) |
118 | // - If not, does the data out to the last unwritten newline fit in |
119 | // the buffer? |
120 | // - If not, scan for the last newline that *does* fit in the buffer |
121 | let tail = if flushed >= newline_idx { |
122 | let tail = &buf[flushed..]; |
123 | // Avoid unnecessary short writes by not splitting the remaining |
124 | // bytes if they're larger than the buffer. |
125 | // They can be written in full by the next call to write. |
126 | if tail.len() >= self.buffer.capacity() { |
127 | return Ok(flushed); |
128 | } |
129 | tail |
130 | } else if newline_idx - flushed <= self.buffer.capacity() { |
131 | &buf[flushed..newline_idx] |
132 | } else { |
133 | let scan_area = &buf[flushed..]; |
134 | let scan_area = &scan_area[..self.buffer.capacity()]; |
135 | match memchr::memrchr(b' \n' , scan_area) { |
136 | Some(newline_idx) => &scan_area[..newline_idx + 1], |
137 | None => scan_area, |
138 | } |
139 | }; |
140 | |
141 | let buffered = self.buffer.write_to_buf(tail); |
142 | Ok(flushed + buffered) |
143 | } |
144 | |
145 | fn flush(&mut self) -> io::Result<()> { |
146 | self.buffer.flush() |
147 | } |
148 | |
149 | /// Writes some vectored data into this BufReader with line buffering. |
150 | /// |
151 | /// This means that, if any newlines are present in the data, the data up to |
152 | /// and including the buffer containing the last newline is sent directly to |
153 | /// the inner writer, and the data after it is buffered. Returns the number |
154 | /// of bytes written. |
155 | /// |
156 | /// This function operates on a "best effort basis"; in keeping with the |
157 | /// convention of `Write::write`, it makes at most one attempt to write |
158 | /// new data to the underlying writer. |
159 | /// |
160 | /// Because this function attempts to send completed lines to the underlying |
161 | /// writer, it will also flush the existing buffer if it contains any |
162 | /// newlines. |
163 | /// |
164 | /// Because sorting through an array of `IoSlice` can be a bit convoluted, |
165 | /// This method differs from write in the following ways: |
166 | /// |
167 | /// - It attempts to write the full content of all the buffers up to and |
168 | /// including the one containing the last newline. This means that it |
169 | /// may attempt to write a partial line, that buffer has data past the |
170 | /// newline. |
171 | /// - If the write only reports partial success, it does not attempt to |
172 | /// find the precise location of the written bytes and buffer the rest. |
173 | /// |
174 | /// If the underlying vector doesn't support vectored writing, we instead |
175 | /// simply write the first non-empty buffer with `write`. This way, we |
176 | /// get the benefits of more granular partial-line handling without losing |
177 | /// anything in efficiency |
178 | fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> { |
179 | // If there's no specialized behavior for write_vectored, just use |
180 | // write. This has the benefit of more granular partial-line handling. |
181 | if !self.is_write_vectored() { |
182 | return match bufs.iter().find(|buf| !buf.is_empty()) { |
183 | Some(buf) => self.write(buf), |
184 | None => Ok(0), |
185 | }; |
186 | } |
187 | |
188 | // Find the buffer containing the last newline |
189 | // FIXME: This is overly slow if there are very many bufs and none contain |
190 | // newlines. e.g. writev() on Linux only writes up to 1024 slices, so |
191 | // scanning the rest is wasted effort. This makes write_all_vectored() |
192 | // quadratic. |
193 | let last_newline_buf_idx = bufs |
194 | .iter() |
195 | .enumerate() |
196 | .rev() |
197 | .find_map(|(i, buf)| memchr::memchr(b' \n' , buf).map(|_| i)); |
198 | |
199 | // If there are no new newlines (that is, if this write is less than |
200 | // one line), just do a regular buffered write |
201 | let last_newline_buf_idx = match last_newline_buf_idx { |
202 | // No newlines; just do a normal buffered write |
203 | None => { |
204 | self.flush_if_completed_line()?; |
205 | return self.buffer.write_vectored(bufs); |
206 | } |
207 | Some(i) => i, |
208 | }; |
209 | |
210 | // Flush existing content to prepare for our write |
211 | self.buffer.flush_buf()?; |
212 | |
213 | // This is what we're going to try to write directly to the inner |
214 | // writer. The rest will be buffered, if nothing goes wrong. |
215 | let (lines, tail) = bufs.split_at(last_newline_buf_idx + 1); |
216 | |
217 | // Write `lines` directly to the inner writer. In keeping with the |
218 | // `write` convention, make at most one attempt to add new (unbuffered) |
219 | // data. Because this write doesn't touch the BufWriter state directly, |
220 | // and the buffer is known to be empty, we don't need to worry about |
221 | // self.panicked here. |
222 | let flushed = self.inner_mut().write_vectored(lines)?; |
223 | |
224 | // If inner returns Ok(0), propagate that to the caller without |
225 | // doing additional buffering; otherwise we're just guaranteeing |
226 | // an "ErrorKind::WriteZero" later. |
227 | if flushed == 0 { |
228 | return Ok(0); |
229 | } |
230 | |
231 | // Don't try to reconstruct the exact amount written; just bail |
232 | // in the event of a partial write |
233 | let mut lines_len: usize = 0; |
234 | for buf in lines { |
235 | // With overlapping/duplicate slices the total length may in theory |
236 | // exceed usize::MAX |
237 | lines_len = lines_len.saturating_add(buf.len()); |
238 | if flushed < lines_len { |
239 | return Ok(flushed); |
240 | } |
241 | } |
242 | |
243 | // Now that the write has succeeded, buffer the rest (or as much of the |
244 | // rest as possible) |
245 | let buffered: usize = tail |
246 | .iter() |
247 | .filter(|buf| !buf.is_empty()) |
248 | .map(|buf| self.buffer.write_to_buf(buf)) |
249 | .take_while(|&n| n > 0) |
250 | .sum(); |
251 | |
252 | Ok(flushed + buffered) |
253 | } |
254 | |
255 | fn is_write_vectored(&self) -> bool { |
256 | self.inner().is_write_vectored() |
257 | } |
258 | |
259 | /// Writes some data into this BufReader with line buffering. |
260 | /// |
261 | /// This means that, if any newlines are present in the data, the data up to |
262 | /// the last newline is sent directly to the underlying writer, and data |
263 | /// after it is buffered. |
264 | /// |
265 | /// Because this function attempts to send completed lines to the underlying |
266 | /// writer, it will also flush the existing buffer if it contains any |
267 | /// newlines, even if the incoming data does not contain any newlines. |
268 | fn write_all(&mut self, buf: &[u8]) -> io::Result<()> { |
269 | match memchr::memrchr(b' \n' , buf) { |
270 | // If there are no new newlines (that is, if this write is less than |
271 | // one line), just do a regular buffered write (which may flush if |
272 | // we exceed the inner buffer's size) |
273 | None => { |
274 | self.flush_if_completed_line()?; |
275 | self.buffer.write_all(buf) |
276 | } |
277 | Some(newline_idx) => { |
278 | let (lines, tail) = buf.split_at(newline_idx + 1); |
279 | |
280 | if self.buffered().is_empty() { |
281 | self.inner_mut().write_all(lines)?; |
282 | } else { |
283 | // If there is any buffered data, we add the incoming lines |
284 | // to that buffer before flushing, which saves us at least |
285 | // one write call. We can't really do this with `write`, |
286 | // since we can't do this *and* not suppress errors *and* |
287 | // report a consistent state to the caller in a return |
288 | // value, but here in write_all it's fine. |
289 | self.buffer.write_all(lines)?; |
290 | self.buffer.flush_buf()?; |
291 | } |
292 | |
293 | self.buffer.write_all(tail) |
294 | } |
295 | } |
296 | } |
297 | } |
298 | |