1//! Writer-based compression/decompression streams
2
3use lzma_sys;
4use std::io;
5use std::io::prelude::*;
6
7#[cfg(feature = "tokio")]
8use futures::Poll;
9#[cfg(feature = "tokio")]
10use tokio_io::{try_nb, AsyncRead, AsyncWrite};
11
12use crate::stream::{Action, Check, Status, Stream};
13
/// A compression stream which will have uncompressed data written to it and
/// will write compressed data to an output stream.
pub struct XzEncoder<W: Write> {
    /// Encoder state that turns uncompressed input into xz output.
    data: Stream,
    /// Destination for compressed bytes; becomes `None` once `finish` has
    /// handed the writer back to the caller.
    obj: Option<W>,
    /// Compressed bytes produced by `data` that have not yet been written to
    /// `obj`.
    buf: Vec<u8>,
}
21
/// A compression stream which will have compressed data written to it and
/// will write uncompressed data to an output stream.
pub struct XzDecoder<W: Write> {
    /// Decoder state that turns xz input into uncompressed output.
    data: Stream,
    /// Destination for decompressed bytes; becomes `None` once `finish` has
    /// taken the writer back.
    obj: Option<W>,
    /// Decompressed bytes produced by `data` that have not yet been written
    /// to `obj`.
    buf: Vec<u8>,
}
29
30impl<W: Write> XzEncoder<W> {
31 /// Create a new compression stream which will compress at the given level
32 /// to write compress output to the give output stream.
33 pub fn new(obj: W, level: u32) -> XzEncoder<W> {
34 let stream = Stream::new_easy_encoder(level, Check::Crc64).unwrap();
35 XzEncoder::new_stream(obj, stream)
36 }
37
38 /// Create a new encoder which will use the specified `Stream` to encode
39 /// (compress) data into the provided `obj`.
40 pub fn new_stream(obj: W, stream: Stream) -> XzEncoder<W> {
41 XzEncoder {
42 data: stream,
43 obj: Some(obj),
44 buf: Vec::with_capacity(32 * 1024),
45 }
46 }
47
48 /// Acquires a reference to the underlying writer.
49 pub fn get_ref(&self) -> &W {
50 self.obj.as_ref().unwrap()
51 }
52
53 /// Acquires a mutable reference to the underlying writer.
54 ///
55 /// Note that mutating the output/input state of the stream may corrupt this
56 /// object, so care must be taken when using this method.
57 pub fn get_mut(&mut self) -> &mut W {
58 self.obj.as_mut().unwrap()
59 }
60
61 fn dump(&mut self) -> io::Result<()> {
62 while self.buf.len() > 0 {
63 let n = self.obj.as_mut().unwrap().write(&self.buf)?;
64 self.buf.drain(..n);
65 }
66 Ok(())
67 }
68
69 /// Attempt to finish this output stream, writing out final chunks of data.
70 ///
71 /// Note that this function can only be used once data has finished being
72 /// written to the output stream. After this function is called then further
73 /// calls to `write` may result in a panic.
74 ///
75 /// # Panics
76 ///
77 /// Attempts to write data to this stream may result in a panic after this
78 /// function is called.
79 pub fn try_finish(&mut self) -> io::Result<()> {
80 loop {
81 self.dump()?;
82 let res = self.data.process_vec(&[], &mut self.buf, Action::Finish)?;
83 if res == Status::StreamEnd {
84 break;
85 }
86 }
87 self.dump()
88 }
89
90 /// Consumes this encoder, flushing the output stream.
91 ///
92 /// This will flush the underlying data stream and then return the contained
93 /// writer if the flush succeeded.
94 ///
95 /// Note that this function may not be suitable to call in a situation where
96 /// the underlying stream is an asynchronous I/O stream. To finish a stream
97 /// the `try_finish` (or `shutdown`) method should be used instead. To
98 /// re-acquire ownership of a stream it is safe to call this method after
99 /// `try_finish` or `shutdown` has returned `Ok`.
100 pub fn finish(mut self) -> io::Result<W> {
101 self.try_finish()?;
102 Ok(self.obj.take().unwrap())
103 }
104
105 /// Returns the number of bytes produced by the compressor
106 ///
107 /// Note that, due to buffering, this only bears any relation to
108 /// `total_in()` after a call to `flush()`. At that point,
109 /// `total_out() / total_in()` is the compression ratio.
110 pub fn total_out(&self) -> u64 {
111 self.data.total_out()
112 }
113
114 /// Returns the number of bytes consumed by the compressor
115 /// (e.g. the number of bytes written to this stream.)
116 pub fn total_in(&self) -> u64 {
117 self.data.total_in()
118 }
119}
120
121impl<W: Write> Write for XzEncoder<W> {
122 fn write(&mut self, data: &[u8]) -> io::Result<usize> {
123 loop {
124 self.dump()?;
125
126 let total_in = self.total_in();
127 self.data
128 .process_vec(data, &mut self.buf, Action::Run)
129 .unwrap();
130 let written = (self.total_in() - total_in) as usize;
131
132 if written > 0 || data.len() == 0 {
133 return Ok(written);
134 }
135 }
136 }
137
138 fn flush(&mut self) -> io::Result<()> {
139 loop {
140 self.dump()?;
141 let status = self
142 .data
143 .process_vec(&[], &mut self.buf, Action::FullFlush)
144 .unwrap();
145 if status == Status::StreamEnd {
146 break;
147 }
148 }
149 self.obj.as_mut().unwrap().flush()
150 }
151}
152
#[cfg(feature = "tokio")]
impl<W: AsyncWrite> AsyncWrite for XzEncoder<W> {
    /// Finishes the xz stream, then shuts down the wrapped writer.
    ///
    /// The result of `try_finish` goes through `try_nb!`, so a `WouldBlock`
    /// from the wrapped writer is reported as not-ready instead of as an
    /// error.
    fn shutdown(&mut self) -> Poll<(), io::Error> {
        try_nb!(self.try_finish());
        let inner = self.obj.as_mut().unwrap();
        inner.shutdown()
    }
}
160
161impl<W: Read + Write> Read for XzEncoder<W> {
162 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
163 self.get_mut().read(buf)
164 }
165}
166
// Marker impl: reads pass through to the wrapped stream via the `Read` impl.
#[cfg(feature = "tokio")]
impl<W> AsyncRead for XzEncoder<W>
where
    W: AsyncRead + AsyncWrite,
{
}
169
170impl<W: Write> Drop for XzEncoder<W> {
171 fn drop(&mut self) {
172 if self.obj.is_some() {
173 let _ = self.try_finish();
174 }
175 }
176}
177
178impl<W: Write> XzDecoder<W> {
179 /// Creates a new decoding stream which will decode into `obj` one xz stream
180 /// from the input written to it.
181 pub fn new(obj: W) -> XzDecoder<W> {
182 let stream = Stream::new_stream_decoder(u64::max_value(), 0).unwrap();
183 XzDecoder::new_stream(obj, stream)
184 }
185
186 /// Creates a new decoding stream which will decode into `obj` all the xz streams
187 /// from the input written to it.
188 pub fn new_multi_decoder(obj: W) -> XzDecoder<W> {
189 let stream =
190 Stream::new_stream_decoder(u64::max_value(), lzma_sys::LZMA_CONCATENATED).unwrap();
191 XzDecoder::new_stream(obj, stream)
192 }
193
194 /// Creates a new decoding stream which will decode all input written to it
195 /// into `obj`.
196 ///
197 /// A custom `stream` can be specified to configure what format this decoder
198 /// will recognize or configure other various decoding options.
199 pub fn new_stream(obj: W, stream: Stream) -> XzDecoder<W> {
200 XzDecoder {
201 data: stream,
202 obj: Some(obj),
203 buf: Vec::with_capacity(32 * 1024),
204 }
205 }
206
207 /// Acquires a reference to the underlying writer.
208 pub fn get_ref(&self) -> &W {
209 self.obj.as_ref().unwrap()
210 }
211
212 /// Acquires a mutable reference to the underlying writer.
213 ///
214 /// Note that mutating the output/input state of the stream may corrupt this
215 /// object, so care must be taken when using this method.
216 pub fn get_mut(&mut self) -> &mut W {
217 self.obj.as_mut().unwrap()
218 }
219
220 fn dump(&mut self) -> io::Result<()> {
221 if self.buf.len() > 0 {
222 self.obj.as_mut().unwrap().write_all(&self.buf)?;
223 self.buf.truncate(0);
224 }
225 Ok(())
226 }
227
228 fn try_finish(&mut self) -> io::Result<()> {
229 loop {
230 self.dump()?;
231 let res = self.data.process_vec(&[], &mut self.buf, Action::Finish)?;
232
233 // When decoding a truncated file, XZ returns LZMA_BUF_ERROR and
234 // decodes no new data, which corresponds to this crate's MemNeeded
235 // status. Since we're finishing, we cannot provide more data so
236 // this is an error.
237 //
238 // See the 02_decompress.c example in xz-utils.
239 if self.buf.is_empty() && res == Status::MemNeeded {
240 let msg = "xz compressed stream is truncated or otherwise corrupt";
241 return Err(io::Error::new(io::ErrorKind::UnexpectedEof, msg));
242 }
243
244 if res == Status::StreamEnd {
245 break;
246 }
247 }
248 self.dump()
249 }
250
251 /// Unwrap the underlying writer, finishing the compression stream.
252 pub fn finish(&mut self) -> io::Result<W> {
253 self.try_finish()?;
254 Ok(self.obj.take().unwrap())
255 }
256
257 /// Returns the number of bytes produced by the decompressor
258 ///
259 /// Note that, due to buffering, this only bears any relation to
260 /// `total_in()` after a call to `flush()`. At that point,
261 /// `total_in() / total_out()` is the compression ratio.
262 pub fn total_out(&self) -> u64 {
263 self.data.total_out()
264 }
265
266 /// Returns the number of bytes consumed by the decompressor
267 /// (e.g. the number of bytes written to this stream.)
268 pub fn total_in(&self) -> u64 {
269 self.data.total_in()
270 }
271}
272
273impl<W: Write> Write for XzDecoder<W> {
274 fn write(&mut self, data: &[u8]) -> io::Result<usize> {
275 loop {
276 self.dump()?;
277
278 let before: u64 = self.total_in();
279 let res: Status = self.data.process_vec(input:data, &mut self.buf, Action::Run)?;
280 let written: usize = (self.total_in() - before) as usize;
281
282 if written > 0 || data.len() == 0 || res == Status::StreamEnd {
283 return Ok(written);
284 }
285 }
286 }
287
288 fn flush(&mut self) -> io::Result<()> {
289 self.dump()?;
290 self.obj.as_mut().unwrap().flush()
291 }
292}
293
#[cfg(feature = "tokio")]
impl<W: AsyncWrite> AsyncWrite for XzDecoder<W> {
    /// Finishes decoding, then shuts down the wrapped writer.
    ///
    /// The result of `try_finish` goes through `try_nb!`, so a `WouldBlock`
    /// from the wrapped writer is reported as not-ready instead of as an
    /// error.
    fn shutdown(&mut self) -> Poll<(), io::Error> {
        try_nb!(self.try_finish());
        let inner = self.obj.as_mut().unwrap();
        inner.shutdown()
    }
}
301
302impl<W: Read + Write> Read for XzDecoder<W> {
303 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
304 self.get_mut().read(buf)
305 }
306}
307
// Marker impl: reads pass through to the wrapped stream via the `Read` impl.
#[cfg(feature = "tokio")]
impl<W> AsyncRead for XzDecoder<W>
where
    W: AsyncRead + AsyncWrite,
{
}
310
311impl<W: Write> Drop for XzDecoder<W> {
312 fn drop(&mut self) {
313 if self.obj.is_some() {
314 let _ = self.try_finish();
315 }
316 }
317}
318
#[cfg(test)]
mod tests {
    use super::{XzDecoder, XzEncoder};
    use std::io::prelude::*;

