1//! Connection API.
2use async_broadcast::{broadcast, InactiveReceiver, Receiver, Sender as Broadcaster};
3use enumflags2::BitFlags;
4use event_listener::{Event, EventListener};
5use ordered_stream::{OrderedFuture, OrderedStream, PollResult};
6use static_assertions::assert_impl_all;
7use std::{
8 collections::HashMap,
9 io::{self, ErrorKind},
10 num::NonZeroU32,
11 ops::Deref,
12 pin::Pin,
13 sync::{Arc, OnceLock, Weak},
14 task::{Context, Poll},
15};
16use tracing::{debug, info_span, instrument, trace, trace_span, warn, Instrument};
17use zbus_names::{BusName, ErrorName, InterfaceName, MemberName, OwnedUniqueName, WellKnownName};
18use zvariant::ObjectPath;
19
20use futures_core::Future;
21use futures_util::StreamExt;
22
23use crate::{
24 async_lock::{Mutex, Semaphore, SemaphorePermit},
25 blocking,
26 fdo::{self, ConnectionCredentials, RequestNameFlags, RequestNameReply},
27 is_flatpak,
28 message::{Flags, Message, Type},
29 proxy::CacheProperties,
30 DBusError, Error, Executor, MatchRule, MessageStream, ObjectServer, OwnedGuid, OwnedMatchRule,
31 Result, Task,
32};
33
34mod builder;
35pub use builder::Builder;
36
37pub mod socket;
38pub use socket::Socket;
39
40mod socket_reader;
41use socket_reader::SocketReader;
42
43pub(crate) mod handshake;
44use handshake::Authenticated;
45
// Default capacity of a message queue.
// NOTE(review): presumably applied to the main (unfiltered) incoming queue, whose documented
// default is 64 — confirm at the use site, which is outside this view.
const DEFAULT_MAX_QUEUED: usize = 64;
// NOTE(review): presumably the default capacity of the method-return queue — confirm at the use
// site, which is outside this view.
const DEFAULT_MAX_METHOD_RETURN_QUEUED: usize = 8;
48
/// Inner state shared by Connection and WeakConnection
///
/// `Connection` holds this behind an `Arc`, so cloning a `Connection` shares all of the state
/// below.
#[derive(Debug)]
pub(crate) struct ConnectionInner {
    // GUID of the server, exposed through `Connection::server_guid`.
    server_guid: OwnedGuid,
    // Checked by `Connection::send` before sending a message that carries file descriptors.
    #[cfg(unix)]
    cap_unix_fd: bool,
    // Value returned by `Connection::is_bus` when the `p2p` feature is enabled.
    #[cfg(feature = "p2p")]
    bus_conn: bool,
    // Write-once unique name, exposed through `Connection::unique_name`.
    unique_name: OnceLock<OwnedUniqueName>,
    // Well-known names tracked by `request_name*`/`release_name`, with their current status.
    registered_names: Mutex<HashMap<WellKnownName<'static>, NameStatus>>,

    // Notified on every `Connection::send`.
    activity_event: Arc<Event>,
    // Write half of the socket; locked for the duration of each send.
    socket_write: Mutex<Box<dyn socket::WriteHalf>>,

    // Our executor
    executor: Executor<'static>,

    // Socket reader task
    // NOTE(review): presumably stored only to keep the task alive (hence `allow(unused)`) —
    // confirm where it is set.
    #[allow(unused)]
    socket_reader_task: OnceLock<Task<()>>,

    // Inactive handle to the main queue; its capacity is what `max_queued` reports.
    pub(crate) msg_receiver: InactiveReceiver<Result<Message>>,
    // Inactive handle to the method-return queue; activated for every method call to wait for
    // the reply.
    pub(crate) method_return_receiver: InactiveReceiver<Result<Message>>,
    // Broadcasters keyed by optional match rule.
    // NOTE(review): `None` presumably denotes the unfiltered queue — confirm in the socket
    // reader.
    msg_senders: Arc<Mutex<HashMap<Option<OwnedMatchRule>, MsgBroadcaster>>>,

    // Active match-rule subscriptions (see the `Subscriptions` alias).
    subscriptions: Mutex<Subscriptions>,

    // Created on first use by `sync_object_server`.
    object_server: OnceLock<blocking::ObjectServer>,
    // Spawned at most once by `start_object_server`.
    object_server_dispatch_task: OnceLock<Task<()>>,
}
79
/// Match-rule subscriptions: each rule maps to a `u64` and an inactive receiver for that rule's
/// messages.
///
/// NOTE(review): the `u64` is presumably a subscriber count — confirm in the match-rule
/// add/remove code, which is outside this view.
type Subscriptions = HashMap<OwnedMatchRule, (u64, InactiveReceiver<Result<Message>>)>;

