1 | //! Blocking connection API. |
2 | |
3 | use enumflags2::BitFlags; |
4 | use event_listener::EventListener; |
5 | use static_assertions::assert_impl_all; |
6 | use std::{io, ops::Deref}; |
7 | use zbus_names::{BusName, ErrorName, InterfaceName, MemberName, OwnedUniqueName, WellKnownName}; |
8 | use zvariant::ObjectPath; |
9 | |
10 | use crate::{ |
11 | blocking::ObjectServer, |
12 | fdo::{ConnectionCredentials, RequestNameFlags, RequestNameReply}, |
13 | message::Message, |
14 | utils::block_on, |
15 | DBusError, Error, Result, |
16 | }; |
17 | |
18 | mod builder; |
19 | pub use builder::Builder; |
20 | |
21 | /// A blocking wrapper of [`zbus::Connection`]. |
22 | /// |
23 | /// Most of the API is very similar to [`zbus::Connection`], except it's blocking. |
24 | #[derive (Debug, Clone)] |
25 | #[must_use = "Dropping a `Connection` will close the underlying socket." ] |
26 | pub struct Connection { |
27 | inner: crate::Connection, |
28 | } |
29 | |
30 | assert_impl_all!(Connection: Send, Sync, Unpin); |
31 | |
32 | impl Connection { |
33 | /// Create a `Connection` to the session/user message bus. |
34 | pub fn session() -> Result<Self> { |
35 | block_on(crate::Connection::session()).map(Self::from) |
36 | } |
37 | |
38 | /// Create a `Connection` to the system-wide message bus. |
39 | pub fn system() -> Result<Self> { |
40 | block_on(crate::Connection::system()).map(Self::from) |
41 | } |
42 | |
43 | /// The capacity of the main (unfiltered) queue. |
44 | pub fn max_queued(&self) -> usize { |
45 | self.inner.max_queued() |
46 | } |
47 | |
48 | /// Set the capacity of the main (unfiltered) queue. |
49 | pub fn set_max_queued(mut self, max: usize) { |
50 | self.inner.set_max_queued(max) |
51 | } |
52 | |
53 | /// The server's GUID. |
54 | pub fn server_guid(&self) -> &str { |
55 | self.inner.server_guid() |
56 | } |
57 | |
58 | /// The unique name as assigned by the message bus or `None` if not a message bus connection. |
59 | pub fn unique_name(&self) -> Option<&OwnedUniqueName> { |
60 | self.inner.unique_name() |
61 | } |
62 | |
63 | /// Send `msg` to the peer. |
64 | pub fn send(&self, msg: &Message) -> Result<()> { |
65 | block_on(self.inner.send(msg)) |
66 | } |
67 | |
68 | /// Send a method call. |
69 | /// |
70 | /// Create a method-call message, send it over the connection, then wait for the reply. |
71 | /// |
72 | /// On successful reply, an `Ok(Message)` is returned. On error, an `Err` is returned. D-Bus |
73 | /// error replies are returned as [`Error::MethodError`]. |
74 | pub fn call_method<'d, 'p, 'i, 'm, D, P, I, M, B>( |
75 | &self, |
76 | destination: Option<D>, |
77 | path: P, |
78 | iface: Option<I>, |
79 | method_name: M, |
80 | body: &B, |
81 | ) -> Result<Message> |
82 | where |
83 | D: TryInto<BusName<'d>>, |
84 | P: TryInto<ObjectPath<'p>>, |
85 | I: TryInto<InterfaceName<'i>>, |
86 | M: TryInto<MemberName<'m>>, |
87 | D::Error: Into<Error>, |
88 | P::Error: Into<Error>, |
89 | I::Error: Into<Error>, |
90 | M::Error: Into<Error>, |
91 | B: serde::ser::Serialize + zvariant::DynamicType, |
92 | { |
93 | block_on( |
94 | self.inner |
95 | .call_method(destination, path, iface, method_name, body), |
96 | ) |
97 | } |
98 | |
99 | /// Emit a signal. |
100 | /// |
101 | /// Create a signal message, and send it over the connection. |
102 | pub fn emit_signal<'d, 'p, 'i, 'm, D, P, I, M, B>( |
103 | &self, |
104 | destination: Option<D>, |
105 | path: P, |
106 | iface: I, |
107 | signal_name: M, |
108 | body: &B, |
109 | ) -> Result<()> |
110 | where |
111 | D: TryInto<BusName<'d>>, |
112 | P: TryInto<ObjectPath<'p>>, |
113 | I: TryInto<InterfaceName<'i>>, |
114 | M: TryInto<MemberName<'m>>, |
115 | D::Error: Into<Error>, |
116 | P::Error: Into<Error>, |
117 | I::Error: Into<Error>, |
118 | M::Error: Into<Error>, |
119 | B: serde::ser::Serialize + zvariant::DynamicType, |
120 | { |
121 | block_on( |
122 | self.inner |
123 | .emit_signal(destination, path, iface, signal_name, body), |
124 | ) |
125 | } |
126 | |
127 | /// Reply to a message. |
128 | /// |
129 | /// Given an existing message (likely a method call), send a reply back to the caller with the |
130 | /// given `body`. |
131 | pub fn reply<B>(&self, call: &Message, body: &B) -> Result<()> |
132 | where |
133 | B: serde::ser::Serialize + zvariant::DynamicType, |
134 | { |
135 | block_on(self.inner.reply(call, body)) |
136 | } |
137 | |
138 | /// Reply an error to a message. |
139 | /// |
140 | /// Given an existing message (likely a method call), send an error reply back to the caller |
141 | /// with the given `error_name` and `body`. |
142 | /// |
143 | /// Returns the message serial number. |
144 | pub fn reply_error<'e, E, B>(&self, call: &Message, error_name: E, body: &B) -> Result<()> |
145 | where |
146 | B: serde::ser::Serialize + zvariant::DynamicType, |
147 | E: TryInto<ErrorName<'e>>, |
148 | E::Error: Into<Error>, |
149 | { |
150 | block_on(self.inner.reply_error(call, error_name, body)) |
151 | } |
152 | |
153 | /// Reply to a method call with an error. |
154 | /// |
155 | /// Given an existing method call message header, send an error reply back to the caller |
156 | /// using one of the standard interface reply types. |
157 | /// |
158 | /// Returns the message serial number. |
159 | pub fn reply_dbus_error( |
160 | &self, |
161 | call: &zbus::message::Header<'_>, |
162 | err: impl DBusError, |
163 | ) -> Result<()> { |
164 | block_on(self.inner.reply_dbus_error(call, err)) |
165 | } |
166 | |
167 | /// Register a well-known name for this service on the bus. |
168 | /// |
169 | /// Blocking version of [`crate::Connection::request_name`]. See docs there for more details |
170 | /// and caveats. |
171 | pub fn request_name<'w, W>(&self, well_known_name: W) -> Result<()> |
172 | where |
173 | W: TryInto<WellKnownName<'w>>, |
174 | W::Error: Into<Error>, |
175 | { |
176 | block_on(self.inner.request_name(well_known_name)) |
177 | } |
178 | |
179 | /// Register a well-known name for this service on the bus. |
180 | /// |
181 | /// Blocking version of [`crate::Connection::request_name_with_flags`]. See docs there for more |
182 | /// details and caveats. |
183 | pub fn request_name_with_flags<'w, W>( |
184 | &self, |
185 | well_known_name: W, |
186 | flags: BitFlags<RequestNameFlags>, |
187 | ) -> Result<RequestNameReply> |
188 | where |
189 | W: TryInto<WellKnownName<'w>>, |
190 | W::Error: Into<Error>, |
191 | { |
192 | block_on(self.inner.request_name_with_flags(well_known_name, flags)) |
193 | } |
194 | |
195 | /// Deregister a previously registered well-known name for this service on the bus. |
196 | /// |
197 | /// Use this method to deregister a well-known name, registered through |
198 | /// [`Connection::request_name`]. |
199 | /// |
200 | /// Unless an error is encountered, returns `Ok(true)` if name was previously registered with |
201 | /// the bus through `self` and it has now been successfully deregistered, `Ok(false)` if name |
202 | /// was not previously registered or already deregistered. |
203 | pub fn release_name<'w, W>(&self, well_known_name: W) -> Result<bool> |
204 | where |
205 | W: TryInto<WellKnownName<'w>>, |
206 | W::Error: Into<Error>, |
207 | { |
208 | block_on(self.inner.release_name(well_known_name)) |
209 | } |
210 | |
211 | /// Checks if `self` is a connection to a message bus. |
212 | /// |
213 | /// This will return `false` for p2p connections. |
214 | pub fn is_bus(&self) -> bool { |
215 | self.inner.is_bus() |
216 | } |
217 | |
218 | /// Get a reference to the associated [`ObjectServer`]. |
219 | /// |
220 | /// The `ObjectServer` is created on-demand. |
221 | pub fn object_server(&self) -> impl Deref<Target = ObjectServer> + '_ { |
222 | self.inner.sync_object_server(true, None) |
223 | } |
224 | |
225 | /// Get a reference to the underlying async Connection. |
226 | pub fn inner(&self) -> &crate::Connection { |
227 | &self.inner |
228 | } |
229 | |
230 | /// Get the underlying async Connection, consuming `self`. |
231 | pub fn into_inner(self) -> crate::Connection { |
232 | self.inner |
233 | } |
234 | |
235 | /// Returns a listener, notified on various connection activity. |
236 | /// |
237 | /// This function is meant for the caller to implement idle or timeout on inactivity. |
238 | pub fn monitor_activity(&self) -> EventListener { |
239 | self.inner.monitor_activity() |
240 | } |
241 | |
242 | /// Returns the peer credentials. |
243 | /// |
244 | /// The fields are populated on the best effort basis. Some or all fields may not even make |
245 | /// sense for certain sockets or on certain platforms and hence will be set to `None`. |
246 | /// |
247 | /// # Caveats |
248 | /// |
249 | /// Currently `unix_group_ids` and `linux_security_label` fields are not populated. |
250 | pub fn peer_credentials(&self) -> io::Result<ConnectionCredentials> { |
251 | block_on(self.inner.peer_credentials()) |
252 | } |
253 | |
254 | /// Close the connection. |
255 | /// |
256 | /// After this call, all reading and writing operations will fail. |
257 | pub fn close(self) -> Result<()> { |
258 | block_on(self.inner.close()) |
259 | } |
260 | } |
261 | |
262 | impl From<crate::Connection> for Connection { |
263 | fn from(conn: crate::Connection) -> Self { |
264 | Self { inner: conn } |
265 | } |
266 | } |
267 | |
#[cfg(feature = "p2p")]
#[cfg(all(test, unix))]
mod tests {
    use event_listener::Listener;
    use ntest::timeout;
    #[cfg(all(unix, not(feature = "tokio")))]
    use std::os::unix::net::UnixStream;
    use std::thread;
    use test_log::test;
    #[cfg(all(unix, feature = "tokio"))]
    use tokio::net::UnixStream;
    #[cfg(all(windows, not(feature = "tokio")))]
    use uds_windows::UnixStream;