    /// Builds an encoder (level 6) whose compressed output is fed straight
    /// into a decoder that collects the round-tripped bytes in a `Vec`.
    fn round_trip_pipeline() -> XzEncoder<XzDecoder<Vec<u8>>> {
        XzEncoder::new(XzDecoder::new(Vec::new()), 6)
    }

    #[test]
    fn smoke() {
        let mut pipeline = round_trip_pipeline();
        pipeline.write_all(b"12834").unwrap();
        let body = "12345".repeat(100000);
        pipeline.write_all(body.as_bytes()).unwrap();

        let decoded = pipeline.finish().unwrap().finish().unwrap();
        assert_eq!(&decoded[0..5], b"12834");
        assert_eq!(decoded.len(), 500005);
        assert!(format!("12834{}", body).as_bytes() == &*decoded);
    }

    #[test]
    fn write_empty() {
        let mut pipeline = round_trip_pipeline();
        pipeline.write(b"").unwrap();

        let decoded = pipeline.finish().unwrap().finish().unwrap();
        assert_eq!(&decoded[..], b"");
    }

    #[test]
    fn qc() {
        ::quickcheck::quickcheck(round_trips as fn(_) -> _);

        fn round_trips(input: Vec<u8>) -> bool {
            let mut pipeline = round_trip_pipeline();
            pipeline.write_all(&input).unwrap();
            input == pipeline.finish().unwrap().finish().unwrap()
        }
    }
}
359