1use enumflags2::BitFlags;
2use event_listener::EventListener;
3use static_assertions::assert_impl_all;
4use std::{convert::TryInto, io, ops::Deref, sync::Arc};
5use zbus_names::{BusName, ErrorName, InterfaceName, MemberName, OwnedUniqueName, WellKnownName};
6use zvariant::ObjectPath;
7
8use crate::{
9 blocking::ObjectServer,
10 fdo::{ConnectionCredentials, RequestNameFlags, RequestNameReply},
11 utils::block_on,
12 DBusError, Error, Message, Result,
13};
14
15/// A blocking wrapper of [`zbus::Connection`].
16///
17/// Most of the API is very similar to [`zbus::Connection`], except it's blocking. One
18/// notable difference is that there is no equivalent of [`Sink`] implementation provided.
19///
20/// [`Sink`]: https://docs.rs/futures/0.3.17/futures/sink/trait.Sink.html
21#[derive(derivative::Derivative, Clone)]
22#[derivative(Debug)]
23#[must_use = "Dropping a `Connection` will close the underlying socket."]
24pub struct Connection {
25 inner: crate::Connection,
26}
27
28assert_impl_all!(Connection: Send, Sync, Unpin);
29
30impl Connection {
31 /// Create a `Connection` to the session/user message bus.
32 pub fn session() -> Result<Self> {
33 block_on(crate::Connection::session()).map(Self::from)
34 }
35
36 /// Create a `Connection` to the system-wide message bus.
37 pub fn system() -> Result<Self> {
38 block_on(crate::Connection::system()).map(Self::from)
39 }
40
41 /// The capacity of the main (unfiltered) queue.
42 pub fn max_queued(&self) -> usize {
43 self.inner.max_queued()
44 }
45
46 /// Set the capacity of the main (unfiltered) queue.
47 pub fn set_max_queued(mut self, max: usize) {
48 self.inner.set_max_queued(max)
49 }
50
51 /// The server's GUID.
52 pub fn server_guid(&self) -> &str {
53 self.inner.server_guid()
54 }
55
56 /// The unique name as assigned by the message bus or `None` if not a message bus connection.
57 pub fn unique_name(&self) -> Option<&OwnedUniqueName> {
58 self.inner.unique_name()
59 }
60
61 /// Send `msg` to the peer.
62 ///
63 /// The connection sets a unique serial number on the message before sending it off.
64 ///
65 /// On successfully sending off `msg`, the assigned serial number is returned.
66 pub fn send_message(&self, msg: Message) -> Result<u32> {
67 block_on(self.inner.send_message(msg))
68 }
69
70 /// Send a method call.
71 ///
72 /// Create a method-call message, send it over the connection, then wait for the reply. Incoming
73 /// messages are received through [`receive_message`] until the matching method reply (error or
74 /// return) is received.
75 ///
76 /// On successful reply, an `Ok(Message)` is returned. On error, an `Err` is returned. D-Bus
77 /// error replies are returned as [`MethodError`].
78 ///
79 /// [`receive_message`]: struct.Connection.html#method.receive_message
80 /// [`MethodError`]: enum.Error.html#variant.MethodError
81 pub fn call_method<'d, 'p, 'i, 'm, D, P, I, M, B>(
82 &self,
83 destination: Option<D>,
84 path: P,
85 iface: Option<I>,
86 method_name: M,
87 body: &B,
88 ) -> Result<Arc<Message>>
89 where
90 D: TryInto<BusName<'d>>,
91 P: TryInto<ObjectPath<'p>>,
92 I: TryInto<InterfaceName<'i>>,
93 M: TryInto<MemberName<'m>>,
94 D::Error: Into<Error>,
95 P::Error: Into<Error>,
96 I::Error: Into<Error>,
97 M::Error: Into<Error>,
98 B: serde::ser::Serialize + zvariant::DynamicType,
99 {
100 block_on(
101 self.inner
102 .call_method(destination, path, iface, method_name, body),
103 )
104 }
105
106 /// Emit a signal.
107 ///
108 /// Create a signal message, and send it over the connection.
109 pub fn emit_signal<'d, 'p, 'i, 'm, D, P, I, M, B>(
110 &self,
111 destination: Option<D>,
112 path: P,
113 iface: I,
114 signal_name: M,
115 body: &B,
116 ) -> Result<()>
117 where
118 D: TryInto<BusName<'d>>,
119 P: TryInto<ObjectPath<'p>>,
120 I: TryInto<InterfaceName<'i>>,
121 M: TryInto<MemberName<'m>>,
122 D::Error: Into<Error>,
123 P::Error: Into<Error>,
124 I::Error: Into<Error>,
125 M::Error: Into<Error>,
126 B: serde::ser::Serialize + zvariant::DynamicType,
127 {
128 block_on(
129 self.inner
130 .emit_signal(destination, path, iface, signal_name, body),
131 )
132 }
133
134 /// Reply to a message.
135 ///
136 /// Given an existing message (likely a method call), send a reply back to the caller with the
137 /// given `body`.
138 ///
139 /// Returns the message serial number.
140 pub fn reply<B>(&self, call: &Message, body: &B) -> Result<u32>
141 where
142 B: serde::ser::Serialize + zvariant::DynamicType,
143 {
144 block_on(self.inner.reply(call, body))
145 }
146
147 /// Reply an error to a message.
148 ///
149 /// Given an existing message (likely a method call), send an error reply back to the caller
150 /// with the given `error_name` and `body`.
151 ///
152 /// Returns the message serial number.
153 pub fn reply_error<'e, E, B>(&self, call: &Message, error_name: E, body: &B) -> Result<u32>
154 where
155 B: serde::ser::Serialize + zvariant::DynamicType,
156 E: TryInto<ErrorName<'e>>,
157 E::Error: Into<Error>,
158 {
159 block_on(self.inner.reply_error(call, error_name, body))
160 }
161
162 /// Reply to a method call with an error.
163 ///
164 /// Given an existing method call message header, send an error reply back to the caller
165 /// using one of the standard interface reply types.
166 ///
167 /// Returns the message serial number.
168 pub fn reply_dbus_error(
169 &self,
170 call: &zbus::MessageHeader<'_>,
171 err: impl DBusError,
172 ) -> Result<u32> {
173 block_on(self.inner.reply_dbus_error(call, err))
174 }
175
176 /// Register a well-known name for this service on the bus.
177 ///
178 /// Blocking version of [`crate::Connection::request_name`]. See docs there for more details
179 /// and caveats.
180 pub fn request_name<'w, W>(&self, well_known_name: W) -> Result<()>
181 where
182 W: TryInto<WellKnownName<'w>>,
183 W::Error: Into<Error>,
184 {
185 block_on(self.inner.request_name(well_known_name))
186 }
187
188 /// Register a well-known name for this service on the bus.
189 ///
190 /// Blocking version of [`crate::Connection::request_name_with_flags`]. See docs there for more
191 /// details and caveats.
192 pub fn request_name_with_flags<'w, W>(
193 &self,
194 well_known_name: W,
195 flags: BitFlags<RequestNameFlags>,
196 ) -> Result<RequestNameReply>
197 where
198 W: TryInto<WellKnownName<'w>>,
199 W::Error: Into<Error>,
200 {
201 block_on(self.inner.request_name_with_flags(well_known_name, flags))
202 }
203
204 /// Deregister a previously registered well-known name for this service on the bus.
205 ///
206 /// Use this method to deregister a well-known name, registered through
207 /// [`Connection::request_name`].
208 ///
209 /// Unless an error is encountered, returns `Ok(true)` if name was previously registered with
210 /// the bus through `self` and it has now been successfully deregistered, `Ok(false)` if name
211 /// was not previously registered or already deregistered.
212 pub fn release_name<'w, W>(&self, well_known_name: W) -> Result<bool>
213 where
214 W: TryInto<WellKnownName<'w>>,
215 W::Error: Into<Error>,
216 {
217 block_on(self.inner.release_name(well_known_name))
218 }
219
220 /// Checks if `self` is a connection to a message bus.
221 ///
222 /// This will return `false` for p2p connections.
223 pub fn is_bus(&self) -> bool {
224 self.inner.is_bus()
225 }
226
227 /// Get a reference to the associated [`ObjectServer`].
228 ///
229 /// The `ObjectServer` is created on-demand.
230 pub fn object_server(&self) -> impl Deref<Target = ObjectServer> + '_ {
231 self.inner.sync_object_server(true, None)
232 }
233
234 /// Get a reference to the underlying async Connection.
235 pub fn inner(&self) -> &crate::Connection {
236 &self.inner
237 }
238
239 /// Get the underlying async Connection, consuming `self`.
240 pub fn into_inner(self) -> crate::Connection {
241 self.inner
242 }
243
244 /// Returns a listener, notified on various connection activity.
245 ///
246 /// This function is meant for the caller to implement idle or timeout on inactivity.
247 pub fn monitor_activity(&self) -> EventListener {
248 self.inner.monitor_activity()
249 }
250
251 /// Returns the peer credentials.
252 ///
253 /// The fields are populated on the best effort basis. Some or all fields may not even make
254 /// sense for certain sockets or on certain platforms and hence will be set to `None`.
255 ///
256 /// # Caveats
257 ///
258 /// Currently `unix_group_ids` and `linux_security_label` fields are not populated.
259 pub fn peer_credentials(&self) -> io::Result<ConnectionCredentials> {
260 block_on(self.inner.peer_credentials())
261 }
262}
263
264impl From<crate::Connection> for Connection {
265 fn from(conn: crate::Connection) -> Self {
266 Self { inner: conn }
267 }
268}
269
#[cfg(all(test, unix))]
mod tests {
    use ntest::timeout;
    #[cfg(all(unix, not(feature = "tokio")))]
    use std::os::unix::net::UnixStream;
    use std::thread;
    use std::time::Duration;
    use test_log::test;
    #[cfg(all(unix, feature = "tokio"))]
    use tokio::net::UnixStream;
    #[cfg(all(windows, not(feature = "tokio")))]
    use uds_windows::UnixStream;

