1use enumflags2::{bitflags, BitFlags};
2use event_listener::{Event, EventListener};
3use futures_core::{ready, stream};
4use futures_util::{future::Either, stream::Map};
5use once_cell::sync::OnceCell;
6use ordered_stream::{join as join_streams, FromFuture, Join, OrderedStream, PollResult};
7use static_assertions::assert_impl_all;
8use std::{
9 collections::{HashMap, HashSet},
10 convert::{TryFrom, TryInto},
11 future::Future,
12 ops::Deref,
13 pin::Pin,
14 sync::{Arc, RwLock, RwLockReadGuard},
15 task::{Context, Poll},
16};
17use tracing::{debug, info_span, instrument, trace, Instrument};
18
19use zbus_names::{BusName, InterfaceName, MemberName, UniqueName};
20use zvariant::{ObjectPath, OwnedValue, Str, Value};
21
22use crate::{
23 fdo::{self, IntrospectableProxy, NameOwnerChanged, PropertiesChangedStream, PropertiesProxy},
24 AsyncDrop, CacheProperties, Connection, Error, Executor, MatchRule, Message, MessageFlags,
25 MessageSequence, MessageStream, MessageType, OwnedMatchRule, ProxyBuilder, Result, Task,
26};
27
/// A client-side interface proxy.
///
/// A `Proxy` is a helper to interact with an interface on a remote object.
///
/// All state lives behind an [`Arc`], so cloning a `Proxy` only clones that pointer and every
/// clone shares the same connection and property cache.
///
/// # Example
///
/// ```
/// use std::result::Result;
/// use std::error::Error;
/// use zbus::{Connection, Proxy};
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn Error>> {
///     let connection = Connection::session().await?;
///     let p = Proxy::new(
///         &connection,
///         "org.freedesktop.DBus",
///         "/org/freedesktop/DBus",
///         "org.freedesktop.DBus",
///     ).await?;
///     // owned return value
///     let _id: String = p.call("GetId", &()).await?;
///     // borrowed return value
///     let _id: &str = p.call_method("GetId", &()).await?.body()?;
///
///     Ok(())
/// }
/// ```
///
/// # Note
///
/// It is recommended to use the [`dbus_proxy`] macro, which provides a more convenient and
/// type-safe *façade* `Proxy` derived from a Rust trait.
///
/// [`dbus_proxy`]: attr.dbus_proxy.html
#[derive(Clone, Debug)]
pub struct Proxy<'a> {
    /// Shared state of the proxy; every clone of the `Proxy` points at the same instance.
    pub(crate) inner: Arc<ProxyInner<'a>>,
}

// Compile-time guarantee that a `Proxy` can be moved and shared across threads.
assert_impl_all!(Proxy<'_>: Send, Sync, Unpin);
70
/// The lifetime-free part of [`ProxyInner`].
///
/// This is required to avoid having the Drop impl extend the lifetime 'a, which breaks zbus_xmlgen
/// (and possibly other crates).
#[derive(derivative::Derivative)]
#[derivative(Debug)]
pub(crate) struct ProxyInnerStatic {
    /// Connection used by the proxy. Left out of the `Debug` output.
    #[derivative(Debug = "ignore")]
    pub(crate) conn: Connection,
    /// Match rule registered for `NameOwnerChanged` signals about the destination, once
    /// `ProxyInner::subscribe_dest_owner_change` has stored one. The `Drop` impl queues its
    /// removal from the connection.
    dest_owner_change_match_rule: OnceCell<OwnedMatchRule>,
}
80
/// State shared by all clones of a [`Proxy`].
#[derive(Debug)]
pub(crate) struct ProxyInner<'a> {
    /// Connection and match-rule bookkeeping, kept in a separate lifetime-free struct so that its
    /// `Drop` impl does not involve `'a`.
    inner_without_borrows: ProxyInnerStatic,
    /// Bus name of the peer this proxy talks to.
    pub(crate) destination: BusName<'a>,
    /// Object path of the remote object.
    pub(crate) path: ObjectPath<'a>,
    /// Interface of the remote object this proxy represents.
    pub(crate) interface: InterfaceName<'a>,

    /// Cache of property values.
    ///
    /// `None` when caching is disabled for this proxy. Otherwise the cell is filled on first use
    /// with the cache and the task that populates it and keeps it updated.
    property_cache: Option<OnceCell<(Arc<PropertiesCache>, Task<()>)>>,
    /// Set of properties which do not get cached, by name.
    /// This overrides proxy-level caching behavior.
    uncached_properties: HashSet<Str<'a>>,
}
94
95impl Drop for ProxyInnerStatic {
96 fn drop(&mut self) {
97 if let Some(rule: OwnedMatchRule) = self.dest_owner_change_match_rule.take() {
98 self.conn.queue_remove_match(rule);
99 }
100 }
101}
102
/// A property changed event.
///
/// The property changed event generated by [`PropertyStream`].
pub struct PropertyChanged<'a, T> {
    /// Name of the property that changed.
    name: &'a str,
    /// Cache holding the property values of the proxy that produced this event.
    properties: Arc<PropertiesCache>,
    /// Proxy used to fetch the value from the peer when the cache holds no value for `name`.
    proxy: Proxy<'a>,
    /// Marks the type the property value is converted to by `get`.
    phantom: std::marker::PhantomData<T>,
}
112
impl<'a, T> PropertyChanged<'a, T> {
    /// The name of the property that changed.
    pub fn name(&self) -> &str {
        self.name
    }

