1#![deny(rust_2018_idioms)]
2#![doc(
3 html_logo_url = "https://storage.googleapis.com/fdo-gitlab-uploads/project/avatar/3213/zbus-logomark.png"
4)]
5#![doc = include_str!("../README.md")]
6#![doc(test(attr(
7 warn(unused),
8 deny(warnings),
9 // W/o this, we seem to get some bogus warning about `extern crate zbus`.
10 allow(unused_extern_crates),
11)))]
12
// Pulls the chapters of the book in as documentation so that the code samples they contain are
// checked. Only compiled under `cfg(doctest)`.
#[cfg(doctest)]
mod doctests {
    // One entry per book chapter, kept in alphabetical order.
    doc_comment::doctest!("../../book/src/blocking.md");
    doc_comment::doctest!("../../book/src/client.md");
    doc_comment::doctest!("../../book/src/concepts.md");
    doc_comment::doctest!("../../book/src/connection.md");
    doc_comment::doctest!("../../book/src/contributors.md");
    doc_comment::doctest!("../../book/src/faq.md");
    doc_comment::doctest!("../../book/src/introduction.md");
    doc_comment::doctest!("../../book/src/server.md");
}
25
26#[cfg(all(not(feature = "async-io"), not(feature = "tokio")))]
27mod error_message {
28 #[cfg(windows)]
29 compile_error!("Either \"async-io\" (default) or \"tokio\" must be enabled. On Windows \"async-io\" is (currently) required for UNIX socket support");
30
31 #[cfg(not(windows))]
32 compile_error!("Either \"async-io\" (default) or \"tokio\" must be enabled.");
33}
34
35#[cfg(windows)]
36mod win32;
37
38mod dbus_error;
39pub use dbus_error::*;
40
41mod error;
42pub use error::*;
43
44mod address;
45pub use address::*;
46
47mod guid;
48pub use guid::*;
49
50mod message;
51pub use message::*;
52
53mod message_builder;
54pub use message_builder::*;
55
56mod message_header;
57pub use message_header::*;
58
59mod message_field;
60pub use message_field::*;
61
62mod message_fields;
63pub use message_fields::*;
64
65mod handshake;
66pub use handshake::AuthMechanism;
67pub(crate) use handshake::*;
68
69mod connection;
70pub use connection::*;
71mod connection_builder;
72pub use connection_builder::*;
73mod message_stream;
74pub use message_stream::*;
75mod object_server;
76pub use object_server::*;
77mod proxy;
78pub use proxy::*;
79mod proxy_builder;
80pub use proxy_builder::*;
81mod signal_context;
82pub use signal_context::*;
83mod interface;
84pub use interface::*;
85mod abstractions;
86pub use abstractions::*;
87mod match_rule;
88pub use match_rule::*;
89mod match_rule_builder;
90pub use match_rule_builder::*;
91mod socket_reader;
92
93mod utils;
94pub use utils::*;
95
96#[macro_use]
97pub mod fdo;
98
99mod raw;
100pub use raw::Socket;
101
102pub mod blocking;
103
104#[cfg(feature = "xml")]
105pub mod xml;
106
107#[cfg(feature = "quick-xml")]
108pub mod quick_xml;
109
110pub use zbus_macros::{dbus_interface, dbus_proxy, DBusError};
111
112// Required for the macros to function within this crate.
113extern crate self as zbus;
114
115// Macro support module, not part of the public API.
116#[doc(hidden)]
117pub mod export {
118 pub use async_trait;
119 pub use futures_core;
120 pub use futures_util;
121 pub use ordered_stream;
122 pub use serde;
123 pub use static_assertions;
124}
125
126pub use zbus_names as names;
127pub use zvariant;
128
129#[cfg(unix)]
130use zvariant::OwnedFd;
131
132#[cfg(test)]
133mod tests {
134 use std::{
135 collections::HashMap,
136 convert::{TryFrom, TryInto},
137 sync::{mpsc::channel, Arc, Condvar, Mutex},
138 };
139 #[cfg(unix)]
140 use std::{
141 fs::File,
142 os::unix::io::{AsRawFd, FromRawFd},
143 };
144
145 use crate::utils::block_on;
146 use enumflags2::BitFlags;
147 use ntest::timeout;
148 use test_log::test;
149 use tracing::{debug, instrument, trace};
150
151 use zbus_names::UniqueName;
152 #[cfg(unix)]
153 use zvariant::Fd;
154 use zvariant::{OwnedObjectPath, OwnedValue, Type};
155
156 use crate::{
157 blocking::{self, MessageIterator},
158 fdo::{RequestNameFlags, RequestNameReply},
159 Connection, Message, MessageFlags, Result, SignalContext,
160 };
161
// Reports whether the `ZBUS_GDBUS_TEST` environment variable is set (to any value, including
// an empty one). Tests use this to relax expectations that do not hold for GDBus.
fn is_gdbus_test() -> bool {
    const GDBUS_TEST_VAR: &str = "ZBUS_GDBUS_TEST";

    let marker = std::env::var_os(GDBUS_TEST_VAR);
    marker.is_some()
}
165
// Builds a method-call message, checks its header fields and then verifies that changes made
// through `modify_primary_header` are visible in the primary header.
#[test]
fn msg() {
    const PATH: &str = "/org/freedesktop/DBus";
    const INTERFACE: &str = "org.freedesktop.DBus.Peer";
    const MEMBER: &str = "GetMachineId";

    let mut message = Message::method(
        None::<()>,
        Some("org.freedesktop.DBus"),
        PATH,
        Some(INTERFACE),
        MEMBER,
        &(),
    )
    .unwrap();

    assert_eq!(message.path().unwrap(), PATH);
    assert_eq!(message.interface().unwrap(), INTERFACE);
    assert_eq!(message.member().unwrap(), MEMBER);

    message
        .modify_primary_header(|header| {
            header.set_flags(BitFlags::from(MessageFlags::NoAutoStart));
            header.serial_num_or_init(|| 11);

            Ok(())
        })
        .unwrap();

    let header = message.primary_header();
    let serial = *header.serial_num().unwrap();
    assert!(serial == 11);
    assert!(header.flags() == MessageFlags::NoAutoStart);
}
191
// Connects to the session bus (blocking API) and calls `Hello` a second time.
#[test]
#[timeout(15000)]
#[instrument]
fn basic_connection() {
    let session = blocking::Connection::session();
    if let Err(e) = &session {
        debug!("error: {}", e);
    }
    let connection = session.unwrap();

    // Hello method is already called during connection creation so subsequent calls are
    // expected to fail but only with a D-Bus error.
    let outcome = connection.call_method(
        Some("org.freedesktop.DBus"),
        "/org/freedesktop/DBus",
        Some("org.freedesktop.DBus"),
        "Hello",
        &(),
    );

    match outcome {
        Err(crate::Error::MethodError(..)) => {}
        Err(other) => panic!("{}", other),

        // GDBus allows the method to be called multiple times
        Ok(_) if is_gdbus_test() => {}

        Ok(_) => panic!(),
    }
}
221
// Synchronous entry point that drives `test_basic_connection` to completion.
#[test]
#[timeout(15000)]
fn basic_connection_async() {
    let outcome = block_on(test_basic_connection());
    outcome.unwrap();
}
227
228 async fn test_basic_connection() -> Result<()> {
229 let connection = Connection::session().await?;
230
231 match connection
232 .call_method(
233 Some("org.freedesktop.DBus"),
234 "/org/freedesktop/DBus",
235 Some("org.freedesktop.DBus"),
236 "Hello",
237 &(),
238 )
239 .await
240 {
241 Err(crate::Error::MethodError(_, _, _)) => (),
242 Err(e) => panic!("{}", e),
243
244 // GDBus allows the method to be called multiple times
245 Ok(_) if is_gdbus_test() => (),
246
247 _ => panic!(),
248 };
249
250 Ok(())
251 }
252
// Calls systemd's `DumpByFileDescriptor` on the system bus and checks that the reply body is a
// file descriptor that can actually be used.
#[cfg(all(unix, not(target_os = "macos")))]
#[test]
#[timeout(15000)]
fn fdpass_systemd() {
    let connection = blocking::Connection::system().unwrap();

    let reply = connection
        .call_method(
            Some("org.freedesktop.systemd1"),
            "/org/freedesktop/systemd1",
            Some("org.freedesktop.systemd1.Manager"),
            "DumpByFileDescriptor",
            &(),
        )
        .unwrap();

    assert!(reply
        .body_signature()
        .map(|s| s == <Fd>::signature())
        .unwrap());

    let fd: Fd = reply.body().unwrap();
    // Keep whatever `take_fds` hands back alive until the end of the test so the descriptor
    // stays open while we inspect it.
    let _fds = reply.take_fds();
    assert!(fd.as_raw_fd() >= 0);

    // `File::from_raw_fd` assumes ownership of the descriptor and would close it on drop, yet
    // `fd` is only a raw view of a descriptor that is still held elsewhere (presumably by
    // `_fds` after `take_fds` -- confirm against `Message::take_fds`). Wrapping the `File` in
    // `ManuallyDrop` lets us borrow it for the `metadata` call without closing the descriptor
    // a second time.
    //
    // SAFETY: the descriptor was just checked to be non-negative, it is kept open by `_fds`
    // for the whole lifetime of `f`, and `f` is never dropped so ownership is not duplicated.
    let f = std::mem::ManuallyDrop::new(unsafe { File::from_raw_fd(fd.as_raw_fd()) });
    f.metadata().unwrap();
}
280
// Exercises several `org.freedesktop.DBus` methods through the blocking connection and checks
// both the signature and the decoded body of each reply.
#[test]
#[instrument]
#[timeout(15000)]
fn freedesktop_api() {
    const BUS: &str = "org.freedesktop.DBus";
    const BUS_PATH: &str = "/org/freedesktop/DBus";
    const WELL_KNOWN_NAME: &str = "org.freedesktop.zbus.sync";

    let session = blocking::Connection::session();
    if let Err(e) = &session {
        debug!("error: {}", e);
    }
    let connection = session.unwrap();

    // RequestName: we expect to become the primary owner of our well-known name.
    let name_reply = connection
        .call_method(
            Some(BUS),
            BUS_PATH,
            Some(BUS),
            "RequestName",
            &(
                WELL_KNOWN_NAME,
                BitFlags::from(RequestNameFlags::ReplaceExisting),
            ),
        )
        .unwrap();
    assert!(name_reply.body_signature().unwrap() == "u");
    let ownership: RequestNameReply = name_reply.body().unwrap();
    assert_eq!(ownership, RequestNameReply::PrimaryOwner);

    // GetId: the reply is a single string.
    let id_reply = connection
        .call_method(Some(BUS), BUS_PATH, Some(BUS), "GetId", &())
        .unwrap();
    assert!(id_reply.body_signature().unwrap() == <&str>::signature());
    let id: &str = id_reply.body().unwrap();
    debug!("Unique ID of the bus: {}", id);

    // NameHasOwner: the name requested above must be owned.
    let has_owner_reply = connection
        .call_method(
            Some(BUS),
            BUS_PATH,
            Some(BUS),
            "NameHasOwner",
            &WELL_KNOWN_NAME,
        )
        .unwrap();
    assert!(has_owner_reply.body_signature().unwrap() == bool::signature());
    assert!(has_owner_reply.body::<bool>().unwrap());

    // GetNameOwner: the owner must be this very connection.
    let owner_reply = connection
        .call_method(
            Some(BUS),
            BUS_PATH,
            Some(BUS),
            "GetNameOwner",
            &WELL_KNOWN_NAME,
        )
        .unwrap();
    assert!(owner_reply.body_signature().unwrap() == <&str>::signature());
    assert_eq!(
        owner_reply.body::<UniqueName<'_>>().unwrap(),
        *connection.unique_name().unwrap(),
    );

    // GDBus doesn't provide this method
    if is_gdbus_test() {
        return;
    }

    let credentials_reply = connection
        .call_method(
            Some(BUS),
            BUS_PATH,
            Some(BUS),
            "GetConnectionCredentials",
            &BUS,
        )
        .unwrap();
    assert!(credentials_reply.body_signature().unwrap() == "a{sv}");
    let credentials: HashMap<&str, OwnedValue> = credentials_reply.body().unwrap();

    let pid = u32::try_from(&credentials["ProcessID"]).unwrap();
    debug!("DBus bus PID: {}", pid);

    #[cfg(unix)]
    {
        let uid = u32::try_from(&credentials["UnixUserID"]).unwrap();
        debug!("DBus bus UID: {}", uid);
    }
}
389
// Synchronous entry point that drives `test_freedesktop_api` to completion.
#[test]
#[timeout(15000)]
fn freedesktop_api_async() {
    let outcome = block_on(test_freedesktop_api());
    outcome.unwrap();
}
395
396 #[instrument]
397 async fn test_freedesktop_api() -> Result<()> {
398 let connection = Connection::session().await?;
399
400 let reply = connection
401 .call_method(
402 Some("org.freedesktop.DBus"),
403 "/org/freedesktop/DBus",
404 Some("org.freedesktop.DBus"),
405 "RequestName",
406 &(
407 "org.freedesktop.zbus.async",
408 BitFlags::from(RequestNameFlags::ReplaceExisting),
409 ),
410 )
411 .await
412 .unwrap();
413
414 assert!(reply.body_signature().map(|s| s == "u").unwrap());
415 let reply: RequestNameReply = reply.body().unwrap();
416 assert_eq!(reply, RequestNameReply::PrimaryOwner);
417
418 let reply = connection
419 .call_method(
420 Some("org.freedesktop.DBus"),
421 "/org/freedesktop/DBus",
422 Some("org.freedesktop.DBus"),
423 "GetId",
424 &(),
425 )
426 .await
427 .unwrap();
428
429 assert!(reply
430 .body_signature()
431 .map(|s| s == <&str>::signature())
432 .unwrap());
433 let id: &str = reply.body().unwrap();
434 debug!("Unique ID of the bus: {}", id);
435
436 let reply = connection
437 .call_method(
438 Some("org.freedesktop.DBus"),
439 "/org/freedesktop/DBus",
440 Some("org.freedesktop.DBus"),
441 "NameHasOwner",
442 &"org.freedesktop.zbus.async",
443 )
444 .await
445 .unwrap();
446
447 assert!(reply
448 .body_signature()
449 .map(|s| s == bool::signature())
450 .unwrap());
451 assert!(reply.body::<bool>().unwrap());
452
453 let reply = connection
454 .call_method(
455 Some("org.freedesktop.DBus"),
456 "/org/freedesktop/DBus",
457 Some("org.freedesktop.DBus"),
458 "GetNameOwner",
459 &"org.freedesktop.zbus.async",
460 )
461 .await
462 .unwrap();
463
464 assert!(reply
465 .body_signature()
466 .map(|s| s == <&str>::signature())
467 .unwrap());
468 assert_eq!(
469 reply.body::<UniqueName<'_>>().unwrap(),
470 *connection.unique_name().unwrap(),
471 );
472
473 // GDBus doesn't provide this method
474 if is_gdbus_test() {
475 return Ok(());
476 }
477
478 let reply = connection
479 .call_method(
480 Some("org.freedesktop.DBus"),
481 "/org/freedesktop/DBus",
482 Some("org.freedesktop.DBus"),
483 "GetConnectionCredentials",
484 &"org.freedesktop.DBus",
485 )
486 .await
487 .unwrap();
488
489 assert!(reply.body_signature().map(|s| s == "a{sv}").unwrap());
490 let hashmap: HashMap<&str, OwnedValue> = reply.body().unwrap();
491
492 let pid: u32 = (&hashmap["ProcessID"]).try_into().unwrap();
493 debug!("DBus bus PID: {}", pid);
494
495 #[cfg(unix)]
496 {
497 let uid: u32 = (&hashmap["UnixUserID"]).try_into().unwrap();
498 debug!("DBus bus UID: {}", uid);
499 }
500
501 Ok(())
502 }
503
// Tests the fix for https://github.com/dbus2/zbus/issues/68
//
// While this is not an exact reproduction of the issue 68, the underlying problem it
// produces is exactly the same: `Connection::call_method` dropping all incoming messages
// while waiting for the reply to the method call.
#[test]
#[timeout(15000)]
fn issue_68() {
    let service_conn = blocking::Connection::session().unwrap();
    let incoming = MessageIterator::from(&service_conn);

    // Send a message as client before service starts to process messages
    let client_conn = blocking::Connection::session().unwrap();
    let destination = service_conn.unique_name().map(UniqueName::<'_>::from);
    let ping = Message::method(
        None::<()>,
        destination,
        "/org/freedesktop/Issue68",
        Some("org.freedesktop.Issue68"),
        "Ping",
        &(),
    )
    .unwrap();
    let ping_serial = client_conn.send_message(ping).unwrap();

    // A method call on the service connection; the ping sent above must survive it.
    let bus_proxy = crate::blocking::fdo::DBusProxy::new(&service_conn).unwrap();
    bus_proxy.get_id().unwrap();

    // Consume messages until the one carrying the ping's serial number shows up.
    let _ = incoming
        .map(|m| m.unwrap())
        .find(|msg| *msg.primary_header().serial_num().unwrap() == ping_serial);
}
542
// Tests the fix for https://github.com/dbus2/zbus/issues/104
//
// The issue is caused by `dbus_proxy` macro adding `()` around the return value of methods
// with multiple out arguments, ending up with double parenthesis around the signature of
// the return type and zbus only removing the outer `()` only and then it not matching the
// signature we receive on the reply message.
#[test]
#[timeout(15000)]
fn issue104() {
    use zvariant::{ObjectPath, Value};

    const SERVICE_PATH: &str = "/org/freedesktop/secrets";

    // Service side: a method with two out arguments.
    struct Secret;
    #[super::dbus_interface(name = "org.freedesktop.Secret.Service")]
    impl Secret {
        fn open_session(
            &self,
            _algorithm: &str,
            input: Value<'_>,
        ) -> zbus::fdo::Result<(OwnedValue, OwnedObjectPath)> {
            let session_path = ObjectPath::try_from("/org/freedesktop/secrets/Blah").unwrap();

            Ok((OwnedValue::from(input), session_path.into()))
        }
    }

    let service_conn = blocking::ConnectionBuilder::session()
        .unwrap()
        .serve_at(SERVICE_PATH, Secret)
        .unwrap()
        .build()
        .unwrap();
    let service_name = service_conn.unique_name().unwrap().clone();

    // Client side: a second connection calling the service through a generated proxy.
    {
        let client_conn = blocking::Connection::session().unwrap();
        #[super::dbus_proxy(
            interface = "org.freedesktop.Secret.Service",
            assume_defaults = true,
            gen_async = false
        )]
        trait Secret {
            fn open_session(
                &self,
                algorithm: &str,
                input: &zvariant::Value<'_>,
            ) -> zbus::Result<(OwnedValue, OwnedObjectPath)>;
        }

        let proxy = SecretProxy::builder(&client_conn)
            .destination(UniqueName::from(service_name))
            .unwrap()
            .path(SERVICE_PATH)
            .unwrap()
            .build()
            .unwrap();

        trace!("Calling open_session");
        let input = Value::from("");
        proxy.open_session("plain", &input).unwrap();
        trace!("Called open_session");
    };
}
608
// Build-only check (hence `#[ignore]`): the proxy definition below merely has to compile.
// For details see:
//
// https://github.com/dbus2/zbus/issues/121
#[test]
#[ignore]
fn issue_121() {
    use crate::dbus_proxy;

    #[dbus_proxy(interface = "org.freedesktop.IBus", assume_defaults = true)]
    trait IBus {
        /// CurrentInputContext property
        #[dbus_proxy(property)]
        fn current_input_context(&self) -> zbus::Result<OwnedObjectPath>;

        /// Engines property
        #[dbus_proxy(property)]
        fn engines(&self) -> zbus::Result<Vec<OwnedValue>>;
    }
}
628
// A receiver thread iterates over incoming messages while the main thread sends a message on
// the same connection; the receiver must see the `ZBusIssue122` call and terminate.
#[test]
#[timeout(15000)]
fn issue_122() {
    let conn = blocking::Connection::session().unwrap();
    let incoming = MessageIterator::from(&conn);

    // Flag + condition variable used by the receiver to announce that it is running.
    #[allow(clippy::mutex_atomic)]
    let startup = Arc::new((Mutex::new(false), Condvar::new()));
    let startup_for_receiver = Arc::clone(&startup);

    let receiver = std::thread::spawn(move || {
        {
            let (flag, signal) = &*startup_for_receiver;
            let mut running = flag.lock().unwrap();
            *running = true;
            signal.notify_one();
        }

        for item in incoming {
            let message = item.unwrap();
            let header = message.header().unwrap();
            let member = header.member().unwrap().map(|m| m.as_str());

            if member == Some("ZBusIssue122") {
                break;
            }
        }
    });

    // Wait for the receiving thread to start up. The guard stays alive until the end of the
    // test.
    let (flag, signal) = &*startup;
    let _running = signal
        .wait_while(flag.lock().unwrap(), |running| !*running)
        .unwrap();
    // Still give it some milliseconds to ensure it's already blocking on receive_message call
    // when we send a message.
    std::thread::sleep(std::time::Duration::from_millis(100));

    let destination = conn.unique_name().map(UniqueName::<'_>::from);
    let call = Message::method(
        None::<()>,
        destination,
        "/does/not/matter",
        None::<()>,
        "ZBusIssue122",
        &(),
    )
    .unwrap();
    conn.send_message(call).unwrap();

    receiver.join().unwrap();
}
681
// Build-only check (hence `#[ignore]`): proxy properties returning a tuple and a custom struct
// merely have to compile.
#[test]
#[ignore]
fn issue_81() {
    use zbus::dbus_proxy;
    use zvariant::{OwnedValue, Type};

    // Custom struct used as a property type below.
    #[derive(
        Debug, PartialEq, Eq, Clone, Type, OwnedValue, serde::Serialize, serde::Deserialize,
    )]
    pub struct DbusPath {
        id: String,
        path: OwnedObjectPath,
    }

    #[dbus_proxy(assume_defaults = true)]
    trait Session {
        // Property returning a plain tuple.
        #[dbus_proxy(property)]
        fn sessions_tuple(&self) -> zbus::Result<(String, String)>;

        // Property returning the custom struct.
        #[dbus_proxy(property)]
        fn sessions_struct(&self) -> zbus::Result<DbusPath>;
    }
}
705
// Tests the fix for https://github.com/dbus2/zbus/issues/173
//
// The issue is caused by proxy not keeping track of its destination's owner changes
// (service restart) and failing to receive signals as a result.
#[test]
#[timeout(15000)]
fn issue173() {
    const SERVICE_NAME: &str = "org.freedesktop.zbus.ComeAndGo";
    const OBJECT_PATH: &str = "/org/freedesktop/zbus/ComeAndGo";

    let (progress_tx, progress_rx) = channel();
    let listener = std::thread::spawn(move || {
        let conn = blocking::Connection::session().unwrap();
        #[super::dbus_proxy(
            interface = "org.freedesktop.zbus.ComeAndGo",
            default_service = "org.freedesktop.zbus.ComeAndGo",
            default_path = "/org/freedesktop/zbus/ComeAndGo"
        )]
        trait ComeAndGo {
            #[dbus_proxy(signal)]
            fn the_signal(&self) -> zbus::Result<()>;
        }

        let proxy = ComeAndGoProxyBlocking::new(&conn).unwrap();
        let signals = proxy.receive_the_signal().unwrap();
        // Tell the main thread that the subscription is in place.
        progress_tx.send(()).unwrap();

        // We receive two signals, each time from different unique names. W/o the fix for
        // issue#173, the second iteration hangs.
        signals.take(2).for_each(|_| progress_tx.send(()).unwrap());
    });

    struct ComeAndGo;
    #[super::dbus_interface(name = "org.freedesktop.zbus.ComeAndGo")]
    impl ComeAndGo {
        #[dbus_interface(signal)]
        async fn the_signal(signal_ctxt: &SignalContext<'_>) -> zbus::Result<()>;
    }

    // Wait until the listener has subscribed before the service shows up.
    progress_rx.recv().unwrap();
    for _round in 0..2 {
        let service_conn = blocking::ConnectionBuilder::session()
            .unwrap()
            .serve_at(OBJECT_PATH, ComeAndGo)
            .unwrap()
            .name(SERVICE_NAME)
            .unwrap()
            .build()
            .unwrap();

        let iface_ref = service_conn
            .object_server()
            .interface::<_, ComeAndGo>(OBJECT_PATH)
            .unwrap();
        let emission = ComeAndGo::the_signal(iface_ref.signal_context());
        block_on(emission).unwrap();

        // The listener acknowledges every signal it receives.
        progress_rx.recv().unwrap();

        // Now we release the name ownership to use a different connection (i-e new unique
        // name).
        service_conn.release_name(SERVICE_NAME).unwrap();
    }

    listener.join().unwrap();
}
770
// Synchronous entry point that drives `test_uncached_property` to completion.
#[test]
#[timeout(15000)]
fn uncached_property() {
    let outcome = block_on(test_uncached_property());
    outcome.unwrap();
}
776
777 async fn test_uncached_property() -> Result<()> {
778 // A dummy boolean test service. It starts as `false` and can be
779 // flipped to `true`. Two properties can access the inner value, with
780 // and without caching.
781 #[derive(Default)]
782 struct ServiceUncachedPropertyTest(bool);
783 #[crate::dbus_interface(name = "org.freedesktop.zbus.UncachedPropertyTest")]
784 impl ServiceUncachedPropertyTest {
785 #[dbus_interface(property)]
786 fn cached_prop(&self) -> bool {
787 self.0
788 }
789 #[dbus_interface(property)]
790 fn uncached_prop(&self) -> bool {
791 self.0
792 }
793 async fn set_inner_to_true(&mut self) -> zbus::fdo::Result<()> {
794 self.0 = true;
795 Ok(())
796 }
797 }
798
799 #[crate::dbus_proxy(
800 interface = "org.freedesktop.zbus.UncachedPropertyTest",
801 default_service = "org.freedesktop.zbus.UncachedPropertyTest",
802 default_path = "/org/freedesktop/zbus/UncachedPropertyTest"
803 )]
804 trait UncachedPropertyTest {
805 #[dbus_proxy(property)]
806 fn cached_prop(&self) -> zbus::Result<bool>;
807
808 #[dbus_proxy(property(emits_changed_signal = "false"))]
809 fn uncached_prop(&self) -> zbus::Result<bool>;
810
811 fn set_inner_to_true(&self) -> zbus::Result<()>;
812 }
813
814 let service = crate::ConnectionBuilder::session()
815 .unwrap()
816 .serve_at(
817 "/org/freedesktop/zbus/UncachedPropertyTest",
818 ServiceUncachedPropertyTest(false),
819 )
820 .unwrap()
821 .build()
822 .await
823 .unwrap();
824
825 let dest = service.unique_name().unwrap();
826
827 let client_conn = crate::Connection::session().await.unwrap();
828 let client = UncachedPropertyTestProxy::builder(&client_conn)
829 .destination(dest)
830 .unwrap()
831 .build()
832 .await
833 .unwrap();
834
835 // Query properties; this populates the cache too.
836 assert!(!client.cached_prop().await.unwrap());
837 assert!(!client.uncached_prop().await.unwrap());
838
839 // Flip the inner value so we can observe the different semantics of
840 // the two properties.
841 client.set_inner_to_true().await.unwrap();
842
843 // Query properties again; the first one should incur a stale read from
844 // cache, while the second one should be able to read the live/updated
845 // value.
846 assert!(!client.cached_prop().await.unwrap());
847 assert!(client.uncached_prop().await.unwrap());
848
849 Ok(())
850 }
851
// Low-level server example in the book doesn't work. The reason was that
// `Connection::request_name` implicitly created the associated `ObjectServer` to avoid
// #68. This meant that the `ObjectServer` ended up replying to the incoming method call
// with an error, before the service code could do so.
#[test]
#[timeout(15000)]
fn issue_260() {
    let scenario = async {
        let connection = Connection::session().await?;

        connection.request_name("org.zbus.Issue260").await?;

        // Service and client share the connection and are driven concurrently.
        let service = issue_260_service(&connection);
        let client = issue_260_client(&connection);
        futures_util::try_join!(service, client)?;

        Ok::<(), zbus::Error>(())
    };

    block_on(scenario).unwrap();
}
873
874 async fn issue_260_service(connection: &Connection) -> Result<()> {
875 use futures_util::stream::TryStreamExt;
876
877 let mut stream = zbus::MessageStream::from(connection);
878 while let Some(msg) = stream.try_next().await? {
879 let msg_header = msg.header()?;
880
881 match msg_header.message_type()? {
882 zbus::MessageType::MethodCall => {
883 connection.reply(&msg, &()).await?;
884
885 break;
886 }
887 _ => continue,
888 }
889 }
890
891 Ok(())
892 }
893
894 async fn issue_260_client(connection: &Connection) -> Result<()> {
895 zbus::Proxy::new(
896 connection,
897 "org.zbus.Issue260",
898 "/org/zbus/Issue260",
899 "org.zbus.Issue260",
900 )
901 .await?
902 .call("Whatever", &())
903 .await?;
904 Ok(())
905 }
906
#[test(tokio::test(flavor = "multi_thread", worker_threads = 2))]
// Issue specific to tokio runtime.
#[cfg(all(unix, feature = "tokio"))]
#[instrument]
async fn issue_279() {
    // On failure to read from the socket, we were closing the error channel from the sender
    // side and since the underlying tokio API doesn't provide a `close` method on the sender,
    // the async-channel abstraction was achieving this through calling `close` on receiver,
    // which is behind an async mutex and we end up with a deadlock.
    use crate::{ConnectionBuilder, MessageStream};
    use futures_util::{stream::TryStreamExt, try_join};
    use tokio::net::UnixStream;

    let guid = crate::Guid::generate();
    let (server_end, client_end) = UnixStream::pair().unwrap();

    // Peer-to-peer connection over the socket pair; both sides are built concurrently.
    let server_fut = ConnectionBuilder::unix_stream(server_end)
        .server(&guid)
        .p2p()
        .build();
    let client_fut = ConnectionBuilder::unix_stream(client_end).p2p().build();
    let (client, server) = try_join!(client_fut, server_fut).unwrap();

    let mut incoming = MessageStream::from(client);
    let next_message = incoming.try_next();

    // With the server gone, waiting for the next message must end in an error.
    drop(server);

    assert!(next_message.await.is_err());
}
936
#[test(tokio::test(flavor = "multi_thread"))]
// Issue specific to tokio runtime.
#[cfg(all(unix, feature = "tokio"))]
#[instrument]
async fn issue_310() {
    // The issue was we were deadlocking on fetching the new property value after invalidation.
    // This turned out to be caused by us trying to grab a read lock on resource while holding
    // a write lock. Thanks to connman for being weird and invalidating the property just before
    // updating it, so this issue could be exposed.
    use futures_util::StreamExt;
    use zbus::ConnectionBuilder;

    const STATION_PATH: &str = "/net/connman/iwd/0/33";

    // The wrapped counter becomes the last segment of the `ConnectedNetwork` path.
    struct Station(u64);

    #[zbus::dbus_interface(name = "net.connman.iwd.Station")]
    impl Station {
        #[dbus_interface(property)]
        fn connected_network(&self) -> OwnedObjectPath {
            let network = format!("/net/connman/iwd/0/33/Network/{}", self.0);

            network.try_into().unwrap()
        }
    }

    #[zbus::dbus_proxy(
        interface = "net.connman.iwd.Station",
        default_service = "net.connman.iwd"
    )]
    trait Station {
        #[dbus_proxy(property)]
        fn connected_network(&self) -> zbus::Result<OwnedObjectPath>;
    }

    let connection = ConnectionBuilder::session()
        .unwrap()
        .serve_at(STATION_PATH, Station(0))
        .unwrap()
        .name("net.connman.iwd")
        .unwrap()
        .build()
        .await
        .unwrap();

    // The client notifies this event after each change it has processed.
    let event = Arc::new(event_listener::Event::new());
    let service_conn = connection.clone();
    let processed = Arc::clone(&event);
    tokio::spawn(async move {
        for _ in 0..10 {
            let listener = processed.listen();
            let iface_ref = service_conn
                .object_server()
                .interface::<_, Station>(STATION_PATH)
                .await
                .unwrap();

            // Invalidate the property right before announcing the change; the read guard is
            // released at the end of this scope.
            {
                let iface = iface_ref.get().await;
                let ctxt = iface_ref.signal_context();
                iface.connected_network_invalidate(ctxt).await.unwrap();
                iface.connected_network_changed(ctxt).await.unwrap();
            }
            listener.await;
            iface_ref.get_mut().await.0 += 1;
        }
    });

    let station = StationProxy::builder(&connection)
        .path(STATION_PATH)
        .unwrap()
        .build()
        .await
        .unwrap();

    let mut changes = station.receive_connected_network_changed().await;

    let mut last_received = 0;
    while last_received < 9 {
        let change = changes.next().await.unwrap();
        let path = change.get().await.unwrap();
        // The counter is the final `/`-separated segment of the path.
        let received: u64 = path
            .rsplit('/')
            .next()
            .unwrap()
            .parse()
            .expect("invalid path");
        assert!(received >= last_received);
        last_received = received;
        event.notify(1);
    }
}
1030}
1031