| 1 | // Take a look at the license at the top of the repository in the LICENSE file. |
| 2 | |
| 3 | use std::{ |
| 4 | hash::{Hash, Hasher}, |
| 5 | ptr, |
| 6 | sync::{Arc, Mutex}, |
| 7 | }; |
| 8 | |
| 9 | use glib::{ffi::gpointer, subclass::prelude::*, translate::*}; |
| 10 | |
| 11 | use super::prelude::*; |
| 12 | use crate::{ffi, TaskHandle, TaskPool}; |
| 13 | |
| 14 | pub trait TaskPoolImpl: GstObjectImpl + Send + Sync { |
| 15 | // rustdoc-stripper-ignore-next |
| 16 | /// Handle to be returned from the `push` function to allow the caller to wait for the task's |
| 17 | /// completion. |
| 18 | /// |
| 19 | /// If unneeded, you can specify `()` or [`Infallible`](std::convert::Infallible) for a handle |
| 20 | /// that does nothing on `join` or drop. |
| 21 | type Handle: TaskHandle; |
| 22 | |
| 23 | // rustdoc-stripper-ignore-next |
| 24 | /// Prepare the task pool to accept tasks. |
| 25 | /// |
| 26 | /// This defaults to doing nothing. |
| 27 | fn prepare(&self) -> Result<(), glib::Error> { |
| 28 | Ok(()) |
| 29 | } |
| 30 | |
| 31 | // rustdoc-stripper-ignore-next |
| 32 | /// Clean up, rejecting further tasks and waiting for all accepted tasks to be stopped. |
| 33 | /// |
| 34 | /// This is mainly used internally to ensure proper cleanup of internal data structures in test |
| 35 | /// suites. |
| 36 | fn cleanup(&self) {} |
| 37 | |
| 38 | // rustdoc-stripper-ignore-next |
| 39 | /// Deliver a task to the pool. |
| 40 | /// |
| 41 | /// If returning `Ok`, you need to call the `func` eventually. |
| 42 | /// |
| 43 | /// If returning `Err`, the `func` must be dropped without calling it. |
| 44 | fn push(&self, func: TaskPoolFunction) -> Result<Option<Self::Handle>, glib::Error>; |
| 45 | } |
| 46 | |
| 47 | unsafe impl<T: TaskPoolImpl> IsSubclassable<T> for TaskPool { |
| 48 | fn class_init(klass: &mut glib::Class<Self>) { |
| 49 | Self::parent_class_init::<T>(class:klass); |
| 50 | let klass: &mut GstTaskPoolClass = klass.as_mut(); |
| 51 | klass.prepare = Some(task_pool_prepare::<T>); |
| 52 | klass.cleanup = Some(task_pool_cleanup::<T>); |
| 53 | klass.push = Some(task_pool_push::<T>); |
| 54 | klass.join = Some(task_pool_join::<T>); |
| 55 | |
| 56 | #[cfg (feature = "v1_20" )] |
| 57 | { |
| 58 | klass.dispose_handle = Some(task_pool_dispose_handle::<T>); |
| 59 | } |
| 60 | } |
| 61 | } |
| 62 | |
| 63 | unsafe extern "C" fn task_pool_prepare<T: TaskPoolImpl>( |
| 64 | ptr: *mut ffi::GstTaskPool, |
| 65 | error: *mut *mut glib::ffi::GError, |
| 66 | ) { |
| 67 | let instance: &::Instance = &*(ptr as *mut T::Instance); |
| 68 | let imp: &T = instance.imp(); |
| 69 | |
| 70 | match imp.prepare() { |
| 71 | Ok(()) => {} |
| 72 | Err(err: Error) => { |
| 73 | if !error.is_null() { |
| 74 | *error = err.into_glib_ptr(); |
| 75 | } |
| 76 | } |
| 77 | } |
| 78 | } |
| 79 | |
| 80 | unsafe extern "C" fn task_pool_cleanup<T: TaskPoolImpl>(ptr: *mut ffi::GstTaskPool) { |
| 81 | let instance: &::Instance = &*(ptr as *mut T::Instance); |
| 82 | let imp: &T = instance.imp(); |
| 83 | |
| 84 | imp.cleanup(); |
| 85 | } |
| 86 | |
| 87 | unsafe extern "C" fn task_pool_push<T: TaskPoolImpl>( |
| 88 | ptr: *mut ffi::GstTaskPool, |
| 89 | func: ffi::GstTaskPoolFunction, |
| 90 | user_data: gpointer, |
| 91 | error: *mut *mut glib::ffi::GError, |
| 92 | ) -> gpointer { |
| 93 | let instance: &::Instance = &*(ptr as *mut T::Instance); |
| 94 | let imp: &T = instance.imp(); |
| 95 | |
| 96 | let func: TaskPoolFunction = TaskPoolFunction::new(func.expect(msg:"Tried to push null func" ), user_data); |
| 97 | |
| 98 | match imp.push(func.clone()) { |
| 99 | Ok(None) => ptr::null_mut(), |
| 100 | Ok(Some(handle: ::Handle)) => Box::into_raw(Box::new(handle)) as gpointer, |
| 101 | Err(err: Error) => { |
| 102 | func.prevent_call(); |
| 103 | if !error.is_null() { |
| 104 | *error = err.into_glib_ptr(); |
| 105 | } |
| 106 | ptr::null_mut() |
| 107 | } |
| 108 | } |
| 109 | } |
| 110 | |
| 111 | unsafe extern "C" fn task_pool_join<T: TaskPoolImpl>(ptr: *mut ffi::GstTaskPool, id: gpointer) { |
| 112 | if id.is_null() { |
| 113 | let wrap: Borrowed<TaskPool> = from_glib_borrow(ptr); |
| 114 | crate::warning!( |
| 115 | crate::CAT_RUST, |
| 116 | obj = wrap.as_ref(), |
| 117 | "Tried to join null handle" |
| 118 | ); |
| 119 | return; |
| 120 | } |
| 121 | |
| 122 | let handle: Box<::Handle> = Box::from_raw(id as *mut T::Handle); |
| 123 | handle.join(); |
| 124 | } |
| 125 | |
// C trampoline for the `dispose_handle` vfunc (only built with the `v1_20` feature).
//
// A null `id` only produces a warning. Otherwise `id` is reinterpreted as a boxed `T::Handle`
// and dropped without being joined.
#[cfg(feature = "v1_20")]
#[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
unsafe extern "C" fn task_pool_dispose_handle<T: TaskPoolImpl>(
    ptr: *mut ffi::GstTaskPool,
    id: gpointer,
) {
    if id.is_null() {
        let pool: Borrowed<TaskPool> = from_glib_borrow(ptr);
        crate::warning!(
            crate::CAT_RUST,
            obj = pool.as_ref(),
            "Tried to dispose null handle"
        );
        return;
    }

    // Re-take ownership of the boxed handle and release it right away.
    drop(Box::from_raw(id as *mut T::Handle));
}
| 145 | |
| 146 | // rustdoc-stripper-ignore-next |
| 147 | /// Function the task pool should execute, provided to [`push`](TaskPoolImpl::push). |
| 148 | #[derive (Debug)] |
| 149 | pub struct TaskPoolFunction(Arc<Mutex<Option<TaskPoolFunctionInner>>>); |
| 150 | |
| 151 | // `Arc<Mutex<Option<…>>>` is required so that we can enforce that the function |
| 152 | // has not been called and will never be called after `push` returns `Err`. |
| 153 | |
| 154 | #[derive (Debug)] |
| 155 | struct TaskPoolFunctionInner { |
| 156 | func: unsafe extern "C" fn(gpointer), |
| 157 | user_data: gpointer, |
| 158 | warn_on_drop: bool, |
| 159 | } |
| 160 | |
| 161 | unsafe impl Send for TaskPoolFunctionInner {} |
| 162 | |
| 163 | impl TaskPoolFunction { |
| 164 | fn new(func: unsafe extern "C" fn(gpointer), user_data: gpointer) -> Self { |
| 165 | let inner = TaskPoolFunctionInner { |
| 166 | func, |
| 167 | user_data, |
| 168 | warn_on_drop: true, |
| 169 | }; |
| 170 | Self(Arc::new(Mutex::new(Some(inner)))) |
| 171 | } |
| 172 | |
| 173 | #[inline ] |
| 174 | fn clone(&self) -> Self { |
| 175 | Self(self.0.clone()) |
| 176 | } |
| 177 | |
| 178 | // rustdoc-stripper-ignore-next |
| 179 | /// Consume and execute the function. |
| 180 | pub fn call(self) { |
| 181 | let mut inner = self |
| 182 | .0 |
| 183 | .lock() |
| 184 | .unwrap() |
| 185 | .take() |
| 186 | .expect("TaskPoolFunction has already been dropped" ); |
| 187 | inner.warn_on_drop = false; |
| 188 | unsafe { (inner.func)(inner.user_data) } |
| 189 | } |
| 190 | |
| 191 | fn prevent_call(self) { |
| 192 | let mut inner = self |
| 193 | .0 |
| 194 | .lock() |
| 195 | .unwrap() |
| 196 | .take() |
| 197 | .expect("TaskPoolFunction has already been called" ); |
| 198 | inner.warn_on_drop = false; |
| 199 | drop(inner); |
| 200 | } |
| 201 | |
| 202 | #[inline ] |
| 203 | fn as_ptr(&self) -> *const Mutex<Option<TaskPoolFunctionInner>> { |
| 204 | Arc::as_ptr(&self.0) |
| 205 | } |
| 206 | } |
| 207 | |
| 208 | impl Drop for TaskPoolFunctionInner { |
| 209 | fn drop(&mut self) { |
| 210 | if self.warn_on_drop { |
| 211 | crate::warning!(crate::CAT_RUST, "Leaked task function" ); |
| 212 | } |
| 213 | } |
| 214 | } |
| 215 | |
| 216 | impl PartialEq for TaskPoolFunction { |
| 217 | fn eq(&self, other: &Self) -> bool { |
| 218 | self.as_ptr() == other.as_ptr() |
| 219 | } |
| 220 | } |
| 221 | |
| 222 | impl Eq for TaskPoolFunction {} |
| 223 | |
| 224 | impl PartialOrd for TaskPoolFunction { |
| 225 | fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { |
| 226 | Some(self.cmp(other)) |
| 227 | } |
| 228 | } |
| 229 | |
| 230 | impl Ord for TaskPoolFunction { |
| 231 | fn cmp(&self, other: &Self) -> std::cmp::Ordering { |
| 232 | self.as_ptr().cmp(&other.as_ptr()) |
| 233 | } |
| 234 | } |
| 235 | |
| 236 | impl Hash for TaskPoolFunction { |
| 237 | fn hash<H: Hasher>(&self, state: &mut H) { |
| 238 | self.as_ptr().hash(state) |
| 239 | } |
| 240 | } |
| 241 | |
#[cfg(test)]
mod tests {
    use std::{
        sync::{
            atomic,
            mpsc::{channel, TryRecvError},
        },
        thread,
    };

