1 | use std::cmp; |
2 | use std::io; |
3 | use std::io::prelude::*; |
4 | |
5 | use super::bufread::{corrupt, read_gz_header}; |
6 | use super::{GzBuilder, GzHeader}; |
7 | use crate::crc::{Crc, CrcWriter}; |
8 | use crate::zio; |
9 | use crate::{Compress, Compression, Decompress, Status}; |
10 | |
/// A gzip streaming encoder
///
/// This structure exposes a [`Write`] interface that will emit compressed data
/// to the underlying writer `W`.
///
/// [`Write`]: https://doc.rust-lang.org/std/io/trait.Write.html
///
/// # Examples
///
/// ```
/// use std::io::prelude::*;
/// use flate2::Compression;
/// use flate2::write::GzEncoder;
///
/// // Vec<u8> implements Write, so the compressed bytes of the sample string
/// // accumulate in memory and are printed once the encoder is finished.
/// # fn main() {
///
/// let mut e = GzEncoder::new(Vec::new(), Compression::default());
/// e.write_all(b"Hello World").unwrap();
/// println!("{:?}", e.finish().unwrap());
/// # }
/// ```
#[derive(Debug)]
pub struct GzEncoder<W: Write> {
    // Compressor writing into the user's `W`; created without a zlib wrapper
    // (see `gz_encoder`).
    inner: zio::Writer<W, Compress>,
    // Checksum state over the uncompressed input; its `sum()` and `amount()`
    // form the 8-byte trailer written by `try_finish`.
    crc: Crc,
    // Number of trailer bytes already written, so `try_finish` can resume
    // after a partial write.
    crc_bytes_written: usize,
    // Serialized header bytes not yet written; drained as they are written.
    header: Vec<u8>,
}
40 | |
41 | pub fn gz_encoder<W: Write>(header: Vec<u8>, w: W, lvl: Compression) -> GzEncoder<W> { |
42 | GzEncoder { |
43 | inner: zio::Writer::new(w, d:Compress::new(level:lvl, zlib_header:false)), |
44 | crc: Crc::new(), |
45 | header, |
46 | crc_bytes_written: 0, |
47 | } |
48 | } |
49 | |
50 | impl<W: Write> GzEncoder<W> { |
51 | /// Creates a new encoder which will use the given compression level. |
52 | /// |
53 | /// The encoder is not configured specially for the emitted header. For |
54 | /// header configuration, see the `GzBuilder` type. |
55 | /// |
56 | /// The data written to the returned encoder will be compressed and then |
57 | /// written to the stream `w`. |
58 | pub fn new(w: W, level: Compression) -> GzEncoder<W> { |
59 | GzBuilder::new().write(w, level) |
60 | } |
61 | |
62 | /// Acquires a reference to the underlying writer. |
63 | pub fn get_ref(&self) -> &W { |
64 | self.inner.get_ref() |
65 | } |
66 | |
67 | /// Acquires a mutable reference to the underlying writer. |
68 | /// |
69 | /// Note that mutation of the writer may result in surprising results if |
70 | /// this encoder is continued to be used. |
71 | pub fn get_mut(&mut self) -> &mut W { |
72 | self.inner.get_mut() |
73 | } |
74 | |
75 | /// Attempt to finish this output stream, writing out final chunks of data. |
76 | /// |
77 | /// Note that this function can only be used once data has finished being |
78 | /// written to the output stream. After this function is called then further |
79 | /// calls to `write` may result in a panic. |
80 | /// |
81 | /// # Panics |
82 | /// |
83 | /// Attempts to write data to this stream may result in a panic after this |
84 | /// function is called. |
85 | /// |
86 | /// # Errors |
87 | /// |
88 | /// This function will perform I/O to complete this stream, and any I/O |
89 | /// errors which occur will be returned from this function. |
90 | pub fn try_finish(&mut self) -> io::Result<()> { |
91 | self.write_header()?; |
92 | self.inner.finish()?; |
93 | |
94 | while self.crc_bytes_written < 8 { |
95 | let (sum, amt) = (self.crc.sum(), self.crc.amount()); |
96 | let buf = [ |
97 | (sum >> 0) as u8, |
98 | (sum >> 8) as u8, |
99 | (sum >> 16) as u8, |
100 | (sum >> 24) as u8, |
101 | (amt >> 0) as u8, |
102 | (amt >> 8) as u8, |
103 | (amt >> 16) as u8, |
104 | (amt >> 24) as u8, |
105 | ]; |
106 | let inner = self.inner.get_mut(); |
107 | let n = inner.write(&buf[self.crc_bytes_written..])?; |
108 | self.crc_bytes_written += n; |
109 | } |
110 | Ok(()) |
111 | } |
112 | |
113 | /// Finish encoding this stream, returning the underlying writer once the |
114 | /// encoding is done. |
115 | /// |
116 | /// Note that this function may not be suitable to call in a situation where |
117 | /// the underlying stream is an asynchronous I/O stream. To finish a stream |
118 | /// the `try_finish` (or `shutdown`) method should be used instead. To |
119 | /// re-acquire ownership of a stream it is safe to call this method after |
120 | /// `try_finish` or `shutdown` has returned `Ok`. |
121 | /// |
122 | /// # Errors |
123 | /// |
124 | /// This function will perform I/O to complete this stream, and any I/O |
125 | /// errors which occur will be returned from this function. |
126 | pub fn finish(mut self) -> io::Result<W> { |
127 | self.try_finish()?; |
128 | Ok(self.inner.take_inner()) |
129 | } |
130 | |
131 | fn write_header(&mut self) -> io::Result<()> { |
132 | while !self.header.is_empty() { |
133 | let n = self.inner.get_mut().write(&self.header)?; |
134 | self.header.drain(..n); |
135 | } |
136 | Ok(()) |
137 | } |
138 | } |
139 | |
140 | impl<W: Write> Write for GzEncoder<W> { |
141 | fn write(&mut self, buf: &[u8]) -> io::Result<usize> { |
142 | assert_eq!(self.crc_bytes_written, 0); |
143 | self.write_header()?; |
144 | let n: usize = self.inner.write(buf)?; |
145 | self.crc.update(&buf[..n]); |
146 | Ok(n) |
147 | } |
148 | |
149 | fn flush(&mut self) -> io::Result<()> { |
150 | assert_eq!(self.crc_bytes_written, 0); |
151 | self.write_header()?; |
152 | self.inner.flush() |
153 | } |
154 | } |
155 | |
156 | impl<R: Read + Write> Read for GzEncoder<R> { |
157 | fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { |
158 | self.get_mut().read(buf) |
159 | } |
160 | } |
161 | |
162 | impl<W: Write> Drop for GzEncoder<W> { |
163 | fn drop(&mut self) { |
164 | if self.inner.is_present() { |
165 | let _ = self.try_finish(); |
166 | } |
167 | } |
168 | } |
169 | |
/// A gzip streaming decoder
///
/// This structure exposes a [`Write`] interface that will emit uncompressed data
/// to the underlying writer `W`.
///
/// [`Write`]: https://doc.rust-lang.org/std/io/trait.Write.html
///
/// # Examples
///
/// ```
/// use std::io::prelude::*;
/// use std::io;
/// use flate2::Compression;
/// use flate2::write::{GzEncoder, GzDecoder};
///
/// # fn main() {
/// #    let mut e = GzEncoder::new(Vec::new(), Compression::default());
/// #    e.write(b"Hello World").unwrap();
/// #    let bytes = e.finish().unwrap();
/// #    assert_eq!("Hello World", decode_writer(bytes).unwrap());
/// # }
/// // Uncompresses a gzip-encoded vector of bytes and returns a string or an error.
/// // Here Vec<u8> implements Write.
/// fn decode_writer(bytes: Vec<u8>) -> io::Result<String> {
///    let mut writer = Vec::new();
///    let mut decoder = GzDecoder::new(writer);
///    decoder.write_all(&bytes[..])?;
///    writer = decoder.finish()?;
///    let return_string = String::from_utf8(writer).expect("String parsing error");
///    Ok(return_string)
/// }
/// ```
#[derive(Debug)]
pub struct GzDecoder<W: Write> {
    // Decompressor whose output goes through a `CrcWriter`, which tracks the
    // checksum and length of the decompressed data before it reaches `W`.
    inner: zio::Writer<CrcWriter<W>, Decompress>,
    // Trailer bytes seen after the compressed stream ended (at most
    // `CRC_BYTES_LEN`); checked by `finish`/`try_finish`.
    crc_bytes: Vec<u8>,
    // Parsed header; `None` until enough input has arrived to parse it.
    header: Option<GzHeader>,
    // Input bytes of a header that is still incomplete.
    header_buf: Vec<u8>,
}
209 | |
/// Length of the gzip trailer: a 4-byte checksum followed by a 4-byte
/// uncompressed length, both little-endian `u32`s.
const CRC_BYTES_LEN: usize = 8;
211 | |
212 | impl<W: Write> GzDecoder<W> { |
213 | /// Creates a new decoder which will write uncompressed data to the stream. |
214 | /// |
215 | /// When this encoder is dropped or unwrapped the final pieces of data will |
216 | /// be flushed. |
217 | pub fn new(w: W) -> GzDecoder<W> { |
218 | GzDecoder { |
219 | inner: zio::Writer::new(CrcWriter::new(w), Decompress::new(false)), |
220 | crc_bytes: Vec::with_capacity(CRC_BYTES_LEN), |
221 | header: None, |
222 | header_buf: Vec::new(), |
223 | } |
224 | } |
225 | |
226 | /// Returns the header associated with this stream. |
227 | pub fn header(&self) -> Option<&GzHeader> { |
228 | self.header.as_ref() |
229 | } |
230 | |
231 | /// Acquires a reference to the underlying writer. |
232 | pub fn get_ref(&self) -> &W { |
233 | self.inner.get_ref().get_ref() |
234 | } |
235 | |
236 | /// Acquires a mutable reference to the underlying writer. |
237 | /// |
238 | /// Note that mutating the output/input state of the stream may corrupt this |
239 | /// object, so care must be taken when using this method. |
240 | pub fn get_mut(&mut self) -> &mut W { |
241 | self.inner.get_mut().get_mut() |
242 | } |
243 | |
244 | /// Attempt to finish this output stream, writing out final chunks of data. |
245 | /// |
246 | /// Note that this function can only be used once data has finished being |
247 | /// written to the output stream. After this function is called then further |
248 | /// calls to `write` may result in a panic. |
249 | /// |
250 | /// # Panics |
251 | /// |
252 | /// Attempts to write data to this stream may result in a panic after this |
253 | /// function is called. |
254 | /// |
255 | /// # Errors |
256 | /// |
257 | /// This function will perform I/O to finish the stream, returning any |
258 | /// errors which happen. |
259 | pub fn try_finish(&mut self) -> io::Result<()> { |
260 | self.finish_and_check_crc()?; |
261 | Ok(()) |
262 | } |
263 | |
264 | /// Consumes this decoder, flushing the output stream. |
265 | /// |
266 | /// This will flush the underlying data stream and then return the contained |
267 | /// writer if the flush succeeded. |
268 | /// |
269 | /// Note that this function may not be suitable to call in a situation where |
270 | /// the underlying stream is an asynchronous I/O stream. To finish a stream |
271 | /// the `try_finish` (or `shutdown`) method should be used instead. To |
272 | /// re-acquire ownership of a stream it is safe to call this method after |
273 | /// `try_finish` or `shutdown` has returned `Ok`. |
274 | /// |
275 | /// # Errors |
276 | /// |
277 | /// This function will perform I/O to complete this stream, and any I/O |
278 | /// errors which occur will be returned from this function. |
279 | pub fn finish(mut self) -> io::Result<W> { |
280 | self.finish_and_check_crc()?; |
281 | Ok(self.inner.take_inner().into_inner()) |
282 | } |
283 | |
284 | fn finish_and_check_crc(&mut self) -> io::Result<()> { |
285 | self.inner.finish()?; |
286 | |
287 | if self.crc_bytes.len() != 8 { |
288 | return Err(corrupt()); |
289 | } |
290 | |
291 | let crc = ((self.crc_bytes[0] as u32) << 0) |
292 | | ((self.crc_bytes[1] as u32) << 8) |
293 | | ((self.crc_bytes[2] as u32) << 16) |
294 | | ((self.crc_bytes[3] as u32) << 24); |
295 | let amt = ((self.crc_bytes[4] as u32) << 0) |
296 | | ((self.crc_bytes[5] as u32) << 8) |
297 | | ((self.crc_bytes[6] as u32) << 16) |
298 | | ((self.crc_bytes[7] as u32) << 24); |
299 | if crc != self.inner.get_ref().crc().sum() { |
300 | return Err(corrupt()); |
301 | } |
302 | if amt != self.inner.get_ref().crc().amount() { |
303 | return Err(corrupt()); |
304 | } |
305 | Ok(()) |
306 | } |
307 | } |
308 | |
/// A `Read` adapter that keeps a running total of the bytes read through it.
struct Counter<T: Read> {
    inner: T,
    pos: usize,
}

