1 | //! Composable structures to handle reading an image.
|
2 |
|
3 |
|
4 | use std::convert::TryFrom;
|
5 | use std::fmt::Debug;
|
6 | use std::io::{Read, Seek};
|
7 | use rayon_core::{ThreadPool, ThreadPoolBuildError};
|
8 |
|
9 | use smallvec::alloc::sync::Arc;
|
10 |
|
11 | use crate::block::{BlockIndex, UncompressedBlock};
|
12 | use crate::block::chunk::{Chunk, TileCoordinates};
|
13 | use crate::compression::Compression;
|
14 | use crate::error::{Error, Result, u64_to_usize, UnitResult};
|
15 | use crate::io::{PeekRead, Tracking};
|
16 | use crate::meta::{MetaData, OffsetTables};
|
17 | use crate::meta::header::Header;
|
18 |
|
19 | /// Decode the meta data from a byte source, keeping the source ready for further reading.
|
20 | /// Continue decoding the remaining bytes by calling `filtered_chunks` or `all_chunks`.
|
21 | #[derive (Debug)]
|
22 | pub struct Reader<R> {
|
23 | meta_data: MetaData,
|
24 | remaining_reader: PeekRead<Tracking<R>>, // TODO does R need to be Seek or is Tracking enough?
|
25 | }
|
26 |
|
27 | impl<R: Read + Seek> Reader<R> {
|
28 |
|
29 | /// Start the reading process.
|
30 | /// Immediately decodes the meta data into an internal field.
|
31 | /// Access it via`meta_data()`.
|
32 | pub fn read_from_buffered(read: R, pedantic: bool) -> Result<Self> {
|
33 | let mut remaining_reader = PeekRead::new(Tracking::new(read));
|
34 | let meta_data = MetaData::read_validated_from_buffered_peekable(&mut remaining_reader, pedantic)?;
|
35 | Ok(Self { meta_data, remaining_reader })
|
36 | }
|
37 |
|
38 | // must not be mutable, as reading the file later on relies on the meta data
|
39 | /// The decoded exr meta data from the file.
|
40 | pub fn meta_data(&self) -> &MetaData { &self.meta_data }
|
41 |
|
42 | /// The decoded exr meta data from the file.
|
43 | pub fn headers(&self) -> &[Header] { &self.meta_data.headers }
|
44 |
|
45 | /// Obtain the meta data ownership.
|
46 | pub fn into_meta_data(self) -> MetaData { self.meta_data }
|
47 |
|
48 | /// Prepare to read all the chunks from the file.
|
49 | /// Does not decode the chunks now, but returns a decoder.
|
50 | /// Reading all chunks reduces seeking the file, but some chunks might be read without being used.
|
51 | pub fn all_chunks(mut self, pedantic: bool) -> Result<AllChunksReader<R>> {
|
52 | let total_chunk_count = {
|
53 | if pedantic {
|
54 | let offset_tables = MetaData::read_offset_tables(&mut self.remaining_reader, &self.meta_data.headers)?;
|
55 | validate_offset_tables(self.meta_data.headers.as_slice(), &offset_tables, self.remaining_reader.byte_position())?;
|
56 | offset_tables.iter().map(|table| table.len()).sum()
|
57 | }
|
58 | else {
|
59 | usize::try_from(MetaData::skip_offset_tables(&mut self.remaining_reader, &self.meta_data.headers)?)
|
60 | .expect("too large chunk count for this machine" )
|
61 | }
|
62 | };
|
63 |
|
64 | Ok(AllChunksReader {
|
65 | meta_data: self.meta_data,
|
66 | remaining_chunks: 0 .. total_chunk_count,
|
67 | remaining_bytes: self.remaining_reader,
|
68 | pedantic
|
69 | })
|
70 | }
|
71 |
|
72 | /// Prepare to read some the chunks from the file.
|
73 | /// Does not decode the chunks now, but returns a decoder.
|
74 | /// Reading only some chunks may seeking the file, potentially skipping many bytes.
|
75 | // TODO tile indices add no new information to block index??
|
76 | pub fn filter_chunks(mut self, pedantic: bool, mut filter: impl FnMut(&MetaData, TileCoordinates, BlockIndex) -> bool) -> Result<FilteredChunksReader<R>> {
|
77 | let offset_tables = MetaData::read_offset_tables(&mut self.remaining_reader, &self.meta_data.headers)?;
|
78 |
|
79 | // TODO regardless of pedantic, if invalid, read all chunks instead, and filter after reading each chunk?
|
80 | if pedantic {
|
81 | validate_offset_tables(
|
82 | self.meta_data.headers.as_slice(), &offset_tables,
|
83 | self.remaining_reader.byte_position()
|
84 | )?;
|
85 | }
|
86 |
|
87 | let mut filtered_offsets = Vec::with_capacity(
|
88 | (self.meta_data.headers.len() * 32).min(2*2048)
|
89 | );
|
90 |
|
91 | // TODO detect whether the filter actually would skip chunks, and aviod sorting etc when not filtering is applied
|
92 |
|
93 | for (header_index, header) in self.meta_data.headers.iter().enumerate() { // offset tables are stored same order as headers
|
94 | for (block_index, tile) in header.blocks_increasing_y_order().enumerate() { // in increasing_y order
|
95 | let data_indices = header.get_absolute_block_pixel_coordinates(tile.location)?;
|
96 |
|
97 | let block = BlockIndex {
|
98 | layer: header_index,
|
99 | level: tile.location.level_index,
|
100 | pixel_position: data_indices.position.to_usize("data indices start" )?,
|
101 | pixel_size: data_indices.size,
|
102 | };
|
103 |
|
104 | if filter(&self.meta_data, tile.location, block) {
|
105 | filtered_offsets.push(offset_tables[header_index][block_index]) // safe indexing from `enumerate()`
|
106 | }
|
107 | };
|
108 | }
|
109 |
|
110 | filtered_offsets.sort_unstable(); // enables reading continuously if possible (already sorted where line order increasing)
|
111 |
|
112 | if pedantic {
|
113 | // table is sorted. if any two neighbours are equal, we have duplicates. this is invalid.
|
114 | if filtered_offsets.windows(2).any(|pair| pair[0] == pair[1]) {
|
115 | return Err(Error::invalid("chunk offset table" ))
|
116 | }
|
117 | }
|
118 |
|
119 | Ok(FilteredChunksReader {
|
120 | meta_data: self.meta_data,
|
121 | expected_filtered_chunk_count: filtered_offsets.len(),
|
122 | remaining_filtered_chunk_indices: filtered_offsets.into_iter(),
|
123 | remaining_bytes: self.remaining_reader
|
124 | })
|
125 | }
|
126 | }
|
127 |
|
128 |
|
129 | fn validate_offset_tables(headers: &[Header], offset_tables: &OffsetTables, chunks_start_byte: usize) -> UnitResult {
|
130 | let max_pixel_bytes: usize = headersimpl Iterator .iter() // when compressed, chunks are smaller, but never larger than max
|
131 | .map(|header: &Header| header.max_pixel_file_bytes())
|
132 | .sum();
|
133 |
|
134 | // check that each offset is within the bounds
|
135 | let end_byte: usize = chunks_start_byte + max_pixel_bytes;
|
136 | let is_invalid: bool = offset_tablesimpl Iterator .iter().flatten().map(|&u64: u64| u64_to_usize(u64))
|
137 | .any(|chunk_start: usize| chunk_start < chunks_start_byte || chunk_start > end_byte);
|
138 |
|
139 | if is_invalid { Err(Error::invalid(message:"offset table" )) }
|
140 | else { Ok(()) }
|
141 | }
|
142 |
|
143 |
|
144 |
|
145 |
|
146 | /// Decode the desired chunks and skip the unimportant chunks in the file.
|
147 | /// The decoded chunks can be decompressed by calling
|
148 | /// `decompress_parallel`, `decompress_sequential`, or `sequential_decompressor` or `parallel_decompressor`.
|
149 | /// Call `on_progress` to have a callback with each block.
|
150 | /// Also contains the image meta data.
|
151 | #[derive (Debug)]
|
152 | pub struct FilteredChunksReader<R> {
|
153 | meta_data: MetaData,
|
154 | expected_filtered_chunk_count: usize,
|
155 | remaining_filtered_chunk_indices: std::vec::IntoIter<u64>,
|
156 | remaining_bytes: PeekRead<Tracking<R>>,
|
157 | }
|
158 |
|
159 | /// Decode all chunks in the file without seeking.
|
160 | /// The decoded chunks can be decompressed by calling
|
161 | /// `decompress_parallel`, `decompress_sequential`, or `sequential_decompressor` or `parallel_decompressor`.
|
162 | /// Call `on_progress` to have a callback with each block.
|
163 | /// Also contains the image meta data.
|
164 | #[derive (Debug)]
|
165 | pub struct AllChunksReader<R> {
|
166 | meta_data: MetaData,
|
167 | remaining_chunks: std::ops::Range<usize>,
|
168 | remaining_bytes: PeekRead<Tracking<R>>,
|
169 | pedantic: bool,
|
170 | }
|
171 |
|
/// Wraps another chunks reader and reports the progress while reading.
/// Calls the supplied closure for each chunk.
/// The chunks can be decompressed by calling
/// `decompress_parallel`, `decompress_sequential`, or `sequential_decompressor`.
/// Also contains the image meta data.
#[derive(Debug)]
pub struct OnProgressChunksReader<R, F> {
    /// The wrapped reader that actually produces the chunks.
    chunks_reader: R,

