| 1 | use std::io; |
| 2 | use std::io::prelude::*; |
| 3 | use std::mem; |
| 4 | |
| 5 | use crate::{ |
| 6 | Compress, CompressError, Decompress, DecompressError, FlushCompress, FlushDecompress, Status, |
| 7 | }; |
| 8 | |
| 9 | #[derive (Debug)] |
| 10 | pub struct Writer<W: Write, D: Ops> { |
| 11 | obj: Option<W>, |
| 12 | pub data: D, |
| 13 | buf: Vec<u8>, |
| 14 | } |
| 15 | |
| 16 | pub trait Ops { |
| 17 | type Error: Into<io::Error>; |
| 18 | type Flush: Flush; |
| 19 | fn total_in(&self) -> u64; |
| 20 | fn total_out(&self) -> u64; |
| 21 | fn run( |
| 22 | &mut self, |
| 23 | input: &[u8], |
| 24 | output: &mut [u8], |
| 25 | flush: Self::Flush, |
| 26 | ) -> Result<Status, Self::Error>; |
| 27 | fn run_vec( |
| 28 | &mut self, |
| 29 | input: &[u8], |
| 30 | output: &mut Vec<u8>, |
| 31 | flush: Self::Flush, |
| 32 | ) -> Result<Status, Self::Error>; |
| 33 | } |
| 34 | |
| 35 | impl Ops for Compress { |
| 36 | type Error = CompressError; |
| 37 | type Flush = FlushCompress; |
| 38 | fn total_in(&self) -> u64 { |
| 39 | self.total_in() |
| 40 | } |
| 41 | fn total_out(&self) -> u64 { |
| 42 | self.total_out() |
| 43 | } |
| 44 | fn run( |
| 45 | &mut self, |
| 46 | input: &[u8], |
| 47 | output: &mut [u8], |
| 48 | flush: FlushCompress, |
| 49 | ) -> Result<Status, CompressError> { |
| 50 | self.compress(input, output, flush) |
| 51 | } |
| 52 | fn run_vec( |
| 53 | &mut self, |
| 54 | input: &[u8], |
| 55 | output: &mut Vec<u8>, |
| 56 | flush: FlushCompress, |
| 57 | ) -> Result<Status, CompressError> { |
| 58 | self.compress_vec(input, output, flush) |
| 59 | } |
| 60 | } |
| 61 | |
| 62 | impl Ops for Decompress { |
| 63 | type Error = DecompressError; |
| 64 | type Flush = FlushDecompress; |
| 65 | fn total_in(&self) -> u64 { |
| 66 | self.total_in() |
| 67 | } |
| 68 | fn total_out(&self) -> u64 { |
| 69 | self.total_out() |
| 70 | } |
| 71 | fn run( |
| 72 | &mut self, |
| 73 | input: &[u8], |
| 74 | output: &mut [u8], |
| 75 | flush: FlushDecompress, |
| 76 | ) -> Result<Status, DecompressError> { |
| 77 | self.decompress(input, output, flush) |
| 78 | } |
| 79 | fn run_vec( |
| 80 | &mut self, |
| 81 | input: &[u8], |
| 82 | output: &mut Vec<u8>, |
| 83 | flush: FlushDecompress, |
| 84 | ) -> Result<Status, DecompressError> { |
| 85 | self.decompress_vec(input, output, flush) |
| 86 | } |
| 87 | } |
| 88 | |
/// Constructors for the three flush modes the generic I/O code in this file
/// asks a codec for.
pub trait Flush {
    /// Mode used for ordinary steps while more input is still expected.
    fn none() -> Self;

    /// Mode requested once at the start of `Writer::flush`.
    fn sync() -> Self;

