1//! Reader-based compression/decompression streams
2
3use std::io::prelude::*;
4use std::io::{self, BufReader};
5
6#[cfg(feature = "tokio")]
7use futures::Poll;
8#[cfg(feature = "tokio")]
9use tokio_io::{AsyncRead, AsyncWrite};
10
11use crate::bufread;
12use crate::stream::Stream;
13
14/// A compression stream which wraps an uncompressed stream of data. Compressed
15/// data will be read from the stream.
16pub struct XzEncoder<R: Read> {
17 inner: bufread::XzEncoder<BufReader<R>>,
18}
19
20/// A decompression stream which wraps a compressed stream of data. Decompressed
21/// data will be read from the stream.
22pub struct XzDecoder<R: Read> {
23 inner: bufread::XzDecoder<BufReader<R>>,
24}
25
26impl<R: Read> XzEncoder<R> {
27 /// Create a new compression stream which will compress at the given level
28 /// to read compress output to the give output stream.
29 ///
30 /// The `level` argument here is typically 0-9 with 6 being a good default.
31 pub fn new(r: R, level: u32) -> XzEncoder<R> {
32 XzEncoder {
33 inner: bufread::XzEncoder::new(BufReader::new(r), level),
34 }
35 }
36
37 /// Creates a new encoder with a custom `Stream`.
38 ///
39 /// The `Stream` can be pre-configured for multithreaded encoding, different
40 /// compression options/tuning, etc.
41 pub fn new_stream(r: R, stream: Stream) -> XzEncoder<R> {
42 XzEncoder {
43 inner: bufread::XzEncoder::new_stream(BufReader::new(r), stream),
44 }
45 }
46
47 /// Acquires a reference to the underlying stream
48 pub fn get_ref(&self) -> &R {
49 self.inner.get_ref().get_ref()
50 }
51
52 /// Acquires a mutable reference to the underlying stream
53 ///
54 /// Note that mutation of the stream may result in surprising results if
55 /// this encoder is continued to be used.
56 pub fn get_mut(&mut self) -> &mut R {
57 self.inner.get_mut().get_mut()
58 }
59
60 /// Unwrap the underlying writer, finishing the compression stream.
61 pub fn into_inner(self) -> R {
62 self.inner.into_inner().into_inner()
63 }
64
65 /// Returns the number of bytes produced by the compressor
66 /// (e.g. the number of bytes read from this stream)
67 ///
68 /// Note that, due to buffering, this only bears any relation to
69 /// total_in() when the compressor chooses to flush its data
70 /// (unfortunately, this won't happen this won't happen in general
71 /// at the end of the stream, because the compressor doesn't know
72 /// if there's more data to come). At that point,
73 /// `total_out() / total_in()` would be the compression ratio.
74 pub fn total_out(&self) -> u64 {
75 self.inner.total_out()
76 }
77
78 /// Returns the number of bytes consumed by the compressor
79 /// (e.g. the number of bytes read from the underlying stream)
80 pub fn total_in(&self) -> u64 {
81 self.inner.total_in()
82 }
83}
84
85impl<R: Read> Read for XzEncoder<R> {
86 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
87 self.inner.read(buf)
88 }
89}
90
// Marker impl: async reads go through the `Read` impl above.
#[cfg(feature = "tokio")]
impl<R> AsyncRead for XzEncoder<R> where R: AsyncRead {}
93
94impl<W: Write + Read> Write for XzEncoder<W> {
95 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
96 self.get_mut().write(buf)
97 }
98
99 fn flush(&mut self) -> io::Result<()> {
100 self.get_mut().flush()
101 }
102}
103
// Shutdown is delegated directly to the wrapped stream.
#[cfg(feature = "tokio")]
impl<R> AsyncWrite for XzEncoder<R>
where
    R: AsyncWrite + Read,
{
    fn shutdown(&mut self) -> Poll<(), io::Error> {
        AsyncWrite::shutdown(self.get_mut())
    }
}
110
111impl<R: Read> XzDecoder<R> {
112 /// Create a new decompression stream, which will read compressed
113 /// data from the given input stream, and decompress one xz stream.
114 /// It may also consume input data that follows the xz stream.
115 /// Use [`xz::bufread::XzDecoder`] instead to process a mix of xz and non-xz data.
116 pub fn new(r: R) -> XzDecoder<R> {
117 XzDecoder {
118 inner: bufread::XzDecoder::new(BufReader::new(r)),
119 }
120 }
121
122 /// Create a new decompression stream, which will read compressed
123 /// data from the given input and decompress all the xz stream it contains.
124 pub fn new_multi_decoder(r: R) -> XzDecoder<R> {
125 XzDecoder {
126 inner: bufread::XzDecoder::new_multi_decoder(BufReader::new(r)),
127 }
128 }
129
130 /// Creates a new decoder with a custom `Stream`.
131 ///
132 /// The `Stream` can be pre-configured for various checks, different
133 /// decompression options/tuning, etc.
134 pub fn new_stream(r: R, stream: Stream) -> XzDecoder<R> {
135 XzDecoder {
136 inner: bufread::XzDecoder::new_stream(BufReader::new(r), stream),
137 }
138 }
139
140 /// Acquires a reference to the underlying stream
141 pub fn get_ref(&self) -> &R {
142 self.inner.get_ref().get_ref()
143 }
144
145 /// Acquires a mutable reference to the underlying stream
146 ///
147 /// Note that mutation of the stream may result in surprising results if
148 /// this encoder is continued to be used.
149 pub fn get_mut(&mut self) -> &mut R {
150 self.inner.get_mut().get_mut()
151 }
152
153 /// Unwrap the underlying writer, finishing the compression stream.
154 pub fn into_inner(self) -> R {
155 self.inner.into_inner().into_inner()
156 }
157
158 /// Returns the number of bytes produced by the decompressor
159 /// (e.g. the number of bytes read from this stream)
160 ///
161 /// Note that, due to buffering, this only bears any relation to
162 /// total_in() when the decompressor reaches a sync point
163 /// (e.g. where the original compressed stream was flushed).
164 /// At that point, `total_in() / total_out()` is the compression ratio.
165 pub fn total_out(&self) -> u64 {
166 self.inner.total_out()
167 }
168
169 /// Returns the number of bytes consumed by the decompressor
170 /// (e.g. the number of bytes read from the underlying stream)
171 pub fn total_in(&self) -> u64 {
172 self.inner.total_in()
173 }
174}
175
176impl<R: Read> Read for XzDecoder<R> {
177 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
178 self.inner.read(buf)
179 }
180}
181
// Marker impl: async reads go through the `Read` impl above.
#[cfg(feature = "tokio")]
impl<R> AsyncRead for XzDecoder<R> where R: AsyncRead + Read {}
184
185impl<W: Write + Read> Write for XzDecoder<W> {
186 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
187 self.get_mut().write(buf)
188 }
189
190 fn flush(&mut self) -> io::Result<()> {
191 self.get_mut().flush()
192 }
193}
194
// Shutdown is delegated directly to the wrapped stream.
#[cfg(feature = "tokio")]
impl<R> AsyncWrite for XzDecoder<R>
where
    R: AsyncWrite + Read,
{
    fn shutdown(&mut self) -> Poll<(), io::Error> {
        AsyncWrite::shutdown(self.get_mut())
    }
}
201
#[cfg(test)]
mod tests {
    use crate::read::{XzDecoder, XzEncoder};
    use rand::{thread_rng, Rng};
    use std::io::prelude::*;
    use std::iter;

