1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::ptr;
4
5use glib::{
6 subclass::{prelude::*, InitializingObject},
7 translate::*,
8 Cast, StaticType,
9};
10use libc::c_char;
11
12use super::prelude::*;
13use crate::{BufferPool, BufferPoolAcquireParams, BufferPoolConfigRef};
14
15pub trait BufferPoolImpl: BufferPoolImplExt + GstObjectImpl + Send + Sync {
16 fn acquire_buffer(
17 &self,
18 params: Option<&BufferPoolAcquireParams>,
19 ) -> Result<crate::Buffer, crate::FlowError> {
20 self.parent_acquire_buffer(params)
21 }
22
23 fn alloc_buffer(
24 &self,
25 params: Option<&BufferPoolAcquireParams>,
26 ) -> Result<crate::Buffer, crate::FlowError> {
27 self.parent_alloc_buffer(params)
28 }
29
30 fn flush_start(&self) {
31 self.parent_flush_start()
32 }
33
34 fn flush_stop(&self) {
35 self.parent_flush_stop()
36 }
37
38 fn free_buffer(&self, buffer: crate::Buffer) {
39 self.parent_free_buffer(buffer)
40 }
41
42 fn release_buffer(&self, buffer: crate::Buffer) {
43 self.parent_release_buffer(buffer)
44 }
45
46 fn reset_buffer(&self, buffer: &mut crate::BufferRef) {
47 self.parent_reset_buffer(buffer)
48 }
49
50 fn start(&self) -> bool {
51 self.parent_start()
52 }
53
54 fn stop(&self) -> bool {
55 self.parent_stop()
56 }
57
58 fn options() -> &'static [&'static str] {
59 &[]
60 }
61
62 fn set_config(&self, config: &mut BufferPoolConfigRef) -> bool {
63 self.parent_set_config(config)
64 }
65}
66
67mod sealed {
68 pub trait Sealed {}
69 impl<T: super::BufferPoolImplExt> Sealed for T {}
70}
71
72pub trait BufferPoolImplExt: sealed::Sealed + ObjectSubclass {
73 fn parent_acquire_buffer(
74 &self,
75 params: Option<&BufferPoolAcquireParams>,
76 ) -> Result<crate::Buffer, crate::FlowError> {
77 unsafe {
78 let data = Self::type_data();
79 let parent_class = data.as_ref().parent_class() as *mut ffi::GstBufferPoolClass;
80 if let Some(f) = (*parent_class).acquire_buffer {
81 let params_ptr = mut_override(params.to_glib_none().0);
82 let mut buffer = std::ptr::null_mut();
83
84 let result = f(
85 self.obj()
86 .unsafe_cast_ref::<crate::BufferPool>()
87 .to_glib_none()
88 .0,
89 &mut buffer,
90 params_ptr,
91 );
92
93 crate::FlowSuccess::try_from_glib(result).map(|_| from_glib_full(buffer))
94 } else {
95 Err(crate::FlowError::NotSupported)
96 }
97 }
98 }
99
100 fn parent_alloc_buffer(
101 &self,
102 params: Option<&BufferPoolAcquireParams>,
103 ) -> Result<crate::Buffer, crate::FlowError> {
104 unsafe {
105 let data = Self::type_data();
106 let parent_class = data.as_ref().parent_class() as *mut ffi::GstBufferPoolClass;
107 if let Some(f) = (*parent_class).alloc_buffer {
108 let params_ptr = mut_override(params.to_glib_none().0);
109 let mut buffer = std::ptr::null_mut();
110
111 let result = f(
112 self.obj()
113 .unsafe_cast_ref::<crate::BufferPool>()
114 .to_glib_none()
115 .0,
116 &mut buffer,
117 params_ptr,
118 );
119
120 crate::FlowSuccess::try_from_glib(result).map(|_| from_glib_full(buffer))
121 } else {
122 Err(crate::FlowError::NotSupported)
123 }
124 }
125 }
126
127 fn parent_free_buffer(&self, buffer: crate::Buffer) {
128 unsafe {
129 let data = Self::type_data();
130 let parent_class = data.as_ref().parent_class() as *mut ffi::GstBufferPoolClass;
131 if let Some(f) = (*parent_class).free_buffer {
132 f(
133 self.obj()
134 .unsafe_cast_ref::<crate::BufferPool>()
135 .to_glib_none()
136 .0,
137 buffer.into_glib_ptr(),
138 )
139 }
140 }
141 }
142
143 fn parent_release_buffer(&self, buffer: crate::Buffer) {
144 unsafe {
145 let data = Self::type_data();
146 let parent_class = data.as_ref().parent_class() as *mut ffi::GstBufferPoolClass;
147 if let Some(f) = (*parent_class).release_buffer {
148 f(
149 self.obj()
150 .unsafe_cast_ref::<crate::BufferPool>()
151 .to_glib_none()
152 .0,
153 buffer.into_glib_ptr(),
154 )
155 }
156 }
157 }
158
159 fn parent_reset_buffer(&self, buffer: &mut crate::BufferRef) {
160 unsafe {
161 let data = Self::type_data();
162 let parent_class = data.as_ref().parent_class() as *mut ffi::GstBufferPoolClass;
163 if let Some(f) = (*parent_class).reset_buffer {
164 f(
165 self.obj()
166 .unsafe_cast_ref::<crate::BufferPool>()
167 .to_glib_none()
168 .0,
169 buffer.as_mut_ptr(),
170 )
171 }
172 }
173 }
174
175 fn parent_start(&self) -> bool {
176 unsafe {
177 let data = Self::type_data();
178 let parent_class = data.as_ref().parent_class() as *mut ffi::GstBufferPoolClass;
179 if let Some(f) = (*parent_class).start {
180 let result = f(self
181 .obj()
182 .unsafe_cast_ref::<crate::BufferPool>()
183 .to_glib_none()
184 .0);
185
186 from_glib(result)
187 } else {
188 true
189 }
190 }
191 }
192
193 fn parent_stop(&self) -> bool {
194 unsafe {
195 let data = Self::type_data();
196 let parent_class = data.as_ref().parent_class() as *mut ffi::GstBufferPoolClass;
197 if let Some(f) = (*parent_class).stop {
198 let result = f(self
199 .obj()
200 .unsafe_cast_ref::<crate::BufferPool>()
201 .to_glib_none()
202 .0);
203
204 from_glib(result)
205 } else {
206 true
207 }
208 }
209 }
210
211 fn parent_set_config(&self, config: &mut BufferPoolConfigRef) -> bool {
212 unsafe {
213 let data = Self::type_data();
214 let parent_class = data.as_ref().parent_class() as *mut ffi::GstBufferPoolClass;
215 if let Some(f) = (*parent_class).set_config {
216 let result = f(
217 self.obj()
218 .unsafe_cast_ref::<crate::BufferPool>()
219 .to_glib_none()
220 .0,
221 (*config).as_mut_ptr(),
222 );
223
224 from_glib(result)
225 } else {
226 false
227 }
228 }
229 }
230
231 fn parent_flush_start(&self) {
232 unsafe {
233 let data = Self::type_data();
234 let parent_class = data.as_ref().parent_class() as *mut ffi::GstBufferPoolClass;
235 if let Some(f) = (*parent_class).flush_start {
236 f(self
237 .obj()
238 .unsafe_cast_ref::<crate::BufferPool>()
239 .to_glib_none()
240 .0)
241 }
242 }
243 }
244
245 fn parent_flush_stop(&self) {
246 unsafe {
247 let data = Self::type_data();
248 let parent_class = data.as_ref().parent_class() as *mut ffi::GstBufferPoolClass;
249 if let Some(f) = (*parent_class).flush_stop {
250 f(self
251 .obj()
252 .unsafe_cast_ref::<crate::BufferPool>()
253 .to_glib_none()
254 .0)
255 }
256 }
257 }
258}
259
260impl<T: BufferPoolImpl> BufferPoolImplExt for T {}
261
262unsafe impl<T: BufferPoolImpl> IsSubclassable<T> for BufferPool {
263 fn class_init(klass: &mut glib::Class<Self>) {
264 Self::parent_class_init::<T>(klass);
265 let klass = klass.as_mut();
266 klass.acquire_buffer = Some(buffer_pool_acquire_buffer::<T>);
267 klass.alloc_buffer = Some(buffer_pool_alloc_buffer::<T>);
268 klass.release_buffer = Some(buffer_pool_release_buffer::<T>);
269 klass.reset_buffer = Some(buffer_pool_reset_buffer::<T>);
270 klass.start = Some(buffer_pool_start::<T>);
271 klass.stop = Some(buffer_pool_stop::<T>);
272 klass.get_options = Some(buffer_pool_get_options::<T>);
273 klass.set_config = Some(buffer_pool_set_config::<T>);
274 klass.flush_start = Some(buffer_pool_flush_start::<T>);
275 klass.flush_stop = Some(buffer_pool_flush_stop::<T>);
276 klass.free_buffer = Some(buffer_pool_free_buffer::<T>);
277 }
278
279 fn instance_init(instance: &mut InitializingObject<T>) {
280 Self::parent_instance_init(instance);
281
282 // Store the pool options in the instance data
283 // for later retrieval in buffer_pool_get_options
284 let options = T::options();
285 instance.set_instance_data(T::type_(), glib::StrV::from(options));
286 }
287}
288
289unsafe extern "C" fn buffer_pool_acquire_buffer<T: BufferPoolImpl>(
290 ptr: *mut ffi::GstBufferPool,
291 buffer: *mut *mut ffi::GstBuffer,
292 params: *mut ffi::GstBufferPoolAcquireParams,
293) -> ffi::GstFlowReturn {
294 let instance: &::Instance = &*(ptr as *mut T::Instance);
295 let imp: &T = instance.imp();
296 let params: Option<BufferPoolAcquireParams> = from_glib_none(ptr:params);
297
298 match imp.acquire_buffer(params.as_ref()) {
299 Ok(b: Buffer) => {
300 *buffer = b.into_glib_ptr();
301 ffi::GST_FLOW_OK
302 }
303 Err(err: FlowError) => err.into_glib(),
304 }
305}
306
307unsafe extern "C" fn buffer_pool_alloc_buffer<T: BufferPoolImpl>(
308 ptr: *mut ffi::GstBufferPool,
309 buffer: *mut *mut ffi::GstBuffer,
310 params: *mut ffi::GstBufferPoolAcquireParams,
311) -> ffi::GstFlowReturn {
312 let instance: &::Instance = &*(ptr as *mut T::Instance);
313 let imp: &T = instance.imp();
314 let params: Option<BufferPoolAcquireParams> = from_glib_none(ptr:params);
315
316 match imp.alloc_buffer(params.as_ref()) {
317 Ok(b: Buffer) => {
318 *buffer = b.into_glib_ptr();
319 ffi::GST_FLOW_OK
320 }
321 Err(err: FlowError) => err.into_glib(),
322 }
323}
324
325unsafe extern "C" fn buffer_pool_flush_start<T: BufferPoolImpl>(ptr: *mut ffi::GstBufferPool) {
326 // the GstBufferPool implementation calls this
327 // in finalize where the ref_count will already
328 // be zero and we are actually destroyed
329 // see: https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/1645
330 if (*(ptr as *const glib::gobject_ffi::GObject)).ref_count == 0 {
331 // flush_start is a no-op in GstBufferPool
332 return;
333 }
334
335 let instance: &::Instance = &*(ptr as *mut T::Instance);
336 let imp: &T = instance.imp();
337 imp.flush_start();
338}
339
340unsafe extern "C" fn buffer_pool_flush_stop<T: BufferPoolImpl>(ptr: *mut ffi::GstBufferPool) {
341 // the GstBufferPool implementation calls this
342 // in finalize where the ref_count will already
343 // be zero and we are actually destroyed
344 // see: https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/1645
345 if (*(ptr as *const glib::gobject_ffi::GObject)).ref_count == 0 {
346 // flush_stop is a no-op in GstBufferPool
347 return;
348 }
349
350 let instance: &::Instance = &*(ptr as *mut T::Instance);
351 let imp: &T = instance.imp();
352 imp.flush_stop();
353}
354
355unsafe extern "C" fn buffer_pool_free_buffer<T: BufferPoolImpl>(
356 ptr: *mut ffi::GstBufferPool,
357 buffer: *mut ffi::GstBuffer,
358) {
359 // the GstBufferPool implementation calls this
360 // in finalize where the ref_count will already
361 // be zero and we are actually destroyed
362 // see: https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/1645
363 if (*(ptr as *const glib::gobject_ffi::GObject)).ref_count == 0 {
364 // As a workaround we call free_buffer directly on the
365 // GstBufferPool to prevent leaking the buffer
366 // This will NOT call free_buffer on a subclass.
367 let pool_class: ClassRef<'_, BufferPool> =
368 glib::Class::<crate::BufferPool>::from_type(type_:crate::BufferPool::static_type()).unwrap();
369 let pool_class: &GstBufferPoolClass = pool_class.as_ref();
370 if let Some(f: unsafe fn(*mut GstBufferPool, …)) = pool_class.free_buffer {
371 f(ptr, buffer)
372 }
373 return;
374 }
375
376 let instance: &::Instance = &*(ptr as *mut T::Instance);
377 let imp: &T = instance.imp();
378 imp.free_buffer(from_glib_full(ptr:buffer));
379}
380
381unsafe extern "C" fn buffer_pool_release_buffer<T: BufferPoolImpl>(
382 ptr: *mut ffi::GstBufferPool,
383 buffer: *mut ffi::GstBuffer,
384) {
385 let instance: &::Instance = &*(ptr as *mut T::Instance);
386 let imp: &T = instance.imp();
387 imp.release_buffer(from_glib_full(ptr:buffer));
388}
389
390unsafe extern "C" fn buffer_pool_reset_buffer<T: BufferPoolImpl>(
391 ptr: *mut ffi::GstBufferPool,
392 buffer: *mut ffi::GstBuffer,
393) {
394 let instance: &::Instance = &*(ptr as *mut T::Instance);
395 let imp: &T = instance.imp();
396 imp.reset_buffer(crate::BufferRef::from_mut_ptr(buffer));
397}
398
399unsafe extern "C" fn buffer_pool_start<T: BufferPoolImpl>(
400 ptr: *mut ffi::GstBufferPool,
401) -> glib::ffi::gboolean {
402 let instance: &::Instance = &*(ptr as *mut T::Instance);
403 let imp: &T = instance.imp();
404 imp.start().into_glib()
405}
406
407unsafe extern "C" fn buffer_pool_stop<T: BufferPoolImpl>(
408 ptr: *mut ffi::GstBufferPool,
409) -> glib::ffi::gboolean {
410 // the GstBufferPool implementation calls this
411 // in finalize where the ref_count will already
412 // be zero and we are actually destroyed
413 // see: https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/1645
414 if (*(ptr as *const glib::gobject_ffi::GObject)).ref_count == 0 {
415 // As a workaround we call stop directly on the GstBufferPool
416 // This is needed because the default implementation clears
417 // the pool in stop.
418 let pool_class =
419 glib::Class::<crate::BufferPool>::from_type(crate::BufferPool::static_type()).unwrap();
420 let pool_class = pool_class.as_ref();
421 let result = if let Some(f) = pool_class.stop {
422 f(ptr)
423 } else {
424 true.into_glib()
425 };
426
427 return result;
428 }
429
430 let instance = &*(ptr as *mut T::Instance);
431 let imp = instance.imp();
432 imp.stop().into_glib()
433}
434
435unsafe extern "C" fn buffer_pool_get_options<T: BufferPoolImpl>(
436 ptr: *mut ffi::GstBufferPool,
437) -> *mut *const c_char {
438 let instance: &::Instance = &*(ptr as *mut T::Instance);
439 let imp: &T = instance.imp();
440 T::instance_data::<glib::StrV>(imp, T::type_())
441 .map(|p| p.as_ptr() as *mut *const _)
442 .unwrap_or(default:ptr::null_mut())
443}
444
445unsafe extern "C" fn buffer_pool_set_config<T: BufferPoolImpl>(
446 ptr: *mut ffi::GstBufferPool,
447 config: *mut ffi::GstStructure,
448) -> glib::ffi::gboolean {
449 let instance: &::Instance = &*(ptr as *mut T::Instance);
450 let imp: &T = instance.imp();
451 impbool.set_config(BufferPoolConfigRef::from_glib_borrow_mut(ptr:config))
452 .into_glib()
453}
454
455#[cfg(test)]
456mod tests {
457 use std::sync::{
458 atomic::{AtomicBool, Ordering},
459 Arc,
460 };
461
462 use super::*;
463 use crate::prelude::*;
464
465 pub mod imp {
466 use super::*;
467
468 #[derive(Default)]
469 pub struct TestBufferPool;
470
471 impl ObjectImpl for TestBufferPool {}
472 impl GstObjectImpl for TestBufferPool {}
473 impl BufferPoolImpl for TestBufferPool {
474 fn options() -> &'static [&'static str] {
475 &["TEST_OPTION"]
476 }
477
478 fn set_config(&self, config: &mut BufferPoolConfigRef) -> bool {
479 let (caps, size, min_buffers, max_buffers) = config.params().unwrap();
480 config.set_params(caps.as_ref(), size * 2, min_buffers, max_buffers);
481 self.parent_set_config(config)
482 }
483 }
484
485 #[glib::object_subclass]
486 impl ObjectSubclass for TestBufferPool {
487 const NAME: &'static str = "TestBufferPool";
488 type Type = super::TestBufferPool;
489 type ParentType = BufferPool;
490 }
491 }
492
493 glib::wrapper! {
494 pub struct TestBufferPool(ObjectSubclass<imp::TestBufferPool>) @extends BufferPool, crate::Object;
495 }
496
497 impl Default for TestBufferPool {
498 fn default() -> Self {
499 glib::Object::new()
500 }
501 }
502
503 #[test]
504 fn test_pool_options() {
505 crate::init().unwrap();
506 let pool = TestBufferPool::default();
507 assert_eq!(pool.options(), vec!["TEST_OPTION"]);
508 }
509
510 #[test]
511 fn test_pool_acquire() {
512 crate::init().unwrap();
513 let pool = TestBufferPool::default();
514 let mut config = pool.config();
515 config.set_params(None, 1024, 1, 1);
516 pool.set_config(config).expect("failed to set pool config");
517 pool.set_active(true).expect("failed to activate pool");
518 let buffer = pool
519 .acquire_buffer(None)
520 .expect("failed to acquire buffer from pool");
521 assert_eq!(buffer.size(), 2048);
522 }
523
524 #[test]
525 fn test_pool_free_on_finalize() {
526 crate::init().unwrap();
527 let pool = TestBufferPool::default();
528 let mut config = pool.config();
529 config.set_params(None, 1024, 1, 1);
530 pool.set_config(config).expect("failed to set pool config");
531 pool.set_active(true).expect("failed to activate pool");
532 let mut buffer = pool
533 .acquire_buffer(None)
534 .expect("failed to acquire buffer from pool");
535 let finalized = Arc::new(AtomicBool::new(false));
536 unsafe {
537 ffi::gst_mini_object_weak_ref(
538 buffer.make_mut().upcast_mut().as_mut_ptr(),
539 Some(buffer_finalized),
540 Arc::into_raw(finalized.clone()) as *mut _,
541 )
542 };
543 // return the buffer to the pool
544 std::mem::drop(buffer);
545 // drop should finalize the buffer pool which frees all allocated buffers
546 std::mem::drop(pool);
547 assert!(finalized.load(Ordering::SeqCst));
548 }
549
550 #[test]
551 fn test_pool_free_on_deactivate() {
552 crate::init().unwrap();
553 let pool = TestBufferPool::default();
554 let mut config = pool.config();
555 config.set_params(None, 1024, 1, 1);
556 pool.set_config(config).expect("failed to set pool config");
557 pool.set_active(true).expect("failed to activate pool");
558 let mut buffer = pool
559 .acquire_buffer(None)
560 .expect("failed to acquire buffer from pool");
561 let finalized = Arc::new(AtomicBool::new(false));
562 unsafe {
563 ffi::gst_mini_object_weak_ref(
564 buffer.make_mut().upcast_mut().as_mut_ptr(),
565 Some(buffer_finalized),
566 Arc::into_raw(finalized.clone()) as *mut _,
567 )
568 };
569 // return the buffer to the pool
570 std::mem::drop(buffer);
571 // de-activating a poll should free all buffers
572 pool.set_active(false).expect("failed to de-activate pool");
573 assert!(finalized.load(Ordering::SeqCst));
574 }
575
576 unsafe extern "C" fn buffer_finalized(
577 data: *mut libc::c_void,
578 _mini_object: *mut ffi::GstMiniObject,
579 ) {
580 let finalized = Arc::from_raw(data as *const AtomicBool);
581 finalized.store(true, Ordering::SeqCst);
582 }
583}
584