| 1 | // Take a look at the license at the top of the repository in the LICENSE file. |
| 2 | |
| 3 | use std::{ |
| 4 | future, |
| 5 | mem::transmute, |
| 6 | pin::Pin, |
| 7 | task::{Context, Poll}, |
| 8 | }; |
| 9 | |
| 10 | use futures_channel::mpsc::{self, UnboundedReceiver}; |
| 11 | use futures_core::Stream; |
| 12 | use futures_util::{stream::FusedStream, StreamExt}; |
| 13 | use glib::{ |
| 14 | ffi::{gboolean, gpointer}, |
| 15 | prelude::*, |
| 16 | source::Priority, |
| 17 | translate::*, |
| 18 | ControlFlow, |
| 19 | }; |
| 20 | |
| 21 | use crate::{ffi, Bus, BusSyncReply, Message, MessageType}; |
| 22 | |
| 23 | unsafe extern "C" fn trampoline_watch<F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static>( |
| 24 | bus: *mut ffi::GstBus, |
| 25 | msg: *mut ffi::GstMessage, |
| 26 | func: gpointer, |
| 27 | ) -> gboolean { |
| 28 | let func: &mut F = &mut *(func as *mut F); |
| 29 | func(&from_glib_borrow(ptr:bus), &Message::from_glib_borrow(ptr:msg)).into_glib() |
| 30 | } |
| 31 | |
| 32 | unsafe extern "C" fn destroy_closure_watch< |
| 33 | F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static, |
| 34 | >( |
| 35 | ptr: gpointer, |
| 36 | ) { |
| 37 | let _ = Box::<F>::from_raw(ptr as *mut _); |
| 38 | } |
| 39 | |
| 40 | fn into_raw_watch<F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static>(func: F) -> gpointer { |
| 41 | #[allow (clippy::type_complexity)] |
| 42 | let func: Box<F> = Box::new(func); |
| 43 | Box::into_raw(func) as gpointer |
| 44 | } |
| 45 | |
| 46 | unsafe extern "C" fn trampoline_watch_local<F: FnMut(&Bus, &Message) -> ControlFlow + 'static>( |
| 47 | bus: *mut ffi::GstBus, |
| 48 | msg: *mut ffi::GstMessage, |
| 49 | func: gpointer, |
| 50 | ) -> gboolean { |
| 51 | let func: &mut glib::thread_guard::ThreadGuard<F> = |
| 52 | &mut *(func as *mut glib::thread_guard::ThreadGuard<F>); |
| 53 | (func.get_mut())(&from_glib_borrow(ptr:bus), &Message::from_glib_borrow(ptr:msg)).into_glib() |
| 54 | } |
| 55 | |
| 56 | unsafe extern "C" fn destroy_closure_watch_local< |
| 57 | F: FnMut(&Bus, &Message) -> ControlFlow + 'static, |
| 58 | >( |
| 59 | ptr: gpointer, |
| 60 | ) { |
| 61 | let _ = Box::<glib::thread_guard::ThreadGuard<F>>::from_raw(ptr as *mut _); |
| 62 | } |
| 63 | |
| 64 | fn into_raw_watch_local<F: FnMut(&Bus, &Message) -> ControlFlow + 'static>(func: F) -> gpointer { |
| 65 | #[allow (clippy::type_complexity)] |
| 66 | let func: Box<glib::thread_guard::ThreadGuard<F>> = |
| 67 | Box::new(glib::thread_guard::ThreadGuard::new(func)); |
| 68 | Box::into_raw(func) as gpointer |
| 69 | } |
| 70 | |
| 71 | unsafe extern "C" fn trampoline_sync< |
| 72 | F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static, |
| 73 | >( |
| 74 | bus: *mut ffi::GstBus, |
| 75 | msg: *mut ffi::GstMessage, |
| 76 | func: gpointer, |
| 77 | ) -> ffi::GstBusSyncReply { |
| 78 | let f: &F = &*(func as *const F); |
| 79 | let res = f(&from_glib_borrow(ptr:bus), &Message::from_glib_borrow(ptr:msg)).into_glib(); |
| 80 | |
| 81 | if res == ffi::GST_BUS_DROP { |
| 82 | ffi::gst_mini_object_unref(mini_object:msg as *mut _); |
| 83 | } |
| 84 | |
| 85 | res |
| 86 | } |
| 87 | |
| 88 | unsafe extern "C" fn destroy_closure_sync< |
| 89 | F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static, |
| 90 | >( |
| 91 | ptr: gpointer, |
| 92 | ) { |
| 93 | let _ = Box::<F>::from_raw(ptr as *mut _); |
| 94 | } |
| 95 | |
| 96 | fn into_raw_sync<F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static>( |
| 97 | func: F, |
| 98 | ) -> gpointer { |
| 99 | let func: Box<F> = Box::new(func); |
| 100 | Box::into_raw(func) as gpointer |
| 101 | } |
| 102 | |
| 103 | impl Bus { |
| 104 | #[doc (alias = "gst_bus_add_signal_watch" )] |
| 105 | #[doc (alias = "gst_bus_add_signal_watch_full" )] |
| 106 | pub fn add_signal_watch_full(&self, priority: Priority) { |
| 107 | unsafe { |
| 108 | ffi::gst_bus_add_signal_watch_full(self.to_glib_none().0, priority.into_glib()); |
| 109 | } |
| 110 | } |
| 111 | |
| 112 | #[doc (alias = "gst_bus_create_watch" )] |
| 113 | pub fn create_watch<F>(&self, name: Option<&str>, priority: Priority, func: F) -> glib::Source |
| 114 | where |
| 115 | F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static, |
| 116 | { |
| 117 | skip_assert_initialized!(); |
| 118 | unsafe { |
| 119 | let source = ffi::gst_bus_create_watch(self.to_glib_none().0); |
| 120 | glib::ffi::g_source_set_callback( |
| 121 | source, |
| 122 | Some(transmute::< |
| 123 | *mut (), |
| 124 | unsafe extern "C" fn(glib::ffi::gpointer) -> i32, |
| 125 | >(trampoline_watch::<F> as *mut ())), |
| 126 | into_raw_watch(func), |
| 127 | Some(destroy_closure_watch::<F>), |
| 128 | ); |
| 129 | glib::ffi::g_source_set_priority(source, priority.into_glib()); |
| 130 | |
| 131 | if let Some(name) = name { |
| 132 | glib::ffi::g_source_set_name(source, name.to_glib_none().0); |
| 133 | } |
| 134 | |
| 135 | from_glib_full(source) |
| 136 | } |
| 137 | } |
| 138 | |
| 139 | #[doc (alias = "gst_bus_add_watch" )] |
| 140 | #[doc (alias = "gst_bus_add_watch_full" )] |
| 141 | pub fn add_watch<F>(&self, func: F) -> Result<BusWatchGuard, glib::BoolError> |
| 142 | where |
| 143 | F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static, |
| 144 | { |
| 145 | unsafe { |
| 146 | let res = ffi::gst_bus_add_watch_full( |
| 147 | self.to_glib_none().0, |
| 148 | glib::ffi::G_PRIORITY_DEFAULT, |
| 149 | Some(trampoline_watch::<F>), |
| 150 | into_raw_watch(func), |
| 151 | Some(destroy_closure_watch::<F>), |
| 152 | ); |
| 153 | |
| 154 | if res == 0 { |
| 155 | Err(glib::bool_error!("Bus already has a watch" )) |
| 156 | } else { |
| 157 | Ok(BusWatchGuard { bus: self.clone() }) |
| 158 | } |
| 159 | } |
| 160 | } |
| 161 | |
| 162 | #[doc (alias = "gst_bus_add_watch" )] |
| 163 | #[doc (alias = "gst_bus_add_watch_full" )] |
| 164 | pub fn add_watch_local<F>(&self, func: F) -> Result<BusWatchGuard, glib::BoolError> |
| 165 | where |
| 166 | F: FnMut(&Bus, &Message) -> ControlFlow + 'static, |
| 167 | { |
| 168 | unsafe { |
| 169 | let ctx = glib::MainContext::ref_thread_default(); |
| 170 | let _acquire = ctx |
| 171 | .acquire() |
| 172 | .expect("thread default main context already acquired by another thread" ); |
| 173 | |
| 174 | let res = ffi::gst_bus_add_watch_full( |
| 175 | self.to_glib_none().0, |
| 176 | glib::ffi::G_PRIORITY_DEFAULT, |
| 177 | Some(trampoline_watch_local::<F>), |
| 178 | into_raw_watch_local(func), |
| 179 | Some(destroy_closure_watch_local::<F>), |
| 180 | ); |
| 181 | |
| 182 | if res == 0 { |
| 183 | Err(glib::bool_error!("Bus already has a watch" )) |
| 184 | } else { |
| 185 | Ok(BusWatchGuard { bus: self.clone() }) |
| 186 | } |
| 187 | } |
| 188 | } |
| 189 | |
| 190 | #[doc (alias = "gst_bus_set_sync_handler" )] |
| 191 | pub fn set_sync_handler<F>(&self, func: F) |
| 192 | where |
| 193 | F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static, |
| 194 | { |
| 195 | unsafe { |
| 196 | let bus = self.to_glib_none().0; |
| 197 | |
| 198 | #[cfg (not(feature = "v1_18" ))] |
| 199 | { |
| 200 | static SET_ONCE_QUARK: std::sync::OnceLock<glib::Quark> = |
| 201 | std::sync::OnceLock::new(); |
| 202 | |
| 203 | let set_once_quark = SET_ONCE_QUARK |
| 204 | .get_or_init(|| glib::Quark::from_str("gstreamer-rs-sync-handler" )); |
| 205 | |
| 206 | // This is not thread-safe before 1.16.3, see |
| 207 | // https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/merge_requests/416 |
| 208 | if crate::version() < (1, 16, 3, 0) { |
| 209 | if !glib::gobject_ffi::g_object_get_qdata( |
| 210 | bus as *mut _, |
| 211 | set_once_quark.into_glib(), |
| 212 | ) |
| 213 | .is_null() |
| 214 | { |
| 215 | panic!("Bus sync handler can only be set once" ); |
| 216 | } |
| 217 | |
| 218 | glib::gobject_ffi::g_object_set_qdata( |
| 219 | bus as *mut _, |
| 220 | set_once_quark.into_glib(), |
| 221 | 1 as *mut _, |
| 222 | ); |
| 223 | } |
| 224 | } |
| 225 | |
| 226 | ffi::gst_bus_set_sync_handler( |
| 227 | bus, |
| 228 | Some(trampoline_sync::<F>), |
| 229 | into_raw_sync(func), |
| 230 | Some(destroy_closure_sync::<F>), |
| 231 | ) |
| 232 | } |
| 233 | } |
| 234 | |
| 235 | pub fn unset_sync_handler(&self) { |
| 236 | #[cfg (not(feature = "v1_18" ))] |
| 237 | { |
| 238 | // This is not thread-safe before 1.16.3, see |
| 239 | // https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/merge_requests/416 |
| 240 | if crate::version() < (1, 16, 3, 0) { |
| 241 | return; |
| 242 | } |
| 243 | } |
| 244 | |
| 245 | unsafe { |
| 246 | use std::ptr; |
| 247 | |
| 248 | ffi::gst_bus_set_sync_handler(self.to_glib_none().0, None, ptr::null_mut(), None) |
| 249 | } |
| 250 | } |
| 251 | |
| 252 | #[doc (alias = "gst_bus_pop" )] |
| 253 | pub fn iter(&self) -> Iter { |
| 254 | self.iter_timed(Some(crate::ClockTime::ZERO)) |
| 255 | } |
| 256 | |
| 257 | #[doc (alias = "gst_bus_timed_pop" )] |
| 258 | pub fn iter_timed(&self, timeout: impl Into<Option<crate::ClockTime>>) -> Iter { |
| 259 | Iter { |
| 260 | bus: self, |
| 261 | timeout: timeout.into(), |
| 262 | } |
| 263 | } |
| 264 | |
| 265 | #[doc (alias = "gst_bus_pop_filtered" )] |
| 266 | pub fn iter_filtered<'a>( |
| 267 | &'a self, |
| 268 | msg_types: &'a [MessageType], |
| 269 | ) -> impl Iterator<Item = Message> + 'a { |
| 270 | self.iter_timed_filtered(Some(crate::ClockTime::ZERO), msg_types) |
| 271 | } |
| 272 | |
| 273 | #[doc (alias = "gst_bus_timed_pop_filtered" )] |
| 274 | pub fn iter_timed_filtered<'a>( |
| 275 | &'a self, |
| 276 | timeout: impl Into<Option<crate::ClockTime>>, |
| 277 | msg_types: &'a [MessageType], |
| 278 | ) -> impl Iterator<Item = Message> + 'a { |
| 279 | self.iter_timed(timeout) |
| 280 | .filter(move |msg| msg_types.contains(&msg.type_())) |
| 281 | } |
| 282 | |
| 283 | #[doc (alias = "gst_bus_timed_pop_filtered" )] |
| 284 | pub fn timed_pop_filtered( |
| 285 | &self, |
| 286 | timeout: impl Into<Option<crate::ClockTime>> + Clone, |
| 287 | msg_types: &[MessageType], |
| 288 | ) -> Option<Message> { |
| 289 | loop { |
| 290 | let msg = self.timed_pop(timeout.clone())?; |
| 291 | if msg_types.contains(&msg.type_()) { |
| 292 | return Some(msg); |
| 293 | } |
| 294 | } |
| 295 | } |
| 296 | |
| 297 | #[doc (alias = "gst_bus_pop_filtered" )] |
| 298 | pub fn pop_filtered(&self, msg_types: &[MessageType]) -> Option<Message> { |
| 299 | loop { |
| 300 | let msg = self.pop()?; |
| 301 | if msg_types.contains(&msg.type_()) { |
| 302 | return Some(msg); |
| 303 | } |
| 304 | } |
| 305 | } |
| 306 | |
| 307 | pub fn stream(&self) -> BusStream { |
| 308 | BusStream::new(self) |
| 309 | } |
| 310 | |
| 311 | pub fn stream_filtered<'a>( |
| 312 | &self, |
| 313 | message_types: &'a [MessageType], |
| 314 | ) -> impl FusedStream<Item = Message> + Unpin + Send + 'a { |
| 315 | self.stream().filter(move |message| { |
| 316 | let message_type = message.type_(); |
| 317 | |
| 318 | future::ready(message_types.contains(&message_type)) |
| 319 | }) |
| 320 | } |
| 321 | } |
| 322 | |
| 323 | #[derive (Debug)] |
| 324 | pub struct Iter<'a> { |
| 325 | bus: &'a Bus, |
| 326 | timeout: Option<crate::ClockTime>, |
| 327 | } |
| 328 | |
| 329 | impl Iterator for Iter<'_> { |
| 330 | type Item = Message; |
| 331 | |
| 332 | fn next(&mut self) -> Option<Message> { |
| 333 | self.bus.timed_pop(self.timeout) |
| 334 | } |
| 335 | } |
| 336 | |
| 337 | #[derive (Debug)] |
| 338 | pub struct BusStream { |
| 339 | bus: glib::WeakRef<Bus>, |
| 340 | receiver: UnboundedReceiver<Message>, |
| 341 | } |
| 342 | |
| 343 | impl BusStream { |
| 344 | fn new(bus: &Bus) -> Self { |
| 345 | skip_assert_initialized!(); |
| 346 | |
| 347 | let (sender: UnboundedSender, receiver: UnboundedReceiver) = mpsc::unbounded(); |
| 348 | |
| 349 | bus.set_sync_handler(func:move |bus: &Bus, message: &Message| { |
| 350 | // First pop all messages that might've been previously queued before creating |
| 351 | // the bus stream. |
| 352 | while let Some(message: Message) = bus.pop() { |
| 353 | let _ = sender.unbounded_send(msg:message); |
| 354 | } |
| 355 | |
| 356 | let _ = sender.unbounded_send(msg:message.to_owned()); |
| 357 | |
| 358 | BusSyncReply::Drop |
| 359 | }); |
| 360 | |
| 361 | Self { |
| 362 | bus: bus.downgrade(), |
| 363 | receiver, |
| 364 | } |
| 365 | } |
| 366 | } |
| 367 | |
| 368 | impl Drop for BusStream { |
| 369 | fn drop(&mut self) { |
| 370 | if let Some(bus: Bus) = self.bus.upgrade() { |
| 371 | bus.unset_sync_handler(); |
| 372 | } |
| 373 | } |
| 374 | } |
| 375 | |
| 376 | impl Stream for BusStream { |
| 377 | type Item = Message; |
| 378 | |
| 379 | fn poll_next(mut self: Pin<&mut Self>, context: &mut Context) -> Poll<Option<Self::Item>> { |
| 380 | self.receiver.poll_next_unpin(cx:context) |
| 381 | } |
| 382 | } |
| 383 | |
| 384 | impl FusedStream for BusStream { |
| 385 | fn is_terminated(&self) -> bool { |
| 386 | self.receiver.is_terminated() |
| 387 | } |
| 388 | } |
| 389 | |
| 390 | // rustdoc-stripper-ignore-next |
| 391 | /// Manages ownership of the bus watch added to a bus with [`Bus::add_watch`] or [`Bus::add_watch_local`] |
| 392 | /// |
| 393 | /// When dropped the bus watch is removed from the bus. |
| 394 | #[derive (Debug)] |
| 395 | #[must_use = "if unused the bus watch will immediately be removed" ] |
| 396 | pub struct BusWatchGuard { |
| 397 | bus: Bus, |
| 398 | } |
| 399 | |
| 400 | impl Drop for BusWatchGuard { |
| 401 | fn drop(&mut self) { |
| 402 | let _ = self.bus.remove_watch(); |
| 403 | } |
| 404 | } |
| 405 | |
| 406 | #[cfg (test)] |
| 407 | mod tests { |
| 408 | use std::sync::{Arc, Mutex}; |
| 409 | |
| 410 | use super::*; |
| 411 | |
| 412 | #[test ] |
| 413 | fn test_sync_handler() { |
| 414 | crate::init().unwrap(); |
| 415 | |
| 416 | let bus = Bus::new(); |
| 417 | let msgs = Arc::new(Mutex::new(Vec::new())); |
| 418 | let msgs_clone = msgs.clone(); |
| 419 | bus.set_sync_handler(move |_, msg| { |
| 420 | msgs_clone.lock().unwrap().push(msg.clone()); |
| 421 | BusSyncReply::Pass |
| 422 | }); |
| 423 | |
| 424 | bus.post(crate::message::Eos::new()).unwrap(); |
| 425 | |
| 426 | let msgs = msgs.lock().unwrap(); |
| 427 | assert_eq!(msgs.len(), 1); |
| 428 | match msgs[0].view() { |
| 429 | crate::MessageView::Eos(_) => (), |
| 430 | _ => unreachable!(), |
| 431 | } |
| 432 | } |
| 433 | |
| 434 | #[test ] |
| 435 | fn test_bus_stream() { |
| 436 | crate::init().unwrap(); |
| 437 | |
| 438 | let bus = Bus::new(); |
| 439 | let bus_stream = bus.stream(); |
| 440 | |
| 441 | let eos_message = crate::message::Eos::new(); |
| 442 | bus.post(eos_message).unwrap(); |
| 443 | |
| 444 | let bus_future = bus_stream.into_future(); |
| 445 | let (message, _) = futures_executor::block_on(bus_future); |
| 446 | |
| 447 | match message.unwrap().view() { |
| 448 | crate::MessageView::Eos(_) => (), |
| 449 | _ => unreachable!(), |
| 450 | } |
| 451 | } |
| 452 | } |
| 453 | |