1 | use enumflags2::{bitflags , BitFlags}; |
2 | use event_listener::{Event, EventListener}; |
3 | use futures_core::{ready, stream}; |
4 | use futures_util::{future::Either, stream::Map}; |
5 | use once_cell::sync::OnceCell; |
6 | use ordered_stream::{join as join_streams, FromFuture, Join, OrderedStream, PollResult}; |
7 | use static_assertions::assert_impl_all; |
8 | use std::{ |
9 | collections::{HashMap, HashSet}, |
10 | convert::{TryFrom, TryInto}, |
11 | future::Future, |
12 | ops::Deref, |
13 | pin::Pin, |
14 | sync::{Arc, RwLock, RwLockReadGuard}, |
15 | task::{Context, Poll}, |
16 | }; |
17 | use tracing::{debug, info_span, instrument, trace, Instrument}; |
18 | |
19 | use zbus_names::{BusName, InterfaceName, MemberName, UniqueName}; |
20 | use zvariant::{ObjectPath, OwnedValue, Str, Value}; |
21 | |
22 | use crate::{ |
23 | fdo::{self, IntrospectableProxy, NameOwnerChanged, PropertiesChangedStream, PropertiesProxy}, |
24 | AsyncDrop, CacheProperties, Connection, Error, Executor, MatchRule, Message, MessageFlags, |
25 | MessageSequence, MessageStream, MessageType, OwnedMatchRule, ProxyBuilder, Result, Task, |
26 | }; |
27 | |
/// A client-side interface proxy.
///
/// A `Proxy` is a helper to interact with an interface on a remote object.
///
/// All state lives behind an [`Arc`], so cloning a `Proxy` only bumps a reference count and every
/// clone shares the same connection and property cache.
///
/// # Example
///
/// ```
/// use std::result::Result;
/// use std::error::Error;
/// use zbus::{Connection, Proxy};
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn Error>> {
///     let connection = Connection::session().await?;
///     let p = Proxy::new(
///         &connection,
///         "org.freedesktop.DBus",
///         "/org/freedesktop/DBus",
///         "org.freedesktop.DBus",
///     ).await?;
///     // owned return value
///     let _id: String = p.call("GetId", &()).await?;
///     // borrowed return value
///     let _id: &str = p.call_method("GetId", &()).await?.body()?;
///
///     Ok(())
/// }
/// ```
///
/// # Note
///
/// It is recommended to use the [`dbus_proxy`] macro, which provides a more convenient and
/// type-safe *façade* `Proxy` derived from a Rust trait.
///
/// [`futures` crate]: https://crates.io/crates/futures
/// [`dbus_proxy`]: attr.dbus_proxy.html
#[derive(Clone, Debug)]
pub struct Proxy<'a> {
    // Shared, reference-counted proxy state (connection, addressing and property cache).
    pub(crate) inner: Arc<ProxyInner<'a>>,
}

