1 | //! The object server API. |
2 | |
3 | use event_listener::{Event, EventListener}; |
4 | use serde::{Deserialize, Serialize}; |
5 | use std::{ |
6 | collections::{hash_map::Entry, HashMap}, |
7 | fmt::Write, |
8 | marker::PhantomData, |
9 | ops::{Deref, DerefMut}, |
10 | sync::Arc, |
11 | }; |
12 | use tracing::{debug, instrument, trace, trace_span, Instrument}; |
13 | |
14 | use static_assertions::assert_impl_all; |
15 | use zbus_names::InterfaceName; |
16 | use zvariant::{ObjectPath, OwnedObjectPath, OwnedValue, Signature, Type, Value}; |
17 | |
18 | use crate::{ |
19 | async_lock::{RwLock, RwLockReadGuard, RwLockWriteGuard}, |
20 | connection::WeakConnection, |
21 | fdo, |
22 | fdo::{Introspectable, ManagedObjects, ObjectManager, Peer, Properties}, |
23 | message::{Header, Message}, |
24 | Connection, Error, Result, |
25 | }; |
26 | |
27 | mod interface; |
28 | pub(crate) use interface::ArcInterface; |
29 | pub use interface::{DispatchResult, Interface}; |
30 | |
31 | mod signal_context; |
32 | pub use signal_context::SignalContext; |
33 | |
34 | /// Opaque structure that derefs to an `Interface` type. |
35 | pub struct InterfaceDeref<'d, I> { |
36 | iface: RwLockReadGuard<'d, dyn Interface>, |
37 | phantom: PhantomData<I>, |
38 | } |
39 | |
40 | impl<I> Deref for InterfaceDeref<'_, I> |
41 | where |
42 | I: Interface, |
43 | { |
44 | type Target = I; |
45 | |
46 | fn deref(&self) -> &I { |
47 | self.iface.downcast_ref::<I>().unwrap() |
48 | } |
49 | } |
50 | |
51 | /// Opaque structure that mutably derefs to an `Interface` type. |
52 | pub struct InterfaceDerefMut<'d, I> { |
53 | iface: RwLockWriteGuard<'d, dyn Interface>, |
54 | phantom: PhantomData<I>, |
55 | } |
56 | |
57 | impl<I> Deref for InterfaceDerefMut<'_, I> |
58 | where |
59 | I: Interface, |
60 | { |
61 | type Target = I; |
62 | |
63 | fn deref(&self) -> &I { |
64 | self.iface.downcast_ref::<I>().unwrap() |
65 | } |
66 | } |
67 | |
68 | impl<I> DerefMut for InterfaceDerefMut<'_, I> |
69 | where |
70 | I: Interface, |
71 | { |
72 | fn deref_mut(&mut self) -> &mut Self::Target { |
73 | self.iface.downcast_mut::<I>().unwrap() |
74 | } |
75 | } |
76 | |
77 | /// Wrapper over an interface, along with its corresponding `SignalContext` |
78 | /// instance. A reference to the underlying interface may be obtained via |
79 | /// [`InterfaceRef::get`] and [`InterfaceRef::get_mut`]. |
80 | pub struct InterfaceRef<I> { |
81 | ctxt: SignalContext<'static>, |
82 | lock: Arc<RwLock<dyn Interface>>, |
83 | phantom: PhantomData<I>, |
84 | } |
85 | |
86 | impl<I> InterfaceRef<I> |
87 | where |
88 | I: 'static, |
89 | { |
90 | /// Get a reference to the underlying interface. |
91 | /// |
92 | /// **WARNING:** If methods (e.g property setters) in `ObjectServer` require `&mut self` |
93 | /// `ObjectServer` will not be able to access the interface in question until all references |
94 | /// of this method are dropped, it is highly recommended that the scope of the interface |
95 | /// returned is restricted. |
96 | pub async fn get(&self) -> InterfaceDeref<'_, I> { |
97 | let iface = self.lock.read().await; |
98 | |
99 | iface |
100 | .downcast_ref::<I>() |
101 | .expect("Unexpected interface type" ); |
102 | |
103 | InterfaceDeref { |
104 | iface, |
105 | phantom: PhantomData, |
106 | } |
107 | } |
108 | |
109 | /// Get a reference to the underlying interface. |
110 | /// |
111 | /// **WARNINGS:** Since the `ObjectServer` will not be able to access the interface in question |
112 | /// until the return value of this method is dropped, it is highly recommended that the scope |
113 | /// of the interface returned is restricted. |
114 | /// |
115 | /// # Errors |
116 | /// |
117 | /// If the interface at this instance's path is not valid, `Error::InterfaceNotFound` error is |
118 | /// returned. |
119 | /// |
120 | /// # Examples |
121 | /// |
122 | /// ```no_run |
123 | /// # use std::error::Error; |
124 | /// # use async_io::block_on; |
125 | /// # use zbus::{Connection, interface}; |
126 | /// |
127 | /// struct MyIface(u32); |
128 | /// |
129 | /// #[interface(name = "org.myiface.MyIface" )] |
130 | /// impl MyIface { |
131 | /// #[zbus(property)] |
132 | /// async fn count(&self) -> u32 { |
133 | /// self.0 |
134 | /// } |
135 | /// } |
136 | /// |
137 | /// # block_on(async { |
138 | /// // Setup connection and object_server etc here and then in another part of the code: |
139 | /// # let connection = Connection::session().await?; |
140 | /// # |
141 | /// # let path = "/org/zbus/path" ; |
142 | /// # connection.object_server().at(path, MyIface(22)).await?; |
143 | /// let object_server = connection.object_server(); |
144 | /// let iface_ref = object_server.interface::<_, MyIface>(path).await?; |
145 | /// let mut iface = iface_ref.get_mut().await; |
146 | /// iface.0 = 42; |
147 | /// iface.count_changed(iface_ref.signal_context()).await?; |
148 | /// # Ok::<_, Box<dyn Error + Send + Sync>>(()) |
149 | /// # })?; |
150 | /// # |
151 | /// # Ok::<_, Box<dyn Error + Send + Sync>>(()) |
152 | /// ``` |
153 | pub async fn get_mut(&self) -> InterfaceDerefMut<'_, I> { |
154 | let mut iface = self.lock.write().await; |
155 | |
156 | iface |
157 | .downcast_ref::<I>() |
158 | .expect("Unexpected interface type" ); |
159 | iface |
160 | .downcast_mut::<I>() |
161 | .expect("Unexpected interface type" ); |
162 | |
163 | InterfaceDerefMut { |
164 | iface, |
165 | phantom: PhantomData, |
166 | } |
167 | } |
168 | |
169 | pub fn signal_context(&self) -> &SignalContext<'static> { |
170 | &self.ctxt |
171 | } |
172 | } |
173 | |
174 | impl<I> Clone for InterfaceRef<I> { |
175 | fn clone(&self) -> Self { |
176 | Self { |
177 | ctxt: self.ctxt.clone(), |
178 | lock: self.lock.clone(), |
179 | phantom: PhantomData, |
180 | } |
181 | } |
182 | } |
183 | |
184 | #[derive (Default, Debug)] |
185 | pub(crate) struct Node { |
186 | path: OwnedObjectPath, |
187 | children: HashMap<String, Node>, |
188 | interfaces: HashMap<InterfaceName<'static>, ArcInterface>, |
189 | } |
190 | |
191 | impl Node { |
192 | pub(crate) fn new(path: OwnedObjectPath) -> Self { |
193 | let mut node = Self { |
194 | path, |
195 | ..Default::default() |
196 | }; |
197 | assert!(node.add_interface(Peer)); |
198 | assert!(node.add_interface(Introspectable)); |
199 | assert!(node.add_interface(Properties)); |
200 | |
201 | node |
202 | } |
203 | |
204 | // Get the child Node at path. |
205 | pub(crate) fn get_child(&self, path: &ObjectPath<'_>) -> Option<&Node> { |
206 | let mut node = self; |
207 | |
208 | for i in path.split('/' ).skip(1) { |
209 | if i.is_empty() { |
210 | continue; |
211 | } |
212 | match node.children.get(i) { |
213 | Some(n) => node = n, |
214 | None => return None, |
215 | } |
216 | } |
217 | |
218 | Some(node) |
219 | } |
220 | |
221 | // Get the child Node at path. Optionally create one if it doesn't exist. |
222 | // It also returns the path of parent node that implements ObjectManager (if any). If multiple |
223 | // parents implement it (they shouldn't), then the closest one is returned. |
224 | fn get_child_mut( |
225 | &mut self, |
226 | path: &ObjectPath<'_>, |
227 | create: bool, |
228 | ) -> (Option<&mut Node>, Option<ObjectPath<'_>>) { |
229 | let mut node = self; |
230 | let mut node_path = String::new(); |
231 | let mut obj_manager_path = None; |
232 | |
233 | for i in path.split('/' ).skip(1) { |
234 | if i.is_empty() { |
235 | continue; |
236 | } |
237 | |
238 | if node.interfaces.contains_key(&ObjectManager::name()) { |
239 | obj_manager_path = Some((*node.path).clone()); |
240 | } |
241 | |
242 | write!(&mut node_path, "/ {i}" ).unwrap(); |
243 | match node.children.entry(i.into()) { |
244 | Entry::Vacant(e) => { |
245 | if create { |
246 | let path = node_path.as_str().try_into().expect("Invalid Object Path" ); |
247 | node = e.insert(Node::new(path)); |
248 | } else { |
249 | return (None, obj_manager_path); |
250 | } |
251 | } |
252 | Entry::Occupied(e) => node = e.into_mut(), |
253 | } |
254 | } |
255 | |
256 | (Some(node), obj_manager_path) |
257 | } |
258 | |
259 | pub(crate) fn interface_lock(&self, interface_name: InterfaceName<'_>) -> Option<ArcInterface> { |
260 | self.interfaces.get(&interface_name).cloned() |
261 | } |
262 | |
263 | fn remove_interface(&mut self, interface_name: InterfaceName<'static>) -> bool { |
264 | self.interfaces.remove(&interface_name).is_some() |
265 | } |
266 | |
267 | fn is_empty(&self) -> bool { |
268 | !self.interfaces.keys().any(|k| { |
269 | *k != Peer::name() |
270 | && *k != Introspectable::name() |
271 | && *k != Properties::name() |
272 | && *k != ObjectManager::name() |
273 | }) |
274 | } |
275 | |
276 | fn remove_node(&mut self, node: &str) -> bool { |
277 | self.children.remove(node).is_some() |
278 | } |
279 | |
280 | fn add_arc_interface(&mut self, name: InterfaceName<'static>, arc_iface: ArcInterface) -> bool { |
281 | match self.interfaces.entry(name) { |
282 | Entry::Vacant(e) => { |
283 | e.insert(arc_iface); |
284 | true |
285 | } |
286 | Entry::Occupied(_) => false, |
287 | } |
288 | } |
289 | |
290 | fn add_interface<I>(&mut self, iface: I) -> bool |
291 | where |
292 | I: Interface, |
293 | { |
294 | self.add_arc_interface(I::name(), ArcInterface::new(iface)) |
295 | } |
296 | |
297 | async fn introspect_to_writer<W: Write + Send>(&self, writer: &mut W) { |
298 | enum Fragment<'a> { |
299 | /// Represent an unclosed node tree, could be further splitted into sub-`Fragment`s |
300 | Node { |
301 | name: &'a str, |
302 | node: &'a Node, |
303 | level: usize, |
304 | }, |
305 | /// Represent a closing `</node>` |
306 | End { level: usize }, |
307 | } |
308 | |
309 | let mut stack = Vec::new(); |
310 | stack.push(Fragment::Node { |
311 | name: "" , |
312 | node: self, |
313 | level: 0, |
314 | }); |
315 | |
316 | // This can be seen as traversing the fragment tree in pre-order DFS with formatted XML |
317 | // fragment, splitted `Fragment::Node`s and `Fragment::End` being current node, left |
318 | // subtree and right leaf respectively. |
319 | while let Some(fragment) = stack.pop() { |
320 | match fragment { |
321 | Fragment::Node { name, node, level } => { |
322 | stack.push(Fragment::End { level }); |
323 | |
324 | for (name, node) in &node.children { |
325 | stack.push(Fragment::Node { |
326 | name, |
327 | node, |
328 | level: level + 2, |
329 | }) |
330 | } |
331 | |
332 | if level == 0 { |
333 | writeln!( |
334 | writer, |
335 | r#" |
336 | <!DOCTYPE node PUBLIC "-//freedesktop//DTD D-BUS Object Introspection 1.0//EN" |
337 | "http://www.freedesktop.org/standards/dbus/1.0/introspect.dtd"> |
338 | <node>"# |
339 | ) |
340 | .unwrap(); |
341 | } else { |
342 | writeln!( |
343 | writer, |
344 | " {:indent$}<node name= \"{}\">" , |
345 | "" , |
346 | name, |
347 | indent = level |
348 | ) |
349 | .unwrap(); |
350 | } |
351 | |
352 | for iface in node.interfaces.values() { |
353 | iface |
354 | .instance |
355 | .read() |
356 | .await |
357 | .introspect_to_writer(writer, level + 2); |
358 | } |
359 | } |
360 | Fragment::End { level } => { |
361 | writeln!(writer, " {:indent$}</node>" , "" , indent = level).unwrap(); |
362 | } |
363 | } |
364 | } |
365 | } |
366 | |
367 | pub(crate) async fn introspect(&self) -> String { |
368 | let mut xml = String::with_capacity(1024); |
369 | |
370 | self.introspect_to_writer(&mut xml).await; |
371 | |
372 | xml |
373 | } |
374 | |
375 | pub(crate) async fn get_managed_objects(&self) -> fdo::Result<ManagedObjects> { |
376 | let mut managed_objects = ManagedObjects::new(); |
377 | |
378 | // Recursively get all properties of all interfaces of descendants. |
379 | let mut node_list: Vec<_> = self.children.values().collect(); |
380 | while let Some(node) = node_list.pop() { |
381 | let mut interfaces = HashMap::new(); |
382 | for iface_name in node.interfaces.keys().filter(|n| { |
383 | // Filter standard interfaces. |
384 | *n != &Peer::name() |
385 | && *n != &Introspectable::name() |
386 | && *n != &Properties::name() |
387 | && *n != &ObjectManager::name() |
388 | }) { |
389 | let props = node.get_properties(iface_name.clone()).await?; |
390 | interfaces.insert(iface_name.clone().into(), props); |
391 | } |
392 | managed_objects.insert(node.path.clone(), interfaces); |
393 | node_list.extend(node.children.values()); |
394 | } |
395 | |
396 | Ok(managed_objects) |
397 | } |
398 | |
399 | async fn get_properties( |
400 | &self, |
401 | interface_name: InterfaceName<'_>, |
402 | ) -> fdo::Result<HashMap<String, OwnedValue>> { |
403 | self.interface_lock(interface_name) |
404 | .expect("Interface was added but not found" ) |
405 | .instance |
406 | .read() |
407 | .await |
408 | .get_all() |
409 | .await |
410 | } |
411 | } |
412 | |
413 | /// An object server, holding server-side D-Bus objects & interfaces. |
414 | /// |
415 | /// Object servers hold interfaces on various object paths, and expose them over D-Bus. |
416 | /// |
417 | /// All object paths will have the standard interfaces implemented on your behalf, such as |
418 | /// `org.freedesktop.DBus.Introspectable` or `org.freedesktop.DBus.Properties`. |
419 | /// |
420 | /// # Example |
421 | /// |
422 | /// This example exposes the `org.myiface.Example.Quit` method on the `/org/zbus/path` |
423 | /// path. |
424 | /// |
425 | /// ```no_run |
426 | /// # use std::error::Error; |
427 | /// use zbus::{Connection, interface}; |
428 | /// use event_listener::Event; |
429 | /// # use async_io::block_on; |
430 | /// |
431 | /// struct Example { |
432 | /// // Interfaces are owned by the ObjectServer. They can have |
433 | /// // `&mut self` methods. |
434 | /// quit_event: Event, |
435 | /// } |
436 | /// |
437 | /// impl Example { |
438 | /// fn new(quit_event: Event) -> Self { |
439 | /// Self { quit_event } |
440 | /// } |
441 | /// } |
442 | /// |
443 | /// #[interface(name = "org.myiface.Example" )] |
444 | /// impl Example { |
445 | /// // This will be the "Quit" D-Bus method. |
446 | /// async fn quit(&mut self) { |
447 | /// self.quit_event.notify(1); |
448 | /// } |
449 | /// |
450 | /// // See `interface` documentation to learn |
451 | /// // how to expose properties & signals as well. |
452 | /// } |
453 | /// |
454 | /// # block_on(async { |
455 | /// let connection = Connection::session().await?; |
456 | /// |
457 | /// let quit_event = Event::new(); |
458 | /// let quit_listener = quit_event.listen(); |
459 | /// let interface = Example::new(quit_event); |
460 | /// connection |
461 | /// .object_server() |
462 | /// .at("/org/zbus/path" , interface) |
463 | /// .await?; |
464 | /// |
465 | /// quit_listener.await; |
466 | /// # Ok::<_, Box<dyn Error + Send + Sync>>(()) |
467 | /// # })?; |
468 | /// # Ok::<_, Box<dyn Error + Send + Sync>>(()) |
469 | /// ``` |
470 | #[derive (Debug)] |
471 | pub struct ObjectServer { |
472 | conn: WeakConnection, |
473 | root: RwLock<Node>, |
474 | } |
475 | |
476 | assert_impl_all!(ObjectServer: Send, Sync, Unpin); |
477 | |
478 | impl ObjectServer { |
479 | /// Creates a new D-Bus `ObjectServer`. |
480 | pub(crate) fn new(conn: &Connection) -> Self { |
481 | Self { |
482 | conn: conn.into(), |
483 | root: RwLock::new(Node::new("/" .try_into().expect("zvariant bug" ))), |
484 | } |
485 | } |
486 | |
487 | pub(crate) fn root(&self) -> &RwLock<Node> { |
488 | &self.root |
489 | } |
490 | |
491 | /// Register a D-Bus [`Interface`] at a given path. (see the example above) |
492 | /// |
493 | /// Typically you'd want your interfaces to be registered immediately after the associated |
494 | /// connection is established and therefore use [`zbus::connection::Builder::serve_at`] instead. |
495 | /// However, there are situations where you'd need to register interfaces dynamically and that's |
496 | /// where this method becomes useful. |
497 | /// |
498 | /// If the interface already exists at this path, returns false. |
499 | pub async fn at<'p, P, I>(&self, path: P, iface: I) -> Result<bool> |
500 | where |
501 | I: Interface, |
502 | P: TryInto<ObjectPath<'p>>, |
503 | P::Error: Into<Error>, |
504 | { |
505 | self.add_arc_interface(path, I::name(), ArcInterface::new(iface)) |
506 | .await |
507 | } |
508 | |
509 | pub(crate) async fn add_arc_interface<'p, P>( |
510 | &self, |
511 | path: P, |
512 | name: InterfaceName<'static>, |
513 | arc_iface: ArcInterface, |
514 | ) -> Result<bool> |
515 | where |
516 | P: TryInto<ObjectPath<'p>>, |
517 | P::Error: Into<Error>, |
518 | { |
519 | let path = path.try_into().map_err(Into::into)?; |
520 | let mut root = self.root().write().await; |
521 | let (node, manager_path) = root.get_child_mut(&path, true); |
522 | let node = node.unwrap(); |
523 | let added = node.add_arc_interface(name.clone(), arc_iface); |
524 | if added { |
525 | if name == ObjectManager::name() { |
526 | // Just added an object manager. Need to signal all managed objects under it. |
527 | let ctxt = SignalContext::new(&self.connection(), path)?; |
528 | let objects = node.get_managed_objects().await?; |
529 | for (path, owned_interfaces) in objects { |
530 | let interfaces = owned_interfaces |
531 | .iter() |
532 | .map(|(i, props)| { |
533 | let props = props |
534 | .iter() |
535 | .map(|(k, v)| Ok((k.as_str(), Value::try_from(v)?))) |
536 | .collect::<Result<_>>(); |
537 | Ok((i.into(), props?)) |
538 | }) |
539 | .collect::<Result<_>>()?; |
540 | ObjectManager::interfaces_added(&ctxt, &path, &interfaces).await?; |
541 | } |
542 | } else if let Some(manager_path) = manager_path { |
543 | let ctxt = SignalContext::new(&self.connection(), manager_path.clone())?; |
544 | let mut interfaces = HashMap::new(); |
545 | let owned_props = node.get_properties(name.clone()).await?; |
546 | let props = owned_props |
547 | .iter() |
548 | .map(|(k, v)| Ok((k.as_str(), Value::try_from(v)?))) |
549 | .collect::<Result<_>>()?; |
550 | interfaces.insert(name, props); |
551 | |
552 | ObjectManager::interfaces_added(&ctxt, &path, &interfaces).await?; |
553 | } |
554 | } |
555 | |
556 | Ok(added) |
557 | } |
558 | |
559 | /// Unregister a D-Bus [`Interface`] at a given path. |
560 | /// |
561 | /// If there are no more interfaces left at that path, destroys the object as well. |
562 | /// Returns whether the object was destroyed. |
563 | pub async fn remove<'p, I, P>(&self, path: P) -> Result<bool> |
564 | where |
565 | I: Interface, |
566 | P: TryInto<ObjectPath<'p>>, |
567 | P::Error: Into<Error>, |
568 | { |
569 | let path = path.try_into().map_err(Into::into)?; |
570 | let mut root = self.root.write().await; |
571 | let (node, manager_path) = root.get_child_mut(&path, false); |
572 | let node = node.ok_or(Error::InterfaceNotFound)?; |
573 | if !node.remove_interface(I::name()) { |
574 | return Err(Error::InterfaceNotFound); |
575 | } |
576 | if let Some(manager_path) = manager_path { |
577 | let ctxt = SignalContext::new(&self.connection(), manager_path.clone())?; |
578 | ObjectManager::interfaces_removed(&ctxt, &path, &[I::name()]).await?; |
579 | } |
580 | if node.is_empty() { |
581 | let mut path_parts = path.rsplit('/' ).filter(|i| !i.is_empty()); |
582 | let last_part = path_parts.next().unwrap(); |
583 | let ppath = ObjectPath::from_string_unchecked( |
584 | path_parts.fold(String::new(), |a, p| format!("/ {p}{a}" )), |
585 | ); |
586 | root.get_child_mut(&ppath, false) |
587 | .0 |
588 | .unwrap() |
589 | .remove_node(last_part); |
590 | return Ok(true); |
591 | } |
592 | Ok(false) |
593 | } |
594 | |
595 | /// Get the interface at the given path. |
596 | /// |
597 | /// # Errors |
598 | /// |
599 | /// If the interface is not registered at the given path, `Error::InterfaceNotFound` error is |
600 | /// returned. |
601 | /// |
602 | /// # Examples |
603 | /// |
604 | /// The typical use of this is property changes outside of a dispatched handler: |
605 | /// |
606 | /// ```no_run |
607 | /// # use std::error::Error; |
608 | /// # use zbus::{Connection, interface}; |
609 | /// # use async_io::block_on; |
610 | /// # |
611 | /// struct MyIface(u32); |
612 | /// |
613 | /// #[interface(name = "org.myiface.MyIface" )] |
614 | /// impl MyIface { |
615 | /// #[zbus(property)] |
616 | /// async fn count(&self) -> u32 { |
617 | /// self.0 |
618 | /// } |
619 | /// } |
620 | /// |
621 | /// # block_on(async { |
622 | /// # let connection = Connection::session().await?; |
623 | /// # |
624 | /// # let path = "/org/zbus/path" ; |
625 | /// # connection.object_server().at(path, MyIface(0)).await?; |
626 | /// let iface_ref = connection |
627 | /// .object_server() |
628 | /// .interface::<_, MyIface>(path).await?; |
629 | /// let mut iface = iface_ref.get_mut().await; |
630 | /// iface.0 = 42; |
631 | /// iface.count_changed(iface_ref.signal_context()).await?; |
632 | /// # Ok::<_, Box<dyn Error + Send + Sync>>(()) |
633 | /// # })?; |
634 | /// # |
635 | /// # Ok::<_, Box<dyn Error + Send + Sync>>(()) |
636 | /// ``` |
637 | pub async fn interface<'p, P, I>(&self, path: P) -> Result<InterfaceRef<I>> |
638 | where |
639 | I: Interface, |
640 | P: TryInto<ObjectPath<'p>>, |
641 | P::Error: Into<Error>, |
642 | { |
643 | let path = path.try_into().map_err(Into::into)?; |
644 | let root = self.root().read().await; |
645 | let node = root.get_child(&path).ok_or(Error::InterfaceNotFound)?; |
646 | |
647 | let lock = node |
648 | .interface_lock(I::name()) |
649 | .ok_or(Error::InterfaceNotFound)? |
650 | .instance |
651 | .clone(); |
652 | |
653 | // Ensure what we return can later be dowcasted safely. |
654 | lock.read() |
655 | .await |
656 | .downcast_ref::<I>() |
657 | .ok_or(Error::InterfaceNotFound)?; |
658 | |
659 | let conn = self.connection(); |
660 | // SAFETY: We know that there is a valid path on the node as we already converted w/o error. |
661 | let ctxt = SignalContext::new(&conn, path).unwrap().into_owned(); |
662 | |
663 | Ok(InterfaceRef { |
664 | ctxt, |
665 | lock, |
666 | phantom: PhantomData, |
667 | }) |
668 | } |
669 | |
670 | async fn dispatch_call_to_iface( |
671 | &self, |
672 | iface: Arc<RwLock<dyn Interface>>, |
673 | connection: &Connection, |
674 | msg: &Message, |
675 | hdr: &Header<'_>, |
676 | ) -> fdo::Result<()> { |
677 | let member = hdr |
678 | .member() |
679 | .ok_or_else(|| fdo::Error::Failed("Missing member" .into()))?; |
680 | let iface_name = hdr |
681 | .interface() |
682 | .ok_or_else(|| fdo::Error::Failed("Missing interface" .into()))?; |
683 | |
684 | trace!("acquiring read lock on interface ` {}`" , iface_name); |
685 | let read_lock = iface.read().await; |
686 | trace!("acquired read lock on interface ` {}`" , iface_name); |
687 | match read_lock.call(self, connection, msg, member.as_ref()) { |
688 | DispatchResult::NotFound => { |
689 | return Err(fdo::Error::UnknownMethod(format!( |
690 | "Unknown method ' {member}'" |
691 | ))); |
692 | } |
693 | DispatchResult::Async(f) => { |
694 | return f.await.map_err(|e| match e { |
695 | Error::FDO(e) => *e, |
696 | e => fdo::Error::Failed(format!(" {e}" )), |
697 | }); |
698 | } |
699 | DispatchResult::RequiresMut => {} |
700 | } |
701 | drop(read_lock); |
702 | trace!("acquiring write lock on interface ` {}`" , iface_name); |
703 | let mut write_lock = iface.write().await; |
704 | trace!("acquired write lock on interface ` {}`" , iface_name); |
705 | match write_lock.call_mut(self, connection, msg, member.as_ref()) { |
706 | DispatchResult::NotFound => {} |
707 | DispatchResult::RequiresMut => {} |
708 | DispatchResult::Async(f) => { |
709 | return f.await.map_err(|e| match e { |
710 | Error::FDO(e) => *e, |
711 | e => fdo::Error::Failed(format!(" {e}" )), |
712 | }); |
713 | } |
714 | } |
715 | drop(write_lock); |
716 | Err(fdo::Error::UnknownMethod(format!( |
717 | "Unknown method ' {member}'" |
718 | ))) |
719 | } |
720 | |
721 | async fn dispatch_method_call_try( |
722 | &self, |
723 | connection: &Connection, |
724 | msg: &Message, |
725 | hdr: &Header<'_>, |
726 | ) -> fdo::Result<()> { |
727 | let path = hdr |
728 | .path() |
729 | .ok_or_else(|| fdo::Error::Failed("Missing object path" .into()))?; |
730 | let iface_name = hdr |
731 | .interface() |
732 | // TODO: In the absence of an INTERFACE field, if two or more interfaces on the same |
733 | // object have a method with the same name, it is undefined which of those |
734 | // methods will be invoked. Implementations may choose to either return an |
735 | // error, or deliver the message as though it had an arbitrary one of those |
736 | // interfaces. |
737 | .ok_or_else(|| fdo::Error::Failed("Missing interface" .into()))?; |
738 | // Check that the message has a member before spawning. |
739 | // Note that an unknown member will still spawn a task. We should instead gather |
740 | // all the details for the call before spawning. |
741 | // See also https://github.com/dbus2/zbus/issues/674 for future of Interface. |
742 | let _ = hdr |
743 | .member() |
744 | .ok_or_else(|| fdo::Error::Failed("Missing member" .into()))?; |
745 | |
746 | // Ensure the root lock isn't held while dispatching the message. That |
747 | // way, the object server can be mutated during that time. |
748 | let (iface, with_spawn) = { |
749 | let root = self.root.read().await; |
750 | let node = root |
751 | .get_child(path) |
752 | .ok_or_else(|| fdo::Error::UnknownObject(format!("Unknown object ' {path}'" )))?; |
753 | |
754 | let iface = node.interface_lock(iface_name.as_ref()).ok_or_else(|| { |
755 | fdo::Error::UnknownInterface(format!("Unknown interface ' {iface_name}'" )) |
756 | })?; |
757 | (iface.instance, iface.spawn_tasks_for_methods) |
758 | }; |
759 | |
760 | if with_spawn { |
761 | let executor = connection.executor().clone(); |
762 | let task_name = format!("` {msg}` method dispatcher" ); |
763 | let connection = connection.clone(); |
764 | let msg = msg.clone(); |
765 | executor |
766 | .spawn( |
767 | async move { |
768 | let server = connection.object_server(); |
769 | let hdr = msg.header(); |
770 | if let Err(e) = server |
771 | .dispatch_call_to_iface(iface, &connection, &msg, &hdr) |
772 | .await |
773 | { |
774 | // When not spawning a task, this error is handled by the caller. |
775 | debug!("Returning error: {}" , e); |
776 | if let Err(e) = connection.reply_dbus_error(&hdr, e).await { |
777 | debug!( |
778 | "Error dispatching message. Message: {:?}, error: {:?}" , |
779 | msg, e |
780 | ); |
781 | } |
782 | } |
783 | } |
784 | .instrument(trace_span!("{}" , task_name)), |
785 | &task_name, |
786 | ) |
787 | .detach(); |
788 | Ok(()) |
789 | } else { |
790 | self.dispatch_call_to_iface(iface, connection, msg, hdr) |
791 | .await |
792 | } |
793 | } |
794 | |
795 | /// Dispatch an incoming message to a registered interface. |
796 | /// |
797 | /// The object server will handle the message by: |
798 | /// |
799 | /// - looking up the called object path & interface, |
800 | /// |
801 | /// - calling the associated method if one exists, |
802 | /// |
803 | /// - returning a message (responding to the caller with either a return or error message) to |
804 | /// the caller through the associated server connection. |
805 | /// |
806 | /// Returns an error if the message is malformed. |
807 | #[instrument (skip(self))] |
808 | pub(crate) async fn dispatch_call(&self, msg: &Message, hdr: &Header<'_>) -> Result<()> { |
809 | let conn = self.connection(); |
810 | |
811 | if let Err(e) = self.dispatch_method_call_try(&conn, msg, hdr).await { |
812 | debug!("Returning error: {}" , e); |
813 | conn.reply_dbus_error(hdr, e).await?; |
814 | } |
815 | trace!("Handled: {}" , msg); |
816 | |
817 | Ok(()) |
818 | } |
819 | |
820 | pub(crate) fn connection(&self) -> Connection { |
821 | self.conn |
822 | .upgrade() |
823 | .expect("ObjectServer can't exist w/o an associated Connection" ) |
824 | } |
825 | } |
826 | |
827 | impl From<crate::blocking::ObjectServer> for ObjectServer { |
828 | fn from(server: crate::blocking::ObjectServer) -> Self { |
829 | server.into_inner() |
830 | } |
831 | } |
832 | |
833 | /// A response wrapper that notifies after response has been sent. |
834 | /// |
835 | /// Sometimes in [`interface`] method implemenations we need to do some other work after the |
836 | /// response has been sent off. This wrapper type allows us to do that. Instead of returning your |
837 | /// intended response type directly, wrap it in this type and return it from your method. The |
838 | /// returned `EventListener` from `new` method will be notified when the response has been sent. |
839 | /// |
840 | /// A typical use case is sending off signals after the response has been sent. The easiest way to |
841 | /// do that is to spawn a task from the method that sends the signal but only after being notified |
842 | /// of the response dispatch. |
843 | /// |
844 | /// # Caveats |
845 | /// |
846 | /// The notification indicates that the response has been sent off, not that destination peer has |
847 | /// received it. That can only be guaranteed for a peer-to-peer connection. |
848 | /// |
849 | /// [`interface`]: crate::interface |
850 | #[derive (Debug)] |
851 | pub struct ResponseDispatchNotifier<R> { |
852 | response: R, |
853 | event: Option<Event>, |
854 | } |
855 | |
856 | impl<R> ResponseDispatchNotifier<R> { |
857 | /// Create a new `NotifyResponse`. |
858 | pub fn new(response: R) -> (Self, EventListener) { |
859 | let event: Event = Event::new(); |
860 | let listener: EventListener = event.listen(); |
861 | ( |
862 | Self { |
863 | response, |
864 | event: Some(event), |
865 | }, |
866 | listener, |
867 | ) |
868 | } |
869 | |
870 | /// Get the response. |
871 | pub fn response(&self) -> &R { |
872 | &self.response |
873 | } |
874 | } |
875 | |
876 | impl<R> Serialize for ResponseDispatchNotifier<R> |
877 | where |
878 | R: Serialize, |
879 | { |
880 | fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error> |
881 | where |
882 | S: serde::Serializer, |
883 | { |
884 | self.response.serialize(serializer) |
885 | } |
886 | } |
887 | |
888 | impl<'de, R> Deserialize<'de> for ResponseDispatchNotifier<R> |
889 | where |
890 | R: Deserialize<'de>, |
891 | { |
892 | fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error> |
893 | where |
894 | D: serde::Deserializer<'de>, |
895 | { |
896 | Ok(Self { |
897 | response: R::deserialize(deserializer)?, |
898 | event: None, |
899 | }) |
900 | } |
901 | } |
902 | |
903 | impl<R> Type for ResponseDispatchNotifier<R> |
904 | where |
905 | R: Type, |
906 | { |
907 | fn signature() -> Signature<'static> { |
908 | R::signature() |
909 | } |
910 | } |
911 | |
912 | impl<T> Drop for ResponseDispatchNotifier<T> { |
913 | fn drop(&mut self) { |
914 | if let Some(event: Event) = self.event.take() { |
915 | event.notify(usize::MAX); |
916 | } |
917 | } |
918 | } |
919 | |