1 | // Take a look at the license at the top of the repository in the LICENSE file. |
2 | |
3 | use std::{mem, ptr, sync::Arc}; |
4 | |
5 | use glib::{prelude::*, translate::*}; |
6 | |
7 | use crate::{ffi, Task}; |
8 | |
9 | #[allow (clippy::type_complexity)] |
10 | pub struct TaskBuilder<F: FnMut(&Task) + Send + 'static> { |
11 | func: Box<(F, *mut ffi::GstTask)>, |
12 | lock: Option<TaskLock>, |
13 | enter_callback: Option<Box<dyn FnMut(&Task) + Send + 'static>>, |
14 | leave_callback: Option<Box<dyn FnMut(&Task) + Send + 'static>>, |
15 | } |
16 | |
17 | impl<F: FnMut(&Task) + Send + 'static> TaskBuilder<F> { |
18 | #[doc (alias = "gst_task_set_enter_callback" )] |
19 | pub fn enter_callback<E: FnMut(&Task) + Send + 'static>(self, enter_callback: E) -> Self { |
20 | Self { |
21 | enter_callback: Some(Box::new(enter_callback)), |
22 | ..self |
23 | } |
24 | } |
25 | |
26 | #[doc (alias = "gst_task_set_enter_callback" )] |
27 | pub fn enter_callback_if<E: FnMut(&Task) + Send + 'static>( |
28 | self, |
29 | enter_callback: E, |
30 | predicate: bool, |
31 | ) -> Self { |
32 | if predicate { |
33 | self.enter_callback(enter_callback) |
34 | } else { |
35 | self |
36 | } |
37 | } |
38 | |
39 | #[doc (alias = "gst_task_set_enter_callback" )] |
40 | pub fn enter_callback_if_some<E: FnMut(&Task) + Send + 'static>( |
41 | self, |
42 | enter_callback: Option<E>, |
43 | ) -> Self { |
44 | if let Some(enter_callback) = enter_callback { |
45 | self.enter_callback(enter_callback) |
46 | } else { |
47 | self |
48 | } |
49 | } |
50 | |
51 | #[doc (alias = "gst_task_set_leave_callback" )] |
52 | pub fn leave_callback<E: FnMut(&Task) + Send + 'static>(self, leave_callback: E) -> Self { |
53 | Self { |
54 | leave_callback: Some(Box::new(leave_callback)), |
55 | ..self |
56 | } |
57 | } |
58 | |
59 | #[doc (alias = "gst_task_set_leave_callback" )] |
60 | pub fn leave_callback_if<E: FnMut(&Task) + Send + 'static>( |
61 | self, |
62 | leave_callback: E, |
63 | predicate: bool, |
64 | ) -> Self { |
65 | if predicate { |
66 | self.leave_callback(leave_callback) |
67 | } else { |
68 | self |
69 | } |
70 | } |
71 | |
72 | #[doc (alias = "gst_task_set_leave_callback" )] |
73 | pub fn leave_callback_if_some<E: FnMut(&Task) + Send + 'static>( |
74 | self, |
75 | leave_callback: Option<E>, |
76 | ) -> Self { |
77 | if let Some(leave_callback) = leave_callback { |
78 | self.leave_callback(leave_callback) |
79 | } else { |
80 | self |
81 | } |
82 | } |
83 | |
84 | #[doc (alias = "gst_task_set_lock" )] |
85 | pub fn lock(self, lock: &TaskLock) -> Self { |
86 | Self { |
87 | lock: Some(lock.clone()), |
88 | ..self |
89 | } |
90 | } |
91 | |
92 | #[doc (alias = "gst_task_set_lock" )] |
93 | pub fn lock_if(self, lock: &TaskLock, predicate: bool) -> Self { |
94 | if predicate { |
95 | self.lock(lock) |
96 | } else { |
97 | self |
98 | } |
99 | } |
100 | |
101 | #[doc (alias = "gst_task_set_lock" )] |
102 | pub fn lock_if_some(self, lock: Option<&TaskLock>) -> Self { |
103 | if let Some(lock) = lock { |
104 | self.lock(lock) |
105 | } else { |
106 | self |
107 | } |
108 | } |
109 | |
110 | #[doc (alias = "gst_task_new" )] |
111 | pub fn build(self) -> Task { |
112 | unsafe extern "C" fn func_trampoline<F: FnMut(&Task) + Send + 'static>( |
113 | user_data: glib::ffi::gpointer, |
114 | ) { |
115 | let callback: &mut (F, *mut ffi::GstTask) = &mut *(user_data as *mut _); |
116 | (callback.0)(&from_glib_borrow(callback.1)); |
117 | } |
118 | |
119 | unsafe extern "C" fn destroy_func<F: FnMut(&Task) + Send + 'static>( |
120 | data: glib::ffi::gpointer, |
121 | ) { |
122 | let _callback: Box<(F, *mut ffi::GstTask)> = Box::from_raw(data as *mut _); |
123 | } |
124 | |
125 | unsafe extern "C" fn callback_trampoline( |
126 | task: *mut ffi::GstTask, |
127 | _thread: *mut glib::ffi::GThread, |
128 | data: glib::ffi::gpointer, |
129 | ) { |
130 | let callback: &mut Box<dyn FnMut(&Task) + Send + 'static> = &mut *(data as *mut _); |
131 | callback(&from_glib_borrow(task)); |
132 | } |
133 | |
134 | #[allow (clippy::type_complexity)] |
135 | unsafe extern "C" fn destroy_callback(data: glib::ffi::gpointer) { |
136 | let _callback: Box<Box<dyn FnMut(&Task) + Send + 'static>> = |
137 | Box::from_raw(data as *mut _); |
138 | } |
139 | |
140 | unsafe { |
141 | let func_ptr = Box::into_raw(self.func); |
142 | |
143 | let task: Task = from_glib_full(ffi::gst_task_new( |
144 | Some(func_trampoline::<F> as _), |
145 | func_ptr as *mut _, |
146 | Some(destroy_func::<F> as _), |
147 | )); |
148 | |
149 | (*func_ptr).1 = task.to_glib_none().0; |
150 | |
151 | let lock = self.lock.unwrap_or_default(); |
152 | ffi::gst_task_set_lock(task.to_glib_none().0, mut_override(&lock.0 .0)); |
153 | task.set_data("gstreamer-rs-task-lock" , Arc::clone(&lock.0)); |
154 | |
155 | if let Some(enter_callback) = self.enter_callback { |
156 | ffi::gst_task_set_enter_callback( |
157 | task.to_glib_none().0, |
158 | Some(callback_trampoline), |
159 | Box::into_raw(Box::new(enter_callback)) as *mut _, |
160 | Some(destroy_callback), |
161 | ); |
162 | } |
163 | |
164 | if let Some(leave_callback) = self.leave_callback { |
165 | ffi::gst_task_set_leave_callback( |
166 | task.to_glib_none().0, |
167 | Some(callback_trampoline), |
168 | Box::into_raw(Box::new(leave_callback)) as *mut _, |
169 | Some(destroy_callback), |
170 | ); |
171 | } |
172 | |
173 | task |
174 | } |
175 | } |
176 | } |
177 | |
178 | impl Task { |
179 | #[doc (alias = "gst_task_new" )] |
180 | pub fn builder<F: FnMut(&Task) + Send + 'static>(func: F) -> TaskBuilder<F> { |
181 | assert_initialized_main_thread!(); |
182 | TaskBuilder { |
183 | func: Box::new((func, ptr::null_mut())), |
184 | lock: None, |
185 | enter_callback: None, |
186 | leave_callback: None, |
187 | } |
188 | } |
189 | } |
190 | |
/// Handle to a reference-counted recursive mutex that can be installed on a
/// [`Task`] via `TaskBuilder::lock`.
///
/// Cloning the handle clones the inner `Arc`, so all clones refer to the same
/// underlying mutex.
#[derive (Debug, Clone)]
pub struct TaskLock(Arc<RecMutex>);
193 | |
194 | impl Default for TaskLock { |
195 | #[inline ] |
196 | fn default() -> Self { |
197 | Self::new() |
198 | } |
199 | } |
200 | |
/// Private owner of a GLib `GRecMutex`.
///
/// The mutex is initialised in `TaskLock::new` (`g_rec_mutex_init`) and
/// released by this type's `Drop` impl (`g_rec_mutex_clear`).
#[derive (Debug)]
struct RecMutex(glib::ffi::GRecMutex);
203 | |
// The wrapped `GRecMutex` is only ever touched through the `g_rec_mutex_*`
// FFI calls in this file; these impls assert that doing so from any thread is
// sound.
// NOTE(review): this relies on GLib's `GRecMutex` being safe to share and
// move between threads — confirm against the GLib documentation.
unsafe impl Send for RecMutex {}
unsafe impl Sync for RecMutex {}
206 | |
/// Guard returned by [`TaskLock::lock`].
///
/// Borrows the locked mutex and calls `g_rec_mutex_unlock` on it when the
/// guard is dropped.
#[must_use = "if unused the TaskLock will immediately unlock" ]
pub struct TaskLockGuard<'a>(&'a RecMutex);
209 | |
210 | impl TaskLock { |
211 | #[inline ] |
212 | pub fn new() -> Self { |
213 | unsafe { |
214 | let lock: TaskLock = TaskLock(Arc::new(data:RecMutex(mem::zeroed()))); |
215 | glib::ffi::g_rec_mutex_init(rec_mutex:mut_override(&lock.0 .0)); |
216 | lock |
217 | } |
218 | } |
219 | |
220 | // checker-ignore-item |
221 | #[inline ] |
222 | pub fn lock(&self) -> TaskLockGuard { |
223 | unsafe { |
224 | let guard: TaskLockGuard<'_> = TaskLockGuard(&self.0); |
225 | glib::ffi::g_rec_mutex_lock(rec_mutex:mut_override(&self.0 .0)); |
226 | guard |
227 | } |
228 | } |
229 | } |
230 | |
231 | impl Drop for RecMutex { |
232 | #[inline ] |
233 | fn drop(&mut self) { |
234 | unsafe { |
235 | glib::ffi::g_rec_mutex_clear(&mut self.0); |
236 | } |
237 | } |
238 | } |
239 | |
240 | impl Drop for TaskLockGuard<'_> { |
241 | #[inline ] |
242 | fn drop(&mut self) { |
243 | unsafe { |
244 | glib::ffi::g_rec_mutex_unlock(rec_mutex:mut_override(&self.0 .0)); |
245 | } |
246 | } |
247 | } |
248 | |
#[cfg(test)]
mod tests {
    use std::sync::mpsc::channel;

