1 | // Copyright © SixtyFPS GmbH <info@slint.dev> |
2 | // SPDX-License-Identifier: MIT |
3 | |
4 | use futures::{future::OptionFuture, FutureExt}; |
5 | |
6 | use super::ControlCommand; |
7 | |
/// Handle to the "video playback thread" spawned by [`VideoPlaybackThread::start`].
///
/// Dropping the handle closes the control channel and joins the thread.
pub struct VideoPlaybackThread {
    /// Sends [`ControlCommand`]s to the playback thread; closing this channel tells the thread
    /// to quit.
    control_sender: smol::channel::Sender<ControlCommand>,
    /// Sends encoded packets to the playback thread for decoding.
    packet_sender: smol::channel::Sender<ffmpeg_next::codec::packet::packet::Packet>,
    /// Join handle of the playback thread; taken (set to `None`) when the thread is joined.
    receiver_thread: Option<std::thread::JoinHandle<()>>,
}
13 | |
14 | impl VideoPlaybackThread { |
15 | pub fn start( |
16 | stream: &ffmpeg_next::format::stream::Stream, |
17 | mut video_frame_callback: Box<dyn FnMut(&ffmpeg_next::util::frame::Video) + Send>, |
18 | ) -> Result<Self, anyhow::Error> { |
19 | let (control_sender, control_receiver) = smol::channel::unbounded(); |
20 | |
21 | let (packet_sender, packet_receiver) = smol::channel::bounded(128); |
22 | |
23 | let decoder_context = ffmpeg_next::codec::Context::from_parameters(stream.parameters())?; |
24 | let mut packet_decoder = decoder_context.decoder().video()?; |
25 | |
26 | let clock = StreamClock::new(stream); |
27 | |
28 | let receiver_thread = |
29 | std::thread::Builder::new().name("video playback thread" .into()).spawn(move || { |
30 | smol::block_on(async move { |
31 | let packet_receiver_impl = async { |
32 | loop { |
33 | let Ok(packet) = packet_receiver.recv().await else { break }; |
34 | |
35 | smol::future::yield_now().await; |
36 | |
37 | packet_decoder.send_packet(&packet).unwrap(); |
38 | |
39 | let mut decoded_frame = ffmpeg_next::util::frame::Video::empty(); |
40 | |
41 | while packet_decoder.receive_frame(&mut decoded_frame).is_ok() { |
42 | if let Some(delay) = |
43 | clock.convert_pts_to_instant(decoded_frame.pts()) |
44 | { |
45 | smol::Timer::after(delay).await; |
46 | } |
47 | |
48 | video_frame_callback(&decoded_frame); |
49 | } |
50 | } |
51 | } |
52 | .fuse() |
53 | .shared(); |
54 | |
55 | let mut playing = true; |
56 | |
57 | loop { |
58 | let packet_receiver: OptionFuture<_> = |
59 | if playing { Some(packet_receiver_impl.clone()) } else { None }.into(); |
60 | |
61 | smol::pin!(packet_receiver); |
62 | |
63 | futures::select! { |
64 | _ = packet_receiver => {}, |
65 | received_command = control_receiver.recv().fuse() => { |
66 | match received_command { |
67 | Ok(ControlCommand::Pause) => { |
68 | playing = false; |
69 | } |
70 | Ok(ControlCommand::Play) => { |
71 | playing = true; |
72 | } |
73 | Err(_) => { |
74 | // Channel closed -> quit |
75 | return; |
76 | } |
77 | } |
78 | } |
79 | } |
80 | } |
81 | }) |
82 | })?; |
83 | |
84 | Ok(Self { control_sender, packet_sender, receiver_thread: Some(receiver_thread) }) |
85 | } |
86 | |
87 | pub async fn receive_packet(&self, packet: ffmpeg_next::codec::packet::packet::Packet) -> bool { |
88 | match self.packet_sender.send(packet).await { |
89 | Ok(_) => return true, |
90 | Err(smol::channel::SendError(_)) => return false, |
91 | } |
92 | } |
93 | |
94 | pub async fn send_control_message(&self, message: ControlCommand) { |
95 | self.control_sender.send(message).await.unwrap(); |
96 | } |
97 | } |
98 | |
99 | impl Drop for VideoPlaybackThread { |
100 | fn drop(&mut self) { |
101 | self.control_sender.close(); |
102 | if let Some(receiver_join_handle: JoinHandle<()>) = self.receiver_thread.take() { |
103 | receiver_join_handle.join().unwrap(); |
104 | } |
105 | } |
106 | } |
107 | |
108 | struct StreamClock { |
109 | time_base_seconds: f64, |
110 | start_time: std::time::Instant, |
111 | } |
112 | |
113 | impl StreamClock { |
114 | fn new(stream: &ffmpeg_next::format::stream::Stream) -> Self { |
115 | let time_base_seconds: Rational = stream.time_base(); |
116 | let time_base_seconds: f64 = |
117 | time_base_seconds.numerator() as f64 / time_base_seconds.denominator() as f64; |
118 | |
119 | let start_time: Instant = std::time::Instant::now(); |
120 | |
121 | Self { time_base_seconds, start_time } |
122 | } |
123 | |
124 | fn convert_pts_to_instant(&self, pts: Option<i64>) -> Option<std::time::Duration> { |
125 | ptsOption.and_then(|pts: i64| { |
126 | let pts_since_start: Duration = |
127 | std::time::Duration::from_secs_f64(secs:pts as f64 * self.time_base_seconds); |
128 | self.start_time.checked_add(duration:pts_since_start) |
129 | }) |
130 | .map(|absolute_pts: Instant| absolute_pts.duration_since(earlier:std::time::Instant::now())) |
131 | } |
132 | } |
133 | |