//! Composable structures to handle writing an image.


use std::fmt::Debug;
use std::io::Seek;
use std::iter::Peekable;
use std::ops::Not;
use rayon_core::{ThreadPool, ThreadPoolBuildError};

use smallvec::alloc::collections::BTreeMap;

use crate::block::UncompressedBlock;
use crate::block::chunk::{Chunk};
use crate::compression::Compression;
use crate::error::{Error, Result, UnitResult, usize_to_u64};
use crate::io::{Data, Tracking, Write};
use crate::meta::{Headers, MetaData, OffsetTables};
use crate::meta::attribute::LineOrder;

/// Write an exr file by writing one chunk after another in a closure.
/// In the closure, you are provided a chunk writer, which should be used to write all the chunks.
/// Assumes that your write destination is buffered.
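///
/// The following is a minimal usage sketch (marked `ignore`, not compiled here).
/// It assumes that `file`, `headers`, and an iterator of already-compressed
/// `(usize, Chunk)` pairs named `chunks` exist elsewhere:
///
/// ```ignore
/// use exr::block::writer::{write_chunks_with, ChunksWriter};
///
/// write_chunks_with(std::io::BufWriter::new(file), headers, true, |_meta, chunk_writer| {
///     for (index_in_header_increasing_y, chunk) in chunks {
///         chunk_writer.write_chunk(index_in_header_increasing_y, chunk)?;
///     }
///     Ok(())
/// })?;
/// ```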
pub fn write_chunks_with<W: Write + Seek>(
    buffered_write: W, headers: Headers, pedantic: bool,
    write_chunks: impl FnOnce(MetaData, &mut ChunkWriter<W>) -> UnitResult
) -> UnitResult {
    // this closure approach ensures that after writing all chunks, the file is always completed, checked, and flushed
    let (meta, mut writer) = ChunkWriter::new_for_buffered(buffered_write, headers, pedantic)?;
    write_chunks(meta, &mut writer)?;
    writer.complete_meta_data()
}

/// Can consume compressed pixel chunks, writing them to a file.
/// Use `sequential_blocks_compressor` or `parallel_blocks_compressor` to compress your data,
/// or use `compress_all_blocks_sequential` or `compress_all_blocks_parallel`.
/// Use `on_progress` to obtain a new writer
/// that triggers a callback for each block.
// #[must_use]
#[derive(Debug)]
#[must_use]
pub struct ChunkWriter<W> {
    header_count: usize,
    byte_writer: Tracking<W>,
    chunk_indices_byte_location: std::ops::Range<usize>,
    chunk_indices_increasing_y: OffsetTables,
    chunk_count: usize, // TODO compose?
}

/// A new writer that triggers a callback
/// for each block written to the inner writer.
#[derive(Debug)]
#[must_use]
pub struct OnProgressChunkWriter<'w, W, F> {
    chunk_writer: &'w mut W,
    written_chunks: usize,
    on_progress: F,
}

/// Write chunks to a byte destination.
/// Write each chunk with `writer.write_chunk(index_in_header_increasing_y, chunk)`.
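///
/// A hedged sketch of the typical call chain, assuming `chunk_writer` implements
/// this trait and that `meta` plus an ordered iterator `blocks` of
/// `(usize, UncompressedBlock)` pairs already exist:
///
/// ```ignore
/// chunk_writer
///     .on_progress(|progress| println!("progress: {:.1}%", progress * 100.0))
///     .compress_all_blocks_parallel(&meta, blocks)?;
/// ```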
pub trait ChunksWriter: Sized {

    /// The total number of chunks that the complete file will contain.
    fn total_chunks_count(&self) -> usize;

    /// Writes a single chunk. Any calls beyond the expected chunk count result in an error and have no effect.
    /// If writing results in an error, the file and the writer
    /// may remain in an invalid state and should not be used further.
    /// Errors when the chunk at this index was already written.
    fn write_chunk(&mut self, index_in_header_increasing_y: usize, chunk: Chunk) -> UnitResult;

    /// Obtain a new writer that calls the specified closure for each block that is written to this writer.
    fn on_progress<F>(&mut self, on_progress: F) -> OnProgressChunkWriter<'_, Self, F> where F: FnMut(f64) {
        OnProgressChunkWriter { chunk_writer: self, written_chunks: 0, on_progress }
    }

    /// Obtain a new writer that can compress blocks to chunks, which are then passed to this writer.
    fn sequential_blocks_compressor<'w>(&'w mut self, meta: &'w MetaData) -> SequentialBlocksCompressor<'w, Self> {
        SequentialBlocksCompressor::new(meta, self)
    }

    /// Obtain a new writer that can compress blocks to chunks on multiple threads, which are then passed to this writer.
    /// Returns `None` if the sequential compressor should be used instead
    /// (when thread pool creation fails, or when parallelism would not speed up the process).
    fn parallel_blocks_compressor<'w>(&'w mut self, meta: &'w MetaData) -> Option<ParallelBlocksCompressor<'w, Self>> {
        ParallelBlocksCompressor::new(meta, self)
    }

    /// Compresses all blocks to the file.
    /// The index of each block must be in increasing line order within its header.
    /// Obtain the iterator with `MetaData::collect_ordered_blocks(...)` or similar methods.
    fn compress_all_blocks_sequential(mut self, meta: &MetaData, blocks: impl Iterator<Item=(usize, UncompressedBlock)>) -> UnitResult {
        let mut writer = self.sequential_blocks_compressor(meta);

        // TODO check block order if line order is not unspecified!
        for (index_in_header_increasing_y, block) in blocks {
            writer.compress_block(index_in_header_increasing_y, block)?;
        }

        // TODO debug_assert_eq!(self.is_complete());
        Ok(())
    }

    /// Compresses all blocks to the file.
    /// The index of each block must be in increasing line order within its header.
    /// Obtain the iterator with `MetaData::collect_ordered_blocks(...)` or similar methods.
    /// Will fall back to sequential processing where threads are not available, or where it would not speed up the process.
    fn compress_all_blocks_parallel(mut self, meta: &MetaData, blocks: impl Iterator<Item=(usize, UncompressedBlock)>) -> UnitResult {
        let mut parallel_writer = match self.parallel_blocks_compressor(meta) {
            None => return self.compress_all_blocks_sequential(meta, blocks),
            Some(writer) => writer,
        };

        // TODO check block order if line order is not unspecified!
        for (index_in_header_increasing_y, block) in blocks {
            parallel_writer.add_block_to_compression_queue(index_in_header_increasing_y, block)?;
        }

        // TODO debug_assert_eq!(self.is_complete());
        Ok(())
    }
}


impl<W> ChunksWriter for ChunkWriter<W> where W: Write + Seek {

    /// The total number of chunks that the complete file will contain.
    fn total_chunks_count(&self) -> usize { self.chunk_count }

    /// Writes a single chunk. Any calls beyond the expected chunk count result in an error and have no effect.
    /// If writing results in an error, the file and the writer
    /// may remain in an invalid state and should not be used further.
    /// Errors when the chunk at this index was already written.
    fn write_chunk(&mut self, index_in_header_increasing_y: usize, chunk: Chunk) -> UnitResult {
        let header_chunk_indices = &mut self.chunk_indices_increasing_y[chunk.layer_index];

        if index_in_header_increasing_y >= header_chunk_indices.len() {
            return Err(Error::invalid("too large chunk index"));
        }

        let chunk_index_slot = &mut header_chunk_indices[index_in_header_increasing_y];
        if *chunk_index_slot != 0 {
            return Err(Error::invalid(format!("chunk at index {} is already written", index_in_header_increasing_y)));
        }

        *chunk_index_slot = usize_to_u64(self.byte_writer.byte_position());
        chunk.write(&mut self.byte_writer, self.header_count)?;
        Ok(())
    }
}

impl<W> ChunkWriter<W> where W: Write + Seek {
    // -- the following functions are private, because they must be called in a strict order --

    /// Writes the meta data and zeroed offset tables as a placeholder.
    fn new_for_buffered(buffered_byte_writer: W, headers: Headers, pedantic: bool) -> Result<(MetaData, Self)> {
        let mut write = Tracking::new(buffered_byte_writer);
        let requirements = MetaData::write_validating_to_buffered(&mut write, headers.as_slice(), pedantic)?;

        // TODO: use increasing line order where possible, but this requires us to know whether we want to be parallel right now
        /*// if non-parallel compression, we always use increasing order anyways
        if !parallel || !has_compression {
            for header in &mut headers {
                if header.line_order == LineOrder::Unspecified {
                    header.line_order = LineOrder::Increasing;
                }
            }
        }*/

        let offset_table_size: usize = headers.iter().map(|header| header.chunk_count).sum();

        let offset_table_start_byte = write.byte_position();
        let offset_table_end_byte = write.byte_position() + offset_table_size * u64::BYTE_SIZE;
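        // worked example: for two headers containing 12 and 20 chunks, the placeholder
        // region spans (12 + 20) * 8 = 256 bytes, starting directly after the header bytes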

        // skip the offset tables for now, filling the space with zeroes;
        // they will be updated after the last chunk has been written
        write.seek_write_to(offset_table_end_byte)?;

        let header_count = headers.len();
        let chunk_indices_increasing_y = headers.iter()
            .map(|header| vec![0_u64; header.chunk_count]).collect();

        let meta_data = MetaData { requirements, headers };

        Ok((meta_data, ChunkWriter {
            header_count,
            byte_writer: write,
            chunk_count: offset_table_size,
            chunk_indices_byte_location: offset_table_start_byte .. offset_table_end_byte,
            chunk_indices_increasing_y,
        }))
    }

    /// Seek back to the meta data, write the offset tables, and flush the byte writer.
    /// Leaves the writer positioned somewhere in the middle of the file.
    fn complete_meta_data(mut self) -> UnitResult {
        if self.chunk_indices_increasing_y.iter().flatten().any(|&index| index == 0) {
            return Err(Error::invalid("some chunks are not written yet"))
        }

        // write all offset tables
        debug_assert_ne!(self.byte_writer.byte_position(), self.chunk_indices_byte_location.end, "offset table has already been updated");
        self.byte_writer.seek_write_to(self.chunk_indices_byte_location.start)?;

        for table in self.chunk_indices_increasing_y {
            u64::write_slice(&mut self.byte_writer, table.as_slice())?;
        }

        self.byte_writer.flush()?; // make sure we catch all (possibly delayed) io errors before returning
        Ok(())
    }

}


impl<'w, W, F> ChunksWriter for OnProgressChunkWriter<'w, W, F> where W: 'w + ChunksWriter, F: FnMut(f64) {
    fn total_chunks_count(&self) -> usize {
        self.chunk_writer.total_chunks_count()
    }

    fn write_chunk(&mut self, index_in_header_increasing_y: usize, chunk: Chunk) -> UnitResult {
        let total_chunks = self.total_chunks_count();
        let on_progress = &mut self.on_progress;

        // guarantee that on_progress is called with 0.0 exactly once, before the first chunk
        if self.written_chunks == 0 { on_progress(0.0); }

        self.chunk_writer.write_chunk(index_in_header_increasing_y, chunk)?;

        self.written_chunks += 1;

        on_progress({
            // guarantee finishing with progress 1.0 for the last block at least once, as float division might differ slightly from 1.0
            if self.written_chunks == total_chunks { 1.0 }
            else { self.written_chunks as f64 / total_chunks as f64 }
        });

        Ok(())
    }
}


/// Write blocks that appear in any order and reorder them before writing.
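/// For example (assuming three chunks total), if chunks arrive in file order 2, 0, 1,
/// then chunk 2 is stashed, chunk 0 is written immediately, and writing chunk 1
/// also flushes the stashed chunk 2.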
#[derive(Debug)]
#[must_use]
pub struct SortedBlocksWriter<'w, W> {
    chunk_writer: &'w mut W,
    pending_chunks: BTreeMap<usize, (usize, Chunk)>,
    unwritten_chunk_indices: Peekable<std::ops::Range<usize>>,
    requires_sorting: bool, // using this instead of Option, because of borrowing
}


impl<'w, W> SortedBlocksWriter<'w, W> where W: ChunksWriter {

    /// New sorting writer. If sorting is not required, chunks are simply passed through to the inner writer unchanged.
    pub fn new(meta_data: &MetaData, chunk_writer: &'w mut W) -> SortedBlocksWriter<'w, W> {
        let requires_sorting = meta_data.headers.iter()
            .any(|header| header.line_order != LineOrder::Unspecified);

        let total_chunk_count = chunk_writer.total_chunks_count();

        SortedBlocksWriter {
            pending_chunks: BTreeMap::new(),
            unwritten_chunk_indices: (0 .. total_chunk_count).peekable(),
            requires_sorting,
            chunk_writer
        }
    }

    /// Write the chunk now if it is the next one in file order, otherwise stash it.
    /// Afterwards, also writes any stashed chunks that have become writable.
    pub fn write_or_stash_chunk(&mut self, chunk_index_in_file: usize, chunk_y_index: usize, chunk: Chunk) -> UnitResult {
        if self.requires_sorting.not() {
            return self.chunk_writer.write_chunk(chunk_y_index, chunk);
        }

        // write this chunk now if possible
        if self.unwritten_chunk_indices.peek() == Some(&chunk_index_in_file) {
            self.chunk_writer.write_chunk(chunk_y_index, chunk)?;
            self.unwritten_chunk_indices.next().expect("peeked chunk index is missing");

            // write all pending blocks that are immediate successors of this block
            while let Some((next_chunk_y_index, next_chunk)) = self
                .unwritten_chunk_indices.peek().cloned()
                .and_then(|id| self.pending_chunks.remove(&id))
            {
                self.chunk_writer.write_chunk(next_chunk_y_index, next_chunk)?;
                self.unwritten_chunk_indices.next().expect("peeked chunk index is missing");
            }
        }

        else {
            // the argument block is not to be written now,
            // and none of the pending blocks are next up either,
            // so just stash this block
            self.pending_chunks.insert(chunk_index_in_file, (chunk_y_index, chunk));
        }

        Ok(())
    }

    /// Where the chunks will be written to.
    pub fn inner_chunks_writer(&self) -> &W {
        &self.chunk_writer
    }
}



/// Compress blocks to a chunk writer in this thread.
#[derive(Debug)]
#[must_use]
pub struct SequentialBlocksCompressor<'w, W> {
    meta: &'w MetaData,
    chunks_writer: &'w mut W,
}

impl<'w, W> SequentialBlocksCompressor<'w, W> where W: 'w + ChunksWriter {

    /// New blocks writer.
    pub fn new(meta: &'w MetaData, chunks_writer: &'w mut W) -> Self { Self { meta, chunks_writer, } }

    /// This is where the compressed blocks are written to.
    pub fn inner_chunks_writer(&'w self) -> &'w W { self.chunks_writer }

    /// Compress a single block immediately. The index of the block must be in increasing line order.
    pub fn compress_block(&mut self, index_in_header_increasing_y: usize, block: UncompressedBlock) -> UnitResult {
        self.chunks_writer.write_chunk(
            index_in_header_increasing_y,
            block.compress_to_chunk(&self.meta.headers)?
        )
    }
}

/// Compress blocks to a chunk writer with multiple threads.
#[derive(Debug)]
#[must_use]
pub struct ParallelBlocksCompressor<'w, W> {
    meta: &'w MetaData,
    sorted_writer: SortedBlocksWriter<'w, W>,

    sender: flume::Sender<Result<(usize, usize, Chunk)>>,
    receiver: flume::Receiver<Result<(usize, usize, Chunk)>>,
    pool: rayon_core::ThreadPool,

    currently_compressing_count: usize,
    written_chunk_count: usize, // used to check for last chunk
    max_threads: usize,
    next_incoming_chunk_index: usize, // used to remember original chunk order
}

impl<'w, W> ParallelBlocksCompressor<'w, W> where W: 'w + ChunksWriter {

    /// New blocks writer. Returns `None` if sequential compression should be used.
    /// Use `new_with_thread_pool` to customize the threadpool.
    pub fn new(meta: &'w MetaData, chunks_writer: &'w mut W) -> Option<Self> {
        Self::new_with_thread_pool(meta, chunks_writer, ||{
            rayon_core::ThreadPoolBuilder::new()
                .thread_name(|index| format!("OpenEXR Block Compressor Thread #{}", index))
                .build()
        })
    }

    /// New blocks writer. Returns `None` if sequential compression should be used.
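    ///
    /// A sketch of supplying a custom pool, assuming `meta` and `chunks_writer` exist;
    /// the thread count below is only an example value:
    ///
    /// ```ignore
    /// let compressor = ParallelBlocksCompressor::new_with_thread_pool(&meta, &mut chunks_writer, || {
    ///     rayon_core::ThreadPoolBuilder::new()
    ///         .num_threads(2)
    ///         .build()
    /// });
    /// ```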
    pub fn new_with_thread_pool<CreatePool>(
        meta: &'w MetaData, chunks_writer: &'w mut W, try_create_thread_pool: CreatePool)
        -> Option<Self>
        where CreatePool: FnOnce() -> std::result::Result<ThreadPool, ThreadPoolBuildError>
    {
        if meta.headers.iter().all(|head| head.compression == Compression::Uncompressed) {
            return None;
        }

        // in case thread pool creation fails (for example on WASM currently),
        // we revert to sequential compression
        let pool = match try_create_thread_pool() {
            Ok(pool) => pool,

            // TODO print warning?
            Err(_) => return None,
        };

        // keep roughly one block in flight per thread at all times, plus a small buffer
        let max_threads = pool.current_num_threads().max(1).min(chunks_writer.total_chunks_count()) + 2;
        let (send, recv) = flume::unbounded(); // TODO bounded channel simplifies logic?

        Some(Self {
            sorted_writer: SortedBlocksWriter::new(meta, chunks_writer),
            next_incoming_chunk_index: 0,
            currently_compressing_count: 0,
            written_chunk_count: 0,
            sender: send,
            receiver: recv,
            max_threads,
            pool,
            meta,
        })
    }

    /// This is where the compressed blocks are written to.
    pub fn inner_chunks_writer(&'w self) -> &'w W { self.sorted_writer.inner_chunks_writer() }

    // private, because it may underflow the counter in release mode
    fn write_next_queued_chunk(&mut self) -> UnitResult {
        debug_assert!(self.currently_compressing_count > 0, "cannot wait for chunks as there are none left");

        let some_compressed_chunk = self.receiver.recv()
            .expect("cannot receive compressed block");

        self.currently_compressing_count -= 1;
        let (chunk_file_index, chunk_y_index, chunk) = some_compressed_chunk?;
        self.sorted_writer.write_or_stash_chunk(chunk_file_index, chunk_y_index, chunk)?;

        self.written_chunk_count += 1;
        Ok(())
    }

    /// Wait until all currently compressing chunks in the compressor have been written.
    pub fn write_all_queued_chunks(&mut self) -> UnitResult {
        while self.currently_compressing_count > 0 {
            self.write_next_queued_chunk()?;
        }

        debug_assert_eq!(self.currently_compressing_count, 0, "counter does not match block count");
        Ok(())
    }

    /// Add a single block to the compressor queue. The index of the block must be in increasing line order.
    /// When calling this function for the last block, this method waits until all the blocks have been written.
    /// This only works when you write as many blocks as the image expects; otherwise, call `write_all_queued_chunks` manually.
    /// Waits for a block from the queue to be written, if the queue already has enough items.
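    ///
    /// A hedged sketch of feeding fewer blocks than the image expects, assuming
    /// `compressor` is this writer and `some_blocks` yields `(usize, UncompressedBlock)` pairs:
    ///
    /// ```ignore
    /// for (index_in_header_increasing_y, block) in some_blocks {
    ///     compressor.add_block_to_compression_queue(index_in_header_increasing_y, block)?;
    /// }
    /// compressor.write_all_queued_chunks()?; // flush everything still in the queue
    /// ```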
    pub fn add_block_to_compression_queue(&mut self, index_in_header_increasing_y: usize, block: UncompressedBlock) -> UnitResult {

        // if the pipe is full, block until a slot frees up
        if self.currently_compressing_count >= self.max_threads {
            self.write_next_queued_chunk()?;
        }

        // add the argument chunk to the compression queue
        let index_in_file = self.next_incoming_chunk_index;
        let sender = self.sender.clone();
        let meta = self.meta.clone();

        self.pool.spawn(move ||{
            let compressed_or_err = block.compress_to_chunk(&meta.headers);

            // by now, compressing could have failed in another thread.
            // the error is then already handled, so we simply
            // ignore a failed send and do nothing
            let _ = sender.send(compressed_or_err.map(move |compressed| (index_in_file, index_in_header_increasing_y, compressed)));
        });

        self.currently_compressing_count += 1;
        self.next_incoming_chunk_index += 1;

        // if this is the last chunk, wait for all chunks to complete before returning
        if self.written_chunk_count + self.currently_compressing_count == self.inner_chunks_writer().total_chunks_count() {
            self.write_all_queued_chunks()?;
            debug_assert_eq!(
                self.written_chunk_count, self.inner_chunks_writer().total_chunks_count(),
                "written chunk count mismatch"
            );
        }

        Ok(())
    }
}