    use super::*;
    use crate::prelude::*;

    /// Runs a task with enter/leave callbacks and an explicit lock, and checks
    /// the order in which the callbacks and the task function report back.
    #[test]
    fn test_simple() {
        /// Which of the three closures reported in.
        #[derive(Debug, PartialEq, Eq)]
        enum Called {
            Enter,
            Func,
            Leave,
        }

        crate::init().unwrap();

        let (tx, rx) = channel();
        let lock = TaskLock::new();

        let task = Task::builder({
            let tx = tx.clone();
            let mut iterations = 0;
            move |task| {
                iterations += 1;
                // Pause from within the task function once it has run three
                // times, so no further iterations are reported.
                if iterations >= 3 {
                    task.pause().unwrap();
                }
                tx.send(Called::Func).unwrap();
            }
        })
        .enter_callback({
            let tx = tx.clone();
            move |_task| {
                tx.send(Called::Enter).unwrap();
            }
        })
        .leave_callback({
            move |_task| {
                tx.send(Called::Leave).unwrap();
            }
        })
        .lock(&lock)
        .build();

        task.start().unwrap();

        // The enter callback reports first, followed by exactly three
        // iterations of the task function.
        assert_eq!(rx.recv(), Ok(Called::Enter));
        for _ in 0..3 {
            assert_eq!(rx.recv(), Ok(Called::Func));
        }

        assert_eq!(task.state(), crate::TaskState::Paused);
        task.stop().unwrap();
        assert_eq!(rx.recv(), Ok(Called::Leave));
        task.join().unwrap();
    }
}
308 | |