1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{
4 mem, panic,
5 pin::Pin,
6 ptr,
7 sync::{Arc, Mutex},
8 task::{Context, Poll, Waker},
9};
10
11#[cfg(not(panic = "abort"))]
12use std::sync::atomic::{AtomicBool, Ordering};
13
14use futures_core::Stream;
15use glib::{ffi::gpointer, prelude::*, translate::*};
16
17use crate::{ffi, AppSink};
18
19#[allow(clippy::type_complexity)]
20pub struct AppSinkCallbacks {
21 eos: Option<Box<dyn FnMut(&AppSink) + Send + 'static>>,
22 new_preroll: Option<
23 Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
24 >,
25 new_sample: Option<
26 Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
27 >,
28 new_event: Option<Box<dyn FnMut(&AppSink) -> bool + Send + 'static>>,
29 propose_allocation:
30 Option<Box<dyn FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static>>,
31 #[cfg(not(panic = "abort"))]
32 panicked: AtomicBool,
33 callbacks: ffi::GstAppSinkCallbacks,
34}
35
36unsafe impl Send for AppSinkCallbacks {}
37unsafe impl Sync for AppSinkCallbacks {}
38
39impl AppSinkCallbacks {
40 pub fn builder() -> AppSinkCallbacksBuilder {
41 skip_assert_initialized!();
42 AppSinkCallbacksBuilder {
43 eos: None,
44 new_preroll: None,
45 new_sample: None,
46 new_event: None,
47 propose_allocation: None,
48 }
49 }
50}
51
52#[allow(clippy::type_complexity)]
53#[must_use = "The builder must be built to be used"]
54pub struct AppSinkCallbacksBuilder {
55 eos: Option<Box<dyn FnMut(&AppSink) + Send + 'static>>,
56 new_preroll: Option<
57 Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
58 >,
59 new_sample: Option<
60 Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
61 >,
62 new_event: Option<Box<dyn FnMut(&AppSink) -> bool + Send + 'static>>,
63 propose_allocation:
64 Option<Box<dyn FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static>>,
65}
66
67impl AppSinkCallbacksBuilder {
68 pub fn eos<F: FnMut(&AppSink) + Send + 'static>(self, eos: F) -> Self {
69 Self {
70 eos: Some(Box::new(eos)),
71 ..self
72 }
73 }
74
75 pub fn eos_if<F: FnMut(&AppSink) + Send + 'static>(self, eos: F, predicate: bool) -> Self {
76 if predicate {
77 self.eos(eos)
78 } else {
79 self
80 }
81 }
82
83 pub fn eos_if_some<F: FnMut(&AppSink) + Send + 'static>(self, eos: Option<F>) -> Self {
84 if let Some(eos) = eos {
85 self.eos(eos)
86 } else {
87 self
88 }
89 }
90
91 pub fn new_preroll<
92 F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
93 >(
94 self,
95 new_preroll: F,
96 ) -> Self {
97 Self {
98 new_preroll: Some(Box::new(new_preroll)),
99 ..self
100 }
101 }
102
103 pub fn new_preroll_if<
104 F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
105 >(
106 self,
107 new_preroll: F,
108 predicate: bool,
109 ) -> Self {
110 if predicate {
111 self.new_preroll(new_preroll)
112 } else {
113 self
114 }
115 }
116
117 pub fn new_preroll_if_some<
118 F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
119 >(
120 self,
121 new_preroll: Option<F>,
122 ) -> Self {
123 if let Some(new_preroll) = new_preroll {
124 self.new_preroll(new_preroll)
125 } else {
126 self
127 }
128 }
129
130 pub fn new_sample<
131 F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
132 >(
133 self,
134 new_sample: F,
135 ) -> Self {
136 Self {
137 new_sample: Some(Box::new(new_sample)),
138 ..self
139 }
140 }
141
142 pub fn new_sample_if<
143 F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
144 >(
145 self,
146 new_sample: F,
147 predicate: bool,
148 ) -> Self {
149 if predicate {
150 self.new_sample(new_sample)
151 } else {
152 self
153 }
154 }
155
156 pub fn new_sample_if_some<
157 F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
158 >(
159 self,
160 new_sample: Option<F>,
161 ) -> Self {
162 if let Some(new_sample) = new_sample {
163 self.new_sample(new_sample)
164 } else {
165 self
166 }
167 }
168
169 #[cfg(feature = "v1_20")]
170 #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
171 pub fn new_event<F: FnMut(&AppSink) -> bool + Send + 'static>(self, new_event: F) -> Self {
172 Self {
173 new_event: Some(Box::new(new_event)),
174 ..self
175 }
176 }
177
178 #[cfg(feature = "v1_20")]
179 #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
180 pub fn new_event_if<F: FnMut(&AppSink) -> bool + Send + 'static>(
181 self,
182 new_event: F,
183 predicate: bool,
184 ) -> Self {
185 if predicate {
186 self.new_event(new_event)
187 } else {
188 self
189 }
190 }
191
192 #[cfg(feature = "v1_20")]
193 #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
194 pub fn new_event_if_some<F: FnMut(&AppSink) -> bool + Send + 'static>(
195 self,
196 new_event: Option<F>,
197 ) -> Self {
198 if let Some(new_event) = new_event {
199 self.new_event(new_event)
200 } else {
201 self
202 }
203 }
204
205 #[cfg(feature = "v1_24")]
206 #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
207 pub fn propose_allocation<
208 F: FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static,
209 >(
210 self,
211 propose_allocation: F,
212 ) -> Self {
213 Self {
214 propose_allocation: Some(Box::new(propose_allocation)),
215 ..self
216 }
217 }
218
219 #[cfg(feature = "v1_24")]
220 #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
221 pub fn propose_allocation_if<
222 F: FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static,
223 >(
224 self,
225 propose_allocation: F,
226 predicate: bool,
227 ) -> Self {
228 if predicate {
229 self.propose_allocation(propose_allocation)
230 } else {
231 self
232 }
233 }
234
235 #[cfg(feature = "v1_24")]
236 #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
237 pub fn propose_allocation_if_some<
238 F: FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static,
239 >(
240 self,
241 propose_allocation: Option<F>,
242 ) -> Self {
243 if let Some(propose_allocation) = propose_allocation {
244 self.propose_allocation(propose_allocation)
245 } else {
246 self
247 }
248 }
249
250 #[must_use = "Building the callbacks without using them has no effect"]
251 pub fn build(self) -> AppSinkCallbacks {
252 let have_eos = self.eos.is_some();
253 let have_new_preroll = self.new_preroll.is_some();
254 let have_new_sample = self.new_sample.is_some();
255 let have_new_event = self.new_event.is_some();
256 let have_propose_allocation = self.propose_allocation.is_some();
257
258 AppSinkCallbacks {
259 eos: self.eos,
260 new_preroll: self.new_preroll,
261 new_sample: self.new_sample,
262 new_event: self.new_event,
263 propose_allocation: self.propose_allocation,
264 #[cfg(not(panic = "abort"))]
265 panicked: AtomicBool::new(false),
266 callbacks: ffi::GstAppSinkCallbacks {
267 eos: if have_eos { Some(trampoline_eos) } else { None },
268 new_preroll: if have_new_preroll {
269 Some(trampoline_new_preroll)
270 } else {
271 None
272 },
273 new_sample: if have_new_sample {
274 Some(trampoline_new_sample)
275 } else {
276 None
277 },
278 new_event: if have_new_event {
279 Some(trampoline_new_event)
280 } else {
281 None
282 },
283 propose_allocation: if have_propose_allocation {
284 Some(trampoline_propose_allocation)
285 } else {
286 None
287 },
288 _gst_reserved: [ptr::null_mut(), ptr::null_mut()],
289 },
290 }
291 }
292}
293
294unsafe extern "C" fn trampoline_eos(appsink: *mut ffi::GstAppSink, callbacks: gpointer) {
295 let callbacks = callbacks as *mut AppSinkCallbacks;
296 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
297
298 #[cfg(not(panic = "abort"))]
299 if (*callbacks).panicked.load(Ordering::Relaxed) {
300 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
301 gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None);
302 return;
303 }
304
305 if let Some(ref mut eos) = (*callbacks).eos {
306 let result = panic::catch_unwind(panic::AssertUnwindSafe(|| eos(&element)));
307 match result {
308 Ok(result) => result,
309 Err(err) => {
310 #[cfg(panic = "abort")]
311 {
312 unreachable!("{err:?}");
313 }
314 #[cfg(not(panic = "abort"))]
315 {
316 (*callbacks).panicked.store(true, Ordering::Relaxed);
317 gst::subclass::post_panic_error_message(
318 element.upcast_ref(),
319 element.upcast_ref(),
320 Some(err),
321 );
322 }
323 }
324 }
325 }
326}
327
328unsafe extern "C" fn trampoline_new_preroll(
329 appsink: *mut ffi::GstAppSink,
330 callbacks: gpointer,
331) -> gst::ffi::GstFlowReturn {
332 let callbacks = callbacks as *mut AppSinkCallbacks;
333 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
334
335 #[cfg(not(panic = "abort"))]
336 if (*callbacks).panicked.load(Ordering::Relaxed) {
337 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
338 gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None);
339 return gst::FlowReturn::Error.into_glib();
340 }
341
342 let ret = if let Some(ref mut new_preroll) = (*callbacks).new_preroll {
343 let result = panic::catch_unwind(panic::AssertUnwindSafe(|| new_preroll(&element).into()));
344 match result {
345 Ok(result) => result,
346 Err(err) => {
347 #[cfg(panic = "abort")]
348 {
349 unreachable!("{err:?}");
350 }
351 #[cfg(not(panic = "abort"))]
352 {
353 (*callbacks).panicked.store(true, Ordering::Relaxed);
354 gst::subclass::post_panic_error_message(
355 element.upcast_ref(),
356 element.upcast_ref(),
357 Some(err),
358 );
359
360 gst::FlowReturn::Error
361 }
362 }
363 }
364 } else {
365 gst::FlowReturn::Error
366 };
367
368 ret.into_glib()
369}
370
371unsafe extern "C" fn trampoline_new_sample(
372 appsink: *mut ffi::GstAppSink,
373 callbacks: gpointer,
374) -> gst::ffi::GstFlowReturn {
375 let callbacks = callbacks as *mut AppSinkCallbacks;
376 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
377
378 #[cfg(not(panic = "abort"))]
379 if (*callbacks).panicked.load(Ordering::Relaxed) {
380 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
381 gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None);
382 return gst::FlowReturn::Error.into_glib();
383 }
384
385 let ret = if let Some(ref mut new_sample) = (*callbacks).new_sample {
386 let result = panic::catch_unwind(panic::AssertUnwindSafe(|| new_sample(&element).into()));
387 match result {
388 Ok(result) => result,
389 Err(err) => {
390 #[cfg(panic = "abort")]
391 {
392 unreachable!("{err:?}");
393 }
394 #[cfg(not(panic = "abort"))]
395 {
396 (*callbacks).panicked.store(true, Ordering::Relaxed);
397 gst::subclass::post_panic_error_message(
398 element.upcast_ref(),
399 element.upcast_ref(),
400 Some(err),
401 );
402
403 gst::FlowReturn::Error
404 }
405 }
406 }
407 } else {
408 gst::FlowReturn::Error
409 };
410
411 ret.into_glib()
412}
413
414unsafe extern "C" fn trampoline_new_event(
415 appsink: *mut ffi::GstAppSink,
416 callbacks: gpointer,
417) -> glib::ffi::gboolean {
418 let callbacks = callbacks as *mut AppSinkCallbacks;
419 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
420
421 #[cfg(not(panic = "abort"))]
422 if (*callbacks).panicked.load(Ordering::Relaxed) {
423 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
424 gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None);
425 return false.into_glib();
426 }
427
428 let ret = if let Some(ref mut new_event) = (*callbacks).new_event {
429 let result = panic::catch_unwind(panic::AssertUnwindSafe(|| new_event(&element)));
430 match result {
431 Ok(result) => result,
432 Err(err) => {
433 #[cfg(panic = "abort")]
434 {
435 unreachable!("{err:?}");
436 }
437 #[cfg(not(panic = "abort"))]
438 {
439 (*callbacks).panicked.store(true, Ordering::Relaxed);
440 gst::subclass::post_panic_error_message(
441 element.upcast_ref(),
442 element.upcast_ref(),
443 Some(err),
444 );
445
446 false
447 }
448 }
449 }
450 } else {
451 false
452 };
453
454 ret.into_glib()
455}
456
457unsafe extern "C" fn trampoline_propose_allocation(
458 appsink: *mut ffi::GstAppSink,
459 query: *mut gst::ffi::GstQuery,
460 callbacks: gpointer,
461) -> glib::ffi::gboolean {
462 let callbacks = callbacks as *mut AppSinkCallbacks;
463 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
464
465 #[cfg(not(panic = "abort"))]
466 if (*callbacks).panicked.load(Ordering::Relaxed) {
467 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
468 gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None);
469 return false.into_glib();
470 }
471
472 let ret = if let Some(ref mut propose_allocation) = (*callbacks).propose_allocation {
473 let query = match gst::QueryRef::from_mut_ptr(query).view_mut() {
474 gst::QueryViewMut::Allocation(allocation) => allocation,
475 _ => unreachable!(),
476 };
477 let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
478 propose_allocation(&element, query)
479 }));
480 match result {
481 Ok(result) => result,
482 Err(err) => {
483 #[cfg(panic = "abort")]
484 {
485 unreachable!("{err:?}");
486 }
487 #[cfg(not(panic = "abort"))]
488 {
489 (*callbacks).panicked.store(true, Ordering::Relaxed);
490 gst::subclass::post_panic_error_message(
491 element.upcast_ref(),
492 element.upcast_ref(),
493 Some(err),
494 );
495 false
496 }
497 }
498 }
499 } else {
500 false
501 };
502
503 ret.into_glib()
504}
505
506unsafe extern "C" fn destroy_callbacks(ptr: gpointer) {
507 let _ = Box::<AppSinkCallbacks>::from_raw(ptr as *mut _);
508}
509
510impl AppSink {
511 // rustdoc-stripper-ignore-next
512 /// Creates a new builder-pattern struct instance to construct [`AppSink`] objects.
513 ///
514 /// This method returns an instance of [`AppSinkBuilder`](crate::builders::AppSinkBuilder) which can be used to create [`AppSink`] objects.
515 pub fn builder() -> AppSinkBuilder {
516 assert_initialized_main_thread!();
517 AppSinkBuilder::new()
518 }
519
520 #[doc(alias = "gst_app_sink_set_callbacks")]
521 pub fn set_callbacks(&self, callbacks: AppSinkCallbacks) {
522 unsafe {
523 let sink = self.to_glib_none().0;
524
525 #[cfg(not(feature = "v1_18"))]
526 {
527 static SET_ONCE_QUARK: std::sync::OnceLock<glib::Quark> =
528 std::sync::OnceLock::new();
529
530 let set_once_quark = SET_ONCE_QUARK
531 .get_or_init(|| glib::Quark::from_str("gstreamer-rs-app-sink-callbacks"));
532
533 // This is not thread-safe before 1.16.3, see
534 // https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/merge_requests/570
535 if gst::version() < (1, 16, 3, 0) {
536 if !glib::gobject_ffi::g_object_get_qdata(
537 sink as *mut _,
538 set_once_quark.into_glib(),
539 )
540 .is_null()
541 {
542 panic!("AppSink callbacks can only be set once");
543 }
544
545 glib::gobject_ffi::g_object_set_qdata(
546 sink as *mut _,
547 set_once_quark.into_glib(),
548 1 as *mut _,
549 );
550 }
551 }
552
553 ffi::gst_app_sink_set_callbacks(
554 sink,
555 mut_override(&callbacks.callbacks),
556 Box::into_raw(Box::new(callbacks)) as *mut _,
557 Some(destroy_callbacks),
558 );
559 }
560 }
561
562 #[doc(alias = "drop-out-of-segment")]
563 pub fn drops_out_of_segment(&self) -> bool {
564 unsafe {
565 from_glib(gst_base::ffi::gst_base_sink_get_drop_out_of_segment(
566 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
567 ))
568 }
569 }
570
571 #[doc(alias = "max-bitrate")]
572 #[doc(alias = "gst_base_sink_get_max_bitrate")]
573 pub fn max_bitrate(&self) -> u64 {
574 unsafe {
575 gst_base::ffi::gst_base_sink_get_max_bitrate(
576 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
577 )
578 }
579 }
580
581 #[doc(alias = "max-lateness")]
582 #[doc(alias = "gst_base_sink_get_max_lateness")]
583 pub fn max_lateness(&self) -> i64 {
584 unsafe {
585 gst_base::ffi::gst_base_sink_get_max_lateness(
586 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
587 )
588 }
589 }
590
591 #[doc(alias = "processing-deadline")]
592 #[cfg(feature = "v1_16")]
593 #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
594 #[doc(alias = "gst_base_sink_get_processing_deadline")]
595 pub fn processing_deadline(&self) -> gst::ClockTime {
596 unsafe {
597 try_from_glib(gst_base::ffi::gst_base_sink_get_processing_deadline(
598 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
599 ))
600 .expect("undefined processing_deadline")
601 }
602 }
603
604 #[doc(alias = "render-delay")]
605 #[doc(alias = "gst_base_sink_get_render_delay")]
606 pub fn render_delay(&self) -> gst::ClockTime {
607 unsafe {
608 try_from_glib(gst_base::ffi::gst_base_sink_get_render_delay(
609 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
610 ))
611 .expect("undefined render_delay")
612 }
613 }
614
615 #[cfg(feature = "v1_18")]
616 #[cfg_attr(docsrs, doc(cfg(feature = "v1_18")))]
617 #[doc(alias = "gst_base_sink_get_stats")]
618 pub fn stats(&self) -> gst::Structure {
619 unsafe {
620 from_glib_full(gst_base::ffi::gst_base_sink_get_stats(
621 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
622 ))
623 }
624 }
625
626 #[doc(alias = "sync")]
627 pub fn is_sync(&self) -> bool {
628 unsafe {
629 from_glib(gst_base::ffi::gst_base_sink_get_sync(
630 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
631 ))
632 }
633 }
634
635 #[doc(alias = "throttle-time")]
636 #[doc(alias = "gst_base_sink_get_throttle_time")]
637 pub fn throttle_time(&self) -> u64 {
638 unsafe {
639 gst_base::ffi::gst_base_sink_get_throttle_time(
640 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
641 )
642 }
643 }
644
645 #[doc(alias = "ts-offset")]
646 #[doc(alias = "gst_base_sink_get_ts_offset")]
647 pub fn ts_offset(&self) -> gst::ClockTimeDiff {
648 unsafe {
649 gst_base::ffi::gst_base_sink_get_ts_offset(
650 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
651 )
652 }
653 }
654
655 #[doc(alias = "async")]
656 #[doc(alias = "gst_base_sink_is_async_enabled")]
657 pub fn is_async(&self) -> bool {
658 unsafe {
659 from_glib(gst_base::ffi::gst_base_sink_is_async_enabled(
660 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
661 ))
662 }
663 }
664
665 #[doc(alias = "last-sample")]
666 pub fn enables_last_sample(&self) -> bool {
667 unsafe {
668 from_glib(gst_base::ffi::gst_base_sink_is_last_sample_enabled(
669 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
670 ))
671 }
672 }
673
674 #[doc(alias = "qos")]
675 #[doc(alias = "gst_base_sink_is_qos_enabled")]
676 pub fn is_qos(&self) -> bool {
677 unsafe {
678 from_glib(gst_base::ffi::gst_base_sink_is_qos_enabled(
679 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
680 ))
681 }
682 }
683
684 #[doc(alias = "async")]
685 #[doc(alias = "gst_base_sink_set_async_enabled")]
686 pub fn set_async(&self, enabled: bool) {
687 unsafe {
688 gst_base::ffi::gst_base_sink_set_async_enabled(
689 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
690 enabled.into_glib(),
691 );
692 }
693 }
694
695 #[doc(alias = "drop-out-of-segment")]
696 #[doc(alias = "gst_base_sink_set_drop_out_of_segment")]
697 pub fn set_drop_out_of_segment(&self, drop_out_of_segment: bool) {
698 unsafe {
699 gst_base::ffi::gst_base_sink_set_drop_out_of_segment(
700 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
701 drop_out_of_segment.into_glib(),
702 );
703 }
704 }
705
706 #[doc(alias = "last-sample")]
707 pub fn set_enable_last_sample(&self, enabled: bool) {
708 unsafe {
709 gst_base::ffi::gst_base_sink_set_last_sample_enabled(
710 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
711 enabled.into_glib(),
712 );
713 }
714 }
715
716 #[doc(alias = "max-bitrate")]
717 #[doc(alias = "gst_base_sink_set_max_bitrate")]
718 pub fn set_max_bitrate(&self, max_bitrate: u64) {
719 unsafe {
720 gst_base::ffi::gst_base_sink_set_max_bitrate(
721 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
722 max_bitrate,
723 );
724 }
725 }
726
727 #[doc(alias = "max-lateness")]
728 #[doc(alias = "gst_base_sink_set_max_lateness")]
729 pub fn set_max_lateness(&self, max_lateness: i64) {
730 unsafe {
731 gst_base::ffi::gst_base_sink_set_max_lateness(
732 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
733 max_lateness,
734 );
735 }
736 }
737
738 #[doc(alias = "processing-deadline")]
739 #[cfg(feature = "v1_16")]
740 #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
741 #[doc(alias = "gst_base_sink_set_processing_deadline")]
742 pub fn set_processing_deadline(&self, processing_deadline: gst::ClockTime) {
743 unsafe {
744 gst_base::ffi::gst_base_sink_set_processing_deadline(
745 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
746 processing_deadline.into_glib(),
747 );
748 }
749 }
750
751 #[doc(alias = "qos")]
752 #[doc(alias = "gst_base_sink_set_qos_enabled")]
753 pub fn set_qos(&self, enabled: bool) {
754 unsafe {
755 gst_base::ffi::gst_base_sink_set_qos_enabled(
756 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
757 enabled.into_glib(),
758 );
759 }
760 }
761
762 #[doc(alias = "render-delay")]
763 #[doc(alias = "gst_base_sink_set_render_delay")]
764 pub fn set_render_delay(&self, delay: gst::ClockTime) {
765 unsafe {
766 gst_base::ffi::gst_base_sink_set_render_delay(
767 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
768 delay.into_glib(),
769 );
770 }
771 }
772
773 #[doc(alias = "sync")]
774 #[doc(alias = "gst_base_sink_set_sync")]
775 pub fn set_sync(&self, sync: bool) {
776 unsafe {
777 gst_base::ffi::gst_base_sink_set_sync(
778 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
779 sync.into_glib(),
780 );
781 }
782 }
783
784 #[doc(alias = "throttle-time")]
785 #[doc(alias = "gst_base_sink_set_throttle_time")]
786 pub fn set_throttle_time(&self, throttle: u64) {
787 unsafe {
788 gst_base::ffi::gst_base_sink_set_throttle_time(
789 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
790 throttle,
791 );
792 }
793 }
794
795 #[doc(alias = "ts-offset")]
796 #[doc(alias = "gst_base_sink_set_ts_offset")]
797 pub fn set_ts_offset(&self, offset: gst::ClockTimeDiff) {
798 unsafe {
799 gst_base::ffi::gst_base_sink_set_ts_offset(
800 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
801 offset,
802 );
803 }
804 }
805
806 #[doc(alias = "async")]
807 pub fn connect_async_notify<F: Fn(&Self) + Send + Sync + 'static>(
808 &self,
809 f: F,
810 ) -> glib::SignalHandlerId {
811 unsafe extern "C" fn notify_async_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
812 this: *mut ffi::GstAppSink,
813 _param_spec: glib::ffi::gpointer,
814 f: glib::ffi::gpointer,
815 ) {
816 let f: &F = &*(f as *const F);
817 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
818 }
819 unsafe {
820 let f: Box<F> = Box::new(f);
821 glib::signal::connect_raw(
822 self.as_ptr() as *mut _,
823 b"notify::async\0".as_ptr() as *const _,
824 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
825 notify_async_trampoline::<F> as *const (),
826 )),
827 Box::into_raw(f),
828 )
829 }
830 }
831
832 #[doc(alias = "blocksize")]
833 pub fn connect_blocksize_notify<F: Fn(&Self) + Send + Sync + 'static>(
834 &self,
835 f: F,
836 ) -> glib::SignalHandlerId {
837 unsafe extern "C" fn notify_blocksize_trampoline<
838 F: Fn(&AppSink) + Send + Sync + 'static,
839 >(
840 this: *mut ffi::GstAppSink,
841 _param_spec: glib::ffi::gpointer,
842 f: glib::ffi::gpointer,
843 ) {
844 let f: &F = &*(f as *const F);
845 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
846 }
847 unsafe {
848 let f: Box<F> = Box::new(f);
849 glib::signal::connect_raw(
850 self.as_ptr() as *mut _,
851 b"notify::blocksize\0".as_ptr() as *const _,
852 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
853 notify_blocksize_trampoline::<F> as *const (),
854 )),
855 Box::into_raw(f),
856 )
857 }
858 }
859
860 #[doc(alias = "enable-last-sample")]
861 pub fn connect_enable_last_sample_notify<F: Fn(&Self) + Send + Sync + 'static>(
862 &self,
863 f: F,
864 ) -> glib::SignalHandlerId {
865 unsafe extern "C" fn notify_enable_last_sample_trampoline<
866 F: Fn(&AppSink) + Send + Sync + 'static,
867 >(
868 this: *mut ffi::GstAppSink,
869 _param_spec: glib::ffi::gpointer,
870 f: glib::ffi::gpointer,
871 ) {
872 let f: &F = &*(f as *const F);
873 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
874 }
875 unsafe {
876 let f: Box<F> = Box::new(f);
877 glib::signal::connect_raw(
878 self.as_ptr() as *mut _,
879 b"notify::enable-last-sample\0".as_ptr() as *const _,
880 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
881 notify_enable_last_sample_trampoline::<F> as *const (),
882 )),
883 Box::into_raw(f),
884 )
885 }
886 }
887
888 #[doc(alias = "last-sample")]
889 pub fn connect_last_sample_notify<F: Fn(&Self) + Send + Sync + 'static>(
890 &self,
891 f: F,
892 ) -> glib::SignalHandlerId {
893 unsafe extern "C" fn notify_last_sample_trampoline<
894 F: Fn(&AppSink) + Send + Sync + 'static,
895 >(
896 this: *mut ffi::GstAppSink,
897 _param_spec: glib::ffi::gpointer,
898 f: glib::ffi::gpointer,
899 ) {
900 let f: &F = &*(f as *const F);
901 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
902 }
903 unsafe {
904 let f: Box<F> = Box::new(f);
905 glib::signal::connect_raw(
906 self.as_ptr() as *mut _,
907 b"notify::last-sample\0".as_ptr() as *const _,
908 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
909 notify_last_sample_trampoline::<F> as *const (),
910 )),
911 Box::into_raw(f),
912 )
913 }
914 }
915
916 #[doc(alias = "max-bitrate")]
917 pub fn connect_max_bitrate_notify<F: Fn(&Self) + Send + Sync + 'static>(
918 &self,
919 f: F,
920 ) -> glib::SignalHandlerId {
921 unsafe extern "C" fn notify_max_bitrate_trampoline<
922 F: Fn(&AppSink) + Send + Sync + 'static,
923 >(
924 this: *mut ffi::GstAppSink,
925 _param_spec: glib::ffi::gpointer,
926 f: glib::ffi::gpointer,
927 ) {
928 let f: &F = &*(f as *const F);
929 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
930 }
931 unsafe {
932 let f: Box<F> = Box::new(f);
933 glib::signal::connect_raw(
934 self.as_ptr() as *mut _,
935 b"notify::max-bitrate\0".as_ptr() as *const _,
936 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
937 notify_max_bitrate_trampoline::<F> as *const (),
938 )),
939 Box::into_raw(f),
940 )
941 }
942 }
943
944 #[doc(alias = "max-lateness")]
945 pub fn connect_max_lateness_notify<F: Fn(&Self) + Send + Sync + 'static>(
946 &self,
947 f: F,
948 ) -> glib::SignalHandlerId {
949 unsafe extern "C" fn notify_max_lateness_trampoline<
950 F: Fn(&AppSink) + Send + Sync + 'static,
951 >(
952 this: *mut ffi::GstAppSink,
953 _param_spec: glib::ffi::gpointer,
954 f: glib::ffi::gpointer,
955 ) {
956 let f: &F = &*(f as *const F);
957 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
958 }
959 unsafe {
960 let f: Box<F> = Box::new(f);
961 glib::signal::connect_raw(
962 self.as_ptr() as *mut _,
963 b"notify::max-lateness\0".as_ptr() as *const _,
964 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
965 notify_max_lateness_trampoline::<F> as *const (),
966 )),
967 Box::into_raw(f),
968 )
969 }
970 }
971
972 #[cfg(feature = "v1_16")]
973 #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
974 #[doc(alias = "processing-deadline")]
975 pub fn connect_processing_deadline_notify<F: Fn(&Self) + Send + Sync + 'static>(
976 &self,
977 f: F,
978 ) -> glib::SignalHandlerId {
979 unsafe extern "C" fn notify_processing_deadline_trampoline<
980 F: Fn(&AppSink) + Send + Sync + 'static,
981 >(
982 this: *mut ffi::GstAppSink,
983 _param_spec: glib::ffi::gpointer,
984 f: glib::ffi::gpointer,
985 ) {
986 let f: &F = &*(f as *const F);
987 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
988 }
989 unsafe {
990 let f: Box<F> = Box::new(f);
991 glib::signal::connect_raw(
992 self.as_ptr() as *mut _,
993 b"notify::processing-deadline\0".as_ptr() as *const _,
994 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
995 notify_processing_deadline_trampoline::<F> as *const (),
996 )),
997 Box::into_raw(f),
998 )
999 }
1000 }
1001
1002 #[doc(alias = "qos")]
1003 pub fn connect_qos_notify<F: Fn(&Self) + Send + Sync + 'static>(
1004 &self,
1005 f: F,
1006 ) -> glib::SignalHandlerId {
1007 unsafe extern "C" fn notify_qos_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
1008 this: *mut ffi::GstAppSink,
1009 _param_spec: glib::ffi::gpointer,
1010 f: glib::ffi::gpointer,
1011 ) {
1012 let f: &F = &*(f as *const F);
1013 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1014 }
1015 unsafe {
1016 let f: Box<F> = Box::new(f);
1017 glib::signal::connect_raw(
1018 self.as_ptr() as *mut _,
1019 b"notify::qos\0".as_ptr() as *const _,
1020 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1021 notify_qos_trampoline::<F> as *const (),
1022 )),
1023 Box::into_raw(f),
1024 )
1025 }
1026 }
1027
1028 #[doc(alias = "render-delay")]
1029 pub fn connect_render_delay_notify<F: Fn(&Self) + Send + Sync + 'static>(
1030 &self,
1031 f: F,
1032 ) -> glib::SignalHandlerId {
1033 unsafe extern "C" fn notify_render_delay_trampoline<
1034 F: Fn(&AppSink) + Send + Sync + 'static,
1035 >(
1036 this: *mut ffi::GstAppSink,
1037 _param_spec: glib::ffi::gpointer,
1038 f: glib::ffi::gpointer,
1039 ) {
1040 let f: &F = &*(f as *const F);
1041 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1042 }
1043 unsafe {
1044 let f: Box<F> = Box::new(f);
1045 glib::signal::connect_raw(
1046 self.as_ptr() as *mut _,
1047 b"notify::render-delay\0".as_ptr() as *const _,
1048 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1049 notify_render_delay_trampoline::<F> as *const (),
1050 )),
1051 Box::into_raw(f),
1052 )
1053 }
1054 }
1055
1056 #[cfg(feature = "v1_18")]
1057 #[cfg_attr(docsrs, doc(cfg(feature = "v1_18")))]
1058 #[doc(alias = "stats")]
1059 pub fn connect_stats_notify<F: Fn(&Self) + Send + Sync + 'static>(
1060 &self,
1061 f: F,
1062 ) -> glib::SignalHandlerId {
1063 unsafe extern "C" fn notify_stats_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
1064 this: *mut ffi::GstAppSink,
1065 _param_spec: glib::ffi::gpointer,
1066 f: glib::ffi::gpointer,
1067 ) {
1068 let f: &F = &*(f as *const F);
1069 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1070 }
1071 unsafe {
1072 let f: Box<F> = Box::new(f);
1073 glib::signal::connect_raw(
1074 self.as_ptr() as *mut _,
1075 b"notify::stats\0".as_ptr() as *const _,
1076 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1077 notify_stats_trampoline::<F> as *const (),
1078 )),
1079 Box::into_raw(f),
1080 )
1081 }
1082 }
1083
1084 #[doc(alias = "sync")]
1085 pub fn connect_sync_notify<F: Fn(&Self) + Send + Sync + 'static>(
1086 &self,
1087 f: F,
1088 ) -> glib::SignalHandlerId {
1089 unsafe extern "C" fn notify_sync_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
1090 this: *mut ffi::GstAppSink,
1091 _param_spec: glib::ffi::gpointer,
1092 f: glib::ffi::gpointer,
1093 ) {
1094 let f: &F = &*(f as *const F);
1095 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1096 }
1097 unsafe {
1098 let f: Box<F> = Box::new(f);
1099 glib::signal::connect_raw(
1100 self.as_ptr() as *mut _,
1101 b"notify::sync\0".as_ptr() as *const _,
1102 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1103 notify_sync_trampoline::<F> as *const (),
1104 )),
1105 Box::into_raw(f),
1106 )
1107 }
1108 }
1109
1110 #[doc(alias = "throttle-time")]
1111 pub fn connect_throttle_time_notify<F: Fn(&Self) + Send + Sync + 'static>(
1112 &self,
1113 f: F,
1114 ) -> glib::SignalHandlerId {
1115 unsafe extern "C" fn notify_throttle_time_trampoline<
1116 F: Fn(&AppSink) + Send + Sync + 'static,
1117 >(
1118 this: *mut ffi::GstAppSink,
1119 _param_spec: glib::ffi::gpointer,
1120 f: glib::ffi::gpointer,
1121 ) {
1122 let f: &F = &*(f as *const F);
1123 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1124 }
1125 unsafe {
1126 let f: Box<F> = Box::new(f);
1127 glib::signal::connect_raw(
1128 self.as_ptr() as *mut _,
1129 b"notify::throttle-time\0".as_ptr() as *const _,
1130 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1131 notify_throttle_time_trampoline::<F> as *const (),
1132 )),
1133 Box::into_raw(f),
1134 )
1135 }
1136 }
1137
1138 #[doc(alias = "ts-offset")]
1139 pub fn connect_ts_offset_notify<F: Fn(&Self) + Send + Sync + 'static>(
1140 &self,
1141 f: F,
1142 ) -> glib::SignalHandlerId {
1143 unsafe extern "C" fn notify_ts_offset_trampoline<
1144 F: Fn(&AppSink) + Send + Sync + 'static,
1145 >(
1146 this: *mut ffi::GstAppSink,
1147 _param_spec: glib::ffi::gpointer,
1148 f: glib::ffi::gpointer,
1149 ) {
1150 let f: &F = &*(f as *const F);
1151 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1152 }
1153 unsafe {
1154 let f: Box<F> = Box::new(f);
1155 glib::signal::connect_raw(
1156 self.as_ptr() as *mut _,
1157 b"notify::ts-offset\0".as_ptr() as *const _,
1158 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1159 notify_ts_offset_trampoline::<F> as *const (),
1160 )),
1161 Box::into_raw(f),
1162 )
1163 }
1164 }
1165
1166 pub fn stream(&self) -> AppSinkStream {
1167 AppSinkStream::new(self)
1168 }
1169}
1170
1171// rustdoc-stripper-ignore-next
1172/// A [builder-pattern] type to construct [`AppSink`] objects.
1173///
1174/// [builder-pattern]: https://doc.rust-lang.org/1.0.0/style/ownership/builders.html
1175#[must_use = "The builder must be built to be used"]
1176pub struct AppSinkBuilder {
1177 builder: glib::object::ObjectBuilder<'static, AppSink>,
1178 callbacks: Option<AppSinkCallbacks>,
1179 drop_out_of_segment: Option<bool>,
1180}
1181
1182impl AppSinkBuilder {
1183 fn new() -> Self {
1184 Self {
1185 builder: glib::Object::builder(),
1186 callbacks: None,
1187 drop_out_of_segment: None,
1188 }
1189 }
1190
1191 // rustdoc-stripper-ignore-next
1192 /// Build the [`AppSink`].
1193 #[must_use = "Building the object from the builder is usually expensive and is not expected to have side effects"]
1194 pub fn build(self) -> AppSink {
1195 let appsink = self.builder.build();
1196
1197 if let Some(callbacks) = self.callbacks {
1198 appsink.set_callbacks(callbacks);
1199 }
1200
1201 if let Some(drop_out_of_segment) = self.drop_out_of_segment {
1202 appsink.set_drop_out_of_segment(drop_out_of_segment);
1203 }
1204
1205 appsink
1206 }
1207
1208 pub fn async_(self, async_: bool) -> Self {
1209 Self {
1210 builder: self.builder.property("async", async_),
1211 ..self
1212 }
1213 }
1214
1215 pub fn buffer_list(self, buffer_list: bool) -> Self {
1216 Self {
1217 builder: self.builder.property("buffer-list", buffer_list),
1218 ..self
1219 }
1220 }
1221
1222 pub fn callbacks(self, callbacks: AppSinkCallbacks) -> Self {
1223 Self {
1224 callbacks: Some(callbacks),
1225 ..self
1226 }
1227 }
1228
1229 pub fn caps(self, caps: &gst::Caps) -> Self {
1230 Self {
1231 builder: self.builder.property("caps", caps),
1232 ..self
1233 }
1234 }
1235
1236 pub fn drop(self, drop: bool) -> Self {
1237 Self {
1238 builder: self.builder.property("drop", drop),
1239 ..self
1240 }
1241 }
1242
1243 pub fn drop_out_of_segment(self, drop_out_of_segment: bool) -> Self {
1244 Self {
1245 builder: self
1246 .builder
1247 .property("drop-out-of-segment", drop_out_of_segment),
1248 ..self
1249 }
1250 }
1251
1252 pub fn enable_last_sample(self, enable_last_sample: bool) -> Self {
1253 Self {
1254 builder: self
1255 .builder
1256 .property("enable-last-sample", enable_last_sample),
1257 ..self
1258 }
1259 }
1260
1261 pub fn max_bitrate(self, max_bitrate: u64) -> Self {
1262 Self {
1263 builder: self.builder.property("max-bitrate", max_bitrate),
1264 ..self
1265 }
1266 }
1267
1268 pub fn max_buffers(self, max_buffers: u32) -> Self {
1269 Self {
1270 builder: self.builder.property("max-buffers", max_buffers),
1271 ..self
1272 }
1273 }
1274
1275 pub fn max_lateness(self, max_lateness: i64) -> Self {
1276 Self {
1277 builder: self.builder.property("max-lateness", max_lateness),
1278 ..self
1279 }
1280 }
1281
1282 #[cfg(feature = "v1_16")]
1283 #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
1284 pub fn processing_deadline(self, processing_deadline: gst::ClockTime) -> Self {
1285 Self {
1286 builder: self
1287 .builder
1288 .property("processing-deadline", processing_deadline),
1289 ..self
1290 }
1291 }
1292
1293 pub fn qos(self, qos: bool) -> Self {
1294 Self {
1295 builder: self.builder.property("qos", qos),
1296 ..self
1297 }
1298 }
1299
1300 pub fn render_delay(self, render_delay: Option<gst::ClockTime>) -> Self {
1301 Self {
1302 builder: self.builder.property("render-delay", render_delay),
1303 ..self
1304 }
1305 }
1306
1307 pub fn sync(self, sync: bool) -> Self {
1308 Self {
1309 builder: self.builder.property("sync", sync),
1310 ..self
1311 }
1312 }
1313
1314 pub fn throttle_time(self, throttle_time: u64) -> Self {
1315 Self {
1316 builder: self.builder.property("throttle-time", throttle_time),
1317 ..self
1318 }
1319 }
1320
1321 pub fn ts_offset(self, ts_offset: gst::ClockTimeDiff) -> Self {
1322 Self {
1323 builder: self.builder.property("ts-offset", ts_offset),
1324 ..self
1325 }
1326 }
1327
1328 pub fn wait_on_eos(self, wait_on_eos: bool) -> Self {
1329 Self {
1330 builder: self.builder.property("wait-on-eos", wait_on_eos),
1331 ..self
1332 }
1333 }
1334
1335 #[cfg(feature = "v1_24")]
1336 #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
1337 pub fn max_time(self, max_time: Option<gst::ClockTime>) -> Self {
1338 Self {
1339 builder: self.builder.property("max-time", max_time),
1340 ..self
1341 }
1342 }
1343
1344 #[cfg(feature = "v1_24")]
1345 #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
1346 pub fn max_bytes(self, max_bytes: u64) -> Self {
1347 Self {
1348 builder: self.builder.property("max-bytes", max_bytes),
1349 ..self
1350 }
1351 }
1352
1353 pub fn name(self, name: impl Into<glib::GString>) -> Self {
1354 Self {
1355 builder: self.builder.property("name", name.into()),
1356 ..self
1357 }
1358 }
1359}
1360
1361#[derive(Debug)]
1362pub struct AppSinkStream {
1363 app_sink: glib::WeakRef<AppSink>,
1364 waker_reference: Arc<Mutex<Option<Waker>>>,
1365}
1366
1367impl AppSinkStream {
1368 fn new(app_sink: &AppSink) -> Self {
1369 skip_assert_initialized!();
1370
1371 let waker_reference = Arc::new(Mutex::new(None as Option<Waker>));
1372
1373 app_sink.set_callbacks(
1374 AppSinkCallbacks::builder()
1375 .new_sample({
1376 let waker_reference = Arc::clone(&waker_reference);
1377
1378 move |_| {
1379 if let Some(waker) = waker_reference.lock().unwrap().take() {
1380 waker.wake();
1381 }
1382
1383 Ok(gst::FlowSuccess::Ok)
1384 }
1385 })
1386 .eos({
1387 let waker_reference = Arc::clone(&waker_reference);
1388
1389 move |_| {
1390 if let Some(waker) = waker_reference.lock().unwrap().take() {
1391 waker.wake();
1392 }
1393 }
1394 })
1395 .build(),
1396 );
1397
1398 Self {
1399 app_sink: app_sink.downgrade(),
1400 waker_reference,
1401 }
1402 }
1403}
1404
1405impl Drop for AppSinkStream {
1406 fn drop(&mut self) {
1407 #[cfg(not(feature = "v1_18"))]
1408 {
1409 // This is not thread-safe before 1.16.3, see
1410 // https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/merge_requests/570
1411 if gst::version() >= (1, 16, 3, 0) {
1412 if let Some(app_sink: AppSink) = self.app_sink.upgrade() {
1413 app_sink.set_callbacks(AppSinkCallbacks::builder().build());
1414 }
1415 }
1416 }
1417 }
1418}
1419
1420impl Stream for AppSinkStream {
1421 type Item = gst::Sample;
1422
1423 fn poll_next(self: Pin<&mut Self>, context: &mut Context) -> Poll<Option<Self::Item>> {
1424 let mut waker: MutexGuard<'_, Option> = self.waker_reference.lock().unwrap();
1425
1426 let Some(app_sink: AppSink) = self.app_sink.upgrade() else {
1427 return Poll::Ready(None);
1428 };
1429
1430 app_sinkOption>>
1431 .try_pull_sample(timeout:gst::ClockTime::ZERO)
1432 .map(|sample: Sample| Poll::Ready(Some(sample)))
1433 .unwrap_or_else(|| {
1434 if app_sink.is_eos() {
1435 return Poll::Ready(None);
1436 }
1437
1438 waker.replace(context.waker().to_owned());
1439
1440 Poll::Pending
1441 })
1442 }
1443}
1444
1445#[cfg(test)]
1446mod tests {
1447 use futures_util::StreamExt;
1448 use gst::prelude::*;
1449
1450 use super::*;
1451
1452 #[test]
1453 fn test_app_sink_stream() {
1454 gst::init().unwrap();
1455
1456 let videotestsrc = gst::ElementFactory::make("videotestsrc")
1457 .property("num-buffers", 5)
1458 .build()
1459 .unwrap();
1460 let appsink = gst::ElementFactory::make("appsink").build().unwrap();
1461
1462 let pipeline = gst::Pipeline::new();
1463 pipeline.add(&videotestsrc).unwrap();
1464 pipeline.add(&appsink).unwrap();
1465
1466 videotestsrc.link(&appsink).unwrap();
1467
1468 let app_sink_stream = appsink.dynamic_cast::<AppSink>().unwrap().stream();
1469 let samples_future = app_sink_stream.collect::<Vec<gst::Sample>>();
1470
1471 pipeline.set_state(gst::State::Playing).unwrap();
1472 let samples = futures_executor::block_on(samples_future);
1473 pipeline.set_state(gst::State::Null).unwrap();
1474
1475 assert_eq!(samples.len(), 5);
1476 }
1477}
1478