1//! I/O streams for wrapping `BufRead` types as encoders/decoders
2
3use lzma_sys;
4use std::io;
5use std::io::prelude::*;
6
7#[cfg(feature = "tokio")]
8use futures::Poll;
9#[cfg(feature = "tokio")]
10use tokio_io::{AsyncRead, AsyncWrite};
11
12use crate::stream::{Action, Check, Status, Stream};
13
/// An xz encoder, or compressor.
///
/// This structure wraps a `BufRead` source and implements `Read`: it reads
/// uncompressed data from the underlying stream and emits a stream of
/// compressed data.
pub struct XzEncoder<R> {
    // The wrapped reader that supplies uncompressed input.
    obj: R,
    // The compression stream state driven by `Read::read`.
    data: Stream,
}
22
/// An xz decoder, or decompressor.
///
/// This structure wraps a `BufRead` source of compressed data and implements
/// `Read`, providing the decompressed data when read from.
pub struct XzDecoder<R> {
    // The wrapped reader that supplies compressed input.
    obj: R,
    // The decompression stream state driven by `Read::read`.
    data: Stream,
}
31
32impl<R: BufRead> XzEncoder<R> {
33 /// Creates a new encoder which will read uncompressed data from the given
34 /// stream and emit the compressed stream.
35 ///
36 /// The `level` argument here is typically 0-9 with 6 being a good default.
37 pub fn new(r: R, level: u32) -> XzEncoder<R> {
38 let stream: Stream = Stream::new_easy_encoder(preset:level, Check::Crc64).unwrap();
39 XzEncoder::new_stream(r, stream)
40 }
41
42 /// Creates a new encoder with a custom `Stream`.
43 ///
44 /// The `Stream` can be pre-configured for multithreaded encoding, different
45 /// compression options/tuning, etc.
46 pub fn new_stream(r: R, stream: Stream) -> XzEncoder<R> {
47 XzEncoder {
48 obj: r,
49 data: stream,
50 }
51 }
52}
53
54impl<R> XzEncoder<R> {
55 /// Acquires a reference to the underlying stream
56 pub fn get_ref(&self) -> &R {
57 &self.obj
58 }
59
60 /// Acquires a mutable reference to the underlying stream
61 ///
62 /// Note that mutation of the stream may result in surprising results if
63 /// this encoder is continued to be used.
64 pub fn get_mut(&mut self) -> &mut R {
65 &mut self.obj
66 }
67
68 /// Consumes this encoder, returning the underlying reader.
69 pub fn into_inner(self) -> R {
70 self.obj
71 }
72
73 /// Returns the number of bytes produced by the compressor
74 /// (e.g. the number of bytes read from this stream)
75 ///
76 /// Note that, due to buffering, this only bears any relation to
77 /// total_in() when the compressor chooses to flush its data
78 /// (unfortunately, this won't happen in general at the end of the
79 /// stream, because the compressor doesn't know if there's more data
80 /// to come). At that point, `total_out() / total_in()` would be
81 /// the compression ratio.
82 pub fn total_out(&self) -> u64 {
83 self.data.total_out()
84 }
85
86 /// Returns the number of bytes consumed by the compressor
87 /// (e.g. the number of bytes read from the underlying stream)
88 pub fn total_in(&self) -> u64 {
89 self.data.total_in()
90 }
91}
92
93impl<R: BufRead> Read for XzEncoder<R> {
94 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
95 loop {
96 let (read, consumed, eof, ret);
97 {
98 let input = self.obj.fill_buf()?;
99 eof = input.is_empty();
100 let before_out = self.data.total_out();
101 let before_in = self.data.total_in();
102 let action = if eof { Action::Finish } else { Action::Run };
103 ret = self.data.process(input, buf, action);
104 read = (self.data.total_out() - before_out) as usize;
105 consumed = (self.data.total_in() - before_in) as usize;
106 }
107 self.obj.consume(consumed);
108
109 ret.unwrap();
110
111 // If we haven't ready any data and we haven't hit EOF yet, then we
112 // need to keep asking for more data because if we return that 0
113 // bytes of data have been read then it will be interpreted as EOF.
114 if read == 0 && !eof && buf.len() > 0 {
115 continue;
116 }
117 return Ok(read);
118 }
119 }
120}
121
// With the `tokio` feature enabled, `XzEncoder` can also be used as a
// tokio-io `AsyncRead` whenever the wrapped reader is both `AsyncRead` and
// `BufRead`. The body is empty, so only the trait's default methods are used.
#[cfg(feature = "tokio")]
impl<R: AsyncRead + BufRead> AsyncRead for XzEncoder<R> {}
124
125impl<W: Write> Write for XzEncoder<W> {
126 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
127 self.get_mut().write(buf)
128 }
129
130 fn flush(&mut self) -> io::Result<()> {
131 self.get_mut().flush()
132 }
133}
134
#[cfg(feature = "tokio")]
impl<R: AsyncWrite> AsyncWrite for XzEncoder<R> {
    /// Shuts down the wrapped writer directly; the compressor is not involved.
    fn shutdown(&mut self) -> Poll<(), io::Error> {
        let inner = &mut self.obj;
        inner.shutdown()
    }
}
141
142impl<R: BufRead> XzDecoder<R> {
143 /// Creates a new decoder which will decompress data read from the given
144 /// stream.
145 pub fn new(r: R) -> XzDecoder<R> {
146 let stream = Stream::new_stream_decoder(u64::max_value(), 0).unwrap();
147 XzDecoder::new_stream(r, stream)
148 }
149
150 /// Creates a new decoder which will decompress data read from the given
151 /// input. All the concatenated xz streams from input will be consumed.
152 pub fn new_multi_decoder(r: R) -> XzDecoder<R> {
153 let stream =
154 Stream::new_auto_decoder(u64::max_value(), lzma_sys::LZMA_CONCATENATED).unwrap();
155 XzDecoder::new_stream(r, stream)
156 }
157
158 /// Creates a new decoder with a custom `Stream`.
159 ///
160 /// The `Stream` can be pre-configured for various checks, different
161 /// decompression options/tuning, etc.
162 pub fn new_stream(r: R, stream: Stream) -> XzDecoder<R> {
163 XzDecoder {
164 obj: r,
165 data: stream,
166 }
167 }
168}
169
170impl<R> XzDecoder<R> {
171 /// Acquires a reference to the underlying stream
172 pub fn get_ref(&self) -> &R {
173 &self.obj
174 }
175
176 /// Acquires a mutable reference to the underlying stream
177 ///
178 /// Note that mutation of the stream may result in surprising results if
179 /// this encoder is continued to be used.
180 pub fn get_mut(&mut self) -> &mut R {
181 &mut self.obj
182 }
183
184 /// Consumes this decoder, returning the underlying reader.
185 pub fn into_inner(self) -> R {
186 self.obj
187 }
188
189 /// Returns the number of bytes that the decompressor has consumed.
190 ///
191 /// Note that this will likely be smaller than what the decompressor
192 /// actually read from the underlying stream due to buffering.
193 pub fn total_in(&self) -> u64 {
194 self.data.total_in()
195 }
196
197 /// Returns the number of bytes that the decompressor has produced.
198 pub fn total_out(&self) -> u64 {
199 self.data.total_out()
200 }
201}
202
203impl<R: BufRead> Read for XzDecoder<R> {
204 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
205 loop {
206 let (read, consumed, eof, ret);
207 {
208 let input = self.obj.fill_buf()?;
209 eof = input.is_empty();
210 let before_out = self.data.total_out();
211 let before_in = self.data.total_in();
212 ret = self
213 .data
214 .process(input, buf, if eof { Action::Finish } else { Action::Run });
215 read = (self.data.total_out() - before_out) as usize;
216 consumed = (self.data.total_in() - before_in) as usize;
217 }
218 self.obj.consume(consumed);
219
220 let status = ret?;
221 if read > 0 || eof || buf.len() == 0 {
222 if read == 0 && status != Status::StreamEnd && buf.len() > 0 {
223 return Err(io::Error::new(
224 io::ErrorKind::UnexpectedEof,
225 "premature eof",
226 ));
227 }
228 return Ok(read);
229 }
230 if consumed == 0 {
231 return Err(io::Error::new(
232 io::ErrorKind::InvalidData,
233 "corrupt xz stream",
234 ));
235 }
236 }
237 }
238}
239
// With the `tokio` feature enabled, `XzDecoder` can also be used as a
// tokio-io `AsyncRead` whenever the wrapped reader is both `AsyncRead` and
// `BufRead`. The body is empty, so only the trait's default methods are used.
#[cfg(feature = "tokio")]
impl<R: AsyncRead + BufRead> AsyncRead for XzDecoder<R> {}
242
243impl<W: Write> Write for XzDecoder<W> {
244 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
245 self.get_mut().write(buf)
246 }
247
248 fn flush(&mut self) -> io::Result<()> {
249 self.get_mut().flush()
250 }
251}
252
#[cfg(feature = "tokio")]
impl<R: AsyncWrite> AsyncWrite for XzDecoder<R> {
    /// Shuts down the wrapped writer directly; the decompressor is not involved.
    fn shutdown(&mut self) -> Poll<(), io::Error> {
        let inner = &mut self.obj;
        inner.shutdown()
    }
}
259
#[cfg(test)]
mod tests {
    use crate::bufread::{XzDecoder, XzEncoder};
    use std::io::Read;

