1// Copyright © SixtyFPS GmbH <info@slint.dev>
2// SPDX-License-Identifier: MIT
3
4use futures::{future::OptionFuture, FutureExt};
5
6use super::ControlCommand;
7
8pub struct VideoPlaybackThread {
9 control_sender: smol::channel::Sender<ControlCommand>,
10 packet_sender: smol::channel::Sender<ffmpeg_next::codec::packet::packet::Packet>,
11 receiver_thread: Option<std::thread::JoinHandle<()>>,
12}
13
14impl VideoPlaybackThread {
15 pub fn start(
16 stream: &ffmpeg_next::format::stream::Stream,
17 mut video_frame_callback: Box<dyn FnMut(&ffmpeg_next::util::frame::Video) + Send>,
18 ) -> Result<Self, anyhow::Error> {
19 let (control_sender, control_receiver) = smol::channel::unbounded();
20
21 let (packet_sender, packet_receiver) = smol::channel::bounded(128);
22
23 let decoder_context = ffmpeg_next::codec::Context::from_parameters(stream.parameters())?;
24 let mut packet_decoder = decoder_context.decoder().video()?;
25
26 let clock = StreamClock::new(stream);
27
28 let receiver_thread =
29 std::thread::Builder::new().name("video playback thread".into()).spawn(move || {
30 smol::block_on(async move {
31 let packet_receiver_impl = async {
32 loop {
33 let Ok(packet) = packet_receiver.recv().await else { break };
34
35 smol::future::yield_now().await;
36
37 packet_decoder.send_packet(&packet).unwrap();
38
39 let mut decoded_frame = ffmpeg_next::util::frame::Video::empty();
40
41 while packet_decoder.receive_frame(&mut decoded_frame).is_ok() {
42 if let Some(delay) =
43 clock.convert_pts_to_instant(decoded_frame.pts())
44 {
45 smol::Timer::after(delay).await;
46 }
47
48 video_frame_callback(&decoded_frame);
49 }
50 }
51 }
52 .fuse()
53 .shared();
54
55 let mut playing = true;
56
57 loop {
58 let packet_receiver: OptionFuture<_> =
59 if playing { Some(packet_receiver_impl.clone()) } else { None }.into();
60
61 smol::pin!(packet_receiver);
62
63 futures::select! {
64 _ = packet_receiver => {},
65 received_command = control_receiver.recv().fuse() => {
66 match received_command {
67 Ok(ControlCommand::Pause) => {
68 playing = false;
69 }
70 Ok(ControlCommand::Play) => {
71 playing = true;
72 }
73 Err(_) => {
74 // Channel closed -> quit
75 return;
76 }
77 }
78 }
79 }
80 }
81 })
82 })?;
83
84 Ok(Self { control_sender, packet_sender, receiver_thread: Some(receiver_thread) })
85 }
86
87 pub async fn receive_packet(&self, packet: ffmpeg_next::codec::packet::packet::Packet) -> bool {
88 match self.packet_sender.send(packet).await {
89 Ok(_) => return true,
90 Err(smol::channel::SendError(_)) => return false,
91 }
92 }
93
94 pub async fn send_control_message(&self, message: ControlCommand) {
95 self.control_sender.send(message).await.unwrap();
96 }
97}
98
99impl Drop for VideoPlaybackThread {
100 fn drop(&mut self) {
101 self.control_sender.close();
102 if let Some(receiver_join_handle: JoinHandle<()>) = self.receiver_thread.take() {
103 receiver_join_handle.join().unwrap();
104 }
105 }
106}
107
/// Maps presentation timestamps of a stream onto wall-clock time.
struct StreamClock {
    /// Length of one timestamp tick in seconds (time base numerator / denominator).
    time_base_seconds: f64,
    /// Moment the clock was created; timestamps are measured relative to it.
    start_time: std::time::Instant,
}
112
113impl StreamClock {
114 fn new(stream: &ffmpeg_next::format::stream::Stream) -> Self {
115 let time_base_seconds: Rational = stream.time_base();
116 let time_base_seconds: f64 =
117 time_base_seconds.numerator() as f64 / time_base_seconds.denominator() as f64;
118
119 let start_time: Instant = std::time::Instant::now();
120
121 Self { time_base_seconds, start_time }
122 }
123
124 fn convert_pts_to_instant(&self, pts: Option<i64>) -> Option<std::time::Duration> {
125 ptsOption.and_then(|pts: i64| {
126 let pts_since_start: Duration =
127 std::time::Duration::from_secs_f64(secs:pts as f64 * self.time_base_seconds);
128 self.start_time.checked_add(duration:pts_since_start)
129 })
130 .map(|absolute_pts: Instant| absolute_pts.duration_since(earlier:std::time::Instant::now()))
131 }
132}
133