1 | // Take a look at the license at the top of the repository in the LICENSE file. |
2 | |
3 | use std::{mem, ptr, sync::Arc}; |
4 | |
5 | use glib::{prelude::*, translate::*}; |
6 | |
7 | use crate::Task; |
8 | |
9 | #[allow (clippy::type_complexity)] |
10 | pub struct TaskBuilder<F: FnMut(&Task) + Send + 'static> { |
11 | func: Box<(F, *mut ffi::GstTask)>, |
12 | lock: Option<TaskLock>, |
13 | enter_callback: Option<Box<dyn FnMut(&Task) + Send + 'static>>, |
14 | leave_callback: Option<Box<dyn FnMut(&Task) + Send + 'static>>, |
15 | } |
16 | |
17 | impl<F: FnMut(&Task) + Send + 'static> TaskBuilder<F> { |
18 | #[doc (alias = "gst_task_set_enter_callback" )] |
19 | pub fn enter_callback<E: FnMut(&Task) + Send + 'static>(self, enter_callback: E) -> Self { |
20 | Self { |
21 | enter_callback: Some(Box::new(enter_callback)), |
22 | ..self |
23 | } |
24 | } |
25 | |
26 | #[doc (alias = "gst_task_set_leave_callback" )] |
27 | pub fn leave_callback<E: FnMut(&Task) + Send + 'static>(self, leave_callback: E) -> Self { |
28 | Self { |
29 | leave_callback: Some(Box::new(leave_callback)), |
30 | ..self |
31 | } |
32 | } |
33 | |
34 | #[doc (alias = "gst_task_set_lock" )] |
35 | pub fn lock(self, lock: &TaskLock) -> Self { |
36 | Self { |
37 | lock: Some(lock.clone()), |
38 | ..self |
39 | } |
40 | } |
41 | |
42 | #[doc (alias = "gst_task_new" )] |
43 | pub fn build(self) -> Task { |
44 | unsafe extern "C" fn func_trampoline<F: FnMut(&Task) + Send + 'static>( |
45 | user_data: glib::ffi::gpointer, |
46 | ) { |
47 | let callback: &mut (F, *mut ffi::GstTask) = &mut *(user_data as *mut _); |
48 | (callback.0)(&from_glib_borrow(callback.1)); |
49 | } |
50 | |
51 | unsafe extern "C" fn destroy_func<F: FnMut(&Task) + Send + 'static>( |
52 | data: glib::ffi::gpointer, |
53 | ) { |
54 | let _callback: Box<(F, *mut ffi::GstTask)> = Box::from_raw(data as *mut _); |
55 | } |
56 | |
57 | unsafe extern "C" fn callback_trampoline( |
58 | task: *mut ffi::GstTask, |
59 | _thread: *mut glib::ffi::GThread, |
60 | data: glib::ffi::gpointer, |
61 | ) { |
62 | let callback: &mut Box<dyn FnMut(&Task) + Send + 'static> = &mut *(data as *mut _); |
63 | callback(&from_glib_borrow(task)); |
64 | } |
65 | |
66 | #[allow (clippy::type_complexity)] |
67 | unsafe extern "C" fn destroy_callback(data: glib::ffi::gpointer) { |
68 | let _callback: Box<Box<dyn FnMut(&Task) + Send + 'static>> = |
69 | Box::from_raw(data as *mut _); |
70 | } |
71 | |
72 | unsafe { |
73 | let func_ptr = Box::into_raw(self.func); |
74 | |
75 | let task: Task = from_glib_full(ffi::gst_task_new( |
76 | Some(func_trampoline::<F> as _), |
77 | func_ptr as *mut _, |
78 | Some(destroy_func::<F> as _), |
79 | )); |
80 | |
81 | (*func_ptr).1 = task.to_glib_none().0; |
82 | |
83 | let lock = self.lock.unwrap_or_default(); |
84 | ffi::gst_task_set_lock(task.to_glib_none().0, mut_override(&lock.0 .0)); |
85 | task.set_data("gstreamer-rs-task-lock" , Arc::clone(&lock.0)); |
86 | |
87 | if let Some(enter_callback) = self.enter_callback { |
88 | ffi::gst_task_set_enter_callback( |
89 | task.to_glib_none().0, |
90 | Some(callback_trampoline), |
91 | Box::into_raw(Box::new(enter_callback)) as *mut _, |
92 | Some(destroy_callback), |
93 | ); |
94 | } |
95 | |
96 | if let Some(leave_callback) = self.leave_callback { |
97 | ffi::gst_task_set_leave_callback( |
98 | task.to_glib_none().0, |
99 | Some(callback_trampoline), |
100 | Box::into_raw(Box::new(leave_callback)) as *mut _, |
101 | Some(destroy_callback), |
102 | ); |
103 | } |
104 | |
105 | task |
106 | } |
107 | } |
108 | } |
109 | |
110 | impl Task { |
111 | #[doc (alias = "gst_task_new" )] |
112 | pub fn builder<F: FnMut(&Task) + Send + 'static>(func: F) -> TaskBuilder<F> { |
113 | assert_initialized_main_thread!(); |
114 | TaskBuilder { |
115 | func: Box::new((func, ptr::null_mut())), |
116 | lock: None, |
117 | enter_callback: None, |
118 | leave_callback: None, |
119 | } |
120 | } |
121 | } |
122 | |
/// Lock that can be handed to a [`TaskBuilder`] through its `lock` method.
///
/// The wrapped recursive mutex lives behind an [`Arc`], so clones of a
/// `TaskLock` all refer to the same mutex.
#[derive (Debug, Clone)]
pub struct TaskLock(Arc<RecMutex>);
125 | |
126 | impl Default for TaskLock { |
127 | #[inline ] |
128 | fn default() -> Self { |
129 | Self::new() |
130 | } |
131 | } |
132 | |
// Private owner of a raw `GRecMutex`. In this file it is created and
// initialised by `TaskLock::new` and cleared again in its `Drop` impl.
#[derive (Debug)]
struct RecMutex(glib::ffi::GRecMutex);