    /// Get the raw value of the property that changed.
    ///
    /// If the notification signal contained the new value, it has been cached already and this
    /// call will return that value. Otherwise (i.e. invalidated property), a D-Bus call is made
    /// to fetch and cache the new value.
    ///
    /// The returned guard keeps a read lock on the property cache for as long as it is alive.
    ///
    /// # Panics
    ///
    /// Panics if the cache lock is poisoned or if the cache has no entry for this property.
    pub async fn get_raw<'p>(&'p self) -> Result<impl Deref<Target = Value<'static>> + 'p> {
        // Read guard plus property name; dereferences to the cached value.
        struct Wrapper<'w> {
            name: &'w str,
            values: RwLockReadGuard<'w, HashMap<String, PropertyValue>>,
        }

        impl<'w> Deref for Wrapper<'w> {
            type Target = Value<'static>;

            fn deref(&self) -> &Self::Target {
                self.values
                    .get(self.name)
                    .expect("PropertyStream with no corresponding property")
                    .value
                    .as_ref()
                    .expect("PropertyStream with no corresponding property")
            }
        }

        // Fast path: the value is already cached. The guard is scoped to this block so that it is
        // released before the await below when the value is missing.
        {
            let values = self.properties.values.read().expect("lock poisoned");
            if values
                .get(self.name)
                .expect("PropertyStream with no corresponding property")
                .value
                .is_some()
            {
                return Ok(Wrapper {
                    name: self.name,
                    values,
                });
            }
        }

        // The property was invalidated, so we need to fetch the new value.
        let properties_proxy = self.proxy.properties_proxy();
        let value = properties_proxy
            .get(self.proxy.inner.interface.clone(), self.name)
            .await
            .map_err(crate::Error::from)?;

        // Save the new value
        {
            let mut values = self.properties.values.write().expect("lock poisoned");

            values
                .get_mut(self.name)
                .expect("PropertyStream with no corresponding property")
                .value = Some(value);
        }

        // NOTE(review): the write lock is released before this read lock is taken. An
        // invalidation applied in between resets the value to `None`, in which case dereferencing
        // the returned wrapper panics — confirm whether this window matters in practice.
        Ok(Wrapper {
            name: self.name,
            values: self.properties.values.read().expect("lock poisoned"),
        })
    }
}
181
182impl<T> PropertyChanged<'_, T>
183where
184 T: TryFrom<zvariant::OwnedValue>,
185 T::Error: Into<crate::Error>,
186{
187 // Get the value of the property that changed.
188 //
189 // If the notification signal contained the new value, it has been cached already and this call
190 // will return that value. Otherwise (i-e invalidated property), a D-Bus call is made to fetch
191 // and cache the new value.
192 pub async fn get(&self) -> Result<T> {
193 self.get_raw()
194 .await
195 .and_then(|v: impl Deref>| T::try_from(OwnedValue::from(&*v)).map_err(op:Into::into))
196 }
197}
198
/// A [`stream::Stream`] implementation that yields property change notifications.
///
/// Use [`Proxy::receive_property_changed`] to create an instance of this type.
#[derive(derivative::Derivative)]
#[derivative(Debug)]
pub struct PropertyStream<'a, T> {
    /// Name of the property being watched.
    name: &'a str,
    /// Proxy whose property cache is observed.
    proxy: Proxy<'a>,
    /// Listener for the next change event of the property; replaced after every notification.
    changed_listener: EventListener,
    /// Marks the value type carried by the yielded [`PropertyChanged`] events.
    phantom: std::marker::PhantomData<T>,
}
210
211impl<'a, T> stream::Stream for PropertyStream<'a, T>
212where
213 T: Unpin,
214{
215 type Item = PropertyChanged<'a, T>;
216
217 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
218 let m = self.get_mut();
219 let properties = match m.proxy.get_property_cache() {
220 Some(properties) => properties.clone(),
221 // With no cache, we will get no updates; return immediately
222 None => return Poll::Ready(None),
223 };
224 ready!(Pin::new(&mut m.changed_listener).poll(cx));
225
226 m.changed_listener = properties
227 .values
228 .read()
229 .expect("lock poisoned")
230 .get(m.name)
231 .expect("PropertyStream with no corresponding property")
232 .event
233 .listen();
234
235 Poll::Ready(Some(PropertyChanged {
236 name: m.name,
237 properties,
238 proxy: m.proxy.clone(),
239 phantom: std::marker::PhantomData,
240 }))
241 }
242}
243
/// Cache of the property values of one interface, shared between a proxy and the task that
/// keeps the cache updated.
#[derive(Debug)]
pub(crate) struct PropertiesCache {
    /// Cached entries, keyed by property name.
    values: RwLock<HashMap<String, PropertyValue>>,
    /// Whether the initial population is still in progress, or how it ended.
    caching_result: RwLock<CachingResult>,
}
249
/// State of the initial population of a [`PropertiesCache`].
#[derive(Debug)]
enum CachingResult {
    /// Population is still running; `ready` is notified when it finishes.
    Caching { ready: Event },
    /// Population finished with `result`.
    Cached { result: Result<()> },
}
255
256impl PropertiesCache {
257 #[instrument(skip_all)]
258 fn new(
259 proxy: PropertiesProxy<'static>,
260 interface: InterfaceName<'static>,
261 executor: &Executor<'_>,
262 uncached_properties: HashSet<zvariant::Str<'static>>,
263 ) -> (Arc<Self>, Task<()>) {
264 let cache = Arc::new(PropertiesCache {
265 values: Default::default(),
266 caching_result: RwLock::new(CachingResult::Caching {
267 ready: Event::new(),
268 }),
269 });
270
271 let cache_clone = cache.clone();
272 let task_name = format!("{interface} proxy caching");
273 let proxy_caching = async move {
274 let result = cache_clone
275 .init(proxy, interface, uncached_properties)
276 .await;
277 let (prop_changes, interface, uncached_properties) = {
278 let mut caching_result = cache_clone.caching_result.write().expect("lock poisoned");
279 let ready = match &*caching_result {
280 CachingResult::Caching { ready } => ready,
281 // SAFETY: This is the only part of the code that changes this state and it's
282 // only run once.
283 _ => unreachable!(),
284 };
285 match result {
286 Ok((prop_changes, interface, uncached_properties)) => {
287 ready.notify(usize::MAX);
288 *caching_result = CachingResult::Cached { result: Ok(()) };
289
290 (prop_changes, interface, uncached_properties)
291 }
292 Err(e) => {
293 ready.notify(usize::MAX);
294 *caching_result = CachingResult::Cached { result: Err(e) };
295
296 return;
297 }
298 }
299 };
300
301 if let Err(e) = cache_clone
302 .keep_updated(prop_changes, interface, uncached_properties)
303 .await
304 {
305 debug!("Error keeping properties cache updated: {e}");
306 }
307 }
308 .instrument(info_span!("{}", task_name));
309 let task = executor.spawn(proxy_caching, &task_name);
310
311 (cache, task)
312 }
313
    // new() runs this in a task it spawns for initialization of properties cache.
    //
    // Subscribes to property change signals first, then issues `GetAll` and fills the cache from
    // its reply. On success, hands back the change stream together with the `interface` and
    // `uncached_properties` arguments so the caller can pass them on to `keep_updated`.
    async fn init(
        &self,
        proxy: PropertiesProxy<'static>,
        interface: InterfaceName<'static>,
        uncached_properties: HashSet<zvariant::Str<'static>>,
    ) -> Result<(
        PropertiesChangedStream<'static>,
        InterfaceName<'static>,
        HashSet<zvariant::Str<'static>>,
    )> {
        use ordered_stream::OrderedStreamExt;

        // Subscribe before calling `GetAll`, so the change stream is already in place when the
        // reply arrives.
        let prop_changes = proxy.receive_properties_changed().await?.map(Either::Left);

        // The call is made with no flags set; a missing reply handle makes the `expect` panic.
        let get_all = proxy
            .connection()
            .call_method_raw(
                Some(proxy.destination()),
                proxy.path(),
                Some(proxy.interface()),
                "GetAll",
                BitFlags::empty(),
                &interface,
            )
            .await
            .map(|r| FromFuture::from(r.expect("no reply")).map(Either::Right))?;

        // Join the change signals and the `GetAll` reply into a single ordered stream.
        let mut join = join_streams(prop_changes, get_all);

        loop {
            match join.next().await {
                Some(Either::Left(_update)) => {
                    // discard updates prior to the initial population
                }
                Some(Either::Right(populate)) => {
                    populate?.body().map(|values| {
                        self.update_cache(&uncached_properties, &values, Vec::new(), &interface);
                    })?;
                    break;
                }
                None => break,
            }
        }
        if let Some((Either::Left(update), _)) = Pin::new(&mut join).take_buffered() {
            // if an update was buffered, then it happened after the get_all returned and needs to
            // be applied before we discard the join
            if let Ok(args) = update.args() {
                if args.interface_name == interface {
                    self.update_cache(
                        &uncached_properties,
                        &args.changed_properties,
                        args.invalidated_properties,
                        &interface,
                    );
                }
            }
        }
        // This is needed to avoid a "implementation of `OrderedStream` is not general enough"
        // error that occurs if you apply the map and join to Pin::new(&mut prop_changes) instead
        // of directly to the stream.
        let prop_changes = join.into_inner().0.into_inner();

        Ok((prop_changes, interface, uncached_properties))
    }
379
380 // new() runs this in a task it spawns for keeping the cache in sync.
381 #[instrument(skip_all)]
382 async fn keep_updated(
383 &self,
384 mut prop_changes: PropertiesChangedStream<'static>,
385 interface: InterfaceName<'static>,
386 uncached_properties: HashSet<zvariant::Str<'static>>,
387 ) -> Result<()> {
388 use futures_util::StreamExt;
389
390 trace!("Listening for property changes on {interface}...");
391 while let Some(update) = prop_changes.next().await {
392 if let Ok(args) = update.args() {
393 if args.interface_name == interface {
394 self.update_cache(
395 &uncached_properties,
396 &args.changed_properties,
397 args.invalidated_properties,
398 &interface,
399 );
400 }
401 }
402 }
403
404 Ok(())
405 }
406
407 fn update_cache(
408 &self,
409 uncached_properties: &HashSet<Str<'_>>,
410 changed: &HashMap<&str, Value<'_>>,
411 invalidated: Vec<&str>,
412 interface: &InterfaceName<'_>,
413 ) {
414 let mut values = self.values.write().expect("lock poisoned");
415
416 for inval in invalidated {
417 if uncached_properties.contains(&Str::from(inval)) {
418 debug!(
419 "Ignoring invalidation of uncached property `{}.{}`",
420 interface, inval
421 );
422 continue;
423 }
424 trace!("Property `{interface}.{inval}` invalidated");
425
426 if let Some(entry) = values.get_mut(inval) {
427 entry.value = None;
428 entry.event.notify(usize::MAX);
429 }
430 }
431
432 for (property_name, value) in changed {
433 if uncached_properties.contains(&Str::from(*property_name)) {
434 debug!(
435 "Ignoring update of uncached property `{}.{}`",
436 interface, property_name
437 );
438 continue;
439 }
440 trace!("Property `{interface}.{property_name}` updated");
441
442 let entry = values.entry(property_name.to_string()).or_default();
443
444 entry.value = Some(OwnedValue::from(value));
445 entry.event.notify(usize::MAX);
446 }
447 }
448
449 /// Wait for the cache to be populated and return any error encountered during population
450 pub(crate) async fn ready(&self) -> Result<()> {
451 let listener = match &*self.caching_result.read().expect("lock poisoned") {
452 CachingResult::Caching { ready } => ready.listen(),
453 CachingResult::Cached { result } => return result.clone(),
454 };
455 listener.await;
456
457 // It must be ready now.
458 match &*self.caching_result.read().expect("lock poisoned") {
459 // SAFETY: We were just notified that state has changed to `Cached` and we never go back
460 // to `Caching` once in `Cached`.
461 CachingResult::Caching { .. } => unreachable!(),
462 CachingResult::Cached { result } => result.clone(),
463 }
464 }
465}
466
467impl<'a> ProxyInner<'a> {
468 pub(crate) fn new(
469 conn: Connection,
470 destination: BusName<'a>,
471 path: ObjectPath<'a>,
472 interface: InterfaceName<'a>,
473 cache: CacheProperties,
474 uncached_properties: HashSet<Str<'a>>,
475 ) -> Self {
476 let property_cache = match cache {
477 CacheProperties::Yes | CacheProperties::Lazily => Some(OnceCell::new()),
478 CacheProperties::No => None,
479 };
480 Self {
481 inner_without_borrows: ProxyInnerStatic {
482 conn,
483 dest_owner_change_match_rule: OnceCell::new(),
484 },
485 destination,
486 path,
487 interface,
488 property_cache,
489 uncached_properties,
490 }
491 }
492
493 /// Subscribe to the "NameOwnerChanged" signal on the bus for our destination.
494 ///
495 /// If the destination is a unique name, we will not subscribe to the signal.
496 pub(crate) async fn subscribe_dest_owner_change(&self) -> Result<()> {
497 if !self.inner_without_borrows.conn.is_bus() {
498 // Names don't mean much outside the bus context.
499 return Ok(());
500 }
501
502 let well_known_name = match &self.destination {
503 BusName::WellKnown(well_known_name) => well_known_name,
504 BusName::Unique(_) => return Ok(()),
505 };
506
507 if self
508 .inner_without_borrows
509 .dest_owner_change_match_rule
510 .get()
511 .is_some()
512 {
513 // Already watching over the bus for any name updates so nothing to do here.
514 return Ok(());
515 }
516
517 let conn = &self.inner_without_borrows.conn;
518 let signal_rule: OwnedMatchRule = MatchRule::builder()
519 .msg_type(MessageType::Signal)
520 .sender("org.freedesktop.DBus")?
521 .path("/org/freedesktop/DBus")?
522 .interface("org.freedesktop.DBus")?
523 .member("NameOwnerChanged")?
524 .add_arg(well_known_name.as_str())?
525 .build()
526 .to_owned()
527 .into();
528
529 conn.add_match(
530 signal_rule.clone(),
531 Some(MAX_NAME_OWNER_CHANGED_SIGNALS_QUEUED),
532 )
533 .await?;
534
535 if self
536 .inner_without_borrows
537 .dest_owner_change_match_rule
538 .set(signal_rule.clone())
539 .is_err()
540 {
541 // we raced another destination_unique_name call and added it twice
542 conn.remove_match(signal_rule).await?;
543 }
544
545 Ok(())
546 }
547}
548
/// Queue size passed to `Connection::add_match` when subscribing to `NameOwnerChanged` signals
/// for a proxy's destination.
const MAX_NAME_OWNER_CHANGED_SIGNALS_QUEUED: usize = 8;
550
551impl<'a> Proxy<'a> {
552 /// Create a new `Proxy` for the given destination/path/interface.
553 pub async fn new<D, P, I>(
554 conn: &Connection,
555 destination: D,
556 path: P,
557 interface: I,
558 ) -> Result<Proxy<'a>>
559 where
560 D: TryInto<BusName<'a>>,
561 P: TryInto<ObjectPath<'a>>,
562 I: TryInto<InterfaceName<'a>>,
563 D::Error: Into<Error>,
564 P::Error: Into<Error>,
565 I::Error: Into<Error>,
566 {
567 ProxyBuilder::new_bare(conn)
568 .destination(destination)?
569 .path(path)?
570 .interface(interface)?
571 .build()
572 .await
573 }
574
575 /// Create a new `Proxy` for the given destination/path/interface, taking ownership of all
576 /// passed arguments.
577 pub async fn new_owned<D, P, I>(
578 conn: Connection,
579 destination: D,
580 path: P,
581 interface: I,
582 ) -> Result<Proxy<'a>>
583 where
584 D: TryInto<BusName<'static>>,
585 P: TryInto<ObjectPath<'static>>,
586 I: TryInto<InterfaceName<'static>>,
587 D::Error: Into<Error>,
588 P::Error: Into<Error>,
589 I::Error: Into<Error>,
590 {
591 ProxyBuilder::new_bare(&conn)
592 .destination(destination)?
593 .path(path)?
594 .interface(interface)?
595 .build()
596 .await
597 }
598
    /// Get a reference to the associated connection.
    ///
    /// This is the connection used for all calls made through this proxy.
    pub fn connection(&self) -> &Connection {
        &self.inner.inner_without_borrows.conn
    }

    /// Get a reference to the destination service name.
    pub fn destination(&self) -> &BusName<'_> {
        &self.inner.destination
    }

    /// Get a reference to the object path.
    pub fn path(&self) -> &ObjectPath<'_> {
        &self.inner.path
    }

    /// Get a reference to the interface.
    pub fn interface(&self) -> &InterfaceName<'_> {
        &self.inner.interface
    }
