//! This module implements per-component parallelism.
//! It should be possible to implement per-row parallelism as well,
//! which should also boost the performance of grayscale images
//! and allow scaling to more cores.
//! However, that would be more complex, so we use this as a starting point.
6 | |
7 | use super::immediate::ImmediateWorker; |
8 | use super::{RowData, Worker}; |
9 | use crate::decoder::MAX_COMPONENTS; |
10 | use crate::error::Result; |
11 | use std::{ |
12 | mem, |
13 | sync::mpsc::{self, Receiver, Sender}, |
14 | }; |
15 | |
16 | enum WorkerMsg { |
17 | Start(RowData), |
18 | AppendRow(Vec<i16>), |
19 | GetResult(Sender<Vec<u8>>), |
20 | } |
21 | |
22 | #[derive (Default)] |
23 | pub struct MpscWorker { |
24 | senders: [Option<Sender<WorkerMsg>>; MAX_COMPONENTS], |
25 | } |
26 | |
27 | impl MpscWorker { |
28 | fn start_with( |
29 | &mut self, |
30 | row_data: RowData, |
31 | spawn_worker: impl FnOnce(usize) -> Result<Sender<WorkerMsg>>, |
32 | ) -> Result<()> { |
33 | // if there is no worker thread for this component yet, start one |
34 | let component = row_data.index; |
35 | if self.senders[component].is_none() { |
36 | let sender = spawn_worker(component)?; |
37 | self.senders[component] = Some(sender); |
38 | } |
39 | |
40 | // we do the "take out value and put it back in once we're done" dance here |
41 | // and in all other message-passing methods because there's not that many rows |
42 | // and this should be cheaper than spawning MAX_COMPONENTS many threads up front |
43 | let sender = self.senders[component].as_mut().unwrap(); |
44 | sender |
45 | .send(WorkerMsg::Start(row_data)) |
46 | .expect("jpeg-decoder worker thread error" ); |
47 | Ok(()) |
48 | } |
49 | |
50 | fn append_row(&mut self, row: (usize, Vec<i16>)) -> Result<()> { |
51 | let component = row.0; |
52 | let sender = self.senders[component].as_mut().unwrap(); |
53 | sender |
54 | .send(WorkerMsg::AppendRow(row.1)) |
55 | .expect("jpeg-decoder worker thread error" ); |
56 | Ok(()) |
57 | } |
58 | |
59 | fn get_result_with( |
60 | &mut self, |
61 | index: usize, |
62 | collect: impl FnOnce(Receiver<Vec<u8>>) -> Vec<u8>, |
63 | ) -> Result<Vec<u8>> { |
64 | let (tx, rx) = mpsc::channel(); |
65 | let sender = mem::take(&mut self.senders[index]).unwrap(); |
66 | sender |
67 | .send(WorkerMsg::GetResult(tx)) |
68 | .expect("jpeg-decoder worker thread error" ); |
69 | Ok(collect(rx)) |
70 | } |
71 | } |
72 | |
73 | impl Worker for MpscWorker { |
74 | fn start(&mut self, row_data: RowData) -> Result<()> { |
75 | self.start_with(row_data, spawn_worker_thread) |
76 | } |
77 | fn append_row(&mut self, row: (usize, Vec<i16>)) -> Result<()> { |
78 | MpscWorker::append_row(self, row) |
79 | } |
80 | fn get_result(&mut self, index: usize) -> Result<Vec<u8>> { |
81 | self.get_result_with(index, collect_worker_thread) |
82 | } |
83 | } |
84 | |
85 | fn create_worker() -> (Sender<WorkerMsg>, impl FnOnce() + 'static) { |
86 | let (tx, rx) = mpsc::channel(); |
87 | let closure = move || { |
88 | let mut worker = ImmediateWorker::default(); |
89 | |
90 | while let Ok(message) = rx.recv() { |
91 | match message { |
92 | WorkerMsg::Start(mut data) => { |
93 | // we always set component index to 0 for worker threads |
94 | // because they only ever handle one per thread and we don't want them |
95 | // to attempt to access nonexistent components |
96 | data.index = 0; |
97 | worker.start_immediate(data); |
98 | } |
99 | WorkerMsg::AppendRow(row) => { |
100 | worker.append_row_immediate((0, row)); |
101 | } |
102 | WorkerMsg::GetResult(chan) => { |
103 | let _ = chan.send(worker.get_result_immediate(0)); |
104 | break; |
105 | } |
106 | } |
107 | } |
108 | }; |
109 | |
110 | (tx, closure) |
111 | } |
112 | |
113 | fn spawn_worker_thread(component: usize) -> Result<Sender<WorkerMsg>> { |
114 | let (tx: Sender, worker: impl FnOnce()) = create_worker(); |
115 | let thread_builder: Builder = |
116 | std::thread::Builder::new().name(format!("worker thread for component {}" , component)); |
117 | thread_builder.spawn(worker)?; |
118 | Ok(tx) |
119 | } |
120 | |
/// Blocks until a result arrives on `rx` and returns it.
///
/// # Panics
/// Panics with "jpeg-decoder worker thread error" if the sending side hung up
/// without sending a result.
fn collect_worker_thread(rx: Receiver<Vec<u8>>) -> Vec<u8> {
    rx.recv().expect("jpeg-decoder worker thread error")
}
124 | |