1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{
4 mem, panic,
5 pin::Pin,
6 ptr,
7 sync::{
8 atomic::{AtomicBool, Ordering},
9 Arc, Mutex,
10 },
11 task::{Context, Poll, Waker},
12};
13
14use futures_core::Stream;
15use glib::{ffi::gpointer, prelude::*, translate::*};
16
17use crate::AppSink;
18
19#[allow(clippy::type_complexity)]
20pub struct AppSinkCallbacks {
21 eos: Option<Box<dyn FnMut(&AppSink) + Send + 'static>>,
22 new_preroll: Option<
23 Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
24 >,
25 new_sample: Option<
26 Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
27 >,
28 new_event: Option<Box<dyn FnMut(&AppSink) -> bool + Send + 'static>>,
29 propose_allocation:
30 Option<Box<dyn FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static>>,
31 panicked: AtomicBool,
32 callbacks: ffi::GstAppSinkCallbacks,
33}
34
35unsafe impl Send for AppSinkCallbacks {}
36unsafe impl Sync for AppSinkCallbacks {}
37
38impl AppSinkCallbacks {
39 pub fn builder() -> AppSinkCallbacksBuilder {
40 skip_assert_initialized!();
41 AppSinkCallbacksBuilder {
42 eos: None,
43 new_preroll: None,
44 new_sample: None,
45 new_event: None,
46 propose_allocation: None,
47 }
48 }
49}
50
51#[allow(clippy::type_complexity)]
52#[must_use = "The builder must be built to be used"]
53pub struct AppSinkCallbacksBuilder {
54 eos: Option<Box<dyn FnMut(&AppSink) + Send + 'static>>,
55 new_preroll: Option<
56 Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
57 >,
58 new_sample: Option<
59 Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
60 >,
61 new_event: Option<Box<dyn FnMut(&AppSink) -> bool + Send + 'static>>,
62 propose_allocation:
63 Option<Box<dyn FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static>>,
64}
65
66impl AppSinkCallbacksBuilder {
67 pub fn eos<F: FnMut(&AppSink) + Send + 'static>(self, eos: F) -> Self {
68 Self {
69 eos: Some(Box::new(eos)),
70 ..self
71 }
72 }
73
74 pub fn new_preroll<
75 F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
76 >(
77 self,
78 new_preroll: F,
79 ) -> Self {
80 Self {
81 new_preroll: Some(Box::new(new_preroll)),
82 ..self
83 }
84 }
85
86 pub fn new_sample<
87 F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
88 >(
89 self,
90 new_sample: F,
91 ) -> Self {
92 Self {
93 new_sample: Some(Box::new(new_sample)),
94 ..self
95 }
96 }
97
98 #[cfg(feature = "v1_20")]
99 #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
100 pub fn new_event<F: FnMut(&AppSink) -> bool + Send + 'static>(self, new_event: F) -> Self {
101 Self {
102 new_event: Some(Box::new(new_event)),
103 ..self
104 }
105 }
106
107 #[cfg(feature = "v1_24")]
108 #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
109 pub fn propose_allocation<
110 F: FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static,
111 >(
112 self,
113 propose_allocation: F,
114 ) -> Self {
115 Self {
116 propose_allocation: Some(Box::new(propose_allocation)),
117 ..self
118 }
119 }
120
121 #[must_use = "Building the callbacks without using them has no effect"]
122 pub fn build(self) -> AppSinkCallbacks {
123 let have_eos = self.eos.is_some();
124 let have_new_preroll = self.new_preroll.is_some();
125 let have_new_sample = self.new_sample.is_some();
126 let have_new_event = self.new_event.is_some();
127 let have_propose_allocation = self.propose_allocation.is_some();
128
129 AppSinkCallbacks {
130 eos: self.eos,
131 new_preroll: self.new_preroll,
132 new_sample: self.new_sample,
133 new_event: self.new_event,
134 propose_allocation: self.propose_allocation,
135 panicked: AtomicBool::new(false),
136 callbacks: ffi::GstAppSinkCallbacks {
137 eos: if have_eos { Some(trampoline_eos) } else { None },
138 new_preroll: if have_new_preroll {
139 Some(trampoline_new_preroll)
140 } else {
141 None
142 },
143 new_sample: if have_new_sample {
144 Some(trampoline_new_sample)
145 } else {
146 None
147 },
148 new_event: if have_new_event {
149 Some(trampoline_new_event)
150 } else {
151 None
152 },
153 propose_allocation: if have_propose_allocation {
154 Some(trampoline_propose_allocation)
155 } else {
156 None
157 },
158 _gst_reserved: [ptr::null_mut(), ptr::null_mut()],
159 },
160 }
161 }
162}
163
164unsafe extern "C" fn trampoline_eos(appsink: *mut ffi::GstAppSink, callbacks: gpointer) {
165 let callbacks = callbacks as *mut AppSinkCallbacks;
166 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
167
168 if (*callbacks).panicked.load(Ordering::Relaxed) {
169 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
170 gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None);
171 return;
172 }
173
174 if let Some(ref mut eos) = (*callbacks).eos {
175 let result = panic::catch_unwind(panic::AssertUnwindSafe(|| eos(&element)));
176 match result {
177 Ok(result) => result,
178 Err(err) => {
179 (*callbacks).panicked.store(true, Ordering::Relaxed);
180 gst::subclass::post_panic_error_message(
181 element.upcast_ref(),
182 element.upcast_ref(),
183 Some(err),
184 );
185 }
186 }
187 }
188}
189
190unsafe extern "C" fn trampoline_new_preroll(
191 appsink: *mut ffi::GstAppSink,
192 callbacks: gpointer,
193) -> gst::ffi::GstFlowReturn {
194 let callbacks = callbacks as *mut AppSinkCallbacks;
195 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
196
197 if (*callbacks).panicked.load(Ordering::Relaxed) {
198 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
199 gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None);
200 return gst::FlowReturn::Error.into_glib();
201 }
202
203 let ret = if let Some(ref mut new_preroll) = (*callbacks).new_preroll {
204 let result = panic::catch_unwind(panic::AssertUnwindSafe(|| new_preroll(&element).into()));
205 match result {
206 Ok(result) => result,
207 Err(err) => {
208 (*callbacks).panicked.store(true, Ordering::Relaxed);
209 gst::subclass::post_panic_error_message(
210 element.upcast_ref(),
211 element.upcast_ref(),
212 Some(err),
213 );
214
215 gst::FlowReturn::Error
216 }
217 }
218 } else {
219 gst::FlowReturn::Error
220 };
221
222 ret.into_glib()
223}
224
225unsafe extern "C" fn trampoline_new_sample(
226 appsink: *mut ffi::GstAppSink,
227 callbacks: gpointer,
228) -> gst::ffi::GstFlowReturn {
229 let callbacks = callbacks as *mut AppSinkCallbacks;
230 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
231
232 if (*callbacks).panicked.load(Ordering::Relaxed) {
233 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
234 gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None);
235 return gst::FlowReturn::Error.into_glib();
236 }
237
238 let ret = if let Some(ref mut new_sample) = (*callbacks).new_sample {
239 let result = panic::catch_unwind(panic::AssertUnwindSafe(|| new_sample(&element).into()));
240 match result {
241 Ok(result) => result,
242 Err(err) => {
243 (*callbacks).panicked.store(true, Ordering::Relaxed);
244 gst::subclass::post_panic_error_message(
245 element.upcast_ref(),
246 element.upcast_ref(),
247 Some(err),
248 );
249
250 gst::FlowReturn::Error
251 }
252 }
253 } else {
254 gst::FlowReturn::Error
255 };
256
257 ret.into_glib()
258}
259
260unsafe extern "C" fn trampoline_new_event(
261 appsink: *mut ffi::GstAppSink,
262 callbacks: gpointer,
263) -> glib::ffi::gboolean {
264 let callbacks = callbacks as *mut AppSinkCallbacks;
265 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
266
267 if (*callbacks).panicked.load(Ordering::Relaxed) {
268 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
269 gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None);
270 return false.into_glib();
271 }
272
273 let ret = if let Some(ref mut new_event) = (*callbacks).new_event {
274 let result = panic::catch_unwind(panic::AssertUnwindSafe(|| new_event(&element)));
275 match result {
276 Ok(result) => result,
277 Err(err) => {
278 (*callbacks).panicked.store(true, Ordering::Relaxed);
279 gst::subclass::post_panic_error_message(
280 element.upcast_ref(),
281 element.upcast_ref(),
282 Some(err),
283 );
284
285 false
286 }
287 }
288 } else {
289 false
290 };
291
292 ret.into_glib()
293}
294
295unsafe extern "C" fn trampoline_propose_allocation(
296 appsink: *mut ffi::GstAppSink,
297 query: *mut gst::ffi::GstQuery,
298 callbacks: gpointer,
299) -> glib::ffi::gboolean {
300 let callbacks = callbacks as *mut AppSinkCallbacks;
301 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
302
303 if (*callbacks).panicked.load(Ordering::Relaxed) {
304 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
305 gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None);
306 return false.into_glib();
307 }
308
309 let ret = if let Some(ref mut propose_allocation) = (*callbacks).propose_allocation {
310 let query = match gst::QueryRef::from_mut_ptr(query).view_mut() {
311 gst::QueryViewMut::Allocation(allocation) => allocation,
312 _ => unreachable!(),
313 };
314 let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
315 propose_allocation(&element, query)
316 }));
317 match result {
318 Ok(result) => result,
319 Err(err) => {
320 (*callbacks).panicked.store(true, Ordering::Relaxed);
321 gst::subclass::post_panic_error_message(
322 element.upcast_ref(),
323 element.upcast_ref(),
324 Some(err),
325 );
326
327 false
328 }
329 }
330 } else {
331 false
332 };
333
334 ret.into_glib()
335}
336
337unsafe extern "C" fn destroy_callbacks(ptr: gpointer) {
338 let _ = Box::<AppSinkCallbacks>::from_raw(ptr as *mut _);
339}
340
341impl AppSink {
342 // rustdoc-stripper-ignore-next
343 /// Creates a new builder-pattern struct instance to construct [`AppSink`] objects.
344 ///
345 /// This method returns an instance of [`AppSinkBuilder`](crate::builders::AppSinkBuilder) which can be used to create [`AppSink`] objects.
346 pub fn builder() -> AppSinkBuilder {
347 assert_initialized_main_thread!();
348 AppSinkBuilder::new()
349 }
350
351 #[doc(alias = "gst_app_sink_set_callbacks")]
352 pub fn set_callbacks(&self, callbacks: AppSinkCallbacks) {
353 #[cfg(not(feature = "v1_18"))]
354 use glib::once_cell::sync::Lazy;
355 #[cfg(not(feature = "v1_18"))]
356 static SET_ONCE_QUARK: Lazy<glib::Quark> =
357 Lazy::new(|| glib::Quark::from_str("gstreamer-rs-app-sink-callbacks"));
358
359 unsafe {
360 let sink = self.to_glib_none().0;
361
362 #[cfg(not(feature = "v1_18"))]
363 {
364 // This is not thread-safe before 1.16.3, see
365 // https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/merge_requests/570
366 if gst::version() < (1, 16, 3, 0) {
367 if !glib::gobject_ffi::g_object_get_qdata(
368 sink as *mut _,
369 SET_ONCE_QUARK.into_glib(),
370 )
371 .is_null()
372 {
373 panic!("AppSink callbacks can only be set once");
374 }
375
376 glib::gobject_ffi::g_object_set_qdata(
377 sink as *mut _,
378 SET_ONCE_QUARK.into_glib(),
379 1 as *mut _,
380 );
381 }
382 }
383
384 ffi::gst_app_sink_set_callbacks(
385 sink,
386 mut_override(&callbacks.callbacks),
387 Box::into_raw(Box::new(callbacks)) as *mut _,
388 Some(destroy_callbacks),
389 );
390 }
391 }
392
393 #[doc(alias = "drop-out-of-segment")]
394 pub fn drops_out_of_segment(&self) -> bool {
395 unsafe {
396 from_glib(gst_base::ffi::gst_base_sink_get_drop_out_of_segment(
397 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
398 ))
399 }
400 }
401
402 #[doc(alias = "max-bitrate")]
403 #[doc(alias = "gst_base_sink_get_max_bitrate")]
404 pub fn max_bitrate(&self) -> u64 {
405 unsafe {
406 gst_base::ffi::gst_base_sink_get_max_bitrate(
407 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
408 )
409 }
410 }
411
412 #[doc(alias = "max-lateness")]
413 #[doc(alias = "gst_base_sink_get_max_lateness")]
414 pub fn max_lateness(&self) -> i64 {
415 unsafe {
416 gst_base::ffi::gst_base_sink_get_max_lateness(
417 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
418 )
419 }
420 }
421
422 #[doc(alias = "processing-deadline")]
423 #[cfg(feature = "v1_16")]
424 #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
425 #[doc(alias = "gst_base_sink_get_processing_deadline")]
426 pub fn processing_deadline(&self) -> gst::ClockTime {
427 unsafe {
428 try_from_glib(gst_base::ffi::gst_base_sink_get_processing_deadline(
429 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
430 ))
431 .expect("undefined processing_deadline")
432 }
433 }
434
435 #[doc(alias = "render-delay")]
436 #[doc(alias = "gst_base_sink_get_render_delay")]
437 pub fn render_delay(&self) -> gst::ClockTime {
438 unsafe {
439 try_from_glib(gst_base::ffi::gst_base_sink_get_render_delay(
440 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
441 ))
442 .expect("undefined render_delay")
443 }
444 }
445
446 #[cfg(feature = "v1_18")]
447 #[cfg_attr(docsrs, doc(cfg(feature = "v1_18")))]
448 #[doc(alias = "gst_base_sink_get_stats")]
449 pub fn stats(&self) -> gst::Structure {
450 unsafe {
451 from_glib_full(gst_base::ffi::gst_base_sink_get_stats(
452 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
453 ))
454 }
455 }
456
457 #[doc(alias = "sync")]
458 pub fn is_sync(&self) -> bool {
459 unsafe {
460 from_glib(gst_base::ffi::gst_base_sink_get_sync(
461 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
462 ))
463 }
464 }
465
466 #[doc(alias = "throttle-time")]
467 #[doc(alias = "gst_base_sink_get_throttle_time")]
468 pub fn throttle_time(&self) -> u64 {
469 unsafe {
470 gst_base::ffi::gst_base_sink_get_throttle_time(
471 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
472 )
473 }
474 }
475
476 #[doc(alias = "ts-offset")]
477 #[doc(alias = "gst_base_sink_get_ts_offset")]
478 pub fn ts_offset(&self) -> gst::ClockTimeDiff {
479 unsafe {
480 gst_base::ffi::gst_base_sink_get_ts_offset(
481 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
482 )
483 }
484 }
485
486 #[doc(alias = "async")]
487 #[doc(alias = "gst_base_sink_is_async_enabled")]
488 pub fn is_async(&self) -> bool {
489 unsafe {
490 from_glib(gst_base::ffi::gst_base_sink_is_async_enabled(
491 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
492 ))
493 }
494 }
495
496 #[doc(alias = "last-sample")]
497 pub fn enables_last_sample(&self) -> bool {
498 unsafe {
499 from_glib(gst_base::ffi::gst_base_sink_is_last_sample_enabled(
500 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
501 ))
502 }
503 }
504
505 #[doc(alias = "qos")]
506 #[doc(alias = "gst_base_sink_is_qos_enabled")]
507 pub fn is_qos(&self) -> bool {
508 unsafe {
509 from_glib(gst_base::ffi::gst_base_sink_is_qos_enabled(
510 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
511 ))
512 }
513 }
514
515 #[doc(alias = "async")]
516 #[doc(alias = "gst_base_sink_set_async_enabled")]
517 pub fn set_async(&self, enabled: bool) {
518 unsafe {
519 gst_base::ffi::gst_base_sink_set_async_enabled(
520 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
521 enabled.into_glib(),
522 );
523 }
524 }
525
526 #[doc(alias = "drop-out-of-segment")]
527 #[doc(alias = "gst_base_sink_set_drop_out_of_segment")]
528 pub fn set_drop_out_of_segment(&self, drop_out_of_segment: bool) {
529 unsafe {
530 gst_base::ffi::gst_base_sink_set_drop_out_of_segment(
531 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
532 drop_out_of_segment.into_glib(),
533 );
534 }
535 }
536
537 #[doc(alias = "last-sample")]
538 pub fn set_enable_last_sample(&self, enabled: bool) {
539 unsafe {
540 gst_base::ffi::gst_base_sink_set_last_sample_enabled(
541 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
542 enabled.into_glib(),
543 );
544 }
545 }
546
547 #[doc(alias = "max-bitrate")]
548 #[doc(alias = "gst_base_sink_set_max_bitrate")]
549 pub fn set_max_bitrate(&self, max_bitrate: u64) {
550 unsafe {
551 gst_base::ffi::gst_base_sink_set_max_bitrate(
552 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
553 max_bitrate,
554 );
555 }
556 }
557
558 #[doc(alias = "max-lateness")]
559 #[doc(alias = "gst_base_sink_set_max_lateness")]
560 pub fn set_max_lateness(&self, max_lateness: i64) {
561 unsafe {
562 gst_base::ffi::gst_base_sink_set_max_lateness(
563 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
564 max_lateness,
565 );
566 }
567 }
568
569 #[doc(alias = "processing-deadline")]
570 #[cfg(feature = "v1_16")]
571 #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
572 #[doc(alias = "gst_base_sink_set_processing_deadline")]
573 pub fn set_processing_deadline(&self, processing_deadline: gst::ClockTime) {
574 unsafe {
575 gst_base::ffi::gst_base_sink_set_processing_deadline(
576 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
577 processing_deadline.into_glib(),
578 );
579 }
580 }
581
582 #[doc(alias = "qos")]
583 #[doc(alias = "gst_base_sink_set_qos_enabled")]
584 pub fn set_qos(&self, enabled: bool) {
585 unsafe {
586 gst_base::ffi::gst_base_sink_set_qos_enabled(
587 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
588 enabled.into_glib(),
589 );
590 }
591 }
592
593 #[doc(alias = "render-delay")]
594 #[doc(alias = "gst_base_sink_set_render_delay")]
595 pub fn set_render_delay(&self, delay: gst::ClockTime) {
596 unsafe {
597 gst_base::ffi::gst_base_sink_set_render_delay(
598 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
599 delay.into_glib(),
600 );
601 }
602 }
603
604 #[doc(alias = "sync")]
605 #[doc(alias = "gst_base_sink_set_sync")]
606 pub fn set_sync(&self, sync: bool) {
607 unsafe {
608 gst_base::ffi::gst_base_sink_set_sync(
609 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
610 sync.into_glib(),
611 );
612 }
613 }
614
615 #[doc(alias = "throttle-time")]
616 #[doc(alias = "gst_base_sink_set_throttle_time")]
617 pub fn set_throttle_time(&self, throttle: u64) {
618 unsafe {
619 gst_base::ffi::gst_base_sink_set_throttle_time(
620 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
621 throttle,
622 );
623 }
624 }
625
626 #[doc(alias = "ts-offset")]
627 #[doc(alias = "gst_base_sink_set_ts_offset")]
628 pub fn set_ts_offset(&self, offset: gst::ClockTimeDiff) {
629 unsafe {
630 gst_base::ffi::gst_base_sink_set_ts_offset(
631 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
632 offset,
633 );
634 }
635 }
636
637 #[doc(alias = "async")]
638 pub fn connect_async_notify<F: Fn(&Self) + Send + Sync + 'static>(
639 &self,
640 f: F,
641 ) -> glib::SignalHandlerId {
642 unsafe extern "C" fn notify_async_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
643 this: *mut ffi::GstAppSink,
644 _param_spec: glib::ffi::gpointer,
645 f: glib::ffi::gpointer,
646 ) {
647 let f: &F = &*(f as *const F);
648 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
649 }
650 unsafe {
651 let f: Box<F> = Box::new(f);
652 glib::signal::connect_raw(
653 self.as_ptr() as *mut _,
654 b"notify::async\0".as_ptr() as *const _,
655 Some(mem::transmute::<_, unsafe extern "C" fn()>(
656 notify_async_trampoline::<F> as *const (),
657 )),
658 Box::into_raw(f),
659 )
660 }
661 }
662
663 #[doc(alias = "blocksize")]
664 pub fn connect_blocksize_notify<F: Fn(&Self) + Send + Sync + 'static>(
665 &self,
666 f: F,
667 ) -> glib::SignalHandlerId {
668 unsafe extern "C" fn notify_blocksize_trampoline<
669 F: Fn(&AppSink) + Send + Sync + 'static,
670 >(
671 this: *mut ffi::GstAppSink,
672 _param_spec: glib::ffi::gpointer,
673 f: glib::ffi::gpointer,
674 ) {
675 let f: &F = &*(f as *const F);
676 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
677 }
678 unsafe {
679 let f: Box<F> = Box::new(f);
680 glib::signal::connect_raw(
681 self.as_ptr() as *mut _,
682 b"notify::blocksize\0".as_ptr() as *const _,
683 Some(mem::transmute::<_, unsafe extern "C" fn()>(
684 notify_blocksize_trampoline::<F> as *const (),
685 )),
686 Box::into_raw(f),
687 )
688 }
689 }
690
691 #[doc(alias = "enable-last-sample")]
692 pub fn connect_enable_last_sample_notify<F: Fn(&Self) + Send + Sync + 'static>(
693 &self,
694 f: F,
695 ) -> glib::SignalHandlerId {
696 unsafe extern "C" fn notify_enable_last_sample_trampoline<
697 F: Fn(&AppSink) + Send + Sync + 'static,
698 >(
699 this: *mut ffi::GstAppSink,
700 _param_spec: glib::ffi::gpointer,
701 f: glib::ffi::gpointer,
702 ) {
703 let f: &F = &*(f as *const F);
704 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
705 }
706 unsafe {
707 let f: Box<F> = Box::new(f);
708 glib::signal::connect_raw(
709 self.as_ptr() as *mut _,
710 b"notify::enable-last-sample\0".as_ptr() as *const _,
711 Some(mem::transmute::<_, unsafe extern "C" fn()>(
712 notify_enable_last_sample_trampoline::<F> as *const (),
713 )),
714 Box::into_raw(f),
715 )
716 }
717 }
718
719 #[doc(alias = "last-sample")]
720 pub fn connect_last_sample_notify<F: Fn(&Self) + Send + Sync + 'static>(
721 &self,
722 f: F,
723 ) -> glib::SignalHandlerId {
724 unsafe extern "C" fn notify_last_sample_trampoline<
725 F: Fn(&AppSink) + Send + Sync + 'static,
726 >(
727 this: *mut ffi::GstAppSink,
728 _param_spec: glib::ffi::gpointer,
729 f: glib::ffi::gpointer,
730 ) {
731 let f: &F = &*(f as *const F);
732 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
733 }
734 unsafe {
735 let f: Box<F> = Box::new(f);
736 glib::signal::connect_raw(
737 self.as_ptr() as *mut _,
738 b"notify::last-sample\0".as_ptr() as *const _,
739 Some(mem::transmute::<_, unsafe extern "C" fn()>(
740 notify_last_sample_trampoline::<F> as *const (),
741 )),
742 Box::into_raw(f),
743 )
744 }
745 }
746
747 #[doc(alias = "max-bitrate")]
748 pub fn connect_max_bitrate_notify<F: Fn(&Self) + Send + Sync + 'static>(
749 &self,
750 f: F,
751 ) -> glib::SignalHandlerId {
752 unsafe extern "C" fn notify_max_bitrate_trampoline<
753 F: Fn(&AppSink) + Send + Sync + 'static,
754 >(
755 this: *mut ffi::GstAppSink,
756 _param_spec: glib::ffi::gpointer,
757 f: glib::ffi::gpointer,
758 ) {
759 let f: &F = &*(f as *const F);
760 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
761 }
762 unsafe {
763 let f: Box<F> = Box::new(f);
764 glib::signal::connect_raw(
765 self.as_ptr() as *mut _,
766 b"notify::max-bitrate\0".as_ptr() as *const _,
767 Some(mem::transmute::<_, unsafe extern "C" fn()>(
768 notify_max_bitrate_trampoline::<F> as *const (),
769 )),
770 Box::into_raw(f),
771 )
772 }
773 }
774
775 #[doc(alias = "max-lateness")]
776 pub fn connect_max_lateness_notify<F: Fn(&Self) + Send + Sync + 'static>(
777 &self,
778 f: F,
779 ) -> glib::SignalHandlerId {
780 unsafe extern "C" fn notify_max_lateness_trampoline<
781 F: Fn(&AppSink) + Send + Sync + 'static,
782 >(
783 this: *mut ffi::GstAppSink,
784 _param_spec: glib::ffi::gpointer,
785 f: glib::ffi::gpointer,
786 ) {
787 let f: &F = &*(f as *const F);
788 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
789 }
790 unsafe {
791 let f: Box<F> = Box::new(f);
792 glib::signal::connect_raw(
793 self.as_ptr() as *mut _,
794 b"notify::max-lateness\0".as_ptr() as *const _,
795 Some(mem::transmute::<_, unsafe extern "C" fn()>(
796 notify_max_lateness_trampoline::<F> as *const (),
797 )),
798 Box::into_raw(f),
799 )
800 }
801 }
802
803 #[cfg(feature = "v1_16")]
804 #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
805 #[doc(alias = "processing-deadline")]
806 pub fn connect_processing_deadline_notify<F: Fn(&Self) + Send + Sync + 'static>(
807 &self,
808 f: F,
809 ) -> glib::SignalHandlerId {
810 unsafe extern "C" fn notify_processing_deadline_trampoline<
811 F: Fn(&AppSink) + Send + Sync + 'static,
812 >(
813 this: *mut ffi::GstAppSink,
814 _param_spec: glib::ffi::gpointer,
815 f: glib::ffi::gpointer,
816 ) {
817 let f: &F = &*(f as *const F);
818 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
819 }
820 unsafe {
821 let f: Box<F> = Box::new(f);
822 glib::signal::connect_raw(
823 self.as_ptr() as *mut _,
824 b"notify::processing-deadline\0".as_ptr() as *const _,
825 Some(mem::transmute::<_, unsafe extern "C" fn()>(
826 notify_processing_deadline_trampoline::<F> as *const (),
827 )),
828 Box::into_raw(f),
829 )
830 }
831 }
832
833 #[doc(alias = "qos")]
834 pub fn connect_qos_notify<F: Fn(&Self) + Send + Sync + 'static>(
835 &self,
836 f: F,
837 ) -> glib::SignalHandlerId {
838 unsafe extern "C" fn notify_qos_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
839 this: *mut ffi::GstAppSink,
840 _param_spec: glib::ffi::gpointer,
841 f: glib::ffi::gpointer,
842 ) {
843 let f: &F = &*(f as *const F);
844 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
845 }
846 unsafe {
847 let f: Box<F> = Box::new(f);
848 glib::signal::connect_raw(
849 self.as_ptr() as *mut _,
850 b"notify::qos\0".as_ptr() as *const _,
851 Some(mem::transmute::<_, unsafe extern "C" fn()>(
852 notify_qos_trampoline::<F> as *const (),
853 )),
854 Box::into_raw(f),
855 )
856 }
857 }
858
859 #[doc(alias = "render-delay")]
860 pub fn connect_render_delay_notify<F: Fn(&Self) + Send + Sync + 'static>(
861 &self,
862 f: F,
863 ) -> glib::SignalHandlerId {
864 unsafe extern "C" fn notify_render_delay_trampoline<
865 F: Fn(&AppSink) + Send + Sync + 'static,
866 >(
867 this: *mut ffi::GstAppSink,
868 _param_spec: glib::ffi::gpointer,
869 f: glib::ffi::gpointer,
870 ) {
871 let f: &F = &*(f as *const F);
872 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
873 }
874 unsafe {
875 let f: Box<F> = Box::new(f);
876 glib::signal::connect_raw(
877 self.as_ptr() as *mut _,
878 b"notify::render-delay\0".as_ptr() as *const _,
879 Some(mem::transmute::<_, unsafe extern "C" fn()>(
880 notify_render_delay_trampoline::<F> as *const (),
881 )),
882 Box::into_raw(f),
883 )
884 }
885 }
886
887 #[cfg(feature = "v1_18")]
888 #[cfg_attr(docsrs, doc(cfg(feature = "v1_18")))]
889 #[doc(alias = "stats")]
890 pub fn connect_stats_notify<F: Fn(&Self) + Send + Sync + 'static>(
891 &self,
892 f: F,
893 ) -> glib::SignalHandlerId {
894 unsafe extern "C" fn notify_stats_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
895 this: *mut ffi::GstAppSink,
896 _param_spec: glib::ffi::gpointer,
897 f: glib::ffi::gpointer,
898 ) {
899 let f: &F = &*(f as *const F);
900 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
901 }
902 unsafe {
903 let f: Box<F> = Box::new(f);
904 glib::signal::connect_raw(
905 self.as_ptr() as *mut _,
906 b"notify::stats\0".as_ptr() as *const _,
907 Some(mem::transmute::<_, unsafe extern "C" fn()>(
908 notify_stats_trampoline::<F> as *const (),
909 )),
910 Box::into_raw(f),
911 )
912 }
913 }
914
915 #[doc(alias = "sync")]
916 pub fn connect_sync_notify<F: Fn(&Self) + Send + Sync + 'static>(
917 &self,
918 f: F,
919 ) -> glib::SignalHandlerId {
920 unsafe extern "C" fn notify_sync_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
921 this: *mut ffi::GstAppSink,
922 _param_spec: glib::ffi::gpointer,
923 f: glib::ffi::gpointer,
924 ) {
925 let f: &F = &*(f as *const F);
926 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
927 }
928 unsafe {
929 let f: Box<F> = Box::new(f);
930 glib::signal::connect_raw(
931 self.as_ptr() as *mut _,
932 b"notify::sync\0".as_ptr() as *const _,
933 Some(mem::transmute::<_, unsafe extern "C" fn()>(
934 notify_sync_trampoline::<F> as *const (),
935 )),
936 Box::into_raw(f),
937 )
938 }
939 }
940
941 #[doc(alias = "throttle-time")]
942 pub fn connect_throttle_time_notify<F: Fn(&Self) + Send + Sync + 'static>(
943 &self,
944 f: F,
945 ) -> glib::SignalHandlerId {
946 unsafe extern "C" fn notify_throttle_time_trampoline<
947 F: Fn(&AppSink) + Send + Sync + 'static,
948 >(
949 this: *mut ffi::GstAppSink,
950 _param_spec: glib::ffi::gpointer,
951 f: glib::ffi::gpointer,
952 ) {
953 let f: &F = &*(f as *const F);
954 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
955 }
956 unsafe {
957 let f: Box<F> = Box::new(f);
958 glib::signal::connect_raw(
959 self.as_ptr() as *mut _,
960 b"notify::throttle-time\0".as_ptr() as *const _,
961 Some(mem::transmute::<_, unsafe extern "C" fn()>(
962 notify_throttle_time_trampoline::<F> as *const (),
963 )),
964 Box::into_raw(f),
965 )
966 }
967 }
968
969 #[doc(alias = "ts-offset")]
970 pub fn connect_ts_offset_notify<F: Fn(&Self) + Send + Sync + 'static>(
971 &self,
972 f: F,
973 ) -> glib::SignalHandlerId {
974 unsafe extern "C" fn notify_ts_offset_trampoline<
975 F: Fn(&AppSink) + Send + Sync + 'static,
976 >(
977 this: *mut ffi::GstAppSink,
978 _param_spec: glib::ffi::gpointer,
979 f: glib::ffi::gpointer,
980 ) {
981 let f: &F = &*(f as *const F);
982 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
983 }
984 unsafe {
985 let f: Box<F> = Box::new(f);
986 glib::signal::connect_raw(
987 self.as_ptr() as *mut _,
988 b"notify::ts-offset\0".as_ptr() as *const _,
989 Some(mem::transmute::<_, unsafe extern "C" fn()>(
990 notify_ts_offset_trampoline::<F> as *const (),
991 )),
992 Box::into_raw(f),
993 )
994 }
995 }
996
997 pub fn stream(&self) -> AppSinkStream {
998 AppSinkStream::new(self)
999 }
1000}
1001
1002// rustdoc-stripper-ignore-next
1003/// A [builder-pattern] type to construct [`AppSink`] objects.
1004///
1005/// [builder-pattern]: https://doc.rust-lang.org/1.0.0/style/ownership/builders.html
1006#[must_use = "The builder must be built to be used"]
1007pub struct AppSinkBuilder {
1008 builder: glib::object::ObjectBuilder<'static, AppSink>,
1009 callbacks: Option<AppSinkCallbacks>,
1010 drop_out_of_segment: Option<bool>,
1011}
1012
1013impl AppSinkBuilder {
1014 fn new() -> Self {
1015 Self {
1016 builder: glib::Object::builder(),
1017 callbacks: None,
1018 drop_out_of_segment: None,
1019 }
1020 }
1021
1022 // rustdoc-stripper-ignore-next
1023 /// Build the [`AppSink`].
1024 #[must_use = "Building the object from the builder is usually expensive and is not expected to have side effects"]
1025 pub fn build(self) -> AppSink {
1026 let appsink = self.builder.build();
1027
1028 if let Some(callbacks) = self.callbacks {
1029 appsink.set_callbacks(callbacks);
1030 }
1031
1032 if let Some(drop_out_of_segment) = self.drop_out_of_segment {
1033 appsink.set_drop_out_of_segment(drop_out_of_segment);
1034 }
1035
1036 appsink
1037 }
1038
1039 pub fn async_(self, async_: bool) -> Self {
1040 Self {
1041 builder: self.builder.property("async", async_),
1042 ..self
1043 }
1044 }
1045
1046 pub fn buffer_list(self, buffer_list: bool) -> Self {
1047 Self {
1048 builder: self.builder.property("buffer-list", buffer_list),
1049 ..self
1050 }
1051 }
1052
1053 pub fn callbacks(self, callbacks: AppSinkCallbacks) -> Self {
1054 Self {
1055 callbacks: Some(callbacks),
1056 ..self
1057 }
1058 }
1059
1060 pub fn caps(self, caps: &gst::Caps) -> Self {
1061 Self {
1062 builder: self.builder.property("caps", caps),
1063 ..self
1064 }
1065 }
1066
1067 pub fn drop(self, drop: bool) -> Self {
1068 Self {
1069 builder: self.builder.property("drop", drop),
1070 ..self
1071 }
1072 }
1073
1074 pub fn drop_out_of_segment(self, drop_out_of_segment: bool) -> Self {
1075 Self {
1076 builder: self
1077 .builder
1078 .property("drop-out-of-segment", drop_out_of_segment),
1079 ..self
1080 }
1081 }
1082
1083 pub fn enable_last_sample(self, enable_last_sample: bool) -> Self {
1084 Self {
1085 builder: self
1086 .builder
1087 .property("enable-last-sample", enable_last_sample),
1088 ..self
1089 }
1090 }
1091
1092 pub fn max_bitrate(self, max_bitrate: u64) -> Self {
1093 Self {
1094 builder: self.builder.property("max-bitrate", max_bitrate),
1095 ..self
1096 }
1097 }
1098
1099 pub fn max_buffers(self, max_buffers: u32) -> Self {
1100 Self {
1101 builder: self.builder.property("max-buffers", max_buffers),
1102 ..self
1103 }
1104 }
1105
1106 pub fn max_lateness(self, max_lateness: i64) -> Self {
1107 Self {
1108 builder: self.builder.property("max-lateness", max_lateness),
1109 ..self
1110 }
1111 }
1112
1113 #[cfg(feature = "v1_16")]
1114 #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
1115 pub fn processing_deadline(self, processing_deadline: i64) -> Self {
1116 Self {
1117 builder: self
1118 .builder
1119 .property("processing-deadline", processing_deadline),
1120 ..self
1121 }
1122 }
1123
1124 pub fn qos(self, qos: bool) -> Self {
1125 Self {
1126 builder: self.builder.property("qos", qos),
1127 ..self
1128 }
1129 }
1130
1131 pub fn render_delay(self, render_delay: Option<gst::ClockTime>) -> Self {
1132 Self {
1133 builder: self.builder.property("render-delay", render_delay),
1134 ..self
1135 }
1136 }
1137
1138 pub fn sync(self, sync: bool) -> Self {
1139 Self {
1140 builder: self.builder.property("sync", sync),
1141 ..self
1142 }
1143 }
1144
1145 pub fn throttle_time(self, throttle_time: u64) -> Self {
1146 Self {
1147 builder: self.builder.property("throttle-time", throttle_time),
1148 ..self
1149 }
1150 }
1151
1152 pub fn ts_offset(self, ts_offset: gst::ClockTimeDiff) -> Self {
1153 Self {
1154 builder: self.builder.property("ts-offset", ts_offset),
1155 ..self
1156 }
1157 }
1158
1159 pub fn wait_on_eos(self, wait_on_eos: bool) -> Self {
1160 Self {
1161 builder: self.builder.property("wait-on-eos", wait_on_eos),
1162 ..self
1163 }
1164 }
1165
/// Forwards `max_time` (which may be `None`) as the `max-time` property to
/// the wrapped object builder.
///
/// Only compiled when the `v1_24` feature is enabled. Consumes the builder
/// and returns it so further calls can be chained.
#[cfg(feature = "v1_24")]
#[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
pub fn max_time(mut self, max_time: Option<gst::ClockTime>) -> Self {
    self.builder = self.builder.property("max-time", max_time);
    self
}
1174
/// Forwards `max_bytes` as the `max-bytes` property to the wrapped object
/// builder.
///
/// Only compiled when the `v1_24` feature is enabled. Consumes the builder
/// and returns it so further calls can be chained.
#[cfg(feature = "v1_24")]
#[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
pub fn max_bytes(mut self, max_bytes: u64) -> Self {
    self.builder = self.builder.property("max-bytes", max_bytes);
    self
}
1183
1184 pub fn name(self, name: impl Into<glib::GString>) -> Self {
1185 Self {
1186 builder: self.builder.property("name", name.into()),
1187 ..self
1188 }
1189 }
1190}
1191
1192#[derive(Debug)]
1193pub struct AppSinkStream {
1194 app_sink: glib::WeakRef<AppSink>,
1195 waker_reference: Arc<Mutex<Option<Waker>>>,
1196}
1197
1198impl AppSinkStream {
1199 fn new(app_sink: &AppSink) -> Self {
1200 skip_assert_initialized!();
1201
1202 let waker_reference = Arc::new(Mutex::new(None as Option<Waker>));
1203
1204 app_sink.set_callbacks(
1205 AppSinkCallbacks::builder()
1206 .new_sample({
1207 let waker_reference = Arc::clone(&waker_reference);
1208
1209 move |_| {
1210 if let Some(waker) = waker_reference.lock().unwrap().take() {
1211 waker.wake();
1212 }
1213
1214 Ok(gst::FlowSuccess::Ok)
1215 }
1216 })
1217 .eos({
1218 let waker_reference = Arc::clone(&waker_reference);
1219
1220 move |_| {
1221 if let Some(waker) = waker_reference.lock().unwrap().take() {
1222 waker.wake();
1223 }
1224 }
1225 })
1226 .build(),
1227 );
1228
1229 Self {
1230 app_sink: app_sink.downgrade(),
1231 waker_reference,
1232 }
1233 }
1234}
1235
1236impl Drop for AppSinkStream {
1237 fn drop(&mut self) {
1238 #[cfg(not(feature = "v1_18"))]
1239 {
1240 // This is not thread-safe before 1.16.3, see
1241 // https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/merge_requests/570
1242 if gst::version() >= (1, 16, 3, 0) {
1243 if let Some(app_sink: AppSink) = self.app_sink.upgrade() {
1244 app_sink.set_callbacks(AppSinkCallbacks::builder().build());
1245 }
1246 }
1247 }
1248 }
1249}
1250
1251impl Stream for AppSinkStream {
1252 type Item = gst::Sample;
1253
1254 fn poll_next(self: Pin<&mut Self>, context: &mut Context) -> Poll<Option<Self::Item>> {
1255 let mut waker: MutexGuard<'_, Option> = self.waker_reference.lock().unwrap();
1256
1257 let Some(app_sink: AppSink) = self.app_sink.upgrade() else {
1258 return Poll::Ready(None);
1259 };
1260
1261 app_sinkOption>>
1262 .try_pull_sample(timeout:gst::ClockTime::ZERO)
1263 .map(|sample: Sample| Poll::Ready(Some(sample)))
1264 .unwrap_or_else(|| {
1265 if app_sink.is_eos() {
1266 return Poll::Ready(None);
1267 }
1268
1269 waker.replace(context.waker().to_owned());
1270
1271 Poll::Pending
1272 })
1273 }
1274}
1275
#[cfg(test)]
mod tests {
    use futures_util::StreamExt;
    use gst::prelude::*;

    use super::*;

    /// Runs a `videotestsrc` limited to five buffers into an `appsink` and
    /// checks that the sink's stream yields exactly five samples before it
    /// terminates.
    #[test]
    fn test_app_sink_stream() {
        gst::init().unwrap();

        let source = gst::ElementFactory::make("videotestsrc")
            .property("num-buffers", 5)
            .build()
            .unwrap();
        let sink = gst::ElementFactory::make("appsink").build().unwrap();

        // Assemble the two-element pipeline: source first, then sink.
        let pipeline = gst::Pipeline::new();
        for element in [&source, &sink] {
            pipeline.add(element).unwrap();
        }
        source.link(&sink).unwrap();

        // The pipeline keeps the sink alive; the stream only holds it weakly.
        let stream = sink.dynamic_cast::<AppSink>().unwrap().stream();
        let collect_all = stream.collect::<Vec<gst::Sample>>();

        pipeline.set_state(gst::State::Playing).unwrap();
        let samples = futures_executor::block_on(collect_all);
        pipeline.set_state(gst::State::Null).unwrap();

        assert_eq!(samples.len(), 5);
    }
}
1309