1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{mem, ptr, sync::Arc};
4
5use glib::{prelude::*, translate::*};
6
7use crate::Task;
8
9#[allow(clippy::type_complexity)]
10pub struct TaskBuilder<F: FnMut(&Task) + Send + 'static> {
11 func: Box<(F, *mut ffi::GstTask)>,
12 lock: Option<TaskLock>,
13 enter_callback: Option<Box<dyn FnMut(&Task) + Send + 'static>>,
14 leave_callback: Option<Box<dyn FnMut(&Task) + Send + 'static>>,
15}
16
17impl<F: FnMut(&Task) + Send + 'static> TaskBuilder<F> {
18 #[doc(alias = "gst_task_set_enter_callback")]
19 pub fn enter_callback<E: FnMut(&Task) + Send + 'static>(self, enter_callback: E) -> Self {
20 Self {
21 enter_callback: Some(Box::new(enter_callback)),
22 ..self
23 }
24 }
25
26 #[doc(alias = "gst_task_set_leave_callback")]
27 pub fn leave_callback<E: FnMut(&Task) + Send + 'static>(self, leave_callback: E) -> Self {
28 Self {
29 leave_callback: Some(Box::new(leave_callback)),
30 ..self
31 }
32 }
33
34 #[doc(alias = "gst_task_set_lock")]
35 pub fn lock(self, lock: &TaskLock) -> Self {
36 Self {
37 lock: Some(lock.clone()),
38 ..self
39 }
40 }
41
42 #[doc(alias = "gst_task_new")]
43 pub fn build(self) -> Task {
44 unsafe extern "C" fn func_trampoline<F: FnMut(&Task) + Send + 'static>(
45 user_data: glib::ffi::gpointer,
46 ) {
47 let callback: &mut (F, *mut ffi::GstTask) = &mut *(user_data as *mut _);
48 (callback.0)(&from_glib_borrow(callback.1));
49 }
50
51 unsafe extern "C" fn destroy_func<F: FnMut(&Task) + Send + 'static>(
52 data: glib::ffi::gpointer,
53 ) {
54 let _callback: Box<(F, *mut ffi::GstTask)> = Box::from_raw(data as *mut _);
55 }
56
57 unsafe extern "C" fn callback_trampoline(
58 task: *mut ffi::GstTask,
59 _thread: *mut glib::ffi::GThread,
60 data: glib::ffi::gpointer,
61 ) {
62 let callback: &mut Box<dyn FnMut(&Task) + Send + 'static> = &mut *(data as *mut _);
63 callback(&from_glib_borrow(task));
64 }
65
66 #[allow(clippy::type_complexity)]
67 unsafe extern "C" fn destroy_callback(data: glib::ffi::gpointer) {
68 let _callback: Box<Box<dyn FnMut(&Task) + Send + 'static>> =
69 Box::from_raw(data as *mut _);
70 }
71
72 unsafe {
73 let func_ptr = Box::into_raw(self.func);
74
75 let task: Task = from_glib_full(ffi::gst_task_new(
76 Some(func_trampoline::<F> as _),
77 func_ptr as *mut _,
78 Some(destroy_func::<F> as _),
79 ));
80
81 (*func_ptr).1 = task.to_glib_none().0;
82
83 let lock = self.lock.unwrap_or_default();
84 ffi::gst_task_set_lock(task.to_glib_none().0, mut_override(&lock.0 .0));
85 task.set_data("gstreamer-rs-task-lock", Arc::clone(&lock.0));
86
87 if let Some(enter_callback) = self.enter_callback {
88 ffi::gst_task_set_enter_callback(
89 task.to_glib_none().0,
90 Some(callback_trampoline),
91 Box::into_raw(Box::new(enter_callback)) as *mut _,
92 Some(destroy_callback),
93 );
94 }
95
96 if let Some(leave_callback) = self.leave_callback {
97 ffi::gst_task_set_leave_callback(
98 task.to_glib_none().0,
99 Some(callback_trampoline),
100 Box::into_raw(Box::new(leave_callback)) as *mut _,
101 Some(destroy_callback),
102 );
103 }
104
105 task
106 }
107 }
108}
109
110impl Task {
111 #[doc(alias = "gst_task_new")]
112 pub fn builder<F: FnMut(&Task) + Send + 'static>(func: F) -> TaskBuilder<F> {
113 assert_initialized_main_thread!();
114 TaskBuilder {
115 func: Box::new((func, ptr::null_mut())),
116 lock: None,
117 enter_callback: None,
118 leave_callback: None,
119 }
120 }
121}
122
123#[derive(Debug, Clone)]
124pub struct TaskLock(Arc<RecMutex>);
125
126impl Default for TaskLock {
127 #[inline]
128 fn default() -> Self {
129 Self::new()
130 }
131}
132
133#[derive(Debug)]
134struct RecMutex(glib::ffi::GRecMutex);
135
136unsafe impl Send for RecMutex {}
137unsafe impl Sync for RecMutex {}
138
139#[must_use = "if unused the TaskLock will immediately unlock"]
140pub struct TaskLockGuard<'a>(&'a RecMutex);
141
142impl TaskLock {
143 #[inline]
144 pub fn new() -> Self {
145 unsafe {
146 let lock: TaskLock = TaskLock(Arc::new(data:RecMutex(mem::zeroed())));
147 glib::ffi::g_rec_mutex_init(rec_mutex:mut_override(&lock.0 .0));
148 lock
149 }
150 }
151
152 // checker-ignore-item
153 #[inline]
154 pub fn lock(&self) -> TaskLockGuard {
155 unsafe {
156 let guard: TaskLockGuard<'_> = TaskLockGuard(&self.0);
157 glib::ffi::g_rec_mutex_lock(rec_mutex:mut_override(&self.0 .0));
158 guard
159 }
160 }
161}
162
163impl Drop for RecMutex {
164 #[inline]
165 fn drop(&mut self) {
166 unsafe {
167 glib::ffi::g_rec_mutex_clear(&mut self.0);
168 }
169 }
170}
171
172impl<'a> Drop for TaskLockGuard<'a> {
173 #[inline]
174 fn drop(&mut self) {
175 unsafe {
176 glib::ffi::g_rec_mutex_unlock(rec_mutex:mut_override(&self.0 .0));
177 }
178 }
179}
180
#[cfg(test)]
mod tests {
    use std::sync::mpsc::channel;

