| 1 | use std::io; |
| 2 | use std::io::prelude::*; |
| 3 | use std::mem; |
| 4 | |
| 5 | use crate::{Compress, Decompress, DecompressError, FlushCompress, FlushDecompress, Status}; |
| 6 | |
| 7 | #[derive (Debug)] |
| 8 | pub struct Writer<W: Write, D: Ops> { |
| 9 | obj: Option<W>, |
| 10 | pub data: D, |
| 11 | buf: Vec<u8>, |
| 12 | } |
| 13 | |
| 14 | pub trait Ops { |
| 15 | type Flush: Flush; |
| 16 | fn total_in(&self) -> u64; |
| 17 | fn total_out(&self) -> u64; |
| 18 | fn run( |
| 19 | &mut self, |
| 20 | input: &[u8], |
| 21 | output: &mut [u8], |
| 22 | flush: Self::Flush, |
| 23 | ) -> Result<Status, DecompressError>; |
| 24 | fn run_vec( |
| 25 | &mut self, |
| 26 | input: &[u8], |
| 27 | output: &mut Vec<u8>, |
| 28 | flush: Self::Flush, |
| 29 | ) -> Result<Status, DecompressError>; |
| 30 | } |
| 31 | |
| 32 | impl Ops for Compress { |
| 33 | type Flush = FlushCompress; |
| 34 | fn total_in(&self) -> u64 { |
| 35 | self.total_in() |
| 36 | } |
| 37 | fn total_out(&self) -> u64 { |
| 38 | self.total_out() |
| 39 | } |
| 40 | fn run( |
| 41 | &mut self, |
| 42 | input: &[u8], |
| 43 | output: &mut [u8], |
| 44 | flush: FlushCompress, |
| 45 | ) -> Result<Status, DecompressError> { |
| 46 | Ok(self.compress(input, output, flush).unwrap()) |
| 47 | } |
| 48 | fn run_vec( |
| 49 | &mut self, |
| 50 | input: &[u8], |
| 51 | output: &mut Vec<u8>, |
| 52 | flush: FlushCompress, |
| 53 | ) -> Result<Status, DecompressError> { |
| 54 | Ok(self.compress_vec(input, output, flush).unwrap()) |
| 55 | } |
| 56 | } |
| 57 | |
| 58 | impl Ops for Decompress { |
| 59 | type Flush = FlushDecompress; |
| 60 | fn total_in(&self) -> u64 { |
| 61 | self.total_in() |
| 62 | } |
| 63 | fn total_out(&self) -> u64 { |
| 64 | self.total_out() |
| 65 | } |
| 66 | fn run( |
| 67 | &mut self, |
| 68 | input: &[u8], |
| 69 | output: &mut [u8], |
| 70 | flush: FlushDecompress, |
| 71 | ) -> Result<Status, DecompressError> { |
| 72 | self.decompress(input, output, flush) |
| 73 | } |
| 74 | fn run_vec( |
| 75 | &mut self, |
| 76 | input: &[u8], |
| 77 | output: &mut Vec<u8>, |
| 78 | flush: FlushDecompress, |
| 79 | ) -> Result<Status, DecompressError> { |
| 80 | self.decompress_vec(input, output, flush) |
| 81 | } |
| 82 | } |
| 83 | |
/// Constructors for the three flush modes the buffered helpers need, so they
/// can be written generically over compress and decompress flush types.
pub trait Flush {
    /// Mode used while more input is still expected.
    fn none() -> Self;
    /// Mode used by `Write::flush` to push pending output out.
    fn sync() -> Self;
    /// Mode used once the input is exhausted / the stream is being finished.
    fn finish() -> Self;
}
| 89 | |
| 90 | impl Flush for FlushCompress { |
| 91 | fn none() -> Self { |
| 92 | FlushCompress::None |
| 93 | } |
| 94 | |
| 95 | fn sync() -> Self { |
| 96 | FlushCompress::Sync |
| 97 | } |
| 98 | |
| 99 | fn finish() -> Self { |
| 100 | FlushCompress::Finish |
| 101 | } |
| 102 | } |
| 103 | |
| 104 | impl Flush for FlushDecompress { |
| 105 | fn none() -> Self { |
| 106 | FlushDecompress::None |
| 107 | } |
| 108 | |
| 109 | fn sync() -> Self { |
| 110 | FlushDecompress::Sync |
| 111 | } |
| 112 | |
| 113 | fn finish() -> Self { |
| 114 | FlushDecompress::Finish |
| 115 | } |
| 116 | } |
| 117 | |
| 118 | pub fn read<R, D>(obj: &mut R, data: &mut D, dst: &mut [u8]) -> io::Result<usize> |
| 119 | where |
| 120 | R: BufRead, |
| 121 | D: Ops, |
| 122 | { |
| 123 | loop { |
| 124 | let (read, consumed, ret, eof); |
| 125 | { |
| 126 | let input = obj.fill_buf()?; |
| 127 | eof = input.is_empty(); |
| 128 | let before_out = data.total_out(); |
| 129 | let before_in = data.total_in(); |
| 130 | let flush = if eof { |
| 131 | D::Flush::finish() |
| 132 | } else { |
| 133 | D::Flush::none() |
| 134 | }; |
| 135 | ret = data.run(input, dst, flush); |
| 136 | read = (data.total_out() - before_out) as usize; |
| 137 | consumed = (data.total_in() - before_in) as usize; |
| 138 | } |
| 139 | obj.consume(consumed); |
| 140 | |
| 141 | match ret { |
| 142 | // If we haven't ready any data and we haven't hit EOF yet, |
| 143 | // then we need to keep asking for more data because if we |
| 144 | // return that 0 bytes of data have been read then it will |
| 145 | // be interpreted as EOF. |
| 146 | Ok(Status::Ok | Status::BufError) if read == 0 && !eof && !dst.is_empty() => continue, |
| 147 | Ok(Status::Ok | Status::BufError | Status::StreamEnd) => return Ok(read), |
| 148 | |
| 149 | Err(..) => { |
| 150 | return Err(io::Error::new( |
| 151 | io::ErrorKind::InvalidInput, |
| 152 | "corrupt deflate stream" , |
| 153 | )) |
| 154 | } |
| 155 | } |
| 156 | } |
| 157 | } |
| 158 | |
| 159 | impl<W: Write, D: Ops> Writer<W, D> { |
| 160 | pub fn new(w: W, d: D) -> Writer<W, D> { |
| 161 | Writer { |
| 162 | obj: Some(w), |
| 163 | data: d, |
| 164 | buf: Vec::with_capacity(32 * 1024), |
| 165 | } |
| 166 | } |
| 167 | |
| 168 | pub fn finish(&mut self) -> io::Result<()> { |
| 169 | loop { |
| 170 | self.dump()?; |
| 171 | |
| 172 | let before = self.data.total_out(); |
| 173 | self.data.run_vec(&[], &mut self.buf, D::Flush::finish())?; |
| 174 | if before == self.data.total_out() { |
| 175 | return Ok(()); |
| 176 | } |
| 177 | } |
| 178 | } |
| 179 | |
| 180 | pub fn replace(&mut self, w: W) -> W { |
| 181 | self.buf.truncate(0); |
| 182 | mem::replace(self.get_mut(), w) |
| 183 | } |
| 184 | |
| 185 | pub fn get_ref(&self) -> &W { |
| 186 | self.obj.as_ref().unwrap() |
| 187 | } |
| 188 | |
| 189 | pub fn get_mut(&mut self) -> &mut W { |
| 190 | self.obj.as_mut().unwrap() |
| 191 | } |
| 192 | |
| 193 | // Note that this should only be called if the outer object is just about |
| 194 | // to be consumed! |
| 195 | // |
| 196 | // (e.g. an implementation of `into_inner`) |
| 197 | pub fn take_inner(&mut self) -> W { |
| 198 | self.obj.take().unwrap() |
| 199 | } |
| 200 | |
| 201 | pub fn is_present(&self) -> bool { |
| 202 | self.obj.is_some() |
| 203 | } |
| 204 | |
| 205 | // Returns total written bytes and status of underlying codec |
| 206 | pub(crate) fn write_with_status(&mut self, buf: &[u8]) -> io::Result<(usize, Status)> { |
| 207 | // miniz isn't guaranteed to actually write any of the buffer provided, |
| 208 | // it may be in a flushing mode where it's just giving us data before |
| 209 | // we're actually giving it any data. We don't want to spuriously return |
| 210 | // `Ok(0)` when possible as it will cause calls to write_all() to fail. |
| 211 | // As a result we execute this in a loop to ensure that we try our |
| 212 | // darndest to write the data. |
| 213 | loop { |
| 214 | self.dump()?; |
| 215 | |
| 216 | let before_in = self.data.total_in(); |
| 217 | let ret = self.data.run_vec(buf, &mut self.buf, D::Flush::none()); |
| 218 | let written = (self.data.total_in() - before_in) as usize; |
| 219 | let is_stream_end = matches!(ret, Ok(Status::StreamEnd)); |
| 220 | |
| 221 | if !buf.is_empty() && written == 0 && ret.is_ok() && !is_stream_end { |
| 222 | continue; |
| 223 | } |
| 224 | return match ret { |
| 225 | Ok(st) => match st { |
| 226 | Status::Ok | Status::BufError | Status::StreamEnd => Ok((written, st)), |
| 227 | }, |
| 228 | Err(..) => Err(io::Error::new( |
| 229 | io::ErrorKind::InvalidInput, |
| 230 | "corrupt deflate stream" , |
| 231 | )), |
| 232 | }; |
| 233 | } |
| 234 | } |
| 235 | |
| 236 | fn dump(&mut self) -> io::Result<()> { |
| 237 | // TODO: should manage this buffer not with `drain` but probably more of |
| 238 | // a deque-like strategy. |
| 239 | while !self.buf.is_empty() { |
| 240 | let n = self.obj.as_mut().unwrap().write(&self.buf)?; |
| 241 | if n == 0 { |
| 242 | return Err(io::ErrorKind::WriteZero.into()); |
| 243 | } |
| 244 | self.buf.drain(..n); |
| 245 | } |
| 246 | Ok(()) |
| 247 | } |
| 248 | } |
| 249 | |
| 250 | impl<W: Write, D: Ops> Write for Writer<W, D> { |
| 251 | fn write(&mut self, buf: &[u8]) -> io::Result<usize> { |
| 252 | self.write_with_status(buf).map(|res| res.0) |
| 253 | } |
| 254 | |
| 255 | fn flush(&mut self) -> io::Result<()> { |
| 256 | self.data |
| 257 | .run_vec(&[], &mut self.buf, D::Flush::sync()) |
| 258 | .unwrap(); |
| 259 | |
| 260 | // Unfortunately miniz doesn't actually tell us when we're done with |
| 261 | // pulling out all the data from the internal stream. To remedy this we |
| 262 | // have to continually ask the stream for more memory until it doesn't |
| 263 | // give us a chunk of memory the same size as our own internal buffer, |
| 264 | // at which point we assume it's reached the end. |
| 265 | loop { |
| 266 | self.dump()?; |
| 267 | let before = self.data.total_out(); |
| 268 | self.data |
| 269 | .run_vec(&[], &mut self.buf, D::Flush::none()) |
| 270 | .unwrap(); |
| 271 | if before == self.data.total_out() { |
| 272 | break; |
| 273 | } |
| 274 | } |
| 275 | |
| 276 | self.obj.as_mut().unwrap().flush() |
| 277 | } |
| 278 | } |
| 279 | |
| 280 | impl<W: Write, D: Ops> Drop for Writer<W, D> { |
| 281 | fn drop(&mut self) { |
| 282 | if self.obj.is_some() { |
| 283 | let _ = self.finish(); |
| 284 | } |
| 285 | } |
| 286 | } |
| 287 | |