    #[test]
    fn smoke() {
        let m: &[u8] = &[1, 2, 3, 4, 5, 6, 7, 8];
        let mut c = XzEncoder::new(m, 6);
        let mut data = vec![];
        c.read_to_end(&mut data).unwrap();
        let mut d = XzDecoder::new(&data[..]);
        let mut data2 = Vec::new();
        d.read_to_end(&mut data2).unwrap();
        assert_eq!(data2, m);
    }

    #[test]
    fn smoke2() {
        let m: &[u8] = &[1, 2, 3, 4, 5, 6, 7, 8];
        let c = XzEncoder::new(m, 6);
        let mut d = XzDecoder::new(c);
        let mut data = vec![];
        d.read_to_end(&mut data).unwrap();
        assert_eq!(data, [1, 2, 3, 4, 5, 6, 7, 8]);
    }

    #[test]
    fn smoke3() {
        let m = vec![3u8; 128 * 1024 + 1];
        let c = XzEncoder::new(&m[..], 6);
        let mut d = XzDecoder::new(c);
        let mut data = vec![];
        d.read_to_end(&mut data).unwrap();
        assert!(data == &m[..]);
    }

    #[test]
    fn self_terminating() {
        let m = vec![3u8; 128 * 1024 + 1];
        let mut c = XzEncoder::new(&m[..], 6);

        let mut result = Vec::new();
        c.read_to_end(&mut result).unwrap();

        // Append random trailing garbage after the xz stream; the decoder
        // must stop at the end of the stream and ignore it.
        let mut rng = thread_rng();
        let v = iter::repeat_with(|| rng.gen::<u8>())
            .take(1024)
            .collect::<Vec<_>>();
        for _ in 0..200 {
            result.extend_from_slice(&v);
        }

        let mut d = XzDecoder::new(&result[..]);
        // A zero-initialized buffer: handing uninitialized memory (via
        // `Vec::set_len`) to `Read::read` as `&mut [u8]` is undefined behavior.
        let mut data = vec![0u8; m.len()];
        assert_eq!(d.read(&mut data).unwrap(), m.len());
        assert!(data == &m[..]);
    }

