1 | // Copyright © SixtyFPS GmbH <info@slint.dev> |
2 | // SPDX-License-Identifier: MIT |
3 | |
4 | use std::path::PathBuf; |
5 | |
6 | use futures::{future::OptionFuture, FutureExt}; |
7 | |
8 | mod audio; |
9 | mod video; |
10 | |
11 | #[derive (Clone, Copy)] |
12 | pub enum ControlCommand { |
13 | Play, |
14 | Pause, |
15 | } |
16 | |
17 | pub struct Player { |
18 | control_sender: smol::channel::Sender<ControlCommand>, |
19 | demuxer_thread: Option<std::thread::JoinHandle<()>>, |
20 | playing: bool, |
21 | playing_changed_callback: Box<dyn Fn(bool)>, |
22 | } |
23 | |
24 | impl Player { |
25 | pub fn start( |
26 | path: PathBuf, |
27 | video_frame_callback: impl FnMut(&ffmpeg_next::util::frame::Video) + Send + 'static, |
28 | playing_changed_callback: impl Fn(bool) + 'static, |
29 | ) -> Result<Self, anyhow::Error> { |
30 | let (control_sender, control_receiver) = smol::channel::unbounded(); |
31 | |
32 | let demuxer_thread = |
33 | std::thread::Builder::new().name("demuxer thread" .into()).spawn(move || { |
34 | smol::block_on(async move { |
35 | let mut input_context = ffmpeg_next::format::input(&path).unwrap(); |
36 | |
37 | let video_stream = |
38 | input_context.streams().best(ffmpeg_next::media::Type::Video).unwrap(); |
39 | let video_stream_index = video_stream.index(); |
40 | let video_playback_thread = video::VideoPlaybackThread::start( |
41 | &video_stream, |
42 | Box::new(video_frame_callback), |
43 | ) |
44 | .unwrap(); |
45 | |
46 | let audio_stream = |
47 | input_context.streams().best(ffmpeg_next::media::Type::Audio).unwrap(); |
48 | let audio_stream_index = audio_stream.index(); |
49 | let audio_playback_thread = |
50 | audio::AudioPlaybackThread::start(&audio_stream).unwrap(); |
51 | |
52 | let mut playing = true; |
53 | |
54 | // This is sub-optimal, as reading the packets from ffmpeg might be blocking |
55 | // and the future won't yield for that. So while ffmpeg sits on some blocking |
56 | // I/O operation, the caller here will also block and we won't end up polling |
57 | // the control_receiver future further down. |
58 | let packet_forwarder_impl = async { |
59 | for (stream, packet) in input_context.packets() { |
60 | if stream.index() == audio_stream_index { |
61 | audio_playback_thread.receive_packet(packet).await; |
62 | } else if stream.index() == video_stream_index { |
63 | video_playback_thread.receive_packet(packet).await; |
64 | } |
65 | } |
66 | } |
67 | .fuse() |
68 | .shared(); |
69 | |
70 | loop { |
71 | // This is sub-optimal, as reading the packets from ffmpeg might be blocking |
72 | // and the future won't yield for that. So while ffmpeg sits on some blocking |
73 | // I/O operation, the caller here will also block and we won't end up polling |
74 | // the control_receiver future further down. |
75 | let packet_forwarder: OptionFuture<_> = |
76 | if playing { Some(packet_forwarder_impl.clone()) } else { None }.into(); |
77 | |
78 | smol::pin!(packet_forwarder); |
79 | |
80 | futures::select! { |
81 | _ = packet_forwarder => {}, // playback finished |
82 | received_command = control_receiver.recv().fuse() => { |
83 | match received_command { |
84 | Ok(command) => { |
85 | video_playback_thread.send_control_message(command).await; |
86 | audio_playback_thread.send_control_message(command).await; |
87 | match command { |
88 | ControlCommand::Play => { |
89 | // Continue in the loop, polling the packet forwarder future to forward |
90 | // packets |
91 | playing = true; |
92 | }, |
93 | ControlCommand::Pause => { |
94 | playing = false; |
95 | } |
96 | } |
97 | } |
98 | Err(_) => { |
99 | // Channel closed -> quit |
100 | return; |
101 | } |
102 | } |
103 | } |
104 | } |
105 | } |
106 | }) |
107 | })?; |
108 | |
109 | let playing = true; |
110 | playing_changed_callback(playing); |
111 | |
112 | Ok(Self { |
113 | control_sender, |
114 | demuxer_thread: Some(demuxer_thread), |
115 | playing, |
116 | playing_changed_callback: Box::new(playing_changed_callback), |
117 | }) |
118 | } |
119 | |
120 | pub fn toggle_pause_playing(&mut self) { |
121 | if self.playing { |
122 | self.playing = false; |
123 | self.control_sender.send_blocking(ControlCommand::Pause).unwrap(); |
124 | } else { |
125 | self.playing = true; |
126 | self.control_sender.send_blocking(ControlCommand::Play).unwrap(); |
127 | } |
128 | (self.playing_changed_callback)(self.playing); |
129 | } |
130 | } |
131 | |
132 | impl Drop for Player { |
133 | fn drop(&mut self) { |
134 | self.control_sender.close(); |
135 | if let Some(decoder_thread: JoinHandle<()>) = self.demuxer_thread.take() { |
136 | decoder_thread.join().unwrap(); |
137 | } |
138 | } |
139 | } |
140 | |