1 | // Copyright © SixtyFPS GmbH <info@slint.dev> |
2 | // SPDX-License-Identifier: MIT |
3 | |
4 | use std::pin::Pin; |
5 | |
6 | use bytemuck::Pod; |
7 | use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; |
8 | use cpal::SizedSample; |
9 | |
10 | use futures::future::OptionFuture; |
11 | use futures::FutureExt; |
12 | use ringbuf::ring_buffer::{RbRef, RbWrite}; |
13 | use ringbuf::HeapRb; |
14 | use std::future::Future; |
15 | |
16 | use super::ControlCommand; |
17 | |
18 | pub struct AudioPlaybackThread { |
19 | control_sender: smol::channel::Sender<ControlCommand>, |
20 | packet_sender: smol::channel::Sender<ffmpeg_next::codec::packet::packet::Packet>, |
21 | receiver_thread: Option<std::thread::JoinHandle<()>>, |
22 | } |
23 | |
24 | impl AudioPlaybackThread { |
25 | pub fn start(stream: &ffmpeg_next::format::stream::Stream) -> Result<Self, anyhow::Error> { |
26 | let (control_sender, control_receiver) = smol::channel::unbounded(); |
27 | |
28 | let (packet_sender, packet_receiver) = smol::channel::bounded(128); |
29 | |
30 | let decoder_context = ffmpeg_next::codec::Context::from_parameters(stream.parameters())?; |
31 | let packet_decoder = decoder_context.decoder().audio()?; |
32 | |
33 | let host = cpal::default_host(); |
34 | let device = host.default_output_device().expect("no output device available" ); |
35 | |
36 | let config = device.default_output_config().unwrap(); |
37 | |
38 | let receiver_thread = |
39 | std::thread::Builder::new().name("audio playback thread" .into()).spawn(move || { |
40 | smol::block_on(async move { |
41 | let output_channel_layout = match config.channels() { |
42 | 1 => ffmpeg_next::util::channel_layout::ChannelLayout::MONO, |
43 | 2 => { |
44 | ffmpeg_next::util::channel_layout::ChannelLayout::STEREO_LEFT |
45 | | ffmpeg_next::util::channel_layout::ChannelLayout::STEREO_RIGHT |
46 | } |
47 | _ => todo!(), |
48 | }; |
49 | |
50 | let mut ffmpeg_to_cpal_forwarder = match config.sample_format() { |
51 | cpal::SampleFormat::U8 => FFmpegToCPalForwarder::new::<u8>( |
52 | config, |
53 | &device, |
54 | packet_receiver, |
55 | packet_decoder, |
56 | ffmpeg_next::util::format::sample::Sample::U8( |
57 | ffmpeg_next::util::format::sample::Type::Packed, |
58 | ), |
59 | output_channel_layout, |
60 | ), |
61 | cpal::SampleFormat::F32 => FFmpegToCPalForwarder::new::<f32>( |
62 | config, |
63 | &device, |
64 | packet_receiver, |
65 | packet_decoder, |
66 | ffmpeg_next::util::format::sample::Sample::F32( |
67 | ffmpeg_next::util::format::sample::Type::Packed, |
68 | ), |
69 | output_channel_layout, |
70 | ), |
71 | format @ _ => todo!("unsupported cpal output format {:#?}" , format), |
72 | }; |
73 | |
74 | let packet_receiver_impl = |
75 | async { ffmpeg_to_cpal_forwarder.stream().await }.fuse().shared(); |
76 | |
77 | let mut playing = true; |
78 | |
79 | loop { |
80 | let packet_receiver: OptionFuture<_> = |
81 | if playing { Some(packet_receiver_impl.clone()) } else { None }.into(); |
82 | |
83 | smol::pin!(packet_receiver); |
84 | |
85 | futures::select! { |
86 | _ = packet_receiver => {}, |
87 | received_command = control_receiver.recv().fuse() => { |
88 | match received_command { |
89 | Ok(ControlCommand::Pause) => { |
90 | playing = false; |
91 | } |
92 | Ok(ControlCommand::Play) => { |
93 | playing = true; |
94 | } |
95 | Err(_) => { |
96 | // Channel closed -> quit |
97 | return; |
98 | } |
99 | } |
100 | } |
101 | } |
102 | } |
103 | }) |
104 | })?; |
105 | |
106 | Ok(Self { control_sender, packet_sender, receiver_thread: Some(receiver_thread) }) |
107 | } |
108 | |
109 | pub async fn receive_packet(&self, packet: ffmpeg_next::codec::packet::packet::Packet) -> bool { |
110 | match self.packet_sender.send(packet).await { |
111 | Ok(_) => return true, |
112 | Err(smol::channel::SendError(_)) => return false, |
113 | } |
114 | } |
115 | |
116 | pub async fn send_control_message(&self, message: ControlCommand) { |
117 | self.control_sender.send(message).await.unwrap(); |
118 | } |
119 | } |
120 | |
121 | impl Drop for AudioPlaybackThread { |
122 | fn drop(&mut self) { |
123 | self.control_sender.close(); |
124 | if let Some(receiver_join_handle: JoinHandle<()>) = self.receiver_thread.take() { |
125 | receiver_join_handle.join().unwrap(); |
126 | } |
127 | } |
128 | } |
129 | |
130 | trait FFMpegToCPalSampleForwarder { |
131 | fn forward( |
132 | &mut self, |
133 | audio_frame: ffmpeg_next::frame::Audio, |
134 | ) -> Pin<Box<dyn Future<Output = ()> + '_>>; |
135 | } |
136 | |
137 | impl<T: Pod, R: RbRef> FFMpegToCPalSampleForwarder for ringbuf::Producer<T, R> |
138 | where |
139 | <R as RbRef>::Rb: RbWrite<T>, |
140 | { |
141 | fn forward( |
142 | &mut self, |
143 | audio_frame: ffmpeg_next::frame::Audio, |
144 | ) -> Pin<Box<dyn Future<Output = ()> + '_>> { |
145 | Box::pin(async move { |
146 | // Audio::plane() returns the wrong slice size, so correct it by hand. See also |
147 | // for a fix https://github.com/zmwangx/rust-ffmpeg/pull/104. |
148 | let expected_bytes: usize = |
149 | audio_frame.samples() * audio_frame.channels() as usize * core::mem::size_of::<T>(); |
150 | let cpal_sample_data: &[T] = |
151 | bytemuck::cast_slice(&audio_frame.data(index:0)[..expected_bytes]); |
152 | |
153 | while self.free_len() < cpal_sample_data.len() { |
154 | smol::Timer::after(std::time::Duration::from_millis(16)).await; |
155 | } |
156 | |
157 | // Buffer the samples for playback |
158 | self.push_slice(elems:cpal_sample_data); |
159 | }) |
160 | } |
161 | } |
162 | |
163 | struct FFmpegToCPalForwarder { |
164 | _cpal_stream: cpal::Stream, |
165 | ffmpeg_to_cpal_pipe: Box<dyn FFMpegToCPalSampleForwarder>, |
166 | packet_receiver: smol::channel::Receiver<ffmpeg_next::codec::packet::packet::Packet>, |
167 | packet_decoder: ffmpeg_next::decoder::Audio, |
168 | resampler: ffmpeg_next::software::resampling::Context, |
169 | } |
170 | |
171 | impl FFmpegToCPalForwarder { |
172 | fn new<T: Send + Pod + SizedSample + 'static>( |
173 | config: cpal::SupportedStreamConfig, |
174 | device: &cpal::Device, |
175 | packet_receiver: smol::channel::Receiver<ffmpeg_next::codec::packet::packet::Packet>, |
176 | packet_decoder: ffmpeg_next::decoder::Audio, |
177 | output_format: ffmpeg_next::util::format::sample::Sample, |
178 | output_channel_layout: ffmpeg_next::util::channel_layout::ChannelLayout, |
179 | ) -> Self { |
180 | let buffer = HeapRb::new(4096); |
181 | let (sample_producer, mut sample_consumer) = buffer.split(); |
182 | |
183 | let cpal_stream = device |
184 | .build_output_stream( |
185 | &config.config(), |
186 | move |data, _| { |
187 | let filled = sample_consumer.pop_slice(data); |
188 | data[filled..].fill(T::EQUILIBRIUM); |
189 | }, |
190 | move |err| { |
191 | eprintln!("error feeding audio stream to cpal: {}" , err); |
192 | }, |
193 | None, |
194 | ) |
195 | .unwrap(); |
196 | |
197 | cpal_stream.play().unwrap(); |
198 | |
199 | let resampler = ffmpeg_next::software::resampling::Context::get( |
200 | packet_decoder.format(), |
201 | packet_decoder.channel_layout(), |
202 | packet_decoder.rate(), |
203 | output_format, |
204 | output_channel_layout, |
205 | config.sample_rate().0, |
206 | ) |
207 | .unwrap(); |
208 | |
209 | Self { |
210 | _cpal_stream: cpal_stream, |
211 | ffmpeg_to_cpal_pipe: Box::new(sample_producer), |
212 | packet_receiver, |
213 | packet_decoder, |
214 | resampler, |
215 | } |
216 | } |
217 | |
218 | async fn stream(&mut self) { |
219 | loop { |
220 | let Ok(packet) = self.packet_receiver.recv().await else { break }; |
221 | |
222 | self.packet_decoder.send_packet(&packet).unwrap(); |
223 | |
224 | let mut decoded_frame = ffmpeg_next::util::frame::Audio::empty(); |
225 | |
226 | while self.packet_decoder.receive_frame(&mut decoded_frame).is_ok() { |
227 | let mut resampled_frame = ffmpeg_next::util::frame::Audio::empty(); |
228 | self.resampler.run(&decoded_frame, &mut resampled_frame).unwrap(); |
229 | |
230 | self.ffmpeg_to_cpal_pipe.forward(resampled_frame).await; |
231 | } |
232 | } |
233 | } |
234 | } |
235 | |