// Take a look at the license at the top of the repository in the LICENSE file.

use std::{
    hash::{Hash, Hasher},
    ptr,
    sync::{Arc, Mutex},
};

use glib::{ffi::gpointer, subclass::prelude::*, translate::*};

use super::prelude::*;
use crate::{TaskHandle, TaskPool};

pub trait TaskPoolImpl: GstObjectImpl + Send + Sync {
    // rustdoc-stripper-ignore-next
    /// Handle to be returned from the `push` function to allow the caller to wait for the task's
    /// completion.
    ///
    /// If unneeded, you can specify `()` or [`Infallible`](std::convert::Infallible) for a handle
    /// that does nothing on `join` or drop.
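    ///
    /// For example, a detached fire-and-forget pool that never waits for its
    /// tasks could use the unit type (a minimal sketch):
    ///
    /// ```ignore
    /// // `()` does nothing on `join` or drop.
    /// type Handle = ();
    /// ```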
    type Handle: TaskHandle;

    // rustdoc-stripper-ignore-next
    /// Prepare the task pool to accept tasks.
    ///
    /// This defaults to doing nothing.
    fn prepare(&self) -> Result<(), glib::Error> {
        Ok(())
    }

    // rustdoc-stripper-ignore-next
    /// Clean up, rejecting further tasks and waiting for all accepted tasks to be stopped.
    ///
    /// This is mainly used internally to ensure proper cleanup of internal data structures in
    /// test suites.
    fn cleanup(&self) {}

    // rustdoc-stripper-ignore-next
    /// Deliver a task to the pool.
    ///
    /// If this returns `Ok`, `func` must eventually be called.
    ///
    /// If this returns `Err`, `func` must be dropped without being called.
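    ///
    /// A minimal sketch of a conforming implementation, assuming a
    /// thread-per-task pool and a hypothetical `ThreadHandle` newtype around
    /// [`std::thread::JoinHandle`]:
    ///
    /// ```ignore
    /// fn push(&self, func: TaskPoolFunction) -> Result<Option<Self::Handle>, glib::Error> {
    ///     // On failure, return `Err` instead and do not call `func`.
    ///     let thread = std::thread::spawn(move || func.call());
    ///     Ok(Some(ThreadHandle(thread)))
    /// }
    /// ```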
    fn push(&self, func: TaskPoolFunction) -> Result<Option<Self::Handle>, glib::Error>;
}

unsafe impl<T: TaskPoolImpl> IsSubclassable<T> for TaskPool {
    fn class_init(klass: &mut glib::Class<Self>) {
        Self::parent_class_init::<T>(klass);
        let klass = klass.as_mut();
        klass.prepare = Some(task_pool_prepare::<T>);
        klass.cleanup = Some(task_pool_cleanup::<T>);
        klass.push = Some(task_pool_push::<T>);
        klass.join = Some(task_pool_join::<T>);

        #[cfg(feature = "v1_20")]
        {
            klass.dispose_handle = Some(task_pool_dispose_handle::<T>);
        }
    }
}

unsafe extern "C" fn task_pool_prepare<T: TaskPoolImpl>(
    ptr: *mut ffi::GstTaskPool,
    error: *mut *mut glib::ffi::GError,
) {
    let instance = &*(ptr as *mut T::Instance);
    let imp = instance.imp();

    match imp.prepare() {
        Ok(()) => {}
        Err(err) => {
            if !error.is_null() {
                *error = err.into_glib_ptr();
            }
        }
    }
}

unsafe extern "C" fn task_pool_cleanup<T: TaskPoolImpl>(ptr: *mut ffi::GstTaskPool) {
    let instance = &*(ptr as *mut T::Instance);
    let imp = instance.imp();

    imp.cleanup();
}

unsafe extern "C" fn task_pool_push<T: TaskPoolImpl>(
    ptr: *mut ffi::GstTaskPool,
    func: ffi::GstTaskPoolFunction,
    user_data: gpointer,
    error: *mut *mut glib::ffi::GError,
) -> gpointer {
    let instance = &*(ptr as *mut T::Instance);
    let imp = instance.imp();

    let func = TaskPoolFunction::new(func.expect("Tried to push null func"), user_data);

    match imp.push(func.clone()) {
        Ok(None) => ptr::null_mut(),
        Ok(Some(handle)) => Box::into_raw(Box::new(handle)) as gpointer,
        Err(err) => {
            func.prevent_call();
            if !error.is_null() {
                *error = err.into_glib_ptr();
            }
            ptr::null_mut()
        }
    }
}

unsafe extern "C" fn task_pool_join<T: TaskPoolImpl>(ptr: *mut ffi::GstTaskPool, id: gpointer) {
    if id.is_null() {
        let wrap: Borrowed<TaskPool> = from_glib_borrow(ptr);
        crate::warning!(crate::CAT_RUST, obj: wrap.as_ref(), "Tried to join null handle");
        return;
    }

    let handle = Box::from_raw(id as *mut T::Handle);
    handle.join();
}

#[cfg(feature = "v1_20")]
#[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
unsafe extern "C" fn task_pool_dispose_handle<T: TaskPoolImpl>(
    ptr: *mut ffi::GstTaskPool,
    id: gpointer,
) {
    if id.is_null() {
        let wrap: Borrowed<TaskPool> = from_glib_borrow(ptr);
        crate::warning!(crate::CAT_RUST, obj: wrap.as_ref(), "Tried to dispose null handle");
        return;
    }

    let handle = Box::from_raw(id as *mut T::Handle);
    drop(handle);
}

// rustdoc-stripper-ignore-next
/// Function the task pool should execute, provided to [`push`](TaskPoolImpl::push).
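///
/// The function should either be executed via [`call`](Self::call) or, when
/// `push` fails, dropped without being called; dropping it uncalled in any
/// other situation logs a warning. A sketch of a hypothetical worker loop
/// draining a queue of such functions:
///
/// ```ignore
/// while let Ok(func) = receiver.recv() {
///     func.call();
/// }
/// ```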
#[derive(Debug)]
pub struct TaskPoolFunction(Arc<Mutex<Option<TaskPoolFunctionInner>>>);

// `Arc<Mutex<Option<…>>>` is required so that we can enforce that the function
// has not been called and will never be called after `push` returns `Err`.

#[derive(Debug)]
struct TaskPoolFunctionInner {
    func: unsafe extern "C" fn(gpointer),
    user_data: gpointer,
    warn_on_drop: bool,
}

unsafe impl Send for TaskPoolFunctionInner {}

impl TaskPoolFunction {
    fn new(func: unsafe extern "C" fn(gpointer), user_data: gpointer) -> Self {
        let inner = TaskPoolFunctionInner {
            func,
            user_data,
            warn_on_drop: true,
        };
        Self(Arc::new(Mutex::new(Some(inner))))
    }

    #[inline]
    fn clone(&self) -> Self {
        Self(self.0.clone())
    }

    // rustdoc-stripper-ignore-next
    /// Consume and execute the function.
    pub fn call(self) {
        let mut inner = self
            .0
            .lock()
            .unwrap()
            .take()
            .expect("TaskPoolFunction has already been dropped");
        inner.warn_on_drop = false;
        unsafe { (inner.func)(inner.user_data) }
    }

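    // Takes the inner function out without executing it, so that its eventual
    // drop does not warn about a leaked task; used by the FFI glue when the
    // implementation's `push` returns `Err`.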
    fn prevent_call(self) {
        let mut inner = self
            .0
            .lock()
            .unwrap()
            .take()
            .expect("TaskPoolFunction has already been called");
        inner.warn_on_drop = false;
        drop(inner);
    }

    #[inline]
    fn as_ptr(&self) -> *const Mutex<Option<TaskPoolFunctionInner>> {
        Arc::as_ptr(&self.0)
    }
}

impl Drop for TaskPoolFunctionInner {
    fn drop(&mut self) {
        if self.warn_on_drop {
            crate::warning!(crate::CAT_RUST, "Leaked task function");
        }
    }
}

impl PartialEq for TaskPoolFunction {
    fn eq(&self, other: &Self) -> bool {
        self.as_ptr() == other.as_ptr()
    }
}

impl Eq for TaskPoolFunction {}

impl PartialOrd for TaskPoolFunction {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for TaskPoolFunction {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.as_ptr().cmp(&other.as_ptr())
    }
}

impl Hash for TaskPoolFunction {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.as_ptr().hash(state)
    }
}

#[cfg(test)]
mod tests {
    use std::{
        sync::{
            atomic,
            mpsc::{channel, TryRecvError},
        },
        thread,
    };

    use super::*;
    use crate::prelude::*;

    pub mod imp {
        use super::*;

        #[derive(Default)]
        pub struct TestPool {
            pub(super) prepared: atomic::AtomicBool,
            pub(super) cleaned_up: atomic::AtomicBool,
        }

        #[glib::object_subclass]
        impl ObjectSubclass for TestPool {
            const NAME: &'static str = "TestPool";
            type Type = super::TestPool;
            type ParentType = TaskPool;
        }

        impl ObjectImpl for TestPool {}

        impl GstObjectImpl for TestPool {}

        impl TaskPoolImpl for TestPool {
            type Handle = TestHandle;

            fn prepare(&self) -> Result<(), glib::Error> {
                self.prepared.store(true, atomic::Ordering::SeqCst);
                Ok(())
            }

            fn cleanup(&self) {
                self.cleaned_up.store(true, atomic::Ordering::SeqCst);
            }

            fn push(&self, func: TaskPoolFunction) -> Result<Option<Self::Handle>, glib::Error> {
                let handle = thread::spawn(move || func.call());
                Ok(Some(TestHandle(handle)))
            }
        }

        pub struct TestHandle(thread::JoinHandle<()>);

        impl TaskHandle for TestHandle {
            fn join(self) {
                self.0.join().unwrap();
            }
        }
    }

    glib::wrapper! {
        pub struct TestPool(ObjectSubclass<imp::TestPool>) @extends TaskPool, crate::Object;
    }

    unsafe impl Send for TestPool {}
    unsafe impl Sync for TestPool {}

    impl TestPool {
        pub fn new() -> Self {
            Self::default()
        }
    }

    impl Default for TestPool {
        fn default() -> Self {
            glib::Object::new()
        }
    }

    #[test]
    fn test_simple_subclass() {
        crate::init().unwrap();

        let pool = TestPool::new();
        pool.prepare().unwrap();

        let (sender, receiver) = channel();

        let handle = pool
            .push(move || {
                sender.send(()).unwrap();
            })
            .unwrap();
        let handle = handle.unwrap();

        assert_eq!(receiver.recv(), Ok(()));

        handle.join();
        assert_eq!(receiver.try_recv(), Err(TryRecvError::Disconnected));

        pool.cleanup();

        let imp = pool.imp();
        assert!(imp.prepared.load(atomic::Ordering::SeqCst));
        assert!(imp.cleaned_up.load(atomic::Ordering::SeqCst));
    }
}