    /// Mode requested by `Writer::finish` and by `read` after its source
    /// returns an empty buffer.
    fn finish() -> Self;
}
| 94 | |
| 95 | impl Flush for FlushCompress { |
| 96 | fn none() -> Self { |
| 97 | FlushCompress::None |
| 98 | } |
| 99 | |
| 100 | fn sync() -> Self { |
| 101 | FlushCompress::Sync |
| 102 | } |
| 103 | |
| 104 | fn finish() -> Self { |
| 105 | FlushCompress::Finish |
| 106 | } |
| 107 | } |
| 108 | |
| 109 | impl Flush for FlushDecompress { |
| 110 | fn none() -> Self { |
| 111 | FlushDecompress::None |
| 112 | } |
| 113 | |
| 114 | fn sync() -> Self { |
| 115 | FlushDecompress::Sync |
| 116 | } |
| 117 | |
| 118 | fn finish() -> Self { |
| 119 | FlushDecompress::Finish |
| 120 | } |
| 121 | } |
| 122 | |
| 123 | pub fn read<R, D>(obj: &mut R, data: &mut D, dst: &mut [u8]) -> io::Result<usize> |
| 124 | where |
| 125 | R: BufRead, |
| 126 | D: Ops, |
| 127 | { |
| 128 | loop { |
| 129 | let (read, consumed, ret, eof); |
| 130 | { |
| 131 | let input = obj.fill_buf()?; |
| 132 | eof = input.is_empty(); |
| 133 | let before_out = data.total_out(); |
| 134 | let before_in = data.total_in(); |
| 135 | let flush = if eof { |
| 136 | D::Flush::finish() |
| 137 | } else { |
| 138 | D::Flush::none() |
| 139 | }; |
| 140 | ret = data.run(input, dst, flush); |
| 141 | read = (data.total_out() - before_out) as usize; |
| 142 | consumed = (data.total_in() - before_in) as usize; |
| 143 | } |
| 144 | obj.consume(consumed); |
| 145 | |
| 146 | match ret { |
| 147 | // If we haven't ready any data and we haven't hit EOF yet, |
| 148 | // then we need to keep asking for more data because if we |
| 149 | // return that 0 bytes of data have been read then it will |
| 150 | // be interpreted as EOF. |
| 151 | Ok(Status::Ok | Status::BufError) if read == 0 && !eof && !dst.is_empty() => continue, |
| 152 | Ok(Status::Ok | Status::BufError | Status::StreamEnd) => return Ok(read), |
| 153 | |
| 154 | Err(..) => { |
| 155 | return Err(io::Error::new( |
| 156 | io::ErrorKind::InvalidInput, |
| 157 | "corrupt deflate stream" , |
| 158 | )) |
| 159 | } |
| 160 | } |
| 161 | } |
| 162 | } |
| 163 | |
| 164 | impl<W: Write, D: Ops> Writer<W, D> { |
| 165 | pub fn new(w: W, d: D) -> Writer<W, D> { |
| 166 | Writer { |
| 167 | obj: Some(w), |
| 168 | data: d, |
| 169 | buf: Vec::with_capacity(32 * 1024), |
| 170 | } |
| 171 | } |
| 172 | |
| 173 | pub fn finish(&mut self) -> io::Result<()> { |
| 174 | loop { |
| 175 | self.dump()?; |
| 176 | |
| 177 | let before = self.data.total_out(); |
| 178 | self.data |
| 179 | .run_vec(&[], &mut self.buf, Flush::finish()) |
| 180 | .map_err(Into::into)?; |
| 181 | if before == self.data.total_out() { |
| 182 | return Ok(()); |
| 183 | } |
| 184 | } |
| 185 | } |
| 186 | |
| 187 | pub fn replace(&mut self, w: W) -> W { |
| 188 | self.buf.truncate(0); |
| 189 | mem::replace(self.get_mut(), w) |
| 190 | } |
| 191 | |
| 192 | pub fn get_ref(&self) -> &W { |
| 193 | self.obj.as_ref().unwrap() |
| 194 | } |
| 195 | |
| 196 | pub fn get_mut(&mut self) -> &mut W { |
| 197 | self.obj.as_mut().unwrap() |
| 198 | } |
| 199 | |
| 200 | // Note that this should only be called if the outer object is just about |
| 201 | // to be consumed! |
| 202 | // |
| 203 | // (e.g. an implementation of `into_inner`) |
| 204 | pub fn take_inner(&mut self) -> W { |
| 205 | self.obj.take().unwrap() |
| 206 | } |
| 207 | |
| 208 | pub fn is_present(&self) -> bool { |
| 209 | self.obj.is_some() |
| 210 | } |
| 211 | |
| 212 | // Returns total written bytes and status of underlying codec |
| 213 | pub(crate) fn write_with_status(&mut self, buf: &[u8]) -> io::Result<(usize, Status)> { |
| 214 | // miniz isn't guaranteed to actually write any of the buffer provided, |
| 215 | // it may be in a flushing mode where it's just giving us data before |
| 216 | // we're actually giving it any data. We don't want to spuriously return |
| 217 | // `Ok(0)` when possible as it will cause calls to write_all() to fail. |
| 218 | // As a result we execute this in a loop to ensure that we try our |
| 219 | // darndest to write the data. |
| 220 | loop { |
| 221 | self.dump()?; |
| 222 | |
| 223 | let before_in = self.data.total_in(); |
| 224 | let ret = self.data.run_vec(buf, &mut self.buf, D::Flush::none()); |
| 225 | let written = (self.data.total_in() - before_in) as usize; |
| 226 | let is_stream_end = matches!(ret, Ok(Status::StreamEnd)); |
| 227 | |
| 228 | if !buf.is_empty() && written == 0 && ret.is_ok() && !is_stream_end { |
| 229 | continue; |
| 230 | } |
| 231 | return match ret { |
| 232 | Ok(st) => match st { |
| 233 | Status::Ok | Status::BufError | Status::StreamEnd => Ok((written, st)), |
| 234 | }, |
| 235 | Err(..) => Err(io::Error::new( |
| 236 | io::ErrorKind::InvalidInput, |
| 237 | "corrupt deflate stream" , |
| 238 | )), |
| 239 | }; |
| 240 | } |
| 241 | } |
| 242 | |
| 243 | fn dump(&mut self) -> io::Result<()> { |
| 244 | // TODO: should manage this buffer not with `drain` but probably more of |
| 245 | // a deque-like strategy. |
| 246 | while !self.buf.is_empty() { |
| 247 | let n = self.obj.as_mut().unwrap().write(&self.buf)?; |
| 248 | if n == 0 { |
| 249 | return Err(io::ErrorKind::WriteZero.into()); |
| 250 | } |
| 251 | self.buf.drain(..n); |
| 252 | } |
| 253 | Ok(()) |
| 254 | } |
| 255 | } |
| 256 | |
| 257 | impl<W: Write, D: Ops> Write for Writer<W, D> { |
| 258 | fn write(&mut self, buf: &[u8]) -> io::Result<usize> { |
| 259 | self.write_with_status(buf).map(|res| res.0) |
| 260 | } |
| 261 | |
| 262 | fn flush(&mut self) -> io::Result<()> { |
| 263 | self.data |
| 264 | .run_vec(&[], &mut self.buf, Flush::sync()) |
| 265 | .map_err(Into::into)?; |
| 266 | |
| 267 | // Unfortunately miniz doesn't actually tell us when we're done with |
| 268 | // pulling out all the data from the internal stream. To remedy this we |
| 269 | // have to continually ask the stream for more memory until it doesn't |
| 270 | // give us a chunk of memory the same size as our own internal buffer, |
| 271 | // at which point we assume it's reached the end. |
| 272 | loop { |
| 273 | self.dump()?; |
| 274 | let before = self.data.total_out(); |
| 275 | self.data |
| 276 | .run_vec(&[], &mut self.buf, Flush::none()) |
| 277 | .map_err(Into::into)?; |
| 278 | if before == self.data.total_out() { |
| 279 | break; |
| 280 | } |
| 281 | } |
| 282 | |
| 283 | self.obj.as_mut().unwrap().flush() |
| 284 | } |
| 285 | } |
| 286 | |
| 287 | impl<W: Write, D: Ops> Drop for Writer<W, D> { |
| 288 | fn drop(&mut self) { |
| 289 | if self.obj.is_some() { |
| 290 | let _ = self.finish(); |
| 291 | } |
| 292 | } |
| 293 | } |
| 294 | |