// Compile-time guarantee that a `Proxy` can be moved and shared across threads.
assert_impl_all!(Proxy<'_>: Send, Sync, Unpin);
70 | |
/// This is required to avoid having the Drop impl extend the lifetime 'a, which breaks zbus_xmlgen
/// (and possibly other crates).
///
/// Holds the lifetime-free part of the proxy state: the connection and, once registered, the
/// match rule used to watch for owner changes of the destination name.
#[derive(derivative::Derivative)]
#[derivative(Debug)]
pub(crate) struct ProxyInnerStatic {
    // Left out of the `Debug` output.
    #[derivative(Debug = "ignore")]
    pub(crate) conn: Connection,
    // Set at most once, by `ProxyInner::subscribe_dest_owner_change`; the `Drop` impl takes it
    // back out to queue the removal of the rule.
    dest_owner_change_match_rule: OnceCell<OwnedMatchRule>,
}
80 | |
/// The state shared by all clones of a [`Proxy`].
#[derive(Debug)]
pub(crate) struct ProxyInner<'a> {
    // Lifetime-free state, kept in its own type so that its `Drop` impl does not involve `'a`.
    inner_without_borrows: ProxyInnerStatic,
    // Bus name of the peer this proxy talks to.
    pub(crate) destination: BusName<'a>,
    // Path of the remote object.
    pub(crate) path: ObjectPath<'a>,
    // Interface on the remote object that this proxy targets.
    pub(crate) interface: InterfaceName<'a>,

    /// Cache of property values.
    ///
    /// `None` when caching is disabled for this proxy. Otherwise the cell is filled on first use
    /// with the cache and the handle of the task that populates and updates it.
    property_cache: Option<OnceCell<(Arc<PropertiesCache>, Task<()>)>>,
    /// Set of properties which do not get cached, by name.
    /// This overrides proxy-level caching behavior.
    uncached_properties: HashSet<Str<'a>>,
}
94 | |
95 | impl Drop for ProxyInnerStatic { |
96 | fn drop(&mut self) { |
97 | if let Some(rule: OwnedMatchRule) = self.dest_owner_change_match_rule.take() { |
98 | self.conn.queue_remove_match(rule); |
99 | } |
100 | } |
101 | } |
102 | |
/// A property changed event.
///
/// The property changed event generated by [`PropertyStream`].
///
/// `T` is the type the property value is converted to by [`PropertyChanged::get`].
pub struct PropertyChanged<'a, T> {
    // Name of the property this event is about.
    name: &'a str,
    // The cache the property value is read from (and written back to after a fetch).
    properties: Arc<PropertiesCache>,
    // Proxy used to fetch the value from the peer when the cache holds none.
    proxy: Proxy<'a>,
    // Carries `T` without storing a value of that type.
    phantom: std::marker::PhantomData<T>,
}
112 | |
113 | impl<'a, T> PropertyChanged<'a, T> { |
114 | // The name of the property that changed. |
115 | pub fn name(&self) -> &str { |
116 | self.name |
117 | } |
118 | |
119 | // Get the raw value of the property that changed. |
120 | // |
121 | // If the notification signal contained the new value, it has been cached already and this call |
122 | // will return that value. Otherwise (i-e invalidated property), a D-Bus call is made to fetch |
123 | // and cache the new value. |
124 | pub async fn get_raw<'p>(&'p self) -> Result<impl Deref<Target = Value<'static>> + 'p> { |
125 | struct Wrapper<'w> { |
126 | name: &'w str, |
127 | values: RwLockReadGuard<'w, HashMap<String, PropertyValue>>, |
128 | } |
129 | |
130 | impl<'w> Deref for Wrapper<'w> { |
131 | type Target = Value<'static>; |
132 | |
133 | fn deref(&self) -> &Self::Target { |
134 | self.values |
135 | .get(self.name) |
136 | .expect("PropertyStream with no corresponding property" ) |
137 | .value |
138 | .as_ref() |
139 | .expect("PropertyStream with no corresponding property" ) |
140 | } |
141 | } |
142 | |
143 | { |
144 | let values = self.properties.values.read().expect("lock poisoned" ); |
145 | if values |
146 | .get(self.name) |
147 | .expect("PropertyStream with no corresponding property" ) |
148 | .value |
149 | .is_some() |
150 | { |
151 | return Ok(Wrapper { |
152 | name: self.name, |
153 | values, |
154 | }); |
155 | } |
156 | } |
157 | |
158 | // The property was invalidated, so we need to fetch the new value. |
159 | let properties_proxy = self.proxy.properties_proxy(); |
160 | let value = properties_proxy |
161 | .get(self.proxy.inner.interface.clone(), self.name) |
162 | .await |
163 | .map_err(crate::Error::from)?; |
164 | |
165 | // Save the new value |
166 | { |
167 | let mut values = self.properties.values.write().expect("lock poisoned" ); |
168 | |
169 | values |
170 | .get_mut(self.name) |
171 | .expect("PropertyStream with no corresponding property" ) |
172 | .value = Some(value); |
173 | } |
174 | |
175 | Ok(Wrapper { |
176 | name: self.name, |
177 | values: self.properties.values.read().expect("lock poisoned" ), |
178 | }) |
179 | } |
180 | } |
181 | |
182 | impl<T> PropertyChanged<'_, T> |
183 | where |
184 | T: TryFrom<zvariant::OwnedValue>, |
185 | T::Error: Into<crate::Error>, |
186 | { |
187 | // Get the value of the property that changed. |
188 | // |
189 | // If the notification signal contained the new value, it has been cached already and this call |
190 | // will return that value. Otherwise (i-e invalidated property), a D-Bus call is made to fetch |
191 | // and cache the new value. |
192 | pub async fn get(&self) -> Result<T> { |
193 | self.get_raw() |
194 | .await |
195 | .and_then(|v: impl Deref>| T::try_from(OwnedValue::from(&*v)).map_err(op:Into::into)) |
196 | } |
197 | } |
198 | |
/// A [`stream::Stream`] implementation that yields property change notifications.
///
/// Use [`Proxy::receive_property_changed`] to create an instance of this type.
#[derive(derivative::Derivative)]
#[derivative(Debug)]
pub struct PropertyStream<'a, T> {
    // Name of the property being watched.
    name: &'a str,
    // Proxy whose property cache is observed.
    proxy: Proxy<'a>,
    // Listener for the next change of the property; replaced after every notification.
    changed_listener: EventListener,
    // Carries `T` without storing a value of that type.
    phantom: std::marker::PhantomData<T>,
}
210 | |
211 | impl<'a, T> stream::Stream for PropertyStream<'a, T> |
212 | where |
213 | T: Unpin, |
214 | { |
215 | type Item = PropertyChanged<'a, T>; |
216 | |
217 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
218 | let m = self.get_mut(); |
219 | let properties = match m.proxy.get_property_cache() { |
220 | Some(properties) => properties.clone(), |
221 | // With no cache, we will get no updates; return immediately |
222 | None => return Poll::Ready(None), |
223 | }; |
224 | ready!(Pin::new(&mut m.changed_listener).poll(cx)); |
225 | |
226 | m.changed_listener = properties |
227 | .values |
228 | .read() |
229 | .expect("lock poisoned" ) |
230 | .get(m.name) |
231 | .expect("PropertyStream with no corresponding property" ) |
232 | .event |
233 | .listen(); |
234 | |
235 | Poll::Ready(Some(PropertyChanged { |
236 | name: m.name, |
237 | properties, |
238 | proxy: m.proxy.clone(), |
239 | phantom: std::marker::PhantomData, |
240 | })) |
241 | } |
242 | } |
243 | |
/// Cache of the property values of one interface, shared between a proxy and its caching task.
#[derive(Debug)]
pub(crate) struct PropertiesCache {
    // Cached entries, keyed by property name.
    values: RwLock<HashMap<String, PropertyValue>>,
    // Whether the initial population is still in progress, or how it ended.
    caching_result: RwLock<CachingResult>,
}
249 | |
/// State of the initial population of a [`PropertiesCache`].
#[derive(Debug)]
enum CachingResult {
    /// Population is still running; `ready` is notified once it has finished.
    Caching { ready: Event },
    /// Population has finished; `result` holds its outcome.
    Cached { result: Result<()> },
}
255 | |
256 | impl PropertiesCache { |
257 | #[instrument (skip_all)] |
258 | fn new( |
259 | proxy: PropertiesProxy<'static>, |
260 | interface: InterfaceName<'static>, |
261 | executor: &Executor<'_>, |
262 | uncached_properties: HashSet<zvariant::Str<'static>>, |
263 | ) -> (Arc<Self>, Task<()>) { |
264 | let cache = Arc::new(PropertiesCache { |
265 | values: Default::default(), |
266 | caching_result: RwLock::new(CachingResult::Caching { |
267 | ready: Event::new(), |
268 | }), |
269 | }); |
270 | |
271 | let cache_clone = cache.clone(); |
272 | let task_name = format!("{interface} proxy caching" ); |
273 | let proxy_caching = async move { |
274 | let result = cache_clone |
275 | .init(proxy, interface, uncached_properties) |
276 | .await; |
277 | let (prop_changes, interface, uncached_properties) = { |
278 | let mut caching_result = cache_clone.caching_result.write().expect("lock poisoned" ); |
279 | let ready = match &*caching_result { |
280 | CachingResult::Caching { ready } => ready, |
281 | // SAFETY: This is the only part of the code that changes this state and it's |
282 | // only run once. |
283 | _ => unreachable!(), |
284 | }; |
285 | match result { |
286 | Ok((prop_changes, interface, uncached_properties)) => { |
287 | ready.notify(usize::MAX); |
288 | *caching_result = CachingResult::Cached { result: Ok(()) }; |
289 | |
290 | (prop_changes, interface, uncached_properties) |
291 | } |
292 | Err(e) => { |
293 | ready.notify(usize::MAX); |
294 | *caching_result = CachingResult::Cached { result: Err(e) }; |
295 | |
296 | return; |
297 | } |
298 | } |
299 | }; |
300 | |
301 | if let Err(e) = cache_clone |
302 | .keep_updated(prop_changes, interface, uncached_properties) |
303 | .await |
304 | { |
305 | debug!("Error keeping properties cache updated: {e}" ); |
306 | } |
307 | } |
308 | .instrument(info_span!("{}" , task_name)); |
309 | let task = executor.spawn(proxy_caching, &task_name); |
310 | |
311 | (cache, task) |
312 | } |
313 | |
// new() runs this in a task it spawns for initialization of properties cache.
//
// Subscribes to property changes, issues a `GetAll` call for `interface` and fills the cache from
// its reply. On success, returns the change stream together with `interface` and
// `uncached_properties` so that the caller can pass them on to `keep_updated`.
async fn init(
    &self,
    proxy: PropertiesProxy<'static>,
    interface: InterfaceName<'static>,
    uncached_properties: HashSet<zvariant::Str<'static>>,
) -> Result<(
    PropertiesChangedStream<'static>,
    InterfaceName<'static>,
    HashSet<zvariant::Str<'static>>,
)> {
    use ordered_stream::OrderedStreamExt;

    // The subscription is set up before the `GetAll` call is sent.
    let prop_changes = proxy.receive_properties_changed().await?.map(Either::Left);

    let get_all = proxy
        .connection()
        .call_method_raw(
            Some(proxy.destination()),
            proxy.path(),
            Some(proxy.interface()),
            "GetAll",
            BitFlags::empty(),
            &interface,
        )
        .await
        // No flags are passed, so a reply handle is expected here.
        .map(|r| FromFuture::from(r.expect("no reply")).map(Either::Right))?;

    // Both sources are merged into a single stream; `Left` items are change signals and the
    // `Right` item is the `GetAll` reply.
    let mut join = join_streams(prop_changes, get_all);

    loop {
        match join.next().await {
            Some(Either::Left(_update)) => {
                // discard updates prior to the initial population
            }
            Some(Either::Right(populate)) => {
                // Fill the cache from the reply; a failed call or an undecodable body aborts
                // the initialization.
                populate?.body().map(|values| {
                    self.update_cache(&uncached_properties, &values, Vec::new(), &interface);
                })?;
                break;
            }
            None => break,
        }
    }
    if let Some((Either::Left(update), _)) = Pin::new(&mut join).take_buffered() {
        // if an update was buffered, then it happened after the get_all returned and needs to
        // be applied before we discard the join
        if let Ok(args) = update.args() {
            if args.interface_name == interface {
                self.update_cache(
                    &uncached_properties,
                    &args.changed_properties,
                    args.invalidated_properties,
                    &interface,
                );
            }
        }
    }
    // This is needed to avoid a "implementation of `OrderedStream` is not general enough"
    // error that occurs if you apply the map and join to Pin::new(&mut prop_changes) instead
    // of directly to the stream.
    let prop_changes = join.into_inner().0.into_inner();

    Ok((prop_changes, interface, uncached_properties))
}
379 | |
380 | // new() runs this in a task it spawns for keeping the cache in sync. |
381 | #[instrument (skip_all)] |
382 | async fn keep_updated( |
383 | &self, |
384 | mut prop_changes: PropertiesChangedStream<'static>, |
385 | interface: InterfaceName<'static>, |
386 | uncached_properties: HashSet<zvariant::Str<'static>>, |
387 | ) -> Result<()> { |
388 | use futures_util::StreamExt; |
389 | |
390 | trace!("Listening for property changes on {interface}..." ); |
391 | while let Some(update) = prop_changes.next().await { |
392 | if let Ok(args) = update.args() { |
393 | if args.interface_name == interface { |
394 | self.update_cache( |
395 | &uncached_properties, |
396 | &args.changed_properties, |
397 | args.invalidated_properties, |
398 | &interface, |
399 | ); |
400 | } |
401 | } |
402 | } |
403 | |
404 | Ok(()) |
405 | } |
406 | |
407 | fn update_cache( |
408 | &self, |
409 | uncached_properties: &HashSet<Str<'_>>, |
410 | changed: &HashMap<&str, Value<'_>>, |
411 | invalidated: Vec<&str>, |
412 | interface: &InterfaceName<'_>, |
413 | ) { |
414 | let mut values = self.values.write().expect("lock poisoned" ); |
415 | |
416 | for inval in invalidated { |
417 | if uncached_properties.contains(&Str::from(inval)) { |
418 | debug!( |
419 | "Ignoring invalidation of uncached property ` {}. {}`" , |
420 | interface, inval |
421 | ); |
422 | continue; |
423 | } |
424 | trace!("Property ` {interface}. {inval}` invalidated" ); |
425 | |
426 | if let Some(entry) = values.get_mut(inval) { |
427 | entry.value = None; |
428 | entry.event.notify(usize::MAX); |
429 | } |
430 | } |
431 | |
432 | for (property_name, value) in changed { |
433 | if uncached_properties.contains(&Str::from(*property_name)) { |
434 | debug!( |
435 | "Ignoring update of uncached property ` {}. {}`" , |
436 | interface, property_name |
437 | ); |
438 | continue; |
439 | } |
440 | trace!("Property ` {interface}. {property_name}` updated" ); |
441 | |
442 | let entry = values.entry(property_name.to_string()).or_default(); |
443 | |
444 | entry.value = Some(OwnedValue::from(value)); |
445 | entry.event.notify(usize::MAX); |
446 | } |
447 | } |
448 | |
449 | /// Wait for the cache to be populated and return any error encountered during population |
450 | pub(crate) async fn ready(&self) -> Result<()> { |
451 | let listener = match &*self.caching_result.read().expect("lock poisoned" ) { |
452 | CachingResult::Caching { ready } => ready.listen(), |
453 | CachingResult::Cached { result } => return result.clone(), |
454 | }; |
455 | listener.await; |
456 | |
457 | // It must be ready now. |
458 | match &*self.caching_result.read().expect("lock poisoned" ) { |
459 | // SAFETY: We were just notified that state has changed to `Cached` and we never go back |
460 | // to `Caching` once in `Cached`. |
461 | CachingResult::Caching { .. } => unreachable!(), |
462 | CachingResult::Cached { result } => result.clone(), |
463 | } |
464 | } |
465 | } |
466 | |
467 | impl<'a> ProxyInner<'a> { |
468 | pub(crate) fn new( |
469 | conn: Connection, |
470 | destination: BusName<'a>, |
471 | path: ObjectPath<'a>, |
472 | interface: InterfaceName<'a>, |
473 | cache: CacheProperties, |
474 | uncached_properties: HashSet<Str<'a>>, |
475 | ) -> Self { |
476 | let property_cache = match cache { |
477 | CacheProperties::Yes | CacheProperties::Lazily => Some(OnceCell::new()), |
478 | CacheProperties::No => None, |
479 | }; |
480 | Self { |
481 | inner_without_borrows: ProxyInnerStatic { |
482 | conn, |
483 | dest_owner_change_match_rule: OnceCell::new(), |
484 | }, |
485 | destination, |
486 | path, |
487 | interface, |
488 | property_cache, |
489 | uncached_properties, |
490 | } |
491 | } |
492 | |
493 | /// Subscribe to the "NameOwnerChanged" signal on the bus for our destination. |
494 | /// |
495 | /// If the destination is a unique name, we will not subscribe to the signal. |
496 | pub(crate) async fn subscribe_dest_owner_change(&self) -> Result<()> { |
497 | if !self.inner_without_borrows.conn.is_bus() { |
498 | // Names don't mean much outside the bus context. |
499 | return Ok(()); |
500 | } |
501 | |
502 | let well_known_name = match &self.destination { |
503 | BusName::WellKnown(well_known_name) => well_known_name, |
504 | BusName::Unique(_) => return Ok(()), |
505 | }; |
506 | |
507 | if self |
508 | .inner_without_borrows |
509 | .dest_owner_change_match_rule |
510 | .get() |
511 | .is_some() |
512 | { |
513 | // Already watching over the bus for any name updates so nothing to do here. |
514 | return Ok(()); |
515 | } |
516 | |
517 | let conn = &self.inner_without_borrows.conn; |
518 | let signal_rule: OwnedMatchRule = MatchRule::builder() |
519 | .msg_type(MessageType::Signal) |
520 | .sender("org.freedesktop.DBus" )? |
521 | .path("/org/freedesktop/DBus" )? |
522 | .interface("org.freedesktop.DBus" )? |
523 | .member("NameOwnerChanged" )? |
524 | .add_arg(well_known_name.as_str())? |
525 | .build() |
526 | .to_owned() |
527 | .into(); |
528 | |
529 | conn.add_match( |
530 | signal_rule.clone(), |
531 | Some(MAX_NAME_OWNER_CHANGED_SIGNALS_QUEUED), |
532 | ) |
533 | .await?; |
534 | |
535 | if self |
536 | .inner_without_borrows |
537 | .dest_owner_change_match_rule |
538 | .set(signal_rule.clone()) |
539 | .is_err() |
540 | { |
541 | // we raced another destination_unique_name call and added it twice |
542 | conn.remove_match(signal_rule).await?; |
543 | } |
544 | |
545 | Ok(()) |
546 | } |
547 | } |
548 | |
// Passed to `Connection::add_match` together with the `NameOwnerChanged` match rule.
// NOTE(review): judging by the name, this caps the number of queued signals — confirm against
// `Connection::add_match`.
const MAX_NAME_OWNER_CHANGED_SIGNALS_QUEUED: usize = 8;
550 | |
551 | impl<'a> Proxy<'a> { |
552 | /// Create a new `Proxy` for the given destination/path/interface. |
553 | pub async fn new<D, P, I>( |
554 | conn: &Connection, |
555 | destination: D, |
556 | path: P, |
557 | interface: I, |
558 | ) -> Result<Proxy<'a>> |
559 | where |
560 | D: TryInto<BusName<'a>>, |
561 | P: TryInto<ObjectPath<'a>>, |
562 | I: TryInto<InterfaceName<'a>>, |
563 | D::Error: Into<Error>, |
564 | P::Error: Into<Error>, |
565 | I::Error: Into<Error>, |
566 | { |
567 | ProxyBuilder::new_bare(conn) |
568 | .destination(destination)? |
569 | .path(path)? |
570 | .interface(interface)? |
571 | .build() |
572 | .await |
573 | } |
574 | |
575 | /// Create a new `Proxy` for the given destination/path/interface, taking ownership of all |
576 | /// passed arguments. |
577 | pub async fn new_owned<D, P, I>( |
578 | conn: Connection, |
579 | destination: D, |
580 | path: P, |
581 | interface: I, |
582 | ) -> Result<Proxy<'a>> |
583 | where |
584 | D: TryInto<BusName<'static>>, |
585 | P: TryInto<ObjectPath<'static>>, |
586 | I: TryInto<InterfaceName<'static>>, |
587 | D::Error: Into<Error>, |
588 | P::Error: Into<Error>, |
589 | I::Error: Into<Error>, |
590 | { |
591 | ProxyBuilder::new_bare(&conn) |
592 | .destination(destination)? |
593 | .path(path)? |
594 | .interface(interface)? |
595 | .build() |
596 | .await |
597 | } |
598 | |
599 | /// Get a reference to the associated connection. |
600 | pub fn connection(&self) -> &Connection { |
601 | &self.inner.inner_without_borrows.conn |
602 | } |
603 | |
604 | /// Get a reference to the destination service name. |
605 | pub fn destination(&self) -> &BusName<'_> { |
606 | &self.inner.destination |
607 | } |
608 | |
609 | /// Get a reference to the object path. |
610 | pub fn path(&self) -> &ObjectPath<'_> { |
611 | &self.inner.path |
612 | } |
613 | |
614 | /// Get a reference to the interface. |
615 | pub fn interface(&self) -> &InterfaceName<'_> { |
616 | &self.inner.interface |
617 | } |
618 | |
619 | /// Introspect the associated object, and return the XML description. |
620 | /// |
621 | /// See the [xml](xml/index.html) or [quick_xml](quick_xml/index.html) module for parsing the |
622 | /// result. |
623 | pub async fn introspect(&self) -> fdo::Result<String> { |
624 | let proxy = IntrospectableProxy::builder(&self.inner.inner_without_borrows.conn) |
625 | .destination(&self.inner.destination)? |
626 | .path(&self.inner.path)? |
627 | .build() |
628 | .await?; |
629 | |
630 | proxy.introspect().await |
631 | } |
632 | |
633 | fn properties_proxy(&self) -> PropertiesProxy<'_> { |
634 | PropertiesProxy::builder(&self.inner.inner_without_borrows.conn) |
635 | // Safe because already checked earlier |
636 | .destination(self.inner.destination.as_ref()) |
637 | .unwrap() |
638 | // Safe because already checked earlier |
639 | .path(self.inner.path.as_ref()) |
640 | .unwrap() |
641 | // does not have properties |
642 | .cache_properties(CacheProperties::No) |
643 | .build_internal() |
644 | .unwrap() |
645 | .into() |
646 | } |
647 | |
648 | fn owned_properties_proxy(&self) -> PropertiesProxy<'static> { |
649 | PropertiesProxy::builder(&self.inner.inner_without_borrows.conn) |
650 | // Safe because already checked earlier |
651 | .destination(self.inner.destination.to_owned()) |
652 | .unwrap() |
653 | // Safe because already checked earlier |
654 | .path(self.inner.path.to_owned()) |
655 | .unwrap() |
656 | // does not have properties |
657 | .cache_properties(CacheProperties::No) |
658 | .build_internal() |
659 | .unwrap() |
660 | .into() |
661 | } |
662 | |
663 | /// Get the cache, starting it in the background if needed. |
664 | /// |
665 | /// Use PropertiesCache::ready() to wait for the cache to be populated and to get any errors |
666 | /// encountered in the population. |
667 | pub(crate) fn get_property_cache(&self) -> Option<&Arc<PropertiesCache>> { |
668 | let cache = match &self.inner.property_cache { |
669 | Some(cache) => cache, |
670 | None => return None, |
671 | }; |
672 | let (cache, _) = &cache.get_or_init(|| { |
673 | let proxy = self.owned_properties_proxy(); |
674 | let interface = self.interface().to_owned(); |
675 | let uncached_properties: HashSet<zvariant::Str<'static>> = self |
676 | .inner |
677 | .uncached_properties |
678 | .iter() |
679 | .map(|s| s.to_owned()) |
680 | .collect(); |
681 | let executor = self.connection().executor(); |
682 | |
683 | PropertiesCache::new(proxy, interface, executor, uncached_properties) |
684 | }); |
685 | |
686 | Some(cache) |
687 | } |
688 | |
689 | /// Get the cached value of the property `property_name`. |
690 | /// |
691 | /// This returns `None` if the property is not in the cache. This could be because the cache |
692 | /// was invalidated by an update, because caching was disabled for this property or proxy, or |
693 | /// because the cache has not yet been populated. Use `get_property` to fetch the value from |
694 | /// the peer. |
695 | pub fn cached_property<T>(&self, property_name: &str) -> Result<Option<T>> |
696 | where |
697 | T: TryFrom<OwnedValue>, |
698 | T::Error: Into<Error>, |
699 | { |
700 | self.cached_property_raw(property_name) |
701 | .as_deref() |
702 | .map(|v| T::try_from(OwnedValue::from(v))) |
703 | .transpose() |
704 | .map_err(Into::into) |
705 | } |
706 | |
707 | /// Get the cached value of the property `property_name`. |
708 | /// |
709 | /// Same as `cached_property`, but gives you access to the raw value stored in the cache. This |
710 | /// is useful if you want to avoid allocations and cloning. |
711 | pub fn cached_property_raw<'p>( |
712 | &'p self, |
713 | property_name: &'p str, |
714 | ) -> Option<impl Deref<Target = Value<'static>> + 'p> { |
715 | if let Some(values) = self |
716 | .inner |
717 | .property_cache |
718 | .as_ref() |
719 | .and_then(OnceCell::get) |
720 | .map(|c| c.0.values.read().expect("lock poisoned" )) |
721 | { |
722 | // ensure that the property is in the cache. |
723 | values |
724 | .get(property_name) |
725 | // if the property value has not yet been cached, this will return None. |
726 | .and_then(|e| e.value.as_ref())?; |
727 | |
728 | struct Wrapper<'a> { |
729 | values: RwLockReadGuard<'a, HashMap<String, PropertyValue>>, |
730 | property_name: &'a str, |
731 | } |
732 | |
733 | impl Deref for Wrapper<'_> { |
734 | type Target = Value<'static>; |
735 | |
736 | fn deref(&self) -> &Self::Target { |
737 | self.values |
738 | .get(self.property_name) |
739 | .and_then(|e| e.value.as_ref()) |
740 | .map(|v| v.deref()) |
741 | .expect("inexistent property" ) |
742 | } |
743 | } |
744 | |
745 | Some(Wrapper { |
746 | values, |
747 | property_name, |
748 | }) |
749 | } else { |
750 | None |
751 | } |
752 | } |
753 | |
754 | async fn get_proxy_property(&self, property_name: &str) -> Result<OwnedValue> { |
755 | Ok(self |
756 | .properties_proxy() |
757 | .get(self.inner.interface.as_ref(), property_name) |
758 | .await?) |
759 | } |
760 | |
761 | /// Get the property `property_name`. |
762 | /// |
763 | /// Get the property value from the cache (if caching is enabled) or call the |
764 | /// `Get` method of the `org.freedesktop.DBus.Properties` interface. |
765 | pub async fn get_property<T>(&self, property_name: &str) -> Result<T> |
766 | where |
767 | T: TryFrom<OwnedValue>, |
768 | T::Error: Into<Error>, |
769 | { |
770 | if let Some(cache) = self.get_property_cache() { |
771 | cache.ready().await?; |
772 | } |
773 | if let Some(value) = self.cached_property(property_name)? { |
774 | return Ok(value); |
775 | } |
776 | |
777 | let value = self.get_proxy_property(property_name).await?; |
778 | value.try_into().map_err(Into::into) |
779 | } |
780 | |
781 | /// Set the property `property_name`. |
782 | /// |
783 | /// Effectively, call the `Set` method of the `org.freedesktop.DBus.Properties` interface. |
784 | pub async fn set_property<'t, T: 't>(&self, property_name: &str, value: T) -> fdo::Result<()> |
785 | where |
786 | T: Into<Value<'t>>, |
787 | { |
788 | self.properties_proxy() |
789 | .set(self.inner.interface.as_ref(), property_name, &value.into()) |
790 | .await |
791 | } |
792 | |
793 | /// Call a method and return the reply. |
794 | /// |
795 | /// Typically, you would want to use [`call`] method instead. Use this method if you need to |
796 | /// deserialize the reply message manually (this way, you can avoid the memory |
797 | /// allocation/copying, by deserializing the reply to an unowned type). |
798 | /// |
799 | /// [`call`]: struct.Proxy.html#method.call |
800 | pub async fn call_method<'m, M, B>(&self, method_name: M, body: &B) -> Result<Arc<Message>> |
801 | where |
802 | M: TryInto<MemberName<'m>>, |
803 | M::Error: Into<Error>, |
804 | B: serde::ser::Serialize + zvariant::DynamicType, |
805 | { |
806 | self.inner |
807 | .inner_without_borrows |
808 | .conn |
809 | .call_method( |
810 | Some(&self.inner.destination), |
811 | self.inner.path.as_str(), |
812 | Some(&self.inner.interface), |
813 | method_name, |
814 | body, |
815 | ) |
816 | .await |
817 | } |
818 | |
819 | /// Call a method and return the reply body. |
820 | /// |
821 | /// Use [`call_method`] instead if you need to deserialize the reply manually/separately. |
822 | /// |
823 | /// [`call_method`]: struct.Proxy.html#method.call_method |
824 | pub async fn call<'m, M, B, R>(&self, method_name: M, body: &B) -> Result<R> |
825 | where |
826 | M: TryInto<MemberName<'m>>, |
827 | M::Error: Into<Error>, |
828 | B: serde::ser::Serialize + zvariant::DynamicType, |
829 | R: serde::de::DeserializeOwned + zvariant::Type, |
830 | { |
831 | let reply = self.call_method(method_name, body).await?; |
832 | |
833 | reply.body() |
834 | } |
835 | |
836 | /// Call a method and return the reply body, optionally supplying a set of |
837 | /// method flags to control the way the method call message is sent and handled. |
838 | /// |
839 | /// Use [`call`] instead if you do not need any special handling via additional flags. |
840 | /// If the `NoReplyExpected` flag is passed , this will return None immediately |
841 | /// after sending the message, similar to [`call_noreply`] |
842 | /// |
843 | /// [`call`]: struct.Proxy.html#method.call |
844 | /// [`call_noreply`]: struct.Proxy.html#method.call_noreply |
845 | pub async fn call_with_flags<'m, M, B, R>( |
846 | &self, |
847 | method_name: M, |
848 | flags: BitFlags<MethodFlags>, |
849 | body: &B, |
850 | ) -> Result<Option<R>> |
851 | where |
852 | M: TryInto<MemberName<'m>>, |
853 | M::Error: Into<Error>, |
854 | B: serde::ser::Serialize + zvariant::DynamicType, |
855 | R: serde::de::DeserializeOwned + zvariant::Type, |
856 | { |
857 | let flags = flags |
858 | .iter() |
859 | .map(MessageFlags::from) |
860 | .collect::<BitFlags<_>>(); |
861 | match self |
862 | .inner |
863 | .inner_without_borrows |
864 | .conn |
865 | .call_method_raw( |
866 | Some(self.destination()), |
867 | self.path(), |
868 | Some(self.interface()), |
869 | method_name, |
870 | flags, |
871 | body, |
872 | ) |
873 | .await? |
874 | { |
875 | Some(reply) => reply.await?.body().map(Some), |
876 | None => Ok(None), |
877 | } |
878 | } |
879 | |
880 | /// Call a method without expecting a reply |
881 | /// |
882 | /// This sets the `NoReplyExpected` flag on the calling message and does not wait for a reply. |
883 | pub async fn call_noreply<'m, M, B>(&self, method_name: M, body: &B) -> Result<()> |
884 | where |
885 | M: TryInto<MemberName<'m>>, |
886 | M::Error: Into<Error>, |
887 | B: serde::ser::Serialize + zvariant::DynamicType, |
888 | { |
889 | self.call_with_flags::<_, _, ()>(method_name, MethodFlags::NoReplyExpected.into(), body) |
890 | .await?; |
891 | Ok(()) |
892 | } |
893 | |
894 | /// Create a stream for signal named `signal_name`. |
895 | pub async fn receive_signal<'m, M>(&self, signal_name: M) -> Result<SignalStream<'m>> |
896 | where |
897 | M: TryInto<MemberName<'m>>, |
898 | M::Error: Into<Error>, |
899 | { |
900 | self.receive_signal_with_args(signal_name, &[]).await |
901 | } |
902 | |
903 | /// Same as [`Proxy::receive_signal`] but with a filter. |
904 | /// |
905 | /// The D-Bus specification allows you to filter signals by their arguments, which helps avoid |
906 | /// a lot of unnecessary traffic and processing since the filter is run on the server side. Use |
907 | /// this method where possible. Note that this filtering is limited to arguments of string |
908 | /// types. |
909 | /// |
910 | /// The arguments are passed as a tuples of argument index and expected value. |
911 | pub async fn receive_signal_with_args<'m, M>( |
912 | &self, |
913 | signal_name: M, |
914 | args: &[(u8, &str)], |
915 | ) -> Result<SignalStream<'m>> |
916 | where |
917 | M: TryInto<MemberName<'m>>, |
918 | M::Error: Into<Error>, |
919 | { |
920 | let signal_name = signal_name.try_into().map_err(Into::into)?; |
921 | self.receive_signals(Some(signal_name), args).await |
922 | } |
923 | |
924 | async fn receive_signals<'m>( |
925 | &self, |
926 | signal_name: Option<MemberName<'m>>, |
927 | args: &[(u8, &str)], |
928 | ) -> Result<SignalStream<'m>> { |
929 | self.inner.subscribe_dest_owner_change().await?; |
930 | |
931 | SignalStream::new(self.clone(), signal_name, args).await |
932 | } |
933 | |
934 | /// Create a stream for all signals emitted by this service. |
935 | pub async fn receive_all_signals(&self) -> Result<SignalStream<'static>> { |
936 | self.receive_signals(None, &[]).await |
937 | } |
938 | |
939 | /// Get a stream to receive property changed events. |
940 | /// |
941 | /// Note that zbus doesn't queue the updates. If the listener is slower than the receiver, it |
942 | /// will only receive the last update. |
943 | /// |
944 | /// If caching is not enabled on this proxy, the resulting stream will not return any events. |
945 | pub async fn receive_property_changed<'name: 'a, T>( |
946 | &self, |
947 | name: &'name str, |
948 | ) -> PropertyStream<'a, T> { |
949 | let properties = self.get_property_cache(); |
950 | let changed_listener = if let Some(properties) = &properties { |
951 | let mut values = properties.values.write().expect("lock poisoned" ); |
952 | let entry = values |
953 | .entry(name.to_string()) |
954 | .or_insert_with(PropertyValue::default); |
955 | entry.event.listen() |
956 | } else { |
957 | Event::new().listen() |
958 | }; |
959 | |
960 | PropertyStream { |
961 | name, |
962 | proxy: self.clone(), |
963 | changed_listener, |
964 | phantom: std::marker::PhantomData, |
965 | } |
966 | } |
967 | |
968 | /// Get a stream to receive destination owner changed events. |
969 | /// |
970 | /// If the proxy destination is a unique name, the stream will be notified of the peer |
971 | /// disconnection from the bus (with a `None` value). |
972 | /// |
973 | /// If the proxy destination is a well-known name, the stream will be notified whenever the name |
974 | /// owner is changed, either by a new peer being granted ownership (`Some` value) or when the |
975 | /// name is released (with a `None` value). |
976 | /// |
977 | /// Note that zbus doesn't queue the updates. If the listener is slower than the receiver, it |
978 | /// will only receive the last update. |
979 | pub async fn receive_owner_changed(&self) -> Result<OwnerChangedStream<'_>> { |
980 | use futures_util::StreamExt; |
981 | let dbus_proxy = fdo::DBusProxy::builder(self.connection()) |
982 | .cache_properties(CacheProperties::No) |
983 | .build() |
984 | .await?; |
985 | Ok(OwnerChangedStream { |
986 | stream: dbus_proxy |
987 | .receive_name_owner_changed_with_args(&[(0, self.destination().as_str())]) |
988 | .await? |
989 | .map(Box::new(move |signal| { |
990 | let args = signal.args().unwrap(); |
991 | let new_owner = args.new_owner().as_ref().map(|owner| owner.to_owned()); |
992 | |
993 | new_owner |
994 | })), |
995 | name: self.destination().clone(), |
996 | }) |
997 | } |
998 | } |
999 | |
/// A single entry of the proxy's property cache.
///
/// `Proxy::receive_property_changed` creates a default entry on demand, so that a change
/// listener can be handed out for a property that has no entry yet.
#[derive (Debug, Default)]
struct PropertyValue {
    /// Presumably the cached value of the property, `None` while no value is known; the code
    /// that fills it in is not part of this section of the file (TODO confirm).
    value: Option<OwnedValue>,
    /// Event from which `Proxy::receive_property_changed` obtains its change listeners.
    event: Event,
}
1005 | |
/// Flags to use with [`Proxy::call_with_flags`].
///
/// Every variant occupies its own bit of the `u8` representation. Each variant converts into the
/// [`MessageFlags`] variant of the same name.
#[bitflags ]
#[repr (u8)]
#[derive (Debug, Copy, Clone, PartialEq, Eq)]
pub enum MethodFlags {
    /// No response is expected from this method call, regardless of whether the
    /// signature for the interface method indicates a reply type. When passed,
    /// `call_with_flags` will return `Ok(None)` immediately after successfully
    /// sending the method call.
    ///
    /// Errors encountered while *making* the call will still be returned as
    /// an `Err` variant, but any errors that are triggered by the receiver's
    /// handling of the call will not be delivered.
    NoReplyExpected = 0x1,

