1 | use enumflags2::BitFlags; |
2 | use event_listener::EventListener; |
3 | use static_assertions::assert_impl_all; |
4 | use std::{convert::TryInto, io, ops::Deref, sync::Arc}; |
5 | use zbus_names::{BusName, ErrorName, InterfaceName, MemberName, OwnedUniqueName, WellKnownName}; |
6 | use zvariant::ObjectPath; |
7 | |
8 | use crate::{ |
9 | blocking::ObjectServer, |
10 | fdo::{ConnectionCredentials, RequestNameFlags, RequestNameReply}, |
11 | utils::block_on, |
12 | DBusError, Error, Message, Result, |
13 | }; |
14 | |
/// A blocking wrapper of [`zbus::Connection`].
///
/// Most of the API is very similar to [`zbus::Connection`], except it's blocking. One
/// notable difference is that there is no equivalent of [`Sink`] implementation provided.
///
/// Every method delegates to the wrapped async connection. The methods whose async counterpart
/// returns a future drive that future to completion with `block_on`, so they block the calling
/// thread until the operation finishes.
///
/// [`Sink`]: https://docs.rs/futures/0.3.17/futures/sink/trait.Sink.html
#[derive(derivative::Derivative, Clone)]
#[derivative(Debug)]
#[must_use = "Dropping a `Connection` will close the underlying socket."]
pub struct Connection {
    // The async connection that every method of this wrapper forwards to.
    inner: crate::Connection,
}
27 | |
// Compile-time check: the build fails if `Connection` ever stops being `Send`, `Sync` or `Unpin`.
assert_impl_all!(Connection: Send, Sync, Unpin);
29 | |
30 | impl Connection { |
31 | /// Create a `Connection` to the session/user message bus. |
32 | pub fn session() -> Result<Self> { |
33 | block_on(crate::Connection::session()).map(Self::from) |
34 | } |
35 | |
36 | /// Create a `Connection` to the system-wide message bus. |
37 | pub fn system() -> Result<Self> { |
38 | block_on(crate::Connection::system()).map(Self::from) |
39 | } |
40 | |
/// The capacity of the main (unfiltered) queue.
///
/// This is a plain delegation to `max_queued` on the wrapped async connection; no `block_on` is
/// involved.
pub fn max_queued(&self) -> usize {
    self.inner.max_queued()
}
45 | |
/// Set the capacity of the main (unfiltered) queue.
///
/// Forwards `max` to `set_max_queued` on the wrapped async connection.
pub fn set_max_queued(mut self, max: usize) {
    // NOTE(review): the receiver is `mut self`, so this call consumes the handle it is invoked
    // on; a caller that wants to keep using the connection has to call this on a clone. Whether
    // the new capacity is then observed through the remaining clones depends on the async
    // connection sharing its queue between clones — TODO confirm.
    self.inner.set_max_queued(max)
}
50 | |
/// The server's GUID.
///
/// The returned string slice is borrowed from the wrapped async connection and lives as long as
/// the borrow of `self`.
pub fn server_guid(&self) -> &str {
    self.inner.server_guid()
}
55 | |
/// The unique name as assigned by the message bus or `None` if not a message bus connection.
///
/// The value is read from the wrapped async connection and returned by reference.
pub fn unique_name(&self) -> Option<&OwnedUniqueName> {
    self.inner.unique_name()
}
60 | |
61 | /// Send `msg` to the peer. |
62 | /// |
63 | /// The connection sets a unique serial number on the message before sending it off. |
64 | /// |
65 | /// On successfully sending off `msg`, the assigned serial number is returned. |
66 | pub fn send_message(&self, msg: Message) -> Result<u32> { |
67 | block_on(self.inner.send_message(msg)) |
68 | } |
69 | |
70 | /// Send a method call. |
71 | /// |
72 | /// Create a method-call message, send it over the connection, then wait for the reply. Incoming |
73 | /// messages are received through [`receive_message`] until the matching method reply (error or |
74 | /// return) is received. |
75 | /// |
76 | /// On successful reply, an `Ok(Message)` is returned. On error, an `Err` is returned. D-Bus |
77 | /// error replies are returned as [`MethodError`]. |
78 | /// |
79 | /// [`receive_message`]: struct.Connection.html#method.receive_message |
80 | /// [`MethodError`]: enum.Error.html#variant.MethodError |
81 | pub fn call_method<'d, 'p, 'i, 'm, D, P, I, M, B>( |
82 | &self, |
83 | destination: Option<D>, |
84 | path: P, |
85 | iface: Option<I>, |
86 | method_name: M, |
87 | body: &B, |
88 | ) -> Result<Arc<Message>> |
89 | where |
90 | D: TryInto<BusName<'d>>, |
91 | P: TryInto<ObjectPath<'p>>, |
92 | I: TryInto<InterfaceName<'i>>, |
93 | M: TryInto<MemberName<'m>>, |
94 | D::Error: Into<Error>, |
95 | P::Error: Into<Error>, |
96 | I::Error: Into<Error>, |
97 | M::Error: Into<Error>, |
98 | B: serde::ser::Serialize + zvariant::DynamicType, |
99 | { |
100 | block_on( |
101 | self.inner |
102 | .call_method(destination, path, iface, method_name, body), |
103 | ) |
104 | } |
105 | |
106 | /// Emit a signal. |
107 | /// |
108 | /// Create a signal message, and send it over the connection. |
109 | pub fn emit_signal<'d, 'p, 'i, 'm, D, P, I, M, B>( |
110 | &self, |
111 | destination: Option<D>, |
112 | path: P, |
113 | iface: I, |
114 | signal_name: M, |
115 | body: &B, |
116 | ) -> Result<()> |
117 | where |
118 | D: TryInto<BusName<'d>>, |
119 | P: TryInto<ObjectPath<'p>>, |
120 | I: TryInto<InterfaceName<'i>>, |
121 | M: TryInto<MemberName<'m>>, |
122 | D::Error: Into<Error>, |
123 | P::Error: Into<Error>, |
124 | I::Error: Into<Error>, |
125 | M::Error: Into<Error>, |
126 | B: serde::ser::Serialize + zvariant::DynamicType, |
127 | { |
128 | block_on( |
129 | self.inner |
130 | .emit_signal(destination, path, iface, signal_name, body), |
131 | ) |
132 | } |
133 | |
134 | /// Reply to a message. |
135 | /// |
136 | /// Given an existing message (likely a method call), send a reply back to the caller with the |
137 | /// given `body`. |
138 | /// |
139 | /// Returns the message serial number. |
140 | pub fn reply<B>(&self, call: &Message, body: &B) -> Result<u32> |
141 | where |
142 | B: serde::ser::Serialize + zvariant::DynamicType, |
143 | { |
144 | block_on(self.inner.reply(call, body)) |
145 | } |
146 | |
147 | /// Reply an error to a message. |
148 | /// |
149 | /// Given an existing message (likely a method call), send an error reply back to the caller |
150 | /// with the given `error_name` and `body`. |
151 | /// |
152 | /// Returns the message serial number. |
153 | pub fn reply_error<'e, E, B>(&self, call: &Message, error_name: E, body: &B) -> Result<u32> |
154 | where |
155 | B: serde::ser::Serialize + zvariant::DynamicType, |
156 | E: TryInto<ErrorName<'e>>, |
157 | E::Error: Into<Error>, |
158 | { |
159 | block_on(self.inner.reply_error(call, error_name, body)) |
160 | } |
161 | |
162 | /// Reply to a method call with an error. |
163 | /// |
164 | /// Given an existing method call message header, send an error reply back to the caller |
165 | /// using one of the standard interface reply types. |
166 | /// |
167 | /// Returns the message serial number. |
168 | pub fn reply_dbus_error( |
169 | &self, |
170 | call: &zbus::MessageHeader<'_>, |
171 | err: impl DBusError, |
172 | ) -> Result<u32> { |
173 | block_on(self.inner.reply_dbus_error(call, err)) |
174 | } |
175 | |
176 | /// Register a well-known name for this service on the bus. |
177 | /// |
178 | /// Blocking version of [`crate::Connection::request_name`]. See docs there for more details |
179 | /// and caveats. |
180 | pub fn request_name<'w, W>(&self, well_known_name: W) -> Result<()> |
181 | where |
182 | W: TryInto<WellKnownName<'w>>, |
183 | W::Error: Into<Error>, |
184 | { |
185 | block_on(self.inner.request_name(well_known_name)) |
186 | } |
187 | |
188 | /// Register a well-known name for this service on the bus. |
189 | /// |
190 | /// Blocking version of [`crate::Connection::request_name_with_flags`]. See docs there for more |
191 | /// details and caveats. |
192 | pub fn request_name_with_flags<'w, W>( |
193 | &self, |
194 | well_known_name: W, |
195 | flags: BitFlags<RequestNameFlags>, |
196 | ) -> Result<RequestNameReply> |
197 | where |
198 | W: TryInto<WellKnownName<'w>>, |
199 | W::Error: Into<Error>, |
200 | { |
201 | block_on(self.inner.request_name_with_flags(well_known_name, flags)) |
202 | } |
203 | |
204 | /// Deregister a previously registered well-known name for this service on the bus. |
205 | /// |
206 | /// Use this method to deregister a well-known name, registered through |
207 | /// [`Connection::request_name`]. |
208 | /// |
209 | /// Unless an error is encountered, returns `Ok(true)` if name was previously registered with |
210 | /// the bus through `self` and it has now been successfully deregistered, `Ok(false)` if name |
211 | /// was not previously registered or already deregistered. |
212 | pub fn release_name<'w, W>(&self, well_known_name: W) -> Result<bool> |
213 | where |
214 | W: TryInto<WellKnownName<'w>>, |
215 | W::Error: Into<Error>, |
216 | { |
217 | block_on(self.inner.release_name(well_known_name)) |
218 | } |
219 | |
/// Checks if `self` is a connection to a message bus.
///
/// This will return `false` for p2p connections.
///
/// The answer comes straight from `is_bus` on the wrapped async connection.
pub fn is_bus(&self) -> bool {
    self.inner.is_bus()
}
226 | |
/// Get a reference to the associated [`ObjectServer`].
///
/// The `ObjectServer` is created on-demand.
///
/// The returned value dereferences to the [`ObjectServer`] and borrows from `self`, so it cannot
/// outlive this `Connection` handle.
pub fn object_server(&self) -> impl Deref<Target = ObjectServer> + '_ {
    // NOTE(review): the meaning of the `true` and `None` arguments is defined by
    // `sync_object_server` on the async connection and is not visible here — TODO confirm.
    self.inner.sync_object_server(true, None)
}
233 | |
/// Get a reference to the underlying async Connection.
///
/// The blocking wrapper keeps ownership; use [`Connection::into_inner`] to take the async
/// connection out instead.
pub fn inner(&self) -> &crate::Connection {
    &self.inner
}
238 | |
239 | /// Get the underlying async Connection, consuming `self`. |
240 | pub fn into_inner(self) -> crate::Connection { |
241 | self.inner |
242 | } |
243 | |
/// Returns a listener, notified on various connection activity.
///
/// This function is meant for the caller to implement idle or timeout on inactivity.
///
/// The listener is obtained from `monitor_activity` on the wrapped async connection; each call
/// requests a new one.
pub fn monitor_activity(&self) -> EventListener {
    self.inner.monitor_activity()
}
250 | |
251 | /// Returns the peer credentials. |
252 | /// |
253 | /// The fields are populated on the best effort basis. Some or all fields may not even make |
254 | /// sense for certain sockets or on certain platforms and hence will be set to `None`. |
255 | /// |
256 | /// # Caveats |
257 | /// |
258 | /// Currently `unix_group_ids` and `linux_security_label` fields are not populated. |
259 | pub fn peer_credentials(&self) -> io::Result<ConnectionCredentials> { |
260 | block_on(self.inner.peer_credentials()) |
261 | } |
262 | } |
263 | |
264 | impl From<crate::Connection> for Connection { |
265 | fn from(conn: crate::Connection) -> Self { |
266 | Self { inner: conn } |
267 | } |
268 | } |
269 | |
#[cfg(all(test, unix))]
mod tests {
    use ntest::timeout;
    #[cfg(all(unix, not(feature = "tokio")))]
    use std::os::unix::net::UnixStream;
    use std::thread;
    use test_log::test;
    #[cfg(all(unix, feature = "tokio"))]
    use tokio::net::UnixStream;
    #[cfg(all(windows, not(feature = "tokio")))]
    use uds_windows::UnixStream;