    use crate::{
        blocking::{ConnectionBuilder, MessageIterator},
        Guid,
    };

    /// Two peers talk over a socket pair: the "server" side issues a method call, and the
    /// "client" side answers it and then checks the activity listener.
    #[test]
    #[timeout(15000)]
    fn unix_p2p() {
        let guid = Guid::generate();

        // With tokio, the (sync) pair constructor has to be invoked from an async context.
        let (server_sock, client_sock) =
            crate::utils::block_on(async { UnixStream::pair().unwrap() });

        // Used by the main thread to tell the server thread that it may issue its call.
        let (ready_tx, ready_rx) = std::sync::mpsc::channel();
        let server_thread = thread::spawn(move || -> String {
            let server = ConnectionBuilder::unix_stream(server_sock)
                .server(&guid)
                .p2p()
                .build()
                .unwrap();
            ready_rx.recv().unwrap();
            let reply = server
                .call_method(None::<()>, "/", Some("org.zbus.p2p"), "Test", &())
                .unwrap();
            assert_eq!(reply.to_string(), "Method return");
            reply.body().unwrap()
        });

        let client = ConnectionBuilder::unix_stream(client_sock)
            .p2p()
            .build()
            .unwrap();
        // Registered before any traffic, so the activity below is what notifies it.
        let activity = client.monitor_activity();
        let mut incoming = MessageIterator::from(&client);
        ready_tx.send(()).unwrap();
        let call = incoming.next().unwrap().unwrap();
        assert_eq!(call.to_string(), "Method call Test");
        client.reply(&call, &"yay").unwrap();

        // Drain whatever else arrives until the iterator ends.
        incoming.for_each(drop);

        let answer = server_thread.join().expect("failed to join server thread");
        assert_eq!(answer, "yay");

        // There was some activity, so this wait returns.
        activity.wait();
        // Eventually nothing happens any more and a fresh listener times out.
        while client
            .monitor_activity()
            .wait_timeout(Duration::from_millis(10))
        {}
    }
}
335