1 | //! Connections and proxies that make blocking method calls. |
2 | |
3 | |
4 | use crate::strings::{BusName, Path, Interface, Member}; |
5 | use crate::arg::{AppendAll, ReadAll, IterAppend}; |
6 | use crate::{channel, Error, Message}; |
7 | use crate::message::{MatchRule, SignalArgs, MessageType}; |
8 | use crate::channel::{Channel, BusType, Token}; |
9 | use std::{cell::RefCell, time::Duration, sync::Mutex}; |
10 | use std::sync::atomic::{AtomicBool, Ordering}; |
11 | use crate::filters::Filters; |
12 | |
13 | #[allow (missing_docs)] |
14 | mod generated_org_freedesktop_standard_interfaces; |
15 | mod generated_org_freedesktop_dbus; |
16 | |
17 | /// This module contains some standard interfaces and an easy way to call them. |
18 | /// |
19 | /// See the [D-Bus specification](https://dbus.freedesktop.org/doc/dbus-specification.html#standard-interfaces) for more information about these standard interfaces. |
20 | /// |
21 | /// The code was created by dbus-codegen. |
22 | pub mod stdintf { |
23 | #[allow (missing_docs)] |
24 | pub mod org_freedesktop_dbus { |
25 | pub use super::super::generated_org_freedesktop_standard_interfaces::*; |
26 | |
27 | #[derive (Debug, PartialEq, Eq, Copy, Clone)] |
28 | pub enum RequestNameReply { |
29 | PrimaryOwner = 1, |
30 | InQueue = 2, |
31 | Exists = 3, |
32 | AlreadyOwner = 4, |
33 | } |
34 | |
35 | #[derive (Debug, PartialEq, Eq, Copy, Clone)] |
36 | pub enum ReleaseNameReply { |
37 | Released = 1, |
38 | NonExistent = 2, |
39 | NotOwner = 3, |
40 | } |
41 | |
42 | #[derive (Debug, PartialEq, Eq, Copy, Clone)] |
43 | pub enum EmitsChangedSignal { |
44 | True, |
45 | Invalidates, |
46 | Const, |
47 | False, |
48 | } |
49 | |
50 | pub (crate) fn request_name<S: crate::blocking::BlockingSender>(s: &S, name: &str, allow_replacement: bool, replace_existing: bool, do_not_queue: bool) |
51 | -> Result<RequestNameReply, crate::Error> { |
52 | let flags: u32 = |
53 | if allow_replacement { 1 } else { 0 } + |
54 | if replace_existing { 2 } else { 0 } + |
55 | if do_not_queue { 4 } else { 0 }; |
56 | let proxy = super::proxy(s); |
57 | use super::org_freedesktop::DBus; |
58 | let r = proxy.request_name(name, flags)?; |
59 | use RequestNameReply::*; |
60 | let all = [PrimaryOwner, InQueue, Exists, AlreadyOwner]; |
61 | all.iter().find(|x| **x as u32 == r).copied().ok_or_else(|| |
62 | crate::Error::new_failed("Invalid reply from DBus server" ) |
63 | ) |
64 | } |
65 | |
66 | pub (crate) fn release_name<S: crate::blocking::BlockingSender>(s: &S, name: &str) |
67 | -> Result<ReleaseNameReply, crate::Error> { |
68 | |
69 | let proxy = super::proxy(s); |
70 | use super::org_freedesktop::DBus; |
71 | let r = proxy.release_name(name)?; |
72 | use ReleaseNameReply::*; |
73 | let all = [Released, NonExistent, NotOwner]; |
74 | all.iter().find(|x| **x as u32 == r).copied().ok_or_else(|| |
75 | crate::Error::new_failed("Invalid reply from DBus server" ) |
76 | ) |
77 | } |
78 | |
79 | use crate::arg; |
80 | impl PropertiesPropertiesChanged { |
81 | pub fn add_prop<F: FnOnce() -> Box<dyn arg::RefArg>>(&mut self, prop_name: &str, emits: EmitsChangedSignal, f: F) -> bool { |
82 | match emits { |
83 | EmitsChangedSignal::False => { false }, |
84 | EmitsChangedSignal::Invalidates => { |
85 | if !self.invalidated_properties.iter().any(|x| x == prop_name) { |
86 | self.invalidated_properties.push(prop_name.into()) |
87 | } |
88 | true |
89 | } |
90 | EmitsChangedSignal::True => { |
91 | let val = f(); |
92 | self.changed_properties.insert(prop_name.into(), arg::Variant(val)); |
93 | true |
94 | } |
95 | EmitsChangedSignal::Const => panic!("Called add_prop with EmitsChangedSignal::Const" ) |
96 | } |
97 | } |
98 | } |
99 | } |
100 | |
101 | // Not public yet, because of lack of named arguments |
102 | pub (super) mod org_freedesktop { |
103 | pub(crate) use super::super::generated_org_freedesktop_dbus::*; |
104 | } |
105 | |
106 | pub (crate) fn proxy<C>(c: C) -> crate::blocking::Proxy<'static, C> { |
107 | super::Proxy::new("org.freedesktop.DBus" , "/org/freedesktop/DBus" , std::time::Duration::from_millis(5000), c) |
108 | } |
109 | } |
110 | |
111 | /// A connection to D-Bus, thread local + non-async version |
112 | pub struct LocalConnection { |
113 | channel: Channel, |
114 | filters: RefCell<Filters<LocalFilterCb>>, |
115 | all_signal_matches: AtomicBool, |
116 | } |
117 | |
118 | /// A connection to D-Bus, non-async version where callbacks are Send but not Sync. |
119 | pub struct Connection { |
120 | channel: Channel, |
121 | filters: RefCell<Filters<FilterCb>>, |
122 | all_signal_matches: AtomicBool, |
123 | } |
124 | |
125 | /// A connection to D-Bus, Send + Sync + non-async version |
126 | pub struct SyncConnection { |
127 | channel: Channel, |
128 | filters: Mutex<Filters<SyncFilterCb>>, |
129 | all_signal_matches: AtomicBool, |
130 | } |
131 | |
132 | use crate::blocking::stdintf::org_freedesktop_dbus; |
133 | |
134 | macro_rules! connimpl { |
135 | ($c: ident, $cb: ident $(, $ss:tt)*) => { |
136 | |
137 | type |
138 | $cb = Box<dyn FnMut(Message, &$c) -> bool $(+ $ss)* + 'static>; |
139 | |
140 | |
141 | impl $c { |
142 | |
143 | /// Create a new connection to the session bus. |
144 | pub fn new_session() -> Result<Self, Error> { |
145 | Channel::get_private(BusType::Session).map(From::from) |
146 | } |
147 | |
148 | /// Create a new connection to the system-wide bus. |
149 | pub fn new_system() -> Result<Self, Error> { |
150 | Channel::get_private(BusType::System).map(From::from) |
151 | } |
152 | |
153 | /// Get the connection's unique name. |
154 | /// |
155 | /// It's usually something like ":1.54" |
156 | pub fn unique_name(&self) -> BusName { self.channel.unique_name().unwrap().into() } |
157 | |
158 | /// Create a convenience struct for easier calling of many methods on the same destination and path. |
159 | pub fn with_proxy<'a, 'b, D: Into<BusName<'a>>, P: Into<Path<'a>>>(&'b self, dest: D, path: P, timeout: Duration) -> |
160 | Proxy<'a, &'b Self> { |
161 | Proxy { connection: self, destination: dest.into(), path: path.into(), timeout } |
162 | } |
163 | |
164 | |
165 | /// Request a name on the D-Bus. |
166 | /// |
167 | /// For detailed information on the flags and return values, see the libdbus documentation. |
168 | pub fn request_name<'a, N: Into<BusName<'a>>>(&self, name: N, allow_replacement: bool, replace_existing: bool, do_not_queue: bool) |
169 | -> Result<org_freedesktop_dbus::RequestNameReply, Error> { |
170 | org_freedesktop_dbus::request_name(&self.channel, &name.into(), allow_replacement, replace_existing, do_not_queue) |
171 | } |
172 | |
173 | /// Release a previously requested name on the D-Bus. |
174 | pub fn release_name<'a, N: Into<BusName<'a>>>(&self, name: N) -> Result<org_freedesktop_dbus::ReleaseNameReply, Error> { |
175 | org_freedesktop_dbus::release_name(&self.channel, &name.into()) |
176 | } |
177 | |
178 | /// Adds a new match to the connection, and sets up a callback when this message arrives. |
179 | /// |
180 | /// If multiple [`MatchRule`]s match the same message, then by default only the first match will |
181 | /// get the callback. This behaviour can be changed for signal messages by calling |
182 | /// [`set_signal_match_mode`](Self::set_signal_match_mode). |
183 | /// |
184 | /// The returned value can be used to remove the match. The match is also removed if the callback |
185 | /// returns "false". |
186 | pub fn add_match<S: ReadAll, F>(&self, match_rule: MatchRule<'static>, f: F) -> Result<Token, Error> |
187 | where F: FnMut(S, &Self, &Message) -> bool $(+ $ss)* + 'static { |
188 | let m = match_rule.match_str(); |
189 | self.add_match_no_cb(&m)?; |
190 | use channel::MatchingReceiver; |
191 | Ok(self.start_receive(match_rule, MakeSignal::make(f, m))) |
192 | } |
193 | |
194 | /// Adds a new match to the connection, without setting up a callback when this message arrives. |
195 | pub fn add_match_no_cb(&self, match_str: &str) -> Result<(), Error> { |
196 | use crate::blocking::stdintf::org_freedesktop::DBus; |
197 | let proxy = stdintf::proxy(self); |
198 | proxy.add_match(match_str) |
199 | } |
200 | |
201 | /// Removes a match from the connection, without removing any callbacks. |
202 | pub fn remove_match_no_cb(&self, match_str: &str) -> Result<(), Error> { |
203 | use crate::blocking::stdintf::org_freedesktop::DBus; |
204 | let proxy = stdintf::proxy(self); |
205 | proxy.remove_match(match_str) |
206 | } |
207 | |
208 | /// Removes a previously added match and callback from the connection. |
209 | pub fn remove_match(&self, id: Token) -> Result<(), Error> { |
210 | use channel::MatchingReceiver; |
211 | let (mr, _) = self.stop_receive(id).ok_or_else(|| Error::new_failed("No match with that id found" ))?; |
212 | self.remove_match_no_cb(&mr.match_str()) |
213 | } |
214 | |
215 | /// If true, configures the connection to send signal messages to all matching [`MatchRule`] |
216 | /// filters added with [`add_match`](Self::add_match) rather than just the first one. This comes |
217 | /// with the following gotchas: |
218 | /// |
219 | /// * The messages might be duplicated, so the message serial might be lost (this is |
220 | /// generally not a problem for signals). |
221 | /// * Panicking inside a match callback might mess with other callbacks, causing them |
222 | /// to be permanently dropped. |
223 | /// * Removing other matches from inside a match callback is not supported. |
224 | /// |
225 | /// This is false by default, for a newly-created connection. |
226 | pub fn set_signal_match_mode(&self, match_all: bool) { |
227 | self.all_signal_matches.store(match_all, Ordering::Release); |
228 | } |
229 | |
230 | /// Tries to handle an incoming message if there is one. If there isn't one, |
231 | /// it will wait up to timeout |
232 | /// |
233 | /// This method only takes "&self" instead of "&mut self", but it is a logic error to call |
234 | /// it recursively and might lead to panics or deadlocks. |
235 | /// |
236 | /// For `SyncConnection`: It is also a logic error to call this method from one thread, while |
237 | /// calling this or other methods from other threads. This can lead to messages being lost. |
238 | pub fn process(&self, timeout: Duration) -> Result<bool, Error> { |
239 | if let Some(msg) = self.channel.blocking_pop_message(timeout)? { |
240 | if self.all_signal_matches.load(Ordering::Acquire) && msg.msg_type() == MessageType::Signal { |
241 | // If it's a signal and the mode is enabled, send a copy of the message to all |
242 | // matching filters. |
243 | let matching_filters = self.filters_mut().remove_all_matching(&msg); |
244 | // `matching_filters` needs to be a separate variable and not inlined here, because |
245 | // if it's inline then the `MutexGuard` will live too long and we'll get a deadlock |
246 | // on the next call to `filters_mut()` below. |
247 | for mut ff in matching_filters { |
248 | if let Ok(copy) = msg.duplicate() { |
249 | if ff.2(copy, self) { |
250 | self.filters_mut().insert(ff); |
251 | } |
252 | } else { |
253 | // Silently drop the message, but add the filter back. |
254 | self.filters_mut().insert(ff); |
255 | } |
256 | } |
257 | } else { |
258 | // Otherwise, send the original message to only the first matching filter. |
259 | let ff = self.filters_mut().remove_first_matching(&msg); |
260 | if let Some(mut ff) = ff { |
261 | if ff.2(msg, self) { |
262 | self.filters_mut().insert(ff); |
263 | } |
264 | } else if let Some(reply) = crate::channel::default_reply(&msg) { |
265 | let _ = self.channel.send(reply); |
266 | } |
267 | } |
268 | Ok(true) |
269 | } else { |
270 | Ok(false) |
271 | } |
272 | } |
273 | |
274 | /// The channel for this connection |
275 | pub fn channel(&self) -> &Channel { |
276 | &self.channel |
277 | } |
278 | } |
279 | |
280 | impl BlockingSender for $c { |
281 | fn send_with_reply_and_block(&self, msg: Message, timeout: Duration) -> Result<Message, Error> { |
282 | self.channel.send_with_reply_and_block(msg, timeout) |
283 | } |
284 | } |
285 | |
286 | impl From<Channel> for $c { |
287 | fn from(channel: Channel) -> $c { $c { |
288 | channel, |
289 | filters: Default::default(), |
290 | all_signal_matches: AtomicBool::new(false), |
291 | } } |
292 | } |
293 | |
294 | impl channel::Sender for $c { |
295 | fn send(&self, msg: Message) -> Result<u32, ()> { self.channel.send(msg) } |
296 | } |
297 | |
298 | impl<S: ReadAll, F: FnMut(S, &$c, &Message) -> bool $(+ $ss)* + 'static> MakeSignal<$cb, S, $c> for F { |
299 | fn make(mut self, mstr: String) -> $cb { |
300 | Box::new(move |msg: Message, conn: &$c| { |
301 | if let Ok(s) = S::read(&mut msg.iter_init()) { |
302 | if self(s, conn, &msg) { return true }; |
303 | let proxy = stdintf::proxy(conn); |
304 | use crate::blocking::stdintf::org_freedesktop::DBus; |
305 | let _ = proxy.remove_match(&mstr); |
306 | false |
307 | } else { true } |
308 | }) |
309 | } |
310 | } |
311 | |
312 | impl channel::MatchingReceiver for $c { |
313 | type F = $cb; |
314 | fn start_receive(&self, m: MatchRule<'static>, f: Self::F) -> Token { |
315 | self.filters_mut().add(m, f) |
316 | } |
317 | fn stop_receive(&self, id: Token) -> Option<(MatchRule<'static>, Self::F)> { |
318 | self.filters_mut().remove(id) |
319 | } |
320 | } |
321 | |
322 | |
323 | |
324 | } |
325 | } |
326 | |
327 | connimpl!(Connection, FilterCb, Send); |
328 | connimpl!(LocalConnection, LocalFilterCb); |
329 | connimpl!(SyncConnection, SyncFilterCb, Send, Sync); |
330 | |
331 | impl Connection { |
332 | fn filters_mut(&self) -> std::cell::RefMut<Filters<FilterCb>> { self.filters.borrow_mut() } |
333 | } |
334 | |
335 | impl LocalConnection { |
336 | fn filters_mut(&self) -> std::cell::RefMut<Filters<LocalFilterCb>> { self.filters.borrow_mut() } |
337 | } |
338 | |
339 | impl SyncConnection { |
340 | fn filters_mut(&self) -> std::sync::MutexGuard<Filters<SyncFilterCb>> { self.filters.lock().unwrap() } |
341 | } |
342 | |
343 | /// Abstraction over different connections |
344 | pub trait BlockingSender { |
345 | /// Sends a message over the D-Bus and blocks, waiting for a reply or a timeout. This is used for method calls. |
346 | /// |
347 | /// Note: In case of an error reply, this is returned as an Err(), not as a Ok(Message) with the error type. |
348 | fn send_with_reply_and_block(&self, msg: Message, timeout: Duration) -> Result<Message, Error>; |
349 | } |
350 | |
351 | impl BlockingSender for Channel { |
352 | fn send_with_reply_and_block(&self, msg: Message, timeout: Duration) -> Result<Message, Error> { |
353 | Channel::send_with_reply_and_block(self, msg, timeout) |
354 | } |
355 | } |
356 | |
357 | /// A struct that wraps a connection, destination and path. |
358 | /// |
359 | /// A D-Bus "Proxy" is a client-side object that corresponds to a remote object on the server side. |
360 | /// Calling methods on the proxy object calls methods on the remote object. |
361 | /// Read more in the [D-Bus tutorial](https://dbus.freedesktop.org/doc/dbus-tutorial.html#proxies) |
362 | #[derive (Clone, Debug)] |
363 | pub struct Proxy<'a, C> { |
364 | /// Destination, i e what D-Bus service you're communicating with |
365 | pub destination: BusName<'a>, |
366 | /// Object path on the destination |
367 | pub path: Path<'a>, |
368 | /// Timeout for method calls |
369 | pub timeout: Duration, |
370 | /// Some way to send and/or receive messages, either blocking or non-blocking. |
371 | pub connection: C, |
372 | } |
373 | |
374 | impl<'a, C> Proxy<'a, C> { |
375 | /// Creates a new proxy struct. |
376 | pub fn new<D: Into<BusName<'a>>, P: Into<Path<'a>>>(dest: D, path: P, timeout: Duration, connection: C) -> Self { |
377 | Proxy { destination: dest.into(), path: path.into(), timeout, connection } |
378 | } |
379 | } |
380 | |
381 | impl<'a, T: BlockingSender, C: std::ops::Deref<Target=T>> Proxy<'a, C> { |
382 | // impl<'a, S: std::convert::AsRef<channel::Sender>> Proxy<'a, S> { |
383 | /// Make a method call using typed input and output arguments, then block waiting for a reply. |
384 | /// |
385 | /// # Example |
386 | /// |
387 | /// ``` |
388 | /// use dbus::blocking::{Connection, Proxy}; |
389 | /// |
390 | /// let conn = Connection::new_session()?; |
391 | /// let proxy = Proxy::new("org.freedesktop.DBus" , "/" , std::time::Duration::from_millis(5000), &conn); |
392 | /// let (has_owner,): (bool,) = proxy.method_call("org.freedesktop.DBus" , "NameHasOwner" , ("dummy.name.without.owner" ,))?; |
393 | /// assert_eq!(has_owner, false); |
394 | /// # Ok::<(), Box<dyn std::error::Error>>(()) |
395 | /// ``` |
396 | pub fn method_call<'i, 'm, R: ReadAll, A: AppendAll, I: Into<Interface<'i>>, M: Into<Member<'m>>>(&self, i: I, m: M, args: A) -> Result<R, Error> { |
397 | let mut msg = Message::method_call(&self.destination, &self.path, &i.into(), &m.into()); |
398 | args.append(&mut IterAppend::new(&mut msg)); |
399 | let r = self.connection.send_with_reply_and_block(msg, self.timeout)?; |
400 | Ok(R::read(&mut r.iter_init())?) |
401 | } |
402 | |
403 | /// Starts matching incoming messages on this destination and path. |
404 | /// |
405 | /// For matching signals, match_signal might be more convenient. |
406 | /// |
407 | /// The match rule will be modified to include this path and destination only. |
408 | /// |
409 | /// If call_add_match is true, will notify the D-Bus server that matching should start. |
410 | pub fn match_start(&self, mut mr: MatchRule<'static>, call_add_match: bool, f: <T as channel::MatchingReceiver>::F) |
411 | -> Result<Token, Error> |
412 | where T: channel::MatchingReceiver { |
413 | mr.path = Some(self.path.clone().into_static()); |
414 | mr.sender = Some(self.destination.clone().into_static()); |
415 | if call_add_match { |
416 | use crate::blocking::stdintf::org_freedesktop::DBus; |
417 | let proxy = stdintf::proxy(&*self.connection); |
418 | proxy.add_match(&mr.match_str())?; |
419 | } |
420 | |
421 | Ok(self.connection.start_receive(mr, f)) |
422 | } |
423 | |
424 | /// Stops matching a signal added with match_start or match_signal. |
425 | /// |
426 | /// If call_remove_match is true, will notify the D-Bus server that matching should stop, |
427 | /// this should be true in case match_signal was used. |
428 | pub fn match_stop(&self, id: Token, call_remove_match: bool) -> Result<(), Error> |
429 | where T: channel::MatchingReceiver { |
430 | if let Some((mr, _)) = self.connection.stop_receive(id) { |
431 | if call_remove_match { |
432 | use crate::blocking::stdintf::org_freedesktop::DBus; |
433 | let proxy = stdintf::proxy(&*self.connection); |
434 | proxy.remove_match(&mr.match_str())?; |
435 | } |
436 | } |
437 | Ok(()) |
438 | } |
439 | |
440 | /// Sets up an incoming signal match, that calls the supplied callback every time the signal is received. |
441 | /// |
442 | /// The returned value can be used to remove the match. The match is also removed if the callback |
443 | /// returns "false". |
444 | pub fn match_signal<S: SignalArgs + ReadAll, F>(&self, f: F) -> Result<Token, Error> |
445 | where T: channel::MatchingReceiver, |
446 | F: MakeSignal<<T as channel::MatchingReceiver>::F, S, T> |
447 | { |
448 | let mr = S::match_rule(Some(&self.destination), Some(&self.path)).static_clone(); |
449 | let ff = f.make(mr.match_str()); |
450 | self.match_start(mr, true, ff) |
451 | } |
452 | } |
453 | |
454 | /// Internal helper trait |
455 | pub trait MakeSignal<G, S, T> { |
456 | /// Internal helper trait |
457 | fn make(self, mstr: String) -> G; |
458 | } |
459 | |
460 | #[test ] |
461 | fn test_add_match() { |
462 | use self::stdintf::org_freedesktop_dbus::PropertiesPropertiesChanged as Ppc; |
463 | let c = Connection::new_session().unwrap(); |
464 | let x = c.add_match(Ppc::match_rule(None, None), |_: Ppc, _, _| { true }).unwrap(); |
465 | c.remove_match(x).unwrap(); |
466 | } |
467 | |
468 | #[test ] |
469 | fn test_conn_send_sync() { |
470 | fn is_send<T: Send>(_: &T) {} |
471 | fn is_sync<T: Sync>(_: &T) {} |
472 | |
473 | let c = SyncConnection::new_session().unwrap(); |
474 | is_send(&c); |
475 | is_sync(&c); |
476 | |
477 | let c = Connection::new_session().unwrap(); |
478 | is_send(&c); |
479 | } |
480 | |
481 | #[test ] |
482 | fn test_peer() { |
483 | let c = Connection::new_session().unwrap(); |
484 | |
485 | let c_name = c.unique_name().into_static(); |
486 | use std::sync::Arc; |
487 | let done = Arc::new(false); |
488 | let d2 = done.clone(); |
489 | let j = std::thread::spawn(move || { |
490 | let c2 = Connection::new_session().unwrap(); |
491 | |
492 | let proxy = c2.with_proxy(c_name, "/" , Duration::from_secs(5)); |
493 | let (s2,): (String,) = proxy.method_call("org.freedesktop.DBus.Peer" , "GetMachineId" , ()).unwrap(); |
494 | println!("{}" , s2); |
495 | assert_eq!(Arc::strong_count(&d2), 2); |
496 | s2 |
497 | }); |
498 | assert_eq!(Arc::strong_count(&done), 2); |
499 | |
500 | for _ in 0..30 { |
501 | c.process(Duration::from_millis(100)).unwrap(); |
502 | if Arc::strong_count(&done) < 2 { break; } |
503 | } |
504 | |
505 | let s2 = j.join().unwrap(); |
506 | |
507 | #[cfg (unix)] |
508 | { |
509 | let proxy = c.with_proxy("org.a11y.Bus" , "/org/a11y/bus" , Duration::from_secs(5)); |
510 | let (s1,): (String,) = proxy.method_call("org.freedesktop.DBus.Peer" , "GetMachineId" , ()).unwrap(); |
511 | |
512 | assert_eq!(s1, s2); |
513 | } |
514 | |
515 | } |
516 | |