1//! LineReader
2//!
3//! A fast byte-delimiter-oriented buffered reader, offering a faster alternative
4//! to `read_until` that returns byte slices into its internal buffer rather than
5//! copying them out to one you provide.
6//!
7//! Because the internal buffer is fixed, lines longer than the buffer will be
8//! split.
9
10/*
11128k blocks: 0 lines 31603121046 bytes in 36.85s (817.92 MB/s)
12LineReader: 501636842 lines 31603121046 bytes in 73.96s (407.52 MB/s)
13read_until: 501636842 lines 31603121046 bytes in 119.30s (252.62 MB/s)
14read_line: 501636842 lines 31603121046 bytes in 139.14s (216.61 MB/s)
15lines(): 501636842 lines 30599847362 bytes in 167.17s (174.57 MB/s)
16*/
17
18use std::cmp;
19use std::io;
20use std::io::ErrorKind;
21
22extern crate memchr;
23use memchr::{memchr, memrchr};
24
/// Delimiter used by the constructors that do not take one: ASCII line feed.
const NEWLINE: u8 = b'\n';
/// Buffer size, in bytes, used by the constructors that do not take a
/// capacity (64 KiB).
const DEFAULT_CAPACITY: usize = 64 * 1024;
27
/// The `LineReader` struct adds buffered, byte-delimited (default: `\n`)
/// reading to any `io::Read`.
///
/// Slices handed out by the reader borrow from `buf`; the three offsets below
/// always satisfy `pos <= end_of_complete <= end_of_buffer <= buf.len()`
/// between calls.
pub struct LineReader<R> {
    /// The wrapped reader that bytes are pulled from.
    inner: R,
    /// Byte that terminates a line.
    delimiter: u8,
    /// Internal buffer, allocated once by the constructor.
    buf: Vec<u8>,
    /// Offset in `buf` of the first byte that has not been handed out yet.
    pos: usize,
    /// Offset in `buf` one past the last byte that may currently be handed
    /// out (normally one past the last delimiter that was read).
    end_of_complete: usize,
    /// Offset in `buf` one past the last byte read from `inner`.
    end_of_buffer: usize,
}

use std::fmt;