impl<T: Read> Read for Counter<T> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // Only successful reads advance the tally; errors leave it untouched.
        self.inner.read(buf).map(|got| {
            self.pos += got;
            got
        })
    }
}
321 | |
322 | impl<W: Write> Write for GzDecoder<W> { |
323 | fn write(&mut self, buf: &[u8]) -> io::Result<usize> { |
324 | if self.header.is_none() { |
325 | // trying to avoid buffer usage |
326 | let (res, pos) = { |
327 | let mut counter = Counter { |
328 | inner: self.header_buf.chain(buf), |
329 | pos: 0, |
330 | }; |
331 | let res = read_gz_header(&mut counter); |
332 | (res, counter.pos) |
333 | }; |
334 | |
335 | match res { |
336 | Err(err) => { |
337 | if err.kind() == io::ErrorKind::UnexpectedEof { |
338 | // not enough data for header, save to the buffer |
339 | self.header_buf.extend(buf); |
340 | Ok(buf.len()) |
341 | } else { |
342 | Err(err) |
343 | } |
344 | } |
345 | Ok(header) => { |
346 | self.header = Some(header); |
347 | let pos = pos - self.header_buf.len(); |
348 | self.header_buf.truncate(0); |
349 | Ok(pos) |
350 | } |
351 | } |
352 | } else { |
353 | let (n, status) = self.inner.write_with_status(buf)?; |
354 | |
355 | if status == Status::StreamEnd && n < buf.len() && self.crc_bytes.len() < 8 { |
356 | let remaining = buf.len() - n; |
357 | let crc_bytes = cmp::min(remaining, CRC_BYTES_LEN - self.crc_bytes.len()); |
358 | self.crc_bytes.extend(&buf[n..n + crc_bytes]); |
359 | return Ok(n + crc_bytes); |
360 | } |
361 | Ok(n) |
362 | } |
363 | } |
364 | |
365 | fn flush(&mut self) -> io::Result<()> { |
366 | self.inner.flush() |
367 | } |
368 | } |
369 | |
370 | impl<W: Read + Write> Read for GzDecoder<W> { |
371 | fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { |
372 | self.inner.get_mut().get_mut().read(buf) |
373 | } |
374 | } |
375 | |
/// A gzip streaming decoder that decodes all members of a multistream
///
/// A gzip member consists of a header, compressed data and a trailer. The [gzip
/// specification](https://tools.ietf.org/html/rfc1952), however, allows multiple
/// gzip members to be joined in a single stream. `MultiGzDecoder` will
/// decode all consecutive members while `GzDecoder` will only decompress
/// the first gzip member. The multistream format is commonly used in
/// bioinformatics, for example in BGZF-compressed data.
///
/// This structure exposes a [`Write`] interface that will consume all gzip members
/// from the written buffers and write uncompressed data to the writer.
#[derive(Debug)]
pub struct MultiGzDecoder<W: Write> {
    // Decoder for the member currently being processed; replaced with a fresh
    // `GzDecoder` over the same writer each time a member completes.
    inner: GzDecoder<W>,
}
391 | |
392 | impl<W: Write> MultiGzDecoder<W> { |
393 | /// Creates a new decoder which will write uncompressed data to the stream. |
394 | /// If the gzip stream contains multiple members all will be decoded. |
395 | pub fn new(w: W) -> MultiGzDecoder<W> { |
396 | MultiGzDecoder { |
397 | inner: GzDecoder::new(w), |
398 | } |
399 | } |
400 | |
401 | /// Returns the header associated with the current member. |
402 | pub fn header(&self) -> Option<&GzHeader> { |
403 | self.inner.header() |
404 | } |
405 | |
406 | /// Acquires a reference to the underlying writer. |
407 | pub fn get_ref(&self) -> &W { |
408 | self.inner.get_ref() |
409 | } |
410 | |
411 | /// Acquires a mutable reference to the underlying writer. |
412 | /// |
413 | /// Note that mutating the output/input state of the stream may corrupt this |
414 | /// object, so care must be taken when using this method. |
415 | pub fn get_mut(&mut self) -> &mut W { |
416 | self.inner.get_mut() |
417 | } |
418 | |
419 | /// Attempt to finish this output stream, writing out final chunks of data. |
420 | /// |
421 | /// Note that this function can only be used once data has finished being |
422 | /// written to the output stream. After this function is called then further |
423 | /// calls to `write` may result in a panic. |
424 | /// |
425 | /// # Panics |
426 | /// |
427 | /// Attempts to write data to this stream may result in a panic after this |
428 | /// function is called. |
429 | /// |
430 | /// # Errors |
431 | /// |
432 | /// This function will perform I/O to finish the stream, returning any |
433 | /// errors which happen. |
434 | pub fn try_finish(&mut self) -> io::Result<()> { |
435 | self.inner.try_finish() |
436 | } |
437 | |
438 | /// Consumes this decoder, flushing the output stream. |
439 | /// |
440 | /// This will flush the underlying data stream and then return the contained |
441 | /// writer if the flush succeeded. |
442 | /// |
443 | /// Note that this function may not be suitable to call in a situation where |
444 | /// the underlying stream is an asynchronous I/O stream. To finish a stream |
445 | /// the `try_finish` (or `shutdown`) method should be used instead. To |
446 | /// re-acquire ownership of a stream it is safe to call this method after |
447 | /// `try_finish` or `shutdown` has returned `Ok`. |
448 | /// |
449 | /// # Errors |
450 | /// |
451 | /// This function will perform I/O to complete this stream, and any I/O |
452 | /// errors which occur will be returned from this function. |
453 | pub fn finish(self) -> io::Result<W> { |
454 | self.inner.finish() |
455 | } |
456 | } |
457 | |
458 | impl<W: Write> Write for MultiGzDecoder<W> { |
459 | fn write(&mut self, buf: &[u8]) -> io::Result<usize> { |
460 | if buf.is_empty() { |
461 | Ok(0) |
462 | } else { |
463 | match self.inner.write(buf) { |
464 | Ok(0) => { |
465 | // When the GzDecoder indicates that it has finished |
466 | // create a new GzDecoder to handle additional data. |
467 | self.inner.try_finish()?; |
468 | let w: W = self.inner.inner.take_inner().into_inner(); |
469 | self.inner = GzDecoder::new(w); |
470 | self.inner.write(buf) |
471 | } |
472 | res: Result => res, |
473 | } |
474 | } |
475 | } |
476 | |
477 | fn flush(&mut self) -> io::Result<()> { |
478 | self.inner.flush() |
479 | } |
480 | } |
481 | |
#[cfg(test)]
mod tests {
    use super::*;