618
619 /// Introspect the associated object, and return the XML description.
620 ///
621 /// See the [xml](xml/index.html) or [quick_xml](quick_xml/index.html) module for parsing the
622 /// result.
623 pub async fn introspect(&self) -> fdo::Result<String> {
624 let proxy = IntrospectableProxy::builder(&self.inner.inner_without_borrows.conn)
625 .destination(&self.inner.destination)?
626 .path(&self.inner.path)?
627 .build()
628 .await?;
629
630 proxy.introspect().await
631 }
632
633 fn properties_proxy(&self) -> PropertiesProxy<'_> {
634 PropertiesProxy::builder(&self.inner.inner_without_borrows.conn)
635 // Safe because already checked earlier
636 .destination(self.inner.destination.as_ref())
637 .unwrap()
638 // Safe because already checked earlier
639 .path(self.inner.path.as_ref())
640 .unwrap()
641 // does not have properties
642 .cache_properties(CacheProperties::No)
643 .build_internal()
644 .unwrap()
645 .into()
646 }
647
648 fn owned_properties_proxy(&self) -> PropertiesProxy<'static> {
649 PropertiesProxy::builder(&self.inner.inner_without_borrows.conn)
650 // Safe because already checked earlier
651 .destination(self.inner.destination.to_owned())
652 .unwrap()
653 // Safe because already checked earlier
654 .path(self.inner.path.to_owned())
655 .unwrap()
656 // does not have properties
657 .cache_properties(CacheProperties::No)
658 .build_internal()
659 .unwrap()
660 .into()
661 }
662
663 /// Get the cache, starting it in the background if needed.
664 ///
665 /// Use PropertiesCache::ready() to wait for the cache to be populated and to get any errors
666 /// encountered in the population.
667 pub(crate) fn get_property_cache(&self) -> Option<&Arc<PropertiesCache>> {
668 let cache = match &self.inner.property_cache {
669 Some(cache) => cache,
670 None => return None,
671 };
672 let (cache, _) = &cache.get_or_init(|| {
673 let proxy = self.owned_properties_proxy();
674 let interface = self.interface().to_owned();
675 let uncached_properties: HashSet<zvariant::Str<'static>> = self
676 .inner
677 .uncached_properties
678 .iter()
679 .map(|s| s.to_owned())
680 .collect();
681 let executor = self.connection().executor();
682
683 PropertiesCache::new(proxy, interface, executor, uncached_properties)
684 });
685
686 Some(cache)
687 }
688
689 /// Get the cached value of the property `property_name`.
690 ///
691 /// This returns `None` if the property is not in the cache. This could be because the cache
692 /// was invalidated by an update, because caching was disabled for this property or proxy, or
693 /// because the cache has not yet been populated. Use `get_property` to fetch the value from
694 /// the peer.
695 pub fn cached_property<T>(&self, property_name: &str) -> Result<Option<T>>
696 where
697 T: TryFrom<OwnedValue>,
698 T::Error: Into<Error>,
699 {
700 self.cached_property_raw(property_name)
701 .as_deref()
702 .map(|v| T::try_from(OwnedValue::from(v)))
703 .transpose()
704 .map_err(Into::into)
705 }
706
707 /// Get the cached value of the property `property_name`.
708 ///
709 /// Same as `cached_property`, but gives you access to the raw value stored in the cache. This
710 /// is useful if you want to avoid allocations and cloning.
711 pub fn cached_property_raw<'p>(
712 &'p self,
713 property_name: &'p str,
714 ) -> Option<impl Deref<Target = Value<'static>> + 'p> {
715 if let Some(values) = self
716 .inner
717 .property_cache
718 .as_ref()
719 .and_then(OnceCell::get)
720 .map(|c| c.0.values.read().expect("lock poisoned"))
721 {
722 // ensure that the property is in the cache.
723 values
724 .get(property_name)
725 // if the property value has not yet been cached, this will return None.
726 .and_then(|e| e.value.as_ref())?;
727
728 struct Wrapper<'a> {
729 values: RwLockReadGuard<'a, HashMap<String, PropertyValue>>,
730 property_name: &'a str,
731 }
732
733 impl Deref for Wrapper<'_> {
734 type Target = Value<'static>;
735
736 fn deref(&self) -> &Self::Target {
737 self.values
738 .get(self.property_name)
739 .and_then(|e| e.value.as_ref())
740 .map(|v| v.deref())
741 .expect("inexistent property")
742 }
743 }
744
745 Some(Wrapper {
746 values,
747 property_name,
748 })
749 } else {
750 None
751 }
752 }
753
754 async fn get_proxy_property(&self, property_name: &str) -> Result<OwnedValue> {
755 Ok(self
756 .properties_proxy()
757 .get(self.inner.interface.as_ref(), property_name)
758 .await?)
759 }
760
761 /// Get the property `property_name`.
762 ///
763 /// Get the property value from the cache (if caching is enabled) or call the
764 /// `Get` method of the `org.freedesktop.DBus.Properties` interface.
765 pub async fn get_property<T>(&self, property_name: &str) -> Result<T>
766 where
767 T: TryFrom<OwnedValue>,
768 T::Error: Into<Error>,
769 {
770 if let Some(cache) = self.get_property_cache() {
771 cache.ready().await?;
772 }
773 if let Some(value) = self.cached_property(property_name)? {
774 return Ok(value);
775 }
776
777 let value = self.get_proxy_property(property_name).await?;
778 value.try_into().map_err(Into::into)
779 }
780
781 /// Set the property `property_name`.
782 ///
783 /// Effectively, call the `Set` method of the `org.freedesktop.DBus.Properties` interface.
784 pub async fn set_property<'t, T: 't>(&self, property_name: &str, value: T) -> fdo::Result<()>
785 where
786 T: Into<Value<'t>>,
787 {
788 self.properties_proxy()
789 .set(self.inner.interface.as_ref(), property_name, &value.into())
790 .await
791 }
792
793 /// Call a method and return the reply.
794 ///
795 /// Typically, you would want to use [`call`] method instead. Use this method if you need to
796 /// deserialize the reply message manually (this way, you can avoid the memory
797 /// allocation/copying, by deserializing the reply to an unowned type).
798 ///
799 /// [`call`]: struct.Proxy.html#method.call
800 pub async fn call_method<'m, M, B>(&self, method_name: M, body: &B) -> Result<Arc<Message>>
801 where
802 M: TryInto<MemberName<'m>>,
803 M::Error: Into<Error>,
804 B: serde::ser::Serialize + zvariant::DynamicType,
805 {
806 self.inner
807 .inner_without_borrows
808 .conn
809 .call_method(
810 Some(&self.inner.destination),
811 self.inner.path.as_str(),
812 Some(&self.inner.interface),
813 method_name,
814 body,
815 )
816 .await
817 }
818
819 /// Call a method and return the reply body.
820 ///
821 /// Use [`call_method`] instead if you need to deserialize the reply manually/separately.
822 ///
823 /// [`call_method`]: struct.Proxy.html#method.call_method
824 pub async fn call<'m, M, B, R>(&self, method_name: M, body: &B) -> Result<R>
825 where
826 M: TryInto<MemberName<'m>>,
827 M::Error: Into<Error>,
828 B: serde::ser::Serialize + zvariant::DynamicType,
829 R: serde::de::DeserializeOwned + zvariant::Type,
830 {
831 let reply = self.call_method(method_name, body).await?;
832
833 reply.body()
834 }
835
836 /// Call a method and return the reply body, optionally supplying a set of
837 /// method flags to control the way the method call message is sent and handled.
838 ///
839 /// Use [`call`] instead if you do not need any special handling via additional flags.
840 /// If the `NoReplyExpected` flag is passed , this will return None immediately
841 /// after sending the message, similar to [`call_noreply`]
842 ///
843 /// [`call`]: struct.Proxy.html#method.call
844 /// [`call_noreply`]: struct.Proxy.html#method.call_noreply
845 pub async fn call_with_flags<'m, M, B, R>(
846 &self,
847 method_name: M,
848 flags: BitFlags<MethodFlags>,
849 body: &B,
850 ) -> Result<Option<R>>
851 where
852 M: TryInto<MemberName<'m>>,
853 M::Error: Into<Error>,
854 B: serde::ser::Serialize + zvariant::DynamicType,
855 R: serde::de::DeserializeOwned + zvariant::Type,
856 {
857 let flags = flags
858 .iter()
859 .map(MessageFlags::from)
860 .collect::<BitFlags<_>>();
861 match self
862 .inner
863 .inner_without_borrows
864 .conn
865 .call_method_raw(
866 Some(self.destination()),
867 self.path(),
868 Some(self.interface()),
869 method_name,
870 flags,
871 body,
872 )
873 .await?
874 {
875 Some(reply) => reply.await?.body().map(Some),
876 None => Ok(None),
877 }
878 }
879
880 /// Call a method without expecting a reply
881 ///
882 /// This sets the `NoReplyExpected` flag on the calling message and does not wait for a reply.
883 pub async fn call_noreply<'m, M, B>(&self, method_name: M, body: &B) -> Result<()>
884 where
885 M: TryInto<MemberName<'m>>,
886 M::Error: Into<Error>,
887 B: serde::ser::Serialize + zvariant::DynamicType,
888 {
889 self.call_with_flags::<_, _, ()>(method_name, MethodFlags::NoReplyExpected.into(), body)
890 .await?;
891 Ok(())
892 }
893
    /// Create a stream for signal named `signal_name`.
    ///
    /// Equivalent to [`Proxy::receive_signal_with_args`] with no argument filter.
    pub async fn receive_signal<'m, M>(&self, signal_name: M) -> Result<SignalStream<'m>>
    where
        M: TryInto<MemberName<'m>>,
        M::Error: Into<Error>,
    {
        self.receive_signal_with_args(signal_name, &[]).await
    }