    /// The number of chunks that have been yielded so far.
    decoded_chunks: usize,

    /// The closure that receives the progress.
    callback: F,
}
|
183 |
|
184 | /// Decode chunks in the file.
|
185 | /// The decoded chunks can be decompressed by calling
|
186 | /// `decompress_parallel`, `decompress_sequential`, or `sequential_decompressor`.
|
187 | /// Call `on_progress` to have a callback with each block.
|
188 | /// Also contains the image meta data.
|
189 | pub trait ChunksReader: Sized + Iterator<Item=Result<Chunk>> + ExactSizeIterator {
|
190 |
|
191 | /// The decoded exr meta data from the file.
|
192 | fn meta_data(&self) -> &MetaData;
|
193 |
|
194 | /// The decoded exr headers from the file.
|
195 | fn headers(&self) -> &[Header] { &self.meta_data().headers }
|
196 |
|
197 | /// The number of chunks that this reader will return in total.
|
198 | /// Can be less than the total number of chunks in the file, if some chunks are skipped.
|
199 | fn expected_chunk_count(&self) -> usize;
|
200 |
|
201 | /// Read the next compressed chunk from the file.
|
202 | /// Equivalent to `.next()`, as this also is an iterator.
|
203 | /// Returns `None` if all chunks have been read.
|
204 | fn read_next_chunk(&mut self) -> Option<Result<Chunk>> { self.next() }
|
205 |
|
206 | /// Create a new reader that calls the provided progress
|
207 | /// callback for each chunk that is read from the file.
|
208 | /// If the file can be successfully decoded,
|
209 | /// the progress will always at least once include 0.0 at the start and 1.0 at the end.
|
210 | fn on_progress<F>(self, on_progress: F) -> OnProgressChunksReader<Self, F> where F: FnMut(f64) {
|
211 | OnProgressChunksReader { chunks_reader: self, callback: on_progress, decoded_chunks: 0 }
|
212 | }
|
213 |
|
214 | /// Decompress all blocks in the file, using multiple cpu cores, and call the supplied closure for each block.
|
215 | /// The order of the blocks is not deterministic.
|
216 | /// You can also use `parallel_decompressor` to obtain an iterator instead.
|
217 | /// Will fallback to sequential processing where threads are not available, or where it would not speed up the process.
|
218 | // FIXME try async + futures instead of rayon! Maybe even allows for external async decoding? (-> impl Stream<UncompressedBlock>)
|
219 | fn decompress_parallel(
|
220 | self, pedantic: bool,
|
221 | mut insert_block: impl FnMut(&MetaData, UncompressedBlock) -> UnitResult
|
222 | ) -> UnitResult
|
223 | {
|
224 | let mut decompressor = match self.parallel_decompressor(pedantic) {
|
225 | Err(old_self) => return old_self.decompress_sequential(pedantic, insert_block),
|
226 | Ok(decompressor) => decompressor,
|
227 | };
|
228 |
|
229 | while let Some(block) = decompressor.next() {
|
230 | insert_block(decompressor.meta_data(), block?)?;
|
231 | }
|
232 |
|
233 | debug_assert_eq!(decompressor.len(), 0, "compressed blocks left after decompressing all blocks" );
|
234 | Ok(())
|
235 | }
|
236 |
|
237 | /// Return an iterator that decompresses the chunks with multiple threads.
|
238 | /// The order of the blocks is not deterministic.
|
239 | /// Use `ParallelBlockDecompressor::new` if you want to use your own thread pool.
|
240 | /// By default, this uses as many threads as there are CPUs.
|
241 | /// Returns the `self` if there is no need for parallel decompression.
|
242 | fn parallel_decompressor(self, pedantic: bool) -> std::result::Result<ParallelBlockDecompressor<Self>, Self> {
|
243 | ParallelBlockDecompressor::new(self, pedantic)
|
244 | }
|
245 |
|
246 | /// Return an iterator that decompresses the chunks in this thread.
|
247 | /// You can alternatively use `sequential_decompressor` if you prefer an external iterator.
|
248 | fn decompress_sequential(
|
249 | self, pedantic: bool,
|
250 | mut insert_block: impl FnMut(&MetaData, UncompressedBlock) -> UnitResult
|
251 | ) -> UnitResult
|
252 | {
|
253 | let mut decompressor = self.sequential_decompressor(pedantic);
|
254 | while let Some(block) = decompressor.next() {
|
255 | insert_block(decompressor.meta_data(), block?)?;
|
256 | }
|
257 |
|
258 | debug_assert_eq!(decompressor.len(), 0, "compressed blocks left after decompressing all blocks" );
|
259 | Ok(())
|
260 | }
|
261 |
|
262 | /// Prepare reading the chunks sequentially, only a single thread, but with less memory overhead.
|
263 | fn sequential_decompressor(self, pedantic: bool) -> SequentialBlockDecompressor<Self> {
|
264 | SequentialBlockDecompressor { remaining_chunks_reader: self, pedantic }
|
265 | }
|
266 | }
|
267 |
|
268 | impl<R, F> ChunksReader for OnProgressChunksReader<R, F> where R: ChunksReader, F: FnMut(f64) {
|
269 | fn meta_data(&self) -> &MetaData { self.chunks_reader.meta_data() }
|
270 | fn expected_chunk_count(&self) -> usize { self.chunks_reader.expected_chunk_count() }
|
271 | }
|
272 |
|
273 | impl<R, F> ExactSizeIterator for OnProgressChunksReader<R, F> where R: ChunksReader, F: FnMut(f64) {}
|
274 | impl<R, F> Iterator for OnProgressChunksReader<R, F> where R: ChunksReader, F: FnMut(f64) {
|
275 | type Item = Result<Chunk>;
|
276 |
|
277 | fn next(&mut self) -> Option<Self::Item> {
|
278 | self.chunks_reader.next().map(|item|{
|
279 | {
|
280 | let total_chunks = self.expected_chunk_count() as f64;
|
281 | let callback = &mut self.callback;
|
282 | callback(self.decoded_chunks as f64 / total_chunks);
|
283 | }
|
284 |
|
285 | self.decoded_chunks += 1;
|
286 | item
|
287 | })
|
288 | .or_else(||{
|
289 | debug_assert_eq!(
|
290 | self.decoded_chunks, self.expected_chunk_count(),
|
291 | "chunks reader finished but not all chunks are decompressed"
|
292 | );
|
293 |
|
294 | let callback = &mut self.callback;
|
295 | callback(1.0);
|
296 | None
|
297 | })
|
298 | }
|
299 |
|
300 | fn size_hint(&self) -> (usize, Option<usize>) {
|
301 | self.chunks_reader.size_hint()
|
302 | }
|
303 | }
|
304 |
|
305 | impl<R: Read + Seek> ChunksReader for AllChunksReader<R> {
|
306 | fn meta_data(&self) -> &MetaData { &self.meta_data }
|
307 | fn expected_chunk_count(&self) -> usize { self.remaining_chunks.end }
|
308 | }
|
309 |
|
310 | impl<R: Read + Seek> ExactSizeIterator for AllChunksReader<R> {}
|
311 | impl<R: Read + Seek> Iterator for AllChunksReader<R> {
|
312 | type Item = Result<Chunk>;
|
313 |
|
314 | fn next(&mut self) -> Option<Self::Item> {
|
315 | // read as many chunks as the file should contain (inferred from meta data)
|
316 | let next_chunk: Option> = self.remaining_chunks.next()
|
317 | .map(|_| Chunk::read(&mut self.remaining_bytes, &self.meta_data));
|
318 |
|
319 | // if no chunks are left, but some bytes remain, return error
|
320 | if self.pedantic && next_chunk.is_none() && self.remaining_bytes.peek_u8().is_ok() {
|
321 | return Some(Err(Error::invalid(message:"end of file expected" )));
|
322 | }
|
323 |
|
324 | next_chunk
|
325 | }
|
326 |
|
327 | fn size_hint(&self) -> (usize, Option<usize>) {
|
328 | (self.remaining_chunks.len(), Some(self.remaining_chunks.len()))
|
329 | }
|
330 | }
|
331 |
|
332 | impl<R: Read + Seek> ChunksReader for FilteredChunksReader<R> {
|
333 | fn meta_data(&self) -> &MetaData { &self.meta_data }
|
334 | fn expected_chunk_count(&self) -> usize { self.expected_filtered_chunk_count }
|
335 | }
|
336 |
|
337 | impl<R: Read + Seek> ExactSizeIterator for FilteredChunksReader<R> {}
|
338 | impl<R: Read + Seek> Iterator for FilteredChunksReader<R> {
|
339 | type Item = Result<Chunk>;
|
340 |
|
341 | fn next(&mut self) -> Option<Self::Item> {
|
342 | // read as many chunks as we have desired chunk offsets
|
343 | self.remaining_filtered_chunk_indices.next().map(|next_chunk_location: u64|{
|
344 | self.remaining_bytes.skip_to( // no-op for seek at current position, uses skip_bytes for small amounts
|
345 | position:usize::try_from(next_chunk_location)
|
346 | .expect(msg:"too large chunk position for this machine" )
|
347 | )?;
|
348 |
|
349 | let meta_data: &MetaData = &self.meta_data;
|
350 | Chunk::read(&mut self.remaining_bytes, meta_data)
|
351 | })
|
352 |
|
353 | // TODO remember last chunk index and then seek to index+size and check whether bytes are left?
|
354 | }
|
355 |
|
356 | fn size_hint(&self) -> (usize, Option<usize>) {
|
357 | (self.remaining_filtered_chunk_indices.len(), Some(self.remaining_filtered_chunk_indices.len()))
|
358 | }
|
359 | }
|
360 |
|
361 | /// Read all chunks from the file, decompressing each chunk immediately.
|
362 | /// Implements iterator.
|
363 | #[derive (Debug)]
|
364 | pub struct SequentialBlockDecompressor<R: ChunksReader> {
|
365 | remaining_chunks_reader: R,
|
366 | pedantic: bool,
|
367 | }
|
368 |
|
369 | impl<R: ChunksReader> SequentialBlockDecompressor<R> {
|
370 |
|
371 | /// The extracted meta data from the image file.
|
372 | pub fn meta_data(&self) -> &MetaData { self.remaining_chunks_reader.meta_data() }
|
373 |
|
374 | /// Read and then decompress a single block of pixels from the byte source.
|
375 | pub fn decompress_next_block(&mut self) -> Option<Result<UncompressedBlock>> {
|
376 | self.remaining_chunks_reader.read_next_chunk().map(|compressed_chunk: Result|{
|
377 | UncompressedBlock::decompress_chunk(chunk:compressed_chunk?, &self.remaining_chunks_reader.meta_data(), self.pedantic)
|
378 | })
|
379 | }
|
380 | }
|
381 |
|
382 | /// Decompress the chunks in a file in parallel.
|
383 | /// The first call to `next` will fill the thread pool with jobs,
|
384 | /// starting to decompress the next few blocks.
|
385 | /// These jobs will finish, even if you stop reading more blocks.
|
386 | /// Implements iterator.
|
387 | #[derive (Debug)]
|
388 | pub struct ParallelBlockDecompressor<R: ChunksReader> {
|
389 | remaining_chunks: R,
|
390 | sender: flume::Sender<Result<UncompressedBlock>>,
|
391 | receiver: flume::Receiver<Result<UncompressedBlock>>,
|
392 | currently_decompressing_count: usize,
|
393 | max_threads: usize,
|
394 |
|
395 | shared_meta_data_ref: Arc<MetaData>,
|
396 | pedantic: bool,
|
397 |
|
398 | pool: ThreadPool,
|
399 | }
|
400 |
|
401 | impl<R: ChunksReader> ParallelBlockDecompressor<R> {
|
402 |
|
403 | /// Create a new decompressor. Does not immediately spawn any tasks.
|
404 | /// Decompression starts after the first call to `next`.
|
405 | /// Returns the chunks if parallel decompression should not be used.
|
406 | /// Use `new_with_thread_pool` to customize the threadpool.
|
407 | pub fn new(chunks: R, pedantic: bool) -> std::result::Result<Self, R> {
|
408 | Self::new_with_thread_pool(chunks, pedantic, ||{
|
409 | rayon_core::ThreadPoolBuilder::new()
|
410 | .thread_name(|index| format!("OpenEXR Block Decompressor Thread # {}" , index))
|
411 | .build()
|
412 | })
|
413 | }
|
414 |
|
415 | /// Create a new decompressor. Does not immediately spawn any tasks.
|
416 | /// Decompression starts after the first call to `next`.
|
417 | /// Returns the chunks if parallel decompression should not be used.
|
418 | pub fn new_with_thread_pool<CreatePool>(chunks: R, pedantic: bool, try_create_thread_pool: CreatePool)
|
419 | -> std::result::Result<Self, R>
|
420 | where CreatePool: FnOnce() -> std::result::Result<ThreadPool, ThreadPoolBuildError>
|
421 | {
|
422 | // if no compression is used in the file, don't use a threadpool
|
423 | if chunks.meta_data().headers.iter()
|
424 | .all(|head|head.compression == Compression::Uncompressed)
|
425 | {
|
426 | return Err(chunks);
|
427 | }
|
428 |
|
429 | // in case thread pool creation fails (for example on WASM currently),
|
430 | // we revert to sequential decompression
|
431 | let pool = match try_create_thread_pool() {
|
432 | Ok(pool) => pool,
|
433 |
|
434 | // TODO print warning?
|
435 | Err(_) => return Err(chunks),
|
436 | };
|
437 |
|
438 | let max_threads = pool.current_num_threads().max(1).min(chunks.len()) + 2; // ca one block for each thread at all times
|
439 |
|
440 | let (send, recv) = flume::unbounded(); // TODO bounded channel simplifies logic?
|
441 |
|
442 | Ok(Self {
|
443 | shared_meta_data_ref: Arc::new(chunks.meta_data().clone()),
|
444 | currently_decompressing_count: 0,
|
445 | remaining_chunks: chunks,
|
446 | sender: send,
|
447 | receiver: recv,
|
448 | pedantic,
|
449 | max_threads,
|
450 |
|
451 | pool,
|
452 | })
|
453 | }
|
454 |
|
455 | /// Fill the pool with decompression jobs. Returns the first job that finishes.
|
456 | pub fn decompress_next_block(&mut self) -> Option<Result<UncompressedBlock>> {
|
457 |
|
458 | while self.currently_decompressing_count < self.max_threads {
|
459 | let block = self.remaining_chunks.next();
|
460 | if let Some(block) = block {
|
461 | let block = match block {
|
462 | Ok(block) => block,
|
463 | Err(error) => return Some(Err(error))
|
464 | };
|
465 |
|
466 | let sender = self.sender.clone();
|
467 | let meta = self.shared_meta_data_ref.clone();
|
468 | let pedantic = self.pedantic;
|
469 |
|
470 | self.currently_decompressing_count += 1;
|
471 |
|
472 | self.pool.spawn(move || {
|
473 | let decompressed_or_err = UncompressedBlock::decompress_chunk(
|
474 | block, &meta, pedantic
|
475 | );
|
476 |
|
477 | // by now, decompressing could have failed in another thread.
|
478 | // the error is then already handled, so we simply
|
479 | // don't send the decompressed block and do nothing
|
480 | let _ = sender.send(decompressed_or_err);
|
481 | });
|
482 | }
|
483 | else {
|
484 | // there are no chunks left to decompress
|
485 | break;
|
486 | }
|
487 | }
|
488 |
|
489 | if self.currently_decompressing_count > 0 {
|
490 | let next = self.receiver.recv()
|
491 | .expect("all decompressing senders hung up but more messages were expected" );
|
492 |
|
493 | self.currently_decompressing_count -= 1;
|
494 | Some(next)
|
495 | }
|
496 | else {
|
497 | debug_assert!(self.receiver.try_recv().is_err(), "uncompressed chunks left in channel after decompressing all chunks" ); // TODO not reliable
|
498 | debug_assert_eq!(self.len(), 0, "compressed chunks left after decompressing all chunks" );
|
499 | None
|
500 | }
|
501 | }
|
502 |
|
503 | /// The extracted meta data of the image file.
|
504 | pub fn meta_data(&self) -> &MetaData { self.remaining_chunks.meta_data() }
|
505 | }
|
506 |
|
507 | impl<R: ChunksReader> ExactSizeIterator for SequentialBlockDecompressor<R> {}
|
508 | impl<R: ChunksReader> Iterator for SequentialBlockDecompressor<R> {
|
509 | type Item = Result<UncompressedBlock>;
|
510 | fn next(&mut self) -> Option<Self::Item> { self.decompress_next_block() }
|
511 | fn size_hint(&self) -> (usize, Option<usize>) { self.remaining_chunks_reader.size_hint() }
|
512 | }
|
513 |
|
514 | impl<R: ChunksReader> ExactSizeIterator for ParallelBlockDecompressor<R> {}
|
515 | impl<R: ChunksReader> Iterator for ParallelBlockDecompressor<R> {
|
516 | type Item = Result<UncompressedBlock>;
|
517 | fn next(&mut self) -> Option<Self::Item> { self.decompress_next_block() }
|
518 | fn size_hint(&self) -> (usize, Option<usize>) {
|
519 | let remaining: usize = self.remaining_chunks.len() + self.currently_decompressing_count;
|
520 | (remaining, Some(remaining))
|
521 | }
|
522 | }
|
523 |
|
524 |
|
525 |
|
526 |
|
527 |
|
528 | |