1 | use std::io; |
2 | use std::io::prelude::*; |
3 | use std::mem; |
4 | |
5 | use crate::{ |
6 | Compress, CompressError, Decompress, DecompressError, FlushCompress, FlushDecompress, Status, |
7 | }; |
8 | |
9 | #[derive (Debug)] |
10 | pub struct Writer<W: Write, D: Ops> { |
11 | obj: Option<W>, |
12 | pub data: D, |
13 | buf: Vec<u8>, |
14 | } |
15 | |
16 | pub trait Ops { |
17 | type Error: Into<io::Error>; |
18 | type Flush: Flush; |
19 | fn total_in(&self) -> u64; |
20 | fn total_out(&self) -> u64; |
21 | fn run( |
22 | &mut self, |
23 | input: &[u8], |
24 | output: &mut [u8], |
25 | flush: Self::Flush, |
26 | ) -> Result<Status, Self::Error>; |
27 | fn run_vec( |
28 | &mut self, |
29 | input: &[u8], |
30 | output: &mut Vec<u8>, |
31 | flush: Self::Flush, |
32 | ) -> Result<Status, Self::Error>; |
33 | } |
34 | |
35 | impl Ops for Compress { |
36 | type Error = CompressError; |
37 | type Flush = FlushCompress; |
38 | fn total_in(&self) -> u64 { |
39 | self.total_in() |
40 | } |
41 | fn total_out(&self) -> u64 { |
42 | self.total_out() |
43 | } |
44 | fn run( |
45 | &mut self, |
46 | input: &[u8], |
47 | output: &mut [u8], |
48 | flush: FlushCompress, |
49 | ) -> Result<Status, CompressError> { |
50 | self.compress(input, output, flush) |
51 | } |
52 | fn run_vec( |
53 | &mut self, |
54 | input: &[u8], |
55 | output: &mut Vec<u8>, |
56 | flush: FlushCompress, |
57 | ) -> Result<Status, CompressError> { |
58 | self.compress_vec(input, output, flush) |
59 | } |
60 | } |
61 | |
62 | impl Ops for Decompress { |
63 | type Error = DecompressError; |
64 | type Flush = FlushDecompress; |
65 | fn total_in(&self) -> u64 { |
66 | self.total_in() |
67 | } |
68 | fn total_out(&self) -> u64 { |
69 | self.total_out() |
70 | } |
71 | fn run( |
72 | &mut self, |
73 | input: &[u8], |
74 | output: &mut [u8], |
75 | flush: FlushDecompress, |
76 | ) -> Result<Status, DecompressError> { |
77 | self.decompress(input, output, flush) |
78 | } |
79 | fn run_vec( |
80 | &mut self, |
81 | input: &[u8], |
82 | output: &mut Vec<u8>, |
83 | flush: FlushDecompress, |
84 | ) -> Result<Status, DecompressError> { |
85 | self.decompress_vec(input, output, flush) |
86 | } |
87 | } |
88 | |
/// Constructors for the three flush modes this module requests from a codec.
///
/// Implemented by each codec's flush-mode type so that generic code can ask
/// for a mode without knowing the concrete enum.
pub trait Flush {
    /// The mode used for ordinary processing while more input is expected.
    fn none() -> Self;

    /// The mode requested first by `Writer`'s `flush` implementation.
    fn sync() -> Self;

