1//! The object server API.
2
3use event_listener::{Event, EventListener};
4use serde::{Deserialize, Serialize};
5use std::{
6 collections::{hash_map::Entry, HashMap},
7 fmt::Write,
8 marker::PhantomData,
9 ops::{Deref, DerefMut},
10 sync::Arc,
11};
12use tracing::{debug, instrument, trace, trace_span, Instrument};
13
14use static_assertions::assert_impl_all;
15use zbus_names::InterfaceName;
16use zvariant::{ObjectPath, OwnedObjectPath, OwnedValue, Signature, Type, Value};
17
18use crate::{
19 async_lock::{RwLock, RwLockReadGuard, RwLockWriteGuard},
20 connection::WeakConnection,
21 fdo,
22 fdo::{Introspectable, ManagedObjects, ObjectManager, Peer, Properties},
23 message::{Header, Message},
24 Connection, Error, Result,
25};
26
27mod interface;
28pub(crate) use interface::ArcInterface;
29pub use interface::{DispatchResult, Interface};
30
31mod signal_context;
32pub use signal_context::SignalContext;
33
/// Opaque structure that derefs to an `Interface` type.
///
/// It wraps a read guard on the type-erased interface; dereferencing downcasts the guarded
/// value to `I`.
pub struct InterfaceDeref<'d, I> {
    // Read guard over the type-erased interface.
    iface: RwLockReadGuard<'d, dyn Interface>,
    // Records the concrete type `I` to downcast to; no value of type `I` is stored here.
    phantom: PhantomData<I>,
}
39
40impl<I> Deref for InterfaceDeref<'_, I>
41where
42 I: Interface,
43{
44 type Target = I;
45
46 fn deref(&self) -> &I {
47 self.iface.downcast_ref::<I>().unwrap()
48 }
49}
50
/// Opaque structure that mutably derefs to an `Interface` type.
///
/// It wraps a write guard on the type-erased interface; dereferencing downcasts the guarded
/// value to `I`.
pub struct InterfaceDerefMut<'d, I> {
    // Write guard over the type-erased interface.
    iface: RwLockWriteGuard<'d, dyn Interface>,
    // Records the concrete type `I` to downcast to; no value of type `I` is stored here.
    phantom: PhantomData<I>,
}
56
57impl<I> Deref for InterfaceDerefMut<'_, I>
58where
59 I: Interface,
60{
61 type Target = I;
62
63 fn deref(&self) -> &I {
64 self.iface.downcast_ref::<I>().unwrap()
65 }
66}
67
68impl<I> DerefMut for InterfaceDerefMut<'_, I>
69where
70 I: Interface,
71{
72 fn deref_mut(&mut self) -> &mut Self::Target {
73 self.iface.downcast_mut::<I>().unwrap()
74 }
75}
76
/// Wrapper over an interface, along with its corresponding `SignalContext`
/// instance. A reference to the underlying interface may be obtained via
/// [`InterfaceRef::get`] and [`InterfaceRef::get_mut`].
pub struct InterfaceRef<I> {
    // Signal context handed out by `signal_context()`.
    ctxt: SignalContext<'static>,
    // Shared lock around the type-erased interface instance.
    lock: Arc<RwLock<dyn Interface>>,
    // Records the concrete interface type `I`; no value of type `I` is stored here.
    phantom: PhantomData<I>,
}
85
86impl<I> InterfaceRef<I>
87where
88 I: 'static,
89{
90 /// Get a reference to the underlying interface.
91 ///
92 /// **WARNING:** If methods (e.g property setters) in `ObjectServer` require `&mut self`
93 /// `ObjectServer` will not be able to access the interface in question until all references
94 /// of this method are dropped, it is highly recommended that the scope of the interface
95 /// returned is restricted.
96 pub async fn get(&self) -> InterfaceDeref<'_, I> {
97 let iface = self.lock.read().await;
98
99 iface
100 .downcast_ref::<I>()
101 .expect("Unexpected interface type");
102
103 InterfaceDeref {
104 iface,
105 phantom: PhantomData,
106 }
107 }
108
109 /// Get a reference to the underlying interface.
110 ///
111 /// **WARNINGS:** Since the `ObjectServer` will not be able to access the interface in question
112 /// until the return value of this method is dropped, it is highly recommended that the scope
113 /// of the interface returned is restricted.
114 ///
115 /// # Errors
116 ///
117 /// If the interface at this instance's path is not valid, `Error::InterfaceNotFound` error is
118 /// returned.
119 ///
120 /// # Examples
121 ///
122 /// ```no_run
123 /// # use std::error::Error;
124 /// # use async_io::block_on;
125 /// # use zbus::{Connection, interface};
126 ///
127 /// struct MyIface(u32);
128 ///
129 /// #[interface(name = "org.myiface.MyIface")]
130 /// impl MyIface {
131 /// #[zbus(property)]
132 /// async fn count(&self) -> u32 {
133 /// self.0
134 /// }
135 /// }
136 ///
137 /// # block_on(async {
138 /// // Setup connection and object_server etc here and then in another part of the code:
139 /// # let connection = Connection::session().await?;
140 /// #
141 /// # let path = "/org/zbus/path";
142 /// # connection.object_server().at(path, MyIface(22)).await?;
143 /// let object_server = connection.object_server();
144 /// let iface_ref = object_server.interface::<_, MyIface>(path).await?;
145 /// let mut iface = iface_ref.get_mut().await;
146 /// iface.0 = 42;
147 /// iface.count_changed(iface_ref.signal_context()).await?;
148 /// # Ok::<_, Box<dyn Error + Send + Sync>>(())
149 /// # })?;
150 /// #
151 /// # Ok::<_, Box<dyn Error + Send + Sync>>(())
152 /// ```
153 pub async fn get_mut(&self) -> InterfaceDerefMut<'_, I> {
154 let mut iface = self.lock.write().await;
155
156 iface
157 .downcast_ref::<I>()
158 .expect("Unexpected interface type");
159 iface
160 .downcast_mut::<I>()
161 .expect("Unexpected interface type");
162
163 InterfaceDerefMut {
164 iface,
165 phantom: PhantomData,
166 }
167 }
168
169 pub fn signal_context(&self) -> &SignalContext<'static> {
170 &self.ctxt
171 }
172}
173
174impl<I> Clone for InterfaceRef<I> {
175 fn clone(&self) -> Self {
176 Self {
177 ctxt: self.ctxt.clone(),
178 lock: self.lock.clone(),
179 phantom: PhantomData,
180 }
181 }
182}
183
/// A single object in the object tree, together with the child objects below it.
#[derive(Default, Debug)]
pub(crate) struct Node {
    // Object path of this node.
    path: OwnedObjectPath,
    // Child nodes, keyed by their path segment (the text between two `/`).
    children: HashMap<String, Node>,
    // Interfaces registered on this node, keyed by interface name.
    interfaces: HashMap<InterfaceName<'static>, ArcInterface>,
}
190
191impl Node {
192 pub(crate) fn new(path: OwnedObjectPath) -> Self {
193 let mut node = Self {
194 path,
195 ..Default::default()
196 };
197 assert!(node.add_interface(Peer));
198 assert!(node.add_interface(Introspectable));
199 assert!(node.add_interface(Properties));
200
201 node
202 }
203
204 // Get the child Node at path.
205 pub(crate) fn get_child(&self, path: &ObjectPath<'_>) -> Option<&Node> {
206 let mut node = self;
207
208 for i in path.split('/').skip(1) {
209 if i.is_empty() {
210 continue;
211 }
212 match node.children.get(i) {
213 Some(n) => node = n,
214 None => return None,
215 }
216 }
217
218 Some(node)
219 }
220
221 // Get the child Node at path. Optionally create one if it doesn't exist.
222 // It also returns the path of parent node that implements ObjectManager (if any). If multiple
223 // parents implement it (they shouldn't), then the closest one is returned.
224 fn get_child_mut(
225 &mut self,
226 path: &ObjectPath<'_>,
227 create: bool,
228 ) -> (Option<&mut Node>, Option<ObjectPath<'_>>) {
229 let mut node = self;
230 let mut node_path = String::new();
231 let mut obj_manager_path = None;
232
233 for i in path.split('/').skip(1) {
234 if i.is_empty() {
235 continue;
236 }
237
238 if node.interfaces.contains_key(&ObjectManager::name()) {
239 obj_manager_path = Some((*node.path).clone());
240 }
241
242 write!(&mut node_path, "/{i}").unwrap();
243 match node.children.entry(i.into()) {
244 Entry::Vacant(e) => {
245 if create {
246 let path = node_path.as_str().try_into().expect("Invalid Object Path");
247 node = e.insert(Node::new(path));
248 } else {
249 return (None, obj_manager_path);
250 }
251 }
252 Entry::Occupied(e) => node = e.into_mut(),
253 }
254 }
255
256 (Some(node), obj_manager_path)
257 }
258
259 pub(crate) fn interface_lock(&self, interface_name: InterfaceName<'_>) -> Option<ArcInterface> {
260 self.interfaces.get(&interface_name).cloned()
261 }
262
263 fn remove_interface(&mut self, interface_name: InterfaceName<'static>) -> bool {
264 self.interfaces.remove(&interface_name).is_some()
265 }
266
267 fn is_empty(&self) -> bool {
268 !self.interfaces.keys().any(|k| {
269 *k != Peer::name()
270 && *k != Introspectable::name()
271 && *k != Properties::name()
272 && *k != ObjectManager::name()
273 })
274 }
275
276 fn remove_node(&mut self, node: &str) -> bool {
277 self.children.remove(node).is_some()
278 }
279
280 fn add_arc_interface(&mut self, name: InterfaceName<'static>, arc_iface: ArcInterface) -> bool {
281 match self.interfaces.entry(name) {
282 Entry::Vacant(e) => {
283 e.insert(arc_iface);
284 true
285 }
286 Entry::Occupied(_) => false,
287 }
288 }
289
290 fn add_interface<I>(&mut self, iface: I) -> bool
291 where
292 I: Interface,
293 {
294 self.add_arc_interface(I::name(), ArcInterface::new(iface))
295 }
296
297 async fn introspect_to_writer<W: Write + Send>(&self, writer: &mut W) {
298 enum Fragment<'a> {
299 /// Represent an unclosed node tree, could be further splitted into sub-`Fragment`s
300 Node {
301 name: &'a str,
302 node: &'a Node,
303 level: usize,
304 },
305 /// Represent a closing `</node>`
306 End { level: usize },
307 }
308
309 let mut stack = Vec::new();
310 stack.push(Fragment::Node {
311 name: "",
312 node: self,
313 level: 0,
314 });
315
316 // This can be seen as traversing the fragment tree in pre-order DFS with formatted XML
317 // fragment, splitted `Fragment::Node`s and `Fragment::End` being current node, left
318 // subtree and right leaf respectively.
319 while let Some(fragment) = stack.pop() {
320 match fragment {
321 Fragment::Node { name, node, level } => {
322 stack.push(Fragment::End { level });
323
324 for (name, node) in &node.children {
325 stack.push(Fragment::Node {
326 name,
327 node,
328 level: level + 2,
329 })
330 }
331
332 if level == 0 {
333 writeln!(
334 writer,
335 r#"
336<!DOCTYPE node PUBLIC "-//freedesktop//DTD D-BUS Object Introspection 1.0//EN"
337 "http://www.freedesktop.org/standards/dbus/1.0/introspect.dtd">
338<node>"#
339 )
340 .unwrap();
341 } else {
342 writeln!(
343 writer,
344 "{:indent$}<node name=\"{}\">",
345 "",
346 name,
347 indent = level
348 )
349 .unwrap();
350 }
351
352 for iface in node.interfaces.values() {
353 iface
354 .instance
355 .read()
356 .await
357 .introspect_to_writer(writer, level + 2);
358 }
359 }
360 Fragment::End { level } => {
361 writeln!(writer, "{:indent$}</node>", "", indent = level).unwrap();
362 }
363 }
364 }
365 }
366
367 pub(crate) async fn introspect(&self) -> String {
368 let mut xml = String::with_capacity(1024);
369
370 self.introspect_to_writer(&mut xml).await;
371
372 xml
373 }
374
375 pub(crate) async fn get_managed_objects(&self) -> fdo::Result<ManagedObjects> {
376 let mut managed_objects = ManagedObjects::new();
377
378 // Recursively get all properties of all interfaces of descendants.
379 let mut node_list: Vec<_> = self.children.values().collect();
380 while let Some(node) = node_list.pop() {
381 let mut interfaces = HashMap::new();
382 for iface_name in node.interfaces.keys().filter(|n| {
383 // Filter standard interfaces.
384 *n != &Peer::name()
385 && *n != &Introspectable::name()
386 && *n != &Properties::name()
387 && *n != &ObjectManager::name()
388 }) {
389 let props = node.get_properties(iface_name.clone()).await?;
390 interfaces.insert(iface_name.clone().into(), props);
391 }
392 managed_objects.insert(node.path.clone(), interfaces);
393 node_list.extend(node.children.values());
394 }
395
396 Ok(managed_objects)
397 }
398
399 async fn get_properties(
400 &self,
401 interface_name: InterfaceName<'_>,
402 ) -> fdo::Result<HashMap<String, OwnedValue>> {
403 self.interface_lock(interface_name)
404 .expect("Interface was added but not found")
405 .instance
406 .read()
407 .await
408 .get_all()
409 .await
410 }
411}
412
/// An object server, holding server-side D-Bus objects & interfaces.
///
/// Object servers hold interfaces on various object paths, and expose them over D-Bus.
///
/// All object paths will have the standard interfaces implemented on your behalf, such as
/// `org.freedesktop.DBus.Introspectable` or `org.freedesktop.DBus.Properties`.
///
/// # Example
///
/// This example exposes the `org.myiface.Example.Quit` method on the `/org/zbus/path`
/// path.
///
/// ```no_run
/// # use std::error::Error;
/// use zbus::{Connection, interface};
/// use event_listener::Event;
/// # use async_io::block_on;
///
/// struct Example {
///     // Interfaces are owned by the ObjectServer. They can have
///     // `&mut self` methods.
///     quit_event: Event,
/// }
///
/// impl Example {
///     fn new(quit_event: Event) -> Self {
///         Self { quit_event }
///     }
/// }
///
/// #[interface(name = "org.myiface.Example")]
/// impl Example {
///     // This will be the "Quit" D-Bus method.
///     async fn quit(&mut self) {
///         self.quit_event.notify(1);
///     }
///
///     // See `interface` documentation to learn
///     // how to expose properties & signals as well.
/// }
///
/// # block_on(async {
/// let connection = Connection::session().await?;
///
/// let quit_event = Event::new();
/// let quit_listener = quit_event.listen();
/// let interface = Example::new(quit_event);
/// connection
///     .object_server()
///     .at("/org/zbus/path", interface)
///     .await?;
///
/// quit_listener.await;
/// # Ok::<_, Box<dyn Error + Send + Sync>>(())
/// # })?;
/// # Ok::<_, Box<dyn Error + Send + Sync>>(())
/// ```
#[derive(Debug)]
pub struct ObjectServer {
    // Weak handle to the connection this server belongs to; upgraded on demand.
    conn: WeakConnection,
    // Root (`/`) of the object tree.
    root: RwLock<Node>,
}

