1/*!
2This module provides a `std::io::Write` implementation:
3
- `write::FrameEncoder` wraps another `std::io::Write` implementation and
  compresses the data written to it using the Snappy frame format. Use this if
  you have an uncompressed data source and wish to write it as compressed data.
7
8It would also be possible to provide a `write::FrameDecoder`, which decompresses
9data as it writes it, but it hasn't been implemented yet.
10*/
11
12use std::fmt;
13use std::io::{self, Write};
14
15use crate::compress::Encoder;
16use crate::crc32::CheckSummer;
17pub use crate::error::IntoInnerError;
18use crate::frame::{
19 compress_frame, CHUNK_HEADER_AND_CRC_SIZE, MAX_COMPRESS_BLOCK_SIZE,
20 STREAM_IDENTIFIER,
21};
22use crate::MAX_BLOCK_SIZE;
23
24/// A writer for compressing a Snappy stream.
25///
26/// This `FrameEncoder` wraps any other writer that implements `io::Write`.
27/// Bytes written to this writer are compressed using the [Snappy frame
28/// format](https://github.com/google/snappy/blob/master/framing_format.txt)
29/// (file extension `sz`, MIME type `application/x-snappy-framed`).
30///
31/// Writes are buffered automatically, so there's no need to wrap the given
32/// writer in a `std::io::BufWriter`.
33///
34/// The writer will be flushed automatically when it is dropped. If an error
35/// occurs, it is ignored.
36pub struct FrameEncoder<W: io::Write> {
37 /// Our main internal state, split out for borrowck reasons (happily paid).
38 ///
39 /// Also, it's an `Option` so we can move out of it even though
40 /// `FrameEncoder` impls `Drop`.
41 inner: Option<Inner<W>>,
42 /// Our buffer of uncompressed bytes. This isn't part of `inner` because
43 /// we may write bytes directly from the caller if the given buffer was
44 /// big enough. As a result, the main `write` implementation needs to
45 /// accept either the internal buffer or the caller's bytes directly. Since
46 /// `write` requires a mutable borrow, we satisfy the borrow checker by
47 /// separating `src` from the rest of the state.
48 src: Vec<u8>,
49}
50
51struct Inner<W> {
52 /// The underlying writer.
53 w: W,
54 /// An encoder that we reuse that does the actual block based compression.
55 enc: Encoder,
56 /// A CRC32 checksummer that is configured to either use the portable
57 /// fallback version or the SSE4.2 accelerated version when the right CPU
58 /// features are available.
59 checksummer: CheckSummer,
60 /// The compressed bytes buffer. Bytes are compressed from src (usually)
61 /// to dst before being written to w.
62 dst: Vec<u8>,
63 /// When false, the stream identifier (with magic bytes) must precede the
64 /// next write.
65 wrote_stream_ident: bool,
66 /// Space for writing the header of a chunk before writing it to the
67 /// underlying writer.
68 chunk_header: [u8; 8],
69}
70
71impl<W: io::Write> FrameEncoder<W> {
72 /// Create a new writer for streaming Snappy compression.
73 pub fn new(wtr: W) -> FrameEncoder<W> {
74 FrameEncoder {
75 inner: Some(Inner {
76 w: wtr,
77 enc: Encoder::new(),
78 checksummer: CheckSummer::new(),
79 dst: vec![0; MAX_COMPRESS_BLOCK_SIZE],
80 wrote_stream_ident: false,
81 chunk_header: [0; CHUNK_HEADER_AND_CRC_SIZE],
82 }),
83 src: Vec::with_capacity(MAX_BLOCK_SIZE),
84 }
85 }
86
87 /// Returns the underlying stream, consuming and flushing this writer.
88 ///
89 /// If flushing the writer caused an error, then an `IntoInnerError` is
90 /// returned, which contains both the writer and the original writer.
91 pub fn into_inner(mut self) -> Result<W, IntoInnerError<FrameEncoder<W>>> {
92 match self.flush() {
93 Ok(()) => Ok(self.inner.take().unwrap().w),
94 Err(err) => Err(IntoInnerError::new(self, err)),
95 }
96 }
97
98 /// Gets a reference to the underlying writer in this encoder.
99 pub fn get_ref(&self) -> &W {
100 &self.inner.as_ref().unwrap().w
101 }
102
103 /// Gets a reference to the underlying writer in this encoder.
104 ///
105 /// Note that mutating the output/input state of the stream may corrupt
106 /// this encoder, so care must be taken when using this method.
107 pub fn get_mut(&mut self) -> &mut W {
108 &mut self.inner.as_mut().unwrap().w
109 }
110}
111
112impl<W: io::Write> Drop for FrameEncoder<W> {
113 fn drop(&mut self) {
114 if self.inner.is_some() {
115 // Ignore errors because we can't conceivably return an error and
116 // panicing in a dtor is bad juju.
117 let _ = self.flush();
118 }
119 }
120}
121
122impl<W: io::Write> io::Write for FrameEncoder<W> {
123 fn write(&mut self, mut buf: &[u8]) -> io::Result<usize> {
124 let mut total = 0;
125 // If there isn't enough room to add buf to src, then add only a piece
126 // of it, flush it and mush on.
127 loop {
128 let free = self.src.capacity() - self.src.len();
129 // n is the number of bytes extracted from buf.
130 let n = if buf.len() <= free {
131 break;
132 } else if self.src.is_empty() {
133 // If buf is bigger than our entire buffer then avoid
134 // the indirection and write the buffer directly.
135 self.inner.as_mut().unwrap().write(buf)?
136 } else {
137 self.src.extend_from_slice(&buf[0..free]);
138 self.flush()?;
139 free
140 };
141 buf = &buf[n..];
142 total += n;
143 }
144 // We're only here if buf.len() will fit within the available space of
145 // self.src.
146 debug_assert!(buf.len() <= (self.src.capacity() - self.src.len()));
147 self.src.extend_from_slice(buf);
148 total += buf.len();
149 // We should never expand or contract self.src.
150 debug_assert!(self.src.capacity() == MAX_BLOCK_SIZE);
151 Ok(total)
152 }
153
154 fn flush(&mut self) -> io::Result<()> {
155 if self.src.is_empty() {
156 return Ok(());
157 }
158 self.inner.as_mut().unwrap().write(&self.src)?;
159 self.src.truncate(0);
160 Ok(())
161 }
162}
163
164impl<W: io::Write> Inner<W> {
165 fn write(&mut self, mut buf: &[u8]) -> io::Result<usize> {
166 let mut total = 0;
167 if !self.wrote_stream_ident {
168 self.wrote_stream_ident = true;
169 self.w.write_all(STREAM_IDENTIFIER)?;
170 }
171 while !buf.is_empty() {
172 // Advance buf and get our block.
173 let mut src = buf;
174 if src.len() > MAX_BLOCK_SIZE {
175 src = &src[0..MAX_BLOCK_SIZE];
176 }
177 buf = &buf[src.len()..];
178
179 let frame_data = compress_frame(
180 &mut self.enc,
181 self.checksummer,
182 src,
183 &mut self.chunk_header,
184 &mut self.dst,
185 false,
186 )?;
187 self.w.write_all(&self.chunk_header)?;
188 self.w.write_all(frame_data)?;
189 total += src.len();
190 }
191 Ok(total)
192 }
193}
194
195impl<W: fmt::Debug + io::Write> fmt::Debug for FrameEncoder<W> {
196 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
197 f&mut DebugStruct<'_, '_>.debug_struct("FrameEncoder")
198 .field("inner", &self.inner)
199 .field(name:"src", &"[...]")
200 .finish()
201 }
202}
203
204impl<W: fmt::Debug + io::Write> fmt::Debug for Inner<W> {
205 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
206 f&mut DebugStruct<'_, '_>.debug_struct("Inner")
207 .field("w", &self.w)
208 .field("enc", &self.enc)
209 .field("checksummer", &self.checksummer)
210 .field("dst", &"[...]")
211 .field("wrote_stream_ident", &self.wrote_stream_ident)
212 .field(name:"chunk_header", &self.chunk_header)
213 .finish()
214 }
215}
216