1 | //! Writer-based compression/decompression streams |
2 | |
3 | use lzma_sys; |
4 | use std::io; |
5 | use std::io::prelude::*; |
6 | |
7 | #[cfg (feature = "tokio" )] |
8 | use futures::Poll; |
9 | #[cfg (feature = "tokio" )] |
10 | use tokio_io::{try_nb, AsyncRead, AsyncWrite}; |
11 | |
12 | use crate::stream::{Action, Check, Status, Stream}; |
13 | |
14 | /// A compression stream which will have uncompressed data written to it and |
15 | /// will write compressed data to an output stream. |
16 | pub struct XzEncoder<W: Write> { |
17 | data: Stream, |
18 | obj: Option<W>, |
19 | buf: Vec<u8>, |
20 | } |
21 | |
22 | /// A compression stream which will have compressed data written to it and |
23 | /// will write uncompressed data to an output stream. |
24 | pub struct XzDecoder<W: Write> { |
25 | data: Stream, |
26 | obj: Option<W>, |
27 | buf: Vec<u8>, |
28 | } |
29 | |
30 | impl<W: Write> XzEncoder<W> { |
31 | /// Create a new compression stream which will compress at the given level |
32 | /// to write compress output to the give output stream. |
33 | pub fn new(obj: W, level: u32) -> XzEncoder<W> { |
34 | let stream = Stream::new_easy_encoder(level, Check::Crc64).unwrap(); |
35 | XzEncoder::new_stream(obj, stream) |
36 | } |
37 | |
38 | /// Create a new encoder which will use the specified `Stream` to encode |
39 | /// (compress) data into the provided `obj`. |
40 | pub fn new_stream(obj: W, stream: Stream) -> XzEncoder<W> { |
41 | XzEncoder { |
42 | data: stream, |
43 | obj: Some(obj), |
44 | buf: Vec::with_capacity(32 * 1024), |
45 | } |
46 | } |
47 | |
48 | /// Acquires a reference to the underlying writer. |
49 | pub fn get_ref(&self) -> &W { |
50 | self.obj.as_ref().unwrap() |
51 | } |
52 | |
53 | /// Acquires a mutable reference to the underlying writer. |
54 | /// |
55 | /// Note that mutating the output/input state of the stream may corrupt this |
56 | /// object, so care must be taken when using this method. |
57 | pub fn get_mut(&mut self) -> &mut W { |
58 | self.obj.as_mut().unwrap() |
59 | } |
60 | |
61 | fn dump(&mut self) -> io::Result<()> { |
62 | while self.buf.len() > 0 { |
63 | let n = self.obj.as_mut().unwrap().write(&self.buf)?; |
64 | self.buf.drain(..n); |
65 | } |
66 | Ok(()) |
67 | } |
68 | |
69 | /// Attempt to finish this output stream, writing out final chunks of data. |
70 | /// |
71 | /// Note that this function can only be used once data has finished being |
72 | /// written to the output stream. After this function is called then further |
73 | /// calls to `write` may result in a panic. |
74 | /// |
75 | /// # Panics |
76 | /// |
77 | /// Attempts to write data to this stream may result in a panic after this |
78 | /// function is called. |
79 | pub fn try_finish(&mut self) -> io::Result<()> { |
80 | loop { |
81 | self.dump()?; |
82 | let res = self.data.process_vec(&[], &mut self.buf, Action::Finish)?; |
83 | if res == Status::StreamEnd { |
84 | break; |
85 | } |
86 | } |
87 | self.dump() |
88 | } |
89 | |
90 | /// Consumes this encoder, flushing the output stream. |
91 | /// |
92 | /// This will flush the underlying data stream and then return the contained |
93 | /// writer if the flush succeeded. |
94 | /// |
95 | /// Note that this function may not be suitable to call in a situation where |
96 | /// the underlying stream is an asynchronous I/O stream. To finish a stream |
97 | /// the `try_finish` (or `shutdown`) method should be used instead. To |
98 | /// re-acquire ownership of a stream it is safe to call this method after |
99 | /// `try_finish` or `shutdown` has returned `Ok`. |
100 | pub fn finish(mut self) -> io::Result<W> { |
101 | self.try_finish()?; |
102 | Ok(self.obj.take().unwrap()) |
103 | } |
104 | |
105 | /// Returns the number of bytes produced by the compressor |
106 | /// |
107 | /// Note that, due to buffering, this only bears any relation to |
108 | /// `total_in()` after a call to `flush()`. At that point, |
109 | /// `total_out() / total_in()` is the compression ratio. |
110 | pub fn total_out(&self) -> u64 { |
111 | self.data.total_out() |
112 | } |
113 | |
114 | /// Returns the number of bytes consumed by the compressor |
115 | /// (e.g. the number of bytes written to this stream.) |
116 | pub fn total_in(&self) -> u64 { |
117 | self.data.total_in() |
118 | } |
119 | } |
120 | |
121 | impl<W: Write> Write for XzEncoder<W> { |
122 | fn write(&mut self, data: &[u8]) -> io::Result<usize> { |
123 | loop { |
124 | self.dump()?; |
125 | |
126 | let total_in = self.total_in(); |
127 | self.data |
128 | .process_vec(data, &mut self.buf, Action::Run) |
129 | .unwrap(); |
130 | let written = (self.total_in() - total_in) as usize; |
131 | |
132 | if written > 0 || data.len() == 0 { |
133 | return Ok(written); |
134 | } |
135 | } |
136 | } |
137 | |
138 | fn flush(&mut self) -> io::Result<()> { |
139 | loop { |
140 | self.dump()?; |
141 | let status = self |
142 | .data |
143 | .process_vec(&[], &mut self.buf, Action::FullFlush) |
144 | .unwrap(); |
145 | if status == Status::StreamEnd { |
146 | break; |
147 | } |
148 | } |
149 | self.obj.as_mut().unwrap().flush() |
150 | } |
151 | } |
152 | |
#[cfg(feature = "tokio")]
impl<W> AsyncWrite for XzEncoder<W>
where
    W: AsyncWrite,
{
    /// Finishes the xz stream and then shuts down the wrapped writer.
    fn shutdown(&mut self) -> Poll<(), io::Error> {
        // Any early return while finishing is handled by `try_nb!`.
        try_nb!(self.try_finish());
        let inner = self.get_mut();
        inner.shutdown()
    }
}
160 | |
161 | impl<W: Read + Write> Read for XzEncoder<W> { |
162 | fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { |
163 | self.get_mut().read(buf) |
164 | } |
165 | } |
166 | |
// No methods are overridden: the trait's provided defaults are used as-is.
#[cfg(feature = "tokio")]
impl<W> AsyncRead for XzEncoder<W> where W: AsyncRead + AsyncWrite {}
169 | |
170 | impl<W: Write> Drop for XzEncoder<W> { |
171 | fn drop(&mut self) { |
172 | if self.obj.is_some() { |
173 | let _ = self.try_finish(); |
174 | } |
175 | } |
176 | } |
177 | |
178 | impl<W: Write> XzDecoder<W> { |
179 | /// Creates a new decoding stream which will decode into `obj` one xz stream |
180 | /// from the input written to it. |
181 | pub fn new(obj: W) -> XzDecoder<W> { |
182 | let stream = Stream::new_stream_decoder(u64::max_value(), 0).unwrap(); |
183 | XzDecoder::new_stream(obj, stream) |
184 | } |
185 | |
186 | /// Creates a new decoding stream which will decode into `obj` all the xz streams |
187 | /// from the input written to it. |
188 | pub fn new_multi_decoder(obj: W) -> XzDecoder<W> { |
189 | let stream = |
190 | Stream::new_stream_decoder(u64::max_value(), lzma_sys::LZMA_CONCATENATED).unwrap(); |
191 | XzDecoder::new_stream(obj, stream) |
192 | } |
193 | |
194 | /// Creates a new decoding stream which will decode all input written to it |
195 | /// into `obj`. |
196 | /// |
197 | /// A custom `stream` can be specified to configure what format this decoder |
198 | /// will recognize or configure other various decoding options. |
199 | pub fn new_stream(obj: W, stream: Stream) -> XzDecoder<W> { |
200 | XzDecoder { |
201 | data: stream, |
202 | obj: Some(obj), |
203 | buf: Vec::with_capacity(32 * 1024), |
204 | } |
205 | } |
206 | |
207 | /// Acquires a reference to the underlying writer. |
208 | pub fn get_ref(&self) -> &W { |
209 | self.obj.as_ref().unwrap() |
210 | } |
211 | |
212 | /// Acquires a mutable reference to the underlying writer. |
213 | /// |
214 | /// Note that mutating the output/input state of the stream may corrupt this |
215 | /// object, so care must be taken when using this method. |
216 | pub fn get_mut(&mut self) -> &mut W { |
217 | self.obj.as_mut().unwrap() |
218 | } |
219 | |
220 | fn dump(&mut self) -> io::Result<()> { |
221 | if self.buf.len() > 0 { |
222 | self.obj.as_mut().unwrap().write_all(&self.buf)?; |
223 | self.buf.truncate(0); |
224 | } |
225 | Ok(()) |
226 | } |
227 | |
228 | fn try_finish(&mut self) -> io::Result<()> { |
229 | loop { |
230 | self.dump()?; |
231 | let res = self.data.process_vec(&[], &mut self.buf, Action::Finish)?; |
232 | |
233 | // When decoding a truncated file, XZ returns LZMA_BUF_ERROR and |
234 | // decodes no new data, which corresponds to this crate's MemNeeded |
235 | // status. Since we're finishing, we cannot provide more data so |
236 | // this is an error. |
237 | // |
238 | // See the 02_decompress.c example in xz-utils. |
239 | if self.buf.is_empty() && res == Status::MemNeeded { |
240 | let msg = "xz compressed stream is truncated or otherwise corrupt" ; |
241 | return Err(io::Error::new(io::ErrorKind::UnexpectedEof, msg)); |
242 | } |
243 | |
244 | if res == Status::StreamEnd { |
245 | break; |
246 | } |
247 | } |
248 | self.dump() |
249 | } |
250 | |
251 | /// Unwrap the underlying writer, finishing the compression stream. |
252 | pub fn finish(&mut self) -> io::Result<W> { |
253 | self.try_finish()?; |
254 | Ok(self.obj.take().unwrap()) |
255 | } |
256 | |
257 | /// Returns the number of bytes produced by the decompressor |
258 | /// |
259 | /// Note that, due to buffering, this only bears any relation to |
260 | /// `total_in()` after a call to `flush()`. At that point, |
261 | /// `total_in() / total_out()` is the compression ratio. |
262 | pub fn total_out(&self) -> u64 { |
263 | self.data.total_out() |
264 | } |
265 | |
266 | /// Returns the number of bytes consumed by the decompressor |
267 | /// (e.g. the number of bytes written to this stream.) |
268 | pub fn total_in(&self) -> u64 { |
269 | self.data.total_in() |
270 | } |
271 | } |
272 | |
273 | impl<W: Write> Write for XzDecoder<W> { |
274 | fn write(&mut self, data: &[u8]) -> io::Result<usize> { |
275 | loop { |
276 | self.dump()?; |
277 | |
278 | let before: u64 = self.total_in(); |
279 | let res: Status = self.data.process_vec(input:data, &mut self.buf, Action::Run)?; |
280 | let written: usize = (self.total_in() - before) as usize; |
281 | |
282 | if written > 0 || data.len() == 0 || res == Status::StreamEnd { |
283 | return Ok(written); |
284 | } |
285 | } |
286 | } |
287 | |
288 | fn flush(&mut self) -> io::Result<()> { |
289 | self.dump()?; |
290 | self.obj.as_mut().unwrap().flush() |
291 | } |
292 | } |
293 | |
#[cfg(feature = "tokio")]
impl<W> AsyncWrite for XzDecoder<W>
where
    W: AsyncWrite,
{
    /// Finishes the xz stream and then shuts down the wrapped writer.
    fn shutdown(&mut self) -> Poll<(), io::Error> {
        // Any early return while finishing is handled by `try_nb!`.
        try_nb!(self.try_finish());
        let inner = self.get_mut();
        inner.shutdown()
    }
}
301 | |
302 | impl<W: Read + Write> Read for XzDecoder<W> { |
303 | fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { |
304 | self.get_mut().read(buf) |
305 | } |
306 | } |
307 | |
// No methods are overridden: the trait's provided defaults are used as-is.
#[cfg(feature = "tokio")]
impl<W> AsyncRead for XzDecoder<W> where W: AsyncRead + AsyncWrite {}
310 | |
311 | impl<W: Write> Drop for XzDecoder<W> { |
312 | fn drop(&mut self) { |
313 | if self.obj.is_some() { |
314 | let _ = self.try_finish(); |
315 | } |
316 | } |
317 | } |
318 | |
#[cfg(test)]
mod tests {
    use super::{XzDecoder, XzEncoder};
    use std::io::prelude::*;
    use std::iter::repeat;