/// Sending half of a broadcast channel carrying `Result<Message>` items.
pub(crate) type MsgBroadcaster = Broadcaster<Result<Message>>;
83
/// A D-Bus connection.
///
/// A connection to a D-Bus bus, or a direct peer.
///
/// Once created, the connection is authenticated and negotiated and messages can be sent or
/// received, such as [method calls] or [signals].
///
/// For higher-level message handling (typed functions, introspection, documentation reasons etc),
/// it is recommended to wrap the low-level D-Bus messages into Rust functions with the
/// [`proxy`] and [`interface`] macros instead of doing it directly on a `Connection`.
///
/// Typically, a connection is made to the session bus with [`Connection::session`], or to the
/// system bus with [`Connection::system`]. Then the connection is used with [`crate::Proxy`]
/// instances or the on-demand [`ObjectServer`] instance that can be accessed through
/// [`Connection::object_server`].
///
/// `Connection` implements [`Clone`] and cloning it is a very cheap operation, as the underlying
/// data is not cloned. This makes it very convenient to share the connection between different
/// parts of your code. `Connection` also implements [`std::marker::Sync`] and [`std::marker::Send`]
/// so you can send and share a connection instance across threads as well.
///
/// `Connection` keeps internal queues of incoming messages. The default capacity of each of these
/// is 64. The capacity of the main (unfiltered) queue is configurable through the
/// [`set_max_queued`] method. When the queue is full, no more messages can be received until room
/// is created for more. This is why it's important to ensure that all [`crate::MessageStream`] and
/// [`crate::blocking::MessageIterator`] instances are continuously polled and iterated on,
/// respectively.
///
/// For sending messages you can use the [`Connection::send`] method.
///
/// [method calls]: struct.Connection.html#method.call_method
/// [signals]: struct.Connection.html#method.emit_signal
/// [`proxy`]: attr.proxy.html
/// [`interface`]: attr.interface.html
/// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html
/// [`set_max_queued`]: struct.Connection.html#method.set_max_queued
///
/// ### Examples
///
/// #### Get the session bus ID
///
/// ```
/// # zbus::block_on(async {
/// use zbus::Connection;
///
/// let connection = Connection::session().await?;
///
/// let reply_body = connection
///     .call_method(
///         Some("org.freedesktop.DBus"),
///         "/org/freedesktop/DBus",
///         Some("org.freedesktop.DBus"),
///         "GetId",
///         &(),
///     )
///     .await?
///     .body();
///
/// let id: &str = reply_body.deserialize()?;
/// println!("Unique ID of the bus: {}", id);
/// # Ok::<(), zbus::Error>(())
/// # }).unwrap();
/// ```
///
/// #### Monitoring all messages
///
/// Let's eavesdrop on the session bus 😈 using the [Monitor] interface:
///
/// ```rust,no_run
/// # zbus::block_on(async {
/// use futures_util::stream::TryStreamExt;
/// use zbus::{Connection, MessageStream};
///
/// let connection = Connection::session().await?;
///
/// connection
///     .call_method(
///         Some("org.freedesktop.DBus"),
///         "/org/freedesktop/DBus",
///         Some("org.freedesktop.DBus.Monitoring"),
///         "BecomeMonitor",
///         &(&[] as &[&str], 0u32),
///     )
///     .await?;
///
/// let mut stream = MessageStream::from(connection);
/// while let Some(msg) = stream.try_next().await? {
///     println!("Got message: {}", msg);
/// }
///
/// # Ok::<(), zbus::Error>(())
/// # }).unwrap();
/// ```
///
/// This should print something like:
///
/// ```console
/// Got message: Signal NameAcquired from org.freedesktop.DBus
/// Got message: Signal NameLost from org.freedesktop.DBus
/// Got message: Method call GetConnectionUnixProcessID from :1.1324
/// Got message: Error org.freedesktop.DBus.Error.NameHasNoOwner:
///   Could not get PID of name ':1.1332': no such name from org.freedesktop.DBus
/// Got message: Method call AddMatch from :1.918
/// Got message: Method return from org.freedesktop.DBus
/// ```
///
/// [Monitor]: https://dbus.freedesktop.org/doc/dbus-specification.html#bus-messages-become-monitor
#[derive(Clone, Debug)]
#[must_use = "Dropping a `Connection` will close the underlying socket."]
pub struct Connection {
    // Shared state; cloning a `Connection` only clones this `Arc`.
    pub(crate) inner: Arc<ConnectionInner>,
}
196
// Compile-time guarantee that `Connection` stays `Send`, `Sync` and `Unpin`.
assert_impl_all!(Connection: Send, Sync, Unpin);
198
/// A method call whose completion can be awaited or joined with other streams.
///
/// This is useful for cache population method calls, where joining the [`JoinableStream`] with
/// an update signal stream can be used to ensure that cache updates are not overwritten by a cache
/// population whose task is scheduled later.
#[derive(Debug)]
pub(crate) struct PendingMethodCall {
    // Stream of candidate replies; reset to `None` once the matching reply has been delivered.
    stream: Option<MessageStream>,
    // Serial number of the sent call; replies are matched on their `reply_serial` header field.
    serial: NonZeroU32,
}
209
210impl Future for PendingMethodCall {
211 type Output = Result<Message>;
212
213 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
214 self.poll_before(cx, before:None).map(|ret: Option<(Sequence, Result<…, …>)>| {
215 ret.map(|(_, r: Result)| r).unwrap_or_else(|| {
216 Err(crate::Error::InputOutput(
217 io::Error::new(kind:ErrorKind::BrokenPipe, error:"socket closed").into(),
218 ))
219 })
220 })
221 }
222}
223
224impl OrderedFuture for PendingMethodCall {
225 type Output = Result<Message>;
226 type Ordering = zbus::message::Sequence;
227
228 fn poll_before(
229 self: Pin<&mut Self>,
230 cx: &mut Context<'_>,
231 before: Option<&Self::Ordering>,
232 ) -> Poll<Option<(Self::Ordering, Self::Output)>> {
233 let this = self.get_mut();
234 if let Some(stream) = &mut this.stream {
235 loop {
236 match Pin::new(&mut *stream).poll_next_before(cx, before) {
237 Poll::Ready(PollResult::Item {
238 data: Ok(msg),
239 ordering,
240 }) => {
241 if msg.header().reply_serial() != Some(this.serial) {
242 continue;
243 }
244 let res = match msg.message_type() {
245 Type::Error => Err(msg.into()),
246 Type::MethodReturn => Ok(msg),
247 _ => continue,
248 };
249 this.stream = None;
250 return Poll::Ready(Some((ordering, res)));
251 }
252 Poll::Ready(PollResult::Item {
253 data: Err(e),
254 ordering,
255 }) => {
256 return Poll::Ready(Some((ordering, Err(e))));
257 }
258
259 Poll::Ready(PollResult::NoneBefore) => {
260 return Poll::Ready(None);
261 }
262 Poll::Ready(PollResult::Terminated) => {
263 return Poll::Ready(None);
264 }
265 Poll::Pending => return Poll::Pending,
266 }
267 }
268 }
269 Poll::Ready(None)
270 }
271}
272
273impl Connection {
274 /// Send `msg` to the peer.
275 pub async fn send(&self, msg: &Message) -> Result<()> {
276 #[cfg(unix)]
277 if !msg.data().fds().is_empty() && !self.inner.cap_unix_fd {
278 return Err(Error::Unsupported);
279 }
280
281 self.inner.activity_event.notify(usize::MAX);
282 let mut write = self.inner.socket_write.lock().await;
283
284 write.send_message(msg).await
285 }
286
287 /// Send a method call.
288 ///
289 /// Create a method-call message, send it over the connection, then wait for the reply.
290 ///
291 /// On successful reply, an `Ok(Message)` is returned. On error, an `Err` is returned. D-Bus
292 /// error replies are returned as [`Error::MethodError`].
293 pub async fn call_method<'d, 'p, 'i, 'm, D, P, I, M, B>(
294 &self,
295 destination: Option<D>,
296 path: P,
297 interface: Option<I>,
298 method_name: M,
299 body: &B,
300 ) -> Result<Message>
301 where
302 D: TryInto<BusName<'d>>,
303 P: TryInto<ObjectPath<'p>>,
304 I: TryInto<InterfaceName<'i>>,
305 M: TryInto<MemberName<'m>>,
306 D::Error: Into<Error>,
307 P::Error: Into<Error>,
308 I::Error: Into<Error>,
309 M::Error: Into<Error>,
310 B: serde::ser::Serialize + zvariant::DynamicType,
311 {
312 self.call_method_raw(
313 destination,
314 path,
315 interface,
316 method_name,
317 BitFlags::empty(),
318 body,
319 )
320 .await?
321 .expect("no reply")
322 .await
323 }
324
325 /// Send a method call.
326 ///
327 /// Send the given message, which must be a method call, over the connection and return an
328 /// object that allows the reply to be retrieved. Typically you'd want to use
329 /// [`Connection::call_method`] instead.
330 ///
331 /// If the `flags` do not contain `MethodFlags::NoReplyExpected`, the return value is
332 /// guaranteed to be `Ok(Some(_))`, if there was no error encountered.
333 ///
334 /// INTERNAL NOTE: If this method is ever made pub, flags should become `BitFlags<MethodFlags>`.
335 pub(crate) async fn call_method_raw<'d, 'p, 'i, 'm, D, P, I, M, B>(
336 &self,
337 destination: Option<D>,
338 path: P,
339 interface: Option<I>,
340 method_name: M,
341 flags: BitFlags<Flags>,
342 body: &B,
343 ) -> Result<Option<PendingMethodCall>>
344 where
345 D: TryInto<BusName<'d>>,
346 P: TryInto<ObjectPath<'p>>,
347 I: TryInto<InterfaceName<'i>>,
348 M: TryInto<MemberName<'m>>,
349 D::Error: Into<Error>,
350 P::Error: Into<Error>,
351 I::Error: Into<Error>,
352 M::Error: Into<Error>,
353 B: serde::ser::Serialize + zvariant::DynamicType,
354 {
355 let _permit = acquire_serial_num_semaphore().await;
356
357 let mut builder = Message::method(path, method_name)?;
358 if let Some(sender) = self.unique_name() {
359 builder = builder.sender(sender)?
360 }
361 if let Some(destination) = destination {
362 builder = builder.destination(destination)?
363 }
364 if let Some(interface) = interface {
365 builder = builder.interface(interface)?
366 }
367 for flag in flags {
368 builder = builder.with_flags(flag)?;
369 }
370 let msg = builder.build(body)?;
371
372 let msg_receiver = self.inner.method_return_receiver.activate_cloned();
373 let stream = Some(MessageStream::for_subscription_channel(
374 msg_receiver,
375 // This is a lie but we only use the stream internally so it's fine.
376 None,
377 self,
378 ));
379 let serial = msg.primary_header().serial_num();
380 self.send(&msg).await?;
381 if flags.contains(Flags::NoReplyExpected) {
382 Ok(None)
383 } else {
384 Ok(Some(PendingMethodCall { stream, serial }))
385 }
386 }
387
388 /// Emit a signal.
389 ///
390 /// Create a signal message, and send it over the connection.
391 pub async fn emit_signal<'d, 'p, 'i, 'm, D, P, I, M, B>(
392 &self,
393 destination: Option<D>,
394 path: P,
395 interface: I,
396 signal_name: M,
397 body: &B,
398 ) -> Result<()>
399 where
400 D: TryInto<BusName<'d>>,
401 P: TryInto<ObjectPath<'p>>,
402 I: TryInto<InterfaceName<'i>>,
403 M: TryInto<MemberName<'m>>,
404 D::Error: Into<Error>,
405 P::Error: Into<Error>,
406 I::Error: Into<Error>,
407 M::Error: Into<Error>,
408 B: serde::ser::Serialize + zvariant::DynamicType,
409 {
410 let _permit = acquire_serial_num_semaphore().await;
411
412 let mut b = Message::signal(path, interface, signal_name)?;
413 if let Some(sender) = self.unique_name() {
414 b = b.sender(sender)?;
415 }
416 if let Some(destination) = destination {
417 b = b.destination(destination)?;
418 }
419 let m = b.build(body)?;
420
421 self.send(&m).await
422 }
423
424 /// Reply to a message.
425 ///
426 /// Given an existing message (likely a method call), send a reply back to the caller with the
427 /// given `body`.
428 pub async fn reply<B>(&self, call: &Message, body: &B) -> Result<()>
429 where
430 B: serde::ser::Serialize + zvariant::DynamicType,
431 {
432 let _permit = acquire_serial_num_semaphore().await;
433
434 let mut b = Message::method_reply(call)?;
435 if let Some(sender) = self.unique_name() {
436 b = b.sender(sender)?;
437 }
438 let m = b.build(body)?;
439 self.send(&m).await
440 }
441
442 /// Reply an error to a message.
443 ///
444 /// Given an existing message (likely a method call), send an error reply back to the caller
445 /// with the given `error_name` and `body`.
446 pub async fn reply_error<'e, E, B>(&self, call: &Message, error_name: E, body: &B) -> Result<()>
447 where
448 B: serde::ser::Serialize + zvariant::DynamicType,
449 E: TryInto<ErrorName<'e>>,
450 E::Error: Into<Error>,
451 {
452 let _permit = acquire_serial_num_semaphore().await;
453
454 let mut b = Message::method_error(call, error_name)?;
455 if let Some(sender) = self.unique_name() {
456 b = b.sender(sender)?;
457 }
458 let m = b.build(body)?;
459 self.send(&m).await
460 }
461
462 /// Reply an error to a message.
463 ///
464 /// Given an existing message (likely a method call), send an error reply back to the caller
465 /// using one of the standard interface reply types.
466 pub async fn reply_dbus_error(
467 &self,
468 call: &zbus::message::Header<'_>,
469 err: impl DBusError,
470 ) -> Result<()> {
471 let _permit = acquire_serial_num_semaphore().await;
472
473 let m = err.create_reply(call)?;
474 self.send(&m).await
475 }
476
477 /// Register a well-known name for this connection.
478 ///
479 /// When connecting to a bus, the name is requested from the bus. In case of p2p connection, the
480 /// name (if requested) is used of self-identification.
481 ///
482 /// You can request multiple names for the same connection. Use [`Connection::release_name`] for
483 /// deregistering names registered through this method.
484 ///
485 /// Note that exclusive ownership without queueing is requested (using
486 /// [`RequestNameFlags::ReplaceExisting`] and [`RequestNameFlags::DoNotQueue`] flags) since that
487 /// is the most typical case. If that is not what you want, you should use
488 /// [`Connection::request_name_with_flags`] instead (but make sure then that name is requested
489 /// **after** you've setup your service implementation with the `ObjectServer`).
490 ///
491 /// # Caveats
492 ///
493 /// The associated `ObjectServer` will only handle method calls destined for the unique name of
494 /// this connection or any of the registered well-known names. If no well-known name is
495 /// registered, the method calls destined to all well-known names will be handled.
496 ///
497 /// Since names registered through any other means than `Connection` or [`Builder`]
498 /// API are not known to the connection, method calls destined to those names will only be
499 /// handled by the associated `ObjectServer` if none of the names are registered through
500 /// `Connection*` API. Simply put, either register all the names through `Connection*` API or
501 /// none of them.
502 ///
503 /// # Errors
504 ///
505 /// Fails with `zbus::Error::NameTaken` if the name is already owned by another peer.
506 pub async fn request_name<'w, W>(&self, well_known_name: W) -> Result<()>
507 where
508 W: TryInto<WellKnownName<'w>>,
509 W::Error: Into<Error>,
510 {
511 self.request_name_with_flags(
512 well_known_name,
513 RequestNameFlags::ReplaceExisting | RequestNameFlags::DoNotQueue,
514 )
515 .await
516 .map(|_| ())
517 }
518
/// Register a well-known name for this connection.
///
/// This is the same as [`Connection::request_name`] but allows to specify the flags to use when
/// requesting the name.
///
/// If the [`RequestNameFlags::DoNotQueue`] flag is not specified and request ends up in the
/// queue, you can use [`fdo::NameAcquiredStream`] to be notified when the name is acquired. A
/// queued name request can be cancelled using [`Connection::release_name`].
///
/// If the [`RequestNameFlags::AllowReplacement`] flag is specified, the requested name can be
/// lost if another peer requests the same name. You can use [`fdo::NameLostStream`] to be
/// notified when the name is lost.
///
/// If the name is already tracked by this connection (as owned or queued), the corresponding
/// reply is returned right away without contacting the bus. For connections that are not to a
/// bus, the name is only recorded locally and `PrimaryOwner` is returned.
///
/// # Example
///
/// ```
/// #
/// # zbus::block_on(async {
/// use zbus::{Connection, fdo::{DBusProxy, RequestNameFlags, RequestNameReply}};
/// use enumflags2::BitFlags;
/// use futures_util::stream::StreamExt;
///
/// let name = "org.freedesktop.zbus.QueuedNameTest";
/// let conn1 = Connection::session().await?;
/// // This should just work right away.
/// conn1.request_name(name).await?;
///
/// let conn2 = Connection::session().await?;
/// // A second request from another connection will fail with `DoNotQueue` flag, which is
/// // implicit with `request_name` method.
/// assert!(conn2.request_name(name).await.is_err());
///
/// // Now let's try w/o `DoNotQueue` and we should be queued.
/// let reply = conn2
///     .request_name_with_flags(name, RequestNameFlags::AllowReplacement.into())
///     .await?;
/// assert_eq!(reply, RequestNameReply::InQueue);
/// // Another request should just give us the same response.
/// let reply = conn2
///     // The flags on subsequent requests will however be ignored.
///     .request_name_with_flags(name, BitFlags::empty())
///     .await?;
/// assert_eq!(reply, RequestNameReply::InQueue);
/// let mut acquired_stream = DBusProxy::new(&conn2)
///     .await?
///     .receive_name_acquired()
///     .await?;
/// assert!(conn1.release_name(name).await?);
/// // This would have waited forever if `conn1` hadn't just released the name.
/// let acquired = acquired_stream.next().await.unwrap();
/// assert_eq!(acquired.args().unwrap().name, name);
///
/// // conn2 made the mistake of being too nice and allowed name replacement, so conn1 should be
/// // able to take it back.
/// let mut lost_stream = DBusProxy::new(&conn2)
///     .await?
///     .receive_name_lost()
///     .await?;
/// conn1.request_name(name).await?;
/// let lost = lost_stream.next().await.unwrap();
/// assert_eq!(lost.args().unwrap().name, name);
///
/// # Ok::<(), zbus::Error>(())
/// # }).unwrap();
/// ```
///
/// # Caveats
///
/// * Same as that of [`Connection::request_name`].
/// * If you wish to track changes to name ownership after this call, make sure that the
///   [`fdo::NameAcquiredStream`] and/or [`fdo::NameLostStream`] instance(s) are created
///   **before** calling this method. Otherwise, you may lose the signal if it's emitted after
///   this call but just before the stream instance gets created.
///
/// # Errors
///
/// Fails with `Error::NameTaken` if the bus replies that the name already exists. Errors from
/// converting the name, building the bus proxy, subscribing to the signals or the request
/// itself are propagated.
pub async fn request_name_with_flags<'w, W>(
    &self,
    well_known_name: W,
    flags: BitFlags<RequestNameFlags>,
) -> Result<RequestNameReply>
where
    W: TryInto<WellKnownName<'w>>,
    W::Error: Into<Error>,
{
    let well_known_name = well_known_name.try_into().map_err(Into::into)?;
    // We keep the lock until the end of this function so that the (possibly) spawned task
    // doesn't end up accessing the name entry before it's inserted.
    let mut names = self.inner.registered_names.lock().await;

    // A name we already track is answered locally; the bus is not contacted and `flags` are
    // ignored.
    match names.get(&well_known_name) {
        Some(NameStatus::Owner(_)) => return Ok(RequestNameReply::AlreadyOwner),
        Some(NameStatus::Queued(_)) => return Ok(RequestNameReply::InQueue),
        None => (),
    }

    // Without a bus there is nobody to ask: just record the name as owned.
    if !self.is_bus() {
        names.insert(well_known_name.to_owned(), NameStatus::Owner(None));

        return Ok(RequestNameReply::PrimaryOwner);
    }

    let dbus_proxy = fdo::DBusProxy::builder(self)
        .cache_properties(CacheProperties::No)
        .build()
        .await?;
    // Both signal streams are created before the request is sent.
    let mut acquired_stream = dbus_proxy.receive_name_acquired().await?;
    let mut lost_stream = dbus_proxy.receive_name_lost().await?;
    let reply = dbus_proxy
        .request_name(well_known_name.clone(), flags)
        .await?;
    let lost_task_name = format!("monitor name {well_known_name} lost");
    // Only when replacement is allowed do we prepare a future that watches for `NameLost`. It
    // is not spawned here; that happens once the name is actually owned.
    let name_lost_fut = if flags.contains(RequestNameFlags::AllowReplacement) {
        let weak_conn = WeakConnection::from(self);
        let well_known_name = well_known_name.to_owned();
        Some(
            async move {
                loop {
                    let signal = lost_stream.next().await;
                    // Stop monitoring once the connection is gone.
                    let inner = match weak_conn.upgrade() {
                        Some(conn) => conn.inner.clone(),
                        None => break,
                    };

                    match signal {
                        Some(signal) => match signal.args() {
                            Ok(args) if args.name == well_known_name => {
                                tracing::info!(
                                    "Connection `{}` lost name `{}`",
                                    // SAFETY: This is bus connection so unique name can't be
                                    // None.
                                    inner.unique_name.get().unwrap(),
                                    well_known_name
                                );
                                inner.registered_names.lock().await.remove(&well_known_name);

                                break;
                            }
                            // Signal for some other name.
                            Ok(_) => (),
                            Err(e) => warn!("Failed to parse `NameLost` signal: {}", e),
                        },
                        None => {
                            trace!("`NameLost` signal stream closed");
                            // This is a very strange state we end up in. Now the name in
                            // question remains in the queue
                            // forever. Maybe we can do better here but I
                            // think it's a very unlikely scenario anyway.
                            //
                            // Can happen if the connection is lost/dropped but then the whole
                            // `Connection` instance will go away soon anyway and hence this
                            // strange state along with it.
                            break;
                        }
                    }
                }
            }
            // NOTE(review): the first argument of `info_span!` is the span name, so this likely
            // creates a span literally named `{}` with `lost_task_name` recorded as a field —
            // confirm that this is intended.
            .instrument(info_span!("{}", lost_task_name)),
        )
    } else {
        None
    };
    let status = match reply {
        RequestNameReply::InQueue => {
            let weak_conn = WeakConnection::from(self);
            let well_known_name = well_known_name.to_owned();
            let task_name = format!("monitor name {well_known_name} acquired");
            // While queued, wait for `NameAcquired` and then promote the entry to `Owner`.
            let task = self.executor().spawn(
                async move {
                    loop {
                        let signal = acquired_stream.next().await;
                        // Stop monitoring once the connection is gone.
                        let inner = match weak_conn.upgrade() {
                            Some(conn) => conn.inner.clone(),
                            None => break,
                        };
                        match signal {
                            Some(signal) => match signal.args() {
                                Ok(args) if args.name == well_known_name => {
                                    let mut names = inner.registered_names.lock().await;
                                    if let Some(status) = names.get_mut(&well_known_name) {
                                        // Now that we own the name, start watching for its
                                        // loss (if replacement was allowed).
                                        let task = name_lost_fut.map(|fut| {
                                            inner.executor.spawn(fut, &lost_task_name)
                                        });
                                        *status = NameStatus::Owner(task);

                                        break;
                                    }
                                    // else the name was released in the meantime. :shrug:
                                }
                                // Signal for some other name.
                                Ok(_) => (),
                                Err(e) => warn!("Failed to parse `NameAcquired` signal: {}", e),
                            },
                            None => {
                                trace!("`NameAcquired` signal stream closed");
                                // See comment above for similar state in case of `NameLost`
                                // stream.
                                break;
                            }
                        }
                    }
                }
                .instrument(info_span!("{}", task_name)),
                &task_name,
            );

            NameStatus::Queued(task)
        }
        RequestNameReply::PrimaryOwner | RequestNameReply::AlreadyOwner => {
            // We own the name right away: start watching for its loss (if replacement was
            // allowed).
            let task = name_lost_fut.map(|fut| self.executor().spawn(fut, &lost_task_name));

            NameStatus::Owner(task)
        }
        RequestNameReply::Exists => return Err(Error::NameTaken),
    };

    names.insert(well_known_name.to_owned(), status);

    Ok(reply)
}
734
735 /// Deregister a previously registered well-known name for this service on the bus.
736 ///
737 /// Use this method to deregister a well-known name, registered through
738 /// [`Connection::request_name`].
739 ///
740 /// Unless an error is encountered, returns `Ok(true)` if name was previously registered with
741 /// the bus through `self` and it has now been successfully deregistered, `Ok(false)` if name
742 /// was not previously registered or already deregistered.
743 pub async fn release_name<'w, W>(&self, well_known_name: W) -> Result<bool>
744 where
745 W: TryInto<WellKnownName<'w>>,
746 W::Error: Into<Error>,
747 {
748 let well_known_name: WellKnownName<'w> = well_known_name.try_into().map_err(Into::into)?;
749 let mut names = self.inner.registered_names.lock().await;
750 // FIXME: Should be possible to avoid cloning/allocation here
751 if names.remove(&well_known_name.to_owned()).is_none() {
752 return Ok(false);
753 };
754
755 if !self.is_bus() {
756 return Ok(true);
757 }
758
759 fdo::DBusProxy::builder(self)
760 .cache_properties(CacheProperties::No)
761 .build()
762 .await?
763 .release_name(well_known_name)
764 .await
765 .map(|_| true)
766 .map_err(Into::into)
767 }
768
/// Checks if `self` is a connection to a message bus.
///
/// This will return `false` for p2p connections. When the `p2p` feature is disabled, this will
/// always return `true`.
pub fn is_bus(&self) -> bool {
    #[cfg(feature = "p2p")]
    {
        self.inner.bus_conn
    }
    // Without the `p2p` feature there is no `bus_conn` field to consult.
    #[cfg(not(feature = "p2p"))]
    {
        true
    }
}
783
784 /// The unique name of the connection, if set/applicable.
785 ///
786 /// The unique name is assigned by the message bus or set manually using
787 /// [`Connection::set_unique_name`].
788 pub fn unique_name(&self) -> Option<&OwnedUniqueName> {
789 self.inner.unique_name.get()
790 }
791
/// Sets the unique name of the connection (if not already set).
///
/// This is mainly provided for bus implementations. All other users should not need to use this
/// method. Hence why this method is only available when the `bus-impl` feature is enabled.
///
/// # Errors
///
/// Returns the conversion error if `unique_name` is not a valid unique name.
///
/// # Panics
///
/// This method panics if the unique name is already set. It will always panic if the connection
/// is to a message bus as it's the bus that assigns peers their unique names.
#[cfg(feature = "bus-impl")]
pub fn set_unique_name<U>(&self, unique_name: U) -> Result<()>
where
    U: TryInto<OwnedUniqueName>,
    U::Error: Into<Error>,
{
    match unique_name.try_into() {
        Ok(name) => {
            self.set_unique_name_(name);

            Ok(())
        }
        Err(e) => Err(e.into()),
    }
}
812
813 /// The capacity of the main (unfiltered) queue.
814 pub fn max_queued(&self) -> usize {
815 self.inner.msg_receiver.capacity()
816 }
817
818 /// Set the capacity of the main (unfiltered) queue.
819 pub fn set_max_queued(&mut self, max: usize) {
820 self.inner.msg_receiver.clone().set_capacity(max);
821 }
822
823 /// The server's GUID.
824 pub fn server_guid(&self) -> &OwnedGuid {
825 &self.inner.server_guid
826 }
827
828 /// The underlying executor.
829 ///
830 /// When a connection is built with internal_executor set to false, zbus will not spawn a
831 /// thread to run the executor. You're responsible to continuously [tick the executor][tte].
832 /// Failure to do so will result in hangs.
833 ///
834 /// # Examples
835 ///
836 /// Here is how one would typically run the zbus executor through tokio's scheduler:
837 ///
838 /// ```
839 /// # // Disable on windows because somehow it triggers a stack overflow there:
840 /// # // https://gitlab.freedesktop.org/zeenix/zbus/-/jobs/34023494
841 /// # #[cfg(not(target_os = "unix"))]
842 /// # {
843 /// use zbus::connection::Builder;
844 /// use tokio::task::spawn;
845 ///
846 /// # struct SomeIface;
847 /// #
848 /// # #[zbus::interface]
849 /// # impl SomeIface {
850 /// # }
851 /// #
852 /// #[tokio::main]
853 /// async fn main() {
854 /// let conn = Builder::session()
855 /// .unwrap()
856 /// .internal_executor(false)
857 /// # // This is only for testing a deadlock that used to happen with this combo.
858 /// # .serve_at("/some/iface", SomeIface)
859 /// # .unwrap()
860 /// .build()
861 /// .await
862 /// .unwrap();
863 /// {
864 /// let conn = conn.clone();
865 /// spawn(async move {
866 /// loop {
867 /// conn.executor().tick().await;
868 /// }
869 /// });
870 /// }
871 ///
872 /// // All your other async code goes here.
873 /// }
874 /// # }
875 /// ```
876 ///
877 /// **Note**: zbus 2.1 added support for tight integration with tokio. This means, if you use
878 /// zbus with tokio, you do not need to worry about this at all. All you need to do is enable
879 /// `tokio` feature. You should also disable the (default) `async-io` feature in your
880 /// `Cargo.toml` to avoid unused dependencies. Also note that **prior** to zbus 3.0, disabling
881 /// `async-io` was required to enable tight `tokio` integration.
882 ///
883 /// [tte]: https://docs.rs/async-executor/1.4.1/async_executor/struct.Executor.html#method.tick
884 pub fn executor(&self) -> &Executor<'static> {
885 &self.inner.executor
886 }
887
888 /// Get a reference to the associated [`ObjectServer`].
889 ///
890 /// The `ObjectServer` is created on-demand.
891 ///
892 /// **Note**: Once the `ObjectServer` is created, it will be replying to all method calls
893 /// received on `self`. If you want to manually reply to method calls, do not use this
894 /// method (or any of the `ObjectServer` related API).
895 pub fn object_server(&self) -> impl Deref<Target = ObjectServer> + '_ {
896 // FIXME: Maybe it makes sense after all to implement Deref<Target= ObjectServer> for
897 // crate::ObjectServer instead of this wrapper?
898 struct Wrapper<'a>(&'a blocking::ObjectServer);
899 impl<'a> Deref for Wrapper<'a> {
900 type Target = ObjectServer;
901
902 fn deref(&self) -> &Self::Target {
903 self.0.inner()
904 }
905 }
906
907 Wrapper(self.sync_object_server(true, None))
908 }
909
910 pub(crate) fn sync_object_server(
911 &self,
912 start: bool,
913 started_event: Option<Event>,
914 ) -> &blocking::ObjectServer {
915 self.inner
916 .object_server
917 .get_or_init(move || self.setup_object_server(start, started_event))
918 }
919
920 fn setup_object_server(
921 &self,
922 start: bool,
923 started_event: Option<Event>,
924 ) -> blocking::ObjectServer {
925 if start {
926 self.start_object_server(started_event);
927 }
928
929 blocking::ObjectServer::new(self)
930 }
931
    /// Spawns the task that dispatches incoming method calls to the `ObjectServer`.
    ///
    /// The task is stored in `object_server_dispatch_task` through `get_or_init`, so the spawning
    /// closure runs at most once per connection; on later calls the existing task is left in
    /// place and `started_event` is dropped without being notified.
    ///
    /// `started_event`, if provided, is notified once the task has subscribed to method calls.
    /// It is not notified when the connection is already gone or the subscription fails.
    #[instrument(skip(self))]
    pub(crate) fn start_object_server(&self, started_event: Option<Event>) {
        self.inner.object_server_dispatch_task.get_or_init(|| {
            trace!("starting ObjectServer task");
            // Only a weak reference is moved into the task, so the task by itself does not keep
            // the connection alive.
            let weak_conn = WeakConnection::from(self);

            let obj_server_task_name = "ObjectServer task";
            self.inner.executor.spawn(
                async move {
                    // The strong `conn` below is scoped to the `Some` arm, so it is released
                    // again before the task starts waiting for messages.
                    let mut stream = match weak_conn.upgrade() {
                        Some(conn) => {
                            // Subscribe to method calls; when our unique name is known, narrow
                            // the rule down to calls addressed to it.
                            let mut builder = MatchRule::builder().msg_type(Type::MethodCall);
                            if let Some(unique_name) = conn.unique_name() {
                                builder = builder.destination(&**unique_name).expect("unique name");
                            }
                            let rule = builder.build();
                            match conn.add_match(rule.into(), None).await {
                                Ok(stream) => stream,
                                Err(e) => {
                                    // Very unlikely but can happen I guess if connection is closed.
                                    debug!("Failed to create message stream: {}", e);

                                    return;
                                }
                            }
                        }
                        None => {
                            trace!("Connection is gone, stopping associated object server task");

                            return;
                        }
                    };
                    if let Some(started_event) = started_event {
                        started_event.notify(1);
                    }

                    trace!("waiting for incoming method call messages..");
                    // The loop ends when the stream is exhausted or yields an error: the error
                    // is logged and `m.ok()` turns it into `None`.
                    while let Some(msg) = stream.next().await.and_then(|m| {
                        if let Err(e) = &m {
                            debug!("Error while reading from object server stream: {:?}", e);
                        }
                        m.ok()
                    }) {
                        // A strong reference is taken per message and released at the end of
                        // each iteration.
                        if let Some(conn) = weak_conn.upgrade() {
                            let hdr = msg.header();
                            // If we're connected to a bus, skip the destination check as the
                            // server will only send us method calls destined to us.
                            if !conn.is_bus() {
                                match hdr.destination() {
                                    // Unique name is already checked by the match rule.
                                    Some(BusName::Unique(_)) | None => (),
                                    Some(BusName::WellKnown(dest)) => {
                                        let names = conn.inner.registered_names.lock().await;
                                        // destination doesn't matter if no name has been registered
                                        // (probably means the name is registered through external
                                        // means).
                                        if !names.is_empty() && !names.contains_key(dest) {
                                            trace!(
                                                "Got a method call for a different destination: {}",
                                                dest
                                            );

                                            continue;
                                        }
                                    }
                                }
                            }
                            // Dispatch failures are only logged; the task keeps serving
                            // subsequent calls.
                            let server = conn.object_server();
                            if let Err(e) = server.dispatch_call(&msg, &hdr).await {
                                debug!(
                                    "Error dispatching message. Message: {:?}, error: {:?}",
                                    msg, e
                                );
                            }
                        } else {
                            // If connection is completely gone, no reason to keep running the task
                            // anymore.
                            trace!("Connection is gone, stopping associated object server task");
                            break;
                        }
                    }
                }
                .instrument(info_span!("{}", obj_server_task_name)),
                obj_server_task_name,
            )
        });
    }