// Compile-time check that `ObjectServer` implements these traits.
assert_impl_all!(ObjectServer: Send, Sync, Unpin);
477
478impl ObjectServer {
479 /// Creates a new D-Bus `ObjectServer`.
480 pub(crate) fn new(conn: &Connection) -> Self {
481 Self {
482 conn: conn.into(),
483 root: RwLock::new(Node::new("/".try_into().expect("zvariant bug"))),
484 }
485 }
486
487 pub(crate) fn root(&self) -> &RwLock<Node> {
488 &self.root
489 }
490
491 /// Register a D-Bus [`Interface`] at a given path. (see the example above)
492 ///
493 /// Typically you'd want your interfaces to be registered immediately after the associated
494 /// connection is established and therefore use [`zbus::connection::Builder::serve_at`] instead.
495 /// However, there are situations where you'd need to register interfaces dynamically and that's
496 /// where this method becomes useful.
497 ///
498 /// If the interface already exists at this path, returns false.
499 pub async fn at<'p, P, I>(&self, path: P, iface: I) -> Result<bool>
500 where
501 I: Interface,
502 P: TryInto<ObjectPath<'p>>,
503 P::Error: Into<Error>,
504 {
505 self.add_arc_interface(path, I::name(), ArcInterface::new(iface))
506 .await
507 }
508
509 pub(crate) async fn add_arc_interface<'p, P>(
510 &self,
511 path: P,
512 name: InterfaceName<'static>,
513 arc_iface: ArcInterface,
514 ) -> Result<bool>
515 where
516 P: TryInto<ObjectPath<'p>>,
517 P::Error: Into<Error>,
518 {
519 let path = path.try_into().map_err(Into::into)?;
520 let mut root = self.root().write().await;
521 let (node, manager_path) = root.get_child_mut(&path, true);
522 let node = node.unwrap();
523 let added = node.add_arc_interface(name.clone(), arc_iface);
524 if added {
525 if name == ObjectManager::name() {
526 // Just added an object manager. Need to signal all managed objects under it.
527 let ctxt = SignalContext::new(&self.connection(), path)?;
528 let objects = node.get_managed_objects().await?;
529 for (path, owned_interfaces) in objects {
530 let interfaces = owned_interfaces
531 .iter()
532 .map(|(i, props)| {
533 let props = props
534 .iter()
535 .map(|(k, v)| Ok((k.as_str(), Value::try_from(v)?)))
536 .collect::<Result<_>>();
537 Ok((i.into(), props?))
538 })
539 .collect::<Result<_>>()?;
540 ObjectManager::interfaces_added(&ctxt, &path, &interfaces).await?;
541 }
542 } else if let Some(manager_path) = manager_path {
543 let ctxt = SignalContext::new(&self.connection(), manager_path.clone())?;
544 let mut interfaces = HashMap::new();
545 let owned_props = node.get_properties(name.clone()).await?;
546 let props = owned_props
547 .iter()
548 .map(|(k, v)| Ok((k.as_str(), Value::try_from(v)?)))
549 .collect::<Result<_>>()?;
550 interfaces.insert(name, props);
551
552 ObjectManager::interfaces_added(&ctxt, &path, &interfaces).await?;
553 }
554 }
555
556 Ok(added)
557 }
558
559 /// Unregister a D-Bus [`Interface`] at a given path.
560 ///
561 /// If there are no more interfaces left at that path, destroys the object as well.
562 /// Returns whether the object was destroyed.
563 pub async fn remove<'p, I, P>(&self, path: P) -> Result<bool>
564 where
565 I: Interface,
566 P: TryInto<ObjectPath<'p>>,
567 P::Error: Into<Error>,
568 {
569 let path = path.try_into().map_err(Into::into)?;
570 let mut root = self.root.write().await;
571 let (node, manager_path) = root.get_child_mut(&path, false);
572 let node = node.ok_or(Error::InterfaceNotFound)?;
573 if !node.remove_interface(I::name()) {
574 return Err(Error::InterfaceNotFound);
575 }
576 if let Some(manager_path) = manager_path {
577 let ctxt = SignalContext::new(&self.connection(), manager_path.clone())?;
578 ObjectManager::interfaces_removed(&ctxt, &path, &[I::name()]).await?;
579 }
580 if node.is_empty() {
581 let mut path_parts = path.rsplit('/').filter(|i| !i.is_empty());
582 let last_part = path_parts.next().unwrap();
583 let ppath = ObjectPath::from_string_unchecked(
584 path_parts.fold(String::new(), |a, p| format!("/{p}{a}")),
585 );
586 root.get_child_mut(&ppath, false)
587 .0
588 .unwrap()
589 .remove_node(last_part);
590 return Ok(true);
591 }
592 Ok(false)
593 }
594
595 /// Get the interface at the given path.
596 ///
597 /// # Errors
598 ///
599 /// If the interface is not registered at the given path, `Error::InterfaceNotFound` error is
600 /// returned.
601 ///
602 /// # Examples
603 ///
604 /// The typical use of this is property changes outside of a dispatched handler:
605 ///
606 /// ```no_run
607 /// # use std::error::Error;
608 /// # use zbus::{Connection, interface};
609 /// # use async_io::block_on;
610 /// #
611 /// struct MyIface(u32);
612 ///
613 /// #[interface(name = "org.myiface.MyIface")]
614 /// impl MyIface {
615 /// #[zbus(property)]
616 /// async fn count(&self) -> u32 {
617 /// self.0
618 /// }
619 /// }
620 ///
621 /// # block_on(async {
622 /// # let connection = Connection::session().await?;
623 /// #
624 /// # let path = "/org/zbus/path";
625 /// # connection.object_server().at(path, MyIface(0)).await?;
626 /// let iface_ref = connection
627 /// .object_server()
628 /// .interface::<_, MyIface>(path).await?;
629 /// let mut iface = iface_ref.get_mut().await;
630 /// iface.0 = 42;
631 /// iface.count_changed(iface_ref.signal_context()).await?;
632 /// # Ok::<_, Box<dyn Error + Send + Sync>>(())
633 /// # })?;
634 /// #
635 /// # Ok::<_, Box<dyn Error + Send + Sync>>(())
636 /// ```
637 pub async fn interface<'p, P, I>(&self, path: P) -> Result<InterfaceRef<I>>
638 where
639 I: Interface,
640 P: TryInto<ObjectPath<'p>>,
641 P::Error: Into<Error>,
642 {
643 let path = path.try_into().map_err(Into::into)?;
644 let root = self.root().read().await;
645 let node = root.get_child(&path).ok_or(Error::InterfaceNotFound)?;
646
647 let lock = node
648 .interface_lock(I::name())
649 .ok_or(Error::InterfaceNotFound)?
650 .instance
651 .clone();
652
653 // Ensure what we return can later be dowcasted safely.
654 lock.read()
655 .await
656 .downcast_ref::<I>()
657 .ok_or(Error::InterfaceNotFound)?;
658
659 let conn = self.connection();
660 // SAFETY: We know that there is a valid path on the node as we already converted w/o error.
661 let ctxt = SignalContext::new(&conn, path).unwrap().into_owned();
662
663 Ok(InterfaceRef {
664 ctxt,
665 lock,
666 phantom: PhantomData,
667 })
668 }
669
670 async fn dispatch_call_to_iface(
671 &self,
672 iface: Arc<RwLock<dyn Interface>>,
673 connection: &Connection,
674 msg: &Message,
675 hdr: &Header<'_>,
676 ) -> fdo::Result<()> {
677 let member = hdr
678 .member()
679 .ok_or_else(|| fdo::Error::Failed("Missing member".into()))?;
680 let iface_name = hdr
681 .interface()
682 .ok_or_else(|| fdo::Error::Failed("Missing interface".into()))?;
683
684 trace!("acquiring read lock on interface `{}`", iface_name);
685 let read_lock = iface.read().await;
686 trace!("acquired read lock on interface `{}`", iface_name);
687 match read_lock.call(self, connection, msg, member.as_ref()) {
688 DispatchResult::NotFound => {
689 return Err(fdo::Error::UnknownMethod(format!(
690 "Unknown method '{member}'"
691 )));
692 }
693 DispatchResult::Async(f) => {
694 return f.await.map_err(|e| match e {
695 Error::FDO(e) => *e,
696 e => fdo::Error::Failed(format!("{e}")),
697 });
698 }
699 DispatchResult::RequiresMut => {}
700 }
701 drop(read_lock);
702 trace!("acquiring write lock on interface `{}`", iface_name);
703 let mut write_lock = iface.write().await;
704 trace!("acquired write lock on interface `{}`", iface_name);
705 match write_lock.call_mut(self, connection, msg, member.as_ref()) {
706 DispatchResult::NotFound => {}
707 DispatchResult::RequiresMut => {}
708 DispatchResult::Async(f) => {
709 return f.await.map_err(|e| match e {
710 Error::FDO(e) => *e,
711 e => fdo::Error::Failed(format!("{e}")),
712 });
713 }
714 }
715 drop(write_lock);
716 Err(fdo::Error::UnknownMethod(format!(
717 "Unknown method '{member}'"
718 )))
719 }
720
721 async fn dispatch_method_call_try(
722 &self,
723 connection: &Connection,
724 msg: &Message,
725 hdr: &Header<'_>,
726 ) -> fdo::Result<()> {
727 let path = hdr
728 .path()
729 .ok_or_else(|| fdo::Error::Failed("Missing object path".into()))?;
730 let iface_name = hdr
731 .interface()
732 // TODO: In the absence of an INTERFACE field, if two or more interfaces on the same
733 // object have a method with the same name, it is undefined which of those
734 // methods will be invoked. Implementations may choose to either return an
735 // error, or deliver the message as though it had an arbitrary one of those
736 // interfaces.
737 .ok_or_else(|| fdo::Error::Failed("Missing interface".into()))?;
738 // Check that the message has a member before spawning.
739 // Note that an unknown member will still spawn a task. We should instead gather
740 // all the details for the call before spawning.
741 // See also https://github.com/dbus2/zbus/issues/674 for future of Interface.
742 let _ = hdr
743 .member()
744 .ok_or_else(|| fdo::Error::Failed("Missing member".into()))?;
745
746 // Ensure the root lock isn't held while dispatching the message. That
747 // way, the object server can be mutated during that time.
748 let (iface, with_spawn) = {
749 let root = self.root.read().await;
750 let node = root
751 .get_child(path)
752 .ok_or_else(|| fdo::Error::UnknownObject(format!("Unknown object '{path}'")))?;
753
754 let iface = node.interface_lock(iface_name.as_ref()).ok_or_else(|| {
755 fdo::Error::UnknownInterface(format!("Unknown interface '{iface_name}'"))
756 })?;
757 (iface.instance, iface.spawn_tasks_for_methods)
758 };
759
760 if with_spawn {
761 let executor = connection.executor().clone();
762 let task_name = format!("`{msg}` method dispatcher");
763 let connection = connection.clone();
764 let msg = msg.clone();
765 executor
766 .spawn(
767 async move {
768 let server = connection.object_server();
769 let hdr = msg.header();
770 if let Err(e) = server
771 .dispatch_call_to_iface(iface, &connection, &msg, &hdr)
772 .await
773 {
774 // When not spawning a task, this error is handled by the caller.
775 debug!("Returning error: {}", e);
776 if let Err(e) = connection.reply_dbus_error(&hdr, e).await {
777 debug!(
778 "Error dispatching message. Message: {:?}, error: {:?}",
779 msg, e
780 );
781 }
782 }
783 }
784 .instrument(trace_span!("{}", task_name)),
785 &task_name,
786 )
787 .detach();
788 Ok(())
789 } else {
790 self.dispatch_call_to_iface(iface, connection, msg, hdr)
791 .await
792 }
793 }
794
795 /// Dispatch an incoming message to a registered interface.
796 ///
797 /// The object server will handle the message by:
798 ///
799 /// - looking up the called object path & interface,
800 ///
801 /// - calling the associated method if one exists,
802 ///
803 /// - returning a message (responding to the caller with either a return or error message) to
804 /// the caller through the associated server connection.
805 ///
806 /// Returns an error if the message is malformed.
807 #[instrument(skip(self))]
808 pub(crate) async fn dispatch_call(&self, msg: &Message, hdr: &Header<'_>) -> Result<()> {
809 let conn = self.connection();
810
811 if let Err(e) = self.dispatch_method_call_try(&conn, msg, hdr).await {
812 debug!("Returning error: {}", e);
813 conn.reply_dbus_error(hdr, e).await?;
814 }
815 trace!("Handled: {}", msg);
816
817 Ok(())
818 }
819
820 pub(crate) fn connection(&self) -> Connection {
821 self.conn
822 .upgrade()
823 .expect("ObjectServer can't exist w/o an associated Connection")
824 }
825}
826
827impl From<crate::blocking::ObjectServer> for ObjectServer {
828 fn from(server: crate::blocking::ObjectServer) -> Self {
829 server.into_inner()
830 }
831}
832
/// A response wrapper that notifies after response has been sent.
///
/// Sometimes in [`interface`] method implementations we need to do some other work after the
/// response has been sent off. This wrapper type allows us to do that. Instead of returning your
/// intended response type directly, wrap it in this type and return it from your method. The
/// returned `EventListener` from `new` method will be notified when the response has been sent.
///
/// A typical use case is sending off signals after the response has been sent. The easiest way to
/// do that is to spawn a task from the method that sends the signal but only after being notified
/// of the response dispatch.
///
/// # Caveats
///
/// The notification indicates that the response has been sent off, not that destination peer has
/// received it. That can only be guaranteed for a peer-to-peer connection.
///
/// [`interface`]: crate::interface
#[derive(Debug)]
pub struct ResponseDispatchNotifier<R> {
    // The wrapped response.
    response: R,
    // Event notified when this wrapper is dropped; `None` for deserialized instances.
    event: Option<Event>,
}
855
856impl<R> ResponseDispatchNotifier<R> {
857 /// Create a new `NotifyResponse`.
858 pub fn new(response: R) -> (Self, EventListener) {
859 let event: Event = Event::new();
860 let listener: EventListener = event.listen();
861 (
862 Self {
863 response,
864 event: Some(event),
865 },
866 listener,
867 )
868 }
869
870 /// Get the response.
871 pub fn response(&self) -> &R {
872 &self.response
873 }
874}
875
876impl<R> Serialize for ResponseDispatchNotifier<R>
877where
878 R: Serialize,
879{
880 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
881 where
882 S: serde::Serializer,
883 {
884 self.response.serialize(serializer)
885 }
886}
887
888impl<'de, R> Deserialize<'de> for ResponseDispatchNotifier<R>
889where
890 R: Deserialize<'de>,
891{
892 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
893 where
894 D: serde::Deserializer<'de>,
895 {
896 Ok(Self {
897 response: R::deserialize(deserializer)?,
898 event: None,
899 })
900 }
901}
902
903impl<R> Type for ResponseDispatchNotifier<R>
904where
905 R: Type,
906{
907 fn signature() -> Signature<'static> {
908 R::signature()
909 }
910}
911
912impl<T> Drop for ResponseDispatchNotifier<T> {
913 fn drop(&mut self) {
914 if let Some(event: Event) = self.event.take() {
915 event.notify(usize::MAX);
916 }
917 }
918}
919