//! Composable structures to handle reading an image.


use std::convert::TryFrom;
use std::fmt::Debug;
use std::io::{Read, Seek};
use std::sync::mpsc;
use rayon_core::{ThreadPool, ThreadPoolBuildError};

use smallvec::alloc::sync::Arc;

use crate::block::{BlockIndex, UncompressedBlock};
use crate::block::chunk::{Chunk, TileCoordinates};
use crate::compression::Compression;
use crate::error::{Error, Result, u64_to_usize, UnitResult};
use crate::io::{PeekRead, Tracking};
use crate::meta::{MetaData, OffsetTables};
use crate::meta::header::Header;

/// Decode the meta data from a byte source, keeping the source ready for further reading.
/// Continue decoding the remaining bytes by calling `filter_chunks` or `all_chunks`.
#[derive(Debug)]
pub struct Reader<R> {
    meta_data: MetaData,
    remaining_reader: PeekRead<Tracking<R>>, // TODO does R need to be Seek or is Tracking enough?
}

impl<R: Read + Seek> Reader<R> {

    /// Start the reading process.
    /// Immediately decodes the meta data into an internal field.
    /// Access it via `meta_data()`.
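    ///
    /// A minimal usage sketch (hedged: assumes this module is reachable as
    /// `exr::block::reader` and that an `"image.exr"` file exists, hence `no_run`):
    ///
    /// ```no_run
    /// use exr::block::reader::Reader;
    ///
    /// let file = std::io::BufReader::new(std::fs::File::open("image.exr").unwrap());
    /// let reader = Reader::read_from_buffered(file, true).unwrap();
    /// println!("layer count: {}", reader.headers().len());
    /// ```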
    pub fn read_from_buffered(read: R, pedantic: bool) -> Result<Self> {
        let mut remaining_reader = PeekRead::new(Tracking::new(read));
        let meta_data = MetaData::read_validated_from_buffered_peekable(&mut remaining_reader, pedantic)?;
        Ok(Self { meta_data, remaining_reader })
    }

    // must not be mutable, as reading the file later on relies on the meta data
    /// The decoded exr meta data from the file.
    pub fn meta_data(&self) -> &MetaData { &self.meta_data }

    /// The decoded exr headers from the file.
    pub fn headers(&self) -> &[Header] { &self.meta_data.headers }

    /// Obtain ownership of the meta data.
    pub fn into_meta_data(self) -> MetaData { self.meta_data }

    /// Prepare to read all the chunks from the file.
    /// Does not decode the chunks now, but returns a decoder.
    /// Reading all chunks reduces seeking in the file, but some chunks might be read without being used.
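    ///
    /// A hedged sketch of draining every chunk (assumes the `exr::block::reader`
    /// path and an existing `"image.exr"`, hence `no_run`):
    ///
    /// ```no_run
    /// use exr::block::reader::Reader;
    ///
    /// let file = std::io::BufReader::new(std::fs::File::open("image.exr").unwrap());
    /// let chunks = Reader::read_from_buffered(file, false).unwrap()
    ///     .all_chunks(false).unwrap();
    ///
    /// for compressed_chunk in chunks { compressed_chunk.unwrap(); }
    /// ```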
    pub fn all_chunks(mut self, pedantic: bool) -> Result<AllChunksReader<R>> {
        let total_chunk_count = {
            if pedantic {
                let offset_tables = MetaData::read_offset_tables(&mut self.remaining_reader, &self.meta_data.headers)?;
                validate_offset_tables(self.meta_data.headers.as_slice(), &offset_tables, self.remaining_reader.byte_position())?;
                offset_tables.iter().map(|table| table.len()).sum()
            }
            else {
                usize::try_from(MetaData::skip_offset_tables(&mut self.remaining_reader, &self.meta_data.headers)?)
                    .expect("too large chunk count for this machine")
            }
        };

        Ok(AllChunksReader {
            meta_data: self.meta_data,
            remaining_chunks: 0 .. total_chunk_count,
            remaining_bytes: self.remaining_reader,
            pedantic
        })
    }

    /// Prepare to read some of the chunks from the file.
    /// Does not decode the chunks now, but returns a decoder.
    /// Reading only some chunks may require seeking within the file, potentially skipping many bytes.
    // TODO tile indices add no new information to block index??
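    ///
    /// A hedged sketch that keeps only full-resolution blocks (assumes the
    /// `exr::block::reader` path, an existing `"image.exr"`, and tuple-style
    /// `Vec2` fields on `TileCoordinates::level_index`, hence `no_run`):
    ///
    /// ```no_run
    /// use exr::block::reader::Reader;
    ///
    /// let file = std::io::BufReader::new(std::fs::File::open("image.exr").unwrap());
    /// let chunks = Reader::read_from_buffered(file, false).unwrap()
    ///     .filter_chunks(false, |_meta, tile, _block| {
    ///         tile.level_index.0 == 0 && tile.level_index.1 == 0 // level (0,0) is the full resolution
    ///     })
    ///     .unwrap();
    /// ```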
    pub fn filter_chunks(mut self, pedantic: bool, mut filter: impl FnMut(&MetaData, TileCoordinates, BlockIndex) -> bool) -> Result<FilteredChunksReader<R>> {
        let offset_tables = MetaData::read_offset_tables(&mut self.remaining_reader, &self.meta_data.headers)?;

        // TODO regardless of pedantic, if invalid, read all chunks instead, and filter after reading each chunk?
        if pedantic {
            validate_offset_tables(
                self.meta_data.headers.as_slice(), &offset_tables,
                self.remaining_reader.byte_position()
            )?;
        }

        let mut filtered_offsets = Vec::with_capacity(
            (self.meta_data.headers.len() * 32).min(2*2048)
        );

        // TODO detect whether the filter actually would skip chunks, and avoid sorting etc. when no filtering is applied

        for (header_index, header) in self.meta_data.headers.iter().enumerate() { // offset tables are stored in the same order as headers
            for (block_index, tile) in header.blocks_increasing_y_order().enumerate() { // in increasing_y order
                let data_indices = header.get_absolute_block_pixel_coordinates(tile.location)?;

                let block = BlockIndex {
                    layer: header_index,
                    level: tile.location.level_index,
                    pixel_position: data_indices.position.to_usize("data indices start")?,
                    pixel_size: data_indices.size,
                };

                if filter(&self.meta_data, tile.location, block) {
                    filtered_offsets.push(offset_tables[header_index][block_index]) // safe indexing from `enumerate()`
                }
            }
        }

        filtered_offsets.sort_unstable(); // enables reading continuously if possible (already sorted where line order is increasing)

        if pedantic {
            // table is sorted. if any two neighbours are equal, we have duplicates. this is invalid.
            if filtered_offsets.windows(2).any(|pair| pair[0] == pair[1]) {
                return Err(Error::invalid("chunk offset table"))
            }
        }

        Ok(FilteredChunksReader {
            meta_data: self.meta_data,
            expected_filtered_chunk_count: filtered_offsets.len(),
            remaining_filtered_chunk_indices: filtered_offsets.into_iter(),
            remaining_bytes: self.remaining_reader
        })
    }
}


fn validate_offset_tables(headers: &[Header], offset_tables: &OffsetTables, chunks_start_byte: usize) -> UnitResult {
    let max_pixel_bytes: usize = headers.iter() // when compressed, chunks are smaller, but never larger than max
        .map(|header| header.max_pixel_file_bytes())
        .sum();

    // check that each offset is within the bounds
    let end_byte = chunks_start_byte + max_pixel_bytes;
    let is_invalid = offset_tables.iter().flatten().map(|&chunk| u64_to_usize(chunk))
        .any(|chunk_start| chunk_start < chunks_start_byte || chunk_start > end_byte);

    if is_invalid { Err(Error::invalid("offset table")) }
    else { Ok(()) }
}


/// Decode the desired chunks and skip the unimportant chunks in the file.
/// The decoded chunks can be decompressed by calling
/// `decompress_parallel`, `decompress_sequential`, `sequential_decompressor`, or `parallel_decompressor`.
/// Call `on_progress` to register a callback for each block.
/// Also contains the image meta data.
#[derive(Debug)]
pub struct FilteredChunksReader<R> {
    meta_data: MetaData,
    expected_filtered_chunk_count: usize,
    remaining_filtered_chunk_indices: std::vec::IntoIter<u64>,
    remaining_bytes: PeekRead<Tracking<R>>,
}

/// Decode all chunks in the file without seeking.
/// The decoded chunks can be decompressed by calling
/// `decompress_parallel`, `decompress_sequential`, `sequential_decompressor`, or `parallel_decompressor`.
/// Call `on_progress` to register a callback for each block.
/// Also contains the image meta data.
#[derive(Debug)]
pub struct AllChunksReader<R> {
    meta_data: MetaData,
    remaining_chunks: std::ops::Range<usize>,
    remaining_bytes: PeekRead<Tracking<R>>,
    pedantic: bool,
}

/// Decode chunks from the wrapped reader.
/// Calls the supplied progress closure for each chunk.
/// The decoded chunks can be decompressed by calling
/// `decompress_parallel`, `decompress_sequential`, or `sequential_decompressor`.
/// Also contains the image meta data.
#[derive(Debug)]
pub struct OnProgressChunksReader<R, F> {
    chunks_reader: R,
    decoded_chunks: usize,
    callback: F,
}

/// Decode chunks in the file.
/// The decoded chunks can be decompressed by calling
/// `decompress_parallel`, `decompress_sequential`, or `sequential_decompressor`.
/// Call `on_progress` to register a callback for each block.
/// Also contains the image meta data.
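///
/// A hedged sketch of the typical read pipeline (assumes the `exr::block::reader`
/// path and an existing `"image.exr"`, hence `no_run`):
///
/// ```no_run
/// use exr::block::reader::{Reader, ChunksReader};
///
/// let file = std::io::BufReader::new(std::fs::File::open("image.exr").unwrap());
/// Reader::read_from_buffered(file, false).unwrap()
///     .all_chunks(false).unwrap()
///     .decompress_parallel(false, |_meta_data, _block| Ok(())).unwrap();
/// ```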
pub trait ChunksReader: Sized + Iterator<Item=Result<Chunk>> + ExactSizeIterator {

    /// The decoded exr meta data from the file.
    fn meta_data(&self) -> &MetaData;

    /// The decoded exr headers from the file.
    fn headers(&self) -> &[Header] { &self.meta_data().headers }

    /// The number of chunks that this reader will return in total.
    /// Can be less than the total number of chunks in the file, if some chunks are skipped.
    fn expected_chunk_count(&self) -> usize;

    /// Read the next compressed chunk from the file.
    /// Equivalent to `.next()`, as this also is an iterator.
    /// Returns `None` if all chunks have been read.
    fn read_next_chunk(&mut self) -> Option<Result<Chunk>> { self.next() }

    /// Create a new reader that calls the provided progress
    /// callback for each chunk that is read from the file.
    /// If the file can be successfully decoded, the callback
    /// is invoked with 0.0 at the start and 1.0 at the end, at least once each.
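    ///
    /// A hedged sketch of reporting progress (assumes a `chunks` value that already
    /// implements `ChunksReader`, so this snippet is not a complete program):
    ///
    /// ```ignore
    /// let chunks = chunks.on_progress(|progress| {
    ///     eprintln!("decoded: {:.0}%", progress * 100.0); // progress is in 0.0 ..= 1.0
    /// });
    /// ```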
    fn on_progress<F>(self, on_progress: F) -> OnProgressChunksReader<Self, F> where F: FnMut(f64) {
        OnProgressChunksReader { chunks_reader: self, callback: on_progress, decoded_chunks: 0 }
    }

    /// Decompress all blocks in the file, using multiple cpu cores, and call the supplied closure for each block.
    /// The order of the blocks is not deterministic.
    /// You can also use `parallel_decompressor` to obtain an iterator instead.
    /// Falls back to sequential processing where threads are not available, or where it would not speed up the process.
    // FIXME try async + futures instead of rayon! Maybe even allows for external async decoding? (-> impl Stream<UncompressedBlock>)
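    ///
    /// A hedged sketch of collecting all blocks (assumes a `chunks` value that
    /// implements `ChunksReader`, so this snippet is not a complete program):
    ///
    /// ```ignore
    /// let mut blocks = Vec::new();
    /// chunks.decompress_parallel(true, |_meta_data, block| {
    ///     blocks.push(block); // blocks arrive in non-deterministic order
    ///     Ok(())
    /// })?;
    /// ```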
    fn decompress_parallel(
        self, pedantic: bool,
        mut insert_block: impl FnMut(&MetaData, UncompressedBlock) -> UnitResult
    ) -> UnitResult
    {
        let mut decompressor = match self.parallel_decompressor(pedantic) {
            Err(old_self) => return old_self.decompress_sequential(pedantic, insert_block),
            Ok(decompressor) => decompressor,
        };

        while let Some(block) = decompressor.next() {
            insert_block(decompressor.meta_data(), block?)?;
        }

        debug_assert_eq!(decompressor.len(), 0, "compressed blocks left after decompressing all blocks");
        Ok(())
    }

    /// Return an iterator that decompresses the chunks with multiple threads.
    /// The order of the blocks is not deterministic.
    /// Use `ParallelBlockDecompressor::new_with_thread_pool` if you want to use your own thread pool.
    /// By default, this uses as many threads as there are CPUs.
    /// Returns `self` if there is no need for parallel decompression.
    fn parallel_decompressor(self, pedantic: bool) -> std::result::Result<ParallelBlockDecompressor<Self>, Self> {
        ParallelBlockDecompressor::new(self, pedantic)
    }

    /// Decompress all blocks in this thread, and call the supplied closure for each block.
    /// You can alternatively use `sequential_decompressor` if you prefer an external iterator.
    fn decompress_sequential(
        self, pedantic: bool,
        mut insert_block: impl FnMut(&MetaData, UncompressedBlock) -> UnitResult
    ) -> UnitResult
    {
        let mut decompressor = self.sequential_decompressor(pedantic);
        while let Some(block) = decompressor.next() {
            insert_block(decompressor.meta_data(), block?)?;
        }

        debug_assert_eq!(decompressor.len(), 0, "compressed blocks left after decompressing all blocks");
        Ok(())
    }

    /// Prepare to read the chunks sequentially, using only a single thread, but with less memory overhead.
    fn sequential_decompressor(self, pedantic: bool) -> SequentialBlockDecompressor<Self> {
        SequentialBlockDecompressor { remaining_chunks_reader: self, pedantic }
    }
}

impl<R, F> ChunksReader for OnProgressChunksReader<R, F> where R: ChunksReader, F: FnMut(f64) {
    fn meta_data(&self) -> &MetaData { self.chunks_reader.meta_data() }
    fn expected_chunk_count(&self) -> usize { self.chunks_reader.expected_chunk_count() }
}

impl<R, F> ExactSizeIterator for OnProgressChunksReader<R, F> where R: ChunksReader, F: FnMut(f64) {}
impl<R, F> Iterator for OnProgressChunksReader<R, F> where R: ChunksReader, F: FnMut(f64) {
    type Item = Result<Chunk>;

    fn next(&mut self) -> Option<Self::Item> {
        self.chunks_reader.next().map(|item|{
            {
                let total_chunks = self.expected_chunk_count() as f64;
                let callback = &mut self.callback;
                callback(self.decoded_chunks as f64 / total_chunks);
            }

            self.decoded_chunks += 1;
            item
        })
        .or_else(||{
            debug_assert_eq!(
                self.decoded_chunks, self.expected_chunk_count(),
                "chunks reader finished but not all chunks are decompressed"
            );

            let callback = &mut self.callback;
            callback(1.0);
            None
        })
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        self.chunks_reader.size_hint()
    }
}

impl<R: Read + Seek> ChunksReader for AllChunksReader<R> {
    fn meta_data(&self) -> &MetaData { &self.meta_data }
    fn expected_chunk_count(&self) -> usize { self.remaining_chunks.end }
}

impl<R: Read + Seek> ExactSizeIterator for AllChunksReader<R> {}
impl<R: Read + Seek> Iterator for AllChunksReader<R> {
    type Item = Result<Chunk>;

    fn next(&mut self) -> Option<Self::Item> {
        // read as many chunks as the file should contain (inferred from meta data)
        let next_chunk = self.remaining_chunks.next()
            .map(|_| Chunk::read(&mut self.remaining_bytes, &self.meta_data));

        // if no chunks are left, but some bytes remain, return error
        if self.pedantic && next_chunk.is_none() && self.remaining_bytes.peek_u8().is_ok() {
            return Some(Err(Error::invalid("end of file expected")));
        }

        next_chunk
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        (self.remaining_chunks.len(), Some(self.remaining_chunks.len()))
    }
}

impl<R: Read + Seek> ChunksReader for FilteredChunksReader<R> {
    fn meta_data(&self) -> &MetaData { &self.meta_data }
    fn expected_chunk_count(&self) -> usize { self.expected_filtered_chunk_count }
}

impl<R: Read + Seek> ExactSizeIterator for FilteredChunksReader<R> {}
impl<R: Read + Seek> Iterator for FilteredChunksReader<R> {
    type Item = Result<Chunk>;

    fn next(&mut self) -> Option<Self::Item> {
        // read as many chunks as we have desired chunk offsets
        self.remaining_filtered_chunk_indices.next().map(|next_chunk_location|{
            self.remaining_bytes.skip_to( // no-op for seek at current position, uses skip_bytes for small amounts
                usize::try_from(next_chunk_location)
                    .expect("too large chunk position for this machine")
            )?;

            let meta_data = &self.meta_data;
            Chunk::read(&mut self.remaining_bytes, meta_data)
        })

        // TODO remember last chunk index and then seek to index+size and check whether bytes are left?
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        (self.remaining_filtered_chunk_indices.len(), Some(self.remaining_filtered_chunk_indices.len()))
    }
}

/// Read all chunks from the file, decompressing each chunk immediately.
/// Implements iterator.
#[derive(Debug)]
pub struct SequentialBlockDecompressor<R: ChunksReader> {
    remaining_chunks_reader: R,
    pedantic: bool,
}

impl<R: ChunksReader> SequentialBlockDecompressor<R> {

    /// The extracted meta data from the image file.
    pub fn meta_data(&self) -> &MetaData { self.remaining_chunks_reader.meta_data() }

    /// Read and then decompress a single block of pixels from the byte source.
    pub fn decompress_next_block(&mut self) -> Option<Result<UncompressedBlock>> {
        self.remaining_chunks_reader.read_next_chunk().map(|compressed_chunk|{
            UncompressedBlock::decompress_chunk(compressed_chunk?, &self.remaining_chunks_reader.meta_data(), self.pedantic)
        })
    }
}

/// Decompress the chunks in a file in parallel.
/// The first call to `next` will fill the thread pool with jobs,
/// starting to decompress the next few blocks.
/// These jobs will finish, even if you stop reading more blocks.
/// Implements iterator.
#[derive(Debug)]
pub struct ParallelBlockDecompressor<R: ChunksReader> {
    remaining_chunks: R,
    sender: mpsc::Sender<Result<UncompressedBlock>>,
    receiver: mpsc::Receiver<Result<UncompressedBlock>>,
    currently_decompressing_count: usize,
    max_threads: usize,

    shared_meta_data_ref: Arc<MetaData>,
    pedantic: bool,

    pool: ThreadPool,
}

impl<R: ChunksReader> ParallelBlockDecompressor<R> {

    /// Create a new decompressor. Does not immediately spawn any tasks.
    /// Decompression starts after the first call to `next`.
    /// Returns the chunks if parallel decompression should not be used.
    /// Use `new_with_thread_pool` to customize the threadpool.
    pub fn new(chunks: R, pedantic: bool) -> std::result::Result<Self, R> {
        Self::new_with_thread_pool(chunks, pedantic, ||{
            rayon_core::ThreadPoolBuilder::new()
                .thread_name(|index| format!("OpenEXR Block Decompressor Thread #{}", index))
                .build()
        })
    }

    /// Create a new decompressor. Does not immediately spawn any tasks.
    /// Decompression starts after the first call to `next`.
    /// Returns the chunks if parallel decompression should not be used.
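    ///
    /// A hedged sketch of supplying a custom pool (assumes a `chunks` value that
    /// implements `ChunksReader`, so this snippet is not a complete program):
    ///
    /// ```ignore
    /// let decompressor = ParallelBlockDecompressor::new_with_thread_pool(
    ///     chunks, true,
    ///     || rayon_core::ThreadPoolBuilder::new().num_threads(2).build()
    /// );
    /// ```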
    pub fn new_with_thread_pool<CreatePool>(chunks: R, pedantic: bool, try_create_thread_pool: CreatePool)
        -> std::result::Result<Self, R>
        where CreatePool: FnOnce() -> std::result::Result<ThreadPool, ThreadPoolBuildError>
    {
        // if no compression is used in the file, don't use a threadpool
        if chunks.meta_data().headers.iter()
            .all(|head| head.compression == Compression::Uncompressed)
        {
            return Err(chunks);
        }

        // in case thread pool creation fails (for example on WASM currently),
        // we revert to sequential decompression
        let pool = match try_create_thread_pool() {
            Ok(pool) => pool,

            // TODO print warning?
            Err(_) => return Err(chunks),
        };

        let max_threads = pool.current_num_threads().max(1).min(chunks.len()) + 2; // roughly one block per thread at all times, plus two queued

        let (send, recv) = mpsc::channel(); // TODO bounded channel simplifies logic?

        Ok(Self {
            shared_meta_data_ref: Arc::new(chunks.meta_data().clone()),
            currently_decompressing_count: 0,
            remaining_chunks: chunks,
            sender: send,
            receiver: recv,
            pedantic,
            max_threads,

            pool,
        })
    }

    /// Fill the pool with decompression jobs. Returns the first job that finishes.
    pub fn decompress_next_block(&mut self) -> Option<Result<UncompressedBlock>> {

        while self.currently_decompressing_count < self.max_threads {
            let block = self.remaining_chunks.next();
            if let Some(block) = block {
                let block = match block {
                    Ok(block) => block,
                    Err(error) => return Some(Err(error))
                };

                let sender = self.sender.clone();
                let meta = self.shared_meta_data_ref.clone();
                let pedantic = self.pedantic;

                self.currently_decompressing_count += 1;

                self.pool.spawn(move || {
                    let decompressed_or_err = UncompressedBlock::decompress_chunk(
                        block, &meta, pedantic
                    );

                    // by now, decompressing could have failed in another thread.
                    // the error is then already handled, so we simply
                    // don't send the decompressed block and do nothing
                    let _ = sender.send(decompressed_or_err);
                });
            }
            else {
                // there are no chunks left to decompress
                break;
            }
        }

        if self.currently_decompressing_count > 0 {
            let next = self.receiver.recv()
                .expect("all decompressing senders hung up but more messages were expected");

            self.currently_decompressing_count -= 1;
            Some(next)
        }
        else {
            debug_assert!(self.receiver.try_recv().is_err(), "uncompressed chunks left in channel after decompressing all chunks"); // TODO not reliable
            debug_assert_eq!(self.len(), 0, "compressed chunks left after decompressing all chunks");
            None
        }
    }

    /// The extracted meta data of the image file.
    pub fn meta_data(&self) -> &MetaData { self.remaining_chunks.meta_data() }
}

impl<R: ChunksReader> ExactSizeIterator for SequentialBlockDecompressor<R> {}
impl<R: ChunksReader> Iterator for SequentialBlockDecompressor<R> {
    type Item = Result<UncompressedBlock>;
    fn next(&mut self) -> Option<Self::Item> { self.decompress_next_block() }
    fn size_hint(&self) -> (usize, Option<usize>) { self.remaining_chunks_reader.size_hint() }
}

impl<R: ChunksReader> ExactSizeIterator for ParallelBlockDecompressor<R> {}
impl<R: ChunksReader> Iterator for ParallelBlockDecompressor<R> {
    type Item = Result<UncompressedBlock>;
    fn next(&mut self) -> Option<Self::Item> { self.decompress_next_block() }
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.remaining_chunks.len() + self.currently_decompressing_count;
        (remaining, Some(remaining))
    }
}