902
903 /// Same as [`Proxy::receive_signal`] but with a filter.
904 ///
905 /// The D-Bus specification allows you to filter signals by their arguments, which helps avoid
906 /// a lot of unnecessary traffic and processing since the filter is run on the server side. Use
907 /// this method where possible. Note that this filtering is limited to arguments of string
908 /// types.
909 ///
910 /// The arguments are passed as a tuples of argument index and expected value.
911 pub async fn receive_signal_with_args<'m, M>(
912 &self,
913 signal_name: M,
914 args: &[(u8, &str)],
915 ) -> Result<SignalStream<'m>>
916 where
917 M: TryInto<MemberName<'m>>,
918 M::Error: Into<Error>,
919 {
920 let signal_name = signal_name.try_into().map_err(Into::into)?;
921 self.receive_signals(Some(signal_name), args).await
922 }
923
    /// Shared implementation of the `receive_*signal*` methods.
    ///
    /// Subscribes to owner changes of the destination first, then creates the stream. A
    /// `signal_name` of `None` is what [`Proxy::receive_all_signals`] passes.
    async fn receive_signals<'m>(
        &self,
        signal_name: Option<MemberName<'m>>,
        args: &[(u8, &str)],
    ) -> Result<SignalStream<'m>> {
        self.inner.subscribe_dest_owner_change().await?;

        SignalStream::new(self.clone(), signal_name, args).await
    }
