/*!
This module provides two `std::io::Read` implementations:

* [`read::FrameDecoder`](struct.FrameDecoder.html)
  wraps another `std::io::Read` implementation, and decompresses data encoded
  using the Snappy frame format. Use this if you have a compressed data source
  and wish to read it as uncompressed data.
* [`read::FrameEncoder`](struct.FrameEncoder.html)
  wraps another `std::io::Read` implementation, and compresses its data using
  the Snappy frame format. Use this if you have an uncompressed data source
  and wish to read it as compressed data.

Typically, `read::FrameDecoder` is the version that you'll want.
*/
15
16use std::cmp;
17use std::fmt;
18use std::io;
19
20use crate::bytes;
21use crate::compress::Encoder;
22use crate::crc32::CheckSummer;
23use crate::decompress::{decompress_len, Decoder};
24use crate::error::Error;
25use crate::frame::{
26 compress_frame, ChunkType, CHUNK_HEADER_AND_CRC_SIZE,
27 MAX_COMPRESS_BLOCK_SIZE, STREAM_BODY, STREAM_IDENTIFIER,
28};
29use crate::MAX_BLOCK_SIZE;
30
31/// The maximum size of a compressed block, including the header and stream
32/// identifier, that can be emitted by FrameEncoder.
33const MAX_READ_FRAME_ENCODER_BLOCK_SIZE: usize = STREAM_IDENTIFIER.len()
34 + CHUNK_HEADER_AND_CRC_SIZE
35 + MAX_COMPRESS_BLOCK_SIZE;
36
37/// A reader for decompressing a Snappy stream.
38///
39/// This `FrameDecoder` wraps any other reader that implements `std::io::Read`.
40/// Bytes read from this reader are decompressed using the
41/// [Snappy frame format](https://github.com/google/snappy/blob/master/framing_format.txt)
42/// (file extension `sz`, MIME type `application/x-snappy-framed`).
43///
44/// This reader can potentially make many small reads from the underlying
45/// stream depending on its format, therefore, passing in a buffered reader
46/// may be beneficial.
47pub struct FrameDecoder<R: io::Read> {
48 /// The underlying reader.
49 r: R,
50 /// A Snappy decoder that we reuse that does the actual block based
51 /// decompression.
52 dec: Decoder,
53 /// A CRC32 checksummer that is configured to either use the portable
54 /// fallback version or the SSE4.2 accelerated version when the right CPU
55 /// features are available.
56 checksummer: CheckSummer,
57 /// The compressed bytes buffer, taken from the underlying reader.
58 src: Vec<u8>,
59 /// The decompressed bytes buffer. Bytes are decompressed from src to dst
60 /// before being passed back to the caller.
61 dst: Vec<u8>,
62 /// Index into dst: starting point of bytes not yet given back to caller.
63 dsts: usize,
64 /// Index into dst: ending point of bytes not yet given back to caller.
65 dste: usize,
66 /// Whether we've read the special stream header or not.
67 read_stream_ident: bool,
68}
69
70impl<R: io::Read> FrameDecoder<R> {
71 /// Create a new reader for streaming Snappy decompression.
72 pub fn new(rdr: R) -> FrameDecoder<R> {
73 FrameDecoder {
74 r: rdr,
75 dec: Decoder::new(),
76 checksummer: CheckSummer::new(),
77 src: vec![0; MAX_COMPRESS_BLOCK_SIZE],
78 dst: vec![0; MAX_BLOCK_SIZE],
79 dsts: 0,
80 dste: 0,
81 read_stream_ident: false,
82 }
83 }
84
85 /// Gets a reference to the underlying reader in this decoder.
86 pub fn get_ref(&self) -> &R {
87 &self.r
88 }
89
90 /// Gets a mutable reference to the underlying reader in this decoder.
91 ///
92 /// Note that mutation of the stream may result in surprising results if
93 /// this decoder is continued to be used.
94 pub fn get_mut(&mut self) -> &mut R {
95 &mut self.r
96 }
97
98 /// Gets the underlying reader of this decoder.
99 pub fn into_inner(self) -> R {
100 self.r
101 }
102}
103
104impl<R: io::Read> io::Read for FrameDecoder<R> {
105 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
106 macro_rules! fail {
107 ($err:expr) => {
108 return Err(io::Error::from($err))
109 };
110 }
111 loop {
112 if self.dsts < self.dste {
113 let len = cmp::min(self.dste - self.dsts, buf.len());
114 let dste = self.dsts.checked_add(len).unwrap();
115 buf[0..len].copy_from_slice(&self.dst[self.dsts..dste]);
116 self.dsts = dste;
117 return Ok(len);
118 }
119 if !read_exact_eof(&mut self.r, &mut self.src[0..4])? {
120 return Ok(0);
121 }
122 let ty = ChunkType::from_u8(self.src[0]);
123 if !self.read_stream_ident {
124 if ty != Ok(ChunkType::Stream) {
125 fail!(Error::StreamHeader { byte: self.src[0] });
126 }
127 self.read_stream_ident = true;
128 }
129 let len64 = bytes::read_u24_le(&self.src[1..]) as u64;
130 if len64 > self.src.len() as u64 {
131 fail!(Error::UnsupportedChunkLength {
132 len: len64,
133 header: false,
134 });
135 }
136 let len = len64 as usize;
137 match ty {
138 Err(b) if 0x02 <= b && b <= 0x7F => {
139 // Spec says that chunk types 0x02-0x7F are reserved and
140 // conformant decoders must return an error.
141 fail!(Error::UnsupportedChunkType { byte: b });
142 }
143 Err(b) if 0x80 <= b && b <= 0xFD => {
144 // Spec says that chunk types 0x80-0xFD are reserved but
145 // skippable.
146 self.r.read_exact(&mut self.src[0..len])?;
147 }
148 Err(b) => {
149 // Can never happen. 0x02-0x7F and 0x80-0xFD are handled
150 // above in the error case. That leaves 0x00, 0x01, 0xFE
151 // and 0xFF, each of which correspond to one of the four
152 // defined chunk types.
153 unreachable!("BUG: unhandled chunk type: {}", b);
154 }
155 Ok(ChunkType::Padding) => {
156 // Just read and move on.
157 self.r.read_exact(&mut self.src[0..len])?;
158 }
159 Ok(ChunkType::Stream) => {
160 if len != STREAM_BODY.len() {
161 fail!(Error::UnsupportedChunkLength {
162 len: len64,
163 header: true,
164 })
165 }
166 self.r.read_exact(&mut self.src[0..len])?;
167 if &self.src[0..len] != STREAM_BODY {
168 fail!(Error::StreamHeaderMismatch {
169 bytes: self.src[0..len].to_vec(),
170 });
171 }
172 }
173 Ok(ChunkType::Uncompressed) => {
174 if len < 4 {
175 fail!(Error::UnsupportedChunkLength {
176 len: len as u64,
177 header: false,
178 });
179 }
180 let expected_sum = bytes::io_read_u32_le(&mut self.r)?;
181 let n = len - 4;
182 if n > self.dst.len() {
183 fail!(Error::UnsupportedChunkLength {
184 len: n as u64,
185 header: false,
186 });
187 }
188 self.r.read_exact(&mut self.dst[0..n])?;
189 let got_sum =
190 self.checksummer.crc32c_masked(&self.dst[0..n]);
191 if expected_sum != got_sum {
192 fail!(Error::Checksum {
193 expected: expected_sum,
194 got: got_sum,
195 });
196 }
197 self.dsts = 0;
198 self.dste = n;
199 }
200 Ok(ChunkType::Compressed) => {
201 if len < 4 {
202 fail!(Error::UnsupportedChunkLength {
203 len: len as u64,
204 header: false,
205 });
206 }
207 let expected_sum = bytes::io_read_u32_le(&mut self.r)?;
208 let sn = len - 4;
209 if sn > self.src.len() {
210 fail!(Error::UnsupportedChunkLength {
211 len: len64,
212 header: false,
213 });
214 }
215 self.r.read_exact(&mut self.src[0..sn])?;
216 let dn = decompress_len(&self.src)?;
217 if dn > self.dst.len() {
218 fail!(Error::UnsupportedChunkLength {
219 len: dn as u64,
220 header: false,
221 });
222 }
223 self.dec
224 .decompress(&self.src[0..sn], &mut self.dst[0..dn])?;
225 let got_sum =
226 self.checksummer.crc32c_masked(&self.dst[0..dn]);
227 if expected_sum != got_sum {
228 fail!(Error::Checksum {
229 expected: expected_sum,
230 got: got_sum,
231 });
232 }
233 self.dsts = 0;
234 self.dste = dn;
235 }
236 }
237 }
238 }
239}
240
241impl<R: fmt::Debug + io::Read> fmt::Debug for FrameDecoder<R> {
242 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
243 f&mut DebugStruct<'_, '_>.debug_struct("FrameDecoder")
244 .field("r", &self.r)
245 .field("dec", &self.dec)
246 .field("checksummer", &self.checksummer)
247 .field("src", &"[...]")
248 .field("dst", &"[...]")
249 .field("dsts", &self.dsts)
250 .field("dste", &self.dste)
251 .field(name:"read_stream_ident", &self.read_stream_ident)
252 .finish()
253 }
254}
255
256/// A reader for compressing data using snappy as it is read.
257///
258/// This `FrameEncoder` wraps any other reader that implements `std::io::Read`.
259/// Bytes read from this reader are compressed using the
260/// [Snappy frame format](https://github.com/google/snappy/blob/master/framing_format.txt)
261/// (file extension `sz`, MIME type `application/x-snappy-framed`).
262///
263/// Usually you'll want
264/// [`read::FrameDecoder`](struct.FrameDecoder.html)
265/// (for decompressing while reading) or
266/// [`write::FrameEncoder`](../write/struct.FrameEncoder.html)
267/// (for compressing while writing) instead.
268///
269/// Unlike `FrameDecoder`, this will attempt to make large reads roughly
270/// equivalent to the size of a single Snappy block. Therefore, callers may not
271/// benefit from using a buffered reader.
272pub struct FrameEncoder<R: io::Read> {
273 /// Internally, we split `FrameEncoder` in two to keep the borrow checker
274 /// happy. The `inner` member contains everything that `read_frame` needs
275 /// to fetch a frame's worth of data and compress it.
276 inner: Inner<R>,
277 /// Data that we've encoded and are ready to return to our caller.
278 dst: Vec<u8>,
279 /// Starting point of bytes in `dst` not yet given back to the caller.
280 dsts: usize,
281 /// Ending point of bytes in `dst` that we want to give to our caller.
282 dste: usize,
283}
284
285struct Inner<R: io::Read> {
286 /// The underlying data source.
287 r: R,
288 /// An encoder that we reuse that does the actual block based compression.
289 enc: Encoder,
290 /// A CRC32 checksummer that is configured to either use the portable
291 /// fallback version or the SSE4.2 accelerated version when the right CPU
292 /// features are available.
293 checksummer: CheckSummer,
294 /// Data taken from the underlying `r`, and not yet compressed.
295 src: Vec<u8>,
296 /// Have we written the standard snappy header to `dst` yet?
297 wrote_stream_ident: bool,
298}
299
300impl<R: io::Read> FrameEncoder<R> {
301 /// Create a new reader for streaming Snappy compression.
302 pub fn new(rdr: R) -> FrameEncoder<R> {
303 FrameEncoder {
304 inner: Inner {
305 r: rdr,
306 enc: Encoder::new(),
307 checksummer: CheckSummer::new(),
308 src: vec![0; MAX_BLOCK_SIZE],
309 wrote_stream_ident: false,
310 },
311 dst: vec![0; MAX_READ_FRAME_ENCODER_BLOCK_SIZE],
312 dsts: 0,
313 dste: 0,
314 }
315 }
316
317 /// Gets a reference to the underlying reader in this decoder.
318 pub fn get_ref(&self) -> &R {
319 &self.inner.r
320 }
321
322 /// Gets a mutable reference to the underlying reader in this decoder.
323 ///
324 /// Note that mutation of the stream may result in surprising results if
325 /// this encoder is continued to be used.
326 pub fn get_mut(&mut self) -> &mut R {
327 &mut self.inner.r
328 }
329
330 /// Read previously compressed data from `self.dst`, returning the number of
331 /// bytes read. If `self.dst` is empty, returns 0.
332 fn read_from_dst(&mut self, buf: &mut [u8]) -> usize {
333 let available_bytes = self.dste - self.dsts;
334 let count = cmp::min(available_bytes, buf.len());
335 buf[..count].copy_from_slice(&self.dst[self.dsts..self.dsts + count]);
336 self.dsts += count;
337 count
338 }
339}
340
341impl<R: io::Read> io::Read for FrameEncoder<R> {
342 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
343 // Try reading previously compressed bytes from our `dst` buffer, if
344 // any.
345 let count: usize = self.read_from_dst(buf);
346
347 if count > 0 {
348 // We had some bytes in our `dst` buffer that we used.
349 Ok(count)
350 } else if buf.len() >= MAX_READ_FRAME_ENCODER_BLOCK_SIZE {
351 // Our output `buf` is big enough that we can directly write into
352 // it, so bypass `dst` entirely.
353 self.inner.read_frame(dst:buf)
354 } else {
355 // We need to refill `self.dst`, and then return some bytes from
356 // that.
357 let count: usize = self.inner.read_frame(&mut self.dst)?;
358 self.dsts = 0;
359 self.dste = count;
360 Ok(self.read_from_dst(buf))
361 }
362 }
363}
364
365impl<R: io::Read> Inner<R> {
366 /// Read from `self.r`, and create a new frame, writing it to `dst`, which
367 /// must be at least `MAX_READ_FRAME_ENCODER_BLOCK_SIZE` bytes in size.
368 fn read_frame(&mut self, dst: &mut [u8]) -> io::Result<usize> {
369 debug_assert!(dst.len() >= MAX_READ_FRAME_ENCODER_BLOCK_SIZE);
370
371 // We make one read to the underlying reader. If the underlying reader
372 // doesn't fill the buffer but there are still bytes to be read, then
373 // compression won't be optimal. The alternative would be to block
374 // until our buffer is maximally full (or we see EOF), but this seems
375 // more surprising. In general, io::Read implementations should try to
376 // fill the caller's buffer as much as they can, so this seems like the
377 // better choice.
378 let nread = self.r.read(&mut self.src)?;
379 if nread == 0 {
380 return Ok(0);
381 }
382
383 // If we haven't yet written the stream header to `dst`, write it.
384 let mut dst_write_start = 0;
385 if !self.wrote_stream_ident {
386 dst[0..STREAM_IDENTIFIER.len()].copy_from_slice(STREAM_IDENTIFIER);
387 dst_write_start += STREAM_IDENTIFIER.len();
388 self.wrote_stream_ident = true;
389 }
390
391 // Reserve space for our chunk header. We need to use `split_at_mut` so
392 // that we can get two mutable slices pointing at non-overlapping parts
393 // of `dst`.
394 let (chunk_header, remaining_dst) =
395 dst[dst_write_start..].split_at_mut(CHUNK_HEADER_AND_CRC_SIZE);
396 dst_write_start += CHUNK_HEADER_AND_CRC_SIZE;
397
398 // Compress our frame if possible, telling `compress_frame` to always
399 // put the output in `dst`.
400 let frame_data = compress_frame(
401 &mut self.enc,
402 self.checksummer,
403 &self.src[..nread],
404 chunk_header,
405 remaining_dst,
406 true,
407 )?;
408 Ok(dst_write_start + frame_data.len())
409 }
410}
411
412impl<R: fmt::Debug + io::Read> fmt::Debug for FrameEncoder<R> {
413 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
414 f&mut DebugStruct<'_, '_>.debug_struct("FrameEncoder")
415 .field("inner", &self.inner)
416 .field("dst", &"[...]")
417 .field("dsts", &self.dsts)
418 .field(name:"dste", &self.dste)
419 .finish()
420 }
421}
422
423impl<R: fmt::Debug + io::Read> fmt::Debug for Inner<R> {
424 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
425 f&mut DebugStruct<'_, '_>.debug_struct("Inner")
426 .field("r", &self.r)
427 .field("enc", &self.enc)
428 .field("checksummer", &self.checksummer)
429 .field("src", &"[...]")
430 .field(name:"wrote_stream_ident", &self.wrote_stream_ident)
431 .finish()
432 }
433}
434
/// Like `Read::read_exact`, but an end of stream hit *before any byte of `buf`
/// was read* is a normal outcome instead of an error.
///
/// Returns `Ok(true)` once `buf` has been completely filled (trivially so when
/// `buf` is empty). Returns `Ok(false)` if the reader reported EOF on the very
/// first read. EOF after a partial fill is an `ErrorKind::UnexpectedEof`
/// error, produced by `read_exact`. Reads failing with
/// `ErrorKind::Interrupted` are retried, matching `read_exact`.
fn read_exact_eof<R: io::Read>(
    rdr: &mut R,
    buf: &mut [u8],
) -> io::Result<bool> {
    if buf.is_empty() {
        return Ok(true);
    }
    loop {
        match rdr.read(buf) {
            // Clean EOF: nothing of `buf` was read.
            Ok(0) => return Ok(false),
            // Filled in one call. `>=` also avoids a slicing panic if a
            // misbehaving reader claims more bytes than `buf` holds.
            Ok(n) if n >= buf.len() => return Ok(true),
            // Partial fill: defer the remainder to `read_exact`, which reports
            // a truncated stream as `UnexpectedEof`.
            Ok(n) => {
                rdr.read_exact(&mut buf[n..])?;
                return Ok(true);
            }
            // Interrupted before any data arrived; retry like `read_exact`.
            Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
            Err(e) => return Err(e),
        }
    }
}
456