1019
1020 pub(crate) async fn add_match(
1021 &self,
1022 rule: OwnedMatchRule,
1023 max_queued: Option<usize>,
1024 ) -> Result<Receiver<Result<Message>>> {
1025 use std::collections::hash_map::Entry;
1026
1027 if self.inner.msg_senders.lock().await.is_empty() {
1028 // This only happens if socket reader task has errored out.
1029 return Err(Error::InputOutput(Arc::new(io::Error::new(
1030 io::ErrorKind::BrokenPipe,
1031 "Socket reader task has errored out",
1032 ))));
1033 }
1034
1035 let mut subscriptions = self.inner.subscriptions.lock().await;
1036 let msg_type = rule.msg_type().unwrap_or(Type::Signal);
1037 match subscriptions.entry(rule.clone()) {
1038 Entry::Vacant(e) => {
1039 let max_queued = max_queued.unwrap_or(DEFAULT_MAX_QUEUED);
1040 let (sender, mut receiver) = broadcast(max_queued);
1041 receiver.set_await_active(false);
1042 if self.is_bus() && msg_type == Type::Signal {
1043 fdo::DBusProxy::builder(self)
1044 .cache_properties(CacheProperties::No)
1045 .build()
1046 .await?
1047 .add_match_rule(e.key().inner().clone())
1048 .await?;
1049 }
1050 e.insert((1, receiver.clone().deactivate()));
1051 self.inner
1052 .msg_senders
1053 .lock()
1054 .await
1055 .insert(Some(rule), sender);
1056
1057 Ok(receiver)
1058 }
1059 Entry::Occupied(mut e) => {
1060 let (num_subscriptions, receiver) = e.get_mut();
1061 *num_subscriptions += 1;
1062 if let Some(max_queued) = max_queued {
1063 if max_queued > receiver.capacity() {
1064 receiver.set_capacity(max_queued);
1065 }
1066 }
1067
1068 Ok(receiver.activate_cloned())
1069 }
1070 }
1071 }
1072
1073 pub(crate) async fn remove_match(&self, rule: OwnedMatchRule) -> Result<bool> {
1074 use std::collections::hash_map::Entry;
1075 let mut subscriptions = self.inner.subscriptions.lock().await;
1076 // TODO when it becomes stable, use HashMap::raw_entry and only require expr: &str
1077 // (both here and in add_match)
1078 let msg_type = rule.msg_type().unwrap_or(Type::Signal);
1079 match subscriptions.entry(rule) {
1080 Entry::Vacant(_) => Ok(false),
1081 Entry::Occupied(mut e) => {
1082 let rule = e.key().inner().clone();
1083 e.get_mut().0 -= 1;
1084 if e.get().0 == 0 {
1085 if self.is_bus() && msg_type == Type::Signal {
1086 fdo::DBusProxy::builder(self)
1087 .cache_properties(CacheProperties::No)
1088 .build()
1089 .await?
1090 .remove_match_rule(rule.clone())
1091 .await?;
1092 }
1093 e.remove();
1094 self.inner
1095 .msg_senders
1096 .lock()
1097 .await
1098 .remove(&Some(rule.into()));
1099 }
1100 Ok(true)
1101 }
1102 }
1103 }
1104
1105 pub(crate) fn queue_remove_match(&self, rule: OwnedMatchRule) {
1106 let conn = self.clone();
1107 let task_name = format!("Remove match `{}`", *rule);
1108 let remove_match =
1109 async move { conn.remove_match(rule).await }.instrument(trace_span!("{}", task_name));
1110 self.inner.executor.spawn(remove_match, &task_name).detach()
1111 }
1112
1113 pub(crate) async fn new(
1114 auth: Authenticated,
1115 #[allow(unused)] bus_connection: bool,
1116 executor: Executor<'static>,
1117 ) -> Result<Self> {
1118 #[cfg(unix)]
1119 let cap_unix_fd = auth.cap_unix_fd;
1120
1121 macro_rules! create_msg_broadcast_channel {
1122 ($size:expr) => {{
1123 let (msg_sender, msg_receiver) = broadcast($size);
1124 let mut msg_receiver = msg_receiver.deactivate();
1125 msg_receiver.set_await_active(false);
1126
1127 (msg_sender, msg_receiver)
1128 }};
1129 }
1130 // The unfiltered message channel.
1131 let (msg_sender, msg_receiver) = create_msg_broadcast_channel!(DEFAULT_MAX_QUEUED);
1132 let mut msg_senders = HashMap::new();
1133 msg_senders.insert(None, msg_sender);
1134
1135 // The special method return & error channel.
1136 let (method_return_sender, method_return_receiver) =
1137 create_msg_broadcast_channel!(DEFAULT_MAX_METHOD_RETURN_QUEUED);
1138 let rule = MatchRule::builder()
1139 .msg_type(Type::MethodReturn)
1140 .build()
1141 .into();
1142 msg_senders.insert(Some(rule), method_return_sender.clone());
1143 let rule = MatchRule::builder().msg_type(Type::Error).build().into();
1144 msg_senders.insert(Some(rule), method_return_sender);
1145 let msg_senders = Arc::new(Mutex::new(msg_senders));
1146 let subscriptions = Mutex::new(HashMap::new());
1147
1148 let connection = Self {
1149 inner: Arc::new(ConnectionInner {
1150 activity_event: Arc::new(Event::new()),
1151 socket_write: Mutex::new(auth.socket_write),
1152 server_guid: auth.server_guid,
1153 #[cfg(unix)]
1154 cap_unix_fd,
1155 #[cfg(feature = "p2p")]
1156 bus_conn: bus_connection,
1157 unique_name: OnceLock::new(),
1158 subscriptions,
1159 object_server: OnceLock::new(),
1160 object_server_dispatch_task: OnceLock::new(),
1161 executor,
1162 socket_reader_task: OnceLock::new(),
1163 msg_senders,
1164 msg_receiver,
1165 method_return_receiver,
1166 registered_names: Mutex::new(HashMap::new()),
1167 }),
1168 };
1169
1170 if let Some(unique_name) = auth.unique_name {
1171 connection.set_unique_name_(unique_name);
1172 }
1173
1174 Ok(connection)
1175 }
1176
1177 /// Create a `Connection` to the session/user message bus.
1178 pub async fn session() -> Result<Self> {
1179 Builder::session()?.build().await
1180 }
1181
1182 /// Create a `Connection` to the system-wide message bus.
1183 pub async fn system() -> Result<Self> {
1184 Builder::system()?.build().await
1185 }
1186
1187 /// Returns a listener, notified on various connection activity.
1188 ///
1189 /// This function is meant for the caller to implement idle or timeout on inactivity.
1190 pub fn monitor_activity(&self) -> EventListener {
1191 self.inner.activity_event.listen()
1192 }
1193
1194 /// Returns the peer credentials.
1195 ///
1196 /// The fields are populated on the best effort basis. Some or all fields may not even make
1197 /// sense for certain sockets or on certain platforms and hence will be set to `None`.
1198 ///
1199 /// # Caveats
1200 ///
1201 /// Currently `unix_group_ids` and `linux_security_label` fields are not populated.
1202 pub async fn peer_credentials(&self) -> io::Result<ConnectionCredentials> {
1203 self.inner
1204 .socket_write
1205 .lock()
1206 .await
1207 .peer_credentials()
1208 .await
1209 }
1210
1211 /// Close the connection.
1212 ///
1213 /// After this call, all reading and writing operations will fail.
1214 pub async fn close(self) -> Result<()> {
1215 self.inner.activity_event.notify(usize::MAX);
1216 self.inner
1217 .socket_write
1218 .lock()
1219 .await
1220 .close()
1221 .await
1222 .map_err(Into::into)
1223 }
1224
1225 pub(crate) fn init_socket_reader(
1226 &self,
1227 socket_read: Box<dyn socket::ReadHalf>,
1228 already_read: Vec<u8>,
1229 #[cfg(unix)] already_received_fds: Vec<std::os::fd::OwnedFd>,
1230 ) {
1231 let inner = &self.inner;
1232 inner
1233 .socket_reader_task
1234 .set(
1235 SocketReader::new(
1236 socket_read,
1237 inner.msg_senders.clone(),
1238 already_read,
1239 #[cfg(unix)]
1240 already_received_fds,
1241 inner.activity_event.clone(),
1242 )
1243 .spawn(&inner.executor),
1244 )
1245 .expect("Attempted to set `socket_reader_task` twice");
1246 }
1247
1248 fn set_unique_name_(&self, name: OwnedUniqueName) {
1249 self.inner
1250 .unique_name
1251 .set(name)
1252 // programmer (probably our) error if this fails.
1253 .expect("unique name already set");
1254 }
1255}
1256
1257impl From<crate::blocking::Connection> for Connection {
1258 fn from(conn: crate::blocking::Connection) -> Self {
1259 conn.into_inner()
1260 }
1261}
1262
// Internal API that allows keeping a weak connection ref around.
//
// Holds only a `Weak` pointer to the shared connection state, so it has to be upgraded to a
// `Connection` before use.
#[derive(Debug)]
pub(crate) struct WeakConnection {
    // Weak counterpart of `Connection::inner`.
    inner: Weak<ConnectionInner>,
}
1268
1269impl WeakConnection {
1270 /// Upgrade to a Connection.
1271 pub fn upgrade(&self) -> Option<Connection> {
1272 self.inner.upgrade().map(|inner: Arc| Connection { inner })
1273 }
1274}
1275
1276impl From<&Connection> for WeakConnection {
1277 fn from(conn: &Connection) -> Self {
1278 Self {
1279 inner: Arc::downgrade(&conn.inner),
1280 }
1281 }
1282}
1283
// Status of a well-known name tracked in `ConnectionInner::registered_names`.
//
// The tasks are stored in the variants but not read here (hence `allow(unused)`).
#[derive(Debug)]
enum NameStatus {
    // The task waits for name lost signal if owner allows replacement.
    Owner(#[allow(unused)] Option<Task<()>>),
    // The task waits for name acquisition signal.
    Queued(#[allow(unused)] Task<()>),
}
1291
1292static SERIAL_NUM_SEMAPHORE: Semaphore = Semaphore::new(permits:1);
1293
1294// Make message creation and sending an atomic operation, using an async
1295// semaphore if flatpak portal is detected to workaround an xdg-dbus-proxy issue:
1296//
1297// https://github.com/flatpak/xdg-dbus-proxy/issues/46
1298async fn acquire_serial_num_semaphore() -> Option<SemaphorePermit<'static>> {
1299 if is_flatpak() {
1300 Some(SERIAL_NUM_SEMAPHORE.acquire().await)
1301 } else {
1302 None
1303 }
1304}
1305
// Tests that connect to a real session bus.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::fdo::DBusProxy;
    use ntest::timeout;
    use test_log::test;