933
    /// Create a stream for all signals emitted by this service.
    ///
    /// No signal name and no argument filter are passed to the underlying stream.
    pub async fn receive_all_signals(&self) -> Result<SignalStream<'static>> {
        self.receive_signals(None, &[]).await
    }
938
939 /// Get a stream to receive property changed events.
940 ///
941 /// Note that zbus doesn't queue the updates. If the listener is slower than the receiver, it
942 /// will only receive the last update.
943 ///
944 /// If caching is not enabled on this proxy, the resulting stream will not return any events.
945 pub async fn receive_property_changed<'name: 'a, T>(
946 &self,
947 name: &'name str,
948 ) -> PropertyStream<'a, T> {
949 let properties = self.get_property_cache();
950 let changed_listener = if let Some(properties) = &properties {
951 let mut values = properties.values.write().expect("lock poisoned");
952 let entry = values
953 .entry(name.to_string())
954 .or_insert_with(PropertyValue::default);
955 entry.event.listen()
956 } else {
957 Event::new().listen()
958 };
959
960 PropertyStream {
961 name,
962 proxy: self.clone(),
963 changed_listener,
964 phantom: std::marker::PhantomData,
965 }
966 }
967
    /// Get a stream to receive destination owner changed events.
    ///
    /// If the proxy destination is a unique name, the stream will be notified of the peer
    /// disconnection from the bus (with a `None` value).
    ///
    /// If the proxy destination is a well-known name, the stream will be notified whenever the name
    /// owner is changed, either by a new peer being granted ownership (`Some` value) or when the
    /// name is released (with a `None` value).
    ///
    /// Note that zbus doesn't queue the updates. If the listener is slower than the receiver, it
    /// will only receive the last update.
    pub async fn receive_owner_changed(&self) -> Result<OwnerChangedStream<'_>> {
        use futures_util::StreamExt;
        // A non-caching `DBusProxy` on the same connection is used to receive the signals.
        let dbus_proxy = fdo::DBusProxy::builder(self.connection())
            .cache_properties(CacheProperties::No)
            .build()
            .await?;
        Ok(OwnerChangedStream {
            stream: dbus_proxy
                // Only signals whose first argument is our destination name are requested.
                .receive_name_owner_changed_with_args(&[(0, self.destination().as_str())])
                .await?
                .map(Box::new(move |signal| {
                    // NOTE(review): this `unwrap` panics if the signal arguments fail to parse —
                    // confirm that cannot happen for signals delivered here.
                    let args = signal.args().unwrap();
                    let new_owner = args.new_owner().as_ref().map(|owner| owner.to_owned());

                    new_owner
                })),
            name: self.destination().clone(),
        })
    }
