//! Composable structures to handle reading an image.


use std::convert::TryFrom;
use std::fmt::Debug;
use std::io::{Read, Seek};
use std::sync::mpsc;
use rayon_core::{ThreadPool, ThreadPoolBuildError};

use smallvec::alloc::sync::Arc;

use crate::block::{BlockIndex, UncompressedBlock};
use crate::block::chunk::{Chunk, TileCoordinates};
use crate::compression::Compression;
use crate::error::{Error, Result, u64_to_usize, UnitResult};
use crate::io::{PeekRead, Tracking};
use crate::meta::{MetaData, OffsetTables};
use crate::meta::header::Header;

/// Decode the meta data from a byte source, keeping the source ready for further reading.
/// Continue decoding the remaining bytes by calling `filter_chunks` or `all_chunks`.
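///
/// A minimal usage sketch (illustrative only, assuming this module is publicly
/// reachable as `exr::block::reader` and that a file `"image.exr"` exists):
///
/// ```no_run
/// use std::{fs::File, io::BufReader};
/// use exr::block::reader::Reader;
///
/// // decode only the meta data; no pixel chunks are read yet
/// let file = BufReader::new(File::open("image.exr").expect("cannot open file"));
/// let reader = Reader::read_from_buffered(file, false)?; // pedantic = false
/// println!("layer count: {}", reader.headers().len());
/// # Ok::<(), exr::error::Error>(())
/// ```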
#[derive(Debug)]
pub struct Reader<R> {
    meta_data: MetaData,
    remaining_reader: PeekRead<Tracking<R>>, // TODO does R need to be Seek or is Tracking enough?
}

impl<R: Read + Seek> Reader<R> {

    /// Start the reading process.
    /// Immediately decodes the meta data into an internal field.
    /// Access it via `meta_data()`.
    pub fn read_from_buffered(read: R, pedantic: bool) -> Result<Self> {
        let mut remaining_reader = PeekRead::new(Tracking::new(read));
        let meta_data = MetaData::read_validated_from_buffered_peekable(&mut remaining_reader, pedantic)?;
        Ok(Self { meta_data, remaining_reader })
    }

    // must not be mutable, as reading the file later on relies on the meta data
    /// The decoded exr meta data from the file.
    pub fn meta_data(&self) -> &MetaData { &self.meta_data }

    /// The decoded exr headers from the file.
    pub fn headers(&self) -> &[Header] { &self.meta_data.headers }

    /// Obtain the meta data ownership.
    pub fn into_meta_data(self) -> MetaData { self.meta_data }

    /// Prepare to read all the chunks from the file.
    /// Does not decode the chunks now, but returns a decoder.
    /// Reading all chunks reduces seeking within the file, but some chunks might be read without being used.
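    ///
    /// A sketch of the hand-off (illustrative only; the hidden setup assumes a
    /// file `"image.exr"`):
    ///
    /// ```no_run
    /// # use std::{fs::File, io::BufReader};
    /// # use exr::block::reader::{ChunksReader, Reader};
    /// # let file = BufReader::new(File::open("image.exr").expect("cannot open file"));
    /// # let reader = Reader::read_from_buffered(file, false)?;
    /// let chunks = reader.all_chunks(true)?; // pedantic: also validate the offset tables
    /// println!("the file contains {} chunks", chunks.expected_chunk_count());
    /// # Ok::<(), exr::error::Error>(())
    /// ```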
    pub fn all_chunks(mut self, pedantic: bool) -> Result<AllChunksReader<R>> {
        let total_chunk_count = {
            if pedantic {
                let offset_tables = MetaData::read_offset_tables(&mut self.remaining_reader, &self.meta_data.headers)?;
                validate_offset_tables(self.meta_data.headers.as_slice(), &offset_tables, self.remaining_reader.byte_position())?;
                offset_tables.iter().map(|table| table.len()).sum()
            }
            else {
                usize::try_from(MetaData::skip_offset_tables(&mut self.remaining_reader, &self.meta_data.headers)?)
                    .expect("too large chunk count for this machine")
            }
        };

        Ok(AllChunksReader {
            meta_data: self.meta_data,
            remaining_chunks: 0 .. total_chunk_count,
            remaining_bytes: self.remaining_reader,
            pedantic
        })
    }

    /// Prepare to read some of the chunks from the file.
    /// Does not decode the chunks now, but returns a decoder.
    /// Reading only some chunks may require seeking within the file, potentially skipping many bytes.
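    ///
    /// For example, a filter might keep only the blocks of the first layer
    /// (an illustrative sketch; the hidden setup assumes a file `"image.exr"`):
    ///
    /// ```no_run
    /// # use std::{fs::File, io::BufReader};
    /// # use exr::block::reader::Reader;
    /// # let file = BufReader::new(File::open("image.exr").expect("cannot open file"));
    /// # let reader = Reader::read_from_buffered(file, false)?;
    /// let chunks = reader.filter_chunks(true, |_meta_data, _tile, block| block.layer == 0)?;
    /// # Ok::<(), exr::error::Error>(())
    /// ```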
    // TODO tile indices add no new information to block index??
    pub fn filter_chunks(mut self, pedantic: bool, mut filter: impl FnMut(&MetaData, TileCoordinates, BlockIndex) -> bool) -> Result<FilteredChunksReader<R>> {
        let offset_tables = MetaData::read_offset_tables(&mut self.remaining_reader, &self.meta_data.headers)?;

        // TODO regardless of pedantic, if invalid, read all chunks instead, and filter after reading each chunk?
        if pedantic {
            validate_offset_tables(
                self.meta_data.headers.as_slice(), &offset_tables,
                self.remaining_reader.byte_position()
            )?;
        }

        let mut filtered_offsets = Vec::with_capacity(
            (self.meta_data.headers.len() * 32).min(2*2048)
        );

        // TODO detect whether the filter actually would skip chunks, and avoid sorting etc. when no filtering is applied

        for (header_index, header) in self.meta_data.headers.iter().enumerate() { // offset tables are stored same order as headers
            for (block_index, tile) in header.blocks_increasing_y_order().enumerate() { // in increasing_y order
                let data_indices = header.get_absolute_block_pixel_coordinates(tile.location)?;

                let block = BlockIndex {
                    layer: header_index,
                    level: tile.location.level_index,
                    pixel_position: data_indices.position.to_usize("data indices start")?,
                    pixel_size: data_indices.size,
                };

                if filter(&self.meta_data, tile.location, block) {
                    filtered_offsets.push(offset_tables[header_index][block_index]) // safe indexing from `enumerate()`
                }
            };
        }

        filtered_offsets.sort_unstable(); // enables reading continuously if possible (already sorted where line order is increasing)

        if pedantic {
            // table is sorted. if any two neighbours are equal, we have duplicates. this is invalid.
            if filtered_offsets.windows(2).any(|pair| pair[0] == pair[1]) {
                return Err(Error::invalid("chunk offset table"))
            }
        }

        Ok(FilteredChunksReader {
            meta_data: self.meta_data,
            expected_filtered_chunk_count: filtered_offsets.len(),
            remaining_filtered_chunk_indices: filtered_offsets.into_iter(),
            remaining_bytes: self.remaining_reader
        })
    }
}

fn validate_offset_tables(headers: &[Header], offset_tables: &OffsetTables, chunks_start_byte: usize) -> UnitResult {
    let max_pixel_bytes: usize = headers.iter() // when compressed, chunks are smaller, but never larger than max
        .map(|header| header.max_pixel_file_bytes())
        .sum();

    // check that each offset is within the bounds
    let end_byte = chunks_start_byte + max_pixel_bytes;
    let is_invalid = offset_tables.iter().flatten()
        .map(|&chunk_offset| u64_to_usize(chunk_offset))
        .any(|chunk_start| chunk_start < chunks_start_byte || chunk_start > end_byte);

    if is_invalid { Err(Error::invalid("offset table")) }
    else { Ok(()) }
}


/// Decode the desired chunks and skip the unimportant chunks in the file.
/// The decoded chunks can be decompressed by calling `decompress_parallel`,
/// `decompress_sequential`, `sequential_decompressor`, or `parallel_decompressor`.
/// Call `on_progress` to get a progress callback for each chunk.
/// Also contains the image meta data.
#[derive(Debug)]
pub struct FilteredChunksReader<R> {
    meta_data: MetaData,
    expected_filtered_chunk_count: usize,
    remaining_filtered_chunk_indices: std::vec::IntoIter<u64>,
    remaining_bytes: PeekRead<Tracking<R>>,
}

/// Decode all chunks in the file without seeking.
/// The decoded chunks can be decompressed by calling `decompress_parallel`,
/// `decompress_sequential`, `sequential_decompressor`, or `parallel_decompressor`.
/// Call `on_progress` to get a progress callback for each chunk.
/// Also contains the image meta data.
#[derive(Debug)]
pub struct AllChunksReader<R> {
    meta_data: MetaData,
    remaining_chunks: std::ops::Range<usize>,
    remaining_bytes: PeekRead<Tracking<R>>,
    pedantic: bool,
}

/// Decode chunks in the file without seeking.
/// Calls the supplied closure for each chunk.
/// The decoded chunks can be decompressed by calling
/// `decompress_parallel`, `decompress_sequential`, or `sequential_decompressor`.
/// Also contains the image meta data.
#[derive(Debug)]
pub struct OnProgressChunksReader<R, F> {
    chunks_reader: R,
    decoded_chunks: usize,
    callback: F,
}

/// Decode chunks in the file.
/// The decoded chunks can be decompressed by calling
/// `decompress_parallel`, `decompress_sequential`, or `sequential_decompressor`.
/// Call `on_progress` to get a progress callback for each chunk.
/// Also contains the image meta data.
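///
/// A sketch of reading the raw compressed chunks (illustrative only, assuming
/// this module is publicly reachable as `exr::block::reader` and a file
/// `"image.exr"` exists):
///
/// ```no_run
/// use std::{fs::File, io::BufReader};
/// use exr::block::reader::{ChunksReader, Reader};
///
/// let file = BufReader::new(File::open("image.exr").expect("cannot open file"));
/// let mut chunks = Reader::read_from_buffered(file, false)?.all_chunks(true)?;
/// println!("expecting {} chunks", chunks.expected_chunk_count());
///
/// while let Some(chunk) = chunks.read_next_chunk() {
///     let _compressed_chunk = chunk?; // decompress this, or skip it
/// }
/// # Ok::<(), exr::error::Error>(())
/// ```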
pub trait ChunksReader: Sized + Iterator<Item=Result<Chunk>> + ExactSizeIterator {

    /// The decoded exr meta data from the file.
    fn meta_data(&self) -> &MetaData;

    /// The decoded exr headers from the file.
    fn headers(&self) -> &[Header] { &self.meta_data().headers }

    /// The number of chunks that this reader will return in total.
    /// Can be less than the total number of chunks in the file, if some chunks are skipped.
    fn expected_chunk_count(&self) -> usize;

    /// Read the next compressed chunk from the file.
    /// Equivalent to `.next()`, as this is also an iterator.
    /// Returns `None` if all chunks have been read.
    fn read_next_chunk(&mut self) -> Option<Result<Chunk>> { self.next() }

    /// Create a new reader that calls the provided progress
    /// callback for each chunk that is read from the file.
    /// If the file can be successfully decoded, the callback
    /// will be invoked at least once with 0.0 at the start and with 1.0 at the end.
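    ///
    /// A sketch (illustrative only; the hidden setup assumes a file `"image.exr"`):
    ///
    /// ```no_run
    /// # use std::{fs::File, io::BufReader};
    /// # use exr::block::reader::{ChunksReader, Reader};
    /// # let file = BufReader::new(File::open("image.exr").expect("cannot open file"));
    /// # let chunks = Reader::read_from_buffered(file, false)?.all_chunks(true)?;
    /// let chunks = chunks.on_progress(|progress| println!("progress: {:.1}%", progress * 100.0));
    /// # Ok::<(), exr::error::Error>(())
    /// ```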
    fn on_progress<F>(self, on_progress: F) -> OnProgressChunksReader<Self, F> where F: FnMut(f64) {
        OnProgressChunksReader { chunks_reader: self, callback: on_progress, decoded_chunks: 0 }
    }

    /// Decompress all blocks in the file, using multiple cpu cores, and call the supplied closure for each block.
    /// The order of the blocks is not deterministic.
    /// You can also use `parallel_decompressor` to obtain an iterator instead.
    /// Falls back to sequential processing where threads are not available, or where it would not speed up the process.
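    ///
    /// A sketch that collects all decompressed blocks (illustrative only; the
    /// hidden setup assumes a file `"image.exr"`):
    ///
    /// ```no_run
    /// # use std::{fs::File, io::BufReader};
    /// # use exr::block::reader::{ChunksReader, Reader};
    /// # let file = BufReader::new(File::open("image.exr").expect("cannot open file"));
    /// # let chunks = Reader::read_from_buffered(file, false)?.all_chunks(true)?;
    /// let mut blocks = Vec::new();
    /// chunks.decompress_parallel(true, |_meta_data, block| {
    ///     blocks.push(block); // or convert the pixels right away
    ///     Ok(())
    /// })?;
    /// # Ok::<(), exr::error::Error>(())
    /// ```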
    // FIXME try async + futures instead of rayon! Maybe even allows for external async decoding? (-> impl Stream<UncompressedBlock>)
    fn decompress_parallel(
        self, pedantic: bool,
        mut insert_block: impl FnMut(&MetaData, UncompressedBlock) -> UnitResult
    ) -> UnitResult
    {
        let mut decompressor = match self.parallel_decompressor(pedantic) {
            Err(old_self) => return old_self.decompress_sequential(pedantic, insert_block),
            Ok(decompressor) => decompressor,
        };

        while let Some(block) = decompressor.next() {
            insert_block(decompressor.meta_data(), block?)?;
        }

        debug_assert_eq!(decompressor.len(), 0, "compressed blocks left after decompressing all blocks");
        Ok(())
    }

    /// Return an iterator that decompresses the chunks with multiple threads.
    /// The order of the blocks is not deterministic.
    /// Use `ParallelBlockDecompressor::new_with_thread_pool` if you want to use your own thread pool.
    /// By default, this uses as many threads as there are CPUs.
    /// Returns `self` if there is no need for parallel decompression.
    fn parallel_decompressor(self, pedantic: bool) -> std::result::Result<ParallelBlockDecompressor<Self>, Self> {
        ParallelBlockDecompressor::new(self, pedantic)
    }

    /// Decompress all blocks in the file sequentially in the current thread,
    /// calling the supplied closure for each block.
    /// You can alternatively use `sequential_decompressor` if you prefer an external iterator.
    fn decompress_sequential(
        self, pedantic: bool,
        mut insert_block: impl FnMut(&MetaData, UncompressedBlock) -> UnitResult
    ) -> UnitResult
    {
        let mut decompressor = self.sequential_decompressor(pedantic);
        while let Some(block) = decompressor.next() {
            insert_block(decompressor.meta_data(), block?)?;
        }

        debug_assert_eq!(decompressor.len(), 0, "compressed blocks left after decompressing all blocks");
        Ok(())
    }

    /// Prepare reading the chunks sequentially on a single thread, with less memory overhead than the parallel decompressor.
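    ///
    /// A sketch of driving the decompressor as an external iterator
    /// (illustrative only; the hidden setup assumes a file `"image.exr"`):
    ///
    /// ```no_run
    /// # use std::{fs::File, io::BufReader};
    /// # use exr::block::reader::{ChunksReader, Reader};
    /// # let file = BufReader::new(File::open("image.exr").expect("cannot open file"));
    /// # let chunks = Reader::read_from_buffered(file, false)?.all_chunks(true)?;
    /// let mut decompressor = chunks.sequential_decompressor(false);
    /// while let Some(block) = decompressor.next() {
    ///     let _block = block?; // one uncompressed block at a time
    /// }
    /// # Ok::<(), exr::error::Error>(())
    /// ```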
    fn sequential_decompressor(self, pedantic: bool) -> SequentialBlockDecompressor<Self> {
        SequentialBlockDecompressor { remaining_chunks_reader: self, pedantic }
    }
}

impl<R, F> ChunksReader for OnProgressChunksReader<R, F> where R: ChunksReader, F: FnMut(f64) {
    fn meta_data(&self) -> &MetaData { self.chunks_reader.meta_data() }
    fn expected_chunk_count(&self) -> usize { self.chunks_reader.expected_chunk_count() }
}

impl<R, F> ExactSizeIterator for OnProgressChunksReader<R, F> where R: ChunksReader, F: FnMut(f64) {}
impl<R, F> Iterator for OnProgressChunksReader<R, F> where R: ChunksReader, F: FnMut(f64) {
    type Item = Result<Chunk>;

    fn next(&mut self) -> Option<Self::Item> {
        self.chunks_reader.next().map(|item|{
            {
                let total_chunks = self.expected_chunk_count() as f64;
                let callback = &mut self.callback;
                callback(self.decoded_chunks as f64 / total_chunks);
            }

            self.decoded_chunks += 1;
            item
        })
        .or_else(||{
            debug_assert_eq!(
                self.decoded_chunks, self.expected_chunk_count(),
                "chunks reader finished but not all chunks are decompressed"
            );

            let callback = &mut self.callback;
            callback(1.0);
            None
        })
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        self.chunks_reader.size_hint()
    }
}

impl<R: Read + Seek> ChunksReader for AllChunksReader<R> {
    fn meta_data(&self) -> &MetaData { &self.meta_data }
    fn expected_chunk_count(&self) -> usize { self.remaining_chunks.end }
}

impl<R: Read + Seek> ExactSizeIterator for AllChunksReader<R> {}
impl<R: Read + Seek> Iterator for AllChunksReader<R> {
    type Item = Result<Chunk>;

    fn next(&mut self) -> Option<Self::Item> {
        // read as many chunks as the file should contain (inferred from meta data)
        let next_chunk = self.remaining_chunks.next()
            .map(|_| Chunk::read(&mut self.remaining_bytes, &self.meta_data));

        // if no chunks are left, but some bytes remain, return an error
        if self.pedantic && next_chunk.is_none() && self.remaining_bytes.peek_u8().is_ok() {
            return Some(Err(Error::invalid("end of file expected")));
        }

        next_chunk
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        (self.remaining_chunks.len(), Some(self.remaining_chunks.len()))
    }
}

impl<R: Read + Seek> ChunksReader for FilteredChunksReader<R> {
    fn meta_data(&self) -> &MetaData { &self.meta_data }
    fn expected_chunk_count(&self) -> usize { self.expected_filtered_chunk_count }
}

impl<R: Read + Seek> ExactSizeIterator for FilteredChunksReader<R> {}
impl<R: Read + Seek> Iterator for FilteredChunksReader<R> {
    type Item = Result<Chunk>;

    fn next(&mut self) -> Option<Self::Item> {
        // read as many chunks as we have desired chunk offsets
        self.remaining_filtered_chunk_indices.next().map(|next_chunk_location|{
            self.remaining_bytes.skip_to( // no-op for seek at current position, uses skip_bytes for small amounts
                usize::try_from(next_chunk_location)
                    .expect("too large chunk position for this machine")
            )?;

            let meta_data = &self.meta_data;
            Chunk::read(&mut self.remaining_bytes, meta_data)
        })

        // TODO remember last chunk index and then seek to index+size and check whether bytes are left?
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        (self.remaining_filtered_chunk_indices.len(), Some(self.remaining_filtered_chunk_indices.len()))
    }
}

/// Read all chunks from the file, decompressing each chunk immediately.
/// Implements iterator.
#[derive(Debug)]
pub struct SequentialBlockDecompressor<R: ChunksReader> {
    remaining_chunks_reader: R,
    pedantic: bool,
}

impl<R: ChunksReader> SequentialBlockDecompressor<R> {

    /// The extracted meta data from the image file.
    pub fn meta_data(&self) -> &MetaData { self.remaining_chunks_reader.meta_data() }

    /// Read and then decompress a single block of pixels from the byte source.
    pub fn decompress_next_block(&mut self) -> Option<Result<UncompressedBlock>> {
        self.remaining_chunks_reader.read_next_chunk().map(|compressed_chunk|{
            UncompressedBlock::decompress_chunk(compressed_chunk?, &self.remaining_chunks_reader.meta_data(), self.pedantic)
        })
    }
}

/// Decompress the chunks in a file in parallel.
/// The first call to `next` will fill the thread pool with jobs,
/// starting to decompress the next few blocks.
/// These jobs will finish, even if you stop reading more blocks.
/// Implements iterator.
#[derive(Debug)]
pub struct ParallelBlockDecompressor<R: ChunksReader> {
    remaining_chunks: R,
    sender: mpsc::Sender<Result<UncompressedBlock>>,
    receiver: mpsc::Receiver<Result<UncompressedBlock>>,
    currently_decompressing_count: usize,
    max_threads: usize,

    shared_meta_data_ref: Arc<MetaData>,
    pedantic: bool,

    pool: ThreadPool,
}

impl<R: ChunksReader> ParallelBlockDecompressor<R> {

    /// Create a new decompressor. Does not immediately spawn any tasks.
    /// Decompression starts after the first call to `next`.
    /// Returns the chunks if parallel decompression should not be used.
    /// Use `new_with_thread_pool` to customize the threadpool.
    pub fn new(chunks: R, pedantic: bool) -> std::result::Result<Self, R> {
        Self::new_with_thread_pool(chunks, pedantic, ||{
            rayon_core::ThreadPoolBuilder::new()
                .thread_name(|index| format!("OpenEXR Block Decompressor Thread # {}", index))
                .build()
        })
    }

    /// Create a new decompressor. Does not immediately spawn any tasks.
    /// Decompression starts after the first call to `next`.
    /// Returns the chunks if parallel decompression should not be used.
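    ///
    /// A sketch with a custom pool (illustrative only; the two-thread pool and
    /// the hidden setup, which assumes a file `"image.exr"`, are arbitrary choices):
    ///
    /// ```no_run
    /// # use std::{fs::File, io::BufReader};
    /// # use exr::block::reader::{ChunksReader, ParallelBlockDecompressor, Reader};
    /// # let file = BufReader::new(File::open("image.exr").expect("cannot open file"));
    /// # let chunks = Reader::read_from_buffered(file, false)?.all_chunks(true)?;
    /// let decompressor = ParallelBlockDecompressor::new_with_thread_pool(chunks, true, || {
    ///     rayon_core::ThreadPoolBuilder::new().num_threads(2).build()
    /// });
    /// # Ok::<(), exr::error::Error>(())
    /// ```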
    pub fn new_with_thread_pool<CreatePool>(chunks: R, pedantic: bool, try_create_thread_pool: CreatePool)
        -> std::result::Result<Self, R>
        where CreatePool: FnOnce() -> std::result::Result<ThreadPool, ThreadPoolBuildError>
    {
        // if no compression is used in the file, don't use a threadpool
        if chunks.meta_data().headers.iter()
            .all(|head| head.compression == Compression::Uncompressed)
        {
            return Err(chunks);
        }

        // in case thread pool creation fails (for example on WASM currently),
        // we revert to sequential decompression
        let pool = match try_create_thread_pool() {
            Ok(pool) => pool,

            // TODO print warning?
            Err(_) => return Err(chunks),
        };

        let max_threads = pool.current_num_threads().max(1).min(chunks.len()) + 2; // about one block in flight per thread at all times

        let (send, recv) = mpsc::channel(); // TODO bounded channel simplifies logic?

        Ok(Self {
            shared_meta_data_ref: Arc::new(chunks.meta_data().clone()),
            currently_decompressing_count: 0,
            remaining_chunks: chunks,
            sender: send,
            receiver: recv,
            pedantic,
            max_threads,

            pool,
        })
    }

    /// Fill the pool with decompression jobs. Returns the first job that finishes.
    pub fn decompress_next_block(&mut self) -> Option<Result<UncompressedBlock>> {

        while self.currently_decompressing_count < self.max_threads {
            let block = self.remaining_chunks.next();
            if let Some(block) = block {
                let block = match block {
                    Ok(block) => block,
                    Err(error) => return Some(Err(error))
                };

                let sender = self.sender.clone();
                let meta = self.shared_meta_data_ref.clone();
                let pedantic = self.pedantic;

                self.currently_decompressing_count += 1;

                self.pool.spawn(move || {
                    let decompressed_or_err = UncompressedBlock::decompress_chunk(
                        block, &meta, pedantic
                    );

                    // by now, decompressing could have failed in another thread.
                    // the error is then already handled, so we simply
                    // don't send the decompressed block and do nothing
                    let _ = sender.send(decompressed_or_err);
                });
            }
            else {
                // there are no chunks left to decompress
                break;
            }
        }

        if self.currently_decompressing_count > 0 {
            let next = self.receiver.recv()
                .expect("all decompressing senders hung up but more messages were expected");

            self.currently_decompressing_count -= 1;
            Some(next)
        }
        else {
            debug_assert!(self.receiver.try_recv().is_err(), "uncompressed chunks left in channel after decompressing all chunks"); // TODO not reliable
            debug_assert_eq!(self.len(), 0, "compressed chunks left after decompressing all chunks");
            None
        }
    }

    /// The extracted meta data of the image file.
    pub fn meta_data(&self) -> &MetaData { self.remaining_chunks.meta_data() }
}

impl<R: ChunksReader> ExactSizeIterator for SequentialBlockDecompressor<R> {}
impl<R: ChunksReader> Iterator for SequentialBlockDecompressor<R> {
    type Item = Result<UncompressedBlock>;
    fn next(&mut self) -> Option<Self::Item> { self.decompress_next_block() }
    fn size_hint(&self) -> (usize, Option<usize>) { self.remaining_chunks_reader.size_hint() }
}

impl<R: ChunksReader> ExactSizeIterator for ParallelBlockDecompressor<R> {}
impl<R: ChunksReader> Iterator for ParallelBlockDecompressor<R> {
    type Item = Result<UncompressedBlock>;
    fn next(&mut self) -> Option<Self::Item> { self.decompress_next_block() }
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.remaining_chunks.len() + self.currently_decompressing_count;
        (remaining, Some(remaining))
    }
}