1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{
4 future,
5 mem::transmute,
6 pin::Pin,
7 task::{Context, Poll},
8};
9
10use futures_channel::mpsc::{self, UnboundedReceiver};
11use futures_core::Stream;
12use futures_util::{stream::FusedStream, StreamExt};
13use glib::{
14 ffi::{gboolean, gpointer},
15 prelude::*,
16 source::Priority,
17 translate::*,
18 ControlFlow,
19};
20
21use crate::{Bus, BusSyncReply, Message, MessageType};
22
23unsafe extern "C" fn trampoline_watch<F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static>(
24 bus: *mut ffi::GstBus,
25 msg: *mut ffi::GstMessage,
26 func: gpointer,
27) -> gboolean {
28 let func: &mut F = &mut *(func as *mut F);
29 func(&from_glib_borrow(ptr:bus), &Message::from_glib_borrow(ptr:msg)).into_glib()
30}
31
32unsafe extern "C" fn destroy_closure_watch<
33 F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static,
34>(
35 ptr: gpointer,
36) {
37 let _ = Box::<F>::from_raw(ptr as *mut _);
38}
39
40fn into_raw_watch<F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static>(func: F) -> gpointer {
41 #[allow(clippy::type_complexity)]
42 let func: Box<F> = Box::new(func);
43 Box::into_raw(func) as gpointer
44}
45
46unsafe extern "C" fn trampoline_watch_local<F: FnMut(&Bus, &Message) -> ControlFlow + 'static>(
47 bus: *mut ffi::GstBus,
48 msg: *mut ffi::GstMessage,
49 func: gpointer,
50) -> gboolean {
51 let func: &mut glib::thread_guard::ThreadGuard<F> =
52 &mut *(func as *mut glib::thread_guard::ThreadGuard<F>);
53 (func.get_mut())(&from_glib_borrow(ptr:bus), &Message::from_glib_borrow(ptr:msg)).into_glib()
54}
55
56unsafe extern "C" fn destroy_closure_watch_local<
57 F: FnMut(&Bus, &Message) -> ControlFlow + 'static,
58>(
59 ptr: gpointer,
60) {
61 let _ = Box::<glib::thread_guard::ThreadGuard<F>>::from_raw(ptr as *mut _);
62}
63
64fn into_raw_watch_local<F: FnMut(&Bus, &Message) -> ControlFlow + 'static>(func: F) -> gpointer {
65 #[allow(clippy::type_complexity)]
66 let func: Box<glib::thread_guard::ThreadGuard<F>> =
67 Box::new(glib::thread_guard::ThreadGuard::new(func));
68 Box::into_raw(func) as gpointer
69}
70
71unsafe extern "C" fn trampoline_sync<
72 F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,
73>(
74 bus: *mut ffi::GstBus,
75 msg: *mut ffi::GstMessage,
76 func: gpointer,
77) -> ffi::GstBusSyncReply {
78 let f: &F = &*(func as *const F);
79 let res: i32 = f(&from_glib_borrow(ptr:bus), &Message::from_glib_borrow(ptr:msg)).into_glib();
80
81 if res == ffi::GST_BUS_DROP {
82 ffi::gst_mini_object_unref(mini_object:msg as *mut _);
83 }
84
85 res
86}
87
88unsafe extern "C" fn destroy_closure_sync<
89 F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,
90>(
91 ptr: gpointer,
92) {
93 let _ = Box::<F>::from_raw(ptr as *mut _);
94}
95
96fn into_raw_sync<F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static>(
97 func: F,
98) -> gpointer {
99 let func: Box<F> = Box::new(func);
100 Box::into_raw(func) as gpointer
101}
102
103impl Bus {
104 #[doc(alias = "gst_bus_add_signal_watch")]
105 #[doc(alias = "gst_bus_add_signal_watch_full")]
106 pub fn add_signal_watch_full(&self, priority: Priority) {
107 unsafe {
108 ffi::gst_bus_add_signal_watch_full(self.to_glib_none().0, priority.into_glib());
109 }
110 }
111
112 #[doc(alias = "gst_bus_create_watch")]
113 pub fn create_watch<F>(&self, name: Option<&str>, priority: Priority, func: F) -> glib::Source
114 where
115 F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static,
116 {
117 skip_assert_initialized!();
118 unsafe {
119 let source = ffi::gst_bus_create_watch(self.to_glib_none().0);
120 glib::ffi::g_source_set_callback(
121 source,
122 Some(transmute::<
123 _,
124 unsafe extern "C" fn(glib::ffi::gpointer) -> i32,
125 >(trampoline_watch::<F> as *const ())),
126 into_raw_watch(func),
127 Some(destroy_closure_watch::<F>),
128 );
129 glib::ffi::g_source_set_priority(source, priority.into_glib());
130
131 if let Some(name) = name {
132 glib::ffi::g_source_set_name(source, name.to_glib_none().0);
133 }
134
135 from_glib_full(source)
136 }
137 }
138
139 #[doc(alias = "gst_bus_add_watch")]
140 #[doc(alias = "gst_bus_add_watch_full")]
141 pub fn add_watch<F>(&self, func: F) -> Result<BusWatchGuard, glib::BoolError>
142 where
143 F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static,
144 {
145 unsafe {
146 let res = ffi::gst_bus_add_watch_full(
147 self.to_glib_none().0,
148 glib::ffi::G_PRIORITY_DEFAULT,
149 Some(trampoline_watch::<F>),
150 into_raw_watch(func),
151 Some(destroy_closure_watch::<F>),
152 );
153
154 if res == 0 {
155 Err(glib::bool_error!("Bus already has a watch"))
156 } else {
157 Ok(BusWatchGuard { bus: self.clone() })
158 }
159 }
160 }
161
162 #[doc(alias = "gst_bus_add_watch")]
163 #[doc(alias = "gst_bus_add_watch_full")]
164 pub fn add_watch_local<F>(&self, func: F) -> Result<BusWatchGuard, glib::BoolError>
165 where
166 F: FnMut(&Bus, &Message) -> ControlFlow + 'static,
167 {
168 unsafe {
169 let ctx = glib::MainContext::ref_thread_default();
170 let _acquire = ctx
171 .acquire()
172 .expect("thread default main context already acquired by another thread");
173
174 let res = ffi::gst_bus_add_watch_full(
175 self.to_glib_none().0,
176 glib::ffi::G_PRIORITY_DEFAULT,
177 Some(trampoline_watch_local::<F>),
178 into_raw_watch_local(func),
179 Some(destroy_closure_watch_local::<F>),
180 );
181
182 if res == 0 {
183 Err(glib::bool_error!("Bus already has a watch"))
184 } else {
185 Ok(BusWatchGuard { bus: self.clone() })
186 }
187 }
188 }
189
190 #[doc(alias = "gst_bus_set_sync_handler")]
191 pub fn set_sync_handler<F>(&self, func: F)
192 where
193 F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,
194 {
195 #[cfg(not(feature = "v1_18"))]
196 use glib::once_cell::sync::Lazy;
197 #[cfg(not(feature = "v1_18"))]
198 static SET_ONCE_QUARK: Lazy<glib::Quark> =
199 Lazy::new(|| glib::Quark::from_str("gstreamer-rs-sync-handler"));
200
201 unsafe {
202 let bus = self.to_glib_none().0;
203
204 #[cfg(not(feature = "v1_18"))]
205 {
206 // This is not thread-safe before 1.16.3, see
207 // https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/merge_requests/416
208 if crate::version() < (1, 16, 3, 0) {
209 if !glib::gobject_ffi::g_object_get_qdata(
210 bus as *mut _,
211 SET_ONCE_QUARK.into_glib(),
212 )
213 .is_null()
214 {
215 panic!("Bus sync handler can only be set once");
216 }
217
218 glib::gobject_ffi::g_object_set_qdata(
219 bus as *mut _,
220 SET_ONCE_QUARK.into_glib(),
221 1 as *mut _,
222 );
223 }
224 }
225
226 ffi::gst_bus_set_sync_handler(
227 bus,
228 Some(trampoline_sync::<F>),
229 into_raw_sync(func),
230 Some(destroy_closure_sync::<F>),
231 )
232 }
233 }
234
235 pub fn unset_sync_handler(&self) {
236 #[cfg(not(feature = "v1_18"))]
237 {
238 // This is not thread-safe before 1.16.3, see
239 // https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/merge_requests/416
240 if crate::version() < (1, 16, 3, 0) {
241 return;
242 }
243 }
244
245 unsafe {
246 use std::ptr;
247
248 ffi::gst_bus_set_sync_handler(self.to_glib_none().0, None, ptr::null_mut(), None)
249 }
250 }
251
252 #[doc(alias = "gst_bus_pop")]
253 pub fn iter(&self) -> Iter {
254 self.iter_timed(Some(crate::ClockTime::ZERO))
255 }
256
257 #[doc(alias = "gst_bus_timed_pop")]
258 pub fn iter_timed(&self, timeout: impl Into<Option<crate::ClockTime>>) -> Iter {
259 Iter {
260 bus: self,
261 timeout: timeout.into(),
262 }
263 }
264
265 #[doc(alias = "gst_bus_pop_filtered")]
266 pub fn iter_filtered<'a>(
267 &'a self,
268 msg_types: &'a [MessageType],
269 ) -> impl Iterator<Item = Message> + 'a {
270 self.iter_timed_filtered(Some(crate::ClockTime::ZERO), msg_types)
271 }
272
273 #[doc(alias = "gst_bus_timed_pop_filtered")]
274 pub fn iter_timed_filtered<'a>(
275 &'a self,
276 timeout: impl Into<Option<crate::ClockTime>>,
277 msg_types: &'a [MessageType],
278 ) -> impl Iterator<Item = Message> + 'a {
279 self.iter_timed(timeout)
280 .filter(move |msg| msg_types.contains(&msg.type_()))
281 }
282
283 #[doc(alias = "gst_bus_timed_pop_filtered")]
284 pub fn timed_pop_filtered(
285 &self,
286 timeout: impl Into<Option<crate::ClockTime>> + Clone,
287 msg_types: &[MessageType],
288 ) -> Option<Message> {
289 loop {
290 let msg = self.timed_pop(timeout.clone())?;
291 if msg_types.contains(&msg.type_()) {
292 return Some(msg);
293 }
294 }
295 }
296
297 #[doc(alias = "gst_bus_pop_filtered")]
298 pub fn pop_filtered(&self, msg_types: &[MessageType]) -> Option<Message> {
299 loop {
300 let msg = self.pop()?;
301 if msg_types.contains(&msg.type_()) {
302 return Some(msg);
303 }
304 }
305 }
306
    /// Returns a [`BusStream`] yielding the messages posted on this bus.
    ///
    /// Creating the stream installs a sync handler on the bus (see `BusStream::new`), which
    /// forwards a copy of every message to the stream and replies `BusSyncReply::Drop`.
    /// Because it goes through [`Bus::set_sync_handler`], the panic condition documented by
    /// that function's implementation applies here as well.
    pub fn stream(&self) -> BusStream {
        BusStream::new(self)
    }