998}
999
/// One entry of the property cache.
#[derive(Debug, Default)]
struct PropertyValue {
    /// Cached value; `None` when no value is currently cached, for instance after an
    /// invalidation.
    value: Option<OwnedValue>,
    /// Notified each time the cache updates or invalidates this entry.
    event: Event,
}
1005
/// Flags to use with [`Proxy::call_with_flags`].
///
/// Each flag is converted to the `MessageFlags` variant of the same name before the call is
/// made.
#[bitflags]
#[repr(u8)]
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum MethodFlags {
    /// No response is expected from this method call, regardless of whether the
    /// signature for the interface method indicates a reply type. When passed,
    /// `call_with_flags` will return `Ok(None)` immediately after successfully
    /// sending the method call.
    ///
    /// Errors encountered while *making* the call will still be returned as
    /// an `Err` variant, but any errors that are triggered by the receiver's
    /// handling of the call will not be delivered.
    NoReplyExpected = 0x1,

    /// When set on a call whose destination is a message bus, this flag will instruct
    /// the bus not to [launch][al] a service to handle the call if no application
    /// on the bus owns the requested name.
    ///
    /// This flag is ignored when using a peer-to-peer connection.
    ///
    /// [al]: https://dbus.freedesktop.org/doc/dbus-specification.html#message-bus-starting-services
    NoAutoStart = 0x2,

    /// Indicates to the receiver that this client is prepared to wait for interactive
    /// authorization, which might take a considerable time to complete. For example, the receiver
    /// may query the user for confirmation via [polkit] or a similar framework.
    ///
    /// [polkit]: https://gitlab.freedesktop.org/polkit/polkit/
    AllowInteractiveAuth = 0x4,
}

