1use std::io;
2use std::io::prelude::*;
3use std::mem;
4
5use crate::{Compress, Decompress, DecompressError, FlushCompress, FlushDecompress, Status};
6
7#[derive(Debug)]
8pub struct Writer<W: Write, D: Ops> {
9 obj: Option<W>,
10 pub data: D,
11 buf: Vec<u8>,
12}
13
14pub trait Ops {
15 type Flush: Flush;
16 fn total_in(&self) -> u64;
17 fn total_out(&self) -> u64;
18 fn run(
19 &mut self,
20 input: &[u8],
21 output: &mut [u8],
22 flush: Self::Flush,
23 ) -> Result<Status, DecompressError>;
24 fn run_vec(
25 &mut self,
26 input: &[u8],
27 output: &mut Vec<u8>,
28 flush: Self::Flush,
29 ) -> Result<Status, DecompressError>;
30}
31
32impl Ops for Compress {
33 type Flush = FlushCompress;
34 fn total_in(&self) -> u64 {
35 self.total_in()
36 }
37 fn total_out(&self) -> u64 {
38 self.total_out()
39 }
40 fn run(
41 &mut self,
42 input: &[u8],
43 output: &mut [u8],
44 flush: FlushCompress,
45 ) -> Result<Status, DecompressError> {
46 Ok(self.compress(input, output, flush).unwrap())
47 }
48 fn run_vec(
49 &mut self,
50 input: &[u8],
51 output: &mut Vec<u8>,
52 flush: FlushCompress,
53 ) -> Result<Status, DecompressError> {
54 Ok(self.compress_vec(input, output, flush).unwrap())
55 }
56}
57
58impl Ops for Decompress {
59 type Flush = FlushDecompress;
60 fn total_in(&self) -> u64 {
61 self.total_in()
62 }
63 fn total_out(&self) -> u64 {
64 self.total_out()
65 }
66 fn run(
67 &mut self,
68 input: &[u8],
69 output: &mut [u8],
70 flush: FlushDecompress,
71 ) -> Result<Status, DecompressError> {
72 self.decompress(input, output, flush)
73 }
74 fn run_vec(
75 &mut self,
76 input: &[u8],
77 output: &mut Vec<u8>,
78 flush: FlushDecompress,
79 ) -> Result<Status, DecompressError> {
80 self.decompress_vec(input, output, flush)
81 }
82}
83
84pub trait Flush {
85 fn none() -> Self;
86 fn sync() -> Self;
87 fn finish() -> Self;
88}
89
90impl Flush for FlushCompress {
91 fn none() -> Self {
92 FlushCompress::None
93 }
94
95 fn sync() -> Self {
96 FlushCompress::Sync
97 }
98
99 fn finish() -> Self {
100 FlushCompress::Finish
101 }
102}
103
104impl Flush for FlushDecompress {
105 fn none() -> Self {
106 FlushDecompress::None
107 }
108
109 fn sync() -> Self {
110 FlushDecompress::Sync
111 }
112
113 fn finish() -> Self {
114 FlushDecompress::Finish
115 }
116}
117
118pub fn read<R, D>(obj: &mut R, data: &mut D, dst: &mut [u8]) -> io::Result<usize>
119where
120 R: BufRead,
121 D: Ops,
122{
123 loop {
124 let (read, consumed, ret, eof);
125 {
126 let input = obj.fill_buf()?;
127 eof = input.is_empty();
128 let before_out = data.total_out();
129 let before_in = data.total_in();
130 let flush = if eof {
131 D::Flush::finish()
132 } else {
133 D::Flush::none()
134 };
135 ret = data.run(input, dst, flush);
136 read = (data.total_out() - before_out) as usize;
137 consumed = (data.total_in() - before_in) as usize;
138 }
139 obj.consume(consumed);
140
141 match ret {
142 // If we haven't ready any data and we haven't hit EOF yet,
143 // then we need to keep asking for more data because if we
144 // return that 0 bytes of data have been read then it will
145 // be interpreted as EOF.
146 Ok(Status::Ok | Status::BufError) if read == 0 && !eof && !dst.is_empty() => continue,
147 Ok(Status::Ok | Status::BufError | Status::StreamEnd) => return Ok(read),
148
149 Err(..) => {
150 return Err(io::Error::new(
151 io::ErrorKind::InvalidInput,
152 "corrupt deflate stream",
153 ))
154 }
155 }
156 }
157}
158
159impl<W: Write, D: Ops> Writer<W, D> {
160 pub fn new(w: W, d: D) -> Writer<W, D> {
161 Writer {
162 obj: Some(w),
163 data: d,
164 buf: Vec::with_capacity(32 * 1024),
165 }
166 }
167
168 pub fn finish(&mut self) -> io::Result<()> {
169 loop {
170 self.dump()?;
171
172 let before = self.data.total_out();
173 self.data.run_vec(&[], &mut self.buf, D::Flush::finish())?;
174 if before == self.data.total_out() {
175 return Ok(());
176 }
177 }
178 }
179
180 pub fn replace(&mut self, w: W) -> W {
181 self.buf.truncate(0);
182 mem::replace(self.get_mut(), w)
183 }
184
185 pub fn get_ref(&self) -> &W {
186 self.obj.as_ref().unwrap()
187 }
188
189 pub fn get_mut(&mut self) -> &mut W {
190 self.obj.as_mut().unwrap()
191 }
192
193 // Note that this should only be called if the outer object is just about
194 // to be consumed!
195 //
196 // (e.g. an implementation of `into_inner`)
197 pub fn take_inner(&mut self) -> W {
198 self.obj.take().unwrap()
199 }
200
201 pub fn is_present(&self) -> bool {
202 self.obj.is_some()
203 }
204
205 // Returns total written bytes and status of underlying codec
206 pub(crate) fn write_with_status(&mut self, buf: &[u8]) -> io::Result<(usize, Status)> {
207 // miniz isn't guaranteed to actually write any of the buffer provided,
208 // it may be in a flushing mode where it's just giving us data before
209 // we're actually giving it any data. We don't want to spuriously return
210 // `Ok(0)` when possible as it will cause calls to write_all() to fail.
211 // As a result we execute this in a loop to ensure that we try our
212 // darndest to write the data.
213 loop {
214 self.dump()?;
215
216 let before_in = self.data.total_in();
217 let ret = self.data.run_vec(buf, &mut self.buf, D::Flush::none());
218 let written = (self.data.total_in() - before_in) as usize;
219 let is_stream_end = matches!(ret, Ok(Status::StreamEnd));
220
221 if !buf.is_empty() && written == 0 && ret.is_ok() && !is_stream_end {
222 continue;
223 }
224 return match ret {
225 Ok(st) => match st {
226 Status::Ok | Status::BufError | Status::StreamEnd => Ok((written, st)),
227 },
228 Err(..) => Err(io::Error::new(
229 io::ErrorKind::InvalidInput,
230 "corrupt deflate stream",
231 )),
232 };
233 }
234 }
235
236 fn dump(&mut self) -> io::Result<()> {
237 // TODO: should manage this buffer not with `drain` but probably more of
238 // a deque-like strategy.
239 while !self.buf.is_empty() {
240 let n = self.obj.as_mut().unwrap().write(&self.buf)?;
241 if n == 0 {
242 return Err(io::ErrorKind::WriteZero.into());
243 }
244 self.buf.drain(..n);
245 }
246 Ok(())
247 }
248}
249
250impl<W: Write, D: Ops> Write for Writer<W, D> {
251 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
252 self.write_with_status(buf).map(|res| res.0)
253 }
254
255 fn flush(&mut self) -> io::Result<()> {
256 self.data
257 .run_vec(&[], &mut self.buf, D::Flush::sync())
258 .unwrap();
259
260 // Unfortunately miniz doesn't actually tell us when we're done with
261 // pulling out all the data from the internal stream. To remedy this we
262 // have to continually ask the stream for more memory until it doesn't
263 // give us a chunk of memory the same size as our own internal buffer,
264 // at which point we assume it's reached the end.
265 loop {
266 self.dump()?;
267 let before = self.data.total_out();
268 self.data
269 .run_vec(&[], &mut self.buf, D::Flush::none())
270 .unwrap();
271 if before == self.data.total_out() {
272 break;
273 }
274 }
275
276 self.obj.as_mut().unwrap().flush()
277 }
278}
279
280impl<W: Write, D: Ops> Drop for Writer<W, D> {
281 fn drop(&mut self) {
282 if self.obj.is_some() {
283 let _ = self.finish();
284 }
285 }
286}
287