//! Composable structures to handle reading an image.


use std::convert::TryFrom;
use std::fmt::Debug;
use std::io::{Read, Seek};
use rayon_core::{ThreadPool, ThreadPoolBuildError};

use smallvec::alloc::sync::Arc;

use crate::block::{BlockIndex, UncompressedBlock};
use crate::block::chunk::{Chunk, TileCoordinates};
use crate::compression::Compression;
use crate::error::{Error, Result, u64_to_usize, UnitResult};
use crate::io::{PeekRead, Tracking};
use crate::meta::{MetaData, OffsetTables};
use crate::meta::header::Header;
/// Decode the meta data from a byte source, keeping the source ready for further reading.
/// Continue decoding the remaining bytes by calling `filter_chunks` or `all_chunks`.
#[derive(Debug)]
pub struct Reader<R> {
    meta_data: MetaData,
    remaining_reader: PeekRead<Tracking<R>>, // TODO does R need to be Seek or is Tracking enough?
}

impl<R: Read + Seek> Reader<R> {

    /// Start the reading process.
    /// Immediately decodes the meta data into an internal field.
    /// Access it via `meta_data()`.
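    /// A minimal sketch (not run as a doctest; `"image.exr"` is a placeholder path):
    /// ```ignore
    /// use std::{fs::File, io::BufReader};
    ///
    /// let file = BufReader::new(File::open("image.exr")?); // buffer the byte source
    /// let reader = Reader::read_from_buffered(file, true)?; // pedantic: strict validation
    /// println!("found {} headers", reader.headers().len());
    /// ```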
    pub fn read_from_buffered(read: R, pedantic: bool) -> Result<Self> {
        let mut remaining_reader = PeekRead::new(Tracking::new(read));
        let meta_data = MetaData::read_validated_from_buffered_peekable(&mut remaining_reader, pedantic)?;
        Ok(Self { meta_data, remaining_reader })
    }

    // must not be mutable, as reading the file later on relies on the meta data
    /// The decoded exr meta data from the file.
    pub fn meta_data(&self) -> &MetaData { &self.meta_data }

    /// The decoded exr headers from the file.
    pub fn headers(&self) -> &[Header] { &self.meta_data.headers }

    /// Obtain ownership of the meta data.
    pub fn into_meta_data(self) -> MetaData { self.meta_data }

    /// Prepare to read all the chunks from the file.
    /// Does not decode the chunks now, but returns a decoder.
    /// Reading all chunks reduces seeking within the file, but some chunks might be read without being used.
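    /// A sketch (not run as a doctest; `reader` comes from `read_from_buffered`):
    /// ```ignore
    /// let mut chunks = reader.all_chunks(true)?; // pedantic: also validate the offset tables
    /// while let Some(chunk) = chunks.read_next_chunk() {
    ///     let _compressed_chunk = chunk?; // each item is a `Result<Chunk>`
    /// }
    /// ```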
    pub fn all_chunks(mut self, pedantic: bool) -> Result<AllChunksReader<R>> {
        let total_chunk_count = {
            if pedantic {
                let offset_tables = MetaData::read_offset_tables(&mut self.remaining_reader, &self.meta_data.headers)?;
                validate_offset_tables(self.meta_data.headers.as_slice(), &offset_tables, self.remaining_reader.byte_position())?;
                offset_tables.iter().map(|table| table.len()).sum()
            }
            else {
                usize::try_from(MetaData::skip_offset_tables(&mut self.remaining_reader, &self.meta_data.headers)?)
                    .expect("too large chunk count for this machine")
            }
        };

        Ok(AllChunksReader {
            meta_data: self.meta_data,
            remaining_chunks: 0 .. total_chunk_count,
            remaining_bytes: self.remaining_reader,
            pedantic
        })
    }

    /// Prepare to read some of the chunks from the file.
    /// Does not decode the chunks now, but returns a decoder.
    /// Reading only some chunks may require seeking within the file, potentially skipping many bytes.
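    /// For example, this sketch keeps only the blocks of the first layer
    /// (not run as a doctest; `reader` comes from `read_from_buffered`):
    /// ```ignore
    /// let chunks = reader.filter_chunks(true, |_meta_data, _tile, block| block.layer == 0)?;
    /// ```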
    // TODO tile indices add no new information to block index??
    pub fn filter_chunks(mut self, pedantic: bool, mut filter: impl FnMut(&MetaData, TileCoordinates, BlockIndex) -> bool) -> Result<FilteredChunksReader<R>> {
        let offset_tables = MetaData::read_offset_tables(&mut self.remaining_reader, &self.meta_data.headers)?;

        // TODO regardless of pedantic, if invalid, read all chunks instead, and filter after reading each chunk?
        if pedantic {
            validate_offset_tables(
                self.meta_data.headers.as_slice(), &offset_tables,
                self.remaining_reader.byte_position()
            )?;
        }

        let mut filtered_offsets = Vec::with_capacity(
            (self.meta_data.headers.len() * 32).min(2*2048)
        );

        // TODO detect whether the filter actually would skip chunks, and avoid sorting etc. when no filtering is applied

        for (header_index, header) in self.meta_data.headers.iter().enumerate() { // offset tables are stored in the same order as the headers
            for (block_index, tile) in header.blocks_increasing_y_order().enumerate() { // in increasing-y order
                let data_indices = header.get_absolute_block_pixel_coordinates(tile.location)?;

                let block = BlockIndex {
                    layer: header_index,
                    level: tile.location.level_index,
                    pixel_position: data_indices.position.to_usize("data indices start")?,
                    pixel_size: data_indices.size,
                };

                if filter(&self.meta_data, tile.location, block) {
                    filtered_offsets.push(offset_tables[header_index][block_index]) // safe indexing from `enumerate()`
                }
            }
        }

        filtered_offsets.sort_unstable(); // enables reading continuously if possible (already sorted when the line order is increasing)

        if pedantic {
            // table is sorted. if any two neighbours are equal, we have duplicates. this is invalid.
            if filtered_offsets.windows(2).any(|pair| pair[0] == pair[1]) {
                return Err(Error::invalid("chunk offset table"))
            }
        }

        Ok(FilteredChunksReader {
            meta_data: self.meta_data,
            expected_filtered_chunk_count: filtered_offsets.len(),
            remaining_filtered_chunk_indices: filtered_offsets.into_iter(),
            remaining_bytes: self.remaining_reader
        })
    }
}


fn validate_offset_tables(headers: &[Header], offset_tables: &OffsetTables, chunks_start_byte: usize) -> UnitResult {
    let max_pixel_bytes: usize = headers.iter() // when compressed, chunks are smaller, but never larger than max
        .map(|header| header.max_pixel_file_bytes())
        .sum();

    // check that each offset is within the bounds
    let end_byte = chunks_start_byte + max_pixel_bytes;
    let is_invalid = offset_tables.iter().flatten().map(|&offset| u64_to_usize(offset))
        .any(|chunk_start| chunk_start < chunks_start_byte || chunk_start > end_byte);

    if is_invalid { Err(Error::invalid("offset table")) }
    else { Ok(()) }
}


/// Decode the desired chunks and skip the unimportant chunks in the file.
/// The decoded chunks can be decompressed by calling
/// `decompress_parallel`, `decompress_sequential`, `sequential_decompressor`, or `parallel_decompressor`.
/// Call `on_progress` to get a callback for each block that is read.
/// Also contains the image meta data.
#[derive(Debug)]
pub struct FilteredChunksReader<R> {
    meta_data: MetaData,
    expected_filtered_chunk_count: usize,
    remaining_filtered_chunk_indices: std::vec::IntoIter<u64>,
    remaining_bytes: PeekRead<Tracking<R>>,
}

/// Decode all chunks in the file without seeking.
/// The decoded chunks can be decompressed by calling
/// `decompress_parallel`, `decompress_sequential`, `sequential_decompressor`, or `parallel_decompressor`.
/// Call `on_progress` to get a callback for each block that is read.
/// Also contains the image meta data.
#[derive(Debug)]
pub struct AllChunksReader<R> {
    meta_data: MetaData,
    remaining_chunks: std::ops::Range<usize>,
    remaining_bytes: PeekRead<Tracking<R>>,
    pedantic: bool,
}

/// Decode chunks in the file without seeking.
/// Calls the supplied closure for each chunk.
/// The decoded chunks can be decompressed by calling
/// `decompress_parallel`, `decompress_sequential`, or `sequential_decompressor`.
/// Also contains the image meta data.
#[derive(Debug)]
pub struct OnProgressChunksReader<R, F> {
    chunks_reader: R,
    decoded_chunks: usize,
    callback: F,
}

/// Decode chunks in the file.
/// The decoded chunks can be decompressed by calling
/// `decompress_parallel`, `decompress_sequential`, or `sequential_decompressor`.
/// Call `on_progress` to get a callback for each block that is read.
/// Also contains the image meta data.
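/// A sketch of a typical call chain (not run as a doctest; `buffered_file`
/// is assumed to implement `Read + Seek`):
/// ```ignore
/// Reader::read_from_buffered(buffered_file, true)?
///     .all_chunks(true)?
///     .on_progress(|progress| println!("{:.1} %", progress * 100.0))
///     .decompress_parallel(true, |_meta_data, _block| Ok(()))?;
/// ```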
pub trait ChunksReader: Sized + Iterator<Item=Result<Chunk>> + ExactSizeIterator {

    /// The decoded exr meta data from the file.
    fn meta_data(&self) -> &MetaData;

    /// The decoded exr headers from the file.
    fn headers(&self) -> &[Header] { &self.meta_data().headers }

    /// The number of chunks that this reader will return in total.
    /// Can be less than the total number of chunks in the file, if some chunks are skipped.
    fn expected_chunk_count(&self) -> usize;

    /// Read the next compressed chunk from the file.
    /// Equivalent to `.next()`, as this also is an iterator.
    /// Returns `None` if all chunks have been read.
    fn read_next_chunk(&mut self) -> Option<Result<Chunk>> { self.next() }

    /// Create a new reader that calls the provided progress
    /// callback for each chunk that is read from the file.
    /// If the file can be successfully decoded, the callback is guaranteed
    /// to be called with 0.0 at the start and 1.0 at the end at least once each.
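    /// A sketch:
    /// ```ignore
    /// let chunks = chunks.on_progress(|progress| println!("read {:.0} %", progress * 100.0));
    /// ```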
    fn on_progress<F>(self, on_progress: F) -> OnProgressChunksReader<Self, F> where F: FnMut(f64) {
        OnProgressChunksReader { chunks_reader: self, callback: on_progress, decoded_chunks: 0 }
    }

    /// Decompress all blocks in the file, using multiple cpu cores, and call the supplied closure for each block.
    /// The order of the blocks is not deterministic.
    /// You can also use `parallel_decompressor` to obtain an iterator instead.
    /// Will fall back to sequential processing where threads are not available, or where it would not speed up the process.
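    /// A sketch (not run as a doctest; `block.index` is assumed to be the
    /// public position field of `UncompressedBlock`):
    /// ```ignore
    /// chunks.decompress_parallel(true, |_meta_data, block| {
    ///     println!("decompressed the block at {:?}", block.index);
    ///     Ok(())
    /// })?;
    /// ```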
    // FIXME try async + futures instead of rayon! Maybe even allows for external async decoding? (-> impl Stream<UncompressedBlock>)
    fn decompress_parallel(
        self, pedantic: bool,
        mut insert_block: impl FnMut(&MetaData, UncompressedBlock) -> UnitResult
    ) -> UnitResult
    {
        let mut decompressor = match self.parallel_decompressor(pedantic) {
            Err(old_self) => return old_self.decompress_sequential(pedantic, insert_block),
            Ok(decompressor) => decompressor,
        };

        while let Some(block) = decompressor.next() {
            insert_block(decompressor.meta_data(), block?)?;
        }

        debug_assert_eq!(decompressor.len(), 0, "compressed blocks left after decompressing all blocks");
        Ok(())
    }

    /// Return an iterator that decompresses the chunks with multiple threads.
    /// The order of the blocks is not deterministic.
    /// Use `ParallelBlockDecompressor::new` if you want to use your own thread pool.
    /// By default, this uses as many threads as there are CPUs.
    /// Returns `self` if there is no need for parallel decompression.
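    /// A sketch of handling both outcomes:
    /// ```ignore
    /// match chunks.parallel_decompressor(true) {
    ///     Ok(decompressor) => for block in decompressor { let _block = block?; },
    ///     Err(chunks) => for block in chunks.sequential_decompressor(true) { let _block = block?; },
    /// }
    /// ```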
    fn parallel_decompressor(self, pedantic: bool) -> std::result::Result<ParallelBlockDecompressor<Self>, Self> {
        ParallelBlockDecompressor::new(self, pedantic)
    }

    /// Decompress all blocks sequentially in the current thread, calling the supplied closure for each block.
    /// You can alternatively use `sequential_decompressor` if you prefer an external iterator.
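    /// A sketch, mirroring `decompress_parallel` but staying on the current thread:
    /// ```ignore
    /// chunks.decompress_sequential(true, |_meta_data, _block| Ok(()))?;
    /// ```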
    fn decompress_sequential(
        self, pedantic: bool,
        mut insert_block: impl FnMut(&MetaData, UncompressedBlock) -> UnitResult
    ) -> UnitResult
    {
        let mut decompressor = self.sequential_decompressor(pedantic);
        while let Some(block) = decompressor.next() {
            insert_block(decompressor.meta_data(), block?)?;
        }

        debug_assert_eq!(decompressor.len(), 0, "compressed blocks left after decompressing all blocks");
        Ok(())
    }

    /// Prepare reading the chunks sequentially, using only a single thread, but with less memory overhead.
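    /// A sketch:
    /// ```ignore
    /// let mut decompressor = chunks.sequential_decompressor(true);
    /// while let Some(block) = decompressor.decompress_next_block() {
    ///     let _block = block?;
    /// }
    /// ```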
    fn sequential_decompressor(self, pedantic: bool) -> SequentialBlockDecompressor<Self> {
        SequentialBlockDecompressor { remaining_chunks_reader: self, pedantic }
    }
}

impl<R, F> ChunksReader for OnProgressChunksReader<R, F> where R: ChunksReader, F: FnMut(f64) {
    fn meta_data(&self) -> &MetaData { self.chunks_reader.meta_data() }
    fn expected_chunk_count(&self) -> usize { self.chunks_reader.expected_chunk_count() }
}

impl<R, F> ExactSizeIterator for OnProgressChunksReader<R, F> where R: ChunksReader, F: FnMut(f64) {}
impl<R, F> Iterator for OnProgressChunksReader<R, F> where R: ChunksReader, F: FnMut(f64) {
    type Item = Result<Chunk>;

    fn next(&mut self) -> Option<Self::Item> {
        self.chunks_reader.next().map(|item|{
            {
                let total_chunks = self.expected_chunk_count() as f64;
                let callback = &mut self.callback;
                callback(self.decoded_chunks as f64 / total_chunks);
            }

            self.decoded_chunks += 1;
            item
        })
        .or_else(||{
            debug_assert_eq!(
                self.decoded_chunks, self.expected_chunk_count(),
                "chunks reader finished but not all chunks are decompressed"
            );

            let callback = &mut self.callback;
            callback(1.0);
            None
        })
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        self.chunks_reader.size_hint()
    }
}

impl<R: Read + Seek> ChunksReader for AllChunksReader<R> {
    fn meta_data(&self) -> &MetaData { &self.meta_data }
    fn expected_chunk_count(&self) -> usize { self.remaining_chunks.end }
}

impl<R: Read + Seek> ExactSizeIterator for AllChunksReader<R> {}
impl<R: Read + Seek> Iterator for AllChunksReader<R> {
    type Item = Result<Chunk>;

    fn next(&mut self) -> Option<Self::Item> {
        // read as many chunks as the file should contain (inferred from the meta data)
        let next_chunk = self.remaining_chunks.next()
            .map(|_| Chunk::read(&mut self.remaining_bytes, &self.meta_data));

        // if no chunks are left, but some bytes remain, return an error
        if self.pedantic && next_chunk.is_none() && self.remaining_bytes.peek_u8().is_ok() {
            return Some(Err(Error::invalid("end of file expected")));
        }

        next_chunk
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        (self.remaining_chunks.len(), Some(self.remaining_chunks.len()))
    }
}

impl<R: Read + Seek> ChunksReader for FilteredChunksReader<R> {
    fn meta_data(&self) -> &MetaData { &self.meta_data }
    fn expected_chunk_count(&self) -> usize { self.expected_filtered_chunk_count }
}

impl<R: Read + Seek> ExactSizeIterator for FilteredChunksReader<R> {}
impl<R: Read + Seek> Iterator for FilteredChunksReader<R> {
    type Item = Result<Chunk>;

    fn next(&mut self) -> Option<Self::Item> {
        // read as many chunks as we have desired chunk offsets
        self.remaining_filtered_chunk_indices.next().map(|next_chunk_location|{
            self.remaining_bytes.skip_to( // no-op for seek at current position, uses skip_bytes for small amounts
                usize::try_from(next_chunk_location)
                    .expect("too large chunk position for this machine")
            )?;

            let meta_data = &self.meta_data;
            Chunk::read(&mut self.remaining_bytes, meta_data)
        })

        // TODO remember the last chunk index, then seek to index + size and check whether bytes are left?
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        (self.remaining_filtered_chunk_indices.len(), Some(self.remaining_filtered_chunk_indices.len()))
    }
}

/// Read all chunks from the file, decompressing each chunk immediately.
/// Implements iterator.
#[derive(Debug)]
pub struct SequentialBlockDecompressor<R: ChunksReader> {
    remaining_chunks_reader: R,
    pedantic: bool,
}

impl<R: ChunksReader> SequentialBlockDecompressor<R> {

    /// The extracted meta data from the image file.
    pub fn meta_data(&self) -> &MetaData { self.remaining_chunks_reader.meta_data() }

    /// Read and then decompress a single block of pixels from the byte source.
    pub fn decompress_next_block(&mut self) -> Option<Result<UncompressedBlock>> {
        self.remaining_chunks_reader.read_next_chunk().map(|compressed_chunk|{
            UncompressedBlock::decompress_chunk(compressed_chunk?, &self.remaining_chunks_reader.meta_data(), self.pedantic)
        })
    }
}

/// Decompress the chunks in a file in parallel.
/// The first call to `next` will fill the thread pool with jobs,
/// starting to decompress the next few blocks.
/// These jobs will finish, even if you stop reading more blocks.
/// Implements iterator.
#[derive(Debug)]
pub struct ParallelBlockDecompressor<R: ChunksReader> {
    remaining_chunks: R,
    sender: flume::Sender<Result<UncompressedBlock>>,
    receiver: flume::Receiver<Result<UncompressedBlock>>,
    currently_decompressing_count: usize,
    max_threads: usize,

    shared_meta_data_ref: Arc<MetaData>,
    pedantic: bool,

    pool: ThreadPool,
}

impl<R: ChunksReader> ParallelBlockDecompressor<R> {

    /// Create a new decompressor. Does not immediately spawn any tasks.
    /// Decompression starts after the first call to `next`.
    /// Returns the chunks if parallel decompression should not be used.
    /// Use `new_with_thread_pool` to customize the thread pool.
    pub fn new(chunks: R, pedantic: bool) -> std::result::Result<Self, R> {
        Self::new_with_thread_pool(chunks, pedantic, ||{
            rayon_core::ThreadPoolBuilder::new()
                .thread_name(|index| format!("OpenEXR Block Decompressor Thread #{}", index))
                .build()
        })
    }

    /// Create a new decompressor. Does not immediately spawn any tasks.
    /// Decompression starts after the first call to `next`.
    /// Returns the chunks if parallel decompression should not be used.
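    /// A sketch with a custom pool size (the `2` is an arbitrary example value):
    /// ```ignore
    /// let decompressor = ParallelBlockDecompressor::new_with_thread_pool(chunks, true, || {
    ///     rayon_core::ThreadPoolBuilder::new()
    ///         .num_threads(2)
    ///         .build()
    /// });
    /// ```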
    pub fn new_with_thread_pool<CreatePool>(chunks: R, pedantic: bool, try_create_thread_pool: CreatePool)
        -> std::result::Result<Self, R>
        where CreatePool: FnOnce() -> std::result::Result<ThreadPool, ThreadPoolBuildError>
    {
        // if no compression is used in the file, don't use a threadpool
        if chunks.meta_data().headers.iter()
            .all(|head| head.compression == Compression::Uncompressed)
        {
            return Err(chunks);
        }

        // in case thread pool creation fails (for example on WASM currently),
        // we revert to sequential decompression
        let pool = match try_create_thread_pool() {
            Ok(pool) => pool,

            // TODO print warning?
            Err(_) => return Err(chunks),
        };

        let max_threads = pool.current_num_threads().max(1).min(chunks.len()) + 2; // approximately one block in flight per thread at all times

        let (send, recv) = flume::unbounded(); // TODO bounded channel simplifies logic?

        Ok(Self {
            shared_meta_data_ref: Arc::new(chunks.meta_data().clone()),
            currently_decompressing_count: 0,
            remaining_chunks: chunks,
            sender: send,
            receiver: recv,
            pedantic,
            max_threads,

            pool,
        })
    }

    /// Fill the pool with decompression jobs. Returns the first job that finishes.
    pub fn decompress_next_block(&mut self) -> Option<Result<UncompressedBlock>> {

        while self.currently_decompressing_count < self.max_threads {
            let block = self.remaining_chunks.next();
            if let Some(block) = block {
                let block = match block {
                    Ok(block) => block,
                    Err(error) => return Some(Err(error))
                };

                let sender = self.sender.clone();
                let meta = self.shared_meta_data_ref.clone();
                let pedantic = self.pedantic;

                self.currently_decompressing_count += 1;

                self.pool.spawn(move || {
                    let decompressed_or_err = UncompressedBlock::decompress_chunk(
                        block, &meta, pedantic
                    );

                    // by now, decompressing could have failed in another thread.
                    // the error is then already handled, so we simply
                    // don't send the decompressed block and do nothing
                    let _ = sender.send(decompressed_or_err);
                });
            }
            else {
                // there are no chunks left to decompress
                break;
            }
        }

        if self.currently_decompressing_count > 0 {
            let next = self.receiver.recv()
                .expect("all decompressing senders hung up but more messages were expected");

            self.currently_decompressing_count -= 1;
            Some(next)
        }
        else {
            debug_assert!(self.receiver.try_recv().is_err(), "uncompressed chunks left in channel after decompressing all chunks"); // TODO not reliable
            debug_assert_eq!(self.len(), 0, "compressed chunks left after decompressing all chunks");
            None
        }
    }

    /// The extracted meta data of the image file.
    pub fn meta_data(&self) -> &MetaData { self.remaining_chunks.meta_data() }
}

impl<R: ChunksReader> ExactSizeIterator for SequentialBlockDecompressor<R> {}
impl<R: ChunksReader> Iterator for SequentialBlockDecompressor<R> {
    type Item = Result<UncompressedBlock>;
    fn next(&mut self) -> Option<Self::Item> { self.decompress_next_block() }
    fn size_hint(&self) -> (usize, Option<usize>) { self.remaining_chunks_reader.size_hint() }
}

impl<R: ChunksReader> ExactSizeIterator for ParallelBlockDecompressor<R> {}
impl<R: ChunksReader> Iterator for ParallelBlockDecompressor<R> {
    type Item = Result<UncompressedBlock>;
    fn next(&mut self) -> Option<Self::Item> { self.decompress_next_block() }
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.remaining_chunks.len() + self.currently_decompressing_count;
        (remaining, Some(remaining))
    }
}