    // Windows only: resolve the autolaunch session bus address and connect to it.
    #[cfg(windows)]
    #[test]
    fn connect_autolaunch_session_bus() {
        let addr =
            crate::win32::autolaunch_bus_address().expect("Unable to get session bus address");

        crate::block_on(async { addr.connect().await }).expect("Unable to connect to session bus");
    }

    // macOS only: connect through a launchd transport built from the
    // `DBUS_LAUNCHD_SESSION_BUS_SOCKET` key.
    #[cfg(target_os = "macos")]
    #[test]
    fn connect_launchd_session_bus() {
        use crate::address::{transport::Launchd, Address, Transport};
        crate::block_on(async {
            let addr = Address::from(Transport::Launchd(Launchd::new(
                "DBUS_LAUNCHD_SESSION_BUS_SOCKET",
            )));
            addr.connect().await
        })
        .expect("Unable to connect to session bus");
    }

    #[test]
    #[timeout(15000)]
    fn disconnect_on_drop() {
        // Reproducer for https://github.com/dbus2/zbus/issues/308 where setting up the
        // objectserver would cause the connection to not disconnect on drop.
        crate::utils::block_on(test_disconnect_on_drop());
    }

    // Builds a connection that owns a name and serves an interface, drops it, and checks from a
    // second connection that the name loses its owner.
    async fn test_disconnect_on_drop() {
        #[derive(Default)]
        struct MyInterface {}

        #[crate::interface(name = "dev.peelz.FooBar.Baz")]
        impl MyInterface {
            fn do_thing(&self) {}
        }
        let name = "dev.peelz.foobar";
        let connection = Builder::session()
            .unwrap()
            .name(name)
            .unwrap()
            .serve_at("/dev/peelz/FooBar", MyInterface::default())
            .unwrap()
            .build()
            .await
            .unwrap();

        // The observer subscribes to `NameOwnerChanged` for `name` with an empty third argument
        // before the first connection is dropped, so the signal cannot be missed.
        let connection2 = Connection::session().await.unwrap();
        let dbus = DBusProxy::new(&connection2).await.unwrap();
        let mut stream = dbus
            .receive_name_owner_changed_with_args(&[(0, name), (2, "")])
            .await
            .unwrap();

        drop(connection);

        // If the connection is not dropped, this will hang forever.
        stream.next().await.unwrap();

        // Let's still make sure the name is gone.
        let name_has_owner = dbus.name_has_owner(name.try_into().unwrap()).await.unwrap();
        assert!(!name_has_owner);
    }
}
1379
// Peer-to-peer tests: the same scenario (`test_p2p`) is run over several transports, plus a
// cookie-authentication handshake test.
#[cfg(feature = "p2p")]
#[cfg(test)]
mod p2p_tests {
    use futures_util::stream::TryStreamExt;
    use ntest::timeout;
    use test_log::test;
    use zvariant::{Endian, NATIVE_ENDIAN};

