1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{future::Future, panic, ptr};
4
5use futures_channel::oneshot;
6
7use crate::translate::*;
8
9#[derive(Debug)]
10#[doc(alias = "GThreadPool")]
11pub struct ThreadPool(ptr::NonNull<ffi::GThreadPool>);
12
13unsafe impl Send for ThreadPool {}
14unsafe impl Sync for ThreadPool {}
15
16// rustdoc-stripper-ignore-next
17/// A handle to a thread running on a [`ThreadPool`].
18///
19/// Like [`std::thread::JoinHandle`] for a GLib thread. The return value from the task can be
20/// retrieved by calling [`ThreadHandle::join`]. Dropping the handle "detaches" the thread,
21/// allowing it to complete but discarding the return value.
22#[derive(Debug)]
23pub struct ThreadHandle<T> {
24 rx: std::sync::mpsc::Receiver<std::thread::Result<T>>,
25}
26
27impl<T> ThreadHandle<T> {
28 // rustdoc-stripper-ignore-next
29 /// Waits for the associated thread to finish.
30 ///
31 /// Blocks until the associated thread returns. Returns `Ok` with the value returned from the
32 /// thread, or `Err` if the thread panicked. This function will return immediately if the
33 /// associated thread has already finished.
34 #[inline]
35 pub fn join(self) -> std::thread::Result<T> {
36 self.rx.recv().unwrap()
37 }
38}
39
40impl ThreadPool {
41 #[doc(alias = "g_thread_pool_new")]
42 pub fn shared(max_threads: Option<u32>) -> Result<Self, crate::Error> {
43 unsafe {
44 let mut err = ptr::null_mut();
45 let pool = ffi::g_thread_pool_new(
46 Some(spawn_func),
47 ptr::null_mut(),
48 max_threads.map(|v| v as i32).unwrap_or(-1),
49 ffi::GFALSE,
50 &mut err,
51 );
52 if pool.is_null() {
53 Err(from_glib_full(err))
54 } else {
55 Ok(ThreadPool(ptr::NonNull::new_unchecked(pool)))
56 }
57 }
58 }
59
60 #[doc(alias = "g_thread_pool_new")]
61 pub fn exclusive(max_threads: u32) -> Result<Self, crate::Error> {
62 unsafe {
63 let mut err = ptr::null_mut();
64 let pool = ffi::g_thread_pool_new(
65 Some(spawn_func),
66 ptr::null_mut(),
67 max_threads as i32,
68 ffi::GTRUE,
69 &mut err,
70 );
71 if pool.is_null() {
72 Err(from_glib_full(err))
73 } else {
74 Ok(ThreadPool(ptr::NonNull::new_unchecked(pool)))
75 }
76 }
77 }
78
79 #[doc(alias = "g_thread_pool_push")]
80 pub fn push<T: Send + 'static, F: FnOnce() -> T + Send + 'static>(
81 &self,
82 func: F,
83 ) -> Result<ThreadHandle<T>, crate::Error> {
84 let (tx, rx) = std::sync::mpsc::sync_channel(1);
85 unsafe {
86 let func: Box<dyn FnOnce() + Send + 'static> = Box::new(move || {
87 let _ = tx.send(panic::catch_unwind(panic::AssertUnwindSafe(func)));
88 });
89 let func = Box::new(func);
90 let mut err = ptr::null_mut();
91
92 let func = Box::into_raw(func);
93 let ret: bool = from_glib(ffi::g_thread_pool_push(
94 self.0.as_ptr(),
95 func as *mut _,
96 &mut err,
97 ));
98 if ret {
99 Ok(ThreadHandle { rx })
100 } else {
101 let _ = Box::from_raw(func);
102 Err(from_glib_full(err))
103 }
104 }
105 }
106
107 pub fn push_future<T: Send + 'static, F: FnOnce() -> T + Send + 'static>(
108 &self,
109 func: F,
110 ) -> Result<impl Future<Output = std::thread::Result<T>> + Send + Sync + 'static, crate::Error>
111 {
112 let (sender, receiver) = oneshot::channel();
113
114 self.push(move || {
115 let _ = sender.send(panic::catch_unwind(panic::AssertUnwindSafe(func)));
116 })?;
117
118 Ok(async move { receiver.await.expect("Dropped before executing") })
119 }
120
121 #[doc(alias = "g_thread_pool_set_max_threads")]
122 pub fn set_max_threads(&self, max_threads: Option<u32>) -> Result<(), crate::Error> {
123 unsafe {
124 let mut err = ptr::null_mut();
125 let ret: bool = from_glib(ffi::g_thread_pool_set_max_threads(
126 self.0.as_ptr(),
127 max_threads.map(|v| v as i32).unwrap_or(-1),
128 &mut err,
129 ));
130 if ret {
131 Ok(())
132 } else {
133 Err(from_glib_full(err))
134 }
135 }
136 }
137
138 #[doc(alias = "g_thread_pool_get_max_threads")]
139 #[doc(alias = "get_max_threads")]
140 pub fn max_threads(&self) -> Option<u32> {
141 unsafe {
142 let max_threads = ffi::g_thread_pool_get_max_threads(self.0.as_ptr());
143 if max_threads == -1 {
144 None
145 } else {
146 Some(max_threads as u32)
147 }
148 }
149 }
150
151 #[doc(alias = "g_thread_pool_get_num_threads")]
152 #[doc(alias = "get_num_threads")]
153 pub fn num_threads(&self) -> u32 {
154 unsafe { ffi::g_thread_pool_get_num_threads(self.0.as_ptr()) }
155 }
156
157 #[doc(alias = "g_thread_pool_unprocessed")]
158 #[doc(alias = "get_unprocessed")]
159 pub fn unprocessed(&self) -> u32 {
160 unsafe { ffi::g_thread_pool_unprocessed(self.0.as_ptr()) }
161 }
162
163 #[doc(alias = "g_thread_pool_set_max_unused_threads")]
164 pub fn set_max_unused_threads(max_threads: Option<u32>) {
165 unsafe {
166 ffi::g_thread_pool_set_max_unused_threads(max_threads.map(|v| v as i32).unwrap_or(-1))
167 }
168 }
169
170 #[doc(alias = "g_thread_pool_get_max_unused_threads")]
171 #[doc(alias = "get_max_unused_threads")]
172 pub fn max_unused_threads() -> Option<u32> {
173 unsafe {
174 let max_unused_threads = ffi::g_thread_pool_get_max_unused_threads();
175 if max_unused_threads == -1 {
176 None
177 } else {
178 Some(max_unused_threads as u32)
179 }
180 }
181 }
182
183 #[doc(alias = "g_thread_pool_get_num_unused_threads")]
184 #[doc(alias = "get_num_unused_threads")]
185 pub fn num_unused_threads() -> u32 {
186 unsafe { ffi::g_thread_pool_get_num_unused_threads() }
187 }
188
189 #[doc(alias = "g_thread_pool_stop_unused_threads")]
190 pub fn stop_unused_threads() {
191 unsafe {
192 ffi::g_thread_pool_stop_unused_threads();
193 }
194 }
195
196 #[doc(alias = "g_thread_pool_set_max_idle_time")]
197 pub fn set_max_idle_time(max_idle_time: u32) {
198 unsafe { ffi::g_thread_pool_set_max_idle_time(max_idle_time) }
199 }
200
201 #[doc(alias = "g_thread_pool_get_max_idle_time")]
202 #[doc(alias = "get_max_idle_time")]
203 pub fn max_idle_time() -> u32 {
204 unsafe { ffi::g_thread_pool_get_max_idle_time() }
205 }
206}
207
208impl Drop for ThreadPool {
209 #[inline]
210 fn drop(&mut self) {
211 unsafe {
212 ffi::g_thread_pool_free(self.0.as_ptr(), immediate:ffi::GFALSE, wait_:ffi::GTRUE);
213 }
214 }
215}
216
217unsafe extern "C" fn spawn_func(func: ffi::gpointer, _data: ffi::gpointer) {
218 let func: Box<Box<dyn FnOnce()>> = Box::from_raw(func as *mut _);
219 func()
220}
221
222#[cfg(test)]
223mod tests {
224 use super::*;
225
226 #[test]
227 fn test_push() {
228 use std::sync::mpsc;
229
230 let p = ThreadPool::exclusive(1).unwrap();
231 let (sender, receiver) = mpsc::channel();
232
233 let handle = p
234 .push(move || {
235 sender.send(true).unwrap();
236 123
237 })
238 .unwrap();
239
240 assert_eq!(handle.join().unwrap(), 123);
241 assert_eq!(receiver.recv(), Ok(true));
242 }
243
244 #[test]
245 fn test_push_future() {
246 let c = crate::MainContext::new();
247 let p = ThreadPool::shared(None).unwrap();
248
249 let fut = p.push_future(|| true).unwrap();
250
251 let res = c.block_on(fut);
252 assert!(res.unwrap());
253 }
254}
255