| 1 | // Take a look at the license at the top of the repository in the LICENSE file. |
| 2 | |
| 3 | use std::{mem, ptr, sync::Arc}; |
| 4 | |
| 5 | use glib::{prelude::*, translate::*}; |
| 6 | |
| 7 | use crate::{ffi, Task}; |
| 8 | |
| 9 | #[allow (clippy::type_complexity)] |
| 10 | pub struct TaskBuilder<F: FnMut(&Task) + Send + 'static> { |
| 11 | func: Box<(F, *mut ffi::GstTask)>, |
| 12 | lock: Option<TaskLock>, |
| 13 | enter_callback: Option<Box<dyn FnMut(&Task) + Send + 'static>>, |
| 14 | leave_callback: Option<Box<dyn FnMut(&Task) + Send + 'static>>, |
| 15 | } |
| 16 | |
| 17 | impl<F: FnMut(&Task) + Send + 'static> TaskBuilder<F> { |
| 18 | #[doc (alias = "gst_task_set_enter_callback" )] |
| 19 | pub fn enter_callback<E: FnMut(&Task) + Send + 'static>(self, enter_callback: E) -> Self { |
| 20 | Self { |
| 21 | enter_callback: Some(Box::new(enter_callback)), |
| 22 | ..self |
| 23 | } |
| 24 | } |
| 25 | |
| 26 | #[doc (alias = "gst_task_set_enter_callback" )] |
| 27 | pub fn enter_callback_if<E: FnMut(&Task) + Send + 'static>( |
| 28 | self, |
| 29 | enter_callback: E, |
| 30 | predicate: bool, |
| 31 | ) -> Self { |
| 32 | if predicate { |
| 33 | self.enter_callback(enter_callback) |
| 34 | } else { |
| 35 | self |
| 36 | } |
| 37 | } |
| 38 | |
| 39 | #[doc (alias = "gst_task_set_enter_callback" )] |
| 40 | pub fn enter_callback_if_some<E: FnMut(&Task) + Send + 'static>( |
| 41 | self, |
| 42 | enter_callback: Option<E>, |
| 43 | ) -> Self { |
| 44 | if let Some(enter_callback) = enter_callback { |
| 45 | self.enter_callback(enter_callback) |
| 46 | } else { |
| 47 | self |
| 48 | } |
| 49 | } |
| 50 | |
| 51 | #[doc (alias = "gst_task_set_leave_callback" )] |
| 52 | pub fn leave_callback<E: FnMut(&Task) + Send + 'static>(self, leave_callback: E) -> Self { |
| 53 | Self { |
| 54 | leave_callback: Some(Box::new(leave_callback)), |
| 55 | ..self |
| 56 | } |
| 57 | } |
| 58 | |
| 59 | #[doc (alias = "gst_task_set_leave_callback" )] |
| 60 | pub fn leave_callback_if<E: FnMut(&Task) + Send + 'static>( |
| 61 | self, |
| 62 | leave_callback: E, |
| 63 | predicate: bool, |
| 64 | ) -> Self { |
| 65 | if predicate { |
| 66 | self.leave_callback(leave_callback) |
| 67 | } else { |
| 68 | self |
| 69 | } |
| 70 | } |
| 71 | |
| 72 | #[doc (alias = "gst_task_set_leave_callback" )] |
| 73 | pub fn leave_callback_if_some<E: FnMut(&Task) + Send + 'static>( |
| 74 | self, |
| 75 | leave_callback: Option<E>, |
| 76 | ) -> Self { |
| 77 | if let Some(leave_callback) = leave_callback { |
| 78 | self.leave_callback(leave_callback) |
| 79 | } else { |
| 80 | self |
| 81 | } |
| 82 | } |
| 83 | |
| 84 | #[doc (alias = "gst_task_set_lock" )] |
| 85 | pub fn lock(self, lock: &TaskLock) -> Self { |
| 86 | Self { |
| 87 | lock: Some(lock.clone()), |
| 88 | ..self |
| 89 | } |
| 90 | } |
| 91 | |
| 92 | #[doc (alias = "gst_task_set_lock" )] |
| 93 | pub fn lock_if(self, lock: &TaskLock, predicate: bool) -> Self { |
| 94 | if predicate { |
| 95 | self.lock(lock) |
| 96 | } else { |
| 97 | self |
| 98 | } |
| 99 | } |
| 100 | |
| 101 | #[doc (alias = "gst_task_set_lock" )] |
| 102 | pub fn lock_if_some(self, lock: Option<&TaskLock>) -> Self { |
| 103 | if let Some(lock) = lock { |
| 104 | self.lock(lock) |
| 105 | } else { |
| 106 | self |
| 107 | } |
| 108 | } |
| 109 | |
| 110 | #[doc (alias = "gst_task_new" )] |
| 111 | pub fn build(self) -> Task { |
| 112 | unsafe extern "C" fn func_trampoline<F: FnMut(&Task) + Send + 'static>( |
| 113 | user_data: glib::ffi::gpointer, |
| 114 | ) { |
| 115 | let callback: &mut (F, *mut ffi::GstTask) = &mut *(user_data as *mut _); |
| 116 | (callback.0)(&from_glib_borrow(callback.1)); |
| 117 | } |
| 118 | |
| 119 | unsafe extern "C" fn destroy_func<F: FnMut(&Task) + Send + 'static>( |
| 120 | data: glib::ffi::gpointer, |
| 121 | ) { |
| 122 | let _callback: Box<(F, *mut ffi::GstTask)> = Box::from_raw(data as *mut _); |
| 123 | } |
| 124 | |
| 125 | unsafe extern "C" fn callback_trampoline( |
| 126 | task: *mut ffi::GstTask, |
| 127 | _thread: *mut glib::ffi::GThread, |
| 128 | data: glib::ffi::gpointer, |
| 129 | ) { |
| 130 | let callback: &mut Box<dyn FnMut(&Task) + Send + 'static> = &mut *(data as *mut _); |
| 131 | callback(&from_glib_borrow(task)); |
| 132 | } |
| 133 | |
| 134 | #[allow (clippy::type_complexity)] |
| 135 | unsafe extern "C" fn destroy_callback(data: glib::ffi::gpointer) { |
| 136 | let _callback: Box<Box<dyn FnMut(&Task) + Send + 'static>> = |
| 137 | Box::from_raw(data as *mut _); |
| 138 | } |
| 139 | |
| 140 | unsafe { |
| 141 | let func_ptr = Box::into_raw(self.func); |
| 142 | |
| 143 | let task: Task = from_glib_full(ffi::gst_task_new( |
| 144 | Some(func_trampoline::<F> as _), |
| 145 | func_ptr as *mut _, |
| 146 | Some(destroy_func::<F> as _), |
| 147 | )); |
| 148 | |
| 149 | (*func_ptr).1 = task.to_glib_none().0; |
| 150 | |
| 151 | let lock = self.lock.unwrap_or_default(); |
| 152 | ffi::gst_task_set_lock(task.to_glib_none().0, mut_override(&lock.0 .0)); |
| 153 | task.set_data("gstreamer-rs-task-lock" , Arc::clone(&lock.0)); |
| 154 | |
| 155 | if let Some(enter_callback) = self.enter_callback { |
| 156 | ffi::gst_task_set_enter_callback( |
| 157 | task.to_glib_none().0, |
| 158 | Some(callback_trampoline), |
| 159 | Box::into_raw(Box::new(enter_callback)) as *mut _, |
| 160 | Some(destroy_callback), |
| 161 | ); |
| 162 | } |
| 163 | |
| 164 | if let Some(leave_callback) = self.leave_callback { |
| 165 | ffi::gst_task_set_leave_callback( |
| 166 | task.to_glib_none().0, |
| 167 | Some(callback_trampoline), |
| 168 | Box::into_raw(Box::new(leave_callback)) as *mut _, |
| 169 | Some(destroy_callback), |
| 170 | ); |
| 171 | } |
| 172 | |
| 173 | task |
| 174 | } |
| 175 | } |
| 176 | } |
| 177 | |
| 178 | impl Task { |
| 179 | #[doc (alias = "gst_task_new" )] |
| 180 | pub fn builder<F: FnMut(&Task) + Send + 'static>(func: F) -> TaskBuilder<F> { |
| 181 | assert_initialized_main_thread!(); |
| 182 | TaskBuilder { |
| 183 | func: Box::new((func, ptr::null_mut())), |
| 184 | lock: None, |
| 185 | enter_callback: None, |
| 186 | leave_callback: None, |
| 187 | } |
| 188 | } |
| 189 | } |
| 190 | |
| 191 | #[derive (Debug, Clone)] |
| 192 | pub struct TaskLock(Arc<RecMutex>); |
| 193 | |
| 194 | impl Default for TaskLock { |
| 195 | #[inline ] |
| 196 | fn default() -> Self { |
| 197 | Self::new() |
| 198 | } |
| 199 | } |
| 200 | |
| 201 | #[derive (Debug)] |
| 202 | struct RecMutex(glib::ffi::GRecMutex); |
| 203 | |
| 204 | unsafe impl Send for RecMutex {} |
| 205 | unsafe impl Sync for RecMutex {} |
| 206 | |
| 207 | #[must_use = "if unused the TaskLock will immediately unlock" ] |
| 208 | pub struct TaskLockGuard<'a>(&'a RecMutex); |
| 209 | |
| 210 | impl TaskLock { |
| 211 | #[inline ] |
| 212 | pub fn new() -> Self { |
| 213 | unsafe { |
| 214 | let lock: TaskLock = TaskLock(Arc::new(data:RecMutex(mem::zeroed()))); |
| 215 | glib::ffi::g_rec_mutex_init(rec_mutex:mut_override(&lock.0 .0)); |
| 216 | lock |
| 217 | } |
| 218 | } |
| 219 | |
| 220 | // checker-ignore-item |
| 221 | #[inline ] |
| 222 | pub fn lock(&self) -> TaskLockGuard { |
| 223 | unsafe { |
| 224 | let guard: TaskLockGuard<'_> = TaskLockGuard(&self.0); |
| 225 | glib::ffi::g_rec_mutex_lock(rec_mutex:mut_override(&self.0 .0)); |
| 226 | guard |
| 227 | } |
| 228 | } |
| 229 | } |
| 230 | |
| 231 | impl Drop for RecMutex { |
| 232 | #[inline ] |
| 233 | fn drop(&mut self) { |
| 234 | unsafe { |
| 235 | glib::ffi::g_rec_mutex_clear(&mut self.0); |
| 236 | } |
| 237 | } |
| 238 | } |
| 239 | |
| 240 | impl Drop for TaskLockGuard<'_> { |
| 241 | #[inline ] |
| 242 | fn drop(&mut self) { |
| 243 | unsafe { |
| 244 | glib::ffi::g_rec_mutex_unlock(rec_mutex:mut_override(&self.0 .0)); |
| 245 | } |
| 246 | } |
| 247 | } |
| 248 | |
| 249 | #[cfg (test)] |
| 250 | mod tests { |
| 251 | use std::sync::mpsc::channel; |
| 252 | |
| 253 | use super::*; |
| 254 | use crate::prelude::*; |
| 255 | |
| 256 | #[test ] |
| 257 | fn test_simple() { |
| 258 | crate::init().unwrap(); |
| 259 | |
| 260 | #[derive (Debug, PartialEq, Eq)] |
| 261 | enum Called { |
| 262 | Enter, |
| 263 | Func, |
| 264 | Leave, |
| 265 | } |
| 266 | |
| 267 | let (send, recv) = channel(); |
| 268 | let lock = TaskLock::new(); |
| 269 | |
| 270 | let task = Task::builder({ |
| 271 | let send = send.clone(); |
| 272 | let mut count = 0; |
| 273 | move |task| { |
| 274 | count += 1; |
| 275 | if count >= 3 { |
| 276 | task.pause().unwrap(); |
| 277 | } |
| 278 | send.send(Called::Func).unwrap(); |
| 279 | } |
| 280 | }) |
| 281 | .enter_callback({ |
| 282 | let send = send.clone(); |
| 283 | move |_task| { |
| 284 | send.send(Called::Enter).unwrap(); |
| 285 | } |
| 286 | }) |
| 287 | .leave_callback({ |
| 288 | move |_task| { |
| 289 | send.send(Called::Leave).unwrap(); |
| 290 | } |
| 291 | }) |
| 292 | .lock(&lock) |
| 293 | .build(); |
| 294 | |
| 295 | task.start().unwrap(); |
| 296 | |
| 297 | assert_eq!(recv.recv(), Ok(Called::Enter)); |
| 298 | assert_eq!(recv.recv(), Ok(Called::Func)); |
| 299 | assert_eq!(recv.recv(), Ok(Called::Func)); |
| 300 | assert_eq!(recv.recv(), Ok(Called::Func)); |
| 301 | |
| 302 | assert_eq!(task.state(), crate::TaskState::Paused); |
| 303 | task.stop().unwrap(); |
| 304 | assert_eq!(recv.recv(), Ok(Called::Leave)); |
| 305 | task.join().unwrap(); |
| 306 | } |
| 307 | } |
| 308 | |