    #[test]
    fn zero_length_read_at_eof() {
        let m = Vec::new();
        let mut c = XzEncoder::new(&m[..], 6);

        let mut result = Vec::new();
        c.read_to_end(&mut result).unwrap();

        let mut d = XzDecoder::new(&result[..]);
        let mut data = Vec::new();
        assert!(d.read(&mut data).unwrap() == 0);
    }

    #[test]
    fn zero_length_read_with_data() {
        let m = vec![3u8; 128 * 1024 + 1];
        let mut c = XzEncoder::new(&m[..], 6);

        let mut result = Vec::new();
        c.read_to_end(&mut result).unwrap();

        let mut d = XzDecoder::new(&result[..]);
        let mut data = Vec::new();
        assert!(d.read(&mut data).unwrap() == 0);
    }

    #[test]
    fn qc() {
        ::quickcheck::quickcheck(test as fn(_) -> _);

        fn test(v: Vec<u8>) -> bool {
            let r = XzEncoder::new(&v[..], 6);
            let mut r = XzDecoder::new(r);
            let mut v2 = Vec::new();
            r.read_to_end(&mut v2).unwrap();
            v == v2
        }
    }

    #[test]
    fn two_streams() {
        let mut input_stream1: Vec<u8> = Vec::new();
        let mut input_stream2: Vec<u8> = Vec::new();
        let mut all_input: Vec<u8> = Vec::new();

        // Generate input data.
        const STREAM1_SIZE: usize = 1024;
        for num in 0..STREAM1_SIZE {
            input_stream1.push(num as u8)
        }
        const STREAM2_SIZE: usize = 532;
        for num in 0..STREAM2_SIZE {
            input_stream2.push((num + 32) as u8)
        }
        all_input.extend(&input_stream1);
        all_input.extend(&input_stream2);

        // Make a vector with compressed data
        let mut decoder_input = Vec::new();
        {
            let mut encoder = XzEncoder::new(&input_stream1[..], 6);
            encoder.read_to_end(&mut decoder_input).unwrap();
        }
        {
            let mut encoder = XzEncoder::new(&input_stream2[..], 6);
            encoder.read_to_end(&mut decoder_input).unwrap();
        }

        // Decoder must be able to read the 2 concatenated xz streams and get the same data as input.
        let mut decoder_reader = &decoder_input[..];
        {
            // using `XzDecoder::new` here would fail because only 1 xz stream would be processed.
            let mut decoder = XzDecoder::new_multi_decoder(&mut decoder_reader);
            let mut decompressed_data = vec![0u8; all_input.len()];

            assert_eq!(
                decoder.read(&mut decompressed_data).unwrap(),
                all_input.len()
            );
            assert_eq!(decompressed_data, &all_input[..]);
        }
    }
}
349