1 | // Take a look at the license at the top of the repository in the LICENSE file. |
2 | |
3 | use std::{ |
4 | future, |
5 | mem::transmute, |
6 | pin::Pin, |
7 | task::{Context, Poll}, |
8 | }; |
9 | |
10 | use futures_channel::mpsc::{self, UnboundedReceiver}; |
11 | use futures_core::Stream; |
12 | use futures_util::{stream::FusedStream, StreamExt}; |
13 | use glib::{ |
14 | ffi::{gboolean, gpointer}, |
15 | prelude::*, |
16 | source::Priority, |
17 | translate::*, |
18 | ControlFlow, |
19 | }; |
20 | |
21 | use crate::{Bus, BusSyncReply, Message, MessageType}; |
22 | |
23 | unsafe extern "C" fn trampoline_watch<F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static>( |
24 | bus: *mut ffi::GstBus, |
25 | msg: *mut ffi::GstMessage, |
26 | func: gpointer, |
27 | ) -> gboolean { |
28 | let func: &mut F = &mut *(func as *mut F); |
29 | func(&from_glib_borrow(ptr:bus), &Message::from_glib_borrow(ptr:msg)).into_glib() |
30 | } |
31 | |
32 | unsafe extern "C" fn destroy_closure_watch< |
33 | F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static, |
34 | >( |
35 | ptr: gpointer, |
36 | ) { |
37 | let _ = Box::<F>::from_raw(ptr as *mut _); |
38 | } |
39 | |
40 | fn into_raw_watch<F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static>(func: F) -> gpointer { |
41 | #[allow (clippy::type_complexity)] |
42 | let func: Box<F> = Box::new(func); |
43 | Box::into_raw(func) as gpointer |
44 | } |
45 | |
46 | unsafe extern "C" fn trampoline_watch_local<F: FnMut(&Bus, &Message) -> ControlFlow + 'static>( |
47 | bus: *mut ffi::GstBus, |
48 | msg: *mut ffi::GstMessage, |
49 | func: gpointer, |
50 | ) -> gboolean { |
51 | let func: &mut glib::thread_guard::ThreadGuard<F> = |
52 | &mut *(func as *mut glib::thread_guard::ThreadGuard<F>); |
53 | (func.get_mut())(&from_glib_borrow(ptr:bus), &Message::from_glib_borrow(ptr:msg)).into_glib() |
54 | } |
55 | |
56 | unsafe extern "C" fn destroy_closure_watch_local< |
57 | F: FnMut(&Bus, &Message) -> ControlFlow + 'static, |
58 | >( |
59 | ptr: gpointer, |
60 | ) { |
61 | let _ = Box::<glib::thread_guard::ThreadGuard<F>>::from_raw(ptr as *mut _); |
62 | } |
63 | |
64 | fn into_raw_watch_local<F: FnMut(&Bus, &Message) -> ControlFlow + 'static>(func: F) -> gpointer { |
65 | #[allow (clippy::type_complexity)] |
66 | let func: Box<glib::thread_guard::ThreadGuard<F>> = |
67 | Box::new(glib::thread_guard::ThreadGuard::new(func)); |
68 | Box::into_raw(func) as gpointer |
69 | } |
70 | |
71 | unsafe extern "C" fn trampoline_sync< |
72 | F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static, |
73 | >( |
74 | bus: *mut ffi::GstBus, |
75 | msg: *mut ffi::GstMessage, |
76 | func: gpointer, |
77 | ) -> ffi::GstBusSyncReply { |
78 | let f: &F = &*(func as *const F); |
79 | let res: i32 = f(&from_glib_borrow(ptr:bus), &Message::from_glib_borrow(ptr:msg)).into_glib(); |
80 | |
81 | if res == ffi::GST_BUS_DROP { |
82 | ffi::gst_mini_object_unref(mini_object:msg as *mut _); |
83 | } |
84 | |
85 | res |
86 | } |
87 | |
88 | unsafe extern "C" fn destroy_closure_sync< |
89 | F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static, |
90 | >( |
91 | ptr: gpointer, |
92 | ) { |
93 | let _ = Box::<F>::from_raw(ptr as *mut _); |
94 | } |
95 | |
96 | fn into_raw_sync<F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static>( |
97 | func: F, |
98 | ) -> gpointer { |
99 | let func: Box<F> = Box::new(func); |
100 | Box::into_raw(func) as gpointer |
101 | } |
102 | |
103 | impl Bus { |
104 | #[doc (alias = "gst_bus_add_signal_watch" )] |
105 | #[doc (alias = "gst_bus_add_signal_watch_full" )] |
106 | pub fn add_signal_watch_full(&self, priority: Priority) { |
107 | unsafe { |
108 | ffi::gst_bus_add_signal_watch_full(self.to_glib_none().0, priority.into_glib()); |
109 | } |
110 | } |
111 | |
112 | #[doc (alias = "gst_bus_create_watch" )] |
113 | pub fn create_watch<F>(&self, name: Option<&str>, priority: Priority, func: F) -> glib::Source |
114 | where |
115 | F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static, |
116 | { |
117 | skip_assert_initialized!(); |
118 | unsafe { |
119 | let source = ffi::gst_bus_create_watch(self.to_glib_none().0); |
120 | glib::ffi::g_source_set_callback( |
121 | source, |
122 | Some(transmute::< |
123 | _, |
124 | unsafe extern "C" fn(glib::ffi::gpointer) -> i32, |
125 | >(trampoline_watch::<F> as *const ())), |
126 | into_raw_watch(func), |
127 | Some(destroy_closure_watch::<F>), |
128 | ); |
129 | glib::ffi::g_source_set_priority(source, priority.into_glib()); |
130 | |
131 | if let Some(name) = name { |
132 | glib::ffi::g_source_set_name(source, name.to_glib_none().0); |
133 | } |
134 | |
135 | from_glib_full(source) |
136 | } |
137 | } |
138 | |
139 | #[doc (alias = "gst_bus_add_watch" )] |
140 | #[doc (alias = "gst_bus_add_watch_full" )] |
141 | pub fn add_watch<F>(&self, func: F) -> Result<BusWatchGuard, glib::BoolError> |
142 | where |
143 | F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static, |
144 | { |
145 | unsafe { |
146 | let res = ffi::gst_bus_add_watch_full( |
147 | self.to_glib_none().0, |
148 | glib::ffi::G_PRIORITY_DEFAULT, |
149 | Some(trampoline_watch::<F>), |
150 | into_raw_watch(func), |
151 | Some(destroy_closure_watch::<F>), |
152 | ); |
153 | |
154 | if res == 0 { |
155 | Err(glib::bool_error!("Bus already has a watch" )) |
156 | } else { |
157 | Ok(BusWatchGuard { bus: self.clone() }) |
158 | } |
159 | } |
160 | } |
161 | |
162 | #[doc (alias = "gst_bus_add_watch" )] |
163 | #[doc (alias = "gst_bus_add_watch_full" )] |
164 | pub fn add_watch_local<F>(&self, func: F) -> Result<BusWatchGuard, glib::BoolError> |
165 | where |
166 | F: FnMut(&Bus, &Message) -> ControlFlow + 'static, |
167 | { |
168 | unsafe { |
169 | let ctx = glib::MainContext::ref_thread_default(); |
170 | let _acquire = ctx |
171 | .acquire() |
172 | .expect("thread default main context already acquired by another thread" ); |
173 | |
174 | let res = ffi::gst_bus_add_watch_full( |
175 | self.to_glib_none().0, |
176 | glib::ffi::G_PRIORITY_DEFAULT, |
177 | Some(trampoline_watch_local::<F>), |
178 | into_raw_watch_local(func), |
179 | Some(destroy_closure_watch_local::<F>), |
180 | ); |
181 | |
182 | if res == 0 { |
183 | Err(glib::bool_error!("Bus already has a watch" )) |
184 | } else { |
185 | Ok(BusWatchGuard { bus: self.clone() }) |
186 | } |
187 | } |
188 | } |
189 | |
190 | #[doc (alias = "gst_bus_set_sync_handler" )] |
191 | pub fn set_sync_handler<F>(&self, func: F) |
192 | where |
193 | F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static, |
194 | { |
195 | #[cfg (not(feature = "v1_18" ))] |
196 | use glib::once_cell::sync::Lazy; |
197 | #[cfg (not(feature = "v1_18" ))] |
198 | static SET_ONCE_QUARK: Lazy<glib::Quark> = |
199 | Lazy::new(|| glib::Quark::from_str("gstreamer-rs-sync-handler" )); |
200 | |
201 | unsafe { |
202 | let bus = self.to_glib_none().0; |
203 | |
204 | #[cfg (not(feature = "v1_18" ))] |
205 | { |
206 | // This is not thread-safe before 1.16.3, see |
207 | // https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/merge_requests/416 |
208 | if crate::version() < (1, 16, 3, 0) { |
209 | if !glib::gobject_ffi::g_object_get_qdata( |
210 | bus as *mut _, |
211 | SET_ONCE_QUARK.into_glib(), |
212 | ) |
213 | .is_null() |
214 | { |
215 | panic!("Bus sync handler can only be set once" ); |
216 | } |
217 | |
218 | glib::gobject_ffi::g_object_set_qdata( |
219 | bus as *mut _, |
220 | SET_ONCE_QUARK.into_glib(), |
221 | 1 as *mut _, |
222 | ); |
223 | } |
224 | } |
225 | |
226 | ffi::gst_bus_set_sync_handler( |
227 | bus, |
228 | Some(trampoline_sync::<F>), |
229 | into_raw_sync(func), |
230 | Some(destroy_closure_sync::<F>), |
231 | ) |
232 | } |
233 | } |
234 | |
235 | pub fn unset_sync_handler(&self) { |
236 | #[cfg (not(feature = "v1_18" ))] |
237 | { |
238 | // This is not thread-safe before 1.16.3, see |
239 | // https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/merge_requests/416 |
240 | if crate::version() < (1, 16, 3, 0) { |
241 | return; |
242 | } |
243 | } |
244 | |
245 | unsafe { |
246 | use std::ptr; |
247 | |
248 | ffi::gst_bus_set_sync_handler(self.to_glib_none().0, None, ptr::null_mut(), None) |
249 | } |
250 | } |
251 | |
252 | #[doc (alias = "gst_bus_pop" )] |
253 | pub fn iter(&self) -> Iter { |
254 | self.iter_timed(Some(crate::ClockTime::ZERO)) |
255 | } |
256 | |
257 | #[doc (alias = "gst_bus_timed_pop" )] |
258 | pub fn iter_timed(&self, timeout: impl Into<Option<crate::ClockTime>>) -> Iter { |
259 | Iter { |
260 | bus: self, |
261 | timeout: timeout.into(), |
262 | } |
263 | } |
264 | |
265 | #[doc (alias = "gst_bus_pop_filtered" )] |
266 | pub fn iter_filtered<'a>( |
267 | &'a self, |
268 | msg_types: &'a [MessageType], |
269 | ) -> impl Iterator<Item = Message> + 'a { |
270 | self.iter_timed_filtered(Some(crate::ClockTime::ZERO), msg_types) |
271 | } |
272 | |
273 | #[doc (alias = "gst_bus_timed_pop_filtered" )] |
274 | pub fn iter_timed_filtered<'a>( |
275 | &'a self, |
276 | timeout: impl Into<Option<crate::ClockTime>>, |
277 | msg_types: &'a [MessageType], |
278 | ) -> impl Iterator<Item = Message> + 'a { |
279 | self.iter_timed(timeout) |
280 | .filter(move |msg| msg_types.contains(&msg.type_())) |
281 | } |
282 | |
283 | #[doc (alias = "gst_bus_timed_pop_filtered" )] |
284 | pub fn timed_pop_filtered( |
285 | &self, |
286 | timeout: impl Into<Option<crate::ClockTime>> + Clone, |
287 | msg_types: &[MessageType], |
288 | ) -> Option<Message> { |
289 | loop { |
290 | let msg = self.timed_pop(timeout.clone())?; |
291 | if msg_types.contains(&msg.type_()) { |
292 | return Some(msg); |
293 | } |
294 | } |
295 | } |
296 | |
297 | #[doc (alias = "gst_bus_pop_filtered" )] |
298 | pub fn pop_filtered(&self, msg_types: &[MessageType]) -> Option<Message> { |
299 | loop { |
300 | let msg = self.pop()?; |
301 | if msg_types.contains(&msg.type_()) { |
302 | return Some(msg); |
303 | } |
304 | } |
305 | } |
306 | |
307 | pub fn stream(&self) -> BusStream { |
308 | BusStream::new(self) |
309 | } |
310 | |
311 | pub fn stream_filtered<'a>( |
312 | &self, |
313 | message_types: &'a [MessageType], |
314 | ) -> impl Stream<Item = Message> + Unpin + FusedStream + Send + 'a { |
315 | self.stream().filter(move |message| { |
316 | let message_type = message.type_(); |
317 | |
318 | future::ready(message_types.contains(&message_type)) |
319 | }) |
320 | } |
321 | } |
322 | |
323 | #[derive (Debug)] |
324 | pub struct Iter<'a> { |
325 | bus: &'a Bus, |
326 | timeout: Option<crate::ClockTime>, |
327 | } |
328 | |
329 | impl<'a> Iterator for Iter<'a> { |
330 | type Item = Message; |
331 | |
332 | fn next(&mut self) -> Option<Message> { |
333 | self.bus.timed_pop(self.timeout) |
334 | } |
335 | } |
336 | |
337 | #[derive (Debug)] |
338 | pub struct BusStream { |
339 | bus: glib::WeakRef<Bus>, |
340 | receiver: UnboundedReceiver<Message>, |
341 | } |
342 | |
343 | impl BusStream { |
344 | fn new(bus: &Bus) -> Self { |
345 | skip_assert_initialized!(); |
346 | |
347 | let (sender: UnboundedSender, receiver: UnboundedReceiver) = mpsc::unbounded(); |
348 | |
349 | bus.set_sync_handler(func:move |_, message: &Message| { |
350 | let _ = sender.unbounded_send(msg:message.to_owned()); |
351 | |
352 | BusSyncReply::Drop |
353 | }); |
354 | |
355 | Self { |
356 | bus: bus.downgrade(), |
357 | receiver, |
358 | } |
359 | } |
360 | } |
361 | |
362 | impl Drop for BusStream { |
363 | fn drop(&mut self) { |
364 | if let Some(bus: Bus) = self.bus.upgrade() { |
365 | bus.unset_sync_handler(); |
366 | } |
367 | } |
368 | } |
369 | |
370 | impl Stream for BusStream { |
371 | type Item = Message; |
372 | |
373 | fn poll_next(mut self: Pin<&mut Self>, context: &mut Context) -> Poll<Option<Self::Item>> { |
374 | self.receiver.poll_next_unpin(cx:context) |
375 | } |
376 | } |
377 | |
378 | impl FusedStream for BusStream { |
379 | fn is_terminated(&self) -> bool { |
380 | self.receiver.is_terminated() |
381 | } |
382 | } |
383 | |
384 | // rustdoc-stripper-ignore-next |
385 | /// Manages ownership of the bus watch added to a bus with [`Bus::add_watch`] or [`Bus::add_watch_local`] |
386 | /// |
387 | /// When dropped the bus watch is removed from the bus. |
388 | #[derive (Debug)] |
389 | #[must_use = "if unused the bus watch will immediately be removed" ] |
390 | pub struct BusWatchGuard { |
391 | bus: Bus, |
392 | } |
393 | |
394 | impl Drop for BusWatchGuard { |
395 | fn drop(&mut self) { |
396 | let _ = self.bus.remove_watch(); |
397 | } |
398 | } |
399 | |
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex};

    use super::*;

    // A message posted on the bus must reach the installed sync handler.
    #[test]
    fn test_sync_handler() {
        crate::init().unwrap();

        let bus = Bus::new();
        let received = Arc::new(Mutex::new(Vec::new()));
        let handler_received = Arc::clone(&received);
        bus.set_sync_handler(move |_, msg| {
            let mut collected = handler_received.lock().unwrap();
            collected.push(msg.clone());
            BusSyncReply::Pass
        });

        let eos = crate::message::Eos::new();
        bus.post(eos).unwrap();

        let received = received.lock().unwrap();
        assert_eq!(received.len(), 1);
        let first = &received[0];
        match first.view() {
            crate::MessageView::Eos(_) => (),
            _ => unreachable!(),
        }
    }

    // A message posted on the bus must come out of the bus stream.
    #[test]
    fn test_bus_stream() {
        crate::init().unwrap();

        let bus = Bus::new();
        let stream = bus.stream();

        bus.post(crate::message::Eos::new()).unwrap();

        let (message, _) = futures_executor::block_on(stream.into_future());

        let message = message.unwrap();
        match message.view() {
            crate::MessageView::Eos(_) => (),
            _ => unreachable!(),
        }
    }
}
447 | |