// Compile-time guarantee that the flags can be moved and shared across threads.
assert_impl_all!(MethodFlags: Send, Sync, Unpin);
1039
1040impl From<MethodFlags> for MessageFlags {
1041 fn from(method_flag: MethodFlags) -> Self {
1042 match method_flag {
1043 MethodFlags::NoReplyExpected => Self::NoReplyExpected,
1044 MethodFlags::NoAutoStart => Self::NoAutoStart,
1045 MethodFlags::AllowInteractiveAuth => Self::AllowInteractiveAuth,
1046 }
1047 }
1048}
1049
1050type OwnerChangedStreamMap<'a> = Map<
1051 fdo::NameOwnerChangedStream<'a>,
1052 Box<dyn FnMut(fdo::NameOwnerChanged) -> Option<UniqueName<'static>> + Send + Sync + Unpin>,
1053>;
1054
1055/// A [`stream::Stream`] implementation that yields `UniqueName` when the bus owner changes.
1056///
1057/// Use [`Proxy::receive_owner_changed`] to create an instance of this type.
1058pub struct OwnerChangedStream<'a> {
1059 stream: OwnerChangedStreamMap<'a>,
1060 name: BusName<'a>,
1061}
1062
1063assert_impl_all!(OwnerChangedStream<'_>: Send, Sync, Unpin);
1064
1065impl OwnerChangedStream<'_> {
1066 /// The bus name being tracked.
1067 pub fn name(&self) -> &BusName<'_> {
1068 &self.name
1069 }
1070}
1071
1072impl<'a> stream::Stream for OwnerChangedStream<'a> {
1073 type Item = Option<UniqueName<'static>>;
1074
1075 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1076 use futures_util::StreamExt;
1077 self.get_mut().stream.poll_next_unpin(cx)
1078 }
1079}
1080
1081/// A [`stream::Stream`] implementation that yields signal [messages](`Message`).
1082///
1083/// Use [`Proxy::receive_signal`] to create an instance of this type.
1084///
1085/// This type uses a [`MessageStream::for_match_rule`] internally and therefore the note about match
1086/// rule registration and [`AsyncDrop`] in its documentation applies here as well.
1087#[derive(Debug)]
1088pub struct SignalStream<'a> {
1089 stream: Join<MessageStream, Option<MessageStream>>,
1090 src_unique_name: Option<UniqueName<'static>>,
1091 signal_name: Option<MemberName<'a>>,
1092}
1093
1094impl<'a> SignalStream<'a> {
1095 /// The signal name.
1096 pub fn name(&self) -> Option<&MemberName<'a>> {
1097 self.signal_name.as_ref()
1098 }
1099
1100 async fn new(
1101 proxy: Proxy<'_>,
1102 signal_name: Option<MemberName<'a>>,
1103 args: &[(u8, &str)],
1104 ) -> Result<SignalStream<'a>> {
1105 let mut rule_builder = MatchRule::builder()
1106 .msg_type(MessageType::Signal)
1107 .sender(proxy.destination())?
1108 .path(proxy.path())?
1109 .interface(proxy.interface())?;
1110 if let Some(name) = &signal_name {
1111 rule_builder = rule_builder.member(name)?;
1112 }
1113 for (i, arg) in args {
1114 rule_builder = rule_builder.arg(*i, *arg)?;
1115 }
1116 let signal_rule: OwnedMatchRule = rule_builder.build().to_owned().into();
1117 let conn = proxy.connection();
1118
1119 let (src_unique_name, stream) = match proxy.destination().to_owned() {
1120 BusName::Unique(name) => (
1121 Some(name),
1122 join_streams(
1123 MessageStream::for_match_rule(signal_rule, conn, None).await?,
1124 None,
1125 ),
1126 ),
1127 BusName::WellKnown(name) => {
1128 use ordered_stream::OrderedStreamExt;
1129
1130 let name_owner_changed_rule = MatchRule::builder()
1131 .msg_type(MessageType::Signal)
1132 .sender("org.freedesktop.DBus")?
1133 .path("/org/freedesktop/DBus")?
1134 .interface("org.freedesktop.DBus")?
1135 .member("NameOwnerChanged")?
1136 .add_arg(name.as_str())?
1137 .build();
1138 let , Error>, Result, Error>>(Result, Error>) -> Either, Error>, Result, Error>>>" title="name_owner_changed_stream">name_owner_changed_stream = MessageStream::for_match_rule(
1139 name_owner_changed_rule,
1140 conn,
1141 Some(MAX_NAME_OWNER_CHANGED_SIGNALS_QUEUED),
1142 )
1143 .await?
1144 .map(Either::Left);
1145
1146 let , Error>, Result, Error>>(Result, Error>) -> Either, Error>, Result, Error>>>" title="get_name_owner">get_name_owner = conn
1147 .call_method_raw(
1148 Some("org.freedesktop.DBus"),
1149 "/org/freedesktop/DBus",
1150 Some("org.freedesktop.DBus"),
1151 "GetNameOwner",
1152 BitFlags::empty(),
1153 &name,
1154 )
1155 .await
1156 .map(|r| FromFuture::from(r.expect("no reply")).map(Either::Right))?;
1157
1158 let mut , Error>, Result, Error>>(Result, Error>) -> Either, Error>, Result, Error>>>, Map, extern "rust-call" Right, Error>, Result, Error>>(Result, Error>) -> Either, Error>, Result, Error>>>>" title="join">join = join_streams(name_owner_changed_stream, get_name_owner);
1159
1160 let mut src_unique_name = loop {
1161 match join.next().await {
1162 Some(Either::Left(Ok(msg))) => {
1163 let signal = NameOwnerChanged::from_message(msg)
1164 .expect("`NameOwnerChanged` signal stream got wrong message");
1165 {
1166 break signal
1167 .args()
1168 // SAFETY: The filtering code couldn't have let this through if
1169 // args were not in order.
1170 .expect("`NameOwnerChanged` signal has no args")
1171 .new_owner()
1172 .as_ref()
1173 .map(UniqueName::to_owned);
1174 }
1175 }
1176 Some(Either::Left(Err(_))) => (),
1177 Some(Either::Right(Ok(response))) => {
1178 break Some(response.body::<UniqueName<'_>>()?.to_owned())
1179 }
1180 Some(Either::Right(Err(e))) => {
1181 // Probably the name is not owned. Not a problem but let's still log it.
1182 debug!("Failed to get owner of {name}: {e}");
1183
1184 break None;
1185 }
1186 None => {
1187 return Err(Error::InputOutput(
1188 std::io::Error::new(
1189 std::io::ErrorKind::BrokenPipe,
1190 "connection closed",
1191 )
1192 .into(),
1193 ))
1194 }
1195 }
1196 };
1197
1198 // Let's take into account any buffered NameOwnerChanged signal.
1199 let (, Error>, Result, Error>>(Result, Error>) -> Either, Error>, Result, Error>>>" title="stream">stream, _, queued) = join.into_inner();
1200 if let Some(msg) = queued.and_then(|e| match e.0 {
1201 Either::Left(Ok(msg)) => Some(msg),
1202 Either::Left(Err(_)) | Either::Right(_) => None,
1203 }) {
1204 if let Some(signal) = NameOwnerChanged::from_message(msg) {
1205 if let Ok(args) = signal.args() {
1206 match (args.name(), args.new_owner().deref()) {
1207 (BusName::WellKnown(n), Some(new_owner)) if n == &name => {
1208 src_unique_name = Some(new_owner.to_owned());
1209 }
1210 _ => (),
1211 }
1212 }
1213 }
1214 }
1215 let name_owner_changed_stream = stream.into_inner();
1216
1217 let stream = join_streams(
1218 MessageStream::for_match_rule(signal_rule, conn, None).await?,
1219 Some(name_owner_changed_stream),
1220 );
1221
1222 (src_unique_name, stream)
1223 }
1224 };
1225
1226 Ok(SignalStream {
1227 stream,
1228 src_unique_name,
1229 signal_name,
1230 })
1231 }
1232
1233 fn filter(&mut self, msg: &Arc<Message>) -> Result<bool> {
1234 let header = msg.header()?;
1235 let sender = header.sender()?;
1236 if sender == self.src_unique_name.as_ref() {
1237 return Ok(true);
1238 }
1239
1240 // The src_unique_name must be maintained in lock-step with the applied filter
1241 if let Some(signal) = NameOwnerChanged::from_message(msg.clone()) {
1242 let args = signal.args()?;
1243 self.src_unique_name = args.new_owner().as_ref().map(|n| n.to_owned());
1244 }
1245
1246 Ok(false)
1247 }
1248}
1249
1250assert_impl_all!(SignalStream<'_>: Send, Sync, Unpin);
1251
1252impl<'a> stream::Stream for SignalStream<'a> {
1253 type Item = Arc<Message>;
1254
1255 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1256 OrderedStream::poll_next_before(self, cx, before:None).map(|res: PollResult| res.into_data())
1257 }
1258}
1259
1260impl<'a> OrderedStream for SignalStream<'a> {
1261 type Data = Arc<Message>;
1262 type Ordering = MessageSequence;
1263
1264 fn poll_next_before(
1265 self: Pin<&mut Self>,
1266 cx: &mut Context<'_>,
1267 before: Option<&Self::Ordering>,
1268 ) -> Poll<PollResult<Self::Ordering, Self::Data>> {
1269 let this = self.get_mut();
1270 loop {
1271 match ready!(OrderedStream::poll_next_before(
1272 Pin::new(&mut this.stream),
1273 cx,
1274 before
1275 )) {
1276 PollResult::Item { data, ordering } => {
1277 if let Ok(msg) = data {
1278 if let Ok(true) = this.filter(&msg) {
1279 return Poll::Ready(PollResult::Item {
1280 data: msg,
1281 ordering,
1282 });
1283 }
1284 }
1285 }
1286 PollResult::Terminated => return Poll::Ready(PollResult::Terminated),
1287 PollResult::NoneBefore => return Poll::Ready(PollResult::NoneBefore),
1288 }
1289 }
1290 }
1291}
1292
1293impl<'a> stream::FusedStream for SignalStream<'a> {
1294 fn is_terminated(&self) -> bool {
1295 ordered_stream::FusedOrderedStream::is_terminated(&self.stream)
1296 }
1297}
1298
1299#[async_trait::async_trait]
1300impl AsyncDrop for SignalStream<'_> {
1301 async fn async_drop(self) {
1302 let (signals: MessageStream, names: Option, _buffered: Option<(Result, …>, …)>) = self.stream.into_inner();
1303 signals.async_drop().await;
1304 if let Some(names: MessageStream) = names {
1305 names.async_drop().await;
1306 }
1307 }
1308}
1309
1310impl<'a> From<crate::blocking::Proxy<'a>> for Proxy<'a> {
1311 fn from(proxy: crate::blocking::Proxy<'a>) -> Self {
1312 proxy.into_inner()
1313 }
1314}
1315
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{dbus_interface, dbus_proxy, utils::block_on, ConnectionBuilder, SignalContext};
    use futures_util::StreamExt;
    use ntest::timeout;
    use test_log::test;

    #[test]
    #[timeout(15000)]
    fn signal() {
        block_on(test_signal()).unwrap();
    }

    /// Checks owner-change and `NameAcquired` notifications around acquiring a well-known name
    /// and then dropping the owning connection.
    async fn test_signal() -> Result<()> {
        // Two session connections: one observes, the other will own the well-known name.
        let observer_conn = Connection::session().await?;
        let owner_conn = Connection::session().await?;
        let owner_unique_name = owner_conn.unique_name().unwrap().clone();

        let well_known = "org.freedesktop.zbus.async.ProxySignalStreamTest";
        let well_known_proxy: Proxy<'_> = ProxyBuilder::new_bare(&observer_conn)
            .destination(well_known)?
            .path("/does/not/matter")?
            .interface("does.not.matter")?
            .build()
            .await?;
        let mut well_known_owner_changes = well_known_proxy.receive_owner_changed().await?;

        let bus_proxy = fdo::DBusProxy::new(&owner_conn).await?;
        let mut name_acquired = bus_proxy
            .receive_signal_with_args("NameAcquired", &[(0, well_known)])
            .await?;

        // Create a property stream and drop the proxy before the stream.
        let prop_stream = bus_proxy
            .receive_property_changed("SomeProp")
            .await
            .filter_map(|changed| async move {
                let v: Option<u32> = changed.get().await.ok();
                dbg!(v)
            });
        drop(bus_proxy);
        drop(prop_stream);

        owner_conn.request_name(well_known).await?;

        let (first_owner, acquired_msg) =
            futures_util::join!(well_known_owner_changes.next(), name_acquired.next(),);

        // The new owner of the well-known name must be the requesting connection.
        assert_eq!(&first_owner.unwrap().unwrap(), &*owner_unique_name);

        let acquired_msg = acquired_msg.unwrap();
        assert_eq!(acquired_msg.body::<&str>().unwrap(), well_known);

        let unique_proxy = Proxy::new(
            &observer_conn,
            &owner_unique_name,
            "/does/not/matter",
            "does.not.matter",
        )
        .await?;
        let mut unique_owner_changes = unique_proxy.receive_owner_changed().await?;

        drop(owner_conn);
        name_acquired.async_drop().await;

        // With the owning connection gone, the well-known name has no owner anymore...
        let after_drop = well_known_owner_changes.next().await;
        assert!(after_drop.unwrap().is_none());

        // ...and neither has the unique name.
        let unique_after_drop = unique_owner_changes.next().await;
        assert!(unique_after_drop.unwrap().is_none());

        Ok(())
    }

    #[test]
    #[timeout(15000)]
    fn signal_stream_deadlock() {
        block_on(test_signal_stream_deadlock()).unwrap();
    }

    /// Tests deadlocking in signal reception when the message queue is full.
    ///
    /// Creates a connection with a small message queue, and a service that
    /// emits signals at a high rate. First a listener is created that listens
    /// for that signal, which should fill the small queue. Then another signal
    /// listener is created against another signal. Previously, this second
    /// call to add the match rule never resolved and resulted in a deadlock.
    async fn test_signal_stream_deadlock() -> Result<()> {
        #[dbus_proxy(
            gen_blocking = false,
            default_path = "/org/zbus/Test",
            default_service = "org.zbus.Test.MR501",
            interface = "org.zbus.Test"
        )]
        trait Test {
            #[dbus_proxy(signal)]
            fn my_signal(&self, msg: &str) -> Result<()>;
        }

        struct TestIface;

        #[dbus_interface(name = "org.zbus.Test")]
        impl TestIface {
            #[dbus_interface(signal)]
            async fn my_signal(context: &SignalContext<'_>, msg: &'static str) -> Result<()>;
        }

        let server_conn = ConnectionBuilder::session()?
            .name("org.zbus.Test.MR501")?
            .serve_at("/org/zbus/Test", TestIface)?
            .build()
            .await?;

        // The client only queues a single message.
        let client_conn = ConnectionBuilder::session()?.max_queued(1).build().await?;

        let test_proxy = TestProxy::new(&client_conn).await?;
        let test_prop_proxy = PropertiesProxy::builder(&client_conn)
            .destination("org.zbus.Test.MR501")?
            .path("/org/zbus/Test")?
            .build()
            .await?;

        let (started_tx, mut started_rx) = tokio::sync::mpsc::channel(1);

        // Server task: emit bursts of signals until the receiving half of the channel is gone.
        let emitter_task = {
            let sender = started_tx.clone();
            let conn = server_conn.clone();
            let server_fut = async move {
                use std::time::Duration;

                #[cfg(not(feature = "tokio"))]
                use async_io::Timer;

                #[cfg(feature = "tokio")]
                use tokio::time::sleep;

                let iface_ref = conn
                    .object_server()
                    .interface::<_, TestIface>("/org/zbus/Test")
                    .await
                    .unwrap();

                let context = iface_ref.signal_context();
                while !sender.is_closed() {
                    for _ in 0..10 {
                        TestIface::my_signal(context, "This is a test")
                            .await
                            .unwrap();
                    }

                    #[cfg(not(feature = "tokio"))]
                    Timer::after(Duration::from_millis(5)).await;

                    #[cfg(feature = "tokio")]
                    sleep(Duration::from_millis(5)).await;
                }
            };
            server_conn.executor().spawn(server_fut, "server_task")
        };

        // First listener: subscribes to the signal, announces it, then drains the stream.
        let signal_fut = async {
            let mut my_signals = test_proxy.receive_my_signal().await.unwrap();

            started_tx.send(()).await.unwrap();

            while let Some(_signal) = my_signals.next().await {}
        };

        // Second listener: once the first is set up, registers another match rule.
        let prop_fut = async move {
            started_rx.recv().await.unwrap();
            let _prop_stream = test_prop_proxy.receive_properties_changed().await.unwrap();
        };

        futures_util::pin_mut!(signal_fut);
        futures_util::pin_mut!(prop_fut);

        futures_util::future::select(signal_fut, prop_fut).await;

        emitter_task.await;

        Ok(())
    }
}
1498