    use crate::{
        blocking::{ConnectionBuilder, MessageIterator},
        Guid,
    };

    /// Peer-to-peer round trip over a Unix socket pair: the server side (on its own thread) calls
    /// `Test`, the client side answers with `"yay"`, and afterwards the activity listener is
    /// expected to fire once and then go quiet.
    #[test]
    #[timeout(15000)]
    fn unix_p2p() {
        let guid = Guid::generate();

        // Tokio needs us to call the sync function from async context. :shrug:
        let (server_sock, client_sock) =
            crate::utils::block_on(async { UnixStream::pair().unwrap() });

        // The client signals over this channel once it is ready to receive the call.
        let (ready_tx, ready_rx) = std::sync::mpsc::channel();
        let server_thread = thread::spawn(move || {
            let server_conn = ConnectionBuilder::unix_stream(server_sock)
                .server(&guid)
                .p2p()
                .build()
                .unwrap();
            ready_rx.recv().unwrap();
            let reply = server_conn
                .call_method(None::<()>, "/", Some("org.zbus.p2p"), "Test", &())
                .unwrap();
            assert_eq!(reply.to_string(), "Method return");
            let answer: String = reply.body().unwrap();
            answer
        });

        let client_conn = ConnectionBuilder::unix_stream(client_sock)
            .p2p()
            .build()
            .unwrap();
        let first_activity = client_conn.monitor_activity();
        let mut incoming = MessageIterator::from(&client_conn);
        ready_tx.send(()).unwrap();
        let call = incoming.next().unwrap().unwrap();
        assert_eq!(call.to_string(), "Method call Test");
        client_conn.reply(&call, &"yay").unwrap();

        // Drain (and discard) whatever else arrives until the iterator ends.
        incoming.for_each(drop);

        let answer = server_thread.join().expect("failed to join server thread");
        assert_eq!(answer, "yay");

        // there was some activity
        first_activity.wait();
        // eventually, nothing happens and it will timeout
        while client_conn
            .monitor_activity()
            .wait_timeout(std::time::Duration::from_millis(10))
        {}
    }
}
335 | |