    /// The mode used once input is exhausted (`read` at EOF, `Writer::finish`).
    fn finish() -> Self;
}
94 | |
95 | impl Flush for FlushCompress { |
96 | fn none() -> Self { |
97 | FlushCompress::None |
98 | } |
99 | |
100 | fn sync() -> Self { |
101 | FlushCompress::Sync |
102 | } |
103 | |
104 | fn finish() -> Self { |
105 | FlushCompress::Finish |
106 | } |
107 | } |
108 | |
109 | impl Flush for FlushDecompress { |
110 | fn none() -> Self { |
111 | FlushDecompress::None |
112 | } |
113 | |
114 | fn sync() -> Self { |
115 | FlushDecompress::Sync |
116 | } |
117 | |
118 | fn finish() -> Self { |
119 | FlushDecompress::Finish |
120 | } |
121 | } |
122 | |
123 | pub fn read<R, D>(obj: &mut R, data: &mut D, dst: &mut [u8]) -> io::Result<usize> |
124 | where |
125 | R: BufRead, |
126 | D: Ops, |
127 | { |
128 | loop { |
129 | let (read, consumed, ret, eof); |
130 | { |
131 | let input = obj.fill_buf()?; |
132 | eof = input.is_empty(); |
133 | let before_out = data.total_out(); |
134 | let before_in = data.total_in(); |
135 | let flush = if eof { |
136 | D::Flush::finish() |
137 | } else { |
138 | D::Flush::none() |
139 | }; |
140 | ret = data.run(input, dst, flush); |
141 | read = (data.total_out() - before_out) as usize; |
142 | consumed = (data.total_in() - before_in) as usize; |
143 | } |
144 | obj.consume(consumed); |
145 | |
146 | match ret { |
147 | // If we haven't ready any data and we haven't hit EOF yet, |
148 | // then we need to keep asking for more data because if we |
149 | // return that 0 bytes of data have been read then it will |
150 | // be interpreted as EOF. |
151 | Ok(Status::Ok | Status::BufError) if read == 0 && !eof && !dst.is_empty() => continue, |
152 | Ok(Status::Ok | Status::BufError | Status::StreamEnd) => return Ok(read), |
153 | |
154 | Err(..) => { |
155 | return Err(io::Error::new( |
156 | io::ErrorKind::InvalidInput, |
157 | "corrupt deflate stream" , |
158 | )) |
159 | } |
160 | } |
161 | } |
162 | } |
163 | |
164 | impl<W: Write, D: Ops> Writer<W, D> { |
165 | pub fn new(w: W, d: D) -> Writer<W, D> { |
166 | Writer { |
167 | obj: Some(w), |
168 | data: d, |
169 | buf: Vec::with_capacity(32 * 1024), |
170 | } |
171 | } |
172 | |
173 | pub fn finish(&mut self) -> io::Result<()> { |
174 | loop { |
175 | self.dump()?; |
176 | |
177 | let before = self.data.total_out(); |
178 | self.data |
179 | .run_vec(&[], &mut self.buf, Flush::finish()) |
180 | .map_err(Into::into)?; |
181 | if before == self.data.total_out() { |
182 | return Ok(()); |
183 | } |
184 | } |
185 | } |
186 | |
187 | pub fn replace(&mut self, w: W) -> W { |
188 | self.buf.truncate(0); |
189 | mem::replace(self.get_mut(), w) |
190 | } |
191 | |
192 | pub fn get_ref(&self) -> &W { |
193 | self.obj.as_ref().unwrap() |
194 | } |
195 | |
196 | pub fn get_mut(&mut self) -> &mut W { |
197 | self.obj.as_mut().unwrap() |
198 | } |
199 | |
200 | // Note that this should only be called if the outer object is just about |
201 | // to be consumed! |
202 | // |
203 | // (e.g. an implementation of `into_inner`) |
204 | pub fn take_inner(&mut self) -> W { |
205 | self.obj.take().unwrap() |
206 | } |
207 | |
208 | pub fn is_present(&self) -> bool { |
209 | self.obj.is_some() |
210 | } |
211 | |
212 | // Returns total written bytes and status of underlying codec |
213 | pub(crate) fn write_with_status(&mut self, buf: &[u8]) -> io::Result<(usize, Status)> { |
214 | // miniz isn't guaranteed to actually write any of the buffer provided, |
215 | // it may be in a flushing mode where it's just giving us data before |
216 | // we're actually giving it any data. We don't want to spuriously return |
217 | // `Ok(0)` when possible as it will cause calls to write_all() to fail. |
218 | // As a result we execute this in a loop to ensure that we try our |
219 | // darndest to write the data. |
220 | loop { |
221 | self.dump()?; |
222 | |
223 | let before_in = self.data.total_in(); |
224 | let ret = self.data.run_vec(buf, &mut self.buf, D::Flush::none()); |
225 | let written = (self.data.total_in() - before_in) as usize; |
226 | let is_stream_end = matches!(ret, Ok(Status::StreamEnd)); |
227 | |
228 | if !buf.is_empty() && written == 0 && ret.is_ok() && !is_stream_end { |
229 | continue; |
230 | } |
231 | return match ret { |
232 | Ok(st) => match st { |
233 | Status::Ok | Status::BufError | Status::StreamEnd => Ok((written, st)), |
234 | }, |
235 | Err(..) => Err(io::Error::new( |
236 | io::ErrorKind::InvalidInput, |
237 | "corrupt deflate stream" , |
238 | )), |
239 | }; |
240 | } |
241 | } |
242 | |
243 | fn dump(&mut self) -> io::Result<()> { |
244 | // TODO: should manage this buffer not with `drain` but probably more of |
245 | // a deque-like strategy. |
246 | while !self.buf.is_empty() { |
247 | let n = self.obj.as_mut().unwrap().write(&self.buf)?; |
248 | if n == 0 { |
249 | return Err(io::ErrorKind::WriteZero.into()); |
250 | } |
251 | self.buf.drain(..n); |
252 | } |
253 | Ok(()) |
254 | } |
255 | } |
256 | |
257 | impl<W: Write, D: Ops> Write for Writer<W, D> { |
258 | fn write(&mut self, buf: &[u8]) -> io::Result<usize> { |
259 | self.write_with_status(buf).map(|res| res.0) |
260 | } |
261 | |
262 | fn flush(&mut self) -> io::Result<()> { |
263 | self.data |
264 | .run_vec(&[], &mut self.buf, Flush::sync()) |
265 | .map_err(Into::into)?; |
266 | |
267 | // Unfortunately miniz doesn't actually tell us when we're done with |
268 | // pulling out all the data from the internal stream. To remedy this we |
269 | // have to continually ask the stream for more memory until it doesn't |
270 | // give us a chunk of memory the same size as our own internal buffer, |
271 | // at which point we assume it's reached the end. |
272 | loop { |
273 | self.dump()?; |
274 | let before = self.data.total_out(); |
275 | self.data |
276 | .run_vec(&[], &mut self.buf, Flush::none()) |
277 | .map_err(Into::into)?; |
278 | if before == self.data.total_out() { |
279 | break; |
280 | } |
281 | } |
282 | |
283 | self.obj.as_mut().unwrap().flush() |
284 | } |
285 | } |
286 | |
287 | impl<W: Write, D: Ops> Drop for Writer<W, D> { |
288 | fn drop(&mut self) { |
289 | if self.obj.is_some() { |
290 | let _ = self.finish(); |
291 | } |
292 | } |
293 | } |
294 | |