1 | use std::io; |
2 | use std::io::prelude::*; |
3 | use std::mem; |
4 | |
5 | use crate::{Compress, Decompress, DecompressError, FlushCompress, FlushDecompress, Status}; |
6 | |
7 | #[derive (Debug)] |
8 | pub struct Writer<W: Write, D: Ops> { |
9 | obj: Option<W>, |
10 | pub data: D, |
11 | buf: Vec<u8>, |
12 | } |
13 | |
/// Codec operations needed by the generic [`read`] function and [`Writer`].
///
/// Implemented in this file for [`Compress`] and [`Decompress`].
pub trait Ops {
    /// Flush-mode type accepted by [`Ops::run`] and [`Ops::run_vec`].
    type Flush: Flush;
    /// Running input counter. Callers in this file take the difference across
    /// a `run`/`run_vec` call as the number of input bytes consumed.
    fn total_in(&self) -> u64;
    /// Running output counter. Callers in this file take the difference across
    /// a `run`/`run_vec` call as the number of output bytes produced.
    fn total_out(&self) -> u64;
    /// Runs the codec over `input`, placing output in the slice `output`,
    /// using the given flush mode.
    ///
    /// The error type is `DecompressError` for every implementor, including
    /// the compressing one.
    fn run(
        &mut self,
        input: &[u8],
        output: &mut [u8],
        flush: Self::Flush,
    ) -> Result<Status, DecompressError>;
    /// Same as [`Ops::run`], but the output destination is a `Vec<u8>`.
    fn run_vec(
        &mut self,
        input: &[u8],
        output: &mut Vec<u8>,
        flush: Self::Flush,
    ) -> Result<Status, DecompressError>;
}
31 | |
32 | impl Ops for Compress { |
33 | type Flush = FlushCompress; |
34 | fn total_in(&self) -> u64 { |
35 | self.total_in() |
36 | } |
37 | fn total_out(&self) -> u64 { |
38 | self.total_out() |
39 | } |
40 | fn run( |
41 | &mut self, |
42 | input: &[u8], |
43 | output: &mut [u8], |
44 | flush: FlushCompress, |
45 | ) -> Result<Status, DecompressError> { |
46 | Ok(self.compress(input, output, flush).unwrap()) |
47 | } |
48 | fn run_vec( |
49 | &mut self, |
50 | input: &[u8], |
51 | output: &mut Vec<u8>, |
52 | flush: FlushCompress, |
53 | ) -> Result<Status, DecompressError> { |
54 | Ok(self.compress_vec(input, output, flush).unwrap()) |
55 | } |
56 | } |
57 | |
58 | impl Ops for Decompress { |
59 | type Flush = FlushDecompress; |
60 | fn total_in(&self) -> u64 { |
61 | self.total_in() |
62 | } |
63 | fn total_out(&self) -> u64 { |
64 | self.total_out() |
65 | } |
66 | fn run( |
67 | &mut self, |
68 | input: &[u8], |
69 | output: &mut [u8], |
70 | flush: FlushDecompress, |
71 | ) -> Result<Status, DecompressError> { |
72 | self.decompress(input, output, flush) |
73 | } |
74 | fn run_vec( |
75 | &mut self, |
76 | input: &[u8], |
77 | output: &mut Vec<u8>, |
78 | flush: FlushDecompress, |
79 | ) -> Result<Status, DecompressError> { |
80 | self.decompress_vec(input, output, flush) |
81 | } |
82 | } |
83 | |
/// Constructors for the flush modes that the generic code in this file needs.
///
/// Implemented in this file for [`FlushCompress`] and [`FlushDecompress`].
pub trait Flush {
    /// Mode used for ordinary calls (`read` before EOF, `Writer` writes).
    fn none() -> Self;
    /// Mode requested first by the `Write::flush` implementation of `Writer`.
    fn sync() -> Self;
    /// Mode used when no more input will follow (`read` at EOF,
    /// `Writer::finish`).
    fn finish() -> Self;
}
89 | |
90 | impl Flush for FlushCompress { |
91 | fn none() -> Self { |
92 | FlushCompress::None |
93 | } |
94 | |
95 | fn sync() -> Self { |
96 | FlushCompress::Sync |
97 | } |
98 | |
99 | fn finish() -> Self { |
100 | FlushCompress::Finish |
101 | } |
102 | } |
103 | |
104 | impl Flush for FlushDecompress { |
105 | fn none() -> Self { |
106 | FlushDecompress::None |
107 | } |
108 | |
109 | fn sync() -> Self { |
110 | FlushDecompress::Sync |
111 | } |
112 | |
113 | fn finish() -> Self { |
114 | FlushDecompress::Finish |
115 | } |
116 | } |
117 | |
118 | pub fn read<R, D>(obj: &mut R, data: &mut D, dst: &mut [u8]) -> io::Result<usize> |
119 | where |
120 | R: BufRead, |
121 | D: Ops, |
122 | { |
123 | loop { |
124 | let (read, consumed, ret, eof); |
125 | { |
126 | let input = obj.fill_buf()?; |
127 | eof = input.is_empty(); |
128 | let before_out = data.total_out(); |
129 | let before_in = data.total_in(); |
130 | let flush = if eof { |
131 | D::Flush::finish() |
132 | } else { |
133 | D::Flush::none() |
134 | }; |
135 | ret = data.run(input, dst, flush); |
136 | read = (data.total_out() - before_out) as usize; |
137 | consumed = (data.total_in() - before_in) as usize; |
138 | } |
139 | obj.consume(consumed); |
140 | |
141 | match ret { |
142 | // If we haven't ready any data and we haven't hit EOF yet, |
143 | // then we need to keep asking for more data because if we |
144 | // return that 0 bytes of data have been read then it will |
145 | // be interpreted as EOF. |
146 | Ok(Status::Ok | Status::BufError) if read == 0 && !eof && !dst.is_empty() => continue, |
147 | Ok(Status::Ok | Status::BufError | Status::StreamEnd) => return Ok(read), |
148 | |
149 | Err(..) => { |
150 | return Err(io::Error::new( |
151 | io::ErrorKind::InvalidInput, |
152 | "corrupt deflate stream" , |
153 | )) |
154 | } |
155 | } |
156 | } |
157 | } |
158 | |
159 | impl<W: Write, D: Ops> Writer<W, D> { |
160 | pub fn new(w: W, d: D) -> Writer<W, D> { |
161 | Writer { |
162 | obj: Some(w), |
163 | data: d, |
164 | buf: Vec::with_capacity(32 * 1024), |
165 | } |
166 | } |
167 | |
168 | pub fn finish(&mut self) -> io::Result<()> { |
169 | loop { |
170 | self.dump()?; |
171 | |
172 | let before = self.data.total_out(); |
173 | self.data.run_vec(&[], &mut self.buf, D::Flush::finish())?; |
174 | if before == self.data.total_out() { |
175 | return Ok(()); |
176 | } |
177 | } |
178 | } |
179 | |
180 | pub fn replace(&mut self, w: W) -> W { |
181 | self.buf.truncate(0); |
182 | mem::replace(self.get_mut(), w) |
183 | } |
184 | |
185 | pub fn get_ref(&self) -> &W { |
186 | self.obj.as_ref().unwrap() |
187 | } |
188 | |
189 | pub fn get_mut(&mut self) -> &mut W { |
190 | self.obj.as_mut().unwrap() |
191 | } |
192 | |
193 | // Note that this should only be called if the outer object is just about |
194 | // to be consumed! |
195 | // |
196 | // (e.g. an implementation of `into_inner`) |
197 | pub fn take_inner(&mut self) -> W { |
198 | self.obj.take().unwrap() |
199 | } |
200 | |
201 | pub fn is_present(&self) -> bool { |
202 | self.obj.is_some() |
203 | } |
204 | |
205 | // Returns total written bytes and status of underlying codec |
206 | pub(crate) fn write_with_status(&mut self, buf: &[u8]) -> io::Result<(usize, Status)> { |
207 | // miniz isn't guaranteed to actually write any of the buffer provided, |
208 | // it may be in a flushing mode where it's just giving us data before |
209 | // we're actually giving it any data. We don't want to spuriously return |
210 | // `Ok(0)` when possible as it will cause calls to write_all() to fail. |
211 | // As a result we execute this in a loop to ensure that we try our |
212 | // darndest to write the data. |
213 | loop { |
214 | self.dump()?; |
215 | |
216 | let before_in = self.data.total_in(); |
217 | let ret = self.data.run_vec(buf, &mut self.buf, D::Flush::none()); |
218 | let written = (self.data.total_in() - before_in) as usize; |
219 | let is_stream_end = matches!(ret, Ok(Status::StreamEnd)); |
220 | |
221 | if !buf.is_empty() && written == 0 && ret.is_ok() && !is_stream_end { |
222 | continue; |
223 | } |
224 | return match ret { |
225 | Ok(st) => match st { |
226 | Status::Ok | Status::BufError | Status::StreamEnd => Ok((written, st)), |
227 | }, |
228 | Err(..) => Err(io::Error::new( |
229 | io::ErrorKind::InvalidInput, |
230 | "corrupt deflate stream" , |
231 | )), |
232 | }; |
233 | } |
234 | } |
235 | |
236 | fn dump(&mut self) -> io::Result<()> { |
237 | // TODO: should manage this buffer not with `drain` but probably more of |
238 | // a deque-like strategy. |
239 | while !self.buf.is_empty() { |
240 | let n = self.obj.as_mut().unwrap().write(&self.buf)?; |
241 | if n == 0 { |
242 | return Err(io::ErrorKind::WriteZero.into()); |
243 | } |
244 | self.buf.drain(..n); |
245 | } |
246 | Ok(()) |
247 | } |
248 | } |
249 | |
250 | impl<W: Write, D: Ops> Write for Writer<W, D> { |
251 | fn write(&mut self, buf: &[u8]) -> io::Result<usize> { |
252 | self.write_with_status(buf).map(|res| res.0) |
253 | } |
254 | |
255 | fn flush(&mut self) -> io::Result<()> { |
256 | self.data |
257 | .run_vec(&[], &mut self.buf, D::Flush::sync()) |
258 | .unwrap(); |
259 | |
260 | // Unfortunately miniz doesn't actually tell us when we're done with |
261 | // pulling out all the data from the internal stream. To remedy this we |
262 | // have to continually ask the stream for more memory until it doesn't |
263 | // give us a chunk of memory the same size as our own internal buffer, |
264 | // at which point we assume it's reached the end. |
265 | loop { |
266 | self.dump()?; |
267 | let before = self.data.total_out(); |
268 | self.data |
269 | .run_vec(&[], &mut self.buf, D::Flush::none()) |
270 | .unwrap(); |
271 | if before == self.data.total_out() { |
272 | break; |
273 | } |
274 | } |
275 | |
276 | self.obj.as_mut().unwrap().flush() |
277 | } |
278 | } |
279 | |
280 | impl<W: Write, D: Ops> Drop for Writer<W, D> { |
281 | fn drop(&mut self) { |
282 | if self.obj.is_some() { |
283 | let _ = self.finish(); |
284 | } |
285 | } |
286 | } |
287 | |