1 | // Take a look at the license at the top of the repository in the LICENSE file. |
2 | |
3 | use std::{ |
4 | hash::{Hash, Hasher}, |
5 | ptr, |
6 | sync::{Arc, Mutex}, |
7 | }; |
8 | |
9 | use glib::{ffi::gpointer, subclass::prelude::*, translate::*}; |
10 | |
11 | use super::prelude::*; |
12 | use crate::{ffi, TaskHandle, TaskPool}; |
13 | |
14 | pub trait TaskPoolImpl: GstObjectImpl + Send + Sync { |
15 | // rustdoc-stripper-ignore-next |
16 | /// Handle to be returned from the `push` function to allow the caller to wait for the task's |
17 | /// completion. |
18 | /// |
19 | /// If unneeded, you can specify `()` or [`Infallible`](std::convert::Infallible) for a handle |
20 | /// that does nothing on `join` or drop. |
21 | type Handle: TaskHandle; |
22 | |
23 | // rustdoc-stripper-ignore-next |
24 | /// Prepare the task pool to accept tasks. |
25 | /// |
26 | /// This defaults to doing nothing. |
27 | fn prepare(&self) -> Result<(), glib::Error> { |
28 | Ok(()) |
29 | } |
30 | |
31 | // rustdoc-stripper-ignore-next |
32 | /// Clean up, rejecting further tasks and waiting for all accepted tasks to be stopped. |
33 | /// |
34 | /// This is mainly used internally to ensure proper cleanup of internal data structures in test |
35 | /// suites. |
36 | fn cleanup(&self) {} |
37 | |
38 | // rustdoc-stripper-ignore-next |
39 | /// Deliver a task to the pool. |
40 | /// |
41 | /// If returning `Ok`, you need to call the `func` eventually. |
42 | /// |
43 | /// If returning `Err`, the `func` must be dropped without calling it. |
44 | fn push(&self, func: TaskPoolFunction) -> Result<Option<Self::Handle>, glib::Error>; |
45 | } |
46 | |
47 | unsafe impl<T: TaskPoolImpl> IsSubclassable<T> for TaskPool { |
48 | fn class_init(klass: &mut glib::Class<Self>) { |
49 | Self::parent_class_init::<T>(class:klass); |
50 | let klass: &mut GstTaskPoolClass = klass.as_mut(); |
51 | klass.prepare = Some(task_pool_prepare::<T>); |
52 | klass.cleanup = Some(task_pool_cleanup::<T>); |
53 | klass.push = Some(task_pool_push::<T>); |
54 | klass.join = Some(task_pool_join::<T>); |
55 | |
56 | #[cfg (feature = "v1_20" )] |
57 | { |
58 | klass.dispose_handle = Some(task_pool_dispose_handle::<T>); |
59 | } |
60 | } |
61 | } |
62 | |
63 | unsafe extern "C" fn task_pool_prepare<T: TaskPoolImpl>( |
64 | ptr: *mut ffi::GstTaskPool, |
65 | error: *mut *mut glib::ffi::GError, |
66 | ) { |
67 | let instance: &::Instance = &*(ptr as *mut T::Instance); |
68 | let imp: &T = instance.imp(); |
69 | |
70 | match imp.prepare() { |
71 | Ok(()) => {} |
72 | Err(err: Error) => { |
73 | if !error.is_null() { |
74 | *error = err.into_glib_ptr(); |
75 | } |
76 | } |
77 | } |
78 | } |
79 | |
80 | unsafe extern "C" fn task_pool_cleanup<T: TaskPoolImpl>(ptr: *mut ffi::GstTaskPool) { |
81 | let instance: &::Instance = &*(ptr as *mut T::Instance); |
82 | let imp: &T = instance.imp(); |
83 | |
84 | imp.cleanup(); |
85 | } |
86 | |
87 | unsafe extern "C" fn task_pool_push<T: TaskPoolImpl>( |
88 | ptr: *mut ffi::GstTaskPool, |
89 | func: ffi::GstTaskPoolFunction, |
90 | user_data: gpointer, |
91 | error: *mut *mut glib::ffi::GError, |
92 | ) -> gpointer { |
93 | let instance: &::Instance = &*(ptr as *mut T::Instance); |
94 | let imp: &T = instance.imp(); |
95 | |
96 | let func: TaskPoolFunction = TaskPoolFunction::new(func.expect(msg:"Tried to push null func" ), user_data); |
97 | |
98 | match imp.push(func.clone()) { |
99 | Ok(None) => ptr::null_mut(), |
100 | Ok(Some(handle: ::Handle)) => Box::into_raw(Box::new(handle)) as gpointer, |
101 | Err(err: Error) => { |
102 | func.prevent_call(); |
103 | if !error.is_null() { |
104 | *error = err.into_glib_ptr(); |
105 | } |
106 | ptr::null_mut() |
107 | } |
108 | } |
109 | } |
110 | |
111 | unsafe extern "C" fn task_pool_join<T: TaskPoolImpl>(ptr: *mut ffi::GstTaskPool, id: gpointer) { |
112 | if id.is_null() { |
113 | let wrap: Borrowed<TaskPool> = from_glib_borrow(ptr); |
114 | crate::warning!( |
115 | crate::CAT_RUST, |
116 | obj = wrap.as_ref(), |
117 | "Tried to join null handle" |
118 | ); |
119 | return; |
120 | } |
121 | |
122 | let handle: Box<::Handle> = Box::from_raw(id as *mut T::Handle); |
123 | handle.join(); |
124 | } |
125 | |
// Trampoline for the `dispose_handle` virtual method.
//
// A non-null `id` is turned back into the boxed `T::Handle` and dropped without joining it.
// A null `id` only produces a warning.
#[cfg(feature = "v1_20")]
#[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
unsafe extern "C" fn task_pool_dispose_handle<T: TaskPoolImpl>(
    ptr: *mut ffi::GstTaskPool,
    id: gpointer,
) {
    if !id.is_null() {
        // NOTE(review): assumes `id` came from `Box::into_raw` in `task_pool_push` for the
        // same `T` and has not been joined or disposed before.
        drop(Box::from_raw(id as *mut T::Handle));
        return;
    }

    let pool: Borrowed<TaskPool> = from_glib_borrow(ptr);
    crate::warning!(
        crate::CAT_RUST,
        obj = pool.as_ref(),
        "Tried to dispose null handle"
    );
}
145 | |
146 | // rustdoc-stripper-ignore-next |
147 | /// Function the task pool should execute, provided to [`push`](TaskPoolImpl::push). |
148 | #[derive (Debug)] |
149 | pub struct TaskPoolFunction(Arc<Mutex<Option<TaskPoolFunctionInner>>>); |
150 | |
151 | // `Arc<Mutex<Option<…>>>` is required so that we can enforce that the function |
152 | // has not been called and will never be called after `push` returns `Err`. |
153 | |
154 | #[derive (Debug)] |
155 | struct TaskPoolFunctionInner { |
156 | func: unsafe extern "C" fn(gpointer), |
157 | user_data: gpointer, |
158 | warn_on_drop: bool, |
159 | } |
160 | |
161 | unsafe impl Send for TaskPoolFunctionInner {} |
162 | |
163 | impl TaskPoolFunction { |
164 | fn new(func: unsafe extern "C" fn(gpointer), user_data: gpointer) -> Self { |
165 | let inner = TaskPoolFunctionInner { |
166 | func, |
167 | user_data, |
168 | warn_on_drop: true, |
169 | }; |
170 | Self(Arc::new(Mutex::new(Some(inner)))) |
171 | } |
172 | |
173 | #[inline ] |
174 | fn clone(&self) -> Self { |
175 | Self(self.0.clone()) |
176 | } |
177 | |
178 | // rustdoc-stripper-ignore-next |
179 | /// Consume and execute the function. |
180 | pub fn call(self) { |
181 | let mut inner = self |
182 | .0 |
183 | .lock() |
184 | .unwrap() |
185 | .take() |
186 | .expect("TaskPoolFunction has already been dropped" ); |
187 | inner.warn_on_drop = false; |
188 | unsafe { (inner.func)(inner.user_data) } |
189 | } |
190 | |
191 | fn prevent_call(self) { |
192 | let mut inner = self |
193 | .0 |
194 | .lock() |
195 | .unwrap() |
196 | .take() |
197 | .expect("TaskPoolFunction has already been called" ); |
198 | inner.warn_on_drop = false; |
199 | drop(inner); |
200 | } |
201 | |
202 | #[inline ] |
203 | fn as_ptr(&self) -> *const Mutex<Option<TaskPoolFunctionInner>> { |
204 | Arc::as_ptr(&self.0) |
205 | } |
206 | } |
207 | |
208 | impl Drop for TaskPoolFunctionInner { |
209 | fn drop(&mut self) { |
210 | if self.warn_on_drop { |
211 | crate::warning!(crate::CAT_RUST, "Leaked task function" ); |
212 | } |
213 | } |
214 | } |
215 | |
216 | impl PartialEq for TaskPoolFunction { |
217 | fn eq(&self, other: &Self) -> bool { |
218 | self.as_ptr() == other.as_ptr() |
219 | } |
220 | } |
221 | |
222 | impl Eq for TaskPoolFunction {} |
223 | |
224 | impl PartialOrd for TaskPoolFunction { |
225 | fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { |
226 | Some(self.cmp(other)) |
227 | } |
228 | } |
229 | |
230 | impl Ord for TaskPoolFunction { |
231 | fn cmp(&self, other: &Self) -> std::cmp::Ordering { |
232 | self.as_ptr().cmp(&other.as_ptr()) |
233 | } |
234 | } |
235 | |
236 | impl Hash for TaskPoolFunction { |
237 | fn hash<H: Hasher>(&self, state: &mut H) { |
238 | self.as_ptr().hash(state) |
239 | } |
240 | } |
241 | |
#[cfg(test)]
mod tests {
    use std::{
        sync::{
            atomic,
            mpsc::{channel, TryRecvError},
        },
        thread,
    };

