| 1 | //! The client-side proxy API. |
| 2 | |
| 3 | use enumflags2::{bitflags , BitFlags}; |
| 4 | use event_listener::{Event, EventListener}; |
| 5 | use futures_core::{ready, stream}; |
| 6 | use futures_util::{future::Either, stream::Map}; |
| 7 | use ordered_stream::{join as join_streams, FromFuture, Join, OrderedStream, PollResult}; |
| 8 | use static_assertions::assert_impl_all; |
| 9 | use std::{ |
| 10 | collections::{HashMap, HashSet}, |
| 11 | fmt, |
| 12 | future::Future, |
| 13 | ops::Deref, |
| 14 | pin::Pin, |
| 15 | sync::{Arc, OnceLock, RwLock, RwLockReadGuard}, |
| 16 | task::{Context, Poll}, |
| 17 | }; |
| 18 | use tracing::{debug, info_span, instrument, trace, Instrument}; |
| 19 | |
| 20 | use zbus_names::{BusName, InterfaceName, MemberName, UniqueName}; |
| 21 | use zvariant::{ObjectPath, OwnedValue, Str, Value}; |
| 22 | |
| 23 | use crate::{ |
| 24 | fdo::{self, IntrospectableProxy, NameOwnerChanged, PropertiesChangedStream, PropertiesProxy}, |
| 25 | message::{Flags, Message, Sequence, Type}, |
| 26 | AsyncDrop, Connection, Error, Executor, MatchRule, MessageStream, OwnedMatchRule, Result, Task, |
| 27 | }; |
| 28 | |
| 29 | mod builder; |
| 30 | pub use builder::{Builder, CacheProperties, ProxyDefault}; |
| 31 | |
| 32 | /// A client-side interface proxy. |
| 33 | /// |
| 34 | /// A `Proxy` is a helper to interact with an interface on a remote object. |
| 35 | /// |
| 36 | /// # Example |
| 37 | /// |
| 38 | /// ``` |
| 39 | /// use std::result::Result; |
| 40 | /// use std::error::Error; |
| 41 | /// use zbus::{Connection, Proxy}; |
| 42 | /// |
| 43 | /// #[tokio::main] |
| 44 | /// async fn main() -> Result<(), Box<dyn Error>> { |
| 45 | /// let connection = Connection::session().await?; |
| 46 | /// let p = Proxy::new( |
| 47 | /// &connection, |
| 48 | /// "org.freedesktop.DBus" , |
| 49 | /// "/org/freedesktop/DBus" , |
| 50 | /// "org.freedesktop.DBus" , |
| 51 | /// ).await?; |
| 52 | /// // owned return value |
| 53 | /// let _id: String = p.call("GetId" , &()).await?; |
| 54 | /// // borrowed return value |
| 55 | /// let body = p.call_method("GetId" , &()).await?.body(); |
| 56 | /// let _id: &str = body.deserialize()?; |
| 57 | /// |
| 58 | /// Ok(()) |
| 59 | /// } |
| 60 | /// ``` |
| 61 | /// |
| 62 | /// # Note |
| 63 | /// |
| 64 | /// It is recommended to use the [`proxy`] macro, which provides a more convenient and |
| 65 | /// type-safe *façade* `Proxy` derived from a Rust trait. |
| 66 | /// |
| 67 | /// [`futures` crate]: https://crates.io/crates/futures |
| 68 | /// [`proxy`]: attr.proxy.html |
| 69 | #[derive (Clone, Debug)] |
| 70 | pub struct Proxy<'a> { |
| 71 | pub(crate) inner: Arc<ProxyInner<'a>>, |
| 72 | } |
| 73 | |
| 74 | assert_impl_all!(Proxy<'_>: Send, Sync, Unpin); |
| 75 | |
| 76 | /// This is required to avoid having the Drop impl extend the lifetime 'a, which breaks zbus_xmlgen |
| 77 | /// (and possibly other crates). |
| 78 | pub(crate) struct ProxyInnerStatic { |
| 79 | pub(crate) conn: Connection, |
| 80 | dest_owner_change_match_rule: OnceLock<OwnedMatchRule>, |
| 81 | } |
| 82 | |
| 83 | impl fmt::Debug for ProxyInnerStatic { |
| 84 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 85 | f&mut DebugStruct<'_, '_>.debug_struct("ProxyInnerStatic" ) |
| 86 | .field( |
| 87 | name:"dest_owner_change_match_rule" , |
| 88 | &self.dest_owner_change_match_rule, |
| 89 | ) |
| 90 | .finish_non_exhaustive() |
| 91 | } |
| 92 | } |
| 93 | |
| 94 | #[derive (Debug)] |
| 95 | pub(crate) struct ProxyInner<'a> { |
| 96 | inner_without_borrows: ProxyInnerStatic, |
| 97 | pub(crate) destination: BusName<'a>, |
| 98 | pub(crate) path: ObjectPath<'a>, |
| 99 | pub(crate) interface: InterfaceName<'a>, |
| 100 | |
| 101 | /// Cache of property values. |
| 102 | property_cache: Option<OnceLock<(Arc<PropertiesCache>, Task<()>)>>, |
| 103 | /// Set of properties which do not get cached, by name. |
| 104 | /// This overrides proxy-level caching behavior. |
| 105 | uncached_properties: HashSet<Str<'a>>, |
| 106 | } |
| 107 | |
| 108 | impl Drop for ProxyInnerStatic { |
| 109 | fn drop(&mut self) { |
| 110 | if let Some(rule: OwnedMatchRule) = self.dest_owner_change_match_rule.take() { |
| 111 | self.conn.queue_remove_match(rule); |
| 112 | } |
| 113 | } |
| 114 | } |
| 115 | |
| 116 | /// A property changed event. |
| 117 | /// |
| 118 | /// The property changed event generated by [`PropertyStream`]. |
| 119 | pub struct PropertyChanged<'a, T> { |
| 120 | name: &'a str, |
| 121 | properties: Arc<PropertiesCache>, |
| 122 | proxy: Proxy<'a>, |
| 123 | phantom: std::marker::PhantomData<T>, |
| 124 | } |
| 125 | |
| 126 | impl<'a, T> PropertyChanged<'a, T> { |
| 127 | // The name of the property that changed. |
| 128 | pub fn name(&self) -> &str { |
| 129 | self.name |
| 130 | } |
| 131 | |
| 132 | // Get the raw value of the property that changed. |
| 133 | // |
| 134 | // If the notification signal contained the new value, it has been cached already and this call |
| 135 | // will return that value. Otherwise (i-e invalidated property), a D-Bus call is made to fetch |
| 136 | // and cache the new value. |
| 137 | pub async fn get_raw<'p>(&'p self) -> Result<impl Deref<Target = Value<'static>> + 'p> { |
| 138 | struct Wrapper<'w> { |
| 139 | name: &'w str, |
| 140 | values: RwLockReadGuard<'w, HashMap<String, PropertyValue>>, |
| 141 | } |
| 142 | |
| 143 | impl<'w> Deref for Wrapper<'w> { |
| 144 | type Target = Value<'static>; |
| 145 | |
| 146 | fn deref(&self) -> &Self::Target { |
| 147 | self.values |
| 148 | .get(self.name) |
| 149 | .expect("PropertyStream with no corresponding property" ) |
| 150 | .value |
| 151 | .as_ref() |
| 152 | .expect("PropertyStream with no corresponding property" ) |
| 153 | } |
| 154 | } |
| 155 | |
| 156 | { |
| 157 | let values = self.properties.values.read().expect("lock poisoned" ); |
| 158 | if values |
| 159 | .get(self.name) |
| 160 | .expect("PropertyStream with no corresponding property" ) |
| 161 | .value |
| 162 | .is_some() |
| 163 | { |
| 164 | return Ok(Wrapper { |
| 165 | name: self.name, |
| 166 | values, |
| 167 | }); |
| 168 | } |
| 169 | } |
| 170 | |
| 171 | // The property was invalidated, so we need to fetch the new value. |
| 172 | let properties_proxy = self.proxy.properties_proxy(); |
| 173 | let value = properties_proxy |
| 174 | .get(self.proxy.inner.interface.clone(), self.name) |
| 175 | .await |
| 176 | .map_err(crate::Error::from)?; |
| 177 | |
| 178 | // Save the new value |
| 179 | { |
| 180 | let mut values = self.properties.values.write().expect("lock poisoned" ); |
| 181 | |
| 182 | values |
| 183 | .get_mut(self.name) |
| 184 | .expect("PropertyStream with no corresponding property" ) |
| 185 | .value = Some(value); |
| 186 | } |
| 187 | |
| 188 | Ok(Wrapper { |
| 189 | name: self.name, |
| 190 | values: self.properties.values.read().expect("lock poisoned" ), |
| 191 | }) |
| 192 | } |
| 193 | } |
| 194 | |
| 195 | impl<T> PropertyChanged<'_, T> |
| 196 | where |
| 197 | T: TryFrom<zvariant::OwnedValue>, |
| 198 | T::Error: Into<crate::Error>, |
| 199 | { |
| 200 | // Get the value of the property that changed. |
| 201 | // |
| 202 | // If the notification signal contained the new value, it has been cached already and this call |
| 203 | // will return that value. Otherwise (i-e invalidated property), a D-Bus call is made to fetch |
| 204 | // and cache the new value. |
| 205 | pub async fn get(&self) -> Result<T> { |
| 206 | self.get_raw() |
| 207 | .await |
| 208 | .and_then(|v: impl Deref>| T::try_from(OwnedValue::try_from(&*v)?).map_err(op:Into::into)) |
| 209 | } |
| 210 | } |
| 211 | |
| 212 | /// A [`stream::Stream`] implementation that yields property change notifications. |
| 213 | /// |
| 214 | /// Use [`Proxy::receive_property_changed`] to create an instance of this type. |
| 215 | #[derive (Debug)] |
| 216 | pub struct PropertyStream<'a, T> { |
| 217 | name: &'a str, |
| 218 | proxy: Proxy<'a>, |
| 219 | changed_listener: EventListener, |
| 220 | phantom: std::marker::PhantomData<T>, |
| 221 | } |
| 222 | |
| 223 | impl<'a, T> stream::Stream for PropertyStream<'a, T> |
| 224 | where |
| 225 | T: Unpin, |
| 226 | { |
| 227 | type Item = PropertyChanged<'a, T>; |
| 228 | |
| 229 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
| 230 | let m = self.get_mut(); |
| 231 | let properties = match m.proxy.get_property_cache() { |
| 232 | Some(properties) => properties.clone(), |
| 233 | // With no cache, we will get no updates; return immediately |
| 234 | None => return Poll::Ready(None), |
| 235 | }; |
| 236 | ready!(Pin::new(&mut m.changed_listener).poll(cx)); |
| 237 | |
| 238 | m.changed_listener = properties |
| 239 | .values |
| 240 | .read() |
| 241 | .expect("lock poisoned" ) |
| 242 | .get(m.name) |
| 243 | .expect("PropertyStream with no corresponding property" ) |
| 244 | .event |
| 245 | .listen(); |
| 246 | |
| 247 | Poll::Ready(Some(PropertyChanged { |
| 248 | name: m.name, |
| 249 | properties, |
| 250 | proxy: m.proxy.clone(), |
| 251 | phantom: std::marker::PhantomData, |
| 252 | })) |
| 253 | } |
| 254 | } |
| 255 | |
| 256 | #[derive (Debug)] |
| 257 | pub(crate) struct PropertiesCache { |
| 258 | values: RwLock<HashMap<String, PropertyValue>>, |
| 259 | caching_result: RwLock<CachingResult>, |
| 260 | } |
| 261 | |
| 262 | #[derive (Debug)] |
| 263 | enum CachingResult { |
| 264 | Caching { ready: Event }, |
| 265 | Cached { result: Result<()> }, |
| 266 | } |
| 267 | |
| 268 | impl PropertiesCache { |
| 269 | #[instrument (skip_all)] |
| 270 | fn new( |
| 271 | proxy: PropertiesProxy<'static>, |
| 272 | interface: InterfaceName<'static>, |
| 273 | executor: &Executor<'_>, |
| 274 | uncached_properties: HashSet<zvariant::Str<'static>>, |
| 275 | ) -> (Arc<Self>, Task<()>) { |
| 276 | let cache = Arc::new(PropertiesCache { |
| 277 | values: Default::default(), |
| 278 | caching_result: RwLock::new(CachingResult::Caching { |
| 279 | ready: Event::new(), |
| 280 | }), |
| 281 | }); |
| 282 | |
| 283 | let cache_clone = cache.clone(); |
| 284 | let task_name = format!(" {interface} proxy caching" ); |
| 285 | let proxy_caching = async move { |
| 286 | let result = cache_clone |
| 287 | .init(proxy, interface, uncached_properties) |
| 288 | .await; |
| 289 | let (prop_changes, interface, uncached_properties) = { |
| 290 | let mut caching_result = cache_clone.caching_result.write().expect("lock poisoned" ); |
| 291 | let ready = match &*caching_result { |
| 292 | CachingResult::Caching { ready } => ready, |
| 293 | // SAFETY: This is the only part of the code that changes this state and it's |
| 294 | // only run once. |
| 295 | _ => unreachable!(), |
| 296 | }; |
| 297 | match result { |
| 298 | Ok((prop_changes, interface, uncached_properties)) => { |
| 299 | ready.notify(usize::MAX); |
| 300 | *caching_result = CachingResult::Cached { result: Ok(()) }; |
| 301 | |
| 302 | (prop_changes, interface, uncached_properties) |
| 303 | } |
| 304 | Err(e) => { |
| 305 | ready.notify(usize::MAX); |
| 306 | *caching_result = CachingResult::Cached { result: Err(e) }; |
| 307 | |
| 308 | return; |
| 309 | } |
| 310 | } |
| 311 | }; |
| 312 | |
| 313 | if let Err(e) = cache_clone |
| 314 | .keep_updated(prop_changes, interface, uncached_properties) |
| 315 | .await |
| 316 | { |
| 317 | debug!("Error keeping properties cache updated: {e}" ); |
| 318 | } |
| 319 | } |
| 320 | .instrument(info_span!("{}" , task_name)); |
| 321 | let task = executor.spawn(proxy_caching, &task_name); |
| 322 | |
| 323 | (cache, task) |
| 324 | } |
| 325 | |
| 326 | // new() runs this in a task it spawns for initialization of properties cache. |
| 327 | async fn init( |
| 328 | &self, |
| 329 | proxy: PropertiesProxy<'static>, |
| 330 | interface: InterfaceName<'static>, |
| 331 | uncached_properties: HashSet<zvariant::Str<'static>>, |
| 332 | ) -> Result<( |
| 333 | PropertiesChangedStream<'static>, |
| 334 | InterfaceName<'static>, |
| 335 | HashSet<zvariant::Str<'static>>, |
| 336 | )> { |
| 337 | use ordered_stream::OrderedStreamExt; |
| 338 | |
| 339 | let prop_changes = proxy.receive_properties_changed().await?.map(Either::Left); |
| 340 | |
| 341 | let get_all = proxy |
| 342 | .inner() |
| 343 | .connection() |
| 344 | .call_method_raw( |
| 345 | Some(proxy.inner().destination()), |
| 346 | proxy.inner().path(), |
| 347 | Some(proxy.inner().interface()), |
| 348 | "GetAll" , |
| 349 | BitFlags::empty(), |
| 350 | &interface, |
| 351 | ) |
| 352 | .await |
| 353 | .map(|r| FromFuture::from(r.expect("no reply" )).map(Either::Right))?; |
| 354 | |
| 355 | let mut join = join_streams(prop_changes, get_all); |
| 356 | |
| 357 | loop { |
| 358 | match join.next().await { |
| 359 | Some(Either::Left(_update)) => { |
| 360 | // discard updates prior to the initial population |
| 361 | } |
| 362 | Some(Either::Right(populate)) => { |
| 363 | populate?.body().deserialize().map(|values| { |
| 364 | self.update_cache(&uncached_properties, &values, Vec::new(), &interface); |
| 365 | })?; |
| 366 | break; |
| 367 | } |
| 368 | None => break, |
| 369 | } |
| 370 | } |
| 371 | if let Some((Either::Left(update), _)) = Pin::new(&mut join).take_buffered() { |
| 372 | // if an update was buffered, then it happened after the get_all returned and needs to |
| 373 | // be applied before we discard the join |
| 374 | if let Ok(args) = update.args() { |
| 375 | if args.interface_name == interface { |
| 376 | self.update_cache( |
| 377 | &uncached_properties, |
| 378 | &args.changed_properties, |
| 379 | args.invalidated_properties, |
| 380 | &interface, |
| 381 | ); |
| 382 | } |
| 383 | } |
| 384 | } |
| 385 | // This is needed to avoid a "implementation of `OrderedStream` is not general enough" |
| 386 | // error that occurs if you apply the map and join to Pin::new(&mut prop_changes) instead |
| 387 | // of directly to the stream. |
| 388 | let prop_changes = join.into_inner().0.into_inner(); |
| 389 | |
| 390 | Ok((prop_changes, interface, uncached_properties)) |
| 391 | } |
| 392 | |
| 393 | // new() runs this in a task it spawns for keeping the cache in sync. |
| 394 | #[instrument (skip_all)] |
| 395 | async fn keep_updated( |
| 396 | &self, |
| 397 | mut prop_changes: PropertiesChangedStream<'static>, |
| 398 | interface: InterfaceName<'static>, |
| 399 | uncached_properties: HashSet<zvariant::Str<'static>>, |
| 400 | ) -> Result<()> { |
| 401 | use futures_util::StreamExt; |
| 402 | |
| 403 | trace!("Listening for property changes on {interface}..." ); |
| 404 | while let Some(update) = prop_changes.next().await { |
| 405 | if let Ok(args) = update.args() { |
| 406 | if args.interface_name == interface { |
| 407 | self.update_cache( |
| 408 | &uncached_properties, |
| 409 | &args.changed_properties, |
| 410 | args.invalidated_properties, |
| 411 | &interface, |
| 412 | ); |
| 413 | } |
| 414 | } |
| 415 | } |
| 416 | |
| 417 | Ok(()) |
| 418 | } |
| 419 | |
| 420 | fn update_cache( |
| 421 | &self, |
| 422 | uncached_properties: &HashSet<Str<'_>>, |
| 423 | changed: &HashMap<&str, Value<'_>>, |
| 424 | invalidated: Vec<&str>, |
| 425 | interface: &InterfaceName<'_>, |
| 426 | ) { |
| 427 | let mut values = self.values.write().expect("lock poisoned" ); |
| 428 | |
| 429 | for inval in invalidated { |
| 430 | if uncached_properties.contains(&Str::from(inval)) { |
| 431 | debug!( |
| 432 | "Ignoring invalidation of uncached property ` {}. {}`" , |
| 433 | interface, inval |
| 434 | ); |
| 435 | continue; |
| 436 | } |
| 437 | trace!("Property ` {interface}. {inval}` invalidated" ); |
| 438 | |
| 439 | if let Some(entry) = values.get_mut(inval) { |
| 440 | entry.value = None; |
| 441 | entry.event.notify(usize::MAX); |
| 442 | } |
| 443 | } |
| 444 | |
| 445 | for (property_name, value) in changed { |
| 446 | if uncached_properties.contains(&Str::from(*property_name)) { |
| 447 | debug!( |
| 448 | "Ignoring update of uncached property ` {}. {}`" , |
| 449 | interface, property_name |
| 450 | ); |
| 451 | continue; |
| 452 | } |
| 453 | trace!("Property ` {interface}. {property_name}` updated" ); |
| 454 | |
| 455 | let entry = values.entry(property_name.to_string()).or_default(); |
| 456 | |
| 457 | let value = match OwnedValue::try_from(value) { |
| 458 | Ok(value) => value, |
| 459 | Err(e) => { |
| 460 | debug!( |
| 461 | "Failed to convert property ` {interface}. {property_name}` to OwnedValue: {e}" |
| 462 | ); |
| 463 | continue; |
| 464 | } |
| 465 | }; |
| 466 | entry.value = Some(value); |
| 467 | entry.event.notify(usize::MAX); |
| 468 | } |
| 469 | } |
| 470 | |
| 471 | /// Wait for the cache to be populated and return any error encountered during population |
| 472 | pub(crate) async fn ready(&self) -> Result<()> { |
| 473 | let listener = match &*self.caching_result.read().expect("lock poisoned" ) { |
| 474 | CachingResult::Caching { ready } => ready.listen(), |
| 475 | CachingResult::Cached { result } => return result.clone(), |
| 476 | }; |
| 477 | listener.await; |
| 478 | |
| 479 | // It must be ready now. |
| 480 | match &*self.caching_result.read().expect("lock poisoned" ) { |
| 481 | // SAFETY: We were just notified that state has changed to `Cached` and we never go back |
| 482 | // to `Caching` once in `Cached`. |
| 483 | CachingResult::Caching { .. } => unreachable!(), |
| 484 | CachingResult::Cached { result } => result.clone(), |
| 485 | } |
| 486 | } |
| 487 | } |
| 488 | |
| 489 | impl<'a> ProxyInner<'a> { |
| 490 | pub(crate) fn new( |
| 491 | conn: Connection, |
| 492 | destination: BusName<'a>, |
| 493 | path: ObjectPath<'a>, |
| 494 | interface: InterfaceName<'a>, |
| 495 | cache: CacheProperties, |
| 496 | uncached_properties: HashSet<Str<'a>>, |
| 497 | ) -> Self { |
| 498 | let property_cache = match cache { |
| 499 | CacheProperties::Yes | CacheProperties::Lazily => Some(OnceLock::new()), |
| 500 | CacheProperties::No => None, |
| 501 | }; |
| 502 | Self { |
| 503 | inner_without_borrows: ProxyInnerStatic { |
| 504 | conn, |
| 505 | dest_owner_change_match_rule: OnceLock::new(), |
| 506 | }, |
| 507 | destination, |
| 508 | path, |
| 509 | interface, |
| 510 | property_cache, |
| 511 | uncached_properties, |
| 512 | } |
| 513 | } |
| 514 | |
| 515 | /// Subscribe to the "NameOwnerChanged" signal on the bus for our destination. |
| 516 | /// |
| 517 | /// If the destination is a unique name, we will not subscribe to the signal. |
| 518 | pub(crate) async fn subscribe_dest_owner_change(&self) -> Result<()> { |
| 519 | if !self.inner_without_borrows.conn.is_bus() { |
| 520 | // Names don't mean much outside the bus context. |
| 521 | return Ok(()); |
| 522 | } |
| 523 | |
| 524 | let well_known_name = match &self.destination { |
| 525 | BusName::WellKnown(well_known_name) => well_known_name, |
| 526 | BusName::Unique(_) => return Ok(()), |
| 527 | }; |
| 528 | |
| 529 | if self |
| 530 | .inner_without_borrows |
| 531 | .dest_owner_change_match_rule |
| 532 | .get() |
| 533 | .is_some() |
| 534 | { |
| 535 | // Already watching over the bus for any name updates so nothing to do here. |
| 536 | return Ok(()); |
| 537 | } |
| 538 | |
| 539 | let conn = &self.inner_without_borrows.conn; |
| 540 | let signal_rule: OwnedMatchRule = MatchRule::builder() |
| 541 | .msg_type(Type::Signal) |
| 542 | .sender("org.freedesktop.DBus" )? |
| 543 | .path("/org/freedesktop/DBus" )? |
| 544 | .interface("org.freedesktop.DBus" )? |
| 545 | .member("NameOwnerChanged" )? |
| 546 | .add_arg(well_known_name.as_str())? |
| 547 | .build() |
| 548 | .to_owned() |
| 549 | .into(); |
| 550 | |
| 551 | conn.add_match( |
| 552 | signal_rule.clone(), |
| 553 | Some(MAX_NAME_OWNER_CHANGED_SIGNALS_QUEUED), |
| 554 | ) |
| 555 | .await?; |
| 556 | |
| 557 | if self |
| 558 | .inner_without_borrows |
| 559 | .dest_owner_change_match_rule |
| 560 | .set(signal_rule.clone()) |
| 561 | .is_err() |
| 562 | { |
| 563 | // we raced another destination_unique_name call and added it twice |
| 564 | conn.remove_match(signal_rule).await?; |
| 565 | } |
| 566 | |
| 567 | Ok(()) |
| 568 | } |
| 569 | } |
| 570 | |
| 571 | const MAX_NAME_OWNER_CHANGED_SIGNALS_QUEUED: usize = 8; |
| 572 | |
| 573 | impl<'a> Proxy<'a> { |
| 574 | /// Create a new `Proxy` for the given destination/path/interface. |
| 575 | pub async fn new<D, P, I>( |
| 576 | conn: &Connection, |
| 577 | destination: D, |
| 578 | path: P, |
| 579 | interface: I, |
| 580 | ) -> Result<Proxy<'a>> |
| 581 | where |
| 582 | D: TryInto<BusName<'a>>, |
| 583 | P: TryInto<ObjectPath<'a>>, |
| 584 | I: TryInto<InterfaceName<'a>>, |
| 585 | D::Error: Into<Error>, |
| 586 | P::Error: Into<Error>, |
| 587 | I::Error: Into<Error>, |
| 588 | { |
| 589 | Builder::new(conn) |
| 590 | .destination(destination)? |
| 591 | .path(path)? |
| 592 | .interface(interface)? |
| 593 | .build() |
| 594 | .await |
| 595 | } |
| 596 | |
| 597 | /// Create a new `Proxy` for the given destination/path/interface, taking ownership of all |
| 598 | /// passed arguments. |
| 599 | pub async fn new_owned<D, P, I>( |
| 600 | conn: Connection, |
| 601 | destination: D, |
| 602 | path: P, |
| 603 | interface: I, |
| 604 | ) -> Result<Proxy<'a>> |
| 605 | where |
| 606 | D: TryInto<BusName<'static>>, |
| 607 | P: TryInto<ObjectPath<'static>>, |
| 608 | I: TryInto<InterfaceName<'static>>, |
| 609 | D::Error: Into<Error>, |
| 610 | P::Error: Into<Error>, |
| 611 | I::Error: Into<Error>, |
| 612 | { |
| 613 | Builder::new(&conn) |
| 614 | .destination(destination)? |
| 615 | .path(path)? |
| 616 | .interface(interface)? |
| 617 | .build() |
| 618 | .await |
| 619 | } |
| 620 | |
| 621 | /// Get a reference to the associated connection. |
| 622 | pub fn connection(&self) -> &Connection { |
| 623 | &self.inner.inner_without_borrows.conn |
| 624 | } |
| 625 | |
| 626 | /// Get a reference to the destination service name. |
| 627 | pub fn destination(&self) -> &BusName<'_> { |
| 628 | &self.inner.destination |
| 629 | } |
| 630 | |
| 631 | /// Get a reference to the object path. |
| 632 | pub fn path(&self) -> &ObjectPath<'_> { |
| 633 | &self.inner.path |
| 634 | } |
| 635 | |
| 636 | /// Get a reference to the interface. |
| 637 | pub fn interface(&self) -> &InterfaceName<'_> { |
| 638 | &self.inner.interface |
| 639 | } |
| 640 | |
| 641 | /// Introspect the associated object, and return the XML description. |
| 642 | /// |
| 643 | /// See the [xml](xml/index.html) module for parsing the |
| 644 | /// result. |
| 645 | pub async fn introspect(&self) -> fdo::Result<String> { |
| 646 | let proxy = IntrospectableProxy::builder(&self.inner.inner_without_borrows.conn) |
| 647 | .destination(&self.inner.destination)? |
| 648 | .path(&self.inner.path)? |
| 649 | .build() |
| 650 | .await?; |
| 651 | |
| 652 | proxy.introspect().await |
| 653 | } |
| 654 | |
| 655 | fn properties_proxy(&self) -> PropertiesProxy<'_> { |
| 656 | PropertiesProxy::builder(&self.inner.inner_without_borrows.conn) |
| 657 | // Safe because already checked earlier |
| 658 | .destination(self.inner.destination.as_ref()) |
| 659 | .unwrap() |
| 660 | // Safe because already checked earlier |
| 661 | .path(self.inner.path.as_ref()) |
| 662 | .unwrap() |
| 663 | // does not have properties |
| 664 | .cache_properties(CacheProperties::No) |
| 665 | .build_internal() |
| 666 | .unwrap() |
| 667 | .into() |
| 668 | } |
| 669 | |
| 670 | fn owned_properties_proxy(&self) -> PropertiesProxy<'static> { |
| 671 | PropertiesProxy::builder(&self.inner.inner_without_borrows.conn) |
| 672 | // Safe because already checked earlier |
| 673 | .destination(self.inner.destination.to_owned()) |
| 674 | .unwrap() |
| 675 | // Safe because already checked earlier |
| 676 | .path(self.inner.path.to_owned()) |
| 677 | .unwrap() |
| 678 | // does not have properties |
| 679 | .cache_properties(CacheProperties::No) |
| 680 | .build_internal() |
| 681 | .unwrap() |
| 682 | .into() |
| 683 | } |
| 684 | |
| 685 | /// Get the cache, starting it in the background if needed. |
| 686 | /// |
| 687 | /// Use PropertiesCache::ready() to wait for the cache to be populated and to get any errors |
| 688 | /// encountered in the population. |
| 689 | pub(crate) fn get_property_cache(&self) -> Option<&Arc<PropertiesCache>> { |
| 690 | let cache = match &self.inner.property_cache { |
| 691 | Some(cache) => cache, |
| 692 | None => return None, |
| 693 | }; |
| 694 | let (cache, _) = &cache.get_or_init(|| { |
| 695 | let proxy = self.owned_properties_proxy(); |
| 696 | let interface = self.interface().to_owned(); |
| 697 | let uncached_properties: HashSet<zvariant::Str<'static>> = self |
| 698 | .inner |
| 699 | .uncached_properties |
| 700 | .iter() |
| 701 | .map(|s| s.to_owned()) |
| 702 | .collect(); |
| 703 | let executor = self.connection().executor(); |
| 704 | |
| 705 | PropertiesCache::new(proxy, interface, executor, uncached_properties) |
| 706 | }); |
| 707 | |
| 708 | Some(cache) |
| 709 | } |
| 710 | |
| 711 | /// Get the cached value of the property `property_name`. |
| 712 | /// |
| 713 | /// This returns `None` if the property is not in the cache. This could be because the cache |
| 714 | /// was invalidated by an update, because caching was disabled for this property or proxy, or |
| 715 | /// because the cache has not yet been populated. Use `get_property` to fetch the value from |
| 716 | /// the peer. |
| 717 | pub fn cached_property<T>(&self, property_name: &str) -> Result<Option<T>> |
| 718 | where |
| 719 | T: TryFrom<OwnedValue>, |
| 720 | T::Error: Into<Error>, |
| 721 | { |
| 722 | self.cached_property_raw(property_name) |
| 723 | .as_deref() |
| 724 | .map(|v| T::try_from(OwnedValue::try_from(v)?).map_err(Into::into)) |
| 725 | .transpose() |
| 726 | } |
| 727 | |
| 728 | /// Get the cached value of the property `property_name`. |
| 729 | /// |
| 730 | /// Same as `cached_property`, but gives you access to the raw value stored in the cache. This |
| 731 | /// is useful if you want to avoid allocations and cloning. |
| 732 | pub fn cached_property_raw<'p>( |
| 733 | &'p self, |
| 734 | property_name: &'p str, |
| 735 | ) -> Option<impl Deref<Target = Value<'static>> + 'p> { |
| 736 | if let Some(values) = self |
| 737 | .inner |
| 738 | .property_cache |
| 739 | .as_ref() |
| 740 | .and_then(OnceLock::get) |
| 741 | .map(|c| c.0.values.read().expect("lock poisoned" )) |
| 742 | { |
| 743 | // ensure that the property is in the cache. |
| 744 | values |
| 745 | .get(property_name) |
| 746 | // if the property value has not yet been cached, this will return None. |
| 747 | .and_then(|e| e.value.as_ref())?; |
| 748 | |
| 749 | struct Wrapper<'a> { |
| 750 | values: RwLockReadGuard<'a, HashMap<String, PropertyValue>>, |
| 751 | property_name: &'a str, |
| 752 | } |
| 753 | |
| 754 | impl Deref for Wrapper<'_> { |
| 755 | type Target = Value<'static>; |
| 756 | |
| 757 | fn deref(&self) -> &Self::Target { |
| 758 | self.values |
| 759 | .get(self.property_name) |
| 760 | .and_then(|e| e.value.as_ref()) |
| 761 | .map(|v| v.deref()) |
| 762 | .expect("inexistent property" ) |
| 763 | } |
| 764 | } |
| 765 | |
| 766 | Some(Wrapper { |
| 767 | values, |
| 768 | property_name, |
| 769 | }) |
| 770 | } else { |
| 771 | None |
| 772 | } |
| 773 | } |
| 774 | |
| 775 | async fn get_proxy_property(&self, property_name: &str) -> Result<OwnedValue> { |
| 776 | Ok(self |
| 777 | .properties_proxy() |
| 778 | .get(self.inner.interface.as_ref(), property_name) |
| 779 | .await?) |
| 780 | } |
| 781 | |
| 782 | /// Get the property `property_name`. |
| 783 | /// |
| 784 | /// Get the property value from the cache (if caching is enabled) or call the |
| 785 | /// `Get` method of the `org.freedesktop.DBus.Properties` interface. |
| 786 | pub async fn get_property<T>(&self, property_name: &str) -> Result<T> |
| 787 | where |
| 788 | T: TryFrom<OwnedValue>, |
| 789 | T::Error: Into<Error>, |
| 790 | { |
| 791 | if let Some(cache) = self.get_property_cache() { |
| 792 | cache.ready().await?; |
| 793 | } |
| 794 | if let Some(value) = self.cached_property(property_name)? { |
| 795 | return Ok(value); |
| 796 | } |
| 797 | |
| 798 | let value = self.get_proxy_property(property_name).await?; |
| 799 | value.try_into().map_err(Into::into) |
| 800 | } |
| 801 | |
| 802 | /// Set the property `property_name`. |
| 803 | /// |
| 804 | /// Effectively, call the `Set` method of the `org.freedesktop.DBus.Properties` interface. |
| 805 | pub async fn set_property<'t, T>(&self, property_name: &str, value: T) -> fdo::Result<()> |
| 806 | where |
| 807 | T: 't + Into<Value<'t>>, |
| 808 | { |
| 809 | self.properties_proxy() |
| 810 | .set(self.inner.interface.as_ref(), property_name, &value.into()) |
| 811 | .await |
| 812 | } |
| 813 | |
| 814 | /// Call a method and return the reply. |
| 815 | /// |
| 816 | /// Typically, you would want to use [`call`] method instead. Use this method if you need to |
| 817 | /// deserialize the reply message manually (this way, you can avoid the memory |
| 818 | /// allocation/copying, by deserializing the reply to an unowned type). |
| 819 | /// |
| 820 | /// [`call`]: struct.Proxy.html#method.call |
| 821 | pub async fn call_method<'m, M, B>(&self, method_name: M, body: &B) -> Result<Message> |
| 822 | where |
| 823 | M: TryInto<MemberName<'m>>, |
| 824 | M::Error: Into<Error>, |
| 825 | B: serde::ser::Serialize + zvariant::DynamicType, |
| 826 | { |
| 827 | self.inner |
| 828 | .inner_without_borrows |
| 829 | .conn |
| 830 | .call_method( |
| 831 | Some(&self.inner.destination), |
| 832 | self.inner.path.as_str(), |
| 833 | Some(&self.inner.interface), |
| 834 | method_name, |
| 835 | body, |
| 836 | ) |
| 837 | .await |
| 838 | } |
| 839 | |
| 840 | /// Call a method and return the reply body. |
| 841 | /// |
| 842 | /// Use [`call_method`] instead if you need to deserialize the reply manually/separately. |
| 843 | /// |
| 844 | /// [`call_method`]: struct.Proxy.html#method.call_method |
| 845 | pub async fn call<'m, M, B, R>(&self, method_name: M, body: &B) -> Result<R> |
| 846 | where |
| 847 | M: TryInto<MemberName<'m>>, |
| 848 | M::Error: Into<Error>, |
| 849 | B: serde::ser::Serialize + zvariant::DynamicType, |
| 850 | R: for<'d> zvariant::DynamicDeserialize<'d>, |
| 851 | { |
| 852 | let reply = self.call_method(method_name, body).await?; |
| 853 | |
| 854 | reply.body().deserialize() |
| 855 | } |
| 856 | |
| 857 | /// Call a method and return the reply body, optionally supplying a set of |
| 858 | /// method flags to control the way the method call message is sent and handled. |
| 859 | /// |
| 860 | /// Use [`call`] instead if you do not need any special handling via additional flags. |
| 861 | /// If the `NoReplyExpected` flag is passed , this will return None immediately |
| 862 | /// after sending the message, similar to [`call_noreply`] |
| 863 | /// |
| 864 | /// [`call`]: struct.Proxy.html#method.call |
| 865 | /// [`call_noreply`]: struct.Proxy.html#method.call_noreply |
| 866 | pub async fn call_with_flags<'m, M, B, R>( |
| 867 | &self, |
| 868 | method_name: M, |
| 869 | flags: BitFlags<MethodFlags>, |
| 870 | body: &B, |
| 871 | ) -> Result<Option<R>> |
| 872 | where |
| 873 | M: TryInto<MemberName<'m>>, |
| 874 | M::Error: Into<Error>, |
| 875 | B: serde::ser::Serialize + zvariant::DynamicType, |
| 876 | R: for<'d> zvariant::DynamicDeserialize<'d>, |
| 877 | { |
| 878 | let flags = flags.iter().map(Flags::from).collect::<BitFlags<_>>(); |
| 879 | match self |
| 880 | .inner |
| 881 | .inner_without_borrows |
| 882 | .conn |
| 883 | .call_method_raw( |
| 884 | Some(self.destination()), |
| 885 | self.path(), |
| 886 | Some(self.interface()), |
| 887 | method_name, |
| 888 | flags, |
| 889 | body, |
| 890 | ) |
| 891 | .await? |
| 892 | { |
| 893 | Some(reply) => reply.await?.body().deserialize().map(Some), |
| 894 | None => Ok(None), |
| 895 | } |
| 896 | } |
| 897 | |
| 898 | /// Call a method without expecting a reply |
| 899 | /// |
| 900 | /// This sets the `NoReplyExpected` flag on the calling message and does not wait for a reply. |
| 901 | pub async fn call_noreply<'m, M, B>(&self, method_name: M, body: &B) -> Result<()> |
| 902 | where |
| 903 | M: TryInto<MemberName<'m>>, |
| 904 | M::Error: Into<Error>, |
| 905 | B: serde::ser::Serialize + zvariant::DynamicType, |
| 906 | { |
| 907 | self.call_with_flags::<_, _, ()>(method_name, MethodFlags::NoReplyExpected.into(), body) |
| 908 | .await?; |
| 909 | Ok(()) |
| 910 | } |
| 911 | |
| 912 | /// Create a stream for signal named `signal_name`. |
| 913 | pub async fn receive_signal<'m, M>(&self, signal_name: M) -> Result<SignalStream<'m>> |
| 914 | where |
| 915 | M: TryInto<MemberName<'m>>, |
| 916 | M::Error: Into<Error>, |
| 917 | { |
| 918 | self.receive_signal_with_args(signal_name, &[]).await |
| 919 | } |
| 920 | |
| 921 | /// Same as [`Proxy::receive_signal`] but with a filter. |
| 922 | /// |
| 923 | /// The D-Bus specification allows you to filter signals by their arguments, which helps avoid |
| 924 | /// a lot of unnecessary traffic and processing since the filter is run on the server side. Use |
| 925 | /// this method where possible. Note that this filtering is limited to arguments of string |
| 926 | /// types. |
| 927 | /// |
| 928 | /// The arguments are passed as a tuples of argument index and expected value. |
| 929 | pub async fn receive_signal_with_args<'m, M>( |
| 930 | &self, |
| 931 | signal_name: M, |
| 932 | args: &[(u8, &str)], |
| 933 | ) -> Result<SignalStream<'m>> |
| 934 | where |
| 935 | M: TryInto<MemberName<'m>>, |
| 936 | M::Error: Into<Error>, |
| 937 | { |
| 938 | let signal_name = signal_name.try_into().map_err(Into::into)?; |
| 939 | self.receive_signals(Some(signal_name), args).await |
| 940 | } |
| 941 | |
| 942 | async fn receive_signals<'m>( |
| 943 | &self, |
| 944 | signal_name: Option<MemberName<'m>>, |
| 945 | args: &[(u8, &str)], |
| 946 | ) -> Result<SignalStream<'m>> { |
| 947 | self.inner.subscribe_dest_owner_change().await?; |
| 948 | |
| 949 | SignalStream::new(self.clone(), signal_name, args).await |
| 950 | } |
| 951 | |
| 952 | /// Create a stream for all signals emitted by this service. |
| 953 | pub async fn receive_all_signals(&self) -> Result<SignalStream<'static>> { |
| 954 | self.receive_signals(None, &[]).await |
| 955 | } |
| 956 | |
| 957 | /// Get a stream to receive property changed events. |
| 958 | /// |
| 959 | /// Note that zbus doesn't queue the updates. If the listener is slower than the receiver, it |
| 960 | /// will only receive the last update. |
| 961 | /// |
| 962 | /// If caching is not enabled on this proxy, the resulting stream will not return any events. |
| 963 | pub async fn receive_property_changed<'name: 'a, T>( |
| 964 | &self, |
| 965 | name: &'name str, |
| 966 | ) -> PropertyStream<'a, T> { |
| 967 | let properties = self.get_property_cache(); |
| 968 | let changed_listener = if let Some(properties) = &properties { |
| 969 | let mut values = properties.values.write().expect("lock poisoned" ); |
| 970 | let entry = values |
| 971 | .entry(name.to_string()) |
| 972 | .or_insert_with(PropertyValue::default); |
| 973 | entry.event.listen() |
| 974 | } else { |
| 975 | Event::new().listen() |
| 976 | }; |
| 977 | |
| 978 | PropertyStream { |
| 979 | name, |
| 980 | proxy: self.clone(), |
| 981 | changed_listener, |
| 982 | phantom: std::marker::PhantomData, |
| 983 | } |
| 984 | } |
| 985 | |
| 986 | /// Get a stream to receive destination owner changed events. |
| 987 | /// |
| 988 | /// If the proxy destination is a unique name, the stream will be notified of the peer |
| 989 | /// disconnection from the bus (with a `None` value). |
| 990 | /// |
| 991 | /// If the proxy destination is a well-known name, the stream will be notified whenever the name |
| 992 | /// owner is changed, either by a new peer being granted ownership (`Some` value) or when the |
| 993 | /// name is released (with a `None` value). |
| 994 | /// |
| 995 | /// Note that zbus doesn't queue the updates. If the listener is slower than the receiver, it |
| 996 | /// will only receive the last update. |
| 997 | pub async fn receive_owner_changed(&self) -> Result<OwnerChangedStream<'_>> { |
| 998 | use futures_util::StreamExt; |
| 999 | let dbus_proxy = fdo::DBusProxy::builder(self.connection()) |
| 1000 | .cache_properties(CacheProperties::No) |
| 1001 | .build() |
| 1002 | .await?; |
| 1003 | Ok(OwnerChangedStream { |
| 1004 | stream: dbus_proxy |
| 1005 | .receive_name_owner_changed_with_args(&[(0, self.destination().as_str())]) |
| 1006 | .await? |
| 1007 | .map(Box::new(move |signal| { |
| 1008 | let args = signal.args().unwrap(); |
| 1009 | let new_owner = args.new_owner().as_ref().map(|owner| owner.to_owned()); |
| 1010 | |
| 1011 | new_owner |
| 1012 | })), |
| 1013 | name: self.destination().clone(), |
| 1014 | }) |
| 1015 | } |
| 1016 | } |
| 1017 | |
| 1018 | #[derive (Debug, Default)] |
| 1019 | struct PropertyValue { |
| 1020 | value: Option<OwnedValue>, |
| 1021 | event: Event, |
| 1022 | } |
| 1023 | |
| 1024 | /// Flags to use with [`Proxy::call_with_flags`]. |
| 1025 | #[bitflags ] |
| 1026 | #[repr (u8)] |
| 1027 | #[derive (Debug, Copy, Clone, PartialEq, Eq)] |
| 1028 | pub enum MethodFlags { |
| 1029 | /// No response is expected from this method call, regardless of whether the |
| 1030 | /// signature for the interface method indicates a reply type. When passed, |
| 1031 | /// `call_with_flags` will return `Ok(None)` immediately after successfully |
| 1032 | /// sending the method call. |
| 1033 | /// |
| 1034 | /// Errors encountered while *making* the call will still be returned as |
| 1035 | /// an `Err` variant, but any errors that are triggered by the receiver's |
| 1036 | /// handling of the call will not be delivered. |
| 1037 | NoReplyExpected = 0x1, |
| 1038 | |
| 1039 | /// When set on a call whose destination is a message bus, this flag will instruct |
| 1040 | /// the bus not to [launch][al] a service to handle the call if no application |
| 1041 | /// on the bus owns the requested name. |
| 1042 | /// |
| 1043 | /// This flag is ignored when using a peer-to-peer connection. |
| 1044 | /// |
| 1045 | /// [al]: https://dbus.freedesktop.org/doc/dbus-specification.html#message-bus-starting-services |
| 1046 | NoAutoStart = 0x2, |
| 1047 | |
| 1048 | /// Indicates to the receiver that this client is prepared to wait for interactive |
| 1049 | /// authorization, which might take a considerable time to complete. For example, the receiver |
| 1050 | /// may query the user for confirmation via [polkit] or a similar framework. |
| 1051 | /// |
| 1052 | /// [polkit]: https://gitlab.freedesktop.org/polkit/polkit/ |
| 1053 | AllowInteractiveAuth = 0x4, |
| 1054 | } |
| 1055 | |
| 1056 | assert_impl_all!(MethodFlags: Send, Sync, Unpin); |
| 1057 | |
| 1058 | impl From<MethodFlags> for Flags { |
| 1059 | fn from(method_flag: MethodFlags) -> Self { |
| 1060 | match method_flag { |
| 1061 | MethodFlags::NoReplyExpected => Self::NoReplyExpected, |
| 1062 | MethodFlags::NoAutoStart => Self::NoAutoStart, |
| 1063 | MethodFlags::AllowInteractiveAuth => Self::AllowInteractiveAuth, |
| 1064 | } |
| 1065 | } |
| 1066 | } |
| 1067 | |
| 1068 | type OwnerChangedStreamMap<'a> = Map< |
| 1069 | fdo::NameOwnerChangedStream<'a>, |
| 1070 | Box<dyn FnMut(fdo::NameOwnerChanged) -> Option<UniqueName<'static>> + Send + Sync + Unpin>, |
| 1071 | >; |
| 1072 | |
| 1073 | /// A [`stream::Stream`] implementation that yields `UniqueName` when the bus owner changes. |
| 1074 | /// |
| 1075 | /// Use [`Proxy::receive_owner_changed`] to create an instance of this type. |
| 1076 | pub struct OwnerChangedStream<'a> { |
| 1077 | stream: OwnerChangedStreamMap<'a>, |
| 1078 | name: BusName<'a>, |
| 1079 | } |
| 1080 | |
| 1081 | assert_impl_all!(OwnerChangedStream<'_>: Send, Sync, Unpin); |
| 1082 | |
| 1083 | impl OwnerChangedStream<'_> { |
| 1084 | /// The bus name being tracked. |
| 1085 | pub fn name(&self) -> &BusName<'_> { |
| 1086 | &self.name |
| 1087 | } |
| 1088 | } |
| 1089 | |
| 1090 | impl<'a> stream::Stream for OwnerChangedStream<'a> { |
| 1091 | type Item = Option<UniqueName<'static>>; |
| 1092 | |
| 1093 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
| 1094 | use futures_util::StreamExt; |
| 1095 | self.get_mut().stream.poll_next_unpin(cx) |
| 1096 | } |
| 1097 | } |
| 1098 | |
| 1099 | /// A [`stream::Stream`] implementation that yields signal [messages](`Message`). |
| 1100 | /// |
| 1101 | /// Use [`Proxy::receive_signal`] to create an instance of this type. |
| 1102 | /// |
| 1103 | /// This type uses a [`MessageStream::for_match_rule`] internally and therefore the note about match |
| 1104 | /// rule registration and [`AsyncDrop`] in its documentation applies here as well. |
| 1105 | #[derive (Debug)] |
| 1106 | pub struct SignalStream<'a> { |
| 1107 | stream: Join<MessageStream, Option<MessageStream>>, |
| 1108 | src_unique_name: Option<UniqueName<'static>>, |
| 1109 | signal_name: Option<MemberName<'a>>, |
| 1110 | } |
| 1111 | |
| 1112 | impl<'a> SignalStream<'a> { |
| 1113 | /// The signal name. |
| 1114 | pub fn name(&self) -> Option<&MemberName<'a>> { |
| 1115 | self.signal_name.as_ref() |
| 1116 | } |
| 1117 | |
| 1118 | async fn new( |
| 1119 | proxy: Proxy<'_>, |
| 1120 | signal_name: Option<MemberName<'a>>, |
| 1121 | args: &[(u8, &str)], |
| 1122 | ) -> Result<SignalStream<'a>> { |
| 1123 | let mut rule_builder = MatchRule::builder() |
| 1124 | .msg_type(Type::Signal) |
| 1125 | .sender(proxy.destination())? |
| 1126 | .path(proxy.path())? |
| 1127 | .interface(proxy.interface())?; |
| 1128 | if let Some(name) = &signal_name { |
| 1129 | rule_builder = rule_builder.member(name)?; |
| 1130 | } |
| 1131 | for (i, arg) in args { |
| 1132 | rule_builder = rule_builder.arg(*i, *arg)?; |
| 1133 | } |
| 1134 | let signal_rule: OwnedMatchRule = rule_builder.build().to_owned().into(); |
| 1135 | let conn = proxy.connection(); |
| 1136 | |
| 1137 | let (src_unique_name, stream) = match proxy.destination().to_owned() { |
| 1138 | BusName::Unique(name) => ( |
| 1139 | Some(name), |
| 1140 | join_streams( |
| 1141 | MessageStream::for_match_rule(signal_rule, conn, None).await?, |
| 1142 | None, |
| 1143 | ), |
| 1144 | ), |
| 1145 | BusName::WellKnown(name) => { |
| 1146 | use ordered_stream::OrderedStreamExt; |
| 1147 | |
| 1148 | let name_owner_changed_rule = MatchRule::builder() |
| 1149 | .msg_type(Type::Signal) |
| 1150 | .sender("org.freedesktop.DBus" )? |
| 1151 | .path("/org/freedesktop/DBus" )? |
| 1152 | .interface("org.freedesktop.DBus" )? |
| 1153 | .member("NameOwnerChanged" )? |
| 1154 | .add_arg(name.as_str())? |
| 1155 | .build(); |
| 1156 | let name_owner_changed_stream = MessageStream::for_match_rule( |
| 1157 | name_owner_changed_rule, |
| 1158 | conn, |
| 1159 | Some(MAX_NAME_OWNER_CHANGED_SIGNALS_QUEUED), |
| 1160 | ) |
| 1161 | .await? |
| 1162 | .map(Either::Left); |
| 1163 | |
| 1164 | let get_name_owner = conn |
| 1165 | .call_method_raw( |
| 1166 | Some("org.freedesktop.DBus" ), |
| 1167 | "/org/freedesktop/DBus" , |
| 1168 | Some("org.freedesktop.DBus" ), |
| 1169 | "GetNameOwner" , |
| 1170 | BitFlags::empty(), |
| 1171 | &name, |
| 1172 | ) |
| 1173 | .await |
| 1174 | .map(|r| FromFuture::from(r.expect("no reply" )).map(Either::Right))?; |
| 1175 | |
| 1176 | let mut join = join_streams(name_owner_changed_stream, get_name_owner); |
| 1177 | |
| 1178 | let mut src_unique_name = loop { |
| 1179 | match join.next().await { |
| 1180 | Some(Either::Left(Ok(msg))) => { |
| 1181 | let signal = NameOwnerChanged::from_message(msg) |
| 1182 | .expect("`NameOwnerChanged` signal stream got wrong message" ); |
| 1183 | { |
| 1184 | break signal |
| 1185 | .args() |
| 1186 | // SAFETY: The filtering code couldn't have let this through if |
| 1187 | // args were not in order. |
| 1188 | .expect("`NameOwnerChanged` signal has no args" ) |
| 1189 | .new_owner() |
| 1190 | .as_ref() |
| 1191 | .map(UniqueName::to_owned); |
| 1192 | } |
| 1193 | } |
| 1194 | Some(Either::Left(Err(_))) => (), |
| 1195 | Some(Either::Right(Ok(response))) => { |
| 1196 | break Some(response.body().deserialize::<UniqueName<'_>>()?.to_owned()) |
| 1197 | } |
| 1198 | Some(Either::Right(Err(e))) => { |
| 1199 | // Probably the name is not owned. Not a problem but let's still log it. |
| 1200 | debug!("Failed to get owner of {name}: {e}" ); |
| 1201 | |
| 1202 | break None; |
| 1203 | } |
| 1204 | None => { |
| 1205 | return Err(Error::InputOutput( |
| 1206 | std::io::Error::new( |
| 1207 | std::io::ErrorKind::BrokenPipe, |
| 1208 | "connection closed" , |
| 1209 | ) |
| 1210 | .into(), |
| 1211 | )) |
| 1212 | } |
| 1213 | } |
| 1214 | }; |
| 1215 | |
| 1216 | // Let's take into account any buffered NameOwnerChanged signal. |
| 1217 | let (stream, _, queued) = join.into_inner(); |
| 1218 | if let Some(msg) = queued.and_then(|e| match e.0 { |
| 1219 | Either::Left(Ok(msg)) => Some(msg), |
| 1220 | Either::Left(Err(_)) | Either::Right(_) => None, |
| 1221 | }) { |
| 1222 | if let Some(signal) = NameOwnerChanged::from_message(msg) { |
| 1223 | if let Ok(args) = signal.args() { |
| 1224 | match (args.name(), args.new_owner().deref()) { |
| 1225 | (BusName::WellKnown(n), Some(new_owner)) if n == &name => { |
| 1226 | src_unique_name = Some(new_owner.to_owned()); |
| 1227 | } |
| 1228 | _ => (), |
| 1229 | } |
| 1230 | } |
| 1231 | } |
| 1232 | } |
| 1233 | let name_owner_changed_stream = stream.into_inner(); |
| 1234 | |
| 1235 | let stream = join_streams( |
| 1236 | MessageStream::for_match_rule(signal_rule, conn, None).await?, |
| 1237 | Some(name_owner_changed_stream), |
| 1238 | ); |
| 1239 | |
| 1240 | (src_unique_name, stream) |
| 1241 | } |
| 1242 | }; |
| 1243 | |
| 1244 | Ok(SignalStream { |
| 1245 | stream, |
| 1246 | src_unique_name, |
| 1247 | signal_name, |
| 1248 | }) |
| 1249 | } |
| 1250 | |
| 1251 | fn filter(&mut self, msg: &Message) -> Result<bool> { |
| 1252 | let header = msg.header(); |
| 1253 | let sender = header.sender(); |
| 1254 | if sender == self.src_unique_name.as_ref() { |
| 1255 | return Ok(true); |
| 1256 | } |
| 1257 | |
| 1258 | // The src_unique_name must be maintained in lock-step with the applied filter |
| 1259 | if let Some(signal) = NameOwnerChanged::from_message(msg.clone()) { |
| 1260 | let args = signal.args()?; |
| 1261 | self.src_unique_name = args.new_owner().as_ref().map(|n| n.to_owned()); |
| 1262 | } |
| 1263 | |
| 1264 | Ok(false) |
| 1265 | } |
| 1266 | } |
| 1267 | |
| 1268 | assert_impl_all!(SignalStream<'_>: Send, Sync, Unpin); |
| 1269 | |
| 1270 | impl<'a> stream::Stream for SignalStream<'a> { |
| 1271 | type Item = Message; |
| 1272 | |
| 1273 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
| 1274 | OrderedStream::poll_next_before(self, cx, before:None).map(|res: PollResult| res.into_data()) |
| 1275 | } |
| 1276 | } |
| 1277 | |
| 1278 | impl<'a> OrderedStream for SignalStream<'a> { |
| 1279 | type Data = Message; |
| 1280 | type Ordering = Sequence; |
| 1281 | |
| 1282 | fn poll_next_before( |
| 1283 | self: Pin<&mut Self>, |
| 1284 | cx: &mut Context<'_>, |
| 1285 | before: Option<&Self::Ordering>, |
| 1286 | ) -> Poll<PollResult<Self::Ordering, Self::Data>> { |
| 1287 | let this = self.get_mut(); |
| 1288 | loop { |
| 1289 | match ready!(OrderedStream::poll_next_before( |
| 1290 | Pin::new(&mut this.stream), |
| 1291 | cx, |
| 1292 | before |
| 1293 | )) { |
| 1294 | PollResult::Item { data, ordering } => { |
| 1295 | if let Ok(msg) = data { |
| 1296 | if let Ok(true) = this.filter(&msg) { |
| 1297 | return Poll::Ready(PollResult::Item { |
| 1298 | data: msg, |
| 1299 | ordering, |
| 1300 | }); |
| 1301 | } |
| 1302 | } |
| 1303 | } |
| 1304 | PollResult::Terminated => return Poll::Ready(PollResult::Terminated), |
| 1305 | PollResult::NoneBefore => return Poll::Ready(PollResult::NoneBefore), |
| 1306 | } |
| 1307 | } |
| 1308 | } |
| 1309 | } |
| 1310 | |
| 1311 | impl<'a> stream::FusedStream for SignalStream<'a> { |
| 1312 | fn is_terminated(&self) -> bool { |
| 1313 | ordered_stream::FusedOrderedStream::is_terminated(&self.stream) |
| 1314 | } |
| 1315 | } |
| 1316 | |
| 1317 | #[async_trait::async_trait ] |
| 1318 | impl AsyncDrop for SignalStream<'_> { |
| 1319 | async fn async_drop(self) { |
| 1320 | let (signals: MessageStream, names: Option, _buffered: Option<(Result, …)>) = self.stream.into_inner(); |
| 1321 | signals.async_drop().await; |
| 1322 | if let Some(names: MessageStream) = names { |
| 1323 | names.async_drop().await; |
| 1324 | } |
| 1325 | } |
| 1326 | } |
| 1327 | |
| 1328 | impl<'a> From<crate::blocking::Proxy<'a>> for Proxy<'a> { |
| 1329 | fn from(proxy: crate::blocking::Proxy<'a>) -> Self { |
| 1330 | proxy.into_inner() |
| 1331 | } |
| 1332 | } |
| 1333 | |
| 1334 | /// This trait is implemented by all async proxies, which are generated with the |
| 1335 | /// [`dbus_proxy`](zbus::dbus_proxy) macro. |
| 1336 | pub trait ProxyImpl<'c> |
| 1337 | where |
| 1338 | Self: Sized, |
| 1339 | { |
| 1340 | /// Returns a customizable builder for this proxy. |
| 1341 | fn builder(conn: &Connection) -> Builder<'c, Self>; |
| 1342 | |
| 1343 | /// Consumes `self`, returning the underlying `zbus::Proxy`. |
| 1344 | fn into_inner(self) -> Proxy<'c>; |
| 1345 | |
| 1346 | /// The reference to the underlying `zbus::Proxy`. |
| 1347 | fn inner(&self) -> &Proxy<'c>; |
| 1348 | } |
| 1349 | |
| 1350 | #[cfg (test)] |
| 1351 | mod tests { |
| 1352 | use super::*; |
| 1353 | use crate::{connection, interface , object_server::SignalContext, proxy, utils::block_on}; |
| 1354 | use futures_util::StreamExt; |
| 1355 | use ntest::timeout; |
| 1356 | use test_log::test; |
| 1357 | |
| 1358 | #[test ] |
| 1359 | #[timeout(15000)] |
| 1360 | fn signal() { |
| 1361 | block_on(test_signal()).unwrap(); |
| 1362 | } |
| 1363 | |
| 1364 | async fn test_signal() -> Result<()> { |
| 1365 | // Register a well-known name with the session bus and ensure we get the appropriate |
| 1366 | // signals called for that. |
| 1367 | let conn = Connection::session().await?; |
| 1368 | let dest_conn = Connection::session().await?; |
| 1369 | let unique_name = dest_conn.unique_name().unwrap().clone(); |
| 1370 | |
| 1371 | let well_known = "org.freedesktop.zbus.async.ProxySignalStreamTest" ; |
| 1372 | let proxy: Proxy<'_> = Builder::new(&conn) |
| 1373 | .destination(well_known)? |
| 1374 | .path("/does/not/matter" )? |
| 1375 | .interface("does.not.matter" )? |
| 1376 | .build() |
| 1377 | .await?; |
| 1378 | let mut owner_changed_stream = proxy.receive_owner_changed().await?; |
| 1379 | |
| 1380 | let proxy = fdo::DBusProxy::new(&dest_conn).await?; |
| 1381 | let mut name_acquired_stream = proxy |
| 1382 | .inner() |
| 1383 | .receive_signal_with_args("NameAcquired" , &[(0, well_known)]) |
| 1384 | .await?; |
| 1385 | |
| 1386 | let prop_stream = proxy |
| 1387 | .inner() |
| 1388 | .receive_property_changed("SomeProp" ) |
| 1389 | .await |
| 1390 | .filter_map(|changed| async move { |
| 1391 | let v: Option<u32> = changed.get().await.ok(); |
| 1392 | dbg!(v) |
| 1393 | }); |
| 1394 | drop(proxy); |
| 1395 | drop(prop_stream); |
| 1396 | |
| 1397 | dest_conn.request_name(well_known).await?; |
| 1398 | |
| 1399 | let (new_owner, acquired_signal) = |
| 1400 | futures_util::join!(owner_changed_stream.next(), name_acquired_stream.next(),); |
| 1401 | |
| 1402 | assert_eq!(&new_owner.unwrap().unwrap(), &*unique_name); |
| 1403 | |
| 1404 | let acquired_signal = acquired_signal.unwrap(); |
| 1405 | assert_eq!( |
| 1406 | acquired_signal.body().deserialize::<&str>().unwrap(), |
| 1407 | well_known |
| 1408 | ); |
| 1409 | |
| 1410 | let proxy = Proxy::new(&conn, &unique_name, "/does/not/matter" , "does.not.matter" ).await?; |
| 1411 | let mut unique_name_changed_stream = proxy.receive_owner_changed().await?; |
| 1412 | |
| 1413 | drop(dest_conn); |
| 1414 | name_acquired_stream.async_drop().await; |
| 1415 | |
| 1416 | // There shouldn't be an owner anymore. |
| 1417 | let new_owner = owner_changed_stream.next().await; |
| 1418 | assert!(new_owner.unwrap().is_none()); |
| 1419 | |
| 1420 | let new_unique_owner = unique_name_changed_stream.next().await; |
| 1421 | assert!(new_unique_owner.unwrap().is_none()); |
| 1422 | |
| 1423 | Ok(()) |
| 1424 | } |
| 1425 | |
| 1426 | #[test ] |
| 1427 | #[timeout(15000)] |
| 1428 | fn signal_stream_deadlock() { |
| 1429 | block_on(test_signal_stream_deadlock()).unwrap(); |
| 1430 | } |
| 1431 | |
| 1432 | /// Tests deadlocking in signal reception when the message queue is full. |
| 1433 | /// |
| 1434 | /// Creates a connection with a small message queue, and a service that |
| 1435 | /// emits signals at a high rate. First a listener is created that listens |
| 1436 | /// for that signal which should fill the small queue. Then another signal |
| 1437 | /// signal listener is created against another signal. Previously, this second |
| 1438 | /// call to add the match rule never resolved and resulted in a deadlock. |
| 1439 | async fn test_signal_stream_deadlock() -> Result<()> { |
| 1440 | #[proxy( |
| 1441 | gen_blocking = false, |
| 1442 | default_path = "/org/zbus/Test" , |
| 1443 | default_service = "org.zbus.Test.MR501" , |
| 1444 | interface = "org.zbus.Test" |
| 1445 | )] |
| 1446 | trait Test { |
| 1447 | #[zbus(signal)] |
| 1448 | fn my_signal(&self, msg: &str) -> Result<()>; |
| 1449 | } |
| 1450 | |
| 1451 | struct TestIface; |
| 1452 | |
| 1453 | #[interface(name = "org.zbus.Test" )] |
| 1454 | impl TestIface { |
| 1455 | #[zbus(signal)] |
| 1456 | async fn my_signal(context: &SignalContext<'_>, msg: &'static str) -> Result<()>; |
| 1457 | } |
| 1458 | |
| 1459 | let test_iface = TestIface; |
| 1460 | let server_conn = connection::Builder::session()? |
| 1461 | .name("org.zbus.Test.MR501" )? |
| 1462 | .serve_at("/org/zbus/Test" , test_iface)? |
| 1463 | .build() |
| 1464 | .await?; |
| 1465 | |
| 1466 | let client_conn = connection::Builder::session()? |
| 1467 | .max_queued(1) |
| 1468 | .build() |
| 1469 | .await?; |
| 1470 | |
| 1471 | let test_proxy = TestProxy::new(&client_conn).await?; |
| 1472 | let test_prop_proxy = PropertiesProxy::builder(&client_conn) |
| 1473 | .destination("org.zbus.Test.MR501" )? |
| 1474 | .path("/org/zbus/Test" )? |
| 1475 | .build() |
| 1476 | .await?; |
| 1477 | |
| 1478 | let (tx, mut rx) = tokio::sync::mpsc::channel(1); |
| 1479 | |
| 1480 | let handle = { |
| 1481 | let tx = tx.clone(); |
| 1482 | let conn = server_conn.clone(); |
| 1483 | let server_fut = async move { |
| 1484 | use std::time::Duration; |
| 1485 | |
| 1486 | #[cfg (not(feature = "tokio" ))] |
| 1487 | use async_io::Timer; |
| 1488 | |
| 1489 | #[cfg (feature = "tokio" )] |
| 1490 | use tokio::time::sleep; |
| 1491 | |
| 1492 | let iface_ref = conn |
| 1493 | .object_server() |
| 1494 | .interface::<_, TestIface>("/org/zbus/Test" ) |
| 1495 | .await |
| 1496 | .unwrap(); |
| 1497 | |
| 1498 | let context = iface_ref.signal_context(); |
| 1499 | while !tx.is_closed() { |
| 1500 | for _ in 0..10 { |
| 1501 | TestIface::my_signal(context, "This is a test" ) |
| 1502 | .await |
| 1503 | .unwrap(); |
| 1504 | } |
| 1505 | |
| 1506 | #[cfg (not(feature = "tokio" ))] |
| 1507 | Timer::after(Duration::from_millis(5)).await; |
| 1508 | |
| 1509 | #[cfg (feature = "tokio" )] |
| 1510 | sleep(Duration::from_millis(5)).await; |
| 1511 | } |
| 1512 | }; |
| 1513 | server_conn.executor().spawn(server_fut, "server_task" ) |
| 1514 | }; |
| 1515 | |
| 1516 | let signal_fut = async { |
| 1517 | let mut signal_stream = test_proxy.receive_my_signal().await.unwrap(); |
| 1518 | |
| 1519 | tx.send(()).await.unwrap(); |
| 1520 | |
| 1521 | while let Some(_signal) = signal_stream.next().await {} |
| 1522 | }; |
| 1523 | |
| 1524 | let prop_fut = async move { |
| 1525 | rx.recv().await.unwrap(); |
| 1526 | let _prop_stream = test_prop_proxy.receive_properties_changed().await.unwrap(); |
| 1527 | }; |
| 1528 | |
| 1529 | futures_util::pin_mut!(signal_fut); |
| 1530 | futures_util::pin_mut!(prop_fut); |
| 1531 | |
| 1532 | futures_util::future::select(signal_fut, prop_fut).await; |
| 1533 | |
| 1534 | handle.await; |
| 1535 | |
| 1536 | Ok(()) |
| 1537 | } |
| 1538 | } |
| 1539 | |