    use super::*;
    use crate::prelude::*;

    pub mod imp {
        use super::*;

        // Test subclass that records which lifecycle hooks have been invoked.
        #[derive(Default)]
        pub struct TestPool {
            pub(super) prepared: atomic::AtomicBool,
            pub(super) cleaned_up: atomic::AtomicBool,
        }

        #[glib::object_subclass]
        impl ObjectSubclass for TestPool {
            const NAME: &'static str = "TestPool";
            type Type = super::TestPool;
            type ParentType = TaskPool;
        }

        impl ObjectImpl for TestPool {}

        impl GstObjectImpl for TestPool {}

        impl TaskPoolImpl for TestPool {
            type Handle = TestHandle;

            fn prepare(&self) -> Result<(), glib::Error> {
                self.prepared.store(true, atomic::Ordering::SeqCst);
                Ok(())
            }

            fn cleanup(&self) {
                self.cleaned_up.store(true, atomic::Ordering::SeqCst);
            }

            // Runs every pushed task on a freshly spawned thread.
            fn push(&self, func: TaskPoolFunction) -> Result<Option<Self::Handle>, glib::Error> {
                let worker = thread::spawn(move || func.call());
                Ok(Some(TestHandle(worker)))
            }
        }

        // Handle wrapping the join handle of the spawned thread.
        pub struct TestHandle(thread::JoinHandle<()>);

