// Take a look at the license at the top of the repository in the LICENSE file.

use std::ptr;

use glib::{ffi::gpointer, prelude::*, translate::*};

use crate::TaskPool;

9unsafe extern "C" fn task_pool_trampoline<P: FnOnce() + Send + 'static>(data: gpointer) {
10 let func: Box

= Box::from_raw(data as *mut P);

11 func()
12}
13mod sealed {
14 pub trait Sealed {}
15 impl<T: super::IsA<super::TaskPool>> Sealed for T {}
16}
17
18pub trait TaskPoolExtManual: sealed::Sealed + IsA<TaskPool> + 'static {
19 #[doc(alias = "gst_task_pool_push")]
20 fn push<P: FnOnce() + Send + 'static>(
21 &self,
22 func: P,
23 ) -> Result<Option<TaskPoolTaskHandle>, glib::Error> {
24 unsafe {
25 let mut error = ptr::null_mut();
26 let func: Box<P> = Box::new(func);
27 let func = Box::into_raw(func);
28
29 let handle = ffi::gst_task_pool_push(
30 self.as_ref().to_glib_none().0,
31 Some(task_pool_trampoline::<P>),
32 func as gpointer,
33 &mut error,
34 );
35
36 if !error.is_null() {
37 debug_assert!(handle.is_null());
38
39 // Assume that task_pool_trampoline was
40 // not called and will not be called
41 drop(Box::from_raw(func));
42
43 return Err(from_glib_full(error));
44 }
45
46 let handle = ptr::NonNull::new(handle).map(|handle| TaskPoolTaskHandle {
47 handle,
48 task_pool: Some(self.as_ref().clone()),
49 });
50
51 Ok(handle)
52 }
53 }
54}
55
56impl<O: IsA<TaskPool>> TaskPoolExtManual for O {}
57
58impl TaskPool {
59 unsafe fn join(&self, id: ptr::NonNull<libc::c_void>) {
60 ffi::gst_task_pool_join(self.to_glib_none().0, id:id.as_ptr())
61 }
62
63 #[cfg(feature = "v1_20")]
64 #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
65 unsafe fn dispose_handle(&self, id: ptr::NonNull<libc::c_void>) {
66 ffi::gst_task_pool_dispose_handle(self.to_glib_none().0, id.as_ptr())
67 }
68}
69
// rustdoc-stripper-ignore-next
/// A handle for a task which was pushed to a task pool.
pub trait TaskHandle {
    // rustdoc-stripper-ignore-next
    /// Wait for the task to complete.
    ///
    /// Takes the handle by value, so it cannot be joined twice.
    fn join(self);
}

78impl TaskHandle for () {
79 fn join(self) {}
80}
81
82impl TaskHandle for std::convert::Infallible {
83 fn join(self) {}
84}
85
86// rustdoc-stripper-ignore-next
87/// An opaque handle for a task associated with a particular task pool.
88///
89/// Keeps a reference to the pool alive.
90///
91/// If the `v1_20` feature is enabled, requests the task pool to dispose of the handle when it is
92/// dropped. Otherwise, needs to be `join`ed to avoid a leak.
93#[cfg_attr(not(any(feature = "v1_20", docsrs)), must_use)]
94#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
95pub struct TaskPoolTaskHandle {
96 handle: ptr::NonNull<libc::c_void>,
97 task_pool: Option<TaskPool>,
98}
99
100impl TaskHandle for TaskPoolTaskHandle {
101 #[doc(alias = "gst_task_pool_join")]
102 fn join(mut self) {
103 let task_pool: TaskPool = self.task_pool.take().unwrap();
104 unsafe { task_pool.join(self.handle) }
105 }
106}
107
108impl Drop for TaskPoolTaskHandle {
109 #[doc(alias = "gst_task_pool_dispose_handle")]
110 #[inline]
111 fn drop(&mut self) {
112 if let Some(task_pool: TaskPool) = self.task_pool.take() {
113 cfg_if::cfg_if! {
114 if #[cfg(feature = "v1_20")] {
115 unsafe { task_pool.dispose_handle(self.handle) }
116 } else {
117 crate::warning!(crate::CAT_RUST, obj: &task_pool, "Leaked task handle");
118 }
119 }
120 }
121 }
122}
123
#[cfg(test)]
mod tests {
    use std::sync::mpsc::{channel, RecvError};

    use super::*;
    use crate::prelude::*;

    // Pushes a single closure to a freshly prepared pool and checks that it
    // runs and that the closure (with its captured sender) is dropped.
    #[test]
    fn test_simple() {
        crate::init().unwrap();
        let pool = TaskPool::new();
        pool.prepare().unwrap();

        let (sender, receiver) = channel();

        let handle = pool
            .push(move || {
                sender.send(()).unwrap();
            })
            .unwrap();

        // The closure ran: its message arrives on the channel.
        assert_eq!(receiver.recv(), Ok(()));

        if let Some(handle) = handle {
            handle.join();
        }

        // Can't test try_recv here as the default task pool produces no
        // handles and thus no way to wait for channel destruction
        assert_eq!(receiver.recv(), Err(RecvError));

        pool.cleanup();
    }
}