    use crate::{
        blocking::{connection::Builder, MessageIterator},
        Guid,
    };

    /// Round-trips one method call over a peer-to-peer unix socket pair: the server side calls
    /// `Test`, the client side answers `"yay"`, then the connection is checked to go idle.
    #[test]
    #[timeout(15000)]
    fn unix_p2p() {
        let guid = Guid::generate();

        // With tokio, the socket pair has to be created from within an async context, so the
        // (synchronous) constructor is wrapped in an async block.
        let (server_sock, client_sock) =
            crate::utils::block_on(async { UnixStream::pair().unwrap() });

        // The server thread must not issue its call before the client is iterating messages.
        let (ready_tx, ready_rx) = std::sync::mpsc::channel();
        let server_thread = thread::spawn(move || {
            let server = Builder::unix_stream(server_sock)
                .server(guid)
                .unwrap()
                .p2p()
                .build()
                .unwrap();
            ready_rx.recv().unwrap();
            let reply = server
                .call_method(None::<()>, "/", Some("org.zbus.p2p"), "Test", &())
                .unwrap();
            assert_eq!(reply.to_string(), "Method return");
            let text: String = reply.body().deserialize().unwrap();
            text
        });

        let client = Builder::unix_stream(client_sock).p2p().build().unwrap();

        let activity = client.monitor_activity();

        let mut incoming = MessageIterator::from(&client);
        ready_tx.send(()).unwrap();
        let call = incoming.next().unwrap().unwrap();
        assert_eq!(call.to_string(), "Method call Test");
        client.reply(&call, &("yay")).unwrap();

        // Drain whatever else arrives until the iterator ends.
        incoming.for_each(drop);

        let answer = server_thread
            .join()
            .expect("failed to join server thread");
        assert_eq!(answer, "yay");

        // The exchange above counts as activity, so this must return.
        activity.wait();

        // Keep polling with a fresh listener until a full window passes without activity.
        let idle_window = std::time::Duration::from_millis(10);
        while client
            .monitor_activity()
            .wait_timeout(idle_window)
            .is_some()
        {}
    }
}
341 | |