        impl TaskHandle for TestHandle {
            fn join(self) {
                self.0.join().unwrap();
            }
        }
    }

    glib::wrapper! {
        pub struct TestPool(ObjectSubclass<imp::TestPool>) @extends TaskPool, crate::Object;
    }

    unsafe impl Send for TestPool {}
    unsafe impl Sync for TestPool {}

    impl TestPool {
        pub fn new() -> Self {
            Self::default()
        }
    }

    impl Default for TestPool {
        fn default() -> Self {
            glib::Object::new()
        }
    }

    #[test]
    fn test_simple_subclass() {
        crate::init().unwrap();

        let pool = TestPool::new();
        pool.prepare().unwrap();

        let (tx, rx) = channel();

        // Push a task that signals through the channel; a handle must be returned.
        let handle = pool
            .push(move || {
                tx.send(()).unwrap();
            })
            .unwrap()
            .unwrap();

        assert_eq!(rx.recv(), Ok(()));

        // After joining, the closure (and with it the sender) is gone.
        handle.join();
        assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));

        pool.cleanup();

        // Both lifecycle hooks of the subclass must have been reached.
        let imp = pool.imp();
        assert!(imp.prepared.load(atomic::Ordering::SeqCst));
        assert!(imp.cleaned_up.load(atomic::Ordering::SeqCst));
    }
}
| 349 | |