1 | use core::ops::Range; |
2 | use std::boxed::Box; |
3 | use std::cell::RefCell; |
4 | use std::collections::hash_map::Entry; |
5 | use std::collections::HashMap; |
6 | use std::convert::TryInto; |
7 | use std::io::{Read, Seek, SeekFrom}; |
8 | use std::mem; |
9 | use std::vec::Vec; |
10 | |
11 | use crate::read::ReadRef; |
12 | |
13 | /// An implementation of [`ReadRef`] for data in a stream that implements |
14 | /// `Read + Seek`. |
15 | /// |
16 | /// Contains a cache of read-only blocks of data, allowing references to |
17 | /// them to be returned. Entries in the cache are never removed. |
18 | /// Entries are keyed on the offset and size of the read. |
19 | /// Currently overlapping reads are considered separate reads. |
20 | #[derive (Debug)] |
21 | pub struct ReadCache<R: Read + Seek> { |
22 | cache: RefCell<ReadCacheInternal<R>>, |
23 | } |
24 | |
#[derive(Debug)]
struct ReadCacheInternal<R: Read + Seek> {
    /// The underlying `Read + Seek` stream.
    read: R,
    /// Completed reads, keyed by `(offset, size)`.
    bufs: HashMap<(u64, u64), Box<[u8]>>,
    /// Completed delimiter-terminated reads, keyed by `(offset, delimiter)`.
    strings: HashMap<(u64, u8), Box<[u8]>>,
}
31 | |
32 | impl<R: Read + Seek> ReadCache<R> { |
33 | /// Create an empty `ReadCache` for the given stream. |
34 | pub fn new(read: R) -> Self { |
35 | ReadCache { |
36 | cache: RefCell::new(ReadCacheInternal { |
37 | read, |
38 | bufs: HashMap::new(), |
39 | strings: HashMap::new(), |
40 | }), |
41 | } |
42 | } |
43 | |
44 | /// Return an implementation of `ReadRef` that restricts reads |
45 | /// to the given range of the stream. |
46 | pub fn range(&self, offset: u64, size: u64) -> ReadCacheRange<'_, R> { |
47 | ReadCacheRange { |
48 | r: self, |
49 | offset, |
50 | size, |
51 | } |
52 | } |
53 | |
54 | /// Free buffers used by the cache. |
55 | pub fn clear(&mut self) { |
56 | self.cache.borrow_mut().bufs.clear(); |
57 | } |
58 | |
59 | /// Unwrap this `ReadCache<R>`, returning the underlying reader. |
60 | pub fn into_inner(self) -> R { |
61 | self.cache.into_inner().read |
62 | } |
63 | } |
64 | |
65 | impl<'a, R: Read + Seek> ReadRef<'a> for &'a ReadCache<R> { |
66 | fn len(self) -> Result<u64, ()> { |
67 | let cache = &mut *self.cache.borrow_mut(); |
68 | cache.read.seek(SeekFrom::End(0)).map_err(|_| ()) |
69 | } |
70 | |
71 | fn read_bytes_at(self, offset: u64, size: u64) -> Result<&'a [u8], ()> { |
72 | if size == 0 { |
73 | return Ok(&[]); |
74 | } |
75 | let cache = &mut *self.cache.borrow_mut(); |
76 | let buf = match cache.bufs.entry((offset, size)) { |
77 | Entry::Occupied(entry) => entry.into_mut(), |
78 | Entry::Vacant(entry) => { |
79 | let size = size.try_into().map_err(|_| ())?; |
80 | cache.read.seek(SeekFrom::Start(offset)).map_err(|_| ())?; |
81 | let mut bytes = vec![0; size].into_boxed_slice(); |
82 | cache.read.read_exact(&mut bytes).map_err(|_| ())?; |
83 | entry.insert(bytes) |
84 | } |
85 | }; |
86 | // Extend the lifetime to that of self. |
87 | // This is OK because we never mutate or remove entries. |
88 | Ok(unsafe { mem::transmute::<&[u8], &[u8]>(buf) }) |
89 | } |
90 | |
91 | fn read_bytes_at_until(self, range: Range<u64>, delimiter: u8) -> Result<&'a [u8], ()> { |
92 | let cache = &mut *self.cache.borrow_mut(); |
93 | let buf = match cache.strings.entry((range.start, delimiter)) { |
94 | Entry::Occupied(entry) => entry.into_mut(), |
95 | Entry::Vacant(entry) => { |
96 | cache |
97 | .read |
98 | .seek(SeekFrom::Start(range.start)) |
99 | .map_err(|_| ())?; |
100 | |
101 | let max_check: usize = (range.end - range.start).try_into().map_err(|_| ())?; |
102 | // Strings should be relatively small. |
103 | // TODO: make this configurable? |
104 | let max_check = max_check.min(4096); |
105 | |
106 | let mut bytes = Vec::new(); |
107 | let mut checked = 0; |
108 | loop { |
109 | bytes.resize((checked + 256).min(max_check), 0); |
110 | let read = cache.read.read(&mut bytes[checked..]).map_err(|_| ())?; |
111 | if read == 0 { |
112 | return Err(()); |
113 | } |
114 | if let Some(len) = memchr::memchr(delimiter, &bytes[checked..][..read]) { |
115 | bytes.truncate(checked + len); |
116 | break entry.insert(bytes.into_boxed_slice()); |
117 | } |
118 | checked += read; |
119 | if checked >= max_check { |
120 | return Err(()); |
121 | } |
122 | } |
123 | } |
124 | }; |
125 | // Extend the lifetime to that of self. |
126 | // This is OK because we never mutate or remove entries. |
127 | Ok(unsafe { mem::transmute::<&[u8], &[u8]>(buf) }) |
128 | } |
129 | } |
130 | |
131 | /// An implementation of [`ReadRef`] for a range of data in a stream that |
132 | /// implements `Read + Seek`. |
133 | /// |
134 | /// Shares an underlying `ReadCache` with a lifetime of `'a`. |
135 | #[derive (Debug)] |
136 | pub struct ReadCacheRange<'a, R: Read + Seek> { |
137 | r: &'a ReadCache<R>, |
138 | offset: u64, |
139 | size: u64, |
140 | } |
141 | |
// Manual `Clone`/`Copy` impls: deriving them would add unwanted
// `R: Clone`/`R: Copy` bounds, but only the `&ReadCache<R>` reference
// and two `u64` fields are copied, which is valid for any `R`.
impl<'a, R: Read + Seek> Clone for ReadCacheRange<'a, R> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<'a, R: Read + Seek> Copy for ReadCacheRange<'a, R> {}
149 | |
150 | impl<'a, R: Read + Seek> ReadRef<'a> for ReadCacheRange<'a, R> { |
151 | fn len(self) -> Result<u64, ()> { |
152 | Ok(self.size) |
153 | } |
154 | |
155 | fn read_bytes_at(self, offset: u64, size: u64) -> Result<&'a [u8], ()> { |
156 | if size == 0 { |
157 | return Ok(&[]); |
158 | } |
159 | let end = offset.checked_add(size).ok_or(())?; |
160 | if end > self.size { |
161 | return Err(()); |
162 | } |
163 | let r_offset = self.offset.checked_add(offset).ok_or(())?; |
164 | self.r.read_bytes_at(r_offset, size) |
165 | } |
166 | |
167 | fn read_bytes_at_until(self, range: Range<u64>, delimiter: u8) -> Result<&'a [u8], ()> { |
168 | let r_start = self.offset.checked_add(range.start).ok_or(())?; |
169 | let r_end = self.offset.checked_add(range.end).ok_or(())?; |
170 | let bytes = self.r.read_bytes_at_until(r_start..r_end, delimiter)?; |
171 | let size = bytes.len().try_into().map_err(|_| ())?; |
172 | let end = range.start.checked_add(size).ok_or(())?; |
173 | if end > self.size { |
174 | return Err(()); |
175 | } |
176 | Ok(bytes) |
177 | } |
178 | } |
179 | |