// NOTE(review): presumably sound because a `GRecMutex` is a synchronisation
// primitive meant to be shared and locked across threads - confirm against
// the GLib documentation.
unsafe impl Send for RecMutex {}
unsafe impl Sync for RecMutex {}
138 | |
/// Guard returned by [`TaskLock::lock`].
///
/// It borrows the mutex it was created from and unlocks that mutex again when
/// it is dropped.
#[must_use = "if unused the TaskLock will immediately unlock" ]
pub struct TaskLockGuard<'a>(&'a RecMutex);
141 | |
142 | impl TaskLock { |
143 | #[inline ] |
144 | pub fn new() -> Self { |
145 | unsafe { |
146 | let lock: TaskLock = TaskLock(Arc::new(data:RecMutex(mem::zeroed()))); |
147 | glib::ffi::g_rec_mutex_init(rec_mutex:mut_override(&lock.0 .0)); |
148 | lock |
149 | } |
150 | } |
151 | |
152 | // checker-ignore-item |
153 | #[inline ] |
154 | pub fn lock(&self) -> TaskLockGuard { |
155 | unsafe { |
156 | let guard: TaskLockGuard<'_> = TaskLockGuard(&self.0); |
157 | glib::ffi::g_rec_mutex_lock(rec_mutex:mut_override(&self.0 .0)); |
158 | guard |
159 | } |
160 | } |
161 | } |
162 | |
163 | impl Drop for RecMutex { |
164 | #[inline ] |
165 | fn drop(&mut self) { |
166 | unsafe { |
167 | glib::ffi::g_rec_mutex_clear(&mut self.0); |
168 | } |
169 | } |
170 | } |
171 | |
172 | impl<'a> Drop for TaskLockGuard<'a> { |
173 | #[inline ] |
174 | fn drop(&mut self) { |
175 | unsafe { |
176 | glib::ffi::g_rec_mutex_unlock(rec_mutex:mut_override(&self.0 .0)); |
177 | } |
178 | } |
179 | } |
180 | |
#[cfg (test)]
mod tests {
    use std::sync::mpsc::channel;

    use super::*;
    use crate::prelude::*;

    // Builds a task with a function, enter/leave callbacks and an explicit
    // lock, runs it, and checks the order in which the callbacks report in.
    #[test ]
    fn test_simple() {
        crate::init().unwrap();

        // Marker sent over the channel by each kind of callback.
        #[derive (Debug, PartialEq, Eq)]
        enum Called {
            Enter,
            Func,
            Leave,
        }

        let (send, recv) = channel();
        let lock = TaskLock::new();

        let task = Task::builder({
            let send = send.clone();
            let mut count = 0;
            move |task| {
                count += 1;
                // From the third invocation on, pause the task from inside
                // its own function; the `Paused` state is asserted below.
                if count >= 3 {
                    task.pause().unwrap();
                }
                send.send(Called::Func).unwrap();
            }
        })
        .enter_callback({
            let send = send.clone();
            move |_task| {
                send.send(Called::Enter).unwrap();
            }
        })
        .leave_callback({
            // Last user of the sender, so the original is moved in rather
            // than cloned.
            move |_task| {
                send.send(Called::Leave).unwrap();
            }
        })
        .lock(&lock)
        .build();

        task.start().unwrap();

        // Expected order: one `Enter`, then exactly three `Func` reports.
        assert_eq!(recv.recv(), Ok(Called::Enter));
        assert_eq!(recv.recv(), Ok(Called::Func));
        assert_eq!(recv.recv(), Ok(Called::Func));
        assert_eq!(recv.recv(), Ok(Called::Func));

        assert_eq!(task.state(), crate::TaskState::Paused);
        // `Leave` is expected only after the task has been stopped.
        task.stop().unwrap();
        assert_eq!(recv.recv(), Ok(Called::Leave));
        task.join().unwrap();
    }
}
240 | |