1 | use crate::io::{self, BufWriter, IoSlice, Write}; |
2 | use core::slice::memchr; |
3 | |
/// Internal adapter that layers line-buffered writing on top of a
/// [`BufWriter`].
///
/// The shim holds a mutable borrow of a `BufWriter` for the lifetime `'a`
/// and drives it through its internal methods (such as `write_to_buf` and
/// `flush_buf`) instead of only through `write` and `flush`. That makes a
/// more efficient line-buffered writer possible without duplicating the
/// implementation details of `BufWriter`.
///
/// Because the `BufWriter` is only borrowed, an existing `BufWriter` can be
/// given line-buffering behavior temporarily; this is what enables Stdout to
/// be alternately in line-buffered or block-buffered mode.
#[derive(Debug)]
pub struct LineWriterShim<'a, W>
where
    W: ?Sized + Write,
{
    /// The borrowed buffered writer that all writes are routed through.
    buffer: &'a mut BufWriter<W>,
}
17 | |
18 | impl<'a, W: ?Sized + Write> LineWriterShim<'a, W> { |
19 | pub fn new(buffer: &'a mut BufWriter<W>) -> Self { |
20 | Self { buffer } |
21 | } |
22 | |
23 | /// Get a reference to the inner writer (that is, the writer |
24 | /// wrapped by the BufWriter). |
25 | fn inner(&self) -> &W { |
26 | self.buffer.get_ref() |
27 | } |
28 | |
29 | /// Get a mutable reference to the inner writer (that is, the writer |
30 | /// wrapped by the BufWriter). Be careful with this writer, as writes to |
31 | /// it will bypass the buffer. |
32 | fn inner_mut(&mut self) -> &mut W { |
33 | self.buffer.get_mut() |
34 | } |
35 | |
36 | /// Get the content currently buffered in self.buffer |
37 | fn buffered(&self) -> &[u8] { |
38 | self.buffer.buffer() |
39 | } |
40 | |
41 | /// Flush the buffer iff the last byte is a newline (indicating that an |
42 | /// earlier write only succeeded partially, and we want to retry flushing |
43 | /// the buffered line before continuing with a subsequent write) |
44 | fn flush_if_completed_line(&mut self) -> io::Result<()> { |
45 | match self.buffered().last().copied() { |
46 | Some(b' \n' ) => self.buffer.flush_buf(), |
47 | _ => Ok(()), |
48 | } |
49 | } |
50 | } |
51 | |
52 | impl<'a, W: ?Sized + Write> Write for LineWriterShim<'a, W> { |
53 | /// Write some data into this BufReader with line buffering. This means |
54 | /// that, if any newlines are present in the data, the data up to the last |
55 | /// newline is sent directly to the underlying writer, and data after it |
56 | /// is buffered. Returns the number of bytes written. |
57 | /// |
58 | /// This function operates on a "best effort basis"; in keeping with the |
59 | /// convention of `Write::write`, it makes at most one attempt to write |
60 | /// new data to the underlying writer. If that write only reports a partial |
61 | /// success, the remaining data will be buffered. |
62 | /// |
63 | /// Because this function attempts to send completed lines to the underlying |
64 | /// writer, it will also flush the existing buffer if it ends with a |
65 | /// newline, even if the incoming data does not contain any newlines. |
66 | fn write(&mut self, buf: &[u8]) -> io::Result<usize> { |
67 | let newline_idx = match memchr::memrchr(b' \n' , buf) { |
68 | // If there are no new newlines (that is, if this write is less than |
69 | // one line), just do a regular buffered write (which may flush if |
70 | // we exceed the inner buffer's size) |
71 | None => { |
72 | self.flush_if_completed_line()?; |
73 | return self.buffer.write(buf); |
74 | } |
75 | // Otherwise, arrange for the lines to be written directly to the |
76 | // inner writer. |
77 | Some(newline_idx) => newline_idx + 1, |
78 | }; |
79 | |
80 | // Flush existing content to prepare for our write. We have to do this |
81 | // before attempting to write `buf` in order to maintain consistency; |
82 | // if we add `buf` to the buffer then try to flush it all at once, |
83 | // we're obligated to return Ok(), which would mean suppressing any |
84 | // errors that occur during flush. |
85 | self.buffer.flush_buf()?; |
86 | |
87 | // This is what we're going to try to write directly to the inner |
88 | // writer. The rest will be buffered, if nothing goes wrong. |
89 | let lines = &buf[..newline_idx]; |
90 | |
91 | // Write `lines` directly to the inner writer. In keeping with the |
92 | // `write` convention, make at most one attempt to add new (unbuffered) |
93 | // data. Because this write doesn't touch the BufWriter state directly, |
94 | // and the buffer is known to be empty, we don't need to worry about |
95 | // self.buffer.panicked here. |
96 | let flushed = self.inner_mut().write(lines)?; |
97 | |
98 | // If buffer returns Ok(0), propagate that to the caller without |
99 | // doing additional buffering; otherwise we're just guaranteeing |
100 | // an "ErrorKind::WriteZero" later. |
101 | if flushed == 0 { |
102 | return Ok(0); |
103 | } |
104 | |
105 | // Now that the write has succeeded, buffer the rest (or as much of |
106 | // the rest as possible). If there were any unwritten newlines, we |
107 | // only buffer out to the last unwritten newline that fits in the |
108 | // buffer; this helps prevent flushing partial lines on subsequent |
109 | // calls to LineWriterShim::write. |
110 | |
111 | // Handle the cases in order of most-common to least-common, under |
112 | // the presumption that most writes succeed in totality, and that most |
113 | // writes are smaller than the buffer. |
114 | // - Is this a partial line (ie, no newlines left in the unwritten tail) |
115 | // - If not, does the data out to the last unwritten newline fit in |
116 | // the buffer? |
117 | // - If not, scan for the last newline that *does* fit in the buffer |
118 | let tail = if flushed >= newline_idx { |
119 | &buf[flushed..] |
120 | } else if newline_idx - flushed <= self.buffer.capacity() { |
121 | &buf[flushed..newline_idx] |
122 | } else { |
123 | let scan_area = &buf[flushed..]; |
124 | let scan_area = &scan_area[..self.buffer.capacity()]; |
125 | match memchr::memrchr(b' \n' , scan_area) { |
126 | Some(newline_idx) => &scan_area[..newline_idx + 1], |
127 | None => scan_area, |
128 | } |
129 | }; |
130 | |
131 | let buffered = self.buffer.write_to_buf(tail); |
132 | Ok(flushed + buffered) |
133 | } |
134 | |
135 | fn flush(&mut self) -> io::Result<()> { |
136 | self.buffer.flush() |
137 | } |
138 | |
139 | /// Write some vectored data into this BufReader with line buffering. This |
140 | /// means that, if any newlines are present in the data, the data up to |
141 | /// and including the buffer containing the last newline is sent directly |
142 | /// to the inner writer, and the data after it is buffered. Returns the |
143 | /// number of bytes written. |
144 | /// |
145 | /// This function operates on a "best effort basis"; in keeping with the |
146 | /// convention of `Write::write`, it makes at most one attempt to write |
147 | /// new data to the underlying writer. |
148 | /// |
149 | /// Because this function attempts to send completed lines to the underlying |
150 | /// writer, it will also flush the existing buffer if it contains any |
151 | /// newlines. |
152 | /// |
153 | /// Because sorting through an array of `IoSlice` can be a bit convoluted, |
154 | /// This method differs from write in the following ways: |
155 | /// |
156 | /// - It attempts to write the full content of all the buffers up to and |
157 | /// including the one containing the last newline. This means that it |
158 | /// may attempt to write a partial line, that buffer has data past the |
159 | /// newline. |
160 | /// - If the write only reports partial success, it does not attempt to |
161 | /// find the precise location of the written bytes and buffer the rest. |
162 | /// |
163 | /// If the underlying vector doesn't support vectored writing, we instead |
164 | /// simply write the first non-empty buffer with `write`. This way, we |
165 | /// get the benefits of more granular partial-line handling without losing |
166 | /// anything in efficiency |
167 | fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> { |
168 | // If there's no specialized behavior for write_vectored, just use |
169 | // write. This has the benefit of more granular partial-line handling. |
170 | if !self.is_write_vectored() { |
171 | return match bufs.iter().find(|buf| !buf.is_empty()) { |
172 | Some(buf) => self.write(buf), |
173 | None => Ok(0), |
174 | }; |
175 | } |
176 | |
177 | // Find the buffer containing the last newline |
178 | // FIXME: This is overly slow if there are very many bufs and none contain |
179 | // newlines. e.g. writev() on Linux only writes up to 1024 slices, so |
180 | // scanning the rest is wasted effort. This makes write_all_vectored() |
181 | // quadratic. |
182 | let last_newline_buf_idx = bufs |
183 | .iter() |
184 | .enumerate() |
185 | .rev() |
186 | .find_map(|(i, buf)| memchr::memchr(b' \n' , buf).map(|_| i)); |
187 | |
188 | // If there are no new newlines (that is, if this write is less than |
189 | // one line), just do a regular buffered write |
190 | let last_newline_buf_idx = match last_newline_buf_idx { |
191 | // No newlines; just do a normal buffered write |
192 | None => { |
193 | self.flush_if_completed_line()?; |
194 | return self.buffer.write_vectored(bufs); |
195 | } |
196 | Some(i) => i, |
197 | }; |
198 | |
199 | // Flush existing content to prepare for our write |
200 | self.buffer.flush_buf()?; |
201 | |
202 | // This is what we're going to try to write directly to the inner |
203 | // writer. The rest will be buffered, if nothing goes wrong. |
204 | let (lines, tail) = bufs.split_at(last_newline_buf_idx + 1); |
205 | |
206 | // Write `lines` directly to the inner writer. In keeping with the |
207 | // `write` convention, make at most one attempt to add new (unbuffered) |
208 | // data. Because this write doesn't touch the BufWriter state directly, |
209 | // and the buffer is known to be empty, we don't need to worry about |
210 | // self.panicked here. |
211 | let flushed = self.inner_mut().write_vectored(lines)?; |
212 | |
213 | // If inner returns Ok(0), propagate that to the caller without |
214 | // doing additional buffering; otherwise we're just guaranteeing |
215 | // an "ErrorKind::WriteZero" later. |
216 | if flushed == 0 { |
217 | return Ok(0); |
218 | } |
219 | |
220 | // Don't try to reconstruct the exact amount written; just bail |
221 | // in the event of a partial write |
222 | let mut lines_len: usize = 0; |
223 | for buf in lines { |
224 | // With overlapping/duplicate slices the total length may in theory |
225 | // exceed usize::MAX |
226 | lines_len = lines_len.saturating_add(buf.len()); |
227 | if flushed < lines_len { |
228 | return Ok(flushed); |
229 | } |
230 | } |
231 | |
232 | // Now that the write has succeeded, buffer the rest (or as much of the |
233 | // rest as possible) |
234 | let buffered: usize = tail |
235 | .iter() |
236 | .filter(|buf| !buf.is_empty()) |
237 | .map(|buf| self.buffer.write_to_buf(buf)) |
238 | .take_while(|&n| n > 0) |
239 | .sum(); |
240 | |
241 | Ok(flushed + buffered) |
242 | } |
243 | |
244 | fn is_write_vectored(&self) -> bool { |
245 | self.inner().is_write_vectored() |
246 | } |
247 | |
248 | /// Write some data into this BufReader with line buffering. This means |
249 | /// that, if any newlines are present in the data, the data up to the last |
250 | /// newline is sent directly to the underlying writer, and data after it |
251 | /// is buffered. |
252 | /// |
253 | /// Because this function attempts to send completed lines to the underlying |
254 | /// writer, it will also flush the existing buffer if it contains any |
255 | /// newlines, even if the incoming data does not contain any newlines. |
256 | fn write_all(&mut self, buf: &[u8]) -> io::Result<()> { |
257 | match memchr::memrchr(b' \n' , buf) { |
258 | // If there are no new newlines (that is, if this write is less than |
259 | // one line), just do a regular buffered write (which may flush if |
260 | // we exceed the inner buffer's size) |
261 | None => { |
262 | self.flush_if_completed_line()?; |
263 | self.buffer.write_all(buf) |
264 | } |
265 | Some(newline_idx) => { |
266 | let (lines, tail) = buf.split_at(newline_idx + 1); |
267 | |
268 | if self.buffered().is_empty() { |
269 | self.inner_mut().write_all(lines)?; |
270 | } else { |
271 | // If there is any buffered data, we add the incoming lines |
272 | // to that buffer before flushing, which saves us at least |
273 | // one write call. We can't really do this with `write`, |
274 | // since we can't do this *and* not suppress errors *and* |
275 | // report a consistent state to the caller in a return |
276 | // value, but here in write_all it's fine. |
277 | self.buffer.write_all(lines)?; |
278 | self.buffer.flush_buf()?; |
279 | } |
280 | |
281 | self.buffer.write_all(tail) |
282 | } |
283 | } |
284 | } |
285 | } |
286 | |