| 1 | // Copyright © SixtyFPS GmbH <info@slint.dev> |
| 2 | // SPDX-License-Identifier: MIT |
| 3 | |
| 4 | use futures::{future::OptionFuture, FutureExt}; |
| 5 | |
| 6 | use super::ControlCommand; |
| 7 | |
| 8 | pub struct VideoPlaybackThread { |
| 9 | control_sender: smol::channel::Sender<ControlCommand>, |
| 10 | packet_sender: smol::channel::Sender<ffmpeg_next::codec::packet::packet::Packet>, |
| 11 | receiver_thread: Option<std::thread::JoinHandle<()>>, |
| 12 | } |
| 13 | |
| 14 | impl VideoPlaybackThread { |
| 15 | pub fn start( |
| 16 | stream: &ffmpeg_next::format::stream::Stream, |
| 17 | mut video_frame_callback: Box<dyn FnMut(&ffmpeg_next::util::frame::Video) + Send>, |
| 18 | ) -> Result<Self, anyhow::Error> { |
| 19 | let (control_sender, control_receiver) = smol::channel::unbounded(); |
| 20 | |
| 21 | let (packet_sender, packet_receiver) = smol::channel::bounded(128); |
| 22 | |
| 23 | let decoder_context = ffmpeg_next::codec::Context::from_parameters(stream.parameters())?; |
| 24 | let mut packet_decoder = decoder_context.decoder().video()?; |
| 25 | |
| 26 | let clock = StreamClock::new(stream); |
| 27 | |
| 28 | let receiver_thread = |
| 29 | std::thread::Builder::new().name("video playback thread" .into()).spawn(move || { |
| 30 | smol::block_on(async move { |
| 31 | let packet_receiver_impl = async { |
| 32 | loop { |
| 33 | let Ok(packet) = packet_receiver.recv().await else { break }; |
| 34 | |
| 35 | smol::future::yield_now().await; |
| 36 | |
| 37 | packet_decoder.send_packet(&packet).unwrap(); |
| 38 | |
| 39 | let mut decoded_frame = ffmpeg_next::util::frame::Video::empty(); |
| 40 | |
| 41 | while packet_decoder.receive_frame(&mut decoded_frame).is_ok() { |
| 42 | if let Some(delay) = |
| 43 | clock.convert_pts_to_instant(decoded_frame.pts()) |
| 44 | { |
| 45 | smol::Timer::after(delay).await; |
| 46 | } |
| 47 | |
| 48 | video_frame_callback(&decoded_frame); |
| 49 | } |
| 50 | } |
| 51 | } |
| 52 | .fuse() |
| 53 | .shared(); |
| 54 | |
| 55 | let mut playing = true; |
| 56 | |
| 57 | loop { |
| 58 | let packet_receiver: OptionFuture<_> = |
| 59 | if playing { Some(packet_receiver_impl.clone()) } else { None }.into(); |
| 60 | |
| 61 | smol::pin!(packet_receiver); |
| 62 | |
| 63 | futures::select! { |
| 64 | _ = packet_receiver => {}, |
| 65 | received_command = control_receiver.recv().fuse() => { |
| 66 | match received_command { |
| 67 | Ok(ControlCommand::Pause) => { |
| 68 | playing = false; |
| 69 | } |
| 70 | Ok(ControlCommand::Play) => { |
| 71 | playing = true; |
| 72 | } |
| 73 | Err(_) => { |
| 74 | // Channel closed -> quit |
| 75 | return; |
| 76 | } |
| 77 | } |
| 78 | } |
| 79 | } |
| 80 | } |
| 81 | }) |
| 82 | })?; |
| 83 | |
| 84 | Ok(Self { control_sender, packet_sender, receiver_thread: Some(receiver_thread) }) |
| 85 | } |
| 86 | |
| 87 | pub async fn receive_packet(&self, packet: ffmpeg_next::codec::packet::packet::Packet) -> bool { |
| 88 | match self.packet_sender.send(packet).await { |
| 89 | Ok(_) => return true, |
| 90 | Err(smol::channel::SendError(_)) => return false, |
| 91 | } |
| 92 | } |
| 93 | |
| 94 | pub async fn send_control_message(&self, message: ControlCommand) { |
| 95 | self.control_sender.send(message).await.unwrap(); |
| 96 | } |
| 97 | } |
| 98 | |
| 99 | impl Drop for VideoPlaybackThread { |
| 100 | fn drop(&mut self) { |
| 101 | self.control_sender.close(); |
| 102 | if let Some(receiver_join_handle: JoinHandle<()>) = self.receiver_thread.take() { |
| 103 | receiver_join_handle.join().unwrap(); |
| 104 | } |
| 105 | } |
| 106 | } |
| 107 | |
/// Maps presentation timestamps of a stream onto wall-clock time.
struct StreamClock {
    /// Length of one stream time-base unit, in seconds.
    time_base_seconds: f64,
    /// Wall-clock instant corresponding to a presentation timestamp of zero.
    start_time: std::time::Instant,
}
| 112 | |
| 113 | impl StreamClock { |
| 114 | fn new(stream: &ffmpeg_next::format::stream::Stream) -> Self { |
| 115 | let time_base_seconds: Rational = stream.time_base(); |
| 116 | let time_base_seconds: f64 = |
| 117 | time_base_seconds.numerator() as f64 / time_base_seconds.denominator() as f64; |
| 118 | |
| 119 | let start_time: Instant = std::time::Instant::now(); |
| 120 | |
| 121 | Self { time_base_seconds, start_time } |
| 122 | } |
| 123 | |
| 124 | fn convert_pts_to_instant(&self, pts: Option<i64>) -> Option<std::time::Duration> { |
| 125 | ptsOption.and_then(|pts: i64| { |
| 126 | let pts_since_start: Duration = |
| 127 | std::time::Duration::from_secs_f64(secs:pts as f64 * self.time_base_seconds); |
| 128 | self.start_time.checked_add(duration:pts_since_start) |
| 129 | }) |
| 130 | .map(|absolute_pts: Instant| absolute_pts.duration_since(earlier:std::time::Instant::now())) |
| 131 | } |
| 132 | } |
| 133 | |