/*!
This module provides two `std::io::Read` implementations:

* [`read::FrameDecoder`](struct.FrameDecoder.html)
  wraps another `std::io::Read` implementation and decompresses data encoded
  using the Snappy frame format. Use this if you have a compressed data source
  and wish to read it as uncompressed data.
* [`read::FrameEncoder`](struct.FrameEncoder.html)
  wraps another `std::io::Read` implementation and compresses the data it reads
  into the Snappy frame format. Use this if you have an uncompressed data
  source and wish to read it as compressed data.

Typically, `read::FrameDecoder` is the version that you'll want.
*/
15 | |
16 | use std::cmp; |
17 | use std::fmt; |
18 | use std::io; |
19 | |
20 | use crate::bytes; |
21 | use crate::compress::Encoder; |
22 | use crate::crc32::CheckSummer; |
23 | use crate::decompress::{decompress_len, Decoder}; |
24 | use crate::error::Error; |
25 | use crate::frame::{ |
26 | compress_frame, ChunkType, CHUNK_HEADER_AND_CRC_SIZE, |
27 | MAX_COMPRESS_BLOCK_SIZE, STREAM_BODY, STREAM_IDENTIFIER, |
28 | }; |
29 | use crate::MAX_BLOCK_SIZE; |
30 | |
31 | /// The maximum size of a compressed block, including the header and stream |
32 | /// identifier, that can be emitted by FrameEncoder. |
33 | const MAX_READ_FRAME_ENCODER_BLOCK_SIZE: usize = STREAM_IDENTIFIER.len() |
34 | + CHUNK_HEADER_AND_CRC_SIZE |
35 | + MAX_COMPRESS_BLOCK_SIZE; |
36 | |
37 | /// A reader for decompressing a Snappy stream. |
38 | /// |
39 | /// This `FrameDecoder` wraps any other reader that implements `std::io::Read`. |
40 | /// Bytes read from this reader are decompressed using the |
41 | /// [Snappy frame format](https://github.com/google/snappy/blob/master/framing_format.txt) |
42 | /// (file extension `sz`, MIME type `application/x-snappy-framed`). |
43 | /// |
44 | /// This reader can potentially make many small reads from the underlying |
45 | /// stream depending on its format, therefore, passing in a buffered reader |
46 | /// may be beneficial. |
47 | pub struct FrameDecoder<R: io::Read> { |
48 | /// The underlying reader. |
49 | r: R, |
50 | /// A Snappy decoder that we reuse that does the actual block based |
51 | /// decompression. |
52 | dec: Decoder, |
53 | /// A CRC32 checksummer that is configured to either use the portable |
54 | /// fallback version or the SSE4.2 accelerated version when the right CPU |
55 | /// features are available. |
56 | checksummer: CheckSummer, |
57 | /// The compressed bytes buffer, taken from the underlying reader. |
58 | src: Vec<u8>, |
59 | /// The decompressed bytes buffer. Bytes are decompressed from src to dst |
60 | /// before being passed back to the caller. |
61 | dst: Vec<u8>, |
62 | /// Index into dst: starting point of bytes not yet given back to caller. |
63 | dsts: usize, |
64 | /// Index into dst: ending point of bytes not yet given back to caller. |
65 | dste: usize, |
66 | /// Whether we've read the special stream header or not. |
67 | read_stream_ident: bool, |
68 | } |
69 | |
70 | impl<R: io::Read> FrameDecoder<R> { |
71 | /// Create a new reader for streaming Snappy decompression. |
72 | pub fn new(rdr: R) -> FrameDecoder<R> { |
73 | FrameDecoder { |
74 | r: rdr, |
75 | dec: Decoder::new(), |
76 | checksummer: CheckSummer::new(), |
77 | src: vec![0; MAX_COMPRESS_BLOCK_SIZE], |
78 | dst: vec![0; MAX_BLOCK_SIZE], |
79 | dsts: 0, |
80 | dste: 0, |
81 | read_stream_ident: false, |
82 | } |
83 | } |
84 | |
85 | /// Gets a reference to the underlying reader in this decoder. |
86 | pub fn get_ref(&self) -> &R { |
87 | &self.r |
88 | } |
89 | |
90 | /// Gets a mutable reference to the underlying reader in this decoder. |
91 | /// |
92 | /// Note that mutation of the stream may result in surprising results if |
93 | /// this decoder is continued to be used. |
94 | pub fn get_mut(&mut self) -> &mut R { |
95 | &mut self.r |
96 | } |
97 | |
98 | /// Gets the underlying reader of this decoder. |
99 | pub fn into_inner(self) -> R { |
100 | self.r |
101 | } |
102 | } |
103 | |
104 | impl<R: io::Read> io::Read for FrameDecoder<R> { |
105 | fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { |
106 | macro_rules! fail { |
107 | ($err:expr) => { |
108 | return Err(io::Error::from($err)) |
109 | }; |
110 | } |
111 | loop { |
112 | if self.dsts < self.dste { |
113 | let len = cmp::min(self.dste - self.dsts, buf.len()); |
114 | let dste = self.dsts.checked_add(len).unwrap(); |
115 | buf[0..len].copy_from_slice(&self.dst[self.dsts..dste]); |
116 | self.dsts = dste; |
117 | return Ok(len); |
118 | } |
119 | if !read_exact_eof(&mut self.r, &mut self.src[0..4])? { |
120 | return Ok(0); |
121 | } |
122 | let ty = ChunkType::from_u8(self.src[0]); |
123 | if !self.read_stream_ident { |
124 | if ty != Ok(ChunkType::Stream) { |
125 | fail!(Error::StreamHeader { byte: self.src[0] }); |
126 | } |
127 | self.read_stream_ident = true; |
128 | } |
129 | let len64 = bytes::read_u24_le(&self.src[1..]) as u64; |
130 | if len64 > self.src.len() as u64 { |
131 | fail!(Error::UnsupportedChunkLength { |
132 | len: len64, |
133 | header: false, |
134 | }); |
135 | } |
136 | let len = len64 as usize; |
137 | match ty { |
138 | Err(b) if 0x02 <= b && b <= 0x7F => { |
139 | // Spec says that chunk types 0x02-0x7F are reserved and |
140 | // conformant decoders must return an error. |
141 | fail!(Error::UnsupportedChunkType { byte: b }); |
142 | } |
143 | Err(b) if 0x80 <= b && b <= 0xFD => { |
144 | // Spec says that chunk types 0x80-0xFD are reserved but |
145 | // skippable. |
146 | self.r.read_exact(&mut self.src[0..len])?; |
147 | } |
148 | Err(b) => { |
149 | // Can never happen. 0x02-0x7F and 0x80-0xFD are handled |
150 | // above in the error case. That leaves 0x00, 0x01, 0xFE |
151 | // and 0xFF, each of which correspond to one of the four |
152 | // defined chunk types. |
153 | unreachable!("BUG: unhandled chunk type: {}" , b); |
154 | } |
155 | Ok(ChunkType::Padding) => { |
156 | // Just read and move on. |
157 | self.r.read_exact(&mut self.src[0..len])?; |
158 | } |
159 | Ok(ChunkType::Stream) => { |
160 | if len != STREAM_BODY.len() { |
161 | fail!(Error::UnsupportedChunkLength { |
162 | len: len64, |
163 | header: true, |
164 | }) |
165 | } |
166 | self.r.read_exact(&mut self.src[0..len])?; |
167 | if &self.src[0..len] != STREAM_BODY { |
168 | fail!(Error::StreamHeaderMismatch { |
169 | bytes: self.src[0..len].to_vec(), |
170 | }); |
171 | } |
172 | } |
173 | Ok(ChunkType::Uncompressed) => { |
174 | if len < 4 { |
175 | fail!(Error::UnsupportedChunkLength { |
176 | len: len as u64, |
177 | header: false, |
178 | }); |
179 | } |
180 | let expected_sum = bytes::io_read_u32_le(&mut self.r)?; |
181 | let n = len - 4; |
182 | if n > self.dst.len() { |
183 | fail!(Error::UnsupportedChunkLength { |
184 | len: n as u64, |
185 | header: false, |
186 | }); |
187 | } |
188 | self.r.read_exact(&mut self.dst[0..n])?; |
189 | let got_sum = |
190 | self.checksummer.crc32c_masked(&self.dst[0..n]); |
191 | if expected_sum != got_sum { |
192 | fail!(Error::Checksum { |
193 | expected: expected_sum, |
194 | got: got_sum, |
195 | }); |
196 | } |
197 | self.dsts = 0; |
198 | self.dste = n; |
199 | } |
200 | Ok(ChunkType::Compressed) => { |
201 | if len < 4 { |
202 | fail!(Error::UnsupportedChunkLength { |
203 | len: len as u64, |
204 | header: false, |
205 | }); |
206 | } |
207 | let expected_sum = bytes::io_read_u32_le(&mut self.r)?; |
208 | let sn = len - 4; |
209 | if sn > self.src.len() { |
210 | fail!(Error::UnsupportedChunkLength { |
211 | len: len64, |
212 | header: false, |
213 | }); |
214 | } |
215 | self.r.read_exact(&mut self.src[0..sn])?; |
216 | let dn = decompress_len(&self.src)?; |
217 | if dn > self.dst.len() { |
218 | fail!(Error::UnsupportedChunkLength { |
219 | len: dn as u64, |
220 | header: false, |
221 | }); |
222 | } |
223 | self.dec |
224 | .decompress(&self.src[0..sn], &mut self.dst[0..dn])?; |
225 | let got_sum = |
226 | self.checksummer.crc32c_masked(&self.dst[0..dn]); |
227 | if expected_sum != got_sum { |
228 | fail!(Error::Checksum { |
229 | expected: expected_sum, |
230 | got: got_sum, |
231 | }); |
232 | } |
233 | self.dsts = 0; |
234 | self.dste = dn; |
235 | } |
236 | } |
237 | } |
238 | } |
239 | } |
240 | |
241 | impl<R: fmt::Debug + io::Read> fmt::Debug for FrameDecoder<R> { |
242 | fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |
243 | f&mut DebugStruct<'_, '_>.debug_struct("FrameDecoder" ) |
244 | .field("r" , &self.r) |
245 | .field("dec" , &self.dec) |
246 | .field("checksummer" , &self.checksummer) |
247 | .field("src" , &"[...]" ) |
248 | .field("dst" , &"[...]" ) |
249 | .field("dsts" , &self.dsts) |
250 | .field("dste" , &self.dste) |
251 | .field(name:"read_stream_ident" , &self.read_stream_ident) |
252 | .finish() |
253 | } |
254 | } |
255 | |
256 | /// A reader for compressing data using snappy as it is read. |
257 | /// |
258 | /// This `FrameEncoder` wraps any other reader that implements `std::io::Read`. |
259 | /// Bytes read from this reader are compressed using the |
260 | /// [Snappy frame format](https://github.com/google/snappy/blob/master/framing_format.txt) |
261 | /// (file extension `sz`, MIME type `application/x-snappy-framed`). |
262 | /// |
263 | /// Usually you'll want |
264 | /// [`read::FrameDecoder`](struct.FrameDecoder.html) |
265 | /// (for decompressing while reading) or |
266 | /// [`write::FrameEncoder`](../write/struct.FrameEncoder.html) |
267 | /// (for compressing while writing) instead. |
268 | /// |
269 | /// Unlike `FrameDecoder`, this will attempt to make large reads roughly |
270 | /// equivalent to the size of a single Snappy block. Therefore, callers may not |
271 | /// benefit from using a buffered reader. |
272 | pub struct FrameEncoder<R: io::Read> { |
273 | /// Internally, we split `FrameEncoder` in two to keep the borrow checker |
274 | /// happy. The `inner` member contains everything that `read_frame` needs |
275 | /// to fetch a frame's worth of data and compress it. |
276 | inner: Inner<R>, |
277 | /// Data that we've encoded and are ready to return to our caller. |
278 | dst: Vec<u8>, |
279 | /// Starting point of bytes in `dst` not yet given back to the caller. |
280 | dsts: usize, |
281 | /// Ending point of bytes in `dst` that we want to give to our caller. |
282 | dste: usize, |
283 | } |
284 | |
285 | struct Inner<R: io::Read> { |
286 | /// The underlying data source. |
287 | r: R, |
288 | /// An encoder that we reuse that does the actual block based compression. |
289 | enc: Encoder, |
290 | /// A CRC32 checksummer that is configured to either use the portable |
291 | /// fallback version or the SSE4.2 accelerated version when the right CPU |
292 | /// features are available. |
293 | checksummer: CheckSummer, |
294 | /// Data taken from the underlying `r`, and not yet compressed. |
295 | src: Vec<u8>, |
296 | /// Have we written the standard snappy header to `dst` yet? |
297 | wrote_stream_ident: bool, |
298 | } |
299 | |
300 | impl<R: io::Read> FrameEncoder<R> { |
301 | /// Create a new reader for streaming Snappy compression. |
302 | pub fn new(rdr: R) -> FrameEncoder<R> { |
303 | FrameEncoder { |
304 | inner: Inner { |
305 | r: rdr, |
306 | enc: Encoder::new(), |
307 | checksummer: CheckSummer::new(), |
308 | src: vec![0; MAX_BLOCK_SIZE], |
309 | wrote_stream_ident: false, |
310 | }, |
311 | dst: vec![0; MAX_READ_FRAME_ENCODER_BLOCK_SIZE], |
312 | dsts: 0, |
313 | dste: 0, |
314 | } |
315 | } |
316 | |
317 | /// Gets a reference to the underlying reader in this decoder. |
318 | pub fn get_ref(&self) -> &R { |
319 | &self.inner.r |
320 | } |
321 | |
322 | /// Gets a mutable reference to the underlying reader in this decoder. |
323 | /// |
324 | /// Note that mutation of the stream may result in surprising results if |
325 | /// this encoder is continued to be used. |
326 | pub fn get_mut(&mut self) -> &mut R { |
327 | &mut self.inner.r |
328 | } |
329 | |
330 | /// Read previously compressed data from `self.dst`, returning the number of |
331 | /// bytes read. If `self.dst` is empty, returns 0. |
332 | fn read_from_dst(&mut self, buf: &mut [u8]) -> usize { |
333 | let available_bytes = self.dste - self.dsts; |
334 | let count = cmp::min(available_bytes, buf.len()); |
335 | buf[..count].copy_from_slice(&self.dst[self.dsts..self.dsts + count]); |
336 | self.dsts += count; |
337 | count |
338 | } |
339 | } |
340 | |
341 | impl<R: io::Read> io::Read for FrameEncoder<R> { |
342 | fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { |
343 | // Try reading previously compressed bytes from our `dst` buffer, if |
344 | // any. |
345 | let count: usize = self.read_from_dst(buf); |
346 | |
347 | if count > 0 { |
348 | // We had some bytes in our `dst` buffer that we used. |
349 | Ok(count) |
350 | } else if buf.len() >= MAX_READ_FRAME_ENCODER_BLOCK_SIZE { |
351 | // Our output `buf` is big enough that we can directly write into |
352 | // it, so bypass `dst` entirely. |
353 | self.inner.read_frame(dst:buf) |
354 | } else { |
355 | // We need to refill `self.dst`, and then return some bytes from |
356 | // that. |
357 | let count: usize = self.inner.read_frame(&mut self.dst)?; |
358 | self.dsts = 0; |
359 | self.dste = count; |
360 | Ok(self.read_from_dst(buf)) |
361 | } |
362 | } |
363 | } |
364 | |
365 | impl<R: io::Read> Inner<R> { |
366 | /// Read from `self.r`, and create a new frame, writing it to `dst`, which |
367 | /// must be at least `MAX_READ_FRAME_ENCODER_BLOCK_SIZE` bytes in size. |
368 | fn read_frame(&mut self, dst: &mut [u8]) -> io::Result<usize> { |
369 | debug_assert!(dst.len() >= MAX_READ_FRAME_ENCODER_BLOCK_SIZE); |
370 | |
371 | // We make one read to the underlying reader. If the underlying reader |
372 | // doesn't fill the buffer but there are still bytes to be read, then |
373 | // compression won't be optimal. The alternative would be to block |
374 | // until our buffer is maximally full (or we see EOF), but this seems |
375 | // more surprising. In general, io::Read implementations should try to |
376 | // fill the caller's buffer as much as they can, so this seems like the |
377 | // better choice. |
378 | let nread = self.r.read(&mut self.src)?; |
379 | if nread == 0 { |
380 | return Ok(0); |
381 | } |
382 | |
383 | // If we haven't yet written the stream header to `dst`, write it. |
384 | let mut dst_write_start = 0; |
385 | if !self.wrote_stream_ident { |
386 | dst[0..STREAM_IDENTIFIER.len()].copy_from_slice(STREAM_IDENTIFIER); |
387 | dst_write_start += STREAM_IDENTIFIER.len(); |
388 | self.wrote_stream_ident = true; |
389 | } |
390 | |
391 | // Reserve space for our chunk header. We need to use `split_at_mut` so |
392 | // that we can get two mutable slices pointing at non-overlapping parts |
393 | // of `dst`. |
394 | let (chunk_header, remaining_dst) = |
395 | dst[dst_write_start..].split_at_mut(CHUNK_HEADER_AND_CRC_SIZE); |
396 | dst_write_start += CHUNK_HEADER_AND_CRC_SIZE; |
397 | |
398 | // Compress our frame if possible, telling `compress_frame` to always |
399 | // put the output in `dst`. |
400 | let frame_data = compress_frame( |
401 | &mut self.enc, |
402 | self.checksummer, |
403 | &self.src[..nread], |
404 | chunk_header, |
405 | remaining_dst, |
406 | true, |
407 | )?; |
408 | Ok(dst_write_start + frame_data.len()) |
409 | } |
410 | } |
411 | |
412 | impl<R: fmt::Debug + io::Read> fmt::Debug for FrameEncoder<R> { |
413 | fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |
414 | f&mut DebugStruct<'_, '_>.debug_struct("FrameEncoder" ) |
415 | .field("inner" , &self.inner) |
416 | .field("dst" , &"[...]" ) |
417 | .field("dsts" , &self.dsts) |
418 | .field(name:"dste" , &self.dste) |
419 | .finish() |
420 | } |
421 | } |
422 | |
423 | impl<R: fmt::Debug + io::Read> fmt::Debug for Inner<R> { |
424 | fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |
425 | f&mut DebugStruct<'_, '_>.debug_struct("Inner" ) |
426 | .field("r" , &self.r) |
427 | .field("enc" , &self.enc) |
428 | .field("checksummer" , &self.checksummer) |
429 | .field("src" , &"[...]" ) |
430 | .field(name:"wrote_stream_ident" , &self.wrote_stream_ident) |
431 | .finish() |
432 | } |
433 | } |
434 | |
/// Like `Read::read_exact`, except that EOF before *any* byte has been read
/// is reported as `Ok(false)` instead of an error.
///
/// Returns `Ok(true)` once `buf` has been filled completely. An empty `buf`
/// counts as filled and is returned as `Ok(true)` without touching the
/// reader. If EOF is hit after some, but not all, of `buf` has been filled,
/// the input is truncated and an `UnexpectedEof` error is returned. As with
/// `read_exact`, reads that fail with `ErrorKind::Interrupted` are retried.
fn read_exact_eof<R: io::Read>(
    rdr: &mut R,
    buf: &mut [u8],
) -> io::Result<bool> {
    if buf.is_empty() {
        return Ok(true);
    }
    loop {
        match rdr.read(buf) {
            // Clean EOF: nothing at all was available.
            Ok(0) => return Ok(false),
            // Read everything with a single call.
            Ok(i) if i == buf.len() => return Ok(true),
            // Some bytes are still missing; read_exact treats EOF from here
            // on as an error.
            Ok(i) => {
                rdr.read_exact(&mut buf[i..])?;
                return Ok(true);
            }
            // Interrupted reads transfer no data and should be retried.
            Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        }
    }
}
456 | |