    use crate::{AuthMechanism, Guid};

    use super::*;

    // Same numbered client and server are already paired up.
    //
    // Messages are additionally forwarded in both directions between `server1` and `client2`,
    // so a call sent on `client1` reaches `server2` and the replies travel back the same way.
    async fn test_p2p(
        server1: Connection,
        client1: Connection,
        server2: Connection,
        client2: Connection,
    ) -> Result<()> {
        // Everything received on `server1` is re-sent on `client2`.
        let forward1 = {
            let stream = MessageStream::from(server1.clone());
            let sink = client2.clone();

            stream.try_for_each(move |msg| {
                let sink = sink.clone();
                async move { sink.send(&msg).await }
            })
        };
        // Everything received on `client2` is re-sent on `server1`.
        let forward2 = {
            let stream = MessageStream::from(client2.clone());
            let sink = server1.clone();

            stream.try_for_each(move |msg| {
                let sink = sink.clone();
                async move { sink.send(&msg).await }
            })
        };
        // The task handle is kept in a binding for the remainder of the test.
        let _forward_task = client1.executor().spawn(
            async move { futures_util::try_join!(forward1, forward2) },
            "forward_task",
        );

        // `server_ready` gates the client's call on the server stream being set up;
        // `client_done` keeps the server future pending until the client has seen the signal.
        let server_ready = Event::new();
        let server_ready_listener = server_ready.listen();
        let client_done = Event::new();
        let client_done_listener = client_done.listen();

        let server_future = async move {
            let mut stream = MessageStream::from(&server2);
            server_ready.notify(1);
            // Skip anything that is not the expected `Test` method call.
            let method = loop {
                let m = stream.try_next().await?.unwrap();
                if m.to_string() == "Method call Test" {
                    assert_eq!(m.body().deserialize::<u64>().unwrap(), 64);
                    break m;
                }
            };

            // Send another message first to check the queueing function on client side.
            server2
                .emit_signal(None::<()>, "/", "org.zbus.p2p", "ASignalForYou", &())
                .await?;
            server2.reply(&method, &("yay")).await?;
            client_done_listener.await;

            Ok(())
        };

        let client_future = async move {
            let mut stream = MessageStream::from(&client1);
            server_ready_listener.await;
            // We want to set non-native endian to ensure that:
            // 1. the message is actually encoded with the specified endian.
            // 2. the server side is able to decode it and replies in the same encoding.
            let endian = match NATIVE_ENDIAN {
                Endian::Little => Endian::Big,
                Endian::Big => Endian::Little,
            };
            let method = Message::method("/", "Test")?
                .interface("org.zbus.p2p")?
                .endian(endian)
                .build(&64u64)?;
            client1.send(&method).await?;
            // Check we didn't miss the signal that was sent during the call.
            let m = stream.try_next().await?.unwrap();
            client_done.notify(1);
            assert_eq!(m.to_string(), "Signal ASignalForYou");
            let reply = stream.try_next().await?.unwrap();
            assert_eq!(reply.to_string(), "Method return");
            // Check if the reply was in the non-native endian.
            assert_eq!(Endian::from(reply.primary_header().endian_sig()), endian);
            reply.body().deserialize::<String>()
        };

        let (val, _) = futures_util::try_join!(client_future, server_future,)?;
        assert_eq!(val, "yay");

        Ok(())
    }

