1 | //! The client-side proxy API. |
2 | |
3 | use enumflags2::{bitflags , BitFlags}; |
4 | use event_listener::{Event, EventListener}; |
5 | use futures_core::{ready, stream}; |
6 | use futures_util::{future::Either, stream::Map}; |
7 | use ordered_stream::{join as join_streams, FromFuture, Join, OrderedStream, PollResult}; |
8 | use static_assertions::assert_impl_all; |
9 | use std::{ |
10 | collections::{HashMap, HashSet}, |
11 | fmt, |
12 | future::Future, |
13 | ops::Deref, |
14 | pin::Pin, |
15 | sync::{Arc, OnceLock, RwLock, RwLockReadGuard}, |
16 | task::{Context, Poll}, |
17 | }; |
18 | use tracing::{debug, info_span, instrument, trace, Instrument}; |
19 | |
20 | use zbus_names::{BusName, InterfaceName, MemberName, UniqueName}; |
21 | use zvariant::{ObjectPath, OwnedValue, Str, Value}; |
22 | |
23 | use crate::{ |
24 | fdo::{self, IntrospectableProxy, NameOwnerChanged, PropertiesChangedStream, PropertiesProxy}, |
25 | message::{Flags, Message, Sequence, Type}, |
26 | AsyncDrop, Connection, Error, Executor, MatchRule, MessageStream, OwnedMatchRule, Result, Task, |
27 | }; |
28 | |
29 | mod builder; |
30 | pub use builder::{Builder, CacheProperties, ProxyDefault}; |
31 | |
32 | /// A client-side interface proxy. |
33 | /// |
34 | /// A `Proxy` is a helper to interact with an interface on a remote object. |
35 | /// |
36 | /// # Example |
37 | /// |
38 | /// ``` |
39 | /// use std::result::Result; |
40 | /// use std::error::Error; |
41 | /// use zbus::{Connection, Proxy}; |
42 | /// |
43 | /// #[tokio::main] |
44 | /// async fn main() -> Result<(), Box<dyn Error>> { |
45 | /// let connection = Connection::session().await?; |
46 | /// let p = Proxy::new( |
47 | /// &connection, |
48 | /// "org.freedesktop.DBus" , |
49 | /// "/org/freedesktop/DBus" , |
50 | /// "org.freedesktop.DBus" , |
51 | /// ).await?; |
52 | /// // owned return value |
53 | /// let _id: String = p.call("GetId" , &()).await?; |
54 | /// // borrowed return value |
55 | /// let body = p.call_method("GetId" , &()).await?.body(); |
56 | /// let _id: &str = body.deserialize()?; |
57 | /// |
58 | /// Ok(()) |
59 | /// } |
60 | /// ``` |
61 | /// |
62 | /// # Note |
63 | /// |
64 | /// It is recommended to use the [`proxy`] macro, which provides a more convenient and |
65 | /// type-safe *façade* `Proxy` derived from a Rust trait. |
66 | /// |
67 | /// [`futures` crate]: https://crates.io/crates/futures |
68 | /// [`proxy`]: attr.proxy.html |
69 | #[derive (Clone, Debug)] |
70 | pub struct Proxy<'a> { |
71 | pub(crate) inner: Arc<ProxyInner<'a>>, |
72 | } |
73 | |
74 | assert_impl_all!(Proxy<'_>: Send, Sync, Unpin); |
75 | |
76 | /// This is required to avoid having the Drop impl extend the lifetime 'a, which breaks zbus_xmlgen |
77 | /// (and possibly other crates). |
78 | pub(crate) struct ProxyInnerStatic { |
79 | pub(crate) conn: Connection, |
80 | dest_owner_change_match_rule: OnceLock<OwnedMatchRule>, |
81 | } |
82 | |
83 | impl fmt::Debug for ProxyInnerStatic { |
84 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
85 | f&mut DebugStruct<'_, '_>.debug_struct("ProxyInnerStatic" ) |
86 | .field( |
87 | name:"dest_owner_change_match_rule" , |
88 | &self.dest_owner_change_match_rule, |
89 | ) |
90 | .finish_non_exhaustive() |
91 | } |
92 | } |
93 | |
94 | #[derive (Debug)] |
95 | pub(crate) struct ProxyInner<'a> { |
96 | inner_without_borrows: ProxyInnerStatic, |
97 | pub(crate) destination: BusName<'a>, |
98 | pub(crate) path: ObjectPath<'a>, |
99 | pub(crate) interface: InterfaceName<'a>, |
100 | |
101 | /// Cache of property values. |
102 | property_cache: Option<OnceLock<(Arc<PropertiesCache>, Task<()>)>>, |
103 | /// Set of properties which do not get cached, by name. |
104 | /// This overrides proxy-level caching behavior. |
105 | uncached_properties: HashSet<Str<'a>>, |
106 | } |
107 | |
108 | impl Drop for ProxyInnerStatic { |
109 | fn drop(&mut self) { |
110 | if let Some(rule: OwnedMatchRule) = self.dest_owner_change_match_rule.take() { |
111 | self.conn.queue_remove_match(rule); |
112 | } |
113 | } |
114 | } |
115 | |
116 | /// A property changed event. |
117 | /// |
118 | /// The property changed event generated by [`PropertyStream`]. |
119 | pub struct PropertyChanged<'a, T> { |
120 | name: &'a str, |
121 | properties: Arc<PropertiesCache>, |
122 | proxy: Proxy<'a>, |
123 | phantom: std::marker::PhantomData<T>, |
124 | } |
125 | |
126 | impl<'a, T> PropertyChanged<'a, T> { |
127 | // The name of the property that changed. |
128 | pub fn name(&self) -> &str { |
129 | self.name |
130 | } |
131 | |
132 | // Get the raw value of the property that changed. |
133 | // |
134 | // If the notification signal contained the new value, it has been cached already and this call |
135 | // will return that value. Otherwise (i-e invalidated property), a D-Bus call is made to fetch |
136 | // and cache the new value. |
137 | pub async fn get_raw<'p>(&'p self) -> Result<impl Deref<Target = Value<'static>> + 'p> { |
138 | struct Wrapper<'w> { |
139 | name: &'w str, |
140 | values: RwLockReadGuard<'w, HashMap<String, PropertyValue>>, |
141 | } |
142 | |
143 | impl<'w> Deref for Wrapper<'w> { |
144 | type Target = Value<'static>; |
145 | |
146 | fn deref(&self) -> &Self::Target { |
147 | self.values |
148 | .get(self.name) |
149 | .expect("PropertyStream with no corresponding property" ) |
150 | .value |
151 | .as_ref() |
152 | .expect("PropertyStream with no corresponding property" ) |
153 | } |
154 | } |
155 | |
156 | { |
157 | let values = self.properties.values.read().expect("lock poisoned" ); |
158 | if values |
159 | .get(self.name) |
160 | .expect("PropertyStream with no corresponding property" ) |
161 | .value |
162 | .is_some() |
163 | { |
164 | return Ok(Wrapper { |
165 | name: self.name, |
166 | values, |
167 | }); |
168 | } |
169 | } |
170 | |
171 | // The property was invalidated, so we need to fetch the new value. |
172 | let properties_proxy = self.proxy.properties_proxy(); |
173 | let value = properties_proxy |
174 | .get(self.proxy.inner.interface.clone(), self.name) |
175 | .await |
176 | .map_err(crate::Error::from)?; |
177 | |
178 | // Save the new value |
179 | { |
180 | let mut values = self.properties.values.write().expect("lock poisoned" ); |
181 | |
182 | values |
183 | .get_mut(self.name) |
184 | .expect("PropertyStream with no corresponding property" ) |
185 | .value = Some(value); |
186 | } |
187 | |
188 | Ok(Wrapper { |
189 | name: self.name, |
190 | values: self.properties.values.read().expect("lock poisoned" ), |
191 | }) |
192 | } |
193 | } |
194 | |
195 | impl<T> PropertyChanged<'_, T> |
196 | where |
197 | T: TryFrom<zvariant::OwnedValue>, |
198 | T::Error: Into<crate::Error>, |
199 | { |
200 | // Get the value of the property that changed. |
201 | // |
202 | // If the notification signal contained the new value, it has been cached already and this call |
203 | // will return that value. Otherwise (i-e invalidated property), a D-Bus call is made to fetch |
204 | // and cache the new value. |
205 | pub async fn get(&self) -> Result<T> { |
206 | self.get_raw() |
207 | .await |
208 | .and_then(|v: impl Deref>| T::try_from(OwnedValue::try_from(&*v)?).map_err(op:Into::into)) |
209 | } |
210 | } |
211 | |
212 | /// A [`stream::Stream`] implementation that yields property change notifications. |
213 | /// |
214 | /// Use [`Proxy::receive_property_changed`] to create an instance of this type. |
215 | #[derive (Debug)] |
216 | pub struct PropertyStream<'a, T> { |
217 | name: &'a str, |
218 | proxy: Proxy<'a>, |
219 | changed_listener: EventListener, |
220 | phantom: std::marker::PhantomData<T>, |
221 | } |
222 | |
223 | impl<'a, T> stream::Stream for PropertyStream<'a, T> |
224 | where |
225 | T: Unpin, |
226 | { |
227 | type Item = PropertyChanged<'a, T>; |
228 | |
229 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
230 | let m = self.get_mut(); |
231 | let properties = match m.proxy.get_property_cache() { |
232 | Some(properties) => properties.clone(), |
233 | // With no cache, we will get no updates; return immediately |
234 | None => return Poll::Ready(None), |
235 | }; |
236 | ready!(Pin::new(&mut m.changed_listener).poll(cx)); |
237 | |
238 | m.changed_listener = properties |
239 | .values |
240 | .read() |
241 | .expect("lock poisoned" ) |
242 | .get(m.name) |
243 | .expect("PropertyStream with no corresponding property" ) |
244 | .event |
245 | .listen(); |
246 | |
247 | Poll::Ready(Some(PropertyChanged { |
248 | name: m.name, |
249 | properties, |
250 | proxy: m.proxy.clone(), |
251 | phantom: std::marker::PhantomData, |
252 | })) |
253 | } |
254 | } |
255 | |
256 | #[derive (Debug)] |
257 | pub(crate) struct PropertiesCache { |
258 | values: RwLock<HashMap<String, PropertyValue>>, |
259 | caching_result: RwLock<CachingResult>, |
260 | } |
261 | |
262 | #[derive (Debug)] |
263 | enum CachingResult { |
264 | Caching { ready: Event }, |
265 | Cached { result: Result<()> }, |
266 | } |
267 | |
268 | impl PropertiesCache { |
269 | #[instrument (skip_all)] |
270 | fn new( |
271 | proxy: PropertiesProxy<'static>, |
272 | interface: InterfaceName<'static>, |
273 | executor: &Executor<'_>, |
274 | uncached_properties: HashSet<zvariant::Str<'static>>, |
275 | ) -> (Arc<Self>, Task<()>) { |
276 | let cache = Arc::new(PropertiesCache { |
277 | values: Default::default(), |
278 | caching_result: RwLock::new(CachingResult::Caching { |
279 | ready: Event::new(), |
280 | }), |
281 | }); |
282 | |
283 | let cache_clone = cache.clone(); |
284 | let task_name = format!(" {interface} proxy caching" ); |
285 | let proxy_caching = async move { |
286 | let result = cache_clone |
287 | .init(proxy, interface, uncached_properties) |
288 | .await; |
289 | let (prop_changes, interface, uncached_properties) = { |
290 | let mut caching_result = cache_clone.caching_result.write().expect("lock poisoned" ); |
291 | let ready = match &*caching_result { |
292 | CachingResult::Caching { ready } => ready, |
293 | // SAFETY: This is the only part of the code that changes this state and it's |
294 | // only run once. |
295 | _ => unreachable!(), |
296 | }; |
297 | match result { |
298 | Ok((prop_changes, interface, uncached_properties)) => { |
299 | ready.notify(usize::MAX); |
300 | *caching_result = CachingResult::Cached { result: Ok(()) }; |
301 | |
302 | (prop_changes, interface, uncached_properties) |
303 | } |
304 | Err(e) => { |
305 | ready.notify(usize::MAX); |
306 | *caching_result = CachingResult::Cached { result: Err(e) }; |
307 | |
308 | return; |
309 | } |
310 | } |
311 | }; |
312 | |
313 | if let Err(e) = cache_clone |
314 | .keep_updated(prop_changes, interface, uncached_properties) |
315 | .await |
316 | { |
317 | debug!("Error keeping properties cache updated: {e}" ); |
318 | } |
319 | } |
320 | .instrument(info_span!("{}" , task_name)); |
321 | let task = executor.spawn(proxy_caching, &task_name); |
322 | |
323 | (cache, task) |
324 | } |
325 | |
326 | // new() runs this in a task it spawns for initialization of properties cache. |
327 | async fn init( |
328 | &self, |
329 | proxy: PropertiesProxy<'static>, |
330 | interface: InterfaceName<'static>, |
331 | uncached_properties: HashSet<zvariant::Str<'static>>, |
332 | ) -> Result<( |
333 | PropertiesChangedStream<'static>, |
334 | InterfaceName<'static>, |
335 | HashSet<zvariant::Str<'static>>, |
336 | )> { |
337 | use ordered_stream::OrderedStreamExt; |
338 | |
339 | let prop_changes = proxy.receive_properties_changed().await?.map(Either::Left); |
340 | |
341 | let get_all = proxy |
342 | .inner() |
343 | .connection() |
344 | .call_method_raw( |
345 | Some(proxy.inner().destination()), |
346 | proxy.inner().path(), |
347 | Some(proxy.inner().interface()), |
348 | "GetAll" , |
349 | BitFlags::empty(), |
350 | &interface, |
351 | ) |
352 | .await |
353 | .map(|r| FromFuture::from(r.expect("no reply" )).map(Either::Right))?; |
354 | |
355 | let mut join = join_streams(prop_changes, get_all); |
356 | |
357 | loop { |
358 | match join.next().await { |
359 | Some(Either::Left(_update)) => { |
360 | // discard updates prior to the initial population |
361 | } |
362 | Some(Either::Right(populate)) => { |
363 | populate?.body().deserialize().map(|values| { |
364 | self.update_cache(&uncached_properties, &values, Vec::new(), &interface); |
365 | })?; |
366 | break; |
367 | } |
368 | None => break, |
369 | } |
370 | } |
371 | if let Some((Either::Left(update), _)) = Pin::new(&mut join).take_buffered() { |
372 | // if an update was buffered, then it happened after the get_all returned and needs to |
373 | // be applied before we discard the join |
374 | if let Ok(args) = update.args() { |
375 | if args.interface_name == interface { |
376 | self.update_cache( |
377 | &uncached_properties, |
378 | &args.changed_properties, |
379 | args.invalidated_properties, |
380 | &interface, |
381 | ); |
382 | } |
383 | } |
384 | } |
385 | // This is needed to avoid a "implementation of `OrderedStream` is not general enough" |
386 | // error that occurs if you apply the map and join to Pin::new(&mut prop_changes) instead |
387 | // of directly to the stream. |
388 | let prop_changes = join.into_inner().0.into_inner(); |
389 | |
390 | Ok((prop_changes, interface, uncached_properties)) |
391 | } |
392 | |
393 | // new() runs this in a task it spawns for keeping the cache in sync. |
394 | #[instrument (skip_all)] |
395 | async fn keep_updated( |
396 | &self, |
397 | mut prop_changes: PropertiesChangedStream<'static>, |
398 | interface: InterfaceName<'static>, |
399 | uncached_properties: HashSet<zvariant::Str<'static>>, |
400 | ) -> Result<()> { |
401 | use futures_util::StreamExt; |
402 | |
403 | trace!("Listening for property changes on {interface}..." ); |
404 | while let Some(update) = prop_changes.next().await { |
405 | if let Ok(args) = update.args() { |
406 | if args.interface_name == interface { |
407 | self.update_cache( |
408 | &uncached_properties, |
409 | &args.changed_properties, |
410 | args.invalidated_properties, |
411 | &interface, |
412 | ); |
413 | } |
414 | } |
415 | } |
416 | |
417 | Ok(()) |
418 | } |
419 | |
420 | fn update_cache( |
421 | &self, |
422 | uncached_properties: &HashSet<Str<'_>>, |
423 | changed: &HashMap<&str, Value<'_>>, |
424 | invalidated: Vec<&str>, |
425 | interface: &InterfaceName<'_>, |
426 | ) { |
427 | let mut values = self.values.write().expect("lock poisoned" ); |
428 | |
429 | for inval in invalidated { |
430 | if uncached_properties.contains(&Str::from(inval)) { |
431 | debug!( |
432 | "Ignoring invalidation of uncached property ` {}. {}`" , |
433 | interface, inval |
434 | ); |
435 | continue; |
436 | } |
437 | trace!("Property ` {interface}. {inval}` invalidated" ); |
438 | |
439 | if let Some(entry) = values.get_mut(inval) { |
440 | entry.value = None; |
441 | entry.event.notify(usize::MAX); |
442 | } |
443 | } |
444 | |
445 | for (property_name, value) in changed { |
446 | if uncached_properties.contains(&Str::from(*property_name)) { |
447 | debug!( |
448 | "Ignoring update of uncached property ` {}. {}`" , |
449 | interface, property_name |
450 | ); |
451 | continue; |
452 | } |
453 | trace!("Property ` {interface}. {property_name}` updated" ); |
454 | |
455 | let entry = values.entry(property_name.to_string()).or_default(); |
456 | |
457 | let value = match OwnedValue::try_from(value) { |
458 | Ok(value) => value, |
459 | Err(e) => { |
460 | debug!( |
461 | "Failed to convert property ` {interface}. {property_name}` to OwnedValue: {e}" |
462 | ); |
463 | continue; |
464 | } |
465 | }; |
466 | entry.value = Some(value); |
467 | entry.event.notify(usize::MAX); |
468 | } |
469 | } |
470 | |
471 | /// Wait for the cache to be populated and return any error encountered during population |
472 | pub(crate) async fn ready(&self) -> Result<()> { |
473 | let listener = match &*self.caching_result.read().expect("lock poisoned" ) { |
474 | CachingResult::Caching { ready } => ready.listen(), |
475 | CachingResult::Cached { result } => return result.clone(), |
476 | }; |
477 | listener.await; |
478 | |
479 | // It must be ready now. |
480 | match &*self.caching_result.read().expect("lock poisoned" ) { |
481 | // SAFETY: We were just notified that state has changed to `Cached` and we never go back |
482 | // to `Caching` once in `Cached`. |
483 | CachingResult::Caching { .. } => unreachable!(), |
484 | CachingResult::Cached { result } => result.clone(), |
485 | } |
486 | } |
487 | } |
488 | |
489 | impl<'a> ProxyInner<'a> { |
490 | pub(crate) fn new( |
491 | conn: Connection, |
492 | destination: BusName<'a>, |
493 | path: ObjectPath<'a>, |
494 | interface: InterfaceName<'a>, |
495 | cache: CacheProperties, |
496 | uncached_properties: HashSet<Str<'a>>, |
497 | ) -> Self { |
498 | let property_cache = match cache { |
499 | CacheProperties::Yes | CacheProperties::Lazily => Some(OnceLock::new()), |
500 | CacheProperties::No => None, |
501 | }; |
502 | Self { |
503 | inner_without_borrows: ProxyInnerStatic { |
504 | conn, |
505 | dest_owner_change_match_rule: OnceLock::new(), |
506 | }, |
507 | destination, |
508 | path, |
509 | interface, |
510 | property_cache, |
511 | uncached_properties, |
512 | } |
513 | } |
514 | |
515 | /// Subscribe to the "NameOwnerChanged" signal on the bus for our destination. |
516 | /// |
517 | /// If the destination is a unique name, we will not subscribe to the signal. |
518 | pub(crate) async fn subscribe_dest_owner_change(&self) -> Result<()> { |
519 | if !self.inner_without_borrows.conn.is_bus() { |
520 | // Names don't mean much outside the bus context. |
521 | return Ok(()); |
522 | } |
523 | |
524 | let well_known_name = match &self.destination { |
525 | BusName::WellKnown(well_known_name) => well_known_name, |
526 | BusName::Unique(_) => return Ok(()), |
527 | }; |
528 | |
529 | if self |
530 | .inner_without_borrows |
531 | .dest_owner_change_match_rule |
532 | .get() |
533 | .is_some() |
534 | { |
535 | // Already watching over the bus for any name updates so nothing to do here. |
536 | return Ok(()); |
537 | } |
538 | |
539 | let conn = &self.inner_without_borrows.conn; |
540 | let signal_rule: OwnedMatchRule = MatchRule::builder() |
541 | .msg_type(Type::Signal) |
542 | .sender("org.freedesktop.DBus" )? |
543 | .path("/org/freedesktop/DBus" )? |
544 | .interface("org.freedesktop.DBus" )? |
545 | .member("NameOwnerChanged" )? |
546 | .add_arg(well_known_name.as_str())? |
547 | .build() |
548 | .to_owned() |
549 | .into(); |
550 | |
551 | conn.add_match( |
552 | signal_rule.clone(), |
553 | Some(MAX_NAME_OWNER_CHANGED_SIGNALS_QUEUED), |
554 | ) |
555 | .await?; |
556 | |
557 | if self |
558 | .inner_without_borrows |
559 | .dest_owner_change_match_rule |
560 | .set(signal_rule.clone()) |
561 | .is_err() |
562 | { |
563 | // we raced another destination_unique_name call and added it twice |
564 | conn.remove_match(signal_rule).await?; |
565 | } |
566 | |
567 | Ok(()) |
568 | } |
569 | } |
570 | |
571 | const MAX_NAME_OWNER_CHANGED_SIGNALS_QUEUED: usize = 8; |
572 | |
573 | impl<'a> Proxy<'a> { |
574 | /// Create a new `Proxy` for the given destination/path/interface. |
575 | pub async fn new<D, P, I>( |
576 | conn: &Connection, |
577 | destination: D, |
578 | path: P, |
579 | interface: I, |
580 | ) -> Result<Proxy<'a>> |
581 | where |
582 | D: TryInto<BusName<'a>>, |
583 | P: TryInto<ObjectPath<'a>>, |
584 | I: TryInto<InterfaceName<'a>>, |
585 | D::Error: Into<Error>, |
586 | P::Error: Into<Error>, |
587 | I::Error: Into<Error>, |
588 | { |
589 | Builder::new(conn) |
590 | .destination(destination)? |
591 | .path(path)? |
592 | .interface(interface)? |
593 | .build() |
594 | .await |
595 | } |
596 | |
597 | /// Create a new `Proxy` for the given destination/path/interface, taking ownership of all |
598 | /// passed arguments. |
599 | pub async fn new_owned<D, P, I>( |
600 | conn: Connection, |
601 | destination: D, |
602 | path: P, |
603 | interface: I, |
604 | ) -> Result<Proxy<'a>> |
605 | where |
606 | D: TryInto<BusName<'static>>, |
607 | P: TryInto<ObjectPath<'static>>, |
608 | I: TryInto<InterfaceName<'static>>, |
609 | D::Error: Into<Error>, |
610 | P::Error: Into<Error>, |
611 | I::Error: Into<Error>, |
612 | { |
613 | Builder::new(&conn) |
614 | .destination(destination)? |
615 | .path(path)? |
616 | .interface(interface)? |
617 | .build() |
618 | .await |
619 | } |
620 | |
621 | /// Get a reference to the associated connection. |
622 | pub fn connection(&self) -> &Connection { |
623 | &self.inner.inner_without_borrows.conn |
624 | } |
625 | |
626 | /// Get a reference to the destination service name. |
627 | pub fn destination(&self) -> &BusName<'_> { |
628 | &self.inner.destination |
629 | } |
630 | |
631 | /// Get a reference to the object path. |
632 | pub fn path(&self) -> &ObjectPath<'_> { |
633 | &self.inner.path |
634 | } |
635 | |
636 | /// Get a reference to the interface. |
637 | pub fn interface(&self) -> &InterfaceName<'_> { |
638 | &self.inner.interface |
639 | } |
640 | |
641 | /// Introspect the associated object, and return the XML description. |
642 | /// |
643 | /// See the [xml](xml/index.html) module for parsing the |
644 | /// result. |
645 | pub async fn introspect(&self) -> fdo::Result<String> { |
646 | let proxy = IntrospectableProxy::builder(&self.inner.inner_without_borrows.conn) |
647 | .destination(&self.inner.destination)? |
648 | .path(&self.inner.path)? |
649 | .build() |
650 | .await?; |
651 | |
652 | proxy.introspect().await |
653 | } |
654 | |
655 | fn properties_proxy(&self) -> PropertiesProxy<'_> { |
656 | PropertiesProxy::builder(&self.inner.inner_without_borrows.conn) |
657 | // Safe because already checked earlier |
658 | .destination(self.inner.destination.as_ref()) |
659 | .unwrap() |
660 | // Safe because already checked earlier |
661 | .path(self.inner.path.as_ref()) |
662 | .unwrap() |
663 | // does not have properties |
664 | .cache_properties(CacheProperties::No) |
665 | .build_internal() |
666 | .unwrap() |
667 | .into() |
668 | } |
669 | |
670 | fn owned_properties_proxy(&self) -> PropertiesProxy<'static> { |
671 | PropertiesProxy::builder(&self.inner.inner_without_borrows.conn) |
672 | // Safe because already checked earlier |
673 | .destination(self.inner.destination.to_owned()) |
674 | .unwrap() |
675 | // Safe because already checked earlier |
676 | .path(self.inner.path.to_owned()) |
677 | .unwrap() |
678 | // does not have properties |
679 | .cache_properties(CacheProperties::No) |
680 | .build_internal() |
681 | .unwrap() |
682 | .into() |
683 | } |
684 | |
685 | /// Get the cache, starting it in the background if needed. |
686 | /// |
687 | /// Use PropertiesCache::ready() to wait for the cache to be populated and to get any errors |
688 | /// encountered in the population. |
689 | pub(crate) fn get_property_cache(&self) -> Option<&Arc<PropertiesCache>> { |
690 | let cache = match &self.inner.property_cache { |
691 | Some(cache) => cache, |
692 | None => return None, |
693 | }; |
694 | let (cache, _) = &cache.get_or_init(|| { |
695 | let proxy = self.owned_properties_proxy(); |
696 | let interface = self.interface().to_owned(); |
697 | let uncached_properties: HashSet<zvariant::Str<'static>> = self |
698 | .inner |
699 | .uncached_properties |
700 | .iter() |
701 | .map(|s| s.to_owned()) |
702 | .collect(); |
703 | let executor = self.connection().executor(); |
704 | |
705 | PropertiesCache::new(proxy, interface, executor, uncached_properties) |
706 | }); |
707 | |
708 | Some(cache) |
709 | } |
710 | |
711 | /// Get the cached value of the property `property_name`. |
712 | /// |
713 | /// This returns `None` if the property is not in the cache. This could be because the cache |
714 | /// was invalidated by an update, because caching was disabled for this property or proxy, or |
715 | /// because the cache has not yet been populated. Use `get_property` to fetch the value from |
716 | /// the peer. |
717 | pub fn cached_property<T>(&self, property_name: &str) -> Result<Option<T>> |
718 | where |
719 | T: TryFrom<OwnedValue>, |
720 | T::Error: Into<Error>, |
721 | { |
722 | self.cached_property_raw(property_name) |
723 | .as_deref() |
724 | .map(|v| T::try_from(OwnedValue::try_from(v)?).map_err(Into::into)) |
725 | .transpose() |
726 | } |
727 | |
728 | /// Get the cached value of the property `property_name`. |
729 | /// |
730 | /// Same as `cached_property`, but gives you access to the raw value stored in the cache. This |
731 | /// is useful if you want to avoid allocations and cloning. |
732 | pub fn cached_property_raw<'p>( |
733 | &'p self, |
734 | property_name: &'p str, |
735 | ) -> Option<impl Deref<Target = Value<'static>> + 'p> { |
736 | if let Some(values) = self |
737 | .inner |
738 | .property_cache |
739 | .as_ref() |
740 | .and_then(OnceLock::get) |
741 | .map(|c| c.0.values.read().expect("lock poisoned" )) |
742 | { |
743 | // ensure that the property is in the cache. |
744 | values |
745 | .get(property_name) |
746 | // if the property value has not yet been cached, this will return None. |
747 | .and_then(|e| e.value.as_ref())?; |
748 | |
749 | struct Wrapper<'a> { |
750 | values: RwLockReadGuard<'a, HashMap<String, PropertyValue>>, |
751 | property_name: &'a str, |
752 | } |
753 | |
754 | impl Deref for Wrapper<'_> { |
755 | type Target = Value<'static>; |
756 | |
757 | fn deref(&self) -> &Self::Target { |
758 | self.values |
759 | .get(self.property_name) |
760 | .and_then(|e| e.value.as_ref()) |
761 | .map(|v| v.deref()) |
762 | .expect("inexistent property" ) |
763 | } |
764 | } |
765 | |
766 | Some(Wrapper { |
767 | values, |
768 | property_name, |
769 | }) |
770 | } else { |
771 | None |
772 | } |
773 | } |
774 | |
775 | async fn get_proxy_property(&self, property_name: &str) -> Result<OwnedValue> { |
776 | Ok(self |
777 | .properties_proxy() |
778 | .get(self.inner.interface.as_ref(), property_name) |
779 | .await?) |
780 | } |
781 | |
782 | /// Get the property `property_name`. |
783 | /// |
784 | /// Get the property value from the cache (if caching is enabled) or call the |
785 | /// `Get` method of the `org.freedesktop.DBus.Properties` interface. |
786 | pub async fn get_property<T>(&self, property_name: &str) -> Result<T> |
787 | where |
788 | T: TryFrom<OwnedValue>, |
789 | T::Error: Into<Error>, |
790 | { |
791 | if let Some(cache) = self.get_property_cache() { |
792 | cache.ready().await?; |
793 | } |
794 | if let Some(value) = self.cached_property(property_name)? { |
795 | return Ok(value); |
796 | } |
797 | |
798 | let value = self.get_proxy_property(property_name).await?; |
799 | value.try_into().map_err(Into::into) |
800 | } |
801 | |
802 | /// Set the property `property_name`. |
803 | /// |
804 | /// Effectively, call the `Set` method of the `org.freedesktop.DBus.Properties` interface. |
805 | pub async fn set_property<'t, T>(&self, property_name: &str, value: T) -> fdo::Result<()> |
806 | where |
807 | T: 't + Into<Value<'t>>, |
808 | { |
809 | self.properties_proxy() |
810 | .set(self.inner.interface.as_ref(), property_name, &value.into()) |
811 | .await |
812 | } |
813 | |
814 | /// Call a method and return the reply. |
815 | /// |
816 | /// Typically, you would want to use [`call`] method instead. Use this method if you need to |
817 | /// deserialize the reply message manually (this way, you can avoid the memory |
818 | /// allocation/copying, by deserializing the reply to an unowned type). |
819 | /// |
820 | /// [`call`]: struct.Proxy.html#method.call |
821 | pub async fn call_method<'m, M, B>(&self, method_name: M, body: &B) -> Result<Message> |
822 | where |
823 | M: TryInto<MemberName<'m>>, |
824 | M::Error: Into<Error>, |
825 | B: serde::ser::Serialize + zvariant::DynamicType, |
826 | { |
827 | self.inner |
828 | .inner_without_borrows |
829 | .conn |
830 | .call_method( |
831 | Some(&self.inner.destination), |
832 | self.inner.path.as_str(), |
833 | Some(&self.inner.interface), |
834 | method_name, |
835 | body, |
836 | ) |
837 | .await |
838 | } |
839 | |
840 | /// Call a method and return the reply body. |
841 | /// |
842 | /// Use [`call_method`] instead if you need to deserialize the reply manually/separately. |
843 | /// |
844 | /// [`call_method`]: struct.Proxy.html#method.call_method |
845 | pub async fn call<'m, M, B, R>(&self, method_name: M, body: &B) -> Result<R> |
846 | where |
847 | M: TryInto<MemberName<'m>>, |
848 | M::Error: Into<Error>, |
849 | B: serde::ser::Serialize + zvariant::DynamicType, |
850 | R: for<'d> zvariant::DynamicDeserialize<'d>, |
851 | { |
852 | let reply = self.call_method(method_name, body).await?; |
853 | |
854 | reply.body().deserialize() |
855 | } |
856 | |
857 | /// Call a method and return the reply body, optionally supplying a set of |
858 | /// method flags to control the way the method call message is sent and handled. |
859 | /// |
860 | /// Use [`call`] instead if you do not need any special handling via additional flags. |
861 | /// If the `NoReplyExpected` flag is passed , this will return None immediately |
862 | /// after sending the message, similar to [`call_noreply`] |
863 | /// |
864 | /// [`call`]: struct.Proxy.html#method.call |
865 | /// [`call_noreply`]: struct.Proxy.html#method.call_noreply |
866 | pub async fn call_with_flags<'m, M, B, R>( |
867 | &self, |
868 | method_name: M, |
869 | flags: BitFlags<MethodFlags>, |
870 | body: &B, |
871 | ) -> Result<Option<R>> |
872 | where |
873 | M: TryInto<MemberName<'m>>, |
874 | M::Error: Into<Error>, |
875 | B: serde::ser::Serialize + zvariant::DynamicType, |
876 | R: for<'d> zvariant::DynamicDeserialize<'d>, |
877 | { |
878 | let flags = flags.iter().map(Flags::from).collect::<BitFlags<_>>(); |
879 | match self |
880 | .inner |
881 | .inner_without_borrows |
882 | .conn |
883 | .call_method_raw( |
884 | Some(self.destination()), |
885 | self.path(), |
886 | Some(self.interface()), |
887 | method_name, |
888 | flags, |
889 | body, |
890 | ) |
891 | .await? |
892 | { |
893 | Some(reply) => reply.await?.body().deserialize().map(Some), |
894 | None => Ok(None), |
895 | } |
896 | } |
897 | |
898 | /// Call a method without expecting a reply |
899 | /// |
900 | /// This sets the `NoReplyExpected` flag on the calling message and does not wait for a reply. |
901 | pub async fn call_noreply<'m, M, B>(&self, method_name: M, body: &B) -> Result<()> |
902 | where |
903 | M: TryInto<MemberName<'m>>, |
904 | M::Error: Into<Error>, |
905 | B: serde::ser::Serialize + zvariant::DynamicType, |
906 | { |
907 | self.call_with_flags::<_, _, ()>(method_name, MethodFlags::NoReplyExpected.into(), body) |
908 | .await?; |
909 | Ok(()) |
910 | } |
911 | |
912 | /// Create a stream for signal named `signal_name`. |
913 | pub async fn receive_signal<'m, M>(&self, signal_name: M) -> Result<SignalStream<'m>> |
914 | where |
915 | M: TryInto<MemberName<'m>>, |
916 | M::Error: Into<Error>, |
917 | { |
918 | self.receive_signal_with_args(signal_name, &[]).await |
919 | } |
920 | |
921 | /// Same as [`Proxy::receive_signal`] but with a filter. |
922 | /// |
923 | /// The D-Bus specification allows you to filter signals by their arguments, which helps avoid |
924 | /// a lot of unnecessary traffic and processing since the filter is run on the server side. Use |
925 | /// this method where possible. Note that this filtering is limited to arguments of string |
926 | /// types. |
927 | /// |
928 | /// The arguments are passed as a tuples of argument index and expected value. |
929 | pub async fn receive_signal_with_args<'m, M>( |
930 | &self, |
931 | signal_name: M, |
932 | args: &[(u8, &str)], |
933 | ) -> Result<SignalStream<'m>> |
934 | where |
935 | M: TryInto<MemberName<'m>>, |
936 | M::Error: Into<Error>, |
937 | { |
938 | let signal_name = signal_name.try_into().map_err(Into::into)?; |
939 | self.receive_signals(Some(signal_name), args).await |
940 | } |
941 | |
942 | async fn receive_signals<'m>( |
943 | &self, |
944 | signal_name: Option<MemberName<'m>>, |
945 | args: &[(u8, &str)], |
946 | ) -> Result<SignalStream<'m>> { |
947 | self.inner.subscribe_dest_owner_change().await?; |
948 | |
949 | SignalStream::new(self.clone(), signal_name, args).await |
950 | } |
951 | |
952 | /// Create a stream for all signals emitted by this service. |
953 | pub async fn receive_all_signals(&self) -> Result<SignalStream<'static>> { |
954 | self.receive_signals(None, &[]).await |
955 | } |
956 | |
957 | /// Get a stream to receive property changed events. |
958 | /// |
959 | /// Note that zbus doesn't queue the updates. If the listener is slower than the receiver, it |
960 | /// will only receive the last update. |
961 | /// |
962 | /// If caching is not enabled on this proxy, the resulting stream will not return any events. |
963 | pub async fn receive_property_changed<'name: 'a, T>( |
964 | &self, |
965 | name: &'name str, |
966 | ) -> PropertyStream<'a, T> { |
967 | let properties = self.get_property_cache(); |
968 | let changed_listener = if let Some(properties) = &properties { |
969 | let mut values = properties.values.write().expect("lock poisoned" ); |
970 | let entry = values |
971 | .entry(name.to_string()) |
972 | .or_insert_with(PropertyValue::default); |
973 | entry.event.listen() |
974 | } else { |
975 | Event::new().listen() |
976 | }; |
977 | |
978 | PropertyStream { |
979 | name, |
980 | proxy: self.clone(), |
981 | changed_listener, |
982 | phantom: std::marker::PhantomData, |
983 | } |
984 | } |
985 | |
986 | /// Get a stream to receive destination owner changed events. |
987 | /// |
988 | /// If the proxy destination is a unique name, the stream will be notified of the peer |
989 | /// disconnection from the bus (with a `None` value). |
990 | /// |
991 | /// If the proxy destination is a well-known name, the stream will be notified whenever the name |
992 | /// owner is changed, either by a new peer being granted ownership (`Some` value) or when the |
993 | /// name is released (with a `None` value). |
994 | /// |
995 | /// Note that zbus doesn't queue the updates. If the listener is slower than the receiver, it |
996 | /// will only receive the last update. |
997 | pub async fn receive_owner_changed(&self) -> Result<OwnerChangedStream<'_>> { |
998 | use futures_util::StreamExt; |
999 | let dbus_proxy = fdo::DBusProxy::builder(self.connection()) |
1000 | .cache_properties(CacheProperties::No) |
1001 | .build() |
1002 | .await?; |
1003 | Ok(OwnerChangedStream { |
1004 | stream: dbus_proxy |
1005 | .receive_name_owner_changed_with_args(&[(0, self.destination().as_str())]) |
1006 | .await? |
1007 | .map(Box::new(move |signal| { |
1008 | let args = signal.args().unwrap(); |
1009 | let new_owner = args.new_owner().as_ref().map(|owner| owner.to_owned()); |
1010 | |
1011 | new_owner |
1012 | })), |
1013 | name: self.destination().clone(), |
1014 | }) |
1015 | } |
1016 | } |
1017 | |
1018 | #[derive (Debug, Default)] |
1019 | struct PropertyValue { |
1020 | value: Option<OwnedValue>, |
1021 | event: Event, |
1022 | } |
1023 | |
1024 | /// Flags to use with [`Proxy::call_with_flags`]. |
1025 | #[bitflags ] |
1026 | #[repr (u8)] |
1027 | #[derive (Debug, Copy, Clone, PartialEq, Eq)] |
1028 | pub enum MethodFlags { |
1029 | /// No response is expected from this method call, regardless of whether the |
1030 | /// signature for the interface method indicates a reply type. When passed, |
1031 | /// `call_with_flags` will return `Ok(None)` immediately after successfully |
1032 | /// sending the method call. |
1033 | /// |
1034 | /// Errors encountered while *making* the call will still be returned as |
1035 | /// an `Err` variant, but any errors that are triggered by the receiver's |
1036 | /// handling of the call will not be delivered. |
1037 | NoReplyExpected = 0x1, |
1038 | |
1039 | /// When set on a call whose destination is a message bus, this flag will instruct |
1040 | /// the bus not to [launch][al] a service to handle the call if no application |
1041 | /// on the bus owns the requested name. |
1042 | /// |
1043 | /// This flag is ignored when using a peer-to-peer connection. |
1044 | /// |
1045 | /// [al]: https://dbus.freedesktop.org/doc/dbus-specification.html#message-bus-starting-services |
1046 | NoAutoStart = 0x2, |
1047 | |
1048 | /// Indicates to the receiver that this client is prepared to wait for interactive |
1049 | /// authorization, which might take a considerable time to complete. For example, the receiver |
1050 | /// may query the user for confirmation via [polkit] or a similar framework. |
1051 | /// |
1052 | /// [polkit]: https://gitlab.freedesktop.org/polkit/polkit/ |
1053 | AllowInteractiveAuth = 0x4, |
1054 | } |
1055 | |
1056 | assert_impl_all!(MethodFlags: Send, Sync, Unpin); |
1057 | |
1058 | impl From<MethodFlags> for Flags { |
1059 | fn from(method_flag: MethodFlags) -> Self { |
1060 | match method_flag { |
1061 | MethodFlags::NoReplyExpected => Self::NoReplyExpected, |
1062 | MethodFlags::NoAutoStart => Self::NoAutoStart, |
1063 | MethodFlags::AllowInteractiveAuth => Self::AllowInteractiveAuth, |
1064 | } |
1065 | } |
1066 | } |
1067 | |
1068 | type OwnerChangedStreamMap<'a> = Map< |
1069 | fdo::NameOwnerChangedStream<'a>, |
1070 | Box<dyn FnMut(fdo::NameOwnerChanged) -> Option<UniqueName<'static>> + Send + Sync + Unpin>, |
1071 | >; |
1072 | |
1073 | /// A [`stream::Stream`] implementation that yields `UniqueName` when the bus owner changes. |
1074 | /// |
1075 | /// Use [`Proxy::receive_owner_changed`] to create an instance of this type. |
1076 | pub struct OwnerChangedStream<'a> { |
1077 | stream: OwnerChangedStreamMap<'a>, |
1078 | name: BusName<'a>, |
1079 | } |
1080 | |
1081 | assert_impl_all!(OwnerChangedStream<'_>: Send, Sync, Unpin); |
1082 | |
1083 | impl OwnerChangedStream<'_> { |
1084 | /// The bus name being tracked. |
1085 | pub fn name(&self) -> &BusName<'_> { |
1086 | &self.name |
1087 | } |
1088 | } |
1089 | |
1090 | impl<'a> stream::Stream for OwnerChangedStream<'a> { |
1091 | type Item = Option<UniqueName<'static>>; |
1092 | |
1093 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
1094 | use futures_util::StreamExt; |
1095 | self.get_mut().stream.poll_next_unpin(cx) |
1096 | } |
1097 | } |
1098 | |
1099 | /// A [`stream::Stream`] implementation that yields signal [messages](`Message`). |
1100 | /// |
1101 | /// Use [`Proxy::receive_signal`] to create an instance of this type. |
1102 | /// |
1103 | /// This type uses a [`MessageStream::for_match_rule`] internally and therefore the note about match |
1104 | /// rule registration and [`AsyncDrop`] in its documentation applies here as well. |
1105 | #[derive (Debug)] |
1106 | pub struct SignalStream<'a> { |
1107 | stream: Join<MessageStream, Option<MessageStream>>, |
1108 | src_unique_name: Option<UniqueName<'static>>, |
1109 | signal_name: Option<MemberName<'a>>, |
1110 | } |
1111 | |
1112 | impl<'a> SignalStream<'a> { |
1113 | /// The signal name. |
1114 | pub fn name(&self) -> Option<&MemberName<'a>> { |
1115 | self.signal_name.as_ref() |
1116 | } |
1117 | |
1118 | async fn new( |
1119 | proxy: Proxy<'_>, |
1120 | signal_name: Option<MemberName<'a>>, |
1121 | args: &[(u8, &str)], |
1122 | ) -> Result<SignalStream<'a>> { |
1123 | let mut rule_builder = MatchRule::builder() |
1124 | .msg_type(Type::Signal) |
1125 | .sender(proxy.destination())? |
1126 | .path(proxy.path())? |
1127 | .interface(proxy.interface())?; |
1128 | if let Some(name) = &signal_name { |
1129 | rule_builder = rule_builder.member(name)?; |
1130 | } |
1131 | for (i, arg) in args { |
1132 | rule_builder = rule_builder.arg(*i, *arg)?; |
1133 | } |
1134 | let signal_rule: OwnedMatchRule = rule_builder.build().to_owned().into(); |
1135 | let conn = proxy.connection(); |
1136 | |
1137 | let (src_unique_name, stream) = match proxy.destination().to_owned() { |
1138 | BusName::Unique(name) => ( |
1139 | Some(name), |
1140 | join_streams( |
1141 | MessageStream::for_match_rule(signal_rule, conn, None).await?, |
1142 | None, |
1143 | ), |
1144 | ), |
1145 | BusName::WellKnown(name) => { |
1146 | use ordered_stream::OrderedStreamExt; |
1147 | |
1148 | let name_owner_changed_rule = MatchRule::builder() |
1149 | .msg_type(Type::Signal) |
1150 | .sender("org.freedesktop.DBus" )? |
1151 | .path("/org/freedesktop/DBus" )? |
1152 | .interface("org.freedesktop.DBus" )? |
1153 | .member("NameOwnerChanged" )? |
1154 | .add_arg(name.as_str())? |
1155 | .build(); |
1156 | let name_owner_changed_stream = MessageStream::for_match_rule( |
1157 | name_owner_changed_rule, |
1158 | conn, |
1159 | Some(MAX_NAME_OWNER_CHANGED_SIGNALS_QUEUED), |
1160 | ) |
1161 | .await? |
1162 | .map(Either::Left); |
1163 | |
1164 | let get_name_owner = conn |
1165 | .call_method_raw( |
1166 | Some("org.freedesktop.DBus" ), |
1167 | "/org/freedesktop/DBus" , |
1168 | Some("org.freedesktop.DBus" ), |
1169 | "GetNameOwner" , |
1170 | BitFlags::empty(), |
1171 | &name, |
1172 | ) |
1173 | .await |
1174 | .map(|r| FromFuture::from(r.expect("no reply" )).map(Either::Right))?; |
1175 | |
1176 | let mut join = join_streams(name_owner_changed_stream, get_name_owner); |
1177 | |
1178 | let mut src_unique_name = loop { |
1179 | match join.next().await { |
1180 | Some(Either::Left(Ok(msg))) => { |
1181 | let signal = NameOwnerChanged::from_message(msg) |
1182 | .expect("`NameOwnerChanged` signal stream got wrong message" ); |
1183 | { |
1184 | break signal |
1185 | .args() |
1186 | // SAFETY: The filtering code couldn't have let this through if |
1187 | // args were not in order. |
1188 | .expect("`NameOwnerChanged` signal has no args" ) |
1189 | .new_owner() |
1190 | .as_ref() |
1191 | .map(UniqueName::to_owned); |
1192 | } |
1193 | } |
1194 | Some(Either::Left(Err(_))) => (), |
1195 | Some(Either::Right(Ok(response))) => { |
1196 | break Some(response.body().deserialize::<UniqueName<'_>>()?.to_owned()) |
1197 | } |
1198 | Some(Either::Right(Err(e))) => { |
1199 | // Probably the name is not owned. Not a problem but let's still log it. |
1200 | debug!("Failed to get owner of {name}: {e}" ); |
1201 | |
1202 | break None; |
1203 | } |
1204 | None => { |
1205 | return Err(Error::InputOutput( |
1206 | std::io::Error::new( |
1207 | std::io::ErrorKind::BrokenPipe, |
1208 | "connection closed" , |
1209 | ) |
1210 | .into(), |
1211 | )) |
1212 | } |
1213 | } |
1214 | }; |
1215 | |
1216 | // Let's take into account any buffered NameOwnerChanged signal. |
1217 | let (stream, _, queued) = join.into_inner(); |
1218 | if let Some(msg) = queued.and_then(|e| match e.0 { |
1219 | Either::Left(Ok(msg)) => Some(msg), |
1220 | Either::Left(Err(_)) | Either::Right(_) => None, |
1221 | }) { |
1222 | if let Some(signal) = NameOwnerChanged::from_message(msg) { |
1223 | if let Ok(args) = signal.args() { |
1224 | match (args.name(), args.new_owner().deref()) { |
1225 | (BusName::WellKnown(n), Some(new_owner)) if n == &name => { |
1226 | src_unique_name = Some(new_owner.to_owned()); |
1227 | } |
1228 | _ => (), |
1229 | } |
1230 | } |
1231 | } |
1232 | } |
1233 | let name_owner_changed_stream = stream.into_inner(); |
1234 | |
1235 | let stream = join_streams( |
1236 | MessageStream::for_match_rule(signal_rule, conn, None).await?, |
1237 | Some(name_owner_changed_stream), |
1238 | ); |
1239 | |
1240 | (src_unique_name, stream) |
1241 | } |
1242 | }; |
1243 | |
1244 | Ok(SignalStream { |
1245 | stream, |
1246 | src_unique_name, |
1247 | signal_name, |
1248 | }) |
1249 | } |
1250 | |
1251 | fn filter(&mut self, msg: &Message) -> Result<bool> { |
1252 | let header = msg.header(); |
1253 | let sender = header.sender(); |
1254 | if sender == self.src_unique_name.as_ref() { |
1255 | return Ok(true); |
1256 | } |
1257 | |
1258 | // The src_unique_name must be maintained in lock-step with the applied filter |
1259 | if let Some(signal) = NameOwnerChanged::from_message(msg.clone()) { |
1260 | let args = signal.args()?; |
1261 | self.src_unique_name = args.new_owner().as_ref().map(|n| n.to_owned()); |
1262 | } |
1263 | |
1264 | Ok(false) |
1265 | } |
1266 | } |
1267 | |
1268 | assert_impl_all!(SignalStream<'_>: Send, Sync, Unpin); |
1269 | |
1270 | impl<'a> stream::Stream for SignalStream<'a> { |
1271 | type Item = Message; |
1272 | |
1273 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
1274 | OrderedStream::poll_next_before(self, cx, before:None).map(|res: PollResult| res.into_data()) |
1275 | } |
1276 | } |
1277 | |
1278 | impl<'a> OrderedStream for SignalStream<'a> { |
1279 | type Data = Message; |
1280 | type Ordering = Sequence; |
1281 | |
1282 | fn poll_next_before( |
1283 | self: Pin<&mut Self>, |
1284 | cx: &mut Context<'_>, |
1285 | before: Option<&Self::Ordering>, |
1286 | ) -> Poll<PollResult<Self::Ordering, Self::Data>> { |
1287 | let this = self.get_mut(); |
1288 | loop { |
1289 | match ready!(OrderedStream::poll_next_before( |
1290 | Pin::new(&mut this.stream), |
1291 | cx, |
1292 | before |
1293 | )) { |
1294 | PollResult::Item { data, ordering } => { |
1295 | if let Ok(msg) = data { |
1296 | if let Ok(true) = this.filter(&msg) { |
1297 | return Poll::Ready(PollResult::Item { |
1298 | data: msg, |
1299 | ordering, |
1300 | }); |
1301 | } |
1302 | } |
1303 | } |
1304 | PollResult::Terminated => return Poll::Ready(PollResult::Terminated), |
1305 | PollResult::NoneBefore => return Poll::Ready(PollResult::NoneBefore), |
1306 | } |
1307 | } |
1308 | } |
1309 | } |
1310 | |
1311 | impl<'a> stream::FusedStream for SignalStream<'a> { |
1312 | fn is_terminated(&self) -> bool { |
1313 | ordered_stream::FusedOrderedStream::is_terminated(&self.stream) |
1314 | } |
1315 | } |
1316 | |
1317 | #[async_trait::async_trait ] |
1318 | impl AsyncDrop for SignalStream<'_> { |
1319 | async fn async_drop(self) { |
1320 | let (signals: MessageStream, names: Option, _buffered: Option<(Result, …)>) = self.stream.into_inner(); |
1321 | signals.async_drop().await; |
1322 | if let Some(names: MessageStream) = names { |
1323 | names.async_drop().await; |
1324 | } |
1325 | } |
1326 | } |
1327 | |
1328 | impl<'a> From<crate::blocking::Proxy<'a>> for Proxy<'a> { |
1329 | fn from(proxy: crate::blocking::Proxy<'a>) -> Self { |
1330 | proxy.into_inner() |
1331 | } |
1332 | } |
1333 | |
1334 | /// This trait is implemented by all async proxies, which are generated with the |
1335 | /// [`dbus_proxy`](zbus::dbus_proxy) macro. |
1336 | pub trait ProxyImpl<'c> |
1337 | where |
1338 | Self: Sized, |
1339 | { |
1340 | /// Returns a customizable builder for this proxy. |
1341 | fn builder(conn: &Connection) -> Builder<'c, Self>; |
1342 | |
1343 | /// Consumes `self`, returning the underlying `zbus::Proxy`. |
1344 | fn into_inner(self) -> Proxy<'c>; |
1345 | |
1346 | /// The reference to the underlying `zbus::Proxy`. |
1347 | fn inner(&self) -> &Proxy<'c>; |
1348 | } |
1349 | |
1350 | #[cfg (test)] |
1351 | mod tests { |
1352 | use super::*; |
1353 | use crate::{connection, interface , object_server::SignalContext, proxy, utils::block_on}; |
1354 | use futures_util::StreamExt; |
1355 | use ntest::timeout; |
1356 | use test_log::test; |
1357 | |
1358 | #[test ] |
1359 | #[timeout(15000)] |
1360 | fn signal() { |
1361 | block_on(test_signal()).unwrap(); |
1362 | } |
1363 | |
1364 | async fn test_signal() -> Result<()> { |
1365 | // Register a well-known name with the session bus and ensure we get the appropriate |
1366 | // signals called for that. |
1367 | let conn = Connection::session().await?; |
1368 | let dest_conn = Connection::session().await?; |
1369 | let unique_name = dest_conn.unique_name().unwrap().clone(); |
1370 | |
1371 | let well_known = "org.freedesktop.zbus.async.ProxySignalStreamTest" ; |
1372 | let proxy: Proxy<'_> = Builder::new(&conn) |
1373 | .destination(well_known)? |
1374 | .path("/does/not/matter" )? |
1375 | .interface("does.not.matter" )? |
1376 | .build() |
1377 | .await?; |
1378 | let mut owner_changed_stream = proxy.receive_owner_changed().await?; |
1379 | |
1380 | let proxy = fdo::DBusProxy::new(&dest_conn).await?; |
1381 | let mut name_acquired_stream = proxy |
1382 | .inner() |
1383 | .receive_signal_with_args("NameAcquired" , &[(0, well_known)]) |
1384 | .await?; |
1385 | |
1386 | let prop_stream = proxy |
1387 | .inner() |
1388 | .receive_property_changed("SomeProp" ) |
1389 | .await |
1390 | .filter_map(|changed| async move { |
1391 | let v: Option<u32> = changed.get().await.ok(); |
1392 | dbg!(v) |
1393 | }); |
1394 | drop(proxy); |
1395 | drop(prop_stream); |
1396 | |
1397 | dest_conn.request_name(well_known).await?; |
1398 | |
1399 | let (new_owner, acquired_signal) = |
1400 | futures_util::join!(owner_changed_stream.next(), name_acquired_stream.next(),); |
1401 | |
1402 | assert_eq!(&new_owner.unwrap().unwrap(), &*unique_name); |
1403 | |
1404 | let acquired_signal = acquired_signal.unwrap(); |
1405 | assert_eq!( |
1406 | acquired_signal.body().deserialize::<&str>().unwrap(), |
1407 | well_known |
1408 | ); |
1409 | |
1410 | let proxy = Proxy::new(&conn, &unique_name, "/does/not/matter" , "does.not.matter" ).await?; |
1411 | let mut unique_name_changed_stream = proxy.receive_owner_changed().await?; |
1412 | |
1413 | drop(dest_conn); |
1414 | name_acquired_stream.async_drop().await; |
1415 | |
1416 | // There shouldn't be an owner anymore. |
1417 | let new_owner = owner_changed_stream.next().await; |
1418 | assert!(new_owner.unwrap().is_none()); |
1419 | |
1420 | let new_unique_owner = unique_name_changed_stream.next().await; |
1421 | assert!(new_unique_owner.unwrap().is_none()); |
1422 | |
1423 | Ok(()) |
1424 | } |
1425 | |
1426 | #[test ] |
1427 | #[timeout(15000)] |
1428 | fn signal_stream_deadlock() { |
1429 | block_on(test_signal_stream_deadlock()).unwrap(); |
1430 | } |
1431 | |
1432 | /// Tests deadlocking in signal reception when the message queue is full. |
1433 | /// |
1434 | /// Creates a connection with a small message queue, and a service that |
1435 | /// emits signals at a high rate. First a listener is created that listens |
1436 | /// for that signal which should fill the small queue. Then another signal |
1437 | /// signal listener is created against another signal. Previously, this second |
1438 | /// call to add the match rule never resolved and resulted in a deadlock. |
1439 | async fn test_signal_stream_deadlock() -> Result<()> { |
1440 | #[proxy( |
1441 | gen_blocking = false, |
1442 | default_path = "/org/zbus/Test" , |
1443 | default_service = "org.zbus.Test.MR501" , |
1444 | interface = "org.zbus.Test" |
1445 | )] |
1446 | trait Test { |
1447 | #[zbus(signal)] |
1448 | fn my_signal(&self, msg: &str) -> Result<()>; |
1449 | } |
1450 | |
1451 | struct TestIface; |
1452 | |
1453 | #[interface(name = "org.zbus.Test" )] |
1454 | impl TestIface { |
1455 | #[zbus(signal)] |
1456 | async fn my_signal(context: &SignalContext<'_>, msg: &'static str) -> Result<()>; |
1457 | } |
1458 | |
1459 | let test_iface = TestIface; |
1460 | let server_conn = connection::Builder::session()? |
1461 | .name("org.zbus.Test.MR501" )? |
1462 | .serve_at("/org/zbus/Test" , test_iface)? |
1463 | .build() |
1464 | .await?; |
1465 | |
1466 | let client_conn = connection::Builder::session()? |
1467 | .max_queued(1) |
1468 | .build() |
1469 | .await?; |
1470 | |
1471 | let test_proxy = TestProxy::new(&client_conn).await?; |
1472 | let test_prop_proxy = PropertiesProxy::builder(&client_conn) |
1473 | .destination("org.zbus.Test.MR501" )? |
1474 | .path("/org/zbus/Test" )? |
1475 | .build() |
1476 | .await?; |
1477 | |
1478 | let (tx, mut rx) = tokio::sync::mpsc::channel(1); |
1479 | |
1480 | let handle = { |
1481 | let tx = tx.clone(); |
1482 | let conn = server_conn.clone(); |
1483 | let server_fut = async move { |
1484 | use std::time::Duration; |
1485 | |
1486 | #[cfg (not(feature = "tokio" ))] |
1487 | use async_io::Timer; |
1488 | |
1489 | #[cfg (feature = "tokio" )] |
1490 | use tokio::time::sleep; |
1491 | |
1492 | let iface_ref = conn |
1493 | .object_server() |
1494 | .interface::<_, TestIface>("/org/zbus/Test" ) |
1495 | .await |
1496 | .unwrap(); |
1497 | |
1498 | let context = iface_ref.signal_context(); |
1499 | while !tx.is_closed() { |
1500 | for _ in 0..10 { |
1501 | TestIface::my_signal(context, "This is a test" ) |
1502 | .await |
1503 | .unwrap(); |
1504 | } |
1505 | |
1506 | #[cfg (not(feature = "tokio" ))] |
1507 | Timer::after(Duration::from_millis(5)).await; |
1508 | |
1509 | #[cfg (feature = "tokio" )] |
1510 | sleep(Duration::from_millis(5)).await; |
1511 | } |
1512 | }; |
1513 | server_conn.executor().spawn(server_fut, "server_task" ) |
1514 | }; |
1515 | |
1516 | let signal_fut = async { |
1517 | let mut signal_stream = test_proxy.receive_my_signal().await.unwrap(); |
1518 | |
1519 | tx.send(()).await.unwrap(); |
1520 | |
1521 | while let Some(_signal) = signal_stream.next().await {} |
1522 | }; |
1523 | |
1524 | let prop_fut = async move { |
1525 | rx.recv().await.unwrap(); |
1526 | let _prop_stream = test_prop_proxy.receive_properties_changed().await.unwrap(); |
1527 | }; |
1528 | |
1529 | futures_util::pin_mut!(signal_fut); |
1530 | futures_util::pin_mut!(prop_fut); |
1531 | |
1532 | futures_util::future::select(signal_fut, prop_fut).await; |
1533 | |
1534 | handle.await; |
1535 | |
1536 | Ok(()) |
1537 | } |
1538 | } |
1539 | |