    use super::*;
    use crate::prelude::*;

    pub mod imp {
        use super::*;

        // Pool implementation that records which lifecycle methods were invoked and runs
        // every pushed task on a freshly spawned thread.
        #[derive(Default)]
        pub struct TestPool {
            pub(super) prepared: atomic::AtomicBool,
            pub(super) cleaned_up: atomic::AtomicBool,
        }

        #[glib::object_subclass]
        impl ObjectSubclass for TestPool {
            const NAME: &'static str = "TestPool";
            type Type = super::TestPool;
            type ParentType = TaskPool;
        }

        impl ObjectImpl for TestPool {}

        impl GstObjectImpl for TestPool {}

        impl TaskPoolImpl for TestPool {
            type Handle = TestHandle;

            // Remembers that `prepare` was called.
            fn prepare(&self) -> Result<(), glib::Error> {
                self.prepared.store(true, atomic::Ordering::SeqCst);
                Ok(())
            }

            // Remembers that `cleanup` was called.
            fn cleanup(&self) {
                self.cleaned_up.store(true, atomic::Ordering::SeqCst);
            }

            // Runs the task on its own thread and returns that thread's join handle.
            fn push(&self, func: TaskPoolFunction) -> Result<Option<Self::Handle>, glib::Error> {
                let worker = thread::spawn(move || func.call());
                Ok(Some(TestHandle(worker)))
            }
        }