    use super::*;
    use crate::prelude::*;

    /// Runs a task that pauses itself on its third iteration and checks the
    /// order in which the enter callback, task function and leave callback
    /// report back over a channel.
    #[test]
    fn test_simple() {
        crate::init().unwrap();

        #[derive(Debug, PartialEq, Eq)]
        enum Called {
            Enter,
            Func,
            Leave,
        }

        let (tx, rx) = channel();
        let lock = TaskLock::new();

        // One sender per closure; the original sender goes to the leave
        // callback so no sender stays behind in this function.
        let func_tx = tx.clone();
        let enter_tx = tx.clone();
        let leave_tx = tx;
        let mut iterations = 0;

        let task = Task::builder(move |task| {
            iterations += 1;
            if iterations >= 3 {
                task.pause().unwrap();
            }
            func_tx.send(Called::Func).unwrap();
        })
        .enter_callback(move |_task| {
            enter_tx.send(Called::Enter).unwrap();
        })
        .leave_callback(move |_task| {
            leave_tx.send(Called::Leave).unwrap();
        })
        .lock(&lock)
        .build();

        task.start().unwrap();

        assert_eq!(rx.recv(), Ok(Called::Enter));
        // The task function runs three times before it pauses the task.
        for _ in 0..3 {
            assert_eq!(rx.recv(), Ok(Called::Func));
        }

        assert_eq!(task.state(), crate::TaskState::Paused);
        task.stop().unwrap();
        assert_eq!(rx.recv(), Ok(Called::Leave));
        task.join().unwrap();
    }
}
240