    #[test]
    #[timeout(15000)]
    fn tcp_p2p() {
        crate::utils::block_on(test_tcp_p2p()).unwrap();
    }

    // Runs the p2p scenario over two TCP connection pairs.
    async fn test_tcp_p2p() -> Result<()> {
        let (server1, client1) = tcp_p2p_pipe().await?;
        let (server2, client2) = tcp_p2p_pipe().await?;

        test_p2p(server1, client1, server2, client2).await
    }

    // Creates a connected (server, client) pair over a loopback TCP socket bound to an
    // OS-assigned port. The server side uses anonymous authentication.
    async fn tcp_p2p_pipe() -> Result<(Connection, Connection)> {
        let guid = Guid::generate();

        // Variant using the standard library's blocking sockets.
        #[cfg(not(feature = "tokio"))]
        let (server_conn_builder, client_conn_builder) = {
            let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
            let addr = listener.local_addr().unwrap();
            let p1 = std::net::TcpStream::connect(addr).unwrap();
            let p0 = listener.incoming().next().unwrap().unwrap();

            (
                Builder::tcp_stream(p0)
                    .server(guid)
                    .unwrap()
                    .p2p()
                    .auth_mechanism(AuthMechanism::Anonymous),
                Builder::tcp_stream(p1).p2p(),
            )
        };

        // Variant using tokio's sockets.
        #[cfg(feature = "tokio")]
        let (server_conn_builder, client_conn_builder) = {
            let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
            let addr = listener.local_addr().unwrap();
            let p1 = tokio::net::TcpStream::connect(addr).await.unwrap();
            let p0 = listener.accept().await.unwrap().0;

            (
                Builder::tcp_stream(p0)
                    .server(guid)
                    .unwrap()
                    .p2p()
                    .auth_mechanism(AuthMechanism::Anonymous),
                Builder::tcp_stream(p1).p2p(),
            )
        };

        // Both handshakes are driven concurrently.
        futures_util::try_join!(server_conn_builder.build(), client_conn_builder.build())
    }

