/*!
This module provides a `std::io::Write` implementation:

- `write::FrameEncoder` wraps another `std::io::Write` implementation, and
  compresses the data written to it using the Snappy frame format. Use this if
  you have an uncompressed data source and wish to write it as compressed data.

It would also be possible to provide a `write::FrameDecoder`, which decompresses
data as it writes it, but it hasn't been implemented yet.
*/
11 | |
12 | use std::fmt; |
13 | use std::io::{self, Write}; |
14 | |
15 | use crate::compress::Encoder; |
16 | use crate::crc32::CheckSummer; |
17 | pub use crate::error::IntoInnerError; |
18 | use crate::frame::{ |
19 | compress_frame, CHUNK_HEADER_AND_CRC_SIZE, MAX_COMPRESS_BLOCK_SIZE, |
20 | STREAM_IDENTIFIER, |
21 | }; |
22 | use crate::MAX_BLOCK_SIZE; |
23 | |
24 | /// A writer for compressing a Snappy stream. |
25 | /// |
26 | /// This `FrameEncoder` wraps any other writer that implements `io::Write`. |
27 | /// Bytes written to this writer are compressed using the [Snappy frame |
28 | /// format](https://github.com/google/snappy/blob/master/framing_format.txt) |
29 | /// (file extension `sz`, MIME type `application/x-snappy-framed`). |
30 | /// |
31 | /// Writes are buffered automatically, so there's no need to wrap the given |
32 | /// writer in a `std::io::BufWriter`. |
33 | /// |
34 | /// The writer will be flushed automatically when it is dropped. If an error |
35 | /// occurs, it is ignored. |
36 | pub struct FrameEncoder<W: io::Write> { |
37 | /// Our main internal state, split out for borrowck reasons (happily paid). |
38 | /// |
39 | /// Also, it's an `Option` so we can move out of it even though |
40 | /// `FrameEncoder` impls `Drop`. |
41 | inner: Option<Inner<W>>, |
42 | /// Our buffer of uncompressed bytes. This isn't part of `inner` because |
43 | /// we may write bytes directly from the caller if the given buffer was |
44 | /// big enough. As a result, the main `write` implementation needs to |
45 | /// accept either the internal buffer or the caller's bytes directly. Since |
46 | /// `write` requires a mutable borrow, we satisfy the borrow checker by |
47 | /// separating `src` from the rest of the state. |
48 | src: Vec<u8>, |
49 | } |
50 | |
51 | struct Inner<W> { |
52 | /// The underlying writer. |
53 | w: W, |
54 | /// An encoder that we reuse that does the actual block based compression. |
55 | enc: Encoder, |
56 | /// A CRC32 checksummer that is configured to either use the portable |
57 | /// fallback version or the SSE4.2 accelerated version when the right CPU |
58 | /// features are available. |
59 | checksummer: CheckSummer, |
60 | /// The compressed bytes buffer. Bytes are compressed from src (usually) |
61 | /// to dst before being written to w. |
62 | dst: Vec<u8>, |
63 | /// When false, the stream identifier (with magic bytes) must precede the |
64 | /// next write. |
65 | wrote_stream_ident: bool, |
66 | /// Space for writing the header of a chunk before writing it to the |
67 | /// underlying writer. |
68 | chunk_header: [u8; 8], |
69 | } |
70 | |
71 | impl<W: io::Write> FrameEncoder<W> { |
72 | /// Create a new writer for streaming Snappy compression. |
73 | pub fn new(wtr: W) -> FrameEncoder<W> { |
74 | FrameEncoder { |
75 | inner: Some(Inner { |
76 | w: wtr, |
77 | enc: Encoder::new(), |
78 | checksummer: CheckSummer::new(), |
79 | dst: vec![0; MAX_COMPRESS_BLOCK_SIZE], |
80 | wrote_stream_ident: false, |
81 | chunk_header: [0; CHUNK_HEADER_AND_CRC_SIZE], |
82 | }), |
83 | src: Vec::with_capacity(MAX_BLOCK_SIZE), |
84 | } |
85 | } |
86 | |
87 | /// Returns the underlying stream, consuming and flushing this writer. |
88 | /// |
89 | /// If flushing the writer caused an error, then an `IntoInnerError` is |
90 | /// returned, which contains both the writer and the original writer. |
91 | pub fn into_inner(mut self) -> Result<W, IntoInnerError<FrameEncoder<W>>> { |
92 | match self.flush() { |
93 | Ok(()) => Ok(self.inner.take().unwrap().w), |
94 | Err(err) => Err(IntoInnerError::new(self, err)), |
95 | } |
96 | } |
97 | |
98 | /// Gets a reference to the underlying writer in this encoder. |
99 | pub fn get_ref(&self) -> &W { |
100 | &self.inner.as_ref().unwrap().w |
101 | } |
102 | |
103 | /// Gets a reference to the underlying writer in this encoder. |
104 | /// |
105 | /// Note that mutating the output/input state of the stream may corrupt |
106 | /// this encoder, so care must be taken when using this method. |
107 | pub fn get_mut(&mut self) -> &mut W { |
108 | &mut self.inner.as_mut().unwrap().w |
109 | } |
110 | } |
111 | |
112 | impl<W: io::Write> Drop for FrameEncoder<W> { |
113 | fn drop(&mut self) { |
114 | if self.inner.is_some() { |
115 | // Ignore errors because we can't conceivably return an error and |
116 | // panicing in a dtor is bad juju. |
117 | let _ = self.flush(); |
118 | } |
119 | } |
120 | } |
121 | |
122 | impl<W: io::Write> io::Write for FrameEncoder<W> { |
123 | fn write(&mut self, mut buf: &[u8]) -> io::Result<usize> { |
124 | let mut total = 0; |
125 | // If there isn't enough room to add buf to src, then add only a piece |
126 | // of it, flush it and mush on. |
127 | loop { |
128 | let free = self.src.capacity() - self.src.len(); |
129 | // n is the number of bytes extracted from buf. |
130 | let n = if buf.len() <= free { |
131 | break; |
132 | } else if self.src.is_empty() { |
133 | // If buf is bigger than our entire buffer then avoid |
134 | // the indirection and write the buffer directly. |
135 | self.inner.as_mut().unwrap().write(buf)? |
136 | } else { |
137 | self.src.extend_from_slice(&buf[0..free]); |
138 | self.flush()?; |
139 | free |
140 | }; |
141 | buf = &buf[n..]; |
142 | total += n; |
143 | } |
144 | // We're only here if buf.len() will fit within the available space of |
145 | // self.src. |
146 | debug_assert!(buf.len() <= (self.src.capacity() - self.src.len())); |
147 | self.src.extend_from_slice(buf); |
148 | total += buf.len(); |
149 | // We should never expand or contract self.src. |
150 | debug_assert!(self.src.capacity() == MAX_BLOCK_SIZE); |
151 | Ok(total) |
152 | } |
153 | |
154 | fn flush(&mut self) -> io::Result<()> { |
155 | if self.src.is_empty() { |
156 | return Ok(()); |
157 | } |
158 | self.inner.as_mut().unwrap().write(&self.src)?; |
159 | self.src.truncate(0); |
160 | Ok(()) |
161 | } |
162 | } |
163 | |
164 | impl<W: io::Write> Inner<W> { |
165 | fn write(&mut self, mut buf: &[u8]) -> io::Result<usize> { |
166 | let mut total = 0; |
167 | if !self.wrote_stream_ident { |
168 | self.wrote_stream_ident = true; |
169 | self.w.write_all(STREAM_IDENTIFIER)?; |
170 | } |
171 | while !buf.is_empty() { |
172 | // Advance buf and get our block. |
173 | let mut src = buf; |
174 | if src.len() > MAX_BLOCK_SIZE { |
175 | src = &src[0..MAX_BLOCK_SIZE]; |
176 | } |
177 | buf = &buf[src.len()..]; |
178 | |
179 | let frame_data = compress_frame( |
180 | &mut self.enc, |
181 | self.checksummer, |
182 | src, |
183 | &mut self.chunk_header, |
184 | &mut self.dst, |
185 | false, |
186 | )?; |
187 | self.w.write_all(&self.chunk_header)?; |
188 | self.w.write_all(frame_data)?; |
189 | total += src.len(); |
190 | } |
191 | Ok(total) |
192 | } |
193 | } |
194 | |
195 | impl<W: fmt::Debug + io::Write> fmt::Debug for FrameEncoder<W> { |
196 | fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |
197 | f&mut DebugStruct<'_, '_>.debug_struct("FrameEncoder" ) |
198 | .field("inner" , &self.inner) |
199 | .field(name:"src" , &"[...]" ) |
200 | .finish() |
201 | } |
202 | } |
203 | |
204 | impl<W: fmt::Debug + io::Write> fmt::Debug for Inner<W> { |
205 | fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |
206 | f&mut DebugStruct<'_, '_>.debug_struct("Inner" ) |
207 | .field("w" , &self.w) |
208 | .field("enc" , &self.enc) |
209 | .field("checksummer" , &self.checksummer) |
210 | .field("dst" , &"[...]" ) |
211 | .field("wrote_stream_ident" , &self.wrote_stream_ident) |
212 | .field(name:"chunk_header" , &self.chunk_header) |
213 | .finish() |
214 | } |
215 | } |
216 | |