    /// When set on a call whose destination is a message bus, this flag will instruct
    /// the bus not to [launch][al] a service to handle the call if no application
    /// on the bus owns the requested name.
    ///
    /// This flag is ignored when using a peer-to-peer connection.
    ///
    /// [al]: https://dbus.freedesktop.org/doc/dbus-specification.html#message-bus-starting-services
    NoAutoStart = 0x2,

    /// Indicates to the receiver that this client is prepared to wait for interactive
    /// authorization, which might take a considerable time to complete. For example, the receiver
    /// may query the user for confirmation via [polkit] or a similar framework.
    ///
    /// [polkit]: https://gitlab.freedesktop.org/polkit/polkit/
    AllowInteractiveAuth = 0x4,
}

// Compile-time check that the flags type is `Send`, `Sync` and `Unpin`.
assert_impl_all!(MethodFlags: Send, Sync, Unpin);
1039 | |
1040 | impl From<MethodFlags> for MessageFlags { |
1041 | fn from(method_flag: MethodFlags) -> Self { |
1042 | match method_flag { |
1043 | MethodFlags::NoReplyExpected => Self::NoReplyExpected, |
1044 | MethodFlags::NoAutoStart => Self::NoAutoStart, |
1045 | MethodFlags::AllowInteractiveAuth => Self::AllowInteractiveAuth, |
1046 | } |
1047 | } |
1048 | } |
1049 | |
// The stream wrapped by `OwnerChangedStream`: a `NameOwnerChanged` signal stream whose items are
// passed through a boxed closure that turns each signal into an optional unique name.
type OwnerChangedStreamMap<'a> = Map<
    fdo::NameOwnerChangedStream<'a>,
    Box<dyn FnMut(fdo::NameOwnerChanged) -> Option<UniqueName<'static>> + Send + Sync + Unpin>,