310
311 pub fn stream_filtered<'a>(
312 &self,
313 message_types: &'a [MessageType],
314 ) -> impl Stream<Item = Message> + Unpin + FusedStream + Send + 'a {
315 self.stream().filter(move |message| {
316 let message_type = message.type_();
317
318 future::ready(message_types.contains(&message_type))
319 })
320 }
321}
322
/// Iterator over the messages of a [`Bus`], created by [`Bus::iter`] and [`Bus::iter_timed`].
///
/// Each call to `next` pops one message from the bus via `timed_pop`.
#[derive(Debug)]
pub struct Iter<'a> {
    // Bus the messages are popped from.
    bus: &'a Bus,
    // Timeout passed to `timed_pop` on every call to `next`.
    timeout: Option<crate::ClockTime>,
}
328
329impl<'a> Iterator for Iter<'a> {
330 type Item = Message;
331
332 fn next(&mut self) -> Option<Message> {
333 self.bus.timed_pop(self.timeout)
334 }
335}
336
/// Asynchronous stream of the messages posted on a [`Bus`], created by [`Bus::stream`].
///
/// Messages arrive through a channel fed by a sync handler installed on the bus; dropping
/// the stream unsets that handler if the bus is still alive.
#[derive(Debug)]
pub struct BusStream {
    // Weak reference to the bus, so the stream does not keep the bus alive.
    bus: glib::WeakRef<Bus>,
    // Receiving end of the channel the sync handler sends messages into.
    receiver: UnboundedReceiver<Message>,
}
342
343impl BusStream {
344 fn new(bus: &Bus) -> Self {
345 skip_assert_initialized!();
346
347 let (sender: UnboundedSender, receiver: UnboundedReceiver) = mpsc::unbounded();
348
349 bus.set_sync_handler(func:move |_, message: &Message| {
350 let _ = sender.unbounded_send(msg:message.to_owned());
351
352 BusSyncReply::Drop
353 });
354
355 Self {
356 bus: bus.downgrade(),
357 receiver,
358 }
359 }
360}
361
362impl Drop for BusStream {
363 fn drop(&mut self) {
364 if let Some(bus: Bus) = self.bus.upgrade() {
365 bus.unset_sync_handler();
366 }
367 }
368}
369
370impl Stream for BusStream {
371 type Item = Message;
372
373 fn poll_next(mut self: Pin<&mut Self>, context: &mut Context) -> Poll<Option<Self::Item>> {
374 self.receiver.poll_next_unpin(cx:context)
375 }
376}
377
378impl FusedStream for BusStream {
379 fn is_terminated(&self) -> bool {
380 self.receiver.is_terminated()
381 }
382}
383
// rustdoc-stripper-ignore-next
/// Manages ownership of the bus watch added to a bus with [`Bus::add_watch`] or
/// [`Bus::add_watch_local`].
///
/// When dropped, the bus watch is removed from the bus.
#[derive(Debug)]
#[must_use = "if unused the bus watch will immediately be removed"]
pub struct BusWatchGuard {
    // Strong reference to the bus whose watch is removed on drop.
    bus: Bus,
}
393
394impl Drop for BusWatchGuard {
395 fn drop(&mut self) {
396 let _ = self.bus.remove_watch();
397 }
398}
399
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex};

    use super::*;

    // A posted message must reach the sync handler exactly once.
    #[test]
    fn test_sync_handler() {
        crate::init().unwrap();

        let bus = Bus::new();
        let seen = Arc::new(Mutex::new(Vec::new()));
        let seen_in_handler = Arc::clone(&seen);
        bus.set_sync_handler(move |_, msg| {
            let mut collected = seen_in_handler.lock().unwrap();
            collected.push(msg.clone());
            BusSyncReply::Pass
        });

        bus.post(crate::message::Eos::new()).unwrap();

        let seen = seen.lock().unwrap();
        assert_eq!(seen.len(), 1);
        match seen[0].view() {
            crate::MessageView::Eos(_) => (),
            _ => unreachable!(),
        }
    }

    // A message posted after the stream was created must be the stream's first item.
    #[test]
    fn test_bus_stream() {
        crate::init().unwrap();

        let bus = Bus::new();
        let stream = bus.stream();

        bus.post(crate::message::Eos::new()).unwrap();

        let (first, _) = futures_executor::block_on(stream.into_future());

        match first.unwrap().view() {
            crate::MessageView::Eos(_) => (),
            _ => unreachable!(),
        }
    }
}
447