/// Debug output lists the delimiter and the buffer offsets; the wrapped reader
/// and the buffer contents are deliberately left out.
///
/// No bound is placed on `R`, so the impl is usable from generic code (for
/// example `#[derive(Debug)]` on a wrapper type) without requiring `R: Read`.
impl<R> fmt::Debug for LineReader<R> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // `debug_struct` produces the same text as the previous hand-written
        // `write!` for `{:?}`, and additionally honours `{:#?}`.
        f.debug_struct("LineReader")
            .field("delimiter", &self.delimiter)
            .field("pos", &self.pos)
            .field("end_of_complete", &self.end_of_complete)
            .field("end_of_buffer", &self.end_of_buffer)
            .finish()
    }
}
50
51impl<R: io::Read> LineReader<R> {
52 /// Create a new `LineReader` around the reader with a default capacity of
53 /// 64 KiB and delimiter of `\n`.
54 ///
55 /// ```no_run
56 /// # use linereader::LineReader;
57 /// # use std::fs::File;
58 /// # use std::io;
59 /// # fn x() -> io::Result<()> {
60 /// let reader = LineReader::new(File::open("myfile.txt")?);
61 /// # Ok(())
62 /// # }
63 /// ```
64 pub fn new(inner: R) -> Self {
65 Self::with_delimiter_and_capacity(NEWLINE, DEFAULT_CAPACITY, inner)
66 }
67
68 /// Create a new `LineReader` around the reader with a given capacity and
69 /// delimiter of `\n`.
70 ///
71 /// ```no_run
72 /// # use linereader::LineReader;
73 /// # use std::fs::File;
74 /// # use std::io;
75 /// # fn x() -> io::Result<()> {
76 /// let mut reader = LineReader::with_capacity(1024*512, File::open("myfile.txt")?);
77 /// # Ok(())
78 /// # }
79 /// ```
80 pub fn with_capacity(capacity: usize, inner: R) -> Self {
81 Self::with_delimiter_and_capacity(NEWLINE, capacity, inner)
82 }
83
84 /// Create a new `LineReader` around the reader with a default capacity of
85 /// 64 KiB and the given delimiter.
86 ///
87 /// ```no_run
88 /// # use linereader::LineReader;
89 /// # use std::fs::File;
90 /// # use std::io;
91 /// # fn x() -> io::Result<()> {
92 /// let mut reader = LineReader::with_delimiter(b'\t', File::open("myfile.txt")?);
93 /// # Ok(())
94 /// # }
95 /// ```
96 pub fn with_delimiter(delimiter: u8, inner: R) -> Self {
97 Self::with_delimiter_and_capacity(delimiter, DEFAULT_CAPACITY, inner)
98 }
99
100 /// Create a new `LineReader` around the reader with a given capacity and
101 /// delimiter.
102 ///
103 /// ```no_run
104 /// # use linereader::LineReader;
105 /// # use std::fs::File;
106 /// # use std::io;
107 /// # fn x() -> io::Result<()> {
108 /// let mut reader = LineReader::with_delimiter_and_capacity(b'\t', 1024*512, File::open("myfile.txt")?);
109 /// # Ok(())
110 /// # }
111 /// ```
112 pub fn with_delimiter_and_capacity(delimiter: u8, capacity: usize, inner: R) -> Self {
113 Self {
114 inner,
115 delimiter,
116 buf: vec![0; capacity],
117 pos: 0,
118 end_of_complete: 0,
119 end_of_buffer: 0,
120 }
121 }
122
123 /// Run the given closure for each line while while the closure returns `Ok(true)`.
124 ///
125 /// If either the reader or the closure return an error, iteration ends and the error is returned.
126 ///
127 /// ```no_run
128 /// # use linereader::LineReader;
129 /// # use std::fs::File;
130 /// # use std::io;
131 /// # fn x() -> io::Result<()> {
132 /// let buf: &[u8] = b"foo\nbar\nbaz";
133 /// let mut reader = LineReader::new(buf);
134 /// let mut lines = vec![];
135 /// reader.for_each(|line| {
136 /// lines.push(line.to_vec());
137 /// Ok(true)
138 /// })?;
139 /// assert_eq!(lines.len(), 3);
140 /// assert_eq!(lines[0], b"foo\n");
141 /// assert_eq!(lines[1], b"bar\n");
142 /// assert_eq!(lines[2], b"baz");
143 /// # Ok(())
144 /// # }
145 /// ```
146 pub fn for_each<F: FnMut(&[u8]) -> io::Result<bool>>(&mut self, mut f: F) -> io::Result<()> {
147 while let Some(line) = self.next_line() {
148 if !f(line?)? {
149 break;
150 }
151 }
152
153 Ok(())
154 }
155
156 /// Get the next line from the reader, an IO error, or `None` on EOF. The delimiter
157 /// is included in any returned slice, unless the file ends without one or a line was
158 /// truncated to the buffer size due to length.
159 ///
160 /// ```no_run
161 /// # use linereader::LineReader;
162 /// # use std::fs::File;
163 /// # use std::io;
164 /// # fn x() -> io::Result<()> {
165 /// # let mut reader = LineReader::new(File::open("myfile.txt")?);
166 /// while let Some(line) = reader.next_line() {
167 /// let line = line?; // unwrap io::Result to &[u8]
168 /// }
169 /// # Ok(())
170 /// # }
171 /// ```
172 pub fn next_line(&mut self) -> Option<io::Result<&[u8]>> {
173 if self.pos < self.end_of_complete {
174 let lastpos = self.pos;
175 self.pos = cmp::min(
176 1 + lastpos
177 + memchr(self.delimiter, &self.buf[lastpos..self.end_of_complete])
178 .unwrap_or(self.end_of_complete),
179 self.end_of_complete,
180 );
181
182 return Some(Ok(&self.buf[lastpos..self.pos]));
183 }
184
185 match self.refill() {
186 Ok(true) => self.next_line(),
187 Ok(false) => {
188 if self.end_of_buffer == self.pos {
189 None
190 } else {
191 self.pos = self.end_of_buffer;
192 Some(Ok(&self.buf[..self.end_of_buffer]))
193 }
194 }
195 Err(e) => Some(Err(e)),
196 }
197 }
198
199 /// Return a slice of complete lines, up to the size of the internal buffer.
200 ///
201 /// This is functionally identical to next_line, only instead of getting up
202 /// to the *first* instance of the delimiter, you get up to the *last*.
203 ///
204 /// ```no_run
205 /// # use linereader::LineReader;
206 /// # use std::fs::File;
207 /// # use std::io;
208 /// # fn x() -> io::Result<()> {
209 /// # let mut reader = LineReader::new(File::open("myfile.txt")?);
210 /// while let Some(lines) = reader.next_batch() {
211 /// let lines = lines?; // unwrap io::Result to &[u8]
212 /// }
213 /// # Ok(())
214 /// # }
215 /// ```
216 pub fn next_batch(&mut self) -> Option<io::Result<&[u8]>> {
217 if self.pos < self.end_of_complete {
218 let ret = &self.buf[self.pos..self.end_of_complete];
219 self.pos = self.end_of_complete;
220 return Some(Ok(ret));
221 }
222
223 match self.refill() {
224 Ok(true) => self.next_batch(),
225 Ok(false) => {
226 if self.end_of_buffer == self.pos {
227 None
228 } else {
229 self.pos = self.end_of_buffer;
230 Some(Ok(&self.buf[..self.end_of_buffer]))
231 }
232 }
233 Err(e) => Some(Err(e)),
234 }
235 }
236
237 fn refill(&mut self) -> io::Result<bool> {
238 assert!(self.pos == self.end_of_complete);
239 assert!(self.end_of_complete <= self.end_of_buffer);
240
241 self.pos = 0;
242
243 // Move the start of the next line, if any, to the start of buf
244 let fragment_len = self.end_of_buffer - self.end_of_complete;
245 if fragment_len > 0 {
246 // unsafe variants of these using ptr::copy/copy_nonoverlapping can
247 // be found in 5ccea2c - they made no appreciable difference.
248 if fragment_len > self.end_of_complete {
249 self.buf.drain(..self.end_of_complete);
250 self.buf.extend(vec![0_u8; self.end_of_complete]);
251 } else {
252 let (start, rest) = self.buf.split_at_mut(self.end_of_complete);
253 start[0..fragment_len].copy_from_slice(&rest[0..fragment_len]);
254 }
255 self.end_of_buffer = fragment_len;
256 } else {
257 self.end_of_buffer = 0;
258 }
259
260 // Fill the rest of buf from the underlying IO
261 while self.end_of_buffer < self.buf.len() {
262 // Loop until we find a delimiter or read zero bytes.
263 match self.inner.read(&mut self.buf[self.end_of_buffer..]) {
264 Ok(0) => {
265 self.end_of_complete = self.end_of_buffer;
266 return Ok(false);
267 }
268 Ok(n) => {
269 let lastpos = self.end_of_buffer;
270 self.end_of_buffer += n;
271 if let Some(nl) =
272 memrchr(self.delimiter, &self.buf[lastpos..self.end_of_buffer])
273 {
274 self.end_of_complete = cmp::min(self.end_of_buffer, 1 + lastpos + nl);
275 return Ok(true);
276 } else {
277 // No delimiter - see if we can read any more.
278 self.end_of_complete = self.end_of_buffer;
279 }
280 }
281 Err(ref e) if e.kind() == ErrorKind::Interrupted => {}
282 Err(e) => return Err(e),
283 }
284 }
285
286 // We read through until the end of the buffer.
287 Ok(true)
288 }
289
290 /// Reset the internal state of the buffer. Next lines are read from wherever
291 /// the reader happens to be.
292 pub fn reset(&mut self) {
293 self.pos = 0;
294 self.end_of_buffer = 0;
295 self.end_of_complete = 0;
296 }
297
298 /// Get a reference to the reader.
299 pub fn get_ref(&self) -> &R {
300 &self.inner
301 }
302
303 /// Get a mutable reference to the reader.
304 pub fn get_mut(&mut self) -> &mut R {
305 &mut self.inner
306 }
307
308 /// Unwrap this `LineReader`, returning the underlying reader and discarding any
309 /// unread buffered lines.
310 pub fn into_inner(self) -> R {
311 self.inner
312 }
313}
314
#[cfg(test)]
mod tests {
    extern crate rand;