>;
1054 | |
/// A [`stream::Stream`] implementation that yields the new owner of the tracked bus name, as an
/// `Option<UniqueName>`, when the bus owner changes.
///
/// Use [`Proxy::receive_owner_changed`] to create an instance of this type.
pub struct OwnerChangedStream<'a> {
    /// `NameOwnerChanged` signals, already reduced to the new owner they carry.
    stream: OwnerChangedStreamMap<'a>,
    /// The bus name being tracked; exposed through [`OwnerChangedStream::name`].
    name: BusName<'a>,
}

// Compile-time check that the stream type is `Send`, `Sync` and `Unpin`.
assert_impl_all!(OwnerChangedStream<'_>: Send, Sync, Unpin);
1064 | |
1065 | impl OwnerChangedStream<'_> { |
1066 | /// The bus name being tracked. |
1067 | pub fn name(&self) -> &BusName<'_> { |
1068 | &self.name |
1069 | } |
1070 | } |
1071 | |
1072 | impl<'a> stream::Stream for OwnerChangedStream<'a> { |
1073 | type Item = Option<UniqueName<'static>>; |
1074 | |
1075 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
1076 | use futures_util::StreamExt; |
1077 | self.get_mut().stream.poll_next_unpin(cx) |
1078 | } |
1079 | } |
1080 | |
/// A [`stream::Stream`] implementation that yields signal [messages](`Message`).
///
/// Use [`Proxy::receive_signal`] to create an instance of this type.
///
/// This type uses a [`MessageStream::for_match_rule`] internally and therefore the note about match
/// rule registration and [`AsyncDrop`] in its documentation applies here as well.
#[derive (Debug)]
pub struct SignalStream<'a> {
    /// The signal stream joined with an optional second stream. The second stream carries the
    /// `NameOwnerChanged` signals of a well-known destination and is `None` for a unique-name
    /// destination.
    stream: Join<MessageStream, Option<MessageStream>>,
    /// Unique name a message's sender is compared against; kept up to date from
    /// `NameOwnerChanged` signals seen on the joined stream.
    src_unique_name: Option<UniqueName<'static>>,
    /// The member-name filter the stream was created with, if any.
    signal_name: Option<MemberName<'a>>,
}
1093 | |
1094 | impl<'a> SignalStream<'a> { |
1095 | /// The signal name. |
1096 | pub fn name(&self) -> Option<&MemberName<'a>> { |
1097 | self.signal_name.as_ref() |
1098 | } |
1099 | |
1100 | async fn new( |
1101 | proxy: Proxy<'_>, |
1102 | signal_name: Option<MemberName<'a>>, |
1103 | args: &[(u8, &str)], |
1104 | ) -> Result<SignalStream<'a>> { |
1105 | let mut rule_builder = MatchRule::builder() |
1106 | .msg_type(MessageType::Signal) |
1107 | .sender(proxy.destination())? |
1108 | .path(proxy.path())? |
1109 | .interface(proxy.interface())?; |
1110 | if let Some(name) = &signal_name { |
1111 | rule_builder = rule_builder.member(name)?; |
1112 | } |
1113 | for (i, arg) in args { |
1114 | rule_builder = rule_builder.arg(*i, *arg)?; |
1115 | } |
1116 | let signal_rule: OwnedMatchRule = rule_builder.build().to_owned().into(); |
1117 | let conn = proxy.connection(); |
1118 | |
1119 | let (src_unique_name, stream) = match proxy.destination().to_owned() { |
1120 | BusName::Unique(name) => ( |
1121 | Some(name), |
1122 | join_streams( |
1123 | MessageStream::for_match_rule(signal_rule, conn, None).await?, |
1124 | None, |
1125 | ), |
1126 | ), |
1127 | BusName::WellKnown(name) => { |
1128 | use ordered_stream::OrderedStreamExt; |
1129 | |
1130 | let name_owner_changed_rule = MatchRule::builder() |
1131 | .msg_type(MessageType::Signal) |
1132 | .sender("org.freedesktop.DBus" )? |
1133 | .path("/org/freedesktop/DBus" )? |
1134 | .interface("org.freedesktop.DBus" )? |
1135 | .member("NameOwnerChanged" )? |
1136 | .add_arg(name.as_str())? |
1137 | .build(); |
1138 | let , Error>, Result, Error>>(Result, Error>) -> Either, Error>, Result, Error>>>" title="name_owner_changed_stream">name_owner_changed_stream = MessageStream::for_match_rule( |
1139 | name_owner_changed_rule, |
1140 | conn, |
1141 | Some(MAX_NAME_OWNER_CHANGED_SIGNALS_QUEUED), |
1142 | ) |
1143 | .await? |
1144 | .map(Either::Left); |
1145 | |
1146 | let , Error>, Result, Error>>(Result, Error>) -> Either, Error>, Result, Error>>>" title="get_name_owner">get_name_owner = conn |
1147 | .call_method_raw( |
1148 | Some("org.freedesktop.DBus" ), |
1149 | "/org/freedesktop/DBus" , |
1150 | Some("org.freedesktop.DBus" ), |
1151 | "GetNameOwner" , |
1152 | BitFlags::empty(), |
1153 | &name, |
1154 | ) |
1155 | .await |
1156 | .map(|r| FromFuture::from(r.expect("no reply" )).map(Either::Right))?; |
1157 | |
1158 | let mut , Error>, Result, Error>>(Result, Error>) -> Either, Error>, Result, Error>>>, Map, extern "rust-call" Right, Error>, Result, Error>>(Result, Error>) -> Either, Error>, Result, Error>>>>" title="join">join = join_streams(name_owner_changed_stream, get_name_owner); |
1159 | |
1160 | let mut src_unique_name = loop { |
1161 | match join.next().await { |
1162 | Some(Either::Left(Ok(msg))) => { |
1163 | let signal = NameOwnerChanged::from_message(msg) |
1164 | .expect("`NameOwnerChanged` signal stream got wrong message" ); |
1165 | { |
1166 | break signal |
1167 | .args() |
1168 | // SAFETY: The filtering code couldn't have let this through if |
1169 | // args were not in order. |
1170 | .expect("`NameOwnerChanged` signal has no args" ) |
1171 | .new_owner() |
1172 | .as_ref() |
1173 | .map(UniqueName::to_owned); |
1174 | } |
1175 | } |
1176 | Some(Either::Left(Err(_))) => (), |
1177 | Some(Either::Right(Ok(response))) => { |
1178 | break Some(response.body::<UniqueName<'_>>()?.to_owned()) |
1179 | } |
1180 | Some(Either::Right(Err(e))) => { |
1181 | // Probably the name is not owned. Not a problem but let's still log it. |
1182 | debug!("Failed to get owner of {name}: {e}" ); |
1183 | |
1184 | break None; |
1185 | } |
1186 | None => { |
1187 | return Err(Error::InputOutput( |
1188 | std::io::Error::new( |
1189 | std::io::ErrorKind::BrokenPipe, |
1190 | "connection closed" , |
1191 | ) |
1192 | .into(), |
1193 | )) |
1194 | } |
1195 | } |
1196 | }; |
1197 | |
1198 | // Let's take into account any buffered NameOwnerChanged signal. |
1199 | let (, Error>, Result, Error>>(Result, Error>) -> Either, Error>, Result, Error>>>" title="stream">stream, _, queued) = join.into_inner(); |
1200 | if let Some(msg) = queued.and_then(|e| match e.0 { |
1201 | Either::Left(Ok(msg)) => Some(msg), |
1202 | Either::Left(Err(_)) | Either::Right(_) => None, |
1203 | }) { |
1204 | if let Some(signal) = NameOwnerChanged::from_message(msg) { |
1205 | if let Ok(args) = signal.args() { |
1206 | match (args.name(), args.new_owner().deref()) { |
1207 | (BusName::WellKnown(n), Some(new_owner)) if n == &name => { |
1208 | src_unique_name = Some(new_owner.to_owned()); |
1209 | } |
1210 | _ => (), |
1211 | } |
1212 | } |
1213 | } |
1214 | } |
1215 | let name_owner_changed_stream = stream.into_inner(); |
1216 | |
1217 | let stream = join_streams( |
1218 | MessageStream::for_match_rule(signal_rule, conn, None).await?, |
1219 | Some(name_owner_changed_stream), |
1220 | ); |
1221 | |
1222 | (src_unique_name, stream) |
1223 | } |
1224 | }; |
1225 | |
1226 | Ok(SignalStream { |
1227 | stream, |
1228 | src_unique_name, |
1229 | signal_name, |
1230 | }) |
1231 | } |
1232 | |
1233 | fn filter(&mut self, msg: &Arc<Message>) -> Result<bool> { |
1234 | let header = msg.header()?; |
1235 | let sender = header.sender()?; |
1236 | if sender == self.src_unique_name.as_ref() { |
1237 | return Ok(true); |
1238 | } |
1239 | |
1240 | // The src_unique_name must be maintained in lock-step with the applied filter |
1241 | if let Some(signal) = NameOwnerChanged::from_message(msg.clone()) { |
1242 | let args = signal.args()?; |
1243 | self.src_unique_name = args.new_owner().as_ref().map(|n| n.to_owned()); |
1244 | } |
1245 | |
1246 | Ok(false) |
1247 | } |
1248 | } |
1249 | |
1250 | assert_impl_all!(SignalStream<'_>: Send, Sync, Unpin); |
1251 | |
1252 | impl<'a> stream::Stream for SignalStream<'a> { |
1253 | type Item = Arc<Message>; |
1254 | |
1255 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
1256 | OrderedStream::poll_next_before(self, cx, before:None).map(|res: PollResult| res.into_data()) |
1257 | } |
1258 | } |
1259 | |
1260 | impl<'a> OrderedStream for SignalStream<'a> { |
1261 | type Data = Arc<Message>; |
1262 | type Ordering = MessageSequence; |
1263 | |
1264 | fn poll_next_before( |
1265 | self: Pin<&mut Self>, |
1266 | cx: &mut Context<'_>, |
1267 | before: Option<&Self::Ordering>, |
1268 | ) -> Poll<PollResult<Self::Ordering, Self::Data>> { |
1269 | let this = self.get_mut(); |
1270 | loop { |
1271 | match ready!(OrderedStream::poll_next_before( |
1272 | Pin::new(&mut this.stream), |
1273 | cx, |
1274 | before |
1275 | )) { |
1276 | PollResult::Item { data, ordering } => { |
1277 | if let Ok(msg) = data { |
1278 | if let Ok(true) = this.filter(&msg) { |
1279 | return Poll::Ready(PollResult::Item { |
1280 | data: msg, |
1281 | ordering, |
1282 | }); |
1283 | } |
1284 | } |
1285 | } |
1286 | PollResult::Terminated => return Poll::Ready(PollResult::Terminated), |
1287 | PollResult::NoneBefore => return Poll::Ready(PollResult::NoneBefore), |
1288 | } |
1289 | } |
1290 | } |
1291 | } |
1292 | |
1293 | impl<'a> stream::FusedStream for SignalStream<'a> { |
1294 | fn is_terminated(&self) -> bool { |
1295 | ordered_stream::FusedOrderedStream::is_terminated(&self.stream) |
1296 | } |
1297 | } |
1298 | |
1299 | #[async_trait::async_trait ] |
1300 | impl AsyncDrop for SignalStream<'_> { |
1301 | async fn async_drop(self) { |
1302 | let (signals: MessageStream, names: Option, _buffered: Option<(Result, …>, …)>) = self.stream.into_inner(); |
1303 | signals.async_drop().await; |
1304 | if let Some(names: MessageStream) = names { |
1305 | names.async_drop().await; |
1306 | } |
1307 | } |
1308 | } |
1309 | |
1310 | impl<'a> From<crate::blocking::Proxy<'a>> for Proxy<'a> { |
1311 | fn from(proxy: crate::blocking::Proxy<'a>) -> Self { |
1312 | proxy.into_inner() |
1313 | } |
1314 | } |
1315 | |
1316 | #[cfg (test)] |
1317 | mod tests { |
1318 | use super::*; |
1319 | use crate::{dbus_interface , dbus_proxy , utils::block_on, ConnectionBuilder, SignalContext}; |
1320 | use futures_util::StreamExt; |
1321 | use ntest::timeout; |
1322 | use test_log::test; |
1323 | |
/// Drives the async `test_signal` scenario to completion with `block_on`, under the
/// `#[timeout(15000)]` attribute.
#[test]
#[timeout(15000)]
fn signal() {
    let outcome = block_on(test_signal());
    outcome.unwrap();
}
1329 | |
1330 | async fn test_signal() -> Result<()> { |
1331 | // Register a well-known name with the session bus and ensure we get the appropriate |
1332 | // signals called for that. |
1333 | let conn = Connection::session().await?; |
1334 | let dest_conn = Connection::session().await?; |
1335 | let unique_name = dest_conn.unique_name().unwrap().clone(); |
1336 | |
1337 | let well_known = "org.freedesktop.zbus.async.ProxySignalStreamTest" ; |
1338 | let proxy: Proxy<'_> = ProxyBuilder::new_bare(&conn) |
1339 | .destination(well_known)? |
1340 | .path("/does/not/matter" )? |
1341 | .interface("does.not.matter" )? |
1342 | .build() |
1343 | .await?; |
1344 | let mut owner_changed_stream = proxy.receive_owner_changed().await?; |
1345 | |
1346 | let proxy = fdo::DBusProxy::new(&dest_conn).await?; |
1347 | let mut name_acquired_stream = proxy |
1348 | .receive_signal_with_args("NameAcquired" , &[(0, well_known)]) |
1349 | .await?; |
1350 | |
1351 | let prop_stream = |
1352 | proxy |
1353 | .receive_property_changed("SomeProp" ) |
1354 | .await |
1355 | .filter_map(|changed| async move { |
1356 | let v: Option<u32> = changed.get().await.ok(); |
1357 | dbg!(v) |
1358 | }); |
1359 | drop(proxy); |
1360 | drop(prop_stream); |
1361 | |
1362 | dest_conn.request_name(well_known).await?; |
1363 | |
1364 | let (new_owner, acquired_signal) = |
1365 | futures_util::join!(owner_changed_stream.next(), name_acquired_stream.next(),); |
1366 | |
1367 | assert_eq!(&new_owner.unwrap().unwrap(), &*unique_name); |
1368 | |
1369 | let acquired_signal = acquired_signal.unwrap(); |
1370 | assert_eq!(acquired_signal.body::<&str>().unwrap(), well_known); |
1371 | |
1372 | let proxy = Proxy::new(&conn, &unique_name, "/does/not/matter" , "does.not.matter" ).await?; |
1373 | let mut unique_name_changed_stream = proxy.receive_owner_changed().await?; |
1374 | |
1375 | drop(dest_conn); |
1376 | name_acquired_stream.async_drop().await; |
1377 | |
1378 | // There shouldn't be an owner anymore. |
1379 | let new_owner = owner_changed_stream.next().await; |
1380 | assert!(new_owner.unwrap().is_none()); |
1381 | |
1382 | let new_unique_owner = unique_name_changed_stream.next().await; |
1383 | assert!(new_unique_owner.unwrap().is_none()); |
1384 | |
1385 | Ok(()) |
1386 | } |
1387 | |
/// Drives the async `test_signal_stream_deadlock` scenario to completion with `block_on`, under
/// the `#[timeout(15000)]` attribute.
#[test]
#[timeout(15000)]
fn signal_stream_deadlock() {
    let outcome = block_on(test_signal_stream_deadlock());
    outcome.unwrap();
}
1393 | |
/// Tests deadlocking in signal reception when the message queue is full.
///
/// Creates a connection with a small message queue, and a service that
/// emits signals at a high rate. First a listener is created that listens
/// for that signal which should fill the small queue. Then another
/// signal listener is created against another signal. Previously, this second
/// call to add the match rule never resolved and resulted in a deadlock.
async fn test_signal_stream_deadlock() -> Result<()> {
    // Client-side view of the test service.
    #[dbus_proxy(
        gen_blocking = false,
        default_path = "/org/zbus/Test" ,
        default_service = "org.zbus.Test.MR501" ,
        interface = "org.zbus.Test"
    )]
    trait Test {
        #[dbus_proxy(signal)]
        fn my_signal(&self, msg: &str) -> Result<()>;
    }

    // Server-side implementation: the interface only has the one signal.
    struct TestIface;

    #[dbus_interface(name = "org.zbus.Test" )]
    impl TestIface {
        #[dbus_interface(signal)]
        async fn my_signal(context: &SignalContext<'_>, msg: &'static str) -> Result<()>;
    }

    let test_iface = TestIface;
    let server_conn = ConnectionBuilder::session()?
        .name("org.zbus.Test.MR501" )?
        .serve_at("/org/zbus/Test" , test_iface)?
        .build()
        .await?;

    // The client connection is built with `max_queued(1)`, i.e. the small queue the test is
    // about.
    let client_conn = ConnectionBuilder::session()?.max_queued(1).build().await?;

    let test_proxy = TestProxy::new(&client_conn).await?;
    let test_prop_proxy = PropertiesProxy::builder(&client_conn)
        .destination("org.zbus.Test.MR501" )?
        .path("/org/zbus/Test" )?
        .build()
        .await?;

    // Used by the signal listener to tell the property listener that it is set up; the server
    // task also watches it (`is_closed`) to know when to stop.
    let (tx, mut rx) = tokio::sync::mpsc::channel(1);

    let handle = {
        let tx = tx.clone();
        let conn = server_conn.clone();
        let server_fut = async move {
            use std::time::Duration;

            #[cfg (not(feature = "tokio" ))]
            use async_io::Timer;

            #[cfg (feature = "tokio" )]
            use tokio::time::sleep;

            let iface_ref = conn
                .object_server()
                .interface::<_, TestIface>("/org/zbus/Test" )
                .await
                .unwrap();

            let context = iface_ref.signal_context();
            // Emit signals in bursts of 10 with a 5 ms pause in between, until the channel is
            // closed.
            while !tx.is_closed() {
                for _ in 0..10 {
                    TestIface::my_signal(context, "This is a test" )
                        .await
                        .unwrap();
                }

                #[cfg (not(feature = "tokio" ))]
                Timer::after(Duration::from_millis(5)).await;

                #[cfg (feature = "tokio" )]
                sleep(Duration::from_millis(5)).await;
            }
        };
        server_conn.executor().spawn(server_fut, "server_task" )
    };

    // First listener: subscribes to the signal, reports readiness, then keeps draining.
    let signal_fut = async {
        let mut signal_stream = test_proxy.receive_my_signal().await.unwrap();

        tx.send(()).await.unwrap();

        while let Some(_signal) = signal_stream.next().await {}
    };

    // Second listener: created only once the first one is in place. This is the call that used
    // to deadlock.
    let prop_fut = async move {
        rx.recv().await.unwrap();
        let _prop_stream = test_prop_proxy.receive_properties_changed().await.unwrap();
    };

    futures_util::pin_mut!(signal_fut);
    futures_util::pin_mut!(prop_fut);

    // Resolves as soon as either future completes.
    futures_util::future::select(signal_fut, prop_fut).await;

    // NOTE(review): presumably the server task ends because `rx` is dropped once `prop_fut`
    // completes, which makes `tx.is_closed()` true -- confirm.
    handle.await;

    Ok(())
}
1497 | } |
1498 | |