    /// Round-trips a large, repetitive payload through an encoder that feeds
    /// directly into a decoder.
    #[test]
    fn smoke() {
        let d = XzDecoder::new(Vec::new());
        let mut c = XzEncoder::new(d, 6);
        c.write_all(b"12834").unwrap();
        let s = repeat("12345").take(100000).collect::<String>();
        c.write_all(s.as_bytes()).unwrap();
        let data = c.finish().unwrap().finish().unwrap();
        assert_eq!(&data[0..5], b"12834");
        assert_eq!(data.len(), 500005);
        // The expected bytes are the prefix immediately followed by the
        // repeated payload, with no separator in between.
        assert!(format!("12834{}", s).as_bytes() == &*data);
    }

    /// An empty write must still produce a stream that decodes to nothing.
    #[test]
    fn write_empty() {
        let d = XzDecoder::new(Vec::new());
        let mut c = XzEncoder::new(d, 6);
        c.write(b"").unwrap();
        let data = c.finish().unwrap().finish().unwrap();
        assert_eq!(&data[..], b"");
    }

    /// Property test: any byte vector survives an encode/decode round trip.
    #[test]
    fn qc() {
        ::quickcheck::quickcheck(test as fn(_) -> _);

        fn test(v: Vec<u8>) -> bool {
            let w = XzDecoder::new(Vec::new());
            let mut w = XzEncoder::new(w, 6);
            w.write_all(&v).unwrap();
            v == w.finish().unwrap().finish().unwrap()
        }
    }
}
359 | |