        // Handle wrapping the thread on which the task runs.
        pub struct TestHandle(thread::JoinHandle<()>);

        impl TaskHandle for TestHandle {
            // Blocks until the worker thread has finished; panics if that thread panicked.
            fn join(self) {
                self.0.join().unwrap();
            }
        }
    }

    glib::wrapper! {
        pub struct TestPool(ObjectSubclass<imp::TestPool>) @extends TaskPool, crate::Object;
    }

    unsafe impl Send for TestPool {}
    unsafe impl Sync for TestPool {}

    impl TestPool {
        pub fn new() -> Self {
            Self::default()
        }
    }

    impl Default for TestPool {
        fn default() -> Self {
            glib::Object::new()
        }
    }

    // Pushes a single task through the subclass and checks that it ran, that its handle can
    // be joined, and that both lifecycle methods reached the implementation.
    #[test]
    fn test_simple_subclass() {
        crate::init().unwrap();

        let pool = TestPool::new();
        pool.prepare().unwrap();

        let (tx, rx) = channel();

        let pushed = pool.push(move || {
            tx.send(()).unwrap();
        });
        let handle = pushed.unwrap().unwrap();

        // The task has run once its message arrives.
        assert_eq!(rx.recv(), Ok(()));

        // After joining, the closure (and with it the sender) is gone.
        handle.join();
        assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));

        pool.cleanup();

        let imp = pool.imp();
        assert!(imp.prepared.load(atomic::Ordering::SeqCst));
        assert!(imp.cleaned_up.load(atomic::Ordering::SeqCst));
    }
}
349 | |