    // `self::` / `super::` paths resolve the same way regardless of where this
    // module sits in the crate and of the edition in use.
    use self::rand::prelude::*;
    use super::LineReader;
    use std::io::{BufRead, Cursor, Read};

    /// Eight lines of growing length; line 5 is longer than the 8-byte buffer
    /// used by `test_next_line`, and the last line has no trailing delimiter.
    const SAMPLE: &[u8] = b"0a0\n1bb1\n2ccc2\n3dddd3\n4eeeee4\n5ffffffff5\n6ggggg6\n7hhhhhh7";

    /// Lines come back one at a time; an over-long line is split at the buffer
    /// size and the undelimited tail is returned before `None`.
    #[test]
    fn test_next_line() {
        let mut reader = LineReader::with_capacity(8, SAMPLE);

        assert_eq!(b"0a0\n", reader.next_line().unwrap().unwrap());
        assert_eq!(b"1bb1\n", reader.next_line().unwrap().unwrap());
        assert_eq!(b"2ccc2\n", reader.next_line().unwrap().unwrap());
        assert_eq!(b"3dddd3\n", reader.next_line().unwrap().unwrap());
        assert_eq!(b"4eeeee4\n", reader.next_line().unwrap().unwrap());
        assert_eq!(b"5fffffff", reader.next_line().unwrap().unwrap());
        assert_eq!(b"f5\n", reader.next_line().unwrap().unwrap());
        assert_eq!(b"6ggggg6\n", reader.next_line().unwrap().unwrap());
        assert_eq!(b"7hhhhhh7", reader.next_line().unwrap().unwrap());
        assert!(reader.next_line().is_none());
    }

