1#[cfg(not(feature = "tokio"))]
2use async_io::Async;
3use event_listener::Event;
4use static_assertions::assert_impl_all;
5#[cfg(not(feature = "tokio"))]
6use std::net::TcpStream;
7#[cfg(all(unix, not(feature = "tokio")))]
8use std::os::unix::net::UnixStream;
9use std::{
10 collections::{HashMap, HashSet, VecDeque},
11 vec,
12};
13#[cfg(feature = "tokio")]
14use tokio::net::TcpStream;
15#[cfg(all(unix, feature = "tokio"))]
16use tokio::net::UnixStream;
17#[cfg(feature = "tokio-vsock")]
18use tokio_vsock::VsockStream;
19#[cfg(all(windows, not(feature = "tokio")))]
20use uds_windows::UnixStream;
21#[cfg(all(feature = "vsock", not(feature = "tokio")))]
22use vsock::VsockStream;
23
24use zvariant::{ObjectPath, Str};
25
26use crate::{
27 address::{self, Address},
28 names::{InterfaceName, WellKnownName},
29 object_server::{ArcInterface, Interface},
30 Connection, Error, Executor, Guid, OwnedGuid, Result,
31};
32
33use super::{
34 handshake::{AuthMechanism, Authenticated},
35 socket::{BoxedSplit, ReadHalf, Split, WriteHalf},
36};
37
38const DEFAULT_MAX_QUEUED: usize = 64;
39
40#[derive(Debug)]
41enum Target {
42 #[cfg(any(unix, not(feature = "tokio")))]
43 UnixStream(UnixStream),
44 TcpStream(TcpStream),
45 #[cfg(any(
46 all(feature = "vsock", not(feature = "tokio")),
47 feature = "tokio-vsock"
48 ))]
49 VsockStream(VsockStream),
50 Address(Address),
51 Socket(Split<Box<dyn ReadHalf>, Box<dyn WriteHalf>>),
52 AuthenticatedSocket(Split<Box<dyn ReadHalf>, Box<dyn WriteHalf>>),
53}
54
55type Interfaces<'a> = HashMap<ObjectPath<'a>, HashMap<InterfaceName<'static>, ArcInterface>>;
56
57/// A builder for [`zbus::Connection`].
58#[derive(Debug)]
59#[must_use]
60pub struct Builder<'a> {
61 target: Option<Target>,
62 max_queued: Option<usize>,
63 // This is only set for p2p server case or pre-authenticated sockets.
64 guid: Option<Guid<'a>>,
65 #[cfg(feature = "p2p")]
66 p2p: bool,
67 internal_executor: bool,
68 interfaces: Interfaces<'a>,
69 names: HashSet<WellKnownName<'a>>,
70 auth_mechanisms: Option<VecDeque<AuthMechanism>>,
71 #[cfg(feature = "bus-impl")]
72 unique_name: Option<crate::names::UniqueName<'a>>,
73 cookie_context: Option<super::handshake::CookieContext<'a>>,
74 cookie_id: Option<usize>,
75}
76
77assert_impl_all!(Builder<'_>: Send, Sync, Unpin);
78
79impl<'a> Builder<'a> {
80 /// Create a builder for the session/user message bus connection.
81 pub fn session() -> Result<Self> {
82 Ok(Self::new(Target::Address(Address::session()?)))
83 }
84
85 /// Create a builder for the system-wide message bus connection.
86 pub fn system() -> Result<Self> {
87 Ok(Self::new(Target::Address(Address::system()?)))
88 }
89
90 /// Create a builder for connection that will use the given [D-Bus bus address].
91 ///
92 /// # Example
93 ///
94 /// Here is an example of connecting to an IBus service:
95 ///
96 /// ```no_run
97 /// # use std::error::Error;
98 /// # use zbus::connection::Builder;
99 /// # use zbus::block_on;
100 /// #
101 /// # block_on(async {
102 /// let addr = "unix:\
103 /// path=/home/zeenix/.cache/ibus/dbus-ET0Xzrk9,\
104 /// guid=fdd08e811a6c7ebe1fef0d9e647230da";
105 /// let conn = Builder::address(addr)?
106 /// .build()
107 /// .await?;
108 ///
109 /// // Do something useful with `conn`..
110 /// # drop(conn);
111 /// # Ok::<(), zbus::Error>(())
112 /// # }).unwrap();
113 /// #
114 /// # Ok::<_, Box<dyn Error + Send + Sync>>(())
115 /// ```
116 ///
117 /// **Note:** The IBus address is different for each session. You can find the address for your
118 /// current session using `ibus address` command.
119 ///
120 /// [D-Bus bus address]: https://dbus.freedesktop.org/doc/dbus-specification.html#addresses
121 pub fn address<A>(address: A) -> Result<Self>
122 where
123 A: TryInto<Address>,
124 A::Error: Into<Error>,
125 {
126 Ok(Self::new(Target::Address(
127 address.try_into().map_err(Into::into)?,
128 )))
129 }
130
131 /// Create a builder for connection that will use the given unix stream.
132 ///
133 /// If the default `async-io` feature is disabled, this method will expect
134 /// [`tokio::net::UnixStream`](https://docs.rs/tokio/latest/tokio/net/struct.UnixStream.html)
135 /// argument.
136 ///
137 /// Since tokio currently [does not support Unix domain sockets][tuds] on Windows, this method
138 /// is not available when the `tokio` feature is enabled and building for Windows target.
139 ///
140 /// [tuds]: https://github.com/tokio-rs/tokio/issues/2201
141 #[cfg(any(unix, not(feature = "tokio")))]
142 pub fn unix_stream(stream: UnixStream) -> Self {
143 Self::new(Target::UnixStream(stream))
144 }
145
146 /// Create a builder for connection that will use the given TCP stream.
147 ///
148 /// If the default `async-io` feature is disabled, this method will expect
149 /// [`tokio::net::TcpStream`](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html)
150 /// argument.
151 pub fn tcp_stream(stream: TcpStream) -> Self {
152 Self::new(Target::TcpStream(stream))
153 }
154
155 /// Create a builder for connection that will use the given VSOCK stream.
156 ///
157 /// This method is only available when either `vsock` or `tokio-vsock` feature is enabled. The
158 /// type of `stream` is `vsock::VsockStream` with `vsock` feature and `tokio_vsock::VsockStream`
159 /// with `tokio-vsock` feature.
160 #[cfg(any(
161 all(feature = "vsock", not(feature = "tokio")),
162 feature = "tokio-vsock"
163 ))]
164 pub fn vsock_stream(stream: VsockStream) -> Self {
165 Self::new(Target::VsockStream(stream))
166 }
167
168 /// Create a builder for connection that will use the given socket.
169 pub fn socket<S: Into<BoxedSplit>>(socket: S) -> Self {
170 Self::new(Target::Socket(socket.into()))
171 }
172
173 /// Create a builder for a connection that will use the given pre-authenticated socket.
174 ///
175 /// This is similar to [`Builder::socket`], except that the socket is either already
176 /// authenticated or does not require authentication.
177 pub fn authenticated_socket<S, G>(socket: S, guid: G) -> Result<Self>
178 where
179 S: Into<BoxedSplit>,
180 G: TryInto<Guid<'a>>,
181 G::Error: Into<Error>,
182 {
183 let mut builder = Self::new(Target::AuthenticatedSocket(socket.into()));
184 builder.guid = Some(guid.try_into().map_err(Into::into)?);
185
186 Ok(builder)
187 }
188
189 /// Specify the mechanism to use during authentication.
190 pub fn auth_mechanism(self, auth_mechanism: AuthMechanism) -> Self {
191 #[allow(deprecated)]
192 self.auth_mechanisms(&[auth_mechanism])
193 }
194
195 /// Specify the mechanisms to use during authentication.
196 #[deprecated(since = "4.1.3", note = "Use `auth_mechanism` instead.")]
197 pub fn auth_mechanisms(mut self, auth_mechanisms: &[AuthMechanism]) -> Self {
198 self.auth_mechanisms = Some(VecDeque::from(auth_mechanisms.to_vec()));
199
200 self
201 }
202
203 /// The cookie context to use during authentication.
204 ///
205 /// This is only used when the `cookie` authentication mechanism is enabled and only valid for
206 /// server connection.
207 ///
208 /// If not specified, the default cookie context of `org_freedesktop_general` will be used.
209 ///
210 /// # Errors
211 ///
212 /// If the given string is not a valid cookie context.
213 pub fn cookie_context<C>(mut self, context: C) -> Result<Self>
214 where
215 C: Into<Str<'a>>,
216 {
217 self.cookie_context = Some(context.into().try_into()?);
218
219 Ok(self)
220 }
221
222 /// The ID of the cookie to use during authentication.
223 ///
224 /// This is only used when the `cookie` authentication mechanism is enabled and only valid for
225 /// server connection.
226 ///
227 /// If not specified, the first cookie found in the cookie context file will be used.
228 pub fn cookie_id(mut self, id: usize) -> Self {
229 self.cookie_id = Some(id);
230
231 self
232 }
233
234 /// The to-be-created connection will be a peer-to-peer connection.
235 ///
236 /// This method is only available when the `p2p` feature is enabled.
237 #[cfg(feature = "p2p")]
238 pub fn p2p(mut self) -> Self {
239 self.p2p = true;
240
241 self
242 }
243
244 /// The to-be-created connection will be a server using the given GUID.
245 ///
246 /// The to-be-created connection will wait for incoming client authentication handshake and
247 /// negotiation messages, for peer-to-peer communications after successful creation.
248 ///
249 /// This method is only available when the `p2p` feature is enabled.
250 ///
251 /// **NOTE:** This method is redundant when using [`Builder::authenticated_socket`] since the
252 /// latter already sets the GUID for the connection and zbus doesn't differentiate between a
253 /// server and a client connection, except for authentication.
254 #[cfg(feature = "p2p")]
255 pub fn server<G>(mut self, guid: G) -> Result<Self>
256 where
257 G: TryInto<Guid<'a>>,
258 G::Error: Into<Error>,
259 {
260 self.guid = Some(guid.try_into().map_err(Into::into)?);
261
262 Ok(self)
263 }
264
265 /// Set the capacity of the main (unfiltered) queue.
266 ///
267 /// Since typically you'd want to set this at instantiation time, you can set it through the
268 /// builder.
269 ///
270 /// # Example
271 ///
272 /// ```
273 /// # use std::error::Error;
274 /// # use zbus::connection::Builder;
275 /// # use zbus::block_on;
276 /// #
277 /// # block_on(async {
278 /// let conn = Builder::session()?
279 /// .max_queued(30)
280 /// .build()
281 /// .await?;
282 /// assert_eq!(conn.max_queued(), 30);
283 ///
284 /// # Ok::<(), zbus::Error>(())
285 /// # }).unwrap();
286 /// #
287 /// // Do something useful with `conn`..
288 /// # Ok::<_, Box<dyn Error + Send + Sync>>(())
289 /// ```
290 pub fn max_queued(mut self, max: usize) -> Self {
291 self.max_queued = Some(max);
292
293 self
294 }
295
296 /// Enable or disable the internal executor thread.
297 ///
298 /// The thread is enabled by default.
299 ///
300 /// See [Connection::executor] for more details.
301 pub fn internal_executor(mut self, enabled: bool) -> Self {
302 self.internal_executor = enabled;
303
304 self
305 }
306
307 /// Register a D-Bus [`Interface`] to be served at a given path.
308 ///
309 /// This is similar to [`zbus::ObjectServer::at`], except that it allows you to have your
310 /// interfaces available immediately after the connection is established. Typically, this is
311 /// exactly what you'd want. Also in contrast to [`zbus::ObjectServer::at`], this method will
312 /// replace any previously added interface with the same name at the same path.
313 ///
314 /// Standard interfaces (Peer, Introspectable, Properties) are added on your behalf. If you
315 /// attempt to add yours, [`Builder::build()`] will fail.
316 pub fn serve_at<P, I>(mut self, path: P, iface: I) -> Result<Self>
317 where
318 I: Interface,
319 P: TryInto<ObjectPath<'a>>,
320 P::Error: Into<Error>,
321 {
322 let path = path.try_into().map_err(Into::into)?;
323 let entry = self.interfaces.entry(path).or_default();
324 entry.insert(I::name(), ArcInterface::new(iface));
325 Ok(self)
326 }
327
328 /// Register a well-known name for this connection on the bus.
329 ///
330 /// This is similar to [`zbus::Connection::request_name`], except the name is requested as part
331 /// of the connection setup ([`Builder::build`]), immediately after interfaces
332 /// registered (through [`Builder::serve_at`]) are advertised. Typically this is
333 /// exactly what you want.
334 pub fn name<W>(mut self, well_known_name: W) -> Result<Self>
335 where
336 W: TryInto<WellKnownName<'a>>,
337 W::Error: Into<Error>,
338 {
339 let well_known_name = well_known_name.try_into().map_err(Into::into)?;
340 self.names.insert(well_known_name);
341
342 Ok(self)
343 }
344
345 /// Sets the unique name of the connection.
346 ///
347 /// This is mainly provided for bus implementations. All other users should not need to use this
348 /// method. Hence why this method is only available when the `bus-impl` feature is enabled.
349 ///
350 /// # Panics
351 ///
352 /// It will panic if the connection is to a message bus as it's the bus that assigns
353 /// peers their unique names.
354 #[cfg(feature = "bus-impl")]
355 pub fn unique_name<U>(mut self, unique_name: U) -> Result<Self>
356 where
357 U: TryInto<crate::names::UniqueName<'a>>,
358 U::Error: Into<Error>,
359 {
360 if !self.p2p {
361 panic!("unique name can only be set for peer-to-peer connections");
362 }
363 let name = unique_name.try_into().map_err(Into::into)?;
364 self.unique_name = Some(name);
365
366 Ok(self)
367 }
368
369 /// Build the connection, consuming the builder.
370 ///
371 /// # Errors
372 ///
373 /// Until server-side bus connection is supported, attempting to build such a connection will
374 /// result in [`Error::Unsupported`] error.
375 pub async fn build(self) -> Result<Connection> {
376 let executor = Executor::new();
377 #[cfg(not(feature = "tokio"))]
378 let internal_executor = self.internal_executor;
379 // Box the future as it's large and can cause stack overflow.
380 let conn = Box::pin(executor.run(self.build_(executor.clone()))).await?;
381
382 #[cfg(not(feature = "tokio"))]
383 start_internal_executor(&executor, internal_executor)?;
384
385 Ok(conn)
386 }
387
388 async fn build_(mut self, executor: Executor<'static>) -> Result<Connection> {
389 #[cfg(feature = "p2p")]
390 let is_bus_conn = !self.p2p;
391 #[cfg(not(feature = "p2p"))]
392 let is_bus_conn = true;
393
394 #[cfg(not(feature = "bus-impl"))]
395 let unique_name = None;
396 #[cfg(feature = "bus-impl")]
397 let unique_name = self.unique_name.take().map(Into::into);
398
399 #[allow(unused_mut)]
400 let (mut stream, server_guid, authenticated) = self.target_connect().await?;
401 let mut auth = if authenticated {
402 let (socket_read, socket_write) = stream.take();
403 Authenticated {
404 #[cfg(unix)]
405 cap_unix_fd: socket_read.can_pass_unix_fd(),
406 socket_read: Some(socket_read),
407 socket_write,
408 // SAFETY: `server_guid` is provided as arg of `Builder::authenticated_socket`.
409 server_guid: server_guid.unwrap(),
410 already_received_bytes: vec![],
411 unique_name,
412 #[cfg(unix)]
413 already_received_fds: vec![],
414 }
415 } else {
416 #[cfg(feature = "p2p")]
417 match self.guid {
418 None => {
419 // SASL Handshake
420 Authenticated::client(stream, server_guid, self.auth_mechanisms, is_bus_conn)
421 .await?
422 }
423 Some(guid) => {
424 if !self.p2p {
425 return Err(Error::Unsupported);
426 }
427
428 let creds = stream.read_mut().peer_credentials().await?;
429 #[cfg(unix)]
430 let client_uid = creds.unix_user_id();
431 #[cfg(windows)]
432 let client_sid = creds.into_windows_sid();
433
434 Authenticated::server(
435 stream,
436 guid.to_owned().into(),
437 #[cfg(unix)]
438 client_uid,
439 #[cfg(windows)]
440 client_sid,
441 self.auth_mechanisms,
442 self.cookie_id,
443 self.cookie_context.unwrap_or_default(),
444 unique_name,
445 )
446 .await?
447 }
448 }
449
450 #[cfg(not(feature = "p2p"))]
451 Authenticated::client(stream, server_guid, self.auth_mechanisms, is_bus_conn).await?
452 };
453
454 // SAFETY: `Authenticated` is always built with these fields set to `Some`.
455 let socket_read = auth.socket_read.take().unwrap();
456 let already_received_bytes = auth.already_received_bytes.drain(..).collect();
457 #[cfg(unix)]
458 let already_received_fds = auth.already_received_fds.drain(..).collect();
459
460 let mut conn = Connection::new(auth, is_bus_conn, executor).await?;
461 conn.set_max_queued(self.max_queued.unwrap_or(DEFAULT_MAX_QUEUED));
462
463 if !self.interfaces.is_empty() {
464 let object_server = conn.sync_object_server(false, None);
465 for (path, interfaces) in self.interfaces {
466 for (name, iface) in interfaces {
467 let added = object_server
468 .inner()
469 .add_arc_interface(path.clone(), name.clone(), iface.clone())
470 .await?;
471 if !added {
472 return Err(Error::InterfaceExists(name.clone(), path.to_owned()));
473 }
474 }
475 }
476
477 let started_event = Event::new();
478 let listener = started_event.listen();
479 conn.start_object_server(Some(started_event));
480
481 listener.await;
482 }
483
484 // Start the socket reader task.
485 conn.init_socket_reader(
486 socket_read,
487 already_received_bytes,
488 #[cfg(unix)]
489 already_received_fds,
490 );
491
492 for name in self.names {
493 conn.request_name(name).await?;
494 }
495
496 Ok(conn)
497 }
498
499 fn new(target: Target) -> Self {
500 Self {
501 target: Some(target),
502 #[cfg(feature = "p2p")]
503 p2p: false,
504 max_queued: None,
505 guid: None,
506 internal_executor: true,
507 interfaces: HashMap::new(),
508 names: HashSet::new(),
509 auth_mechanisms: None,
510 #[cfg(feature = "bus-impl")]
511 unique_name: None,
512 cookie_id: None,
513 cookie_context: None,
514 }
515 }
516
517 async fn target_connect(&mut self) -> Result<(BoxedSplit, Option<OwnedGuid>, bool)> {
518 let mut authenticated = false;
519 let mut guid = None;
520 // SAFETY: `self.target` is always `Some` from the beginning and this method is only called
521 // once.
522 let split = match self.target.take().unwrap() {
523 #[cfg(not(feature = "tokio"))]
524 Target::UnixStream(stream) => Async::new(stream)?.into(),
525 #[cfg(all(unix, feature = "tokio"))]
526 Target::UnixStream(stream) => stream.into(),
527 #[cfg(not(feature = "tokio"))]
528 Target::TcpStream(stream) => Async::new(stream)?.into(),
529 #[cfg(feature = "tokio")]
530 Target::TcpStream(stream) => stream.into(),
531 #[cfg(all(feature = "vsock", not(feature = "tokio")))]
532 Target::VsockStream(stream) => Async::new(stream)?.into(),
533 #[cfg(feature = "tokio-vsock")]
534 Target::VsockStream(stream) => stream.into(),
535 Target::Address(address) => {
536 guid = address.guid().map(|g| g.to_owned().into());
537 match address.connect().await? {
538 #[cfg(any(unix, not(feature = "tokio")))]
539 address::transport::Stream::Unix(stream) => stream.into(),
540 address::transport::Stream::Tcp(stream) => stream.into(),
541 #[cfg(any(
542 all(feature = "vsock", not(feature = "tokio")),
543 feature = "tokio-vsock"
544 ))]
545 address::transport::Stream::Vsock(stream) => stream.into(),
546 }
547 }
548 Target::Socket(stream) => stream,
549 Target::AuthenticatedSocket(stream) => {
550 authenticated = true;
551 guid = self.guid.take().map(Into::into);
552 stream
553 }
554 };
555
556 Ok((split, guid, authenticated))
557 }
558}
559
560/// Start the internal executor thread.
561///
562/// Returns a dummy task that keep the executor ticking thread from exiting due to absence of any
563/// tasks until socket reader task kicks in.
564#[cfg(not(feature = "tokio"))]
565fn start_internal_executor(executor: &Executor<'static>, internal_executor: bool) -> Result<()> {
566 if internal_executor {
567 let executor: Executor<'static> = executor.clone();
568 stdBuilder::thread::Builder::new()
569 .name("zbus::Connection executor".into())
570 .spawn(move || {
571 crate::utils::block_on(future:async move {
572 // Run as long as there is a task to run.
573 while !executor.is_empty() {
574 executor.tick().await;
575 }
576 })
577 })?;
578 }
579
580 Ok(())
581}
582