| 1 | //! This module implements per-component parallelism. |
| 2 | //! It should be possible to implement per-row parallelism as well, |
| 3 | //! which should also boost performance of grayscale images |
| 4 | //! and allow scaling to more cores. |
| 5 | //! However, that would be more complex, so we use this as a starting point. |
| 6 | |
| 7 | use super::immediate::ImmediateWorker; |
| 8 | use super::{RowData, Worker}; |
| 9 | use crate::decoder::MAX_COMPONENTS; |
| 10 | use crate::error::Result; |
| 11 | use std::{ |
| 12 | mem, |
| 13 | sync::mpsc::{self, Receiver, Sender}, |
| 14 | }; |
| 15 | |
| 16 | enum WorkerMsg { |
| 17 | Start(RowData), |
| 18 | AppendRow(Vec<i16>), |
| 19 | GetResult(Sender<Vec<u8>>), |
| 20 | } |
| 21 | |
| 22 | #[derive (Default)] |
| 23 | pub struct MpscWorker { |
| 24 | senders: [Option<Sender<WorkerMsg>>; MAX_COMPONENTS], |
| 25 | } |
| 26 | |
| 27 | impl MpscWorker { |
| 28 | fn start_with( |
| 29 | &mut self, |
| 30 | row_data: RowData, |
| 31 | spawn_worker: impl FnOnce(usize) -> Result<Sender<WorkerMsg>>, |
| 32 | ) -> Result<()> { |
| 33 | // if there is no worker thread for this component yet, start one |
| 34 | let component = row_data.index; |
| 35 | if self.senders[component].is_none() { |
| 36 | let sender = spawn_worker(component)?; |
| 37 | self.senders[component] = Some(sender); |
| 38 | } |
| 39 | |
| 40 | // we do the "take out value and put it back in once we're done" dance here |
| 41 | // and in all other message-passing methods because there's not that many rows |
| 42 | // and this should be cheaper than spawning MAX_COMPONENTS many threads up front |
| 43 | let sender = self.senders[component].as_mut().unwrap(); |
| 44 | sender |
| 45 | .send(WorkerMsg::Start(row_data)) |
| 46 | .expect("jpeg-decoder worker thread error" ); |
| 47 | Ok(()) |
| 48 | } |
| 49 | |
| 50 | fn append_row(&mut self, row: (usize, Vec<i16>)) -> Result<()> { |
| 51 | let component = row.0; |
| 52 | let sender = self.senders[component].as_mut().unwrap(); |
| 53 | sender |
| 54 | .send(WorkerMsg::AppendRow(row.1)) |
| 55 | .expect("jpeg-decoder worker thread error" ); |
| 56 | Ok(()) |
| 57 | } |
| 58 | |
| 59 | fn get_result_with( |
| 60 | &mut self, |
| 61 | index: usize, |
| 62 | collect: impl FnOnce(Receiver<Vec<u8>>) -> Vec<u8>, |
| 63 | ) -> Result<Vec<u8>> { |
| 64 | let (tx, rx) = mpsc::channel(); |
| 65 | let sender = mem::take(&mut self.senders[index]).unwrap(); |
| 66 | sender |
| 67 | .send(WorkerMsg::GetResult(tx)) |
| 68 | .expect("jpeg-decoder worker thread error" ); |
| 69 | Ok(collect(rx)) |
| 70 | } |
| 71 | } |
| 72 | |
| 73 | impl Worker for MpscWorker { |
| 74 | fn start(&mut self, row_data: RowData) -> Result<()> { |
| 75 | self.start_with(row_data, spawn_worker_thread) |
| 76 | } |
| 77 | fn append_row(&mut self, row: (usize, Vec<i16>)) -> Result<()> { |
| 78 | MpscWorker::append_row(self, row) |
| 79 | } |
| 80 | fn get_result(&mut self, index: usize) -> Result<Vec<u8>> { |
| 81 | self.get_result_with(index, collect_worker_thread) |
| 82 | } |
| 83 | } |
| 84 | |
| 85 | fn create_worker() -> (Sender<WorkerMsg>, impl FnOnce() + 'static) { |
| 86 | let (tx, rx) = mpsc::channel(); |
| 87 | let closure = move || { |
| 88 | let mut worker = ImmediateWorker::default(); |
| 89 | |
| 90 | while let Ok(message) = rx.recv() { |
| 91 | match message { |
| 92 | WorkerMsg::Start(mut data) => { |
| 93 | // we always set component index to 0 for worker threads |
| 94 | // because they only ever handle one per thread and we don't want them |
| 95 | // to attempt to access nonexistent components |
| 96 | data.index = 0; |
| 97 | worker.start_immediate(data); |
| 98 | } |
| 99 | WorkerMsg::AppendRow(row) => { |
| 100 | worker.append_row_immediate((0, row)); |
| 101 | } |
| 102 | WorkerMsg::GetResult(chan) => { |
| 103 | let _ = chan.send(worker.get_result_immediate(0)); |
| 104 | break; |
| 105 | } |
| 106 | } |
| 107 | } |
| 108 | }; |
| 109 | |
| 110 | (tx, closure) |
| 111 | } |
| 112 | |
| 113 | fn spawn_worker_thread(component: usize) -> Result<Sender<WorkerMsg>> { |
| 114 | let (tx: Sender, worker: impl FnOnce()) = create_worker(); |
| 115 | let thread_builder: Builder = |
| 116 | std::thread::Builder::new().name(format!("worker thread for component {}" , component)); |
| 117 | thread_builder.spawn(worker)?; |
| 118 | Ok(tx) |
| 119 | } |
| 120 | |
/// Blocks until a result arrives on `rx` and returns it.
///
/// # Panics
///
/// Panics if the sending side is dropped without having sent a value.
fn collect_worker_thread(rx: Receiver<Vec<u8>>) -> Vec<u8> {
    rx.recv().expect("jpeg-decoder worker thread error")
}
| 124 | |