    #[cfg(unix)]
    #[test]
    #[timeout(15000)]
    fn unix_p2p() {
        crate::utils::block_on(test_unix_p2p()).unwrap();
    }

    // Runs the p2p scenario over two unix socket pairs.
    #[cfg(unix)]
    async fn test_unix_p2p() -> Result<()> {
        let (server1, client1) = unix_p2p_pipe().await?;
        let (server2, client2) = unix_p2p_pipe().await?;

        test_p2p(server1, client1, server2, client2).await
    }

    // Creates a connected pair over `UnixStream::pair()`.
    //
    // NOTE(review): the tuple is built with the client (`p1`) first and the server (`p0`)
    // second, while callers bind the result as `(server, client)` — confirm this is intended.
    // NOTE(review): the `uds_windows` import below is gated on `windows`, but this function is
    // only compiled on `unix`, so that import looks unreachable here.
    #[cfg(unix)]
    async fn unix_p2p_pipe() -> Result<(Connection, Connection)> {
        #[cfg(not(feature = "tokio"))]
        use std::os::unix::net::UnixStream;
        #[cfg(feature = "tokio")]
        use tokio::net::UnixStream;
        #[cfg(all(windows, not(feature = "tokio")))]
        use uds_windows::UnixStream;

        let guid = Guid::generate();

        let (p0, p1) = UnixStream::pair().unwrap();

        futures_util::try_join!(
            Builder::unix_stream(p1).p2p().build(),
            Builder::unix_stream(p0).server(guid).unwrap().p2p().build(),
        )
    }

