1// Copyright © SixtyFPS GmbH <info@slint.dev>
2// SPDX-License-Identifier: MIT
3
4use std::pin::Pin;
5
6use bytemuck::Pod;
7use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
8use cpal::SizedSample;
9
10use futures::future::OptionFuture;
11use futures::FutureExt;
12use ringbuf::ring_buffer::{RbRef, RbWrite};
13use ringbuf::HeapRb;
14use std::future::Future;
15
16use super::ControlCommand;
17
18pub struct AudioPlaybackThread {
19 control_sender: smol::channel::Sender<ControlCommand>,
20 packet_sender: smol::channel::Sender<ffmpeg_next::codec::packet::packet::Packet>,
21 receiver_thread: Option<std::thread::JoinHandle<()>>,
22}
23
24impl AudioPlaybackThread {
25 pub fn start(stream: &ffmpeg_next::format::stream::Stream) -> Result<Self, anyhow::Error> {
26 let (control_sender, control_receiver) = smol::channel::unbounded();
27
28 let (packet_sender, packet_receiver) = smol::channel::bounded(128);
29
30 let decoder_context = ffmpeg_next::codec::Context::from_parameters(stream.parameters())?;
31 let packet_decoder = decoder_context.decoder().audio()?;
32
33 let host = cpal::default_host();
34 let device = host.default_output_device().expect("no output device available");
35
36 let config = device.default_output_config().unwrap();
37
38 let receiver_thread =
39 std::thread::Builder::new().name("audio playback thread".into()).spawn(move || {
40 smol::block_on(async move {
41 let output_channel_layout = match config.channels() {
42 1 => ffmpeg_next::util::channel_layout::ChannelLayout::MONO,
43 2 => {
44 ffmpeg_next::util::channel_layout::ChannelLayout::STEREO_LEFT
45 | ffmpeg_next::util::channel_layout::ChannelLayout::STEREO_RIGHT
46 }
47 _ => todo!(),
48 };
49
50 let mut ffmpeg_to_cpal_forwarder = match config.sample_format() {
51 cpal::SampleFormat::U8 => FFmpegToCPalForwarder::new::<u8>(
52 config,
53 &device,
54 packet_receiver,
55 packet_decoder,
56 ffmpeg_next::util::format::sample::Sample::U8(
57 ffmpeg_next::util::format::sample::Type::Packed,
58 ),
59 output_channel_layout,
60 ),
61 cpal::SampleFormat::F32 => FFmpegToCPalForwarder::new::<f32>(
62 config,
63 &device,
64 packet_receiver,
65 packet_decoder,
66 ffmpeg_next::util::format::sample::Sample::F32(
67 ffmpeg_next::util::format::sample::Type::Packed,
68 ),
69 output_channel_layout,
70 ),
71 format @ _ => todo!("unsupported cpal output format {:#?}", format),
72 };
73
74 let packet_receiver_impl =
75 async { ffmpeg_to_cpal_forwarder.stream().await }.fuse().shared();
76
77 let mut playing = true;
78
79 loop {
80 let packet_receiver: OptionFuture<_> =
81 if playing { Some(packet_receiver_impl.clone()) } else { None }.into();
82
83 smol::pin!(packet_receiver);
84
85 futures::select! {
86 _ = packet_receiver => {},
87 received_command = control_receiver.recv().fuse() => {
88 match received_command {
89 Ok(ControlCommand::Pause) => {
90 playing = false;
91 }
92 Ok(ControlCommand::Play) => {
93 playing = true;
94 }
95 Err(_) => {
96 // Channel closed -> quit
97 return;
98 }
99 }
100 }
101 }
102 }
103 })
104 })?;
105
106 Ok(Self { control_sender, packet_sender, receiver_thread: Some(receiver_thread) })
107 }
108
109 pub async fn receive_packet(&self, packet: ffmpeg_next::codec::packet::packet::Packet) -> bool {
110 match self.packet_sender.send(packet).await {
111 Ok(_) => return true,
112 Err(smol::channel::SendError(_)) => return false,
113 }
114 }
115
116 pub async fn send_control_message(&self, message: ControlCommand) {
117 self.control_sender.send(message).await.unwrap();
118 }
119}
120
121impl Drop for AudioPlaybackThread {
122 fn drop(&mut self) {
123 self.control_sender.close();
124 if let Some(receiver_join_handle: JoinHandle<()>) = self.receiver_thread.take() {
125 receiver_join_handle.join().unwrap();
126 }
127 }
128}
129
130trait FFMpegToCPalSampleForwarder {
131 fn forward(
132 &mut self,
133 audio_frame: ffmpeg_next::frame::Audio,
134 ) -> Pin<Box<dyn Future<Output = ()> + '_>>;
135}
136
137impl<T: Pod, R: RbRef> FFMpegToCPalSampleForwarder for ringbuf::Producer<T, R>
138where
139 <R as RbRef>::Rb: RbWrite<T>,
140{
141 fn forward(
142 &mut self,
143 audio_frame: ffmpeg_next::frame::Audio,
144 ) -> Pin<Box<dyn Future<Output = ()> + '_>> {
145 Box::pin(async move {
146 // Audio::plane() returns the wrong slice size, so correct it by hand. See also
147 // for a fix https://github.com/zmwangx/rust-ffmpeg/pull/104.
148 let expected_bytes: usize =
149 audio_frame.samples() * audio_frame.channels() as usize * core::mem::size_of::<T>();
150 let cpal_sample_data: &[T] =
151 bytemuck::cast_slice(&audio_frame.data(index:0)[..expected_bytes]);
152
153 while self.free_len() < cpal_sample_data.len() {
154 smol::Timer::after(std::time::Duration::from_millis(16)).await;
155 }
156
157 // Buffer the samples for playback
158 self.push_slice(elems:cpal_sample_data);
159 })
160 }
161}
162
163struct FFmpegToCPalForwarder {
164 _cpal_stream: cpal::Stream,
165 ffmpeg_to_cpal_pipe: Box<dyn FFMpegToCPalSampleForwarder>,
166 packet_receiver: smol::channel::Receiver<ffmpeg_next::codec::packet::packet::Packet>,
167 packet_decoder: ffmpeg_next::decoder::Audio,
168 resampler: ffmpeg_next::software::resampling::Context,
169}
170
171impl FFmpegToCPalForwarder {
172 fn new<T: Send + Pod + SizedSample + 'static>(
173 config: cpal::SupportedStreamConfig,
174 device: &cpal::Device,
175 packet_receiver: smol::channel::Receiver<ffmpeg_next::codec::packet::packet::Packet>,
176 packet_decoder: ffmpeg_next::decoder::Audio,
177 output_format: ffmpeg_next::util::format::sample::Sample,
178 output_channel_layout: ffmpeg_next::util::channel_layout::ChannelLayout,
179 ) -> Self {
180 let buffer = HeapRb::new(4096);
181 let (sample_producer, mut sample_consumer) = buffer.split();
182
183 let cpal_stream = device
184 .build_output_stream(
185 &config.config(),
186 move |data, _| {
187 let filled = sample_consumer.pop_slice(data);
188 data[filled..].fill(T::EQUILIBRIUM);
189 },
190 move |err| {
191 eprintln!("error feeding audio stream to cpal: {}", err);
192 },
193 None,
194 )
195 .unwrap();
196
197 cpal_stream.play().unwrap();
198
199 let resampler = ffmpeg_next::software::resampling::Context::get(
200 packet_decoder.format(),
201 packet_decoder.channel_layout(),
202 packet_decoder.rate(),
203 output_format,
204 output_channel_layout,
205 config.sample_rate().0,
206 )
207 .unwrap();
208
209 Self {
210 _cpal_stream: cpal_stream,
211 ffmpeg_to_cpal_pipe: Box::new(sample_producer),
212 packet_receiver,
213 packet_decoder,
214 resampler,
215 }
216 }
217
218 async fn stream(&mut self) {
219 loop {
220 let Ok(packet) = self.packet_receiver.recv().await else { break };
221
222 self.packet_decoder.send_packet(&packet).unwrap();
223
224 let mut decoded_frame = ffmpeg_next::util::frame::Audio::empty();
225
226 while self.packet_decoder.receive_frame(&mut decoded_frame).is_ok() {
227 let mut resampled_frame = ffmpeg_next::util::frame::Audio::empty();
228 self.resampler.run(&decoded_frame, &mut resampled_frame).unwrap();
229
230 self.ffmpeg_to_cpal_pipe.forward(resampled_frame).await;
231 }
232 }
233 }
234}
235