    /// Decoding one xz stream followed by unrelated bytes must yield the
    /// original payload and leave the trailing bytes unread in the source.
    #[test]
    fn compressed_and_trailing_data() {
        const COMPRESSED_ORIG_SIZE: usize = 1024;
        const ADDITIONAL_SIZE: usize = 123;

        // Payload: the bytes 0, 1, ..., 255, 0, 1, ... (wrapping via `as u8`).
        let to_compress: Vec<u8> = (0..COMPRESSED_ORIG_SIZE).map(|n| n as u8).collect();

        // Compress the payload at preset level 6.
        let mut decoder_input = Vec::new();
        XzEncoder::new(&to_compress[..], 6)
            .read_to_end(&mut decoder_input)
            .unwrap();

        // Append trailing bytes that are not part of the xz stream.
        let additional_data: Vec<u8> = (0..ADDITIONAL_SIZE)
            .map(|n| ((25 + n) % 256) as u8)
            .collect();
        decoder_input.extend(&additional_data);

        // The decoder must read the whole payload and stop at the stream end.
        let mut decoder_reader = &decoder_input[..];
        {
            let mut decoder = XzDecoder::new(&mut decoder_reader);
            let mut decompressed_data = vec![0u8; to_compress.len()];
            let n_read = decoder.read(&mut decompressed_data).unwrap();
            assert_eq!(n_read, COMPRESSED_ORIG_SIZE);
            assert_eq!(decompressed_data, &to_compress[..]);
        }

        // Everything after the xz stream must still be available.
        let mut remaining_data = Vec::new();
        let nb_read = decoder_reader.read_to_end(&mut remaining_data).unwrap();
        assert_eq!(nb_read, ADDITIONAL_SIZE);
        assert_eq!(remaining_data, &additional_data[..]);
    }
}
305