    // Compile-test only since we don't have a VM setup to run this with/in.
    #[cfg(any(
        all(feature = "vsock", not(feature = "tokio")),
        feature = "tokio-vsock"
    ))]
    #[test]
    #[timeout(15000)]
    #[ignore]
    fn vsock_p2p() {
        crate::utils::block_on(test_vsock_p2p()).unwrap();
    }

    // Runs the p2p scenario over two vsock connection pairs.
    #[cfg(any(
        all(feature = "vsock", not(feature = "tokio")),
        feature = "tokio-vsock"
    ))]
    async fn test_vsock_p2p() -> Result<()> {
        let (server1, client1) = vsock_p2p_pipe().await?;
        let (server2, client2) = vsock_p2p_pipe().await?;

        test_p2p(server1, client1, server2, client2).await
    }

    // `vsock` crate variant: listens on port 42 for any CID and connects to the listener's own
    // local address. The server side uses anonymous authentication.
    #[cfg(all(feature = "vsock", not(feature = "tokio")))]
    async fn vsock_p2p_pipe() -> Result<(Connection, Connection)> {
        let guid = Guid::generate();

        let listener = vsock::VsockListener::bind_with_cid_port(vsock::VMADDR_CID_ANY, 42).unwrap();
        let addr = listener.local_addr().unwrap();
        let client = vsock::VsockStream::connect(&addr).unwrap();
        let server = listener.incoming().next().unwrap().unwrap();

        futures_util::try_join!(
            Builder::vsock_stream(server)
                .server(guid)
                .unwrap()
                .p2p()
                .auth_mechanism(AuthMechanism::Anonymous)
                .build(),
            Builder::vsock_stream(client).p2p().build(),
        )
    }

    // `tokio-vsock` variant: binds with arguments (2, 42) and connects with (3, 42).
    // NOTE(review): presumably these are (CID, port) pairs — confirm against tokio-vsock.
    #[cfg(feature = "tokio-vsock")]
    async fn vsock_p2p_pipe() -> Result<(Connection, Connection)> {
        let guid = Guid::generate();

        let listener = tokio_vsock::VsockListener::bind(2, 42).unwrap();
        let client = tokio_vsock::VsockStream::connect(3, 42).await.unwrap();
        let server = listener.incoming().next().await.unwrap().unwrap();

        futures_util::try_join!(
            Builder::vsock_stream(server)
                .server(guid)
                .unwrap()
                .p2p()
                .auth_mechanism(AuthMechanism::Anonymous)
                .build(),
            Builder::vsock_stream(client).p2p().build(),
        )
    }
    // Writes a cookie file under `~/.dbus-keyrings`, runs the cookie-auth handshake with an
    // explicit and with an implicit cookie ID, and removes the file again before checking the
    // results so that a failure does not leave the file behind.
    #[cfg(any(unix, not(feature = "tokio")))]
    #[test]
    #[timeout(15000)]
    fn unix_p2p_cookie_auth() {
        use crate::utils::block_on;
        use std::{
            fs::{create_dir_all, remove_file, write},
            time::{SystemTime as Time, UNIX_EPOCH},
        };
        #[cfg(unix)]
        use std::{
            fs::{set_permissions, Permissions},
            os::unix::fs::PermissionsExt,
        };
        use xdg_home::home_dir;

        let cookie_context = "zbus-test-cookie-context";
        let cookie_id = 123456789;
        let cookie = hex::encode(b"our cookie");

        // Ensure cookie directory exists.
        let cookie_dir = home_dir().unwrap().join(".dbus-keyrings");
        create_dir_all(&cookie_dir).unwrap();
        #[cfg(unix)]
        set_permissions(&cookie_dir, Permissions::from_mode(0o700)).unwrap();

        // Create a cookie file.
        // Each entry is written as "<id> <creation time in seconds> <hex-encoded cookie>".
        let cookie_file = cookie_dir.join(cookie_context);
        let ts = Time::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
        let cookie_entry = format!("{cookie_id} {ts} {cookie}");
        write(&cookie_file, cookie_entry).unwrap();

        // Explicit cookie ID.
        let res1 = block_on(test_unix_p2p_cookie_auth(cookie_context, Some(cookie_id)));
        // Implicit cookie ID (first one should be picked).
        let res2 = block_on(test_unix_p2p_cookie_auth(cookie_context, None));

        // Remove the cookie file.
        remove_file(&cookie_file).unwrap();

        res1.unwrap();
        res2.unwrap();
    }

    // Builds both ends of a socket pair, with the server configured for cookie authentication
    // in `cookie_context` (and with `cookie_id` when given), and succeeds if both handshakes
    // complete.
    #[cfg(any(unix, not(feature = "tokio")))]
    async fn test_unix_p2p_cookie_auth(
        cookie_context: &'static str,
        cookie_id: Option<usize>,
    ) -> Result<()> {
        #[cfg(all(unix, not(feature = "tokio")))]
        use std::os::unix::net::UnixStream;
        #[cfg(all(unix, feature = "tokio"))]
        use tokio::net::UnixStream;
        #[cfg(all(windows, not(feature = "tokio")))]
        use uds_windows::UnixStream;

        let guid = Guid::generate();

        let (p0, p1) = UnixStream::pair().unwrap();
        let mut server_builder = Builder::unix_stream(p0)
            .server(guid)
            .unwrap()
            .p2p()
            .auth_mechanism(AuthMechanism::Cookie)
            .cookie_context(cookie_context)
            .unwrap();
        if let Some(cookie_id) = cookie_id {
            server_builder = server_builder.cookie_id(cookie_id);
        }

        futures_util::try_join!(
            Builder::unix_stream(p1).p2p().build(),
            server_builder.build(),
        )
        .map(|_| ())
    }

    #[test]
    #[timeout(15000)]
    fn channel_pair() {
        crate::utils::block_on(test_channel_pair()).unwrap();
    }

    // Runs the p2p scenario over two in-process channel pairs.
    async fn test_channel_pair() -> Result<()> {
        let (server1, client1) = create_channel_pair().await;
        let (server2, client2) = create_channel_pair().await;

        test_p2p(server1, client1, server2, client2).await
    }

    // Creates two connections over `socket::Channel::pair()`. Both ends are built with
    // `authenticated_socket` and share the same GUID.
    async fn create_channel_pair() -> (Connection, Connection) {
        let (a, b) = socket::Channel::pair();

        let guid = crate::Guid::generate();
        let conn1 = Builder::authenticated_socket(a, guid.clone())
            .unwrap()
            .p2p()
            .build()
            .await
            .unwrap();
        let conn2 = Builder::authenticated_socket(b, guid)
            .unwrap()
            .p2p()
            .build()
            .await
            .unwrap();

        (conn1, conn2)
    }
}
1738