    const STR: &str = "Hello World Hello World Hello World Hello World Hello World \
                       Hello World Hello World Hello World Hello World Hello World \
                       Hello World Hello World Hello World Hello World Hello World \
                       Hello World Hello World Hello World Hello World Hello World \
                       Hello World Hello World Hello World Hello World Hello World";

    /// Gzip-compresses `STR` with default settings, as a single member.
    fn gzip_str() -> Vec<u8> {
        let mut e = GzEncoder::new(Vec::new(), Compression::default());
        e.write_all(STR.as_ref()).unwrap();
        e.finish().unwrap()
    }

    #[test]
    fn decode_writer_one_chunk() {
        let bytes = gzip_str();

        let mut decoder = GzDecoder::new(Vec::new());
        // The first write only consumes the header; the rest follows.
        let n = decoder.write(&bytes[..]).unwrap();
        decoder.write_all(&bytes[n..]).unwrap();
        decoder.try_finish().unwrap();
        let writer = decoder.finish().unwrap();
        let return_string = String::from_utf8(writer).expect("String parsing error");
        assert_eq!(return_string, STR);
    }

    #[test]
    fn decode_writer_partial_header() {
        let bytes = gzip_str();

        let mut decoder = GzDecoder::new(Vec::new());
        // An incomplete header is buffered and fully acknowledged.
        assert_eq!(decoder.write(&bytes[..5]).unwrap(), 5);
        let n = decoder.write(&bytes[5..]).unwrap();
        decoder.write_all(&bytes[5 + n..]).unwrap();
        let writer = decoder.finish().unwrap();
        let return_string = String::from_utf8(writer).expect("String parsing error");
        assert_eq!(return_string, STR);
    }

