1use core::borrow::BorrowMut;
2
3use crate::frame_decoder::{BlockDecodingStrategy, FrameDecoder, FrameDecoderError};
4use crate::io::{Error, ErrorKind, Read};
5
/// High level decoder that implements `io::Read`, so it can be used with
/// `io::Read::read_to_end` / `io::Read::read_exact` or passed to another library / module as a source for the decoded content.
///
/// The lower level `FrameDecoder`, by comparison, allows for finer grained control but needs to have its `decode_blocks` method called continuously
/// to decode the zstd-frame.
pub struct StreamingDecoder<READ: Read, DEC: BorrowMut<FrameDecoder>> {
    /// The frame decoder (owned, or anything that mutably borrows one) that performs the decoding.
    pub decoder: DEC,
    /// Reader that is handed to the decoder whenever it is initialised or asked to decode more blocks.
    source: READ,
}
15
16impl<READ: Read, DEC: BorrowMut<FrameDecoder>> StreamingDecoder<READ, DEC> {
17 pub fn new_with_decoder(
18 mut source: READ,
19 mut decoder: DEC,
20 ) -> Result<StreamingDecoder<READ, DEC>, FrameDecoderError> {
21 decoder.borrow_mut().init(&mut source)?;
22 Ok(StreamingDecoder { decoder, source })
23 }
24}
25
26impl<READ: Read> StreamingDecoder<READ, FrameDecoder> {
27 pub fn new(
28 mut source: READ,
29 ) -> Result<StreamingDecoder<READ, FrameDecoder>, FrameDecoderError> {
30 let mut decoder: FrameDecoder = FrameDecoder::new();
31 decoder.init(&mut source)?;
32 Ok(StreamingDecoder { decoder, source })
33 }
34
35 pub fn inner(self) -> FrameDecoder {
36 self.decoder
37 }
38}
39
40impl<READ: Read, DEC: BorrowMut<FrameDecoder>> Read for StreamingDecoder<READ, DEC> {
41 fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
42 let decoder = self.decoder.borrow_mut();
43 if decoder.is_finished() && decoder.can_collect() == 0 {
44 //No more bytes can ever be decoded
45 return Ok(0);
46 }
47
48 // need to loop. The UpToBytes strategy doesn't take any effort to actually reach that limit.
49 // The first few calls can result in just filling the decode buffer but these bytes can not be collected.
50 // So we need to call this until we can actually collect enough bytes
51
52 // TODO add BlockDecodingStrategy::UntilCollectable(usize) that pushes this logic into the decode_blocks function
53 while decoder.can_collect() < buf.len() && !decoder.is_finished() {
54 //More bytes can be decoded
55 let additional_bytes_needed = buf.len() - decoder.can_collect();
56 match decoder.decode_blocks(
57 &mut self.source,
58 BlockDecodingStrategy::UptoBytes(additional_bytes_needed),
59 ) {
60 Ok(_) => { /*Nothing to do*/ }
61 Err(e) => {
62 let err;
63 #[cfg(feature = "std")]
64 {
65 err = Error::new(ErrorKind::Other, e);
66 }
67 #[cfg(not(feature = "std"))]
68 {
69 err = Error::new(ErrorKind::Other, alloc::boxed::Box::new(e));
70 }
71 return Err(err);
72 }
73 }
74 }
75
76 decoder.read(buf)
77 }
78}
79