1 | // Take a look at the license at the top of the repository in the LICENSE file. |
2 | |
3 | use std::{ |
4 | mem, panic, |
5 | pin::Pin, |
6 | ptr, |
7 | sync::{ |
8 | atomic::{AtomicBool, Ordering}, |
9 | Arc, Mutex, |
10 | }, |
11 | task::{Context, Poll, Waker}, |
12 | }; |
13 | |
14 | use futures_sink::Sink; |
15 | use glib::{ |
16 | ffi::{gboolean, gpointer}, |
17 | prelude::*, |
18 | translate::*, |
19 | }; |
20 | |
21 | use crate::AppSrc; |
22 | |
23 | #[allow (clippy::type_complexity)] |
24 | pub struct AppSrcCallbacks { |
25 | need_data: Option<Box<dyn FnMut(&AppSrc, u32) + Send + 'static>>, |
26 | enough_data: Option<Box<dyn Fn(&AppSrc) + Send + Sync + 'static>>, |
27 | seek_data: Option<Box<dyn Fn(&AppSrc, u64) -> bool + Send + Sync + 'static>>, |
28 | panicked: AtomicBool, |
29 | callbacks: ffi::GstAppSrcCallbacks, |
30 | } |
31 | |
32 | unsafe impl Send for AppSrcCallbacks {} |
33 | unsafe impl Sync for AppSrcCallbacks {} |
34 | |
35 | impl AppSrcCallbacks { |
36 | pub fn builder() -> AppSrcCallbacksBuilder { |
37 | skip_assert_initialized!(); |
38 | |
39 | AppSrcCallbacksBuilder { |
40 | need_data: None, |
41 | enough_data: None, |
42 | seek_data: None, |
43 | } |
44 | } |
45 | } |
46 | |
47 | #[allow (clippy::type_complexity)] |
48 | #[must_use = "The builder must be built to be used" ] |
49 | pub struct AppSrcCallbacksBuilder { |
50 | need_data: Option<Box<dyn FnMut(&AppSrc, u32) + Send + 'static>>, |
51 | enough_data: Option<Box<dyn Fn(&AppSrc) + Send + Sync + 'static>>, |
52 | seek_data: Option<Box<dyn Fn(&AppSrc, u64) -> bool + Send + Sync + 'static>>, |
53 | } |
54 | |
55 | impl AppSrcCallbacksBuilder { |
56 | pub fn need_data<F: FnMut(&AppSrc, u32) + Send + 'static>(self, need_data: F) -> Self { |
57 | Self { |
58 | need_data: Some(Box::new(need_data)), |
59 | ..self |
60 | } |
61 | } |
62 | |
63 | pub fn enough_data<F: Fn(&AppSrc) + Send + Sync + 'static>(self, enough_data: F) -> Self { |
64 | Self { |
65 | enough_data: Some(Box::new(enough_data)), |
66 | ..self |
67 | } |
68 | } |
69 | |
70 | pub fn seek_data<F: Fn(&AppSrc, u64) -> bool + Send + Sync + 'static>( |
71 | self, |
72 | seek_data: F, |
73 | ) -> Self { |
74 | Self { |
75 | seek_data: Some(Box::new(seek_data)), |
76 | ..self |
77 | } |
78 | } |
79 | |
80 | #[must_use = "Building the callbacks without using them has no effect" ] |
81 | pub fn build(self) -> AppSrcCallbacks { |
82 | let have_need_data = self.need_data.is_some(); |
83 | let have_enough_data = self.enough_data.is_some(); |
84 | let have_seek_data = self.seek_data.is_some(); |
85 | |
86 | AppSrcCallbacks { |
87 | need_data: self.need_data, |
88 | enough_data: self.enough_data, |
89 | seek_data: self.seek_data, |
90 | panicked: AtomicBool::new(false), |
91 | callbacks: ffi::GstAppSrcCallbacks { |
92 | need_data: if have_need_data { |
93 | Some(trampoline_need_data) |
94 | } else { |
95 | None |
96 | }, |
97 | enough_data: if have_enough_data { |
98 | Some(trampoline_enough_data) |
99 | } else { |
100 | None |
101 | }, |
102 | seek_data: if have_seek_data { |
103 | Some(trampoline_seek_data) |
104 | } else { |
105 | None |
106 | }, |
107 | _gst_reserved: [ |
108 | ptr::null_mut(), |
109 | ptr::null_mut(), |
110 | ptr::null_mut(), |
111 | ptr::null_mut(), |
112 | ], |
113 | }, |
114 | } |
115 | } |
116 | } |
117 | |
118 | unsafe extern "C" fn trampoline_need_data( |
119 | appsrc: *mut ffi::GstAppSrc, |
120 | length: u32, |
121 | callbacks: gpointer, |
122 | ) { |
123 | let callbacks = callbacks as *mut AppSrcCallbacks; |
124 | let element: Borrowed<AppSrc> = from_glib_borrow(appsrc); |
125 | |
126 | if (*callbacks).panicked.load(Ordering::Relaxed) { |
127 | let element: Borrowed<AppSrc> = from_glib_borrow(appsrc); |
128 | gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None); |
129 | return; |
130 | } |
131 | |
132 | if let Some(ref mut need_data) = (*callbacks).need_data { |
133 | let result = panic::catch_unwind(panic::AssertUnwindSafe(|| need_data(&element, length))); |
134 | match result { |
135 | Ok(result) => result, |
136 | Err(err) => { |
137 | (*callbacks).panicked.store(true, Ordering::Relaxed); |
138 | gst::subclass::post_panic_error_message( |
139 | element.upcast_ref(), |
140 | element.upcast_ref(), |
141 | Some(err), |
142 | ); |
143 | } |
144 | } |
145 | } |
146 | } |
147 | |
148 | unsafe extern "C" fn trampoline_enough_data(appsrc: *mut ffi::GstAppSrc, callbacks: gpointer) { |
149 | let callbacks = callbacks as *const AppSrcCallbacks; |
150 | let element: Borrowed<AppSrc> = from_glib_borrow(appsrc); |
151 | |
152 | if (*callbacks).panicked.load(Ordering::Relaxed) { |
153 | let element: Borrowed<AppSrc> = from_glib_borrow(appsrc); |
154 | gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None); |
155 | return; |
156 | } |
157 | |
158 | if let Some(ref enough_data) = (*callbacks).enough_data { |
159 | let result = panic::catch_unwind(panic::AssertUnwindSafe(|| enough_data(&element))); |
160 | match result { |
161 | Ok(result) => result, |
162 | Err(err) => { |
163 | (*callbacks).panicked.store(true, Ordering::Relaxed); |
164 | gst::subclass::post_panic_error_message( |
165 | element.upcast_ref(), |
166 | element.upcast_ref(), |
167 | Some(err), |
168 | ); |
169 | } |
170 | } |
171 | } |
172 | } |
173 | |
174 | unsafe extern "C" fn trampoline_seek_data( |
175 | appsrc: *mut ffi::GstAppSrc, |
176 | offset: u64, |
177 | callbacks: gpointer, |
178 | ) -> gboolean { |
179 | let callbacks = callbacks as *const AppSrcCallbacks; |
180 | let element: Borrowed<AppSrc> = from_glib_borrow(appsrc); |
181 | |
182 | if (*callbacks).panicked.load(Ordering::Relaxed) { |
183 | let element: Borrowed<AppSrc> = from_glib_borrow(appsrc); |
184 | gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None); |
185 | return false.into_glib(); |
186 | } |
187 | |
188 | let ret = if let Some(ref seek_data) = (*callbacks).seek_data { |
189 | let result = panic::catch_unwind(panic::AssertUnwindSafe(|| seek_data(&element, offset))); |
190 | match result { |
191 | Ok(result) => result, |
192 | Err(err) => { |
193 | (*callbacks).panicked.store(true, Ordering::Relaxed); |
194 | gst::subclass::post_panic_error_message( |
195 | element.upcast_ref(), |
196 | element.upcast_ref(), |
197 | Some(err), |
198 | ); |
199 | |
200 | false |
201 | } |
202 | } |
203 | } else { |
204 | false |
205 | }; |
206 | |
207 | ret.into_glib() |
208 | } |
209 | |
210 | unsafe extern "C" fn destroy_callbacks(ptr: gpointer) { |
211 | let _ = Box::<AppSrcCallbacks>::from_raw(ptr as *mut _); |
212 | } |
213 | |
214 | impl AppSrc { |
215 | // rustdoc-stripper-ignore-next |
216 | /// Creates a new builder-pattern struct instance to construct [`AppSrc`] objects. |
217 | /// |
218 | /// This method returns an instance of [`AppSrcBuilder`](crate::builders::AppSrcBuilder) which can be used to create [`AppSrc`] objects. |
219 | pub fn builder() -> AppSrcBuilder { |
220 | assert_initialized_main_thread!(); |
221 | AppSrcBuilder::new() |
222 | } |
223 | |
224 | #[doc (alias = "gst_app_src_set_callbacks" )] |
225 | pub fn set_callbacks(&self, callbacks: AppSrcCallbacks) { |
226 | #[cfg (not(feature = "v1_18" ))] |
227 | use glib::once_cell::sync::Lazy; |
228 | #[cfg (not(feature = "v1_18" ))] |
229 | static SET_ONCE_QUARK: Lazy<glib::Quark> = |
230 | Lazy::new(|| glib::Quark::from_str("gstreamer-rs-app-src-callbacks" )); |
231 | |
232 | unsafe { |
233 | let src = self.to_glib_none().0; |
234 | #[cfg (not(feature = "v1_18" ))] |
235 | { |
236 | // This is not thread-safe before 1.16.3, see |
237 | // https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/merge_requests/570 |
238 | if gst::version() < (1, 16, 3, 0) { |
239 | if !glib::gobject_ffi::g_object_get_qdata( |
240 | src as *mut _, |
241 | SET_ONCE_QUARK.into_glib(), |
242 | ) |
243 | .is_null() |
244 | { |
245 | panic!("AppSrc callbacks can only be set once" ); |
246 | } |
247 | |
248 | glib::gobject_ffi::g_object_set_qdata( |
249 | src as *mut _, |
250 | SET_ONCE_QUARK.into_glib(), |
251 | 1 as *mut _, |
252 | ); |
253 | } |
254 | } |
255 | |
256 | ffi::gst_app_src_set_callbacks( |
257 | src, |
258 | mut_override(&callbacks.callbacks), |
259 | Box::into_raw(Box::new(callbacks)) as *mut _, |
260 | Some(destroy_callbacks), |
261 | ); |
262 | } |
263 | } |
264 | |
265 | #[doc (alias = "gst_app_src_set_latency" )] |
266 | pub fn set_latency( |
267 | &self, |
268 | min: impl Into<Option<gst::ClockTime>>, |
269 | max: impl Into<Option<gst::ClockTime>>, |
270 | ) { |
271 | unsafe { |
272 | ffi::gst_app_src_set_latency( |
273 | self.to_glib_none().0, |
274 | min.into().into_glib(), |
275 | max.into().into_glib(), |
276 | ); |
277 | } |
278 | } |
279 | |
280 | #[doc (alias = "get_latency" )] |
281 | #[doc (alias = "gst_app_src_get_latency" )] |
282 | pub fn latency(&self) -> (Option<gst::ClockTime>, Option<gst::ClockTime>) { |
283 | unsafe { |
284 | let mut min = mem::MaybeUninit::uninit(); |
285 | let mut max = mem::MaybeUninit::uninit(); |
286 | ffi::gst_app_src_get_latency(self.to_glib_none().0, min.as_mut_ptr(), max.as_mut_ptr()); |
287 | (from_glib(min.assume_init()), from_glib(max.assume_init())) |
288 | } |
289 | } |
290 | |
291 | #[doc (alias = "do-timestamp" )] |
292 | #[doc (alias = "gst_base_src_set_do_timestamp" )] |
293 | pub fn set_do_timestamp(&self, timestamp: bool) { |
294 | unsafe { |
295 | gst_base::ffi::gst_base_src_set_do_timestamp( |
296 | self.as_ptr() as *mut gst_base::ffi::GstBaseSrc, |
297 | timestamp.into_glib(), |
298 | ); |
299 | } |
300 | } |
301 | |
302 | #[doc (alias = "do-timestamp" )] |
303 | #[doc (alias = "gst_base_src_get_do_timestamp" )] |
304 | pub fn do_timestamp(&self) -> bool { |
305 | unsafe { |
306 | from_glib(gst_base::ffi::gst_base_src_get_do_timestamp( |
307 | self.as_ptr() as *mut gst_base::ffi::GstBaseSrc |
308 | )) |
309 | } |
310 | } |
311 | |
312 | #[doc (alias = "do-timestamp" )] |
313 | pub fn connect_do_timestamp_notify<F: Fn(&Self) + Send + Sync + 'static>( |
314 | &self, |
315 | f: F, |
316 | ) -> glib::SignalHandlerId { |
317 | unsafe extern "C" fn notify_do_timestamp_trampoline< |
318 | F: Fn(&AppSrc) + Send + Sync + 'static, |
319 | >( |
320 | this: *mut ffi::GstAppSrc, |
321 | _param_spec: glib::ffi::gpointer, |
322 | f: glib::ffi::gpointer, |
323 | ) { |
324 | let f: &F = &*(f as *const F); |
325 | f(&AppSrc::from_glib_borrow(this)) |
326 | } |
327 | unsafe { |
328 | let f: Box<F> = Box::new(f); |
329 | glib::signal::connect_raw( |
330 | self.as_ptr() as *mut _, |
331 | b"notify::do-timestamp \0" .as_ptr() as *const _, |
332 | Some(mem::transmute::<_, unsafe extern "C" fn()>( |
333 | notify_do_timestamp_trampoline::<F> as *const (), |
334 | )), |
335 | Box::into_raw(f), |
336 | ) |
337 | } |
338 | } |
339 | |
340 | #[doc (alias = "set-automatic-eos" )] |
341 | #[doc (alias = "gst_base_src_set_automatic_eos" )] |
342 | pub fn set_automatic_eos(&self, automatic_eos: bool) { |
343 | unsafe { |
344 | gst_base::ffi::gst_base_src_set_automatic_eos( |
345 | self.as_ptr() as *mut gst_base::ffi::GstBaseSrc, |
346 | automatic_eos.into_glib(), |
347 | ); |
348 | } |
349 | } |
350 | |
351 | pub fn sink(&self) -> AppSrcSink { |
352 | AppSrcSink::new(self) |
353 | } |
354 | } |
355 | |
356 | // rustdoc-stripper-ignore-next |
357 | /// A [builder-pattern] type to construct [`AppSrc`] objects. |
358 | /// |
359 | /// [builder-pattern]: https://doc.rust-lang.org/1.0.0/style/ownership/builders.html |
360 | #[must_use = "The builder must be built to be used" ] |
361 | pub struct AppSrcBuilder { |
362 | builder: glib::object::ObjectBuilder<'static, AppSrc>, |
363 | callbacks: Option<AppSrcCallbacks>, |
364 | automatic_eos: Option<bool>, |
365 | } |
366 | |
367 | impl AppSrcBuilder { |
368 | fn new() -> Self { |
369 | Self { |
370 | builder: glib::Object::builder(), |
371 | callbacks: None, |
372 | automatic_eos: None, |
373 | } |
374 | } |
375 | |
376 | // rustdoc-stripper-ignore-next |
377 | /// Build the [`AppSrc`]. |
378 | #[must_use = "Building the object from the builder is usually expensive and is not expected to have side effects" ] |
379 | pub fn build(self) -> AppSrc { |
380 | let appsrc = self.builder.build(); |
381 | |
382 | if let Some(callbacks) = self.callbacks { |
383 | appsrc.set_callbacks(callbacks); |
384 | } |
385 | |
386 | if let Some(automatic_eos) = self.automatic_eos { |
387 | appsrc.set_automatic_eos(automatic_eos); |
388 | } |
389 | |
390 | appsrc |
391 | } |
392 | |
393 | pub fn automatic_eos(self, automatic_eos: bool) -> Self { |
394 | Self { |
395 | automatic_eos: Some(automatic_eos), |
396 | ..self |
397 | } |
398 | } |
399 | |
400 | pub fn block(self, block: bool) -> Self { |
401 | Self { |
402 | builder: self.builder.property("block" , block), |
403 | ..self |
404 | } |
405 | } |
406 | |
407 | pub fn callbacks(self, callbacks: AppSrcCallbacks) -> Self { |
408 | Self { |
409 | callbacks: Some(callbacks), |
410 | ..self |
411 | } |
412 | } |
413 | |
414 | pub fn caps(self, caps: &gst::Caps) -> Self { |
415 | Self { |
416 | builder: self.builder.property("caps" , caps), |
417 | ..self |
418 | } |
419 | } |
420 | |
421 | pub fn do_timestamp(self, do_timestamp: bool) -> Self { |
422 | Self { |
423 | builder: self.builder.property("do-timestamp" , do_timestamp), |
424 | ..self |
425 | } |
426 | } |
427 | |
428 | pub fn duration(self, duration: u64) -> Self { |
429 | Self { |
430 | builder: self.builder.property("duration" , duration), |
431 | ..self |
432 | } |
433 | } |
434 | |
435 | pub fn format(self, format: gst::Format) -> Self { |
436 | Self { |
437 | builder: self.builder.property("format" , format), |
438 | ..self |
439 | } |
440 | } |
441 | |
442 | #[cfg (feature = "v1_18" )] |
443 | #[cfg_attr (docsrs, doc(cfg(feature = "v1_18" )))] |
444 | pub fn handle_segment_change(self, handle_segment_change: bool) -> Self { |
445 | Self { |
446 | builder: self |
447 | .builder |
448 | .property("handle-segment-change" , handle_segment_change), |
449 | ..self |
450 | } |
451 | } |
452 | |
453 | pub fn is_live(self, is_live: bool) -> Self { |
454 | Self { |
455 | builder: self.builder.property("is-live" , is_live), |
456 | ..self |
457 | } |
458 | } |
459 | |
460 | #[cfg (feature = "v1_20" )] |
461 | #[cfg_attr (docsrs, doc(cfg(feature = "v1_20" )))] |
462 | pub fn leaky_type(self, leaky_type: crate::AppLeakyType) -> Self { |
463 | Self { |
464 | builder: self.builder.property("leaky-type" , leaky_type), |
465 | ..self |
466 | } |
467 | } |
468 | |
469 | #[cfg (feature = "v1_20" )] |
470 | #[cfg_attr (docsrs, doc(cfg(feature = "v1_20" )))] |
471 | pub fn max_buffers(self, max_buffers: u64) -> Self { |
472 | Self { |
473 | builder: self.builder.property("max-buffers" , max_buffers), |
474 | ..self |
475 | } |
476 | } |
477 | |
478 | pub fn max_bytes(self, max_bytes: u64) -> Self { |
479 | Self { |
480 | builder: self.builder.property("max-bytes" , max_bytes), |
481 | ..self |
482 | } |
483 | } |
484 | |
485 | pub fn max_latency(self, max_latency: i64) -> Self { |
486 | Self { |
487 | builder: self.builder.property("max-latency" , max_latency), |
488 | ..self |
489 | } |
490 | } |
491 | |
492 | #[cfg (feature = "v1_20" )] |
493 | #[cfg_attr (docsrs, doc(cfg(feature = "v1_20" )))] |
494 | pub fn max_time(self, max_time: Option<gst::ClockTime>) -> Self { |
495 | Self { |
496 | builder: self.builder.property("max-time" , max_time), |
497 | ..self |
498 | } |
499 | } |
500 | |
501 | pub fn min_latency(self, min_latency: i64) -> Self { |
502 | Self { |
503 | builder: self.builder.property("min-latency" , min_latency), |
504 | ..self |
505 | } |
506 | } |
507 | |
508 | pub fn min_percent(self, min_percent: u32) -> Self { |
509 | Self { |
510 | builder: self.builder.property("min-percent" , min_percent), |
511 | ..self |
512 | } |
513 | } |
514 | |
515 | pub fn size(self, size: i64) -> Self { |
516 | Self { |
517 | builder: self.builder.property("size" , size), |
518 | ..self |
519 | } |
520 | } |
521 | |
522 | pub fn stream_type(self, stream_type: crate::AppStreamType) -> Self { |
523 | Self { |
524 | builder: self.builder.property("stream-type" , stream_type), |
525 | ..self |
526 | } |
527 | } |
528 | |
529 | pub fn name(self, name: impl Into<glib::GString>) -> Self { |
530 | Self { |
531 | builder: self.builder.property("name" , name.into()), |
532 | ..self |
533 | } |
534 | } |
535 | } |
536 | |
537 | #[derive (Debug)] |
538 | pub struct AppSrcSink { |
539 | app_src: glib::WeakRef<AppSrc>, |
540 | waker_reference: Arc<Mutex<Option<Waker>>>, |
541 | } |
542 | |
543 | impl AppSrcSink { |
544 | fn new(app_src: &AppSrc) -> Self { |
545 | skip_assert_initialized!(); |
546 | |
547 | let waker_reference = Arc::new(Mutex::new(None as Option<Waker>)); |
548 | |
549 | app_src.set_callbacks( |
550 | AppSrcCallbacks::builder() |
551 | .need_data({ |
552 | let waker_reference = Arc::clone(&waker_reference); |
553 | |
554 | move |_, _| { |
555 | if let Some(waker) = waker_reference.lock().unwrap().take() { |
556 | waker.wake(); |
557 | } |
558 | } |
559 | }) |
560 | .build(), |
561 | ); |
562 | |
563 | Self { |
564 | app_src: app_src.downgrade(), |
565 | waker_reference, |
566 | } |
567 | } |
568 | } |
569 | |
570 | impl Drop for AppSrcSink { |
571 | fn drop(&mut self) { |
572 | #[cfg (not(feature = "v1_18" ))] |
573 | { |
574 | // This is not thread-safe before 1.16.3, see |
575 | // https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/merge_requests/570 |
576 | if gst::version() >= (1, 16, 3, 0) { |
577 | if let Some(app_src: AppSrc) = self.app_src.upgrade() { |
578 | app_src.set_callbacks(AppSrcCallbacks::builder().build()); |
579 | } |
580 | } |
581 | } |
582 | } |
583 | } |
584 | |
585 | impl Sink<gst::Sample> for AppSrcSink { |
586 | type Error = gst::FlowError; |
587 | |
588 | fn poll_ready(self: Pin<&mut Self>, context: &mut Context) -> Poll<Result<(), Self::Error>> { |
589 | let mut waker = self.waker_reference.lock().unwrap(); |
590 | |
591 | let Some(app_src) = self.app_src.upgrade() else { |
592 | return Poll::Ready(Err(gst::FlowError::Eos)); |
593 | }; |
594 | |
595 | let current_level_bytes = app_src.current_level_bytes(); |
596 | let max_bytes = app_src.max_bytes(); |
597 | |
598 | if current_level_bytes >= max_bytes && max_bytes != 0 { |
599 | waker.replace(context.waker().to_owned()); |
600 | |
601 | Poll::Pending |
602 | } else { |
603 | Poll::Ready(Ok(())) |
604 | } |
605 | } |
606 | |
607 | fn start_send(self: Pin<&mut Self>, sample: gst::Sample) -> Result<(), Self::Error> { |
608 | let Some(app_src) = self.app_src.upgrade() else { |
609 | return Err(gst::FlowError::Eos); |
610 | }; |
611 | |
612 | app_src.push_sample(&sample)?; |
613 | |
614 | Ok(()) |
615 | } |
616 | |
617 | fn poll_flush(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Self::Error>> { |
618 | Poll::Ready(Ok(())) |
619 | } |
620 | |
621 | fn poll_close(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Self::Error>> { |
622 | let Some(app_src) = self.app_src.upgrade() else { |
623 | return Poll::Ready(Ok(())); |
624 | }; |
625 | |
626 | app_src.end_of_stream()?; |
627 | |
628 | Poll::Ready(Ok(())) |
629 | } |
630 | } |
631 | |
632 | #[cfg (test)] |
633 | mod tests { |
634 | use std::sync::atomic::{AtomicUsize, Ordering}; |
635 | |
636 | use futures_util::{sink::SinkExt, stream::StreamExt}; |
637 | use gst::prelude::*; |
638 | |
639 | use super::*; |
640 | |
641 | #[test ] |
642 | fn test_app_src_sink() { |
643 | gst::init().unwrap(); |
644 | |
645 | let appsrc = gst::ElementFactory::make("appsrc" ).build().unwrap(); |
646 | let fakesink = gst::ElementFactory::make("fakesink" ) |
647 | .property("signal-handoffs" , true) |
648 | .build() |
649 | .unwrap(); |
650 | |
651 | let pipeline = gst::Pipeline::new(); |
652 | pipeline.add(&appsrc).unwrap(); |
653 | pipeline.add(&fakesink).unwrap(); |
654 | |
655 | appsrc.link(&fakesink).unwrap(); |
656 | |
657 | let mut bus_stream = pipeline.bus().unwrap().stream(); |
658 | let mut app_src_sink = appsrc.dynamic_cast::<AppSrc>().unwrap().sink(); |
659 | |
660 | let sample_quantity = 5; |
661 | |
662 | let samples = (0..sample_quantity) |
663 | .map(|_| gst::Sample::builder().buffer(&gst::Buffer::new()).build()) |
664 | .collect::<Vec<gst::Sample>>(); |
665 | |
666 | let mut sample_stream = futures_util::stream::iter(samples).map(Ok); |
667 | |
668 | let handoff_count_reference = Arc::new(AtomicUsize::new(0)); |
669 | |
670 | fakesink.connect("handoff" , false, { |
671 | let handoff_count_reference = Arc::clone(&handoff_count_reference); |
672 | |
673 | move |_| { |
674 | handoff_count_reference.fetch_add(1, Ordering::AcqRel); |
675 | |
676 | None |
677 | } |
678 | }); |
679 | |
680 | pipeline.set_state(gst::State::Playing).unwrap(); |
681 | |
682 | futures_executor::block_on(app_src_sink.send_all(&mut sample_stream)).unwrap(); |
683 | futures_executor::block_on(app_src_sink.close()).unwrap(); |
684 | |
685 | while let Some(message) = futures_executor::block_on(bus_stream.next()) { |
686 | match message.view() { |
687 | gst::MessageView::Eos(_) => break, |
688 | gst::MessageView::Error(_) => unreachable!(), |
689 | _ => continue, |
690 | } |
691 | } |
692 | |
693 | pipeline.set_state(gst::State::Null).unwrap(); |
694 | |
695 | assert_eq!( |
696 | handoff_count_reference.load(Ordering::Acquire), |
697 | sample_quantity |
698 | ); |
699 | } |
700 | } |
701 | |