1use core::ops::Range;
2use std::boxed::Box;
3use std::cell::RefCell;
4use std::collections::hash_map::Entry;
5use std::collections::HashMap;
6use std::convert::TryInto;
7use std::io::{Read, Seek, SeekFrom};
8use std::mem;
9use std::vec::Vec;
10
11use crate::read::ReadRef;
12
/// An implementation of [`ReadRef`] for data in a stream that implements
/// `Read + Seek`.
///
/// Contains a cache of read-only blocks of data, allowing references to
/// them to be returned. Entries in the cache are never removed.
/// Entries are keyed on the offset and size of the read.
/// Currently overlapping reads are considered separate reads.
#[derive(Debug)]
pub struct ReadCache<R: Read + Seek> {
    /// Interior mutability: `ReadRef` methods receive `self` by shared
    /// reference but must seek the stream and insert cache entries.
    cache: RefCell<ReadCacheInternal<R>>,
}
24
/// The mutable state behind a [`ReadCache`]: the stream plus the two caches.
#[derive(Debug)]
struct ReadCacheInternal<R: Read + Seek> {
    /// The underlying stream; its seek position is moved by every cache miss.
    read: R,
    /// Fixed-size reads, keyed by `(offset, size)`.
    bufs: HashMap<(u64, u64), Box<[u8]>>,
    /// Delimiter-terminated reads, keyed by `(offset, delimiter)`.
    strings: HashMap<(u64, u8), Box<[u8]>>,
}
31
32impl<R: Read + Seek> ReadCache<R> {
33 /// Create an empty `ReadCache` for the given stream.
34 pub fn new(read: R) -> Self {
35 ReadCache {
36 cache: RefCell::new(ReadCacheInternal {
37 read,
38 bufs: HashMap::new(),
39 strings: HashMap::new(),
40 }),
41 }
42 }
43
44 /// Return an implementation of `ReadRef` that restricts reads
45 /// to the given range of the stream.
46 pub fn range(&self, offset: u64, size: u64) -> ReadCacheRange<'_, R> {
47 ReadCacheRange {
48 r: self,
49 offset,
50 size,
51 }
52 }
53
54 /// Free buffers used by the cache.
55 pub fn clear(&mut self) {
56 self.cache.borrow_mut().bufs.clear();
57 }
58
59 /// Unwrap this `ReadCache<R>`, returning the underlying reader.
60 pub fn into_inner(self) -> R {
61 self.cache.into_inner().read
62 }
63}
64
65impl<'a, R: Read + Seek> ReadRef<'a> for &'a ReadCache<R> {
66 fn len(self) -> Result<u64, ()> {
67 let cache = &mut *self.cache.borrow_mut();
68 cache.read.seek(SeekFrom::End(0)).map_err(|_| ())
69 }
70
71 fn read_bytes_at(self, offset: u64, size: u64) -> Result<&'a [u8], ()> {
72 if size == 0 {
73 return Ok(&[]);
74 }
75 let cache = &mut *self.cache.borrow_mut();
76 let buf = match cache.bufs.entry((offset, size)) {
77 Entry::Occupied(entry) => entry.into_mut(),
78 Entry::Vacant(entry) => {
79 let size = size.try_into().map_err(|_| ())?;
80 cache.read.seek(SeekFrom::Start(offset)).map_err(|_| ())?;
81 let mut bytes = vec![0; size].into_boxed_slice();
82 cache.read.read_exact(&mut bytes).map_err(|_| ())?;
83 entry.insert(bytes)
84 }
85 };
86 // Extend the lifetime to that of self.
87 // This is OK because we never mutate or remove entries.
88 Ok(unsafe { mem::transmute::<&[u8], &[u8]>(buf) })
89 }
90
91 fn read_bytes_at_until(self, range: Range<u64>, delimiter: u8) -> Result<&'a [u8], ()> {
92 let cache = &mut *self.cache.borrow_mut();
93 let buf = match cache.strings.entry((range.start, delimiter)) {
94 Entry::Occupied(entry) => entry.into_mut(),
95 Entry::Vacant(entry) => {
96 cache
97 .read
98 .seek(SeekFrom::Start(range.start))
99 .map_err(|_| ())?;
100
101 let max_check: usize = (range.end - range.start).try_into().map_err(|_| ())?;
102 // Strings should be relatively small.
103 // TODO: make this configurable?
104 let max_check = max_check.min(4096);
105
106 let mut bytes = Vec::new();
107 let mut checked = 0;
108 loop {
109 bytes.resize((checked + 256).min(max_check), 0);
110 let read = cache.read.read(&mut bytes[checked..]).map_err(|_| ())?;
111 if read == 0 {
112 return Err(());
113 }
114 if let Some(len) = memchr::memchr(delimiter, &bytes[checked..][..read]) {
115 bytes.truncate(checked + len);
116 break entry.insert(bytes.into_boxed_slice());
117 }
118 checked += read;
119 if checked >= max_check {
120 return Err(());
121 }
122 }
123 }
124 };
125 // Extend the lifetime to that of self.
126 // This is OK because we never mutate or remove entries.
127 Ok(unsafe { mem::transmute::<&[u8], &[u8]>(buf) })
128 }
129}
130
/// An implementation of [`ReadRef`] for a range of data in a stream that
/// implements `Read + Seek`.
///
/// Shares an underlying `ReadCache` with a lifetime of `'a`.
#[derive(Debug)]
pub struct ReadCacheRange<'a, R: Read + Seek> {
    /// The shared cache that performs and caches the actual reads.
    r: &'a ReadCache<R>,
    /// Start of this range within the underlying stream.
    offset: u64,
    /// Length of this range in bytes.
    size: u64,
}
141
142impl<'a, R: Read + Seek> Clone for ReadCacheRange<'a, R> {
143 fn clone(&self) -> Self {
144 *self
145 }
146}
147
148impl<'a, R: Read + Seek> Copy for ReadCacheRange<'a, R> {}
149
150impl<'a, R: Read + Seek> ReadRef<'a> for ReadCacheRange<'a, R> {
151 fn len(self) -> Result<u64, ()> {
152 Ok(self.size)
153 }
154
155 fn read_bytes_at(self, offset: u64, size: u64) -> Result<&'a [u8], ()> {
156 if size == 0 {
157 return Ok(&[]);
158 }
159 let end = offset.checked_add(size).ok_or(())?;
160 if end > self.size {
161 return Err(());
162 }
163 let r_offset = self.offset.checked_add(offset).ok_or(())?;
164 self.r.read_bytes_at(r_offset, size)
165 }
166
167 fn read_bytes_at_until(self, range: Range<u64>, delimiter: u8) -> Result<&'a [u8], ()> {
168 let r_start = self.offset.checked_add(range.start).ok_or(())?;
169 let r_end = self.offset.checked_add(range.end).ok_or(())?;
170 let bytes = self.r.read_bytes_at_until(r_start..r_end, delimiter)?;
171 let size = bytes.len().try_into().map_err(|_| ())?;
172 let end = range.start.checked_add(size).ok_or(())?;
173 if end > self.size {
174 return Err(());
175 }
176 Ok(bytes)
177 }
178}
179