1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{
4 hash::{Hash, Hasher},
5 ptr,
6 sync::{Arc, Mutex},
7};
8
9use glib::{ffi::gpointer, subclass::prelude::*, translate::*};
10
11use super::prelude::*;
12use crate::{TaskHandle, TaskPool};
13
14pub trait TaskPoolImpl: GstObjectImpl + Send + Sync {
15 // rustdoc-stripper-ignore-next
16 /// Handle to be returned from the `push` function to allow the caller to wait for the task's
17 /// completion.
18 ///
19 /// If unneeded, you can specify `()` or [`Infallible`](std::convert::Infallible) for a handle
20 /// that does nothing on `join` or drop.
21 type Handle: TaskHandle;
22
23 // rustdoc-stripper-ignore-next
24 /// Prepare the task pool to accept tasks.
25 ///
26 /// This defaults to doing nothing.
27 fn prepare(&self) -> Result<(), glib::Error> {
28 Ok(())
29 }
30
31 // rustdoc-stripper-ignore-next
32 /// Clean up, rejecting further tasks and waiting for all accepted tasks to be stopped.
33 ///
34 /// This is mainly used internally to ensure proper cleanup of internal data structures in test
35 /// suites.
36 fn cleanup(&self) {}
37
38 // rustdoc-stripper-ignore-next
39 /// Deliver a task to the pool.
40 ///
41 /// If returning `Ok`, you need to call the `func` eventually.
42 ///
43 /// If returning `Err`, the `func` must be dropped without calling it.
44 fn push(&self, func: TaskPoolFunction) -> Result<Option<Self::Handle>, glib::Error>;
45}
46
47unsafe impl<T: TaskPoolImpl> IsSubclassable<T> for TaskPool {
48 fn class_init(klass: &mut glib::Class<Self>) {
49 Self::parent_class_init::<T>(class:klass);
50 let klass: &mut GstTaskPoolClass = klass.as_mut();
51 klass.prepare = Some(task_pool_prepare::<T>);
52 klass.cleanup = Some(task_pool_cleanup::<T>);
53 klass.push = Some(task_pool_push::<T>);
54 klass.join = Some(task_pool_join::<T>);
55
56 #[cfg(feature = "v1_20")]
57 {
58 klass.dispose_handle = Some(task_pool_dispose_handle::<T>);
59 }
60 }
61}
62
63unsafe extern "C" fn task_pool_prepare<T: TaskPoolImpl>(
64 ptr: *mut ffi::GstTaskPool,
65 error: *mut *mut glib::ffi::GError,
66) {
67 let instance: &::Instance = &*(ptr as *mut T::Instance);
68 let imp: &T = instance.imp();
69
70 match imp.prepare() {
71 Ok(()) => {}
72 Err(err: Error) => {
73 if !error.is_null() {
74 *error = err.into_glib_ptr();
75 }
76 }
77 }
78}
79
80unsafe extern "C" fn task_pool_cleanup<T: TaskPoolImpl>(ptr: *mut ffi::GstTaskPool) {
81 let instance: &::Instance = &*(ptr as *mut T::Instance);
82 let imp: &T = instance.imp();
83
84 imp.cleanup();
85}
86
87unsafe extern "C" fn task_pool_push<T: TaskPoolImpl>(
88 ptr: *mut ffi::GstTaskPool,
89 func: ffi::GstTaskPoolFunction,
90 user_data: gpointer,
91 error: *mut *mut glib::ffi::GError,
92) -> gpointer {
93 let instance: &::Instance = &*(ptr as *mut T::Instance);
94 let imp: &T = instance.imp();
95
96 let func: TaskPoolFunction = TaskPoolFunction::new(func:func.expect(msg:"Tried to push null func"), user_data);
97
98 match imp.push(func.clone()) {
99 Ok(None) => ptr::null_mut(),
100 Ok(Some(handle: ::Handle)) => Box::into_raw(Box::new(handle)) as gpointer,
101 Err(err: Error) => {
102 func.prevent_call();
103 if !error.is_null() {
104 *error = err.into_glib_ptr();
105 }
106 ptr::null_mut()
107 }
108 }
109}
110
111unsafe extern "C" fn task_pool_join<T: TaskPoolImpl>(ptr: *mut ffi::GstTaskPool, id: gpointer) {
112 if id.is_null() {
113 let wrap: Borrowed<TaskPool> = from_glib_borrow(ptr);
114 crate::warning!(crate::CAT_RUST, obj: wrap.as_ref(), "Tried to join null handle");
115 return;
116 }
117
118 let handle: Box<::Handle> = Box::from_raw(id as *mut T::Handle);
119 handle.join();
120}
121
122#[cfg(feature = "v1_20")]
123#[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
124unsafe extern "C" fn task_pool_dispose_handle<T: TaskPoolImpl>(
125 ptr: *mut ffi::GstTaskPool,
126 id: gpointer,
127) {
128 if id.is_null() {
129 let wrap: Borrowed<TaskPool> = from_glib_borrow(ptr);
130 crate::warning!(crate::CAT_RUST, obj: wrap.as_ref(), "Tried to dispose null handle");
131 return;
132 }
133
134 let handle = Box::from_raw(id as *mut T::Handle);
135 drop(handle);
136}
137
138// rustdoc-stripper-ignore-next
139/// Function the task pool should execute, provided to [`push`](TaskPoolImpl::push).
140#[derive(Debug)]
141pub struct TaskPoolFunction(Arc<Mutex<Option<TaskPoolFunctionInner>>>);
142
143// `Arc<Mutex<Option<…>>>` is required so that we can enforce that the function
144// has not been called and will never be called after `push` returns `Err`.
145
146#[derive(Debug)]
147struct TaskPoolFunctionInner {
148 func: unsafe extern "C" fn(gpointer),
149 user_data: gpointer,
150 warn_on_drop: bool,
151}
152
153unsafe impl Send for TaskPoolFunctionInner {}
154
155impl TaskPoolFunction {
156 fn new(func: unsafe extern "C" fn(gpointer), user_data: gpointer) -> Self {
157 let inner = TaskPoolFunctionInner {
158 func,
159 user_data,
160 warn_on_drop: true,
161 };
162 Self(Arc::new(Mutex::new(Some(inner))))
163 }
164
165 #[inline]
166 fn clone(&self) -> Self {
167 Self(self.0.clone())
168 }
169
170 // rustdoc-stripper-ignore-next
171 /// Consume and execute the function.
172 pub fn call(self) {
173 let mut inner = self
174 .0
175 .lock()
176 .unwrap()
177 .take()
178 .expect("TaskPoolFunction has already been dropped");
179 inner.warn_on_drop = false;
180 unsafe { (inner.func)(inner.user_data) }
181 }
182
183 fn prevent_call(self) {
184 let mut inner = self
185 .0
186 .lock()
187 .unwrap()
188 .take()
189 .expect("TaskPoolFunction has already been called");
190 inner.warn_on_drop = false;
191 drop(inner);
192 }
193
194 #[inline]
195 fn as_ptr(&self) -> *const Mutex<Option<TaskPoolFunctionInner>> {
196 Arc::as_ptr(&self.0)
197 }
198}
199
200impl Drop for TaskPoolFunctionInner {
201 fn drop(&mut self) {
202 if self.warn_on_drop {
203 crate::warning!(crate::CAT_RUST, "Leaked task function");
204 }
205 }
206}
207
208impl PartialEq for TaskPoolFunction {
209 fn eq(&self, other: &Self) -> bool {
210 self.as_ptr() == other.as_ptr()
211 }
212}
213
214impl Eq for TaskPoolFunction {}
215
216impl PartialOrd for TaskPoolFunction {
217 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
218 Some(self.cmp(other))
219 }
220}
221
222impl Ord for TaskPoolFunction {
223 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
224 self.as_ptr().cmp(&other.as_ptr())
225 }
226}
227
228impl Hash for TaskPoolFunction {
229 fn hash<H: Hasher>(&self, state: &mut H) {
230 self.as_ptr().hash(state)
231 }
232}
233
234#[cfg(test)]
235mod tests {
236 use std::{
237 sync::{
238 atomic,
239 mpsc::{channel, TryRecvError},
240 },
241 thread,
242 };
243
244 use super::*;
245 use crate::prelude::*;
246
247 pub mod imp {
248 use super::*;
249
250 #[derive(Default)]
251 pub struct TestPool {
252 pub(super) prepared: atomic::AtomicBool,
253 pub(super) cleaned_up: atomic::AtomicBool,
254 }
255
256 #[glib::object_subclass]
257 impl ObjectSubclass for TestPool {
258 const NAME: &'static str = "TestPool";
259 type Type = super::TestPool;
260 type ParentType = TaskPool;
261 }
262
263 impl ObjectImpl for TestPool {}
264
265 impl GstObjectImpl for TestPool {}
266
267 impl TaskPoolImpl for TestPool {
268 type Handle = TestHandle;
269
270 fn prepare(&self) -> Result<(), glib::Error> {
271 self.prepared.store(true, atomic::Ordering::SeqCst);
272 Ok(())
273 }
274
275 fn cleanup(&self) {
276 self.cleaned_up.store(true, atomic::Ordering::SeqCst);
277 }
278
279 fn push(&self, func: TaskPoolFunction) -> Result<Option<Self::Handle>, glib::Error> {
280 let handle = thread::spawn(move || func.call());
281 Ok(Some(TestHandle(handle)))
282 }
283 }
284
285 pub struct TestHandle(thread::JoinHandle<()>);
286
287 impl TaskHandle for TestHandle {
288 fn join(self) {
289 self.0.join().unwrap();
290 }
291 }
292 }
293
294 glib::wrapper! {
295 pub struct TestPool(ObjectSubclass<imp::TestPool>) @extends TaskPool, crate::Object;
296 }
297
298 unsafe impl Send for TestPool {}
299 unsafe impl Sync for TestPool {}
300
301 impl TestPool {
302 pub fn new() -> Self {
303 Self::default()
304 }
305 }
306
307 impl Default for TestPool {
308 fn default() -> Self {
309 glib::Object::new()
310 }
311 }
312
313 #[test]
314 fn test_simple_subclass() {
315 crate::init().unwrap();
316
317 let pool = TestPool::new();
318 pool.prepare().unwrap();
319
320 let (sender, receiver) = channel();
321
322 let handle = pool
323 .push(move || {
324 sender.send(()).unwrap();
325 })
326 .unwrap();
327 let handle = handle.unwrap();
328
329 assert_eq!(receiver.recv(), Ok(()));
330
331 handle.join();
332 assert_eq!(receiver.try_recv(), Err(TryRecvError::Disconnected));
333
334 pool.cleanup();
335
336 let imp = pool.imp();
337 assert!(imp.prepared.load(atomic::Ordering::SeqCst));
338 assert!(imp.cleaned_up.load(atomic::Ordering::SeqCst));
339 }
340}
341