    /// Batches end on the last delimiter that fits in the buffer, and the
    /// reader reports end of input once the tail has been returned.
    #[test]
    fn test_next_batch() {
        let mut reader = LineReader::with_capacity(19, SAMPLE);

        assert_eq!(b"0a0\n1bb1\n2ccc2\n", reader.next_batch().unwrap().unwrap());
        assert_eq!(b"3dddd3\n4eeeee4\n", reader.next_batch().unwrap().unwrap());
        assert_eq!(
            b"5ffffffff5\n6ggggg6\n",
            reader.next_batch().unwrap().unwrap()
        );
        assert_eq!(b"7hhhhhh7", reader.next_batch().unwrap().unwrap());
        assert!(reader.next_batch().is_none());
    }

    /// `for_each` visits every line while the closure returns `Ok(true)` and
    /// stops right after the first `Ok(false)`.
    #[test]
    fn test_for_each() {
        let input: &[u8] = b"f\nba\nbaz\n";

        let mut reader = LineReader::new(input);
        let mut expected_len = 2;
        reader
            .for_each(|line| {
                assert_eq!(expected_len, line.len());
                expected_len += 1;
                Ok(true)
            })
            .unwrap();
        // Three lines were visited: lengths 2, 3 and 4.
        assert_eq!(expected_len, 5);

        let mut reader = LineReader::new(input);
        let mut calls = 0;
        reader
            .for_each(|line| {
                calls += 1;
                assert_eq!(line.len(), 2);
                Ok(false)
            })
            .unwrap();
        assert_eq!(calls, 1);
    }

    /// Compare `next_line` against `read_until` limited to the buffer size on
    /// random data with a random delimiter and a random (8..=255) capacity.
    #[test]
    fn test_next_line_randomly() {
        let mut rng = thread_rng();

        for _ in 1..128 {
            let mut data = [0u8; 65535];
            rng.fill(&mut data[..]);
            let delimiter = rng.gen::<u8>();
            let max_line = rng.gen::<u8>().saturating_add(8) as usize;

            let mut reader = LineReader::with_delimiter_and_capacity(
                delimiter,
                max_line,
                Cursor::new(&data[..]),
            );
            let mut cursor = Cursor::new(&data[..]);
            let mut expected = vec![];

            while cursor
                .by_ref()
                .take(max_line as u64)
                .read_until(delimiter, &mut expected)
                .unwrap()
                > 0
            {
                assert_eq!(expected, reader.next_line().unwrap().unwrap());
                expected.clear();
            }

            assert!(reader.next_line().is_none());
        }
    }
}
398