1// Copyright © SixtyFPS GmbH <info@slint.dev>
2// SPDX-License-Identifier: MIT
3
4use std::path::PathBuf;
5
6use futures::{future::OptionFuture, FutureExt};
7
8mod audio;
9mod video;
10
11#[derive(Clone, Copy)]
12pub enum ControlCommand {
13 Play,
14 Pause,
15}
16
17pub struct Player {
18 control_sender: smol::channel::Sender<ControlCommand>,
19 demuxer_thread: Option<std::thread::JoinHandle<()>>,
20 playing: bool,
21 playing_changed_callback: Box<dyn Fn(bool)>,
22}
23
24impl Player {
25 pub fn start(
26 path: PathBuf,
27 video_frame_callback: impl FnMut(&ffmpeg_next::util::frame::Video) + Send + 'static,
28 playing_changed_callback: impl Fn(bool) + 'static,
29 ) -> Result<Self, anyhow::Error> {
30 let (control_sender, control_receiver) = smol::channel::unbounded();
31
32 let demuxer_thread =
33 std::thread::Builder::new().name("demuxer thread".into()).spawn(move || {
34 smol::block_on(async move {
35 let mut input_context = ffmpeg_next::format::input(&path).unwrap();
36
37 let video_stream =
38 input_context.streams().best(ffmpeg_next::media::Type::Video).unwrap();
39 let video_stream_index = video_stream.index();
40 let video_playback_thread = video::VideoPlaybackThread::start(
41 &video_stream,
42 Box::new(video_frame_callback),
43 )
44 .unwrap();
45
46 let audio_stream =
47 input_context.streams().best(ffmpeg_next::media::Type::Audio).unwrap();
48 let audio_stream_index = audio_stream.index();
49 let audio_playback_thread =
50 audio::AudioPlaybackThread::start(&audio_stream).unwrap();
51
52 let mut playing = true;
53
54 // This is sub-optimal, as reading the packets from ffmpeg might be blocking
55 // and the future won't yield for that. So while ffmpeg sits on some blocking
56 // I/O operation, the caller here will also block and we won't end up polling
57 // the control_receiver future further down.
58 let packet_forwarder_impl = async {
59 for (stream, packet) in input_context.packets() {
60 if stream.index() == audio_stream_index {
61 audio_playback_thread.receive_packet(packet).await;
62 } else if stream.index() == video_stream_index {
63 video_playback_thread.receive_packet(packet).await;
64 }
65 }
66 }
67 .fuse()
68 .shared();
69
70 loop {
71 // This is sub-optimal, as reading the packets from ffmpeg might be blocking
72 // and the future won't yield for that. So while ffmpeg sits on some blocking
73 // I/O operation, the caller here will also block and we won't end up polling
74 // the control_receiver future further down.
75 let packet_forwarder: OptionFuture<_> =
76 if playing { Some(packet_forwarder_impl.clone()) } else { None }.into();
77
78 smol::pin!(packet_forwarder);
79
80 futures::select! {
81 _ = packet_forwarder => {}, // playback finished
82 received_command = control_receiver.recv().fuse() => {
83 match received_command {
84 Ok(command) => {
85 video_playback_thread.send_control_message(command).await;
86 audio_playback_thread.send_control_message(command).await;
87 match command {
88 ControlCommand::Play => {
89 // Continue in the loop, polling the packet forwarder future to forward
90 // packets
91 playing = true;
92 },
93 ControlCommand::Pause => {
94 playing = false;
95 }
96 }
97 }
98 Err(_) => {
99 // Channel closed -> quit
100 return;
101 }
102 }
103 }
104 }
105 }
106 })
107 })?;
108
109 let playing = true;
110 playing_changed_callback(playing);
111
112 Ok(Self {
113 control_sender,
114 demuxer_thread: Some(demuxer_thread),
115 playing,
116 playing_changed_callback: Box::new(playing_changed_callback),
117 })
118 }
119
120 pub fn toggle_pause_playing(&mut self) {
121 if self.playing {
122 self.playing = false;
123 self.control_sender.send_blocking(ControlCommand::Pause).unwrap();
124 } else {
125 self.playing = true;
126 self.control_sender.send_blocking(ControlCommand::Play).unwrap();
127 }
128 (self.playing_changed_callback)(self.playing);
129 }
130}
131
132impl Drop for Player {
133 fn drop(&mut self) {
134 self.control_sender.close();
135 if let Some(decoder_thread: JoinHandle<()>) = self.demuxer_thread.take() {
136 decoder_thread.join().unwrap();
137 }
138 }
139}
140