1 | use rayon::iter::{IndexedParallelIterator, ParallelIterator}; |
2 | use rayon::slice::ParallelSliceMut; |
3 | |
4 | use crate::decoder::{choose_color_convert_func, ColorTransform}; |
5 | use crate::error::Result; |
6 | use crate::idct::dequantize_and_idct_block; |
7 | use crate::parser::Component; |
8 | use crate::upsampler::Upsampler; |
9 | use crate::{decoder::MAX_COMPONENTS, parser::Dimensions}; |
10 | |
11 | use std::sync::Arc; |
12 | |
13 | use super::{RowData, Worker}; |
14 | |
15 | /// Technically similar to `immediate::ImmediateWorker` but we copy it since we may prefer |
16 | /// different style of managing the memory allocation, something that multiple actors can access in |
17 | /// parallel. |
18 | #[derive (Default)] |
19 | struct ImmediateWorker { |
20 | offsets: [usize; MAX_COMPONENTS], |
21 | results: [Vec<u8>; MAX_COMPONENTS], |
22 | components: [Option<Component>; MAX_COMPONENTS], |
23 | quantization_tables: [Option<Arc<[u16; 64]>>; MAX_COMPONENTS], |
24 | } |
25 | |
/// Plain-data snapshot of the layout values `append_row_locked` needs for one component.
///
/// Being `Copy`, it can be moved into a spawned task without borrowing the worker.
#[derive(Clone, Copy)]
struct ComponentMetadata {
    /// Width of the component in blocks (`block_size.width`).
    block_width: usize,
    /// Number of 64-coefficient blocks in one appended row: block width times the vertical
    /// sampling factor.
    block_count: usize,
    /// Length of one output line in samples: block width times `dct_scale`.
    line_stride: usize,
    /// Edge length, in samples, of the square each block contributes to the output.
    dct_scale: usize,
}
33 | |
34 | #[derive (Default)] |
35 | pub struct Scoped { |
36 | inner: ImmediateWorker, |
37 | } |
38 | |
39 | impl ImmediateWorker { |
40 | pub fn start_immediate(&mut self, data: RowData) { |
41 | let elements = data.component.block_size.width as usize |
42 | * data.component.block_size.height as usize |
43 | * data.component.dct_scale |
44 | * data.component.dct_scale; |
45 | self.offsets[data.index] = 0; |
46 | self.results[data.index].resize(elements, 0u8); |
47 | self.components[data.index] = Some(data.component); |
48 | self.quantization_tables[data.index] = Some(data.quantization_table); |
49 | } |
50 | |
51 | pub fn get_result_immediate(&mut self, index: usize) -> Vec<u8> { |
52 | core::mem::take(&mut self.results[index]) |
53 | } |
54 | |
55 | pub fn component_metadata(&self, index: usize) -> Option<ComponentMetadata> { |
56 | let component = self.components[index].as_ref()?; |
57 | let block_size = component.block_size; |
58 | let block_width = block_size.width as usize; |
59 | let block_count = block_size.width as usize * component.vertical_sampling_factor as usize; |
60 | let line_stride = block_size.width as usize * component.dct_scale; |
61 | let dct_scale = component.dct_scale; |
62 | |
63 | Some(ComponentMetadata { |
64 | block_width, |
65 | block_count, |
66 | line_stride, |
67 | dct_scale, |
68 | }) |
69 | } |
70 | |
71 | pub fn append_row_locked( |
72 | quantization_table: Arc<[u16; 64]>, |
73 | metadata: ComponentMetadata, |
74 | data: Vec<i16>, |
75 | result_block: &mut [u8], |
76 | ) { |
77 | // Convert coefficients from a MCU row to samples. |
78 | let ComponentMetadata { |
79 | block_count, |
80 | line_stride, |
81 | block_width, |
82 | dct_scale, |
83 | } = metadata; |
84 | |
85 | assert_eq!(data.len(), block_count * 64); |
86 | |
87 | let mut output_buffer = [0; 64]; |
88 | for i in 0..block_count { |
89 | let x = (i % block_width) * dct_scale; |
90 | let y = (i / block_width) * dct_scale; |
91 | |
92 | let coefficients: &[i16; 64] = &data[i * 64..(i + 1) * 64].try_into().unwrap(); |
93 | |
94 | // Write to a temporary intermediate buffer, a 8x8 'image'. |
95 | dequantize_and_idct_block( |
96 | dct_scale, |
97 | coefficients, |
98 | &quantization_table, |
99 | 8, |
100 | &mut output_buffer, |
101 | ); |
102 | |
103 | let write_back = &mut result_block[y * line_stride + x..]; |
104 | |
105 | let buffered_lines = output_buffer.chunks_mut(8); |
106 | let back_lines = write_back.chunks_mut(line_stride); |
107 | |
108 | for (buf, back) in buffered_lines.zip(back_lines).take(dct_scale) { |
109 | back[..dct_scale].copy_from_slice(&buf[..dct_scale]); |
110 | } |
111 | } |
112 | } |
113 | } |
114 | |
115 | impl Worker for Scoped { |
116 | fn start(&mut self, row_data: RowData) -> Result<()> { |
117 | self.inner.start_immediate(row_data); |
118 | Ok(()) |
119 | } |
120 | |
121 | fn append_row(&mut self, row: (usize, Vec<i16>)) -> Result<()> { |
122 | let inner = &mut self.inner; |
123 | let (index, data) = row; |
124 | |
125 | let quantization_table = inner.quantization_tables[index].as_ref().unwrap().clone(); |
126 | let metadata = inner.component_metadata(index).unwrap(); |
127 | let result_block = &mut inner.results[index][inner.offsets[index]..]; |
128 | inner.offsets[index] += metadata.bytes_used(); |
129 | |
130 | ImmediateWorker::append_row_locked(quantization_table, metadata, data, result_block); |
131 | Ok(()) |
132 | } |
133 | |
134 | fn get_result(&mut self, index: usize) -> Result<Vec<u8>> { |
135 | let result = self.inner.get_result_immediate(index); |
136 | Ok(result) |
137 | } |
138 | |
139 | // Magic sauce, these _may_ run in parallel. |
140 | fn append_rows(&mut self, iter: &mut dyn Iterator<Item = (usize, Vec<i16>)>) -> Result<()> { |
141 | let inner = &mut self.inner; |
142 | rayon::in_place_scope(|scope| { |
143 | let metadatas = [ |
144 | inner.component_metadata(0), |
145 | inner.component_metadata(1), |
146 | inner.component_metadata(2), |
147 | inner.component_metadata(3), |
148 | ]; |
149 | |
150 | let [res0, res1, res2, res3] = &mut inner.results; |
151 | |
152 | // Lazily get the blocks. Note: if we've already collected results from a component |
153 | // then the result vector has already been deallocated/taken. But no more tasks should |
154 | // be created for it. |
155 | let mut result_blocks = [ |
156 | res0.get_mut(inner.offsets[0]..).unwrap_or(&mut []), |
157 | res1.get_mut(inner.offsets[1]..).unwrap_or(&mut []), |
158 | res2.get_mut(inner.offsets[2]..).unwrap_or(&mut []), |
159 | res3.get_mut(inner.offsets[3]..).unwrap_or(&mut []), |
160 | ]; |
161 | |
162 | // First we schedule everything, making sure their index is right etc. |
163 | for (index, data) in iter { |
164 | let metadata = metadatas[index].unwrap(); |
165 | let quantization_table = inner.quantization_tables[index].as_ref().unwrap().clone(); |
166 | |
167 | inner.offsets[index] += metadata.bytes_used(); |
168 | let (result_block, tail) = |
169 | core::mem::take(&mut result_blocks[index]).split_at_mut(metadata.bytes_used()); |
170 | result_blocks[index] = tail; |
171 | |
172 | scope.spawn(move |_| { |
173 | ImmediateWorker::append_row_locked( |
174 | quantization_table, |
175 | metadata, |
176 | data, |
177 | result_block, |
178 | ) |
179 | }); |
180 | } |
181 | }); |
182 | |
183 | Ok(()) |
184 | } |
185 | } |
186 | |
187 | impl ComponentMetadata { |
188 | fn bytes_used(&self) -> usize { |
189 | self.block_count * self.dct_scale * self.dct_scale |
190 | } |
191 | } |
192 | |
193 | pub fn compute_image_parallel( |
194 | components: &[Component], |
195 | data: Vec<Vec<u8>>, |
196 | output_size: Dimensions, |
197 | color_transform: ColorTransform, |
198 | ) -> Result<Vec<u8>> { |
199 | let color_convert_func: fn(&[Vec], &mut [u8]) = choose_color_convert_func(component_count:components.len(), color_transform)?; |
200 | let upsampler: Upsampler = Upsampler::new(components, output_width:output_size.width, output_height:output_size.height)?; |
201 | let line_size: usize = output_size.width as usize * components.len(); |
202 | let mut image: Vec = vec![0u8; line_size * output_size.height as usize]; |
203 | |
204 | image |
205 | .par_chunks_mut(line_size) |
206 | .with_max_len(1) |
207 | .enumerate() |
208 | .for_each(|(row: usize, line: &mut [u8])| { |
209 | upsampler.upsample_and_interleave_row( |
210 | &data, |
211 | row, |
212 | output_width:output_size.width as usize, |
213 | output:line, |
214 | color_convert_func, |
215 | ); |
216 | }); |
217 | |
218 | Ok(image) |
219 | } |
220 | |