    #[test]
    fn decode_writer_exact_header() {
        let bytes = gzip_str();

        let mut decoder = GzDecoder::new(Vec::new());
        assert_eq!(decoder.write(&bytes[..10]).unwrap(), 10);
        decoder.write_all(&bytes[10..]).unwrap();
        let writer = decoder.finish().unwrap();
        let return_string = String::from_utf8(writer).expect("String parsing error");
        assert_eq!(return_string, STR);
    }

    #[test]
    fn decode_writer_partial_crc() {
        let bytes = gzip_str();

        let mut decoder = GzDecoder::new(Vec::new());
        // Stop 5 bytes short of the end, i.e. inside the 8-byte trailer, and
        // drive the decoder all the way to that point so the trailer really
        // arrives in two separate pieces.
        let split = bytes.len() - 5;
        decoder.write_all(&bytes[..split]).unwrap();
        decoder.write_all(&bytes[split..]).unwrap();
        let writer = decoder.finish().unwrap();
        let return_string = String::from_utf8(writer).expect("String parsing error");
        assert_eq!(return_string, STR);
    }

    // Two or more gzip files concatenated form a multi-member gzip file.
    // MultiGzDecoder concatenates the decoded contents of all members.
    #[test]
    fn decode_multi_writer() {
        let bytes = gzip_str().repeat(2);

        let mut decoder = MultiGzDecoder::new(Vec::new());
        let mut count = 0;
        while count < bytes.len() {
            let n = decoder.write(&bytes[count..]).unwrap();
            assert!(n != 0);
            count += n;
        }
        let writer = decoder.finish().unwrap();
        let return_string = String::from_utf8(writer).expect("String parsing error");
        let expected = STR.repeat(2);
        assert_eq!(return_string, expected);
    }
}
579 | |