1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{
4 mem, panic,
5 pin::Pin,
6 ptr,
7 sync::{
8 atomic::{AtomicBool, Ordering},
9 Arc, Mutex,
10 },
11 task::{Context, Poll, Waker},
12};
13
14use futures_sink::Sink;
15use glib::{
16 ffi::{gboolean, gpointer},
17 prelude::*,
18 translate::*,
19};
20
21use crate::AppSrc;
22
23#[allow(clippy::type_complexity)]
24pub struct AppSrcCallbacks {
25 need_data: Option<Box<dyn FnMut(&AppSrc, u32) + Send + 'static>>,
26 enough_data: Option<Box<dyn Fn(&AppSrc) + Send + Sync + 'static>>,
27 seek_data: Option<Box<dyn Fn(&AppSrc, u64) -> bool + Send + Sync + 'static>>,
28 panicked: AtomicBool,
29 callbacks: ffi::GstAppSrcCallbacks,
30}
31
32unsafe impl Send for AppSrcCallbacks {}
33unsafe impl Sync for AppSrcCallbacks {}
34
35impl AppSrcCallbacks {
36 pub fn builder() -> AppSrcCallbacksBuilder {
37 skip_assert_initialized!();
38
39 AppSrcCallbacksBuilder {
40 need_data: None,
41 enough_data: None,
42 seek_data: None,
43 }
44 }
45}
46
47#[allow(clippy::type_complexity)]
48#[must_use = "The builder must be built to be used"]
49pub struct AppSrcCallbacksBuilder {
50 need_data: Option<Box<dyn FnMut(&AppSrc, u32) + Send + 'static>>,
51 enough_data: Option<Box<dyn Fn(&AppSrc) + Send + Sync + 'static>>,
52 seek_data: Option<Box<dyn Fn(&AppSrc, u64) -> bool + Send + Sync + 'static>>,
53}
54
55impl AppSrcCallbacksBuilder {
56 pub fn need_data<F: FnMut(&AppSrc, u32) + Send + 'static>(self, need_data: F) -> Self {
57 Self {
58 need_data: Some(Box::new(need_data)),
59 ..self
60 }
61 }
62
63 pub fn enough_data<F: Fn(&AppSrc) + Send + Sync + 'static>(self, enough_data: F) -> Self {
64 Self {
65 enough_data: Some(Box::new(enough_data)),
66 ..self
67 }
68 }
69
70 pub fn seek_data<F: Fn(&AppSrc, u64) -> bool + Send + Sync + 'static>(
71 self,
72 seek_data: F,
73 ) -> Self {
74 Self {
75 seek_data: Some(Box::new(seek_data)),
76 ..self
77 }
78 }
79
80 #[must_use = "Building the callbacks without using them has no effect"]
81 pub fn build(self) -> AppSrcCallbacks {
82 let have_need_data = self.need_data.is_some();
83 let have_enough_data = self.enough_data.is_some();
84 let have_seek_data = self.seek_data.is_some();
85
86 AppSrcCallbacks {
87 need_data: self.need_data,
88 enough_data: self.enough_data,
89 seek_data: self.seek_data,
90 panicked: AtomicBool::new(false),
91 callbacks: ffi::GstAppSrcCallbacks {
92 need_data: if have_need_data {
93 Some(trampoline_need_data)
94 } else {
95 None
96 },
97 enough_data: if have_enough_data {
98 Some(trampoline_enough_data)
99 } else {
100 None
101 },
102 seek_data: if have_seek_data {
103 Some(trampoline_seek_data)
104 } else {
105 None
106 },
107 _gst_reserved: [
108 ptr::null_mut(),
109 ptr::null_mut(),
110 ptr::null_mut(),
111 ptr::null_mut(),
112 ],
113 },
114 }
115 }
116}
117
118unsafe extern "C" fn trampoline_need_data(
119 appsrc: *mut ffi::GstAppSrc,
120 length: u32,
121 callbacks: gpointer,
122) {
123 let callbacks = callbacks as *mut AppSrcCallbacks;
124 let element: Borrowed<AppSrc> = from_glib_borrow(appsrc);
125
126 if (*callbacks).panicked.load(Ordering::Relaxed) {
127 let element: Borrowed<AppSrc> = from_glib_borrow(appsrc);
128 gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None);
129 return;
130 }
131
132 if let Some(ref mut need_data) = (*callbacks).need_data {
133 let result = panic::catch_unwind(panic::AssertUnwindSafe(|| need_data(&element, length)));
134 match result {
135 Ok(result) => result,
136 Err(err) => {
137 (*callbacks).panicked.store(true, Ordering::Relaxed);
138 gst::subclass::post_panic_error_message(
139 element.upcast_ref(),
140 element.upcast_ref(),
141 Some(err),
142 );
143 }
144 }
145 }
146}
147
148unsafe extern "C" fn trampoline_enough_data(appsrc: *mut ffi::GstAppSrc, callbacks: gpointer) {
149 let callbacks = callbacks as *const AppSrcCallbacks;
150 let element: Borrowed<AppSrc> = from_glib_borrow(appsrc);
151
152 if (*callbacks).panicked.load(Ordering::Relaxed) {
153 let element: Borrowed<AppSrc> = from_glib_borrow(appsrc);
154 gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None);
155 return;
156 }
157
158 if let Some(ref enough_data) = (*callbacks).enough_data {
159 let result = panic::catch_unwind(panic::AssertUnwindSafe(|| enough_data(&element)));
160 match result {
161 Ok(result) => result,
162 Err(err) => {
163 (*callbacks).panicked.store(true, Ordering::Relaxed);
164 gst::subclass::post_panic_error_message(
165 element.upcast_ref(),
166 element.upcast_ref(),
167 Some(err),
168 );
169 }
170 }
171 }
172}
173
174unsafe extern "C" fn trampoline_seek_data(
175 appsrc: *mut ffi::GstAppSrc,
176 offset: u64,
177 callbacks: gpointer,
178) -> gboolean {
179 let callbacks = callbacks as *const AppSrcCallbacks;
180 let element: Borrowed<AppSrc> = from_glib_borrow(appsrc);
181
182 if (*callbacks).panicked.load(Ordering::Relaxed) {
183 let element: Borrowed<AppSrc> = from_glib_borrow(appsrc);
184 gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None);
185 return false.into_glib();
186 }
187
188 let ret = if let Some(ref seek_data) = (*callbacks).seek_data {
189 let result = panic::catch_unwind(panic::AssertUnwindSafe(|| seek_data(&element, offset)));
190 match result {
191 Ok(result) => result,
192 Err(err) => {
193 (*callbacks).panicked.store(true, Ordering::Relaxed);
194 gst::subclass::post_panic_error_message(
195 element.upcast_ref(),
196 element.upcast_ref(),
197 Some(err),
198 );
199
200 false
201 }
202 }
203 } else {
204 false
205 };
206
207 ret.into_glib()
208}
209
210unsafe extern "C" fn destroy_callbacks(ptr: gpointer) {
211 let _ = Box::<AppSrcCallbacks>::from_raw(ptr as *mut _);
212}
213
214impl AppSrc {
215 // rustdoc-stripper-ignore-next
216 /// Creates a new builder-pattern struct instance to construct [`AppSrc`] objects.
217 ///
218 /// This method returns an instance of [`AppSrcBuilder`](crate::builders::AppSrcBuilder) which can be used to create [`AppSrc`] objects.
219 pub fn builder() -> AppSrcBuilder {
220 assert_initialized_main_thread!();
221 AppSrcBuilder::new()
222 }
223
224 #[doc(alias = "gst_app_src_set_callbacks")]
225 pub fn set_callbacks(&self, callbacks: AppSrcCallbacks) {
226 #[cfg(not(feature = "v1_18"))]
227 use glib::once_cell::sync::Lazy;
228 #[cfg(not(feature = "v1_18"))]
229 static SET_ONCE_QUARK: Lazy<glib::Quark> =
230 Lazy::new(|| glib::Quark::from_str("gstreamer-rs-app-src-callbacks"));
231
232 unsafe {
233 let src = self.to_glib_none().0;
234 #[cfg(not(feature = "v1_18"))]
235 {
236 // This is not thread-safe before 1.16.3, see
237 // https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/merge_requests/570
238 if gst::version() < (1, 16, 3, 0) {
239 if !glib::gobject_ffi::g_object_get_qdata(
240 src as *mut _,
241 SET_ONCE_QUARK.into_glib(),
242 )
243 .is_null()
244 {
245 panic!("AppSrc callbacks can only be set once");
246 }
247
248 glib::gobject_ffi::g_object_set_qdata(
249 src as *mut _,
250 SET_ONCE_QUARK.into_glib(),
251 1 as *mut _,
252 );
253 }
254 }
255
256 ffi::gst_app_src_set_callbacks(
257 src,
258 mut_override(&callbacks.callbacks),
259 Box::into_raw(Box::new(callbacks)) as *mut _,
260 Some(destroy_callbacks),
261 );
262 }
263 }
264
265 #[doc(alias = "gst_app_src_set_latency")]
266 pub fn set_latency(
267 &self,
268 min: impl Into<Option<gst::ClockTime>>,
269 max: impl Into<Option<gst::ClockTime>>,
270 ) {
271 unsafe {
272 ffi::gst_app_src_set_latency(
273 self.to_glib_none().0,
274 min.into().into_glib(),
275 max.into().into_glib(),
276 );
277 }
278 }
279
280 #[doc(alias = "get_latency")]
281 #[doc(alias = "gst_app_src_get_latency")]
282 pub fn latency(&self) -> (Option<gst::ClockTime>, Option<gst::ClockTime>) {
283 unsafe {
284 let mut min = mem::MaybeUninit::uninit();
285 let mut max = mem::MaybeUninit::uninit();
286 ffi::gst_app_src_get_latency(self.to_glib_none().0, min.as_mut_ptr(), max.as_mut_ptr());
287 (from_glib(min.assume_init()), from_glib(max.assume_init()))
288 }
289 }
290
291 #[doc(alias = "do-timestamp")]
292 #[doc(alias = "gst_base_src_set_do_timestamp")]
293 pub fn set_do_timestamp(&self, timestamp: bool) {
294 unsafe {
295 gst_base::ffi::gst_base_src_set_do_timestamp(
296 self.as_ptr() as *mut gst_base::ffi::GstBaseSrc,
297 timestamp.into_glib(),
298 );
299 }
300 }
301
302 #[doc(alias = "do-timestamp")]
303 #[doc(alias = "gst_base_src_get_do_timestamp")]
304 pub fn do_timestamp(&self) -> bool {
305 unsafe {
306 from_glib(gst_base::ffi::gst_base_src_get_do_timestamp(
307 self.as_ptr() as *mut gst_base::ffi::GstBaseSrc
308 ))
309 }
310 }
311
312 #[doc(alias = "do-timestamp")]
313 pub fn connect_do_timestamp_notify<F: Fn(&Self) + Send + Sync + 'static>(
314 &self,
315 f: F,
316 ) -> glib::SignalHandlerId {
317 unsafe extern "C" fn notify_do_timestamp_trampoline<
318 F: Fn(&AppSrc) + Send + Sync + 'static,
319 >(
320 this: *mut ffi::GstAppSrc,
321 _param_spec: glib::ffi::gpointer,
322 f: glib::ffi::gpointer,
323 ) {
324 let f: &F = &*(f as *const F);
325 f(&AppSrc::from_glib_borrow(this))
326 }
327 unsafe {
328 let f: Box<F> = Box::new(f);
329 glib::signal::connect_raw(
330 self.as_ptr() as *mut _,
331 b"notify::do-timestamp\0".as_ptr() as *const _,
332 Some(mem::transmute::<_, unsafe extern "C" fn()>(
333 notify_do_timestamp_trampoline::<F> as *const (),
334 )),
335 Box::into_raw(f),
336 )
337 }
338 }
339
340 #[doc(alias = "set-automatic-eos")]
341 #[doc(alias = "gst_base_src_set_automatic_eos")]
342 pub fn set_automatic_eos(&self, automatic_eos: bool) {
343 unsafe {
344 gst_base::ffi::gst_base_src_set_automatic_eos(
345 self.as_ptr() as *mut gst_base::ffi::GstBaseSrc,
346 automatic_eos.into_glib(),
347 );
348 }
349 }
350
351 pub fn sink(&self) -> AppSrcSink {
352 AppSrcSink::new(self)
353 }
354}
355
356// rustdoc-stripper-ignore-next
357/// A [builder-pattern] type to construct [`AppSrc`] objects.
358///
359/// [builder-pattern]: https://doc.rust-lang.org/1.0.0/style/ownership/builders.html
360#[must_use = "The builder must be built to be used"]
361pub struct AppSrcBuilder {
362 builder: glib::object::ObjectBuilder<'static, AppSrc>,
363 callbacks: Option<AppSrcCallbacks>,
364 automatic_eos: Option<bool>,
365}
366
367impl AppSrcBuilder {
368 fn new() -> Self {
369 Self {
370 builder: glib::Object::builder(),
371 callbacks: None,
372 automatic_eos: None,
373 }
374 }
375
376 // rustdoc-stripper-ignore-next
377 /// Build the [`AppSrc`].
378 #[must_use = "Building the object from the builder is usually expensive and is not expected to have side effects"]
379 pub fn build(self) -> AppSrc {
380 let appsrc = self.builder.build();
381
382 if let Some(callbacks) = self.callbacks {
383 appsrc.set_callbacks(callbacks);
384 }
385
386 if let Some(automatic_eos) = self.automatic_eos {
387 appsrc.set_automatic_eos(automatic_eos);
388 }
389
390 appsrc
391 }
392
393 pub fn automatic_eos(self, automatic_eos: bool) -> Self {
394 Self {
395 automatic_eos: Some(automatic_eos),
396 ..self
397 }
398 }
399
400 pub fn block(self, block: bool) -> Self {
401 Self {
402 builder: self.builder.property("block", block),
403 ..self
404 }
405 }
406
407 pub fn callbacks(self, callbacks: AppSrcCallbacks) -> Self {
408 Self {
409 callbacks: Some(callbacks),
410 ..self
411 }
412 }
413
414 pub fn caps(self, caps: &gst::Caps) -> Self {
415 Self {
416 builder: self.builder.property("caps", caps),
417 ..self
418 }
419 }
420
421 pub fn do_timestamp(self, do_timestamp: bool) -> Self {
422 Self {
423 builder: self.builder.property("do-timestamp", do_timestamp),
424 ..self
425 }
426 }
427
428 pub fn duration(self, duration: u64) -> Self {
429 Self {
430 builder: self.builder.property("duration", duration),
431 ..self
432 }
433 }
434
435 pub fn format(self, format: gst::Format) -> Self {
436 Self {
437 builder: self.builder.property("format", format),
438 ..self
439 }
440 }
441
442 #[cfg(feature = "v1_18")]
443 #[cfg_attr(docsrs, doc(cfg(feature = "v1_18")))]
444 pub fn handle_segment_change(self, handle_segment_change: bool) -> Self {
445 Self {
446 builder: self
447 .builder
448 .property("handle-segment-change", handle_segment_change),
449 ..self
450 }
451 }
452
453 pub fn is_live(self, is_live: bool) -> Self {
454 Self {
455 builder: self.builder.property("is-live", is_live),
456 ..self
457 }
458 }
459
460 #[cfg(feature = "v1_20")]
461 #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
462 pub fn leaky_type(self, leaky_type: crate::AppLeakyType) -> Self {
463 Self {
464 builder: self.builder.property("leaky-type", leaky_type),
465 ..self
466 }
467 }
468
469 #[cfg(feature = "v1_20")]
470 #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
471 pub fn max_buffers(self, max_buffers: u64) -> Self {
472 Self {
473 builder: self.builder.property("max-buffers", max_buffers),
474 ..self
475 }
476 }
477
478 pub fn max_bytes(self, max_bytes: u64) -> Self {
479 Self {
480 builder: self.builder.property("max-bytes", max_bytes),
481 ..self
482 }
483 }
484
485 pub fn max_latency(self, max_latency: i64) -> Self {
486 Self {
487 builder: self.builder.property("max-latency", max_latency),
488 ..self
489 }
490 }
491
492 #[cfg(feature = "v1_20")]
493 #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
494 pub fn max_time(self, max_time: Option<gst::ClockTime>) -> Self {
495 Self {
496 builder: self.builder.property("max-time", max_time),
497 ..self
498 }
499 }
500
501 pub fn min_latency(self, min_latency: i64) -> Self {
502 Self {
503 builder: self.builder.property("min-latency", min_latency),
504 ..self
505 }
506 }
507
508 pub fn min_percent(self, min_percent: u32) -> Self {
509 Self {
510 builder: self.builder.property("min-percent", min_percent),
511 ..self
512 }
513 }
514
515 pub fn size(self, size: i64) -> Self {
516 Self {
517 builder: self.builder.property("size", size),
518 ..self
519 }
520 }
521
522 pub fn stream_type(self, stream_type: crate::AppStreamType) -> Self {
523 Self {
524 builder: self.builder.property("stream-type", stream_type),
525 ..self
526 }
527 }
528
529 pub fn name(self, name: impl Into<glib::GString>) -> Self {
530 Self {
531 builder: self.builder.property("name", name.into()),
532 ..self
533 }
534 }
535}
536
537#[derive(Debug)]
538pub struct AppSrcSink {
539 app_src: glib::WeakRef<AppSrc>,
540 waker_reference: Arc<Mutex<Option<Waker>>>,
541}
542
543impl AppSrcSink {
544 fn new(app_src: &AppSrc) -> Self {
545 skip_assert_initialized!();
546
547 let waker_reference = Arc::new(Mutex::new(None as Option<Waker>));
548
549 app_src.set_callbacks(
550 AppSrcCallbacks::builder()
551 .need_data({
552 let waker_reference = Arc::clone(&waker_reference);
553
554 move |_, _| {
555 if let Some(waker) = waker_reference.lock().unwrap().take() {
556 waker.wake();
557 }
558 }
559 })
560 .build(),
561 );
562
563 Self {
564 app_src: app_src.downgrade(),
565 waker_reference,
566 }
567 }
568}
569
570impl Drop for AppSrcSink {
571 fn drop(&mut self) {
572 #[cfg(not(feature = "v1_18"))]
573 {
574 // This is not thread-safe before 1.16.3, see
575 // https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/merge_requests/570
576 if gst::version() >= (1, 16, 3, 0) {
577 if let Some(app_src: AppSrc) = self.app_src.upgrade() {
578 app_src.set_callbacks(AppSrcCallbacks::builder().build());
579 }
580 }
581 }
582 }
583}
584
585impl Sink<gst::Sample> for AppSrcSink {
586 type Error = gst::FlowError;
587
588 fn poll_ready(self: Pin<&mut Self>, context: &mut Context) -> Poll<Result<(), Self::Error>> {
589 let mut waker = self.waker_reference.lock().unwrap();
590
591 let Some(app_src) = self.app_src.upgrade() else {
592 return Poll::Ready(Err(gst::FlowError::Eos));
593 };
594
595 let current_level_bytes = app_src.current_level_bytes();
596 let max_bytes = app_src.max_bytes();
597
598 if current_level_bytes >= max_bytes && max_bytes != 0 {
599 waker.replace(context.waker().to_owned());
600
601 Poll::Pending
602 } else {
603 Poll::Ready(Ok(()))
604 }
605 }
606
607 fn start_send(self: Pin<&mut Self>, sample: gst::Sample) -> Result<(), Self::Error> {
608 let Some(app_src) = self.app_src.upgrade() else {
609 return Err(gst::FlowError::Eos);
610 };
611
612 app_src.push_sample(&sample)?;
613
614 Ok(())
615 }
616
617 fn poll_flush(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Self::Error>> {
618 Poll::Ready(Ok(()))
619 }
620
621 fn poll_close(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Self::Error>> {
622 let Some(app_src) = self.app_src.upgrade() else {
623 return Poll::Ready(Ok(()));
624 };
625
626 app_src.end_of_stream()?;
627
628 Poll::Ready(Ok(()))
629 }
630}
631
632#[cfg(test)]
633mod tests {
634 use std::sync::atomic::{AtomicUsize, Ordering};
635
636 use futures_util::{sink::SinkExt, stream::StreamExt};
637 use gst::prelude::*;
638
639 use super::*;
640
641 #[test]
642 fn test_app_src_sink() {
643 gst::init().unwrap();
644
645 let appsrc = gst::ElementFactory::make("appsrc").build().unwrap();
646 let fakesink = gst::ElementFactory::make("fakesink")
647 .property("signal-handoffs", true)
648 .build()
649 .unwrap();
650
651 let pipeline = gst::Pipeline::new();
652 pipeline.add(&appsrc).unwrap();
653 pipeline.add(&fakesink).unwrap();
654
655 appsrc.link(&fakesink).unwrap();
656
657 let mut bus_stream = pipeline.bus().unwrap().stream();
658 let mut app_src_sink = appsrc.dynamic_cast::<AppSrc>().unwrap().sink();
659
660 let sample_quantity = 5;
661
662 let samples = (0..sample_quantity)
663 .map(|_| gst::Sample::builder().buffer(&gst::Buffer::new()).build())
664 .collect::<Vec<gst::Sample>>();
665
666 let mut sample_stream = futures_util::stream::iter(samples).map(Ok);
667
668 let handoff_count_reference = Arc::new(AtomicUsize::new(0));
669
670 fakesink.connect("handoff", false, {
671 let handoff_count_reference = Arc::clone(&handoff_count_reference);
672
673 move |_| {
674 handoff_count_reference.fetch_add(1, Ordering::AcqRel);
675
676 None
677 }
678 });
679
680 pipeline.set_state(gst::State::Playing).unwrap();
681
682 futures_executor::block_on(app_src_sink.send_all(&mut sample_stream)).unwrap();
683 futures_executor::block_on(app_src_sink.close()).unwrap();
684
685 while let Some(message) = futures_executor::block_on(bus_stream.next()) {
686 match message.view() {
687 gst::MessageView::Eos(_) => break,
688 gst::MessageView::Error(_) => unreachable!(),
689 _ => continue,
690 }
691 }
692
693 pipeline.set_state(gst::State::Null).unwrap();
694
695 assert_eq!(
696 handoff_count_reference.load(Ordering::Acquire),
697 sample_quantity
698 );
699 }
700}
701